agent.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package log
  2. import (
  3. "context"
  4. "fmt"
  5. stdlog "log"
  6. "net"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "go-common/library/conf/env"
  11. "go-common/library/log/internal"
  12. "go-common/library/net/metadata"
  13. "go-common/library/net/trace"
  14. xtime "go-common/library/time"
  15. )
  16. const (
  17. _agentTimeout = xtime.Duration(20 * time.Millisecond)
  18. _mergeWait = 1 * time.Second
  19. _maxBuffer = 10 * 1024 * 1024 // 10mb
  20. _defaultChan = 2048
  21. _defaultAgentConfig = "unixpacket:///var/run/lancer/collector_tcp.sock?timeout=100ms&chan=1024"
  22. )
  23. var (
  24. _logSeparator = []byte("\u0001")
  25. _defaultTaskIDs = map[string]string{
  26. env.DeployEnvFat1: "000069",
  27. env.DeployEnvUat: "000069",
  28. env.DeployEnvPre: "000161",
  29. env.DeployEnvProd: "000161",
  30. }
  31. )
  32. // AgentHandler agent struct.
  33. type AgentHandler struct {
  34. c *AgentConfig
  35. msgs chan []core.Field
  36. waiter sync.WaitGroup
  37. pool sync.Pool
  38. enc core.Encoder
  39. batchSend bool
  40. filters map[string]struct{}
  41. }
  42. // AgentConfig agent config.
  43. type AgentConfig struct {
  44. TaskID string
  45. Buffer int
  46. Proto string `dsn:"network"`
  47. Addr string `dsn:"address"`
  48. Chan int `dsn:"query.chan"`
  49. Timeout xtime.Duration `dsn:"query.timeout"`
  50. }
  51. // NewAgent a Agent.
  52. func NewAgent(ac *AgentConfig) (a *AgentHandler) {
  53. if ac == nil {
  54. ac = parseDSN(_agentDSN)
  55. }
  56. if len(ac.TaskID) == 0 {
  57. ac.TaskID = _defaultTaskIDs[env.DeployEnv]
  58. }
  59. a = &AgentHandler{
  60. c: ac,
  61. enc: core.NewJSONEncoder(core.EncoderConfig{
  62. EncodeTime: core.EpochTimeEncoder,
  63. EncodeDuration: core.SecondsDurationEncoder,
  64. }, core.NewBuffer(0)),
  65. }
  66. a.pool.New = func() interface{} {
  67. return make([]core.Field, 0, 16)
  68. }
  69. if ac.Chan == 0 {
  70. ac.Chan = _defaultChan
  71. }
  72. a.msgs = make(chan []core.Field, ac.Chan)
  73. if ac.Timeout == 0 {
  74. ac.Timeout = _agentTimeout
  75. }
  76. if ac.Buffer == 0 {
  77. ac.Buffer = 100
  78. }
  79. a.waiter.Add(1)
  80. // set fixed k/v into enc buffer
  81. KV(_appID, c.Family).AddTo(a.enc)
  82. KV(_deplyEnv, env.DeployEnv).AddTo(a.enc)
  83. KV(_instanceID, c.Host).AddTo(a.enc)
  84. KV(_zone, env.Zone).AddTo(a.enc)
  85. if a.c.Proto == "unixpacket" {
  86. a.batchSend = true
  87. }
  88. go a.writeproc()
  89. return
  90. }
  91. func (h *AgentHandler) data() []core.Field {
  92. return h.pool.Get().([]core.Field)
  93. }
  94. func (h *AgentHandler) free(f []core.Field) {
  95. f = f[0:0]
  96. h.pool.Put(f)
  97. }
  98. // Log log to udp statsd daemon.
  99. func (h *AgentHandler) Log(ctx context.Context, lv Level, args ...D) {
  100. if args == nil {
  101. return
  102. }
  103. f := h.data()
  104. for i := range args {
  105. f = append(f, args[i])
  106. }
  107. if t, ok := trace.FromContext(ctx); ok {
  108. if s, ok := t.(fmt.Stringer); ok {
  109. f = append(f, KV(_tid, s.String()))
  110. } else {
  111. f = append(f, KV(_tid, fmt.Sprintf("%s", t)))
  112. }
  113. }
  114. if caller := metadata.String(ctx, metadata.Caller); caller != "" {
  115. f = append(f, KV(_caller, caller))
  116. }
  117. if color := metadata.String(ctx, metadata.Color); color != "" {
  118. f = append(f, KV(_color, color))
  119. }
  120. if cluster := metadata.String(ctx, metadata.Cluster); cluster != "" {
  121. f = append(f, KV(_cluster, cluster))
  122. }
  123. if metadata.Bool(ctx, metadata.Mirror) {
  124. f = append(f, KV(_mirror, true))
  125. }
  126. select {
  127. case h.msgs <- f:
  128. default:
  129. }
  130. }
  131. // writeproc write data into connection.
  132. func (h *AgentHandler) writeproc() {
  133. var (
  134. conn net.Conn
  135. err error
  136. count int
  137. quit bool
  138. )
  139. buf := core.NewBuffer(2048)
  140. defer h.waiter.Done()
  141. taskID := []byte(h.c.TaskID)
  142. tick := time.NewTicker(_mergeWait)
  143. for {
  144. select {
  145. case d := <-h.msgs:
  146. if d == nil {
  147. quit = true
  148. goto DUMP
  149. }
  150. if buf.Len() >= _maxBuffer {
  151. buf.Reset() // avoid oom
  152. }
  153. now := time.Now()
  154. buf.Write(taskID)
  155. buf.Write([]byte(strconv.FormatInt(now.UnixNano()/1e6, 10)))
  156. h.enc.Encode(buf, d...)
  157. h.free(d)
  158. if h.batchSend {
  159. buf.Write(_logSeparator)
  160. if count++; count < h.c.Buffer && buf.Len() < _maxBuffer {
  161. continue
  162. }
  163. }
  164. case <-tick.C:
  165. }
  166. if conn == nil || err != nil {
  167. if conn, err = net.DialTimeout(h.c.Proto, h.c.Addr, time.Duration(h.c.Timeout)); err != nil {
  168. stdlog.Printf("net.DialTimeout(%s:%s) error(%v)\n", h.c.Proto, h.c.Addr, err)
  169. continue
  170. }
  171. }
  172. DUMP:
  173. if conn != nil && buf.Len() > 0 {
  174. count = 0
  175. if _, err = conn.Write(buf.Bytes()); err != nil {
  176. stdlog.Printf("conn.Write(%d bytes) error(%v)\n", buf.Len(), err)
  177. conn.Close()
  178. } else {
  179. // only succeed reset buffer, let conn reconnect.
  180. buf.Reset()
  181. }
  182. }
  183. if quit {
  184. if conn != nil && err == nil {
  185. conn.Close()
  186. }
  187. return
  188. }
  189. }
  190. }
  191. // Close close the connection.
  192. func (h *AgentHandler) Close() (err error) {
  193. h.msgs <- nil
  194. h.waiter.Wait()
  195. return nil
  196. }
  197. // SetFormat .
  198. func (h *AgentHandler) SetFormat(string) {
  199. // discard setformat
  200. }