client.go 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850
  1. // Copyright 2012-present Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package elastic
  5. import (
  6. "bytes"
  7. "context"
  8. "encoding/json"
  9. "fmt"
  10. "log"
  11. "net/http"
  12. "net/http/httputil"
  13. "net/url"
  14. "os"
  15. "regexp"
  16. "strings"
  17. "sync"
  18. "time"
  19. "github.com/pkg/errors"
  20. "gopkg.in/olivere/elastic.v5/config"
  21. )
  // Package-wide defaults. NewClient starts from these values; most of them
  // can be overridden per client with the corresponding Set... option.
  const (
  	// Version is the current version of Elastic.
  	Version = "5.0.69"

  	// DefaultURL is the endpoint of an Elasticsearch node on the local
  	// machine. It is used whenever a Client is created without any URL.
  	DefaultURL = "http://127.0.0.1:9200"

  	// DefaultScheme is the protocol scheme used while sniffing the cluster.
  	DefaultScheme = "http"

  	// DefaultHealthcheckEnabled reports whether health checks are on by default.
  	DefaultHealthcheckEnabled = true

  	// DefaultHealthcheckTimeoutStartup is how long the health check waits
  	// for Elasticsearch to answer while a client is being created. Once
  	// the client is up, DefaultHealthcheckTimeout (shorter) applies.
  	DefaultHealthcheckTimeoutStartup = 5 * time.Second

  	// DefaultHealthcheckTimeout is how long a running client waits for a
  	// health check response. The startup timeout is larger by default,
  	// see DefaultHealthcheckTimeoutStartup.
  	DefaultHealthcheckTimeout = time.Second

  	// DefaultHealthcheckInterval is the pause between two health checks
  	// of the nodes in the cluster.
  	DefaultHealthcheckInterval = 60 * time.Second

  	// DefaultSnifferEnabled reports whether the sniffer is on by default.
  	DefaultSnifferEnabled = true

  	// DefaultSnifferInterval is the pause between two sniffing runs, i.e.
  	// between two lookups of all cluster nodes and the resulting
  	// addition/removal of connections.
  	DefaultSnifferInterval = 15 * time.Minute

  	// DefaultSnifferTimeoutStartup is the timeout of the sniffing run that
  	// happens while a client is being created. Later runs use
  	// DefaultSnifferTimeout (by default).
  	DefaultSnifferTimeoutStartup = 5 * time.Second

  	// DefaultSnifferTimeout is the timeout of a regular sniffing run. The
  	// initial run uses DefaultSnifferTimeoutStartup instead.
  	DefaultSnifferTimeout = 2 * time.Second

  	// DefaultSendGetBodyAs is the HTTP method used when a GET request
  	// carrying a body has to be sent.
  	DefaultSendGetBodyAs = "GET"

  	// DefaultGzipEnabled reports whether gzip compression is on by default.
  	DefaultGzipEnabled = false

  	// off is the sentinel duration used to disable timeouts.
  	off = -time.Second
  )
  67. var (
  68. // ErrNoClient is raised when no Elasticsearch node is available.
  69. ErrNoClient = errors.New("no Elasticsearch node available")
  70. // ErrRetry is raised when a request cannot be executed after the configured
  71. // number of retries.
  72. ErrRetry = errors.New("cannot connect after several retries")
  73. // ErrTimeout is raised when a request timed out, e.g. when WaitForStatus
  74. // didn't return in time.
  75. ErrTimeout = errors.New("timeout")
  76. // noRetries is a retrier that does not retry.
  77. noRetries = NewStopRetrier()
  78. )
  79. // ClientOptionFunc is a function that configures a Client.
  80. // It is used in NewClient.
  81. type ClientOptionFunc func(*Client) error
  82. // Client is an Elasticsearch client. Create one by calling NewClient.
  83. type Client struct {
  84. c *http.Client // net/http Client to use for requests
  85. connsMu sync.RWMutex // connsMu guards the next block
  86. conns []*conn // all connections
  87. cindex int // index into conns
  88. mu sync.RWMutex // guards the next block
  89. urls []string // set of URLs passed initially to the client
  90. running bool // true if the client's background processes are running
  91. errorlog Logger // error log for critical messages
  92. infolog Logger // information log for e.g. response times
  93. tracelog Logger // trace log for debugging
  94. scheme string // http or https
  95. healthcheckEnabled bool // healthchecks enabled or disabled
  96. healthcheckTimeoutStartup time.Duration // time the healthcheck waits for a response from Elasticsearch on startup
  97. healthcheckTimeout time.Duration // time the healthcheck waits for a response from Elasticsearch
  98. healthcheckInterval time.Duration // interval between healthchecks
  99. healthcheckStop chan bool // notify healthchecker to stop, and notify back
  100. snifferEnabled bool // sniffer enabled or disabled
  101. snifferTimeoutStartup time.Duration // time the sniffer waits for a response from nodes info API on startup
  102. snifferTimeout time.Duration // time the sniffer waits for a response from nodes info API
  103. snifferInterval time.Duration // interval between sniffing
  104. snifferCallback SnifferCallback // callback to modify the sniffing decision
  105. snifferStop chan bool // notify sniffer to stop, and notify back
  106. decoder Decoder // used to decode data sent from Elasticsearch
  107. basicAuth bool // indicates whether to send HTTP Basic Auth credentials
  108. basicAuthUsername string // username for HTTP Basic Auth
  109. basicAuthPassword string // password for HTTP Basic Auth
  110. sendGetBodyAs string // override for when sending a GET with a body
  111. requiredPlugins []string // list of required plugins
  112. gzipEnabled bool // gzip compression enabled or disabled (default)
  113. retrier Retrier // strategy for retries
  114. }
  115. // NewClient creates a new client to work with Elasticsearch.
  116. //
  117. // NewClient, by default, is meant to be long-lived and shared across
  118. // your application. If you need a short-lived client, e.g. for request-scope,
  119. // consider using NewSimpleClient instead.
  120. //
  121. // The caller can configure the new client by passing configuration options
  122. // to the func.
  123. //
  124. // Example:
  125. //
  126. // client, err := elastic.NewClient(
  127. // elastic.SetURL("http://127.0.0.1:9200", "http://127.0.0.1:9201"),
  128. // elastic.SetBasicAuth("user", "secret"))
  129. //
  130. // If no URL is configured, Elastic uses DefaultURL by default.
  131. //
  132. // If the sniffer is enabled (the default), the new client then sniffes
  133. // the cluster via the Nodes Info API
  134. // (see https://www.elastic.co/guide/en/elasticsearch/reference/5.2/cluster-nodes-info.html#cluster-nodes-info).
  135. // It uses the URLs specified by the caller. The caller is responsible
  136. // to only pass a list of URLs of nodes that belong to the same cluster.
  137. // This sniffing process is run on startup and periodically.
  138. // Use SnifferInterval to set the interval between two sniffs (default is
  139. // 15 minutes). In other words: By default, the client will find new nodes
  140. // in the cluster and remove those that are no longer available every
  141. // 15 minutes. Disable the sniffer by passing SetSniff(false) to NewClient.
  142. //
  143. // The list of nodes found in the sniffing process will be used to make
  144. // connections to the REST API of Elasticsearch. These nodes are also
  145. // periodically checked in a shorter time frame. This process is called
  146. // a health check. By default, a health check is done every 60 seconds.
  147. // You can set a shorter or longer interval by SetHealthcheckInterval.
  148. // Disabling health checks is not recommended, but can be done by
  149. // SetHealthcheck(false).
  150. //
  151. // Connections are automatically marked as dead or healthy while
  152. // making requests to Elasticsearch. When a request fails, Elastic will
  153. // call into the Retry strategy which can be specified with SetRetry.
  154. // The Retry strategy is also responsible for handling backoff i.e. the time
  155. // to wait before starting the next request. There are various standard
  156. // backoff implementations, e.g. ExponentialBackoff or SimpleBackoff.
  157. // Retries are disabled by default.
  158. //
  159. // If no HttpClient is configured, then http.DefaultClient is used.
  160. // You can use your own http.Client with some http.Transport for
  161. // advanced scenarios.
  162. //
  163. // An error is also returned when some configuration option is invalid or
  164. // the new client cannot sniff the cluster (if enabled).
  165. func NewClient(options ...ClientOptionFunc) (*Client, error) {
  166. // Set up the client
  167. c := &Client{
  168. c: http.DefaultClient,
  169. conns: make([]*conn, 0),
  170. cindex: -1,
  171. scheme: DefaultScheme,
  172. decoder: &DefaultDecoder{},
  173. healthcheckEnabled: DefaultHealthcheckEnabled,
  174. healthcheckTimeoutStartup: DefaultHealthcheckTimeoutStartup,
  175. healthcheckTimeout: DefaultHealthcheckTimeout,
  176. healthcheckInterval: DefaultHealthcheckInterval,
  177. healthcheckStop: make(chan bool),
  178. snifferEnabled: DefaultSnifferEnabled,
  179. snifferTimeoutStartup: DefaultSnifferTimeoutStartup,
  180. snifferTimeout: DefaultSnifferTimeout,
  181. snifferInterval: DefaultSnifferInterval,
  182. snifferCallback: nopSnifferCallback,
  183. snifferStop: make(chan bool),
  184. sendGetBodyAs: DefaultSendGetBodyAs,
  185. gzipEnabled: DefaultGzipEnabled,
  186. retrier: noRetries, // no retries by default
  187. }
  188. // Run the options on it
  189. for _, option := range options {
  190. if err := option(c); err != nil {
  191. return nil, err
  192. }
  193. }
  194. // Use a default URL and normalize them
  195. if len(c.urls) == 0 {
  196. c.urls = []string{DefaultURL}
  197. }
  198. c.urls = canonicalize(c.urls...)
  199. // If the URLs have auth info, use them here as an alternative to SetBasicAuth
  200. if !c.basicAuth {
  201. for _, urlStr := range c.urls {
  202. u, err := url.Parse(urlStr)
  203. if err == nil && u.User != nil {
  204. c.basicAuth = true
  205. c.basicAuthUsername = u.User.Username()
  206. c.basicAuthPassword, _ = u.User.Password()
  207. break
  208. }
  209. }
  210. }
  211. // Check if we can make a request to any of the specified URLs
  212. if c.healthcheckEnabled {
  213. if err := c.startupHealthcheck(c.healthcheckTimeoutStartup); err != nil {
  214. return nil, err
  215. }
  216. }
  217. if c.snifferEnabled {
  218. // Sniff the cluster initially
  219. if err := c.sniff(c.snifferTimeoutStartup); err != nil {
  220. return nil, err
  221. }
  222. } else {
  223. // Do not sniff the cluster initially. Use the provided URLs instead.
  224. for _, url := range c.urls {
  225. c.conns = append(c.conns, newConn(url, url))
  226. }
  227. }
  228. if c.healthcheckEnabled {
  229. // Perform an initial health check
  230. c.healthcheck(c.healthcheckTimeoutStartup, true)
  231. }
  232. // Ensure that we have at least one connection available
  233. if err := c.mustActiveConn(); err != nil {
  234. return nil, err
  235. }
  236. // Check the required plugins
  237. for _, plugin := range c.requiredPlugins {
  238. found, err := c.HasPlugin(plugin)
  239. if err != nil {
  240. return nil, err
  241. }
  242. if !found {
  243. return nil, fmt.Errorf("elastic: plugin %s not found", plugin)
  244. }
  245. }
  246. if c.snifferEnabled {
  247. go c.sniffer() // periodically update cluster information
  248. }
  249. if c.healthcheckEnabled {
  250. go c.healthchecker() // start goroutine periodically ping all nodes of the cluster
  251. }
  252. c.mu.Lock()
  253. c.running = true
  254. c.mu.Unlock()
  255. return c, nil
  256. }
  257. // NewClientFromConfig initializes a client from a configuration.
  258. func NewClientFromConfig(cfg *config.Config) (*Client, error) {
  259. var options []ClientOptionFunc
  260. if cfg != nil {
  261. if cfg.URL != "" {
  262. options = append(options, SetURL(cfg.URL))
  263. }
  264. if cfg.Errorlog != "" {
  265. f, err := os.OpenFile(cfg.Errorlog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  266. if err != nil {
  267. return nil, errors.Wrap(err, "unable to initialize error log")
  268. }
  269. l := log.New(f, "", 0)
  270. options = append(options, SetErrorLog(l))
  271. }
  272. if cfg.Tracelog != "" {
  273. f, err := os.OpenFile(cfg.Tracelog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  274. if err != nil {
  275. return nil, errors.Wrap(err, "unable to initialize trace log")
  276. }
  277. l := log.New(f, "", 0)
  278. options = append(options, SetTraceLog(l))
  279. }
  280. if cfg.Infolog != "" {
  281. f, err := os.OpenFile(cfg.Infolog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  282. if err != nil {
  283. return nil, errors.Wrap(err, "unable to initialize info log")
  284. }
  285. l := log.New(f, "", 0)
  286. options = append(options, SetInfoLog(l))
  287. }
  288. if cfg.Username != "" || cfg.Password != "" {
  289. options = append(options, SetBasicAuth(cfg.Username, cfg.Password))
  290. }
  291. if cfg.Sniff != nil {
  292. options = append(options, SetSniff(*cfg.Sniff))
  293. }
  294. }
  295. return NewClient(options...)
  296. }
  297. // NewSimpleClient creates a new short-lived Client that can be used in
  298. // use cases where you need e.g. one client per request.
  299. //
  300. // While NewClient by default sets up e.g. periodic health checks
  301. // and sniffing for new nodes in separate goroutines, NewSimpleClient does
  302. // not and is meant as a simple replacement where you don't need all the
  303. // heavy lifting of NewClient.
  304. //
  305. // NewSimpleClient does the following by default: First, all health checks
  306. // are disabled, including timeouts and periodic checks. Second, sniffing
  307. // is disabled, including timeouts and periodic checks. The number of retries
  308. // is set to 1. NewSimpleClient also does not start any goroutines.
  309. //
  310. // Notice that you can still override settings by passing additional options,
  311. // just like with NewClient.
  312. func NewSimpleClient(options ...ClientOptionFunc) (*Client, error) {
  313. c := &Client{
  314. c: http.DefaultClient,
  315. conns: make([]*conn, 0),
  316. cindex: -1,
  317. scheme: DefaultScheme,
  318. decoder: &DefaultDecoder{},
  319. healthcheckEnabled: false,
  320. healthcheckTimeoutStartup: off,
  321. healthcheckTimeout: off,
  322. healthcheckInterval: off,
  323. healthcheckStop: make(chan bool),
  324. snifferEnabled: false,
  325. snifferTimeoutStartup: off,
  326. snifferTimeout: off,
  327. snifferInterval: off,
  328. snifferCallback: nopSnifferCallback,
  329. snifferStop: make(chan bool),
  330. sendGetBodyAs: DefaultSendGetBodyAs,
  331. gzipEnabled: DefaultGzipEnabled,
  332. retrier: noRetries, // no retries by default
  333. }
  334. // Run the options on it
  335. for _, option := range options {
  336. if err := option(c); err != nil {
  337. return nil, err
  338. }
  339. }
  340. // Use a default URL and normalize them
  341. if len(c.urls) == 0 {
  342. c.urls = []string{DefaultURL}
  343. }
  344. c.urls = canonicalize(c.urls...)
  345. // If the URLs have auth info, use them here as an alternative to SetBasicAuth
  346. if !c.basicAuth {
  347. for _, urlStr := range c.urls {
  348. u, err := url.Parse(urlStr)
  349. if err == nil && u.User != nil {
  350. c.basicAuth = true
  351. c.basicAuthUsername = u.User.Username()
  352. c.basicAuthPassword, _ = u.User.Password()
  353. break
  354. }
  355. }
  356. }
  357. for _, url := range c.urls {
  358. c.conns = append(c.conns, newConn(url, url))
  359. }
  360. // Ensure that we have at least one connection available
  361. if err := c.mustActiveConn(); err != nil {
  362. return nil, err
  363. }
  364. // Check the required plugins
  365. for _, plugin := range c.requiredPlugins {
  366. found, err := c.HasPlugin(plugin)
  367. if err != nil {
  368. return nil, err
  369. }
  370. if !found {
  371. return nil, fmt.Errorf("elastic: plugin %s not found", plugin)
  372. }
  373. }
  374. c.mu.Lock()
  375. c.running = true
  376. c.mu.Unlock()
  377. return c, nil
  378. }
  379. // SetHttpClient can be used to specify the http.Client to use when making
  380. // HTTP requests to Elasticsearch.
  381. func SetHttpClient(httpClient *http.Client) ClientOptionFunc {
  382. return func(c *Client) error {
  383. if httpClient != nil {
  384. c.c = httpClient
  385. } else {
  386. c.c = http.DefaultClient
  387. }
  388. return nil
  389. }
  390. }
  391. // SetBasicAuth can be used to specify the HTTP Basic Auth credentials to
  392. // use when making HTTP requests to Elasticsearch.
  393. func SetBasicAuth(username, password string) ClientOptionFunc {
  394. return func(c *Client) error {
  395. c.basicAuthUsername = username
  396. c.basicAuthPassword = password
  397. c.basicAuth = c.basicAuthUsername != "" || c.basicAuthPassword != ""
  398. return nil
  399. }
  400. }
  401. // SetURL defines the URL endpoints of the Elasticsearch nodes. Notice that
  402. // when sniffing is enabled, these URLs are used to initially sniff the
  403. // cluster on startup.
  404. func SetURL(urls ...string) ClientOptionFunc {
  405. return func(c *Client) error {
  406. switch len(urls) {
  407. case 0:
  408. c.urls = []string{DefaultURL}
  409. default:
  410. c.urls = urls
  411. }
  412. return nil
  413. }
  414. }
  415. // SetScheme sets the HTTP scheme to look for when sniffing (http or https).
  416. // This is http by default.
  417. func SetScheme(scheme string) ClientOptionFunc {
  418. return func(c *Client) error {
  419. c.scheme = scheme
  420. return nil
  421. }
  422. }
  423. // SetSniff enables or disables the sniffer (enabled by default).
  424. func SetSniff(enabled bool) ClientOptionFunc {
  425. return func(c *Client) error {
  426. c.snifferEnabled = enabled
  427. return nil
  428. }
  429. }
  430. // SetSnifferTimeoutStartup sets the timeout for the sniffer that is used
  431. // when creating a new client. The default is 5 seconds. Notice that the
  432. // timeout being used for subsequent sniffing processes is set with
  433. // SetSnifferTimeout.
  434. func SetSnifferTimeoutStartup(timeout time.Duration) ClientOptionFunc {
  435. return func(c *Client) error {
  436. c.snifferTimeoutStartup = timeout
  437. return nil
  438. }
  439. }
  440. // SetSnifferTimeout sets the timeout for the sniffer that finds the
  441. // nodes in a cluster. The default is 2 seconds. Notice that the timeout
  442. // used when creating a new client on startup is usually greater and can
  443. // be set with SetSnifferTimeoutStartup.
  444. func SetSnifferTimeout(timeout time.Duration) ClientOptionFunc {
  445. return func(c *Client) error {
  446. c.snifferTimeout = timeout
  447. return nil
  448. }
  449. }
  450. // SetSnifferInterval sets the interval between two sniffing processes.
  451. // The default interval is 15 minutes.
  452. func SetSnifferInterval(interval time.Duration) ClientOptionFunc {
  453. return func(c *Client) error {
  454. c.snifferInterval = interval
  455. return nil
  456. }
  457. }
  458. // SnifferCallback defines the protocol for sniffing decisions.
  459. type SnifferCallback func(*NodesInfoNode) bool
  460. // nopSnifferCallback is the default sniffer callback: It accepts
  461. // all nodes the sniffer finds.
  462. var nopSnifferCallback = func(*NodesInfoNode) bool { return true }
  463. // SetSnifferCallback allows the caller to modify sniffer decisions.
  464. // When setting the callback, the given SnifferCallback is called for
  465. // each (healthy) node found during the sniffing process.
  466. // If the callback returns false, the node is ignored: No requests
  467. // are routed to it.
  468. func SetSnifferCallback(f SnifferCallback) ClientOptionFunc {
  469. return func(c *Client) error {
  470. if f != nil {
  471. c.snifferCallback = f
  472. }
  473. return nil
  474. }
  475. }
  476. // SetHealthcheck enables or disables healthchecks (enabled by default).
  477. func SetHealthcheck(enabled bool) ClientOptionFunc {
  478. return func(c *Client) error {
  479. c.healthcheckEnabled = enabled
  480. return nil
  481. }
  482. }
  483. // SetHealthcheckTimeoutStartup sets the timeout for the initial health check.
  484. // The default timeout is 5 seconds (see DefaultHealthcheckTimeoutStartup).
  485. // Notice that timeouts for subsequent health checks can be modified with
  486. // SetHealthcheckTimeout.
  487. func SetHealthcheckTimeoutStartup(timeout time.Duration) ClientOptionFunc {
  488. return func(c *Client) error {
  489. c.healthcheckTimeoutStartup = timeout
  490. return nil
  491. }
  492. }
  493. // SetHealthcheckTimeout sets the timeout for periodic health checks.
  494. // The default timeout is 1 second (see DefaultHealthcheckTimeout).
  495. // Notice that a different (usually larger) timeout is used for the initial
  496. // healthcheck, which is initiated while creating a new client.
  497. // The startup timeout can be modified with SetHealthcheckTimeoutStartup.
  498. func SetHealthcheckTimeout(timeout time.Duration) ClientOptionFunc {
  499. return func(c *Client) error {
  500. c.healthcheckTimeout = timeout
  501. return nil
  502. }
  503. }
  504. // SetHealthcheckInterval sets the interval between two health checks.
  505. // The default interval is 60 seconds.
  506. func SetHealthcheckInterval(interval time.Duration) ClientOptionFunc {
  507. return func(c *Client) error {
  508. c.healthcheckInterval = interval
  509. return nil
  510. }
  511. }
  512. // SetMaxRetries sets the maximum number of retries before giving up when
  513. // performing a HTTP request to Elasticsearch.
  514. //
  515. // Deprecated: Replace with a Retry implementation.
  516. func SetMaxRetries(maxRetries int) ClientOptionFunc {
  517. return func(c *Client) error {
  518. if maxRetries < 0 {
  519. return errors.New("MaxRetries must be greater than or equal to 0")
  520. } else if maxRetries == 0 {
  521. c.retrier = noRetries
  522. } else {
  523. // Create a Retrier that will wait for 100ms (+/- jitter) between requests.
  524. // This resembles the old behavior with maxRetries.
  525. ticks := make([]int, maxRetries)
  526. for i := 0; i < len(ticks); i++ {
  527. ticks[i] = 100
  528. }
  529. backoff := NewSimpleBackoff(ticks...)
  530. c.retrier = NewBackoffRetrier(backoff)
  531. }
  532. return nil
  533. }
  534. }
  535. // SetGzip enables or disables gzip compression (disabled by default).
  536. func SetGzip(enabled bool) ClientOptionFunc {
  537. return func(c *Client) error {
  538. c.gzipEnabled = enabled
  539. return nil
  540. }
  541. }
  542. // SetDecoder sets the Decoder to use when decoding data from Elasticsearch.
  543. // DefaultDecoder is used by default.
  544. func SetDecoder(decoder Decoder) ClientOptionFunc {
  545. return func(c *Client) error {
  546. if decoder != nil {
  547. c.decoder = decoder
  548. } else {
  549. c.decoder = &DefaultDecoder{}
  550. }
  551. return nil
  552. }
  553. }
  554. // SetRequiredPlugins can be used to indicate that some plugins are required
  555. // before a Client will be created.
  556. func SetRequiredPlugins(plugins ...string) ClientOptionFunc {
  557. return func(c *Client) error {
  558. if c.requiredPlugins == nil {
  559. c.requiredPlugins = make([]string, 0)
  560. }
  561. c.requiredPlugins = append(c.requiredPlugins, plugins...)
  562. return nil
  563. }
  564. }
  565. // SetErrorLog sets the logger for critical messages like nodes joining
  566. // or leaving the cluster or failing requests. It is nil by default.
  567. func SetErrorLog(logger Logger) ClientOptionFunc {
  568. return func(c *Client) error {
  569. c.errorlog = logger
  570. return nil
  571. }
  572. }
  573. // SetInfoLog sets the logger for informational messages, e.g. requests
  574. // and their response times. It is nil by default.
  575. func SetInfoLog(logger Logger) ClientOptionFunc {
  576. return func(c *Client) error {
  577. c.infolog = logger
  578. return nil
  579. }
  580. }
  581. // SetTraceLog specifies the log.Logger to use for output of HTTP requests
  582. // and responses which is helpful during debugging. It is nil by default.
  583. func SetTraceLog(logger Logger) ClientOptionFunc {
  584. return func(c *Client) error {
  585. c.tracelog = logger
  586. return nil
  587. }
  588. }
  589. // SetSendGetBodyAs specifies the HTTP method to use when sending a GET request
  590. // with a body. It is GET by default.
  591. func SetSendGetBodyAs(httpMethod string) ClientOptionFunc {
  592. return func(c *Client) error {
  593. c.sendGetBodyAs = httpMethod
  594. return nil
  595. }
  596. }
  597. // SetRetrier specifies the retry strategy that handles errors during
  598. // HTTP request/response with Elasticsearch.
  599. func SetRetrier(retrier Retrier) ClientOptionFunc {
  600. return func(c *Client) error {
  601. if retrier == nil {
  602. retrier = noRetries // no retries by default
  603. }
  604. c.retrier = retrier
  605. return nil
  606. }
  607. }
  608. // String returns a string representation of the client status.
  609. func (c *Client) String() string {
  610. c.connsMu.Lock()
  611. conns := c.conns
  612. c.connsMu.Unlock()
  613. var buf bytes.Buffer
  614. for i, conn := range conns {
  615. if i > 0 {
  616. buf.WriteString(", ")
  617. }
  618. buf.WriteString(conn.String())
  619. }
  620. return buf.String()
  621. }
  622. // IsRunning returns true if the background processes of the client are
  623. // running, false otherwise.
  624. func (c *Client) IsRunning() bool {
  625. c.mu.RLock()
  626. defer c.mu.RUnlock()
  627. return c.running
  628. }
  629. // Start starts the background processes like sniffing the cluster and
  630. // periodic health checks. You don't need to run Start when creating a
  631. // client with NewClient; the background processes are run by default.
  632. //
  633. // If the background processes are already running, this is a no-op.
  634. func (c *Client) Start() {
  635. c.mu.RLock()
  636. if c.running {
  637. c.mu.RUnlock()
  638. return
  639. }
  640. c.mu.RUnlock()
  641. if c.snifferEnabled {
  642. go c.sniffer()
  643. }
  644. if c.healthcheckEnabled {
  645. go c.healthchecker()
  646. }
  647. c.mu.Lock()
  648. c.running = true
  649. c.mu.Unlock()
  650. c.infof("elastic: client started")
  651. }
  652. // Stop stops the background processes that the client is running,
  653. // i.e. sniffing the cluster periodically and running health checks
  654. // on the nodes.
  655. //
  656. // If the background processes are not running, this is a no-op.
  657. func (c *Client) Stop() {
  658. c.mu.RLock()
  659. if !c.running {
  660. c.mu.RUnlock()
  661. return
  662. }
  663. c.mu.RUnlock()
  664. if c.healthcheckEnabled {
  665. c.healthcheckStop <- true
  666. <-c.healthcheckStop
  667. }
  668. if c.snifferEnabled {
  669. c.snifferStop <- true
  670. <-c.snifferStop
  671. }
  672. c.mu.Lock()
  673. c.running = false
  674. c.mu.Unlock()
  675. c.infof("elastic: client stopped")
  676. }
  677. // errorf logs to the error log.
  678. func (c *Client) errorf(format string, args ...interface{}) {
  679. if c.errorlog != nil {
  680. c.errorlog.Printf(format, args...)
  681. }
  682. }
  683. // infof logs informational messages.
  684. func (c *Client) infof(format string, args ...interface{}) {
  685. if c.infolog != nil {
  686. c.infolog.Printf(format, args...)
  687. }
  688. }
  689. // tracef logs to the trace log.
  690. func (c *Client) tracef(format string, args ...interface{}) {
  691. if c.tracelog != nil {
  692. c.tracelog.Printf(format, args...)
  693. }
  694. }
  695. // dumpRequest dumps the given HTTP request to the trace log.
  696. func (c *Client) dumpRequest(r *http.Request) {
  697. if c.tracelog != nil {
  698. out, err := httputil.DumpRequestOut(r, true)
  699. if err == nil {
  700. c.tracef("%s\n", string(out))
  701. }
  702. }
  703. }
  704. // dumpResponse dumps the given HTTP response to the trace log.
  705. func (c *Client) dumpResponse(resp *http.Response) {
  706. if c.tracelog != nil {
  707. out, err := httputil.DumpResponse(resp, true)
  708. if err == nil {
  709. c.tracef("%s\n", string(out))
  710. }
  711. }
  712. }
  713. // sniffer periodically runs sniff.
  714. func (c *Client) sniffer() {
  715. c.mu.RLock()
  716. timeout := c.snifferTimeout
  717. interval := c.snifferInterval
  718. c.mu.RUnlock()
  719. ticker := time.NewTicker(interval)
  720. defer ticker.Stop()
  721. for {
  722. select {
  723. case <-c.snifferStop:
  724. // we are asked to stop, so we signal back that we're stopping now
  725. c.snifferStop <- true
  726. return
  727. case <-ticker.C:
  728. c.sniff(timeout)
  729. }
  730. }
  731. }
  732. // sniff uses the Node Info API to return the list of nodes in the cluster.
  733. // It uses the list of URLs passed on startup plus the list of URLs found
  734. // by the preceding sniffing process (if sniffing is enabled).
  735. //
  736. // If sniffing is disabled, this is a no-op.
  737. func (c *Client) sniff(timeout time.Duration) error {
  738. c.mu.RLock()
  739. if !c.snifferEnabled {
  740. c.mu.RUnlock()
  741. return nil
  742. }
  743. // Use all available URLs provided to sniff the cluster.
  744. var urls []string
  745. urlsMap := make(map[string]bool)
  746. // Add all URLs provided on startup
  747. for _, url := range c.urls {
  748. urlsMap[url] = true
  749. urls = append(urls, url)
  750. }
  751. c.mu.RUnlock()
  752. // Add all URLs found by sniffing
  753. c.connsMu.RLock()
  754. for _, conn := range c.conns {
  755. if !conn.IsDead() {
  756. url := conn.URL()
  757. if _, found := urlsMap[url]; !found {
  758. urls = append(urls, url)
  759. }
  760. }
  761. }
  762. c.connsMu.RUnlock()
  763. if len(urls) == 0 {
  764. return errors.Wrap(ErrNoClient, "no URLs found")
  765. }
  766. // Start sniffing on all found URLs
  767. ch := make(chan []*conn, len(urls))
  768. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  769. defer cancel()
  770. for _, url := range urls {
  771. go func(url string) { ch <- c.sniffNode(ctx, url) }(url)
  772. }
  773. // Wait for the results to come back, or the process times out.
  774. for {
  775. select {
  776. case conns := <-ch:
  777. if len(conns) > 0 {
  778. c.updateConns(conns)
  779. return nil
  780. }
  781. case <-ctx.Done():
  782. // We get here if no cluster responds in time
  783. return errors.Wrap(ErrNoClient, "sniff timeout")
  784. }
  785. }
  786. }
  787. // sniffNode sniffs a single node. This method is run as a goroutine
  788. // in sniff. If successful, it returns the list of node URLs extracted
  789. // from the result of calling Nodes Info API. Otherwise, an empty array
  790. // is returned.
  791. func (c *Client) sniffNode(ctx context.Context, url string) []*conn {
  792. var nodes []*conn
  793. // Call the Nodes Info API at /_nodes/http
  794. req, err := NewRequest("GET", url+"/_nodes/http")
  795. if err != nil {
  796. return nodes
  797. }
  798. c.mu.RLock()
  799. if c.basicAuth {
  800. req.SetBasicAuth(c.basicAuthUsername, c.basicAuthPassword)
  801. }
  802. c.mu.RUnlock()
  803. res, err := c.c.Do((*http.Request)(req).WithContext(ctx))
  804. if err != nil {
  805. return nodes
  806. }
  807. if res == nil {
  808. return nodes
  809. }
  810. if res.Body != nil {
  811. defer res.Body.Close()
  812. }
  813. var info NodesInfoResponse
  814. if err := json.NewDecoder(res.Body).Decode(&info); err == nil {
  815. if len(info.Nodes) > 0 {
  816. for nodeID, node := range info.Nodes {
  817. if c.snifferCallback(node) {
  818. if node.HTTP != nil && len(node.HTTP.PublishAddress) > 0 {
  819. url := c.extractHostname(c.scheme, node.HTTP.PublishAddress)
  820. if url != "" {
  821. nodes = append(nodes, newConn(nodeID, url))
  822. }
  823. }
  824. }
  825. }
  826. }
  827. }
  828. return nodes
  829. }
  830. // reSniffHostAndPort is used to extract hostname and port from a result
  831. // from a Nodes Info API (example: "inet[/127.0.0.1:9200]").
  832. var reSniffHostAndPort = regexp.MustCompile(`\/([^:]*):([0-9]+)\]`)
  833. func (c *Client) extractHostname(scheme, address string) string {
  834. if strings.HasPrefix(address, "inet") {
  835. m := reSniffHostAndPort.FindStringSubmatch(address)
  836. if len(m) == 3 {
  837. return fmt.Sprintf("%s://%s:%s", scheme, m[1], m[2])
  838. }
  839. }
  840. s := address
  841. if idx := strings.Index(s, "/"); idx >= 0 {
  842. s = s[idx+1:]
  843. }
  844. if strings.Index(s, ":") < 0 {
  845. return ""
  846. }
  847. return fmt.Sprintf("%s://%s", scheme, s)
  848. }
  849. // updateConns updates the clients' connections with new information
  850. // gather by a sniff operation.
  851. func (c *Client) updateConns(conns []*conn) {
  852. c.connsMu.Lock()
  853. // Build up new connections:
  854. // If we find an existing connection, use that (including no. of failures etc.).
  855. // If we find a new connection, add it.
  856. var newConns []*conn
  857. for _, conn := range conns {
  858. var found bool
  859. for _, oldConn := range c.conns {
  860. if oldConn.NodeID() == conn.NodeID() {
  861. // Take over the old connection
  862. newConns = append(newConns, oldConn)
  863. found = true
  864. break
  865. }
  866. }
  867. if !found {
  868. // New connection didn't exist, so add it to our list of new conns.
  869. c.infof("elastic: %s joined the cluster", conn.URL())
  870. newConns = append(newConns, conn)
  871. }
  872. }
  873. c.conns = newConns
  874. c.cindex = -1
  875. c.connsMu.Unlock()
  876. }
  877. // healthchecker periodically runs healthcheck.
  878. func (c *Client) healthchecker() {
  879. c.mu.RLock()
  880. timeout := c.healthcheckTimeout
  881. interval := c.healthcheckInterval
  882. c.mu.RUnlock()
  883. ticker := time.NewTicker(interval)
  884. defer ticker.Stop()
  885. for {
  886. select {
  887. case <-c.healthcheckStop:
  888. // we are asked to stop, so we signal back that we're stopping now
  889. c.healthcheckStop <- true
  890. return
  891. case <-ticker.C:
  892. c.healthcheck(timeout, false)
  893. }
  894. }
  895. }
  896. // healthcheck does a health check on all nodes in the cluster. Depending on
  897. // the node state, it marks connections as dead, sets them alive etc.
  898. // If healthchecks are disabled and force is false, this is a no-op.
  899. // The timeout specifies how long to wait for a response from Elasticsearch.
  900. func (c *Client) healthcheck(timeout time.Duration, force bool) {
  901. c.mu.RLock()
  902. if !c.healthcheckEnabled && !force {
  903. c.mu.RUnlock()
  904. return
  905. }
  906. basicAuth := c.basicAuth
  907. basicAuthUsername := c.basicAuthUsername
  908. basicAuthPassword := c.basicAuthPassword
  909. c.mu.RUnlock()
  910. c.connsMu.RLock()
  911. conns := c.conns
  912. c.connsMu.RUnlock()
  913. for _, conn := range conns {
  914. // Run the HEAD request against ES with a timeout
  915. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  916. defer cancel()
  917. // Goroutine executes the HTTP request, returns an error and sets status
  918. var status int
  919. errc := make(chan error, 1)
  920. go func(url string) {
  921. req, err := NewRequest("HEAD", url)
  922. if err != nil {
  923. errc <- err
  924. return
  925. }
  926. if basicAuth {
  927. req.SetBasicAuth(basicAuthUsername, basicAuthPassword)
  928. }
  929. res, err := c.c.Do((*http.Request)(req).WithContext(ctx))
  930. if res != nil {
  931. status = res.StatusCode
  932. if res.Body != nil {
  933. res.Body.Close()
  934. }
  935. }
  936. errc <- err
  937. }(conn.URL())
  938. // Wait for the Goroutine (or its timeout)
  939. select {
  940. case <-ctx.Done(): // timeout
  941. c.errorf("elastic: %s is dead", conn.URL())
  942. conn.MarkAsDead()
  943. case err := <-errc:
  944. if err != nil {
  945. c.errorf("elastic: %s is dead", conn.URL())
  946. conn.MarkAsDead()
  947. break
  948. }
  949. if status >= 200 && status < 300 {
  950. conn.MarkAsAlive()
  951. } else {
  952. conn.MarkAsDead()
  953. c.errorf("elastic: %s is dead [status=%d]", conn.URL(), status)
  954. }
  955. }
  956. }
  957. }
  958. // startupHealthcheck is used at startup to check if the server is available
  959. // at all.
  960. func (c *Client) startupHealthcheck(timeout time.Duration) error {
  961. c.mu.Lock()
  962. urls := c.urls
  963. basicAuth := c.basicAuth
  964. basicAuthUsername := c.basicAuthUsername
  965. basicAuthPassword := c.basicAuthPassword
  966. c.mu.Unlock()
  967. // If we don't get a connection after "timeout", we bail.
  968. var lastErr error
  969. start := time.Now()
  970. for {
  971. for _, url := range urls {
  972. req, err := http.NewRequest("HEAD", url, nil)
  973. if err != nil {
  974. return err
  975. }
  976. if basicAuth {
  977. req.SetBasicAuth(basicAuthUsername, basicAuthPassword)
  978. }
  979. ctx, cancel := context.WithTimeout(req.Context(), timeout)
  980. defer cancel()
  981. req = req.WithContext(ctx)
  982. res, err := c.c.Do(req)
  983. if err == nil && res != nil && res.StatusCode >= 200 && res.StatusCode < 300 {
  984. return nil
  985. } else if err != nil {
  986. lastErr = err
  987. }
  988. }
  989. time.Sleep(1 * time.Second)
  990. if time.Since(start) > timeout {
  991. break
  992. }
  993. }
  994. if lastErr != nil {
  995. return errors.Wrapf(ErrNoClient, "health check timeout: %v", lastErr)
  996. }
  997. return errors.Wrap(ErrNoClient, "health check timeout")
  998. }
  999. // next returns the next available connection, or ErrNoClient.
  1000. func (c *Client) next() (*conn, error) {
  1001. // We do round-robin here.
  1002. // TODO(oe) This should be a pluggable strategy, like the Selector in the official clients.
  1003. c.connsMu.Lock()
  1004. defer c.connsMu.Unlock()
  1005. i := 0
  1006. numConns := len(c.conns)
  1007. for {
  1008. i++
  1009. if i > numConns {
  1010. break // we visited all conns: they all seem to be dead
  1011. }
  1012. c.cindex++
  1013. if c.cindex >= numConns {
  1014. c.cindex = 0
  1015. }
  1016. conn := c.conns[c.cindex]
  1017. if !conn.IsDead() {
  1018. return conn, nil
  1019. }
  1020. }
  1021. // We have a deadlock here: All nodes are marked as dead.
  1022. // If sniffing is disabled, connections will never be marked alive again.
  1023. // So we are marking them as alive--if sniffing is disabled.
  1024. // They'll then be picked up in the next call to PerformRequest.
  1025. if !c.snifferEnabled {
  1026. c.errorf("elastic: all %d nodes marked as dead; resurrecting them to prevent deadlock", len(c.conns))
  1027. for _, conn := range c.conns {
  1028. conn.MarkAsAlive()
  1029. }
  1030. }
  1031. // We tried hard, but there is no node available
  1032. return nil, errors.Wrap(ErrNoClient, "no available connection")
  1033. }
  1034. // mustActiveConn returns nil if there is an active connection,
  1035. // otherwise ErrNoClient is returned.
  1036. func (c *Client) mustActiveConn() error {
  1037. c.connsMu.Lock()
  1038. defer c.connsMu.Unlock()
  1039. for _, c := range c.conns {
  1040. if !c.IsDead() {
  1041. return nil
  1042. }
  1043. }
  1044. return errors.Wrap(ErrNoClient, "no active connection found")
  1045. }
  1046. // -- PerformRequest --
// PerformRequestOptions must be passed into PerformRequestWithOptions.
// It bundles everything that describes a single request.
type PerformRequestOptions struct {
	// Method is the HTTP method of the request.
	Method string
	// Path is appended to the URL of the selected connection.
	Path string
	// Params, if non-empty, are encoded and appended to Path as the
	// query string.
	Params url.Values
	// Body, if non-nil, is set as the request body.
	Body interface{}
	// ContentType, if non-empty, is sent as the Content-Type header.
	ContentType string
	// IgnoreErrors lists HTTP status codes that are passed to the response
	// check as codes to ignore.
	IgnoreErrors []int
	// Retrier, if non-nil, is used for this request instead of the
	// client's retrier.
	Retrier Retrier
}
  1057. // PerformRequest does a HTTP request to Elasticsearch.
  1058. // See PerformRequestWithContentType for details.
  1059. func (c *Client) PerformRequest(ctx context.Context, method, path string, params url.Values, body interface{}, ignoreErrors ...int) (*Response, error) {
  1060. return c.PerformRequestWithOptions(ctx, PerformRequestOptions{
  1061. Method: method,
  1062. Path: path,
  1063. Params: params,
  1064. Body: body,
  1065. ContentType: "application/json",
  1066. IgnoreErrors: ignoreErrors,
  1067. })
  1068. }
  1069. // PerformRequestWithContentType executes a HTTP request with a specific content type.
  1070. // It returns a response (which might be nil) and an error on failure.
  1071. //
  1072. // Optionally, a list of HTTP error codes to ignore can be passed.
  1073. // This is necessary for services that expect e.g. HTTP status 404 as a
  1074. // valid outcome (Exists, IndicesExists, IndicesTypeExists).
  1075. func (c *Client) PerformRequestWithContentType(ctx context.Context, method, path string, params url.Values, body interface{}, contentType string, ignoreErrors ...int) (*Response, error) {
  1076. return c.PerformRequestWithOptions(ctx, PerformRequestOptions{
  1077. Method: method,
  1078. Path: path,
  1079. Params: params,
  1080. Body: body,
  1081. ContentType: contentType,
  1082. IgnoreErrors: ignoreErrors,
  1083. })
  1084. }
// PerformRequestWithOptions executes a HTTP request with the specified options.
// It returns a response (which might be nil) and an error on failure.
//
// Every attempt picks a connection via next, builds the request from opt
// and sends it. When no connection is available or the HTTP call fails,
// the retrier (opt.Retrier if set, otherwise the client's) decides whether
// and after which wait time another attempt is made. Context cancellation
// and deadline errors are returned immediately without retrying and
// without marking the node as dead. A response rejected by checkResponse
// is not retried; its error is returned together with whatever response
// could be built.
func (c *Client) PerformRequestWithOptions(ctx context.Context, opt PerformRequestOptions) (*Response, error) {
	start := time.Now().UTC()

	// Read the client settings used below under the read lock.
	c.mu.RLock()
	timeout := c.healthcheckTimeout
	basicAuth := c.basicAuth
	basicAuthUsername := c.basicAuthUsername
	basicAuthPassword := c.basicAuthPassword
	sendGetBodyAs := c.sendGetBodyAs
	gzipEnabled := c.gzipEnabled
	retrier := c.retrier
	if opt.Retrier != nil {
		// A per-request retrier takes precedence over the client's.
		retrier = opt.Retrier
	}
	c.mu.RUnlock()

	var err error
	var conn *conn
	var req *Request
	var resp *Response
	var retried bool // true once at least one retry has been scheduled
	var n int        // number of failed attempts, passed to the retrier

	// Change method if sendGetBodyAs is specified.
	if opt.Method == "GET" && opt.Body != nil && sendGetBodyAs != "GET" {
		opt.Method = sendGetBodyAs
	}

	for {
		pathWithParams := opt.Path
		if len(opt.Params) > 0 {
			pathWithParams += "?" + opt.Params.Encode()
		}

		// Get a connection
		conn, err = c.next()
		if errors.Cause(err) == ErrNoClient {
			n++
			if !retried {
				// Run a healthcheck as all connections seem to be dead.
				// NOTE(review): force is passed as false, so this is a
				// no-op when health checks are disabled.
				c.healthcheck(timeout, false)
			}
			wait, ok, rerr := retrier.Retry(ctx, n, nil, nil, err)
			if rerr != nil {
				return nil, rerr
			}
			if !ok {
				return nil, err
			}
			retried = true
			time.Sleep(wait)
			continue // try again
		}
		if err != nil {
			c.errorf("elastic: cannot get connection from pool")
			return nil, err
		}

		req, err = NewRequest(opt.Method, conn.URL()+pathWithParams)
		if err != nil {
			c.errorf("elastic: cannot create request for %s %s: %v", strings.ToUpper(opt.Method), conn.URL()+pathWithParams, err)
			return nil, err
		}

		if basicAuth {
			req.SetBasicAuth(basicAuthUsername, basicAuthPassword)
		}
		if opt.ContentType != "" {
			req.Header.Set("Content-Type", opt.ContentType)
		}

		// Set body
		if opt.Body != nil {
			err = req.SetBody(opt.Body, gzipEnabled)
			if err != nil {
				c.errorf("elastic: couldn't set body %+v for request: %v", opt.Body, err)
				return nil, err
			}
		}

		// Tracing
		c.dumpRequest((*http.Request)(req))

		// Get response
		res, err := c.c.Do((*http.Request)(req).WithContext(ctx))
		if err == context.Canceled || err == context.DeadlineExceeded {
			// Proceed, but don't mark the node as dead
			return nil, err
		}
		if ue, ok := err.(*url.Error); ok {
			// This happens e.g. on redirect errors, see https://golang.org/src/net/http/client_test.go#L329
			if ue.Err == context.Canceled || ue.Err == context.DeadlineExceeded {
				// Proceed, but don't mark the node as dead
				return nil, err
			}
		}
		if err != nil {
			n++
			wait, ok, rerr := retrier.Retry(ctx, n, (*http.Request)(req), res, err)
			// The node is only marked as dead when we stop retrying.
			if rerr != nil {
				c.errorf("elastic: %s is dead", conn.URL())
				conn.MarkAsDead()
				return nil, rerr
			}
			if !ok {
				c.errorf("elastic: %s is dead", conn.URL())
				conn.MarkAsDead()
				return nil, err
			}
			retried = true
			time.Sleep(wait)
			continue // try again
		}
		// Every path below either returns or breaks out of the loop, so at
		// most one body is deferred per call.
		if res.Body != nil {
			defer res.Body.Close()
		}

		// Tracing
		c.dumpResponse(res)

		// Check for errors
		if err := checkResponse((*http.Request)(req), res, opt.IgnoreErrors...); err != nil {
			// No retry if request succeeded
			// We still try to return a response.
			resp, _ = c.newResponse(res)
			return resp, err
		}

		// We successfully made a request with this connection
		conn.MarkAsHealthy()

		resp, err = c.newResponse(res)
		if err != nil {
			return nil, err
		}

		break
	}

	// Log method, URL, status and the elapsed time in seconds with
	// millisecond precision.
	duration := time.Now().UTC().Sub(start)
	c.infof("%s %s [status:%d, request:%.3fs]",
		strings.ToUpper(opt.Method),
		req.URL,
		resp.StatusCode,
		float64(int64(duration/time.Millisecond))/1000)

	return resp, nil
}
  1218. // -- Document APIs --
  1219. // Index a document.
  1220. func (c *Client) Index() *IndexService {
  1221. return NewIndexService(c)
  1222. }
  1223. // Get a document.
  1224. func (c *Client) Get() *GetService {
  1225. return NewGetService(c)
  1226. }
  1227. // MultiGet retrieves multiple documents in one roundtrip.
  1228. func (c *Client) MultiGet() *MgetService {
  1229. return NewMgetService(c)
  1230. }
  1231. // Mget retrieves multiple documents in one roundtrip.
  1232. func (c *Client) Mget() *MgetService {
  1233. return NewMgetService(c)
  1234. }
  1235. // Delete a document.
  1236. func (c *Client) Delete() *DeleteService {
  1237. return NewDeleteService(c)
  1238. }
  1239. // DeleteByQuery deletes documents as found by a query.
  1240. func (c *Client) DeleteByQuery(indices ...string) *DeleteByQueryService {
  1241. return NewDeleteByQueryService(c).Index(indices...)
  1242. }
  1243. // Update a document.
  1244. func (c *Client) Update() *UpdateService {
  1245. return NewUpdateService(c)
  1246. }
  1247. // UpdateByQuery performs an update on a set of documents.
  1248. func (c *Client) UpdateByQuery(indices ...string) *UpdateByQueryService {
  1249. return NewUpdateByQueryService(c).Index(indices...)
  1250. }
  1251. // Bulk is the entry point to mass insert/update/delete documents.
  1252. func (c *Client) Bulk() *BulkService {
  1253. return NewBulkService(c)
  1254. }
  1255. // BulkProcessor allows setting up a concurrent processor of bulk requests.
  1256. func (c *Client) BulkProcessor() *BulkProcessorService {
  1257. return NewBulkProcessorService(c)
  1258. }
  1259. // Reindex copies data from a source index into a destination index.
  1260. //
  1261. // See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-reindex.html
  1262. // for details on the Reindex API.
  1263. func (c *Client) Reindex() *ReindexService {
  1264. return NewReindexService(c)
  1265. }
  1266. // TermVectors returns information and statistics on terms in the fields
  1267. // of a particular document.
  1268. func (c *Client) TermVectors(index, typ string) *TermvectorsService {
  1269. builder := NewTermvectorsService(c)
  1270. builder = builder.Index(index).Type(typ)
  1271. return builder
  1272. }
  1273. // MultiTermVectors returns information and statistics on terms in the fields
  1274. // of multiple documents.
  1275. func (c *Client) MultiTermVectors() *MultiTermvectorService {
  1276. return NewMultiTermvectorService(c)
  1277. }
  1278. // -- Search APIs --
  1279. // Search is the entry point for searches.
  1280. func (c *Client) Search(indices ...string) *SearchService {
  1281. return NewSearchService(c).Index(indices...)
  1282. }
  1283. // Suggest returns a service to return suggestions.
  1284. func (c *Client) Suggest(indices ...string) *SuggestService {
  1285. return NewSuggestService(c).Index(indices...)
  1286. }
  1287. // MultiSearch is the entry point for multi searches.
  1288. func (c *Client) MultiSearch() *MultiSearchService {
  1289. return NewMultiSearchService(c)
  1290. }
  1291. // Count documents.
  1292. func (c *Client) Count(indices ...string) *CountService {
  1293. return NewCountService(c).Index(indices...)
  1294. }
  1295. // Explain computes a score explanation for a query and a specific document.
  1296. func (c *Client) Explain(index, typ, id string) *ExplainService {
  1297. return NewExplainService(c).Index(index).Type(typ).Id(id)
  1298. }
  1299. // TODO Search Template
  1300. // TODO Search Exists API
  1301. // Validate allows a user to validate a potentially expensive query without executing it.
  1302. func (c *Client) Validate(indices ...string) *ValidateService {
  1303. return NewValidateService(c).Index(indices...)
  1304. }
  1305. // SearchShards returns statistical information about nodes and shards.
  1306. func (c *Client) SearchShards(indices ...string) *SearchShardsService {
  1307. return NewSearchShardsService(c).Index(indices...)
  1308. }
  1309. // FieldCaps returns statistical information about fields in indices.
  1310. func (c *Client) FieldCaps(indices ...string) *FieldCapsService {
  1311. return NewFieldCapsService(c).Index(indices...)
  1312. }
  1313. // FieldStats returns statistical information about fields in indices.
  1314. func (c *Client) FieldStats(indices ...string) *FieldStatsService {
  1315. return NewFieldStatsService(c).Index(indices...)
  1316. }
  1317. // Exists checks if a document exists.
  1318. func (c *Client) Exists() *ExistsService {
  1319. return NewExistsService(c)
  1320. }
  1321. // Scroll through documents. Use this to efficiently scroll through results
  1322. // while returning the results to a client.
  1323. func (c *Client) Scroll(indices ...string) *ScrollService {
  1324. return NewScrollService(c).Index(indices...)
  1325. }
  1326. // ClearScroll can be used to clear search contexts manually.
  1327. func (c *Client) ClearScroll(scrollIds ...string) *ClearScrollService {
  1328. return NewClearScrollService(c).ScrollId(scrollIds...)
  1329. }
  1330. // -- Indices APIs --
  1331. // CreateIndex returns a service to create a new index.
  1332. func (c *Client) CreateIndex(name string) *IndicesCreateService {
  1333. return NewIndicesCreateService(c).Index(name)
  1334. }
  1335. // DeleteIndex returns a service to delete an index.
  1336. func (c *Client) DeleteIndex(indices ...string) *IndicesDeleteService {
  1337. return NewIndicesDeleteService(c).Index(indices)
  1338. }
  1339. // IndexExists allows to check if an index exists.
  1340. func (c *Client) IndexExists(indices ...string) *IndicesExistsService {
  1341. return NewIndicesExistsService(c).Index(indices)
  1342. }
  1343. // ShrinkIndex returns a service to shrink one index into another.
  1344. func (c *Client) ShrinkIndex(source, target string) *IndicesShrinkService {
  1345. return NewIndicesShrinkService(c).Source(source).Target(target)
  1346. }
  1347. // RolloverIndex rolls an alias over to a new index when the existing index
  1348. // is considered to be too large or too old.
  1349. func (c *Client) RolloverIndex(alias string) *IndicesRolloverService {
  1350. return NewIndicesRolloverService(c).Alias(alias)
  1351. }
  1352. // TypeExists allows to check if one or more types exist in one or more indices.
  1353. func (c *Client) TypeExists() *IndicesExistsTypeService {
  1354. return NewIndicesExistsTypeService(c)
  1355. }
  1356. // IndexStats provides statistics on different operations happining
  1357. // in one or more indices.
  1358. func (c *Client) IndexStats(indices ...string) *IndicesStatsService {
  1359. return NewIndicesStatsService(c).Index(indices...)
  1360. }
  1361. // OpenIndex opens an index.
  1362. func (c *Client) OpenIndex(name string) *IndicesOpenService {
  1363. return NewIndicesOpenService(c).Index(name)
  1364. }
  1365. // CloseIndex closes an index.
  1366. func (c *Client) CloseIndex(name string) *IndicesCloseService {
  1367. return NewIndicesCloseService(c).Index(name)
  1368. }
  1369. // IndexGet retrieves information about one or more indices.
  1370. // IndexGet is only available for Elasticsearch 1.4 or later.
  1371. func (c *Client) IndexGet(indices ...string) *IndicesGetService {
  1372. return NewIndicesGetService(c).Index(indices...)
  1373. }
  1374. // IndexGetSettings retrieves settings of all, one or more indices.
  1375. func (c *Client) IndexGetSettings(indices ...string) *IndicesGetSettingsService {
  1376. return NewIndicesGetSettingsService(c).Index(indices...)
  1377. }
  1378. // IndexPutSettings sets settings for all, one or more indices.
  1379. func (c *Client) IndexPutSettings(indices ...string) *IndicesPutSettingsService {
  1380. return NewIndicesPutSettingsService(c).Index(indices...)
  1381. }
  1382. // IndexSegments retrieves low level segment information for all, one or more indices.
  1383. func (c *Client) IndexSegments(indices ...string) *IndicesSegmentsService {
  1384. return NewIndicesSegmentsService(c).Index(indices...)
  1385. }
  1386. // IndexAnalyze performs the analysis process on a text and returns the
  1387. // token breakdown of the text.
  1388. func (c *Client) IndexAnalyze() *IndicesAnalyzeService {
  1389. return NewIndicesAnalyzeService(c)
  1390. }
  1391. // Forcemerge optimizes one or more indices.
  1392. // It replaces the deprecated Optimize API.
  1393. func (c *Client) Forcemerge(indices ...string) *IndicesForcemergeService {
  1394. return NewIndicesForcemergeService(c).Index(indices...)
  1395. }
  1396. // Refresh asks Elasticsearch to refresh one or more indices.
  1397. func (c *Client) Refresh(indices ...string) *RefreshService {
  1398. return NewRefreshService(c).Index(indices...)
  1399. }
  1400. // Flush asks Elasticsearch to free memory from the index and
  1401. // flush data to disk.
  1402. func (c *Client) Flush(indices ...string) *IndicesFlushService {
  1403. return NewIndicesFlushService(c).Index(indices...)
  1404. }
  1405. // Alias enables the caller to add and/or remove aliases.
  1406. func (c *Client) Alias() *AliasService {
  1407. return NewAliasService(c)
  1408. }
  1409. // Aliases returns aliases by index name(s).
  1410. func (c *Client) Aliases() *AliasesService {
  1411. return NewAliasesService(c)
  1412. }
  1413. // GetTemplate gets a search template.
  1414. // Use IndexXXXTemplate funcs to manage index templates.
  1415. func (c *Client) GetTemplate() *GetTemplateService {
  1416. return NewGetTemplateService(c)
  1417. }
  1418. // PutTemplate creates or updates a search template.
  1419. // Use IndexXXXTemplate funcs to manage index templates.
  1420. func (c *Client) PutTemplate() *PutTemplateService {
  1421. return NewPutTemplateService(c)
  1422. }
  1423. // DeleteTemplate deletes a search template.
  1424. // Use IndexXXXTemplate funcs to manage index templates.
  1425. func (c *Client) DeleteTemplate() *DeleteTemplateService {
  1426. return NewDeleteTemplateService(c)
  1427. }
  1428. // IndexGetTemplate gets an index template.
  1429. // Use XXXTemplate funcs to manage search templates.
  1430. func (c *Client) IndexGetTemplate(names ...string) *IndicesGetTemplateService {
  1431. return NewIndicesGetTemplateService(c).Name(names...)
  1432. }
  1433. // IndexTemplateExists gets check if an index template exists.
  1434. // Use XXXTemplate funcs to manage search templates.
  1435. func (c *Client) IndexTemplateExists(name string) *IndicesExistsTemplateService {
  1436. return NewIndicesExistsTemplateService(c).Name(name)
  1437. }
  1438. // IndexPutTemplate creates or updates an index template.
  1439. // Use XXXTemplate funcs to manage search templates.
  1440. func (c *Client) IndexPutTemplate(name string) *IndicesPutTemplateService {
  1441. return NewIndicesPutTemplateService(c).Name(name)
  1442. }
  1443. // IndexDeleteTemplate deletes an index template.
  1444. // Use XXXTemplate funcs to manage search templates.
  1445. func (c *Client) IndexDeleteTemplate(name string) *IndicesDeleteTemplateService {
  1446. return NewIndicesDeleteTemplateService(c).Name(name)
  1447. }
  1448. // GetMapping gets a mapping.
  1449. func (c *Client) GetMapping() *IndicesGetMappingService {
  1450. return NewIndicesGetMappingService(c)
  1451. }
  1452. // PutMapping registers a mapping.
  1453. func (c *Client) PutMapping() *IndicesPutMappingService {
  1454. return NewIndicesPutMappingService(c)
  1455. }
  1456. // GetFieldMapping gets mapping for fields.
  1457. func (c *Client) GetFieldMapping() *IndicesGetFieldMappingService {
  1458. return NewIndicesGetFieldMappingService(c)
  1459. }
  1460. // -- cat APIs --
  1461. // TODO cat aliases
  1462. // TODO cat allocation
  1463. // TODO cat count
  1464. // TODO cat fielddata
  1465. // TODO cat health
  1466. // TODO cat indices
  1467. // TODO cat master
  1468. // TODO cat nodes
  1469. // TODO cat pending tasks
  1470. // TODO cat plugins
  1471. // TODO cat recovery
  1472. // TODO cat thread pool
  1473. // TODO cat shards
  1474. // TODO cat segments
  1475. // -- Ingest APIs --
  1476. // IngestPutPipeline adds pipelines and updates existing pipelines in
  1477. // the cluster.
  1478. func (c *Client) IngestPutPipeline(id string) *IngestPutPipelineService {
  1479. return NewIngestPutPipelineService(c).Id(id)
  1480. }
  1481. // IngestGetPipeline returns pipelines based on ID.
  1482. func (c *Client) IngestGetPipeline(ids ...string) *IngestGetPipelineService {
  1483. return NewIngestGetPipelineService(c).Id(ids...)
  1484. }
  1485. // IngestDeletePipeline deletes a pipeline by ID.
  1486. func (c *Client) IngestDeletePipeline(id string) *IngestDeletePipelineService {
  1487. return NewIngestDeletePipelineService(c).Id(id)
  1488. }
  1489. // IngestSimulatePipeline executes a specific pipeline against the set of
  1490. // documents provided in the body of the request.
  1491. func (c *Client) IngestSimulatePipeline() *IngestSimulatePipelineService {
  1492. return NewIngestSimulatePipelineService(c)
  1493. }
  1494. // -- Cluster APIs --
  1495. // ClusterHealth retrieves the health of the cluster.
  1496. func (c *Client) ClusterHealth() *ClusterHealthService {
  1497. return NewClusterHealthService(c)
  1498. }
  1499. // ClusterState retrieves the state of the cluster.
  1500. func (c *Client) ClusterState() *ClusterStateService {
  1501. return NewClusterStateService(c)
  1502. }
  1503. // ClusterStats retrieves cluster statistics.
  1504. func (c *Client) ClusterStats() *ClusterStatsService {
  1505. return NewClusterStatsService(c)
  1506. }
  1507. // NodesInfo retrieves one or more or all of the cluster nodes information.
  1508. func (c *Client) NodesInfo() *NodesInfoService {
  1509. return NewNodesInfoService(c)
  1510. }
  1511. // NodesStats retrieves one or more or all of the cluster nodes statistics.
  1512. func (c *Client) NodesStats() *NodesStatsService {
  1513. return NewNodesStatsService(c)
  1514. }
  1515. // TasksCancel cancels tasks running on the specified nodes.
  1516. func (c *Client) TasksCancel() *TasksCancelService {
  1517. return NewTasksCancelService(c)
  1518. }
  1519. // TasksList retrieves the list of tasks running on the specified nodes.
  1520. func (c *Client) TasksList() *TasksListService {
  1521. return NewTasksListService(c)
  1522. }
  1523. // TasksGetTask retrieves a task running on the cluster.
  1524. func (c *Client) TasksGetTask() *TasksGetTaskService {
  1525. return NewTasksGetTaskService(c)
  1526. }
  1527. // TODO Pending cluster tasks
  1528. // TODO Cluster Reroute
  1529. // TODO Cluster Update Settings
// TODO Nodes Stats (NOTE(review): a NodesStats method is already defined above; confirm whether this entry is stale)
  1531. // TODO Nodes hot_threads
  1532. // -- Snapshot and Restore --
  1533. // TODO Snapshot Delete
  1534. // TODO Snapshot Get
  1535. // TODO Snapshot Restore
  1536. // TODO Snapshot Status
  1537. // SnapshotCreate creates a snapshot.
  1538. func (c *Client) SnapshotCreate(repository string, snapshot string) *SnapshotCreateService {
  1539. return NewSnapshotCreateService(c).Repository(repository).Snapshot(snapshot)
  1540. }
  1541. // SnapshotCreateRepository creates or updates a snapshot repository.
  1542. func (c *Client) SnapshotCreateRepository(repository string) *SnapshotCreateRepositoryService {
  1543. return NewSnapshotCreateRepositoryService(c).Repository(repository)
  1544. }
  1545. // SnapshotDeleteRepository deletes a snapshot repository.
  1546. func (c *Client) SnapshotDeleteRepository(repositories ...string) *SnapshotDeleteRepositoryService {
  1547. return NewSnapshotDeleteRepositoryService(c).Repository(repositories...)
  1548. }
  1549. // SnapshotGetRepository gets a snapshot repository.
  1550. func (c *Client) SnapshotGetRepository(repositories ...string) *SnapshotGetRepositoryService {
  1551. return NewSnapshotGetRepositoryService(c).Repository(repositories...)
  1552. }
  1553. // SnapshotVerifyRepository verifies a snapshot repository.
  1554. func (c *Client) SnapshotVerifyRepository(repository string) *SnapshotVerifyRepositoryService {
  1555. return NewSnapshotVerifyRepositoryService(c).Repository(repository)
  1556. }
  1557. // -- Helpers and shortcuts --
  1558. // ElasticsearchVersion returns the version number of Elasticsearch
  1559. // running on the given URL.
  1560. func (c *Client) ElasticsearchVersion(url string) (string, error) {
  1561. res, _, err := c.Ping(url).Do(context.Background())
  1562. if err != nil {
  1563. return "", err
  1564. }
  1565. return res.Version.Number, nil
  1566. }
  1567. // IndexNames returns the names of all indices in the cluster.
  1568. func (c *Client) IndexNames() ([]string, error) {
  1569. res, err := c.IndexGetSettings().Index("_all").Do(context.Background())
  1570. if err != nil {
  1571. return nil, err
  1572. }
  1573. var names []string
  1574. for name := range res {
  1575. names = append(names, name)
  1576. }
  1577. return names, nil
  1578. }
  1579. // Ping checks if a given node in a cluster exists and (optionally)
  1580. // returns some basic information about the Elasticsearch server,
  1581. // e.g. the Elasticsearch version number.
  1582. //
  1583. // Notice that you need to specify a URL here explicitly.
  1584. func (c *Client) Ping(url string) *PingService {
  1585. return NewPingService(c).URL(url)
  1586. }
  1587. // WaitForStatus waits for the cluster to have the given status.
  1588. // This is a shortcut method for the ClusterHealth service.
  1589. //
  1590. // WaitForStatus waits for the specified timeout, e.g. "10s".
  1591. // If the cluster will have the given state within the timeout, nil is returned.
  1592. // If the request timed out, ErrTimeout is returned.
  1593. func (c *Client) WaitForStatus(status string, timeout string) error {
  1594. health, err := c.ClusterHealth().WaitForStatus(status).Timeout(timeout).Do(context.Background())
  1595. if err != nil {
  1596. return err
  1597. }
  1598. if health.TimedOut {
  1599. return ErrTimeout
  1600. }
  1601. return nil
  1602. }
  1603. // WaitForGreenStatus waits for the cluster to have the "green" status.
  1604. // See WaitForStatus for more details.
  1605. func (c *Client) WaitForGreenStatus(timeout string) error {
  1606. return c.WaitForStatus("green", timeout)
  1607. }
  1608. // WaitForYellowStatus waits for the cluster to have the "yellow" status.
  1609. // See WaitForStatus for more details.
  1610. func (c *Client) WaitForYellowStatus(timeout string) error {
  1611. return c.WaitForStatus("yellow", timeout)
  1612. }