clientconn_test.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "errors"
  22. "fmt"
  23. "math"
  24. "net"
  25. "sync/atomic"
  26. "testing"
  27. "time"
  28. "golang.org/x/net/http2"
  29. "google.golang.org/grpc/connectivity"
  30. "google.golang.org/grpc/credentials"
  31. "google.golang.org/grpc/internal/backoff"
  32. "google.golang.org/grpc/internal/envconfig"
  33. "google.golang.org/grpc/internal/leakcheck"
  34. "google.golang.org/grpc/internal/transport"
  35. "google.golang.org/grpc/keepalive"
  36. "google.golang.org/grpc/naming"
  37. "google.golang.org/grpc/resolver"
  38. "google.golang.org/grpc/resolver/manual"
  39. _ "google.golang.org/grpc/resolver/passthrough"
  40. "google.golang.org/grpc/testdata"
  41. )
  42. var (
  43. mutableMinConnectTimeout = time.Second * 20
  44. )
  45. func init() {
  46. getMinConnectTimeout = func() time.Duration {
  47. return time.Duration(atomic.LoadInt64((*int64)(&mutableMinConnectTimeout)))
  48. }
  49. }
  50. func assertState(wantState connectivity.State, cc *ClientConn) (connectivity.State, bool) {
  51. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  52. defer cancel()
  53. var state connectivity.State
  54. for state = cc.GetState(); state != wantState && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
  55. }
  56. return state, state == wantState
  57. }
  58. func TestDialWithMultipleBackendsNotSendingServerPreface(t *testing.T) {
  59. defer leakcheck.Check(t)
  60. numServers := 2
  61. servers := make([]net.Listener, numServers)
  62. var err error
  63. for i := 0; i < numServers; i++ {
  64. servers[i], err = net.Listen("tcp", "localhost:0")
  65. if err != nil {
  66. t.Fatalf("Error while listening. Err: %v", err)
  67. }
  68. }
  69. dones := make([]chan struct{}, numServers)
  70. for i := 0; i < numServers; i++ {
  71. dones[i] = make(chan struct{})
  72. }
  73. for i := 0; i < numServers; i++ {
  74. go func(i int) {
  75. defer func() {
  76. close(dones[i])
  77. }()
  78. conn, err := servers[i].Accept()
  79. if err != nil {
  80. t.Errorf("Error while accepting. Err: %v", err)
  81. return
  82. }
  83. defer conn.Close()
  84. switch i {
  85. case 0: // 1st server accepts the connection and immediately closes it.
  86. case 1: // 2nd server accepts the connection and sends settings frames.
  87. framer := http2.NewFramer(conn, conn)
  88. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  89. t.Errorf("Error while writing settings frame. %v", err)
  90. return
  91. }
  92. conn.SetDeadline(time.Now().Add(time.Second))
  93. buf := make([]byte, 1024)
  94. for { // Make sure the connection stays healthy.
  95. _, err = conn.Read(buf)
  96. if err == nil {
  97. continue
  98. }
  99. if nerr, ok := err.(net.Error); !ok || !nerr.Timeout() {
  100. t.Errorf("Server expected the conn.Read(_) to timeout instead got error: %v", err)
  101. }
  102. return
  103. }
  104. }
  105. }(i)
  106. }
  107. r, cleanup := manual.GenerateAndRegisterManualResolver()
  108. defer cleanup()
  109. resolvedAddrs := make([]resolver.Address, numServers)
  110. for i := 0; i < numServers; i++ {
  111. resolvedAddrs[i] = resolver.Address{Addr: servers[i].Addr().String()}
  112. }
  113. r.InitialAddrs(resolvedAddrs)
  114. client, err := Dial(r.Scheme()+":///test.server", WithInsecure())
  115. if err != nil {
  116. t.Errorf("Dial failed. Err: %v", err)
  117. } else {
  118. defer client.Close()
  119. }
  120. time.Sleep(time.Second) // Close the servers after a second for cleanup.
  121. for _, s := range servers {
  122. s.Close()
  123. }
  124. for _, done := range dones {
  125. <-done
  126. }
  127. }
  128. var allReqHSSettings = []envconfig.RequireHandshakeSetting{
  129. envconfig.RequireHandshakeOff,
  130. envconfig.RequireHandshakeOn,
  131. envconfig.RequireHandshakeHybrid,
  132. }
  133. var reqNoHSSettings = []envconfig.RequireHandshakeSetting{
  134. envconfig.RequireHandshakeOff,
  135. envconfig.RequireHandshakeHybrid,
  136. }
  137. var reqHSBeforeSuccess = []envconfig.RequireHandshakeSetting{
  138. envconfig.RequireHandshakeOn,
  139. envconfig.RequireHandshakeHybrid,
  140. }
  141. func TestDialWaitsForServerSettings(t *testing.T) {
  142. // Restore current setting after test.
  143. old := envconfig.RequireHandshake
  144. defer func() { envconfig.RequireHandshake = old }()
  145. defer leakcheck.Check(t)
  146. // Test with all environment variable settings, which should not impact the
  147. // test case since WithWaitForHandshake has higher priority.
  148. for _, setting := range allReqHSSettings {
  149. envconfig.RequireHandshake = setting
  150. lis, err := net.Listen("tcp", "localhost:0")
  151. if err != nil {
  152. t.Fatalf("Error while listening. Err: %v", err)
  153. }
  154. defer lis.Close()
  155. done := make(chan struct{})
  156. sent := make(chan struct{})
  157. dialDone := make(chan struct{})
  158. go func() { // Launch the server.
  159. defer func() {
  160. close(done)
  161. }()
  162. conn, err := lis.Accept()
  163. if err != nil {
  164. t.Errorf("Error while accepting. Err: %v", err)
  165. return
  166. }
  167. defer conn.Close()
  168. // Sleep for a little bit to make sure that Dial on client
  169. // side blocks until settings are received.
  170. time.Sleep(100 * time.Millisecond)
  171. framer := http2.NewFramer(conn, conn)
  172. close(sent)
  173. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  174. t.Errorf("Error while writing settings. Err: %v", err)
  175. return
  176. }
  177. <-dialDone // Close conn only after dial returns.
  178. }()
  179. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  180. defer cancel()
  181. client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithWaitForHandshake(), WithBlock())
  182. close(dialDone)
  183. if err != nil {
  184. t.Fatalf("Error while dialing. Err: %v", err)
  185. }
  186. defer client.Close()
  187. select {
  188. case <-sent:
  189. default:
  190. t.Fatalf("Dial returned before server settings were sent")
  191. }
  192. <-done
  193. }
  194. }
  195. func TestDialWaitsForServerSettingsViaEnv(t *testing.T) {
  196. // Set default behavior and restore current setting after test.
  197. old := envconfig.RequireHandshake
  198. envconfig.RequireHandshake = envconfig.RequireHandshakeOn
  199. defer func() { envconfig.RequireHandshake = old }()
  200. defer leakcheck.Check(t)
  201. lis, err := net.Listen("tcp", "localhost:0")
  202. if err != nil {
  203. t.Fatalf("Error while listening. Err: %v", err)
  204. }
  205. defer lis.Close()
  206. done := make(chan struct{})
  207. sent := make(chan struct{})
  208. dialDone := make(chan struct{})
  209. go func() { // Launch the server.
  210. defer func() {
  211. close(done)
  212. }()
  213. conn, err := lis.Accept()
  214. if err != nil {
  215. t.Errorf("Error while accepting. Err: %v", err)
  216. return
  217. }
  218. defer conn.Close()
  219. // Sleep for a little bit to make sure that Dial on client
  220. // side blocks until settings are received.
  221. time.Sleep(100 * time.Millisecond)
  222. framer := http2.NewFramer(conn, conn)
  223. close(sent)
  224. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  225. t.Errorf("Error while writing settings. Err: %v", err)
  226. return
  227. }
  228. <-dialDone // Close conn only after dial returns.
  229. }()
  230. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  231. defer cancel()
  232. client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBlock())
  233. close(dialDone)
  234. if err != nil {
  235. t.Fatalf("Error while dialing. Err: %v", err)
  236. }
  237. defer client.Close()
  238. select {
  239. case <-sent:
  240. default:
  241. t.Fatalf("Dial returned before server settings were sent")
  242. }
  243. <-done
  244. }
  245. func TestDialWaitsForServerSettingsAndFails(t *testing.T) {
  246. // Restore current setting after test.
  247. old := envconfig.RequireHandshake
  248. defer func() { envconfig.RequireHandshake = old }()
  249. defer leakcheck.Check(t)
  250. for _, setting := range allReqHSSettings {
  251. envconfig.RequireHandshake = setting
  252. lis, err := net.Listen("tcp", "localhost:0")
  253. if err != nil {
  254. t.Fatalf("Error while listening. Err: %v", err)
  255. }
  256. done := make(chan struct{})
  257. numConns := 0
  258. go func() { // Launch the server.
  259. defer func() {
  260. close(done)
  261. }()
  262. for {
  263. conn, err := lis.Accept()
  264. if err != nil {
  265. break
  266. }
  267. numConns++
  268. defer conn.Close()
  269. }
  270. }()
  271. getMinConnectTimeout = func() time.Duration { return time.Second / 4 }
  272. ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
  273. defer cancel()
  274. client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithWaitForHandshake(), WithBlock(), withBackoff(noBackoff{}))
  275. lis.Close()
  276. if err == nil {
  277. client.Close()
  278. t.Fatalf("Unexpected success (err=nil) while dialing")
  279. }
  280. if err != context.DeadlineExceeded {
  281. t.Fatalf("DialContext(_) = %v; want context.DeadlineExceeded", err)
  282. }
  283. if numConns < 2 {
  284. t.Fatalf("dial attempts: %v; want > 1", numConns)
  285. }
  286. <-done
  287. }
  288. }
  289. func TestDialWaitsForServerSettingsViaEnvAndFails(t *testing.T) {
  290. // Set default behavior and restore current setting after test.
  291. old := envconfig.RequireHandshake
  292. envconfig.RequireHandshake = envconfig.RequireHandshakeOn
  293. defer func() { envconfig.RequireHandshake = old }()
  294. defer leakcheck.Check(t)
  295. lis, err := net.Listen("tcp", "localhost:0")
  296. if err != nil {
  297. t.Fatalf("Error while listening. Err: %v", err)
  298. }
  299. done := make(chan struct{})
  300. numConns := 0
  301. go func() { // Launch the server.
  302. defer func() {
  303. close(done)
  304. }()
  305. for {
  306. conn, err := lis.Accept()
  307. if err != nil {
  308. break
  309. }
  310. numConns++
  311. defer conn.Close()
  312. }
  313. }()
  314. getMinConnectTimeout = func() time.Duration { return time.Second / 4 }
  315. ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
  316. defer cancel()
  317. client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBlock(), withBackoff(noBackoff{}))
  318. lis.Close()
  319. if err == nil {
  320. client.Close()
  321. t.Fatalf("Unexpected success (err=nil) while dialing")
  322. }
  323. if err != context.DeadlineExceeded {
  324. t.Fatalf("DialContext(_) = %v; want context.DeadlineExceeded", err)
  325. }
  326. if numConns < 2 {
  327. t.Fatalf("dial attempts: %v; want > 1", numConns)
  328. }
  329. <-done
  330. }
  331. func TestDialDoesNotWaitForServerSettings(t *testing.T) {
  332. // Restore current setting after test.
  333. old := envconfig.RequireHandshake
  334. defer func() { envconfig.RequireHandshake = old }()
  335. defer leakcheck.Check(t)
  336. // Test with "off" and "hybrid".
  337. for _, setting := range reqNoHSSettings {
  338. envconfig.RequireHandshake = setting
  339. lis, err := net.Listen("tcp", "localhost:0")
  340. if err != nil {
  341. t.Fatalf("Error while listening. Err: %v", err)
  342. }
  343. defer lis.Close()
  344. done := make(chan struct{})
  345. dialDone := make(chan struct{})
  346. go func() { // Launch the server.
  347. defer func() {
  348. close(done)
  349. }()
  350. conn, err := lis.Accept()
  351. if err != nil {
  352. t.Errorf("Error while accepting. Err: %v", err)
  353. return
  354. }
  355. defer conn.Close()
  356. <-dialDone // Close conn only after dial returns.
  357. }()
  358. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  359. defer cancel()
  360. client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBlock())
  361. if err != nil {
  362. t.Fatalf("DialContext returned err =%v; want nil", err)
  363. }
  364. defer client.Close()
  365. if state := client.GetState(); state != connectivity.Ready {
  366. t.Fatalf("client.GetState() = %v; want connectivity.Ready", state)
  367. }
  368. close(dialDone)
  369. <-done
  370. }
  371. }
  372. func TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) {
  373. // Restore current setting after test.
  374. old := envconfig.RequireHandshake
  375. defer func() { envconfig.RequireHandshake = old }()
  376. // 1. Client connects to a server that doesn't send preface.
  377. // 2. After minConnectTimeout(500 ms here), client disconnects and retries.
  378. // 3. The new server sends its preface.
  379. // 4. Client doesn't kill the connection this time.
  380. mctBkp := getMinConnectTimeout()
  381. defer func() {
  382. atomic.StoreInt64((*int64)(&mutableMinConnectTimeout), int64(mctBkp))
  383. }()
  384. defer leakcheck.Check(t)
  385. atomic.StoreInt64((*int64)(&mutableMinConnectTimeout), int64(time.Millisecond)*500)
  386. // Test with "on" and "hybrid".
  387. for _, setting := range reqHSBeforeSuccess {
  388. envconfig.RequireHandshake = setting
  389. lis, err := net.Listen("tcp", "localhost:0")
  390. if err != nil {
  391. t.Fatalf("Error while listening. Err: %v", err)
  392. }
  393. var (
  394. conn2 net.Conn
  395. over uint32
  396. )
  397. defer func() {
  398. lis.Close()
  399. // conn2 shouldn't be closed until the client has
  400. // observed a successful test.
  401. if conn2 != nil {
  402. conn2.Close()
  403. }
  404. }()
  405. done := make(chan struct{})
  406. accepted := make(chan struct{})
  407. go func() { // Launch the server.
  408. defer close(done)
  409. conn1, err := lis.Accept()
  410. if err != nil {
  411. t.Errorf("Error while accepting. Err: %v", err)
  412. return
  413. }
  414. defer conn1.Close()
  415. // Don't send server settings and the client should close the connection and try again.
  416. conn2, err = lis.Accept() // Accept a reconnection request from client.
  417. if err != nil {
  418. t.Errorf("Error while accepting. Err: %v", err)
  419. return
  420. }
  421. close(accepted)
  422. framer := http2.NewFramer(conn2, conn2)
  423. if err = framer.WriteSettings(http2.Setting{}); err != nil {
  424. t.Errorf("Error while writing settings. Err: %v", err)
  425. return
  426. }
  427. b := make([]byte, 8)
  428. for {
  429. _, err = conn2.Read(b)
  430. if err == nil {
  431. continue
  432. }
  433. if atomic.LoadUint32(&over) == 1 {
  434. // The connection stayed alive for the timer.
  435. // Success.
  436. return
  437. }
  438. t.Errorf("Unexpected error while reading. Err: %v, want timeout error", err)
  439. break
  440. }
  441. }()
  442. client, err := Dial(lis.Addr().String(), WithInsecure())
  443. if err != nil {
  444. t.Fatalf("Error while dialing. Err: %v", err)
  445. }
  446. // wait for connection to be accepted on the server.
  447. timer := time.NewTimer(time.Second * 10)
  448. select {
  449. case <-accepted:
  450. case <-timer.C:
  451. t.Fatalf("Client didn't make another connection request in time.")
  452. }
  453. // Make sure the connection stays alive for sometime.
  454. time.Sleep(time.Second)
  455. atomic.StoreUint32(&over, 1)
  456. client.Close()
  457. <-done
  458. }
  459. }
  460. func TestBackoffWhenNoServerPrefaceReceived(t *testing.T) {
  461. defer leakcheck.Check(t)
  462. lis, err := net.Listen("tcp", "localhost:0")
  463. if err != nil {
  464. t.Fatalf("Error while listening. Err: %v", err)
  465. }
  466. defer lis.Close()
  467. done := make(chan struct{})
  468. go func() { // Launch the server.
  469. defer func() {
  470. close(done)
  471. }()
  472. conn, err := lis.Accept() // Accept the connection only to close it immediately.
  473. if err != nil {
  474. t.Errorf("Error while accepting. Err: %v", err)
  475. return
  476. }
  477. prevAt := time.Now()
  478. conn.Close()
  479. var prevDuration time.Duration
  480. // Make sure the retry attempts are backed off properly.
  481. for i := 0; i < 3; i++ {
  482. conn, err := lis.Accept()
  483. if err != nil {
  484. t.Errorf("Error while accepting. Err: %v", err)
  485. return
  486. }
  487. meow := time.Now()
  488. conn.Close()
  489. dr := meow.Sub(prevAt)
  490. if dr <= prevDuration {
  491. t.Errorf("Client backoff did not increase with retries. Previous duration: %v, current duration: %v", prevDuration, dr)
  492. return
  493. }
  494. prevDuration = dr
  495. prevAt = meow
  496. }
  497. }()
  498. client, err := Dial(lis.Addr().String(), WithInsecure())
  499. if err != nil {
  500. t.Fatalf("Error while dialing. Err: %v", err)
  501. }
  502. defer client.Close()
  503. <-done
  504. }
  505. func TestConnectivityStates(t *testing.T) {
  506. defer leakcheck.Check(t)
  507. servers, resolver, cleanup := startServers(t, 2, math.MaxUint32)
  508. defer cleanup()
  509. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(resolver)), WithInsecure())
  510. if err != nil {
  511. t.Fatalf("Dial(\"foo.bar.com\", WithBalancer(_)) = _, %v, want _ <nil>", err)
  512. }
  513. defer cc.Close()
  514. wantState := connectivity.Ready
  515. if state, ok := assertState(wantState, cc); !ok {
  516. t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState)
  517. }
  518. // Send an update to delete the server connection (tearDown addrConn).
  519. update := []*naming.Update{
  520. {
  521. Op: naming.Delete,
  522. Addr: "localhost:" + servers[0].port,
  523. },
  524. }
  525. resolver.w.inject(update)
  526. wantState = connectivity.TransientFailure
  527. if state, ok := assertState(wantState, cc); !ok {
  528. t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState)
  529. }
  530. update[0] = &naming.Update{
  531. Op: naming.Add,
  532. Addr: "localhost:" + servers[1].port,
  533. }
  534. resolver.w.inject(update)
  535. wantState = connectivity.Ready
  536. if state, ok := assertState(wantState, cc); !ok {
  537. t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState)
  538. }
  539. }
  540. func TestWithTimeout(t *testing.T) {
  541. defer leakcheck.Check(t)
  542. conn, err := Dial("passthrough:///Non-Existent.Server:80", WithTimeout(time.Millisecond), WithBlock(), WithInsecure())
  543. if err == nil {
  544. conn.Close()
  545. }
  546. if err != context.DeadlineExceeded {
  547. t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, context.DeadlineExceeded)
  548. }
  549. }
  550. func TestWithTransportCredentialsTLS(t *testing.T) {
  551. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  552. defer cancel()
  553. defer leakcheck.Check(t)
  554. creds, err := credentials.NewClientTLSFromFile(testdata.Path("ca.pem"), "x.test.youtube.com")
  555. if err != nil {
  556. t.Fatalf("Failed to create credentials %v", err)
  557. }
  558. conn, err := DialContext(ctx, "passthrough:///Non-Existent.Server:80", WithTransportCredentials(creds), WithBlock())
  559. if err == nil {
  560. conn.Close()
  561. }
  562. if err != context.DeadlineExceeded {
  563. t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, context.DeadlineExceeded)
  564. }
  565. }
  566. func TestDefaultAuthority(t *testing.T) {
  567. defer leakcheck.Check(t)
  568. target := "Non-Existent.Server:8080"
  569. conn, err := Dial(target, WithInsecure())
  570. if err != nil {
  571. t.Fatalf("Dial(_, _) = _, %v, want _, <nil>", err)
  572. }
  573. defer conn.Close()
  574. if conn.authority != target {
  575. t.Fatalf("%v.authority = %v, want %v", conn, conn.authority, target)
  576. }
  577. }
  578. func TestTLSServerNameOverwrite(t *testing.T) {
  579. defer leakcheck.Check(t)
  580. overwriteServerName := "over.write.server.name"
  581. creds, err := credentials.NewClientTLSFromFile(testdata.Path("ca.pem"), overwriteServerName)
  582. if err != nil {
  583. t.Fatalf("Failed to create credentials %v", err)
  584. }
  585. conn, err := Dial("passthrough:///Non-Existent.Server:80", WithTransportCredentials(creds))
  586. if err != nil {
  587. t.Fatalf("Dial(_, _) = _, %v, want _, <nil>", err)
  588. }
  589. defer conn.Close()
  590. if conn.authority != overwriteServerName {
  591. t.Fatalf("%v.authority = %v, want %v", conn, conn.authority, overwriteServerName)
  592. }
  593. }
  594. func TestWithAuthority(t *testing.T) {
  595. defer leakcheck.Check(t)
  596. overwriteServerName := "over.write.server.name"
  597. conn, err := Dial("passthrough:///Non-Existent.Server:80", WithInsecure(), WithAuthority(overwriteServerName))
  598. if err != nil {
  599. t.Fatalf("Dial(_, _) = _, %v, want _, <nil>", err)
  600. }
  601. defer conn.Close()
  602. if conn.authority != overwriteServerName {
  603. t.Fatalf("%v.authority = %v, want %v", conn, conn.authority, overwriteServerName)
  604. }
  605. }
  606. func TestWithAuthorityAndTLS(t *testing.T) {
  607. defer leakcheck.Check(t)
  608. overwriteServerName := "over.write.server.name"
  609. creds, err := credentials.NewClientTLSFromFile(testdata.Path("ca.pem"), overwriteServerName)
  610. if err != nil {
  611. t.Fatalf("Failed to create credentials %v", err)
  612. }
  613. conn, err := Dial("passthrough:///Non-Existent.Server:80", WithTransportCredentials(creds), WithAuthority("no.effect.authority"))
  614. if err != nil {
  615. t.Fatalf("Dial(_, _) = _, %v, want _, <nil>", err)
  616. }
  617. defer conn.Close()
  618. if conn.authority != overwriteServerName {
  619. t.Fatalf("%v.authority = %v, want %v", conn, conn.authority, overwriteServerName)
  620. }
  621. }
  622. func TestDialContextCancel(t *testing.T) {
  623. defer leakcheck.Check(t)
  624. ctx, cancel := context.WithCancel(context.Background())
  625. cancel()
  626. if _, err := DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithInsecure()); err != context.Canceled {
  627. t.Fatalf("DialContext(%v, _) = _, %v, want _, %v", ctx, err, context.Canceled)
  628. }
  629. }
  630. type failFastError struct{}
  631. func (failFastError) Error() string { return "failfast" }
  632. func (failFastError) Temporary() bool { return false }
  633. func TestDialContextFailFast(t *testing.T) {
  634. defer leakcheck.Check(t)
  635. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  636. defer cancel()
  637. failErr := failFastError{}
  638. dialer := func(string, time.Duration) (net.Conn, error) {
  639. return nil, failErr
  640. }
  641. _, err := DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithInsecure(), WithDialer(dialer), FailOnNonTempDialError(true))
  642. if terr, ok := err.(transport.ConnectionError); !ok || terr.Origin() != failErr {
  643. t.Fatalf("DialContext() = _, %v, want _, %v", err, failErr)
  644. }
  645. }
  646. // blockingBalancer mimics the behavior of balancers whose initialization takes a long time.
  647. // In this test, reading from blockingBalancer.Notify() blocks forever.
  648. type blockingBalancer struct {
  649. ch chan []Address
  650. }
  651. func newBlockingBalancer() Balancer {
  652. return &blockingBalancer{ch: make(chan []Address)}
  653. }
  654. func (b *blockingBalancer) Start(target string, config BalancerConfig) error {
  655. return nil
  656. }
  657. func (b *blockingBalancer) Up(addr Address) func(error) {
  658. return nil
  659. }
  660. func (b *blockingBalancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
  661. return Address{}, nil, nil
  662. }
  663. func (b *blockingBalancer) Notify() <-chan []Address {
  664. return b.ch
  665. }
  666. func (b *blockingBalancer) Close() error {
  667. close(b.ch)
  668. return nil
  669. }
  670. func TestDialWithBlockingBalancer(t *testing.T) {
  671. defer leakcheck.Check(t)
  672. ctx, cancel := context.WithCancel(context.Background())
  673. dialDone := make(chan struct{})
  674. go func() {
  675. DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithInsecure(), WithBalancer(newBlockingBalancer()))
  676. close(dialDone)
  677. }()
  678. cancel()
  679. <-dialDone
  680. }
  681. // securePerRPCCredentials always requires transport security.
  682. type securePerRPCCredentials struct{}
  683. func (c securePerRPCCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
  684. return nil, nil
  685. }
  686. func (c securePerRPCCredentials) RequireTransportSecurity() bool {
  687. return true
  688. }
  689. func TestCredentialsMisuse(t *testing.T) {
  690. defer leakcheck.Check(t)
  691. tlsCreds, err := credentials.NewClientTLSFromFile(testdata.Path("ca.pem"), "x.test.youtube.com")
  692. if err != nil {
  693. t.Fatalf("Failed to create authenticator %v", err)
  694. }
  695. // Two conflicting credential configurations
  696. if _, err := Dial("passthrough:///Non-Existent.Server:80", WithTransportCredentials(tlsCreds), WithBlock(), WithInsecure()); err != errCredentialsConflict {
  697. t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errCredentialsConflict)
  698. }
  699. // security info on insecure connection
  700. if _, err := Dial("passthrough:///Non-Existent.Server:80", WithPerRPCCredentials(securePerRPCCredentials{}), WithBlock(), WithInsecure()); err != errTransportCredentialsMissing {
  701. t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errTransportCredentialsMissing)
  702. }
  703. }
  704. func TestWithBackoffConfigDefault(t *testing.T) {
  705. defer leakcheck.Check(t)
  706. testBackoffConfigSet(t, &DefaultBackoffConfig)
  707. }
  708. func TestWithBackoffConfig(t *testing.T) {
  709. defer leakcheck.Check(t)
  710. b := BackoffConfig{MaxDelay: DefaultBackoffConfig.MaxDelay / 2}
  711. expected := b
  712. testBackoffConfigSet(t, &expected, WithBackoffConfig(b))
  713. }
  714. func TestWithBackoffMaxDelay(t *testing.T) {
  715. defer leakcheck.Check(t)
  716. md := DefaultBackoffConfig.MaxDelay / 2
  717. expected := BackoffConfig{MaxDelay: md}
  718. testBackoffConfigSet(t, &expected, WithBackoffMaxDelay(md))
  719. }
  720. func testBackoffConfigSet(t *testing.T, expected *BackoffConfig, opts ...DialOption) {
  721. opts = append(opts, WithInsecure())
  722. conn, err := Dial("passthrough:///foo:80", opts...)
  723. if err != nil {
  724. t.Fatalf("unexpected error dialing connection: %v", err)
  725. }
  726. defer conn.Close()
  727. if conn.dopts.bs == nil {
  728. t.Fatalf("backoff config not set")
  729. }
  730. actual, ok := conn.dopts.bs.(backoff.Exponential)
  731. if !ok {
  732. t.Fatalf("unexpected type of backoff config: %#v", conn.dopts.bs)
  733. }
  734. expectedValue := backoff.Exponential{
  735. MaxDelay: expected.MaxDelay,
  736. }
  737. if actual != expectedValue {
  738. t.Fatalf("unexpected backoff config on connection: %v, want %v", actual, expected)
  739. }
  740. }
  741. // emptyBalancer returns an empty set of servers.
  742. type emptyBalancer struct {
  743. ch chan []Address
  744. }
  745. func newEmptyBalancer() Balancer {
  746. return &emptyBalancer{ch: make(chan []Address, 1)}
  747. }
  748. func (b *emptyBalancer) Start(_ string, _ BalancerConfig) error {
  749. b.ch <- nil
  750. return nil
  751. }
  752. func (b *emptyBalancer) Up(_ Address) func(error) {
  753. return nil
  754. }
  755. func (b *emptyBalancer) Get(_ context.Context, _ BalancerGetOptions) (Address, func(), error) {
  756. return Address{}, nil, nil
  757. }
  758. func (b *emptyBalancer) Notify() <-chan []Address {
  759. return b.ch
  760. }
  761. func (b *emptyBalancer) Close() error {
  762. close(b.ch)
  763. return nil
  764. }
  765. func TestNonblockingDialWithEmptyBalancer(t *testing.T) {
  766. defer leakcheck.Check(t)
  767. ctx, cancel := context.WithCancel(context.Background())
  768. defer cancel()
  769. dialDone := make(chan error)
  770. go func() {
  771. dialDone <- func() error {
  772. conn, err := DialContext(ctx, "Non-Existent.Server:80", WithInsecure(), WithBalancer(newEmptyBalancer()))
  773. if err != nil {
  774. return err
  775. }
  776. return conn.Close()
  777. }()
  778. }()
  779. if err := <-dialDone; err != nil {
  780. t.Fatalf("unexpected error dialing connection: %s", err)
  781. }
  782. }
  783. func TestResolverServiceConfigBeforeAddressNotPanic(t *testing.T) {
  784. defer leakcheck.Check(t)
  785. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  786. defer rcleanup()
  787. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure())
  788. if err != nil {
  789. t.Fatalf("failed to dial: %v", err)
  790. }
  791. defer cc.Close()
  792. // SwitchBalancer before NewAddress. There was no balancer created, this
  793. // makes sure we don't call close on nil balancerWrapper.
  794. r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) // This should not panic.
  795. time.Sleep(time.Second) // Sleep to make sure the service config is handled by ClientConn.
  796. }
  797. func TestResolverServiceConfigWhileClosingNotPanic(t *testing.T) {
  798. defer leakcheck.Check(t)
  799. for i := 0; i < 10; i++ { // Run this multiple times to make sure it doesn't panic.
  800. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  801. defer rcleanup()
  802. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure())
  803. if err != nil {
  804. t.Fatalf("failed to dial: %v", err)
  805. }
  806. // Send a new service config while closing the ClientConn.
  807. go cc.Close()
  808. go r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) // This should not panic.
  809. }
  810. }
  811. func TestResolverEmptyUpdateNotPanic(t *testing.T) {
  812. defer leakcheck.Check(t)
  813. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  814. defer rcleanup()
  815. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure())
  816. if err != nil {
  817. t.Fatalf("failed to dial: %v", err)
  818. }
  819. defer cc.Close()
  820. // This make sure we don't create addrConn with empty address list.
  821. r.NewAddress([]resolver.Address{}) // This should not panic.
  822. time.Sleep(time.Second) // Sleep to make sure the service config is handled by ClientConn.
  823. }
  824. func TestClientUpdatesParamsAfterGoAway(t *testing.T) {
  825. defer leakcheck.Check(t)
  826. lis, err := net.Listen("tcp", "localhost:0")
  827. if err != nil {
  828. t.Fatalf("Failed to listen. Err: %v", err)
  829. }
  830. defer lis.Close()
  831. addr := lis.Addr().String()
  832. s := NewServer()
  833. go s.Serve(lis)
  834. defer s.Stop()
  835. cc, err := Dial(addr, WithBlock(), WithInsecure(), WithKeepaliveParams(keepalive.ClientParameters{
  836. Time: 50 * time.Millisecond,
  837. Timeout: 100 * time.Millisecond,
  838. PermitWithoutStream: true,
  839. }))
  840. if err != nil {
  841. t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
  842. }
  843. defer cc.Close()
  844. time.Sleep(time.Second)
  845. cc.mu.RLock()
  846. defer cc.mu.RUnlock()
  847. v := cc.mkp.Time
  848. if v < 100*time.Millisecond {
  849. t.Fatalf("cc.dopts.copts.Keepalive.Time = %v , want 100ms", v)
  850. }
  851. }
  852. func TestDisableServiceConfigOption(t *testing.T) {
  853. r, cleanup := manual.GenerateAndRegisterManualResolver()
  854. defer cleanup()
  855. addr := r.Scheme() + ":///non.existent"
  856. cc, err := Dial(addr, WithInsecure(), WithDisableServiceConfig())
  857. if err != nil {
  858. t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
  859. }
  860. defer cc.Close()
  861. r.NewServiceConfig(`{
  862. "methodConfig": [
  863. {
  864. "name": [
  865. {
  866. "service": "foo",
  867. "method": "Bar"
  868. }
  869. ],
  870. "waitForReady": true
  871. }
  872. ]
  873. }`)
  874. time.Sleep(time.Second)
  875. m := cc.GetMethodConfig("/foo/Bar")
  876. if m.WaitForReady != nil {
  877. t.Fatalf("want: method (\"/foo/bar/\") config to be empty, got: %v", m)
  878. }
  879. }
  880. func TestGetClientConnTarget(t *testing.T) {
  881. addr := "nonexist:///non.existent"
  882. cc, err := Dial(addr, WithInsecure())
  883. if err != nil {
  884. t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
  885. }
  886. defer cc.Close()
  887. if cc.Target() != addr {
  888. t.Fatalf("Target() = %s, want %s", cc.Target(), addr)
  889. }
  890. }
  // backoffForever is a backoff strategy whose delay is the largest
  // representable time.Duration, regardless of the retry count.
  type backoffForever struct{}

  // Backoff ignores the retry count and returns time.Duration(math.MaxInt64).
  func (backoffForever) Backoff(int) time.Duration {
  	const longestDelay = time.Duration(math.MaxInt64)
  	return longestDelay
  }
  893. func TestResetConnectBackoff(t *testing.T) {
  894. defer leakcheck.Check(t)
  895. dials := make(chan struct{})
  896. defer func() { // If we fail, let the http2client break out of dialing.
  897. select {
  898. case <-dials:
  899. default:
  900. }
  901. }()
  902. dialer := func(string, time.Duration) (net.Conn, error) {
  903. dials <- struct{}{}
  904. return nil, errors.New("failed to fake dial")
  905. }
  906. cc, err := Dial("any", WithInsecure(), WithDialer(dialer), withBackoff(backoffForever{}))
  907. if err != nil {
  908. t.Fatalf("Dial() = _, %v; want _, nil", err)
  909. }
  910. defer cc.Close()
  911. select {
  912. case <-dials:
  913. case <-time.NewTimer(10 * time.Second).C:
  914. t.Fatal("Failed to call dial within 10s")
  915. }
  916. select {
  917. case <-dials:
  918. t.Fatal("Dial called unexpectedly before resetting backoff")
  919. case <-time.NewTimer(100 * time.Millisecond).C:
  920. }
  921. cc.ResetConnectBackoff()
  922. select {
  923. case <-dials:
  924. case <-time.NewTimer(10 * time.Second).C:
  925. t.Fatal("Failed to call dial within 10s after resetting backoff")
  926. }
  927. }
  928. func TestBackoffCancel(t *testing.T) {
  929. defer leakcheck.Check(t)
  930. dialStrCh := make(chan string)
  931. cc, err := Dial("any", WithInsecure(), WithDialer(func(t string, _ time.Duration) (net.Conn, error) {
  932. dialStrCh <- t
  933. return nil, fmt.Errorf("test dialer, always error")
  934. }))
  935. if err != nil {
  936. t.Fatalf("Failed to create ClientConn: %v", err)
  937. }
  938. <-dialStrCh
  939. cc.Close()
  940. // Should not leak. May need -count 5000 to exercise.
  941. }