databus.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "go-common/app/job/main/creative/model"
  9. "go-common/library/log"
  10. )
  11. const (
  12. _archive = "archive"
  13. _insert = "insert"
  14. _update = "update"
  15. fromArchiveUp = 0
  16. fromArchiveNewUp = 1
  17. _relationMidTable = "user_relation_mid_"
  18. _relationStatTable = "user_relation_stat_"
  19. )
  20. //pub up auth msg.
  21. func (s *Service) pub(mid int64, from, isAuthor int) (err error) {
  22. c := context.TODO()
  23. msg := &model.Msg{
  24. MID: mid,
  25. From: from,
  26. IsAuthor: isAuthor,
  27. TimeStamp: time.Now().Unix(),
  28. }
  29. if err = s.upPub.Send(c, strconv.FormatInt(mid, 10), msg); err != nil {
  30. log.Error("pub mid(%d) error(%v)", mid, err)
  31. }
  32. return
  33. }
  34. func (s *Service) arcNotifyCanalConsume() {
  35. var err error
  36. for msg := range s.arcNotifySub.Messages() {
  37. msg.Commit()
  38. s.arcNotifyMo++
  39. m := &model.CanalMsg{}
  40. if err = json.Unmarshal(msg.Value, m); err != nil {
  41. log.Error("json.Unmarshal(%v) error(%v)", string(msg.Value), err)
  42. continue
  43. }
  44. if m.Table == _archive {
  45. s.arcNotifyMsg(m)
  46. log.Info("arcNotifyCanalConsume key(%s) value(%s) partition(%d) offset(%d) commit", msg.Key, msg.Value, msg.Partition, msg.Offset)
  47. }
  48. }
  49. s.wg.Done()
  50. }
  51. func (s *Service) arcNotifyMsg(m *model.CanalMsg) {
  52. var (
  53. err error
  54. newArc = &model.Archive{}
  55. oldArc = &model.Archive{}
  56. )
  57. if err = json.Unmarshal(m.New, newArc); err != nil {
  58. log.Error("arcNotifyMsg newMsg json.Unmarshal(%s) error(%v)", string(m.New), err)
  59. return
  60. }
  61. mid := newArc.MID
  62. if mid <= 0 {
  63. log.Error("arcNotifyMsg mid (%d) error", mid)
  64. return
  65. }
  66. if m.Action == _insert && newArc.State >= 0 { //0->1
  67. s.pub(mid, fromArchiveUp, 1)
  68. } else if m.Action == _update {
  69. if err = json.Unmarshal(m.Old, oldArc); err != nil {
  70. log.Error("arcNotifyMsg oldMsg json.Unmarshal(%s) error(%v)", string(m.Old), err)
  71. return
  72. }
  73. if oldArc.State < 0 && (newArc.State >= 0 || newArc.State == -6) { //0->1
  74. s.pub(mid, fromArchiveUp, 1)
  75. }
  76. if (oldArc.State >= 0 || oldArc.State == -6) && newArc.State < 0 { //1->0
  77. cnt, err := s.arc.UpCount(context.Background(), mid)
  78. if err != nil {
  79. log.Error("arcNotifyMsg s.arc.UpCount(%d) error(%v)", mid, err)
  80. return
  81. }
  82. if cnt <= 0 {
  83. s.pub(mid, fromArchiveUp, 0)
  84. }
  85. }
  86. }
  87. }
  88. func (s *Service) arcCanalConsume() {
  89. var err error
  90. for msg := range s.arcSub.Messages() {
  91. msg.Commit()
  92. s.arcMo++
  93. m := &model.CanalMsg{}
  94. if err = json.Unmarshal(msg.Value, m); err != nil {
  95. log.Error("json.Unmarshal(%v) error(%v)", string(msg.Value), err)
  96. continue
  97. }
  98. if m.Table == _archive && m.Action == _insert {
  99. arc := &model.Archive{}
  100. if err = json.Unmarshal(m.New, arc); err != nil {
  101. log.Error("creative-job binglog newMsg json.Unmarshal(%s) error(%v)", m.New, err)
  102. continue
  103. }
  104. if arc.MID > 0 {
  105. s.pub(arc.MID, fromArchiveNewUp, 1)
  106. }
  107. log.Info("arcCanalConsume key(%s) value(%s) partition(%d) offset(%d) commit", msg.Key, msg.Value, msg.Partition, msg.Offset)
  108. }
  109. }
  110. s.wg.Done()
  111. }
  112. func (s *Service) task() {
  113. for msg := range s.taskSub.Messages() {
  114. msg.Commit()
  115. log.Info("databus task key(%s) value(%s) partition(%d) offset(%d) commit", msg.Key, msg.Value, msg.Partition, msg.Offset)
  116. s.taskSubQueue[s.shardingQueueIndex(msg.Key, s.databusQueueLen)] <- msg
  117. }
  118. for _, c := range s.taskSubQueue {
  119. close(c)
  120. }
  121. s.wg.Done()
  122. }
  123. func (s *Service) share() {
  124. for msg := range s.shareSub.Messages() {
  125. msg.Commit()
  126. m := &model.ShareMsg{}
  127. if err := json.Unmarshal(msg.Value, m); err != nil {
  128. log.Error("databus share json.Unmarshal(%v) error(%v)", string(msg.Value), err)
  129. continue
  130. }
  131. if m.TP != 3 {
  132. continue
  133. }
  134. mid, authorMID := m.MID, s.getMIDByAID(m.OID)
  135. if mid != authorMID { //不是该用户分享的稿件则不做任何处理
  136. log.Warn("s.arc.Archive mid(%d)|author mid(%d)", mid, authorMID)
  137. continue
  138. }
  139. log.Info("databus share key(%s) value(%s) partition(%d) offset(%d) commit", msg.Key, msg.Value, msg.Partition, msg.Offset)
  140. s.shareSubQueue[s.shardingQueueIndex(msg.Key, s.databusQueueLen)] <- m
  141. }
  142. for _, c := range s.shareSubQueue {
  143. close(c)
  144. }
  145. s.wg.Done()
  146. }
  147. func (s *Service) relation() {
  148. var err error
  149. newF, advancedF := s.c.Task.NewFollower, s.c.Task.AdvancedFollower
  150. for msg := range s.relationSub.Messages() {
  151. msg.Commit()
  152. rl := &model.RelaMessage{}
  153. if err = json.Unmarshal(msg.Value, rl); err != nil {
  154. log.Error("databus relation json.Unmarshal (%v) error(%v)", msg.Value, err)
  155. continue
  156. }
  157. if !strings.HasPrefix(rl.Table, _relationStatTable) && !strings.HasPrefix(rl.Table, _relationMidTable) {
  158. continue
  159. }
  160. if strings.HasPrefix(rl.Table, _relationStatTable) {
  161. ost := &model.Stat{}
  162. st := &model.Stat{}
  163. if rl.Action == "update" {
  164. if err = json.Unmarshal(rl.Old, ost); err != nil {
  165. log.Error("relation old msg json.Unmarshal(%s) error(%v)", string(rl.Old), err)
  166. continue
  167. }
  168. }
  169. if err = json.Unmarshal(rl.New, st); err != nil {
  170. log.Error("relation new msg json.Unmarshal(%s) error(%v)", string(rl.New), err)
  171. continue
  172. }
  173. isFollower := false
  174. if ost.Follower < newF && st.Follower >= newF { //新手任务粉丝数限制
  175. isFollower = true
  176. }
  177. if ost.Follower < advancedF && st.Follower >= advancedF { //进阶任务粉丝数限制
  178. isFollower = true
  179. }
  180. if isFollower {
  181. log.Info("databus relation key(%s) value(%s) partition(%d) offset(%d) commit", msg.Key, msg.Value, msg.Partition, msg.Offset)
  182. s.followerQueue[s.shardingQueueIndex(msg.Key, s.databusQueueLen)] <- st
  183. }
  184. }
  185. if strings.HasPrefix(rl.Table, _relationMidTable) {
  186. fl := &model.Relation{}
  187. if err = json.Unmarshal(rl.New, fl); err != nil {
  188. log.Error("json.Unmarshal(%s) error(%v)", string(rl.New), err)
  189. continue
  190. }
  191. if fl.FID != s.c.Task.BiliMID { //过滤关注哔哩哔哩创作中心
  192. continue
  193. }
  194. log.Info("databus relation key(%s) value(%s) partition(%d) offset(%d) commit", msg.Key, msg.Value, msg.Partition, msg.Offset)
  195. s.relationQueue[s.shardingQueueIndex(msg.Key, s.databusQueueLen)] <- fl
  196. }
  197. }
  198. for _, c := range s.relationQueue {
  199. close(c)
  200. }
  201. for _, c := range s.followerQueue {
  202. close(c)
  203. }
  204. s.wg.Done()
  205. }
  206. func (s *Service) statView() {
  207. statView, statViewUp := s.c.Task.StatView, s.c.Task.StatViewUp
  208. for msg := range s.statViewSub.Messages() {
  209. msg.Commit()
  210. m := &model.StatView{}
  211. if err := json.Unmarshal(msg.Value, m); err != nil {
  212. log.Error("databus statView json.Unmarshal(%v) error(%v)", string(msg.Value), err)
  213. continue
  214. }
  215. if !strings.EqualFold(m.Type, "archive") {
  216. continue
  217. }
  218. if m.Count >= statView && m.Count <= statViewUp {
  219. s.statViewSubQueue[s.shardingQueueIndex(msg.Key, s.statViewQueueLen)] <- m
  220. }
  221. }
  222. for _, c := range s.statViewSubQueue {
  223. close(c)
  224. }
  225. s.wg.Done()
  226. }
  227. func (s *Service) statLike() {
  228. statLike, statLikeUp := s.c.Task.StatLike, s.c.Task.StatLikeUp
  229. for msg := range s.statLikeSub.Messages() {
  230. msg.Commit()
  231. m := &model.StatLike{}
  232. if err := json.Unmarshal(msg.Value, m); err != nil {
  233. log.Error("databus statLike json.Unmarshal(%v) error(%v)", string(msg.Value), err)
  234. continue
  235. }
  236. if !strings.EqualFold(m.Type, "archive") {
  237. continue
  238. }
  239. if m.Count >= statLike && m.Count <= statLikeUp {
  240. s.statLikeSubQueue[s.shardingQueueIndex(msg.Key, s.statLikeQueueLen)] <- m
  241. }
  242. }
  243. for _, c := range s.statLikeSubQueue {
  244. close(c)
  245. }
  246. s.wg.Done()
  247. }
  248. func (s *Service) statShare() {
  249. statShare, statShareUp := s.c.Task.StatShare, s.c.Task.StatShareUp
  250. for msg := range s.statShareSub.Messages() {
  251. msg.Commit()
  252. m := &model.StatShare{}
  253. if err := json.Unmarshal(msg.Value, m); err != nil {
  254. log.Error("databus statShare json.Unmarshal(%v) error(%v)", string(msg.Value), err)
  255. continue
  256. }
  257. if !strings.EqualFold(m.Type, "archive") {
  258. continue
  259. }
  260. if m.Count >= statShare && m.Count <= statShareUp {
  261. log.Info("databus statShare key(%s) value(%s) partition(%d) offset(%d) commit", msg.Key, msg.Value, msg.Partition, msg.Offset)
  262. s.statShareSubQueue[s.shardingQueueIndex(msg.Key, s.databusQueueLen)] <- m
  263. }
  264. }
  265. for _, c := range s.statShareSubQueue {
  266. close(c)
  267. }
  268. s.wg.Done()
  269. }
  270. func (s *Service) statCoin() {
  271. statCoin, statCoinUp := s.c.Task.StatCoin, s.c.Task.StatCoinUp
  272. for msg := range s.statCoinSub.Messages() {
  273. msg.Commit()
  274. m := &model.StatCoin{}
  275. if err := json.Unmarshal(msg.Value, m); err != nil {
  276. log.Error("databus statCoin json.Unmarshal(%v) error(%v)", string(msg.Value), err)
  277. continue
  278. }
  279. if !strings.EqualFold(m.Type, "archive") {
  280. continue
  281. }
  282. if m.Count >= statCoin && m.Count <= statCoinUp {
  283. log.Info("databus statCoin key(%s) value(%s) partition(%d) offset(%d) commit", msg.Key, msg.Value, msg.Partition, msg.Offset)
  284. s.statCoinSubQueue[s.shardingQueueIndex(msg.Key, s.databusQueueLen)] <- m
  285. }
  286. }
  287. for _, c := range s.statCoinSubQueue {
  288. close(c)
  289. }
  290. s.wg.Done()
  291. }
  292. func (s *Service) statFav() {
  293. statFav, statFavUp := s.c.Task.StatFav, s.c.Task.StatFavUp
  294. for msg := range s.statFavSub.Messages() {
  295. msg.Commit()
  296. m := &model.StatFav{}
  297. if err := json.Unmarshal(msg.Value, m); err != nil {
  298. log.Error("databus statFav json.Unmarshal(%v) error(%v)", string(msg.Value), err)
  299. continue
  300. }
  301. if !strings.EqualFold(m.Type, "archive") {
  302. continue
  303. }
  304. if m.Count >= statFav && m.Count <= statFavUp {
  305. log.Info("databus statFav key(%s) value(%s) partition(%d) offset(%d) commit", msg.Key, msg.Value, msg.Partition, msg.Offset)
  306. s.statFavSubQueue[s.shardingQueueIndex(msg.Key, s.databusQueueLen)] <- m
  307. }
  308. }
  309. for _, c := range s.statFavSubQueue {
  310. close(c)
  311. }
  312. s.wg.Done()
  313. }
  314. func (s *Service) statReply() {
  315. statReply, statReplyUp := s.c.Task.StatReply, s.c.Task.StatReplyUp
  316. for msg := range s.statReplySub.Messages() {
  317. msg.Commit()
  318. m := &model.StatReply{}
  319. if err := json.Unmarshal(msg.Value, m); err != nil {
  320. log.Error("databus statReply json.Unmarshal(%v) error(%v)", string(msg.Value), err)
  321. continue
  322. }
  323. if !strings.EqualFold(m.Type, "archive") || m.Count < s.c.Task.StatReply {
  324. continue
  325. }
  326. if m.Count >= statReply && m.Count <= statReplyUp {
  327. log.Info("databus statReply key(%s) value(%s) partition(%d) offset(%d) commit", msg.Key, msg.Value, msg.Partition, msg.Offset)
  328. s.statReplySubQueue[s.shardingQueueIndex(msg.Key, s.databusQueueLen)] <- m
  329. }
  330. }
  331. for _, c := range s.statReplySubQueue {
  332. close(c)
  333. }
  334. s.wg.Done()
  335. }
  336. func (s *Service) statDM() {
  337. statDM, statDMUp := s.c.Task.StatDM, s.c.Task.StatDMUp
  338. for msg := range s.statDMSub.Messages() {
  339. msg.Commit()
  340. m := &model.StatDM{}
  341. if err := json.Unmarshal(msg.Value, m); err != nil {
  342. log.Error("databus statDM json.Unmarshal(%v) error(%v)", string(msg.Value), err)
  343. continue
  344. }
  345. if !strings.EqualFold(m.Type, "archive") || m.Count < s.c.Task.StatDM {
  346. continue
  347. }
  348. if m.Count >= statDM && m.Count <= statDMUp {
  349. log.Info("databus statDM key(%s) value(%s) partition(%d) offset(%d) commit", msg.Key, msg.Value, msg.Partition, msg.Offset)
  350. s.statDMSubQueue[s.shardingQueueIndex(msg.Key, s.databusQueueLen)] <- m
  351. }
  352. }
  353. for _, c := range s.statDMSubQueue {
  354. close(c)
  355. }
  356. s.wg.Done()
  357. }
  358. func (s *Service) newUp() {
  359. var err error
  360. for msg := range s.newUpSub.Messages() {
  361. msg.Commit()
  362. m := &model.CanalMsg{}
  363. if err = json.Unmarshal(msg.Value, m); err != nil {
  364. log.Error("databus newUp json.Unmarshal(%v) error(%v)", string(msg.Value), err)
  365. continue
  366. }
  367. if m.Table != _archive {
  368. continue
  369. }
  370. newArc := &model.Archive{}
  371. if err = json.Unmarshal(m.New, newArc); err != nil {
  372. log.Error("databus newUp newMsg json.Unmarshal(%s) error(%v)", string(m.New), err)
  373. continue
  374. }
  375. mid, aid := newArc.MID, newArc.AID
  376. if mid == 0 || aid == 0 {
  377. log.Error("databus newUp mid(%d) | aid(%d) error", mid, aid)
  378. continue
  379. }
  380. isUp := false
  381. if m.Action == "insert" && newArc.State >= 0 {
  382. isUp = true
  383. } else if m.Action == "update" {
  384. oldArc := &model.Archive{}
  385. if err = json.Unmarshal(m.Old, oldArc); err != nil {
  386. log.Error("newUp oldMsg json.Unmarshal(%s) error(%v)", string(m.Old), err)
  387. continue
  388. }
  389. if oldArc.State < 0 && (newArc.State >= 0 || newArc.State == -6) { //0->1
  390. isUp = true
  391. }
  392. }
  393. if isUp {
  394. av, err := s.arc.View(context.Background(), mid, aid) //获取投稿来源
  395. if err != nil {
  396. log.Error("newUp s.arc.View mid(%d) mid(%d) av(%+v) error(%v)", mid, aid, av, err)
  397. }
  398. if av != nil && av.Archive != nil && (av.Archive.UpFrom == 3 || av.Archive.UpFrom == 8 || av.Archive.UpFrom == 9) { // 3-App , 8-Android , 9-IOS
  399. log.Info("databus mobile mid(%d) aid(%d) av(%+v)", mid, aid, av)
  400. s.mobileUpQueue[s.shardingQueueIndex(strconv.FormatInt(mid, 10), s.databusQueueLen)] <- &model.Up{AID: aid, MID: mid}
  401. }
  402. cnt, err := s.arc.UpCount(context.Background(), mid)
  403. if err != nil {
  404. log.Error("newUp s.arc.UpCount(%d) error(%v)", mid, err)
  405. continue
  406. }
  407. if cnt == 1 { //新手投下自己的第一个稿件
  408. log.Info("databus newUp mid(%d) aid(%d) count(%d)", mid, aid, cnt)
  409. s.newUpQueue[s.shardingQueueIndex(strconv.FormatInt(mid, 10), s.databusQueueLen)] <- &model.Up{AID: aid, MID: mid}
  410. }
  411. if cnt >= 5 { //进阶任务视频投稿超过5个
  412. log.Info("databus oldUp mid(%d) aid(%d) count(%d)", mid, aid, cnt)
  413. s.oldUpQueue[s.shardingQueueIndex(strconv.FormatInt(mid, 10), s.databusQueueLen)] <- &model.Up{AID: aid, MID: mid}
  414. }
  415. }
  416. }
  417. for _, c := range s.newUpQueue {
  418. close(c)
  419. }
  420. for _, c := range s.oldUpQueue {
  421. close(c)
  422. }
  423. for _, c := range s.mobileUpQueue {
  424. close(c)
  425. }
  426. s.wg.Done()
  427. }