upper_rds.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. package archive
  2. import (
  3. "context"
  4. "strconv"
  5. "go-common/app/service/main/up/model"
  6. "go-common/library/cache/redis"
  7. "go-common/library/log"
  8. "go-common/library/time"
  9. )
  10. const (
  11. _prefixUpCnt = "uc_"
  12. _prefixUpPas = "up_"
  13. )
  14. func upCntKey(mid int64) string {
  15. return _prefixUpCnt + strconv.FormatInt(mid, 10)
  16. }
  17. func upPasKey(mid int64) string {
  18. return _prefixUpPas + strconv.FormatInt(mid, 10)
  19. }
  20. // AddUpperCountCache the count of up's archives
  21. func (d *Dao) AddUpperCountCache(c context.Context, mid int64, count int64) (err error) {
  22. var (
  23. key = upCntKey(mid)
  24. conn = d.upRds.Get(c)
  25. expireTime = d.upExpire
  26. )
  27. defer conn.Close()
  28. if count == 0 {
  29. expireTime = 600
  30. }
  31. if _, err = conn.Do("SETEX", key, expireTime, count); err != nil {
  32. log.Error("conn.Do(SETEX, %s, %d, %d)", key, expireTime, count)
  33. return
  34. }
  35. return
  36. }
  37. // UpperCountCache get up count from cache.
  38. func (d *Dao) UpperCountCache(c context.Context, mid int64) (count int64, err error) {
  39. var (
  40. key = upCntKey(mid)
  41. conn = d.upRds.Get(c)
  42. )
  43. defer conn.Close()
  44. if count, err = redis.Int64(conn.Do("GET", key)); err != nil {
  45. if err == redis.ErrNil {
  46. count = -1
  47. err = nil
  48. } else {
  49. d.errProm.Incr("upper_redis")
  50. log.Error("conn.Do(GET, %s) error(%v)", key, err)
  51. }
  52. }
  53. return
  54. }
  55. // UppersCountCache return uppers count cache
  56. func (d *Dao) UppersCountCache(c context.Context, mids []int64) (cached map[int64]int64, missed []int64, err error) {
  57. conn := d.upRds.Get(c)
  58. defer conn.Close()
  59. cached = make(map[int64]int64)
  60. for _, mid := range mids {
  61. key := upCntKey(mid)
  62. if err = conn.Send("GET", key); err != nil {
  63. missed = mids
  64. continue
  65. }
  66. }
  67. if err = conn.Flush(); err != nil {
  68. missed = mids
  69. return
  70. }
  71. for _, mid := range mids {
  72. var cnt int64
  73. if cnt, err = redis.Int64(conn.Receive()); err != nil {
  74. if err == redis.ErrNil {
  75. missed = append(missed, mid)
  76. err = nil
  77. continue
  78. }
  79. }
  80. cached[mid] = int64(cnt)
  81. }
  82. return
  83. }
  84. // UpperPassedCache get upper passed archives from cache.
  85. func (d *Dao) UpperPassedCache(c context.Context, mid int64, start, end int) (aids []int64, err error) {
  86. var (
  87. key = upPasKey(mid)
  88. conn = d.upRds.Get(c)
  89. )
  90. defer conn.Close()
  91. if aids, err = redis.Int64s(conn.Do("ZREVRANGE", key, start, end)); err != nil {
  92. d.errProm.Incr("upper_redis")
  93. log.Error("conn.Do(ZRANGE, %s, 0, -1) error(%v)", key, err)
  94. }
  95. return
  96. }
  97. // UppersPassedCacheWithScore get uppers passed archive from cache with score
  98. func (d *Dao) UppersPassedCacheWithScore(c context.Context, mids []int64, start, end int) (aidm map[int64][]*model.AidPubTime, err error) {
  99. conn := d.upRds.Get(c)
  100. defer conn.Close()
  101. aidm = make(map[int64][]*model.AidPubTime, len(mids))
  102. for _, mid := range mids {
  103. key := upPasKey(mid)
  104. if err = conn.Send("ZREVRANGE", key, start, end, "WITHSCORES"); err != nil {
  105. d.errProm.Incr("upper_redis")
  106. log.Error("conn.Send(ZREVRANGE, %s) error(%v)", key, err)
  107. return
  108. }
  109. }
  110. if err = conn.Flush(); err != nil {
  111. log.Error("conn.Flush error(%v)", err)
  112. return
  113. }
  114. for _, mid := range mids {
  115. aidScores, err := redis.Int64s(conn.Receive())
  116. if err != nil {
  117. d.errProm.Incr("upper_redis")
  118. log.Error("conn.Do(GET, %d) error(%v)", mid, err)
  119. continue
  120. }
  121. for i := 0; i < len(aidScores); i += 2 {
  122. var (
  123. score int64
  124. ptime int64
  125. copyright int8
  126. )
  127. score = aidScores[i+1]
  128. if score > 1000000000 {
  129. ptime = score >> 2
  130. copyright = int8(score & 3)
  131. aidm[mid] = append(aidm[mid], &model.AidPubTime{Aid: aidScores[i], PubDate: time.Time(ptime), Copyright: copyright})
  132. } else {
  133. aidm[mid] = append(aidm[mid], &model.AidPubTime{Aid: aidScores[i], PubDate: time.Time(score)})
  134. }
  135. }
  136. }
  137. return
  138. }
  139. // UppersPassedCache get uppers passed archives from cache.
  140. func (d *Dao) UppersPassedCache(c context.Context, mids []int64, start, end int) (aidm map[int64][]int64, err error) {
  141. conn := d.upRds.Get(c)
  142. defer conn.Close()
  143. aidm = make(map[int64][]int64, len(mids))
  144. for _, mid := range mids {
  145. key := upPasKey(mid)
  146. if err = conn.Send("ZREVRANGE", key, start, end); err != nil {
  147. d.errProm.Incr("upper_redis")
  148. log.Error("conn.Send(ZREVRANGE, %s) error(%v)", key, err)
  149. return
  150. }
  151. }
  152. if err = conn.Flush(); err != nil {
  153. log.Error("conn.Flush error(%v)", err)
  154. return
  155. }
  156. for _, mid := range mids {
  157. aids, err := redis.Int64s(conn.Receive())
  158. if err != nil {
  159. d.errProm.Incr("upper_redis")
  160. log.Error("conn.Do(GET, %d) error(%v)", mid, err)
  161. continue
  162. }
  163. aidm[mid] = aids
  164. }
  165. return
  166. }
  167. // ExpireUpperPassedCache expire up passed cache.
  168. func (d *Dao) ExpireUpperPassedCache(c context.Context, mid int64) (ok bool, err error) {
  169. var (
  170. key = upPasKey(mid)
  171. conn = d.upRds.Get(c)
  172. )
  173. defer conn.Close()
  174. if ok, err = redis.Bool(conn.Do("EXPIRE", key, d.upExpire)); err != nil {
  175. d.errProm.Incr("upper_redis")
  176. log.Error("conn.Do(EXPIRE, %s, %d) error(%v)", key, d.upExpire, err)
  177. }
  178. return
  179. }
  180. // ExpireUppersCountCache expire ups count cache
  181. func (d *Dao) ExpireUppersCountCache(c context.Context, mids []int64) (cachedUp, missed []int64, err error) {
  182. var conn = d.upRds.Get(c)
  183. defer conn.Close()
  184. defer func() {
  185. if err != nil {
  186. d.errProm.Incr("upper_redis")
  187. }
  188. }()
  189. for _, mid := range mids {
  190. var key = upCntKey(mid)
  191. if err = conn.Send("GET", key); err != nil {
  192. log.Error("conn.Send(GET, %s) error(%v)", key, err)
  193. return
  194. }
  195. }
  196. for _, mid := range mids {
  197. var key = upCntKey(mid)
  198. if err = conn.Send("EXPIRE", key, d.upExpire); err != nil {
  199. log.Error("conn.Send(GET, %s) error(%v)", key, err)
  200. return
  201. }
  202. }
  203. if err = conn.Flush(); err != nil {
  204. log.Error("conn.Flush error(%v)", err)
  205. return
  206. }
  207. cachedUp = make([]int64, 0)
  208. missed = make([]int64, 0)
  209. for _, mid := range mids {
  210. var cnt int
  211. if cnt, err = redis.Int(conn.Receive()); err != nil {
  212. if err == redis.ErrNil {
  213. err = nil
  214. missed = append(missed, mid)
  215. } else {
  216. log.Error("conn.Receive error(%v)", err)
  217. return
  218. }
  219. } else if cnt > 0 {
  220. cachedUp = append(cachedUp, mid)
  221. }
  222. }
  223. for _, mid := range mids {
  224. if _, err = redis.Bool(conn.Receive()); err != nil {
  225. log.Error("conn.Receive mid(%d) error(%v)", mid, err)
  226. return
  227. }
  228. }
  229. return
  230. }
  231. // ExpireUppersPassedCache expire uppers passed cache.
  232. func (d *Dao) ExpireUppersPassedCache(c context.Context, mids []int64) (res map[int64]bool, err error) {
  233. conn := d.upRds.Get(c)
  234. defer conn.Close()
  235. res = make(map[int64]bool, len(mids))
  236. for _, mid := range mids {
  237. key := upPasKey(mid)
  238. if err = conn.Send("EXPIRE", key, d.upExpire); err != nil {
  239. d.errProm.Incr("upper_redis")
  240. log.Error("conn.Send(%s) error(%v)", key, err)
  241. return
  242. }
  243. }
  244. if err = conn.Flush(); err != nil {
  245. log.Error("conn.Flush error(%v)", err)
  246. return
  247. }
  248. var ok bool
  249. for _, mid := range mids {
  250. if ok, err = redis.Bool(conn.Receive()); err != nil {
  251. d.errProm.Incr("upper_redis")
  252. log.Error("conn.Receive() error(%v)", err)
  253. return
  254. }
  255. res[mid] = ok
  256. }
  257. return
  258. }
  259. // AddUpperPassedCache add up paassed cache.
  260. func (d *Dao) AddUpperPassedCache(c context.Context, mid int64, aids []int64, ptimes []time.Time, copyrights []int8) (err error) {
  261. var (
  262. key = upPasKey(mid)
  263. conn = d.upRds.Get(c)
  264. )
  265. defer conn.Close()
  266. for k, aid := range aids {
  267. score := int64(ptimes[k]<<2) | int64(copyrights[k])
  268. if err = conn.Send("ZADD", key, score, aid); err != nil {
  269. d.errProm.Incr("upper_redis")
  270. log.Error("conn.Send(ZADD, %s, %d, %d) error(%v)", key, aid, ptimes[k], err)
  271. return
  272. }
  273. }
  274. if err = conn.Flush(); err != nil {
  275. d.errProm.Incr("upper_redis")
  276. log.Error("conn.Flush error(%v)", err)
  277. return
  278. }
  279. for i := 0; i < len(aids); i++ {
  280. if _, err = conn.Receive(); err != nil {
  281. d.errProm.Incr("upper_redis")
  282. log.Error("conn.Receive error(%v)", err)
  283. return
  284. }
  285. }
  286. return
  287. }
  288. // DelUpperPassedCache delete up passed cache.
  289. func (d *Dao) DelUpperPassedCache(c context.Context, mid, aid int64) (err error) {
  290. var (
  291. key = upPasKey(mid)
  292. conn = d.upRds.Get(c)
  293. )
  294. defer conn.Close()
  295. if _, err = conn.Do("ZREM", key, aid); err != nil {
  296. d.errProm.Incr("upper_redis")
  297. log.Error("conn.Do(ZERM, %s, %d) error(%v)", key, aid, err)
  298. }
  299. return
  300. }