123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- package service
- import (
- "context"
- "encoding/json"
- "strconv"
- "strings"
- "time"
- "go-common/library/conf/env"
- "go-common/library/net/metadata"
- "go-common/app/job/live/xroom-feed/internal/model"
- "go-common/library/log"
- "go-common/library/sync/errgroup"
- "go-common/library/xstr"
- )
- const (
- _whiteListType = 1
- _condType = 2
- _fileListType = 3
- _areaType = "area"
- _onlineType = "online"
- _incomeType = "income"
- _dmsType = "dms"
- _liveDaysType = "live_days"
- _hourRankType = "hour_rank"
- _anchorCateType = "anchor_cate"
- _roomStatusType = "room_status"
- _condAnd = "and"
- _condOr = "or"
- _confTypeString = "string"
- _confTypeRange = "range"
- _confTypeTop = "top"
- )
- // JobTTl ...
- type JobTTl struct {
- ttl int
- }
- func (s *Service) reloadConfFromDb() {
- t1 := time.NewTicker(time.Second * 5)
- defer t1.Stop()
- for {
- select {
- case <-t1.C:
- confList, err := s.dao.GetConfFromDb()
- if err != nil {
- log.Error("[reloadConfFromDb]getConfFromDb_error:%+v", err)
- continue
- }
- key, expire := s.getRecConfKey()
- listStr, err := json.Marshal(confList)
- if err == nil {
- s.dao.SetRecPoolCache(context.TODO(), key, string(listStr), expire)
- }
- s.ruleConf.Store(confList)
- }
- }
- }
- func (s *Service) loadConfFromDb() {
- confList, err := s.dao.GetConfFromDb()
- if err != nil {
- log.Error("[loadConfFromDb]getConfFromDb_error:%+v", err)
- return
- }
- s.ruleConf.Store(confList)
- }
- func (s *Service) parseCondConf(cond string) (condConf *model.RuleProtocol, err error) {
- condConf = new(model.RuleProtocol)
- err = json.Unmarshal([]byte(cond), &condConf)
- if err != nil {
- log.Error("[parseCondConf]Unmarshal err: %+v", err)
- return
- }
- return
- }
- func (s *Service) reloadRecList() {
- t1 := time.NewTicker(time.Second * 20)
- defer t1.Stop()
- for {
- select {
- case <-t1.C:
- s.genRecListJob()
- }
- }
- }
- // 对所有配置的规则取数据的主程序
- func (s *Service) genRecListJob() {
- log.Info("[genRecListJob]genRecListJob start")
- ttl := 20
- ttl, err := s.ac.Get("JobTtl").Int()
- if err != nil {
- log.Error("[genRecListJob]getJobTtlConfFromSven_error:%+v", err)
- }
- confList := s.ruleConf.Load()
- // assert
- res, ok := confList.([]*model.RecPoolConf)
- if !ok {
- log.Error("[genRecListJob]conf assert error! %+v", confList)
- return
- }
- if len(res) <= 0 {
- log.Warn("[genRecListJob]confList_empty")
- return
- }
- //可配置时间结束子任务 内部要有阻塞操作
- cCtx := metadata.NewContext(context.TODO(), metadata.MD{metadata.Color: env.Color})
- ctx, cancel := context.WithTimeout(cCtx, time.Duration(ttl)*time.Second)
- wg := errgroup.Group{}
- // 每条规则之间无关联,可以不互相cancel,只看超时ctx (可在内部对每个操作设置)
- // 规则获取loop
- for _, rule := range res {
- // 白名单规则
- if rule.ConfType == _whiteListType || rule.ConfType == _fileListType {
- ruleId := rule.Id
- wg.Go(func() error {
- listStr, err := s.dao.GetWhiteList(ctx, ruleId)
- if err != nil {
- log.Error("[genRecListJob]GetWhiteList_err:%+v", err)
- return nil
- }
- listStrArr := strings.Split(listStr, ",")
- listIntArr := make([]int64, 0)
- for _, roomIdStr := range listStrArr {
- roomIdInt, _ := strconv.ParseInt(roomIdStr, 10, 64)
- listIntArr = append(listIntArr, roomIdInt)
- }
- if len(listIntArr) > 0 {
- ids := s.setRecInfoCache(ctx, listIntArr)
- key, expire := s.getRecPoolKey(ruleId)
- s.dao.SetRecPoolCache(ctx, key, xstr.JoinInts(ids), expire)
- }
- return nil
- })
- }
- // 条件规则
- if rule.ConfType == _condType {
- ruleId := rule.Id
- ruleStr := rule.Rules
- wg.Go(func() error {
- condConf, err := s.parseCondConf(ruleStr)
- if err != nil {
- log.Error("[genRecListJob]parseCondConf_err:%+v", err)
- return nil
- }
- if len(condConf.Condition) == 0 {
- log.Error("[genRecListJob]parseCondConf_empty")
- return nil
- }
- roomIds := s.genCondConfRoomList(ctx, condConf.Condition, condConf.Cond, ruleId)
- if len(roomIds) > 0 {
- ids := s.setRecInfoCache(ctx, roomIds)
- key, expire := s.getRecPoolKey(ruleId)
- s.dao.SetRecPoolCache(ctx, key, xstr.JoinInts(ids), expire)
- }
- return nil
- })
- }
- }
- // 超时检测
- select {
- case <-ctx.Done():
- {
- log.Info("[genRecListJob]job time out: %d", ttl)
- cancel()
- }
- }
- err = wg.Wait()
- if err != nil {
- log.Error("[genRecListJob]rule loop wait err:%+v", err)
- }
- log.Info("[genRecListJob]genRecListJob done")
- }
|