service.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package watermark
  2. import (
  3. "context"
  4. "time"
  5. "sync"
  6. "go-common/app/interface/main/creative/conf"
  7. "go-common/app/interface/main/creative/dao/account"
  8. "go-common/app/interface/main/creative/dao/bfs"
  9. "go-common/app/interface/main/creative/dao/drawimg"
  10. "go-common/app/interface/main/creative/dao/monitor"
  11. "go-common/app/interface/main/creative/dao/watermark"
  12. wmMDL "go-common/app/interface/main/creative/model/watermark"
  13. "go-common/app/interface/main/creative/service"
  14. "go-common/library/log"
  15. "go-common/library/queue/databus"
  16. )
  17. //Service struct
  18. type Service struct {
  19. c *conf.Config
  20. wm *watermark.Dao
  21. drawimg *drawimg.Dao
  22. acc *account.Dao
  23. bfs *bfs.Dao
  24. // wait group
  25. wg sync.WaitGroup
  26. // databus sub
  27. userInfoSub *databus.Databus
  28. // monitor
  29. monitor *monitor.Dao
  30. userInfoMo int64
  31. // closed
  32. closed bool
  33. //async set watermark
  34. wmChan chan *wmMDL.WatermarkParam
  35. //task
  36. p *service.Public
  37. }
  38. //New get service
  39. func New(c *conf.Config, rpcdaos *service.RPCDaos, p *service.Public) *Service {
  40. s := &Service{
  41. c: c,
  42. wm: watermark.New(c),
  43. drawimg: drawimg.New(c),
  44. acc: rpcdaos.Acc,
  45. bfs: bfs.New(c),
  46. monitor: monitor.New(c),
  47. userInfoSub: databus.New(c.UserInfoSub),
  48. wmChan: make(chan *wmMDL.WatermarkParam, 1024),
  49. p: p,
  50. }
  51. if c.WaterMark.Consume {
  52. s.wg.Add(1)
  53. go s.userInfoConsumer()
  54. go s.monitorConsume()
  55. }
  56. go s.asyncWmSetProc()
  57. return s
  58. }
  59. // Ping service
  60. func (s *Service) Ping(c context.Context) (err error) {
  61. if err = s.wm.Ping(c); err != nil {
  62. log.Error("s.watermark.Dao.PingDb err(%v)", err)
  63. }
  64. return
  65. }
  66. func (s *Service) monitorConsume() {
  67. var userinfo int64
  68. for {
  69. time.Sleep(1 * time.Minute)
  70. if s.userInfoMo-userinfo == 0 {
  71. s.monitor.Send(context.TODO(), "creative userinfo did not consume within a minute")
  72. }
  73. userinfo = s.userInfoMo
  74. }
  75. }
  76. // Close dao
  77. func (s *Service) Close() {
  78. s.userInfoSub.Close()
  79. s.closed = true
  80. s.wm.Close()
  81. s.wg.Wait()
  82. }