udpcollect.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package udpcollect
  2. import (
  3. "fmt"
  4. "net"
  5. "net/url"
  6. "os"
  7. "path"
  8. "strings"
  9. "sync"
  10. "time"
  11. "go-common/library/log"
  12. )
  13. const (
  14. _bufsize = 32 * 1024
  15. )
  16. // New UnixCollect
  17. func New(addr string, workers int, writeFn func(p []byte) error) (*UDPCollect, error) {
  18. if workers == 0 {
  19. workers = 1
  20. }
  21. addrURL, err := url.Parse(addr)
  22. if err != nil {
  23. return nil, fmt.Errorf("parse addr error: %s", err)
  24. }
  25. return &UDPCollect{
  26. addr: addrURL,
  27. writeFn: writeFn,
  28. workers: workers,
  29. pool: sync.Pool{
  30. New: func() interface{} {
  31. return make([]byte, _bufsize)
  32. },
  33. },
  34. readTimeout: 60 * time.Second,
  35. }, nil
  36. }
  37. // UDPCollect collect span data from unix socket
  38. type UDPCollect struct {
  39. wg sync.WaitGroup
  40. workers int
  41. addr *url.URL
  42. writeFn func(p []byte) error
  43. readTimeout time.Duration
  44. pool sync.Pool
  45. closed bool
  46. pconn net.PacketConn
  47. }
  48. // Start collector
  49. func (u *UDPCollect) Start() error {
  50. var err error
  51. switch u.addr.Scheme {
  52. case "unixgram":
  53. u.pconn, err = listenUNIX(u.addr.Path)
  54. case "udp", "udp4", "udp6":
  55. u.pconn, err = listtenNet(u.addr.Scheme, u.addr.Host)
  56. default:
  57. return fmt.Errorf("unsupport network %s", u.addr.Scheme)
  58. }
  59. if err != nil {
  60. return fmt.Errorf("listen packet error: %s", err)
  61. }
  62. log.Info("dapper agent listen at: %s, workers: %d", u.addr, u.workers)
  63. u.wg.Add(u.workers)
  64. for i := 0; i < u.workers; i++ {
  65. go u.serve()
  66. }
  67. return nil
  68. }
  69. func listenUNIX(addr string) (net.PacketConn, error) {
  70. dirname := path.Dir(addr)
  71. info, err := os.Stat(dirname)
  72. if err != nil {
  73. if !os.IsNotExist(err) {
  74. return nil, err
  75. }
  76. if err := os.MkdirAll(dirname, 0755); err != nil {
  77. return nil, fmt.Errorf("create directory %s error: %s", dirname, err)
  78. }
  79. }
  80. if err == nil && !info.IsDir() {
  81. return nil, fmt.Errorf("%s is already exists and not a directory", dirname)
  82. }
  83. if _, err := os.Stat(addr); err == nil {
  84. // remove old socket file
  85. os.Remove(addr)
  86. }
  87. conn, err := net.ListenPacket("unixgram", addr)
  88. if err != nil {
  89. return nil, err
  90. }
  91. // make file permission to 666, so php can wirte span to this socket
  92. return conn, os.Chmod(addr, 0666)
  93. }
  94. func listtenNet(network, addr string) (net.PacketConn, error) {
  95. return net.ListenPacket(network, addr)
  96. }
  97. func (u *UDPCollect) serve() {
  98. defer u.wg.Done()
  99. for {
  100. if err := u.handler(u.pconn); err != nil {
  101. if strings.Contains(err.Error(), "closed") && u.closed {
  102. return
  103. }
  104. log.Error("handler PacketConn error: %s, retry after second", err)
  105. time.Sleep(time.Second)
  106. }
  107. }
  108. }
  109. func (u *UDPCollect) handler(pconn net.PacketConn) error {
  110. p := u.buffer()
  111. defer u.freeBuffer(p)
  112. pconn.SetReadDeadline(time.Now().Add(u.readTimeout))
  113. n, _, err := pconn.ReadFrom(p)
  114. if n > 0 {
  115. u.writeFn(p[:n])
  116. }
  117. if err == nil {
  118. return nil
  119. }
  120. if netErr, ok := err.(net.Error); ok {
  121. // ignore timeout and temporyary
  122. if netErr.Timeout() || netErr.Temporary() {
  123. return nil
  124. }
  125. }
  126. return err
  127. }
  128. func (u *UDPCollect) buffer() []byte {
  129. return u.pool.Get().([]byte)
  130. }
  131. func (u *UDPCollect) freeBuffer(p []byte) {
  132. u.pool.Put(p)
  133. }
  134. // Close udp collect
  135. func (u *UDPCollect) Close() error {
  136. u.closed = true
  137. u.pconn.Close()
  138. // wait all workers exit
  139. u.wg.Wait()
  140. if u.addr.Scheme == "unixgram" {
  141. return os.Remove(u.addr.Path)
  142. }
  143. return nil
  144. }