consumer.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package service
  2. import (
  3. "errors"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "go-common/app/interface/openplatform/monitor-end/conf"
  8. "go-common/app/interface/openplatform/monitor-end/model/kafka"
  9. "go-common/library/log"
  10. "github.com/Shopify/sarama"
  11. cluster "github.com/bsm/sarama-cluster"
  12. )
// _group is the Kafka consumer-group ID used when joining the cluster.
const _group = "open-monitor-end"
// Consumer wraps a sarama-cluster group consumer together with its Kafka
// configuration, lifecycle flags and an internal message buffer.
type Consumer struct {
	// c sarama.Consumer
	c *cluster.Consumer
	// Config carries broker addresses and the topic to consume.
	Config *kafka.Config
	// closed requests loop shutdown; checked by consume() and message().
	// NOTE(review): closed and paused are read/written from multiple
	// goroutines without synchronization — looks like a data race; confirm
	// callers and consider sync/atomic or a mutex.
	closed bool
	// paused, when set, makes message() sleep once for duration, then clears.
	paused bool
	duration time.Duration
	// messages buffers fetched records between message() and the handler loop.
	messages chan *sarama.ConsumerMessage
}
// Sentinel errors returned by (*Consumer).message to tell the caller why
// the fetch loop stopped.
var (
	// errClosedMsgChannel: the cluster consumer's Messages channel closed.
	errClosedMsgChannel = errors.New("message channel is closed")
	// errClosedNotifyChannel: the Notifications channel closed or a
	// rebalance failed.
	errClosedNotifyChannel = errors.New("notification channel is closed")
	// errConsumerOver: a rebalance left this member with no partitions
	// (more group members than partitions).
	errConsumerOver = errors.New("too many consumers")
)
  29. // NewConsumer .
  30. func NewConsumer(c *conf.Config) (con *Consumer, err error) {
  31. con = &Consumer{
  32. Config: c.Kafka,
  33. messages: make(chan *sarama.ConsumerMessage, 1024),
  34. }
  35. cfg := cluster.NewConfig()
  36. cfg.Version = sarama.V0_8_2_0
  37. cfg.ClientID = fmt.Sprintf("%s-%s", _group, c.Kafka.Topic)
  38. cfg.Net.KeepAlive = 30 * time.Second
  39. // NOTE cluster auto commit offset interval
  40. cfg.Consumer.Offsets.CommitInterval = time.Second * 1
  41. // NOTE set fetch.wait.max.ms
  42. cfg.Consumer.MaxWaitTime = time.Millisecond * 250
  43. cfg.Consumer.MaxProcessingTime = 50 * time.Millisecond
  44. // NOTE errors that occur during offset management,if enabled, c.Errors channel must be read
  45. cfg.Consumer.Return.Errors = true
  46. // NOTE notifications that occur during consumer, if enabled, c.Notifications channel must be read
  47. cfg.Group.Return.Notifications = true
  48. // con.c = sarama.NewConsumer(c.Kafka.Addr, nil)
  49. // consumer.Partitions = consumer.c.Partitions(consumer.Config.Topic)
  50. cfg.Consumer.Offsets.Initial = sarama.OffsetNewest
  51. if con.c, err = cluster.NewConsumer(c.Kafka.Addr, _group, []string{c.Kafka.Topic}, cfg); err != nil {
  52. log.Error("s.NewConsumer group(%s) topic(%s) addr(%s) cluster.NewConsumer() error(%v)", _group, c.Kafka.Topic, strings.Join(c.Kafka.Addr, ","), err)
  53. } else {
  54. log.Info("s.NewConsumer group(%s) topic(%s) addr(%s) cluster.NewConsumer() ok", _group, c.Kafka.Topic, strings.Join(c.Kafka.Addr, ","))
  55. }
  56. return
  57. }
  58. func (s *Service) consume() {
  59. for {
  60. if s.consumer.closed {
  61. return
  62. }
  63. if err := s.consumer.message(); err != nil {
  64. time.Sleep(time.Minute)
  65. }
  66. }
  67. }
  68. func (s *Service) handleMsg() {
  69. for {
  70. select {
  71. case msg := <-s.consumer.messages:
  72. s.HandleMsg(msg.Value)
  73. }
  74. if s.consumer.closed {
  75. return
  76. }
  77. }
  78. }
// message is the blocking fetch loop for one Consumer. It multiplexes the
// cluster consumer's Errors, Notifications and Messages channels and
// forwards records into s.messages. It returns nil only when s.closed is
// set; otherwise it returns a sentinel error describing why the loop
// stopped (closed channels, failed rebalance, or no assigned partitions).
func (s *Consumer) message() (err error) {
	var (
		msg    *sarama.ConsumerMessage
		notify *cluster.Notification
		ok     bool
	)
	for {
		if s.closed {
			// NOTE(review): Close() error is discarded — best-effort shutdown.
			s.c.Close()
			err = nil
			return
		}
		if s.paused {
			// One-shot pause: clear the flag first, then sleep for duration.
			s.paused = false
			time.Sleep(s.duration)
		}
		select {
		case err = <-s.c.Errors():
			// Offset-management error: give up this loop; caller decides retry.
			log.Error("group(%s) topic(%s) addr(%s) catch error(%v)", _group, s.Config.Topic, s.Config.Addr, err)
			return
		case notify, ok = <-s.c.Notifications():
			if !ok {
				// Notifications channel closed: underlying consumer is gone.
				log.Info("notification notOk group(%s) topic(%s) addr(%v) catch error(%v)", _group, s.Config.Topic, s.Config.Addr, err)
				err = errClosedNotifyChannel
				return
			}
			switch notify.Type {
			case cluster.UnknownNotification, cluster.RebalanceError:
				// Treat unknown/failed rebalance like a dead notify channel.
				log.Error("notification(%s) group(%s) topic(%s) addr(%v) catch error(%v)", notify.Type, _group, s.Config.Topic, s.Config.Addr, err)
				err = errClosedNotifyChannel
				return
			case cluster.RebalanceStart:
				// Rebalance in progress; keep looping until it settles.
				log.Info("notification(%s) group(%s) topic(%s) addr(%v) catch error(%v)", notify.Type, _group, s.Config.Topic, s.Config.Addr, err)
				continue
			case cluster.RebalanceOK:
				log.Info("notification(%s) group(%s) topic(%s) addr(%v) catch error(%v)", notify.Type, _group, s.Config.Topic, s.Config.Addr, err)
			}
			// After RebalanceOK: no partitions for our topic means more
			// group members than partitions — bail out with errConsumerOver.
			if len(notify.Current[s.Config.Topic]) == 0 {
				log.Warn("notification(%s) no topic group(%s) topic(%s) addr(%v) catch error(%v)", notify.Type, _group, s.Config.Topic, s.Config.Addr, err)
				err = errConsumerOver
				return
			}
		case msg, ok = <-s.c.Messages():
			if !ok {
				log.Error("group(%s) topic(%s) addr(%v) message channel closed", _group, s.Config.Topic, s.Config.Addr)
				err = errClosedMsgChannel
				return
			}
			// Hand off to the buffered channel drained by the handler loop.
			s.messages <- msg
		}
	}
}