service.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "strings"
  6. "time"
  7. "go-common/app/job/main/account-recovery/conf"
  8. "go-common/app/job/main/account-recovery/dao"
  9. "go-common/app/job/main/account-recovery/model"
  10. "go-common/library/log"
  11. "go-common/library/queue/databus"
  12. )
  13. const (
  14. _recoveryLog = "account_recovery_info"
  15. //_retry = 10
  16. _retrySleep = time.Second * 1
  17. )
  18. // Service struct
  19. type Service struct {
  20. c *conf.Config
  21. dao *dao.Dao
  22. compareDatabus *databus.Databus
  23. sendMailDatabus *databus.Databus
  24. }
  25. // New init
  26. func New(c *conf.Config) (s *Service) {
  27. s = &Service{
  28. c: c,
  29. dao: dao.New(c),
  30. compareDatabus: databus.New(c.DataBus.CompareDatabus),
  31. sendMailDatabus: databus.New(c.DataBus.SendMailDatabus),
  32. }
  33. go s.compareConsumeproc()
  34. go s.sendMailConsumeproc()
  35. return
  36. }
  37. func (s *Service) compareConsumeproc() {
  38. var (
  39. msg *databus.Message
  40. err error
  41. ok bool
  42. )
  43. for {
  44. if msg, ok = <-s.compareDatabus.Messages(); !ok {
  45. log.Error("s.compareDatabus.Message err(%v)", err)
  46. return
  47. }
  48. log.Info("receive msg (%v)", msg.Key)
  49. mu := &model.Message{}
  50. if err = json.Unmarshal(msg.Value, &mu); err != nil {
  51. log.Error("s.compareDatabus.Message err(%v)", err)
  52. continue
  53. }
  54. for {
  55. switch {
  56. case strings.HasPrefix(mu.Table, _recoveryLog):
  57. if mu.Action == "insert" {
  58. log.Info("begin compare (%v)", msg.Key)
  59. err = s.compare(context.TODO(), mu.New)
  60. log.Info("end compare (%v)", msg.Key)
  61. }
  62. }
  63. log.Info("compare switch case (%v), (%v), (%v), (%v), (%v)", strings.HasPrefix(mu.Table, _recoveryLog), mu.Action == "insert", mu.Table, msg.Key, mu.Action)
  64. if err != nil {
  65. log.Error("s.flush error(%v)", err)
  66. time.Sleep(_retrySleep)
  67. continue
  68. }
  69. break
  70. }
  71. log.Info("subproc key:%v,topic: %v, part:%v offset:%v,message %s,", msg.Key, msg.Topic, msg.Partition, msg.Offset, msg.Value)
  72. err = msg.Commit()
  73. if err != nil {
  74. log.Error("commit err: %v", err)
  75. }
  76. }
  77. }
  78. func (s *Service) sendMailConsumeproc() {
  79. var (
  80. msg *databus.Message
  81. err error
  82. ok bool
  83. )
  84. for {
  85. if msg, ok = <-s.sendMailDatabus.Messages(); !ok {
  86. log.Error("s.sendMailDatabus.Message err(%v)", err)
  87. return
  88. }
  89. log.Info("receive msg (%v)", msg.Key)
  90. mu := &model.Message{}
  91. if err = json.Unmarshal(msg.Value, &mu); err != nil {
  92. log.Error("s.sendMailDatabus.Message err(%v)", err)
  93. continue
  94. }
  95. for {
  96. switch {
  97. case strings.HasPrefix(mu.Table, _recoveryLog):
  98. if mu.Action == "update" {
  99. log.Info("begin sendMail (%v)", msg.Key)
  100. err = s.sendMail(context.TODO(), mu.New, mu.Old)
  101. }
  102. }
  103. log.Info("sendMail switch case (%v), (%v), (%v), (%v), (%v)", strings.HasPrefix(mu.Table, _recoveryLog), mu.Action == "update", mu.Table, msg.Key, mu.Action)
  104. if err != nil {
  105. log.Error("s.flush error(%v)", err)
  106. time.Sleep(_retrySleep)
  107. continue
  108. }
  109. break
  110. }
  111. log.Info("subproc key:%v,topic: %v, part:%v offset:%v,message %s,", msg.Key, msg.Topic, msg.Partition, msg.Offset, msg.Value)
  112. err = msg.Commit()
  113. if err != nil {
  114. log.Error("commit err: %v", err)
  115. }
  116. }
  117. }
  118. // compare compare
  119. func (s *Service) compare(c context.Context, msg []byte) (err error) {
  120. r := &model.RecoveryInfo{}
  121. if err = json.Unmarshal(msg, r); err != nil {
  122. log.Error("s.compare err(%v)", err)
  123. return
  124. }
  125. err = s.dao.CompareInfo(c, r.Rid)
  126. return
  127. }
  128. // sendMail send mail
  129. func (s *Service) sendMail(c context.Context, new []byte, old []byte) (err error) {
  130. oldInfo := &model.RecoveryInfo{}
  131. newInfo := &model.RecoveryInfo{}
  132. if err = json.Unmarshal(old, oldInfo); err != nil {
  133. log.Error("failed to oldInfo unmarshal err(%v)", err)
  134. return
  135. }
  136. if err = json.Unmarshal(new, newInfo); err != nil {
  137. log.Error("failed to newInfo unmarshal err(%v)", err)
  138. return
  139. }
  140. if oldInfo.Status == 0 && (newInfo.Status == 1 || newInfo.Status == 2) {
  141. err = s.dao.SendMail(c, newInfo.Rid, newInfo.Status)
  142. }
  143. return
  144. }
  145. // Close Service
  146. func (s *Service) Close() {
  147. s.compareDatabus.Close()
  148. s.sendMailDatabus.Close()
  149. }