discovery.go 17 KB


  1. package discovery
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "net/url"
  8. "os"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "go-common/library/conf/env"
  15. "go-common/library/ecode"
  16. "go-common/library/exp/feature"
  17. "go-common/library/log"
  18. "go-common/library/naming"
  19. bm "go-common/library/net/http/blademaster"
  20. "go-common/library/net/netutil"
  21. "go-common/library/net/netutil/breaker"
  22. xtime "go-common/library/time"
  23. "go-common/library/xstr"
  24. )
  25. const (
  26. _registerURL = "http://%s/discovery/register"
  27. _setURL = "http://%s/discovery/set"
  28. _cancelURL = "http://%s/discovery/cancel"
  29. _renewURL = "http://%s/discovery/renew"
  30. _pollURL = "http://%s/discovery/polls"
  31. _nodesURL = "http://%s/discovery/nodes"
  32. _registerGap = 30 * time.Second
  33. _statusUP = "1"
  34. )
  35. const (
  36. _appid = "infra.discovery"
  37. )
var (
	// Compile-time checks that *Discovery implements the naming interfaces.
	_ naming.Builder  = &Discovery{}
	_ naming.Registry = &Discovery{}
	// _selfDiscoveryFeatrue gates resolving the discovery nodes through
	// discovery itself (appid _appid) instead of the /discovery/nodes endpoint.
	// NOTE: the identifier is misspelled ("Featrue"); it is kept as-is because
	// other code in this file refers to it.
	_selfDiscoveryFeatrue feature.Feature = "discovery.self"
	// _discoveryFeatures declares the feature above, disabled by default.
	_discoveryFeatures = map[feature.Feature]feature.Spec{
		_selfDiscoveryFeatrue: {Default: false},
	}
	// ErrDuplication is returned by Register when an instance with the same
	// AppID is already registered through this client.
	ErrDuplication = errors.New("discovery: instance duplicate registration")
)

// init adds the discovery feature specs to feature.DefaultGate.
func init() {
	feature.DefaultGate.Add(_discoveryFeatures)
}
// Config discovery configures.
//
// Config holds the discovery client configuration. fixConfig (applied by New
// and Reload) does three things:
//   - defaults Nodes when it is empty;
//   - overrides Region, Zone and Env with the env package values when those
//     are non-empty;
//   - always sets Host (env.Hostname, falling back to os.Hostname).
type Config struct {
	// Nodes are the configured discovery hosts; pickNode uses them while no
	// node list has been stored yet.
	Nodes  []string
	// Key and Secret are the app credentials given to the HTTP client.
	Key    string
	Secret string
	// Region, Zone, Env and Host identify this client in request parameters.
	Region string
	Zone   string
	Env    string
	Host   string
}

// appData is the per-appid payload decoded from the polls endpoint.
type appData struct {
	// ZoneInstances maps a zone name to the instances in that zone.
	ZoneInstances map[string][]*naming.Instance `json:"zone_instances"`
	// LastTs is stored per app and sent back as latest_timestamp on the next poll.
	LastTs int64 `json:"latest_timestamp"`
}

// Discovery is discovery client.
//
// It registers instances (Register) and long-polls instance lists for every
// appid a resolver was built for (Build).
type Discovery struct {
	// once starts serverproc on the first Build.
	once        sync.Once
	// conf is the current config; Reload replaces it under mutex.
	conf        *Config
	// ctx is the root context, cancelled by Close via cancelFunc.
	ctx         context.Context
	cancelFunc  context.CancelFunc
	httpClient  *bm.Client
	// mutex guards conf, apps, registry, cancelPolls and each app's resolver set.
	mutex       sync.RWMutex
	// apps holds the watched appids.
	apps        map[string]*appInfo
	// registry holds the AppIDs registered through Register.
	registry    map[string]struct{}
	// lastHost is the host of the previous poll; only polls touches it.
	lastHost    string
	// cancelPolls cancels the in-flight poll so that Build can add an appid.
	cancelPolls context.CancelFunc
	// idx is the round-robin node index advanced by switchNode.
	idx         uint64
	// node holds the current []string of discovery nodes.
	node        atomic.Value
	// NOTE(review): delete is created in New but never used in this file.
	delete      chan *appInfo
}

