syncup.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "net/url"
  6. "time"
  7. "go-common/app/infra/discovery/conf"
  8. "go-common/app/infra/discovery/dao"
  9. "go-common/app/infra/discovery/model"
  10. libenv "go-common/library/conf/env"
  11. "go-common/library/ecode"
  12. "go-common/library/log"
  13. )
  // _fetchAllURL is the format string for a peer node's fetch-all endpoint;
  // its single %s verb is filled with the peer's address.
  var _fetchAllURL = "http://%s/discovery/fetch/all"
  17. // syncUp populates the registry information from a peer eureka node.
  18. func (s *Service) syncUp() (err error) {
  19. for _, node := range s.nodes.AllNodes() {
  20. if s.nodes.Myself(node.Addr) {
  21. continue
  22. }
  23. uri := fmt.Sprintf(_fetchAllURL, node.Addr)
  24. var res struct {
  25. Code int `json:"code"`
  26. Data map[string][]*model.Instance `json:"data"`
  27. }
  28. if err = s.client.Get(context.TODO(), uri, "", nil, &res); err != nil {
  29. log.Error("e.client.Get(%v) error(%v)", uri, err)
  30. continue
  31. }
  32. if res.Code != 0 {
  33. log.Error("service syncup from(%s) failed ", uri)
  34. continue
  35. }
  36. for _, is := range res.Data {
  37. for _, i := range is {
  38. s.tLock.RLock()
  39. appid, ok := s.tree[i.Treeid]
  40. s.tLock.RUnlock()
  41. if !ok || appid != i.Appid {
  42. s.tLock.Lock()
  43. s.tree[i.Treeid] = i.Appid
  44. s.tLock.Unlock()
  45. }
  46. s.registry.Register(i, i.LatestTimestamp)
  47. }
  48. }
  49. // NOTE: no return, make sure that all instances from other nodes register into self.
  50. }
  51. s.nodes.UP()
  52. return
  53. }
  54. func (s *Service) regSelf() context.CancelFunc {
  55. ctx, cancel := context.WithCancel(context.Background())
  56. now := time.Now().UnixNano()
  57. ins := &model.Instance{
  58. Region: libenv.Region,
  59. Zone: libenv.Zone,
  60. Env: libenv.DeployEnv,
  61. Hostname: libenv.Hostname,
  62. Appid: model.AppID,
  63. Addrs: []string{
  64. "http://" + s.c.BM.Inner.Addr,
  65. },
  66. Status: model.InstanceStatusUP,
  67. RegTimestamp: now,
  68. UpTimestamp: now,
  69. LatestTimestamp: now,
  70. RenewTimestamp: now,
  71. DirtyTimestamp: now,
  72. }
  73. s.Register(ctx, ins, now, false)
  74. go func() {
  75. ticker := time.NewTicker(30 * time.Second)
  76. defer ticker.Stop()
  77. for {
  78. select {
  79. case <-ticker.C:
  80. arg := &model.ArgRenew{
  81. Appid: model.AppID,
  82. Region: libenv.Region,
  83. Zone: libenv.Zone,
  84. Env: libenv.DeployEnv,
  85. Hostname: libenv.Hostname,
  86. }
  87. if _, err := s.Renew(ctx, arg); err != nil && ecode.NothingFound.Equal(err) {
  88. s.Register(ctx, ins, now, false)
  89. }
  90. case <-ctx.Done():
  91. arg := &model.ArgCancel{
  92. Appid: model.AppID,
  93. Region: libenv.Region,
  94. Zone: libenv.Zone,
  95. Env: libenv.DeployEnv,
  96. Hostname: libenv.Hostname,
  97. }
  98. if err := s.Cancel(context.Background(), arg); err != nil {
  99. log.Error("s.Cancel(%+v) error(%v)", arg, err)
  100. }
  101. return
  102. }
  103. }
  104. }()
  105. return cancel
  106. }
  107. func (s *Service) nodesproc() {
  108. var (
  109. lastTs int64
  110. )
  111. for {
  112. arg := &model.ArgPolls{
  113. Appid: []string{model.AppID},
  114. Region: libenv.Region,
  115. Env: libenv.DeployEnv,
  116. Hostname: libenv.Hostname,
  117. LatestTimestamp: []int64{lastTs},
  118. }
  119. ch, _, err := s.registry.Polls(arg)
  120. if err != nil && err != ecode.NotModified {
  121. log.Error("s.registry(%v) error(%v)", arg, err)
  122. time.Sleep(time.Second)
  123. continue
  124. }
  125. apps := <-ch
  126. ins, ok := apps[model.AppID]
  127. if !ok || ins == nil {
  128. return
  129. }
  130. var (
  131. nodes []string
  132. zones = make(map[string][]string)
  133. )
  134. for _, in := range ins.Instances {
  135. for _, addr := range in.Addrs {
  136. u, err := url.Parse(addr)
  137. if err == nil && u.Scheme == "http" {
  138. if in.Zone == libenv.Zone {
  139. nodes = append(nodes, u.Host)
  140. } else if _, ok := s.c.Zones[in.Zone]; ok {
  141. zones[in.Zone] = append(zones[in.Zone], u.Host)
  142. }
  143. }
  144. }
  145. }
  146. lastTs = ins.LatestTimestamp
  147. log.Info("discovery changed nodes:%v zones:%v", nodes, zones)
  148. c := new(conf.Config)
  149. *c = *s.c
  150. c.Nodes = nodes
  151. c.Zones = zones
  152. s.nodes = dao.NewNodes(c)
  153. s.nodes.UP()
  154. }
  155. }