bulk_processor_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. // Copyright 2012-present Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package elastic
  5. import (
  6. "context"
  7. "fmt"
  8. "math/rand"
  9. "sync/atomic"
  10. "testing"
  11. "time"
  12. )
  13. func TestBulkProcessorDefaults(t *testing.T) {
  14. client := setupTestClientAndCreateIndex(t)
  15. p := client.BulkProcessor()
  16. if p == nil {
  17. t.Fatalf("expected BulkProcessorService; got: %v", p)
  18. }
  19. if got, want := p.name, ""; got != want {
  20. t.Errorf("expected %q; got: %q", want, got)
  21. }
  22. if got, want := p.numWorkers, 1; got != want {
  23. t.Errorf("expected %d; got: %d", want, got)
  24. }
  25. if got, want := p.bulkActions, 1000; got != want {
  26. t.Errorf("expected %d; got: %d", want, got)
  27. }
  28. if got, want := p.bulkSize, 5*1024*1024; got != want {
  29. t.Errorf("expected %d; got: %d", want, got)
  30. }
  31. if got, want := p.flushInterval, time.Duration(0); got != want {
  32. t.Errorf("expected %v; got: %v", want, got)
  33. }
  34. if got, want := p.wantStats, false; got != want {
  35. t.Errorf("expected %v; got: %v", want, got)
  36. }
  37. if p.backoff == nil {
  38. t.Fatalf("expected non-nill backoff; got: %v", p.backoff)
  39. }
  40. }
  41. func TestBulkProcessorCommitOnBulkActions(t *testing.T) {
  42. //client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0)))
  43. client := setupTestClientAndCreateIndex(t)
  44. testBulkProcessor(t,
  45. 10000,
  46. client.BulkProcessor().
  47. Name("Actions-1").
  48. Workers(1).
  49. BulkActions(100).
  50. BulkSize(-1),
  51. )
  52. testBulkProcessor(t,
  53. 10000,
  54. client.BulkProcessor().
  55. Name("Actions-2").
  56. Workers(2).
  57. BulkActions(100).
  58. BulkSize(-1),
  59. )
  60. }
  61. func TestBulkProcessorCommitOnBulkSize(t *testing.T) {
  62. //client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0)))
  63. client := setupTestClientAndCreateIndex(t)
  64. testBulkProcessor(t,
  65. 10000,
  66. client.BulkProcessor().
  67. Name("Size-1").
  68. Workers(1).
  69. BulkActions(-1).
  70. BulkSize(64*1024),
  71. )
  72. testBulkProcessor(t,
  73. 10000,
  74. client.BulkProcessor().
  75. Name("Size-2").
  76. Workers(2).
  77. BulkActions(-1).
  78. BulkSize(64*1024),
  79. )
  80. }
  81. func TestBulkProcessorBasedOnFlushInterval(t *testing.T) {
  82. //client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0)))
  83. client := setupTestClientAndCreateIndex(t)
  84. var beforeRequests int64
  85. var befores int64
  86. var afters int64
  87. var failures int64
  88. var afterRequests int64
  89. beforeFn := func(executionId int64, requests []BulkableRequest) {
  90. atomic.AddInt64(&beforeRequests, int64(len(requests)))
  91. atomic.AddInt64(&befores, 1)
  92. }
  93. afterFn := func(executionId int64, requests []BulkableRequest, response *BulkResponse, err error) {
  94. atomic.AddInt64(&afters, 1)
  95. if err != nil {
  96. atomic.AddInt64(&failures, 1)
  97. }
  98. atomic.AddInt64(&afterRequests, int64(len(requests)))
  99. }
  100. svc := client.BulkProcessor().
  101. Name("FlushInterval-1").
  102. Workers(2).
  103. BulkActions(-1).
  104. BulkSize(-1).
  105. FlushInterval(1 * time.Second).
  106. Before(beforeFn).
  107. After(afterFn)
  108. p, err := svc.Do(context.Background())
  109. if err != nil {
  110. t.Fatal(err)
  111. }
  112. const numDocs = 1000 // low-enough number that flush should be invoked
  113. for i := 1; i <= numDocs; i++ {
  114. tweet := tweet{User: "olivere", Message: fmt.Sprintf("%d. %s", i, randomString(rand.Intn(64)))}
  115. request := NewBulkIndexRequest().Index(testIndexName).Type("tweet").Id(fmt.Sprintf("%d", i)).Doc(tweet)
  116. p.Add(request)
  117. }
  118. // Should flush at least once
  119. time.Sleep(2 * time.Second)
  120. err = p.Close()
  121. if err != nil {
  122. t.Fatal(err)
  123. }
  124. if p.stats.Flushed == 0 {
  125. t.Errorf("expected at least 1 flush; got: %d", p.stats.Flushed)
  126. }
  127. if got, want := beforeRequests, int64(numDocs); got != want {
  128. t.Errorf("expected %d requests to before callback; got: %d", want, got)
  129. }
  130. if got, want := afterRequests, int64(numDocs); got != want {
  131. t.Errorf("expected %d requests to after callback; got: %d", want, got)
  132. }
  133. if befores == 0 {
  134. t.Error("expected at least 1 call to before callback")
  135. }
  136. if afters == 0 {
  137. t.Error("expected at least 1 call to after callback")
  138. }
  139. if failures != 0 {
  140. t.Errorf("expected 0 calls to failure callback; got: %d", failures)
  141. }
  142. // Check number of documents that were bulk indexed
  143. _, err = p.c.Flush(testIndexName).Do(context.TODO())
  144. if err != nil {
  145. t.Fatal(err)
  146. }
  147. count, err := p.c.Count(testIndexName).Do(context.TODO())
  148. if err != nil {
  149. t.Fatal(err)
  150. }
  151. if count != int64(numDocs) {
  152. t.Fatalf("expected %d documents; got: %d", numDocs, count)
  153. }
  154. }
  155. func TestBulkProcessorClose(t *testing.T) {
  156. //client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0)))
  157. client := setupTestClientAndCreateIndex(t)
  158. var beforeRequests int64
  159. var befores int64
  160. var afters int64
  161. var failures int64
  162. var afterRequests int64
  163. beforeFn := func(executionId int64, requests []BulkableRequest) {
  164. atomic.AddInt64(&beforeRequests, int64(len(requests)))
  165. atomic.AddInt64(&befores, 1)
  166. }
  167. afterFn := func(executionId int64, requests []BulkableRequest, response *BulkResponse, err error) {
  168. atomic.AddInt64(&afters, 1)
  169. if err != nil {
  170. atomic.AddInt64(&failures, 1)
  171. }
  172. atomic.AddInt64(&afterRequests, int64(len(requests)))
  173. }
  174. p, err := client.BulkProcessor().
  175. Name("FlushInterval-1").
  176. Workers(2).
  177. BulkActions(-1).
  178. BulkSize(-1).
  179. FlushInterval(30 * time.Second). // 30 seconds to flush
  180. Before(beforeFn).After(afterFn).
  181. Do(context.Background())
  182. if err != nil {
  183. t.Fatal(err)
  184. }
  185. const numDocs = 1000 // low-enough number that flush should be invoked
  186. for i := 1; i <= numDocs; i++ {
  187. tweet := tweet{User: "olivere", Message: fmt.Sprintf("%d. %s", i, randomString(rand.Intn(64)))}
  188. request := NewBulkIndexRequest().Index(testIndexName).Type("tweet").Id(fmt.Sprintf("%d", i)).Doc(tweet)
  189. p.Add(request)
  190. }
  191. // Should not flush because 30s > 1s
  192. time.Sleep(1 * time.Second)
  193. // Close should flush
  194. err = p.Close()
  195. if err != nil {
  196. t.Fatal(err)
  197. }
  198. if p.stats.Flushed != 0 {
  199. t.Errorf("expected no flush; got: %d", p.stats.Flushed)
  200. }
  201. if got, want := beforeRequests, int64(numDocs); got != want {
  202. t.Errorf("expected %d requests to before callback; got: %d", want, got)
  203. }
  204. if got, want := afterRequests, int64(numDocs); got != want {
  205. t.Errorf("expected %d requests to after callback; got: %d", want, got)
  206. }
  207. if befores == 0 {
  208. t.Error("expected at least 1 call to before callback")
  209. }
  210. if afters == 0 {
  211. t.Error("expected at least 1 call to after callback")
  212. }
  213. if failures != 0 {
  214. t.Errorf("expected 0 calls to failure callback; got: %d", failures)
  215. }
  216. // Check number of documents that were bulk indexed
  217. _, err = p.c.Flush(testIndexName).Do(context.TODO())
  218. if err != nil {
  219. t.Fatal(err)
  220. }
  221. count, err := p.c.Count(testIndexName).Do(context.TODO())
  222. if err != nil {
  223. t.Fatal(err)
  224. }
  225. if count != int64(numDocs) {
  226. t.Fatalf("expected %d documents; got: %d", numDocs, count)
  227. }
  228. }
  229. func TestBulkProcessorFlush(t *testing.T) {
  230. //client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0)))
  231. client := setupTestClientAndCreateIndex(t)
  232. p, err := client.BulkProcessor().
  233. Name("ManualFlush").
  234. Workers(10).
  235. BulkActions(-1).
  236. BulkSize(-1).
  237. FlushInterval(30 * time.Second). // 30 seconds to flush
  238. Stats(true).
  239. Do(context.Background())
  240. if err != nil {
  241. t.Fatal(err)
  242. }
  243. const numDocs = 100
  244. for i := 1; i <= numDocs; i++ {
  245. tweet := tweet{User: "olivere", Message: fmt.Sprintf("%d. %s", i, randomString(rand.Intn(64)))}
  246. request := NewBulkIndexRequest().Index(testIndexName).Type("tweet").Id(fmt.Sprintf("%d", i)).Doc(tweet)
  247. p.Add(request)
  248. }
  249. // Should not flush because 30s > 1s
  250. time.Sleep(1 * time.Second)
  251. // No flush yet
  252. stats := p.Stats()
  253. if stats.Flushed != 0 {
  254. t.Errorf("expected no flush; got: %d", p.stats.Flushed)
  255. }
  256. // Manual flush
  257. err = p.Flush()
  258. if err != nil {
  259. t.Fatal(err)
  260. }
  261. time.Sleep(1 * time.Second)
  262. // Now flushed
  263. stats = p.Stats()
  264. if got, want := p.stats.Flushed, int64(1); got != want {
  265. t.Errorf("expected %d flush; got: %d", want, got)
  266. }
  267. // Close should not start another flush
  268. err = p.Close()
  269. if err != nil {
  270. t.Fatal(err)
  271. }
  272. // Still 1 flush
  273. stats = p.Stats()
  274. if got, want := p.stats.Flushed, int64(1); got != want {
  275. t.Errorf("expected %d flush; got: %d", want, got)
  276. }
  277. // Check number of documents that were bulk indexed
  278. _, err = p.c.Flush(testIndexName).Do(context.TODO())
  279. if err != nil {
  280. t.Fatal(err)
  281. }
  282. count, err := p.c.Count(testIndexName).Do(context.TODO())
  283. if err != nil {
  284. t.Fatal(err)
  285. }
  286. if count != int64(numDocs) {
  287. t.Fatalf("expected %d documents; got: %d", numDocs, count)
  288. }
  289. }
  290. // -- Helper --
  291. func testBulkProcessor(t *testing.T, numDocs int, svc *BulkProcessorService) {
  292. var beforeRequests int64
  293. var befores int64
  294. var afters int64
  295. var failures int64
  296. var afterRequests int64
  297. beforeFn := func(executionId int64, requests []BulkableRequest) {
  298. atomic.AddInt64(&beforeRequests, int64(len(requests)))
  299. atomic.AddInt64(&befores, 1)
  300. }
  301. afterFn := func(executionId int64, requests []BulkableRequest, response *BulkResponse, err error) {
  302. atomic.AddInt64(&afters, 1)
  303. if err != nil {
  304. atomic.AddInt64(&failures, 1)
  305. }
  306. atomic.AddInt64(&afterRequests, int64(len(requests)))
  307. }
  308. p, err := svc.Before(beforeFn).After(afterFn).Stats(true).Do(context.Background())
  309. if err != nil {
  310. t.Fatal(err)
  311. }
  312. for i := 1; i <= numDocs; i++ {
  313. tweet := tweet{User: "olivere", Message: fmt.Sprintf("%07d. %s", i, randomString(1+rand.Intn(63)))}
  314. request := NewBulkIndexRequest().Index(testIndexName).Type("tweet").Id(fmt.Sprintf("%d", i)).Doc(tweet)
  315. p.Add(request)
  316. }
  317. err = p.Close()
  318. if err != nil {
  319. t.Fatal(err)
  320. }
  321. stats := p.Stats()
  322. if stats.Flushed != 0 {
  323. t.Errorf("expected no flush; got: %d", stats.Flushed)
  324. }
  325. if stats.Committed <= 0 {
  326. t.Errorf("expected committed > %d; got: %d", 0, stats.Committed)
  327. }
  328. if got, want := stats.Indexed, int64(numDocs); got != want {
  329. t.Errorf("expected indexed = %d; got: %d", want, got)
  330. }
  331. if got, want := stats.Created, int64(0); got != want {
  332. t.Errorf("expected created = %d; got: %d", want, got)
  333. }
  334. if got, want := stats.Updated, int64(0); got != want {
  335. t.Errorf("expected updated = %d; got: %d", want, got)
  336. }
  337. if got, want := stats.Deleted, int64(0); got != want {
  338. t.Errorf("expected deleted = %d; got: %d", want, got)
  339. }
  340. if got, want := stats.Succeeded, int64(numDocs); got != want {
  341. t.Errorf("expected succeeded = %d; got: %d", want, got)
  342. }
  343. if got, want := stats.Failed, int64(0); got != want {
  344. t.Errorf("expected failed = %d; got: %d", want, got)
  345. }
  346. if got, want := beforeRequests, int64(numDocs); got != want {
  347. t.Errorf("expected %d requests to before callback; got: %d", want, got)
  348. }
  349. if got, want := afterRequests, int64(numDocs); got != want {
  350. t.Errorf("expected %d requests to after callback; got: %d", want, got)
  351. }
  352. if befores == 0 {
  353. t.Error("expected at least 1 call to before callback")
  354. }
  355. if afters == 0 {
  356. t.Error("expected at least 1 call to after callback")
  357. }
  358. if failures != 0 {
  359. t.Errorf("expected 0 calls to failure callback; got: %d", failures)
  360. }
  361. // Check number of documents that were bulk indexed
  362. _, err = p.c.Flush(testIndexName).Do(context.TODO())
  363. if err != nil {
  364. t.Fatal(err)
  365. }
  366. count, err := p.c.Count(testIndexName).Do(context.TODO())
  367. if err != nil {
  368. t.Fatal(err)
  369. }
  370. if count != int64(numDocs) {
  371. t.Fatalf("expected %d documents; got: %d", numDocs, count)
  372. }
  373. }