agent.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package agent
  2. import (
  3. "fmt"
  4. "io"
  5. "runtime"
  6. "strings"
  7. "time"
  8. "github.com/golang/protobuf/proto"
  9. "go-common/app/service/main/dapper/conf"
  10. "go-common/app/service/main/dapper/pkg/deliver"
  11. "go-common/app/service/main/dapper/pkg/diskqueue"
  12. "go-common/app/service/main/dapper/server/udpcollect"
  13. "go-common/library/log"
  14. spanpb "go-common/library/net/trace/proto"
  15. )
  16. // Agent dapper collect agent
  17. type Agent struct {
  18. queue diskqueue.DiskQueue
  19. clt *udpcollect.UDPCollect
  20. dlv *deliver.Deliver
  21. }
  22. // New agent
  23. func New(cfg *conf.AgentConfig, debug bool) (*Agent, error) {
  24. var options []diskqueue.Option
  25. if cfg.Queue.BucketBytes != 0 {
  26. options = append(options, diskqueue.SetBucketByte(cfg.Queue.BucketBytes))
  27. }
  28. if cfg.Queue.MemBuckets != 0 {
  29. options = append(options, diskqueue.SetDynamicMemBucket(cfg.Queue.MemBuckets))
  30. }
  31. queue, err := diskqueue.New(cfg.Queue.CacheDir, options...)
  32. if err != nil {
  33. return nil, err
  34. }
  35. workers := cfg.UDPCollect.Workers
  36. if workers == 0 {
  37. if runtime.NumCPU() > 4 {
  38. workers = 4
  39. } else {
  40. workers = runtime.NumCPU()
  41. }
  42. }
  43. clt, err := udpcollect.New(cfg.UDPCollect.Addr, workers, queue.Push)
  44. if err != nil {
  45. return nil, fmt.Errorf("new udpcollect error: %s", err)
  46. }
  47. if err := clt.Start(); err != nil {
  48. return nil, err
  49. }
  50. ag := &Agent{queue: queue, clt: clt}
  51. if !debug {
  52. ag.dlv, err = deliver.New(cfg.Servers, ag.BlockReadFn)
  53. } else {
  54. go ag.debugPrinter()
  55. }
  56. return ag, err
  57. }
  58. // Reload config, only support reload servers config
  59. func (a *Agent) Reload(cfg *conf.AgentConfig) error {
  60. dlv, err := deliver.New(cfg.Servers, a.BlockReadFn)
  61. if err != nil {
  62. return err
  63. }
  64. if err := a.dlv.Close(); err != nil {
  65. log.Warn("close old deliver error: %s", err)
  66. }
  67. a.dlv = dlv
  68. return nil
  69. }
  70. // Close agent service
  71. func (a *Agent) Close() error {
  72. var messages []string
  73. if err := a.clt.Close(); err != nil {
  74. messages = append(messages, err.Error())
  75. }
  76. if a.dlv != nil {
  77. if err := a.dlv.Close(); err != nil {
  78. messages = append(messages, err.Error())
  79. }
  80. }
  81. if err := a.queue.Close(); err != nil {
  82. messages = append(messages, err.Error())
  83. }
  84. if len(messages) == 0 {
  85. return nil
  86. }
  87. return fmt.Errorf("close agent error: %s", strings.Join(messages, "\n"))
  88. }
  89. // BlockReadFn wrap queue.Pop block
  90. func (a *Agent) BlockReadFn() ([]byte, error) {
  91. data, err := a.queue.Pop()
  92. if err != io.EOF {
  93. return data, err
  94. }
  95. tick := time.NewTicker(100 * time.Millisecond)
  96. defer tick.Stop()
  97. for {
  98. select {
  99. case <-tick.C:
  100. }
  101. data, err = a.queue.Pop()
  102. if err != io.EOF {
  103. return data, err
  104. }
  105. }
  106. }
  107. func (a *Agent) debugPrinter() {
  108. for {
  109. data, err := a.BlockReadFn()
  110. if err != nil {
  111. log.Error("read data error: %s", err)
  112. continue
  113. }
  114. span := new(spanpb.Span)
  115. if err = proto.Unmarshal(data, span); err != nil {
  116. log.Error("unmarshal error: %s, data: %s", err, data)
  117. continue
  118. }
  119. fmt.Printf("received span: %s\n", span)
  120. var kind bool
  121. var component bool
  122. for _, tag := range span.Tags {
  123. if tag.Key == "span.kind" {
  124. kind = true
  125. }
  126. if tag.Key == "component" {
  127. component = true
  128. }
  129. }
  130. if !kind {
  131. fmt.Printf("tag span.kind missing, Either \"client\" or \"server\" for the appropriate roles in an RPC, and \"producer\" or \"consumer\" for the appropriate roles in a messaging scenario.")
  132. }
  133. if !component {
  134. fmt.Printf("tag component missing, The software package, framework, library, or module that generated the associated Span. E.g., \"grpc\", \"django\", \"JDBI\".")
  135. }
  136. }
  137. }