// partitions.go

package cluster

import (
	"sort"
	"sync"
	"time"

	"github.com/Shopify/sarama"
)

// PartitionConsumer allows code to consume individual partitions from the cluster.
//
// See docs for Consumer.Partitions() for more on how to implement this.
// A usage sketch follows the interface definition below.
type PartitionConsumer interface {
	// Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown, drain
	// the Messages channel, harvest any errors & return them to the caller and trigger a rebalance.
	Close() error

	// Messages returns the read channel for the messages that are returned by
	// the broker.
	Messages() <-chan *sarama.ConsumerMessage

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64

	// Topic returns the consumed topic name
	Topic() string

	// Partition returns the consumed partition
	Partition() int32
}
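
// The function below is a minimal usage sketch, not part of the original file.
// It shows how PartitionConsumers delivered by Consumer.Partitions() are
// typically drained, one goroutine per partition. It assumes the package's
// Consumer, NewConsumer, NewConfig and ConsumerModePartitions, which are
// defined elsewhere in the package, not in partitions.go.
func examplePartitionConsumption(brokers []string, group string, topics []string) error {
	config := NewConfig()
	config.Group.Mode = ConsumerModePartitions // one PartitionConsumer per claimed partition

	consumer, err := NewConsumer(brokers, group, topics, config)
	if err != nil {
		return err
	}
	defer consumer.Close()

	for pc := range consumer.Partitions() {
		go func(pc PartitionConsumer) {
			for msg := range pc.Messages() {
				// Approximate lag for this partition: the high water mark is the
				// offset of the next message that will be produced.
				_ = pc.HighWaterMarkOffset() - msg.Offset - 1

				consumer.MarkOffset(msg, "") // mark message as processed
			}
		}(pc)
	}
	return nil
}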

// partitionConsumer wraps a sarama.PartitionConsumer and tracks the offset
// state of the partition it consumes.
type partitionConsumer struct {
	sarama.PartitionConsumer

	state partitionState
	mu    sync.Mutex

	topic     string
	partition int32

	once        sync.Once
	dying, dead chan none
}

// newPartitionConsumer starts consuming topic/partition at the next stored
// offset, falling back to defaultOffset if the stored offset is out of range.
func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32, info offsetInfo, defaultOffset int64) (*partitionConsumer, error) {
	pcm, err := manager.ConsumePartition(topic, partition, info.NextOffset(defaultOffset))

	// Resume from default offset, if requested offset is out-of-range
	if err == sarama.ErrOffsetOutOfRange {
		info.Offset = -1
		pcm, err = manager.ConsumePartition(topic, partition, defaultOffset)
	}
	if err != nil {
		return nil, err
	}

	return &partitionConsumer{
		PartitionConsumer: pcm,
		state:             partitionState{Info: info},

		topic:     topic,
		partition: partition,

		dying: make(chan none),
		dead:  make(chan none),
	}, nil
}

// Topic implements PartitionConsumer
func (c *partitionConsumer) Topic() string { return c.topic }

// Partition implements PartitionConsumer
func (c *partitionConsumer) Partition() int32 { return c.partition }

// WaitFor forwards errors from the underlying consumer to the shared errors
// channel until the stopper or dying channel is closed, then closes dead.
func (c *partitionConsumer) WaitFor(stopper <-chan none, errors chan<- error) {
	defer close(c.dead)

	for {
		select {
		case err, ok := <-c.Errors():
			if !ok {
				return
			}
			select {
			case errors <- err:
			case <-stopper:
				return
			case <-c.dying:
				return
			}
		case <-stopper:
			return
		case <-c.dying:
			return
		}
	}
}

// Multiplex forwards both messages and errors from the underlying consumer to
// the shared channels until the stopper or dying channel is closed, then
// closes dead.
func (c *partitionConsumer) Multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) {
	defer close(c.dead)

	for {
		select {
		case msg, ok := <-c.Messages():
			if !ok {
				return
			}
			select {
			case messages <- msg:
			case <-stopper:
				return
			case <-c.dying:
				return
			}
		case err, ok := <-c.Errors():
			if !ok {
				return
			}
			select {
			case errors <- err:
			case <-stopper:
				return
			case <-c.dying:
				return
			}
		case <-stopper:
			return
		case <-c.dying:
			return
		}
	}
}

// Close shuts down the underlying consumer exactly once and waits for the
// forwarding loop to exit.
func (c *partitionConsumer) Close() (err error) {
	c.once.Do(func() {
		err = c.PartitionConsumer.Close()
		close(c.dying)
	})
	<-c.dead
	return err
}

// State returns a snapshot of the partition's offset state. It is safe to
// call on a nil receiver.
func (c *partitionConsumer) State() partitionState {
	if c == nil {
		return partitionState{}
	}

	c.mu.Lock()
	state := c.state
	c.mu.Unlock()

	return state
}

// MarkCommitted clears the dirty flag if the committed offset still matches
// the currently marked offset. It is safe to call on a nil receiver.
func (c *partitionConsumer) MarkCommitted(offset int64) {
	if c == nil {
		return
	}

	c.mu.Lock()
	if offset == c.state.Info.Offset {
		c.state.Dirty = false
	}
	c.mu.Unlock()
}

// MarkOffset records the offset and metadata if the offset is ahead of the
// currently marked one, and flags the state as dirty. It is safe to call on
// a nil receiver.
func (c *partitionConsumer) MarkOffset(offset int64, metadata string) {
	if c == nil {
		return
	}

	c.mu.Lock()
	if offset > c.state.Info.Offset {
		c.state.Info.Offset = offset
		c.state.Info.Metadata = metadata
		c.state.Dirty = true
	}
	c.mu.Unlock()
}

// --------------------------------------------------------------------

// partitionState captures the marked offset for a partition, whether that
// mark still needs to be committed (Dirty) and when it was last committed.
type partitionState struct {
	Info       offsetInfo
	Dirty      bool
	LastCommit time.Time
}

// --------------------------------------------------------------------

// partitionMap is a thread-safe registry of partitionConsumers keyed by
// topic/partition.
type partitionMap struct {
	data map[topicPartition]*partitionConsumer
	mu   sync.RWMutex
}

func newPartitionMap() *partitionMap {
	return &partitionMap{
		data: make(map[topicPartition]*partitionConsumer),
	}
}

// IsSubscribedTo reports whether any partition of the given topic is held.
func (m *partitionMap) IsSubscribedTo(topic string) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()

	for tp := range m.data {
		if tp.Topic == topic {
			return true
		}
	}
	return false
}

// Fetch returns the consumer for the topic/partition, or nil if none is held.
func (m *partitionMap) Fetch(topic string, partition int32) *partitionConsumer {
	m.mu.RLock()
	pc := m.data[topicPartition{topic, partition}]
	m.mu.RUnlock()
	return pc
}

// Store registers a consumer for the topic/partition.
func (m *partitionMap) Store(topic string, partition int32, pc *partitionConsumer) {
	m.mu.Lock()
	m.data[topicPartition{topic, partition}] = pc
	m.mu.Unlock()
}

// Snapshot returns a copy of the current offset state of every held partition.
func (m *partitionMap) Snapshot() map[topicPartition]partitionState {
	m.mu.RLock()
	defer m.mu.RUnlock()

	snap := make(map[topicPartition]partitionState, len(m.data))
	for tp, pc := range m.data {
		snap[tp] = pc.State()
	}
	return snap
}

// Stop closes all held consumers concurrently and waits for them to finish.
func (m *partitionMap) Stop() {
	m.mu.RLock()
	defer m.mu.RUnlock()

	var wg sync.WaitGroup
	for tp := range m.data {
		wg.Add(1)
		go func(p *partitionConsumer) {
			_ = p.Close()
			wg.Done()
		}(m.data[tp])
	}
	wg.Wait()
}

// Clear removes all entries from the map.
func (m *partitionMap) Clear() {
	m.mu.Lock()
	for tp := range m.data {
		delete(m.data, tp)
	}
	m.mu.Unlock()
}

// Info returns the currently held partition numbers, sorted, grouped by topic.
func (m *partitionMap) Info() map[string][]int32 {
	info := make(map[string][]int32)
	m.mu.RLock()
	for tp := range m.data {
		info[tp.Topic] = append(info[tp.Topic], tp.Partition)
	}
	m.mu.RUnlock()

	for topic := range info {
		sort.Sort(int32Slice(info[topic]))
	}
	return info
}