reply.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "time"
  7. "go-common/app/job/main/credit/model"
  8. "go-common/app/service/main/archive/model/archive"
  9. "go-common/library/log"
  10. )
  11. func (s *Service) replyAllConsumer() {
  12. defer s.wg.Done()
  13. var (
  14. msgs = s.replyAllSub.Messages()
  15. err error
  16. c = context.TODO()
  17. )
  18. for {
  19. msg, ok := <-msgs
  20. if !ok {
  21. log.Error("s.replyAllSub.Message closed")
  22. return
  23. }
  24. msg.Commit()
  25. m := &model.Reply{}
  26. if err = json.Unmarshal(msg.Value, m); err != nil {
  27. log.Error("json.Unmarshal(%v) error(%v)", string(msg.Value), err)
  28. continue
  29. }
  30. switch m.Action {
  31. case model.RouteReplyReport:
  32. err = s.addReplyReport(c, m)
  33. default:
  34. log.Warn("replyAllConsumer unknown message action(%s)", m.Action)
  35. }
  36. if err != nil {
  37. log.Error("replyMessage key(%s) value(%s) partition(%d) offset(%d) commit error(%v)", msg.Key, msg.Value, msg.Partition, msg.Offset, err)
  38. continue
  39. }
  40. log.Info("replyMessage key(%s) value(%s) partition(%d) offset(%d) commit", msg.Key, msg.Value, msg.Partition, msg.Offset)
  41. }
  42. }
  43. func (s *Service) addReplyReport(c context.Context, m *model.Reply) (err error) {
  44. if m.Reply == nil || m.Subject == nil || m.Report == nil {
  45. log.Warn("reply content(%+v) empty!", m)
  46. return
  47. }
  48. if m.Report.State == model.ReportStateAddJuge {
  49. log.Warn("rpid(%d) state(%d) is in juge", m.Report.RPID, m.Report.State)
  50. return
  51. }
  52. if m.Report.Type != model.SubTypeArchive {
  53. log.Warn("m.Report.Type(%d) model.SubTypeArchive(%d)", m.Report.Type, model.SubTypeArchive)
  54. return
  55. }
  56. var ca *model.AutoCaseConf
  57. if ca, err = s.dao.AutoCaseConf(c, model.OriginReply); err != nil {
  58. log.Error("s.dao.AutoCaseConf(%d) error(%v)", model.OriginReply, err)
  59. return
  60. }
  61. if ca == nil {
  62. log.Warn("otype(%d) auto acse conf is not set !", model.OriginReply)
  63. return
  64. }
  65. replyReportReason := model.BlockedReasonTypeByReply(m.Report.Reason)
  66. if _, ok := ca.Reasons[replyReportReason]; !ok {
  67. log.Warn("m.Report.Reason(%d) not int ca.Reasons(%+v)", m.Report.Reason, ca.Reasons)
  68. return
  69. }
  70. if m.Report.Score < ca.ReportScore {
  71. log.Warn("m.Report.Score(%d) ca.ReportScore(%d)", m.Report.Score, ca.ReportScore)
  72. return
  73. }
  74. if m.Reply.Like > int64(ca.Likes) {
  75. log.Warn("m.Reply.Like(%d) ca.Likes(%d)", m.Reply.Like, ca.Likes)
  76. return
  77. }
  78. var disCount, total int64
  79. if disCount, err = s.dao.CountCaseMID(c, m.Reply.MID, model.OriginReply); err != nil {
  80. log.Error("s.dao.CountBlocked(%d) err(%v)", m.Reply.MID, err)
  81. return
  82. }
  83. if disCount > 0 {
  84. log.Warn("rpid(%d) mid(%d) report in 24 hours", m.Report.RPID, m.Reply.MID)
  85. return
  86. }
  87. if total, err = s.dao.CountBlocked(c, m.Reply.MID, time.Now().AddDate(-1, 0, 0)); err != nil {
  88. log.Error("s.dao.CountBlocked(%d) err(%v)", m.Reply.MID, err)
  89. return
  90. }
  91. punishResult, blockedDay := model.PunishResultDays(total)
  92. mc := &model.Case{
  93. Mid: m.Reply.MID,
  94. Status: model.CaseStatusGrantStop,
  95. RelationID: fmt.Sprintf("%d-%d-%d", m.Report.RPID, m.Report.Type, m.Report.OID),
  96. PunishResult: punishResult,
  97. BlockedDay: blockedDay,
  98. OPID: model.AutoOPID,
  99. BCtime: m.Reply.CTime,
  100. }
  101. mc.Origin = model.Origin{
  102. OriginTitle: s.replyOriginTitle(c, m.Report.OID, m.Report.Type),
  103. OriginContent: m.Reply.Content.Message,
  104. OriginType: int64(model.OriginReply),
  105. OriginURL: s.replyOriginURL(m.Report.RPID, m.Report.OID, m.Report.Type),
  106. ReasonType: int64(replyReportReason),
  107. }
  108. var count int64
  109. if count, err = s.dao.CaseRelationIDCount(c, model.OriginReply, mc.RelationID); err != nil {
  110. log.Error("ss.dao.CaseRelationIDCount(%d,%s) err(%v)", model.OriginReply, mc.RelationID, err)
  111. return
  112. }
  113. if count > 0 {
  114. log.Warn("rpid(%d) state(%d) is alreadly juge", m.Report.RPID, m.Report.State)
  115. return
  116. }
  117. var need bool
  118. if need, err = s.dao.CheckFilter(c, "credit", m.Reply.Content.Message, ""); err != nil {
  119. log.Error("s.dao.CheckFilter(%s,%s) error(%v)", "credit", m.Reply.Content.Message, err)
  120. return
  121. }
  122. if need {
  123. log.Warn("reply(%d) message(%s) is filter", m.Report.RPID, m.Reply.Content.Message)
  124. return
  125. }
  126. if err = s.dao.AddBlockedCase(c, mc); err != nil {
  127. log.Error("s.dao.AddBlockedCase(%+v) err(%v)", mc, err)
  128. return
  129. }
  130. s.dao.UpReplyState(c, m.Report.OID, m.Report.RPID, m.Report.Type, model.ReportStateAddJuge)
  131. s.dao.UpAppealState(c, model.AppealBusinessID, m.Report.OID, m.Report.RPID)
  132. return
  133. }
  134. func (s *Service) replyOriginTitle(c context.Context, oid int64, oType int8) (title string) {
  135. switch oType {
  136. case model.SubTypeArchive:
  137. arg := &archive.ArgAid2{Aid: oid}
  138. arc, err := s.arcRPC.Archive3(c, arg)
  139. if err != nil {
  140. log.Error("s.arcRPC.Archive3(%v) error(%v)", arg, err)
  141. return
  142. }
  143. title = arc.Title
  144. return
  145. }
  146. return
  147. }
  148. func (s *Service) replyOriginURL(rpid, oid int64, oType int8) string {
  149. switch oType {
  150. case model.SubTypeArchive:
  151. return fmt.Sprintf(model.ReplyOriginURL, oid, rpid)
  152. }
  153. return ""
  154. }
  155. // RegReply regist reply subject.
  156. func (s *Service) RegReply(c context.Context, table string, nwMsg []byte, oldMsg []byte) (err error) {
  157. var (
  158. replyType int8
  159. replyID int64
  160. )
  161. switch table {
  162. case _blockedCaseTable:
  163. mr := &model.Case{}
  164. if err = json.Unmarshal(nwMsg, mr); err != nil {
  165. log.Error("json.Unmarshal(%s) error(%v)", string(nwMsg), err)
  166. return
  167. }
  168. replyType = model.ReplyCase
  169. replyID = mr.ID
  170. case _blockedInfoTable:
  171. mr := &model.BlockedInfo{}
  172. if err = json.Unmarshal(nwMsg, mr); err != nil {
  173. log.Error("json.Unmarshal(%s) error(%v)", string(nwMsg), err)
  174. return
  175. }
  176. replyType = model.ReplyBlocked
  177. replyID = mr.ID
  178. case _blockedPublishTable:
  179. mr := &model.Publish{}
  180. if err = json.Unmarshal(nwMsg, mr); err != nil {
  181. log.Error("json.Unmarshal(%s) error(%v)", string(nwMsg), err)
  182. return
  183. }
  184. if mr.PStatus != model.PublishOpen {
  185. return
  186. }
  187. replyType = model.ReplyPublish
  188. replyID = mr.ID
  189. }
  190. return s.dao.RegReply(c, replyID, replyType)
  191. }