redis.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "go-common/app/service/main/antispam/model"
  6. "go-common/library/cache/redis"
  7. "go-common/library/log"
  8. )
  9. const (
  10. _regexpsKey = "regexps"
  11. _localCountsKey = "resource_id:%d:keyword_id:%d:local_limit_counts"
  12. _totalCountsKey = "keyword_id:%d:total_counts"
  13. _globalCountsKey = "keyword_id:%d:global_limit_counts"
  14. _keywordsSenderIDsKey = "keyword_id:%d:sender_ids"
  15. _rulesKey = "rule:area:%s:limit_type:%s"
  16. _areaSendersKey = "area:%s:sender_id:%d"
  17. )
  18. func sendersKey(keywordID int64) string {
  19. return fmt.Sprintf(_keywordsSenderIDsKey, keywordID)
  20. }
  21. func areaSendersKey(area string, senderID int64) string {
  22. return fmt.Sprintf(_areaSendersKey, area, senderID)
  23. }
  24. func totalCountsKey(keywordID int64) string {
  25. return fmt.Sprintf(_totalCountsKey, keywordID)
  26. }
  27. func localCountsKey(keywordID, oid int64) string {
  28. return fmt.Sprintf(_localCountsKey, oid, keywordID)
  29. }
  30. func globalCountsKey(keywordID int64) string {
  31. return fmt.Sprintf(_globalCountsKey, keywordID)
  32. }
  33. func rulesKey(area, limitType string) string {
  34. return fmt.Sprintf(_rulesKey, area, limitType)
  35. }
  36. // pingRedis check redis connection
  37. func (d *Dao) pingRedis(c context.Context) (err error) {
  38. conn := d.redis.Get(c)
  39. _, err = conn.Do("SET", "PING", "PONG")
  40. conn.Close()
  41. return
  42. }
  43. // CntSendersCache .
  44. func (d *Dao) CntSendersCache(c context.Context, keywordID int64) (cnt int64, err error) {
  45. var (
  46. key = sendersKey(keywordID)
  47. conn = d.redis.Get(c)
  48. )
  49. defer conn.Close()
  50. if cnt, err = redis.Int64(conn.Do("ZCARD", key)); err != nil {
  51. log.Error("redis.Int64(conn.Do(ZCARD, %s)) error(%v)", key, err)
  52. }
  53. return
  54. }
  55. // GlobalLocalLimitCache .
  56. func (d *Dao) GlobalLocalLimitCache(c context.Context, keywordID, oid int64) ([]int64, error) {
  57. var (
  58. globalKey = globalCountsKey(keywordID)
  59. localKey = localCountsKey(keywordID, oid)
  60. conn = d.redis.Get(c)
  61. )
  62. defer conn.Close()
  63. if err := conn.Send("GET", globalKey); err != nil {
  64. log.Error("%v", err)
  65. return nil, err
  66. }
  67. if err := conn.Send("GET", localKey); err != nil {
  68. log.Error("%v", err)
  69. return nil, err
  70. }
  71. if err := conn.Flush(); err != nil {
  72. log.Error("%v", err)
  73. return nil, err
  74. }
  75. counts := make([]int64, 0)
  76. for i := 0; i < 2; i++ {
  77. count, err := redis.Int64(conn.Receive())
  78. if err == nil || err == redis.ErrNil {
  79. counts = append(counts, count)
  80. continue
  81. }
  82. log.Error("%v", err)
  83. return nil, err
  84. }
  85. return counts, nil
  86. }
  87. // IncrGlobalLimitCache .
  88. func (d *Dao) IncrGlobalLimitCache(c context.Context, keywordID int64) (int64, error) {
  89. var (
  90. key = globalCountsKey(keywordID)
  91. conn = d.redis.Get(c)
  92. )
  93. defer conn.Close()
  94. count, err := redis.Int64(conn.Do("INCR", key))
  95. if err != nil {
  96. log.Error("%v", err)
  97. return 0, err
  98. }
  99. return count, nil
  100. }
  101. // IncrLocalLimitCache .
  102. func (d *Dao) IncrLocalLimitCache(c context.Context, keywordID, oid int64) (int64, error) {
  103. var (
  104. key = localCountsKey(keywordID, oid)
  105. conn = d.redis.Get(c)
  106. )
  107. defer conn.Close()
  108. count, err := redis.Int64(conn.Do("INCR", key))
  109. if err != nil {
  110. log.Error("%v", err)
  111. return 0, err
  112. }
  113. return count, nil
  114. }
  115. // LocalLimitExpire .
  116. func (d *Dao) LocalLimitExpire(c context.Context, keywordID, oid, dur int64) error {
  117. var (
  118. key = localCountsKey(keywordID, oid)
  119. conn = d.redis.Get(c)
  120. )
  121. defer conn.Close()
  122. if _, err := conn.Do("EXPIRE", key, dur); err != nil {
  123. log.Error("%v", err)
  124. return err
  125. }
  126. return nil
  127. }
  128. // GlobalLimitExpire .
  129. func (d *Dao) GlobalLimitExpire(c context.Context, keywordID, dur int64) error {
  130. var (
  131. key = globalCountsKey(keywordID)
  132. conn = d.redis.Get(c)
  133. )
  134. defer conn.Close()
  135. if _, err := conn.Do("EXPIRE", key, dur); err != nil {
  136. log.Error("%v", err)
  137. return err
  138. }
  139. return nil
  140. }
  141. // DelRegexpCache .
  142. func (d *Dao) DelRegexpCache(c context.Context) error {
  143. conn := d.redis.Get(c)
  144. defer conn.Close()
  145. if _, err := conn.Do("DEL", _regexpsKey); err != nil {
  146. log.Error("%v", err)
  147. return err
  148. }
  149. return nil
  150. }
  151. // DelRulesCache .
  152. func (d *Dao) DelRulesCache(c context.Context, area, limitType string) error {
  153. var (
  154. key = rulesKey(area, limitType)
  155. conn = d.redis.Get(c)
  156. )
  157. defer conn.Close()
  158. if _, err := conn.Do("DEL", key); err != nil {
  159. log.Error("%v", err)
  160. return err
  161. }
  162. return nil
  163. }
  164. // AreaSendersExpire .
  165. func (d *Dao) AreaSendersExpire(c context.Context, area string, senderID, dur int64) error {
  166. var (
  167. key = areaSendersKey(area, senderID)
  168. conn = d.redis.Get(c)
  169. )
  170. defer conn.Close()
  171. if _, err := conn.Do("EXPIRE", key, dur); err != nil {
  172. log.Error("%v", err)
  173. return err
  174. }
  175. return nil
  176. }
  177. // IncrAreaSendersCache .
  178. func (d *Dao) IncrAreaSendersCache(c context.Context, area string, senderID int64) (int64, error) {
  179. var (
  180. key = areaSendersKey(area, senderID)
  181. conn = d.redis.Get(c)
  182. )
  183. defer conn.Close()
  184. count, err := redis.Int64(conn.Do("INCR", key))
  185. if err != nil {
  186. log.Error("%v", err)
  187. return 0, err
  188. }
  189. return count, nil
  190. }
  191. // AllSendersCache .
  192. func (d *Dao) AllSendersCache(c context.Context, keywordID int64) ([]string, error) {
  193. var (
  194. key = sendersKey(keywordID)
  195. conn = d.redis.Get(c)
  196. )
  197. defer conn.Close()
  198. r, err := redis.Strings(conn.Do("ZRANGEBYSCORE", key, "-inf", "+inf"))
  199. if err != nil {
  200. log.Error("%v", err)
  201. return nil, err
  202. }
  203. return r, nil
  204. }
  205. // SendersCache .
  206. func (d *Dao) SendersCache(c context.Context, keywordID, limit, offset int64) ([]string, error) {
  207. var (
  208. key = sendersKey(keywordID)
  209. conn = d.redis.Get(c)
  210. )
  211. defer conn.Close()
  212. r, err := redis.Strings(conn.Do("ZRANGEBYSCORE", key, "-inf", "+inf", "LIMIT", limit, offset))
  213. if err != nil {
  214. log.Error("%v", err)
  215. return nil, err
  216. }
  217. return r, nil
  218. }
  219. // TotalLimitExpire .
  220. func (d *Dao) TotalLimitExpire(c context.Context, keywordID, dur int64) error {
  221. var (
  222. key = totalCountsKey(keywordID)
  223. conn = d.redis.Get(c)
  224. )
  225. defer conn.Close()
  226. if _, err := conn.Do("EXPIRE", key, dur); err != nil {
  227. log.Error("%v", err)
  228. return err
  229. }
  230. return nil
  231. }
  232. // IncrTotalLimitCache .
  233. func (d *Dao) IncrTotalLimitCache(c context.Context, keywordID int64) (int64, error) {
  234. var (
  235. key = totalCountsKey(keywordID)
  236. conn = d.redis.Get(c)
  237. )
  238. defer conn.Close()
  239. count, err := redis.Int64(conn.Do("INCR", key))
  240. if err != nil {
  241. log.Error("%v", err)
  242. return 0, err
  243. }
  244. return count, nil
  245. }
  246. // ZaddSendersCache insert into sortedset and return total counts of sorted set
  247. func (d *Dao) ZaddSendersCache(c context.Context, keywordID, score, senderID int64) (int64, error) {
  248. var (
  249. key = sendersKey(keywordID)
  250. val = fmt.Sprintf("%d", senderID)
  251. conn = d.redis.Get(c)
  252. )
  253. defer conn.Close()
  254. _, err := redis.Int64(conn.Do("ZADD", key, score, val))
  255. if err != nil {
  256. log.Error("%v", err)
  257. return 0, err
  258. }
  259. r, err := redis.Int64(conn.Do("ZCARD", key))
  260. if err != nil {
  261. log.Error("%v", err)
  262. return 0, err
  263. }
  264. return r, nil
  265. }
  266. // ZremSendersCache return the number of memebers removed from the sorted set
  267. func (d *Dao) ZremSendersCache(c context.Context, keywordID int64, senderIDStr string) (int64, error) {
  268. var (
  269. key = sendersKey(keywordID)
  270. conn = d.redis.Get(c)
  271. )
  272. defer conn.Close()
  273. r, err := redis.Int64(conn.Do("ZREM", key, senderIDStr))
  274. if err != nil {
  275. log.Error("%v", err)
  276. return 0, err
  277. }
  278. return r, nil
  279. }
  280. // DelKeywordRelatedCache .
  281. func (d *Dao) DelKeywordRelatedCache(c context.Context, ks []*model.Keyword) error {
  282. var conn = d.redis.Get(c)
  283. defer conn.Close()
  284. for _, v := range ks {
  285. if err := conn.Send("DEL", totalCountsKey(v.ID)); err != nil {
  286. log.Error("%v", err)
  287. return err
  288. }
  289. if err := conn.Send("DEL", sendersKey(v.ID)); err != nil {
  290. log.Error("%v", err)
  291. return err
  292. }
  293. }
  294. if err := conn.Flush(); err != nil {
  295. log.Error("%v", err)
  296. return err
  297. }
  298. for i := 0; i < len(ks)*2; i++ {
  299. if _, err := conn.Receive(); err != nil {
  300. log.Error("conn.Receive() error(%v)", err)
  301. return err
  302. }
  303. }
  304. return nil
  305. }
  306. // DelCountRelatedCache .
  307. func (d *Dao) DelCountRelatedCache(c context.Context, k *model.Keyword) error {
  308. var conn = d.redis.Get(c)
  309. defer conn.Close()
  310. if err := conn.Send("DEL", globalCountsKey(k.ID)); err != nil {
  311. log.Error("%v", err)
  312. return err
  313. }
  314. if err := conn.Send("DEL", localCountsKey(k.ID, k.SenderID)); err != nil {
  315. log.Error("%v", err)
  316. return err
  317. }
  318. if err := conn.Send("DEL", sendersKey(k.ID)); err != nil {
  319. log.Error("%v", err)
  320. return err
  321. }
  322. if err := conn.Flush(); err != nil {
  323. log.Error("%v", err)
  324. return err
  325. }
  326. for i := 0; i < 3; i++ {
  327. if _, err := conn.Receive(); err != nil {
  328. log.Error("conn.Receive() error(%v)", err)
  329. return err
  330. }
  331. }
  332. return nil
  333. }