credit.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "time"
  6. "go-common/app/job/main/credit/model"
  7. "go-common/library/log"
  8. )
  9. const (
  10. _blockedJuryTable = "blocked_jury"
  11. _blockedInfoTable = "blocked_info"
  12. _blockedCaseTable = "blocked_case"
  13. _voteOpinion = "blocked_opinion"
  14. _blockedKpiTable = "blocked_kpi"
  15. _blockedPublishTable = "blocked_publish"
  16. _blockedVoteCaseTable = "blocked_case_vote"
  17. _blockedCaseApplyLogTable = "blocked_case_apply_log"
  18. _blockedLabourAnswerLog = "blocked_labour_answer_log"
  19. _retry = 3
  20. _retrySleep = time.Second * 1
  21. )
  22. func (s *Service) creditConsumer() {
  23. var err error
  24. for res := range s.credbSub.Messages() {
  25. mu := &model.Message{}
  26. if err = json.Unmarshal(res.Value, mu); err != nil {
  27. log.Error("credit-job,json.Unmarshal (%v) error(%v)", string(res.Value), err)
  28. continue
  29. }
  30. for i := 0; ; i++ {
  31. err = s.dealCredit(mu)
  32. if err != nil {
  33. log.Error("s.flush error(%v)", err)
  34. time.Sleep(_retrySleep)
  35. if i > _retry && s.c.Env == "prod" {
  36. s.dao.Sms(context.TODO(), s.c.Sms.Phone, s.c.Sms.Token, "credit-job update cache fail for 5 times")
  37. i = 0
  38. }
  39. continue
  40. }
  41. break
  42. }
  43. if err = res.Commit(); err != nil {
  44. log.Error("databus.Commit err(%v)", err)
  45. }
  46. log.Info("subproc key:%v,topic: %v, part:%v offset:%v,message %s,", res.Key, res.Topic, res.Partition, res.Offset, res.Value)
  47. }
  48. }
  49. // dealAction deal databus action
  50. func (s *Service) dealCredit(mu *model.Message) (err error) {
  51. switch mu.Table {
  52. case _blockedCaseTable:
  53. if mu.Action == "insert" {
  54. s.RegReply(context.TODO(), mu.Table, mu.New, mu.Old)
  55. }
  56. err = s.Judge(context.TODO(), mu.New, mu.Old)
  57. s.GrantCase(context.TODO(), mu.New, mu.Old)
  58. s.DelGrantCase(context.TODO(), mu.New, mu.Old)
  59. s.DelCaseInfoCache(context.TODO(), mu.New)
  60. case _blockedInfoTable:
  61. if mu.Action == "insert" {
  62. s.RegReply(context.TODO(), mu.Table, mu.New, mu.Old)
  63. s.InvalidJury(context.TODO(), mu.New, mu.Old)
  64. }
  65. if mu.Action == "update" {
  66. s.UnBlockAccount(context.TODO(), mu.New, mu.Old)
  67. }
  68. s.UpdateCache(context.TODO(), mu.New)
  69. case _blockedKpiTable:
  70. if mu.Action == "insert" {
  71. s.KPIReward(context.TODO(), mu.New, mu.Old)
  72. }
  73. case _voteOpinion:
  74. s.DeleteIdx(context.TODO(), mu.New)
  75. case _blockedPublishTable:
  76. s.RegReply(context.TODO(), mu.Table, mu.New, mu.Old)
  77. case _blockedVoteCaseTable:
  78. s.DelVoteCaseCache(context.TODO(), mu.New)
  79. case _blockedLabourAnswerLog:
  80. if mu.Action == "insert" {
  81. s.NotifyBlockAnswer(context.TODO(), mu.New)
  82. }
  83. case _blockedCaseApplyLogTable:
  84. if mu.Action == "insert" {
  85. s.DealCaseApplyReason(context.TODO(), mu.New)
  86. }
  87. case _blockedJuryTable:
  88. s.DelJuryInfoCache(context.TODO(), mu.New)
  89. }
  90. return
  91. }