dao.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. package dao
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/json"
  6. "fmt"
  7. "go-common/app/interface/bbq/bullet/api"
  8. "go-common/app/interface/bbq/bullet/internal/model"
  9. "go-common/library/log"
  10. "go-common/library/net/rpc/warden"
  11. "go-common/app/interface/bbq/bullet/internal/conf"
  12. user "go-common/app/service/bbq/user/api"
  13. video "go-common/app/service/bbq/video/api/grpc/v1"
  14. filter "go-common/app/service/main/filter/api/grpc/v1"
  15. "go-common/library/cache/redis"
  16. xsql "go-common/library/database/sql"
  17. )
  18. // Dao dao
  19. type Dao struct {
  20. c *conf.Config
  21. redis *redis.Pool
  22. db *xsql.DB
  23. filterClient filter.FilterClient
  24. userClient user.UserClient
  25. videoClient video.VideoClient
  26. }
  27. // New init mysql db
  28. func New(c *conf.Config) (dao *Dao) {
  29. dao = &Dao{
  30. c: c,
  31. redis: redis.NewPool(c.Redis),
  32. db: xsql.NewMySQL(c.MySQL),
  33. filterClient: newFilterClient(c.GRPCClient["filter"]),
  34. userClient: newUserClient(c.GRPCClient["user"]),
  35. videoClient: newVideoClient(c.GRPCClient["video"]),
  36. }
  37. return
  38. }
  39. // newVideoClient .
  40. func newVideoClient(cfg *conf.GRPCConf) video.VideoClient {
  41. cc, err := warden.NewClient(cfg.WardenConf).Dial(context.Background(), cfg.Addr)
  42. if err != nil {
  43. panic(err)
  44. }
  45. return video.NewVideoClient(cc)
  46. }
  47. // newUserClient .
  48. func newUserClient(cfg *conf.GRPCConf) user.UserClient {
  49. cc, err := warden.NewClient(cfg.WardenConf).Dial(context.Background(), cfg.Addr)
  50. if err != nil {
  51. panic(err)
  52. }
  53. return user.NewUserClient(cc)
  54. }
  55. // newUserClient .
  56. func newFilterClient(cfg *conf.GRPCConf) filter.FilterClient {
  57. cc, err := warden.NewClient(cfg.WardenConf).Dial(context.Background(), cfg.Addr)
  58. if err != nil {
  59. panic(err)
  60. }
  61. return filter.NewFilterClient(cc)
  62. }
  63. // Close close the resource.
  64. func (d *Dao) Close() {
  65. d.redis.Close()
  66. d.db.Close()
  67. }
  68. // Ping dao ping
  69. func (d *Dao) Ping(ctx context.Context) error {
  70. // TODO: add mc,redis... if you use
  71. return d.db.Ping(ctx)
  72. }
  73. // ContentPost .
  74. func (d *Dao) ContentPost(ctx context.Context, req *api.Bullet) (dmid int64, err error) {
  75. result, err := d.db.Exec(ctx,
  76. "insert into bullet_content (oid, mid, offset_ms, offset, content) values (?, ?, ?, ?, ?)",
  77. req.Oid, req.Mid, req.OffsetMs, req.OffsetMs/1000, req.Content)
  78. if err != nil {
  79. log.Errorv(ctx, log.KV("log", "insert bullet fail: req=%s"+req.String()))
  80. return
  81. }
  82. dmid, err = result.LastInsertId()
  83. return
  84. }
  85. // ContentGet .
  86. func (d *Dao) ContentGet(ctx context.Context, req *api.ListBulletReq) (res []*api.Bullet, err error) {
  87. res = []*api.Bullet{}
  88. mid := req.Mid
  89. querySQL := fmt.Sprintf("select id, mid, offset, content from bullet_content where "+
  90. "oid=%d and state=0 and offset>=%d and offset<%d order by offset, id desc",
  91. req.Oid, req.StartMs/1000, req.EndMs/1000)
  92. rows, err := d.db.Query(ctx, querySQL)
  93. if err != nil {
  94. return
  95. }
  96. defer rows.Close()
  97. log.V(1).Infow(ctx, "sql", querySQL)
  98. // 获取时间范围内的全量视频
  99. var allBullet []*api.Bullet
  100. midBullets := make(map[int32]*[]*api.Bullet)
  101. for rows.Next() {
  102. bullet := new(api.Bullet)
  103. if err = rows.Scan(&bullet.Id, &bullet.Mid, &bullet.Offset, &bullet.Content); err != nil {
  104. log.Errorv(ctx, log.KV("log", "scan mysql fail: sql="+querySQL))
  105. return
  106. }
  107. bullet.OffsetMs = bullet.Offset * 1000
  108. allBullet = append(allBullet, bullet)
  109. // 先把访问者发过的弹幕按照秒级别进行汇总
  110. if mid == bullet.Mid {
  111. v, exists := midBullets[bullet.Offset]
  112. if !exists {
  113. v = new([]*api.Bullet)
  114. midBullets[bullet.Offset] = v
  115. }
  116. *v = append(*v, bullet)
  117. }
  118. }
  119. // 根据全量数据,选择满足条件的弹幕
  120. currSecond := int32(-1)
  121. currSecondCount := 0
  122. for _, bullet := range allBullet {
  123. if currSecond != bullet.Offset {
  124. currSecond = bullet.Offset
  125. currSecondCount = 0
  126. if midBulletArray, exists := midBullets[currSecond]; exists {
  127. log.V(10).Infow(ctx, "log", "current second user have published danmu", "offset", currSecond, "len", len(*midBulletArray))
  128. for _, midBullet := range *midBulletArray {
  129. currSecondCount++
  130. res = append(res, midBullet)
  131. if currSecondCount >= model.SecondMaxNum {
  132. break
  133. }
  134. }
  135. }
  136. }
  137. if currSecondCount >= model.SecondMaxNum {
  138. continue
  139. }
  140. if bullet.Mid != mid {
  141. currSecondCount++
  142. bullet.OffsetMs = bullet.Offset * 1000
  143. res = append(res, bullet)
  144. }
  145. }
  146. if len(res) > 0 {
  147. var cursor CursorValue
  148. cursor.Offset = res[len(res)-1].Offset
  149. b, _ := json.Marshal(cursor)
  150. res[len(res)-1].CursorValue = string(b)
  151. }
  152. return
  153. }
  154. // ContentList 用于返回弹幕列表
  155. /*
  156. */
  157. func (d *Dao) ContentList(ctx context.Context, req *api.ListBulletReq) (res *api.ListBulletReply, err error) {
  158. res = new(api.ListBulletReply)
  159. // 0. 前期准备
  160. // 获取当前oid的最大offset弹幕的offset
  161. oidLastOffset, err := d.lastOffset(ctx, req.Oid)
  162. if err != nil {
  163. log.Warnv(ctx, log.KV("log", "get has more info fail"))
  164. return
  165. }
  166. // 解析cursor
  167. cursor, err := parseCursorValue(ctx, req.CursorNext)
  168. if err != nil {
  169. log.Warnv(ctx, log.KV("log", "parse cursor value fail"))
  170. return
  171. }
  172. // 当两者相等,则说明已经到列表的最后了
  173. if oidLastOffset <= cursor.Offset {
  174. res.HasMore = false
  175. log.Warnw(ctx, "log", "offset already end", "oid_last_offset", oidLastOffset, "cursor_offset", cursor.Offset)
  176. return
  177. }
  178. // 1. 按照条数取SecondMaxNum条,返回数据的offset范围start和end
  179. // 这步是为了保证该次返回至少有条数
  180. startS := cursor.Offset + 1
  181. startS, endS, err := d.getNumBulletTs(ctx, req.Oid, startS, model.SecondMaxNum)
  182. if err != nil {
  183. log.Warnv(ctx, log.KV("log", "get num start bullet fail"))
  184. return
  185. }
  186. log.V(1).Infow(ctx, "log", "get num bullet ts", "start_s", startS, "end_s", endS)
  187. endS += 1
  188. // 2. 根据选择的时间范围获取弹幕
  189. newReq := &api.ListBulletReq{StartMs: startS * 1000, EndMs: endS * 1000, Oid: req.Oid, Mid: req.Mid}
  190. bullets, err := d.ContentGet(ctx, newReq)
  191. if err != nil {
  192. log.Warnv(ctx, log.KV("log", "content get fail: req="+newReq.String()))
  193. return
  194. }
  195. res.List = bullets
  196. // 3. has_more设置,如果offset和最后时间offset相等,那么肯定没有更多弹幕了
  197. if len(bullets) > 0 && oidLastOffset > bullets[len(bullets)-1].Offset {
  198. res.HasMore = true
  199. } else {
  200. res.HasMore = false
  201. }
  202. return
  203. }
  204. func (d *Dao) getNumBulletTs(ctx context.Context, oid int64, startOffset, size int32) (startS, endS int32, err error) {
  205. querySQL := fmt.Sprintf(
  206. "select offset from bullet_content where oid=%d and state=0 and offset>=%d order by offset limit %d",
  207. oid, startOffset, size)
  208. rows, err := d.db.Query(ctx, querySQL)
  209. if err != nil {
  210. log.Errorv(ctx, log.KV("log", fmt.Sprintf("get num bullet from db fail: sql=%s", querySQL)))
  211. return
  212. }
  213. log.V(1).Infow(ctx, "sql", querySQL)
  214. var offset int32
  215. var index int32
  216. for rows.Next() {
  217. if err = rows.Scan(&offset); err != nil {
  218. log.Errorv(ctx, log.KV("log", "scan mysql fail: sql="+querySQL))
  219. return
  220. }
  221. if index == 0 {
  222. startS = offset
  223. }
  224. endS = offset
  225. index++
  226. }
  227. return
  228. }
  229. func (d *Dao) lastOffset(ctx context.Context, oid int64) (lastOffset int32, err error) {
  230. querySQL := fmt.Sprintf("select offset from bullet_content where oid=%d and state=0 order by offset desc limit 1", oid)
  231. row := d.db.QueryRow(ctx, querySQL)
  232. if err = row.Scan(&lastOffset); err != nil {
  233. if err == sql.ErrNoRows {
  234. err = nil
  235. lastOffset = -1
  236. } else {
  237. log.Errorw(ctx, "log", "get has more from db fail", "sql", querySQL, "err", err)
  238. return
  239. }
  240. }
  241. return
  242. }
  243. // CursorValue .
  244. type CursorValue struct {
  245. Offset int32 `json:"offset"`
  246. // level本来是想要用于避免一次选择太少的弹幕,但后面修改策略进行二次查找之后就没这个必要了
  247. //Level int32 `json:"level"`
  248. //duration int32 `json:"duration"`
  249. }
  250. func parseCursorValue(ctx context.Context, cursorValue string) (cursor CursorValue, err error) {
  251. if len(cursorValue) == 0 {
  252. cursor.Offset = -1
  253. //cursor.Level = 1
  254. return
  255. }
  256. if err = json.Unmarshal([]byte(cursorValue), &cursor); err != nil {
  257. log.Errorw(ctx, "log", "unmarshal fail: str="+cursorValue, "err", err)
  258. return
  259. }
  260. return
  261. }
  262. //
  263. //// 这里做了个优化,当对于弹幕数较少的视频,level等级定的高点,在弹幕列表页中就可以选取更长范围的弹幕
  264. //func getCursorLevel(duration int32, num int32) (level int32) {
  265. // numPerSecond := num / duration
  266. // if numPerSecond < 1 {
  267. // level = 10
  268. // } else if numPerSecond < 2 {
  269. // level = 5
  270. // } else if numPerSecond < 5 {
  271. // level = 2
  272. // } else {
  273. // level = 1
  274. // }
  275. // return
  276. //}