query.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "strings"
  8. "go-common/app/admin/main/search/model"
  9. "go-common/library/log"
  10. "gopkg.in/olivere/elastic.v5"
  11. )
  // _queryConfSQL selects, for every row of digger_app, the columns that
  // QueryConf scans into a model.QueryConfDetail.
  const _queryConfSQL = `select appid,es_name,index_prefix,index_type,index_id,index_mapping,query_max_indexes from digger_app`
  15. // QueryConf query conf
  16. func (d *Dao) QueryConf(ctx context.Context) (res map[string]*model.QueryConfDetail, err error) {
  17. rows, err := d.queryConfStmt.Query(ctx)
  18. if err != nil {
  19. log.Error("d.queryConfStmt.Query() error(%v)", err)
  20. return
  21. }
  22. defer rows.Close()
  23. res = make(map[string]*model.QueryConfDetail)
  24. for rows.Next() {
  25. var (
  26. appid string
  27. qcd = new(model.QueryConfDetail)
  28. )
  29. if err = rows.Scan(&appid, &qcd.ESCluster, &qcd.IndexPrefix, &qcd.IndexType, &qcd.IndexID, &qcd.IndexMapping, &qcd.MaxIndicesNum); err != nil {
  30. log.Error("d.QueryConf() rows.Scan() error(%v)", err)
  31. return
  32. }
  33. res[appid] = qcd
  34. }
  35. err = rows.Err()
  36. return
  37. }
  38. type querysModel struct {
  39. field string
  40. whereKind string
  41. esQuery elastic.Query
  42. }
  43. // QueryBasic 其中boolQuery方便定制化业务传参过来.
  44. func (d *Dao) QueryBasic(c context.Context, sp *model.QueryParams) (mixedQuery *elastic.BoolQuery, qbDebug *model.QueryDebugResult) {
  45. mixedQuery = elastic.NewBoolQuery()
  46. qbDebug = &model.QueryDebugResult{}
  47. querys := []*querysModel{}
  48. netstedQuerys := map[string]*elastic.BoolQuery{} // key: path value: boolQuery
  49. //fields
  50. if len(sp.QueryBody.Fields) == 0 {
  51. sp.QueryBody.Fields = []string{}
  52. }
  53. //from done
  54. //where
  55. if sp.QueryBody.Where == nil {
  56. sp.QueryBody.Where = &model.QueryBodyWhere{} //要给个默认值
  57. }
  58. //where - eq
  59. for k, v := range sp.QueryBody.Where.EQ {
  60. querys = append(querys, &querysModel{
  61. field: k,
  62. whereKind: "eq",
  63. esQuery: elastic.NewTermQuery(k, v),
  64. })
  65. }
  66. //where - or
  67. for k, v := range sp.QueryBody.Where.Or {
  68. querys = append(querys, &querysModel{
  69. field: k,
  70. whereKind: "or",
  71. esQuery: elastic.NewTermQuery(k, v),
  72. })
  73. }
  74. //where - in
  75. for k, v := range sp.QueryBody.Where.In {
  76. if len(v) > 1024 {
  77. e := fmt.Sprintf("where in 超过1024 business(%s) error(%v)", sp.Business, v)
  78. log.Error(e)
  79. qbDebug.AddErrMsg(e)
  80. continue
  81. }
  82. querys = append(querys, &querysModel{
  83. field: k,
  84. whereKind: "in",
  85. esQuery: elastic.NewTermsQuery(k, v...),
  86. })
  87. }
  88. //where - range
  89. ranges, err := d.queryBasicRange(sp.QueryBody.Where.Range)
  90. if err != nil {
  91. qbDebug.AddErrMsg(err.Error())
  92. }
  93. for k, v := range ranges {
  94. querys = append(querys, &querysModel{
  95. field: k,
  96. whereKind: "range",
  97. esQuery: v,
  98. })
  99. }
  100. //where - combo
  101. for _, v := range sp.QueryBody.Where.Combo {
  102. //外面用bool+should+minimum包裹
  103. combo := elastic.NewBoolQuery()
  104. //里面每个子项也是bool+should+minimum
  105. cmbEQ := elastic.NewBoolQuery()
  106. cmbIn := elastic.NewBoolQuery()
  107. cmbRange := elastic.NewBoolQuery()
  108. cmbNotEQ := elastic.NewBoolQuery()
  109. cmbNotIn := elastic.NewBoolQuery()
  110. cmbNotRange := elastic.NewBoolQuery()
  111. //所有的minumum
  112. if v.Min.Min == 0 {
  113. v.Min.Min = 1
  114. }
  115. if v.Min.EQ == 0 {
  116. v.Min.EQ = 1
  117. }
  118. if v.Min.In == 0 {
  119. v.Min.In = 1
  120. }
  121. if v.Min.Range == 0 {
  122. v.Min.Range = 1
  123. }
  124. if v.Min.NotEQ == 0 {
  125. v.Min.NotEQ = 1
  126. }
  127. if v.Min.NotIn == 0 {
  128. v.Min.NotIn = 1
  129. }
  130. if v.Min.NotRange == 0 {
  131. v.Min.NotRange = 1
  132. }
  133. //子项should
  134. for _, vEQ := range v.EQ {
  135. for eqK, eqV := range vEQ {
  136. cmbEQ.Should(elastic.NewTermQuery(eqK, eqV))
  137. }
  138. }
  139. for _, vIn := range v.In {
  140. for inK, inV := range vIn {
  141. cmbIn.Should(elastic.NewTermsQuery(inK, inV...))
  142. }
  143. }
  144. for _, vRange := range v.Range {
  145. ranges, _ := d.queryBasicRange(vRange)
  146. for _, rangeV := range ranges {
  147. cmbRange.Should(rangeV)
  148. }
  149. }
  150. for _, notEQ := range v.NotEQ {
  151. for k, v := range notEQ {
  152. cmbNotEQ.Should(elastic.NewTermQuery(k, v))
  153. }
  154. }
  155. for _, notIn := range v.NotIn {
  156. for k, v := range notIn {
  157. cmbNotIn.Should(elastic.NewTermsQuery(k, v...))
  158. }
  159. }
  160. for _, notRange := range v.NotRange {
  161. ranges, _ := d.queryBasicRange(notRange)
  162. for _, v := range ranges {
  163. cmbNotRange.Should(v)
  164. }
  165. }
  166. //子项minimum
  167. if len(v.EQ) > 0 {
  168. combo.Should(cmbEQ.MinimumNumberShouldMatch(v.Min.EQ))
  169. }
  170. if len(v.In) > 0 {
  171. combo.Should(cmbIn.MinimumNumberShouldMatch(v.Min.In))
  172. }
  173. if len(v.Range) > 0 {
  174. combo.Should(cmbRange.MinimumNumberShouldMatch(v.Min.Range))
  175. }
  176. if len(v.NotEQ) > 0 {
  177. combo.MustNot(elastic.NewBoolQuery().Should(cmbNotEQ.MinimumNumberShouldMatch(v.Min.NotEQ)))
  178. }
  179. if len(v.NotIn) > 0 {
  180. combo.MustNot(elastic.NewBoolQuery().Should(cmbNotIn.MinimumNumberShouldMatch(v.Min.NotIn)))
  181. }
  182. if len(v.NotRange) > 0 {
  183. combo.MustNot(elastic.NewBoolQuery().Should(cmbNotRange.MinimumNumberShouldMatch(v.Min.NotRange)))
  184. }
  185. //合并子项
  186. mixedQuery.Filter(combo.MinimumNumberShouldMatch(v.Min.Min))
  187. }
  188. //where - like
  189. like, err := d.queryBasicLike(sp.QueryBody.Where.Like, sp.Business)
  190. if err != nil {
  191. qbDebug.AddErrMsg(err.Error())
  192. }
  193. for _, v := range like {
  194. querys = append(querys, &querysModel{
  195. whereKind: "like",
  196. esQuery: v,
  197. })
  198. }
  199. //mixedQuery
  200. for _, q := range querys {
  201. // like TODO like的map型字段也要支持must not和 nested
  202. if q.field == "" && q.whereKind == "like" {
  203. mixedQuery.Must(q.esQuery)
  204. continue
  205. }
  206. if q.field == "" {
  207. continue
  208. }
  209. // prepare nested 一个DSL只能出现一个nested,不然会有问题
  210. if mapField := strings.Split(q.field, "."); len(mapField) > 1 && mapField[0] != "" {
  211. if _, ok := netstedQuerys[mapField[0]]; !ok {
  212. netstedQuerys[mapField[0]] = elastic.NewBoolQuery()
  213. }
  214. if bl, ok := sp.QueryBody.Where.Not[q.whereKind][q.field]; ok && bl {
  215. // mixedQuery.Must(elastic.NewNestedQuery(mapField[0], elastic.NewBoolQuery().MustNot(q.esQuery)))
  216. netstedQuerys[mapField[0]].MustNot(q.esQuery)
  217. continue
  218. }
  219. // mixedQuery.Must(elastic.NewNestedQuery(mapField[0], elastic.NewBoolQuery().Must(q.esQuery)))
  220. netstedQuerys[mapField[0]].Must(q.esQuery)
  221. continue
  222. }
  223. // must not
  224. if bl, ok := sp.QueryBody.Where.Not[q.whereKind][q.field]; ok && bl {
  225. mixedQuery.MustNot(q.esQuery)
  226. continue
  227. }
  228. // should
  229. if q.whereKind == "or" {
  230. mixedQuery.Should(q.esQuery)
  231. mixedQuery.MinimumShouldMatch("1") // 暂时为1
  232. continue
  233. }
  234. // default
  235. mixedQuery.Filter(q.esQuery)
  236. // random order with seed
  237. if sp.QueryBody.OrderRandomSeed != "" {
  238. random := elastic.NewRandomFunction().Seed(sp.QueryBody.OrderRandomSeed)
  239. score := elastic.NewFunctionScoreQuery().Add(elastic.NewBoolQuery(), random)
  240. mixedQuery.Must(score)
  241. }
  242. }
  243. // insert nested
  244. for k, n := range netstedQuerys {
  245. mixedQuery.Must(elastic.NewNestedQuery(k, n))
  246. }
  247. // DSL
  248. if sp.DebugLevel != 0 {
  249. if src, e := mixedQuery.Source(); e == nil {
  250. if data, er := json.Marshal(src); er == nil {
  251. qbDebug.DSL = string(data)
  252. }
  253. }
  254. }
  255. return
  256. }
  257. // queryBasicRange .
  258. func (d *Dao) queryBasicRange(rangeMap map[string]string) (rangeQuery map[string]*elastic.RangeQuery, err error) {
  259. rangeQuery = make(map[string]*elastic.RangeQuery)
  260. for k, v := range rangeMap {
  261. if r := strings.Trim(v, " "); r != "" {
  262. if rs := []rune(r); len(rs) > 3 {
  263. firstStr := string(rs[0:1])
  264. endStr := string(rs[len(rs)-1:])
  265. rangeStr := strings.Trim(v, "[]() ")
  266. FromTo := strings.Split(rangeStr, ",")
  267. if len(FromTo) != 2 {
  268. err = fmt.Errorf("sp.QueryBody.Where.Range Fromto err")
  269. continue
  270. }
  271. rQuery := elastic.NewRangeQuery(k)
  272. rc := 0
  273. if firstStr == "(" && strings.Trim(FromTo[0], " ") != "" {
  274. rQuery.Gt(strings.Trim(FromTo[0], " "))
  275. rc++
  276. }
  277. if firstStr == "[" && strings.Trim(FromTo[0], " ") != "" {
  278. rQuery.Gte(strings.Trim(FromTo[0], " "))
  279. rc++
  280. }
  281. if endStr == ")" && strings.Trim(FromTo[1], " ") != "" {
  282. rQuery.Lt(strings.Trim(FromTo[1], " "))
  283. rc++
  284. }
  285. if endStr == "]" && strings.Trim(FromTo[1], " ") != "" {
  286. rQuery.Lte(strings.Trim(FromTo[1], " "))
  287. rc++
  288. }
  289. if rc == 0 {
  290. continue
  291. }
  292. rangeQuery[k] = rQuery
  293. } else {
  294. // 范围格式有问题
  295. err = fmt.Errorf("sp.QueryBody.Where.Range range format err. error(%v)", v)
  296. continue
  297. }
  298. }
  299. }
  300. return
  301. }
  302. func (d *Dao) queryBasicLike(likeMap []model.QueryBodyWhereLike, business string) (likeQuery []elastic.Query, err error) {
  303. for _, v := range likeMap {
  304. if len(v.KW) == 0 {
  305. continue
  306. }
  307. switch v.Level {
  308. case model.LikeLevelHigh:
  309. kw := []string{}
  310. r := []rune(v.KW[0])
  311. for i := 0; i < len(r); i++ {
  312. if k := string(r[i : i+1]); !strings.ContainsAny(k, "~[](){}^?:\"\\/!+-=&* ") { //去掉特殊符号
  313. kw = append(kw, k)
  314. } else if len(kw) > 1 && kw[len(kw)-1:][0] != "*" {
  315. kw = append(kw, "*", " ", "*")
  316. }
  317. }
  318. if len(kw) == 0 || strings.Join(kw, "") == "* *" {
  319. continue
  320. }
  321. qs := elastic.NewQueryStringQuery("*" + strings.Trim(strings.Join(kw, ""), "* ") + "*").AllowLeadingWildcard(true) //默认是or
  322. if !v.Or {
  323. qs.DefaultOperator("AND")
  324. }
  325. for _, v := range v.KWFields {
  326. qs.Field(v)
  327. }
  328. likeQuery = append(likeQuery, qs)
  329. case model.LikeLevelMiddel:
  330. // 单个字要特殊处理
  331. if r := []rune(v.KW[0]); len(r) == 1 && len(v.KW) == 1 {
  332. qs := elastic.NewQueryStringQuery("*" + string(r[:]) + "*").AllowLeadingWildcard(true) //默认是or
  333. if !v.Or {
  334. qs.DefaultOperator("AND")
  335. }
  336. for _, v := range v.KWFields {
  337. qs.Field(v)
  338. }
  339. likeQuery = append(likeQuery, qs)
  340. continue
  341. }
  342. // 自定义analyzer时,multi_match无法使用minimum_should_match,默认为至少一个满足,导致结果集还是很大
  343. // ngram(2,2)
  344. for _, kw := range v.KW {
  345. rn := []rune(kw)
  346. for i := 0; i+1 < len(rn); i++ {
  347. kwStr := string(rn[i : i+2])
  348. for _, kwField := range v.KWFields {
  349. likeQuery = append(likeQuery, elastic.NewTermQuery(kwField, kwStr))
  350. }
  351. }
  352. }
  353. case "", model.LikeLevelLow:
  354. qs := elastic.NewMultiMatchQuery(strings.Join(v.KW, " "), v.KWFields...).Type("best_fields").TieBreaker(0.6).MinimumShouldMatch("90%") //默认是and
  355. // TODO 业务自定义match
  356. if business == "copyright" {
  357. qs.MinimumShouldMatch("10%")
  358. }
  359. if business == "academy_archive" {
  360. qs.MinimumShouldMatch("50%")
  361. }
  362. if v.Or {
  363. qs.Operator("OR")
  364. }
  365. likeQuery = append(likeQuery, qs)
  366. }
  367. }
  368. return
  369. }
  370. func (d *Dao) Scroll(c context.Context, sp *model.QueryParams) (res *model.QueryResult, debug *model.QueryDebugResult, err error) {
  371. var (
  372. tList []json.RawMessage
  373. tLen int
  374. ScrollID = ""
  375. )
  376. res = &model.QueryResult{}
  377. esCluster := sp.AppIDConf.ESCluster
  378. query, _ := d.QueryBasic(c, sp)
  379. eSearch, ok := d.esPool[esCluster]
  380. if !ok {
  381. PromError(fmt.Sprintf("es:集群不存在%s", esCluster), "s.dao.searchResult indexName:%s", esCluster)
  382. return
  383. }
  384. fsc := elastic.NewFetchSourceContext(true).Include(sp.QueryBody.Fields...)
  385. // multi sort
  386. sorterSlice := []elastic.Sorter{}
  387. if len(sp.QueryBody.Where.Like) > 0 && sp.QueryBody.OrderScoreFirst { // like 长度 > 0,但里面是空的也是个问题
  388. sorterSlice = append(sorterSlice, elastic.NewScoreSort().Desc())
  389. }
  390. for _, i := range sp.QueryBody.Order {
  391. for k, v := range i {
  392. if v == "asc" {
  393. sorterSlice = append(sorterSlice, elastic.NewFieldSort(k).Asc())
  394. } else {
  395. sorterSlice = append(sorterSlice, elastic.NewFieldSort(k).Desc())
  396. }
  397. }
  398. }
  399. if len(sp.QueryBody.Where.Like) > 0 && !sp.QueryBody.OrderScoreFirst {
  400. sorterSlice = append(sorterSlice, elastic.NewScoreSort().Desc())
  401. }
  402. for {
  403. searchResult, err := eSearch.Scroll().Index(sp.QueryBody.From).Type("base").
  404. Query(query).FetchSourceContext(fsc).Size(5000).Scroll("1m").ScrollId(ScrollID).SortBy(sorterSlice...).Do(c)
  405. if err == io.EOF {
  406. break
  407. } else if err != nil {
  408. PromError(fmt.Sprintf("es:执行查询失败%s ", "Scroll"), "es:执行查询失败%v", err)
  409. break
  410. }
  411. ScrollID = searchResult.ScrollId
  412. for _, hit := range searchResult.Hits.Hits {
  413. var t json.RawMessage
  414. if err = json.Unmarshal(*hit.Source, &t); err != nil {
  415. PromError(fmt.Sprintf("es:Unmarshal%s ", "Scroll"), "es:Unmarshal%v", err)
  416. break
  417. }
  418. tList = append(tList, t)
  419. tLen++
  420. if tLen >= sp.QueryBody.Pn*sp.QueryBody.Ps {
  421. goto ClearScroll
  422. }
  423. }
  424. }
  425. ClearScroll:
  426. go eSearch.ClearScroll().ScrollId(ScrollID).Do(context.Background())
  427. if res.Result, err = json.Marshal(tList); err != nil {
  428. PromError(fmt.Sprintf("es:Unmarshal%s ", "Scroll"), "es:Unmarshal%v", err)
  429. return
  430. }
  431. return
  432. }