errgroup.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. // Copyright 2016 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package errgroup provides synchronization, error propagation, and Context
  5. // cancelation for groups of goroutines working on subtasks of a common task.
  6. package errgroup
  7. import (
  8. "context"
  9. "fmt"
  10. "runtime"
  11. "sync"
  12. )
  13. // A Group is a collection of goroutines working on subtasks that are part of
  14. // the same overall task.
  15. //
  16. // A zero Group is valid and does not cancel on error.
  17. type Group struct {
  18. err error
  19. wg sync.WaitGroup
  20. errOnce sync.Once
  21. workerOnce sync.Once
  22. ch chan func() error
  23. chs []func() error
  24. cancel func()
  25. }
  26. // WithContext returns a new Group and an associated Context derived from ctx.
  27. //
  28. // The derived Context is canceled the first time a function passed to Go
  29. // returns a non-nil error or the first time Wait returns, whichever occurs
  30. // first.
  31. func WithContext(ctx context.Context) (*Group, context.Context) {
  32. ctx, cancel := context.WithCancel(ctx)
  33. return &Group{cancel: cancel}, ctx
  34. }
  35. func (g *Group) do(f func() error) {
  36. var err error
  37. defer func() {
  38. if r := recover(); r != nil {
  39. buf := make([]byte, 64<<10)
  40. buf = buf[:runtime.Stack(buf, false)]
  41. err = fmt.Errorf("errgroup: panic recovered: %s\n%s", r, buf)
  42. }
  43. if err != nil {
  44. g.errOnce.Do(func() {
  45. g.err = err
  46. if g.cancel != nil {
  47. g.cancel()
  48. }
  49. })
  50. }
  51. g.wg.Done()
  52. }()
  53. err = f()
  54. }
  55. // GOMAXPROCS set max goroutine to work.
  56. func (g *Group) GOMAXPROCS(n int) {
  57. if n <= 0 {
  58. panic("errgroup: GOMAXPROCS must great than 0")
  59. }
  60. g.workerOnce.Do(func() {
  61. g.ch = make(chan func() error, n)
  62. for i := 0; i < n; i++ {
  63. go func() {
  64. for f := range g.ch {
  65. g.do(f)
  66. }
  67. }()
  68. }
  69. })
  70. }
  71. // Go calls the given function in a new goroutine.
  72. //
  73. // The first call to return a non-nil error cancels the group; its error will be
  74. // returned by Wait.
  75. func (g *Group) Go(f func() error) {
  76. g.wg.Add(1)
  77. if g.ch != nil {
  78. select {
  79. case g.ch <- f:
  80. default:
  81. g.chs = append(g.chs, f)
  82. }
  83. return
  84. }
  85. go g.do(f)
  86. }
  87. // Wait blocks until all function calls from the Go method have returned, then
  88. // returns the first non-nil error (if any) from them.
  89. func (g *Group) Wait() error {
  90. if g.ch != nil {
  91. for _, f := range g.chs {
  92. g.ch <- f
  93. }
  94. }
  95. g.wg.Wait()
  96. if g.ch != nil {
  97. close(g.ch) // let all receiver exit
  98. }
  99. if g.cancel != nil {
  100. g.cancel()
  101. }
  102. return g.err
  103. }