service.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package service
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "go-common/app/admin/main/videoup-task/conf"
  7. "go-common/app/admin/main/videoup-task/dao"
  8. "go-common/app/admin/main/videoup-task/model"
  9. account "go-common/app/service/main/account/api"
  10. upsrpc "go-common/app/service/main/up/api/v1"
  11. "go-common/library/database/elastic"
  12. "go-common/library/log"
  13. bm "go-common/library/net/http/blademaster"
  14. )
  15. // MemberCache .
  16. type MemberCache struct {
  17. sync.RWMutex
  18. uptime time.Time
  19. ms map[int64]*model.MemberStat
  20. }
  21. //Service service
  22. type Service struct {
  23. c *conf.Config
  24. dao *dao.Dao
  25. es *elastic.Elastic
  26. httpClient *bm.Client
  27. memberCache *MemberCache
  28. // cache
  29. typeCache map[int16]*model.Type
  30. reviewCache *model.ReviewCache
  31. twConCache map[int8]map[int64]*model.WCItem
  32. upperCache map[int8]map[int64]struct{}
  33. typeCache2 map[int16][]int64 // 记录每个一级分区下的二级分区
  34. //grpc
  35. accRPC account.AccountClient
  36. upsRPC upsrpc.UpClient
  37. }
  38. //New new service
  39. func New(conf *conf.Config) (svr *Service) {
  40. svr = &Service{
  41. c: conf,
  42. dao: dao.New(conf),
  43. es: elastic.NewElastic(nil),
  44. httpClient: bm.NewClient(conf.HTTPClient),
  45. memberCache: &MemberCache{},
  46. // cache
  47. reviewCache: model.NewRC(),
  48. }
  49. var err error
  50. if svr.accRPC, err = account.NewClient(conf.GRPC.AccRPC); err != nil {
  51. panic(err)
  52. }
  53. if svr.upsRPC, err = upsrpc.NewClient(conf.GRPC.UpsRPC); err != nil {
  54. panic(err)
  55. }
  56. go svr.memberproc()
  57. svr.loadConf()
  58. go svr.cacheproc()
  59. go svr.delProc()
  60. svr.loadRC()
  61. go svr.loadRCproc()
  62. return
  63. }
  64. //Close close
  65. func (s *Service) Close() {
  66. s.dao.Close()
  67. }
  68. //Ping ping
  69. func (s *Service) Ping(ctx context.Context) (err error) {
  70. err = s.dao.Ping(ctx)
  71. return
  72. }
  73. func (s *Service) loadConf() {
  74. var (
  75. err error
  76. tpm map[int16]*model.Type
  77. tpm2 map[int16][]int64
  78. twConCache map[int8]map[int64]*model.WCItem
  79. upm map[int8]map[int64]struct{}
  80. )
  81. if tpm, err = s.dao.TypeMapping(context.TODO()); err != nil {
  82. log.Error("s.dao.TypeMapping error(%v)", err)
  83. return
  84. }
  85. s.typeCache = tpm
  86. tpm2 = make(map[int16][]int64)
  87. for id, tmod := range tpm {
  88. if tmod.PID == 0 {
  89. if _, ok := tpm2[id]; !ok {
  90. tpm2[id] = []int64{}
  91. }
  92. continue
  93. }
  94. arrid, ok := tpm2[tmod.PID]
  95. if !ok {
  96. tpm2[tmod.PID] = []int64{int64(id)}
  97. } else {
  98. tpm2[tmod.PID] = append(arrid, int64(id))
  99. }
  100. }
  101. s.typeCache2 = tpm2
  102. if twConCache, err = s.weightConf(context.TODO()); err != nil {
  103. log.Error("s.weightConf error(%v)", err)
  104. return
  105. }
  106. s.twConCache = twConCache
  107. upm, err = s.upSpecial(context.TODO())
  108. if err != nil {
  109. log.Error("s.upSpecial error(%v)", err)
  110. return
  111. }
  112. s.upperCache = upm
  113. }
  114. func (s *Service) cacheproc() {
  115. for {
  116. time.Sleep(3 * time.Minute)
  117. s.loadConf()
  118. }
  119. }