service.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "runtime"
  6. "strconv"
  7. "sync"
  8. "time"
  9. actrpc "go-common/app/interface/main/activity/rpc/client"
  10. artmdl "go-common/app/interface/openplatform/article/model"
  11. artrpc "go-common/app/interface/openplatform/article/rpc/client"
  12. "go-common/app/job/main/activity/conf"
  13. "go-common/app/job/main/activity/dao/bnj"
  14. "go-common/app/job/main/activity/dao/dm"
  15. "go-common/app/job/main/activity/dao/kfc"
  16. "go-common/app/job/main/activity/dao/like"
  17. kfcmdl "go-common/app/job/main/activity/model/kfc"
  18. l "go-common/app/job/main/activity/model/like"
  19. "go-common/app/job/main/activity/model/match"
  20. "go-common/app/service/main/account/api"
  21. arcapi "go-common/app/service/main/archive/api"
  22. arcrpc "go-common/app/service/main/archive/api/gorpc"
  23. comarcmdl "go-common/app/service/main/archive/model/archive"
  24. "go-common/app/service/main/coin/api/gorpc"
  25. "go-common/library/log"
  26. "go-common/library/queue/databus"
  27. "github.com/robfig/cron"
  28. )
  29. const (
  30. //_startVoteP = 2
  31. //_startVote = 3
  32. //_endingVote = 4
  33. //_endVote = 5
  34. //_goOn = 6
  35. //_next = 7
  36. _matchObjTable = "act_matchs_object"
  37. _subjectTable = "act_subject"
  38. _likesTable = "likes"
  39. _likeContentTable = "like_content"
  40. _likeActionTable = "like_action"
  41. //_vipActOrderTable = "vip_order_activity_record"
  42. _objectPieceSize = 100
  43. _retryTimes = 3
  44. _typeArc = "archive"
  45. _typeArt = "article"
  46. _sharding = 10
  47. )
  48. // Service service
  49. type Service struct {
  50. c *conf.Config
  51. dao *like.Dao
  52. bnj *bnj.Dao
  53. dm *dm.Dao
  54. kfcDao *kfc.Dao
  55. // waiter
  56. waiter sync.WaitGroup
  57. closed bool
  58. // cache: type, upper
  59. // arc rpc
  60. arcRPC *arcrpc.Service2
  61. coinRPC *coin.Service
  62. actRPC *actrpc.Service
  63. articleRPC *artrpc.Service
  64. //grpc
  65. accClient api.AccountClient
  66. // databus
  67. actSub *databus.Databus
  68. bnjSub *databus.Databus
  69. // vip binlog databus
  70. //vipSub *databus.Databus
  71. kfcSub *databus.Databus
  72. kfcActionCh []chan *kfcmdl.CouponMsg
  73. kfcShare int
  74. subActionCh []chan *l.Action
  75. actionSM []map[int64]*l.LastTmStat
  76. // bnj
  77. bnjMaxSecond int64
  78. bnjLessSecond int64
  79. bnjTimeFinish int64
  80. bnjMsgFlagMap map[int]int64
  81. bnjMsgFlagMu sync.Mutex
  82. bnjWxMsgFlagMap map[int]int64
  83. bnjWxMsgFlagMu sync.Mutex
  84. // cron
  85. cron *cron.Cron
  86. }
  87. // New is archive service implementation.
  88. func New(c *conf.Config) (s *Service) {
  89. s = &Service{
  90. c: c,
  91. dao: like.New(c),
  92. dm: dm.New(c),
  93. bnj: bnj.New(c),
  94. kfcDao: kfc.New(c),
  95. arcRPC: arcrpc.New2(c.ArchiveRPC),
  96. articleRPC: artrpc.New(c.ArticleRPC),
  97. coinRPC: coin.New(c.CoinRPC),
  98. actRPC: actrpc.New(c.ActRPC),
  99. actSub: databus.New(c.ActSub),
  100. bnjSub: databus.New(c.BnjSub),
  101. //vipSub: databus.New(c.VipSub),
  102. kfcSub: databus.New(c.KfcSub),
  103. cron: cron.New(),
  104. }
  105. var err error
  106. if s.accClient, err = api.NewClient(c.AccClient); err != nil {
  107. panic(err)
  108. }
  109. if s.c.Bnj2019.MsgSpec != "" {
  110. if err = s.cron.AddFunc(s.c.Bnj2019.MsgSpec, s.cronInformationMessage); err != nil {
  111. panic(err)
  112. }
  113. log.Info("cronInformationMessage init")
  114. s.cron.Start()
  115. }
  116. //time.Sleep(2 * time.Second)
  117. //subject, err := s.sub(context.Background(), c.Rule.BroadcastSid)
  118. //if err != nil {
  119. // log.Error("error(%v)", err)
  120. // return
  121. //}
  122. //log.Info("start-subject")
  123. //log.Info("subject(%v)", subject)
  124. //log.Info("end-subject")
  125. //if subject != nil {
  126. // go s.genesis(subject)
  127. //}
  128. s.bnjMsgFlagMap = make(map[int]int64, len(bnjSteps))
  129. s.bnjWxMsgFlagMap = make(map[int]int64, len(bnjSteps))
  130. for _, step := range bnjSteps {
  131. s.bnjMsgFlagMap[step] = 0
  132. s.bnjWxMsgFlagMap[step] = 0
  133. }
  134. for i := 0; i < _sharding; i++ {
  135. s.subActionCh = append(s.subActionCh, make(chan *l.Action, 10240))
  136. s.actionSM = append(s.actionSM, map[int64]*l.LastTmStat{})
  137. s.waiter.Add(1)
  138. go s.actionDealProc(i)
  139. }
  140. //s.waiter.Add(1)
  141. //go s.vipCanal()
  142. s.waiter.Add(1)
  143. go s.consumeCanal()
  144. go s.subjectStat(s.c.Rule.ArcObjStatSid, _typeArc)
  145. go s.subjectStat(s.c.Rule.ArtObjStatSid, _typeArt)
  146. go s.kingStoryTotalStat(s.c.Rule.KingStorySid)
  147. go s.subsRankproc()
  148. go s.initBnjSecond()
  149. if runtime.NumCPU() <= 4 {
  150. s.kfcShare = 4
  151. } else if runtime.NumCPU() > 32 {
  152. s.kfcShare = 32
  153. } else {
  154. s.kfcShare = runtime.NumCPU()
  155. }
  156. for j := 0; j < s.kfcShare; j++ {
  157. s.kfcActionCh = append(s.kfcActionCh, make(chan *kfcmdl.CouponMsg, 10240))
  158. s.waiter.Add(1)
  159. go s.kfcActionDeal(j)
  160. }
  161. s.waiter.Add(1)
  162. go s.kfcCanal()
  163. return s
  164. }
  165. func (s *Service) likeArc(c context.Context, sub *l.Subject) (res *l.Subject, err error) {
  166. if sub != nil {
  167. if sub.ID == 0 {
  168. res = nil
  169. } else {
  170. res = sub
  171. var (
  172. ok bool
  173. arcs map[int64]*arcapi.Arc
  174. aids []int64
  175. )
  176. for _, l := range res.List {
  177. aids = append(aids, l.Wid)
  178. }
  179. argAids := &comarcmdl.ArgAids2{
  180. Aids: aids,
  181. }
  182. if arcs, err = s.arcRPC.Archives3(c, argAids); err != nil {
  183. log.Error("s.arcRPC.Archives(arcAids:(%v), arcs), err(%v)", aids, err)
  184. return
  185. }
  186. for _, l := range res.List {
  187. if l.Archive, ok = arcs[l.Wid]; !ok {
  188. log.Info("s.arcs.wid:(%d), (%v)", l.Wid, ok)
  189. continue
  190. }
  191. }
  192. }
  193. }
  194. return
  195. }
  196. //func (s *Service) genesis(l *l.Subject) {
  197. // var (
  198. // index int
  199. // nowTime, stage, yes, no, next int64
  200. // err error
  201. // c = context.Background()
  202. // )
  203. // log.Info("st")
  204. // for {
  205. // if time.Now().Unix() >= l.Stime.Time().Unix() {
  206. // break
  207. // }
  208. // time.Sleep(time.Second)
  209. // }
  210. // lstime := map[string]interface{}{
  211. // "aid": 0,
  212. // "time": 0,
  213. // "index": 0,
  214. // "stage": 0,
  215. // }
  216. // go s.inLtime(lstime, l.ID)
  217. // for i, a := range l.List {
  218. // arg1 := &comarcmdl.ArgAid2{Aid: a.Archive.Aid}
  219. // arc, errRPC := s.arcRPC.Archive3(c, arg1)
  220. // if errRPC != nil {
  221. // log.Error("act-job s.arcRPC.Archive3(%v) error(%v)", arg1, errRPC)
  222. // errRPC = nil
  223. // continue
  224. // }
  225. // if arc.State < 0 && arc.State != -6 {
  226. // log.Error("act-job s.arcRPC.Archive3(%v) stat err", arg1)
  227. // }
  228. // index = i
  229. // nowTime = 0
  230. // stage = 0
  231. // next = 0
  232. // time.Sleep(time.Duration(l.Ltime) * time.Second)
  233. // log.Info("aid sleep")
  234. // aidTime := time.Now().Unix()
  235. // ltime := map[string]interface{}{
  236. // "aid": a.Archive.Aid,
  237. // "time": aidTime,
  238. // "index": i,
  239. // "stage": stage,
  240. // "title": arc.Title,
  241. // "author": arc.Author.Name,
  242. // "tname": arc.TypeName,
  243. // }
  244. // go s.inLtime(ltime, l.ID)
  245. // for {
  246. // fmt.Println(stage)
  247. // log.Info("s.stage{%d}", stage)
  248. // //演出开始
  249. // ltime := map[string]interface{}{
  250. // "aid": a.Archive.Aid,
  251. // "time": aidTime,
  252. // "index": i,
  253. // "stage": stage,
  254. // "title": arc.Title,
  255. // "author": arc.Author.Name,
  256. // "tname": arc.TypeName,
  257. // }
  258. // go s.inLtime(ltime, l.ID)
  259. // tp := l.Interval - l.Ltime
  260. // nowTime += tp
  261. // time.Sleep(time.Duration(tp) * time.Second)
  262. // //预备投票
  263. // sdm := &dmm.ActDM{Act: _startVoteP, Aid: a.Archive.Aid, Next: next, Yes: 0, No: 0, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
  264. // go s.brodcast(sdm)
  265. // log.Info("act:1")
  266. // go s.dao.CreateSelection(c, a.Archive.Aid, stage)
  267. // nowTime += l.Ltime
  268. // time.Sleep(time.Duration(l.Ltime) * time.Second)
  269. // //投票开始
  270. // interval := &dmm.ActDM{Act: _startVote, Aid: a.Archive.Aid, Next: next, Yes: 0, No: 0, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
  271. // go s.brodcast(interval)
  272. // log.Info("act:2")
  273. // tl := l.Tlimit - l.Ltime
  274. // nowTime += tl
  275. // time.Sleep(time.Duration(tl) * time.Second)
  276. // //投票预结束
  277. // intervalP := &dmm.ActDM{Act: _endingVote, Aid: a.Archive.Aid, Next: next, Yes: 0, No: 0, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
  278. // log.Info("act:3")
  279. // go s.brodcast(intervalP)
  280. // nowTime += l.Ltime
  281. // time.Sleep(time.Duration(l.Ltime) * time.Second)
  282. // //投票结果
  283. // if yes, no, err = s.dao.Selection(c, a.Archive.Aid, stage); err != nil {
  284. // log.Error("s.dao.Selection() error(%v)", err)
  285. // return
  286. // }
  287. // goNext := true
  288. // if yes != 0 || no != 0 {
  289. // goNext = (float64(no)/float64(yes+no)*100 > 40)
  290. // }
  291. // if goNext {
  292. // next = 1
  293. // } else {
  294. // next = 0
  295. // }
  296. // intervalEnd := &dmm.ActDM{Act: _endVote, Aid: a.Archive.Aid, Next: next, Yes: yes, No: no, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
  297. // go s.brodcast(intervalEnd)
  298. // log.Info("act:4")
  299. // nowTime += l.Ltime
  300. // time.Sleep(time.Duration(l.Ltime) * time.Second)
  301. // go s.inOnlinelog(c, l.ID, a.Archive.Aid, stage, yes, no)
  302. // if goNext {
  303. // tlimit := &dmm.ActDM{Act: _next, Aid: a.Archive.Aid, Next: next, Yes: yes, No: no, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
  304. // go s.brodcast(tlimit)
  305. // log.Info("act:6")
  306. // nowTime += l.Ltime
  307. // ldtime := map[string]interface{}{
  308. // "aid": 0,
  309. // "time": time.Now().Unix() + l.Ltime,
  310. // "index": i + 1,
  311. // "stage": 0,
  312. // "title": arc.Title,
  313. // "author": arc.Author.Name,
  314. // "tname": arc.TypeName,
  315. // }
  316. // go s.inLtime(ldtime, l.ID)
  317. // time.Sleep(time.Duration(l.Ltime) * time.Second)
  318. // break
  319. // }
  320. // tlimit := &dmm.ActDM{Act: _goOn, Aid: a.Archive.Aid, Next: next, Yes: yes, No: no, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
  321. // log.Info("bro:%v", tlimit)
  322. // go s.brodcast(tlimit)
  323. // log.Info("act:5")
  324. // //投票结果判断
  325. // stage++
  326. // if a.Archive.Duration-nowTime < 60 {
  327. // go s.inOnlinelog(c, l.ID, a.Archive.Aid, 100, 0, 0)
  328. // time.Sleep(time.Duration(a.Archive.Duration-nowTime) * time.Second)
  329. // break
  330. // }
  331. // }
  332. // }
  333. // ltime := map[string]interface{}{
  334. // "aid": 0,
  335. // "time": time.Now().Unix(),
  336. // "index": index + 1,
  337. // "stage": 0,
  338. // "title": "",
  339. // "author": "",
  340. // "tname": "",
  341. // }
  342. // go s.inLtime(ltime, l.ID)
  343. // log.Info("end")
  344. //}
  345. //
  346. //func (s *Service) inOnlinelog(c context.Context, sid, aid, stage, yes, no int64) {
  347. // if row, err := s.dao.InOnlinelog(c, sid, aid, stage, yes, no); err != nil {
  348. // log.Error("s.dao.inOnlinelog, err(%v) row(%v)", err, row)
  349. // }
  350. //}
  351. //
  352. //func (s *Service) inLtime(lt map[string]interface{}, sid int64) {
  353. // var v, err = json.Marshal(lt)
  354. // if err != nil {
  355. // log.Error("s.genesis.inLtime.json.Marshal(dm:(%v)), err(%v)", v, err)
  356. // return
  357. // }
  358. // s.dao.RbSet(context.Background(), "ltime:"+strconv.FormatInt(sid, 10), v)
  359. //}
  360. //
  361. //func (s *Service) brodcast(d *dmm.ActDM) {
  362. // var ds, err = json.Marshal(d)
  363. // if err != nil {
  364. // log.Error("s.genesis.json.Marshal(dm:(%v)), err(%v)", d, err)
  365. // return
  366. // }
  367. // var m = &dmm.Broadcast{
  368. // RoomID: s.c.Rule.BroadcastCid,
  369. // CMD: dmm.BroadcastCMDACT,
  370. // Info: ds,
  371. // }
  372. // s.dm.Broadcast(context.Background(), m)
  373. //}
  374. //
  375. //func (s *Service) sub(c context.Context, sid int64) (res *l.Subject, err error) {
  376. // var (
  377. // eg errgroup.Group
  378. // ls []*l.Like
  379. // )
  380. // eg.Go(func() (err error) {
  381. // res, err = s.dao.Subject(c, sid)
  382. // return
  383. // })
  384. // eg.Go(func() (err error) {
  385. // ls, err = s.dao.Like(c, sid)
  386. // return
  387. // })
  388. // if err = eg.Wait(); err != nil {
  389. // log.Error("eg.Wait error(%v)", err)
  390. // return
  391. // }
  392. // if res != nil {
  393. // res.List = ls
  394. // }
  395. // if res, err = s.likeArc(c, res); err != nil {
  396. // return
  397. // }
  398. // return
  399. //}
  400. //func (s *Service) vipCanal() {
  401. // defer s.waiter.Done()
  402. // var c = context.Background()
  403. // for {
  404. // msg, ok := <-s.vipSub.Messages()
  405. // if !ok {
  406. // log.Info("databus:activity-job vip binlog consumer exit!")
  407. // return
  408. // }
  409. // msg.Commit()
  410. // m := &match.Message{}
  411. // if err := json.Unmarshal(msg.Value, m); err != nil {
  412. // log.Error("json.Unmarshal(%s) error(%+v)", msg.Value, err)
  413. // continue
  414. // }
  415. // switch m.Table {
  416. // case _vipActOrderTable:
  417. // if m.Action == match.ActInsert {
  418. // s.addElemeLottery(c, m.New)
  419. // }
  420. // }
  421. // log.Info("vipCanal key:%s partition:%d offset:%d table:%s", msg.Key, msg.Partition, msg.Offset, m.Table)
  422. // }
  423. //}
  424. func (s *Service) consumeCanal() {
  425. defer s.waiter.Done()
  426. var c = context.Background()
  427. for {
  428. msg, ok := <-s.actSub.Messages()
  429. if !ok {
  430. log.Info("databus: activity-job binlog consumer exit!")
  431. return
  432. }
  433. msg.Commit()
  434. m := &match.Message{}
  435. if err := json.Unmarshal(msg.Value, m); err != nil {
  436. log.Error("json.Unmarshal(%s) error(%+v)", msg.Value, err)
  437. continue
  438. }
  439. switch m.Table {
  440. case _matchObjTable:
  441. if m.Action == match.ActUpdate {
  442. s.upMatchUser(c, m.New, m.Old)
  443. }
  444. case _subjectTable:
  445. if m.Action == match.ActInsert || m.Action == match.ActUpdate {
  446. s.upSubject(c, m.New)
  447. } else if m.Action == match.ActDelete {
  448. s.upSubject(c, m.Old)
  449. }
  450. case _likesTable:
  451. if m.Action == match.ActInsert {
  452. s.AddLike(c, m.New)
  453. } else if m.Action == match.ActUpdate {
  454. s.UpLike(c, m.New, m.Old)
  455. } else if m.Action == match.ActDelete {
  456. s.DelLike(c, m.Old)
  457. }
  458. case _likeContentTable:
  459. if m.Action == match.ActDelete {
  460. s.upLikeContent(c, m.Old)
  461. } else {
  462. s.upLikeContent(c, m.New)
  463. }
  464. case _likeActionTable:
  465. if m.Action == match.ActInsert {
  466. s.actionProc(c, m.New)
  467. }
  468. }
  469. log.Info("consumeCanal key:%s partition:%d offset:%d table:%s", msg.Key, msg.Partition, msg.Offset, m.Table)
  470. }
  471. }
  472. func (s *Service) kfcCanal() {
  473. defer s.waiter.Done()
  474. for {
  475. if s.closed {
  476. return
  477. }
  478. msg, ok := <-s.kfcSub.Messages()
  479. if !ok {
  480. log.Info("databus: activity-job binlog consumer exit!")
  481. return
  482. }
  483. msg.Commit()
  484. m := &kfcmdl.CouponMsg{}
  485. if err := json.Unmarshal(msg.Value, m); err != nil {
  486. log.Error("kfcCanal:json.Unmarshal(%s) error(%+v)", msg.Value, err)
  487. continue
  488. }
  489. j := m.CouponID % int64(s.kfcShare)
  490. select {
  491. case s.kfcActionCh[j] <- m:
  492. default:
  493. log.Info("kfcCanal cache full (%d)", j)
  494. }
  495. log.Info("kfcCanal key:%s partition:%d offset:%d value %s goroutine(%d) all(%d)", msg.Key, msg.Partition, msg.Offset, msg.Value, j, s.kfcShare)
  496. }
  497. }
  498. func (s *Service) subjectStat(sid int64, typ string) {
  499. var err error
  500. for {
  501. if s.closed {
  502. return
  503. }
  504. var (
  505. statLike int64
  506. likeCnt int
  507. likes []*l.Like
  508. )
  509. if sid <= 0 {
  510. log.Warn("conf sid == 0 typ(%s)", typ)
  511. time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
  512. continue
  513. }
  514. if likeCnt, err = s.dao.LikeCnt(context.Background(), sid); err != nil {
  515. log.Error("s.dao.LikeCnt(sid:%d) error(%v)", sid, err)
  516. time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
  517. continue
  518. }
  519. if likeCnt == 0 {
  520. log.Warn("s.dao.LikeCnt(sid:%d) likeCnt == 0", sid)
  521. time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
  522. continue
  523. }
  524. for i := 0; i < likeCnt; i += _objectPieceSize {
  525. if likes, err = s.likeList(context.Background(), sid, i, _objectPieceSize, _retryTimes); err != nil {
  526. log.Error("objectStatproc s.likeList(%d,%d,%d) error(%+v)", sid, i, _objectPieceSize, err)
  527. time.Sleep(100 * time.Millisecond)
  528. continue
  529. } else {
  530. var aids []int64
  531. for _, v := range likes {
  532. if v.Wid > 0 {
  533. aids = append(aids, v.Wid)
  534. }
  535. }
  536. switch typ {
  537. case _typeArc:
  538. var arcs map[int64]*arcapi.Arc
  539. if arcs, err = s.arcs(context.Background(), aids, _retryTimes); err != nil {
  540. log.Error("objectStatproc s.arcs(%v) error(%v)", aids, err)
  541. time.Sleep(100 * time.Millisecond)
  542. continue
  543. } else {
  544. for _, aid := range aids {
  545. if arc, ok := arcs[aid]; ok && arc.IsNormal() {
  546. statLike += int64(arc.Stat.Like)
  547. } else {
  548. likeCnt--
  549. }
  550. }
  551. }
  552. case _typeArt:
  553. var arts map[int64]*artmdl.Meta
  554. if arts, err = s.arts(context.Background(), aids, _retryTimes); err != nil {
  555. log.Error("objectStatproc s.arcs(%v) error(%v)", aids, err)
  556. time.Sleep(100 * time.Microsecond)
  557. continue
  558. } else {
  559. for _, aid := range aids {
  560. if art, ok := arts[aid]; ok && art.IsNormal() {
  561. statLike += art.Stats.Like
  562. } else {
  563. likeCnt--
  564. }
  565. }
  566. }
  567. }
  568. }
  569. }
  570. if err = s.setSubjectStat(context.Background(), sid, &l.SubjectTotalStat{SumLike: statLike}, likeCnt, _retryTimes); err != nil {
  571. log.Error("objectStatproc s.setObjectStat(%d,%d) error(%+v)", sid, statLike, err)
  572. time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
  573. continue
  574. }
  575. time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
  576. }
  577. }
  578. func (s *Service) likeList(c context.Context, sid int64, offset, limit, retryCnt int) (list []*l.Like, err error) {
  579. for i := 0; i < retryCnt; i++ {
  580. if list, err = s.dao.LikeList(c, sid, offset, limit); err == nil {
  581. break
  582. }
  583. time.Sleep(100 * time.Millisecond)
  584. }
  585. return
  586. }
  587. func (s *Service) webDataList(c context.Context, vid int64, offset, limit, retryCnt int) (list []*l.WebData, err error) {
  588. for i := 0; i < retryCnt; i++ {
  589. if list, err = s.dao.WebDataList(c, vid, offset, limit); err == nil {
  590. break
  591. }
  592. time.Sleep(100 * time.Millisecond)
  593. }
  594. return
  595. }
  596. func (s *Service) arts(c context.Context, aids []int64, retryCnt int) (arcs map[int64]*artmdl.Meta, err error) {
  597. for i := 0; i < retryCnt; i++ {
  598. if arcs, err = s.articleRPC.ArticleMetas(c, &artmdl.ArgAids{Aids: aids}); err == nil {
  599. break
  600. }
  601. time.Sleep(100 * time.Millisecond)
  602. }
  603. return
  604. }
  605. func (s *Service) kingStoryTotalStat(vid int64) {
  606. var err error
  607. for {
  608. if s.closed {
  609. return
  610. }
  611. var (
  612. statView, statLike, statFav, StatCoin int64
  613. likeCnt int
  614. likes []*l.WebData
  615. )
  616. if vid <= 0 {
  617. log.Warn("conf vid == 0")
  618. time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
  619. continue
  620. }
  621. if likeCnt, err = s.dao.WebDataCnt(context.Background(), vid); err != nil {
  622. log.Error("kingStoryTotalStat s.dao.WebDataCnt(sid:%d) error(%v)", vid, err)
  623. time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
  624. continue
  625. }
  626. if likeCnt == 0 {
  627. log.Warn("kingStoryTotalStat s.dao.LikeCnt(sid:%d) likeCnt == 0", vid)
  628. time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
  629. continue
  630. }
  631. for i := 0; i < likeCnt; i += _objectPieceSize {
  632. if likes, err = s.webDataList(context.Background(), vid, i, _objectPieceSize, _retryTimes); err != nil {
  633. log.Error("kingStoryTotalStat s.webDataList(%d,%d,%d) error(%+v)", vid, i, _objectPieceSize, err)
  634. time.Sleep(100 * time.Millisecond)
  635. continue
  636. } else {
  637. var (
  638. aids []int64
  639. aidStruct = new(struct {
  640. Aid string `json:"AID"`
  641. })
  642. )
  643. for _, v := range likes {
  644. if v.Data != "" {
  645. if e := json.Unmarshal([]byte(v.Data), &aidStruct); e != nil {
  646. log.Warn("kingStoryTotalStat json.Unmarshal(%s) error(%v)", v.Data, e)
  647. continue
  648. }
  649. if aid, e := strconv.ParseInt(aidStruct.Aid, 10, 64); e != nil {
  650. log.Warn("kingStoryTotalStat strconv.ParseInt(%s) error(%v)", aidStruct, e)
  651. continue
  652. } else {
  653. aids = append(aids, aid)
  654. }
  655. }
  656. }
  657. var arcs map[int64]*arcapi.Arc
  658. if arcs, err = s.arcs(context.Background(), aids, _retryTimes); err != nil {
  659. log.Error("kingStoryTotalStat s.arcs(%v) error(%v)", aids, err)
  660. time.Sleep(100 * time.Millisecond)
  661. continue
  662. } else {
  663. for _, aid := range aids {
  664. if arc, ok := arcs[aid]; ok && arc.IsNormal() {
  665. statView += int64(arc.Stat.View)
  666. statLike += int64(arc.Stat.Like)
  667. statFav += int64(arc.Stat.Fav)
  668. StatCoin += int64(arc.Stat.Coin)
  669. }
  670. }
  671. }
  672. }
  673. }
  674. if err = s.setSubjectStat(context.Background(), vid, &l.SubjectTotalStat{SumView: statView, SumLike: statLike, SumFav: statFav, SumCoin: StatCoin}, likeCnt, _retryTimes); err != nil {
  675. log.Error("kingStoryTotalStat s.setObjectStat(%d,%d) error(%+v)", vid, statLike, err)
  676. time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
  677. continue
  678. }
  679. time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
  680. }
  681. }
  682. func (s *Service) setSubjectStat(c context.Context, sid int64, stat *l.SubjectTotalStat, count, retryCnt int) (err error) {
  683. for i := 0; i < retryCnt; i++ {
  684. if err = s.dao.SetObjectStat(c, sid, stat, count); err == nil {
  685. break
  686. }
  687. time.Sleep(100 * time.Millisecond)
  688. }
  689. return
  690. }
  691. // Ping reports the heath of services.
  692. func (s *Service) Ping(c context.Context) (err error) {
  693. return s.dao.Ping(c)
  694. }
  695. // Close kafaka consumer close.
  696. func (s *Service) Close() (err error) {
  697. defer s.waiter.Wait()
  698. s.closed = true
  699. if s.bnjTimeFinish == 0 {
  700. s.bnj.AddCacheLessTime(context.Background(), s.bnjLessSecond)
  701. }
  702. s.cron.Stop()
  703. s.dao.Close()
  704. s.actSub.Close()
  705. s.bnjSub.Close()
  706. //s.vipSub.Close()
  707. s.kfcSub.Close()
  708. s.closed = true
  709. return
  710. }