redis.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package monitor
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "go-common/app/job/main/aegis/model/monitor"
  7. "go-common/library/cache/redis"
  8. "go-common/library/log"
  9. "strconv"
  10. "time"
  11. )
  // _maxAge is the retention window, in seconds, for the monitor sorted sets:
  // 7 days. It is applied as the key TTL when a member is added (AddToSet) and
  // as the age cutoff when old members are purged (ClearExpireSet).
  const _maxAge = 7 * 24 * 60 * 60
  16. // AddToSet add monitor stats
  17. func (d *Dao) AddToSet(c context.Context, keys []string, oid int64) (logs []string, err error) {
  18. if len(keys) == 0 {
  19. return
  20. }
  21. var (
  22. conn = d.redis.Get(c)
  23. now = time.Now().Unix()
  24. )
  25. defer conn.Close()
  26. for _, key := range keys {
  27. //先判断key是否存在,存在则忽略
  28. if v, _ := redis.Int(conn.Do("ZSCORE", key, oid)); v != 0 {
  29. logs = append(logs, fmt.Sprintf("AddToSet() conn.Do(ZSCORE, %s, %d) member exists success", key, oid))
  30. continue
  31. }
  32. if _, err = conn.Do("ZADD", key, now, oid); err != nil {
  33. log.Error("conn.Do(ZADD, %s, %d, %d) error(%v)", key, now, oid, err)
  34. logs = append(logs, fmt.Sprintf("AddToSet() conn.Do(ZADD, %s, %d, %d) error(%v)", key, now, oid, err))
  35. } else {
  36. logs = append(logs, fmt.Sprintf("AddToSet() conn.Do(ZADD, %s, %d, %d) success", key, now, oid))
  37. }
  38. if _, err = conn.Do("EXPIRE", key, _maxAge); err != nil {
  39. log.Error("conn.Do(EXPIRE, %s, %d) error(%v)", key, _maxAge, err)
  40. logs = append(logs, fmt.Sprintf("AddToSet() conn.Do(EXPIRE, %s, %d) error(%v)", key, _maxAge, err))
  41. } else {
  42. logs = append(logs, fmt.Sprintf("AddToSet() conn.Do(EXPIRE, %s, %d) success", key, _maxAge))
  43. }
  44. }
  45. return
  46. }
  47. // RemFromSet remove monitor stats
  48. func (d *Dao) RemFromSet(c context.Context, keys []string, oid int64) (logs []string, err error) {
  49. if len(keys) == 0 {
  50. return
  51. }
  52. var (
  53. conn = d.redis.Get(c)
  54. )
  55. defer conn.Close()
  56. for _, key := range keys {
  57. if _, er := conn.Do("ZREM", key, oid); er != nil {
  58. err = er
  59. log.Error("conn.Do(ZREM, %s, %d) error(%v)", key, oid, err)
  60. logs = append(logs, fmt.Sprintf("RemFromSet() conn.Do(ZREM, %s, %d) error(%v)", key, oid, err))
  61. continue
  62. }
  63. logs = append(logs, fmt.Sprintf("RemFromSet() conn.Do(ZREM, %s, %d) success", key, oid))
  64. }
  65. return
  66. }
  67. // ClearExpireSet clear expire stats
  68. func (d *Dao) ClearExpireSet(c context.Context, keys []string) (logs []string, err error) {
  69. if len(keys) == 0 {
  70. return
  71. }
  72. var (
  73. conn = d.redis.Get(c)
  74. now = time.Now().Unix()
  75. min int64
  76. max = now - _maxAge
  77. )
  78. defer conn.Close()
  79. for _, key := range keys {
  80. if _, er := conn.Do("ZREMRANGEBYSCORE", key, min, max); er != nil {
  81. err = er
  82. log.Error("conn.Do(ZREMRANGEBYSCORE, %s, %d, %d) error(%v)", key, min, max, err)
  83. logs = append(logs, fmt.Sprintf("ClearExpireSet() key: %s min:%d max:%d error:%v", key, min, max, err))
  84. continue
  85. }
  86. logs = append(logs, fmt.Sprintf("ClearExpireSet() key: %s min:%d max:%d success", key, min, max))
  87. }
  88. return
  89. }
  90. // AddToDelArc 添加稿件信息到
  91. func (d *Dao) AddToDelArc(c context.Context, a *monitor.BinlogArchive) (err error) {
  92. var (
  93. conn = d.redis.Get(c)
  94. bs []byte
  95. )
  96. defer conn.Close()
  97. info := &monitor.DelArcInfo{
  98. AID: a.ID,
  99. MID: a.MID,
  100. Time: a.MTime,
  101. Title: a.Title,
  102. }
  103. if bs, err = json.Marshal(info); err != nil {
  104. log.Error("json.Marshal(%+v) error:%v", info, err)
  105. return
  106. }
  107. if _, err = conn.Do("HSET", monitor.RedisDelArcInfo, a.ID, string(bs)); err != nil {
  108. log.Error("conn.Send(HSET,%s,%d,%s) error(%v)", monitor.RedisDelArcInfo, a.ID, bs, err)
  109. return
  110. }
  111. return
  112. }
  113. // ArcDelInfos 获取被删除稿件的信息
  114. func (d *Dao) ArcDelInfos(c context.Context, aids []int64) (infos map[int64]*monitor.DelArcInfo, err error) {
  115. var (
  116. conn = d.redis.Get(c)
  117. strs []string
  118. )
  119. defer conn.Close()
  120. infos = make(map[int64]*monitor.DelArcInfo)
  121. if len(aids) == 0 {
  122. return
  123. }
  124. args := redis.Args{}
  125. args = args.Add(monitor.RedisDelArcInfo)
  126. for _, id := range aids {
  127. args = args.Add(id)
  128. }
  129. log.Info("s.monitorNotify() ArcDelInfos. aids(%v) args(%+v)", aids, args)
  130. if strs, err = redis.Strings(conn.Do("HMGET", args...)); err != nil {
  131. log.Error("conn.Send(HMGET,%v) error(%v)", args, err)
  132. return
  133. }
  134. log.Info("s.monitorNotify() ArcDelInfos. aids(%v) strs(%v)", aids, strs)
  135. for _, v := range strs {
  136. info := &monitor.DelArcInfo{}
  137. if err = json.Unmarshal([]byte(v), info); err != nil {
  138. log.Error("json.Unmarshal(%s) error:%v", v, err)
  139. continue
  140. }
  141. infos[info.AID] = info
  142. }
  143. return
  144. }
  145. // MoniRuleStats 获取监控统计
  146. func (d *Dao) MoniRuleStats(c context.Context, id int64, min, max int64) (stats *monitor.Stats, err error) {
  147. var (
  148. conn = d.redis.Get(c)
  149. key = fmt.Sprintf(monitor.RedisPrefix, id)
  150. now = time.Now().Unix()
  151. )
  152. stats = &monitor.Stats{}
  153. defer conn.Close()
  154. if stats.TotalCount, err = redis.Int(conn.Do("ZCOUNT", key, 0, now)); err != nil {
  155. log.Error("conn.Do(ZCOUNT,%s,0,%d) error(%v)", key, now, err)
  156. return
  157. }
  158. if stats.MoniCount, err = redis.Int(conn.Do("ZCOUNT", key, min, max)); err != nil {
  159. log.Error("conn.Do(ZCOUNT,%s,%d,%d) error(%v)", key, min, max, err)
  160. return
  161. }
  162. var oldest map[string]string //进入列表最久的项
  163. oldest, err = redis.StringMap(conn.Do("ZRANGE", key, 0, 0, "WITHSCORES"))
  164. for _, t := range oldest {
  165. var i int
  166. if i, err = strconv.Atoi(t); err != nil {
  167. return
  168. }
  169. stats.MaxTime = int(now) - i
  170. }
  171. return
  172. }
  173. // MoniRuleOids 获取监控的id
  174. func (d *Dao) MoniRuleOids(c context.Context, id int64, min, max int64) (oidMap map[int64]int, err error) {
  175. var (
  176. conn = d.redis.Get(c)
  177. key = fmt.Sprintf(monitor.RedisPrefix, id)
  178. intMap map[string]int
  179. )
  180. oidMap = make(map[int64]int)
  181. intMap = make(map[string]int)
  182. defer conn.Close()
  183. if intMap, err = redis.IntMap(conn.Do("ZRANGEBYSCORE", key, min, max, "WITHSCORES")); err != nil {
  184. log.Error("redis.IntMap(conn.Do(\"ZRANGEBYSCORE\", %s, %d, %d, \"WITHSCORES\")) error(%v)", key, min, max, err)
  185. return
  186. }
  187. for k, v := range intMap {
  188. oid := 0
  189. if oid, err = strconv.Atoi(k); err != nil {
  190. log.Error("strconv.Atoi(%s) error(%v)", k, err)
  191. }
  192. oidMap[int64(oid)] = v
  193. }
  194. return
  195. }