balancer_switching_test.go 13 KB


  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "fmt"
  22. "math"
  23. "testing"
  24. "time"
  25. "google.golang.org/grpc/balancer"
  26. "google.golang.org/grpc/balancer/roundrobin"
  27. "google.golang.org/grpc/connectivity"
  28. _ "google.golang.org/grpc/grpclog/glogger"
  29. "google.golang.org/grpc/internal/leakcheck"
  30. "google.golang.org/grpc/resolver"
  31. "google.golang.org/grpc/resolver/manual"
  32. )
  33. var _ balancer.Builder = &magicalLB{}
  34. var _ balancer.Balancer = &magicalLB{}
  35. // magicalLB is a ringer for grpclb. It is used to avoid circular dependencies on the grpclb package
  36. type magicalLB struct{}
  37. func (b *magicalLB) Name() string {
  38. return "grpclb"
  39. }
  40. func (b *magicalLB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
  41. return b
  42. }
  43. func (b *magicalLB) HandleSubConnStateChange(balancer.SubConn, connectivity.State) {}
  44. func (b *magicalLB) HandleResolvedAddrs([]resolver.Address, error) {}
  45. func (b *magicalLB) Close() {}
  46. func init() {
  47. balancer.Register(&magicalLB{})
  48. }
  49. func checkPickFirst(cc *ClientConn, servers []*server) error {
  50. var (
  51. req = "port"
  52. reply string
  53. err error
  54. )
  55. connected := false
  56. for i := 0; i < 5000; i++ {
  57. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); errorDesc(err) == servers[0].port {
  58. if connected {
  59. // connected is set to false if peer is not server[0]. So if
  60. // connected is true here, this is the second time we saw
  61. // server[0] in a row. Break because pickfirst is in effect.
  62. break
  63. }
  64. connected = true
  65. } else {
  66. connected = false
  67. }
  68. time.Sleep(time.Millisecond)
  69. }
  70. if !connected {
  71. return fmt.Errorf("pickfirst is not in effect after 5 second, EmptyCall() = _, %v, want _, %v", err, servers[0].port)
  72. }
  73. // The following RPCs should all succeed with the first server.
  74. for i := 0; i < 3; i++ {
  75. err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
  76. if errorDesc(err) != servers[0].port {
  77. return fmt.Errorf("Index %d: want peer %v, got peer %v", i, servers[0].port, err)
  78. }
  79. }
  80. return nil
  81. }
  82. func checkRoundRobin(cc *ClientConn, servers []*server) error {
  83. var (
  84. req = "port"
  85. reply string
  86. err error
  87. )
  88. // Make sure connections to all servers are up.
  89. for i := 0; i < 2; i++ {
  90. // Do this check twice, otherwise the first RPC's transport may still be
  91. // picked by the closing pickfirst balancer, and the test becomes flaky.
  92. for _, s := range servers {
  93. var up bool
  94. for i := 0; i < 5000; i++ {
  95. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); errorDesc(err) == s.port {
  96. up = true
  97. break
  98. }
  99. time.Sleep(time.Millisecond)
  100. }
  101. if !up {
  102. return fmt.Errorf("server %v is not up within 5 second", s.port)
  103. }
  104. }
  105. }
  106. serverCount := len(servers)
  107. for i := 0; i < 3*serverCount; i++ {
  108. err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
  109. if errorDesc(err) != servers[i%serverCount].port {
  110. return fmt.Errorf("Index %d: want peer %v, got peer %v", i, servers[i%serverCount].port, err)
  111. }
  112. }
  113. return nil
  114. }
  115. func TestSwitchBalancer(t *testing.T) {
  116. defer leakcheck.Check(t)
  117. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  118. defer rcleanup()
  119. numServers := 2
  120. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  121. defer scleanup()
  122. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  123. if err != nil {
  124. t.Fatalf("failed to dial: %v", err)
  125. }
  126. defer cc.Close()
  127. r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
  128. // The default balancer is pickfirst.
  129. if err := checkPickFirst(cc, servers); err != nil {
  130. t.Fatalf("check pickfirst returned non-nil error: %v", err)
  131. }
  132. // Switch to roundrobin.
  133. cc.handleServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
  134. if err := checkRoundRobin(cc, servers); err != nil {
  135. t.Fatalf("check roundrobin returned non-nil error: %v", err)
  136. }
  137. // Switch to pickfirst.
  138. cc.handleServiceConfig(`{"loadBalancingPolicy": "pick_first"}`)
  139. if err := checkPickFirst(cc, servers); err != nil {
  140. t.Fatalf("check pickfirst returned non-nil error: %v", err)
  141. }
  142. }
  143. // Test that balancer specified by dial option will not be overridden.
  144. func TestBalancerDialOption(t *testing.T) {
  145. defer leakcheck.Check(t)
  146. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  147. defer rcleanup()
  148. numServers := 2
  149. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  150. defer scleanup()
  151. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name))
  152. if err != nil {
  153. t.Fatalf("failed to dial: %v", err)
  154. }
  155. defer cc.Close()
  156. r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
  157. // The init balancer is roundrobin.
  158. if err := checkRoundRobin(cc, servers); err != nil {
  159. t.Fatalf("check roundrobin returned non-nil error: %v", err)
  160. }
  161. // Switch to pickfirst.
  162. cc.handleServiceConfig(`{"loadBalancingPolicy": "pick_first"}`)
  163. // Balancer is still roundrobin.
  164. if err := checkRoundRobin(cc, servers); err != nil {
  165. t.Fatalf("check roundrobin returned non-nil error: %v", err)
  166. }
  167. }
  168. // First addr update contains grpclb.
  169. func TestSwitchBalancerGRPCLBFirst(t *testing.T) {
  170. defer leakcheck.Check(t)
  171. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  172. defer rcleanup()
  173. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  174. if err != nil {
  175. t.Fatalf("failed to dial: %v", err)
  176. }
  177. defer cc.Close()
  178. // ClientConn will switch balancer to grpclb when receives an address of
  179. // type GRPCLB.
  180. r.NewAddress([]resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}})
  181. var isGRPCLB bool
  182. for i := 0; i < 5000; i++ {
  183. cc.mu.Lock()
  184. isGRPCLB = cc.curBalancerName == "grpclb"
  185. cc.mu.Unlock()
  186. if isGRPCLB {
  187. break
  188. }
  189. time.Sleep(time.Millisecond)
  190. }
  191. if !isGRPCLB {
  192. t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
  193. }
  194. // New update containing new backend and new grpclb. Should not switch
  195. // balancer.
  196. r.NewAddress([]resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}})
  197. for i := 0; i < 200; i++ {
  198. cc.mu.Lock()
  199. isGRPCLB = cc.curBalancerName == "grpclb"
  200. cc.mu.Unlock()
  201. if !isGRPCLB {
  202. break
  203. }
  204. time.Sleep(time.Millisecond)
  205. }
  206. if !isGRPCLB {
  207. t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb")
  208. }
  209. var isPickFirst bool
  210. // Switch balancer to pickfirst.
  211. r.NewAddress([]resolver.Address{{Addr: "backend"}})
  212. for i := 0; i < 5000; i++ {
  213. cc.mu.Lock()
  214. isPickFirst = cc.curBalancerName == PickFirstBalancerName
  215. cc.mu.Unlock()
  216. if isPickFirst {
  217. break
  218. }
  219. time.Sleep(time.Millisecond)
  220. }
  221. if !isPickFirst {
  222. t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
  223. }
  224. }
  225. // First addr update does not contain grpclb.
  226. func TestSwitchBalancerGRPCLBSecond(t *testing.T) {
  227. defer leakcheck.Check(t)
  228. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  229. defer rcleanup()
  230. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  231. if err != nil {
  232. t.Fatalf("failed to dial: %v", err)
  233. }
  234. defer cc.Close()
  235. r.NewAddress([]resolver.Address{{Addr: "backend"}})
  236. var isPickFirst bool
  237. for i := 0; i < 5000; i++ {
  238. cc.mu.Lock()
  239. isPickFirst = cc.curBalancerName == PickFirstBalancerName
  240. cc.mu.Unlock()
  241. if isPickFirst {
  242. break
  243. }
  244. time.Sleep(time.Millisecond)
  245. }
  246. if !isPickFirst {
  247. t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
  248. }
  249. // ClientConn will switch balancer to grpclb when receives an address of
  250. // type GRPCLB.
  251. r.NewAddress([]resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}})
  252. var isGRPCLB bool
  253. for i := 0; i < 5000; i++ {
  254. cc.mu.Lock()
  255. isGRPCLB = cc.curBalancerName == "grpclb"
  256. cc.mu.Unlock()
  257. if isGRPCLB {
  258. break
  259. }
  260. time.Sleep(time.Millisecond)
  261. }
  262. if !isGRPCLB {
  263. t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
  264. }
  265. // New update containing new backend and new grpclb. Should not switch
  266. // balancer.
  267. r.NewAddress([]resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}})
  268. for i := 0; i < 200; i++ {
  269. cc.mu.Lock()
  270. isGRPCLB = cc.curBalancerName == "grpclb"
  271. cc.mu.Unlock()
  272. if !isGRPCLB {
  273. break
  274. }
  275. time.Sleep(time.Millisecond)
  276. }
  277. if !isGRPCLB {
  278. t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb")
  279. }
  280. // Switch balancer back.
  281. r.NewAddress([]resolver.Address{{Addr: "backend"}})
  282. for i := 0; i < 5000; i++ {
  283. cc.mu.Lock()
  284. isPickFirst = cc.curBalancerName == PickFirstBalancerName
  285. cc.mu.Unlock()
  286. if isPickFirst {
  287. break
  288. }
  289. time.Sleep(time.Millisecond)
  290. }
  291. if !isPickFirst {
  292. t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
  293. }
  294. }
  295. // Test that if the current balancer is roundrobin, after switching to grpclb,
  296. // when the resolved address doesn't contain grpclb addresses, balancer will be
  297. // switched back to roundrobin.
  298. func TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
  299. defer leakcheck.Check(t)
  300. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  301. defer rcleanup()
  302. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  303. if err != nil {
  304. t.Fatalf("failed to dial: %v", err)
  305. }
  306. defer cc.Close()
  307. r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
  308. r.NewAddress([]resolver.Address{{Addr: "backend"}})
  309. var isRoundRobin bool
  310. for i := 0; i < 5000; i++ {
  311. cc.mu.Lock()
  312. isRoundRobin = cc.curBalancerName == "round_robin"
  313. cc.mu.Unlock()
  314. if isRoundRobin {
  315. break
  316. }
  317. time.Sleep(time.Millisecond)
  318. }
  319. if !isRoundRobin {
  320. t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
  321. }
  322. // ClientConn will switch balancer to grpclb when receives an address of
  323. // type GRPCLB.
  324. r.NewAddress([]resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}})
  325. var isGRPCLB bool
  326. for i := 0; i < 5000; i++ {
  327. cc.mu.Lock()
  328. isGRPCLB = cc.curBalancerName == "grpclb"
  329. cc.mu.Unlock()
  330. if isGRPCLB {
  331. break
  332. }
  333. time.Sleep(time.Millisecond)
  334. }
  335. if !isGRPCLB {
  336. t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
  337. }
  338. // Switch balancer back.
  339. r.NewAddress([]resolver.Address{{Addr: "backend"}})
  340. for i := 0; i < 5000; i++ {
  341. cc.mu.Lock()
  342. isRoundRobin = cc.curBalancerName == "round_robin"
  343. cc.mu.Unlock()
  344. if isRoundRobin {
  345. break
  346. }
  347. time.Sleep(time.Millisecond)
  348. }
  349. if !isRoundRobin {
  350. t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
  351. }
  352. }
  353. // Test that if resolved address list contains grpclb, the balancer option in
  354. // service config won't take effect. But when there's no grpclb address in a new
  355. // resolved address list, balancer will be switched to the new one.
  356. func TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
  357. defer leakcheck.Check(t)
  358. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  359. defer rcleanup()
  360. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  361. if err != nil {
  362. t.Fatalf("failed to dial: %v", err)
  363. }
  364. defer cc.Close()
  365. r.NewAddress([]resolver.Address{{Addr: "backend"}})
  366. var isPickFirst bool
  367. for i := 0; i < 5000; i++ {
  368. cc.mu.Lock()
  369. isPickFirst = cc.curBalancerName == PickFirstBalancerName
  370. cc.mu.Unlock()
  371. if isPickFirst {
  372. break
  373. }
  374. time.Sleep(time.Millisecond)
  375. }
  376. if !isPickFirst {
  377. t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
  378. }
  379. // ClientConn will switch balancer to grpclb when receives an address of
  380. // type GRPCLB.
  381. r.NewAddress([]resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}})
  382. var isGRPCLB bool
  383. for i := 0; i < 5000; i++ {
  384. cc.mu.Lock()
  385. isGRPCLB = cc.curBalancerName == "grpclb"
  386. cc.mu.Unlock()
  387. if isGRPCLB {
  388. break
  389. }
  390. time.Sleep(time.Millisecond)
  391. }
  392. if !isGRPCLB {
  393. t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
  394. }
  395. r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
  396. var isRoundRobin bool
  397. for i := 0; i < 200; i++ {
  398. cc.mu.Lock()
  399. isRoundRobin = cc.curBalancerName == "round_robin"
  400. cc.mu.Unlock()
  401. if isRoundRobin {
  402. break
  403. }
  404. time.Sleep(time.Millisecond)
  405. }
  406. // Balancer should NOT switch to round_robin because resolved list contains
  407. // grpclb.
  408. if isRoundRobin {
  409. t.Fatalf("within 200 ms, cc.balancer switched to round_robin, want grpclb")
  410. }
  411. // Switch balancer back.
  412. r.NewAddress([]resolver.Address{{Addr: "backend"}})
  413. for i := 0; i < 5000; i++ {
  414. cc.mu.Lock()
  415. isRoundRobin = cc.curBalancerName == "round_robin"
  416. cc.mu.Unlock()
  417. if isRoundRobin {
  418. break
  419. }
  420. time.Sleep(time.Millisecond)
  421. }
  422. if !isRoundRobin {
  423. t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
  424. }
  425. }