【go学习】goroutine并发学习总结

go最大的特性就是并发了,所以这一块是go的重点知识,我自己花了一些时间,好好学习这个基础知识。

声明

文章内容为个人学习理解,所以文章如果有不对之处,非常感谢指出。

goroutine

说起go并发,一般都会指go协程,即goroutine,而实现goroutin的关键字就是 go。 我学习go并发用到的关键字总结

  • go //异步执行
  • chan // make channel 用于各协程间通信 (重点)
    • ch := make(chan 数据类型, 缓冲区容量) // 格式
  • sync.Mutex // 互斥锁
    • m.Lock()// 实例化的sync.Mutex对象 即var m sync.Mutex
    • m.Unlock()
  • sync.Once // 只执行一次函数
    • var once sync.Once once.DO(func(){})
  • sync.WaitGroup // 等待所有协程执行完
    • wg.Add(协程数量) // 添加执行的协程数量
    • wg.Done() // 每执行完一个单元协程就执行一次Done 把上面添加的数量 -1
    • wg.Wait() // 等待所有的协程执行完
  • runtime.NumGoroutine() // 查看当前运行的协程数量

最基础的一个demo

package main

import (
    "fmt"
    "time"
)

func main() {
    fmt.Println("Hello World")
        // 异步执行协程函数
    go func() {
        fmt.Println("goroutine 执行了")
    }()
        // sleep 1 秒的目的是为了 等待协执行完看到结果 不至于 主程序执行完了 协程还没结束
    time.Sleep(1*time.Second)
}

channel 协程间数据通信

go里面协程间可以使用channel进行通信,不推荐直接使用变量交互数据,避免数据操作混乱。 协程数据相当于一个 管道队列机构 先进先出

无缓冲区

无缓冲区时,必须同时读取才行,否则会报错 fatal error: all goroutines are asleep - deadlock!

错误的例子

func main(){
       // 定义一个int类型的channel  channel 只能使用make创建
    c1 := make(chan int)
       // 无缓冲区 单独存 或者取 或者 先后执行存取都会报错
    // c1 <- 1
    n := <- c1
    fmt.Println(n)
}

正确的例子

func main(){
        c1 := make(chan int)
    go func() {
        c1 <- 1
    }()
    n := <- c1
       // 输出1
    fmt.Println(n)
}

补充说明一下,当缓冲区还没有数据进来时,读取操作会被阻塞, 比如以下例子

func main(){
    c1 := make(chan int)
    go func() {
                // sleep 5秒
        time.Sleep(5*time.Second)
        c1 <- 1
    }()
       // 先输出 ----
    fmt.Println("-----")
    n := <- c1
        // 等待5秒缓冲区有数据了  程序才会接着往下执行
    fmt.Println("+++++")
    fmt.Println(n)
}

有缓冲区

func main(){
      // 声明一个channel Buffer为3 表示可以存储 3个 int 数据
      c := make(chan int, 3)
      c <- 1
      c <- 2
      c <- 8
      // 取出一个数据(可以选择不接收) 队列顺序
      <-c
      // 查看channel的 Buffer 容量
      t.Log(cap(c))
      // 也可以多次取值
      x, y := <-c, <-c
      t.Log(fmt.Sprintf("%d + %d = %d", x, y, x+y))
}

上面的例子中,一个个的写入和读取channel中的数据,特别麻烦,于是就有循环写入和读取。需要注意的是使用range循环读取时,channel必须得用close来结束, 否则就回因为range无法判断是否结束,而导致异常

func main(){
      // 声明一个 string chan
      c := make(chan string, 3)

      // 协程异步调用 添加到chan
      go func(n int, c chan string) {
          for i := 0; i < n; i++ {
                // 循环添加到
                c <- fmt.Sprintf("--- %d ---", i)
          }
          // 结束添加
          close(c)
      }(cap(c), c)

      // 循环取出
      for v := range c {
          t.Log(v)
      }
}

多个channnel操作

使用sync.WaitGroup等待多个goroutine同时执行完,使用selectcase随机执行返回值

// 两个协程任务分别往不同的channel存入数据
func AsyncCh1(n int, c chan string, wg *sync.WaitGroup) {
    for i := 0; i < n; i++ {
        c <- fmt.Sprintf("++ %d ++", i)
    }
       // Done() 结束时会把之前添加的协程数量减一
    wg.Done()
}

func AsyncCh2(n int, c chan string, wg *sync.WaitGroup) {
    for i := n; i > 0; i-- {
        c <- fmt.Sprintf("-- %d --", i)
    }
    wg.Done()
}

func main() {
    var wg sync.WaitGroup

    ch1 := make(chan string, 5)
    ch2 := make(chan string, 5)

       // 添加协程执行数量 
    wg.Add(2)
    go AsyncCh1(cap(ch1), ch1, &wg)
    //wg.Add(1)
    go AsyncCh2(cap(ch2), ch2, &wg)

    // 等待协程全部执行完
    wg.Wait()

    for i := 0; i < 10; i++ {
                // 随机从任意的channel中取值 如果有就回立即返回
        select {
        case ret1 := <-ch1:
            t.Log(ret1)
        case ret2 := <-ch2:
            t.Log(ret2)
                // 设置的超时  如果任意的协程超过100毫秒就回报错
        case <-time.After(time.Millisecond * 100):
            t.Error("time out")
        }
    }
}

模拟实现超时操作,把上面的例子改造一下,然后添加超时操作。

func AsyncCh1(n int, c chan string) {
       // 添加3秒sleep 
    time.Sleep(3*time.Second)
    for i := 0; i < n; i++ {
        c <- fmt.Sprintf("++ %d ++", i)
    }
       // 去掉等待结束完成 
    //wg.Done()
}

