redis.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "go-common/app/job/main/dm2/model"
  7. "go-common/library/cache/redis"
  8. "go-common/library/log"
  9. )
  10. const (
  11. // dm xml list v1
  12. _prefixDM = "dm_v1_%d_%d" // dm_v1_tpe_oid
  13. divide = 34359738368 // 2^35
  14. )
  15. func keyDM(tp int32, oid int64) (key string) {
  16. return fmt.Sprintf(_prefixDM, tp, oid)
  17. }
  18. // 弹幕在redis sortset 中的score
  19. // 通过score保证弹幕在缓存中的排序为:普通弹幕、普通弹幕中的保护弹幕、字幕弹幕、脚本弹幕
  20. func score(dm *model.DM) (score float64) {
  21. // NOTE redis score最多17位表示,这里采用整数十位+小数部分十位
  22. v := dm.ID / divide // 2^63 / 2^35 = 2^28-1 整数部分最大值:268435455
  23. k := dm.ID % divide // 精度8位,最后5位可忽略
  24. r := int64(dm.Pool)<<29 | int64(dm.Attr)&1<<28 | v // NOTE v should less than 2^28
  25. score, _ = strconv.ParseFloat(fmt.Sprintf("%d.%d", r, k), 64)
  26. return
  27. }
  28. // AddDMCache add dm to redis.
  29. func (d *Dao) AddDMCache(c context.Context, dm *model.DM) (err error) {
  30. var (
  31. conn = d.dmRds.Get(c)
  32. value []byte
  33. key = keyDM(dm.Type, dm.Oid)
  34. )
  35. defer conn.Close()
  36. if value, err = dm.Marshal(); err != nil {
  37. log.Error("dm.Marshal(%v) error(%v)", dm, err)
  38. return
  39. }
  40. if err = conn.Send("ZADD", key, score(dm), value); err != nil {
  41. log.Error("conn.Send(ZADD %v) error(%v)", dm, err)
  42. return
  43. }
  44. if err = conn.Send("EXPIRE", key, d.dmRdsExpire); err != nil {
  45. log.Error("conn.Send(EXPIRE %s) error(%v)", key, err)
  46. return
  47. }
  48. if err = conn.Flush(); err != nil {
  49. log.Error("conn.Flush() error(%v)", err)
  50. return
  51. }
  52. for i := 0; i < 2; i++ {
  53. if _, err = conn.Receive(); err != nil {
  54. log.Error("conn.Receive() error(%v)", err)
  55. return
  56. }
  57. }
  58. return
  59. }
  60. // SetDMCache flush dm list to redis.
  61. func (d *Dao) SetDMCache(c context.Context, tp int32, oid int64, dms []*model.DM) (err error) {
  62. var (
  63. value []byte
  64. conn = d.dmRds.Get(c)
  65. key = keyDM(tp, oid)
  66. )
  67. defer conn.Close()
  68. for _, dm := range dms {
  69. if value, err = dm.Marshal(); err != nil {
  70. log.Error("dm.Marshal(%v) error(%v)", dm, err)
  71. return
  72. }
  73. if err = conn.Send("ZADD", key, score(dm), value); err != nil {
  74. log.Error("conn.Send(ZADD %v) error(%v)", dm, err)
  75. return
  76. }
  77. }
  78. if err = conn.Send("EXPIRE", key, d.dmRdsExpire); err != nil {
  79. log.Error("conn.Send(EXPIRE %s) error(%v)", key, err)
  80. return
  81. }
  82. if err = conn.Flush(); err != nil {
  83. log.Error("conn.Flush() error(%v)", err)
  84. return
  85. }
  86. for i := 0; i < len(dms)+1; i++ {
  87. if _, err = conn.Receive(); err != nil {
  88. log.Error("conn.Receive() error(%v)", err)
  89. return
  90. }
  91. }
  92. return
  93. }
  94. // DelDMCache delete redis cache of oid.
  95. func (d *Dao) DelDMCache(c context.Context, tp int32, oid int64) (err error) {
  96. var (
  97. key = keyDM(tp, oid)
  98. conn = d.dmRds.Get(c)
  99. )
  100. if _, err = conn.Do("DEL", key); err != nil {
  101. log.Error("conn.Do(DEL %s) error(%v)", key, err)
  102. }
  103. conn.Close()
  104. return
  105. }
  106. // ExpireDMCache expire dm.
  107. func (d *Dao) ExpireDMCache(c context.Context, tp int32, oid int64) (ok bool, err error) {
  108. key := keyDM(tp, oid)
  109. conn := d.dmRds.Get(c)
  110. if ok, err = redis.Bool(conn.Do("EXPIRE", key, d.dmRdsExpire)); err != nil {
  111. log.Error("conn.Do(EXPIRE %s) error(%v)", key, err)
  112. }
  113. conn.Close()
  114. return
  115. }
  116. // DMCache 获取redis列表中的弹幕.
  117. func (d *Dao) DMCache(c context.Context, tp int32, oid int64) (res [][]byte, err error) {
  118. conn := d.dmRds.Get(c)
  119. key := keyDM(tp, oid)
  120. if res, err = redis.ByteSlices(conn.Do("ZRANGE", key, 0, -1)); err != nil {
  121. log.Error("conn.Do(ZRANGE %s) error(%v)", key, err)
  122. }
  123. conn.Close()
  124. return
  125. }
  126. // TrimDMCache 从redis列表中pop掉count条弹幕.
  127. func (d *Dao) TrimDMCache(c context.Context, tp int32, oid, count int64) (err error) {
  128. conn := d.dmRds.Get(c)
  129. key := keyDM(tp, oid)
  130. if _, err = conn.Do("ZREMRANGEBYRANK", key, 0, count-1); err != nil {
  131. log.Error("conn.Do(ZREMRANGEBYRANK %s) error(%v)", key, err)
  132. }
  133. conn.Close()
  134. return
  135. }