  1. // Copyright 2012-present Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. // BulkInsert illustrates how to bulk insert documents into Elasticsearch.
  5. //
  6. // It uses two goroutines to do so. The first creates a simple document
  7. // and sends it to the second via a channel. The second goroutine collects
  8. // those documents, creates a bulk request that is added to a Bulk service
  9. // and committed to Elasticsearch after reaching a number of documents.
  10. // The number of documents after which a commit happens can be specified
  11. // via the "bulk-size" flag.
  12. //
  13. // See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-bulk.html
  14. // for details on the Bulk API in Elasticsearch.
  15. //
  16. // Example
  17. //
  18. // Bulk index 100.000 documents into the index "warehouse", type "product",
  19. // committing every set of 1.000 documents.
  20. //
  21. // bulk_insert -index=warehouse -type=product -n=100000 -bulk-size=1000
  22. //
  23. package main
  24. import (
  25. "context"
  26. "encoding/base64"
  27. "errors"
  28. "flag"
  29. "fmt"
  30. "log"
  31. "math/rand"
  32. "sync/atomic"
  33. "time"
  34. "golang.org/x/sync/errgroup"
  35. "gopkg.in/olivere/elastic.v5"
  36. )
  37. func main() {
  38. var (
  39. url = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
  40. index = flag.String("index", "", "Elasticsearch index name")
  41. typ = flag.String("type", "", "Elasticsearch type name")
  42. sniff = flag.Bool("sniff", true, "Enable or disable sniffing")
  43. n = flag.Int("n", 0, "Number of documents to bulk insert")
  44. bulkSize = flag.Int("bulk-size", 0, "Number of documents to collect before committing")
  45. )
  46. flag.Parse()
  47. log.SetFlags(0)
  48. rand.Seed(time.Now().UnixNano())
  49. if *url == "" {
  50. log.Fatal("missing url parameter")
  51. }
  52. if *index == "" {
  53. log.Fatal("missing index parameter")
  54. }
  55. if *typ == "" {
  56. log.Fatal("missing type parameter")
  57. }
  58. if *n <= 0 {
  59. log.Fatal("n must be a positive number")
  60. }
  61. if *bulkSize <= 0 {
  62. log.Fatal("bulk-size must be a positive number")
  63. }
  64. // Create an Elasticsearch client
  65. client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff))
  66. if err != nil {
  67. log.Fatal(err)
  68. }
  69. // Setup a group of goroutines from the excellent errgroup package
  70. g, ctx := errgroup.WithContext(context.TODO())
  71. // The first goroutine will emit documents and send it to the second goroutine
  72. // via the docsc channel.
  73. // The second Goroutine will simply bulk insert the documents.
  74. type doc struct {
  75. ID string `json:"id"`
  76. Timestamp time.Time `json:"@timestamp"`
  77. }
  78. docsc := make(chan doc)
  79. begin := time.Now()
  80. // Goroutine to create documents
  81. g.Go(func() error {
  82. defer close(docsc)
  83. buf := make([]byte, 32)
  84. for i := 0; i < *n; i++ {
  85. // Generate a random ID
  86. _, err := rand.Read(buf)
  87. if err != nil {
  88. return err
  89. }
  90. id := base64.URLEncoding.EncodeToString(buf)
  91. // Construct the document
  92. d := doc{
  93. ID: id,
  94. Timestamp: time.Now(),
  95. }
  96. // Send over to 2nd goroutine, or cancel
  97. select {
  98. case docsc <- d:
  99. case <-ctx.Done():
  100. return ctx.Err()
  101. }
  102. }
  103. return nil
  104. })
  105. // Second goroutine will consume the documents sent from the first and bulk insert into ES
  106. var total uint64
  107. g.Go(func() error {
  108. bulk := client.Bulk().Index(*index).Type(*typ)
  109. for d := range docsc {
  110. // Simple progress
  111. current := atomic.AddUint64(&total, 1)
  112. dur := time.Since(begin).Seconds()
  113. sec := int(dur)
  114. pps := int64(float64(current) / dur)
  115. fmt.Printf("%10d | %6d req/s | %02d:%02d\r", current, pps, sec/60, sec%60)
  116. // Enqueue the document
  117. bulk.Add(elastic.NewBulkIndexRequest().Id(d.ID).Doc(d))
  118. if bulk.NumberOfActions() >= *bulkSize {
  119. // Commit
  120. res, err := bulk.Do(ctx)
  121. if err != nil {
  122. return err
  123. }
  124. if res.Errors {
  125. // Look up the failed documents with res.Failed(), and e.g. recommit
  126. return errors.New("bulk commit failed")
  127. }
  128. // "bulk" is reset after Do, so you can reuse it
  129. }
  130. select {
  131. default:
  132. case <-ctx.Done():
  133. return ctx.Err()
  134. }
  135. }
  136. // Commit the final batch before exiting
  137. if bulk.NumberOfActions() > 0 {
  138. _, err = bulk.Do(ctx)
  139. if err != nil {
  140. return err
  141. }
  142. }
  143. return nil
  144. })
  145. // Wait until all goroutines are finished
  146. if err := g.Wait(); err != nil {
  147. log.Fatal(err)
  148. }
  149. // Final results
  150. dur := time.Since(begin).Seconds()
  151. sec := int(dur)
  152. pps := int64(float64(total) / dur)
  153. fmt.Printf("%10d | %6d req/s | %02d:%02d\n", total, pps, sec/60, sec%60)
  154. }