election.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package election
  2. import (
  3. "errors"
  4. "github.com/samuel/go-zookeeper/zk"
  5. "go-common/library/log"
  6. "path"
  7. "sort"
  8. "time"
  9. )
  10. var (
  11. //ErrNotInit init fail
  12. ErrNotInit = errors.New("init first")
  13. //ErrFailConn fail to connect
  14. ErrFailConn = errors.New("fail to connect zk")
  15. )
  16. //ZkElection election by zk
  17. type ZkElection struct {
  18. dir string
  19. servers []string
  20. conn *zk.Conn
  21. timeout time.Duration
  22. RootPath string
  23. NodePath string
  24. MasterPath string
  25. IsMaster bool
  26. // wait for this channel, true means master, false means follower
  27. C <-chan bool
  28. leaderChan chan bool
  29. running bool
  30. }
  31. // New create new ZkElection
  32. func New(servers []string, dir string, timeout time.Duration) *ZkElection {
  33. var z = &ZkElection{}
  34. z.servers = servers
  35. z.dir = dir
  36. z.timeout = timeout
  37. z.running = true
  38. return z
  39. }
  40. //Init init the elections
  41. // dir is root path for election, if dir = "/project", then, election will use "/project/election" as election path
  42. // and node would be "/project/election/n_xxxxxxxx"
  43. // if error happens, the election would work
  44. func (z *ZkElection) Init() (err error) {
  45. z.conn, _, err = zk.Connect(z.servers, z.timeout)
  46. if err != nil {
  47. log.Error("fail connect zk, err=%s", err)
  48. return
  49. }
  50. z.RootPath = path.Join(z.dir, "election")
  51. exist, _, err := z.conn.Exists(z.RootPath)
  52. if err != nil {
  53. log.Error("fail to check path, path=%s, err=%v", z.RootPath, err)
  54. return
  55. }
  56. if !exist {
  57. _, err = z.conn.Create(z.RootPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
  58. if err != nil {
  59. log.Error("create election fail, path=%s, err=%v", z.RootPath, err)
  60. return
  61. }
  62. }
  63. var pathPrefix = path.Join(z.RootPath, "/n_")
  64. z.NodePath, err = z.conn.Create(pathPrefix, []byte(""), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))
  65. if err != nil {
  66. log.Error("fail create node path, path=%s, err=%s", pathPrefix, err)
  67. return
  68. }
  69. return
  70. }
  71. //Elect elect for leader
  72. // wait for the chan, if get a true, mean you are the leader
  73. // if you already a leader, a false means you are kicked
  74. func (z *ZkElection) Elect() (err error) {
  75. if z.conn == nil {
  76. return ErrNotInit
  77. }
  78. if z.leaderChan != nil {
  79. return
  80. }
  81. z.leaderChan = make(chan bool)
  82. z.C = z.leaderChan
  83. go func() {
  84. defer close(z.leaderChan)
  85. var ch, err = z.watchChange()
  86. if err != nil {
  87. log.Error("fail to watch, err=%s", err)
  88. return
  89. }
  90. for z.running {
  91. event := <-ch
  92. switch event.Type {
  93. case zk.EventNodeDeleted:
  94. ch, err = z.watchChange()
  95. if err != nil {
  96. log.Error("fail to watch, try next time in seconds, err=%s", err)
  97. time.Sleep(time.Second * 5)
  98. }
  99. case zk.EventNotWatching:
  100. log.Warn("receive not watching event, event=%+v", event)
  101. if event.State == zk.StateDisconnected {
  102. log.Info("reinit zk")
  103. if err = z.Init(); err != nil {
  104. log.Error("err init zk again, sleep 5s, err=%v", err)
  105. time.Sleep(5*time.Second)
  106. } else {
  107. ch, err = z.watchChange()
  108. if err != nil {
  109. log.Error("fail to watch, err=%s", err)
  110. return
  111. }
  112. }
  113. }
  114. }
  115. log.Info("zk event, event=%+v", event)
  116. }
  117. log.Error("exit election proc")
  118. }()
  119. return
  120. }
  121. func (z *ZkElection) watchChange() (ch <-chan zk.Event, err error) {
  122. children, _, e := z.conn.Children(z.RootPath)
  123. if err != nil {
  124. err = e
  125. log.Error("get children error, err=%+v\n", err)
  126. return
  127. }
  128. if len(children) == 0 {
  129. log.Warn("no child get from root: %s", z.RootPath)
  130. return
  131. }
  132. var min string
  133. if len(children) > 0 {
  134. sort.SliceStable(children, func(i, j int) bool {
  135. return children[i] < children[j]
  136. })
  137. min = children[0]
  138. z.MasterPath = min
  139. }
  140. var nodebase = path.Base(z.NodePath)
  141. for i, v := range children {
  142. if v == nodebase {
  143. if min == nodebase {
  144. log.Info("this is master, node=%s", z.NodePath)
  145. z.IsMaster = true
  146. z.leaderChan <- true
  147. _, _, ch, err = z.conn.GetW(z.NodePath)
  148. } else {
  149. log.Info("master is %s", min)
  150. prev := children[i-1]
  151. var preNode = path.Join(z.RootPath, prev)
  152. _, _, ch, err = z.conn.GetW(preNode)
  153. if err != nil {
  154. log.Error("watch node fail, node=%s, err=%s", preNode, err)
  155. }
  156. z.IsMaster = false
  157. z.leaderChan <- false
  158. }
  159. } else {
  160. log.Warn("v=%s, not same with base, base=%s", v, nodebase)
  161. }
  162. }
  163. log.Info("watchChange, len(children)=%d", len(children))
  164. return
  165. }
  166. //Close close the election
  167. func (z *ZkElection) Close() {
  168. z.running = false
  169. }