method_logger.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package binarylog
  19. import (
  20. "net"
  21. "strings"
  22. "sync/atomic"
  23. "time"
  24. "github.com/golang/protobuf/proto"
  25. "github.com/golang/protobuf/ptypes"
  26. pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
  27. "google.golang.org/grpc/metadata"
  28. "google.golang.org/grpc/status"
  29. )
  30. type callIDGenerator struct {
  31. id uint64
  32. }
  33. func (g *callIDGenerator) next() uint64 {
  34. id := atomic.AddUint64(&g.id, 1)
  35. return id
  36. }
  37. // reset is for testing only, and doesn't need to be thread safe.
  38. func (g *callIDGenerator) reset() {
  39. g.id = 0
  40. }
  41. var idGen callIDGenerator
  42. // MethodLogger is the sub-logger for each method.
  43. type MethodLogger interface {
  44. Log(LogEntryConfig)
  45. }
  46. type methodLogger struct {
  47. headerMaxLen, messageMaxLen uint64
  48. callID uint64
  49. idWithinCallGen *callIDGenerator
  50. sink Sink // TODO(blog): make this plugable.
  51. }
  52. func newMethodLogger(h, m uint64) *methodLogger {
  53. return &methodLogger{
  54. headerMaxLen: h,
  55. messageMaxLen: m,
  56. callID: idGen.next(),
  57. idWithinCallGen: &callIDGenerator{},
  58. sink: DefaultSink, // TODO(blog): make it plugable.
  59. }
  60. }
  61. // Build is an internal only method for building the proto message out of the
  62. // input event. It's made public to enable other library to reuse as much logic
  63. // in methodLogger as possible.
  64. func (ml *methodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry {
  65. m := c.toProto()
  66. timestamp, _ := ptypes.TimestampProto(time.Now())
  67. m.Timestamp = timestamp
  68. m.CallId = ml.callID
  69. m.SequenceIdWithinCall = ml.idWithinCallGen.next()
  70. switch pay := m.Payload.(type) {
  71. case *pb.GrpcLogEntry_ClientHeader:
  72. m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
  73. case *pb.GrpcLogEntry_ServerHeader:
  74. m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
  75. case *pb.GrpcLogEntry_Message:
  76. m.PayloadTruncated = ml.truncateMessage(pay.Message)
  77. }
  78. return m
  79. }
  80. // Log creates a proto binary log entry, and logs it to the sink.
  81. func (ml *methodLogger) Log(c LogEntryConfig) {
  82. ml.sink.Write(ml.Build(c))
  83. }
  84. func (ml *methodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
  85. if ml.headerMaxLen == maxUInt {
  86. return false
  87. }
  88. var (
  89. bytesLimit = ml.headerMaxLen
  90. index int
  91. )
  92. // At the end of the loop, index will be the first entry where the total
  93. // size is greater than the limit:
  94. //
  95. // len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr.
  96. for ; index < len(mdPb.Entry); index++ {
  97. entry := mdPb.Entry[index]
  98. if entry.Key == "grpc-trace-bin" {
  99. // "grpc-trace-bin" is a special key. It's kept in the log entry,
  100. // but not counted towards the size limit.
  101. continue
  102. }
  103. currentEntryLen := uint64(len(entry.Value))
  104. if currentEntryLen > bytesLimit {
  105. break
  106. }
  107. bytesLimit -= currentEntryLen
  108. }
  109. truncated = index < len(mdPb.Entry)
  110. mdPb.Entry = mdPb.Entry[:index]
  111. return truncated
  112. }
  113. func (ml *methodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
  114. if ml.messageMaxLen == maxUInt {
  115. return false
  116. }
  117. if ml.messageMaxLen >= uint64(len(msgPb.Data)) {
  118. return false
  119. }
  120. msgPb.Data = msgPb.Data[:ml.messageMaxLen]
  121. return true
  122. }
  123. // LogEntryConfig represents the configuration for binary log entry.
  124. type LogEntryConfig interface {
  125. toProto() *pb.GrpcLogEntry
  126. }
  127. // ClientHeader configs the binary log entry to be a ClientHeader entry.
  128. type ClientHeader struct {
  129. OnClientSide bool
  130. Header metadata.MD
  131. MethodName string
  132. Authority string
  133. Timeout time.Duration
  134. // PeerAddr is required only when it's on server side.
  135. PeerAddr net.Addr
  136. }
  137. func (c *ClientHeader) toProto() *pb.GrpcLogEntry {
  138. // This function doesn't need to set all the fields (e.g. seq ID). The Log
  139. // function will set the fields when necessary.
  140. clientHeader := &pb.ClientHeader{
  141. Metadata: mdToMetadataProto(c.Header),
  142. MethodName: c.MethodName,
  143. Authority: c.Authority,
  144. }
  145. if c.Timeout > 0 {
  146. clientHeader.Timeout = ptypes.DurationProto(c.Timeout)
  147. }
  148. ret := &pb.GrpcLogEntry{
  149. Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
  150. Payload: &pb.GrpcLogEntry_ClientHeader{
  151. ClientHeader: clientHeader,
  152. },
  153. }
  154. if c.OnClientSide {
  155. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  156. } else {
  157. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  158. }
  159. if c.PeerAddr != nil {
  160. ret.Peer = addrToProto(c.PeerAddr)
  161. }
  162. return ret
  163. }
  164. // ServerHeader configs the binary log entry to be a ServerHeader entry.
  165. type ServerHeader struct {
  166. OnClientSide bool
  167. Header metadata.MD
  168. // PeerAddr is required only when it's on client side.
  169. PeerAddr net.Addr
  170. }
  171. func (c *ServerHeader) toProto() *pb.GrpcLogEntry {
  172. ret := &pb.GrpcLogEntry{
  173. Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
  174. Payload: &pb.GrpcLogEntry_ServerHeader{
  175. ServerHeader: &pb.ServerHeader{
  176. Metadata: mdToMetadataProto(c.Header),
  177. },
  178. },
  179. }
  180. if c.OnClientSide {
  181. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  182. } else {
  183. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  184. }
  185. if c.PeerAddr != nil {
  186. ret.Peer = addrToProto(c.PeerAddr)
  187. }
  188. return ret
  189. }
  190. // ClientMessage configs the binary log entry to be a ClientMessage entry.
  191. type ClientMessage struct {
  192. OnClientSide bool
  193. // Message can be a proto.Message or []byte. Other messages formats are not
  194. // supported.
  195. Message interface{}
  196. }
  197. func (c *ClientMessage) toProto() *pb.GrpcLogEntry {
  198. var (
  199. data []byte
  200. err error
  201. )
  202. if m, ok := c.Message.(proto.Message); ok {
  203. data, err = proto.Marshal(m)
  204. if err != nil {
  205. grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
  206. }
  207. } else if b, ok := c.Message.([]byte); ok {
  208. data = b
  209. } else {
  210. grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
  211. }
  212. ret := &pb.GrpcLogEntry{
  213. Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
  214. Payload: &pb.GrpcLogEntry_Message{
  215. Message: &pb.Message{
  216. Length: uint32(len(data)),
  217. Data: data,
  218. },
  219. },
  220. }
  221. if c.OnClientSide {
  222. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  223. } else {
  224. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  225. }
  226. return ret
  227. }
  228. // ServerMessage configs the binary log entry to be a ServerMessage entry.
  229. type ServerMessage struct {
  230. OnClientSide bool
  231. // Message can be a proto.Message or []byte. Other messages formats are not
  232. // supported.
  233. Message interface{}
  234. }
  235. func (c *ServerMessage) toProto() *pb.GrpcLogEntry {
  236. var (
  237. data []byte
  238. err error
  239. )
  240. if m, ok := c.Message.(proto.Message); ok {
  241. data, err = proto.Marshal(m)
  242. if err != nil {
  243. grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
  244. }
  245. } else if b, ok := c.Message.([]byte); ok {
  246. data = b
  247. } else {
  248. grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
  249. }
  250. ret := &pb.GrpcLogEntry{
  251. Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
  252. Payload: &pb.GrpcLogEntry_Message{
  253. Message: &pb.Message{
  254. Length: uint32(len(data)),
  255. Data: data,
  256. },
  257. },
  258. }
  259. if c.OnClientSide {
  260. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  261. } else {
  262. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  263. }
  264. return ret
  265. }
  266. // ClientHalfClose configs the binary log entry to be a ClientHalfClose entry.
  267. type ClientHalfClose struct {
  268. OnClientSide bool
  269. }
  270. func (c *ClientHalfClose) toProto() *pb.GrpcLogEntry {
  271. ret := &pb.GrpcLogEntry{
  272. Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
  273. Payload: nil, // No payload here.
  274. }
  275. if c.OnClientSide {
  276. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  277. } else {
  278. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  279. }
  280. return ret
  281. }
  282. // ServerTrailer configs the binary log entry to be a ServerTrailer entry.
  283. type ServerTrailer struct {
  284. OnClientSide bool
  285. Trailer metadata.MD
  286. // Err is the status error.
  287. Err error
  288. // PeerAddr is required only when it's on client side and the RPC is trailer
  289. // only.
  290. PeerAddr net.Addr
  291. }
  292. func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
  293. st, ok := status.FromError(c.Err)
  294. if !ok {
  295. grpclogLogger.Info("binarylogging: error in trailer is not a status error")
  296. }
  297. var (
  298. detailsBytes []byte
  299. err error
  300. )
  301. stProto := st.Proto()
  302. if stProto != nil && len(stProto.Details) != 0 {
  303. detailsBytes, err = proto.Marshal(stProto)
  304. if err != nil {
  305. grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err)
  306. }
  307. }
  308. ret := &pb.GrpcLogEntry{
  309. Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
  310. Payload: &pb.GrpcLogEntry_Trailer{
  311. Trailer: &pb.Trailer{
  312. Metadata: mdToMetadataProto(c.Trailer),
  313. StatusCode: uint32(st.Code()),
  314. StatusMessage: st.Message(),
  315. StatusDetails: detailsBytes,
  316. },
  317. },
  318. }
  319. if c.OnClientSide {
  320. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  321. } else {
  322. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  323. }
  324. if c.PeerAddr != nil {
  325. ret.Peer = addrToProto(c.PeerAddr)
  326. }
  327. return ret
  328. }
  329. // Cancel configs the binary log entry to be a Cancel entry.
  330. type Cancel struct {
  331. OnClientSide bool
  332. }
  333. func (c *Cancel) toProto() *pb.GrpcLogEntry {
  334. ret := &pb.GrpcLogEntry{
  335. Type: pb.GrpcLogEntry_EVENT_TYPE_CANCEL,
  336. Payload: nil,
  337. }
  338. if c.OnClientSide {
  339. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  340. } else {
  341. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  342. }
  343. return ret
  344. }
  345. // metadataKeyOmit returns whether the metadata entry with this key should be
  346. // omitted.
  347. func metadataKeyOmit(key string) bool {
  348. switch key {
  349. case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te":
  350. return true
  351. case "grpc-trace-bin": // grpc-trace-bin is special because it's visiable to users.
  352. return false
  353. }
  354. return strings.HasPrefix(key, "grpc-")
  355. }
  356. func mdToMetadataProto(md metadata.MD) *pb.Metadata {
  357. ret := &pb.Metadata{}
  358. for k, vv := range md {
  359. if metadataKeyOmit(k) {
  360. continue
  361. }
  362. for _, v := range vv {
  363. ret.Entry = append(ret.Entry,
  364. &pb.MetadataEntry{
  365. Key: k,
  366. Value: []byte(v),
  367. },
  368. )
  369. }
  370. }
  371. return ret
  372. }
  373. func addrToProto(addr net.Addr) *pb.Address {
  374. ret := &pb.Address{}
  375. switch a := addr.(type) {
  376. case *net.TCPAddr:
  377. if a.IP.To4() != nil {
  378. ret.Type = pb.Address_TYPE_IPV4
  379. } else if a.IP.To16() != nil {
  380. ret.Type = pb.Address_TYPE_IPV6
  381. } else {
  382. ret.Type = pb.Address_TYPE_UNKNOWN
  383. // Do not set address and port fields.
  384. break
  385. }
  386. ret.Address = a.IP.String()
  387. ret.IpPort = uint32(a.Port)
  388. case *net.UnixAddr:
  389. ret.Type = pb.Address_TYPE_UNIX
  390. ret.Address = a.String()
  391. default:
  392. ret.Type = pb.Address_TYPE_UNKNOWN
  393. }
  394. return ret
  395. }