12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721 |
- package discovery
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "net/url"
- "os"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "go-common/library/conf/env"
- "go-common/library/ecode"
- "go-common/library/exp/feature"
- "go-common/library/log"
- "go-common/library/naming"
- bm "go-common/library/net/http/blademaster"
- "go-common/library/net/netutil"
- "go-common/library/net/netutil/breaker"
- xtime "go-common/library/time"
- "go-common/library/xstr"
- )
const (
	// URL templates of the discovery HTTP endpoints. The single %s verb is
	// filled with the host of the node being contacted.
	_registerURL = "http://%s/discovery/register"
	_setURL      = "http://%s/discovery/set"
	_cancelURL   = "http://%s/discovery/cancel"
	_renewURL    = "http://%s/discovery/renew"
	_pollURL     = "http://%s/discovery/polls"
	_nodesURL    = "http://%s/discovery/nodes"
	// _registerGap is the interval between two renew requests issued for a
	// registered instance.
	_registerGap = 30 * time.Second
	// _statusUP is the status value sent when an instance is registered.
	_statusUP = "1"
)
const (
	// _appid is the application ID that New resolves when the
	// self-discovery feature is enabled.
	_appid = "infra.discovery"
)
var (
	// Compile-time checks that *Discovery satisfies the naming interfaces.
	_ naming.Builder  = &Discovery{}
	_ naming.Registry = &Discovery{}

	// _selfDiscoveryFeatrue is the feature flag that makes the client locate
	// discovery nodes through discovery itself; it is off by default.
	_selfDiscoveryFeatrue feature.Feature = "discovery.self"
	// _discoveryFeatures is the feature spec table registered by init.
	_discoveryFeatures = map[feature.Feature]feature.Spec{
		_selfDiscoveryFeatrue: {Default: false},
	}

	// ErrDuplication is returned by Register when an instance with the same
	// AppID has already been registered through this client.
	ErrDuplication = errors.New("discovery: instance duplicate registration")
)
// init adds this package's feature specs to the default feature gate.
func init() {
	feature.DefaultGate.Add(_discoveryFeatures)
}
// Config holds the settings of a discovery client.
type Config struct {
	// Nodes lists the discovery hosts used while no fetched node list is
	// available; fixConfig installs a default when it is empty.
	Nodes []string
	// Key and Secret are handed to the HTTP client as its app credentials.
	Key    string
	Secret string
	// Region, Zone, Env and Host are sent as the region/zone/env/hostname
	// request parameters. fixConfig overwrites each of them with the matching
	// env package value when that value is non-empty; Host otherwise falls
	// back to os.Hostname.
	Region string
	Zone   string
	Env    string
	Host   string
}
// appData is the per-application payload decoded from a polls response.
type appData struct {
	// ZoneInstances maps a zone name to the instances reported for it.
	ZoneInstances map[string][]*naming.Instance `json:"zone_instances"`
	// LastTs is the timestamp reported with the data; polls rejects 0.
	LastTs int64 `json:"latest_timestamp"`
}
- // Discovery is discovery client.
- type Discovery struct {
- once sync.Once
- conf *Config
- ctx context.Context
- cancelFunc context.CancelFunc
- httpClient *bm.Client
- mutex sync.RWMutex
- apps map[string]*appInfo
- registry map[string]struct{}
- lastHost string
- cancelPolls context.CancelFunc
- idx uint64
- node atomic.Value
- delete chan *appInfo
- }
// appInfo is the client-side state kept for one watched application.
type appInfo struct {
	// zoneIns stores the latest map[string][]*naming.Instance for the app.
	zoneIns atomic.Value
	// resolver is the set of resolvers watching the app.
	resolver map[*Resolver]struct{}
	lastTs   int64 // latest timestamp
}
- func fixConfig(c *Config) {
- if len(c.Nodes) == 0 {
- c.Nodes = []string{"api.bilibili.co"}
- }
- if env.Region != "" {
- c.Region = env.Region
- }
- if env.Zone != "" {
- c.Zone = env.Zone
- }
- if env.DeployEnv != "" {
- c.Env = env.DeployEnv
- }
- if env.Hostname != "" {
- c.Host = env.Hostname
- } else {
- c.Host, _ = os.Hostname()
- }
- }
var (
	// once guards the lazy construction of _defaultDiscovery in initDefault.
	once sync.Once
	// _defaultDiscovery is the package-level client behind Builder and Build.
	_defaultDiscovery *Discovery
)
// initDefault creates the package-level default client with New(nil); the
// construction runs at most once.
func initDefault() {
	once.Do(func() {
		_defaultDiscovery = New(nil)
	})
}
- // Builder return default discvoery resolver builder.
- func Builder() naming.Builder {
- if _defaultDiscovery == nil {
- initDefault()
- }
- return _defaultDiscovery
- }
- // Build register resolver into default discovery.
- func Build(id string) naming.Resolver {
- if _defaultDiscovery == nil {
- initDefault()
- }
- return _defaultDiscovery.Build(id)
- }
- // New new a discovery client.
- func New(c *Config) (d *Discovery) {
- if c == nil {
- c = &Config{
- Nodes: []string{"discovery.bilibili.co", "api.bilibili.co"},
- Key: "discovery",
- Secret: "discovery",
- }
- }
- fixConfig(c)
- ctx, cancel := context.WithCancel(context.Background())
- d = &Discovery{
- ctx: ctx,
- cancelFunc: cancel,
- conf: c,
- apps: map[string]*appInfo{},
- registry: map[string]struct{}{},
- delete: make(chan *appInfo, 10),
- }
- // httpClient
- cfg := &bm.ClientConfig{
- App: &bm.App{
- Key: c.Key,
- Secret: c.Secret,
- },
- Dial: xtime.Duration(3 * time.Second),
- Timeout: xtime.Duration(40 * time.Second),
- Breaker: &breaker.Config{
- Window: 100,
- Sleep: 3,
- Bucket: 10,
- Ratio: 0.5,
- Request: 100,
- },
- }
- d.httpClient = bm.NewClient(cfg)
- if feature.DefaultGate.Enabled(_selfDiscoveryFeatrue) {
- resolver := d.Build(_appid)
- event := resolver.Watch()
- _, ok := <-event
- if !ok {
- panic("discovery watch failed")
- }
- ins, ok := resolver.Fetch(context.Background())
- if ok {
- d.newSelf(ins)
- }
- go d.selfproc(resolver, event)
- }
- return
- }
- func (d *Discovery) selfproc(resolver naming.Resolver, event <-chan struct{}) {
- for {
- _, ok := <-event
- if !ok {
- return
- }
- zones, ok := resolver.Fetch(context.Background())
- if ok {
- d.newSelf(zones)
- }
- }
- }
- func (d *Discovery) newSelf(zones map[string][]*naming.Instance) {
- ins, ok := zones[d.conf.Zone]
- if !ok {
- return
- }
- var nodes []string
- for _, in := range ins {
- for _, addr := range in.Addrs {
- u, err := url.Parse(addr)
- if err == nil && u.Scheme == "http" {
- nodes = append(nodes, u.Host)
- }
- }
- }
- // diff old nodes
- olds, ok := d.node.Load().([]string)
- if ok {
- var diff int
- for _, n := range nodes {
- for _, o := range olds {
- if o == n {
- diff++
- break
- }
- }
- }
- if len(nodes) == diff {
- return
- }
- }
- // FIXME: we should use rand.Shuffle() in golang 1.10
- Shuffle(len(nodes), func(i, j int) {
- nodes[i], nodes[j] = nodes[j], nodes[i]
- })
- d.node.Store(nodes)
- }
- // Build disovery resovler builder.
- func (d *Discovery) Build(appid string) naming.Resolver {
- r := &Resolver{
- id: appid,
- d: d,
- event: make(chan struct{}, 1),
- }
- d.mutex.Lock()
- app, ok := d.apps[appid]
- if !ok {
- app = &appInfo{
- resolver: make(map[*Resolver]struct{}),
- }
- d.apps[appid] = app
- cancel := d.cancelPolls
- if cancel != nil {
- cancel()
- }
- }
- app.resolver[r] = struct{}{}
- d.mutex.Unlock()
- if ok {
- select {
- case r.event <- struct{}{}:
- default:
- }
- }
- log.Info("disocvery: AddWatch(%s) already watch(%v)", appid, ok)
- d.once.Do(func() {
- go d.serverproc()
- })
- return r
- }
// Scheme returns the scheme name of this builder, "discovery".
func (d *Discovery) Scheme() string {
	return "discovery"
}
// Resolver is the discovery resolver for a single application.
type Resolver struct {
	id    string        // appid being resolved
	event chan struct{} // signalled when instances of id are updated
	d     *Discovery    // owning client
}
// Watch returns the channel on which instance-update events are delivered.
func (r *Resolver) Watch() <-chan struct{} {
	return r.event
}
- // Fetch fetch resolver instance.
- func (r *Resolver) Fetch(c context.Context) (ins map[string][]*naming.Instance, ok bool) {
- r.d.mutex.RLock()
- app, ok := r.d.apps[r.id]
- r.d.mutex.RUnlock()
- if ok {
- ins, ok = app.zoneIns.Load().(map[string][]*naming.Instance)
- return
- }
- return
- }
- // Close close resolver.
- func (r *Resolver) Close() error {
- r.d.mutex.Lock()
- if app, ok := r.d.apps[r.id]; ok && len(app.resolver) != 0 {
- delete(app.resolver, r)
- // TODO: delete app from builder
- }
- r.d.mutex.Unlock()
- return nil
- }
- func (d *Discovery) pickNode() string {
- nodes, ok := d.node.Load().([]string)
- if !ok || len(nodes) == 0 {
- return d.conf.Nodes[d.idx%uint64(len(d.conf.Nodes))]
- }
- return nodes[d.idx%uint64(len(nodes))]
- }
// switchNode advances the node index so the next pickNode call selects the
// following node.
func (d *Discovery) switchNode() {
	atomic.AddUint64(&d.idx, 1)
}
- // Reload reload the config
- func (d *Discovery) Reload(c *Config) {
- fixConfig(c)
- d.mutex.Lock()
- d.conf = c
- d.mutex.Unlock()
- }
// Close cancels the client's root context, which stops the polling goroutine
// and the renew goroutines started by Register. It always returns nil.
func (d *Discovery) Close() error {
	d.cancelFunc()
	return nil
}
- // Register Register an instance with discovery and renew automatically
- func (d *Discovery) Register(c context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) {
- d.mutex.Lock()
- if _, ok := d.registry[ins.AppID]; ok {
- err = ErrDuplication
- } else {
- d.registry[ins.AppID] = struct{}{}
- }
- d.mutex.Unlock()
- if err != nil {
- return
- }
- if err = d.register(c, ins); err != nil {
- d.mutex.Lock()
- delete(d.registry, ins.AppID)
- d.mutex.Unlock()
- return
- }
- ctx, cancel := context.WithCancel(d.ctx)
- ch := make(chan struct{}, 1)
- cancelFunc = context.CancelFunc(func() {
- cancel()
- <-ch
- })
- go func() {
- ticker := time.NewTicker(_registerGap)
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- if err := d.renew(ctx, ins); err != nil && ecode.NothingFound.Equal(err) {
- d.register(ctx, ins)
- }
- case <-ctx.Done():
- d.cancel(ins)
- ch <- struct{}{}
- return
- }
- }
- }()
- return
- }
// Set updates the status and metadata of ins in discovery, using a
// background context for the request.
func (d *Discovery) Set(ins *naming.Instance) error {
	return d.set(context.Background(), ins)
}
- // cancel Remove the registered instance from discovery
- func (d *Discovery) cancel(ins *naming.Instance) (err error) {
- d.mutex.RLock()
- conf := d.conf
- d.mutex.RUnlock()
- res := new(struct {
- Code int `json:"code"`
- Message string `json:"message"`
- })
- uri := fmt.Sprintf(_cancelURL, d.pickNode())
- params := d.newParams(conf)
- params.Set("appid", ins.AppID)
- // request
- if err = d.httpClient.Post(context.Background(), uri, "", params, &res); err != nil {
- d.switchNode()
- log.Error("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)",
- uri, conf.Env, ins.AppID, conf.Host, err)
- return
- }
- if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
- log.Warn("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)",
- uri, conf.Env, ins.AppID, conf.Host, res.Code)
- err = ec
- return
- }
- log.Info("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) success",
- uri, conf.Env, ins.AppID, conf.Host)
- return
- }
- // register Register an instance with discovery
- func (d *Discovery) register(ctx context.Context, ins *naming.Instance) (err error) {
- d.mutex.RLock()
- conf := d.conf
- d.mutex.RUnlock()
- var metadata []byte
- if ins.Metadata != nil {
- if metadata, err = json.Marshal(ins.Metadata); err != nil {
- log.Error("discovery:register instance Marshal metadata(%v) failed!error(%v)", ins.Metadata, err)
- }
- }
- res := new(struct {
- Code int `json:"code"`
- Message string `json:"message"`
- })
- uri := fmt.Sprintf(_registerURL, d.pickNode())
- params := d.newParams(conf)
- params.Set("appid", ins.AppID)
- params.Set("addrs", strings.Join(ins.Addrs, ","))
- params.Set("version", ins.Version)
- params.Set("status", _statusUP)
- params.Set("metadata", string(metadata))
- if err = d.httpClient.Post(ctx, uri, "", params, &res); err != nil {
- d.switchNode()
- log.Error("discovery: register client.Get(%v) zone(%s) env(%s) appid(%s) addrs(%v) error(%v)",
- uri, conf.Zone, conf.Env, ins.AppID, ins.Addrs, err)
- return
- }
- if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
- log.Warn("discovery: register client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)",
- uri, conf.Env, ins.AppID, ins.Addrs, res.Code)
- err = ec
- return
- }
- log.Info("discovery: register client.Get(%v) env(%s) appid(%s) addrs(%s) success",
- uri, conf.Env, ins.AppID, ins.Addrs)
- return
- }
- // rset set instance info with discovery
- func (d *Discovery) set(ctx context.Context, ins *naming.Instance) (err error) {
- d.mutex.RLock()
- conf := d.conf
- d.mutex.RUnlock()
- res := new(struct {
- Code int `json:"code"`
- Message string `json:"message"`
- })
- uri := fmt.Sprintf(_setURL, d.pickNode())
- params := d.newParams(conf)
- params.Set("appid", ins.AppID)
- params.Set("version", ins.Version)
- params.Set("status", strconv.FormatInt(ins.Status, 10))
- if ins.Metadata != nil {
- var metadata []byte
- if metadata, err = json.Marshal(ins.Metadata); err != nil {
- log.Error("discovery:set instance Marshal metadata(%v) failed!error(%v)", ins.Metadata, err)
- }
- params.Set("metadata", string(metadata))
- }
- if err = d.httpClient.Post(ctx, uri, "", params, &res); err != nil {
- d.switchNode()
- log.Error("discovery: set client.Get(%v) zone(%s) env(%s) appid(%s) addrs(%v) error(%v)",
- uri, conf.Zone, conf.Env, ins.AppID, ins.Addrs, err)
- return
- }
- if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
- log.Warn("discovery: set client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)",
- uri, conf.Env, ins.AppID, ins.Addrs, res.Code)
- err = ec
- return
- }
- log.Info("discovery: set client.Get(%v) env(%s) appid(%s) addrs(%s) success",
- uri+"?"+params.Encode(), conf.Env, ins.AppID, ins.Addrs)
- return
- }
- // renew Renew an instance with discovery
- func (d *Discovery) renew(ctx context.Context, ins *naming.Instance) (err error) {
- d.mutex.RLock()
- conf := d.conf
- d.mutex.RUnlock()
- res := new(struct {
- Code int `json:"code"`
- Message string `json:"message"`
- })
- uri := fmt.Sprintf(_renewURL, d.pickNode())
- params := d.newParams(conf)
- params.Set("appid", ins.AppID)
- if err = d.httpClient.Post(ctx, uri, "", params, &res); err != nil {
- d.switchNode()
- log.Error("discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)",
- uri, conf.Env, ins.AppID, conf.Host, err)
- return
- }
- if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
- err = ec
- if ec.Equal(ecode.NothingFound) {
- return
- }
- log.Error("discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)",
- uri, conf.Env, ins.AppID, conf.Host, res.Code)
- return
- }
- return
- }
- func (d *Discovery) serverproc() {
- var (
- retry int
- update bool
- ctx context.Context
- cancel context.CancelFunc
- )
- bc := netutil.DefaultBackoffConfig
- ticker := time.NewTicker(time.Minute * 30)
- defer ticker.Stop()
- for {
- if ctx == nil {
- ctx, cancel = context.WithCancel(d.ctx)
- d.mutex.Lock()
- d.cancelPolls = cancel
- d.mutex.Unlock()
- }
- select {
- case <-d.ctx.Done():
- return
- case <-ticker.C:
- update = true
- default:
- }
- if !feature.DefaultGate.Enabled(_selfDiscoveryFeatrue) {
- nodes, ok := d.node.Load().([]string)
- if !ok || len(nodes) == 0 || update {
- update = false
- tnodes := d.nodes()
- if len(tnodes) == 0 {
- time.Sleep(bc.Backoff(retry))
- retry++
- continue
- }
- retry = 0
- // FIXME: we should use rand.Shuffle() in golang 1.10
- Shuffle(len(tnodes), func(i, j int) {
- tnodes[i], tnodes[j] = tnodes[j], tnodes[i]
- })
- d.node.Store(tnodes)
- }
- }
- apps, err := d.polls(ctx, d.pickNode())
- if err != nil {
- d.switchNode()
- if ctx.Err() == context.Canceled {
- ctx = nil
- continue
- }
- time.Sleep(bc.Backoff(retry))
- retry++
- continue
- }
- retry = 0
- d.broadcast(apps)
- }
- }
- func (d *Discovery) nodes() (nodes []string) {
- res := new(struct {
- Code int `json:"code"`
- Data []struct {
- Addr string `json:"addr"`
- } `json:"data"`
- })
- uri := fmt.Sprintf(_nodesURL, d.pickNode())
- if err := d.httpClient.Get(d.ctx, uri, "", nil, res); err != nil {
- d.switchNode()
- log.Error("discovery: consumer client.Get(%v)error(%+v)", uri, err)
- return
- }
- if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
- log.Error("discovery: consumer client.Get(%v) error(%v)", uri, res.Code)
- return
- }
- if len(res.Data) == 0 {
- log.Warn("discovery: get nodes(%s) failed,no nodes found!", uri)
- return
- }
- nodes = make([]string, 0, len(res.Data))
- for i := range res.Data {
- nodes = append(nodes, res.Data[i].Addr)
- }
- return
- }
- func (d *Discovery) polls(ctx context.Context, host string) (apps map[string]appData, err error) {
- var (
- lastTs []int64
- appid []string
- changed bool
- )
- if host != d.lastHost {
- d.lastHost = host
- changed = true
- }
- d.mutex.RLock()
- conf := d.conf
- for k, v := range d.apps {
- if changed {
- v.lastTs = 0
- }
- appid = append(appid, k)
- lastTs = append(lastTs, v.lastTs)
- }
- d.mutex.RUnlock()
- if len(appid) == 0 {
- return
- }
- uri := fmt.Sprintf(_pollURL, host)
- res := new(struct {
- Code int `json:"code"`
- Data map[string]appData `json:"data"`
- })
- params := url.Values{}
- params.Set("env", conf.Env)
- params.Set("hostname", conf.Host)
- params.Set("appid", strings.Join(appid, ","))
- params.Set("latest_timestamp", xstr.JoinInts(lastTs))
- if err = d.httpClient.Get(ctx, uri, "", params, res); err != nil {
- log.Error("discovery: client.Get(%s) error(%+v)", uri+"?"+params.Encode(), err)
- return
- }
- if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
- if !ec.Equal(ecode.NotModified) {
- log.Error("discovery: client.Get(%s) get error code(%d)", uri+"?"+params.Encode(), res.Code)
- err = ec
- }
- return
- }
- info, _ := json.Marshal(res.Data)
- for _, app := range res.Data {
- if app.LastTs == 0 {
- err = ecode.ServerErr
- log.Error("discovery: client.Get(%s) latest_timestamp is 0,instances:(%s)", uri+"?"+params.Encode(), info)
- return
- }
- }
- log.Info("discovery: polls uri(%s)", uri+"?"+params.Encode())
- log.Info("discovery: successfully polls(%s) instances (%s)", uri+"?"+params.Encode(), info)
- apps = res.Data
- return
- }
- func (d *Discovery) broadcast(apps map[string]appData) {
- for id, v := range apps {
- var count int
- for zone, ins := range v.ZoneInstances {
- if len(ins) == 0 {
- delete(v.ZoneInstances, zone)
- }
- count += len(ins)
- }
- if count == 0 {
- continue
- }
- d.mutex.RLock()
- app, ok := d.apps[id]
- d.mutex.RUnlock()
- if ok {
- app.lastTs = v.LastTs
- app.zoneIns.Store(v.ZoneInstances)
- d.mutex.RLock()
- for rs := range app.resolver {
- select {
- case rs.event <- struct{}{}:
- default:
- }
- }
- d.mutex.RUnlock()
- }
- }
- }
- func (d *Discovery) newParams(conf *Config) url.Values {
- params := url.Values{}
- params.Set("region", conf.Region)
- params.Set("zone", conf.Zone)
- params.Set("env", conf.Env)
- params.Set("hostname", conf.Host)
- return params
- }
|