video.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. jobmdl "go-common/app/job/main/archive/model/databus"
  6. "go-common/app/job/main/archive/model/retry"
  7. "go-common/app/service/main/archive/model/archive"
  8. "go-common/library/ecode"
  9. "go-common/library/log"
  10. "go-common/library/queue/databus"
  11. )
  12. func (s *Service) videoupConsumer() {
  13. defer s.waiter.Done()
  14. var msgs = s.videoupSub.Messages()
  15. for {
  16. var (
  17. msg *databus.Message
  18. ok bool
  19. err error
  20. )
  21. if msg, ok = <-msgs; !ok {
  22. log.Error("s.videoupSub.messages closed")
  23. return
  24. }
  25. if s.closeSub {
  26. return
  27. }
  28. msg.Commit()
  29. m := &jobmdl.Videoup{}
  30. if err = json.Unmarshal(msg.Value, m); err != nil {
  31. log.Error("json.Unmarshal(%v) error(%v)", msg.Value, err)
  32. continue
  33. }
  34. log.Info("videoupMessage key(%s) value(%s) start", msg.Key, msg.Value)
  35. if m.Aid <= 0 {
  36. log.Warn("aid(%d) <= 0 WTF(%s)", m.Aid, msg.Value)
  37. continue
  38. }
  39. switch m.Route {
  40. case jobmdl.RouteAutoOpen, jobmdl.RouteDelayOpen, jobmdl.RouteDeleteArchive, jobmdl.RouteSecondRound, jobmdl.RouteFirstRoundForbid, jobmdl.RouteForceSync:
  41. select {
  42. case s.videoupAids[m.Aid%int64(s.c.ChanSize)] <- m.Aid:
  43. default:
  44. rt := &retry.Info{Action: retry.FailResultAdd}
  45. rt.Data.Aid = m.Aid
  46. s.PushFail(context.TODO(), rt)
  47. log.Warn("s.videoupAids is full!!! async databus archive(%d)", m.Aid)
  48. }
  49. }
  50. log.Info("videoupMessage key(%s) value(%s) finish", msg.Key, msg.Value)
  51. }
  52. }
  53. func (s *Service) delVideoCache(aid int64, cids []int64) (err error) {
  54. for _, cid := range cids {
  55. for k, rpc := range s.arcServices {
  56. if err = rpc.DelVideo2(context.TODO(), &archive.ArgVideo2{Aid: aid, Cid: cid}); err != nil {
  57. log.Error("s.arcRpc(%d).DelVideo2(%d, %d) error(%v)", k, aid, cid, err)
  58. if ecode.Cause(err) != ecode.NothingFound {
  59. rt := &retry.Info{Action: retry.FailDelVideoCache}
  60. rt.Data.Aid = aid
  61. rt.Data.Cids = []int64{cid}
  62. s.PushFail(context.TODO(), rt)
  63. log.Error("delVideoCache error(%v)", err)
  64. }
  65. }
  66. }
  67. }
  68. return
  69. }
  70. func (s *Service) upVideoCache(aid int64, cids []int64) (err error) {
  71. for _, cid := range cids {
  72. for k, rpc := range s.arcServices {
  73. if err = rpc.UpVideo2(context.TODO(), &archive.ArgVideo2{Aid: aid, Cid: cid}); err != nil {
  74. log.Error("s.arcRpc(%d).UpVideo2(%d, %d) error(%v)", k, aid, cid, err)
  75. if ecode.Cause(err) != ecode.NothingFound {
  76. rt := &retry.Info{Action: retry.FailUpVideoCache}
  77. rt.Data.Aid = aid
  78. rt.Data.Cids = []int64{cid}
  79. s.PushFail(context.TODO(), rt)
  80. log.Error("upVideoCache error(%v)", err)
  81. }
  82. }
  83. }
  84. }
  85. return
  86. }