query_extra.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "go-common/app/admin/main/search/model"
  9. "go-common/library/ecode"
  10. "go-common/library/log"
  11. "gopkg.in/olivere/elastic.v5"
  12. )
  13. // ArchiveVideoScore 稿件一审打分排序.
  14. func (d *Dao) ArchiveVideoScore(c context.Context, sp *model.QueryParams) (res *model.QueryResult, debug *model.QueryDebugResult, err error) {
  15. query, qbDebug := d.QueryBasic(c, sp)
  16. // query append
  17. diffs := time.Now().Unix() - 1420041600
  18. days := fmt.Sprintf("%dd", diffs/(3600*24))
  19. score := elastic.NewFunctionScoreQuery().Add(elastic.NewTermQuery("user_type", 1), elastic.NewExponentialDecayFunction().FieldName("arc_senddate").Origin("2015-01-01 00:00:00").Scale(days).Offset("1d").Decay(0.8).Weight(float64(10000))).Add(nil, elastic.NewExponentialDecayFunction().FieldName("arc_senddate").Origin("2015-01-01 00:00:00").Scale(days).Offset("1d").Decay(0.8).Weight(float64(1)))
  20. query = query.Must(score)
  21. sp.QueryBody.Order = []map[string]string{}
  22. // do
  23. if res, debug, err = d.QueryResult(c, query, sp, qbDebug); err != nil {
  24. PromError(fmt.Sprintf("es:%s ", sp.Business), "%v", err)
  25. }
  26. return
  27. }
  28. // ArchiveScore 稿件二审打分排序.
  29. func (d *Dao) ArchiveScore(c context.Context, sp *model.QueryParams) (res *model.QueryResult, debug *model.QueryDebugResult, err error) {
  30. query, qbDebug := d.QueryBasic(c, sp)
  31. // query append
  32. diffs := time.Now().Unix() - 1420041600
  33. days := fmt.Sprintf("%dd", diffs/(3600*24))
  34. score := elastic.NewFunctionScoreQuery().Add(elastic.NewTermQuery("user_type", 1), elastic.NewExponentialDecayFunction().FieldName("ctime").Origin("2015-01-01 00:00:00").Scale(days).Offset("1d").Decay(0.8).Weight(float64(10000))).Add(nil, elastic.NewExponentialDecayFunction().FieldName("ctime").Origin("2015-01-01 00:00:00").Scale(days).Offset("1d").Decay(0.8).Weight(float64(1)))
  35. query = query.Must(score)
  36. sp.QueryBody.Order = []map[string]string{}
  37. // do
  38. if res, debug, err = d.QueryResult(c, query, sp, qbDebug); err != nil {
  39. PromError(fmt.Sprintf("es:%s ", sp.Business), "%v", err)
  40. }
  41. return
  42. }
  43. // TaskQaRandom .
  44. func (d *Dao) TaskQaRandom(c context.Context, sp *model.QueryParams) (res *model.QueryResult, debug *model.QueryDebugResult, err error) {
  45. random := elastic.NewRandomFunction()
  46. if sp != nil && sp.QueryBody != nil && sp.QueryBody.Where != nil && sp.QueryBody.Where.EQ != nil {
  47. if seed, ok := sp.QueryBody.Where.EQ["seed"]; ok {
  48. random = elastic.NewRandomFunction().Seed(seed)
  49. delete(sp.QueryBody.Where.EQ, "seed")
  50. }
  51. }
  52. query, qbDebug := d.QueryBasic(c, sp)
  53. if err != nil {
  54. PromError(fmt.Sprintf("es basic:%s ", sp.Business), "%v", err)
  55. }
  56. score := elastic.NewFunctionScoreQuery().Add(elastic.NewBoolQuery(), random)
  57. qy := elastic.NewBoolQuery().Must(query, score)
  58. if res, debug, err = d.QueryResult(c, qy, sp, qbDebug); err != nil {
  59. PromError(fmt.Sprintf("es:%s ", sp.Business), "%v", err)
  60. }
  61. return
  62. }
  63. // EsportsContestsDate 电竞右侧日历联动.
  64. func (d *Dao) EsportsContestsDate(c context.Context, sp *model.QueryParams) (res *model.QueryResult, debug *model.QueryDebugResult, err error) {
  65. res = &model.QueryResult{}
  66. // query basic
  67. query, qbDebug := d.QueryBasic(c, sp)
  68. debug = qbDebug
  69. esCluster := sp.AppIDConf.ESCluster
  70. if _, ok := d.esPool[esCluster]; !ok {
  71. debug.AddErrMsg("es:集群不存在" + esCluster)
  72. return
  73. }
  74. aggs := elastic.NewTermsAggregation()
  75. fsc := elastic.NewFetchSourceContext(true).Include("ids")
  76. aggs = aggs.Field("stime").Size(1000).SubAggregation("top_ids_hits", elastic.NewTopHitsAggregation().FetchSourceContext(fsc).Size(1000))
  77. searchPrepare := d.esPool[esCluster].Search().Index(sp.QueryBody.From).Query(query).Aggregation("group_by_stime", aggs).Size(0)
  78. if sp.DebugLevel == 2 {
  79. searchPrepare.Profile(true)
  80. }
  81. searchResult, err := searchPrepare.Do(context.Background())
  82. if err != nil {
  83. debug.AddErrMsg(fmt.Sprintf("es:执行查询失败%s. %v", esCluster, err))
  84. PromError(fmt.Sprintf("es:执行查询失败%s ", esCluster), "%v", err)
  85. return
  86. }
  87. result, ok := searchResult.Aggregations.Terms("group_by_stime")
  88. if !ok {
  89. return
  90. }
  91. type hitDoc struct {
  92. Hits []struct {
  93. Source struct {
  94. IDs []string `json:"ids"`
  95. } `json:"_source"`
  96. } `json:"hits"`
  97. }
  98. type idsRes struct {
  99. Date string
  100. IDs []string
  101. }
  102. ids := []idsRes{}
  103. for _, b := range result.Buckets {
  104. var hit hitDoc
  105. //b.KeyAsString
  106. if list, ok := b.Terms("top_ids_hits"); ok {
  107. a, _ := list.Aggregations["hits"].MarshalJSON()
  108. if err = json.Unmarshal(a, &hit); err != nil {
  109. return
  110. }
  111. for _, h := range hit.Hits {
  112. ids = append(ids, idsRes{
  113. Date: *b.KeyAsString,
  114. IDs: h.Source.IDs,
  115. })
  116. }
  117. }
  118. }
  119. resDoc := map[string]int{}
  120. resDocTmp := map[string]map[string]bool{}
  121. for _, v := range ids {
  122. if _, ok := resDocTmp[v.Date]; !ok {
  123. resDocTmp[v.Date] = map[string]bool{}
  124. }
  125. for _, id := range v.IDs {
  126. resDocTmp[v.Date][id] = true
  127. }
  128. }
  129. for date, idList := range resDocTmp {
  130. resDoc[date] = len(idList)
  131. }
  132. if doc, er := json.Marshal(resDoc); er != nil {
  133. debug.AddErrMsg(fmt.Sprintf("es:Unmarshal docBuckets es:Unmarshal%v", er))
  134. } else {
  135. res.Result = doc
  136. }
  137. return
  138. }
  // Archive state codes grouped by the state names accepted in the "state"
  // condition of the creative-center searches below.
  var (
  	// _pubed is the state set selected by "pubed".
  	_pubed = []interface{}{
  		-40, 0, 10000, 1, 1001, 15000, 20000, 30000,
  	}
  	// _notpubed is the state set selected by "not_pubed".
  	_notpubed = []interface{}{
  		-2, -4, -5, -11, -12, -16,
  	}
  	// _ispubing is the state set selected by "is_pubing".
  	_ispubing = []interface{}{
  		-1, -6, -7, -8, -9, -10, -13, -15, -30,
  	}
  	// _all is the concatenation of _pubed, _notpubed and _ispubing, in that order.
  	_all = func() []interface{} {
  		merged := make([]interface{}, 0, len(_pubed)+len(_notpubed)+len(_ispubing))
  		merged = append(merged, _pubed...)
  		merged = append(merged, _notpubed...)
  		merged = append(merged, _ispubing...)
  		return merged
  	}()
  )
  145. // CreativeArchiveSearch 创作中心
  146. func (d *Dao) CreativeArchiveSearch(c context.Context, sp *model.QueryParams) (res *model.QueryResult, debug *model.QueryDebugResult, err error) {
  147. var (
  148. mid interface{}
  149. ok bool
  150. )
  151. docBuckets := map[string]interface{}{}
  152. if sp == nil && sp.QueryBody == nil && sp.QueryBody.Where == nil && sp.QueryBody.Where.EQ == nil {
  153. return res, debug, ecode.RequestErr
  154. }
  155. if mid, ok = sp.QueryBody.Where.EQ["mid"]; !ok {
  156. return res, debug, ecode.RequestErr
  157. }
  158. // 列表
  159. if state, ok := sp.QueryBody.Where.EQ["state"]; ok {
  160. if sp.QueryBody.Where.In == nil {
  161. sp.QueryBody.Where.In = map[string][]interface{}{}
  162. }
  163. switch state {
  164. case "pubed":
  165. sp.QueryBody.Where.In["state"] = _pubed
  166. case "not_pubed":
  167. sp.QueryBody.Where.In["state"] = _notpubed
  168. case "is_pubing":
  169. sp.QueryBody.Where.In["state"] = _ispubing
  170. default:
  171. sp.QueryBody.Where.In["state"] = _all
  172. }
  173. delete(sp.QueryBody.Where.EQ, "state")
  174. } else {
  175. if sp.QueryBody.Where.In == nil {
  176. sp.QueryBody.Where.In = map[string][]interface{}{}
  177. }
  178. sp.QueryBody.Where.In["state"] = _all
  179. }
  180. query, qbDebug := d.QueryBasic(c, sp)
  181. if res, debug, err = d.QueryResult(c, query, sp, qbDebug); err != nil {
  182. PromError(fmt.Sprintf("es:%s ", sp.Business), "%v", err)
  183. return
  184. }
  185. docBuckets["vlist"] = res.Result
  186. // 类型统计
  187. typeFilter := elastic.NewBoolQuery().Must(elastic.NewTermsQuery("mid", mid))
  188. typeFilter = typeFilter.Filter(elastic.NewTermsQuery("state", _all...))
  189. for _, v := range sp.QueryBody.Where.Like {
  190. typeFilter = typeFilter.Filter(elastic.NewMultiMatchQuery(strings.Join(v.KW, " "), v.KWFields...).Type("best_fields").TieBreaker(0.6).MinimumShouldMatch("100%"))
  191. }
  192. typeAgg := elastic.NewTermsAggregation().Field("pid")
  193. request1 := elastic.NewSearchRequest().Index(sp.QueryBody.From).Type("base").Source(elastic.NewSearchSource().Query(typeFilter).Aggregation("pid", typeAgg))
  194. // 状态统计
  195. stateFilter := elastic.NewBoolQuery().Filter(elastic.NewTermsQuery("mid", mid))
  196. if pid, ok := sp.QueryBody.Where.EQ["pid"]; ok {
  197. stateFilter = stateFilter.Filter(elastic.NewTermsQuery("pid", pid))
  198. }
  199. for _, v := range sp.QueryBody.Where.Like {
  200. stateFilter = typeFilter.Filter(elastic.NewMultiMatchQuery(strings.Join(v.KW, " "), v.KWFields...).Type("best_fields").TieBreaker(0.6).MinimumShouldMatch("100%"))
  201. }
  202. stateAgg := elastic.NewFiltersAggregation().
  203. FilterWithName("pubed", elastic.NewTermsQuery("state", _pubed...)).
  204. FilterWithName("not_pubed", elastic.NewTermsQuery("state", _notpubed...)).
  205. FilterWithName("is_pubing", elastic.NewTermsQuery("state", _ispubing...))
  206. request2 := elastic.NewSearchRequest().Index(sp.QueryBody.From).Type("base").Source(elastic.NewSearchSource().Query(stateFilter).Aggregation("state", stateAgg))
  207. MultiRes, err := d.esPool[sp.AppIDConf.ESCluster].MultiSearch().Add(request1, request2).Do(c)
  208. if err != nil {
  209. PromError(fmt.Sprintf("es:%s ", sp.Business), "%v", err)
  210. return
  211. }
  212. // 取得数据
  213. tmp := map[string]interface{}{}
  214. json.Unmarshal(*MultiRes.Responses[0].Aggregations["pid"], &tmp)
  215. docBuckets["tlist"] = tmp["buckets"]
  216. tmp = map[string]interface{}{}
  217. json.Unmarshal(*MultiRes.Responses[1].Aggregations["state"], &tmp)
  218. docBuckets["plist"] = tmp["buckets"]
  219. if resResult, e := json.Marshal(docBuckets); e != nil {
  220. log.Error("CreativeArchiveSearch.json.error(%v)", e)
  221. } else {
  222. res.Result = resResult
  223. }
  224. return
  225. }
  226. // CreativeArchiveStaff 创作中心
  227. func (d *Dao) CreativeArchiveStaff(c context.Context, sp *model.QueryParams) (res *model.QueryResult, debug *model.QueryDebugResult, err error) {
  228. docBuckets := map[string]interface{}{}
  229. if sp == nil || sp.QueryBody == nil || sp.QueryBody.Where == nil || sp.QueryBody.Where.Combo == nil || len(sp.QueryBody.Where.Combo) != 1 {
  230. return res, debug, ecode.RequestErr
  231. }
  232. combo := sp.QueryBody.Where.Combo[0]
  233. if len(combo.EQ) == 0 {
  234. return res, debug, ecode.RequestErr
  235. }
  236. queryListParams := &model.QueryParams{
  237. QueryBody: &model.QueryBody{
  238. Where: &model.QueryBodyWhere{
  239. Combo: sp.QueryBody.Where.Combo,
  240. },
  241. },
  242. }
  243. queryList, _ := d.QueryBasic(c, queryListParams)
  244. // 列表
  245. if state, ok := sp.QueryBody.Where.EQ["state"]; ok {
  246. if sp.QueryBody.Where.In == nil {
  247. sp.QueryBody.Where.In = map[string][]interface{}{}
  248. }
  249. switch state {
  250. case "pubed":
  251. sp.QueryBody.Where.In["state"] = _pubed
  252. case "not_pubed":
  253. sp.QueryBody.Where.In["state"] = _notpubed
  254. case "is_pubing":
  255. sp.QueryBody.Where.In["state"] = _ispubing
  256. default:
  257. sp.QueryBody.Where.In["state"] = _all
  258. }
  259. delete(sp.QueryBody.Where.EQ, "state")
  260. } else {
  261. if sp.QueryBody.Where.In == nil {
  262. sp.QueryBody.Where.In = map[string][]interface{}{}
  263. }
  264. sp.QueryBody.Where.In["state"] = _all
  265. }
  266. query, qbDebug := d.QueryBasic(c, sp)
  267. if res, debug, err = d.QueryResult(c, query, sp, qbDebug); err != nil {
  268. PromError(fmt.Sprintf("es:%s ", sp.Business), "%v", err)
  269. return
  270. }
  271. docBuckets["vlist"] = res.Result
  272. // 类型统计
  273. typeFilter := elastic.NewBoolQuery().Filter(queryList)
  274. typeFilter = typeFilter.Filter(elastic.NewTermsQuery("state", _all...))
  275. for _, v := range sp.QueryBody.Where.Like {
  276. typeFilter = typeFilter.Filter(elastic.NewMultiMatchQuery(strings.Join(v.KW, " "), v.KWFields...).Type("best_fields").TieBreaker(0.6).MinimumShouldMatch("90%"))
  277. }
  278. typeAgg := elastic.NewTermsAggregation().Field("pid")
  279. request1 := elastic.NewSearchRequest().Index(sp.QueryBody.From).Type("base").Source(elastic.NewSearchSource().Query(typeFilter).Aggregation("pid", typeAgg).Size(0))
  280. // 状态统计
  281. stateFilter := elastic.NewBoolQuery().Filter(queryList)
  282. if pid, ok := sp.QueryBody.Where.EQ["pid"]; ok {
  283. stateFilter = stateFilter.Filter(elastic.NewTermsQuery("pid", pid))
  284. }
  285. for _, v := range sp.QueryBody.Where.Like {
  286. stateFilter = typeFilter.Filter(elastic.NewMultiMatchQuery(strings.Join(v.KW, " "), v.KWFields...).Type("best_fields").TieBreaker(0.6).MinimumShouldMatch("90%"))
  287. }
  288. stateAgg := elastic.NewFiltersAggregation().
  289. // 稿件状态
  290. FilterWithName("pubed", elastic.NewTermsQuery("state", _pubed...)).
  291. FilterWithName("not_pubed", elastic.NewTermsQuery("state", _notpubed...)).
  292. FilterWithName("is_pubing", elastic.NewTermsQuery("state", _ispubing...))
  293. request2 := elastic.NewSearchRequest().Index(sp.QueryBody.From).Type("base").Source(elastic.NewSearchSource().Query(stateFilter).Aggregation("state", stateAgg).Size(0))
  294. MultiRes, err := d.esPool[sp.AppIDConf.ESCluster].MultiSearch().Add(request1, request2).Do(c)
  295. if err != nil {
  296. PromError(fmt.Sprintf("es:%s ", sp.Business), "%v", err)
  297. return
  298. }
  299. // 取得数据
  300. tmp := map[string]interface{}{}
  301. json.Unmarshal(*MultiRes.Responses[0].Aggregations["pid"], &tmp)
  302. docBuckets["tlist"] = tmp["buckets"]
  303. tmp = map[string]interface{}{}
  304. json.Unmarshal(*MultiRes.Responses[1].Aggregations["state"], &tmp)
  305. docBuckets["plist"] = tmp["buckets"]
  306. if resResult, e := json.Marshal(docBuckets); e != nil {
  307. log.Error("CreativeArchiveSearch.json.error(%v)", e)
  308. } else {
  309. res.Result = resResult
  310. }
  311. return
  312. }
  313. // CreativeArchiveStaff 创作中心
  314. func (d *Dao) CreativeArchiveApply(c context.Context, sp *model.QueryParams) (res *model.QueryResult, debug *model.QueryDebugResult, err error) {
  315. var (
  316. applyStaffMid interface{}
  317. ok bool
  318. )
  319. docBuckets := map[string]interface{}{}
  320. if sp == nil || sp.QueryBody == nil || sp.QueryBody.Where == nil || sp.QueryBody.Where.EQ == nil {
  321. return res, debug, ecode.RequestErr
  322. }
  323. if applyStaffMid, ok = sp.QueryBody.Where.EQ["apply_staff.apply_staff_mid"]; !ok {
  324. return res, debug, ecode.RequestErr
  325. }
  326. // 列表
  327. if state, ok := sp.QueryBody.Where.EQ["apply_staff.deal_state"]; ok {
  328. if sp.QueryBody.Where.In == nil {
  329. sp.QueryBody.Where.In = map[string][]interface{}{}
  330. }
  331. switch state {
  332. case "pending": //待处理
  333. sp.QueryBody.Where.In["apply_staff.deal_state"] = []interface{}{1}
  334. case "processed": //已处理
  335. sp.QueryBody.Where.In["apply_staff.deal_state"] = []interface{}{2}
  336. case "neglected": //已忽略
  337. sp.QueryBody.Where.In["apply_staff.deal_state"] = []interface{}{3}
  338. default:
  339. sp.QueryBody.Where.In["apply_staff.deal_state"] = []interface{}{1, 2, 3}
  340. }
  341. delete(sp.QueryBody.Where.EQ, "apply_staff.deal_state")
  342. } else {
  343. if sp.QueryBody.Where.In == nil {
  344. sp.QueryBody.Where.In = map[string][]interface{}{}
  345. }
  346. sp.QueryBody.Where.In["apply_staff.deal_state"] = []interface{}{1, 2, 3}
  347. }
  348. sp.QueryBody.Where.In["state"] = _all
  349. query, qbDebug := d.QueryBasic(c, sp)
  350. if res, debug, err = d.QueryResult(c, query, sp, qbDebug); err != nil {
  351. PromError(fmt.Sprintf("es:%s ", sp.Business), "%v", err)
  352. return
  353. }
  354. docBuckets["vlist"] = res.Result
  355. // 类型统计
  356. typeFilter := elastic.NewBoolQuery().Filter(
  357. elastic.NewTermsQuery("state", _all...),
  358. elastic.NewNestedQuery("apply_staff", elastic.NewBoolQuery().Must(
  359. elastic.NewTermQuery("apply_staff.apply_staff_mid", applyStaffMid),
  360. elastic.NewTermsQuery("apply_staff.deal_state", []interface{}{1, 2, 3}...),
  361. )),
  362. )
  363. for _, v := range sp.QueryBody.Where.Like {
  364. typeFilter = typeFilter.Filter(elastic.NewMultiMatchQuery(strings.Join(v.KW, " "), v.KWFields...).Type("best_fields").TieBreaker(0.6).MinimumShouldMatch("90%"))
  365. }
  366. typeAgg := elastic.NewTermsAggregation().Field("pid")
  367. request1 := elastic.NewSearchRequest().Index(sp.QueryBody.From).Type("base").Source(elastic.NewSearchSource().Query(typeFilter).Aggregation("pid", typeAgg).Size(0))
  368. // 状态统计
  369. stateFilter := elastic.NewBoolQuery().Filter(
  370. elastic.NewTermsQuery("state", _all...),
  371. elastic.NewNestedQuery("apply_staff", elastic.NewBoolQuery().Must(elastic.NewTermQuery("apply_staff.apply_staff_mid", applyStaffMid))),
  372. )
  373. if pid, ok := sp.QueryBody.Where.EQ["pid"]; ok {
  374. stateFilter = stateFilter.Filter(elastic.NewTermsQuery("pid", pid))
  375. }
  376. for _, v := range sp.QueryBody.Where.Like {
  377. stateFilter = typeFilter.Filter(elastic.NewMultiMatchQuery(strings.Join(v.KW, " "), v.KWFields...).Type("best_fields").TieBreaker(0.6).MinimumShouldMatch("90%"))
  378. }
  379. stateAgg := elastic.NewFiltersAggregation().
  380. FilterWithName("pending", elastic.NewNestedQuery("apply_staff", elastic.NewBoolQuery().Must(elastic.NewTermQuery("apply_staff.apply_staff_mid", applyStaffMid), elastic.NewTermQuery("apply_staff.deal_state", 1)))).
  381. FilterWithName("processed", elastic.NewNestedQuery("apply_staff", elastic.NewBoolQuery().Must(elastic.NewTermQuery("apply_staff.apply_staff_mid", applyStaffMid), elastic.NewTermQuery("apply_staff.deal_state", 2)))).
  382. FilterWithName("neglected", elastic.NewNestedQuery("apply_staff", elastic.NewBoolQuery().Must(elastic.NewTermQuery("apply_staff.apply_staff_mid", applyStaffMid), elastic.NewTermQuery("apply_staff.deal_state", 3))))
  383. request2 := elastic.NewSearchRequest().Index(sp.QueryBody.From).Type("base").Source(elastic.NewSearchSource().Query(stateFilter).Aggregation("state", stateAgg).Size(0))
  384. MultiRes, err := d.esPool[sp.AppIDConf.ESCluster].MultiSearch().Add(request1, request2).Do(c)
  385. if err != nil {
  386. PromError(fmt.Sprintf("es:%s ", sp.Business), "%v", err)
  387. return
  388. }
  389. // 取得数据
  390. tmp := map[string]interface{}{}
  391. json.Unmarshal(*MultiRes.Responses[0].Aggregations["pid"], &tmp)
  392. docBuckets["tlist"] = tmp["buckets"]
  393. tmp = map[string]interface{}{}
  394. json.Unmarshal(*MultiRes.Responses[1].Aggregations["state"], &tmp)
  395. docBuckets["plist"] = tmp["buckets"]
  396. if resResult, e := json.Marshal(docBuckets); e != nil {
  397. log.Error("CreativeArchiveSearch.json.error(%v)", e)
  398. } else {
  399. res.Result = resResult
  400. }
  401. return
  402. }