zk.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. package server
  2. import (
  3. "github.com/samuel/go-zookeeper/zk"
  4. "go-common/library/log"
  5. "strings"
  6. "time"
  7. )
  8. type ZkClient struct {
  9. conn *zk.Conn
  10. address []string
  11. timeout time.Duration
  12. dialTime time.Time
  13. closed bool
  14. stopper chan struct{}
  15. }
  16. func NewZkClient(addrs []string, timeout time.Duration) (*ZkClient, error) {
  17. if timeout <= 0 {
  18. timeout = time.Second * 5
  19. }
  20. c := &ZkClient{
  21. address: addrs,
  22. timeout: timeout,
  23. stopper: make(chan struct{}),
  24. }
  25. if err := c.Reset(); err != nil {
  26. return nil, err
  27. }
  28. return c, nil
  29. }
  30. func (c *ZkClient) GetTimeout() time.Duration {
  31. return c.timeout
  32. }
  33. func (c *ZkClient) RecursiveCreate(path string) error {
  34. if path == "" || path == "/" {
  35. return nil
  36. }
  37. if exists, _, err := c.conn.Exists(path); err != nil {
  38. return err
  39. } else if exists {
  40. return nil
  41. }
  42. if err := c.RecursiveCreate(path[0:strings.LastIndex(path, "/")]); err != nil {
  43. return err
  44. }
  45. _, err := c.conn.Create(path, []byte{}, 0, zk.WorldACL(zk.PermAll))
  46. if err != nil && err != zk.ErrNodeExists {
  47. return err
  48. }
  49. return nil
  50. }
  51. func (c *ZkClient) Reset() error {
  52. c.dialTime = time.Now()
  53. conn, events, err := zk.Connect(c.address, c.timeout)
  54. if err != nil {
  55. return err
  56. }
  57. if c.conn != nil {
  58. c.conn.Close()
  59. c.conn = nil
  60. }
  61. c.conn = conn
  62. go func() {
  63. for ev := range events {
  64. if ev.Err == nil {
  65. log.V(2).Info("[ZooKeeper]Event Info:%+v", ev)
  66. } else {
  67. log.Error("[ZooKeeper]Event Error:%+v", ev)
  68. }
  69. }
  70. }()
  71. return nil
  72. }
  73. func (c *ZkClient) CreateEphemeralNode(path string, node string, data []byte) (string, error) {
  74. if err := c.RecursiveCreate(path); err != nil {
  75. return "", err
  76. }
  77. nodePath := strings.Join([]string{path, node}, "/")
  78. path, err := c.conn.Create(nodePath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
  79. return path, err
  80. }
  81. func (c *ZkClient) CreatePersistNode(path string, node string, data []byte) (string, error) {
  82. if err := c.RecursiveCreate(path); err != nil {
  83. return "", err
  84. }
  85. nodePath := strings.Join([]string{path, node}, "/")
  86. path, err := c.conn.Create(nodePath, data, 0, zk.WorldACL(zk.PermAll))
  87. return path, err
  88. }
  89. func (c *ZkClient) Exists(path string, node string) (bool, int32, error) {
  90. fullPath := strings.Join([]string{path, node}, "/")
  91. exists, stat, err := c.conn.Exists(fullPath)
  92. return exists, stat.Version, err
  93. }
  94. func (c *ZkClient) SetNodeData(path string, node string, data []byte, version int32) error {
  95. var err error
  96. fullPath := strings.Join([]string{path, node}, "/")
  97. _, err = c.conn.Set(fullPath, data, version)
  98. return err
  99. }
  100. func (c *ZkClient) GetNodeData(path string, node string) ([]byte, int32, error) {
  101. var err error
  102. fullPath := strings.Join([]string{path, node}, "/")
  103. data, stat, err := c.conn.Get(fullPath)
  104. return data, stat.Version, err
  105. }
  106. func (c *ZkClient) DeleteNode(path string, node string) error {
  107. var err error
  108. fullPath := strings.Join([]string{path, node}, "/")
  109. exists, stat, err := c.conn.Exists(fullPath)
  110. if err != nil {
  111. return err
  112. }
  113. if !exists {
  114. return nil
  115. }
  116. err = c.conn.Delete(fullPath, stat.Version)
  117. return err
  118. }
  119. func (c *ZkClient) Close() {
  120. if c.closed {
  121. return
  122. }
  123. c.closed = true
  124. if c.conn != nil {
  125. c.conn.Close()
  126. }
  127. close(c.stopper)
  128. }
  129. func (c *ZkClient) GetChildren(node string) ([]string, error) {
  130. children, _, err := c.conn.Children(node)
  131. return children, err
  132. }
  133. func (c *ZkClient) GetChildrenWithData(node string) (map[string]string, error) {
  134. children, _, err := c.conn.Children(node)
  135. result := make(map[string]string)
  136. for _, child := range children {
  137. if data, _, e := c.conn.Get(strings.Join([]string{node, child}, "/")); e == nil {
  138. result[child] = string(data)
  139. } else {
  140. log.Error("[ZookeeperClient]GetChildrenWithData:get child:%s failed, err:%s", child, e.Error())
  141. }
  142. }
  143. return result, err
  144. }
  145. func (c *ZkClient) GetData(path string) (string, error) {
  146. data, _, err := c.conn.Get(path)
  147. return string(data), err
  148. }
  149. func (c *ZkClient) WatchChildren(path string) (map[string]struct{}, <-chan zk.Event, error) {
  150. if exists, _, err := c.conn.Exists(path); err != nil {
  151. return nil, nil, err
  152. } else if !exists {
  153. return nil, nil, zk.ErrNoNode
  154. }
  155. children, _, event, err := c.conn.ChildrenW(path)
  156. if err != nil {
  157. return nil, nil, err
  158. }
  159. result := make(map[string]struct{})
  160. for _, child := range children {
  161. result[child] = struct{}{}
  162. }
  163. return result, event, nil
  164. }
  165. func (c *ZkClient) WatchChildrenWithData(node string) (map[string]string, <-chan zk.Event, error) {
  166. if exists, _, err := c.conn.Exists(node); err != nil {
  167. return nil, nil, err
  168. } else if !exists {
  169. return nil, nil, zk.ErrNoNode
  170. }
  171. children, _, event, err := c.conn.ChildrenW(node)
  172. if err != nil {
  173. return nil, nil, err
  174. }
  175. result := make(map[string]string)
  176. for _, child := range children {
  177. if data, _, e := c.conn.Get(strings.Join([]string{node, child}, "/")); e == nil {
  178. result[child] = string(data)
  179. } else {
  180. return nil, nil, e
  181. }
  182. }
  183. return result, event, nil
  184. }
  185. func (c *ZkClient) WatchData(path string) ([]byte, <-chan zk.Event, error) {
  186. data, _, event, err := c.conn.GetW(path)
  187. return data, event, err
  188. }
  189. func (c *ZkClient) ZooKeeperPath(args ...string) string {
  190. return strings.Join(args, "/")
  191. }