123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503 |
- package file
- import (
- "fmt"
- "os"
- "errors"
- "io"
- "time"
- "context"
- "bytes"
- "strconv"
- "regexp"
- "go-common/app/service/ops/log-agent/event"
- "go-common/library/log"
- "go-common/app/service/ops/log-agent/pkg/lancerroute"
- )
// Source describes a named, stat-able input stream that can be read and closed.
type Source interface {
	io.ReadCloser

	// Name returns the name of the source.
	Name() string
	// Stat returns the file metadata of the source.
	Stat() (os.FileInfo, error)
	// HasState reports whether this source has a state.
	HasState() bool
	// Continuable reports whether processing may continue after io.EOF.
	Continuable() bool
}
// Hfile adapts an *os.File to the Source interface: Read, Close, Name and
// Stat are promoted from the embedded *os.File.
type Hfile struct {
	*os.File
}

// Continuable always reports true: an Hfile may keep being read after io.EOF.
func (hf Hfile) Continuable() bool {
	return true
}

// HasState always reports true: an Hfile has a state.
func (hf Hfile) HasState() bool {
	return true
}

// lineReadTimeout is returned together with whatever was buffered when a
// line (or multiline record) read does not finish within the configured
// timeout.
var lineReadTimeout = errors.New("lineReadTimeout")
- // Harvester contains all harvester related data
- type Harvester struct {
- config *Config
- source *os.File
- ctx context.Context
- cancel context.CancelFunc
- state State
- register *Registrar
- reader Reader
- output chan<- *event.ProcessorEvent
- active time.Time
- lineBuffer *bytes.Buffer
- multilineBuffer *bytes.Buffer
- firstLine []byte
- readFunc func() ([]byte, error)
- }
- // startHarvester starts a new harvester with the given offset
- func (f *File) startHarvester(ctx context.Context, c *Config, register *Registrar, state State, offset int64) (err error) {
- // Set state to "not" finished to indicate that a harvester is running
- state.Finished = false
- state.Offset = offset
- // Create harvester with state
- h, err := NewHarvester(c, register, state, f.output)
- if err != nil {
- return err
- }
- h.ctx, h.cancel = context.WithCancel(ctx)
- err = h.Setup()
- if err != nil {
- return fmt.Errorf("error setting up harvester: %s", err)
- }
- // Update state before staring harvester
- // This makes sure the states is set to Finished: false
- // This is synchronous state update as part of the scan
- h.register.SendStateUpdate(h.state)
- h.active = time.Now()
- h.lineBuffer = bytes.NewBuffer(make([]byte, 0, h.config.MaxLength))
- h.multilineBuffer = bytes.NewBuffer(make([]byte, 0, h.config.MaxLength))
- if h.config.Multiline != nil {
- h.readFunc = h.readMultiLine
- } else {
- h.readFunc = h.readOneLine
- }
- go h.stateUpdatePeriodically()
- go h.Run()
- go h.activeCheck()
- return err
- }
- // harvestExistingFile continues harvesting a file with a known state if needed
- func (f *File) harvestExistingFile(ctx context.Context, c *Config, register *Registrar, newState State, oldState State) {
- //log.Info("Update existing file for harvesting: %s, offset: %v", newState.Source, oldState.Offset)
- // No harvester is running for the file, start a new harvester
- // It is important here that only the size is checked and not modification time, as modification time could be incorrect on windows
- // https://blogs.technet.microsoft.com/asiasupp/2010/12/14/file-date-modified-property-are-not-updating-while-modifying-a-file-without-closing-it/
- if oldState.Finished && newState.Fileinfo.Size() > oldState.Offset {
- // Resume harvesting of an old file we've stopped harvesting from
- // This could also be an issue with force_close_older that a new harvester is started after each scan but not needed?
- // One problem with comparing modTime is that it is in seconds, and scans can happen more then once a second
- log.Info("Resuming harvesting of file: %s, offset: %d, new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size())
- err := f.startHarvester(ctx, c, register, newState, oldState.Offset)
- if err != nil {
- log.Error("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err)
- }
- return
- }
- // File size was reduced -> truncated file
- if newState.Fileinfo.Size() < oldState.Offset {
- log.Info("Old file was truncated. Starting from the beginning: %s, old size: %d new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size())
- if oldState.Finished {
- err := f.startHarvester(ctx, c, register, newState, 0)
- if err != nil {
- log.Error("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err)
- }
- return
- }
- // just stop old harvester
- h := f.register.GetHarvester(oldState.Inode)
- if h != nil {
- h.Stop()
- }
- return
- }
- // Check if file was renamed
- if oldState.Source != "" && oldState.Source != newState.Source {
- // This does not start a new harvester as it is assume that the older harvester is still running
- // or no new lines were detected. It sends only an event status update to make sure the new name is persisted.
- log.Info("File rename was detected: %s -> %s, Current offset: %v", oldState.Source, newState.Source, oldState.Offset)
- oldState.Source = newState.Source
- f.register.SendStateUpdate(oldState)
- }
- if !oldState.Finished {
- // Nothing to do. Harvester is still running and file was not renamed
- log.V(1).Info("Harvester for file is still running: %s, inode %d", newState.Source, newState.Inode)
- } else {
- log.V(1).Info("File didn't change: %s, inode %d", newState.Source, newState.Inode)
- }
- }
- // NewHarvester creates a new harvester
- func NewHarvester(c *Config, register *Registrar, state State, output chan<- *event.ProcessorEvent) (*Harvester, error) {
- h := &Harvester{
- config: c,
- state: state,
- register: register,
- output: output,
- }
- // Add ttl if cleanInactive is set
- if h.config.CleanInactive > 0 {
- h.state.TTL = time.Duration(h.config.CleanInactive)
- }
- // Add outlet signal so harvester can also stop itself
- return h, nil
- }
- // Setup opens the file handler and creates the reader for the harvester
- func (h *Harvester) Setup() error {
- err := h.openFile()
- if err != nil {
- return fmt.Errorf("Harvester setup failed. Unexpected file opening error: %s", err)
- }
- h.reader, err = h.newLogFileReader()
- if err != nil {
- if h.source != nil {
- h.source.Close()
- }
- return fmt.Errorf("Harvester setup failed. Unexpected encoding line reader error: %s", err)
- }
- return nil
- }
- // openFile opens a file and checks for the encoding. In case the encoding cannot be detected
- // or the file cannot be opened because for example of failing read permissions, an error
- // is returned and the harvester is closed. The file will be picked up again the next time
- // the file system is scanned
- func (h *Harvester) openFile() error {
- f, err := os.OpenFile(h.state.Source, os.O_RDONLY, os.FileMode(0))
- if err != nil {
- return fmt.Errorf("Failed opening %s: %s", h.state.Source, err)
- }
- // Makes sure file handler is also closed on errors
- err = h.validateFile(f)
- if err != nil {
- f.Close()
- return err
- }
- h.source = f
- return nil
- }
- func (h *Harvester) validateFile(f *os.File) error {
- info, err := f.Stat()
- if err != nil {
- return fmt.Errorf("Failed getting stats for file %s: %s", h.state.Source, err)
- }
- if !info.Mode().IsRegular() {
- return fmt.Errorf("Tried to open non regular file: %q %s", info.Mode(), info.Name())
- }
- // Compares the stat of the opened file to the state given by the input. Abort if not match.
- if !os.SameFile(h.state.Fileinfo, info) {
- return errors.New("file info is not identical with opened file. Aborting harvesting and retrying file later again")
- }
- // get file offset. Only update offset if no error
- offset, err := h.initFileOffset(f)
- if err != nil {
- return err
- }
- log.V(1).Info("harvester Setting offset for file: %s inode %d. Offset: %d ", h.state.Source, h.state.Inode, offset)
- h.state.Offset = offset
- return nil
- }
- // initFileOffset set offset for file handler
- func (h *Harvester) initFileOffset(file *os.File) (int64, error) {
- // continue from last known offset
- if h.state.Offset > 0 {
- return file.Seek(h.state.Offset, os.SEEK_SET)
- }
- var firstRun = false
- if v := h.ctx.Value("firstRun"); v != nil {
- firstRun = v.(bool)
- }
- if h.config.ReadFrom == "newest" && firstRun {
- return file.Seek(0, os.SEEK_END)
- }
- return file.Seek(0, os.SEEK_CUR)
- }
- func (h *Harvester) newLogFileReader() (Reader, error) {
- return NewLineReader(h.source, h.config.MaxLength)
- }
- func (h *Harvester) WriteToProcessor(message []byte) {
- e := event.GetEvent()
- e.Write(message)
- e.AppId = []byte(h.config.AppId)
- e.LogId = h.config.LogId
- e.Source = "file"
- // update fields
- for k, v := range h.config.Fields {
- e.Fields[k] = v
- }
- e.Fields["file"] = h.state.Source
- e.Destination = lancerroute.GetLancerByLogid(e.LogId)
- // time maybe overwrite by processor
- e.Time = time.Now()
- e.TimeRangeKey = strconv.FormatInt(e.Time.Unix()/100*100, 10)
- h.output <- e
- }
- func (h *Harvester) stateUpdatePeriodically() {
- interval := time.Tick(time.Second * 5)
- var offset int64
- for {
- select {
- case <-interval:
- if h.state.Offset > offset {
- offset = h.state.Offset
- h.register.SendStateUpdate(h.state)
- h.active = time.Now()
- }
- case <-h.ctx.Done():
- h.register.SendStateUpdate(h.state)
- return
- }
- }
- }
- func (h *Harvester) Stop() {
- h.state.Finished = true
- h.register.SendStateUpdate(h.state)
- h.cancel()
- log.Info("Harvester for File: %s Inode %d Existed", h.state.Source, h.state.Inode)
- }
- func (h *Harvester) activeCheck() {
- interval := time.Tick(time.Minute * 1)
- for {
- select {
- case <-interval:
- if time.Now().Sub(h.active) > time.Duration(h.config.HarvesterTTL) {
- log.Info("Harvester for file: %s, inode: %d is inactive longer than HarvesterTTL, Ended", h.state.Source, h.state.Inode)
- h.Stop()
- return
- }
- case <-h.ctx.Done():
- return
- }
- }
- }
- func (h *Harvester) readMultiLine() (b []byte, err error) {
- h.multilineBuffer.Reset()
- counter := 0
- if h.firstLine != nil {
- h.multilineBuffer.Write(h.firstLine)
- }
- ctx, _ := context.WithTimeout(h.ctx, time.Duration(h.config.Timeout))
- for {
- select {
- case <-ctx.Done():
- h.firstLine = nil
- return h.multilineBuffer.Bytes(), lineReadTimeout
- default:
- }
- message, err := h.readOneLine()
- if err != nil && err != io.EOF && err != lineReadTimeout {
- h.firstLine = nil
- if h.multilineBuffer.Len() == 0 {
- return message, nil
- }
- if len(message) > 0 {
- h.multilineBuffer.Write([]byte{'\n'})
- h.multilineBuffer.Write(message)
- }
- return h.multilineBuffer.Bytes(), err
- }
- if len(message) == 0 {
- continue
- }
- matched, err := regexp.Match(h.config.Multiline.Pattern, message)
- if matched {
- // old multiline ended
- if h.firstLine != nil {
- h.firstLine = message
- return h.multilineBuffer.Bytes(), nil
- }
- // pure new multiline
- if h.firstLine == nil {
- h.firstLine = message
- h.multilineBuffer.Write(message)
- continue
- }
- }
- if !matched {
- // multiline not begin
- if h.firstLine == nil {
- return message, nil
- }
- if h.firstLine != nil {
- h.multilineBuffer.Write([]byte{'\n'})
- h.multilineBuffer.Write(message)
- counter += 1
- }
- }
- if counter > h.config.Multiline.MaxLines || h.multilineBuffer.Len() > h.config.MaxLength {
- h.firstLine = nil
- return h.multilineBuffer.Bytes(), nil
- }
- }
- }
- func (h *Harvester) readOneLine() (b []byte, err error) {
- h.lineBuffer.Reset()
- ctx, _ := context.WithTimeout(h.ctx, time.Duration(h.config.Timeout))
- for {
- select {
- case <-ctx.Done():
- return h.lineBuffer.Bytes(), lineReadTimeout
- default:
- }
- message, advance, err := h.reader.Next()
- // update offset
- h.state.Offset += int64(advance)
- if err == nil && h.lineBuffer.Len() == 0 {
- return message, nil
- }
- h.lineBuffer.Write(message)
- if err == nil {
- return h.lineBuffer.Bytes(), nil
- }
- if err == io.EOF && advance == 0 {
- time.Sleep(time.Millisecond * 100)
- continue
- }
- if err == io.EOF && advance > 0 && h.lineBuffer.Len() >= h.config.MaxLength {
- return h.lineBuffer.Bytes(), nil
- }
- if err != nil {
- return h.lineBuffer.Bytes(), err
- }
- }
- }
- //// Run start the harvester and reads files line by line and sends events to the defined output
- //func (h *Harvester) Run() {
- // log.V(1).Info("Harvester started for file: %s, inode %d", h.state.Source, h.state.Inode)
- // h.register.RegisterHarvester(h)
- // defer h.register.UnRegisterHarvester(h)
- //
- // var line = make([]byte, 0, h.config.MaxLength)
- // for {
- // select {
- // case <-h.ctx.Done():
- // return
- // default:
- // }
- // //TODO MaxLength check
- // message, advance, err := h.reader.Next()
- // // update offset
- // h.state.Offset += int64(advance)
- //
- // if err == nil {
- // if len(line) == 0 {
- // h.WriteToProcessor(message)
- // continue
- // }
- // line = append(line, message...)
- // h.WriteToProcessor(line)
- // line = line[:0]
- // continue
- // }
- //
- // if err == io.EOF && advance == 0 {
- // time.Sleep(time.Millisecond * 100)
- // continue
- // }
- //
- // if err == io.EOF && advance > 0 {
- // line = append(line, message...)
- // if len(line) >= h.config.MaxLength {
- // h.WriteToProcessor(line)
- // line = line[:0]
- // }
- // continue
- // }
- //
- // if err != nil {
- // log.Error("Harvester Read line error: %v; File: %v", err, h.state.Source)
- // h.Stop()
- // return
- // }
- // }
- //}
- // Run start the harvester and reads files line by line and sends events to the defined output
- func (h *Harvester) Run() {
- log.V(1).Info("Harvester started for file: %s, inode %d", h.state.Source, h.state.Inode)
- h.register.RegisterHarvester(h)
- defer h.register.UnRegisterHarvester(h)
- for {
- select {
- case <-h.ctx.Done():
- return
- default:
- }
- message, err := h.readFunc()
- if err == lineReadTimeout {
- log.V(1).Info("lineReadTimeout when harvesting %s", h.state.Source)
- }
- if len(message) > 0 {
- h.WriteToProcessor(message)
- }
- if err != nil && err != lineReadTimeout && err != io.EOF {
- log.Error("Harvester Read line error: %v; File: %v", err, h.state.Source)
- h.Stop()
- return
- }
- }
- }
|