notify.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "go-common/app/job/main/coupon/model"
  9. "go-common/library/database/sql"
  10. "go-common/library/log"
  11. xtime "go-common/library/time"
  12. "github.com/pkg/errors"
  13. )
  14. // Notify notify.
  15. func (s *Service) Notify(c context.Context, msg *model.MsgCanal) (err error) {
  16. var (
  17. mid int64
  18. token string
  19. ok, ok1 bool
  20. ct int64
  21. couponToken, batchToken string
  22. )
  23. if strings.Contains(msg.Table, _couponTable) {
  24. if msg.Action != _updateAct {
  25. return
  26. }
  27. if mid, token, ct, ok, err = s.conventMsg(c, msg); err != nil {
  28. err = errors.WithStack(err)
  29. return
  30. }
  31. } else if msg.Table == _orderTable {
  32. if msg.Action != _insertAct {
  33. return
  34. }
  35. if mid, token, ct, ok, err = s.conventOrderMsg(c, msg); err != nil {
  36. err = errors.WithStack(err)
  37. return
  38. }
  39. } else if strings.Contains(msg.Table, _couponAllowanceTable) { // 元旦活动
  40. if msg.Action != _updateAct {
  41. return
  42. }
  43. if mid, couponToken, batchToken, ok1, err = s.conventAllowanceInfoMsg(c, msg); err != nil {
  44. err = errors.WithStack(err)
  45. return
  46. }
  47. if ok1 {
  48. if _, err = s.dao.UpdateUserCard(c, mid, model.Used, couponToken, batchToken); err != nil {
  49. err = errors.WithStack(err)
  50. return
  51. }
  52. if err = s.dao.DelPrizeCardsKey(c, mid, s.c.NewYearConf.ActID); err != nil {
  53. err = errors.WithStack(err)
  54. return
  55. }
  56. }
  57. }
  58. if !ok {
  59. return
  60. }
  61. arg := &model.NotifyParam{
  62. Mid: mid,
  63. CouponToken: token,
  64. NotifyURL: s.c.Properties.BangumiNotifyURL,
  65. Type: ct,
  66. }
  67. if err = s.CheckCouponDeliver(c, arg); err != nil {
  68. log.Error("CheckCouponDeliver fail arg(%v) err(%v)", arg, err)
  69. arg.NotifyCount++
  70. s.notifyChan <- arg
  71. return
  72. }
  73. return
  74. }
  75. func (s *Service) conventMsg(c context.Context, msg *model.MsgCanal) (mid int64, token string, ct int64, ok bool, err error) {
  76. ok = true
  77. cnew := new(model.CouponInfo)
  78. if err = json.Unmarshal(msg.New, cnew); err != nil {
  79. err = errors.WithStack(err)
  80. return
  81. }
  82. cold := new(model.CouponInfo)
  83. if err = json.Unmarshal(msg.Old, cold); err != nil {
  84. err = errors.WithStack(err)
  85. return
  86. }
  87. if cold.State != model.NotUsed || cnew.State != model.InUse {
  88. ok = false
  89. }
  90. mid = cnew.Mid
  91. token = cnew.CouponToken
  92. ct = cnew.CouponType
  93. return
  94. }
  95. func (s *Service) conventOrderMsg(c context.Context, msg *model.MsgCanal) (mid int64, token string, ct int64, ok bool, err error) {
  96. ok = true
  97. cnew := new(model.CouponOrder)
  98. if err = json.Unmarshal(msg.New, cnew); err != nil {
  99. err = errors.WithStack(err)
  100. return
  101. }
  102. if cnew.State != model.InPay {
  103. ok = false
  104. }
  105. mid = cnew.Mid
  106. token = cnew.OrderNo
  107. ct = int64(cnew.CouponType)
  108. return
  109. }
  110. func (s *Service) conventAllowanceInfoMsg(c context.Context, msg *model.MsgCanal) (mid int64, couponToken, batchToken string, ok bool, err error) {
  111. ok = true
  112. cnew := new(model.CouponAllowanceInfo)
  113. if err = json.Unmarshal(msg.New, cnew); err != nil {
  114. err = errors.WithStack(err)
  115. return
  116. }
  117. log.Info("conventAllowanceInfoMsg(%+v)", cnew)
  118. if cnew.State != model.Used || cnew.AppID != 1 || cnew.Origin != model.AllowanceBusinessNewYear {
  119. ok = false
  120. }
  121. mid = cnew.MID
  122. couponToken = cnew.CouponToken
  123. batchToken = cnew.BatchToken
  124. return
  125. }
  126. //CheckCouponDeliver check coupon deliver
  127. func (s *Service) CheckCouponDeliver(c context.Context, arg *model.NotifyParam) (err error) {
  128. switch arg.Type {
  129. case model.BangumiVideo:
  130. err = s.CouponDeliver(c, arg)
  131. case model.Cartoon:
  132. err = s.CouponCartoonDeliver(c, arg)
  133. }
  134. return
  135. }
  136. // CouponDeliver def.
  137. func (s *Service) CouponDeliver(c context.Context, arg *model.NotifyParam) (err error) {
  138. var (
  139. data *model.CallBackRet
  140. cp *model.CouponInfo
  141. nstate int8
  142. )
  143. if cp, err = s.dao.CouponInfo(c, arg.Mid, arg.CouponToken); err != nil {
  144. err = errors.WithStack(err)
  145. return
  146. }
  147. if cp == nil {
  148. log.Warn("notify coupon is nil(%v)", arg)
  149. return
  150. }
  151. if cp.State != model.InUse {
  152. log.Warn("notify coupon had deal with(%v)", arg)
  153. return
  154. }
  155. if data, err = s.dao.NotifyRet(c, arg.NotifyURL, cp.CouponToken, cp.OrderNO, "127.0.0.1"); err != nil {
  156. err = errors.WithStack(err)
  157. return
  158. }
  159. if data.Ver == cp.UseVer {
  160. err = fmt.Errorf("coupon ver not change resp(%v) db(%v)", data, cp)
  161. return
  162. }
  163. switch data.IsPaid {
  164. case model.PaidSuccess:
  165. nstate = model.Used
  166. case model.Unpaid:
  167. nstate = model.NotUsed
  168. default:
  169. log.Warn("state not found resp(%v) db(%v)", data, cp)
  170. return
  171. }
  172. log.Info("update coupon state(%s,%d,%d,%d,%d)", cp.CouponToken, cp.Mid, nstate, data.Ver, cp.Ver)
  173. if err = s.updateCouponState(c, cp, nstate, data); err != nil {
  174. log.Error("updateCouponState fail %+v", err)
  175. return
  176. }
  177. return
  178. }
  179. func (s *Service) updateCouponState(c context.Context, cp *model.CouponInfo, nstate int8, data *model.CallBackRet) (err error) {
  180. var (
  181. tx *sql.Tx
  182. aff int64
  183. )
  184. if tx, err = s.dao.BeginTran(c); err != nil {
  185. err = errors.WithStack(err)
  186. return
  187. }
  188. defer func() {
  189. if err != nil {
  190. if err1 := tx.Rollback(); err1 != nil {
  191. log.Error("tx.Rollback %+v", err)
  192. }
  193. return
  194. }
  195. if err = tx.Commit(); err != nil {
  196. log.Error("tx.Commit %+v", err)
  197. }
  198. }()
  199. if aff, err = s.dao.UpdateCoupon(c, tx, cp.Mid, nstate, data.Ver, cp.Ver, cp.CouponToken); err != nil {
  200. err = errors.WithStack(err)
  201. return
  202. }
  203. if aff != 1 {
  204. err = fmt.Errorf("coupon deal fail (%v) db(%v)", data, cp)
  205. return
  206. }
  207. l := &model.CouponChangeLog{}
  208. l.CouponToken = cp.CouponToken
  209. l.Mid = cp.Mid
  210. l.State = nstate
  211. l.Ctime = xtime.Time(time.Now().Unix())
  212. if _, err = s.dao.InsertPointHistory(c, tx, l); err != nil {
  213. err = errors.WithStack(err)
  214. return
  215. }
  216. s.dao.DelCouponsCache(c, cp.Mid, int8(cp.CouponType))
  217. return
  218. }
  219. // CouponCartoonDeliver coupon cartoon deliver def.
  220. func (s *Service) CouponCartoonDeliver(c context.Context, arg *model.NotifyParam) (err error) {
  221. var (
  222. data *model.CallBackRet
  223. o *model.CouponOrder
  224. nstate int8
  225. )
  226. if o, err = s.dao.ByOrderNo(c, arg.CouponToken); err != nil {
  227. err = errors.WithStack(err)
  228. return
  229. }
  230. if o == nil {
  231. log.Warn("notify coupon order is nil(%v)", arg)
  232. return
  233. }
  234. if o.State != model.InPay {
  235. log.Warn("notify coupon order had deal with(%v)", arg)
  236. return
  237. }
  238. if data, err = s.dao.NotifyRet(c, arg.NotifyURL, o.OrderNo, o.ThirdTradeNo, "127.0.0.1"); err != nil {
  239. err = errors.WithStack(err)
  240. return
  241. }
  242. if data.Ver == o.UseVer {
  243. err = fmt.Errorf("coupon order ver not change resp(%v) db(%v)", data, o)
  244. return
  245. }
  246. switch data.IsPaid {
  247. case model.PaidSuccess:
  248. nstate = model.PaySuccess
  249. case model.Unpaid:
  250. nstate = model.PayFaild
  251. default:
  252. log.Warn("order state not found resp(%v) db(%v)", data, o)
  253. return
  254. }
  255. log.Info("update coupon order state(%s,%d,%d,%d,%d)", o.OrderNo, o.Mid, nstate, data.Ver, o.UseVer)
  256. if err = s.UpdateOrderState(c, o, nstate, data); err != nil {
  257. log.Error("updateCouponState fail %+v", err)
  258. return
  259. }
  260. return
  261. }
  262. // UpdateOrderState update order state.
  263. func (s *Service) UpdateOrderState(c context.Context, o *model.CouponOrder, nstate int8, data *model.CallBackRet) (err error) {
  264. var (
  265. tx *sql.Tx
  266. aff int64
  267. ls []*model.CouponBalanceChangeLog
  268. )
  269. if tx, err = s.dao.BeginTran(c); err != nil {
  270. err = errors.WithStack(err)
  271. return
  272. }
  273. defer func() {
  274. if err != nil {
  275. if err1 := tx.Rollback(); err1 != nil {
  276. log.Error("tx.Rollback %+v", err)
  277. }
  278. return
  279. }
  280. if err = tx.Commit(); err != nil {
  281. log.Error("tx.Commit %+v", err)
  282. }
  283. }()
  284. if aff, err = s.dao.UpdateOrderState(c, tx, o.Mid, nstate, data.Ver, o.Ver, o.OrderNo); err != nil {
  285. err = errors.WithStack(err)
  286. return
  287. }
  288. if aff != 1 {
  289. err = fmt.Errorf("coupon order deal fail (%v) db(%v)", data, o)
  290. return
  291. }
  292. // add order log.
  293. ol := new(model.CouponOrderLog)
  294. ol.OrderNo = o.OrderNo
  295. ol.Mid = o.Mid
  296. ol.State = nstate
  297. ol.Ctime = xtime.Time(time.Now().Unix())
  298. if _, err = s.dao.AddOrderLog(c, tx, ol); err != nil {
  299. err = errors.WithStack(err)
  300. return
  301. }
  302. if nstate == model.PayFaild {
  303. // coupon back to user
  304. if ls, err = s.dao.ConsumeCouponLog(c, o.Mid, o.OrderNo, model.Consume); err != nil {
  305. err = errors.WithStack(err)
  306. return
  307. }
  308. if len(ls) == 0 {
  309. err = fmt.Errorf("ConsumeCouponLog not found (mid:%d,orderNo:%s)", o.Mid, o.OrderNo)
  310. return
  311. }
  312. if err = s.UpdateBalance(c, tx, o.Mid, o.CouponType, ls, o.OrderNo); err != nil {
  313. err = errors.WithStack(err)
  314. return
  315. }
  316. }
  317. return
  318. }
  319. // UpdateBalance update user balance.
  320. func (s *Service) UpdateBalance(c context.Context, tx *sql.Tx, mid int64, ct int8, ls []*model.CouponBalanceChangeLog, orderNo string) (err error) {
  321. var (
  322. now = time.Now()
  323. bs []*model.CouponBalanceInfo
  324. aff int64
  325. usebs []*model.CouponBalanceInfo
  326. blogs []*model.CouponBalanceChangeLog
  327. )
  328. if bs, err = s.dao.BlanceList(c, mid, ct); err != nil {
  329. err = errors.WithStack(err)
  330. return
  331. }
  332. if len(bs) == 0 {
  333. err = fmt.Errorf("coupon balance not found (mid:%d ct:%d)", mid, ct)
  334. return
  335. }
  336. for _, ob := range bs {
  337. for _, l := range ls {
  338. if ob.BatchToken == l.BatchToken {
  339. b := new(model.CouponBalanceInfo)
  340. b.ID = ob.ID
  341. b.Ver = ob.Ver
  342. b.Balance = ob.Balance - l.ChangeBalance
  343. usebs = append(usebs, b)
  344. blog := new(model.CouponBalanceChangeLog)
  345. blog.OrderNo = orderNo
  346. blog.Mid = mid
  347. blog.BatchToken = ob.BatchToken
  348. blog.ChangeType = model.ConsumeFaildBack
  349. blog.Ctime = xtime.Time(now.Unix())
  350. blog.Balance = b.Balance
  351. blog.ChangeBalance = -l.ChangeBalance
  352. blogs = append(blogs, blog)
  353. }
  354. }
  355. }
  356. if len(ls) != len(usebs) {
  357. err = fmt.Errorf("coupon balance not found (mid:%d len(ls):%d) len(usebs):%d", mid, len(ls), len(usebs))
  358. return
  359. }
  360. if len(usebs) == 1 {
  361. b := usebs[0]
  362. if aff, err = s.dao.UpdateBlance(c, tx, b.ID, mid, b.Ver, b.Balance); err != nil {
  363. err = errors.WithStack(err)
  364. return
  365. }
  366. } else {
  367. if aff, err = s.dao.BatchUpdateBlance(c, tx, mid, usebs); err != nil {
  368. err = errors.WithStack(err)
  369. return
  370. }
  371. }
  372. if int(aff) != len(usebs) {
  373. err = fmt.Errorf("coupon balance back faild mid(%d) order(%s)", mid, orderNo)
  374. return
  375. }
  376. if _, err = s.dao.BatchInsertBlanceLog(c, tx, mid, blogs); err != nil {
  377. err = errors.WithStack(err)
  378. return
  379. }
  380. s.dao.DelCouponBalancesCache(c, mid, ct)
  381. return
  382. }
  383. // CheckInUseCoupon check inuse coupon.
  384. func (s *Service) CheckInUseCoupon() {
  385. var (
  386. c = context.TODO()
  387. cps []*model.CouponInfo
  388. t = time.Now().AddDate(0, 0, -1)
  389. err error
  390. )
  391. log.Info("check inuse coupon job start")
  392. for i := 0; i < 100; i++ {
  393. if cps, err = s.dao.CouponList(c, int64(i), model.InUse, t); err != nil {
  394. log.Error("query coupon list(%d,%v) err(%v)", i, t, err)
  395. return
  396. }
  397. log.Info("check inuse coupon job ing size(%d)", len(cps))
  398. for _, v := range cps {
  399. var notifyURL string
  400. if v.CouponType == model.BangumiVideo {
  401. notifyURL = s.c.Properties.BangumiNotifyURL
  402. }
  403. if len(notifyURL) == 0 {
  404. continue
  405. }
  406. // point callback.
  407. arg := &model.NotifyParam{
  408. Mid: v.Mid,
  409. CouponToken: v.CouponToken,
  410. NotifyURL: notifyURL,
  411. Type: v.CouponType,
  412. }
  413. if err = s.CheckCouponDeliver(c, arg); err != nil {
  414. log.Error("CheckCouponDeliver fail arg(%v) err(%v)", arg, err)
  415. continue
  416. }
  417. }
  418. time.Sleep(time.Second * 1)
  419. }
  420. log.Info("check inuse coupon job start")
  421. }
  422. // CheckOrderInPayCoupon check order inuse coupon.
  423. func (s *Service) CheckOrderInPayCoupon() {
  424. var (
  425. c = context.TODO()
  426. cps []*model.CouponOrder
  427. t = time.Now().AddDate(0, 0, -1)
  428. err error
  429. )
  430. log.Info("check inuse coupon order job start")
  431. if cps, err = s.dao.OrderInPay(c, model.InPay, t); err != nil {
  432. log.Error("query coupon order list(%d,%v) err(%v)", model.InPay, t, err)
  433. return
  434. }
  435. log.Info("check inuse coupon order job ing size(%d)", len(cps))
  436. for _, v := range cps {
  437. var notifyURL string
  438. if v.CouponType == model.Cartoon {
  439. notifyURL = s.c.Properties.BangumiNotifyURL
  440. }
  441. if len(notifyURL) == 0 {
  442. continue
  443. }
  444. // point callback.
  445. arg := &model.NotifyParam{
  446. Mid: v.Mid,
  447. CouponToken: v.OrderNo,
  448. NotifyURL: notifyURL,
  449. Type: int64(v.CouponType),
  450. }
  451. if err = s.CheckCouponDeliver(c, arg); err != nil {
  452. log.Error("CheckCouponDeliver order fail arg(%v) err(%v)", arg, err)
  453. continue
  454. }
  455. }
  456. log.Info("check inuse coupon order job start")
  457. }
  458. // ByOrderNo by order no.
  459. func (s *Service) ByOrderNo(c context.Context, orderNo string) (o *model.CouponOrder, err error) {
  460. if o, err = s.dao.ByOrderNo(c, orderNo); err != nil {
  461. err = errors.WithStack(err)
  462. }
  463. return
  464. }