consumergroup.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514
  1. package kazoo
  2. import (
  3. "crypto/rand"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "os"
  9. "strconv"
  10. "time"
  11. "github.com/samuel/go-zookeeper/zk"
  12. )
// Sentinel errors returned by the consumergroup and consumergroup instance
// methods in this file. Callers can compare returned errors against them.
var (
	// ErrRunningInstances is returned by Consumergroup.Delete when the group
	// still lists one or more instances.
	ErrRunningInstances = errors.New("Cannot deregister a consumergroup with running instances")
	// ErrInstanceAlreadyRegistered is returned by RegisterWithSubscription
	// when the instance is already registered.
	ErrInstanceAlreadyRegistered = errors.New("Cannot register consumer instance because it already is registered")
	// ErrInstanceNotRegistered is returned by Deregister when the instance
	// node does not exist.
	ErrInstanceNotRegistered = errors.New("Cannot deregister consumer instance because it not registered")
	// ErrPartitionClaimedByOther is returned by ClaimPartition when the owner
	// node exists and holds a different instance ID.
	ErrPartitionClaimedByOther = errors.New("Cannot claim partition: it is already claimed by another instance")
	// ErrPartitionNotClaimed is returned by ReleasePartition when the
	// partition has no owner or is owned by a different instance.
	ErrPartitionNotClaimed = errors.New("Cannot release partition: it is not claimed by this instance")
)
// Consumergroup represents a high-level consumer that is registered in Zookeeper.
// All of its nodes live under <chroot>/consumers/<Name>.
type Consumergroup struct {
	kz   *Kazoo // client used for every Zookeeper call made on behalf of the group
	Name string // group name, used as the node name under <chroot>/consumers
}
// ConsumergroupInstance represents an instance of a Consumergroup.
type ConsumergroupInstance struct {
	cg *Consumergroup // group this instance belongs to
	ID string         // instance ID, used as the node name under <group>/ids
}
// ConsumergroupList is a list of consumergroups. Its Len, Less and Swap
// methods make it sortable by group name.
type ConsumergroupList []*Consumergroup
// ConsumergroupInstanceList is a list of consumergroup instances. Its Len,
// Less and Swap methods make it sortable by instance ID.
type ConsumergroupInstanceList []*ConsumergroupInstance
// Registration is the JSON document describing a consumer instance's
// registration. Register marshals one into the instance node, and
// ConsumergroupInstance.Registration unmarshals it back.
type Registration struct {
	Pattern      RegPattern     `json:"pattern"`
	Subscription map[string]int `json:"subscription"` // keyed by topic name; Register stores 1 for each topic
	Timestamp    int64          `json:"timestamp"`    // Register stores time.Now().Unix()
	Version      RegVersion     `json:"version"`
}
// RegPattern is the subscription pattern kind stored in a Registration.
type RegPattern string

// Known RegPattern values. Register always writes RegPatternStatic.
const (
	RegPatternStatic    RegPattern = "static"
	RegPatternWhiteList RegPattern = "white_list"
	RegPatternBlackList RegPattern = "black_list"
)
// RegVersion is the version number stored in a Registration.
type RegVersion int

