123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462 |
- package service
- import (
- "bytes"
- "context"
- "encoding/json"
- "fmt"
- "go-common/app/job/live/xlottery/internal/model"
- "go-common/library/ecode"
- "go-common/library/queue/databus/report"
- "net/http"
- "strconv"
- "sync"
- "time"
- "github.com/robfig/cron"
- "go-common/app/job/live/xlottery/internal/conf"
- "go-common/app/job/live/xlottery/internal/dao"
- "go-common/library/log"
- bm "go-common/library/net/http/blademaster"
- "go-common/library/queue/databus"
- )
- // Service struct
- type Service struct {
- c *conf.Config
- dao *dao.Dao
- cron *cron.Cron
- giftPaySub *databus.Databus
- giftFreeSub *databus.Databus
- capsuleSub *databus.Databus
- ExpireCountFrequency string
- CouponRetryFrequency string
- httpClient *bm.Client
- wg *sync.WaitGroup
- }
- const _sendGiftKey = "lottery:gift:msgid:%s"
- const _addCapsuleKey = "lottery:gift:msgid:%s"
- type info struct {
- MsgContent string `json:"msg_content"`
- }
- type msgContent struct {
- Body *body `json:"body"`
- }
- type body struct {
- GiftId int64 `json:"giftid"`
- RoomId int64 `json:"roomid"`
- Num int64 `json:"num"`
- Uid int64 `json:"uid"`
- Ruid int64 `json:"ruid"`
- TotalCoin int64 `json:"totalCoin"`
- CoinType string `json:"coinType"`
- Tid string `json:"tid"`
- Platform string `json:"platform"`
- RoomInfo *roomInfo `json:"roomInfo"`
- }
- type roomInfo struct {
- AreaV2Id int64 `json:"area_v2_id"`
- AreaV2ParentId int64 `json:"area_v2_parent_id"`
- }
- // New init
- func New(c *conf.Config) (s *Service) {
- s = &Service{
- c: c,
- dao: dao.New(c),
- cron: cron.New(),
- giftPaySub: databus.New(c.GiftPaySub),
- giftFreeSub: databus.New(c.GiftFreeSub),
- capsuleSub: databus.New(c.AddCapsuleSub),
- wg: new(sync.WaitGroup),
- ExpireCountFrequency: c.Cfg.ExpireCountFrequency,
- CouponRetryFrequency: c.Cfg.CouponRetryFrequency,
- httpClient: bm.NewClient(c.HTTPClient),
- }
- report.InitUser(conf.Conf.UserReport)
- dao.InitAPI()
- s.addCrontab()
- s.cron.Start()
- s.tickerReloadCapsuleConf(context.TODO())
- log.Info("[service.lottery| 11start")
- var i int64
- for i = 0; i < c.Cfg.ConsumerProcNum; i++ {
- s.wg.Add(1)
- go s.giftConsumeProc()
- }
- s.wg.Add(1)
- go s.capsuleConsumeProc()
- return s
- }
- // Ping Service
- func (s *Service) Ping(ctx context.Context) (err error) {
- return s.dao.Ping(ctx)
- }
- // Close Service
- func (s *Service) Close() {
- s.subClose()
- s.wg.Wait()
- s.dao.Close()
- }
- // subClose Close all sub channels
- func (s *Service) subClose() {
- s.giftPaySub.Close()
- s.giftFreeSub.Close()
- s.capsuleSub.Close()
- }
- func (s *Service) addCrontab() (err error) {
- //spew.Dump(s.ExpireCountFrequency)
- err = s.cron.AddFunc(s.ExpireCountFrequency, s.TransCapsule)
- if err != nil {
- log.Error("cron job transCapsule error(%v)", err)
- }
- err = s.cron.AddFunc(s.CouponRetryFrequency, s.CouponRetry)
- if err != nil {
- log.Error("cron job couponRetry error(%v)", err)
- }
- return
- }
- // CouponRetry 抽奖券重试
- func (s *Service) CouponRetry() {
- var ctx = context.Background()
- if s.c.CouponConf == nil || s.c.CouponConf.Url == "" || len(s.c.CouponConf.Coupon) == 0 {
- log.Error("[service.capsule | sendAward] couponConf is empty")
- return
- }
- nowTime := time.Now()
- log.Info("[service.service | couponRetry]couponRetry %s", nowTime.Format("2006-01-02 15:04:05"))
- extraData, _ := s.dao.GetCouponData(ctx)
- if len(extraData) == 0 {
- return
- }
- for _, extra := range extraData {
- s.dao.UpdateExtraMtimeById(ctx, extra.Id, nowTime.Format("2006-01-02 15:04:05"))
- awardType := extra.ItemExtra
- if _, ok := s.c.CouponConf.Coupon[awardType]; !ok {
- log.Error("[service.capsule | sendAward] couponConf.coupon is empty %s", awardType)
- continue
- }
- uid := extra.Uid
- var res struct {
- Code int `json:"code"`
- Msg string `json:"message"`
- }
- endPoint := s.c.CouponConf.Url
- postJson := make(map[string]interface{})
- postJson["mid"] = uid
- postJson["couponId"] = s.c.CouponConf.Coupon[awardType]
- bytesData, err := json.Marshal(postJson)
- if err != nil {
- log.Error("[service.capsule | sendAward] json.Marshal(%v) error(%v)", postJson, err)
- continue
- }
- req, err := http.NewRequest("POST", endPoint, bytes.NewReader(bytesData))
- if err != nil {
- log.Error("[service.capsule | sendAward] http.NewRequest(%v) url(%v) error(%v)", postJson, endPoint, err)
- continue
- }
- req.Header.Add("Content-Type", "application/json;charset=UTF-8")
- log.Info("coupon vip mid(%d) couponID(%s)", uid, s.c.CouponConf.Coupon[awardType])
- if err = s.httpClient.Do(ctx, req, &res); err != nil {
- log.Error("[service.capsule | sendAward] s.client.Do error(%v)", err)
- continue
- }
- if res.Code != 0 && res.Code != 83110005 {
- err = ecode.Int(res.Code)
- log.Error("coupon vip url(%v) res code(%d)", endPoint, res.Code)
- continue
- }
- log.Info("[service.capsule | sendAward] s.client.Do endpoint (%v) req (%v)", endPoint, postJson)
- s.dao.UpdateExtraValueById(ctx, extra.Id, 1)
- }
- }
- // TransCapsule 转换扭蛋币
- func (s *Service) TransCapsule() {
- var ctx = context.Background()
- pools, err := s.dao.GetActiveColorPool(ctx)
- if err != nil {
- log.Error("[service.service | TransCapsule]CronJob TransCapsule GetActiveColorPool error(%v)", err)
- return
- }
- nowTime := time.Now().Add(-(60 * time.Second)).Format("2006-01-02 15:04")
- log.Info("[service.service | TransCapsule]TranCapsule %s", nowTime)
- flag := 0
- coinId := int64(0)
- for _, pool := range pools {
- if pool.EndTime == 0 {
- continue
- } else {
- endTimeUnix := time.Unix(pool.EndTime, 0)
- endTime := endTimeUnix.Format("2006-01-02 15:04")
- if endTime == nowTime {
- flag = 1
- coinId = pool.CoinId
- }
- }
- }
- if flag == 1 {
- colorChangeNum, err := s.dao.GetTransNum(ctx, coinId)
- if err != nil || colorChangeNum == 0 {
- log.Error("[service.service | TransCapsule] GetTransNum colorChangeNum: %d, err: %v", colorChangeNum, err)
- return
- }
- normalChangeNum, err := s.dao.GetTransNum(ctx, dao.NormalCoinId)
- if err != nil || normalChangeNum == 0 {
- log.Error("[service.service | TransCapsule] GetTransNum normalChangeNum: %d, err: %v", normalChangeNum, err)
- return
- }
- for i := int64(0); i < 10; i++ {
- err := s.dao.TransCapsule(ctx, strconv.FormatInt(i, 10), colorChangeNum, normalChangeNum)
- if err != nil {
- log.Error("[service.service | TransCapsule]TranCapsule error %v", err)
- return
- }
- log.Info("[service.service | TransCapsule]TranCapsule %s", strconv.FormatInt(i, 10))
- }
- }
- }
- // expCanalConsumeproc consumer archive
- func (s *Service) giftConsumeProc() {
- defer func() {
- log.Warn("giftConsumeProc exited.")
- s.wg.Done()
- }()
- var (
- payMsgs = s.giftPaySub.Messages()
- freeMsgs = s.giftFreeSub.Messages()
- )
- log.Info("[service.lottery|giftConsumeProc")
- for {
- select {
- case msg, ok := <-payMsgs:
- if !ok {
- log.Warn("[service.lottery|giftConsumeProc] giftPaySub has been closed.")
- return
- }
- var value *info
- var subValue *msgContent
- err := json.Unmarshal([]byte(msg.Value), &value)
- if err != nil {
- log.Error("[service.lottery|giftConsumeProc] giftPaySub json decode error:%v", err)
- continue
- }
- err = json.Unmarshal([]byte(value.MsgContent), &subValue)
- if err != nil {
- log.Error("[service.lottery|giftConsumeProc] giftPaySub json decode error:%v", err)
- continue
- }
- areaV2Id := subValue.Body.RoomInfo.AreaV2Id
- areaV2ParentId := subValue.Body.RoomInfo.AreaV2ParentId
- giftId := subValue.Body.GiftId
- roomId := subValue.Body.RoomId
- num := subValue.Body.Num
- uid := subValue.Body.Uid
- ruid := subValue.Body.Ruid
- totalCoin := subValue.Body.TotalCoin
- coinType := subValue.Body.CoinType
- platform := subValue.Body.Platform
- key := fmt.Sprintf(_sendGiftKey, subValue.Body.Tid)
- isGetLock, _, err := s.dao.Lock(context.Background(), key, 86400000, 0, 0)
- if err != nil || !isGetLock {
- log.Error("[service.lottery|giftConsumeProc Lock Error msgKey(%s) uid(%d) ruid(%d) roomId(%d) giftId(%d) num(%d) totalCoin(%d) coinType(%s) tid(%s) key(%s) offset(%d) err(%v)", msg.Key, uid, ruid, roomId, giftId, num, totalCoin, coinType, subValue.Body.Tid, msg.Key, msg.Offset, err)
- continue
- }
- msg.Commit()
- log.Info("[service.lottery|giftConsumeProc] pay-msgKey(%s) uid(%d) ruid(%d) roomId(%d) giftId(%d) num(%d) totalCoin(%d) coinType(%s) tid(%s) key(%s) offset(%d)", msg.Key, uid, ruid, roomId, giftId, num, totalCoin, coinType, subValue.Body.Tid, msg.Key, msg.Offset)
- s.sendGift(context.Background(), uid, giftId, num, totalCoin, coinType, areaV2ParentId, areaV2Id, platform)
- case msg, ok := <-freeMsgs:
- if !ok {
- log.Warn("[service.lottery|giftConsumeProc] giftFreeSub has been closed.")
- return
- }
- var value *info
- var subValue *msgContent
- err := json.Unmarshal([]byte(msg.Value), &value)
- if err != nil {
- log.Error("[service.lottery|giftConsumeProc] giftFreeSub message:%s json decode error:%v", msg.Value, err)
- continue
- }
- err = json.Unmarshal([]byte(value.MsgContent), &subValue)
- if err != nil {
- log.Error("[service.lottery|giftConsumeProc] giftFreeSub message:%s json decode error:%v", msg.Value, err)
- continue
- }
- areaV2Id := subValue.Body.RoomInfo.AreaV2Id
- areaV2ParentId := subValue.Body.RoomInfo.AreaV2ParentId
- giftId := subValue.Body.GiftId
- roomId := subValue.Body.RoomId
- num := subValue.Body.Num
- uid := subValue.Body.Uid
- ruid := subValue.Body.Ruid
- totalCoin := subValue.Body.TotalCoin
- coinType := subValue.Body.CoinType
- platform := subValue.Body.Platform
- key := fmt.Sprintf(_sendGiftKey, subValue.Body.Tid)
- isGetLock, _, err := s.dao.Lock(context.Background(), key, 86400000, 0, 0)
- if err != nil || !isGetLock {
- log.Error("[service.lottery|giftConsumeProc Lock Error msgKey(%s) uid(%d) ruid(%d) roomId(%d) giftId(%d) num(%d) totalCoin(%d) coinType(%s) tid(%s) key(%s) offset(%d) err(%v)", msg.Key, uid, ruid, roomId, giftId, num, totalCoin, coinType, subValue.Body.Tid, msg.Key, msg.Offset, err)
- continue
- }
- msg.Commit()
- log.Info("[service.lottery|giftConsumeProc] pay-msgKey(%s) uid(%d) ruid(%d) roomId(%d) giftId(%d) num(%d) totalCoin(%d) coinType(%s) tid(%s) key(%s) offset(%d)", msg.Key, uid, ruid, roomId, giftId, num, totalCoin, coinType, subValue.Body.Tid, msg.Key, msg.Offset)
- s.sendGift(context.Background(), uid, giftId, num, totalCoin, coinType, areaV2ParentId, areaV2Id, platform)
- default:
- time.Sleep(time.Second * 3)
- continue
- }
- }
- }
- func (s *Service) capsuleConsumeProc() {
- defer func() {
- log.Warn("capsuleConsumeProc exited.")
- s.wg.Done()
- }()
- var (
- capsuleMsgs = s.capsuleSub.Messages()
- )
- log.Info("[service.lottery|capsuleConsumeProc")
- for {
- select {
- case msg, ok := <-capsuleMsgs:
- if !ok {
- log.Warn("[service.lottery|capsuleConsumeProc] giftPaySub has been closed.")
- return
- }
- var msgContent *info
- var value *model.AddCapsule
- err := json.Unmarshal([]byte(msg.Value), &msgContent)
- if err != nil {
- log.Error("[service.lottery|capsuleConsumeProc] json decode error:%v", err)
- continue
- }
- err = json.Unmarshal([]byte(msgContent.MsgContent), &value)
- if err != nil {
- log.Error("[service.lottery|giftConsumeProc] giftFreeSub message:%s json decode error:%v", msg.Value, err)
- continue
- }
- uid := value.Uid
- cType := value.Type
- coinId := value.CoinId
- num := value.Num
- key := fmt.Sprintf(_addCapsuleKey, value.MsgId)
- isGetLock, _, err := s.dao.Lock(context.Background(), key, 86400000, 0, 0)
- if err != nil || !isGetLock {
- log.Error("[service.lottery|capsuleConsumeProc Lock Error msgKey(%s) uid(%d) num(%d) type(%s) coinId(%d) tid(%s) offset(%d) err(%v)", msg.Key, uid, num, cType, coinId, value.MsgId, msg.Offset, err)
- continue
- }
- msg.Commit()
- log.Info("[service.lottery|capsuleConsumeProc] msgKey(%s) uid(%d) num(%d) type(%s) coinId(%s) tid(%s) offset(%d)", msg.Key, uid, num, cType, coinId, value.MsgId, msg.Offset)
- s.addCapsule(context.Background(), uid, coinId, num)
- default:
- time.Sleep(time.Second * 3)
- continue
- }
- }
- }
- // SendGift 送礼增加扭蛋积分
- func (s *Service) sendGift(ctx context.Context, uid, giftId, num, totalCoin int64, coinType string, areaV2ParentId, areaV2Id int64, platform string) {
- if totalCoin <= 0 {
- return
- }
- coinConfMap, err := s.dao.GetCapsuleConf(ctx)
- if err != nil || len(coinConfMap) == 0 {
- return
- }
- var addCoinId = int64(dao.NormalCoinId)
- var coinIds = []int64{dao.BlessCoinId, dao.LplCoinId, dao.WeekCoinId, dao.ColorfulCoinId, dao.NormalCoinId}
- for _, coinId := range coinIds {
- if _, ok := coinConfMap[coinId]; ok {
- if coinConfMap[coinId].AreaMap != nil {
- _, v2ID := coinConfMap[coinId].AreaMap[areaV2Id]
- _, v2ParentID := coinConfMap[coinId].AreaMap[areaV2ParentId]
- if v2ID || v2ParentID {
- if coinConfMap[coinId].GiftType == dao.CapsuleGiftTypeAll {
- addCoinId = coinId
- } else if coinConfMap[coinId].GiftType == dao.CapsuleGiftTypeGold {
- if coinType == "gold" {
- addCoinId = coinId
- }
- } else if coinConfMap[coinId].GiftType == dao.CapsuleGiftTypeSelected {
- if coinConfMap[coinId].GiftMap != nil {
- if _, ok := coinConfMap[coinId].GiftMap[giftId]; ok {
- addCoinId = coinId
- }
- }
- }
- }
- }
- }
- if addCoinId != dao.NormalCoinId {
- break
- }
- }
- // 首次赠送
- if addCoinId == dao.LplCoinId {
- if s.dao.CheckLplFirstGift(ctx, uid, giftId) {
- totalCoin = totalCoin + coinConfMap[addCoinId].ChangeNum
- }
- }
- if addCoinId <= dao.ColorfulCoinId {
- _, err = s.dao.UpdateScore(ctx, uid, addCoinId, totalCoin, "sendGift", platform, nil, coinConfMap[addCoinId])
- } else {
- _, err = s.dao.UpdateCapsule(ctx, uid, addCoinId, totalCoin, "sendGift", platform, coinConfMap[addCoinId])
- }
- if err != nil {
- log.Error("[service.lottery|sendGift] UpdateScore type:%d error:%v", addCoinId, err)
- return
- }
- }
- func (s *Service) addCapsule(ctx context.Context, uid, coinId, num int64) {
- coinConfMap, err := s.dao.GetCapsuleConf(ctx)
- if err != nil || len(coinConfMap) == 0 {
- return
- }
- addCoinId := coinId
- if _, ok := coinConfMap[addCoinId]; !ok {
- return
- }
- totalCoin := coinConfMap[addCoinId].ChangeNum * num
- if addCoinId <= dao.ColorfulCoinId {
- _, err = s.dao.UpdateScore(ctx, uid, addCoinId, totalCoin, "sendGift", "", nil, coinConfMap[addCoinId])
- } else {
- _, err = s.dao.UpdateCapsule(ctx, uid, addCoinId, totalCoin, "sendGift", "", coinConfMap[addCoinId])
- }
- if err != nil {
- log.Error("[service.lottery|addCapsule] UpdateScore type:%d error:%v", addCoinId, err)
- return
- }
- }
- //定时重置Capusule
- func (s *Service) tickerReloadCapsuleConf(ctx context.Context) {
- changeFlag, _ := s.dao.GetCapsuleChangeFlag(ctx)
- s.dao.RelaodCapsuleConfig(ctx, changeFlag)
- ticker := time.NewTicker(time.Second)
- go func() {
- for range ticker.C {
- redisChangeFlag, _ := s.dao.GetCapsuleChangeFlag(ctx)
- capsuleCacheTime, capsuleChangeFlag := s.dao.GetCapsuleChangeInfo(ctx)
- if redisChangeFlag != capsuleChangeFlag || time.Now().Unix()-capsuleCacheTime > 60 {
- s.dao.RelaodCapsuleConfig(ctx, redisChangeFlag)
- }
- }
- }()
- }
|