balancer.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package cluster
  2. import (
  3. "math"
  4. "sort"
  5. "github.com/Shopify/sarama"
  6. )
  7. // NotificationType defines the type of notification
  8. type NotificationType uint8
  9. // String describes the notification type
  10. func (t NotificationType) String() string {
  11. switch t {
  12. case RebalanceStart:
  13. return "rebalance start"
  14. case RebalanceOK:
  15. return "rebalance OK"
  16. case RebalanceError:
  17. return "rebalance error"
  18. }
  19. return "unknown"
  20. }
  21. const (
  22. UnknownNotification NotificationType = iota
  23. RebalanceStart
  24. RebalanceOK
  25. RebalanceError
  26. )
  27. // Notification are state events emitted by the consumers on rebalance
  28. type Notification struct {
  29. // Type exposes the notification type
  30. Type NotificationType
  31. // Claimed contains topic/partitions that were claimed by this rebalance cycle
  32. Claimed map[string][]int32
  33. // Released contains topic/partitions that were released as part of this rebalance cycle
  34. Released map[string][]int32
  35. // Current are topic/partitions that are currently claimed to the consumer
  36. Current map[string][]int32
  37. }
  38. func newNotification(current map[string][]int32) *Notification {
  39. return &Notification{
  40. Type: RebalanceStart,
  41. Current: current,
  42. }
  43. }
  44. func (n *Notification) success(current map[string][]int32) *Notification {
  45. o := &Notification{
  46. Type: RebalanceOK,
  47. Claimed: make(map[string][]int32),
  48. Released: make(map[string][]int32),
  49. Current: current,
  50. }
  51. for topic, partitions := range current {
  52. o.Claimed[topic] = int32Slice(partitions).Diff(int32Slice(n.Current[topic]))
  53. }
  54. for topic, partitions := range n.Current {
  55. o.Released[topic] = int32Slice(partitions).Diff(int32Slice(current[topic]))
  56. }
  57. return o
  58. }
  59. // --------------------------------------------------------------------
  60. type topicInfo struct {
  61. Partitions []int32
  62. MemberIDs []string
  63. }
  64. func (info topicInfo) Perform(s Strategy) map[string][]int32 {
  65. if s == StrategyRoundRobin {
  66. return info.RoundRobin()
  67. }
  68. return info.Ranges()
  69. }
  70. func (info topicInfo) Ranges() map[string][]int32 {
  71. sort.Strings(info.MemberIDs)
  72. mlen := len(info.MemberIDs)
  73. plen := len(info.Partitions)
  74. res := make(map[string][]int32, mlen)
  75. for pos, memberID := range info.MemberIDs {
  76. n, i := float64(plen)/float64(mlen), float64(pos)
  77. min := int(math.Floor(i*n + 0.5))
  78. max := int(math.Floor((i+1)*n + 0.5))
  79. sub := info.Partitions[min:max]
  80. if len(sub) > 0 {
  81. res[memberID] = sub
  82. }
  83. }
  84. return res
  85. }
  86. func (info topicInfo) RoundRobin() map[string][]int32 {
  87. sort.Strings(info.MemberIDs)
  88. mlen := len(info.MemberIDs)
  89. res := make(map[string][]int32, mlen)
  90. for i, pnum := range info.Partitions {
  91. memberID := info.MemberIDs[i%mlen]
  92. res[memberID] = append(res[memberID], pnum)
  93. }
  94. return res
  95. }
  96. // --------------------------------------------------------------------
  97. type balancer struct {
  98. client sarama.Client
  99. topics map[string]topicInfo
  100. }
  101. func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) {
  102. balancer := newBalancer(client)
  103. for memberID, meta := range members {
  104. for _, topic := range meta.Topics {
  105. if err := balancer.Topic(topic, memberID); err != nil {
  106. return nil, err
  107. }
  108. }
  109. }
  110. return balancer, nil
  111. }
  112. func newBalancer(client sarama.Client) *balancer {
  113. return &balancer{
  114. client: client,
  115. topics: make(map[string]topicInfo),
  116. }
  117. }
  118. func (r *balancer) Topic(name string, memberID string) error {
  119. topic, ok := r.topics[name]
  120. if !ok {
  121. nums, err := r.client.Partitions(name)
  122. if err != nil {
  123. return err
  124. }
  125. topic = topicInfo{
  126. Partitions: nums,
  127. MemberIDs: make([]string, 0, 1),
  128. }
  129. }
  130. topic.MemberIDs = append(topic.MemberIDs, memberID)
  131. r.topics[name] = topic
  132. return nil
  133. }
  134. func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 {
  135. if r == nil {
  136. return nil
  137. }
  138. res := make(map[string]map[string][]int32, 1)
  139. for topic, info := range r.topics {
  140. for memberID, partitions := range info.Perform(s) {
  141. if _, ok := res[memberID]; !ok {
  142. res[memberID] = make(map[string][]int32, 1)
  143. }
  144. res[memberID][topic] = partitions
  145. }
  146. }
  147. return res
  148. }