redis.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. package monitor
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "go-common/app/admin/main/videoup/model/monitor"
  8. "go-common/library/cache/redis"
  9. "go-common/library/log"
  10. "sort"
  11. "strconv"
  12. "time"
  13. )
  14. const (
  15. FieldKeyFormat = "%d_%d_%d" //监控规则配置的Redis key中的field格式
  16. )
  17. // StatsResult 获取稿件停留统计
  18. func (d *Dao) StatsResult(c context.Context, key string, conf *monitor.RuleConf) (res *monitor.Stats, err error) {
  19. var (
  20. conn = d.redis.Get(c)
  21. totalC, moniC, maxT int
  22. now = time.Now().Unix()
  23. tFrom, tTo int64
  24. timeCdt int64
  25. compCdt string
  26. ok bool
  27. )
  28. defer conn.Close()
  29. if _, ok = conf.NotifyCdt["time"]; !ok {
  30. err = errors.New("配置的 NotifyCdt 中不存在 time")
  31. return
  32. }
  33. timeCdt = conf.NotifyCdt["time"].Value
  34. compCdt = conf.NotifyCdt["time"].Comp
  35. switch compCdt {
  36. case monitor.CompGT:
  37. tFrom = 0
  38. tTo = now - timeCdt
  39. case monitor.CompLT:
  40. tFrom = now - timeCdt
  41. tTo = now
  42. default:
  43. err = errors.New("配置的 NotifyCdt 中 comparison 不合法: " + compCdt)
  44. return
  45. }
  46. if totalC, err = redis.Int(conn.Do("ZCOUNT", key, 0, now)); err != nil {
  47. log.Error("conn.Do(ZCOUNT,%s,0,%d) error(%v)", key, now, err)
  48. return
  49. }
  50. if moniC, err = redis.Int(conn.Do("ZCOUNT", key, tFrom, tTo)); err != nil {
  51. log.Error("conn.Do(ZCOUNT,%s,%d,%d) error(%v)", key, tFrom, tTo, err)
  52. return
  53. }
  54. var oldest map[string]string //进入列表最久的项
  55. oldest, err = redis.StringMap(conn.Do("ZRANGE", key, 0, 0, "WITHSCORES"))
  56. for _, t := range oldest {
  57. var i int
  58. if i, err = strconv.Atoi(t); err != nil {
  59. return
  60. }
  61. maxT = int(now) - i
  62. }
  63. res = &monitor.Stats{
  64. TotalCount: totalC,
  65. MoniCount: moniC,
  66. MaxTime: maxT,
  67. }
  68. return
  69. }
  70. // GetAllRules 获取所有规则
  71. func (d *Dao) GetAllRules(c context.Context, all bool) (rules []*monitor.Rule, err error) {
  72. var (
  73. conn = d.redis.Get(c)
  74. res = make(map[string]string)
  75. )
  76. defer conn.Close()
  77. if res, err = redis.StringMap(conn.Do("HGETALL", monitor.RulesKey)); err != nil {
  78. if err != redis.ErrNil {
  79. log.Error("conn.Do(HGETALL, %s) error(%v)", monitor.RulesKey, err)
  80. return
  81. }
  82. }
  83. for _, v := range res {
  84. rule := &monitor.Rule{}
  85. if err = json.Unmarshal([]byte(v), rule); err != nil {
  86. log.Error("json.Unmarshal(%v) error(%v)", v, err)
  87. break
  88. }
  89. if !all && rule.State != 1 {
  90. continue
  91. }
  92. rules = append(rules, rule)
  93. }
  94. return
  95. }
  96. // GetRules 获取业务下的规则
  97. func (d *Dao) GetRules(c context.Context, tp, bid int8, all bool) (rules []*monitor.Rule, err error) {
  98. if rules, err = d.GetAllRules(c, all); err != nil {
  99. return
  100. }
  101. for k := 0; k < len(rules); k++ {
  102. v := rules[k]
  103. if v.Type != tp || v.Business != bid { //去掉非当前业务开头的配置
  104. rules = append(rules[:k], rules[k+1:]...)
  105. k--
  106. continue
  107. }
  108. }
  109. return
  110. }
  111. // SetRule 修改/添加监控规则
  112. func (d *Dao) SetRule(c context.Context, rule *monitor.Rule) (err error) {
  113. if rule.ID == 0 {
  114. if rule.ID, err = d.RuleIDIncKey(c); err != nil {
  115. return
  116. }
  117. }
  118. var (
  119. conn = d.redis.Get(c)
  120. field = fmt.Sprintf(FieldKeyFormat, rule.Type, rule.Business, rule.ID)
  121. bs []byte
  122. )
  123. defer conn.Close()
  124. if bs, err = json.Marshal(rule); err != nil {
  125. log.Error("json.Marshal(%v) error(%v)", rule, err)
  126. return
  127. }
  128. if _, err = conn.Do("HSET", monitor.RulesKey, field, bs); err != nil {
  129. log.Error("conn.Do(HSET,%s,%s,%s) error(%v)", monitor.RulesKey, field, bs, err)
  130. return
  131. }
  132. return
  133. }
  134. // GetRule 获取某条监控规则
  135. func (d *Dao) GetRule(c context.Context, tp, bid int8, id int64) (rule *monitor.Rule, err error) {
  136. var (
  137. conn = d.redis.Get(c)
  138. field = fmt.Sprintf(FieldKeyFormat, tp, bid, id)
  139. bs []byte
  140. )
  141. defer conn.Close()
  142. if bs, err = redis.Bytes(conn.Do("HGET", monitor.RulesKey, field)); err != nil {
  143. log.Error("conn.Do(HGET,%s,%s) error(%v)", monitor.RulesKey, field, err)
  144. return
  145. }
  146. rule = &monitor.Rule{}
  147. if err = json.Unmarshal(bs, rule); err != nil {
  148. log.Error("json.Unmarshal(%v) error(%v)", bs, err)
  149. return
  150. }
  151. return
  152. }
  153. // SetRuleState 修改监控规则的状态
  154. func (d *Dao) SetRuleState(c context.Context, tp, bid int8, id int64, state int8) (err error) {
  155. var (
  156. rule *monitor.Rule
  157. )
  158. if rule, err = d.GetRule(c, tp, bid, id); err != nil {
  159. return
  160. }
  161. rule.State = state
  162. if err = d.SetRule(c, rule); err != nil {
  163. return
  164. }
  165. return
  166. }
  167. // RuleIDIncKey 自增配置id
  168. func (d *Dao) RuleIDIncKey(c context.Context) (id int64, err error) {
  169. var (
  170. conn = d.redis.Get(c)
  171. )
  172. defer conn.Close()
  173. if id, err = redis.Int64(conn.Do("INCR", monitor.RuleIDIncKey)); err != nil {
  174. log.Error("conn.Do(INCR,%s) error(%v)", monitor.RuleIDIncKey, err)
  175. }
  176. return
  177. }
  178. // BusStatsKeys 获取某业务统计的所有keys
  179. func (d *Dao) BusStatsKeys(c context.Context, bid int8) (prefix string, keys []string, err error) {
  180. var (
  181. conf *monitor.KeyConf
  182. ok bool
  183. )
  184. if conf, ok = monitor.RedisKeyConf[bid]; !ok {
  185. err = errors.New("业务redis key配置不存在")
  186. log.Error("d.BusStatsKeys(%d) error(%v)", bid, err)
  187. return
  188. }
  189. prefix = fmt.Sprintf(monitor.BusPrefix, bid)
  190. //TODO 递归实现
  191. if bid == monitor.BusVideo {
  192. for _, v := range conf.KFields["state"] {
  193. key := prefix + fmt.Sprintf(monitor.SuffixVideo, v)
  194. keys = append(keys, key)
  195. }
  196. } else if bid == monitor.BusArc {
  197. for _, round := range conf.KFields["round"] {
  198. for _, state := range conf.KFields["state"] {
  199. key := prefix + fmt.Sprintf(monitor.SuffixArc, round, state)
  200. keys = append(keys, key)
  201. }
  202. }
  203. }
  204. return
  205. }
  206. // StayOids 获取多个key 中的滞留oid
  207. func (d *Dao) StayOids(c context.Context, rule *monitor.Rule, keys []string) (oidMap map[int64]int, total int, err error) {
  208. var (
  209. conn = d.redis.Get(c)
  210. intMap map[string]int
  211. min, max int64
  212. now = time.Now().Unix()
  213. )
  214. defer conn.Close()
  215. oidMap = make(map[int64]int)
  216. intMap = make(map[string]int)
  217. if _, ok := rule.RuleConf.NotifyCdt["time"]; !ok {
  218. log.Error("StayOids(%+v) Rule配置中NotifyCdt 没有time", *rule)
  219. err = errors.New(fmt.Sprintf("Rule(%d) NotifyCdt Error: no time", rule.ID))
  220. return
  221. }
  222. timeConf := rule.RuleConf.NotifyCdt["time"]
  223. switch timeConf.Comp {
  224. case monitor.CompGT:
  225. min = 0
  226. max = now - timeConf.Value
  227. case monitor.CompLT:
  228. min = now - timeConf.Value
  229. max = now
  230. default:
  231. log.Error("StayOids(%+v) Rule配置NotifyCdt中time的表达式错误", *rule)
  232. err = errors.New(fmt.Sprintf("Rule(%d) NotifyCdt Error: unknown time comp", rule.ID))
  233. return
  234. }
  235. //key排序
  236. sort.Strings(keys)
  237. //计算count 翻页
  238. for _, key := range keys {
  239. count := 0
  240. if count, err = redis.Int(conn.Do("ZCOUNT", key, min, max)); err != nil {
  241. log.Error("redis.Int(conn.Do(\"ZCOUNT\", %s, %d, %d)) error(%v)", key, min, max, err)
  242. return
  243. }
  244. total += count
  245. if intMap, err = redis.IntMap(conn.Do("ZRANGEBYSCORE", key, min, max, "WITHSCORES")); err != nil {
  246. log.Error("redis.IntMap(conn.Do(\"ZRANGEBYSCORE\", %s, %d, %d, \"WITHSCORES\")) error(%v)", key, min, max, err)
  247. return
  248. }
  249. for k, v := range intMap {
  250. oid := 0
  251. if oid, err = strconv.Atoi(k); err != nil {
  252. log.Error("strconv.Atoi(%s) error(%v)", k, err)
  253. }
  254. oidMap[int64(oid)] = v
  255. }
  256. }
  257. return
  258. }
  259. // RemMonitorStats remove stay stats
  260. func (d *Dao) RemMonitorStats(c context.Context, key string, oid int64) (err error) {
  261. var (
  262. conn = d.redis.Get(c)
  263. )
  264. defer conn.Close()
  265. if _, err = conn.Do("ZREM", key, oid); err != nil {
  266. log.Error("conn.Do(ZADD, %s, %d) error(%v)", key, oid, err)
  267. }
  268. return
  269. }