sock.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package sock
  2. import (
  3. "bufio"
  4. "runtime"
  5. "errors"
  6. "net"
  7. "os"
  8. "time"
  9. "io"
  10. "path/filepath"
  11. "fmt"
  12. "context"
  13. "strings"
  14. "go-common/library/log"
  15. "go-common/app/service/ops/log-agent/input"
  16. "go-common/app/service/ops/log-agent/event"
  17. "go-common/app/service/ops/log-agent/pkg/flowmonitor"
  18. "go-common/app/service/ops/log-agent/pkg/lancermonitor"
  19. "go-common/app/service/ops/log-agent/pkg/lancerroute"
  20. "github.com/BurntSushi/toml"
  21. )
  22. const (
  23. _logIdLen = 6
  24. _logLancerHeaderLen = 19
  25. _logSeparator = byte('\u0001')
  26. )
  27. var (
  28. // ErrInvalidAddr invalid address.
  29. ErrInvalidAddr = errors.New("invalid address")
  30. // logMagic log magic.
  31. logMagic = []byte{0xAC, 0xBE}
  32. local, _ = time.LoadLocation("Local")
  33. )
  34. func init() {
  35. err := input.Register("sock", NewSock)
  36. if err != nil {
  37. panic(err)
  38. }
  39. }
  40. type Sock struct {
  41. c *Config
  42. output chan<- *event.ProcessorEvent
  43. readChan chan *event.ProcessorEvent
  44. ctx context.Context
  45. cancel context.CancelFunc
  46. closed bool
  47. }
  48. func NewSock(ctx context.Context, config interface{}, output chan<- *event.ProcessorEvent) (input.Input, error) {
  49. sock := new(Sock)
  50. if c, ok := config.(*Config); !ok {
  51. return nil, fmt.Errorf("Error config for Sock Input")
  52. } else {
  53. if err := c.ConfigValidate(); err != nil {
  54. return nil, err
  55. }
  56. sock.c = c
  57. }
  58. sock.output = output
  59. sock.ctx, sock.cancel = context.WithCancel(ctx)
  60. sock.readChan = make(chan *event.ProcessorEvent, sock.c.ReadChanSize)
  61. sock.closed = false
  62. return sock, nil
  63. }
  64. func DecodeConfig(md toml.MetaData, primValue toml.Primitive) (c interface{}, err error) {
  65. c = new(Config)
  66. if err = md.PrimitiveDecode(primValue, c); err != nil {
  67. return nil, err
  68. }
  69. return c, nil
  70. }
  71. func (s *Sock) Run() (err error) {
  72. s.listen()
  73. go s.writetoProcessor()
  74. return nil
  75. }
  76. func (s *Sock) Stop() {
  77. s.closed = true
  78. s.cancel()
  79. }
  80. func (s *Sock) Ctx() context.Context {
  81. return s.ctx
  82. }
  83. //listen listen unix socket to buffer
  84. func (s *Sock) listen() {
  85. // SOCK_DGRAM
  86. if s.c.UdpAddr != "" {
  87. if flag, _ := pathExists(s.c.UdpAddr); flag {
  88. os.Remove(s.c.UdpAddr)
  89. }
  90. if flag, _ := pathExists(filepath.Dir(s.c.UdpAddr)); !flag {
  91. os.Mkdir(filepath.Dir(s.c.UdpAddr), os.ModePerm)
  92. }
  93. udpconn, err := net.ListenPacket("unixgram", s.c.UdpAddr)
  94. if err != nil {
  95. panic(err)
  96. }
  97. if err := os.Chmod(s.c.UdpAddr, 0777); err != nil {
  98. panic(err)
  99. }
  100. log.Info("start listen: %s", s.c.UdpAddr)
  101. for i := 0; i < runtime.NumCPU(); i++ {
  102. go s.udpread(udpconn)
  103. }
  104. }
  105. // SOCK_SEQPACKET
  106. if s.c.TcpAddr != "" {
  107. if flag, _ := pathExists(s.c.TcpAddr); flag {
  108. os.Remove(s.c.TcpAddr)
  109. }
  110. if flag, _ := pathExists(filepath.Dir(s.c.TcpAddr)); !flag {
  111. os.Mkdir(filepath.Dir(s.c.TcpAddr), os.ModePerm)
  112. }
  113. tcplistener, err := net.Listen("unixpacket", s.c.TcpAddr)
  114. if err != nil {
  115. panic(err)
  116. }
  117. if err := os.Chmod(s.c.TcpAddr, 0777); err != nil {
  118. panic(err)
  119. }
  120. log.Info("start listen: %s", s.c.TcpAddr)
  121. go s.tcpread(tcplistener)
  122. }
  123. }
  124. func (s *Sock) writeToReadChan(e *event.ProcessorEvent) {
  125. lancermonitor.IncreaseLogCount("agent.receive.count", e.LogId)
  126. select {
  127. case s.readChan <- e:
  128. default:
  129. event.PutEvent(e)
  130. flowmonitor.Fm.AddEvent(e, "log-agent.input.sock", "ERROR", "read chan full")
  131. log.Warn("sock read chan full, discard log")
  132. }
  133. }
  134. // tcpread accept tcp connection
  135. func (s *Sock) tcpread(listener net.Listener) {
  136. for {
  137. conn, err := listener.Accept()
  138. if err != nil {
  139. continue
  140. }
  141. go s.handleTcpConn(conn)
  142. }
  143. }
  144. // process single tcp connection
  145. func (s *Sock) handleTcpConn(conn net.Conn) {
  146. defer conn.Close()
  147. rd := bufio.NewReaderSize(conn, s.c.TcpBatchMaxBytes)
  148. for {
  149. conn.SetReadDeadline(time.Now().Add(time.Duration(s.c.TcpReadTimeout)))
  150. b, err := rd.ReadSlice(_logSeparator)
  151. if err == nil && len(b) <= _logLancerHeaderLen {
  152. continue
  153. }
  154. if err == nil {
  155. e := s.preproccess(b[:len(b)-1])
  156. if e != nil {
  157. s.writeToReadChan(e)
  158. flowmonitor.Fm.AddEvent(e, "log-agent.input.sock", "OK", "received")
  159. continue
  160. }
  161. }
  162. // conn closed and return EOF
  163. if err == io.EOF {
  164. e := s.preproccess(b)
  165. if e != nil {
  166. s.writeToReadChan(e)
  167. flowmonitor.Fm.AddEvent(e, "log-agent.input.sock", "OK", "received")
  168. continue
  169. }
  170. log.Info("get EOF from conn, close conn")
  171. return
  172. }
  173. log.Error("read from tcp conn error(%v). close conn", err)
  174. return
  175. }
  176. }
  177. //read read from unix socket conn
  178. func (s *Sock) udpread(conn net.PacketConn) {
  179. b := make([]byte, s.c.UdpPacketMaxSize)
  180. for {
  181. conn.SetReadDeadline(time.Now().Add(time.Duration(s.c.UdpReadTimeout)))
  182. l, _, err := conn.ReadFrom(b)
  183. if err != nil && !strings.Contains(err.Error(), "i/o timeout") {
  184. log.Error("conn.ReadFrom() error(%v)", err)
  185. continue
  186. }
  187. e := s.preproccess(b[:l])
  188. if e != nil {
  189. flowmonitor.Fm.AddEvent(e, "log-agent.input.sock", "OK", "received")
  190. s.writeToReadChan(e)
  191. }
  192. }
  193. }
  194. func (s *Sock) preproccess(b []byte) *event.ProcessorEvent {
  195. if len(b) <= _logLancerHeaderLen {
  196. return nil
  197. }
  198. e := event.GetEvent()
  199. e.LogId = string(b[:_logIdLen])
  200. e.Destination = lancerroute.GetLancerByLogid(e.LogId)
  201. e.Write(b[_logLancerHeaderLen:])
  202. e.Source = "sock"
  203. flowmonitor.Fm.AddEvent(e, "log-agent.input.sock", "OK", "received")
  204. return e
  205. }
  206. // pathExists judge if the file exists
  207. func pathExists(path string) (bool, error) {
  208. _, err := os.Stat(path)
  209. if err == nil {
  210. return true, nil
  211. }
  212. if os.IsNotExist(err) {
  213. return false, nil
  214. }
  215. return false, err
  216. }
  217. func (s *Sock) writetoProcessor() {
  218. for {
  219. select {
  220. case e := <-s.readChan:
  221. s.output <- e
  222. case <-s.ctx.Done():
  223. return
  224. }
  225. }
  226. }