pipeline_test.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package pipeline
import (
	"context"
	"reflect"
	"sort"
	"strconv"
	"sync"
	"testing"
	"time"

	"go-common/library/net/metadata"
	xtime "go-common/library/time"
)
  11. func TestPipeline(t *testing.T) {
  12. conf := &Config{
  13. MaxSize: 3,
  14. Interval: xtime.Duration(time.Millisecond * 20),
  15. Buffer: 3,
  16. Worker: 10,
  17. }
  18. type recv struct {
  19. mirror bool
  20. ch int
  21. values map[string][]interface{}
  22. }
  23. var runs []recv
  24. do := func(c context.Context, ch int, values map[string][]interface{}) {
  25. runs = append(runs, recv{
  26. mirror: metadata.Bool(c, metadata.Mirror),
  27. values: values,
  28. ch: ch,
  29. })
  30. }
  31. split := func(s string) int {
  32. n, _ := strconv.Atoi(s)
  33. return n
  34. }
  35. p := NewPipeline(conf)
  36. p.Do = do
  37. p.Split = split
  38. p.Start()
  39. p.Add(context.Background(), "1", 1)
  40. p.Add(context.Background(), "1", 2)
  41. p.Add(context.Background(), "11", 3)
  42. p.Add(context.Background(), "2", 3)
  43. time.Sleep(time.Millisecond * 60)
  44. mirrorCtx := metadata.NewContext(context.Background(), metadata.MD{metadata.Mirror: true})
  45. p.Add(mirrorCtx, "2", 3)
  46. time.Sleep(time.Millisecond * 60)
  47. p.SyncAdd(mirrorCtx, "5", 5)
  48. time.Sleep(time.Millisecond * 60)
  49. p.Close()
  50. expt := []recv{
  51. {
  52. mirror: false,
  53. ch: 1,
  54. values: map[string][]interface{}{
  55. "1": {1, 2},
  56. "11": {3},
  57. },
  58. },
  59. {
  60. mirror: false,
  61. ch: 2,
  62. values: map[string][]interface{}{
  63. "2": {3},
  64. },
  65. },
  66. {
  67. mirror: true,
  68. ch: 2,
  69. values: map[string][]interface{}{
  70. "2": {3},
  71. },
  72. },
  73. {
  74. mirror: true,
  75. ch: 5,
  76. values: map[string][]interface{}{
  77. "5": {5},
  78. },
  79. },
  80. }
  81. if !reflect.DeepEqual(runs, expt) {
  82. t.Errorf("expect get %+v,\n got: %+v", expt, runs)
  83. }
  84. }
  85. func TestPipelineSmooth(t *testing.T) {
  86. conf := &Config{
  87. MaxSize: 100,
  88. Interval: xtime.Duration(time.Second),
  89. Buffer: 100,
  90. Worker: 10,
  91. Smooth: true,
  92. }
  93. type result struct {
  94. index int
  95. ts time.Time
  96. }
  97. var results []result
  98. do := func(c context.Context, index int, values map[string][]interface{}) {
  99. results = append(results, result{
  100. index: index,
  101. ts: time.Now(),
  102. })
  103. }
  104. split := func(s string) int {
  105. n, _ := strconv.Atoi(s)
  106. return n
  107. }
  108. p := NewPipeline(conf)
  109. p.Do = do
  110. p.Split = split
  111. p.Start()
  112. for i := 0; i < 10; i++ {
  113. p.Add(context.Background(), strconv.Itoa(i), 1)
  114. }
  115. time.Sleep(time.Millisecond * 1500)
  116. if len(results) != conf.Worker {
  117. t.Errorf("expect results equal worker")
  118. t.FailNow()
  119. }
  120. for i, r := range results {
  121. if i > 0 {
  122. if r.ts.Sub(results[i-1].ts) < time.Millisecond*20 {
  123. t.Errorf("expect runs be smooth")
  124. t.FailNow()
  125. }
  126. }
  127. }
  128. }