123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- package kafkacollect
- import (
- "context"
- "encoding/json"
- "fmt"
- "sync"
- "github.com/Shopify/sarama"
- "go-common/app/service/main/dapper/model"
- "go-common/app/service/main/dapper/pkg/collect"
- "go-common/app/service/main/dapper/pkg/process"
- "go-common/library/log"
- "go-common/library/stat/prom"
- )
- var (
- collectCount = prom.New().WithCounter("dapper_kafka_collect_count", []string{"name"})
- collectErrCount = prom.New().WithCounter("dapper_kafka_collect_err_count", []string{"name"})
- )
- // Option set option
- type Option func(*option)
- type option struct {
- group string
- topic string
- addrs []string
- }
- func (o option) saramaConfig() *sarama.Config {
- return nil
- }
- var defaultOption = option{
- group: "default",
- }
- //func NewConsumer(addrs []string, config *Config) (Consumer, error)
- // New kafka collect
- func New(topic string, addrs []string, options ...Option) (collect.Collecter, error) {
- log.V(10).Info("new kafkacollect topic %s addrs: %v", topic, addrs)
- if len(addrs) == 0 {
- return nil, fmt.Errorf("kafka addrs required")
- }
- opt := defaultOption
- for _, fn := range options {
- fn(&opt)
- }
- opt.addrs = addrs
- opt.topic = topic
- clt := &kafkaCollect{opt: opt}
- return clt, nil
- }
- type kafkaCollect struct {
- wg sync.WaitGroup
- opt option
- ps []process.Processer
- consumers []*consumer
- client sarama.Client
- offsetManager sarama.OffsetManager
- baseConsumer sarama.Consumer
- }
- func (k *kafkaCollect) RegisterProcess(p process.Processer) {
- k.ps = append(k.ps, p)
- }
- func (k *kafkaCollect) Start() error {
- var err error
- if k.client, err = sarama.NewClient(k.opt.addrs, k.opt.saramaConfig()); err != nil {
- return fmt.Errorf("new kafka client error: %s", err)
- }
- if k.offsetManager, err = sarama.NewOffsetManagerFromClient(k.opt.group, k.client); err != nil {
- return fmt.Errorf("new offset manager error: %s", err)
- }
- if k.baseConsumer, err = sarama.NewConsumerFromClient(k.client); err != nil {
- return fmt.Errorf("new kafka consumer error: %s", err)
- }
- log.Info("kafkacollect consumer from topic: %s addrs: %s", k.opt.topic, k.opt.topic)
- return k.start()
- }
- func (k *kafkaCollect) handler(protoSpan *model.ProtoSpan) {
- var err error
- for _, p := range k.ps {
- if err = p.Process(context.Background(), protoSpan); err != nil {
- log.Error("process span error: %s, discard", err)
- }
- }
- }
- func (k *kafkaCollect) start() error {
- ps, err := k.client.Partitions(k.opt.topic)
- if err != nil {
- return fmt.Errorf("get partitions error: %s", err)
- }
- for _, p := range ps {
- var pom sarama.PartitionOffsetManager
- if pom, err = k.offsetManager.ManagePartition(k.opt.topic, p); err != nil {
- return fmt.Errorf("new manage partition error: %s", err)
- }
- offset, _ := pom.NextOffset()
- if offset == -1 {
- offset = sarama.OffsetOldest
- }
- var c sarama.PartitionConsumer
- log.V(10).Info("partitions %d start offset %d", p, offset)
- if c, err = k.baseConsumer.ConsumePartition(k.opt.topic, p, offset); err != nil {
- return fmt.Errorf("new consume partition error: %s", err)
- }
- log.V(10).Info("start partition consumer partition: %d, offset: %d", p, offset)
- consumer := newConsumer(k, c, pom)
- k.consumers = append(k.consumers, consumer)
- k.wg.Add(1)
- go consumer.start()
- }
- return nil
- }
- func (k *kafkaCollect) Close() error {
- for _, c := range k.consumers {
- if err := c.close(); err != nil {
- log.Warn("close consumer error: %s", err)
- }
- }
- k.wg.Wait()
- return nil
- }
- func newConsumer(k *kafkaCollect, c sarama.PartitionConsumer, pom sarama.PartitionOffsetManager) *consumer {
- return &consumer{kafkaCollect: k, consumer: c, pom: pom, closeCh: make(chan struct{}, 1)}
- }
- type consumer struct {
- *kafkaCollect
- pom sarama.PartitionOffsetManager
- consumer sarama.PartitionConsumer
- closeCh chan struct{}
- }
- func (c *consumer) close() error {
- c.closeCh <- struct{}{}
- c.pom.Close()
- return c.consumer.Close()
- }
- func (c *consumer) start() {
- defer c.wg.Done()
- var err error
- var value []byte
- for {
- select {
- case msg := <-c.consumer.Messages():
- collectCount.Incr("count")
- c.pom.MarkOffset(msg.Offset+1, "")
- log.V(10).Info("receive message from kafka topic: %s key: %s content: %s", msg.Key, msg.Topic, msg.Value)
- protoSpan := new(model.ProtoSpan)
- if err = json.Unmarshal(msg.Value, protoSpan); err != nil {
- collectErrCount.Incr("count_error")
- log.Error("unmarshal span from kafka error: %s, value: %v", err, value)
- continue
- }
- c.handler(protoSpan)
- case <-c.closeCh:
- log.V(10).Info("receive closed return")
- return
- }
- }
- }
|