client_test.go 35 KB


  1. // Copyright 2012-present Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package elastic
  5. import (
  6. "bytes"
  7. "context"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "log"
  12. "net"
  13. "net/http"
  14. "reflect"
  15. "regexp"
  16. "strings"
  17. "sync"
  18. "testing"
  19. "time"
  20. "github.com/fortytw2/leaktest"
  21. )
  22. func findConn(s string, slice ...*conn) (int, bool) {
  23. for i, t := range slice {
  24. if s == t.URL() {
  25. return i, true
  26. }
  27. }
  28. return -1, false
  29. }
  30. // -- NewClient --
  31. func TestClientDefaults(t *testing.T) {
  32. client, err := NewClient()
  33. if err != nil {
  34. t.Fatal(err)
  35. }
  36. if client.healthcheckEnabled != true {
  37. t.Errorf("expected health checks to be enabled, got: %v", client.healthcheckEnabled)
  38. }
  39. if client.healthcheckTimeoutStartup != DefaultHealthcheckTimeoutStartup {
  40. t.Errorf("expected health checks timeout on startup = %v, got: %v", DefaultHealthcheckTimeoutStartup, client.healthcheckTimeoutStartup)
  41. }
  42. if client.healthcheckTimeout != DefaultHealthcheckTimeout {
  43. t.Errorf("expected health checks timeout = %v, got: %v", DefaultHealthcheckTimeout, client.healthcheckTimeout)
  44. }
  45. if client.healthcheckInterval != DefaultHealthcheckInterval {
  46. t.Errorf("expected health checks interval = %v, got: %v", DefaultHealthcheckInterval, client.healthcheckInterval)
  47. }
  48. if client.snifferEnabled != true {
  49. t.Errorf("expected sniffing to be enabled, got: %v", client.snifferEnabled)
  50. }
  51. if client.snifferTimeoutStartup != DefaultSnifferTimeoutStartup {
  52. t.Errorf("expected sniffer timeout on startup = %v, got: %v", DefaultSnifferTimeoutStartup, client.snifferTimeoutStartup)
  53. }
  54. if client.snifferTimeout != DefaultSnifferTimeout {
  55. t.Errorf("expected sniffer timeout = %v, got: %v", DefaultSnifferTimeout, client.snifferTimeout)
  56. }
  57. if client.snifferInterval != DefaultSnifferInterval {
  58. t.Errorf("expected sniffer interval = %v, got: %v", DefaultSnifferInterval, client.snifferInterval)
  59. }
  60. if client.basicAuth != false {
  61. t.Errorf("expected no basic auth; got: %v", client.basicAuth)
  62. }
  63. if client.basicAuthUsername != "" {
  64. t.Errorf("expected no basic auth username; got: %q", client.basicAuthUsername)
  65. }
  66. if client.basicAuthPassword != "" {
  67. t.Errorf("expected no basic auth password; got: %q", client.basicAuthUsername)
  68. }
  69. if client.sendGetBodyAs != "GET" {
  70. t.Errorf("expected sendGetBodyAs to be GET; got: %q", client.sendGetBodyAs)
  71. }
  72. }
  73. func TestClientWithoutURL(t *testing.T) {
  74. client, err := NewClient()
  75. if err != nil {
  76. t.Fatal(err)
  77. }
  78. // Two things should happen here:
  79. // 1. The client starts sniffing the cluster on DefaultURL
  80. // 2. The sniffing process should find (at least) one node in the cluster, i.e. the DefaultURL
  81. if len(client.conns) == 0 {
  82. t.Fatalf("expected at least 1 node in the cluster, got: %d (%v)", len(client.conns), client.conns)
  83. }
  84. if !isTravis() {
  85. if _, found := findConn(DefaultURL, client.conns...); !found {
  86. t.Errorf("expected to find node with default URL of %s in %v", DefaultURL, client.conns)
  87. }
  88. }
  89. }
  90. func TestClientWithSingleURL(t *testing.T) {
  91. client, err := NewClient(SetURL("http://127.0.0.1:9200"))
  92. if err != nil {
  93. t.Fatal(err)
  94. }
  95. // Two things should happen here:
  96. // 1. The client starts sniffing the cluster on DefaultURL
  97. // 2. The sniffing process should find (at least) one node in the cluster, i.e. the DefaultURL
  98. if len(client.conns) == 0 {
  99. t.Fatalf("expected at least 1 node in the cluster, got: %d (%v)", len(client.conns), client.conns)
  100. }
  101. if !isTravis() {
  102. if _, found := findConn(DefaultURL, client.conns...); !found {
  103. t.Errorf("expected to find node with default URL of %s in %v", DefaultURL, client.conns)
  104. }
  105. }
  106. }
  107. func TestClientWithMultipleURLs(t *testing.T) {
  108. client, err := NewClient(SetURL("http://127.0.0.1:9200", "http://127.0.0.1:9201"))
  109. if err != nil {
  110. t.Fatal(err)
  111. }
  112. // The client should sniff both URLs, but only 127.0.0.1:9200 should return nodes.
  113. if len(client.conns) != 1 {
  114. t.Fatalf("expected exactly 1 node in the local cluster, got: %d (%v)", len(client.conns), client.conns)
  115. }
  116. if !isTravis() {
  117. if client.conns[0].URL() != DefaultURL {
  118. t.Errorf("expected to find node with default URL of %s in %v", DefaultURL, client.conns)
  119. }
  120. }
  121. }
  122. func TestClientWithBasicAuth(t *testing.T) {
  123. client, err := NewClient(SetBasicAuth("user", "secret"))
  124. if err != nil {
  125. t.Fatal(err)
  126. }
  127. if client.basicAuth != true {
  128. t.Errorf("expected basic auth; got: %v", client.basicAuth)
  129. }
  130. if got, want := client.basicAuthUsername, "user"; got != want {
  131. t.Errorf("expected basic auth username %q; got: %q", want, got)
  132. }
  133. if got, want := client.basicAuthPassword, "secret"; got != want {
  134. t.Errorf("expected basic auth password %q; got: %q", want, got)
  135. }
  136. }
  137. func TestClientWithBasicAuthInUserInfo(t *testing.T) {
  138. client, err := NewClient(SetURL("http://user1:secret1@localhost:9200", "http://user2:secret2@localhost:9200"))
  139. if err != nil {
  140. t.Fatal(err)
  141. }
  142. if client.basicAuth != true {
  143. t.Errorf("expected basic auth; got: %v", client.basicAuth)
  144. }
  145. if got, want := client.basicAuthUsername, "user1"; got != want {
  146. t.Errorf("expected basic auth username %q; got: %q", want, got)
  147. }
  148. if got, want := client.basicAuthPassword, "secret1"; got != want {
  149. t.Errorf("expected basic auth password %q; got: %q", want, got)
  150. }
  151. }
  152. func TestClientSniffSuccess(t *testing.T) {
  153. client, err := NewClient(SetURL("http://127.0.0.1:19200", "http://127.0.0.1:9200"))
  154. if err != nil {
  155. t.Fatal(err)
  156. }
  157. // The client should sniff both URLs, but only 127.0.0.1:9200 should return nodes.
  158. if len(client.conns) != 1 {
  159. t.Fatalf("expected exactly 1 node in the local cluster, got: %d (%v)", len(client.conns), client.conns)
  160. }
  161. }
  162. func TestClientSniffFailure(t *testing.T) {
  163. _, err := NewClient(SetURL("http://127.0.0.1:19200", "http://127.0.0.1:19201"))
  164. if err == nil {
  165. t.Fatalf("expected cluster to fail with no nodes found")
  166. }
  167. }
  168. func TestClientSnifferCallback(t *testing.T) {
  169. var calls int
  170. cb := func(node *NodesInfoNode) bool {
  171. calls++
  172. return false
  173. }
  174. _, err := NewClient(
  175. SetURL("http://127.0.0.1:19200", "http://127.0.0.1:9200"),
  176. SetSnifferCallback(cb))
  177. if err == nil {
  178. t.Fatalf("expected cluster to fail with no nodes found")
  179. }
  180. if calls != 1 {
  181. t.Fatalf("expected 1 call to the sniffer callback, got %d", calls)
  182. }
  183. }
  184. func TestClientSniffDisabled(t *testing.T) {
  185. client, err := NewClient(SetSniff(false), SetURL("http://127.0.0.1:9200", "http://127.0.0.1:9201"))
  186. if err != nil {
  187. t.Fatal(err)
  188. }
  189. // The client should not sniff, so it should have two connections.
  190. if len(client.conns) != 2 {
  191. t.Fatalf("expected 2 nodes, got: %d (%v)", len(client.conns), client.conns)
  192. }
  193. // Make two requests, so that both connections are being used
  194. for i := 0; i < len(client.conns); i++ {
  195. client.Flush().Do(context.TODO())
  196. }
  197. // The first connection (127.0.0.1:9200) should now be okay.
  198. if i, found := findConn("http://127.0.0.1:9200", client.conns...); !found {
  199. t.Fatalf("expected connection to %q to be found", "http://127.0.0.1:9200")
  200. } else {
  201. if conn := client.conns[i]; conn.IsDead() {
  202. t.Fatal("expected connection to be alive, but it is dead")
  203. }
  204. }
  205. // The second connection (127.0.0.1:9201) should now be marked as dead.
  206. if i, found := findConn("http://127.0.0.1:9201", client.conns...); !found {
  207. t.Fatalf("expected connection to %q to be found", "http://127.0.0.1:9201")
  208. } else {
  209. if conn := client.conns[i]; !conn.IsDead() {
  210. t.Fatal("expected connection to be dead, but it is alive")
  211. }
  212. }
  213. }
  214. func TestClientWillMarkConnectionsAsAliveWhenAllAreDead(t *testing.T) {
  215. client, err := NewClient(SetURL("http://127.0.0.1:9201"),
  216. SetSniff(false), SetHealthcheck(false), SetMaxRetries(0))
  217. if err != nil {
  218. t.Fatal(err)
  219. }
  220. // We should have a connection.
  221. if len(client.conns) != 1 {
  222. t.Fatalf("expected 1 node, got: %d (%v)", len(client.conns), client.conns)
  223. }
  224. // Make a request, so that the connections is marked as dead.
  225. client.Flush().Do(context.TODO())
  226. // The connection should now be marked as dead.
  227. if i, found := findConn("http://127.0.0.1:9201", client.conns...); !found {
  228. t.Fatalf("expected connection to %q to be found", "http://127.0.0.1:9201")
  229. } else {
  230. if conn := client.conns[i]; !conn.IsDead() {
  231. t.Fatalf("expected connection to be dead, got: %v", conn)
  232. }
  233. }
  234. // Now send another request and the connection should be marked as alive again.
  235. client.Flush().Do(context.TODO())
  236. if i, found := findConn("http://127.0.0.1:9201", client.conns...); !found {
  237. t.Fatalf("expected connection to %q to be found", "http://127.0.0.1:9201")
  238. } else {
  239. if conn := client.conns[i]; conn.IsDead() {
  240. t.Fatalf("expected connection to be alive, got: %v", conn)
  241. }
  242. }
  243. }
  244. func TestClientWithRequiredPlugins(t *testing.T) {
  245. _, err := NewClient(SetRequiredPlugins("no-such-plugin"))
  246. if err == nil {
  247. t.Fatal("expected error when creating client")
  248. }
  249. if got, want := err.Error(), "elastic: plugin no-such-plugin not found"; got != want {
  250. t.Fatalf("expected error %q; got: %q", want, got)
  251. }
  252. }
  253. func TestClientHealthcheckStartupTimeout(t *testing.T) {
  254. start := time.Now()
  255. _, err := NewClient(SetURL("http://localhost:9299"), SetHealthcheckTimeoutStartup(5*time.Second))
  256. duration := time.Since(start)
  257. if !IsConnErr(err) {
  258. t.Fatal(err)
  259. }
  260. if !strings.Contains(err.Error(), "connection refused") {
  261. t.Fatalf("expected error to contain %q, have %q", "connection refused", err.Error())
  262. }
  263. if duration < 5*time.Second {
  264. t.Fatalf("expected a timeout in more than 5 seconds; got: %v", duration)
  265. }
  266. }
  267. func TestClientHealthcheckTimeoutLeak(t *testing.T) {
  268. // This test test checks if healthcheck requests are canceled
  269. // after timeout.
  270. // It contains couple of hacks which won't be needed once we
  271. // stop supporting Go1.7.
  272. // On Go1.7 it uses server side effects to monitor if connection
  273. // was closed,
  274. // and on Go 1.8+ we're additionally honestly monitoring routine
  275. // leaks via leaktest.
  276. mux := http.NewServeMux()
  277. var reqDoneMu sync.Mutex
  278. var reqDone bool
  279. mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
  280. cn, ok := w.(http.CloseNotifier)
  281. if !ok {
  282. t.Fatalf("Writer is not CloseNotifier, but %v", reflect.TypeOf(w).Name())
  283. }
  284. <-cn.CloseNotify()
  285. reqDoneMu.Lock()
  286. reqDone = true
  287. reqDoneMu.Unlock()
  288. })
  289. lis, err := net.Listen("tcp", "127.0.0.1:0")
  290. if err != nil {
  291. t.Fatalf("Couldn't setup listener: %v", err)
  292. }
  293. addr := lis.Addr().String()
  294. srv := &http.Server{
  295. Handler: mux,
  296. }
  297. go srv.Serve(lis)
  298. cli := &Client{
  299. c: &http.Client{},
  300. conns: []*conn{
  301. &conn{
  302. url: "http://" + addr + "/",
  303. },
  304. },
  305. }
  306. type closer interface {
  307. Shutdown(context.Context) error
  308. }
  309. // pre-Go1.8 Server can't Shutdown
  310. cl, isServerCloseable := (interface{}(srv)).(closer)
  311. // Since Go1.7 can't Shutdown() - there will be leak from server
  312. // Monitor leaks on Go 1.8+
  313. if isServerCloseable {
  314. defer leaktest.CheckTimeout(t, time.Second*10)()
  315. }
  316. cli.healthcheck(time.Millisecond*500, true)
  317. if isServerCloseable {
  318. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  319. defer cancel()
  320. cl.Shutdown(ctx)
  321. }
  322. <-time.After(time.Second)
  323. reqDoneMu.Lock()
  324. if !reqDone {
  325. reqDoneMu.Unlock()
  326. t.Fatal("Request wasn't canceled or stopped")
  327. }
  328. reqDoneMu.Unlock()
  329. }
  330. // -- NewSimpleClient --
  331. func TestSimpleClientDefaults(t *testing.T) {
  332. client, err := NewSimpleClient()
  333. if err != nil {
  334. t.Fatal(err)
  335. }
  336. if client.healthcheckEnabled != false {
  337. t.Errorf("expected health checks to be disabled, got: %v", client.healthcheckEnabled)
  338. }
  339. if client.healthcheckTimeoutStartup != off {
  340. t.Errorf("expected health checks timeout on startup = %v, got: %v", off, client.healthcheckTimeoutStartup)
  341. }
  342. if client.healthcheckTimeout != off {
  343. t.Errorf("expected health checks timeout = %v, got: %v", off, client.healthcheckTimeout)
  344. }
  345. if client.healthcheckInterval != off {
  346. t.Errorf("expected health checks interval = %v, got: %v", off, client.healthcheckInterval)
  347. }
  348. if client.snifferEnabled != false {
  349. t.Errorf("expected sniffing to be disabled, got: %v", client.snifferEnabled)
  350. }
  351. if client.snifferTimeoutStartup != off {
  352. t.Errorf("expected sniffer timeout on startup = %v, got: %v", off, client.snifferTimeoutStartup)
  353. }
  354. if client.snifferTimeout != off {
  355. t.Errorf("expected sniffer timeout = %v, got: %v", off, client.snifferTimeout)
  356. }
  357. if client.snifferInterval != off {
  358. t.Errorf("expected sniffer interval = %v, got: %v", off, client.snifferInterval)
  359. }
  360. if client.basicAuth != false {
  361. t.Errorf("expected no basic auth; got: %v", client.basicAuth)
  362. }
  363. if client.basicAuthUsername != "" {
  364. t.Errorf("expected no basic auth username; got: %q", client.basicAuthUsername)
  365. }
  366. if client.basicAuthPassword != "" {
  367. t.Errorf("expected no basic auth password; got: %q", client.basicAuthUsername)
  368. }
  369. if client.sendGetBodyAs != "GET" {
  370. t.Errorf("expected sendGetBodyAs to be GET; got: %q", client.sendGetBodyAs)
  371. }
  372. }
  373. // -- Start and stop --
  374. func TestClientStartAndStop(t *testing.T) {
  375. client, err := NewClient()
  376. if err != nil {
  377. t.Fatal(err)
  378. }
  379. running := client.IsRunning()
  380. if !running {
  381. t.Fatalf("expected background processes to run; got: %v", running)
  382. }
  383. // Stop
  384. client.Stop()
  385. running = client.IsRunning()
  386. if running {
  387. t.Fatalf("expected background processes to be stopped; got: %v", running)
  388. }
  389. // Stop again => no-op
  390. client.Stop()
  391. running = client.IsRunning()
  392. if running {
  393. t.Fatalf("expected background processes to be stopped; got: %v", running)
  394. }
  395. // Start
  396. client.Start()
  397. running = client.IsRunning()
  398. if !running {
  399. t.Fatalf("expected background processes to run; got: %v", running)
  400. }
  401. // Start again => no-op
  402. client.Start()
  403. running = client.IsRunning()
  404. if !running {
  405. t.Fatalf("expected background processes to run; got: %v", running)
  406. }
  407. }
  408. func TestClientStartAndStopWithSnifferAndHealthchecksDisabled(t *testing.T) {
  409. client, err := NewClient(SetSniff(false), SetHealthcheck(false))
  410. if err != nil {
  411. t.Fatal(err)
  412. }
  413. running := client.IsRunning()
  414. if !running {
  415. t.Fatalf("expected background processes to run; got: %v", running)
  416. }
  417. // Stop
  418. client.Stop()
  419. running = client.IsRunning()
  420. if running {
  421. t.Fatalf("expected background processes to be stopped; got: %v", running)
  422. }
  423. // Stop again => no-op
  424. client.Stop()
  425. running = client.IsRunning()
  426. if running {
  427. t.Fatalf("expected background processes to be stopped; got: %v", running)
  428. }
  429. // Start
  430. client.Start()
  431. running = client.IsRunning()
  432. if !running {
  433. t.Fatalf("expected background processes to run; got: %v", running)
  434. }
  435. // Start again => no-op
  436. client.Start()
  437. running = client.IsRunning()
  438. if !running {
  439. t.Fatalf("expected background processes to run; got: %v", running)
  440. }
  441. }
  442. // -- Sniffing --
  443. func TestClientSniffNode(t *testing.T) {
  444. client, err := NewClient()
  445. if err != nil {
  446. t.Fatal(err)
  447. }
  448. ch := make(chan []*conn)
  449. go func() { ch <- client.sniffNode(context.Background(), DefaultURL) }()
  450. select {
  451. case nodes := <-ch:
  452. if len(nodes) != 1 {
  453. t.Fatalf("expected %d nodes; got: %d", 1, len(nodes))
  454. }
  455. pattern := `http:\/\/[\d\.]+:9200`
  456. matched, err := regexp.MatchString(pattern, nodes[0].URL())
  457. if err != nil {
  458. t.Fatal(err)
  459. }
  460. if !matched {
  461. t.Fatalf("expected node URL pattern %q; got: %q", pattern, nodes[0].URL())
  462. }
  463. case <-time.After(2 * time.Second):
  464. t.Fatal("expected no timeout in sniff node")
  465. break
  466. }
  467. }
  468. func TestClientSniffOnDefaultURL(t *testing.T) {
  469. client, _ := NewClient()
  470. if client == nil {
  471. t.Fatal("no client returned")
  472. }
  473. ch := make(chan error, 1)
  474. go func() {
  475. ch <- client.sniff(DefaultSnifferTimeoutStartup)
  476. }()
  477. select {
  478. case err := <-ch:
  479. if err != nil {
  480. t.Fatalf("expected sniff to succeed; got: %v", err)
  481. }
  482. if len(client.conns) != 1 {
  483. t.Fatalf("expected %d nodes; got: %d", 1, len(client.conns))
  484. }
  485. pattern := `http:\/\/[\d\.]+:9200`
  486. matched, err := regexp.MatchString(pattern, client.conns[0].URL())
  487. if err != nil {
  488. t.Fatal(err)
  489. }
  490. if !matched {
  491. t.Fatalf("expected node URL pattern %q; got: %q", pattern, client.conns[0].URL())
  492. }
  493. case <-time.After(2 * time.Second):
  494. t.Fatal("expected no timeout in sniff")
  495. break
  496. }
  497. }
  498. func TestClientSniffTimeoutLeak(t *testing.T) {
  499. // This test test checks if sniff requests are canceled
  500. // after timeout.
  501. // It contains couple of hacks which won't be needed once we
  502. // stop supporting Go1.7.
  503. // On Go1.7 it uses server side effects to monitor if connection
  504. // was closed,
  505. // and on Go 1.8+ we're additionally honestly monitoring routine
  506. // leaks via leaktest.
  507. mux := http.NewServeMux()
  508. var reqDoneMu sync.Mutex
  509. var reqDone bool
  510. mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
  511. cn, ok := w.(http.CloseNotifier)
  512. if !ok {
  513. t.Fatalf("Writer is not CloseNotifier, but %v", reflect.TypeOf(w).Name())
  514. }
  515. <-cn.CloseNotify()
  516. reqDoneMu.Lock()
  517. reqDone = true
  518. reqDoneMu.Unlock()
  519. })
  520. lis, err := net.Listen("tcp", "127.0.0.1:0")
  521. if err != nil {
  522. t.Fatalf("Couldn't setup listener: %v", err)
  523. }
  524. addr := lis.Addr().String()
  525. srv := &http.Server{
  526. Handler: mux,
  527. }
  528. go srv.Serve(lis)
  529. cli := &Client{
  530. c: &http.Client{},
  531. conns: []*conn{
  532. &conn{
  533. url: "http://" + addr + "/",
  534. },
  535. },
  536. snifferEnabled: true,
  537. }
  538. type closer interface {
  539. Shutdown(context.Context) error
  540. }
  541. // pre-Go1.8 Server can't Shutdown
  542. cl, isServerCloseable := (interface{}(srv)).(closer)
  543. // Since Go1.7 can't Shutdown() - there will be leak from server
  544. // Monitor leaks on Go 1.8+
  545. if isServerCloseable {
  546. defer leaktest.CheckTimeout(t, time.Second*10)()
  547. }
  548. cli.sniff(time.Millisecond * 500)
  549. if isServerCloseable {
  550. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  551. defer cancel()
  552. cl.Shutdown(ctx)
  553. }
  554. <-time.After(time.Second)
  555. reqDoneMu.Lock()
  556. if !reqDone {
  557. reqDoneMu.Unlock()
  558. t.Fatal("Request wasn't canceled or stopped")
  559. }
  560. reqDoneMu.Unlock()
  561. }
  562. func TestClientExtractHostname(t *testing.T) {
  563. tests := []struct {
  564. Scheme string
  565. Address string
  566. Output string
  567. }{
  568. {
  569. Scheme: "http",
  570. Address: "",
  571. Output: "",
  572. },
  573. {
  574. Scheme: "https",
  575. Address: "abc",
  576. Output: "",
  577. },
  578. {
  579. Scheme: "http",
  580. Address: "127.0.0.1:19200",
  581. Output: "http://127.0.0.1:19200",
  582. },
  583. {
  584. Scheme: "https",
  585. Address: "127.0.0.1:9200",
  586. Output: "https://127.0.0.1:9200",
  587. },
  588. {
  589. Scheme: "http",
  590. Address: "myelk.local/10.1.0.24:9200",
  591. Output: "http://10.1.0.24:9200",
  592. },
  593. }
  594. client, err := NewClient(SetSniff(false), SetHealthcheck(false))
  595. if err != nil {
  596. t.Fatal(err)
  597. }
  598. for _, test := range tests {
  599. got := client.extractHostname(test.Scheme, test.Address)
  600. if want := test.Output; want != got {
  601. t.Errorf("expected %q; got: %q", want, got)
  602. }
  603. }
  604. }
  605. // -- Selector --
  606. func TestClientSelectConnHealthy(t *testing.T) {
  607. client, err := NewClient(
  608. SetSniff(false),
  609. SetHealthcheck(false),
  610. SetURL("http://127.0.0.1:9200/node1", "http://127.0.0.1:9201/node2/"))
  611. if err != nil {
  612. t.Fatal(err)
  613. }
  614. // Both are healthy, so we should get both URLs in round-robin
  615. client.conns[0].MarkAsHealthy()
  616. client.conns[1].MarkAsHealthy()
  617. // #1: Return 1st
  618. c, err := client.next()
  619. if err != nil {
  620. t.Fatal(err)
  621. }
  622. if c.URL() != client.conns[0].URL() {
  623. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[0].URL())
  624. }
  625. // #2: Return 2nd
  626. c, err = client.next()
  627. if err != nil {
  628. t.Fatal(err)
  629. }
  630. if c.URL() != client.conns[1].URL() {
  631. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[1].URL())
  632. }
  633. // #3: Return 1st
  634. c, err = client.next()
  635. if err != nil {
  636. t.Fatal(err)
  637. }
  638. if c.URL() != client.conns[0].URL() {
  639. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[0].URL())
  640. }
  641. }
  642. func TestClientSelectConnHealthyWithURLPrefix(t *testing.T) {
  643. client, err := NewClient(
  644. SetSniff(false),
  645. SetHealthcheck(false),
  646. SetURL("http://127.0.0.1:9200/node1", "http://127.0.0.1:9201/node2/prefix/"))
  647. if err != nil {
  648. t.Fatal(err)
  649. }
  650. // Both are healthy, so we should get both URLs in round-robin
  651. client.conns[0].MarkAsHealthy()
  652. client.conns[1].MarkAsHealthy()
  653. // Check that the connection used the URLs, including its prefix
  654. if want, have := "http://127.0.0.1:9200/node1", client.conns[0].URL(); want != have {
  655. t.Fatalf("want Node[0] = %q, have %q", want, have)
  656. }
  657. // Note that it stripped the / off the suffix
  658. if want, have := "http://127.0.0.1:9201/node2/prefix", client.conns[1].URL(); want != have {
  659. t.Fatalf("want Node[1] = %q, have %q", want, have)
  660. }
  661. // #1: Return 1st
  662. c, err := client.next()
  663. if err != nil {
  664. t.Fatal(err)
  665. }
  666. if c.URL() != client.conns[0].URL() {
  667. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[0].URL())
  668. }
  669. // #2: Return 2nd
  670. c, err = client.next()
  671. if err != nil {
  672. t.Fatal(err)
  673. }
  674. if c.URL() != client.conns[1].URL() {
  675. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[1].URL())
  676. }
  677. // #3: Return 1st
  678. c, err = client.next()
  679. if err != nil {
  680. t.Fatal(err)
  681. }
  682. if c.URL() != client.conns[0].URL() {
  683. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[0].URL())
  684. }
  685. }
  686. func TestClientSelectConnHealthyAndDead(t *testing.T) {
  687. client, err := NewClient(
  688. SetSniff(false),
  689. SetHealthcheck(false),
  690. SetURL("http://127.0.0.1:9200", "http://127.0.0.1:9201"))
  691. if err != nil {
  692. t.Fatal(err)
  693. }
  694. // 1st is healthy, second is dead
  695. client.conns[0].MarkAsHealthy()
  696. client.conns[1].MarkAsDead()
  697. // #1: Return 1st
  698. c, err := client.next()
  699. if err != nil {
  700. t.Fatal(err)
  701. }
  702. if c.URL() != client.conns[0].URL() {
  703. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[0].URL())
  704. }
  705. // #2: Return 1st again
  706. c, err = client.next()
  707. if err != nil {
  708. t.Fatal(err)
  709. }
  710. if c.URL() != client.conns[0].URL() {
  711. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[0].URL())
  712. }
  713. // #3: Return 1st again and again
  714. c, err = client.next()
  715. if err != nil {
  716. t.Fatal(err)
  717. }
  718. if c.URL() != client.conns[0].URL() {
  719. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[0].URL())
  720. }
  721. }
  722. func TestClientSelectConnDeadAndHealthy(t *testing.T) {
  723. client, err := NewClient(
  724. SetSniff(false),
  725. SetHealthcheck(false),
  726. SetURL("http://127.0.0.1:9200", "http://127.0.0.1:9201"))
  727. if err != nil {
  728. t.Fatal(err)
  729. }
  730. // 1st is dead, 2nd is healthy
  731. client.conns[0].MarkAsDead()
  732. client.conns[1].MarkAsHealthy()
  733. // #1: Return 2nd
  734. c, err := client.next()
  735. if err != nil {
  736. t.Fatal(err)
  737. }
  738. if c.URL() != client.conns[1].URL() {
  739. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[1].URL())
  740. }
  741. // #2: Return 2nd again
  742. c, err = client.next()
  743. if err != nil {
  744. t.Fatal(err)
  745. }
  746. if c.URL() != client.conns[1].URL() {
  747. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[1].URL())
  748. }
  749. // #3: Return 2nd again and again
  750. c, err = client.next()
  751. if err != nil {
  752. t.Fatal(err)
  753. }
  754. if c.URL() != client.conns[1].URL() {
  755. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[1].URL())
  756. }
  757. }
  758. func TestClientSelectConnAllDead(t *testing.T) {
  759. client, err := NewClient(
  760. SetSniff(false),
  761. SetHealthcheck(false),
  762. SetURL("http://127.0.0.1:9200", "http://127.0.0.1:9201"))
  763. if err != nil {
  764. t.Fatal(err)
  765. }
  766. // Both are dead
  767. client.conns[0].MarkAsDead()
  768. client.conns[1].MarkAsDead()
  769. // If all connections are dead, next should make them alive again, but
  770. // still return an error when it first finds out.
  771. c, err := client.next()
  772. if !IsConnErr(err) {
  773. t.Fatal(err)
  774. }
  775. if c != nil {
  776. t.Fatalf("expected no connection; got: %v", c)
  777. }
  778. // Return a connection
  779. c, err = client.next()
  780. if err != nil {
  781. t.Fatalf("expected no error; got: %v", err)
  782. }
  783. if c == nil {
  784. t.Fatalf("expected connection; got: %v", c)
  785. }
  786. // Return a connection
  787. c, err = client.next()
  788. if err != nil {
  789. t.Fatalf("expected no error; got: %v", err)
  790. }
  791. if c == nil {
  792. t.Fatalf("expected connection; got: %v", c)
  793. }
  794. }
  795. // -- ElasticsearchVersion --
  796. func TestElasticsearchVersion(t *testing.T) {
  797. client, err := NewClient()
  798. if err != nil {
  799. t.Fatal(err)
  800. }
  801. version, err := client.ElasticsearchVersion(DefaultURL)
  802. if err != nil {
  803. t.Fatal(err)
  804. }
  805. if version == "" {
  806. t.Errorf("expected a version number, got: %q", version)
  807. }
  808. }
  809. // -- IndexNames --
  810. func TestIndexNames(t *testing.T) {
  811. client := setupTestClientAndCreateIndex(t)
  812. names, err := client.IndexNames()
  813. if err != nil {
  814. t.Fatal(err)
  815. }
  816. if len(names) == 0 {
  817. t.Fatalf("expected some index names, got: %d", len(names))
  818. }
  819. var found bool
  820. for _, name := range names {
  821. if name == testIndexName {
  822. found = true
  823. break
  824. }
  825. }
  826. if !found {
  827. t.Fatalf("expected to find index %q; got: %v", testIndexName, found)
  828. }
  829. }
  830. // -- PerformRequest --
  831. func TestPerformRequest(t *testing.T) {
  832. client, err := NewClient()
  833. if err != nil {
  834. t.Fatal(err)
  835. }
  836. res, err := client.PerformRequest(context.TODO(), "GET", "/", nil, nil)
  837. if err != nil {
  838. t.Fatal(err)
  839. }
  840. if res == nil {
  841. t.Fatal("expected response to be != nil")
  842. }
  843. ret := new(PingResult)
  844. if err := json.Unmarshal(res.Body, ret); err != nil {
  845. t.Fatalf("expected no error on decode; got: %v", err)
  846. }
  847. if ret.ClusterName == "" {
  848. t.Errorf("expected cluster name; got: %q", ret.ClusterName)
  849. }
  850. }
  851. func TestPerformRequestWithSimpleClient(t *testing.T) {
  852. client, err := NewSimpleClient()
  853. if err != nil {
  854. t.Fatal(err)
  855. }
  856. res, err := client.PerformRequest(context.TODO(), "GET", "/", nil, nil)
  857. if err != nil {
  858. t.Fatal(err)
  859. }
  860. if res == nil {
  861. t.Fatal("expected response to be != nil")
  862. }
  863. ret := new(PingResult)
  864. if err := json.Unmarshal(res.Body, ret); err != nil {
  865. t.Fatalf("expected no error on decode; got: %v", err)
  866. }
  867. if ret.ClusterName == "" {
  868. t.Errorf("expected cluster name; got: %q", ret.ClusterName)
  869. }
  870. }
  871. func TestPerformRequestWithLogger(t *testing.T) {
  872. var w bytes.Buffer
  873. out := log.New(&w, "LOGGER ", log.LstdFlags)
  874. client, err := NewClient(SetInfoLog(out), SetSniff(false))
  875. if err != nil {
  876. t.Fatal(err)
  877. }
  878. res, err := client.PerformRequest(context.TODO(), "GET", "/", nil, nil)
  879. if err != nil {
  880. t.Fatal(err)
  881. }
  882. if res == nil {
  883. t.Fatal("expected response to be != nil")
  884. }
  885. ret := new(PingResult)
  886. if err := json.Unmarshal(res.Body, ret); err != nil {
  887. t.Fatalf("expected no error on decode; got: %v", err)
  888. }
  889. if ret.ClusterName == "" {
  890. t.Errorf("expected cluster name; got: %q", ret.ClusterName)
  891. }
  892. got := w.String()
  893. pattern := `^LOGGER \d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} GET http://.*/ \[status:200, request:\d+\.\d{3}s\]\n`
  894. matched, err := regexp.MatchString(pattern, got)
  895. if err != nil {
  896. t.Fatalf("expected log line to match %q; got: %v", pattern, err)
  897. }
  898. if !matched {
  899. t.Errorf("expected log line to match %q; got: %v", pattern, got)
  900. }
  901. }
  902. func TestPerformRequestWithLoggerAndTracer(t *testing.T) {
  903. var lw bytes.Buffer
  904. lout := log.New(&lw, "LOGGER ", log.LstdFlags)
  905. var tw bytes.Buffer
  906. tout := log.New(&tw, "TRACER ", log.LstdFlags)
  907. client, err := NewClient(SetInfoLog(lout), SetTraceLog(tout), SetSniff(false))
  908. if err != nil {
  909. t.Fatal(err)
  910. }
  911. res, err := client.PerformRequest(context.TODO(), "GET", "/", nil, nil)
  912. if err != nil {
  913. t.Fatal(err)
  914. }
  915. if res == nil {
  916. t.Fatal("expected response to be != nil")
  917. }
  918. ret := new(PingResult)
  919. if err := json.Unmarshal(res.Body, ret); err != nil {
  920. t.Fatalf("expected no error on decode; got: %v", err)
  921. }
  922. if ret.ClusterName == "" {
  923. t.Errorf("expected cluster name; got: %q", ret.ClusterName)
  924. }
  925. lgot := lw.String()
  926. if lgot == "" {
  927. t.Errorf("expected logger output; got: %q", lgot)
  928. }
  929. tgot := tw.String()
  930. if tgot == "" {
  931. t.Errorf("expected tracer output; got: %q", tgot)
  932. }
  933. }
  934. func TestPerformRequestWithTracerOnError(t *testing.T) {
  935. var tw bytes.Buffer
  936. tout := log.New(&tw, "TRACER ", log.LstdFlags)
  937. client, err := NewClient(SetTraceLog(tout), SetSniff(false))
  938. if err != nil {
  939. t.Fatal(err)
  940. }
  941. client.PerformRequest(context.TODO(), "GET", "/no-such-index", nil, nil)
  942. tgot := tw.String()
  943. if tgot == "" {
  944. t.Errorf("expected tracer output; got: %q", tgot)
  945. }
  946. }
  // customLogger is a minimal logger for tests that collects everything
  // printed to it in an in-memory buffer.
  type customLogger struct {
  	out bytes.Buffer // accumulated log lines, each terminated by "\n"
  }

  // Printf formats according to format and appends the result, followed by a
  // newline, to the logger's buffer.
  func (l *customLogger) Printf(format string, v ...interface{}) {
  	fmt.Fprintf(&l.out, format, v...)
  	l.out.WriteByte('\n')
  }
  953. func TestPerformRequestWithCustomLogger(t *testing.T) {
  954. logger := &customLogger{}
  955. client, err := NewClient(SetInfoLog(logger), SetSniff(false))
  956. if err != nil {
  957. t.Fatal(err)
  958. }
  959. res, err := client.PerformRequest(context.TODO(), "GET", "/", nil, nil)
  960. if err != nil {
  961. t.Fatal(err)
  962. }
  963. if res == nil {
  964. t.Fatal("expected response to be != nil")
  965. }
  966. ret := new(PingResult)
  967. if err := json.Unmarshal(res.Body, ret); err != nil {
  968. t.Fatalf("expected no error on decode; got: %v", err)
  969. }
  970. if ret.ClusterName == "" {
  971. t.Errorf("expected cluster name; got: %q", ret.ClusterName)
  972. }
  973. got := logger.out.String()
  974. pattern := `^GET http://.*/ \[status:200, request:\d+\.\d{3}s\]\n`
  975. matched, err := regexp.MatchString(pattern, got)
  976. if err != nil {
  977. t.Fatalf("expected log line to match %q; got: %v", pattern, err)
  978. }
  979. if !matched {
  980. t.Errorf("expected log line to match %q; got: %v", pattern, got)
  981. }
  982. }
  983. // failingTransport will run a fail callback if it sees a given URL path prefix.
  984. type failingTransport struct {
  985. path string // path prefix to look for
  986. fail func(*http.Request) (*http.Response, error) // call when path prefix is found
  987. next http.RoundTripper // next round-tripper (use http.DefaultTransport if nil)
  988. }
  989. // RoundTrip implements a failing transport.
  990. func (tr *failingTransport) RoundTrip(r *http.Request) (*http.Response, error) {
  991. if strings.HasPrefix(r.URL.Path, tr.path) && tr.fail != nil {
  992. return tr.fail(r)
  993. }
  994. if tr.next != nil {
  995. return tr.next.RoundTrip(r)
  996. }
  997. return http.DefaultTransport.RoundTrip(r)
  998. }
  999. func TestPerformRequestRetryOnHttpError(t *testing.T) {
  1000. var numFailedReqs int
  1001. fail := func(r *http.Request) (*http.Response, error) {
  1002. numFailedReqs += 1
  1003. //return &http.Response{Request: r, StatusCode: 400}, nil
  1004. return nil, errors.New("request failed")
  1005. }
  1006. // Run against a failing endpoint and see if PerformRequest
  1007. // retries correctly.
  1008. tr := &failingTransport{path: "/fail", fail: fail}
  1009. httpClient := &http.Client{Transport: tr}
  1010. client, err := NewClient(SetHttpClient(httpClient), SetMaxRetries(5), SetHealthcheck(false))
  1011. if err != nil {
  1012. t.Fatal(err)
  1013. }
  1014. res, err := client.PerformRequest(context.TODO(), "GET", "/fail", nil, nil)
  1015. if err == nil {
  1016. t.Fatal("expected error")
  1017. }
  1018. if res != nil {
  1019. t.Fatal("expected no response")
  1020. }
  1021. // Connection should be marked as dead after it failed
  1022. if numFailedReqs != 5 {
  1023. t.Errorf("expected %d failed requests; got: %d", 5, numFailedReqs)
  1024. }
  1025. }
  1026. func TestPerformRequestNoRetryOnValidButUnsuccessfulHttpStatus(t *testing.T) {
  1027. var numFailedReqs int
  1028. fail := func(r *http.Request) (*http.Response, error) {
  1029. numFailedReqs += 1
  1030. return &http.Response{Request: r, StatusCode: 500}, nil
  1031. }
  1032. // Run against a failing endpoint and see if PerformRequest
  1033. // retries correctly.
  1034. tr := &failingTransport{path: "/fail", fail: fail}
  1035. httpClient := &http.Client{Transport: tr}
  1036. client, err := NewClient(SetHttpClient(httpClient), SetMaxRetries(5), SetHealthcheck(false))
  1037. if err != nil {
  1038. t.Fatal(err)
  1039. }
  1040. res, err := client.PerformRequest(context.TODO(), "GET", "/fail", nil, nil)
  1041. if err == nil {
  1042. t.Fatal("expected error")
  1043. }
  1044. if res == nil {
  1045. t.Fatal("expected response, got nil")
  1046. }
  1047. if want, got := 500, res.StatusCode; want != got {
  1048. t.Fatalf("expected status code = %d, got %d", want, got)
  1049. }
  1050. // Retry should not have triggered additional requests because
  1051. if numFailedReqs != 1 {
  1052. t.Errorf("expected %d failed requests; got: %d", 1, numFailedReqs)
  1053. }
  1054. }
  // failingBody is a request body for tests that can never be serialized:
  // marshaling it to JSON always fails.
  type failingBody struct{}

  // MarshalJSON implements json.Marshaler. It never produces any bytes and
  // always returns a non-nil error.
  func (failingBody) MarshalJSON() ([]byte, error) {
  	err := errors.New("failing to marshal")
  	return nil, err
  }
  1061. func TestPerformRequestWithSetBodyError(t *testing.T) {
  1062. client, err := NewClient()
  1063. if err != nil {
  1064. t.Fatal(err)
  1065. }
  1066. res, err := client.PerformRequest(context.TODO(), "GET", "/", nil, failingBody{})
  1067. if err == nil {
  1068. t.Fatal("expected error")
  1069. }
  1070. if res != nil {
  1071. t.Fatal("expected no response")
  1072. }
  1073. }
  1074. // sleepingTransport will sleep before doing a request.
  1075. type sleepingTransport struct {
  1076. timeout time.Duration
  1077. }
  1078. // RoundTrip implements a "sleepy" transport.
  1079. func (tr *sleepingTransport) RoundTrip(r *http.Request) (*http.Response, error) {
  1080. time.Sleep(tr.timeout)
  1081. return http.DefaultTransport.RoundTrip(r)
  1082. }
  1083. func TestPerformRequestWithCancel(t *testing.T) {
  1084. tr := &sleepingTransport{timeout: 3 * time.Second}
  1085. httpClient := &http.Client{Transport: tr}
  1086. client, err := NewSimpleClient(SetHttpClient(httpClient), SetMaxRetries(0))
  1087. if err != nil {
  1088. t.Fatal(err)
  1089. }
  1090. type result struct {
  1091. res *Response
  1092. err error
  1093. }
  1094. ctx, cancel := context.WithCancel(context.Background())
  1095. resc := make(chan result, 1)
  1096. go func() {
  1097. res, err := client.PerformRequest(ctx, "GET", "/", nil, nil)
  1098. resc <- result{res: res, err: err}
  1099. }()
  1100. select {
  1101. case <-time.After(1 * time.Second):
  1102. cancel()
  1103. case res := <-resc:
  1104. t.Fatalf("expected response before cancel, got %v", res)
  1105. case <-ctx.Done():
  1106. t.Fatalf("expected no early termination, got ctx.Done(): %v", ctx.Err())
  1107. }
  1108. err = ctx.Err()
  1109. if err != context.Canceled {
  1110. t.Fatalf("expected error context.Canceled, got: %v", err)
  1111. }
  1112. }
  1113. func TestPerformRequestWithTimeout(t *testing.T) {
  1114. tr := &sleepingTransport{timeout: 3 * time.Second}
  1115. httpClient := &http.Client{Transport: tr}
  1116. client, err := NewSimpleClient(SetHttpClient(httpClient), SetMaxRetries(0))
  1117. if err != nil {
  1118. t.Fatal(err)
  1119. }
  1120. type result struct {
  1121. res *Response
  1122. err error
  1123. }
  1124. ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
  1125. defer cancel()
  1126. resc := make(chan result, 1)
  1127. go func() {
  1128. res, err := client.PerformRequest(ctx, "GET", "/", nil, nil)
  1129. resc <- result{res: res, err: err}
  1130. }()
  1131. select {
  1132. case res := <-resc:
  1133. t.Fatalf("expected timeout before response, got %v", res)
  1134. case <-ctx.Done():
  1135. err := ctx.Err()
  1136. if err != context.DeadlineExceeded {
  1137. t.Fatalf("expected error context.DeadlineExceeded, got: %v", err)
  1138. }
  1139. }
  1140. }
  1141. // -- Compression --
  // Note that the trace log always prints "Accept-Encoding: gzip",
  // regardless of whether compression is enabled or not. This is because
  // of the underlying "httputil.DumpRequestOut".
  1145. //
  1146. // Use a real HTTP proxy/recorder to convince yourself that
  1147. // "Accept-Encoding: gzip" is NOT sent when DisableCompression
  1148. // is set to true.
  1149. //
  1150. // See also:
  1151. // https://groups.google.com/forum/#!topic/golang-nuts/ms8QNCzew8Q
  1152. func TestPerformRequestWithCompressionEnabled(t *testing.T) {
  1153. testPerformRequestWithCompression(t, &http.Client{
  1154. Transport: &http.Transport{
  1155. DisableCompression: true,
  1156. },
  1157. })
  1158. }
  1159. func TestPerformRequestWithCompressionDisabled(t *testing.T) {
  1160. testPerformRequestWithCompression(t, &http.Client{
  1161. Transport: &http.Transport{
  1162. DisableCompression: false,
  1163. },
  1164. })
  1165. }
  1166. func testPerformRequestWithCompression(t *testing.T, hc *http.Client) {
  1167. client, err := NewClient(SetHttpClient(hc), SetSniff(false))
  1168. if err != nil {
  1169. t.Fatal(err)
  1170. }
  1171. res, err := client.PerformRequest(context.TODO(), "GET", "/", nil, nil)
  1172. if err != nil {
  1173. t.Fatal(err)
  1174. }
  1175. if res == nil {
  1176. t.Fatal("expected response to be != nil")
  1177. }
  1178. ret := new(PingResult)
  1179. if err := json.Unmarshal(res.Body, ret); err != nil {
  1180. t.Fatalf("expected no error on decode; got: %v", err)
  1181. }
  1182. if ret.ClusterName == "" {
  1183. t.Errorf("expected cluster name; got: %q", ret.ClusterName)
  1184. }
  1185. }