deliver.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package deliver
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "math/rand"
  6. "net"
  7. "sync"
  8. "time"
  9. "go-common/library/log"
  10. )
  11. var (
  12. _magicBuf = []byte{0xAC, 0xBE}
  13. _bufpool sync.Pool
  14. )
  15. func init() {
  16. rand.Seed(time.Now().UnixNano())
  17. _bufpool = sync.Pool{New: func() interface{} {
  18. return make([]byte, 0, 4096)
  19. }}
  20. }
  21. func freeBuf(buf []byte) {
  22. buf = buf[:0]
  23. _bufpool.Put(buf)
  24. }
  25. func getBuf() []byte {
  26. return _bufpool.Get().([]byte)
  27. }
  28. // Deliver deliver span to dapper-service through tcp
  29. type Deliver struct {
  30. servers []string
  31. readFn func() ([]byte, error)
  32. conn *net.TCPConn
  33. dataCh chan []byte
  34. closeCh chan struct{}
  35. closed bool
  36. }
  37. // New Deliver
  38. func New(servers []string, readFn func() ([]byte, error)) (*Deliver, error) {
  39. if len(servers) == 0 {
  40. return nil, fmt.Errorf("no server provide")
  41. }
  42. d := &Deliver{
  43. servers: servers,
  44. readFn: readFn,
  45. closeCh: make(chan struct{}, 1),
  46. dataCh: make(chan []byte),
  47. }
  48. return d, d.start()
  49. }
  50. func (d *Deliver) start() error {
  51. if err := d.dial(); err != nil {
  52. return err
  53. }
  54. go d.fetch()
  55. go d.loop()
  56. return nil
  57. }
  58. func (d *Deliver) fetch() {
  59. for {
  60. if d.closed {
  61. return
  62. }
  63. data, err := d.readFn()
  64. if err != nil {
  65. log.Error("deliver read data error: %s", err)
  66. continue
  67. }
  68. d.dataCh <- data
  69. }
  70. }
  71. func (d *Deliver) loop() {
  72. for {
  73. select {
  74. case <-d.closeCh:
  75. return
  76. case data := <-d.dataCh:
  77. data = warpData(data)
  78. send:
  79. _, err := d.conn.Write(data)
  80. if err == nil {
  81. freeBuf(data)
  82. continue
  83. }
  84. d.reDial()
  85. goto send
  86. }
  87. }
  88. }
  89. // Close deliver
  90. func (d *Deliver) Close() error {
  91. if d.closed {
  92. return fmt.Errorf("already closed")
  93. }
  94. d.closed = true
  95. d.closeCh <- struct{}{}
  96. timer := time.NewTimer(50 * time.Millisecond)
  97. select {
  98. case data := <-d.dataCh:
  99. // write last data to conn
  100. _, err := d.conn.Write(data)
  101. return fmt.Errorf("write last data error: %s", err)
  102. case <-timer.C:
  103. return nil
  104. }
  105. return nil
  106. }
  107. func (d *Deliver) reDial() {
  108. if d.conn != nil {
  109. d.conn.Close()
  110. }
  111. for {
  112. if err := d.dial(); err != nil {
  113. log.Error("redial error: %s, retry after second", err)
  114. time.Sleep(time.Second)
  115. }
  116. break
  117. }
  118. }
  119. func (d *Deliver) dial() error {
  120. server := chioceServer(d.servers)
  121. conn, err := net.Dial("tcp", server)
  122. if err != nil {
  123. return fmt.Errorf("dial tcp://%s error: %s", server, err)
  124. }
  125. d.conn = conn.(*net.TCPConn)
  126. d.conn.SetKeepAlive(true)
  127. return nil
  128. }
  129. func chioceServer(servers []string) string {
  130. return servers[rand.Intn(len(servers))]
  131. }
  132. func warpData(data []byte) []byte {
  133. buf := getBuf()
  134. buf = append(buf, _magicBuf...)
  135. buf = append(buf, []byte{0, 0, 0, 0, 0, 0}...)
  136. binary.BigEndian.PutUint32(buf[2:6], uint32(len(data)+2))
  137. buf = append(buf, data...)
  138. return buf
  139. }