123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- package service
- import (
- "context"
- "encoding/json"
- "fmt"
- "time"
- "go-common/app/job/main/block/conf"
- "go-common/app/job/main/block/model"
- xsql "go-common/library/database/sql"
- "go-common/library/log"
- "go-common/library/queue/databus"
- "github.com/pkg/errors"
- )
- func (s *Service) creditExpireHandler(c context.Context) {
- if conf.Conf.Property.CreditExpireCheckLimit <= 0 {
- log.Error("conf.Conf.Property.creditExpireCheckLimit [%d] <= 0", conf.Conf.Property.CreditExpireCheckLimit)
- return
- }
- var (
- mids = make([]int64, conf.Conf.Property.CreditExpireCheckLimit)
- startID int64
- err error
- )
- for len(mids) >= conf.Conf.Property.CreditExpireCheckLimit {
- log.Info("black house expire handle startID (%d)", startID)
- if startID, mids, err = s.dao.UserStatusList(c, model.BlockStatusCredit, startID, conf.Conf.Property.CreditExpireCheckLimit); err != nil {
- log.Error("%+v", err)
- return
- }
- for _, mid := range mids {
- log.Info("Start handle black house mid (%d)", mid)
- var ok bool
- if ok, err = s.creditExpireCheck(c, mid); err != nil {
- log.Error("%+v", err)
- continue
- }
- if ok {
- log.Info("Start remove black house mid (%d)", mid)
- if err = s.creditExpireRemove(c, mid); err != nil {
- log.Error("%+v", err)
- }
- }
- }
- }
- }
- func (s *Service) creditExpireCheck(c context.Context, mid int64) (ok bool, err error) {
- var (
- his *model.DBHistory
- ex *model.DBExtra
- )
- if his, err = s.dao.UserLastHistory(c, mid); err != nil {
- return
- }
- if his == nil {
- return
- }
- log.Info("Credit check his (%+v)", his)
- if his.Action != model.BlockActionLimit || his.Source != model.BlockSourceCredit {
- return
- }
- if ex, err = s.dao.UserExtra(c, mid); err != nil {
- return
- }
- if ex == nil {
- return
- }
- log.Info("Credit check extra (%+v)", his)
- if ex.ActionTime.Before(his.StartTime) {
- return
- }
- if his.StartTime.Add(time.Duration(his.Duration) * time.Second).After(time.Now()) {
- return
- }
- ok = true
- return
- }
- func (s *Service) creditExpireRemove(c context.Context, mid int64) (err error) {
- var (
- db = &model.DBHistory{
- MID: mid,
- AdminID: -1,
- AdminName: "sys",
- Source: model.BlockSourceRemove,
- Area: model.BlockAreaNone,
- Reason: "小黑屋自动解封",
- Comment: "小黑屋自动解封",
- Action: model.BlockActionSelfRemove,
- StartTime: time.Now(),
- Duration: 0,
- Notify: false,
- }
- tx *xsql.Tx
- )
- if tx, err = s.dao.BeginTX(c); err != nil {
- return
- }
- if err = s.dao.TxInsertHistory(c, tx, db); err != nil {
- tx.Rollback()
- return
- }
- if err = s.dao.TxUpsertUser(c, tx, mid, model.BlockStatusFalse); err != nil {
- tx.Rollback()
- return
- }
- if err = tx.Commit(); err != nil {
- err = errors.WithStack(err)
- }
- s.mission(func() {
- if err := s.notifyRemoveMSG(context.TODO(), []int64{mid}); err != nil {
- log.Error("%+v", err)
- }
- })
- s.cache.Save(func() {
- if err := s.dao.DeleteUserCache(context.TODO(), mid); err != nil {
- log.Error("%+v", err)
- }
- if databusErr := s.accountNotify(context.TODO(), mid); databusErr != nil {
- log.Error("%+v", databusErr)
- }
- })
- return
- }
- func (s *Service) notifyRemoveMSG(c context.Context, mids []int64) (err error) {
- code, title, content := s.MSGRemoveInfo()
- if err = s.dao.SendSysMsg(c, code, mids, title, content, ""); err != nil {
- return
- }
- return
- }
- // databus
- func (s *Service) creditsubproc() {
- defer func() {
- if x := recover(); x != nil {
- log.Error("%+v", errors.WithStack(fmt.Errorf("s.creditsubproc panic(%v)", x)))
- go s.creditsubproc()
- log.Info("s.creditsubproc recover")
- }
- }()
- var (
- msg *databus.Message
- eventMSG *model.CreditAnswerMSG
- err error
- msgChan = s.creditSub.Messages()
- c = context.TODO()
- )
- for msg = range msgChan {
- if err = msg.Commit(); err != nil {
- log.Error("msg.Commit error(%v)", err)
- }
- eventMSG = &model.CreditAnswerMSG{}
- if err = json.Unmarshal([]byte(msg.Value), eventMSG); err != nil {
- log.Error("%+v", errors.WithStack(err))
- continue
- }
- if err = s.handleCreditAnswerMSG(c, eventMSG); err != nil {
- log.Error("%+v", err)
- continue
- }
- log.Info("s.handleCreditAnswerMSG(%v) msg", eventMSG)
- }
- log.Info("creditsubproc end")
- }
- func (s *Service) handleCreditAnswerMSG(c context.Context, msg *model.CreditAnswerMSG) (err error) {
- if msg.MID <= 0 {
- return
- }
- var (
- extra = &model.DBExtra{
- MID: msg.MID,
- CreditAnswerFlag: true,
- ActionTime: msg.MTime.Time(),
- }
- checkFlag bool
- )
- if err = s.dao.InsertExtra(c, extra); err != nil {
- return
- }
- // 及时检查解封
- log.Info("Start check black house mid (%d) from answer", extra.MID)
- if checkFlag, err = s.creditExpireCheck(c, extra.MID); err != nil {
- return
- }
- if checkFlag {
- log.Info("Start remove black house mid (%d)", extra.MID)
- if err = s.creditExpireRemove(c, extra.MID); err != nil {
- return
- }
- }
- return
- }
|