bulk_processor.go

// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

package elastic

import (
	"context"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

// BulkProcessorService allows you to easily process bulk requests. It lets
// you set policies for when to flush new bulk requests, e.g. based on the
// number of actions, on the size of the actions, and/or on a periodic flush
// interval. It also allows you to control the number of concurrent bulk
// requests allowed to be executed in parallel.
//
// BulkProcessorService, by default, commits either every 1000 requests or when the
// (estimated) size of the bulk requests exceeds 5 MB. However, it does not
// commit periodically. BulkProcessorService also does retry by default, using
// an exponential backoff algorithm.
//
// The caller is responsible for setting the index and type on every
// bulk request added to BulkProcessorService.
//
// BulkProcessorService takes ideas from the BulkProcessor of the
// Elasticsearch Java API as documented in
// https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html.
type BulkProcessorService struct {
	c             *Client
	beforeFn      BulkBeforeFunc
	afterFn       BulkAfterFunc
	name          string        // name of processor
	numWorkers    int           // # of workers (>= 1)
	bulkActions   int           // # of requests after which to commit
	bulkSize      int           // # of bytes after which to commit
	flushInterval time.Duration // periodic flush interval
	wantStats     bool          // indicates whether to gather statistics
	backoff       Backoff       // a custom Backoff to use for errors
}

// NewBulkProcessorService creates a new BulkProcessorService.
func NewBulkProcessorService(client *Client) *BulkProcessorService {
	return &BulkProcessorService{
		c:           client,
		numWorkers:  1,
		bulkActions: 1000,
		bulkSize:    5 << 20, // 5 MB
		backoff: NewExponentialBackoff(
			time.Duration(200)*time.Millisecond,
			time.Duration(10000)*time.Millisecond,
		),
	}
}

// BulkBeforeFunc defines the signature of callbacks that are executed
// before a commit to Elasticsearch.
type BulkBeforeFunc func(executionId int64, requests []BulkableRequest)

// BulkAfterFunc defines the signature of callbacks that are executed
// after a commit to Elasticsearch. The err parameter signals an error.
type BulkAfterFunc func(executionId int64, requests []BulkableRequest, response *BulkResponse, err error)

// Before specifies a function to be executed before bulk requests get committed
// to Elasticsearch.
func (s *BulkProcessorService) Before(fn BulkBeforeFunc) *BulkProcessorService {
	s.beforeFn = fn
	return s
}

// After specifies a function to be executed when bulk requests have been
// committed to Elasticsearch. The After callback executes both when the
// commit was successful as well as on failures.
func (s *BulkProcessorService) After(fn BulkAfterFunc) *BulkProcessorService {
	s.afterFn = fn
	return s
}
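
// exampleFailureCounter is a minimal sketch, not part of the original API,
// showing how the Before/After callbacks above could be wired up, e.g. to
// count failed commits. The type and method names are illustrative only.
type exampleFailureCounter struct {
	failures int64 // updated atomically, since callbacks run in worker goroutines
}

// afterCommit satisfies BulkAfterFunc: it runs after every commit attempt,
// successful or not, and records failures.
func (c *exampleFailureCounter) afterCommit(executionId int64, requests []BulkableRequest, response *BulkResponse, err error) {
	if err != nil {
		atomic.AddInt64(&c.failures, 1)
	}
}

// Typical wiring (illustrative): NewBulkProcessorService(client).After(counter.afterCommit)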

// Name is an optional name to identify this bulk processor.
func (s *BulkProcessorService) Name(name string) *BulkProcessorService {
	s.name = name
	return s
}

// Workers sets the number of concurrent workers allowed to be
// executed. Defaults to 1 and must be greater than or equal to 1.
func (s *BulkProcessorService) Workers(num int) *BulkProcessorService {
	s.numWorkers = num
	return s
}

// BulkActions specifies when to flush based on the number of actions
// currently added. Defaults to 1000 and can be set to -1 to be disabled.
func (s *BulkProcessorService) BulkActions(bulkActions int) *BulkProcessorService {
	s.bulkActions = bulkActions
	return s
}

// BulkSize specifies when to flush based on the size (in bytes) of the actions
// currently added. Defaults to 5 MB and can be set to -1 to be disabled.
func (s *BulkProcessorService) BulkSize(bulkSize int) *BulkProcessorService {
	s.bulkSize = bulkSize
	return s
}

// FlushInterval specifies when to flush at the end of the given interval.
// This is disabled by default. If you want the bulk processor to
// operate completely asynchronously, set both BulkActions and BulkSize to
// -1 and set the FlushInterval to a meaningful interval.
func (s *BulkProcessorService) FlushInterval(interval time.Duration) *BulkProcessorService {
	s.flushInterval = interval
	return s
}

// Stats tells the bulk processor to gather stats while running.
// Use the Stats method on BulkProcessor to return the stats.
// This is disabled by default.
func (s *BulkProcessorService) Stats(wantStats bool) *BulkProcessorService {
	s.wantStats = wantStats
	return s
}

// Backoff sets the backoff strategy to use for errors.
func (s *BulkProcessorService) Backoff(backoff Backoff) *BulkProcessorService {
	s.backoff = backoff
	return s
}
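
// exampleCustomBackoff is a minimal sketch (illustrative only) of overriding
// the default retry behaviour. NewExponentialBackoff is the same constructor
// used for the default in NewBulkProcessorService above.
func exampleCustomBackoff(s *BulkProcessorService) *BulkProcessorService {
	// Start retrying after 100ms and cap the wait between attempts at 30s.
	return s.Backoff(NewExponentialBackoff(100*time.Millisecond, 30*time.Second))
}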

// Do creates a new BulkProcessor and starts it.
// Consider the BulkProcessor as a running instance that accepts bulk requests
// and commits them to Elasticsearch, spreading the work across one or more
// workers.
//
// You can interoperate with the BulkProcessor returned by Do, e.g. Start and
// Stop (or Close) it.
//
// Context is an optional context that is passed into the bulk request
// service calls. In contrast to other operations, this context is used in
// a long running process. You could use it to pass e.g. loggers, but you
// shouldn't use it for cancellation.
//
// Calling Do several times returns new BulkProcessors. You probably don't
// want to do this. BulkProcessorService implements just a builder pattern.
func (s *BulkProcessorService) Do(ctx context.Context) (*BulkProcessor, error) {
	p := newBulkProcessor(
		s.c,
		s.beforeFn,
		s.afterFn,
		s.name,
		s.numWorkers,
		s.bulkActions,
		s.bulkSize,
		s.flushInterval,
		s.wantStats,
		s.backoff)

	err := p.Start(ctx)
	if err != nil {
		return nil, err
	}
	return p, nil
}
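
// exampleBulkProcessorUsage is a minimal, illustrative sketch of how the
// builder above is typically used; it is not part of the original file.
// The index name "articles", the document payload, and the use of
// NewBulkIndexRequest (assumed to be this package's bulk-index request
// constructor) are placeholders.
func exampleBulkProcessorUsage(ctx context.Context, client *Client) error {
	// Two workers, committing every 500 actions or 2 MB, plus a 30s flush.
	p, err := NewBulkProcessorService(client).
		Name("example-processor").
		Workers(2).
		BulkActions(500).
		BulkSize(2 << 20).
		FlushInterval(30 * time.Second).
		Stats(true).
		Do(ctx)
	if err != nil {
		return err
	}

	// The processor is now running; Add queues requests for the workers.
	// Note that index and type must be set on every request by the caller.
	p.Add(NewBulkIndexRequest().
		Index("articles").
		Type("doc").
		Id("1").
		Doc(map[string]interface{}{"title": "Hello"}))

	// Close commits outstanding requests and stops flusher and workers.
	return p.Close()
}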

// -- Bulk Processor Statistics --

// BulkProcessorStats contains various statistics of a bulk processor
// while it is running. Use the Stats func to return it while running.
type BulkProcessorStats struct {
	Flushed   int64 // number of times the flush interval has been invoked
	Committed int64 // # of times workers committed bulk requests
	Indexed   int64 // # of requests indexed
	Created   int64 // # of requests that ES reported as creates (201)
	Updated   int64 // # of requests that ES reported as updates
	Deleted   int64 // # of requests that ES reported as deletes
	Succeeded int64 // # of requests that ES reported as successful
	Failed    int64 // # of requests that ES reported as failed

	Workers []*BulkProcessorWorkerStats // stats for each worker
}

// BulkProcessorWorkerStats represents per-worker statistics.
type BulkProcessorWorkerStats struct {
	Queued       int64         // # of requests queued in this worker
	LastDuration time.Duration // duration of last commit
}

// newBulkProcessorStats initializes and returns a BulkProcessorStats struct.
func newBulkProcessorStats(workers int) *BulkProcessorStats {
	stats := &BulkProcessorStats{
		Workers: make([]*BulkProcessorWorkerStats, workers),
	}
	for i := 0; i < workers; i++ {
		stats.Workers[i] = &BulkProcessorWorkerStats{}
	}
	return stats
}

// dup returns a copy of the statistics, including per-worker stats.
func (st *BulkProcessorStats) dup() *BulkProcessorStats {
	dst := new(BulkProcessorStats)
	dst.Flushed = st.Flushed
	dst.Committed = st.Committed
	dst.Indexed = st.Indexed
	dst.Created = st.Created
	dst.Updated = st.Updated
	dst.Deleted = st.Deleted
	dst.Succeeded = st.Succeeded
	dst.Failed = st.Failed
	for _, src := range st.Workers {
		dst.Workers = append(dst.Workers, src.dup())
	}
	return dst
}

// dup returns a copy of the per-worker statistics.
func (st *BulkProcessorWorkerStats) dup() *BulkProcessorWorkerStats {
	dst := new(BulkProcessorWorkerStats)
	dst.Queued = st.Queued
	dst.LastDuration = st.LastDuration
	return dst
}

// -- Bulk Processor --

// BulkProcessor encapsulates a task that accepts bulk requests and
// orchestrates committing them to Elasticsearch via one or more workers.
//
// BulkProcessor is returned by setting up a BulkProcessorService and
// calling the Do method.
type BulkProcessor struct {
	c             *Client
	beforeFn      BulkBeforeFunc
	afterFn       BulkAfterFunc
	name          string
	bulkActions   int
	bulkSize      int
	numWorkers    int
	executionId   int64
	requestsC     chan BulkableRequest
	workerWg      sync.WaitGroup
	workers       []*bulkWorker
	flushInterval time.Duration
	flusherStopC  chan struct{}
	wantStats     bool
	backoff       Backoff

	startedMu sync.Mutex // guards the following block
	started   bool

	statsMu sync.Mutex // guards the following block
	stats   *BulkProcessorStats

	stopReconnC chan struct{} // channel to signal stop reconnection attempts
}

func newBulkProcessor(
	client *Client,
	beforeFn BulkBeforeFunc,
	afterFn BulkAfterFunc,
	name string,
	numWorkers int,
	bulkActions int,
	bulkSize int,
	flushInterval time.Duration,
	wantStats bool,
	backoff Backoff) *BulkProcessor {
	return &BulkProcessor{
		c:             client,
		beforeFn:      beforeFn,
		afterFn:       afterFn,
		name:          name,
		numWorkers:    numWorkers,
		bulkActions:   bulkActions,
		bulkSize:      bulkSize,
		flushInterval: flushInterval,
		wantStats:     wantStats,
		backoff:       backoff,
	}
}

// Start starts the bulk processor. If the processor is already started,
// nil is returned.
func (p *BulkProcessor) Start(ctx context.Context) error {
	p.startedMu.Lock()
	defer p.startedMu.Unlock()

	if p.started {
		return nil
	}

	// We must have at least one worker.
	if p.numWorkers < 1 {
		p.numWorkers = 1
	}

	p.requestsC = make(chan BulkableRequest)
	p.executionId = 0
	p.stats = newBulkProcessorStats(p.numWorkers)
	p.stopReconnC = make(chan struct{})

	// Create and start up workers.
	p.workers = make([]*bulkWorker, p.numWorkers)
	for i := 0; i < p.numWorkers; i++ {
		p.workerWg.Add(1)
		p.workers[i] = newBulkWorker(p, i)
		go p.workers[i].work(ctx)
	}

	// Start the ticker for flush (if enabled)
	if int64(p.flushInterval) > 0 {
		p.flusherStopC = make(chan struct{})
		go p.flusher(p.flushInterval)
	}

	p.started = true

	return nil
}

// Stop is an alias for Close.
func (p *BulkProcessor) Stop() error {
	return p.Close()
}

// Close stops the bulk processor previously started with Do.
// If it is already stopped, this is a no-op and nil is returned.
//
// By implementing Close, BulkProcessor implements the io.Closer interface.
func (p *BulkProcessor) Close() error {
	p.startedMu.Lock()
	defer p.startedMu.Unlock()

	// Already stopped? Do nothing.
	if !p.started {
		return nil
	}

	// Tell connection checkers to stop
	if p.stopReconnC != nil {
		close(p.stopReconnC)
		p.stopReconnC = nil
	}

	// Stop flusher (if enabled)
	if p.flusherStopC != nil {
		p.flusherStopC <- struct{}{}
		<-p.flusherStopC
		close(p.flusherStopC)
		p.flusherStopC = nil
	}

	// Stop all workers.
	close(p.requestsC)
	p.workerWg.Wait()

	p.started = false

	return nil
}

// Stats returns the latest bulk processor statistics.
// Collecting stats must be enabled first by calling Stats(true) on
// the service that created this processor.
func (p *BulkProcessor) Stats() BulkProcessorStats {
	p.statsMu.Lock()
	defer p.statsMu.Unlock()
	return *p.stats.dup()
}
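
// exampleReportStats is a minimal sketch (illustrative only) of reading
// processor statistics while it is running. It assumes Stats(true) was set
// on the BulkProcessorService that created p.
func exampleReportStats(p *BulkProcessor) (queued, succeeded, failed int64) {
	stats := p.Stats()
	for _, ws := range stats.Workers {
		queued += ws.Queued // requests currently queued in each worker
	}
	return queued, stats.Succeeded, stats.Failed
}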

// Add adds a single request to be committed by the BulkProcessor's workers.
//
// The caller is responsible for setting the index and type on the request.
func (p *BulkProcessor) Add(request BulkableRequest) {
	p.requestsC <- request
}
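
// exampleAddDelete is a minimal sketch (illustrative only): every request
// handed to Add must carry its own index and type, since the processor does
// not set them. NewBulkDeleteRequest is assumed to be this package's
// bulk-delete request constructor; "articles"/"doc" are placeholders.
func exampleAddDelete(p *BulkProcessor, id string) {
	p.Add(NewBulkDeleteRequest().Index("articles").Type("doc").Id(id))
}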

// Flush manually asks all workers to commit their outstanding requests.
// It returns only when all workers acknowledge completion.
func (p *BulkProcessor) Flush() error {
	p.statsMu.Lock()
	p.stats.Flushed++
	p.statsMu.Unlock()

	for _, w := range p.workers {
		w.flushC <- struct{}{}
		<-w.flushAckC // wait for completion
	}
	return nil
}

// flusher is a single goroutine that periodically asks all workers to
// commit their outstanding bulk requests. It is only started if
// FlushInterval is greater than 0.
func (p *BulkProcessor) flusher(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C: // Periodic flush
			p.Flush() // TODO swallow errors here?

		case <-p.flusherStopC:
			p.flusherStopC <- struct{}{}
			return
		}
	}
}

// -- Bulk Worker --

// bulkWorker encapsulates a single worker, running in a goroutine,
// receiving bulk requests and eventually committing them to Elasticsearch.
// It is strongly bound to a BulkProcessor.
type bulkWorker struct {
	p           *BulkProcessor
	i           int
	bulkActions int
	bulkSize    int
	service     *BulkService
	flushC      chan struct{}
	flushAckC   chan struct{}
}

// newBulkWorker creates a new bulkWorker instance.
func newBulkWorker(p *BulkProcessor, i int) *bulkWorker {
	return &bulkWorker{
		p:           p,
		i:           i,
		bulkActions: p.bulkActions,
		bulkSize:    p.bulkSize,
		service:     NewBulkService(p.c),
		flushC:      make(chan struct{}),
		flushAckC:   make(chan struct{}),
	}
}

// work waits for bulk requests and manual flush calls on the respective
// channels and is invoked as a goroutine when the bulk processor is started.
func (w *bulkWorker) work(ctx context.Context) {
	defer func() {
		w.p.workerWg.Done()
		close(w.flushAckC)
		close(w.flushC)
	}()

	var stop bool
	for !stop {
		var err error
		select {
		case req, open := <-w.p.requestsC:
			if open {
				// Received a new request
				w.service.Add(req)
				if w.commitRequired() {
					err = w.commit(ctx)
				}
			} else {
				// Channel closed: Stop.
				stop = true
				if w.service.NumberOfActions() > 0 {
					err = w.commit(ctx)
				}
			}

		case <-w.flushC:
			// Commit outstanding requests
			if w.service.NumberOfActions() > 0 {
				err = w.commit(ctx)
			}
			w.flushAckC <- struct{}{}
		}
		if !stop && err != nil {
			waitForActive := func() {
				// Add back pressure to prevent Add calls from filling up the request queue
				ready := make(chan struct{})
				go w.waitForActiveConnection(ready)
				<-ready
			}
			if _, ok := err.(net.Error); ok {
				waitForActive()
			} else if IsConnErr(err) {
				waitForActive()
			}
		}
	}
}

// commit commits the bulk requests in the given service,
// invoking callbacks as specified.
func (w *bulkWorker) commit(ctx context.Context) error {
	var res *BulkResponse

	// commitFunc will commit bulk requests and, on failure, be retried
	// via exponential backoff
	commitFunc := func() error {
		var err error
		res, err = w.service.Do(ctx)
		return err
	}
	// notifyFunc will be called if retry fails
	notifyFunc := func(err error) {
		w.p.c.errorf("elastic: bulk processor %q failed but may retry: %v", w.p.name, err)
	}

	id := atomic.AddInt64(&w.p.executionId, 1)

	// Update # documents in queue before eventual retries
	w.p.statsMu.Lock()
	if w.p.wantStats {
		w.p.stats.Workers[w.i].Queued = int64(len(w.service.requests))
	}
	w.p.statsMu.Unlock()

	// Save requests because they will be reset in commitFunc
	reqs := w.service.requests

	// Invoke before callback
	if w.p.beforeFn != nil {
		w.p.beforeFn(id, reqs)
	}

	// Commit bulk requests
	err := RetryNotify(commitFunc, w.p.backoff, notifyFunc)
	w.updateStats(res)
	if err != nil {
		w.p.c.errorf("elastic: bulk processor %q failed: %v", w.p.name, err)
	}

	// Invoke after callback
	if w.p.afterFn != nil {
		w.p.afterFn(id, reqs, res, err)
	}

	return err
}

// waitForActiveConnection blocks until the client reports at least one
// active connection, or until reconnection attempts are stopped, and then
// closes the ready channel.
func (w *bulkWorker) waitForActiveConnection(ready chan<- struct{}) {
	defer close(ready)

	t := time.NewTicker(5 * time.Second)
	defer t.Stop()

	client := w.p.c
	stopReconnC := w.p.stopReconnC

	w.p.c.errorf("elastic: bulk processor %q is waiting for an active connection", w.p.name)

	// loop until a health check finds at least 1 active connection or the reconnection channel is closed
	for {
		select {
		case _, ok := <-stopReconnC:
			if !ok {
				w.p.c.errorf("elastic: bulk processor %q active connection check interrupted", w.p.name)
				return
			}
		case <-t.C:
			client.healthcheck(time.Duration(3)*time.Second, true)
			if client.mustActiveConn() == nil {
				// found an active connection
				// exit and signal done to the WaitGroup
				return
			}
		}
	}
}

// updateStats incorporates the given bulk response into the processor's
// statistics, provided stats collection is enabled.
func (w *bulkWorker) updateStats(res *BulkResponse) {
	// Update stats
	if res != nil {
		w.p.statsMu.Lock()
		if w.p.wantStats {
			w.p.stats.Committed++
			w.p.stats.Indexed += int64(len(res.Indexed()))
			w.p.stats.Created += int64(len(res.Created()))
			w.p.stats.Updated += int64(len(res.Updated()))
			w.p.stats.Deleted += int64(len(res.Deleted()))
			w.p.stats.Succeeded += int64(len(res.Succeeded()))
			w.p.stats.Failed += int64(len(res.Failed()))
			w.p.stats.Workers[w.i].Queued = int64(len(w.service.requests))
			w.p.stats.Workers[w.i].LastDuration = time.Duration(int64(res.Took)) * time.Millisecond
		}
		w.p.statsMu.Unlock()
	}
}

// commitRequired returns true if the service has to commit its
// bulk requests. This can be either because the number of actions
// or the estimated size in bytes is larger than specified in the
// BulkProcessorService.
func (w *bulkWorker) commitRequired() bool {
	if w.bulkActions >= 0 && w.service.NumberOfActions() >= w.bulkActions {
		return true
	}
	if w.bulkSize >= 0 && w.service.EstimatedSizeInBytes() >= int64(w.bulkSize) {
		return true
	}
	return false
}