balancer_test.go 24 KB


  1. /*
  2. *
  3. * Copyright 2016 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "fmt"
  22. "math"
  23. "strconv"
  24. "sync"
  25. "testing"
  26. "time"
  27. "google.golang.org/grpc/codes"
  28. _ "google.golang.org/grpc/grpclog/glogger"
  29. "google.golang.org/grpc/internal/leakcheck"
  30. "google.golang.org/grpc/naming"
  31. "google.golang.org/grpc/status"
  32. // V1 balancer tests use passthrough resolver instead of dns.
  33. // TODO(bar) remove this when removing v1 balaner entirely.
  34. _ "google.golang.org/grpc/resolver/passthrough"
  35. )
  36. func pickFirstBalancerV1(r naming.Resolver) Balancer {
  37. return &pickFirst{&roundRobin{r: r}}
  38. }
  39. type testWatcher struct {
  40. // the channel to receives name resolution updates
  41. update chan *naming.Update
  42. // the side channel to get to know how many updates in a batch
  43. side chan int
  44. // the channel to notify update injector that the update reading is done
  45. readDone chan int
  46. }
  47. func (w *testWatcher) Next() (updates []*naming.Update, err error) {
  48. n := <-w.side
  49. if n == 0 {
  50. return nil, fmt.Errorf("w.side is closed")
  51. }
  52. for i := 0; i < n; i++ {
  53. u := <-w.update
  54. if u != nil {
  55. updates = append(updates, u)
  56. }
  57. }
  58. w.readDone <- 0
  59. return
  60. }
  61. func (w *testWatcher) Close() {
  62. close(w.side)
  63. }
  64. // Inject naming resolution updates to the testWatcher.
  65. func (w *testWatcher) inject(updates []*naming.Update) {
  66. w.side <- len(updates)
  67. for _, u := range updates {
  68. w.update <- u
  69. }
  70. <-w.readDone
  71. }
  72. type testNameResolver struct {
  73. w *testWatcher
  74. addr string
  75. }
  76. func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
  77. r.w = &testWatcher{
  78. update: make(chan *naming.Update, 1),
  79. side: make(chan int, 1),
  80. readDone: make(chan int),
  81. }
  82. r.w.side <- 1
  83. r.w.update <- &naming.Update{
  84. Op: naming.Add,
  85. Addr: r.addr,
  86. }
  87. go func() {
  88. <-r.w.readDone
  89. }()
  90. return r.w, nil
  91. }
  92. func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, *testNameResolver, func()) {
  93. var servers []*server
  94. for i := 0; i < numServers; i++ {
  95. s := newTestServer()
  96. servers = append(servers, s)
  97. go s.start(t, 0, maxStreams)
  98. s.wait(t, 2*time.Second)
  99. }
  100. // Point to server[0]
  101. addr := "localhost:" + servers[0].port
  102. return servers, &testNameResolver{
  103. addr: addr,
  104. }, func() {
  105. for i := 0; i < numServers; i++ {
  106. servers[i].stop()
  107. }
  108. }
  109. }
  110. func TestNameDiscovery(t *testing.T) {
  111. defer leakcheck.Check(t)
  112. // Start 2 servers on 2 ports.
  113. numServers := 2
  114. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  115. defer cleanup()
  116. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  117. if err != nil {
  118. t.Fatalf("Failed to create ClientConn: %v", err)
  119. }
  120. defer cc.Close()
  121. req := "port"
  122. var reply string
  123. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
  124. t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
  125. }
  126. // Inject the name resolution change to remove servers[0] and add servers[1].
  127. var updates []*naming.Update
  128. updates = append(updates, &naming.Update{
  129. Op: naming.Delete,
  130. Addr: "localhost:" + servers[0].port,
  131. })
  132. updates = append(updates, &naming.Update{
  133. Op: naming.Add,
  134. Addr: "localhost:" + servers[1].port,
  135. })
  136. r.w.inject(updates)
  137. // Loop until the rpcs in flight talks to servers[1].
  138. for {
  139. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  140. break
  141. }
  142. time.Sleep(10 * time.Millisecond)
  143. }
  144. }
  145. func TestEmptyAddrs(t *testing.T) {
  146. defer leakcheck.Check(t)
  147. servers, r, cleanup := startServers(t, 1, math.MaxUint32)
  148. defer cleanup()
  149. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  150. if err != nil {
  151. t.Fatalf("Failed to create ClientConn: %v", err)
  152. }
  153. defer cc.Close()
  154. var reply string
  155. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
  156. t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
  157. }
  158. // Inject name resolution change to remove the server so that there is no address
  159. // available after that.
  160. u := &naming.Update{
  161. Op: naming.Delete,
  162. Addr: "localhost:" + servers[0].port,
  163. }
  164. r.w.inject([]*naming.Update{u})
  165. // Loop until the above updates apply.
  166. for {
  167. time.Sleep(10 * time.Millisecond)
  168. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
  169. if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil {
  170. cancel()
  171. break
  172. }
  173. cancel()
  174. }
  175. }
  176. func TestRoundRobin(t *testing.T) {
  177. defer leakcheck.Check(t)
  178. // Start 3 servers on 3 ports.
  179. numServers := 3
  180. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  181. defer cleanup()
  182. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  183. if err != nil {
  184. t.Fatalf("Failed to create ClientConn: %v", err)
  185. }
  186. defer cc.Close()
  187. // Add servers[1] to the service discovery.
  188. u := &naming.Update{
  189. Op: naming.Add,
  190. Addr: "localhost:" + servers[1].port,
  191. }
  192. r.w.inject([]*naming.Update{u})
  193. req := "port"
  194. var reply string
  195. // Loop until servers[1] is up
  196. for {
  197. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  198. break
  199. }
  200. time.Sleep(10 * time.Millisecond)
  201. }
  202. // Add server2[2] to the service discovery.
  203. u = &naming.Update{
  204. Op: naming.Add,
  205. Addr: "localhost:" + servers[2].port,
  206. }
  207. r.w.inject([]*naming.Update{u})
  208. // Loop until both servers[2] are up.
  209. for {
  210. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
  211. break
  212. }
  213. time.Sleep(10 * time.Millisecond)
  214. }
  215. // Check the incoming RPCs served in a round-robin manner.
  216. for i := 0; i < 10; i++ {
  217. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[i%numServers].port {
  218. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", i, err, servers[i%numServers].port)
  219. }
  220. }
  221. }
  222. func TestCloseWithPendingRPC(t *testing.T) {
  223. defer leakcheck.Check(t)
  224. servers, r, cleanup := startServers(t, 1, math.MaxUint32)
  225. defer cleanup()
  226. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  227. if err != nil {
  228. t.Fatalf("Failed to create ClientConn: %v", err)
  229. }
  230. defer cc.Close()
  231. var reply string
  232. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
  233. t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
  234. }
  235. // Remove the server.
  236. updates := []*naming.Update{{
  237. Op: naming.Delete,
  238. Addr: "localhost:" + servers[0].port,
  239. }}
  240. r.w.inject(updates)
  241. // Loop until the above update applies.
  242. for {
  243. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
  244. if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
  245. cancel()
  246. break
  247. }
  248. time.Sleep(10 * time.Millisecond)
  249. cancel()
  250. }
  251. // Issue 2 RPCs which should be completed with error status once cc is closed.
  252. var wg sync.WaitGroup
  253. wg.Add(2)
  254. go func() {
  255. defer wg.Done()
  256. var reply string
  257. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
  258. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
  259. }
  260. }()
  261. go func() {
  262. defer wg.Done()
  263. var reply string
  264. time.Sleep(5 * time.Millisecond)
  265. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
  266. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
  267. }
  268. }()
  269. time.Sleep(5 * time.Millisecond)
  270. cc.Close()
  271. wg.Wait()
  272. }
  273. func TestGetOnWaitChannel(t *testing.T) {
  274. defer leakcheck.Check(t)
  275. servers, r, cleanup := startServers(t, 1, math.MaxUint32)
  276. defer cleanup()
  277. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  278. if err != nil {
  279. t.Fatalf("Failed to create ClientConn: %v", err)
  280. }
  281. defer cc.Close()
  282. // Remove all servers so that all upcoming RPCs will block on waitCh.
  283. updates := []*naming.Update{{
  284. Op: naming.Delete,
  285. Addr: "localhost:" + servers[0].port,
  286. }}
  287. r.w.inject(updates)
  288. for {
  289. var reply string
  290. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
  291. if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
  292. cancel()
  293. break
  294. }
  295. cancel()
  296. time.Sleep(10 * time.Millisecond)
  297. }
  298. var wg sync.WaitGroup
  299. wg.Add(1)
  300. go func() {
  301. defer wg.Done()
  302. var reply string
  303. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
  304. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
  305. }
  306. }()
  307. // Add a connected server to get the above RPC through.
  308. updates = []*naming.Update{{
  309. Op: naming.Add,
  310. Addr: "localhost:" + servers[0].port,
  311. }}
  312. r.w.inject(updates)
  313. // Wait until the above RPC succeeds.
  314. wg.Wait()
  315. }
  316. func TestOneServerDown(t *testing.T) {
  317. defer leakcheck.Check(t)
  318. // Start 2 servers.
  319. numServers := 2
  320. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  321. defer cleanup()
  322. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake())
  323. if err != nil {
  324. t.Fatalf("Failed to create ClientConn: %v", err)
  325. }
  326. defer cc.Close()
  327. // Add servers[1] to the service discovery.
  328. var updates []*naming.Update
  329. updates = append(updates, &naming.Update{
  330. Op: naming.Add,
  331. Addr: "localhost:" + servers[1].port,
  332. })
  333. r.w.inject(updates)
  334. req := "port"
  335. var reply string
  336. // Loop until servers[1] is up
  337. for {
  338. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  339. break
  340. }
  341. time.Sleep(10 * time.Millisecond)
  342. }
  343. var wg sync.WaitGroup
  344. numRPC := 100
  345. sleepDuration := 10 * time.Millisecond
  346. wg.Add(1)
  347. go func() {
  348. time.Sleep(sleepDuration)
  349. // After sleepDuration, kill server[0].
  350. servers[0].stop()
  351. wg.Done()
  352. }()
  353. // All non-failfast RPCs should not block because there's at least one connection available.
  354. for i := 0; i < numRPC; i++ {
  355. wg.Add(1)
  356. go func() {
  357. time.Sleep(sleepDuration)
  358. // After sleepDuration, invoke RPC.
  359. // server[0] is killed around the same time to make it racy between balancer and gRPC internals.
  360. cc.Invoke(context.Background(), "/foo/bar", &req, &reply, FailFast(false))
  361. wg.Done()
  362. }()
  363. }
  364. wg.Wait()
  365. }
  366. func TestOneAddressRemoval(t *testing.T) {
  367. defer leakcheck.Check(t)
  368. // Start 2 servers.
  369. numServers := 2
  370. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  371. defer cleanup()
  372. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  373. if err != nil {
  374. t.Fatalf("Failed to create ClientConn: %v", err)
  375. }
  376. defer cc.Close()
  377. // Add servers[1] to the service discovery.
  378. var updates []*naming.Update
  379. updates = append(updates, &naming.Update{
  380. Op: naming.Add,
  381. Addr: "localhost:" + servers[1].port,
  382. })
  383. r.w.inject(updates)
  384. req := "port"
  385. var reply string
  386. // Loop until servers[1] is up
  387. for {
  388. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  389. break
  390. }
  391. time.Sleep(10 * time.Millisecond)
  392. }
  393. var wg sync.WaitGroup
  394. numRPC := 100
  395. sleepDuration := 10 * time.Millisecond
  396. wg.Add(1)
  397. go func() {
  398. time.Sleep(sleepDuration)
  399. // After sleepDuration, delete server[0].
  400. var updates []*naming.Update
  401. updates = append(updates, &naming.Update{
  402. Op: naming.Delete,
  403. Addr: "localhost:" + servers[0].port,
  404. })
  405. r.w.inject(updates)
  406. wg.Done()
  407. }()
  408. // All non-failfast RPCs should not fail because there's at least one connection available.
  409. for i := 0; i < numRPC; i++ {
  410. wg.Add(1)
  411. go func() {
  412. var reply string
  413. time.Sleep(sleepDuration)
  414. // After sleepDuration, invoke RPC.
  415. // server[0] is removed around the same time to make it racy between balancer and gRPC internals.
  416. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
  417. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
  418. }
  419. wg.Done()
  420. }()
  421. }
  422. wg.Wait()
  423. }
  424. func checkServerUp(t *testing.T, currentServer *server) {
  425. req := "port"
  426. port := currentServer.port
  427. cc, err := Dial("passthrough:///localhost:"+port, WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  428. if err != nil {
  429. t.Fatalf("Failed to create ClientConn: %v", err)
  430. }
  431. defer cc.Close()
  432. var reply string
  433. for {
  434. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == port {
  435. break
  436. }
  437. time.Sleep(10 * time.Millisecond)
  438. }
  439. }
  440. func TestPickFirstEmptyAddrs(t *testing.T) {
  441. defer leakcheck.Check(t)
  442. servers, r, cleanup := startServers(t, 1, math.MaxUint32)
  443. defer cleanup()
  444. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  445. if err != nil {
  446. t.Fatalf("Failed to create ClientConn: %v", err)
  447. }
  448. defer cc.Close()
  449. var reply string
  450. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
  451. t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
  452. }
  453. // Inject name resolution change to remove the server so that there is no address
  454. // available after that.
  455. u := &naming.Update{
  456. Op: naming.Delete,
  457. Addr: "localhost:" + servers[0].port,
  458. }
  459. r.w.inject([]*naming.Update{u})
  460. // Loop until the above updates apply.
  461. for {
  462. time.Sleep(10 * time.Millisecond)
  463. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
  464. if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil {
  465. cancel()
  466. break
  467. }
  468. cancel()
  469. }
  470. }
  471. func TestPickFirstCloseWithPendingRPC(t *testing.T) {
  472. defer leakcheck.Check(t)
  473. servers, r, cleanup := startServers(t, 1, math.MaxUint32)
  474. defer cleanup()
  475. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  476. if err != nil {
  477. t.Fatalf("Failed to create ClientConn: %v", err)
  478. }
  479. defer cc.Close()
  480. var reply string
  481. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
  482. t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
  483. }
  484. // Remove the server.
  485. updates := []*naming.Update{{
  486. Op: naming.Delete,
  487. Addr: "localhost:" + servers[0].port,
  488. }}
  489. r.w.inject(updates)
  490. // Loop until the above update applies.
  491. for {
  492. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
  493. if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
  494. cancel()
  495. break
  496. }
  497. time.Sleep(10 * time.Millisecond)
  498. cancel()
  499. }
  500. // Issue 2 RPCs which should be completed with error status once cc is closed.
  501. var wg sync.WaitGroup
  502. wg.Add(2)
  503. go func() {
  504. defer wg.Done()
  505. var reply string
  506. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
  507. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
  508. }
  509. }()
  510. go func() {
  511. defer wg.Done()
  512. var reply string
  513. time.Sleep(5 * time.Millisecond)
  514. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
  515. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
  516. }
  517. }()
  518. time.Sleep(5 * time.Millisecond)
  519. cc.Close()
  520. wg.Wait()
  521. }
  522. func TestPickFirstOrderAllServerUp(t *testing.T) {
  523. defer leakcheck.Check(t)
  524. // Start 3 servers on 3 ports.
  525. numServers := 3
  526. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  527. defer cleanup()
  528. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  529. if err != nil {
  530. t.Fatalf("Failed to create ClientConn: %v", err)
  531. }
  532. defer cc.Close()
  533. // Add servers[1] and [2] to the service discovery.
  534. u := &naming.Update{
  535. Op: naming.Add,
  536. Addr: "localhost:" + servers[1].port,
  537. }
  538. r.w.inject([]*naming.Update{u})
  539. u = &naming.Update{
  540. Op: naming.Add,
  541. Addr: "localhost:" + servers[2].port,
  542. }
  543. r.w.inject([]*naming.Update{u})
  544. // Loop until all 3 servers are up
  545. checkServerUp(t, servers[0])
  546. checkServerUp(t, servers[1])
  547. checkServerUp(t, servers[2])
  548. // Check the incoming RPCs served in server[0]
  549. req := "port"
  550. var reply string
  551. for i := 0; i < 20; i++ {
  552. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
  553. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
  554. }
  555. time.Sleep(10 * time.Millisecond)
  556. }
  557. // Delete server[0] in the balancer, the incoming RPCs served in server[1]
  558. // For test addrconn, close server[0] instead
  559. u = &naming.Update{
  560. Op: naming.Delete,
  561. Addr: "localhost:" + servers[0].port,
  562. }
  563. r.w.inject([]*naming.Update{u})
  564. // Loop until it changes to server[1]
  565. for {
  566. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  567. break
  568. }
  569. time.Sleep(10 * time.Millisecond)
  570. }
  571. for i := 0; i < 20; i++ {
  572. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
  573. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
  574. }
  575. time.Sleep(10 * time.Millisecond)
  576. }
  577. // Add server[0] back to the balancer, the incoming RPCs served in server[1]
  578. // Add is append operation, the order of Notify now is {server[1].port server[2].port server[0].port}
  579. u = &naming.Update{
  580. Op: naming.Add,
  581. Addr: "localhost:" + servers[0].port,
  582. }
  583. r.w.inject([]*naming.Update{u})
  584. for i := 0; i < 20; i++ {
  585. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
  586. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
  587. }
  588. time.Sleep(10 * time.Millisecond)
  589. }
  590. // Delete server[1] in the balancer, the incoming RPCs served in server[2]
  591. u = &naming.Update{
  592. Op: naming.Delete,
  593. Addr: "localhost:" + servers[1].port,
  594. }
  595. r.w.inject([]*naming.Update{u})
  596. for {
  597. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
  598. break
  599. }
  600. time.Sleep(1 * time.Second)
  601. }
  602. for i := 0; i < 20; i++ {
  603. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[2].port {
  604. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
  605. }
  606. time.Sleep(10 * time.Millisecond)
  607. }
  608. // Delete server[2] in the balancer, the incoming RPCs served in server[0]
  609. u = &naming.Update{
  610. Op: naming.Delete,
  611. Addr: "localhost:" + servers[2].port,
  612. }
  613. r.w.inject([]*naming.Update{u})
  614. for {
  615. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
  616. break
  617. }
  618. time.Sleep(1 * time.Second)
  619. }
  620. for i := 0; i < 20; i++ {
  621. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
  622. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
  623. }
  624. time.Sleep(10 * time.Millisecond)
  625. }
  626. }
  627. func TestPickFirstOrderOneServerDown(t *testing.T) {
  628. defer leakcheck.Check(t)
  629. // Start 3 servers on 3 ports.
  630. numServers := 3
  631. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  632. defer cleanup()
  633. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake())
  634. if err != nil {
  635. t.Fatalf("Failed to create ClientConn: %v", err)
  636. }
  637. defer cc.Close()
  638. // Add servers[1] and [2] to the service discovery.
  639. u := &naming.Update{
  640. Op: naming.Add,
  641. Addr: "localhost:" + servers[1].port,
  642. }
  643. r.w.inject([]*naming.Update{u})
  644. u = &naming.Update{
  645. Op: naming.Add,
  646. Addr: "localhost:" + servers[2].port,
  647. }
  648. r.w.inject([]*naming.Update{u})
  649. // Loop until all 3 servers are up
  650. checkServerUp(t, servers[0])
  651. checkServerUp(t, servers[1])
  652. checkServerUp(t, servers[2])
  653. // Check the incoming RPCs served in server[0]
  654. req := "port"
  655. var reply string
  656. for i := 0; i < 20; i++ {
  657. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
  658. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
  659. }
  660. time.Sleep(10 * time.Millisecond)
  661. }
  662. // server[0] down, incoming RPCs served in server[1], but the order of Notify still remains
  663. // {server[0] server[1] server[2]}
  664. servers[0].stop()
  665. // Loop until it changes to server[1]
  666. for {
  667. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  668. break
  669. }
  670. time.Sleep(10 * time.Millisecond)
  671. }
  672. for i := 0; i < 20; i++ {
  673. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
  674. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
  675. }
  676. time.Sleep(10 * time.Millisecond)
  677. }
  678. // up the server[0] back, the incoming RPCs served in server[1]
  679. p, _ := strconv.Atoi(servers[0].port)
  680. servers[0] = newTestServer()
  681. go servers[0].start(t, p, math.MaxUint32)
  682. defer servers[0].stop()
  683. servers[0].wait(t, 2*time.Second)
  684. checkServerUp(t, servers[0])
  685. for i := 0; i < 20; i++ {
  686. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
  687. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
  688. }
  689. time.Sleep(10 * time.Millisecond)
  690. }
  691. // Delete server[1] in the balancer, the incoming RPCs served in server[0]
  692. u = &naming.Update{
  693. Op: naming.Delete,
  694. Addr: "localhost:" + servers[1].port,
  695. }
  696. r.w.inject([]*naming.Update{u})
  697. for {
  698. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
  699. break
  700. }
  701. time.Sleep(1 * time.Second)
  702. }
  703. for i := 0; i < 20; i++ {
  704. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
  705. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
  706. }
  707. time.Sleep(10 * time.Millisecond)
  708. }
  709. }
  710. func TestPickFirstOneAddressRemoval(t *testing.T) {
  711. defer leakcheck.Check(t)
  712. // Start 2 servers.
  713. numServers := 2
  714. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  715. defer cleanup()
  716. cc, err := Dial("passthrough:///localhost:"+servers[0].port, WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  717. if err != nil {
  718. t.Fatalf("Failed to create ClientConn: %v", err)
  719. }
  720. defer cc.Close()
  721. // Add servers[1] to the service discovery.
  722. var updates []*naming.Update
  723. updates = append(updates, &naming.Update{
  724. Op: naming.Add,
  725. Addr: "localhost:" + servers[1].port,
  726. })
  727. r.w.inject(updates)
  728. // Create a new cc to Loop until servers[1] is up
  729. checkServerUp(t, servers[0])
  730. checkServerUp(t, servers[1])
  731. var wg sync.WaitGroup
  732. numRPC := 100
  733. sleepDuration := 10 * time.Millisecond
  734. wg.Add(1)
  735. go func() {
  736. time.Sleep(sleepDuration)
  737. // After sleepDuration, delete server[0].
  738. var updates []*naming.Update
  739. updates = append(updates, &naming.Update{
  740. Op: naming.Delete,
  741. Addr: "localhost:" + servers[0].port,
  742. })
  743. r.w.inject(updates)
  744. wg.Done()
  745. }()
  746. // All non-failfast RPCs should not fail because there's at least one connection available.
  747. for i := 0; i < numRPC; i++ {
  748. wg.Add(1)
  749. go func() {
  750. var reply string
  751. time.Sleep(sleepDuration)
  752. // After sleepDuration, invoke RPC.
  753. // server[0] is removed around the same time to make it racy between balancer and gRPC internals.
  754. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
  755. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
  756. }
  757. wg.Done()
  758. }()
  759. }
  760. wg.Wait()
  761. }