dm_seg_v2.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "strconv"
  7. "sync"
  8. "go-common/app/interface/main/dm2/model"
  9. "go-common/library/ecode"
  10. "go-common/library/log"
  11. "go-common/library/sync/errgroup"
  12. )
  13. // DM dm list.
  14. func (s *Service) DM(c context.Context, tp int32, aid, oid int64) (res *model.DMSeg, err error) {
  15. var (
  16. total, size int64 = 1, model.DefaultVideoEnd
  17. mu sync.Mutex
  18. )
  19. sub, err := s.subject(c, tp, oid)
  20. if err != nil {
  21. return
  22. }
  23. res = &model.DMSeg{Elems: make([]*model.Elem, 0, 2*sub.Maxlimit)}
  24. duration, err := s.videoDuration(c, aid, oid)
  25. if err != nil {
  26. return
  27. }
  28. if duration != 0 {
  29. total = int64(math.Ceil(float64(duration) / float64(model.DefaultPageSize)))
  30. size = model.DefaultPageSize
  31. }
  32. g, ctx := errgroup.WithContext(c)
  33. for i := int64(1); i <= total; i++ {
  34. num := i
  35. g.Go(func() (err error) {
  36. var dmseg *model.DMSeg
  37. if dmseg, err = s.dao.DMSegCache(ctx, tp, oid, total, num); err != nil {
  38. return
  39. }
  40. if dmseg == nil {
  41. ps := (num - 1) * size
  42. pe := num * size
  43. fmt.Println(ps, pe, total, num)
  44. if dmseg, err = s.dmSegV2(ctx, sub, total, num, ps, pe); err != nil {
  45. return
  46. }
  47. }
  48. if dmseg != nil {
  49. mu.Lock()
  50. res.Elems = append(res.Elems, dmseg.Elems...)
  51. mu.Unlock()
  52. }
  53. return
  54. })
  55. }
  56. err = g.Wait()
  57. return
  58. }
  59. // DMSegV2 dm segment new.
  60. func (s *Service) DMSegV2(c context.Context, tp int32, mid, aid, oid, pn int64, plat int32) (res *model.DMSegResp, err error) {
  61. page, err := s.pageinfo(c, tp, aid, oid, pn)
  62. if err != nil {
  63. return
  64. }
  65. ps := (page.Num - 1) * page.Size
  66. pe := page.Num * page.Size
  67. sub, err := s.subject(c, tp, oid)
  68. if err != nil {
  69. return
  70. }
  71. res = &model.DMSegResp{
  72. Flag: model.DefaultFlag,
  73. }
  74. if sub.State == model.SubStateClosed {
  75. return
  76. }
  77. flag, err := s.dao.RecFlag(c, mid, aid, oid, 2*sub.Maxlimit, ps, pe, plat)
  78. if err == nil {
  79. res.Flag = flag
  80. }
  81. dmseg, err := s.dao.DMSegCache(c, tp, oid, page.Total, page.Num)
  82. if err != nil {
  83. return
  84. }
  85. if dmseg != nil {
  86. res.Dms = dmseg.Elems
  87. return
  88. }
  89. if dmseg, err = s.dmSegV2(c, sub, page.Total, page.Num, ps, pe); err != nil {
  90. return
  91. }
  92. res.Dms = dmseg.Elems
  93. s.cache.Do(c, func(ctx context.Context) {
  94. s.dao.SetDMSegCache(ctx, tp, oid, page.Total, page.Num, dmseg) // add mc cache
  95. })
  96. return
  97. }
  98. func (s *Service) dmSegV2(c context.Context, sub *model.Subject, total, num, ps, pe int64) (res *model.DMSeg, err error) {
  99. var (
  100. cache = true
  101. limit = 2 * sub.Maxlimit
  102. dmids = make([]int64, 0, limit)
  103. )
  104. res = &model.DMSeg{Elems: make([]*model.Elem, 0, limit)}
  105. normalIds, err := s.dmNormalIds(c, sub.Type, sub.Oid, total, num, ps, pe, limit)
  106. if err != nil {
  107. return
  108. }
  109. dmids = append(dmids, normalIds...)
  110. if sub.Childpool > 0 {
  111. var subtitleIds []int64
  112. if subtitleIds, err = s.dmSegSubtitlesIds(c, sub.Type, sub.Oid, ps, pe, limit); err != nil {
  113. return
  114. }
  115. dmids = append(dmids, subtitleIds...)
  116. }
  117. if len(dmids) <= 0 {
  118. return
  119. }
  120. elemsCache, missed, err := s.dao.IdxContentCacheV2(c, sub.Type, sub.Oid, dmids)
  121. if err != nil {
  122. missed = dmids
  123. cache = false
  124. } else {
  125. res.Elems = append(res.Elems, elemsCache...)
  126. }
  127. if len(missed) == 0 {
  128. return
  129. }
  130. dms, err := s.dmsSeg(c, sub.Type, sub.Oid, missed)
  131. if err != nil {
  132. return
  133. }
  134. for _, dm := range dms {
  135. if e := dm.ToElem(); e != nil {
  136. res.Elems = append(res.Elems, e)
  137. }
  138. }
  139. if cache && len(dms) > 0 {
  140. s.cache.Do(c, func(ctx context.Context) {
  141. s.dao.AddIdxContentCache(ctx, sub.Type, sub.Oid, dms, false) // add memcache,realname=false
  142. })
  143. }
  144. return
  145. }
  146. // pageinfo get page info of oid.
  147. func (s *Service) pageinfo(c context.Context, tp int32, aid, oid, pn int64) (p *model.Page, err error) {
  148. var duration int64
  149. data, ok := s.localCache[keyDuration(tp, oid)]
  150. if ok {
  151. duration, err = strconv.ParseInt(string(data), 10, 64)
  152. } else {
  153. duration, err = s.videoDuration(c, aid, oid)
  154. }
  155. if err != nil {
  156. return
  157. }
  158. if duration == 0 {
  159. p = &model.Page{
  160. Num: pn,
  161. Size: model.DefaultVideoEnd,
  162. Total: 1,
  163. }
  164. } else {
  165. p = &model.Page{
  166. Num: pn,
  167. Size: model.DefaultPageSize,
  168. Total: int64(math.Ceil(float64(duration) / float64(model.DefaultPageSize))),
  169. }
  170. }
  171. if pn > p.Total {
  172. log.Warn("oid:%d pn:%d larger than total page:%d", oid, pn, p.Total)
  173. err = ecode.NotModified
  174. return
  175. }
  176. return
  177. }