123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- package tcpcollect
import (
	"bufio"
	"bytes"
	"context"
	"encoding/binary"
	"fmt"
	"net"
	"strconv"
	"sync"
	"time"

	"github.com/golang/protobuf/proto"
	"github.com/golang/protobuf/ptypes/duration"
	"github.com/golang/protobuf/ptypes/timestamp"
	"go-common/app/service/main/dapper/conf"
	"go-common/app/service/main/dapper/model"
	"go-common/app/service/main/dapper/pkg/process"
	"go-common/library/log"
	protogen "go-common/library/net/trace/proto"
	"go-common/library/stat/counter"
	"go-common/library/stat/prom"
)
- var (
- collectCount = prom.New().WithCounter("dapper_collect_count", []string{"remote_host"})
- collectErrCount = prom.New().WithCounter("dapper_collect_err_count", []string{"remote_host"})
- )
// Packet framing sizes, in bytes.
const (
	// _magicSize is the length of the magic marker that opens every packet.
	_magicSize = 2
	// _headerSize is the magic marker followed by the 4-byte big-endian
	// body length.
	_headerSize = _magicSize + 4
)
// _magicBuf is the byte sequence that marks the start of a packet.
var _magicBuf = []byte{0xAC, 0xBE}

// _separator is the delimiter used to split a packet body into the fields of
// a legacy span.
var _separator = []byte("\001")
- // ClientStatus agent client status
- type ClientStatus struct {
- Addr string
- Counter counter.Counter
- ErrorCounter counter.Counter
- UpTime int64
- }
- func (c *ClientStatus) incr(iserr bool) {
- if iserr {
- collectErrCount.Incr(c.ClientHost())
- }
- collectCount.Incr(c.ClientHost())
- c.Counter.Add(1)
- }
- // ClientHost extract from client addr
- func (c *ClientStatus) ClientHost() string {
- host, _, _ := net.SplitHostPort(c.Addr)
- return host
- }
- // TCPCollect tcp server.
- type TCPCollect struct {
- cfg *conf.Collect
- lis net.Listener
- clientMap map[string]*ClientStatus
- rmx sync.RWMutex
- ps []process.Processer
- }
- // New tcp server.
- func New(cfg *conf.Collect) *TCPCollect {
- svr := &TCPCollect{
- cfg: cfg,
- clientMap: make(map[string]*ClientStatus),
- }
- return svr
- }
- // RegisterProcess implement process.Processer
- func (s *TCPCollect) RegisterProcess(p process.Processer) {
- s.ps = append(s.ps, p)
- }
- func (s *TCPCollect) addClient(cs *ClientStatus) {
- s.rmx.Lock()
- defer s.rmx.Unlock()
- s.clientMap[cs.Addr] = cs
- }
- func (s *TCPCollect) removeClient(cs *ClientStatus) {
- s.rmx.Lock()
- defer s.rmx.Unlock()
- delete(s.clientMap, cs.Addr)
- }
- // ClientStatus ClientStatus
- func (s *TCPCollect) ClientStatus() []*ClientStatus {
- s.rmx.RLock()
- defer s.rmx.RUnlock()
- css := make([]*ClientStatus, 0, len(s.clientMap))
- for _, cs := range s.clientMap {
- css = append(css, cs)
- }
- return css
- }
- // Start tcp server.
- func (s *TCPCollect) Start() error {
- var err error
- if s.lis, err = net.Listen(s.cfg.Network, s.cfg.Addr); err != nil {
- return err
- }
- go func() {
- for {
- conn, err := s.lis.Accept()
- if err != nil {
- if netE, ok := err.(net.Error); ok && netE.Temporary() {
- log.Error("l.Accept() error(%v)", err)
- time.Sleep(time.Second)
- continue
- }
- return
- }
- go s.serveConn(conn)
- }
- }()
- log.Info("tcp server start addr:%s@%s", s.cfg.Network, s.cfg.Addr)
- return nil
- }
- // Close tcp server.
- func (s *TCPCollect) Close() error {
- return s.lis.Close()
- }
- func (s *TCPCollect) serveConn(conn net.Conn) {
- log.Info("serverConn remoteIP:%s", conn.RemoteAddr().String())
- cs := &ClientStatus{
- Addr: conn.RemoteAddr().String(),
- Counter: counter.NewRolling(time.Second, 100),
- ErrorCounter: counter.NewGauge(),
- UpTime: time.Now().Unix(),
- }
- s.addClient(cs)
- defer conn.Close()
- defer s.removeClient(cs)
- rd := bufio.NewReaderSize(conn, 65536)
- for {
- buf, err := s.tailPacket(rd)
- if err != nil {
- log.Error("s.tailPacket() remoteIP:%s error(%v)", conn.RemoteAddr().String(), err)
- cs.incr(true)
- return
- }
- if len(buf) == 0 {
- log.Error("s.tailPacket() is empty")
- cs.incr(true)
- continue
- }
- data := buf
- fields := bytes.Split(buf, _separator)
- if len(fields) >= 16 {
- if data, err = s.legacySpan(fields[2:]); err != nil {
- log.Error("convert legacy span error: %s", err)
- continue
- }
- }
- protoSpan := new(protogen.Span)
- if err = proto.Unmarshal(data, protoSpan); err != nil {
- log.Error("unmarshal data %s error: %s", err, data)
- continue
- }
- for _, p := range s.ps {
- if pe := p.Process(context.Background(), (*model.ProtoSpan)(protoSpan)); pe != nil {
- log.Error("process span %s error: %s", protoSpan, err)
- }
- }
- cs.incr(err != nil)
- }
- }
- func (s *TCPCollect) tailPacket(rr *bufio.Reader) (res []byte, err error) {
- var buf []byte
- // peek magic
- for {
- if buf, err = rr.Peek(_magicSize); err != nil {
- return
- }
- if bytes.Equal(buf, _magicBuf) {
- break
- }
- rr.Discard(1)
- }
- // peek length
- if buf, err = rr.Peek(_headerSize); err != nil {
- return
- }
- // peek body
- packetLen := int(binary.BigEndian.Uint32(buf[_magicSize:_headerSize]))
- if buf, err = rr.Peek(_headerSize + packetLen); err != nil {
- return
- }
- res = buf[_headerSize+_magicSize:]
- rr.Discard(packetLen + _headerSize)
- return
- }
- // startTime/endTime/traceID/spanID/parentID/event/level/class/sample/address/family/title/comment/caller/error
- func (s *TCPCollect) legacySpan(fields [][]byte) ([]byte, error) {
- startAt, _ := strconv.ParseInt(string(fields[0]), 10, 64)
- finishAt, _ := strconv.ParseInt(string(fields[1]), 10, 64)
- traceID, _ := strconv.ParseUint(string(fields[2]), 10, 64)
- spanID, _ := strconv.ParseUint(string(fields[3]), 10, 64)
- parentID, _ := strconv.ParseUint(string(fields[4]), 10, 64)
- event, _ := strconv.Atoi(string(fields[5]))
- start := 8
- if len(fields) == 14 {
- start = 7
- }
- address := string(fields[start+1])
- family := string(fields[start+2])
- title := string(fields[start+3])
- comment := string(fields[start+4])
- caller := string(fields[start+5])
- errMsg := string(fields[start+6])
- span := &protogen.Span{Version: 2}
- span.ServiceName = family
- span.OperationName = title
- span.Caller = caller
- span.TraceId = traceID
- span.SpanId = spanID
- span.ParentId = parentID
- span.StartTime = ×tamp.Timestamp{
- Seconds: startAt / int64(time.Second),
- Nanos: int32(startAt % int64(time.Second)),
- }
- d := finishAt - startAt
- span.Duration = &duration.Duration{
- Seconds: d / int64(time.Second),
- Nanos: int32(d % int64(time.Second)),
- }
- if event == 0 {
- span.Tags = append(span.Tags, &protogen.Tag{Key: "span.kind", Kind: protogen.Tag_STRING, Value: []byte("client")})
- } else {
- span.Tags = append(span.Tags, &protogen.Tag{Key: "span.kind", Kind: protogen.Tag_STRING, Value: []byte("server")})
- }
- span.Tags = append(span.Tags, &protogen.Tag{Key: "legacy.address", Kind: protogen.Tag_STRING, Value: []byte(address)})
- span.Tags = append(span.Tags, &protogen.Tag{Key: "legacy.comment", Kind: protogen.Tag_STRING, Value: []byte(comment)})
- if errMsg != "" {
- span.Logs = append(span.Logs, &protogen.Log{Key: "legacy.error", Fields: []*protogen.Field{&protogen.Field{Key: "error", Value: []byte(errMsg)}}})
- }
- return proto.Marshal(span)
- }
|