redis.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. gtime "time"
  7. "go-common/app/service/main/relation/model"
  8. "go-common/library/cache/redis"
  9. "go-common/library/log"
  10. "go-common/library/time"
  11. )
  12. const (
  13. _prefixFollowings = "at_" // key prefix of the followings hash (public followings with tag data); suffixed with the mid.
  14. _prefixMonitor = "rs_mo_list" // key of the monitor set (a single fixed key, no per-mid suffix).
  15. _prefixRecentFollower = "rf_" // recent follower sorted set; suffixed with the mid.
  16. _prefixRecentFollowerTime = "rft_" // key prefix used by recentFollowerNotify; suffixed with mid%10000.
  17. _prefixDailyNotifyCount = "dnc_%d_%s" // daily new-follower notification count; format args are the mid shard and the date.
  18. _notifyCountExpire = 24 * 3600 // notify count scope is daily; passed to EXPIRE, i.e. one day in seconds.
  19. )
  20. func followingsKey(mid int64) string {
  21. return _prefixFollowings + strconv.FormatInt(mid, 10)
  22. }
  23. func monitorKey() string {
  24. return _prefixMonitor
  25. }
  26. func recentFollower(mid int64) string {
  27. return _prefixRecentFollower + strconv.FormatInt(mid, 10)
  28. }
  29. func recentFollowerNotify(mid int64) string {
  30. return _prefixRecentFollowerTime + strconv.FormatInt(mid%10000, 10)
  31. }
  32. func dailyNotifyCount(mid int64, date gtime.Time) string {
  33. // _cacheShard 作为sharding
  34. return fmt.Sprintf(_prefixDailyNotifyCount, mid%_cacheShard, date.Format("2006-01-02"))
  35. }
  36. // pingRedis ping redis.
  37. func (d *Dao) pingRedis(c context.Context) (err error) {
  38. conn := d.redis.Get(c)
  39. if _, err = conn.Do("SET", "PING", "PONG"); err != nil {
  40. log.Error("conn.Do(SET,PING,PONG) error(%v)", err)
  41. }
  42. conn.Close()
  43. return
  44. }
  45. // SetFollowingsCache set followings cache.
  46. func (d *Dao) SetFollowingsCache(c context.Context, mid int64, followings []*model.Following) (err error) {
  47. key := followingsKey(mid)
  48. args := redis.Args{}.Add(key)
  49. expire := d.redisExpire
  50. if len(followings) == 0 {
  51. expire = 7200
  52. }
  53. ef, _ := d.encode(0, 0, nil, 0)
  54. args = args.Add(0, ef)
  55. for i := 0; i < len(followings); i++ {
  56. var ef []byte
  57. if ef, err = d.encode(followings[i].Attribute, followings[i].MTime, followings[i].Tag, followings[i].Special); err != nil {
  58. return
  59. }
  60. args = args.Add(followings[i].Mid, ef)
  61. }
  62. conn := d.redis.Get(c)
  63. defer conn.Close()
  64. if err = conn.Send("DEL", key); err != nil {
  65. log.Error("conn.Send(DEL, %s) error(%v)", key, err)
  66. return
  67. }
  68. if err = conn.Send("HMSET", args...); err != nil {
  69. log.Error("conn.Send(HMSET, %s) error(%v)", key, err)
  70. return
  71. }
  72. if err = conn.Send("EXPIRE", key, expire); err != nil {
  73. log.Error("conn.Send(EXPIRE, %s) error(%v)", key, err)
  74. return
  75. }
  76. if err = conn.Flush(); err != nil {
  77. log.Error("conn.Flush() error(%v)", err)
  78. return
  79. }
  80. for i := 0; i < 3; i++ {
  81. if _, err = conn.Receive(); err != nil {
  82. log.Error("conn.Receive() %d error(%v)", i+1, err)
  83. break
  84. }
  85. }
  86. return
  87. }
  88. // AddFollowingCache add following cache.
  89. func (d *Dao) AddFollowingCache(c context.Context, mid int64, following *model.Following) (err error) {
  90. var (
  91. ok bool
  92. key = followingsKey(mid)
  93. )
  94. conn := d.redis.Get(c)
  95. if ok, err = redis.Bool(conn.Do("EXPIRE", key, d.redisExpire)); err != nil {
  96. log.Error("redis.Bool(conn.Do(EXPIRE, %s)) error(%v)", key, err)
  97. } else if ok {
  98. var ef []byte
  99. if ef, err = d.encode(following.Attribute, following.MTime, following.Tag, following.Special); err != nil {
  100. return
  101. }
  102. if _, err = conn.Do("HSET", key, following.Mid, ef); err != nil {
  103. log.Error("conn.Do(HSET, %s, %d) error(%v)", key, following.Mid, err)
  104. }
  105. }
  106. conn.Close()
  107. return
  108. }
  109. // DelFollowing del following cache.
  110. func (d *Dao) DelFollowing(c context.Context, mid int64, following *model.Following) (err error) {
  111. var (
  112. ok bool
  113. key = followingsKey(mid)
  114. )
  115. conn := d.redis.Get(c)
  116. if ok, err = redis.Bool(conn.Do("EXPIRE", key, d.redisExpire)); err != nil {
  117. log.Error("redis.Bool(conn.Do(EXPIRE, %s)) error(%v)", key, err)
  118. } else if ok {
  119. if _, err = conn.Do("HDEL", key, following.Mid); err != nil {
  120. log.Error("conn.Do(HDEL, %s, %d) error(%v)", key, following.Mid, err)
  121. }
  122. }
  123. conn.Close()
  124. return
  125. }
  126. // FollowingsCache get followings cache.
  127. func (d *Dao) FollowingsCache(c context.Context, mid int64) (followings []*model.Following, err error) {
  128. key := followingsKey(mid)
  129. conn := d.redis.Get(c)
  130. defer conn.Close()
  131. tmp, err := redis.StringMap(conn.Do("HGETALL", key))
  132. if err != nil {
  133. return
  134. }
  135. if err == nil && len(tmp) > 0 {
  136. for k, v := range tmp {
  137. if mid, err = strconv.ParseInt(k, 10, 64); err != nil {
  138. return
  139. }
  140. if mid <= 0 {
  141. continue
  142. }
  143. vf := &model.FollowingTags{}
  144. if err = d.decode([]byte(v), vf); err != nil {
  145. //todo
  146. return
  147. }
  148. followings = append(followings, &model.Following{
  149. Mid: mid,
  150. Attribute: vf.Attr,
  151. Tag: vf.TagIds,
  152. MTime: vf.Ts,
  153. Special: vf.Special,
  154. })
  155. }
  156. }
  157. return
  158. }
  159. // DelFollowingsCache delete followings cache.
  160. func (d *Dao) DelFollowingsCache(c context.Context, mid int64) (err error) {
  161. key := followingsKey(mid)
  162. conn := d.redis.Get(c)
  163. if _, err = conn.Do("DEL", key); err != nil {
  164. log.Error("conn.Do(DEL, %s) error(%v)", key, err)
  165. }
  166. conn.Close()
  167. return
  168. }
  169. // RelationsCache relations cache.
  170. func (d *Dao) RelationsCache(c context.Context, mid int64, fids []int64) (resMap map[int64]*model.Following, err error) {
  171. var retRedis [][]byte
  172. key := followingsKey(mid)
  173. args := redis.Args{}.Add(key)
  174. for _, fid := range fids {
  175. args = args.Add(fid)
  176. }
  177. args.Add(0)
  178. conn := d.redis.Get(c)
  179. defer conn.Close()
  180. if retRedis, err = redis.ByteSlices(conn.Do("HMGET", args...)); err != nil {
  181. log.Error("redis.Int64s(conn.DO(HMGET, %v)) error(%v)", args, err)
  182. return
  183. }
  184. resMap = make(map[int64]*model.Following)
  185. for index, fid := range fids {
  186. if retRedis[index] == nil {
  187. continue
  188. }
  189. v := &model.FollowingTags{}
  190. if err = d.decode(retRedis[index], v); err != nil {
  191. return
  192. }
  193. resMap[fid] = &model.Following{
  194. Mid: fid,
  195. Attribute: v.Attr,
  196. Tag: v.TagIds,
  197. MTime: v.Ts,
  198. Special: v.Special,
  199. }
  200. }
  201. return
  202. }
  203. // encode
  204. func (d *Dao) encode(attribute uint32, mtime time.Time, tagids []int64, special int32) (res []byte, err error) {
  205. ft := &model.FollowingTags{Attr: attribute, Ts: mtime, TagIds: tagids, Special: special}
  206. return ft.Marshal()
  207. }
  208. // decode
  209. func (d *Dao) decode(src []byte, v *model.FollowingTags) (err error) {
  210. return v.Unmarshal(src)
  211. }
  212. // MonitorCache monitor cache
  213. func (d *Dao) MonitorCache(c context.Context, mid int64) (exist bool, err error) {
  214. key := monitorKey()
  215. conn := d.redis.Get(c)
  216. if exist, err = redis.Bool(conn.Do("SISMEMBER", key, mid)); err != nil {
  217. log.Error("redis.Bool(conn.Do(SISMEMBER, %s, %d)) error(%v)", key, mid, err)
  218. }
  219. conn.Close()
  220. return
  221. }
  222. // SetMonitorCache set monitor cache
  223. func (d *Dao) SetMonitorCache(c context.Context, mid int64) (err error) {
  224. var (
  225. key = monitorKey()
  226. conn = d.redis.Get(c)
  227. )
  228. defer conn.Close()
  229. if _, err = conn.Do("SADD", key, mid); err != nil {
  230. log.Error("SADD conn.Do error(%v)", err)
  231. return
  232. }
  233. return
  234. }
  235. // DelMonitorCache del monitor cache
  236. func (d *Dao) DelMonitorCache(c context.Context, mid int64) (err error) {
  237. var (
  238. key = monitorKey()
  239. conn = d.redis.Get(c)
  240. )
  241. defer conn.Close()
  242. if _, err = redis.Int64(conn.Do("SREM", key, mid)); err != nil {
  243. log.Error("SREM conn.Do(%s,%d) err(%v)", key, mid, err)
  244. }
  245. return
  246. }
  247. // LoadMonitorCache load monitor cache
  248. func (d *Dao) LoadMonitorCache(c context.Context, mids []int64) (err error) {
  249. var (
  250. key = monitorKey()
  251. conn = d.redis.Get(c)
  252. )
  253. defer conn.Close()
  254. for _, v := range mids {
  255. if err = conn.Send("SADD", key, v); err != nil {
  256. log.Error("SADD conn.Do error(%v)", err)
  257. return
  258. }
  259. }
  260. if err = conn.Flush(); err != nil {
  261. log.Error("conn.Flush error(%v)", err)
  262. return
  263. }
  264. return
  265. }
  266. // TodayNotifyCountCache get notify count in the current day
  267. func (d *Dao) TodayNotifyCountCache(c context.Context, mid int64) (notifyCount int64, err error) {
  268. var (
  269. key = dailyNotifyCount(mid, gtime.Now())
  270. conn = d.redis.Get(c)
  271. )
  272. defer conn.Close()
  273. if notifyCount, err = redis.Int64(conn.Do("HGET", key, mid)); err != nil {
  274. if err == redis.ErrNil {
  275. err = nil
  276. return
  277. }
  278. log.Error("HGET conn.Do error(%v)", err)
  279. return
  280. }
  281. if err = conn.Flush(); err != nil {
  282. log.Error("conn.Flush error(%v)", err)
  283. return
  284. }
  285. return
  286. }
  287. // IncrTodayNotifyCount increment the today notify count in the current day
  288. func (d *Dao) IncrTodayNotifyCount(c context.Context, mid int64) (err error) {
  289. var (
  290. key = dailyNotifyCount(mid, gtime.Now())
  291. conn = d.redis.Get(c)
  292. )
  293. defer conn.Close()
  294. if err = conn.Send("HINCRBY", key, mid, 1); err != nil {
  295. log.Error("HINCRBY conn.Do error(%v)", err)
  296. return
  297. }
  298. if err = conn.Send("EXPIRE", key, _notifyCountExpire); err != nil {
  299. log.Error("EXPIRE conn.Do error(%v)", err)
  300. return
  301. }
  302. if err = conn.Flush(); err != nil {
  303. log.Error("conn.Flush error(%v)", err)
  304. return
  305. }
  306. return
  307. }