filewriter.go 7.7 KB


  1. package filewriter
  2. import (
  3. "bytes"
  4. "container/list"
  5. "fmt"
  6. "io/ioutil"
  7. "log"
  8. "os"
  9. "path/filepath"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. )
  17. // FileWriter create file log writer
  18. type FileWriter struct {
  19. opt option
  20. dir string
  21. fname string
  22. ch chan *bytes.Buffer
  23. stdlog *log.Logger
  24. pool *sync.Pool
  25. lastRotateFormat string
  26. lastSplitNum int
  27. current *wrapFile
  28. files *list.List
  29. closed int32
  30. wg sync.WaitGroup
  31. }
  32. type rotateItem struct {
  33. rotateTime int64
  34. rotateNum int
  35. fname string
  36. }
  37. func parseRotateItem(dir, fname, rotateFormat string) (*list.List, error) {
  38. fis, err := ioutil.ReadDir(dir)
  39. if err != nil {
  40. return nil, err
  41. }
  42. // parse exists log file filename
  43. parse := func(s string) (rt rotateItem, err error) {
  44. // remove filename and left "." error.log.2018-09-12.001 -> 2018-09-12.001
  45. rt.fname = s
  46. s = strings.TrimLeft(s[len(fname):], ".")
  47. seqs := strings.Split(s, ".")
  48. var t time.Time
  49. switch len(seqs) {
  50. case 2:
  51. if rt.rotateNum, err = strconv.Atoi(seqs[1]); err != nil {
  52. return
  53. }
  54. fallthrough
  55. case 1:
  56. if t, err = time.Parse(rotateFormat, seqs[0]); err != nil {
  57. return
  58. }
  59. rt.rotateTime = t.Unix()
  60. }
  61. return
  62. }
  63. var items []rotateItem
  64. for _, fi := range fis {
  65. if strings.HasPrefix(fi.Name(), fname) && fi.Name() != fname {
  66. rt, err := parse(fi.Name())
  67. if err != nil {
  68. // TODO deal with error
  69. continue
  70. }
  71. items = append(items, rt)
  72. }
  73. }
  74. sort.Slice(items, func(i, j int) bool {
  75. if items[i].rotateTime == items[j].rotateTime {
  76. return items[i].rotateNum > items[j].rotateNum
  77. }
  78. return items[i].rotateTime > items[j].rotateTime
  79. })
  80. l := list.New()
  81. for _, item := range items {
  82. l.PushBack(item)
  83. }
  84. return l, nil
  85. }
  86. type wrapFile struct {
  87. fsize int64
  88. fp *os.File
  89. }
  90. func (w *wrapFile) size() int64 {
  91. return w.fsize
  92. }
  93. func (w *wrapFile) write(p []byte) (n int, err error) {
  94. n, err = w.fp.Write(p)
  95. w.fsize += int64(n)
  96. return
  97. }
  98. func newWrapFile(fpath string) (*wrapFile, error) {
  99. fp, err := os.OpenFile(fpath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  100. if err != nil {
  101. return nil, err
  102. }
  103. fi, err := fp.Stat()
  104. if err != nil {
  105. return nil, err
  106. }
  107. return &wrapFile{fp: fp, fsize: fi.Size()}, nil
  108. }
  109. // New FileWriter A FileWriter is safe for use by multiple goroutines simultaneously.
  110. func New(fpath string, fns ...Option) (*FileWriter, error) {
  111. opt := defaultOption
  112. for _, fn := range fns {
  113. fn(&opt)
  114. }
  115. fname := filepath.Base(fpath)
  116. if fname == "" {
  117. return nil, fmt.Errorf("filename can't empty")
  118. }
  119. dir := filepath.Dir(fpath)
  120. fi, err := os.Stat(dir)
  121. if err == nil && !fi.IsDir() {
  122. return nil, fmt.Errorf("%s already exists and not a directory", dir)
  123. }
  124. if os.IsNotExist(err) {
  125. if err = os.MkdirAll(dir, 0755); err != nil {
  126. return nil, fmt.Errorf("create dir %s error: %s", dir, err.Error())
  127. }
  128. }
  129. current, err := newWrapFile(fpath)
  130. if err != nil {
  131. return nil, err
  132. }
  133. stdlog := log.New(os.Stderr, "flog ", log.LstdFlags)
  134. ch := make(chan *bytes.Buffer, opt.ChanSize)
  135. files, err := parseRotateItem(dir, fname, opt.RotateFormat)
  136. if err != nil {
  137. // set files a empty list
  138. files = list.New()
  139. stdlog.Printf("parseRotateItem error: %s", err)
  140. }
  141. lastRotateFormat := time.Now().Format(opt.RotateFormat)
  142. var lastSplitNum int
  143. if files.Len() > 0 {
  144. rt := files.Front().Value.(rotateItem)
  145. // check contains is mush esay than compared with timestamp
  146. if strings.Contains(rt.fname, lastRotateFormat) {
  147. lastSplitNum = rt.rotateNum
  148. }
  149. }
  150. fw := &FileWriter{
  151. opt: opt,
  152. dir: dir,
  153. fname: fname,
  154. stdlog: stdlog,
  155. ch: ch,
  156. pool: &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }},
  157. lastSplitNum: lastSplitNum,
  158. lastRotateFormat: lastRotateFormat,
  159. files: files,
  160. current: current,
  161. }
  162. fw.wg.Add(1)
  163. go fw.daemon()
  164. return fw, nil
  165. }
  166. // Write write data to log file, return write bytes is pseudo just for implement io.Writer.
  167. func (f *FileWriter) Write(p []byte) (int, error) {
  168. // atomic is not necessary
  169. if atomic.LoadInt32(&f.closed) == 1 {
  170. f.stdlog.Printf("%s", p)
  171. return 0, fmt.Errorf("filewriter already closed")
  172. }
  173. // because write to file is asynchronousc,
  174. // copy p to internal buf prevent p be change on outside
  175. buf := f.getBuf()
  176. buf.Write(p)
  177. if f.opt.WriteTimeout == 0 {
  178. select {
  179. case f.ch <- buf:
  180. return len(p), nil
  181. default:
  182. // TODO: write discard log to to stdout?
  183. return 0, fmt.Errorf("log channel is full, discard log")
  184. }
  185. }
  186. // write log with timeout
  187. timeout := time.NewTimer(f.opt.WriteTimeout)
  188. select {
  189. case f.ch <- buf:
  190. return len(p), nil
  191. case <-timeout.C:
  192. // TODO: write discard log to to stdout?
  193. return 0, fmt.Errorf("log channel is full, discard log")
  194. }
  195. }
  196. func (f *FileWriter) daemon() {
  197. // TODO: check aggsbuf size prevent it too big
  198. aggsbuf := &bytes.Buffer{}
  199. tk := time.NewTicker(f.opt.RotateInterval)
  200. // TODO: make it configrable
  201. aggstk := time.NewTicker(10 * time.Millisecond)
  202. var err error
  203. for {
  204. select {
  205. case t := <-tk.C:
  206. f.checkRotate(t)
  207. case buf, ok := <-f.ch:
  208. if ok {
  209. aggsbuf.Write(buf.Bytes())
  210. f.putBuf(buf)
  211. }
  212. case <-aggstk.C:
  213. if aggsbuf.Len() > 0 {
  214. if err = f.write(aggsbuf.Bytes()); err != nil {
  215. f.stdlog.Printf("write log error: %s", err)
  216. }
  217. aggsbuf.Reset()
  218. }
  219. }
  220. if atomic.LoadInt32(&f.closed) != 1 {
  221. continue
  222. }
  223. // read all buf from channel and break loop
  224. if err = f.write(aggsbuf.Bytes()); err != nil {
  225. f.stdlog.Printf("write log error: %s", err)
  226. }
  227. for buf := range f.ch {
  228. if err = f.write(buf.Bytes()); err != nil {
  229. f.stdlog.Printf("write log error: %s", err)
  230. }
  231. f.putBuf(buf)
  232. }
  233. break
  234. }
  235. f.wg.Done()
  236. }
  237. // Close close file writer
  238. func (f *FileWriter) Close() error {
  239. atomic.StoreInt32(&f.closed, 1)
  240. close(f.ch)
  241. f.wg.Wait()
  242. return nil
  243. }
  244. func (f *FileWriter) checkRotate(t time.Time) {
  245. formatFname := func(format string, num int) string {
  246. if num == 0 {
  247. return fmt.Sprintf("%s.%s", f.fname, format)
  248. }
  249. return fmt.Sprintf("%s.%s.%03d", f.fname, format, num)
  250. }
  251. format := t.Format(f.opt.RotateFormat)
  252. if f.opt.MaxFile != 0 {
  253. for f.files.Len() > f.opt.MaxFile {
  254. rt := f.files.Remove(f.files.Front()).(rotateItem)
  255. fpath := filepath.Join(f.dir, rt.fname)
  256. if err := os.Remove(fpath); err != nil {
  257. f.stdlog.Printf("remove file %s error: %s", fpath, err)
  258. }
  259. }
  260. }
  261. if format != f.lastRotateFormat || (f.opt.MaxSize != 0 && f.current.size() > f.opt.MaxSize) {
  262. var err error
  263. // close current file first
  264. if err = f.current.fp.Close(); err != nil {
  265. f.stdlog.Printf("close current file error: %s", err)
  266. }
  267. // rename file
  268. fname := formatFname(f.lastRotateFormat, f.lastSplitNum)
  269. oldpath := filepath.Join(f.dir, f.fname)
  270. newpath := filepath.Join(f.dir, fname)
  271. if err = os.Rename(oldpath, newpath); err != nil {
  272. f.stdlog.Printf("rename file %s to %s error: %s", oldpath, newpath, err)
  273. return
  274. }
  275. f.files.PushBack(rotateItem{fname: fname /*rotateNum: f.lastSplitNum, rotateTime: t.Unix() unnecessary*/})
  276. if format != f.lastRotateFormat {
  277. f.lastRotateFormat = format
  278. f.lastSplitNum = 0
  279. } else {
  280. f.lastSplitNum++
  281. }
  282. // recreate current file
  283. f.current, err = newWrapFile(filepath.Join(f.dir, f.fname))
  284. if err != nil {
  285. f.stdlog.Printf("create log file error: %s", err)
  286. }
  287. }
  288. }
  289. func (f *FileWriter) write(p []byte) error {
  290. // f.current may be nil, if newWrapFile return err in checkRotate, redirect log to stderr
  291. if f.current == nil {
  292. f.stdlog.Printf("can't write log to file, please check stderr log for detail")
  293. f.stdlog.Printf("%s", p)
  294. }
  295. _, err := f.current.write(p)
  296. return err
  297. }
  298. func (f *FileWriter) putBuf(buf *bytes.Buffer) {
  299. buf.Reset()
  300. f.pool.Put(buf)
  301. }
  302. func (f *FileWriter) getBuf() *bytes.Buffer {
  303. return f.pool.Get().(*bytes.Buffer)
  304. }