topic_video.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "go-common/app/service/bbq/topic/api"
  7. "go-common/app/service/bbq/topic/internal/model"
  8. "go-common/library/database/sql"
  9. "go-common/library/ecode"
  10. "go-common/library/log"
  11. "go-common/library/xstr"
  12. )
// SQL templates for the topic_video table. `%s` / `%d` verbs are filled in with
// fmt.Sprintf by the methods in this file; `?` placeholders are bound as
// statement arguments at execution time.
const (
	// _insertTopicVideo inserts (topic_id, svid, state) tuples; %s is the
	// comma-separated list of value tuples.
	_insertTopicVideo = "insert ignore into topic_video (`topic_id`,`svid`,`state`) values %s"
	// _updateScore sets the score of every row with the given svid (%d).
	_updateScore = "update topic_video set score=? where svid=%d"
	// _updateState sets the state of every row with the given svid (%d).
	_updateState = "update topic_video set state=? where svid=%d"
	// _selectTopicVideo lists svids of a topic (%d) with state=0, ordered by
	// score descending. %s is an optional extra "and ..." condition and the
	// trailing %d,%d are the limit offset and row count.
	_selectTopicVideo = "select svid from topic_video where topic_id=%d and state=0 %s order by score desc limit %d,%d"
	// _selectVideoTopic returns topic_id, score and state of every row with the
	// given svid (%d).
	_selectVideoTopic = "select topic_id, score, state from topic_video where svid=%d"
)
  20. // InsertTopicVideo 插入topic video表
  21. func (d *Dao) InsertTopicVideo(ctx context.Context, svid int64, topicIDs []int64) (rowsAffected int64, err error) {
  22. var str string
  23. for _, topicID := range topicIDs {
  24. if len(str) != 0 {
  25. str += ","
  26. }
  27. str += fmt.Sprintf("(%d,%d,%d)", topicID, svid, api.TopicVideoStateUnAvailable)
  28. }
  29. insertSQL := fmt.Sprintf(_insertTopicVideo, str)
  30. res, err := d.db.Exec(ctx, insertSQL)
  31. if err != nil {
  32. log.Errorw(ctx, "log", "insert topic_video fail", "svid", svid, "topic_ids", topicIDs)
  33. return
  34. }
  35. rowsAffected, tmpErr := res.RowsAffected()
  36. if tmpErr != nil {
  37. log.Errorw(ctx, "log", "get rows affected fail", "svid", svid, "topic_ids", topicIDs)
  38. }
  39. log.V(1).Infow(ctx, "log", "insert one video topics", "svid", svid, "topics", topicIDs)
  40. return
  41. }
  42. // UpdateVideoScore 更新视频的score
  43. // @param topicID: 携带的时候会修改指定的topicID的video的score,否则会全部修改
  44. func (d *Dao) UpdateVideoScore(ctx context.Context, svid int64, score float64) (err error) {
  45. updateSQL := fmt.Sprintf(_updateScore, svid)
  46. _, err = d.db.Exec(ctx, updateSQL, score)
  47. if err != nil {
  48. log.Errorw(ctx, "log", "update topic video score fail", "svid", svid, "score", score)
  49. return
  50. }
  51. return
  52. }
  53. // UpdateVideoState 更新视频的state
  54. func (d *Dao) UpdateVideoState(ctx context.Context, svid int64, state int32) (err error) {
  55. updateSQL := fmt.Sprintf(_updateState, svid)
  56. _, err = d.db.Exec(ctx, updateSQL, state)
  57. if err != nil {
  58. log.Errorw(ctx, "log", "update topic video score fail", "svid", svid, "state", state)
  59. return
  60. }
  61. return
  62. }
  63. // ListTopicVideos 获取话题下排序的视频列表
  64. // 按道理来说,Dao层不应该有那么多的复杂逻辑的,但是redis、db等操作在业务本身就是耦合在一起的,因此移到dao层,简化逻辑操作
  65. // TODO: 这里把置顶的数据放在了redis里,所以导致排序问题过于复杂,待修正
  66. func (d *Dao) ListTopicVideos(ctx context.Context, topicID int64, cursorPrev, cursorNext string, size int) (res []*api.VideoItem, hasMore bool, err error) {
  67. hasMore = true
  68. // 0. check
  69. if topicID == 0 {
  70. log.Errorw(ctx, "log", "topic_id=0")
  71. return
  72. }
  73. // 0.1 获取cursor和direction
  74. cursor, directionNext, err := parseCursor(ctx, cursorPrev, cursorNext)
  75. if err != nil {
  76. log.Warnw(ctx, "log", "parse cursor fail", "prev", cursorPrev, "next", cursorNext)
  77. return
  78. }
  79. // 1. 获取置顶视频
  80. stickSvid, err := d.GetStickTopicVideo(ctx, topicID)
  81. if err != nil {
  82. log.Warnw(ctx, "log", "get stick topic video fail")
  83. // 获取置顶视频失败后,属于可失败事件,继续往下走
  84. }
  85. stickMap := make(map[int64]bool)
  86. additionalConditionSQL := ""
  87. if len(stickSvid) > 0 {
  88. additionalConditionSQL = fmt.Sprintf("and svid not in (%s)", xstr.JoinInts(stickSvid))
  89. for _, svid := range stickSvid {
  90. stickMap[svid] = true
  91. }
  92. }
  93. // 2. 查询db
  94. var svids []int64
  95. dbOffset := cursor.Offset
  96. limit := size
  97. var rows *sql.Rows
  98. // 有两种情况才需要请求db:1、directionNext;2、directionPrev && stickRank==0
  99. if directionNext || cursor.StickRank == 0 {
  100. if !directionNext {
  101. dbOffset = cursor.Offset - 1 - size
  102. if dbOffset < 0 {
  103. dbOffset = 0
  104. limit = cursor.Offset - 1
  105. }
  106. }
  107. querySQL := fmt.Sprintf(_selectTopicVideo, topicID, additionalConditionSQL, dbOffset, limit)
  108. log.V(1).Infow(ctx, "log", "select topic video", "sql", querySQL)
  109. rows, err = d.db.Query(ctx, querySQL)
  110. if err != nil {
  111. log.Errorw(ctx, "log", "get topic video error", "err", err, "sql", querySQL)
  112. return
  113. }
  114. defer rows.Close()
  115. for rows.Next() {
  116. var svid int64
  117. if err = rows.Scan(&svid); err != nil {
  118. log.Errorw(ctx, "log", "get topic from mysql fail", "sql", querySQL)
  119. return
  120. }
  121. svids = append(svids, svid)
  122. }
  123. log.V(1).Infow(ctx, "log", "get topic video", "svid", svids)
  124. }
  125. // 3. 组装回包
  126. if directionNext {
  127. if dbOffset == 0 {
  128. index := 0
  129. if cursor.StickRank != 0 {
  130. index = cursor.StickRank
  131. }
  132. for ; index < len(stickSvid); index++ {
  133. data, _ := json.Marshal(model.CursorValue{StickRank: index + 1})
  134. res = append(res, &api.VideoItem{Svid: stickSvid[index], CursorValue: string(data)})
  135. }
  136. }
  137. for index, svid := range svids {
  138. data, _ := json.Marshal(model.CursorValue{Offset: dbOffset + 1 + index})
  139. res = append(res, &api.VideoItem{Svid: svid, CursorValue: string(data)})
  140. }
  141. // TODO:为了避免db查询量过大,这里做限制
  142. if len(svids) != limit || dbOffset > model.MaxTopicVideoOffset {
  143. hasMore = false
  144. }
  145. } else {
  146. for index := len(svids) - 1; index >= 0; index-- {
  147. data, _ := json.Marshal(model.CursorValue{Offset: dbOffset + 1 + index})
  148. res = append(res, &api.VideoItem{Svid: svids[index], CursorValue: string(data)})
  149. }
  150. // 如果dbOffset==0,我们会把stick的视频页附上
  151. if dbOffset == 0 {
  152. index := len(stickSvid) - 1
  153. if cursor.StickRank != 0 {
  154. index = cursor.StickRank - 2
  155. }
  156. for ; index >= 0; index-- {
  157. data, _ := json.Marshal(model.CursorValue{StickRank: index + 1})
  158. res = append(res, &api.VideoItem{Svid: stickSvid[index], CursorValue: string(data)})
  159. }
  160. hasMore = false
  161. }
  162. }
  163. // 4. 添加hot_type结果
  164. for _, videoItem := range res {
  165. if _, exists := stickMap[videoItem.Svid]; exists {
  166. videoItem.HotType = api.TopicHotTypeStick
  167. }
  168. }
  169. return
  170. }
  171. // GetVideoTopic 获取视频的话题列表
  172. func (d *Dao) GetVideoTopic(ctx context.Context, svid int64) (list []*api.TopicVideoItem, err error) {
  173. querySQL := fmt.Sprintf(_selectVideoTopic, svid)
  174. rows, err := d.db.Query(ctx, querySQL)
  175. if err != nil {
  176. log.Errorw(ctx, "log", "get video topic_id fail", "svid", svid)
  177. return
  178. }
  179. defer rows.Close()
  180. for rows.Next() {
  181. item := new(api.TopicVideoItem)
  182. item.Svid = svid
  183. if err = rows.Scan(&item.TopicId, &item.Score, &item.State); err != nil {
  184. log.Errorw(ctx, "log", "get topic video fail", "svid", svid)
  185. return
  186. }
  187. list = append(list, item)
  188. }
  189. log.V(1).Infow(ctx, "log", "get video topic", "sql", querySQL)
  190. return
  191. }
  192. // GetStickTopicVideo 获取置顶视频
  193. func (d *Dao) GetStickTopicVideo(ctx context.Context, topicID int64) (list []int64, err error) {
  194. return d.getRedisList(ctx, fmt.Sprintf(model.ReidsStickTopicVideoKey, topicID))
  195. }
  196. // SetStickTopicVideo 设置置顶视频
  197. func (d *Dao) SetStickTopicVideo(ctx context.Context, topicID int64, list []int64) (err error) {
  198. return d.setRedisList(ctx, fmt.Sprintf(model.ReidsStickTopicVideoKey, topicID), list)
  199. }
  200. // StickTopicVideo 操作置顶视频
  201. func (d *Dao) StickTopicVideo(ctx context.Context, opTopicID, opSvid, op int64) (err error) {
  202. // 0. check
  203. topicVideoItems, err := d.GetVideoTopic(ctx, opSvid)
  204. if err != nil {
  205. log.Warnw(ctx, "log", "get svid topic topicVideoItems fail", "topic_id", opTopicID)
  206. return
  207. }
  208. var topicVideoItem *api.TopicVideoItem
  209. for _, item := range topicVideoItems {
  210. if item.TopicId == opTopicID {
  211. topicVideoItem = item
  212. break
  213. }
  214. }
  215. if topicVideoItem == nil {
  216. log.Errorw(ctx, "log", "stick topic fail due to error topic_id", "topic_id", opTopicID)
  217. err = ecode.TopicIDNotFound
  218. return
  219. }
  220. if topicVideoItem.State != api.TopicVideoStateAvailable {
  221. log.Errorw(ctx, "log", "topic video state unavailable to do sticking", "state", topicVideoItem.State, "topic_id", opTopicID)
  222. err = ecode.TopicVideoStateErr
  223. return
  224. }
  225. // 1. 获取stick topic video
  226. stickList, err := d.GetStickTopicVideo(ctx, opTopicID)
  227. if err != nil {
  228. log.Warnw(ctx, "log", "get stick topic video fail")
  229. return
  230. }
  231. // 2. 操作stick topic video
  232. var newStickList []int64
  233. if op != 0 {
  234. newStickList = append(newStickList, opSvid)
  235. }
  236. for _, stickSvid := range stickList {
  237. if stickSvid != opSvid {
  238. newStickList = append(newStickList, stickSvid)
  239. }
  240. }
  241. if len(newStickList) > model.MaxStickTopicVideoNum {
  242. newStickList = newStickList[:model.MaxStickTopicVideoNum]
  243. }
  244. // 3. 更新stick topic video
  245. err = d.SetStickTopicVideo(ctx, opTopicID, newStickList)
  246. if err != nil {
  247. log.Warnw(ctx, "update stick topic video fail")
  248. return
  249. }
  250. return
  251. }