management.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. package pipeline
  2. import (
  3. "sync"
  4. "errors"
  5. "context"
  6. "sort"
  7. "time"
  8. "os"
  9. "go-common/app/service/ops/log-agent/event"
  10. "go-common/app/service/ops/log-agent/input"
  11. "go-common/app/service/ops/log-agent/processor"
  12. "go-common/app/service/ops/log-agent/output"
  13. "go-common/app/service/ops/log-agent/pkg/common"
  14. "go-common/library/log"
  15. "github.com/BurntSushi/toml"
  16. )
  17. type PipelineMng struct {
  18. Pipelines map[string]*Pipeline
  19. PipelinesLock sync.RWMutex
  20. ctx context.Context
  21. cancel context.CancelFunc
  22. scanInterval time.Duration
  23. }
  24. var PipelineManagement *PipelineMng
  25. const defaultPipeline = "defaultPipeline"
  26. func InitPipelineMng(ctx context.Context) (err error) {
  27. m := new(PipelineMng)
  28. m.Pipelines = make(map[string]*Pipeline)
  29. m.ctx, m.cancel = context.WithCancel(ctx)
  30. if err = m.StartDefaultOutput(); err != nil {
  31. return err
  32. }
  33. m.scanInterval = time.Second * 10
  34. go m.scan()
  35. m.StartDefaultPipeline()
  36. // Todo check defaultPipeline
  37. //if !m.PipelineExisted(defaultPipeline) {
  38. // return errors.New("failed to start defaultPipeline, see log for more details")
  39. //}
  40. PipelineManagement = m
  41. return nil
  42. }
  43. func (m *PipelineMng) RegisterHostFileCollector(configPath string, p *Pipeline) {
  44. m.PipelinesLock.Lock()
  45. defer m.PipelinesLock.Unlock()
  46. m.Pipelines[configPath] = p
  47. }
  48. func (m *PipelineMng) UnRegisterHostFileCollector(configPath string) {
  49. m.PipelinesLock.Lock()
  50. defer m.PipelinesLock.Unlock()
  51. delete(m.Pipelines, configPath)
  52. }
  53. func (m *PipelineMng) PipelineExisted(configPath string) bool {
  54. m.PipelinesLock.RLock()
  55. defer m.PipelinesLock.RUnlock()
  56. _, ok := m.Pipelines[configPath]
  57. return ok
  58. }
  59. func (m *PipelineMng) GetPipeline(configPath string) *Pipeline {
  60. m.PipelinesLock.RLock()
  61. defer m.PipelinesLock.RUnlock()
  62. if pipe, ok := m.Pipelines[configPath]; ok {
  63. return pipe
  64. }
  65. return nil
  66. }
  67. // pipelines get configPath list of registered pipeline
  68. func (m *PipelineMng) configPaths() []string {
  69. m.PipelinesLock.RLock()
  70. defer m.PipelinesLock.RUnlock()
  71. result := make([]string, 0, len(m.Pipelines))
  72. for p, _ := range m.Pipelines {
  73. result = append(result, p)
  74. }
  75. return result
  76. }
  77. func (m *PipelineMng) scan() {
  78. ticker := time.Tick(m.scanInterval)
  79. whiteList := make(map[string]struct{})
  80. whiteList[defaultPipeline] = struct{}{}
  81. for {
  82. select {
  83. case <-ticker:
  84. for _, configPath := range m.configPaths() {
  85. if _, ok := whiteList[configPath]; ok {
  86. continue
  87. }
  88. pipe := m.GetPipeline(configPath)
  89. // config removed
  90. if _, err := os.Stat(configPath); os.IsNotExist(err) {
  91. if pipe != nil {
  92. log.Info("config file not exist any more, stop pipeline: %s", configPath)
  93. pipe.Stop()
  94. continue
  95. }
  96. }
  97. // config updated
  98. oldMd5 := pipe.configMd5
  99. newMd5 := common.FileMd5(configPath)
  100. if oldMd5 != newMd5 {
  101. log.Info("config file updated, stop old pipeline: %s", configPath)
  102. pipe.Stop()
  103. continue
  104. }
  105. }
  106. case <-m.ctx.Done():
  107. return
  108. }
  109. }
  110. }
  111. func (m *PipelineMng) StartPipeline(ctx context.Context, configPath string, config string) () {
  112. var err error
  113. p := new(Pipeline)
  114. p.configPath = configPath
  115. p.configMd5 = common.FileMd5(configPath)
  116. ctx = context.WithValue(ctx, "configPath", configPath)
  117. p.ctx, p.cancel = context.WithCancel(ctx)
  118. defer p.Stop()
  119. var sortedOrder []string
  120. p.c = new(Config)
  121. md, err := toml.Decode(config, p.c)
  122. if err != nil {
  123. p.logError(err)
  124. return
  125. }
  126. inputToProcessor := make(chan *event.ProcessorEvent)
  127. // start input
  128. inputName := p.c.Input.Name
  129. if inputName == "" {
  130. p.logError(errors.New("type of Config can't be nil"))
  131. return
  132. }
  133. c, err := DecodeInputConfig(inputName, md, p.c.Input.Config)
  134. if err != nil {
  135. p.logError(err)
  136. return
  137. }
  138. InputFactory, err := input.GetFactory(inputName)
  139. if err != nil {
  140. p.logError(err)
  141. return
  142. }
  143. i, err := InputFactory(p.ctx, c, inputToProcessor)
  144. if err != nil {
  145. p.logError(err)
  146. return
  147. }
  148. if err = i.Run(); err != nil {
  149. p.logError(err)
  150. return
  151. }
  152. // start processor
  153. var ProcessorConnector chan *event.ProcessorEvent
  154. ProcessorConnector = inputToProcessor
  155. sortedOrder = make([]string, 0)
  156. for order, _ := range p.c.Processor {
  157. sortedOrder = append(sortedOrder, order)
  158. }
  159. sort.Strings(sortedOrder)
  160. for _, order := range sortedOrder {
  161. name := p.c.Processor[order].Name
  162. if name == "" {
  163. p.logError(errors.New("type of Processor can't be nil"))
  164. return
  165. }
  166. c, err := DecodeProcessorConfig(name, md, p.c.Processor[order].Config)
  167. if err != nil {
  168. p.logError(err)
  169. return
  170. }
  171. proc, err := processor.GetFactory(name)
  172. if err != nil {
  173. p.logError(err)
  174. return
  175. }
  176. ProcessorConnector, err = proc(p.ctx, c, ProcessorConnector)
  177. if err != nil {
  178. p.logError(err)
  179. return
  180. }
  181. }
  182. // add classify and fileLog processor by default if inputName == "file"
  183. if inputName == "file" {
  184. config := `
  185. [processor]
  186. [processor.1]
  187. type = "classify"
  188. [processor.2]
  189. type = "fileLog"
  190. `
  191. fProcessor := new(Config)
  192. md, _ := toml.Decode(config, fProcessor)
  193. fsortedOrder := make([]string, 0)
  194. for order, _ := range fProcessor.Processor {
  195. fsortedOrder = append(fsortedOrder, order)
  196. }
  197. sort.Strings(fsortedOrder)
  198. for _, order := range fsortedOrder {
  199. name := fProcessor.Processor[order].Name
  200. if name == "" {
  201. p.logError(errors.New("type of Processor can't be nil"))
  202. return
  203. }
  204. fc, err := DecodeProcessorConfig(name, md, fProcessor.Processor[order].Config)
  205. if err != nil {
  206. p.logError(err)
  207. return
  208. }
  209. proc, err := processor.GetFactory(name)
  210. if err != nil {
  211. p.logError(err)
  212. return
  213. }
  214. ProcessorConnector, err = proc(p.ctx, fc, ProcessorConnector)
  215. if err != nil {
  216. p.logError(err)
  217. return
  218. }
  219. }
  220. }
  221. // start output
  222. if p.c.Output != nil {
  223. if len(p.c.Output) > 1 {
  224. p.logError(errors.New("only One Output is allowed in One pipeline"))
  225. return
  226. }
  227. var first string
  228. for key, _ := range p.c.Output {
  229. first = key
  230. break
  231. }
  232. o, err := StartOutput(p.ctx, md, p.c.Output[first])
  233. if err != nil {
  234. p.logError(err)
  235. return
  236. }
  237. // connect processor and output
  238. output.ChanConnect(m.ctx, ProcessorConnector, o.InputChan())
  239. } else {
  240. // write to default output
  241. if err := processor.WriteToOutput(p.ctx, "", ProcessorConnector); err != nil {
  242. p.logError(err)
  243. return
  244. }
  245. }
  246. m.RegisterHostFileCollector(configPath, p)
  247. defer m.UnRegisterHostFileCollector(configPath)
  248. <-p.ctx.Done()
  249. }
  250. func (m *PipelineMng) StartDefaultPipeline() {
  251. // config := `
  252. //[input]
  253. //type = "file"
  254. //[input.config]
  255. //paths = ["/data/log-agent/log/info.log.2018-11-07.001"]
  256. //appId = "ops.billions.test"
  257. //[processor]
  258. //[output]
  259. //[output.1]
  260. //type = "stdout"
  261. //`
  262. // config := `
  263. //[input]
  264. //type = "file"
  265. //[input.config]
  266. //paths = ["/data/log-agent/log/info.log.2018-*"]
  267. //appId = "ops.billions.test"
  268. //logId = "000069"
  269. //[processor]
  270. //[processor.1]
  271. //type = "fileLog"
  272. //`
  273. config := `
  274. [input]
  275. type = "sock"
  276. [input.config]
  277. [processor]
  278. [processor.1]
  279. type = "jsonLog"
  280. [processor.2]
  281. type = "lengthCheck"
  282. [processor.3]
  283. type = "httpStream"
  284. [processor.4]
  285. type = "sample"
  286. [processor.5]
  287. type = "classify"
  288. `
  289. go m.StartPipeline(context.Background(), defaultPipeline, config)
  290. }
  291. func (m *PipelineMng) StartDefaultOutput() (err error) {
  292. var value string
  293. if value, err = output.ReadConfig(); err != nil {
  294. return err
  295. }
  296. p := new(Pipeline)
  297. p.c = new(Config)
  298. md, err := toml.Decode(value, p.c)
  299. if err != nil {
  300. return err
  301. }
  302. return StartOutputs(m.ctx, md, p.c.Output)
  303. }
  304. func StartOutputs(ctx context.Context, md toml.MetaData, config map[string]ConfigItem) (err error) {
  305. for _, item := range config {
  306. name := item.Name
  307. if name == "" {
  308. return errors.New("type of Output can't be nil")
  309. }
  310. if _, err = StartOutput(ctx, md, item); err != nil {
  311. return err
  312. }
  313. }
  314. return nil
  315. }
  316. func StartOutput(ctx context.Context, md toml.MetaData, config ConfigItem) (o output.Output, err error) {
  317. name := config.Name
  318. if name == "" {
  319. return nil, errors.New("type of Output can't be nil")
  320. }
  321. c, err := DecodeOutputConfig(name, md, config.Config)
  322. if err != nil {
  323. return nil, err
  324. }
  325. OutputFactory, err := output.GetFactory(name)
  326. if err != nil {
  327. return nil, err
  328. }
  329. o, err = OutputFactory(ctx, c)
  330. if err != nil {
  331. return nil, err
  332. }
  333. if err = o.Run(); err != nil {
  334. return nil, err
  335. }
  336. return o, nil
  337. }