func AsyncCh2(n int, c chan string) {
    time.Sleep(3*time.Second)
    for i := n; i > 0; i-- {
        c <- fmt.Sprintf("-- %d --", i)
    }
}

func TestSelect(t *testing.T) {
        // 去掉等待
    //var wg sync.WaitGroup

    ch1 := make(chan string, 5)
    ch2 := make(chan string, 5)

    go AsyncCh1(cap(ch1), ch1)
    go AsyncCh2(cap(ch2), ch2)

    for i := 0; i < 10; i++ {
        select {
        case ret1 := <-ch1:
            t.Log(ret1)
        case ret2 := <-ch2:
            t.Log(ret2)
                // 添加1秒超时检测  结果就是前两个数据会打印time out
        case <-time.After(time.Second * 1):
            t.Error("time out")
        }
    }
}

模拟生产和消费

如果是使用了close关闭chan,那么channel取值其实是有两个返回值的,相当于close发出了一个信号

v, ok := <-c;if ok {
    fmt.Println(fmt.Sprintf("有数据  %d", v))
} else {
    fmt.Println("没有数据")
}

如下完整例子

// 不断的生产
func Producer1(c chan int, wg *sync.WaitGroup) {

    for i := 0; i <= 10; i++ {
        fmt.Println(fmt.Sprintf("生产了++++++ %d", i))
        time.Sleep(time.Millisecond * 100)
        c <- i
    }

    // 关闭channel
    close(c)
    //c <- 22  // 关闭后就不能发了  panic: send on closed channel

    wg.Done()
}

// 不断的从chanel里面拿
func Consumer(c chan int, wg *sync.WaitGroup) {

    for {
        //time.Sleep(time.Millisecond*800)
        // 判断生产者是否已经停止了
        v, ok := <-c
        if ok {
            fmt.Println(fmt.Sprintf("-------消费了 %d", v))
        } else {
            fmt.Println("结束")
            break
        }
    }
    wg.Done()

}

func main() {
    var wg sync.WaitGroup
    c := make(chan int, 20)
    wg.Add(2)
    go Producer1(c, &wg)
    go Consumer(c, &wg)
    wg.Wait()

}

仅读和仅写的channel

func Producer5(writeC chan<- int) {
    for i := 0; i < 10; i++ {
        fmt.Printf("生产+++%d\n", i)
        writeC <- i
    }
}
func Consumer5(redC <-chan int) {
    for i := 0; i < 10; i++ {
        fmt.Printf("-----------------消费 %d \n", <-redC)
    }
}

func main() {

    c := make(chan int, 15)

    // 只读
    var redC <-chan int = c
    // 只写
    var writeC chan<- int = c

    // 生产
    go Producer5(writeC)
    // 消费
    Consumer5(redC)

}

只执行一次的方法 sync.Once


var once sync.Once

func NormalFunc(i int) {
    timeStr := time.Now().Format("2006-01-02 15:04:05")

    fmt.Printf(" %d 测试函数 %s \n", i, timeStr)
}

func SingleFunc(i int) {
    fmt.Printf("单例测试函数执行++ %d \n", i)

    once.Do(func() {
        // 这里面只执行一次
        timeStr := time.Now().Format("2006-01-02 15:04:05")

        fmt.Printf("%d------单例子测试函数 只执行一次 %s \n", i, timeStr)
    })
}

func main() {
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)

        go func(i int) {
            NormalFunc(i)
            SingleFunc(i)

            wg.Done()
        }(i)
    }
    wg.Wait()
}

利用channel构建对象池

// 新建一个空结构体 相当于对象
type Tool struct {
    name string
}

// 对象池 用于存储 Tool对象
type ToolsBox struct {
    // 属性是一个 channel 内容是 Tool 结构体指针
    bufChan chan *Tool
}

// 获取工具 给结构体绑定方法
func (p *ToolsBox) GetTool(timeout time.Duration) (*Tool, error) {
    select {
    case tool := <-p.bufChan:
        return tool, nil
    case <-time.After(timeout): //超时控制
        return nil, errors.New("time out")
    }
}

// 用完归还(释放)
func (p *ToolsBox) ReleaseTool(tool *Tool) error {
    select {
    case p.bufChan <- tool:
        return nil
    default:
        return errors.New("overflow")
    }
}

// new一个 ToolBox对象
func NewToolsBox(poolNum int) *ToolsBox {
    objPool := ToolsBox{}
    objPool.bufChan = make(chan *Tool, poolNum)

    for i := 0; i < poolNum; i++ {

        // 生成一个 工具结构体
        tool := &Tool{fmt.Sprintf("🔧--%d", i)}
        // 存入对象池
        objPool.bufChan <- tool
    }

    return &objPool
}

func main() {

    pool := NewToolsBox(5)

    for i := 0; i < 8; i++ {
        tool, err := pool.GetTool(time.Second * 1)

        if err != nil {
            t.Log(fmt.Sprintf("---取出有问题 %s 当前容量%d", tool, len(pool.bufChan)))
        } else {
            // 取出没问题
            t.Log(fmt.Sprintf("----取出一个 %s 当前容量%d", tool, len(pool.bufChan)))

            // 接着就释放 和判断写在一起
            if err := pool.ReleaseTool(tool); err != nil {
                t.Log("释放有问题")
            } else {
                t.Log(fmt.Sprintf("释放一个 +++ %s 当前容量%d", tool, len(pool.bufChan)))
            }
        }

    }

    t.Log("结束")
}

代码地址

github


文章作者: 王小右
版权声明: 咳咳想白嫖文章?本文章著作权归作者所有,任何形式的转载都请注明出处。 https://www.charmcode.cn !
还能输入 100 字
  目录