123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320 |
- package service
- import (
- "context"
- "go-common/app/admin/main/videoup/dao/monitor"
- "math"
- "sync"
- "time"
- "go-common/app/admin/main/videoup/conf"
- arcdao "go-common/app/admin/main/videoup/dao/archive"
- datadao "go-common/app/admin/main/videoup/dao/data"
- busdao "go-common/app/admin/main/videoup/dao/databus"
- mngdao "go-common/app/admin/main/videoup/dao/manager"
- musicdao "go-common/app/admin/main/videoup/dao/music"
- overseadao "go-common/app/admin/main/videoup/dao/oversea"
- searchdao "go-common/app/admin/main/videoup/dao/search"
- staffdao "go-common/app/admin/main/videoup/dao/staff"
- tagDao "go-common/app/admin/main/videoup/dao/tag"
- taskdao "go-common/app/admin/main/videoup/dao/task"
- trackdao "go-common/app/admin/main/videoup/dao/track"
- arcmdl "go-common/app/admin/main/videoup/model/archive"
- "go-common/app/admin/main/videoup/model/manager"
- mngmdl "go-common/app/admin/main/videoup/model/manager"
- msgmdl "go-common/app/admin/main/videoup/model/message"
- accApi "go-common/app/service/main/account/api"
- upsrpc "go-common/app/service/main/up/api/v1"
- "go-common/library/log"
- "go-common/library/queue/databus"
- "github.com/jinzhu/gorm"
- "go-common/library/net/http/blademaster/middleware/permit"
- )
- // Service is service.
- type Service struct {
- c *conf.Config
- arc *arcdao.Dao
- busCache *busdao.Dao
- mng *mngdao.Dao
- oversea *overseadao.Dao
- track *trackdao.Dao
- music *musicdao.Dao
- tag *tagDao.Dao
- DB *gorm.DB
- search *searchdao.Dao
- staff *staffdao.Dao
- overseaDB *gorm.DB
- data *datadao.Dao
- monitor *monitor.Dao
- task *taskdao.Dao
- // acc rpc
- accRPC accApi.AccountClient
- upsRPC upsrpc.UpClient
- // databus
- videoupPub *databus.Databus
- upCreditPub *databus.Databus
- // cache: upper
- adtTpsCache map[int16]struct{}
- thrTpsCache map[int16]int
- thrMin, thrMax int
- upperCache map[int8]map[int64]struct{} //TODO 这个缓存需要从up服务里取
- allUpGroupCache map[int64]*manager.UpGroup //UP主分组列表
- fansCache int64
- roundTpsCache map[int16]struct{}
- typeCache map[int16]*arcmdl.Type
- typeCache2 map[int16][]int64 // 记录每个一级分区下的二级分区
- flowsCache map[int64]string
- porderConfigCache map[int64]*arcmdl.PorderConfig
- twConCache map[int8]map[int64]*arcmdl.WCItem
- // error chan
- msgCh chan *msgmdl.Videoup
- // wait
- wg sync.WaitGroup
- closed bool
- stop chan struct{}
- auth *permit.Permit
- }
- // New is videoup-admin service implementation.
- func New(c *conf.Config) (s *Service) {
- s = &Service{
- c: c,
- arc: arcdao.New(c),
- mng: mngdao.New(c),
- oversea: overseadao.New(c),
- music: musicdao.New(c),
- track: trackdao.New(c),
- busCache: busdao.New(c),
- search: searchdao.New(c),
- staff: staffdao.New(c),
- tag: tagDao.New(c),
- data: datadao.New(c),
- monitor: monitor.New(c),
- task: taskdao.New(c),
- // pub
- videoupPub: databus.New(c.VideoupPub),
- upCreditPub: databus.New(c.UpCreditPub),
- // chan
- msgCh: make(chan *msgmdl.Videoup, c.ChanSize),
- // map
- stop: make(chan struct{}),
- auth: permit.New(c.Auth),
- }
- var err error
- if s.accRPC, err = accApi.NewClient(c.AccountRPC); err != nil {
- panic(err)
- }
- if s.upsRPC, err = upsrpc.NewClient(c.UpsRPC); err != nil {
- panic(err)
- }
- s.DB = s.music.DB
- s.overseaDB = s.oversea.OverseaDB
- // load cache.
- s.loadType()
- s.loadUpGroups()
- s.loadUpper()
- s.loadConf()
- go s.cacheproc()
- go s.msgproc()
- s.wg.Add(1) // NOTE: sync add wait group
- go s.multSyncProc()
- return s
- }
- func (s *Service) isWhite(mid int64) bool {
- if ups, ok := s.upperCache[mngmdl.UpperTypeWhite]; ok {
- _, is := ups[mid]
- return is
- }
- return false
- }
- //PGCWhite whether mid in pgcwhite list
- func (s *Service) PGCWhite(mid int64) bool {
- if ups, ok := s.upperCache[mngmdl.UpperTypePGCWhite]; ok {
- _, is := ups[mid]
- return is
- }
- return false
- }
- func (s *Service) isBlack(mid int64) bool {
- if ups, ok := s.upperCache[mngmdl.UpperTypeBlack]; ok {
- _, is := ups[mid]
- return is
- }
- return false
- }
- func (s *Service) getAllUPGroups(mid int64) (gs []int64) {
- gs = []int64{}
- for tp, item := range s.upperCache {
- if _, exist := item[mid]; !exist {
- continue
- }
- gs = append(gs, int64(tp))
- }
- return
- }
- func (s *Service) isAuditType(tpID int16) bool {
- _, isAt := s.adtTpsCache[tpID]
- return isAt
- }
- func (s *Service) isRoundType(tpID int16) bool {
- _, in := s.roundTpsCache[tpID]
- return in
- }
- func (s *Service) isTypeID(tpID int16) bool {
- _, in := s.typeCache[tpID]
- return in
- }
- func (s *Service) loadType() {
- // TODO : audit types
- // threshold
- thr, err := s.arc.ThresholdConf(context.TODO())
- if err != nil {
- log.Error("s.arc.ThresholdConf error(%v)", err)
- return
- }
- s.thrTpsCache = thr
- var min, max = math.MaxInt32, 0
- for _, t := range thr {
- if min > t {
- min = t
- }
- if max < t {
- max = t
- }
- }
- s.thrMin = min
- s.thrMax = max
- }
- func (s *Service) loadUpper() {
- upm, err := s.upSpecial(context.Background())
- if err != nil {
- log.Error("s.upSpecial error(%v)", err)
- return
- }
- s.upperCache = upm
- }
- // loadUpGroups 加载所有UP分组列表
- func (s *Service) loadUpGroups() {
- groups, err := s.mng.UpGroups(context.TODO())
- if err != nil {
- log.Error("s.mng.UpGroups() error(%v)", err)
- return
- }
- s.allUpGroupCache = groups
- }
- func (s *Service) loadConf() {
- var (
- fans int64
- err error
- auditTypes map[int16]struct{}
- roundTypes map[int16]struct{}
- flows map[int64]string
- tpm map[int16]*arcmdl.Type
- porderConfigs map[int64]*arcmdl.PorderConfig
- twConCache map[int8]map[int64]*arcmdl.WCItem
- tpm2 map[int16][]int64
- )
- if fans, err = s.arc.FansConf(context.TODO()); err != nil {
- log.Error("s.arc.FansConf error(%v)", err)
- return
- }
- s.fansCache = fans
- if auditTypes, err = s.arc.AuditTypesConf(context.TODO()); err != nil {
- log.Error("s.arc.AuditTypesConf error(%v)", err)
- return
- }
- s.adtTpsCache = auditTypes
- if roundTypes, err = s.arc.RoundTypeConf(context.TODO()); err != nil {
- log.Error("s.arc.RoundTypeConf error(%v)", err)
- return
- }
- s.roundTpsCache = roundTypes
- if flows, err = s.arc.Flows(context.TODO()); err != nil {
- log.Error("s.arc.Flows error(%v)", err)
- return
- }
- s.flowsCache = flows
- if tpm, err = s.arc.TypeMapping(context.TODO()); err != nil {
- log.Error("s.arc.TypeMapping error(%v)", err)
- return
- }
- s.typeCache = tpm
- tpm2 = make(map[int16][]int64)
- for id, tmod := range tpm {
- if tmod.PID == 0 {
- if _, ok := tpm2[id]; !ok {
- tpm2[id] = []int64{}
- }
- continue
- }
- arrid, ok := tpm2[tmod.PID]
- if !ok {
- tpm2[tmod.PID] = []int64{int64(id)}
- } else {
- tpm2[tmod.PID] = append(arrid, int64(id))
- }
- }
- s.typeCache2 = tpm2
- if porderConfigs, err = s.arc.PorderConfig(context.TODO()); err != nil {
- log.Error("s.arc.PorderConfig error(%v)", err)
- return
- }
- s.porderConfigCache = porderConfigs
- if twConCache, err = s.weightConf(context.TODO()); err != nil {
- log.Error("s.weightConf error(%v)", err)
- return
- }
- s.twConCache = twConCache
- }
- func (s *Service) cacheproc() {
- for {
- time.Sleep(3 * time.Minute)
- s.loadType()
- s.loadUpper()
- s.loadUpGroups()
- s.loadConf()
- s.lockVideo()
- go s.MonitorNotifyResult(context.TODO())
- }
- }
- // Close consumer close.
- func (s *Service) Close() {
- s.arc.Close()
- s.mng.Close()
- s.music.Close()
- s.busCache.Close()
- time.Sleep(1 * time.Second)
- close(s.stop)
- close(s.msgCh)
- s.closed = true
- s.wg.Wait()
- }
- // Ping check server ok.
- func (s *Service) Ping(c context.Context) (err error) {
- if err = s.arc.Ping(c); err != nil {
- return
- }
- if err = s.mng.Ping(c); err != nil {
- return
- }
- return s.busCache.Ping(c)
- }
|