123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- package kazoo
- import (
- "encoding/json"
- "errors"
- "fmt"
- "path"
- "strconv"
- "strings"
- "time"
- "github.com/samuel/go-zookeeper/zk"
- )
- var (
- FailedToClaimPartition = errors.New("Failed to claim partition for this consumer instance. Do you have a rogue consumer running?")
- )
- // ParseConnectionString parses a zookeeper connection string in the form of
- // host1:2181,host2:2181/chroot and returns the list of servers, and the chroot.
- func ParseConnectionString(zookeeper string) (nodes []string, chroot string) {
- nodesAndChroot := strings.SplitN(zookeeper, "/", 2)
- if len(nodesAndChroot) == 2 {
- chroot = fmt.Sprintf("/%s", nodesAndChroot[1])
- }
- nodes = strings.Split(nodesAndChroot[0], ",")
- return
- }
- // BuildConnectionString builds a Zookeeper connection string for a list of nodes.
- // Returns a string like "zk1:2181,zk2:2181,zk3:2181"
- func BuildConnectionString(nodes []string) string {
- return strings.Join(nodes, ",")
- }
- // ConnectionStringWithChroot builds a Zookeeper connection string for a list
- // of nodes and a chroot. The chroot should start with "/".
- // Returns a string like "zk1:2181,zk2:2181,zk3:2181/chroot"
- func BuildConnectionStringWithChroot(nodes []string, chroot string) string {
- return fmt.Sprintf("%s%s", strings.Join(nodes, ","), chroot)
- }
- // Kazoo interacts with the Kafka metadata in Zookeeper
- type Kazoo struct {
- conn *zk.Conn
- conf *Config
- }
- // Config holds configuration values f.
- type Config struct {
- // The chroot the Kafka installation is registerde under. Defaults to "".
- Chroot string
- // The amount of time the Zookeeper client can be disconnected from the Zookeeper cluster
- // before the cluster will get rid of watches and ephemeral nodes. Defaults to 1 second.
- Timeout time.Duration
- }
- // NewConfig instantiates a new Config struct with sane defaults.
- func NewConfig() *Config {
- return &Config{Timeout: 1 * time.Second}
- }
- // NewKazoo creates a new connection instance
- func NewKazoo(servers []string, conf *Config) (*Kazoo, error) {
- if conf == nil {
- conf = NewConfig()
- }
- conn, _, err := zk.Connect(servers, conf.Timeout)
- if err != nil {
- return nil, err
- }
- return &Kazoo{conn, conf}, nil
- }
- // NewKazooFromConnectionString creates a new connection instance
- // based on a zookeeer connection string that can include a chroot.
- func NewKazooFromConnectionString(connectionString string, conf *Config) (*Kazoo, error) {
- if conf == nil {
- conf = NewConfig()
- }
- nodes, chroot := ParseConnectionString(connectionString)
- conf.Chroot = chroot
- return NewKazoo(nodes, conf)
- }
- // Brokers returns a map of all the brokers that make part of the
- // Kafka cluster that is registered in Zookeeper.
- func (kz *Kazoo) Brokers() (map[int32]string, error) {
- root := fmt.Sprintf("%s/brokers/ids", kz.conf.Chroot)
- children, _, err := kz.conn.Children(root)
- if err != nil {
- return nil, err
- }
- type brokerEntry struct {
- Host string `json:"host"`
- Port int `json:"port"`
- }
- result := make(map[int32]string)
- for _, child := range children {
- brokerID, err := strconv.ParseInt(child, 10, 32)
- if err != nil {
- return nil, err
- }
- value, _, err := kz.conn.Get(path.Join(root, child))
- if err != nil {
- return nil, err
- }
- var brokerNode brokerEntry
- if err := json.Unmarshal(value, &brokerNode); err != nil {
- return nil, err
- }
- result[int32(brokerID)] = fmt.Sprintf("%s:%d", brokerNode.Host, brokerNode.Port)
- }
- return result, nil
- }
- // BrokerList returns a slice of broker addresses that can be used to connect to
- // the Kafka cluster, e.g. using `sarama.NewAsyncProducer()`.
- func (kz *Kazoo) BrokerList() ([]string, error) {
- brokers, err := kz.Brokers()
- if err != nil {
- return nil, err
- }
- result := make([]string, 0, len(brokers))
- for _, broker := range brokers {
- result = append(result, broker)
- }
- return result, nil
- }
- // Controller returns what broker is currently acting as controller of the Kafka cluster
- func (kz *Kazoo) Controller() (int32, error) {
- type controllerEntry struct {
- BrokerID int32 `json:"brokerid"`
- }
- node := fmt.Sprintf("%s/controller", kz.conf.Chroot)
- data, _, err := kz.conn.Get(node)
- if err != nil {
- return -1, err
- }
- var controllerNode controllerEntry
- if err := json.Unmarshal(data, &controllerNode); err != nil {
- return -1, err
- }
- return controllerNode.BrokerID, nil
- }
- // Close closes the connection with the Zookeeper cluster
- func (kz *Kazoo) Close() error {
- kz.conn.Close()
- return nil
- }
- ////////////////////////////////////////////////////////////////////////
- // Util methods
- ////////////////////////////////////////////////////////////////////////
- // Exists checks existence of a node
- func (kz *Kazoo) exists(node string) (ok bool, err error) {
- ok, _, err = kz.conn.Exists(node)
- return
- }
- // DeleteAll deletes a node recursively
- func (kz *Kazoo) deleteRecursive(node string) (err error) {
- children, stat, err := kz.conn.Children(node)
- if err == zk.ErrNoNode {
- return nil
- } else if err != nil {
- return
- }
- for _, child := range children {
- if err = kz.deleteRecursive(path.Join(node, child)); err != nil {
- return
- }
- }
- return kz.conn.Delete(node, stat.Version)
- }
- // MkdirAll creates a directory recursively
- func (kz *Kazoo) mkdirRecursive(node string) (err error) {
- parent := path.Dir(node)
- if parent != "/" {
- if err = kz.mkdirRecursive(parent); err != nil {
- return
- }
- }
- _, err = kz.conn.Create(node, nil, 0, zk.WorldACL(zk.PermAll))
- if err == zk.ErrNodeExists {
- err = nil
- }
- return
- }
- // Create stores a new value at node. Fails if already set.
- func (kz *Kazoo) create(node string, value []byte, ephemeral bool) (err error) {
- if err = kz.mkdirRecursive(path.Dir(node)); err != nil {
- return
- }
- flags := int32(0)
- if ephemeral {
- flags = zk.FlagEphemeral
- }
- _, err = kz.conn.Create(node, value, flags, zk.WorldACL(zk.PermAll))
- return
- }
|