service.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "go-common/app/job/main/click/conf"
  13. "go-common/app/job/main/click/dao"
  14. "go-common/app/job/main/click/model"
  15. "go-common/app/service/main/archive/api"
  16. arcrpc "go-common/app/service/main/archive/api/gorpc"
  17. arcmdl "go-common/app/service/main/archive/model/archive"
  18. "go-common/library/cache/redis"
  19. "go-common/library/log"
  20. "go-common/library/log/infoc"
  21. "go-common/library/queue/databus"
  22. )
// Lock states stored in Service.lockedMap, one entry per shard.
const (
	// _unLock is the initial state New assigns to every shard.
	_unLock = 0
	// _locked is the state releaseAIDMap stores for a shard.
	_locked = 1
)
// Service is the click job service. It bundles the configuration, the
// storage/messaging clients, the per-shard click pipelines and the in-memory
// caches that the methods in this file read and write.
type Service struct {
	// c is the job configuration passed to New.
	c *conf.Config
	// db is the data access object built from c.
	db *dao.Dao
	// archive
	// reportMergeSub is the databus source drained by reportMergeSubConsumer.
	reportMergeSub *databus.Databus
	// statViewPub is the databus sink sendStat publishes view counts to.
	statViewPub *databus.Databus
	// chanWg tracks the worker goroutines started in New; Close waits on it.
	chanWg sync.WaitGroup
	// redis is the connection pool created from c.Redis.
	redis *redis.Pool
	// cliChan holds one click channel per shard; a click is routed to
	// index AID % ChanNum.
	cliChan []chan *model.ClickMsg
	// closed is set by Close and checked by reportMergeSubConsumer.
	closed bool
	// maxAID is the value last returned by db.MaxAID in loadConf.
	maxAID int64
	// gotMaxAIDTime is the unix time (seconds) at which maxAID was last refreshed.
	gotMaxAIDTime int64
	// lockedMap holds one _unLock/_locked flag per shard.
	lockedMap []int64
	// currentLockedIdx is the index of the next shard releaseAIDMap marks as locked.
	currentLockedIdx int64
	// aidMap holds one map per shard (New creates ChanNum of them).
	// Original note: aid%50[aid[plat[cnt]]] — presumably keyed by aid; confirm
	// against countClick.
	aidMap []map[int64]*model.ClickInfo
	// send databus chan
	// busChan feeds the sendStat workers.
	busChan chan *model.StatMsg
	// bigDataChan feeds sendBigDataMsg.
	bigDataChan chan *model.BigDataMsg
	// forbid cache
	// forbids and forbidMids are reloaded from the database by loadConf.
	forbids    map[int64]map[int8]*model.Forbid
	forbidMids map[int64]struct{}
	// epid to aid map
	// eTam is replaced by loadConf while holding etamMutex.
	eTam      map[int64]int64
	etamMutex sync.RWMutex
	// infoc2 is the infoc writer used by sendBigDataMsg.
	infoc2 *infoc.Infoc
	// arcRPC is the archive RPC client used by ArcDuration.
	arcRPC *arcrpc.Service2
	// arcDurWithMutex is the local archive-duration cache used by ArcDuration;
	// Mutex guards Durations.
	arcDurWithMutex struct {
		Durations map[int64]*model.ArcDuration
		Mutex     sync.RWMutex
	}
	// allowPlat is the set of platform ids accepted by checkMsgIllegal.
	allowPlat map[int8]struct{}
	// bnjListAidMap is the set built from c.BnjListAids by loadConf.
	bnjListAidMap map[int64]struct{}
}
  62. // New is archive service implementation.
  63. func New(c *conf.Config) (s *Service) {
  64. s = &Service{
  65. c: c,
  66. arcRPC: arcrpc.New2(c.ArchiveRPC),
  67. redis: redis.NewPool(c.Redis),
  68. db: dao.New(c),
  69. busChan: make(chan *model.StatMsg, 10240),
  70. bigDataChan: make(chan *model.BigDataMsg, 10240),
  71. reportMergeSub: databus.New(c.ReportMergeDatabus),
  72. statViewPub: databus.New(c.StatViewPub),
  73. infoc2: infoc.New(c.Infoc2),
  74. allowPlat: make(map[int8]struct{}),
  75. }
  76. s.allowPlat[model.PlatForWeb] = struct{}{}
  77. s.allowPlat[model.PlatForH5] = struct{}{}
  78. s.allowPlat[model.PlatForOuter] = struct{}{}
  79. s.allowPlat[model.PlatForIos] = struct{}{}
  80. s.allowPlat[model.PlatForAndroid] = struct{}{}
  81. s.allowPlat[model.PlatForAndroidTV] = struct{}{}
  82. s.allowPlat[model.PlatForAutoPlayIOS] = struct{}{}
  83. s.allowPlat[model.PlafForAutoPlayInlineIOS] = struct{}{}
  84. s.allowPlat[model.PlatForAutoPlayAndroid] = struct{}{}
  85. s.allowPlat[model.PlatForAutoPlayInlineAndroid] = struct{}{}
  86. s.arcDurWithMutex.Durations = make(map[int64]*model.ArcDuration)
  87. s.loadConf()
  88. go s.confproc()
  89. go s.releaseAIDMap()
  90. for i := int64(0); i < s.c.ChanNum; i++ {
  91. s.aidMap = append(s.aidMap, make(map[int64]*model.ClickInfo, 300000))
  92. s.cliChan = append(s.cliChan, make(chan *model.ClickMsg, 256))
  93. s.lockedMap = append(s.lockedMap, _unLock)
  94. }
  95. for i := int64(0); i < s.c.ChanNum; i++ {
  96. s.chanWg.Add(1)
  97. go s.cliChanProc(i)
  98. }
  99. for i := 0; i < 10; i++ {
  100. s.chanWg.Add(1)
  101. go s.sendStat()
  102. }
  103. s.chanWg.Add(1)
  104. go s.sendBigDataMsg()
  105. s.chanWg.Add(1)
  106. go s.reportMergeSubConsumer()
  107. return s
  108. }
  109. func (s *Service) reportMergeSubConsumer() {
  110. defer s.chanWg.Done()
  111. msgs := s.reportMergeSub.Messages()
  112. for {
  113. msg, ok := <-msgs
  114. if !ok || s.closed {
  115. log.Info("s.reportMergeSub is closed")
  116. return
  117. }
  118. msg.Commit()
  119. var (
  120. sbs [][]byte
  121. err error
  122. )
  123. if err = json.Unmarshal(msg.Value, &sbs); err != nil {
  124. log.Error("json.Unmarshal(%v) error(%v)", msg.Value, err)
  125. continue
  126. }
  127. for _, bs := range sbs {
  128. var (
  129. click *model.ClickMsg
  130. allow bool
  131. now = time.Now().Unix()
  132. )
  133. log.Info("split merged message(%s)", strings.Replace(string(bs), "\001", "|", -1))
  134. if click, err = s.checkMsgIllegal(bs); err != nil {
  135. log.Error("s.checkMsgIllegal(%s) error(%v)", strings.Replace(string(bs), "\001", "|", -1), err)
  136. continue
  137. }
  138. if s.maxAID > 0 && now-s.gotMaxAIDTime < 120 {
  139. allow = s.maxAID+300 > click.AID
  140. }
  141. if !allow {
  142. log.Error("maxAid(%d) currentAid(%d) not allow!!!!", s.maxAID, click.AID)
  143. continue
  144. }
  145. log.Info("merge consumer(%d) append to chan", click.AID)
  146. s.cliChan[click.AID%s.c.ChanNum] <- click
  147. }
  148. }
  149. }
  150. func (s *Service) loadConf() {
  151. var (
  152. forbids map[int64]map[int8]*model.Forbid
  153. bnjListAids = make(map[int64]struct{})
  154. forbidMids map[int64]struct{}
  155. etam map[int64]int64
  156. maxAID int64
  157. err error
  158. )
  159. for _, aid := range s.c.BnjListAids {
  160. bnjListAids[aid] = struct{}{}
  161. }
  162. s.bnjListAidMap = bnjListAids
  163. if forbidMids, err = s.db.ForbidMids(context.Background()); err == nil {
  164. s.forbidMids = forbidMids
  165. log.Info("forbid mids(%d)", len(forbidMids))
  166. }
  167. if forbids, err = s.db.Forbids(context.TODO()); err == nil {
  168. s.forbids = forbids
  169. log.Info("forbid av(%d)", len(forbids))
  170. }
  171. if maxAID, err = s.db.MaxAID(context.TODO()); err == nil {
  172. s.maxAID = maxAID
  173. s.gotMaxAIDTime = time.Now().Unix()
  174. }
  175. if etam, err = s.db.LoadAllBangumi(context.TODO()); err == nil {
  176. s.etamMutex.Lock()
  177. s.eTam = etam
  178. s.etamMutex.Unlock()
  179. }
  180. }
  181. func (s *Service) releaseAIDMap() {
  182. for {
  183. time.Sleep(5 * time.Minute)
  184. now := time.Now()
  185. if (now.Hour() > 1 && now.Hour() < 6) || (now.Hour() == 6 && now.Minute() < 30) { // 2:00 to 6:30
  186. if s.currentLockedIdx < int64(len(s.aidMap)) {
  187. atomic.StoreInt64(&s.lockedMap[s.currentLockedIdx], _locked)
  188. }
  189. s.currentLockedIdx++
  190. continue
  191. }
  192. s.currentLockedIdx = 0
  193. }
  194. }
  195. func (s *Service) confproc() {
  196. for {
  197. time.Sleep(1 * time.Minute)
  198. s.loadConf()
  199. }
  200. }
  201. func (s *Service) sendBigDataMsg() {
  202. defer s.chanWg.Done()
  203. for {
  204. var (
  205. msg *model.BigDataMsg
  206. msgBs []byte
  207. ok bool
  208. err error
  209. infos []interface{}
  210. )
  211. if msg, ok = <-s.bigDataChan; !ok {
  212. break
  213. }
  214. infos = append(infos, strconv.FormatInt(int64(msg.Tp), 10))
  215. for _, v := range strings.Split(msg.Info, "\001") {
  216. infos = append(infos, v)
  217. }
  218. log.Info("truly used %+v", infos)
  219. if err = s.infoc2.Info(infos...); err != nil {
  220. log.Error("s.infoc2.Info(%s) error(%v)", msgBs, err)
  221. continue
  222. }
  223. }
  224. }
  225. func (s *Service) sendStat() {
  226. defer s.chanWg.Done()
  227. for {
  228. var (
  229. msg *model.StatMsg
  230. ok bool
  231. c = context.TODO()
  232. err error
  233. key string
  234. )
  235. if msg, ok = <-s.busChan; !ok {
  236. break
  237. }
  238. key = strconv.FormatInt(msg.AID, 10)
  239. vmsg := &model.StatViewMsg{Type: "archive", ID: msg.AID, Count: msg.Click, Ts: time.Now().Unix()}
  240. if err = s.statViewPub.Send(c, key, vmsg); err != nil {
  241. log.Error("s.statViewPub.Send(%d, %+v) error(%v)", msg.AID, vmsg, err)
  242. }
  243. }
  244. }
  245. func (s *Service) cliChanProc(i int64) {
  246. defer s.chanWg.Done()
  247. var (
  248. cli *model.ClickMsg
  249. cliChan = s.cliChan[i]
  250. ok bool
  251. )
  252. for {
  253. if cli, ok = <-cliChan; !ok {
  254. s.countClick(context.TODO(), nil, i)
  255. return
  256. }
  257. var (
  258. rtype int8
  259. err error
  260. c = context.TODO()
  261. )
  262. if rtype, err = s.isAllow(c, cli); err != nil {
  263. log.Error("cliChanProc Err %v", err)
  264. }
  265. select {
  266. case s.bigDataChan <- &model.BigDataMsg{Info: string(cli.KafkaBs), Tp: rtype}:
  267. default:
  268. log.Error("s.bigDataChan is full")
  269. }
  270. if rtype == model.LogTypeForTurly {
  271. s.countClick(context.TODO(), cli, i)
  272. }
  273. }
  274. }
  275. func (s *Service) checkMsgIllegal(msg []byte) (click *model.ClickMsg, err error) {
  276. var (
  277. aid int64
  278. clickMsg []string
  279. plat int64
  280. did string
  281. buvid string
  282. mid int64
  283. lv int64
  284. ctime int64
  285. stime int64
  286. epid int64
  287. ip string
  288. seasonType int
  289. userAgent string
  290. )
  291. clickMsg = strings.Split(string(msg), "\001")
  292. if len(clickMsg) < 10 {
  293. err = errors.New("click msg error")
  294. return
  295. }
  296. if aid, err = strconv.ParseInt(clickMsg[1], 10, 64); err != nil {
  297. err = fmt.Errorf("aid(%s) error", clickMsg[1])
  298. return
  299. }
  300. if aid <= 0 {
  301. err = fmt.Errorf("wocao aid(%s) error", clickMsg[1])
  302. return
  303. }
  304. if plat, err = strconv.ParseInt(clickMsg[0], 10, 64); err != nil {
  305. err = fmt.Errorf("plat(%s) error", clickMsg[0])
  306. return
  307. }
  308. if _, ok := s.allowPlat[int8(plat)]; !ok {
  309. err = fmt.Errorf("plat(%d) is illegal", plat)
  310. return
  311. }
  312. userAgent = clickMsg[10]
  313. did = clickMsg[8]
  314. if did == "" {
  315. err = fmt.Errorf("bvID(%s) is illegal", clickMsg[8])
  316. return
  317. }
  318. buvid = clickMsg[11]
  319. if clickMsg[4] != "" && clickMsg[4] != "0" {
  320. if mid, err = strconv.ParseInt(clickMsg[4], 10, 64); err != nil {
  321. err = fmt.Errorf("mid(%s) is illegal", clickMsg[4])
  322. return
  323. }
  324. }
  325. if clickMsg[5] != "" {
  326. if lv, err = strconv.ParseInt(clickMsg[5], 10, 64); err != nil {
  327. err = fmt.Errorf("lv(%s) is illegal", clickMsg[5])
  328. return
  329. }
  330. }
  331. if ctime, err = strconv.ParseInt(clickMsg[6], 10, 64); err != nil {
  332. err = fmt.Errorf("ctime(%s) is illegal", clickMsg[6])
  333. return
  334. }
  335. if stime, err = strconv.ParseInt(clickMsg[7], 10, 64); err != nil {
  336. err = fmt.Errorf("stime(%s) is illegal", clickMsg[7])
  337. return
  338. }
  339. if ip = clickMsg[9]; ip == "" {
  340. err = errors.New("ip is illegal")
  341. return
  342. }
  343. if clickMsg[17] != "" {
  344. if epid, err = strconv.ParseInt(clickMsg[17], 10, 64); err != nil {
  345. err = fmt.Errorf("epid(%s) is illegal", clickMsg[17])
  346. return
  347. }
  348. if clickMsg[15] != "null" {
  349. if seasonType, err = strconv.Atoi(clickMsg[15]); err != nil {
  350. err = fmt.Errorf("seasonType(%s) is illegal", clickMsg[15])
  351. return
  352. }
  353. }
  354. }
  355. if strings.Contains(userAgent, "(auto_play)") ||
  356. strings.Contains(userAgent, "(inline_play_heartbeat)") ||
  357. strings.Contains(userAgent, "(inline_play_to_view)") || // to remove auto_play & inline_play_heartbeat
  358. strings.Contains(userAgent, "(played_time_enough)") {
  359. if did, err = s.getRealDid(context.TODO(), buvid, aid); err != nil || did == "" {
  360. err = fmt.Errorf("bvid(%s) dont have did", buvid)
  361. return
  362. }
  363. did = buvid
  364. msg = []byte(strings.Replace(string(msg), buvid, did, 1))
  365. }
  366. click = &model.ClickMsg{
  367. Plat: int8(plat),
  368. AID: aid,
  369. MID: mid,
  370. Lv: int8(lv),
  371. CTime: ctime,
  372. STime: stime,
  373. Did: did,
  374. Buvid: buvid,
  375. IP: ip,
  376. KafkaBs: msg,
  377. EpID: epid,
  378. SeasonType: seasonType,
  379. UserAgent: userAgent,
  380. }
  381. return
  382. }
  383. // ArcDuration return archive duration, manager local cache
  384. func (s *Service) ArcDuration(c context.Context, aid int64) (duration int64) {
  385. var (
  386. ok bool
  387. arcDur *model.ArcDuration
  388. now = time.Now().Unix()
  389. err error
  390. )
  391. // duration default
  392. duration = s.c.CacheConf.PGCReplayTime
  393. s.arcDurWithMutex.Mutex.RLock()
  394. arcDur, ok = s.arcDurWithMutex.Durations[aid]
  395. s.arcDurWithMutex.Mutex.RUnlock()
  396. if ok && now-arcDur.GotTime > s.c.CacheConf.ArcUpCacheTime {
  397. duration = arcDur.Duration
  398. return
  399. }
  400. var arc *api.Arc
  401. if arc, err = s.arcRPC.Archive3(c, &arcmdl.ArgAid2{Aid: aid}); err != nil {
  402. // just log
  403. log.Error("s.arcRPC.Archive3(%d) error(%v)", aid, err)
  404. } else {
  405. duration = arc.Duration
  406. }
  407. s.arcDurWithMutex.Mutex.Lock()
  408. s.arcDurWithMutex.Durations[aid] = &model.ArcDuration{Duration: duration, GotTime: now}
  409. s.arcDurWithMutex.Mutex.Unlock()
  410. return
  411. }
  412. // Close kafaka consumer close.
  413. func (s *Service) Close() (err error) {
  414. s.closed = true
  415. time.Sleep(time.Second)
  416. for i := 0; i < len(s.cliChan); i++ {
  417. close(s.cliChan[i])
  418. }
  419. close(s.bigDataChan)
  420. s.chanWg.Wait()
  421. s.statViewPub.Close()
  422. return
  423. }