group_test.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. package databusutil
  2. import (
  3. "context"
  4. "encoding/json"
  5. "runtime"
  6. "strconv"
  7. "sync"
  8. "testing"
  9. "time"
  10. "go-common/library/log"
  11. "go-common/library/queue/databus"
  12. "go-common/library/sync/errgroup"
  13. xtime "go-common/library/time"
  14. )
  15. type testMsg struct {
  16. Seq int64 `json:"seq"`
  17. Mid int64 `json:"mid"`
  18. Now int64 `json:"now"`
  19. }
  20. var (
  21. _sendSeqsList = make([][]int64, _groupNum)
  22. _recvSeqsList = make([][]int64, _groupNum)
  23. _sMus = make([]sync.Mutex, _groupNum)
  24. _rMus = make([]sync.Mutex, _groupNum)
  25. _groupNum = 8
  26. _tc = 20
  27. _ts = time.Now().Unix()
  28. _st = _ts - _ts%10 + 1000
  29. _ed = _bSt + int64(_groupNum*_tc) - 1
  30. _dsPubConf = &databus.Config{
  31. Key: "0PvKGhAqDvsK7zitmS8t",
  32. Secret: "0PvKGhAqDvsK7zitmS8u",
  33. Group: "databus_test_group",
  34. Topic: "databus_test_topic",
  35. Action: "pub",
  36. Name: "databus",
  37. Proto: "tcp",
  38. Addr: "172.16.33.158:6205",
  39. Active: 1,
  40. Idle: 1,
  41. DialTimeout: xtime.Duration(time.Second),
  42. WriteTimeout: xtime.Duration(time.Second),
  43. ReadTimeout: xtime.Duration(time.Second),
  44. IdleTimeout: xtime.Duration(time.Minute),
  45. }
  46. _dsSubConf = &databus.Config{
  47. Key: "0PvKGhAqDvsK7zitmS8t",
  48. Secret: "0PvKGhAqDvsK7zitmS8u",
  49. Group: "databus_test_group",
  50. Topic: "databus_test_topic",
  51. Action: "sub",
  52. Name: "databus",
  53. Proto: "tcp",
  54. Addr: "172.16.33.158:6205",
  55. Active: 1,
  56. Idle: 1,
  57. DialTimeout: xtime.Duration(time.Second),
  58. WriteTimeout: xtime.Duration(time.Second),
  59. ReadTimeout: xtime.Duration(time.Second * 35),
  60. IdleTimeout: xtime.Duration(time.Minute),
  61. }
  62. )
  63. func TestGroup(t *testing.T) {
  64. for i := 0; i < _groupNum; i++ {
  65. _sendSeqsList[i] = make([]int64, 0)
  66. _recvSeqsList[i] = make([]int64, 0)
  67. }
  68. taskCounts := taskCount(_groupNum, _st, _ed)
  69. runtime.GOMAXPROCS(32)
  70. log.Init(&log.Config{
  71. Dir: "/data/log/queue",
  72. })
  73. c := &Config{
  74. Size: 200,
  75. Ticker: xtime.Duration(time.Second),
  76. Num: _groupNum,
  77. Chan: 1024,
  78. }
  79. dsSub := databus.New(_dsSubConf)
  80. defer dsSub.Close()
  81. group := NewGroup(
  82. c,
  83. dsSub.Messages(),
  84. )
  85. group.New = newTestMsg
  86. group.Split = split
  87. group.Do = do
  88. eg, _ := errgroup.WithContext(context.Background())
  89. // go produce test messages
  90. eg.Go(func() error {
  91. send(_st, _ed)
  92. return nil
  93. })
  94. // go consume test messages
  95. eg.Go(func() error {
  96. group.Start()
  97. defer group.Close()
  98. m := make(map[int]struct{})
  99. for len(m) < _groupNum {
  100. for i := 0; i < _groupNum; i++ {
  101. _, ok := m[i]
  102. if ok {
  103. continue
  104. }
  105. _rMus[i].Lock()
  106. if len(_recvSeqsList[i]) == taskCounts[i] {
  107. m[i] = struct{}{}
  108. }
  109. _rMus[i].Unlock()
  110. log.Info("_recvSeqsList[%d] length: %d, expect: %d", i, len(_recvSeqsList[i]), taskCounts[i])
  111. }
  112. log.Info("m length: %d", len(m))
  113. time.Sleep(time.Millisecond * 500)
  114. }
  115. // check seqs list, sendSeqsList and recvSeqsList will not change since now, so no need to lock
  116. for num := 0; num < _groupNum; num++ {
  117. sendSeqs := _sendSeqsList[num]
  118. recvSeqs := _recvSeqsList[num]
  119. if len(sendSeqs) != taskCounts[num] {
  120. t.Errorf("sendSeqs length of proc %d is incorrect, expcted %d but got %d", num, taskCounts[num], len(sendSeqs))
  121. t.FailNow()
  122. }
  123. if len(recvSeqs) != taskCounts[num] {
  124. t.Errorf("recvSeqs length of proc %d is incorrect, expcted %d but got %d", num, taskCounts[num], len(recvSeqs))
  125. t.FailNow()
  126. }
  127. for i := range recvSeqs {
  128. if recvSeqs[i] != sendSeqs[i] {
  129. t.Errorf("res is incorrect for proc %d, expcted recvSeqs[%d] equal to sendSeqs[%d] but not, recvSeqs[%d]: %d, sendSeqs[%d]: %d", num, i, i, i, recvSeqs[i], i, sendSeqs[i])
  130. t.FailNow()
  131. }
  132. }
  133. t.Logf("proc %d processed %d messages, expected %d messages, check ok", num, taskCounts[num], len(recvSeqs))
  134. }
  135. return nil
  136. })
  137. eg.Wait()
  138. }
  139. func do(msgs []interface{}) {
  140. for _, m := range msgs {
  141. if msg, ok := m.(*testMsg); ok {
  142. shard := int(msg.Mid) % _groupNum
  143. if msg.Seq < _st {
  144. log.Info("proc %d processed old seq: %d, mid: %d", shard, msg.Seq, msg.Mid)
  145. continue
  146. }
  147. _rMus[shard].Lock()
  148. _recvSeqsList[shard] = append(_recvSeqsList[shard], msg.Seq)
  149. _rMus[shard].Unlock()
  150. log.Info("proc %d processed seq: %d, mid: %d", shard, msg.Seq, msg.Mid)
  151. }
  152. }
  153. }
  154. func send(st, ed int64) error {
  155. dsPub := databus.New(_dsPubConf)
  156. defer dsPub.Close()
  157. ts := time.Now().Unix()
  158. for i := st; i <= ed; i++ {
  159. mid := int64(i)
  160. seq := i
  161. k := _dsPubConf.Topic + strconv.FormatInt(mid, 10)
  162. n := &testMsg{
  163. Seq: seq,
  164. Mid: mid,
  165. Now: ts,
  166. }
  167. dsPub.Send(context.TODO(), k, n)
  168. // NOTE: sleep here to avoid network latency caused message out of sequence
  169. time.Sleep(time.Millisecond * 500)
  170. shard := int(mid) % _groupNum
  171. _sMus[shard].Lock()
  172. _sendSeqsList[shard] = append(_sendSeqsList[shard], seq)
  173. _sMus[shard].Unlock()
  174. }
  175. return nil
  176. }
  177. func newTestMsg(msg *databus.Message) (res interface{}, err error) {
  178. res = new(testMsg)
  179. if err = json.Unmarshal(msg.Value, &res); err != nil {
  180. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  181. }
  182. return
  183. }
  184. func split(msg *databus.Message, data interface{}) int {
  185. t, ok := data.(*testMsg)
  186. if !ok {
  187. return 0
  188. }
  189. return int(t.Mid)
  190. }
  191. func taskCount(num int, st, ed int64) []int {
  192. res := make([]int, num)
  193. for i := st; i <= ed; i++ {
  194. res[int(i)%num]++
  195. }
  196. return res
  197. }
  198. func TestTaskCount(t *testing.T) {
  199. groupNum := 10
  200. c := 100
  201. ts := time.Now().Unix()
  202. st := ts - ts%10 + 1000
  203. ed := st + int64(groupNum*c) - 1
  204. res := taskCount(groupNum, st, ed)
  205. for i, v := range res {
  206. if v != c {
  207. t.Errorf("res is incorrect, expected task count 10 for proc %d but got %d", i, v)
  208. t.FailNow()
  209. }
  210. t.Logf("i: %d, v: %d", i, v)
  211. }
  212. }
  213. var (
  214. _bGroupNum = 3
  215. _bSendSeqsList = make([][]int64, _bGroupNum)
  216. _bRecvSeqsList = make([][]int64, _bGroupNum)
  217. _bSMus = make([]sync.Mutex, _bGroupNum)
  218. _bRMus = make([]sync.Mutex, _bGroupNum)
  219. _bTc = 20
  220. _bTs = time.Now().Unix()
  221. _bSt = _bTs - _bTs%10 + 1000
  222. _bEd = _bSt + int64(_bGroupNum*_bTc) - 1
  223. _bTaskCounts = taskCount(_bGroupNum, _bSt, _bEd)
  224. _blockDo = true
  225. _blockDoMu sync.Mutex
  226. _blocked = false
  227. )
  228. func TestGroup_Blocking(t *testing.T) {
  229. for i := 0; i < _bGroupNum; i++ {
  230. _bSendSeqsList[i] = make([]int64, 0)
  231. _bRecvSeqsList[i] = make([]int64, 0)
  232. }
  233. runtime.GOMAXPROCS(32)
  234. log.Init(&log.Config{
  235. Dir: "/data/log/queue",
  236. })
  237. c := &Config{
  238. Size: 20,
  239. Ticker: xtime.Duration(time.Second),
  240. Num: _bGroupNum,
  241. Chan: 5,
  242. }
  243. dsSub := databus.New(_dsSubConf)
  244. defer dsSub.Close()
  245. g := NewGroup(
  246. c,
  247. dsSub.Messages(),
  248. )
  249. g.New = newTestMsg
  250. g.Split = split
  251. g.Do = func(msgs []interface{}) {
  252. blockingDo(t, g, msgs)
  253. }
  254. eg, _ := errgroup.WithContext(context.Background())
  255. // go produce test messages
  256. eg.Go(func() error {
  257. dsPub := databus.New(_dsPubConf)
  258. defer dsPub.Close()
  259. ts := time.Now().Unix()
  260. for i := _bSt; i <= _bEd; i++ {
  261. mid := int64(i)
  262. seq := i
  263. k := _dsPubConf.Topic + strconv.FormatInt(mid, 10)
  264. n := &testMsg{
  265. Seq: seq,
  266. Mid: mid,
  267. Now: ts,
  268. }
  269. dsPub.Send(context.TODO(), k, n)
  270. // NOTE: sleep here to avoid network latency caused message out of sequence
  271. time.Sleep(time.Millisecond * 500)
  272. shard := int(mid) % _bGroupNum
  273. _bSMus[shard].Lock()
  274. _bSendSeqsList[shard] = append(_bSendSeqsList[shard], seq)
  275. _bSMus[shard].Unlock()
  276. }
  277. return nil
  278. })
  279. // go consume test messages
  280. eg.Go(func() error {
  281. g.Start()
  282. defer g.Close()
  283. m := make(map[int]struct{})
  284. // wait until all proc process theirs messages done
  285. for len(m) < _bGroupNum {
  286. for i := 0; i < _bGroupNum; i++ {
  287. _, ok := m[i]
  288. if ok {
  289. continue
  290. }
  291. _bRMus[i].Lock()
  292. if len(_bRecvSeqsList[i]) == _bTaskCounts[i] {
  293. m[i] = struct{}{}
  294. }
  295. _bRMus[i].Unlock()
  296. log.Info("_bRecvSeqsList[%d] length: %d, expect: %d, blockDo: %t", i, len(_bRecvSeqsList[i]), _bTaskCounts[i], _blockDo)
  297. }
  298. log.Info("m length: %d", len(m))
  299. time.Sleep(time.Millisecond * 500)
  300. }
  301. return nil
  302. })
  303. eg.Wait()
  304. }
  305. func blockingDo(t *testing.T, g *Group, msgs []interface{}) {
  306. _blockDoMu.Lock()
  307. if !_blockDo {
  308. _blockDoMu.Unlock()
  309. processMsg(msgs)
  310. return
  311. }
  312. // blocking to see if consume proc blocks finally
  313. lastGLen := 0
  314. cnt := 0
  315. for i := 0; i < 60; i++ {
  316. // print seqs status, not lock because final stable
  317. for i, v := range _bRecvSeqsList {
  318. log.Info("_bRecvSeqsList[%d] length: %d, expect: %d", i, len(v), _bTaskCounts[i])
  319. }
  320. gLen := 0
  321. for h := g.head; h != nil; h = h.next {
  322. gLen++
  323. }
  324. if gLen == lastGLen {
  325. cnt++
  326. } else {
  327. cnt = 0
  328. }
  329. lastGLen = gLen
  330. log.Info("blocking test: gLen: %d, cnt: %d, _bSt: %d, _bEd: %d", gLen, cnt, _bSt, _bEd)
  331. if cnt == 5 {
  332. _blocked = true
  333. log.Info("blocking test: consumeproc now is blocked, now trying to unblocking do callback")
  334. break
  335. }
  336. time.Sleep(time.Millisecond * 500)
  337. }
  338. // assert blocked
  339. if !_blocked {
  340. t.Errorf("res is incorrect, _blocked should be true but got false")
  341. t.FailNow()
  342. }
  343. // unblocking and check if consume proc unblocking too
  344. _blockDo = false
  345. _blockDoMu.Unlock()
  346. processMsg(msgs)
  347. }
  348. func processMsg(msgs []interface{}) {
  349. for _, m := range msgs {
  350. if msg, ok := m.(*testMsg); ok {
  351. shard := int(msg.Mid) % _bGroupNum
  352. if msg.Seq < _bSt {
  353. log.Info("proc %d processed old seq: %d, mid: %d", shard, msg.Seq, msg.Mid)
  354. continue
  355. }
  356. _bRMus[shard].Lock()
  357. _bRecvSeqsList[shard] = append(_bRecvSeqsList[shard], msg.Seq)
  358. log.Info("appended: %d", msg.Seq)
  359. _bRMus[shard].Unlock()
  360. log.Info("proc %d processed seq: %d, mid: %d", shard, msg.Seq, msg.Mid)
  361. }
  362. }
  363. }