method_logger.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package binarylog
  19. import (
  20. "net"
  21. "strings"
  22. "sync/atomic"
  23. "time"
  24. "github.com/golang/protobuf/proto"
  25. "github.com/golang/protobuf/ptypes"
  26. pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
  27. "google.golang.org/grpc/grpclog"
  28. "google.golang.org/grpc/metadata"
  29. "google.golang.org/grpc/status"
  30. )
  31. type callIDGenerator struct {
  32. id uint64
  33. }
  34. func (g *callIDGenerator) next() uint64 {
  35. id := atomic.AddUint64(&g.id, 1)
  36. return id
  37. }
  38. // reset is for testing only, and doesn't need to be thread safe.
  39. func (g *callIDGenerator) reset() {
  40. g.id = 0
  41. }
  42. var idGen callIDGenerator
  43. // MethodLogger is the sub-logger for each method.
  44. type MethodLogger struct {
  45. headerMaxLen, messageMaxLen uint64
  46. callID uint64
  47. idWithinCallGen *callIDGenerator
  48. sink Sink // TODO(blog): make this plugable.
  49. }
  50. func newMethodLogger(h, m uint64) *MethodLogger {
  51. return &MethodLogger{
  52. headerMaxLen: h,
  53. messageMaxLen: m,
  54. callID: idGen.next(),
  55. idWithinCallGen: &callIDGenerator{},
  56. sink: defaultSink, // TODO(blog): make it plugable.
  57. }
  58. }
  59. // Log creates a proto binary log entry, and logs it to the sink.
  60. func (ml *MethodLogger) Log(c LogEntryConfig) {
  61. m := c.toProto()
  62. timestamp, _ := ptypes.TimestampProto(time.Now())
  63. m.Timestamp = timestamp
  64. m.CallId = ml.callID
  65. m.SequenceIdWithinCall = ml.idWithinCallGen.next()
  66. switch pay := m.Payload.(type) {
  67. case *pb.GrpcLogEntry_ClientHeader:
  68. m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
  69. case *pb.GrpcLogEntry_ServerHeader:
  70. m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
  71. case *pb.GrpcLogEntry_Message:
  72. m.PayloadTruncated = ml.truncateMessage(pay.Message)
  73. }
  74. ml.sink.Write(m)
  75. }
  76. func (ml *MethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
  77. if ml.headerMaxLen == maxUInt {
  78. return false
  79. }
  80. var (
  81. bytesLimit = ml.headerMaxLen
  82. index int
  83. )
  84. // At the end of the loop, index will be the first entry where the total
  85. // size is greater than the limit:
  86. //
  87. // len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr.
  88. for ; index < len(mdPb.Entry); index++ {
  89. entry := mdPb.Entry[index]
  90. if entry.Key == "grpc-trace-bin" {
  91. // "grpc-trace-bin" is a special key. It's kept in the log entry,
  92. // but not counted towards the size limit.
  93. continue
  94. }
  95. currentEntryLen := uint64(len(entry.Value))
  96. if currentEntryLen > bytesLimit {
  97. break
  98. }
  99. bytesLimit -= currentEntryLen
  100. }
  101. truncated = index < len(mdPb.Entry)
  102. mdPb.Entry = mdPb.Entry[:index]
  103. return truncated
  104. }
  105. func (ml *MethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
  106. if ml.messageMaxLen == maxUInt {
  107. return false
  108. }
  109. if ml.messageMaxLen >= uint64(len(msgPb.Data)) {
  110. return false
  111. }
  112. msgPb.Data = msgPb.Data[:ml.messageMaxLen]
  113. return true
  114. }
  115. // LogEntryConfig represents the configuration for binary log entry.
  116. type LogEntryConfig interface {
  117. toProto() *pb.GrpcLogEntry
  118. }
  119. // ClientHeader configs the binary log entry to be a ClientHeader entry.
  120. type ClientHeader struct {
  121. OnClientSide bool
  122. Header metadata.MD
  123. MethodName string
  124. Authority string
  125. Timeout time.Duration
  126. // PeerAddr is required only when it's on server side.
  127. PeerAddr net.Addr
  128. }
  129. func (c *ClientHeader) toProto() *pb.GrpcLogEntry {
  130. // This function doesn't need to set all the fields (e.g. seq ID). The Log
  131. // function will set the fields when necessary.
  132. clientHeader := &pb.ClientHeader{
  133. Metadata: mdToMetadataProto(c.Header),
  134. MethodName: c.MethodName,
  135. Authority: c.Authority,
  136. }
  137. if c.Timeout > 0 {
  138. clientHeader.Timeout = ptypes.DurationProto(c.Timeout)
  139. }
  140. ret := &pb.GrpcLogEntry{
  141. Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
  142. Payload: &pb.GrpcLogEntry_ClientHeader{
  143. ClientHeader: clientHeader,
  144. },
  145. }
  146. if c.OnClientSide {
  147. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  148. } else {
  149. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  150. }
  151. if c.PeerAddr != nil {
  152. ret.Peer = addrToProto(c.PeerAddr)
  153. }
  154. return ret
  155. }
  156. // ServerHeader configs the binary log entry to be a ServerHeader entry.
  157. type ServerHeader struct {
  158. OnClientSide bool
  159. Header metadata.MD
  160. // PeerAddr is required only when it's on client side.
  161. PeerAddr net.Addr
  162. }
  163. func (c *ServerHeader) toProto() *pb.GrpcLogEntry {
  164. ret := &pb.GrpcLogEntry{
  165. Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
  166. Payload: &pb.GrpcLogEntry_ServerHeader{
  167. ServerHeader: &pb.ServerHeader{
  168. Metadata: mdToMetadataProto(c.Header),
  169. },
  170. },
  171. }
  172. if c.OnClientSide {
  173. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  174. } else {
  175. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  176. }
  177. if c.PeerAddr != nil {
  178. ret.Peer = addrToProto(c.PeerAddr)
  179. }
  180. return ret
  181. }
  182. // ClientMessage configs the binary log entry to be a ClientMessage entry.
  183. type ClientMessage struct {
  184. OnClientSide bool
  185. // Message can be a proto.Message or []byte. Other messages formats are not
  186. // supported.
  187. Message interface{}
  188. }
  189. func (c *ClientMessage) toProto() *pb.GrpcLogEntry {
  190. var (
  191. data []byte
  192. err error
  193. )
  194. if m, ok := c.Message.(proto.Message); ok {
  195. data, err = proto.Marshal(m)
  196. if err != nil {
  197. grpclog.Infof("binarylogging: failed to marshal proto message: %v", err)
  198. }
  199. } else if b, ok := c.Message.([]byte); ok {
  200. data = b
  201. } else {
  202. grpclog.Infof("binarylogging: message to log is neither proto.message nor []byte")
  203. }
  204. ret := &pb.GrpcLogEntry{
  205. Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
  206. Payload: &pb.GrpcLogEntry_Message{
  207. Message: &pb.Message{
  208. Length: uint32(len(data)),
  209. Data: data,
  210. },
  211. },
  212. }
  213. if c.OnClientSide {
  214. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  215. } else {
  216. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  217. }
  218. return ret
  219. }
  220. // ServerMessage configs the binary log entry to be a ServerMessage entry.
  221. type ServerMessage struct {
  222. OnClientSide bool
  223. // Message can be a proto.Message or []byte. Other messages formats are not
  224. // supported.
  225. Message interface{}
  226. }
  227. func (c *ServerMessage) toProto() *pb.GrpcLogEntry {
  228. var (
  229. data []byte
  230. err error
  231. )
  232. if m, ok := c.Message.(proto.Message); ok {
  233. data, err = proto.Marshal(m)
  234. if err != nil {
  235. grpclog.Infof("binarylogging: failed to marshal proto message: %v", err)
  236. }
  237. } else if b, ok := c.Message.([]byte); ok {
  238. data = b
  239. } else {
  240. grpclog.Infof("binarylogging: message to log is neither proto.message nor []byte")
  241. }
  242. ret := &pb.GrpcLogEntry{
  243. Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
  244. Payload: &pb.GrpcLogEntry_Message{
  245. Message: &pb.Message{
  246. Length: uint32(len(data)),
  247. Data: data,
  248. },
  249. },
  250. }
  251. if c.OnClientSide {
  252. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  253. } else {
  254. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  255. }
  256. return ret
  257. }
  258. // ClientHalfClose configs the binary log entry to be a ClientHalfClose entry.
  259. type ClientHalfClose struct {
  260. OnClientSide bool
  261. }
  262. func (c *ClientHalfClose) toProto() *pb.GrpcLogEntry {
  263. ret := &pb.GrpcLogEntry{
  264. Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
  265. Payload: nil, // No payload here.
  266. }
  267. if c.OnClientSide {
  268. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  269. } else {
  270. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  271. }
  272. return ret
  273. }
  274. // ServerTrailer configs the binary log entry to be a ServerTrailer entry.
  275. type ServerTrailer struct {
  276. OnClientSide bool
  277. Trailer metadata.MD
  278. // Err is the status error.
  279. Err error
  280. // PeerAddr is required only when it's on client side and the RPC is trailer
  281. // only.
  282. PeerAddr net.Addr
  283. }
  284. func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
  285. st, ok := status.FromError(c.Err)
  286. if !ok {
  287. grpclog.Info("binarylogging: error in trailer is not a status error")
  288. }
  289. var (
  290. detailsBytes []byte
  291. err error
  292. )
  293. stProto := st.Proto()
  294. if stProto != nil && len(stProto.Details) != 0 {
  295. detailsBytes, err = proto.Marshal(stProto)
  296. if err != nil {
  297. grpclog.Infof("binarylogging: failed to marshal status proto: %v", err)
  298. }
  299. }
  300. ret := &pb.GrpcLogEntry{
  301. Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
  302. Payload: &pb.GrpcLogEntry_Trailer{
  303. Trailer: &pb.Trailer{
  304. Metadata: mdToMetadataProto(c.Trailer),
  305. StatusCode: uint32(st.Code()),
  306. StatusMessage: st.Message(),
  307. StatusDetails: detailsBytes,
  308. },
  309. },
  310. }
  311. if c.OnClientSide {
  312. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  313. } else {
  314. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  315. }
  316. if c.PeerAddr != nil {
  317. ret.Peer = addrToProto(c.PeerAddr)
  318. }
  319. return ret
  320. }
  321. // Cancel configs the binary log entry to be a Cancel entry.
  322. type Cancel struct {
  323. OnClientSide bool
  324. }
  325. func (c *Cancel) toProto() *pb.GrpcLogEntry {
  326. ret := &pb.GrpcLogEntry{
  327. Type: pb.GrpcLogEntry_EVENT_TYPE_CANCEL,
  328. Payload: nil,
  329. }
  330. if c.OnClientSide {
  331. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  332. } else {
  333. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  334. }
  335. return ret
  336. }
  337. // metadataKeyOmit returns whether the metadata entry with this key should be
  338. // omitted.
  339. func metadataKeyOmit(key string) bool {
  340. switch key {
  341. case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te":
  342. return true
  343. case "grpc-trace-bin": // grpc-trace-bin is special because it's visiable to users.
  344. return false
  345. }
  346. if strings.HasPrefix(key, "grpc-") {
  347. return true
  348. }
  349. return false
  350. }
  351. func mdToMetadataProto(md metadata.MD) *pb.Metadata {
  352. ret := &pb.Metadata{}
  353. for k, vv := range md {
  354. if metadataKeyOmit(k) {
  355. continue
  356. }
  357. for _, v := range vv {
  358. ret.Entry = append(ret.Entry,
  359. &pb.MetadataEntry{
  360. Key: k,
  361. Value: []byte(v),
  362. },
  363. )
  364. }
  365. }
  366. return ret
  367. }
  368. func addrToProto(addr net.Addr) *pb.Address {
  369. ret := &pb.Address{}
  370. switch a := addr.(type) {
  371. case *net.TCPAddr:
  372. if a.IP.To4() != nil {
  373. ret.Type = pb.Address_TYPE_IPV4
  374. } else if a.IP.To16() != nil {
  375. ret.Type = pb.Address_TYPE_IPV6
  376. } else {
  377. ret.Type = pb.Address_TYPE_UNKNOWN
  378. // Do not set address and port fields.
  379. break
  380. }
  381. ret.Address = a.IP.String()
  382. ret.IpPort = uint32(a.Port)
  383. case *net.UnixAddr:
  384. ret.Type = pb.Address_TYPE_UNIX
  385. ret.Address = a.String()
  386. default:
  387. ret.Type = pb.Address_TYPE_UNKNOWN
  388. }
  389. return ret
  390. }