// appInfo is the client-side state for one watched appid.
type appInfo struct {
	// zoneIns holds the latest map[string][]*naming.Instance (zone -> instances).
	zoneIns  atomic.Value
	// resolver is the set of resolvers to notify; guarded by Discovery.mutex.
	resolver map[*Resolver]struct{}
	lastTs   int64 // latest timestamp
}
  86. func fixConfig(c *Config) {
  87. if len(c.Nodes) == 0 {
  88. c.Nodes = []string{"api.bilibili.co"}
  89. }
  90. if env.Region != "" {
  91. c.Region = env.Region
  92. }
  93. if env.Zone != "" {
  94. c.Zone = env.Zone
  95. }
  96. if env.DeployEnv != "" {
  97. c.Env = env.DeployEnv
  98. }
  99. if env.Hostname != "" {
  100. c.Host = env.Hostname
  101. } else {
  102. c.Host, _ = os.Hostname()
  103. }
  104. }
var (
	// once guards the lazy construction of _defaultDiscovery.
	once sync.Once
	// _defaultDiscovery is the package-level client behind Builder and Build.
	_defaultDiscovery *Discovery
)

// initDefault creates _defaultDiscovery with New(nil), at most once.
func initDefault() {
	once.Do(func() {
		_defaultDiscovery = New(nil)
	})
}
  114. // Builder return default discvoery resolver builder.
  115. func Builder() naming.Builder {
  116. if _defaultDiscovery == nil {
  117. initDefault()
  118. }
  119. return _defaultDiscovery
  120. }
  121. // Build register resolver into default discovery.
  122. func Build(id string) naming.Resolver {
  123. if _defaultDiscovery == nil {
  124. initDefault()
  125. }
  126. return _defaultDiscovery.Build(id)
  127. }
