consumer_group.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514
  1. package consumergroup
  2. import (
  3. "errors"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/Shopify/sarama"
  8. "github.com/wvanbergen/kazoo-go"
  9. )
var (
	// AlreadyClosing is returned by ConsumerGroup.Close when shutdown has
	// already been started by an earlier call to Close.
	AlreadyClosing = errors.New("The consumer group is already shutting down.")
)
// Config holds the settings of a consumer group: the embedded Sarama
// configuration, the Zookeeper (kazoo) configuration, and the options that
// control how offsets are initialized and committed.
type Config struct {
	// Embedded Sarama configuration; it is passed to sarama.NewConsumer.
	*sarama.Config

	// Configuration used to open the Zookeeper connection.
	Zookeeper *kazoo.Config

	Offsets struct {
		// The initial offset method to use if the consumer has no previously
		// stored offset. Must be either sarama.OffsetOldest (default) or
		// sarama.OffsetNewest.
		Initial int64
		// Time to wait for all the offsets for a partition to be processed
		// after stopping to consume from it. Defaults to 1 minute.
		ProcessingTimeout time.Duration
		// The interval between which the processed offsets are committed.
		CommitInterval time.Duration
		// Resets the offsets for the consumergroup so that it won't resume
		// from where it left off previously.
		ResetOffsets bool
	}
}
  23. func NewConfig() *Config {
  24. config := &Config{}
  25. config.Config = sarama.NewConfig()
  26. config.Zookeeper = kazoo.NewConfig()
  27. config.Offsets.Initial = sarama.OffsetOldest
  28. config.Offsets.ProcessingTimeout = 60 * time.Second
  29. config.Offsets.CommitInterval = 10 * time.Second
  30. return config
  31. }
  32. func (cgc *Config) Validate() error {
  33. if cgc.Zookeeper.Timeout <= 0 {
  34. return sarama.ConfigurationError("ZookeeperTimeout should have a duration > 0")
  35. }
  36. if cgc.Offsets.CommitInterval < 0 {
  37. return sarama.ConfigurationError("CommitInterval should have a duration >= 0")
  38. }
  39. if cgc.Offsets.Initial != sarama.OffsetOldest && cgc.Offsets.Initial != sarama.OffsetNewest {
  40. return errors.New("Offsets.Initial should be sarama.OffsetOldest or sarama.OffsetNewest.")
  41. }
  42. if cgc.Config != nil {
  43. if err := cgc.Config.Validate(); err != nil {
  44. return err
  45. }
  46. }
  47. return nil
  48. }
