123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- package dao
- import (
- "context"
- "encoding/json"
- "fmt"
- "go-common/library/cache"
- "strconv"
- "go-common/library/cache/redis"
- xsql "go-common/library/database/sql"
- "go-common/library/log"
- "go-common/app/service/bbq/sys-msg/api/v1"
- "go-common/app/service/bbq/sys-msg/internal/conf"
- )
- const (
- _selectSQL = "select id, type, sender, receiver, jump_url, text, ctime, state from sys_msg where id in (%s)"
- _insertSQL = "insert into sys_msg (`type`,`sender`,`receiver`,`jump_url`,`text`) values (?,?,?,?)"
- _redisKey = "sys:msg:%d"
- _redisExpireS = 600
- )
- //go:generate $GOPATH/src/go-common/app/tool/cache/gen
- type _cache interface {
- // cache: -batch=50 -max_group=10 -batch_err=break -nullcache=&v1.SysMsg{Id:0} -check_null_code=$==nil||$.Id==0
- SysMsg(c context.Context, ids []int64) (map[int64]*v1.SysMsg, error)
- }
- // Dao dao
- type Dao struct {
- c *conf.Config
- cache *cache.Cache
- redis *redis.Pool
- db *xsql.DB
- }
- // New init mysql db
- func New(c *conf.Config) (dao *Dao) {
- dao = &Dao{
- c: c,
- cache: cache.New(1, 1024),
- redis: redis.NewPool(c.Redis),
- db: xsql.NewMySQL(c.MySQL),
- }
- return
- }
- // Close close the resource.
- func (d *Dao) Close() {
- d.redis.Close()
- d.db.Close()
- }
- // Ping dao ping
- func (d *Dao) Ping(ctx context.Context) error {
- // TODO: add mc,redis... if you use
- return d.db.Ping(ctx)
- }
- // RawSysMsg 获取系统消息
- func (d *Dao) RawSysMsg(ctx context.Context, ids []int64) (res map[int64]*v1.SysMsg, err error) {
- if len(ids) == 0 {
- return
- }
- res = make(map[int64]*v1.SysMsg)
- querySQL := fmt.Sprintf(_selectSQL, intJoin(ids, ","))
- log.V(1).Infov(ctx, log.KV("sql", querySQL))
- rows, err := d.db.Query(ctx, querySQL)
- if err != nil {
- log.Errorv(ctx, log.KV("log", "query mysql sys msg fail"), log.KV("sql", querySQL))
- return
- }
- defer rows.Close()
- //"select id, type, sender, receiver, text, ctime from sys_msg where state = 0 and id in (%s)"
- for rows.Next() {
- var msg v1.SysMsg
- if err = rows.Scan(&msg.Id, &msg.Type, &msg.Sender, &msg.Receiver, &msg.JumpUrl, &msg.Text, &msg.Ctime, &msg.State); err != nil {
- log.Errorv(ctx, log.KV("log", "scan mysql sys msg fail"), log.KV("sql", querySQL))
- return
- }
- res[msg.Id] = &msg
- }
- log.V(1).Infov(ctx, log.KV("log", "get sys msg from mysql"), log.KV("req_size", len(ids)), log.KV("rsp_size", len(res)))
- return
- }
- // CreateSysMsg 创建系统消息
- func (d *Dao) CreateSysMsg(ctx context.Context, msg *v1.SysMsg) (err error) {
- result, err := d.db.Exec(ctx, _insertSQL, msg.Type, msg.Sender, msg.Receiver, msg.JumpUrl, msg.Text)
- if err != nil {
- log.Errorv(ctx, log.KV("log", "exec mysql fail: create sys msg"), log.KV("sql", _insertSQL), log.KV("msg", msg.String()))
- return
- }
- msgID, _ := result.LastInsertId()
- d.DelCacheSysMsg(ctx, msgID)
- return
- }
- func intJoin(raw []int64, split string) (res string) {
- for i, v := range raw {
- if i != 0 {
- res += split
- }
- res += strconv.FormatInt(v, 10)
- }
- return
- }
- // CacheSysMsg .
- func (d *Dao) CacheSysMsg(ctx context.Context, ids []int64) (res map[int64]*v1.SysMsg, err error) {
- res = make(map[int64]*v1.SysMsg)
- conn := d.redis.Get(ctx)
- defer conn.Close()
- for _, id := range ids {
- conn.Send("GET", fmt.Sprintf(_redisKey, id))
- }
- conn.Flush()
- for _, id := range ids {
- var by []byte
- by, err = redis.Bytes(conn.Receive())
- if err == redis.ErrNil {
- err = nil
- log.V(1).Infov(ctx, log.KV("log", "get sys msg nil from redis"), log.KV("id", id))
- continue
- }
- var msg v1.SysMsg
- if err = json.Unmarshal(by, &msg); err != nil {
- log.Errorv(ctx, log.KV("log", "unmarshal sys msg fail: str="+string(by)))
- return
- }
- res[id] = &msg
- }
- return
- }
- // DelCacheSysMsg 删除sys_msg缓存
- func (d *Dao) DelCacheSysMsg(ctx context.Context, msgID int64) {
- conn := d.redis.Get(ctx)
- defer conn.Close()
- redisKey := fmt.Sprintf(_redisKey, msgID)
- conn.Do("DEL", redisKey)
- log.V(1).Infov(ctx, log.KV("log", "del redis_key: "+redisKey))
- }
- // AddCacheSysMsg 添加sys_msg缓存
- func (d *Dao) AddCacheSysMsg(ctx context.Context, msg map[int64]*v1.SysMsg) {
- conn := d.redis.Get(ctx)
- defer conn.Close()
- for id, val := range msg {
- b, _ := json.Marshal(val)
- conn.Send("SETEX", fmt.Sprintf(_redisKey, id), _redisExpireS, b)
- }
- conn.Flush()
- for range msg {
- conn.Receive()
- }
- }
|