redis.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "go-common/app/service/main/share/model"
  6. "go-common/library/cache/redis"
  7. "go-common/library/log"
  8. xip "go-common/library/net/ip"
  9. farm "github.com/dgryski/go-farm"
  10. "github.com/pkg/errors"
  11. )
  12. func redisKey(oid int64, tp int) string {
  13. return fmt.Sprintf("%d_%d", oid, tp)
  14. }
  15. func redisValue(p *model.ShareParams) int64 {
  16. return int64(farm.Hash64([]byte(fmt.Sprintf("%d_%d_%d_%s", p.MID, p.OID, p.TP, p.IP))))
  17. }
  18. func shareKey(oid int64, tp int) string {
  19. return fmt.Sprintf("c_%d_%d", oid, tp)
  20. }
  21. // AddShareMember add share
  22. func (d *Dao) AddShareMember(ctx context.Context, p *model.ShareParams) (ok bool, err error) {
  23. var (
  24. conn = d.rds.Get(ctx)
  25. key = redisKey(p.OID, p.TP)
  26. value = (p.MID << 32) | int64(xip.InetAtoN(p.IP))
  27. )
  28. log.Info("oid-%d mid-%d ip-%s tp-%d key-%s value-%d", p.OID, p.MID, p.IP, p.TP, key, value)
  29. defer conn.Close()
  30. if err = conn.Send("SADD", key, value); err != nil {
  31. err = errors.Wrapf(err, "conn.Do(SADD, %s, %d)", key, value)
  32. return
  33. }
  34. if err = conn.Send("EXPIRE", key, d.c.RedisExpire); err != nil {
  35. err = errors.Wrapf(err, "conn.Do(SADD, %s, %d)", key, value)
  36. return
  37. }
  38. if err = conn.Flush(); err != nil {
  39. err = errors.Wrap(err, "conn.Flush")
  40. return
  41. }
  42. if ok, err = redis.Bool(conn.Receive()); err != nil {
  43. log.Error("sadd failed mid(%d) oid(%d) type(%d) ip(%s) key(%s) value(%d)",
  44. p.MID, p.OID, p.TP, p.IP, key, value)
  45. err = errors.Wrap(err, "redis.Bool(conn.Receive)")
  46. return
  47. }
  48. if _, err = conn.Receive(); err != nil {
  49. err = errors.Wrap(err, "conn.Receive")
  50. return
  51. }
  52. return
  53. }
  54. // SetShareCache set share cache
  55. func (d *Dao) SetShareCache(c context.Context, oid int64, tp int, shared int64) (err error) {
  56. var (
  57. conn = d.rds.Get(c)
  58. key = shareKey(oid, tp)
  59. )
  60. defer conn.Close()
  61. if _, err = conn.Do("SET", key, shared); err != nil {
  62. err = errors.WithStack(err)
  63. return
  64. }
  65. return
  66. }
  67. // ShareCache return oid share count
  68. func (d *Dao) ShareCache(c context.Context, oid int64, tp int) (shared int64, err error) {
  69. var (
  70. conn = d.rds.Get(c)
  71. key = shareKey(oid, tp)
  72. )
  73. defer conn.Close()
  74. if shared, err = redis.Int64(conn.Do("GET", key)); err != nil {
  75. if err == redis.ErrNil {
  76. shared = -1
  77. err = nil
  78. } else {
  79. err = errors.WithStack(err)
  80. }
  81. }
  82. return
  83. }
  84. // SharesCache return oids share
  85. func (d *Dao) SharesCache(c context.Context, oids []int64, tp int) (shares map[int64]int64, err error) {
  86. conn := d.rds.Get(c)
  87. defer conn.Close()
  88. for _, oid := range oids {
  89. if err = conn.Send("GET", shareKey(oid, tp)); err != nil {
  90. log.Error("conn.Send(GET, %s) error(%v)", shareKey(oid, tp), err)
  91. return
  92. }
  93. }
  94. if err = conn.Flush(); err != nil {
  95. log.Error("conn.Flush error(%v)", err)
  96. return
  97. }
  98. shares = make(map[int64]int64, len(oids))
  99. for _, oid := range oids {
  100. var cnt int64
  101. if cnt, err = redis.Int64(conn.Receive()); err != nil {
  102. if err == redis.ErrNil {
  103. err = nil
  104. continue
  105. }
  106. return
  107. }
  108. shares[oid] = cnt
  109. }
  110. return
  111. }