lancer.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package common
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "fmt"
  7. "go-common/library/log"
  8. "net"
  9. "runtime"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. )
  15. const (
  16. LancerMinimumLength = 6
  17. LancerLengthPartBegin = 2
  18. LancerLengthPartEnd = 6
  19. )
  20. type LancerLogStream struct {
  21. conn net.Conn
  22. addr string
  23. pool sync.Pool
  24. dataChannel chan *LancerData
  25. quit chan struct{}
  26. wg sync.WaitGroup
  27. splitter string
  28. replacer string
  29. timeout time.Duration
  30. }
  31. type LancerData struct {
  32. bytes.Buffer
  33. logid string
  34. isAppend bool
  35. lancer *LancerLogStream
  36. }
  37. func NewLancerLogStream(address string, capacity int, timeout time.Duration) *LancerLogStream {
  38. c, err := net.DialTimeout("tcp", address, timeout)
  39. if err != nil {
  40. log.Error("[Lancer]Dial lancer %s error:%+v", address, err)
  41. c = nil
  42. }
  43. lancer := &LancerLogStream{
  44. conn: c,
  45. addr: address,
  46. pool: sync.Pool{
  47. New: func() interface{} {
  48. return new(LancerData)
  49. },
  50. },
  51. dataChannel: make(chan *LancerData, capacity),
  52. quit: make(chan struct{}),
  53. splitter: "\u0001",
  54. replacer: "|",
  55. timeout: timeout,
  56. }
  57. lancer.wg.Add(1)
  58. go lancer.processor()
  59. return lancer
  60. }
  61. func (lancer *LancerLogStream) Close() {
  62. close(lancer.dataChannel)
  63. close(lancer.quit)
  64. lancer.wg.Wait()
  65. }
  66. func (lancer *LancerLogStream) processor() {
  67. defer func() {
  68. if lancer.conn != nil {
  69. lancer.conn.Close()
  70. }
  71. lancer.wg.Done()
  72. }()
  73. var lastFail *LancerData
  74. PROCESSOR:
  75. for {
  76. select {
  77. case <-lancer.quit:
  78. return
  79. default:
  80. }
  81. if lastFail != nil {
  82. if err := lancer.write(lastFail.Bytes()); err != nil {
  83. runtime.Gosched()
  84. continue PROCESSOR
  85. }
  86. lancer.pool.Put(lastFail)
  87. lastFail = nil
  88. }
  89. for b := range lancer.dataChannel {
  90. if err := lancer.write(b.Bytes()); err != nil {
  91. lastFail = b
  92. runtime.Gosched()
  93. continue PROCESSOR
  94. }
  95. lancer.pool.Put(b)
  96. }
  97. return
  98. }
  99. }
  100. func (lancer *LancerLogStream) write(b []byte) error {
  101. if lancer.conn == nil {
  102. c, err := net.DialTimeout("tcp", lancer.addr, lancer.timeout)
  103. if err != nil {
  104. log.Error("[Lancer]Dial %s error:%+v", lancer.addr, err)
  105. return err
  106. }
  107. lancer.conn = c
  108. }
  109. _, err := lancer.conn.Write(b)
  110. if err != nil {
  111. log.Error("[Lancer]Conn write error:%+v", err)
  112. lancer.conn.Close()
  113. lancer.conn = nil
  114. }
  115. return err
  116. }
  117. func (lancer *LancerLogStream) NewLancerData(logid string, token string) *LancerData {
  118. ld := lancer.pool.Get().(*LancerData)
  119. ld.Reset()
  120. ld.lancer = lancer
  121. ld.isAppend = false
  122. ld.logid = logid
  123. ld.Write([]byte{0xAC, 0xBE})
  124. ld.Write([]byte{0, 0, 0, 0})
  125. ld.Write([]byte{0, 1})
  126. header := fmt.Sprintf("logId=%s&timestamp=%d&token=%s&version=1.1", logid,
  127. time.Now().UnixNano()/int64(time.Millisecond), token)
  128. headerLength := uint16(len(header))
  129. ld.Write([]byte{byte(headerLength >> 8), byte(headerLength)})
  130. ld.Write([]byte(header))
  131. return ld
  132. }
  133. func (ld *LancerData) PutString(v string) *LancerData {
  134. ld.splitter()
  135. ld.WriteString(strings.Replace(v, ld.lancer.splitter, ld.lancer.replacer, -1))
  136. return ld
  137. }
  138. func (ld *LancerData) PutTimestamp(v time.Time) *LancerData {
  139. return ld.PutInt(v.Unix())
  140. }
  141. func (ld *LancerData) PutUint(v uint64) *LancerData {
  142. ld.splitter()
  143. ld.WriteString(strconv.FormatUint(v, 10))
  144. return ld
  145. }
  146. func (ld *LancerData) PutInt(v int64) *LancerData {
  147. ld.splitter()
  148. ld.WriteString(strconv.FormatInt(v, 10))
  149. return ld
  150. }
  151. func (ld *LancerData) PutFloat(v float64) *LancerData {
  152. ld.splitter()
  153. ld.WriteString(strconv.FormatFloat(v, 'f', -1, 64))
  154. return ld
  155. }
  156. func (ld *LancerData) PutBool(v bool) *LancerData {
  157. ld.splitter()
  158. ld.WriteString(strconv.FormatBool(v))
  159. return ld
  160. }
  161. func (ld *LancerData) splitter() {
  162. if ld.isAppend {
  163. ld.WriteString(ld.lancer.splitter)
  164. }
  165. ld.isAppend = true
  166. }
  167. func (ld *LancerData) Commit() error {
  168. if ld.Len() < LancerMinimumLength {
  169. return errors.New("protocol error")
  170. }
  171. l := uint32(ld.Len()) - LancerMinimumLength
  172. binary.BigEndian.PutUint32(ld.Bytes()[LancerLengthPartBegin:LancerLengthPartEnd], l)
  173. select {
  174. case ld.lancer.dataChannel <- ld:
  175. return nil
  176. default:
  177. ld.lancer.pool.Put(ld)
  178. return errors.New("lancer channel is full")
  179. }
  180. }