archive.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "time"
  7. "go-common/app/admin/main/search/model"
  8. "gopkg.in/olivere/elastic.v5"
  9. )
  10. // ArchiveCheck search archive check from ES.
  11. func (d *Dao) ArchiveCheck(c context.Context, p *model.ArchiveCheckParams) (res *model.SearchResult, err error) {
  12. query := elastic.NewBoolQuery()
  13. if len(p.Bsp.KWs) > 0 {
  14. for _, v := range p.Bsp.KWs {
  15. if p.Bsp.Pattern == "equal" {
  16. query = query.Must(elastic.NewMultiMatchQuery(v, p.Bsp.KwFields...).Type("best_fields").TieBreaker(0.3).MinimumShouldMatch("100%"))
  17. } else {
  18. query = query.Should(elastic.NewMultiMatchQuery(v, p.Bsp.KwFields...).Type("best_fields").TieBreaker(0.3).MinimumShouldMatch("80%")).MinimumNumberShouldMatch(1)
  19. }
  20. }
  21. } else if p.Bsp.KW != "" { //高级搜索比下面的高
  22. query = query.Must(elastic.NewMultiMatchQuery(p.Bsp.KW, p.Bsp.KwFields...).Type("best_fields").TieBreaker(0.3).MinimumShouldMatch("100%"))
  23. }
  24. if p.FromIP != "" {
  25. query = query.Must(elastic.NewQueryStringQuery("*" + p.FromIP + "*").AllowLeadingWildcard(true).Field("from_ip"))
  26. }
  27. if len(p.Aids) > 0 {
  28. interfaceSlice := make([]interface{}, len(p.Aids))
  29. for i, d := range p.Aids {
  30. interfaceSlice[i] = d
  31. }
  32. query = query.Filter(elastic.NewTermsQuery("aid", interfaceSlice...))
  33. }
  34. if len(p.TypeIds) > 0 {
  35. interfaceSlice := make([]interface{}, len(p.TypeIds))
  36. for i, d := range p.TypeIds {
  37. interfaceSlice[i] = d
  38. }
  39. query = query.Filter(elastic.NewTermsQuery("typeid", interfaceSlice...))
  40. }
  41. if len(p.Attrs) > 0 {
  42. interfaceSlice := make([]interface{}, len(p.Attrs))
  43. for i, d := range p.Attrs {
  44. interfaceSlice[i] = d
  45. }
  46. query = query.Filter(elastic.NewTermsQuery("attribute", interfaceSlice...))
  47. }
  48. if len(p.States) > 0 {
  49. interfaceSlice := make([]interface{}, len(p.States))
  50. for i, d := range p.States {
  51. interfaceSlice[i] = d
  52. }
  53. query = query.Filter(elastic.NewTermsQuery("state", interfaceSlice...))
  54. }
  55. if len(p.Mids) > 0 {
  56. interfaceSlice := make([]interface{}, len(p.Mids))
  57. for i, d := range p.Mids {
  58. interfaceSlice[i] = d
  59. }
  60. query = query.Filter(elastic.NewTermsQuery("mid", interfaceSlice...))
  61. }
  62. if p.MidFrom > 0 {
  63. query = query.Filter(elastic.NewRangeQuery("mid").Gte(p.MidFrom))
  64. }
  65. if p.MidTo > 0 {
  66. query = query.Filter(elastic.NewRangeQuery("mid").Lte(p.MidTo))
  67. }
  68. if p.DurationFrom > 0 {
  69. query = query.Filter(elastic.NewRangeQuery("duration").Gte(p.DurationFrom))
  70. }
  71. if p.DurationTo > 0 {
  72. query = query.Filter(elastic.NewRangeQuery("duration").Lte(p.DurationTo))
  73. }
  74. if p.TimeFrom != "" && (p.Time == "ctime" || p.Time == "mtime" || p.Time == "pubtime") {
  75. query = query.Filter(elastic.NewRangeQuery(p.Time).Gte(p.TimeFrom))
  76. }
  77. if p.TimeTo != "" && (p.Time == "ctime" || p.Time == "mtime" || p.Time == "pubtime") {
  78. query = query.Filter(elastic.NewRangeQuery(p.Time).Lte(p.TimeTo))
  79. }
  80. if res, err = d.searchResult(c, "ssd_archive", "archivecheck", query, p.Bsp); err != nil {
  81. PromError(fmt.Sprintf("es:%s ", p.Bsp.AppID), "%v", err)
  82. }
  83. return
  84. }
  85. // Video search video from ES (deprecated).
  86. func (d *Dao) Video(c context.Context, p *model.VideoParams) (res *model.SearchResult, err error) {
  87. query := elastic.NewBoolQuery()
  88. if p.Bsp.KW != "" {
  89. query = query.Must(elastic.NewMultiMatchQuery(p.Bsp.KW, p.Bsp.KwFields...).Type("best_fields").TieBreaker(0.3))
  90. }
  91. if len(p.VIDs) > 0 {
  92. interfaceSlice := make([]interface{}, len(p.VIDs))
  93. for i, d := range p.VIDs {
  94. interfaceSlice[i] = d
  95. }
  96. query = query.Filter(elastic.NewTermsQuery("vid", interfaceSlice...))
  97. }
  98. if len(p.AIDs) > 0 {
  99. interfaceSlice := make([]interface{}, len(p.AIDs))
  100. for i, d := range p.AIDs {
  101. interfaceSlice[i] = d
  102. }
  103. query = query.Filter(elastic.NewTermsQuery("aid", interfaceSlice...))
  104. }
  105. if len(p.CIDs) > 0 {
  106. interfaceSlice := make([]interface{}, len(p.CIDs))
  107. for i, d := range p.CIDs {
  108. interfaceSlice[i] = d
  109. }
  110. query = query.Filter(elastic.NewTermsQuery("cid", interfaceSlice...))
  111. }
  112. if len(p.TIDs) > 0 {
  113. interfaceSlice := make([]interface{}, len(p.TIDs))
  114. for i, d := range p.TIDs {
  115. interfaceSlice[i] = d
  116. }
  117. query = query.Filter(elastic.NewTermsQuery("arc_typeid", interfaceSlice...))
  118. }
  119. if len(p.FileNames) > 0 {
  120. interfaceSlice := make([]interface{}, len(p.FileNames))
  121. for i, d := range p.FileNames {
  122. interfaceSlice[i] = d
  123. }
  124. query = query.Filter(elastic.NewTermsQuery("filename", interfaceSlice...))
  125. }
  126. if len(p.RelationStates) > 0 {
  127. interfaceSlice := make([]interface{}, len(p.RelationStates))
  128. for i, d := range p.RelationStates {
  129. interfaceSlice[i] = d
  130. }
  131. query = query.Filter(elastic.NewTermsQuery("relation_state", interfaceSlice...))
  132. }
  133. if len(p.ArcMids) > 0 {
  134. interfaceSlice := make([]interface{}, len(p.ArcMids))
  135. for i, d := range p.ArcMids {
  136. interfaceSlice[i] = d
  137. }
  138. query = query.Filter(elastic.NewTermsQuery("arc_mid", interfaceSlice...))
  139. }
  140. if len(p.ArcMids) > 0 {
  141. interfaceSlice := make([]interface{}, len(p.ArcMids))
  142. for i, d := range p.ArcMids {
  143. interfaceSlice[i] = d
  144. }
  145. query = query.Filter(elastic.NewTermsQuery("arc_mid", interfaceSlice...))
  146. }
  147. if p.TagID > 0 {
  148. query = query.Filter(elastic.NewTermQuery("tag_id", p.TagID))
  149. }
  150. if len(p.Status) > 0 {
  151. interfaceSlice := make([]interface{}, len(p.Status))
  152. for i, d := range p.Status {
  153. interfaceSlice[i] = d
  154. }
  155. query = query.Filter(elastic.NewTermsQuery("status", interfaceSlice...))
  156. }
  157. if len(p.XCodeState) > 0 {
  158. interfaceSlice := make([]interface{}, len(p.XCodeState))
  159. for i, d := range p.XCodeState {
  160. interfaceSlice[i] = d
  161. }
  162. query = query.Filter(elastic.NewTermsQuery("xcode_state", interfaceSlice...))
  163. }
  164. // 不再查库过滤arc_mid
  165. if p.UserType > 0 {
  166. query = query.Filter(elastic.NewTermQuery("user_type", p.UserType))
  167. }
  168. if p.DurationFrom > 0 {
  169. query = query.Filter(elastic.NewRangeQuery("duration").Gte(p.DurationFrom))
  170. }
  171. if p.DurationTo > 0 {
  172. query = query.Filter(elastic.NewRangeQuery("duration").Lte(p.DurationTo))
  173. }
  174. if p.OrderType == 1 {
  175. diffs := time.Now().Unix() - 1420041600
  176. days := fmt.Sprintf("%dd", diffs/(3600*24))
  177. score := elastic.NewFunctionScoreQuery().Add(elastic.NewTermQuery("user_type", 1), elastic.NewExponentialDecayFunction().FieldName("arc_senddate").Origin("2015-01-01 00:00:00").Scale(days).Offset("1d").Decay(0.8).Weight(float64(10000))).Add(nil, elastic.NewExponentialDecayFunction().FieldName("arc_senddate").Origin("2015-01-01 00:00:00").Scale(days).Offset("1d").Decay(0.8).Weight(float64(1)))
  178. query = query.Must(score)
  179. p.Bsp.Order = []string{}
  180. }
  181. if res, err = d.searchResult(c, "ssd_archive", "archive_video", query, p.Bsp); err != nil {
  182. PromError(fmt.Sprintf("es:%s ", p.Bsp.AppID), "%v", err)
  183. }
  184. return
  185. }
  186. // TaskQa .
  187. func (d *Dao) TaskQa(c context.Context, p *model.TaskQa) (res *model.SearchResult, err error) {
  188. query := elastic.NewBoolQuery()
  189. if p.Bsp.KW != "" {
  190. query = query.Must(elastic.NewMultiMatchQuery(p.Bsp.KW, p.Bsp.KwFields...).Type("best_fields").TieBreaker(0.3))
  191. }
  192. if len(p.Ids) > 0 {
  193. interfaceSlice := make([]interface{}, len(p.Ids))
  194. for i, d := range p.Ids {
  195. interfaceSlice[i] = d
  196. }
  197. query = query.Filter(elastic.NewTermsQuery("id", interfaceSlice...))
  198. }
  199. if len(p.TaskIds) > 0 {
  200. interfaceSlice := make([]interface{}, len(p.TaskIds))
  201. for i, d := range p.TaskIds {
  202. interfaceSlice[i] = d
  203. }
  204. query = query.Filter(elastic.NewTermsQuery("task_id", interfaceSlice...))
  205. }
  206. if len(p.Uids) > 0 {
  207. interfaceSlice := make([]interface{}, len(p.Uids))
  208. for i, d := range p.Uids {
  209. interfaceSlice[i] = d
  210. }
  211. query = query.Filter(elastic.NewTermsQuery("uid", interfaceSlice...))
  212. }
  213. if len(p.ArcTagIds) > 0 {
  214. interfaceSlice := make([]interface{}, len(p.ArcTagIds))
  215. for i, d := range p.ArcTagIds {
  216. interfaceSlice[i] = d
  217. }
  218. query = query.Filter(elastic.NewTermsQuery("arc_tagid", interfaceSlice...))
  219. }
  220. if len(p.AuditTagIds) > 0 {
  221. interfaceSlice := make([]interface{}, len(p.AuditTagIds))
  222. for i, d := range p.AuditTagIds {
  223. interfaceSlice[i] = d
  224. }
  225. query = query.Filter(elastic.NewTermsQuery("audit_tagid", interfaceSlice...))
  226. }
  227. if len(p.UpGroups) > 0 {
  228. interfaceSlice := make([]interface{}, len(p.UpGroups))
  229. for i, d := range p.UpGroups {
  230. interfaceSlice[i] = d
  231. }
  232. query = query.Filter(elastic.NewTermsQuery("up_groups", interfaceSlice...))
  233. }
  234. if len(p.ArcTitles) > 0 {
  235. interfaceSlice := make([]interface{}, len(p.ArcTitles))
  236. for i, d := range p.ArcTitles {
  237. interfaceSlice[i] = d
  238. }
  239. query = query.Filter(elastic.NewTermsQuery("arc_title", interfaceSlice...))
  240. }
  241. if len(p.ArcTypeIds) > 0 {
  242. interfaceSlice := make([]interface{}, len(p.ArcTypeIds))
  243. for i, d := range p.ArcTypeIds {
  244. interfaceSlice[i] = d
  245. }
  246. query = query.Filter(elastic.NewTermsQuery("arc_typeid", interfaceSlice...))
  247. }
  248. if len(p.States) > 0 {
  249. interfaceSlice := make([]interface{}, len(p.States))
  250. for i, d := range p.States {
  251. interfaceSlice[i] = d
  252. }
  253. query = query.Filter(elastic.NewTermsQuery("state", interfaceSlice...))
  254. }
  255. if len(p.AuditStatuses) > 0 {
  256. interfaceSlice := make([]interface{}, len(p.AuditStatuses))
  257. for i, d := range p.AuditStatuses {
  258. interfaceSlice[i] = d
  259. }
  260. query = query.Filter(elastic.NewTermsQuery("audit_status", interfaceSlice...))
  261. }
  262. if p.FansFrom != "" {
  263. query = query.Filter(elastic.NewRangeQuery("fans").Gte(p.FansFrom))
  264. }
  265. if p.FansTo != "" {
  266. query = query.Filter(elastic.NewRangeQuery("fans").Lte(p.FansTo))
  267. }
  268. if p.CtimeFrom != "" {
  269. query = query.Filter(elastic.NewRangeQuery("ctime").Gte(p.CtimeFrom))
  270. }
  271. if p.CtimeTo != "" {
  272. query = query.Filter(elastic.NewRangeQuery("ctime").Lte(p.CtimeTo))
  273. }
  274. if p.FtimeFrom != "" {
  275. query = query.Filter(elastic.NewRangeQuery("ftime").Gte(p.FtimeFrom))
  276. }
  277. if p.FtimeTo != "" {
  278. query = query.Filter(elastic.NewRangeQuery("ftime").Lte(p.FtimeTo))
  279. }
  280. if res, err = d.searchResult(c, "ssd_archive", p.Bsp.AppID, query, p.Bsp); err != nil {
  281. PromError(fmt.Sprintf("es:%s ", p.Bsp.AppID), "%v", err)
  282. }
  283. return
  284. }
  285. // ArchiveCommerce .
  286. func (d *Dao) ArchiveCommerce(c context.Context, p *model.ArchiveCommerce) (res *model.SearchResult, err error) {
  287. query := elastic.NewBoolQuery()
  288. if p.Bsp.KW != "" {
  289. query = query.Must(elastic.NewMultiMatchQuery(p.Bsp.KW, p.Bsp.KwFields...).Type("best_fields").TieBreaker(0.3))
  290. }
  291. if len(p.Ids) > 0 {
  292. interfaceSlice := make([]interface{}, len(p.Ids))
  293. for i, d := range p.Ids {
  294. interfaceSlice[i] = d
  295. }
  296. query = query.Filter(elastic.NewTermsQuery("id", interfaceSlice...))
  297. }
  298. if len(p.Mids) > 0 {
  299. interfaceSlice := make([]interface{}, len(p.Mids))
  300. for i, d := range p.Mids {
  301. interfaceSlice[i] = d
  302. }
  303. query = query.Filter(elastic.NewTermsQuery("mid", interfaceSlice...))
  304. }
  305. if len(p.PTypeIds) > 0 {
  306. interfaceSlice := make([]interface{}, len(p.PTypeIds))
  307. for i, d := range p.PTypeIds {
  308. interfaceSlice[i] = d
  309. }
  310. query = query.Filter(elastic.NewTermsQuery("ptypeid", interfaceSlice...))
  311. }
  312. if len(p.TypeIds) > 0 {
  313. interfaceSlice := make([]interface{}, len(p.TypeIds))
  314. for i, d := range p.TypeIds {
  315. interfaceSlice[i] = d
  316. }
  317. query = query.Filter(elastic.NewTermsQuery("typeid", interfaceSlice...))
  318. }
  319. if len(p.States) > 0 {
  320. interfaceSlice := make([]interface{}, len(p.States))
  321. for i, d := range p.States {
  322. interfaceSlice[i] = d
  323. }
  324. query = query.Filter(elastic.NewTermsQuery("state", interfaceSlice...))
  325. }
  326. if len(p.Copyrights) > 0 {
  327. interfaceSlice := make([]interface{}, len(p.Copyrights))
  328. for i, d := range p.Copyrights {
  329. interfaceSlice[i] = d
  330. }
  331. query = query.Filter(elastic.NewTermsQuery("copyright", interfaceSlice...))
  332. }
  333. if len(p.OrderIds) > 0 {
  334. interfaceSlice := make([]interface{}, len(p.OrderIds))
  335. for i, d := range p.OrderIds {
  336. interfaceSlice[i] = d
  337. }
  338. query = query.Filter(elastic.NewTermsQuery("order_id", interfaceSlice...))
  339. }
  340. if p.IsOrder == 1 {
  341. query = query.Filter(elastic.NewRangeQuery("order_id").Gt(0))
  342. }
  343. if p.IsOrder == 0 {
  344. query = query.MustNot(elastic.NewRangeQuery("order_id").Gt(0))
  345. }
  346. if p.IsOriginal == 1 {
  347. query = query.Filter(elastic.NewTermsQuery("copyright", 1))
  348. }
  349. if p.IsOriginal == 0 {
  350. query = query.MustNot(elastic.NewTermsQuery("copyright", 1))
  351. }
  352. if p.Action == "get_ptypeids" {
  353. if res, err = d.ArchiveCommercePTypeIds(c, query); err != nil {
  354. PromError(fmt.Sprintf("es:%s ", p.Bsp.AppID), "%v", err)
  355. }
  356. return
  357. }
  358. if res, err = d.searchResult(c, "ssd_archive", "archive_commerce_v", query, p.Bsp); err != nil {
  359. PromError(fmt.Sprintf("es:%s ", p.Bsp.AppID), "%v", err)
  360. }
  361. return
  362. }
  363. // ArchiveCommercePTypeIds .
  364. func (d *Dao) ArchiveCommercePTypeIds(c context.Context, query *elastic.BoolQuery) (res *model.SearchResult, err error) {
  365. res = &model.SearchResult{
  366. Result: []json.RawMessage{},
  367. Page: &model.Page{},
  368. }
  369. aggs := elastic.NewTermsAggregation()
  370. aggs = aggs.Field("ptypeid").Size(1000)
  371. if _, ok := d.esPool["ssd_archive"]; !ok {
  372. PromError(fmt.Sprintf("es:集群不存在%s", "ssd_archive"), "s.dao.searchResult indexName:%s", "ssd_archive")
  373. res = &model.SearchResult{Debug: fmt.Sprintf("es:集群不存在%s, %s", "ssd_archive", res.Debug)}
  374. return
  375. }
  376. searchResult, err := d.esPool["ssd_archive"].Search().Index("archive_commerce_v").Query(query).Aggregation("group_by_ptypeid", aggs).Size(0).Do(context.Background())
  377. if err != nil {
  378. PromError(fmt.Sprintf("es:执行查询失败%s ", "ArchiveCommercePTypeIds"), "dao.log.ArchiveCommercePTypeIds(%v)", err)
  379. return
  380. }
  381. result, ok := searchResult.Aggregations.Terms("group_by_ptypeid")
  382. if !ok {
  383. PromError(fmt.Sprintf("es:Unmarshal%s ", "log"), "es:Unmarshal%v", err)
  384. return
  385. }
  386. for _, v := range result.Buckets {
  387. res.Result = append(res.Result, []byte(v.Key.(string)))
  388. }
  389. res.Page.Pn = 1
  390. res.Page.Ps = 1000
  391. res.Page.Total = int64(len(res.Result))
  392. return
  393. }