action.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go-common/app/job/main/dm2/model"
  6. "go-common/library/log"
  7. )
  8. func (s *Service) actionAct(c context.Context, act *model.Action) (err error) {
  9. switch act.Action {
  10. case model.ActFlushDM:
  11. fc := new(model.Flush)
  12. if err = json.Unmarshal(act.Data, &fc); err != nil {
  13. log.Error("json.Unmarshal(%s) error(%v)", act.Data, err)
  14. return
  15. }
  16. s.asyncAddFlushDM(c, fc)
  17. case model.ActFlushDMSeg:
  18. fc := new(model.FlushDMSeg)
  19. if err = json.Unmarshal(act.Data, &fc); err != nil {
  20. log.Error("json.Unmarshal(%s) error(%v)", act.Data, err)
  21. return
  22. }
  23. if fc.Page == nil {
  24. log.Error("s.ActFlushDMSeg(+%v) error page nil", fc)
  25. return
  26. }
  27. // async flush cache
  28. s.asyncAddFlushDMSeg(c, fc)
  29. case model.ActAddDM:
  30. var (
  31. dm = &model.DM{}
  32. sub *model.Subject
  33. )
  34. if err = json.Unmarshal(act.Data, &dm); err != nil {
  35. log.Error("json.Unmarshal(%s) error(%v)", act.Data, err)
  36. return
  37. }
  38. if sub, err = s.subject(c, dm.Type, dm.Oid); err != nil {
  39. return
  40. }
  41. if err = s.actionAddDM(c, sub, dm); err != nil {
  42. log.Error("s.actionAddDM(+%v) error(%v)", dm, err)
  43. return
  44. }
  45. if dm.State == model.StateNormal || dm.State == model.StateMonitorAfter {
  46. // 1. 创作中心最新1000条弹幕
  47. s.asyncAddRecent(c, dm)
  48. // 2. 刷新全段弹幕,NOTE 忽略redis缓存报错
  49. if ok, _ := s.dao.ExpireDMCache(c, dm.Type, dm.Oid); ok {
  50. s.dao.AddDMCache(c, dm)
  51. }
  52. s.asyncAddFlushDM(c, &model.Flush{
  53. Type: dm.Type,
  54. Oid: dm.Oid,
  55. Force: false,
  56. })
  57. // 3. 刷新分段弹幕缓存,NOTE 忽略redis缓存报错
  58. var p *model.Page
  59. if p, err = s.pageinfo(c, sub.Pid, dm); err != nil {
  60. return
  61. }
  62. switch dm.Pool {
  63. case model.PoolNormal:
  64. if ok, _ := s.dao.ExpireDMID(c, dm.Type, dm.Oid, p.Total, p.Num); ok {
  65. s.dao.AddDMIDCache(c, dm.Type, dm.Oid, p.Total, p.Num, dm.ID)
  66. }
  67. case model.PoolSubtitle:
  68. if ok, _ := s.dao.ExpireDMIDSubtitle(c, dm.Type, dm.Oid); ok {
  69. s.dao.AddDMIDSubtitleCache(c, dm.Type, dm.Oid, dm)
  70. }
  71. case model.PoolSpecial:
  72. if err = s.specialLocationUpdate(c, dm.Type, dm.Oid); err != nil {
  73. return
  74. }
  75. // TODO add cache
  76. default:
  77. return
  78. }
  79. s.dao.AddIdxContentCaches(c, dm.Type, dm.Oid, dm)
  80. s.asyncAddFlushDMSeg(c, &model.FlushDMSeg{
  81. Type: dm.Type,
  82. Oid: dm.Oid,
  83. Force: false,
  84. Page: p,
  85. })
  86. }
  87. s.bnjDmCount(c, sub, dm)
  88. }
  89. return
  90. }
  91. func (s *Service) actionFlushDM(c context.Context, tp int32, oid int64, force bool) (err error) {
  92. sub, err := s.subject(c, tp, oid)
  93. if err != nil {
  94. return
  95. }
  96. if force {
  97. s.dao.DelDMCache(c, tp, oid) // delete redis cache,ignore error
  98. }
  99. xml, err := s.genXML(c, sub) // generate xml from redis or database
  100. if err != nil {
  101. log.Error("s.genXML(%d) error(%v)", oid, err)
  102. return
  103. }
  104. data, err := s.gzflate(xml, 4)
  105. if err != nil {
  106. log.Error("s.gzflate(type:%d,oid:%d) error(%v)", tp, oid, err)
  107. return
  108. }
  109. if err = s.dao.AddXMLCache(c, sub.Oid, data); err != nil {
  110. return
  111. }
  112. log.Info("actionFlushDM type:%d,oid:%d fore:%v", tp, oid, force)
  113. return
  114. }
  115. // actionAddDM add dm index and content to db by transaction.
  116. func (s *Service) actionAddDM(c context.Context, sub *model.Subject, dm *model.DM) (err error) {
  117. tx, err := s.dao.BeginTran(c)
  118. if err != nil {
  119. return
  120. }
  121. // special dm
  122. if dm.Pool == model.PoolSpecial && dm.ContentSpe != nil {
  123. if _, err = s.dao.TxAddContentSpecial(tx, dm.ContentSpe); err != nil {
  124. return tx.Rollback()
  125. }
  126. }
  127. if _, err = s.dao.TxAddContent(tx, dm.Oid, dm.Content); err != nil {
  128. return tx.Rollback()
  129. }
  130. if _, err = s.dao.TxAddIndex(tx, dm); err != nil {
  131. return tx.Rollback()
  132. }
  133. if dm.State == model.StateMonitorBefore || dm.State == model.StateMonitorAfter {
  134. if _, err = s.dao.TxIncrSubMCount(tx, dm.Type, dm.Oid); err != nil {
  135. return tx.Rollback()
  136. }
  137. }
  138. var count int64
  139. if dm.State == model.StateNormal || dm.State == model.StateMonitorAfter || dm.State == model.StateHide {
  140. count = 1
  141. if sub.Childpool == model.PoolNormal && dm.Pool != model.PoolNormal {
  142. sub.Childpool = 1
  143. }
  144. }
  145. if _, err = s.dao.TxIncrSubjectCount(tx, sub.Type, sub.Oid, 1, count, sub.Childpool); err != nil {
  146. return tx.Rollback()
  147. }
  148. return tx.Commit()
  149. }
  150. // actionFlushXMLDmSeg flush xml dm seg
  151. func (s *Service) actionFlushXMLDmSeg(c context.Context, tp int32, oid int64, p *model.Page, force bool) (err error) {
  152. var (
  153. sub *model.Subject
  154. duration int64
  155. seg *model.Segment
  156. )
  157. if sub, err = s.subject(c, tp, oid); err != nil {
  158. return
  159. }
  160. if force {
  161. if err = s.dao.DelDMIDCache(c, tp, oid, p.Total, p.Num); err != nil {
  162. return
  163. }
  164. if sub.Childpool > 0 {
  165. s.dao.DelDMIDSubtitleCache(c, tp, oid)
  166. }
  167. }
  168. if duration, err = s.videoDuration(c, sub.Pid, sub.Oid); err != nil {
  169. return
  170. }
  171. ps, _ := model.SegmentPoint(p.Num, duration)
  172. if seg, err = s.segmentInfo(c, sub.Pid, sub.Oid, ps, duration); err != nil {
  173. return
  174. }
  175. res, err := s.dmSegXML(c, sub, seg)
  176. if err != nil {
  177. return
  178. }
  179. if err = s.dao.SetXMLSegCache(c, tp, oid, seg.Cnt, seg.Num, res); err != nil {
  180. return
  181. }
  182. log.Info("actionFlushXMLDmSeg type:%d,oid:%d,seg:%+v", tp, oid, seg)
  183. return
  184. }
  185. func (s *Service) flushDmSegCache(c context.Context, fc *model.FlushDMSeg) (err error) {
  186. if fc.Page == nil {
  187. return
  188. }
  189. if err = s.actionFlushXMLDmSeg(c, fc.Type, fc.Oid, fc.Page, fc.Force); err != nil {
  190. return
  191. }
  192. return
  193. }
  194. func (s *Service) flushDmCache(c context.Context, fc *model.Flush) (err error) {
  195. if err = s.actionFlushDM(c, fc.Type, fc.Oid, fc.Force); err != nil {
  196. return
  197. }
  198. if err = s.dao.DelAjaxDMCache(c, fc.Oid); err != nil {
  199. return
  200. }
  201. return
  202. }