node.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "net/url"
  7. "strconv"
  8. "strings"
  9. "go-common/app/infra/discovery/conf"
  10. "go-common/app/infra/discovery/model"
  11. "go-common/library/ecode"
  12. "go-common/library/log"
  13. bm "go-common/library/net/http/blademaster"
  14. "go-common/library/xstr"
  15. )
  16. const (
  17. _registerURL = "/discovery/register"
  18. _cancelURL = "/discovery/cancel"
  19. _renewURL = "/discovery/renew"
  20. _setURL = "/discovery/set"
  21. )
  22. // Node represents a peer node to which information should be shared from this node.
  23. //
  24. // This struct handles replicating all update operations like 'Register,Renew,Cancel,Expiration and Status Changes'
  25. // to the <Discovery Server> node it represents.
  26. type Node struct {
  27. c *conf.Config
  28. client *bm.Client
  29. pRegisterURL string
  30. registerURL string
  31. cancelURL string
  32. renewURL string
  33. setURL string
  34. addr string
  35. status model.NodeStatus
  36. zone string
  37. otherZone bool
  38. }
  39. // newNode return a node.
  40. func newNode(c *conf.Config, addr string) (n *Node) {
  41. n = &Node{
  42. c: c,
  43. addr: addr,
  44. registerURL: fmt.Sprintf("http://%s%s", addr, _registerURL),
  45. cancelURL: fmt.Sprintf("http://%s%s", addr, _cancelURL),
  46. renewURL: fmt.Sprintf("http://%s%s", addr, _renewURL),
  47. setURL: fmt.Sprintf("http://%s%s", addr, _setURL),
  48. client: bm.NewClient(c.HTTPClient),
  49. status: model.NodeStatusLost,
  50. }
  51. return
  52. }
  53. // Register send the registration information of Instance receiving by this node to the peer node represented.
  54. func (n *Node) Register(c context.Context, i *model.Instance) (err error) {
  55. err = n.call(c, model.Register, i, n.registerURL, nil)
  56. if err != nil {
  57. log.Warn("node be called(%s) register instance(%v) error(%v)", n.registerURL, i, err)
  58. }
  59. return
  60. }
  61. // Cancel send the cancellation information of Instance receiving by this node to the peer node represented.
  62. func (n *Node) Cancel(c context.Context, i *model.Instance) (err error) {
  63. err = n.call(c, model.Cancel, i, n.cancelURL, nil)
  64. if ec := ecode.Cause(err); ec.Code() == ecode.NothingFound.Code() {
  65. log.Warn("node be called(%s) instance(%v) already canceled", n.cancelURL, i)
  66. }
  67. return
  68. }
  69. // Renew send the heartbeat information of Instance receiving by this node to the peer node represented.
  70. // If the instance does not exist the node, the instance registration information is sent again to the peer node.
  71. func (n *Node) Renew(c context.Context, i *model.Instance) (err error) {
  72. var res *model.Instance
  73. err = n.call(c, model.Renew, i, n.renewURL, &res)
  74. ec := ecode.Cause(err)
  75. if ec.Code() == ecode.ServerErr.Code() {
  76. log.Warn("node be called(%s) instance(%v) error(%v)", n.renewURL, i, err)
  77. n.status = model.NodeStatusLost
  78. return
  79. }
  80. n.status = model.NodeStatusUP
  81. if ec.Code() == ecode.NothingFound.Code() {
  82. log.Warn("node be called(%s) instance(%v) error(%v)", n.renewURL, i, err)
  83. err = n.call(c, model.Register, i, n.registerURL, nil)
  84. return
  85. }
  86. // NOTE: register response instance whitch in conflict with peer node
  87. if ec.Code() == ecode.Conflict.Code() && res != nil {
  88. err = n.call(c, model.Register, res, n.pRegisterURL, nil)
  89. }
  90. return
  91. }
  92. // Set the infomation of instance by this node to the peer node represented
  93. func (n *Node) Set(c context.Context, arg *model.ArgSet) (err error) {
  94. err = n.setCall(c, arg, n.setURL)
  95. return
  96. }
  97. func (n *Node) call(c context.Context, action model.Action, i *model.Instance, uri string, data interface{}) (err error) {
  98. params := url.Values{}
  99. params.Set("region", i.Region)
  100. params.Set("zone", i.Zone)
  101. params.Set("env", i.Env)
  102. params.Set("treeid", strconv.FormatInt(i.Treeid, 10))
  103. params.Set("appid", i.Appid)
  104. params.Set("hostname", i.Hostname)
  105. if n.otherZone {
  106. params.Set("replication", "false")
  107. } else {
  108. params.Set("replication", "true")
  109. }
  110. switch action {
  111. case model.Register:
  112. params.Set("status", strconv.FormatUint(uint64(i.Status), 10))
  113. params.Set("version", i.Version)
  114. meta, _ := json.Marshal(i.Metadata)
  115. params.Set("metadata", string(meta))
  116. params.Set("addrs", strings.Join(i.Addrs, ","))
  117. params.Set("reg_timestamp", strconv.FormatInt(i.RegTimestamp, 10))
  118. params.Set("dirty_timestamp", strconv.FormatInt(i.DirtyTimestamp, 10))
  119. params.Set("latest_timestamp", strconv.FormatInt(i.LatestTimestamp, 10))
  120. case model.Renew:
  121. params.Set("dirty_timestamp", strconv.FormatInt(i.DirtyTimestamp, 10))
  122. case model.Cancel:
  123. params.Set("latest_timestamp", strconv.FormatInt(i.LatestTimestamp, 10))
  124. }
  125. var res struct {
  126. Code int `json:"code"`
  127. Data json.RawMessage `json:"data"`
  128. }
  129. if err = n.client.Post(c, uri, "", params, &res); err != nil {
  130. log.Error("node be called(%s) instance(%v) error(%v)", uri, i, err)
  131. return
  132. }
  133. if res.Code != 0 {
  134. log.Error("node be called(%s) instance(%v) responce code(%v)", uri, i, res.Code)
  135. if err = ecode.Int(res.Code); err == ecode.Conflict {
  136. json.Unmarshal([]byte(res.Data), data)
  137. }
  138. return
  139. }
  140. return
  141. }
  142. func (n *Node) setCall(c context.Context, arg *model.ArgSet, uri string) (err error) {
  143. params := url.Values{}
  144. params.Set("region", arg.Region)
  145. params.Set("zone", arg.Zone)
  146. params.Set("env", arg.Env)
  147. params.Set("appid", arg.Appid)
  148. params.Set("hostname", strings.Join(arg.Hostname, ","))
  149. params.Set("set_timestamp", strconv.FormatInt(arg.SetTimestamp, 10))
  150. params.Set("replication", "true")
  151. if len(arg.Status) != 0 {
  152. params.Set("status", xstr.JoinInts(arg.Status))
  153. }
  154. if len(arg.Metadata) != 0 {
  155. params.Set("metadata", strings.Join(arg.Metadata, ","))
  156. }
  157. var res struct {
  158. Code int `json:"code"`
  159. }
  160. if err = n.client.Post(c, uri, "", params, &res); err != nil {
  161. log.Error("node be setCalled(%s) appid(%s) env (%s) error(%v)", uri, arg.Appid, arg.Env, err)
  162. return
  163. }
  164. if res.Code != 0 {
  165. log.Error("node be setCalled(%s) appid(%s) env (%s) responce code(%v)", uri, arg.Appid, arg.Env, res.Code)
  166. }
  167. return
  168. }