目录
  • 一、Channel并发控制
    • 1.1 channel切片控制携程执行
    • 1.2 channel控制并发数量
  • 二、WaitGroup并发控制
    • 2.1 WaitGroup 控制协程并行
    • 2.2 WaitGroup封装通用函数
  • 三、Context
    • 3.1 Context定义的接口
    • 3.2 Context控制协程结束
  • 四、 ErrorGroup
    • 五、通用协程控制工具封装

      一、Channel并发控制

      1.1 channel切片控制携程执行

      通过创建一个切片channel 控制多个携程地并发执行,并收集携程执行获取的数据及错误信息

      type ResultDto struct {
          Err  error
          Data interface{}
      }
      ​
      func main() {
          channel := make([]chan *ResultDto, 10) 
          for i := 0; i < 10; i++ {
              channel[i] = make(chan *ResultDto)
              temp := i
              go Process(temp, channel[i])
          }
      ​
          for _, ch := range channel {
              fmt.Println(<-ch)
          }
      }
      ​
      func Process(i int, ch chan *ResultDto) {
          // Do some work...
          if i == 1 {
              ch <- &ResultDto{Err: errors.New("do work err")}
          } else {
              ch <- &ResultDto{Data: i}
          }
      }

      1.2 channel控制并发数量

      通过带缓冲区的channel控制并发执行携程的数量 , 注意这里需要配合 sync.WaitGroup 一起使用,不然当执行到i为7 8 9 时,子携程还没有执行完,主携程就退出了

      func main() {
          wg := &sync.WaitGroup{}
          ch := make(chan struct{}, 3)
          
          for i := 0; i < 10; i++ {
              ch <- struct{}{}
              wg.Add(1)
              
              // 执行携程
              temp := i
              go Process(wg, temp, ch)
              
          }
          
          wg.Wait()
      }
      ​
      func Process(wg *sync.WaitGroup, i int, ch chan struct{}) {
          defer func() {
              <-ch
              wg.Done()
          }()
          
          // Do some work...
          time.Sleep(1 * time.Second)
          fmt.Println(i)
      }

      二、WaitGroup并发控制

      2.1 WaitGroup 控制协程并行

      WaitGroup是Golang应用开发过程中经常使用的并发控制技术。

      WaitGroup,可理解为Wait-Goroutine-Group,即等待一组goroutine结束。比如某个goroutine需要等待其他几个goroutine全部完成,那么使用WaitGroup可以轻松实现。

      func main() {
          wg := &sync.WaitGroup{}
          for i := 0; i < 10; i++ {
              wg.Add(1)
              temp := i
              go Process(wg, temp)
          }
          wg.Wait()
      }
      ​
      func Process(wg *sync.WaitGroup, i int) {
          defer func() {
              wg.Done()
          }()
          // Do some work...
          time.Sleep(1 * time.Second)
          fmt.Println(i)
      }
      

      简单的说,上面程序中wg内部维护了一个计数器:

      • 启动goroutine前将计数器通过Add(2)将计数器设置为待启动的goroutine个数。
      • 启动goroutine后,使用Wait()方法阻塞自己,等待计数器变为0。
      • 每个goroutine执行结束通过Done()方法将计数器减1。
      • 计数器变为0后,阻塞的goroutine被唤醒。

      2.2 WaitGroup封装通用函数

      waitGroup控制并发执行,limit 并发上限,收集错误返回

      func main() {
          funcList := []ExeFunc{
              func(ctx context.Context) error {
                  fmt.Println("5 开始")
                  time.Sleep(5 * time.Second)
                  fmt.Println("5 结束")
                  return nil
              },
              func(ctx context.Context) error {
                  fmt.Println("3 开始")
                  time.Sleep(3 * time.Second)
                  fmt.Println("3 结束")
                  return nil
              },
          }
          err := GoExeAll(context.Background(), 2,  funcList...)
          if err != nil {
              fmt.Println(err)
          }
      }
      ​
      type ExeFunc func(ctx context.Context) error
      ​
      // GoExeAll 并发执行所有,limit 为并发上限,收集所有错误返回
      func GoExeAll(ctx context.Context, limit int, fs ...ExeFunc) (errs []error) {
          wg := &sync.WaitGroup{}
          ch := make(chan struct{}, limit)
          errCh := make(chan error, len(fs))
          for _, f := range fs {
              fTmp := f
              wg.Add(1)
              ch <- struct{}{}
              go func() {
                  defer func() {
                      if panicErr := recover(); panicErr != nil {
                          errCh <- errors.New("execution panic:" + fmt.Sprintf("%v", panicErr))
                      }
                      wg.Done()
                      <-ch
                  }()
                  if err := fTmp(ctx); err != nil {
                      errCh <- err
                  }
              }()
          }
          wg.Wait()
          close(errCh)
          close(ch)
          for chErr := range errCh {
              errs = append(errs, chErr)
          }
          return
      }

      三、Context

      Golang context是Golang应用开发常用的并发控制技术,它与WaitGroup最大的不同点是context对于派生goroutine有更强的控制力,它可以控制多级的goroutine。

      3.1 Context定义的接口

      context实际上只定义了接口,凡是实现该接口的类都可称为是一种context,官方包中实现了几个常用的context,分别可用于不同的场景。

      type Context interface {
          Deadline() (deadline time.Time, ok bool)
      ​
          Done() <-chan struct{}
      ​
          Err() error
      ​
          Value(key interface{}) interface{}
      }
      

      Deadline()

      该方法返回一个deadline和标识是否已设置deadline的bool值,如果没有设置deadline,则ok == false,此时deadline为一个初始值的time.Time值

      Done()

      该方法返回一个channel,需要在select-case语句中使用,如”case <-context.Done():”。

      当context关闭后,Done()返回一个被关闭的管道,关闭的管道仍然是可读的,据此goroutine可以收到关闭请求;当context还未关闭时,Done()返回nil。

      Err()

      该方法描述context关闭的原因。关闭原因由context实现控制,不需要用户设置。比如Deadline context,关闭原因可能是因为deadline,也可能提前被主动关闭,那么关闭原因就会不同:

      Value()

      有一种context,它不是用于控制呈树状分布的goroutine,而是用于在树状分布的goroutine间传递信息

      3.2 Context控制协程结束

      func main() {
          wg := &sync.WaitGroup{}
          ctx, cancelFunc := context.WithCancel(context.Background())
          for i := 0; i < 10; i++ {
              wg.Add(1)
              temp := i
              go Process(ctx, wg, temp)
          }
          time.Sleep(5 * time.Second)
          cancelFunc()
          wg.Wait()
      }
      ​
      func Process(ctx context.Context, wg *sync.WaitGroup, i int) {
          defer wg.Done()
          ch := make(chan error)
          go DoWork(ctx, ch, i)
          select {
          case <-ctx.Done():
              fmt.Println("cancelFunc")
              return
          case <-ch:
              return
          }
      }
      ​
      func DoWork(ctx context.Context, ch chan error, i int) {
          defer func() {
              ch <- nil
          }()
          time.Sleep(time.Duration(i) * time.Second)
          fmt.Println(i)
      }
      

      四、 ErrorGroup

      可采用第三方库golang.org/x/sync/errgroup堆多个协助并发执行进行控制

      4.1 errorGroup并发执行,limit 为并发上限,timeout超时

      func main() {
          funcList := []ExeFunc{
              func(ctx context.Context) error {
                  fmt.Println("5 开始")
                  time.Sleep(5 * time.Second)
                  fmt.Println("5 结束")
                  return nil
              },
              func(ctx context.Context) error {
                  fmt.Println("3 开始")
                  time.Sleep(3 * time.Second)
                  fmt.Println("3 结束")
                  return nil
              },
          }
      ​
          err := GoExe(context.Background(), 2, 10*time.Second, funcList...)
          if err != nil {
              fmt.Println(err)
          }
      }
      ​
      type ExeFunc func(ctx context.Context) error
      ​
      // GoExe 并发执行,limit 为并发上限,其中任意一个报错,其他中断,timeout为0不超时
      func GoExe(ctx context.Context, limit int, timeout time.Duration, fs ...ExeFunc) error {
          eg, ctx := errgroup.WithContext(ctx)
          eg.SetLimit(limit)
          var timeCh <-chan time.Time
          if timeout > 0 {
              timeCh = time.After(timeout)
          }
          for _, f := range fs {
              fTmp := f
              eg.Go(func() (err error) {
                  ch := make(chan error)
                  defer close(ch)
                  go DoWorkFunc(ctx, ch, fTmp)
                  select {
                  case <-ctx.Done():
                      return ctx.Err()
                  case <-timeCh:
                      return errors.New("execution timeout")
                  case err = <-ch:
                      return err
                  }
              })
          }
          if err := eg.Wait(); err != nil {
              return err
          }
          return nil
      }
      ​
      func DoWorkFunc(ctx context.Context, ch chan error, fs ExeFunc) {
          var err error
          defer func() {
              if panicErr := recover(); panicErr != nil {
                  err = errors.New("execution panic:" + fmt.Sprintf("%v", panicErr))
              }
              ch <- err
          }()
          err = fs(ctx)
          return
      }

      五、通用协程控制工具封装

      import (
          "context"
          "errors"
          "fmt"
          "golang.org/x/sync/errgroup"
          "sync"
          "time"
      )
      ​
      ​
      // ExeFunc 要被执行的函数或方法
      type ExeFunc func(ctx context.Context) error
      ​
      // SeqExe 顺序执行,遇到错误就返回
      func SeqExe(ctx context.Context, fs ...ExeFunc) error {
          for _, f := range fs {
              if err := f(ctx); err != nil {
                  return err
              }
          }
          return nil
      }
      ​
      // GoExe 并发执行,limit 为并发上限,其中任意一个报错,其他中断,timeout为0不超时
      func GoExe(ctx context.Context, limit int, timeout time.Duration, fs ...ExeFunc) error {
          eg, ctx := errgroup.WithContext(ctx)
          eg.SetLimit(limit)
          var timeCh <-chan time.Time
          if timeout > 0 {
              timeCh = time.After(timeout)
          }
          for _, f := range fs {
              fTmp := f
              eg.Go(func() (err error) {
                  ch := make(chan error)
                  defer close(ch)
                  go DoWorkFunc(ctx, ch, fTmp)
                  select {
                  case <-ctx.Done():
                      return ctx.Err()
                  case <-timeCh:
                      return errors.New("execution timeout")
                  case err = <-ch:
                      return err
                  }
              })
          }
          if err := eg.Wait(); err != nil {
              return err
          }
          return nil
      }
      ​
      func DoWorkFunc(ctx context.Context, ch chan error, fs ExeFunc) {
          var err error
          defer func() {
              if panicErr := recover(); panicErr != nil {
                  err = errors.New("execution panic:" + fmt.Sprintf("%v", panicErr))
              }
              ch <- err
          }()
          err = fs(ctx)
          return
      }
      ​
      // SeqExeAll 顺序执行所有,收集所有错误返回
      func SeqExeAll(ctx context.Context, fs ...ExeFunc) (errs []error) {
          for _, f := range fs {
              if err := f(ctx); err != nil {
                  errs = append(errs, err)
              }
          }
          return errs
      }
      ​
      // GoExeAll 并发执行所有,limit 为并发上限,收集所有错误返回
      func GoExeAll(ctx context.Context, limit int, fs ...ExeFunc) (errs []error) {
          wg := &sync.WaitGroup{}
          ch := make(chan struct{}, limit)
          errCh := make(chan error, len(fs))
          for _, f := range fs {
              fTmp := f
              wg.Add(1)
              ch <- struct{}{}
              go func() {
                  defer func() {
                      if panicErr := recover(); panicErr != nil {
                          errCh <- errors.New("execution panic:" + fmt.Sprintf("%v", panicErr))
                      }
                      wg.Done()
                      <-ch
                  }()
                  if err := fTmp(ctx); err != nil {
                      errCh <- err
                  }
              }()
          }
          wg.Wait()
          close(errCh)
          close(ch)
          for chErr := range errCh {
              errs = append(errs, chErr)
          }
          return
      }

      以上就是Go语言实现并发控制的常见方式详解的详细内容,更多关于Go并发控制的资料请关注本网站其它相关文章!

      您可能感兴趣的文章:

      • golang实现并发控制的方法和技巧
      • Golang中常见的三种并发控制方式使用小结
      • Go中并发控制的实现方式总结
      • Golang并发控制的三种实现方法
      • Go并发控制Channel使用场景分析