12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736 |
- package dao
- import (
- "bytes"
- "context"
- "database/sql"
- "encoding/json"
- "fmt"
- "go-common/app/service/bbq/video/api/grpc/v1"
- "go-common/app/service/bbq/video/model"
- acc "go-common/app/service/main/account/api"
- "go-common/library/cache/redis"
- xsql "go-common/library/database/sql"
- "go-common/library/ecode"
- "go-common/library/log"
- "go-common/library/net/metadata"
- "go-common/library/xstr"
- xhttp "net/http"
- "regexp"
- "strconv"
- "strings"
- "time"
- )
// SQL statements and sharding parameters used by the video DAO.
const (
	// _BVCSubTableSize is the modulus applied to an svid when choosing a
	// video_bvc_NN table.
	_BVCSubTableSize = 100

	// video table.
	_queryVideo       = "SELECT svid FROM video WHERE svid = ?"
	_addVideo         = "INSERT INTO video(`cover_url`,`cover_width`,`cover_height`,`svid`,`title`,`mid`,`avid`,`cid`,`pubtime`,`from`,`tid`,`sub_tid`,`home_img_url`,`home_img_width`,`home_img_height`,`state`) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
	_queryVideoList   = "select `avid`, `cid`, `svid`, `title`, `mid`, `content`, `pubtime`,`duration`,`tid`,`sub_tid`,`cover_url`,`cover_width`,`cover_height`,`limits`, `state` from video where svid in (%s)"
	_updateVideoState = "update `video` set `state` = ? where `svid`= ?;"

	// tag table. _insertTag expects the VALUES tuple(s) to be substituted for %s.
	_queryTagByName = "SELECT `id` FROM tag WHERE name = ? and type = ?"
	_insertTag      = "INSERT INTO tag (`name`,`type`,`status`) VALUES %s "

	// user_base / user_statistics_hive tables.
	// Note that _insOrUpUserBase carries four placeholders.
	_insOrUpUserBase = "INSERT IGNORE user_base (mid, uname, face, user_type) VALUES (?, ?, ?, ?) "
	_insOrUpUserSta  = "INSERT IGNORE user_statistics_hive (mid, uname) VALUES (?, ?)"

	// video_statistics table.
	_queryStatisticsList = "select `svid`, `play`, `subtitles`, `like`, `share`, `report` from video_statistics where svid in (%s)"
	_addVideoViews       = "update `video_statistics` set `play` = `play` + ? where `svid` = ?"
	_existedStatistics   = "select `id` from `video_statistics` where `svid` = ?;"
	_insertStatistics    = "insert into `video_statistics`(`svid`, `play`, `subtitles`, `like`, `share`, `report`) values(?,?,?,?,?,?);"

	// video_bvc_NN shard tables; %s receives the table name.
	_addBVCData    = "insert into %s (`svid`,`path`,`resolution_retio`,`code_rate`,`video_code`,`duration`,`file_size`) values (?,?,?,?,?,?,?)"
	_updateBVCData = "update %s set path=?, resolution_retio=?, video_code=?, duration=?, file_size=? where svid = ? and code_rate = ?"

	// video_repository table (executed through the cms database handle).
	_updateSvPIC = "update video_repository set cover_url=?,cover_width=?,cover_height=? ,sync_status = sync_status|? where svid = ?"
)
// Redis cache settings for video base records.
const (
	// videoBaseCacheKey is the fmt template of the cache key; %d receives the svid.
	videoBaseCacheKey = "video_base:%d"
	// videoBaseCacheExpire is the value passed as the EX argument when an
	// entry is SET.
	videoBaseCacheExpire = 600
)
- func keyVideoBase(svid int64) string {
- return fmt.Sprintf(videoBaseCacheKey, svid)
- }
- // ModifyLimits .
- func (d *Dao) ModifyLimits(c context.Context, svid int64, limitType uint64, limitOp uint64) (num int64, err error) {
- // 根据操作选择合适的limits update语句
- limitOpCond := fmt.Sprintf("|%d", 1<<limitType)
- if limitOp == 0 {
- limitOpCond = fmt.Sprintf("&~%d", 1<<limitType)
- }
- querySQL := fmt.Sprintf("update video set limits = limits%s where svid = %d", limitOpCond, svid)
- res, err := d.db.Exec(c, querySQL)
- if err != nil {
- log.Warnw(c, "log", "modify video limits fail", "sql", querySQL)
- return
- }
- num, _ = res.RowsAffected()
- log.V(1).Infow(c, "sql", querySQL, "affected_num", num)
- d.DelCacheVideoBase(c, svid)
- return
- }
- // RawVideoBase mysql获取video_base
- func (d *Dao) RawVideoBase(c context.Context, svids []int64) (res map[int64]*v1.VideoBase, err error) {
- res = make(map[int64]*v1.VideoBase)
- if len(svids) == 0 {
- return
- }
- querySQL := fmt.Sprintf(_queryVideoList, xstr.JoinInts(svids))
- rows, err := d.db.Query(c, querySQL)
- if err != nil {
- log.Errorw(c, "log", "get video base from mysql fail", "sql", querySQL, "err", err)
- return
- }
- defer rows.Close()
- log.V(1).Infow(c, "log", "raw get video base from mysql", "sql", querySQL)
- for rows.Next() {
- sv := new(v1.VideoBase)
- if err = rows.Scan(&sv.Avid, &sv.Cid, &sv.Svid, &sv.Title, &sv.Mid, &sv.Content, &sv.Pubtime, &sv.Duration, &sv.Tid, &sv.SubTid, &sv.CoverUrl, &sv.CoverWidth, &sv.CoverHeight, &sv.Limits, &sv.State); err != nil {
- log.Error("row.Scan() error(%v)", err)
- return
- }
- res[sv.Svid] = sv
- }
- if len(svids) > len(res) {
- var rspID []int64
- for k := range res {
- rspID = append(rspID, k)
- }
- log.Warnw(c, "log", fmt.Sprintf("video req and rsp size not equal: req=%v, rsp=%v", svids, rspID))
- }
- log.V(1).Infow(c, "req_size", len(svids), "rsp_size", len(res))
- return
- }
- // CacheVideoBase cache video base
- func (d *Dao) CacheVideoBase(c context.Context, svids []int64) (res map[int64]*v1.VideoBase, err error) {
- res = make(map[int64]*v1.VideoBase)
- keys := make([]string, 0, len(svids))
- keyMidMap := make(map[int64]bool, len(svids))
- for _, svid := range svids {
- key := keyVideoBase(svid)
- if _, exist := keyMidMap[svid]; !exist {
- // duplicate svid
- keyMidMap[svid] = true
- keys = append(keys, key)
- }
- }
- conn := d.redis.Get(c)
- defer conn.Close()
- for _, key := range keys {
- conn.Send("GET", key)
- }
- conn.Flush()
- var data []byte
- for i := 0; i < len(keys); i++ {
- if data, err = redis.Bytes(conn.Receive()); err != nil {
- if err == redis.ErrNil {
- err = nil
- } else {
- log.Errorv(c, log.KV("event", "redis_get"), log.KV("key", keys[i]))
- }
- continue
- }
- baseItem := new(v1.VideoBase)
- json.Unmarshal(data, baseItem)
- res[baseItem.Svid] = baseItem
- }
- log.Infov(c, log.KV("event", "redis_get"), log.KV("row_num", len(res)))
- return
- }
- // AddCacheVideoBase 添加缓存
- func (d *Dao) AddCacheVideoBase(c context.Context, videoBases map[int64]*v1.VideoBase) (err error) {
- keyValueMap := make(map[string][]byte, len(videoBases))
- for mid, videoBase := range videoBases {
- key := keyVideoBase(mid)
- if _, exist := keyValueMap[key]; !exist {
- data, _ := json.Marshal(videoBase)
- keyValueMap[key] = data
- }
- }
- conn := d.redis.Get(c)
- defer conn.Close()
- for key, value := range keyValueMap {
- conn.Send("SET", key, value, "EX", videoBaseCacheExpire)
- }
- conn.Flush()
- for i := 0; i < len(keyValueMap); i++ {
- conn.Receive()
- }
- log.Infov(c, log.KV("event", "redis_set"), log.KV("row_num", len(videoBases)))
- return
- }
- // DelCacheVideoBase 删除缓存
- func (d *Dao) DelCacheVideoBase(c context.Context, svid int64) {
- var key = keyVideoBase(svid)
- conn := d.redis.Get(c)
- defer conn.Close()
- conn.Do("DEL", key)
- }
- // AddOrUpdateVideo 添加或更新视频记录
- func (d *Dao) AddOrUpdateVideo(c context.Context, vh *v1.ImportVideoInfo) (err error) {
- var (
- svid int64
- )
- tx, err := d.BeginTran(c)
- if err != nil {
- log.Error("begin transaction err :%v", err)
- return
- }
- defer func() {
- if err != nil {
- if err = tx.Rollback(); err != nil {
- log.Error("tx.Rollback() error(%v)", err)
- }
- } else {
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit() error(%v)", err)
- }
- }
- }()
- p := &model.VideoInfo{
- CoverURL: vh.CoverUrl,
- CoverWidth: vh.CoverWidth,
- CoverHeight: vh.CoverHeight,
- SVID: vh.Svid,
- Title: vh.Title,
- MID: vh.MID,
- AVID: vh.AVID,
- CID: vh.CID,
- Pubtime: vh.Pubtime,
- From: int16(vh.From),
- State: int16(vh.State),
- TID: vh.TID,
- SubTID: vh.SubTID,
- HomeImgURL: vh.HomeImgUrl,
- HomeImgWidth: vh.HomeImgWidth,
- HomeImgHeight: vh.HomeImgHeight,
- }
- if err = tx.QueryRow(_queryVideo, vh.Svid).Scan(&svid); err == sql.ErrNoRows {
- if err = d.txInsertVideo(c, tx, p); err != nil {
- log.Warn("insert video err:%v,svid:%v", err, vh.Svid)
- return
- }
- } else if err != nil {
- log.Error("video queryrow scan err:[%v], svid[%v]", err, vh.Svid)
- return
- }
- //sync video_upload_process status
- if err = d.txUpdateVideoUploadProcessStatus(c, tx, vh.Svid, model.VideoUploadProcessStatusSuccessed); err != nil {
- log.Errorw(c, "event", "d.UpdateVideoUploadProcessStatus err", "err", err)
- }
- return
- }
- //UpdateVideoUploadProcessStatus ...
- func (d *Dao) txUpdateVideoUploadProcessStatus(ctx context.Context, tx *xsql.Tx, SVID int64, st int64) (err error) {
- if _, err = tx.Exec("update video_upload_process set upload_status = ? where svid = ?", st, SVID); err != nil {
- log.Errorw(ctx, "errmsg", "UpdateVideoUploadProcessStatus update failed", "err", err)
- }
- return
- }
- //txInsertVideo insert video
- func (d *Dao) txInsertVideo(c context.Context, tx *xsql.Tx, vh *model.VideoInfo) (err error) {
- if _, err = tx.Exec(_addVideo,
- vh.CoverURL,
- vh.CoverWidth,
- vh.CoverHeight,
- vh.SVID,
- vh.Title,
- vh.MID,
- vh.AVID,
- vh.CID,
- vh.Pubtime,
- vh.From,
- vh.TID,
- vh.SubTID,
- vh.HomeImgURL,
- vh.HomeImgWidth,
- vh.HomeImgHeight,
- vh.State,
- ); err != nil {
- log.Errorw(c, "event", "insert video err", "err", err, "param", vh)
- return
- }
- return
- }
- // AddOrUpdateTag 更新或添加标签
- func (d *Dao) AddOrUpdateTag(c context.Context, tmap []*v1.TagInfo) (tids []int64, err error) {
- // 检查已存在的tag
- for _, v := range tmap {
- row := d.db.QueryRow(c, _queryTagByName, v.TagName, v.TagType)
- t := &model.Tag{
- Type: v.TagType,
- Name: v.TagName,
- }
- err = row.Scan(&t.ID)
- if err == sql.ErrNoRows {
- var q string
- var id int64
- var res sql.Result
- n := strings.Replace(t.Name, "'", "\\'", -1)
- q = "('" + n + "'," + strconv.FormatInt(int64(t.Type), 10) + ",1)"
- res, _ = d.db.Exec(c, fmt.Sprintf(_insertTag, q))
- if res != nil {
- id, err = res.LastInsertId()
- }
- if id != 0 {
- tids = append(tids, id)
- }
- } else if t.ID != 0 {
- tids = append(tids, t.ID)
- } else {
- log.Error("d.db.QueryRow[%v],err:%v", v.TagName, err)
- return
- }
- }
- return
- }
- //根据mids批量查询用户基本信息
- func (d *Dao) getUserInfos(c context.Context, mids []int64) (userBases []*model.UserBase, err error) {
- midsReq := &acc.MidsReq{
- Mids: mids,
- RealIp: metadata.String(c, metadata.RemoteIP)}
- infosReply, err := d.AccountClient.Infos3(c, midsReq)
- if infosReply == nil {
- log.Error("query infos3 failed, err (%v)", err)
- return
- }
- userBases = make([]*model.UserBase, 0, 50)
- for _, info := range infosReply.Infos {
- if info.Mid != 0 {
- if len(info.Face) > 255 {
- info.Face = "http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png"
- log.Info("the value of Face is too long, replace it as http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png, mid(%v)", info.Mid)
- }
- userBase := &model.UserBase{
- Mid: info.Mid,
- Name: info.Name,
- Sex: info.Sex,
- Face: info.Face,
- Sign: info.Sign,
- Rank: info.Rank,
- }
- userBases = append(userBases, userBase)
- }
- }
- return
- }
- //根据mid查询用户基本信息
- func (d *Dao) getUserInfo(c context.Context, mid int64) (userBase *model.UserBase, err error) {
- midReq := &acc.MidReq{
- Mid: mid,
- RealIp: metadata.String(c, metadata.RemoteIP)}
- info, err := d.AccountClient.Info3(c, midReq)
- if err != nil {
- log.Error("query info3 failed,mid(%v), err(%v)", mid, err)
- return
- }
- if len(info.Info.Face) > 255 {
- info.Info.Face = "http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png"
- log.Info("the value of Face is too long, replace it as http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png,,mid(%v)", mid)
- }
- userBase = &model.UserBase{
- Mid: info.Info.Mid,
- Name: info.Info.Name,
- Sex: info.Info.Sex,
- Face: info.Info.Face,
- Sign: info.Info.Sign,
- Rank: info.Info.Rank,
- }
- log.Info("getUserInfo userbase (%v)", userBase)
- return
- }
- //InOrUpUserBase 更新用户基本信息
- func (d *Dao) InOrUpUserBase(c context.Context, mid int64) (response *v1.SyncUserBaseResponse, err error) {
- var (
- retry = 3
- try int
- tx *xsql.Tx
- res sql.Result
- )
- userBase, _ := d.getUserInfo(c, mid)
- response = &v1.SyncUserBaseResponse{Affc: -1}
- for try = 0; try <= retry; try++ {
- if tx, err = d.BeginTran(c); err != nil {
- time.Sleep(time.Duration(try) * time.Second)
- log.Warn("InOrUpUserBase try begin transaction failed ,err(%v)", err)
- continue
- }
- if res, err = tx.Exec(
- _insOrUpUserBase,
- userBase.Mid,
- userBase.Name,
- userBase.Face,
- ); err != nil {
- if err = tx.Rollback(); err != nil {
- log.Warn("InOrUpUserBase try rollback failed ,error(%v)", err)
- }
- } else {
- if err = tx.Commit(); err != nil {
- log.Warn("InOrUpUserBase try commit failed , error(%v)", err)
- } else {
- //提交成功,退出
- response.Affc, _ = res.RowsAffected()
- log.Info("InOrUpUserBase success, affected %v rows", response.Affc)
- break
- }
- }
- }
- if err != nil {
- log.Error("InOrUpUserBase failed, mid(%v), err(%v)", mid, err)
- }
- return
- }
- //InOrUpUserBases 批量更新用户基本信息
- func (d *Dao) InOrUpUserBases(c context.Context, mids []int64) (response *v1.SyncUserBaseResponse, err error) {
- var (
- retry = 3
- try int
- tx *xsql.Tx
- res sql.Result
- )
- userBases, _ := d.getUserInfos(c, mids)
- response = &v1.SyncUserBaseResponse{Affc: -1}
- for try = 0; try <= retry; try++ {
- if tx, err = d.BeginTran(c); err != nil {
- time.Sleep(time.Duration(try) * time.Second)
- log.Warn("InOrUpUserBases try begin transaction failed failed ,error(%v)", err)
- continue
- }
- sql := "INSERT INTO user_base (mid, uname, face, user_type) VALUES "
- for _, userBase := range userBases {
- if userBase.Mid != 0 {
- sql = sql + "(" + strconv.FormatInt(userBase.Mid, 10) + ",'" + userBase.Name + "','" + userBase.Face + "', 1),"
- }
- }
- if sql == "INSERT INTO user_base (mid, uname, face) VALUES " {
- response.Affc = 0
- log.Info("InOrUpUserBases param mids are not exist")
- return
- }
- sql = sql[0:len(sql)-1] + " ON DUPLICATE KEY UPDATE uname=values(uname), face=values(face);"
- if res, err = tx.Exec(sql); err != nil {
- log.Info("InOrUpUserBases sql = (%s)", sql)
- if err = tx.Rollback(); err != nil {
- log.Warn("InOrUpUserBases try rollback failed ,error(%v)", err)
- }
- } else {
- log.Info("InOrUpUserBases sql = (%s)", sql)
- if err = tx.Commit(); err != nil {
- log.Warn("InOrUpUserBases try commit failed , error(%v)", err)
- } else {
- //提交成功,退出
- response.Affc, _ = res.RowsAffected()
- log.Info("InOrUpUserBases commit success, affected %v rows", response.Affc)
- break
- }
- }
- }
- if err != nil {
- log.Error("InOrUpUserBases failed, err(%v)", err)
- }
- return
- }
- //InOrUpUserSta 更新用户up主主站画像
- func (d *Dao) InOrUpUserSta(c context.Context, mid int64) (response *v1.SyncUserBaseResponse, err error) {
- var (
- retry = 3
- try int
- tx *xsql.Tx
- res sql.Result
- )
- log.Info("InOrUpUserSta start")
- response = &v1.SyncUserBaseResponse{Affc: -1}
- userBase, _ := d.getUserInfo(c, mid)
- for try = 0; try <= retry; try++ {
- if tx, err = d.BeginTran(c); err != nil {
- time.Sleep(time.Duration(try) * time.Second)
- log.Info("InOrUpUserSta on mid(%v) try begin transaction failed failed ,error(%v)", userBase.Mid, err)
- continue
- }
- if res, err = tx.Exec(
- _insOrUpUserSta,
- userBase.Mid,
- userBase.Name,
- ); err != nil {
- fmt.Printf("sql exec error,err(%v)", err)
- if err = tx.Rollback(); err != nil {
- log.Info("InOrUpUserSta on mid(%v) rollback failed ,error(%v)", userBase.Mid, err)
- } else {
- fmt.Println("rollbacked")
- }
- } else {
- if err = tx.Commit(); err != nil {
- log.Info("InOrUpUserSta on mid(%v) commit failed , error(%v)", userBase.Mid, err)
- } else {
- //提交成功,退出
- response.Affc, _ = res.RowsAffected()
- break
- }
- }
- }
- if err != nil {
- log.Error("InOrUpUserSta mid(%v) failed, err(%v)", mid, err)
- }
- return
- }
- //InOrUpUserStas 批量更新用户状态
- func (d *Dao) InOrUpUserStas(c context.Context, mids []int64) (response *v1.SyncUserBaseResponse, err error) {
- var (
- retry = 3
- try int
- tx *xsql.Tx
- res sql.Result
- )
- log.Info("InOrUpUserStas start")
- response = &v1.SyncUserBaseResponse{Affc: -1}
- userBases, _ := d.getUserInfos(c, mids)
- for try = 0; try <= retry; try++ {
- if tx, err = d.BeginTran(c); err != nil {
- time.Sleep(time.Duration(try) * time.Second)
- log.Warn("InOrUpUserStas try begin transaction failed failed ,error(%v)", err)
- continue
- }
- sql := "INSERT INTO user_statistics_hive (mid, uname) VALUES"
- for _, userBase := range userBases {
- sql = sql + "(" + strconv.FormatInt(userBase.Mid, 10) + ",'" + userBase.Name + "'),"
- }
- sql = sql[0:len(sql)-1] + "ON DUPLICATE KEY UPDATE uname=values(uname)"
- if res, err = tx.Exec(sql); err != nil {
- if err = tx.Rollback(); err != nil {
- log.Warn("InOrUpUserStas try rollback failed ,error(%v)", err)
- } else {
- log.Warn("InOrUpUserStas rollbacked")
- }
- } else {
- if err = tx.Commit(); err != nil {
- log.Warn("InOrUpUserStas try commit failed , error(%v)", err)
- } else {
- //提交成功,退出
- response.Affc, _ = res.RowsAffected()
- log.Info("InOrUpUserStas on commit success, affected %v rows", response.Affc)
- break
- }
- }
- }
- if err != nil {
- log.Error("InOrUpUserSta run failed, err(%v)", err)
- }
- return
- }
- // GetVideoBvcTable 获取bvc分表名
- func (d *Dao) getVideoBvcTable(svid int64) string {
- return fmt.Sprintf("video_bvc_%02d", svid%_BVCSubTableSize)
- }
- //RawVideoStatistic get video statistics
- func (d *Dao) RawVideoStatistic(c context.Context, svids []int64) (res map[int64]*model.SvStInfo, err error) {
- const maxIDNum = 20
- var (
- idStr string
- )
- res = make(map[int64]*model.SvStInfo)
- if len(svids) > maxIDNum {
- svids = svids[:maxIDNum]
- }
- l := len(svids)
- for k, svid := range svids {
- if k < l-1 {
- idStr += strconv.FormatInt(svid, 10) + ","
- } else {
- idStr += strconv.FormatInt(svid, 10)
- }
- res[svid] = &model.SvStInfo{}
- }
- rows, err := d.db.Query(c, fmt.Sprintf(_queryStatisticsList, idStr))
- if err != nil {
- log.Error("query error(%s)", err.Error())
- return
- }
- defer rows.Close()
- for rows.Next() {
- ssv := new(model.SvStInfo)
- if err = rows.Scan(&ssv.SVID, &ssv.Play, &ssv.Subtitles, &ssv.Like, &ssv.Share, &ssv.Report); err != nil {
- log.Error("RawVideoStatistic rows.Scan() error(%v)", err)
- return
- }
- res[ssv.SVID] = ssv
- }
- cmtCount, _ := d.ReplyCounts(c, svids, DefaultCmType)
- for id, cmt := range cmtCount {
- if _, ok := res[id]; ok {
- res[id].Reply = cmt.Count
- }
- }
- return
- }
- // CommitTrans 提交转码
- func (d *Dao) CommitTrans(c context.Context, arg *v1.BVideoTransRequset) error {
- path, ok := d.c.URLs["bvc_push"]
- if !ok {
- log.Warnv(c, log.KV("log", "bvc_push url not set"))
- return ecode.ReqParamErr
- }
- data, _ := json.Marshal(arg)
- b := string(data)
- req, err := xhttp.NewRequest("POST", path, bytes.NewBuffer(data))
- req.Header.Set("Content-Type", "application/json")
- if err != nil {
- log.Error("bvc_push url(%s) req(%+v) body(%s) error(%v)", path, req, b, err)
- return err
- }
- var res struct {
- Code int `json:"code"`
- Msg string `json:"message"`
- }
- if err = d.httpClient.Do(c, req, &res); err != nil {
- log.Errorv(c, log.KV("log", fmt.Sprintf("bvc_push url(%s) req(%+v) body(%s) ret (%+v) err[%v]", path, req, b, res, err)))
- return err
- }
- log.V(5).Infov(c, log.KV("log", fmt.Sprintf("bvc_push req(%+v) body(%s) ret (%+v)", req, b, res)))
- if res.Code != 0 {
- err = ecode.Int(res.Code)
- log.Errorv(c, log.KV("log", fmt.Sprintf("bvc_push url(%s) req(%+v) body(%s) ret(%+v) error(%v)", path, req, b, res, err)))
- return err
- }
- return nil
- }
- //AddOrUpdateBVCInfo 添加或更新BVC转码信息
- func (d *Dao) AddOrUpdateBVCInfo(c context.Context, arg *model.VideoBVC) (err error) {
- err = d.AddBVCInfo(c, arg)
- if err != nil {
- if matched, _ := regexp.MatchString("Duplicate entry", err.Error()); matched {
- err = d.UpdataBVCInfo(c, arg)
- return
- }
- log.Errorv(c,
- log.KV("log", fmt.Sprintf("dao.db.Exec(AddOrUpdateBVCInfo[%+v]) err(%v)", arg, err)),
- )
- }
- return
- }
- //TxAddOrUpdateBVCInfo 事务添加或更新BVC转码信息
- func (d *Dao) TxAddOrUpdateBVCInfo(c context.Context, tx *xsql.Tx, arg *model.VideoBVC) (err error) {
- err = d.TxAddBVCInfo(tx, arg)
- if err != nil {
- if matched, _ := regexp.MatchString("Duplicate entry", err.Error()); matched {
- err = d.TxUpdataBVCInfo(tx, arg)
- return
- }
- log.Errorv(c,
- log.KV("log", fmt.Sprintf("dao.db.Exec(AddOrUpdateBVCInfo[%+v]) err(%v)", arg, err)),
- )
- }
- return
- }
- // AddBVCInfo 添加BVC转码信息
- func (d *Dao) AddBVCInfo(c context.Context, arg *model.VideoBVC) (err error) {
- t := d.getVideoBvcTable(arg.SVID)
- sql := fmt.Sprintf(_addBVCData, t)
- _, err = d.db.Exec(c, sql, arg.SVID, arg.Path, arg.ResolutionRetio, arg.CodeRate, arg.VideoCode, arg.Duration, arg.FileSize)
- return
- }
- // TxAddBVCInfo 事务添加BVC转码信息
- func (d *Dao) TxAddBVCInfo(tx *xsql.Tx, arg *model.VideoBVC) (err error) {
- t := d.getVideoBvcTable(arg.SVID)
- sql := fmt.Sprintf(_addBVCData, t)
- _, err = tx.Exec(sql, arg.SVID, arg.Path, arg.ResolutionRetio, arg.CodeRate, arg.VideoCode, arg.Duration, arg.FileSize)
- return
- }
- // TxUpdataBVCInfo 事务更新BVC转码信息
- func (d *Dao) TxUpdataBVCInfo(tx *xsql.Tx, arg *model.VideoBVC) (err error) {
- t := d.getVideoBvcTable(arg.SVID)
- sql := fmt.Sprintf(_updateBVCData, t)
- _, err = tx.Exec(sql, arg.Path, arg.ResolutionRetio, arg.VideoCode, arg.Duration, arg.FileSize, arg.SVID, arg.CodeRate)
- return
- }
- // UpdataBVCInfo 更新BVC转码信息
- func (d *Dao) UpdataBVCInfo(c context.Context, arg *model.VideoBVC) (err error) {
- t := d.getVideoBvcTable(arg.SVID)
- sql := fmt.Sprintf(_updateBVCData, t)
- _, err = d.db.Exec(c, sql, arg.Path, arg.ResolutionRetio, arg.VideoCode, arg.Duration, arg.FileSize, arg.SVID, arg.CodeRate)
- return
- }
- // UpdateCmsSvPIC 更新封面图
- func (d *Dao) UpdateCmsSvPIC(c context.Context, svid int64, pic *v1.SvPic, st int64) error {
- _, err := d.cmsdb.Exec(c, _updateSvPIC, pic.PicURL, pic.PicWidth, pic.PicHeight, st, svid)
- return err
- }
- // HostnameRegister .
- func (d *Dao) HostnameRegister(hostnameIndex int64) (succ bool) {
- conn := d.redis.Get(context.Background())
- defer conn.Close()
- redisKey := fmt.Sprintf("hostname:index:%d", hostnameIndex)
- exists, err := redis.Int(conn.Do("EXISTS", redisKey))
- if err != nil {
- log.Errorv(context.Background(), log.KV("event", "fatal"), log.KV("log", fmt.Sprintf("get hostname index from redis fail: key=%s", redisKey)))
- // 即使redis失败了,也给返回成功
- return true
- }
- if exists == 1 {
- return false
- }
- // 不去管返回结果,永远返回成功
- if _, err = conn.Do("SETEX", redisKey, 1000, 1); err != nil {
- log.Errorv(context.Background(), log.KV("event", "fatal"), log.KV("log", fmt.Sprintf("get hostname index from redis fail: key=%s", redisKey)))
- }
- return true
- }
- // AddVideoViews .
- func (d *Dao) AddVideoViews(c context.Context, svid int64, views int) (affected int64, err error) {
- row := d.db.QueryRow(c, _existedStatistics, svid)
- tmp := 0
- if err = row.Scan(&tmp); err != nil || tmp == 0 {
- _, err = d.db.Exec(c, _insertStatistics, svid, 0, 0, 0, 0, 0)
- if err != nil {
- return
- }
- }
- result, err := d.db.Exec(c, _addVideoViews, views, svid)
- if err != nil {
- return
- }
- return result.RowsAffected()
- }
- // VideoStateUpdate .
- func (d *Dao) VideoStateUpdate(c context.Context, svid int64, newState int) (aff int64, err error) {
- result, err := d.db.Exec(c, _updateVideoState, newState, svid)
- if err != nil {
- return
- }
- aff, err = result.RowsAffected()
- return
- }
|