api_pre17.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682
  1. // Copyright 2011 Google Inc. All rights reserved.
  2. // Use of this source code is governed by the Apache 2.0
  3. // license that can be found in the LICENSE file.
  4. // +build !appengine
  5. // +build !go1.7
  6. package internal
  7. import (
  8. "bytes"
  9. "errors"
  10. "fmt"
  11. "io/ioutil"
  12. "log"
  13. "net"
  14. "net/http"
  15. "net/url"
  16. "os"
  17. "runtime"
  18. "strconv"
  19. "strings"
  20. "sync"
  21. "sync/atomic"
  22. "time"
  23. "github.com/golang/protobuf/proto"
  24. netcontext "golang.org/x/net/context"
  25. basepb "google.golang.org/appengine/internal/base"
  26. logpb "google.golang.org/appengine/internal/log"
  27. remotepb "google.golang.org/appengine/internal/remote_api"
  28. )
  29. const (
  30. apiPath = "/rpc_http"
  31. defaultTicketSuffix = "/default.20150612t184001.0"
  32. )
  33. var (
  34. // Incoming headers.
  35. ticketHeader = http.CanonicalHeaderKey("X-AppEngine-API-Ticket")
  36. dapperHeader = http.CanonicalHeaderKey("X-Google-DapperTraceInfo")
  37. traceHeader = http.CanonicalHeaderKey("X-Cloud-Trace-Context")
  38. curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
  39. userIPHeader = http.CanonicalHeaderKey("X-AppEngine-User-IP")
  40. remoteAddrHeader = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr")
  41. // Outgoing headers.
  42. apiEndpointHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint")
  43. apiEndpointHeaderValue = []string{"app-engine-apis"}
  44. apiMethodHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Method")
  45. apiMethodHeaderValue = []string{"/VMRemoteAPI.CallRemoteAPI"}
  46. apiDeadlineHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
  47. apiContentType = http.CanonicalHeaderKey("Content-Type")
  48. apiContentTypeValue = []string{"application/octet-stream"}
  49. logFlushHeader = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count")
  50. apiHTTPClient = &http.Client{
  51. Transport: &http.Transport{
  52. Proxy: http.ProxyFromEnvironment,
  53. Dial: limitDial,
  54. },
  55. }
  56. defaultTicketOnce sync.Once
  57. defaultTicket string
  58. )
  59. func apiURL() *url.URL {
  60. host, port := "appengine.googleapis.internal", "10001"
  61. if h := os.Getenv("API_HOST"); h != "" {
  62. host = h
  63. }
  64. if p := os.Getenv("API_PORT"); p != "" {
  65. port = p
  66. }
  67. return &url.URL{
  68. Scheme: "http",
  69. Host: host + ":" + port,
  70. Path: apiPath,
  71. }
  72. }
  73. func handleHTTP(w http.ResponseWriter, r *http.Request) {
  74. c := &context{
  75. req: r,
  76. outHeader: w.Header(),
  77. apiURL: apiURL(),
  78. }
  79. stopFlushing := make(chan int)
  80. ctxs.Lock()
  81. ctxs.m[r] = c
  82. ctxs.Unlock()
  83. defer func() {
  84. ctxs.Lock()
  85. delete(ctxs.m, r)
  86. ctxs.Unlock()
  87. }()
  88. // Patch up RemoteAddr so it looks reasonable.
  89. if addr := r.Header.Get(userIPHeader); addr != "" {
  90. r.RemoteAddr = addr
  91. } else if addr = r.Header.Get(remoteAddrHeader); addr != "" {
  92. r.RemoteAddr = addr
  93. } else {
  94. // Should not normally reach here, but pick a sensible default anyway.
  95. r.RemoteAddr = "127.0.0.1"
  96. }
  97. // The address in the headers will most likely be of these forms:
  98. // 123.123.123.123
  99. // 2001:db8::1
  100. // net/http.Request.RemoteAddr is specified to be in "IP:port" form.
  101. if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil {
  102. // Assume the remote address is only a host; add a default port.
  103. r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80")
  104. }
  105. // Start goroutine responsible for flushing app logs.
  106. // This is done after adding c to ctx.m (and stopped before removing it)
  107. // because flushing logs requires making an API call.
  108. go c.logFlusher(stopFlushing)
  109. executeRequestSafely(c, r)
  110. c.outHeader = nil // make sure header changes aren't respected any more
  111. stopFlushing <- 1 // any logging beyond this point will be dropped
  112. // Flush any pending logs asynchronously.
  113. c.pendingLogs.Lock()
  114. flushes := c.pendingLogs.flushes
  115. if len(c.pendingLogs.lines) > 0 {
  116. flushes++
  117. }
  118. c.pendingLogs.Unlock()
  119. go c.flushLog(false)
  120. w.Header().Set(logFlushHeader, strconv.Itoa(flushes))
  121. // Avoid nil Write call if c.Write is never called.
  122. if c.outCode != 0 {
  123. w.WriteHeader(c.outCode)
  124. }
  125. if c.outBody != nil {
  126. w.Write(c.outBody)
  127. }
  128. }
  129. func executeRequestSafely(c *context, r *http.Request) {
  130. defer func() {
  131. if x := recover(); x != nil {
  132. logf(c, 4, "%s", renderPanic(x)) // 4 == critical
  133. c.outCode = 500
  134. }
  135. }()
  136. http.DefaultServeMux.ServeHTTP(c, r)
  137. }
  138. func renderPanic(x interface{}) string {
  139. buf := make([]byte, 16<<10) // 16 KB should be plenty
  140. buf = buf[:runtime.Stack(buf, false)]
  141. // Remove the first few stack frames:
  142. // this func
  143. // the recover closure in the caller
  144. // That will root the stack trace at the site of the panic.
  145. const (
  146. skipStart = "internal.renderPanic"
  147. skipFrames = 2
  148. )
  149. start := bytes.Index(buf, []byte(skipStart))
  150. p := start
  151. for i := 0; i < skipFrames*2 && p+1 < len(buf); i++ {
  152. p = bytes.IndexByte(buf[p+1:], '\n') + p + 1
  153. if p < 0 {
  154. break
  155. }
  156. }
  157. if p >= 0 {
  158. // buf[start:p+1] is the block to remove.
  159. // Copy buf[p+1:] over buf[start:] and shrink buf.
  160. copy(buf[start:], buf[p+1:])
  161. buf = buf[:len(buf)-(p+1-start)]
  162. }
  163. // Add panic heading.
  164. head := fmt.Sprintf("panic: %v\n\n", x)
  165. if len(head) > len(buf) {
  166. // Extremely unlikely to happen.
  167. return head
  168. }
  169. copy(buf[len(head):], buf)
  170. copy(buf, head)
  171. return string(buf)
  172. }
  173. var ctxs = struct {
  174. sync.Mutex
  175. m map[*http.Request]*context
  176. bg *context // background context, lazily initialized
  177. // dec is used by tests to decorate the netcontext.Context returned
  178. // for a given request. This allows tests to add overrides (such as
  179. // WithAppIDOverride) to the context. The map is nil outside tests.
  180. dec map[*http.Request]func(netcontext.Context) netcontext.Context
  181. }{
  182. m: make(map[*http.Request]*context),
  183. }
  184. // context represents the context of an in-flight HTTP request.
  185. // It implements the appengine.Context and http.ResponseWriter interfaces.
  186. type context struct {
  187. req *http.Request
  188. outCode int
  189. outHeader http.Header
  190. outBody []byte
  191. pendingLogs struct {
  192. sync.Mutex
  193. lines []*logpb.UserAppLogLine
  194. flushes int
  195. }
  196. apiURL *url.URL
  197. }
  198. var contextKey = "holds a *context"
  199. // fromContext returns the App Engine context or nil if ctx is not
  200. // derived from an App Engine context.
  201. func fromContext(ctx netcontext.Context) *context {
  202. c, _ := ctx.Value(&contextKey).(*context)
  203. return c
  204. }
  205. func withContext(parent netcontext.Context, c *context) netcontext.Context {
  206. ctx := netcontext.WithValue(parent, &contextKey, c)
  207. if ns := c.req.Header.Get(curNamespaceHeader); ns != "" {
  208. ctx = withNamespace(ctx, ns)
  209. }
  210. return ctx
  211. }
  212. func toContext(c *context) netcontext.Context {
  213. return withContext(netcontext.Background(), c)
  214. }
  215. func IncomingHeaders(ctx netcontext.Context) http.Header {
  216. if c := fromContext(ctx); c != nil {
  217. return c.req.Header
  218. }
  219. return nil
  220. }
  221. func ReqContext(req *http.Request) netcontext.Context {
  222. return WithContext(netcontext.Background(), req)
  223. }
  224. func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context {
  225. ctxs.Lock()
  226. c := ctxs.m[req]
  227. d := ctxs.dec[req]
  228. ctxs.Unlock()
  229. if d != nil {
  230. parent = d(parent)
  231. }
  232. if c == nil {
  233. // Someone passed in an http.Request that is not in-flight.
  234. // We panic here rather than panicking at a later point
  235. // so that stack traces will be more sensible.
  236. log.Panic("appengine: NewContext passed an unknown http.Request")
  237. }
  238. return withContext(parent, c)
  239. }
  240. // DefaultTicket returns a ticket used for background context or dev_appserver.
  241. func DefaultTicket() string {
  242. defaultTicketOnce.Do(func() {
  243. if IsDevAppServer() {
  244. defaultTicket = "testapp" + defaultTicketSuffix
  245. return
  246. }
  247. appID := partitionlessAppID()
  248. escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1)
  249. majVersion := VersionID(nil)
  250. if i := strings.Index(majVersion, "."); i > 0 {
  251. majVersion = majVersion[:i]
  252. }
  253. defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID())
  254. })
  255. return defaultTicket
  256. }
  257. func BackgroundContext() netcontext.Context {
  258. ctxs.Lock()
  259. defer ctxs.Unlock()
  260. if ctxs.bg != nil {
  261. return toContext(ctxs.bg)
  262. }
  263. // Compute background security ticket.
  264. ticket := DefaultTicket()
  265. ctxs.bg = &context{
  266. req: &http.Request{
  267. Header: http.Header{
  268. ticketHeader: []string{ticket},
  269. },
  270. },
  271. apiURL: apiURL(),
  272. }
  273. // TODO(dsymonds): Wire up the shutdown handler to do a final flush.
  274. go ctxs.bg.logFlusher(make(chan int))
  275. return toContext(ctxs.bg)
  276. }
  277. // RegisterTestRequest registers the HTTP request req for testing, such that
  278. // any API calls are sent to the provided URL. It returns a closure to delete
  279. // the registration.
  280. // It should only be used by aetest package.
  281. func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) {
  282. c := &context{
  283. req: req,
  284. apiURL: apiURL,
  285. }
  286. ctxs.Lock()
  287. defer ctxs.Unlock()
  288. if _, ok := ctxs.m[req]; ok {
  289. log.Panic("req already associated with context")
  290. }
  291. if _, ok := ctxs.dec[req]; ok {
  292. log.Panic("req already associated with context")
  293. }
  294. if ctxs.dec == nil {
  295. ctxs.dec = make(map[*http.Request]func(netcontext.Context) netcontext.Context)
  296. }
  297. ctxs.m[req] = c
  298. ctxs.dec[req] = decorate
  299. return req, func() {
  300. ctxs.Lock()
  301. delete(ctxs.m, req)
  302. delete(ctxs.dec, req)
  303. ctxs.Unlock()
  304. }
  305. }
  306. var errTimeout = &CallError{
  307. Detail: "Deadline exceeded",
  308. Code: int32(remotepb.RpcError_CANCELLED),
  309. Timeout: true,
  310. }
  311. func (c *context) Header() http.Header { return c.outHeader }
  312. // Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status
  313. // codes do not permit a response body (nor response entity headers such as
  314. // Content-Length, Content-Type, etc).
  315. func bodyAllowedForStatus(status int) bool {
  316. switch {
  317. case status >= 100 && status <= 199:
  318. return false
  319. case status == 204:
  320. return false
  321. case status == 304:
  322. return false
  323. }
  324. return true
  325. }
  326. func (c *context) Write(b []byte) (int, error) {
  327. if c.outCode == 0 {
  328. c.WriteHeader(http.StatusOK)
  329. }
  330. if len(b) > 0 && !bodyAllowedForStatus(c.outCode) {
  331. return 0, http.ErrBodyNotAllowed
  332. }
  333. c.outBody = append(c.outBody, b...)
  334. return len(b), nil
  335. }
  336. func (c *context) WriteHeader(code int) {
  337. if c.outCode != 0 {
  338. logf(c, 3, "WriteHeader called multiple times on request.") // error level
  339. return
  340. }
  341. c.outCode = code
  342. }
  343. func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) {
  344. hreq := &http.Request{
  345. Method: "POST",
  346. URL: c.apiURL,
  347. Header: http.Header{
  348. apiEndpointHeader: apiEndpointHeaderValue,
  349. apiMethodHeader: apiMethodHeaderValue,
  350. apiContentType: apiContentTypeValue,
  351. apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)},
  352. },
  353. Body: ioutil.NopCloser(bytes.NewReader(body)),
  354. ContentLength: int64(len(body)),
  355. Host: c.apiURL.Host,
  356. }
  357. if info := c.req.Header.Get(dapperHeader); info != "" {
  358. hreq.Header.Set(dapperHeader, info)
  359. }
  360. if info := c.req.Header.Get(traceHeader); info != "" {
  361. hreq.Header.Set(traceHeader, info)
  362. }
  363. tr := apiHTTPClient.Transport.(*http.Transport)
  364. var timedOut int32 // atomic; set to 1 if timed out
  365. t := time.AfterFunc(timeout, func() {
  366. atomic.StoreInt32(&timedOut, 1)
  367. tr.CancelRequest(hreq)
  368. })
  369. defer t.Stop()
  370. defer func() {
  371. // Check if timeout was exceeded.
  372. if atomic.LoadInt32(&timedOut) != 0 {
  373. err = errTimeout
  374. }
  375. }()
  376. hresp, err := apiHTTPClient.Do(hreq)
  377. if err != nil {
  378. return nil, &CallError{
  379. Detail: fmt.Sprintf("service bridge HTTP failed: %v", err),
  380. Code: int32(remotepb.RpcError_UNKNOWN),
  381. }
  382. }
  383. defer hresp.Body.Close()
  384. hrespBody, err := ioutil.ReadAll(hresp.Body)
  385. if hresp.StatusCode != 200 {
  386. return nil, &CallError{
  387. Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody),
  388. Code: int32(remotepb.RpcError_UNKNOWN),
  389. }
  390. }
  391. if err != nil {
  392. return nil, &CallError{
  393. Detail: fmt.Sprintf("service bridge response bad: %v", err),
  394. Code: int32(remotepb.RpcError_UNKNOWN),
  395. }
  396. }
  397. return hrespBody, nil
  398. }
  399. func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error {
  400. if ns := NamespaceFromContext(ctx); ns != "" {
  401. if fn, ok := NamespaceMods[service]; ok {
  402. fn(in, ns)
  403. }
  404. }
  405. if f, ctx, ok := callOverrideFromContext(ctx); ok {
  406. return f(ctx, service, method, in, out)
  407. }
  408. // Handle already-done contexts quickly.
  409. select {
  410. case <-ctx.Done():
  411. return ctx.Err()
  412. default:
  413. }
  414. c := fromContext(ctx)
  415. if c == nil {
  416. // Give a good error message rather than a panic lower down.
  417. return errNotAppEngineContext
  418. }
  419. // Apply transaction modifications if we're in a transaction.
  420. if t := transactionFromContext(ctx); t != nil {
  421. if t.finished {
  422. return errors.New("transaction context has expired")
  423. }
  424. applyTransaction(in, &t.transaction)
  425. }
  426. // Default RPC timeout is 60s.
  427. timeout := 60 * time.Second
  428. if deadline, ok := ctx.Deadline(); ok {
  429. timeout = deadline.Sub(time.Now())
  430. }
  431. data, err := proto.Marshal(in)
  432. if err != nil {
  433. return err
  434. }
  435. ticket := c.req.Header.Get(ticketHeader)
  436. // Use a test ticket under test environment.
  437. if ticket == "" {
  438. if appid := ctx.Value(&appIDOverrideKey); appid != nil {
  439. ticket = appid.(string) + defaultTicketSuffix
  440. }
  441. }
  442. // Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver.
  443. if ticket == "" {
  444. ticket = DefaultTicket()
  445. }
  446. req := &remotepb.Request{
  447. ServiceName: &service,
  448. Method: &method,
  449. Request: data,
  450. RequestId: &ticket,
  451. }
  452. hreqBody, err := proto.Marshal(req)
  453. if err != nil {
  454. return err
  455. }
  456. hrespBody, err := c.post(hreqBody, timeout)
  457. if err != nil {
  458. return err
  459. }
  460. res := &remotepb.Response{}
  461. if err := proto.Unmarshal(hrespBody, res); err != nil {
  462. return err
  463. }
  464. if res.RpcError != nil {
  465. ce := &CallError{
  466. Detail: res.RpcError.GetDetail(),
  467. Code: *res.RpcError.Code,
  468. }
  469. switch remotepb.RpcError_ErrorCode(ce.Code) {
  470. case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED:
  471. ce.Timeout = true
  472. }
  473. return ce
  474. }
  475. if res.ApplicationError != nil {
  476. return &APIError{
  477. Service: *req.ServiceName,
  478. Detail: res.ApplicationError.GetDetail(),
  479. Code: *res.ApplicationError.Code,
  480. }
  481. }
  482. if res.Exception != nil || res.JavaException != nil {
  483. // This shouldn't happen, but let's be defensive.
  484. return &CallError{
  485. Detail: "service bridge returned exception",
  486. Code: int32(remotepb.RpcError_UNKNOWN),
  487. }
  488. }
  489. return proto.Unmarshal(res.Response, out)
  490. }
  491. func (c *context) Request() *http.Request {
  492. return c.req
  493. }
  494. func (c *context) addLogLine(ll *logpb.UserAppLogLine) {
  495. // Truncate long log lines.
  496. // TODO(dsymonds): Check if this is still necessary.
  497. const lim = 8 << 10
  498. if len(*ll.Message) > lim {
  499. suffix := fmt.Sprintf("...(length %d)", len(*ll.Message))
  500. ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix)
  501. }
  502. c.pendingLogs.Lock()
  503. c.pendingLogs.lines = append(c.pendingLogs.lines, ll)
  504. c.pendingLogs.Unlock()
  505. }
  506. var logLevelName = map[int64]string{
  507. 0: "DEBUG",
  508. 1: "INFO",
  509. 2: "WARNING",
  510. 3: "ERROR",
  511. 4: "CRITICAL",
  512. }
  513. func logf(c *context, level int64, format string, args ...interface{}) {
  514. if c == nil {
  515. panic("not an App Engine context")
  516. }
  517. s := fmt.Sprintf(format, args...)
  518. s = strings.TrimRight(s, "\n") // Remove any trailing newline characters.
  519. c.addLogLine(&logpb.UserAppLogLine{
  520. TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3),
  521. Level: &level,
  522. Message: &s,
  523. })
  524. log.Print(logLevelName[level] + ": " + s)
  525. }
  526. // flushLog attempts to flush any pending logs to the appserver.
  527. // It should not be called concurrently.
  528. func (c *context) flushLog(force bool) (flushed bool) {
  529. c.pendingLogs.Lock()
  530. // Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious.
  531. n, rem := 0, 30<<20
  532. for ; n < len(c.pendingLogs.lines); n++ {
  533. ll := c.pendingLogs.lines[n]
  534. // Each log line will require about 3 bytes of overhead.
  535. nb := proto.Size(ll) + 3
  536. if nb > rem {
  537. break
  538. }
  539. rem -= nb
  540. }
  541. lines := c.pendingLogs.lines[:n]
  542. c.pendingLogs.lines = c.pendingLogs.lines[n:]
  543. c.pendingLogs.Unlock()
  544. if len(lines) == 0 && !force {
  545. // Nothing to flush.
  546. return false
  547. }
  548. rescueLogs := false
  549. defer func() {
  550. if rescueLogs {
  551. c.pendingLogs.Lock()
  552. c.pendingLogs.lines = append(lines, c.pendingLogs.lines...)
  553. c.pendingLogs.Unlock()
  554. }
  555. }()
  556. buf, err := proto.Marshal(&logpb.UserAppLogGroup{
  557. LogLine: lines,
  558. })
  559. if err != nil {
  560. log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err)
  561. rescueLogs = true
  562. return false
  563. }
  564. req := &logpb.FlushRequest{
  565. Logs: buf,
  566. }
  567. res := &basepb.VoidProto{}
  568. c.pendingLogs.Lock()
  569. c.pendingLogs.flushes++
  570. c.pendingLogs.Unlock()
  571. if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil {
  572. log.Printf("internal.flushLog: Flush RPC: %v", err)
  573. rescueLogs = true
  574. return false
  575. }
  576. return true
  577. }
  578. const (
  579. // Log flushing parameters.
  580. flushInterval = 1 * time.Second
  581. forceFlushInterval = 60 * time.Second
  582. )
  583. func (c *context) logFlusher(stop <-chan int) {
  584. lastFlush := time.Now()
  585. tick := time.NewTicker(flushInterval)
  586. for {
  587. select {
  588. case <-stop:
  589. // Request finished.
  590. tick.Stop()
  591. return
  592. case <-tick.C:
  593. force := time.Now().Sub(lastFlush) > forceFlushInterval
  594. if c.flushLog(force) {
  595. lastFlush = time.Now()
  596. }
  597. }
  598. }
  599. }
  600. func ContextForTesting(req *http.Request) netcontext.Context {
  601. return toContext(&context{req: req})
  602. }