search.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "go-common/app/interface/main/dm2/model"
  9. "go-common/library/database/elastic"
  10. "go-common/library/log"
  11. "go-common/library/xstr"
  12. )
  13. var (
  14. _dataTimeFormat = "2006-01-02 15:03:04"
  15. _subtitleFields = []string{"oid", "id"}
  16. _dmRecentFields = []string{"attr", "color", "ctime", "fontsize", "id", "mid", "mode", "msg", "mtime", "oid", "pool", "progress", "state", "type", "pid"}
  17. )
  18. func hisDateIndex(month string) string {
  19. return "dm_date_" + strings.Replace(month, "-", "_", -1)
  20. }
  21. // SearchDMHisIndex get dm date index by oid from search.
  22. func (d *Dao) SearchDMHisIndex(c context.Context, tp int32, oid int64, month string) (dates []string, err error) {
  23. var (
  24. pn, ps = 1, 31
  25. res model.SearchHistoryIdxResult
  26. )
  27. req := d.elastic.NewRequest("dm_date")
  28. req.Fields("date").WhereEq("oid", oid)
  29. req.Index(hisDateIndex(month)).Pn(pn).Ps(ps).Order("date", "asc")
  30. if err = req.Scan(c, &res); err != nil {
  31. log.Error("elastic search(%s) error(%v)", req.Params(), err)
  32. return
  33. }
  34. for _, v := range res.Result {
  35. dates = append(dates, v.Date)
  36. }
  37. return
  38. }
  39. // SearchDMHistory get history dmid from search.
  40. // 搜索定制api,改动需要沟通
  41. func (d *Dao) SearchDMHistory(c context.Context, tp int32, oid, ctimeTo int64, pn, ps int) (dmids []int64, err error) {
  42. var (
  43. res model.SearchHistoryResult
  44. end = time.Unix(ctimeTo, 0).Format("2006-01-02 15:04:05")
  45. )
  46. req := d.elastic.NewRequest("dm_history")
  47. req.Index(fmt.Sprintf("dm_search_%03d", oid%_indexSharding))
  48. req.Fields("id").WhereEq("oid", oid).WhereIn("state", []int64{0, 2, 6})
  49. req.WhereRange("ctime", nil, end, elastic.RangeScopeLcRc)
  50. req.Pn(pn).Ps(ps).Order("ctime", "desc")
  51. if err = req.Scan(c, &res); err != nil {
  52. log.Error("elastic search(%s) error(%v)", req.Params(), err)
  53. return
  54. }
  55. for _, v := range res.Result {
  56. dmids = append(dmids, v.ID)
  57. }
  58. return
  59. }
  60. // SearchDM 搜索弹幕
  61. func (d *Dao) SearchDM(c context.Context, p *model.SearchDMParams) (res *model.SearchDMData, err error) {
  62. req := d.elastic.NewRequest("dm_search")
  63. req.Fields("id").Index(fmt.Sprintf("dm_search_%03d", p.Oid%_indexSharding)).WhereEq("oidstr", p.Oid)
  64. if p.Mids != "" {
  65. mids, _ := xstr.SplitInts(p.Mids)
  66. req.WhereIn("mid", mids)
  67. }
  68. if p.State != "" {
  69. states, _ := xstr.SplitInts(p.State)
  70. req.WhereIn("state", states)
  71. }
  72. if p.Mode != "" {
  73. modes, _ := xstr.SplitInts(p.Mode)
  74. req.WhereIn("mode", modes)
  75. }
  76. if p.Pool != "" {
  77. pools, _ := xstr.SplitInts(p.Pool)
  78. req.WhereIn("pool", pools)
  79. }
  80. if p.Attrs != "" {
  81. attrs, _ := xstr.SplitInts(p.Attrs)
  82. req.WhereIn("attr_format", attrs)
  83. }
  84. req.WhereEq("type", p.Type)
  85. switch {
  86. case p.ProgressFrom != model.CondIntNil && p.ProgressTo != model.CondIntNil:
  87. req.WhereRange("progress_long", p.ProgressFrom, p.ProgressTo, elastic.RangeScopeLcRc)
  88. case p.ProgressFrom != model.CondIntNil:
  89. req.WhereRange("progress_long", p.ProgressFrom, nil, elastic.RangeScopeLcRc)
  90. case p.ProgressTo != model.CondIntNil:
  91. req.WhereRange("progress_long", nil, p.ProgressTo, elastic.RangeScopeLcRc)
  92. }
  93. req.WhereRange("ctime", p.CtimeFrom, p.CtimeTo, elastic.RangeScopeLcRc)
  94. if p.Keyword != "" {
  95. req.WhereLike([]string{"msg"}, []string{p.Keyword}, true, elastic.LikeLevelHigh)
  96. req.OrderScoreFirst(true)
  97. }
  98. if p.Order == "progress" {
  99. p.Order = "progress_long"
  100. }
  101. req.Order(p.Order, p.Sort)
  102. req.Pn(int(p.Pn)).Ps(int(p.Ps))
  103. res = &model.SearchDMData{}
  104. if err = req.Scan(c, &res); err != nil {
  105. log.Error("search params(%s), error(%v)", req.Params(), err)
  106. }
  107. return
  108. }
  109. // UptSearchDMState update dm search state
  110. func (d *Dao) UptSearchDMState(c context.Context, dmids []int64, oid int64, state, tp int32) (err error) {
  111. upt := d.elastic.NewUpdate("dm_search")
  112. for _, dmid := range dmids {
  113. data := &model.UptSearchDMState{
  114. ID: dmid,
  115. Oid: oid,
  116. State: state,
  117. Type: tp,
  118. Mtime: time.Now().Format("2006-01-02 15:04:05"),
  119. }
  120. upt.AddData(fmt.Sprintf("dm_search_%03d", oid%_indexSharding), data)
  121. }
  122. if err = upt.Do(c); err != nil {
  123. log.Error("update.Do() params(%s) error(%v)", upt.Params(), err)
  124. }
  125. return
  126. }
  127. // UptSearchDMPool update dm search pool
  128. func (d *Dao) UptSearchDMPool(c context.Context, dmids []int64, oid int64, pool, tp int32) (err error) {
  129. upt := d.elastic.NewUpdate("dm_search")
  130. for _, dmid := range dmids {
  131. data := &model.UptSearchDMPool{
  132. ID: dmid,
  133. Oid: oid,
  134. Pool: pool,
  135. Type: tp,
  136. Mtime: time.Now().Format("2006-01-02 15:04:05"),
  137. }
  138. upt.AddData(fmt.Sprintf("dm_search_%03d", oid%_indexSharding), data)
  139. }
  140. if err = upt.Do(c); err != nil {
  141. log.Error("update.Do() params(%s) error(%v)", upt.Params(), err)
  142. }
  143. return
  144. }
  145. // UptSearchDMAttr update dm search attr
  146. func (d *Dao) UptSearchDMAttr(c context.Context, dmids []int64, oid int64, attr, tp int32) (err error) {
  147. var bits []int64
  148. for k, v := range strconv.FormatInt(int64(attr), 2) {
  149. if v == 49 {
  150. bits = append(bits, int64(k+1))
  151. }
  152. }
  153. upt := d.elastic.NewUpdate("dm_search")
  154. for _, dmid := range dmids {
  155. data := &model.UptSearchDMAttr{
  156. ID: dmid,
  157. Oid: oid,
  158. Attr: attr,
  159. AttrFormat: bits,
  160. Type: tp,
  161. Mtime: time.Now().Format("2006-01-02 15:04:05"),
  162. }
  163. upt.AddData(fmt.Sprintf("dm_search_%03d", oid%_indexSharding), data)
  164. }
  165. if err = upt.Do(c); err != nil {
  166. log.Error("update.Do() params(%s) error(%v)", upt.Params(), err)
  167. }
  168. return
  169. }
  170. // SearchSubtitles .
  171. func (d *Dao) SearchSubtitles(c context.Context, page, size int32, mid int64, upMids []int64, aid, oid int64, tp int32, status []int64) (res *model.SearchSubtitleResult, err error) {
  172. var (
  173. req *elastic.Request
  174. fields []string
  175. )
  176. fields = _subtitleFields
  177. req = d.elastic.NewRequest("dm_subtitle").Index("subtitle").Fields(fields...).Pn(int(page)).Ps(int(size))
  178. if mid > 0 {
  179. req.WhereEq("mid", mid)
  180. }
  181. if aid > 0 {
  182. req.WhereEq("aid", aid)
  183. }
  184. if oid > 0 {
  185. req.WhereEq("oid", oid)
  186. req.WhereEq("type", tp)
  187. }
  188. switch {
  189. case len(upMids) > 0 && len(status) > 0:
  190. cmbs := &elastic.Combo{}
  191. cmbu := &elastic.Combo{}
  192. var (
  193. statusInf []interface{}
  194. upMidsInf []interface{}
  195. )
  196. for _, s := range status {
  197. statusInf = append(statusInf, s)
  198. }
  199. for _, s := range upMids {
  200. upMidsInf = append(upMidsInf, s)
  201. }
  202. cmbs.ComboIn([]map[string][]interface{}{
  203. {"status": statusInf},
  204. })
  205. cmbu.ComboIn([]map[string][]interface{}{
  206. {"up_mid": upMidsInf},
  207. })
  208. req = req.WhereCombo(cmbs.MinIn(1).MinAll(1), cmbu.MinIn(1).MinAll(1))
  209. case len(upMids) > 0:
  210. req.WhereIn("up_mid", upMids)
  211. case len(status) > 0:
  212. req.WhereIn("status", status)
  213. }
  214. req.Order("mtime", "desc")
  215. if err = req.Scan(c, &res); err != nil {
  216. log.Error("elastic search(%s) error(%v)", req.Params(), err)
  217. return
  218. }
  219. return
  220. }
  221. // CountSubtitles .
  222. func (d *Dao) CountSubtitles(c context.Context, mid int64, upMids []int64, aid, oid int64, tp int32) (countSubtitle *model.CountSubtitleResult, err error) {
  223. var (
  224. req *elastic.Request
  225. fields []string
  226. res map[string]interface{}
  227. _searchStatus = []string{
  228. fmt.Sprint(model.SubtitleStatusDraft),
  229. fmt.Sprint(model.SubtitleStatusToAudit),
  230. fmt.Sprint(model.SubtitleStatusAuditBack),
  231. fmt.Sprint(model.SubtitleStatusPublish),
  232. fmt.Sprint(model.SubtitleStatusCheckToAudit),
  233. fmt.Sprint(model.SubtitleStatusCheckPublish),
  234. fmt.Sprint(model.SubtitleStatusManagerBack),
  235. }
  236. result map[string]interface{}
  237. groupStatus []interface{}
  238. itemStatus map[string]interface{}
  239. ok bool
  240. )
  241. req = d.elastic.NewRequest("dm_subtitle").Index("subtitle").Fields(fields...).Pn(0).Ps(0)
  242. if mid > 0 {
  243. req.WhereEq("mid", mid)
  244. }
  245. if aid > 0 {
  246. req.WhereEq("aid", aid)
  247. }
  248. if oid > 0 {
  249. req.WhereEq("oid", oid)
  250. req.WhereEq("type", tp)
  251. }
  252. if len(upMids) > 0 {
  253. req.WhereIn("up_mid", upMids)
  254. }
  255. req = req.GroupBy(elastic.EnhancedModeGroupBy, "status", nil).WhereIn("status", _searchStatus)
  256. if err = req.Scan(c, &res); err != nil {
  257. log.Error("elastic search(%s) error(%v)", req.Params(), err)
  258. return
  259. }
  260. countSubtitle = &model.CountSubtitleResult{}
  261. if result, ok = res["result"].(map[string]interface{}); !ok {
  262. return
  263. }
  264. if groupStatus, ok = result["group_by_status"].([]interface{}); !ok {
  265. return
  266. }
  267. for _, item := range groupStatus {
  268. if itemStatus, ok = item.(map[string]interface{}); ok {
  269. docCount, _ := itemStatus["doc_count"].(float64)
  270. switch itemStatus["key"] {
  271. case fmt.Sprint(model.SubtitleStatusDraft):
  272. countSubtitle.Draft += int64(docCount)
  273. case fmt.Sprint(model.SubtitleStatusToAudit):
  274. countSubtitle.ToAudit += int64(docCount)
  275. case fmt.Sprint(model.SubtitleStatusAuditBack):
  276. countSubtitle.AuditBack += int64(docCount)
  277. case fmt.Sprint(model.SubtitleStatusPublish):
  278. countSubtitle.Publish += int64(docCount)
  279. case fmt.Sprint(model.SubtitleStatusCheckToAudit):
  280. countSubtitle.ToAudit += int64(docCount)
  281. case fmt.Sprint(model.SubtitleStatusCheckPublish):
  282. countSubtitle.Publish += int64(docCount)
  283. case fmt.Sprint(model.SubtitleStatusManagerBack):
  284. countSubtitle.AuditBack += int64(docCount)
  285. }
  286. }
  287. }
  288. return
  289. }
  290. // UptSearchRecentState .
  291. func (d *Dao) UptSearchRecentState(c context.Context, dmids []int64, oid int64, state, tp int32) (err error) {
  292. upt := d.elastic.NewUpdate("dm_home")
  293. year, month, _ := time.Now().Date()
  294. yearPre, monthPre, _ := time.Now().AddDate(0, -1, 0).Date()
  295. for _, dmid := range dmids {
  296. data := &model.UptSearchDMState{
  297. ID: dmid,
  298. Oid: oid,
  299. State: state,
  300. Type: tp,
  301. Mtime: time.Now().Format("2006-01-02 15:04:05"),
  302. }
  303. upt.AddData(fmt.Sprintf("dm_home_%v",
  304. fmt.Sprintf("%d_%02d", yearPre, int(monthPre)),
  305. ), data)
  306. upt.AddData(fmt.Sprintf("dm_home_%v",
  307. fmt.Sprintf("%d_%02d", year, int(month)),
  308. ), data)
  309. }
  310. if err = upt.Do(c); err != nil {
  311. log.Error("update.Do() params(%s) error(%v)", upt.Params(), err)
  312. }
  313. return
  314. }
  315. // UptSearchRecentAttr .
  316. func (d *Dao) UptSearchRecentAttr(c context.Context, dmids []int64, oid int64, attr, tp int32) (err error) {
  317. var bits []int64
  318. for k, v := range strconv.FormatInt(int64(attr), 2) {
  319. if v == 49 {
  320. bits = append(bits, int64(k+1))
  321. }
  322. }
  323. upt := d.elastic.NewUpdate("dm_home")
  324. year, month, _ := time.Now().Date()
  325. yearPre, monthPre, _ := time.Now().AddDate(0, -1, 0).Date()
  326. for _, dmid := range dmids {
  327. data := &model.UptSearchDMAttr{
  328. ID: dmid,
  329. Oid: oid,
  330. Attr: attr,
  331. AttrFormat: bits,
  332. Type: tp,
  333. Mtime: time.Now().Format("2006-01-02 15:04:05"),
  334. }
  335. upt.AddData(fmt.Sprintf("dm_home_%v",
  336. fmt.Sprintf("%d_%02d", yearPre, int(monthPre)),
  337. ), data)
  338. upt.AddData(fmt.Sprintf("dm_home_%v",
  339. fmt.Sprintf("%d_%02d", year, int(month)),
  340. ), data)
  341. }
  342. if err = upt.Do(c); err != nil {
  343. log.Error("update.Do() params(%s) error(%v)", upt.Params(), err)
  344. }
  345. return
  346. }
  347. // UptSearchRecentPool .
  348. func (d *Dao) UptSearchRecentPool(c context.Context, dmids []int64, oid int64, pool, tp int32) (err error) {
  349. upt := d.elastic.NewUpdate("dm_home")
  350. year, month, _ := time.Now().Date()
  351. yearPre, monthPre, _ := time.Now().AddDate(0, -1, 0).Date()
  352. for _, dmid := range dmids {
  353. data := &model.UptSearchDMPool{
  354. ID: dmid,
  355. Oid: oid,
  356. Pool: pool,
  357. Type: tp,
  358. Mtime: time.Now().Format("2006-01-02 15:04:05"),
  359. }
  360. upt.AddData(fmt.Sprintf("dm_home_%v",
  361. fmt.Sprintf("%d_%02d", yearPre, int(monthPre)),
  362. ), data)
  363. upt.AddData(fmt.Sprintf("dm_home_%v",
  364. fmt.Sprintf("%d_%02d", year, int(month)),
  365. ), data)
  366. }
  367. if err = upt.Do(c); err != nil {
  368. log.Error("update.Do() params(%s) error(%v)", upt.Params(), err)
  369. }
  370. return
  371. }
  372. // SearhcDmRecent .
  373. func (d *Dao) SearhcDmRecent(c context.Context, param *model.SearchRecentDMParam) (res *model.SearchRecentDMResult, err error) {
  374. var (
  375. req *elastic.Request
  376. )
  377. year, month, _ := time.Now().Date()
  378. yearPre, monthPre, _ := time.Now().AddDate(0, -1, 0).Date()
  379. req = d.elastic.NewRequest("dm_home").Index(fmt.Sprintf("dm_home_%v,dm_home_%v",
  380. fmt.Sprintf("%d_%02d", year, int(month)),
  381. fmt.Sprintf("%d_%02d", yearPre, int(monthPre)),
  382. )).Fields(_dmRecentFields...).Pn(param.Pn).Ps(param.Ps)
  383. if param.Type > 0 {
  384. req.WhereEq("type", param.Type)
  385. }
  386. if param.UpMid > 0 {
  387. req.WhereEq("o_mid", param.UpMid)
  388. }
  389. if len(param.States) > 0 {
  390. req.WhereIn("state", param.States)
  391. }
  392. req.WhereRange("ctime", time.Now().Local().AddDate(0, 0, -30).Format(_dataTimeFormat), time.Now().Local().Format(_dataTimeFormat), elastic.RangeScopeLcRo)
  393. req.Order(param.Field, param.Sort)
  394. res = &model.SearchRecentDMResult{}
  395. if err = req.Scan(c, &res); err != nil {
  396. log.Error("search params(%s), error(%v)", req.Params(), err)
  397. }
  398. return
  399. }