// file.go (2.1 KB)

  1. package hostlogcollector
  2. import (
  3. "os"
  4. "io/ioutil"
  5. "path"
  6. "strings"
  7. "fmt"
  8. "time"
  9. "context"
  10. "go-common/library/log"
  11. "go-common/app/service/ops/log-agent/pipeline"
  12. )
  13. type HostLogCollector struct {
  14. c *Config
  15. ctx context.Context
  16. cancel context.CancelFunc
  17. }
  18. func InitHostLogCollector(ctx context.Context, c *Config) (err error) {
  19. if err = c.ConfigValidate(); err != nil {
  20. return err
  21. }
  22. collector := new(HostLogCollector)
  23. collector.c = c
  24. collector.ctx, collector.cancel = context.WithCancel(ctx)
  25. go collector.scan()
  26. return nil
  27. }
  28. //
  29. func (collector *HostLogCollector) scan() {
  30. ticker := time.Tick(time.Duration(collector.c.ScanInterval))
  31. for {
  32. select {
  33. case <-ticker:
  34. configPaths, err := collector.getConfigs()
  35. if err != nil {
  36. log.Error("failed to scan hostlogcollector config file list: %s", err)
  37. continue
  38. }
  39. for _, configPath := range configPaths {
  40. config, err := ioutil.ReadFile(configPath)
  41. if err != nil {
  42. log.Error("filed to read hostlogcollector config file %s: %s", configPath, err)
  43. continue
  44. }
  45. if !pipeline.PipelineManagement.PipelineExisted(configPath) {
  46. go pipeline.PipelineManagement.StartPipeline(collector.ctx, configPath, string(config))
  47. }
  48. }
  49. case <-collector.ctx.Done():
  50. return
  51. }
  52. }
  53. }
  54. // HostLogCollector get file collect configs under path
  55. func (collector *HostLogCollector) getConfigs() ([]string, error) {
  56. var (
  57. err error
  58. cinfos []os.FileInfo
  59. configFiles = make([]string, 0)
  60. )
  61. dinfo, err := os.Lstat(collector.c.HostConfigPath)
  62. if err != nil {
  63. return nil, fmt.Errorf("lstat(%s) failed: %s", collector.c.HostConfigPath, err)
  64. }
  65. if !dinfo.IsDir() {
  66. return nil, fmt.Errorf("file collect config path must be dir")
  67. }
  68. if cinfos, err = ioutil.ReadDir(collector.c.HostConfigPath); err != nil {
  69. return nil, fmt.Errorf("ioutil.ReadDir(%s) error(%v)", collector.c.HostConfigPath, err)
  70. }
  71. for _, cinfo := range cinfos {
  72. name := path.Join(collector.c.HostConfigPath, cinfo.Name())
  73. if !cinfo.IsDir() && strings.HasSuffix(name, collector.c.ConfigSuffix) {
  74. configFiles = append(configFiles, name)
  75. }
  76. }
  77. return configFiles, nil
  78. }