service.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "go-common/app/job/live/xlottery/internal/model"
  8. "go-common/library/ecode"
  9. "go-common/library/queue/databus/report"
  10. "net/http"
  11. "strconv"
  12. "sync"
  13. "time"
  14. "github.com/robfig/cron"
  15. "go-common/app/job/live/xlottery/internal/conf"
  16. "go-common/app/job/live/xlottery/internal/dao"
  17. "go-common/library/log"
  18. bm "go-common/library/net/http/blademaster"
  19. "go-common/library/queue/databus"
  20. )
  21. // Service struct
  22. type Service struct {
  23. c *conf.Config
  24. dao *dao.Dao
  25. cron *cron.Cron
  26. giftPaySub *databus.Databus
  27. giftFreeSub *databus.Databus
  28. capsuleSub *databus.Databus
  29. ExpireCountFrequency string
  30. CouponRetryFrequency string
  31. httpClient *bm.Client
  32. wg *sync.WaitGroup
  33. }
  34. const _sendGiftKey = "lottery:gift:msgid:%s"
  35. const _addCapsuleKey = "lottery:gift:msgid:%s"
  36. type info struct {
  37. MsgContent string `json:"msg_content"`
  38. }
  39. type msgContent struct {
  40. Body *body `json:"body"`
  41. }
  42. type body struct {
  43. GiftId int64 `json:"giftid"`
  44. RoomId int64 `json:"roomid"`
  45. Num int64 `json:"num"`
  46. Uid int64 `json:"uid"`
  47. Ruid int64 `json:"ruid"`
  48. TotalCoin int64 `json:"totalCoin"`
  49. CoinType string `json:"coinType"`
  50. Tid string `json:"tid"`
  51. Platform string `json:"platform"`
  52. RoomInfo *roomInfo `json:"roomInfo"`
  53. }
  54. type roomInfo struct {
  55. AreaV2Id int64 `json:"area_v2_id"`
  56. AreaV2ParentId int64 `json:"area_v2_parent_id"`
  57. }
  58. // New init
  59. func New(c *conf.Config) (s *Service) {
  60. s = &Service{
  61. c: c,
  62. dao: dao.New(c),
  63. cron: cron.New(),
  64. giftPaySub: databus.New(c.GiftPaySub),
  65. giftFreeSub: databus.New(c.GiftFreeSub),
  66. capsuleSub: databus.New(c.AddCapsuleSub),
  67. wg: new(sync.WaitGroup),
  68. ExpireCountFrequency: c.Cfg.ExpireCountFrequency,
  69. CouponRetryFrequency: c.Cfg.CouponRetryFrequency,
  70. httpClient: bm.NewClient(c.HTTPClient),
  71. }
  72. report.InitUser(conf.Conf.UserReport)
  73. dao.InitAPI()
  74. s.addCrontab()
  75. s.cron.Start()
  76. s.tickerReloadCapsuleConf(context.TODO())
  77. log.Info("[service.lottery| 11start")
  78. var i int64
  79. for i = 0; i < c.Cfg.ConsumerProcNum; i++ {
  80. s.wg.Add(1)
  81. go s.giftConsumeProc()
  82. }
  83. s.wg.Add(1)
  84. go s.capsuleConsumeProc()
  85. return s
  86. }
  87. // Ping Service
  88. func (s *Service) Ping(ctx context.Context) (err error) {
  89. return s.dao.Ping(ctx)
  90. }
  91. // Close Service
  92. func (s *Service) Close() {
  93. s.subClose()
  94. s.wg.Wait()
  95. s.dao.Close()
  96. }
  97. // subClose Close all sub channels
  98. func (s *Service) subClose() {
  99. s.giftPaySub.Close()
  100. s.giftFreeSub.Close()
  101. s.capsuleSub.Close()
  102. }
  103. func (s *Service) addCrontab() (err error) {
  104. //spew.Dump(s.ExpireCountFrequency)
  105. err = s.cron.AddFunc(s.ExpireCountFrequency, s.TransCapsule)
  106. if err != nil {
  107. log.Error("cron job transCapsule error(%v)", err)
  108. }
  109. err = s.cron.AddFunc(s.CouponRetryFrequency, s.CouponRetry)
  110. if err != nil {
  111. log.Error("cron job couponRetry error(%v)", err)
  112. }
  113. return
  114. }
  115. // CouponRetry 抽奖券重试
  116. func (s *Service) CouponRetry() {
  117. var ctx = context.Background()
  118. if s.c.CouponConf == nil || s.c.CouponConf.Url == "" || len(s.c.CouponConf.Coupon) == 0 {
  119. log.Error("[service.capsule | sendAward] couponConf is empty")
  120. return
  121. }
  122. nowTime := time.Now()
  123. log.Info("[service.service | couponRetry]couponRetry %s", nowTime.Format("2006-01-02 15:04:05"))
  124. extraData, _ := s.dao.GetCouponData(ctx)
  125. if len(extraData) == 0 {
  126. return
  127. }
  128. for _, extra := range extraData {
  129. s.dao.UpdateExtraMtimeById(ctx, extra.Id, nowTime.Format("2006-01-02 15:04:05"))
  130. awardType := extra.ItemExtra
  131. if _, ok := s.c.CouponConf.Coupon[awardType]; !ok {
  132. log.Error("[service.capsule | sendAward] couponConf.coupon is empty %s", awardType)
  133. continue
  134. }
  135. uid := extra.Uid
  136. var res struct {
  137. Code int `json:"code"`
  138. Msg string `json:"message"`
  139. }
  140. endPoint := s.c.CouponConf.Url
  141. postJson := make(map[string]interface{})
  142. postJson["mid"] = uid
  143. postJson["couponId"] = s.c.CouponConf.Coupon[awardType]
  144. bytesData, err := json.Marshal(postJson)
  145. if err != nil {
  146. log.Error("[service.capsule | sendAward] json.Marshal(%v) error(%v)", postJson, err)
  147. continue
  148. }
  149. req, err := http.NewRequest("POST", endPoint, bytes.NewReader(bytesData))
  150. if err != nil {
  151. log.Error("[service.capsule | sendAward] http.NewRequest(%v) url(%v) error(%v)", postJson, endPoint, err)
  152. continue
  153. }
  154. req.Header.Add("Content-Type", "application/json;charset=UTF-8")
  155. log.Info("coupon vip mid(%d) couponID(%s)", uid, s.c.CouponConf.Coupon[awardType])
  156. if err = s.httpClient.Do(ctx, req, &res); err != nil {
  157. log.Error("[service.capsule | sendAward] s.client.Do error(%v)", err)
  158. continue
  159. }
  160. if res.Code != 0 && res.Code != 83110005 {
  161. err = ecode.Int(res.Code)
  162. log.Error("coupon vip url(%v) res code(%d)", endPoint, res.Code)
  163. continue
  164. }
  165. log.Info("[service.capsule | sendAward] s.client.Do endpoint (%v) req (%v)", endPoint, postJson)
  166. s.dao.UpdateExtraValueById(ctx, extra.Id, 1)
  167. }
  168. }
  169. // TransCapsule 转换扭蛋币
  170. func (s *Service) TransCapsule() {
  171. var ctx = context.Background()
  172. pools, err := s.dao.GetActiveColorPool(ctx)
  173. if err != nil {
  174. log.Error("[service.service | TransCapsule]CronJob TransCapsule GetActiveColorPool error(%v)", err)
  175. return
  176. }
  177. nowTime := time.Now().Add(-(60 * time.Second)).Format("2006-01-02 15:04")
  178. log.Info("[service.service | TransCapsule]TranCapsule %s", nowTime)
  179. flag := 0
  180. coinId := int64(0)
  181. for _, pool := range pools {
  182. if pool.EndTime == 0 {
  183. continue
  184. } else {
  185. endTimeUnix := time.Unix(pool.EndTime, 0)
  186. endTime := endTimeUnix.Format("2006-01-02 15:04")
  187. if endTime == nowTime {
  188. flag = 1
  189. coinId = pool.CoinId
  190. }
  191. }
  192. }
  193. if flag == 1 {
  194. colorChangeNum, err := s.dao.GetTransNum(ctx, coinId)
  195. if err != nil || colorChangeNum == 0 {
  196. log.Error("[service.service | TransCapsule] GetTransNum colorChangeNum: %d, err: %v", colorChangeNum, err)
  197. return
  198. }
  199. normalChangeNum, err := s.dao.GetTransNum(ctx, dao.NormalCoinId)
  200. if err != nil || normalChangeNum == 0 {
  201. log.Error("[service.service | TransCapsule] GetTransNum normalChangeNum: %d, err: %v", normalChangeNum, err)
  202. return
  203. }
  204. for i := int64(0); i < 10; i++ {
  205. err := s.dao.TransCapsule(ctx, strconv.FormatInt(i, 10), colorChangeNum, normalChangeNum)
  206. if err != nil {
  207. log.Error("[service.service | TransCapsule]TranCapsule error %v", err)
  208. return
  209. }
  210. log.Info("[service.service | TransCapsule]TranCapsule %s", strconv.FormatInt(i, 10))
  211. }
  212. }
  213. }
  214. // expCanalConsumeproc consumer archive
  215. func (s *Service) giftConsumeProc() {
  216. defer func() {
  217. log.Warn("giftConsumeProc exited.")
  218. s.wg.Done()
  219. }()
  220. var (
  221. payMsgs = s.giftPaySub.Messages()
  222. freeMsgs = s.giftFreeSub.Messages()
  223. )
  224. log.Info("[service.lottery|giftConsumeProc")
  225. for {
  226. select {
  227. case msg, ok := <-payMsgs:
  228. if !ok {
  229. log.Warn("[service.lottery|giftConsumeProc] giftPaySub has been closed.")
  230. return
  231. }
  232. var value *info
  233. var subValue *msgContent
  234. err := json.Unmarshal([]byte(msg.Value), &value)
  235. if err != nil {
  236. log.Error("[service.lottery|giftConsumeProc] giftPaySub json decode error:%v", err)
  237. continue
  238. }
  239. err = json.Unmarshal([]byte(value.MsgContent), &subValue)
  240. if err != nil {
  241. log.Error("[service.lottery|giftConsumeProc] giftPaySub json decode error:%v", err)
  242. continue
  243. }
  244. areaV2Id := subValue.Body.RoomInfo.AreaV2Id
  245. areaV2ParentId := subValue.Body.RoomInfo.AreaV2ParentId
  246. giftId := subValue.Body.GiftId
  247. roomId := subValue.Body.RoomId
  248. num := subValue.Body.Num
  249. uid := subValue.Body.Uid
  250. ruid := subValue.Body.Ruid
  251. totalCoin := subValue.Body.TotalCoin
  252. coinType := subValue.Body.CoinType
  253. platform := subValue.Body.Platform
  254. key := fmt.Sprintf(_sendGiftKey, subValue.Body.Tid)
  255. isGetLock, _, err := s.dao.Lock(context.Background(), key, 86400000, 0, 0)
  256. if err != nil || !isGetLock {
  257. log.Error("[service.lottery|giftConsumeProc Lock Error msgKey(%s) uid(%d) ruid(%d) roomId(%d) giftId(%d) num(%d) totalCoin(%d) coinType(%s) tid(%s) key(%s) offset(%d) err(%v)", msg.Key, uid, ruid, roomId, giftId, num, totalCoin, coinType, subValue.Body.Tid, msg.Key, msg.Offset, err)
  258. continue
  259. }
  260. msg.Commit()
  261. log.Info("[service.lottery|giftConsumeProc] pay-msgKey(%s) uid(%d) ruid(%d) roomId(%d) giftId(%d) num(%d) totalCoin(%d) coinType(%s) tid(%s) key(%s) offset(%d)", msg.Key, uid, ruid, roomId, giftId, num, totalCoin, coinType, subValue.Body.Tid, msg.Key, msg.Offset)
  262. s.sendGift(context.Background(), uid, giftId, num, totalCoin, coinType, areaV2ParentId, areaV2Id, platform)
  263. case msg, ok := <-freeMsgs:
  264. if !ok {
  265. log.Warn("[service.lottery|giftConsumeProc] giftFreeSub has been closed.")
  266. return
  267. }
  268. var value *info
  269. var subValue *msgContent
  270. err := json.Unmarshal([]byte(msg.Value), &value)
  271. if err != nil {
  272. log.Error("[service.lottery|giftConsumeProc] giftFreeSub message:%s json decode error:%v", msg.Value, err)
  273. continue
  274. }
  275. err = json.Unmarshal([]byte(value.MsgContent), &subValue)
  276. if err != nil {
  277. log.Error("[service.lottery|giftConsumeProc] giftFreeSub message:%s json decode error:%v", msg.Value, err)
  278. continue
  279. }
  280. areaV2Id := subValue.Body.RoomInfo.AreaV2Id
  281. areaV2ParentId := subValue.Body.RoomInfo.AreaV2ParentId
  282. giftId := subValue.Body.GiftId
  283. roomId := subValue.Body.RoomId
  284. num := subValue.Body.Num
  285. uid := subValue.Body.Uid
  286. ruid := subValue.Body.Ruid
  287. totalCoin := subValue.Body.TotalCoin
  288. coinType := subValue.Body.CoinType
  289. platform := subValue.Body.Platform
  290. key := fmt.Sprintf(_sendGiftKey, subValue.Body.Tid)
  291. isGetLock, _, err := s.dao.Lock(context.Background(), key, 86400000, 0, 0)
  292. if err != nil || !isGetLock {
  293. log.Error("[service.lottery|giftConsumeProc Lock Error msgKey(%s) uid(%d) ruid(%d) roomId(%d) giftId(%d) num(%d) totalCoin(%d) coinType(%s) tid(%s) key(%s) offset(%d) err(%v)", msg.Key, uid, ruid, roomId, giftId, num, totalCoin, coinType, subValue.Body.Tid, msg.Key, msg.Offset, err)
  294. continue
  295. }
  296. msg.Commit()
  297. log.Info("[service.lottery|giftConsumeProc] pay-msgKey(%s) uid(%d) ruid(%d) roomId(%d) giftId(%d) num(%d) totalCoin(%d) coinType(%s) tid(%s) key(%s) offset(%d)", msg.Key, uid, ruid, roomId, giftId, num, totalCoin, coinType, subValue.Body.Tid, msg.Key, msg.Offset)
  298. s.sendGift(context.Background(), uid, giftId, num, totalCoin, coinType, areaV2ParentId, areaV2Id, platform)
  299. default:
  300. time.Sleep(time.Second * 3)
  301. continue
  302. }
  303. }
  304. }
  305. func (s *Service) capsuleConsumeProc() {
  306. defer func() {
  307. log.Warn("capsuleConsumeProc exited.")
  308. s.wg.Done()
  309. }()
  310. var (
  311. capsuleMsgs = s.capsuleSub.Messages()
  312. )
  313. log.Info("[service.lottery|capsuleConsumeProc")
  314. for {
  315. select {
  316. case msg, ok := <-capsuleMsgs:
  317. if !ok {
  318. log.Warn("[service.lottery|capsuleConsumeProc] giftPaySub has been closed.")
  319. return
  320. }
  321. var msgContent *info
  322. var value *model.AddCapsule
  323. err := json.Unmarshal([]byte(msg.Value), &msgContent)
  324. if err != nil {
  325. log.Error("[service.lottery|capsuleConsumeProc] json decode error:%v", err)
  326. continue
  327. }
  328. err = json.Unmarshal([]byte(msgContent.MsgContent), &value)
  329. if err != nil {
  330. log.Error("[service.lottery|giftConsumeProc] giftFreeSub message:%s json decode error:%v", msg.Value, err)
  331. continue
  332. }
  333. uid := value.Uid
  334. cType := value.Type
  335. coinId := value.CoinId
  336. num := value.Num
  337. key := fmt.Sprintf(_addCapsuleKey, value.MsgId)
  338. isGetLock, _, err := s.dao.Lock(context.Background(), key, 86400000, 0, 0)
  339. if err != nil || !isGetLock {
  340. log.Error("[service.lottery|capsuleConsumeProc Lock Error msgKey(%s) uid(%d) num(%d) type(%s) coinId(%d) tid(%s) offset(%d) err(%v)", msg.Key, uid, num, cType, coinId, value.MsgId, msg.Offset, err)
  341. continue
  342. }
  343. msg.Commit()
  344. log.Info("[service.lottery|capsuleConsumeProc] msgKey(%s) uid(%d) num(%d) type(%s) coinId(%s) tid(%s) offset(%d)", msg.Key, uid, num, cType, coinId, value.MsgId, msg.Offset)
  345. s.addCapsule(context.Background(), uid, coinId, num)
  346. default:
  347. time.Sleep(time.Second * 3)
  348. continue
  349. }
  350. }
  351. }
  352. // SendGift 送礼增加扭蛋积分
  353. func (s *Service) sendGift(ctx context.Context, uid, giftId, num, totalCoin int64, coinType string, areaV2ParentId, areaV2Id int64, platform string) {
  354. if totalCoin <= 0 {
  355. return
  356. }
  357. coinConfMap, err := s.dao.GetCapsuleConf(ctx)
  358. if err != nil || len(coinConfMap) == 0 {
  359. return
  360. }
  361. var addCoinId = int64(dao.NormalCoinId)
  362. var coinIds = []int64{dao.BlessCoinId, dao.LplCoinId, dao.WeekCoinId, dao.ColorfulCoinId, dao.NormalCoinId}
  363. for _, coinId := range coinIds {
  364. if _, ok := coinConfMap[coinId]; ok {
  365. if coinConfMap[coinId].AreaMap != nil {
  366. _, v2ID := coinConfMap[coinId].AreaMap[areaV2Id]
  367. _, v2ParentID := coinConfMap[coinId].AreaMap[areaV2ParentId]
  368. if v2ID || v2ParentID {
  369. if coinConfMap[coinId].GiftType == dao.CapsuleGiftTypeAll {
  370. addCoinId = coinId
  371. } else if coinConfMap[coinId].GiftType == dao.CapsuleGiftTypeGold {
  372. if coinType == "gold" {
  373. addCoinId = coinId
  374. }
  375. } else if coinConfMap[coinId].GiftType == dao.CapsuleGiftTypeSelected {
  376. if coinConfMap[coinId].GiftMap != nil {
  377. if _, ok := coinConfMap[coinId].GiftMap[giftId]; ok {
  378. addCoinId = coinId
  379. }
  380. }
  381. }
  382. }
  383. }
  384. }
  385. if addCoinId != dao.NormalCoinId {
  386. break
  387. }
  388. }
  389. // 首次赠送
  390. if addCoinId == dao.LplCoinId {
  391. if s.dao.CheckLplFirstGift(ctx, uid, giftId) {
  392. totalCoin = totalCoin + coinConfMap[addCoinId].ChangeNum
  393. }
  394. }
  395. if addCoinId <= dao.ColorfulCoinId {
  396. _, err = s.dao.UpdateScore(ctx, uid, addCoinId, totalCoin, "sendGift", platform, nil, coinConfMap[addCoinId])
  397. } else {
  398. _, err = s.dao.UpdateCapsule(ctx, uid, addCoinId, totalCoin, "sendGift", platform, coinConfMap[addCoinId])
  399. }
  400. if err != nil {
  401. log.Error("[service.lottery|sendGift] UpdateScore type:%d error:%v", addCoinId, err)
  402. return
  403. }
  404. }
  405. func (s *Service) addCapsule(ctx context.Context, uid, coinId, num int64) {
  406. coinConfMap, err := s.dao.GetCapsuleConf(ctx)
  407. if err != nil || len(coinConfMap) == 0 {
  408. return
  409. }
  410. addCoinId := coinId
  411. if _, ok := coinConfMap[addCoinId]; !ok {
  412. return
  413. }
  414. totalCoin := coinConfMap[addCoinId].ChangeNum * num
  415. if addCoinId <= dao.ColorfulCoinId {
  416. _, err = s.dao.UpdateScore(ctx, uid, addCoinId, totalCoin, "sendGift", "", nil, coinConfMap[addCoinId])
  417. } else {
  418. _, err = s.dao.UpdateCapsule(ctx, uid, addCoinId, totalCoin, "sendGift", "", coinConfMap[addCoinId])
  419. }
  420. if err != nil {
  421. log.Error("[service.lottery|addCapsule] UpdateScore type:%d error:%v", addCoinId, err)
  422. return
  423. }
  424. }
  425. //定时重置Capusule
  426. func (s *Service) tickerReloadCapsuleConf(ctx context.Context) {
  427. changeFlag, _ := s.dao.GetCapsuleChangeFlag(ctx)
  428. s.dao.RelaodCapsuleConfig(ctx, changeFlag)
  429. ticker := time.NewTicker(time.Second)
  430. go func() {
  431. for range ticker.C {
  432. redisChangeFlag, _ := s.dao.GetCapsuleChangeFlag(ctx)
  433. capsuleCacheTime, capsuleChangeFlag := s.dao.GetCapsuleChangeInfo(ctx)
  434. if redisChangeFlag != capsuleChangeFlag || time.Now().Unix()-capsuleCacheTime > 60 {
  435. s.dao.RelaodCapsuleConfig(ctx, redisChangeFlag)
  436. }
  437. }
  438. }()
  439. }