main.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package main
  2. import (
  3. "os"
  4. "os/signal"
  5. "syscall"
  6. "time"
  7. "context"
  8. "flag"
  9. "fmt"
  10. "net/http"
  11. "go-common/app/service/ops/log-agent/conf"
  12. "go-common/library/log"
  13. "go-common/app/service/ops/log-agent/pipeline"
  14. "go-common/app/service/ops/log-agent/pipeline/hostlogcollector"
  15. "go-common/app/service/ops/log-agent/pipeline/dockerlogcollector"
  16. "go-common/app/service/ops/log-agent/pkg/limit"
  17. "go-common/app/service/ops/log-agent/pkg/flowmonitor"
  18. "go-common/app/service/ops/log-agent/pkg/httpstream"
  19. "go-common/app/service/ops/log-agent/pkg/lancermonitor"
  20. "go-common/app/service/ops/log-agent/pkg/lancerroute"
  21. "go-common/library/conf/env"
  22. xip "go-common/library/net/ip"
  23. "go-common/library/naming/discovery"
  24. "go-common/library/naming"
  25. )
  26. const AppVersion = "2.1.0"
  27. type Agent struct {
  28. limit *limit.Limit
  29. httpstream *httpstream.HttpStream
  30. }
  31. func main() {
  32. var (
  33. err error
  34. ctx, cancel = context.WithCancel(context.Background())
  35. //cancel context.CancelFunc
  36. )
  37. version := flag.Bool("v", false, "show version and exit")
  38. flag.Parse()
  39. if *version {
  40. fmt.Println(AppVersion)
  41. os.Exit(0)
  42. }
  43. if err = conf.Init(); err != nil {
  44. panic(err)
  45. }
  46. agent := new(Agent)
  47. // set context
  48. ctx = context.WithValue(ctx, "GlobalConfig", conf.Conf)
  49. ctx = context.WithValue(ctx, "MetaPath", conf.Conf.HostLogCollector.MetaPath)
  50. // init xlog
  51. conf.Conf.Log.Stdout = true // ooooo just for test
  52. log.Init(conf.Conf.Log)
  53. defer log.Close()
  54. log.Info("log agent [version: %s] start", AppVersion)
  55. // resource limit by cgroup
  56. if conf.Conf.Limit != nil {
  57. conf.Conf.Limit.AppName = "log-agent"
  58. if agent.limit, err = limit.LimitRes(conf.Conf.Limit); err != nil {
  59. log.Warn("resource limit disabled: %s", err)
  60. }
  61. }
  62. // /debug/vars
  63. if conf.Conf.DebugAddr != "" {
  64. go http.ListenAndServe(conf.Conf.DebugAddr, nil)
  65. }
  66. // init lancer route
  67. err = lancerroute.InitLancerRoute()
  68. if err != nil {
  69. panic(err)
  70. }
  71. // httpstream
  72. if agent.httpstream, err = httpstream.NewHttpStream(conf.Conf.HttpStream); err != nil {
  73. log.Warn("httpstream disabled: %s", err)
  74. }
  75. // start pipeline management
  76. err = pipeline.InitPipelineMng(ctx)
  77. if err != nil {
  78. panic(err)
  79. }
  80. // start host log collector
  81. err = hostlogcollector.InitHostLogCollector(ctx, conf.Conf.HostLogCollector)
  82. if err != nil {
  83. panic(err)
  84. }
  85. // start docker log collector
  86. err = dockerlogcollector.InitDockerLogCollector(ctx, conf.Conf.DockerLogCollector)
  87. if err != nil {
  88. log.Error("failed to start docker log collector: %s", err)
  89. }
  90. // flow monitor
  91. if conf.Conf.Flowmonitor != nil {
  92. if err = flowmonitor.InitFlowMonitor(conf.Conf.Flowmonitor); err != nil {
  93. panic(err)
  94. }
  95. }
  96. // lancer monitor
  97. if conf.Conf.LancerMonitor != nil {
  98. if _, err = lancermonitor.InitLancerMonitor(conf.Conf.LancerMonitor); err != nil {
  99. panic(err)
  100. }
  101. }
  102. // start discovery register
  103. if env.IP == "" {
  104. ip := xip.InternalIP()
  105. dis := discovery.New(conf.Conf.Discovery)
  106. ins := &naming.Instance{
  107. Zone: env.Zone,
  108. Env: env.DeployEnv,
  109. AppID: env.AppID,
  110. Hostname: env.Hostname,
  111. Version: AppVersion,
  112. Addrs: []string{
  113. ip,
  114. },
  115. }
  116. _, err = dis.Register(ctx, ins)
  117. if err != nil {
  118. panic(err)
  119. }
  120. }
  121. // signal
  122. ch := make(chan os.Signal, 1)
  123. signal.Notify(ch, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGSTOP)
  124. for {
  125. s := <-ch
  126. log.Info("agent get a signal %s", s.String())
  127. switch s {
  128. case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP, syscall.SIGINT, syscall.SIGHUP:
  129. if cancel != nil {
  130. cancel()
  131. }
  132. time.Sleep(time.Second)
  133. return
  134. default:
  135. return
  136. }
  137. }
  138. }