lancer.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package lancerlogstream
  2. import (
  3. "context"
  4. "fmt"
  5. "bytes"
  6. "sync"
  7. "encoding/binary"
  8. "strconv"
  9. "time"
  10. "go-common/app/service/ops/log-agent/event"
  11. "go-common/app/service/ops/log-agent/output"
  12. "go-common/app/service/ops/log-agent/pkg/flowmonitor"
  13. "go-common/app/service/ops/log-agent/pkg/common"
  14. "go-common/app/service/ops/log-agent/output/cache/file"
  15. "go-common/library/log"
  16. "go-common/app/service/ops/log-agent/pkg/lancermonitor"
  17. )
  18. const (
  19. _logLenStart = 2
  20. _logLenEnd = 6
  21. _tokenHeaderFormat = "logId=%s&timestamp=%s&version=1.1"
  22. _protocolLen = 6
  23. _appIdKey = `"app_id":`
  24. _levelKey = `"level":`
  25. _logTime = `"time":`
  26. )
  27. var (
  28. logMagic = []byte{0xAC, 0xBE}
  29. logMagicBuf = []byte{0xAC, 0xBE}
  30. _logType = []byte{0, 1}
  31. _logLength = []byte{0, 0, 0, 0}
  32. local, _ = time.LoadLocation("Local")
  33. )
  34. type logDoc struct {
  35. b []byte
  36. logId string
  37. }
  38. func init() {
  39. err := output.Register("lancer", NewLancer)
  40. if err != nil {
  41. panic(err)
  42. }
  43. }
  44. type Lancer struct {
  45. c *Config
  46. next chan string
  47. i chan *event.ProcessorEvent
  48. cache *file.FileCache
  49. logAggrBuf map[string]*bytes.Buffer
  50. logAggrBufLock sync.Mutex
  51. sendChan chan *logDoc
  52. connPool *connPool
  53. ctx context.Context
  54. cancel context.CancelFunc
  55. }
  56. func NewLancer(ctx context.Context, config interface{}) (output.Output, error) {
  57. var err error
  58. lancer := new(Lancer)
  59. if c, ok := config.(*Config); !ok {
  60. return nil, fmt.Errorf("Error config for Lancer output")
  61. } else {
  62. if err = c.ConfigValidate(); err != nil {
  63. return nil, err
  64. }
  65. lancer.c = c
  66. }
  67. if output.OutputRunning(lancer.c.Name) {
  68. return nil, fmt.Errorf("Output %s already running", lancer.c.Name)
  69. }
  70. lancer.i = make(chan *event.ProcessorEvent)
  71. lancer.next = make(chan string, 1)
  72. lancer.logAggrBuf = make(map[string]*bytes.Buffer)
  73. lancer.sendChan = make(chan *logDoc)
  74. cache, err := file.NewFileCache(lancer.c.CacheConfig)
  75. if err != nil {
  76. return nil, err
  77. }
  78. lancer.cache = cache
  79. lancer.c.PoolConfig.Name = lancer.c.Name
  80. lancer.connPool, err = initConnPool(lancer.c.PoolConfig)
  81. if err != nil {
  82. return nil, err
  83. }
  84. lancer.ctx, lancer.cancel = context.WithCancel(ctx)
  85. return lancer, nil
  86. }
  87. func (l *Lancer) InputChan() (chan *event.ProcessorEvent) {
  88. return l.i
  89. }
  90. func (l *Lancer) Run() (err error) {
  91. go l.writeToCache()
  92. go l.readFromCache()
  93. go l.flushLogAggrPeriodically()
  94. for i := 0; i < l.c.SendConcurrency; i++ {
  95. go l.sendToLancer()
  96. }
  97. output.RegisterOutput(l.c.Name, l)
  98. return nil
  99. }
  100. func (l *Lancer) Stop() {
  101. l.cancel()
  102. }
  103. // writeToCache write the log to cache
  104. func (l *Lancer) writeToCache() {
  105. for e := range l.i {
  106. if e.Length < _logLancerHeaderLen {
  107. event.PutEvent(e)
  108. continue
  109. }
  110. l.cache.WriteToCache(e)
  111. }
  112. }
  113. func (l *Lancer) readFromCache() {
  114. for {
  115. e := l.cache.ReadFromCache()
  116. if e.Length < _logLancerHeaderLen {
  117. event.PutEvent(e)
  118. continue
  119. }
  120. // monitor should be called before event recycle
  121. l.parseOpslog(e)
  122. flowmonitor.Fm.AddEvent(e, "log-agent.output.lancer", "OK", "write to lancer")
  123. lancermonitor.IncreaseLogCount("agent.send.success.count", e.LogId)
  124. if l.c.Name == "lancer-ops-log" {
  125. l.logAggr(e)
  126. } else {
  127. l.sendLogDirectToLancer(e)
  128. }
  129. }
  130. }
  131. func (l *Lancer) parseOpslog(e *event.ProcessorEvent) {
  132. if l.c.Name == "lancer-ops-log" && e.Length > _logLancerHeaderLen {
  133. logBody := e.Body[(_logLancerHeaderLen):(e.Length)]
  134. e.AppId, _ = common.SeekValue([]byte(_appIdKey), logBody)
  135. if timeValue, err := common.SeekValue([]byte(_logTime), logBody); err == nil {
  136. if len(timeValue) >= 19 {
  137. // parse time
  138. var t time.Time
  139. if t, err = time.Parse(time.RFC3339Nano, string(timeValue)); err != nil {
  140. if t, err = time.ParseInLocation("2006-01-02T15:04:05", string(timeValue), local); err != nil {
  141. if t, err = time.ParseInLocation("2006-01-02T15:04:05", string(timeValue[0:19]), local); err != nil {
  142. }
  143. }
  144. }
  145. if !t.IsZero() {
  146. e.TimeRangeKey = strconv.FormatInt(t.Unix()/100*100, 10)
  147. }
  148. }
  149. }
  150. }
  151. }
  152. // sendLogDirectToLancer send log direct to lancer without aggr
  153. func (l *Lancer) sendLogDirectToLancer(e *event.ProcessorEvent) {
  154. logDoc := new(logDoc)
  155. logDoc.b = make([]byte, e.Length)
  156. copy(logDoc.b, e.Bytes())
  157. logDoc.logId = e.LogId
  158. event.PutEvent(e)
  159. l.sendChan <- logDoc
  160. }
  161. // sendproc send the proc to lancer
  162. func (l *Lancer) sendToLancer() {
  163. logSend := new(bytes.Buffer)
  164. tokenHeaderLen := []byte{0, 0}
  165. for {
  166. select {
  167. case logDoc := <-l.sendChan:
  168. var err error
  169. if len(logDoc.b) == 0 {
  170. continue
  171. }
  172. // header
  173. logSend.Reset()
  174. logSend.Write(logMagicBuf)
  175. logSend.Write(_logLength) // placeholder
  176. logSend.Write(_logType)
  177. // token header
  178. tokenheader := []byte(fmt.Sprintf(_tokenHeaderFormat, logDoc.logId, strconv.FormatInt(time.Now().Unix()/100*100, 10)))
  179. binary.BigEndian.PutUint16(tokenHeaderLen, uint16(len(tokenheader)))
  180. logSend.Write(tokenHeaderLen)
  181. logSend.Write(tokenheader)
  182. // log body
  183. logSend.Write(logDoc.b)
  184. // set log length
  185. bs := logSend.Bytes()
  186. binary.BigEndian.PutUint32(bs[_logLenStart:_logLenEnd], uint32(logSend.Len()-_protocolLen))
  187. // write
  188. connBuf, err := l.connPool.getBufConn()
  189. if err != nil {
  190. flowmonitor.Fm.Add("log-agent", "log-agent.output.lancer", "", "ERROR", "get conn failed")
  191. log.Error("get conn error: %v", err)
  192. continue
  193. }
  194. if _, err = connBuf.write(bs); err != nil {
  195. log.Error("wr.Write(log) error(%v)", err)
  196. connBuf.enabled = false
  197. l.connPool.putBufConn(connBuf)
  198. flowmonitor.Fm.Add("log-agent", "log-agent.output.lancer", "", "ERROR", "write to lancer failed")
  199. continue
  200. }
  201. l.connPool.putBufConn(connBuf)
  202. // TODO: flowmonitor for specific appId
  203. }
  204. }
  205. }