const (
	// RegDefaultVersion is the version Register writes into new registrations.
	RegDefaultVersion RegVersion = 1
)
  50. // Consumergroups returns all the registered consumergroups
  51. func (kz *Kazoo) Consumergroups() (ConsumergroupList, error) {
  52. root := fmt.Sprintf("%s/consumers", kz.conf.Chroot)
  53. cgs, _, err := kz.conn.Children(root)
  54. if err != nil {
  55. return nil, err
  56. }
  57. result := make(ConsumergroupList, 0, len(cgs))
  58. for _, cg := range cgs {
  59. result = append(result, kz.Consumergroup(cg))
  60. }
  61. return result, nil
  62. }
  63. // Consumergroup instantiates a new consumergroup.
  64. func (kz *Kazoo) Consumergroup(name string) *Consumergroup {
  65. return &Consumergroup{Name: name, kz: kz}
  66. }
  67. // Exists checks whether the consumergroup has been registered in Zookeeper
  68. func (cg *Consumergroup) Exists() (bool, error) {
  69. return cg.kz.exists(fmt.Sprintf("%s/consumers/%s", cg.kz.conf.Chroot, cg.Name))
  70. }
  71. // Create registers the consumergroup in zookeeper
  72. func (cg *Consumergroup) Create() error {
  73. return cg.kz.mkdirRecursive(fmt.Sprintf("%s/consumers/%s", cg.kz.conf.Chroot, cg.Name))
  74. }
  75. // Delete removes the consumergroup from zookeeper
  76. func (cg *Consumergroup) Delete() error {
  77. if instances, err := cg.Instances(); err != nil {
  78. return err
  79. } else if len(instances) > 0 {
  80. return ErrRunningInstances
  81. }
  82. return cg.kz.deleteRecursive(fmt.Sprintf("%s/consumers/%s", cg.kz.conf.Chroot, cg.Name))
  83. }
  84. // Instances returns a map of all running instances inside this consumergroup.
  85. func (cg *Consumergroup) Instances() (ConsumergroupInstanceList, error) {
  86. root := fmt.Sprintf("%s/consumers/%s/ids", cg.kz.conf.Chroot, cg.Name)
  87. if exists, err := cg.kz.exists(root); err != nil {
  88. return nil, err
  89. } else if exists {
  90. cgis, _, err := cg.kz.conn.Children(root)
  91. if err != nil {
  92. return nil, err
  93. }
  94. result := make(ConsumergroupInstanceList, 0, len(cgis))
  95. for _, cgi := range cgis {
  96. result = append(result, cg.Instance(cgi))
  97. }
  98. return result, nil
  99. } else {
  100. result := make(ConsumergroupInstanceList, 0)
  101. return result, nil
  102. }
  103. }
  104. // WatchInstances returns a ConsumergroupInstanceList, and a channel that will be closed
  105. // as soon the instance list changes.
  106. func (cg *Consumergroup) WatchInstances() (ConsumergroupInstanceList, <-chan zk.Event, error) {
  107. node := fmt.Sprintf("%s/consumers/%s/ids", cg.kz.conf.Chroot, cg.Name)
  108. if exists, err := cg.kz.exists(node); err != nil {
  109. return nil, nil, err
  110. } else if !exists {
  111. if err := cg.kz.mkdirRecursive(node); err != nil {
  112. return nil, nil, err
  113. }
  114. }
  115. cgis, _, c, err := cg.kz.conn.ChildrenW(node)
  116. if err != nil {
  117. return nil, nil, err
  118. }
  119. result := make(ConsumergroupInstanceList, 0, len(cgis))
  120. for _, cgi := range cgis {
  121. result = append(result, cg.Instance(cgi))
  122. }
  123. return result, c, nil
  124. }
  125. // NewInstance instantiates a new ConsumergroupInstance inside this consumer group,
  126. // using a newly generated ID.
  127. func (cg *Consumergroup) NewInstance() *ConsumergroupInstance {
  128. id, err := generateConsumerInstanceID()
  129. if err != nil {
  130. panic(err)
  131. }
  132. return cg.Instance(id)
  133. }
  134. // Instance instantiates a new ConsumergroupInstance inside this consumer group,
  135. // using an existing ID.
  136. func (cg *Consumergroup) Instance(id string) *ConsumergroupInstance {
  137. return &ConsumergroupInstance{cg: cg, ID: id}
  138. }
  139. // PartitionOwner returns the ConsumergroupInstance that has claimed the given partition.
  140. // This can be nil if nobody has claimed it yet.
  141. func (cg *Consumergroup) PartitionOwner(topic string, partition int32) (*ConsumergroupInstance, error) {
  142. node := fmt.Sprintf("%s/consumers/%s/owners/%s/%d", cg.kz.conf.Chroot, cg.Name, topic, partition)
  143. val, _, err := cg.kz.conn.Get(node)
  144. // If the node does not exists, nobody has claimed it.
  145. switch err {
  146. case nil:
  147. return &ConsumergroupInstance{cg: cg, ID: string(val)}, nil
  148. case zk.ErrNoNode:
  149. return nil, nil
  150. default:
  151. return nil, err
  152. }
  153. }
  154. // WatchPartitionOwner retrieves what instance is currently owning the partition, and sets a
  155. // Zookeeper watch to be notified of changes. If the partition currently does not have an owner,
  156. // the function returns nil for every return value. In this case is should be safe to claim
  157. // the partition for an instance.
  158. func (cg *Consumergroup) WatchPartitionOwner(topic string, partition int32) (*ConsumergroupInstance, <-chan zk.Event, error) {
  159. node := fmt.Sprintf("%s/consumers/%s/owners/%s/%d", cg.kz.conf.Chroot, cg.Name, topic, partition)
  160. instanceID, _, changed, err := cg.kz.conn.GetW(node)
  161. switch err {
  162. case nil:
  163. return &ConsumergroupInstance{cg: cg, ID: string(instanceID)}, changed, nil
  164. case zk.ErrNoNode:
  165. return nil, nil, nil
  166. default:
  167. return nil, nil, err
  168. }
  169. }
  170. // Registered checks whether the consumergroup instance is registered in Zookeeper.
  171. func (cgi *ConsumergroupInstance) Registered() (bool, error) {
  172. node := fmt.Sprintf("%s/consumers/%s/ids/%s", cgi.cg.kz.conf.Chroot, cgi.cg.Name, cgi.ID)
  173. return cgi.cg.kz.exists(node)
  174. }
  175. // Registered returns current registration of the consumer group instance.
  176. func (cgi *ConsumergroupInstance) Registration() (*Registration, error) {
  177. node := fmt.Sprintf("%s/consumers/%s/ids/%s", cgi.cg.kz.conf.Chroot, cgi.cg.Name, cgi.ID)
  178. val, _, err := cgi.cg.kz.conn.Get(node)
  179. if err != nil {
  180. return nil, err
  181. }
  182. reg := &Registration{}
  183. if err := json.Unmarshal(val, reg); err != nil {
  184. return nil, err
  185. }
  186. return reg, nil
  187. }
  188. // RegisterSubscription registers the consumer instance in Zookeeper, with its subscription.
  189. func (cgi *ConsumergroupInstance) RegisterWithSubscription(subscriptionJSON []byte) error {
  190. if exists, err := cgi.Registered(); err != nil {
  191. return err
  192. } else if exists {
  193. return ErrInstanceAlreadyRegistered
  194. }
  195. // Create an ephemeral node for the the consumergroup instance.
  196. node := fmt.Sprintf("%s/consumers/%s/ids/%s", cgi.cg.kz.conf.Chroot, cgi.cg.Name, cgi.ID)
  197. return cgi.cg.kz.create(node, subscriptionJSON, true)
  198. }
  199. // Register registers the consumergroup instance in Zookeeper.
  200. func (cgi *ConsumergroupInstance) Register(topics []string) error {
  201. subscription := make(map[string]int)
  202. for _, topic := range topics {
  203. subscription[topic] = 1
  204. }
  205. data, err := json.Marshal(&Registration{
  206. Pattern: RegPatternStatic,
  207. Subscription: subscription,
  208. Timestamp: time.Now().Unix(),
  209. Version: RegDefaultVersion,
  210. })
  211. if err != nil {
  212. return err
  213. }
  214. return cgi.RegisterWithSubscription(data)
  215. }
  216. // Deregister removes the registration of the instance from zookeeper.
  217. func (cgi *ConsumergroupInstance) Deregister() error {
  218. node := fmt.Sprintf("%s/consumers/%s/ids/%s", cgi.cg.kz.conf.Chroot, cgi.cg.Name, cgi.ID)
  219. exists, stat, err := cgi.cg.kz.conn.Exists(node)
  220. if err != nil {
  221. return err
  222. } else if !exists {
  223. return ErrInstanceNotRegistered
  224. }
  225. return cgi.cg.kz.conn.Delete(node, stat.Version)
  226. }
  227. // Claim claims a topic/partition ownership for a consumer ID within a group. If the
  228. // partition is already claimed by another running instance, it will return ErrAlreadyClaimed.
  229. func (cgi *ConsumergroupInstance) ClaimPartition(topic string, partition int32) error {
  230. root := fmt.Sprintf("%s/consumers/%s/owners/%s", cgi.cg.kz.conf.Chroot, cgi.cg.Name, topic)
  231. if err := cgi.cg.kz.mkdirRecursive(root); err != nil {
  232. return err
  233. }
  234. // Create an ephemeral node for the partition to claim the partition for this instance
  235. node := fmt.Sprintf("%s/%d", root, partition)
  236. err := cgi.cg.kz.create(node, []byte(cgi.ID), true)
  237. switch err {
  238. case zk.ErrNodeExists:
  239. data, _, err := cgi.cg.kz.conn.Get(node)
  240. if err != nil {
  241. return err
  242. }
  243. if string(data) != cgi.ID {
  244. // Return a separate error for this, to allow for implementing a retry mechanism.
  245. return ErrPartitionClaimedByOther
  246. }
  247. return nil
  248. default:
  249. return err
  250. }
  251. }
  252. // ReleasePartition releases a claim to a partition.
  253. func (cgi *ConsumergroupInstance) ReleasePartition(topic string, partition int32) error {
  254. owner, err := cgi.cg.PartitionOwner(topic, partition)
  255. if err != nil {
  256. return err
  257. }
  258. if owner == nil || owner.ID != cgi.ID {
  259. return ErrPartitionNotClaimed
  260. }
  261. node := fmt.Sprintf("%s/consumers/%s/owners/%s/%d", cgi.cg.kz.conf.Chroot, cgi.cg.Name, topic, partition)
  262. return cgi.cg.kz.conn.Delete(node, 0)
  263. }
  264. // Topics retrieves the list of topics the consumergroup has claimed ownership of at some point.
  265. func (cg *Consumergroup) Topics() (TopicList, error) {
  266. root := fmt.Sprintf("%s/consumers/%s/owners", cg.kz.conf.Chroot, cg.Name)
  267. children, _, err := cg.kz.conn.Children(root)
  268. if err != nil {
  269. return nil, err
  270. }
  271. result := make(TopicList, 0, len(children))
  272. for _, name := range children {
  273. result = append(result, cg.kz.Topic(name))
  274. }
  275. return result, nil
  276. }
  277. // CommitOffset commits an offset to a group/topic/partition
  278. func (cg *Consumergroup) CommitOffset(topic string, partition int32, offset int64) error {
  279. node := fmt.Sprintf("%s/consumers/%s/offsets/%s/%d", cg.kz.conf.Chroot, cg.Name, topic, partition)
  280. data := []byte(fmt.Sprintf("%d", offset))
  281. _, stat, err := cg.kz.conn.Get(node)
  282. switch err {
  283. case zk.ErrNoNode: // Create a new node
  284. return cg.kz.create(node, data, false)
  285. case nil: // Update the existing node
  286. _, err := cg.kz.conn.Set(node, data, stat.Version)
  287. return err
  288. default:
  289. return err
  290. }
  291. }
  292. // FetchOffset retrieves an offset to a group/topic/partition
  293. func (cg *Consumergroup) FetchOffset(topic string, partition int32) (int64, error) {
  294. node := fmt.Sprintf("%s/consumers/%s/offsets/%s/%d", cg.kz.conf.Chroot, cg.Name, topic, partition)
  295. val, _, err := cg.kz.conn.Get(node)
  296. if err == zk.ErrNoNode {
  297. return -1, nil
  298. } else if err != nil {
  299. return -1, err
  300. }
  301. return strconv.ParseInt(string(val), 10, 64)
  302. }
  303. // FetchOffset retrieves all the commmitted offsets for a group
  304. func (cg *Consumergroup) FetchAllOffsets() (map[string]map[int32]int64, error) {
  305. result := make(map[string]map[int32]int64)
  306. offsetsNode := fmt.Sprintf("%s/consumers/%s/offsets", cg.kz.conf.Chroot, cg.Name)
  307. topics, _, err := cg.kz.conn.Children(offsetsNode)
  308. if err == zk.ErrNoNode {
  309. return result, nil
  310. } else if err != nil {
  311. return nil, err
  312. }
  313. for _, topic := range topics {
  314. result[topic] = make(map[int32]int64)
  315. topicNode := fmt.Sprintf("%s/consumers/%s/offsets/%s", cg.kz.conf.Chroot, cg.Name, topic)
  316. partitions, _, err := cg.kz.conn.Children(topicNode)
  317. if err != nil {
  318. return nil, err
  319. }
  320. for _, partition := range partitions {
  321. partitionNode := fmt.Sprintf("%s/consumers/%s/offsets/%s/%s", cg.kz.conf.Chroot, cg.Name, topic, partition)
  322. val, _, err := cg.kz.conn.Get(partitionNode)
  323. if err != nil {
  324. return nil, err
  325. }
  326. partition, err := strconv.ParseInt(partition, 10, 32)
  327. if err != nil {
  328. return nil, err
  329. }
  330. offset, err := strconv.ParseInt(string(val), 10, 64)
  331. if err != nil {
  332. return nil, err
  333. }
  334. result[topic][int32(partition)] = offset
  335. }
  336. }
  337. return result, nil
  338. }
  339. func (cg *Consumergroup) ResetOffsets() error {
  340. offsetsNode := fmt.Sprintf("%s/consumers/%s/offsets", cg.kz.conf.Chroot, cg.Name)
  341. topics, _, err := cg.kz.conn.Children(offsetsNode)
  342. if err == zk.ErrNoNode {
  343. return nil
  344. } else if err != nil {
  345. return err
  346. }
  347. for _, topic := range topics {
  348. topicNode := fmt.Sprintf("%s/consumers/%s/offsets/%s", cg.kz.conf.Chroot, cg.Name, topic)
  349. partitions, stat, err := cg.kz.conn.Children(topicNode)
  350. if err != nil {
  351. return err
  352. }
  353. for _, partition := range partitions {
  354. partitionNode := fmt.Sprintf("%s/consumers/%s/offsets/%s/%s", cg.kz.conf.Chroot, cg.Name, topic, partition)
  355. exists, stat, err := cg.kz.conn.Exists(partitionNode)
  356. if exists {
  357. if err = cg.kz.conn.Delete(partitionNode, stat.Version); err != nil {
  358. if err != zk.ErrNoNode {
  359. return err
  360. }
  361. }
  362. }
  363. }
  364. if err := cg.kz.conn.Delete(topicNode, stat.Version); err != nil {
  365. if err != zk.ErrNoNode {
  366. return err
  367. }
  368. }
  369. }
  370. return nil
  371. }
  372. // generateUUID Generates a UUIDv4.
  373. func generateUUID() (string, error) {
  374. uuid := make([]byte, 16)
  375. n, err := io.ReadFull(rand.Reader, uuid)
  376. if n != len(uuid) || err != nil {
  377. return "", err
  378. }
  379. // variant bits; see section 4.1.1
  380. uuid[8] = uuid[8]&^0xc0 | 0x80
  381. // version 4 (pseudo-random); see section 4.1.3
  382. uuid[6] = uuid[6]&^0xf0 | 0x40
  383. return fmt.Sprintf("%x-%x-%x-%x-%x", uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:]), nil
  384. }
  385. // generateConsumerInstanceID generates a consumergroup Instance ID
  386. // that is almost certain to be unique.
  387. func generateConsumerInstanceID() (string, error) {
  388. uuid, err := generateUUID()
  389. if err != nil {
  390. return "", err
  391. }
  392. hostname, err := os.Hostname()
  393. if err != nil {
  394. return "", err
  395. }
  396. return fmt.Sprintf("%s:%s", hostname, uuid), nil
  397. }
  398. // Find returns the consumergroup with the given name if it exists in the list.
  399. // Otherwise it will return `nil`.
  400. func (cgl ConsumergroupList) Find(name string) *Consumergroup {
  401. for _, cg := range cgl {
  402. if cg.Name == name {
  403. return cg
  404. }
  405. }
  406. return nil
  407. }
