123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728 |
- package service
- import (
- "context"
- "encoding/json"
- xlog "log"
- "strconv"
- "sync"
- "time"
- "go-common/app/job/main/vip/conf"
- "go-common/app/job/main/vip/dao"
- "go-common/app/job/main/vip/model"
- couponrpc "go-common/app/service/main/coupon/rpc/client"
- v1 "go-common/app/service/main/vip/api"
- "go-common/library/log"
- "go-common/library/queue/databus"
- "github.com/pkg/errors"
- "github.com/robfig/cron"
- )
- const (
- _tableUserInfo = "vip_user_info"
- _tablePayOrder = "vip_pay_order"
- _insertAction = "insert"
- _updateAction = "update"
- _deleteAction = "delete"
- notifyAction = "updateVip"
- _ps = 50
- _defsleepmsec = 100
- )
- //Service vip service
- type Service struct {
- dao *dao.Dao
- c *conf.Config
- //vipRPC *client.Service
- reducePayOrder map[string]*model.VipPayOrder
- appMap map[int64]*model.VipAppInfo
- confMap map[string]*model.VipConfig
- cleanVipCache chan int64
- cleanAppCache chan *model.AppCache
- ds *databus.Databus
- handlerFailPayOrder chan *model.VipPayOrder
- handlerFailUserInfo chan *model.VipUserInfo
- handlerFailRechargeOrder chan *model.VipPayOrder
- handlerFailVipbuy chan *model.VipBuyResq
- handlerInsertOrder chan *model.VipPayOrder
- handlerUpdateOrder chan *model.VipPayOrder
- handlerRechargeOrder chan *model.VipPayOrder
- handlerInsertUserInfo chan *model.VipUserInfo
- handlerUpdateUserInfo chan *model.VipUserInfo
- handlerStationActive chan *model.VipPayOrder
- handlerAutoRenewLog chan *model.VipUserInfo
- handlerAddVipHistory chan *model.VipChangeHistoryMsg
- handlerAddBcoinSalary chan *model.VipBcoinSalaryMsg
- handlerUpdateBcoinSalary chan *model.VipBcoinSalaryMsg
- handlerDelBcoinSalary chan *model.VipBcoinSalaryMsg
- notifycouponchan chan func()
- accLogin *databus.Databus
- frozenDate time.Duration
- newVipDatabus *databus.Databus
- salaryCoupnDatabus *databus.Databus
- accountNoitfyDatabus *databus.Databus
- couponNotifyDatabus *databus.Databus
- autoRenewdDatabus *databus.Databus
- // waiter
- waiter sync.WaitGroup
- closed bool
- sendmsgchan chan func()
- couponRPC *couponrpc.Service
- // vip service
- vipgRPC v1.VipClient
- }
- //New new service
- func New(c *conf.Config) (s *Service) {
- s = &Service{
- c: c,
- dao: dao.New(c),
- //vipRPC: client.New(c.VipRPC),
- cleanVipCache: make(chan int64, 10240),
- confMap: make(map[string]*model.VipConfig),
- cleanAppCache: make(chan *model.AppCache, 10240),
- handlerFailPayOrder: make(chan *model.VipPayOrder, 10240),
- handlerFailUserInfo: make(chan *model.VipUserInfo, 10240),
- handlerFailRechargeOrder: make(chan *model.VipPayOrder, 10240),
- handlerInsertOrder: make(chan *model.VipPayOrder, 10240),
- handlerRechargeOrder: make(chan *model.VipPayOrder, 10240),
- handlerUpdateOrder: make(chan *model.VipPayOrder, 10240),
- handlerInsertUserInfo: make(chan *model.VipUserInfo, 10240),
- handlerUpdateUserInfo: make(chan *model.VipUserInfo, 10240),
- handlerStationActive: make(chan *model.VipPayOrder, 10240),
- handlerAutoRenewLog: make(chan *model.VipUserInfo, 10240),
- handlerAddVipHistory: make(chan *model.VipChangeHistoryMsg, 10240),
- handlerAddBcoinSalary: make(chan *model.VipBcoinSalaryMsg, 10240),
- handlerUpdateBcoinSalary: make(chan *model.VipBcoinSalaryMsg, 10240),
- handlerDelBcoinSalary: make(chan *model.VipBcoinSalaryMsg, 10240),
- handlerFailVipbuy: make(chan *model.VipBuyResq, 10240),
- notifycouponchan: make(chan func(), 10240),
- ds: databus.New(c.Databus.OldVipBinLog),
- newVipDatabus: databus.New(c.Databus.NewVipBinLog),
- accLogin: databus.New(c.Databus.AccLogin),
- frozenDate: time.Duration(c.Property.FrozenDate),
- reducePayOrder: make(map[string]*model.VipPayOrder),
- sendmsgchan: make(chan func(), 10240),
- accountNoitfyDatabus: databus.New(c.Databus.AccountNotify),
- couponRPC: couponrpc.New(c.RPCClient2.Coupon),
- }
- vipgRPC, err := v1.NewClient(c.VipClient)
- if err != nil {
- panic(err)
- }
- s.vipgRPC = vipgRPC
- t := cron.New()
- go s.loadappinfoproc()
- go s.cleanappcacheretryproc()
- go s.cleanvipretryproc()
- go s.sendmessageproc()
- go s.handlerfailpayorderproc()
- go s.handlerfailrechargeorderproc()
- go s.handlerfailuserinfoproc()
- go s.handlerautorenewlogproc()
- go s.handlerdelbcoinproc()
- if c.Databus.SalaryCoupon != nil {
- s.salaryCoupnDatabus = databus.New(c.Databus.SalaryCoupon)
- s.waiter.Add(1)
- go s.salarycouponproc()
- }
- if c.Databus.CouponNotify != nil {
- s.couponNotifyDatabus = databus.New(c.Databus.CouponNotify)
- go s.couponnotifyproc()
- s.waiter.Add(1)
- go s.couponnotifybinlogproc()
- }
- for i := 0; i < s.c.Property.HandlerThread; i++ {
- go s.handlerinsertorderproc()
- go s.handlerupdateorderproc()
- go s.handlerinsertuserinfoproc()
- go s.handlerupdateuserinfoproc()
- go s.handleraddchangehistoryproc()
- go s.handleraddbcoinproc()
- go s.handlerupdatebcoinproc()
- go s.handlerupdaterechargeorderproc()
- }
- for i := 0; i < s.c.Property.ReadThread; i++ {
- go s.readdatabusproc()
- }
- go s.readnewvipdatabusproc()
- if c.Property.FrozenCron != "" {
- go s.accloginproc()
- t.AddFunc(c.Property.FrozenCron, s.unFrozenJob)
- }
- t.AddFunc(c.Property.UpdateUserInfoCron, s.updateUserInfoJob)
- t.AddFunc(c.Property.SalaryVideoCouponCron, s.salaryVideoCouponJob)
- t.AddFunc(c.Property.PushDataCron, s.pushDataJob)
- t.AddFunc(c.Property.EleEompensateCron, s.eleEompensateJob)
- //t.AddFunc(c.Property.HadExpiredMsgCron, s.hadExpiredMsgJob)
- //t.AddFunc(c.Property.WillExpireMsgCron, s.willExpiredMsgJob)
- //t.AddFunc(c.Property.SendMessageCron, s.sendMessageJob)
- //t.AddFunc(c.Property.AutoRenewCron, s.autoRenewJob)
- //t.AddFunc(c.Property.SendBcoinCron, s.sendBcoinJob)
- t.Start()
- go s.consumercheckproc()
- if c.Databus.AutoRenew != nil {
- s.autoRenewdDatabus = databus.New(c.Databus.AutoRenew)
- s.waiter.Add(1)
- go s.retryautorenewpayproc()
- }
- return
- }
- func (s *Service) readnewvipdatabusproc() {
- defer func() {
- if r := recover(); r != nil {
- r = errors.WithStack(r.(error))
- log.Error("Runtime error caught: %+v", r)
- go s.readnewvipdatabusproc()
- }
- }()
- var err error
- for msg := range s.newVipDatabus.Messages() {
- val := msg.Value
- if err = msg.Commit(); err != nil {
- log.Error("readdatabusproc msg.commit() error(%v)", err)
- msg.Commit()
- }
- log.Info("cur consumer new vip db message(%v)", string(msg.Value))
- message := new(model.Message)
- if err = json.Unmarshal(val, message); err != nil {
- log.Error("readnewvipdatabusproc json.unmarshal val(%+v) error(%+v)", string(val), err)
- continue
- }
- if message.Table == "vip_user_info" {
- userInfo := new(model.VipUserInfoNewMsg)
- if err = json.Unmarshal(message.New, userInfo); err != nil {
- log.Error("readdatabusproc json.Unmarshal val(%v) error(%v)", string(message.New), err)
- continue
- }
- vipUser := convertUserInfoByNewMsg(userInfo)
- s.dao.DelInfoCache(context.Background(), vipUser.Mid)
- if message.Action == _insertAction {
- if vipUser.PayType == model.AutoRenew {
- select {
- case s.handlerAutoRenewLog <- vipUser:
- default:
- log.Error("s.handlerAutoRenewLog full!")
- }
- }
- } else if message.Action == _updateAction {
- oldUserMsg := new(model.VipUserInfoNewMsg)
- if err = json.Unmarshal(message.Old, oldUserMsg); err != nil {
- log.Error("readdatabusproc json.Unmarshal val(%v) error(%v)", string(message.Old), err)
- continue
- }
- oldUser := convertUserInfoByNewMsg(oldUserMsg)
- if oldUser.PayType != vipUser.PayType {
- select {
- case s.handlerAutoRenewLog <- vipUser:
- default:
- log.Error("s.handlerAutoRenewLog full update!")
- }
- }
- }
- s.pubAccountNotify(vipUser.Mid)
- }
- }
- }
- func (s *Service) readdatabusproc() {
- defer func() {
- if r := recover(); r != nil {
- r = errors.WithStack(r.(error))
- log.Error("Runtime error caught: %+v", r)
- go s.readdatabusproc()
- }
- }()
- var err error
- for msg := range s.ds.Messages() {
- val := msg.Value
- message := new(model.Message)
- if err = json.Unmarshal(val, message); err != nil {
- log.Error("readdatabusproc json.Unmarshal val(%v) error(%v)", string(val), err)
- if err = msg.Commit(); err != nil {
- log.Error("msg.commit() error(%v)", err)
- }
- continue
- }
- if message.Table == "vip_pay_order" {
- order := new(model.VipPayOrderOldMsg)
- if err = json.Unmarshal(message.New, order); err != nil {
- log.Error("readdatabusproc json.Unmarshal val(%v) error(%v)", string(message.New), err)
- if err = msg.Commit(); err != nil {
- log.Error("msg.commit() error(%v)", err)
- }
- continue
- }
- payOrder := s.convertPayOrder(order)
- if message.Action == "insert" {
- select {
- case s.handlerInsertOrder <- payOrder:
- default:
- xlog.Panic("s.handlerInsertOrder full!")
- }
- } else if message.Action == "update" {
- select {
- case s.handlerUpdateOrder <- payOrder:
- default:
- xlog.Panic("s.handlerUpdateOrder full!")
- }
- }
- } else if message.Table == "vip_recharge_order" {
- order := new(model.VipRechargeOrderMsg)
- if err = json.Unmarshal(message.New, order); err != nil {
- log.Error("readdatabusproc json.Unmarshal val(%v) error(%v)", string(message.New), err)
- if err = msg.Commit(); err != nil {
- log.Error("msg.commit() error(%v)", err)
- }
- continue
- }
- payOrder := s.convertPayOrderByMsg(order)
- if message.Action == "update" {
- select {
- case s.handlerRechargeOrder <- payOrder:
- default:
- xlog.Panic("s.handlerRechargeOrder full!")
- }
- } else if message.Action == "insert" {
- if len(payOrder.ThirdTradeNo) > 0 {
- select {
- case s.handlerRechargeOrder <- payOrder:
- default:
- xlog.Panic("s.handlerRechargeOrder full!")
- }
- }
- }
- } else if message.Table == "vip_user_info" {
- userInfo := new(model.VipUserInfoMsg)
- if err = json.Unmarshal(message.New, userInfo); err != nil {
- log.Error("readdatabusproc json.Unmarshal val(%v) error(%v)", string(message.New), err)
- if err = msg.Commit(); err != nil {
- log.Error("msg.commit() error(%v)", err)
- }
- continue
- }
- vipUser := convertMsgToUserInfo(userInfo)
- if message.Action == "insert" {
- select {
- case s.handlerInsertUserInfo <- vipUser:
- default:
- xlog.Panic("s.handlerInsertUserInfo full!")
- }
- } else if message.Action == "update" {
- oldUser := new(model.VipUserInfoMsg)
- if err = json.Unmarshal(message.Old, oldUser); err != nil {
- log.Error("readdatabusproc json.Unmarshal val(%v) error(%v)", string(message.Old), err)
- if err = msg.Commit(); err != nil {
- log.Error("msg.commit() error(%v)", err)
- }
- continue
- }
- vipUser.OldVer = oldUser.Ver
- select {
- case s.handlerUpdateUserInfo <- vipUser:
- default:
- xlog.Panic("s.handlerUpdateUserInfo full!")
- }
- }
- if !s.grayScope(userInfo.Mid) {
- s.cleanCache(userInfo.Mid)
- }
- } else if message.Table == "vip_change_history" {
- historyMsg := new(model.VipChangeHistoryMsg)
- if err = json.Unmarshal(message.New, historyMsg); err != nil {
- log.Error("readdatabusproc json.Unmarshal val(%v) error(%v)", string(message.New), err)
- if err = msg.Commit(); err != nil {
- log.Error("msg.commit() error(%v)", err)
- }
- continue
- }
- if message.Action == "insert" {
- select {
- case s.handlerAddVipHistory <- historyMsg:
- default:
- xlog.Panic("s.handlerAddVipHistory full!")
- }
- }
- } else if message.Table == "vip_bcoin_salary" {
- bcoinMsg := new(model.VipBcoinSalaryMsg)
- if err = json.Unmarshal(message.New, bcoinMsg); err != nil {
- log.Error("readdatabusproc json.Unmarshal val(%v) error(%v)", string(message.New), err)
- if err = msg.Commit(); err != nil {
- log.Error("msg.commit() error(%v)", err)
- }
- continue
- }
- if message.Action == _insertAction {
- select {
- case s.handlerAddBcoinSalary <- bcoinMsg:
- default:
- xlog.Panic("s.handlerAddBcoinSalary full!")
- }
- } else if message.Action == _updateAction {
- select {
- case s.handlerUpdateBcoinSalary <- bcoinMsg:
- default:
- xlog.Panic("s.handlerUpdateBcoinSalary full!")
- }
- } else if message.Action == _deleteAction {
- select {
- case s.handlerDelBcoinSalary <- bcoinMsg:
- default:
- xlog.Panic("s.handlerDelBcoinSalary full!")
- }
- }
- }
- if err = msg.Commit(); err != nil {
- log.Error("readdatabusproc msg.commit() error(%v)", err)
- msg.Commit()
- }
- log.Info("cur consumer message(%v)", string(msg.Value))
- }
- }
- func (s *Service) cleanCache(mid int64) {
- var (
- hv = new(model.HandlerVip)
- err error
- )
- hv.Type = 2
- hv.Days = 0
- hv.Months = 0
- hv.Mid = mid
- if err = s.cleanCacheAndNotify(context.TODO(), hv); err != nil {
- select {
- case s.cleanVipCache <- hv.Mid:
- default:
- xlog.Panic("s.cleanVipCache full!")
- }
- }
- s.pubAccountNotify(mid)
- }
- func (s *Service) pubAccountNotify(mid int64) (err error) {
- data := new(struct {
- Mid int64 `json:"mid"`
- Action string `json:"action"`
- })
- data.Mid = mid
- data.Action = notifyAction
- if err = s.accountNoitfyDatabus.Send(context.TODO(), strconv.FormatInt(mid, 10), data); err != nil {
- log.Error("send (%+v) error(%+v)", data, err)
- }
- log.Info("send(mid:%+v) data:%+v", mid, data)
- return
- }
- func (s *Service) loadappinfoproc() {
- defer func() {
- if r := recover(); r != nil {
- r = errors.WithStack(r.(error))
- log.Error("Runtime error caught: %+v", r)
- go s.loadappinfoproc()
- }
- }()
- for {
- s.loadAppInfo()
- time.Sleep(time.Minute * 2)
- }
- }
- func (s *Service) loadAppInfo() {
- var (
- res []*model.VipAppInfo
- err error
- )
- if res, err = s.dao.SelAppInfo(context.TODO()); err != nil {
- log.Error("loadAppInfo SelAppInfo error(%v)", err)
- return
- }
- aMap := make(map[int64]*model.VipAppInfo, len(res))
- for _, v := range res {
- aMap[v.ID] = v
- }
- s.appMap = aMap
- bytes, _ := json.Marshal(res)
- log.Info("load app success :%v", string(bytes))
- }
- func (s *Service) cleanvipretryproc() {
- defer func() {
- if r := recover(); r != nil {
- r = errors.WithStack(r.(error))
- log.Error("Runtime error caught: %+v", r)
- go s.cleanvipretryproc()
- }
- }()
- for {
- mid := <-s.cleanVipCache
- s.cleanVipRetry(mid)
- }
- }
- func (s *Service) cleanVipRetry(mid int64) {
- hv := new(model.HandlerVip)
- hv.Type = 2
- hv.Days = 0
- hv.Months = 0
- hv.Mid = mid
- s.dao.DelInfoCache(context.Background(), mid)
- for i := 0; i < s.c.Property.Retry; i++ {
- if err := s.cleanCacheAndNotify(context.TODO(), hv); err == nil {
- break
- }
- s.dao.DelVipInfoCache(context.TODO(), int64(hv.Mid))
- }
- log.Info("handler success cache fail mid(%v)", mid)
- }
- func (s *Service) cleanappcacheretryproc() {
- defer func() {
- if r := recover(); r != nil {
- r = errors.WithStack(r.(error))
- log.Error("Runtime error caught: %+v", r)
- go s.cleanappcacheretryproc()
- }
- }()
- for {
- ac := <-s.cleanAppCache
- s.cleanAppCacheRetry(ac)
- }
- }
- func (s *Service) cleanAppCacheRetry(ac *model.AppCache) {
- appInfo := s.appMap[ac.AppID]
- hv := new(model.HandlerVip)
- hv.Type = 2
- hv.Days = 0
- hv.Months = 0
- hv.Mid = ac.Mid
- for i := 0; i < s.c.Property.Retry; i++ {
- if err := s.dao.SendAppCleanCache(context.TODO(), hv, appInfo); err == nil {
- break
- }
- }
- log.Info("handler success cache app fail appInfo(%v)", ac)
- }
- func (s *Service) salarycouponproc() {
- defer func() {
- if r := recover(); r != nil {
- r = errors.WithStack(r.(error))
- log.Error("Runtime error caught: %+v", r)
- go s.salarycouponproc()
- }
- }()
- defer s.waiter.Done()
- var (
- err error
- msg *databus.Message
- msgChan = s.salaryCoupnDatabus.Messages()
- ok bool
- c = context.Background()
- )
- for {
- msg, ok = <-msgChan
- if !ok || s.closed {
- log.Info("salary coupon msgChan closed")
- return
- }
- msg.Commit()
- v := &model.Message{}
- if err = json.Unmarshal([]byte(msg.Value), v); err != nil {
- log.Error("json.Unmarshal(%v) err(%v)", v, err)
- continue
- }
- if v.Table != _tableUserInfo {
- continue
- }
- nvip := &model.VipUserInfoMsg{}
- if err = json.Unmarshal(v.New, &nvip); err != nil {
- log.Error("salary new json.Unmarshal values(%v),error(%v)", string(v.New), err)
- continue
- }
- ovip := &model.VipUserInfoMsg{}
- if v.Action != _insertAction {
- if err = json.Unmarshal(v.Old, &ovip); err != nil {
- log.Error("salary old json.Unmarshal values(%v),error(%v)", string(v.Old), err)
- continue
- }
- }
- log.Info("salary coupon start mid(%d)", nvip.Mid)
- if _, err = s.SalaryVideoCouponAtOnce(c, nvip, ovip, v.Action); err != nil {
- log.Error("SalaryVideoCouponAtOnce fail(%d) nvip(%v) ovip(%v) %s error(%v)", nvip.Mid, nvip, ovip, v.Action, err)
- continue
- }
- log.Info("salary coupon suc mid(%d)", nvip.Mid)
- }
- }
- func (s *Service) couponnotifybinlogproc() {
- defer func() {
- if r := recover(); r != nil {
- r = errors.WithStack(r.(error))
- log.Error("Runtime error couponnotifybinlogproc caught: %+v", r)
- go s.couponnotifybinlogproc()
- }
- }()
- defer s.waiter.Done()
- var (
- err error
- msg *databus.Message
- msgChan = s.couponNotifyDatabus.Messages()
- ok bool
- c = context.Background()
- )
- for {
- msg, ok = <-msgChan
- if !ok || s.closed {
- log.Info("coupon notify couponnotifybinlogproc msgChan closed")
- return
- }
- if err = msg.Commit(); err != nil {
- log.Error("couponnotifybinlogproc msg.Commit err(%v)", err)
- continue
- }
- log.Info("cur consumer couponnotifybinlogproc(%v)", string(msg.Value))
- v := &model.Message{}
- if err = json.Unmarshal([]byte(msg.Value), v); err != nil {
- log.Error("couponnotifybinlogproc json.Unmarshal(%v) err(%v)", v, err)
- continue
- }
- if v.Table != _tablePayOrder || v.Action != _updateAction {
- continue
- }
- newo := new(model.VipPayOrderNewMsg)
- if err = json.Unmarshal(v.New, newo); err != nil {
- log.Error("couponnotifybinlogproc json.Unmarshal val(%v) error(%v)", string(v.New), err)
- continue
- }
- oldo := new(model.VipPayOrderNewMsg)
- if err = json.Unmarshal(v.Old, oldo); err != nil {
- log.Error("couponnotifybinlogproc json.Unmarshal val(%v) error(%v)", string(v.Old), err)
- continue
- }
- if newo == nil || oldo == nil {
- continue
- }
- if oldo.Status != model.PAYING {
- continue
- }
- if newo.Status != model.SUCCESS && newo.Status != model.FAILED {
- continue
- }
- if newo.CouponMoney <= 0 {
- continue
- }
- s.couponnotify(func() {
- s.CouponNotify(c, newo)
- })
- }
- }
- func (s *Service) updateUserInfoJob() {
- log.Info("update user info job start ....................................")
- s.ScanUserInfo(context.TODO())
- log.Info("update user info job end ........................................")
- }
- func (s *Service) salaryVideoCouponJob() {
- log.Info("salary video coupon job start ....................................")
- var err error
- if ok := s.dao.AddTransferLock(context.TODO(), "_transferLock"); !ok {
- log.Info("salary video coupon job had run ....................................")
- return
- }
- if err = s.ScanSalaryVideoCoupon(context.TODO()); err != nil {
- log.Error("ScanSalaryVideoCoupon error(%v)", err)
- return
- }
- log.Info("salary video coupon job end ........................................")
- }
- //Ping check db live
- func (s *Service) Ping(c context.Context) (err error) {
- return s.dao.Ping(c)
- }
- // Close all resource.
- func (s *Service) Close() {
- defer s.waiter.Wait()
- s.closed = true
- s.salaryCoupnDatabus.Close()
- s.dao.Close()
- s.ds.Close()
- s.newVipDatabus.Close()
- }
- func (s *Service) sendmessageproc() {
- defer func() {
- if x := recover(); x != nil {
- log.Error("service.sendmessageproc panic(%v)", x)
- go s.sendmessageproc()
- log.Info("service.sendmessageproc recover")
- }
- }()
- for {
- f := <-s.sendmsgchan
- f()
- }
- }
- func (s *Service) sendmessage(f func()) {
- defer func() {
- if x := recover(); x != nil {
- log.Error("service.sendmessage panic(%v)", x)
- }
- }()
- select {
- case s.sendmsgchan <- f:
- default:
- log.Error("service.sendmessage chan full")
- }
- }
- func (s *Service) consumercheckproc() {
- for {
- time.Sleep(time.Second)
- log.Info("consumercheckproc chan(cleanVipCache) size: %d", len(s.cleanVipCache))
- log.Info("consumercheckproc chan(cleanAppCache) size: %d", len(s.cleanAppCache))
- log.Info("consumercheckproc chan(handlerFailPayOrder) size: %d", len(s.handlerFailPayOrder))
- log.Info("consumercheckproc chan(handlerFailUserInfo) size: %d", len(s.handlerFailUserInfo))
- log.Info("consumercheckproc chan(handlerFailRechargeOrder) size: %d", len(s.handlerFailRechargeOrder))
- log.Info("consumercheckproc chan(handlerFailVipbuy) size: %d", len(s.handlerFailVipbuy))
- log.Info("consumercheckproc chan(handlerInsertOrder) size: %d", len(s.handlerInsertOrder))
- log.Info("consumercheckproc chan(handlerUpdateOrder) size: %d", len(s.handlerUpdateOrder))
- log.Info("consumercheckproc chan(handlerRechargeOrder) size: %d", len(s.handlerRechargeOrder))
- log.Info("consumercheckproc chan(handlerInsertUserInfo) size: %d", len(s.handlerInsertUserInfo))
- log.Info("consumercheckproc chan(handlerUpdateUserInfo) size: %d", len(s.handlerUpdateUserInfo))
- log.Info("consumercheckproc chan(handlerStationActive) size: %d", len(s.handlerStationActive))
- log.Info("consumercheckproc chan(handlerAutoRenewLog) size: %d", len(s.handlerAutoRenewLog))
- log.Info("consumercheckproc chan(handlerAddVipHistory) size: %d", len(s.handlerAddVipHistory))
- log.Info("consumercheckproc chan(handlerAddBcoinSalary) size: %d", len(s.handlerAddBcoinSalary))
- log.Info("consumercheckproc chan(handlerUpdateBcoinSalary) size: %d", len(s.handlerUpdateBcoinSalary))
- log.Info("consumercheckproc chan(handlerDelBcoinSalary) size: %d", len(s.handlerDelBcoinSalary))
- log.Info("consumercheckproc chan(sendmsgchan) size: %d", len(s.sendmsgchan))
- log.Info("consumercheckproc chan(notifycouponchan) size: %d", len(s.notifycouponchan))
- }
- }
|