// The ConsumerGroup type holds all the information for a consumer that is part
// of a consumer group. Call JoinConsumerGroup to start a consumer.
type ConsumerGroup struct {
	config *Config // configuration the group was joined with

	consumer sarama.Consumer              // Sarama consumer used to open partition consumers
	kazoo    *kazoo.Kazoo                 // Zookeeper connection
	group    *kazoo.Consumergroup         // the consumer group as registered in Zookeeper
	instance *kazoo.ConsumergroupInstance // this process' group member; set to nil at the end of Close

	wg             sync.WaitGroup // tracks the running topic consumer goroutines
	singleShutdown sync.Once      // makes the shutdown sequence in Close run only once

	messages chan *sarama.ConsumerMessage // consumed messages, exposed through Messages()
	errors   chan error                   // consumer errors, exposed through Errors()
	stopper  chan struct{}                // closed by Close to signal all goroutines to stop

	consumers kazoo.ConsumergroupInstanceList // instance list from the most recent WatchInstances call

	offsetManager OffsetManager // tracks and commits processed offsets
}
  65. // Connects to a consumer group, using Zookeeper for auto-discovery
  66. func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *Config) (cg *ConsumerGroup, err error) {
  67. if name == "" {
  68. return nil, sarama.ConfigurationError("Empty consumergroup name")
  69. }
  70. if len(topics) == 0 {
  71. return nil, sarama.ConfigurationError("No topics provided")
  72. }
  73. if len(zookeeper) == 0 {
  74. return nil, errors.New("You need to provide at least one zookeeper node address!")
  75. }
  76. if config == nil {
  77. config = NewConfig()
  78. }
  79. config.ClientID = name
  80. // Validate configuration
  81. if err = config.Validate(); err != nil {
  82. return
  83. }
  84. var kz *kazoo.Kazoo
  85. if kz, err = kazoo.NewKazoo(zookeeper, config.Zookeeper); err != nil {
  86. return
  87. }
  88. brokers, err := kz.BrokerList()
  89. if err != nil {
  90. kz.Close()
  91. return
  92. }
  93. group := kz.Consumergroup(name)
  94. if config.Offsets.ResetOffsets {
  95. err = group.ResetOffsets()
  96. if err != nil {
  97. kz.Close()
  98. return
  99. }
  100. }
  101. instance := group.NewInstance()
  102. var consumer sarama.Consumer
  103. if consumer, err = sarama.NewConsumer(brokers, config.Config); err != nil {
  104. kz.Close()
  105. return
  106. }
  107. cg = &ConsumerGroup{
  108. config: config,
  109. consumer: consumer,
  110. kazoo: kz,
  111. group: group,
  112. instance: instance,
  113. messages: make(chan *sarama.ConsumerMessage, config.ChannelBufferSize),
  114. errors: make(chan error, config.ChannelBufferSize),
  115. stopper: make(chan struct{}),
  116. }
  117. // Register consumer group
  118. if exists, err := cg.group.Exists(); err != nil {
  119. cg.Logf("FAILED to check for existence of consumergroup: %s!\n", err)
  120. _ = consumer.Close()
  121. _ = kz.Close()
  122. return nil, err
  123. } else if !exists {
  124. cg.Logf("Consumergroup `%s` does not yet exists, creating...\n", cg.group.Name)
  125. if err := cg.group.Create(); err != nil {
  126. cg.Logf("FAILED to create consumergroup in Zookeeper: %s!\n", err)
  127. _ = consumer.Close()
  128. _ = kz.Close()
  129. return nil, err
  130. }
  131. }
  132. // Register itself with zookeeper
  133. if err := cg.instance.Register(topics); err != nil {
  134. cg.Logf("FAILED to register consumer instance: %s!\n", err)
  135. return nil, err
  136. } else {
  137. cg.Logf("Consumer instance registered (%s).", cg.instance.ID)
  138. }
  139. offsetConfig := OffsetManagerConfig{CommitInterval: config.Offsets.CommitInterval}
  140. cg.offsetManager = NewZookeeperOffsetManager(cg, &offsetConfig)
  141. go cg.topicListConsumer(topics)
  142. return
  143. }
// Messages returns the receive-only channel on which the messages consumed
// from Kafka are delivered for processing. The channel is closed by Close.
func (cg *ConsumerGroup) Messages() <-chan *sarama.ConsumerMessage {
	return cg.messages
}
// Errors returns the receive-only channel on which errors encountered while
// consuming are reported. The channel is closed by Close.
func (cg *ConsumerGroup) Errors() <-chan error {
	return cg.errors
}
// Closed reports whether the consumer group has been shut down. It checks
// whether the instance field is nil, which Close sets as the final step of
// its shutdown sequence.
func (cg *ConsumerGroup) Closed() bool {
	return cg.instance == nil
}
  155. func (cg *ConsumerGroup) Close() error {
  156. shutdownError := AlreadyClosing
  157. cg.singleShutdown.Do(func() {
  158. defer cg.kazoo.Close()
  159. shutdownError = nil
  160. close(cg.stopper)
  161. cg.wg.Wait()
  162. if err := cg.offsetManager.Close(); err != nil {
  163. cg.Logf("FAILED closing the offset manager: %s!\n", err)
  164. }
  165. if shutdownError = cg.instance.Deregister(); shutdownError != nil {
  166. cg.Logf("FAILED deregistering consumer instance: %s!\n", shutdownError)
  167. } else {
  168. cg.Logf("Deregistered consumer instance %s.\n", cg.instance.ID)
  169. }
  170. if shutdownError = cg.consumer.Close(); shutdownError != nil {
  171. cg.Logf("FAILED closing the Sarama client: %s\n", shutdownError)
  172. }
  173. close(cg.messages)
  174. close(cg.errors)
  175. cg.instance = nil
  176. })
  177. return shutdownError
  178. }
  179. func (cg *ConsumerGroup) Logf(format string, args ...interface{}) {
  180. var identifier string
  181. if cg.instance == nil {
  182. identifier = "(defunct)"
  183. } else {
  184. identifier = cg.instance.ID[len(cg.instance.ID)-12:]
  185. }
  186. sarama.Logger.Printf("[%s/%s] %s", cg.group.Name, identifier, fmt.Sprintf(format, args...))
  187. }
