databus_test.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package service
  2. import (
  3. "context"
  4. "flag"
  5. "os"
  6. "path/filepath"
  7. "testing"
  8. "go-common/app/admin/main/apm/conf"
  9. . "github.com/smartystreets/goconvey/convey"
  10. )
  11. var (
  12. svr *Service
  13. )
  14. func TestMain(m *testing.M) {
  15. var (
  16. err error
  17. )
  18. dir, _ := filepath.Abs("../cmd/apm-admin-test.toml")
  19. if err = flag.Set("conf", dir); err != nil {
  20. panic(err)
  21. }
  22. if err = conf.Init(); err != nil {
  23. panic(err)
  24. }
  25. svr = New(conf.Conf)
  26. os.Exit(m.Run())
  27. }
  28. func TestFake(t *testing.T) {
  29. Convey("fake", t, func() {
  30. t.Log("fake test")
  31. })
  32. }
  33. func TestService_NewClient(t *testing.T) {
  34. Convey("should new client all", t, func() {
  35. c, err := NewClient(conf.Conf.Kafka["test_kafka_9092-266"].Brokers, "Archive-T", "Archive-Live-S")
  36. t.Log(err, c)
  37. So(err, ShouldBeNil)
  38. })
  39. }
  40. func TestService_OffsetNew(t *testing.T) {
  41. Convey("should offset new all", t, func() {
  42. c, err := NewClient(conf.Conf.Kafka["test_kafka_9092-266"].Brokers, "Archive-T", "Archive-Live-S")
  43. So(err, ShouldBeNil)
  44. info, err := c.OffsetNew()
  45. t.Log(err, info)
  46. So(err, ShouldBeNil)
  47. })
  48. }
  49. func TestService_OffsetOld(t *testing.T) {
  50. Convey("should offset old all", t, func() {
  51. c, err := NewClient(conf.Conf.Kafka["test_kafka_9092-266"].Brokers, "Archive-T", "Archive-Live-S")
  52. So(err, ShouldBeNil)
  53. info, err := c.OffsetOld()
  54. t.Log(err, info)
  55. So(err, ShouldBeNil)
  56. })
  57. }
  58. func TestService_SeekBegin(t *testing.T) {
  59. Convey("should seek begin all", t, func() {
  60. c, err := NewClient(conf.Conf.Kafka["test_kafka_9092-266"].Brokers, "Archive-T", "Archive-Live-S")
  61. c.SeekBegin()
  62. t.Log(err)
  63. So(err, ShouldBeNil)
  64. })
  65. }
  66. func TestService_SeekEnd(t *testing.T) {
  67. Convey("should seek end all", t, func() {
  68. c, err := NewClient(conf.Conf.Kafka["test_kafka_9092-266"].Brokers, "Archive-T", "Archive-Live-S")
  69. So(err, ShouldBeNil)
  70. err = c.SeekEnd()
  71. t.Log(err)
  72. So(err, ShouldBeNil)
  73. })
  74. }
  75. func TestCreateTopic(t *testing.T) {
  76. Convey("test create topic", t, func() {
  77. err := CreateTopic([]string{"172.18.33.51:9098", "172.18.33.52:9098", "172.18.33.50:9098"}, "testcreate11", 1, 1)
  78. So(err, ShouldBeNil)
  79. })
  80. }
  81. func TestService_OffsetMarked(t *testing.T) {
  82. Convey("should offset marked all", t, func() {
  83. c, err := NewClient(conf.Conf.Kafka["test_kafka_9092-266"].Brokers, "Archive-T", "Archive-Live-S")
  84. So(err, ShouldBeNil)
  85. _, err = c.OffsetMarked()
  86. t.Log(err)
  87. So(err, ShouldBeNil)
  88. })
  89. }
  90. func TestService_MsgFetch(t *testing.T) {
  91. Convey("should msg fetch", t, func() {
  92. res, err := FetchMessage(context.Background(), "test_kafka_9092-266", "Archive-T", "ArchiveAPM-MainCommonArch-S", "", 0, 0, 10)
  93. So(err, ShouldBeNil)
  94. for _, r := range res {
  95. t.Logf("fetch key:%s value:%s partition:%d offset:%d timestamp:%d", r.Key, r.Value, r.Partition, r.Offset, r.Timestamp)
  96. }
  97. })
  98. }