index.go 11 KB


  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "sort"
  6. "time"
  7. model "go-common/app/job/main/reply/model/reply"
  8. "go-common/library/log"
  9. )
  10. const (
  11. _replySliceNum = 20000
  12. )
  13. func dialogMapByRoot(rootID int64, rps []*model.RpItem, oid int64, tp int8) (dialogMap map[int64][]*model.RpItem) {
  14. length := len(rps)
  15. dialogMap = make(map[int64][]*model.RpItem)
  16. // 根评论下没有评论
  17. if length == 0 {
  18. return
  19. }
  20. // 这里由于种种原因, 可能子评论的ID比父评论大,故按Floor排序
  21. // 按Floor严格排序,保证父评论也几乎是升序排列,提高后续的命中率
  22. sort.Slice(rps, func(i, j int) bool {
  23. return rps[i].Floor < rps[j].Floor
  24. })
  25. for i := 0; i < length; i++ {
  26. if rps[i] == nil {
  27. return
  28. }
  29. if rps[i].Parent == rootID {
  30. // 对话根评论
  31. continue
  32. }
  33. // i-1 must >= 0, 因为第一个元素必然是对话根评论which parrent=root, 出现这种情况说明数据是脏的
  34. if i-1 < 0 {
  35. log.Error("invalid data, rootID(%d), rpID(%d), parrent(%d) oid(%d) type(%d)", rootID, rps[i].ID, rps[i].Parent, oid, tp)
  36. return
  37. }
  38. if rps[i].Parent == rps[i-1].Parent {
  39. // 和上一条评论回复同一条评论的情况,按ID排序,所以这种情况的概率会很高
  40. rps[i].Next = rps[i-1].Next
  41. } else {
  42. var j int
  43. if sort.IsSorted(model.RpItems(rps)) {
  44. // 和上一条评论回复不同评论的情况, 其父评论一定在它之前
  45. j = sort.Search(i, func(n int) bool {
  46. return rps[n].ID >= rps[i].Parent
  47. })
  48. } else {
  49. for index := range rps[:i] {
  50. if rps[index].ID == rps[i].Parent {
  51. j = index
  52. break
  53. }
  54. }
  55. }
  56. // search 如果返回j==i说明没搜索,或者遍历到了最后一个, 这种情况说明数据是脏的
  57. if j == i {
  58. log.Error("invalid data, rootID(%d), rpID(%d), parrent(%d) oid(%d) type(%d)", rootID, rps[i].ID, rps[i].Parent, oid, tp)
  59. return nil
  60. }
  61. rps[i].Next = rps[j]
  62. }
  63. }
  64. tmp := new(struct {
  65. ID int64
  66. DiaglogID int64
  67. })
  68. for i := 0; i < length; i++ {
  69. if rps[i] == nil {
  70. return
  71. }
  72. next := rps[i].Next
  73. if next == nil {
  74. // 如果是对话根评论
  75. dialogMap[rps[i].ID] = append(dialogMap[rps[i].ID], rps[i])
  76. } else if next.ID == tmp.ID {
  77. // 这里tmp缓存了上一个评论的父评论, 减少查找的次数
  78. // 如果跟上一条评论评论的是同一条评论,则可以直接加进上一个dialog
  79. dialogMap[tmp.DiaglogID] = append(dialogMap[tmp.DiaglogID], rps[i])
  80. } else {
  81. depth := 0
  82. for next.Next != nil {
  83. next = next.Next
  84. depth++
  85. if depth > 10000 {
  86. for i := range rps {
  87. log.Error("rp: %v", rps[i])
  88. }
  89. log.Error("recursive reach max depth")
  90. return nil
  91. }
  92. }
  93. }
  94. }
  95. return
  96. }
  97. func (s *Service) setDialogByRoot(c context.Context, oid int64, tp int8, rootID int64) (err error) {
  98. // 循环获取某个根评论下的所有子评论
  99. rps, err := s.dao.Reply.FixDialogGetRepliesByRoot(c, oid, tp, rootID)
  100. if err != nil {
  101. log.Error("fix dialog error (%v)", err)
  102. return
  103. }
  104. //根据所有子评论构造 key为二级父评论(即对话根评论), value为二级父评论下的所有子评论的map
  105. dialogMap := dialogMapByRoot(rootID, rps, oid, tp)
  106. for k, v := range dialogMap {
  107. ids := make([]int64, len(v))
  108. for i := range v {
  109. ids[i] = v[i].ID
  110. }
  111. s.dao.Reply.FixDialogSetDialogBatch(c, oid, k, ids)
  112. }
  113. return
  114. }
  115. // actionRecoverFixDialog fix dialog
  116. func (s *Service) actionRecoverFixDialog(c context.Context, msg *consumerMsg) {
  117. var (
  118. err error
  119. )
  120. var d struct {
  121. Oid int64 `json:"oid"`
  122. Tp int8 `json:"tp"`
  123. Root int64 `json:"root"`
  124. }
  125. if err = json.Unmarshal([]byte(msg.Data), &d); err != nil {
  126. log.Error("json.Unmarshal() error(%v)", err)
  127. return
  128. }
  129. s.setDialogByRoot(c, d.Oid, d.Tp, d.Root)
  130. }
  131. func (s *Service) actionRecoverDialog(c context.Context, msg *consumerMsg) {
  132. var (
  133. ok bool
  134. err error
  135. )
  136. var d struct {
  137. Oid int64 `json:"oid"`
  138. Tp int8 `json:"tp"`
  139. Root int64 `json:"root"`
  140. Dialog int64 `json:"dialog"`
  141. }
  142. if err = json.Unmarshal([]byte(msg.Data), &d); err != nil {
  143. log.Error("json.Unmarshal() error(%v)", err)
  144. return
  145. }
  146. if ok, err = s.dao.Redis.ExpireDialogIndex(c, d.Dialog); err == nil && !ok {
  147. rps, err := s.dao.Reply.GetByDialog(c, d.Oid, d.Tp, d.Root, d.Dialog)
  148. if err != nil {
  149. return
  150. }
  151. err = s.dao.Redis.AddDialogIndex(c, d.Dialog, rps)
  152. if err != nil {
  153. log.Error("s.dao.Redis.AddDialogIndex() error (%v)", err)
  154. return
  155. }
  156. }
  157. }
  158. func (s *Service) acionRecoverFloorIdx(c context.Context, msg *consumerMsg) {
  159. var (
  160. err error
  161. rCount int
  162. limit int
  163. ok bool
  164. sub *model.Subject
  165. )
  166. var d struct {
  167. Oid int64 `json:"oid"`
  168. Tp int8 `json:"tp"`
  169. Count int `json:"count"`
  170. Floor int `json:"floor"`
  171. }
  172. if err = json.Unmarshal([]byte(msg.Data), &d); err != nil {
  173. log.Error("json.Unmarshal() error(%v)", err)
  174. return
  175. }
  176. sub, err = s.getSubject(c, d.Oid, d.Tp)
  177. if err != nil || sub == nil {
  178. log.Error("s.getSubject(%d,%d) failed!err:=%v", d.Oid, d.Tp, err)
  179. return
  180. }
  181. if sub.RCount == 0 {
  182. return
  183. }
  184. startFloor := sub.Count + 1
  185. if ok, err = s.dao.Redis.ExpireIndex(c, d.Oid, d.Tp, model.SortByFloor); err == nil && ok {
  186. startFloor, err = s.dao.Redis.MinScore(c, d.Oid, d.Tp, model.SortByFloor)
  187. if err != nil {
  188. log.Error("s.dao.Redis.MinScore(%d,%d) failed!err:=%v", d.Oid, d.Tp, err)
  189. return
  190. }
  191. if startFloor <= 1 {
  192. if startFloor != -1 {
  193. if err = s.dao.Redis.AddFloorIndexEnd(c, d.Oid, d.Tp); err != nil {
  194. log.Error("s.dao.Redis.AddFloorIndexEnd(%d, %d) error(%v)", d.Oid, d.Tp, err)
  195. }
  196. }
  197. return
  198. }
  199. if d.Count > 0 {
  200. rCount, err = s.dao.Redis.CountReplies(c, d.Oid, d.Tp, model.SortByFloor)
  201. if err != nil {
  202. log.Error("s.dao.Redis.CountReplies(%d,%d) failed!err:=%v", d.Oid, d.Tp, err)
  203. return
  204. }
  205. }
  206. }
  207. if d.Count > 0 {
  208. limit = d.Count - rCount
  209. } else if d.Floor > 0 {
  210. limit = startFloor - d.Floor
  211. } else {
  212. log.Warn("RecoverFloorByCount(%d,%d) count(%d) or floor(%d) invalid!", d.Oid, d.Tp, d.Floor, d.Count)
  213. return
  214. }
  215. limit += s.batchNumber
  216. if limit < (s.batchNumber / 2) {
  217. return
  218. } else if limit < s.batchNumber {
  219. limit = s.batchNumber
  220. }
  221. rs, err := s.dao.Reply.GetByFloorLimit(c, d.Oid, d.Tp, startFloor, limit)
  222. if err != nil {
  223. log.Error("s.dao.Reply.GetByFloorLimit(%d,%d) failed!err:=%v", d.Oid, d.Tp, err)
  224. return
  225. }
  226. if err = s.dao.Redis.AddFloorIndex(c, d.Oid, d.Tp, rs...); err != nil {
  227. log.Error("s.dao.Redis.AddFloorIndex(%d, %d) error(%v)", d.Oid, d.Tp, err)
  228. }
  229. if len(rs) < limit {
  230. if err = s.dao.Redis.AddFloorIndexEnd(c, d.Oid, d.Tp); err != nil {
  231. log.Error("s.dao.Redis.AddFloorIndexEnd(%d, %d) error(%v)", d.Oid, d.Tp, err)
  232. }
  233. return
  234. }
  235. }
  236. // actionRecoverIndex recover index of archive's reply
  237. func (s *Service) actionRecoverIndex(c context.Context, msg *consumerMsg) {
  238. var (
  239. err error
  240. ok bool
  241. sub *model.Subject
  242. rs []*model.Reply
  243. )
  244. var d struct {
  245. Oid int64 `json:"oid"`
  246. Tp int8 `json:"tp"`
  247. Sort int8 `json:"sort"`
  248. }
  249. if err = json.Unmarshal([]byte(msg.Data), &d); err != nil {
  250. log.Error("json.Unmarshal() error(%v)", err)
  251. return
  252. }
  253. if d.Oid <= 0 || !model.CheckSort(d.Sort) {
  254. log.Error("The structure of doActionRecoverIndex msg.Data(%s) was wrong", msg.Data)
  255. return
  256. }
  257. if ok, err = s.dao.Redis.ExpireIndex(c, d.Oid, d.Tp, d.Sort); err == nil && !ok {
  258. sub, err = s.getSubject(c, d.Oid, d.Tp)
  259. if err != nil || sub == nil {
  260. log.Error("s.getSubject failed , oid(%d,%d) err(%v)", d.Oid, d.Tp, err)
  261. return
  262. }
  263. if d.Sort == model.SortByFloor {
  264. rs, err = s.dao.Reply.GetAllInSlice(c, d.Oid, d.Tp, sub.Count, _replySliceNum)
  265. if err != nil {
  266. log.Error("dao.Reply.GetAllInSlice(%d, %d) error(%v)", d.Oid, d.Tp, err)
  267. return
  268. }
  269. // floor index
  270. if ok, err = s.dao.Redis.ExpireIndex(c, d.Oid, d.Tp, model.SortByFloor); err == nil && !ok {
  271. if err = s.dao.Redis.AddFloorIndex(c, d.Oid, d.Tp, rs...); err != nil {
  272. log.Error("s.dao.Redis.AddFloorIndex(%d, %d) error(%v)", d.Oid, d.Tp, err)
  273. }
  274. if err = s.dao.Redis.AddFloorIndexEnd(c, d.Oid, d.Tp); err != nil {
  275. log.Error("s.dao.Redis.AddFloorIndexEnd(%d, %d) error(%v)", d.Oid, d.Tp, err)
  276. }
  277. }
  278. } else if d.Sort == model.SortByLike {
  279. rs, err = s.dao.Reply.GetByLikeLimit(c, d.Oid, d.Tp, 30000)
  280. if err != nil {
  281. log.Error("dao.Reply.GetAllInSlice(%d, %d) error(%v)", d.Oid, d.Tp, err)
  282. return
  283. }
  284. // like index
  285. if ok, err = s.dao.Redis.ExpireIndex(c, d.Oid, d.Tp, model.SortByLike); err == nil && !ok {
  286. rpts, _ := s.dao.Report.GetMapByOid(c, d.Oid, d.Tp)
  287. if err = s.dao.Redis.AddLikeIndexBatch(c, d.Oid, d.Tp, rpts, rs...); err != nil {
  288. log.Error("s.dao.Redis.AddLikeIndexBatch(%d, %d) error(%v)", d.Oid, d.Tp, err)
  289. }
  290. }
  291. } else if d.Sort == model.SortByCount {
  292. rs, err = s.dao.Reply.GetByCountLimit(c, d.Oid, d.Tp, 20000)
  293. if err != nil {
  294. log.Error("dao.Reply.GetAllInSlice(%d, %d) error(%v)", d.Oid, d.Tp, err)
  295. return
  296. }
  297. // count index
  298. if ok, err = s.dao.Redis.ExpireIndex(c, d.Oid, d.Tp, model.SortByCount); err == nil && !ok {
  299. if err = s.dao.Redis.AddCountIndexBatch(c, d.Oid, d.Tp, rs...); err != nil {
  300. log.Error("s.dao.Redis.AddCountIndex(%d, %d) error(%v)", d.Oid, d.Tp, err)
  301. }
  302. }
  303. }
  304. // 回源index时把top缓存初始化,减少attr扫表慢查询
  305. if n := sub.TopCount(); n > 0 {
  306. for _, r := range rs {
  307. if r.IsTop() {
  308. top := model.SubAttrAdminTop
  309. if r.IsUpTop() {
  310. top = model.SubAttrUpperTop
  311. }
  312. err = sub.TopSet(r.RpID, top, 1)
  313. if err == nil {
  314. _, err = s.dao.Subject.UpMeta(c, d.Oid, d.Tp, sub.Meta, time.Now())
  315. if err != nil {
  316. log.Error("s.dao.Subject.UpMeta(%d,%d,%d) failed!err:=%v ", r.RpID, r.Oid, d.Tp, err)
  317. }
  318. s.dao.Mc.AddSubject(c, sub)
  319. }
  320. // get reply with content
  321. var rp *model.Reply
  322. rp, err = s.getReply(c, d.Oid, r.RpID)
  323. if err == nil && rp != nil {
  324. s.dao.Mc.AddTop(c, rp)
  325. }
  326. n--
  327. }
  328. if n == 0 {
  329. break
  330. }
  331. }
  332. }
  333. }
  334. }
  335. // actionRecoverRootIndex recover index of root reply
  336. func (s *Service) actionRecoverRootIndex(c context.Context, msg *consumerMsg) {
  337. var (
  338. err error
  339. ok bool
  340. rs []*model.Reply
  341. )
  342. var d struct {
  343. Oid int64 `json:"oid"`
  344. Tp int8 `json:"tp"`
  345. Root int64 `json:"root"`
  346. }
  347. if err = json.Unmarshal([]byte(msg.Data), &d); err != nil {
  348. log.Error("json.Unmarshal() error(%v)", err)
  349. return
  350. }
  351. if d.Oid <= 0 || d.Root <= 0 {
  352. log.Error("The structure of doActionRecoverRootIndex msg.Data(%s) was wrong", msg.Data)
  353. return
  354. }
  355. if ok, err = s.dao.Redis.ExpireNewChildIndex(c, d.Root); err == nil && !ok {
  356. if rs, err = s.dao.Reply.GetAllByRoot(c, d.Oid, d.Root, d.Tp); err != nil {
  357. log.Error("dao.Reply.GetAllReply(%d, %d) error(%v)", d.Oid, d.Tp, err)
  358. return
  359. }
  360. if err = s.dao.Redis.AddNewChildIndex(c, d.Root, rs...); err != nil {
  361. log.Error("s.dao.Redis.AddFloorIndexByRoot(%d, %d) error(%v)", d.Oid, d.Tp, err)
  362. return
  363. }
  364. }
  365. }