dao.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "go-common/app/service/bbq/notice-service/api/v1"
  6. "go-common/app/service/bbq/notice-service/internal/conf"
  7. push "go-common/app/service/bbq/push/api/grpc/v1"
  8. "go-common/library/cache/redis"
  9. xsql "go-common/library/database/sql"
  10. "go-common/library/log"
  11. "go-common/library/net/rpc/warden"
  12. )
// SQL templates, page size and Redis settings used by the notice dao.
const (
	// _listSQL selects one page of notices. It has two format verbs: the
	// two-digit table suffix (%02d) and the row limit (%d). The bound
	// parameters are mid, notice_type and the exclusive upper id.
	_listSQL = "select id, mid, action_mid, svid, notice_type, title, text, jump_url, biz_type, biz_id, ctime from notice_%02d where mid = ? and notice_type = ? and id < ? order by id desc limit %d"
	// _insertSQL inserts one notice row; its single format verb is the
	// two-digit table suffix.
	_insertSQL = "insert into notice_%02d (mid, action_mid, svid, notice_type, title, text, jump_url, biz_type, biz_id) values (?,?,?,?,?,?,?,?,?)"
	// _noticeLen is the row limit substituted into _listSQL.
	_noticeLen = 10
	// _redisUnreadKey is the Redis key template for a user's unread counters;
	// it is formatted with the user's mid.
	_redisUnreadKey = "notice:unread:%d"
	// _redisExpireTime is the value passed to the Redis EXPIRE command for
	// unread keys.
	_redisExpireTime = 7776000 // 90days
)
// Dao bundles the configuration, storage handles and RPC client used by the
// notice data-access methods.
type Dao struct {
	// c is the configuration passed to New.
	c *conf.Config
	// db is the MySQL handle used for the notice tables.
	db *xsql.DB
	// redis is the pool used for the per-user unread counters.
	redis *redis.Pool
	// pushClient is the gRPC client created by newPushClient.
	pushClient push.PushClient
}
  27. // New init mysql db
  28. func New(c *conf.Config) (dao *Dao) {
  29. dao = &Dao{
  30. c: c,
  31. db: xsql.NewMySQL(c.MySQL),
  32. redis: redis.NewPool(c.Redis),
  33. pushClient: newPushClient(c.GRPCClient["push"]),
  34. }
  35. return
  36. }
  37. // Close close the resource.
  38. func (d *Dao) Close() {
  39. d.db.Close()
  40. }
  41. // Ping dao ping
  42. func (d *Dao) Ping(ctx context.Context) error {
  43. // TODO: add mc,redis... if you use
  44. return d.db.Ping(ctx)
  45. }
  46. func getTableIndex(id int64) int64 {
  47. return id % 100
  48. }
  49. // newPushClient .
  50. func newPushClient(cfg *conf.GRPCClientConfig) push.PushClient {
  51. cc, err := warden.NewClient(cfg.WardenConf).Dial(context.Background(), cfg.Addr)
  52. if err != nil {
  53. panic(err)
  54. }
  55. return push.NewPushClient(cc)
  56. }
  57. // ListNotices 获取通知列表
  58. func (d *Dao) ListNotices(ctx context.Context, mid, cursorID int64, noticeType int32) (list []*v1.NoticeBase, err error) {
  59. querySQL := fmt.Sprintf(_listSQL, getTableIndex(mid), _noticeLen)
  60. log.V(1).Infov(ctx, log.KV("mid", mid), log.KV("mid", mid), log.KV("notice_type", noticeType), log.KV("cursor_id", cursorID), log.KV("sql", querySQL))
  61. rows, err := d.db.Query(ctx, querySQL, mid, noticeType, cursorID)
  62. if err != nil {
  63. log.Errorv(ctx, log.KV("log", "query mysql notice list fail"), log.KV("sql", querySQL), log.KV("mid", mid), log.KV("biz_type", noticeType), log.KV("cursor_id", cursorID))
  64. return
  65. }
  66. defer rows.Close()
  67. for rows.Next() {
  68. var notice v1.NoticeBase
  69. if err = rows.Scan(&notice.Id, &notice.Mid, &notice.ActionMid, &notice.SvId, &notice.NoticeType, &notice.Title, &notice.Text, &notice.JumpUrl, &notice.BizType, &notice.BizId, &notice.NoticeTime); err != nil {
  70. log.Errorv(ctx, log.KV("log", "scan mysql notice list fail"), log.KV("sql", querySQL), log.KV("mid", mid), log.KV("biz_type", noticeType), log.KV("mid", mid), log.KV("cursor_id", cursorID))
  71. return
  72. }
  73. list = append(list, &notice)
  74. }
  75. // 只要用户读取数据,即清理未读数
  76. conn := d.redis.Get(ctx)
  77. defer conn.Close()
  78. redisKey := fmt.Sprintf(_redisUnreadKey, mid)
  79. if _, tmpErr := conn.Do("HSET", redisKey, noticeType, 0); tmpErr != nil {
  80. log.Warnv(ctx, log.KV("log", "clear unread info redis fail: key="+redisKey))
  81. }
  82. log.V(1).Infov(ctx, log.KV("req_size", _noticeLen), log.KV("rsp_size", len(list)))
  83. return
  84. }
  85. // CreateNotice 创建通知
  86. func (d *Dao) CreateNotice(ctx context.Context, notice *v1.NoticeBase) (id int64, err error) {
  87. querySQL := fmt.Sprintf(_insertSQL, getTableIndex(notice.Mid))
  88. res, err := d.db.Exec(ctx, querySQL, notice.Mid, notice.ActionMid, notice.SvId, notice.NoticeType, notice.Title, notice.Text, notice.JumpUrl, notice.BizType, notice.BizId)
  89. if err != nil {
  90. log.Errorv(ctx, log.KV("log", "exec mysql fail: create notice"), log.KV("sql", querySQL))
  91. return
  92. }
  93. id, _ = res.LastInsertId()
  94. return
  95. }
  96. // IncreaseUnread 增加未读
  97. func (d *Dao) IncreaseUnread(ctx context.Context, mid int64, noticeType int32, num int64) (err error) {
  98. conn := d.redis.Get(ctx)
  99. defer conn.Close()
  100. redisKey := fmt.Sprintf(_redisUnreadKey, mid)
  101. expireResult, _ := redis.Int(conn.Do("EXPIRE", redisKey, _redisExpireTime))
  102. if expireResult == 0 {
  103. log.Infov(ctx, log.KV("log", "expire fail: key="+redisKey))
  104. }
  105. _, err = conn.Do("HINCRBY", redisKey, noticeType, num)
  106. if err != nil {
  107. log.Errorv(ctx, log.KV("log", "HINCRBY notice unread fail: err="+err.Error()))
  108. return
  109. }
  110. log.V(1).Infov(ctx, log.KV("log", "hincrby notice unread : key="+redisKey), log.KV("notice_type", noticeType), log.KV("num", num))
  111. return
  112. }
  113. // ClearUnread 清理未读
  114. func (d *Dao) ClearUnread(ctx context.Context, mid int64, noticeType int32) (err error) {
  115. conn := d.redis.Get(ctx)
  116. defer conn.Close()
  117. redisKey := fmt.Sprintf(_redisUnreadKey, mid)
  118. expireResult, _ := redis.Int(conn.Do("EXPIRE", redisKey, _redisExpireTime))
  119. if expireResult == 0 {
  120. log.Infov(ctx, log.KV("log", "expire fail and return: key="+redisKey))
  121. return
  122. }
  123. _, err = conn.Do("HSET", redisKey, noticeType, 0)
  124. if err != nil {
  125. log.Errorv(ctx, log.KV("log", "HSET notice unread fail: err="+err.Error()))
  126. return
  127. }
  128. log.V(1).Infov(ctx, log.KV("log", "HSET clear notice unread : key="+redisKey), log.KV("notice_type", noticeType))
  129. // 清理推送用户
  130. err = d.ClearPushActionMid(ctx, mid, noticeType)
  131. if err != nil {
  132. log.Errorv(ctx, log.KV("log", "ClearPushActionMid fail: err="+err.Error()))
  133. return
  134. }
  135. return
  136. }
  137. // GetUnreadInfo 获取未读情况
  138. func (d *Dao) GetUnreadInfo(ctx context.Context, mid int64) (list []*v1.UnreadItem, err error) {
  139. redisKey := fmt.Sprintf(_redisUnreadKey, mid)
  140. conn := d.redis.Get(ctx)
  141. defer conn.Close()
  142. expireResult, _ := redis.Int(conn.Do("EXPIRE", redisKey, _redisExpireTime))
  143. if expireResult == 0 {
  144. log.V(1).Infov(ctx, log.KV("log", "expire fail: key="+redisKey))
  145. return
  146. }
  147. result, err := redis.Int64s(conn.Do("HMGET", redisKey, 1, 2, 3, 4))
  148. if err != nil {
  149. log.Errorv(ctx, log.KV("log", "hmget notice unread fail: err="+err.Error()))
  150. return
  151. }
  152. for i, val := range result {
  153. var item v1.UnreadItem
  154. item.NoticeType = int32(i + 1)
  155. item.UnreadNum = val
  156. list = append(list, &item)
  157. }
  158. return
  159. }