collector.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. package collector
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "go-common/app/service/main/dapper/conf"
  7. "go-common/app/service/main/dapper/dao"
  8. "go-common/app/service/main/dapper/model"
  9. "go-common/app/service/main/dapper/pkg/batchwrite"
  10. "go-common/app/service/main/dapper/pkg/collect"
  11. "go-common/app/service/main/dapper/pkg/collect/kafkacollect"
  12. "go-common/app/service/main/dapper/pkg/collect/tcpcollect"
  13. "go-common/app/service/main/dapper/pkg/pointwrite"
  14. "go-common/library/log"
  15. bm "go-common/library/net/http/blademaster"
  16. )
  17. // Collector dapper collector receving trace data from tcp and write
  18. // to hbase and influxdb
  19. type Collector struct {
  20. daoImpl dao.Dao
  21. cfg *conf.Config
  22. bw batchwrite.BatchWriter
  23. pw pointwrite.PointWriter
  24. process []Processer
  25. clts []collect.Collecter
  26. tcpClt *tcpcollect.TCPCollect
  27. }
  28. func (c *Collector) checkRetention(span *model.Span) error {
  29. if c.cfg.Dapper.RetentionDay != 0 {
  30. retentionSecond := c.cfg.Dapper.RetentionDay * 3600 * 24
  31. if (time.Now().Unix() - span.StartTime.Unix()) > int64(retentionSecond) {
  32. return fmt.Errorf("span beyond retention policy ignored")
  33. }
  34. }
  35. return nil
  36. }
  37. // Process dispatch protoSpan
  38. func (c *Collector) Process(ctx context.Context, protoSpan *model.ProtoSpan) error {
  39. log.V(5).Info("dispatch span form serviceName: %s, operationName: %s", protoSpan.ServiceName, protoSpan.OperationName)
  40. // ignored serviceName empry span
  41. if protoSpan.ServiceName == "" {
  42. log.Warn("span miss servicename ignored")
  43. return nil
  44. }
  45. // fix operationName
  46. if protoSpan.OperationName == "" {
  47. protoSpan.OperationName = "missing operationname !!"
  48. }
  49. if protoSpan.ServiceName == "" {
  50. log.Warn("span miss servicename ignored")
  51. return nil
  52. }
  53. // convert to model.Span
  54. span, err := model.FromProtoSpan(protoSpan, false)
  55. if err != nil {
  56. return err
  57. }
  58. // run process
  59. for _, p := range c.process {
  60. if err := p.Process(span); err != nil {
  61. return err
  62. }
  63. }
  64. if c.writeSamplePoint(span); err != nil {
  65. log.Error("write sample point error: %s", err)
  66. }
  67. if c.writeRawTrace(span); err != nil {
  68. log.Error("write sample point error: %s", err)
  69. }
  70. return nil
  71. }
  72. func (c *Collector) writeSamplePoint(span *model.Span) error {
  73. return c.pw.WriteSpan(span)
  74. }
  75. func (c *Collector) writeRawTrace(span *model.Span) error {
  76. return c.bw.WriteSpan(span)
  77. }
  78. // ListenAndStart listen and start collector server
  79. func (c *Collector) ListenAndStart() error {
  80. tcpClt := tcpcollect.New(c.cfg.Collect)
  81. // NOTE: remove this future
  82. c.tcpClt = tcpClt
  83. c.clts = append(c.clts, tcpClt)
  84. // if KafkaCollect is configured enable kafka collect
  85. if c.cfg.KafkaCollect != nil {
  86. kafkaClt, err := kafkacollect.New(c.cfg.KafkaCollect.Topic, c.cfg.KafkaCollect.Addrs)
  87. if err != nil {
  88. return err
  89. }
  90. c.clts = append(c.clts, kafkaClt)
  91. }
  92. for _, clt := range c.clts {
  93. clt.RegisterProcess(c)
  94. if err := clt.Start(); err != nil {
  95. return err
  96. }
  97. }
  98. if c.cfg.Dapper.APIListen != "" {
  99. c.listenAndStartAPI()
  100. }
  101. return nil
  102. }
  103. func (c *Collector) listenAndStartAPI() {
  104. engine := bm.NewServer(nil)
  105. engine.GET("/x/internal/dapper-collector/client-status", c.clientStatusHandler)
  106. engine.Start()
  107. }
  108. // Close collector
  109. func (c *Collector) Close() error {
  110. for _, clt := range c.clts {
  111. if err := clt.Close(); err != nil {
  112. log.Error("close collect error: %s", err)
  113. }
  114. }
  115. c.bw.Close()
  116. c.pw.Close()
  117. return nil
  118. }
  119. // ClientStatus .
  120. func (c *Collector) clientStatusHandler(bc *bm.Context) {
  121. resp := &model.ClientStatusResp{QueueLen: c.bw.QueueLen()}
  122. for _, cs := range c.tcpClt.ClientStatus() {
  123. resp.Clients = append(resp.Clients, &model.ClientStatus{
  124. Addr: cs.Addr,
  125. UpTime: cs.UpTime,
  126. ErrCount: cs.ErrorCounter.Value(),
  127. Rate: cs.Counter.Value(),
  128. })
  129. }
  130. bc.JSON(resp, nil)
  131. }
  132. // New new dapper collector
  133. func New(cfg *conf.Config) (*Collector, error) {
  134. daoImpl, err := dao.New(cfg)
  135. if err != nil {
  136. return nil, err
  137. }
  138. bw := batchwrite.NewRawDataBatchWriter(
  139. daoImpl.WriteRawTrace,
  140. cfg.BatchWriter.RawBufSize,
  141. cfg.BatchWriter.RawChanSize,
  142. cfg.BatchWriter.RawWorkers,
  143. 0,
  144. )
  145. detectData := make(map[string]map[string]struct{})
  146. serviceNames, err := daoImpl.FetchServiceName(context.Background())
  147. if err != nil {
  148. return nil, nil
  149. }
  150. log.V(10).Info("fetch serviceNames get %d", len(serviceNames))
  151. for _, serviceName := range serviceNames {
  152. operationNames, err := daoImpl.FetchOperationName(context.Background(), serviceName)
  153. if err != nil {
  154. log.Error("fetch operationName for %s error: %s", serviceName, err)
  155. continue
  156. }
  157. log.V(10).Info("fetch operationName for %s get %d", serviceName, len(operationNames))
  158. detectData[serviceName] = make(map[string]struct{})
  159. for _, operationName := range operationNames {
  160. detectData[serviceName][operationName] = struct{}{}
  161. }
  162. }
  163. // TODO configable
  164. pw := pointwrite.New(daoImpl.BatchWriteSpanPoint, 5, 10*time.Second)
  165. // register processer
  166. var process []Processer
  167. process = append(process, NewOperationNameProcess())
  168. // FIXME: breaker not work
  169. process = append(process, NewServiceBreakerProcess(4096))
  170. process = append(process, NewPeerServiceDetectProcesser(detectData))
  171. return &Collector{
  172. cfg: cfg,
  173. daoImpl: daoImpl,
  174. bw: bw,
  175. pw: pw,
  176. process: process,
  177. }, nil
  178. }