monitor.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. package service
  2. import (
  3. "context"
  4. "go-common/app/admin/main/videoup/model/archive"
  5. "go-common/app/admin/main/videoup/model/manager"
  6. "go-common/app/admin/main/videoup/model/monitor"
  7. "go-common/library/log"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/pkg/errors"
  12. )
  13. // MonitorResult 获取监控业务的统计结果列表
  14. func (s *Service) MonitorResult(c context.Context, p *monitor.RuleResultParams) (res []*monitor.RuleResultData, err error) {
  15. var (
  16. rules []*monitor.Rule
  17. mUser *manager.User
  18. )
  19. res = []*monitor.RuleResultData{}
  20. if rules, err = s.monitor.GetRules(c, p.Type, p.Business, true); err != nil {
  21. return
  22. }
  23. for _, v := range rules {
  24. var stats *monitor.Stats
  25. if stats, err = s.MonitorStats(c, v); err != nil {
  26. return
  27. }
  28. if mUser, err = s.mng.User(c, v.UID); err != nil {
  29. mUser = &manager.User{
  30. ID: v.UID,
  31. }
  32. err = nil
  33. }
  34. tmp := &monitor.RuleResultData{
  35. Stats: stats,
  36. User: mUser,
  37. Rule: v,
  38. }
  39. res = append(res, tmp)
  40. }
  41. return
  42. }
  43. // MonitorStats 根据business和rule获取统计结果
  44. func (s *Service) MonitorStats(c context.Context, rule *monitor.Rule) (stats *monitor.Stats, err error) {
  45. var (
  46. qualiKeys []string //符合条件的统计redis key
  47. )
  48. qualiKeys, err = s.RuleQualifiedKeys(c, rule)
  49. //根据符合条件的redis key qualiKeys获取数据
  50. stats = &monitor.Stats{}
  51. for _, k := range qualiKeys {
  52. var res *monitor.Stats
  53. if res, err = s.monitor.StatsResult(c, k, rule.RuleConf); err != nil {
  54. return
  55. }
  56. stats.TotalCount += res.TotalCount
  57. stats.MoniCount += res.MoniCount
  58. if res.MaxTime > stats.MaxTime {
  59. stats.MaxTime = res.MaxTime
  60. }
  61. }
  62. return
  63. }
  64. // MonitorRuleUpdate 更新监控规则
  65. func (s *Service) MonitorRuleUpdate(c context.Context, rule *monitor.Rule) (err error) {
  66. rule.CTime = time.Now().Format("2006-01-02 15:04:05")
  67. err = s.monitor.SetRule(c, rule)
  68. return
  69. }
  70. // RuleQualifiedKeys 获取监控业务中符合监控条件的Redis key
  71. func (s *Service) RuleQualifiedKeys(c context.Context, rule *monitor.Rule) (qualiKeys []string, err error) {
  72. var (
  73. allKeys []string //当前业务的所有统计redis key
  74. //qualiKeys []string //符合条件的统计redis key
  75. prefix string //当前业务redis 可以前缀
  76. keyConf *monitor.KeyConf //当前业务redis key 的字段配置信息
  77. moniCdt = rule.RuleConf.MoniCdt //当前业务中,需要统计的条件
  78. ok bool
  79. )
  80. qualiKeys = []string{}
  81. if keyConf, ok = monitor.RedisKeyConf[rule.Business]; !ok {
  82. err = errors.New("Business Not Exists")
  83. return
  84. }
  85. prefix, allKeys, err = s.monitor.BusStatsKeys(c, rule.Business)
  86. kFields := keyConf.KFields //Redis key中的字段
  87. for _, fullK := range allKeys {
  88. k := strings.Replace(fullK, prefix, "", 1) //去掉redis key 中的前缀monitor_stats_{business}_
  89. ks := strings.Split(k, "_") //redis key 的切割,格式:%d_%d
  90. if len(ks) != len(kFields) { //KFields中的字段数必须与redis key中数量一致
  91. err = errors.New("KeyConf 中 KFields 的字段配置错误")
  92. return
  93. }
  94. //把满足条件的Redis key找出来
  95. qualified := true //当前key是否满足条件
  96. i := -1
  97. for kf := range kFields {
  98. i++
  99. var (
  100. fv int64
  101. )
  102. fv, err = strconv.ParseInt(ks[i], 10, 64) //Redis key 中第i位的值
  103. if err != nil {
  104. return
  105. }
  106. if _, ok = moniCdt[kf]; !ok {
  107. err = errors.New("配置的 moniCdt 中不存在: " + kf)
  108. return
  109. }
  110. switch moniCdt[kf].Comp {
  111. case monitor.CompE:
  112. if fv != moniCdt[kf].Value {
  113. qualified = false
  114. }
  115. case monitor.CompNE:
  116. if fv == moniCdt[kf].Value {
  117. qualified = false
  118. }
  119. case monitor.CompGET:
  120. if fv < moniCdt[kf].Value {
  121. qualified = false
  122. }
  123. case monitor.CompLET:
  124. if fv > moniCdt[kf].Value {
  125. qualified = false
  126. }
  127. case monitor.CompGT:
  128. if fv <= moniCdt[kf].Value {
  129. qualified = false
  130. }
  131. case monitor.CompLT:
  132. if fv >= moniCdt[kf].Value {
  133. qualified = false
  134. }
  135. default:
  136. err = errors.New("配置的 MoniCdt 中 comparison 不合法: " + moniCdt[kf].Comp)
  137. return
  138. }
  139. }
  140. if qualified {
  141. qualiKeys = append(qualiKeys, fullK)
  142. }
  143. }
  144. return
  145. }
  146. // MoniStayOids 获取监控范围内,滞留的oids
  147. func (s *Service) MoniStayOids(c context.Context, tp, bid int8, id int64) (total int, oidMap map[int64]int, qualiKeys []string, err error) {
  148. var (
  149. rule *monitor.Rule
  150. )
  151. if rule, err = s.monitor.GetRule(c, tp, bid, id); err != nil {
  152. return
  153. }
  154. //查找符合条件的统计redis key
  155. if qualiKeys, err = s.RuleQualifiedKeys(c, rule); err != nil {
  156. return
  157. }
  158. oidMap, total, err = s.monitor.StayOids(c, rule, qualiKeys)
  159. log.Info("MoniStayOids(%d,%d,%d) oidMap(%v)", tp, bid, id, oidMap)
  160. return
  161. }
  162. // MonitorStayOids 获取监控范围内,滞留的oids
  163. func (s *Service) MonitorStayOids(c context.Context, id int64) (oidMap map[int64]int, err error) {
  164. return s.data.MonitorOids(c, id)
  165. }
  166. // MonitorNotifyResult 获取达到了报警阀值的数据
  167. func (s *Service) MonitorNotifyResult(c context.Context) (res []*monitor.RuleResultData, err error) {
  168. var (
  169. rules []*monitor.Rule
  170. stats *monitor.Stats
  171. )
  172. res = []*monitor.RuleResultData{}
  173. if rules, err = s.monitor.GetAllRules(c, false); err != nil {
  174. log.Error("MonitorNotifyCheck() error(%v)", err)
  175. return
  176. }
  177. for _, v := range rules {
  178. if v.Business == monitor.BusVideo {
  179. s.MonitorCheckVideoStatus(c, v.Type, v.ID)
  180. }
  181. if stats, err = s.MonitorStats(c, v); err != nil {
  182. log.Error("MonitorNotifyCheck() error(%v)", err)
  183. err = nil
  184. continue
  185. }
  186. notify := true
  187. //暂时只有time、count这两个报警条件
  188. if _, ok := v.RuleConf.NotifyCdt["time"]; ok {
  189. threshold := v.RuleConf.NotifyCdt["time"].Value
  190. comp := v.RuleConf.NotifyCdt["time"].Comp
  191. switch comp {
  192. case monitor.CompGT:
  193. if int64(stats.MaxTime) < threshold {
  194. notify = false
  195. }
  196. case monitor.CompLT:
  197. if int64(stats.MaxTime) > threshold {
  198. notify = false
  199. }
  200. }
  201. }
  202. if _, ok := v.RuleConf.NotifyCdt["count"]; ok {
  203. threshold := v.RuleConf.NotifyCdt["count"].Value
  204. comp := v.RuleConf.NotifyCdt["count"].Comp
  205. switch comp {
  206. case monitor.CompGT:
  207. if int64(stats.MoniCount) < threshold {
  208. notify = false
  209. }
  210. case monitor.CompLT:
  211. if int64(stats.MoniCount) > threshold {
  212. notify = false
  213. }
  214. }
  215. }
  216. if notify {
  217. tmp := &monitor.RuleResultData{
  218. Stats: stats,
  219. Rule: v,
  220. }
  221. res = append(res, tmp)
  222. }
  223. }
  224. return
  225. }
  226. // MonitorCheckVideoStatus 检查视频的稿件状态,如果是-100则剔除SortedSet的数据
  227. func (s *Service) MonitorCheckVideoStatus(c context.Context, tp int8, id int64) (err error) {
  228. var (
  229. vidMap map[int64]int
  230. vidAidMap map[int64]int64
  231. arcStates map[int64]int
  232. vids, aids []int64
  233. bid = monitor.BusVideo
  234. keys []string
  235. )
  236. if _, vidMap, keys, err = s.MoniStayOids(c, tp, bid, id); err != nil {
  237. log.Error("s.MoniStayOids(%d,%d,%d) error(%v)", tp, bid, id, err)
  238. return
  239. }
  240. for vid := range vidMap {
  241. vids = append(vids, vid)
  242. }
  243. if vidAidMap, err = s.arc.VideoAidMap(c, vids); err != nil {
  244. log.Error("s.VideoAidMap(%d) error(%v)", vids, err)
  245. return
  246. }
  247. for _, aid := range vidAidMap {
  248. aids = append(aids, aid)
  249. }
  250. if arcStates, err = s.arc.ArcStateMap(c, aids); err != nil {
  251. log.Error("s.ArcStateMap(%d) error(%v)", aids, err)
  252. return
  253. }
  254. for vid, aid := range vidAidMap {
  255. if _, ok := arcStates[aid]; !ok {
  256. continue
  257. }
  258. if arcStates[aid] != int(archive.StateForbidUpDelete) && arcStates[aid] != int(archive.StateForbidLock) && arcStates[aid] != int(archive.StateForbidRecycle) {
  259. continue
  260. }
  261. for _, k := range keys {
  262. if err = s.monitor.RemMonitorStats(c, k, vid); err != nil {
  263. log.Error("s.monitor.RemMonitorStats(%s,%d) error(%v)", k, vid, err)
  264. }
  265. }
  266. }
  267. return
  268. }