mask.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. package service
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/job/main/dm2/model"
  6. "go-common/app/service/main/archive/api"
  7. archiveMdl "go-common/app/service/main/archive/model/archive"
  8. "go-common/library/log"
  9. )
  // _maskJobDay holds the day values passed to dao.RankList; maskOneCate issues
  // one ranking query per entry when collecting archives for a category.
  var _maskJobDay = []int32{3, 7}
  13. // maskProc .
  14. func (s *Service) maskProc() {
  15. var (
  16. err error
  17. c = context.Background()
  18. ticker *time.Ticker
  19. )
  20. if s.conf.MaskCate == nil {
  21. return
  22. }
  23. ticker = time.NewTicker(time.Duration(s.conf.MaskCate.Interval))
  24. for range ticker.C {
  25. if err = s.maskSchedule(c); err != nil {
  26. log.Error("maskProc.error(%v)", err)
  27. continue
  28. }
  29. }
  30. }
  31. func (s *Service) maskSchedule(c context.Context) (err error) {
  32. var (
  33. ok bool
  34. now = time.Now()
  35. expire = now.Add(time.Duration(s.conf.MaskCate.Interval))
  36. expireStr = expire.Format(time.RFC3339)
  37. oldExpireStr, oldExpireGetSetStr string
  38. oldExpire time.Time
  39. )
  40. if ok, err = s.dao.SetnxMaskJob(c, expireStr); err != nil {
  41. return
  42. }
  43. // redis中不存在
  44. if ok {
  45. if err = s.maskJob(c); err != nil {
  46. s.dao.DelMaskJob(c)
  47. log.Error("maskJob,error(%v)", err)
  48. return
  49. }
  50. return
  51. }
  52. // redis中已经存在
  53. // 判断是否过期了
  54. if oldExpireStr, err = s.dao.GetMaskJob(c); err != nil {
  55. return
  56. }
  57. if oldExpire, err = time.Parse(time.RFC3339, oldExpireStr); err != nil {
  58. return
  59. }
  60. if oldExpire.Sub(now) > 0 {
  61. return
  62. }
  63. if oldExpireGetSetStr, err = s.dao.GetSetMaskJob(c, expireStr); err != nil {
  64. return
  65. }
  66. if oldExpireGetSetStr != oldExpireStr {
  67. return
  68. }
  69. if err = s.maskJob(c); err != nil {
  70. s.dao.DelMaskJob(c)
  71. log.Error("maskJob,error(%v)", err)
  72. return
  73. }
  74. return
  75. }
  76. // 执行任务
  77. func (s *Service) maskJob(c context.Context) (err error) {
  78. for _, tid := range s.conf.MaskCate.Tids {
  79. if err = s.maskOneCate(c, tid); err != nil {
  80. log.Error("maskOneCate(tid:%v),error(%v)", tid, err)
  81. return
  82. }
  83. }
  84. return
  85. }
  86. func (s *Service) maskOneCate(c context.Context, tid int64) (err error) {
  87. var (
  88. err1 error
  89. resp *model.RankRecentResp
  90. aids []int64
  91. )
  92. for _, day := range _maskJobDay {
  93. if resp, err = s.dao.RankList(c, tid, day); err != nil {
  94. log.Error("RankList(tid:%v,day:%v),error(%v)", tid, day, err)
  95. return
  96. }
  97. for idx, recentRegion := range resp.List {
  98. if idx >= s.conf.MaskCate.Limit {
  99. break
  100. }
  101. aids = append(aids, recentRegion.Aid)
  102. for _, other := range recentRegion.Others {
  103. aids = append(aids, other.Aid)
  104. }
  105. }
  106. }
  107. for _, aid := range aids {
  108. if err1 = s.maskOneArchive(c, aid); err1 != nil {
  109. log.Error("maskOneArchive.err aid:%v,error(%v)", aid, err1)
  110. continue
  111. }
  112. log.Info("maskOneArchive.ok aid:%v", aid)
  113. }
  114. return
  115. }
  116. func (s *Service) maskOneArchive(c context.Context, aid int64) (err error) {
  117. var (
  118. pages []*api.Page
  119. )
  120. if pages, err = s.arcRPC.Page3(c, &archiveMdl.ArgAid2{Aid: aid}); err != nil {
  121. log.Error("s.arcRPC.Page3(aid:%v),error(%v)", aid, err)
  122. return
  123. }
  124. for _, page := range pages {
  125. if err = s.maskOneVideo(c, page.Cid); err != nil {
  126. log.Error("maskOneVideo(oid:%v),error(%v)", page.Cid, err)
  127. return
  128. }
  129. }
  130. return
  131. }
  132. // runGenMask send to gen mask url
  133. func (s *Service) maskOneVideo(c context.Context, oid int64) (err error) {
  134. var (
  135. subject *model.Subject
  136. archive3 *api.Arc
  137. err1 error
  138. duration int64
  139. typeID int32
  140. )
  141. if subject, err = s.subject(c, model.SubTypeVideo, oid); err != nil {
  142. log.Error("s.subject(oid:%v),error(%v)", oid, err)
  143. return
  144. }
  145. if subject.AttrVal(model.AttrSubMaskOpen) == model.AttrYes {
  146. return
  147. }
  148. if archive3, err1 = s.arcRPC.Archive3(c, &archiveMdl.ArgAid2{Aid: subject.Pid}); err1 == nil && archive3 != nil {
  149. duration = archive3.Duration
  150. typeID = archive3.TypeID
  151. }
  152. if err = s.dao.GenerateMask(c, oid, subject.Mid, model.MaskPlatAll, model.MaskPriorityLow, subject.Pid, duration, typeID); err != nil {
  153. log.Error("GenerateMask(oid:%v),error(%v)", oid, err)
  154. return
  155. }
  156. subject.AttrSet(model.AttrYes, model.AttrSubMaskOpen)
  157. if _, err = s.dao.UpdateSubAttr(c, subject.Type, subject.Oid, subject.Attr); err != nil {
  158. log.Error("UpdateSubAttr(oid:%v,attr:%v),error(%v)", oid, subject.Attr, err)
  159. return
  160. }
  161. return
  162. }
  163. func (s *Service) maskMidProc() {
  164. var (
  165. c = context.Background()
  166. mids []int64
  167. err error
  168. )
  169. ticker := time.NewTicker(time.Minute * 5)
  170. defer ticker.Stop()
  171. for range ticker.C {
  172. if mids, err = s.dao.MaskMids(c); err != nil {
  173. continue
  174. }
  175. s.maskMid = mids
  176. log.Info("update mask mid(%v)", s.maskMid)
  177. }
  178. }