vip.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "time"
  6. "go-common/app/job/main/card/model"
  7. cardapi "go-common/app/service/main/card/api/grpc/v1"
  8. cardmol "go-common/app/service/main/card/model"
  9. vipmol "go-common/app/service/main/vip/model"
  10. "go-common/library/log"
  11. )
  12. // ChangeEquipTime change vip equip time.
  13. func (s *Service) ChangeEquipTime(c context.Context, v *model.VipReq) (err error) {
  14. var res *cardapi.UserCardReply
  15. if res, err = s.cardRPC.UserCard(c, &cardapi.UserCardReq{Mid: v.Mid}); err != nil {
  16. return
  17. }
  18. if res.Res == nil ||
  19. res.Res.Id == 0 ||
  20. res.Res.CardType != cardmol.CardTypeVip {
  21. return
  22. }
  23. var expire int64
  24. switch {
  25. case v.VipType == vipmol.NotVip || v.VipStatus == vipmol.Expire:
  26. expire = time.Now().Unix()
  27. case v.VipOverdueTime != res.Res.ExpireTime:
  28. expire = v.VipOverdueTime
  29. default:
  30. }
  31. if expire == 0 {
  32. return
  33. }
  34. if err = s.dao.UpdateExpireTime(c, expire, v.Mid); err != nil {
  35. return
  36. }
  37. err = s.dao.DelCacheEquip(c, v.Mid)
  38. return
  39. }
  40. func (s *Service) vipchangeproc() {
  41. defer s.waiter.Done()
  42. msgs := s.vipConsumer.Messages()
  43. var err error
  44. for {
  45. msg, ok := <-msgs
  46. if !ok {
  47. log.Warn("[service.dataConsume|vip] dataConsumer has been closed.")
  48. return
  49. }
  50. if err = msg.Commit(); err != nil {
  51. log.Error("msg.Commit err(%+v)", err)
  52. }
  53. log.Info("cur consumer vipchangeproc(%v)", string(msg.Value))
  54. v := &model.MsgCanal{}
  55. if err = json.Unmarshal([]byte(msg.Value), v); err != nil {
  56. log.Error("json.Unmarshal(%v) err(%v)", v, err)
  57. continue
  58. }
  59. if v.Table != _tableUserInfo || v.Action != _updateAction {
  60. continue
  61. }
  62. n := new(model.VipUserInfoMsg)
  63. if err = json.Unmarshal(v.New, n); err != nil {
  64. log.Error("vipchangeproc json.Unmarshal val(%v) error(%v)", string(v.New), err)
  65. continue
  66. }
  67. o := new(model.VipUserInfoMsg)
  68. if err = json.Unmarshal(v.Old, o); err != nil {
  69. log.Error("vipchangeproc json.Unmarshal val(%v) error(%v)", string(v.Old), err)
  70. continue
  71. }
  72. if n.VipStatus == o.VipStatus &&
  73. n.VipType == o.VipType &&
  74. n.VipOverdueTime == o.VipOverdueTime {
  75. continue
  76. }
  77. var duetime time.Time
  78. if duetime, err = time.ParseInLocation("2006-01-02 15:04:05", n.VipOverdueTime, time.Local); err != nil {
  79. log.Error("vipchangeproc ParseInLocation val(%s) error(%v)", n.VipOverdueTime, err)
  80. continue
  81. }
  82. if err = s.ChangeEquipTime(context.Background(), &model.VipReq{
  83. Mid: n.Mid,
  84. VipType: n.VipType,
  85. VipStatus: n.VipStatus,
  86. VipOverdueTime: duetime.Unix(),
  87. }); err != nil {
  88. log.Error("ChangeEquipTime val(%+v) error(%v)", n, err)
  89. continue
  90. }
  91. }
  92. }