sv.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "math/rand"
  7. "net/url"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "go-common/app/interface/bbq/app-bbq/api/http/v1"
  13. "go-common/app/interface/bbq/app-bbq/model"
  14. "go-common/app/interface/bbq/app-bbq/model/grpc"
  15. rec "go-common/app/service/bbq/recsys/api/grpc/v1"
  16. video "go-common/app/service/bbq/video/api/grpc/v1"
  17. "go-common/library/database/sql"
  18. "go-common/library/log"
  19. bm "go-common/library/net/http/blademaster"
  20. "go-common/library/net/metadata"
  21. xgrpc "google.golang.org/grpc"
  22. )
  23. const (
  24. _defaultPlatform = "html5"
  25. _playBcNum = 1
  26. _queryList = "select `avid`, `cid`, `svid`, `title`, `mid`, `content`, `pubtime`,`duration`,`tid`,`sub_tid`,`ctime`,`cover_url`,`cover_width`,`cover_height`,`state` from video where svid in (%s)"
  27. _queryRand = "select `svid` from video where state in (4,5) order by mtime desc limit 400;"
  28. _queryStatisticsList = "select `svid`, `play`, `subtitles`, `like`, `share`, `report` from video_statistics where svid in (%s)"
  29. _querySvPlay = "select `svid`,`path`,`resolution_retio`,`code_rate`,`video_code`,`file_size`,`duration` from %s where svid in (%s) and is_deleted = 0 order by code_rate desc"
  30. )
  31. // GetList 获取列表,按照db排序,按page返回,用于推荐的降级
  32. func (d *Dao) GetList(c context.Context, pageSize int64) (result []int64, err error) {
  33. rows, err := d.db.Query(c, _queryRand)
  34. if err != nil {
  35. log.Error("Query(%s) error(%v)", _queryRand, err)
  36. return
  37. }
  38. defer rows.Close()
  39. var list []int64
  40. for rows.Next() {
  41. sv := new(model.SvInfo)
  42. if err = rows.Scan(&sv.SVID); err != nil {
  43. log.Error("row.Scan() error(%v)", err)
  44. return
  45. }
  46. if tag, ok := d.c.Tmap[strconv.FormatInt(sv.TID, 10)]; ok {
  47. sv.Tag = tag
  48. }
  49. list = append(list, sv.SVID)
  50. }
  51. size := len(list)
  52. if size > int(pageSize) {
  53. size = int(pageSize)
  54. }
  55. rand.Seed(time.Now().Unix())
  56. sort.Slice(list, func(i, j int) bool {
  57. return rand.Float32() > 0.5
  58. })
  59. result = list[:size]
  60. return
  61. }
  62. // AttentionRecList 关注页为空时的推荐
  63. func (d *Dao) AttentionRecList(ctx context.Context, size int64, mid int64, buvid string) (svIDs []int64, err error) {
  64. svIDs, err = d.abstractRawRecList(ctx, d.recsysClient.UpsRecService, size, mid, buvid)
  65. log.V(1).Infow(ctx, "log", "get ups rec service", "method", "UpsRecService")
  66. return
  67. }
  68. // RawRecList 获取推荐列表
  69. func (d *Dao) RawRecList(ctx context.Context, size int64, mid int64, buvid string) (svIDs []int64, err error) {
  70. svIDs, err = d.abstractRawRecList(ctx, d.recsysClient.RecService, size, mid, buvid)
  71. log.V(1).Infow(ctx, "log", "get ups rec service", "method", "RecService")
  72. return
  73. }
  74. type recsysFunc func(ctx context.Context, in *rec.RecsysRequest, opts ...xgrpc.CallOption) (*rec.RecsysResponse, error)
  75. // abstractRawRecList 由于访问recsys的方法都是同样的请求&回包,同时过程也一样,因此
  76. func (d *Dao) abstractRawRecList(ctx context.Context, f recsysFunc, size int64, mid int64, buvid string) (svIDs []int64, err error) {
  77. var (
  78. res *rec.RecsysResponse
  79. )
  80. req := &rec.RecsysRequest{
  81. MID: mid,
  82. BUVID: buvid,
  83. Limit: int32(size),
  84. }
  85. if tmp, ok := ctx.(*bm.Context).Get("BBQBase"); ok && tmp != nil {
  86. switch tmp.(type) {
  87. case *v1.Base:
  88. base := tmp.(*v1.Base)
  89. req.App = base.App
  90. req.AppVersion = base.Version
  91. }
  92. }
  93. if tmp, ok := ctx.(*bm.Context).Get("QueryID"); ok && tmp != nil {
  94. switch tmp.(type) {
  95. case string:
  96. req.QueryID = tmp.(string)
  97. }
  98. }
  99. log.Info("Rec请求 params: [%v]", req)
  100. //res, err = d.recsysClient.RecService(ctx, req)
  101. res, err = f(ctx, req)
  102. debug := ctx.(*bm.Context).Request.Header.Get("debug")
  103. if err != nil {
  104. log.Errorv(ctx,
  105. log.KV("log", fmt.Sprintf("d.recsysClient.RecService err [%v]", err)),
  106. )
  107. // 降级(推荐服务已挂)
  108. svIDs = d.GetRandSvList(int(size))
  109. return
  110. } else if len(res.List) == 0 || debug == "1" {
  111. log.Warnv(ctx,
  112. log.KV("log", fmt.Sprintf("d.recsysClient.RecService return empty [%v]", res.List)),
  113. )
  114. // 降级(推荐接口返回空)
  115. svIDs = d.GetRandSvList(int(size))
  116. return
  117. } else {
  118. num := len(res.List)
  119. if int64(num) != size {
  120. log.Warnv(ctx,
  121. log.KV("log", fmt.Sprintf("d.recsysClient.RecService return num[%d] not match size[%d]", num, size)),
  122. )
  123. }
  124. for n, sv := range res.List {
  125. if int64(n) > size {
  126. break
  127. }
  128. svIDs = append(svIDs, sv.Svid)
  129. }
  130. }
  131. return
  132. }
  133. // GetVideoDetail 从数据库video中获取svid相应的信息
  134. func (d *Dao) GetVideoDetail(ctx context.Context, svIDs []int64) (list []*model.SvInfo, retIDs []int64, err error) {
  135. list = make([]*model.SvInfo, 0)
  136. num := len(svIDs)
  137. if num == 0 {
  138. return
  139. }
  140. var IDsStr string
  141. for i, id := range svIDs {
  142. if i < num-1 {
  143. IDsStr += strconv.FormatInt(id, 10) + ","
  144. } else {
  145. IDsStr += strconv.FormatInt(id, 10)
  146. }
  147. }
  148. query := fmt.Sprintf(_queryList, IDsStr)
  149. rows, err := d.db.Query(ctx, query)
  150. if err != nil {
  151. log.Error("Query error(%v)", err)
  152. return
  153. }
  154. defer rows.Close()
  155. for rows.Next() {
  156. sv := new(model.SvInfo)
  157. if err = rows.Scan(&sv.AVID, &sv.CID, &sv.SVID, &sv.Title, &sv.MID, &sv.Content, &sv.Pubtime, &sv.Duration, &sv.TID, &sv.SubTID, &sv.Ctime, &sv.CoverURL, &sv.CoverWidth, &sv.CoverHeight, &sv.State); err != nil {
  158. log.Error("row.Scan() error(%v)", err)
  159. return
  160. }
  161. if tag, ok := d.c.Tmap[strconv.FormatInt(sv.TID, 10)]; ok {
  162. sv.Tag = tag
  163. }
  164. retIDs = append(retIDs, sv.SVID)
  165. list = append(list, sv)
  166. }
  167. return
  168. }
  169. // RawVideos 从数据库批量获取视频信息
  170. func (d *Dao) RawVideos(ctx context.Context, svIDs []int64) (res map[int64]*model.SvInfo, err error) {
  171. res = make(map[int64]*model.SvInfo)
  172. num := len(svIDs)
  173. if num == 0 {
  174. return
  175. }
  176. var IDsStr string
  177. for i, id := range svIDs {
  178. if i < num-1 {
  179. IDsStr += strconv.FormatInt(id, 10) + ","
  180. } else {
  181. IDsStr += strconv.FormatInt(id, 10)
  182. }
  183. }
  184. query := fmt.Sprintf(_queryList, IDsStr)
  185. rows, err := d.db.Query(ctx, query)
  186. if err != nil {
  187. log.Error("Query error(%v)", err)
  188. return
  189. }
  190. defer rows.Close()
  191. for rows.Next() {
  192. sv := new(model.SvInfo)
  193. if err = rows.Scan(&sv.AVID, &sv.CID, &sv.SVID, &sv.Title, &sv.MID, &sv.Content, &sv.Pubtime, &sv.Duration, &sv.TID, &sv.SubTID, &sv.Ctime, &sv.CoverURL, &sv.CoverWidth, &sv.CoverHeight, &sv.State); err != nil {
  194. log.Error("row.Scan() error(%v)", err)
  195. return
  196. }
  197. res[sv.SVID] = sv
  198. }
  199. return
  200. }
  201. // RawVideoStatistic get video statistics
  202. func (d *Dao) RawVideoStatistic(c context.Context, svids []int64) (res map[int64]*model.SvStInfo, err error) {
  203. const maxIDNum = 20
  204. var (
  205. idStr string
  206. )
  207. res = make(map[int64]*model.SvStInfo)
  208. if len(svids) > maxIDNum {
  209. svids = svids[:maxIDNum]
  210. }
  211. l := len(svids)
  212. for k, svid := range svids {
  213. if k < l-1 {
  214. idStr += strconv.FormatInt(svid, 10) + ","
  215. } else {
  216. idStr += strconv.FormatInt(svid, 10)
  217. }
  218. res[svid] = &model.SvStInfo{}
  219. }
  220. rows, err := d.db.Query(c, fmt.Sprintf(_queryStatisticsList, idStr))
  221. if err != nil {
  222. log.Error("query error(%s)", err.Error())
  223. return
  224. }
  225. defer rows.Close()
  226. for rows.Next() {
  227. ssv := new(model.SvStInfo)
  228. if err = rows.Scan(&ssv.SVID, &ssv.Play, &ssv.Subtitles, &ssv.Like, &ssv.Share, &ssv.Report); err != nil {
  229. log.Error("RawVideoStatistic rows.Scan() error(%v)", err)
  230. return
  231. }
  232. res[ssv.SVID] = ssv
  233. }
  234. cmtCount, _ := d.ReplyCounts(c, svids, model.DefaultCmType)
  235. for id, cmt := range cmtCount {
  236. if _, ok := res[id]; ok {
  237. res[id].Reply = cmt.Count
  238. }
  239. }
  240. return
  241. }
  242. // RawPlayURLs 批量获取cid playurl
  243. func (d *Dao) RawPlayURLs(c context.Context, cids []int64, qn int64, plat string) (res map[int64]*v1.CVideo, err error) {
  244. res = make(map[int64]*v1.CVideo)
  245. var cs string
  246. // transfer cid array to string
  247. l := len(cids)
  248. for i := 0; i < l; i++ {
  249. if i != l-1 {
  250. cs += strconv.FormatInt(cids[i], 10) + ","
  251. } else {
  252. cs += strconv.FormatInt(cids[i], 10)
  253. }
  254. }
  255. ip := metadata.String(c, metadata.RemoteIP)
  256. params := url.Values{}
  257. params.Set("cid", cs)
  258. params.Set("qn", strconv.FormatInt(qn, 10))
  259. params.Set("platform", plat)
  260. var ret struct {
  261. Code int `json:"code"`
  262. Data map[string]map[string]*v1.CVideo `json:"data"`
  263. }
  264. err = d.httpClient.Get(c, d.c.URLs["bvc_batch"], ip, params, &ret)
  265. if err != nil || ret.Data["cids"] == nil {
  266. log.Error("http Get err %v", err)
  267. return
  268. }
  269. for id, v := range ret.Data["cids"] {
  270. var cid int64
  271. cid, err = strconv.ParseInt(id, 10, 64)
  272. if err != nil {
  273. log.Error("strconv.ParseInt err %v", err)
  274. }
  275. if _, ok := res[cid]; !ok {
  276. res[cid] = new(v1.CVideo)
  277. }
  278. res[cid] = v
  279. }
  280. return
  281. }
  282. // RelPlayURLs 相对地址批量获取playurl
  283. func (d *Dao) RelPlayURLs(c context.Context, addrs []string) (res map[string]*grpc.VideoKeyItem, err error) {
  284. res = make(map[string]*grpc.VideoKeyItem)
  285. req := &grpc.RequestMsg{
  286. Keys: addrs,
  287. Backup: uint32(_playBcNum),
  288. Platform: _defaultPlatform,
  289. UIP: metadata.String(c, metadata.RemoteIP),
  290. }
  291. _str, _ := json.Marshal(req)
  292. log.V(5).Infov(c, log.KV("log", fmt.Sprintf("bvc play req (%s)", string(_str))))
  293. r, err := d.bvcPlayClient.ProtobufPlayurl(c, req)
  294. _str, _ = json.Marshal(r)
  295. if err != nil {
  296. log.Error("bvc play err[%v] ret[%s]", err, string(_str))
  297. return
  298. }
  299. log.V(5).Infov(c, log.KV("log", fmt.Sprintf("bvc play ret (%s)", string(_str))))
  300. res = r.Data
  301. return
  302. }
  303. //RawSVBvcKey 批量获取playurl相对地址
  304. func (d *Dao) RawSVBvcKey(c context.Context, svids []int64) (res map[int64][]*model.SVBvcKey, err error) {
  305. var (
  306. tb map[string][]string
  307. rows *sql.Rows
  308. )
  309. res = make(map[int64][]*model.SVBvcKey)
  310. tb = make(map[string][]string)
  311. tName := "video_bvc_%02d"
  312. for _, v := range svids {
  313. if v <= 0 {
  314. continue
  315. }
  316. tbName := fmt.Sprintf(tName, v%100)
  317. tb[tbName] = append(tb[tbName], strconv.FormatInt(v, 10))
  318. }
  319. for k, v := range tb {
  320. query := fmt.Sprintf(_querySvPlay, k, strings.Join(v, ","))
  321. if rows, err = d.db.Query(c, query); err != nil {
  322. log.Errorv(c, log.KV("log", "RawSVBvcKey query sql"), log.KV("err", err))
  323. continue
  324. }
  325. for rows.Next() {
  326. tmp := model.SVBvcKey{}
  327. if err = rows.Scan(&tmp.SVID, &tmp.Path, &tmp.ResolutionRetio, &tmp.CodeRate, &tmp.VideoCode, &tmp.FileSize, &tmp.Duration); err != nil {
  328. log.Errorv(c, log.KV("log", "RawSVBvcKey scan"), log.KV("err", err))
  329. continue
  330. }
  331. res[tmp.SVID] = append(res[tmp.SVID], &tmp)
  332. }
  333. }
  334. return
  335. }
  336. // RelRecList 相关推荐列表
  337. func (d *Dao) RelRecList(ctx context.Context, req *rec.RecsysRequest) (svIDs []int64, err error) {
  338. log.V(1).Infov(ctx, log.KV("log", fmt.Sprintf("RelatedRecService req [%+v]", req)))
  339. res, err := d.recsysClient.RelatedRecService(ctx, req)
  340. if err != nil {
  341. log.Errorv(ctx,
  342. log.KV("log", fmt.Sprintf("RelatedRecService err [%v]", err)),
  343. )
  344. return
  345. }
  346. num := len(res.List)
  347. if int32(num) != req.Limit {
  348. log.Errorv(ctx,
  349. log.KV("log", fmt.Sprintf("RelatedRecService ret num[%d] not match req size[%d]", num, req.Limit)),
  350. )
  351. }
  352. for n, sv := range res.List {
  353. if int32(n) > req.Limit {
  354. break
  355. }
  356. svIDs = append(svIDs, sv.Svid)
  357. }
  358. log.V(1).Infov(ctx, log.KV("log", fmt.Sprintf("RelatedRecService svid [%+v]", svIDs)))
  359. return
  360. }
  361. // SvDel 视频删除
  362. func (d *Dao) SvDel(c context.Context, in *video.VideoDeleteRequest) (interface{}, error) {
  363. return d.videoClient.VideoDelete(c, in)
  364. }