123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- package election
- import (
- "errors"
- "github.com/samuel/go-zookeeper/zk"
- "go-common/library/log"
- "path"
- "sort"
- "time"
- )
- var (
- //ErrNotInit init fail
- ErrNotInit = errors.New("init first")
- //ErrFailConn fail to connect
- ErrFailConn = errors.New("fail to connect zk")
- )
- //ZkElection election by zk
- type ZkElection struct {
- dir string
- servers []string
- conn *zk.Conn
- timeout time.Duration
- RootPath string
- NodePath string
- MasterPath string
- IsMaster bool
- // wait for this channel, true means master, false means follower
- C <-chan bool
- leaderChan chan bool
- running bool
- }
- // New create new ZkElection
- func New(servers []string, dir string, timeout time.Duration) *ZkElection {
- var z = &ZkElection{}
- z.servers = servers
- z.dir = dir
- z.timeout = timeout
- z.running = true
- return z
- }
- //Init init the elections
- // dir is root path for election, if dir = "/project", then, election will use "/project/election" as election path
- // and node would be "/project/election/n_xxxxxxxx"
- // if error happens, the election would work
- func (z *ZkElection) Init() (err error) {
- z.conn, _, err = zk.Connect(z.servers, z.timeout)
- if err != nil {
- log.Error("fail connect zk, err=%s", err)
- return
- }
- z.RootPath = path.Join(z.dir, "election")
- exist, _, err := z.conn.Exists(z.RootPath)
- if err != nil {
- log.Error("fail to check path, path=%s, err=%v", z.RootPath, err)
- return
- }
- if !exist {
- _, err = z.conn.Create(z.RootPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
- if err != nil {
- log.Error("create election fail, path=%s, err=%v", z.RootPath, err)
- return
- }
- }
- var pathPrefix = path.Join(z.RootPath, "/n_")
- z.NodePath, err = z.conn.Create(pathPrefix, []byte(""), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))
- if err != nil {
- log.Error("fail create node path, path=%s, err=%s", pathPrefix, err)
- return
- }
- return
- }
- //Elect elect for leader
- // wait for the chan, if get a true, mean you are the leader
- // if you already a leader, a false means you are kicked
- func (z *ZkElection) Elect() (err error) {
- if z.conn == nil {
- return ErrNotInit
- }
- if z.leaderChan != nil {
- return
- }
- z.leaderChan = make(chan bool)
- z.C = z.leaderChan
- go func() {
- defer close(z.leaderChan)
- var ch, err = z.watchChange()
- if err != nil {
- log.Error("fail to watch, err=%s", err)
- return
- }
- for z.running {
- event := <-ch
- switch event.Type {
- case zk.EventNodeDeleted:
- ch, err = z.watchChange()
- if err != nil {
- log.Error("fail to watch, try next time in seconds, err=%s", err)
- time.Sleep(time.Second * 5)
- }
- case zk.EventNotWatching:
- log.Warn("receive not watching event, event=%+v", event)
- if event.State == zk.StateDisconnected {
- log.Info("reinit zk")
- if err = z.Init(); err != nil {
- log.Error("err init zk again, sleep 5s, err=%v", err)
- time.Sleep(5*time.Second)
- } else {
- ch, err = z.watchChange()
- if err != nil {
- log.Error("fail to watch, err=%s", err)
- return
- }
- }
- }
- }
- log.Info("zk event, event=%+v", event)
- }
- log.Error("exit election proc")
- }()
- return
- }
- func (z *ZkElection) watchChange() (ch <-chan zk.Event, err error) {
- children, _, e := z.conn.Children(z.RootPath)
- if err != nil {
- err = e
- log.Error("get children error, err=%+v\n", err)
- return
- }
- if len(children) == 0 {
- log.Warn("no child get from root: %s", z.RootPath)
- return
- }
- var min string
- if len(children) > 0 {
- sort.SliceStable(children, func(i, j int) bool {
- return children[i] < children[j]
- })
- min = children[0]
- z.MasterPath = min
- }
- var nodebase = path.Base(z.NodePath)
- for i, v := range children {
- if v == nodebase {
- if min == nodebase {
- log.Info("this is master, node=%s", z.NodePath)
- z.IsMaster = true
- z.leaderChan <- true
- _, _, ch, err = z.conn.GetW(z.NodePath)
- } else {
- log.Info("master is %s", min)
- prev := children[i-1]
- var preNode = path.Join(z.RootPath, prev)
- _, _, ch, err = z.conn.GetW(preNode)
- if err != nil {
- log.Error("watch node fail, node=%s, err=%s", preNode, err)
- }
- z.IsMaster = false
- z.leaderChan <- false
- }
- } else {
- log.Warn("v=%s, not same with base, base=%s", v, nodebase)
- }
- }
- log.Info("watchChange, len(children)=%d", len(children))
- return
- }
- //Close close the election
- func (z *ZkElection) Close() {
- z.running = false
- }
|