123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344 |
- package filewriter
- import (
- "bytes"
- "container/list"
- "fmt"
- "io/ioutil"
- "log"
- "os"
- "path/filepath"
- "sort"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- )
- // FileWriter create file log writer
- type FileWriter struct {
- opt option
- dir string
- fname string
- ch chan *bytes.Buffer
- stdlog *log.Logger
- pool *sync.Pool
- lastRotateFormat string
- lastSplitNum int
- current *wrapFile
- files *list.List
- closed int32
- wg sync.WaitGroup
- }
- type rotateItem struct {
- rotateTime int64
- rotateNum int
- fname string
- }
- func parseRotateItem(dir, fname, rotateFormat string) (*list.List, error) {
- fis, err := ioutil.ReadDir(dir)
- if err != nil {
- return nil, err
- }
- // parse exists log file filename
- parse := func(s string) (rt rotateItem, err error) {
- // remove filename and left "." error.log.2018-09-12.001 -> 2018-09-12.001
- rt.fname = s
- s = strings.TrimLeft(s[len(fname):], ".")
- seqs := strings.Split(s, ".")
- var t time.Time
- switch len(seqs) {
- case 2:
- if rt.rotateNum, err = strconv.Atoi(seqs[1]); err != nil {
- return
- }
- fallthrough
- case 1:
- if t, err = time.Parse(rotateFormat, seqs[0]); err != nil {
- return
- }
- rt.rotateTime = t.Unix()
- }
- return
- }
- var items []rotateItem
- for _, fi := range fis {
- if strings.HasPrefix(fi.Name(), fname) && fi.Name() != fname {
- rt, err := parse(fi.Name())
- if err != nil {
- // TODO deal with error
- continue
- }
- items = append(items, rt)
- }
- }
- sort.Slice(items, func(i, j int) bool {
- if items[i].rotateTime == items[j].rotateTime {
- return items[i].rotateNum > items[j].rotateNum
- }
- return items[i].rotateTime > items[j].rotateTime
- })
- l := list.New()
- for _, item := range items {
- l.PushBack(item)
- }
- return l, nil
- }
- type wrapFile struct {
- fsize int64
- fp *os.File
- }
- func (w *wrapFile) size() int64 {
- return w.fsize
- }
- func (w *wrapFile) write(p []byte) (n int, err error) {
- n, err = w.fp.Write(p)
- w.fsize += int64(n)
- return
- }
- func newWrapFile(fpath string) (*wrapFile, error) {
- fp, err := os.OpenFile(fpath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
- if err != nil {
- return nil, err
- }
- fi, err := fp.Stat()
- if err != nil {
- return nil, err
- }
- return &wrapFile{fp: fp, fsize: fi.Size()}, nil
- }
- // New FileWriter A FileWriter is safe for use by multiple goroutines simultaneously.
- func New(fpath string, fns ...Option) (*FileWriter, error) {
- opt := defaultOption
- for _, fn := range fns {
- fn(&opt)
- }
- fname := filepath.Base(fpath)
- if fname == "" {
- return nil, fmt.Errorf("filename can't empty")
- }
- dir := filepath.Dir(fpath)
- fi, err := os.Stat(dir)
- if err == nil && !fi.IsDir() {
- return nil, fmt.Errorf("%s already exists and not a directory", dir)
- }
- if os.IsNotExist(err) {
- if err = os.MkdirAll(dir, 0755); err != nil {
- return nil, fmt.Errorf("create dir %s error: %s", dir, err.Error())
- }
- }
- current, err := newWrapFile(fpath)
- if err != nil {
- return nil, err
- }
- stdlog := log.New(os.Stderr, "flog ", log.LstdFlags)
- ch := make(chan *bytes.Buffer, opt.ChanSize)
- files, err := parseRotateItem(dir, fname, opt.RotateFormat)
- if err != nil {
- // set files a empty list
- files = list.New()
- stdlog.Printf("parseRotateItem error: %s", err)
- }
- lastRotateFormat := time.Now().Format(opt.RotateFormat)
- var lastSplitNum int
- if files.Len() > 0 {
- rt := files.Front().Value.(rotateItem)
- // check contains is mush esay than compared with timestamp
- if strings.Contains(rt.fname, lastRotateFormat) {
- lastSplitNum = rt.rotateNum
- }
- }
- fw := &FileWriter{
- opt: opt,
- dir: dir,
- fname: fname,
- stdlog: stdlog,
- ch: ch,
- pool: &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }},
- lastSplitNum: lastSplitNum,
- lastRotateFormat: lastRotateFormat,
- files: files,
- current: current,
- }
- fw.wg.Add(1)
- go fw.daemon()
- return fw, nil
- }
- // Write write data to log file, return write bytes is pseudo just for implement io.Writer.
- func (f *FileWriter) Write(p []byte) (int, error) {
- // atomic is not necessary
- if atomic.LoadInt32(&f.closed) == 1 {
- f.stdlog.Printf("%s", p)
- return 0, fmt.Errorf("filewriter already closed")
- }
- // because write to file is asynchronousc,
- // copy p to internal buf prevent p be change on outside
- buf := f.getBuf()
- buf.Write(p)
- if f.opt.WriteTimeout == 0 {
- select {
- case f.ch <- buf:
- return len(p), nil
- default:
- // TODO: write discard log to to stdout?
- return 0, fmt.Errorf("log channel is full, discard log")
- }
- }
- // write log with timeout
- timeout := time.NewTimer(f.opt.WriteTimeout)
- select {
- case f.ch <- buf:
- return len(p), nil
- case <-timeout.C:
- // TODO: write discard log to to stdout?
- return 0, fmt.Errorf("log channel is full, discard log")
- }
- }
- func (f *FileWriter) daemon() {
- // TODO: check aggsbuf size prevent it too big
- aggsbuf := &bytes.Buffer{}
- tk := time.NewTicker(f.opt.RotateInterval)
- // TODO: make it configrable
- aggstk := time.NewTicker(10 * time.Millisecond)
- var err error
- for {
- select {
- case t := <-tk.C:
- f.checkRotate(t)
- case buf, ok := <-f.ch:
- if ok {
- aggsbuf.Write(buf.Bytes())
- f.putBuf(buf)
- }
- case <-aggstk.C:
- if aggsbuf.Len() > 0 {
- if err = f.write(aggsbuf.Bytes()); err != nil {
- f.stdlog.Printf("write log error: %s", err)
- }
- aggsbuf.Reset()
- }
- }
- if atomic.LoadInt32(&f.closed) != 1 {
- continue
- }
- // read all buf from channel and break loop
- if err = f.write(aggsbuf.Bytes()); err != nil {
- f.stdlog.Printf("write log error: %s", err)
- }
- for buf := range f.ch {
- if err = f.write(buf.Bytes()); err != nil {
- f.stdlog.Printf("write log error: %s", err)
- }
- f.putBuf(buf)
- }
- break
- }
- f.wg.Done()
- }
- // Close close file writer
- func (f *FileWriter) Close() error {
- atomic.StoreInt32(&f.closed, 1)
- close(f.ch)
- f.wg.Wait()
- return nil
- }
- func (f *FileWriter) checkRotate(t time.Time) {
- formatFname := func(format string, num int) string {
- if num == 0 {
- return fmt.Sprintf("%s.%s", f.fname, format)
- }
- return fmt.Sprintf("%s.%s.%03d", f.fname, format, num)
- }
- format := t.Format(f.opt.RotateFormat)
- if f.opt.MaxFile != 0 {
- for f.files.Len() > f.opt.MaxFile {
- rt := f.files.Remove(f.files.Front()).(rotateItem)
- fpath := filepath.Join(f.dir, rt.fname)
- if err := os.Remove(fpath); err != nil {
- f.stdlog.Printf("remove file %s error: %s", fpath, err)
- }
- }
- }
- if format != f.lastRotateFormat || (f.opt.MaxSize != 0 && f.current.size() > f.opt.MaxSize) {
- var err error
- // close current file first
- if err = f.current.fp.Close(); err != nil {
- f.stdlog.Printf("close current file error: %s", err)
- }
- // rename file
- fname := formatFname(f.lastRotateFormat, f.lastSplitNum)
- oldpath := filepath.Join(f.dir, f.fname)
- newpath := filepath.Join(f.dir, fname)
- if err = os.Rename(oldpath, newpath); err != nil {
- f.stdlog.Printf("rename file %s to %s error: %s", oldpath, newpath, err)
- return
- }
- f.files.PushBack(rotateItem{fname: fname /*rotateNum: f.lastSplitNum, rotateTime: t.Unix() unnecessary*/})
- if format != f.lastRotateFormat {
- f.lastRotateFormat = format
- f.lastSplitNum = 0
- } else {
- f.lastSplitNum++
- }
- // recreate current file
- f.current, err = newWrapFile(filepath.Join(f.dir, f.fname))
- if err != nil {
- f.stdlog.Printf("create log file error: %s", err)
- }
- }
- }
- func (f *FileWriter) write(p []byte) error {
- // f.current may be nil, if newWrapFile return err in checkRotate, redirect log to stderr
- if f.current == nil {
- f.stdlog.Printf("can't write log to file, please check stderr log for detail")
- f.stdlog.Printf("%s", p)
- }
- _, err := f.current.write(p)
- return err
- }
- func (f *FileWriter) putBuf(buf *bytes.Buffer) {
- buf.Reset()
- f.pool.Put(buf)
- }
- func (f *FileWriter) getBuf() *bytes.Buffer {
- return f.pool.Get().(*bytes.Buffer)
- }
|