instance.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. package model
  2. import (
  3. "encoding/json"
  4. "strconv"
  5. "sync"
  6. "time"
  7. "go-common/library/ecode"
  8. "go-common/library/log"
  9. )
  10. // InstanceStatus Status of instance
  11. // type InstanceStatus uint32
  12. const (
  13. // InstanceStatusUP Ready to receive traffic
  14. InstanceStatusUP = uint32(1)
  15. // InstancestatusWating Intentionally shutdown for traffic
  16. InstancestatusWating = uint32(1) << 1
  17. )
  18. func (i *Instance) filter(status uint32) bool {
  19. return status&i.Status > 0
  20. }
  21. // Action Replicate type of node
  22. type Action int
  23. const (
  24. // Register Replicate the add action to all nodes
  25. Register Action = iota
  26. // Renew Replicate the heartbeat action to all nodes
  27. Renew
  28. // Cancel Replicate the cancel action to all nodes
  29. Cancel
  30. // Weight Replicate the Weight action to all nodes
  31. Weight
  32. // Delete Replicate the Delete action to all nodes
  33. Delete
  34. // Status Replicate the Status action to all nodes
  35. Status
  36. )
  37. // Instance holds information required for registration with
  38. // <Discovery Server> and to be discovered by other components.
  39. type Instance struct {
  40. Region string `json:"region"`
  41. Zone string `json:"zone"`
  42. Env string `json:"env"`
  43. Appid string `json:"appid"`
  44. Treeid int64 `json:"treeid"`
  45. Hostname string `json:"hostname"`
  46. HTTP string `json:"http"`
  47. RPC string `json:"rpc"`
  48. Version string `json:"version"`
  49. Metadata map[string]string `json:"metadata"`
  50. Addrs []string `json:"addrs"`
  51. // Status enum instance status
  52. Status uint32 `json:"status"`
  53. // timestamp
  54. RegTimestamp int64 `json:"reg_timestamp"`
  55. UpTimestamp int64 `json:"up_timestamp"` // NOTE: It is latest timestamp that status becomes UP.
  56. RenewTimestamp int64 `json:"renew_timestamp"`
  57. DirtyTimestamp int64 `json:"dirty_timestamp"`
  58. LatestTimestamp int64 `json:"latest_timestamp"`
  59. }
  60. // NewInstance new a instance.
  61. func NewInstance(arg *ArgRegister) (i *Instance) {
  62. now := time.Now().UnixNano()
  63. i = &Instance{
  64. Region: arg.Region,
  65. Zone: arg.Zone,
  66. Env: arg.Env,
  67. Appid: arg.Appid,
  68. Treeid: arg.Treeid,
  69. Hostname: arg.Hostname,
  70. HTTP: arg.HTTP,
  71. RPC: arg.RPC,
  72. Version: arg.Version,
  73. Status: arg.Status,
  74. Addrs: arg.Addrs,
  75. RegTimestamp: now,
  76. UpTimestamp: now,
  77. LatestTimestamp: now,
  78. RenewTimestamp: now,
  79. DirtyTimestamp: now,
  80. }
  81. i.Metadata = make(map[string]string)
  82. if arg.Metadata != "" {
  83. if err := json.Unmarshal([]byte(arg.Metadata), &i.Metadata); err != nil {
  84. log.Error("json unmarshal metadata err %v", err)
  85. }
  86. }
  87. return
  88. }
  89. // InstanceInfo the info get by consumer.
  90. type InstanceInfo struct {
  91. Instances []*Instance `json:"instances"`
  92. ZoneInstances map[string][]*Instance `json:"zone_instances"`
  93. LatestTimestamp int64 `json:"latest_timestamp"`
  94. LatestTimestampStr string `json:"latest_timestamp_str"`
  95. }
  96. // Apps app distinguished by zone
  97. type Apps struct {
  98. apps map[string]*App
  99. lock sync.RWMutex
  100. latestTimestamp int64
  101. }
  102. // NewApps return new Apps.
  103. func NewApps() *Apps {
  104. return &Apps{
  105. apps: make(map[string]*App),
  106. }
  107. }
  108. // NewApp news a app by appid. If ok=false, returns the app of already exist.
  109. func (p *Apps) NewApp(zone, appid string, treeid, lts int64) (a *App, new bool) {
  110. p.lock.Lock()
  111. a, ok := p.apps[zone]
  112. if !ok {
  113. a = NewApp(zone, appid, treeid)
  114. p.apps[zone] = a
  115. }
  116. if lts <= p.latestTimestamp {
  117. // insure increase
  118. lts = p.latestTimestamp + 1
  119. }
  120. p.latestTimestamp = lts
  121. p.lock.Unlock()
  122. new = !ok
  123. return
  124. }
  125. // App get app by zone.
  126. func (p *Apps) App(zone string) (as []*App) {
  127. p.lock.RLock()
  128. if zone != "" {
  129. a, ok := p.apps[zone]
  130. if !ok {
  131. p.lock.RUnlock()
  132. return
  133. }
  134. as = []*App{a}
  135. } else {
  136. for _, a := range p.apps {
  137. as = append(as, a)
  138. }
  139. }
  140. p.lock.RUnlock()
  141. return
  142. }
  143. // Del del app by zone.
  144. func (p *Apps) Del(zone string) {
  145. p.lock.Lock()
  146. delete(p.apps, zone)
  147. p.lock.Unlock()
  148. }
  149. // InstanceInfo return slice of instances.if up is true,return all status instance else return up status instance
  150. func (p *Apps) InstanceInfo(zone string, latestTime int64, status uint32) (ci *InstanceInfo, err error) {
  151. p.lock.RLock()
  152. defer p.lock.RUnlock()
  153. if latestTime >= p.latestTimestamp {
  154. err = ecode.NotModified
  155. return
  156. }
  157. ci = &InstanceInfo{
  158. LatestTimestamp: p.latestTimestamp,
  159. LatestTimestampStr: strconv.FormatInt(p.latestTimestamp/int64(time.Second), 10),
  160. ZoneInstances: make(map[string][]*Instance),
  161. }
  162. var ok bool
  163. for z, app := range p.apps {
  164. if zone == "" || z == zone {
  165. ok = true
  166. as := app.Instances()
  167. if len(as) == 0 {
  168. continue
  169. }
  170. instance := make([]*Instance, 0, len(as))
  171. for _, i := range as {
  172. // if up is false return all status instance
  173. if i.filter(status) {
  174. // if i.Status == InstanceStatusUP && i.LatestTimestamp > latestTime { // TODO(felix): increase
  175. ni := new(Instance)
  176. *ni = *i
  177. instance = append(instance, ni)
  178. }
  179. }
  180. ci.Instances = append(ci.Instances, instance...)
  181. ci.ZoneInstances[z] = instance
  182. }
  183. }
  184. if !ok {
  185. err = ecode.NothingFound
  186. } else if len(ci.Instances) == 0 {
  187. err = ecode.NotModified
  188. }
  189. return
  190. }
  191. // UpdateLatest update LatestTimestamp.
  192. func (p *Apps) UpdateLatest(latestTime int64) {
  193. if latestTime <= p.latestTimestamp {
  194. // insure increase
  195. latestTime = p.latestTimestamp + 1
  196. }
  197. p.latestTimestamp = latestTime
  198. }
  199. // App Instances distinguished by hostname
  200. type App struct {
  201. AppID string
  202. Treeid int64
  203. Zone string
  204. instances map[string]*Instance
  205. latestTimestamp int64
  206. lock sync.RWMutex
  207. }
  208. // NewApp new App.
  209. func NewApp(zone, appid string, treeid int64) (a *App) {
  210. a = &App{
  211. Treeid: treeid,
  212. AppID: appid,
  213. Zone: zone,
  214. instances: make(map[string]*Instance),
  215. }
  216. return
  217. }
  218. // Instances return slice of instances.
  219. func (a *App) Instances() (is []*Instance) {
  220. a.lock.RLock()
  221. is = make([]*Instance, 0, len(a.instances))
  222. for _, i := range a.instances {
  223. ni := new(Instance)
  224. *ni = *i
  225. is = append(is, ni)
  226. }
  227. a.lock.RUnlock()
  228. return
  229. }
  230. // NewInstance new a instance.
  231. func (a *App) NewInstance(ni *Instance, latestTime int64) (i *Instance, ok bool) {
  232. i = new(Instance)
  233. a.lock.Lock()
  234. oi, ok := a.instances[ni.Hostname]
  235. if ok {
  236. ni.UpTimestamp = oi.UpTimestamp
  237. if ni.DirtyTimestamp < oi.DirtyTimestamp {
  238. log.Warn("register exist(%v) dirty timestamp over than caller(%v)", oi, ni)
  239. ni = oi
  240. }
  241. }
  242. a.instances[ni.Hostname] = ni
  243. a.updateLatest(latestTime)
  244. *i = *ni
  245. a.lock.Unlock()
  246. ok = !ok
  247. return
  248. }
  249. // Renew new a instance.
  250. func (a *App) Renew(hostname string) (i *Instance, ok bool) {
  251. i = new(Instance)
  252. a.lock.Lock()
  253. defer a.lock.Unlock()
  254. oi, ok := a.instances[hostname]
  255. if !ok {
  256. return
  257. }
  258. oi.RenewTimestamp = time.Now().UnixNano()
  259. *i = *oi
  260. return
  261. }
  262. func (a *App) updateLatest(latestTime int64) {
  263. if latestTime <= a.latestTimestamp {
  264. // insure increase
  265. latestTime = a.latestTimestamp + 1
  266. }
  267. a.latestTimestamp = latestTime
  268. }
  269. // Cancel cancel a instance.
  270. func (a *App) Cancel(hostname string, latestTime int64) (i *Instance, l int, ok bool) {
  271. i = new(Instance)
  272. a.lock.Lock()
  273. defer a.lock.Unlock()
  274. oi, ok := a.instances[hostname]
  275. if !ok {
  276. return
  277. }
  278. delete(a.instances, hostname)
  279. l = len(a.instances)
  280. oi.LatestTimestamp = latestTime
  281. a.updateLatest(latestTime)
  282. *i = *oi
  283. return
  284. }
  285. // Len returns the length of instances.
  286. func (a *App) Len() (l int) {
  287. a.lock.RLock()
  288. l = len(a.instances)
  289. a.lock.RUnlock()
  290. return
  291. }
  292. // Set set new status,metadata of instance .
  293. func (a *App) Set(changes *ArgSet) (ok bool) {
  294. a.lock.Lock()
  295. defer a.lock.Unlock()
  296. var (
  297. dst *Instance
  298. setTime int64
  299. )
  300. if changes.SetTimestamp == 0 {
  301. setTime = time.Now().UnixNano()
  302. }
  303. for i, hostname := range changes.Hostname {
  304. if dst, ok = a.instances[hostname]; !ok {
  305. log.Error("Set hostname(%s) not found", hostname)
  306. return
  307. }
  308. if len(changes.Status) != 0 {
  309. if uint32(changes.Status[i]) != InstanceStatusUP && uint32(changes.Status[i]) != InstancestatusWating {
  310. log.Error("SetStatus change status(%d) is error", changes.Status[i])
  311. ok = false
  312. return
  313. }
  314. dst.Status = uint32(changes.Status[i])
  315. if dst.Status == InstanceStatusUP {
  316. dst.UpTimestamp = setTime
  317. }
  318. }
  319. if len(changes.Metadata) != 0 {
  320. metadata := make(map[string]string)
  321. if err := json.Unmarshal([]byte(changes.Metadata[i]), &metadata); err != nil {
  322. log.Error("set change metadata err %s", changes.Metadata[i])
  323. ok = false
  324. return
  325. }
  326. dst.Metadata = metadata
  327. }
  328. dst.LatestTimestamp = setTime
  329. dst.DirtyTimestamp = setTime
  330. }
  331. a.updateLatest(setTime)
  332. return
  333. }