es.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. package dao
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "strings"
  9. "go-common/app/admin/main/search/model"
  10. "go-common/library/log"
  11. "gopkg.in/olivere/elastic.v5"
  12. )
  13. // UpdateMapBulk (Deprecated).
  14. func (d *Dao) UpdateMapBulk(c context.Context, esName string, bulkData []BulkMapItem) (err error) {
  15. bulkRequest := d.esPool[esName].Bulk()
  16. for _, b := range bulkData {
  17. request := elastic.NewBulkUpdateRequest().Index(b.IndexName()).Type(b.IndexType()).Id(b.IndexID()).Doc(b.PField()).DocAsUpsert(true)
  18. bulkRequest.Add(request)
  19. }
  20. if _, err = bulkRequest.Do(c); err != nil {
  21. log.Error("esName(%s) bulk error(%v)", esName, err)
  22. }
  23. return
  24. }
  25. // UpdateBulk (Deprecated).
  26. func (d *Dao) UpdateBulk(c context.Context, esName string, bulkData []BulkItem) (err error) {
  27. bulkRequest := d.esPool[esName].Bulk()
  28. for _, b := range bulkData {
  29. request := elastic.NewBulkUpdateRequest().Index(b.IndexName()).Type(b.IndexType()).Id(b.IndexID()).Doc(b).DocAsUpsert(true)
  30. bulkRequest.Add(request)
  31. }
  32. if _, err = bulkRequest.Do(c); err != nil {
  33. log.Error("esName(%s) bulk error(%v)", esName, err)
  34. }
  35. return
  36. }
  37. // UpsertBulk 为了替换UpdateMapBulk和UpdateBulk .
  38. func (d *Dao) UpsertBulk(c context.Context, esCluster string, up *model.UpsertParams) (err error) {
  39. es, ok := d.esPool[esCluster]
  40. if !ok {
  41. log.Error("esCluster(%s) not exists", esCluster)
  42. return
  43. }
  44. bulkRequest := es.Bulk()
  45. for _, b := range up.UpsertBody {
  46. request := elastic.NewBulkUpdateRequest().Index(b.IndexName).Type(b.IndexType).Id(b.IndexID).Doc(b.Doc)
  47. if up.Insert {
  48. request.DocAsUpsert(true)
  49. }
  50. //fmt.Println(request)
  51. bulkRequest.Add(request)
  52. }
  53. if _, err = bulkRequest.Do(c); err != nil {
  54. log.Error("esCluster(%s) bulk error(%v)", esCluster, err)
  55. }
  56. return
  57. }
  58. // searchResult get result from ES. (Deprecated) v3迁移完要删掉.
  59. func (d *Dao) searchResult(c context.Context, esClusterName, indexName string, query elastic.Query, bsp *model.BasicSearchParams) (res *model.SearchResult, err error) {
  60. res = &model.SearchResult{Debug: ""}
  61. if bsp.Debug {
  62. if src, e := query.Source(); e == nil {
  63. if data, er := json.Marshal(src); er == nil {
  64. res = &model.SearchResult{Debug: string(data)}
  65. } else {
  66. err = er
  67. log.Error("searchResult query.Source.json.Marshal error(%v)", err)
  68. return
  69. }
  70. } else {
  71. err = e
  72. log.Error("searchResult query.Source error(%v)", err)
  73. return
  74. }
  75. }
  76. if _, ok := d.esPool[esClusterName]; !ok {
  77. PromError(fmt.Sprintf("es:集群不存在%s", esClusterName), "s.dao.searchResult indexName:%s", indexName)
  78. res = &model.SearchResult{Debug: fmt.Sprintf("es:集群不存在%s, %s", esClusterName, res.Debug)}
  79. return
  80. }
  81. // multi sort
  82. sorterSlice := []elastic.Sorter{}
  83. if bsp.KW != "" && bsp.ScoreFirst {
  84. sorterSlice = append(sorterSlice, elastic.NewScoreSort().Desc())
  85. }
  86. for i, d := range bsp.Order {
  87. if len(bsp.Sort) < i+1 {
  88. if bsp.Sort[0] == "desc" {
  89. sorterSlice = append(sorterSlice, elastic.NewFieldSort(d).Desc())
  90. } else {
  91. sorterSlice = append(sorterSlice, elastic.NewFieldSort(d).Asc())
  92. }
  93. } else {
  94. if bsp.Sort[i] == "desc" {
  95. sorterSlice = append(sorterSlice, elastic.NewFieldSort(d).Desc())
  96. } else {
  97. sorterSlice = append(sorterSlice, elastic.NewFieldSort(d).Asc())
  98. }
  99. }
  100. }
  101. if bsp.KW != "" && !bsp.ScoreFirst {
  102. sorterSlice = append(sorterSlice, elastic.NewScoreSort().Desc())
  103. }
  104. // source
  105. fsc := elastic.NewFetchSourceContext(true).Include(bsp.Source...)
  106. // highlight
  107. hl := elastic.NewHighlight()
  108. if bsp.Highlight && len(bsp.KwFields) > 0 {
  109. for _, v := range bsp.KwFields {
  110. hl = hl.Fields(elastic.NewHighlighterField(v))
  111. }
  112. hl = hl.PreTags("<em class=\"keyword\">").PostTags("</em>")
  113. }
  114. // from + size = 10,000
  115. from := (bsp.Pn - 1) * bsp.Ps
  116. size := bsp.Ps
  117. if (from + size) > 10000 {
  118. from = 10000 - size
  119. }
  120. // do
  121. searchResult, err := d.esPool[esClusterName].
  122. Search().Index(indexName).
  123. Highlight(hl).
  124. Query(query).
  125. SortBy(sorterSlice...).
  126. From(from).
  127. Size(size).
  128. Pretty(true).
  129. FetchSourceContext(fsc).
  130. Do(context.Background())
  131. if err != nil {
  132. PromError(fmt.Sprintf("es:执行查询失败%s ", esClusterName), "%v", err)
  133. res = &model.SearchResult{Debug: res.Debug + "es:执行查询失败"}
  134. return
  135. }
  136. var data []json.RawMessage
  137. b := bytes.Buffer{}
  138. b.WriteString("{")
  139. b.WriteString("}")
  140. for _, hit := range searchResult.Hits.Hits {
  141. var t json.RawMessage
  142. e := json.Unmarshal(*hit.Source, &t)
  143. if e != nil {
  144. PromError(fmt.Sprintf("es:%s 索引有脏数据", esClusterName), "s.dao.SearchArchiveCheck(%d,%d) error(%v) ", bsp.Pn*bsp.Ps, bsp.Ps, e)
  145. continue
  146. }
  147. data = append(data, t)
  148. // highlight
  149. if len(hit.Highlight) > 0 {
  150. b, _ := json.Marshal(hit.Highlight)
  151. h := []byte(string(b))
  152. data = append(data, h)
  153. } else if bsp.Highlight {
  154. data = append(data, b.Bytes()) //保证在高亮情况下,肯定有一对数据
  155. }
  156. }
  157. if len(data) == 0 {
  158. data = []json.RawMessage{}
  159. }
  160. res = &model.SearchResult{
  161. Order: strings.Join(bsp.Order, ","),
  162. Sort: strings.Join(bsp.Sort, ","),
  163. Result: data,
  164. Debug: res.Debug,
  165. Page: &model.Page{
  166. Pn: bsp.Pn,
  167. Ps: bsp.Ps,
  168. Total: searchResult.Hits.TotalHits,
  169. },
  170. }
  171. return
  172. }
  173. // QueryResult query result from ES.
  174. func (d *Dao) QueryResult(c context.Context, query elastic.Query, sp *model.QueryParams, qbDebug *model.QueryDebugResult) (res *model.QueryResult, qrDebug *model.QueryDebugResult, err error) {
  175. qrDebug = &model.QueryDebugResult{}
  176. if qbDebug != nil {
  177. qrDebug = qbDebug
  178. }
  179. esCluster := sp.AppIDConf.ESCluster
  180. if _, ok := d.esPool[esCluster]; !ok {
  181. qrDebug.AddErrMsg("es:集群不存在" + esCluster)
  182. return
  183. }
  184. if sp.DebugLevel != 0 {
  185. qrDebug.Mapping, err = d.esPool[esCluster].GetMapping().Index(sp.QueryBody.From).Do(context.Background())
  186. }
  187. // 低级别debug,在dsl执行前退出
  188. if sp.DebugLevel == 1 {
  189. return
  190. }
  191. // multi sort
  192. sorterSlice := []elastic.Sorter{}
  193. if len(sp.QueryBody.Where.Like) > 0 && sp.QueryBody.OrderScoreFirst { // like 长度 > 0,但里面是空的也是个问题
  194. sorterSlice = append(sorterSlice, elastic.NewScoreSort().Desc())
  195. }
  196. for _, i := range sp.QueryBody.Order {
  197. for k, v := range i {
  198. if v == "asc" {
  199. sorterSlice = append(sorterSlice, elastic.NewFieldSort(k).Asc())
  200. } else {
  201. sorterSlice = append(sorterSlice, elastic.NewFieldSort(k).Desc())
  202. }
  203. }
  204. }
  205. if len(sp.QueryBody.Where.Like) > 0 && sp.QueryBody.OrderScoreFirst {
  206. sorterSlice = append(sorterSlice, elastic.NewScoreSort().Desc())
  207. }
  208. // source
  209. fsc := elastic.NewFetchSourceContext(true).Include(sp.QueryBody.Fields...)
  210. // highlight
  211. hl := elastic.NewHighlight()
  212. if sp.QueryBody.Highlight && len(sp.QueryBody.Where.Like) > 0 {
  213. for _, v := range sp.QueryBody.Where.Like {
  214. for _, field := range v.KWFields {
  215. hl = hl.Fields(elastic.NewHighlighterField(field))
  216. }
  217. }
  218. hl = hl.PreTags("<em class=\"keyword\">").PostTags("</em>")
  219. }
  220. // from + size = 10,000
  221. maxRows := 10000
  222. if b, ok := model.PermConf["oht"][sp.Business]; ok && b == "true" {
  223. maxRows = 100000
  224. }
  225. from := (sp.QueryBody.Pn - 1) * sp.QueryBody.Ps
  226. size := sp.QueryBody.Ps
  227. if (from + size) > maxRows {
  228. from = maxRows - size
  229. }
  230. // Scroll
  231. if sp.QueryBody.Scroll == true {
  232. var (
  233. tList []json.RawMessage
  234. tLen int
  235. ScrollID = ""
  236. )
  237. res = &model.QueryResult{}
  238. esCluster := sp.AppIDConf.ESCluster
  239. eSearch, ok := d.esPool[esCluster]
  240. if !ok {
  241. PromError(fmt.Sprintf("es:集群不存在%s", esCluster), "s.dao.searchResult indexName:%s", esCluster)
  242. return
  243. }
  244. fsc := elastic.NewFetchSourceContext(true).Include(sp.QueryBody.Fields...)
  245. // multi sort
  246. sorterSlice := []elastic.Sorter{}
  247. if len(sp.QueryBody.Where.Like) > 0 && sp.QueryBody.OrderScoreFirst { // like 长度 > 0,但里面是空的也是个问题
  248. sorterSlice = append(sorterSlice, elastic.NewScoreSort().Desc())
  249. }
  250. for _, i := range sp.QueryBody.Order {
  251. for k, v := range i {
  252. if v == "asc" {
  253. sorterSlice = append(sorterSlice, elastic.NewFieldSort(k).Asc())
  254. } else {
  255. sorterSlice = append(sorterSlice, elastic.NewFieldSort(k).Desc())
  256. }
  257. }
  258. }
  259. if len(sp.QueryBody.Where.Like) > 0 && !sp.QueryBody.OrderScoreFirst {
  260. sorterSlice = append(sorterSlice, elastic.NewScoreSort().Desc())
  261. }
  262. for {
  263. searchResult, err := eSearch.Scroll().Index(sp.QueryBody.From).
  264. Query(query).FetchSourceContext(fsc).Size(sp.QueryBody.Ps).Scroll("1m").ScrollId(ScrollID).SortBy(sorterSlice...).Do(c)
  265. if err == io.EOF {
  266. break
  267. } else if err != nil {
  268. PromError(fmt.Sprintf("es:执行查询失败%s ", "Scroll"), "es:执行查询失败%v", err)
  269. break
  270. }
  271. ScrollID = searchResult.ScrollId
  272. for _, hit := range searchResult.Hits.Hits {
  273. var t json.RawMessage
  274. if err = json.Unmarshal(*hit.Source, &t); err != nil {
  275. PromError(fmt.Sprintf("es:Unmarshal%s ", "Scroll"), "es:Unmarshal%v", err)
  276. break
  277. }
  278. tList = append(tList, t)
  279. tLen++
  280. if tLen >= sp.QueryBody.Pn*sp.QueryBody.Ps {
  281. goto ClearScroll
  282. }
  283. }
  284. }
  285. ClearScroll:
  286. go eSearch.ClearScroll().ScrollId(ScrollID).Do(context.Background())
  287. if res.Result, err = json.Marshal(tList); err != nil {
  288. PromError(fmt.Sprintf("es:Unmarshal%s ", "Scroll"), "es:Unmarshal%v", err)
  289. return
  290. }
  291. return
  292. }
  293. // do
  294. searchPrepare := d.esPool[esCluster].
  295. Search().Index(sp.QueryBody.From).
  296. Highlight(hl).
  297. Query(query).
  298. SortBy(sorterSlice...).
  299. From(from).
  300. Size(size).
  301. FetchSourceContext(fsc).IgnoreUnavailable(true).AllowNoIndices(true)
  302. if ec, ok := model.PermConf["es_cache"][sp.Business]; ok && ec == "true" {
  303. searchPrepare.RequestCache(true)
  304. }
  305. if rt, ok := model.PermConf["routing"][sp.Business]; ok {
  306. routing := make([]string, 0, 1)
  307. if sp.QueryBody.Where.EQ != nil {
  308. if eq, ok := sp.QueryBody.Where.EQ[rt]; ok {
  309. routing = append(routing, fmt.Sprintf("%v", eq))
  310. }
  311. }
  312. if sp.QueryBody.Where.In != nil {
  313. if in, ok := sp.QueryBody.Where.In[rt]; ok {
  314. for _, v := range in {
  315. routing = append(routing, fmt.Sprintf("%v", v))
  316. }
  317. }
  318. }
  319. if len(routing) == 0 {
  320. qrDebug.AddErrMsg("es:路由不存在" + rt)
  321. return
  322. }
  323. searchPrepare.Routing(routing...)
  324. }
  325. if sp.DebugLevel == 2 {
  326. searchPrepare.Profile(true)
  327. }
  328. // Enhanced
  329. for _, v := range sp.QueryBody.Where.Enhanced {
  330. aggKey := v.Mode + "_" + v.Field
  331. switch v.Mode {
  332. case model.EnhancedModeGroupBy:
  333. aggs := elastic.NewTermsAggregation()
  334. aggs = aggs.Field(v.Field).Size(1000) //要和业务方确定具体值
  335. searchPrepare.Aggregation(aggKey, aggs)
  336. case model.EnhancedModeCollapse, model.EnhancedModeDistinct:
  337. collapse := elastic.NewCollapseBuilder(v.Field).MaxConcurrentGroupRequests(1)
  338. innerHit := elastic.NewInnerHit().Name("last_one").Size(1)
  339. for _, v := range v.Order {
  340. for field, sort := range v {
  341. if sort == "desc" {
  342. innerHit.Sort(field, false)
  343. } else {
  344. innerHit.Sort(field, true)
  345. }
  346. }
  347. }
  348. if len(v.Order) > 0 {
  349. collapse.InnerHit(innerHit)
  350. }
  351. searchPrepare.Collapse(collapse)
  352. case model.EnhancedModeSum:
  353. aggs := elastic.NewSumAggregation()
  354. aggs = aggs.Field(v.Field)
  355. searchPrepare.Aggregation(aggKey, aggs)
  356. case model.EnhancedModeDistinctCount:
  357. aggs := elastic.NewCardinalityAggregation()
  358. aggs = aggs.Field(v.Field)
  359. searchPrepare.Aggregation(aggKey, aggs)
  360. }
  361. }
  362. searchResult, err := searchPrepare.Do(context.Background())
  363. if err != nil {
  364. qrDebug.AddErrMsg(fmt.Sprintf("es:执行查询失败%s. %v", esCluster, err))
  365. PromError(fmt.Sprintf("es:执行查询失败%s ", esCluster), "%v", err)
  366. return
  367. }
  368. // data
  369. data := json.RawMessage{}
  370. docHits := []json.RawMessage{}
  371. docBuckets := map[string][]map[string]*json.RawMessage{}
  372. b := bytes.Buffer{}
  373. b.WriteString("{")
  374. b.WriteString("}")
  375. for _, hit := range searchResult.Hits.Hits {
  376. var t json.RawMessage
  377. e := json.Unmarshal(*hit.Source, &t)
  378. if e != nil {
  379. PromError(fmt.Sprintf("es:%s 索引有脏数据", esCluster), "s.dao.SearchArchiveCheck(%d,%d) error(%v) ", sp.QueryBody.Pn*sp.QueryBody.Ps, sp.QueryBody.Ps, e)
  380. continue
  381. }
  382. docHits = append(docHits, t)
  383. // highlight
  384. if len(hit.Highlight) > 0 {
  385. b, _ := json.Marshal(hit.Highlight)
  386. docHits = append(docHits, b)
  387. } else if sp.QueryBody.Highlight {
  388. docHits = append(docHits, b.Bytes()) //保证在高亮情况下,肯定有一对数据
  389. }
  390. }
  391. if len(docHits) > 0 {
  392. if doc, er := json.Marshal(docHits); er != nil {
  393. qrDebug.AddErrMsg(fmt.Sprintf("es:Unmarshal docHits es:Unmarshal%v ", er))
  394. PromError(fmt.Sprintf("es:Unmarshal%s ", "docHits"), "es:Unmarshal%v", er)
  395. } else {
  396. data = doc
  397. }
  398. } else {
  399. h := bytes.Buffer{}
  400. h.WriteString("[")
  401. h.WriteString("]")
  402. data = h.Bytes()
  403. }
  404. // data overwrite
  405. for _, v := range sp.QueryBody.Where.Enhanced {
  406. key := v.Mode + "_" + v.Field
  407. switch v.Mode {
  408. case model.EnhancedModeGroupBy:
  409. result, ok := searchResult.Aggregations.Terms(key)
  410. if !ok {
  411. PromError(fmt.Sprintf("es:Unmarshal%s ", key), "es:Unmarshal%v", err)
  412. continue
  413. }
  414. for _, b := range result.Buckets {
  415. docBuckets[key] = append(docBuckets[key], b.Aggregations)
  416. }
  417. data = b.Bytes() //保证无数据情况下,有正常返回
  418. case model.EnhancedModeSum:
  419. result, ok := searchResult.Aggregations.Sum(key)
  420. if !ok {
  421. PromError(fmt.Sprintf("es:Unmarshal%s ", key), "es:Unmarshal%v", err)
  422. continue
  423. }
  424. docBuckets[key] = append(docBuckets[key], result.Aggregations)
  425. data = b.Bytes() //保证无数据情况下,有正常返回
  426. case model.EnhancedModeDistinctCount:
  427. result, ok := searchResult.Aggregations.Cardinality(key)
  428. if !ok {
  429. PromError(fmt.Sprintf("es:Unmarshal%s ", key), "es:Unmarshal%v", err)
  430. continue
  431. }
  432. docBuckets[key] = append(docBuckets[key], result.Aggregations)
  433. data = b.Bytes() //保证无数据情况下,有正常返回
  434. default:
  435. // other modes...
  436. }
  437. }
  438. if len(docBuckets) > 0 {
  439. if doc, er := json.Marshal(docBuckets); er != nil {
  440. qrDebug.AddErrMsg(fmt.Sprintf("es:Unmarshal docBuckets es:Unmarshal%v", er))
  441. PromError(fmt.Sprintf("es:Unmarshal%s ", "docBuckets"), "es:Unmarshal%v", er)
  442. } else {
  443. data = doc
  444. }
  445. }
  446. order := []string{}
  447. sort := []string{}
  448. for _, i := range sp.QueryBody.Order {
  449. for k, v := range i {
  450. order = append(order, k)
  451. sort = append(sort, v)
  452. }
  453. }
  454. res = &model.QueryResult{
  455. Order: strings.Join(order, ","),
  456. Sort: strings.Join(sort, ","),
  457. Result: data,
  458. Page: &model.Page{
  459. Pn: sp.QueryBody.Pn,
  460. Ps: sp.QueryBody.Ps,
  461. Total: searchResult.Hits.TotalHits,
  462. },
  463. }
  464. //(默认的debug)高级别debug,在dsl执行后退出
  465. if sp.DebugLevel == 2 {
  466. qrDebug.Profile = searchResult.Profile
  467. return
  468. }
  469. return
  470. }
  471. // BulkIndex .
  472. func (d *Dao) BulkIndex(c context.Context, esName string, bulkData []BulkItem) (err error) {
  473. bulkRequest := d.esPool[esName].Bulk()
  474. for _, b := range bulkData {
  475. request := elastic.NewBulkIndexRequest().Index(b.IndexName()).Type(b.IndexType()).Id(b.IndexID()).Doc(b)
  476. bulkRequest.Add(request)
  477. }
  478. if _, err = bulkRequest.Do(c); err != nil {
  479. log.Error("esName(%s) bulk error(%v)", esName, err)
  480. }
  481. return
  482. }
  483. // ExistIndex .
  484. func (d *Dao) ExistIndex(c context.Context, esClusterName, indexName string) (exist bool, err error) {
  485. if _, ok := d.esPool[esClusterName]; !ok {
  486. PromError(fmt.Sprintf("es:集群不存在%s", esClusterName), "s.dao.searchResult indexName:%s", indexName)
  487. err = fmt.Errorf("集群不存在")
  488. return
  489. }
  490. exist, err = d.esPool[esClusterName].IndexExists(indexName).Do(c)
  491. return
  492. }