client.go

/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package health

import (
	"context"
	"fmt"
	"io"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/status"
)
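
// maxDelay caps the exponential backoff between attempts to re-establish the
// health-check stream after a failure.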
const maxDelay = 120 * time.Second

var backoffStrategy = backoff.Exponential{MaxDelay: maxDelay}
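
// backoffFunc waits out the delay dictated by the retry count, returning true
// once the delay elapses or false if ctx is canceled first.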
var backoffFunc = func(ctx context.Context, retries int) bool {
	d := backoffStrategy.Backoff(retries)
	timer := time.NewTimer(d)
	select {
	case <-timer.C:
		return true
	case <-ctx.Done():
		timer.Stop()
		return false
	}
}
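
// init publishes clientHealthCheck through the internal registry; the grpc
// package picks it up from there rather than importing this package, which
// would create an import cycle.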
func init() {
	internal.HealthCheckFunc = clientHealthCheck
}
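
// clientHealthCheck opens a Health.Watch stream for service, reports each
// response via reportHealth, and recreates the stream with exponential
// backoff when it breaks. It returns only when ctx is done or the server does
// not implement the health service.
//
// To enable this path, a client blank-imports this package (triggering the
// init registration above) and opts in through the service config. A minimal
// sketch, assuming an insecure local server and the placeholder service name
// "foo":
//
//	import _ "google.golang.org/grpc/health"
//
//	conn, err := grpc.Dial(
//		"localhost:50051",
//		grpc.WithInsecure(),
//		grpc.WithDefaultServiceConfig(`{"healthCheckConfig": {"serviceName": "foo"}}`),
//	)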
func clientHealthCheck(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), service string) error {
	tryCnt := 0

retryConnection:
	for {
		// Backs off if the connection failed in some way without receiving a message in the previous retry.
		if tryCnt > 0 && !backoffFunc(ctx, tryCnt-1) {
			return nil
		}
		tryCnt++

		if ctx.Err() != nil {
			return nil
		}
		rawS, err := newStream()
		if err != nil {
			continue retryConnection
		}

		s, ok := rawS.(grpc.ClientStream)
		// Ideally, this should never happen. But if it does, the server is marked as healthy for load-balancing purposes.
		if !ok {
			reportHealth(true)
			return fmt.Errorf("newStream returned %v (type %T); want grpc.ClientStream", rawS, rawS)
		}

		if err = s.SendMsg(&healthpb.HealthCheckRequest{Service: service}); err != nil && err != io.EOF {
			// The stream should have been closed, so we can safely continue to create a new stream.
			continue retryConnection
		}
		s.CloseSend()

		resp := new(healthpb.HealthCheckResponse)
		for {
			err = s.RecvMsg(resp)

			// Reports healthy for load-balancing purposes if the health check is not implemented by the server.
			if status.Code(err) == codes.Unimplemented {
				reportHealth(true)
				return err
			}

			// Reports unhealthy if the server's Watch method returns any error other than UNIMPLEMENTED.
			if err != nil {
				reportHealth(false)
				continue retryConnection
			}

			// A message has been received, so the next retry needs no backoff; reset the try count.
			tryCnt = 0
			reportHealth(resp.Status == healthpb.HealthCheckResponse_SERVING)
		}
	}
}
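
// For reference, the Watch stream consumed above is typically served by the
// standard health server in this same package. A minimal server-side sketch,
// with "foo" again a placeholder service name:
//
//	srv := grpc.NewServer()
//	healthServer := NewServer()
//	healthpb.RegisterHealthServer(srv, healthServer)
//	healthServer.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)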