search.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strconv"
  7. "time"
  8. "go-common/app/admin/main/dm/model"
  9. "go-common/library/database/elastic"
  10. "go-common/library/log"
  11. "go-common/library/xstr"
  12. )
  // _dmMointorFields lists the document fields requested from the monitor index.
  var _dmMointorFields = []string{
  	"id", "type", "pid", "oid", "state", "attr",
  	"mcount", "ctime", "mtime", "mid", "title", "author",
  }

  // _dmReportFields lists the document fields requested from the report indexes.
  var _dmReportFields = []string{
  	"id", "dmid", "cid", "arc_aid", "arc_typeid", "dm_owner_uid",
  	"dm_msg", "count", "content", "up_op", "state", "uid",
  	"rp_time", "reason", "arc_title", "dm_deleted", "arc_mid", "pool_id",
  	"model", "score", "dm_ctime", "ctime", "mtime",
  }
  18. // return recent two years report search index.
  19. func (d *Dao) rptSearchIndex() string {
  20. year := time.Now().Year()
  21. return fmt.Sprintf("dmreport_%d,dmreport_%d", year-1, year)
  22. }
  23. // SearchMonitor get monitor list from search
  24. func (d *Dao) SearchMonitor(c context.Context, tp int32, pid, oid, mid int64, attr int32, kw, sort, order string, page, size int64) (data *model.SearchMonitorResult, err error) {
  25. req := d.esCli.NewRequest("dm_monitor_list").Index("dm_monitoring").Fields(_dmMointorFields...).Pn(int(page)).Ps(int(size))
  26. if tp > 0 {
  27. req.WhereEq("type", tp)
  28. }
  29. if pid > 0 {
  30. req.WhereEq("pid", pid)
  31. }
  32. if oid > 0 {
  33. req.WhereEq("oid", oid)
  34. }
  35. if mid > 0 {
  36. req.WhereEq("mid", mid)
  37. }
  38. if len(kw) > 0 {
  39. req.WhereLike([]string{"title"}, []string{kw}, false, elastic.LikeLevelLow)
  40. }
  41. if attr != 0 {
  42. req.WhereEq("attr_format", attr)
  43. } else {
  44. req.WhereIn("attr_format", []int64{int64(model.AttrSubMonitorBefore + 1), int64(model.AttrSubMonitorAfter + 1)})
  45. }
  46. if len(sort) > 0 && len(order) > 0 {
  47. req.Order(order, sort)
  48. }
  49. if err = req.Scan(c, &data); err != nil {
  50. log.Error("SearchMonitor:Scan params(%s) error(%v)", req.Params(), err)
  51. return
  52. }
  53. if data == nil || data.Page == nil {
  54. err = fmt.Errorf("data or data.Page nil")
  55. log.Error("SearchMonitor params(%s) error(%v)", req.Params(), err)
  56. return
  57. }
  58. return
  59. }
  60. // SearchReport2 .
  61. func (d *Dao) SearchReport2(c context.Context, params *model.ReportListParams) (data *model.SearchReportResult, err error) {
  62. req := d.esCli.NewRequest("dmreport").Index(d.rptSearchIndex()).Fields(_dmReportFields...).Ps(int(params.PageSize)).Pn(int(params.Page))
  63. if len(params.Tids) > 0 {
  64. req.WhereIn("arc_typeid", params.Tids)
  65. }
  66. if len(params.RpTypes) > 0 {
  67. req.WhereIn("reason", params.RpTypes)
  68. }
  69. if params.Aid > 0 {
  70. req.WhereEq("arc_aid", params.Aid)
  71. }
  72. if params.Cid > 0 {
  73. req.WhereEq("cid", params.Cid)
  74. }
  75. if params.UID > 0 {
  76. req.WhereEq("dm_owner_uid", params.UID)
  77. } else {
  78. req.WhereNot(elastic.NotTypeEq, "dm_owner_uid").WhereEq("dm_owner_uid", 0)
  79. }
  80. if params.RpUID > 0 {
  81. req.WhereEq("uid", params.RpUID)
  82. }
  83. if len(params.States) > 0 {
  84. req.WhereIn("state", params.States)
  85. }
  86. if len(params.UpOps) > 0 {
  87. req.WhereIn("up_op", params.UpOps)
  88. }
  89. if params.Start != "" || params.End != "" {
  90. req.WhereRange("rp_time", params.Start, params.End, elastic.RangeScopeLcRc)
  91. }
  92. if params.Keyword != "" {
  93. req.WhereLike([]string{"dm_msg"}, []string{params.Keyword}, false, elastic.LikeLevelLow)
  94. }
  95. if len(params.Sort) > 0 && len(params.Order) > 0 {
  96. req.Order(params.Order, params.Sort)
  97. }
  98. if err = req.Scan(c, &data); err != nil {
  99. log.Error("SearchReport:Scan params(%s) error(%v)", req.Params(), err)
  100. return
  101. }
  102. if data == nil || data.Page == nil {
  103. err = fmt.Errorf("data or data.Page nil")
  104. log.Error("SearchReport params(%s) error(%v)", req.Params(), err)
  105. return
  106. }
  107. return
  108. }
  109. // SearchReport get report list from search
  110. func (d *Dao) SearchReport(c context.Context, page, size int64, start, end, order, sort, keyword string, tid, rpID, state, upOp []int64, rt *model.Report) (data *model.SearchReportResult, err error) {
  111. req := d.esCli.NewRequest("dmreport").Index(d.rptSearchIndex()).Fields(_dmReportFields...).Ps(int(size)).Pn(int(page))
  112. if len(tid) > 0 {
  113. req.WhereIn("arc_typeid", tid)
  114. }
  115. if len(rpID) > 0 {
  116. req.WhereIn("reason", rpID)
  117. }
  118. if rt.Aid != -1 {
  119. req.WhereEq("arc_aid", rt.Aid)
  120. }
  121. if rt.Cid != -1 {
  122. req.WhereEq("cid", rt.Cid)
  123. }
  124. if rt.UID != -1 {
  125. req.WhereEq("dm_owner_uid", rt.UID)
  126. } else {
  127. req.WhereNot(elastic.NotTypeEq, "dm_owner_uid").WhereEq("dm_owner_uid", 0)
  128. }
  129. if rt.RpUID != -1 {
  130. req.WhereEq("uid", rt.RpUID)
  131. }
  132. if len(state) > 0 {
  133. req.WhereIn("state", state)
  134. }
  135. if len(upOp) > 0 {
  136. req.WhereIn("up_op", (upOp))
  137. }
  138. if start != "" || end != "" {
  139. req.WhereRange("rp_time", start, end, elastic.RangeScopeLcRc)
  140. }
  141. if keyword != "" {
  142. req.WhereLike([]string{"dm_msg"}, []string{keyword}, false, elastic.LikeLevelLow)
  143. }
  144. if len(sort) > 0 && len(order) > 0 {
  145. req.Order(order, sort)
  146. }
  147. if err = req.Scan(c, &data); err != nil {
  148. log.Error("SearchReport:Scan params(%s) error(%v)", req.Params(), err)
  149. return
  150. }
  151. if data == nil || data.Page == nil {
  152. err = fmt.Errorf("data or data.Page nil")
  153. log.Error("SearchReport params(%s) error(%v)", req.Params(), err)
  154. return
  155. }
  156. return
  157. }
  158. // SearchReportByID search report by cid and dmids.
  159. func (d *Dao) SearchReportByID(c context.Context, dmids []int64) (data *model.SearchReportResult, err error) {
  160. req := d.esCli.NewRequest("dmreport").Index(d.rptSearchIndex()).Fields(_dmReportFields...).Ps(100)
  161. req.WhereIn("dmid", dmids)
  162. if err = req.Scan(c, &data); err != nil {
  163. log.Error("SearchReportByID:Scan params(%s) error(%v)", req.Params(), err)
  164. return
  165. }
  166. if data == nil || data.Page == nil {
  167. err = fmt.Errorf("data or data.Page nil")
  168. log.Error("SearchReportByID params(%s) error(%v)", req.Params(), err)
  169. return
  170. }
  171. return
  172. }
  173. // UptSearchReport 强制更新举报搜索索引 使用 v3
  174. func (d *Dao) UptSearchReport(c context.Context, uptRpts []*model.UptSearchReport) (err error) {
  175. upt := d.esCli.NewUpdate("dmreport")
  176. var t time.Time
  177. for _, rpt := range uptRpts {
  178. t, err = time.ParseInLocation("2006-01-02 15:04:05", rpt.Ctime, time.Local)
  179. if err != nil {
  180. log.Error("time.ParseInLocation(%s) error(%v)", rpt.Ctime, err)
  181. return
  182. }
  183. upt.AddData(fmt.Sprintf("dmreport_%d", t.Year()), rpt)
  184. }
  185. if err = upt.Do(c); err != nil {
  186. log.Error("update.Do() error(%v)", err)
  187. }
  188. return
  189. }
  190. // SearchDM 搜索弹幕
  191. func (d *Dao) SearchDM(c context.Context, p *model.SearchDMParams) (data *model.SearchDMData, err error) {
  192. var (
  193. order = "ctime"
  194. sort = "desc"
  195. req *elastic.Request
  196. )
  197. req = d.esCli.NewRequest("dm").Index(fmt.Sprintf("dm_%03d", p.Oid%_indexSharding)).Fields("id").Ps(int(p.Size)).Pn(int(p.Page))
  198. req.WhereEq("oid", p.Oid)
  199. if p.Mid != model.CondIntNil {
  200. req.WhereEq("mid", p.Mid)
  201. }
  202. if p.State != "" {
  203. if states, err1 := xstr.SplitInts(p.State); err1 == nil {
  204. req.WhereIn("state", states)
  205. }
  206. }
  207. if p.Pool != "" {
  208. if pools, err1 := xstr.SplitInts(p.Pool); err1 == nil {
  209. req.WhereIn("pool", pools)
  210. }
  211. }
  212. if p.Attrs != "" {
  213. if attrs, err1 := xstr.SplitInts(p.Attrs); err1 == nil {
  214. req.WhereIn("attr_format", attrs)
  215. }
  216. }
  217. if p.IP != "" {
  218. req.WhereEq("ip_format", p.IP)
  219. }
  220. switch {
  221. case p.ProgressFrom != model.CondIntNil && p.ProgressTo != model.CondIntNil:
  222. req.WhereRange("progress", p.ProgressFrom, p.ProgressTo, elastic.RangeScopeLcRc)
  223. case p.ProgressFrom != model.CondIntNil:
  224. req.WhereRange("progress", p.ProgressFrom, nil, elastic.RangeScopeLcRc)
  225. case p.ProgressTo != model.CondIntNil:
  226. req.WhereRange("progress", nil, p.ProgressTo, elastic.RangeScopeLcRc)
  227. }
  228. switch {
  229. case p.CtimeFrom != model.CondIntNil && p.CtimeTo != model.CondIntNil:
  230. req.WhereRange("ctime", time.Unix(p.CtimeFrom, 0).Format("2006-01-02 15:04:05"), time.Unix(p.CtimeTo, 0).Format("2006-01-02 15:04:05"), elastic.RangeScopeLcRc)
  231. case p.CtimeFrom != model.CondIntNil:
  232. req.WhereRange("ctime", time.Unix(p.CtimeFrom, 0).Format("2006-01-02 15:04:05"), nil, elastic.RangeScopeLcRc)
  233. case p.CtimeTo != model.CondIntNil:
  234. req.WhereRange("ctime", nil, time.Unix(p.CtimeTo, 0).Format("2006-01-02 15:04:05"), elastic.RangeScopeLcRc)
  235. }
  236. if p.Keyword != "" {
  237. req.WhereLike([]string{"kwmsg"}, []string{p.Keyword}, false, elastic.LikeLevelHigh)
  238. }
  239. if p.Order != "" {
  240. order = p.Order
  241. }
  242. if p.Sort == "asc" {
  243. sort = p.Sort
  244. }
  245. req.Order(order, sort)
  246. if err = req.Scan(c, &data); err != nil {
  247. log.Error("SearchDM:Scan params(%s) error(%v)", req.Params(), err)
  248. return
  249. }
  250. if data == nil || data.Page == nil {
  251. err = fmt.Errorf("data or data.Page nil")
  252. log.Error("SearchDM params(%s) error(%v)", req.Params(), err)
  253. return
  254. }
  255. return
  256. }
  257. // SearchProtectCount get protected dm count.
  258. func (d *Dao) SearchProtectCount(c context.Context, tp int32, oid int64) (count int64, err error) {
  259. var res struct {
  260. Result map[string][]struct {
  261. Key string `json:"key"`
  262. Count int64 `json:"doc_count"`
  263. } `json:"result"`
  264. }
  265. order := []map[string]string{{"attr": "desc"}}
  266. req := d.esCli.NewRequest("dm").Fields("attr").Index(fmt.Sprintf("dm_%03d", d.hitIndex(oid)))
  267. req.WhereEq("oid", oid).WhereIn("attr_format", "1").GroupBy(elastic.EnhancedModeGroupBy, "attr", order)
  268. req.Pn(1).Ps(10)
  269. if err = req.Scan(c, &res); err != nil {
  270. log.Error("req.Scan() error(%v)", err)
  271. return
  272. }
  273. if values, ok := res.Result["group_by_attr"]; ok {
  274. for _, v := range values {
  275. count = count + v.Count
  276. }
  277. }
  278. return
  279. }
  280. // UpSearchDMState 通知搜索服务更新弹幕状态
  281. func (d *Dao) UpSearchDMState(c context.Context, tp int32, state int32, dmidM map[int64][]int64) (err error) {
  282. upt := d.esCli.NewUpdate("dm")
  283. for oid, dmids := range dmidM {
  284. for _, dmid := range dmids {
  285. data := &model.UptSearchDMState{
  286. ID: dmid,
  287. Oid: oid,
  288. State: state,
  289. Type: tp,
  290. Mtime: time.Now().Format("2006-01-02 15:04:05"),
  291. }
  292. upt.AddData(fmt.Sprintf("dm_%03d", oid%_indexSharding), data)
  293. }
  294. }
  295. if err = upt.Do(c); err != nil {
  296. log.Error("update.Do() params(%s) error(%v)", upt.Params(), err)
  297. }
  298. return
  299. }
  300. // UpSearchDMPool 通知搜索服务更新弹幕池
  301. func (d *Dao) UpSearchDMPool(c context.Context, tp int32, oid int64, pool int32, dmids []int64) (err error) {
  302. upt := d.esCli.NewUpdate("dm")
  303. for _, dmid := range dmids {
  304. data := &model.UptSearchDMPool{
  305. ID: dmid,
  306. Oid: oid,
  307. Pool: pool,
  308. Type: tp,
  309. Mtime: time.Now().Format("2006-01-02 15:04:05"),
  310. }
  311. upt.AddData(fmt.Sprintf("dm_%03d", oid%_indexSharding), data)
  312. }
  313. if err = upt.Do(c); err != nil {
  314. log.Error("update.Do() params(%s) error(%v)", upt.Params(), err)
  315. }
  316. return
  317. }
  318. // UpSearchDMAttr 通知搜索服务更新弹幕属性
  319. func (d *Dao) UpSearchDMAttr(c context.Context, tp int32, oid int64, attr int32, dmids []int64) (err error) {
  320. var bits []int64
  321. for k, v := range strconv.FormatInt(int64(attr), 2) {
  322. if v == 49 {
  323. bits = append(bits, int64(k+1))
  324. }
  325. }
  326. upt := d.esCli.NewUpdate("dm")
  327. for _, dmid := range dmids {
  328. data := &model.UptSearchDMAttr{
  329. ID: dmid,
  330. Oid: oid,
  331. Attr: attr,
  332. AttrFormat: bits,
  333. Type: tp,
  334. Mtime: time.Now().Format("2006-01-02 15:04:05"),
  335. }
  336. upt.AddData(fmt.Sprintf("dm_%03d", oid%_indexSharding), data)
  337. }
  338. if err = upt.Do(c); err != nil {
  339. log.Error("update.Do() params(%s) error(%v)", upt.Params(), err)
  340. }
  341. return
  342. }
  343. // SearchSubjectLog get subject log
  344. func (d *Dao) SearchSubjectLog(c context.Context, tp int32, oid int64) (data []*model.SubjectLog, err error) {
  345. req := d.esCli.NewRequest("log_audit").Index("log_audit_31_all").Fields("uid", "uname", "oid", "ctime", "action", "extra_data").WhereEq("oid", oid).WhereEq("type", tp)
  346. req.Ps(20).Order("ctime", "desc")
  347. res := &model.SearchSubjectLog{}
  348. if err = req.Scan(c, &res); err != nil || res == nil {
  349. log.Error("SearchSubcetLog:Scan params(%s) error(%v)", req.Params(), err)
  350. return
  351. }
  352. data = make([]*model.SubjectLog, 0)
  353. s := new(struct {
  354. Comment string `json:"comment"`
  355. })
  356. for _, v := range res.Result {
  357. if err = json.Unmarshal([]byte(v.ExtraData), s); err != nil {
  358. log.Error("json.Unmarshal(%s) error(%v)", v.ExtraData, err)
  359. return
  360. }
  361. log := &model.SubjectLog{
  362. UID: v.UID,
  363. Uname: v.Uname,
  364. Oid: v.Oid,
  365. Action: v.Action,
  366. Comment: s.Comment,
  367. Ctime: v.Ctime,
  368. }
  369. data = append(data, log)
  370. }
  371. return
  372. }
  373. // UpSearchRecentDMState .
  374. func (d *Dao) UpSearchRecentDMState(c context.Context, tp int32, state int32, dmidM map[int64][]int64) (err error) {
  375. upt := d.esCli.NewUpdate("dm_home")
  376. year, month, _ := time.Now().Date()
  377. yearPre, monthPre, _ := time.Now().AddDate(0, -1, 0).Date()
  378. for oid, dmids := range dmidM {
  379. for _, dmid := range dmids {
  380. data := &model.UptSearchDMState{
  381. ID: dmid,
  382. Oid: oid,
  383. State: state,
  384. Type: tp,
  385. Mtime: time.Now().Format("2006-01-02 15:04:05"),
  386. }
  387. upt.AddData(fmt.Sprintf("dm_home_%v",
  388. fmt.Sprintf("%d_%02d", yearPre, int(monthPre)),
  389. ), data)
  390. upt.AddData(fmt.Sprintf("dm_home_%v",
  391. fmt.Sprintf("%d_%02d", year, int(month)),
  392. ), data)
  393. }
  394. }
  395. if err = upt.Do(c); err != nil {
  396. log.Error("update.Do() params(%s) error(%v)", upt.Params(), err)
  397. }
  398. return
  399. }
  400. // UpSearchRecentDMPool .
  401. func (d *Dao) UpSearchRecentDMPool(c context.Context, tp int32, oid int64, pool int32, dmids []int64) (err error) {
  402. upt := d.esCli.NewUpdate("dm_home")
  403. year, month, _ := time.Now().Date()
  404. yearPre, monthPre, _ := time.Now().AddDate(0, -1, 0).Date()
  405. for _, dmid := range dmids {
  406. data := &model.UptSearchDMPool{
  407. ID: dmid,
  408. Oid: oid,
  409. Pool: pool,
  410. Type: tp,
  411. Mtime: time.Now().Format("2006-01-02 15:04:05"),
  412. }
  413. upt.AddData(fmt.Sprintf("dm_home_%v",
  414. fmt.Sprintf("%d_%02d", yearPre, int(monthPre)),
  415. ), data)
  416. upt.AddData(fmt.Sprintf("dm_home_%v",
  417. fmt.Sprintf("%d_%02d", year, int(month)),
  418. ), data)
  419. }
  420. if err = upt.Do(c); err != nil {
  421. log.Error("update.Do() params(%s) error(%v)", upt.Params(), err)
  422. }
  423. return
  424. }
  425. // UpSearchRecentDMAttr .
  426. func (d *Dao) UpSearchRecentDMAttr(c context.Context, tp int32, oid int64, attr int32, dmids []int64) (err error) {
  427. var bits []int64
  428. for k, v := range strconv.FormatInt(int64(attr), 2) {
  429. if v == 49 {
  430. bits = append(bits, int64(k+1))
  431. }
  432. }
  433. upt := d.esCli.NewUpdate("dm_home")
  434. year, month, _ := time.Now().Date()
  435. yearPre, monthPre, _ := time.Now().AddDate(0, -1, 0).Date()
  436. for _, dmid := range dmids {
  437. data := &model.UptSearchDMAttr{
  438. ID: dmid,
  439. Oid: oid,
  440. Attr: attr,
  441. AttrFormat: bits,
  442. Type: tp,
  443. Mtime: time.Now().Format("2006-01-02 15:04:05"),
  444. }
  445. upt.AddData(fmt.Sprintf("dm_home_%v",
  446. fmt.Sprintf("%d_%02d", yearPre, int(monthPre)),
  447. ), data)
  448. upt.AddData(fmt.Sprintf("dm_home_%v",
  449. fmt.Sprintf("%d_%02d", year, int(month)),
  450. ), data)
  451. }
  452. if err = upt.Do(c); err != nil {
  453. log.Error("update.Do() params(%s) error(%v)", upt.Params(), err)
  454. }
  455. return
  456. }
  457. // SearchSubject get subject log list from search
  458. func (d *Dao) SearchSubject(c context.Context, req *model.SearchSubjectReq) (data []int64, page *model.Page, err error) {
  459. r := d.esCli.NewRequest("dm_monitor_list").Index("dm_monitoring").Fields("oid")
  460. if len(req.Oids) > 0 {
  461. r.WhereIn("oid", req.Oids)
  462. }
  463. if len(req.Mids) > 0 {
  464. r.WhereIn("mid", req.Mids)
  465. }
  466. if len(req.Aids) > 0 {
  467. r.WhereIn("pid", req.Aids)
  468. }
  469. if len(req.Attrs) > 0 {
  470. r.WhereIn("attr_format", req.Attrs)
  471. }
  472. if req.State != model.CondIntNil {
  473. r.WhereEq("state", req.State)
  474. }
  475. r.Ps(int(req.Ps)).Pn(int(req.Pn)).Order(req.Order, req.Sort)
  476. res := &model.SearchSubjectResult{}
  477. if err = r.Scan(c, &res); err != nil {
  478. log.Error("SearchSubject:Scan params(%s) error(%v)", r.Params(), err)
  479. return
  480. }
  481. if res == nil || res.Page == nil {
  482. err = fmt.Errorf("data or data.Page nil")
  483. log.Error("SearchSubject params(%s) error(%v)", r.Params(), err)
  484. return
  485. }
  486. for _, v := range res.Result {
  487. data = append(data, v.Oid)
  488. }
  489. page = res.Page
  490. return
  491. }