nodes.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "go-common/app/infra/discovery/conf"
  7. "go-common/app/infra/discovery/model"
  8. "go-common/library/sync/errgroup"
  9. )
  10. // Nodes is helper to manage lifecycle of a collection of Nodes.
  11. type Nodes struct {
  12. nodes []*Node
  13. zones map[string][]*Node
  14. selfAddr string
  15. }
  16. // NewNodes new nodes and return.
  17. func NewNodes(c *conf.Config) *Nodes {
  18. nodes := make([]*Node, 0, len(c.Nodes))
  19. for _, addr := range c.Nodes {
  20. n := newNode(c, addr)
  21. n.pRegisterURL = fmt.Sprintf("http://%s%s", c.BM.Inner.Addr, _registerURL)
  22. nodes = append(nodes, n)
  23. }
  24. zones := make(map[string][]*Node)
  25. for name, addrs := range c.Zones {
  26. var znodes []*Node
  27. for _, addr := range addrs {
  28. n := newNode(c, addr)
  29. n.otherZone = true
  30. n.zone = name
  31. n.pRegisterURL = fmt.Sprintf("http://%s%s", c.BM.Inner.Addr, _registerURL)
  32. znodes = append(znodes, n)
  33. }
  34. zones[name] = znodes
  35. }
  36. return &Nodes{
  37. nodes: nodes,
  38. zones: zones,
  39. selfAddr: c.BM.Inner.Addr,
  40. }
  41. }
  42. // Replicate replicate information to all nodes except for this node.
  43. func (ns *Nodes) Replicate(c context.Context, action model.Action, i *model.Instance, otherZone bool) (err error) {
  44. if len(ns.nodes) == 0 {
  45. return
  46. }
  47. eg, c := errgroup.WithContext(c)
  48. for _, n := range ns.nodes {
  49. if !ns.Myself(n.addr) {
  50. ns.action(c, eg, action, n, i)
  51. }
  52. }
  53. if !otherZone {
  54. for _, zns := range ns.zones {
  55. if n := len(zns); n > 0 {
  56. ns.action(c, eg, action, zns[rand.Intn(n)], i)
  57. }
  58. }
  59. }
  60. err = eg.Wait()
  61. return
  62. }
  63. func (ns *Nodes) action(c context.Context, eg *errgroup.Group, action model.Action, n *Node, i *model.Instance) {
  64. switch action {
  65. case model.Register:
  66. eg.Go(func() error {
  67. n.Register(c, i)
  68. return nil
  69. })
  70. case model.Renew:
  71. eg.Go(func() error {
  72. n.Renew(c, i)
  73. return nil
  74. })
  75. case model.Cancel:
  76. eg.Go(func() error {
  77. n.Cancel(c, i)
  78. return nil
  79. })
  80. }
  81. }
  82. // ReplicateSet replicate set information to all nodes except for this node.
  83. func (ns *Nodes) ReplicateSet(c context.Context, arg *model.ArgSet, otherZone bool) (err error) {
  84. if len(ns.nodes) == 0 {
  85. return
  86. }
  87. eg, c := errgroup.WithContext(c)
  88. for _, n := range ns.nodes {
  89. if !ns.Myself(n.addr) {
  90. eg.Go(func() error {
  91. return n.Set(c, arg)
  92. })
  93. }
  94. }
  95. if !otherZone {
  96. for _, zns := range ns.zones {
  97. if n := len(zns); n > 0 {
  98. node := zns[rand.Intn(n)]
  99. eg.Go(func() error {
  100. return node.Set(c, arg)
  101. })
  102. }
  103. }
  104. }
  105. err = eg.Wait()
  106. return
  107. }
  108. // Nodes returns nodes of local zone.
  109. func (ns *Nodes) Nodes() (nsi []*model.Node) {
  110. nsi = make([]*model.Node, 0, len(ns.nodes))
  111. for _, nd := range ns.nodes {
  112. if nd.otherZone {
  113. continue
  114. }
  115. node := &model.Node{
  116. Addr: nd.addr,
  117. Status: nd.status,
  118. Zone: nd.zone,
  119. }
  120. nsi = append(nsi, node)
  121. }
  122. return
  123. }
  124. // AllNodes returns nodes contain other zone nodes.
  125. func (ns *Nodes) AllNodes() (nsi []*model.Node) {
  126. nsi = make([]*model.Node, 0, len(ns.nodes))
  127. for _, nd := range ns.nodes {
  128. node := &model.Node{
  129. Addr: nd.addr,
  130. Status: nd.status,
  131. Zone: nd.zone,
  132. }
  133. nsi = append(nsi, node)
  134. }
  135. for _, zns := range ns.zones {
  136. if n := len(zns); n > 0 {
  137. nd := zns[rand.Intn(n)]
  138. node := &model.Node{
  139. Addr: nd.addr,
  140. Status: nd.status,
  141. Zone: nd.zone,
  142. }
  143. nsi = append(nsi, node)
  144. }
  145. }
  146. return
  147. }
  148. // Myself returns whether or not myself.
  149. func (ns *Nodes) Myself(addr string) bool {
  150. return ns.selfAddr == addr
  151. }
  152. // UP marks status of myself node up.
  153. func (ns *Nodes) UP() {
  154. for _, nd := range ns.nodes {
  155. if ns.Myself(nd.addr) {
  156. nd.status = model.NodeStatusUP
  157. }
  158. }
  159. }