123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- package service
- import (
- "context"
- "encoding/json"
- "strings"
- "go-common/app/job/main/member-cache/model"
- "go-common/library/log"
- "go-common/library/queue/databus"
- "github.com/pkg/errors"
- )
- // consts
- const (
- _MemberBaseInfo = "user_base_"
- _MemberMoral = "user_moral"
- _memberExp = "user_exp_"
- _memberRealnameApply = "realname_apply"
- _memberRealnameInfo = "realname_info"
- )
- // consts
- const (
- expMulti = 100
- level1 = 1
- level2 = 200
- level3 = 1500
- level4 = 4500
- level5 = 10800
- level6 = 28800
- )
- func isExpAndLevelChange(mu *model.Binlog) (bool, bool) {
- if mu.Action == "insert" {
- return true, true
- }
- if len(mu.Old) <= 0 || len(mu.New) <= 0 {
- return false, false
- }
- old := &model.ExpMessage{}
- new := &model.ExpMessage{}
- if err := json.Unmarshal(mu.New, new); err != nil {
- return false, false
- }
- if err := json.Unmarshal(mu.Old, old); err != nil {
- return false, false
- }
- expChange := false
- levelChange := false
- if old.Exp != new.Exp {
- expChange = true
- }
- if level(old.Exp) != level(new.Exp) {
- levelChange = true
- }
- return expChange, levelChange
- }
- func level(exp int64) int8 {
- exp = exp / expMulti
- switch {
- case exp < level1:
- return 0
- case exp < level2:
- return 1
- case exp < level3:
- return 2
- case exp < level4:
- return 3
- case exp < level5:
- return 4
- case exp < level6:
- return 5
- default:
- return 6
- }
- }
- func resolveBaseAct(old json.RawMessage, new json.RawMessage) string {
- if old == nil || new == nil {
- return model.ActUpdateFace
- }
- ob := &model.MemberBase{}
- if err := json.Unmarshal(old, ob); err != nil {
- log.Error("Failed to parse data: %s", string(old))
- return model.ActUpdateFace
- }
- nb := &model.MemberBase{}
- if err := json.Unmarshal(new, nb); err != nil {
- log.Error("Failed to parse data: %s", string(new))
- return model.ActUpdateFace
- }
- if ob.Name != nb.Name {
- return model.ActUpdateUname
- }
- if ob.Face != nb.Face {
- return model.ActUpdateFace
- }
- return model.ActUpdateFace
- }
- func (s *Service) handleMemberBinLog(ctx context.Context, msg *databus.Message) error {
- defer func() {
- if err := msg.Commit(); err != nil {
- log.Error("Failed to commit message: %+v", BeautifyMessage(msg))
- return
- }
- }()
- mu := &model.Binlog{}
- if err := json.Unmarshal(msg.Value, mu); err != nil {
- return errors.WithStack(err)
- }
- mmid := &model.NeastMid{}
- bs := mu.New
- if len(bs) <= 0 {
- bs = mu.Old
- }
- if err := json.Unmarshal(bs, mmid); err != nil {
- return errors.WithStack(err)
- }
- switch {
- case strings.HasPrefix(mu.Table, _MemberBaseInfo):
- if err := s.dao.DelBaseInfoCache(ctx, mmid.Mid); err != nil {
- return err
- }
- s.NotifyPurgeCache(ctx, mmid.Mid, resolveBaseAct(mu.Old, mu.New))
- case mu.Table == _MemberMoral:
- if err := s.dao.DelMoralCache(ctx, mmid.Mid); err != nil {
- return err
- }
- s.NotifyPurgeCache(ctx, mmid.Mid, model.ActUpdateMoral)
- case mu.Table == _memberRealnameInfo || mu.Table == _memberRealnameApply:
- if err := s.dao.DeleteRealnameCache(ctx, mmid.Mid); err != nil {
- return err
- }
- s.NotifyPurgeCache(ctx, mmid.Mid, model.ActUpdateRealname)
- case strings.HasPrefix(mu.Table, _memberExp):
- mexp := &model.NewExp{}
- if err := json.Unmarshal(mu.New, mexp); err != nil {
- return errors.WithStack(err)
- }
- if err := s.dao.SetExpCache(ctx, mexp.Mid, mexp.Exp); err != nil {
- return err
- }
- expChange, levelChange := isExpAndLevelChange(mu)
- if expChange {
- s.NotifyPurgeCache(ctx, mmid.Mid, model.ActUpdateExp)
- }
- if levelChange {
- s.NotifyPurgeCache(ctx, mmid.Mid, model.ActUpdateLevel)
- }
- default:
- if mmid.Mid <= 0 {
- log.Info("Invalid message: %+v", BeautifyMessage(msg))
- return nil
- }
- s.deleteAllCache(ctx, mmid.Mid)
- s.NotifyPurgeCache(ctx, mmid.Mid, model.ActUpdateByAdmin)
- }
- return nil
- }
- func (s *Service) deleteAllCache(ctx context.Context, mid int64) error {
- if err := s.dao.DelBaseInfoCache(ctx, mid); err != nil {
- log.Error("Failed to delete cache: %+v", err)
- }
- if err := s.dao.DelMoralCache(ctx, mid); err != nil {
- log.Error("Failed to delete cache: %+v", err)
- }
- if err := s.dao.DeleteRealnameCache(ctx, mid); err != nil {
- log.Error("Failed to delete cache: %+v", err)
- }
- if err := s.dao.DelExpCache(ctx, mid); err != nil {
- log.Error("Failed to delete cache: %+v", err)
- }
- return nil
- }
- func (s *Service) memberBinLogproc(ctx context.Context) {
- for msg := range s.memberBinLog.Messages() {
- if err := s.handleMemberBinLog(ctx, msg); err != nil {
- log.Error("Failed to handle member binlog: %s: %+v", BeautifyMessage(msg), err)
- continue
- }
- log.Info("Succeed to handle member binlog: %s", BeautifyMessage(msg))
- }
- }
|