// InstanceRegistered reports whether this consumer instance is currently
// registered, by delegating to the instance's Registered method.
// NOTE(review): Close sets cg.instance to nil, so calling this after Close
// presumably dereferences a nil instance — confirm against kazoo.
func (cg *ConsumerGroup) InstanceRegistered() (bool, error) {
	return cg.instance.Registered()
}
// CommitUpto marks the given message's topic, partition and offset as
// processed with the offset manager. It always returns nil; the result of
// MarkAsProcessed, if any, is not inspected.
func (cg *ConsumerGroup) CommitUpto(message *sarama.ConsumerMessage) error {
	cg.offsetManager.MarkAsProcessed(message.Topic, message.Partition, message.Offset)
	return nil
}
// FlushOffsets asks the offset manager to flush and returns the error
// reported by its Flush method.
func (cg *ConsumerGroup) FlushOffsets() error {
	return cg.offsetManager.Flush()
}
  198. func (cg *ConsumerGroup) topicListConsumer(topics []string) {
  199. for {
  200. select {
  201. case <-cg.stopper:
  202. return
  203. default:
  204. }
  205. consumers, consumerChanges, err := cg.group.WatchInstances()
  206. if err != nil {
  207. cg.Logf("FAILED to get list of registered consumer instances: %s\n", err)
  208. return
  209. }
  210. cg.consumers = consumers
  211. cg.Logf("Currently registered consumers: %d\n", len(cg.consumers))
  212. stopper := make(chan struct{})
  213. for _, topic := range topics {
  214. cg.wg.Add(1)
  215. go cg.topicConsumer(topic, cg.messages, cg.errors, stopper)
  216. }
  217. select {
  218. case <-cg.stopper:
  219. close(stopper)
  220. return
  221. case <-consumerChanges:
  222. registered, err := cg.instance.Registered()
  223. if err != nil {
  224. cg.Logf("FAILED to get register status: %s\n", err)
  225. } else if !registered {
  226. err = cg.instance.Register(topics)
  227. if err != nil {
  228. cg.Logf("FAILED to register consumer instance: %s!\n", err)
  229. } else {
  230. cg.Logf("Consumer instance registered (%s).", cg.instance.ID)
  231. }
  232. }
  233. cg.Logf("Triggering rebalance due to consumer list change\n")
  234. close(stopper)
  235. cg.wg.Wait()
  236. }
  237. }
  238. }
  239. func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.ConsumerMessage, errors chan<- error, stopper <-chan struct{}) {
  240. defer cg.wg.Done()
  241. select {
  242. case <-stopper:
  243. return
  244. default:
  245. }
  246. cg.Logf("%s :: Started topic consumer\n", topic)
  247. // Fetch a list of partition IDs
  248. partitions, err := cg.kazoo.Topic(topic).Partitions()
  249. if err != nil {
  250. cg.Logf("%s :: FAILED to get list of partitions: %s\n", topic, err)
  251. cg.errors <- &sarama.ConsumerError{
  252. Topic: topic,
  253. Partition: -1,
  254. Err: err,
  255. }
  256. return
  257. }
  258. partitionLeaders, err := retrievePartitionLeaders(partitions)
  259. if err != nil {
  260. cg.Logf("%s :: FAILED to get leaders of partitions: %s\n", topic, err)
  261. cg.errors <- &sarama.ConsumerError{
  262. Topic: topic,
  263. Partition: -1,
  264. Err: err,
  265. }
  266. return
  267. }
  268. dividedPartitions := dividePartitionsBetweenConsumers(cg.consumers, partitionLeaders)
  269. myPartitions := dividedPartitions[cg.instance.ID]
  270. cg.Logf("%s :: Claiming %d of %d partitions", topic, len(myPartitions), len(partitionLeaders))
  271. // Consume all the assigned partitions
  272. var wg sync.WaitGroup
  273. for _, pid := range myPartitions {
  274. wg.Add(1)
  275. go cg.partitionConsumer(topic, pid.ID, messages, errors, &wg, stopper)
  276. }
  277. wg.Wait()
  278. cg.Logf("%s :: Stopped topic consumer\n", topic)
  279. }
  280. func (cg *ConsumerGroup) consumePartition(topic string, partition int32, nextOffset int64) (sarama.PartitionConsumer, error) {
  281. consumer, err := cg.consumer.ConsumePartition(topic, partition, nextOffset)
  282. if err == sarama.ErrOffsetOutOfRange {
  283. cg.Logf("%s/%d :: Partition consumer offset out of Range.\n", topic, partition)
  284. // if the offset is out of range, simplistically decide whether to use OffsetNewest or OffsetOldest
  285. // if the configuration specified offsetOldest, then switch to the oldest available offset, else
  286. // switch to the newest available offset.
  287. if cg.config.Offsets.Initial == sarama.OffsetOldest {
  288. nextOffset = sarama.OffsetOldest
  289. cg.Logf("%s/%d :: Partition consumer offset reset to oldest available offset.\n", topic, partition)
  290. } else {
  291. nextOffset = sarama.OffsetNewest
  292. cg.Logf("%s/%d :: Partition consumer offset reset to newest available offset.\n", topic, partition)
  293. }
  294. // retry the consumePartition with the adjusted offset
  295. consumer, err = cg.consumer.ConsumePartition(topic, partition, nextOffset)
  296. }
  297. if err != nil {
  298. cg.Logf("%s/%d :: FAILED to start partition consumer: %s\n", topic, partition, err)
  299. return nil, err
  300. }
  301. return consumer, err
  302. }
  303. // Consumes a partition
  304. func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messages chan<- *sarama.ConsumerMessage, errors chan<- error, wg *sync.WaitGroup, stopper <-chan struct{}) {
  305. defer wg.Done()
  306. select {
  307. case <-stopper:
  308. return
  309. default:
  310. }
  311. // Since ProcessingTimeout is the amount of time we'll wait for the final batch
  312. // of messages to be processed before releasing a partition, we need to wait slightly
  313. // longer than that before timing out here to ensure that another consumer has had
  314. // enough time to release the partition. Hence, +2 seconds.
  315. maxRetries := int(cg.config.Offsets.ProcessingTimeout/time.Second) + 2
  316. for tries := 0; tries < maxRetries; tries++ {
  317. if err := cg.instance.ClaimPartition(topic, partition); err == nil {
  318. break
  319. } else if tries+1 < maxRetries {
  320. if err == kazoo.ErrPartitionClaimedByOther {
  321. // Another consumer still owns this partition. We should wait longer for it to release it.
  322. time.Sleep(1 * time.Second)
  323. } else {
  324. // An unexpected error occurred. Log it and continue trying until we hit the timeout.
  325. cg.Logf("%s/%d :: FAILED to claim partition on attempt %v of %v; retrying in 1 second. Error: %v", topic, partition, tries+1, maxRetries, err)
  326. time.Sleep(1 * time.Second)
  327. }
  328. } else {
  329. cg.Logf("%s/%d :: FAILED to claim the partition: %s\n", topic, partition, err)
  330. cg.errors <- &sarama.ConsumerError{
  331. Topic: topic,
  332. Partition: partition,
  333. Err: err,
  334. }
  335. return
  336. }
  337. }
  338. defer func() {
  339. err := cg.instance.ReleasePartition(topic, partition)
  340. if err != nil {
  341. cg.Logf("%s/%d :: FAILED to release partition: %s\n", topic, partition, err)
  342. cg.errors <- &sarama.ConsumerError{
  343. Topic: topic,
  344. Partition: partition,
  345. Err: err,
  346. }
  347. }
  348. }()
  349. nextOffset, err := cg.offsetManager.InitializePartition(topic, partition)
  350. if err != nil {
  351. cg.Logf("%s/%d :: FAILED to determine initial offset: %s\n", topic, partition, err)
  352. return
  353. }
  354. if nextOffset >= 0 {
  355. cg.Logf("%s/%d :: Partition consumer starting at offset %d.\n", topic, partition, nextOffset)
  356. } else {
  357. nextOffset = cg.config.Offsets.Initial
  358. if nextOffset == sarama.OffsetOldest {
  359. cg.Logf("%s/%d :: Partition consumer starting at the oldest available offset.\n", topic, partition)
  360. } else if nextOffset == sarama.OffsetNewest {
  361. cg.Logf("%s/%d :: Partition consumer listening for new messages only.\n", topic, partition)
  362. }
  363. }
  364. consumer, err := cg.consumePartition(topic, partition, nextOffset)
  365. if err != nil {
  366. cg.Logf("%s/%d :: FAILED to start partition consumer: %s\n", topic, partition, err)
  367. return
  368. }
  369. defer consumer.Close()
  370. err = nil
  371. var lastOffset int64 = -1 // aka unknown
  372. partitionConsumerLoop:
  373. for {
  374. select {
  375. case <-stopper:
  376. break partitionConsumerLoop
  377. case err := <-consumer.Errors():
  378. if err == nil {
  379. cg.Logf("%s/%d :: Consumer encountered an invalid state: re-establishing consumption of partition.\n", topic, partition)
  380. // Errors encountered (if any) are logged in the consumerPartition function
  381. var cErr error
  382. consumer, cErr = cg.consumePartition(topic, partition, lastOffset)
  383. if cErr != nil {
  384. break partitionConsumerLoop
  385. }
  386. continue partitionConsumerLoop
  387. }
  388. for {
  389. select {
  390. case errors <- err:
  391. continue partitionConsumerLoop
  392. case <-stopper:
  393. break partitionConsumerLoop
  394. }
  395. }
  396. case message := <-consumer.Messages():
  397. if message == nil {
  398. cg.Logf("%s/%d :: Consumer encountered an invalid state: re-establishing consumption of partition.\n", topic, partition)
  399. // Errors encountered (if any) are logged in the consumerPartition function
  400. var cErr error
  401. consumer, cErr = cg.consumePartition(topic, partition, lastOffset)
  402. if cErr != nil {
  403. break partitionConsumerLoop
  404. }
  405. continue partitionConsumerLoop
  406. }
  407. for {
  408. select {
  409. case <-stopper:
  410. break partitionConsumerLoop
  411. case messages <- message:
  412. lastOffset = message.Offset
  413. continue partitionConsumerLoop
  414. }
  415. }
  416. }
  417. }
  418. cg.Logf("%s/%d :: Stopping partition consumer at offset %d\n", topic, partition, lastOffset)
  419. if err := cg.offsetManager.FinalizePartition(topic, partition, lastOffset, cg.config.Offsets.ProcessingTimeout); err != nil {
  420. cg.Logf("%s/%d :: %s\n", topic, partition, err)
  421. }
  422. }