block.go 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go-common/app/job/main/member-cache/model"
  6. "go-common/library/log"
  7. "go-common/library/queue/databus"
  8. "github.com/pkg/errors"
  9. )
  10. func (s *Service) handleBlockBinLog(ctx context.Context, msg *databus.Message) error {
  11. defer func() {
  12. if err := msg.Commit(); err != nil {
  13. log.Error("Failed to commit message: %s: %+v", BeautifyMessage(msg), err)
  14. return
  15. }
  16. }()
  17. mu := &model.Binlog{}
  18. if err := json.Unmarshal(msg.Value, mu); err != nil {
  19. return errors.WithStack(err)
  20. }
  21. mmid := &model.NeastMid{}
  22. bs := mu.New
  23. if len(bs) <= 0 {
  24. bs = mu.Old
  25. }
  26. if err := json.Unmarshal(bs, mmid); err != nil {
  27. return errors.WithStack(err)
  28. }
  29. defer s.NotifyPurgeCache(ctx, mmid.Mid, model.ActBlockUser)
  30. return s.dao.DeleteUserBlockCache(ctx, mmid.Mid)
  31. }
  32. func (s *Service) blockBinLogproc(ctx context.Context) {
  33. for msg := range s.blockBinLog.Messages() {
  34. if err := s.handleBlockBinLog(ctx, msg); err != nil {
  35. log.Error("Failed to handle block binlog: %s: %+v", BeautifyMessage(msg), err)
  36. continue
  37. }
  38. log.Info("Succeed to handle block binlog: %s", BeautifyMessage(msg))
  39. }
  40. }