123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- package log
- import (
- "context"
- "fmt"
- stdlog "log"
- "net"
- "strconv"
- "sync"
- "time"
- "go-common/library/conf/env"
- "go-common/library/log/internal"
- "go-common/library/net/metadata"
- "go-common/library/net/trace"
- xtime "go-common/library/time"
- )
- const (
- _agentTimeout = xtime.Duration(20 * time.Millisecond)
- _mergeWait = 1 * time.Second
- _maxBuffer = 10 * 1024 * 1024 // 10mb
- _defaultChan = 2048
- _defaultAgentConfig = "unixpacket:///var/run/lancer/collector_tcp.sock?timeout=100ms&chan=1024"
- )
- var (
- _logSeparator = []byte("\u0001")
- _defaultTaskIDs = map[string]string{
- env.DeployEnvFat1: "000069",
- env.DeployEnvUat: "000069",
- env.DeployEnvPre: "000161",
- env.DeployEnvProd: "000161",
- }
- )
- // AgentHandler agent struct.
- type AgentHandler struct {
- c *AgentConfig
- msgs chan []core.Field
- waiter sync.WaitGroup
- pool sync.Pool
- enc core.Encoder
- batchSend bool
- filters map[string]struct{}
- }
- // AgentConfig agent config.
- type AgentConfig struct {
- TaskID string
- Buffer int
- Proto string `dsn:"network"`
- Addr string `dsn:"address"`
- Chan int `dsn:"query.chan"`
- Timeout xtime.Duration `dsn:"query.timeout"`
- }
- // NewAgent a Agent.
- func NewAgent(ac *AgentConfig) (a *AgentHandler) {
- if ac == nil {
- ac = parseDSN(_agentDSN)
- }
- if len(ac.TaskID) == 0 {
- ac.TaskID = _defaultTaskIDs[env.DeployEnv]
- }
- a = &AgentHandler{
- c: ac,
- enc: core.NewJSONEncoder(core.EncoderConfig{
- EncodeTime: core.EpochTimeEncoder,
- EncodeDuration: core.SecondsDurationEncoder,
- }, core.NewBuffer(0)),
- }
- a.pool.New = func() interface{} {
- return make([]core.Field, 0, 16)
- }
- if ac.Chan == 0 {
- ac.Chan = _defaultChan
- }
- a.msgs = make(chan []core.Field, ac.Chan)
- if ac.Timeout == 0 {
- ac.Timeout = _agentTimeout
- }
- if ac.Buffer == 0 {
- ac.Buffer = 100
- }
- a.waiter.Add(1)
- // set fixed k/v into enc buffer
- KV(_appID, c.Family).AddTo(a.enc)
- KV(_deplyEnv, env.DeployEnv).AddTo(a.enc)
- KV(_instanceID, c.Host).AddTo(a.enc)
- KV(_zone, env.Zone).AddTo(a.enc)
- if a.c.Proto == "unixpacket" {
- a.batchSend = true
- }
- go a.writeproc()
- return
- }
- func (h *AgentHandler) data() []core.Field {
- return h.pool.Get().([]core.Field)
- }
- func (h *AgentHandler) free(f []core.Field) {
- f = f[0:0]
- h.pool.Put(f)
- }
- // Log log to udp statsd daemon.
- func (h *AgentHandler) Log(ctx context.Context, lv Level, args ...D) {
- if args == nil {
- return
- }
- f := h.data()
- for i := range args {
- f = append(f, args[i])
- }
- if t, ok := trace.FromContext(ctx); ok {
- if s, ok := t.(fmt.Stringer); ok {
- f = append(f, KV(_tid, s.String()))
- } else {
- f = append(f, KV(_tid, fmt.Sprintf("%s", t)))
- }
- }
- if caller := metadata.String(ctx, metadata.Caller); caller != "" {
- f = append(f, KV(_caller, caller))
- }
- if color := metadata.String(ctx, metadata.Color); color != "" {
- f = append(f, KV(_color, color))
- }
- if cluster := metadata.String(ctx, metadata.Cluster); cluster != "" {
- f = append(f, KV(_cluster, cluster))
- }
- if metadata.Bool(ctx, metadata.Mirror) {
- f = append(f, KV(_mirror, true))
- }
- select {
- case h.msgs <- f:
- default:
- }
- }
- // writeproc write data into connection.
- func (h *AgentHandler) writeproc() {
- var (
- conn net.Conn
- err error
- count int
- quit bool
- )
- buf := core.NewBuffer(2048)
- defer h.waiter.Done()
- taskID := []byte(h.c.TaskID)
- tick := time.NewTicker(_mergeWait)
- for {
- select {
- case d := <-h.msgs:
- if d == nil {
- quit = true
- goto DUMP
- }
- if buf.Len() >= _maxBuffer {
- buf.Reset() // avoid oom
- }
- now := time.Now()
- buf.Write(taskID)
- buf.Write([]byte(strconv.FormatInt(now.UnixNano()/1e6, 10)))
- h.enc.Encode(buf, d...)
- h.free(d)
- if h.batchSend {
- buf.Write(_logSeparator)
- if count++; count < h.c.Buffer && buf.Len() < _maxBuffer {
- continue
- }
- }
- case <-tick.C:
- }
- if conn == nil || err != nil {
- if conn, err = net.DialTimeout(h.c.Proto, h.c.Addr, time.Duration(h.c.Timeout)); err != nil {
- stdlog.Printf("net.DialTimeout(%s:%s) error(%v)\n", h.c.Proto, h.c.Addr, err)
- continue
- }
- }
- DUMP:
- if conn != nil && buf.Len() > 0 {
- count = 0
- if _, err = conn.Write(buf.Bytes()); err != nil {
- stdlog.Printf("conn.Write(%d bytes) error(%v)\n", buf.Len(), err)
- conn.Close()
- } else {
- // only succeed reset buffer, let conn reconnect.
- buf.Reset()
- }
- }
- if quit {
- if conn != nil && err == nil {
- conn.Close()
- }
- return
- }
- }
- }
- // Close close the connection.
- func (h *AgentHandler) Close() (err error) {
- h.msgs <- nil
- h.waiter.Wait()
- return nil
- }
- // SetFormat .
- func (h *AgentHandler) SetFormat(string) {
- // discard setformat
- }
|