file.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. package file
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "path"
  7. "crypto/sha1"
  8. "encoding/base64"
  9. "errors"
  10. "os"
  11. "go-common/app/service/ops/log-agent/event"
  12. "go-common/app/service/ops/log-agent/input"
  13. "go-common/library/log"
  14. )
  15. type File struct {
  16. c *Config
  17. output chan<- *event.ProcessorEvent
  18. ctx context.Context
  19. cancel context.CancelFunc
  20. register *Registrar
  21. }
  22. func init() {
  23. err := input.Register("file", NewFile)
  24. if err != nil {
  25. panic(err)
  26. }
  27. }
  28. func NewFile(ctx context.Context, config interface{}, output chan<- *event.ProcessorEvent) (input.Input, error) {
  29. f := new(File)
  30. if c, ok := config.(*Config); !ok {
  31. return nil, fmt.Errorf("Error config for File Input")
  32. } else {
  33. if err := c.ConfigValidate(); err != nil {
  34. return nil, err
  35. }
  36. f.c = c
  37. }
  38. f.output = output
  39. f.ctx, f.cancel = context.WithCancel(ctx)
  40. // set config by ctx
  41. if f.c.ConfigPath == "" {
  42. configPath := ctx.Value("configPath")
  43. if configPath == nil {
  44. return nil, errors.New("can't get configPath from context")
  45. }
  46. f.c.ConfigPath = configPath.(string)
  47. }
  48. if f.c.ID == "" {
  49. hasher := sha1.New()
  50. hasher.Write([]byte(f.c.ConfigPath))
  51. f.c.ID = base64.URLEncoding.EncodeToString(hasher.Sum(nil))
  52. }
  53. if f.c.MetaPath == "" {
  54. f.c.MetaPath = ctx.Value("MetaPath").(string)
  55. }
  56. // init register
  57. if err := f.initRegister(); err != nil {
  58. return nil, err
  59. }
  60. return f, nil
  61. }
  62. func (f *File) Run() (err error) {
  63. log.Info("start collect log configured in %s", f.c.ConfigPath)
  64. f.scan()
  65. ticker := time.Tick(time.Duration(f.c.ScanFrequency))
  66. go func() {
  67. for {
  68. select {
  69. case <-f.ctx.Done():
  70. return
  71. case <-ticker:
  72. f.scan()
  73. }
  74. }
  75. }()
  76. return nil
  77. }
  78. func (f *File) Stop() {
  79. f.cancel()
  80. }
  81. func (f *File) Ctx() (context.Context) {
  82. return f.ctx
  83. }
  84. func (f *File) initRegister() error {
  85. path := path.Join(f.c.MetaPath, f.c.ID)
  86. register, err := NewRegistry(f.ctx, path)
  87. if err != nil {
  88. return err
  89. }
  90. f.register = register
  91. return nil
  92. }
  93. // Scan starts a scanGlob for each provided path/glob
  94. func (f *File) scan() {
  95. paths := f.getFiles()
  96. // clean files older than
  97. if time.Duration(f.c.CleanFilesOlder) != 0 {
  98. f.cleanOldFiles(paths)
  99. }
  100. for path, info := range paths {
  101. select {
  102. case <-f.ctx.Done():
  103. return
  104. default:
  105. }
  106. newState, err := getFileState(path, info)
  107. if err != nil {
  108. log.Error("Skipping file %s due to error %s", path, err)
  109. continue
  110. }
  111. // Load last state
  112. lastState := f.register.FindPrevious(newState)
  113. // Ignores all files which fall under ignore_older
  114. if f.isIgnoreOlder(newState) {
  115. continue
  116. }
  117. // Decides if previous state exists
  118. if lastState.IsEmpty() {
  119. log.Info("Start harvester for new file: %s, inode: %d", newState.Source, newState.Inode)
  120. ctx := context.WithValue(f.ctx, "firstRun", true)
  121. err := f.startHarvester(ctx, f.c, f.register, newState, 0)
  122. if err != nil {
  123. log.Error("Harvester could not be started on new file: %s, Err: %s", newState.Source, err)
  124. }
  125. } else {
  126. ctx := context.WithValue(f.ctx, "firstRun", false)
  127. f.harvestExistingFile(ctx, f.c, f.register, newState, lastState)
  128. }
  129. }
  130. }
  131. func (f *File) cleanOldFiles(paths map[string]os.FileInfo) {
  132. if time.Duration(f.c.CleanFilesOlder) == 0 {
  133. return
  134. }
  135. var latestFile *State
  136. for path, info := range paths {
  137. newState, err := getFileState(path, info)
  138. if err != nil {
  139. log.Error("Skipping file %s due to error %s", path, err)
  140. continue
  141. }
  142. if latestFile == nil {
  143. latestFile = &newState
  144. continue
  145. }
  146. if newState.Fileinfo.ModTime().After(latestFile.Fileinfo.ModTime()) {
  147. // delete latestFile if newer file existing and modtime of latestFile is older than f.c.CleanFilesOlder
  148. if time.Since(latestFile.Fileinfo.ModTime()) > time.Duration(f.c.CleanFilesOlder) {
  149. if err := os.Remove(latestFile.Source); err != nil {
  150. log.Error("Failed to delete file %s", latestFile.Source)
  151. } else {
  152. log.Info("Delete file %s older than %s", latestFile.Source, time.Duration(f.c.CleanFilesOlder).String())
  153. }
  154. }
  155. latestFile = &newState
  156. continue
  157. }
  158. if newState.Fileinfo.ModTime().Before(latestFile.Fileinfo.ModTime()) {
  159. if time.Since(newState.Fileinfo.ModTime()) > time.Duration(f.c.CleanFilesOlder) {
  160. if err := os.Remove(newState.Source); err != nil {
  161. log.Error("Failed to delete file %s", newState.Source)
  162. } else {
  163. log.Info("Delete file %s older than %s", newState.Source, time.Duration(f.c.CleanFilesOlder))
  164. }
  165. }
  166. }
  167. }
  168. }
  169. // isIgnoreOlder checks if the given state reached ignore_older
  170. func (f *File) isIgnoreOlder(state State) bool {
  171. // ignore_older is disable
  172. if f.c.IgnoreOlder == 0 {
  173. return false
  174. }
  175. modTime := state.Fileinfo.ModTime()
  176. if time.Since(modTime) > time.Duration(f.c.IgnoreOlder) {
  177. return true
  178. }
  179. return false
  180. }