service.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. package service
  2. import (
  3. "context"
  4. "go-common/app/admin/main/videoup/dao/monitor"
  5. "math"
  6. "sync"
  7. "time"
  8. "go-common/app/admin/main/videoup/conf"
  9. arcdao "go-common/app/admin/main/videoup/dao/archive"
  10. datadao "go-common/app/admin/main/videoup/dao/data"
  11. busdao "go-common/app/admin/main/videoup/dao/databus"
  12. mngdao "go-common/app/admin/main/videoup/dao/manager"
  13. musicdao "go-common/app/admin/main/videoup/dao/music"
  14. overseadao "go-common/app/admin/main/videoup/dao/oversea"
  15. searchdao "go-common/app/admin/main/videoup/dao/search"
  16. staffdao "go-common/app/admin/main/videoup/dao/staff"
  17. tagDao "go-common/app/admin/main/videoup/dao/tag"
  18. taskdao "go-common/app/admin/main/videoup/dao/task"
  19. trackdao "go-common/app/admin/main/videoup/dao/track"
  20. arcmdl "go-common/app/admin/main/videoup/model/archive"
  21. "go-common/app/admin/main/videoup/model/manager"
  22. mngmdl "go-common/app/admin/main/videoup/model/manager"
  23. msgmdl "go-common/app/admin/main/videoup/model/message"
  24. accApi "go-common/app/service/main/account/api"
  25. upsrpc "go-common/app/service/main/up/api/v1"
  26. "go-common/library/log"
  27. "go-common/library/queue/databus"
  28. "github.com/jinzhu/gorm"
  29. "go-common/library/net/http/blademaster/middleware/permit"
  30. )
  31. // Service is service.
  32. type Service struct {
  33. c *conf.Config
  34. arc *arcdao.Dao
  35. busCache *busdao.Dao
  36. mng *mngdao.Dao
  37. oversea *overseadao.Dao
  38. track *trackdao.Dao
  39. music *musicdao.Dao
  40. tag *tagDao.Dao
  41. DB *gorm.DB
  42. search *searchdao.Dao
  43. staff *staffdao.Dao
  44. overseaDB *gorm.DB
  45. data *datadao.Dao
  46. monitor *monitor.Dao
  47. task *taskdao.Dao
  48. // acc rpc
  49. accRPC accApi.AccountClient
  50. upsRPC upsrpc.UpClient
  51. // databus
  52. videoupPub *databus.Databus
  53. upCreditPub *databus.Databus
  54. // cache: upper
  55. adtTpsCache map[int16]struct{}
  56. thrTpsCache map[int16]int
  57. thrMin, thrMax int
  58. upperCache map[int8]map[int64]struct{} //TODO 这个缓存需要从up服务里取
  59. allUpGroupCache map[int64]*manager.UpGroup //UP主分组列表
  60. fansCache int64
  61. roundTpsCache map[int16]struct{}
  62. typeCache map[int16]*arcmdl.Type
  63. typeCache2 map[int16][]int64 // 记录每个一级分区下的二级分区
  64. flowsCache map[int64]string
  65. porderConfigCache map[int64]*arcmdl.PorderConfig
  66. twConCache map[int8]map[int64]*arcmdl.WCItem
  67. // error chan
  68. msgCh chan *msgmdl.Videoup
  69. // wait
  70. wg sync.WaitGroup
  71. closed bool
  72. stop chan struct{}
  73. auth *permit.Permit
  74. }
  75. // New is videoup-admin service implementation.
  76. func New(c *conf.Config) (s *Service) {
  77. s = &Service{
  78. c: c,
  79. arc: arcdao.New(c),
  80. mng: mngdao.New(c),
  81. oversea: overseadao.New(c),
  82. music: musicdao.New(c),
  83. track: trackdao.New(c),
  84. busCache: busdao.New(c),
  85. search: searchdao.New(c),
  86. staff: staffdao.New(c),
  87. tag: tagDao.New(c),
  88. data: datadao.New(c),
  89. monitor: monitor.New(c),
  90. task: taskdao.New(c),
  91. // pub
  92. videoupPub: databus.New(c.VideoupPub),
  93. upCreditPub: databus.New(c.UpCreditPub),
  94. // chan
  95. msgCh: make(chan *msgmdl.Videoup, c.ChanSize),
  96. // map
  97. stop: make(chan struct{}),
  98. auth: permit.New(c.Auth),
  99. }
  100. var err error
  101. if s.accRPC, err = accApi.NewClient(c.AccountRPC); err != nil {
  102. panic(err)
  103. }
  104. if s.upsRPC, err = upsrpc.NewClient(c.UpsRPC); err != nil {
  105. panic(err)
  106. }
  107. s.DB = s.music.DB
  108. s.overseaDB = s.oversea.OverseaDB
  109. // load cache.
  110. s.loadType()
  111. s.loadUpGroups()
  112. s.loadUpper()
  113. s.loadConf()
  114. go s.cacheproc()
  115. go s.msgproc()
  116. s.wg.Add(1) // NOTE: sync add wait group
  117. go s.multSyncProc()
  118. return s
  119. }
  120. func (s *Service) isWhite(mid int64) bool {
  121. if ups, ok := s.upperCache[mngmdl.UpperTypeWhite]; ok {
  122. _, is := ups[mid]
  123. return is
  124. }
  125. return false
  126. }
  127. //PGCWhite whether mid in pgcwhite list
  128. func (s *Service) PGCWhite(mid int64) bool {
  129. if ups, ok := s.upperCache[mngmdl.UpperTypePGCWhite]; ok {
  130. _, is := ups[mid]
  131. return is
  132. }
  133. return false
  134. }
  135. func (s *Service) isBlack(mid int64) bool {
  136. if ups, ok := s.upperCache[mngmdl.UpperTypeBlack]; ok {
  137. _, is := ups[mid]
  138. return is
  139. }
  140. return false
  141. }
  142. func (s *Service) getAllUPGroups(mid int64) (gs []int64) {
  143. gs = []int64{}
  144. for tp, item := range s.upperCache {
  145. if _, exist := item[mid]; !exist {
  146. continue
  147. }
  148. gs = append(gs, int64(tp))
  149. }
  150. return
  151. }
  152. func (s *Service) isAuditType(tpID int16) bool {
  153. _, isAt := s.adtTpsCache[tpID]
  154. return isAt
  155. }
  156. func (s *Service) isRoundType(tpID int16) bool {
  157. _, in := s.roundTpsCache[tpID]
  158. return in
  159. }
  160. func (s *Service) isTypeID(tpID int16) bool {
  161. _, in := s.typeCache[tpID]
  162. return in
  163. }
  164. func (s *Service) loadType() {
  165. // TODO : audit types
  166. // threshold
  167. thr, err := s.arc.ThresholdConf(context.TODO())
  168. if err != nil {
  169. log.Error("s.arc.ThresholdConf error(%v)", err)
  170. return
  171. }
  172. s.thrTpsCache = thr
  173. var min, max = math.MaxInt32, 0
  174. for _, t := range thr {
  175. if min > t {
  176. min = t
  177. }
  178. if max < t {
  179. max = t
  180. }
  181. }
  182. s.thrMin = min
  183. s.thrMax = max
  184. }
  185. func (s *Service) loadUpper() {
  186. upm, err := s.upSpecial(context.Background())
  187. if err != nil {
  188. log.Error("s.upSpecial error(%v)", err)
  189. return
  190. }
  191. s.upperCache = upm
  192. }
  193. // loadUpGroups 加载所有UP分组列表
  194. func (s *Service) loadUpGroups() {
  195. groups, err := s.mng.UpGroups(context.TODO())
  196. if err != nil {
  197. log.Error("s.mng.UpGroups() error(%v)", err)
  198. return
  199. }
  200. s.allUpGroupCache = groups
  201. }
  202. func (s *Service) loadConf() {
  203. var (
  204. fans int64
  205. err error
  206. auditTypes map[int16]struct{}
  207. roundTypes map[int16]struct{}
  208. flows map[int64]string
  209. tpm map[int16]*arcmdl.Type
  210. porderConfigs map[int64]*arcmdl.PorderConfig
  211. twConCache map[int8]map[int64]*arcmdl.WCItem
  212. tpm2 map[int16][]int64
  213. )
  214. if fans, err = s.arc.FansConf(context.TODO()); err != nil {
  215. log.Error("s.arc.FansConf error(%v)", err)
  216. return
  217. }
  218. s.fansCache = fans
  219. if auditTypes, err = s.arc.AuditTypesConf(context.TODO()); err != nil {
  220. log.Error("s.arc.AuditTypesConf error(%v)", err)
  221. return
  222. }
  223. s.adtTpsCache = auditTypes
  224. if roundTypes, err = s.arc.RoundTypeConf(context.TODO()); err != nil {
  225. log.Error("s.arc.RoundTypeConf error(%v)", err)
  226. return
  227. }
  228. s.roundTpsCache = roundTypes
  229. if flows, err = s.arc.Flows(context.TODO()); err != nil {
  230. log.Error("s.arc.Flows error(%v)", err)
  231. return
  232. }
  233. s.flowsCache = flows
  234. if tpm, err = s.arc.TypeMapping(context.TODO()); err != nil {
  235. log.Error("s.arc.TypeMapping error(%v)", err)
  236. return
  237. }
  238. s.typeCache = tpm
  239. tpm2 = make(map[int16][]int64)
  240. for id, tmod := range tpm {
  241. if tmod.PID == 0 {
  242. if _, ok := tpm2[id]; !ok {
  243. tpm2[id] = []int64{}
  244. }
  245. continue
  246. }
  247. arrid, ok := tpm2[tmod.PID]
  248. if !ok {
  249. tpm2[tmod.PID] = []int64{int64(id)}
  250. } else {
  251. tpm2[tmod.PID] = append(arrid, int64(id))
  252. }
  253. }
  254. s.typeCache2 = tpm2
  255. if porderConfigs, err = s.arc.PorderConfig(context.TODO()); err != nil {
  256. log.Error("s.arc.PorderConfig error(%v)", err)
  257. return
  258. }
  259. s.porderConfigCache = porderConfigs
  260. if twConCache, err = s.weightConf(context.TODO()); err != nil {
  261. log.Error("s.weightConf error(%v)", err)
  262. return
  263. }
  264. s.twConCache = twConCache
  265. }
  266. func (s *Service) cacheproc() {
  267. for {
  268. time.Sleep(3 * time.Minute)
  269. s.loadType()
  270. s.loadUpper()
  271. s.loadUpGroups()
  272. s.loadConf()
  273. s.lockVideo()
  274. go s.MonitorNotifyResult(context.TODO())
  275. }
  276. }
  277. // Close consumer close.
  278. func (s *Service) Close() {
  279. s.arc.Close()
  280. s.mng.Close()
  281. s.music.Close()
  282. s.busCache.Close()
  283. time.Sleep(1 * time.Second)
  284. close(s.stop)
  285. close(s.msgCh)
  286. s.closed = true
  287. s.wg.Wait()
  288. }
  289. // Ping check server ok.
  290. func (s *Service) Ping(c context.Context) (err error) {
  291. if err = s.arc.Ping(c); err != nil {
  292. return
  293. }
  294. if err = s.mng.Ping(c); err != nil {
  295. return
  296. }
  297. return s.busCache.Ping(c)
  298. }