exp.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "time"
  6. "go-common/app/job/main/member/model"
  7. "go-common/library/log"
  8. )
  9. const (
  10. expMulti = 100
  11. level1 = 1
  12. level2 = 200
  13. level3 = 1500
  14. level4 = 4500
  15. level5 = 10800
  16. level6 = 28800
  17. )
  18. func (s *Service) initExp(c context.Context, mid int64) (err error) {
  19. var opers []*model.ExpOper
  20. if opers, err = s.CheckExpInit(c, mid); err != nil {
  21. log.Error("s.CheckExpInit(%d) error(%v)", mid, err)
  22. return
  23. }
  24. if len(opers) == 0 {
  25. log.Info("s.CheckExpInit(%d) opers eq(0) continue", mid)
  26. return
  27. }
  28. var exp *model.NewExp
  29. if exp, err = s.dao.SelExp(c, mid); err != nil {
  30. log.Error("s.dao.SelExp(%d) error(%v)", mid, err)
  31. return
  32. }
  33. if exp.Mid == 0 {
  34. if err = s.dao.InitExp(c, mid); err != nil {
  35. log.Error("s.dao.InitExp(%d) init user exp completed error(%v)", mid, err)
  36. return
  37. }
  38. }
  39. var (
  40. rows int64
  41. now = time.Now().Unix()
  42. )
  43. for _, oper := range opers {
  44. if rows, err = s.dao.UpdateExpAped(c, mid, oper.Count*100, oper.Flag); err != nil {
  45. log.Error("s.dao.UpdateExpAped(%d) error(%v)", mid, err)
  46. return
  47. }
  48. if rows == 0 {
  49. log.Info("s.dao.UpdateExpAped(%d) exp(%d) flag(%d) rows affected eq(0) continue", mid, oper.Count*100, oper.Flag)
  50. continue
  51. }
  52. if err = s.dao.DatabusAddLog(c, mid, exp.Exp/100, (exp.Exp+oper.Count*100)/100, now, oper.Oper, oper.Reason, ""); err != nil {
  53. log.Error("s.dao.DatabusAddLog(%d) fromExp(%d) toExp(%d) ts(%d) oper(%s) reason(%s) error(%v)", mid, exp.Exp/100, (exp.Exp+oper.Count*100)/100, now, oper.Oper, oper.Reason, err)
  54. err = nil
  55. continue
  56. } else {
  57. log.Info("s.dao.DatabusAddLog(%d) fromExp(%d) toExp(%d) ts(%d) oper(%s) reason(%s) msg published", mid, exp.Exp/100, (exp.Exp+oper.Count*100)/100, now, oper.Oper, oper.Reason)
  58. exp.Exp = exp.Exp + oper.Count*100
  59. now++
  60. }
  61. }
  62. return
  63. }
  64. func (s *Service) delayUpdateExp() {
  65. s.limiter.UpdateExp.Wait(context.Background())
  66. }
  67. func (s *Service) addExp(c context.Context, e *model.AddExp) (err error) {
  68. if e.Mid <= 0 {
  69. return
  70. }
  71. now := time.Unix(e.Ts, 0)
  72. exp, eo, added, ok, err := s.checkExpAdd(c, e.Mid, e.Event, now)
  73. if err != nil || added || !ok {
  74. log.Info("s.checkExpAdd(%d) event(%s) result added(%v) ok(%v) err(%v)", e.Mid, e.Event, added, ok, err)
  75. return
  76. }
  77. // 写数据库限速,防止写入过大导致主从延迟
  78. s.delayUpdateExp()
  79. var rows int64
  80. if rows, err = s.dao.UpdateExpAped(c, e.Mid, eo.Count*100, eo.Flag); err != nil {
  81. log.Error("s.dao.UpdateExpAped(%d) exp(%d) flag(%d) error(%v) ", e.Mid, eo.Count*100, eo.Flag, err)
  82. return
  83. }
  84. if rows == 0 {
  85. log.Info("s.dao.UpdateExpAped(%d) exp(%d) flag(%d) rows affected eq(0) continue!", e.Mid, eo.Count*100, eo.Flag)
  86. return
  87. }
  88. if _, err = s.dao.SetExpAdded(context.Background(), e.Mid, now.Day(), eo.Oper); err != nil {
  89. log.Error("s.dao.SetExpAdded(%d) oper(%s)", e.Mid, eo.Oper)
  90. err = nil
  91. }
  92. if err = s.dao.DatabusAddLog(context.Background(), e.Mid, (exp.Exp)/100, (exp.Exp+eo.Count*100)/100, e.Ts, eo.Oper, eo.Reason, e.IP); err != nil {
  93. log.Error("s.dao.DatabusAddLog(%d) oper(%s) reason(%s) error(%v)", e.Mid, eo.Oper, eo.Reason, err)
  94. err = nil
  95. } else {
  96. log.Info("s.dao.DatabusAddLog(%d) oper(%s) reason(%s) msg published!", e.Mid, eo.Oper, eo.Reason)
  97. }
  98. return
  99. }
  100. func (s *Service) awardDo(ms []interface{}) {
  101. for _, m := range ms {
  102. l, ok := m.(*model.LoginLogIPString)
  103. if !ok {
  104. continue
  105. }
  106. s.addExp(context.TODO(), &model.AddExp{
  107. Mid: l.Mid,
  108. IP: l.Loginip,
  109. Ts: l.Timestamp,
  110. Event: "login",
  111. })
  112. s.recoverMoral(context.TODO(), l.Mid)
  113. log.Info("consumer mid:%d,ts: %d", l.Mid, l.Timestamp)
  114. }
  115. }
  116. func isExpAndLevelChange(mu *model.Message) (bool, bool) {
  117. if mu.Action == "insert" {
  118. return true, true
  119. }
  120. if len(mu.Old) <= 0 || len(mu.New) <= 0 {
  121. return false, false
  122. }
  123. old := &model.ExpMessage{}
  124. new := &model.ExpMessage{}
  125. if err := json.Unmarshal(mu.New, new); err != nil {
  126. return false, false
  127. }
  128. if err := json.Unmarshal(mu.Old, old); err != nil {
  129. return false, false
  130. }
  131. expChange := false
  132. levelChange := false
  133. if old.Exp != new.Exp {
  134. expChange = true
  135. }
  136. if level(old.Exp) != level(new.Exp) {
  137. levelChange = true
  138. }
  139. return expChange, levelChange
  140. }
  141. func level(exp int64) int8 {
  142. exp = exp / expMulti
  143. switch {
  144. case exp < level1:
  145. return 0
  146. case exp < level2:
  147. return 1
  148. case exp < level3:
  149. return 2
  150. case exp < level4:
  151. return 3
  152. case exp < level5:
  153. return 4
  154. case exp < level6:
  155. return 5
  156. default:
  157. return 6
  158. }
  159. }