report.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package trace
  2. import (
  3. "fmt"
  4. "net"
  5. "os"
  6. "sync"
  7. "time"
  8. )
  9. const (
  10. // MaxPackageSize .
  11. _maxPackageSize = 1024 * 32
  12. // safe udp package size // MaxPackageSize = 508 _dataChSize = 4096)
  13. // max memory usage 1024 * 32 * 4096 -> 128MB
  14. _dataChSize = 4096
  15. _defaultWriteChannalTimeout = 50 * time.Millisecond
  16. _defaultWriteTimeout = 200 * time.Millisecond
  17. )
// reporter is the trace reporter: something spans can be written to and that
// can be closed when no more spans will be written.
type reporter interface {
	// WriteSpan hands sp to the reporter and returns an error if the span
	// could not be accepted.
	WriteSpan(sp *span) error
	// Close shuts the reporter down and returns any error from doing so.
	Close() error
}
  23. // newReport with network address
  24. func newReport(network, address string, timeout time.Duration, protocolVersion int32) reporter {
  25. if timeout == 0 {
  26. timeout = _defaultWriteTimeout
  27. }
  28. report := &connReport{
  29. network: network,
  30. address: address,
  31. dataCh: make(chan []byte, _dataChSize),
  32. done: make(chan struct{}),
  33. timeout: timeout,
  34. version: protocolVersion,
  35. }
  36. go report.daemon()
  37. return report
  38. }
  39. type connReport struct {
  40. version int32
  41. rmx sync.RWMutex
  42. closed bool
  43. network, address string
  44. dataCh chan []byte
  45. conn net.Conn
  46. done chan struct{}
  47. timeout time.Duration
  48. }
  49. func (c *connReport) daemon() {
  50. for b := range c.dataCh {
  51. c.send(b)
  52. }
  53. c.done <- struct{}{}
  54. }
  55. func (c *connReport) WriteSpan(sp *span) error {
  56. data, err := marshalSpan(sp, c.version)
  57. if err != nil {
  58. return err
  59. }
  60. return c.writePackage(data)
  61. }
  62. func (c *connReport) writePackage(data []byte) error {
  63. c.rmx.RLock()
  64. defer c.rmx.RUnlock()
  65. if c.closed {
  66. return fmt.Errorf("report already closed")
  67. }
  68. if len(data) > _maxPackageSize {
  69. return fmt.Errorf("package too large length %d > %d", len(data), _maxPackageSize)
  70. }
  71. select {
  72. case c.dataCh <- data:
  73. return nil
  74. case <-time.After(_defaultWriteChannalTimeout):
  75. return fmt.Errorf("write to data channel timeout")
  76. }
  77. }
  78. func (c *connReport) Close() error {
  79. c.rmx.Lock()
  80. c.closed = true
  81. c.rmx.Unlock()
  82. t := time.NewTimer(time.Second)
  83. close(c.dataCh)
  84. select {
  85. case <-t.C:
  86. c.closeConn()
  87. return fmt.Errorf("close report timeout force close")
  88. case <-c.done:
  89. return c.closeConn()
  90. }
  91. }
  92. func (c *connReport) send(data []byte) {
  93. if c.conn == nil {
  94. if err := c.reconnect(); err != nil {
  95. c.Errorf("connect error: %s retry after second", err)
  96. time.Sleep(time.Second)
  97. return
  98. }
  99. }
  100. c.conn.SetWriteDeadline(time.Now().Add(100 * time.Microsecond))
  101. if _, err := c.conn.Write(data); err != nil {
  102. c.Errorf("write to conn error: %s, close connect", err)
  103. c.conn.Close()
  104. c.conn = nil
  105. }
  106. }
  107. func (c *connReport) reconnect() (err error) {
  108. c.conn, err = net.DialTimeout(c.network, c.address, c.timeout)
  109. return
  110. }
  111. func (c *connReport) closeConn() error {
  112. if c.conn != nil {
  113. return c.conn.Close()
  114. }
  115. return nil
  116. }
  117. func (c *connReport) Errorf(format string, args ...interface{}) {
  118. fmt.Fprintf(os.Stderr, format+"\n", args...)
  119. }