- // Copyright 2012-present Oliver Eilhard. All rights reserved.
- // Use of this source code is governed by a MIT-license.
- // See http://olivere.mit-license.org/license.txt for details.
- package elastic
- import (
- "context"
- "net"
- "sync"
- "sync/atomic"
- "time"
- )
- // BulkProcessorService makes it easy to process bulk requests. It lets you set
- // policies for when to flush new bulk requests, e.g. based on the number of
- // actions, on the (estimated) size of the actions, and/or a periodic flush
- // interval. It also lets you control how many bulk requests are allowed to be
- // executed in parallel.
- //
- // BulkProcessorService, by default, commits either every 1000 requests or when the
- // (estimated) size of the bulk requests exceeds 5 MB. However, it does not
- // commit periodically. BulkProcessorService also retries failed commits by
- // default, using an exponential backoff algorithm.
- //
- // The caller is responsible for setting the index and type on every
- // bulk request added to BulkProcessorService.
- //
- // BulkProcessorService takes ideas from the BulkProcessor of the
- // Elasticsearch Java API as documented in
- // https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html.
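- //
- // A minimal configuration sketch (client is an already configured *Client;
- // the concrete values are illustrative, not recommendations):
- //
- //	p, err := NewBulkProcessorService(client).
- //		Name("indexer-1").
- //		Workers(2).
- //		BulkActions(1000).               // commit after 1000 requests ...
- //		BulkSize(5 << 20).               // ... or after ~5 MB of request data
- //		FlushInterval(30 * time.Second). // ... or every 30 seconds
- //		Stats(true).
- //		Do(context.Background())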
- type BulkProcessorService struct {
- c *Client
- beforeFn BulkBeforeFunc
- afterFn BulkAfterFunc
- name string // name of processor
- numWorkers int // # of workers (>= 1)
- bulkActions int // # of requests after which to commit
- bulkSize int // # of bytes after which to commit
- flushInterval time.Duration // periodic flush interval
- wantStats bool // indicates whether to gather statistics
- backoff Backoff // a custom Backoff to use for errors
- }
- // NewBulkProcessorService creates a new BulkProcessorService.
- func NewBulkProcessorService(client *Client) *BulkProcessorService {
- return &BulkProcessorService{
- c: client,
- numWorkers: 1,
- bulkActions: 1000,
- bulkSize: 5 << 20, // 5 MB
- backoff: NewExponentialBackoff(
- time.Duration(200)*time.Millisecond,
- time.Duration(10000)*time.Millisecond,
- ),
- }
- }
- // BulkBeforeFunc defines the signature of callbacks that are executed
- // before a commit to Elasticsearch.
- type BulkBeforeFunc func(executionId int64, requests []BulkableRequest)
- // BulkAfterFunc defines the signature of callbacks that are executed
- // after a commit to Elasticsearch. The err parameter signals an error.
- type BulkAfterFunc func(executionId int64, requests []BulkableRequest, response *BulkResponse, err error)
- // Before specifies a function to be executed before bulk requests get committed
- // to Elasticsearch.
- func (s *BulkProcessorService) Before(fn BulkBeforeFunc) *BulkProcessorService {
- s.beforeFn = fn
- return s
- }
- // After specifies a function to be executed when bulk requests have been
- // committed to Elasticsearch. The After callback executes both when the
- // commit was successful as well as on failures.
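- //
- // A minimal sketch of an After callback that logs failures (using the
- // standard log package purely for illustration):
- //
- //	after := func(executionId int64, requests []BulkableRequest, response *BulkResponse, err error) {
- //		if err != nil {
- //			log.Printf("bulk commit %d failed: %v", executionId, err)
- //			return
- //		}
- //		if response != nil && len(response.Failed()) > 0 {
- //			log.Printf("bulk commit %d had %d failed items", executionId, len(response.Failed()))
- //		}
- //	}
- //
- // Register it on the service via After(after) before calling Do.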
- func (s *BulkProcessorService) After(fn BulkAfterFunc) *BulkProcessorService {
- s.afterFn = fn
- return s
- }
- // Name is an optional name to identify this bulk processor.
- func (s *BulkProcessorService) Name(name string) *BulkProcessorService {
- s.name = name
- return s
- }
- // Workers is the number of concurrent workers committing bulk requests.
- // Defaults to 1 and must be greater than or equal to 1.
- func (s *BulkProcessorService) Workers(num int) *BulkProcessorService {
- s.numWorkers = num
- return s
- }
- // BulkActions specifies when to flush based on the number of actions
- // currently added. Defaults to 1000 and can be set to -1 to be disabled.
- func (s *BulkProcessorService) BulkActions(bulkActions int) *BulkProcessorService {
- s.bulkActions = bulkActions
- return s
- }
- // BulkSize specifies when to flush based on the size (in bytes) of the actions
- // currently added. Defaults to 5 MB and can be set to -1 to be disabled.
- func (s *BulkProcessorService) BulkSize(bulkSize int) *BulkProcessorService {
- s.bulkSize = bulkSize
- return s
- }
- // FlushInterval specifies when to flush at the end of the given interval.
- // This is disabled by default. If you want the bulk processor to
- // operate completely asynchronously, set both BulkActions and BulkSize to
- // -1 and set the FlushInterval to a meaningful interval.
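- //
- // For example, a purely time-based setup might look like this (the 30 second
- // interval is illustrative):
- //
- //	p, err := NewBulkProcessorService(client).
- //		BulkActions(-1).
- //		BulkSize(-1).
- //		FlushInterval(30 * time.Second).
- //		Do(context.Background())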
- func (s *BulkProcessorService) FlushInterval(interval time.Duration) *BulkProcessorService {
- s.flushInterval = interval
- return s
- }
- // Stats tells the bulk processor to gather statistics while running.
- // Use the Stats method on the resulting BulkProcessor to retrieve them.
- // This is disabled by default.
- func (s *BulkProcessorService) Stats(wantStats bool) *BulkProcessorService {
- s.wantStats = wantStats
- return s
- }
- // Backoff sets the backoff strategy to use for errors.
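- //
- // A sketch of configuring a custom exponential backoff (the 100 ms and 30 s
- // bounds are illustrative):
- //
- //	p, err := NewBulkProcessorService(client).
- //		Backoff(NewExponentialBackoff(100*time.Millisecond, 30*time.Second)).
- //		Do(context.Background())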
- func (s *BulkProcessorService) Backoff(backoff Backoff) *BulkProcessorService {
- s.backoff = backoff
- return s
- }
- // Do creates a new BulkProcessor and starts it.
- // Consider the BulkProcessor as a running instance that accepts bulk requests
- // and commits them to Elasticsearch, spreading the work across one or more
- // workers.
- //
- // You can interact with the BulkProcessor returned by Do, e.g. Start and
- // Stop (or Close) it.
- //
- // Context is an optional context that is passed into the bulk request
- // service calls. In contrast to other operations, this context is used in
- // a long running process. You could use it to pass e.g. loggers, but you
- // shouldn't use it for cancellation.
- //
- // Calling Do several times returns new BulkProcessors. You probably don't
- // want to do this. BulkProcessorService is just a builder; the returned
- // BulkProcessor is the instance that does the actual work.
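- //
- // A minimal lifecycle sketch (client is a configured *Client; error handling
- // abbreviated):
- //
- //	p, err := NewBulkProcessorService(client).Workers(2).Do(context.Background())
- //	if err != nil {
- //		// handle error
- //	}
- //	defer p.Close()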
- func (s *BulkProcessorService) Do(ctx context.Context) (*BulkProcessor, error) {
- p := newBulkProcessor(
- s.c,
- s.beforeFn,
- s.afterFn,
- s.name,
- s.numWorkers,
- s.bulkActions,
- s.bulkSize,
- s.flushInterval,
- s.wantStats,
- s.backoff)
- err := p.Start(ctx)
- if err != nil {
- return nil, err
- }
- return p, nil
- }
- // -- Bulk Processor Statistics --
- // BulkProcessorStats contains various statistics of a bulk processor
- // while it is running. Use the Stats func on BulkProcessor to retrieve it.
- type BulkProcessorStats struct {
- Flushed int64 // # of times a flush has been invoked (manually or via the flush interval)
- Committed int64 // # of times workers committed bulk requests
- Indexed int64 // # of requests indexed
- Created int64 // # of requests that ES reported as creates (201)
- Updated int64 // # of requests that ES reported as updates
- Deleted int64 // # of requests that ES reported as deletes
- Succeeded int64 // # of requests that ES reported as successful
- Failed int64 // # of requests that ES reported as failed
- Workers []*BulkProcessorWorkerStats // stats for each worker
- }
- // BulkProcessorWorkerStats represents per-worker statistics.
- type BulkProcessorWorkerStats struct {
- Queued int64 // # of requests queued in this worker
- LastDuration time.Duration // duration of last commit
- }
- // newBulkProcessorStats initializes and returns a BulkProcessorStats struct.
- func newBulkProcessorStats(workers int) *BulkProcessorStats {
- stats := &BulkProcessorStats{
- Workers: make([]*BulkProcessorWorkerStats, workers),
- }
- for i := 0; i < workers; i++ {
- stats.Workers[i] = &BulkProcessorWorkerStats{}
- }
- return stats
- }
- func (st *BulkProcessorStats) dup() *BulkProcessorStats {
- dst := new(BulkProcessorStats)
- dst.Flushed = st.Flushed
- dst.Committed = st.Committed
- dst.Indexed = st.Indexed
- dst.Created = st.Created
- dst.Updated = st.Updated
- dst.Deleted = st.Deleted
- dst.Succeeded = st.Succeeded
- dst.Failed = st.Failed
- for _, src := range st.Workers {
- dst.Workers = append(dst.Workers, src.dup())
- }
- return dst
- }
- func (st *BulkProcessorWorkerStats) dup() *BulkProcessorWorkerStats {
- dst := new(BulkProcessorWorkerStats)
- dst.Queued = st.Queued
- dst.LastDuration = st.LastDuration
- return dst
- }
- // -- Bulk Processor --
- // BulkProcessor encapsulates a task that accepts bulk requests and
- // orchestrates committing them to Elasticsearch via one or more workers.
- //
- // BulkProcessor is returned by setting up a BulkProcessorService and
- // calling the Do method.
- type BulkProcessor struct {
- c *Client
- beforeFn BulkBeforeFunc
- afterFn BulkAfterFunc
- name string
- bulkActions int
- bulkSize int
- numWorkers int
- executionId int64
- requestsC chan BulkableRequest
- workerWg sync.WaitGroup
- workers []*bulkWorker
- flushInterval time.Duration
- flusherStopC chan struct{}
- wantStats bool
- backoff Backoff
- startedMu sync.Mutex // guards the following block
- started bool
- statsMu sync.Mutex // guards the following block
- stats *BulkProcessorStats
- stopReconnC chan struct{} // closed to signal that reconnection attempts should stop
- }
- func newBulkProcessor(
- client *Client,
- beforeFn BulkBeforeFunc,
- afterFn BulkAfterFunc,
- name string,
- numWorkers int,
- bulkActions int,
- bulkSize int,
- flushInterval time.Duration,
- wantStats bool,
- backoff Backoff) *BulkProcessor {
- return &BulkProcessor{
- c: client,
- beforeFn: beforeFn,
- afterFn: afterFn,
- name: name,
- numWorkers: numWorkers,
- bulkActions: bulkActions,
- bulkSize: bulkSize,
- flushInterval: flushInterval,
- wantStats: wantStats,
- backoff: backoff,
- }
- }
- // Start starts the bulk processor. If the processor is already started,
- // nil is returned.
- func (p *BulkProcessor) Start(ctx context.Context) error {
- p.startedMu.Lock()
- defer p.startedMu.Unlock()
- if p.started {
- return nil
- }
- // We must have at least one worker.
- if p.numWorkers < 1 {
- p.numWorkers = 1
- }
- p.requestsC = make(chan BulkableRequest)
- p.executionId = 0
- p.stats = newBulkProcessorStats(p.numWorkers)
- p.stopReconnC = make(chan struct{})
- // Create and start up workers.
- p.workers = make([]*bulkWorker, p.numWorkers)
- for i := 0; i < p.numWorkers; i++ {
- p.workerWg.Add(1)
- p.workers[i] = newBulkWorker(p, i)
- go p.workers[i].work(ctx)
- }
- // Start the ticker for flush (if enabled)
- if int64(p.flushInterval) > 0 {
- p.flusherStopC = make(chan struct{})
- go p.flusher(p.flushInterval)
- }
- p.started = true
- return nil
- }
- // Stop is an alias for Close.
- func (p *BulkProcessor) Stop() error {
- return p.Close()
- }
- // Close stops the bulk processor previously started with Do.
- // If it is already stopped, this is a no-op and nil is returned.
- //
- // By implementing Close, BulkProcessor implements the io.Closer interface.
- func (p *BulkProcessor) Close() error {
- p.startedMu.Lock()
- defer p.startedMu.Unlock()
- // Already stopped? Do nothing.
- if !p.started {
- return nil
- }
- // Tell connection checkers to stop
- if p.stopReconnC != nil {
- close(p.stopReconnC)
- p.stopReconnC = nil
- }
- // Stop flusher (if enabled)
- if p.flusherStopC != nil {
- p.flusherStopC <- struct{}{}
- <-p.flusherStopC
- close(p.flusherStopC)
- p.flusherStopC = nil
- }
- // Stop all workers.
- close(p.requestsC)
- p.workerWg.Wait()
- p.started = false
- return nil
- }
- // Stats returns the latest bulk processor statistics.
- // Collecting stats must be enabled first by calling Stats(true) on
- // the service that created this processor.
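- //
- // A sketch of reading the stats (assuming Stats(true) was set on the service):
- //
- //	stats := p.Stats()
- //	fmt.Printf("committed=%d succeeded=%d failed=%d\n",
- //		stats.Committed, stats.Succeeded, stats.Failed)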
- func (p *BulkProcessor) Stats() BulkProcessorStats {
- p.statsMu.Lock()
- defer p.statsMu.Unlock()
- return *p.stats.dup()
- }
- // Add adds a single request to be committed by the BulkProcessor.
- //
- // The caller is responsible for setting the index and type on the request.
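- //
- // A sketch of adding an index request (the index, type, and id values are
- // illustrative; doc is any JSON-serializable document):
- //
- //	p.Add(NewBulkIndexRequest().Index("orders").Type("doc").Id("42").Doc(doc))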
- func (p *BulkProcessor) Add(request BulkableRequest) {
- p.requestsC <- request
- }
- // Flush manually asks all workers to commit their outstanding requests.
- // It returns only when all workers acknowledge completion.
- func (p *BulkProcessor) Flush() error {
- p.statsMu.Lock()
- p.stats.Flushed++
- p.statsMu.Unlock()
- for _, w := range p.workers {
- w.flushC <- struct{}{}
- <-w.flushAckC // wait for completion
- }
- return nil
- }
- // flusher is a single goroutine that periodically asks all workers to
- // commit their outstanding bulk requests. It is only started if
- // FlushInterval is greater than 0.
- func (p *BulkProcessor) flusher(interval time.Duration) {
- ticker := time.NewTicker(interval)
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C: // Periodic flush
- p.Flush() // TODO swallow errors here?
- case <-p.flusherStopC:
- p.flusherStopC <- struct{}{}
- return
- }
- }
- }
- // -- Bulk Worker --
- // bulkWorker encapsulates a single worker, running in a goroutine,
- // receiving bulk requests and eventually committing them to Elasticsearch.
- // It is strongly bound to a BulkProcessor.
- type bulkWorker struct {
- p *BulkProcessor
- i int
- bulkActions int
- bulkSize int
- service *BulkService
- flushC chan struct{}
- flushAckC chan struct{}
- }
- // newBulkWorker creates a new bulkWorker instance.
- func newBulkWorker(p *BulkProcessor, i int) *bulkWorker {
- return &bulkWorker{
- p: p,
- i: i,
- bulkActions: p.bulkActions,
- bulkSize: p.bulkSize,
- service: NewBulkService(p.c),
- flushC: make(chan struct{}),
- flushAckC: make(chan struct{}),
- }
- }
- // work waits for bulk requests and manual flush calls on the respective
- // channels and is invoked as a goroutine when the bulk processor is started.
- func (w *bulkWorker) work(ctx context.Context) {
- defer func() {
- w.p.workerWg.Done()
- close(w.flushAckC)
- close(w.flushC)
- }()
- var stop bool
- for !stop {
- var err error
- select {
- case req, open := <-w.p.requestsC:
- if open {
- // Received a new request
- w.service.Add(req)
- if w.commitRequired() {
- err = w.commit(ctx)
- }
- } else {
- // Channel closed: Stop.
- stop = true
- if w.service.NumberOfActions() > 0 {
- err = w.commit(ctx)
- }
- }
- case <-w.flushC:
- // Commit outstanding requests
- if w.service.NumberOfActions() > 0 {
- err = w.commit(ctx)
- }
- w.flushAckC <- struct{}{}
- }
- if !stop && err != nil {
- waitForActive := func() {
- // Add back pressure to prevent Add calls from filling up the request queue
- ready := make(chan struct{})
- go w.waitForActiveConnection(ready)
- <-ready
- }
- if _, ok := err.(net.Error); ok {
- waitForActive()
- } else if IsConnErr(err) {
- waitForActive()
- }
- }
- }
- }
- // commit commits the bulk requests in the given service,
- // invoking callbacks as specified.
- func (w *bulkWorker) commit(ctx context.Context) error {
- var res *BulkResponse
- // commitFunc will commit bulk requests and, on failure, be retried
- // via the backoff policy configured on the processor
- commitFunc := func() error {
- var err error
- res, err = w.service.Do(ctx)
- return err
- }
- // notifyFunc will be called whenever a commit attempt fails and is about to be retried
- notifyFunc := func(err error) {
- w.p.c.errorf("elastic: bulk processor %q failed but may retry: %v", w.p.name, err)
- }
- id := atomic.AddInt64(&w.p.executionId, 1)
- // Update # documents in queue before eventual retries
- w.p.statsMu.Lock()
- if w.p.wantStats {
- w.p.stats.Workers[w.i].Queued = int64(len(w.service.requests))
- }
- w.p.statsMu.Unlock()
- // Save requests because they will be reset in commitFunc
- reqs := w.service.requests
- // Invoke before callback
- if w.p.beforeFn != nil {
- w.p.beforeFn(id, reqs)
- }
- // Commit bulk requests
- err := RetryNotify(commitFunc, w.p.backoff, notifyFunc)
- w.updateStats(res)
- if err != nil {
- w.p.c.errorf("elastic: bulk processor %q failed: %v", w.p.name, err)
- }
- // Invoke after callback
- if w.p.afterFn != nil {
- w.p.afterFn(id, reqs, res, err)
- }
- return err
- }
- func (w *bulkWorker) waitForActiveConnection(ready chan<- struct{}) {
- defer close(ready)
- t := time.NewTicker(5 * time.Second)
- defer t.Stop()
- client := w.p.c
- stopReconnC := w.p.stopReconnC
- w.p.c.errorf("elastic: bulk processor %q is waiting for an active connection", w.p.name)
- // loop until a health check finds at least 1 active connection or the reconnection channel is closed
- for {
- select {
- case _, ok := <-stopReconnC:
- if !ok {
- w.p.c.errorf("elastic: bulk processor %q active connection check interrupted", w.p.name)
- return
- }
- case <-t.C:
- client.healthcheck(time.Duration(3)*time.Second, true)
- if client.mustActiveConn() == nil {
- // found an active connection
- // exit and signal done to the WaitGroup
- return
- }
- }
- }
- }
- func (w *bulkWorker) updateStats(res *BulkResponse) {
- // Update stats
- if res != nil {
- w.p.statsMu.Lock()
- if w.p.wantStats {
- w.p.stats.Committed++
- w.p.stats.Indexed += int64(len(res.Indexed()))
- w.p.stats.Created += int64(len(res.Created()))
- w.p.stats.Updated += int64(len(res.Updated()))
- w.p.stats.Deleted += int64(len(res.Deleted()))
- w.p.stats.Succeeded += int64(len(res.Succeeded()))
- w.p.stats.Failed += int64(len(res.Failed()))
- w.p.stats.Workers[w.i].Queued = int64(len(w.service.requests))
- w.p.stats.Workers[w.i].LastDuration = time.Duration(int64(res.Took)) * time.Millisecond
- }
- w.p.statsMu.Unlock()
- }
- }
- // commitRequired returns true if the service has to commit its
- // bulk requests. This can be either because the number of actions
- // or the estimated size in bytes is larger than specified in the
- // BulkProcessorService.
- func (w *bulkWorker) commitRequired() bool {
- if w.bulkActions >= 0 && w.service.NumberOfActions() >= w.bulkActions {
- return true
- }
- if w.bulkSize >= 0 && w.service.EstimatedSizeInBytes() >= int64(w.bulkSize) {
- return true
- }
- return false
- }