123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- package dao
- import (
- "context"
- "fmt"
- "go-common/app/service/bbq/notice-service/api/v1"
- "go-common/app/service/bbq/notice-service/internal/conf"
- push "go-common/app/service/bbq/push/api/grpc/v1"
- "go-common/library/cache/redis"
- xsql "go-common/library/database/sql"
- "go-common/library/log"
- "go-common/library/net/rpc/warden"
- )
- const (
- _listSQL = "select id, mid, action_mid, svid, notice_type, title, text, jump_url, biz_type, biz_id, ctime from notice_%02d where mid = ? and notice_type = ? and id < ? order by id desc limit %d"
- _insertSQL = "insert into notice_%02d (mid, action_mid, svid, notice_type, title, text, jump_url, biz_type, biz_id) values (?,?,?,?,?,?,?,?,?)"
- _noticeLen = 10
- _redisUnreadKey = "notice:unread:%d"
- _redisExpireTime = 7776000 // 90days
- )
- // Dao dao
- type Dao struct {
- c *conf.Config
- db *xsql.DB
- redis *redis.Pool
- pushClient push.PushClient
- }
- // New init mysql db
- func New(c *conf.Config) (dao *Dao) {
- dao = &Dao{
- c: c,
- db: xsql.NewMySQL(c.MySQL),
- redis: redis.NewPool(c.Redis),
- pushClient: newPushClient(c.GRPCClient["push"]),
- }
- return
- }
- // Close close the resource.
- func (d *Dao) Close() {
- d.db.Close()
- }
- // Ping dao ping
- func (d *Dao) Ping(ctx context.Context) error {
- // TODO: add mc,redis... if you use
- return d.db.Ping(ctx)
- }
- func getTableIndex(id int64) int64 {
- return id % 100
- }
- // newPushClient .
- func newPushClient(cfg *conf.GRPCClientConfig) push.PushClient {
- cc, err := warden.NewClient(cfg.WardenConf).Dial(context.Background(), cfg.Addr)
- if err != nil {
- panic(err)
- }
- return push.NewPushClient(cc)
- }
- // ListNotices 获取通知列表
- func (d *Dao) ListNotices(ctx context.Context, mid, cursorID int64, noticeType int32) (list []*v1.NoticeBase, err error) {
- querySQL := fmt.Sprintf(_listSQL, getTableIndex(mid), _noticeLen)
- log.V(1).Infov(ctx, log.KV("mid", mid), log.KV("mid", mid), log.KV("notice_type", noticeType), log.KV("cursor_id", cursorID), log.KV("sql", querySQL))
- rows, err := d.db.Query(ctx, querySQL, mid, noticeType, cursorID)
- if err != nil {
- log.Errorv(ctx, log.KV("log", "query mysql notice list fail"), log.KV("sql", querySQL), log.KV("mid", mid), log.KV("biz_type", noticeType), log.KV("cursor_id", cursorID))
- return
- }
- defer rows.Close()
- for rows.Next() {
- var notice v1.NoticeBase
- if err = rows.Scan(¬ice.Id, ¬ice.Mid, ¬ice.ActionMid, ¬ice.SvId, ¬ice.NoticeType, ¬ice.Title, ¬ice.Text, ¬ice.JumpUrl, ¬ice.BizType, ¬ice.BizId, ¬ice.NoticeTime); err != nil {
- log.Errorv(ctx, log.KV("log", "scan mysql notice list fail"), log.KV("sql", querySQL), log.KV("mid", mid), log.KV("biz_type", noticeType), log.KV("mid", mid), log.KV("cursor_id", cursorID))
- return
- }
- list = append(list, ¬ice)
- }
- // 只要用户读取数据,即清理未读数
- conn := d.redis.Get(ctx)
- defer conn.Close()
- redisKey := fmt.Sprintf(_redisUnreadKey, mid)
- if _, tmpErr := conn.Do("HSET", redisKey, noticeType, 0); tmpErr != nil {
- log.Warnv(ctx, log.KV("log", "clear unread info redis fail: key="+redisKey))
- }
- log.V(1).Infov(ctx, log.KV("req_size", _noticeLen), log.KV("rsp_size", len(list)))
- return
- }
- // CreateNotice 创建通知
- func (d *Dao) CreateNotice(ctx context.Context, notice *v1.NoticeBase) (id int64, err error) {
- querySQL := fmt.Sprintf(_insertSQL, getTableIndex(notice.Mid))
- res, err := d.db.Exec(ctx, querySQL, notice.Mid, notice.ActionMid, notice.SvId, notice.NoticeType, notice.Title, notice.Text, notice.JumpUrl, notice.BizType, notice.BizId)
- if err != nil {
- log.Errorv(ctx, log.KV("log", "exec mysql fail: create notice"), log.KV("sql", querySQL))
- return
- }
- id, _ = res.LastInsertId()
- return
- }
- // IncreaseUnread 增加未读
- func (d *Dao) IncreaseUnread(ctx context.Context, mid int64, noticeType int32, num int64) (err error) {
- conn := d.redis.Get(ctx)
- defer conn.Close()
- redisKey := fmt.Sprintf(_redisUnreadKey, mid)
- expireResult, _ := redis.Int(conn.Do("EXPIRE", redisKey, _redisExpireTime))
- if expireResult == 0 {
- log.Infov(ctx, log.KV("log", "expire fail: key="+redisKey))
- }
- _, err = conn.Do("HINCRBY", redisKey, noticeType, num)
- if err != nil {
- log.Errorv(ctx, log.KV("log", "HINCRBY notice unread fail: err="+err.Error()))
- return
- }
- log.V(1).Infov(ctx, log.KV("log", "hincrby notice unread : key="+redisKey), log.KV("notice_type", noticeType), log.KV("num", num))
- return
- }
- // ClearUnread 清理未读
- func (d *Dao) ClearUnread(ctx context.Context, mid int64, noticeType int32) (err error) {
- conn := d.redis.Get(ctx)
- defer conn.Close()
- redisKey := fmt.Sprintf(_redisUnreadKey, mid)
- expireResult, _ := redis.Int(conn.Do("EXPIRE", redisKey, _redisExpireTime))
- if expireResult == 0 {
- log.Infov(ctx, log.KV("log", "expire fail and return: key="+redisKey))
- return
- }
- _, err = conn.Do("HSET", redisKey, noticeType, 0)
- if err != nil {
- log.Errorv(ctx, log.KV("log", "HSET notice unread fail: err="+err.Error()))
- return
- }
- log.V(1).Infov(ctx, log.KV("log", "HSET clear notice unread : key="+redisKey), log.KV("notice_type", noticeType))
- // 清理推送用户
- err = d.ClearPushActionMid(ctx, mid, noticeType)
- if err != nil {
- log.Errorv(ctx, log.KV("log", "ClearPushActionMid fail: err="+err.Error()))
- return
- }
- return
- }
- // GetUnreadInfo 获取未读情况
- func (d *Dao) GetUnreadInfo(ctx context.Context, mid int64) (list []*v1.UnreadItem, err error) {
- redisKey := fmt.Sprintf(_redisUnreadKey, mid)
- conn := d.redis.Get(ctx)
- defer conn.Close()
- expireResult, _ := redis.Int(conn.Do("EXPIRE", redisKey, _redisExpireTime))
- if expireResult == 0 {
- log.V(1).Infov(ctx, log.KV("log", "expire fail: key="+redisKey))
- return
- }
- result, err := redis.Int64s(conn.Do("HMGET", redisKey, 1, 2, 3, 4))
- if err != nil {
- log.Errorv(ctx, log.KV("log", "hmget notice unread fail: err="+err.Error()))
- return
- }
- for i, val := range result {
- var item v1.UnreadItem
- item.NoticeType = int32(i + 1)
- item.UnreadNum = val
- list = append(list, &item)
- }
- return
- }
|