dao.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "go-common/library/cache"
  7. "strconv"
  8. "go-common/library/cache/redis"
  9. xsql "go-common/library/database/sql"
  10. "go-common/library/log"
  11. "go-common/app/service/bbq/sys-msg/api/v1"
  12. "go-common/app/service/bbq/sys-msg/internal/conf"
  13. )
  14. const (
  15. _selectSQL = "select id, type, sender, receiver, jump_url, text, ctime, state from sys_msg where id in (%s)"
  16. _insertSQL = "insert into sys_msg (`type`,`sender`,`receiver`,`jump_url`,`text`) values (?,?,?,?)"
  17. _redisKey = "sys:msg:%d"
  18. _redisExpireS = 600
  19. )
  20. //go:generate $GOPATH/src/go-common/app/tool/cache/gen
  21. type _cache interface {
  22. // cache: -batch=50 -max_group=10 -batch_err=break -nullcache=&v1.SysMsg{Id:0} -check_null_code=$==nil||$.Id==0
  23. SysMsg(c context.Context, ids []int64) (map[int64]*v1.SysMsg, error)
  24. }
  25. // Dao dao
  26. type Dao struct {
  27. c *conf.Config
  28. cache *cache.Cache
  29. redis *redis.Pool
  30. db *xsql.DB
  31. }
  32. // New init mysql db
  33. func New(c *conf.Config) (dao *Dao) {
  34. dao = &Dao{
  35. c: c,
  36. cache: cache.New(1, 1024),
  37. redis: redis.NewPool(c.Redis),
  38. db: xsql.NewMySQL(c.MySQL),
  39. }
  40. return
  41. }
  42. // Close close the resource.
  43. func (d *Dao) Close() {
  44. d.redis.Close()
  45. d.db.Close()
  46. }
  47. // Ping dao ping
  48. func (d *Dao) Ping(ctx context.Context) error {
  49. // TODO: add mc,redis... if you use
  50. return d.db.Ping(ctx)
  51. }
  52. // RawSysMsg 获取系统消息
  53. func (d *Dao) RawSysMsg(ctx context.Context, ids []int64) (res map[int64]*v1.SysMsg, err error) {
  54. if len(ids) == 0 {
  55. return
  56. }
  57. res = make(map[int64]*v1.SysMsg)
  58. querySQL := fmt.Sprintf(_selectSQL, intJoin(ids, ","))
  59. log.V(1).Infov(ctx, log.KV("sql", querySQL))
  60. rows, err := d.db.Query(ctx, querySQL)
  61. if err != nil {
  62. log.Errorv(ctx, log.KV("log", "query mysql sys msg fail"), log.KV("sql", querySQL))
  63. return
  64. }
  65. defer rows.Close()
  66. //"select id, type, sender, receiver, text, ctime from sys_msg where state = 0 and id in (%s)"
  67. for rows.Next() {
  68. var msg v1.SysMsg
  69. if err = rows.Scan(&msg.Id, &msg.Type, &msg.Sender, &msg.Receiver, &msg.JumpUrl, &msg.Text, &msg.Ctime, &msg.State); err != nil {
  70. log.Errorv(ctx, log.KV("log", "scan mysql sys msg fail"), log.KV("sql", querySQL))
  71. return
  72. }
  73. res[msg.Id] = &msg
  74. }
  75. log.V(1).Infov(ctx, log.KV("log", "get sys msg from mysql"), log.KV("req_size", len(ids)), log.KV("rsp_size", len(res)))
  76. return
  77. }
  78. // CreateSysMsg 创建系统消息
  79. func (d *Dao) CreateSysMsg(ctx context.Context, msg *v1.SysMsg) (err error) {
  80. result, err := d.db.Exec(ctx, _insertSQL, msg.Type, msg.Sender, msg.Receiver, msg.JumpUrl, msg.Text)
  81. if err != nil {
  82. log.Errorv(ctx, log.KV("log", "exec mysql fail: create sys msg"), log.KV("sql", _insertSQL), log.KV("msg", msg.String()))
  83. return
  84. }
  85. msgID, _ := result.LastInsertId()
  86. d.DelCacheSysMsg(ctx, msgID)
  87. return
  88. }
  89. func intJoin(raw []int64, split string) (res string) {
  90. for i, v := range raw {
  91. if i != 0 {
  92. res += split
  93. }
  94. res += strconv.FormatInt(v, 10)
  95. }
  96. return
  97. }
  98. // CacheSysMsg .
  99. func (d *Dao) CacheSysMsg(ctx context.Context, ids []int64) (res map[int64]*v1.SysMsg, err error) {
  100. res = make(map[int64]*v1.SysMsg)
  101. conn := d.redis.Get(ctx)
  102. defer conn.Close()
  103. for _, id := range ids {
  104. conn.Send("GET", fmt.Sprintf(_redisKey, id))
  105. }
  106. conn.Flush()
  107. for _, id := range ids {
  108. var by []byte
  109. by, err = redis.Bytes(conn.Receive())
  110. if err == redis.ErrNil {
  111. err = nil
  112. log.V(1).Infov(ctx, log.KV("log", "get sys msg nil from redis"), log.KV("id", id))
  113. continue
  114. }
  115. var msg v1.SysMsg
  116. if err = json.Unmarshal(by, &msg); err != nil {
  117. log.Errorv(ctx, log.KV("log", "unmarshal sys msg fail: str="+string(by)))
  118. return
  119. }
  120. res[id] = &msg
  121. }
  122. return
  123. }
  124. // DelCacheSysMsg 删除sys_msg缓存
  125. func (d *Dao) DelCacheSysMsg(ctx context.Context, msgID int64) {
  126. conn := d.redis.Get(ctx)
  127. defer conn.Close()
  128. redisKey := fmt.Sprintf(_redisKey, msgID)
  129. conn.Do("DEL", redisKey)
  130. log.V(1).Infov(ctx, log.KV("log", "del redis_key: "+redisKey))
  131. }
  132. // AddCacheSysMsg 添加sys_msg缓存
  133. func (d *Dao) AddCacheSysMsg(ctx context.Context, msg map[int64]*v1.SysMsg) {
  134. conn := d.redis.Get(ctx)
  135. defer conn.Close()
  136. for id, val := range msg {
  137. b, _ := json.Marshal(val)
  138. conn.Send("SETEX", fmt.Sprintf(_redisKey, id), _redisExpireS, b)
  139. }
  140. conn.Flush()
  141. for range msg {
  142. conn.Receive()
  143. }
  144. }