service.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package service
import (
	"context"
	"encoding/json"
	"sync"
	"sync/atomic"
	"time"

	"go-common/app/job/main/feed/conf"
	"go-common/app/job/main/feed/dao"
	"go-common/app/job/main/feed/model"
	feed "go-common/app/service/main/feed/rpc/client"
	"go-common/library/conf/env"
	"go-common/library/log"
	"go-common/library/queue/databus"
)
  15. type Service struct {
  16. c *conf.Config
  17. dao *dao.Dao
  18. archiveSub *databus.Databus
  19. arcUpMo int64
  20. feedRPC *feed.Service
  21. waiter *sync.WaitGroup
  22. }
  23. // New is feed service implementation.
  24. func New(c *conf.Config) (s *Service) {
  25. s = &Service{
  26. c: c,
  27. dao: dao.New(c),
  28. archiveSub: databus.New(c.ArchiveSub),
  29. feedRPC: feed.New(c.FeedRPC),
  30. waiter: new(sync.WaitGroup),
  31. }
  32. // arc databus consumer
  33. s.waiter.Add(1)
  34. go s.arcConsumeproc()
  35. go s.checkConsumeproc()
  36. return s
  37. }
  38. // arcConsumeproc consumer archive
  39. func (s *Service) arcConsumeproc() {
  40. var (
  41. msgs = s.archiveSub.Messages()
  42. err error
  43. )
  44. defer s.waiter.Done()
  45. for {
  46. msg, ok := <-msgs
  47. if !ok {
  48. log.Info("arc databus Consumer exit")
  49. return
  50. }
  51. s.arcUpMo++
  52. dao.PromInfo("消费稿件变更")
  53. msg.Commit()
  54. m := &model.Message{}
  55. if err = json.Unmarshal(msg.Value, m); err != nil {
  56. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  57. continue
  58. }
  59. if m.Table != "archive" {
  60. continue
  61. }
  62. s.archiveUpdate(m.Action, m.New, m.Old)
  63. }
  64. }
  65. // checkConsumeproc check consumer stat
  66. func (s *Service) checkConsumeproc() {
  67. if env.DeployEnv != env.DeployEnvProd {
  68. return
  69. }
  70. var arcMo int64
  71. for {
  72. time.Sleep(1 * time.Minute)
  73. if s.arcUpMo-arcMo == 0 {
  74. msg := "feed-job arhieve did not consume within a minute"
  75. s.dao.SendSMS(msg)
  76. log.Warn(msg)
  77. }
  78. arcMo = s.arcUpMo
  79. }
  80. }
  81. // Close Databus consumer close.
  82. func (s *Service) Close() error {
  83. return s.archiveSub.Close()
  84. }
  85. // Wait goroutinue to close
  86. func (s *Service) Wait() {
  87. s.waiter.Wait()
  88. }
  89. // Ping check server ok
  90. func (s *Service) Ping(c context.Context) error {
  91. return s.dao.Ping(c)
  92. }