123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413 |
- package dao
- import (
- "context"
- "encoding/json"
- "fmt"
- "strings"
- "time"
- "go-common/app/admin/main/search/model"
- "go-common/library/ecode"
- "go-common/library/log"
- "gopkg.in/olivere/elastic.v5"
- )
- // ArchiveVideoScore 稿件一审打分排序.
- func (d *Dao) ArchiveVideoScore(c context.Context, sp *model.QueryParams) (res *model.QueryResult, debug *model.QueryDebugResult, err error) {
- query, qbDebug := d.QueryBasic(c, sp)
- // query append
- diffs := time.Now().Unix() - 1420041600
- days := fmt.Sprintf("%dd", diffs/(3600*24))
- score := elastic.NewFunctionScoreQuery().Add(elastic.NewTermQuery("user_type", 1), elastic.NewExponentialDecayFunction().FieldName("arc_senddate").Origin("2015-01-01 00:00:00").Scale(days).Offset("1d").Decay(0.8).Weight(float64(10000))).Add(nil, elastic.NewExponentialDecayFunction().FieldName("arc_senddate").Origin("2015-01-01 00:00:00").Scale(days).Offset("1d").Decay(0.8).Weight(float64(1)))
- query = query.Must(score)
- sp.QueryBody.Order = []map[string]string{}
- // do
- if res, debug, err = d.QueryResult(c, query, sp, qbDebug); err != nil {
- PromError(fmt.Sprintf("es:%s ", sp.Business), "%v", err)
- }
- return
- }
- // ArchiveScore 稿件二审打分排序.
- func (d *Dao) ArchiveScore(c context.Context, sp *model.QueryParams) (res *model.QueryResult, debug *model.QueryDebugResult, err error) {
- query, qbDebug := d.QueryBasic(c, sp)
- // query append
- diffs := time.Now().Unix() - 1420041600
- days := fmt.Sprintf("%dd", diffs/(3600*24))
- score := elastic.NewFunctionScoreQuery().Add(elastic.NewTermQuery("user_type", 1), elastic.NewExponentialDecayFunction().FieldName("ctime").Origin("2015-01-01 00:00:00").Scale(days).Offset("1d").Decay(0.8).Weight(float64(10000))).Add(nil, elastic.NewExponentialDecayFunction().FieldName("ctime").Origin("2015-01-01 00:00:00").Scale(days).Offset("1d").Decay(0.8).Weight(float64(1)))
- query = query.Must(score)
- sp.QueryBody.Order = []map[string]string{}
- // do
- if res, debug, err = d.QueryResult(c, query, sp, qbDebug); err != nil {
- PromError(fmt.Sprintf("es:%s ", sp.Business), "%v", err)
- }
- return
- }
- // TaskQaRandom .
- func (d *Dao) TaskQaRandom(c context.Context, sp *model.QueryParams) (res *model.QueryResult, debug *model.QueryDebugResult, err error) {
- random := elastic.NewRandomFunction()
- if sp != nil && sp.QueryBody != nil && sp.QueryBody.Where != nil && sp.QueryBody.Where.EQ != nil {
- if seed, ok := sp.QueryBody.Where.EQ["seed"]; ok {
- random = elastic.NewRandomFunction().Seed(seed)
- delete(sp.QueryBody.Where.EQ, "seed")
- }
- }
- query, qbDebug := d.QueryBasic(c, sp)
- if err != nil {
- PromError(fmt.Sprintf("es basic:%s ", sp.Business), "%v", err)
- }
- score := elastic.NewFunctionScoreQuery().Add(elastic.NewBoolQuery(), random)
- qy := elastic.NewBoolQuery().Must(query, score)
- if res, debug, err = d.QueryResult(c, qy, sp, qbDebug); err != nil {
- PromError(fmt.Sprintf("es:%s ", sp.Business), "%v", err)
- }
- return
- }
- // EsportsContestsDate 电竞右侧日历联动.
- func (d *Dao) EsportsContestsDate(c context.Context, sp *model.QueryParams) (res *model.QueryResult, debug *model.QueryDebugResult, err error) {
- res = &model.QueryResult{}
- // query basic
- query, qbDebug := d.QueryBasic(c, sp)
- debug = qbDebug
- esCluster := sp.AppIDConf.ESCluster
- if _, ok := d.esPool[esCluster]; !ok {
- debug.AddErrMsg("es:集群不存在" + esCluster)
- return
- }
- aggs := elastic.NewTermsAggregation()
- fsc := elastic.NewFetchSourceContext(true).Include("ids")
- aggs = aggs.Field("stime").Size(1000).SubAggregation("top_ids_hits", elastic.NewTopHitsAggregation().FetchSourceContext(fsc).Size(1000))
- searchPrepare := d.esPool[esCluster].Search().Index(sp.QueryBody.From).Query(query).Aggregation("group_by_stime", aggs).Size(0)
- if sp.DebugLevel == 2 {
- searchPrepare.Profile(true)
- }
- searchResult, err := searchPrepare.Do(context.Background())
- if err != nil {
- debug.AddErrMsg(fmt.Sprintf("es:执行查询失败%s. %v", esCluster, err))
- PromError(fmt.Sprintf("es:执行查询失败%s ", esCluster), "%v", err)
- return
- }
- result, ok := searchResult.Aggregations.Terms("group_by_stime")
- if !ok {
- return
- }
- type hitDoc struct {
- Hits []struct {
- Source struct {
- IDs []string `json:"ids"`
- } `json:"_source"`
- } `json:"hits"`
- }
- type idsRes struct {
- Date string
- IDs []string
- }
- ids := []idsRes{}
- for _, b := range result.Buckets {
- var hit hitDoc
- //b.KeyAsString
- if list, ok := b.Terms("top_ids_hits"); ok {
- a, _ := list.Aggregations["hits"].MarshalJSON()
- if err = json.Unmarshal(a, &hit); err != nil {
- return
- }
- for _, h := range hit.Hits {
- ids = append(ids, idsRes{
- Date: *b.KeyAsString,
- IDs: h.Source.IDs,
- })
- }
- }
- }
- resDoc := map[string]int{}
- resDocTmp := map[string]map[string]bool{}
- for _, v := range ids {
- if _, ok := resDocTmp[v.Date]; !ok {
- resDocTmp[v.Date] = map[string]bool{}
- }
- for _, id := range v.IDs {
- resDocTmp[v.Date][id] = true
- }
- }
- for date, idList := range resDocTmp {
- resDoc[date] = len(idList)
- }
- if doc, er := json.Marshal(resDoc); er != nil {
- debug.AddErrMsg(fmt.Sprintf("es:Unmarshal docBuckets es:Unmarshal%v", er))
- } else {
- res.Result = doc
- }
- return
- }
// Archive "state" value groups used by the creative-center searches, both as
// the IN filter of the list query and as the named filters of the state
// aggregation.
var (
	// _pubed is the group selected by state "pubed".
	_pubed = []interface{}{-40, 0, 10000, 1, 1001, 15000, 20000, 30000}
	// _notpubed is the group selected by state "not_pubed".
	_notpubed = []interface{}{-2, -4, -5, -11, -12, -16}
	// _ispubing is the group selected by state "is_pubing".
	_ispubing = []interface{}{-1, -6, -7, -8, -9, -10, -13, -15, -30}
	// _all is the three groups concatenated in the order pubed, not_pubed,
	// is_pubing; it is used when no (or an unknown) state is requested.
	_all = func() []interface{} {
		merged := make([]interface{}, 0, len(_pubed)+len(_notpubed)+len(_ispubing))
		for _, group := range [][]interface{}{_pubed, _notpubed, _ispubing} {
			merged = append(merged, group...)
		}
		return merged
	}()
)
- // CreativeArchiveSearch 创作中心
- func (d *Dao) CreativeArchiveSearch(c context.Context, sp *model.QueryParams) (res *model.QueryResult, debug *model.QueryDebugResult, err error) {
- var (
- mid interface{}
- ok bool
- )
- docBuckets := map[string]interface{}{}
- if sp == nil && sp.QueryBody == nil && sp.QueryBody.Where == nil && sp.QueryBody.Where.EQ == nil {
- return res, debug, ecode.RequestErr
- }
- if mid, ok = sp.QueryBody.Where.EQ["mid"]; !ok {
- return res, debug, ecode.RequestErr
- }
- // 列表
- if state, ok := sp.QueryBody.Where.EQ["state"]; ok {
- if sp.QueryBody.Where.In == nil {
- sp.QueryBody.Where.In = map[string][]interface{}{}
- }
- switch state {
- case "pubed":
- sp.QueryBody.Where.In["state"] = _pubed
- case "not_pubed":
- sp.QueryBody.Where.In["state"] = _notpubed
- case "is_pubing":
- sp.QueryBody.Where.In["state"] = _ispubing
- default:
- sp.QueryBody.Where.In["state"] = _all
- }
- delete(sp.QueryBody.Where.EQ, "state")
- } else {
- if sp.QueryBody.Where.In == nil {
- sp.QueryBody.Where.In = map[string][]interface{}{}
- }
- sp.QueryBody.Where.In["state"] = _all
- }
- query, qbDebug := d.QueryBasic(c, sp)
- if res, debug, err = d.QueryResult(c, query, sp, qbDebug); err != nil {
- PromError(fmt.Sprintf("es:%s ", sp.Business), "%v", err)
- return
- }
- docBuckets["vlist"] = res.Result
- // 类型统计
- typeFilter := elastic.NewBoolQuery().Must(elastic.NewTermsQuery("mid", mid))
- typeFilter = typeFilter.Filter(elastic.NewTermsQuery("state", _all...))
- for _, v := range sp.QueryBody.Where.Like {
- typeFilter = typeFilter.Filter(elastic.NewMultiMatchQuery(strings.Join(v.KW, " "), v.KWFields...).Type("best_fields").TieBreaker(0.6).MinimumShouldMatch("100%"))
- }
- typeAgg := elastic.NewTermsAggregation().Field("pid")
- request1 := elastic.NewSearchRequest().Index(sp.QueryBody.From).Type("base").Source(elastic.NewSearchSource().Query(typeFilter).Aggregation("pid", typeAgg))
- // 状态统计
- stateFilter := elastic.NewBoolQuery().Filter(elastic.NewTermsQuery("mid", mid))
- if pid, ok := sp.QueryBody.Where.EQ["pid"]; ok {
- stateFilter = stateFilter.Filter(elastic.NewTermsQuery("pid", pid))
- }
- for _, v := range sp.QueryBody.Where.Like {
- stateFilter = typeFilter.Filter(elastic.NewMultiMatchQuery(strings.Join(v.KW, " "), v.KWFields...).Type("best_fields").TieBreaker(0.6).MinimumShouldMatch("100%"))
- }
- stateAgg := elastic.NewFiltersAggregation().
- FilterWithName("pubed", elastic.NewTermsQuery("state", _pubed...)).
- FilterWithName("not_pubed", elastic.NewTermsQuery("state", _notpubed...)).
- FilterWithName("is_pubing", elastic.NewTermsQuery("state", _ispubing...))
- request2 := elastic.NewSearchRequest().Index(sp.QueryBody.From).Type("base").Source(elastic.NewSearchSource().Query(stateFilter).Aggregation("state", stateAgg))
- MultiRes, err := d.esPool[sp.AppIDConf.ESCluster].MultiSearch().Add(request1, request2).Do(c)
- if err != nil {
- PromError(fmt.Sprintf("es:%s ", sp.Business), "%v", err)
- return
- }
- // 取得数据
- tmp := map[string]interface{}{}
- json.Unmarshal(*MultiRes.Responses[0].Aggregations["pid"], &tmp)
- docBuckets["tlist"] = tmp["buckets"]
- tmp = map[string]interface{}{}
- json.Unmarshal(*MultiRes.Responses[1].Aggregations["state"], &tmp)
- docBuckets["plist"] = tmp["buckets"]
- if resResult, e := json.Marshal(docBuckets); e != nil {
- log.Error("CreativeArchiveSearch.json.error(%v)", e)
- } else {
- res.Result = resResult
- }
- return
- }
- // CreativeArchiveStaff 创作中心
- func (d *Dao) CreativeArchiveStaff(c context.Context, sp *model.QueryParams) (res *model.QueryResult, debug *model.QueryDebugResult, err error) {
- docBuckets := map[string]interface{}{}
- if sp == nil || sp.QueryBody == nil || sp.QueryBody.Where == nil || sp.QueryBody.Where.Combo == nil || len(sp.QueryBody.Where.Combo) != 1 {
- return res, debug, ecode.RequestErr
- }
- combo := sp.QueryBody.Where.Combo[0]
- if len(combo.EQ) == 0 {
- return res, debug, ecode.RequestErr
- }
- queryListParams := &model.QueryParams{
- QueryBody: &model.QueryBody{
- Where: &model.QueryBodyWhere{
- Combo: sp.QueryBody.Where.Combo,
- },
- },
- }
- queryList, _ := d.QueryBasic(c, queryListParams)
- // 列表
- if state, ok := sp.QueryBody.Where.EQ["state"]; ok {
- if sp.QueryBody.Where.In == nil {
- sp.QueryBody.Where.In = map[string][]interface{}{}
- }
- switch state {
- case "pubed":
- sp.QueryBody.Where.In["state"] = _pubed
- case "not_pubed":
- sp.QueryBody.Where.In["state"] = _notpubed
- case "is_pubing":
- sp.QueryBody.Where.In["state"] = _ispubing
- default:
- sp.QueryBody.Where.In["state"] = _all
- }
- delete(sp.QueryBody.Where.EQ, "state")
- } else {
- if sp.QueryBody.Where.In == nil {
- sp.QueryBody.Where.In = map[string][]interface{}{}
- }
- sp.QueryBody.Where.In["state"] = _all
- }
- query, qbDebug := d.QueryBasic(c, sp)
- if res, debug, err = d.QueryResult(c, query, sp, qbDebug); err != nil {
- PromError(fmt.Sprintf("es:%s ", sp.Business), "%v", err)
- return
- }
- docBuckets["vlist"] = res.Result
- // 类型统计
- typeFilter := elastic.NewBoolQuery().Filter(queryList)
- typeFilter = typeFilter.Filter(elastic.NewTermsQuery("state", _all...))
- for _, v := range sp.QueryBody.Where.Like {
- typeFilter = typeFilter.Filter(elastic.NewMultiMatchQuery(strings.Join(v.KW, " "), v.KWFields...).Type("best_fields").TieBreaker(0.6).MinimumShouldMatch("90%"))
- }
- typeAgg := elastic.NewTermsAggregation().Field("pid")
- request1 := elastic.NewSearchRequest().Index(sp.QueryBody.From).Type("base").Source(elastic.NewSearchSource().Query(typeFilter).Aggregation("pid", typeAgg).Size(0))
- // 状态统计
- stateFilter := elastic.NewBoolQuery().Filter(queryList)
- if pid, ok := sp.QueryBody.Where.EQ["pid"]; ok {
- stateFilter = stateFilter.Filter(elastic.NewTermsQuery("pid", pid))
- }
- for _, v := range sp.QueryBody.Where.Like {
- stateFilter = typeFilter.Filter(elastic.NewMultiMatchQuery(strings.Join(v.KW, " "), v.KWFields...).Type("best_fields").TieBreaker(0.6).MinimumShouldMatch("90%"))
- }
- stateAgg := elastic.NewFiltersAggregation().
- // 稿件状态
- FilterWithName("pubed", elastic.NewTermsQuery("state", _pubed...)).
- FilterWithName("not_pubed", elastic.NewTermsQuery("state", _notpubed...)).
- FilterWithName("is_pubing", elastic.NewTermsQuery("state", _ispubing...))
- request2 := elastic.NewSearchRequest().Index(sp.QueryBody.From).Type("base").Source(elastic.NewSearchSource().Query(stateFilter).Aggregation("state", stateAgg).Size(0))
- MultiRes, err := d.esPool[sp.AppIDConf.ESCluster].MultiSearch().Add(request1, request2).Do(c)
- if err != nil {
- PromError(fmt.Sprintf("es:%s ", sp.Business), "%v", err)
- return
- }
- // 取得数据
- tmp := map[string]interface{}{}
- json.Unmarshal(*MultiRes.Responses[0].Aggregations["pid"], &tmp)
- docBuckets["tlist"] = tmp["buckets"]
- tmp = map[string]interface{}{}
- json.Unmarshal(*MultiRes.Responses[1].Aggregations["state"], &tmp)
- docBuckets["plist"] = tmp["buckets"]
- if resResult, e := json.Marshal(docBuckets); e != nil {
- log.Error("CreativeArchiveSearch.json.error(%v)", e)
- } else {
- res.Result = resResult
- }
- return
- }
- // CreativeArchiveStaff 创作中心
- func (d *Dao) CreativeArchiveApply(c context.Context, sp *model.QueryParams) (res *model.QueryResult, debug *model.QueryDebugResult, err error) {
- var (
- applyStaffMid interface{}
- ok bool
- )
- docBuckets := map[string]interface{}{}
- if sp == nil || sp.QueryBody == nil || sp.QueryBody.Where == nil || sp.QueryBody.Where.EQ == nil {
- return res, debug, ecode.RequestErr
- }
- if applyStaffMid, ok = sp.QueryBody.Where.EQ["apply_staff.apply_staff_mid"]; !ok {
- return res, debug, ecode.RequestErr
- }
- // 列表
- if state, ok := sp.QueryBody.Where.EQ["apply_staff.deal_state"]; ok {
- if sp.QueryBody.Where.In == nil {
- sp.QueryBody.Where.In = map[string][]interface{}{}
- }
- switch state {
- case "pending": //待处理
- sp.QueryBody.Where.In["apply_staff.deal_state"] = []interface{}{1}
- case "processed": //已处理
- sp.QueryBody.Where.In["apply_staff.deal_state"] = []interface{}{2}
- case "neglected": //已忽略
- sp.QueryBody.Where.In["apply_staff.deal_state"] = []interface{}{3}
- default:
- sp.QueryBody.Where.In["apply_staff.deal_state"] = []interface{}{1, 2, 3}
- }
- delete(sp.QueryBody.Where.EQ, "apply_staff.deal_state")
- } else {
- if sp.QueryBody.Where.In == nil {
- sp.QueryBody.Where.In = map[string][]interface{}{}
- }
- sp.QueryBody.Where.In["apply_staff.deal_state"] = []interface{}{1, 2, 3}
- }
- sp.QueryBody.Where.In["state"] = _all
- query, qbDebug := d.QueryBasic(c, sp)
- if res, debug, err = d.QueryResult(c, query, sp, qbDebug); err != nil {
- PromError(fmt.Sprintf("es:%s ", sp.Business), "%v", err)
- return
- }
- docBuckets["vlist"] = res.Result
- // 类型统计
- typeFilter := elastic.NewBoolQuery().Filter(
- elastic.NewTermsQuery("state", _all...),
- elastic.NewNestedQuery("apply_staff", elastic.NewBoolQuery().Must(
- elastic.NewTermQuery("apply_staff.apply_staff_mid", applyStaffMid),
- elastic.NewTermsQuery("apply_staff.deal_state", []interface{}{1, 2, 3}...),
- )),
- )
- for _, v := range sp.QueryBody.Where.Like {
- typeFilter = typeFilter.Filter(elastic.NewMultiMatchQuery(strings.Join(v.KW, " "), v.KWFields...).Type("best_fields").TieBreaker(0.6).MinimumShouldMatch("90%"))
- }
- typeAgg := elastic.NewTermsAggregation().Field("pid")
- request1 := elastic.NewSearchRequest().Index(sp.QueryBody.From).Type("base").Source(elastic.NewSearchSource().Query(typeFilter).Aggregation("pid", typeAgg).Size(0))
- // 状态统计
- stateFilter := elastic.NewBoolQuery().Filter(
- elastic.NewTermsQuery("state", _all...),
- elastic.NewNestedQuery("apply_staff", elastic.NewBoolQuery().Must(elastic.NewTermQuery("apply_staff.apply_staff_mid", applyStaffMid))),
- )
- if pid, ok := sp.QueryBody.Where.EQ["pid"]; ok {
- stateFilter = stateFilter.Filter(elastic.NewTermsQuery("pid", pid))
- }
- for _, v := range sp.QueryBody.Where.Like {
- stateFilter = typeFilter.Filter(elastic.NewMultiMatchQuery(strings.Join(v.KW, " "), v.KWFields...).Type("best_fields").TieBreaker(0.6).MinimumShouldMatch("90%"))
- }
- stateAgg := elastic.NewFiltersAggregation().
- FilterWithName("pending", elastic.NewNestedQuery("apply_staff", elastic.NewBoolQuery().Must(elastic.NewTermQuery("apply_staff.apply_staff_mid", applyStaffMid), elastic.NewTermQuery("apply_staff.deal_state", 1)))).
- FilterWithName("processed", elastic.NewNestedQuery("apply_staff", elastic.NewBoolQuery().Must(elastic.NewTermQuery("apply_staff.apply_staff_mid", applyStaffMid), elastic.NewTermQuery("apply_staff.deal_state", 2)))).
- FilterWithName("neglected", elastic.NewNestedQuery("apply_staff", elastic.NewBoolQuery().Must(elastic.NewTermQuery("apply_staff.apply_staff_mid", applyStaffMid), elastic.NewTermQuery("apply_staff.deal_state", 3))))
- request2 := elastic.NewSearchRequest().Index(sp.QueryBody.From).Type("base").Source(elastic.NewSearchSource().Query(stateFilter).Aggregation("state", stateAgg).Size(0))
- MultiRes, err := d.esPool[sp.AppIDConf.ESCluster].MultiSearch().Add(request1, request2).Do(c)
- if err != nil {
- PromError(fmt.Sprintf("es:%s ", sp.Business), "%v", err)
- return
- }
- // 取得数据
- tmp := map[string]interface{}{}
- json.Unmarshal(*MultiRes.Responses[0].Aggregations["pid"], &tmp)
- docBuckets["tlist"] = tmp["buckets"]
- tmp = map[string]interface{}{}
- json.Unmarshal(*MultiRes.Responses[1].Aggregations["state"], &tmp)
- docBuckets["plist"] = tmp["buckets"]
- if resResult, e := json.Marshal(docBuckets); e != nil {
- log.Error("CreativeArchiveSearch.json.error(%v)", e)
- } else {
- res.Result = resResult
- }
- return
- }
|