upsert.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "go-common/app/admin/main/search/dao"
  7. "go-common/app/admin/main/search/model"
  8. "go-common/library/ecode"
  9. "go-common/library/log"
  10. )
  11. // Upsert upsert docs.
  12. func (s *Service) Upsert(c context.Context, up *model.UpsertParams, dataBody map[string][]model.MapData) (err error) {
  13. app, ok := s.queryConf[up.Business]
  14. if app2, ok2 := model.QueryConf[up.Business]; ok2 {
  15. app = app2
  16. ok = true
  17. }
  18. if !ok {
  19. err = fmt.Errorf("up.Business(%s) not exists in queryConf", up.Business)
  20. return
  21. }
  22. if app.ESCluster == "" {
  23. err = fmt.Errorf("app(%+v) escluster is empty", app)
  24. return
  25. }
  26. // dataBody to upsertBody
  27. up.UpsertBody = []model.UpsertBody{}
  28. for indexName, docs := range dataBody {
  29. if !strings.Contains(indexName, app.IndexPrefix) {
  30. log.Error("invalid indexName (%s)", indexName)
  31. continue
  32. }
  33. for _, doc := range docs {
  34. indexID := doc.StrID(app.IndexID)
  35. //TODO 提前告知base不对
  36. if indexID == "" {
  37. continue
  38. }
  39. upsert := model.UpsertBody{IndexName: indexName, IndexType: app.IndexType, IndexID: indexID, Doc: doc}
  40. up.UpsertBody = append(up.UpsertBody, upsert)
  41. }
  42. }
  43. if err = s.dao.UpsertBulk(c, app.ESCluster, up); err != nil {
  44. dao.PromError(fmt.Sprintf("es:%s 更新失败", app.ESCluster), "s.dao.UpsertBulk error(%v) ", err)
  45. err = ecode.SearchUpdateIndexFailed
  46. }
  47. return
  48. }