// New new a discovery client.
//
// When c is nil, a built-in config is used:
//   - nodes discovery.bilibili.co and api.bilibili.co;
//   - key and secret "discovery".
//
// c is passed through fixConfig, so the caller's Config may be modified in place.
//
// If the "discovery.self" feature is enabled, New also does the following:
//   - builds a resolver for the discovery service itself (_appid);
//   - blocks until that resolver's first watch event;
//   - seeds the node list from the fetched instances;
//   - keeps the list updated in the selfproc goroutine.
//
// It panics if the first watch receive reports a closed channel.
func New(c *Config) (d *Discovery) {
	if c == nil {
		c = &Config{
			Nodes:  []string{"discovery.bilibili.co", "api.bilibili.co"},
			Key:    "discovery",
			Secret: "discovery",
		}
	}
	fixConfig(c)
	ctx, cancel := context.WithCancel(context.Background())
	d = &Discovery{
		ctx:        ctx,
		cancelFunc: cancel,
		conf:       c,
		apps:       map[string]*appInfo{},
		registry:   map[string]struct{}{},
		delete:     make(chan *appInfo, 10),
	}
	// httpClient: app credentials from c, 3s dial and 40s request timeouts,
	// and a circuit breaker.
	// NOTE(review): if breaker.Config.Sleep is a duration type, `Sleep: 3`
	// means 3ns, not 3s; confirm the intended unit.
	cfg := &bm.ClientConfig{
		App: &bm.App{
			Key:    c.Key,
			Secret: c.Secret,
		},
		Dial:    xtime.Duration(3 * time.Second),
		Timeout: xtime.Duration(40 * time.Second),
		Breaker: &breaker.Config{
			Window:  100,
			Sleep:   3,
			Bucket:  10,
			Ratio:   0.5,
			Request: 100,
		},
	}
	// The client must exist before Build, which starts serverproc.
	d.httpClient = bm.NewClient(cfg)
	if feature.DefaultGate.Enabled(_selfDiscoveryFeatrue) {
		resolver := d.Build(_appid)
		event := resolver.Watch()
		// Wait for the first notification about the discovery nodes.
		_, ok := <-event
		if !ok {
			panic("discovery watch failed")
		}
		ins, ok := resolver.Fetch(context.Background())
		if ok {
			d.newSelf(ins)
		}
		go d.selfproc(resolver, event)
	}
	return
}
  179. func (d *Discovery) selfproc(resolver naming.Resolver, event <-chan struct{}) {
  180. for {
  181. _, ok := <-event
  182. if !ok {
  183. return
  184. }
  185. zones, ok := resolver.Fetch(context.Background())
  186. if ok {
  187. d.newSelf(zones)
  188. }
  189. }
  190. }
  191. func (d *Discovery) newSelf(zones map[string][]*naming.Instance) {
  192. ins, ok := zones[d.conf.Zone]
  193. if !ok {
  194. return
  195. }
  196. var nodes []string
  197. for _, in := range ins {
  198. for _, addr := range in.Addrs {
  199. u, err := url.Parse(addr)
  200. if err == nil && u.Scheme == "http" {
  201. nodes = append(nodes, u.Host)
  202. }
  203. }
  204. }
  205. // diff old nodes
  206. olds, ok := d.node.Load().([]string)
  207. if ok {
  208. var diff int
  209. for _, n := range nodes {
  210. for _, o := range olds {
  211. if o == n {
  212. diff++
  213. break
  214. }
  215. }
  216. }
  217. if len(nodes) == diff {
  218. return
  219. }
  220. }
  221. // FIXME: we should use rand.Shuffle() in golang 1.10
  222. Shuffle(len(nodes), func(i, j int) {
  223. nodes[i], nodes[j] = nodes[j], nodes[i]
  224. })
  225. d.node.Store(nodes)
  226. }
  227. // Build disovery resovler builder.
  228. func (d *Discovery) Build(appid string) naming.Resolver {
  229. r := &Resolver{
  230. id: appid,
  231. d: d,
  232. event: make(chan struct{}, 1),
  233. }
  234. d.mutex.Lock()
  235. app, ok := d.apps[appid]
  236. if !ok {
  237. app = &appInfo{
  238. resolver: make(map[*Resolver]struct{}),
  239. }
  240. d.apps[appid] = app
  241. cancel := d.cancelPolls
  242. if cancel != nil {
  243. cancel()
  244. }
  245. }
  246. app.resolver[r] = struct{}{}
  247. d.mutex.Unlock()
  248. if ok {
  249. select {
  250. case r.event <- struct{}{}:
  251. default:
  252. }
  253. }
  254. log.Info("disocvery: AddWatch(%s) already watch(%v)", appid, ok)
  255. d.once.Do(func() {
  256. go d.serverproc()
  257. })
  258. return r
  259. }
  260. // Scheme return discovery's scheme
  261. func (d *Discovery) Scheme() string {
  262. return "discovery"
  263. }
// Resolver watches the instances of one appid through a Discovery client.
type Resolver struct {
	id    string        // watched appid
	event chan struct{} // change notifications; Build creates it with capacity 1
	d     *Discovery
}

