video.go 23 KB


  1. package dao
  2. import (
  3. "bytes"
  4. "context"
  5. "database/sql"
  6. "encoding/json"
  7. "fmt"
  8. "go-common/app/service/bbq/video/api/grpc/v1"
  9. "go-common/app/service/bbq/video/model"
  10. acc "go-common/app/service/main/account/api"
  11. "go-common/library/cache/redis"
  12. xsql "go-common/library/database/sql"
  13. "go-common/library/ecode"
  14. "go-common/library/log"
  15. "go-common/library/net/metadata"
  16. "go-common/library/xstr"
  17. xhttp "net/http"
  18. "regexp"
  19. "strconv"
  20. "strings"
  21. "time"
  22. )
  23. const (
  24. _BVCSubTableSize = 100
  25. _queryVideo = "SELECT svid FROM video WHERE svid = ?"
  26. _addVideo = "INSERT INTO video(`cover_url`,`cover_width`,`cover_height`,`svid`,`title`,`mid`,`avid`,`cid`,`pubtime`,`from`,`tid`,`sub_tid`,`home_img_url`,`home_img_width`,`home_img_height`,`state`) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
  27. _queryTagByName = "SELECT `id` FROM tag WHERE name = ? and type = ?"
  28. _insertTag = "INSERT INTO tag (`name`,`type`,`status`) VALUES %s "
  29. _insOrUpUserBase = "INSERT IGNORE user_base (mid, uname, face, user_type) VALUES (?, ?, ?, ?) "
  30. _insOrUpUserSta = "INSERT IGNORE user_statistics_hive (mid, uname) VALUES (?, ?)"
  31. _queryStatisticsList = "select `svid`, `play`, `subtitles`, `like`, `share`, `report` from video_statistics where svid in (%s)"
  32. _addBVCData = "insert into %s (`svid`,`path`,`resolution_retio`,`code_rate`,`video_code`,`duration`,`file_size`) values (?,?,?,?,?,?,?)"
  33. _updateBVCData = "update %s set path=?, resolution_retio=?, video_code=?, duration=?, file_size=? where svid = ? and code_rate = ?"
  34. _updateSvPIC = "update video_repository set cover_url=?,cover_width=?,cover_height=? ,sync_status = sync_status|? where svid = ?"
  35. _addVideoViews = "update `video_statistics` set `play` = `play` + ? where `svid` = ?"
  36. _existedStatistics = "select `id` from `video_statistics` where `svid` = ?;"
  37. _insertStatistics = "insert into `video_statistics`(`svid`, `play`, `subtitles`, `like`, `share`, `report`) values(?,?,?,?,?,?);"
  38. _queryVideoList = "select `avid`, `cid`, `svid`, `title`, `mid`, `content`, `pubtime`,`duration`,`tid`,`sub_tid`,`cover_url`,`cover_width`,`cover_height`,`limits`, `state` from video where svid in (%s)"
  39. _updateVideoState = "update `video` set `state` = ? where `svid`= ?;"
  40. )
  41. const (
  42. videoBaseCacheExpire = 600
  43. videoBaseCacheKey = "video_base:%d"
  44. )
  45. func keyVideoBase(svid int64) string {
  46. return fmt.Sprintf(videoBaseCacheKey, svid)
  47. }
  48. // ModifyLimits .
  49. func (d *Dao) ModifyLimits(c context.Context, svid int64, limitType uint64, limitOp uint64) (num int64, err error) {
  50. // 根据操作选择合适的limits update语句
  51. limitOpCond := fmt.Sprintf("|%d", 1<<limitType)
  52. if limitOp == 0 {
  53. limitOpCond = fmt.Sprintf("&~%d", 1<<limitType)
  54. }
  55. querySQL := fmt.Sprintf("update video set limits = limits%s where svid = %d", limitOpCond, svid)
  56. res, err := d.db.Exec(c, querySQL)
  57. if err != nil {
  58. log.Warnw(c, "log", "modify video limits fail", "sql", querySQL)
  59. return
  60. }
  61. num, _ = res.RowsAffected()
  62. log.V(1).Infow(c, "sql", querySQL, "affected_num", num)
  63. d.DelCacheVideoBase(c, svid)
  64. return
  65. }
  66. // RawVideoBase mysql获取video_base
  67. func (d *Dao) RawVideoBase(c context.Context, svids []int64) (res map[int64]*v1.VideoBase, err error) {
  68. res = make(map[int64]*v1.VideoBase)
  69. if len(svids) == 0 {
  70. return
  71. }
  72. querySQL := fmt.Sprintf(_queryVideoList, xstr.JoinInts(svids))
  73. rows, err := d.db.Query(c, querySQL)
  74. if err != nil {
  75. log.Errorw(c, "log", "get video base from mysql fail", "sql", querySQL, "err", err)
  76. return
  77. }
  78. defer rows.Close()
  79. log.V(1).Infow(c, "log", "raw get video base from mysql", "sql", querySQL)
  80. for rows.Next() {
  81. sv := new(v1.VideoBase)
  82. if err = rows.Scan(&sv.Avid, &sv.Cid, &sv.Svid, &sv.Title, &sv.Mid, &sv.Content, &sv.Pubtime, &sv.Duration, &sv.Tid, &sv.SubTid, &sv.CoverUrl, &sv.CoverWidth, &sv.CoverHeight, &sv.Limits, &sv.State); err != nil {
  83. log.Error("row.Scan() error(%v)", err)
  84. return
  85. }
  86. res[sv.Svid] = sv
  87. }
  88. if len(svids) > len(res) {
  89. var rspID []int64
  90. for k := range res {
  91. rspID = append(rspID, k)
  92. }
  93. log.Warnw(c, "log", fmt.Sprintf("video req and rsp size not equal: req=%v, rsp=%v", svids, rspID))
  94. }
  95. log.V(1).Infow(c, "req_size", len(svids), "rsp_size", len(res))
  96. return
  97. }
  98. // CacheVideoBase cache video base
  99. func (d *Dao) CacheVideoBase(c context.Context, svids []int64) (res map[int64]*v1.VideoBase, err error) {
  100. res = make(map[int64]*v1.VideoBase)
  101. keys := make([]string, 0, len(svids))
  102. keyMidMap := make(map[int64]bool, len(svids))
  103. for _, svid := range svids {
  104. key := keyVideoBase(svid)
  105. if _, exist := keyMidMap[svid]; !exist {
  106. // duplicate svid
  107. keyMidMap[svid] = true
  108. keys = append(keys, key)
  109. }
  110. }
  111. conn := d.redis.Get(c)
  112. defer conn.Close()
  113. for _, key := range keys {
  114. conn.Send("GET", key)
  115. }
  116. conn.Flush()
  117. var data []byte
  118. for i := 0; i < len(keys); i++ {
  119. if data, err = redis.Bytes(conn.Receive()); err != nil {
  120. if err == redis.ErrNil {
  121. err = nil
  122. } else {
  123. log.Errorv(c, log.KV("event", "redis_get"), log.KV("key", keys[i]))
  124. }
  125. continue
  126. }
  127. baseItem := new(v1.VideoBase)
  128. json.Unmarshal(data, baseItem)
  129. res[baseItem.Svid] = baseItem
  130. }
  131. log.Infov(c, log.KV("event", "redis_get"), log.KV("row_num", len(res)))
  132. return
  133. }
  134. // AddCacheVideoBase 添加缓存
  135. func (d *Dao) AddCacheVideoBase(c context.Context, videoBases map[int64]*v1.VideoBase) (err error) {
  136. keyValueMap := make(map[string][]byte, len(videoBases))
  137. for mid, videoBase := range videoBases {
  138. key := keyVideoBase(mid)
  139. if _, exist := keyValueMap[key]; !exist {
  140. data, _ := json.Marshal(videoBase)
  141. keyValueMap[key] = data
  142. }
  143. }
  144. conn := d.redis.Get(c)
  145. defer conn.Close()
  146. for key, value := range keyValueMap {
  147. conn.Send("SET", key, value, "EX", videoBaseCacheExpire)
  148. }
  149. conn.Flush()
  150. for i := 0; i < len(keyValueMap); i++ {
  151. conn.Receive()
  152. }
  153. log.Infov(c, log.KV("event", "redis_set"), log.KV("row_num", len(videoBases)))
  154. return
  155. }
  156. // DelCacheVideoBase 删除缓存
  157. func (d *Dao) DelCacheVideoBase(c context.Context, svid int64) {
  158. var key = keyVideoBase(svid)
  159. conn := d.redis.Get(c)
  160. defer conn.Close()
  161. conn.Do("DEL", key)
  162. }
  163. // AddOrUpdateVideo 添加或更新视频记录
  164. func (d *Dao) AddOrUpdateVideo(c context.Context, vh *v1.ImportVideoInfo) (err error) {
  165. var (
  166. svid int64
  167. )
  168. tx, err := d.BeginTran(c)
  169. if err != nil {
  170. log.Error("begin transaction err :%v", err)
  171. return
  172. }
  173. defer func() {
  174. if err != nil {
  175. if err = tx.Rollback(); err != nil {
  176. log.Error("tx.Rollback() error(%v)", err)
  177. }
  178. } else {
  179. if err = tx.Commit(); err != nil {
  180. log.Error("tx.Commit() error(%v)", err)
  181. }
  182. }
  183. }()
  184. p := &model.VideoInfo{
  185. CoverURL: vh.CoverUrl,
  186. CoverWidth: vh.CoverWidth,
  187. CoverHeight: vh.CoverHeight,
  188. SVID: vh.Svid,
  189. Title: vh.Title,
  190. MID: vh.MID,
  191. AVID: vh.AVID,
  192. CID: vh.CID,
  193. Pubtime: vh.Pubtime,
  194. From: int16(vh.From),
  195. State: int16(vh.State),
  196. TID: vh.TID,
  197. SubTID: vh.SubTID,
  198. HomeImgURL: vh.HomeImgUrl,
  199. HomeImgWidth: vh.HomeImgWidth,
  200. HomeImgHeight: vh.HomeImgHeight,
  201. }
  202. if err = tx.QueryRow(_queryVideo, vh.Svid).Scan(&svid); err == sql.ErrNoRows {
  203. if err = d.txInsertVideo(c, tx, p); err != nil {
  204. log.Warn("insert video err:%v,svid:%v", err, vh.Svid)
  205. return
  206. }
  207. } else if err != nil {
  208. log.Error("video queryrow scan err:[%v], svid[%v]", err, vh.Svid)
  209. return
  210. }
  211. //sync video_upload_process status
  212. if err = d.txUpdateVideoUploadProcessStatus(c, tx, vh.Svid, model.VideoUploadProcessStatusSuccessed); err != nil {
  213. log.Errorw(c, "event", "d.UpdateVideoUploadProcessStatus err", "err", err)
  214. }
  215. return
  216. }
  217. //UpdateVideoUploadProcessStatus ...
  218. func (d *Dao) txUpdateVideoUploadProcessStatus(ctx context.Context, tx *xsql.Tx, SVID int64, st int64) (err error) {
  219. if _, err = tx.Exec("update video_upload_process set upload_status = ? where svid = ?", st, SVID); err != nil {
  220. log.Errorw(ctx, "errmsg", "UpdateVideoUploadProcessStatus update failed", "err", err)
  221. }
  222. return
  223. }
  224. //txInsertVideo insert video
  225. func (d *Dao) txInsertVideo(c context.Context, tx *xsql.Tx, vh *model.VideoInfo) (err error) {
  226. if _, err = tx.Exec(_addVideo,
  227. vh.CoverURL,
  228. vh.CoverWidth,
  229. vh.CoverHeight,
  230. vh.SVID,
  231. vh.Title,
  232. vh.MID,
  233. vh.AVID,
  234. vh.CID,
  235. vh.Pubtime,
  236. vh.From,
  237. vh.TID,
  238. vh.SubTID,
  239. vh.HomeImgURL,
  240. vh.HomeImgWidth,
  241. vh.HomeImgHeight,
  242. vh.State,
  243. ); err != nil {
  244. log.Errorw(c, "event", "insert video err", "err", err, "param", vh)
  245. return
  246. }
  247. return
  248. }
  249. // AddOrUpdateTag 更新或添加标签
  250. func (d *Dao) AddOrUpdateTag(c context.Context, tmap []*v1.TagInfo) (tids []int64, err error) {
  251. // 检查已存在的tag
  252. for _, v := range tmap {
  253. row := d.db.QueryRow(c, _queryTagByName, v.TagName, v.TagType)
  254. t := &model.Tag{
  255. Type: v.TagType,
  256. Name: v.TagName,
  257. }
  258. err = row.Scan(&t.ID)
  259. if err == sql.ErrNoRows {
  260. var q string
  261. var id int64
  262. var res sql.Result
  263. n := strings.Replace(t.Name, "'", "\\'", -1)
  264. q = "('" + n + "'," + strconv.FormatInt(int64(t.Type), 10) + ",1)"
  265. res, _ = d.db.Exec(c, fmt.Sprintf(_insertTag, q))
  266. if res != nil {
  267. id, err = res.LastInsertId()
  268. }
  269. if id != 0 {
  270. tids = append(tids, id)
  271. }
  272. } else if t.ID != 0 {
  273. tids = append(tids, t.ID)
  274. } else {
  275. log.Error("d.db.QueryRow[%v],err:%v", v.TagName, err)
  276. return
  277. }
  278. }
  279. return
  280. }
  281. //根据mids批量查询用户基本信息
  282. func (d *Dao) getUserInfos(c context.Context, mids []int64) (userBases []*model.UserBase, err error) {
  283. midsReq := &acc.MidsReq{
  284. Mids: mids,
  285. RealIp: metadata.String(c, metadata.RemoteIP)}
  286. infosReply, err := d.AccountClient.Infos3(c, midsReq)
  287. if infosReply == nil {
  288. log.Error("query infos3 failed, err (%v)", err)
  289. return
  290. }
  291. userBases = make([]*model.UserBase, 0, 50)
  292. for _, info := range infosReply.Infos {
  293. if info.Mid != 0 {
  294. if len(info.Face) > 255 {
  295. info.Face = "http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png"
  296. log.Info("the value of Face is too long, replace it as http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png, mid(%v)", info.Mid)
  297. }
  298. userBase := &model.UserBase{
  299. Mid: info.Mid,
  300. Name: info.Name,
  301. Sex: info.Sex,
  302. Face: info.Face,
  303. Sign: info.Sign,
  304. Rank: info.Rank,
  305. }
  306. userBases = append(userBases, userBase)
  307. }
  308. }
  309. return
  310. }
  311. //根据mid查询用户基本信息
  312. func (d *Dao) getUserInfo(c context.Context, mid int64) (userBase *model.UserBase, err error) {
  313. midReq := &acc.MidReq{
  314. Mid: mid,
  315. RealIp: metadata.String(c, metadata.RemoteIP)}
  316. info, err := d.AccountClient.Info3(c, midReq)
  317. if err != nil {
  318. log.Error("query info3 failed,mid(%v), err(%v)", mid, err)
  319. return
  320. }
  321. if len(info.Info.Face) > 255 {
  322. info.Info.Face = "http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png"
  323. log.Info("the value of Face is too long, replace it as http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png,,mid(%v)", mid)
  324. }
  325. userBase = &model.UserBase{
  326. Mid: info.Info.Mid,
  327. Name: info.Info.Name,
  328. Sex: info.Info.Sex,
  329. Face: info.Info.Face,
  330. Sign: info.Info.Sign,
  331. Rank: info.Info.Rank,
  332. }
  333. log.Info("getUserInfo userbase (%v)", userBase)
  334. return
  335. }
  336. //InOrUpUserBase 更新用户基本信息
  337. func (d *Dao) InOrUpUserBase(c context.Context, mid int64) (response *v1.SyncUserBaseResponse, err error) {
  338. var (
  339. retry = 3
  340. try int
  341. tx *xsql.Tx
  342. res sql.Result
  343. )
  344. userBase, _ := d.getUserInfo(c, mid)
  345. response = &v1.SyncUserBaseResponse{Affc: -1}
  346. for try = 0; try <= retry; try++ {
  347. if tx, err = d.BeginTran(c); err != nil {
  348. time.Sleep(time.Duration(try) * time.Second)
  349. log.Warn("InOrUpUserBase try begin transaction failed ,err(%v)", err)
  350. continue
  351. }
  352. if res, err = tx.Exec(
  353. _insOrUpUserBase,
  354. userBase.Mid,
  355. userBase.Name,
  356. userBase.Face,
  357. ); err != nil {
  358. if err = tx.Rollback(); err != nil {
  359. log.Warn("InOrUpUserBase try rollback failed ,error(%v)", err)
  360. }
  361. } else {
  362. if err = tx.Commit(); err != nil {
  363. log.Warn("InOrUpUserBase try commit failed , error(%v)", err)
  364. } else {
  365. //提交成功,退出
  366. response.Affc, _ = res.RowsAffected()
  367. log.Info("InOrUpUserBase success, affected %v rows", response.Affc)
  368. break
  369. }
  370. }
  371. }
  372. if err != nil {
  373. log.Error("InOrUpUserBase failed, mid(%v), err(%v)", mid, err)
  374. }
  375. return
  376. }
  377. //InOrUpUserBases 批量更新用户基本信息
  378. func (d *Dao) InOrUpUserBases(c context.Context, mids []int64) (response *v1.SyncUserBaseResponse, err error) {
  379. var (
  380. retry = 3
  381. try int
  382. tx *xsql.Tx
  383. res sql.Result
  384. )
  385. userBases, _ := d.getUserInfos(c, mids)
  386. response = &v1.SyncUserBaseResponse{Affc: -1}
  387. for try = 0; try <= retry; try++ {
  388. if tx, err = d.BeginTran(c); err != nil {
  389. time.Sleep(time.Duration(try) * time.Second)
  390. log.Warn("InOrUpUserBases try begin transaction failed failed ,error(%v)", err)
  391. continue
  392. }
  393. sql := "INSERT INTO user_base (mid, uname, face, user_type) VALUES "
  394. for _, userBase := range userBases {
  395. if userBase.Mid != 0 {
  396. sql = sql + "(" + strconv.FormatInt(userBase.Mid, 10) + ",'" + userBase.Name + "','" + userBase.Face + "', 1),"
  397. }
  398. }
  399. if sql == "INSERT INTO user_base (mid, uname, face) VALUES " {
  400. response.Affc = 0
  401. log.Info("InOrUpUserBases param mids are not exist")
  402. return
  403. }
  404. sql = sql[0:len(sql)-1] + " ON DUPLICATE KEY UPDATE uname=values(uname), face=values(face);"
  405. if res, err = tx.Exec(sql); err != nil {
  406. log.Info("InOrUpUserBases sql = (%s)", sql)
  407. if err = tx.Rollback(); err != nil {
  408. log.Warn("InOrUpUserBases try rollback failed ,error(%v)", err)
  409. }
  410. } else {
  411. log.Info("InOrUpUserBases sql = (%s)", sql)
  412. if err = tx.Commit(); err != nil {
  413. log.Warn("InOrUpUserBases try commit failed , error(%v)", err)
  414. } else {
  415. //提交成功,退出
  416. response.Affc, _ = res.RowsAffected()
  417. log.Info("InOrUpUserBases commit success, affected %v rows", response.Affc)
  418. break
  419. }
  420. }
  421. }
  422. if err != nil {
  423. log.Error("InOrUpUserBases failed, err(%v)", err)
  424. }
  425. return
  426. }
  427. //InOrUpUserSta 更新用户up主主站画像
  428. func (d *Dao) InOrUpUserSta(c context.Context, mid int64) (response *v1.SyncUserBaseResponse, err error) {
  429. var (
  430. retry = 3
  431. try int
  432. tx *xsql.Tx
  433. res sql.Result
  434. )
  435. log.Info("InOrUpUserSta start")
  436. response = &v1.SyncUserBaseResponse{Affc: -1}
  437. userBase, _ := d.getUserInfo(c, mid)
  438. for try = 0; try <= retry; try++ {
  439. if tx, err = d.BeginTran(c); err != nil {
  440. time.Sleep(time.Duration(try) * time.Second)
  441. log.Info("InOrUpUserSta on mid(%v) try begin transaction failed failed ,error(%v)", userBase.Mid, err)
  442. continue
  443. }
  444. if res, err = tx.Exec(
  445. _insOrUpUserSta,
  446. userBase.Mid,
  447. userBase.Name,
  448. ); err != nil {
  449. fmt.Printf("sql exec error,err(%v)", err)
  450. if err = tx.Rollback(); err != nil {
  451. log.Info("InOrUpUserSta on mid(%v) rollback failed ,error(%v)", userBase.Mid, err)
  452. } else {
  453. fmt.Println("rollbacked")
  454. }
  455. } else {
  456. if err = tx.Commit(); err != nil {
  457. log.Info("InOrUpUserSta on mid(%v) commit failed , error(%v)", userBase.Mid, err)
  458. } else {
  459. //提交成功,退出
  460. response.Affc, _ = res.RowsAffected()
  461. break
  462. }
  463. }
  464. }
  465. if err != nil {
  466. log.Error("InOrUpUserSta mid(%v) failed, err(%v)", mid, err)
  467. }
  468. return
  469. }
  470. //InOrUpUserStas 批量更新用户状态
  471. func (d *Dao) InOrUpUserStas(c context.Context, mids []int64) (response *v1.SyncUserBaseResponse, err error) {
  472. var (
  473. retry = 3
  474. try int
  475. tx *xsql.Tx
  476. res sql.Result
  477. )
  478. log.Info("InOrUpUserStas start")
  479. response = &v1.SyncUserBaseResponse{Affc: -1}
  480. userBases, _ := d.getUserInfos(c, mids)
  481. for try = 0; try <= retry; try++ {
  482. if tx, err = d.BeginTran(c); err != nil {
  483. time.Sleep(time.Duration(try) * time.Second)
  484. log.Warn("InOrUpUserStas try begin transaction failed failed ,error(%v)", err)
  485. continue
  486. }
  487. sql := "INSERT INTO user_statistics_hive (mid, uname) VALUES"
  488. for _, userBase := range userBases {
  489. sql = sql + "(" + strconv.FormatInt(userBase.Mid, 10) + ",'" + userBase.Name + "'),"
  490. }
  491. sql = sql[0:len(sql)-1] + "ON DUPLICATE KEY UPDATE uname=values(uname)"
  492. if res, err = tx.Exec(sql); err != nil {
  493. if err = tx.Rollback(); err != nil {
  494. log.Warn("InOrUpUserStas try rollback failed ,error(%v)", err)
  495. } else {
  496. log.Warn("InOrUpUserStas rollbacked")
  497. }
  498. } else {
  499. if err = tx.Commit(); err != nil {
  500. log.Warn("InOrUpUserStas try commit failed , error(%v)", err)
  501. } else {
  502. //提交成功,退出
  503. response.Affc, _ = res.RowsAffected()
  504. log.Info("InOrUpUserStas on commit success, affected %v rows", response.Affc)
  505. break
  506. }
  507. }
  508. }
  509. if err != nil {
  510. log.Error("InOrUpUserSta run failed, err(%v)", err)
  511. }
  512. return
  513. }
  514. // GetVideoBvcTable 获取bvc分表名
  515. func (d *Dao) getVideoBvcTable(svid int64) string {
  516. return fmt.Sprintf("video_bvc_%02d", svid%_BVCSubTableSize)
  517. }
  518. //RawVideoStatistic get video statistics
  519. func (d *Dao) RawVideoStatistic(c context.Context, svids []int64) (res map[int64]*model.SvStInfo, err error) {
  520. const maxIDNum = 20
  521. var (
  522. idStr string
  523. )
  524. res = make(map[int64]*model.SvStInfo)
  525. if len(svids) > maxIDNum {
  526. svids = svids[:maxIDNum]
  527. }
  528. l := len(svids)
  529. for k, svid := range svids {
  530. if k < l-1 {
  531. idStr += strconv.FormatInt(svid, 10) + ","
  532. } else {
  533. idStr += strconv.FormatInt(svid, 10)
  534. }
  535. res[svid] = &model.SvStInfo{}
  536. }
  537. rows, err := d.db.Query(c, fmt.Sprintf(_queryStatisticsList, idStr))
  538. if err != nil {
  539. log.Error("query error(%s)", err.Error())
  540. return
  541. }
  542. defer rows.Close()
  543. for rows.Next() {
  544. ssv := new(model.SvStInfo)
  545. if err = rows.Scan(&ssv.SVID, &ssv.Play, &ssv.Subtitles, &ssv.Like, &ssv.Share, &ssv.Report); err != nil {
  546. log.Error("RawVideoStatistic rows.Scan() error(%v)", err)
  547. return
  548. }
  549. res[ssv.SVID] = ssv
  550. }
  551. cmtCount, _ := d.ReplyCounts(c, svids, DefaultCmType)
  552. for id, cmt := range cmtCount {
  553. if _, ok := res[id]; ok {
  554. res[id].Reply = cmt.Count
  555. }
  556. }
  557. return
  558. }
  559. // CommitTrans 提交转码
  560. func (d *Dao) CommitTrans(c context.Context, arg *v1.BVideoTransRequset) error {
  561. path, ok := d.c.URLs["bvc_push"]
  562. if !ok {
  563. log.Warnv(c, log.KV("log", "bvc_push url not set"))
  564. return ecode.ReqParamErr
  565. }
  566. data, _ := json.Marshal(arg)
  567. b := string(data)
  568. req, err := xhttp.NewRequest("POST", path, bytes.NewBuffer(data))
  569. req.Header.Set("Content-Type", "application/json")
  570. if err != nil {
  571. log.Error("bvc_push url(%s) req(%+v) body(%s) error(%v)", path, req, b, err)
  572. return err
  573. }
  574. var res struct {
  575. Code int `json:"code"`
  576. Msg string `json:"message"`
  577. }
  578. if err = d.httpClient.Do(c, req, &res); err != nil {
  579. log.Errorv(c, log.KV("log", fmt.Sprintf("bvc_push url(%s) req(%+v) body(%s) ret (%+v) err[%v]", path, req, b, res, err)))
  580. return err
  581. }
  582. log.V(5).Infov(c, log.KV("log", fmt.Sprintf("bvc_push req(%+v) body(%s) ret (%+v)", req, b, res)))
  583. if res.Code != 0 {
  584. err = ecode.Int(res.Code)
  585. log.Errorv(c, log.KV("log", fmt.Sprintf("bvc_push url(%s) req(%+v) body(%s) ret(%+v) error(%v)", path, req, b, res, err)))
  586. return err
  587. }
  588. return nil
  589. }
  590. //AddOrUpdateBVCInfo 添加或更新BVC转码信息
  591. func (d *Dao) AddOrUpdateBVCInfo(c context.Context, arg *model.VideoBVC) (err error) {
  592. err = d.AddBVCInfo(c, arg)
  593. if err != nil {
  594. if matched, _ := regexp.MatchString("Duplicate entry", err.Error()); matched {
  595. err = d.UpdataBVCInfo(c, arg)
  596. return
  597. }
  598. log.Errorv(c,
  599. log.KV("log", fmt.Sprintf("dao.db.Exec(AddOrUpdateBVCInfo[%+v]) err(%v)", arg, err)),
  600. )
  601. }
  602. return
  603. }
  604. //TxAddOrUpdateBVCInfo 事务添加或更新BVC转码信息
  605. func (d *Dao) TxAddOrUpdateBVCInfo(c context.Context, tx *xsql.Tx, arg *model.VideoBVC) (err error) {
  606. err = d.TxAddBVCInfo(tx, arg)
  607. if err != nil {
  608. if matched, _ := regexp.MatchString("Duplicate entry", err.Error()); matched {
  609. err = d.TxUpdataBVCInfo(tx, arg)
  610. return
  611. }
  612. log.Errorv(c,
  613. log.KV("log", fmt.Sprintf("dao.db.Exec(AddOrUpdateBVCInfo[%+v]) err(%v)", arg, err)),
  614. )
  615. }
  616. return
  617. }
  618. // AddBVCInfo 添加BVC转码信息
  619. func (d *Dao) AddBVCInfo(c context.Context, arg *model.VideoBVC) (err error) {
  620. t := d.getVideoBvcTable(arg.SVID)
  621. sql := fmt.Sprintf(_addBVCData, t)
  622. _, err = d.db.Exec(c, sql, arg.SVID, arg.Path, arg.ResolutionRetio, arg.CodeRate, arg.VideoCode, arg.Duration, arg.FileSize)
  623. return
  624. }
  625. // TxAddBVCInfo 事务添加BVC转码信息
  626. func (d *Dao) TxAddBVCInfo(tx *xsql.Tx, arg *model.VideoBVC) (err error) {
  627. t := d.getVideoBvcTable(arg.SVID)
  628. sql := fmt.Sprintf(_addBVCData, t)
  629. _, err = tx.Exec(sql, arg.SVID, arg.Path, arg.ResolutionRetio, arg.CodeRate, arg.VideoCode, arg.Duration, arg.FileSize)
  630. return
  631. }
  632. // TxUpdataBVCInfo 事务更新BVC转码信息
  633. func (d *Dao) TxUpdataBVCInfo(tx *xsql.Tx, arg *model.VideoBVC) (err error) {
  634. t := d.getVideoBvcTable(arg.SVID)
  635. sql := fmt.Sprintf(_updateBVCData, t)
  636. _, err = tx.Exec(sql, arg.Path, arg.ResolutionRetio, arg.VideoCode, arg.Duration, arg.FileSize, arg.SVID, arg.CodeRate)
  637. return
  638. }
  639. // UpdataBVCInfo 更新BVC转码信息
  640. func (d *Dao) UpdataBVCInfo(c context.Context, arg *model.VideoBVC) (err error) {
  641. t := d.getVideoBvcTable(arg.SVID)
  642. sql := fmt.Sprintf(_updateBVCData, t)
  643. _, err = d.db.Exec(c, sql, arg.Path, arg.ResolutionRetio, arg.VideoCode, arg.Duration, arg.FileSize, arg.SVID, arg.CodeRate)
  644. return
  645. }
  646. // UpdateCmsSvPIC 更新封面图
  647. func (d *Dao) UpdateCmsSvPIC(c context.Context, svid int64, pic *v1.SvPic, st int64) error {
  648. _, err := d.cmsdb.Exec(c, _updateSvPIC, pic.PicURL, pic.PicWidth, pic.PicHeight, st, svid)
  649. return err
  650. }
  651. // HostnameRegister .
  652. func (d *Dao) HostnameRegister(hostnameIndex int64) (succ bool) {
  653. conn := d.redis.Get(context.Background())
  654. defer conn.Close()
  655. redisKey := fmt.Sprintf("hostname:index:%d", hostnameIndex)
  656. exists, err := redis.Int(conn.Do("EXISTS", redisKey))
  657. if err != nil {
  658. log.Errorv(context.Background(), log.KV("event", "fatal"), log.KV("log", fmt.Sprintf("get hostname index from redis fail: key=%s", redisKey)))
  659. // 即使redis失败了,也给返回成功
  660. return true
  661. }
  662. if exists == 1 {
  663. return false
  664. }
  665. // 不去管返回结果,永远返回成功
  666. if _, err = conn.Do("SETEX", redisKey, 1000, 1); err != nil {
  667. log.Errorv(context.Background(), log.KV("event", "fatal"), log.KV("log", fmt.Sprintf("get hostname index from redis fail: key=%s", redisKey)))
  668. }
  669. return true
  670. }
  671. // AddVideoViews .
  672. func (d *Dao) AddVideoViews(c context.Context, svid int64, views int) (affected int64, err error) {
  673. row := d.db.QueryRow(c, _existedStatistics, svid)
  674. tmp := 0
  675. if err = row.Scan(&tmp); err != nil || tmp == 0 {
  676. _, err = d.db.Exec(c, _insertStatistics, svid, 0, 0, 0, 0, 0)
  677. if err != nil {
  678. return
  679. }
  680. }
  681. result, err := d.db.Exec(c, _addVideoViews, views, svid)
  682. if err != nil {
  683. return
  684. }
  685. return result.RowsAffected()
  686. }
  687. // VideoStateUpdate .
  688. func (d *Dao) VideoStateUpdate(c context.Context, svid int64, newState int) (aff int64, err error) {
  689. result, err := d.db.Exec(c, _updateVideoState, newState, svid)
  690. if err != nil {
  691. return
  692. }
  693. aff, err = result.RowsAffected()
  694. return
  695. }