cursor.go 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107
  1. package service
  2. import (
  3. "context"
  4. "errors"
  5. "strconv"
  6. "time"
  7. "go-common/app/interface/main/reply/dao/reply"
  8. model "go-common/app/interface/main/reply/model/reply"
  9. xmodel "go-common/app/interface/main/reply/model/xreply"
  10. accmdl "go-common/app/service/main/account/api"
  11. assmdl "go-common/app/service/main/assist/model/assist"
  12. "go-common/library/ecode"
  13. "go-common/library/log"
  14. "sort"
  15. "go-common/library/sync/errgroup.v2"
  16. )
  17. const (
  18. defaultChildrenSize = 5
  19. )
  20. // NewCursorByReplyID NewCursorByReplyID
  21. func (s *Service) NewCursorByReplyID(ctx context.Context, oid int64,
  22. otyp int8, replyID int64, size int, cmp model.Comp) (*model.Cursor, error) {
  23. rs, err := s.GetReplyByIDs(ctx, oid, otyp, []int64{replyID})
  24. if err != nil {
  25. return nil, err
  26. }
  27. if r, ok := rs[replyID]; ok {
  28. return model.NewCursor(int64(r.Floor), 0, size, cmp)
  29. }
  30. return nil, ecode.ReplyNotExist
  31. }
  32. // NewSubCursorByReplyID NewSubCursorByReplyID
  33. func (s *Service) NewSubCursorByReplyID(ctx context.Context, oid int64, otyp int8, replyID int64, size int, cmp model.Comp) (rootID int64, cursor *model.Cursor, err error) {
  34. rs, err := s.GetReplyByIDs(ctx, oid, otyp, []int64{replyID})
  35. if err != nil {
  36. return 0, nil, err
  37. }
  38. if r, ok := rs[replyID]; ok {
  39. if r.IsRoot() {
  40. rootID = r.RpID
  41. cursor, err = model.NewCursor(0, 1, size, cmp)
  42. return
  43. }
  44. // 不足一页面时,展示够一页
  45. floor := r.Floor
  46. if floor < size {
  47. floor = size
  48. }
  49. rootID = r.Root
  50. cursor, err = model.NewCursor(int64(floor), 0, size, cmp)
  51. return
  52. }
  53. return 0, nil, ecode.ReplyNotExist
  54. }
  55. // GetRootReplyListHeader GetRootReplyListHeader
  56. func (s *Service) GetRootReplyListHeader(ctx context.Context, sub *model.Subject, params *model.CursorParams) (*model.RootReplyListHeader, error) {
  57. var hotIDs []int64
  58. res, err := s.replyHotFeed(ctx, params.Mid, sub.Oid, int(sub.Type), 1, params.HotSize+2)
  59. if err == nil && res != nil && len(res.RpIDs) > 0 {
  60. log.Info("reply-feed(test): reply abtest mid(%d) oid(%d) type(%d) test name(%s) rpIDs(%v)", params.Mid, sub.Oid, sub.Type, res.Name, res.RpIDs)
  61. hotIDs = res.RpIDs
  62. } else {
  63. if err != nil {
  64. log.Error("reply-feed error(%v)", err)
  65. err = nil
  66. } else {
  67. log.Info("reply-feed(origin): reply abtest mid(%d) oid(%d) type(%d) test name(%s) rpIDs(%v)", params.Mid, sub.Oid, sub.Type, res.Name, res.RpIDs)
  68. }
  69. if hotIDs, err = s.GetRootReplyIDs(ctx, sub.Oid, sub.Type, model.SortByLike, 0, int64(params.HotSize+2)); err != nil {
  70. log.Error("%v", err)
  71. return nil, err
  72. }
  73. }
  74. var parentIDs []int64
  75. parentIDs = append(parentIDs, hotIDs...)
  76. var adminTopReply, upperTopReply *model.Reply
  77. if sub.AttrVal(model.SubAttrAdminTop) == model.AttrYes {
  78. adminTopReply, err = s.GetTopReply(ctx, params.Oid, params.OTyp, model.SubAttrAdminTop)
  79. if err != nil {
  80. return nil, err
  81. }
  82. if adminTopReply != nil {
  83. parentIDs = append(parentIDs, adminTopReply.RpID)
  84. }
  85. }
  86. if sub.AttrVal(model.SubAttrUpperTop) == model.AttrYes {
  87. upperTopReply, err = s.GetTopReply(ctx, params.Oid, params.OTyp, model.SubAttrUpperTop)
  88. if err != nil {
  89. return nil, err
  90. }
  91. if upperTopReply != nil {
  92. if !upperTopReply.IsNormal() && sub.Mid != params.Mid {
  93. upperTopReply = nil
  94. } else {
  95. parentIDs = append(parentIDs, upperTopReply.RpID)
  96. }
  97. }
  98. }
  99. parentChildrenIDRelation, err := s.ParentChildrenReplyIDRelation(ctx, sub, parentIDs)
  100. if err != nil {
  101. return nil, err
  102. }
  103. idReplyMap, err := s.IDReplyMap(ctx, sub, parentChildrenIDRelation)
  104. if err != nil {
  105. return nil, err
  106. }
  107. rootIDReplyMap := assemble(idReplyMap, parentChildrenIDRelation)
  108. if adminTopReply != nil {
  109. if r, ok := rootIDReplyMap[adminTopReply.RpID]; ok {
  110. adminTopReply = r
  111. }
  112. // For historic reasons, TopReply and HotReply may be overlapped
  113. hotIDs = Remove(hotIDs, adminTopReply.RpID)
  114. }
  115. if upperTopReply != nil {
  116. if r, ok := rootIDReplyMap[upperTopReply.RpID]; ok {
  117. upperTopReply = r
  118. }
  119. // For historic reasons, TopReply and HotReply may be overlapped
  120. hotIDs = Remove(hotIDs, upperTopReply.RpID)
  121. }
  122. return &model.RootReplyListHeader{
  123. TopAdmin: adminTopReply,
  124. TopUpper: upperTopReply,
  125. Hots: filterHot(Fetch(rootIDReplyMap, hotIDs), params.HotSize),
  126. }, nil
  127. }
  128. func filterHot(rs []*model.Reply, maxSize int) (hots []*model.Reply) {
  129. for _, r := range rs {
  130. if r.Like >= 3 {
  131. hots = append(hots, r)
  132. }
  133. }
  134. if hots == nil {
  135. hots = _emptyReplies
  136. } else if len(hots) > maxSize {
  137. hots = hots[:maxSize]
  138. }
  139. return hots
  140. }
  141. func needHeader(cursor *model.Cursor, rootLen int) bool {
  142. return cursor.Latest() ||
  143. (cursor.Increase() && rootLen < int(cursor.Len()))
  144. }
  145. // NeedInsertPendingReply NeedInsertPendingReply
  146. func NeedInsertPendingReply(params *model.CursorParams, sub *model.Subject) bool {
  147. return params.Mid > 0 &&
  148. params.Sort == model.SortByFloor &&
  149. sub.AttrVal(model.SubAttrAudit) == model.AttrYes
  150. }
  151. func collect(r *model.Reply, allIDs []int64, allMIDs []int64,
  152. allReply []*model.Reply) ([]int64, []int64, []*model.Reply) {
  153. if r == nil {
  154. return nil, nil, nil
  155. }
  156. allIDs = append(allIDs, r.RpID)
  157. allReply = append(allReply, r)
  158. allMIDs = append(allMIDs, r.Mid)
  159. if r.Content != nil {
  160. for _, mid := range r.Content.Ats {
  161. allMIDs = append(allMIDs, mid)
  162. }
  163. }
  164. return allIDs, allMIDs, allReply
  165. }
  166. // IDReplyMap IDReplyMap
  167. func (s *Service) IDReplyMap(ctx context.Context, sub *model.Subject,
  168. parentChildrenIDRelation map[int64][]int64) (map[int64]*model.Reply, error) {
  169. var allIDs []int64
  170. for parentID, childrenIDs := range parentChildrenIDRelation {
  171. allIDs = append(allIDs, childrenIDs...)
  172. allIDs = append(allIDs, parentID)
  173. }
  174. // WARNING: GetReplyByIDs should not contains subReplies, but currently there
  175. // exists a bug, which makes `idReplyMap` may contains sub_reply
  176. idReplyMap, err := s.GetReplyByIDs(ctx, sub.Oid, sub.Type, allIDs)
  177. if err != nil {
  178. return nil, err
  179. }
  180. // temporary solution :(, remove all children replies
  181. for _, reply := range idReplyMap {
  182. if reply.Replies != nil {
  183. reply.Replies = reply.Replies[:0]
  184. }
  185. }
  186. return idReplyMap, nil
  187. }
  188. // RootReplyListByCursor RootReplyListByCursor
  189. func (s *Service) RootReplyListByCursor(ctx context.Context, sub *model.Subject, params *model.CursorParams) ([]*model.Reply, error) {
  190. var parentIDs []int64
  191. if params.Cursor.Latest() {
  192. // 忽略错误,这个请求只为了增加统计数据
  193. s.replyFeed(ctx, params.Mid, 1, 20)
  194. } else {
  195. s.replyFeed(ctx, params.Mid, 2, 20)
  196. }
  197. rootIDs, err := s.GetRootReplyIDsByCursor(ctx, sub, params.Sort, params.Cursor)
  198. if err != nil {
  199. return nil, err
  200. }
  201. // 老版本折叠评论的逻辑
  202. if params.ShowFolded && sub.HasFolded() {
  203. foldedrpIDs, _ := s.foldedRepliesCursor(ctx, sub, 0, params.Cursor)
  204. if len(foldedrpIDs) > 0 {
  205. rootIDs = append(rootIDs, foldedrpIDs...)
  206. sort.Slice(rootIDs, func(x, y int) bool { return rootIDs[x] > rootIDs[y] })
  207. length := len(rootIDs)
  208. if length > params.Cursor.Len() {
  209. if params.Cursor.Increase() {
  210. // 对于根评论列表,往楼层大的方向翻页是向上翻,需要从后往前截断
  211. rootIDs = rootIDs[length-params.Cursor.Len():]
  212. } else {
  213. rootIDs = rootIDs[:params.Cursor.Len()]
  214. }
  215. }
  216. }
  217. }
  218. parentIDs = append(parentIDs, rootIDs...)
  219. parentChildrenIDRelation, err := s.ParentChildrenReplyIDRelation(ctx, sub, parentIDs)
  220. if err != nil {
  221. return nil, err
  222. }
  223. idReplyMap, err := s.IDReplyMap(ctx, sub, parentChildrenIDRelation)
  224. if err != nil {
  225. return nil, err
  226. }
  227. if NeedInsertPendingReply(params, sub) {
  228. // WARNING: here we assume that pending replies have no children
  229. // otherwise, we need to change logic here
  230. pendingIDReplyMap, err := s.GetPendingReply(ctx, params.Mid, sub.Oid, sub.Type)
  231. if err != nil {
  232. return nil, err
  233. }
  234. for id, r := range pendingIDReplyMap {
  235. if r.IsRoot() && !r.IsTop() {
  236. // insert pending reply into root reply list
  237. if params.Cursor.Latest() &&
  238. ((len(rootIDs) > 0 && id > rootIDs[0]) || len(rootIDs) == 0) {
  239. // when fetch latest reply list, and root reply list's length < the default size
  240. // and the pending reply ID > the max rootID
  241. // then just append the pending reply
  242. rootIDs = append([]int64{id}, rootIDs...)
  243. if len(rootIDs) > int(params.Cursor.Len()) {
  244. rootIDs = rootIDs[:params.Cursor.Len()]
  245. }
  246. } else {
  247. // otherwise, we need an algorithm to insert pending replyID into
  248. // rootIDs
  249. rootIDs = InsertInto(rootIDs, id, int(params.Cursor.Len()), model.OrderDESC)
  250. }
  251. parentChildrenIDRelation[id] = []int64{}
  252. } else if _, ok := idReplyMap[r.Root]; ok {
  253. // insert pending reply into sub reply list
  254. parentChildrenIDRelation[r.Root] = InsertInto(parentChildrenIDRelation[r.Root], id, defaultChildrenSize, model.OrderASC)
  255. } else {
  256. continue
  257. }
  258. sub.ACount++
  259. idReplyMap[id] = r
  260. }
  261. }
  262. return Fetch(assemble(idReplyMap, parentChildrenIDRelation), rootIDs), nil
  263. }
  264. // Remove Remove
  265. func Remove(arr []int64, k int64) []int64 {
  266. b := arr[:0]
  267. for _, a := range arr {
  268. if a != k {
  269. b = append(b, a)
  270. }
  271. }
  272. return b
  273. }
  274. // Unique Unique
  275. func Unique(arr []int64) []int64 {
  276. m := make(map[int64]struct{})
  277. for _, a := range arr {
  278. m[a] = struct{}{}
  279. }
  280. res := make([]int64, 0)
  281. for a := range m {
  282. res = append(res, a)
  283. }
  284. return res
  285. }
  286. // GetTopReply GetTopReply
  287. func (s *Service) GetTopReply(ctx context.Context, oid int64, otyp int8, topType uint32) (*model.Reply, error) {
  288. r, err := s.dao.Mc.GetTop(ctx, oid, otyp, topType)
  289. if err != nil {
  290. return nil, err
  291. }
  292. if r == nil {
  293. s.dao.Databus.AddTop(ctx, oid, otyp, topType)
  294. return nil, nil
  295. }
  296. return r, nil
  297. }
  298. // GetReplyFromDBByIDs GetReplyFromDBByIDs
  299. func (s *Service) GetReplyFromDBByIDs(ctx context.Context, oid int64, otyp int8, ids []int64) ([]*model.Reply, error) {
  300. rs := make([]*model.Reply, 0)
  301. if len(ids) == 0 {
  302. return rs, nil
  303. }
  304. idReplyMap, err := s.dao.Reply.GetByIds(ctx, oid, otyp, ids)
  305. if err != nil {
  306. return nil, err
  307. }
  308. idReplyContentMap, err := s.dao.Content.GetByIds(ctx, oid, ids)
  309. if err != nil {
  310. return nil, err
  311. }
  312. for _, id := range ids {
  313. if r, ok := idReplyMap[id]; ok {
  314. if r == nil {
  315. rs = append(rs, nil)
  316. continue
  317. }
  318. if content, ok := idReplyContentMap[id]; ok {
  319. r.Content = content
  320. }
  321. rs = append(rs, r)
  322. }
  323. }
  324. return rs, nil
  325. }
  326. // GetReplyByIDs GetReplyByIDs
  327. func (s *Service) GetReplyByIDs(ctx context.Context, oid int64, otyp int8, ids []int64) (map[int64]*model.Reply, error) {
  328. res := make(map[int64]*model.Reply)
  329. if len(ids) == 0 {
  330. return res, nil
  331. }
  332. cachedReplies, missedIDs, err := s.dao.Mc.GetReplyByIDs(ctx, ids)
  333. var rs []*model.Reply
  334. if err != nil {
  335. rs, err = s.GetReplyFromDBByIDs(ctx, oid, otyp, ids)
  336. if err != nil {
  337. return nil, err
  338. }
  339. for _, r := range rs {
  340. res[r.RpID] = r
  341. }
  342. return res, nil
  343. }
  344. for _, r := range cachedReplies {
  345. res[r.RpID] = r
  346. }
  347. if len(missedIDs) == 0 {
  348. return res, nil
  349. }
  350. missedReplies, err := s.GetReplyFromDBByIDs(ctx, oid, otyp, missedIDs)
  351. if err != nil {
  352. return nil, err
  353. }
  354. select {
  355. case s.replyChan <- replyChan{rps: missedReplies}:
  356. default:
  357. log.Error("s.replyChan is full")
  358. }
  359. for _, r := range missedReplies {
  360. res[r.RpID] = r.Clone()
  361. }
  362. return res, nil
  363. }
  364. // GetChildrenIDsByCursor GetChildrenIDsByCursor
  365. func (s *Service) GetChildrenIDsByCursor(ctx context.Context, sub *model.Subject, rootID int64, sort int8, cursor *model.Cursor) ([]int64, error) {
  366. k := reply.GenNewChildrenKeyByRootReplyID(rootID)
  367. cacheExist, err := s.dao.Redis.ExpireCache(ctx, k)
  368. if err != nil {
  369. return nil, err
  370. }
  371. var ids []int64
  372. if cacheExist {
  373. ids, err = s.dao.Redis.RangeChildrenIDByCursorScore(ctx, k, cursor)
  374. if err != nil {
  375. return nil, err
  376. }
  377. return ids, nil
  378. }
  379. s.dao.Databus.RecoverIndexByRoot(ctx, sub.Oid, rootID, sub.Type)
  380. switch sort {
  381. case model.SortByFloor:
  382. ids, err = s.dao.Reply.ChildrenIDSortByFloorCursor(ctx, sub.Oid, sub.Type, rootID, cursor)
  383. default:
  384. return nil, ecode.RequestErr
  385. }
  386. if err != nil {
  387. return nil, err
  388. }
  389. return ids, nil
  390. }
  391. // GetRootReplyIDsByCursor GetRootReplyIDsByCursor
  392. func (s *Service) GetRootReplyIDsByCursor(ctx context.Context, sub *model.Subject, sort int8, cursor *model.Cursor) ([]int64, error) {
  393. var (
  394. ids []int64
  395. isEnd bool
  396. )
  397. if sub.RCount == 0 {
  398. return []int64{}, nil
  399. }
  400. k := s.dao.Redis.CacheKeyRootReplyIDs(sub.Oid, sub.Type, sort)
  401. cacheExist, err := s.dao.Redis.ExpireCache(ctx, k)
  402. if err != nil {
  403. return nil, err
  404. }
  405. minFloor := cursor.Current() - 20
  406. if cursor.Latest() {
  407. minFloor = int64(sub.Count) - 20
  408. }
  409. if minFloor <= 0 {
  410. minFloor = 1
  411. }
  412. if cacheExist {
  413. ids, isEnd, err = s.dao.Redis.RangeRootIDByCursorScore(ctx, k, cursor)
  414. if err != nil {
  415. return nil, err
  416. }
  417. if sort == model.SortByFloor && len(ids) < cursor.Len() && !cursor.Increase() && !isEnd {
  418. ids, err = s.dao.Reply.RootIDSortByFloorCursor(ctx, sub.Oid, sub.Type, cursor)
  419. if err != nil {
  420. return nil, err
  421. }
  422. s.dao.Databus.RecoverFloorIdx(ctx, sub.Oid, sub.Type, int(minFloor), true)
  423. }
  424. return ids, nil
  425. }
  426. switch sort {
  427. case model.SortByFloor:
  428. s.dao.Databus.RecoverFloorIdx(ctx, sub.Oid, sub.Type, int(minFloor), true)
  429. ids, err = s.dao.Reply.RootIDSortByFloorCursor(ctx, sub.Oid, sub.Type, cursor)
  430. default:
  431. return nil, ecode.RequestErr
  432. }
  433. if err != nil {
  434. return nil, err
  435. }
  436. return ids, nil
  437. }
  438. // GetRootReplyIDs GetRootReplyIDs
  439. func (s *Service) GetRootReplyIDs(ctx context.Context, oid int64, otyp int8, sort int8, offset, limit int64) ([]int64, error) {
  440. var ids []int64
  441. k := s.dao.Redis.CacheKeyRootReplyIDs(oid, otyp, sort)
  442. cacheExist, err := s.dao.Redis.ExpireCache(ctx, k)
  443. if err != nil {
  444. return nil, err
  445. }
  446. if cacheExist {
  447. ids, err = s.dao.Redis.RangeRootReplyIDs(ctx, k, int(offset), int(offset+limit-1))
  448. if err != nil {
  449. return nil, err
  450. }
  451. return ids, nil
  452. }
  453. s.dao.Databus.RecoverIndex(ctx, oid, otyp, sort)
  454. switch sort {
  455. case model.SortByFloor:
  456. ids, err = s.dao.Reply.GetIdsSortFloor(ctx, oid, otyp, int(offset), int(limit))
  457. case model.SortByCount:
  458. ids, err = s.dao.Reply.GetIdsSortCount(ctx, oid, otyp, int(offset), int(limit))
  459. case model.SortByLike:
  460. ids, err = s.dao.Reply.GetIdsSortLike(ctx, oid, otyp, int(offset), int(limit))
  461. default:
  462. log.Error("unsupported sort:%d", sort)
  463. return nil, ecode.RequestErr
  464. }
  465. if err != nil {
  466. return nil, err
  467. }
  468. return ids, nil
  469. }
  470. // GetSubject GetSubject
  471. func (s *Service) GetSubject(ctx context.Context, oid int64, tp int8) (*model.Subject, error) {
  472. subject, err := s.getSubject(ctx, oid, tp)
  473. if err != nil {
  474. return nil, err
  475. }
  476. if subject.State == model.SubStateForbid {
  477. return nil, ecode.ReplyForbidReply
  478. }
  479. return subject, nil
  480. }
  481. func elementOf(k int64, arr []int64) bool {
  482. for _, i := range arr {
  483. if i == k {
  484. return true
  485. }
  486. }
  487. return false
  488. }
  489. // InsertInto insert `id` into sorted list(order by `cmp`) `ids`
  490. // after insertion if the total length > `size`, truncate the extra element
  491. func InsertInto(ids []int64, id int64, size int, cmp model.Comp) []int64 {
  492. if elementOf(id, ids) {
  493. return ids
  494. }
  495. if len(ids) < size {
  496. return model.SortArr(append(ids, id), cmp)
  497. }
  498. ids = model.SortArr(ids, cmp)
  499. if !withInRange(id, ids[0], ids[len(ids)-1]) {
  500. return ids
  501. }
  502. for i := 0; i < len(ids); i++ {
  503. if cmp(id, ids[i]) {
  504. ids = append(ids[:i], append([]int64{id}, ids[i:]...)...)
  505. break
  506. }
  507. }
  508. return ids[:size]
  509. }
  510. func withInRange(i, begin, end int64) bool {
  511. return (begin > i && end < i) || (begin < i && end > i)
  512. }
  513. // FillRootReplies FillRootReplies
  514. func (s *Service) FillRootReplies(ctx context.Context,
  515. rs []*model.Reply,
  516. mid int64,
  517. ip string,
  518. htmlEscape bool,
  519. sub *model.Subject) {
  520. var (
  521. allReply []*model.Reply
  522. allIDs, allMIDs []int64
  523. )
  524. if mid > 0 {
  525. allMIDs = append(allMIDs, mid)
  526. }
  527. for _, r := range rs {
  528. allIDs, allMIDs, allReply = collect(r, allIDs, allMIDs, allReply)
  529. for _, rr := range r.Replies {
  530. allIDs, allMIDs, allReply = collect(rr, allIDs, allMIDs, allReply)
  531. }
  532. }
  533. s.fillReplies(ctx, sub, allIDs, allReply, Unique(allMIDs), mid, ip, htmlEscape)
  534. }
  535. func (s *Service) fillReplies(ctx context.Context,
  536. sub *model.Subject,
  537. allReplyIDs []int64,
  538. rs []*model.Reply,
  539. mids []int64,
  540. reqMid int64,
  541. ip string,
  542. htmlEscape bool) {
  543. var (
  544. actionMap map[int64]int8
  545. blackedMap map[int64]bool
  546. relationMap map[int64]*accmdl.RelationReply
  547. assistMap map[int64]int
  548. fansMap map[int64]*model.FansDetail
  549. accountMap map[int64]*accmdl.Card
  550. )
  551. g := errgroup.WithContext(ctx)
  552. if reqMid > 0 {
  553. g.Go(func(ctx context.Context) error {
  554. actionMap, _ = s.actions(ctx, reqMid, sub.Oid, allReplyIDs)
  555. return nil
  556. })
  557. g.Go(func(ctx context.Context) error {
  558. relationMap, _ = s.GetRelationMap(ctx, reqMid, mids, ip)
  559. return nil
  560. })
  561. g.Go(func(ctx context.Context) error {
  562. blackedMap, _ = s.GetBlacklistMap(ctx, reqMid, ip)
  563. return nil
  564. })
  565. }
  566. g.Go(func(ctx context.Context) error {
  567. accountMap, _ = s.GetAccountInfoMap(ctx, mids, ip)
  568. return nil
  569. })
  570. if !(s.IsWhiteAid(sub.Oid, sub.Type)) {
  571. g.Go(func(ctx context.Context) error {
  572. assistMap, _ = s.GetAssistMap(ctx, sub.Mid, ip)
  573. return nil
  574. })
  575. g.Go(func(ctx context.Context) error {
  576. fansMap, _ = s.GetFansMap(ctx, mids, sub.Mid, ip)
  577. return nil
  578. })
  579. }
  580. g.Wait()
  581. for _, r := range rs {
  582. s.fillReply(r,
  583. htmlEscape,
  584. accountMap,
  585. actionMap,
  586. fansMap,
  587. blackedMap,
  588. assistMap,
  589. relationMap)
  590. }
  591. }
  592. func (s *Service) fillReply(r *model.Reply,
  593. escape bool,
  594. accountMap map[int64]*accmdl.Card,
  595. actionMap map[int64]int8,
  596. fansMap map[int64]*model.FansDetail,
  597. blackedMap map[int64]bool,
  598. assistMap map[int64]int,
  599. relationMap map[int64]*accmdl.RelationReply) {
  600. if r == nil {
  601. return
  602. }
  603. r.FillFolder()
  604. r.FillStr(escape)
  605. if r.Content != nil {
  606. r.Content.FillAts(accountMap)
  607. }
  608. r.Action = actionMap[r.RpID]
  609. r.Member = new(model.Member)
  610. var (
  611. ok bool
  612. blacked bool
  613. card *accmdl.Card
  614. )
  615. if card, ok = accountMap[r.Mid]; ok {
  616. r.Member.Info = new(model.Info)
  617. r.Member.Info.FromCard(card)
  618. } else {
  619. r.Member.Info = new(model.Info)
  620. *r.Member.Info = *s.defMember
  621. r.Member.Info.Mid = strconv.FormatInt(r.Mid, 10)
  622. }
  623. if r.Member.FansDetail, ok = fansMap[r.Mid]; ok {
  624. r.FansGrade = r.Member.FansDetail.Status
  625. }
  626. if blacked, ok = blackedMap[r.Mid]; ok && blacked {
  627. r.State = model.ReplyStateBlacklist
  628. }
  629. if r.Replies == nil {
  630. r.Replies = []*model.Reply{}
  631. }
  632. if _, ok = assistMap[r.Mid]; ok {
  633. r.Assist = 1
  634. }
  635. if attetion, ok := relationMap[r.Mid]; ok {
  636. if attetion.Following {
  637. r.Member.Following = 1
  638. }
  639. }
  640. if r.RCount < 0 {
  641. r.RCount = 0
  642. }
  643. }
  644. // Fetch Fetch
  645. func Fetch(idReplyMap map[int64]*model.Reply, ids []int64) []*model.Reply {
  646. res := make([]*model.Reply, 0, len(ids))
  647. for _, pid := range ids {
  648. if p, ok := idReplyMap[pid]; ok && p != nil {
  649. res = append(res, p)
  650. }
  651. }
  652. return res
  653. }
  654. // assemble insert children replies into their corresponding parents
  655. func assemble(idReplyMap map[int64]*model.Reply, parentChildrenMap map[int64][]int64) map[int64]*model.Reply {
  656. parentIDs := make([]int64, 0)
  657. for pid := range parentChildrenMap {
  658. parentIDs = append(parentIDs, pid)
  659. }
  660. res := make(map[int64]*model.Reply)
  661. for _, pid := range parentIDs {
  662. if p, ok := idReplyMap[pid]; ok {
  663. if childrenIDs, ok := parentChildrenMap[pid]; ok {
  664. for _, childID := range childrenIDs {
  665. if r, ok := idReplyMap[childID]; ok {
  666. p.Replies = append(p.Replies, r)
  667. }
  668. }
  669. }
  670. res[pid] = p
  671. }
  672. }
  673. return res
  674. }
  675. // ParentChildrenReplyIDRelation ParentChildrenReplyIDRelation
  676. func (s *Service) ParentChildrenReplyIDRelation(ctx context.Context, sub *model.Subject, parentIDs []int64) (map[int64][]int64, error) {
  677. idReplyMap, err := s.GetReplyByIDs(ctx, sub.Oid, sub.Type, parentIDs)
  678. if err != nil {
  679. return nil, err
  680. }
  681. var parentWithChildren, parentWithoutChildren []int64
  682. for id, reply := range idReplyMap {
  683. if reply.RCount > 0 {
  684. parentWithChildren = append(parentWithChildren, id)
  685. } else {
  686. parentWithoutChildren = append(parentWithoutChildren, id)
  687. }
  688. }
  689. parentChildrenIDRelation, err := s.parentChildrenReplyIDRelation(ctx, sub.Oid, sub.Type, parentWithChildren)
  690. if err != nil {
  691. return nil, err
  692. }
  693. for _, pid := range parentWithoutChildren {
  694. parentChildrenIDRelation[pid] = []int64{}
  695. }
  696. return parentChildrenIDRelation, nil
  697. }
  698. func (s *Service) parentChildrenReplyIDRelation(ctx context.Context, oid int64,
  699. tp int8, parentIDs []int64) (map[int64][]int64, error) {
  700. parentChildrenIDRelation, missedIDs, err := s.dao.Redis.ParentChildrenReplyIDMap(ctx, parentIDs, 0, 4)
  701. if err != nil {
  702. return nil, err
  703. }
  704. if len(missedIDs) > 0 {
  705. for _, rootID := range missedIDs {
  706. childrenIDs, err := s.dao.Reply.ChildrenIDsOfRootReply(ctx, oid, rootID, tp, 0, defaultChildrenSize)
  707. if err != nil {
  708. return nil, err
  709. }
  710. parentChildrenIDRelation[rootID] = childrenIDs
  711. s.dao.Databus.RecoverIndexByRoot(ctx, oid, rootID, tp)
  712. }
  713. }
  714. return parentChildrenIDRelation, nil
  715. }
  716. // GetAccountInfoMap fn
  717. func (s *Service) GetAccountInfoMap(ctx context.Context, mids []int64, ip string) (map[int64]*accmdl.Card, error) {
  718. if len(mids) == 0 {
  719. return _emptyCards, nil
  720. }
  721. args := &accmdl.MidsReq{Mids: mids}
  722. res, err := s.acc.Cards3(ctx, args)
  723. if err != nil {
  724. log.Error("s.acc.Infos2(%v), error(%v)", args, err)
  725. return nil, err
  726. }
  727. return res.Cards, nil
  728. }
  729. // GetFansMap fn
  730. func (s *Service) GetFansMap(ctx context.Context, uids []int64, mid int64, ip string) (map[int64]*model.FansDetail, error) {
  731. fans, err := s.fans.Fetch(ctx, uids, mid, time.Now())
  732. if err != nil {
  733. return nil, err
  734. }
  735. return fans, nil
  736. }
  737. // GetAssistMap fn
  738. func (s *Service) GetAssistMap(ctx context.Context, mid int64, ip string) (assistMap map[int64]int, err error) {
  739. arg := &assmdl.ArgAssists{
  740. Mid: mid,
  741. RealIP: ip,
  742. }
  743. assistMap = make(map[int64]int)
  744. ids, err := s.assist.AssistIDs(ctx, arg)
  745. if err != nil {
  746. log.Error("s.assist.AssistIDs(%v), error(%v)", arg, err)
  747. return
  748. }
  749. for _, id := range ids {
  750. assistMap[id] = 1
  751. }
  752. return
  753. }
  754. // GetRelationMap GetRelationMap
  755. func (s *Service) GetRelationMap(ctx context.Context, mid int64, targetMids []int64, ip string) (map[int64]*accmdl.RelationReply, error) {
  756. if len(targetMids) == 0 {
  757. return _emptyRelations, nil
  758. }
  759. relations, err := s.acc.Relations3(ctx, &accmdl.RelationsReq{Mid: mid, Owners: targetMids, RealIp: ip})
  760. if err != nil {
  761. log.Error("s.acc.Relations2(%v, %v) error(%v)", mid, targetMids, err)
  762. return nil, err
  763. }
  764. return relations.Relations, nil
  765. }
  766. // GetBlacklistMap GetBlacklistMap
  767. func (s *Service) GetBlacklistMap(ctx context.Context,
  768. mid int64, ip string) (map[int64]bool, error) {
  769. if mid == 0 {
  770. return _emptyBlackList, nil
  771. }
  772. args := &accmdl.MidReq{Mid: mid}
  773. blacklistMap, err := s.acc.Blacks3(ctx, args)
  774. if err != nil {
  775. log.Error("s.acc.Blacks(%v) error(%v)", args, err)
  776. return nil, err
  777. }
  778. return blacklistMap.BlackList, nil
  779. }
  780. // GetPendingReply GetPendingReply
  781. func (s *Service) GetPendingReply(ctx context.Context, mid int64, oid int64, typ int8) (map[int64]*model.Reply, error) {
  782. // WARNING: here we assume that pending replies have no children
  783. // otherwise, we need to change logic here
  784. pendingIDs, err := s.dao.Redis.UserAuditReplies(ctx, mid, oid, typ)
  785. if err != nil {
  786. return nil, err
  787. }
  788. pendingIDReplyMap, err := s.GetReplyByIDs(ctx, oid, typ, pendingIDs)
  789. if err != nil {
  790. return nil, err
  791. }
  792. return pendingIDReplyMap, nil
  793. }
  794. // GetSubReplyListByCursor GetSubReplyListByCursor
  795. func (s *Service) GetSubReplyListByCursor(ctx context.Context, params *model.CursorParams) (*model.RootReplyList, error) {
  796. var (
  797. hasFolded bool
  798. )
  799. sub, err := s.Subject(ctx, params.Oid, params.OTyp)
  800. if err != nil {
  801. return nil, err
  802. }
  803. rp, err := s.ReplyContent(ctx, params.Oid, params.RootID, params.OTyp)
  804. if err != nil {
  805. return nil, err
  806. }
  807. if rp.IsRoot() && rp.HasFolded() {
  808. hasFolded = true
  809. }
  810. if rp.Root != 0 {
  811. params.RootID = rp.Root
  812. root, _ := s.reply(ctx, 0, params.Oid, rp.Root, params.OTyp)
  813. if root != nil && rp.IsRoot() && rp.HasFolded() {
  814. hasFolded = true
  815. }
  816. }
  817. childrenIDs, err := s.GetChildrenIDsByCursor(ctx, sub, params.RootID, params.Sort, params.Cursor)
  818. if err != nil {
  819. return nil, err
  820. }
  821. // 这里是处理被折叠的评论的逻辑
  822. if params.ShowFolded && hasFolded {
  823. foldedRpIDs, _ := s.foldedRepliesCursor(ctx, sub, params.RootID, params.Cursor)
  824. if len(foldedRpIDs) > 0 {
  825. childrenIDs = append(childrenIDs, foldedRpIDs...)
  826. sort.Slice(childrenIDs, func(x, y int) bool { return childrenIDs[x] < childrenIDs[y] })
  827. length := len(childrenIDs)
  828. if length > params.Cursor.Len() {
  829. if params.Cursor.Descrease() {
  830. // 往楼层小的地方翻页, 对于子评论就是往上翻页,这个时候要从后往前截断
  831. childrenIDs = childrenIDs[length-params.Cursor.Len():]
  832. } else {
  833. childrenIDs = childrenIDs[:params.Cursor.Len()]
  834. }
  835. }
  836. }
  837. }
  838. parentChildrenIDRelation := map[int64][]int64{params.RootID: childrenIDs}
  839. idReplyMap, err := s.IDReplyMap(ctx, sub, parentChildrenIDRelation)
  840. if err != nil {
  841. return nil, err
  842. }
  843. if NeedInsertPendingReply(params, sub) {
  844. var pendingIDReplyMap map[int64]*model.Reply
  845. pendingIDReplyMap, err = s.GetPendingReply(ctx, params.Mid, sub.Oid, sub.Type)
  846. if err != nil {
  847. return nil, err
  848. }
  849. for id, r := range pendingIDReplyMap {
  850. if _, ok := idReplyMap[r.Root]; ok {
  851. parentChildrenIDRelation[r.Root] = InsertInto(parentChildrenIDRelation[r.Root], id, defaultChildrenSize, model.OrderASC)
  852. sub.ACount++
  853. idReplyMap[id] = r
  854. }
  855. }
  856. }
  857. rootReply := assemble(idReplyMap, parentChildrenIDRelation)[params.RootID]
  858. if rootReply == nil || rootReply.IsDeleted() {
  859. return nil, ecode.ReplyNotExist
  860. }
  861. max, min, err := cursorRange(rootReply.Replies, params.Sort)
  862. if err != nil {
  863. return nil, err
  864. }
  865. s.FillRootReplies(ctx, []*model.Reply{rootReply}, params.Mid, params.IP, params.HTMLEscape, sub)
  866. return &model.RootReplyList{
  867. Subject: sub,
  868. Roots: []*model.Reply{rootReply},
  869. CursorRangeMax: max,
  870. CursorRangeMin: min,
  871. }, nil
  872. }
  873. func cursorRange(rs []*model.Reply, sort int8) (max, min int64, err error) {
  874. if len(rs) > 0 {
  875. switch sort {
  876. case model.SortByFloor:
  877. // NOTE("这里是为了12月13号给bishi搞零时置顶子评论做的ios兼容逻辑")
  878. var head int64
  879. if rs[0].RpID != 1237270231 {
  880. head = int64(rs[0].Floor)
  881. } else {
  882. if len(rs) > 1 {
  883. head = int64(rs[1].Floor)
  884. } else {
  885. head = int64(1)
  886. }
  887. }
  888. tail := int64(rs[len(rs)-1].Floor)
  889. if model.OrderDESC(head, tail) {
  890. max, min = head, tail
  891. } else {
  892. max, min = tail, head
  893. }
  894. return
  895. default:
  896. err = errors.New("unsupported cursor type")
  897. log.Error("%v", err)
  898. return 0, 0, err
  899. }
  900. }
  901. return
  902. }
  903. // GetRootReplyListByCursor GetRootReplyListByCursor
  904. func (s *Service) GetRootReplyListByCursor(ctx context.Context, params *model.CursorParams) (*model.RootReplyList, error) {
  905. params.HotSize = s.hotNum(params.Oid, params.OTyp)
  906. sub, err := s.Subject(ctx, params.Oid, params.OTyp)
  907. if err != nil {
  908. return nil, err
  909. }
  910. roots, err := s.RootReplyListByCursor(ctx, sub, params)
  911. if err != nil {
  912. return nil, err
  913. }
  914. max, min, err := cursorRange(roots, params.Sort)
  915. if err != nil {
  916. return nil, err
  917. }
  918. // WARN: rootIDs AND hotIDs may be overlapped
  919. var allRootReply []*model.Reply
  920. allRootReply = append(allRootReply, roots...)
  921. var header *model.RootReplyListHeader
  922. if needHeader(params.Cursor, len(roots)) {
  923. header, err = s.GetRootReplyListHeader(ctx, sub, params)
  924. if err != nil {
  925. return nil, err
  926. }
  927. allRootReply = append(allRootReply, header.Hots...)
  928. if header.TopAdmin != nil {
  929. allRootReply = append(allRootReply, header.TopAdmin)
  930. }
  931. if header.TopUpper != nil {
  932. allRootReply = append(allRootReply, header.TopUpper)
  933. }
  934. }
  935. s.FillRootReplies(ctx, allRootReply, params.Mid, params.IP, params.HTMLEscape, sub)
  936. return &model.RootReplyList{
  937. Subject: sub,
  938. Roots: roots,
  939. Header: header,
  940. CursorRangeMax: max,
  941. CursorRangeMin: min,
  942. }, nil
  943. }
  944. // DialogMaxMinFloor return max and min floor in dialog
  945. func (s *Service) DialogMaxMinFloor(c context.Context, oid int64, tp int8, root, dialog int64) (maxFloor, minFloor int, err error) {
  946. var (
  947. ok bool
  948. )
  949. if ok, err = s.dao.Redis.ExpireDialogIndex(c, dialog); err != nil {
  950. log.Error("s.dao.Redis.ExpireDialogIndex error (%v)", err)
  951. return
  952. }
  953. if ok {
  954. minFloor, maxFloor, err = s.dao.Redis.DialogMinMaxFloor(c, dialog)
  955. } else {
  956. minFloor, maxFloor, err = s.dao.Reply.GetDialogMinMaxFloor(c, oid, tp, root, dialog)
  957. }
  958. return
  959. }
  960. // DialogByCursor ...
  961. func (s *Service) DialogByCursor(c context.Context, mid, oid int64, tp int8, root, dialog int64, cursor *model.Cursor) (rps []*model.Reply, dialogCursor *model.DialogCursor, dialogMeta *model.DialogMeta, err error) {
  962. var (
  963. ok bool
  964. rpIDs []int64
  965. rpMap map[int64]*model.Reply
  966. )
  967. dialogCursor = new(model.DialogCursor)
  968. dialogMeta = new(model.DialogMeta)
  969. dialogMeta.MaxFloor, dialogMeta.MinFloor, err = s.DialogMaxMinFloor(c, oid, tp, root, dialog)
  970. if err != nil {
  971. log.Error("get max and min floor for dialog from redis or db error", err)
  972. return
  973. }
  974. if (cursor.Max() != 0 && cursor.Max() > int64(dialogMeta.MaxFloor)) || (cursor.Min() != 0 && cursor.Min() < int64(dialogMeta.MinFloor)) {
  975. log.Warn("cursor max %d min %d, dialogmeta max %d min %d", cursor.Max(), cursor.Min(), dialogMeta.MinFloor, dialogMeta.MinFloor)
  976. err = ecode.RequestErr
  977. return
  978. }
  979. if ok, err = s.dao.Redis.ExpireDialogIndex(c, dialog); err != nil {
  980. log.Error("s.dao.Redis.ExpireDialogIndex error (%v)", err)
  981. return
  982. }
  983. if ok {
  984. rpIDs, err = s.dao.Redis.DialogByCursor(c, dialog, cursor)
  985. } else {
  986. s.dao.Databus.RecoverDialogIdx(c, oid, tp, root, dialog)
  987. if cursor.Latest() {
  988. rpIDs, err = s.dao.Reply.GetIDsByDialogAsc(c, oid, tp, root, dialog, int64(dialogMeta.MinFloor), cursor.Len())
  989. } else if cursor.Descrease() {
  990. rpIDs, err = s.dao.Reply.GetIDsByDialogDesc(c, oid, tp, root, dialog, cursor.Current(), cursor.Len())
  991. } else if cursor.Increase() {
  992. rpIDs, err = s.dao.Reply.GetIDsByDialogAsc(c, oid, tp, root, dialog, cursor.Current(), cursor.Len())
  993. } else {
  994. err = ecode.RequestErr
  995. }
  996. }
  997. if err != nil {
  998. log.Error("dialog by cursor from redis or db error (%v)", err)
  999. return
  1000. }
  1001. rpMap, err = s.repliesMap(c, oid, tp, rpIDs)
  1002. if err != nil {
  1003. return
  1004. }
  1005. for _, rpid := range rpIDs {
  1006. if r, ok := rpMap[rpid]; ok {
  1007. rps = append(rps, r)
  1008. }
  1009. }
  1010. if !sort.SliceIsSorted(rps, func(i, j int) bool { return rps[i].Floor < rps[j].Floor }) {
  1011. sort.Slice(rps, func(i, j int) bool { return rps[i].Floor < rps[j].Floor })
  1012. }
  1013. sub, err := s.Subject(c, oid, tp)
  1014. if err != nil {
  1015. log.Error("s.dao.Subject.Get(%d, %d) error(%v)", oid, tp, err)
  1016. return
  1017. }
  1018. if err = s.buildReply(c, sub, rps, mid, false); err != nil {
  1019. return
  1020. }
  1021. dialogCursor.Size = len(rps)
  1022. if dialogCursor.Size == 0 {
  1023. return
  1024. }
  1025. dialogCursor.MinFloor = rps[0].Floor
  1026. dialogCursor.MaxFloor = rps[dialogCursor.Size-1].Floor
  1027. return
  1028. }
  1029. // ...
  1030. func (s *Service) foldedRepliesCursor(c context.Context, sub *model.Subject, root int64, cursor *model.Cursor) (foldedRpIDs []int64, err error) {
  1031. var (
  1032. xcursor = new(xmodel.Cursor)
  1033. max = int(cursor.Max())
  1034. min = int(cursor.Min())
  1035. )
  1036. xcursor.Ps = cursor.Len()
  1037. // 针对子评论的情况
  1038. if cursor.Increase() {
  1039. xcursor.Prev = min
  1040. } else if cursor.Descrease() {
  1041. xcursor.Next = max
  1042. }
  1043. return s.foldedReplies(c, sub, root, xcursor)
  1044. }