cache.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "time"
  6. jobmdl "go-common/app/job/main/archive/model/databus"
  7. "go-common/app/job/main/archive/model/result"
  8. accgrpc "go-common/app/service/main/account/api"
  9. "go-common/app/service/main/archive/api"
  10. arcmdl "go-common/app/service/main/archive/model/archive"
  11. "go-common/library/ecode"
  12. "go-common/library/log"
  13. "go-common/library/queue/databus"
  14. )
  // Action names carried by account-notify messages. accountNotifyproc only
  // processes messages whose Action equals one of these values and skips every
  // other action; _actForAdmin additionally triggers a name/face comparison
  // before the mid is queued for a cache refresh.
  15. const (
  16. _actForUname = "updateUname"
  17. _actForFace = "updateFace"
  18. _actForAdmin = "updateByAdmin"
  19. )
  20. func (s *Service) cachesubproc() {
  21. defer s.waiter.Done()
  22. var msgs = s.cacheSub.Messages()
  23. for {
  24. var (
  25. msg *databus.Message
  26. ok bool
  27. err error
  28. )
  29. if msg, ok = <-msgs; !ok {
  30. log.Error("s.cachesub.messages closed")
  31. return
  32. }
  33. if s.closeSub {
  34. return
  35. }
  36. m := &jobmdl.Rebuild{}
  37. if err = json.Unmarshal(msg.Value, m); err != nil {
  38. log.Error("json.Unmarshal(%v) error(%v)", msg.Value, err)
  39. continue
  40. }
  41. log.Info("cacheSub key(%s) value(%s) start", msg.Key, msg.Value)
  42. var retryError error
  43. for {
  44. var (
  45. a *result.Archive
  46. infoReply *accgrpc.InfoReply
  47. c = context.TODO()
  48. )
  49. if retryError != nil {
  50. time.Sleep(10 * time.Millisecond)
  51. }
  52. a, retryError = s.resultDao.Archive(c, m.Aid)
  53. if retryError != nil {
  54. log.Error("s.resultDao.Archive(%d) error(%v)", m.Aid, retryError)
  55. continue
  56. }
  57. if a == nil || a.Mid == 0 {
  58. log.Info("cache break archive(%d) not exist or mid==0", m.Aid)
  59. break
  60. }
  61. infoReply, retryError = s.accGRPC.Info3(c, &accgrpc.MidReq{Mid: a.Mid})
  62. if retryError != nil {
  63. if ecode.Cause(retryError).Equal(ecode.MemberNotExist) {
  64. log.Info("archive(%d) mid(%d) not exist", m.Aid, a.Mid)
  65. break
  66. }
  67. log.Error("s.acc.RPC.Info3(%d) error(%v)", m.Aid, retryError)
  68. continue
  69. }
  70. if infoReply == nil {
  71. log.Error("infoReply mid(%d) err is nil,but info is nil too", a.Mid)
  72. break
  73. }
  74. if infoReply.Info.Name == "" || infoReply.Info.Face == "" {
  75. log.Error("empty info mid(%d) info(%+v)", infoReply.Info.Mid, infoReply.Info)
  76. break
  77. }
  78. for k, arcRPC := range s.arcServices {
  79. if retryError = arcRPC.ArcCache2(c, &arcmdl.ArgCache2{Aid: m.Aid, Tp: arcmdl.CacheUpdate}); retryError != nil {
  80. log.Error("s.arcRPC(%d).ArcCache2(%d) error(%v)", k, m.Aid, retryError)
  81. continue
  82. }
  83. }
  84. log.Info("archive(%d) mid(%d) uname(%s) update success", m.Aid, infoReply.Info.Mid, infoReply.Info.Name)
  85. break
  86. }
  87. msg.Commit()
  88. }
  89. }
  90. func (s *Service) accountNotifyproc() {
  91. defer s.waiter.Done()
  92. var msgs = s.accountNotifySub.Messages()
  93. for {
  94. var (
  95. msg *databus.Message
  96. ok bool
  97. err error
  98. c = context.TODO()
  99. )
  100. if msg, ok = <-msgs; !ok {
  101. log.Error("s.cachesub.messages closed")
  102. return
  103. }
  104. if s.closeSub {
  105. return
  106. }
  107. msg.Commit()
  108. m := &jobmdl.AccountNotify{}
  109. if err = json.Unmarshal(msg.Value, m); err != nil {
  110. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  111. continue
  112. }
  113. log.Info("accountNotify got key(%s) value(%s)", msg.Key, msg.Value)
  114. if m.Action != _actForAdmin && m.Action != _actForFace && m.Action != _actForUname {
  115. log.Warn("accountNotify skip action(%s) values(%s)", m.Action, msg.Value)
  116. continue
  117. }
  118. var count int
  119. if count, err = s.arcServices[0].UpCount2(c, &arcmdl.ArgUpCount2{Mid: m.Mid}); err != nil {
  120. log.Error("s.arcRPC.UpCount2(%d) error(%v)", m.Mid, err)
  121. continue
  122. }
  123. if count == 0 {
  124. log.Info("accountNotify mid(%d) passed(%d)", m.Mid, count)
  125. continue
  126. }
  127. if m.Action == _actForAdmin {
  128. // check uname or face is updated
  129. var am []*api.Arc
  130. if am, err = s.arcServices[0].UpArcs3(c, &arcmdl.ArgUpArcs2{Mid: m.Mid, Ps: 2, Pn: 1}); err != nil {
  131. if ecode.Cause(err).Equal(ecode.NothingFound) {
  132. err = nil
  133. log.Info("accountNotify mid(%d) no passed archive", m.Mid)
  134. continue
  135. }
  136. log.Error("accountNotify mid(%d) error(%v)", m.Mid, err)
  137. continue
  138. }
  139. if len(am) == 0 {
  140. log.Info("accountNotify mid(%d) no passed archive", m.Mid)
  141. continue
  142. }
  143. var reply *accgrpc.InfoReply
  144. if reply, err = s.accGRPC.Info3(c, &accgrpc.MidReq{Mid: m.Mid}); err != nil || reply == nil {
  145. log.Error("accountNotify accRPC.info3(%d) error(%v)", m.Mid, err)
  146. continue
  147. }
  148. if reply.Info.Name == am[0].Author.Name && reply.Info.Face == am[0].Author.Face {
  149. log.Info("accountNotify face(%s) name(%s) not change", reply.Info.Face, reply.Info.Name)
  150. continue
  151. }
  152. }
  153. s.notifyMu.Lock()
  154. s.notifyMid[m.Mid] = struct{}{}
  155. s.notifyMu.Unlock()
  156. }
  157. }
  158. func (s *Service) clearMidCache() {
  159. defer s.waiter.Done()
  160. for {
  161. time.Sleep(5 * time.Second)
  162. s.notifyMu.Lock()
  163. mids := s.notifyMid
  164. s.notifyMid = make(map[int64]struct{})
  165. s.notifyMu.Unlock()
  166. for mid := range mids {
  167. s.updateUpperCache(context.TODO(), mid)
  168. }
  169. if s.closeSub && len(s.notifyMid) == 0 {
  170. return
  171. }
  172. }
  173. }
  174. func (s *Service) updateUpperCache(c context.Context, mid int64) (err error) {
  175. // update archive cache
  176. var aids []int64
  177. if aids, err = s.resultDao.UpPassed(c, mid); err != nil {
  178. log.Error("s.resultDao.UpPassed(%d) error(%v)", mid, err)
  179. return
  180. }
  181. failedCnt := 0
  182. for _, aid := range aids {
  183. for k, rpc := range s.arcServices {
  184. if err = rpc.ArcCache2(c, &arcmdl.ArgCache2{Aid: aid}); err != nil {
  185. log.Error("s.arcRPC(%d).ArcCache2(%d) mid(%d) error(%v)", k, aid, mid, err)
  186. failedCnt++
  187. }
  188. }
  189. }
  190. if failedCnt > 0 {
  191. log.Error("accountNotify updateUpperCache mid(%d) failed(%d)", mid, failedCnt)
  192. return
  193. }
  194. log.Info("accountNofity updateUpperCache mid(%d) successed(%d)", mid, len(aids))
  195. return
  196. }