123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426 |
- /*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package binarylog
- import (
- "net"
- "strings"
- "sync/atomic"
- "time"
- "github.com/golang/protobuf/proto"
- "github.com/golang/protobuf/ptypes"
- pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
- "google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/status"
- )
- type callIDGenerator struct {
- id uint64
- }
- func (g *callIDGenerator) next() uint64 {
- id := atomic.AddUint64(&g.id, 1)
- return id
- }
- // reset is for testing only, and doesn't need to be thread safe.
- func (g *callIDGenerator) reset() {
- g.id = 0
- }
- var idGen callIDGenerator
- // MethodLogger is the sub-logger for each method.
- type MethodLogger struct {
- headerMaxLen, messageMaxLen uint64
- callID uint64
- idWithinCallGen *callIDGenerator
- sink Sink // TODO(blog): make this plugable.
- }
- func newMethodLogger(h, m uint64) *MethodLogger {
- return &MethodLogger{
- headerMaxLen: h,
- messageMaxLen: m,
- callID: idGen.next(),
- idWithinCallGen: &callIDGenerator{},
- sink: defaultSink, // TODO(blog): make it plugable.
- }
- }
- // Log creates a proto binary log entry, and logs it to the sink.
- func (ml *MethodLogger) Log(c LogEntryConfig) {
- m := c.toProto()
- timestamp, _ := ptypes.TimestampProto(time.Now())
- m.Timestamp = timestamp
- m.CallId = ml.callID
- m.SequenceIdWithinCall = ml.idWithinCallGen.next()
- switch pay := m.Payload.(type) {
- case *pb.GrpcLogEntry_ClientHeader:
- m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
- case *pb.GrpcLogEntry_ServerHeader:
- m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
- case *pb.GrpcLogEntry_Message:
- m.PayloadTruncated = ml.truncateMessage(pay.Message)
- }
- ml.sink.Write(m)
- }
- func (ml *MethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
- if ml.headerMaxLen == maxUInt {
- return false
- }
- var (
- bytesLimit = ml.headerMaxLen
- index int
- )
- // At the end of the loop, index will be the first entry where the total
- // size is greater than the limit:
- //
- // len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr.
- for ; index < len(mdPb.Entry); index++ {
- entry := mdPb.Entry[index]
- if entry.Key == "grpc-trace-bin" {
- // "grpc-trace-bin" is a special key. It's kept in the log entry,
- // but not counted towards the size limit.
- continue
- }
- currentEntryLen := uint64(len(entry.Value))
- if currentEntryLen > bytesLimit {
- break
- }
- bytesLimit -= currentEntryLen
- }
- truncated = index < len(mdPb.Entry)
- mdPb.Entry = mdPb.Entry[:index]
- return truncated
- }
- func (ml *MethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
- if ml.messageMaxLen == maxUInt {
- return false
- }
- if ml.messageMaxLen >= uint64(len(msgPb.Data)) {
- return false
- }
- msgPb.Data = msgPb.Data[:ml.messageMaxLen]
- return true
- }
- // LogEntryConfig represents the configuration for binary log entry.
- type LogEntryConfig interface {
- toProto() *pb.GrpcLogEntry
- }
- // ClientHeader configs the binary log entry to be a ClientHeader entry.
- type ClientHeader struct {
- OnClientSide bool
- Header metadata.MD
- MethodName string
- Authority string
- Timeout time.Duration
- // PeerAddr is required only when it's on server side.
- PeerAddr net.Addr
- }
- func (c *ClientHeader) toProto() *pb.GrpcLogEntry {
- // This function doesn't need to set all the fields (e.g. seq ID). The Log
- // function will set the fields when necessary.
- clientHeader := &pb.ClientHeader{
- Metadata: mdToMetadataProto(c.Header),
- MethodName: c.MethodName,
- Authority: c.Authority,
- }
- if c.Timeout > 0 {
- clientHeader.Timeout = ptypes.DurationProto(c.Timeout)
- }
- ret := &pb.GrpcLogEntry{
- Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
- Payload: &pb.GrpcLogEntry_ClientHeader{
- ClientHeader: clientHeader,
- },
- }
- if c.OnClientSide {
- ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
- } else {
- ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
- }
- if c.PeerAddr != nil {
- ret.Peer = addrToProto(c.PeerAddr)
- }
- return ret
- }
- // ServerHeader configs the binary log entry to be a ServerHeader entry.
- type ServerHeader struct {
- OnClientSide bool
- Header metadata.MD
- // PeerAddr is required only when it's on client side.
- PeerAddr net.Addr
- }
- func (c *ServerHeader) toProto() *pb.GrpcLogEntry {
- ret := &pb.GrpcLogEntry{
- Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
- Payload: &pb.GrpcLogEntry_ServerHeader{
- ServerHeader: &pb.ServerHeader{
- Metadata: mdToMetadataProto(c.Header),
- },
- },
- }
- if c.OnClientSide {
- ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
- } else {
- ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
- }
- if c.PeerAddr != nil {
- ret.Peer = addrToProto(c.PeerAddr)
- }
- return ret
- }
- // ClientMessage configs the binary log entry to be a ClientMessage entry.
- type ClientMessage struct {
- OnClientSide bool
- // Message can be a proto.Message or []byte. Other messages formats are not
- // supported.
- Message interface{}
- }
- func (c *ClientMessage) toProto() *pb.GrpcLogEntry {
- var (
- data []byte
- err error
- )
- if m, ok := c.Message.(proto.Message); ok {
- data, err = proto.Marshal(m)
- if err != nil {
- grpclog.Infof("binarylogging: failed to marshal proto message: %v", err)
- }
- } else if b, ok := c.Message.([]byte); ok {
- data = b
- } else {
- grpclog.Infof("binarylogging: message to log is neither proto.message nor []byte")
- }
- ret := &pb.GrpcLogEntry{
- Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
- Payload: &pb.GrpcLogEntry_Message{
- Message: &pb.Message{
- Length: uint32(len(data)),
- Data: data,
- },
- },
- }
- if c.OnClientSide {
- ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
- } else {
- ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
- }
- return ret
- }
- // ServerMessage configs the binary log entry to be a ServerMessage entry.
- type ServerMessage struct {
- OnClientSide bool
- // Message can be a proto.Message or []byte. Other messages formats are not
- // supported.
- Message interface{}
- }
- func (c *ServerMessage) toProto() *pb.GrpcLogEntry {
- var (
- data []byte
- err error
- )
- if m, ok := c.Message.(proto.Message); ok {
- data, err = proto.Marshal(m)
- if err != nil {
- grpclog.Infof("binarylogging: failed to marshal proto message: %v", err)
- }
- } else if b, ok := c.Message.([]byte); ok {
- data = b
- } else {
- grpclog.Infof("binarylogging: message to log is neither proto.message nor []byte")
- }
- ret := &pb.GrpcLogEntry{
- Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
- Payload: &pb.GrpcLogEntry_Message{
- Message: &pb.Message{
- Length: uint32(len(data)),
- Data: data,
- },
- },
- }
- if c.OnClientSide {
- ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
- } else {
- ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
- }
- return ret
- }
- // ClientHalfClose configs the binary log entry to be a ClientHalfClose entry.
- type ClientHalfClose struct {
- OnClientSide bool
- }
- func (c *ClientHalfClose) toProto() *pb.GrpcLogEntry {
- ret := &pb.GrpcLogEntry{
- Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
- Payload: nil, // No payload here.
- }
- if c.OnClientSide {
- ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
- } else {
- ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
- }
- return ret
- }
- // ServerTrailer configs the binary log entry to be a ServerTrailer entry.
- type ServerTrailer struct {
- OnClientSide bool
- Trailer metadata.MD
- // Err is the status error.
- Err error
- // PeerAddr is required only when it's on client side and the RPC is trailer
- // only.
- PeerAddr net.Addr
- }
- func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
- st, ok := status.FromError(c.Err)
- if !ok {
- grpclog.Info("binarylogging: error in trailer is not a status error")
- }
- var (
- detailsBytes []byte
- err error
- )
- stProto := st.Proto()
- if stProto != nil && len(stProto.Details) != 0 {
- detailsBytes, err = proto.Marshal(stProto)
- if err != nil {
- grpclog.Infof("binarylogging: failed to marshal status proto: %v", err)
- }
- }
- ret := &pb.GrpcLogEntry{
- Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
- Payload: &pb.GrpcLogEntry_Trailer{
- Trailer: &pb.Trailer{
- Metadata: mdToMetadataProto(c.Trailer),
- StatusCode: uint32(st.Code()),
- StatusMessage: st.Message(),
- StatusDetails: detailsBytes,
- },
- },
- }
- if c.OnClientSide {
- ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
- } else {
- ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
- }
- if c.PeerAddr != nil {
- ret.Peer = addrToProto(c.PeerAddr)
- }
- return ret
- }
- // Cancel configs the binary log entry to be a Cancel entry.
- type Cancel struct {
- OnClientSide bool
- }
- func (c *Cancel) toProto() *pb.GrpcLogEntry {
- ret := &pb.GrpcLogEntry{
- Type: pb.GrpcLogEntry_EVENT_TYPE_CANCEL,
- Payload: nil,
- }
- if c.OnClientSide {
- ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
- } else {
- ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
- }
- return ret
- }
- // metadataKeyOmit returns whether the metadata entry with this key should be
- // omitted.
- func metadataKeyOmit(key string) bool {
- switch key {
- case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te":
- return true
- case "grpc-trace-bin": // grpc-trace-bin is special because it's visiable to users.
- return false
- }
- if strings.HasPrefix(key, "grpc-") {
- return true
- }
- return false
- }
- func mdToMetadataProto(md metadata.MD) *pb.Metadata {
- ret := &pb.Metadata{}
- for k, vv := range md {
- if metadataKeyOmit(k) {
- continue
- }
- for _, v := range vv {
- ret.Entry = append(ret.Entry,
- &pb.MetadataEntry{
- Key: k,
- Value: []byte(v),
- },
- )
- }
- }
- return ret
- }
- func addrToProto(addr net.Addr) *pb.Address {
- ret := &pb.Address{}
- switch a := addr.(type) {
- case *net.TCPAddr:
- if a.IP.To4() != nil {
- ret.Type = pb.Address_TYPE_IPV4
- } else if a.IP.To16() != nil {
- ret.Type = pb.Address_TYPE_IPV6
- } else {
- ret.Type = pb.Address_TYPE_UNKNOWN
- // Do not set address and port fields.
- break
- }
- ret.Address = a.IP.String()
- ret.IpPort = uint32(a.Port)
- case *net.UnixAddr:
- ret.Type = pb.Address_TYPE_UNIX
- ret.Address = a.String()
- default:
- ret.Type = pb.Address_TYPE_UNKNOWN
- }
- return ret
- }
|