aggr.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package lancerlogstream
  2. import (
  3. "bytes"
  4. "time"
  5. "go-common/app/service/ops/log-agent/event"
  6. )
  // Framing constants used when aggregating lancer logs.
  const (
  	// _logSeparator is the single byte (0x01) written after every log entry
  	// that is appended to an aggregation buffer.
  	_logSeparator = byte(0x01)

  	// _logLancerHeaderLen is 19. It is not referenced in this part of the
  	// file; presumably the byte length of a lancer header — TODO confirm.
  	_logLancerHeaderLen = 19
  )
  11. // logAggr aggregates multi logs to one log
  12. func (l *Lancer) logAggr(e *event.ProcessorEvent) (err error) {
  13. logAddrbuf := l.getlogAggrBuf(e.LogId)
  14. l.logAggrBufLock.Lock()
  15. logAddrbuf.Write(e.Bytes())
  16. logAddrbuf.WriteByte(_logSeparator)
  17. l.logAggrBufLock.Unlock()
  18. if logAddrbuf.Len() > l.c.AggrSize {
  19. return l.flushLogAggr(e.LogId)
  20. }
  21. event.PutEvent(e)
  22. return nil
  23. }
  24. // getlogAggrBuf get logAggrBuf by logId
  25. func (l *Lancer) getlogAggrBuf(logId string) (*bytes.Buffer) {
  26. if _, ok := l.logAggrBuf[logId]; !ok {
  27. l.logAggrBuf[logId] = new(bytes.Buffer)
  28. }
  29. return l.logAggrBuf[logId]
  30. }
  31. // flushLogAggr write aggregated logs to conn
  32. func (l *Lancer) flushLogAggr(logId string) (err error) {
  33. l.logAggrBufLock.Lock()
  34. defer l.logAggrBufLock.Unlock()
  35. buf := l.getlogAggrBuf(logId)
  36. if buf.Len() > 0 {
  37. logDoc := new(logDoc)
  38. logDoc.b = make([]byte, buf.Len())
  39. copy(logDoc.b, buf.Bytes())
  40. logDoc.logId = logId
  41. l.sendChan <- logDoc
  42. }
  43. buf.Reset()
  44. return nil
  45. }
  46. // flushLogAggrPeriodically run flushLogAggr Periodically
  47. func (l *Lancer) flushLogAggrPeriodically() {
  48. tick := time.NewTicker(5 * time.Second)
  49. for {
  50. select {
  51. case <-tick.C:
  52. for logid, _ := range l.logAggrBuf {
  53. l.flushLogAggr(logid)
  54. }
  55. }
  56. }
  57. }