123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392 |
- package databusutil
- import (
- "context"
- "encoding/json"
- "runtime"
- "strconv"
- "sync"
- "testing"
- "time"
- "go-common/library/log"
- "go-common/library/queue/databus"
- "go-common/library/sync/errgroup"
- xtime "go-common/library/time"
- )
- type testMsg struct {
- Seq int64 `json:"seq"`
- Mid int64 `json:"mid"`
- Now int64 `json:"now"`
- }
- var (
- _sendSeqsList = make([][]int64, _groupNum)
- _recvSeqsList = make([][]int64, _groupNum)
- _sMus = make([]sync.Mutex, _groupNum)
- _rMus = make([]sync.Mutex, _groupNum)
- _groupNum = 8
- _tc = 20
- _ts = time.Now().Unix()
- _st = _ts - _ts%10 + 1000
- _ed = _bSt + int64(_groupNum*_tc) - 1
- _dsPubConf = &databus.Config{
- Key: "0PvKGhAqDvsK7zitmS8t",
- Secret: "0PvKGhAqDvsK7zitmS8u",
- Group: "databus_test_group",
- Topic: "databus_test_topic",
- Action: "pub",
- Name: "databus",
- Proto: "tcp",
- Addr: "172.16.33.158:6205",
- Active: 1,
- Idle: 1,
- DialTimeout: xtime.Duration(time.Second),
- WriteTimeout: xtime.Duration(time.Second),
- ReadTimeout: xtime.Duration(time.Second),
- IdleTimeout: xtime.Duration(time.Minute),
- }
- _dsSubConf = &databus.Config{
- Key: "0PvKGhAqDvsK7zitmS8t",
- Secret: "0PvKGhAqDvsK7zitmS8u",
- Group: "databus_test_group",
- Topic: "databus_test_topic",
- Action: "sub",
- Name: "databus",
- Proto: "tcp",
- Addr: "172.16.33.158:6205",
- Active: 1,
- Idle: 1,
- DialTimeout: xtime.Duration(time.Second),
- WriteTimeout: xtime.Duration(time.Second),
- ReadTimeout: xtime.Duration(time.Second * 35),
- IdleTimeout: xtime.Duration(time.Minute),
- }
- )
- func TestGroup(t *testing.T) {
- for i := 0; i < _groupNum; i++ {
- _sendSeqsList[i] = make([]int64, 0)
- _recvSeqsList[i] = make([]int64, 0)
- }
- taskCounts := taskCount(_groupNum, _st, _ed)
- runtime.GOMAXPROCS(32)
- log.Init(&log.Config{
- Dir: "/data/log/queue",
- })
- c := &Config{
- Size: 200,
- Ticker: xtime.Duration(time.Second),
- Num: _groupNum,
- Chan: 1024,
- }
- dsSub := databus.New(_dsSubConf)
- defer dsSub.Close()
- group := NewGroup(
- c,
- dsSub.Messages(),
- )
- group.New = newTestMsg
- group.Split = split
- group.Do = do
- eg, _ := errgroup.WithContext(context.Background())
- // go produce test messages
- eg.Go(func() error {
- send(_st, _ed)
- return nil
- })
- // go consume test messages
- eg.Go(func() error {
- group.Start()
- defer group.Close()
- m := make(map[int]struct{})
- for len(m) < _groupNum {
- for i := 0; i < _groupNum; i++ {
- _, ok := m[i]
- if ok {
- continue
- }
- _rMus[i].Lock()
- if len(_recvSeqsList[i]) == taskCounts[i] {
- m[i] = struct{}{}
- }
- _rMus[i].Unlock()
- log.Info("_recvSeqsList[%d] length: %d, expect: %d", i, len(_recvSeqsList[i]), taskCounts[i])
- }
- log.Info("m length: %d", len(m))
- time.Sleep(time.Millisecond * 500)
- }
- // check seqs list, sendSeqsList and recvSeqsList will not change since now, so no need to lock
- for num := 0; num < _groupNum; num++ {
- sendSeqs := _sendSeqsList[num]
- recvSeqs := _recvSeqsList[num]
- if len(sendSeqs) != taskCounts[num] {
- t.Errorf("sendSeqs length of proc %d is incorrect, expcted %d but got %d", num, taskCounts[num], len(sendSeqs))
- t.FailNow()
- }
- if len(recvSeqs) != taskCounts[num] {
- t.Errorf("recvSeqs length of proc %d is incorrect, expcted %d but got %d", num, taskCounts[num], len(recvSeqs))
- t.FailNow()
- }
- for i := range recvSeqs {
- if recvSeqs[i] != sendSeqs[i] {
- t.Errorf("res is incorrect for proc %d, expcted recvSeqs[%d] equal to sendSeqs[%d] but not, recvSeqs[%d]: %d, sendSeqs[%d]: %d", num, i, i, i, recvSeqs[i], i, sendSeqs[i])
- t.FailNow()
- }
- }
- t.Logf("proc %d processed %d messages, expected %d messages, check ok", num, taskCounts[num], len(recvSeqs))
- }
- return nil
- })
- eg.Wait()
- }
- func do(msgs []interface{}) {
- for _, m := range msgs {
- if msg, ok := m.(*testMsg); ok {
- shard := int(msg.Mid) % _groupNum
- if msg.Seq < _st {
- log.Info("proc %d processed old seq: %d, mid: %d", shard, msg.Seq, msg.Mid)
- continue
- }
- _rMus[shard].Lock()
- _recvSeqsList[shard] = append(_recvSeqsList[shard], msg.Seq)
- _rMus[shard].Unlock()
- log.Info("proc %d processed seq: %d, mid: %d", shard, msg.Seq, msg.Mid)
- }
- }
- }
- func send(st, ed int64) error {
- dsPub := databus.New(_dsPubConf)
- defer dsPub.Close()
- ts := time.Now().Unix()
- for i := st; i <= ed; i++ {
- mid := int64(i)
- seq := i
- k := _dsPubConf.Topic + strconv.FormatInt(mid, 10)
- n := &testMsg{
- Seq: seq,
- Mid: mid,
- Now: ts,
- }
- dsPub.Send(context.TODO(), k, n)
- // NOTE: sleep here to avoid network latency caused message out of sequence
- time.Sleep(time.Millisecond * 500)
- shard := int(mid) % _groupNum
- _sMus[shard].Lock()
- _sendSeqsList[shard] = append(_sendSeqsList[shard], seq)
- _sMus[shard].Unlock()
- }
- return nil
- }
- func newTestMsg(msg *databus.Message) (res interface{}, err error) {
- res = new(testMsg)
- if err = json.Unmarshal(msg.Value, &res); err != nil {
- log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
- }
- return
- }
- func split(msg *databus.Message, data interface{}) int {
- t, ok := data.(*testMsg)
- if !ok {
- return 0
- }
- return int(t.Mid)
- }
- func taskCount(num int, st, ed int64) []int {
- res := make([]int, num)
- for i := st; i <= ed; i++ {
- res[int(i)%num]++
- }
- return res
- }
- func TestTaskCount(t *testing.T) {
- groupNum := 10
- c := 100
- ts := time.Now().Unix()
- st := ts - ts%10 + 1000
- ed := st + int64(groupNum*c) - 1
- res := taskCount(groupNum, st, ed)
- for i, v := range res {
- if v != c {
- t.Errorf("res is incorrect, expected task count 10 for proc %d but got %d", i, v)
- t.FailNow()
- }
- t.Logf("i: %d, v: %d", i, v)
- }
- }
- var (
- _bGroupNum = 3
- _bSendSeqsList = make([][]int64, _bGroupNum)
- _bRecvSeqsList = make([][]int64, _bGroupNum)
- _bSMus = make([]sync.Mutex, _bGroupNum)
- _bRMus = make([]sync.Mutex, _bGroupNum)
- _bTc = 20
- _bTs = time.Now().Unix()
- _bSt = _bTs - _bTs%10 + 1000
- _bEd = _bSt + int64(_bGroupNum*_bTc) - 1
- _bTaskCounts = taskCount(_bGroupNum, _bSt, _bEd)
- _blockDo = true
- _blockDoMu sync.Mutex
- _blocked = false
- )
- func TestGroup_Blocking(t *testing.T) {
- for i := 0; i < _bGroupNum; i++ {
- _bSendSeqsList[i] = make([]int64, 0)
- _bRecvSeqsList[i] = make([]int64, 0)
- }
- runtime.GOMAXPROCS(32)
- log.Init(&log.Config{
- Dir: "/data/log/queue",
- })
- c := &Config{
- Size: 20,
- Ticker: xtime.Duration(time.Second),
- Num: _bGroupNum,
- Chan: 5,
- }
- dsSub := databus.New(_dsSubConf)
- defer dsSub.Close()
- g := NewGroup(
- c,
- dsSub.Messages(),
- )
- g.New = newTestMsg
- g.Split = split
- g.Do = func(msgs []interface{}) {
- blockingDo(t, g, msgs)
- }
- eg, _ := errgroup.WithContext(context.Background())
- // go produce test messages
- eg.Go(func() error {
- dsPub := databus.New(_dsPubConf)
- defer dsPub.Close()
- ts := time.Now().Unix()
- for i := _bSt; i <= _bEd; i++ {
- mid := int64(i)
- seq := i
- k := _dsPubConf.Topic + strconv.FormatInt(mid, 10)
- n := &testMsg{
- Seq: seq,
- Mid: mid,
- Now: ts,
- }
- dsPub.Send(context.TODO(), k, n)
- // NOTE: sleep here to avoid network latency caused message out of sequence
- time.Sleep(time.Millisecond * 500)
- shard := int(mid) % _bGroupNum
- _bSMus[shard].Lock()
- _bSendSeqsList[shard] = append(_bSendSeqsList[shard], seq)
- _bSMus[shard].Unlock()
- }
- return nil
- })
- // go consume test messages
- eg.Go(func() error {
- g.Start()
- defer g.Close()
- m := make(map[int]struct{})
- // wait until all proc process theirs messages done
- for len(m) < _bGroupNum {
- for i := 0; i < _bGroupNum; i++ {
- _, ok := m[i]
- if ok {
- continue
- }
- _bRMus[i].Lock()
- if len(_bRecvSeqsList[i]) == _bTaskCounts[i] {
- m[i] = struct{}{}
- }
- _bRMus[i].Unlock()
- log.Info("_bRecvSeqsList[%d] length: %d, expect: %d, blockDo: %t", i, len(_bRecvSeqsList[i]), _bTaskCounts[i], _blockDo)
- }
- log.Info("m length: %d", len(m))
- time.Sleep(time.Millisecond * 500)
- }
- return nil
- })
- eg.Wait()
- }
- func blockingDo(t *testing.T, g *Group, msgs []interface{}) {
- _blockDoMu.Lock()
- if !_blockDo {
- _blockDoMu.Unlock()
- processMsg(msgs)
- return
- }
- // blocking to see if consume proc blocks finally
- lastGLen := 0
- cnt := 0
- for i := 0; i < 60; i++ {
- // print seqs status, not lock because final stable
- for i, v := range _bRecvSeqsList {
- log.Info("_bRecvSeqsList[%d] length: %d, expect: %d", i, len(v), _bTaskCounts[i])
- }
- gLen := 0
- for h := g.head; h != nil; h = h.next {
- gLen++
- }
- if gLen == lastGLen {
- cnt++
- } else {
- cnt = 0
- }
- lastGLen = gLen
- log.Info("blocking test: gLen: %d, cnt: %d, _bSt: %d, _bEd: %d", gLen, cnt, _bSt, _bEd)
- if cnt == 5 {
- _blocked = true
- log.Info("blocking test: consumeproc now is blocked, now trying to unblocking do callback")
- break
- }
- time.Sleep(time.Millisecond * 500)
- }
- // assert blocked
- if !_blocked {
- t.Errorf("res is incorrect, _blocked should be true but got false")
- t.FailNow()
- }
- // unblocking and check if consume proc unblocking too
- _blockDo = false
- _blockDoMu.Unlock()
- processMsg(msgs)
- }
- func processMsg(msgs []interface{}) {
- for _, m := range msgs {
- if msg, ok := m.(*testMsg); ok {
- shard := int(msg.Mid) % _bGroupNum
- if msg.Seq < _bSt {
- log.Info("proc %d processed old seq: %d, mid: %d", shard, msg.Seq, msg.Mid)
- continue
- }
- _bRMus[shard].Lock()
- _bRecvSeqsList[shard] = append(_bRecvSeqsList[shard], msg.Seq)
- log.Info("appended: %d", msg.Seq)
- _bRMus[shard].Unlock()
- log.Info("proc %d processed seq: %d, mid: %d", shard, msg.Seq, msg.Mid)
- }
- }
- }
|