123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- package agent
- import (
- "fmt"
- "io"
- "runtime"
- "strings"
- "time"
- "github.com/golang/protobuf/proto"
- "go-common/app/service/main/dapper/conf"
- "go-common/app/service/main/dapper/pkg/deliver"
- "go-common/app/service/main/dapper/pkg/diskqueue"
- "go-common/app/service/main/dapper/server/udpcollect"
- "go-common/library/log"
- spanpb "go-common/library/net/trace/proto"
- )
- type Agent struct {
- queue diskqueue.DiskQueue
- clt *udpcollect.UDPCollect
- dlv *deliver.Deliver
- }
- func New(cfg *conf.AgentConfig, debug bool) (*Agent, error) {
- var options []diskqueue.Option
- if cfg.Queue.BucketBytes != 0 {
- options = append(options, diskqueue.SetBucketByte(cfg.Queue.BucketBytes))
- }
- if cfg.Queue.MemBuckets != 0 {
- options = append(options, diskqueue.SetDynamicMemBucket(cfg.Queue.MemBuckets))
- }
- queue, err := diskqueue.New(cfg.Queue.CacheDir, options...)
- if err != nil {
- return nil, err
- }
- workers := cfg.UDPCollect.Workers
- if workers == 0 {
- if runtime.NumCPU() > 4 {
- workers = 4
- } else {
- workers = runtime.NumCPU()
- }
- }
- clt, err := udpcollect.New(cfg.UDPCollect.Addr, workers, queue.Push)
- if err != nil {
- return nil, fmt.Errorf("new udpcollect error: %s", err)
- }
- if err := clt.Start(); err != nil {
- return nil, err
- }
- ag := &Agent{queue: queue, clt: clt}
- if !debug {
- ag.dlv, err = deliver.New(cfg.Servers, ag.BlockReadFn)
- } else {
- go ag.debugPrinter()
- }
- return ag, err
- }
- func (a *Agent) Reload(cfg *conf.AgentConfig) error {
- dlv, err := deliver.New(cfg.Servers, a.BlockReadFn)
- if err != nil {
- return err
- }
- if err := a.dlv.Close(); err != nil {
- log.Warn("close old deliver error: %s", err)
- }
- a.dlv = dlv
- return nil
- }
- func (a *Agent) Close() error {
- var messages []string
- if err := a.clt.Close(); err != nil {
- messages = append(messages, err.Error())
- }
- if a.dlv != nil {
- if err := a.dlv.Close(); err != nil {
- messages = append(messages, err.Error())
- }
- }
- if err := a.queue.Close(); err != nil {
- messages = append(messages, err.Error())
- }
- if len(messages) == 0 {
- return nil
- }
- return fmt.Errorf("close agent error: %s", strings.Join(messages, "\n"))
- }
- func (a *Agent) BlockReadFn() ([]byte, error) {
- data, err := a.queue.Pop()
- if err != io.EOF {
- return data, err
- }
- tick := time.NewTicker(100 * time.Millisecond)
- defer tick.Stop()
- for {
- select {
- case <-tick.C:
- }
- data, err = a.queue.Pop()
- if err != io.EOF {
- return data, err
- }
- }
- }
- func (a *Agent) debugPrinter() {
- for {
- data, err := a.BlockReadFn()
- if err != nil {
- log.Error("read data error: %s", err)
- continue
- }
- span := new(spanpb.Span)
- if err = proto.Unmarshal(data, span); err != nil {
- log.Error("unmarshal error: %s, data: %s", err, data)
- continue
- }
- fmt.Printf("received span: %s\n", span)
- var kind bool
- var component bool
- for _, tag := range span.Tags {
- if tag.Key == "span.kind" {
- kind = true
- }
- if tag.Key == "component" {
- component = true
- }
- }
- if !kind {
- fmt.Printf("tag span.kind missing, Either \"client\" or \"server\" for the appropriate roles in an RPC, and \"producer\" or \"consumer\" for the appropriate roles in a messaging scenario.")
- }
- if !component {
- fmt.Printf("tag component missing, The software package, framework, library, or module that generated the associated Span. E.g., \"grpc\", \"django\", \"JDBI\".")
- }
- }
- }
|