tcp_test.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. package tcp
  2. import (
  3. "net"
  4. "sync"
  5. "testing"
  6. "time"
  7. "go-common/app/infra/databus/conf"
  8. )
  9. var (
  10. _testGroup = "test-consumer-group"
  11. _testTopic = "test_topic"
  12. _testAddr = "172.22.33.174:9092"
  13. _testConfig = &conf.Kafka{
  14. Cluster: _testGroup,
  15. Brokers: []string{_testAddr},
  16. }
  17. )
  18. func TestNewSub(t *testing.T) {
  19. var (
  20. mu sync.Mutex
  21. err error
  22. )
  23. subs := []*Sub{}
  24. c, _ := net.Dial("tcp", _testAddr)
  25. sub, err := NewSub(newConn(c, time.Second, time.Second), _testGroup, _testTopic, "", _testConfig, 100)
  26. if err != nil {
  27. t.Fatal(err)
  28. }
  29. subs = append(subs, sub)
  30. time.Sleep(time.Second * 5)
  31. go func() {
  32. for i := 0; i < 200; i++ {
  33. sub, err := NewSub(newConn(c, time.Second, time.Second), _testGroup, _testTopic, "", _testConfig, 100)
  34. if err != nil {
  35. t.Errorf("NewSub error(%v)", err)
  36. continue
  37. }
  38. mu.Lock()
  39. subs = append(subs, sub)
  40. mu.Unlock()
  41. }
  42. }()
  43. time.Sleep(time.Second * 5)
  44. for i := 0; i < 20; i++ {
  45. sub, err := NewSub(newConn(c, time.Second, time.Second), _testGroup, _testTopic, "", _testConfig, 100)
  46. if err != nil {
  47. t.Fatal(err)
  48. continue
  49. }
  50. mu.Lock()
  51. subs = append(subs, sub)
  52. mu.Unlock()
  53. }
  54. time.Sleep(time.Second * 5)
  55. }