// Len returns the number of consumergroups in the list.
func (cgl ConsumergroupList) Len() int {
	return len(cgl)
}
// Less reports whether the group at index i sorts before the group at index
// j, ordering by Name.
func (cgl ConsumergroupList) Less(i, j int) bool {
	return cgl[i].Name < cgl[j].Name
}
// Swap exchanges the groups at indexes i and j.
func (cgl ConsumergroupList) Swap(i, j int) {
	cgl[i], cgl[j] = cgl[j], cgl[i]
}
  417. // Find returns the consumergroup instance with the given ID if it exists in the list.
  418. // Otherwise it will return `nil`.
  419. func (cgil ConsumergroupInstanceList) Find(id string) *ConsumergroupInstance {
  420. for _, cgi := range cgil {
  421. if cgi.ID == id {
  422. return cgi
  423. }
  424. }
  425. return nil
  426. }
// Len returns the number of instances in the list.
func (cgil ConsumergroupInstanceList) Len() int {
	return len(cgil)
}
// Less reports whether the instance at index i sorts before the instance at
// index j, ordering by ID.
func (cgil ConsumergroupInstanceList) Less(i, j int) bool {
	return cgil[i].ID < cgil[j].ID
}
// Swap exchanges the instances at indexes i and j.
func (cgil ConsumergroupInstanceList) Swap(i, j int) {
	cgil[i], cgil[j] = cgil[j], cgil[i]
}