service.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "sync"
  7. "time"
  8. tagrpc "go-common/app/interface/main/tag/rpc/client"
  9. artmdl "go-common/app/interface/openplatform/article/model"
  10. artrpc "go-common/app/interface/openplatform/article/rpc/client"
  11. "go-common/app/job/openplatform/article/conf"
  12. "go-common/app/job/openplatform/article/dao"
  13. "go-common/app/job/openplatform/article/model"
  14. thumdl "go-common/app/service/main/thumbup/model"
  15. "go-common/library/conf/env"
  16. "go-common/library/log"
  17. "go-common/library/log/infoc"
  18. "go-common/library/queue/databus"
  19. "go-common/library/stat/prom"
  20. )
  // Stat fan-out sizing.
  const (
  	// _sharding is the number of stat shards; New starts one statproc
  	// goroutine and allocates one stat channel per shard.
  	_sharding = 100
  	// _chanSize is the buffer capacity of each per-shard stat channel.
  	_chanSize = 10240
  )

  // Binlog table names matched by consumeCanal.
  const (
  	// _articleTable is the article table name.
  	_articleTable = "filtered_articles"
  	// _authorTable is the author table name.
  	_authorTable = "article_authors"
  )
  27. type lastTimeStat struct {
  28. time int64
  29. stat *artmdl.StatMsg
  30. }
  31. // Service .
  32. type Service struct {
  33. c *conf.Config
  34. dao *dao.Dao
  35. waiter *sync.WaitGroup
  36. closed bool
  37. // monitor *monitor.Service
  38. articleSub *databus.Databus
  39. articleStatSub *databus.Databus
  40. likeStatSub *databus.Databus
  41. replyStatSub *databus.Databus
  42. favoriteStatSub *databus.Databus
  43. coinStatSub *databus.Databus
  44. articleRPC *artrpc.Service
  45. tagRPC *tagrpc.Service
  46. statCh [_sharding]chan *artmdl.StatMsg
  47. statLastTime [_sharding]map[int64]*lastTimeStat
  48. categoriesMap map[int64]*artmdl.Category
  49. categoriesReverseMap map[int64][]*artmdl.Category
  50. sitemapMap map[int64]struct{}
  51. urlListHead *urlNode
  52. urlListTail *urlNode
  53. lastURLNode *urlNode
  54. sitemapXML string
  55. likeCh chan *thumdl.StatMsg
  56. updateDbInterval int64
  57. updateSortInterval time.Duration
  58. cheatInfoc *infoc.Infoc
  59. // *Cnt means sum of consumed messages.
  60. statCnt, binCnt int64
  61. cheatArts map[int64]int
  62. sortLimitTime int64
  63. setting *model.Setting
  64. // infoc
  65. logCh chan interface{}
  66. }
  67. // New creates a Service instance.
  68. func New(c *conf.Config) (s *Service) {
  69. s = &Service{
  70. c: c,
  71. dao: dao.New(c),
  72. waiter: new(sync.WaitGroup),
  73. // monitor: monitor.New(),
  74. articleSub: databus.New(c.ArticleSub),
  75. articleStatSub: databus.New(c.ArticleStatSub),
  76. replyStatSub: databus.New(c.ReplyStatSub),
  77. favoriteStatSub: databus.New(c.FavoriteStatSub),
  78. coinStatSub: databus.New(c.CoinStatSub),
  79. likeStatSub: databus.New(c.LikeStatSub),
  80. articleRPC: artrpc.New(c.ArticleRPC),
  81. tagRPC: tagrpc.New2(c.TagRPC),
  82. updateDbInterval: int64(time.Duration(c.Job.UpdateDbInterval) / time.Second),
  83. updateSortInterval: time.Duration(c.Job.UpdateSortInterval),
  84. likeCh: make(chan *thumdl.StatMsg, 1e4),
  85. cheatInfoc: infoc.New(c.CheatInfoc),
  86. categoriesMap: make(map[int64]*artmdl.Category),
  87. categoriesReverseMap: make(map[int64][]*artmdl.Category),
  88. sitemapMap: make(map[int64]struct{}),
  89. sortLimitTime: int64(time.Duration(c.Job.SortLimitTime) / time.Second),
  90. logCh: make(chan interface{}, 1024),
  91. }
  92. // s.monitor.SetConfig(c.HTTPClient)
  93. for i := int64(0); i < _sharding; i++ {
  94. i := i
  95. // for stat
  96. s.statCh[i] = make(chan *artmdl.StatMsg, _chanSize)
  97. s.statLastTime[i] = make(map[int64]*lastTimeStat)
  98. s.waiter.Add(1)
  99. go s.statproc(i)
  100. }
  101. s.loadCategories()
  102. s.loadSettings()
  103. // 10 go routines with WaitGroup
  104. s.waiter.Add(10)
  105. go s.consumeStat()
  106. go s.consumeCanal()
  107. go s.checkConsumer()
  108. go s.retryStat()
  109. go s.retryReply()
  110. go s.retryGame()
  111. go s.retryFlow()
  112. go s.retryDynamic()
  113. go s.retryCDN()
  114. go s.retryCache()
  115. // other go routines
  116. go s.updateSortproc()
  117. go s.activityLikeproc()
  118. go s.updateReadCountproc()
  119. go s.updateHotspotsproc()
  120. go s.updateCheatArtsproc()
  121. go s.loadCategoriesproc()
  122. go s.loadSettingsproc()
  123. go s.recommendAuthorproc()
  124. go s.checkReadStatusProc()
  125. go s.infocproc()
  126. go s.sitemapproc()
  127. return
  128. }
  129. // consumeStat consumes article's stat.
  130. func (s *Service) consumeStat() {
  131. defer s.waiter.Done()
  132. for {
  133. if s.closed {
  134. for i := 0; i < _sharding; i++ {
  135. close(s.statCh[i])
  136. }
  137. log.Info("databus: article-job stat consumer exit!")
  138. return
  139. }
  140. L:
  141. select {
  142. case msg, ok := <-s.articleStatSub.Messages():
  143. if !ok {
  144. break L
  145. }
  146. s.statCnt++
  147. msg.Commit()
  148. sm := &artmdl.StatMsg{}
  149. if err := json.Unmarshal(msg.Value, sm); err != nil {
  150. log.Error("json.Unmarshal(%s) error(%+v)", msg.Value, err)
  151. dao.PromError("service:解析计数databus消息")
  152. break L
  153. }
  154. if sm.Aid <= 0 {
  155. log.Warn("aid(%d) <=0 message(%s)", sm.Aid, msg.Value)
  156. break L
  157. }
  158. key := sm.Aid % _sharding
  159. s.statCh[key] <- sm
  160. prom.BusinessInfoCount.State(fmt.Sprintf("statChan-%v", key), int64(len(s.statCh[key])))
  161. log.Info("consumeStat key:%s partition:%d offset:%d msg: %v)", msg.Key, msg.Partition, msg.Offset, sm.String())
  162. case msg, ok := <-s.likeStatSub.Messages():
  163. if !ok {
  164. break L
  165. }
  166. msg.Commit()
  167. like := &thumdl.StatMsg{}
  168. if err := json.Unmarshal(msg.Value, like); err != nil {
  169. log.Error("json.Unmarshal(%s) error(%+v)", msg.Value, err)
  170. dao.PromError("service:解析like计数databus消息")
  171. break L
  172. }
  173. if like.Type != "article" {
  174. continue
  175. }
  176. select {
  177. case s.likeCh <- like:
  178. default:
  179. }
  180. key := like.ID % _sharding
  181. s.statCh[key] <- &artmdl.StatMsg{Like: &like.Count, Dislike: &like.DislikeCount, Aid: like.ID}
  182. prom.BusinessInfoCount.State(fmt.Sprintf("statChan-%v", key), int64(len(s.statCh[key])))
  183. log.Info("consumeLikeStat key:%s partition:%d offset:%d msg: %+v)", msg.Key, msg.Partition, msg.Offset, like)
  184. case msg, ok := <-s.replyStatSub.Messages():
  185. if !ok {
  186. break L
  187. }
  188. msg.Commit()
  189. m := &thumdl.StatMsg{}
  190. if err := json.Unmarshal(msg.Value, m); err != nil {
  191. log.Error("reply json.Unmarshal(%s) error(%+v)", msg.Value, err)
  192. dao.PromError("service:解析reply计数databus消息")
  193. break L
  194. }
  195. if m.Type != "article" {
  196. continue
  197. }
  198. key := m.ID % _sharding
  199. s.statCh[key] <- &artmdl.StatMsg{Reply: &m.Count, Aid: m.ID}
  200. prom.BusinessInfoCount.State(fmt.Sprintf("statChan-%v", key), int64(len(s.statCh[key])))
  201. log.Info("consumeReplyStat key:%s partition:%d offset:%d msg: %+v)", msg.Key, msg.Partition, msg.Offset, m)
  202. case msg, ok := <-s.favoriteStatSub.Messages():
  203. if !ok {
  204. break L
  205. }
  206. msg.Commit()
  207. m := &thumdl.StatMsg{}
  208. if err := json.Unmarshal(msg.Value, m); err != nil {
  209. log.Error("favorite json.Unmarshal(%s) error(%+v)", msg.Value, err)
  210. dao.PromError("service:解析favorite计数databus消息")
  211. break L
  212. }
  213. if m.Type != "article" {
  214. continue
  215. }
  216. key := m.ID % _sharding
  217. s.statCh[key] <- &artmdl.StatMsg{Favorite: &m.Count, Aid: m.ID}
  218. prom.BusinessInfoCount.State(fmt.Sprintf("statChan-%v", key), int64(len(s.statCh[key])))
  219. log.Info("consumeFavoriteStat key:%s partition:%d offset:%d msg: %+v)", msg.Key, msg.Partition, msg.Offset, m)
  220. case msg, ok := <-s.coinStatSub.Messages():
  221. if !ok {
  222. break L
  223. }
  224. msg.Commit()
  225. m := &thumdl.StatMsg{}
  226. if err := json.Unmarshal(msg.Value, m); err != nil {
  227. log.Error("coin json.Unmarshal(%s) error(%+v)", msg.Value, err)
  228. dao.PromError("service:解析coin计数databus消息")
  229. break L
  230. }
  231. if m.Type != "article" {
  232. continue
  233. }
  234. key := m.ID % _sharding
  235. s.statCh[key] <- &artmdl.StatMsg{Coin: &m.Count, Aid: m.ID}
  236. prom.BusinessInfoCount.State(fmt.Sprintf("statChan-%v", key), int64(len(s.statCh[key])))
  237. log.Info("consumeCoinStat key:%s partition:%d offset:%d msg: %+v)", msg.Key, msg.Partition, msg.Offset, m)
  238. }
  239. }
  240. }
  241. // consumeCanal consumes article's binlog databus.
  242. func (s *Service) consumeCanal() {
  243. defer s.waiter.Done()
  244. var c = context.TODO()
  245. for {
  246. msg, ok := <-s.articleSub.Messages()
  247. if !ok {
  248. log.Info("databus: article-job binlog consumer exit!")
  249. return
  250. }
  251. s.binCnt++
  252. msg.Commit()
  253. m := &model.Message{}
  254. if err := json.Unmarshal(msg.Value, m); err != nil {
  255. log.Error("json.Unmarshal(%s) error(%+v)", msg.Value, err)
  256. continue
  257. }
  258. switch m.Table {
  259. case _articleTable:
  260. s.upArticles(c, m.Action, m.New, m.Old)
  261. case _authorTable:
  262. s.upAuthors(c, m.Action, m.New, m.Old)
  263. }
  264. log.Info("consumeCanal key:%s partition:%d offset:%d", msg.Key, msg.Partition, msg.Offset)
  265. }
  266. }
  267. func (s *Service) retryStat() {
  268. defer s.waiter.Done()
  269. var (
  270. err error
  271. bs []byte
  272. c = context.TODO()
  273. )
  274. for {
  275. if s.closed {
  276. return
  277. }
  278. bs, err = s.dao.PopStat(c)
  279. if err != nil || bs == nil {
  280. time.Sleep(time.Second)
  281. continue
  282. }
  283. msg := &dao.StatRetry{}
  284. if err = json.Unmarshal(bs, msg); err != nil {
  285. log.Error("json.Unmarshal(%s) error(%+v)", bs, err)
  286. dao.PromError("service:解析计数重试消息")
  287. continue
  288. }
  289. if msg.Count > dao.RetryStatCount {
  290. continue
  291. }
  292. log.Info("retry: %s", bs)
  293. switch msg.Action {
  294. case dao.RetryUpdateStatCache:
  295. if err = s.updateCache(c, msg.Data, msg.Count+1); err != nil {
  296. time.Sleep(100 * time.Millisecond)
  297. }
  298. dao.PromInfo("service:重试计数更新缓存")
  299. case dao.RetryUpdateStatDB:
  300. if err = s.updateDB(c, msg.Data, msg.Count+1); err != nil {
  301. time.Sleep(100 * time.Millisecond)
  302. }
  303. dao.PromInfo("service:重试计数更新DB")
  304. }
  305. }
  306. }
  307. func (s *Service) retryReply() {
  308. defer s.waiter.Done()
  309. var (
  310. err error
  311. aid, mid int64
  312. c = context.TODO()
  313. )
  314. for {
  315. if s.closed {
  316. return
  317. }
  318. aid, mid, err = s.dao.PopReply(c)
  319. if err != nil || aid == 0 || mid == 0 {
  320. time.Sleep(time.Second)
  321. continue
  322. }
  323. log.Info("retry reply: aid(%d) mid(%d)", aid, mid)
  324. if err = s.openReply(c, aid, mid); err != nil {
  325. log.Error("s.openReply(%d,%d) error(%+v)", aid, mid, err)
  326. dao.PromInfo("service:重试打开评论区")
  327. time.Sleep(100 * time.Millisecond)
  328. continue
  329. }
  330. log.Info("s.openReply(%d,%d) retry success", aid, mid)
  331. }
  332. }
  333. func (s *Service) retryCDN() {
  334. defer s.waiter.Done()
  335. var (
  336. err error
  337. file string
  338. c = context.TODO()
  339. )
  340. for {
  341. if s.closed {
  342. return
  343. }
  344. file, err = s.dao.PopCDN(c)
  345. if err != nil || file == "" {
  346. time.Sleep(time.Second)
  347. continue
  348. }
  349. log.Info("retry CDN: file(%s)", file)
  350. if err = s.dao.PurgeCDN(c, file); err != nil {
  351. log.Error("s.dao.PurgeCDN(%s) error(%+v)", file, err)
  352. dao.PromInfo("service:刷新CDN重试")
  353. time.Sleep(100 * time.Millisecond)
  354. continue
  355. }
  356. log.Info("s.dao.PurgeCDN(%s) retry success.", file)
  357. }
  358. }
  359. func (s *Service) retryCache() {
  360. defer s.waiter.Done()
  361. var (
  362. err error
  363. bs []byte
  364. c = context.TODO()
  365. )
  366. for {
  367. if s.closed {
  368. return
  369. }
  370. bs, err = s.dao.PopArtCache(c)
  371. if err != nil || bs == nil {
  372. time.Sleep(time.Second)
  373. continue
  374. }
  375. msg := &dao.CacheRetry{}
  376. if err = json.Unmarshal(bs, msg); err != nil {
  377. log.Error("json.Unmarshal(%s) error(%+v)", bs, err)
  378. dao.PromError("service:解析文章缓存重试消息")
  379. continue
  380. }
  381. log.Info("retry cache: %s", bs)
  382. switch msg.Action {
  383. case dao.RetryAddArtCache:
  384. if err = s.addArtCache(c, msg.Aid); err != nil {
  385. time.Sleep(100 * time.Millisecond)
  386. }
  387. dao.PromInfo("service:重试添加文章缓存")
  388. case dao.RetryUpdateArtCache:
  389. if err = s.updateArtCache(c, msg.Aid, msg.Cid); err != nil {
  390. time.Sleep(100 * time.Millisecond)
  391. }
  392. dao.PromInfo("service:重试更新文章缓存")
  393. case dao.RetryDeleteArtCache:
  394. if err = s.deleteArtCache(c, msg.Aid, msg.Mid); err != nil {
  395. time.Sleep(100 * time.Millisecond)
  396. }
  397. dao.PromInfo("service:重试删除文章缓存")
  398. case dao.RetryDeleteArtRecCache:
  399. if err = s.deleteArtRecommendCache(c, msg.Aid, msg.Cid); err != nil {
  400. time.Sleep(100 * time.Millisecond)
  401. }
  402. dao.PromInfo("service:重试删除文章推荐缓存")
  403. }
  404. }
  405. }
  406. // checkConsumer checks consumer state.
  407. func (s *Service) checkConsumer() {
  408. defer s.waiter.Done()
  409. if env.DeployEnv != env.DeployEnvProd {
  410. return
  411. }
  412. var (
  413. // ctx = context.TODO()
  414. // c* means sum of consumed messages.
  415. c1, c2 int64
  416. )
  417. for {
  418. time.Sleep(5 * time.Hour)
  419. if s.statCnt-c1 == 0 {
  420. // msg := "databus: article-job stat did not consume within a minute"
  421. // s.monitor.Sms(ctx, s.c.SMS.Phone, s.c.SMS.Token, msg)
  422. // log.Warn(msg)
  423. log.Warn("databus: article-job stat did not consume within a minute")
  424. }
  425. c1 = s.statCnt
  426. if s.binCnt-c2 == 0 {
  427. // msg := "databus: article-job binlog did not consume within a minute"
  428. // s.monitor.Sms(ctx, s.c.SMS.Phone, s.c.SMS.Token, msg)
  429. // log.Warn(msg)
  430. log.Warn("databus: article-job binlog did not consume within a minute")
  431. }
  432. c2 = s.binCnt
  433. }
  434. }
  435. // Ping reports the heath of services.
  436. func (s *Service) Ping(c context.Context) (err error) {
  437. return s.dao.Ping(c)
  438. }
  439. // Close releases resources which owned by the Service instance.
  440. func (s *Service) Close() (err error) {
  441. defer s.waiter.Wait()
  442. s.articleSub.Close()
  443. s.articleStatSub.Close()
  444. s.closed = true
  445. log.Info("article-job has been closed.")
  446. return
  447. }
  448. func (s *Service) retryGame() {
  449. defer s.waiter.Done()
  450. var (
  451. err error
  452. info *model.GameCacheRetry
  453. c = context.TODO()
  454. )
  455. for {
  456. if s.closed {
  457. return
  458. }
  459. info, err = s.dao.PopGameCache(c)
  460. if (err != nil) || (info == nil) {
  461. time.Sleep(time.Second)
  462. continue
  463. }
  464. for {
  465. log.Info("retry_game: %+v", info)
  466. dao.PromInfo("service:重试同步游戏")
  467. if err = s.dao.GameSync(c, info.Action, info.Aid); err != nil {
  468. time.Sleep(time.Millisecond * 100)
  469. continue
  470. }
  471. break
  472. }
  473. log.Info("s.GameSync(%s, aid: %d) retry success", info.Action, info.Aid)
  474. }
  475. }
  476. func (s *Service) activityLikeproc() {
  477. var c = context.TODO()
  478. for {
  479. msg, ok := <-s.likeCh
  480. if !ok {
  481. log.Info("activityLikeproc: exit!")
  482. return
  483. }
  484. s.dao.LikeSync(c, msg.ID, msg.Count)
  485. }
  486. }
  487. func (s *Service) retryFlow() {
  488. defer s.waiter.Done()
  489. var (
  490. err error
  491. info *model.FlowCacheRetry
  492. c = context.TODO()
  493. )
  494. for {
  495. if s.closed {
  496. return
  497. }
  498. info, err = s.dao.PopFlowCache(c)
  499. if (err != nil) || (info == nil) {
  500. time.Sleep(time.Second)
  501. continue
  502. }
  503. for {
  504. log.Info("retry_flow: %+v", info)
  505. dao.PromInfo("service:重试同步flow")
  506. if err = s.dao.FlowSync(c, info.Mid, info.Aid); err != nil {
  507. time.Sleep(time.Second * 1)
  508. continue
  509. }
  510. break
  511. }
  512. log.Info("s.FlowSync(mid:%v, aid: %d) retry success", info.Mid, info.Aid)
  513. }
  514. }
  515. func (s *Service) retryDynamic() {
  516. defer s.waiter.Done()
  517. var (
  518. err error
  519. info *model.DynamicCacheRetry
  520. c = context.TODO()
  521. )
  522. for {
  523. if s.closed {
  524. return
  525. }
  526. info, err = s.dao.PopDynamicCache(c)
  527. if (err != nil) || (info == nil) {
  528. time.Sleep(time.Second)
  529. continue
  530. }
  531. for {
  532. log.Info("retry_dynamic: %+v", info)
  533. dao.PromInfo("service:重试同步dynamic")
  534. if err = s.dao.PubDynamic(c, info.Mid, info.Aid, info.Show, info.Comment, info.Ts, info.DynamicIntro); err != nil {
  535. time.Sleep(time.Second * 1)
  536. continue
  537. }
  538. break
  539. }
  540. log.Info("s.PubDynamic(mid:%v, aid: %d) retry success %+v", info.Mid, info.Aid, info)
  541. }
  542. }
  543. func (s *Service) updateReadCountproc() {
  544. var c = context.TODO()
  545. for {
  546. err := s.articleRPC.RebuildAllListReadCount(c)
  547. if err != nil {
  548. log.Error("s.updateReadCountproc() err: %+v", err)
  549. time.Sleep(time.Second * 5)
  550. continue
  551. }
  552. log.Info("s.updateReadCountproc() success")
  553. dao.PromError("更新文集阅读数")
  554. time.Sleep(time.Duration(s.c.Job.ListReadCountInterval))
  555. }
  556. }
  557. func (s *Service) updateHotspotsproc() {
  558. var c = context.TODO()
  559. var lastUpdate int64
  560. for {
  561. var force bool
  562. duration := int64(time.Duration(s.c.Job.HotspotForceInterval) / time.Second)
  563. if (time.Now().Unix() - lastUpdate) > duration {
  564. force = true
  565. }
  566. err := s.articleRPC.UpdateHotspots(c, &artmdl.ArgForce{Force: force})
  567. if err != nil {
  568. log.Error("s.UpdateHotspots() err: %+v", err)
  569. dao.PromError("更新热点运营文章")
  570. time.Sleep(time.Second * 5)
  571. continue
  572. }
  573. if force {
  574. lastUpdate = time.Now().Unix()
  575. }
  576. log.Info("s.UpdateHotspots() success force:%v", force)
  577. time.Sleep(time.Duration(s.c.Job.HotspotInterval))
  578. }
  579. }
  580. func (s *Service) updateCheatArtsproc() {
  581. var c = context.TODO()
  582. for {
  583. arts, err := s.dao.CheatArts(c)
  584. if err != nil {
  585. log.Error("s.updateCheatArtsproc() err: %+v", err)
  586. dao.PromError("更新反作弊文章列表")
  587. time.Sleep(time.Second * 5)
  588. continue
  589. }
  590. log.Info("s.updateCheatArtsproc() success, len: %v", len(arts))
  591. s.cheatArts = arts
  592. time.Sleep(time.Minute)
  593. }
  594. }