// Watch returns the channel on which the resolver is told that the instance
// list for its appid may have changed.
//
// Senders in this file use non-blocking sends into the one-slot buffer, so
// several changes can coalesce into a single receive. Call Fetch after each
// receive to read the current list.
func (r *Resolver) Watch() <-chan struct{} {
	return r.event
}
  274. // Fetch fetch resolver instance.
  275. func (r *Resolver) Fetch(c context.Context) (ins map[string][]*naming.Instance, ok bool) {
  276. r.d.mutex.RLock()
  277. app, ok := r.d.apps[r.id]
  278. r.d.mutex.RUnlock()
  279. if ok {
  280. ins, ok = app.zoneIns.Load().(map[string][]*naming.Instance)
  281. return
  282. }
  283. return
  284. }
  285. // Close close resolver.
  286. func (r *Resolver) Close() error {
  287. r.d.mutex.Lock()
  288. if app, ok := r.d.apps[r.id]; ok && len(app.resolver) != 0 {
  289. delete(app.resolver, r)
  290. // TODO: delete app from builder
  291. }
  292. r.d.mutex.Unlock()
  293. return nil
  294. }
  295. func (d *Discovery) pickNode() string {
  296. nodes, ok := d.node.Load().([]string)
  297. if !ok || len(nodes) == 0 {
  298. return d.conf.Nodes[d.idx%uint64(len(d.conf.Nodes))]
  299. }
  300. return nodes[d.idx%uint64(len(nodes))]
  301. }
  302. func (d *Discovery) switchNode() {
  303. atomic.AddUint64(&d.idx, 1)
  304. }
  305. // Reload reload the config
  306. func (d *Discovery) Reload(c *Config) {
  307. fixConfig(c)
  308. d.mutex.Lock()
  309. d.conf = c
  310. d.mutex.Unlock()
  311. }
  312. // Close stop all running process including discovery and register
  313. func (d *Discovery) Close() error {
  314. d.cancelFunc()
  315. return nil
  316. }
  317. // Register Register an instance with discovery and renew automatically
  318. func (d *Discovery) Register(c context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) {
  319. d.mutex.Lock()
  320. if _, ok := d.registry[ins.AppID]; ok {
  321. err = ErrDuplication
  322. } else {
  323. d.registry[ins.AppID] = struct{}{}
  324. }
  325. d.mutex.Unlock()
  326. if err != nil {
  327. return
  328. }
  329. if err = d.register(c, ins); err != nil {
  330. d.mutex.Lock()
  331. delete(d.registry, ins.AppID)
  332. d.mutex.Unlock()
  333. return
  334. }
  335. ctx, cancel := context.WithCancel(d.ctx)
  336. ch := make(chan struct{}, 1)
  337. cancelFunc = context.CancelFunc(func() {
  338. cancel()
  339. <-ch
  340. })
  341. go func() {
  342. ticker := time.NewTicker(_registerGap)
  343. defer ticker.Stop()
  344. for {
  345. select {
  346. case <-ticker.C:
  347. if err := d.renew(ctx, ins); err != nil && ecode.NothingFound.Equal(err) {
  348. d.register(ctx, ins)
  349. }
  350. case <-ctx.Done():
  351. d.cancel(ins)
  352. ch <- struct{}{}
  353. return
  354. }
  355. }
  356. }()
  357. return
  358. }
  359. // Set set ins status and metadata.
  360. func (d *Discovery) Set(ins *naming.Instance) error {
  361. return d.set(context.Background(), ins)
  362. }
  363. // cancel Remove the registered instance from discovery
  364. func (d *Discovery) cancel(ins *naming.Instance) (err error) {
  365. d.mutex.RLock()
  366. conf := d.conf
  367. d.mutex.RUnlock()
  368. res := new(struct {
  369. Code int `json:"code"`
  370. Message string `json:"message"`
  371. })
  372. uri := fmt.Sprintf(_cancelURL, d.pickNode())
  373. params := d.newParams(conf)
  374. params.Set("appid", ins.AppID)
  375. // request
  376. if err = d.httpClient.Post(context.Background(), uri, "", params, &res); err != nil {
  377. d.switchNode()
  378. log.Error("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)",
  379. uri, conf.Env, ins.AppID, conf.Host, err)
  380. return
  381. }
  382. if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
  383. log.Warn("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)",
  384. uri, conf.Env, ins.AppID, conf.Host, res.Code)
  385. err = ec
  386. return
  387. }
  388. log.Info("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) success",
  389. uri, conf.Env, ins.AppID, conf.Host)
  390. return
  391. }
  392. // register Register an instance with discovery
  393. func (d *Discovery) register(ctx context.Context, ins *naming.Instance) (err error) {
  394. d.mutex.RLock()
  395. conf := d.conf
  396. d.mutex.RUnlock()
  397. var metadata []byte
  398. if ins.Metadata != nil {
  399. if metadata, err = json.Marshal(ins.Metadata); err != nil {
  400. log.Error("discovery:register instance Marshal metadata(%v) failed!error(%v)", ins.Metadata, err)
  401. }
  402. }
  403. res := new(struct {
  404. Code int `json:"code"`
  405. Message string `json:"message"`
  406. })
  407. uri := fmt.Sprintf(_registerURL, d.pickNode())
  408. params := d.newParams(conf)
  409. params.Set("appid", ins.AppID)
  410. params.Set("addrs", strings.Join(ins.Addrs, ","))
  411. params.Set("version", ins.Version)
  412. params.Set("status", _statusUP)
  413. params.Set("metadata", string(metadata))
  414. if err = d.httpClient.Post(ctx, uri, "", params, &res); err != nil {
  415. d.switchNode()
  416. log.Error("discovery: register client.Get(%v) zone(%s) env(%s) appid(%s) addrs(%v) error(%v)",
  417. uri, conf.Zone, conf.Env, ins.AppID, ins.Addrs, err)
  418. return
  419. }
  420. if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
  421. log.Warn("discovery: register client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)",
  422. uri, conf.Env, ins.AppID, ins.Addrs, res.Code)
  423. err = ec
  424. return
  425. }
  426. log.Info("discovery: register client.Get(%v) env(%s) appid(%s) addrs(%s) success",
  427. uri, conf.Env, ins.AppID, ins.Addrs)
  428. return
  429. }
  430. // rset set instance info with discovery
  431. func (d *Discovery) set(ctx context.Context, ins *naming.Instance) (err error) {
  432. d.mutex.RLock()
  433. conf := d.conf
  434. d.mutex.RUnlock()
  435. res := new(struct {
  436. Code int `json:"code"`
  437. Message string `json:"message"`
  438. })
  439. uri := fmt.Sprintf(_setURL, d.pickNode())
  440. params := d.newParams(conf)
  441. params.Set("appid", ins.AppID)
  442. params.Set("version", ins.Version)
  443. params.Set("status", strconv.FormatInt(ins.Status, 10))
  444. if ins.Metadata != nil {
  445. var metadata []byte
  446. if metadata, err = json.Marshal(ins.Metadata); err != nil {
  447. log.Error("discovery:set instance Marshal metadata(%v) failed!error(%v)", ins.Metadata, err)
  448. }
  449. params.Set("metadata", string(metadata))
  450. }
  451. if err = d.httpClient.Post(ctx, uri, "", params, &res); err != nil {
  452. d.switchNode()
  453. log.Error("discovery: set client.Get(%v) zone(%s) env(%s) appid(%s) addrs(%v) error(%v)",
  454. uri, conf.Zone, conf.Env, ins.AppID, ins.Addrs, err)
  455. return
  456. }
  457. if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
  458. log.Warn("discovery: set client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)",
  459. uri, conf.Env, ins.AppID, ins.Addrs, res.Code)
  460. err = ec
  461. return
  462. }
  463. log.Info("discovery: set client.Get(%v) env(%s) appid(%s) addrs(%s) success",
  464. uri+"?"+params.Encode(), conf.Env, ins.AppID, ins.Addrs)
  465. return
  466. }
  467. // renew Renew an instance with discovery
  468. func (d *Discovery) renew(ctx context.Context, ins *naming.Instance) (err error) {
  469. d.mutex.RLock()
  470. conf := d.conf
  471. d.mutex.RUnlock()
  472. res := new(struct {
  473. Code int `json:"code"`
  474. Message string `json:"message"`
  475. })
  476. uri := fmt.Sprintf(_renewURL, d.pickNode())
  477. params := d.newParams(conf)
  478. params.Set("appid", ins.AppID)
  479. if err = d.httpClient.Post(ctx, uri, "", params, &res); err != nil {
  480. d.switchNode()
  481. log.Error("discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)",
  482. uri, conf.Env, ins.AppID, conf.Host, err)
  483. return
  484. }
  485. if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
  486. err = ec
  487. if ec.Equal(ecode.NothingFound) {
  488. return
  489. }
  490. log.Error("discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)",
  491. uri, conf.Env, ins.AppID, conf.Host, res.Code)
  492. return
  493. }
  494. return
  495. }
