fold.go 7.8 KB


  1. package service
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "go-common/app/admin/main/reply/model"
  7. "go-common/library/database/sql"
  8. "go-common/library/ecode"
  9. "go-common/library/sync/errgroup.v2"
  10. )
  // compose groups three parallel slices by oid.
  //
  // Index i of oids, tps and rpIDs describes one reply: rpIDs[i] belongs to
  // subject oids[i], whose type is tps[i].
  //
  // Returns:
  //   - rpMap: oid -> the rpIDs listed for that oid, in input order.
  //   - tpMap: oid -> the tps value at the last index where that oid appears.
  //
  // Both maps are nil when the inputs are inconsistent: oids and rpIDs differ in
  // length, or tps has fewer elements than oids (which would otherwise make
  // tps[i] index out of range).
  func compose(oids, tps, rpIDs []int64) (rpMap map[int64][]int64, tpMap map[int64]int64) {
  	if len(oids) != len(rpIDs) || len(tps) < len(oids) {
  		return
  	}
  	rpMap = make(map[int64][]int64)
  	tpMap = make(map[int64]int64)
  	for i, oid := range oids {
  		// append on a missing key starts from a nil slice, so no existence check is needed.
  		rpMap[oid] = append(rpMap[oid], rpIDs[i])
  		tpMap[oid] = tps[i]
  	}
  	return
  }
  // extend returns a slice of the given length in which every element is oid.
  // The result is non-nil (and empty) when length is 0.
  func extend(oid int64, length int) []int64 {
  	repeated := make([]int64, length)
  	for idx := range repeated {
  		repeated[idx] = oid
  	}
  	return repeated
  }
  34. // FoldReplies ...
  35. func (s *Service) FoldReplies(ctx context.Context, oids, tps, rpIDs []int64) (err error) {
  36. g := errgroup.WithContext(ctx)
  37. g.GOMAXPROCS(2)
  38. rpMap, tpMap := compose(oids, tps, rpIDs)
  39. for oid, IDs := range rpMap {
  40. if tp, ok := tpMap[oid]; ok {
  41. oid, tp, IDs := oid, tp, IDs
  42. g.Go(func(ctx context.Context) error {
  43. return s.foldReplies(ctx, oid, tp, IDs)
  44. })
  45. }
  46. }
  47. return g.Wait()
  48. }
  49. func (s *Service) foldReplies(ctx context.Context, oid, tp int64, rpIDs []int64) (err error) {
  50. var (
  51. sub *model.Subject
  52. // 所有的评论,包含需要被折叠的子评论的根评论
  53. rpMap = make(map[int64]*model.Reply)
  54. // 需要被折叠的子评论的根评论IDs
  55. roots []int64
  56. rootMap map[int64]*model.Reply
  57. // 应该被折叠的评论map
  58. rps = make(map[int64]*model.Reply)
  59. mu sync.Mutex
  60. )
  61. if sub, err = s.subject(ctx, oid, int32(tp)); err != nil {
  62. return
  63. }
  64. if rpMap, err = s.dao.Replies(ctx, extend(oid, len(rpIDs)), rpIDs); err != nil {
  65. return
  66. }
  67. for _, rp := range rpMap {
  68. if !rp.IsRoot() {
  69. roots = append(roots, rp.Root)
  70. }
  71. rps[rp.ID] = rp
  72. }
  73. if len(roots) > 0 {
  74. if rootMap, err = s.dao.Replies(ctx, extend(oid, len(roots)), roots); err != nil {
  75. return
  76. }
  77. for _, root := range rootMap {
  78. rpMap[root.ID] = root
  79. }
  80. }
  81. g := errgroup.WithContext(ctx)
  82. g.GOMAXPROCS(4)
  83. for rpID := range rps {
  84. rpID := rpID
  85. g.Go(func(ctx context.Context) error {
  86. if err := s.tranFoldReply(ctx, sub.Oid, rpID); err != nil {
  87. mu.Lock()
  88. delete(rps, rpID)
  89. mu.Unlock()
  90. return err
  91. }
  92. return nil
  93. })
  94. }
  95. err = g.Wait()
  96. for _, rp := range rps {
  97. // 这里不是删除,是为了让reply-feed去掉热评, 折叠评论是可以有互动行为的, 所以这里异步,丢了也无所谓
  98. rp := rp
  99. s.cache.Do(ctx, func(ctx context.Context) {
  100. s.pubEvent(ctx, "reply_del", 0, sub, rp, nil)
  101. })
  102. }
  103. // 标记数据库有折叠评论, rpMap 是所有的被折叠评论以及他们的根评论
  104. s.markHasFolded(ctx, rps, rpMap, sub)
  105. s.handleCacheByFold(ctx, rps, rpMap, sub)
  106. return err
  107. }
  108. // handleCacheByFold ...
  109. func (s *Service) handleCacheByFold(ctx context.Context, rps map[int64]*model.Reply, rpMap map[int64]*model.Reply, sub *model.Subject) {
  110. var (
  111. roots []int64
  112. childMap = make(map[int64][]int64)
  113. )
  114. for _, rp := range rps {
  115. if rp.IsRoot() {
  116. roots = append(roots, rp.ID)
  117. } else {
  118. childMap[rp.Root] = append(childMap[rp.Root], rp.ID)
  119. }
  120. }
  121. s.cacheOperater.Do(ctx, func(ctx context.Context) {
  122. // 删正常列表的redis缓存
  123. s.dao.RemRdsByFold(ctx, roots, childMap, sub, rpMap)
  124. // 加折叠列表redis缓存
  125. s.dao.AddRdsByFold(ctx, roots, childMap, sub, rpMap)
  126. })
  127. }
  128. // markAsFolded ...
  129. func (s *Service) markHasFolded(ctx context.Context, foldedRp map[int64]*model.Reply, rpMap map[int64]*model.Reply, sub *model.Subject) {
  130. var (
  131. dirtyCacheRpIDs []int64
  132. markedRpIDs []int64
  133. )
  134. for _, rp := range foldedRp {
  135. rp := rp
  136. if rp.IsRoot() {
  137. // 如果subject还没被标记过
  138. if !sub.HasFolded() {
  139. sub.MarkHasFolded()
  140. // 修改数据库标记
  141. s.marker.Do(ctx, func(ctx context.Context) {
  142. if err := s.tranMarkSubHasFolded(ctx, sub.Oid, sub.Type); err != nil {
  143. return
  144. }
  145. })
  146. // 删掉 subject mc
  147. s.cacheOperater.Do(ctx, func(ctx context.Context) {
  148. s.dao.DelSubjectCache(ctx, sub.Oid, sub.Type)
  149. })
  150. }
  151. } else {
  152. if _, ok := rpMap[rp.Root]; ok {
  153. markedRpIDs = append(markedRpIDs, rp.Root)
  154. }
  155. }
  156. dirtyCacheRpIDs = append(dirtyCacheRpIDs, rp.ID)
  157. }
  158. for _, rpID := range markedRpIDs {
  159. // 修改数据库标记
  160. rpID := rpID
  161. s.marker.Do(ctx, func(ctx context.Context) {
  162. if err := s.tranMarkReplyHasFolded(ctx, sub.Oid, rpID); err != nil {
  163. return
  164. }
  165. })
  166. }
  167. dirtyCacheRpIDs = append(dirtyCacheRpIDs, markedRpIDs...)
  168. for _, rpID := range dirtyCacheRpIDs {
  169. // 删除被折叠子评论的根评论以及被折叠的子评论和根评论
  170. rpID := rpID
  171. s.cacheOperater.Do(ctx, func(ctx context.Context) {
  172. s.dao.DelReplyCache(ctx, rpID)
  173. })
  174. }
  175. }
  176. func (s *Service) tranFoldReply(ctx context.Context, oid, rpID int64) (err error) {
  177. var (
  178. tx *sql.Tx
  179. rp *model.Reply
  180. )
  181. if tx, err = s.dao.BeginTran(ctx); err != nil {
  182. return
  183. }
  184. if rp, err = s.dao.TxReplyForUpdate(tx, oid, rpID); err != nil {
  185. tx.Rollback()
  186. return
  187. }
  188. if rp.DenyFolded() {
  189. tx.Rollback()
  190. return ecode.ReplyForbidFolded
  191. }
  192. if _, err = s.dao.TxUpdateReplyState(tx, oid, rpID, model.StateFolded, time.Now()); err != nil {
  193. tx.Rollback()
  194. return
  195. }
  196. return tx.Commit()
  197. }
  198. func (s *Service) tranMarkReplyHasFolded(ctx context.Context, oid, rpID int64) (err error) {
  199. var (
  200. tx *sql.Tx
  201. rp *model.Reply
  202. )
  203. if tx, err = s.dao.BeginTran(ctx); err != nil {
  204. return
  205. }
  206. if rp, err = s.dao.TxReplyForUpdate(tx, oid, rpID); err != nil {
  207. tx.Rollback()
  208. return
  209. }
  210. rp.MarkHasFolded()
  211. if _, err = s.dao.TxUpReplyAttr(tx, oid, rpID, rp.Attr, time.Now()); err != nil {
  212. tx.Rollback()
  213. return
  214. }
  215. return tx.Commit()
  216. }
  217. func (s *Service) tranMarkSubHasFolded(ctx context.Context, oid int64, tp int32) (err error) {
  218. var (
  219. tx *sql.Tx
  220. sub *model.Subject
  221. )
  222. if tx, err = s.dao.BeginTran(ctx); err != nil {
  223. return
  224. }
  225. if sub, err = s.dao.TxSubjectForUpdate(tx, oid, tp); err != nil {
  226. tx.Rollback()
  227. return
  228. }
  229. sub.MarkHasFolded()
  230. if _, err = s.dao.TxUpSubAttr(tx, oid, tp, sub.Attr, time.Now()); err != nil {
  231. tx.Rollback()
  232. return
  233. }
  234. return tx.Commit()
  235. }
  236. // handleFolded 处理折叠评论的逻辑,包括折叠评论被删除等, 状态改变之后标记改变的问题...
  237. func (s *Service) handleFolded(ctx context.Context, rp *model.Reply) {
  238. sub, root, err := s.handleHasFoldedMark(ctx, rp.Oid, rp.Type, rp.Root)
  239. if err != nil {
  240. return
  241. }
  242. if sub != nil {
  243. s.dao.DelSubjectCache(ctx, sub.Oid, sub.Type)
  244. }
  245. if root != nil {
  246. s.dao.DelReplyCache(ctx, root.ID)
  247. }
  248. s.remFoldedCache(ctx, rp)
  249. }
  250. func (s *Service) handleHasFoldedMark(ctx context.Context, oid int64, tp int32, root int64) (sub *model.Subject, reply *model.Reply, err error) {
  251. var (
  252. tx *sql.Tx
  253. count int
  254. )
  255. if tx, err = s.dao.BeginTran(ctx); err != nil {
  256. return
  257. }
  258. // 锁subject表
  259. if sub, err = s.dao.TxSubjectForUpdate(tx, oid, tp); err != nil {
  260. tx.Rollback()
  261. return
  262. }
  263. if count, err = s.dao.TxCountFoldedReplies(tx, oid, tp, root); err != nil || count > 0 {
  264. tx.Rollback()
  265. return
  266. }
  267. // 折叠根评论
  268. if root == 0 {
  269. if !sub.HasFolded() {
  270. tx.Rollback()
  271. return
  272. }
  273. sub.UnmarkHasFolded()
  274. if _, err = s.dao.TxUpSubAttr(tx, oid, tp, sub.Attr, time.Now()); err != nil {
  275. tx.Rollback()
  276. return
  277. }
  278. } else {
  279. if reply, err = s.dao.TxReplyForUpdate(tx, oid, root); err != nil {
  280. tx.Rollback()
  281. return
  282. }
  283. if !reply.HasFolded() {
  284. tx.Rollback()
  285. return
  286. }
  287. reply.UnmarkHasFolded()
  288. if _, err = s.dao.TxUpReplyAttr(tx, oid, root, reply.Attr, time.Now()); err != nil {
  289. tx.Rollback()
  290. return
  291. }
  292. }
  293. if err = tx.Commit(); err != nil {
  294. return
  295. }
  296. return
  297. }
  298. // remFoldedCache ...
  299. func (s *Service) remFoldedCache(ctx context.Context, rp *model.Reply) {
  300. if rp.IsRoot() {
  301. s.dao.RemFolder(ctx, model.FolderKindSub, rp.Oid, rp.ID)
  302. } else {
  303. s.dao.RemFolder(ctx, model.FolderKindRoot, rp.Root, rp.ID)
  304. }
  305. }