consumer.go

package cluster

import (
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/Shopify/sarama"
)

// Consumer is a cluster group consumer
type Consumer struct {
	client    *Client
	ownClient bool

	consumer sarama.Consumer
	subs     *partitionMap

	consumerID string
	groupID    string

	memberID     string
	generationID int32
	membershipMu sync.RWMutex

	coreTopics  []string
	extraTopics []string

	dying, dead chan none
	closeOnce   sync.Once

	consuming     int32
	messages      chan *sarama.ConsumerMessage
	errors        chan error
	partitions    chan PartitionConsumer
	notifications chan *Notification

	commitMu sync.Mutex
}

// NewConsumer initializes a new consumer
func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}

	consumer, err := NewConsumerFromClient(client, groupID, topics)
	if err != nil {
		return nil, err
	}
	consumer.ownClient = true
	return consumer, nil
}

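// exampleMultiplexConsume is an added usage sketch, not part of the original
// library code: it shows the typical create/consume/mark/close cycle in the
// default multiplexed mode. The broker address "127.0.0.1:9092", the group ID
// "my-group" and the topic "my-topic" are placeholder assumptions.
func exampleMultiplexConsume() error {
	config := NewConfig()
	config.Consumer.Return.Errors = true

	consumer, err := NewConsumer([]string{"127.0.0.1:9092"}, "my-group", []string{"my-topic"}, config)
	if err != nil {
		return err
	}
	defer consumer.Close()

	// Consume messages from the shared channel and mark them as processed;
	// offsets are committed in the background by the commit loop.
	for msg := range consumer.Messages() {
		consumer.MarkOffset(msg, "")
	}
	return nil
}
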
// NewConsumerFromClient initializes a new consumer from an existing client.
//
// Please note that clients cannot be shared between consumers (due to Kafka internals),
// they can only be re-used, which requires the user to call Close() on the first consumer
// before using this method again to initialize another one. Attempts to use a client with
// more than one consumer at a time will return errors.
func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Consumer, error) {
	if !client.claim() {
		return nil, errClientInUse
	}

	consumer, err := sarama.NewConsumerFromClient(client.Client)
	if err != nil {
		client.release()
		return nil, err
	}

	sort.Strings(topics)
	c := &Consumer{
		client:   client,
		consumer: consumer,
		subs:     newPartitionMap(),
		groupID:  groupID,

		coreTopics: topics,

		dying: make(chan none),
		dead:  make(chan none),

		messages:      make(chan *sarama.ConsumerMessage),
		errors:        make(chan error, client.config.ChannelBufferSize),
		partitions:    make(chan PartitionConsumer, 1),
		notifications: make(chan *Notification),
	}
	if err := c.client.RefreshCoordinator(groupID); err != nil {
		client.release()
		return nil, err
	}

	go c.mainLoop()
	return c, nil
}

// Messages returns the read channel for the messages that are returned by
// the broker.
//
// This channel will only receive messages if the Config.Group.Mode option is
// set to ConsumerModeMultiplex (the default).
func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage { return c.messages }

// Partitions returns the read channel for the individual partitions assigned
// to this consumer.
//
// This channel will only receive partitions if the Config.Group.Mode option is
// set to ConsumerModePartitions.
//
// The Partitions() channel must be listened to for the life of this consumer;
// when a rebalance happens old partitions will be closed (naturally come to
// completion) and new ones will be emitted. The returned channel will only close
// when the consumer is completely shut down.
func (c *Consumer) Partitions() <-chan PartitionConsumer { return c.partitions }

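// examplePartitionConsume is an added usage sketch, not part of the original
// library code: it shows consuming in ConsumerModePartitions, where each
// assigned partition arrives over Partitions() as its own PartitionConsumer.
// The broker address, group ID and topic are placeholder assumptions.
func examplePartitionConsume() error {
	config := NewConfig()
	config.Group.Mode = ConsumerModePartitions

	consumer, err := NewConsumer([]string{"127.0.0.1:9092"}, "my-group", []string{"my-topic"}, config)
	if err != nil {
		return err
	}
	defer consumer.Close()

	// Each PartitionConsumer is closed by the library on rebalance; its
	// Messages() channel then drains and closes, ending the inner loop.
	for pc := range consumer.Partitions() {
		go func(pc PartitionConsumer) {
			for msg := range pc.Messages() {
				consumer.MarkOffset(msg, "")
			}
		}(pc)
	}
	return nil
}
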
// Errors returns a read channel of errors that occur during offset management, if
// enabled. By default, errors are logged and not returned over this channel. If
// you want to implement any custom error handling, set your config's
// Consumer.Return.Errors setting to true, and read from this channel.
func (c *Consumer) Errors() <-chan error { return c.errors }

// Notifications returns a channel of Notifications that occur during consumer
// rebalancing. Notifications will only be emitted over this channel if your
// config's Group.Return.Notifications setting is set to true.
func (c *Consumer) Notifications() <-chan *Notification { return c.notifications }

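// exampleHandleErrorsAndNotifications is an added usage sketch, not part of
// the original library code: it shows draining the Errors() and Notifications()
// channels in background goroutines. The consumer passed in is assumed to have
// been created with Consumer.Return.Errors and Group.Return.Notifications
// enabled, otherwise these channels stay silent.
func exampleHandleErrorsAndNotifications(consumer *Consumer) {
	// Log offset-management and rebalance errors.
	go func() {
		for err := range consumer.Errors() {
			sarama.Logger.Printf("consumer error: %s\n", err)
		}
	}()

	// Log the claims issued on each rebalance.
	go func() {
		for n := range consumer.Notifications() {
			sarama.Logger.Printf("rebalanced: %+v\n", n)
		}
	}()
}
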
// HighWaterMarks returns the current high water marks for each topic and partition.
// Consistency between partitions is not guaranteed since high water marks are updated separately.
func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { return c.consumer.HighWaterMarks() }

// MarkOffset marks the provided message as processed, alongside a metadata string
// that represents the state of the partition consumer at that point in time. The
// metadata string can be used by another consumer to restore that state, so it
// can resume consumption.
//
// Note: calling MarkOffset does not necessarily commit the offset to the backend
// store immediately for efficiency reasons, and it may never be committed if
// your application crashes. This means that you may end up processing the same
// message twice, and your processing should ideally be idempotent.
func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
	c.subs.Fetch(msg.Topic, msg.Partition).MarkOffset(msg.Offset+1, metadata)
}

// MarkPartitionOffset marks an offset of the provided topic/partition as processed.
// See MarkOffset for additional explanation.
func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
	c.subs.Fetch(topic, partition).MarkOffset(offset+1, metadata)
}

// MarkOffsets marks stashed offsets as processed.
// See MarkOffset for additional explanation.
func (c *Consumer) MarkOffsets(s *OffsetStash) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for tp, info := range s.offsets {
		c.subs.Fetch(tp.Topic, tp.Partition).MarkOffset(info.Offset+1, info.Metadata)
		delete(s.offsets, tp)
	}
}

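// exampleStashOffsets is an added usage sketch, not part of the original
// library code: it shows accumulating offsets in an OffsetStash and handing
// them to MarkOffsets in bulk. It assumes the package's stash helpers
// (NewOffsetStash and the stash's own MarkOffset method) from offsets.go.
func exampleStashOffsets(consumer *Consumer) {
	stash := NewOffsetStash()
	count := 0
	for msg := range consumer.Messages() {
		// Stash the offset instead of marking it immediately ...
		stash.MarkOffset(msg, "")
		count++

		// ... and flush the whole batch every 100 messages.
		if count%100 == 0 {
			consumer.MarkOffsets(stash)
		}
	}
}
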
// Subscriptions returns the consumed topics and partitions
func (c *Consumer) Subscriptions() map[string][]int32 {
	return c.subs.Info()
}

// CommitOffsets allows you to manually commit previously marked offsets. By default there is no
// need to call this function as the consumer will commit offsets automatically
// using the Config.Consumer.Offsets.CommitInterval setting.
//
// Please be aware that calling this function during an internal rebalance cycle may return
// broker errors (e.g. sarama.ErrUnknownMemberId or sarama.ErrIllegalGeneration).
func (c *Consumer) CommitOffsets() error {
	c.commitMu.Lock()
	defer c.commitMu.Unlock()

	memberID, generationID := c.membership()
	req := &sarama.OffsetCommitRequest{
		Version:                 2,
		ConsumerGroup:           c.groupID,
		ConsumerGroupGeneration: generationID,
		ConsumerID:              memberID,
		RetentionTime:           -1,
	}

	if ns := c.client.config.Consumer.Offsets.Retention; ns != 0 {
		req.RetentionTime = int64(ns / time.Millisecond)
	}

	snap := c.subs.Snapshot()
	dirty := false
	for tp, state := range snap {
		if state.Dirty {
			dirty = true
			req.AddBlock(tp.Topic, tp.Partition, state.Info.Offset, 0, state.Info.Metadata)
		}
	}
	if !dirty {
		return nil
	}

	broker, err := c.client.Coordinator(c.groupID)
	if err != nil {
		c.closeCoordinator(broker, err)
		return err
	}

	resp, err := broker.CommitOffset(req)
	if err != nil {
		c.closeCoordinator(broker, err)
		return err
	}

	for topic, errs := range resp.Errors {
		for partition, kerr := range errs {
			if kerr != sarama.ErrNoError {
				err = kerr
			} else if state, ok := snap[topicPartition{topic, partition}]; ok {
				c.subs.Fetch(topic, partition).MarkCommitted(state.Info.Offset)
			}
		}
	}
	return err
}

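// exampleManualCommit is an added usage sketch, not part of the original
// library code: it shows forcing a synchronous commit after marking a batch
// of messages, instead of waiting for the background commit ticker driven by
// Config.Consumer.Offsets.CommitInterval. The msgs slice is a placeholder.
func exampleManualCommit(consumer *Consumer, msgs []*sarama.ConsumerMessage) error {
	for _, msg := range msgs {
		consumer.MarkOffset(msg, "")
	}
	// Flush the marked offsets to the group coordinator right away.
	return consumer.CommitOffsets()
}
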
// Close safely closes the consumer and releases all resources
func (c *Consumer) Close() (err error) {
	c.closeOnce.Do(func() {
		close(c.dying)
		<-c.dead

		if e := c.release(); e != nil {
			err = e
		}
		if e := c.consumer.Close(); e != nil {
			err = e
		}
		close(c.messages)
		close(c.errors)

		if e := c.leaveGroup(); e != nil {
			err = e
		}
		close(c.partitions)
		close(c.notifications)

		c.client.release()
		if c.ownClient {
			if e := c.client.Close(); e != nil {
				err = e
			}
		}
	})
	return
}

func (c *Consumer) mainLoop() {
	defer close(c.dead)
	defer atomic.StoreInt32(&c.consuming, 0)

	for {
		atomic.StoreInt32(&c.consuming, 0)

		// Check if close was requested
		select {
		case <-c.dying:
			return
		default:
		}

		// Start next consume cycle
		c.nextTick()
	}
}

func (c *Consumer) nextTick() {
	// Remember previous subscriptions
	var notification *Notification
	if c.client.config.Group.Return.Notifications {
		notification = newNotification(c.subs.Info())
	}

	// Refresh coordinator
	if err := c.refreshCoordinator(); err != nil {
		c.rebalanceError(err, nil)
		return
	}

	// Release subscriptions
	if err := c.release(); err != nil {
		c.rebalanceError(err, nil)
		return
	}

	// Issue rebalance start notification
	if c.client.config.Group.Return.Notifications {
		c.handleNotification(notification)
	}

	// Rebalance, fetch new subscriptions
	subs, err := c.rebalance()
	if err != nil {
		c.rebalanceError(err, notification)
		return
	}

	// Coordinate loops, make sure everything is
	// stopped on exit
	tomb := newLoopTomb()
	defer tomb.Close()

	// Start the heartbeat
	tomb.Go(c.hbLoop)

	// Subscribe to topic/partitions
	if err := c.subscribe(tomb, subs); err != nil {
		c.rebalanceError(err, notification)
		return
	}

	// Update/issue notification with new claims
	if c.client.config.Group.Return.Notifications {
		notification = notification.success(subs)
		c.handleNotification(notification)
	}

	// Start topic watcher loop
	tomb.Go(c.twLoop)

	// Start consuming and committing offsets
	tomb.Go(c.cmLoop)
	atomic.StoreInt32(&c.consuming, 1)

	// Wait for signals
	select {
	case <-tomb.Dying():
	case <-c.dying:
	}
}

// heartbeat loop, triggered by the mainLoop
func (c *Consumer) hbLoop(stopped <-chan none) {
	ticker := time.NewTicker(c.client.config.Group.Heartbeat.Interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			switch err := c.heartbeat(); err {
			case nil, sarama.ErrNoError:
			case sarama.ErrNotCoordinatorForConsumer, sarama.ErrRebalanceInProgress:
				return
			default:
				c.handleError(&Error{Ctx: "heartbeat", error: err})
				return
			}
		case <-stopped:
			return
		case <-c.dying:
			return
		}
	}
}

// topic watcher loop, triggered by the mainLoop
func (c *Consumer) twLoop(stopped <-chan none) {
	ticker := time.NewTicker(c.client.config.Metadata.RefreshFrequency / 2)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			topics, err := c.client.Topics()
			if err != nil {
				c.handleError(&Error{Ctx: "topics", error: err})
				return
			}

			for _, topic := range topics {
				if !c.isKnownCoreTopic(topic) &&
					!c.isKnownExtraTopic(topic) &&
					c.isPotentialExtraTopic(topic) {
					return
				}
			}
		case <-stopped:
			return
		case <-c.dying:
			return
		}
	}
}

// commit loop, triggered by the mainLoop
func (c *Consumer) cmLoop(stopped <-chan none) {
	ticker := time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); err != nil {
				c.handleError(&Error{Ctx: "commit", error: err})
				return
			}
		case <-stopped:
			return
		case <-c.dying:
			return
		}
	}
}

func (c *Consumer) rebalanceError(err error, n *Notification) {
	if n != nil {
		n.Type = RebalanceError
		c.handleNotification(n)
	}

	switch err {
	case sarama.ErrRebalanceInProgress:
	default:
		c.handleError(&Error{Ctx: "rebalance", error: err})
	}

	select {
	case <-c.dying:
	case <-time.After(c.client.config.Metadata.Retry.Backoff):
	}
}

func (c *Consumer) handleNotification(n *Notification) {
	if c.client.config.Group.Return.Notifications {
		select {
		case c.notifications <- n:
		case <-c.dying:
			return
		}
	}
}

func (c *Consumer) handleError(e *Error) {
	if c.client.config.Consumer.Return.Errors {
		select {
		case c.errors <- e:
		case <-c.dying:
			return
		}
	} else {
		sarama.Logger.Printf("%s error: %s\n", e.Ctx, e.Error())
	}
}

// Releases the consumer and commits offsets, called from rebalance() and Close()
func (c *Consumer) release() (err error) {
	// Stop all consumers
	c.subs.Stop()

	// Clear subscriptions on exit
	defer c.subs.Clear()

	// Wait for messages to be processed
	timeout := time.NewTimer(c.client.config.Group.Offsets.Synchronization.DwellTime)
	defer timeout.Stop()

	select {
	case <-c.dying:
	case <-timeout.C:
	}

	// Commit offsets, continue on errors
	if e := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); e != nil {
		err = e
	}
	return
}

// --------------------------------------------------------------------

// Performs a heartbeat, part of the mainLoop()
func (c *Consumer) heartbeat() error {
	broker, err := c.client.Coordinator(c.groupID)
	if err != nil {
		c.closeCoordinator(broker, err)
		return err
	}

	memberID, generationID := c.membership()
	resp, err := broker.Heartbeat(&sarama.HeartbeatRequest{
		GroupId:      c.groupID,
		MemberId:     memberID,
		GenerationId: generationID,
	})
	if err != nil {
		c.closeCoordinator(broker, err)
		return err
	}
	return resp.Err
}

// Performs a rebalance, part of the mainLoop()
func (c *Consumer) rebalance() (map[string][]int32, error) {
	memberID, _ := c.membership()
	sarama.Logger.Printf("cluster/consumer %s rebalance\n", memberID)

	allTopics, err := c.client.Topics()
	if err != nil {
		return nil, err
	}
	c.extraTopics = c.selectExtraTopics(allTopics)
	sort.Strings(c.extraTopics)

	// Re-join consumer group
	strategy, err := c.joinGroup()
	switch {
	case err == sarama.ErrUnknownMemberId:
		c.membershipMu.Lock()
		c.memberID = ""
		c.membershipMu.Unlock()
		return nil, err
	case err != nil:
		return nil, err
	}

	// Sync consumer group state, fetch subscriptions
	subs, err := c.syncGroup(strategy)
	switch {
	case err == sarama.ErrRebalanceInProgress:
		return nil, err
	case err != nil:
		_ = c.leaveGroup()
		return nil, err
	}
	return subs, nil
}

// Performs the subscription, part of the mainLoop()
func (c *Consumer) subscribe(tomb *loopTomb, subs map[string][]int32) error {
	// fetch offsets
	offsets, err := c.fetchOffsets(subs)
	if err != nil {
		_ = c.leaveGroup()
		return err
	}

	// create consumers in parallel
	var mu sync.Mutex
	var wg sync.WaitGroup

	for topic, partitions := range subs {
		for _, partition := range partitions {
			wg.Add(1)

			info := offsets[topic][partition]
			go func(topic string, partition int32) {
				if e := c.createConsumer(tomb, topic, partition, info); e != nil {
					mu.Lock()
					err = e
					mu.Unlock()
				}
				wg.Done()
			}(topic, partition)
		}
	}
	wg.Wait()

	if err != nil {
		_ = c.release()
		_ = c.leaveGroup()
	}
	return err
}

// --------------------------------------------------------------------

// Send a request to the broker to join the group on rebalance()
func (c *Consumer) joinGroup() (*balancer, error) {
	memberID, _ := c.membership()
	req := &sarama.JoinGroupRequest{
		GroupId:        c.groupID,
		MemberId:       memberID,
		SessionTimeout: int32(c.client.config.Group.Session.Timeout / time.Millisecond),
		ProtocolType:   "consumer",
	}

	meta := &sarama.ConsumerGroupMemberMetadata{
		Version:  1,
		Topics:   append(c.coreTopics, c.extraTopics...),
		UserData: c.client.config.Group.Member.UserData,
	}
	err := req.AddGroupProtocolMetadata(string(StrategyRange), meta)
	if err != nil {
		return nil, err
	}
	err = req.AddGroupProtocolMetadata(string(StrategyRoundRobin), meta)
	if err != nil {
		return nil, err
	}

	broker, err := c.client.Coordinator(c.groupID)
	if err != nil {
		c.closeCoordinator(broker, err)
		return nil, err
	}

	resp, err := broker.JoinGroup(req)
	if err != nil {
		c.closeCoordinator(broker, err)
		return nil, err
	} else if resp.Err != sarama.ErrNoError {
		c.closeCoordinator(broker, resp.Err)
		return nil, resp.Err
	}

	var strategy *balancer
	if resp.LeaderId == resp.MemberId {
		members, err := resp.GetMembers()
		if err != nil {
			return nil, err
		}

		strategy, err = newBalancerFromMeta(c.client, members)
		if err != nil {
			return nil, err
		}
	}

	c.membershipMu.Lock()
	c.memberID = resp.MemberId
	c.generationID = resp.GenerationId
	c.membershipMu.Unlock()

	return strategy, nil
}

// Send a request to the broker to sync the group on rebalance().
// Returns a list of topics and partitions to consume.
func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) {
	memberID, generationID := c.membership()
	req := &sarama.SyncGroupRequest{
		GroupId:      c.groupID,
		MemberId:     memberID,
		GenerationId: generationID,
	}

	for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) {
		if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{
			Version: 1,
			Topics:  topics,
		}); err != nil {
			return nil, err
		}
	}

	broker, err := c.client.Coordinator(c.groupID)
	if err != nil {
		c.closeCoordinator(broker, err)
		return nil, err
	}

	resp, err := broker.SyncGroup(req)
	if err != nil {
		c.closeCoordinator(broker, err)
		return nil, err
	} else if resp.Err != sarama.ErrNoError {
		c.closeCoordinator(broker, resp.Err)
		return nil, resp.Err
	}

	// Return if there is nothing to subscribe to
	if len(resp.MemberAssignment) == 0 {
		return nil, nil
	}

	// Get assigned subscriptions
	members, err := resp.GetMemberAssignment()
	if err != nil {
		return nil, err
	}

	// Sort partitions, for each topic
	for topic := range members.Topics {
		sort.Sort(int32Slice(members.Topics[topic]))
	}
	return members.Topics, nil
}

// Fetches the latest committed offsets for all subscriptions
func (c *Consumer) fetchOffsets(subs map[string][]int32) (map[string]map[int32]offsetInfo, error) {
	offsets := make(map[string]map[int32]offsetInfo, len(subs))
	req := &sarama.OffsetFetchRequest{
		Version:       1,
		ConsumerGroup: c.groupID,
	}

	for topic, partitions := range subs {
		offsets[topic] = make(map[int32]offsetInfo, len(partitions))
		for _, partition := range partitions {
			offsets[topic][partition] = offsetInfo{Offset: -1}
			req.AddPartition(topic, partition)
		}
	}

	broker, err := c.client.Coordinator(c.groupID)
	if err != nil {
		c.closeCoordinator(broker, err)
		return nil, err
	}

	resp, err := broker.FetchOffset(req)
	if err != nil {
		c.closeCoordinator(broker, err)
		return nil, err
	}

	for topic, partitions := range subs {
		for _, partition := range partitions {
			block := resp.GetBlock(topic, partition)
			if block == nil {
				return nil, sarama.ErrIncompleteResponse
			}

			if block.Err == sarama.ErrNoError {
				offsets[topic][partition] = offsetInfo{Offset: block.Offset, Metadata: block.Metadata}
			} else {
				return nil, block.Err
			}
		}
	}
	return offsets, nil
}

// Send a request to the broker to leave the group on failed rebalance() and on Close()
func (c *Consumer) leaveGroup() error {
	broker, err := c.client.Coordinator(c.groupID)
	if err != nil {
		c.closeCoordinator(broker, err)
		return err
	}

	memberID, _ := c.membership()
	if _, err = broker.LeaveGroup(&sarama.LeaveGroupRequest{
		GroupId:  c.groupID,
		MemberId: memberID,
	}); err != nil {
		c.closeCoordinator(broker, err)
	}
	return err
}

// --------------------------------------------------------------------

func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32, info offsetInfo) error {
	memberID, _ := c.membership()
	sarama.Logger.Printf("cluster/consumer %s consume %s/%d from %d\n", memberID, topic, partition, info.NextOffset(c.client.config.Consumer.Offsets.Initial))

	// Create partitionConsumer
	pc, err := newPartitionConsumer(c.consumer, topic, partition, info, c.client.config.Consumer.Offsets.Initial)
	if err != nil {
		return err
	}

	// Store in subscriptions
	c.subs.Store(topic, partition, pc)

	// Start partition consumer goroutine
	tomb.Go(func(stopper <-chan none) {
		if c.client.config.Group.Mode == ConsumerModePartitions {
			pc.WaitFor(stopper, c.errors)
		} else {
			pc.Multiplex(stopper, c.messages, c.errors)
		}
	})

	if c.client.config.Group.Mode == ConsumerModePartitions {
		c.partitions <- pc
	}
	return nil
}

func (c *Consumer) commitOffsetsWithRetry(retries int) error {
	err := c.CommitOffsets()
	if err != nil && retries > 0 {
		return c.commitOffsetsWithRetry(retries - 1)
	}
	return err
}

func (c *Consumer) closeCoordinator(broker *sarama.Broker, err error) {
	if broker != nil {
		_ = broker.Close()
	}

	switch err {
	case sarama.ErrConsumerCoordinatorNotAvailable, sarama.ErrNotCoordinatorForConsumer:
		_ = c.client.RefreshCoordinator(c.groupID)
	}
}

func (c *Consumer) selectExtraTopics(allTopics []string) []string {
	extra := allTopics[:0]
	for _, topic := range allTopics {
		if !c.isKnownCoreTopic(topic) && c.isPotentialExtraTopic(topic) {
			extra = append(extra, topic)
		}
	}
	return extra
}

func (c *Consumer) isKnownCoreTopic(topic string) bool {
	pos := sort.SearchStrings(c.coreTopics, topic)
	return pos < len(c.coreTopics) && c.coreTopics[pos] == topic
}

func (c *Consumer) isKnownExtraTopic(topic string) bool {
	pos := sort.SearchStrings(c.extraTopics, topic)
	return pos < len(c.extraTopics) && c.extraTopics[pos] == topic
}

func (c *Consumer) isPotentialExtraTopic(topic string) bool {
	rx := c.client.config.Group.Topics
	if rx.Blacklist != nil && rx.Blacklist.MatchString(topic) {
		return false
	}
	if rx.Whitelist != nil && rx.Whitelist.MatchString(topic) {
		return true
	}
	return false
}

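// The whitelist/blacklist matching above is driven by the Group.Topics config
// (the Whitelist and Blacklist regexp fields used in isPotentialExtraTopic).
// The sketch below is added, not part of the original file, and shows how a
// caller might opt into dynamically discovered extra topics; the broker list,
// group ID, topic names and patterns are placeholder assumptions, and the
// caller is assumed to import "regexp":
//
//	config := cluster.NewConfig()
//	// also consume any topic starting with "events.", except internal ones
//	config.Group.Topics.Whitelist = regexp.MustCompile(`^events\..*`)
//	config.Group.Topics.Blacklist = regexp.MustCompile(`^events\.internal\..*`)
//	consumer, err := cluster.NewConsumer(brokers, "my-group", []string{"events.core"}, config)
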
func (c *Consumer) refreshCoordinator() error {
	if err := c.refreshMetadata(); err != nil {
		return err
	}
	return c.client.RefreshCoordinator(c.groupID)
}

func (c *Consumer) refreshMetadata() (err error) {
	if c.client.config.Metadata.Full {
		err = c.client.RefreshMetadata()
	} else {
		var topics []string
		if topics, err = c.client.Topics(); err == nil && len(topics) != 0 {
			err = c.client.RefreshMetadata(topics...)
		}
	}

	// maybe we didn't have authorization to describe all topics
	switch err {
	case sarama.ErrTopicAuthorizationFailed:
		err = c.client.RefreshMetadata(c.coreTopics...)
	}
	return
}

func (c *Consumer) membership() (memberID string, generationID int32) {
	c.membershipMu.RLock()
	memberID, generationID = c.memberID, c.generationID
	c.membershipMu.RUnlock()
	return
}