123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- package service
- import (
- "context"
- "encoding/json"
- "sync"
- "time"
- "go-common/app/job/main/coin/conf"
- "go-common/app/job/main/coin/dao"
- "go-common/app/job/main/coin/model"
- accrpc "go-common/app/service/main/account/api"
- arcrpc "go-common/app/service/main/archive/api/gorpc"
- coinrpc "go-common/app/service/main/coin/api/gorpc"
- coinmdl "go-common/app/service/main/coin/model"
- memrpc "go-common/app/service/main/member/api"
- "go-common/library/log"
- "go-common/library/queue/databus"
- "go-common/library/queue/databus/databusutil"
- )
- // Service coin job service.
- type Service struct {
- coinDao *dao.Dao
- c *conf.Config
- waiter *sync.WaitGroup
- accRPC accrpc.AccountClient
- memRPC memrpc.MemberClient
- arcRPC *arcrpc.Service2
- coinRPC *coinrpc.Service
- databus *databus.Databus
- group *databusutil.Group
- expGroup *databusutil.Group
- }
- // New new and return service.
- func New(c *conf.Config) (s *Service) {
- s = &Service{
- coinDao: dao.New(c),
- c: c,
- waiter: new(sync.WaitGroup),
- arcRPC: arcrpc.New2(c.ArchiveRPC),
- coinRPC: coinrpc.New(c.CoinRPC),
- }
- var err error
- if s.memRPC, err = memrpc.NewClient(c.MemRPC); err != nil {
- panic(err)
- }
- if s.accRPC, err = accrpc.NewClient(c.AccountRPC); err != nil {
- panic(err)
- }
- s.databus = databus.New(c.Databus)
- g := databusutil.NewGroup(c.Databusutil, databus.New(c.LoginDatabus).Messages())
- g.New = newMsg
- g.Split = split
- g.Do = s.awardDo
- g.Start()
- s.group = g
- eg := databusutil.NewGroup(c.Databusutil, databus.New(c.ExpDatabus).Messages())
- eg.New = newExpMsg
- eg.Split = split
- eg.Do = s.awardDo
- eg.Start()
- s.expGroup = eg
- s.waiter.Add(1)
- go s.consumeproc()
- go s.settleproc()
- return
- }
- func (s *Service) consumeproc() {
- defer s.waiter.Done()
- var (
- msg *databus.Message
- err error
- ok bool
- period *model.CoinSettlePeriod
- ctx = context.TODO()
- )
- for {
- if msg, ok = <-s.databus.Messages(); !ok {
- log.Error("s.databus.Message err(%v)", err)
- return
- }
- r := &coinmdl.Record{}
- if err = json.Unmarshal([]byte(msg.Value), r); err != nil {
- log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
- dao.PromError("msg:JSON")
- continue
- }
- var ip = r.IPV6
- for i := 0; i < 3; i++ {
- if err = s.addCoinExp(ctx, r.Mid, r.AvType, r.Multiply, ip); err != nil {
- time.Sleep(time.Millisecond * 50)
- continue
- }
- break
- }
- if err != nil {
- log.Errorv(ctx, log.KV("log", "fix: addCoinExp"), log.KV("mid", r.Mid), log.KV("type", r.AvType), log.KV("num", r.Multiply), log.KV("ip", ip))
- continue
- }
- if err = s.updateAddCoin(ctx, r); err != nil {
- continue
- }
- at := time.Unix(r.Timestamp, 0)
- if period, err = s.coinDao.HitSettlePeriod(ctx, at); err != nil {
- log.Errorv(ctx, log.KV("log", "s.coinDao.HitCoinPeriod"), log.KV("record", r), log.KV("err", err))
- dao.PromError("service:HitSettlePeriod")
- continue
- }
- for i := 0; ; i++ {
- if err = s.coinDao.UpsertSettle(ctx, period.ID, r.Up, r.Aid, r.AvType, r.Multiply, time.Now()); err != nil {
- log.Error("s.coinDao.UpsertCoinSettle(%d, %d, %d) error(%v)", r.Up, r.Aid, r.Multiply)
- dao.PromError("service:UpsertSettle")
- i++
- if i > 5 {
- // if env.DeployEnv == env.DeployEnvProd {
- // s.moni.Sms(ctx, s.c.Sms.Phone, s.c.Sms.Token, "coin-job upsetSettle fail for 5 time")
- // }
- break
- }
- continue
- }
- break
- }
- log.Info("key: %s,partion:%d,offset:%d success %s", msg.Key, msg.Partition, msg.Offset, msg.Value)
- err = msg.Commit()
- if err != nil {
- log.Error("msg.Commit partition:%d offset:%d err %v", msg.Partition, msg.Offset, err)
- dao.PromError("service:msgCommit")
- }
- }
- }
- // Close close service.
- func (s *Service) Close() {
- s.group.Close()
- s.expGroup.Close()
- s.databus.Close()
- }
- // Wait wait routine unitl all close.
- func (s *Service) Wait() {
- s.waiter.Wait()
- }
- // Ping check service health.
- func (s *Service) Ping(c context.Context) error {
- return s.coinDao.Ping(c)
- }
- func (s *Service) updateAddCoin(c context.Context, record *coinmdl.Record) (err error) {
- if record == nil {
- return
- }
- if err = s.coinRPC.UpdateAddCoin(c, record); err != nil {
- log.Errorv(c, log.KV("log", "UpdateAddCoin"), log.KV("mid", record.Mid), log.KV("record", record))
- dao.PromError("service:updateAddCoin")
- }
- return
- }
|