// serverproc is the polling loop, started once by the first Build. It runs
// until d.ctx is cancelled (Close).
//
// Each iteration:
//  1. (Re)creates the poll context when needed and publishes its cancel
//     function as d.cancelPolls, so Build can interrupt a poll to add an appid.
//  2. When self-discovery is disabled, fetches the node list via d.nodes()
//     if none is stored, or every 30 minutes (ticker).
//  3. Polls the current node and broadcasts the result.
//
// Failures back off using netutil.DefaultBackoffConfig.
func (d *Discovery) serverproc() {
	var (
		retry  int
		update bool
		ctx    context.Context
		cancel context.CancelFunc
	)
	bc := netutil.DefaultBackoffConfig
	ticker := time.NewTicker(time.Minute * 30)
	defer ticker.Stop()
	for {
		if ctx == nil {
			ctx, cancel = context.WithCancel(d.ctx)
			d.mutex.Lock()
			d.cancelPolls = cancel
			d.mutex.Unlock()
		}
		// Non-blocking check: exit on Close, and flag a periodic node refresh.
		select {
		case <-d.ctx.Done():
			return
		case <-ticker.C:
			update = true
		default:
		}
		if !feature.DefaultGate.Enabled(_selfDiscoveryFeatrue) {
			nodes, ok := d.node.Load().([]string)
			if !ok || len(nodes) == 0 || update {
				// The flag is cleared even if the fetch below fails; a refresh is
				// then retried only if no nodes are stored, or at the next tick.
				update = false
				tnodes := d.nodes()
				if len(tnodes) == 0 {
					// NOTE(review): time.Sleep is not interrupted by Close, so
					// shutdown may be delayed by up to one backoff interval.
					time.Sleep(bc.Backoff(retry))
					retry++
					continue
				}
				retry = 0
				// FIXME: we should use rand.Shuffle() in golang 1.10
				Shuffle(len(tnodes), func(i, j int) {
					tnodes[i], tnodes[j] = tnodes[j], tnodes[i]
				})
				d.node.Store(tnodes)
			}
		}
		apps, err := d.polls(ctx, d.pickNode())
		if err != nil {
			// NOTE(review): the node is rotated even when the error is only the
			// deliberate cancellation below.
			d.switchNode()
			if ctx.Err() == context.Canceled {
				// Cancelled by Build (new appid) or Close: start a fresh poll
				// context; the select above returns if d.ctx is done.
				ctx = nil
				continue
			}
			time.Sleep(bc.Backoff(retry))
			retry++
			continue
		}
		retry = 0
		d.broadcast(apps)
	}
}
  553. func (d *Discovery) nodes() (nodes []string) {
  554. res := new(struct {
  555. Code int `json:"code"`
  556. Data []struct {
  557. Addr string `json:"addr"`
  558. } `json:"data"`
  559. })
  560. uri := fmt.Sprintf(_nodesURL, d.pickNode())
  561. if err := d.httpClient.Get(d.ctx, uri, "", nil, res); err != nil {
  562. d.switchNode()
  563. log.Error("discovery: consumer client.Get(%v)error(%+v)", uri, err)
  564. return
  565. }
  566. if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
  567. log.Error("discovery: consumer client.Get(%v) error(%v)", uri, res.Code)
  568. return
  569. }
  570. if len(res.Data) == 0 {
  571. log.Warn("discovery: get nodes(%s) failed,no nodes found!", uri)
  572. return
  573. }
  574. nodes = make([]string, 0, len(res.Data))
  575. for i := range res.Data {
  576. nodes = append(nodes, res.Data[i].Addr)
  577. }
  578. return
  579. }
