123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401 |
- // Copyright 2012-present Oliver Eilhard. All rights reserved.
- // Use of this source code is governed by a MIT-license.
- // See http://olivere.mit-license.org/license.txt for details.
- package elastic
- import (
- "context"
- "encoding/json"
- "testing"
- )
- func TestReindexSourceWithBodyMap(t *testing.T) {
- client := setupTestClient(t)
- out, err := client.Reindex().Body(map[string]interface{}{
- "source": map[string]interface{}{
- "index": "twitter",
- },
- "dest": map[string]interface{}{
- "index": "new_twitter",
- },
- }).getBody()
- if err != nil {
- t.Fatal(err)
- }
- b, err := json.Marshal(out)
- if err != nil {
- t.Fatal(err)
- }
- got := string(b)
- want := `{"dest":{"index":"new_twitter"},"source":{"index":"twitter"}}`
- if got != want {
- t.Fatalf("\ngot %s\nwant %s", got, want)
- }
- }
- func TestReindexSourceWithBodyString(t *testing.T) {
- client := setupTestClient(t)
- got, err := client.Reindex().Body(`{"source":{"index":"twitter"},"dest":{"index":"new_twitter"}}`).getBody()
- if err != nil {
- t.Fatal(err)
- }
- want := `{"source":{"index":"twitter"},"dest":{"index":"new_twitter"}}`
- if got != want {
- t.Fatalf("\ngot %s\nwant %s", got, want)
- }
- }
- func TestReindexSourceWithSourceIndexAndDestinationIndex(t *testing.T) {
- client := setupTestClient(t)
- out, err := client.Reindex().SourceIndex("twitter").DestinationIndex("new_twitter").getBody()
- if err != nil {
- t.Fatal(err)
- }
- b, err := json.Marshal(out)
- if err != nil {
- t.Fatal(err)
- }
- got := string(b)
- want := `{"dest":{"index":"new_twitter"},"source":{"index":"twitter"}}`
- if got != want {
- t.Fatalf("\ngot %s\nwant %s", got, want)
- }
- }
- func TestReindexSourceWithSourceAndDestinationAndVersionType(t *testing.T) {
- client := setupTestClient(t)
- src := NewReindexSource().Index("twitter")
- dst := NewReindexDestination().Index("new_twitter").VersionType("external")
- out, err := client.Reindex().Source(src).Destination(dst).getBody()
- if err != nil {
- t.Fatal(err)
- }
- b, err := json.Marshal(out)
- if err != nil {
- t.Fatal(err)
- }
- got := string(b)
- want := `{"dest":{"index":"new_twitter","version_type":"external"},"source":{"index":"twitter"}}`
- if got != want {
- t.Fatalf("\ngot %s\nwant %s", got, want)
- }
- }
- func TestReindexSourceWithSourceAndRemoteAndDestination(t *testing.T) {
- client := setupTestClient(t)
- src := NewReindexSource().Index("twitter").RemoteInfo(
- NewReindexRemoteInfo().Host("http://otherhost:9200").
- Username("alice").
- Password("secret").
- ConnectTimeout("10s").
- SocketTimeout("1m"),
- )
- dst := NewReindexDestination().Index("new_twitter")
- out, err := client.Reindex().Source(src).Destination(dst).getBody()
- if err != nil {
- t.Fatal(err)
- }
- b, err := json.Marshal(out)
- if err != nil {
- t.Fatal(err)
- }
- got := string(b)
- want := `{"dest":{"index":"new_twitter"},"source":{"index":"twitter","remote":{"connect_timeout":"10s","host":"http://otherhost:9200","password":"secret","socket_timeout":"1m","username":"alice"}}}`
- if got != want {
- t.Fatalf("\ngot %s\nwant %s", got, want)
- }
- }
- func TestReindexSourceWithSourceAndDestinationAndOpType(t *testing.T) {
- client := setupTestClient(t)
- src := NewReindexSource().Index("twitter")
- dst := NewReindexDestination().Index("new_twitter").OpType("create")
- out, err := client.Reindex().Source(src).Destination(dst).getBody()
- if err != nil {
- t.Fatal(err)
- }
- b, err := json.Marshal(out)
- if err != nil {
- t.Fatal(err)
- }
- got := string(b)
- want := `{"dest":{"index":"new_twitter","op_type":"create"},"source":{"index":"twitter"}}`
- if got != want {
- t.Fatalf("\ngot %s\nwant %s", got, want)
- }
- }
- func TestReindexSourceWithConflictsProceed(t *testing.T) {
- client := setupTestClient(t)
- src := NewReindexSource().Index("twitter")
- dst := NewReindexDestination().Index("new_twitter").OpType("create")
- out, err := client.Reindex().Conflicts("proceed").Source(src).Destination(dst).getBody()
- if err != nil {
- t.Fatal(err)
- }
- b, err := json.Marshal(out)
- if err != nil {
- t.Fatal(err)
- }
- got := string(b)
- want := `{"conflicts":"proceed","dest":{"index":"new_twitter","op_type":"create"},"source":{"index":"twitter"}}`
- if got != want {
- t.Fatalf("\ngot %s\nwant %s", got, want)
- }
- }
- func TestReindexSourceWithProceedOnVersionConflict(t *testing.T) {
- client := setupTestClient(t)
- src := NewReindexSource().Index("twitter")
- dst := NewReindexDestination().Index("new_twitter").OpType("create")
- out, err := client.Reindex().ProceedOnVersionConflict().Source(src).Destination(dst).getBody()
- if err != nil {
- t.Fatal(err)
- }
- b, err := json.Marshal(out)
- if err != nil {
- t.Fatal(err)
- }
- got := string(b)
- want := `{"conflicts":"proceed","dest":{"index":"new_twitter","op_type":"create"},"source":{"index":"twitter"}}`
- if got != want {
- t.Fatalf("\ngot %s\nwant %s", got, want)
- }
- }
- func TestReindexSourceWithQuery(t *testing.T) {
- client := setupTestClient(t)
- src := NewReindexSource().Index("twitter").Type("tweet").Query(NewTermQuery("user", "olivere"))
- dst := NewReindexDestination().Index("new_twitter")
- out, err := client.Reindex().Source(src).Destination(dst).getBody()
- if err != nil {
- t.Fatal(err)
- }
- b, err := json.Marshal(out)
- if err != nil {
- t.Fatal(err)
- }
- got := string(b)
- want := `{"dest":{"index":"new_twitter"},"source":{"index":"twitter","query":{"term":{"user":"olivere"}},"type":"tweet"}}`
- if got != want {
- t.Fatalf("\ngot %s\nwant %s", got, want)
- }
- }
- func TestReindexSourceWithMultipleSourceIndicesAndTypes(t *testing.T) {
- client := setupTestClient(t)
- src := NewReindexSource().Index("twitter", "blog").Type("tweet", "post")
- dst := NewReindexDestination().Index("all_together")
- out, err := client.Reindex().Source(src).Destination(dst).getBody()
- if err != nil {
- t.Fatal(err)
- }
- b, err := json.Marshal(out)
- if err != nil {
- t.Fatal(err)
- }
- got := string(b)
- want := `{"dest":{"index":"all_together"},"source":{"index":["twitter","blog"],"type":["tweet","post"]}}`
- if got != want {
- t.Fatalf("\ngot %s\nwant %s", got, want)
- }
- }
- func TestReindexSourceWithSourceAndSize(t *testing.T) {
- client := setupTestClient(t)
- src := NewReindexSource().Index("twitter").Sort("date", false)
- dst := NewReindexDestination().Index("new_twitter")
- out, err := client.Reindex().Size(10000).Source(src).Destination(dst).getBody()
- if err != nil {
- t.Fatal(err)
- }
- b, err := json.Marshal(out)
- if err != nil {
- t.Fatal(err)
- }
- got := string(b)
- want := `{"dest":{"index":"new_twitter"},"size":10000,"source":{"index":"twitter","sort":[{"date":{"order":"desc"}}]}}`
- if got != want {
- t.Fatalf("\ngot %s\nwant %s", got, want)
- }
- }
- func TestReindexSourceWithScript(t *testing.T) {
- client := setupTestClient(t)
- src := NewReindexSource().Index("twitter")
- dst := NewReindexDestination().Index("new_twitter").VersionType("external")
- scr := NewScriptInline("if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}")
- out, err := client.Reindex().Source(src).Destination(dst).Script(scr).getBody()
- if err != nil {
- t.Fatal(err)
- }
- b, err := json.Marshal(out)
- if err != nil {
- t.Fatal(err)
- }
- got := string(b)
- want := `{"dest":{"index":"new_twitter","version_type":"external"},"script":{"inline":"if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}"},"source":{"index":"twitter"}}`
- if got != want {
- t.Fatalf("\ngot %s\nwant %s", got, want)
- }
- }
- func TestReindexSourceWithRouting(t *testing.T) {
- client := setupTestClient(t)
- src := NewReindexSource().Index("source").Query(NewMatchQuery("company", "cat"))
- dst := NewReindexDestination().Index("dest").Routing("=cat")
- out, err := client.Reindex().Source(src).Destination(dst).getBody()
- if err != nil {
- t.Fatal(err)
- }
- b, err := json.Marshal(out)
- if err != nil {
- t.Fatal(err)
- }
- got := string(b)
- want := `{"dest":{"index":"dest","routing":"=cat"},"source":{"index":"source","query":{"match":{"company":{"query":"cat"}}}}}`
- if got != want {
- t.Fatalf("\ngot %s\nwant %s", got, want)
- }
- }
- func TestReindex(t *testing.T) {
- client := setupTestClientAndCreateIndexAndAddDocs(t) // , SetTraceLog(log.New(os.Stdout, "", 0)))
- esversion, err := client.ElasticsearchVersion(DefaultURL)
- if err != nil {
- t.Fatal(err)
- }
- if esversion < "2.3.0" {
- t.Skipf("Elasticsearch %v does not support Reindex API yet", esversion)
- }
- sourceCount, err := client.Count(testIndexName).Do(context.TODO())
- if err != nil {
- t.Fatal(err)
- }
- if sourceCount <= 0 {
- t.Fatalf("expected more than %d documents; got: %d", 0, sourceCount)
- }
- targetCount, err := client.Count(testIndexName2).Do(context.TODO())
- if err != nil {
- t.Fatal(err)
- }
- if targetCount != 0 {
- t.Fatalf("expected %d documents; got: %d", 0, targetCount)
- }
- // Simple copying
- src := NewReindexSource().Index(testIndexName)
- dst := NewReindexDestination().Index(testIndexName2)
- res, err := client.Reindex().Source(src).Destination(dst).Refresh("true").Do(context.TODO())
- if err != nil {
- t.Fatal(err)
- }
- if res == nil {
- t.Fatal("expected result != nil")
- }
- if res.Total != sourceCount {
- t.Errorf("expected %d, got %d", sourceCount, res.Total)
- }
- if res.Updated != 0 {
- t.Errorf("expected %d, got %d", 0, res.Updated)
- }
- if res.Created != sourceCount {
- t.Errorf("expected %d, got %d", sourceCount, res.Created)
- }
- targetCount, err = client.Count(testIndexName2).Do(context.TODO())
- if err != nil {
- t.Fatal(err)
- }
- if targetCount != sourceCount {
- t.Fatalf("expected %d documents; got: %d", sourceCount, targetCount)
- }
- }
- func TestReindexAsync(t *testing.T) {
- client := setupTestClientAndCreateIndexAndAddDocs(t) //, SetTraceLog(log.New(os.Stdout, "", 0)))
- esversion, err := client.ElasticsearchVersion(DefaultURL)
- if err != nil {
- t.Fatal(err)
- }
- if esversion < "2.3.0" {
- t.Skipf("Elasticsearch %v does not support Reindex API yet", esversion)
- }
- sourceCount, err := client.Count(testIndexName).Do(context.TODO())
- if err != nil {
- t.Fatal(err)
- }
- if sourceCount <= 0 {
- t.Fatalf("expected more than %d documents; got: %d", 0, sourceCount)
- }
- targetCount, err := client.Count(testIndexName2).Do(context.TODO())
- if err != nil {
- t.Fatal(err)
- }
- if targetCount != 0 {
- t.Fatalf("expected %d documents; got: %d", 0, targetCount)
- }
- // Simple copying
- src := NewReindexSource().Index(testIndexName)
- dst := NewReindexDestination().Index(testIndexName2)
- res, err := client.Reindex().Source(src).Destination(dst).DoAsync(context.TODO())
- if err != nil {
- t.Fatal(err)
- }
- if res == nil {
- t.Fatal("expected result != nil")
- }
- if res.TaskId == "" {
- t.Errorf("expected a task id, got %+v", res)
- }
- tasksGetTask := client.TasksGetTask()
- taskStatus, err := tasksGetTask.TaskId(res.TaskId).Do(context.TODO())
- if err != nil {
- t.Fatal(err)
- }
- if taskStatus == nil {
- t.Fatal("expected task status result != nil")
- }
- }
- func TestReindexWithWaitForCompletionTrueCannotBeStarted(t *testing.T) {
- client := setupTestClientAndCreateIndexAndAddDocs(t)
- esversion, err := client.ElasticsearchVersion(DefaultURL)
- if err != nil {
- t.Fatal(err)
- }
- if esversion < "2.3.0" {
- t.Skipf("Elasticsearch %v does not support Reindex API yet", esversion)
- }
- sourceCount, err := client.Count(testIndexName).Do(context.TODO())
- if err != nil {
- t.Fatal(err)
- }
- if sourceCount <= 0 {
- t.Fatalf("expected more than %d documents; got: %d", 0, sourceCount)
- }
- targetCount, err := client.Count(testIndexName2).Do(context.TODO())
- if err != nil {
- t.Fatal(err)
- }
- if targetCount != 0 {
- t.Fatalf("expected %d documents; got: %d", 0, targetCount)
- }
- // DoAsync should fail when WaitForCompletion is true
- src := NewReindexSource().Index(testIndexName)
- dst := NewReindexDestination().Index(testIndexName2)
- _, err = client.Reindex().Source(src).Destination(dst).WaitForCompletion(true).DoAsync(context.TODO())
- if err == nil {
- t.Fatal("error should have been returned")
- }
- }
|