cheat.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "strconv"
  7. "time"
  8. "go-common/app/job/main/growup/model"
  9. "go-common/library/log"
  10. )
  11. const (
  12. _spyArchiveCoin = 26 //稿件硬币
  13. _spyArchiveFavorite = 27 //稿件收藏
  14. _spyArchivePlay = 28 //稿件播放
  15. _spyUpFans = 29 //异常粉丝数
  16. query = "{\"select\": [{\"name\": \"id\", \"as\": \"id\"}," +
  17. "{\"name\": \"log_date\",\"as\": \"log_date\"},{\"name\": \"target_mid\",\"as\": \"target_mid\"},{\"name\": \"target_id\",\"as\": \"target_id\"}," +
  18. "{\"name\": \"event_id\",\"as\": \"event_id\"},{\"name\": \"state\",\"as\": \"state\"},{\"name\": \"type\",\"as\": \"type\"},{\"name\": \"quantity\",\"as\": \"quantity\"}," +
  19. "{\"name\": \"isdel\",\"as\": \"isdel\"}],\"where\": {\"log_date\": {\"in\": [\"%s\"]}},\"sort\": {\"id\": -1},\"page\": {\"skip\": %d,\"limit\": %d}}"
  20. )
  21. // UpdateCheatHTTP update cheat by http
  22. func (s *Service) UpdateCheatHTTP(c context.Context, date time.Time) (err error) {
  23. err = s.CheatStatistics(c, date)
  24. if err != nil {
  25. log.Error("s.UpdateSpy UpdateSpyData error(%v)", err)
  26. }
  27. return
  28. }
  29. // CheatStatistics task update cheat
  30. func (s *Service) CheatStatistics(c context.Context, date time.Time) (err error) {
  31. spies, err := s.getSpy(c, date)
  32. if err != nil {
  33. return
  34. }
  35. // first filter
  36. cs := cheats(spies)
  37. // aggregation by mid
  38. cm := aggreByMID(cs)
  39. log.Info("agreegation by mid spies:", len(cm))
  40. // aggregation by av_id
  41. am := aggreByAvID(cs)
  42. log.Info("agreegation by av_id spies:", len(cm))
  43. // get up base info
  44. upm, err := s.getUps(c, cm)
  45. if err != nil {
  46. return
  47. }
  48. // get av base info
  49. avm, err := s.getAvs(c, am, time.Now().Add(-30*24*time.Hour))
  50. if err != nil {
  51. return
  52. }
  53. pcs, err := s.playCount(c, cm)
  54. if err != nil {
  55. return
  56. }
  57. deducted, err := s.breachRecord(c)
  58. if err != nil {
  59. return
  60. }
  61. var ups []*model.Cheating
  62. for mid, cheat := range cm {
  63. if up, ok := upm[mid]; ok {
  64. cheat.Nickname = up.Nickname
  65. cheat.Fans = up.Fans
  66. cheat.SignedAt = up.SignedAt
  67. cheat.AccountState = 3
  68. cheat.PlayCount = pcs[mid]
  69. ups = append(ups, cheat)
  70. }
  71. }
  72. var avs []*model.Cheating
  73. for avID, cheat := range am {
  74. if av, ok := avm[avID]; ok {
  75. cheat.UploadTime = av.UploadTime
  76. cheat.TotalIncome = av.TotalIncome
  77. cheat.Nickname = cm[cheat.MID].Nickname
  78. if deducted[avID] {
  79. cheat.Deducted = 1
  80. } else {
  81. cheat.Deducted = 0
  82. }
  83. avs = append(avs, cheat)
  84. }
  85. }
  86. log.Info("signed cheat up count:", len(ups))
  87. err = s.batchInsertCheats(c, ups, s.batchInsertCheatUps)
  88. if err != nil {
  89. log.Error("batchInsertCheatUps error(%v)", err)
  90. return
  91. }
  92. log.Info("signed cheat av count:", len(avs))
  93. err = s.batchInsertCheats(c, avs, s.batchInsertCheatArchives)
  94. if err != nil {
  95. log.Error("batchInsertCheatArchives error(%v)", err)
  96. return
  97. }
  98. return
  99. }
  100. func (s *Service) breachRecord(c context.Context) (deducted map[int64]bool, err error) {
  101. deducted = make(map[int64]bool)
  102. var id int64
  103. for {
  104. var ds map[int64]bool
  105. id, ds, err = s.dao.AvBreachRecord(c, id, 2000)
  106. if err != nil {
  107. return
  108. }
  109. if len(ds) == 0 {
  110. break
  111. }
  112. for k, v := range ds {
  113. deducted[k] = v
  114. }
  115. }
  116. return
  117. }
  118. func (s *Service) getSpy(c context.Context, date time.Time) (spies []*model.Spy, err error) {
  119. from, limit := 0, 500
  120. var info []*model.Spy
  121. for {
  122. dateStr := date.Format("20060102")
  123. info, err = s.dp.SendSpyRequest(c, fmt.Sprintf(query, dateStr, from, limit))
  124. if err != nil {
  125. log.Error("s.getSpyData error(%v)", err)
  126. return
  127. }
  128. if len(info) == 0 {
  129. break
  130. }
  131. spies = append(spies, info...)
  132. from += len(info)
  133. }
  134. log.Info("get spy data total (%d) rows", from)
  135. return
  136. }
  137. func cheats(spies []*model.Spy) (cs []*model.Cheating) {
  138. for _, spy := range spies {
  139. c := &model.Cheating{}
  140. c.MID = spy.TargetMID
  141. switch spy.EventID {
  142. case _spyArchiveCoin:
  143. c.CheatCoin = spy.Quantity
  144. c.AvID = spy.TargetID
  145. case _spyArchiveFavorite:
  146. c.CheatFavorite = spy.Quantity
  147. c.AvID = spy.TargetID
  148. case _spyArchivePlay:
  149. c.CheatPlayCount = spy.Quantity
  150. c.AvID = spy.TargetID
  151. case _spyUpFans:
  152. c.CheatFans = spy.Quantity
  153. }
  154. cs = append(cs, c)
  155. }
  156. return
  157. }
  158. func aggreByMID(source []*model.Cheating) (cheats map[int64]*model.Cheating) {
  159. cheats = make(map[int64]*model.Cheating)
  160. for _, c := range source {
  161. if cheat, ok := cheats[c.MID]; !ok {
  162. cheats[c.MID] = &model.Cheating{
  163. MID: c.MID,
  164. CheatFans: c.CheatFans,
  165. CheatPlayCount: c.CheatPlayCount,
  166. }
  167. } else {
  168. cheat.CheatFans += c.CheatFans
  169. cheat.CheatPlayCount += c.CheatPlayCount
  170. }
  171. }
  172. return
  173. }
  174. func aggreByAvID(source []*model.Cheating) (cheats map[int64]*model.Cheating) {
  175. cheats = make(map[int64]*model.Cheating)
  176. for _, c := range source {
  177. if c.AvID == 0 {
  178. continue
  179. }
  180. if cheat, ok := cheats[c.AvID]; !ok {
  181. cheats[c.AvID] = &model.Cheating{
  182. MID: c.MID,
  183. AvID: c.AvID,
  184. CheatPlayCount: c.CheatPlayCount,
  185. CheatCoin: c.CheatCoin,
  186. CheatFavorite: c.CheatFavorite,
  187. }
  188. } else {
  189. cheat.CheatPlayCount += c.CheatPlayCount
  190. cheat.CheatCoin += c.CheatCoin
  191. cheat.CheatFavorite += c.CheatFavorite
  192. }
  193. }
  194. return
  195. }
  196. // Ups get ups in up_info_video
  197. func (s *Service) getUps(c context.Context, cheats map[int64]*model.Cheating) (ups map[int64]*model.Cheating, err error) {
  198. ups = make(map[int64]*model.Cheating)
  199. var mids []int64
  200. for mid := range cheats {
  201. mids = append(mids, mid)
  202. if len(mids) == 200 {
  203. var nc map[int64]*model.Cheating
  204. nc, err = s.dao.Ups(c, mids)
  205. if err != nil {
  206. return
  207. }
  208. for k, v := range nc {
  209. if v.IsDeleted == 0 {
  210. ups[k] = v
  211. }
  212. }
  213. mids = make([]int64, 0)
  214. time.Sleep(200 * time.Millisecond)
  215. }
  216. }
  217. if len(mids) > 0 {
  218. var nc map[int64]*model.Cheating
  219. nc, err = s.dao.Ups(c, mids)
  220. if err != nil {
  221. return
  222. }
  223. for k, v := range nc {
  224. ups[k] = v
  225. }
  226. }
  227. return
  228. }
  229. // avs result key: av_id, value: cheating with total_income, upload_time
  230. func (s *Service) getAvs(c context.Context, cheats map[int64]*model.Cheating, mtime time.Time) (avs map[int64]*model.Cheating, err error) {
  231. avs = make(map[int64]*model.Cheating)
  232. var avIds []int64
  233. for avID := range cheats {
  234. avIds = append(avIds, avID)
  235. if len(avIds) == 200 {
  236. var nc map[int64]*model.Cheating
  237. nc, err = s.dao.Avs(c, mtime, avIds)
  238. if err != nil {
  239. return
  240. }
  241. for k, v := range nc {
  242. avs[k] = v
  243. }
  244. avIds = make([]int64, 0)
  245. time.Sleep(200 * time.Millisecond)
  246. }
  247. }
  248. if len(avIds) > 0 {
  249. var na map[int64]*model.Cheating
  250. na, err = s.dao.Avs(c, mtime, avIds)
  251. if err != nil {
  252. return
  253. }
  254. for k, v := range na {
  255. avs[k] = v
  256. }
  257. }
  258. return
  259. }
  260. // key: mid, value: play_count
  261. func (s *Service) playCount(c context.Context, cheats map[int64]*model.Cheating) (pcs map[int64]int64, err error) {
  262. var mids []int64
  263. pcs = make(map[int64]int64)
  264. for mid := range cheats {
  265. mids = append(mids, mid)
  266. if len(mids) == 200 {
  267. var pc map[int64]int64
  268. pc, err = s.dao.PlayCount(c, mids)
  269. if err != nil {
  270. return
  271. }
  272. for k, v := range pc {
  273. pcs[k] = v
  274. }
  275. mids = make([]int64, 200)
  276. }
  277. }
  278. if len(mids) > 0 {
  279. var pc map[int64]int64
  280. pc, err = s.dao.PlayCount(c, mids)
  281. if err != nil {
  282. return
  283. }
  284. for k, v := range pc {
  285. pcs[k] = v
  286. }
  287. }
  288. return
  289. }
  290. type insertCheats func(c context.Context, cheats []*model.Cheating) (err error)
  291. func (s *Service) batchInsertCheats(c context.Context, cheats []*model.Cheating, insert insertCheats) (err error) {
  292. var end int
  293. for range cheats {
  294. end++
  295. if end%2000 == 0 {
  296. err = insert(c, cheats[:end])
  297. if err != nil {
  298. return
  299. }
  300. cheats = cheats[end:]
  301. end = 0
  302. }
  303. }
  304. if end > 0 {
  305. err = insert(c, cheats)
  306. }
  307. return
  308. }
  309. func (s *Service) batchInsertCheatUps(c context.Context, cheats []*model.Cheating) (err error) {
  310. var buf bytes.Buffer
  311. for _, cheat := range cheats {
  312. buf.WriteString("(")
  313. buf.WriteString(strconv.FormatInt(cheat.MID, 10))
  314. buf.WriteByte(',')
  315. buf.WriteString("\"" + cheat.SignedAt.Time().Format("2006-01-02 15:04:05") + "\"")
  316. buf.WriteByte(',')
  317. buf.WriteString("\"" + cheat.Nickname + "\"")
  318. buf.WriteByte(',')
  319. buf.WriteString(strconv.Itoa(cheat.Fans))
  320. buf.WriteByte(',')
  321. buf.WriteString(strconv.Itoa(cheat.CheatFans))
  322. buf.WriteByte(',')
  323. buf.WriteString(strconv.FormatInt(cheat.PlayCount, 10))
  324. buf.WriteByte(',')
  325. buf.WriteString(strconv.Itoa(cheat.CheatPlayCount))
  326. buf.WriteByte(',')
  327. buf.WriteString(strconv.Itoa(cheat.AccountState))
  328. buf.WriteString(")")
  329. buf.WriteByte(',')
  330. }
  331. if buf.Len() > 0 {
  332. buf.Truncate(buf.Len() - 1)
  333. }
  334. values := buf.String()
  335. buf.Reset()
  336. _, err = s.dao.InsertCheatUps(c, values)
  337. return
  338. }
  339. func (s *Service) batchInsertCheatArchives(c context.Context, cheats []*model.Cheating) (err error) {
  340. var buf bytes.Buffer
  341. for _, cheat := range cheats {
  342. buf.WriteString("(")
  343. buf.WriteString(strconv.FormatInt(cheat.AvID, 10))
  344. buf.WriteByte(',')
  345. buf.WriteString(strconv.FormatInt(cheat.MID, 10))
  346. buf.WriteByte(',')
  347. buf.WriteString("\"" + cheat.Nickname + "\"")
  348. buf.WriteByte(',')
  349. buf.WriteString("\"" + cheat.UploadTime.Time().Format("2006-01-02 15:04:05") + "\"")
  350. buf.WriteByte(',')
  351. buf.WriteString(strconv.Itoa(cheat.TotalIncome))
  352. buf.WriteByte(',')
  353. buf.WriteString(strconv.Itoa(cheat.CheatPlayCount))
  354. buf.WriteByte(',')
  355. buf.WriteString(strconv.Itoa(cheat.CheatFavorite))
  356. buf.WriteByte(',')
  357. buf.WriteString(strconv.Itoa(cheat.CheatCoin))
  358. buf.WriteByte(',')
  359. buf.WriteString(strconv.Itoa(cheat.Deducted))
  360. buf.WriteString(")")
  361. buf.WriteByte(',')
  362. }
  363. if buf.Len() > 0 {
  364. buf.Truncate(buf.Len() - 1)
  365. }
  366. values := buf.String()
  367. buf.Reset()
  368. _, err = s.dao.InsertCheatArchives(c, values)
  369. return
  370. }