fav.go 28 KB


  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "sort"
  6. "time"
  7. "go-common/app/job/main/favorite/model"
  8. favmdl "go-common/app/service/main/favorite/model"
  9. "go-common/library/ecode"
  10. "go-common/library/log"
  11. xtime "go-common/library/time"
  12. )
  13. const gap uint64 = 1 << 47
  14. const maxSquence uint64 = 1 + gap*64000
  15. const initSquence uint64 = 1 + gap*2000
  16. func (s *Service) upResource(c context.Context, msg *favmdl.Message) (err error) {
  17. if msg.Otype == 0 {
  18. //为了上线的时候兼容老的service逻辑
  19. msg.Otype = msg.Type
  20. }
  21. if msg.Type == favmdl.TypeMusicNew {
  22. msg.Type = favmdl.TypeVideo
  23. }
  24. switch msg.Action {
  25. case favmdl.ActionAdd:
  26. if err = s.addFav(c, msg.Otype, msg.Mid, msg.Fid, msg.Oid, msg.FTime, msg.Type); err != nil {
  27. return
  28. }
  29. s.delRecentOidsMc(c, msg.Type, msg.Mid)
  30. s.upFolderCnt(c, msg.Type, msg.Mid, msg.Fid, msg.FTime)
  31. s.upFavStat(c, msg.Otype, msg.Mid, msg.Oid, msg.FTime, true)
  32. case favmdl.ActionDel:
  33. if err = s.delFav(c, msg.Otype, msg.Mid, msg.Fid, msg.Oid, msg.FTime); err != nil {
  34. return
  35. }
  36. s.delRecentOidsMc(c, msg.Type, msg.Mid)
  37. s.upFolderCnt(c, msg.Type, msg.Mid, msg.Fid, msg.FTime)
  38. s.upFavStat(c, msg.Otype, msg.Mid, msg.Oid, msg.FTime, false)
  39. case favmdl.ActionMultiAdd:
  40. s.batchUpdateFavSeqsByAadd(c, msg.Mid, msg.Fid, msg.Type, msg.Oids)
  41. s.upFolderCnt(c, msg.Type, msg.Mid, msg.Fid, msg.FTime)
  42. s.upFavStats(c, msg.Otype, msg.Mid, msg.Oids, msg.FTime, true)
  43. s.delRecentOidsMc(c, msg.Type, msg.Mid)
  44. case favmdl.ActionMultiDel:
  45. s.upFolderCnt(c, msg.Type, msg.Mid, msg.Fid, msg.FTime)
  46. s.upFavStats(c, msg.Otype, msg.Mid, msg.Oids, msg.FTime, false)
  47. s.delRecentOidsMc(c, msg.Type, msg.Mid)
  48. s.initFolderRelations(c, msg.Type, msg.Mid, msg.Fid)
  49. s.initAllRelations(c, msg.Mid, msg.Fid)
  50. case model.ActionCopy:
  51. s.batchUpdateFavSeqsByAadd(c, msg.Mid, msg.NewFid, msg.Type, msg.Oids)
  52. s.upFolderCnt(c, msg.Type, msg.Mid, msg.NewFid, msg.FTime)
  53. s.upFavStats(c, msg.Otype, msg.Mid, msg.Oids, msg.FTime, true)
  54. s.delRecentOidsMc(c, msg.Type, msg.Mid)
  55. s.initFolderRelations(c, msg.Type, msg.Mid, msg.NewFid)
  56. s.initAllRelations(c, msg.Mid, msg.NewFid)
  57. case model.ActionMove:
  58. s.batchUpdateFavSeqsByAadd(c, msg.Mid, msg.NewFid, msg.Type, msg.Oids)
  59. s.upFolderCnt(c, msg.Type, msg.Mid, msg.NewFid, msg.FTime)
  60. s.upFolderCnt(c, msg.Type, msg.Mid, msg.OldFid, msg.FTime)
  61. s.delRecentOidsMc(c, msg.Type, msg.Mid)
  62. s.initFolderRelations(c, msg.Type, msg.Mid, msg.NewFid)
  63. s.initAllRelations(c, msg.Mid, msg.NewFid)
  64. case model.ActionClean:
  65. s.cleanInvalidFavs(c, msg.Type, msg.Mid, msg.Fid, msg.FTime)
  66. s.upFolderCnt(c, msg.Type, msg.Mid, msg.Fid, msg.FTime)
  67. s.delRecentOidsMc(c, msg.Type, msg.Mid)
  68. s.initFolderRelations(c, msg.Type, msg.Mid, msg.Fid)
  69. s.initAllRelations(c, msg.Mid, msg.Fid)
  70. case favmdl.ActionFolderDel:
  71. if err = s.delRelationsByFid(c, msg.Type, msg.Mid, msg.Fid, msg.FTime); err != nil {
  72. log.Error("s.delRelationsByFid(%d,%d,%d) error(%v)", msg.Type, msg.Mid, msg.Fid, err)
  73. }
  74. if err = s.favDao.DelRelationsCache(c, msg.Mid, msg.Fid); err != nil {
  75. log.Error("s.favDao.DelRelationsCache(%d,%d) error(%v)", msg.Mid, msg.Fid, err)
  76. }
  77. if err = s.favDao.DelAllRelationsCache(c, msg.Mid, msg.Fid); err != nil {
  78. log.Error("s.favDao.DelAllRelationsCache(%d,%d) error(%v)", msg.Mid, msg.Fid, err)
  79. }
  80. s.delRecentOidsMc(c, msg.Type, msg.Mid)
  81. case favmdl.ActionInitFolderRelations:
  82. s.initFolderRelations(c, msg.Type, msg.Mid, msg.Fid)
  83. case favmdl.ActionInitAllFolderRelations:
  84. s.initAllRelations(c, msg.Mid, msg.Fid)
  85. case favmdl.ActionInitRelationFids:
  86. var ok bool
  87. if ok, err = s.favDao.ExpireRelationOids(c, msg.Otype, msg.Mid); err != nil || ok {
  88. return
  89. }
  90. var rfmap map[int64][]int64
  91. if rfmap, err = s.favDao.RelationFids(c, msg.Otype, msg.Mid); err != nil {
  92. log.Error("favDao.FavedFids(%d,%d) error(%v)", msg.Otype, msg.Mid, err)
  93. return
  94. }
  95. if len(rfmap) == 0 {
  96. if err = s.favDao.SetUnFavedBit(c, msg.Otype, msg.Mid); err != nil {
  97. log.Error("s,favDao.SetUnFavedBit(type:%d,mid:%d) error(%v)", msg.Otype, msg.Mid, err)
  98. }
  99. } else {
  100. var oids []int64
  101. for oid := range rfmap {
  102. oids = append(oids, oid)
  103. }
  104. if err = s.favDao.SetRelationOidsCache(c, msg.Otype, msg.Mid, oids); err != nil {
  105. log.Error("s.favDao.SetRelationOidsCache(%d,%d,%v) error(%v)", msg.Otype, msg.Mid, oids, err)
  106. }
  107. }
  108. case favmdl.ActionSortFavs:
  109. s.sortFavs(c, msg)
  110. }
  111. return
  112. }
  113. func (s *Service) batchUpdateFavSeqsByAadd(c context.Context, mid, fid int64, typ int8, oids []int64) {
  114. log.Info("begin batchUpdateFavSeqsByAadd(%d,%d,%d,%+v)", mid, fid, typ, oids)
  115. favs, err := s.favDao.RelationsByOids(c, typ, mid, fid, oids)
  116. if err != nil {
  117. log.Error("s.RelationsByOids(%d,%d,%d,%v) failed!err:=%v", typ, mid, fid, oids, err)
  118. return
  119. }
  120. next, err := s.nextSquence(c, mid, fid)
  121. if err != nil {
  122. log.Error("s.nextSquence(%d,%d) failed!err:=%v", mid, fid, err)
  123. return
  124. }
  125. for _, fav := range favs {
  126. fav.Sequence = next
  127. if next >= maxSquence {
  128. next++
  129. } else {
  130. next += gap
  131. }
  132. }
  133. if err = s.batchUpdateSeqs(c, mid, fid, favs); err != nil {
  134. log.Error("s.batchUpdateSeqs (%d,%+v) err:=%v", mid, favs, err)
  135. }
  136. if err = s.favDao.DelAllRelationsCache(c, mid, fid); err != nil {
  137. err = nil
  138. log.Error("s.favDao.DelAllRelationsCache(%d,%d) err(%v)", mid, fid, err)
  139. }
  140. }
  141. func (s *Service) sortFavs(c context.Context, msg *favmdl.Message) {
  142. favs, err := s.favDao.AllRelations(c, msg.Mid, msg.Fid, 0, 1024)
  143. if err != nil {
  144. log.Error("s.favDao.AllRelations(%d,%d,%d,%d) error(%v)", msg.Mid, msg.Fid, 0, 2000, err)
  145. return
  146. }
  147. if len(favs) > 1000 || len(favs) <= 1 {
  148. log.Warn("sortFavs invalid fav(%d,%d) length:%d", msg.Mid, msg.Fid, len(favs))
  149. return
  150. }
  151. favsM := make(map[int64]*favmdl.Favorite)
  152. for i := range favs {
  153. favsM[favs[i].ResourceID()] = favs[i]
  154. }
  155. changed := make([]*favmdl.Favorite, 0)
  156. var reCount = 0
  157. sortFavsDesc(favs)
  158. if favs[len(favs)-1].Sequence == 0 {
  159. reSequence(favs)
  160. reCount++
  161. }
  162. for _, req := range msg.SortFavs {
  163. if req.Insert == nil {
  164. return
  165. }
  166. target, ok := favsM[req.Insert.ResourceID()]
  167. if !ok {
  168. return
  169. }
  170. if req.Pre == nil {
  171. max := favs[0].Sequence
  172. if max >= maxSquence {
  173. reCount++
  174. reSequence(favs)
  175. target.Sequence = favs[len(favs)-1].Sequence + gap
  176. } else if max >= maxSquence-gap {
  177. target.Sequence = (max + maxSquence + 1) / 2
  178. } else {
  179. target.Sequence = max + gap
  180. }
  181. } else {
  182. pre, ok := favsM[req.Pre.ResourceID()]
  183. if !ok {
  184. return
  185. }
  186. idx := searchIdx(favs, pre.Sequence)
  187. if idx == -1 {
  188. return
  189. }
  190. if idx < len(favs)-1 {
  191. next := favs[idx+1]
  192. if next.Oid == req.Insert.Oid && next.Type == int8(req.Insert.Typ) {
  193. // already sorted
  194. continue
  195. }
  196. if next.Sequence-pre.Sequence <= 1 {
  197. reCount++
  198. reSequence(favs)
  199. }
  200. target.Sequence = (pre.Sequence + next.Sequence) / 2
  201. } else {
  202. min := pre.Sequence
  203. if min <= 1 {
  204. // no space , need to reidx
  205. reCount++
  206. reSequence(favs)
  207. target.Sequence = favs[0].Sequence - gap
  208. } else if min <= 1+gap {
  209. // insert into the gap
  210. target.Sequence = min / 2
  211. } else {
  212. target.Sequence = min - gap
  213. }
  214. }
  215. }
  216. changed = append(changed, target)
  217. sortFavsDesc(favs)
  218. }
  219. if reCount > 0 {
  220. s.batchUpdateSeqs(c, msg.Mid, msg.Fid, favs)
  221. } else {
  222. s.batchUpdateSeqs(c, msg.Mid, msg.Fid, changed)
  223. }
  224. for i := range favs {
  225. favs[len(favs)-1-i].Sequence = uint64(i)
  226. }
  227. if err = s.favDao.AddAllRelationsCache(c, msg.Mid, msg.Fid, favs); err != nil {
  228. err = nil
  229. log.Error("s.favDao.AddAllRelationsCache(%d,%d,%v) err(%v)", msg.Mid, msg.Fid, favs, err)
  230. }
  231. }
  232. func (s *Service) batchUpdateSeqs(c context.Context, mid int64, fid int64, favs []*favmdl.Favorite) (err error) {
  233. for i := 0; i < len(favs); i += 10000 {
  234. end := i + 10000
  235. if end > len(favs) {
  236. end = len(favs)
  237. }
  238. _, err = s.favDao.BatchUpdateSeq(c, mid, favs[i:end])
  239. if err != nil {
  240. errStr := err.Error()
  241. if len(errStr) > 200 {
  242. errStr = errStr[:200]
  243. }
  244. log.Error("s.favDao.BatchUpdateSeq(%d,%v) err(%v)", mid, favs, errStr)
  245. _, err = s.favDao.BatchUpdateSeq(c, mid, favs[i:end])
  246. if err != nil {
  247. errStr := err.Error()
  248. if len(errStr) > 200 {
  249. errStr = errStr[:200]
  250. }
  251. log.Error("s.favDao.BatchUpdateSeq(%d,%v) err(%v)", mid, favs, errStr)
  252. return err
  253. }
  254. }
  255. }
  256. return
  257. }
  258. func searchIdx(favs []*favmdl.Favorite, sequence uint64) int {
  259. i := sort.Search(len(favs), func(i int) bool { return favs[i].Sequence <= sequence })
  260. if i < len(favs) && favs[i].Sequence == sequence {
  261. return i
  262. }
  263. return -1
  264. }
  265. func sortFavsDesc(favs []*favmdl.Favorite) {
  266. sort.Slice(favs, func(i, j int) bool {
  267. if favs[i].Sequence == favs[j].Sequence {
  268. return favs[i].MTime > favs[j].MTime
  269. }
  270. return favs[i].Sequence > favs[j].Sequence
  271. })
  272. }
  273. // 重新计算所有数据的Sequence
  274. func reSequence(favs []*favmdl.Favorite) {
  275. seq := initSquence
  276. last := len(favs) - 1
  277. for i := range favs {
  278. favs[last-i].Sequence = seq
  279. seq += gap
  280. }
  281. }
  282. func (s *Service) nextSquence(c context.Context, mid, fid int64) (uint64, error) {
  283. max, err := s.favDao.MaxRelation(c, mid, fid)
  284. if err != nil {
  285. log.Error("s.favDao.MaxRelation(%d,%d) error(%v)", mid, fid, err)
  286. return 0, err
  287. }
  288. var seq uint64
  289. if max == nil {
  290. seq = initSquence
  291. } else if max.Sequence == 0 {
  292. var cnt int
  293. cnt, err = s.favDao.RelationCnt(c, mid, fid)
  294. if err != nil {
  295. log.Error("s.favDao.RelationCnt(%d,%d) error(%v)", mid, fid, err)
  296. return 0, err
  297. }
  298. if cnt <= 50000 {
  299. seq = initSquence + uint64(cnt+10)*gap
  300. } else {
  301. log.Error("nextSquence: can't add res over 50000")
  302. err = ecode.FavMaxVideoCount
  303. return 0, err
  304. }
  305. } else if max.Sequence+gap <= maxSquence {
  306. seq = max.Sequence + gap
  307. } else {
  308. seq = max.Sequence + 1
  309. }
  310. return seq, nil
  311. }
  312. func (s *Service) addFav(c context.Context, typ int8, mid, fid, oid, ftime int64, ftype int8) (err error) {
  313. v := &favmdl.Favorite{
  314. Type: typ,
  315. Mid: mid,
  316. Fid: fid,
  317. Oid: oid,
  318. CTime: xtime.Time(ftime),
  319. MTime: xtime.Time(ftime),
  320. }
  321. v.Sequence, err = s.nextSquence(c, mid, fid)
  322. if err != nil {
  323. return
  324. }
  325. rows, err := s.favDao.AddRelation(c, v)
  326. if err != nil {
  327. log.Error("s.favDao.AddRelation(%v) error(%v)", v, err)
  328. return
  329. }
  330. if rows < 1 {
  331. log.Warn("type(%d) oid(%d) already favoured", typ, oid)
  332. err = model.ErrFavResourceExist
  333. return
  334. }
  335. v.ID = rows
  336. err = s.cache.Do(c, func(c context.Context) {
  337. if err = s.favDao.SetFavedBit(c, typ, mid); err != nil {
  338. log.Error("s.favDao.SetFavedBit(%d, %d) error(%v)", typ, mid, err)
  339. }
  340. var ok bool
  341. if ftype == typ {
  342. if ok, err = s.favDao.ExpireRelations(c, mid, fid); err != nil {
  343. log.Error("s.favDao.ExpireRelations(%d,%d) error(%v)", mid, fid, err)
  344. }
  345. if ok {
  346. if err = s.favDao.AddRelationCache(c, v); err != nil {
  347. log.Error("s.favDao.AddRelationCache(%d, %d,%d) error(%v)", typ, mid, fid, err)
  348. }
  349. }
  350. }
  351. if ok, err = s.favDao.ExpireAllRelations(c, mid, fid); err != nil {
  352. log.Error("s.favDao.ExpireAllRelations(%d,%d) error(%v)", mid, fid, err)
  353. }
  354. if ok {
  355. if err = s.favDao.AddAllRelationCache(c, v); err != nil {
  356. log.Error("s.favDao.AddAllRelationCache(%d, %d,%d) error(%v)", typ, mid, fid, err)
  357. }
  358. }
  359. if ok, err = s.favDao.ExpireRelationOids(c, typ, mid); err != nil {
  360. log.Error("s.favDao.ExpireRelationOids(%d,%d) error(%v)", typ, mid, err)
  361. }
  362. if ok {
  363. if err = s.favDao.AddRelationOidCache(c, typ, mid, oid); err != nil {
  364. log.Error("s.favDao.AddRelationOidCache(%d,%d,%d) error(%v)", typ, mid, oid, err)
  365. }
  366. }
  367. if err = s.favDao.DelRelationFidsMc(c, typ, mid, oid); err != nil {
  368. log.Error("s.favDao.DelRelationFidsMc(%d,%d,%d) error(%v)", typ, mid, oid, err)
  369. }
  370. })
  371. if err != nil {
  372. log.Error("s.cache.Do error(%v)", err)
  373. }
  374. return
  375. }
  376. // DelFav delete a favorite.
  377. func (s *Service) delFav(c context.Context, typ int8, mid, fid, oid, ftime int64) (err error) {
  378. rows, err := s.favDao.DelRelation(c, typ, mid, fid, oid, xtime.Time(ftime))
  379. if err != nil {
  380. log.Error("s.favDao.DelRelation(%d,%d,%d) error(%v)", typ, oid, fid, err)
  381. return
  382. }
  383. if rows < 1 {
  384. log.Warn("s.favDao.DelRelation(%d,%d,%d,%d) have no del", typ, mid, fid, oid)
  385. err = ecode.FavResourceAlreadyDel
  386. return
  387. }
  388. err = s.cache.Do(c, func(c context.Context) {
  389. if err1 := s.favDao.DelRelationCache(c, mid, fid, oid); err1 != nil {
  390. log.Error("s.favDao.DelRelationCache(%d,%d,%d) error(%v)", mid, fid, oid, err1)
  391. }
  392. if err1 := s.favDao.DelAllRelationCache(c, mid, fid, oid, typ); err1 != nil {
  393. log.Error("s.favDao.DelAllRelationCache(%d,%d,%d) error(%v)", mid, fid, oid, err1)
  394. }
  395. if err1 := s.favDao.DelRelationFidsMc(c, typ, mid, oid); err1 != nil {
  396. log.Error("s.favDao.DelRelationFidsMc(%d,%d,%d) error(%v)", typ, mid, oid, err1)
  397. }
  398. })
  399. if err != nil {
  400. log.Error("s.cache.Do error(%v)", err)
  401. }
  402. return
  403. }
  404. func (s *Service) setAllRelationCache(c context.Context, mid, fid int64) (err error) {
  405. var (
  406. mtime = xtime.Time(0)
  407. pageSize = 8000
  408. favss []*favmdl.Favorite
  409. )
  410. for {
  411. favs, err1 := s.favDao.AllRelations(c, mid, fid, mtime, pageSize)
  412. if err1 != nil {
  413. if err = s.favDao.DelAllRelationsCache(c, mid, fid); err != nil {
  414. log.Error("s.favDao.DelAllRelationsCache(%d,%d) error(%v)", mid, fid, err)
  415. }
  416. return err1
  417. }
  418. if len(favs) == 0 {
  419. break
  420. }
  421. mtime = favs[len(favs)-1].MTime
  422. if mtime == favs[0].MTime {
  423. mtime++
  424. }
  425. favss = append(favss, favs...)
  426. }
  427. if len(favss) <= 0 {
  428. return
  429. }
  430. sortFavsDesc(favss)
  431. var needUpdateSeq bool
  432. if favss[len(favss)-1].Sequence == 0 {
  433. needUpdateSeq = true
  434. }
  435. for i := range favss {
  436. favss[len(favss)-1-i].Sequence = uint64(i)
  437. }
  438. if err = s.favDao.AddAllRelationsCache(c, mid, fid, favss); err != nil {
  439. log.Error("s.favDao.AddAllRelationsCache(%d,%d,%d,%v) error(%v)", mid, fid, favss, err)
  440. }
  441. if needUpdateSeq {
  442. reSequence(favss)
  443. s.batchUpdateSeqs(c, mid, fid, favss)
  444. }
  445. return
  446. }
  447. func (s *Service) setRelationCache(c context.Context, tp int8, mid, fid int64) (err error) {
  448. var (
  449. mtime = xtime.Time(0)
  450. pageSize = 8000
  451. favss []*favmdl.Favorite
  452. )
  453. for {
  454. favs, err1 := s.favDao.Relations(c, tp, mid, fid, mtime, pageSize)
  455. if err1 != nil {
  456. log.Error("s.favDao.Relations(%d,%d,%d,%d,%d) error(%v)", tp, mid, fid, mtime, pageSize, err)
  457. if err = s.favDao.DelRelationsCache(c, mid, fid); err != nil {
  458. log.Error("s.favDao.DelRelationsCache(%d,%d) error(%v)", mid, fid, err)
  459. }
  460. return err1
  461. }
  462. if len(favs) == 0 {
  463. break
  464. }
  465. mtime = favs[len(favs)-1].MTime
  466. if mtime == favs[0].MTime {
  467. mtime++
  468. }
  469. favss = append(favss, favs...)
  470. }
  471. if err = s.favDao.AddRelationsCache(c, tp, mid, fid, favss); err != nil {
  472. log.Error("s.favDao.AddRelationsCache(%d,%d,%d,%v) error(%v)", tp, mid, fid, favss, err)
  473. }
  474. return
  475. }
  476. func (s *Service) upFolderCnt(c context.Context, tp int8, mid, fid, ftime int64) (err error) {
  477. cnt, err := s.favDao.RelationCnt(c, mid, fid)
  478. if err != nil {
  479. log.Error("s.favDao.CntRelations(%d,%d,%d) error(%v)", tp, mid, fid, err)
  480. return
  481. }
  482. folder, err := s.folder(c, tp, mid, fid)
  483. if err != nil {
  484. return
  485. }
  486. if _, err = s.favDao.UpFolderCnt(c, mid, fid, cnt, xtime.Time(ftime)); err != nil {
  487. log.Error("s.favDao.UpFolderCnt(%d,%d,%d,%d) error(%v)", mid, fid, ftime, cnt, err)
  488. return
  489. }
  490. folder.Count = cnt
  491. folder.MTime = xtime.Time(ftime)
  492. var recent []*favmdl.Resource
  493. if recent, err = s.favDao.RecentRes(c, mid, fid); err != nil {
  494. log.Error(" s.favDao.RecentRes(%d,%d) error(%v) or folder is nil", mid, fid, err)
  495. err = nil
  496. }
  497. folder.RecentOids = []int64{}
  498. folder.RecentRes = []*favmdl.Resource{}
  499. if len(recent) > 0 {
  500. folder.RecentRes = recent
  501. for _, res := range recent {
  502. if res.Typ == int32(tp) {
  503. folder.RecentOids = append(folder.RecentOids, res.Oid)
  504. }
  505. }
  506. }
  507. err = s.cache.Do(c, func(c context.Context) {
  508. if err1 := s.favDao.SetFoldersMc(c, folder); err1 != nil {
  509. log.Error("s.favDao.SetFolderMc(%v) error(%v)", folder, err1)
  510. }
  511. if err1 := s.favDao.DelNewCoverCache(c, folder.Mid, folder.ID); err1 != nil {
  512. log.Error("s.favDao.DelNewCoverCache(%v) error(%v)", folder, err1)
  513. }
  514. })
  515. if err != nil {
  516. log.Error("s.cache.Do error(%v)", err)
  517. }
  518. return
  519. }
  520. func (s *Service) folder(c context.Context, typ int8, mid, fid int64) (folder *favmdl.Folder, err error) {
  521. if folder, err = s.favDao.FolderMc(c, typ, mid, fid); err != nil {
  522. log.Error("s.favDao.FolderMc(%d,%d) error(%v)", mid, fid, err)
  523. err = nil
  524. }
  525. if folder == nil {
  526. if folder, err = s.favDao.Folder(c, typ, mid, fid); err != nil {
  527. log.Error("favDao.Folder(%d,%d) error(%v) or folder is nil", mid, fid, err)
  528. return
  529. }
  530. }
  531. if folder == nil {
  532. err = ecode.FavFolderNotExist
  533. }
  534. return
  535. }
  536. // upFavStats update upFavStat.
  537. func (s *Service) upFavStats(c context.Context, typ int8, mid int64, oids []int64, now int64, isAdd bool) error {
  538. for _, oid := range oids {
  539. if err := s.upFavStat(c, typ, mid, oid, now, isAdd); err != nil {
  540. log.Error("s.upFavStat(%d,%d,%d,%d,%d) error(%v)", typ, mid, oid, now, isAdd, err)
  541. return err
  542. }
  543. }
  544. return nil
  545. }
  546. // upFavStat update update resource fav count.
  547. func (s *Service) upFavStat(c context.Context, tp int8, mid, oid, now int64, isAdd bool) error {
  548. fids, err := s.favDao.RelationFidsByOid(c, tp, mid, oid)
  549. if err != nil {
  550. log.Error("s.favDao.RelationFidsByOid(%d,%d,%d) error(%v)", tp, mid, oid, err)
  551. return err
  552. }
  553. if len(fids) != 0 {
  554. if err1 := s.favDao.SetRelaitonFidsMc(c, tp, mid, oid, fids); err != nil {
  555. log.Error("s.favDao.SetRelaitonFidsMc(%d,%d,%d) error(%v)", tp, mid, oid, err1)
  556. }
  557. }
  558. length := len(fids)
  559. var incr int
  560. if isAdd && length == 1 {
  561. incr = 1
  562. err = s.addFavOperations(c, tp, mid, oid, now)
  563. if err != nil {
  564. return err
  565. }
  566. } else if !isAdd && length == 0 {
  567. incr = -1
  568. err = s.delFavOperations(c, tp, mid, oid, now)
  569. if err != nil {
  570. return err
  571. }
  572. }
  573. if incr != 0 {
  574. cnt, err := s.favDao.StatCnt(c, tp, oid)
  575. if err != nil {
  576. return err
  577. }
  578. if (cnt + incr) < 0 {
  579. return nil
  580. }
  581. rows, err := s.favDao.UpStatCnt(c, tp, oid, incr, xtime.Time(now))
  582. if err != nil {
  583. log.Error("s.favDao.UpStatCnt(%d,%d,%d) error(%v)", tp, oid, incr, err)
  584. return err
  585. }
  586. if rows < 1 {
  587. log.Warn("s.favDao.UpStatCnt(%d,%d,%d) rows(%d)", tp, oid, incr, rows)
  588. return nil
  589. }
  590. err = s.cache.Do(c, func(c context.Context) {
  591. if err = s.favDao.SetOidCountMc(c, tp, oid, int64(cnt+incr)); err != nil {
  592. log.Error("s.favDao.SetOidCountMc(%d,%d,%d) error(%v)", tp, oid, int64(cnt+incr), err)
  593. }
  594. if err = s.favDao.DelBatchOidsMc(c, tp, mid); err != nil {
  595. log.Error("s.favDao.SetOidCountMc(%d,%d) error(%v)", tp, mid, err)
  596. }
  597. })
  598. if err != nil {
  599. log.Error("s.cache.Do error(%v)", err)
  600. }
  601. s.addCoin(c, isAdd, cnt+incr, tp, oid)
  602. s.pubDao.PubStats(c, tp, oid, int64(cnt+incr))
  603. // bnj merge stat
  604. if err1 := s.bnjStatMerge(c, tp, oid, incr); err1 != nil {
  605. log.Error("s.bnjMergeStat(%d,%d,%d) error(%v)", tp, oid, incr, err1)
  606. }
  607. }
  608. return nil
  609. }
  610. func (s *Service) bnjStatMerge(c context.Context, typ int8, oid int64, incr int) (err error) {
  611. target := s.mergeTarget(int(typ), oid)
  612. if target <= 0 {
  613. return
  614. }
  615. cnt, err := s.favDao.StatCnt(c, typ, target)
  616. if err != nil {
  617. return
  618. }
  619. rows, err := s.favDao.UpStatCnt(c, typ, target, incr, xtime.Time(time.Now().Unix()))
  620. if err != nil || rows < 1 {
  621. log.Error("s.favDao.UpStatCnt(%d,%d,%d,%d) error(%v)", typ, target, incr, rows, err)
  622. return
  623. }
  624. s.pubDao.PubStats(c, typ, target, int64(cnt+incr))
  625. return
  626. }
  627. func (s *Service) addCoin(c context.Context, isAdd bool, count int, tp int8, oid int64) (err error) {
  628. var (
  629. mid int64
  630. msgAdd, msgDel string
  631. )
  632. mod := count % 200
  633. if mod != 0 && mod != 199 {
  634. return
  635. }
  636. switch tp {
  637. case favmdl.Article:
  638. article, err := s.articleRPC(c, oid)
  639. if err != nil {
  640. log.Error("s.favDao.ArticleRPC error(%v)", oid, err)
  641. return err
  642. }
  643. meta, ok := article[oid]
  644. if !ok || meta == nil {
  645. log.Error("article martmdl.Meta(%v) error(%v)", article, err)
  646. return err
  647. }
  648. mid = meta.Author.Mid
  649. msgAdd = "专栏CV%d新增200人收藏,总收藏%d"
  650. msgDel = "专栏CV%d有200人取消收藏,总收藏%d"
  651. case favmdl.TypeVideo:
  652. archive, err := s.archiveRPC(c, oid)
  653. if err != nil {
  654. log.Error("s.favDao.archiveRPC error(%v)", oid, err)
  655. return err
  656. }
  657. mid = archive.Author.Mid
  658. msgAdd = "稿件AV%d新增200人收藏,总收藏%d"
  659. msgDel = "稿件AV%d有200人取消收藏,总收藏%d"
  660. default:
  661. log.Warn("this type(%d) need not to add coin", tp)
  662. return
  663. }
  664. // add money to upper
  665. if isAdd && mod == 0 {
  666. if err := s.addCoinRPC(c, mid, 1, fmt.Sprintf(msgAdd, oid, count)); err != nil {
  667. log.Error("s.addCoinRPC(%d,%s) error(%v)", mid, fmt.Sprintf(msgAdd, oid, count), err)
  668. return err
  669. }
  670. }
  671. if !isAdd && mod == 199 {
  672. if err := s.addCoinRPC(c, mid, -1, fmt.Sprintf(msgDel, oid, count)); err != nil {
  673. log.Error("s.addCoinRPC(%d,%s) error(%v)", mid, fmt.Sprintf(msgAdd, oid, count), err)
  674. return err
  675. }
  676. }
  677. return
  678. }
  679. func (s *Service) delRelationsByFid(c context.Context, typ int8, mid, fid, ftime int64) (err error) {
  680. var (
  681. offset int
  682. count = s.c.Fav.MaxPageSize
  683. )
  684. typs := []int8{typ}
  685. if typ == 2 {
  686. // 收藏夹type=2是混合类型的收藏夹,需要删除多个type的稿件关系,现在只有music所以只要append 12
  687. typs = append(typs, 12)
  688. }
  689. for _, tp := range typs {
  690. for {
  691. var (
  692. rows int64
  693. oids []int64
  694. )
  695. if oids, err = s.favDao.OidsByFid(c, tp, mid, fid, offset, count); err != nil {
  696. log.Error("s.favDao.OidsByFid(%d,%d,%d,%d,%d) error(%v)", tp, mid, fid, offset, count, err)
  697. time.Sleep(time.Millisecond * 500) // avoid endless loop
  698. continue
  699. }
  700. if len(oids) == 0 {
  701. break
  702. }
  703. if rows, err = s.favDao.DelRelationsByOids(c, tp, mid, fid, oids, xtime.Time(ftime)); err != nil {
  704. log.Error("s.favDao.DelRelationsByOids(%d,%d,%d,%v) error(%v)", tp, mid, fid, oids, err)
  705. time.Sleep(time.Millisecond * 500) // avoid endless loop
  706. continue
  707. }
  708. offset += count
  709. if rows != int64(len(oids)) {
  710. log.Error("rows!=int64(len(oids)) rows:%d,len(aids):%d", rows, len(oids))
  711. }
  712. if rows > 0 {
  713. s.upFavStats(c, tp, mid, oids, ftime, false)
  714. }
  715. time.Sleep(time.Duration(s.c.Fav.SleepTime)) // for binlog cunsumers
  716. }
  717. }
  718. return
  719. }
  720. func (s *Service) addFavOperations(c context.Context, typ int8, mid, oid, now int64) (err error) {
  721. ok, err := s.favDao.ExpireRelationOids(c, typ, mid)
  722. if err != nil {
  723. log.Error("s.favDao.ExpireRelationFids(%d,%d) error(%v)", typ, mid, err)
  724. } else if ok {
  725. if err = s.favDao.AddRelationOidCache(c, typ, mid, oid); err != nil {
  726. log.Error("s.favDao.AddRelationOidCache(%d,%d,%d) error(%v)", typ, mid, oid, err)
  727. }
  728. }
  729. if typ < favmdl.TypeBangumi {
  730. err = nil
  731. return
  732. }
  733. u := &favmdl.User{
  734. Type: typ,
  735. Oid: oid,
  736. Mid: mid,
  737. CTime: xtime.Time(now),
  738. MTime: xtime.Time(now),
  739. }
  740. rows, err := s.favDao.AddUser(c, u)
  741. if err != nil {
  742. log.Error("s.favDao.AddUser(%+v) error(%v)", u, err)
  743. return
  744. }
  745. if rows == 0 {
  746. log.Warn("s.favDao.DelUser(%+v) rows(%v)", u, rows)
  747. }
  748. return
  749. }
  750. func (s *Service) delFavOperations(c context.Context, typ int8, mid, oid, now int64) (err error) {
  751. if err = s.favDao.RemRelationOidCache(c, typ, mid, oid); err != nil {
  752. log.Error("s.favDao.RemRelationOidCache(%d,%d,%d) error(%v)", typ, mid, oid, err)
  753. err = nil
  754. }
  755. if typ < favmdl.TypeBangumi {
  756. return
  757. }
  758. u := &favmdl.User{
  759. Type: typ,
  760. Oid: oid,
  761. Mid: mid,
  762. State: favmdl.StateIsDel,
  763. CTime: xtime.Time(now),
  764. MTime: xtime.Time(now),
  765. }
  766. rows, err := s.favDao.DelUser(c, u)
  767. if err != nil {
  768. log.Error("s.favDao.DelUser(%+v) error(%v)", u, err)
  769. return err
  770. }
  771. if rows == 0 {
  772. log.Warn("s.favDao.DelUser(%+v) rows(%v)", u, rows)
  773. }
  774. return
  775. }
  776. func (s *Service) delRecentOidsMc(c context.Context, typ int8, mid int64) {
  777. if err := s.favDao.DelRecentOidsMc(c, typ, mid); err != nil {
  778. log.Error("s.favDao.DelRecentOidsMc(%d,%d) error(%v)", typ, mid, err)
  779. }
  780. if err := s.favDao.DelRecentResMc(c, favmdl.TypeVideo, mid); err != nil {
  781. log.Error("s.favDao.DelRecentResMc(%d,%d) error(%v)", typ, mid, err)
  782. }
  783. }
  784. func (s *Service) cleanInvalidFavs(c context.Context, typ int8, mid, fid, ftime int64) (err error) {
  785. if typ != favmdl.TypeVideo {
  786. return
  787. }
  788. var (
  789. mtime = xtime.Time(0)
  790. pageSize = 8000
  791. batchCount = s.c.Fav.MaxPageSize
  792. )
  793. var oids = make(map[int64]struct{})
  794. var musicIds = make(map[int64]struct{})
  795. var batchOids []int64
  796. for {
  797. favs, err := s.favDao.AllRelations(c, mid, fid, mtime, pageSize)
  798. if err != nil {
  799. return err
  800. }
  801. if len(favs) == 0 {
  802. break
  803. }
  804. mtime = favs[len(favs)-1].MTime
  805. if mtime == favs[0].MTime {
  806. mtime++
  807. }
  808. for _, fav := range favs {
  809. if fav.Type == favmdl.TypeVideo {
  810. oids[fav.Oid] = struct{}{}
  811. } else if fav.Type == favmdl.TypeMusicNew {
  812. musicIds[fav.Oid] = struct{}{}
  813. }
  814. }
  815. }
  816. for oid := range oids {
  817. if len(batchOids) >= batchCount {
  818. s.cleanVideoFavs(c, mid, fid, ftime, batchOids)
  819. batchOids = batchOids[:0]
  820. }
  821. batchOids = append(batchOids, oid)
  822. }
  823. if len(batchOids) > 0 {
  824. s.cleanVideoFavs(c, mid, fid, ftime, batchOids)
  825. }
  826. batchOids = batchOids[:0]
  827. for oid := range musicIds {
  828. if len(batchOids) >= batchCount {
  829. s.cleanMuiscFavs(c, mid, fid, ftime, batchOids)
  830. batchOids = batchOids[:0]
  831. }
  832. batchOids = append(batchOids, oid)
  833. }
  834. if len(batchOids) > 0 {
  835. s.cleanMuiscFavs(c, mid, fid, ftime, batchOids)
  836. }
  837. batchOids = batchOids[:0]
  838. err = s.favDao.SetCleanedCache(c, typ, mid, fid, ftime, s.cleanCDTime)
  839. s.cache.Do(c, func(c context.Context) {
  840. if err1 := s.favDao.DelRecentOidsMc(c, typ, mid); err1 != nil {
  841. log.Error("s.favDao.DelRecentOidsMc(%d,%d) error(%v)", typ, mid, err1)
  842. }
  843. if err := s.favDao.DelRecentResMc(c, favmdl.TypeVideo, mid); err != nil {
  844. log.Error("s.favDao.DelRecentResMc(%d,%d) error(%v)", typ, mid, err)
  845. }
  846. if err1 := s.favDao.DelRelationOidsCache(c, typ, mid); err1 != nil {
  847. log.Error("s.favDao.DelRelationOidsCache(%d,%d) error(%v)", typ, mid, err1)
  848. }
  849. if err1 := s.favDao.DelRelationsCache(c, mid, fid); err1 != nil {
  850. log.Error("s.favDao.DelRelationsCache(%d,%d) error(%v)", mid, fid, err1)
  851. }
  852. if err1 := s.favDao.DelAllRelationsCache(c, mid, fid); err1 != nil {
  853. log.Error("s.favDao.DelAllRelationsCache(%d,%d) error(%v)", mid, fid, err1)
  854. }
  855. })
  856. return
  857. }
  858. func (s *Service) cleanMuiscFavs(c context.Context, mid, fid, ftime int64, oids []int64) (err error) {
  859. var delOids []int64
  860. musics, err := s.musicDao.MusicMap(c, oids)
  861. if err != nil {
  862. log.Error("s.ArcsRPC(%v) error(%v)", oids, err)
  863. return
  864. }
  865. for _, oid := range oids {
  866. if _, ok := musics[oid]; !ok {
  867. delOids = append(delOids, oid)
  868. }
  869. }
  870. if len(delOids) > 0 {
  871. var rows int64
  872. if rows, err = s.favDao.DelRelationsByOids(c, favmdl.TypeMusicNew, mid, fid, delOids, xtime.Time(ftime)); err != nil {
  873. log.Error("s.favDao.DelRelationsByOids(%d,%d,%v) error(%v)", favmdl.TypeMusicNew, mid, fid, delOids, err)
  874. }
  875. if rows > 0 {
  876. s.upFavStats(c, favmdl.TypeMusicNew, mid, delOids, ftime, false)
  877. }
  878. }
  879. return
  880. }
  881. func (s *Service) cleanVideoFavs(c context.Context, mid, fid, ftime int64, oids []int64) (err error) {
  882. var delOids []int64
  883. arcs, err := s.ArcsRPC(c, oids)
  884. if err != nil {
  885. log.Error("s.ArcsRPC(%v) error(%v)", oids, err)
  886. return
  887. }
  888. for aid, arc := range arcs {
  889. if arc.IsNormal() {
  890. continue
  891. }
  892. delOids = append(delOids, aid)
  893. }
  894. if len(delOids) > 0 {
  895. var rows int64
  896. if rows, err = s.favDao.DelRelationsByOids(c, favmdl.TypeVideo, mid, fid, delOids, xtime.Time(ftime)); err != nil {
  897. log.Error("s.favDao.DelRelationsByOids(%d,%d,%v) error(%v)", favmdl.TypeVideo, mid, fid, delOids, err)
  898. }
  899. if rows > 0 {
  900. s.upFavStats(c, favmdl.TypeVideo, mid, delOids, ftime, false)
  901. }
  902. }
  903. return
  904. }
  905. func (s *Service) initFolderRelations(c context.Context, typ int8, mid, fid int64) (err error) {
  906. if fid <= 0 {
  907. log.Warn("folderID must not be zero!%d %d", mid, fid)
  908. return
  909. }
  910. var ok bool
  911. if ok, err = s.favDao.ExpireRelations(c, mid, fid); err != nil || ok {
  912. return
  913. }
  914. // 顺带更新folder的count
  915. s.setRelationCache(c, typ, mid, fid)
  916. return
  917. }
  918. func (s *Service) initAllRelations(c context.Context, mid, fid int64) (err error) {
  919. if fid <= 0 {
  920. log.Warn("folderID must not be zero!%d %d", mid, fid)
  921. return
  922. }
  923. var ok bool
  924. if ok, err = s.favDao.ExpireAllRelations(c, mid, fid); err != nil || ok {
  925. return
  926. }
  927. // 顺带更新folder的count
  928. s.setAllRelationCache(c, mid, fid)
  929. return
  930. }