file.go 11 KB


  1. package file
  2. import (
  3. "os"
  4. "bufio"
  5. "time"
  6. "io/ioutil"
  7. "math/rand"
  8. "path"
  9. "strconv"
  10. "sync"
  11. "fmt"
  12. "errors"
  13. "encoding/binary"
  14. "encoding/json"
  15. "sort"
  16. "bytes"
  17. "io"
  18. "strings"
  19. "go-common/app/service/ops/log-agent/event"
  20. "go-common/library/log"
  21. "go-common/app/service/ops/log-agent/pkg/flowmonitor"
  22. "github.com/fsnotify/fsnotify"
  23. )
  24. const (
  25. _formatUpdated = "2006-01-02 15:04:05"
  26. _logMagicSize = 2
  27. _logHeadSize = 6
  28. _logLenSize = 4
  29. _logIdSize = 6
  30. _logLancerHeaderLen = 19
  31. )
  32. var (
  33. errLogNotFound = errors.New("log not found")
  34. errMagicInvaild = errors.New("log magic invalid")
  35. logMagic = []byte{0xAC, 0xBE}
  36. _logType = []byte{0, 1}
  37. _logLength = []byte{0, 0, 0, 0}
  38. local, _ = time.LoadLocation("Local")
  39. )
  40. // Index index.
  41. type Index struct {
  42. Name string `json:"name"`
  43. Offset int64 `json:"offset"`
  44. Updated string `json:"updated"`
  45. }
  46. type FileCache struct {
  47. c *Config
  48. next chan string
  49. storageFull bool
  50. writeChan chan *event.ProcessorEvent
  51. readChan chan *event.ProcessorEvent
  52. eLock sync.RWMutex
  53. logs map[string]os.FileInfo
  54. wh *fsnotify.Watcher
  55. }
  56. func NewFileCache(c *Config) (f *FileCache, err error) {
  57. if err = c.ConfigValidate(); err != nil {
  58. return nil, err
  59. }
  60. f = new(FileCache)
  61. f.c = c
  62. f.storageFull = false
  63. f.next = make(chan string, 1)
  64. f.writeChan = make(chan *event.ProcessorEvent)
  65. f.readChan = make(chan *event.ProcessorEvent)
  66. f.logs = make(map[string]os.FileInfo)
  67. if _, err := os.Stat(f.c.Storage); os.IsNotExist(err) {
  68. if err = os.MkdirAll(f.c.Storage, 0755); err != nil {
  69. return nil, err
  70. }
  71. }
  72. if err = f.nextFile(); err != nil {
  73. return nil, err
  74. }
  75. if err = f.watch(); err != nil {
  76. return
  77. }
  78. if err = f.loadFiles(); err != nil {
  79. return
  80. }
  81. go f.watchproc()
  82. go f.writeProcess()
  83. go f.readProcess()
  84. return f, nil
  85. }
  86. func (f *FileCache) WriteToCache(e *event.ProcessorEvent) {
  87. f.writeChan <- e
  88. }
  89. func (f *FileCache) ReadFromCache() (e *event.ProcessorEvent) {
  90. e = <-f.readChan
  91. return
  92. }
  93. // loadFiles loadFiles
  94. func (f *FileCache) loadFiles() (err error) {
  95. var (
  96. fi os.FileInfo
  97. fis []os.FileInfo
  98. )
  99. if fis, err = ioutil.ReadDir(f.c.Storage); err != nil {
  100. log.Error("ioutil.ReadDir(%s) error(%v)", f.c.Storage, err)
  101. return
  102. }
  103. for _, fi = range fis {
  104. name := path.Join(f.c.Storage, fi.Name())
  105. if !fi.IsDir() && strings.HasSuffix(name, f.c.Suffix) {
  106. f.eLock.Lock()
  107. f.logs[name] = fi
  108. f.eLock.Unlock()
  109. log.Info("loadFile: %s, size: %d", name, fi.Size())
  110. }
  111. }
  112. return
  113. }
  114. func (f *FileCache) writeProcess() {
  115. var (
  116. err error
  117. n, total int
  118. lengthbuf = make([]byte, 4)
  119. cur *os.File
  120. wr = bufio.NewWriterSize(nil, f.c.WriteBuffer)
  121. tk = time.Tick(time.Duration(f.c.CacheFlushInterval))
  122. timestamp = []byte(fmt.Sprintf("%d", time.Now().UnixNano()/1e6))
  123. )
  124. rand.Seed(time.Now().UnixNano())
  125. for {
  126. select {
  127. case next := <-f.next:
  128. if cur != nil && wr != nil {
  129. wr.Flush()
  130. cur.Close()
  131. }
  132. f, err := os.OpenFile(next, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
  133. if err != nil {
  134. log.Error("os.OpenFile(%s) error(%v)", next, err)
  135. continue
  136. }
  137. cur = f
  138. wr.Reset(f)
  139. total = 0
  140. case <-tk:
  141. if wr != nil && cur != nil {
  142. wr.Flush()
  143. }
  144. f.checkStorageSize()
  145. case e := <-f.writeChan:
  146. if f.storageFull {
  147. flowmonitor.Fm.AddEvent(e, "log-agent.output.lancer", "ERROR", "file cache storgefull")
  148. event.PutEvent(e)
  149. continue
  150. }
  151. if total > f.c.FileBytes && len(f.next) == 0 {
  152. if err := f.nextFile(); err != nil {
  153. log.Error("c.nextFile() error(%v)", err)
  154. }
  155. }
  156. binary.BigEndian.PutUint32(lengthbuf, uint32(e.Length+_logLancerHeaderLen))
  157. // write logMagic
  158. if n, err = wr.Write(logMagic); err != nil {
  159. goto HERE
  160. }
  161. total += n
  162. // write length
  163. if n, err = wr.Write(lengthbuf); err != nil {
  164. goto HERE
  165. }
  166. total += n
  167. // write log
  168. if n, err = wr.Write([]byte(e.LogId)); err != nil {
  169. goto HERE
  170. }
  171. if n, err = wr.Write(timestamp); err != nil {
  172. goto HERE
  173. }
  174. if n, err = wr.Write(e.Bytes()); err != nil {
  175. goto HERE
  176. }
  177. total += n
  178. flowmonitor.Fm.AddEvent(e, "log-agent.output.lancer", "OK", "write file cache ok")
  179. event.PutEvent(e)
  180. continue
  181. HERE: // write file cache error
  182. flowmonitor.Fm.AddEvent(e, "log-agent.output.lancer", "ERROR", "write file cache failed")
  183. event.PutEvent(e)
  184. log.Error("wr.Write() error(%v)", err)
  185. if cur != nil && wr != nil {
  186. wr.Flush()
  187. cur.Close()
  188. }
  189. name := f.nextFileName()
  190. f, err := os.OpenFile(name, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
  191. if err != nil {
  192. log.Error("os.OpenFile(%s) error(%v)", name, err)
  193. continue
  194. }
  195. cur = f
  196. wr.Reset(f)
  197. total = 0
  198. continue
  199. }
  200. }
  201. }
  202. // index index
  203. func (f *FileCache) index() (idx *Index, err error) {
  204. f.eLock.RLock()
  205. length := len(f.logs)
  206. f.eLock.RUnlock()
  207. if length == 0 {
  208. err = errLogNotFound
  209. return
  210. }
  211. i, err := os.OpenFile(f.c.Index, os.O_RDONLY, 0666)
  212. if err != nil {
  213. log.Error("os.OpenFile(%s) error(%v)", f.c.Index, err)
  214. return
  215. }
  216. defer i.Close()
  217. b, err := ioutil.ReadAll(i)
  218. if err != nil {
  219. log.Error("ioutil.ReadAll(%s) error(%v)", f.c.Index, err)
  220. return
  221. }
  222. idx = &Index{}
  223. if err = json.Unmarshal(b, idx); err != nil {
  224. log.Error("json.Unmarshal(%s) error(%v)", b, err)
  225. return
  226. }
  227. return
  228. }
  229. // nextFile return first filename.
  230. // sorted by name.
  231. func (f *FileCache) nextReadFile() (name string) {
  232. var names []string
  233. f.eLock.RLock()
  234. for name = range f.logs {
  235. names = append(names, name)
  236. }
  237. f.eLock.RUnlock()
  238. if len(names) > 0 {
  239. sort.Strings(names)
  240. name = names[0]
  241. }
  242. return
  243. }
  244. // loadRemain loadRemain
  245. func (f *FileCache) loadRemain() (i *Index, w *os.File, err error) {
  246. if i, err = f.index(); err != nil {
  247. next := f.nextReadFile()
  248. if next == "" {
  249. err = errLogNotFound
  250. return
  251. }
  252. i = &Index{
  253. Name: next,
  254. Updated: time.Now().Format(_formatUpdated),
  255. }
  256. }
  257. if w, err = f.openLog(i); err != nil {
  258. log.Warn("a.openLog(%v) error(%v)", i, err)
  259. return
  260. }
  261. return
  262. }
  263. // openLog open the log file
  264. func (f *FileCache) openLog(idx *Index) (w *os.File, err error) {
  265. if w, err = os.OpenFile(idx.Name, os.O_RDONLY, 0666); err != nil {
  266. log.Error("os.OpenFile(%s) error(%v)", idx.Name, err)
  267. return
  268. }
  269. if _, err = w.Seek(idx.Offset, os.SEEK_SET); err != nil {
  270. log.Error("f.Seek(%d) error(%v)", idx.Offset, err)
  271. return
  272. }
  273. return
  274. }
  275. // watch watch
  276. func (f *FileCache) watch() (err error) {
  277. if f.wh, err = fsnotify.NewWatcher(); err != nil {
  278. log.Error("fsnotify.NewWatcher() error(%v)", err)
  279. return
  280. }
  281. if err = f.wh.Add(f.c.Storage); err != nil {
  282. log.Error("wh.Watch(%s) error(%v)", err)
  283. }
  284. return
  285. }
  286. // watchproc observe the directory file changes
  287. func (f *FileCache) watchproc() {
  288. var evt fsnotify.Event
  289. for {
  290. evt = <-f.wh.Events
  291. if evt.Op&fsnotify.Create == fsnotify.Create {
  292. if !strings.HasSuffix(evt.Name, f.c.Suffix) {
  293. log.Warn("create invalid file: %s", evt.Name)
  294. continue
  295. }
  296. fi, err := os.Stat(evt.Name)
  297. if err != nil {
  298. log.Error("os.Stat(%s) error(%v)", evt.Name, err)
  299. continue
  300. }
  301. f.eLock.Lock()
  302. f.logs[evt.Name] = fi
  303. f.eLock.Unlock()
  304. log.Info("create file: %s", evt.Name)
  305. }
  306. if evt.Op&fsnotify.Remove == fsnotify.Remove {
  307. f.eLock.Lock()
  308. delete(f.logs, evt.Name)
  309. f.eLock.Unlock()
  310. log.Info("remove file: %s", evt.Name)
  311. }
  312. }
  313. }
  314. // setIndex setIndex
  315. func (f *FileCache) setIndex(idx *Index) (err error) {
  316. w, err := os.OpenFile(f.c.Index, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
  317. if err != nil {
  318. log.Error("os.OpenFile(%s) error(%v)", f.c.Index, err)
  319. return
  320. }
  321. defer w.Close()
  322. b, err := json.Marshal(idx)
  323. if err != nil {
  324. log.Error("json.Marshal(%v)", idx)
  325. return
  326. }
  327. if _, err = w.Write(b); err != nil {
  328. log.Error("f.Write(%s) error(%v)", b, err)
  329. }
  330. return
  331. }
  332. // tailLog check the log format and get log from reader
  333. func (f *FileCache) tailLog(rr *bufio.Reader) (b []byte, err error) {
  334. var (
  335. t []byte
  336. )
  337. // peek magic
  338. for {
  339. if b, err = rr.Peek(_logMagicSize); err != nil {
  340. return
  341. }
  342. if bytes.Equal(b, logMagic) {
  343. break
  344. }
  345. rr.Discard(1)
  346. }
  347. // peek length
  348. if t, err = rr.Peek(_logHeadSize); err != nil {
  349. if err != io.EOF {
  350. log.Error("rr.Peek(len:%d) error(%v)", _logLenSize, err)
  351. }
  352. return
  353. }
  354. // peek body
  355. l := int(binary.BigEndian.Uint32(t[_logMagicSize:_logHeadSize]))
  356. if t, err = rr.Peek(_logHeadSize + l); err != nil {
  357. if err != io.EOF {
  358. log.Error("rr.Peek(%d) error(%v)", l, err)
  359. }
  360. return
  361. }
  362. b = t[_logHeadSize:]
  363. rr.Discard(l + _logHeadSize)
  364. return
  365. }
  366. // readproc read data and encapsulation protocol from file
  367. func (f *FileCache) readProcess() {
  368. var (
  369. err error
  370. idx *Index
  371. rr = bufio.NewReaderSize(nil, f.c.ReadBuffer)
  372. lastTime int64
  373. length int
  374. cur *os.File
  375. )
  376. if idx, cur, err = f.loadRemain(); err == nil {
  377. rr.Reset(cur)
  378. }
  379. for {
  380. if time.Now().Unix()-lastTime > 5 {
  381. if idx != nil {
  382. f.setIndex(idx)
  383. }
  384. lastTime = time.Now().Unix()
  385. }
  386. f.eLock.RLock()
  387. length = len(f.logs)
  388. f.eLock.RUnlock()
  389. // check is available for observing file
  390. if length == 0 {
  391. if cur != nil {
  392. cur.Close()
  393. cur = nil
  394. }
  395. time.Sleep(time.Second * 1)
  396. continue
  397. }
  398. // read first file from observing logs
  399. if cur == nil {
  400. next := f.nextReadFile()
  401. idx = &Index{
  402. Name: next,
  403. Updated: time.Now().Format(_formatUpdated),
  404. }
  405. if cur, err = f.openLog(idx); err != nil {
  406. log.Error("a.openLog(%v) error(%v)", idx, err)
  407. continue
  408. }
  409. rr.Reset(cur)
  410. f.setIndex(idx)
  411. }
  412. // tail a log from thos.OpenFilee buffer
  413. b, err := f.tailLog(rr)
  414. if err != nil {
  415. if err == io.EOF {
  416. if length > 1 {
  417. cur.Close()
  418. cur = nil
  419. os.Remove(idx.Name)
  420. f.eLock.Lock()
  421. delete(f.logs, idx.Name)
  422. f.eLock.Unlock()
  423. } else {
  424. time.Sleep(time.Second * 1)
  425. }
  426. continue
  427. }
  428. log.Error("read log error(%v)", err)
  429. rr.Discard(1)
  430. continue
  431. }
  432. idx.Offset += int64(len(b)) + _logHeadSize
  433. if len(b) <= _logLancerHeaderLen {
  434. continue
  435. }
  436. e := event.GetEvent()
  437. e.Write(b[_logLancerHeaderLen:])
  438. e.LogId = string(b[:_logIdSize])
  439. f.readChan <- e
  440. }
  441. }
  442. // check storage size
  443. func (f *FileCache) checkStorageSize() {
  444. var size int64
  445. if entries, err := ioutil.ReadDir(f.c.Storage); err == nil {
  446. for _, entry := range entries {
  447. if !entry.IsDir() {
  448. size += entry.Size()
  449. }
  450. }
  451. }
  452. if size > int64(f.c.StorageMaxMB*1024*1024) {
  453. log.Error("storage is full, discard log")
  454. flowmonitor.Fm.Add("log-agent", "log-agent.output.file-cache", strconv.FormatInt(time.Now().Unix()/100*100, 10), "ERROR", "storage full")
  455. f.storageFull = true
  456. } else {
  457. f.storageFull = false
  458. }
  459. }
  460. func (f *FileCache) nextFileName() string {
  461. return path.Join(f.c.Storage, strconv.FormatInt(time.Now().Unix(), 10)+f.c.Suffix)
  462. }
  463. // nextFile set first log filename.
  464. // sorted by name.
  465. func (f *FileCache) nextFile() (err error) {
  466. f.next <- f.nextFileName()
  467. return
  468. }