award.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "time"
  6. "go-common/app/job/main/coin/dao"
  7. "go-common/app/job/main/coin/model"
  8. accmdl "go-common/app/service/main/account/api"
  9. coinmdl "go-common/app/service/main/coin/model"
  10. mmdl "go-common/app/service/main/member/api"
  11. "go-common/library/ecode"
  12. "go-common/library/log"
  13. "go-common/library/queue/databus"
  14. "go-common/library/stat/prom"
  15. )
// _passportLog is the Business value a login-log message must carry to be
// processed by awardDo; messages with any other Business value are skipped.
// NOTE(review): presumably the passport login business ID — confirm upstream.
var _passportLog = 53
  17. func (s *Service) awardDo(ms []interface{}) {
  18. for _, m := range ms {
  19. mu, ok := m.(*model.LoginLog)
  20. if !ok {
  21. continue
  22. }
  23. if mu.Business != _passportLog {
  24. continue
  25. }
  26. prom.BusinessInfoCount.Incr("award-event")
  27. err := s.award(context.TODO(), mu.Mid, mu.Timestamp, mu.IP)
  28. if err != nil {
  29. log.Error("s.award mid %v err %v", mu.Mid, err)
  30. }
  31. log.Info("conmsumer login log, mid:%v,time %d, ip: %s err: %v", mu.Mid, mu.Timestamp, mu.IP, err)
  32. }
  33. }
  34. func split(msg *databus.Message, data interface{}) int {
  35. t, ok := data.(*model.LoginLog)
  36. if !ok {
  37. return 0
  38. }
  39. return int(t.Mid)
  40. }
  41. func newMsg(msg *databus.Message) (res interface{}, err error) {
  42. loginlog := new(model.LoginLog)
  43. if err = json.Unmarshal(msg.Value, &loginlog); err != nil {
  44. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  45. dao.PromError("loginlog:Unmarshal")
  46. return
  47. }
  48. var t time.Time
  49. if t, err = time.ParseInLocation("2006-01-02 15:04:05", loginlog.CTime, time.Local); err != nil {
  50. log.Error("time.parse(%s) error(%v)", msg.Value, err)
  51. dao.PromError("loginlog:Timeparse")
  52. return
  53. }
  54. loginlog.Timestamp = t.Unix()
  55. loginlog.RawData = string(msg.Value)
  56. res = loginlog
  57. return
  58. }
  59. func newExpMsg(msg *databus.Message) (res interface{}, err error) {
  60. loginlog := new(model.LoginLog)
  61. explog := new(model.AddExp)
  62. if err = json.Unmarshal(msg.Value, &explog); err != nil {
  63. log.Error("newExpMsg json.Unmarshal(%s) error(%v)", msg.Value, err)
  64. dao.PromError("loginlog:ExpUnmarshal")
  65. return
  66. }
  67. loginlog.Mid = explog.Mid
  68. loginlog.Timestamp = explog.Ts
  69. loginlog.IP = explog.IP
  70. loginlog.RawData = string(msg.Value)
  71. res = loginlog
  72. return
  73. }
  74. func (s *Service) award(c context.Context, mid, ts int64, ip string) (err error) {
  75. if !s.c.CoinJob.Start || ts < s.c.CoinJob.StartTime {
  76. return
  77. }
  78. if (mid == 0) || (ts == 0) || (ip == "") {
  79. return
  80. }
  81. day := int64(time.Unix(ts, 0).Day())
  82. var login bool
  83. for {
  84. if login, err = s.coinDao.Logined(c, mid, day); err == nil {
  85. break
  86. }
  87. dao.PromError("redis-logined-retry")
  88. time.Sleep(time.Millisecond * 500)
  89. }
  90. if login {
  91. return
  92. }
  93. // false mean first login,
  94. var base *mmdl.BaseInfoReply
  95. base, err = s.memRPC.Base(c, &mmdl.MemberMidReq{Mid: mid})
  96. if err != nil {
  97. if err == ecode.MemberNotExist {
  98. return
  99. }
  100. log.Errorv(c, log.KV("log", "memRPC"), log.KV("err", err), log.KV("mid", mid))
  101. dao.PromError("登录奖励member")
  102. return
  103. }
  104. if (base != nil) && (base.Rank == 5000) {
  105. log.Infov(c, log.KV("log", "add coin but user not member"), log.KV("mid", mid))
  106. return
  107. }
  108. var profile *accmdl.ProfileReply
  109. profile, _ = s.profile(c, mid)
  110. if (profile != nil) && (profile.Profile != nil) && (profile.Profile.TelStatus == 0) {
  111. log.Infov(c, log.KV("log", "login award failed. no telphone"), log.KV("mid", mid))
  112. return
  113. }
  114. if _, err = s.coinRPC.ModifyCoin(c, &coinmdl.ArgModifyCoin{Mid: mid, Count: 1, Reason: "登录奖励", IP: ip, CheckZero: 1}); err != nil {
  115. dao.PromError("登录奖励RPC")
  116. return
  117. }
  118. for {
  119. if err = s.coinDao.SetLogin(c, mid, day); err == nil {
  120. break
  121. }
  122. dao.PromError("登录奖励setLogin-retry")
  123. time.Sleep(time.Millisecond * 500)
  124. }
  125. prom.BusinessInfoCount.Incr("award-event-success")
  126. log.Info("add coin success mid: %+v", mid)
  127. return
  128. }
  129. func (s *Service) profile(c context.Context, mid int64) (res *accmdl.ProfileReply, err error) {
  130. arg := &accmdl.MidReq{Mid: mid}
  131. if res, err = s.accRPC.Profile3(c, arg); err != nil {
  132. dao.PromError("award:Profile3")
  133. log.Errorv(c, log.KV("log", "Profile3"), log.KV("err", err))
  134. }
  135. return
  136. }