// collect.go — kafka-backed span collector for dapper.
  1. package kafkacollect
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "sync"
  7. "github.com/Shopify/sarama"
  8. "go-common/app/service/main/dapper/model"
  9. "go-common/app/service/main/dapper/pkg/collect"
  10. "go-common/app/service/main/dapper/pkg/process"
  11. "go-common/library/log"
  12. "go-common/library/stat/prom"
  13. )
// Prometheus counters for kafka collection throughput and decode failures.
var (
	// collectCount counts every message received from kafka (label: name).
	collectCount = prom.New().WithCounter("dapper_kafka_collect_count", []string{"name"})
	// collectErrCount counts messages that failed to unmarshal (label: name).
	collectErrCount = prom.New().WithCounter("dapper_kafka_collect_err_count", []string{"name"})
)
// Option set option
type Option func(*option)

// option holds the resolved kafka consumer configuration.
type option struct {
	group string   // consumer-group id used by the offset manager
	topic string   // kafka topic to consume
	addrs []string // kafka broker addresses
}
// saramaConfig returns the sarama client configuration.
// It returns nil, which makes sarama fall back to its built-in defaults.
// NOTE(review): presumably intentional — confirm the sarama defaults
// (e.g. Consumer.Offsets.Initial) are acceptable for this service.
func (o option) saramaConfig() *sarama.Config {
	return nil
}
// defaultOption is the baseline configuration applied before user Options.
var defaultOption = option{
	group: "default",
}
  31. //func NewConsumer(addrs []string, config *Config) (Consumer, error)
  32. // New kafka collect
  33. func New(topic string, addrs []string, options ...Option) (collect.Collecter, error) {
  34. log.V(10).Info("new kafkacollect topic %s addrs: %v", topic, addrs)
  35. if len(addrs) == 0 {
  36. return nil, fmt.Errorf("kafka addrs required")
  37. }
  38. opt := defaultOption
  39. for _, fn := range options {
  40. fn(&opt)
  41. }
  42. opt.addrs = addrs
  43. opt.topic = topic
  44. clt := &kafkaCollect{opt: opt}
  45. return clt, nil
  46. }
// kafkaCollect consumes spans from kafka and fans them out to the
// registered processors. It implements collect.Collecter.
type kafkaCollect struct {
	wg            sync.WaitGroup // waits for per-partition consumer goroutines on Close
	opt           option
	ps            []process.Processer // processors invoked for every decoded span
	consumers     []*consumer         // one consumer per topic partition
	client        sarama.Client
	offsetManager sarama.OffsetManager
	baseConsumer  sarama.Consumer
}
// RegisterProcess registers a processor to receive every decoded span.
// The slice is not synchronized; register all processors before Start.
func (k *kafkaCollect) RegisterProcess(p process.Processer) {
	k.ps = append(k.ps, p)
}
  59. func (k *kafkaCollect) Start() error {
  60. var err error
  61. if k.client, err = sarama.NewClient(k.opt.addrs, k.opt.saramaConfig()); err != nil {
  62. return fmt.Errorf("new kafka client error: %s", err)
  63. }
  64. if k.offsetManager, err = sarama.NewOffsetManagerFromClient(k.opt.group, k.client); err != nil {
  65. return fmt.Errorf("new offset manager error: %s", err)
  66. }
  67. if k.baseConsumer, err = sarama.NewConsumerFromClient(k.client); err != nil {
  68. return fmt.Errorf("new kafka consumer error: %s", err)
  69. }
  70. log.Info("kafkacollect consumer from topic: %s addrs: %s", k.opt.topic, k.opt.topic)
  71. return k.start()
  72. }
  73. func (k *kafkaCollect) handler(protoSpan *model.ProtoSpan) {
  74. var err error
  75. for _, p := range k.ps {
  76. if err = p.Process(context.Background(), protoSpan); err != nil {
  77. log.Error("process span error: %s, discard", err)
  78. }
  79. }
  80. }
  81. func (k *kafkaCollect) start() error {
  82. ps, err := k.client.Partitions(k.opt.topic)
  83. if err != nil {
  84. return fmt.Errorf("get partitions error: %s", err)
  85. }
  86. for _, p := range ps {
  87. var pom sarama.PartitionOffsetManager
  88. if pom, err = k.offsetManager.ManagePartition(k.opt.topic, p); err != nil {
  89. return fmt.Errorf("new manage partition error: %s", err)
  90. }
  91. offset, _ := pom.NextOffset()
  92. if offset == -1 {
  93. offset = sarama.OffsetOldest
  94. }
  95. var c sarama.PartitionConsumer
  96. log.V(10).Info("partitions %d start offset %d", p, offset)
  97. if c, err = k.baseConsumer.ConsumePartition(k.opt.topic, p, offset); err != nil {
  98. return fmt.Errorf("new consume partition error: %s", err)
  99. }
  100. log.V(10).Info("start partition consumer partition: %d, offset: %d", p, offset)
  101. consumer := newConsumer(k, c, pom)
  102. k.consumers = append(k.consumers, consumer)
  103. k.wg.Add(1)
  104. go consumer.start()
  105. }
  106. return nil
  107. }
  108. func (k *kafkaCollect) Close() error {
  109. for _, c := range k.consumers {
  110. if err := c.close(); err != nil {
  111. log.Warn("close consumer error: %s", err)
  112. }
  113. }
  114. k.wg.Wait()
  115. return nil
  116. }
  117. func newConsumer(k *kafkaCollect, c sarama.PartitionConsumer, pom sarama.PartitionOffsetManager) *consumer {
  118. return &consumer{kafkaCollect: k, consumer: c, pom: pom, closeCh: make(chan struct{}, 1)}
  119. }
// consumer drains a single topic partition, marking offsets as messages
// arrive and forwarding decoded spans to the embedded collector's handler.
type consumer struct {
	*kafkaCollect
	pom      sarama.PartitionOffsetManager
	consumer sarama.PartitionConsumer
	closeCh  chan struct{} // signals the start loop to exit
}
  126. func (c *consumer) close() error {
  127. c.closeCh <- struct{}{}
  128. c.pom.Close()
  129. return c.consumer.Close()
  130. }
  131. func (c *consumer) start() {
  132. defer c.wg.Done()
  133. var err error
  134. var value []byte
  135. for {
  136. select {
  137. case msg := <-c.consumer.Messages():
  138. collectCount.Incr("count")
  139. c.pom.MarkOffset(msg.Offset+1, "")
  140. log.V(10).Info("receive message from kafka topic: %s key: %s content: %s", msg.Key, msg.Topic, msg.Value)
  141. protoSpan := new(model.ProtoSpan)
  142. if err = json.Unmarshal(msg.Value, protoSpan); err != nil {
  143. collectErrCount.Incr("count_error")
  144. log.Error("unmarshal span from kafka error: %s, value: %v", err, value)
  145. continue
  146. }
  147. c.handler(protoSpan)
  148. case <-c.closeCh:
  149. log.V(10).Info("receive closed return")
  150. return
  151. }
  152. }
  153. }