http2_client.go 41 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "context"
  21. "fmt"
  22. "io"
  23. "math"
  24. "net"
  25. "strconv"
  26. "strings"
  27. "sync"
  28. "sync/atomic"
  29. "time"
  30. "golang.org/x/net/http2"
  31. "golang.org/x/net/http2/hpack"
  32. "google.golang.org/grpc/codes"
  33. "google.golang.org/grpc/credentials"
  34. "google.golang.org/grpc/internal/channelz"
  35. "google.golang.org/grpc/internal/syscall"
  36. "google.golang.org/grpc/keepalive"
  37. "google.golang.org/grpc/metadata"
  38. "google.golang.org/grpc/peer"
  39. "google.golang.org/grpc/stats"
  40. "google.golang.org/grpc/status"
  41. )
  42. // http2Client implements the ClientTransport interface with HTTP2.
  43. type http2Client struct {
  44. ctx context.Context
  45. cancel context.CancelFunc
  46. ctxDone <-chan struct{} // Cache the ctx.Done() chan.
  47. userAgent string
  48. md interface{}
  49. conn net.Conn // underlying communication channel
  50. loopy *loopyWriter
  51. remoteAddr net.Addr
  52. localAddr net.Addr
  53. authInfo credentials.AuthInfo // auth info about the connection
  54. readerDone chan struct{} // sync point to enable testing.
  55. writerDone chan struct{} // sync point to enable testing.
  56. // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
  57. // that the server sent GoAway on this transport.
  58. goAway chan struct{}
  59. // awakenKeepalive is used to wake up keepalive when after it has gone dormant.
  60. awakenKeepalive chan struct{}
  61. framer *framer
  62. // controlBuf delivers all the control related tasks (e.g., window
  63. // updates, reset streams, and various settings) to the controller.
  64. controlBuf *controlBuffer
  65. fc *trInFlow
  66. // The scheme used: https if TLS is on, http otherwise.
  67. scheme string
  68. isSecure bool
  69. perRPCCreds []credentials.PerRPCCredentials
  70. // Boolean to keep track of reading activity on transport.
  71. // 1 is true and 0 is false.
  72. activity uint32 // Accessed atomically.
  73. kp keepalive.ClientParameters
  74. keepaliveEnabled bool
  75. statsHandler stats.Handler
  76. initialWindowSize int32
  77. // configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
  78. maxSendHeaderListSize *uint32
  79. bdpEst *bdpEstimator
  80. // onSuccess is a callback that client transport calls upon
  81. // receiving server preface to signal that a succefull HTTP2
  82. // connection was established.
  83. onSuccess func()
  84. maxConcurrentStreams uint32
  85. streamQuota int64
  86. streamsQuotaAvailable chan struct{}
  87. waitingStreams uint32
  88. nextID uint32
  89. mu sync.Mutex // guard the following variables
  90. state transportState
  91. activeStreams map[uint32]*Stream
  92. // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
  93. prevGoAwayID uint32
  94. // goAwayReason records the http2.ErrCode and debug data received with the
  95. // GoAway frame.
  96. goAwayReason GoAwayReason
  97. // Fields below are for channelz metric collection.
  98. channelzID int64 // channelz unique identification number
  99. czData *channelzData
  100. onGoAway func(GoAwayReason)
  101. onClose func()
  102. }
  103. func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
  104. if fn != nil {
  105. return fn(ctx, addr)
  106. }
  107. return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
  108. }
  109. func isTemporary(err error) bool {
  110. switch err := err.(type) {
  111. case interface {
  112. Temporary() bool
  113. }:
  114. return err.Temporary()
  115. case interface {
  116. Timeout() bool
  117. }:
  118. // Timeouts may be resolved upon retry, and are thus treated as
  119. // temporary.
  120. return err.Timeout()
  121. }
  122. return true
  123. }
  124. // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
  125. // and starts to receive messages on it. Non-nil error returns if construction
  126. // fails.
  127. func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
  128. scheme := "http"
  129. ctx, cancel := context.WithCancel(ctx)
  130. defer func() {
  131. if err != nil {
  132. cancel()
  133. }
  134. }()
  135. conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
  136. if err != nil {
  137. if opts.FailOnNonTempDialError {
  138. return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
  139. }
  140. return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
  141. }
  142. // Any further errors will close the underlying connection
  143. defer func(conn net.Conn) {
  144. if err != nil {
  145. conn.Close()
  146. }
  147. }(conn)
  148. kp := opts.KeepaliveParams
  149. // Validate keepalive parameters.
  150. if kp.Time == 0 {
  151. kp.Time = defaultClientKeepaliveTime
  152. }
  153. if kp.Timeout == 0 {
  154. kp.Timeout = defaultClientKeepaliveTimeout
  155. }
  156. keepaliveEnabled := false
  157. if kp.Time != infinity {
  158. if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
  159. return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
  160. }
  161. keepaliveEnabled = true
  162. }
  163. var (
  164. isSecure bool
  165. authInfo credentials.AuthInfo
  166. )
  167. transportCreds := opts.TransportCredentials
  168. perRPCCreds := opts.PerRPCCredentials
  169. if b := opts.CredsBundle; b != nil {
  170. if t := b.TransportCredentials(); t != nil {
  171. transportCreds = t
  172. }
  173. if t := b.PerRPCCredentials(); t != nil {
  174. perRPCCreds = append(perRPCCreds, t)
  175. }
  176. }
  177. if transportCreds != nil {
  178. scheme = "https"
  179. conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.Authority, conn)
  180. if err != nil {
  181. return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
  182. }
  183. isSecure = true
  184. }
  185. dynamicWindow := true
  186. icwz := int32(initialWindowSize)
  187. if opts.InitialConnWindowSize >= defaultWindowSize {
  188. icwz = opts.InitialConnWindowSize
  189. dynamicWindow = false
  190. }
  191. writeBufSize := opts.WriteBufferSize
  192. readBufSize := opts.ReadBufferSize
  193. maxHeaderListSize := defaultClientMaxHeaderListSize
  194. if opts.MaxHeaderListSize != nil {
  195. maxHeaderListSize = *opts.MaxHeaderListSize
  196. }
  197. t := &http2Client{
  198. ctx: ctx,
  199. ctxDone: ctx.Done(), // Cache Done chan.
  200. cancel: cancel,
  201. userAgent: opts.UserAgent,
  202. md: addr.Metadata,
  203. conn: conn,
  204. remoteAddr: conn.RemoteAddr(),
  205. localAddr: conn.LocalAddr(),
  206. authInfo: authInfo,
  207. readerDone: make(chan struct{}),
  208. writerDone: make(chan struct{}),
  209. goAway: make(chan struct{}),
  210. awakenKeepalive: make(chan struct{}, 1),
  211. framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
  212. fc: &trInFlow{limit: uint32(icwz)},
  213. scheme: scheme,
  214. activeStreams: make(map[uint32]*Stream),
  215. isSecure: isSecure,
  216. perRPCCreds: perRPCCreds,
  217. kp: kp,
  218. statsHandler: opts.StatsHandler,
  219. initialWindowSize: initialWindowSize,
  220. onSuccess: onSuccess,
  221. nextID: 1,
  222. maxConcurrentStreams: defaultMaxStreamsClient,
  223. streamQuota: defaultMaxStreamsClient,
  224. streamsQuotaAvailable: make(chan struct{}, 1),
  225. czData: new(channelzData),
  226. onGoAway: onGoAway,
  227. onClose: onClose,
  228. keepaliveEnabled: keepaliveEnabled,
  229. }
  230. t.controlBuf = newControlBuffer(t.ctxDone)
  231. if opts.InitialWindowSize >= defaultWindowSize {
  232. t.initialWindowSize = opts.InitialWindowSize
  233. dynamicWindow = false
  234. }
  235. if dynamicWindow {
  236. t.bdpEst = &bdpEstimator{
  237. bdp: initialWindowSize,
  238. updateFlowControl: t.updateFlowControl,
  239. }
  240. }
  241. // Make sure awakenKeepalive can't be written upon.
  242. // keepalive routine will make it writable, if need be.
  243. t.awakenKeepalive <- struct{}{}
  244. if t.statsHandler != nil {
  245. t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
  246. RemoteAddr: t.remoteAddr,
  247. LocalAddr: t.localAddr,
  248. })
  249. connBegin := &stats.ConnBegin{
  250. Client: true,
  251. }
  252. t.statsHandler.HandleConn(t.ctx, connBegin)
  253. }
  254. if channelz.IsOn() {
  255. t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
  256. }
  257. if t.keepaliveEnabled {
  258. go t.keepalive()
  259. }
  260. // Start the reader goroutine for incoming message. Each transport has
  261. // a dedicated goroutine which reads HTTP2 frame from network. Then it
  262. // dispatches the frame to the corresponding stream entity.
  263. go t.reader()
  264. // Send connection preface to server.
  265. n, err := t.conn.Write(clientPreface)
  266. if err != nil {
  267. t.Close()
  268. return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
  269. }
  270. if n != len(clientPreface) {
  271. t.Close()
  272. return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
  273. }
  274. var ss []http2.Setting
  275. if t.initialWindowSize != defaultWindowSize {
  276. ss = append(ss, http2.Setting{
  277. ID: http2.SettingInitialWindowSize,
  278. Val: uint32(t.initialWindowSize),
  279. })
  280. }
  281. if opts.MaxHeaderListSize != nil {
  282. ss = append(ss, http2.Setting{
  283. ID: http2.SettingMaxHeaderListSize,
  284. Val: *opts.MaxHeaderListSize,
  285. })
  286. }
  287. err = t.framer.fr.WriteSettings(ss...)
  288. if err != nil {
  289. t.Close()
  290. return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
  291. }
  292. // Adjust the connection flow control window if needed.
  293. if delta := uint32(icwz - defaultWindowSize); delta > 0 {
  294. if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
  295. t.Close()
  296. return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
  297. }
  298. }
  299. t.framer.writer.Flush()
  300. go func() {
  301. t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
  302. err := t.loopy.run()
  303. if err != nil {
  304. errorf("transport: loopyWriter.run returning. Err: %v", err)
  305. }
  306. // If it's a connection error, let reader goroutine handle it
  307. // since there might be data in the buffers.
  308. if _, ok := err.(net.Error); !ok {
  309. t.conn.Close()
  310. }
  311. close(t.writerDone)
  312. }()
  313. return t, nil
  314. }
  315. func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
  316. // TODO(zhaoq): Handle uint32 overflow of Stream.id.
  317. s := &Stream{
  318. done: make(chan struct{}),
  319. method: callHdr.Method,
  320. sendCompress: callHdr.SendCompress,
  321. buf: newRecvBuffer(),
  322. headerChan: make(chan struct{}),
  323. contentSubtype: callHdr.ContentSubtype,
  324. }
  325. s.wq = newWriteQuota(defaultWriteQuota, s.done)
  326. s.requestRead = func(n int) {
  327. t.adjustWindow(s, uint32(n))
  328. }
  329. // The client side stream context should have exactly the same life cycle with the user provided context.
  330. // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
  331. // So we use the original context here instead of creating a copy.
  332. s.ctx = ctx
  333. s.trReader = &transportReader{
  334. reader: &recvBufferReader{
  335. ctx: s.ctx,
  336. ctxDone: s.ctx.Done(),
  337. recv: s.buf,
  338. },
  339. windowHandler: func(n int) {
  340. t.updateWindow(s, uint32(n))
  341. },
  342. }
  343. return s
  344. }
  345. func (t *http2Client) getPeer() *peer.Peer {
  346. pr := &peer.Peer{
  347. Addr: t.remoteAddr,
  348. }
  349. // Attach Auth info if there is any.
  350. if t.authInfo != nil {
  351. pr.AuthInfo = t.authInfo
  352. }
  353. return pr
  354. }
  355. func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
  356. aud := t.createAudience(callHdr)
  357. authData, err := t.getTrAuthData(ctx, aud)
  358. if err != nil {
  359. return nil, err
  360. }
  361. callAuthData, err := t.getCallAuthData(ctx, aud, callHdr)
  362. if err != nil {
  363. return nil, err
  364. }
  365. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  366. // first and create a slice of that exact size.
  367. // Make the slice of certain predictable size to reduce allocations made by append.
  368. hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
  369. hfLen += len(authData) + len(callAuthData)
  370. headerFields := make([]hpack.HeaderField, 0, hfLen)
  371. headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
  372. headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
  373. headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
  374. headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
  375. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)})
  376. headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
  377. headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
  378. if callHdr.PreviousAttempts > 0 {
  379. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
  380. }
  381. if callHdr.SendCompress != "" {
  382. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
  383. }
  384. if dl, ok := ctx.Deadline(); ok {
  385. // Send out timeout regardless its value. The server can detect timeout context by itself.
  386. // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
  387. timeout := dl.Sub(time.Now())
  388. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
  389. }
  390. for k, v := range authData {
  391. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  392. }
  393. for k, v := range callAuthData {
  394. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  395. }
  396. if b := stats.OutgoingTags(ctx); b != nil {
  397. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
  398. }
  399. if b := stats.OutgoingTrace(ctx); b != nil {
  400. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
  401. }
  402. if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
  403. var k string
  404. for _, vv := range added {
  405. for i, v := range vv {
  406. if i%2 == 0 {
  407. k = v
  408. continue
  409. }
  410. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  411. if isReservedHeader(k) {
  412. continue
  413. }
  414. headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
  415. }
  416. }
  417. for k, vv := range md {
  418. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  419. if isReservedHeader(k) {
  420. continue
  421. }
  422. for _, v := range vv {
  423. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  424. }
  425. }
  426. }
  427. if md, ok := t.md.(*metadata.MD); ok {
  428. for k, vv := range *md {
  429. if isReservedHeader(k) {
  430. continue
  431. }
  432. for _, v := range vv {
  433. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  434. }
  435. }
  436. }
  437. return headerFields, nil
  438. }
  439. func (t *http2Client) createAudience(callHdr *CallHdr) string {
  440. // Create an audience string only if needed.
  441. if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
  442. return ""
  443. }
  444. // Construct URI required to get auth request metadata.
  445. // Omit port if it is the default one.
  446. host := strings.TrimSuffix(callHdr.Host, ":443")
  447. pos := strings.LastIndex(callHdr.Method, "/")
  448. if pos == -1 {
  449. pos = len(callHdr.Method)
  450. }
  451. return "https://" + host + callHdr.Method[:pos]
  452. }
  453. func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
  454. authData := map[string]string{}
  455. for _, c := range t.perRPCCreds {
  456. data, err := c.GetRequestMetadata(ctx, audience)
  457. if err != nil {
  458. if _, ok := status.FromError(err); ok {
  459. return nil, err
  460. }
  461. return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err)
  462. }
  463. for k, v := range data {
  464. // Capital header names are illegal in HTTP/2.
  465. k = strings.ToLower(k)
  466. authData[k] = v
  467. }
  468. }
  469. return authData, nil
  470. }
  471. func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
  472. callAuthData := map[string]string{}
  473. // Check if credentials.PerRPCCredentials were provided via call options.
  474. // Note: if these credentials are provided both via dial options and call
  475. // options, then both sets of credentials will be applied.
  476. if callCreds := callHdr.Creds; callCreds != nil {
  477. if !t.isSecure && callCreds.RequireTransportSecurity() {
  478. return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
  479. }
  480. data, err := callCreds.GetRequestMetadata(ctx, audience)
  481. if err != nil {
  482. return nil, status.Errorf(codes.Internal, "transport: %v", err)
  483. }
  484. for k, v := range data {
  485. // Capital header names are illegal in HTTP/2
  486. k = strings.ToLower(k)
  487. callAuthData[k] = v
  488. }
  489. }
  490. return callAuthData, nil
  491. }
  492. // NewStream creates a stream and registers it into the transport as "active"
  493. // streams.
  494. func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
  495. ctx = peer.NewContext(ctx, t.getPeer())
  496. headerFields, err := t.createHeaderFields(ctx, callHdr)
  497. if err != nil {
  498. return nil, err
  499. }
  500. s := t.newStream(ctx, callHdr)
  501. cleanup := func(err error) {
  502. if s.swapState(streamDone) == streamDone {
  503. // If it was already done, return.
  504. return
  505. }
  506. // The stream was unprocessed by the server.
  507. atomic.StoreUint32(&s.unprocessed, 1)
  508. s.write(recvMsg{err: err})
  509. close(s.done)
  510. // If headerChan isn't closed, then close it.
  511. if atomic.SwapUint32(&s.headerDone, 1) == 0 {
  512. close(s.headerChan)
  513. }
  514. }
  515. hdr := &headerFrame{
  516. hf: headerFields,
  517. endStream: false,
  518. initStream: func(id uint32) (bool, error) {
  519. t.mu.Lock()
  520. if state := t.state; state != reachable {
  521. t.mu.Unlock()
  522. // Do a quick cleanup.
  523. err := error(errStreamDrain)
  524. if state == closing {
  525. err = ErrConnClosing
  526. }
  527. cleanup(err)
  528. return false, err
  529. }
  530. t.activeStreams[id] = s
  531. if channelz.IsOn() {
  532. atomic.AddInt64(&t.czData.streamsStarted, 1)
  533. atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
  534. }
  535. var sendPing bool
  536. // If the number of active streams change from 0 to 1, then check if keepalive
  537. // has gone dormant. If so, wake it up.
  538. if len(t.activeStreams) == 1 && t.keepaliveEnabled {
  539. select {
  540. case t.awakenKeepalive <- struct{}{}:
  541. sendPing = true
  542. // Fill the awakenKeepalive channel again as this channel must be
  543. // kept non-writable except at the point that the keepalive()
  544. // goroutine is waiting either to be awaken or shutdown.
  545. t.awakenKeepalive <- struct{}{}
  546. default:
  547. }
  548. }
  549. t.mu.Unlock()
  550. return sendPing, nil
  551. },
  552. onOrphaned: cleanup,
  553. wq: s.wq,
  554. }
  555. firstTry := true
  556. var ch chan struct{}
  557. checkForStreamQuota := func(it interface{}) bool {
  558. if t.streamQuota <= 0 { // Can go negative if server decreases it.
  559. if firstTry {
  560. t.waitingStreams++
  561. }
  562. ch = t.streamsQuotaAvailable
  563. return false
  564. }
  565. if !firstTry {
  566. t.waitingStreams--
  567. }
  568. t.streamQuota--
  569. h := it.(*headerFrame)
  570. h.streamID = t.nextID
  571. t.nextID += 2
  572. s.id = h.streamID
  573. s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
  574. if t.streamQuota > 0 && t.waitingStreams > 0 {
  575. select {
  576. case t.streamsQuotaAvailable <- struct{}{}:
  577. default:
  578. }
  579. }
  580. return true
  581. }
  582. var hdrListSizeErr error
  583. checkForHeaderListSize := func(it interface{}) bool {
  584. if t.maxSendHeaderListSize == nil {
  585. return true
  586. }
  587. hdrFrame := it.(*headerFrame)
  588. var sz int64
  589. for _, f := range hdrFrame.hf {
  590. if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
  591. hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
  592. return false
  593. }
  594. }
  595. return true
  596. }
  597. for {
  598. success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
  599. if !checkForStreamQuota(it) {
  600. return false
  601. }
  602. if !checkForHeaderListSize(it) {
  603. return false
  604. }
  605. return true
  606. }, hdr)
  607. if err != nil {
  608. return nil, err
  609. }
  610. if success {
  611. break
  612. }
  613. if hdrListSizeErr != nil {
  614. return nil, hdrListSizeErr
  615. }
  616. firstTry = false
  617. select {
  618. case <-ch:
  619. case <-s.ctx.Done():
  620. return nil, ContextErr(s.ctx.Err())
  621. case <-t.goAway:
  622. return nil, errStreamDrain
  623. case <-t.ctx.Done():
  624. return nil, ErrConnClosing
  625. }
  626. }
  627. if t.statsHandler != nil {
  628. outHeader := &stats.OutHeader{
  629. Client: true,
  630. FullMethod: callHdr.Method,
  631. RemoteAddr: t.remoteAddr,
  632. LocalAddr: t.localAddr,
  633. Compression: callHdr.SendCompress,
  634. }
  635. t.statsHandler.HandleRPC(s.ctx, outHeader)
  636. }
  637. return s, nil
  638. }
  639. // CloseStream clears the footprint of a stream when the stream is not needed any more.
  640. // This must not be executed in reader's goroutine.
  641. func (t *http2Client) CloseStream(s *Stream, err error) {
  642. var (
  643. rst bool
  644. rstCode http2.ErrCode
  645. )
  646. if err != nil {
  647. rst = true
  648. rstCode = http2.ErrCodeCancel
  649. }
  650. t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
  651. }
  652. func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
  653. // Set stream status to done.
  654. if s.swapState(streamDone) == streamDone {
  655. // If it was already done, return. If multiple closeStream calls
  656. // happen simultaneously, wait for the first to finish.
  657. <-s.done
  658. return
  659. }
  660. // status and trailers can be updated here without any synchronization because the stream goroutine will
  661. // only read it after it sees an io.EOF error from read or write and we'll write those errors
  662. // only after updating this.
  663. s.status = st
  664. if len(mdata) > 0 {
  665. s.trailer = mdata
  666. }
  667. if err != nil {
  668. // This will unblock reads eventually.
  669. s.write(recvMsg{err: err})
  670. }
  671. // If headerChan isn't closed, then close it.
  672. if atomic.SwapUint32(&s.headerDone, 1) == 0 {
  673. s.noHeaders = true
  674. close(s.headerChan)
  675. }
  676. cleanup := &cleanupStream{
  677. streamID: s.id,
  678. onWrite: func() {
  679. t.mu.Lock()
  680. if t.activeStreams != nil {
  681. delete(t.activeStreams, s.id)
  682. }
  683. t.mu.Unlock()
  684. if channelz.IsOn() {
  685. if eosReceived {
  686. atomic.AddInt64(&t.czData.streamsSucceeded, 1)
  687. } else {
  688. atomic.AddInt64(&t.czData.streamsFailed, 1)
  689. }
  690. }
  691. },
  692. rst: rst,
  693. rstCode: rstCode,
  694. }
  695. addBackStreamQuota := func(interface{}) bool {
  696. t.streamQuota++
  697. if t.streamQuota > 0 && t.waitingStreams > 0 {
  698. select {
  699. case t.streamsQuotaAvailable <- struct{}{}:
  700. default:
  701. }
  702. }
  703. return true
  704. }
  705. t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
  706. // This will unblock write.
  707. close(s.done)
  708. }
  709. // Close kicks off the shutdown process of the transport. This should be called
  710. // only once on a transport. Once it is called, the transport should not be
  711. // accessed any more.
  712. //
  713. // This method blocks until the addrConn that initiated this transport is
  714. // re-connected. This happens because t.onClose() begins reconnect logic at the
  715. // addrConn level and blocks until the addrConn is successfully connected.
  716. func (t *http2Client) Close() error {
  717. t.mu.Lock()
  718. // Make sure we only Close once.
  719. if t.state == closing {
  720. t.mu.Unlock()
  721. return nil
  722. }
  723. t.state = closing
  724. streams := t.activeStreams
  725. t.activeStreams = nil
  726. t.mu.Unlock()
  727. t.controlBuf.finish()
  728. t.cancel()
  729. err := t.conn.Close()
  730. if channelz.IsOn() {
  731. channelz.RemoveEntry(t.channelzID)
  732. }
  733. // Notify all active streams.
  734. for _, s := range streams {
  735. t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
  736. }
  737. if t.statsHandler != nil {
  738. connEnd := &stats.ConnEnd{
  739. Client: true,
  740. }
  741. t.statsHandler.HandleConn(t.ctx, connEnd)
  742. }
  743. go t.onClose()
  744. return err
  745. }
  746. // GracefulClose sets the state to draining, which prevents new streams from
  747. // being created and causes the transport to be closed when the last active
  748. // stream is closed. If there are no active streams, the transport is closed
  749. // immediately. This does nothing if the transport is already draining or
  750. // closing.
  751. func (t *http2Client) GracefulClose() error {
  752. t.mu.Lock()
  753. // Make sure we move to draining only from active.
  754. if t.state == draining || t.state == closing {
  755. t.mu.Unlock()
  756. return nil
  757. }
  758. t.state = draining
  759. active := len(t.activeStreams)
  760. t.mu.Unlock()
  761. if active == 0 {
  762. return t.Close()
  763. }
  764. t.controlBuf.put(&incomingGoAway{})
  765. return nil
  766. }
  767. // Write formats the data into HTTP2 data frame(s) and sends it out. The caller
  768. // should proceed only if Write returns nil.
  769. func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  770. if opts.Last {
  771. // If it's the last message, update stream state.
  772. if !s.compareAndSwapState(streamActive, streamWriteDone) {
  773. return errStreamDone
  774. }
  775. } else if s.getState() != streamActive {
  776. return errStreamDone
  777. }
  778. df := &dataFrame{
  779. streamID: s.id,
  780. endStream: opts.Last,
  781. }
  782. if hdr != nil || data != nil { // If it's not an empty data frame.
  783. // Add some data to grpc message header so that we can equally
  784. // distribute bytes across frames.
  785. emptyLen := http2MaxFrameLen - len(hdr)
  786. if emptyLen > len(data) {
  787. emptyLen = len(data)
  788. }
  789. hdr = append(hdr, data[:emptyLen]...)
  790. data = data[emptyLen:]
  791. df.h, df.d = hdr, data
  792. // TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler.
  793. if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
  794. return err
  795. }
  796. }
  797. return t.controlBuf.put(df)
  798. }
  799. func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
  800. t.mu.Lock()
  801. defer t.mu.Unlock()
  802. s, ok := t.activeStreams[f.Header().StreamID]
  803. return s, ok
  804. }
  805. // adjustWindow sends out extra window update over the initial window size
  806. // of stream if the application is requesting data larger in size than
  807. // the window.
  808. func (t *http2Client) adjustWindow(s *Stream, n uint32) {
  809. if w := s.fc.maybeAdjust(n); w > 0 {
  810. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  811. }
  812. }
  813. // updateWindow adjusts the inbound quota for the stream.
  814. // Window updates will be sent out when the cumulative quota
  815. // exceeds the corresponding threshold.
  816. func (t *http2Client) updateWindow(s *Stream, n uint32) {
  817. if w := s.fc.onRead(n); w > 0 {
  818. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  819. }
  820. }
  821. // updateFlowControl updates the incoming flow control windows
  822. // for the transport and the stream based on the current bdp
  823. // estimation.
  824. func (t *http2Client) updateFlowControl(n uint32) {
  825. t.mu.Lock()
  826. for _, s := range t.activeStreams {
  827. s.fc.newLimit(n)
  828. }
  829. t.mu.Unlock()
  830. updateIWS := func(interface{}) bool {
  831. t.initialWindowSize = int32(n)
  832. return true
  833. }
  834. t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
  835. t.controlBuf.put(&outgoingSettings{
  836. ss: []http2.Setting{
  837. {
  838. ID: http2.SettingInitialWindowSize,
  839. Val: n,
  840. },
  841. },
  842. })
  843. }
  844. func (t *http2Client) handleData(f *http2.DataFrame) {
  845. size := f.Header().Length
  846. var sendBDPPing bool
  847. if t.bdpEst != nil {
  848. sendBDPPing = t.bdpEst.add(size)
  849. }
  850. // Decouple connection's flow control from application's read.
  851. // An update on connection's flow control should not depend on
  852. // whether user application has read the data or not. Such a
  853. // restriction is already imposed on the stream's flow control,
  854. // and therefore the sender will be blocked anyways.
  855. // Decoupling the connection flow control will prevent other
  856. // active(fast) streams from starving in presence of slow or
  857. // inactive streams.
  858. //
  859. if w := t.fc.onData(size); w > 0 {
  860. t.controlBuf.put(&outgoingWindowUpdate{
  861. streamID: 0,
  862. increment: w,
  863. })
  864. }
  865. if sendBDPPing {
  866. // Avoid excessive ping detection (e.g. in an L7 proxy)
  867. // by sending a window update prior to the BDP ping.
  868. if w := t.fc.reset(); w > 0 {
  869. t.controlBuf.put(&outgoingWindowUpdate{
  870. streamID: 0,
  871. increment: w,
  872. })
  873. }
  874. t.controlBuf.put(bdpPing)
  875. }
  876. // Select the right stream to dispatch.
  877. s, ok := t.getStream(f)
  878. if !ok {
  879. return
  880. }
  881. if size > 0 {
  882. if err := s.fc.onData(size); err != nil {
  883. t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
  884. return
  885. }
  886. if f.Header().Flags.Has(http2.FlagDataPadded) {
  887. if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
  888. t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
  889. }
  890. }
  891. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  892. // guarantee f.Data() is consumed before the arrival of next frame.
  893. // Can this copy be eliminated?
  894. if len(f.Data()) > 0 {
  895. data := make([]byte, len(f.Data()))
  896. copy(data, f.Data())
  897. s.write(recvMsg{data: data})
  898. }
  899. }
  900. // The server has closed the stream without sending trailers. Record that
  901. // the read direction is closed, and set the status appropriately.
  902. if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
  903. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
  904. }
  905. }
  906. func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
  907. s, ok := t.getStream(f)
  908. if !ok {
  909. return
  910. }
  911. if f.ErrCode == http2.ErrCodeRefusedStream {
  912. // The stream was unprocessed by the server.
  913. atomic.StoreUint32(&s.unprocessed, 1)
  914. }
  915. statusCode, ok := http2ErrConvTab[f.ErrCode]
  916. if !ok {
  917. warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
  918. statusCode = codes.Unknown
  919. }
  920. if statusCode == codes.Canceled {
  921. // Our deadline was already exceeded, and that was likely the cause of
  922. // this cancelation. Alter the status code accordingly.
  923. if d, ok := s.ctx.Deadline(); ok && d.After(time.Now()) {
  924. statusCode = codes.DeadlineExceeded
  925. }
  926. }
  927. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
  928. }
  929. func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
  930. if f.IsAck() {
  931. return
  932. }
  933. var maxStreams *uint32
  934. var ss []http2.Setting
  935. var updateFuncs []func()
  936. f.ForeachSetting(func(s http2.Setting) error {
  937. switch s.ID {
  938. case http2.SettingMaxConcurrentStreams:
  939. maxStreams = new(uint32)
  940. *maxStreams = s.Val
  941. case http2.SettingMaxHeaderListSize:
  942. updateFuncs = append(updateFuncs, func() {
  943. t.maxSendHeaderListSize = new(uint32)
  944. *t.maxSendHeaderListSize = s.Val
  945. })
  946. default:
  947. ss = append(ss, s)
  948. }
  949. return nil
  950. })
  951. if isFirst && maxStreams == nil {
  952. maxStreams = new(uint32)
  953. *maxStreams = math.MaxUint32
  954. }
  955. sf := &incomingSettings{
  956. ss: ss,
  957. }
  958. if maxStreams != nil {
  959. updateStreamQuota := func() {
  960. delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
  961. t.maxConcurrentStreams = *maxStreams
  962. t.streamQuota += delta
  963. if delta > 0 && t.waitingStreams > 0 {
  964. close(t.streamsQuotaAvailable) // wake all of them up.
  965. t.streamsQuotaAvailable = make(chan struct{}, 1)
  966. }
  967. }
  968. updateFuncs = append(updateFuncs, updateStreamQuota)
  969. }
  970. t.controlBuf.executeAndPut(func(interface{}) bool {
  971. for _, f := range updateFuncs {
  972. f()
  973. }
  974. return true
  975. }, sf)
  976. }
  977. func (t *http2Client) handlePing(f *http2.PingFrame) {
  978. if f.IsAck() {
  979. // Maybe it's a BDP ping.
  980. if t.bdpEst != nil {
  981. t.bdpEst.calculate(f.Data)
  982. }
  983. return
  984. }
  985. pingAck := &ping{ack: true}
  986. copy(pingAck.data[:], f.Data[:])
  987. t.controlBuf.put(pingAck)
  988. }
  989. func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
  990. t.mu.Lock()
  991. if t.state == closing {
  992. t.mu.Unlock()
  993. return
  994. }
  995. if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
  996. infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
  997. }
  998. id := f.LastStreamID
  999. if id > 0 && id%2 != 1 {
  1000. t.mu.Unlock()
  1001. t.Close()
  1002. return
  1003. }
  1004. // A client can receive multiple GoAways from the server (see
  1005. // https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
  1006. // GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
  1007. // sent after an RTT delay with the ID of the last stream the server will
  1008. // process.
  1009. //
  1010. // Therefore, when we get the first GoAway we don't necessarily close any
  1011. // streams. While in case of second GoAway we close all streams created after
  1012. // the GoAwayId. This way streams that were in-flight while the GoAway from
  1013. // server was being sent don't get killed.
  1014. select {
  1015. case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
  1016. // If there are multiple GoAways the first one should always have an ID greater than the following ones.
  1017. if id > t.prevGoAwayID {
  1018. t.mu.Unlock()
  1019. t.Close()
  1020. return
  1021. }
  1022. default:
  1023. t.setGoAwayReason(f)
  1024. close(t.goAway)
  1025. t.state = draining
  1026. t.controlBuf.put(&incomingGoAway{})
  1027. // This has to be a new goroutine because we're still using the current goroutine to read in the transport.
  1028. t.onGoAway(t.goAwayReason)
  1029. }
  1030. // All streams with IDs greater than the GoAwayId
  1031. // and smaller than the previous GoAway ID should be killed.
  1032. upperLimit := t.prevGoAwayID
  1033. if upperLimit == 0 { // This is the first GoAway Frame.
  1034. upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
  1035. }
  1036. for streamID, stream := range t.activeStreams {
  1037. if streamID > id && streamID <= upperLimit {
  1038. // The stream was unprocessed by the server.
  1039. atomic.StoreUint32(&stream.unprocessed, 1)
  1040. t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
  1041. }
  1042. }
  1043. t.prevGoAwayID = id
  1044. active := len(t.activeStreams)
  1045. t.mu.Unlock()
  1046. if active == 0 {
  1047. t.Close()
  1048. }
  1049. }
  1050. // setGoAwayReason sets the value of t.goAwayReason based
  1051. // on the GoAway frame received.
  1052. // It expects a lock on transport's mutext to be held by
  1053. // the caller.
  1054. func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
  1055. t.goAwayReason = GoAwayNoReason
  1056. switch f.ErrCode {
  1057. case http2.ErrCodeEnhanceYourCalm:
  1058. if string(f.DebugData()) == "too_many_pings" {
  1059. t.goAwayReason = GoAwayTooManyPings
  1060. }
  1061. }
  1062. }
  1063. func (t *http2Client) GetGoAwayReason() GoAwayReason {
  1064. t.mu.Lock()
  1065. defer t.mu.Unlock()
  1066. return t.goAwayReason
  1067. }
  1068. func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  1069. t.controlBuf.put(&incomingWindowUpdate{
  1070. streamID: f.Header().StreamID,
  1071. increment: f.Increment,
  1072. })
  1073. }
  1074. // operateHeaders takes action on the decoded headers.
  1075. func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
  1076. s, ok := t.getStream(frame)
  1077. if !ok {
  1078. return
  1079. }
  1080. atomic.StoreUint32(&s.bytesReceived, 1)
  1081. var state decodeState
  1082. if err := state.decodeHeader(frame); err != nil {
  1083. t.closeStream(s, err, true, http2.ErrCodeProtocol, status.New(codes.Internal, err.Error()), nil, false)
  1084. // Something wrong. Stops reading even when there is remaining.
  1085. return
  1086. }
  1087. endStream := frame.StreamEnded()
  1088. var isHeader bool
  1089. defer func() {
  1090. if t.statsHandler != nil {
  1091. if isHeader {
  1092. inHeader := &stats.InHeader{
  1093. Client: true,
  1094. WireLength: int(frame.Header().Length),
  1095. }
  1096. t.statsHandler.HandleRPC(s.ctx, inHeader)
  1097. } else {
  1098. inTrailer := &stats.InTrailer{
  1099. Client: true,
  1100. WireLength: int(frame.Header().Length),
  1101. }
  1102. t.statsHandler.HandleRPC(s.ctx, inTrailer)
  1103. }
  1104. }
  1105. }()
  1106. // If headers haven't been received yet.
  1107. if atomic.SwapUint32(&s.headerDone, 1) == 0 {
  1108. if !endStream {
  1109. // Headers frame is not actually a trailers-only frame.
  1110. isHeader = true
  1111. // These values can be set without any synchronization because
  1112. // stream goroutine will read it only after seeing a closed
  1113. // headerChan which we'll close after setting this.
  1114. s.recvCompress = state.encoding
  1115. if len(state.mdata) > 0 {
  1116. s.header = state.mdata
  1117. }
  1118. } else {
  1119. s.noHeaders = true
  1120. }
  1121. close(s.headerChan)
  1122. }
  1123. if !endStream {
  1124. return
  1125. }
  1126. // if client received END_STREAM from server while stream was still active, send RST_STREAM
  1127. rst := s.getState() == streamActive
  1128. t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.mdata, true)
  1129. }
  1130. // reader runs as a separate goroutine in charge of reading data from network
  1131. // connection.
  1132. //
  1133. // TODO(zhaoq): currently one reader per transport. Investigate whether this is
  1134. // optimal.
  1135. // TODO(zhaoq): Check the validity of the incoming frame sequence.
  1136. func (t *http2Client) reader() {
  1137. defer close(t.readerDone)
  1138. // Check the validity of server preface.
  1139. frame, err := t.framer.fr.ReadFrame()
  1140. if err != nil {
  1141. t.Close() // this kicks off resetTransport, so must be last before return
  1142. return
  1143. }
  1144. t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
  1145. if t.keepaliveEnabled {
  1146. atomic.CompareAndSwapUint32(&t.activity, 0, 1)
  1147. }
  1148. sf, ok := frame.(*http2.SettingsFrame)
  1149. if !ok {
  1150. t.Close() // this kicks off resetTransport, so must be last before return
  1151. return
  1152. }
  1153. t.onSuccess()
  1154. t.handleSettings(sf, true)
  1155. // loop to keep reading incoming messages on this transport.
  1156. for {
  1157. frame, err := t.framer.fr.ReadFrame()
  1158. if t.keepaliveEnabled {
  1159. atomic.CompareAndSwapUint32(&t.activity, 0, 1)
  1160. }
  1161. if err != nil {
  1162. // Abort an active stream if the http2.Framer returns a
  1163. // http2.StreamError. This can happen only if the server's response
  1164. // is malformed http2.
  1165. if se, ok := err.(http2.StreamError); ok {
  1166. t.mu.Lock()
  1167. s := t.activeStreams[se.StreamID]
  1168. t.mu.Unlock()
  1169. if s != nil {
  1170. // use error detail to provide better err message
  1171. code := http2ErrConvTab[se.Code]
  1172. msg := t.framer.fr.ErrorDetail().Error()
  1173. t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
  1174. }
  1175. continue
  1176. } else {
  1177. // Transport error.
  1178. t.Close()
  1179. return
  1180. }
  1181. }
  1182. switch frame := frame.(type) {
  1183. case *http2.MetaHeadersFrame:
  1184. t.operateHeaders(frame)
  1185. case *http2.DataFrame:
  1186. t.handleData(frame)
  1187. case *http2.RSTStreamFrame:
  1188. t.handleRSTStream(frame)
  1189. case *http2.SettingsFrame:
  1190. t.handleSettings(frame, false)
  1191. case *http2.PingFrame:
  1192. t.handlePing(frame)
  1193. case *http2.GoAwayFrame:
  1194. t.handleGoAway(frame)
  1195. case *http2.WindowUpdateFrame:
  1196. t.handleWindowUpdate(frame)
  1197. default:
  1198. errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
  1199. }
  1200. }
  1201. }
  1202. // keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
  1203. func (t *http2Client) keepalive() {
  1204. p := &ping{data: [8]byte{}}
  1205. timer := time.NewTimer(t.kp.Time)
  1206. for {
  1207. select {
  1208. case <-timer.C:
  1209. if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
  1210. timer.Reset(t.kp.Time)
  1211. continue
  1212. }
  1213. // Check if keepalive should go dormant.
  1214. t.mu.Lock()
  1215. if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
  1216. // Make awakenKeepalive writable.
  1217. <-t.awakenKeepalive
  1218. t.mu.Unlock()
  1219. select {
  1220. case <-t.awakenKeepalive:
  1221. // If the control gets here a ping has been sent
  1222. // need to reset the timer with keepalive.Timeout.
  1223. case <-t.ctx.Done():
  1224. return
  1225. }
  1226. } else {
  1227. t.mu.Unlock()
  1228. if channelz.IsOn() {
  1229. atomic.AddInt64(&t.czData.kpCount, 1)
  1230. }
  1231. // Send ping.
  1232. t.controlBuf.put(p)
  1233. }
  1234. // By the time control gets here a ping has been sent one way or the other.
  1235. timer.Reset(t.kp.Timeout)
  1236. select {
  1237. case <-timer.C:
  1238. if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
  1239. timer.Reset(t.kp.Time)
  1240. continue
  1241. }
  1242. t.Close()
  1243. return
  1244. case <-t.ctx.Done():
  1245. if !timer.Stop() {
  1246. <-timer.C
  1247. }
  1248. return
  1249. }
  1250. case <-t.ctx.Done():
  1251. if !timer.Stop() {
  1252. <-timer.C
  1253. }
  1254. return
  1255. }
  1256. }
  1257. }
  1258. func (t *http2Client) Error() <-chan struct{} {
  1259. return t.ctx.Done()
  1260. }
  1261. func (t *http2Client) GoAway() <-chan struct{} {
  1262. return t.goAway
  1263. }
  1264. func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
  1265. s := channelz.SocketInternalMetric{
  1266. StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
  1267. StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
  1268. StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
  1269. MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
  1270. MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
  1271. KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
  1272. LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
  1273. LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
  1274. LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
  1275. LocalFlowControlWindow: int64(t.fc.getSize()),
  1276. SocketOptions: channelz.GetSocketOption(t.conn),
  1277. LocalAddr: t.localAddr,
  1278. RemoteAddr: t.remoteAddr,
  1279. // RemoteName :
  1280. }
  1281. if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
  1282. s.Security = au.GetSecurityValue()
  1283. }
  1284. s.RemoteFlowControlWindow = t.getOutFlowWindow()
  1285. return &s
  1286. }
  1287. func (t *http2Client) IncrMsgSent() {
  1288. atomic.AddInt64(&t.czData.msgSent, 1)
  1289. atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
  1290. }
  1291. func (t *http2Client) IncrMsgRecv() {
  1292. atomic.AddInt64(&t.czData.msgRecv, 1)
  1293. atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
  1294. }
  1295. func (t *http2Client) getOutFlowWindow() int64 {
  1296. resp := make(chan uint32, 1)
  1297. timer := time.NewTimer(time.Second)
  1298. defer timer.Stop()
  1299. t.controlBuf.put(&outFlowControlSizeRequest{resp})
  1300. select {
  1301. case sz := <-resp:
  1302. return int64(sz)
  1303. case <-t.ctxDone:
  1304. return -1
  1305. case <-timer.C:
  1306. return -2
  1307. }
  1308. }