offsets.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. package cluster
  2. import (
  3. "sync"
  4. "github.com/Shopify/sarama"
  5. )
  6. // OffsetStash allows to accumulate offsets and
  7. // mark them as processed in a bulk
  8. type OffsetStash struct {
  9. offsets map[topicPartition]offsetInfo
  10. mu sync.Mutex
  11. }
  12. // NewOffsetStash inits a blank stash
  13. func NewOffsetStash() *OffsetStash {
  14. return &OffsetStash{offsets: make(map[topicPartition]offsetInfo)}
  15. }
  16. // MarkOffset stashes the provided message offset
  17. func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
  18. s.MarkPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
  19. }
  20. // MarkPartitionOffset stashes the offset for the provided topic/partition combination
  21. func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
  22. s.mu.Lock()
  23. defer s.mu.Unlock()
  24. key := topicPartition{Topic: topic, Partition: partition}
  25. if info := s.offsets[key]; offset >= info.Offset {
  26. info.Offset = offset
  27. info.Metadata = metadata
  28. s.offsets[key] = info
  29. }
  30. }
  31. // Offsets returns the latest stashed offsets by topic-partition
  32. func (s *OffsetStash) Offsets() map[string]int64 {
  33. s.mu.Lock()
  34. defer s.mu.Unlock()
  35. res := make(map[string]int64, len(s.offsets))
  36. for tp, info := range s.offsets {
  37. res[tp.String()] = info.Offset
  38. }
  39. return res
  40. }