123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362 |
- package model
- import (
- "encoding/json"
- "strconv"
- "sync"
- "time"
- "go-common/library/ecode"
- "go-common/library/log"
- )
// Instance status flags.
//
// The values are plain uint32 constants (no dedicated InstanceStatus type is
// declared). Each status occupies its own bit, so a caller can OR several of
// them into a mask and test it against Instance.Status.
const (
	// InstanceStatusUP marks an instance that is ready to receive traffic.
	InstanceStatusUP uint32 = 1
	// InstancestatusWating marks an instance that is intentionally shut down for traffic.
	InstancestatusWating uint32 = 1 << 1
)
- func (i *Instance) filter(status uint32) bool {
- return status&i.Status > 0
- }
// Action identifies the kind of operation that is replicated to other nodes.
type Action int

// Replicated actions, numbered consecutively starting at zero.
const (
	// Register replicates the add action to all nodes.
	Register = Action(iota)
	// Renew replicates the heartbeat action to all nodes.
	Renew
	// Cancel replicates the cancel action to all nodes.
	Cancel
	// Weight replicates the weight action to all nodes.
	Weight
	// Delete replicates the delete action to all nodes.
	Delete
	// Status replicates the status action to all nodes.
	Status
)
// Instance is the record a service registers with the discovery server and
// that other components read to discover it.
type Instance struct {
	// Location and identity of the instance.
	Region   string `json:"region"`
	Zone     string `json:"zone"`
	Env      string `json:"env"`
	Appid    string `json:"appid"`
	Treeid   int64  `json:"treeid"`
	Hostname string `json:"hostname"`

	// Endpoints and descriptive data supplied at registration.
	HTTP     string            `json:"http"`
	RPC      string            `json:"rpc"`
	Version  string            `json:"version"`
	Metadata map[string]string `json:"metadata"`
	Addrs    []string          `json:"addrs"`

	// Status is a bit flag; see InstanceStatusUP and InstancestatusWating.
	Status uint32 `json:"status"`

	// Timestamps. NewInstance initialises all of them to the same
	// time.Now().UnixNano() value.
	RegTimestamp    int64 `json:"reg_timestamp"`
	UpTimestamp     int64 `json:"up_timestamp"` // NOTE: latest time at which the status became UP.
	RenewTimestamp  int64 `json:"renew_timestamp"`
	DirtyTimestamp  int64 `json:"dirty_timestamp"`
	LatestTimestamp int64 `json:"latest_timestamp"`
}
- // NewInstance new a instance.
- func NewInstance(arg *ArgRegister) (i *Instance) {
- now := time.Now().UnixNano()
- i = &Instance{
- Region: arg.Region,
- Zone: arg.Zone,
- Env: arg.Env,
- Appid: arg.Appid,
- Treeid: arg.Treeid,
- Hostname: arg.Hostname,
- HTTP: arg.HTTP,
- RPC: arg.RPC,
- Version: arg.Version,
- Status: arg.Status,
- Addrs: arg.Addrs,
- RegTimestamp: now,
- UpTimestamp: now,
- LatestTimestamp: now,
- RenewTimestamp: now,
- DirtyTimestamp: now,
- }
- i.Metadata = make(map[string]string)
- if arg.Metadata != "" {
- if err := json.Unmarshal([]byte(arg.Metadata), &i.Metadata); err != nil {
- log.Error("json unmarshal metadata err %v", err)
- }
- }
- return
- }
- // InstanceInfo the info get by consumer.
- type InstanceInfo struct {
- Instances []*Instance `json:"instances"`
- ZoneInstances map[string][]*Instance `json:"zone_instances"`
- LatestTimestamp int64 `json:"latest_timestamp"`
- LatestTimestampStr string `json:"latest_timestamp_str"`
- }
- // Apps app distinguished by zone
- type Apps struct {
- apps map[string]*App
- lock sync.RWMutex
- latestTimestamp int64
- }
- // NewApps return new Apps.
- func NewApps() *Apps {
- return &Apps{
- apps: make(map[string]*App),
- }
- }
- // NewApp news a app by appid. If ok=false, returns the app of already exist.
- func (p *Apps) NewApp(zone, appid string, treeid, lts int64) (a *App, new bool) {
- p.lock.Lock()
- a, ok := p.apps[zone]
- if !ok {
- a = NewApp(zone, appid, treeid)
- p.apps[zone] = a
- }
- if lts <= p.latestTimestamp {
- // insure increase
- lts = p.latestTimestamp + 1
- }
- p.latestTimestamp = lts
- p.lock.Unlock()
- new = !ok
- return
- }
- // App get app by zone.
- func (p *Apps) App(zone string) (as []*App) {
- p.lock.RLock()
- if zone != "" {
- a, ok := p.apps[zone]
- if !ok {
- p.lock.RUnlock()
- return
- }
- as = []*App{a}
- } else {
- for _, a := range p.apps {
- as = append(as, a)
- }
- }
- p.lock.RUnlock()
- return
- }
- // Del del app by zone.
- func (p *Apps) Del(zone string) {
- p.lock.Lock()
- delete(p.apps, zone)
- p.lock.Unlock()
- }
- // InstanceInfo return slice of instances.if up is true,return all status instance else return up status instance
- func (p *Apps) InstanceInfo(zone string, latestTime int64, status uint32) (ci *InstanceInfo, err error) {
- p.lock.RLock()
- defer p.lock.RUnlock()
- if latestTime >= p.latestTimestamp {
- err = ecode.NotModified
- return
- }
- ci = &InstanceInfo{
- LatestTimestamp: p.latestTimestamp,
- LatestTimestampStr: strconv.FormatInt(p.latestTimestamp/int64(time.Second), 10),
- ZoneInstances: make(map[string][]*Instance),
- }
- var ok bool
- for z, app := range p.apps {
- if zone == "" || z == zone {
- ok = true
- as := app.Instances()
- if len(as) == 0 {
- continue
- }
- instance := make([]*Instance, 0, len(as))
- for _, i := range as {
- // if up is false return all status instance
- if i.filter(status) {
- // if i.Status == InstanceStatusUP && i.LatestTimestamp > latestTime { // TODO(felix): increase
- ni := new(Instance)
- *ni = *i
- instance = append(instance, ni)
- }
- }
- ci.Instances = append(ci.Instances, instance...)
- ci.ZoneInstances[z] = instance
- }
- }
- if !ok {
- err = ecode.NothingFound
- } else if len(ci.Instances) == 0 {
- err = ecode.NotModified
- }
- return
- }
- // UpdateLatest update LatestTimestamp.
- func (p *Apps) UpdateLatest(latestTime int64) {
- if latestTime <= p.latestTimestamp {
- // insure increase
- latestTime = p.latestTimestamp + 1
- }
- p.latestTimestamp = latestTime
- }
- // App Instances distinguished by hostname
- type App struct {
- AppID string
- Treeid int64
- Zone string
- instances map[string]*Instance
- latestTimestamp int64
- lock sync.RWMutex
- }
- // NewApp new App.
- func NewApp(zone, appid string, treeid int64) (a *App) {
- a = &App{
- Treeid: treeid,
- AppID: appid,
- Zone: zone,
- instances: make(map[string]*Instance),
- }
- return
- }
- // Instances return slice of instances.
- func (a *App) Instances() (is []*Instance) {
- a.lock.RLock()
- is = make([]*Instance, 0, len(a.instances))
- for _, i := range a.instances {
- ni := new(Instance)
- *ni = *i
- is = append(is, ni)
- }
- a.lock.RUnlock()
- return
- }
- // NewInstance new a instance.
- func (a *App) NewInstance(ni *Instance, latestTime int64) (i *Instance, ok bool) {
- i = new(Instance)
- a.lock.Lock()
- oi, ok := a.instances[ni.Hostname]
- if ok {
- ni.UpTimestamp = oi.UpTimestamp
- if ni.DirtyTimestamp < oi.DirtyTimestamp {
- log.Warn("register exist(%v) dirty timestamp over than caller(%v)", oi, ni)
- ni = oi
- }
- }
- a.instances[ni.Hostname] = ni
- a.updateLatest(latestTime)
- *i = *ni
- a.lock.Unlock()
- ok = !ok
- return
- }
- // Renew new a instance.
- func (a *App) Renew(hostname string) (i *Instance, ok bool) {
- i = new(Instance)
- a.lock.Lock()
- defer a.lock.Unlock()
- oi, ok := a.instances[hostname]
- if !ok {
- return
- }
- oi.RenewTimestamp = time.Now().UnixNano()
- *i = *oi
- return
- }
- func (a *App) updateLatest(latestTime int64) {
- if latestTime <= a.latestTimestamp {
- // insure increase
- latestTime = a.latestTimestamp + 1
- }
- a.latestTimestamp = latestTime
- }
- // Cancel cancel a instance.
- func (a *App) Cancel(hostname string, latestTime int64) (i *Instance, l int, ok bool) {
- i = new(Instance)
- a.lock.Lock()
- defer a.lock.Unlock()
- oi, ok := a.instances[hostname]
- if !ok {
- return
- }
- delete(a.instances, hostname)
- l = len(a.instances)
- oi.LatestTimestamp = latestTime
- a.updateLatest(latestTime)
- *i = *oi
- return
- }
- // Len returns the length of instances.
- func (a *App) Len() (l int) {
- a.lock.RLock()
- l = len(a.instances)
- a.lock.RUnlock()
- return
- }
- // Set set new status,metadata of instance .
- func (a *App) Set(changes *ArgSet) (ok bool) {
- a.lock.Lock()
- defer a.lock.Unlock()
- var (
- dst *Instance
- setTime int64
- )
- if changes.SetTimestamp == 0 {
- setTime = time.Now().UnixNano()
- }
- for i, hostname := range changes.Hostname {
- if dst, ok = a.instances[hostname]; !ok {
- log.Error("Set hostname(%s) not found", hostname)
- return
- }
- if len(changes.Status) != 0 {
- if uint32(changes.Status[i]) != InstanceStatusUP && uint32(changes.Status[i]) != InstancestatusWating {
- log.Error("SetStatus change status(%d) is error", changes.Status[i])
- ok = false
- return
- }
- dst.Status = uint32(changes.Status[i])
- if dst.Status == InstanceStatusUP {
- dst.UpTimestamp = setTime
- }
- }
- if len(changes.Metadata) != 0 {
- metadata := make(map[string]string)
- if err := json.Unmarshal([]byte(changes.Metadata[i]), &metadata); err != nil {
- log.Error("set change metadata err %s", changes.Metadata[i])
- ok = false
- return
- }
- dst.Metadata = metadata
- }
- dst.LatestTimestamp = setTime
- dst.DirtyTimestamp = setTime
- }
- a.updateLatest(setTime)
- return
- }
|