redis.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strconv"
  7. pb "go-common/app/service/main/history/api/grpc"
  8. "go-common/app/service/main/history/model"
  9. "go-common/library/cache/redis"
  10. "go-common/library/log"
  11. )
  const (
  	// _keySwitch is the prefix of the redis hash key built by keySwitch;
  	// the hash maps a user's field (mid % _bucket) to the stored switch value.
  	_keySwitch = "s_"
  	// _bucket is the divisor that groups mids into switch hashes: mid/_bucket
  	// selects the hash key and mid%_bucket selects the field inside it.
  	_bucket = 1000
  )
  // keyHistory returns the redis key holding the history entries of mid for
  // the given business, formatted as "h_<mid>_<business>".
  func keyHistory(business string, mid int64) string {
  	return "h_" + strconv.FormatInt(mid, 10) + "_" + business
  }
  // keyIndex returns the redis key holding the history index of mid for the
  // given business, formatted as "i_<mid>_<business>".
  func keyIndex(business string, mid int64) string {
  	return "i_" + strconv.FormatInt(mid, 10) + "_" + business
  }
  24. // keySwitch return Switch key.
  25. func keySwitch(mid int64) string {
  26. return _keySwitch + strconv.FormatInt(mid/_bucket, 10)
  27. }
  28. // ListCacheByTime get aids from redis where score.
  29. func (d *Dao) ListCacheByTime(c context.Context, business string, mid int64, start int64) (aids []int64, err error) {
  30. conn := d.redis.Get(c)
  31. defer conn.Close()
  32. values, err := redis.Values(conn.Do("ZRANGEBYSCORE", keyIndex(business, mid), start, "INF", "WITHSCORES"))
  33. if err != nil {
  34. log.Error("conn.Do(ZRANGEBYSCORE %v) error(%v)", keyIndex(business, mid), err)
  35. return
  36. }
  37. if len(values) == 0 {
  38. return
  39. }
  40. var aid, unix int64
  41. for len(values) > 0 {
  42. if values, err = redis.Scan(values, &aid, &unix); err != nil {
  43. log.Error("redis.Scan(%v) error(%v)", values, err)
  44. return
  45. }
  46. aids = append(aids, aid)
  47. }
  48. return
  49. }
  50. // ListsCacheByTime get aids from redis where score.
  51. func (d *Dao) ListsCacheByTime(c context.Context, businesses []string, mid int64, viewAt, ps int64) (res map[string][]int64, err error) {
  52. conn := d.redis.Get(c)
  53. defer conn.Close()
  54. var count int
  55. for _, business := range businesses {
  56. if err = conn.Send("ZREVRANGEBYSCORE", keyIndex(business, mid), viewAt, "-INF", "LIMIT", 0, ps); err != nil {
  57. log.Error("conn.Do(ZRANGEBYSCORE %v) error(%v)", keyIndex(business, mid), err)
  58. return
  59. }
  60. count++
  61. }
  62. if err = conn.Flush(); err != nil {
  63. log.Error("conn.Flush() error(%v)", err)
  64. return
  65. }
  66. for i := 0; i < count; i++ {
  67. var values []int64
  68. values, err = redis.Int64s(conn.Receive())
  69. if err != nil {
  70. if err == redis.ErrNil {
  71. err = nil
  72. continue
  73. }
  74. log.Error("receive error(%v)", err)
  75. return
  76. }
  77. if len(values) == 0 {
  78. continue
  79. }
  80. if res == nil {
  81. res = make(map[string][]int64)
  82. }
  83. res[businesses[i]] = values
  84. }
  85. return
  86. }
  87. // SetUserHideCache set the user hide to redis.
  88. func (d *Dao) SetUserHideCache(c context.Context, mid, value int64) (err error) {
  89. key := keySwitch(mid)
  90. conn := d.redis.Get(c)
  91. if _, err = conn.Do("HSET", key, mid%_bucket, value); err != nil {
  92. log.Error("conn.Do(HSET %s,%d) error(%v)", key, value, err)
  93. }
  94. conn.Close()
  95. return
  96. }
  97. // UserHideCache return user hide state from redis.
  98. func (d *Dao) UserHideCache(c context.Context, mid int64) (value int64, err error) {
  99. key := keySwitch(mid)
  100. conn := d.redis.Get(c)
  101. defer conn.Close()
  102. if value, err = redis.Int64(conn.Do("HGET", key, mid%_bucket)); err != nil {
  103. if err == redis.ErrNil {
  104. return model.HideStateNotFound, nil
  105. }
  106. log.Error("conn.Do(HGET %s) error(%v)", key, err)
  107. }
  108. return
  109. }
  110. // HistoriesCache return the user histories from redis.
  111. func (d *Dao) HistoriesCache(c context.Context, mid int64, hs map[string][]int64) (res map[string]map[int64]*model.History, err error) {
  112. var (
  113. values, businesses []string
  114. aid int64
  115. k int
  116. )
  117. conn := d.redis.Get(c)
  118. defer conn.Close()
  119. for business, aids := range hs {
  120. businesses = append(businesses, business)
  121. key := keyHistory(business, mid)
  122. args := []interface{}{key}
  123. for _, aid := range aids {
  124. args = append(args, aid)
  125. }
  126. if err = conn.Send("HMGET", args...); err != nil {
  127. log.Error("conn.Send(HMGET %v %v) error(%v)", key, aids, err)
  128. return
  129. }
  130. }
  131. if err = conn.Flush(); err != nil {
  132. log.Error("conn.Flush() error(%v)", err)
  133. return
  134. }
  135. for i := 0; i < len(hs); i++ {
  136. if values, err = redis.Strings(conn.Receive()); err != nil {
  137. log.Error("conn.Receive error(%v)", err)
  138. if err == redis.ErrNil {
  139. continue
  140. }
  141. return
  142. }
  143. if res == nil {
  144. res = make(map[string]map[int64]*model.History)
  145. }
  146. business := businesses[i]
  147. for k, aid = range hs[business] {
  148. if values[k] == "" {
  149. continue
  150. }
  151. h := &model.History{}
  152. if err = json.Unmarshal([]byte(values[k]), h); err != nil {
  153. log.Error("json.Unmarshal(%s) error(%v)", values[k], err)
  154. err = nil
  155. continue
  156. }
  157. h.BusinessID = d.BusinessNames[h.Business].ID
  158. if res[business] == nil {
  159. res[business] = make(map[int64]*model.History)
  160. }
  161. res[business][aid] = h
  162. }
  163. }
  164. return
  165. }
  166. // ClearHistoryCache clear the user redis.
  167. func (d *Dao) ClearHistoryCache(c context.Context, mid int64, businesses []string) (err error) {
  168. var conn = d.redis.Get(c)
  169. var count int
  170. defer conn.Close()
  171. for _, business := range businesses {
  172. idxKey := keyIndex(business, mid)
  173. key := keyHistory(business, mid)
  174. if err = conn.Send("DEL", idxKey); err != nil {
  175. log.Error("conn.Send(DEL %s) error(%v)", idxKey, err)
  176. return
  177. }
  178. count++
  179. if err = conn.Send("DEL", key); err != nil {
  180. log.Error("conn.Send(DEL %s) error(%v)", key, err)
  181. return
  182. }
  183. count++
  184. }
  185. if err = conn.Flush(); err != nil {
  186. log.Error("conn.Flush() error(%v)", err)
  187. return
  188. }
  189. for i := 0; i < count; i++ {
  190. if _, err = conn.Receive(); err != nil {
  191. log.Error("conn.Receive() error(%v)", err)
  192. return
  193. }
  194. }
  195. return
  196. }
  197. // DelHistoryCache delete the history redis.
  198. func (d *Dao) DelHistoryCache(c context.Context, arg *pb.DelHistoriesReq) (err error) {
  199. conn := d.redis.Get(c)
  200. defer conn.Close()
  201. var count int
  202. for _, r := range arg.Records {
  203. var (
  204. indxKey = keyIndex(r.Business, arg.Mid)
  205. key = keyHistory(r.Business, arg.Mid)
  206. )
  207. if err = conn.Send("ZREM", indxKey, r.ID); err != nil {
  208. log.Error("conn.Send(ZREM %s,%v) error(%v)", indxKey, r.ID, err)
  209. return
  210. }
  211. count++
  212. if err = conn.Send("HDEL", key, r.ID); err != nil {
  213. log.Error("conn.Send(HDEL %s,%v) error(%v)", key, r.ID, err)
  214. return
  215. }
  216. count++
  217. }
  218. if err = conn.Flush(); err != nil {
  219. log.Error("conn.Flush() error(%v)", err)
  220. return
  221. }
  222. for i := 0; i < count; i++ {
  223. if _, err = conn.Receive(); err != nil {
  224. log.Error("conn.Receive() error(%v)", err)
  225. return
  226. }
  227. }
  228. return
  229. }
  230. // AddHistoryCache add the history to redis.
  231. func (d *Dao) AddHistoryCache(c context.Context, h *pb.AddHistoryReq) (err error) {
  232. var (
  233. b []byte
  234. mid = h.Mid
  235. idxKey, key = keyIndex(h.Business, mid), keyHistory(h.Business, mid)
  236. )
  237. if b, err = json.Marshal(h); err != nil {
  238. return
  239. }
  240. conn := d.redis.Get(c)
  241. defer conn.Close()
  242. if err = conn.Send("ZADD", idxKey, h.ViewAt, h.Kid); err != nil {
  243. log.Error("conn.Send(ZADD %s,%d) error(%v)", key, h.Kid, err)
  244. return
  245. }
  246. if err = conn.Send("HSET", key, h.Kid, string(b)); err != nil {
  247. log.Error("conn.Send(HSET %s,%d) error(%v)", key, h.Kid, err)
  248. return
  249. }
  250. if err = conn.Send("EXPIRE", idxKey, d.redisExpire); err != nil {
  251. log.Error("conn.Send(EXPIRE) error(%v)", err)
  252. return
  253. }
  254. if err = conn.Send("EXPIRE", key, d.redisExpire); err != nil {
  255. log.Error("conn.Send(EXPIRE) error(%v)", err)
  256. return
  257. }
  258. if err = conn.Flush(); err != nil {
  259. log.Error("conn.Flush() error(%v)", err)
  260. return
  261. }
  262. for i := 0; i < 2+2; i++ {
  263. if _, err = conn.Receive(); err != nil {
  264. log.Error("conn.Receive() error(%v)", err)
  265. return
  266. }
  267. }
  268. return
  269. }
  270. // AddHistoriesCache add the user to redis.
  271. func (d *Dao) AddHistoriesCache(c context.Context, hs []*pb.AddHistoryReq) (err error) {
  272. if len(hs) == 0 {
  273. return
  274. }
  275. conn := d.redis.Get(c)
  276. defer conn.Close()
  277. var count int
  278. for _, h := range hs {
  279. var (
  280. b []byte
  281. mid = h.Mid
  282. idxKey, key = keyIndex(h.Business, mid), keyHistory(h.Business, mid)
  283. )
  284. if b, err = json.Marshal(h); err != nil {
  285. continue
  286. }
  287. if err = conn.Send("ZADD", idxKey, h.ViewAt, h.Kid); err != nil {
  288. log.Error("conn.Send(ZADD %s,%d) error(%v)", key, h.Kid, err)
  289. continue
  290. }
  291. count++
  292. if err = conn.Send("HSET", key, h.Kid, string(b)); err != nil {
  293. log.Error("conn.Send(HSET %s,%d) error(%v)", key, h.Kid, err)
  294. continue
  295. }
  296. count++
  297. if err = conn.Send("EXPIRE", idxKey, d.redisExpire); err != nil {
  298. log.Error("conn.Send(EXPIRE) error(%v)", err)
  299. continue
  300. }
  301. count++
  302. if err = conn.Send("EXPIRE", key, d.redisExpire); err != nil {
  303. log.Error("conn.Send(EXPIRE) error(%v)", err)
  304. continue
  305. }
  306. count++
  307. }
  308. if err = conn.Flush(); err != nil {
  309. log.Error("conn.Flush() error(%v)", err)
  310. return
  311. }
  312. for i := 0; i < count; i++ {
  313. if _, err = conn.Receive(); err != nil {
  314. log.Error("conn.Receive() error(%v)", err)
  315. return
  316. }
  317. }
  318. return
  319. }
  320. // TrimCache trim history.
  321. func (d *Dao) TrimCache(c context.Context, business string, mid int64, limit int) (err error) {
  322. conn := d.redis.Get(c)
  323. defer conn.Close()
  324. aids, err := redis.Int64s(conn.Do("ZRANGE", keyIndex(business, mid), 0, -limit-1))
  325. if err != nil {
  326. log.Error("conn.Do(ZRANGE %v) error(%v)", keyIndex(business, mid), err)
  327. return
  328. }
  329. if len(aids) == 0 {
  330. return
  331. }
  332. return d.DelCache(c, business, mid, aids)
  333. }
  334. // DelCache delete the history redis.
  335. func (d *Dao) DelCache(c context.Context, business string, mid int64, aids []int64) (err error) {
  336. var (
  337. key1 = keyIndex(business, mid)
  338. key2 = keyHistory(business, mid)
  339. args1 = []interface{}{key1}
  340. args2 = []interface{}{key2}
  341. )
  342. for _, aid := range aids {
  343. args1 = append(args1, aid)
  344. args2 = append(args2, aid)
  345. }
  346. conn := d.redis.Get(c)
  347. defer conn.Close()
  348. if err = conn.Send("ZREM", args1...); err != nil {
  349. log.Error("conn.Send(ZREM %s,%v) error(%v)", key1, aids, err)
  350. return
  351. }
  352. if err = conn.Send("HDEL", args2...); err != nil {
  353. log.Error("conn.Send(HDEL %s,%v) error(%v)", key2, aids, err)
  354. return
  355. }
  356. if err = conn.Flush(); err != nil {
  357. log.Error("conn.Flush() error(%v)", err)
  358. return
  359. }
  360. for i := 0; i < 2; i++ {
  361. if _, err = conn.Receive(); err != nil {
  362. log.Error("conn.Receive() error(%v)", err)
  363. return
  364. }
  365. }
  366. return
  367. }
  368. // pingRedis ping redis.
  369. func (d *Dao) pingRedis(c context.Context) (err error) {
  370. conn := d.redis.Get(c)
  371. if _, err = conn.Do("SET", "PING", "PONG"); err != nil {
  372. log.Error("redis: conn.Do(SET,PING,PONG) error(%v)", err)
  373. }
  374. conn.Close()
  375. return
  376. }