service.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package web
  2. import (
  3. "context"
  4. "encoding/json"
  5. "sync"
  6. "time"
  7. "go-common/app/job/main/web-goblin/conf"
  8. mdlweb "go-common/app/job/main/web-goblin/dao/web"
  9. "go-common/app/job/main/web-goblin/model/web"
  10. dymdl "go-common/app/service/main/dynamic/model"
  11. dyrpc "go-common/app/service/main/dynamic/rpc/client"
  12. "go-common/library/log"
  13. "go-common/library/queue/databus"
  14. )
  15. // Service struct .
  16. type Service struct {
  17. c *conf.Config
  18. dao *mdlweb.Dao
  19. dy *dyrpc.Service
  20. waiter *sync.WaitGroup
  21. archiveNotifySub *databus.Databus
  22. }
  23. // New init .
  24. func New(c *conf.Config) (s *Service) {
  25. s = &Service{
  26. c: c,
  27. dao: mdlweb.New(c),
  28. dy: dyrpc.New(c.DynamicRPC),
  29. waiter: new(sync.WaitGroup),
  30. archiveNotifySub: databus.New(c.ArchiveNotifySub),
  31. }
  32. go s.broadcastDy()
  33. s.waiter.Add(1)
  34. go s.allSearch()
  35. return s
  36. }
  37. // Ping Service .
  38. func (s *Service) Ping(c context.Context) (err error) {
  39. return s.dao.Ping(c)
  40. }
  41. // Close Service .
  42. func (s *Service) Close() {
  43. s.dao.Close()
  44. }
  45. func (s *Service) broadcastDy() {
  46. var (
  47. dynamics map[string]int
  48. err error
  49. b []byte
  50. )
  51. for {
  52. if dynamics, err = s.dy.RegionTotal(context.Background(), &dymdl.ArgRegionTotal{RealIP: ""}); err != nil {
  53. mdlweb.PromError("RegionTotal接口错误", "s.dy.RegionTotal error(%v)", err)
  54. time.Sleep(time.Second)
  55. continue
  56. }
  57. if b, err = json.Marshal(dynamics); err != nil {
  58. log.Error("broadcastDy json.Marshal error(%v)", err)
  59. return
  60. }
  61. if err = s.dao.PushAll(context.Background(), string(b), ""); err != nil {
  62. log.Error("s.dao.PushAll(%+v) error(%v)", dynamics, err)
  63. time.Sleep(time.Second)
  64. continue
  65. }
  66. time.Sleep(time.Second * 5)
  67. }
  68. }
  69. func (s *Service) allSearch() {
  70. var (
  71. err error
  72. ctx = context.Background()
  73. )
  74. defer s.waiter.Done()
  75. for {
  76. msg, ok := <-s.archiveNotifySub.Messages()
  77. if !ok {
  78. log.Error("web-goblin-job databus Consumer exit")
  79. return
  80. }
  81. res := &web.ArcMsg{}
  82. if err = json.Unmarshal(msg.Value, res); err != nil {
  83. msg.Commit()
  84. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  85. continue
  86. }
  87. if res.Table != _archive {
  88. continue
  89. }
  90. s.UgcIncrement(ctx, res)
  91. msg.Commit()
  92. log.Info("consume allSearch ugc key:%s partition:%d offset:%d)", msg.Key, msg.Partition, msg.Offset)
  93. time.Sleep(10 * time.Millisecond)
  94. }
  95. }