healthcheck_test.go 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package test
  19. import (
  20. "context"
  21. "errors"
  22. "fmt"
  23. "net"
  24. "sync"
  25. "testing"
  26. "time"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/codes"
  29. "google.golang.org/grpc/connectivity"
  30. _ "google.golang.org/grpc/health"
  31. healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
  32. healthpb "google.golang.org/grpc/health/grpc_health_v1"
  33. "google.golang.org/grpc/internal"
  34. "google.golang.org/grpc/internal/channelz"
  35. "google.golang.org/grpc/internal/leakcheck"
  36. "google.golang.org/grpc/resolver"
  37. "google.golang.org/grpc/resolver/manual"
  38. "google.golang.org/grpc/status"
  39. testpb "google.golang.org/grpc/test/grpc_testing"
  40. )
  41. var testHealthCheckFunc = internal.HealthCheckFunc
  42. func replaceHealthCheckFunc(f func(context.Context, func() (interface{}, error), func(bool), string) error) func() {
  43. oldHcFunc := internal.HealthCheckFunc
  44. internal.HealthCheckFunc = f
  45. return func() {
  46. internal.HealthCheckFunc = oldHcFunc
  47. }
  48. }
  49. func newTestHealthServer() *testHealthServer {
  50. return newTestHealthServerWithWatchFunc(defaultWatchFunc)
  51. }
  52. func newTestHealthServerWithWatchFunc(f func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error) *testHealthServer {
  53. return &testHealthServer{
  54. watchFunc: f,
  55. update: make(chan struct{}, 1),
  56. status: make(map[string]healthpb.HealthCheckResponse_ServingStatus),
  57. }
  58. }
  59. // defaultWatchFunc will send a HealthCheckResponse to the client whenever SetServingStatus is called.
  60. func defaultWatchFunc(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  61. if in.Service != "foo" {
  62. return status.Error(codes.FailedPrecondition,
  63. "the defaultWatchFunc only handles request with service name to be \"foo\"")
  64. }
  65. var done bool
  66. for {
  67. select {
  68. case <-stream.Context().Done():
  69. done = true
  70. case <-s.update:
  71. }
  72. if done {
  73. break
  74. }
  75. s.mu.Lock()
  76. resp := &healthpb.HealthCheckResponse{
  77. Status: s.status[in.Service],
  78. }
  79. s.mu.Unlock()
  80. stream.SendMsg(resp)
  81. }
  82. return nil
  83. }
  84. type testHealthServer struct {
  85. watchFunc func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error
  86. mu sync.Mutex
  87. status map[string]healthpb.HealthCheckResponse_ServingStatus
  88. update chan struct{}
  89. }
  90. func (s *testHealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
  91. return &healthpb.HealthCheckResponse{
  92. Status: healthpb.HealthCheckResponse_SERVING,
  93. }, nil
  94. }
  95. func (s *testHealthServer) Watch(in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  96. return s.watchFunc(s, in, stream)
  97. }
  98. // SetServingStatus is called when need to reset the serving status of a service
  99. // or insert a new service entry into the statusMap.
  100. func (s *testHealthServer) SetServingStatus(service string, status healthpb.HealthCheckResponse_ServingStatus) {
  101. s.mu.Lock()
  102. s.status[service] = status
  103. select {
  104. case <-s.update:
  105. default:
  106. }
  107. s.update <- struct{}{}
  108. s.mu.Unlock()
  109. }
  110. func TestHealthCheckWatchStateChange(t *testing.T) {
  111. defer leakcheck.Check(t)
  112. s := grpc.NewServer()
  113. lis, err := net.Listen("tcp", "localhost:0")
  114. if err != nil {
  115. t.Fatalf("failed to listen due to err: %v", err)
  116. }
  117. ts := newTestHealthServer()
  118. healthgrpc.RegisterHealthServer(s, ts)
  119. go s.Serve(lis)
  120. defer s.Stop()
  121. // The table below shows the expected series of addrConn connectivity transitions when server
  122. // updates its health status. As there's only one addrConn corresponds with the ClientConn in this
  123. // test, we use ClientConn's connectivity state as the addrConn connectivity state.
  124. //+------------------------------+-------------------------------------------+
  125. //| Health Check Returned Status | Expected addrConn Connectivity Transition |
  126. //+------------------------------+-------------------------------------------+
  127. //| NOT_SERVING | ->TRANSIENT FAILURE |
  128. //| SERVING | ->READY |
  129. //| SERVICE_UNKNOWN | ->TRANSIENT FAILURE |
  130. //| SERVING | ->READY |
  131. //| UNKNOWN | ->TRANSIENT FAILURE |
  132. //+------------------------------+-------------------------------------------+
  133. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_NOT_SERVING)
  134. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  135. defer rcleanup()
  136. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
  137. if err != nil {
  138. t.Fatalf("dial failed due to err: %v", err)
  139. }
  140. defer cc.Close()
  141. r.NewServiceConfig(`{
  142. "healthCheckConfig": {
  143. "serviceName": "foo"
  144. }
  145. }`)
  146. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  147. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  148. defer cancel()
  149. if ok := cc.WaitForStateChange(ctx, connectivity.Idle); !ok {
  150. t.Fatal("ClientConn is still in IDLE state when the context times out.")
  151. }
  152. if ok := cc.WaitForStateChange(ctx, connectivity.Connecting); !ok {
  153. t.Fatal("ClientConn is still in CONNECTING state when the context times out.")
  154. }
  155. if s := cc.GetState(); s != connectivity.TransientFailure {
  156. t.Fatalf("ClientConn is in %v state, want TRANSIENT FAILURE", s)
  157. }
  158. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  159. if ok := cc.WaitForStateChange(ctx, connectivity.TransientFailure); !ok {
  160. t.Fatal("ClientConn is still in TRANSIENT FAILURE state when the context times out.")
  161. }
  162. if s := cc.GetState(); s != connectivity.Ready {
  163. t.Fatalf("ClientConn is in %v state, want READY", s)
  164. }
  165. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
  166. if ok := cc.WaitForStateChange(ctx, connectivity.Ready); !ok {
  167. t.Fatal("ClientConn is still in READY state when the context times out.")
  168. }
  169. if s := cc.GetState(); s != connectivity.TransientFailure {
  170. t.Fatalf("ClientConn is in %v state, want TRANSIENT FAILURE", s)
  171. }
  172. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  173. if ok := cc.WaitForStateChange(ctx, connectivity.TransientFailure); !ok {
  174. t.Fatal("ClientConn is still in TRANSIENT FAILURE state when the context times out.")
  175. }
  176. if s := cc.GetState(); s != connectivity.Ready {
  177. t.Fatalf("ClientConn is in %v state, want READY", s)
  178. }
  179. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_UNKNOWN)
  180. if ok := cc.WaitForStateChange(ctx, connectivity.Ready); !ok {
  181. t.Fatal("ClientConn is still in READY state when the context times out.")
  182. }
  183. if s := cc.GetState(); s != connectivity.TransientFailure {
  184. t.Fatalf("ClientConn is in %v state, want TRANSIENT FAILURE", s)
  185. }
  186. }
  187. // If Watch returns Unimplemented, then the ClientConn should go into READY state.
  188. func TestHealthCheckHealthServerNotRegistered(t *testing.T) {
  189. defer leakcheck.Check(t)
  190. s := grpc.NewServer()
  191. lis, err := net.Listen("tcp", "localhost:0")
  192. if err != nil {
  193. t.Fatalf("failed to listen due to err: %v", err)
  194. }
  195. go s.Serve(lis)
  196. defer s.Stop()
  197. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  198. defer rcleanup()
  199. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
  200. if err != nil {
  201. t.Fatalf("dial failed due to err: %v", err)
  202. }
  203. defer cc.Close()
  204. r.NewServiceConfig(`{
  205. "healthCheckConfig": {
  206. "serviceName": "foo"
  207. }
  208. }`)
  209. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  210. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  211. defer cancel()
  212. if ok := cc.WaitForStateChange(ctx, connectivity.Idle); !ok {
  213. t.Fatal("ClientConn is still in IDLE state when the context times out.")
  214. }
  215. if ok := cc.WaitForStateChange(ctx, connectivity.Connecting); !ok {
  216. t.Fatal("ClientConn is still in CONNECTING state when the context times out.")
  217. }
  218. if s := cc.GetState(); s != connectivity.Ready {
  219. t.Fatalf("ClientConn is in %v state, want READY", s)
  220. }
  221. }
  222. // In the case of a goaway received, the health check stream should be terminated and health check
  223. // function should exit.
  224. func TestHealthCheckWithGoAway(t *testing.T) {
  225. defer leakcheck.Check(t)
  226. s := grpc.NewServer()
  227. lis, err := net.Listen("tcp", "localhost:0")
  228. if err != nil {
  229. t.Fatalf("failed to listen due to err: %v", err)
  230. }
  231. ts := newTestHealthServer()
  232. healthgrpc.RegisterHealthServer(s, ts)
  233. testpb.RegisterTestServiceServer(s, &testServer{})
  234. go s.Serve(lis)
  235. defer s.Stop()
  236. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  237. hcExitChan := make(chan struct{})
  238. testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error {
  239. err := testHealthCheckFunc(ctx, newStream, update, service)
  240. close(hcExitChan)
  241. return err
  242. }
  243. replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper)
  244. defer replace()
  245. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  246. defer rcleanup()
  247. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
  248. if err != nil {
  249. t.Fatalf("dial failed due to err: %v", err)
  250. }
  251. defer cc.Close()
  252. tc := testpb.NewTestServiceClient(cc)
  253. r.NewServiceConfig(`{
  254. "healthCheckConfig": {
  255. "serviceName": "foo"
  256. }
  257. }`)
  258. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  259. // make some rpcs to make sure connection is working.
  260. if err := verifyResultWithDelay(func() (bool, error) {
  261. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  262. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  263. }
  264. return true, nil
  265. }); err != nil {
  266. t.Fatal(err)
  267. }
  268. // the stream rpc will persist through goaway event.
  269. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  270. defer cancel()
  271. stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
  272. if err != nil {
  273. t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  274. }
  275. respParam := []*testpb.ResponseParameters{{Size: 1}}
  276. payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
  277. if err != nil {
  278. t.Fatal(err)
  279. }
  280. req := &testpb.StreamingOutputCallRequest{
  281. ResponseParameters: respParam,
  282. Payload: payload,
  283. }
  284. if err := stream.Send(req); err != nil {
  285. t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
  286. }
  287. if _, err := stream.Recv(); err != nil {
  288. t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
  289. }
  290. select {
  291. case <-hcExitChan:
  292. t.Fatal("Health check function has exited, which is not expected.")
  293. default:
  294. }
  295. // server sends GoAway
  296. go s.GracefulStop()
  297. select {
  298. case <-hcExitChan:
  299. case <-time.After(5 * time.Second):
  300. t.Fatal("Health check function has not exited after 5s.")
  301. }
  302. // The existing RPC should be still good to proceed.
  303. if err := stream.Send(req); err != nil {
  304. t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
  305. }
  306. if _, err := stream.Recv(); err != nil {
  307. t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
  308. }
  309. }
  310. func TestHealthCheckWithConnClose(t *testing.T) {
  311. defer leakcheck.Check(t)
  312. s := grpc.NewServer()
  313. lis, err := net.Listen("tcp", "localhost:0")
  314. if err != nil {
  315. t.Fatalf("failed to listen due to err: %v", err)
  316. }
  317. ts := newTestHealthServer()
  318. healthgrpc.RegisterHealthServer(s, ts)
  319. testpb.RegisterTestServiceServer(s, &testServer{})
  320. go s.Serve(lis)
  321. defer s.Stop()
  322. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  323. hcExitChan := make(chan struct{})
  324. testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error {
  325. err := testHealthCheckFunc(ctx, newStream, update, service)
  326. close(hcExitChan)
  327. return err
  328. }
  329. replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper)
  330. defer replace()
  331. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  332. defer rcleanup()
  333. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
  334. if err != nil {
  335. t.Fatalf("dial failed due to err: %v", err)
  336. }
  337. defer cc.Close()
  338. tc := testpb.NewTestServiceClient(cc)
  339. r.NewServiceConfig(`{
  340. "healthCheckConfig": {
  341. "serviceName": "foo"
  342. }
  343. }`)
  344. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  345. // make some rpcs to make sure connection is working.
  346. if err := verifyResultWithDelay(func() (bool, error) {
  347. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  348. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  349. }
  350. return true, nil
  351. }); err != nil {
  352. t.Fatal(err)
  353. }
  354. select {
  355. case <-hcExitChan:
  356. t.Fatal("Health check function has exited, which is not expected.")
  357. default:
  358. }
  359. // server closes the connection
  360. s.Stop()
  361. select {
  362. case <-hcExitChan:
  363. case <-time.After(5 * time.Second):
  364. t.Fatal("Health check function has not exited after 5s.")
  365. }
  366. }
  367. // addrConn drain happens when addrConn gets torn down due to its address being no longer in the
  368. // address list returned by the resolver.
  369. func TestHealthCheckWithAddrConnDrain(t *testing.T) {
  370. defer leakcheck.Check(t)
  371. s := grpc.NewServer()
  372. lis, err := net.Listen("tcp", "localhost:0")
  373. if err != nil {
  374. t.Fatalf("failed to listen due to err: %v", err)
  375. }
  376. ts := newTestHealthServer()
  377. healthgrpc.RegisterHealthServer(s, ts)
  378. testpb.RegisterTestServiceServer(s, &testServer{})
  379. go s.Serve(lis)
  380. defer s.Stop()
  381. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  382. hcExitChan := make(chan struct{})
  383. testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error {
  384. err := testHealthCheckFunc(ctx, newStream, update, service)
  385. close(hcExitChan)
  386. return err
  387. }
  388. replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper)
  389. defer replace()
  390. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  391. defer rcleanup()
  392. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
  393. if err != nil {
  394. t.Fatalf("dial failed due to err: %v", err)
  395. }
  396. defer cc.Close()
  397. tc := testpb.NewTestServiceClient(cc)
  398. r.NewServiceConfig(`{
  399. "healthCheckConfig": {
  400. "serviceName": "foo"
  401. }
  402. }`)
  403. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  404. // make some rpcs to make sure connection is working.
  405. if err := verifyResultWithDelay(func() (bool, error) {
  406. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  407. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  408. }
  409. return true, nil
  410. }); err != nil {
  411. t.Fatal(err)
  412. }
  413. // the stream rpc will persist through goaway event.
  414. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  415. defer cancel()
  416. stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
  417. if err != nil {
  418. t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  419. }
  420. respParam := []*testpb.ResponseParameters{{Size: 1}}
  421. payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
  422. if err != nil {
  423. t.Fatal(err)
  424. }
  425. req := &testpb.StreamingOutputCallRequest{
  426. ResponseParameters: respParam,
  427. Payload: payload,
  428. }
  429. if err := stream.Send(req); err != nil {
  430. t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
  431. }
  432. if _, err := stream.Recv(); err != nil {
  433. t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
  434. }
  435. select {
  436. case <-hcExitChan:
  437. t.Fatal("Health check function has exited, which is not expected.")
  438. default:
  439. }
  440. // trigger teardown of the ac
  441. r.NewAddress([]resolver.Address{})
  442. select {
  443. case <-hcExitChan:
  444. case <-time.After(5 * time.Second):
  445. t.Fatal("Health check function has not exited after 5s.")
  446. }
  447. // The existing RPC should be still good to proceed.
  448. if err := stream.Send(req); err != nil {
  449. t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
  450. }
  451. if _, err := stream.Recv(); err != nil {
  452. t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
  453. }
  454. }
  455. // ClientConn close will lead to its addrConns being torn down.
  456. func TestHealthCheckWithClientConnClose(t *testing.T) {
  457. defer leakcheck.Check(t)
  458. s := grpc.NewServer()
  459. lis, err := net.Listen("tcp", "localhost:0")
  460. if err != nil {
  461. t.Fatalf("failed to listen due to err: %v", err)
  462. }
  463. ts := newTestHealthServer()
  464. healthgrpc.RegisterHealthServer(s, ts)
  465. testpb.RegisterTestServiceServer(s, &testServer{})
  466. go s.Serve(lis)
  467. defer s.Stop()
  468. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  469. hcExitChan := make(chan struct{})
  470. testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error {
  471. err := testHealthCheckFunc(ctx, newStream, update, service)
  472. close(hcExitChan)
  473. return err
  474. }
  475. replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper)
  476. defer replace()
  477. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  478. defer rcleanup()
  479. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
  480. if err != nil {
  481. t.Fatalf("dial failed due to err: %v", err)
  482. }
  483. defer cc.Close()
  484. tc := testpb.NewTestServiceClient(cc)
  485. r.NewServiceConfig(`{
  486. "healthCheckConfig": {
  487. "serviceName": "foo"
  488. }
  489. }`)
  490. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  491. // make some rpcs to make sure connection is working.
  492. if err := verifyResultWithDelay(func() (bool, error) {
  493. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  494. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  495. }
  496. return true, nil
  497. }); err != nil {
  498. t.Fatal(err)
  499. }
  500. select {
  501. case <-hcExitChan:
  502. t.Fatal("Health check function has exited, which is not expected.")
  503. default:
  504. }
  505. // trigger addrConn teardown
  506. cc.Close()
  507. select {
  508. case <-hcExitChan:
  509. case <-time.After(5 * time.Second):
  510. t.Fatal("Health check function has not exited after 5s.")
  511. }
  512. }
  513. // This test is to test the logic in the createTransport after the health check function returns which
  514. // closes the skipReset channel(since it has not been closed inside health check func) to unblock
  515. // onGoAway/onClose goroutine.
  516. func TestHealthCheckWithoutReportHealthCalledAddrConnShutDown(t *testing.T) {
  517. defer leakcheck.Check(t)
  518. s := grpc.NewServer()
  519. lis, err := net.Listen("tcp", "localhost:0")
  520. if err != nil {
  521. t.Fatalf("failed to listen due to err %v", err)
  522. }
  523. ts := newTestHealthServerWithWatchFunc(func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  524. if in.Service != "delay" {
  525. return status.Error(codes.FailedPrecondition,
  526. "this special Watch function only handles request with service name to be \"delay\"")
  527. }
  528. // Do nothing to mock a delay of health check response from server side.
  529. // This case is to help with the test that covers the condition that reportHealth is not
  530. // called inside HealthCheckFunc before the func returns.
  531. select {
  532. case <-stream.Context().Done():
  533. case <-time.After(5 * time.Second):
  534. }
  535. return nil
  536. })
  537. healthgrpc.RegisterHealthServer(s, ts)
  538. testpb.RegisterTestServiceServer(s, &testServer{})
  539. go s.Serve(lis)
  540. defer s.Stop()
  541. ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING)
  542. hcEnterChan := make(chan struct{})
  543. hcExitChan := make(chan struct{})
  544. testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error {
  545. close(hcEnterChan)
  546. err := testHealthCheckFunc(ctx, newStream, update, service)
  547. close(hcExitChan)
  548. return err
  549. }
  550. replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper)
  551. defer replace()
  552. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  553. defer rcleanup()
  554. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
  555. if err != nil {
  556. t.Fatalf("dial failed due to err: %v", err)
  557. }
  558. defer cc.Close()
  559. // The serviceName "delay" is specially handled at server side, where response will not be sent
  560. // back to client immediately upon receiving the request (client should receive no response until
  561. // test ends).
  562. r.NewServiceConfig(`{
  563. "healthCheckConfig": {
  564. "serviceName": "delay"
  565. }
  566. }`)
  567. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  568. select {
  569. case <-hcExitChan:
  570. t.Fatal("Health check function has exited, which is not expected.")
  571. default:
  572. }
  573. select {
  574. case <-hcEnterChan:
  575. case <-time.After(5 * time.Second):
  576. t.Fatal("Health check function has not been invoked after 5s.")
  577. }
  578. // trigger teardown of the ac, ac in SHUTDOWN state
  579. r.NewAddress([]resolver.Address{})
  580. // The health check func should exit without calling the reportHealth func, as server hasn't sent
  581. // any response.
  582. select {
  583. case <-hcExitChan:
  584. case <-time.After(5 * time.Second):
  585. t.Fatal("Health check function has not exited after 5s.")
  586. }
  587. // The deferred leakcheck will check whether there's leaked goroutine, which is an indication
  588. // whether we closes the skipReset channel to unblock onGoAway/onClose goroutine.
  589. }
  590. // This test is to test the logic in the createTransport after the health check function returns which
  591. // closes the allowedToReset channel(since it has not been closed inside health check func) to unblock
  592. // onGoAway/onClose goroutine.
  593. func TestHealthCheckWithoutReportHealthCalled(t *testing.T) {
  594. defer leakcheck.Check(t)
  595. s := grpc.NewServer()
  596. lis, err := net.Listen("tcp", "localhost:0")
  597. if err != nil {
  598. t.Fatalf("failed to listen due to err: %v", err)
  599. }
  600. ts := newTestHealthServerWithWatchFunc(func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  601. if in.Service != "delay" {
  602. return status.Error(codes.FailedPrecondition,
  603. "this special Watch function only handles request with service name to be \"delay\"")
  604. }
  605. // Do nothing to mock a delay of health check response from server side.
  606. // This case is to help with the test that covers the condition that reportHealth is not
  607. // called inside HealthCheckFunc before the func returns.
  608. select {
  609. case <-stream.Context().Done():
  610. case <-time.After(5 * time.Second):
  611. }
  612. return nil
  613. })
  614. healthgrpc.RegisterHealthServer(s, ts)
  615. testpb.RegisterTestServiceServer(s, &testServer{})
  616. go s.Serve(lis)
  617. defer s.Stop()
  618. ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING)
  619. hcEnterChan := make(chan struct{})
  620. hcExitChan := make(chan struct{})
  621. testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error {
  622. close(hcEnterChan)
  623. err := testHealthCheckFunc(ctx, newStream, update, service)
  624. close(hcExitChan)
  625. return err
  626. }
  627. replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper)
  628. defer replace()
  629. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  630. defer rcleanup()
  631. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
  632. if err != nil {
  633. t.Fatalf("dial failed due to err: %v", err)
  634. }
  635. defer cc.Close()
  636. // The serviceName "delay" is specially handled at server side, where response will not be sent
  637. // back to client immediately upon receiving the request (client should receive no response until
  638. // test ends).
  639. r.NewServiceConfig(`{
  640. "healthCheckConfig": {
  641. "serviceName": "delay"
  642. }
  643. }`)
  644. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  645. select {
  646. case <-hcExitChan:
  647. t.Fatal("Health check function has exited, which is not expected.")
  648. default:
  649. }
  650. select {
  651. case <-hcEnterChan:
  652. case <-time.After(5 * time.Second):
  653. t.Fatal("Health check function has not been invoked after 5s.")
  654. }
  655. // trigger transport being closed
  656. s.Stop()
  657. // The health check func should exit without calling the reportHealth func, as server hasn't sent
  658. // any response.
  659. select {
  660. case <-hcExitChan:
  661. case <-time.After(5 * time.Second):
  662. t.Fatal("Health check function has not exited after 5s.")
  663. }
  664. // The deferred leakcheck will check whether there's leaked goroutine, which is an indication
  665. // whether we closes the allowedToReset channel to unblock onGoAway/onClose goroutine.
  666. }
  667. func testHealthCheckDisableWithDialOption(t *testing.T, addr string) {
  668. hcEnterChan := make(chan struct{})
  669. testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error {
  670. close(hcEnterChan)
  671. return nil
  672. }
  673. replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper)
  674. defer replace()
  675. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  676. defer rcleanup()
  677. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin"), grpc.WithDisableHealthCheck())
  678. if err != nil {
  679. t.Fatalf("dial failed due to err: %v", err)
  680. }
  681. tc := testpb.NewTestServiceClient(cc)
  682. defer cc.Close()
  683. r.NewServiceConfig(`{
  684. "healthCheckConfig": {
  685. "serviceName": "foo"
  686. }
  687. }`)
  688. r.NewAddress([]resolver.Address{{Addr: addr}})
  689. // send some rpcs to make sure transport has been created and is ready for use.
  690. if err := verifyResultWithDelay(func() (bool, error) {
  691. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  692. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  693. }
  694. return true, nil
  695. }); err != nil {
  696. t.Fatal(err)
  697. }
  698. select {
  699. case <-hcEnterChan:
  700. t.Fatal("Health check function has exited, which is not expected.")
  701. default:
  702. }
  703. }
  704. func testHealthCheckDisableWithBalancer(t *testing.T, addr string) {
  705. hcEnterChan := make(chan struct{})
  706. testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error {
  707. close(hcEnterChan)
  708. return nil
  709. }
  710. replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper)
  711. defer replace()
  712. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  713. defer rcleanup()
  714. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("pick_first"))
  715. if err != nil {
  716. t.Fatalf("dial failed due to err: %v", err)
  717. }
  718. tc := testpb.NewTestServiceClient(cc)
  719. defer cc.Close()
  720. r.NewServiceConfig(`{
  721. "healthCheckConfig": {
  722. "serviceName": "foo"
  723. }
  724. }`)
  725. r.NewAddress([]resolver.Address{{Addr: addr}})
  726. // send some rpcs to make sure transport has been created and is ready for use.
  727. if err := verifyResultWithDelay(func() (bool, error) {
  728. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  729. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  730. }
  731. return true, nil
  732. }); err != nil {
  733. t.Fatal(err)
  734. }
  735. select {
  736. case <-hcEnterChan:
  737. t.Fatal("Health check function has started, which is not expected.")
  738. default:
  739. }
  740. }
  741. func testHealthCheckDisableWithServiceConfig(t *testing.T, addr string) {
  742. hcEnterChan := make(chan struct{})
  743. testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error {
  744. close(hcEnterChan)
  745. return nil
  746. }
  747. replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper)
  748. defer replace()
  749. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  750. defer rcleanup()
  751. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
  752. if err != nil {
  753. t.Fatalf("dial failed due to err: %v", err)
  754. }
  755. tc := testpb.NewTestServiceClient(cc)
  756. defer cc.Close()
  757. r.NewAddress([]resolver.Address{{Addr: addr}})
  758. // send some rpcs to make sure transport has been created and is ready for use.
  759. if err := verifyResultWithDelay(func() (bool, error) {
  760. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  761. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  762. }
  763. return true, nil
  764. }); err != nil {
  765. t.Fatal(err)
  766. }
  767. select {
  768. case <-hcEnterChan:
  769. t.Fatal("Health check function has started, which is not expected.")
  770. default:
  771. }
  772. }
  773. func TestHealthCheckDisable(t *testing.T) {
  774. defer leakcheck.Check(t)
  775. // set up server side
  776. s := grpc.NewServer()
  777. lis, err := net.Listen("tcp", "localhost:0")
  778. if err != nil {
  779. t.Fatalf("failed to listen due to err: %v", err)
  780. }
  781. ts := newTestHealthServer()
  782. healthgrpc.RegisterHealthServer(s, ts)
  783. testpb.RegisterTestServiceServer(s, &testServer{})
  784. go s.Serve(lis)
  785. defer s.Stop()
  786. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  787. // test client side disabling configuration.
  788. testHealthCheckDisableWithDialOption(t, lis.Addr().String())
  789. testHealthCheckDisableWithBalancer(t, lis.Addr().String())
  790. testHealthCheckDisableWithServiceConfig(t, lis.Addr().String())
  791. }
  792. func TestHealthCheckChannelzCountingCallSuccess(t *testing.T) {
  793. defer leakcheck.Check(t)
  794. s := grpc.NewServer()
  795. lis, err := net.Listen("tcp", "localhost:0")
  796. if err != nil {
  797. t.Fatalf("failed to listen due to err: %v", err)
  798. }
  799. ts := newTestHealthServerWithWatchFunc(func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  800. if in.Service != "channelzSuccess" {
  801. return status.Error(codes.FailedPrecondition,
  802. "this special Watch function only handles request with service name to be \"channelzSuccess\"")
  803. }
  804. return status.Error(codes.OK, "fake success")
  805. })
  806. healthgrpc.RegisterHealthServer(s, ts)
  807. testpb.RegisterTestServiceServer(s, &testServer{})
  808. go s.Serve(lis)
  809. defer s.Stop()
  810. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  811. defer rcleanup()
  812. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
  813. if err != nil {
  814. t.Fatalf("dial failed due to err: %v", err)
  815. }
  816. defer cc.Close()
  817. r.NewServiceConfig(`{
  818. "healthCheckConfig": {
  819. "serviceName": "channelzSuccess"
  820. }
  821. }`)
  822. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  823. if err := verifyResultWithDelay(func() (bool, error) {
  824. cm, _ := channelz.GetTopChannels(0)
  825. if len(cm) == 0 {
  826. return false, errors.New("channelz.GetTopChannels return 0 top channel")
  827. }
  828. if len(cm[0].SubChans) == 0 {
  829. return false, errors.New("there is 0 subchannel")
  830. }
  831. var id int64
  832. for k := range cm[0].SubChans {
  833. id = k
  834. break
  835. }
  836. scm := channelz.GetSubChannel(id)
  837. if scm == nil || scm.ChannelData == nil {
  838. return false, errors.New("nil subchannel metric or nil subchannel metric ChannelData returned")
  839. }
  840. // exponential backoff retry may result in more than one health check call.
  841. if scm.ChannelData.CallsStarted > 0 && scm.ChannelData.CallsSucceeded > 0 && scm.ChannelData.CallsFailed == 0 {
  842. return true, nil
  843. }
  844. return false, fmt.Errorf("got %d CallsStarted, %d CallsSucceeded, want >0 >0", scm.ChannelData.CallsStarted, scm.ChannelData.CallsSucceeded)
  845. }); err != nil {
  846. t.Fatal(err)
  847. }
  848. }
  849. func TestHealthCheckChannelzCountingCallFailure(t *testing.T) {
  850. defer leakcheck.Check(t)
  851. s := grpc.NewServer()
  852. lis, err := net.Listen("tcp", "localhost:0")
  853. if err != nil {
  854. t.Fatalf("failed to listen due to err: %v", err)
  855. }
  856. ts := newTestHealthServerWithWatchFunc(func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  857. if in.Service != "channelzFailure" {
  858. return status.Error(codes.FailedPrecondition,
  859. "this special Watch function only handles request with service name to be \"channelzFailure\"")
  860. }
  861. return status.Error(codes.Internal, "fake failure")
  862. })
  863. healthgrpc.RegisterHealthServer(s, ts)
  864. testpb.RegisterTestServiceServer(s, &testServer{})
  865. go s.Serve(lis)
  866. defer s.Stop()
  867. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  868. defer rcleanup()
  869. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
  870. if err != nil {
  871. t.Fatalf("dial failed due to err: %v", err)
  872. }
  873. defer cc.Close()
  874. r.NewServiceConfig(`{
  875. "healthCheckConfig": {
  876. "serviceName": "channelzFailure"
  877. }
  878. }`)
  879. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  880. if err := verifyResultWithDelay(func() (bool, error) {
  881. cm, _ := channelz.GetTopChannels(0)
  882. if len(cm) == 0 {
  883. return false, errors.New("channelz.GetTopChannels return 0 top channel")
  884. }
  885. if len(cm[0].SubChans) == 0 {
  886. return false, errors.New("there is 0 subchannel")
  887. }
  888. var id int64
  889. for k := range cm[0].SubChans {
  890. id = k
  891. break
  892. }
  893. scm := channelz.GetSubChannel(id)
  894. if scm == nil || scm.ChannelData == nil {
  895. return false, errors.New("nil subchannel metric or nil subchannel metric ChannelData returned")
  896. }
  897. // exponential backoff retry may result in more than one health check call.
  898. if scm.ChannelData.CallsStarted > 0 && scm.ChannelData.CallsFailed > 0 && scm.ChannelData.CallsSucceeded == 0 {
  899. return true, nil
  900. }
  901. return false, fmt.Errorf("got %d CallsStarted, %d CallsFailed, want >0, >0", scm.ChannelData.CallsStarted, scm.ChannelData.CallsFailed)
  902. }); err != nil {
  903. t.Fatal(err)
  904. }
  905. }