// polls performs one poll request to host for every watched appid.
//
// It sends each appid's stored lastTs as latest_timestamp. Return values:
//   - (nil, nil) when no appid is watched;
//   - (nil, nil) when the server answers ecode.NotModified;
//   - ecode.ServerErr when any returned app has a zero latest_timestamp;
//   - on success, the decoded per-appid data.
func (d *Discovery) polls(ctx context.Context, host string) (apps map[string]appData, err error) {
	var (
		lastTs  []int64
		appid   []string
		changed bool
	)
	// lastHost is read and written without the mutex; the only caller in this
	// file is serverproc.
	if host != d.lastHost {
		d.lastHost = host
		changed = true
	}
	d.mutex.RLock()
	conf := d.conf
	for k, v := range d.apps {
		if changed {
			// Switching hosts: reset timestamps so the new host returns full data.
			// NOTE(review): this writes v.lastTs under a read lock; it relies on
			// polls and broadcast (the only writers here) both running on the
			// serverproc goroutine.
			v.lastTs = 0
		}
		// appid and lastTs are parallel lists: index i of each refers to the same app.
		appid = append(appid, k)
		lastTs = append(lastTs, v.lastTs)
	}
	d.mutex.RUnlock()
	if len(appid) == 0 {
		return
	}
	uri := fmt.Sprintf(_pollURL, host)
	res := new(struct {
		Code int                `json:"code"`
		Data map[string]appData `json:"data"`
	})
	params := url.Values{}
	params.Set("env", conf.Env)
	params.Set("hostname", conf.Host)
	params.Set("appid", strings.Join(appid, ","))
	params.Set("latest_timestamp", xstr.JoinInts(lastTs))
	if err = d.httpClient.Get(ctx, uri, "", params, res); err != nil {
		log.Error("discovery: client.Get(%s) error(%+v)", uri+"?"+params.Encode(), err)
		return
	}
	if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
		// NotModified is not an error: nothing changed since lastTs.
		if !ec.Equal(ecode.NotModified) {
			log.Error("discovery: client.Get(%s) get error code(%d)", uri+"?"+params.Encode(), res.Code)
			err = ec
		}
		return
	}
	// Marshalled only for logging; the error is ignored on purpose.
	info, _ := json.Marshal(res.Data)
	for _, app := range res.Data {
		if app.LastTs == 0 {
			err = ecode.ServerErr
			log.Error("discovery: client.Get(%s) latest_timestamp is 0,instances:(%s)", uri+"?"+params.Encode(), info)
			return
		}
	}
	log.Info("discovery: polls uri(%s)", uri+"?"+params.Encode())
	log.Info("discovery: successfully polls(%s) instances (%s)", uri+"?"+params.Encode(), info)
	apps = res.Data
	return
}
  637. func (d *Discovery) broadcast(apps map[string]appData) {
  638. for id, v := range apps {
  639. var count int
  640. for zone, ins := range v.ZoneInstances {
  641. if len(ins) == 0 {
  642. delete(v.ZoneInstances, zone)
  643. }
  644. count += len(ins)
  645. }
  646. if count == 0 {
  647. continue
  648. }
  649. d.mutex.RLock()
  650. app, ok := d.apps[id]
  651. d.mutex.RUnlock()
  652. if ok {
  653. app.lastTs = v.LastTs
  654. app.zoneIns.Store(v.ZoneInstances)
  655. d.mutex.RLock()
  656. for rs := range app.resolver {
  657. select {
  658. case rs.event <- struct{}{}:
  659. default:
  660. }
  661. }
  662. d.mutex.RUnlock()
  663. }
  664. }
  665. }
  666. func (d *Discovery) newParams(conf *Config) url.Values {
  667. params := url.Values{}
  668. params.Set("region", conf.Region)
  669. params.Set("zone", conf.Zone)
  670. params.Set("env", conf.Env)
  671. params.Set("hostname", conf.Host)
  672. return params
  673. }