server.go 6.5 KB


  1. package tcpcollect
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "encoding/binary"
  7. "net"
  8. "strconv"
  9. "sync"
  10. "time"
  11. "github.com/golang/protobuf/proto"
  12. "github.com/golang/protobuf/ptypes/duration"
  13. "github.com/golang/protobuf/ptypes/timestamp"
  14. "go-common/app/service/main/dapper/conf"
  15. "go-common/app/service/main/dapper/model"
  16. "go-common/app/service/main/dapper/pkg/process"
  17. "go-common/library/log"
  18. protogen "go-common/library/net/trace/proto"
  19. "go-common/library/stat/counter"
  20. "go-common/library/stat/prom"
  21. )
  22. var (
  23. collectCount = prom.New().WithCounter("dapper_collect_count", []string{"remote_host"})
  24. collectErrCount = prom.New().WithCounter("dapper_collect_err_count", []string{"remote_host"})
  25. )
  26. const (
  27. _magicSize = 2
  28. _headerSize = 6
  29. )
  30. var (
  31. _magicBuf = []byte{0xAC, 0xBE}
  32. _separator = []byte("\001")
  33. )
  34. // ClientStatus agent client status
  35. type ClientStatus struct {
  36. Addr string
  37. Counter counter.Counter
  38. ErrorCounter counter.Counter
  39. UpTime int64
  40. }
  41. func (c *ClientStatus) incr(iserr bool) {
  42. if iserr {
  43. collectErrCount.Incr(c.ClientHost())
  44. }
  45. collectCount.Incr(c.ClientHost())
  46. c.Counter.Add(1)
  47. }
  48. // ClientHost extract from client addr
  49. func (c *ClientStatus) ClientHost() string {
  50. host, _, _ := net.SplitHostPort(c.Addr)
  51. return host
  52. }
  53. // TCPCollect tcp server.
  54. type TCPCollect struct {
  55. cfg *conf.Collect
  56. lis net.Listener
  57. clientMap map[string]*ClientStatus
  58. rmx sync.RWMutex
  59. ps []process.Processer
  60. }
  61. // New tcp server.
  62. func New(cfg *conf.Collect) *TCPCollect {
  63. svr := &TCPCollect{
  64. cfg: cfg,
  65. clientMap: make(map[string]*ClientStatus),
  66. }
  67. return svr
  68. }
  69. // RegisterProcess implement process.Processer
  70. func (s *TCPCollect) RegisterProcess(p process.Processer) {
  71. s.ps = append(s.ps, p)
  72. }
  73. func (s *TCPCollect) addClient(cs *ClientStatus) {
  74. s.rmx.Lock()
  75. defer s.rmx.Unlock()
  76. s.clientMap[cs.Addr] = cs
  77. }
  78. func (s *TCPCollect) removeClient(cs *ClientStatus) {
  79. s.rmx.Lock()
  80. defer s.rmx.Unlock()
  81. delete(s.clientMap, cs.Addr)
  82. }
  83. // ClientStatus ClientStatus
  84. func (s *TCPCollect) ClientStatus() []*ClientStatus {
  85. s.rmx.RLock()
  86. defer s.rmx.RUnlock()
  87. css := make([]*ClientStatus, 0, len(s.clientMap))
  88. for _, cs := range s.clientMap {
  89. css = append(css, cs)
  90. }
  91. return css
  92. }
  93. // Start tcp server.
  94. func (s *TCPCollect) Start() error {
  95. var err error
  96. if s.lis, err = net.Listen(s.cfg.Network, s.cfg.Addr); err != nil {
  97. return err
  98. }
  99. go func() {
  100. for {
  101. conn, err := s.lis.Accept()
  102. if err != nil {
  103. if netE, ok := err.(net.Error); ok && netE.Temporary() {
  104. log.Error("l.Accept() error(%v)", err)
  105. time.Sleep(time.Second)
  106. continue
  107. }
  108. return
  109. }
  110. go s.serveConn(conn)
  111. }
  112. }()
  113. log.Info("tcp server start addr:%s@%s", s.cfg.Network, s.cfg.Addr)
  114. return nil
  115. }
  116. // Close tcp server.
  117. func (s *TCPCollect) Close() error {
  118. return s.lis.Close()
  119. }
  120. func (s *TCPCollect) serveConn(conn net.Conn) {
  121. log.Info("serverConn remoteIP:%s", conn.RemoteAddr().String())
  122. cs := &ClientStatus{
  123. Addr: conn.RemoteAddr().String(),
  124. Counter: counter.NewRolling(time.Second, 100),
  125. ErrorCounter: counter.NewGauge(),
  126. UpTime: time.Now().Unix(),
  127. }
  128. s.addClient(cs)
  129. defer conn.Close()
  130. defer s.removeClient(cs)
  131. rd := bufio.NewReaderSize(conn, 65536)
  132. for {
  133. buf, err := s.tailPacket(rd)
  134. if err != nil {
  135. log.Error("s.tailPacket() remoteIP:%s error(%v)", conn.RemoteAddr().String(), err)
  136. cs.incr(true)
  137. return
  138. }
  139. if len(buf) == 0 {
  140. log.Error("s.tailPacket() is empty")
  141. cs.incr(true)
  142. continue
  143. }
  144. data := buf
  145. fields := bytes.Split(buf, _separator)
  146. if len(fields) >= 16 {
  147. if data, err = s.legacySpan(fields[2:]); err != nil {
  148. log.Error("convert legacy span error: %s", err)
  149. continue
  150. }
  151. }
  152. protoSpan := new(protogen.Span)
  153. if err = proto.Unmarshal(data, protoSpan); err != nil {
  154. log.Error("unmarshal data %s error: %s", err, data)
  155. continue
  156. }
  157. for _, p := range s.ps {
  158. if pe := p.Process(context.Background(), (*model.ProtoSpan)(protoSpan)); pe != nil {
  159. log.Error("process span %s error: %s", protoSpan, err)
  160. }
  161. }
  162. cs.incr(err != nil)
  163. }
  164. }
  165. func (s *TCPCollect) tailPacket(rr *bufio.Reader) (res []byte, err error) {
  166. var buf []byte
  167. // peek magic
  168. for {
  169. if buf, err = rr.Peek(_magicSize); err != nil {
  170. return
  171. }
  172. if bytes.Equal(buf, _magicBuf) {
  173. break
  174. }
  175. rr.Discard(1)
  176. }
  177. // peek length
  178. if buf, err = rr.Peek(_headerSize); err != nil {
  179. return
  180. }
  181. // peek body
  182. packetLen := int(binary.BigEndian.Uint32(buf[_magicSize:_headerSize]))
  183. if buf, err = rr.Peek(_headerSize + packetLen); err != nil {
  184. return
  185. }
  186. res = buf[_headerSize+_magicSize:]
  187. rr.Discard(packetLen + _headerSize)
  188. return
  189. }
  190. // startTime/endTime/traceID/spanID/parentID/event/level/class/sample/address/family/title/comment/caller/error
  191. func (s *TCPCollect) legacySpan(fields [][]byte) ([]byte, error) {
  192. startAt, _ := strconv.ParseInt(string(fields[0]), 10, 64)
  193. finishAt, _ := strconv.ParseInt(string(fields[1]), 10, 64)
  194. traceID, _ := strconv.ParseUint(string(fields[2]), 10, 64)
  195. spanID, _ := strconv.ParseUint(string(fields[3]), 10, 64)
  196. parentID, _ := strconv.ParseUint(string(fields[4]), 10, 64)
  197. event, _ := strconv.Atoi(string(fields[5]))
  198. start := 8
  199. if len(fields) == 14 {
  200. start = 7
  201. }
  202. address := string(fields[start+1])
  203. family := string(fields[start+2])
  204. title := string(fields[start+3])
  205. comment := string(fields[start+4])
  206. caller := string(fields[start+5])
  207. errMsg := string(fields[start+6])
  208. span := &protogen.Span{Version: 2}
  209. span.ServiceName = family
  210. span.OperationName = title
  211. span.Caller = caller
  212. span.TraceId = traceID
  213. span.SpanId = spanID
  214. span.ParentId = parentID
  215. span.StartTime = &timestamp.Timestamp{
  216. Seconds: startAt / int64(time.Second),
  217. Nanos: int32(startAt % int64(time.Second)),
  218. }
  219. d := finishAt - startAt
  220. span.Duration = &duration.Duration{
  221. Seconds: d / int64(time.Second),
  222. Nanos: int32(d % int64(time.Second)),
  223. }
  224. if event == 0 {
  225. span.Tags = append(span.Tags, &protogen.Tag{Key: "span.kind", Kind: protogen.Tag_STRING, Value: []byte("client")})
  226. } else {
  227. span.Tags = append(span.Tags, &protogen.Tag{Key: "span.kind", Kind: protogen.Tag_STRING, Value: []byte("server")})
  228. }
  229. span.Tags = append(span.Tags, &protogen.Tag{Key: "legacy.address", Kind: protogen.Tag_STRING, Value: []byte(address)})
  230. span.Tags = append(span.Tags, &protogen.Tag{Key: "legacy.comment", Kind: protogen.Tag_STRING, Value: []byte(comment)})
  231. if errMsg != "" {
  232. span.Logs = append(span.Logs, &protogen.Log{Key: "legacy.error", Fields: []*protogen.Field{&protogen.Field{Key: "error", Value: []byte(errMsg)}}})
  233. }
  234. return proto.Marshal(span)
  235. }