service.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. xlog "log"
  6. "strconv"
  7. "sync"
  8. "time"
  9. "go-common/app/job/main/vip/conf"
  10. "go-common/app/job/main/vip/dao"
  11. "go-common/app/job/main/vip/model"
  12. couponrpc "go-common/app/service/main/coupon/rpc/client"
  13. v1 "go-common/app/service/main/vip/api"
  14. "go-common/library/log"
  15. "go-common/library/queue/databus"
  16. "github.com/pkg/errors"
  17. "github.com/robfig/cron"
  18. )
  19. const (
  20. _tableUserInfo = "vip_user_info"
  21. _tablePayOrder = "vip_pay_order"
  22. _insertAction = "insert"
  23. _updateAction = "update"
  24. _deleteAction = "delete"
  25. notifyAction = "updateVip"
  26. _ps = 50
  27. _defsleepmsec = 100
  28. )
  29. //Service vip service
  30. type Service struct {
  31. dao *dao.Dao
  32. c *conf.Config
  33. //vipRPC *client.Service
  34. reducePayOrder map[string]*model.VipPayOrder
  35. appMap map[int64]*model.VipAppInfo
  36. confMap map[string]*model.VipConfig
  37. cleanVipCache chan int64
  38. cleanAppCache chan *model.AppCache
  39. ds *databus.Databus
  40. handlerFailPayOrder chan *model.VipPayOrder
  41. handlerFailUserInfo chan *model.VipUserInfo
  42. handlerFailRechargeOrder chan *model.VipPayOrder
  43. handlerFailVipbuy chan *model.VipBuyResq
  44. handlerInsertOrder chan *model.VipPayOrder
  45. handlerUpdateOrder chan *model.VipPayOrder
  46. handlerRechargeOrder chan *model.VipPayOrder
  47. handlerInsertUserInfo chan *model.VipUserInfo
  48. handlerUpdateUserInfo chan *model.VipUserInfo
  49. handlerStationActive chan *model.VipPayOrder
  50. handlerAutoRenewLog chan *model.VipUserInfo
  51. handlerAddVipHistory chan *model.VipChangeHistoryMsg
  52. handlerAddBcoinSalary chan *model.VipBcoinSalaryMsg
  53. handlerUpdateBcoinSalary chan *model.VipBcoinSalaryMsg
  54. handlerDelBcoinSalary chan *model.VipBcoinSalaryMsg
  55. notifycouponchan chan func()
  56. accLogin *databus.Databus
  57. frozenDate time.Duration
  58. newVipDatabus *databus.Databus
  59. salaryCoupnDatabus *databus.Databus
  60. accountNoitfyDatabus *databus.Databus
  61. couponNotifyDatabus *databus.Databus
  62. autoRenewdDatabus *databus.Databus
  63. // waiter
  64. waiter sync.WaitGroup
  65. closed bool
  66. sendmsgchan chan func()
  67. couponRPC *couponrpc.Service
  68. // vip service
  69. vipgRPC v1.VipClient
  70. }
  71. //New new service
  72. func New(c *conf.Config) (s *Service) {
  73. s = &Service{
  74. c: c,
  75. dao: dao.New(c),
  76. //vipRPC: client.New(c.VipRPC),
  77. cleanVipCache: make(chan int64, 10240),
  78. confMap: make(map[string]*model.VipConfig),
  79. cleanAppCache: make(chan *model.AppCache, 10240),
  80. handlerFailPayOrder: make(chan *model.VipPayOrder, 10240),
  81. handlerFailUserInfo: make(chan *model.VipUserInfo, 10240),
  82. handlerFailRechargeOrder: make(chan *model.VipPayOrder, 10240),
  83. handlerInsertOrder: make(chan *model.VipPayOrder, 10240),
  84. handlerRechargeOrder: make(chan *model.VipPayOrder, 10240),
  85. handlerUpdateOrder: make(chan *model.VipPayOrder, 10240),
  86. handlerInsertUserInfo: make(chan *model.VipUserInfo, 10240),
  87. handlerUpdateUserInfo: make(chan *model.VipUserInfo, 10240),
  88. handlerStationActive: make(chan *model.VipPayOrder, 10240),
  89. handlerAutoRenewLog: make(chan *model.VipUserInfo, 10240),
  90. handlerAddVipHistory: make(chan *model.VipChangeHistoryMsg, 10240),
  91. handlerAddBcoinSalary: make(chan *model.VipBcoinSalaryMsg, 10240),
  92. handlerUpdateBcoinSalary: make(chan *model.VipBcoinSalaryMsg, 10240),
  93. handlerDelBcoinSalary: make(chan *model.VipBcoinSalaryMsg, 10240),
  94. handlerFailVipbuy: make(chan *model.VipBuyResq, 10240),
  95. notifycouponchan: make(chan func(), 10240),
  96. ds: databus.New(c.Databus.OldVipBinLog),
  97. newVipDatabus: databus.New(c.Databus.NewVipBinLog),
  98. accLogin: databus.New(c.Databus.AccLogin),
  99. frozenDate: time.Duration(c.Property.FrozenDate),
  100. reducePayOrder: make(map[string]*model.VipPayOrder),
  101. sendmsgchan: make(chan func(), 10240),
  102. accountNoitfyDatabus: databus.New(c.Databus.AccountNotify),
  103. couponRPC: couponrpc.New(c.RPCClient2.Coupon),
  104. }
  105. vipgRPC, err := v1.NewClient(c.VipClient)
  106. if err != nil {
  107. panic(err)
  108. }
  109. s.vipgRPC = vipgRPC
  110. t := cron.New()
  111. go s.loadappinfoproc()
  112. go s.cleanappcacheretryproc()
  113. go s.cleanvipretryproc()
  114. go s.sendmessageproc()
  115. go s.handlerfailpayorderproc()
  116. go s.handlerfailrechargeorderproc()
  117. go s.handlerfailuserinfoproc()
  118. go s.handlerautorenewlogproc()
  119. go s.handlerdelbcoinproc()
  120. if c.Databus.SalaryCoupon != nil {
  121. s.salaryCoupnDatabus = databus.New(c.Databus.SalaryCoupon)
  122. s.waiter.Add(1)
  123. go s.salarycouponproc()
  124. }
  125. if c.Databus.CouponNotify != nil {
  126. s.couponNotifyDatabus = databus.New(c.Databus.CouponNotify)
  127. go s.couponnotifyproc()
  128. s.waiter.Add(1)
  129. go s.couponnotifybinlogproc()
  130. }
  131. for i := 0; i < s.c.Property.HandlerThread; i++ {
  132. go s.handlerinsertorderproc()
  133. go s.handlerupdateorderproc()
  134. go s.handlerinsertuserinfoproc()
  135. go s.handlerupdateuserinfoproc()
  136. go s.handleraddchangehistoryproc()
  137. go s.handleraddbcoinproc()
  138. go s.handlerupdatebcoinproc()
  139. go s.handlerupdaterechargeorderproc()
  140. }
  141. for i := 0; i < s.c.Property.ReadThread; i++ {
  142. go s.readdatabusproc()
  143. }
  144. go s.readnewvipdatabusproc()
  145. if c.Property.FrozenCron != "" {
  146. go s.accloginproc()
  147. t.AddFunc(c.Property.FrozenCron, s.unFrozenJob)
  148. }
  149. t.AddFunc(c.Property.UpdateUserInfoCron, s.updateUserInfoJob)
  150. t.AddFunc(c.Property.SalaryVideoCouponCron, s.salaryVideoCouponJob)
  151. t.AddFunc(c.Property.PushDataCron, s.pushDataJob)
  152. t.AddFunc(c.Property.EleEompensateCron, s.eleEompensateJob)
  153. //t.AddFunc(c.Property.HadExpiredMsgCron, s.hadExpiredMsgJob)
  154. //t.AddFunc(c.Property.WillExpireMsgCron, s.willExpiredMsgJob)
  155. //t.AddFunc(c.Property.SendMessageCron, s.sendMessageJob)
  156. //t.AddFunc(c.Property.AutoRenewCron, s.autoRenewJob)
  157. //t.AddFunc(c.Property.SendBcoinCron, s.sendBcoinJob)
  158. t.Start()
  159. go s.consumercheckproc()
  160. if c.Databus.AutoRenew != nil {
  161. s.autoRenewdDatabus = databus.New(c.Databus.AutoRenew)
  162. s.waiter.Add(1)
  163. go s.retryautorenewpayproc()
  164. }
  165. return
  166. }
  167. func (s *Service) readnewvipdatabusproc() {
  168. defer func() {
  169. if r := recover(); r != nil {
  170. r = errors.WithStack(r.(error))
  171. log.Error("Runtime error caught: %+v", r)
  172. go s.readnewvipdatabusproc()
  173. }
  174. }()
  175. var err error
  176. for msg := range s.newVipDatabus.Messages() {
  177. val := msg.Value
  178. if err = msg.Commit(); err != nil {
  179. log.Error("readdatabusproc msg.commit() error(%v)", err)
  180. msg.Commit()
  181. }
  182. log.Info("cur consumer new vip db message(%v)", string(msg.Value))
  183. message := new(model.Message)
  184. if err = json.Unmarshal(val, message); err != nil {
  185. log.Error("readnewvipdatabusproc json.unmarshal val(%+v) error(%+v)", string(val), err)
  186. continue
  187. }
  188. if message.Table == "vip_user_info" {
  189. userInfo := new(model.VipUserInfoNewMsg)
  190. if err = json.Unmarshal(message.New, userInfo); err != nil {
  191. log.Error("readdatabusproc json.Unmarshal val(%v) error(%v)", string(message.New), err)
  192. continue
  193. }
  194. vipUser := convertUserInfoByNewMsg(userInfo)
  195. s.dao.DelInfoCache(context.Background(), vipUser.Mid)
  196. if message.Action == _insertAction {
  197. if vipUser.PayType == model.AutoRenew {
  198. select {
  199. case s.handlerAutoRenewLog <- vipUser:
  200. default:
  201. log.Error("s.handlerAutoRenewLog full!")
  202. }
  203. }
  204. } else if message.Action == _updateAction {
  205. oldUserMsg := new(model.VipUserInfoNewMsg)
  206. if err = json.Unmarshal(message.Old, oldUserMsg); err != nil {
  207. log.Error("readdatabusproc json.Unmarshal val(%v) error(%v)", string(message.Old), err)
  208. continue
  209. }
  210. oldUser := convertUserInfoByNewMsg(oldUserMsg)
  211. if oldUser.PayType != vipUser.PayType {
  212. select {
  213. case s.handlerAutoRenewLog <- vipUser:
  214. default:
  215. log.Error("s.handlerAutoRenewLog full update!")
  216. }
  217. }
  218. }
  219. s.pubAccountNotify(vipUser.Mid)
  220. }
  221. }
  222. }
  223. func (s *Service) readdatabusproc() {
  224. defer func() {
  225. if r := recover(); r != nil {
  226. r = errors.WithStack(r.(error))
  227. log.Error("Runtime error caught: %+v", r)
  228. go s.readdatabusproc()
  229. }
  230. }()
  231. var err error
  232. for msg := range s.ds.Messages() {
  233. val := msg.Value
  234. message := new(model.Message)
  235. if err = json.Unmarshal(val, message); err != nil {
  236. log.Error("readdatabusproc json.Unmarshal val(%v) error(%v)", string(val), err)
  237. if err = msg.Commit(); err != nil {
  238. log.Error("msg.commit() error(%v)", err)
  239. }
  240. continue
  241. }
  242. if message.Table == "vip_pay_order" {
  243. order := new(model.VipPayOrderOldMsg)
  244. if err = json.Unmarshal(message.New, order); err != nil {
  245. log.Error("readdatabusproc json.Unmarshal val(%v) error(%v)", string(message.New), err)
  246. if err = msg.Commit(); err != nil {
  247. log.Error("msg.commit() error(%v)", err)
  248. }
  249. continue
  250. }
  251. payOrder := s.convertPayOrder(order)
  252. if message.Action == "insert" {
  253. select {
  254. case s.handlerInsertOrder <- payOrder:
  255. default:
  256. xlog.Panic("s.handlerInsertOrder full!")
  257. }
  258. } else if message.Action == "update" {
  259. select {
  260. case s.handlerUpdateOrder <- payOrder:
  261. default:
  262. xlog.Panic("s.handlerUpdateOrder full!")
  263. }
  264. }
  265. } else if message.Table == "vip_recharge_order" {
  266. order := new(model.VipRechargeOrderMsg)
  267. if err = json.Unmarshal(message.New, order); err != nil {
  268. log.Error("readdatabusproc json.Unmarshal val(%v) error(%v)", string(message.New), err)
  269. if err = msg.Commit(); err != nil {
  270. log.Error("msg.commit() error(%v)", err)
  271. }
  272. continue
  273. }
  274. payOrder := s.convertPayOrderByMsg(order)
  275. if message.Action == "update" {
  276. select {
  277. case s.handlerRechargeOrder <- payOrder:
  278. default:
  279. xlog.Panic("s.handlerRechargeOrder full!")
  280. }
  281. } else if message.Action == "insert" {
  282. if len(payOrder.ThirdTradeNo) > 0 {
  283. select {
  284. case s.handlerRechargeOrder <- payOrder:
  285. default:
  286. xlog.Panic("s.handlerRechargeOrder full!")
  287. }
  288. }
  289. }
  290. } else if message.Table == "vip_user_info" {
  291. userInfo := new(model.VipUserInfoMsg)
  292. if err = json.Unmarshal(message.New, userInfo); err != nil {
  293. log.Error("readdatabusproc json.Unmarshal val(%v) error(%v)", string(message.New), err)
  294. if err = msg.Commit(); err != nil {
  295. log.Error("msg.commit() error(%v)", err)
  296. }
  297. continue
  298. }
  299. vipUser := convertMsgToUserInfo(userInfo)
  300. if message.Action == "insert" {
  301. select {
  302. case s.handlerInsertUserInfo <- vipUser:
  303. default:
  304. xlog.Panic("s.handlerInsertUserInfo full!")
  305. }
  306. } else if message.Action == "update" {
  307. oldUser := new(model.VipUserInfoMsg)
  308. if err = json.Unmarshal(message.Old, oldUser); err != nil {
  309. log.Error("readdatabusproc json.Unmarshal val(%v) error(%v)", string(message.Old), err)
  310. if err = msg.Commit(); err != nil {
  311. log.Error("msg.commit() error(%v)", err)
  312. }
  313. continue
  314. }
  315. vipUser.OldVer = oldUser.Ver
  316. select {
  317. case s.handlerUpdateUserInfo <- vipUser:
  318. default:
  319. xlog.Panic("s.handlerUpdateUserInfo full!")
  320. }
  321. }
  322. if !s.grayScope(userInfo.Mid) {
  323. s.cleanCache(userInfo.Mid)
  324. }
  325. } else if message.Table == "vip_change_history" {
  326. historyMsg := new(model.VipChangeHistoryMsg)
  327. if err = json.Unmarshal(message.New, historyMsg); err != nil {
  328. log.Error("readdatabusproc json.Unmarshal val(%v) error(%v)", string(message.New), err)
  329. if err = msg.Commit(); err != nil {
  330. log.Error("msg.commit() error(%v)", err)
  331. }
  332. continue
  333. }
  334. if message.Action == "insert" {
  335. select {
  336. case s.handlerAddVipHistory <- historyMsg:
  337. default:
  338. xlog.Panic("s.handlerAddVipHistory full!")
  339. }
  340. }
  341. } else if message.Table == "vip_bcoin_salary" {
  342. bcoinMsg := new(model.VipBcoinSalaryMsg)
  343. if err = json.Unmarshal(message.New, bcoinMsg); err != nil {
  344. log.Error("readdatabusproc json.Unmarshal val(%v) error(%v)", string(message.New), err)
  345. if err = msg.Commit(); err != nil {
  346. log.Error("msg.commit() error(%v)", err)
  347. }
  348. continue
  349. }
  350. if message.Action == _insertAction {
  351. select {
  352. case s.handlerAddBcoinSalary <- bcoinMsg:
  353. default:
  354. xlog.Panic("s.handlerAddBcoinSalary full!")
  355. }
  356. } else if message.Action == _updateAction {
  357. select {
  358. case s.handlerUpdateBcoinSalary <- bcoinMsg:
  359. default:
  360. xlog.Panic("s.handlerUpdateBcoinSalary full!")
  361. }
  362. } else if message.Action == _deleteAction {
  363. select {
  364. case s.handlerDelBcoinSalary <- bcoinMsg:
  365. default:
  366. xlog.Panic("s.handlerDelBcoinSalary full!")
  367. }
  368. }
  369. }
  370. if err = msg.Commit(); err != nil {
  371. log.Error("readdatabusproc msg.commit() error(%v)", err)
  372. msg.Commit()
  373. }
  374. log.Info("cur consumer message(%v)", string(msg.Value))
  375. }
  376. }
  377. func (s *Service) cleanCache(mid int64) {
  378. var (
  379. hv = new(model.HandlerVip)
  380. err error
  381. )
  382. hv.Type = 2
  383. hv.Days = 0
  384. hv.Months = 0
  385. hv.Mid = mid
  386. if err = s.cleanCacheAndNotify(context.TODO(), hv); err != nil {
  387. select {
  388. case s.cleanVipCache <- hv.Mid:
  389. default:
  390. xlog.Panic("s.cleanVipCache full!")
  391. }
  392. }
  393. s.pubAccountNotify(mid)
  394. }
  395. func (s *Service) pubAccountNotify(mid int64) (err error) {
  396. data := new(struct {
  397. Mid int64 `json:"mid"`
  398. Action string `json:"action"`
  399. })
  400. data.Mid = mid
  401. data.Action = notifyAction
  402. if err = s.accountNoitfyDatabus.Send(context.TODO(), strconv.FormatInt(mid, 10), data); err != nil {
  403. log.Error("send (%+v) error(%+v)", data, err)
  404. }
  405. log.Info("send(mid:%+v) data:%+v", mid, data)
  406. return
  407. }
  408. func (s *Service) loadappinfoproc() {
  409. defer func() {
  410. if r := recover(); r != nil {
  411. r = errors.WithStack(r.(error))
  412. log.Error("Runtime error caught: %+v", r)
  413. go s.loadappinfoproc()
  414. }
  415. }()
  416. for {
  417. s.loadAppInfo()
  418. time.Sleep(time.Minute * 2)
  419. }
  420. }
  421. func (s *Service) loadAppInfo() {
  422. var (
  423. res []*model.VipAppInfo
  424. err error
  425. )
  426. if res, err = s.dao.SelAppInfo(context.TODO()); err != nil {
  427. log.Error("loadAppInfo SelAppInfo error(%v)", err)
  428. return
  429. }
  430. aMap := make(map[int64]*model.VipAppInfo, len(res))
  431. for _, v := range res {
  432. aMap[v.ID] = v
  433. }
  434. s.appMap = aMap
  435. bytes, _ := json.Marshal(res)
  436. log.Info("load app success :%v", string(bytes))
  437. }
  438. func (s *Service) cleanvipretryproc() {
  439. defer func() {
  440. if r := recover(); r != nil {
  441. r = errors.WithStack(r.(error))
  442. log.Error("Runtime error caught: %+v", r)
  443. go s.cleanvipretryproc()
  444. }
  445. }()
  446. for {
  447. mid := <-s.cleanVipCache
  448. s.cleanVipRetry(mid)
  449. }
  450. }
  451. func (s *Service) cleanVipRetry(mid int64) {
  452. hv := new(model.HandlerVip)
  453. hv.Type = 2
  454. hv.Days = 0
  455. hv.Months = 0
  456. hv.Mid = mid
  457. s.dao.DelInfoCache(context.Background(), mid)
  458. for i := 0; i < s.c.Property.Retry; i++ {
  459. if err := s.cleanCacheAndNotify(context.TODO(), hv); err == nil {
  460. break
  461. }
  462. s.dao.DelVipInfoCache(context.TODO(), int64(hv.Mid))
  463. }
  464. log.Info("handler success cache fail mid(%v)", mid)
  465. }
  466. func (s *Service) cleanappcacheretryproc() {
  467. defer func() {
  468. if r := recover(); r != nil {
  469. r = errors.WithStack(r.(error))
  470. log.Error("Runtime error caught: %+v", r)
  471. go s.cleanappcacheretryproc()
  472. }
  473. }()
  474. for {
  475. ac := <-s.cleanAppCache
  476. s.cleanAppCacheRetry(ac)
  477. }
  478. }
  479. func (s *Service) cleanAppCacheRetry(ac *model.AppCache) {
  480. appInfo := s.appMap[ac.AppID]
  481. hv := new(model.HandlerVip)
  482. hv.Type = 2
  483. hv.Days = 0
  484. hv.Months = 0
  485. hv.Mid = ac.Mid
  486. for i := 0; i < s.c.Property.Retry; i++ {
  487. if err := s.dao.SendAppCleanCache(context.TODO(), hv, appInfo); err == nil {
  488. break
  489. }
  490. }
  491. log.Info("handler success cache app fail appInfo(%v)", ac)
  492. }
  493. func (s *Service) salarycouponproc() {
  494. defer func() {
  495. if r := recover(); r != nil {
  496. r = errors.WithStack(r.(error))
  497. log.Error("Runtime error caught: %+v", r)
  498. go s.salarycouponproc()
  499. }
  500. }()
  501. defer s.waiter.Done()
  502. var (
  503. err error
  504. msg *databus.Message
  505. msgChan = s.salaryCoupnDatabus.Messages()
  506. ok bool
  507. c = context.Background()
  508. )
  509. for {
  510. msg, ok = <-msgChan
  511. if !ok || s.closed {
  512. log.Info("salary coupon msgChan closed")
  513. return
  514. }
  515. msg.Commit()
  516. v := &model.Message{}
  517. if err = json.Unmarshal([]byte(msg.Value), v); err != nil {
  518. log.Error("json.Unmarshal(%v) err(%v)", v, err)
  519. continue
  520. }
  521. if v.Table != _tableUserInfo {
  522. continue
  523. }
  524. nvip := &model.VipUserInfoMsg{}
  525. if err = json.Unmarshal(v.New, &nvip); err != nil {
  526. log.Error("salary new json.Unmarshal values(%v),error(%v)", string(v.New), err)
  527. continue
  528. }
  529. ovip := &model.VipUserInfoMsg{}
  530. if v.Action != _insertAction {
  531. if err = json.Unmarshal(v.Old, &ovip); err != nil {
  532. log.Error("salary old json.Unmarshal values(%v),error(%v)", string(v.Old), err)
  533. continue
  534. }
  535. }
  536. log.Info("salary coupon start mid(%d)", nvip.Mid)
  537. if _, err = s.SalaryVideoCouponAtOnce(c, nvip, ovip, v.Action); err != nil {
  538. log.Error("SalaryVideoCouponAtOnce fail(%d) nvip(%v) ovip(%v) %s error(%v)", nvip.Mid, nvip, ovip, v.Action, err)
  539. continue
  540. }
  541. log.Info("salary coupon suc mid(%d)", nvip.Mid)
  542. }
  543. }
  544. func (s *Service) couponnotifybinlogproc() {
  545. defer func() {
  546. if r := recover(); r != nil {
  547. r = errors.WithStack(r.(error))
  548. log.Error("Runtime error couponnotifybinlogproc caught: %+v", r)
  549. go s.couponnotifybinlogproc()
  550. }
  551. }()
  552. defer s.waiter.Done()
  553. var (
  554. err error
  555. msg *databus.Message
  556. msgChan = s.couponNotifyDatabus.Messages()
  557. ok bool
  558. c = context.Background()
  559. )
  560. for {
  561. msg, ok = <-msgChan
  562. if !ok || s.closed {
  563. log.Info("coupon notify couponnotifybinlogproc msgChan closed")
  564. return
  565. }
  566. if err = msg.Commit(); err != nil {
  567. log.Error("couponnotifybinlogproc msg.Commit err(%v)", err)
  568. continue
  569. }
  570. log.Info("cur consumer couponnotifybinlogproc(%v)", string(msg.Value))
  571. v := &model.Message{}
  572. if err = json.Unmarshal([]byte(msg.Value), v); err != nil {
  573. log.Error("couponnotifybinlogproc json.Unmarshal(%v) err(%v)", v, err)
  574. continue
  575. }
  576. if v.Table != _tablePayOrder || v.Action != _updateAction {
  577. continue
  578. }
  579. newo := new(model.VipPayOrderNewMsg)
  580. if err = json.Unmarshal(v.New, newo); err != nil {
  581. log.Error("couponnotifybinlogproc json.Unmarshal val(%v) error(%v)", string(v.New), err)
  582. continue
  583. }
  584. oldo := new(model.VipPayOrderNewMsg)
  585. if err = json.Unmarshal(v.Old, oldo); err != nil {
  586. log.Error("couponnotifybinlogproc json.Unmarshal val(%v) error(%v)", string(v.Old), err)
  587. continue
  588. }
  589. if newo == nil || oldo == nil {
  590. continue
  591. }
  592. if oldo.Status != model.PAYING {
  593. continue
  594. }
  595. if newo.Status != model.SUCCESS && newo.Status != model.FAILED {
  596. continue
  597. }
  598. if newo.CouponMoney <= 0 {
  599. continue
  600. }
  601. s.couponnotify(func() {
  602. s.CouponNotify(c, newo)
  603. })
  604. }
  605. }
  606. func (s *Service) updateUserInfoJob() {
  607. log.Info("update user info job start ....................................")
  608. s.ScanUserInfo(context.TODO())
  609. log.Info("update user info job end ........................................")
  610. }
  611. func (s *Service) salaryVideoCouponJob() {
  612. log.Info("salary video coupon job start ....................................")
  613. var err error
  614. if ok := s.dao.AddTransferLock(context.TODO(), "_transferLock"); !ok {
  615. log.Info("salary video coupon job had run ....................................")
  616. return
  617. }
  618. if err = s.ScanSalaryVideoCoupon(context.TODO()); err != nil {
  619. log.Error("ScanSalaryVideoCoupon error(%v)", err)
  620. return
  621. }
  622. log.Info("salary video coupon job end ........................................")
  623. }
  624. //Ping check db live
  625. func (s *Service) Ping(c context.Context) (err error) {
  626. return s.dao.Ping(c)
  627. }
  628. // Close all resource.
  629. func (s *Service) Close() {
  630. defer s.waiter.Wait()
  631. s.closed = true
  632. s.salaryCoupnDatabus.Close()
  633. s.dao.Close()
  634. s.ds.Close()
  635. s.newVipDatabus.Close()
  636. }
  637. func (s *Service) sendmessageproc() {
  638. defer func() {
  639. if x := recover(); x != nil {
  640. log.Error("service.sendmessageproc panic(%v)", x)
  641. go s.sendmessageproc()
  642. log.Info("service.sendmessageproc recover")
  643. }
  644. }()
  645. for {
  646. f := <-s.sendmsgchan
  647. f()
  648. }
  649. }
  650. func (s *Service) sendmessage(f func()) {
  651. defer func() {
  652. if x := recover(); x != nil {
  653. log.Error("service.sendmessage panic(%v)", x)
  654. }
  655. }()
  656. select {
  657. case s.sendmsgchan <- f:
  658. default:
  659. log.Error("service.sendmessage chan full")
  660. }
  661. }
  662. func (s *Service) consumercheckproc() {
  663. for {
  664. time.Sleep(time.Second)
  665. log.Info("consumercheckproc chan(cleanVipCache) size: %d", len(s.cleanVipCache))
  666. log.Info("consumercheckproc chan(cleanAppCache) size: %d", len(s.cleanAppCache))
  667. log.Info("consumercheckproc chan(handlerFailPayOrder) size: %d", len(s.handlerFailPayOrder))
  668. log.Info("consumercheckproc chan(handlerFailUserInfo) size: %d", len(s.handlerFailUserInfo))
  669. log.Info("consumercheckproc chan(handlerFailRechargeOrder) size: %d", len(s.handlerFailRechargeOrder))
  670. log.Info("consumercheckproc chan(handlerFailVipbuy) size: %d", len(s.handlerFailVipbuy))
  671. log.Info("consumercheckproc chan(handlerInsertOrder) size: %d", len(s.handlerInsertOrder))
  672. log.Info("consumercheckproc chan(handlerUpdateOrder) size: %d", len(s.handlerUpdateOrder))
  673. log.Info("consumercheckproc chan(handlerRechargeOrder) size: %d", len(s.handlerRechargeOrder))
  674. log.Info("consumercheckproc chan(handlerInsertUserInfo) size: %d", len(s.handlerInsertUserInfo))
  675. log.Info("consumercheckproc chan(handlerUpdateUserInfo) size: %d", len(s.handlerUpdateUserInfo))
  676. log.Info("consumercheckproc chan(handlerStationActive) size: %d", len(s.handlerStationActive))
  677. log.Info("consumercheckproc chan(handlerAutoRenewLog) size: %d", len(s.handlerAutoRenewLog))
  678. log.Info("consumercheckproc chan(handlerAddVipHistory) size: %d", len(s.handlerAddVipHistory))
  679. log.Info("consumercheckproc chan(handlerAddBcoinSalary) size: %d", len(s.handlerAddBcoinSalary))
  680. log.Info("consumercheckproc chan(handlerUpdateBcoinSalary) size: %d", len(s.handlerUpdateBcoinSalary))
  681. log.Info("consumercheckproc chan(handlerDelBcoinSalary) size: %d", len(s.handlerDelBcoinSalary))
  682. log.Info("consumercheckproc chan(sendmsgchan) size: %d", len(s.sendmsgchan))
  683. log.Info("consumercheckproc chan(notifycouponchan) size: %d", len(s.notifycouponchan))
  684. }
  685. }