// account.go
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "time"
  6. "go-common/app/job/main/archive-shjd/model"
  7. accmdl "go-common/app/service/main/account/model"
  8. "go-common/app/service/main/archive/api"
  9. arcmdl "go-common/app/service/main/archive/model/archive"
  10. "go-common/library/ecode"
  11. "go-common/library/log"
  12. "go-common/library/queue/databus"
  13. )
  14. const (
  15. _actForUname = "updateUname"
  16. _actForFace = "updateFace"
  17. _actForAdmin = "updateByAdmin"
  18. )
  19. func (s *Service) accountNotifyproc() {
  20. defer s.waiter.Done()
  21. var msgs = s.accountNotify.Messages()
  22. for {
  23. var (
  24. msg *databus.Message
  25. ok bool
  26. err error
  27. c = context.TODO()
  28. )
  29. if msg, ok = <-msgs; !ok {
  30. log.Error("s.cachesub.messages closed")
  31. return
  32. }
  33. msg.Commit()
  34. m := &model.AccountNotify{}
  35. if err = json.Unmarshal(msg.Value, m); err != nil {
  36. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  37. continue
  38. }
  39. log.Info("accountNotify got key(%s) value(%s)", msg.Key, msg.Value)
  40. if m.Action != _actForAdmin && m.Action != _actForFace && m.Action != _actForUname {
  41. log.Warn("accountNotify skip action(%s) values(%s)", m.Action, msg.Value)
  42. continue
  43. }
  44. var count int
  45. if count, err = s.arcRPCs["group1"].UpCount2(c, &arcmdl.ArgUpCount2{Mid: m.Mid}); err != nil {
  46. log.Error("s.arcRPC.UpCount2(%d) error(%v)", m.Mid, err)
  47. continue
  48. }
  49. if count == 0 {
  50. log.Info("accountNotify mid(%d) passed(%d)", m.Mid, count)
  51. continue
  52. }
  53. if m.Action == _actForAdmin {
  54. // check uname or face is updated
  55. var am []*api.Arc
  56. if am, err = s.arcRPCs["group1"].UpArcs3(c, &arcmdl.ArgUpArcs2{Mid: m.Mid, Ps: 2, Pn: 1}); err != nil {
  57. if ecode.Cause(err).Equal(ecode.NothingFound) {
  58. err = nil
  59. log.Info("accountNotify mid(%d) no passed archive", m.Mid)
  60. continue
  61. }
  62. log.Error("accountNotify mid(%d) error(%v)", m.Mid, err)
  63. continue
  64. }
  65. if len(am) == 0 {
  66. log.Info("accountNotify mid(%d) no passed archive", m.Mid)
  67. continue
  68. }
  69. var info *accmdl.Info
  70. if info, err = s.accRPC.Info3(c, &accmdl.ArgMid{Mid: m.Mid}); err != nil {
  71. log.Error("accountNotify accRPC.info3(%d) error(%v)", m.Mid, err)
  72. continue
  73. }
  74. if info.Name == am[0].Author.Name && info.Face == am[0].Author.Face {
  75. log.Info("accountNotify face(%s) name(%s) not change", info.Face, info.Name)
  76. continue
  77. }
  78. }
  79. s.notifyMu.Lock()
  80. s.notifyMid[m.Mid] = struct{}{}
  81. s.notifyMu.Unlock()
  82. }
  83. }
  84. func (s *Service) clearMidCache() {
  85. defer s.waiter.Done()
  86. for {
  87. time.Sleep(5 * time.Second)
  88. s.notifyMu.Lock()
  89. mids := s.notifyMid
  90. s.notifyMid = make(map[int64]struct{})
  91. s.notifyMu.Unlock()
  92. log.Info("start clearMidCache mids(%d)", len(mids))
  93. for mid := range mids {
  94. s.updateUpperCache(context.TODO(), mid)
  95. }
  96. log.Info("finish clearMidCache mids(%d)", len(mids))
  97. if s.close && len(s.notifyMid) == 0 {
  98. return
  99. }
  100. }
  101. }
  102. func (s *Service) updateUpperCache(c context.Context, mid int64) (err error) {
  103. failedCnt := 0
  104. for k, rpc := range s.arcRPCs {
  105. pn := 1
  106. for {
  107. var arcs []*api.Arc
  108. if arcs, err = rpc.UpArcs3(c, &arcmdl.ArgUpArcs2{Mid: mid, Pn: pn}); err != nil {
  109. log.Error("rpc(%s) UpArcs3(%d) error(%v)", k, mid, err)
  110. break
  111. }
  112. pn++
  113. if len(arcs) == 0 {
  114. break
  115. }
  116. for _, arc := range arcs {
  117. if err = rpc.ArcCache2(c, &arcmdl.ArgCache2{Aid: arc.Aid, Tp: arcmdl.CacheUpdate}); err != nil {
  118. log.Error("s.arcRPC(%d).ArcCache2(%d, %s) mid(%d) error(%v)", k, arc.Aid, arcmdl.CacheUpdate, mid, err)
  119. failedCnt++
  120. continue
  121. }
  122. }
  123. }
  124. }
  125. if failedCnt > 0 {
  126. log.Error("accountNotify updateUpperCache mid(%d) failed(%d)", mid, failedCnt)
  127. return
  128. }
  129. log.Info("accountNofity updateUpperCache mid(%d)", mid)
  130. return
  131. }