achive.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "time"
  7. "go-common/app/job/main/feed/dao"
  8. "go-common/app/job/main/feed/model"
  9. feedmdl "go-common/app/service/main/feed/model"
  10. "go-common/library/ecode"
  11. "go-common/library/log"
  12. )
  13. const (
  14. _insertAct = "insert"
  15. _updateAct = "update"
  16. _retryTimes = 10
  17. _retrySleep = time.Second
  18. )
  19. func retry(_retryTimes int, sleep time.Duration, callback func() error) (err error) {
  20. for i := 0; i < _retryTimes; i++ {
  21. err = callback()
  22. if err == nil {
  23. return
  24. }
  25. if ecode.Cause(err) == ecode.NothingFound {
  26. err = nil
  27. return
  28. }
  29. time.Sleep(sleep)
  30. log.Error("retrying after error: %v", err)
  31. }
  32. return fmt.Errorf("after %d _retryTimes, last error: %s", _retryTimes, err)
  33. }
  34. func (s *Service) archiveUpdate(action string, nwMsg []byte, oldMsg []byte) {
  35. var err error
  36. nw := &model.Archive{}
  37. if err = json.Unmarshal(nwMsg, nw); err != nil {
  38. dao.PromError("archive:解析新稿件")
  39. log.Error("json.Unmarshal(%s) error(%v)", string(nwMsg), err)
  40. return
  41. }
  42. switch action {
  43. case _insertAct:
  44. s.insert(nw)
  45. case _updateAct:
  46. old := &model.Archive{}
  47. if err = json.Unmarshal(oldMsg, old); err != nil {
  48. dao.PromError("archive:解析旧稿件")
  49. log.Error("json.Unmarshal(%s) error(%v)", string(oldMsg), err)
  50. return
  51. }
  52. s.update(nw, old)
  53. }
  54. }
  55. func (s *Service) insert(nw *model.Archive) {
  56. var (
  57. err error
  58. sleep = time.Second
  59. ts int64
  60. )
  61. if !nw.IsNormal() {
  62. return
  63. }
  64. if ts, err = transTime(nw.PubTime); err != nil {
  65. dao.PromError("archive:插入时解析时间")
  66. log.Error("s.insert.transTime(%v) err: %v", nw.PubTime, err)
  67. err = nil
  68. }
  69. if err = retry(_retryTimes, sleep, func() error {
  70. return s.feedRPC.AddArc(context.TODO(), &feedmdl.ArgArc{Aid: nw.ID, Mid: nw.Mid, PubDate: ts})
  71. }); err != nil {
  72. dao.PromError("archive:增加新稿件")
  73. log.Error("s.feedRPC.AddArc(%v)", nw.ID)
  74. } else {
  75. dao.PromInfo("archive:增加新稿件")
  76. log.Info("feed: Archive(%v) passed, new_arc: %+v.", nw.ID, *nw)
  77. }
  78. }
  79. func (s *Service) update(nw *model.Archive, old *model.Archive) {
  80. var (
  81. err error
  82. pubDate int64
  83. )
  84. if old.Mid != nw.Mid {
  85. arg := &feedmdl.ArgChangeUpper{Aid: nw.ID, OldMid: old.Mid, NewMid: nw.Mid}
  86. if err = retry(_retryTimes, _retrySleep, func() error { return s.feedRPC.ChangeArcUpper(context.TODO(), arg) }); err != nil {
  87. dao.PromError("archive:修改up主")
  88. log.Error("s.feedRPC.ChangeArcUpper(%v) error(%v)", arg.Aid, err)
  89. } else {
  90. dao.PromInfo("archive:修改up主")
  91. log.Info("feed: Archive(%v) change upper, old: %+v new: %+v", old.ID, *old, *nw)
  92. }
  93. }
  94. if nw.IsNormal() && ((old.PubTime != nw.PubTime) || (old.State != nw.State)) {
  95. if pubDate, err = transTime(nw.PubTime); err != nil {
  96. dao.PromError("archive:解析发布时间")
  97. log.Error("s.Feed ParsePubTime(%v) error(%v)", nw.PubTime, err)
  98. err = nil
  99. }
  100. if err = retry(_retryTimes, _retrySleep, func() error {
  101. return s.feedRPC.AddArc(context.TODO(), &feedmdl.ArgArc{Aid: nw.ID, PubDate: pubDate, Mid: nw.Mid})
  102. }); err != nil {
  103. dao.PromError("archive:增加新稿件")
  104. log.Error("s.feedRPC.AddArc error(%v) old_arc: %+v, new_arc: %+v.", err, nw.ID, *old, *nw)
  105. } else {
  106. dao.PromInfo("archive:增加新稿件")
  107. log.Info("feed: Archive(%v) passed, old_arc: %+v, new_arc: %+v.", nw.ID, *old, *nw)
  108. }
  109. return
  110. }
  111. if (old.State != nw.State) && !nw.IsNormal() {
  112. if err = retry(_retryTimes, _retrySleep, func() error { return s.feedRPC.DelArc(context.TODO(), &feedmdl.ArgAidMid{Aid: nw.ID, Mid: nw.Mid}) }); err != nil {
  113. dao.PromError("archive:删除稿件")
  114. log.Error("s.feedRPC.DelArc(%v) error(%v)", nw.ID, err)
  115. } else {
  116. dao.PromInfo("archive:删除稿件")
  117. log.Info("feed: Archive(%v) not passed, old_arc: %+v, new_arc:%+v.", nw.ID, *old, *nw)
  118. }
  119. }
  120. }
  121. func transTime(t string) (res int64, err error) {
  122. var ts time.Time
  123. if ts, err = time.Parse("2006-01-02 15:04:05 MST", t+" CST"); err != nil {
  124. return
  125. }
  126. res = ts.Unix()
  127. return
  128. }