binlog_elec.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "runtime/debug"
  6. "strconv"
  7. "time"
  8. "go-common/app/job/main/ugcpay/model"
  9. // "go-common/app/service/main/ugcpay-rank/internal/service/rank"
  10. "go-common/library/log"
  11. "github.com/pkg/errors"
  12. )
  13. const (
  14. _tableOldElecOrder = "elec_pay_order"
  15. _tableOldElecMessage = "elec_message"
  16. _tableOldElecUserSetting = "elec_user_setting"
  17. )
  18. var (
  19. _ctx = context.Background()
  20. )
  21. func (s *Service) elecbinlogproc() {
  22. defer func() {
  23. if x := recover(); x != nil {
  24. log.Error("binlogproc panic(%+v) :\n %s", x, debug.Stack())
  25. go s.binlogproc()
  26. }
  27. }()
  28. log.Info("Start binlogproc")
  29. var (
  30. err error
  31. )
  32. for res := range s.elecBinlogMQ.Messages() {
  33. if err != nil {
  34. log.Error("binlogproc consume key:%v, topic: %v, part:%v, offset:%v, message %s, err: %+v", res.Key, res.Topic, res.Partition, res.Offset, res.Value, err)
  35. err = nil
  36. }
  37. if time.Since(time.Unix(res.Timestamp, 0)) >= time.Hour*24 {
  38. log.Error("binlogproc consume expired msg, key:%v, topic: %v, part:%v, offset:%v, message %s", res.Key, res.Topic, res.Partition, res.Offset, res.Value)
  39. continue
  40. }
  41. msg := &model.Message{}
  42. if err = json.Unmarshal(res.Value, msg); err != nil {
  43. err = errors.WithStack(err)
  44. continue
  45. }
  46. switch msg.Table {
  47. case _tableOldElecOrder:
  48. data := &model.DBOldElecPayOrder{}
  49. if err = json.Unmarshal(msg.New, data); err != nil {
  50. err = errors.Wrapf(err, "%s", msg.New)
  51. continue
  52. }
  53. if err = s.handleElecOrder(_ctx, data); err != nil {
  54. continue
  55. }
  56. case _tableOldElecMessage:
  57. data := &model.DBOldElecMessage{}
  58. if err = json.Unmarshal(msg.New, data); err != nil {
  59. err = errors.Wrapf(err, "%s", msg.New)
  60. continue
  61. }
  62. if err = s.handleOldElecMessage(_ctx, data); err != nil {
  63. continue
  64. }
  65. case _tableOldElecUserSetting:
  66. data := &model.DBOldElecUserSetting{}
  67. if err = json.Unmarshal(msg.New, data); err != nil {
  68. err = errors.Wrapf(err, "%s", msg.New)
  69. continue
  70. }
  71. if err = s.handleElecUserSetting(_ctx, data); err != nil {
  72. continue
  73. }
  74. default:
  75. log.Error("binlogproc unknown table: %s", msg.Table)
  76. }
  77. if err = res.Commit(); err != nil {
  78. err = errors.Wrapf(err, "binlogproc commit")
  79. continue
  80. }
  81. log.Info("binlogproc consume msg, key:%v, topic: %v, part:%v, offset:%v, message %s", res.Key, res.Topic, res.Partition, res.Offset, res.Value)
  82. }
  83. log.Info("End binlogproc")
  84. }
  85. func (s *Service) handleOldElecMessage(ctx context.Context, msg *model.DBOldElecMessage) (err error) {
  86. log.Info("handleOldElecMessage message: %+v", msg)
  87. var (
  88. ver int64
  89. verTime time.Time
  90. avID int64
  91. )
  92. if verTime, err = time.Parse("2006-01", msg.DateVer); err != nil {
  93. err = errors.WithStack(err)
  94. return
  95. }
  96. ver = monthlyBillVer(verTime)
  97. if msg.AVID != "" {
  98. if avID, err = strconv.ParseInt(msg.AVID, 10, 64); err != nil {
  99. log.Error("%+v", errors.WithStack(err))
  100. avID = 0
  101. err = nil
  102. }
  103. }
  104. switch msg.Type {
  105. // 用户对up主留言
  106. case 1:
  107. dbMSG := &model.DBElecMessage{
  108. ID: msg.ID,
  109. Ver: ver,
  110. AVID: avID,
  111. UPMID: msg.RefMID,
  112. PayMID: msg.MID,
  113. Message: msg.Message,
  114. Replied: msg.State == 1,
  115. Hidden: msg.State == 2,
  116. CTime: msg.ParseCTime(),
  117. MTime: msg.ParseMTime(),
  118. }
  119. if err = s.dao.UpsertElecMessage(ctx, dbMSG); err != nil {
  120. return
  121. }
  122. if err = s.dao.RankElecUpdateMessage(ctx, dbMSG.AVID, dbMSG.UPMID, dbMSG.PayMID, dbMSG.Ver, dbMSG.Message, dbMSG.Hidden); err != nil {
  123. return
  124. }
  125. // up主回复用户
  126. case 2:
  127. dbReply := &model.DBElecReply{
  128. ID: msg.ID,
  129. MSGID: msg.RefID,
  130. Reply: msg.Message,
  131. Hidden: msg.State == 2,
  132. CTime: msg.ParseCTime(),
  133. MTime: msg.ParseMTime(),
  134. }
  135. if err = s.dao.UpsertElecReply(ctx, dbReply); err != nil {
  136. return
  137. }
  138. default:
  139. log.Error("old_ele_message unknown type: %+v", msg)
  140. }
  141. return
  142. }
  143. func (s *Service) handleElecOrder(ctx context.Context, order *model.DBOldElecPayOrder) (err error) {
  144. log.Info("handleElecOrder order: %+v", order)
  145. if !order.IsPaid() {
  146. return
  147. }
  148. var ok bool
  149. ok, err = s.dao.AddCacheOrderID(ctx, order.OrderID)
  150. if err != nil {
  151. return
  152. }
  153. // 重复消费
  154. if !ok {
  155. log.Info("handleElecOrder order: %+v, has consumed before", order)
  156. err = nil
  157. return
  158. }
  159. if order.IsHiddnRank() {
  160. log.Info("handleElecOrder order: %+v which app_id == 19", order)
  161. return
  162. }
  163. tradeInfo, err := s.dao.RawOldElecTradeInfo(ctx, order.OrderID)
  164. if err != nil {
  165. return
  166. }
  167. avID := int64(0)
  168. if tradeInfo != nil {
  169. // log.Info("RawOldElecTradeInfo data not found, order: %+v", order)
  170. if avID, err = strconv.ParseInt(tradeInfo.AVID, 10, 64); err != nil {
  171. log.Error("handleElecOrder cant convert avID from: %s, err: %+v", tradeInfo.AVID, err)
  172. avID = 0
  173. }
  174. }
  175. var (
  176. ver = monthlyBillVer(order.ParseMTime())
  177. hidden = false
  178. )
  179. // 更新DB
  180. tx, err := s.dao.BeginTranRank(ctx)
  181. if err != nil {
  182. return
  183. }
  184. rollbackFN := func() {
  185. if theErr := s.dao.DelCacheOrderID(ctx, order.OrderID); theErr != nil {
  186. log.Error("%+v", theErr)
  187. }
  188. tx.Rollback()
  189. }
  190. if avID != 0 {
  191. if err = s.dao.TXUpsertElecAVRank(ctx, tx, 0, avID, order.UPMID, order.PayMID, order.ElecNum, hidden); err != nil {
  192. rollbackFN()
  193. return
  194. }
  195. if err = s.dao.TXUpsertElecAVRank(ctx, tx, ver, avID, order.UPMID, order.PayMID, order.ElecNum, hidden); err != nil {
  196. rollbackFN()
  197. return
  198. }
  199. }
  200. if err = s.dao.TXUpsertElecUPRank(ctx, tx, 0, order.UPMID, order.PayMID, order.ElecNum, hidden); err != nil {
  201. rollbackFN()
  202. return
  203. }
  204. if err = s.dao.TXUpsertElecUPRank(ctx, tx, ver, order.UPMID, order.PayMID, order.ElecNum, hidden); err != nil {
  205. rollbackFN()
  206. return
  207. }
  208. if err = tx.Commit(); err != nil {
  209. return
  210. }
  211. err = s.dao.RankElecUpdateOrder(ctx, avID, order.UPMID, order.PayMID, ver, order.ElecNum)
  212. return
  213. }
  214. func (s *Service) handleElecUserSetting(ctx context.Context, setting *model.DBOldElecUserSetting) (err error) {
  215. if setting.Status > 0 {
  216. log.Info("handleElecUserSetting add setting: %+v", setting)
  217. err = s.dao.ElecAddSetting(ctx, model.DefaultUserSetting, setting.MID, setting.BitValue())
  218. } else {
  219. log.Info("handleElecUserSetting delete setting: %+v", setting)
  220. err = s.dao.ElecDeleteSetting(ctx, model.DefaultUserSetting, setting.MID, setting.BitValue())
  221. }
  222. if err != nil {
  223. return
  224. }
  225. // 清理缓存
  226. if err = s.dao.DelCacheUserSetting(ctx, setting.MID); err != nil {
  227. log.Error("DelCacheUserSetting: %d, err: %+v", setting.MID, err)
  228. err = nil
  229. }
  230. return
  231. }