123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- package dao
- import (
- "context"
- "encoding/json"
- "fmt"
- "go-common/app/service/bbq/topic/api"
- "go-common/library/cache/redis"
- "go-common/library/log"
- "go-common/library/xstr"
- )
// SQL statements and the redis key layout used by the extension DAO.
const (
	// _selectExtension reads (svid, content) pairs; the %s verb is filled
	// with a comma-separated list of svids.
	_selectExtension = "select svid, content from extension where svid in (%s)"
	// _insertExtension writes one (svid, type, content) row using
	// `insert ignore`.
	_insertExtension = "insert ignore into extension (`svid`,`type`,`content`) values (?,?,?)"

	// _videoExtensionKey is the redis key format; the %d verb is the svid.
	_videoExtensionKey = "ext:%d"
)
- // RawVideoExtension 从mysql获取extension
- func (d *Dao) RawVideoExtension(ctx context.Context, svids []int64) (res map[int64]*api.VideoExtension, err error) {
- res = make(map[int64]*api.VideoExtension)
- if len(svids) == 0 {
- return
- }
- querySQL := fmt.Sprintf(_selectExtension, xstr.JoinInts(svids))
- rows, err := d.db.Query(ctx, querySQL)
- if err != nil {
- log.Errorw(ctx, "log", "get extension error", "err", err, "sql", querySQL)
- return
- }
- defer rows.Close()
- var svid int64
- var content string
- for rows.Next() {
- if err = rows.Scan(&svid, &content); err != nil {
- log.Errorw(ctx, "log", "get extension from mysql fail", "sql", querySQL)
- return
- }
- // 由于数据库中的数据和缓存中还不太一样,因此这里需要对db读取的数据进行额外处理
- var extension api.Extension
- json.Unmarshal([]byte(content), &extension.TitleExtra)
- // TODO:check
- log.V(10).Infow(ctx, "log", "unmarshal content", "result", extension)
- data, _ := json.Marshal(&extension)
- res[svid] = &api.VideoExtension{Svid: svid, Extension: string(data)}
- }
- log.V(1).Infow(ctx, "log", "get extension", "req", svids, "rsp_size", len(res))
- return
- }
- // CacheVideoExtension 从缓存获取extension
- func (d *Dao) CacheVideoExtension(ctx context.Context, svids []int64) (res map[int64]*api.VideoExtension, err error) {
- res = make(map[int64]*api.VideoExtension)
- conn := d.redis.Get(ctx)
- defer conn.Close()
- for _, svid := range svids {
- conn.Send("GET", fmt.Sprintf(_videoExtensionKey, svid))
- }
- conn.Flush()
- var data string
- for _, svid := range svids {
- if data, err = redis.String(conn.Receive()); err != nil {
- if err == redis.ErrNil {
- err = nil
- } else {
- log.Errorv(ctx, log.KV("event", "redis_get"), log.KV("svid", svid))
- }
- continue
- }
- extension := new(api.VideoExtension)
- extension.Svid = svid
- extension.Extension = data
- res[extension.Svid] = extension
- }
- log.Infov(ctx, log.KV("event", "redis_get"), log.KV("row_num", len(res)))
- return
- }
- // AddCacheVideoExtension 添加extension缓存
- func (d *Dao) AddCacheVideoExtension(ctx context.Context, extensions map[int64]*api.VideoExtension) (err error) {
- conn := d.redis.Get(ctx)
- defer conn.Close()
- for svid, value := range extensions {
- conn.Send("SET", fmt.Sprintf(_videoExtensionKey, svid), value.Extension, "EX", d.topicExpire)
- }
- conn.Flush()
- for i := 0; i < len(extensions); i++ {
- conn.Receive()
- }
- log.Infov(ctx, log.KV("event", "redis_set"), log.KV("row_num", len(extensions)))
- return
- }
- // DelCacheVideoExtension 删除extension缓存
- func (d *Dao) DelCacheVideoExtension(ctx context.Context, svid int64) {
- var key = fmt.Sprintf(_videoExtensionKey, svid)
- conn := d.redis.Get(ctx)
- defer conn.Close()
- conn.Do("DEL", key)
- }
- // InsertExtension 插入extension到db
- func (d *Dao) InsertExtension(ctx context.Context, svid int64, extensionType int64, extension *api.Extension) (rowsAffected int64, err error) {
- data, _ := json.Marshal(extension.TitleExtra)
- res, err := d.db.Exec(ctx, _insertExtension, svid, extensionType, string(data))
- if err != nil {
- log.Errorw(ctx, "log", "insert extension db fail", "svid", svid, "extension_type", extensionType, "extension", extensionType)
- return
- }
- rowsAffected, tmpErr := res.RowsAffected()
- if tmpErr != nil {
- log.Warnw(ctx, "log", "get rows affected fail")
- }
- d.DelCacheVideoExtension(ctx, svid)
- return
- }
|