alert.go 9.1 KB


  1. package service
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "strings"
  8. "go-common/app/interface/openplatform/monitor-end/model"
  9. "go-common/app/interface/openplatform/monitor-end/model/monitor"
  10. "go-common/library/ecode"
  11. )
  12. var (
  13. _duplicateNameErr = errors.New("组名已经存在!")
  14. _duplicateTargetErr = errors.New("已经存在该告警sub_event!")
  15. _invalidTargetIDErr = errors.New("无效的id!")
  16. _duplicateProductErr = errors.New("重复的product name")
  17. )
  18. func targetKey(t *model.Target) string {
  19. if t.Source == "" || t.Product == "" || t.Event == "" || t.SubEvent == "" {
  20. return ""
  21. }
  22. return fmt.Sprintf("%s_%s_%s_%s", t.Source, t.Product, t.Event, t.SubEvent)
  23. }
  24. func productKey(p string) string {
  25. if p == "" {
  26. return ""
  27. }
  28. return fmt.Sprintf("%s", p)
  29. }
  30. func (s *Service) loadalertsettings() {
  31. var (
  32. c = context.Background()
  33. gm = make(map[int64]*model.Group)
  34. tm = make(map[int64]*model.Target)
  35. tmk = make(map[string]*model.Target)
  36. tmn = make(map[string]*model.Target)
  37. pm = make(map[int64]*model.Product)
  38. pmk = make(map[string]*model.Product)
  39. )
  40. if gs, err := s.dao.AllGroups(c); err == nil {
  41. for _, g := range gs {
  42. gm[g.ID] = g
  43. }
  44. }
  45. if ts, err := s.dao.AllTargets(c, 1); err == nil {
  46. for _, t := range ts {
  47. tm[t.ID] = t
  48. if key := targetKey(t); key != "" {
  49. tmk[key] = t
  50. }
  51. }
  52. }
  53. if ts, err := s.dao.AllTargets(c, 0); err == nil {
  54. for _, t := range ts {
  55. if key := targetKey(t); key != "" {
  56. tmn[key] = t
  57. }
  58. }
  59. }
  60. if ps, err := s.dao.AllProducts(c); err == nil {
  61. for _, p := range ps {
  62. pm[p.ID] = p
  63. if key := productKey(p.Name); err == nil {
  64. pmk[key] = p
  65. }
  66. }
  67. }
  68. s.mapMutex.Lock()
  69. s.groups = gm
  70. s.targets = tm
  71. s.products = pm
  72. s.targetKeys = tmk
  73. s.productKeys = pmk
  74. s.newTargets = tmn
  75. s.mapMutex.Unlock()
  76. }
  77. // AddGroup add a new group.
  78. func (s *Service) AddGroup(c context.Context, group *model.Group) (id int64, err error) {
  79. var g *model.Group
  80. if group.Name == "" || group.Receivers == "" || group.Interval < 0 {
  81. err = ecode.RequestErr
  82. return
  83. }
  84. if g, err = s.GroupByName(c, group.Name); err != nil {
  85. return
  86. }
  87. if g.ID > 0 {
  88. err = _duplicateNameErr
  89. return
  90. }
  91. if group.Interval == 0 {
  92. group.Interval = 30
  93. }
  94. return s.dao.AddGroup(c, group)
  95. }
  96. // UpdateGroup update group.
  97. func (s *Service) UpdateGroup(c context.Context, group *model.Group) (err error) {
  98. var g *model.Group
  99. if group.ID == 0 || group.Name == "" || group.Receivers == "" || group.Interval < 0 {
  100. err = ecode.RequestErr
  101. return
  102. }
  103. if g, err = s.GroupByName(c, group.Name); err != nil {
  104. return
  105. }
  106. if g.ID != 0 && g.ID != group.ID {
  107. err = _duplicateNameErr
  108. return
  109. }
  110. if group.Interval == 0 {
  111. group.Interval = 30
  112. }
  113. _, err = s.dao.UpdateGroup(c, group)
  114. return
  115. }
  116. // DeleteGroup delete group.
  117. func (s *Service) DeleteGroup(c context.Context, id int64) (err error) {
  118. if id == 0 {
  119. err = ecode.RequestErr
  120. return
  121. }
  122. _, err = s.dao.DeleteGroup(c, id)
  123. return
  124. }
  125. // GroupList return all groups.
  126. func (s *Service) GroupList(c context.Context, params *model.GroupListParams) (res *model.Groups, err error) {
  127. res = &model.Groups{}
  128. if res.Groups, err = s.dao.AllGroups(c); err != nil {
  129. return
  130. }
  131. res.Total = len(res.Groups)
  132. return
  133. }
  134. // GroupByName get group by name.
  135. func (s *Service) GroupByName(c context.Context, name string) (res *model.Group, err error) {
  136. return s.dao.GroupByName(c, name)
  137. }
  138. // Target get target by id.
  139. func (s *Service) Target(c context.Context, id int64) (res *model.Target, err error) {
  140. return s.dao.Target(c, id)
  141. }
  142. // AddTarget add a new target.
  143. func (s *Service) AddTarget(c context.Context, t *model.Target) (id int64, err error) {
  144. if t.SubEvent == "" || t.Event == "" || t.Product == "" || t.Source == "" {
  145. err = ecode.RequestErr
  146. return
  147. }
  148. if err = s.checkTarget(c, t, true); err != nil {
  149. return
  150. }
  151. return s.dao.AddTarget(c, t)
  152. }
  153. // UpdateTarget update target.
  154. func (s *Service) UpdateTarget(c context.Context, t *model.Target) (err error) {
  155. var (
  156. oldTarget *model.Target
  157. )
  158. if oldTarget, err = s.Target(c, t.ID); err != nil {
  159. return
  160. }
  161. if oldTarget == nil {
  162. err = _invalidTargetIDErr
  163. return
  164. }
  165. mergeTarget(t, oldTarget)
  166. if err = s.checkTarget(c, t, false); err != nil {
  167. return
  168. }
  169. _, err = s.dao.UpdateTarget(c, t)
  170. return
  171. }
  172. // TargetList .
  173. func (s *Service) TargetList(c context.Context, t *model.Target, pn int, ps int, sort string) (res *model.Targets, err error) {
  174. // query := "SELECT id, sub_event, event, product, source, group_id, threshold, duration, state FROM target"
  175. var (
  176. where = ""
  177. order = ""
  178. empty struct{}
  179. sortFields = map[string]struct{}{
  180. "sub_event": empty,
  181. "mtime": empty,
  182. "ctime": empty,
  183. "state": empty,
  184. }
  185. sortOrder = map[string]string{
  186. "0": "DESC",
  187. "1": "ASC",
  188. }
  189. )
  190. if t.SubEvent != "" {
  191. where += " sub_event LIKE '%" + t.SubEvent + "%'"
  192. }
  193. if t.Event != "" {
  194. where += " event = '" + t.Event + "'"
  195. }
  196. if t.Product != "" {
  197. where += " product = '" + t.Product + "'"
  198. }
  199. if t.Source != "" {
  200. where += " source = '" + t.Source + "'"
  201. }
  202. if t.States != "" {
  203. where += " state in (" + t.States + ")"
  204. }
  205. if where == "" {
  206. where = " WHERE" + where + " deleted_time = 0"
  207. } else {
  208. where = " WHERE" + where + " AND deleted_time = 0"
  209. }
  210. countWhere := where
  211. pn = (pn - 1) * ps
  212. sorts := strings.Split(sort, ",")
  213. if len(sorts) == 2 {
  214. _, ok1 := sortFields[sorts[0]]
  215. d, ok2 := sortOrder[sorts[1]]
  216. if ok1 && ok2 {
  217. order = " ORDER BY " + sorts[0] + " " + d
  218. }
  219. }
  220. where += order
  221. where += fmt.Sprintf(" LIMIT %d, %d", pn, ps)
  222. res = &model.Targets{}
  223. if res.Targets, err = s.dao.TargetsByQuery(c, where); err != nil {
  224. return
  225. }
  226. if res.Total, err = s.dao.CountTargets(c, countWhere); err != nil {
  227. return
  228. }
  229. res.Page = pn
  230. res.PageSize = ps
  231. return
  232. }
  233. // TargetSync sync target state.
  234. func (s *Service) TargetSync(c context.Context, id int64, state int) (err error) {
  235. return s.dao.TargetSync(c, id, state)
  236. }
  237. // DeleteTarget delete target by id.
  238. func (s *Service) DeleteTarget(c context.Context, id int64) (err error) {
  239. _, err = s.dao.DeleteTarget(c, id)
  240. return
  241. }
  242. func (s *Service) checkTarget(c context.Context, t *model.Target, isNew bool) (err error) {
  243. var id int64
  244. t.SubEvent = induceSubEvent(t.SubEvent)
  245. if id, err = s.dao.IsExisted(c, t); err != nil {
  246. return
  247. }
  248. if isNew && id != 0 {
  249. fmt.Println("id", id)
  250. err = _duplicateTargetErr
  251. }
  252. if !isNew && id != 0 && t.ID != id {
  253. err = _duplicateTargetErr
  254. }
  255. return
  256. }
  257. func mergeTarget(t *model.Target, o *model.Target) {
  258. te := reflect.ValueOf(t).Elem()
  259. oe := reflect.ValueOf(o).Elem()
  260. for i := 0; i < te.NumField()-2; i++ {
  261. switch v := te.Field(i).Interface().(type) {
  262. case int, int64:
  263. if v == 0 {
  264. te.Field(i).Set(oe.Field(i))
  265. }
  266. case string:
  267. if v == "" {
  268. te.Field(i).Set(oe.Field(i))
  269. }
  270. }
  271. }
  272. }
  273. // AddProduct add a new group.
  274. func (s *Service) AddProduct(c context.Context, p *model.Product) (id int64, err error) {
  275. if p.Name == "" || p.GroupIDs == "" {
  276. err = ecode.RequestErr
  277. return
  278. }
  279. var a *model.Product
  280. if a, err = s.dao.ProductByName(c, p.Name); err != nil {
  281. return
  282. }
  283. if a != nil {
  284. err = _duplicateProductErr
  285. return
  286. }
  287. id, err = s.dao.AddProduct(c, p)
  288. return
  289. }
  290. // UpdateProduct update product.
  291. func (s *Service) UpdateProduct(c context.Context, p *model.Product) (err error) {
  292. if p.ID == 0 || p.Name == "" || p.GroupIDs == "" {
  293. err = ecode.RequestErr
  294. return
  295. }
  296. var a *model.Product
  297. if a, err = s.dao.ProductByName(c, p.Name); err != nil {
  298. return
  299. }
  300. if a != nil && a.ID != p.ID {
  301. err = _duplicateProductErr
  302. return
  303. }
  304. _, err = s.dao.UpdateProduct(c, p)
  305. return
  306. }
  307. // DeleteProduct delete product.
  308. func (s *Service) DeleteProduct(c context.Context, id int64) (err error) {
  309. _, err = s.dao.DeleteProduct(c, id)
  310. return
  311. }
  312. // AllProducts return all products.
  313. func (s *Service) AllProducts(c context.Context) (res *model.Products, err error) {
  314. res = &model.Products{}
  315. if res.Products, err = s.dao.AllProducts(c); err != nil {
  316. return
  317. }
  318. res.Total = len(res.Products)
  319. return
  320. }
  321. // Collect collect.
  322. func (s *Service) Collect(c context.Context, p *monitor.Log) {
  323. var (
  324. curr int
  325. key string
  326. t *model.Target
  327. )
  328. target := &model.Target{
  329. Source: sourceFromLog(p),
  330. Product: p.Product,
  331. Event: p.Event,
  332. SubEvent: induceSubEvent(p.SubEvent),
  333. State: 0,
  334. }
  335. s.infoCh <- p
  336. //TODO 获取buvid, ip等信息,过滤重复请求
  337. if key = targetKey(target); key == "" {
  338. return
  339. }
  340. if t = s.targetKeys[key]; t == nil {
  341. if s.newTargets[key] == nil {
  342. // 添加新的target
  343. s.AddTarget(c, target)
  344. }
  345. s.mapMutex.Lock()
  346. s.newTargets[key] = t
  347. s.mapMutex.Unlock()
  348. return
  349. }
  350. if t.Threshold == 0 {
  351. return
  352. }
  353. p.CalCode()
  354. code := codeFromLog(p)
  355. curr = s.dao.TargetIncr(c, t, code)
  356. if curr > t.Threshold {
  357. go s.mail(c, p, t, curr, code)
  358. }
  359. }
  360. func sourceFromLog(l *monitor.Log) string {
  361. if l.Type == "web/h5" {
  362. return l.Type
  363. }
  364. if l.RequestURI == "" {
  365. return "app"
  366. }
  367. if res := strings.Split(l.RequestURI, "?"); len(res) > 1 {
  368. return res[1]
  369. }
  370. return l.RequestURI
  371. }
  372. func codeFromLog(l *monitor.Log) string {
  373. if l.Codes == "" {
  374. return "999"
  375. }
  376. if l.HTTPCode != "" && l.HTTPCode != "200" {
  377. return l.HTTPCode
  378. }
  379. if l.BusinessCode != "" && l.BusinessCode != "0" {
  380. return l.BusinessCode
  381. }
  382. return "-999"
  383. }