utils.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package consumergroup
  2. import (
  3. "crypto/rand"
  4. "fmt"
  5. "io"
  6. "os"
  7. "sort"
  8. "github.com/wvanbergen/kazoo-go"
  9. )
  10. func retrievePartitionLeaders(partitions kazoo.PartitionList) (partitionLeaders, error) {
  11. pls := make(partitionLeaders, 0, len(partitions))
  12. for _, partition := range partitions {
  13. leader, err := partition.Leader()
  14. if err != nil {
  15. return nil, err
  16. }
  17. pl := partitionLeader{id: partition.ID, leader: leader, partition: partition}
  18. pls = append(pls, pl)
  19. }
  20. return pls, nil
  21. }
  22. // Divides a set of partitions between a set of consumers.
  23. func dividePartitionsBetweenConsumers(consumers kazoo.ConsumergroupInstanceList, partitions partitionLeaders) map[string][]*kazoo.Partition {
  24. result := make(map[string][]*kazoo.Partition)
  25. plen := len(partitions)
  26. clen := len(consumers)
  27. if clen == 0 {
  28. return result
  29. }
  30. sort.Sort(partitions)
  31. sort.Sort(consumers)
  32. n := plen / clen
  33. m := plen % clen
  34. p := 0
  35. for i, consumer := range consumers {
  36. first := p
  37. last := first + n
  38. if m > 0 && i < m {
  39. last++
  40. }
  41. if last > plen {
  42. last = plen
  43. }
  44. for _, pl := range partitions[first:last] {
  45. result[consumer.ID] = append(result[consumer.ID], pl.partition)
  46. }
  47. p = last
  48. }
  49. return result
  50. }
  51. type partitionLeader struct {
  52. id int32
  53. leader int32
  54. partition *kazoo.Partition
  55. }
  56. // A sortable slice of PartitionLeader structs
  57. type partitionLeaders []partitionLeader
  58. func (pls partitionLeaders) Len() int {
  59. return len(pls)
  60. }
  61. func (pls partitionLeaders) Less(i, j int) bool {
  62. return pls[i].leader < pls[j].leader || (pls[i].leader == pls[j].leader && pls[i].id < pls[j].id)
  63. }
  64. func (s partitionLeaders) Swap(i, j int) {
  65. s[i], s[j] = s[j], s[i]
  66. }
  // generateUUID returns a random (version 4) UUID in the canonical lowercase
  // 8-4-4-4-12 hexadecimal form, e.g. "3f2a9c1e-5b7d-4e21-9a4c-0d6f8b2e1c7a".
  //
  // The 16 bytes are read from crypto/rand. If that read fails, an empty string
  // and the read error are returned.
  func generateUUID() (string, error) {
  	var raw [16]byte
  	// io.ReadFull reports a nil error only when the whole buffer was filled.
  	if _, err := io.ReadFull(rand.Reader, raw[:]); err != nil {
  		return "", err
  	}

  	// Version 4 (random): high nibble of byte 6 is 0100; RFC 4122 section 4.1.3.
  	raw[6] = (raw[6] & 0x0f) | 0x40
  	// Variant: top two bits of byte 8 are 10; RFC 4122 section 4.1.1.
  	raw[8] = (raw[8] & 0x3f) | 0x80

  	return fmt.Sprintf("%x-%x-%x-%x-%x", raw[0:4], raw[4:6], raw[6:8], raw[8:10], raw[10:]), nil
  }
  79. func generateConsumerID() (consumerID string, err error) {
  80. var uuid, hostname string
  81. uuid, err = generateUUID()
  82. if err != nil {
  83. return
  84. }
  85. hostname, err = os.Hostname()
  86. if err != nil {
  87. return
  88. }
  89. consumerID = fmt.Sprintf("%s:%s", hostname, uuid)
  90. return
  91. }