roundrobin_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package roundrobin_test
  19. import (
  20. "context"
  21. "fmt"
  22. "net"
  23. "sync"
  24. "testing"
  25. "time"
  26. "google.golang.org/grpc"
  27. "google.golang.org/grpc/balancer/roundrobin"
  28. "google.golang.org/grpc/codes"
  29. _ "google.golang.org/grpc/grpclog/glogger"
  30. "google.golang.org/grpc/internal/leakcheck"
  31. "google.golang.org/grpc/peer"
  32. "google.golang.org/grpc/resolver"
  33. "google.golang.org/grpc/resolver/manual"
  34. "google.golang.org/grpc/status"
  35. testpb "google.golang.org/grpc/test/grpc_testing"
  36. )
  37. type testServer struct {
  38. testpb.TestServiceServer
  39. }
  40. func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
  41. return &testpb.Empty{}, nil
  42. }
  43. func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
  44. return nil
  45. }
  46. type test struct {
  47. servers []*grpc.Server
  48. addresses []string
  49. }
  50. func (t *test) cleanup() {
  51. for _, s := range t.servers {
  52. s.Stop()
  53. }
  54. }
  55. func startTestServers(count int) (_ *test, err error) {
  56. t := &test{}
  57. defer func() {
  58. if err != nil {
  59. for _, s := range t.servers {
  60. s.Stop()
  61. }
  62. }
  63. }()
  64. for i := 0; i < count; i++ {
  65. lis, err := net.Listen("tcp", "localhost:0")
  66. if err != nil {
  67. return nil, fmt.Errorf("Failed to listen %v", err)
  68. }
  69. s := grpc.NewServer()
  70. testpb.RegisterTestServiceServer(s, &testServer{})
  71. t.servers = append(t.servers, s)
  72. t.addresses = append(t.addresses, lis.Addr().String())
  73. go func(s *grpc.Server, l net.Listener) {
  74. s.Serve(l)
  75. }(s, lis)
  76. }
  77. return t, nil
  78. }
  79. func TestOneBackend(t *testing.T) {
  80. defer leakcheck.Check(t)
  81. r, cleanup := manual.GenerateAndRegisterManualResolver()
  82. defer cleanup()
  83. test, err := startTestServers(1)
  84. if err != nil {
  85. t.Fatalf("failed to start servers: %v", err)
  86. }
  87. defer test.cleanup()
  88. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
  89. if err != nil {
  90. t.Fatalf("failed to dial: %v", err)
  91. }
  92. defer cc.Close()
  93. testc := testpb.NewTestServiceClient(cc)
  94. // The first RPC should fail because there's no address.
  95. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  96. defer cancel()
  97. if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
  98. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  99. }
  100. r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
  101. // The second RPC should succeed.
  102. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  103. t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
  104. }
  105. }
  106. func TestBackendsRoundRobin(t *testing.T) {
  107. defer leakcheck.Check(t)
  108. r, cleanup := manual.GenerateAndRegisterManualResolver()
  109. defer cleanup()
  110. backendCount := 5
  111. test, err := startTestServers(backendCount)
  112. if err != nil {
  113. t.Fatalf("failed to start servers: %v", err)
  114. }
  115. defer test.cleanup()
  116. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
  117. if err != nil {
  118. t.Fatalf("failed to dial: %v", err)
  119. }
  120. defer cc.Close()
  121. testc := testpb.NewTestServiceClient(cc)
  122. // The first RPC should fail because there's no address.
  123. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  124. defer cancel()
  125. if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
  126. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  127. }
  128. var resolvedAddrs []resolver.Address
  129. for i := 0; i < backendCount; i++ {
  130. resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
  131. }
  132. r.NewAddress(resolvedAddrs)
  133. var p peer.Peer
  134. // Make sure connections to all servers are up.
  135. for si := 0; si < backendCount; si++ {
  136. var connected bool
  137. for i := 0; i < 1000; i++ {
  138. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
  139. t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
  140. }
  141. if p.Addr.String() == test.addresses[si] {
  142. connected = true
  143. break
  144. }
  145. time.Sleep(time.Millisecond)
  146. }
  147. if !connected {
  148. t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
  149. }
  150. }
  151. for i := 0; i < 3*backendCount; i++ {
  152. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
  153. t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
  154. }
  155. if p.Addr.String() != test.addresses[i%backendCount] {
  156. t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
  157. }
  158. }
  159. }
  160. func TestAddressesRemoved(t *testing.T) {
  161. defer leakcheck.Check(t)
  162. r, cleanup := manual.GenerateAndRegisterManualResolver()
  163. defer cleanup()
  164. test, err := startTestServers(1)
  165. if err != nil {
  166. t.Fatalf("failed to start servers: %v", err)
  167. }
  168. defer test.cleanup()
  169. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
  170. if err != nil {
  171. t.Fatalf("failed to dial: %v", err)
  172. }
  173. defer cc.Close()
  174. testc := testpb.NewTestServiceClient(cc)
  175. // The first RPC should fail because there's no address.
  176. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  177. defer cancel()
  178. if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
  179. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  180. }
  181. r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
  182. // The second RPC should succeed.
  183. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  184. t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
  185. }
  186. r.NewAddress([]resolver.Address{})
  187. for i := 0; i < 1000; i++ {
  188. ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
  189. defer cancel()
  190. if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
  191. return
  192. }
  193. time.Sleep(time.Millisecond)
  194. }
  195. t.Fatalf("No RPC failed after removing all addresses, want RPC to fail with DeadlineExceeded")
  196. }
  197. func TestCloseWithPendingRPC(t *testing.T) {
  198. defer leakcheck.Check(t)
  199. r, cleanup := manual.GenerateAndRegisterManualResolver()
  200. defer cleanup()
  201. test, err := startTestServers(1)
  202. if err != nil {
  203. t.Fatalf("failed to start servers: %v", err)
  204. }
  205. defer test.cleanup()
  206. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
  207. if err != nil {
  208. t.Fatalf("failed to dial: %v", err)
  209. }
  210. testc := testpb.NewTestServiceClient(cc)
  211. var wg sync.WaitGroup
  212. for i := 0; i < 3; i++ {
  213. wg.Add(1)
  214. go func() {
  215. defer wg.Done()
  216. // This RPC blocks until cc is closed.
  217. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  218. if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) == codes.DeadlineExceeded {
  219. t.Errorf("RPC failed because of deadline after cc is closed; want error the client connection is closing")
  220. }
  221. cancel()
  222. }()
  223. }
  224. cc.Close()
  225. wg.Wait()
  226. }
  227. func TestNewAddressWhileBlocking(t *testing.T) {
  228. defer leakcheck.Check(t)
  229. r, cleanup := manual.GenerateAndRegisterManualResolver()
  230. defer cleanup()
  231. test, err := startTestServers(1)
  232. if err != nil {
  233. t.Fatalf("failed to start servers: %v", err)
  234. }
  235. defer test.cleanup()
  236. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
  237. if err != nil {
  238. t.Fatalf("failed to dial: %v", err)
  239. }
  240. defer cc.Close()
  241. testc := testpb.NewTestServiceClient(cc)
  242. // The first RPC should fail because there's no address.
  243. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  244. defer cancel()
  245. if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
  246. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  247. }
  248. r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
  249. // The second RPC should succeed.
  250. ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
  251. defer cancel()
  252. if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  253. t.Fatalf("EmptyCall() = _, %v, want _, nil", err)
  254. }
  255. r.NewAddress([]resolver.Address{})
  256. var wg sync.WaitGroup
  257. for i := 0; i < 3; i++ {
  258. wg.Add(1)
  259. go func() {
  260. defer wg.Done()
  261. // This RPC blocks until NewAddress is called.
  262. testc.EmptyCall(context.Background(), &testpb.Empty{})
  263. }()
  264. }
  265. time.Sleep(50 * time.Millisecond)
  266. r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
  267. wg.Wait()
  268. }
  269. func TestOneServerDown(t *testing.T) {
  270. defer leakcheck.Check(t)
  271. r, cleanup := manual.GenerateAndRegisterManualResolver()
  272. defer cleanup()
  273. backendCount := 3
  274. test, err := startTestServers(backendCount)
  275. if err != nil {
  276. t.Fatalf("failed to start servers: %v", err)
  277. }
  278. defer test.cleanup()
  279. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake())
  280. if err != nil {
  281. t.Fatalf("failed to dial: %v", err)
  282. }
  283. defer cc.Close()
  284. testc := testpb.NewTestServiceClient(cc)
  285. // The first RPC should fail because there's no address.
  286. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  287. defer cancel()
  288. if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
  289. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  290. }
  291. var resolvedAddrs []resolver.Address
  292. for i := 0; i < backendCount; i++ {
  293. resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
  294. }
  295. r.NewAddress(resolvedAddrs)
  296. var p peer.Peer
  297. // Make sure connections to all servers are up.
  298. for si := 0; si < backendCount; si++ {
  299. var connected bool
  300. for i := 0; i < 1000; i++ {
  301. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
  302. t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
  303. }
  304. if p.Addr.String() == test.addresses[si] {
  305. connected = true
  306. break
  307. }
  308. time.Sleep(time.Millisecond)
  309. }
  310. if !connected {
  311. t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
  312. }
  313. }
  314. for i := 0; i < 3*backendCount; i++ {
  315. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
  316. t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
  317. }
  318. if p.Addr.String() != test.addresses[i%backendCount] {
  319. t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
  320. }
  321. }
  322. // Stop one server, RPCs should roundrobin among the remaining servers.
  323. backendCount--
  324. test.servers[backendCount].Stop()
  325. // Loop until see server[backendCount-1] twice without seeing server[backendCount].
  326. var targetSeen int
  327. for i := 0; i < 1000; i++ {
  328. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
  329. targetSeen = 0
  330. t.Logf("EmptyCall() = _, %v, want _, <nil>", err)
  331. // Due to a race, this RPC could possibly get the connection that
  332. // was closing, and this RPC may fail. Keep trying when this
  333. // happens.
  334. continue
  335. }
  336. switch p.Addr.String() {
  337. case test.addresses[backendCount-1]:
  338. targetSeen++
  339. case test.addresses[backendCount]:
  340. // Reset targetSeen if peer is server[backendCount].
  341. targetSeen = 0
  342. }
  343. // Break to make sure the last picked address is server[-1], so the following for loop won't be flaky.
  344. if targetSeen >= 2 {
  345. break
  346. }
  347. }
  348. if targetSeen != 2 {
  349. t.Fatal("Failed to see server[backendCount-1] twice without seeing server[backendCount]")
  350. }
  351. for i := 0; i < 3*backendCount; i++ {
  352. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
  353. t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
  354. }
  355. if p.Addr.String() != test.addresses[i%backendCount] {
  356. t.Errorf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
  357. }
  358. }
  359. }
  360. func TestAllServersDown(t *testing.T) {
  361. defer leakcheck.Check(t)
  362. r, cleanup := manual.GenerateAndRegisterManualResolver()
  363. defer cleanup()
  364. backendCount := 3
  365. test, err := startTestServers(backendCount)
  366. if err != nil {
  367. t.Fatalf("failed to start servers: %v", err)
  368. }
  369. defer test.cleanup()
  370. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake())
  371. if err != nil {
  372. t.Fatalf("failed to dial: %v", err)
  373. }
  374. defer cc.Close()
  375. testc := testpb.NewTestServiceClient(cc)
  376. // The first RPC should fail because there's no address.
  377. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  378. defer cancel()
  379. if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
  380. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  381. }
  382. var resolvedAddrs []resolver.Address
  383. for i := 0; i < backendCount; i++ {
  384. resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
  385. }
  386. r.NewAddress(resolvedAddrs)
  387. var p peer.Peer
  388. // Make sure connections to all servers are up.
  389. for si := 0; si < backendCount; si++ {
  390. var connected bool
  391. for i := 0; i < 1000; i++ {
  392. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
  393. t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
  394. }
  395. if p.Addr.String() == test.addresses[si] {
  396. connected = true
  397. break
  398. }
  399. time.Sleep(time.Millisecond)
  400. }
  401. if !connected {
  402. t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
  403. }
  404. }
  405. for i := 0; i < 3*backendCount; i++ {
  406. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
  407. t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
  408. }
  409. if p.Addr.String() != test.addresses[i%backendCount] {
  410. t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
  411. }
  412. }
  413. // All servers are stopped, failfast RPC should fail with unavailable.
  414. for i := 0; i < backendCount; i++ {
  415. test.servers[i].Stop()
  416. }
  417. time.Sleep(100 * time.Millisecond)
  418. for i := 0; i < 1000; i++ {
  419. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) == codes.Unavailable {
  420. return
  421. }
  422. time.Sleep(time.Millisecond)
  423. }
  424. t.Fatalf("Failfast RPCs didn't fail with Unavailable after all servers are stopped")
  425. }