package consumergroup

import (
	"errors"
	"fmt"
	"sync"
	"time"
)
// OffsetManager is the main interface the consumergroup requires to manage
// its offsets.
type OffsetManager interface {
	// InitializePartition is called when the consumergroup is starting to consume a
	// partition. It should return the last processed offset for this partition. Note:
	// the same partition can be initialized multiple times during a single run of a
	// consumer group due to other consumer instances coming online and offline.
	InitializePartition(topic string, partition int32) (int64, error)

	// MarkAsProcessed tells the offset manager that a certain message has been successfully
	// processed by the consumer, and should be committed. The implementation does not have
	// to store this offset right away, but should return true if it intends to do this at
	// some point.
	//
	// Offsets should generally be increasing if the consumer processes events serially,
	// but this cannot be guaranteed if the consumer does any asynchronous processing.
	// This can be handled in various ways, e.g. by only accepting offsets that are
	// higher than the offsets seen before for the same partition.
	MarkAsProcessed(topic string, partition int32, offset int64) bool

	// Flush tells the offset manager to immediately commit offsets synchronously and to
	// return any errors that may have occurred during the process.
	Flush() error

	// FinalizePartition is called when the consumergroup is done consuming a
	// partition. In this method, the offset manager can flush any remaining offsets to its
	// backend store. It should return an error if it was not able to commit the offset.
	// Note: it's possible that the consumergroup instance will start to consume the same
	// partition again after this function is called.
	FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error

	// Close is called when the consumergroup is shutting down. In normal circumstances, all
	// offsets are committed because FinalizePartition is called for all the running partition
	// consumers. You may want to check for this to be true, and try to commit any outstanding
	// offsets. If this doesn't succeed, it should return an error.
	Close() error
}
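// A typical lifecycle, as a minimal sketch (error handling elided; in practice
// the consumergroup itself drives these calls, and the message loop below is a
// hypothetical stand-in for a real partition consumer):
//
//	offset, err := om.InitializePartition("events", 0) // resume from the last commit
//	for msg := range messages {                        // hypothetical message channel
//		handle(msg)                                    // hypothetical processing step
//		om.MarkAsProcessed("events", 0, msg.Offset)
//	}
//	om.FinalizePartition("events", 0, lastSeenOffset, 10*time.Second)
//	om.Close()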
// UncleanClose is returned by Close if not all offsets could be committed
// before the shutdown completed.
var (
	UncleanClose = errors.New("Not all offsets were committed before shutdown was completed")
)
// OffsetManagerConfig holds configuration settings on how the offset manager should behave.
type OffsetManagerConfig struct {
	CommitInterval time.Duration // Interval between offset flushes to the backend store.
	VerboseLogging bool          // Whether to enable verbose logging.
}

// NewOffsetManagerConfig returns a new OffsetManagerConfig with sane defaults.
func NewOffsetManagerConfig() *OffsetManagerConfig {
	return &OffsetManagerConfig{
		CommitInterval: 10 * time.Second,
	}
}
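// To override the defaults, adjust the struct before passing it to the offset
// manager constructor (the values here are illustrative):
//
//	config := NewOffsetManagerConfig()
//	config.CommitInterval = 1 * time.Second
//	config.VerboseLogging = true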
type (
	topicOffsets    map[int32]*partitionOffsetTracker
	offsetsMap      map[string]topicOffsets
	offsetCommitter func(int64) error
)

// partitionOffsetTracker tracks the highest processed and last committed
// offset of a single partition.
type partitionOffsetTracker struct {
	l                      sync.Mutex
	waitingForOffset       int64
	highestProcessedOffset int64
	lastCommittedOffset    int64
	done                   chan struct{}
}
type zookeeperOffsetManager struct {
	config  *OffsetManagerConfig
	l       sync.RWMutex
	offsets offsetsMap
	cg      *ConsumerGroup

	closing, closed, flush chan struct{}
	flushErr               chan error
}

// NewZookeeperOffsetManager returns an offset manager that uses Zookeeper
// to store offsets.
func NewZookeeperOffsetManager(cg *ConsumerGroup, config *OffsetManagerConfig) OffsetManager {
	if config == nil {
		config = NewOffsetManagerConfig()
	}

	zom := &zookeeperOffsetManager{
		config:   config,
		cg:       cg,
		offsets:  make(offsetsMap),
		closing:  make(chan struct{}),
		closed:   make(chan struct{}),
		flush:    make(chan struct{}),
		flushErr: make(chan error),
	}

	// Start the background goroutine that periodically commits offsets.
	go zom.offsetCommitter()

	return zom
}
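// Usage sketch (assumes a *ConsumerGroup named cg obtained elsewhere, e.g.
// from JoinConsumerGroup; passing a nil config selects the defaults):
//
//	om := NewZookeeperOffsetManager(cg, nil)
//	defer om.Close()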
func (zom *zookeeperOffsetManager) InitializePartition(topic string, partition int32) (int64, error) {
	zom.l.Lock()
	defer zom.l.Unlock()

	if zom.offsets[topic] == nil {
		zom.offsets[topic] = make(topicOffsets)
	}

	nextOffset, err := zom.cg.group.FetchOffset(topic, partition)
	if err != nil {
		return 0, err
	}

	zom.offsets[topic][partition] = &partitionOffsetTracker{
		highestProcessedOffset: nextOffset - 1,
		lastCommittedOffset:    nextOffset - 1,
		done:                   make(chan struct{}),
	}

	return nextOffset, nil
}
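// Worked example (illustrative): if Zookeeper has 43 stored as the next offset
// to consume for a partition, InitializePartition returns 43 and the tracker
// starts out with highestProcessedOffset = lastCommittedOffset = 42, i.e. the
// last offset that was fully processed during the previous run.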
func (zom *zookeeperOffsetManager) FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error {
	zom.l.RLock()
	tracker := zom.offsets[topic][partition]
	zom.l.RUnlock()

	if lastOffset >= 0 {
		if lastOffset-tracker.highestProcessedOffset > 0 {
			zom.cg.Logf("%s/%d :: Last processed offset: %d. Waiting up to %ds for another %d messages to process...", topic, partition, tracker.highestProcessedOffset, timeout/time.Second, lastOffset-tracker.highestProcessedOffset)
			if !tracker.waitForOffset(lastOffset, timeout) {
				return fmt.Errorf("TIMEOUT waiting for offset %d. Last committed offset: %d", lastOffset, tracker.lastCommittedOffset)
			}
		}

		if err := zom.commitOffset(topic, partition, tracker); err != nil {
			return fmt.Errorf("FAILED to commit offset %d to Zookeeper. Last committed offset: %d", tracker.highestProcessedOffset, tracker.lastCommittedOffset)
		}
	}

	zom.l.Lock()
	delete(zom.offsets[topic], partition)
	zom.l.Unlock()

	return nil
}
func (zom *zookeeperOffsetManager) MarkAsProcessed(topic string, partition int32, offset int64) bool {
	zom.l.RLock()
	defer zom.l.RUnlock()

	if p, ok := zom.offsets[topic][partition]; ok {
		return p.markAsProcessed(offset)
	}
	return false
}
func (zom *zookeeperOffsetManager) Flush() error {
	zom.flush <- struct{}{}
	return <-zom.flushErr
}
func (zom *zookeeperOffsetManager) Close() error {
	close(zom.closing)
	<-zom.closed

	zom.l.Lock()
	defer zom.l.Unlock()

	var closeError error
	for _, partitionOffsets := range zom.offsets {
		if len(partitionOffsets) > 0 {
			closeError = UncleanClose
		}
	}

	return closeError
}
// offsetCommitter runs as a background goroutine and commits offsets every
// CommitInterval, until the offset manager is closed. Note that a
// CommitInterval of 0 leaves tickerChan nil, which disables periodic commits;
// offsets are then only committed on Flush or FinalizePartition.
func (zom *zookeeperOffsetManager) offsetCommitter() {
	var tickerChan <-chan time.Time
	if zom.config.CommitInterval != 0 {
		commitTicker := time.NewTicker(zom.config.CommitInterval)
		tickerChan = commitTicker.C
		defer commitTicker.Stop()
	}

	for {
		select {
		case <-zom.closing:
			close(zom.closed)
			return
		case <-tickerChan:
			if err := zom.commitOffsets(); err != nil {
				zom.cg.errors <- err
			}
		case <-zom.flush:
			zom.flushErr <- zom.commitOffsets()
		}
	}
}
// commitOffsets commits the offsets of all tracked partitions, and returns
// the last error encountered, if any.
func (zom *zookeeperOffsetManager) commitOffsets() error {
	zom.l.RLock()
	defer zom.l.RUnlock()

	var returnErr error
	for topic, partitionOffsets := range zom.offsets {
		for partition, offsetTracker := range partitionOffsets {
			if err := zom.commitOffset(topic, partition, offsetTracker); err != nil {
				returnErr = err
			}
		}
	}

	return returnErr
}
// commitOffset commits the highest processed offset of a single partition to
// Zookeeper, if it is ahead of the last committed offset.
func (zom *zookeeperOffsetManager) commitOffset(topic string, partition int32, tracker *partitionOffsetTracker) error {
	err := tracker.commit(func(offset int64) error {
		if offset >= 0 {
			// Store offset+1: the value kept in Zookeeper is the next offset to consume.
			return zom.cg.group.CommitOffset(topic, partition, offset+1)
		}
		return nil
	})

	if err != nil {
		zom.cg.Logf("FAILED to commit offset %d for %s/%d!", tracker.highestProcessedOffset, topic, partition)
	} else if zom.config.VerboseLogging {
		zom.cg.Logf("Committed offset %d for %s/%d!", tracker.lastCommittedOffset, topic, partition)
	}

	return err
}
// markAsProcessed marks the provided offset as the highest processed offset if
// it's higher than any previous offset it has received.
func (pot *partitionOffsetTracker) markAsProcessed(offset int64) bool {
	pot.l.Lock()
	defer pot.l.Unlock()

	if offset > pot.highestProcessedOffset {
		pot.highestProcessedOffset = offset
		if pot.waitingForOffset == pot.highestProcessedOffset {
			close(pot.done)
		}
		return true
	}
	return false
}
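// Illustrative example: with asynchronous processing, offsets may be marked
// out of order. Given this sequence of calls on a fresh tracker,
//
//	pot.markAsProcessed(5) // true:  highestProcessedOffset = 5
//	pot.markAsProcessed(7) // true:  highestProcessedOffset = 7
//	pot.markAsProcessed(6) // false: 6 is not higher than 7, so it is ignored
//
// only the highest offset seen so far is ever committed.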
// commit calls the committer function if the highest processed offset is out
// of sync with the last committed offset.
func (pot *partitionOffsetTracker) commit(committer offsetCommitter) error {
	pot.l.Lock()
	defer pot.l.Unlock()

	if pot.highestProcessedOffset > pot.lastCommittedOffset {
		if err := committer(pot.highestProcessedOffset); err != nil {
			return err
		}
		pot.lastCommittedOffset = pot.highestProcessedOffset
	}
	return nil
}
// waitForOffset blocks until the given offset has been marked as processed, or
// until the timeout expires. It returns true if the offset was reached in time.
func (pot *partitionOffsetTracker) waitForOffset(offset int64, timeout time.Duration) bool {
	pot.l.Lock()
	if offset > pot.highestProcessedOffset {
		pot.waitingForOffset = offset
		pot.l.Unlock()

		select {
		case <-pot.done:
			return true
		case <-time.After(timeout):
			return false
		}
	}
	pot.l.Unlock()
	return true
}