infoc.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. package infoc
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/binary"
  6. "errors"
  7. "fmt"
  8. "net"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. "go-common/library/log"
  14. "go-common/library/net/metadata"
  15. "go-common/library/net/netutil"
  16. xtime "go-common/library/time"
  17. )
  18. const (
  19. _infocSpliter = "\001"
  20. _infocReplacer = "|"
  21. _infocLenStart = 2
  22. _infocLenEnd = 6
  23. _protocolLen = 6
  24. _infocTimeout = 50 * time.Millisecond
  25. )
  26. var (
  27. _infocMagic = []byte{172, 190} // NOTE: magic 0xAC0xBE
  28. _infocHeaderLen = []byte{0, 0, 0, 0} // NOTE: body len placeholder
  29. _infocType = []byte{0, 0} // NOTE: type 0
  30. _maxRetry = 10
  31. // ErrFull error chan buffer full.
  32. ErrFull = errors.New("infoc: chan buffer full")
  33. )
  34. var (
  35. // ClientWeb ...
  36. ClientWeb = "web"
  37. ClientIphone = "iphone"
  38. ClientIpad = "ipad"
  39. ClientAndroid = "android"
  40. ItemTypeAv = "av"
  41. ItemTypeBangumi = "bangumi"
  42. ItemTypeLive = "live"
  43. ItemTypeTopic = "topic"
  44. ItemTypeRank = "rank"
  45. ItemTypeActivity = "activity"
  46. ItemTypeTag = "tag"
  47. ItemTypeAD = "ad"
  48. ItemTypeLV = "lv"
  49. ActionClick = "click"
  50. ActionPlay = "play"
  51. ActionFav = "fav"
  52. ActionCoin = "coin"
  53. ActionDM = "dm"
  54. ActionToView = "toview"
  55. ActionShare = "share"
  56. ActionSpace = "space"
  57. Actionfollow = "follow"
  58. ActionHeartbeat = "heartbeat"
  59. ActionAnswer = "answer"
  60. )
  61. // Config is infoc config.
  62. type Config struct {
  63. TaskID string
  64. // udp or tcp
  65. Proto string
  66. Addr string
  67. ChanSize int
  68. DialTimeout xtime.Duration
  69. WriteTimeout xtime.Duration
  70. }
  71. // Infoc infoc struct.
  72. type Infoc struct {
  73. c *Config
  74. header []byte
  75. msgs chan *bytes.Buffer
  76. dialTimeout time.Duration
  77. writeTimeout time.Duration
  78. pool sync.Pool
  79. waiter sync.WaitGroup
  80. }
  81. // New new infoc logger.
  82. func New(c *Config) (i *Infoc) {
  83. i = &Infoc{
  84. c: c,
  85. header: []byte(c.TaskID),
  86. msgs: make(chan *bytes.Buffer, c.ChanSize),
  87. dialTimeout: time.Duration(c.DialTimeout),
  88. writeTimeout: time.Duration(c.WriteTimeout),
  89. pool: sync.Pool{
  90. New: func() interface{} {
  91. return &bytes.Buffer{}
  92. },
  93. },
  94. }
  95. if i.dialTimeout == 0 {
  96. i.dialTimeout = _infocTimeout
  97. }
  98. if i.writeTimeout == 0 {
  99. i.writeTimeout = _infocTimeout
  100. }
  101. i.waiter.Add(1)
  102. go i.writeproc()
  103. return
  104. }
  105. func (i *Infoc) buf() *bytes.Buffer {
  106. return i.pool.Get().(*bytes.Buffer)
  107. }
  108. func (i *Infoc) freeBuf(buf *bytes.Buffer) {
  109. buf.Reset()
  110. i.pool.Put(buf)
  111. }
  112. // Info record log to file.
  113. func (i *Infoc) Info(args ...interface{}) (err error) {
  114. err, res := i.info(args...)
  115. if err != nil {
  116. return
  117. }
  118. select {
  119. case i.msgs <- res:
  120. default:
  121. i.freeBuf(res)
  122. err = ErrFull
  123. }
  124. return
  125. }
  126. // Infov support filter mirror request
  127. func (i *Infoc) Infov(ctx context.Context, args ...interface{}) (err error) {
  128. if metadata.Bool(ctx, metadata.Mirror) {
  129. return
  130. }
  131. return i.Info(args...)
  132. }
  133. func getValue(i interface{}) (s string) {
  134. switch v := i.(type) {
  135. case int:
  136. s = strconv.FormatInt(int64(v), 10)
  137. case int64:
  138. s = strconv.FormatInt(v, 10)
  139. case string:
  140. s = v
  141. case bool:
  142. s = strconv.FormatBool(v)
  143. default:
  144. s = fmt.Sprint(i)
  145. }
  146. return
  147. }
  148. // Close close the connection.
  149. func (i *Infoc) Close() error {
  150. i.msgs <- nil
  151. i.waiter.Wait()
  152. return nil
  153. }
  154. // writeproc write data into connection.
  155. func (i *Infoc) writeproc() {
  156. var (
  157. msg *bytes.Buffer
  158. conn net.Conn
  159. err error
  160. )
  161. bc := netutil.BackoffConfig{
  162. MaxDelay: 15 * time.Second,
  163. BaseDelay: 1.0 * time.Second,
  164. Factor: 1.6,
  165. Jitter: 0.2,
  166. }
  167. for {
  168. if msg = <-i.msgs; msg == nil {
  169. break // quit infoc writeproc
  170. }
  171. var j int
  172. for j = 0; j < _maxRetry; j++ {
  173. if conn == nil || err != nil {
  174. if conn, err = net.DialTimeout(i.c.Proto, i.c.Addr, i.dialTimeout); err != nil {
  175. log.Error("infoc net dial error(%v)", err)
  176. time.Sleep(bc.Backoff(j))
  177. continue
  178. }
  179. }
  180. if i.writeTimeout != 0 {
  181. conn.SetWriteDeadline(time.Now().Add(i.writeTimeout))
  182. }
  183. if _, err = conn.Write(msg.Bytes()); err != nil {
  184. log.Error("infoc conn write error(%v)", err)
  185. conn.Close()
  186. time.Sleep(bc.Backoff(j))
  187. continue
  188. }
  189. break
  190. }
  191. if j == _maxRetry {
  192. log.Error("infoc reached max retry times")
  193. }
  194. i.freeBuf(msg)
  195. }
  196. i.waiter.Done()
  197. if conn != nil && err == nil {
  198. conn.Close()
  199. }
  200. }
  201. func (i *Infoc) info(args ...interface{}) (err error, buf *bytes.Buffer) {
  202. if len(args) == 0 {
  203. return nil, nil
  204. }
  205. res := i.buf()
  206. res.Write(_infocMagic) // type and body buf, for calc length.
  207. res.Write(_infocHeaderLen) // placeholder
  208. res.Write(_infocType)
  209. res.Write(i.header)
  210. res.WriteString(strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond), 10))
  211. // // append first arg
  212. _, err = res.WriteString(getValue(args[0]))
  213. for _, arg := range args[1:] {
  214. // append ,arg
  215. res.WriteString(_infocSpliter)
  216. _, err = res.WriteString(strings.Replace(getValue(arg), _infocSpliter, _infocReplacer, -1))
  217. }
  218. if err != nil {
  219. i.freeBuf(res)
  220. return
  221. }
  222. bs := res.Bytes()
  223. binary.BigEndian.PutUint32(bs[_infocLenStart:_infocLenEnd], uint32(res.Len()-_protocolLen))
  224. buf = res
  225. return
  226. }