package consumergroup

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// OffsetManager is the main interface the consumergroup uses to manage and
// commit the partition offsets it has consumed.
type OffsetManager interface {

	// InitializePartition is called when the consumergroup starts consuming a
	// partition. It should return the last processed offset for this partition. Note:
	// the same partition can be initialized multiple times during a single run of a
	// consumer group, due to other consumer instances coming online and offline.
	InitializePartition(topic string, partition int32) (int64, error)

	// MarkAsProcessed tells the offset manager that a certain message has been successfully
	// processed by the consumer, and should be committed. The implementation does not have
	// to store this offset right away, but should return true if it intends to do so at
	// some point.
	//
	// Offsets should generally be increasing if the consumer processes events serially,
	// but this cannot be guaranteed if the consumer does any asynchronous processing.
	// This can be handled in various ways, e.g. by only accepting offsets that are
	// higher than the offsets seen before for the same partition.
	MarkAsProcessed(topic string, partition int32, offset int64) bool

	// Flush tells the offset manager to immediately commit offsets synchronously and to
	// return any errors that may have occurred during the process.
	Flush() error

	// FinalizePartition is called when the consumergroup is done consuming a
	// partition. In this method, the offset manager can flush any remaining offsets to its
	// backend store. It should return an error if it was not able to commit the offsets.
	// Note: it's possible that the consumergroup instance will start to consume the same
	// partition again after this function is called.
	FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error

	// Close is called when the consumergroup is shutting down. In normal circumstances, all
	// offsets are committed because FinalizePartition is called for all the running partition
	// consumers. Implementations may want to verify this, and try to commit any outstanding
	// offsets. If that doesn't succeed, Close should return an error.
	Close() error
}
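
// The sketch below is purely illustrative and not part of the package API: it
// shows, under assumed names (an "events" topic, partition 0, and a hypothetical
// channel of processed offsets), how a partition consumer is expected to drive
// an OffsetManager: initialize, mark processed messages, and finalize.
func exampleOffsetManagerUsage(om OffsetManager, processedOffsets <-chan int64) error {
	// Resume from the offset stored by a previous run.
	nextOffset, err := om.InitializePartition("events", 0)
	if err != nil {
		return err
	}
	_ = nextOffset // a real consumer would start fetching from nextOffset

	var lastOffset int64 = -1
	for offset := range processedOffsets {
		// Record each successfully processed message so it becomes eligible for commit.
		om.MarkAsProcessed("events", 0, offset)
		lastOffset = offset
	}

	// Give the manager up to 10 seconds to commit everything up to lastOffset,
	// then release the partition.
	return om.FinalizePartition("events", 0, lastOffset, 10*time.Second)
}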

var (
	// UncleanClose is returned by Close if not all offsets could be committed
	// before the shutdown completed.
	UncleanClose = errors.New("Not all offsets were committed before shutdown was completed")
)

// OffsetManagerConfig holds configuration settings on how the offset manager should behave.
type OffsetManagerConfig struct {
	CommitInterval time.Duration // Interval between offset flushes to the backend store.
	VerboseLogging bool          // Whether to enable verbose logging.
}

// NewOffsetManagerConfig returns a new OffsetManagerConfig with sane defaults.
func NewOffsetManagerConfig() *OffsetManagerConfig {
	return &OffsetManagerConfig{
		CommitInterval: 10 * time.Second,
	}
}

type (
	topicOffsets    map[int32]*partitionOffsetTracker
	offsetsMap      map[string]topicOffsets
	offsetCommitter func(int64) error
)

// partitionOffsetTracker tracks the highest processed and last committed
// offset for a single partition, and lets FinalizePartition wait until a
// given offset has been processed.
type partitionOffsetTracker struct {
	l                      sync.Mutex
	waitingForOffset       int64
	highestProcessedOffset int64
	lastCommittedOffset    int64
	done                   chan struct{}
}

// zookeeperOffsetManager is the OffsetManager implementation that stores
// offsets in Zookeeper, using the consumer group's Zookeeper connection.
type zookeeperOffsetManager struct {
	config  *OffsetManagerConfig
	l       sync.RWMutex
	offsets offsetsMap
	cg      *ConsumerGroup

	closing, closed, flush chan struct{}
	flushErr               chan error
}

// NewZookeeperOffsetManager returns an offset manager that uses Zookeeper
// to store offsets.
func NewZookeeperOffsetManager(cg *ConsumerGroup, config *OffsetManagerConfig) OffsetManager {
	if config == nil {
		config = NewOffsetManagerConfig()
	}

	zom := &zookeeperOffsetManager{
		config:   config,
		cg:       cg,
		offsets:  make(offsetsMap),
		closing:  make(chan struct{}),
		closed:   make(chan struct{}),
		flush:    make(chan struct{}),
		flushErr: make(chan error),
	}

	// Periodically commit offsets in the background until Close is called.
	go zom.offsetCommitter()

	return zom
}
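
// A hypothetical construction sketch (not part of the original code): it shows
// how a caller with a *ConsumerGroup could override the default configuration
// before creating the Zookeeper-backed offset manager. The 2-second interval
// is an arbitrary example value.
func exampleCustomOffsetManager(cg *ConsumerGroup) OffsetManager {
	config := NewOffsetManagerConfig()
	config.CommitInterval = 2 * time.Second // commit more often than the 10s default
	config.VerboseLogging = true            // log every successful commit
	return NewZookeeperOffsetManager(cg, config)
}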

func (zom *zookeeperOffsetManager) InitializePartition(topic string, partition int32) (int64, error) {
	zom.l.Lock()
	defer zom.l.Unlock()

	if zom.offsets[topic] == nil {
		zom.offsets[topic] = make(topicOffsets)
	}

	nextOffset, err := zom.cg.group.FetchOffset(topic, partition)
	if err != nil {
		return 0, err
	}

	zom.offsets[topic][partition] = &partitionOffsetTracker{
		highestProcessedOffset: nextOffset - 1,
		lastCommittedOffset:    nextOffset - 1,
		// Use -1 as the "not waiting yet" sentinel so that marking offset 0 as
		// processed cannot spuriously close the done channel before
		// FinalizePartition starts waiting for it.
		waitingForOffset: -1,
		done:             make(chan struct{}),
	}

	return nextOffset, nil
}

func (zom *zookeeperOffsetManager) FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error {
	zom.l.RLock()
	tracker := zom.offsets[topic][partition]
	zom.l.RUnlock()

	if lastOffset >= 0 {
		if lastOffset > tracker.highestProcessedOffset {
			zom.cg.Logf("%s/%d :: Last processed offset: %d. Waiting up to %ds for another %d messages to process...", topic, partition, tracker.highestProcessedOffset, timeout/time.Second, lastOffset-tracker.highestProcessedOffset)
			if !tracker.waitForOffset(lastOffset, timeout) {
				return fmt.Errorf("TIMEOUT waiting for offset %d. Last committed offset: %d", lastOffset, tracker.lastCommittedOffset)
			}
		}

		if err := zom.commitOffset(topic, partition, tracker); err != nil {
			return fmt.Errorf("FAILED to commit offset %d to Zookeeper (last committed offset: %d): %s", tracker.highestProcessedOffset, tracker.lastCommittedOffset, err)
		}
	}

	zom.l.Lock()
	delete(zom.offsets[topic], partition)
	zom.l.Unlock()

	return nil
}

func (zom *zookeeperOffsetManager) MarkAsProcessed(topic string, partition int32, offset int64) bool {
	zom.l.RLock()
	defer zom.l.RUnlock()

	if p, ok := zom.offsets[topic][partition]; ok {
		return p.markAsProcessed(offset)
	} else {
		return false
	}
}

func (zom *zookeeperOffsetManager) Flush() error {
	zom.flush <- struct{}{}
	return <-zom.flushErr
}

func (zom *zookeeperOffsetManager) Close() error {
	close(zom.closing)
	<-zom.closed

	zom.l.Lock()
	defer zom.l.Unlock()

	var closeError error
	for _, partitionOffsets := range zom.offsets {
		if len(partitionOffsets) > 0 {
			closeError = UncleanClose
		}
	}

	return closeError
}
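
// A hypothetical shutdown sketch (not part of the original code): it shows how
// a caller might distinguish an unclean close, where some offsets were never
// committed and messages may be reprocessed after a restart, from other errors.
func exampleShutdown(om OffsetManager) {
	switch err := om.Close(); err {
	case nil:
		// All offsets were committed.
	case UncleanClose:
		fmt.Println("warning: not all offsets were committed before shutdown")
	default:
		fmt.Println("error closing offset manager:", err)
	}
}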

// offsetCommitter runs in its own goroutine. It commits offsets on every tick
// of the commit interval and on every Flush request, and stops when the offset
// manager is closing.
func (zom *zookeeperOffsetManager) offsetCommitter() {
	var tickerChan <-chan time.Time
	if zom.config.CommitInterval != 0 {
		commitTicker := time.NewTicker(zom.config.CommitInterval)
		tickerChan = commitTicker.C
		defer commitTicker.Stop()
	}

	for {
		select {
		case <-zom.closing:
			close(zom.closed)
			return
		case <-tickerChan:
			if err := zom.commitOffsets(); err != nil {
				zom.cg.errors <- err
			}
		case <-zom.flush:
			zom.flushErr <- zom.commitOffsets()
		}
	}
}

func (zom *zookeeperOffsetManager) commitOffsets() error {
	zom.l.RLock()
	defer zom.l.RUnlock()

	var returnErr error
	for topic, partitionOffsets := range zom.offsets {
		for partition, offsetTracker := range partitionOffsets {
			if err := zom.commitOffset(topic, partition, offsetTracker); err != nil {
				returnErr = err
			}
		}
	}

	return returnErr
}

func (zom *zookeeperOffsetManager) commitOffset(topic string, partition int32, tracker *partitionOffsetTracker) error {
	err := tracker.commit(func(offset int64) error {
		if offset >= 0 {
			// Store the next offset to be consumed, which is one past the
			// highest processed offset.
			return zom.cg.group.CommitOffset(topic, partition, offset+1)
		} else {
			return nil
		}
	})

	if err != nil {
		zom.cg.Logf("FAILED to commit offset %d for %s/%d!", tracker.highestProcessedOffset, topic, partition)
	} else if zom.config.VerboseLogging {
		zom.cg.Logf("Committed offset %d for %s/%d!", tracker.lastCommittedOffset, topic, partition)
	}

	return err
}

// markAsProcessed marks the provided offset as the highest processed offset
// if it's higher than any previous offset it has received.
func (pot *partitionOffsetTracker) markAsProcessed(offset int64) bool {
	pot.l.Lock()
	defer pot.l.Unlock()

	if offset > pot.highestProcessedOffset {
		pot.highestProcessedOffset = offset
		// Signal a waiting FinalizePartition call once the offset it is
		// waiting for has been processed.
		if pot.waitingForOffset == pot.highestProcessedOffset {
			close(pot.done)
		}
		return true
	} else {
		return false
	}
}

// commit calls the committer function if the highest processed offset is out
// of sync with the last committed offset.
func (pot *partitionOffsetTracker) commit(committer offsetCommitter) error {
	pot.l.Lock()
	defer pot.l.Unlock()

	if pot.highestProcessedOffset > pot.lastCommittedOffset {
		if err := committer(pot.highestProcessedOffset); err != nil {
			return err
		}
		pot.lastCommittedOffset = pot.highestProcessedOffset
		return nil
	} else {
		return nil
	}
}

// waitForOffset blocks until the given offset has been marked as processed,
// or until the timeout expires. It returns true if the offset was reached.
func (pot *partitionOffsetTracker) waitForOffset(offset int64, timeout time.Duration) bool {
	pot.l.Lock()
	if offset > pot.highestProcessedOffset {
		pot.waitingForOffset = offset
		pot.l.Unlock()
		select {
		case <-pot.done:
			return true
		case <-time.After(timeout):
			return false
		}
	} else {
		pot.l.Unlock()
		return true
	}
}