// Package infoc frames log records and writes them to a remote endpoint over
// a network connection from a background goroutine.
package infoc

import (
	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"

	"go-common/library/log"
	"go-common/library/net/metadata"
	"go-common/library/net/netutil"
	xtime "go-common/library/time"
)

const (
	// _infocSpliter is written between consecutive fields of a record.
	_infocSpliter = "\001"
	// _infocReplacer replaces any _infocSpliter found inside a field value
	// (applied to every field after the first one).
	_infocReplacer = "|"
	// _infocLenStart and _infocLenEnd delimit the 4-byte big-endian length
	// field inside a frame (bytes [2,6)).
	_infocLenStart = 2
	_infocLenEnd   = 6
	// _protocolLen is the number of leading frame bytes (magic + length
	// field) excluded from the value stored in the length field.
	_protocolLen = 6
	// _infocTimeout is the dial/write timeout used when the config leaves
	// the corresponding value at zero.
	_infocTimeout = 50 * time.Millisecond
)

var (
	_infocMagic     = []byte{172, 190}   // NOTE: magic 0xAC0xBE
	_infocHeaderLen = []byte{0, 0, 0, 0} // NOTE: body len placeholder
	_infocType      = []byte{0, 0}       // NOTE: type 0
	// _maxRetry is the maximum number of dial/write attempts per message.
	_maxRetry = 10
	// ErrFull is returned by Info when the message channel buffer is full.
	ErrFull = errors.New("infoc: chan buffer full")
)

// Predefined string values for clients, item types and actions.
// NOTE(review): none of these are referenced in the code visible here;
// presumably they are canonical field values for callers — confirm.
var (
	// ClientWeb and the following Client* values name client platforms.
	ClientWeb     = "web"
	ClientIphone  = "iphone"
	ClientIpad    = "ipad"
	ClientAndroid = "android"

	// ItemType* values name kinds of items.
	ItemTypeAv       = "av"
	ItemTypeBangumi  = "bangumi"
	ItemTypeLive     = "live"
	ItemTypeTopic    = "topic"
	ItemTypeRank     = "rank"
	ItemTypeActivity = "activity"
	ItemTypeTag      = "tag"
	ItemTypeAD       = "ad"
	ItemTypeLV       = "lv"

	// Action* values name user actions.
	ActionClick     = "click"
	ActionPlay      = "play"
	ActionFav       = "fav"
	ActionCoin      = "coin"
	ActionDM        = "dm"
	ActionToView    = "toview"
	ActionShare     = "share"
	ActionSpace     = "space"
	Actionfollow    = "follow"
	ActionHeartbeat = "heartbeat"
	ActionAnswer    = "answer"
)

// Config is infoc config.
type Config struct {
	// TaskID is written verbatim into every frame right after the type field.
	TaskID string
	// Proto is the network passed to net.DialTimeout: udp or tcp.
	Proto string
	// Addr is the address passed to net.DialTimeout.
	Addr string
	// ChanSize is the buffer size of the pending-message channel.
	ChanSize int
	// DialTimeout and WriteTimeout default to 50ms when left at zero.
	DialTimeout  xtime.Duration
	WriteTimeout xtime.Duration
}

// Infoc is an asynchronous log writer: records are framed by the caller's
// goroutine, queued on a channel and written by a single background goroutine.
type Infoc struct {
	c *Config
	// header holds the TaskID bytes written into every frame.
	header []byte
	// msgs queues framed messages; a nil element tells the writer to stop.
	msgs         chan *bytes.Buffer
	dialTimeout  time.Duration
	writeTimeout time.Duration
	// pool recycles the buffers used to build frames.
	pool sync.Pool
	// waiter lets Close wait for the writer goroutine to exit.
	waiter sync.WaitGroup
}

// New builds an Infoc from c and starts its background writer goroutine.
func New(c *Config) (i *Infoc) { i = &Infoc{ c: c, header: []byte(c.TaskID), msgs: make(chan *bytes.Buffer, c.ChanSize), dialTimeout: time.Duration(c.DialTimeout), writeTimeout: time.Duration(c.WriteTimeout), pool: sync.Pool{ New: func() interface{} { return &bytes.Buffer{} }, }, } if i.dialTimeout == 0 { i.dialTimeout = _infocTimeout } if i.writeTimeout == 0 { i.writeTimeout = _infocTimeout } i.waiter.Add(1) go i.writeproc() return } func (i *Infoc) buf() *bytes.Buffer { return i.pool.Get().(*bytes.Buffer) } func (i *Infoc) freeBuf(buf *bytes.Buffer) { buf.Reset() i.pool.Put(buf) } // Info record log to file. func (i *Infoc) Info(args ...interface{}) (err error) { err, res := i.info(args...) if err != nil { return } select { case i.msgs <- res: default: i.freeBuf(res) err = ErrFull } return } // Infov support filter mirror request func (i *Infoc) Infov(ctx context.Context, args ...interface{}) (err error) { if metadata.Bool(ctx, metadata.Mirror) { return } return i.Info(args...) } func getValue(i interface{}) (s string) { switch v := i.(type) { case int: s = strconv.FormatInt(int64(v), 10) case int64: s = strconv.FormatInt(v, 10) case string: s = v case bool: s = strconv.FormatBool(v) default: s = fmt.Sprint(i) } return } // Close close the connection. func (i *Infoc) Close() error { i.msgs <- nil i.waiter.Wait() return nil } // writeproc write data into connection. 
func (i *Infoc) writeproc() { var ( msg *bytes.Buffer conn net.Conn err error ) bc := netutil.BackoffConfig{ MaxDelay: 15 * time.Second, BaseDelay: 1.0 * time.Second, Factor: 1.6, Jitter: 0.2, } for { if msg = <-i.msgs; msg == nil { break // quit infoc writeproc } var j int for j = 0; j < _maxRetry; j++ { if conn == nil || err != nil { if conn, err = net.DialTimeout(i.c.Proto, i.c.Addr, i.dialTimeout); err != nil { log.Error("infoc net dial error(%v)", err) time.Sleep(bc.Backoff(j)) continue } } if i.writeTimeout != 0 { conn.SetWriteDeadline(time.Now().Add(i.writeTimeout)) } if _, err = conn.Write(msg.Bytes()); err != nil { log.Error("infoc conn write error(%v)", err) conn.Close() time.Sleep(bc.Backoff(j)) continue } break } if j == _maxRetry { log.Error("infoc reached max retry times") } i.freeBuf(msg) } i.waiter.Done() if conn != nil && err == nil { conn.Close() } } func (i *Infoc) info(args ...interface{}) (err error, buf *bytes.Buffer) { if len(args) == 0 { return nil, nil } res := i.buf() res.Write(_infocMagic) // type and body buf, for calc length. res.Write(_infocHeaderLen) // placeholder res.Write(_infocType) res.Write(i.header) res.WriteString(strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond), 10)) // // append first arg _, err = res.WriteString(getValue(args[0])) for _, arg := range args[1:] { // append ,arg res.WriteString(_infocSpliter) _, err = res.WriteString(strings.Replace(getValue(arg), _infocSpliter, _infocReplacer, -1)) } if err != nil { i.freeBuf(res) return } bs := res.Bytes() binary.BigEndian.PutUint32(bs[_infocLenStart:_infocLenEnd], uint32(res.Len()-_protocolLen)) buf = res return }