pickfirst_test.go 12 KB


  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "math"
  22. "sync"
  23. "testing"
  24. "time"
  25. "google.golang.org/grpc/codes"
  26. "google.golang.org/grpc/internal/leakcheck"
  27. "google.golang.org/grpc/resolver"
  28. "google.golang.org/grpc/resolver/manual"
  29. "google.golang.org/grpc/status"
  30. )
  31. func errorDesc(err error) string {
  32. if s, ok := status.FromError(err); ok {
  33. return s.Message()
  34. }
  35. return err.Error()
  36. }
  37. func TestOneBackendPickfirst(t *testing.T) {
  38. defer leakcheck.Check(t)
  39. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  40. defer rcleanup()
  41. numServers := 1
  42. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  43. defer scleanup()
  44. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  45. if err != nil {
  46. t.Fatalf("failed to dial: %v", err)
  47. }
  48. defer cc.Close()
  49. // The first RPC should fail because there's no address.
  50. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  51. defer cancel()
  52. req := "port"
  53. var reply string
  54. if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
  55. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  56. }
  57. r.NewAddress([]resolver.Address{{Addr: servers[0].addr}})
  58. // The second RPC should succeed.
  59. for i := 0; i < 1000; i++ {
  60. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
  61. return
  62. }
  63. time.Sleep(time.Millisecond)
  64. }
  65. t.Fatalf("EmptyCall() = _, %v, want _, %v", err, servers[0].port)
  66. }
  67. func TestBackendsPickfirst(t *testing.T) {
  68. defer leakcheck.Check(t)
  69. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  70. defer rcleanup()
  71. numServers := 2
  72. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  73. defer scleanup()
  74. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  75. if err != nil {
  76. t.Fatalf("failed to dial: %v", err)
  77. }
  78. defer cc.Close()
  79. // The first RPC should fail because there's no address.
  80. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  81. defer cancel()
  82. req := "port"
  83. var reply string
  84. if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
  85. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  86. }
  87. r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
  88. // The second RPC should succeed with the first server.
  89. for i := 0; i < 1000; i++ {
  90. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
  91. return
  92. }
  93. time.Sleep(time.Millisecond)
  94. }
  95. t.Fatalf("EmptyCall() = _, %v, want _, %v", err, servers[0].port)
  96. }
  97. func TestNewAddressWhileBlockingPickfirst(t *testing.T) {
  98. defer leakcheck.Check(t)
  99. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  100. defer rcleanup()
  101. numServers := 1
  102. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  103. defer scleanup()
  104. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  105. if err != nil {
  106. t.Fatalf("failed to dial: %v", err)
  107. }
  108. defer cc.Close()
  109. // The first RPC should fail because there's no address.
  110. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  111. defer cancel()
  112. req := "port"
  113. var reply string
  114. if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
  115. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  116. }
  117. var wg sync.WaitGroup
  118. for i := 0; i < 3; i++ {
  119. wg.Add(1)
  120. go func() {
  121. defer wg.Done()
  122. // This RPC blocks until NewAddress is called.
  123. cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
  124. }()
  125. }
  126. time.Sleep(50 * time.Millisecond)
  127. r.NewAddress([]resolver.Address{{Addr: servers[0].addr}})
  128. wg.Wait()
  129. }
  130. func TestCloseWithPendingRPCPickfirst(t *testing.T) {
  131. defer leakcheck.Check(t)
  132. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  133. defer rcleanup()
  134. numServers := 1
  135. _, _, scleanup := startServers(t, numServers, math.MaxInt32)
  136. defer scleanup()
  137. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  138. if err != nil {
  139. t.Fatalf("failed to dial: %v", err)
  140. }
  141. defer cc.Close()
  142. // The first RPC should fail because there's no address.
  143. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  144. defer cancel()
  145. req := "port"
  146. var reply string
  147. if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
  148. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  149. }
  150. var wg sync.WaitGroup
  151. for i := 0; i < 3; i++ {
  152. wg.Add(1)
  153. go func() {
  154. defer wg.Done()
  155. // This RPC blocks until NewAddress is called.
  156. cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
  157. }()
  158. }
  159. time.Sleep(50 * time.Millisecond)
  160. cc.Close()
  161. wg.Wait()
  162. }
  163. func TestOneServerDownPickfirst(t *testing.T) {
  164. defer leakcheck.Check(t)
  165. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  166. defer rcleanup()
  167. numServers := 2
  168. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  169. defer scleanup()
  170. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake())
  171. if err != nil {
  172. t.Fatalf("failed to dial: %v", err)
  173. }
  174. defer cc.Close()
  175. // The first RPC should fail because there's no address.
  176. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  177. defer cancel()
  178. req := "port"
  179. var reply string
  180. if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
  181. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  182. }
  183. r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
  184. // The second RPC should succeed with the first server.
  185. for i := 0; i < 1000; i++ {
  186. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
  187. break
  188. }
  189. time.Sleep(time.Millisecond)
  190. }
  191. servers[0].stop()
  192. for i := 0; i < 1000; i++ {
  193. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  194. return
  195. }
  196. time.Sleep(time.Millisecond)
  197. }
  198. t.Fatalf("EmptyCall() = _, %v, want _, %v", err, servers[0].port)
  199. }
  200. func TestAllServersDownPickfirst(t *testing.T) {
  201. defer leakcheck.Check(t)
  202. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  203. defer rcleanup()
  204. numServers := 2
  205. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  206. defer scleanup()
  207. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake())
  208. if err != nil {
  209. t.Fatalf("failed to dial: %v", err)
  210. }
  211. defer cc.Close()
  212. // The first RPC should fail because there's no address.
  213. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  214. defer cancel()
  215. req := "port"
  216. var reply string
  217. if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
  218. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  219. }
  220. r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
  221. // The second RPC should succeed with the first server.
  222. for i := 0; i < 1000; i++ {
  223. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
  224. break
  225. }
  226. time.Sleep(time.Millisecond)
  227. }
  228. for i := 0; i < numServers; i++ {
  229. servers[i].stop()
  230. }
  231. for i := 0; i < 1000; i++ {
  232. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); status.Code(err) == codes.Unavailable {
  233. return
  234. }
  235. time.Sleep(time.Millisecond)
  236. }
  237. t.Fatalf("EmptyCall() = _, %v, want _, error with code unavailable", err)
  238. }
  239. func TestAddressesRemovedPickfirst(t *testing.T) {
  240. defer leakcheck.Check(t)
  241. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  242. defer rcleanup()
  243. numServers := 3
  244. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  245. defer scleanup()
  246. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  247. if err != nil {
  248. t.Fatalf("failed to dial: %v", err)
  249. }
  250. defer cc.Close()
  251. // The first RPC should fail because there's no address.
  252. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  253. defer cancel()
  254. req := "port"
  255. var reply string
  256. if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
  257. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  258. }
  259. r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}, {Addr: servers[2].addr}})
  260. for i := 0; i < 1000; i++ {
  261. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
  262. break
  263. }
  264. time.Sleep(time.Millisecond)
  265. }
  266. for i := 0; i < 20; i++ {
  267. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
  268. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
  269. }
  270. time.Sleep(10 * time.Millisecond)
  271. }
  272. // Remove server[0].
  273. r.NewAddress([]resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}})
  274. for i := 0; i < 1000; i++ {
  275. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  276. break
  277. }
  278. time.Sleep(time.Millisecond)
  279. }
  280. for i := 0; i < 20; i++ {
  281. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
  282. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
  283. }
  284. time.Sleep(10 * time.Millisecond)
  285. }
  286. // Append server[0], nothing should change.
  287. r.NewAddress([]resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}, {Addr: servers[0].addr}})
  288. for i := 0; i < 20; i++ {
  289. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
  290. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
  291. }
  292. time.Sleep(10 * time.Millisecond)
  293. }
  294. // Remove server[1].
  295. r.NewAddress([]resolver.Address{{Addr: servers[2].addr}, {Addr: servers[0].addr}})
  296. for i := 0; i < 1000; i++ {
  297. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
  298. break
  299. }
  300. time.Sleep(time.Millisecond)
  301. }
  302. for i := 0; i < 20; i++ {
  303. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[2].port {
  304. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
  305. }
  306. time.Sleep(10 * time.Millisecond)
  307. }
  308. // Remove server[2].
  309. r.NewAddress([]resolver.Address{{Addr: servers[0].addr}})
  310. for i := 0; i < 1000; i++ {
  311. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
  312. break
  313. }
  314. time.Sleep(time.Millisecond)
  315. }
  316. for i := 0; i < 20; i++ {
  317. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
  318. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
  319. }
  320. time.Sleep(10 * time.Millisecond)
  321. }
  322. }