databus_test.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package databus_test
  2. import (
  3. "context"
  4. "testing"
  5. "time"
  6. "go-common/library/naming/discovery"
  7. "go-common/library/queue/databus"
  8. xtime "go-common/library/time"
  9. )
  10. var (
  11. pCfg = &databus.Config{
  12. // Key: "0PvKGhAqDvsK7zitmS8t",
  13. // Secret: "0PvKGhAqDvsK7zitmS8u",
  14. // Group: "databus_test_group",
  15. // Topic: "databus_test_topic",
  16. Key: "dbe67e6a4c36f877",
  17. Secret: "8c775ea242caa367ba5c876c04576571",
  18. Group: "Test1-MainCommonArch-P",
  19. Topic: "test1",
  20. Action: "pub",
  21. Name: "databus",
  22. Proto: "tcp",
  23. // Addr: "172.16.33.158:6205",
  24. Addr: "172.18.33.50:6205",
  25. Active: 10,
  26. Idle: 5,
  27. DialTimeout: xtime.Duration(time.Second),
  28. WriteTimeout: xtime.Duration(time.Second),
  29. ReadTimeout: xtime.Duration(time.Second),
  30. IdleTimeout: xtime.Duration(time.Minute),
  31. }
  32. sCfg = &databus.Config{
  33. // Key: "0PvKGhAqDvsK7zitmS8t",
  34. // Secret: "0PvKGhAqDvsK7zitmS8u",
  35. // Group: "databus_test_group",
  36. // Topic: "databus_test_topic",
  37. Key: "dbe67e6a4c36f877",
  38. Secret: "8c775ea242caa367ba5c876c04576571",
  39. Group: "Test1-MainCommonArch-S",
  40. Topic: "test1",
  41. Action: "sub",
  42. Name: "databus",
  43. Proto: "tcp",
  44. // Addr: "172.16.33.158:6205",
  45. Addr: "172.18.33.50:6205",
  46. Active: 10,
  47. Idle: 5,
  48. DialTimeout: xtime.Duration(time.Second),
  49. WriteTimeout: xtime.Duration(time.Second),
  50. ReadTimeout: xtime.Duration(time.Second * 35),
  51. IdleTimeout: xtime.Duration(time.Minute),
  52. }
  53. dCfg = &discovery.Config{
  54. Nodes: []string{"172.18.33.50:7171"},
  55. Key: "0c4b8fe3ff35a4b6",
  56. Secret: "b370880d1aca7d3a289b9b9a7f4d6812",
  57. Zone: "sh001",
  58. Env: "uat",
  59. }
  60. )
  61. type TestMsg struct {
  62. Now int64 `json:"now"`
  63. }
  64. func testSub(t *testing.T, d *databus.Databus) {
  65. for {
  66. m, ok := <-d.Messages()
  67. if !ok {
  68. return
  69. }
  70. t.Logf("sub message: %s", string(m.Value))
  71. if err := m.Commit(); err != nil {
  72. t.Errorf("sub commit error(%v)\n", err)
  73. }
  74. }
  75. }
  76. func testPub(t *testing.T, d *databus.Databus) {
  77. // pub
  78. m := &TestMsg{Now: time.Now().UnixNano()}
  79. if err := d.Send(context.TODO(), "test", m); err != nil {
  80. t.Errorf("d.Send(test) error(%v)", err)
  81. } else {
  82. t.Logf("pub message %v", m)
  83. }
  84. }
  85. func TestDatabus(t *testing.T) {
  86. d := databus.New(pCfg)
  87. // pub
  88. testPub(t, d)
  89. testPub(t, d)
  90. testPub(t, d)
  91. d.Close()
  92. // sub
  93. d = databus.New(sCfg)
  94. go testSub(t, d)
  95. time.Sleep(time.Second * 15)
  96. d.Close()
  97. }
  98. func TestDiscoveryDatabus(t *testing.T) {
  99. d := databus.New(pCfg)
  100. // pub
  101. testPub(t, d)
  102. testPub(t, d)
  103. testPub(t, d)
  104. d.Close()
  105. // sub
  106. d = databus.New(sCfg)
  107. go testSub(t, d)
  108. time.Sleep(time.Second * 15)
  109. d.Close()
  110. }
  111. func BenchmarkPub(b *testing.B) {
  112. d := databus.New(pCfg)
  113. defer d.Close()
  114. b.ResetTimer()
  115. b.RunParallel(func(pb *testing.PB) {
  116. for pb.Next() {
  117. m := &TestMsg{Now: time.Now().UnixNano()}
  118. if err := d.Send(context.TODO(), "test", m); err != nil {
  119. b.Errorf("d.Send(test) error(%v)", err)
  120. continue
  121. }
  122. }
  123. })
  124. }
  125. func BenchmarkDiscoveryPub(b *testing.B) {
  126. d := databus.New(pCfg)
  127. defer d.Close()
  128. b.ResetTimer()
  129. b.RunParallel(func(pb *testing.PB) {
  130. for pb.Next() {
  131. m := &TestMsg{Now: time.Now().UnixNano()}
  132. if err := d.Send(context.TODO(), "test", m); err != nil {
  133. b.Errorf("d.Send(test) error(%v)", err)
  134. continue
  135. }
  136. }
  137. })
  138. }