123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729 |
- package service
- import (
- "context"
- "encoding/json"
- "runtime"
- "strconv"
- "sync"
- "time"
- actrpc "go-common/app/interface/main/activity/rpc/client"
- artmdl "go-common/app/interface/openplatform/article/model"
- artrpc "go-common/app/interface/openplatform/article/rpc/client"
- "go-common/app/job/main/activity/conf"
- "go-common/app/job/main/activity/dao/bnj"
- "go-common/app/job/main/activity/dao/dm"
- "go-common/app/job/main/activity/dao/kfc"
- "go-common/app/job/main/activity/dao/like"
- kfcmdl "go-common/app/job/main/activity/model/kfc"
- l "go-common/app/job/main/activity/model/like"
- "go-common/app/job/main/activity/model/match"
- "go-common/app/service/main/account/api"
- arcapi "go-common/app/service/main/archive/api"
- arcrpc "go-common/app/service/main/archive/api/gorpc"
- comarcmdl "go-common/app/service/main/archive/model/archive"
- "go-common/app/service/main/coin/api/gorpc"
- "go-common/library/log"
- "go-common/library/queue/databus"
- "github.com/robfig/cron"
- )
- const (
- //_startVoteP = 2
- //_startVote = 3
- //_endingVote = 4
- //_endVote = 5
- //_goOn = 6
- //_next = 7
- _matchObjTable = "act_matchs_object"
- _subjectTable = "act_subject"
- _likesTable = "likes"
- _likeContentTable = "like_content"
- _likeActionTable = "like_action"
- //_vipActOrderTable = "vip_order_activity_record"
- _objectPieceSize = 100
- _retryTimes = 3
- _typeArc = "archive"
- _typeArt = "article"
- _sharding = 10
- )
- // Service service
- type Service struct {
- c *conf.Config
- dao *like.Dao
- bnj *bnj.Dao
- dm *dm.Dao
- kfcDao *kfc.Dao
- // waiter
- waiter sync.WaitGroup
- closed bool
- // cache: type, upper
- // arc rpc
- arcRPC *arcrpc.Service2
- coinRPC *coin.Service
- actRPC *actrpc.Service
- articleRPC *artrpc.Service
- //grpc
- accClient api.AccountClient
- // databus
- actSub *databus.Databus
- bnjSub *databus.Databus
- // vip binlog databus
- //vipSub *databus.Databus
- kfcSub *databus.Databus
- kfcActionCh []chan *kfcmdl.CouponMsg
- kfcShare int
- subActionCh []chan *l.Action
- actionSM []map[int64]*l.LastTmStat
- // bnj
- bnjMaxSecond int64
- bnjLessSecond int64
- bnjTimeFinish int64
- bnjMsgFlagMap map[int]int64
- bnjMsgFlagMu sync.Mutex
- bnjWxMsgFlagMap map[int]int64
- bnjWxMsgFlagMu sync.Mutex
- // cron
- cron *cron.Cron
- }
- // New is archive service implementation.
- func New(c *conf.Config) (s *Service) {
- s = &Service{
- c: c,
- dao: like.New(c),
- dm: dm.New(c),
- bnj: bnj.New(c),
- kfcDao: kfc.New(c),
- arcRPC: arcrpc.New2(c.ArchiveRPC),
- articleRPC: artrpc.New(c.ArticleRPC),
- coinRPC: coin.New(c.CoinRPC),
- actRPC: actrpc.New(c.ActRPC),
- actSub: databus.New(c.ActSub),
- bnjSub: databus.New(c.BnjSub),
- //vipSub: databus.New(c.VipSub),
- kfcSub: databus.New(c.KfcSub),
- cron: cron.New(),
- }
- var err error
- if s.accClient, err = api.NewClient(c.AccClient); err != nil {
- panic(err)
- }
- if s.c.Bnj2019.MsgSpec != "" {
- if err = s.cron.AddFunc(s.c.Bnj2019.MsgSpec, s.cronInformationMessage); err != nil {
- panic(err)
- }
- log.Info("cronInformationMessage init")
- s.cron.Start()
- }
- //time.Sleep(2 * time.Second)
- //subject, err := s.sub(context.Background(), c.Rule.BroadcastSid)
- //if err != nil {
- // log.Error("error(%v)", err)
- // return
- //}
- //log.Info("start-subject")
- //log.Info("subject(%v)", subject)
- //log.Info("end-subject")
- //if subject != nil {
- // go s.genesis(subject)
- //}
- s.bnjMsgFlagMap = make(map[int]int64, len(bnjSteps))
- s.bnjWxMsgFlagMap = make(map[int]int64, len(bnjSteps))
- for _, step := range bnjSteps {
- s.bnjMsgFlagMap[step] = 0
- s.bnjWxMsgFlagMap[step] = 0
- }
- for i := 0; i < _sharding; i++ {
- s.subActionCh = append(s.subActionCh, make(chan *l.Action, 10240))
- s.actionSM = append(s.actionSM, map[int64]*l.LastTmStat{})
- s.waiter.Add(1)
- go s.actionDealProc(i)
- }
- //s.waiter.Add(1)
- //go s.vipCanal()
- s.waiter.Add(1)
- go s.consumeCanal()
- go s.subjectStat(s.c.Rule.ArcObjStatSid, _typeArc)
- go s.subjectStat(s.c.Rule.ArtObjStatSid, _typeArt)
- go s.kingStoryTotalStat(s.c.Rule.KingStorySid)
- go s.subsRankproc()
- go s.initBnjSecond()
- if runtime.NumCPU() <= 4 {
- s.kfcShare = 4
- } else if runtime.NumCPU() > 32 {
- s.kfcShare = 32
- } else {
- s.kfcShare = runtime.NumCPU()
- }
- for j := 0; j < s.kfcShare; j++ {
- s.kfcActionCh = append(s.kfcActionCh, make(chan *kfcmdl.CouponMsg, 10240))
- s.waiter.Add(1)
- go s.kfcActionDeal(j)
- }
- s.waiter.Add(1)
- go s.kfcCanal()
- return s
- }
- func (s *Service) likeArc(c context.Context, sub *l.Subject) (res *l.Subject, err error) {
- if sub != nil {
- if sub.ID == 0 {
- res = nil
- } else {
- res = sub
- var (
- ok bool
- arcs map[int64]*arcapi.Arc
- aids []int64
- )
- for _, l := range res.List {
- aids = append(aids, l.Wid)
- }
- argAids := &comarcmdl.ArgAids2{
- Aids: aids,
- }
- if arcs, err = s.arcRPC.Archives3(c, argAids); err != nil {
- log.Error("s.arcRPC.Archives(arcAids:(%v), arcs), err(%v)", aids, err)
- return
- }
- for _, l := range res.List {
- if l.Archive, ok = arcs[l.Wid]; !ok {
- log.Info("s.arcs.wid:(%d), (%v)", l.Wid, ok)
- continue
- }
- }
- }
- }
- return
- }
- //func (s *Service) genesis(l *l.Subject) {
- // var (
- // index int
- // nowTime, stage, yes, no, next int64
- // err error
- // c = context.Background()
- // )
- // log.Info("st")
- // for {
- // if time.Now().Unix() >= l.Stime.Time().Unix() {
- // break
- // }
- // time.Sleep(time.Second)
- // }
- // lstime := map[string]interface{}{
- // "aid": 0,
- // "time": 0,
- // "index": 0,
- // "stage": 0,
- // }
- // go s.inLtime(lstime, l.ID)
- // for i, a := range l.List {
- // arg1 := &comarcmdl.ArgAid2{Aid: a.Archive.Aid}
- // arc, errRPC := s.arcRPC.Archive3(c, arg1)
- // if errRPC != nil {
- // log.Error("act-job s.arcRPC.Archive3(%v) error(%v)", arg1, errRPC)
- // errRPC = nil
- // continue
- // }
- // if arc.State < 0 && arc.State != -6 {
- // log.Error("act-job s.arcRPC.Archive3(%v) stat err", arg1)
- // }
- // index = i
- // nowTime = 0
- // stage = 0
- // next = 0
- // time.Sleep(time.Duration(l.Ltime) * time.Second)
- // log.Info("aid sleep")
- // aidTime := time.Now().Unix()
- // ltime := map[string]interface{}{
- // "aid": a.Archive.Aid,
- // "time": aidTime,
- // "index": i,
- // "stage": stage,
- // "title": arc.Title,
- // "author": arc.Author.Name,
- // "tname": arc.TypeName,
- // }
- // go s.inLtime(ltime, l.ID)
- // for {
- // fmt.Println(stage)
- // log.Info("s.stage{%d}", stage)
- // //演出开始
- // ltime := map[string]interface{}{
- // "aid": a.Archive.Aid,
- // "time": aidTime,
- // "index": i,
- // "stage": stage,
- // "title": arc.Title,
- // "author": arc.Author.Name,
- // "tname": arc.TypeName,
- // }
- // go s.inLtime(ltime, l.ID)
- // tp := l.Interval - l.Ltime
- // nowTime += tp
- // time.Sleep(time.Duration(tp) * time.Second)
- // //预备投票
- // sdm := &dmm.ActDM{Act: _startVoteP, Aid: a.Archive.Aid, Next: next, Yes: 0, No: 0, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
- // go s.brodcast(sdm)
- // log.Info("act:1")
- // go s.dao.CreateSelection(c, a.Archive.Aid, stage)
- // nowTime += l.Ltime
- // time.Sleep(time.Duration(l.Ltime) * time.Second)
- // //投票开始
- // interval := &dmm.ActDM{Act: _startVote, Aid: a.Archive.Aid, Next: next, Yes: 0, No: 0, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
- // go s.brodcast(interval)
- // log.Info("act:2")
- // tl := l.Tlimit - l.Ltime
- // nowTime += tl
- // time.Sleep(time.Duration(tl) * time.Second)
- // //投票预结束
- // intervalP := &dmm.ActDM{Act: _endingVote, Aid: a.Archive.Aid, Next: next, Yes: 0, No: 0, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
- // log.Info("act:3")
- // go s.brodcast(intervalP)
- // nowTime += l.Ltime
- // time.Sleep(time.Duration(l.Ltime) * time.Second)
- // //投票结果
- // if yes, no, err = s.dao.Selection(c, a.Archive.Aid, stage); err != nil {
- // log.Error("s.dao.Selection() error(%v)", err)
- // return
- // }
- // goNext := true
- // if yes != 0 || no != 0 {
- // goNext = (float64(no)/float64(yes+no)*100 > 40)
- // }
- // if goNext {
- // next = 1
- // } else {
- // next = 0
- // }
- // intervalEnd := &dmm.ActDM{Act: _endVote, Aid: a.Archive.Aid, Next: next, Yes: yes, No: no, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
- // go s.brodcast(intervalEnd)
- // log.Info("act:4")
- // nowTime += l.Ltime
- // time.Sleep(time.Duration(l.Ltime) * time.Second)
- // go s.inOnlinelog(c, l.ID, a.Archive.Aid, stage, yes, no)
- // if goNext {
- // tlimit := &dmm.ActDM{Act: _next, Aid: a.Archive.Aid, Next: next, Yes: yes, No: no, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
- // go s.brodcast(tlimit)
- // log.Info("act:6")
- // nowTime += l.Ltime
- // ldtime := map[string]interface{}{
- // "aid": 0,
- // "time": time.Now().Unix() + l.Ltime,
- // "index": i + 1,
- // "stage": 0,
- // "title": arc.Title,
- // "author": arc.Author.Name,
- // "tname": arc.TypeName,
- // }
- // go s.inLtime(ldtime, l.ID)
- // time.Sleep(time.Duration(l.Ltime) * time.Second)
- // break
- // }
- // tlimit := &dmm.ActDM{Act: _goOn, Aid: a.Archive.Aid, Next: next, Yes: yes, No: no, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
- // log.Info("bro:%v", tlimit)
- // go s.brodcast(tlimit)
- // log.Info("act:5")
- // //投票结果判断
- // stage++
- // if a.Archive.Duration-nowTime < 60 {
- // go s.inOnlinelog(c, l.ID, a.Archive.Aid, 100, 0, 0)
- // time.Sleep(time.Duration(a.Archive.Duration-nowTime) * time.Second)
- // break
- // }
- // }
- // }
- // ltime := map[string]interface{}{
- // "aid": 0,
- // "time": time.Now().Unix(),
- // "index": index + 1,
- // "stage": 0,
- // "title": "",
- // "author": "",
- // "tname": "",
- // }
- // go s.inLtime(ltime, l.ID)
- // log.Info("end")
- //}
- //
- //func (s *Service) inOnlinelog(c context.Context, sid, aid, stage, yes, no int64) {
- // if row, err := s.dao.InOnlinelog(c, sid, aid, stage, yes, no); err != nil {
- // log.Error("s.dao.inOnlinelog, err(%v) row(%v)", err, row)
- // }
- //}
- //
- //func (s *Service) inLtime(lt map[string]interface{}, sid int64) {
- // var v, err = json.Marshal(lt)
- // if err != nil {
- // log.Error("s.genesis.inLtime.json.Marshal(dm:(%v)), err(%v)", v, err)
- // return
- // }
- // s.dao.RbSet(context.Background(), "ltime:"+strconv.FormatInt(sid, 10), v)
- //}
- //
- //func (s *Service) brodcast(d *dmm.ActDM) {
- // var ds, err = json.Marshal(d)
- // if err != nil {
- // log.Error("s.genesis.json.Marshal(dm:(%v)), err(%v)", d, err)
- // return
- // }
- // var m = &dmm.Broadcast{
- // RoomID: s.c.Rule.BroadcastCid,
- // CMD: dmm.BroadcastCMDACT,
- // Info: ds,
- // }
- // s.dm.Broadcast(context.Background(), m)
- //}
- //
- //func (s *Service) sub(c context.Context, sid int64) (res *l.Subject, err error) {
- // var (
- // eg errgroup.Group
- // ls []*l.Like
- // )
- // eg.Go(func() (err error) {
- // res, err = s.dao.Subject(c, sid)
- // return
- // })
- // eg.Go(func() (err error) {
- // ls, err = s.dao.Like(c, sid)
- // return
- // })
- // if err = eg.Wait(); err != nil {
- // log.Error("eg.Wait error(%v)", err)
- // return
- // }
- // if res != nil {
- // res.List = ls
- // }
- // if res, err = s.likeArc(c, res); err != nil {
- // return
- // }
- // return
- //}
- //func (s *Service) vipCanal() {
- // defer s.waiter.Done()
- // var c = context.Background()
- // for {
- // msg, ok := <-s.vipSub.Messages()
- // if !ok {
- // log.Info("databus:activity-job vip binlog consumer exit!")
- // return
- // }
- // msg.Commit()
- // m := &match.Message{}
- // if err := json.Unmarshal(msg.Value, m); err != nil {
- // log.Error("json.Unmarshal(%s) error(%+v)", msg.Value, err)
- // continue
- // }
- // switch m.Table {
- // case _vipActOrderTable:
- // if m.Action == match.ActInsert {
- // s.addElemeLottery(c, m.New)
- // }
- // }
- // log.Info("vipCanal key:%s partition:%d offset:%d table:%s", msg.Key, msg.Partition, msg.Offset, m.Table)
- // }
- //}
- func (s *Service) consumeCanal() {
- defer s.waiter.Done()
- var c = context.Background()
- for {
- msg, ok := <-s.actSub.Messages()
- if !ok {
- log.Info("databus: activity-job binlog consumer exit!")
- return
- }
- msg.Commit()
- m := &match.Message{}
- if err := json.Unmarshal(msg.Value, m); err != nil {
- log.Error("json.Unmarshal(%s) error(%+v)", msg.Value, err)
- continue
- }
- switch m.Table {
- case _matchObjTable:
- if m.Action == match.ActUpdate {
- s.upMatchUser(c, m.New, m.Old)
- }
- case _subjectTable:
- if m.Action == match.ActInsert || m.Action == match.ActUpdate {
- s.upSubject(c, m.New)
- } else if m.Action == match.ActDelete {
- s.upSubject(c, m.Old)
- }
- case _likesTable:
- if m.Action == match.ActInsert {
- s.AddLike(c, m.New)
- } else if m.Action == match.ActUpdate {
- s.UpLike(c, m.New, m.Old)
- } else if m.Action == match.ActDelete {
- s.DelLike(c, m.Old)
- }
- case _likeContentTable:
- if m.Action == match.ActDelete {
- s.upLikeContent(c, m.Old)
- } else {
- s.upLikeContent(c, m.New)
- }
- case _likeActionTable:
- if m.Action == match.ActInsert {
- s.actionProc(c, m.New)
- }
- }
- log.Info("consumeCanal key:%s partition:%d offset:%d table:%s", msg.Key, msg.Partition, msg.Offset, m.Table)
- }
- }
- func (s *Service) kfcCanal() {
- defer s.waiter.Done()
- for {
- if s.closed {
- return
- }
- msg, ok := <-s.kfcSub.Messages()
- if !ok {
- log.Info("databus: activity-job binlog consumer exit!")
- return
- }
- msg.Commit()
- m := &kfcmdl.CouponMsg{}
- if err := json.Unmarshal(msg.Value, m); err != nil {
- log.Error("kfcCanal:json.Unmarshal(%s) error(%+v)", msg.Value, err)
- continue
- }
- j := m.CouponID % int64(s.kfcShare)
- select {
- case s.kfcActionCh[j] <- m:
- default:
- log.Info("kfcCanal cache full (%d)", j)
- }
- log.Info("kfcCanal key:%s partition:%d offset:%d value %s goroutine(%d) all(%d)", msg.Key, msg.Partition, msg.Offset, msg.Value, j, s.kfcShare)
- }
- }
- func (s *Service) subjectStat(sid int64, typ string) {
- var err error
- for {
- if s.closed {
- return
- }
- var (
- statLike int64
- likeCnt int
- likes []*l.Like
- )
- if sid <= 0 {
- log.Warn("conf sid == 0 typ(%s)", typ)
- time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
- continue
- }
- if likeCnt, err = s.dao.LikeCnt(context.Background(), sid); err != nil {
- log.Error("s.dao.LikeCnt(sid:%d) error(%v)", sid, err)
- time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
- continue
- }
- if likeCnt == 0 {
- log.Warn("s.dao.LikeCnt(sid:%d) likeCnt == 0", sid)
- time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
- continue
- }
- for i := 0; i < likeCnt; i += _objectPieceSize {
- if likes, err = s.likeList(context.Background(), sid, i, _objectPieceSize, _retryTimes); err != nil {
- log.Error("objectStatproc s.likeList(%d,%d,%d) error(%+v)", sid, i, _objectPieceSize, err)
- time.Sleep(100 * time.Millisecond)
- continue
- } else {
- var aids []int64
- for _, v := range likes {
- if v.Wid > 0 {
- aids = append(aids, v.Wid)
- }
- }
- switch typ {
- case _typeArc:
- var arcs map[int64]*arcapi.Arc
- if arcs, err = s.arcs(context.Background(), aids, _retryTimes); err != nil {
- log.Error("objectStatproc s.arcs(%v) error(%v)", aids, err)
- time.Sleep(100 * time.Millisecond)
- continue
- } else {
- for _, aid := range aids {
- if arc, ok := arcs[aid]; ok && arc.IsNormal() {
- statLike += int64(arc.Stat.Like)
- } else {
- likeCnt--
- }
- }
- }
- case _typeArt:
- var arts map[int64]*artmdl.Meta
- if arts, err = s.arts(context.Background(), aids, _retryTimes); err != nil {
- log.Error("objectStatproc s.arcs(%v) error(%v)", aids, err)
- time.Sleep(100 * time.Microsecond)
- continue
- } else {
- for _, aid := range aids {
- if art, ok := arts[aid]; ok && art.IsNormal() {
- statLike += art.Stats.Like
- } else {
- likeCnt--
- }
- }
- }
- }
- }
- }
- if err = s.setSubjectStat(context.Background(), sid, &l.SubjectTotalStat{SumLike: statLike}, likeCnt, _retryTimes); err != nil {
- log.Error("objectStatproc s.setObjectStat(%d,%d) error(%+v)", sid, statLike, err)
- time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
- continue
- }
- time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
- }
- }
- func (s *Service) likeList(c context.Context, sid int64, offset, limit, retryCnt int) (list []*l.Like, err error) {
- for i := 0; i < retryCnt; i++ {
- if list, err = s.dao.LikeList(c, sid, offset, limit); err == nil {
- break
- }
- time.Sleep(100 * time.Millisecond)
- }
- return
- }
- func (s *Service) webDataList(c context.Context, vid int64, offset, limit, retryCnt int) (list []*l.WebData, err error) {
- for i := 0; i < retryCnt; i++ {
- if list, err = s.dao.WebDataList(c, vid, offset, limit); err == nil {
- break
- }
- time.Sleep(100 * time.Millisecond)
- }
- return
- }
- func (s *Service) arts(c context.Context, aids []int64, retryCnt int) (arcs map[int64]*artmdl.Meta, err error) {
- for i := 0; i < retryCnt; i++ {
- if arcs, err = s.articleRPC.ArticleMetas(c, &artmdl.ArgAids{Aids: aids}); err == nil {
- break
- }
- time.Sleep(100 * time.Millisecond)
- }
- return
- }
- func (s *Service) kingStoryTotalStat(vid int64) {
- var err error
- for {
- if s.closed {
- return
- }
- var (
- statView, statLike, statFav, StatCoin int64
- likeCnt int
- likes []*l.WebData
- )
- if vid <= 0 {
- log.Warn("conf vid == 0")
- time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
- continue
- }
- if likeCnt, err = s.dao.WebDataCnt(context.Background(), vid); err != nil {
- log.Error("kingStoryTotalStat s.dao.WebDataCnt(sid:%d) error(%v)", vid, err)
- time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
- continue
- }
- if likeCnt == 0 {
- log.Warn("kingStoryTotalStat s.dao.LikeCnt(sid:%d) likeCnt == 0", vid)
- time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
- continue
- }
- for i := 0; i < likeCnt; i += _objectPieceSize {
- if likes, err = s.webDataList(context.Background(), vid, i, _objectPieceSize, _retryTimes); err != nil {
- log.Error("kingStoryTotalStat s.webDataList(%d,%d,%d) error(%+v)", vid, i, _objectPieceSize, err)
- time.Sleep(100 * time.Millisecond)
- continue
- } else {
- var (
- aids []int64
- aidStruct = new(struct {
- Aid string `json:"AID"`
- })
- )
- for _, v := range likes {
- if v.Data != "" {
- if e := json.Unmarshal([]byte(v.Data), &aidStruct); e != nil {
- log.Warn("kingStoryTotalStat json.Unmarshal(%s) error(%v)", v.Data, e)
- continue
- }
- if aid, e := strconv.ParseInt(aidStruct.Aid, 10, 64); e != nil {
- log.Warn("kingStoryTotalStat strconv.ParseInt(%s) error(%v)", aidStruct, e)
- continue
- } else {
- aids = append(aids, aid)
- }
- }
- }
- var arcs map[int64]*arcapi.Arc
- if arcs, err = s.arcs(context.Background(), aids, _retryTimes); err != nil {
- log.Error("kingStoryTotalStat s.arcs(%v) error(%v)", aids, err)
- time.Sleep(100 * time.Millisecond)
- continue
- } else {
- for _, aid := range aids {
- if arc, ok := arcs[aid]; ok && arc.IsNormal() {
- statView += int64(arc.Stat.View)
- statLike += int64(arc.Stat.Like)
- statFav += int64(arc.Stat.Fav)
- StatCoin += int64(arc.Stat.Coin)
- }
- }
- }
- }
- }
- if err = s.setSubjectStat(context.Background(), vid, &l.SubjectTotalStat{SumView: statView, SumLike: statLike, SumFav: statFav, SumCoin: StatCoin}, likeCnt, _retryTimes); err != nil {
- log.Error("kingStoryTotalStat s.setObjectStat(%d,%d) error(%+v)", vid, statLike, err)
- time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
- continue
- }
- time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
- }
- }
- func (s *Service) setSubjectStat(c context.Context, sid int64, stat *l.SubjectTotalStat, count, retryCnt int) (err error) {
- for i := 0; i < retryCnt; i++ {
- if err = s.dao.SetObjectStat(c, sid, stat, count); err == nil {
- break
- }
- time.Sleep(100 * time.Millisecond)
- }
- return
- }
- // Ping reports the heath of services.
- func (s *Service) Ping(c context.Context) (err error) {
- return s.dao.Ping(c)
- }
- // Close kafaka consumer close.
- func (s *Service) Close() (err error) {
- defer s.waiter.Wait()
- s.closed = true
- if s.bnjTimeFinish == 0 {
- s.bnj.AddCacheLessTime(context.Background(), s.bnjLessSecond)
- }
- s.cron.Stop()
- s.dao.Close()
- s.actSub.Close()
- s.bnjSub.Close()
- //s.vipSub.Close()
- s.kfcSub.Close()
- s.closed = true
- return
- }
|