errgroup.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. // Copyright 2016 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package errgroup provides synchronization, error propagation, and Context
  5. // cancelation for groups of goroutines working on subtasks of a common task.
  6. package errgroup
  7. import (
  8. "context"
  9. "fmt"
  10. "runtime"
  11. "sync"
  12. )
  13. // A Group is a collection of goroutines working on subtasks that are part of
  14. // the same overall task.
  15. //
  16. // A zero Group is valid and does not cancel on error.
  17. type Group struct {
  18. err error
  19. wg sync.WaitGroup
  20. errOnce sync.Once
  21. workerOnce sync.Once
  22. ch chan func(ctx context.Context) error
  23. chs []func(ctx context.Context) error
  24. ctx context.Context
  25. cancel func()
  26. }
  27. // WithContext create a Group.
  28. // given function from Go will receive this context,
  29. func WithContext(ctx context.Context) *Group {
  30. return &Group{ctx: ctx}
  31. }
  32. // WithCancel create a new Group and an associated Context derived from ctx.
  33. //
  34. // given function from Go will receive context derived from this ctx,
  35. // The derived Context is canceled the first time a function passed to Go
  36. // returns a non-nil error or the first time Wait returns, whichever occurs
  37. // first.
  38. func WithCancel(ctx context.Context) *Group {
  39. ctx, cancel := context.WithCancel(ctx)
  40. return &Group{ctx: ctx, cancel: cancel}
  41. }
  42. func (g *Group) do(f func(ctx context.Context) error) {
  43. ctx := g.ctx
  44. if ctx == nil {
  45. ctx = context.Background()
  46. }
  47. var err error
  48. defer func() {
  49. if r := recover(); r != nil {
  50. buf := make([]byte, 64<<10)
  51. buf = buf[:runtime.Stack(buf, false)]
  52. err = fmt.Errorf("errgroup: panic recovered: %s\n%s", r, buf)
  53. }
  54. if err != nil {
  55. g.errOnce.Do(func() {
  56. g.err = err
  57. if g.cancel != nil {
  58. g.cancel()
  59. }
  60. })
  61. }
  62. g.wg.Done()
  63. }()
  64. err = f(ctx)
  65. }
  66. // GOMAXPROCS set max goroutine to work.
  67. func (g *Group) GOMAXPROCS(n int) {
  68. if n <= 0 {
  69. panic("errgroup: GOMAXPROCS must great than 0")
  70. }
  71. g.workerOnce.Do(func() {
  72. g.ch = make(chan func(context.Context) error, n)
  73. for i := 0; i < n; i++ {
  74. go func() {
  75. for f := range g.ch {
  76. g.do(f)
  77. }
  78. }()
  79. }
  80. })
  81. }
  82. // Go calls the given function in a new goroutine.
  83. //
  84. // The first call to return a non-nil error cancels the group; its error will be
  85. // returned by Wait.
  86. func (g *Group) Go(f func(ctx context.Context) error) {
  87. g.wg.Add(1)
  88. if g.ch != nil {
  89. select {
  90. case g.ch <- f:
  91. default:
  92. g.chs = append(g.chs, f)
  93. }
  94. return
  95. }
  96. go g.do(f)
  97. }
  98. // Wait blocks until all function calls from the Go method have returned, then
  99. // returns the first non-nil error (if any) from them.
  100. func (g *Group) Wait() error {
  101. if g.ch != nil {
  102. for _, f := range g.chs {
  103. g.ch <- f
  104. }
  105. }
  106. g.wg.Wait()
  107. if g.ch != nil {
  108. close(g.ch) // let all receiver exit
  109. }
  110. if g.cancel != nil {
  111. g.cancel()
  112. }
  113. return g.err
  114. }