event.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package event
  2. import (
  3. "sync"
  4. "time"
  5. "go-common/library/log"
  6. )
  7. var pool sync.Pool
  8. // event between input and processor
  9. type ProcessorEvent struct {
  10. Source string
  11. Destination string
  12. LogId string
  13. AppId []byte
  14. Level []byte
  15. Time time.Time
  16. Body []byte
  17. Priority string
  18. Length int
  19. TimeRangeKey string
  20. Fields map[string]interface{}
  21. ParsedFields map[string]string
  22. Tags []string
  23. }
  24. // GetEvent get event from pool.
  25. func GetEvent() (e *ProcessorEvent) {
  26. var (
  27. ok bool
  28. tmp = pool.Get()
  29. )
  30. if e, ok = tmp.(*ProcessorEvent); !ok {
  31. e = &ProcessorEvent{Body: make([]byte, 1024*64), Tags: make([]string, 0, 1)} // max 64K, should be longer than max log lentth
  32. }
  33. e.LogId = ""
  34. e.Length = 0
  35. e.AppId = nil
  36. e.Level = nil
  37. e.Time = time.Time{}
  38. e.TimeRangeKey = ""
  39. e.Source = ""
  40. e.Priority = ""
  41. e.Destination = ""
  42. e.Tags = e.Tags[:0]
  43. e.Fields = make(map[string]interface{})
  44. e.ParsedFields = make(map[string]string)
  45. return e
  46. }
  47. // PutEvent put event back to pool
  48. func PutEvent(e *ProcessorEvent) {
  49. pool.Put(e)
  50. }
  51. func (e *ProcessorEvent) Bytes() []byte {
  52. return e.Body[:e.Length]
  53. }
  54. func (e *ProcessorEvent) String() string {
  55. return string(e.Body[:e.Length])
  56. }
  57. func (e *ProcessorEvent) Write(b []byte) {
  58. if len(b) > cap(e.Body) {
  59. log.Error("bytes write beyond e.Body capacity")
  60. }
  61. e.Length = copy(e.Body, b)
  62. }