harvester.go 13 KB


  1. package file
  2. import (
  3. "fmt"
  4. "os"
  5. "errors"
  6. "io"
  7. "time"
  8. "context"
  9. "bytes"
  10. "strconv"
  11. "regexp"
  12. "go-common/app/service/ops/log-agent/event"
  13. "go-common/library/log"
  14. "go-common/app/service/ops/log-agent/pkg/lancerroute"
  15. )
  16. type Source interface {
  17. io.ReadCloser
  18. Name() string
  19. Stat() (os.FileInfo, error)
  20. Continuable() bool // can we continue processing after EOF?
  21. HasState() bool // does this source have a state?
  22. }
  23. type Hfile struct {
  24. *os.File
  25. }
  26. var lineReadTimeout = errors.New("lineReadTimeout")
  27. func (Hfile) Continuable() bool { return true }
  28. func (Hfile) HasState() bool { return true }
  29. // Harvester contains all harvester related data
  30. type Harvester struct {
  31. config *Config
  32. source *os.File
  33. ctx context.Context
  34. cancel context.CancelFunc
  35. state State
  36. register *Registrar
  37. reader Reader
  38. output chan<- *event.ProcessorEvent
  39. active time.Time
  40. lineBuffer *bytes.Buffer
  41. multilineBuffer *bytes.Buffer
  42. firstLine []byte
  43. readFunc func() ([]byte, error)
  44. }
  45. // startHarvester starts a new harvester with the given offset
  46. func (f *File) startHarvester(ctx context.Context, c *Config, register *Registrar, state State, offset int64) (err error) {
  47. // Set state to "not" finished to indicate that a harvester is running
  48. state.Finished = false
  49. state.Offset = offset
  50. // Create harvester with state
  51. h, err := NewHarvester(c, register, state, f.output)
  52. if err != nil {
  53. return err
  54. }
  55. h.ctx, h.cancel = context.WithCancel(ctx)
  56. err = h.Setup()
  57. if err != nil {
  58. return fmt.Errorf("error setting up harvester: %s", err)
  59. }
  60. // Update state before staring harvester
  61. // This makes sure the states is set to Finished: false
  62. // This is synchronous state update as part of the scan
  63. h.register.SendStateUpdate(h.state)
  64. h.active = time.Now()
  65. h.lineBuffer = bytes.NewBuffer(make([]byte, 0, h.config.MaxLength))
  66. h.multilineBuffer = bytes.NewBuffer(make([]byte, 0, h.config.MaxLength))
  67. if h.config.Multiline != nil {
  68. h.readFunc = h.readMultiLine
  69. } else {
  70. h.readFunc = h.readOneLine
  71. }
  72. go h.stateUpdatePeriodically()
  73. go h.Run()
  74. go h.activeCheck()
  75. return err
  76. }
  77. // harvestExistingFile continues harvesting a file with a known state if needed
  78. func (f *File) harvestExistingFile(ctx context.Context, c *Config, register *Registrar, newState State, oldState State) {
  79. //log.Info("Update existing file for harvesting: %s, offset: %v", newState.Source, oldState.Offset)
  80. // No harvester is running for the file, start a new harvester
  81. // It is important here that only the size is checked and not modification time, as modification time could be incorrect on windows
  82. // https://blogs.technet.microsoft.com/asiasupp/2010/12/14/file-date-modified-property-are-not-updating-while-modifying-a-file-without-closing-it/
  83. if oldState.Finished && newState.Fileinfo.Size() > oldState.Offset {
  84. // Resume harvesting of an old file we've stopped harvesting from
  85. // This could also be an issue with force_close_older that a new harvester is started after each scan but not needed?
  86. // One problem with comparing modTime is that it is in seconds, and scans can happen more then once a second
  87. log.Info("Resuming harvesting of file: %s, offset: %d, new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size())
  88. err := f.startHarvester(ctx, c, register, newState, oldState.Offset)
  89. if err != nil {
  90. log.Error("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err)
  91. }
  92. return
  93. }
  94. // File size was reduced -> truncated file
  95. if newState.Fileinfo.Size() < oldState.Offset {
  96. log.Info("Old file was truncated. Starting from the beginning: %s, old size: %d new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size())
  97. if oldState.Finished {
  98. err := f.startHarvester(ctx, c, register, newState, 0)
  99. if err != nil {
  100. log.Error("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err)
  101. }
  102. return
  103. }
  104. // just stop old harvester
  105. h := f.register.GetHarvester(oldState.Inode)
  106. if h != nil {
  107. h.Stop()
  108. }
  109. return
  110. }
  111. // Check if file was renamed
  112. if oldState.Source != "" && oldState.Source != newState.Source {
  113. // This does not start a new harvester as it is assume that the older harvester is still running
  114. // or no new lines were detected. It sends only an event status update to make sure the new name is persisted.
  115. log.Info("File rename was detected: %s -> %s, Current offset: %v", oldState.Source, newState.Source, oldState.Offset)
  116. oldState.Source = newState.Source
  117. f.register.SendStateUpdate(oldState)
  118. }
  119. if !oldState.Finished {
  120. // Nothing to do. Harvester is still running and file was not renamed
  121. log.V(1).Info("Harvester for file is still running: %s, inode %d", newState.Source, newState.Inode)
  122. } else {
  123. log.V(1).Info("File didn't change: %s, inode %d", newState.Source, newState.Inode)
  124. }
  125. }
  126. // NewHarvester creates a new harvester
  127. func NewHarvester(c *Config, register *Registrar, state State, output chan<- *event.ProcessorEvent) (*Harvester, error) {
  128. h := &Harvester{
  129. config: c,
  130. state: state,
  131. register: register,
  132. output: output,
  133. }
  134. // Add ttl if cleanInactive is set
  135. if h.config.CleanInactive > 0 {
  136. h.state.TTL = time.Duration(h.config.CleanInactive)
  137. }
  138. // Add outlet signal so harvester can also stop itself
  139. return h, nil
  140. }
  141. // Setup opens the file handler and creates the reader for the harvester
  142. func (h *Harvester) Setup() error {
  143. err := h.openFile()
  144. if err != nil {
  145. return fmt.Errorf("Harvester setup failed. Unexpected file opening error: %s", err)
  146. }
  147. h.reader, err = h.newLogFileReader()
  148. if err != nil {
  149. if h.source != nil {
  150. h.source.Close()
  151. }
  152. return fmt.Errorf("Harvester setup failed. Unexpected encoding line reader error: %s", err)
  153. }
  154. return nil
  155. }
  156. // openFile opens a file and checks for the encoding. In case the encoding cannot be detected
  157. // or the file cannot be opened because for example of failing read permissions, an error
  158. // is returned and the harvester is closed. The file will be picked up again the next time
  159. // the file system is scanned
  160. func (h *Harvester) openFile() error {
  161. f, err := os.OpenFile(h.state.Source, os.O_RDONLY, os.FileMode(0))
  162. if err != nil {
  163. return fmt.Errorf("Failed opening %s: %s", h.state.Source, err)
  164. }
  165. // Makes sure file handler is also closed on errors
  166. err = h.validateFile(f)
  167. if err != nil {
  168. f.Close()
  169. return err
  170. }
  171. h.source = f
  172. return nil
  173. }
  174. func (h *Harvester) validateFile(f *os.File) error {
  175. info, err := f.Stat()
  176. if err != nil {
  177. return fmt.Errorf("Failed getting stats for file %s: %s", h.state.Source, err)
  178. }
  179. if !info.Mode().IsRegular() {
  180. return fmt.Errorf("Tried to open non regular file: %q %s", info.Mode(), info.Name())
  181. }
  182. // Compares the stat of the opened file to the state given by the input. Abort if not match.
  183. if !os.SameFile(h.state.Fileinfo, info) {
  184. return errors.New("file info is not identical with opened file. Aborting harvesting and retrying file later again")
  185. }
  186. // get file offset. Only update offset if no error
  187. offset, err := h.initFileOffset(f)
  188. if err != nil {
  189. return err
  190. }
  191. log.V(1).Info("harvester Setting offset for file: %s inode %d. Offset: %d ", h.state.Source, h.state.Inode, offset)
  192. h.state.Offset = offset
  193. return nil
  194. }
  195. // initFileOffset set offset for file handler
  196. func (h *Harvester) initFileOffset(file *os.File) (int64, error) {
  197. // continue from last known offset
  198. if h.state.Offset > 0 {
  199. return file.Seek(h.state.Offset, os.SEEK_SET)
  200. }
  201. var firstRun = false
  202. if v := h.ctx.Value("firstRun"); v != nil {
  203. firstRun = v.(bool)
  204. }
  205. if h.config.ReadFrom == "newest" && firstRun {
  206. return file.Seek(0, os.SEEK_END)
  207. }
  208. return file.Seek(0, os.SEEK_CUR)
  209. }
  210. func (h *Harvester) newLogFileReader() (Reader, error) {
  211. return NewLineReader(h.source, h.config.MaxLength)
  212. }
  213. func (h *Harvester) WriteToProcessor(message []byte) {
  214. e := event.GetEvent()
  215. e.Write(message)
  216. e.AppId = []byte(h.config.AppId)
  217. e.LogId = h.config.LogId
  218. e.Source = "file"
  219. // update fields
  220. for k, v := range h.config.Fields {
  221. e.Fields[k] = v
  222. }
  223. e.Fields["file"] = h.state.Source
  224. e.Destination = lancerroute.GetLancerByLogid(e.LogId)
  225. // time maybe overwrite by processor
  226. e.Time = time.Now()
  227. e.TimeRangeKey = strconv.FormatInt(e.Time.Unix()/100*100, 10)
  228. h.output <- e
  229. }
  230. func (h *Harvester) stateUpdatePeriodically() {
  231. interval := time.Tick(time.Second * 5)
  232. var offset int64
  233. for {
  234. select {
  235. case <-interval:
  236. if h.state.Offset > offset {
  237. offset = h.state.Offset
  238. h.register.SendStateUpdate(h.state)
  239. h.active = time.Now()
  240. }
  241. case <-h.ctx.Done():
  242. h.register.SendStateUpdate(h.state)
  243. return
  244. }
  245. }
  246. }
  247. func (h *Harvester) Stop() {
  248. h.state.Finished = true
  249. h.register.SendStateUpdate(h.state)
  250. h.cancel()
  251. log.Info("Harvester for File: %s Inode %d Existed", h.state.Source, h.state.Inode)
  252. }
  253. func (h *Harvester) activeCheck() {
  254. interval := time.Tick(time.Minute * 1)
  255. for {
  256. select {
  257. case <-interval:
  258. if time.Now().Sub(h.active) > time.Duration(h.config.HarvesterTTL) {
  259. log.Info("Harvester for file: %s, inode: %d is inactive longer than HarvesterTTL, Ended", h.state.Source, h.state.Inode)
  260. h.Stop()
  261. return
  262. }
  263. case <-h.ctx.Done():
  264. return
  265. }
  266. }
  267. }
  268. func (h *Harvester) readMultiLine() (b []byte, err error) {
  269. h.multilineBuffer.Reset()
  270. counter := 0
  271. if h.firstLine != nil {
  272. h.multilineBuffer.Write(h.firstLine)
  273. }
  274. ctx, _ := context.WithTimeout(h.ctx, time.Duration(h.config.Timeout))
  275. for {
  276. select {
  277. case <-ctx.Done():
  278. h.firstLine = nil
  279. return h.multilineBuffer.Bytes(), lineReadTimeout
  280. default:
  281. }
  282. message, err := h.readOneLine()
  283. if err != nil && err != io.EOF && err != lineReadTimeout {
  284. h.firstLine = nil
  285. if h.multilineBuffer.Len() == 0 {
  286. return message, nil
  287. }
  288. if len(message) > 0 {
  289. h.multilineBuffer.Write([]byte{'\n'})
  290. h.multilineBuffer.Write(message)
  291. }
  292. return h.multilineBuffer.Bytes(), err
  293. }
  294. if len(message) == 0 {
  295. continue
  296. }
  297. matched, err := regexp.Match(h.config.Multiline.Pattern, message)
  298. if matched {
  299. // old multiline ended
  300. if h.firstLine != nil {
  301. h.firstLine = message
  302. return h.multilineBuffer.Bytes(), nil
  303. }
  304. // pure new multiline
  305. if h.firstLine == nil {
  306. h.firstLine = message
  307. h.multilineBuffer.Write(message)
  308. continue
  309. }
  310. }
  311. if !matched {
  312. // multiline not begin
  313. if h.firstLine == nil {
  314. return message, nil
  315. }
  316. if h.firstLine != nil {
  317. h.multilineBuffer.Write([]byte{'\n'})
  318. h.multilineBuffer.Write(message)
  319. counter += 1
  320. }
  321. }
  322. if counter > h.config.Multiline.MaxLines || h.multilineBuffer.Len() > h.config.MaxLength {
  323. h.firstLine = nil
  324. return h.multilineBuffer.Bytes(), nil
  325. }
  326. }
  327. }
  328. func (h *Harvester) readOneLine() (b []byte, err error) {
  329. h.lineBuffer.Reset()
  330. ctx, _ := context.WithTimeout(h.ctx, time.Duration(h.config.Timeout))
  331. for {
  332. select {
  333. case <-ctx.Done():
  334. return h.lineBuffer.Bytes(), lineReadTimeout
  335. default:
  336. }
  337. message, advance, err := h.reader.Next()
  338. // update offset
  339. h.state.Offset += int64(advance)
  340. if err == nil && h.lineBuffer.Len() == 0 {
  341. return message, nil
  342. }
  343. h.lineBuffer.Write(message)
  344. if err == nil {
  345. return h.lineBuffer.Bytes(), nil
  346. }
  347. if err == io.EOF && advance == 0 {
  348. time.Sleep(time.Millisecond * 100)
  349. continue
  350. }
  351. if err == io.EOF && advance > 0 && h.lineBuffer.Len() >= h.config.MaxLength {
  352. return h.lineBuffer.Bytes(), nil
  353. }
  354. if err != nil {
  355. return h.lineBuffer.Bytes(), err
  356. }
  357. }
  358. }
  359. //// Run start the harvester and reads files line by line and sends events to the defined output
  360. //func (h *Harvester) Run() {
  361. // log.V(1).Info("Harvester started for file: %s, inode %d", h.state.Source, h.state.Inode)
  362. // h.register.RegisterHarvester(h)
  363. // defer h.register.UnRegisterHarvester(h)
  364. //
  365. // var line = make([]byte, 0, h.config.MaxLength)
  366. // for {
  367. // select {
  368. // case <-h.ctx.Done():
  369. // return
  370. // default:
  371. // }
  372. // //TODO MaxLength check
  373. // message, advance, err := h.reader.Next()
  374. // // update offset
  375. // h.state.Offset += int64(advance)
  376. //
  377. // if err == nil {
  378. // if len(line) == 0 {
  379. // h.WriteToProcessor(message)
  380. // continue
  381. // }
  382. // line = append(line, message...)
  383. // h.WriteToProcessor(line)
  384. // line = line[:0]
  385. // continue
  386. // }
  387. //
  388. // if err == io.EOF && advance == 0 {
  389. // time.Sleep(time.Millisecond * 100)
  390. // continue
  391. // }
  392. //
  393. // if err == io.EOF && advance > 0 {
  394. // line = append(line, message...)
  395. // if len(line) >= h.config.MaxLength {
  396. // h.WriteToProcessor(line)
  397. // line = line[:0]
  398. // }
  399. // continue
  400. // }
  401. //
  402. // if err != nil {
  403. // log.Error("Harvester Read line error: %v; File: %v", err, h.state.Source)
  404. // h.Stop()
  405. // return
  406. // }
  407. // }
  408. //}
  409. // Run start the harvester and reads files line by line and sends events to the defined output
  410. func (h *Harvester) Run() {
  411. log.V(1).Info("Harvester started for file: %s, inode %d", h.state.Source, h.state.Inode)
  412. h.register.RegisterHarvester(h)
  413. defer h.register.UnRegisterHarvester(h)
  414. for {
  415. select {
  416. case <-h.ctx.Done():
  417. return
  418. default:
  419. }
  420. message, err := h.readFunc()
  421. if err == lineReadTimeout {
  422. log.V(1).Info("lineReadTimeout when harvesting %s", h.state.Source)
  423. }
  424. if len(message) > 0 {
  425. h.WriteToProcessor(message)
  426. }
  427. if err != nil && err != lineReadTimeout && err != io.EOF {
  428. log.Error("Harvester Read line error: %v; File: %v", err, h.state.Source)
  429. h.Stop()
  430. return
  431. }
  432. }
  433. }