pipeline.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package pipeline
  2. import (
  3. "fmt"
  4. "context"
  5. "go-common/app/service/ops/log-agent/output/lancerlogstream"
  6. "go-common/app/service/ops/log-agent/output/lancergrpc"
  7. "go-common/app/service/ops/log-agent/input/sock"
  8. "go-common/app/service/ops/log-agent/input/file"
  9. "go-common/app/service/ops/log-agent/processor/classify"
  10. "go-common/app/service/ops/log-agent/processor/jsonLog"
  11. "go-common/app/service/ops/log-agent/processor/fileLog"
  12. "go-common/app/service/ops/log-agent/processor/lengthCheck"
  13. "go-common/app/service/ops/log-agent/processor/sample"
  14. "go-common/app/service/ops/log-agent/processor/httpstream"
  15. "go-common/app/service/ops/log-agent/processor/grok"
  16. "go-common/app/service/ops/log-agent/output/stdout"
  17. "go-common/library/log"
  18. "github.com/BurntSushi/toml"
  19. )
  20. var inputConfigDecodeFactory = make(map[string]configDecodeFunc)
  21. var processorConfigDecodeFactory = make(map[string]configDecodeFunc)
  22. var outputConfigDecodeFactory = make(map[string]configDecodeFunc)
  23. func init() {
  24. RegisterInputConfigDecodeFunc("sock", sock.DecodeConfig)
  25. RegisterInputConfigDecodeFunc("file", file.DecodeConfig)
  26. RegisterProcessorConfigDecodeFunc("classify", classify.DecodeConfig)
  27. RegisterProcessorConfigDecodeFunc("jsonLog", jsonLog.DecodeConfig)
  28. RegisterProcessorConfigDecodeFunc("lengthCheck", lengthCheck.DecodeConfig)
  29. RegisterProcessorConfigDecodeFunc("sample", sample.DecodeConfig)
  30. RegisterProcessorConfigDecodeFunc("httpStream", httpstream.DecodeConfig)
  31. RegisterProcessorConfigDecodeFunc("fileLog", fileLog.DecodeConfig)
  32. RegisterProcessorConfigDecodeFunc("grok", grok.DecodeConfig)
  33. RegisterOutputConfigDecodeFunc("stdout", stdout.DecodeConfig)
  34. RegisterOutputConfigDecodeFunc("lancer", lancerlogstream.DecodeConfig)
  35. RegisterOutputConfigDecodeFunc("lancergrpc", lancergrpc.DecodeConfig)
  36. }
  37. type Pipeline struct {
  38. c *Config
  39. ctx context.Context
  40. cancel context.CancelFunc
  41. configPath string
  42. configMd5 string
  43. }
  44. type Config struct {
  45. Input ConfigItem `toml:"input"`
  46. Processor map[string]ConfigItem `toml:"processor"`
  47. Output map[string]ConfigItem `toml:"output"`
  48. }
  49. type ConfigItem struct {
  50. Name string `toml:"type"`
  51. Config toml.Primitive `toml:"config"`
  52. }
  53. func (pipe *Pipeline) Stop() {
  54. pipe.cancel()
  55. }
  56. type configDecodeFunc = func(md toml.MetaData, primValue toml.Primitive) (c interface{}, err error)
  57. func RegisterInputConfigDecodeFunc(name string, f configDecodeFunc) {
  58. inputConfigDecodeFactory[name] = f
  59. }
  60. func RegisterProcessorConfigDecodeFunc(name string, f configDecodeFunc) {
  61. processorConfigDecodeFactory[name] = f
  62. }
  63. func GetInputConfigDecodeFunc(name string) (f configDecodeFunc, err error) {
  64. if f, exist := inputConfigDecodeFactory[name]; exist {
  65. return f, nil
  66. }
  67. return nil, fmt.Errorf("InputConfigDecodeFunc for %s not exist", name)
  68. }
  69. func GetProcessorConfigDecodeFunc(name string) (f configDecodeFunc, err error) {
  70. if f, exist := processorConfigDecodeFactory[name]; exist {
  71. return f, nil
  72. }
  73. return nil, fmt.Errorf("ProcessorConfigDecodeFunc for %s not exist", name)
  74. }
  75. func DecodeInputConfig(name string, md toml.MetaData, primValue toml.Primitive) (c interface{}, err error) {
  76. dFunc, err := GetInputConfigDecodeFunc(name)
  77. if err != nil {
  78. return nil, err
  79. }
  80. return dFunc(md, primValue)
  81. }
  82. func DecodeProcessorConfig(name string, md toml.MetaData, primValue toml.Primitive) (c interface{}, err error) {
  83. dFunc, err := GetProcessorConfigDecodeFunc(name)
  84. if err != nil {
  85. return nil, err
  86. }
  87. return dFunc(md, primValue)
  88. }
  89. func RegisterOutputConfigDecodeFunc(name string, f configDecodeFunc) {
  90. outputConfigDecodeFactory[name] = f
  91. }
  92. func GetOutputConfigDecodeFunc(name string) (f configDecodeFunc, err error) {
  93. if f, exist := outputConfigDecodeFactory[name]; exist {
  94. return f, nil
  95. }
  96. return nil, fmt.Errorf("OutputConfigDecodeFunc for %s not exist", name)
  97. }
  98. func DecodeOutputConfig(name string, md toml.MetaData, primValue toml.Primitive) (c interface{}, err error) {
  99. dFunc, err := GetOutputConfigDecodeFunc(name)
  100. if err != nil {
  101. return nil, err
  102. }
  103. return dFunc(md, primValue)
  104. }
  105. func (p *Pipeline) logError(err error) {
  106. configPath := p.ctx.Value("configPath")
  107. log.Error("failed to run pipeline for %s: %s", configPath, err)
  108. }