kazoo.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. package kazoo
  import (
  	"encoding/json"
  	"errors"
  	"fmt"
  	"net"
  	"path"
  	"strconv"
  	"strings"
  	"time"

  	"github.com/samuel/go-zookeeper/zk"
  )
  // FailedToClaimPartition is a sentinel error whose message states that a
  // partition could not be claimed for this consumer instance.
  var FailedToClaimPartition = errors.New("Failed to claim partition for this consumer instance. Do you have a rogue consumer running?")
  // ParseConnectionString parses a zookeeper connection string in the form of
  // host1:2181,host2:2181/chroot and returns the list of servers, and the chroot.
  //
  // Everything before the first "/" is treated as the comma-separated server
  // list; whitespace around each server entry is removed. Everything after the
  // first "/" is the chroot, returned with a leading "/". Surrounding whitespace
  // and trailing slashes are stripped from the chroot, so "host:2181/" yields an
  // empty chroot and "host:2181/kafka/" yields "/kafka". When the connection
  // string contains no "/", the chroot is empty.
  func ParseConnectionString(zookeeper string) (nodes []string, chroot string) {
  	nodesAndChroot := strings.SplitN(zookeeper, "/", 2)
  	if len(nodesAndChroot) == 2 {
  		// A chroot ending in "/" would later be formatted into paths such as
  		// "//brokers/ids", so normalize it here; a bare "/" means "no chroot".
  		trimmed := strings.TrimRight(strings.TrimSpace(nodesAndChroot[1]), "/")
  		if trimmed != "" {
  			chroot = fmt.Sprintf("/%s", trimmed)
  		}
  	}

  	nodes = strings.Split(nodesAndChroot[0], ",")
  	for i := range nodes {
  		// Tolerate "host1:2181, host2:2181" style lists.
  		nodes[i] = strings.TrimSpace(nodes[i])
  	}
  	return nodes, chroot
  }
  // BuildConnectionString joins the given Zookeeper nodes with commas.
  // For example, []string{"zk1:2181", "zk2:2181", "zk3:2181"} becomes
  // "zk1:2181,zk2:2181,zk3:2181".
  func BuildConnectionString(nodes []string) string {
  	const separator = ","
  	connectionString := strings.Join(nodes, separator)
  	return connectionString
  }
  // BuildConnectionStringWithChroot joins the given Zookeeper nodes with commas
  // and appends the chroot verbatim. The chroot should start with "/"; no
  // separator is inserted between the node list and the chroot.
  // Returns a string like "zk1:2181,zk2:2181,zk3:2181/chroot"
  func BuildConnectionStringWithChroot(nodes []string, chroot string) string {
  	servers := strings.Join(nodes, ",")
  	return fmt.Sprintf("%s%s", servers, chroot)
  }
  36. // Kazoo interacts with the Kafka metadata in Zookeeper
  37. type Kazoo struct {
  38. conn *zk.Conn
  39. conf *Config
  40. }
  // Config holds the configuration values for a Kazoo instance.
  type Config struct {
  	// Chroot is the Zookeeper path prefix the Kafka installation is
  	// registered under. Defaults to "".
  	Chroot string

  	// Timeout is the amount of time the Zookeeper client can be disconnected
  	// from the Zookeeper cluster before the cluster will get rid of watches
  	// and ephemeral nodes. NewConfig sets it to 1 second.
  	Timeout time.Duration
  }
  49. // NewConfig instantiates a new Config struct with sane defaults.
  50. func NewConfig() *Config {
  51. return &Config{Timeout: 1 * time.Second}
  52. }
  53. // NewKazoo creates a new connection instance
  54. func NewKazoo(servers []string, conf *Config) (*Kazoo, error) {
  55. if conf == nil {
  56. conf = NewConfig()
  57. }
  58. conn, _, err := zk.Connect(servers, conf.Timeout)
  59. if err != nil {
  60. return nil, err
  61. }
  62. return &Kazoo{conn, conf}, nil
  63. }
  64. // NewKazooFromConnectionString creates a new connection instance
  65. // based on a zookeeer connection string that can include a chroot.
  66. func NewKazooFromConnectionString(connectionString string, conf *Config) (*Kazoo, error) {
  67. if conf == nil {
  68. conf = NewConfig()
  69. }
  70. nodes, chroot := ParseConnectionString(connectionString)
  71. conf.Chroot = chroot
  72. return NewKazoo(nodes, conf)
  73. }
  74. // Brokers returns a map of all the brokers that make part of the
  75. // Kafka cluster that is registered in Zookeeper.
  76. func (kz *Kazoo) Brokers() (map[int32]string, error) {
  77. root := fmt.Sprintf("%s/brokers/ids", kz.conf.Chroot)
  78. children, _, err := kz.conn.Children(root)
  79. if err != nil {
  80. return nil, err
  81. }
  82. type brokerEntry struct {
  83. Host string `json:"host"`
  84. Port int `json:"port"`
  85. }
  86. result := make(map[int32]string)
  87. for _, child := range children {
  88. brokerID, err := strconv.ParseInt(child, 10, 32)
  89. if err != nil {
  90. return nil, err
  91. }
  92. value, _, err := kz.conn.Get(path.Join(root, child))
  93. if err != nil {
  94. return nil, err
  95. }
  96. var brokerNode brokerEntry
  97. if err := json.Unmarshal(value, &brokerNode); err != nil {
  98. return nil, err
  99. }
  100. result[int32(brokerID)] = fmt.Sprintf("%s:%d", brokerNode.Host, brokerNode.Port)
  101. }
  102. return result, nil
  103. }
  104. // BrokerList returns a slice of broker addresses that can be used to connect to
  105. // the Kafka cluster, e.g. using `sarama.NewAsyncProducer()`.
  106. func (kz *Kazoo) BrokerList() ([]string, error) {
  107. brokers, err := kz.Brokers()
  108. if err != nil {
  109. return nil, err
  110. }
  111. result := make([]string, 0, len(brokers))
  112. for _, broker := range brokers {
  113. result = append(result, broker)
  114. }
  115. return result, nil
  116. }
  117. // Controller returns what broker is currently acting as controller of the Kafka cluster
  118. func (kz *Kazoo) Controller() (int32, error) {
  119. type controllerEntry struct {
  120. BrokerID int32 `json:"brokerid"`
  121. }
  122. node := fmt.Sprintf("%s/controller", kz.conf.Chroot)
  123. data, _, err := kz.conn.Get(node)
  124. if err != nil {
  125. return -1, err
  126. }
  127. var controllerNode controllerEntry
  128. if err := json.Unmarshal(data, &controllerNode); err != nil {
  129. return -1, err
  130. }
  131. return controllerNode.BrokerID, nil
  132. }
  133. // Close closes the connection with the Zookeeper cluster
  134. func (kz *Kazoo) Close() error {
  135. kz.conn.Close()
  136. return nil
  137. }
  138. ////////////////////////////////////////////////////////////////////////
  139. // Util methods
  140. ////////////////////////////////////////////////////////////////////////
  141. // Exists checks existence of a node
  142. func (kz *Kazoo) exists(node string) (ok bool, err error) {
  143. ok, _, err = kz.conn.Exists(node)
  144. return
  145. }
  146. // DeleteAll deletes a node recursively
  147. func (kz *Kazoo) deleteRecursive(node string) (err error) {
  148. children, stat, err := kz.conn.Children(node)
  149. if err == zk.ErrNoNode {
  150. return nil
  151. } else if err != nil {
  152. return
  153. }
  154. for _, child := range children {
  155. if err = kz.deleteRecursive(path.Join(node, child)); err != nil {
  156. return
  157. }
  158. }
  159. return kz.conn.Delete(node, stat.Version)
  160. }
  161. // MkdirAll creates a directory recursively
  162. func (kz *Kazoo) mkdirRecursive(node string) (err error) {
  163. parent := path.Dir(node)
  164. if parent != "/" {
  165. if err = kz.mkdirRecursive(parent); err != nil {
  166. return
  167. }
  168. }
  169. _, err = kz.conn.Create(node, nil, 0, zk.WorldACL(zk.PermAll))
  170. if err == zk.ErrNodeExists {
  171. err = nil
  172. }
  173. return
  174. }
  175. // Create stores a new value at node. Fails if already set.
  176. func (kz *Kazoo) create(node string, value []byte, ephemeral bool) (err error) {
  177. if err = kz.mkdirRecursive(path.Dir(node)); err != nil {
  178. return
  179. }
  180. flags := int32(0)
  181. if ephemeral {
  182. flags = zk.FlagEphemeral
  183. }
  184. _, err = kz.conn.Create(node, value, flags, zk.WorldACL(zk.PermAll))
  185. return
  186. }