handler.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. package monitor
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "net"
  7. "sync"
  8. "time"
  9. "go-common/library/conf/env"
  10. "go-common/library/log"
  11. "go-common/library/net/trace"
  12. xtime "go-common/library/time"
  13. "github.com/json-iterator/go"
  14. )
  15. var json = jsoniter.ConfigCompatibleWithStandardLibrary
  16. const (
  17. // app_id
  18. _appID = "app_id"
  19. // time format
  20. _timeFormat = "2006-01-02T15:04:05.999999"
  21. // log level name: INFO, WARN...
  22. _level = "level"
  23. // log time.
  24. _time = "time"
  25. // uniq ID from trace.
  26. _tid = "traceid"
  27. // default chan size
  28. _defaultChan = 2048
  29. // default agent timeout
  30. _agentTimeout = xtime.Duration(20 * time.Millisecond)
  31. // default merge wait time
  32. _mergeWait = 1 * time.Second
  33. // max buffer size
  34. _maxBuffer = 10 * 1024 * 1024 // 10mb
  35. )
  36. var (
  37. _defaultMonitorConfig = &MonitorConfig{
  38. Proto: "unixgram",
  39. Addr: "/var/run/lancer/collector.sock",
  40. }
  41. _defaultTaskIDs = map[string]string{
  42. env.DeployEnvFat1: "000069",
  43. env.DeployEnvUat: "000069",
  44. env.DeployEnvPre: "000161",
  45. env.DeployEnvProd: "000161",
  46. }
  47. // log separator
  48. _logSeparator = []byte("\u0001")
  49. )
  50. // MonitorConfig agent config.
  51. type MonitorConfig struct {
  52. TaskID string
  53. Buffer int
  54. Proto string `dsn:"network"`
  55. Addr string `dsn:"address"`
  56. Chan int `dsn:"query.chan"`
  57. Timeout xtime.Duration `dsn:"query.timeout"`
  58. }
  59. // MonitorHandler .
  60. type MonitorHandler struct {
  61. c *MonitorConfig
  62. msgs chan map[string]interface{}
  63. waiter sync.WaitGroup
  64. pool sync.Pool
  65. }
  66. // NewMonitor a MonitorHandler.
  67. func NewMonitor(c *MonitorConfig) (a *MonitorHandler) {
  68. if c == nil {
  69. c = _defaultMonitorConfig
  70. }
  71. if c.Buffer == 0 {
  72. c.Buffer = 1
  73. }
  74. if len(c.TaskID) == 0 {
  75. c.TaskID = _defaultTaskIDs[env.DeployEnv]
  76. }
  77. c.Timeout = _agentTimeout
  78. a = &MonitorHandler{c: c}
  79. a.pool.New = func() interface{} {
  80. return make(map[string]interface{}, 20)
  81. }
  82. a.msgs = make(chan map[string]interface{}, _defaultChan)
  83. a.waiter.Add(1)
  84. go a.writeproc()
  85. return
  86. }
  87. // Log log to udp statsd daemon.
  88. func (h *MonitorHandler) Log(ctx context.Context, lv log.Level, appID string, args ...log.D) {
  89. if args == nil {
  90. return
  91. }
  92. d := h.data()
  93. for _, arg := range args {
  94. d[arg.Key] = arg.Value
  95. }
  96. if t, ok := trace.FromContext(ctx); ok {
  97. d[_tid] = fmt.Sprintf("%s", t)
  98. }
  99. d[_appID] = env.AppID + "." + appID
  100. d[_level] = lv.String()
  101. d[_time] = time.Now().Format(_timeFormat)
  102. select {
  103. case h.msgs <- d:
  104. default:
  105. }
  106. }
  107. // writeproc write data into connection.
  108. func (h *MonitorHandler) writeproc() {
  109. var (
  110. buf bytes.Buffer
  111. conn net.Conn
  112. err error
  113. count int
  114. quit bool
  115. )
  116. defer h.waiter.Done()
  117. taskID := []byte(h.c.TaskID)
  118. tick := time.NewTicker(_mergeWait)
  119. enc := json.NewEncoder(&buf)
  120. for {
  121. select {
  122. case d := <-h.msgs:
  123. if d == nil {
  124. quit = true
  125. goto DUMP
  126. }
  127. if buf.Len() >= _maxBuffer {
  128. buf.Reset() // avoid oom
  129. }
  130. now := time.Now()
  131. buf.Write(taskID)
  132. buf.Write([]byte(fmt.Sprintf("%d", now.UnixNano()/1e6)))
  133. enc.Encode(d)
  134. h.free(d)
  135. if count++; count < h.c.Buffer {
  136. buf.Write(_logSeparator)
  137. continue
  138. }
  139. case <-tick.C:
  140. }
  141. if conn == nil || err != nil {
  142. if conn, err = net.DialTimeout(h.c.Proto, h.c.Addr, time.Duration(h.c.Timeout)); err != nil {
  143. log.Error("net.DialTimeout(%s:%s) error(%v)\n", h.c.Proto, h.c.Addr, err)
  144. continue
  145. }
  146. }
  147. DUMP:
  148. if conn != nil && buf.Len() > 0 {
  149. count = 0
  150. if _, err = conn.Write(buf.Bytes()); err != nil {
  151. log.Error("conn.Write(%d bytes) error(%v)\n", buf.Len(), err)
  152. conn.Close()
  153. } else {
  154. // only succeed reset buffer, let conn reconnect.
  155. log.Info("conn Write(%d bytes) data(%v)\n", buf.Len(), string(buf.Bytes()))
  156. buf.Reset()
  157. }
  158. }
  159. if quit {
  160. if conn != nil && err == nil {
  161. conn.Close()
  162. }
  163. return
  164. }
  165. }
  166. }
  167. func (h *MonitorHandler) data() map[string]interface{} {
  168. return h.pool.Get().(map[string]interface{})
  169. }
  170. func (h *MonitorHandler) free(d map[string]interface{}) {
  171. for k := range d {
  172. delete(d, k)
  173. }
  174. h.pool.Put(d)
  175. }
  176. // Close close the connection.
  177. func (h *MonitorHandler) Close() (err error) {
  178. h.msgs <- nil
  179. h.waiter.Wait()
  180. return nil
  181. }
  182. // SetFormat .
  183. func (h *MonitorHandler) SetFormat(string) {
  184. // discard setformat
  185. }
  186. // Info .
  187. func (h *MonitorHandler) Info(ctx context.Context, appID string, args ...log.D) {
  188. h.Log(ctx, log.Level(1), appID, args...)
  189. }
  190. // Warn .
  191. func (h *MonitorHandler) Warn(ctx context.Context, appID string, args ...log.D) {
  192. h.Log(ctx, log.Level(2), appID, args...)
  193. }
  194. // Error .
  195. func (h *MonitorHandler) Error(ctx context.Context, appID string, args ...log.D) {
  196. h.Log(ctx, log.Level(3), appID, args...)
  197. }
  198. // CalCode handle codes.
  199. func (a *Log) CalCode() {
  200. if a.HTTPCode == "" {
  201. return
  202. }
  203. a.Codes = a.HTTPCode
  204. var (
  205. codes Codes
  206. err error
  207. )
  208. if err = json.Unmarshal([]byte(a.HTTPCode), &codes); err != nil {
  209. log.Warn("s.CalCode error(%+v), codes(%s)", err, a.HTTPCode)
  210. return
  211. }
  212. a.HTTPCode = fmt.Sprintf("%v", codes.HTTPCode)
  213. a.BusinessCode = fmt.Sprintf("%v", codes.HTTPBusinessCode)
  214. a.InnerCode = fmt.Sprintf("%v", codes.HTTPInnerCode)
  215. // 电商inner_code 覆盖 business_code
  216. if a.InnerCode != "-1" {
  217. // 电商code 1 转成 0
  218. if a.InnerCode == "1" {
  219. a.BusinessCode = "0"
  220. } else {
  221. a.BusinessCode = a.InnerCode
  222. }
  223. }
  224. if a.BusinessCode == "-1" {
  225. a.BusinessCode = "0"
  226. }
  227. }