- package file
- import (
- "bufio"
- "bytes"
- "encoding/binary"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "math/rand"
- "os"
- "path"
- "sort"
- "strconv"
- "strings"
- "sync"
- "time"
- "go-common/app/service/ops/log-agent/event"
- "go-common/app/service/ops/log-agent/pkg/flowmonitor"
- "go-common/library/log"
- "github.com/fsnotify/fsnotify"
- )
- const (
- _formatUpdated = "2006-01-02 15:04:05"
- _logMagicSize = 2
- _logHeadSize = 6
- _logLenSize = 4
- _logIdSize = 6
- _logLancerHeaderLen = 19
- )
- var (
- errLogNotFound = errors.New("log not found")
- errMagicInvaild = errors.New("log magic invalid")
- logMagic = []byte{0xAC, 0xBE}
- _logType = []byte{0, 1}
- _logLength = []byte{0, 0, 0, 0}
- local, _ = time.LoadLocation("Local")
- )
- // Index records the current read position within a cache file.
- type Index struct {
- Name string `json:"name"`
- Offset int64 `json:"offset"`
- Updated string `json:"updated"`
- }
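- // The index is persisted as a single JSON object. Illustrative example (the
- // path and suffix here are hypothetical; real values are derived from Config):
- //   {"name":"/data/file-cache/1546300800.cache","offset":4096,"updated":"2019-01-01 08:00:00"}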
- type FileCache struct {
- c *Config
- next chan string
- storageFull bool
- writeChan chan *event.ProcessorEvent
- readChan chan *event.ProcessorEvent
- eLock sync.RWMutex
- logs map[string]os.FileInfo
- wh *fsnotify.Watcher
- }
- func NewFileCache(c *Config) (f *FileCache, err error) {
- if err = c.ConfigValidate(); err != nil {
- return nil, err
- }
- f = new(FileCache)
- f.c = c
- f.storageFull = false
- f.next = make(chan string, 1)
- f.writeChan = make(chan *event.ProcessorEvent)
- f.readChan = make(chan *event.ProcessorEvent)
- f.logs = make(map[string]os.FileInfo)
- if _, err := os.Stat(f.c.Storage); os.IsNotExist(err) {
- if err = os.MkdirAll(f.c.Storage, 0755); err != nil {
- return nil, err
- }
- }
- if err = f.nextFile(); err != nil {
- return nil, err
- }
- if err = f.watch(); err != nil {
- return nil, err
- }
- if err = f.loadFiles(); err != nil {
- return nil, err
- }
- go f.watchproc()
- go f.writeProcess()
- go f.readProcess()
- return f, nil
- }
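- // Minimal usage sketch (assuming a valid Config built elsewhere; cfg and ev
- // are placeholders, not values defined in this package):
- //
- //   fc, err := NewFileCache(cfg)
- //   if err != nil {
- //       // handle error
- //   }
- //   fc.WriteToCache(ev)      // producer: cache one event
- //   e := fc.ReadFromCache()  // consumer: blocks until a cached event is available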
- func (f *FileCache) WriteToCache(e *event.ProcessorEvent) {
- f.writeChan <- e
- }
- func (f *FileCache) ReadFromCache() (e *event.ProcessorEvent) {
- e = <-f.readChan
- return
- }
- // loadFiles loads the existing cache files from the storage directory.
- func (f *FileCache) loadFiles() (err error) {
- var (
- fi os.FileInfo
- fis []os.FileInfo
- )
- if fis, err = ioutil.ReadDir(f.c.Storage); err != nil {
- log.Error("ioutil.ReadDir(%s) error(%v)", f.c.Storage, err)
- return
- }
- for _, fi = range fis {
- name := path.Join(f.c.Storage, fi.Name())
- if !fi.IsDir() && strings.HasSuffix(name, f.c.Suffix) {
- f.eLock.Lock()
- f.logs[name] = fi
- f.eLock.Unlock()
- log.Info("loadFile: %s, size: %d", name, fi.Size())
- }
- }
- return
- }
- func (f *FileCache) writeProcess() {
- var (
- err error
- n, total int
- lengthbuf = make([]byte, 4)
- cur *os.File
- wr = bufio.NewWriterSize(nil, f.c.WriteBuffer)
- tk = time.Tick(time.Duration(f.c.CacheFlushInterval))
- timestamp = []byte(fmt.Sprintf("%d", time.Now().UnixNano()/1e6))
- )
- rand.Seed(time.Now().UnixNano())
- for {
- select {
- case next := <-f.next:
- if cur != nil && wr != nil {
- wr.Flush()
- cur.Close()
- }
- file, err := os.OpenFile(next, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
- if err != nil {
- log.Error("os.OpenFile(%s) error(%v)", next, err)
- continue
- }
- cur = file
- wr.Reset(file)
- total = 0
- case <-tk:
- if wr != nil && cur != nil {
- wr.Flush()
- }
- f.checkStorageSize()
- case e := <-f.writeChan:
- if f.storageFull {
- flowmonitor.Fm.AddEvent(e, "log-agent.output.lancer", "ERROR", "file cache storage full")
- event.PutEvent(e)
- continue
- }
- if total > f.c.FileBytes && len(f.next) == 0 {
- if err := f.nextFile(); err != nil {
- log.Error("c.nextFile() error(%v)", err)
- }
- }
- // refresh the lancer header timestamp (milliseconds) so each record carries its write time
- timestamp = []byte(fmt.Sprintf("%d", time.Now().UnixNano()/1e6))
- binary.BigEndian.PutUint32(lengthbuf, uint32(e.Length+_logLancerHeaderLen))
- // write logMagic
- if n, err = wr.Write(logMagic); err != nil {
- goto HERE
- }
- total += n
- // write length
- if n, err = wr.Write(lengthbuf); err != nil {
- goto HERE
- }
- total += n
- // write logId
- if n, err = wr.Write([]byte(e.LogId)); err != nil {
- goto HERE
- }
- total += n
- // write timestamp
- if n, err = wr.Write(timestamp); err != nil {
- goto HERE
- }
- total += n
- // write log body
- if n, err = wr.Write(e.Bytes()); err != nil {
- goto HERE
- }
- total += n
- flowmonitor.Fm.AddEvent(e, "log-agent.output.lancer", "OK", "write file cache ok")
- event.PutEvent(e)
- continue
- HERE: // write file cache error
- flowmonitor.Fm.AddEvent(e, "log-agent.output.lancer", "ERROR", "write file cache failed")
- event.PutEvent(e)
- log.Error("wr.Write() error(%v)", err)
- if cur != nil && wr != nil {
- wr.Flush()
- cur.Close()
- }
- name := f.nextFileName()
- file, err := os.OpenFile(name, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
- if err != nil {
- log.Error("os.OpenFile(%s) error(%v)", name, err)
- continue
- }
- cur = file
- wr.Reset(file)
- total = 0
- continue
- }
- }
- }
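- // On-disk record layout produced by writeProcess (sizes from the constants
- // above): 2-byte magic 0xAC 0xBE | 4-byte big-endian length | 6-byte logId |
- // 13-byte millisecond timestamp | log body. The length field covers
- // logId+timestamp+body, i.e. e.Length + _logLancerHeaderLen (19).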
- // index loads the persisted read index from disk.
- func (f *FileCache) index() (idx *Index, err error) {
- f.eLock.RLock()
- length := len(f.logs)
- f.eLock.RUnlock()
- if length == 0 {
- err = errLogNotFound
- return
- }
- i, err := os.OpenFile(f.c.Index, os.O_RDONLY, 0666)
- if err != nil {
- log.Error("os.OpenFile(%s) error(%v)", f.c.Index, err)
- return
- }
- defer i.Close()
- b, err := ioutil.ReadAll(i)
- if err != nil {
- log.Error("ioutil.ReadAll(%s) error(%v)", f.c.Index, err)
- return
- }
- idx = &Index{}
- if err = json.Unmarshal(b, idx); err != nil {
- log.Error("json.Unmarshal(%s) error(%v)", b, err)
- return
- }
- return
- }
- // nextReadFile returns the name of the oldest cache file,
- // sorted by file name.
- func (f *FileCache) nextReadFile() (name string) {
- var names []string
- f.eLock.RLock()
- for name = range f.logs {
- names = append(names, name)
- }
- f.eLock.RUnlock()
- if len(names) > 0 {
- sort.Strings(names)
- name = names[0]
- }
- return
- }
- // loadRemain resumes reading from the persisted index, falling back to the oldest cache file.
- func (f *FileCache) loadRemain() (i *Index, w *os.File, err error) {
- if i, err = f.index(); err != nil {
- next := f.nextReadFile()
- if next == "" {
- err = errLogNotFound
- return
- }
- i = &Index{
- Name: next,
- Updated: time.Now().Format(_formatUpdated),
- }
- }
- if w, err = f.openLog(i); err != nil {
- log.Warn("a.openLog(%v) error(%v)", i, err)
- return
- }
- return
- }
- // openLog opens the log file and seeks to the recorded offset.
- func (f *FileCache) openLog(idx *Index) (w *os.File, err error) {
- if w, err = os.OpenFile(idx.Name, os.O_RDONLY, 0666); err != nil {
- log.Error("os.OpenFile(%s) error(%v)", idx.Name, err)
- return
- }
- if _, err = w.Seek(idx.Offset, io.SeekStart); err != nil {
- log.Error("f.Seek(%d) error(%v)", idx.Offset, err)
- return
- }
- return
- }
- // watch starts watching the storage directory for file changes.
- func (f *FileCache) watch() (err error) {
- if f.wh, err = fsnotify.NewWatcher(); err != nil {
- log.Error("fsnotify.NewWatcher() error(%v)", err)
- return
- }
- if err = f.wh.Add(f.c.Storage); err != nil {
- log.Error("wh.Watch(%s) error(%v)", err)
- }
- return
- }
- // watchproc observes file changes in the storage directory.
- func (f *FileCache) watchproc() {
- var evt fsnotify.Event
- for {
- evt = <-f.wh.Events
- if evt.Op&fsnotify.Create == fsnotify.Create {
- if !strings.HasSuffix(evt.Name, f.c.Suffix) {
- log.Warn("create invalid file: %s", evt.Name)
- continue
- }
- fi, err := os.Stat(evt.Name)
- if err != nil {
- log.Error("os.Stat(%s) error(%v)", evt.Name, err)
- continue
- }
- f.eLock.Lock()
- f.logs[evt.Name] = fi
- f.eLock.Unlock()
- log.Info("create file: %s", evt.Name)
- }
- if evt.Op&fsnotify.Remove == fsnotify.Remove {
- f.eLock.Lock()
- delete(f.logs, evt.Name)
- f.eLock.Unlock()
- log.Info("remove file: %s", evt.Name)
- }
- }
- }
- // setIndex persists the read index to disk.
- func (f *FileCache) setIndex(idx *Index) (err error) {
- w, err := os.OpenFile(f.c.Index, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
- if err != nil {
- log.Error("os.OpenFile(%s) error(%v)", f.c.Index, err)
- return
- }
- defer w.Close()
- b, err := json.Marshal(idx)
- if err != nil {
- log.Error("json.Marshal(%v)", idx)
- return
- }
- if _, err = w.Write(b); err != nil {
- log.Error("f.Write(%s) error(%v)", b, err)
- }
- return
- }
- // tailLog checks the log format and reads one log record from the reader.
- func (f *FileCache) tailLog(rr *bufio.Reader) (b []byte, err error) {
- var (
- t []byte
- )
- // peek magic
- for {
- if b, err = rr.Peek(_logMagicSize); err != nil {
- return
- }
- if bytes.Equal(b, logMagic) {
- break
- }
- rr.Discard(1)
- }
- // peek length
- if t, err = rr.Peek(_logHeadSize); err != nil {
- if err != io.EOF {
- log.Error("rr.Peek(len:%d) error(%v)", _logLenSize, err)
- }
- return
- }
- // peek body
- l := int(binary.BigEndian.Uint32(t[_logMagicSize:_logHeadSize]))
- if t, err = rr.Peek(_logHeadSize + l); err != nil {
- if err != io.EOF {
- log.Error("rr.Peek(%d) error(%v)", l, err)
- }
- return
- }
- b = t[_logHeadSize:]
- rr.Discard(l + _logHeadSize)
- return
- }
- // readProcess reads records from the cache files and feeds decoded events to readChan.
- func (f *FileCache) readProcess() {
- var (
- err error
- idx *Index
- rr = bufio.NewReaderSize(nil, f.c.ReadBuffer)
- lastTime int64
- length int
- cur *os.File
- )
- if idx, cur, err = f.loadRemain(); err == nil {
- rr.Reset(cur)
- }
- for {
- if time.Now().Unix()-lastTime > 5 {
- if idx != nil {
- f.setIndex(idx)
- }
- lastTime = time.Now().Unix()
- }
- f.eLock.RLock()
- length = len(f.logs)
- f.eLock.RUnlock()
- // check whether any cache file is available
- if length == 0 {
- if cur != nil {
- cur.Close()
- cur = nil
- }
- time.Sleep(time.Second * 1)
- continue
- }
- // open the oldest cache file among the observed logs
- if cur == nil {
- next := f.nextReadFile()
- idx = &Index{
- Name: next,
- Updated: time.Now().Format(_formatUpdated),
- }
- if cur, err = f.openLog(idx); err != nil {
- log.Error("a.openLog(%v) error(%v)", idx, err)
- continue
- }
- rr.Reset(cur)
- f.setIndex(idx)
- }
- // tail one log record from the buffer
- b, err := f.tailLog(rr)
- if err != nil {
- if err == io.EOF {
- if length > 1 {
- cur.Close()
- cur = nil
- os.Remove(idx.Name)
- f.eLock.Lock()
- delete(f.logs, idx.Name)
- f.eLock.Unlock()
- } else {
- time.Sleep(time.Second * 1)
- }
- continue
- }
- log.Error("read log error(%v)", err)
- rr.Discard(1)
- continue
- }
- idx.Offset += int64(len(b)) + _logHeadSize
- if len(b) <= _logLancerHeaderLen {
- continue
- }
- e := event.GetEvent()
- e.Write(b[_logLancerHeaderLen:])
- e.LogId = string(b[:_logIdSize])
- f.readChan <- e
- }
- }
- // checkStorageSize checks whether the cache directory exceeds the configured size limit.
- func (f *FileCache) checkStorageSize() {
- var size int64
- if entries, err := ioutil.ReadDir(f.c.Storage); err == nil {
- for _, entry := range entries {
- if !entry.IsDir() {
- size += entry.Size()
- }
- }
- }
- if size > int64(f.c.StorageMaxMB*1024*1024) {
- log.Error("storage is full, discard log")
- flowmonitor.Fm.Add("log-agent", "log-agent.output.file-cache", strconv.FormatInt(time.Now().Unix()/100*100, 10), "ERROR", "storage full")
- f.storageFull = true
- } else {
- f.storageFull = false
- }
- }
- func (f *FileCache) nextFileName() string {
- return path.Join(f.c.Storage, strconv.FormatInt(time.Now().Unix(), 10)+f.c.Suffix)
- }
- // nextFile queues the name of the next cache file for the write goroutine.
- func (f *FileCache) nextFile() (err error) {
- f.next <- f.nextFileName()
- return
- }