reindex_test.go 12 KB


  1. // Copyright 2012-present Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package elastic
  5. import (
  6. "context"
  7. "encoding/json"
  8. "testing"
  9. )
  10. func TestReindexSourceWithBodyMap(t *testing.T) {
  11. client := setupTestClient(t)
  12. out, err := client.Reindex().Body(map[string]interface{}{
  13. "source": map[string]interface{}{
  14. "index": "twitter",
  15. },
  16. "dest": map[string]interface{}{
  17. "index": "new_twitter",
  18. },
  19. }).getBody()
  20. if err != nil {
  21. t.Fatal(err)
  22. }
  23. b, err := json.Marshal(out)
  24. if err != nil {
  25. t.Fatal(err)
  26. }
  27. got := string(b)
  28. want := `{"dest":{"index":"new_twitter"},"source":{"index":"twitter"}}`
  29. if got != want {
  30. t.Fatalf("\ngot %s\nwant %s", got, want)
  31. }
  32. }
  33. func TestReindexSourceWithBodyString(t *testing.T) {
  34. client := setupTestClient(t)
  35. got, err := client.Reindex().Body(`{"source":{"index":"twitter"},"dest":{"index":"new_twitter"}}`).getBody()
  36. if err != nil {
  37. t.Fatal(err)
  38. }
  39. want := `{"source":{"index":"twitter"},"dest":{"index":"new_twitter"}}`
  40. if got != want {
  41. t.Fatalf("\ngot %s\nwant %s", got, want)
  42. }
  43. }
  44. func TestReindexSourceWithSourceIndexAndDestinationIndex(t *testing.T) {
  45. client := setupTestClient(t)
  46. out, err := client.Reindex().SourceIndex("twitter").DestinationIndex("new_twitter").getBody()
  47. if err != nil {
  48. t.Fatal(err)
  49. }
  50. b, err := json.Marshal(out)
  51. if err != nil {
  52. t.Fatal(err)
  53. }
  54. got := string(b)
  55. want := `{"dest":{"index":"new_twitter"},"source":{"index":"twitter"}}`
  56. if got != want {
  57. t.Fatalf("\ngot %s\nwant %s", got, want)
  58. }
  59. }
  60. func TestReindexSourceWithSourceAndDestinationAndVersionType(t *testing.T) {
  61. client := setupTestClient(t)
  62. src := NewReindexSource().Index("twitter")
  63. dst := NewReindexDestination().Index("new_twitter").VersionType("external")
  64. out, err := client.Reindex().Source(src).Destination(dst).getBody()
  65. if err != nil {
  66. t.Fatal(err)
  67. }
  68. b, err := json.Marshal(out)
  69. if err != nil {
  70. t.Fatal(err)
  71. }
  72. got := string(b)
  73. want := `{"dest":{"index":"new_twitter","version_type":"external"},"source":{"index":"twitter"}}`
  74. if got != want {
  75. t.Fatalf("\ngot %s\nwant %s", got, want)
  76. }
  77. }
  78. func TestReindexSourceWithSourceAndRemoteAndDestination(t *testing.T) {
  79. client := setupTestClient(t)
  80. src := NewReindexSource().Index("twitter").RemoteInfo(
  81. NewReindexRemoteInfo().Host("http://otherhost:9200").
  82. Username("alice").
  83. Password("secret").
  84. ConnectTimeout("10s").
  85. SocketTimeout("1m"),
  86. )
  87. dst := NewReindexDestination().Index("new_twitter")
  88. out, err := client.Reindex().Source(src).Destination(dst).getBody()
  89. if err != nil {
  90. t.Fatal(err)
  91. }
  92. b, err := json.Marshal(out)
  93. if err != nil {
  94. t.Fatal(err)
  95. }
  96. got := string(b)
  97. want := `{"dest":{"index":"new_twitter"},"source":{"index":"twitter","remote":{"connect_timeout":"10s","host":"http://otherhost:9200","password":"secret","socket_timeout":"1m","username":"alice"}}}`
  98. if got != want {
  99. t.Fatalf("\ngot %s\nwant %s", got, want)
  100. }
  101. }
  102. func TestReindexSourceWithSourceAndDestinationAndOpType(t *testing.T) {
  103. client := setupTestClient(t)
  104. src := NewReindexSource().Index("twitter")
  105. dst := NewReindexDestination().Index("new_twitter").OpType("create")
  106. out, err := client.Reindex().Source(src).Destination(dst).getBody()
  107. if err != nil {
  108. t.Fatal(err)
  109. }
  110. b, err := json.Marshal(out)
  111. if err != nil {
  112. t.Fatal(err)
  113. }
  114. got := string(b)
  115. want := `{"dest":{"index":"new_twitter","op_type":"create"},"source":{"index":"twitter"}}`
  116. if got != want {
  117. t.Fatalf("\ngot %s\nwant %s", got, want)
  118. }
  119. }
  120. func TestReindexSourceWithConflictsProceed(t *testing.T) {
  121. client := setupTestClient(t)
  122. src := NewReindexSource().Index("twitter")
  123. dst := NewReindexDestination().Index("new_twitter").OpType("create")
  124. out, err := client.Reindex().Conflicts("proceed").Source(src).Destination(dst).getBody()
  125. if err != nil {
  126. t.Fatal(err)
  127. }
  128. b, err := json.Marshal(out)
  129. if err != nil {
  130. t.Fatal(err)
  131. }
  132. got := string(b)
  133. want := `{"conflicts":"proceed","dest":{"index":"new_twitter","op_type":"create"},"source":{"index":"twitter"}}`
  134. if got != want {
  135. t.Fatalf("\ngot %s\nwant %s", got, want)
  136. }
  137. }
  138. func TestReindexSourceWithProceedOnVersionConflict(t *testing.T) {
  139. client := setupTestClient(t)
  140. src := NewReindexSource().Index("twitter")
  141. dst := NewReindexDestination().Index("new_twitter").OpType("create")
  142. out, err := client.Reindex().ProceedOnVersionConflict().Source(src).Destination(dst).getBody()
  143. if err != nil {
  144. t.Fatal(err)
  145. }
  146. b, err := json.Marshal(out)
  147. if err != nil {
  148. t.Fatal(err)
  149. }
  150. got := string(b)
  151. want := `{"conflicts":"proceed","dest":{"index":"new_twitter","op_type":"create"},"source":{"index":"twitter"}}`
  152. if got != want {
  153. t.Fatalf("\ngot %s\nwant %s", got, want)
  154. }
  155. }
  156. func TestReindexSourceWithQuery(t *testing.T) {
  157. client := setupTestClient(t)
  158. src := NewReindexSource().Index("twitter").Type("tweet").Query(NewTermQuery("user", "olivere"))
  159. dst := NewReindexDestination().Index("new_twitter")
  160. out, err := client.Reindex().Source(src).Destination(dst).getBody()
  161. if err != nil {
  162. t.Fatal(err)
  163. }
  164. b, err := json.Marshal(out)
  165. if err != nil {
  166. t.Fatal(err)
  167. }
  168. got := string(b)
  169. want := `{"dest":{"index":"new_twitter"},"source":{"index":"twitter","query":{"term":{"user":"olivere"}},"type":"tweet"}}`
  170. if got != want {
  171. t.Fatalf("\ngot %s\nwant %s", got, want)
  172. }
  173. }
  174. func TestReindexSourceWithMultipleSourceIndicesAndTypes(t *testing.T) {
  175. client := setupTestClient(t)
  176. src := NewReindexSource().Index("twitter", "blog").Type("tweet", "post")
  177. dst := NewReindexDestination().Index("all_together")
  178. out, err := client.Reindex().Source(src).Destination(dst).getBody()
  179. if err != nil {
  180. t.Fatal(err)
  181. }
  182. b, err := json.Marshal(out)
  183. if err != nil {
  184. t.Fatal(err)
  185. }
  186. got := string(b)
  187. want := `{"dest":{"index":"all_together"},"source":{"index":["twitter","blog"],"type":["tweet","post"]}}`
  188. if got != want {
  189. t.Fatalf("\ngot %s\nwant %s", got, want)
  190. }
  191. }
  192. func TestReindexSourceWithSourceAndSize(t *testing.T) {
  193. client := setupTestClient(t)
  194. src := NewReindexSource().Index("twitter").Sort("date", false)
  195. dst := NewReindexDestination().Index("new_twitter")
  196. out, err := client.Reindex().Size(10000).Source(src).Destination(dst).getBody()
  197. if err != nil {
  198. t.Fatal(err)
  199. }
  200. b, err := json.Marshal(out)
  201. if err != nil {
  202. t.Fatal(err)
  203. }
  204. got := string(b)
  205. want := `{"dest":{"index":"new_twitter"},"size":10000,"source":{"index":"twitter","sort":[{"date":{"order":"desc"}}]}}`
  206. if got != want {
  207. t.Fatalf("\ngot %s\nwant %s", got, want)
  208. }
  209. }
  210. func TestReindexSourceWithScript(t *testing.T) {
  211. client := setupTestClient(t)
  212. src := NewReindexSource().Index("twitter")
  213. dst := NewReindexDestination().Index("new_twitter").VersionType("external")
  214. scr := NewScriptInline("if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}")
  215. out, err := client.Reindex().Source(src).Destination(dst).Script(scr).getBody()
  216. if err != nil {
  217. t.Fatal(err)
  218. }
  219. b, err := json.Marshal(out)
  220. if err != nil {
  221. t.Fatal(err)
  222. }
  223. got := string(b)
  224. want := `{"dest":{"index":"new_twitter","version_type":"external"},"script":{"inline":"if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}"},"source":{"index":"twitter"}}`
  225. if got != want {
  226. t.Fatalf("\ngot %s\nwant %s", got, want)
  227. }
  228. }
  229. func TestReindexSourceWithRouting(t *testing.T) {
  230. client := setupTestClient(t)
  231. src := NewReindexSource().Index("source").Query(NewMatchQuery("company", "cat"))
  232. dst := NewReindexDestination().Index("dest").Routing("=cat")
  233. out, err := client.Reindex().Source(src).Destination(dst).getBody()
  234. if err != nil {
  235. t.Fatal(err)
  236. }
  237. b, err := json.Marshal(out)
  238. if err != nil {
  239. t.Fatal(err)
  240. }
  241. got := string(b)
  242. want := `{"dest":{"index":"dest","routing":"=cat"},"source":{"index":"source","query":{"match":{"company":{"query":"cat"}}}}}`
  243. if got != want {
  244. t.Fatalf("\ngot %s\nwant %s", got, want)
  245. }
  246. }
  247. func TestReindex(t *testing.T) {
  248. client := setupTestClientAndCreateIndexAndAddDocs(t) // , SetTraceLog(log.New(os.Stdout, "", 0)))
  249. esversion, err := client.ElasticsearchVersion(DefaultURL)
  250. if err != nil {
  251. t.Fatal(err)
  252. }
  253. if esversion < "2.3.0" {
  254. t.Skipf("Elasticsearch %v does not support Reindex API yet", esversion)
  255. }
  256. sourceCount, err := client.Count(testIndexName).Do(context.TODO())
  257. if err != nil {
  258. t.Fatal(err)
  259. }
  260. if sourceCount <= 0 {
  261. t.Fatalf("expected more than %d documents; got: %d", 0, sourceCount)
  262. }
  263. targetCount, err := client.Count(testIndexName2).Do(context.TODO())
  264. if err != nil {
  265. t.Fatal(err)
  266. }
  267. if targetCount != 0 {
  268. t.Fatalf("expected %d documents; got: %d", 0, targetCount)
  269. }
  270. // Simple copying
  271. src := NewReindexSource().Index(testIndexName)
  272. dst := NewReindexDestination().Index(testIndexName2)
  273. res, err := client.Reindex().Source(src).Destination(dst).Refresh("true").Do(context.TODO())
  274. if err != nil {
  275. t.Fatal(err)
  276. }
  277. if res == nil {
  278. t.Fatal("expected result != nil")
  279. }
  280. if res.Total != sourceCount {
  281. t.Errorf("expected %d, got %d", sourceCount, res.Total)
  282. }
  283. if res.Updated != 0 {
  284. t.Errorf("expected %d, got %d", 0, res.Updated)
  285. }
  286. if res.Created != sourceCount {
  287. t.Errorf("expected %d, got %d", sourceCount, res.Created)
  288. }
  289. targetCount, err = client.Count(testIndexName2).Do(context.TODO())
  290. if err != nil {
  291. t.Fatal(err)
  292. }
  293. if targetCount != sourceCount {
  294. t.Fatalf("expected %d documents; got: %d", sourceCount, targetCount)
  295. }
  296. }
  297. func TestReindexAsync(t *testing.T) {
  298. client := setupTestClientAndCreateIndexAndAddDocs(t) //, SetTraceLog(log.New(os.Stdout, "", 0)))
  299. esversion, err := client.ElasticsearchVersion(DefaultURL)
  300. if err != nil {
  301. t.Fatal(err)
  302. }
  303. if esversion < "2.3.0" {
  304. t.Skipf("Elasticsearch %v does not support Reindex API yet", esversion)
  305. }
  306. sourceCount, err := client.Count(testIndexName).Do(context.TODO())
  307. if err != nil {
  308. t.Fatal(err)
  309. }
  310. if sourceCount <= 0 {
  311. t.Fatalf("expected more than %d documents; got: %d", 0, sourceCount)
  312. }
  313. targetCount, err := client.Count(testIndexName2).Do(context.TODO())
  314. if err != nil {
  315. t.Fatal(err)
  316. }
  317. if targetCount != 0 {
  318. t.Fatalf("expected %d documents; got: %d", 0, targetCount)
  319. }
  320. // Simple copying
  321. src := NewReindexSource().Index(testIndexName)
  322. dst := NewReindexDestination().Index(testIndexName2)
  323. res, err := client.Reindex().Source(src).Destination(dst).DoAsync(context.TODO())
  324. if err != nil {
  325. t.Fatal(err)
  326. }
  327. if res == nil {
  328. t.Fatal("expected result != nil")
  329. }
  330. if res.TaskId == "" {
  331. t.Errorf("expected a task id, got %+v", res)
  332. }
  333. tasksGetTask := client.TasksGetTask()
  334. taskStatus, err := tasksGetTask.TaskId(res.TaskId).Do(context.TODO())
  335. if err != nil {
  336. t.Fatal(err)
  337. }
  338. if taskStatus == nil {
  339. t.Fatal("expected task status result != nil")
  340. }
  341. }
  342. func TestReindexWithWaitForCompletionTrueCannotBeStarted(t *testing.T) {
  343. client := setupTestClientAndCreateIndexAndAddDocs(t)
  344. esversion, err := client.ElasticsearchVersion(DefaultURL)
  345. if err != nil {
  346. t.Fatal(err)
  347. }
  348. if esversion < "2.3.0" {
  349. t.Skipf("Elasticsearch %v does not support Reindex API yet", esversion)
  350. }
  351. sourceCount, err := client.Count(testIndexName).Do(context.TODO())
  352. if err != nil {
  353. t.Fatal(err)
  354. }
  355. if sourceCount <= 0 {
  356. t.Fatalf("expected more than %d documents; got: %d", 0, sourceCount)
  357. }
  358. targetCount, err := client.Count(testIndexName2).Do(context.TODO())
  359. if err != nil {
  360. t.Fatal(err)
  361. }
  362. if targetCount != 0 {
  363. t.Fatalf("expected %d documents; got: %d", 0, targetCount)
  364. }
  365. // DoAsync should fail when WaitForCompletion is true
  366. src := NewReindexSource().Index(testIndexName)
  367. dst := NewReindexDestination().Index(testIndexName2)
  368. _, err = client.Reindex().Source(src).Destination(dst).WaitForCompletion(true).DoAsync(context.TODO())
  369. if err == nil {
  370. t.Fatal("error should have been returned")
  371. }
  372. }