123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- // Copyright 2012-present Oliver Eilhard. All rights reserved.
- // Use of this source code is governed by a MIT-license.
- // See http://olivere.mit-license.org/license.txt for details.
- // BulkInsert illustrates how to bulk insert documents into Elasticsearch.
- //
- // It uses two goroutines to do so. The first creates a simple document
- // and sends it to the second via a channel. The second goroutine collects
- // those documents, creates a bulk request that is added to a Bulk service
- // and committed to Elasticsearch after reaching a number of documents.
- // The number of documents after which a commit happens can be specified
- // via the "bulk-size" flag.
- //
- // See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-bulk.html
- // for details on the Bulk API in Elasticsearch.
- //
- // Example
- //
- // Bulk index 100.000 documents into the index "warehouse", type "product",
- // committing every set of 1.000 documents.
- //
- // bulk_insert -index=warehouse -type=product -n=100000 -bulk-size=1000
- //
- package main
- import (
- "context"
- "encoding/base64"
- "errors"
- "flag"
- "fmt"
- "log"
- "math/rand"
- "sync/atomic"
- "time"
- "golang.org/x/sync/errgroup"
- "gopkg.in/olivere/elastic.v5"
- )
- func main() {
- var (
- url = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
- index = flag.String("index", "", "Elasticsearch index name")
- typ = flag.String("type", "", "Elasticsearch type name")
- sniff = flag.Bool("sniff", true, "Enable or disable sniffing")
- n = flag.Int("n", 0, "Number of documents to bulk insert")
- bulkSize = flag.Int("bulk-size", 0, "Number of documents to collect before committing")
- )
- flag.Parse()
- log.SetFlags(0)
- rand.Seed(time.Now().UnixNano())
- if *url == "" {
- log.Fatal("missing url parameter")
- }
- if *index == "" {
- log.Fatal("missing index parameter")
- }
- if *typ == "" {
- log.Fatal("missing type parameter")
- }
- if *n <= 0 {
- log.Fatal("n must be a positive number")
- }
- if *bulkSize <= 0 {
- log.Fatal("bulk-size must be a positive number")
- }
- // Create an Elasticsearch client
- client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff))
- if err != nil {
- log.Fatal(err)
- }
- // Setup a group of goroutines from the excellent errgroup package
- g, ctx := errgroup.WithContext(context.TODO())
- // The first goroutine will emit documents and send it to the second goroutine
- // via the docsc channel.
- // The second Goroutine will simply bulk insert the documents.
- type doc struct {
- ID string `json:"id"`
- Timestamp time.Time `json:"@timestamp"`
- }
- docsc := make(chan doc)
- begin := time.Now()
- // Goroutine to create documents
- g.Go(func() error {
- defer close(docsc)
- buf := make([]byte, 32)
- for i := 0; i < *n; i++ {
- // Generate a random ID
- _, err := rand.Read(buf)
- if err != nil {
- return err
- }
- id := base64.URLEncoding.EncodeToString(buf)
- // Construct the document
- d := doc{
- ID: id,
- Timestamp: time.Now(),
- }
- // Send over to 2nd goroutine, or cancel
- select {
- case docsc <- d:
- case <-ctx.Done():
- return ctx.Err()
- }
- }
- return nil
- })
- // Second goroutine will consume the documents sent from the first and bulk insert into ES
- var total uint64
- g.Go(func() error {
- bulk := client.Bulk().Index(*index).Type(*typ)
- for d := range docsc {
- // Simple progress
- current := atomic.AddUint64(&total, 1)
- dur := time.Since(begin).Seconds()
- sec := int(dur)
- pps := int64(float64(current) / dur)
- fmt.Printf("%10d | %6d req/s | %02d:%02d\r", current, pps, sec/60, sec%60)
- // Enqueue the document
- bulk.Add(elastic.NewBulkIndexRequest().Id(d.ID).Doc(d))
- if bulk.NumberOfActions() >= *bulkSize {
- // Commit
- res, err := bulk.Do(ctx)
- if err != nil {
- return err
- }
- if res.Errors {
- // Look up the failed documents with res.Failed(), and e.g. recommit
- return errors.New("bulk commit failed")
- }
- // "bulk" is reset after Do, so you can reuse it
- }
- select {
- default:
- case <-ctx.Done():
- return ctx.Err()
- }
- }
- // Commit the final batch before exiting
- if bulk.NumberOfActions() > 0 {
- _, err = bulk.Do(ctx)
- if err != nil {
- return err
- }
- }
- return nil
- })
- // Wait until all goroutines are finished
- if err := g.Wait(); err != nil {
- log.Fatal(err)
- }
- // Final results
- dur := time.Since(begin).Seconds()
- sec := int(dur)
- pps := int64(float64(total) / dur)
- fmt.Printf("%10d | %6d req/s | %02d:%02d\n", total, pps, sec/60, sec%60)
- }
|