clientconn_state_transition_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "net"
  22. "sync"
  23. "sync/atomic"
  24. "testing"
  25. "time"
  26. "golang.org/x/net/http2"
  27. "google.golang.org/grpc/balancer"
  28. "google.golang.org/grpc/connectivity"
  29. "google.golang.org/grpc/internal/leakcheck"
  30. "google.golang.org/grpc/internal/testutils"
  31. "google.golang.org/grpc/resolver"
  32. "google.golang.org/grpc/resolver/manual"
  33. )
  34. const stateRecordingBalancerName = "state_recoding_balancer"
  35. var testBalancer = &stateRecordingBalancer{}
  36. func init() {
  37. balancer.Register(testBalancer)
  38. }
  39. // These tests use a pipeListener. This listener is similar to net.Listener except that it is unbuffered, so each read
  40. // and write will wait for the other side's corresponding write or read.
  41. func TestStateTransitions_SingleAddress(t *testing.T) {
  42. defer leakcheck.Check(t)
  43. mctBkp := getMinConnectTimeout()
  44. defer func() {
  45. atomic.StoreInt64((*int64)(&mutableMinConnectTimeout), int64(mctBkp))
  46. }()
  47. atomic.StoreInt64((*int64)(&mutableMinConnectTimeout), int64(time.Millisecond)*100)
  48. for _, test := range []struct {
  49. desc string
  50. want []connectivity.State
  51. server func(net.Listener) net.Conn
  52. }{
  53. {
  54. desc: "When the server returns server preface, the client enters READY.",
  55. want: []connectivity.State{
  56. connectivity.Connecting,
  57. connectivity.Ready,
  58. },
  59. server: func(lis net.Listener) net.Conn {
  60. conn, err := lis.Accept()
  61. if err != nil {
  62. t.Error(err)
  63. return nil
  64. }
  65. go keepReading(conn)
  66. framer := http2.NewFramer(conn, conn)
  67. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  68. t.Errorf("Error while writing settings frame. %v", err)
  69. return nil
  70. }
  71. return conn
  72. },
  73. },
  74. {
  75. desc: "When the connection is closed, the client enters TRANSIENT FAILURE.",
  76. want: []connectivity.State{
  77. connectivity.Connecting,
  78. connectivity.TransientFailure,
  79. },
  80. server: func(lis net.Listener) net.Conn {
  81. conn, err := lis.Accept()
  82. if err != nil {
  83. t.Error(err)
  84. return nil
  85. }
  86. conn.Close()
  87. return nil
  88. },
  89. },
  90. {
  91. desc: `When the server sends its connection preface, but the connection dies before the client can write its
  92. connection preface, the client enters TRANSIENT FAILURE.`,
  93. want: []connectivity.State{
  94. connectivity.Connecting,
  95. connectivity.TransientFailure,
  96. },
  97. server: func(lis net.Listener) net.Conn {
  98. conn, err := lis.Accept()
  99. if err != nil {
  100. t.Error(err)
  101. return nil
  102. }
  103. framer := http2.NewFramer(conn, conn)
  104. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  105. t.Errorf("Error while writing settings frame. %v", err)
  106. return nil
  107. }
  108. conn.Close()
  109. return nil
  110. },
  111. },
  112. {
  113. desc: `When the server reads the client connection preface but does not send its connection preface, the
  114. client enters TRANSIENT FAILURE.`,
  115. want: []connectivity.State{
  116. connectivity.Connecting,
  117. connectivity.TransientFailure,
  118. },
  119. server: func(lis net.Listener) net.Conn {
  120. conn, err := lis.Accept()
  121. if err != nil {
  122. t.Error(err)
  123. return nil
  124. }
  125. go keepReading(conn)
  126. return conn
  127. },
  128. },
  129. } {
  130. t.Log(test.desc)
  131. testStateTransitionSingleAddress(t, test.want, test.server)
  132. }
  133. }
  134. func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) {
  135. defer leakcheck.Check(t)
  136. stateNotifications := make(chan connectivity.State, len(want))
  137. testBalancer.ResetNotifier(stateNotifications)
  138. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  139. defer cancel()
  140. pl := testutils.NewPipeListener()
  141. defer pl.Close()
  142. // Launch the server.
  143. var conn net.Conn
  144. var connMu sync.Mutex
  145. go func() {
  146. connMu.Lock()
  147. conn = server(pl)
  148. connMu.Unlock()
  149. }()
  150. client, err := DialContext(ctx, "", WithWaitForHandshake(), WithInsecure(),
  151. WithBalancerName(stateRecordingBalancerName), WithDialer(pl.Dialer()), withBackoff(noBackoff{}))
  152. if err != nil {
  153. t.Fatal(err)
  154. }
  155. defer client.Close()
  156. timeout := time.After(5 * time.Second)
  157. for i := 0; i < len(want); i++ {
  158. select {
  159. case <-timeout:
  160. t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
  161. case seen := <-stateNotifications:
  162. if seen != want[i] {
  163. t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
  164. }
  165. }
  166. }
  167. connMu.Lock()
  168. defer connMu.Unlock()
  169. if conn != nil {
  170. err = conn.Close()
  171. if err != nil {
  172. t.Fatal(err)
  173. }
  174. }
  175. }
  176. // When a READY connection is closed, the client enters TRANSIENT FAILURE before CONNECTING.
  177. func TestStateTransition_ReadyToTransientFailure(t *testing.T) {
  178. defer leakcheck.Check(t)
  179. want := []connectivity.State{
  180. connectivity.Connecting,
  181. connectivity.Ready,
  182. connectivity.TransientFailure,
  183. connectivity.Connecting,
  184. }
  185. stateNotifications := make(chan connectivity.State, len(want))
  186. testBalancer.ResetNotifier(stateNotifications)
  187. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  188. defer cancel()
  189. lis, err := net.Listen("tcp", "localhost:0")
  190. if err != nil {
  191. t.Fatalf("Error while listening. Err: %v", err)
  192. }
  193. defer lis.Close()
  194. sawReady := make(chan struct{})
  195. // Launch the server.
  196. go func() {
  197. conn, err := lis.Accept()
  198. if err != nil {
  199. t.Error(err)
  200. return
  201. }
  202. go keepReading(conn)
  203. framer := http2.NewFramer(conn, conn)
  204. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  205. t.Errorf("Error while writing settings frame. %v", err)
  206. return
  207. }
  208. // Prevents race between onPrefaceReceipt and onClose.
  209. <-sawReady
  210. conn.Close()
  211. }()
  212. client, err := DialContext(ctx, lis.Addr().String(), WithWaitForHandshake(), WithInsecure(), WithBalancerName(stateRecordingBalancerName))
  213. if err != nil {
  214. t.Fatal(err)
  215. }
  216. defer client.Close()
  217. timeout := time.After(5 * time.Second)
  218. for i := 0; i < len(want); i++ {
  219. select {
  220. case <-timeout:
  221. t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
  222. case seen := <-stateNotifications:
  223. if seen == connectivity.Ready {
  224. close(sawReady)
  225. }
  226. if seen != want[i] {
  227. t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
  228. }
  229. }
  230. }
  231. }
  232. // When the first connection is closed, the client enters stays in CONNECTING until it tries the second
  233. // address (which succeeds, and then it enters READY).
  234. func TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {
  235. defer leakcheck.Check(t)
  236. want := []connectivity.State{
  237. connectivity.Connecting,
  238. connectivity.Ready,
  239. }
  240. stateNotifications := make(chan connectivity.State, len(want))
  241. testBalancer.ResetNotifier(stateNotifications)
  242. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  243. defer cancel()
  244. lis1, err := net.Listen("tcp", "localhost:0")
  245. if err != nil {
  246. t.Fatalf("Error while listening. Err: %v", err)
  247. }
  248. defer lis1.Close()
  249. lis2, err := net.Listen("tcp", "localhost:0")
  250. if err != nil {
  251. t.Fatalf("Error while listening. Err: %v", err)
  252. }
  253. defer lis2.Close()
  254. server1Done := make(chan struct{})
  255. server2Done := make(chan struct{})
  256. // Launch server 1.
  257. go func() {
  258. conn, err := lis1.Accept()
  259. if err != nil {
  260. t.Error(err)
  261. return
  262. }
  263. conn.Close()
  264. close(server1Done)
  265. }()
  266. // Launch server 2.
  267. go func() {
  268. conn, err := lis2.Accept()
  269. if err != nil {
  270. t.Error(err)
  271. return
  272. }
  273. go keepReading(conn)
  274. framer := http2.NewFramer(conn, conn)
  275. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  276. t.Errorf("Error while writing settings frame. %v", err)
  277. return
  278. }
  279. close(server2Done)
  280. }()
  281. rb := manual.NewBuilderWithScheme("whatever")
  282. rb.InitialAddrs([]resolver.Address{
  283. {Addr: lis1.Addr().String()},
  284. {Addr: lis2.Addr().String()},
  285. })
  286. client, err := DialContext(ctx, "this-gets-overwritten", WithInsecure(), WithWaitForHandshake(), WithBalancerName(stateRecordingBalancerName), withResolverBuilder(rb))
  287. if err != nil {
  288. t.Fatal(err)
  289. }
  290. defer client.Close()
  291. timeout := time.After(5 * time.Second)
  292. for i := 0; i < len(want); i++ {
  293. select {
  294. case <-timeout:
  295. t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
  296. case seen := <-stateNotifications:
  297. if seen != want[i] {
  298. t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
  299. }
  300. }
  301. }
  302. select {
  303. case <-timeout:
  304. t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
  305. case <-server1Done:
  306. }
  307. select {
  308. case <-timeout:
  309. t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 2")
  310. case <-server2Done:
  311. }
  312. }
  313. // When there are multiple addresses, and we enter READY on one of them, a later closure should cause
  314. // the client to enter TRANSIENT FAILURE before it re-enters CONNECTING.
  315. func TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
  316. defer leakcheck.Check(t)
  317. want := []connectivity.State{
  318. connectivity.Connecting,
  319. connectivity.Ready,
  320. connectivity.TransientFailure,
  321. connectivity.Connecting,
  322. }
  323. stateNotifications := make(chan connectivity.State, len(want))
  324. testBalancer.ResetNotifier(stateNotifications)
  325. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  326. defer cancel()
  327. lis1, err := net.Listen("tcp", "localhost:0")
  328. if err != nil {
  329. t.Fatalf("Error while listening. Err: %v", err)
  330. }
  331. defer lis1.Close()
  332. // Never actually gets used; we just want it to be alive so that the resolver has two addresses to target.
  333. lis2, err := net.Listen("tcp", "localhost:0")
  334. if err != nil {
  335. t.Fatalf("Error while listening. Err: %v", err)
  336. }
  337. defer lis2.Close()
  338. server1Done := make(chan struct{})
  339. sawReady := make(chan struct{})
  340. // Launch server 1.
  341. go func() {
  342. conn, err := lis1.Accept()
  343. if err != nil {
  344. t.Error(err)
  345. return
  346. }
  347. go keepReading(conn)
  348. framer := http2.NewFramer(conn, conn)
  349. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  350. t.Errorf("Error while writing settings frame. %v", err)
  351. return
  352. }
  353. <-sawReady
  354. conn.Close()
  355. _, err = lis1.Accept()
  356. if err != nil {
  357. t.Error(err)
  358. return
  359. }
  360. close(server1Done)
  361. }()
  362. rb := manual.NewBuilderWithScheme("whatever")
  363. rb.InitialAddrs([]resolver.Address{
  364. {Addr: lis1.Addr().String()},
  365. {Addr: lis2.Addr().String()},
  366. })
  367. client, err := DialContext(ctx, "this-gets-overwritten", WithInsecure(), WithWaitForHandshake(), WithBalancerName(stateRecordingBalancerName), withResolverBuilder(rb))
  368. if err != nil {
  369. t.Fatal(err)
  370. }
  371. defer client.Close()
  372. timeout := time.After(2 * time.Second)
  373. for i := 0; i < len(want); i++ {
  374. select {
  375. case <-timeout:
  376. t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
  377. case seen := <-stateNotifications:
  378. if seen == connectivity.Ready {
  379. close(sawReady)
  380. }
  381. if seen != want[i] {
  382. t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
  383. }
  384. }
  385. }
  386. select {
  387. case <-timeout:
  388. t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
  389. case <-server1Done:
  390. }
  391. }
  392. type stateRecordingBalancer struct {
  393. mu sync.Mutex
  394. notifier chan<- connectivity.State
  395. balancer.Balancer
  396. }
  397. func (b *stateRecordingBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
  398. b.mu.Lock()
  399. b.notifier <- s
  400. b.mu.Unlock()
  401. b.Balancer.HandleSubConnStateChange(sc, s)
  402. }
  403. func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) {
  404. b.mu.Lock()
  405. defer b.mu.Unlock()
  406. b.notifier = r
  407. }
  408. func (b *stateRecordingBalancer) Close() {
  409. b.mu.Lock()
  410. u := b.Balancer
  411. b.mu.Unlock()
  412. u.Close()
  413. }
  414. func (b *stateRecordingBalancer) Name() string {
  415. return stateRecordingBalancerName
  416. }
  417. func (b *stateRecordingBalancer) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
  418. b.mu.Lock()
  419. b.Balancer = balancer.Get(PickFirstBalancerName).Build(cc, opts)
  420. b.mu.Unlock()
  421. return b
  422. }
  // noBackoff is a backoff strategy that never waits between connection attempts.
type noBackoff struct{}

// Backoff returns a zero delay for every retry count.
func (noBackoff) Backoff(int) time.Duration { return 0 }
  // keepReading drains conn, discarding the data, until a Read returns an error
// (EOF, the peer closing, the connection being closed locally, ...). Tests use
// it to keep a server-side connection healthy, since the client will error if
// things like its connection preface are not consumed in a timely fashion.
func keepReading(conn net.Conn) {
	var scratch [1024]byte
	for {
		if _, err := conn.Read(scratch[:]); err != nil {
			return
		}
	}
}