http2_server.go 35 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "bytes"
  21. "context"
  22. "errors"
  23. "fmt"
  24. "io"
  25. "math"
  26. "net"
  27. "strconv"
  28. "sync"
  29. "sync/atomic"
  30. "time"
  31. "github.com/golang/protobuf/proto"
  32. "golang.org/x/net/http2"
  33. "golang.org/x/net/http2/hpack"
  34. "google.golang.org/grpc/codes"
  35. "google.golang.org/grpc/credentials"
  36. "google.golang.org/grpc/grpclog"
  37. "google.golang.org/grpc/internal/channelz"
  38. "google.golang.org/grpc/internal/grpcrand"
  39. "google.golang.org/grpc/keepalive"
  40. "google.golang.org/grpc/metadata"
  41. "google.golang.org/grpc/peer"
  42. "google.golang.org/grpc/stats"
  43. "google.golang.org/grpc/status"
  44. "google.golang.org/grpc/tap"
  45. )
  46. var (
  47. // ErrIllegalHeaderWrite indicates that setting header is illegal because of
  48. // the stream's state.
  49. ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
  50. // ErrHeaderListSizeLimitViolation indicates that the header list size is larger
  51. // than the limit set by peer.
  52. ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
  53. )
  54. // http2Server implements the ServerTransport interface with HTTP2.
  55. type http2Server struct {
  56. ctx context.Context
  57. ctxDone <-chan struct{} // Cache the context.Done() chan
  58. cancel context.CancelFunc
  59. conn net.Conn
  60. loopy *loopyWriter
  61. readerDone chan struct{} // sync point to enable testing.
  62. writerDone chan struct{} // sync point to enable testing.
  63. remoteAddr net.Addr
  64. localAddr net.Addr
  65. maxStreamID uint32 // max stream ID ever seen
  66. authInfo credentials.AuthInfo // auth info about the connection
  67. inTapHandle tap.ServerInHandle
  68. framer *framer
  69. // The max number of concurrent streams.
  70. maxStreams uint32
  71. // controlBuf delivers all the control related tasks (e.g., window
  72. // updates, reset streams, and various settings) to the controller.
  73. controlBuf *controlBuffer
  74. fc *trInFlow
  75. stats stats.Handler
  76. // Flag to keep track of reading activity on transport.
  77. // 1 is true and 0 is false.
  78. activity uint32 // Accessed atomically.
  79. // Keepalive and max-age parameters for the server.
  80. kp keepalive.ServerParameters
  81. // Keepalive enforcement policy.
  82. kep keepalive.EnforcementPolicy
  83. // The time instance last ping was received.
  84. lastPingAt time.Time
  85. // Number of times the client has violated keepalive ping policy so far.
  86. pingStrikes uint8
  87. // Flag to signify that number of ping strikes should be reset to 0.
  88. // This is set whenever data or header frames are sent.
  89. // 1 means yes.
  90. resetPingStrikes uint32 // Accessed atomically.
  91. initialWindowSize int32
  92. bdpEst *bdpEstimator
  93. maxSendHeaderListSize *uint32
  94. mu sync.Mutex // guard the following
  95. // drainChan is initialized when drain(...) is called the first time.
  96. // After which the server writes out the first GoAway(with ID 2^31-1) frame.
  97. // Then an independent goroutine will be launched to later send the second GoAway.
  98. // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
  99. // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
  100. // already underway.
  101. drainChan chan struct{}
  102. state transportState
  103. activeStreams map[uint32]*Stream
  104. // idle is the time instant when the connection went idle.
  105. // This is either the beginning of the connection or when the number of
  106. // RPCs go down to 0.
  107. // When the connection is busy, this value is set to 0.
  108. idle time.Time
  109. // Fields below are for channelz metric collection.
  110. channelzID int64 // channelz unique identification number
  111. czData *channelzData
  112. }
  113. // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
  114. // returned if something goes wrong.
  115. func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
  116. writeBufSize := config.WriteBufferSize
  117. readBufSize := config.ReadBufferSize
  118. maxHeaderListSize := defaultServerMaxHeaderListSize
  119. if config.MaxHeaderListSize != nil {
  120. maxHeaderListSize = *config.MaxHeaderListSize
  121. }
  122. framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
  123. // Send initial settings as connection preface to client.
  124. var isettings []http2.Setting
  125. // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
  126. // permitted in the HTTP2 spec.
  127. maxStreams := config.MaxStreams
  128. if maxStreams == 0 {
  129. maxStreams = math.MaxUint32
  130. } else {
  131. isettings = append(isettings, http2.Setting{
  132. ID: http2.SettingMaxConcurrentStreams,
  133. Val: maxStreams,
  134. })
  135. }
  136. dynamicWindow := true
  137. iwz := int32(initialWindowSize)
  138. if config.InitialWindowSize >= defaultWindowSize {
  139. iwz = config.InitialWindowSize
  140. dynamicWindow = false
  141. }
  142. icwz := int32(initialWindowSize)
  143. if config.InitialConnWindowSize >= defaultWindowSize {
  144. icwz = config.InitialConnWindowSize
  145. dynamicWindow = false
  146. }
  147. if iwz != defaultWindowSize {
  148. isettings = append(isettings, http2.Setting{
  149. ID: http2.SettingInitialWindowSize,
  150. Val: uint32(iwz)})
  151. }
  152. if config.MaxHeaderListSize != nil {
  153. isettings = append(isettings, http2.Setting{
  154. ID: http2.SettingMaxHeaderListSize,
  155. Val: *config.MaxHeaderListSize,
  156. })
  157. }
  158. if err := framer.fr.WriteSettings(isettings...); err != nil {
  159. return nil, connectionErrorf(false, err, "transport: %v", err)
  160. }
  161. // Adjust the connection flow control window if needed.
  162. if delta := uint32(icwz - defaultWindowSize); delta > 0 {
  163. if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
  164. return nil, connectionErrorf(false, err, "transport: %v", err)
  165. }
  166. }
  167. kp := config.KeepaliveParams
  168. if kp.MaxConnectionIdle == 0 {
  169. kp.MaxConnectionIdle = defaultMaxConnectionIdle
  170. }
  171. if kp.MaxConnectionAge == 0 {
  172. kp.MaxConnectionAge = defaultMaxConnectionAge
  173. }
  174. // Add a jitter to MaxConnectionAge.
  175. kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
  176. if kp.MaxConnectionAgeGrace == 0 {
  177. kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
  178. }
  179. if kp.Time == 0 {
  180. kp.Time = defaultServerKeepaliveTime
  181. }
  182. if kp.Timeout == 0 {
  183. kp.Timeout = defaultServerKeepaliveTimeout
  184. }
  185. kep := config.KeepalivePolicy
  186. if kep.MinTime == 0 {
  187. kep.MinTime = defaultKeepalivePolicyMinTime
  188. }
  189. ctx, cancel := context.WithCancel(context.Background())
  190. t := &http2Server{
  191. ctx: ctx,
  192. cancel: cancel,
  193. ctxDone: ctx.Done(),
  194. conn: conn,
  195. remoteAddr: conn.RemoteAddr(),
  196. localAddr: conn.LocalAddr(),
  197. authInfo: config.AuthInfo,
  198. framer: framer,
  199. readerDone: make(chan struct{}),
  200. writerDone: make(chan struct{}),
  201. maxStreams: maxStreams,
  202. inTapHandle: config.InTapHandle,
  203. fc: &trInFlow{limit: uint32(icwz)},
  204. state: reachable,
  205. activeStreams: make(map[uint32]*Stream),
  206. stats: config.StatsHandler,
  207. kp: kp,
  208. idle: time.Now(),
  209. kep: kep,
  210. initialWindowSize: iwz,
  211. czData: new(channelzData),
  212. }
  213. t.controlBuf = newControlBuffer(t.ctxDone)
  214. if dynamicWindow {
  215. t.bdpEst = &bdpEstimator{
  216. bdp: initialWindowSize,
  217. updateFlowControl: t.updateFlowControl,
  218. }
  219. }
  220. if t.stats != nil {
  221. t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
  222. RemoteAddr: t.remoteAddr,
  223. LocalAddr: t.localAddr,
  224. })
  225. connBegin := &stats.ConnBegin{}
  226. t.stats.HandleConn(t.ctx, connBegin)
  227. }
  228. if channelz.IsOn() {
  229. t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
  230. }
  231. t.framer.writer.Flush()
  232. defer func() {
  233. if err != nil {
  234. t.Close()
  235. }
  236. }()
  237. // Check the validity of client preface.
  238. preface := make([]byte, len(clientPreface))
  239. if _, err := io.ReadFull(t.conn, preface); err != nil {
  240. return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
  241. }
  242. if !bytes.Equal(preface, clientPreface) {
  243. return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
  244. }
  245. frame, err := t.framer.fr.ReadFrame()
  246. if err == io.EOF || err == io.ErrUnexpectedEOF {
  247. return nil, err
  248. }
  249. if err != nil {
  250. return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
  251. }
  252. atomic.StoreUint32(&t.activity, 1)
  253. sf, ok := frame.(*http2.SettingsFrame)
  254. if !ok {
  255. return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
  256. }
  257. t.handleSettings(sf)
  258. go func() {
  259. t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
  260. t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
  261. if err := t.loopy.run(); err != nil {
  262. errorf("transport: loopyWriter.run returning. Err: %v", err)
  263. }
  264. t.conn.Close()
  265. close(t.writerDone)
  266. }()
  267. go t.keepalive()
  268. return t, nil
  269. }
  270. // operateHeader takes action on the decoded headers.
  271. func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
  272. streamID := frame.Header().StreamID
  273. state := decodeState{serverSide: true}
  274. if err := state.decodeHeader(frame); err != nil {
  275. if se, ok := status.FromError(err); ok {
  276. t.controlBuf.put(&cleanupStream{
  277. streamID: streamID,
  278. rst: true,
  279. rstCode: statusCodeConvTab[se.Code()],
  280. onWrite: func() {},
  281. })
  282. }
  283. return false
  284. }
  285. buf := newRecvBuffer()
  286. s := &Stream{
  287. id: streamID,
  288. st: t,
  289. buf: buf,
  290. fc: &inFlow{limit: uint32(t.initialWindowSize)},
  291. recvCompress: state.encoding,
  292. method: state.method,
  293. contentSubtype: state.contentSubtype,
  294. }
  295. if frame.StreamEnded() {
  296. // s is just created by the caller. No lock needed.
  297. s.state = streamReadDone
  298. }
  299. if state.timeoutSet {
  300. s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
  301. } else {
  302. s.ctx, s.cancel = context.WithCancel(t.ctx)
  303. }
  304. pr := &peer.Peer{
  305. Addr: t.remoteAddr,
  306. }
  307. // Attach Auth info if there is any.
  308. if t.authInfo != nil {
  309. pr.AuthInfo = t.authInfo
  310. }
  311. s.ctx = peer.NewContext(s.ctx, pr)
  312. // Attach the received metadata to the context.
  313. if len(state.mdata) > 0 {
  314. s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
  315. }
  316. if state.statsTags != nil {
  317. s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags)
  318. }
  319. if state.statsTrace != nil {
  320. s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace)
  321. }
  322. if t.inTapHandle != nil {
  323. var err error
  324. info := &tap.Info{
  325. FullMethodName: state.method,
  326. }
  327. s.ctx, err = t.inTapHandle(s.ctx, info)
  328. if err != nil {
  329. warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
  330. t.controlBuf.put(&cleanupStream{
  331. streamID: s.id,
  332. rst: true,
  333. rstCode: http2.ErrCodeRefusedStream,
  334. onWrite: func() {},
  335. })
  336. return false
  337. }
  338. }
  339. t.mu.Lock()
  340. if t.state != reachable {
  341. t.mu.Unlock()
  342. return false
  343. }
  344. if uint32(len(t.activeStreams)) >= t.maxStreams {
  345. t.mu.Unlock()
  346. t.controlBuf.put(&cleanupStream{
  347. streamID: streamID,
  348. rst: true,
  349. rstCode: http2.ErrCodeRefusedStream,
  350. onWrite: func() {},
  351. })
  352. return false
  353. }
  354. if streamID%2 != 1 || streamID <= t.maxStreamID {
  355. t.mu.Unlock()
  356. // illegal gRPC stream id.
  357. errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
  358. return true
  359. }
  360. t.maxStreamID = streamID
  361. t.activeStreams[streamID] = s
  362. if len(t.activeStreams) == 1 {
  363. t.idle = time.Time{}
  364. }
  365. t.mu.Unlock()
  366. if channelz.IsOn() {
  367. atomic.AddInt64(&t.czData.streamsStarted, 1)
  368. atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
  369. }
  370. s.requestRead = func(n int) {
  371. t.adjustWindow(s, uint32(n))
  372. }
  373. s.ctx = traceCtx(s.ctx, s.method)
  374. if t.stats != nil {
  375. s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
  376. inHeader := &stats.InHeader{
  377. FullMethod: s.method,
  378. RemoteAddr: t.remoteAddr,
  379. LocalAddr: t.localAddr,
  380. Compression: s.recvCompress,
  381. WireLength: int(frame.Header().Length),
  382. }
  383. t.stats.HandleRPC(s.ctx, inHeader)
  384. }
  385. s.ctxDone = s.ctx.Done()
  386. s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
  387. s.trReader = &transportReader{
  388. reader: &recvBufferReader{
  389. ctx: s.ctx,
  390. ctxDone: s.ctxDone,
  391. recv: s.buf,
  392. },
  393. windowHandler: func(n int) {
  394. t.updateWindow(s, uint32(n))
  395. },
  396. }
  397. // Register the stream with loopy.
  398. t.controlBuf.put(&registerStream{
  399. streamID: s.id,
  400. wq: s.wq,
  401. })
  402. handle(s)
  403. return false
  404. }
  405. // HandleStreams receives incoming streams using the given handler. This is
  406. // typically run in a separate goroutine.
  407. // traceCtx attaches trace to ctx and returns the new context.
  408. func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
  409. defer close(t.readerDone)
  410. for {
  411. frame, err := t.framer.fr.ReadFrame()
  412. atomic.StoreUint32(&t.activity, 1)
  413. if err != nil {
  414. if se, ok := err.(http2.StreamError); ok {
  415. warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
  416. t.mu.Lock()
  417. s := t.activeStreams[se.StreamID]
  418. t.mu.Unlock()
  419. if s != nil {
  420. t.closeStream(s, true, se.Code, nil, false)
  421. } else {
  422. t.controlBuf.put(&cleanupStream{
  423. streamID: se.StreamID,
  424. rst: true,
  425. rstCode: se.Code,
  426. onWrite: func() {},
  427. })
  428. }
  429. continue
  430. }
  431. if err == io.EOF || err == io.ErrUnexpectedEOF {
  432. t.Close()
  433. return
  434. }
  435. warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
  436. t.Close()
  437. return
  438. }
  439. switch frame := frame.(type) {
  440. case *http2.MetaHeadersFrame:
  441. if t.operateHeaders(frame, handle, traceCtx) {
  442. t.Close()
  443. break
  444. }
  445. case *http2.DataFrame:
  446. t.handleData(frame)
  447. case *http2.RSTStreamFrame:
  448. t.handleRSTStream(frame)
  449. case *http2.SettingsFrame:
  450. t.handleSettings(frame)
  451. case *http2.PingFrame:
  452. t.handlePing(frame)
  453. case *http2.WindowUpdateFrame:
  454. t.handleWindowUpdate(frame)
  455. case *http2.GoAwayFrame:
  456. // TODO: Handle GoAway from the client appropriately.
  457. default:
  458. errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
  459. }
  460. }
  461. }
  462. func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
  463. t.mu.Lock()
  464. defer t.mu.Unlock()
  465. if t.activeStreams == nil {
  466. // The transport is closing.
  467. return nil, false
  468. }
  469. s, ok := t.activeStreams[f.Header().StreamID]
  470. if !ok {
  471. // The stream is already done.
  472. return nil, false
  473. }
  474. return s, true
  475. }
  476. // adjustWindow sends out extra window update over the initial window size
  477. // of stream if the application is requesting data larger in size than
  478. // the window.
  479. func (t *http2Server) adjustWindow(s *Stream, n uint32) {
  480. if w := s.fc.maybeAdjust(n); w > 0 {
  481. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  482. }
  483. }
  484. // updateWindow adjusts the inbound quota for the stream and the transport.
  485. // Window updates will deliver to the controller for sending when
  486. // the cumulative quota exceeds the corresponding threshold.
  487. func (t *http2Server) updateWindow(s *Stream, n uint32) {
  488. if w := s.fc.onRead(n); w > 0 {
  489. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
  490. increment: w,
  491. })
  492. }
  493. }
  494. // updateFlowControl updates the incoming flow control windows
  495. // for the transport and the stream based on the current bdp
  496. // estimation.
  497. func (t *http2Server) updateFlowControl(n uint32) {
  498. t.mu.Lock()
  499. for _, s := range t.activeStreams {
  500. s.fc.newLimit(n)
  501. }
  502. t.initialWindowSize = int32(n)
  503. t.mu.Unlock()
  504. t.controlBuf.put(&outgoingWindowUpdate{
  505. streamID: 0,
  506. increment: t.fc.newLimit(n),
  507. })
  508. t.controlBuf.put(&outgoingSettings{
  509. ss: []http2.Setting{
  510. {
  511. ID: http2.SettingInitialWindowSize,
  512. Val: n,
  513. },
  514. },
  515. })
  516. }
  517. func (t *http2Server) handleData(f *http2.DataFrame) {
  518. size := f.Header().Length
  519. var sendBDPPing bool
  520. if t.bdpEst != nil {
  521. sendBDPPing = t.bdpEst.add(size)
  522. }
  523. // Decouple connection's flow control from application's read.
  524. // An update on connection's flow control should not depend on
  525. // whether user application has read the data or not. Such a
  526. // restriction is already imposed on the stream's flow control,
  527. // and therefore the sender will be blocked anyways.
  528. // Decoupling the connection flow control will prevent other
  529. // active(fast) streams from starving in presence of slow or
  530. // inactive streams.
  531. if w := t.fc.onData(size); w > 0 {
  532. t.controlBuf.put(&outgoingWindowUpdate{
  533. streamID: 0,
  534. increment: w,
  535. })
  536. }
  537. if sendBDPPing {
  538. // Avoid excessive ping detection (e.g. in an L7 proxy)
  539. // by sending a window update prior to the BDP ping.
  540. if w := t.fc.reset(); w > 0 {
  541. t.controlBuf.put(&outgoingWindowUpdate{
  542. streamID: 0,
  543. increment: w,
  544. })
  545. }
  546. t.controlBuf.put(bdpPing)
  547. }
  548. // Select the right stream to dispatch.
  549. s, ok := t.getStream(f)
  550. if !ok {
  551. return
  552. }
  553. if size > 0 {
  554. if err := s.fc.onData(size); err != nil {
  555. t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false)
  556. return
  557. }
  558. if f.Header().Flags.Has(http2.FlagDataPadded) {
  559. if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
  560. t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
  561. }
  562. }
  563. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  564. // guarantee f.Data() is consumed before the arrival of next frame.
  565. // Can this copy be eliminated?
  566. if len(f.Data()) > 0 {
  567. data := make([]byte, len(f.Data()))
  568. copy(data, f.Data())
  569. s.write(recvMsg{data: data})
  570. }
  571. }
  572. if f.Header().Flags.Has(http2.FlagDataEndStream) {
  573. // Received the end of stream from the client.
  574. s.compareAndSwapState(streamActive, streamReadDone)
  575. s.write(recvMsg{err: io.EOF})
  576. }
  577. }
  578. func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
  579. s, ok := t.getStream(f)
  580. if !ok {
  581. return
  582. }
  583. t.closeStream(s, false, 0, nil, false)
  584. }
  585. func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
  586. if f.IsAck() {
  587. return
  588. }
  589. var ss []http2.Setting
  590. var updateFuncs []func()
  591. f.ForeachSetting(func(s http2.Setting) error {
  592. switch s.ID {
  593. case http2.SettingMaxHeaderListSize:
  594. updateFuncs = append(updateFuncs, func() {
  595. t.maxSendHeaderListSize = new(uint32)
  596. *t.maxSendHeaderListSize = s.Val
  597. })
  598. default:
  599. ss = append(ss, s)
  600. }
  601. return nil
  602. })
  603. t.controlBuf.executeAndPut(func(interface{}) bool {
  604. for _, f := range updateFuncs {
  605. f()
  606. }
  607. return true
  608. }, &incomingSettings{
  609. ss: ss,
  610. })
  611. }
  612. const (
  613. maxPingStrikes = 2
  614. defaultPingTimeout = 2 * time.Hour
  615. )
  616. func (t *http2Server) handlePing(f *http2.PingFrame) {
  617. if f.IsAck() {
  618. if f.Data == goAwayPing.data && t.drainChan != nil {
  619. close(t.drainChan)
  620. return
  621. }
  622. // Maybe it's a BDP ping.
  623. if t.bdpEst != nil {
  624. t.bdpEst.calculate(f.Data)
  625. }
  626. return
  627. }
  628. pingAck := &ping{ack: true}
  629. copy(pingAck.data[:], f.Data[:])
  630. t.controlBuf.put(pingAck)
  631. now := time.Now()
  632. defer func() {
  633. t.lastPingAt = now
  634. }()
  635. // A reset ping strikes means that we don't need to check for policy
  636. // violation for this ping and the pingStrikes counter should be set
  637. // to 0.
  638. if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
  639. t.pingStrikes = 0
  640. return
  641. }
  642. t.mu.Lock()
  643. ns := len(t.activeStreams)
  644. t.mu.Unlock()
  645. if ns < 1 && !t.kep.PermitWithoutStream {
  646. // Keepalive shouldn't be active thus, this new ping should
  647. // have come after at least defaultPingTimeout.
  648. if t.lastPingAt.Add(defaultPingTimeout).After(now) {
  649. t.pingStrikes++
  650. }
  651. } else {
  652. // Check if keepalive policy is respected.
  653. if t.lastPingAt.Add(t.kep.MinTime).After(now) {
  654. t.pingStrikes++
  655. }
  656. }
  657. if t.pingStrikes > maxPingStrikes {
  658. // Send goaway and close the connection.
  659. errorf("transport: Got too many pings from the client, closing the connection.")
  660. t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
  661. }
  662. }
  663. func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  664. t.controlBuf.put(&incomingWindowUpdate{
  665. streamID: f.Header().StreamID,
  666. increment: f.Increment,
  667. })
  668. }
  669. func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
  670. for k, vv := range md {
  671. if isReservedHeader(k) {
  672. // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
  673. continue
  674. }
  675. for _, v := range vv {
  676. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  677. }
  678. }
  679. return headerFields
  680. }
  681. func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
  682. if t.maxSendHeaderListSize == nil {
  683. return true
  684. }
  685. hdrFrame := it.(*headerFrame)
  686. var sz int64
  687. for _, f := range hdrFrame.hf {
  688. if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
  689. errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
  690. return false
  691. }
  692. }
  693. return true
  694. }
  695. // WriteHeader sends the header metedata md back to the client.
  696. func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
  697. if s.updateHeaderSent() || s.getState() == streamDone {
  698. return ErrIllegalHeaderWrite
  699. }
  700. s.hdrMu.Lock()
  701. if md.Len() > 0 {
  702. if s.header.Len() > 0 {
  703. s.header = metadata.Join(s.header, md)
  704. } else {
  705. s.header = md
  706. }
  707. }
  708. if err := t.writeHeaderLocked(s); err != nil {
  709. s.hdrMu.Unlock()
  710. return err
  711. }
  712. s.hdrMu.Unlock()
  713. return nil
  714. }
  715. func (t *http2Server) writeHeaderLocked(s *Stream) error {
  716. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  717. // first and create a slice of that exact size.
  718. headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
  719. headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
  720. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
  721. if s.sendCompress != "" {
  722. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
  723. }
  724. headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
  725. success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
  726. streamID: s.id,
  727. hf: headerFields,
  728. endStream: false,
  729. onWrite: func() {
  730. atomic.StoreUint32(&t.resetPingStrikes, 1)
  731. },
  732. })
  733. if !success {
  734. if err != nil {
  735. return err
  736. }
  737. t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
  738. return ErrHeaderListSizeLimitViolation
  739. }
  740. if t.stats != nil {
  741. // Note: WireLength is not set in outHeader.
  742. // TODO(mmukhi): Revisit this later, if needed.
  743. outHeader := &stats.OutHeader{}
  744. t.stats.HandleRPC(s.Context(), outHeader)
  745. }
  746. return nil
  747. }
  748. // WriteStatus sends stream status to the client and terminates the stream.
  749. // There is no further I/O operations being able to perform on this stream.
  750. // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
  751. // OK is adopted.
  752. func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
  753. if s.getState() == streamDone {
  754. return nil
  755. }
  756. s.hdrMu.Lock()
  757. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  758. // first and create a slice of that exact size.
  759. headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
  760. if !s.updateHeaderSent() { // No headers have been sent.
  761. if len(s.header) > 0 { // Send a separate header frame.
  762. if err := t.writeHeaderLocked(s); err != nil {
  763. s.hdrMu.Unlock()
  764. return err
  765. }
  766. } else { // Send a trailer only response.
  767. headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
  768. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
  769. }
  770. }
  771. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
  772. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
  773. if p := st.Proto(); p != nil && len(p.Details) > 0 {
  774. stBytes, err := proto.Marshal(p)
  775. if err != nil {
  776. // TODO: return error instead, when callers are able to handle it.
  777. grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
  778. } else {
  779. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
  780. }
  781. }
  782. // Attach the trailer metadata.
  783. headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
  784. trailingHeader := &headerFrame{
  785. streamID: s.id,
  786. hf: headerFields,
  787. endStream: true,
  788. onWrite: func() {
  789. atomic.StoreUint32(&t.resetPingStrikes, 1)
  790. },
  791. }
  792. s.hdrMu.Unlock()
  793. success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
  794. if !success {
  795. if err != nil {
  796. return err
  797. }
  798. t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
  799. return ErrHeaderListSizeLimitViolation
  800. }
  801. t.closeStream(s, false, 0, trailingHeader, true)
  802. if t.stats != nil {
  803. t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
  804. }
  805. return nil
  806. }
  807. // Write converts the data into HTTP2 data frame and sends it out. Non-nil error
  808. // is returns if it fails (e.g., framing error, transport error).
  809. func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  810. if !s.isHeaderSent() { // Headers haven't been written yet.
  811. if err := t.WriteHeader(s, nil); err != nil {
  812. // TODO(mmukhi, dfawley): Make sure this is the right code to return.
  813. return status.Errorf(codes.Internal, "transport: %v", err)
  814. }
  815. } else {
  816. // Writing headers checks for this condition.
  817. if s.getState() == streamDone {
  818. // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
  819. s.cancel()
  820. select {
  821. case <-t.ctx.Done():
  822. return ErrConnClosing
  823. default:
  824. }
  825. return ContextErr(s.ctx.Err())
  826. }
  827. }
  828. // Add some data to header frame so that we can equally distribute bytes across frames.
  829. emptyLen := http2MaxFrameLen - len(hdr)
  830. if emptyLen > len(data) {
  831. emptyLen = len(data)
  832. }
  833. hdr = append(hdr, data[:emptyLen]...)
  834. data = data[emptyLen:]
  835. df := &dataFrame{
  836. streamID: s.id,
  837. h: hdr,
  838. d: data,
  839. onEachWrite: func() {
  840. atomic.StoreUint32(&t.resetPingStrikes, 1)
  841. },
  842. }
  843. if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
  844. select {
  845. case <-t.ctx.Done():
  846. return ErrConnClosing
  847. default:
  848. }
  849. return ContextErr(s.ctx.Err())
  850. }
  851. return t.controlBuf.put(df)
  852. }
  853. // keepalive running in a separate goroutine does the following:
  854. // 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
  855. // 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
  856. // 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
  857. // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
  858. // after an additional duration of keepalive.Timeout.
  859. func (t *http2Server) keepalive() {
  860. p := &ping{}
  861. var pingSent bool
  862. maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
  863. maxAge := time.NewTimer(t.kp.MaxConnectionAge)
  864. keepalive := time.NewTimer(t.kp.Time)
  865. // NOTE: All exit paths of this function should reset their
  866. // respective timers. A failure to do so will cause the
  867. // following clean-up to deadlock and eventually leak.
  868. defer func() {
  869. if !maxIdle.Stop() {
  870. <-maxIdle.C
  871. }
  872. if !maxAge.Stop() {
  873. <-maxAge.C
  874. }
  875. if !keepalive.Stop() {
  876. <-keepalive.C
  877. }
  878. }()
  879. for {
  880. select {
  881. case <-maxIdle.C:
  882. t.mu.Lock()
  883. idle := t.idle
  884. if idle.IsZero() { // The connection is non-idle.
  885. t.mu.Unlock()
  886. maxIdle.Reset(t.kp.MaxConnectionIdle)
  887. continue
  888. }
  889. val := t.kp.MaxConnectionIdle - time.Since(idle)
  890. t.mu.Unlock()
  891. if val <= 0 {
  892. // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
  893. // Gracefully close the connection.
  894. t.drain(http2.ErrCodeNo, []byte{})
  895. // Resetting the timer so that the clean-up doesn't deadlock.
  896. maxIdle.Reset(infinity)
  897. return
  898. }
  899. maxIdle.Reset(val)
  900. case <-maxAge.C:
  901. t.drain(http2.ErrCodeNo, []byte{})
  902. maxAge.Reset(t.kp.MaxConnectionAgeGrace)
  903. select {
  904. case <-maxAge.C:
  905. // Close the connection after grace period.
  906. t.Close()
  907. // Resetting the timer so that the clean-up doesn't deadlock.
  908. maxAge.Reset(infinity)
  909. case <-t.ctx.Done():
  910. }
  911. return
  912. case <-keepalive.C:
  913. if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
  914. pingSent = false
  915. keepalive.Reset(t.kp.Time)
  916. continue
  917. }
  918. if pingSent {
  919. t.Close()
  920. // Resetting the timer so that the clean-up doesn't deadlock.
  921. keepalive.Reset(infinity)
  922. return
  923. }
  924. pingSent = true
  925. if channelz.IsOn() {
  926. atomic.AddInt64(&t.czData.kpCount, 1)
  927. }
  928. t.controlBuf.put(p)
  929. keepalive.Reset(t.kp.Timeout)
  930. case <-t.ctx.Done():
  931. return
  932. }
  933. }
  934. }
  935. // Close starts shutting down the http2Server transport.
  936. // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
  937. // could cause some resource issue. Revisit this later.
  938. func (t *http2Server) Close() error {
  939. t.mu.Lock()
  940. if t.state == closing {
  941. t.mu.Unlock()
  942. return errors.New("transport: Close() was already called")
  943. }
  944. t.state = closing
  945. streams := t.activeStreams
  946. t.activeStreams = nil
  947. t.mu.Unlock()
  948. t.controlBuf.finish()
  949. t.cancel()
  950. err := t.conn.Close()
  951. if channelz.IsOn() {
  952. channelz.RemoveEntry(t.channelzID)
  953. }
  954. // Cancel all active streams.
  955. for _, s := range streams {
  956. s.cancel()
  957. }
  958. if t.stats != nil {
  959. connEnd := &stats.ConnEnd{}
  960. t.stats.HandleConn(t.ctx, connEnd)
  961. }
  962. return err
  963. }
  964. // closeStream clears the footprint of a stream when the stream is not needed
  965. // any more.
  966. func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
  967. if s.swapState(streamDone) == streamDone {
  968. // If the stream was already done, return.
  969. return
  970. }
  971. // In case stream sending and receiving are invoked in separate
  972. // goroutines (e.g., bi-directional streaming), cancel needs to be
  973. // called to interrupt the potential blocking on other goroutines.
  974. s.cancel()
  975. cleanup := &cleanupStream{
  976. streamID: s.id,
  977. rst: rst,
  978. rstCode: rstCode,
  979. onWrite: func() {
  980. t.mu.Lock()
  981. if t.activeStreams != nil {
  982. delete(t.activeStreams, s.id)
  983. if len(t.activeStreams) == 0 {
  984. t.idle = time.Now()
  985. }
  986. }
  987. t.mu.Unlock()
  988. if channelz.IsOn() {
  989. if eosReceived {
  990. atomic.AddInt64(&t.czData.streamsSucceeded, 1)
  991. } else {
  992. atomic.AddInt64(&t.czData.streamsFailed, 1)
  993. }
  994. }
  995. },
  996. }
  997. if hdr != nil {
  998. hdr.cleanup = cleanup
  999. t.controlBuf.put(hdr)
  1000. } else {
  1001. t.controlBuf.put(cleanup)
  1002. }
  1003. }
  1004. func (t *http2Server) RemoteAddr() net.Addr {
  1005. return t.remoteAddr
  1006. }
  1007. func (t *http2Server) Drain() {
  1008. t.drain(http2.ErrCodeNo, []byte{})
  1009. }
  1010. func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
  1011. t.mu.Lock()
  1012. defer t.mu.Unlock()
  1013. if t.drainChan != nil {
  1014. return
  1015. }
  1016. t.drainChan = make(chan struct{})
  1017. t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
  1018. }
  1019. var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
  1020. // Handles outgoing GoAway and returns true if loopy needs to put itself
  1021. // in draining mode.
  1022. func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
  1023. t.mu.Lock()
  1024. if t.state == closing { // TODO(mmukhi): This seems unnecessary.
  1025. t.mu.Unlock()
  1026. // The transport is closing.
  1027. return false, ErrConnClosing
  1028. }
  1029. sid := t.maxStreamID
  1030. if !g.headsUp {
  1031. // Stop accepting more streams now.
  1032. t.state = draining
  1033. if len(t.activeStreams) == 0 {
  1034. g.closeConn = true
  1035. }
  1036. t.mu.Unlock()
  1037. if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
  1038. return false, err
  1039. }
  1040. if g.closeConn {
  1041. // Abruptly close the connection following the GoAway (via
  1042. // loopywriter). But flush out what's inside the buffer first.
  1043. t.framer.writer.Flush()
  1044. return false, fmt.Errorf("transport: Connection closing")
  1045. }
  1046. return true, nil
  1047. }
  1048. t.mu.Unlock()
  1049. // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
  1050. // Follow that with a ping and wait for the ack to come back or a timer
  1051. // to expire. During this time accept new streams since they might have
  1052. // originated before the GoAway reaches the client.
  1053. // After getting the ack or timer expiration send out another GoAway this
  1054. // time with an ID of the max stream server intends to process.
  1055. if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
  1056. return false, err
  1057. }
  1058. if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
  1059. return false, err
  1060. }
  1061. go func() {
  1062. timer := time.NewTimer(time.Minute)
  1063. defer timer.Stop()
  1064. select {
  1065. case <-t.drainChan:
  1066. case <-timer.C:
  1067. case <-t.ctx.Done():
  1068. return
  1069. }
  1070. t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
  1071. }()
  1072. return false, nil
  1073. }
  1074. func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
  1075. s := channelz.SocketInternalMetric{
  1076. StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
  1077. StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
  1078. StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
  1079. MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
  1080. MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
  1081. KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
  1082. LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
  1083. LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
  1084. LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
  1085. LocalFlowControlWindow: int64(t.fc.getSize()),
  1086. SocketOptions: channelz.GetSocketOption(t.conn),
  1087. LocalAddr: t.localAddr,
  1088. RemoteAddr: t.remoteAddr,
  1089. // RemoteName :
  1090. }
  1091. if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
  1092. s.Security = au.GetSecurityValue()
  1093. }
  1094. s.RemoteFlowControlWindow = t.getOutFlowWindow()
  1095. return &s
  1096. }
  1097. func (t *http2Server) IncrMsgSent() {
  1098. atomic.AddInt64(&t.czData.msgSent, 1)
  1099. atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
  1100. }
  1101. func (t *http2Server) IncrMsgRecv() {
  1102. atomic.AddInt64(&t.czData.msgRecv, 1)
  1103. atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
  1104. }
  1105. func (t *http2Server) getOutFlowWindow() int64 {
  1106. resp := make(chan uint32)
  1107. timer := time.NewTimer(time.Second)
  1108. defer timer.Stop()
  1109. t.controlBuf.put(&outFlowControlSizeRequest{resp})
  1110. select {
  1111. case sz := <-resp:
  1112. return int64(sz)
  1113. case <-t.ctxDone:
  1114. return -1
  1115. case <-timer.C:
  1116. return -2
  1117. }
  1118. }
  1119. func getJitter(v time.Duration) time.Duration {
  1120. if v == infinity {
  1121. return 0
  1122. }
  1123. // Generate a jitter between +/- 10% of the value.
  1124. r := int64(v / 10)
  1125. j := grpcrand.Int63n(2*r) - r
  1126. return time.Duration(j)
  1127. }