livezk.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package livezk
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "net/url"
  7. "path"
  8. "strings"
  9. "time"
  10. "go-common/library/log"
  11. "go-common/library/naming"
  12. xtime "go-common/library/time"
  13. "github.com/samuel/go-zookeeper/zk"
  14. )
  // basePath is the path segment placed between the configured Root and the
  // application ID when building a service's zookeeper node path.
  const basePath = "/live/service"

  // scheme is the URL scheme an instance address must carry for its host to be
  // published as the RPC endpoint.
  const scheme = "grpc"
  19. // Zookeeper Server&Client settings.
  20. type Zookeeper struct {
  21. Root string
  22. Addrs []string
  23. Timeout xtime.Duration
  24. }
  25. // New new live zookeeper registry
  26. func New(config *Zookeeper) (naming.Registry, error) {
  27. lz := &livezk{
  28. zkConfig: config,
  29. }
  30. var err error
  31. lz.zkConn, lz.zkEvent, err = zk.Connect(config.Addrs, time.Duration(config.Timeout))
  32. if err != nil {
  33. go lz.eventproc()
  34. }
  35. return lz, err
  36. }
  // zkIns is the JSON document stored as the data of an instance's node.
  type zkIns struct {
  	// Group is the instance group name.
  	Group string `json:"group"`
  	// LibVersion carries the version string of the registering instance.
  	LibVersion string `json:"lib_version"`
  	// StartupTime is a formatted timestamp string.
  	StartupTime string `json:"startup_time"`
  }
  42. func newZkInsData(ins *naming.Instance) ([]byte, error) {
  43. zi := &zkIns{
  44. // TODO group support
  45. Group: "default",
  46. LibVersion: ins.Version,
  47. StartupTime: time.Now().Format("2006-01-02 15:04:05"),
  48. }
  49. return json.Marshal(zi)
  50. }
  51. // livezk live service zookeeper registry
  52. type livezk struct {
  53. zkConfig *Zookeeper
  54. zkConn *zk.Conn
  55. zkEvent <-chan zk.Event
  56. }
  57. var _ naming.Registry = &livezk{}
  58. func (l *livezk) Register(ctx context.Context, ins *naming.Instance) (cancel context.CancelFunc, err error) {
  59. nodePath := path.Join(l.zkConfig.Root, basePath, ins.AppID)
  60. if err = l.createAll(nodePath); err != nil {
  61. return
  62. }
  63. var rpc string
  64. for _, addr := range ins.Addrs {
  65. u, ue := url.Parse(addr)
  66. if ue == nil && u.Scheme == scheme {
  67. rpc = u.Host
  68. break
  69. }
  70. }
  71. if rpc == "" {
  72. err = errors.New("no GRPC addr")
  73. return
  74. }
  75. dataPath := path.Join(nodePath, rpc)
  76. data, err := newZkInsData(ins)
  77. if err != nil {
  78. return nil, err
  79. }
  80. _, err = l.zkConn.Create(dataPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
  81. if err != nil {
  82. return nil, err
  83. }
  84. return func() {
  85. l.unregister(dataPath)
  86. }, nil
  87. }
  88. func (l *livezk) Close() error {
  89. l.zkConn.Close()
  90. return nil
  91. }
  92. func (l *livezk) createAll(nodePath string) (err error) {
  93. seps := strings.Split(nodePath, "/")
  94. lastPath := "/"
  95. ok := false
  96. for _, part := range seps {
  97. if part == "" {
  98. continue
  99. }
  100. lastPath = path.Join(lastPath, part)
  101. if ok, _, err = l.zkConn.Exists(lastPath); err != nil {
  102. return err
  103. } else if ok {
  104. continue
  105. }
  106. if _, err = l.zkConn.Create(lastPath, nil, 0, zk.WorldACL(zk.PermAll)); err != nil {
  107. return
  108. }
  109. }
  110. return
  111. }
  112. func (l *livezk) eventproc() {
  113. for event := range l.zkEvent {
  114. // TODO handle zookeeper event
  115. log.Info("zk event: err: %s, path: %s, server: %s, state: %s, type: %s",
  116. event.Err, event.Path, event.Server, event.State, event.Type)
  117. }
  118. }
  119. func (l *livezk) unregister(dataPath string) error {
  120. return l.zkConn.Delete(dataPath, -1)
  121. }