reply.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "go-common/library/database/elastic"
  8. "net/url"
  9. "regexp"
  10. "strings"
  11. "time"
  12. "go-common/app/job/main/reply/conf"
  13. "go-common/app/job/main/reply/model/reply"
  14. model "go-common/app/job/main/reply/model/reply"
  15. accmdl "go-common/app/service/main/account/api"
  16. assmdl "go-common/app/service/main/assist/model/assist"
  17. relmdl "go-common/app/service/main/relation/model"
  18. xsql "go-common/library/database/sql"
  19. "go-common/library/log"
  20. xhttp "go-common/library/net/http/blademaster"
  21. )
  22. var (
  23. _atReg = regexp.MustCompile(`@([^\s^:^,^@]+)`)
  24. _topicReg = regexp.MustCompile(`#([^\n^@^#^\x{1F000}-\x{1F02F}^\x{1F0A0}-\x{1F0FF}^\x{1F100}-\x{1F64F}^\x{1F680}-\x{1F6FF}^\x{1F910}-\x{1F96B}^\x{1F980}-\x{1F9E0}]{1,32})#`)
  25. _urlReg = regexp.MustCompile(`(((http:\/\/|https:\/\/)[a-z0-9A-Z]+\.(bilibili|biligame)\.com[a-z0-9A-Z\/\.\$\*\?~=#!%@&-]*)|((http:\/\/|https:\/\/)(acg|b23)\.tv[a-z0-9A-Z\/\.\$\*\?~=#!@&]*))`)
  26. _avReg = regexp.MustCompile(`#(cv\d+)|#(av\d+)|#(vc\d+)`)
  27. searchHTTPClient *xhttp.Client
  28. errReplyContentNotFound = errors.New("reply content not found")
  29. )
  30. const (
  31. _appIDReply = "reply"
  32. _appIDReport = "replyreport"
  33. timeFormat = "2006-01-02 15:03:04"
  34. // event
  35. _eventReply = "reply"
  36. _eventHate = "hate"
  37. _eventLike = "like"
  38. _eventLikeCancel = "like_cancel"
  39. _eventHateCancel = "hate_cancel"
  40. )
  41. func (s *Service) beginTran(c context.Context) (*xsql.Tx, error) {
  42. return s.dao.BeginTran(c)
  43. }
  44. func (s *Service) actionAdd(c context.Context, msg *consumerMsg) {
  45. var rp *model.Reply
  46. if err := json.Unmarshal([]byte(msg.Data), &rp); err != nil {
  47. log.Error("json.Unmarshal() error(%v)", err)
  48. return
  49. }
  50. if rp.RpID == 0 || rp.Oid == 0 || rp.Content == nil {
  51. log.Error("The structure of reply(%s) from rpCh was wrong", msg.Data)
  52. return
  53. }
  54. if rp.Root == 0 && rp.Parent == 0 {
  55. s.addReply(c, rp)
  56. } else {
  57. s.addReplyReply(c, rp)
  58. }
  59. }
  60. func (s *Service) tranAdd(c context.Context, rp *model.Reply, is bool) (err error) {
  61. tx, err := s.beginTran(c)
  62. if err != nil {
  63. log.Error("reply(%s) beginTran error(%v)", rp, err)
  64. return
  65. }
  66. var rows int64
  67. defer func() {
  68. if err == nil && rows == 0 {
  69. err = errors.New("sql: transaction add reply failed")
  70. }
  71. }()
  72. if is {
  73. if rp.IsNormal() {
  74. rows, err = s.dao.Subject.TxIncrCount(tx, rp.Oid, rp.Type, rp.CTime.Time())
  75. if err != nil || rows == 0 {
  76. tx.Rollback()
  77. log.Error("dao.Subject.TxIncrCount(%v) error(%v) or rows==0", rp, err)
  78. return
  79. }
  80. } else {
  81. rows, err = s.dao.Subject.TxIncrFCount(tx, rp.Oid, rp.Type, rp.CTime.Time())
  82. if err != nil || rows == 0 {
  83. tx.Rollback()
  84. log.Error("dao.Subject.TxIncrCount(%v) error(%v) or rows==0", rp, err)
  85. return
  86. }
  87. }
  88. } else {
  89. var rootReply *model.Reply
  90. if rootReply, err = s.dao.Reply.GetForUpdate(tx, rp.Oid, rp.Root); err != nil {
  91. tx.Rollback()
  92. return err
  93. }
  94. if rootReply.IsDeleted() {
  95. return fmt.Errorf("the root reply is deleted(%d,%d,%d)", rp.Oid, rp.Type, rp.Root)
  96. }
  97. if rp.IsNormal() {
  98. rows, err = s.dao.Reply.TxIncrCount(tx, rp.Oid, rp.Root, rp.CTime.Time())
  99. if err != nil || rows == 0 {
  100. tx.Rollback()
  101. log.Error("dao.Reply.TxIncrCount(%v) error(%v) or rows==0", rp, err)
  102. return
  103. }
  104. rows, err = s.dao.Subject.TxIncrACount(tx, rp.Oid, rp.Type, 1, rp.CTime.Time())
  105. if err != nil || rows == 0 {
  106. tx.Rollback()
  107. log.Error("dao.Subject.TxIncrACount(%v) error(%v) or rows==0", rp, err)
  108. return
  109. }
  110. } else {
  111. rows, err = s.dao.Reply.TxIncrFCount(tx, rp.Oid, rp.Root, rp.CTime.Time())
  112. if err != nil || rows == 0 {
  113. tx.Rollback()
  114. log.Error("dao.Reply.TxIncrCount(%v) error(%v) or rows==0", rp, err)
  115. return
  116. }
  117. }
  118. }
  119. if rp.State == model.ReplyStateAudit || rp.State == model.ReplyStateMonitor {
  120. if rows, err = s.dao.Subject.TxIncrMCount(tx, rp.Oid, rp.Type, rp.CTime.Time()); err != nil || rows == 0 {
  121. tx.Rollback()
  122. log.Error("dao.Subject.TxIncrMCount(%v) error(%v) or rows==0", rp, err)
  123. return
  124. }
  125. }
  126. rows, err = s.dao.Content.TxInsert(tx, rp.Oid, rp.Content)
  127. if err != nil || rows == 0 {
  128. tx.Rollback()
  129. log.Error("dao.Content.TxInContent(%v) error(%v) or rows==0", rp, err)
  130. return
  131. }
  132. rows, err = s.dao.Reply.TxInsert(tx, rp)
  133. if err != nil || rows == 0 {
  134. tx.Rollback()
  135. log.Error("dao.Reply.TxInReply(%v) error(%v) or rows==0", rp, err)
  136. return
  137. }
  138. return tx.Commit()
  139. }
  140. func (s *Service) regTopic(c context.Context, msg string) (topics []string) {
  141. msg = _urlReg.ReplaceAllString(msg, "")
  142. msg = _avReg.ReplaceAllString(msg, "#")
  143. ss := _topicReg.FindAllStringSubmatch(msg, -1)
  144. if len(ss) == 0 {
  145. return
  146. }
  147. for _, nns := range ss {
  148. if len(nns) == 2 {
  149. topic := strings.TrimSpace(nns[1])
  150. if len(topic) > 0 {
  151. topics = append(topics, topic)
  152. }
  153. }
  154. if len(topics) >= 5 {
  155. break
  156. }
  157. }
  158. return
  159. }
  160. func (s *Service) regAt(c context.Context, msg string, over, self int64) (ats []int64) {
  161. var err error
  162. ss := _atReg.FindAllStringSubmatch(msg, 10)
  163. if len(ss) == 0 {
  164. return
  165. }
  166. names := make([]string, 0, len(ss))
  167. for _, nns := range ss {
  168. if len(nns) == 2 {
  169. names = append(names, nns[1])
  170. }
  171. }
  172. if len(names) == 0 {
  173. return
  174. }
  175. us, err := s.accSrv.InfosByName3(c, &accmdl.NamesReq{Names: names})
  176. if err != nil {
  177. log.Error("s.accSrv.InfosByName2 failed, err(%v)", err)
  178. return
  179. }
  180. ats = make([]int64, 0, len(us.Infos))
  181. for mid := range us.Infos {
  182. if mid != over && mid != self {
  183. ats = append(ats, mid)
  184. }
  185. }
  186. if len(ats) == 0 {
  187. return
  188. }
  189. ats = s.getFilterBlacklist(c, self, ats)
  190. return
  191. }
  192. func (s *Service) addReply(c context.Context, rp *model.Reply) {
  193. var (
  194. err error
  195. ok bool
  196. )
  197. sub, err := s.getSubject(c, rp.Oid, rp.Type)
  198. if err != nil || sub == nil {
  199. log.Error("s.getSubject failed , oid(%d,%d) err(%v)", rp.Oid, rp.Type, err)
  200. return
  201. }
  202. if sub == nil {
  203. log.Error("get subject is nil oid(%d) type(%d)", rp.Oid, rp.Type)
  204. return
  205. }
  206. // init some field
  207. if rp.IsNormal() {
  208. sub.RCount = sub.RCount + 1
  209. sub.ACount = sub.ACount + 1
  210. }
  211. sub.Count = sub.Count + 1
  212. rp.Floor = sub.Count
  213. rp.MTime = rp.CTime
  214. rp.Content.RpID = rp.RpID
  215. rp.Content.CTime = rp.CTime
  216. rp.Content.MTime = rp.MTime
  217. if len(rp.Content.Ats) == 0 {
  218. rp.Content.Ats = s.regAt(c, rp.Content.Message, 0, rp.Mid)
  219. }
  220. rp.Content.Topics = s.regTopic(c, rp.Content.Message)
  221. // begin transaction
  222. if err = s.tranAdd(c, rp, true); err != nil {
  223. log.Error("Transaction add reply(%v) error(%v)", rp, err)
  224. return
  225. }
  226. // add cache
  227. if err = s.dao.Mc.AddSubject(c, sub); err != nil {
  228. log.Error("s.dao.Mc.AddSubject failed , oid(%d) err(%v)", sub.Oid, err)
  229. }
  230. if err = s.dao.Mc.AddReply(c, rp); err != nil {
  231. log.Error("s.dao.Mc.AddReply failed , RpID(%d) err(%v)", rp.RpID, err)
  232. }
  233. if rp.IsNormal() {
  234. // update reply count
  235. s.upAcount(c, sub.Oid, sub.Type, sub.ACount, rp.CTime.Time())
  236. // add index cache
  237. if ok, err = s.dao.Redis.ExpireIndex(c, sub.Oid, sub.Type, model.SortByFloor); err == nil && ok {
  238. if err = s.dao.Redis.AddFloorIndex(c, sub.Oid, sub.Type, rp); err != nil {
  239. log.Error("s.dao.Redis.AddFloorIndex failed , oid(%d) type(%d) err(%v)", sub.Oid, sub.Type, err)
  240. }
  241. }
  242. if ok, err = s.dao.Redis.ExpireIndex(c, sub.Oid, sub.Type, model.SortByCount); err == nil && ok {
  243. if err = s.dao.Redis.AddCountIndex(c, sub.Oid, sub.Type, rp); err != nil {
  244. log.Error("s.dao.Redis.AddCountIndex failed , oid(%d) type(%d) err(%v)", sub.Oid, sub.Type, err)
  245. }
  246. }
  247. if ok, err = s.dao.Redis.ExpireIndex(c, sub.Oid, sub.Type, model.SortByLike); err == nil && ok {
  248. rpts := make(map[int64]*reply.Report, 1)
  249. if rpt, _ := s.dao.Report.Get(c, rp.Oid, rp.RpID); rpt != nil {
  250. rpts[rp.RpID] = rpt
  251. }
  252. if err = s.dao.Redis.AddLikeIndex(c, sub.Oid, sub.Type, rpts, rp); err != nil {
  253. log.Error("s.dao.Redis.AddLikeIndex failed , oid(%d) type(%d) err(%v)", sub.Oid, sub.Type, err)
  254. }
  255. }
  256. s.notifyReply(c, sub, rp)
  257. } else if rp.State == model.ReplyStateAudit {
  258. if err = s.dao.Redis.AddAuditIndex(c, rp); err != nil {
  259. log.Error("s.dao.Redis.AddAUditIndex(%d,%d,%d) error(%v)", rp.Oid, rp.RpID, rp.Type, err)
  260. }
  261. }
  262. if err = s.dao.PubEvent(c, _eventReply, rp.Mid, sub, rp, nil); err != nil {
  263. return
  264. }
  265. }
  266. func (s *Service) addReplyReply(c context.Context, rp *model.Reply) {
  267. var (
  268. err error
  269. ok bool
  270. )
  271. sub, err := s.getSubject(c, rp.Oid, rp.Type)
  272. if err != nil || sub == nil {
  273. log.Error("s.getSubject failed , oid(%d,%d) err(%v)", rp.Oid, rp.Type, err)
  274. return
  275. }
  276. // NOTE:depend on db,do not get from cache
  277. rootRp, err := s.getReply(c, rp.Oid, rp.Root)
  278. if err != nil {
  279. log.Error("s.getReply failed , oid(%d), root(%d) err(%v)", rp.Oid, rp.Root, err)
  280. return
  281. }
  282. if rootRp == nil {
  283. log.Error("get reply is nil oid(%d) type(%d) rpid(%d)", rp.Oid, rp.Type, rp.Root)
  284. return
  285. }
  286. var parentRp *model.Reply
  287. if rp.Root != rp.Parent {
  288. parentRp, err = s.getReply(c, rp.Oid, rp.Parent)
  289. if err != nil {
  290. log.Error("s.getReply failed , oid(%d), parent(%d) err(%v)", rp.Oid, rp.Parent, err)
  291. return
  292. }
  293. if parentRp == nil {
  294. log.Error("get reply is nil oid(%d) type(%d) rpid(%d)", rp.Oid, rp.Type, rp.Parent)
  295. return
  296. }
  297. if parentRp.Dialog == 0 {
  298. log.Warn("Dialog Need Migration oid(%d) type(%d) rootID(%d)", rp.Oid, rp.Type, rootRp.RpID)
  299. // s.setDialogByRoot(context.Background(), rp.Oid, rp.Type, rp.Root)
  300. }
  301. rp.Dialog = parentRp.Dialog
  302. } else {
  303. parentRp = rootRp
  304. if rp.Dialog != rp.RpID {
  305. rp.Dialog = rp.RpID
  306. }
  307. }
  308. // init some field
  309. if rp.IsNormal() {
  310. sub.ACount = sub.ACount + 1
  311. rootRp.RCount = rootRp.RCount + 1
  312. }
  313. rootRp.Count = rootRp.Count + 1
  314. rootRp.MTime = rp.CTime
  315. rp.Floor = rootRp.Count
  316. rp.MTime = rp.CTime
  317. rp.Content.RpID = rp.RpID
  318. rp.Content.CTime = rp.CTime
  319. rp.Content.MTime = rp.MTime
  320. if len(rp.Content.Ats) == 0 {
  321. rp.Content.Ats = s.regAt(c, rp.Content.Message, 0, rp.Mid)
  322. }
  323. rp.Content.Topics = s.regTopic(c, rp.Content.Message)
  324. // begin transaction
  325. if err = s.tranAdd(c, rp, false); err != nil {
  326. log.Error("Transaction add reply(%v) error(%v)", rp, err)
  327. return
  328. }
  329. // add cache
  330. if err = s.dao.Mc.AddSubject(c, sub); err != nil {
  331. log.Error("s.dao.Mc.AddSubject failed , oid(%d), err(%v)", sub.Oid, err)
  332. }
  333. if err = s.dao.Mc.AddReply(c, rp); err != nil {
  334. log.Error("s.dao.Mc.AddReply failed , RpID(%d), err(%v)", rp.RpID, err)
  335. }
  336. if err = s.dao.Mc.AddReply(c, rootRp); err != nil {
  337. log.Error("s.dao.Mc.AddReply failed , RpID(%d), err(%v)", rootRp.RpID, err)
  338. }
  339. if rootRp.IsTop() {
  340. if err = s.dao.Mc.AddTop(c, rootRp); err != nil {
  341. log.Error("s.dao.Mc.AddReply failed , RpID(%d), err(%v)", rootRp.RpID, err)
  342. }
  343. } else if rootRp.IsNormal() {
  344. if ok, err = s.dao.Redis.ExpireIndex(c, rp.Oid, rp.Type, model.SortByCount); err == nil && ok {
  345. s.dao.Redis.AddCountIndex(c, rp.Oid, rp.Type, rootRp)
  346. }
  347. }
  348. if rp.IsNormal() {
  349. // update reply count
  350. s.upAcount(c, sub.Oid, sub.Type, sub.ACount, rp.CTime.Time())
  351. // add index cache
  352. if ok, err = s.dao.Redis.ExpireNewChildIndex(c, rootRp.RpID); err == nil && ok {
  353. if err = s.dao.Redis.AddNewChildIndex(c, rootRp.RpID, rp); err != nil {
  354. log.Error("s.dao.Redis.AddFloorIndexByRoot failed , RpID(%d), err(%v)", rootRp.RpID, err)
  355. }
  356. }
  357. // add dialog cache
  358. if rp.Dialog != 0 {
  359. if ok, err = s.dao.Redis.ExpireDialogIndex(c, rp.Dialog); err == nil && ok {
  360. rps := []*model.Reply{rp}
  361. if err = s.dao.Redis.AddDialogIndex(c, rp.Dialog, rps); err != nil {
  362. log.Error("s.dao.Redis.AddDialogIndex failed , RpID(%d), Dialog(%d), Floor(%d) err(%v)", rp.RpID, rp.Dialog, rp.Floor, err)
  363. }
  364. }
  365. }
  366. s.notifyReplyReply(c, sub, rootRp, parentRp, rp)
  367. } else if rp.State == model.ReplyStateAudit {
  368. if err = s.dao.Redis.AddAuditIndex(c, rp); err != nil {
  369. log.Error("s.dao.Redis.AddAUditIndex(%d,%d,%d) error(%v)", rp.Oid, rp.RpID, rp.Type, err)
  370. }
  371. }
  372. if err = s.dao.PubEvent(c, _eventReply, rp.Mid, sub, rp, nil); err != nil {
  373. return
  374. }
  375. }
  376. func (s *Service) addTopCache(c context.Context, msg *consumerMsg) {
  377. var (
  378. err error
  379. sub *model.Subject
  380. rp *model.Reply
  381. )
  382. var d struct {
  383. Oid int64 `json:"oid"`
  384. Tp int8 `json:"tp"`
  385. Top uint32 `json:"top"`
  386. }
  387. if err = json.Unmarshal([]byte(msg.Data), &d); err != nil {
  388. log.Error("json.Unmarshal() error(%v)", err)
  389. return
  390. }
  391. if rp, err = s.dao.Mc.GetTop(c, d.Oid, d.Tp, d.Top); err != nil {
  392. log.Error("s.dao.Mc.GetTop(oid %v,top %v) err(%v)", d.Oid, d.Top, err)
  393. return
  394. } else if rp == nil {
  395. if rp, err = s.dao.Reply.GetTop(c, d.Oid, d.Tp, d.Top); err != nil || rp == nil {
  396. log.Error("s.dao.Reply.GetTop(%d, %d) error(%v)", d.Oid, d.Tp, err)
  397. return
  398. }
  399. if rp.Content, err = s.dao.Content.Get(c, d.Oid, rp.RpID); err != nil {
  400. return
  401. }
  402. s.dao.Mc.AddTop(c, rp)
  403. sub, err = s.dao.Subject.Get(c, d.Oid, d.Tp)
  404. if err != nil {
  405. log.Error("s.dao.Subject.Get(%d, %d) error(%v)", d.Oid, d.Tp, err)
  406. return
  407. }
  408. err = sub.TopSet(rp.RpID, d.Top, 1)
  409. if err != nil {
  410. return
  411. }
  412. _, err = s.dao.Subject.UpMeta(c, d.Oid, d.Tp, sub.Meta, time.Now())
  413. if err != nil {
  414. log.Error("s.dao.Subject.UpMeta(%d,%d,%d) failed!err:=%v ", rp.RpID, rp.Oid, d.Tp, err)
  415. return
  416. }
  417. s.dao.Mc.AddSubject(c, sub)
  418. }
  419. }
  420. func (s *Service) actionRpt(c context.Context, msg *consumerMsg) {
  421. var (
  422. err error
  423. ok bool
  424. )
  425. var d struct {
  426. Oid int64 `json:"oid"`
  427. RpID int64 `json:"rpid"`
  428. Tp int8 `json:"tp"`
  429. }
  430. if err = json.Unmarshal([]byte(msg.Data), &d); err != nil {
  431. log.Error("json.Unmarshal() error(%v)", err)
  432. return
  433. }
  434. rp, err := s.getReplyCache(c, d.Oid, d.RpID)
  435. if err != nil {
  436. log.Error("s.getReply failed , oid(%d), RpID(%d) err(%v)", d.Oid, d.RpID, err)
  437. return
  438. }
  439. if rp == nil {
  440. return
  441. }
  442. sub, err := s.getSubject(c, d.Oid, d.Tp)
  443. if err != nil || sub == nil {
  444. log.Error("s.getSubject failed , oid(%d),tp(%d), RpID(%d) err(%v)", d.Oid, d.Tp, d.RpID, err)
  445. return
  446. }
  447. // update like index
  448. if rp.Root == 0 && rp.Parent == 0 && !rp.IsDeleted() {
  449. if ok, err = s.dao.Redis.ExpireIndex(c, d.Oid, rp.Type, model.SortByLike); err == nil && ok {
  450. rpts := make(map[int64]*reply.Report, 1)
  451. if rpt, _ := s.dao.Report.Get(c, rp.Oid, rp.RpID); rpt != nil {
  452. rpts[rp.RpID] = rpt
  453. }
  454. if err = s.dao.Redis.AddLikeIndex(c, d.Oid, rp.Type, rpts, rp); err != nil {
  455. log.Error("s.dao.Redis.AddLikeIndex(%d, %d) error(%v)", d.Oid, rp.Type, err)
  456. }
  457. }
  458. }
  459. report, err := s.dao.Report.Get(c, d.Oid, d.RpID)
  460. if err != nil || report == nil {
  461. log.Error("dao.Report.GetReport(%d, %d) met error (%v)", rp.Oid, rp.RpID, err)
  462. return
  463. }
  464. if err = s.dao.PubEvent(c, _eventReportAdd, report.Mid, sub, rp, report); err != nil {
  465. return
  466. }
  467. }
  468. func (s *Service) setLike(c context.Context, cmsg *StatMsg) {
  469. var (
  470. event string
  471. )
  472. rp, err := s.getReply(c, cmsg.Oid, cmsg.ID)
  473. if err != nil || rp == nil || rp.Content == nil {
  474. log.Error("s.getReply(%d, %d) reply:%+v error(%v)", cmsg.Oid, cmsg.ID, rp, err)
  475. return
  476. }
  477. sub, err := s.getSubject(c, rp.Oid, rp.Type)
  478. if err != nil || sub == nil {
  479. log.Error("s.getSubject failed , oid(%d) type(%d) err(%v)", rp.Oid, rp.Type, err)
  480. return
  481. }
  482. _, err = s.dao.Reply.UpLike(c, cmsg.Oid, cmsg.ID, cmsg.Count, cmsg.DislikeCount, time.Now())
  483. if err != nil {
  484. log.Error("s.dao.Reply.UpLike (%v) failed!err:=%v", cmsg, err)
  485. return
  486. }
  487. if cmsg.Count > rp.Like {
  488. event = _eventLike
  489. var max int
  490. if max, err = s.dao.Redis.MaxLikeCnt(c, rp.RpID); err == nil && cmsg.Count > max {
  491. if err = s.dao.Redis.SetMaxLikeCnt(c, rp.RpID, int64(cmsg.Count)); err == nil {
  492. rp.Like = cmsg.Count
  493. rp.Hate = cmsg.DislikeCount
  494. s.notifyLike(c, cmsg.Mid, rp)
  495. }
  496. }
  497. } else if cmsg.DislikeCount > rp.Hate {
  498. event = _eventHate
  499. } else if cmsg.Count < rp.Like {
  500. event = _eventLikeCancel
  501. } else {
  502. event = _eventHateCancel
  503. }
  504. rp.Like = cmsg.Count
  505. rp.Hate = cmsg.DislikeCount
  506. s.dao.Mc.AddReply(c, rp)
  507. if rp.AttrVal(model.ReplyAttrAdminTop) == 1 || rp.AttrVal(model.ReplyAttrUpperTop) == 1 {
  508. s.dao.Mc.AddTop(c, rp)
  509. return
  510. }
  511. // if have root, then update root's index
  512. if rp.Root == 0 && rp.IsNormal() {
  513. var ok bool
  514. if ok, err = s.dao.Redis.ExpireIndex(c, rp.Oid, rp.Type, model.SortByLike); err == nil && ok {
  515. rpts := make(map[int64]*reply.Report, 1)
  516. if rpt, _ := s.dao.Report.Get(c, rp.Oid, rp.RpID); rpt != nil {
  517. rpts[rp.RpID] = rpt
  518. }
  519. if err = s.dao.Redis.AddLikeIndex(c, rp.Oid, rp.Type, rpts, rp); err != nil {
  520. log.Error("s.dao.Redis.AddLikeIndex(%d, %d) error(%v)", rp.Oid, rp.Type, err)
  521. }
  522. }
  523. }
  524. if err = s.dao.PubEvent(c, event, cmsg.Mid, sub, rp, nil); err != nil {
  525. return
  526. }
  527. }
  528. func (s *Service) adminLog(c context.Context, rp *model.Reply, adid int64, isreport, state int8, result, remark string) {
  529. // admin log
  530. s.dao.Admin.UpIsNotNew(c, rp.RpID, time.Now())
  531. s.dao.Admin.Insert(c, adid, rp.Oid, rp.RpID, rp.Type, result, remark, model.AdminIsNew, isreport, state, time.Now())
  532. }
  533. // getSubject get reply subject from mysql .
  534. // NOTE : note get from mc,count must depend on mysql
  535. func (s *Service) getSubject(c context.Context, oid int64, tp int8) (sub *model.Subject, err error) {
  536. if sub, err = s.dao.Subject.Get(c, oid, tp); err != nil {
  537. log.Error("dao.Subject.Get(%d, %d) error(%v)", oid, tp, err)
  538. }
  539. return
  540. }
  541. func (s *Service) getReply(c context.Context, oid, RpID int64) (rp *model.Reply, err error) {
  542. if rp, err = s.dao.Reply.Get(c, oid, RpID); err != nil {
  543. log.Error("s.dao.Reply.Get(%d, %d) error(%v)", oid, RpID, err)
  544. return
  545. } else if rp == nil {
  546. return
  547. }
  548. if rp.Content, err = s.dao.Content.Get(c, rp.Oid, rp.RpID); err != nil {
  549. log.Error("s.dao.Content.Get(%d,%d) error(%v)", rp.Oid, rp.RpID, err)
  550. } else if rp.Content == nil {
  551. err = errReplyContentNotFound
  552. }
  553. return
  554. }
  555. func (s *Service) getReplyCache(c context.Context, oid, RpID int64) (rp *model.Reply, err error) {
  556. if rp, err = s.dao.Mc.GetReply(c, RpID); err != nil {
  557. log.Error("replyCacheDao.GetReply(%d, %d) error(%v)", oid, RpID, err)
  558. }
  559. if rp != nil {
  560. return
  561. }
  562. if rp, err = s.dao.Reply.Get(c, oid, RpID); err != nil {
  563. log.Error("dao.Reply.GetReply(%d, %d) error(%v)", oid, RpID, err)
  564. }
  565. if rp != nil {
  566. rp.Content, _ = s.dao.Content.Get(c, rp.Oid, rp.RpID)
  567. // NOTE not add member info to cache
  568. }
  569. return
  570. }
  571. func (s *Service) upAcount(c context.Context, oid int64, tp int8, count int, now time.Time) {
  572. s.statDao.Send(c, tp, oid, count)
  573. }
  574. func (s *Service) callSearchUp(c context.Context, res map[string]*searchFlush) (err error) {
  575. var (
  576. rps []*searchFlush
  577. rpts []*searchFlush
  578. )
  579. for _, r := range res {
  580. if r.Report != nil {
  581. rpts = append(rpts, r)
  582. } else {
  583. rps = append(rps, r)
  584. }
  585. }
  586. if len(rps) > 0 {
  587. err = s.callSearch(c, rps, false)
  588. }
  589. if len(rpts) > 0 {
  590. err = s.callSearch(c, rpts, true)
  591. }
  592. return
  593. }
  594. // callSearch update reply or report info to ES search.
  595. func (s *Service) callSearch(c context.Context, params []*searchFlush, isRpt bool) (err error) {
  596. var (
  597. b []byte
  598. ms []map[string]interface{}
  599. p = url.Values{}
  600. urlStr = conf.Conf.Host.Search + "/api/reply/internal/update"
  601. res struct {
  602. Code int `json:"code"`
  603. Msg string `json:"msg"`
  604. }
  605. )
  606. // 更新搜索ES数据字段
  607. if isRpt {
  608. // report
  609. p.Set("appid", _appIDReport)
  610. for _, p := range params {
  611. m := make(map[string]interface{})
  612. m["id"] = fmt.Sprintf("%d_%d_%d", p.Reply.RpID, p.Reply.Oid, p.Reply.Type)
  613. m["reply_state"] = fmt.Sprintf("%d", p.Reply.State)
  614. m["reason"] = fmt.Sprintf("%d", p.Report.Reason)
  615. m["content"] = p.Report.Content
  616. m["state"] = fmt.Sprintf("%d", p.Report.State)
  617. m["mtime"] = p.Report.MTime.Time().Format(timeFormat)
  618. m["index_time"] = p.Report.CTime.Time().Format(timeFormat)
  619. if p.Report.Attr == 1 {
  620. m["attr"] = []int{1}
  621. } else {
  622. m["attr"] = []int{}
  623. }
  624. ms = append(ms, m)
  625. }
  626. if b, err = json.Marshal(ms); err != nil {
  627. log.Error("json.Marshal(%v) error(%v)", ms, err)
  628. return
  629. }
  630. p.Set("val", string(b))
  631. if err = searchHTTPClient.Post(c, urlStr, "", p, &res); err != nil {
  632. log.Error("xhttp.Post(%s) failed error(%v)", urlStr+"?"+p.Encode(), err)
  633. }
  634. log.Info("updateSearch: %s post:%s ret:%v", urlStr, p.Encode(), res)
  635. } else {
  636. // reply
  637. var rps = make(map[int64]*model.Reply)
  638. for _, p := range params {
  639. rps[p.Reply.RpID] = p.Reply
  640. }
  641. err = s.UpSearchReply(c, rps)
  642. }
  643. return
  644. }
  645. // UpSearchReply update search reply index.
  646. func (s *Service) UpSearchReply(c context.Context, rps map[int64]*model.Reply) (err error) {
  647. if len(rps) <= 0 {
  648. return
  649. }
  650. stales := s.es.NewUpdate("reply_list")
  651. for _, rp := range rps {
  652. m := make(map[string]interface{})
  653. m["id"] = rp.RpID
  654. m["state"] = rp.State
  655. m["mtime"] = rp.MTime.Time().Format("2006-01-02 15:04:05")
  656. m["oid"] = rp.Oid
  657. m["type"] = rp.Type
  658. if rp.Content != nil {
  659. m["message"] = rp.Content.Message
  660. }
  661. stales = stales.AddData(s.es.NewUpdate("reply_list").IndexByTime("reply_list", elastic.IndexTypeWeek, rp.CTime.Time()), m)
  662. }
  663. err = stales.Do(c)
  664. if err != nil {
  665. log.Error("upSearchReply update stales(%s) failed!err:=%v", stales.Params(), err)
  666. return
  667. }
  668. log.Info("upSearchReply:stale:%s ret:%+v", stales.Params(), err)
  669. return
  670. }
  671. // getBlackListRelation check if the source user blacklisted the target user
  672. func (s *Service) getBlackListRelation(c context.Context, srcID, targetID int64) (rel bool) {
  673. relMap, err := s.accSrv.RichRelations3(c, &accmdl.RichRelationReq{Owner: srcID, Mids: []int64{targetID}, RealIp: ""})
  674. if err != nil {
  675. log.Error("s.acc.RichRelations2 sourceId(%v) targetId(%v)error(%v)", srcID, targetID, err)
  676. err = nil
  677. return false
  678. }
  679. if len(relMap.RichRelations) == 0 {
  680. return false
  681. }
  682. if rel, ok := relMap.RichRelations[targetID]; ok && relmdl.Attr(uint32(rel)) == relmdl.AttrBlack {
  683. return true
  684. }
  685. return false
  686. }
  687. // getFilterBlacklist filters the user list that the mid user can notify message for
  688. func (s *Service) getFilterBlacklist(c context.Context, mid int64, targetIds []int64) (filterIds []int64) {
  689. filterIds = make([]int64, 0, len(targetIds))
  690. for _, tmp := range targetIds {
  691. if !s.getBlackListRelation(c, tmp, mid) {
  692. filterIds = append(filterIds, tmp)
  693. }
  694. }
  695. return
  696. }
  697. func (s *Service) addAssistLog(c context.Context, mid, uid, subjectID, typeID, action int64, objectID, content string) (err error) {
  698. if len(content) > 50 {
  699. content = substr2(content, 0, 50) + "..."
  700. }
  701. arg := &assmdl.ArgAssistLogAdd{
  702. Mid: mid,
  703. AssistMid: uid,
  704. Type: 1,
  705. Action: 1,
  706. SubjectID: subjectID,
  707. ObjectID: objectID,
  708. Detail: content,
  709. RealIP: "",
  710. }
  711. if err = s.assistSrv.AssistLogAdd(c, arg); err != nil {
  712. log.Error("s.assistSrv.Assist(%d, %d, %d, %d, %d) error(%v)", mid, uid, subjectID, typeID, action, err)
  713. }
  714. return
  715. }
  716. func substr2(str string, start int, subLength int) string {
  717. rs := []rune(str)
  718. length := len(rs)
  719. if start < 0 || start > length {
  720. start = 0
  721. }
  722. if subLength < 0 || subLength > length {
  723. subLength = length
  724. }
  725. return string(rs[start:subLength])
  726. }