sink.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package binarylog
  19. import (
  20. "bufio"
  21. "encoding/binary"
  22. "fmt"
  23. "io"
  24. "io/ioutil"
  25. "sync"
  26. "time"
  27. "github.com/golang/protobuf/proto"
  28. pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
  29. "google.golang.org/grpc/grpclog"
  30. )
  31. var (
  32. defaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp).
  33. )
  34. // SetDefaultSink sets the sink where binary logs will be written to.
  35. //
  36. // Not thread safe. Only set during initialization.
  37. func SetDefaultSink(s Sink) {
  38. if defaultSink != nil {
  39. defaultSink.Close()
  40. }
  41. defaultSink = s
  42. }
  43. // Sink writes log entry into the binary log sink.
  44. type Sink interface {
  45. // Write will be called to write the log entry into the sink.
  46. //
  47. // It should be thread-safe so it can be called in parallel.
  48. Write(*pb.GrpcLogEntry) error
  49. // Close will be called when the Sink is replaced by a new Sink.
  50. Close() error
  51. }
  52. type noopSink struct{}
  53. func (ns *noopSink) Write(*pb.GrpcLogEntry) error { return nil }
  54. func (ns *noopSink) Close() error { return nil }
  55. // newWriterSink creates a binary log sink with the given writer.
  56. //
  57. // Write() marshalls the proto message and writes it to the given writer. Each
  58. // message is prefixed with a 4 byte big endian unsigned integer as the length.
  59. //
  60. // No buffer is done, Close() doesn't try to close the writer.
  61. func newWriterSink(w io.Writer) *writerSink {
  62. return &writerSink{out: w}
  63. }
  64. type writerSink struct {
  65. out io.Writer
  66. }
  67. func (ws *writerSink) Write(e *pb.GrpcLogEntry) error {
  68. b, err := proto.Marshal(e)
  69. if err != nil {
  70. grpclog.Infof("binary logging: failed to marshal proto message: %v", err)
  71. }
  72. hdr := make([]byte, 4)
  73. binary.BigEndian.PutUint32(hdr, uint32(len(b)))
  74. if _, err := ws.out.Write(hdr); err != nil {
  75. return err
  76. }
  77. if _, err := ws.out.Write(b); err != nil {
  78. return err
  79. }
  80. return nil
  81. }
  82. func (ws *writerSink) Close() error { return nil }
  83. type bufWriteCloserSink struct {
  84. mu sync.Mutex
  85. closer io.Closer
  86. out *writerSink // out is built on buf.
  87. buf *bufio.Writer // buf is kept for flush.
  88. writeStartOnce sync.Once
  89. writeTicker *time.Ticker
  90. }
  91. func (fs *bufWriteCloserSink) Write(e *pb.GrpcLogEntry) error {
  92. // Start the write loop when Write is called.
  93. fs.writeStartOnce.Do(fs.startFlushGoroutine)
  94. fs.mu.Lock()
  95. if err := fs.out.Write(e); err != nil {
  96. fs.mu.Unlock()
  97. return err
  98. }
  99. fs.mu.Unlock()
  100. return nil
  101. }
  102. const (
  103. bufFlushDuration = 60 * time.Second
  104. )
  105. func (fs *bufWriteCloserSink) startFlushGoroutine() {
  106. fs.writeTicker = time.NewTicker(bufFlushDuration)
  107. go func() {
  108. for range fs.writeTicker.C {
  109. fs.mu.Lock()
  110. fs.buf.Flush()
  111. fs.mu.Unlock()
  112. }
  113. }()
  114. }
  115. func (fs *bufWriteCloserSink) Close() error {
  116. if fs.writeTicker != nil {
  117. fs.writeTicker.Stop()
  118. }
  119. fs.mu.Lock()
  120. fs.buf.Flush()
  121. fs.closer.Close()
  122. fs.out.Close()
  123. fs.mu.Unlock()
  124. return nil
  125. }
  126. func newBufWriteCloserSink(o io.WriteCloser) Sink {
  127. bufW := bufio.NewWriter(o)
  128. return &bufWriteCloserSink{
  129. closer: o,
  130. out: newWriterSink(bufW),
  131. buf: bufW,
  132. }
  133. }
  134. // NewTempFileSink creates a temp file and returns a Sink that writes to this
  135. // file.
  136. func NewTempFileSink() (Sink, error) {
  137. tempFile, err := ioutil.TempFile("/tmp", "grpcgo_binarylog_*.txt")
  138. if err != nil {
  139. return nil, fmt.Errorf("failed to create temp file: %v", err)
  140. }
  141. return newBufWriteCloserSink(tempFile), nil
  142. }