cache.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package service
  2. import (
  3. "context"
  4. "strconv"
  5. "go-common/app/job/main/archive-shjd/model"
  6. arcmdl "go-common/app/service/main/archive/model/archive"
  7. "go-common/library/ecode"
  8. "go-common/library/log"
  9. "github.com/pkg/errors"
  10. )
  11. // UpdateCache is
  12. func (s *Service) UpdateCache(old *model.Archive, nw *model.Archive, action string) (err error) {
  13. defer func() {
  14. if err == nil {
  15. s.notifyPub.Send(context.Background(), strconv.FormatInt(nw.AID, 10), &model.Notify{Table: _tableArchive, Nw: nw, Old: old, Action: action})
  16. return
  17. }
  18. // retry
  19. item := &model.RetryItem{
  20. Old: old,
  21. Nw: nw,
  22. Tp: model.TypeForUpdateArchive,
  23. Action: action,
  24. }
  25. if err1 := s.PushItem(context.TODO(), item); err1 != nil {
  26. log.Error("s.PushItem(%+v) error(%+v)", item, err1)
  27. return
  28. }
  29. }()
  30. args := &arcmdl.ArgCache2{}
  31. args.Aid = nw.AID
  32. args.Tp = arcmdl.CacheUpdate
  33. if old == nil {
  34. // insert
  35. if nw.State >= 0 {
  36. args.Tp = arcmdl.CacheAdd
  37. }
  38. } else {
  39. if nw.State >= 0 {
  40. args.Tp = arcmdl.CacheAdd
  41. } else {
  42. args.Tp = arcmdl.CacheDelete
  43. }
  44. if nw.Mid != old.Mid {
  45. args.OldMid = old.Mid
  46. }
  47. if old.TypeID != nw.TypeID {
  48. fieldAgs := &arcmdl.ArgFieldCache2{Aid: nw.AID, TypeID: nw.TypeID, OldTypeID: old.TypeID}
  49. for cluster, arc := range s.arcRPCs {
  50. if err = arc.ArcFieldCache2(context.TODO(), fieldAgs); err != nil {
  51. log.Error("s.arcRPC.ArcFieldCache2(%s, %+v) error(%+v)", cluster, fieldAgs, err)
  52. return
  53. }
  54. }
  55. }
  56. }
  57. for cluster, arc := range s.arcRPCs {
  58. if err = arc.ArcCache2(context.TODO(), args); err != nil {
  59. log.Error("s.arcRPC.ArcCache2(%s,%+v) error(%v)", cluster, args, err)
  60. return
  61. }
  62. }
  63. return
  64. }
  65. // UpdateVideoCache is
  66. func (s *Service) UpdateVideoCache(aid, cid int64) (err error) {
  67. defer func() {
  68. if err == nil {
  69. return
  70. }
  71. // retry
  72. item := &model.RetryItem{
  73. AID: aid,
  74. CID: cid,
  75. Tp: model.TypeForUpdateVideo,
  76. }
  77. if err1 := s.PushItem(context.TODO(), item); err1 != nil {
  78. log.Error("s.PushItem(%+v) error(%+v)", item, err1)
  79. return
  80. }
  81. }()
  82. for cluster, arc := range s.arcRPCs {
  83. if err = arc.UpVideo2(context.TODO(), &arcmdl.ArgVideo2{Aid: aid, Cid: cid}); err != nil {
  84. if ecode.Cause(err).Equal(ecode.NothingFound) {
  85. err = nil
  86. return
  87. }
  88. err = errors.Wrapf(err, "s.arcRPC.UpVideo2 cluster(%s)", cluster)
  89. }
  90. }
  91. return
  92. }
  93. // DelteVideoCache del video cache
  94. func (s *Service) DelteVideoCache(aid, cid int64) (err error) {
  95. defer func() {
  96. if err == nil {
  97. return
  98. }
  99. // retry
  100. item := &model.RetryItem{
  101. AID: aid,
  102. CID: cid,
  103. Tp: model.TypeForDelVideo,
  104. }
  105. if err1 := s.PushItem(context.TODO(), item); err1 != nil {
  106. log.Error("s.PushItem(%+v) error(%+v)", item, err1)
  107. return
  108. }
  109. }()
  110. for cluster, arc := range s.arcRPCs {
  111. if err = arc.DelVideo2(context.TODO(), &arcmdl.ArgVideo2{Aid: aid, Cid: cid}); err != nil {
  112. if ecode.Cause(err).Equal(ecode.NothingFound) {
  113. err = nil
  114. return
  115. }
  116. err = errors.Wrapf(err, "s.arcRPC.VdelVideo2 cluster(%s)", cluster)
  117. }
  118. }
  119. return
  120. }