123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439 |
- package service
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "go-common/app/job/main/click/conf"
- "go-common/app/job/main/click/dao"
- "go-common/app/job/main/click/model"
- "go-common/app/service/main/archive/api"
- arcrpc "go-common/app/service/main/archive/api/gorpc"
- arcmdl "go-common/app/service/main/archive/model/archive"
- "go-common/library/cache/redis"
- "go-common/library/log"
- "go-common/library/log/infoc"
- "go-common/library/queue/databus"
- )
// Per-shard lock states stored in Service.lockedMap: every shard starts as
// _unLock and releaseAIDMap flips shards to _locked.
const (
	_unLock = iota
	_locked
)
- // Service struct
- type Service struct {
- c *conf.Config
- db *dao.Dao
- // archive
- reportMergeSub *databus.Databus
- statViewPub *databus.Databus
- chanWg sync.WaitGroup
- redis *redis.Pool
- cliChan []chan *model.ClickMsg
- closed bool
- maxAID int64
- gotMaxAIDTime int64
- lockedMap []int64
- currentLockedIdx int64
- // aid%50[aid[plat[cnt]]]
- aidMap []map[int64]*model.ClickInfo
- // send databus chan
- busChan chan *model.StatMsg
- bigDataChan chan *model.BigDataMsg
- // forbid cache
- forbids map[int64]map[int8]*model.Forbid
- forbidMids map[int64]struct{}
- // epid to aid map
- eTam map[int64]int64
- etamMutex sync.RWMutex
- infoc2 *infoc.Infoc
- arcRPC *arcrpc.Service2
- arcDurWithMutex struct {
- Durations map[int64]*model.ArcDuration
- Mutex sync.RWMutex
- }
- allowPlat map[int8]struct{}
- bnjListAidMap map[int64]struct{}
- }
- // New is archive service implementation.
- func New(c *conf.Config) (s *Service) {
- s = &Service{
- c: c,
- arcRPC: arcrpc.New2(c.ArchiveRPC),
- redis: redis.NewPool(c.Redis),
- db: dao.New(c),
- busChan: make(chan *model.StatMsg, 10240),
- bigDataChan: make(chan *model.BigDataMsg, 10240),
- reportMergeSub: databus.New(c.ReportMergeDatabus),
- statViewPub: databus.New(c.StatViewPub),
- infoc2: infoc.New(c.Infoc2),
- allowPlat: make(map[int8]struct{}),
- }
- s.allowPlat[model.PlatForWeb] = struct{}{}
- s.allowPlat[model.PlatForH5] = struct{}{}
- s.allowPlat[model.PlatForOuter] = struct{}{}
- s.allowPlat[model.PlatForIos] = struct{}{}
- s.allowPlat[model.PlatForAndroid] = struct{}{}
- s.allowPlat[model.PlatForAndroidTV] = struct{}{}
- s.allowPlat[model.PlatForAutoPlayIOS] = struct{}{}
- s.allowPlat[model.PlafForAutoPlayInlineIOS] = struct{}{}
- s.allowPlat[model.PlatForAutoPlayAndroid] = struct{}{}
- s.allowPlat[model.PlatForAutoPlayInlineAndroid] = struct{}{}
- s.arcDurWithMutex.Durations = make(map[int64]*model.ArcDuration)
- s.loadConf()
- go s.confproc()
- go s.releaseAIDMap()
- for i := int64(0); i < s.c.ChanNum; i++ {
- s.aidMap = append(s.aidMap, make(map[int64]*model.ClickInfo, 300000))
- s.cliChan = append(s.cliChan, make(chan *model.ClickMsg, 256))
- s.lockedMap = append(s.lockedMap, _unLock)
- }
- for i := int64(0); i < s.c.ChanNum; i++ {
- s.chanWg.Add(1)
- go s.cliChanProc(i)
- }
- for i := 0; i < 10; i++ {
- s.chanWg.Add(1)
- go s.sendStat()
- }
- s.chanWg.Add(1)
- go s.sendBigDataMsg()
- s.chanWg.Add(1)
- go s.reportMergeSubConsumer()
- return s
- }
- func (s *Service) reportMergeSubConsumer() {
- defer s.chanWg.Done()
- msgs := s.reportMergeSub.Messages()
- for {
- msg, ok := <-msgs
- if !ok || s.closed {
- log.Info("s.reportMergeSub is closed")
- return
- }
- msg.Commit()
- var (
- sbs [][]byte
- err error
- )
- if err = json.Unmarshal(msg.Value, &sbs); err != nil {
- log.Error("json.Unmarshal(%v) error(%v)", msg.Value, err)
- continue
- }
- for _, bs := range sbs {
- var (
- click *model.ClickMsg
- allow bool
- now = time.Now().Unix()
- )
- log.Info("split merged message(%s)", strings.Replace(string(bs), "\001", "|", -1))
- if click, err = s.checkMsgIllegal(bs); err != nil {
- log.Error("s.checkMsgIllegal(%s) error(%v)", strings.Replace(string(bs), "\001", "|", -1), err)
- continue
- }
- if s.maxAID > 0 && now-s.gotMaxAIDTime < 120 {
- allow = s.maxAID+300 > click.AID
- }
- if !allow {
- log.Error("maxAid(%d) currentAid(%d) not allow!!!!", s.maxAID, click.AID)
- continue
- }
- log.Info("merge consumer(%d) append to chan", click.AID)
- s.cliChan[click.AID%s.c.ChanNum] <- click
- }
- }
- }
- func (s *Service) loadConf() {
- var (
- forbids map[int64]map[int8]*model.Forbid
- bnjListAids = make(map[int64]struct{})
- forbidMids map[int64]struct{}
- etam map[int64]int64
- maxAID int64
- err error
- )
- for _, aid := range s.c.BnjListAids {
- bnjListAids[aid] = struct{}{}
- }
- s.bnjListAidMap = bnjListAids
- if forbidMids, err = s.db.ForbidMids(context.Background()); err == nil {
- s.forbidMids = forbidMids
- log.Info("forbid mids(%d)", len(forbidMids))
- }
- if forbids, err = s.db.Forbids(context.TODO()); err == nil {
- s.forbids = forbids
- log.Info("forbid av(%d)", len(forbids))
- }
- if maxAID, err = s.db.MaxAID(context.TODO()); err == nil {
- s.maxAID = maxAID
- s.gotMaxAIDTime = time.Now().Unix()
- }
- if etam, err = s.db.LoadAllBangumi(context.TODO()); err == nil {
- s.etamMutex.Lock()
- s.eTam = etam
- s.etamMutex.Unlock()
- }
- }
- func (s *Service) releaseAIDMap() {
- for {
- time.Sleep(5 * time.Minute)
- now := time.Now()
- if (now.Hour() > 1 && now.Hour() < 6) || (now.Hour() == 6 && now.Minute() < 30) { // 2:00 to 6:30
- if s.currentLockedIdx < int64(len(s.aidMap)) {
- atomic.StoreInt64(&s.lockedMap[s.currentLockedIdx], _locked)
- }
- s.currentLockedIdx++
- continue
- }
- s.currentLockedIdx = 0
- }
- }
- func (s *Service) confproc() {
- for {
- time.Sleep(1 * time.Minute)
- s.loadConf()
- }
- }
- func (s *Service) sendBigDataMsg() {
- defer s.chanWg.Done()
- for {
- var (
- msg *model.BigDataMsg
- msgBs []byte
- ok bool
- err error
- infos []interface{}
- )
- if msg, ok = <-s.bigDataChan; !ok {
- break
- }
- infos = append(infos, strconv.FormatInt(int64(msg.Tp), 10))
- for _, v := range strings.Split(msg.Info, "\001") {
- infos = append(infos, v)
- }
- log.Info("truly used %+v", infos)
- if err = s.infoc2.Info(infos...); err != nil {
- log.Error("s.infoc2.Info(%s) error(%v)", msgBs, err)
- continue
- }
- }
- }
- func (s *Service) sendStat() {
- defer s.chanWg.Done()
- for {
- var (
- msg *model.StatMsg
- ok bool
- c = context.TODO()
- err error
- key string
- )
- if msg, ok = <-s.busChan; !ok {
- break
- }
- key = strconv.FormatInt(msg.AID, 10)
- vmsg := &model.StatViewMsg{Type: "archive", ID: msg.AID, Count: msg.Click, Ts: time.Now().Unix()}
- if err = s.statViewPub.Send(c, key, vmsg); err != nil {
- log.Error("s.statViewPub.Send(%d, %+v) error(%v)", msg.AID, vmsg, err)
- }
- }
- }
- func (s *Service) cliChanProc(i int64) {
- defer s.chanWg.Done()
- var (
- cli *model.ClickMsg
- cliChan = s.cliChan[i]
- ok bool
- )
- for {
- if cli, ok = <-cliChan; !ok {
- s.countClick(context.TODO(), nil, i)
- return
- }
- var (
- rtype int8
- err error
- c = context.TODO()
- )
- if rtype, err = s.isAllow(c, cli); err != nil {
- log.Error("cliChanProc Err %v", err)
- }
- select {
- case s.bigDataChan <- &model.BigDataMsg{Info: string(cli.KafkaBs), Tp: rtype}:
- default:
- log.Error("s.bigDataChan is full")
- }
- if rtype == model.LogTypeForTurly {
- s.countClick(context.TODO(), cli, i)
- }
- }
- }
- func (s *Service) checkMsgIllegal(msg []byte) (click *model.ClickMsg, err error) {
- var (
- aid int64
- clickMsg []string
- plat int64
- did string
- buvid string
- mid int64
- lv int64
- ctime int64
- stime int64
- epid int64
- ip string
- seasonType int
- userAgent string
- )
- clickMsg = strings.Split(string(msg), "\001")
- if len(clickMsg) < 10 {
- err = errors.New("click msg error")
- return
- }
- if aid, err = strconv.ParseInt(clickMsg[1], 10, 64); err != nil {
- err = fmt.Errorf("aid(%s) error", clickMsg[1])
- return
- }
- if aid <= 0 {
- err = fmt.Errorf("wocao aid(%s) error", clickMsg[1])
- return
- }
- if plat, err = strconv.ParseInt(clickMsg[0], 10, 64); err != nil {
- err = fmt.Errorf("plat(%s) error", clickMsg[0])
- return
- }
- if _, ok := s.allowPlat[int8(plat)]; !ok {
- err = fmt.Errorf("plat(%d) is illegal", plat)
- return
- }
- userAgent = clickMsg[10]
- did = clickMsg[8]
- if did == "" {
- err = fmt.Errorf("bvID(%s) is illegal", clickMsg[8])
- return
- }
- buvid = clickMsg[11]
- if clickMsg[4] != "" && clickMsg[4] != "0" {
- if mid, err = strconv.ParseInt(clickMsg[4], 10, 64); err != nil {
- err = fmt.Errorf("mid(%s) is illegal", clickMsg[4])
- return
- }
- }
- if clickMsg[5] != "" {
- if lv, err = strconv.ParseInt(clickMsg[5], 10, 64); err != nil {
- err = fmt.Errorf("lv(%s) is illegal", clickMsg[5])
- return
- }
- }
- if ctime, err = strconv.ParseInt(clickMsg[6], 10, 64); err != nil {
- err = fmt.Errorf("ctime(%s) is illegal", clickMsg[6])
- return
- }
- if stime, err = strconv.ParseInt(clickMsg[7], 10, 64); err != nil {
- err = fmt.Errorf("stime(%s) is illegal", clickMsg[7])
- return
- }
- if ip = clickMsg[9]; ip == "" {
- err = errors.New("ip is illegal")
- return
- }
- if clickMsg[17] != "" {
- if epid, err = strconv.ParseInt(clickMsg[17], 10, 64); err != nil {
- err = fmt.Errorf("epid(%s) is illegal", clickMsg[17])
- return
- }
- if clickMsg[15] != "null" {
- if seasonType, err = strconv.Atoi(clickMsg[15]); err != nil {
- err = fmt.Errorf("seasonType(%s) is illegal", clickMsg[15])
- return
- }
- }
- }
- if strings.Contains(userAgent, "(auto_play)") ||
- strings.Contains(userAgent, "(inline_play_heartbeat)") ||
- strings.Contains(userAgent, "(inline_play_to_view)") || // to remove auto_play & inline_play_heartbeat
- strings.Contains(userAgent, "(played_time_enough)") {
- if did, err = s.getRealDid(context.TODO(), buvid, aid); err != nil || did == "" {
- err = fmt.Errorf("bvid(%s) dont have did", buvid)
- return
- }
- did = buvid
- msg = []byte(strings.Replace(string(msg), buvid, did, 1))
- }
- click = &model.ClickMsg{
- Plat: int8(plat),
- AID: aid,
- MID: mid,
- Lv: int8(lv),
- CTime: ctime,
- STime: stime,
- Did: did,
- Buvid: buvid,
- IP: ip,
- KafkaBs: msg,
- EpID: epid,
- SeasonType: seasonType,
- UserAgent: userAgent,
- }
- return
- }
- // ArcDuration return archive duration, manager local cache
- func (s *Service) ArcDuration(c context.Context, aid int64) (duration int64) {
- var (
- ok bool
- arcDur *model.ArcDuration
- now = time.Now().Unix()
- err error
- )
- // duration default
- duration = s.c.CacheConf.PGCReplayTime
- s.arcDurWithMutex.Mutex.RLock()
- arcDur, ok = s.arcDurWithMutex.Durations[aid]
- s.arcDurWithMutex.Mutex.RUnlock()
- if ok && now-arcDur.GotTime > s.c.CacheConf.ArcUpCacheTime {
- duration = arcDur.Duration
- return
- }
- var arc *api.Arc
- if arc, err = s.arcRPC.Archive3(c, &arcmdl.ArgAid2{Aid: aid}); err != nil {
- // just log
- log.Error("s.arcRPC.Archive3(%d) error(%v)", aid, err)
- } else {
- duration = arc.Duration
- }
- s.arcDurWithMutex.Mutex.Lock()
- s.arcDurWithMutex.Durations[aid] = &model.ArcDuration{Duration: duration, GotTime: now}
- s.arcDurWithMutex.Mutex.Unlock()
- return
- }
- // Close kafaka consumer close.
- func (s *Service) Close() (err error) {
- s.closed = true
- time.Sleep(time.Second)
- for i := 0; i < len(s.cliChan); i++ {
- close(s.cliChan[i])
- }
- close(s.bigDataChan)
- s.chanWg.Wait()
- s.statViewPub.Close()
- return
- }
|