123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- package service
- import (
- "context"
- "encoding/json"
- "time"
- jobmdl "go-common/app/job/main/archive/model/databus"
- "go-common/app/job/main/archive/model/result"
- accgrpc "go-common/app/service/main/account/api"
- "go-common/app/service/main/archive/api"
- arcmdl "go-common/app/service/main/archive/model/archive"
- "go-common/library/ecode"
- "go-common/library/log"
- "go-common/library/queue/databus"
- )
// Action values of account-notify messages that accountNotifyproc processes;
// a message whose Action matches none of them is logged and skipped.
const (
	// _actForAdmin is the only action for which accountNotifyproc first
	// compares the account's current name/face with an existing archive's
	// author before scheduling a cache refresh.
	_actForAdmin = "updateByAdmin"
	// _actForFace and _actForUname schedule a cache refresh without that
	// extra comparison.
	_actForFace  = "updateFace"
	_actForUname = "updateUname"
)
- func (s *Service) cachesubproc() {
- defer s.waiter.Done()
- var msgs = s.cacheSub.Messages()
- for {
- var (
- msg *databus.Message
- ok bool
- err error
- )
- if msg, ok = <-msgs; !ok {
- log.Error("s.cachesub.messages closed")
- return
- }
- if s.closeSub {
- return
- }
- m := &jobmdl.Rebuild{}
- if err = json.Unmarshal(msg.Value, m); err != nil {
- log.Error("json.Unmarshal(%v) error(%v)", msg.Value, err)
- continue
- }
- log.Info("cacheSub key(%s) value(%s) start", msg.Key, msg.Value)
- var retryError error
- for {
- var (
- a *result.Archive
- infoReply *accgrpc.InfoReply
- c = context.TODO()
- )
- if retryError != nil {
- time.Sleep(10 * time.Millisecond)
- }
- a, retryError = s.resultDao.Archive(c, m.Aid)
- if retryError != nil {
- log.Error("s.resultDao.Archive(%d) error(%v)", m.Aid, retryError)
- continue
- }
- if a == nil || a.Mid == 0 {
- log.Info("cache break archive(%d) not exist or mid==0", m.Aid)
- break
- }
- infoReply, retryError = s.accGRPC.Info3(c, &accgrpc.MidReq{Mid: a.Mid})
- if retryError != nil {
- if ecode.Cause(retryError).Equal(ecode.MemberNotExist) {
- log.Info("archive(%d) mid(%d) not exist", m.Aid, a.Mid)
- break
- }
- log.Error("s.acc.RPC.Info3(%d) error(%v)", m.Aid, retryError)
- continue
- }
- if infoReply == nil {
- log.Error("infoReply mid(%d) err is nil,but info is nil too", a.Mid)
- break
- }
- if infoReply.Info.Name == "" || infoReply.Info.Face == "" {
- log.Error("empty info mid(%d) info(%+v)", infoReply.Info.Mid, infoReply.Info)
- break
- }
- for k, arcRPC := range s.arcServices {
- if retryError = arcRPC.ArcCache2(c, &arcmdl.ArgCache2{Aid: m.Aid, Tp: arcmdl.CacheUpdate}); retryError != nil {
- log.Error("s.arcRPC(%d).ArcCache2(%d) error(%v)", k, m.Aid, retryError)
- continue
- }
- }
- log.Info("archive(%d) mid(%d) uname(%s) update success", m.Aid, infoReply.Info.Mid, infoReply.Info.Name)
- break
- }
- msg.Commit()
- }
- }
- func (s *Service) accountNotifyproc() {
- defer s.waiter.Done()
- var msgs = s.accountNotifySub.Messages()
- for {
- var (
- msg *databus.Message
- ok bool
- err error
- c = context.TODO()
- )
- if msg, ok = <-msgs; !ok {
- log.Error("s.cachesub.messages closed")
- return
- }
- if s.closeSub {
- return
- }
- msg.Commit()
- m := &jobmdl.AccountNotify{}
- if err = json.Unmarshal(msg.Value, m); err != nil {
- log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
- continue
- }
- log.Info("accountNotify got key(%s) value(%s)", msg.Key, msg.Value)
- if m.Action != _actForAdmin && m.Action != _actForFace && m.Action != _actForUname {
- log.Warn("accountNotify skip action(%s) values(%s)", m.Action, msg.Value)
- continue
- }
- var count int
- if count, err = s.arcServices[0].UpCount2(c, &arcmdl.ArgUpCount2{Mid: m.Mid}); err != nil {
- log.Error("s.arcRPC.UpCount2(%d) error(%v)", m.Mid, err)
- continue
- }
- if count == 0 {
- log.Info("accountNotify mid(%d) passed(%d)", m.Mid, count)
- continue
- }
- if m.Action == _actForAdmin {
- // check uname or face is updated
- var am []*api.Arc
- if am, err = s.arcServices[0].UpArcs3(c, &arcmdl.ArgUpArcs2{Mid: m.Mid, Ps: 2, Pn: 1}); err != nil {
- if ecode.Cause(err).Equal(ecode.NothingFound) {
- err = nil
- log.Info("accountNotify mid(%d) no passed archive", m.Mid)
- continue
- }
- log.Error("accountNotify mid(%d) error(%v)", m.Mid, err)
- continue
- }
- if len(am) == 0 {
- log.Info("accountNotify mid(%d) no passed archive", m.Mid)
- continue
- }
- var reply *accgrpc.InfoReply
- if reply, err = s.accGRPC.Info3(c, &accgrpc.MidReq{Mid: m.Mid}); err != nil || reply == nil {
- log.Error("accountNotify accRPC.info3(%d) error(%v)", m.Mid, err)
- continue
- }
- if reply.Info.Name == am[0].Author.Name && reply.Info.Face == am[0].Author.Face {
- log.Info("accountNotify face(%s) name(%s) not change", reply.Info.Face, reply.Info.Name)
- continue
- }
- }
- s.notifyMu.Lock()
- s.notifyMid[m.Mid] = struct{}{}
- s.notifyMu.Unlock()
- }
- }
- func (s *Service) clearMidCache() {
- defer s.waiter.Done()
- for {
- time.Sleep(5 * time.Second)
- s.notifyMu.Lock()
- mids := s.notifyMid
- s.notifyMid = make(map[int64]struct{})
- s.notifyMu.Unlock()
- for mid := range mids {
- s.updateUpperCache(context.TODO(), mid)
- }
- if s.closeSub && len(s.notifyMid) == 0 {
- return
- }
- }
- }
- func (s *Service) updateUpperCache(c context.Context, mid int64) (err error) {
- // update archive cache
- var aids []int64
- if aids, err = s.resultDao.UpPassed(c, mid); err != nil {
- log.Error("s.resultDao.UpPassed(%d) error(%v)", mid, err)
- return
- }
- failedCnt := 0
- for _, aid := range aids {
- for k, rpc := range s.arcServices {
- if err = rpc.ArcCache2(c, &arcmdl.ArgCache2{Aid: aid}); err != nil {
- log.Error("s.arcRPC(%d).ArcCache2(%d) mid(%d) error(%v)", k, aid, mid, err)
- failedCnt++
- }
- }
- }
- if failedCnt > 0 {
- log.Error("accountNotify updateUpperCache mid(%d) failed(%d)", mid, failedCnt)
- return
- }
- log.Info("accountNofity updateUpperCache mid(%d) successed(%d)", mid, len(aids))
- return
- }
|