123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- package monitor
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "go-common/app/admin/main/videoup/model/monitor"
- "go-common/library/cache/redis"
- "go-common/library/log"
- "sort"
- "strconv"
- "time"
- )
// FieldKeyFormat is the layout of a hash field name inside the monitor-rule
// configuration key. In this file it is filled with the rule's type,
// business and ID, in that order.
const FieldKeyFormat = "%d_%d_%d"
- // StatsResult 获取稿件停留统计
- func (d *Dao) StatsResult(c context.Context, key string, conf *monitor.RuleConf) (res *monitor.Stats, err error) {
- var (
- conn = d.redis.Get(c)
- totalC, moniC, maxT int
- now = time.Now().Unix()
- tFrom, tTo int64
- timeCdt int64
- compCdt string
- ok bool
- )
- defer conn.Close()
- if _, ok = conf.NotifyCdt["time"]; !ok {
- err = errors.New("配置的 NotifyCdt 中不存在 time")
- return
- }
- timeCdt = conf.NotifyCdt["time"].Value
- compCdt = conf.NotifyCdt["time"].Comp
- switch compCdt {
- case monitor.CompGT:
- tFrom = 0
- tTo = now - timeCdt
- case monitor.CompLT:
- tFrom = now - timeCdt
- tTo = now
- default:
- err = errors.New("配置的 NotifyCdt 中 comparison 不合法: " + compCdt)
- return
- }
- if totalC, err = redis.Int(conn.Do("ZCOUNT", key, 0, now)); err != nil {
- log.Error("conn.Do(ZCOUNT,%s,0,%d) error(%v)", key, now, err)
- return
- }
- if moniC, err = redis.Int(conn.Do("ZCOUNT", key, tFrom, tTo)); err != nil {
- log.Error("conn.Do(ZCOUNT,%s,%d,%d) error(%v)", key, tFrom, tTo, err)
- return
- }
- var oldest map[string]string //进入列表最久的项
- oldest, err = redis.StringMap(conn.Do("ZRANGE", key, 0, 0, "WITHSCORES"))
- for _, t := range oldest {
- var i int
- if i, err = strconv.Atoi(t); err != nil {
- return
- }
- maxT = int(now) - i
- }
- res = &monitor.Stats{
- TotalCount: totalC,
- MoniCount: moniC,
- MaxTime: maxT,
- }
- return
- }
- // GetAllRules 获取所有规则
- func (d *Dao) GetAllRules(c context.Context, all bool) (rules []*monitor.Rule, err error) {
- var (
- conn = d.redis.Get(c)
- res = make(map[string]string)
- )
- defer conn.Close()
- if res, err = redis.StringMap(conn.Do("HGETALL", monitor.RulesKey)); err != nil {
- if err != redis.ErrNil {
- log.Error("conn.Do(HGETALL, %s) error(%v)", monitor.RulesKey, err)
- return
- }
- }
- for _, v := range res {
- rule := &monitor.Rule{}
- if err = json.Unmarshal([]byte(v), rule); err != nil {
- log.Error("json.Unmarshal(%v) error(%v)", v, err)
- break
- }
- if !all && rule.State != 1 {
- continue
- }
- rules = append(rules, rule)
- }
- return
- }
- // GetRules 获取业务下的规则
- func (d *Dao) GetRules(c context.Context, tp, bid int8, all bool) (rules []*monitor.Rule, err error) {
- if rules, err = d.GetAllRules(c, all); err != nil {
- return
- }
- for k := 0; k < len(rules); k++ {
- v := rules[k]
- if v.Type != tp || v.Business != bid { //去掉非当前业务开头的配置
- rules = append(rules[:k], rules[k+1:]...)
- k--
- continue
- }
- }
- return
- }
- // SetRule 修改/添加监控规则
- func (d *Dao) SetRule(c context.Context, rule *monitor.Rule) (err error) {
- if rule.ID == 0 {
- if rule.ID, err = d.RuleIDIncKey(c); err != nil {
- return
- }
- }
- var (
- conn = d.redis.Get(c)
- field = fmt.Sprintf(FieldKeyFormat, rule.Type, rule.Business, rule.ID)
- bs []byte
- )
- defer conn.Close()
- if bs, err = json.Marshal(rule); err != nil {
- log.Error("json.Marshal(%v) error(%v)", rule, err)
- return
- }
- if _, err = conn.Do("HSET", monitor.RulesKey, field, bs); err != nil {
- log.Error("conn.Do(HSET,%s,%s,%s) error(%v)", monitor.RulesKey, field, bs, err)
- return
- }
- return
- }
- // GetRule 获取某条监控规则
- func (d *Dao) GetRule(c context.Context, tp, bid int8, id int64) (rule *monitor.Rule, err error) {
- var (
- conn = d.redis.Get(c)
- field = fmt.Sprintf(FieldKeyFormat, tp, bid, id)
- bs []byte
- )
- defer conn.Close()
- if bs, err = redis.Bytes(conn.Do("HGET", monitor.RulesKey, field)); err != nil {
- log.Error("conn.Do(HGET,%s,%s) error(%v)", monitor.RulesKey, field, err)
- return
- }
- rule = &monitor.Rule{}
- if err = json.Unmarshal(bs, rule); err != nil {
- log.Error("json.Unmarshal(%v) error(%v)", bs, err)
- return
- }
- return
- }
- // SetRuleState 修改监控规则的状态
- func (d *Dao) SetRuleState(c context.Context, tp, bid int8, id int64, state int8) (err error) {
- var (
- rule *monitor.Rule
- )
- if rule, err = d.GetRule(c, tp, bid, id); err != nil {
- return
- }
- rule.State = state
- if err = d.SetRule(c, rule); err != nil {
- return
- }
- return
- }
- // RuleIDIncKey 自增配置id
- func (d *Dao) RuleIDIncKey(c context.Context) (id int64, err error) {
- var (
- conn = d.redis.Get(c)
- )
- defer conn.Close()
- if id, err = redis.Int64(conn.Do("INCR", monitor.RuleIDIncKey)); err != nil {
- log.Error("conn.Do(INCR,%s) error(%v)", monitor.RuleIDIncKey, err)
- }
- return
- }
- // BusStatsKeys 获取某业务统计的所有keys
- func (d *Dao) BusStatsKeys(c context.Context, bid int8) (prefix string, keys []string, err error) {
- var (
- conf *monitor.KeyConf
- ok bool
- )
- if conf, ok = monitor.RedisKeyConf[bid]; !ok {
- err = errors.New("业务redis key配置不存在")
- log.Error("d.BusStatsKeys(%d) error(%v)", bid, err)
- return
- }
- prefix = fmt.Sprintf(monitor.BusPrefix, bid)
- //TODO 递归实现
- if bid == monitor.BusVideo {
- for _, v := range conf.KFields["state"] {
- key := prefix + fmt.Sprintf(monitor.SuffixVideo, v)
- keys = append(keys, key)
- }
- } else if bid == monitor.BusArc {
- for _, round := range conf.KFields["round"] {
- for _, state := range conf.KFields["state"] {
- key := prefix + fmt.Sprintf(monitor.SuffixArc, round, state)
- keys = append(keys, key)
- }
- }
- }
- return
- }
- // StayOids 获取多个key 中的滞留oid
- func (d *Dao) StayOids(c context.Context, rule *monitor.Rule, keys []string) (oidMap map[int64]int, total int, err error) {
- var (
- conn = d.redis.Get(c)
- intMap map[string]int
- min, max int64
- now = time.Now().Unix()
- )
- defer conn.Close()
- oidMap = make(map[int64]int)
- intMap = make(map[string]int)
- if _, ok := rule.RuleConf.NotifyCdt["time"]; !ok {
- log.Error("StayOids(%+v) Rule配置中NotifyCdt 没有time", *rule)
- err = errors.New(fmt.Sprintf("Rule(%d) NotifyCdt Error: no time", rule.ID))
- return
- }
- timeConf := rule.RuleConf.NotifyCdt["time"]
- switch timeConf.Comp {
- case monitor.CompGT:
- min = 0
- max = now - timeConf.Value
- case monitor.CompLT:
- min = now - timeConf.Value
- max = now
- default:
- log.Error("StayOids(%+v) Rule配置NotifyCdt中time的表达式错误", *rule)
- err = errors.New(fmt.Sprintf("Rule(%d) NotifyCdt Error: unknown time comp", rule.ID))
- return
- }
- //key排序
- sort.Strings(keys)
- //计算count 翻页
- for _, key := range keys {
- count := 0
- if count, err = redis.Int(conn.Do("ZCOUNT", key, min, max)); err != nil {
- log.Error("redis.Int(conn.Do(\"ZCOUNT\", %s, %d, %d)) error(%v)", key, min, max, err)
- return
- }
- total += count
- if intMap, err = redis.IntMap(conn.Do("ZRANGEBYSCORE", key, min, max, "WITHSCORES")); err != nil {
- log.Error("redis.IntMap(conn.Do(\"ZRANGEBYSCORE\", %s, %d, %d, \"WITHSCORES\")) error(%v)", key, min, max, err)
- return
- }
- for k, v := range intMap {
- oid := 0
- if oid, err = strconv.Atoi(k); err != nil {
- log.Error("strconv.Atoi(%s) error(%v)", k, err)
- }
- oidMap[int64(oid)] = v
- }
- }
- return
- }
- // RemMonitorStats remove stay stats
- func (d *Dao) RemMonitorStats(c context.Context, key string, oid int64) (err error) {
- var (
- conn = d.redis.Get(c)
- )
- defer conn.Close()
- if _, err = conn.Do("ZREM", key, oid); err != nil {
- log.Error("conn.Do(ZADD, %s, %d) error(%v)", key, oid, err)
- }
- return
- }
|