redis.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. package guard
  2. import (
  3. "context"
  4. "encoding/json"
  5. "github.com/pkg/errors"
  6. dahanghaiModel "go-common/app/service/live/xuser/model/dhh"
  7. gmc "go-common/library/cache/memcache"
  8. "go-common/library/cache/redis"
  9. "go-common/library/log"
  10. "go-common/library/stat/prom"
  11. "math/rand"
  12. "strconv"
  13. "time"
  14. )
  15. // redis cache
  16. const (
  17. _prefixUID = "live_user:guard:uid:v1:" // 用户侧key
  18. _prefixTopList = "GOVERNOR_SHOW_TID:" // 最近购买总督key
  19. _prefixAnchorID = "live_user:guard:target_id:v1:" // 主播侧key
  20. _emptyExpire = 3600
  21. _errorRedisLogPrefix = "xuser.dahanghai.dao.redis"
  22. _promGetSuccess = "xuser_dahanghai_redis:获取用户大航海cache成功"
  23. _promDelSuccess = "xuser_dahanghai_redis:成功删除用户大航海cache"
  24. _promDelErr = "xuser_dahanghai_redis:删除用户大航海cache失败"
  25. _promGetErr = "xuser_dahanghai_redis:批量获取用户大航海key失败"
  26. // _promScanErr = "xuser_dahanghai_redis:解析用户大航海key失败"
  27. )
  28. var (
  29. errorsCount = prom.BusinessErrCount
  30. infosCount = prom.BusinessInfoCount
  31. cacheHitCount = prom.CacheHit
  32. cacheMissCount = prom.CacheMiss
  33. )
  34. // PromError prometheus error count.
  35. func PromError(name string) {
  36. errorsCount.Incr(name)
  37. }
  38. // PromInfo prometheus info count.
  39. func PromInfo(name string) {
  40. infosCount.Incr(name)
  41. }
  42. // PromCacheHit prometheus cache hit count.
  43. func PromCacheHit(name string) {
  44. cacheHitCount.Incr(name)
  45. }
  46. // PromCacheMiss prometheus cache hit count.
  47. func PromCacheMiss(name string) {
  48. cacheMissCount.Incr(name)
  49. }
  50. func dahanghaiUIDKey(mid int64) string {
  51. return _prefixUID + strconv.FormatInt(mid, 10)
  52. }
  53. func guardAnchorUIDKey(mid int64) string {
  54. return _prefixAnchorID + strconv.FormatInt(mid, 10)
  55. }
  56. func recentGuardTopKey(mid int64) string {
  57. return _prefixTopList + strconv.FormatInt(mid, 10)
  58. }
  59. // SetDHHListCache ... 批量设置用户守护cache
  60. func (d *GuardDao) SetDHHListCache(c context.Context, dhhList []dahanghaiModel.DaHangHaiRedis2, uid int64) (err error) {
  61. return d.setDHHListCache(c, dhhList, uid)
  62. }
  63. // SetAnchorGuardListCache ... 批量设置主播维度守护信息cache
  64. func (d *GuardDao) SetAnchorGuardListCache(c context.Context, dhhList []dahanghaiModel.DaHangHaiRedis2, uid int64) (err error) {
  65. return d.setAnchorGuardListCache(c, dhhList, uid)
  66. }
  67. // DelDHHFromRedis 删除获取用户守护cache,不支持批量
  68. func (d *GuardDao) DelDHHFromRedis(c context.Context, mid int64) (err error) {
  69. return d.delDHHFromRedis(c, dahanghaiUIDKey(mid))
  70. }
  71. // GetUIDAllGuardFromRedis 获取单个用户的全量守护信息
  72. func (d *GuardDao) GetUIDAllGuardFromRedis(ctx context.Context, mids []int64) (dhhList []*dahanghaiModel.DaHangHaiRedis2, err error) {
  73. return d.getUIDAllGuardFromRedis(ctx, mids)
  74. }
  75. // GetUIDAllGuardFromRedisBatch 获取批量用户的全量守护信息
  76. func (d *GuardDao) GetUIDAllGuardFromRedisBatch(ctx context.Context, mids []int64) (dhhList []*dahanghaiModel.DaHangHaiRedis2, err error) {
  77. return d.getUIDAllGuardFromRedisBatch(ctx, mids)
  78. }
  79. // GetAnchorAllGuardFromRedis 获取单个主播的全量被守护信息(同一个主播仅获取最高级别)
  80. func (d *GuardDao) GetAnchorAllGuardFromRedis(ctx context.Context, anchorUIDs []int64) (dhhList []*dahanghaiModel.DaHangHaiRedis2, err error) {
  81. return d.getAnchorAllGuardFromRedis(ctx, anchorUIDs)
  82. }
  83. // GetGuardTopListCache 获取单个用户的全量守护信息
  84. func (d *GuardDao) GetGuardTopListCache(ctx context.Context, uid int64) (dhhList []*dahanghaiModel.DaHangHaiRedis2, err error) {
  85. return d.getGuardTopListCache(ctx, uid)
  86. }
  87. // GetAnchorRecentTopGuardCache 获取单个主播最近的总督信息
  88. func (d *GuardDao) GetAnchorRecentTopGuardCache(ctx context.Context, uid int64) (resp map[int64]int64, err error) {
  89. return d.getAnchorRecentTopGuardCache(ctx, uid)
  90. }
  91. func (d *GuardDao) getGuardTopListCache(ctx context.Context, uid int64) (dhhList []*dahanghaiModel.DaHangHaiRedis2, err error) {
  92. return
  93. }
  94. // getUIDAllGuardFromRedis 批量获取用户cache
  95. func (d *GuardDao) getUIDAllGuardFromRedis(ctx context.Context, mids []int64) (dhhList []*dahanghaiModel.DaHangHaiRedis2, err error) {
  96. var (
  97. conn = d.redis.Get(ctx)
  98. args = redis.Args{}
  99. cacheResult [][]byte
  100. )
  101. defer conn.Close()
  102. for _, uid := range mids {
  103. args = args.Add(dahanghaiUIDKey(uid))
  104. }
  105. if cacheResult, err = redis.ByteSlices(conn.Do("MGET", args...)); err != nil {
  106. if err == redis.ErrNil {
  107. err = nil
  108. } else {
  109. PromError(_promGetErr)
  110. log.Error(_errorRedisLogPrefix+"|conn.MGET(%v) error(%v)", args, err)
  111. err = errors.Wrapf(err, "redis.StringMap(conn.Do(MGET,%v)", args)
  112. }
  113. return
  114. }
  115. dhhList = make([]*dahanghaiModel.DaHangHaiRedis2, 0)
  116. dhhListSingle := &dahanghaiModel.DaHangHaiRedis2{}
  117. if len(cacheResult) > 0 {
  118. for k, v := range cacheResult {
  119. if v == nil {
  120. return nil, nil
  121. }
  122. if len(v) > 0 {
  123. if err = json.Unmarshal([]byte(v), &dhhList); err != nil {
  124. if err = json.Unmarshal([]byte(v), &dhhListSingle); err != nil {
  125. log.Error("[dao.dahanghai.cache|GetDHHFromRedis] json.Unmarshal rawInfo error(%v), uid(%d), reply(%s)",
  126. err, k, v)
  127. return nil, nil
  128. }
  129. dhhList = append(dhhList, dhhListSingle)
  130. }
  131. }
  132. }
  133. }
  134. PromInfo(_promGetSuccess)
  135. return
  136. }
  137. // getUIDAllGuardFromRedis 批量获取用户cache
  138. func (d *GuardDao) getUIDAllGuardFromRedisBatch(ctx context.Context, mids []int64) (dhhList []*dahanghaiModel.DaHangHaiRedis2, err error) {
  139. var (
  140. conn = d.redis.Get(ctx)
  141. args = redis.Args{}
  142. cacheResult [][]byte
  143. )
  144. defer conn.Close()
  145. for _, uid := range mids {
  146. args = args.Add(dahanghaiUIDKey(uid))
  147. }
  148. if cacheResult, err = redis.ByteSlices(conn.Do("MGET", args...)); err != nil {
  149. if err == redis.ErrNil {
  150. err = nil
  151. } else {
  152. PromError(_promGetErr)
  153. log.Error(_errorRedisLogPrefix+"|conn.MGET(%v) error(%v)", args, err)
  154. err = errors.Wrapf(err, "redis.StringMap(conn.Do(MGET,%v)", args)
  155. }
  156. return
  157. }
  158. dhhList = make([]*dahanghaiModel.DaHangHaiRedis2, 0)
  159. if len(cacheResult) > 0 {
  160. for k, v := range cacheResult {
  161. if v == nil {
  162. continue
  163. }
  164. dhhListLoop := make([]*dahanghaiModel.DaHangHaiRedis2, 0)
  165. dhhListSingle := &dahanghaiModel.DaHangHaiRedis2{}
  166. if len(v) > 0 {
  167. if err = json.Unmarshal([]byte(v), &dhhListLoop); err != nil {
  168. if err = json.Unmarshal([]byte(v), &dhhListSingle); err != nil {
  169. log.Error("[dao.dahanghai.cache|GetDHHFromRedis] json.Unmarshal rawInfo error(%v), uid(%d), reply(%s)",
  170. err, k, v)
  171. return nil, nil
  172. }
  173. dhhList = append(dhhList, dhhListSingle)
  174. } else {
  175. dhhList = append(dhhList, dhhListLoop...)
  176. }
  177. }
  178. }
  179. }
  180. PromInfo(_promGetSuccess)
  181. return
  182. }
  183. // getAnchorAllGuardFromRedis 批量获取用户cache
  184. func (d *GuardDao) getAnchorAllGuardFromRedis(ctx context.Context, mids []int64) (dhhList []*dahanghaiModel.DaHangHaiRedis2, err error) {
  185. var (
  186. conn = d.redis.Get(ctx)
  187. args = redis.Args{}
  188. cacheResult [][]byte
  189. )
  190. defer conn.Close()
  191. for _, uid := range mids {
  192. args = args.Add(guardAnchorUIDKey(uid))
  193. }
  194. if cacheResult, err = redis.ByteSlices(conn.Do("MGET", args...)); err != nil {
  195. if err == redis.ErrNil {
  196. err = nil
  197. } else {
  198. PromError(_promGetErr)
  199. log.Error(_errorRedisLogPrefix+"|conn.MGET(%v) error(%v)", args, err)
  200. err = errors.Wrapf(err, "redis.StringMap(conn.Do(MGET,%v)", args)
  201. }
  202. return
  203. }
  204. dhhList = make([]*dahanghaiModel.DaHangHaiRedis2, 0)
  205. dhhListSingle := &dahanghaiModel.DaHangHaiRedis2{}
  206. if len(cacheResult) > 0 {
  207. for k, v := range cacheResult {
  208. if v == nil {
  209. return nil, nil
  210. }
  211. if len(v) > 0 {
  212. if err = json.Unmarshal([]byte(v), &dhhList); err != nil {
  213. if err = json.Unmarshal([]byte(v), &dhhListSingle); err != nil {
  214. log.Error("[dao.dahanghai.cache|GetDHHFromRedis] json.Unmarshal rawInfo error(%v), uid(%d), reply(%s)",
  215. err, k, v)
  216. return nil, nil
  217. }
  218. dhhList = append(dhhList, dhhListSingle)
  219. }
  220. }
  221. }
  222. }
  223. PromInfo(_promGetSuccess)
  224. return
  225. }
  226. func (d *GuardDao) delDHHFromRedis(ctx context.Context, key string) (err error) {
  227. conn := d.redis.Get(ctx)
  228. defer conn.Close()
  229. _, err = conn.Do("DEL", key)
  230. if err == gmc.ErrNotFound {
  231. err = nil
  232. } else {
  233. log.Error(_errorRedisLogPrefix+"|Delete(%s) error(%v)", key, err)
  234. PromError(_promDelErr)
  235. }
  236. PromInfo(_promDelSuccess)
  237. conn.Close()
  238. return
  239. }
  240. func (d *GuardDao) setDHHListCache(ctx context.Context, dhhList []dahanghaiModel.DaHangHaiRedis2, uid int64) (err error) {
  241. expire := d.getExpire()
  242. rand.Seed(time.Now().UnixNano())
  243. expire = expire + rand.Int31n(60)
  244. var (
  245. argsMid = redis.Args{}
  246. conn = d.redis.Get(ctx)
  247. dhhJSON []byte
  248. )
  249. defer conn.Close()
  250. key := dahanghaiUIDKey(uid)
  251. if len(dhhList) == 0 {
  252. argsMid = argsMid.Add(key).Add("")
  253. } else {
  254. dhhJSON, err = json.Marshal(dhhList)
  255. if err != nil {
  256. log.Error("[dao.dahanghai.cache|GetDHHFromRedis] json.Marshal rawInfo error(%v), uid(%d)", err, uid)
  257. return
  258. }
  259. argsMid = argsMid.Add(key).Add(string(dhhJSON))
  260. }
  261. if err = conn.Send("SET", argsMid...); err != nil {
  262. err = errors.Wrap(err, "conn.Send(SET) error")
  263. return
  264. }
  265. rand.Seed(time.Now().UnixNano())
  266. expire = expire + rand.Int31n(60)
  267. if err = conn.Send("EXPIRE", key, expire); err != nil {
  268. log.Error("setDHHListCache conn.Send(Expire, %s, %d) error(%v)", key, expire, err)
  269. return
  270. }
  271. return
  272. }
  273. func (d *GuardDao) setAnchorGuardListCache(ctx context.Context, dhhList []dahanghaiModel.DaHangHaiRedis2, uid int64) (err error) {
  274. expire := d.getExpire()
  275. rand.Seed(time.Now().UnixNano())
  276. expire = expire + rand.Int31n(60)
  277. var (
  278. argsMid = redis.Args{}
  279. conn = d.redis.Get(ctx)
  280. dhhJSON []byte
  281. )
  282. defer conn.Close()
  283. key := guardAnchorUIDKey(uid)
  284. if len(dhhList) == 0 {
  285. argsMid = argsMid.Add(key).Add("")
  286. } else {
  287. dhhJSON, err = json.Marshal(dhhList)
  288. if err != nil {
  289. log.Error("[dao.dahanghai.cache|setAnchorGuardListCache] json.Marshal rawInfo error(%v), uid(%d)", err, uid)
  290. return
  291. }
  292. argsMid = argsMid.Add(key).Add(string(dhhJSON))
  293. }
  294. if err = conn.Send("SET", argsMid...); err != nil {
  295. err = errors.Wrap(err, "conn.Send(SET) error")
  296. return
  297. }
  298. rand.Seed(time.Now().UnixNano())
  299. expire = expire + rand.Int31n(60)
  300. if err = conn.Send("EXPIRE", key, expire); err != nil {
  301. log.Error("setAnchorGuardListCache conn.Send(Expire, %s, %d) error(%v)", key, expire, err)
  302. return
  303. }
  304. return
  305. }
  306. func (d *GuardDao) getAnchorRecentTopGuardCache(ctx context.Context, uid int64) (resp map[int64]int64, err error) {
  307. resp = make(map[int64]int64)
  308. nowTime := time.Now().Unix()
  309. cacheKey := recentGuardTopKey(uid)
  310. var (
  311. conn = d.redis.Get(ctx)
  312. )
  313. values, err := redis.Values(conn.Do("ZRANGEBYSCORE", cacheKey, nowTime, "INF", "WITHSCORES"))
  314. if err != nil {
  315. log.Error("getAnchorRecentTopGuardCache.conn.Do(ZRANGEBYSCORE %v) error(%v)", cacheKey, err)
  316. return
  317. }
  318. if len(values) == 0 {
  319. return
  320. }
  321. var aid, unix int64
  322. for len(values) > 0 {
  323. if values, err = redis.Scan(values, &aid, &unix); err != nil {
  324. log.Error("getAnchorRecentTopGuardCache.redis.Scan(%v) error(%v)", values, err)
  325. return
  326. }
  327. resp[aid] = unix
  328. }
  329. return
  330. }