clt.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. // Package cltclient provide fetch and merge data from collector
  2. package cltclient
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "net/http"
  8. "sync"
  9. "go-common/library/sync/errgroup"
  10. )
  11. const (
  12. _jsonMime = "application/json"
  13. )
  14. // ClientStatusResp response clientstatus request just for debug
  15. type ClientStatusResp struct {
  16. QueueLen int `json:"queue_len"`
  17. Clients []*ClientStatus `json:"clients"`
  18. }
  19. // ClientStatus client status
  20. type ClientStatus struct {
  21. Addr string `json:"addr"`
  22. UpTime int64 `json:"up_time"`
  23. ErrCount int64 `json:"err_count"`
  24. Rate int64 `json:"rate"`
  25. }
  26. // CltStatus collector status
  27. type CltStatus struct {
  28. Node string `json:"node"`
  29. QueueLen int `json:"queue_len"`
  30. Clients []*ClientStatus `json:"clients"`
  31. }
  32. // New collector client
  33. func New(nodes []string, httpclient *http.Client) (*Client, error) {
  34. if len(nodes) == 0 {
  35. return nil, fmt.Errorf("no node provided")
  36. }
  37. if httpclient == nil {
  38. httpclient = http.DefaultClient
  39. }
  40. return &Client{nodes: nodes, httpclient: httpclient}, nil
  41. }
  42. // Client collector client
  43. type Client struct {
  44. nodes []string
  45. httpclient *http.Client
  46. }
  47. // Status return all collector status
  48. func (c *Client) Status(ctx context.Context) ([]*CltStatus, error) {
  49. var mx sync.Mutex
  50. var g errgroup.Group
  51. results := make(map[string]*ClientStatusResp)
  52. for _, node := range c.nodes {
  53. node := node // https://golang.org/doc/faq#closures_and_goroutines
  54. g.Go(func() error {
  55. resp, err := c.fetchStatus(ctx, node)
  56. if err != nil {
  57. return err
  58. }
  59. mx.Lock()
  60. results[node] = resp
  61. mx.Unlock()
  62. return nil
  63. })
  64. }
  65. if err := g.Wait(); err != nil {
  66. return nil, err
  67. }
  68. clts := make([]*CltStatus, 0, len(results))
  69. for node, resp := range results {
  70. clts = append(clts, &CltStatus{
  71. Node: node,
  72. QueueLen: resp.QueueLen,
  73. Clients: resp.Clients,
  74. })
  75. }
  76. return clts, nil
  77. }
  78. func (c *Client) fetchStatus(ctx context.Context, node string) (*ClientStatusResp, error) {
  79. var wrapResp struct {
  80. Code int `json:"code"`
  81. Message string `json:"message"`
  82. Data ClientStatusResp `json:"data"`
  83. }
  84. reqURL := "http://" + node + "/x/internal/dapper-collector/client-status"
  85. req, err := http.NewRequest(http.MethodGet, reqURL, nil)
  86. if err != nil {
  87. return nil, err
  88. }
  89. req.Header.Set("Accept", _jsonMime)
  90. req = req.WithContext(ctx)
  91. resp, err := c.httpclient.Do(req)
  92. if err != nil {
  93. return nil, err
  94. }
  95. defer resp.Body.Close()
  96. if resp.StatusCode/100 != 2 {
  97. p := make([]byte, 2048)
  98. n, _ := resp.Body.Read(p)
  99. return nil, fmt.Errorf("request url: %s status code: %d, body: %s", reqURL, resp.StatusCode, p[:n])
  100. }
  101. err = json.NewDecoder(resp.Body).Decode(&wrapResp)
  102. return &wrapResp.Data, err
  103. }