vip.go 14 KB


  1. package service
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "encoding/hex"
  6. "fmt"
  7. "time"
  8. "go-common/app/job/main/vip/model"
  9. "go-common/library/database/sql"
  10. "go-common/library/log"
  11. "go-common/library/sync/errgroup"
  12. xtime "go-common/library/time"
  13. "github.com/pkg/errors"
  14. )
  15. const (
  16. iapChannelID = 100
  17. )
  18. func (s *Service) cleanCacheAndNotify(c context.Context, hv *model.HandlerVip) (err error) {
  19. s.dao.DelInfoCache(c, hv.Mid)
  20. if err = s.dao.SendCleanCache(c, hv); err != nil {
  21. return
  22. }
  23. if err = s.dao.DelVipInfoCache(c, int64(hv.Mid)); err != nil {
  24. log.Error("del vip info cache (mid:%v) error(%+v)", hv.Mid, err)
  25. return
  26. }
  27. eg, ec := errgroup.WithContext(c)
  28. for _, app := range s.appMap {
  29. ta := app
  30. eg.Go(func() error {
  31. if err = s.dao.SendAppCleanCache(ec, hv, ta); err == nil {
  32. log.Info("SendAppCleanCache success hv(%v) app(%v)", hv, ta)
  33. } else {
  34. ac := new(model.AppCache)
  35. ac.AppID = ta.ID
  36. ac.Mid = hv.Mid
  37. s.cleanAppCache <- ac
  38. }
  39. return nil
  40. })
  41. }
  42. if err = eg.Wait(); err != nil {
  43. log.Error(" eg.Wait err(%+v)", err)
  44. }
  45. err = nil
  46. return
  47. }
  48. //ScanUserInfo scan all userinfo update status
  49. func (s *Service) ScanUserInfo(c context.Context) (err error) {
  50. var (
  51. ot = time.Now().Format("2006-01-02 15:04:05")
  52. userInfos []*model.VipUserInfo
  53. size = 2000
  54. endID = 0
  55. )
  56. for {
  57. if endID, err = s.dao.SelOldUserInfoMaxID(context.TODO()); err != nil {
  58. time.Sleep(time.Minute * 2)
  59. continue
  60. }
  61. break
  62. }
  63. page := endID / size
  64. if endID%size != 0 {
  65. page++
  66. }
  67. for i := 0; i < page; {
  68. startID := i * size
  69. eID := (i + 1) * size
  70. if userInfos, err = s.dao.SelVipList(context.TODO(), startID, eID, ot); err != nil {
  71. time.Sleep(time.Second * 5)
  72. continue
  73. }
  74. i++
  75. for _, v := range userInfos {
  76. s.updateUserInfo(context.TODO(), v)
  77. }
  78. }
  79. return
  80. }
  81. func (s *Service) updateUserInfo(c context.Context, v *model.VipUserInfo) (err error) {
  82. var (
  83. curTime = time.Now()
  84. fType = v.Type
  85. fStatus = v.Status
  86. )
  87. if v.AnnualVipOverdueTime.Time().Before(curTime) {
  88. fType = model.Vip
  89. }
  90. if v.OverdueTime.Time().Before(curTime) {
  91. fStatus = model.VipStatusOverTime
  92. }
  93. if fType != v.Type || fStatus != v.Status {
  94. v.Type = fType
  95. v.Status = fStatus
  96. if v.Status == model.VipStatusOverTime && v.PayChannelID == iapChannelID {
  97. v.PayType = model.Normal
  98. }
  99. if _, err = s.dao.UpdateVipUser(c, int64(v.Mid), v.Status, v.Type, v.PayType); err != nil {
  100. return
  101. }
  102. s.dao.DelInfoCache(c, v.Mid)
  103. s.dao.DelVipInfoCache(c, int64(v.Mid))
  104. }
  105. return
  106. }
  107. func (s *Service) handlerautorenewlogproc() {
  108. var (
  109. err error
  110. )
  111. defer func() {
  112. if x := recover(); x != nil {
  113. log.Error("service.handlerautorenewlogproc panic(%v)", x)
  114. go s.handlerautorenewlogproc()
  115. log.Info("service.handlerautorenewlogproc recover")
  116. }
  117. }()
  118. for {
  119. user := <-s.handlerAutoRenewLog
  120. for i := 0; i <= s.c.Property.Retry; i++ {
  121. if err = s.handlerAutoRenewLogInfo(context.TODO(), user); err == nil {
  122. break
  123. }
  124. log.Error("%+v", err)
  125. time.Sleep(2 * time.Second)
  126. }
  127. }
  128. }
  129. func (s *Service) handlerAutoRenewLogInfo(c context.Context, user *model.VipUserInfo) (err error) {
  130. var (
  131. payOrder *model.VipPayOrder
  132. paylog *model.VipPayOrderLog
  133. rlog *model.VipPayOrderLog
  134. )
  135. if user.PayType == model.AutoRenew {
  136. if user.PayChannelID == iapChannelID {
  137. if payOrder, err = s.dao.SelPayOrderByMid(c, user.Mid, model.IAPAutoRenew, model.SUCCESS); err != nil {
  138. err = errors.WithStack(err)
  139. return
  140. }
  141. if payOrder == nil {
  142. err = errors.Errorf("订单号不能为空......")
  143. return
  144. }
  145. rlog = new(model.VipPayOrderLog)
  146. rlog.Mid = payOrder.Mid
  147. rlog.OrderNo = payOrder.OrderNo
  148. rlog.Status = model.SIGN
  149. } else {
  150. if payOrder, err = s.dao.SelPayOrderByMid(c, user.Mid, model.AutoRenew, model.SUCCESS); err != nil {
  151. err = errors.WithStack(err)
  152. return
  153. }
  154. if payOrder == nil {
  155. err = errors.Errorf("订单号不能为空......")
  156. return
  157. }
  158. rlog = new(model.VipPayOrderLog)
  159. rlog.Mid = payOrder.Mid
  160. rlog.OrderNo = payOrder.OrderNo
  161. rlog.Status = model.SIGN
  162. }
  163. } else {
  164. if paylog, err = s.dao.SelPayOrderLog(c, user.Mid, model.SIGN); err != nil {
  165. err = errors.WithStack(err)
  166. return
  167. }
  168. rlog = new(model.VipPayOrderLog)
  169. rlog.Mid = paylog.Mid
  170. rlog.Status = model.UNSIGN
  171. rlog.OrderNo = paylog.OrderNo
  172. }
  173. if rlog != nil {
  174. if _, err = s.dao.AddPayOrderLog(c, rlog); err != nil {
  175. err = errors.WithStack(err)
  176. return
  177. }
  178. }
  179. return
  180. }
  181. func (s *Service) handlerinsertuserinfoproc() {
  182. var (
  183. err error
  184. )
  185. defer func() {
  186. if x := recover(); x != nil {
  187. log.Error("service.handlerinsertuserinfoproc panic(%v)", x)
  188. go s.handlerinsertuserinfoproc()
  189. log.Info("service.handlerinsertuserinfoproc recover")
  190. }
  191. }()
  192. for {
  193. userInfo := <-s.handlerInsertUserInfo
  194. for i := 0; i < s.c.Property.Retry; i++ {
  195. if err = s.addUserInfo(context.TODO(), userInfo); err == nil {
  196. s.dao.DelInfoCache(context.Background(), userInfo.Mid)
  197. s.dao.DelVipInfoCache(context.TODO(), userInfo.Mid)
  198. if s.grayScope(userInfo.Mid) {
  199. s.cleanCache(userInfo.Mid)
  200. }
  201. break
  202. }
  203. log.Error("add info error(%+v)", err)
  204. }
  205. }
  206. }
  207. func (s *Service) addUserInfo(c context.Context, ui *model.VipUserInfo) (err error) {
  208. var (
  209. tx *sql.Tx
  210. udh *model.VipUserDiscountHistory
  211. )
  212. if tx, err = s.dao.StartTx(c); err != nil {
  213. return
  214. }
  215. defer func() {
  216. if err == nil {
  217. if err = tx.Commit(); err != nil {
  218. log.Error("commit(%+v)", err)
  219. return
  220. }
  221. } else {
  222. tx.Rollback()
  223. }
  224. }()
  225. if _, err = s.dao.AddUserInfo(tx, ui); err != nil {
  226. err = errors.WithStack(err)
  227. return
  228. }
  229. if ui.AutoRenewed == 1 {
  230. udh = new(model.VipUserDiscountHistory)
  231. udh.DiscountID = model.VipUserFirstDiscount
  232. udh.Status = model.DiscountUsed
  233. udh.Mid = ui.Mid
  234. if _, err = s.dao.DupUserDiscountHistory(tx, udh); err != nil {
  235. err = errors.WithStack(err)
  236. return
  237. }
  238. }
  239. return
  240. }
  241. func (s *Service) updateVipUserInfo(c context.Context, ui *model.VipUserInfo) (err error) {
  242. var (
  243. tx *sql.Tx
  244. udh *model.VipUserDiscountHistory
  245. eff int64
  246. )
  247. if tx, err = s.dao.StartTx(c); err != nil {
  248. return
  249. }
  250. defer func() {
  251. if err == nil {
  252. if err = tx.Commit(); err != nil {
  253. log.Error("commit(%+v)", err)
  254. return
  255. }
  256. } else {
  257. tx.Rollback()
  258. }
  259. }()
  260. if eff, err = s.dao.UpdateUserInfo(tx, ui); err != nil {
  261. err = errors.WithStack(err)
  262. return
  263. }
  264. if eff <= 0 {
  265. log.Warn("update vip RowsAffected 0 vip(%+v)", ui)
  266. return
  267. }
  268. if ui.AutoRenewed == 1 {
  269. udh = new(model.VipUserDiscountHistory)
  270. udh.DiscountID = model.VipUserFirstDiscount
  271. udh.Status = model.DiscountUsed
  272. udh.Mid = ui.Mid
  273. if _, err = s.dao.DupUserDiscountHistory(tx, udh); err != nil {
  274. err = errors.WithStack(err)
  275. return
  276. }
  277. }
  278. return
  279. }
  280. func (s *Service) handlerfailuserinfoproc() {
  281. var (
  282. err error
  283. )
  284. defer func() {
  285. if x := recover(); x != nil {
  286. log.Error("service.handlerfailuserinfoproc panic(%v)", x)
  287. go s.handlerfailuserinfoproc()
  288. log.Info("service.handlerfailuserinfoproc recover")
  289. }
  290. }()
  291. for {
  292. userInfo := <-s.handlerFailUserInfo
  293. _time := 0
  294. for {
  295. if err = s.updateVipUserInfo(context.TODO(), userInfo); err == nil {
  296. s.dao.DelInfoCache(context.Background(), userInfo.Mid)
  297. s.dao.DelVipInfoCache(context.TODO(), userInfo.Mid)
  298. if s.grayScope(userInfo.Mid) {
  299. s.cleanCache(userInfo.Mid)
  300. }
  301. break
  302. }
  303. log.Error("info error(%+v)", err)
  304. _time++
  305. if _time > _maxtime {
  306. break
  307. }
  308. time.Sleep(_sleep)
  309. }
  310. }
  311. }
  312. func (s *Service) handlerupdateuserinfoproc() {
  313. var (
  314. err error
  315. flag bool
  316. )
  317. defer func() {
  318. if x := recover(); x != nil {
  319. log.Error("service.handlerupdateuserinfoproc panic(%v)", x)
  320. go s.handlerupdateuserinfoproc()
  321. log.Info("service.handlerupdateuserinfoproc recover")
  322. }
  323. }()
  324. for {
  325. userInfo := <-s.handlerUpdateUserInfo
  326. flag = true
  327. for i := 0; i < s.c.Property.Retry; i++ {
  328. if err = s.updateVipUserInfo(context.TODO(), userInfo); err == nil {
  329. s.dao.DelInfoCache(context.Background(), userInfo.Mid)
  330. s.dao.DelVipInfoCache(context.TODO(), userInfo.Mid)
  331. if s.grayScope(userInfo.Mid) {
  332. s.cleanCache(userInfo.Mid)
  333. }
  334. flag = false
  335. break
  336. }
  337. log.Error("info error(%+v)", err)
  338. }
  339. if flag {
  340. s.handlerFailUserInfo <- userInfo
  341. }
  342. }
  343. }
  344. func (s *Service) handleraddchangehistoryproc() {
  345. defer func() {
  346. if x := recover(); x != nil {
  347. log.Error("service.handleraddchangehistoryproc panic(%v)", x)
  348. go s.handleraddchangehistoryproc()
  349. log.Info("service.handleraddchangehistoryproc recover")
  350. }
  351. }()
  352. for {
  353. msg := <-s.handlerAddVipHistory
  354. history := convertMsgToHistory(msg)
  355. var res []*model.VipChangeHistory
  356. res = append(res, history)
  357. for i := 0; i < s.c.Property.Retry; i++ {
  358. if err := s.dao.AddChangeHistoryBatch(res); err == nil {
  359. break
  360. }
  361. }
  362. }
  363. }
  364. func convertMsgToHistory(msg *model.VipChangeHistoryMsg) (r *model.VipChangeHistory) {
  365. r = new(model.VipChangeHistory)
  366. r.Mid = msg.Mid
  367. r.Days = msg.Days
  368. r.Month = msg.Month
  369. r.ChangeType = msg.ChangeType
  370. r.OperatorID = msg.OperatorID
  371. r.RelationID = msg.RelationID
  372. r.BatchID = msg.BatchID
  373. r.Remark = msg.Remark
  374. r.ChangeTime = xtime.Time(parseTime(msg.ChangeTime).Unix())
  375. r.BatchCodeID = msg.BatchCodeID
  376. return
  377. }
  378. func parseTime(timeStr string) (t time.Time) {
  379. var err error
  380. if t, err = time.ParseInLocation("2006-01-02 15:04:05", timeStr, time.Local); err != nil {
  381. t = time.Now()
  382. }
  383. return
  384. }
  385. func convertMsgToUserInfo(msg *model.VipUserInfoMsg) (r *model.VipUserInfo) {
  386. r = new(model.VipUserInfo)
  387. r.AnnualVipOverdueTime = xtime.Time(parseTime(msg.AnnualVipOverdueTime).Unix())
  388. r.Mid = msg.Mid
  389. r.OverdueTime = xtime.Time(parseTime(msg.OverdueTime).Unix())
  390. r.PayType = msg.IsAutoRenew
  391. r.RecentTime = xtime.Time(parseTime(msg.RecentTime).Unix())
  392. r.StartTime = xtime.Time(parseTime(msg.StartTime).Unix())
  393. r.Status = msg.Status
  394. r.Type = msg.Type
  395. r.PayChannelID = msg.PayChannelID
  396. r.AutoRenewed = msg.AutoRenewed
  397. r.IosOverdueTime = xtime.Time(parseTime(msg.IosOverdueTime).Unix())
  398. r.Ver = msg.Ver
  399. return
  400. }
  401. func convertUserInfoByNewMsg(msg *model.VipUserInfoNewMsg) (r *model.VipUserInfo) {
  402. r = new(model.VipUserInfo)
  403. r.AnnualVipOverdueTime = xtime.Time(parseTime(msg.AnnualVipOverdueTime).Unix())
  404. r.Mid = msg.Mid
  405. r.OverdueTime = xtime.Time(parseTime(msg.VipOverdueTime).Unix())
  406. r.PayType = msg.VipPayType
  407. r.RecentTime = xtime.Time(parseTime(msg.VipRecentTime).Unix())
  408. r.StartTime = xtime.Time(parseTime(msg.VipStartTime).Unix())
  409. r.Status = msg.VipStatus
  410. r.Type = msg.VipType
  411. r.PayChannelID = msg.PayChannelID
  412. r.IosOverdueTime = xtime.Time(parseTime(msg.IosOverdueTime).Unix())
  413. r.Ver = msg.Ver
  414. return
  415. }
  416. func convertOldToNew(old *model.VipUserInfoOld) (r *model.VipUserInfo) {
  417. r = new(model.VipUserInfo)
  418. r.AnnualVipOverdueTime = old.AnnualVipOverdueTime
  419. r.Mid = old.Mid
  420. r.OverdueTime = old.OverdueTime
  421. r.PayType = old.IsAutoRenew
  422. r.RecentTime = old.RecentTime
  423. r.PayChannelID = old.PayChannelID
  424. if old.RecentTime.Time().Unix() < 0 {
  425. r.RecentTime = xtime.Time(1451577600)
  426. }
  427. r.StartTime = old.StartTime
  428. r.Status = old.Status
  429. r.Type = old.Type
  430. r.IosOverdueTime = old.IosOverdueTime
  431. r.Ver = old.Ver
  432. return
  433. }
  434. //HandlerVipChangeHistory handler sync change history data
  435. func (s *Service) HandlerVipChangeHistory() (err error) {
  436. var (
  437. newMaxID int64
  438. oldMaxID int64
  439. size = int64(s.c.Property.BatchSize)
  440. startID int64
  441. endID = size
  442. exitMap = make(map[string]int)
  443. )
  444. if oldMaxID, err = s.dao.SelOldChangeHistoryMaxID(context.TODO()); err != nil {
  445. log.Error("selOldChangeHistory error(%+v)", err)
  446. return
  447. }
  448. if newMaxID, err = s.dao.SelChangeHistoryMaxID(context.TODO()); err != nil {
  449. log.Error("selChangeHistoryMaxID error(%+v)", err)
  450. return
  451. }
  452. page := newMaxID / size
  453. if newMaxID%size != 0 {
  454. page++
  455. }
  456. for i := 0; i < int(page); i++ {
  457. startID = int64(i) * size
  458. endID = int64((i + 1)) * size
  459. if endID > newMaxID {
  460. endID = newMaxID
  461. }
  462. var res []*model.VipChangeHistory
  463. if res, err = s.dao.SelChangeHistory(context.TODO(), startID, endID); err != nil {
  464. log.Error("selChangeHistory(startID:%v endID:%v) error(%+v)", startID, endID, endID)
  465. return
  466. }
  467. for _, v := range res {
  468. exitMap[s.madeChangeHistoryMD5(v)] = 1
  469. }
  470. }
  471. page = oldMaxID / size
  472. if oldMaxID%size != 0 {
  473. page++
  474. }
  475. var batch []*model.VipChangeHistory
  476. for i := 0; i < int(page); i++ {
  477. startID = int64(i) * size
  478. endID = int64(i+1) * size
  479. if endID > oldMaxID {
  480. endID = oldMaxID
  481. }
  482. var res []*model.VipChangeHistory
  483. if res, err = s.dao.SelOldChangeHistory(context.TODO(), startID, endID); err != nil {
  484. log.Error("sel old change history (startID:%v endID:%v) error(%+v)", startID, endID, err)
  485. return
  486. }
  487. for _, v := range res {
  488. v.Days = s.calcDay(v)
  489. madeMD5 := s.madeChangeHistoryMD5(v)
  490. if exitMap[madeMD5] == 0 {
  491. batch = append(batch, v)
  492. }
  493. }
  494. if err = s.dao.AddChangeHistoryBatch(batch); err != nil {
  495. log.Error("add change history batch(%+v) error(%+v)", batch, err)
  496. return
  497. }
  498. batch = nil
  499. }
  500. return
  501. }
  502. func (s *Service) calcDay(r *model.VipChangeHistory) int32 {
  503. if r.Month != 0 {
  504. year := r.Month / 12
  505. month := r.Month % 12
  506. return int32(year)*model.VipDaysYear + int32(month)*model.VipDaysMonth
  507. }
  508. return r.Days
  509. }
  510. func (s *Service) madeChangeHistoryMD5(r *model.VipChangeHistory) string {
  511. str := fmt.Sprintf("%v,%v,%v,%v,%v,%v,%v,%v,%v", r.Mid, r.Remark, r.BatchID, r.RelationID, r.OperatorID, r.Days, r.ChangeTime.Time().Format("2006-01-02 15:04:05"), r.ChangeType, r.BatchCodeID)
  512. b := []byte(str)
  513. hash := md5.New()
  514. hash.Write(b)
  515. sum := hash.Sum(nil)
  516. return hex.EncodeToString(sum)
  517. }
  518. //SyncUserInfoByMid sync user by mid.
  519. func (s *Service) SyncUserInfoByMid(c context.Context, mid int64) (err error) {
  520. var (
  521. old *model.VipUserInfoOld
  522. user *model.VipUserInfo
  523. )
  524. if old, err = s.dao.OldVipInfo(c, mid); err != nil {
  525. err = errors.WithStack(err)
  526. return
  527. }
  528. if user, err = s.dao.SelVipUserInfo(c, mid); err != nil {
  529. err = errors.WithStack(err)
  530. return
  531. }
  532. r := convertOldToNew(old)
  533. r.OldVer = user.Ver
  534. if err = s.updateVipUserInfo(c, r); err != nil {
  535. err = errors.WithStack(err)
  536. return
  537. }
  538. // clear cache.
  539. s.cleanVipRetry(mid)
  540. return
  541. }
  542. // ClearUserCache clear user cache.
  543. func (s *Service) ClearUserCache(mid int64) {
  544. s.cleanVipRetry(mid)
  545. }
  546. // ClearUserCache clear user cache.
  547. func (s *Service) grayScope(mid int64) bool {
  548. return mid%10000 < s.c.Property.GrayScope
  549. }