|
@@ -60,10 +60,12 @@ var (
|
|
|
|
|
|
// UnaryClientInterceptor returns a grpc.UnaryClientInterceptor suitable
|
|
// UnaryClientInterceptor returns a grpc.UnaryClientInterceptor suitable
|
|
// for use in a grpc.Dial call.
|
|
// for use in a grpc.Dial call.
|
|
|
|
+//
|
|
|
|
+// Deprecated: Use [NewClientHandler] instead.
|
|
func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
|
|
func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
|
|
- cfg := newConfig(opts)
|
|
|
|
|
|
+ cfg := newConfig(opts, "client")
|
|
tracer := cfg.TracerProvider.Tracer(
|
|
tracer := cfg.TracerProvider.Tracer(
|
|
- instrumentationName,
|
|
|
|
|
|
+ ScopeName,
|
|
trace.WithInstrumentationVersion(Version()),
|
|
trace.WithInstrumentationVersion(Version()),
|
|
)
|
|
)
|
|
|
|
|
|
@@ -83,11 +85,12 @@ func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
|
|
return invoker(ctx, method, req, reply, cc, callOpts...)
|
|
return invoker(ctx, method, req, reply, cc, callOpts...)
|
|
}
|
|
}
|
|
|
|
|
|
- name, attr := spanInfo(method, cc.Target())
|
|
|
|
|
|
+ name, attr, _ := telemetryAttributes(method, cc.Target())
|
|
|
|
|
|
startOpts := append([]trace.SpanStartOption{
|
|
startOpts := append([]trace.SpanStartOption{
|
|
trace.WithSpanKind(trace.SpanKindClient),
|
|
trace.WithSpanKind(trace.SpanKindClient),
|
|
- trace.WithAttributes(attr...)},
|
|
|
|
|
|
+ trace.WithAttributes(attr...),
|
|
|
|
+ },
|
|
cfg.SpanStartOptions...,
|
|
cfg.SpanStartOptions...,
|
|
)
|
|
)
|
|
|
|
|
|
@@ -122,27 +125,13 @@ func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-type streamEventType int
|
|
|
|
-
|
|
|
|
-type streamEvent struct {
|
|
|
|
- Type streamEventType
|
|
|
|
- Err error
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-const (
|
|
|
|
- receiveEndEvent streamEventType = iota
|
|
|
|
- errorEvent
|
|
|
|
-)
|
|
|
|
-
|
|
|
|
// clientStream wraps around the embedded grpc.ClientStream, and intercepts the RecvMsg and
|
|
// clientStream wraps around the embedded grpc.ClientStream, and intercepts the RecvMsg and
|
|
// SendMsg method call.
|
|
// SendMsg method call.
|
|
type clientStream struct {
|
|
type clientStream struct {
|
|
grpc.ClientStream
|
|
grpc.ClientStream
|
|
|
|
+ desc *grpc.StreamDesc
|
|
|
|
|
|
- desc *grpc.StreamDesc
|
|
|
|
- events chan streamEvent
|
|
|
|
- eventsDone chan struct{}
|
|
|
|
- finished chan error
|
|
|
|
|
|
+ span trace.Span
|
|
|
|
|
|
receivedEvent bool
|
|
receivedEvent bool
|
|
sentEvent bool
|
|
sentEvent bool
|
|
@@ -157,11 +146,11 @@ func (w *clientStream) RecvMsg(m interface{}) error {
|
|
err := w.ClientStream.RecvMsg(m)
|
|
err := w.ClientStream.RecvMsg(m)
|
|
|
|
|
|
if err == nil && !w.desc.ServerStreams {
|
|
if err == nil && !w.desc.ServerStreams {
|
|
- w.sendStreamEvent(receiveEndEvent, nil)
|
|
|
|
|
|
+ w.endSpan(nil)
|
|
} else if err == io.EOF {
|
|
} else if err == io.EOF {
|
|
- w.sendStreamEvent(receiveEndEvent, nil)
|
|
|
|
|
|
+ w.endSpan(nil)
|
|
} else if err != nil {
|
|
} else if err != nil {
|
|
- w.sendStreamEvent(errorEvent, err)
|
|
|
|
|
|
+ w.endSpan(err)
|
|
} else {
|
|
} else {
|
|
w.receivedMessageID++
|
|
w.receivedMessageID++
|
|
|
|
|
|
@@ -183,7 +172,7 @@ func (w *clientStream) SendMsg(m interface{}) error {
|
|
}
|
|
}
|
|
|
|
|
|
if err != nil {
|
|
if err != nil {
|
|
- w.sendStreamEvent(errorEvent, err)
|
|
|
|
|
|
+ w.endSpan(err)
|
|
}
|
|
}
|
|
|
|
|
|
return err
|
|
return err
|
|
@@ -191,9 +180,8 @@ func (w *clientStream) SendMsg(m interface{}) error {
|
|
|
|
|
|
func (w *clientStream) Header() (metadata.MD, error) {
|
|
func (w *clientStream) Header() (metadata.MD, error) {
|
|
md, err := w.ClientStream.Header()
|
|
md, err := w.ClientStream.Header()
|
|
-
|
|
|
|
if err != nil {
|
|
if err != nil {
|
|
- w.sendStreamEvent(errorEvent, err)
|
|
|
|
|
|
+ w.endSpan(err)
|
|
}
|
|
}
|
|
|
|
|
|
return md, err
|
|
return md, err
|
|
@@ -201,64 +189,43 @@ func (w *clientStream) Header() (metadata.MD, error) {
|
|
|
|
|
|
func (w *clientStream) CloseSend() error {
|
|
func (w *clientStream) CloseSend() error {
|
|
err := w.ClientStream.CloseSend()
|
|
err := w.ClientStream.CloseSend()
|
|
-
|
|
|
|
if err != nil {
|
|
if err != nil {
|
|
- w.sendStreamEvent(errorEvent, err)
|
|
|
|
|
|
+ w.endSpan(err)
|
|
}
|
|
}
|
|
|
|
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
|
|
|
|
-func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc, cfg *config) *clientStream {
|
|
|
|
- events := make(chan streamEvent)
|
|
|
|
- eventsDone := make(chan struct{})
|
|
|
|
- finished := make(chan error)
|
|
|
|
-
|
|
|
|
- go func() {
|
|
|
|
- defer close(eventsDone)
|
|
|
|
-
|
|
|
|
- for {
|
|
|
|
- select {
|
|
|
|
- case event := <-events:
|
|
|
|
- switch event.Type {
|
|
|
|
- case receiveEndEvent:
|
|
|
|
- finished <- nil
|
|
|
|
- return
|
|
|
|
- case errorEvent:
|
|
|
|
- finished <- event.Err
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- case <-ctx.Done():
|
|
|
|
- finished <- ctx.Err()
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }()
|
|
|
|
-
|
|
|
|
|
|
+func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc, span trace.Span, cfg *config) *clientStream {
|
|
return &clientStream{
|
|
return &clientStream{
|
|
ClientStream: s,
|
|
ClientStream: s,
|
|
|
|
+ span: span,
|
|
desc: desc,
|
|
desc: desc,
|
|
- events: events,
|
|
|
|
- eventsDone: eventsDone,
|
|
|
|
- finished: finished,
|
|
|
|
receivedEvent: cfg.ReceivedEvent,
|
|
receivedEvent: cfg.ReceivedEvent,
|
|
sentEvent: cfg.SentEvent,
|
|
sentEvent: cfg.SentEvent,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
|
|
|
|
- select {
|
|
|
|
- case <-w.eventsDone:
|
|
|
|
- case w.events <- streamEvent{Type: eventType, Err: err}:
|
|
|
|
|
|
+func (w *clientStream) endSpan(err error) {
|
|
|
|
+ if err != nil {
|
|
|
|
+ s, _ := status.FromError(err)
|
|
|
|
+ w.span.SetStatus(codes.Error, s.Message())
|
|
|
|
+ w.span.SetAttributes(statusCodeAttr(s.Code()))
|
|
|
|
+ } else {
|
|
|
|
+ w.span.SetAttributes(statusCodeAttr(grpc_codes.OK))
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ w.span.End()
|
|
}
|
|
}
|
|
|
|
|
|
// StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
|
|
// StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
|
|
// for use in a grpc.Dial call.
|
|
// for use in a grpc.Dial call.
|
|
|
|
+//
|
|
|
|
+// Deprecated: Use [NewClientHandler] instead.
|
|
func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
|
|
func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
|
|
- cfg := newConfig(opts)
|
|
|
|
|
|
+ cfg := newConfig(opts, "client")
|
|
tracer := cfg.TracerProvider.Tracer(
|
|
tracer := cfg.TracerProvider.Tracer(
|
|
- instrumentationName,
|
|
|
|
|
|
+ ScopeName,
|
|
trace.WithInstrumentationVersion(Version()),
|
|
trace.WithInstrumentationVersion(Version()),
|
|
)
|
|
)
|
|
|
|
|
|
@@ -278,11 +245,12 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
|
|
return streamer(ctx, desc, cc, method, callOpts...)
|
|
return streamer(ctx, desc, cc, method, callOpts...)
|
|
}
|
|
}
|
|
|
|
|
|
- name, attr := spanInfo(method, cc.Target())
|
|
|
|
|
|
+ name, attr, _ := telemetryAttributes(method, cc.Target())
|
|
|
|
|
|
startOpts := append([]trace.SpanStartOption{
|
|
startOpts := append([]trace.SpanStartOption{
|
|
trace.WithSpanKind(trace.SpanKindClient),
|
|
trace.WithSpanKind(trace.SpanKindClient),
|
|
- trace.WithAttributes(attr...)},
|
|
|
|
|
|
+ trace.WithAttributes(attr...),
|
|
|
|
+ },
|
|
cfg.SpanStartOptions...,
|
|
cfg.SpanStartOptions...,
|
|
)
|
|
)
|
|
|
|
|
|
@@ -302,32 +270,19 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
|
|
span.End()
|
|
span.End()
|
|
return s, err
|
|
return s, err
|
|
}
|
|
}
|
|
- stream := wrapClientStream(ctx, s, desc, cfg)
|
|
|
|
-
|
|
|
|
- go func() {
|
|
|
|
- err := <-stream.finished
|
|
|
|
-
|
|
|
|
- if err != nil {
|
|
|
|
- s, _ := status.FromError(err)
|
|
|
|
- span.SetStatus(codes.Error, s.Message())
|
|
|
|
- span.SetAttributes(statusCodeAttr(s.Code()))
|
|
|
|
- } else {
|
|
|
|
- span.SetAttributes(statusCodeAttr(grpc_codes.OK))
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- span.End()
|
|
|
|
- }()
|
|
|
|
-
|
|
|
|
|
|
+ stream := wrapClientStream(ctx, s, desc, span, cfg)
|
|
return stream, nil
|
|
return stream, nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
// UnaryServerInterceptor returns a grpc.UnaryServerInterceptor suitable
|
|
// UnaryServerInterceptor returns a grpc.UnaryServerInterceptor suitable
|
|
// for use in a grpc.NewServer call.
|
|
// for use in a grpc.NewServer call.
|
|
|
|
+//
|
|
|
|
+// Deprecated: Use [NewServerHandler] instead.
|
|
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
|
|
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
|
|
- cfg := newConfig(opts)
|
|
|
|
|
|
+ cfg := newConfig(opts, "server")
|
|
tracer := cfg.TracerProvider.Tracer(
|
|
tracer := cfg.TracerProvider.Tracer(
|
|
- instrumentationName,
|
|
|
|
|
|
+ ScopeName,
|
|
trace.WithInstrumentationVersion(Version()),
|
|
trace.WithInstrumentationVersion(Version()),
|
|
)
|
|
)
|
|
|
|
|
|
@@ -346,11 +301,12 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
|
|
}
|
|
}
|
|
|
|
|
|
ctx = extract(ctx, cfg.Propagators)
|
|
ctx = extract(ctx, cfg.Propagators)
|
|
- name, attr := spanInfo(info.FullMethod, peerFromCtx(ctx))
|
|
|
|
|
|
+ name, attr, metricAttrs := telemetryAttributes(info.FullMethod, peerFromCtx(ctx))
|
|
|
|
|
|
startOpts := append([]trace.SpanStartOption{
|
|
startOpts := append([]trace.SpanStartOption{
|
|
trace.WithSpanKind(trace.SpanKindServer),
|
|
trace.WithSpanKind(trace.SpanKindServer),
|
|
- trace.WithAttributes(attr...)},
|
|
|
|
|
|
+ trace.WithAttributes(attr...),
|
|
|
|
+ },
|
|
cfg.SpanStartOptions...,
|
|
cfg.SpanStartOptions...,
|
|
)
|
|
)
|
|
|
|
|
|
@@ -365,30 +321,30 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
|
|
messageReceived.Event(ctx, 1, req)
|
|
messageReceived.Event(ctx, 1, req)
|
|
}
|
|
}
|
|
|
|
|
|
- var statusCode grpc_codes.Code
|
|
|
|
- defer func(t time.Time) {
|
|
|
|
- elapsedTime := time.Since(t) / time.Millisecond
|
|
|
|
- attr = append(attr, semconv.RPCGRPCStatusCodeKey.Int64(int64(statusCode)))
|
|
|
|
- o := metric.WithAttributes(attr...)
|
|
|
|
- cfg.rpcServerDuration.Record(ctx, int64(elapsedTime), o)
|
|
|
|
- }(time.Now())
|
|
|
|
|
|
+ before := time.Now()
|
|
|
|
|
|
resp, err := handler(ctx, req)
|
|
resp, err := handler(ctx, req)
|
|
|
|
+
|
|
|
|
+ s, _ := status.FromError(err)
|
|
if err != nil {
|
|
if err != nil {
|
|
- s, _ := status.FromError(err)
|
|
|
|
statusCode, msg := serverStatus(s)
|
|
statusCode, msg := serverStatus(s)
|
|
span.SetStatus(statusCode, msg)
|
|
span.SetStatus(statusCode, msg)
|
|
- span.SetAttributes(statusCodeAttr(s.Code()))
|
|
|
|
if cfg.SentEvent {
|
|
if cfg.SentEvent {
|
|
messageSent.Event(ctx, 1, s.Proto())
|
|
messageSent.Event(ctx, 1, s.Proto())
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
- statusCode = grpc_codes.OK
|
|
|
|
- span.SetAttributes(statusCodeAttr(grpc_codes.OK))
|
|
|
|
if cfg.SentEvent {
|
|
if cfg.SentEvent {
|
|
messageSent.Event(ctx, 1, resp)
|
|
messageSent.Event(ctx, 1, resp)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ grpcStatusCodeAttr := statusCodeAttr(s.Code())
|
|
|
|
+ span.SetAttributes(grpcStatusCodeAttr)
|
|
|
|
+
|
|
|
|
+ // Use floating point division here for higher precision (instead of Millisecond method).
|
|
|
|
+ elapsedTime := float64(time.Since(before)) / float64(time.Millisecond)
|
|
|
|
+
|
|
|
|
+ metricAttrs = append(metricAttrs, grpcStatusCodeAttr)
|
|
|
|
+ cfg.rpcDuration.Record(ctx, elapsedTime, metric.WithAttributes(metricAttrs...))
|
|
|
|
|
|
return resp, err
|
|
return resp, err
|
|
}
|
|
}
|
|
@@ -446,10 +402,12 @@ func wrapServerStream(ctx context.Context, ss grpc.ServerStream, cfg *config) *s
|
|
|
|
|
|
// StreamServerInterceptor returns a grpc.StreamServerInterceptor suitable
|
|
// StreamServerInterceptor returns a grpc.StreamServerInterceptor suitable
|
|
// for use in a grpc.NewServer call.
|
|
// for use in a grpc.NewServer call.
|
|
|
|
+//
|
|
|
|
+// Deprecated: Use [NewServerHandler] instead.
|
|
func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
|
|
func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
|
|
- cfg := newConfig(opts)
|
|
|
|
|
|
+ cfg := newConfig(opts, "server")
|
|
tracer := cfg.TracerProvider.Tracer(
|
|
tracer := cfg.TracerProvider.Tracer(
|
|
- instrumentationName,
|
|
|
|
|
|
+ ScopeName,
|
|
trace.WithInstrumentationVersion(Version()),
|
|
trace.WithInstrumentationVersion(Version()),
|
|
)
|
|
)
|
|
|
|
|
|
@@ -469,11 +427,12 @@ func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
|
|
}
|
|
}
|
|
|
|
|
|
ctx = extract(ctx, cfg.Propagators)
|
|
ctx = extract(ctx, cfg.Propagators)
|
|
- name, attr := spanInfo(info.FullMethod, peerFromCtx(ctx))
|
|
|
|
|
|
+ name, attr, _ := telemetryAttributes(info.FullMethod, peerFromCtx(ctx))
|
|
|
|
|
|
startOpts := append([]trace.SpanStartOption{
|
|
startOpts := append([]trace.SpanStartOption{
|
|
trace.WithSpanKind(trace.SpanKindServer),
|
|
trace.WithSpanKind(trace.SpanKindServer),
|
|
- trace.WithAttributes(attr...)},
|
|
|
|
|
|
+ trace.WithAttributes(attr...),
|
|
|
|
+ },
|
|
cfg.SpanStartOptions...,
|
|
cfg.SpanStartOptions...,
|
|
)
|
|
)
|
|
|
|
|
|
@@ -498,17 +457,18 @@ func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-// spanInfo returns a span name and all appropriate attributes from the gRPC
|
|
|
|
-// method and peer address.
|
|
|
|
-func spanInfo(fullMethod, peerAddress string) (string, []attribute.KeyValue) {
|
|
|
|
- name, mAttrs := internal.ParseFullMethod(fullMethod)
|
|
|
|
|
|
+// telemetryAttributes returns a span name and span and metric attributes from
|
|
|
|
+// the gRPC method and peer address.
|
|
|
|
+func telemetryAttributes(fullMethod, peerAddress string) (string, []attribute.KeyValue, []attribute.KeyValue) {
|
|
|
|
+ name, methodAttrs := internal.ParseFullMethod(fullMethod)
|
|
peerAttrs := peerAttr(peerAddress)
|
|
peerAttrs := peerAttr(peerAddress)
|
|
|
|
|
|
- attrs := make([]attribute.KeyValue, 0, 1+len(mAttrs)+len(peerAttrs))
|
|
|
|
|
|
+ attrs := make([]attribute.KeyValue, 0, 1+len(methodAttrs)+len(peerAttrs))
|
|
attrs = append(attrs, RPCSystemGRPC)
|
|
attrs = append(attrs, RPCSystemGRPC)
|
|
- attrs = append(attrs, mAttrs...)
|
|
|
|
|
|
+ attrs = append(attrs, methodAttrs...)
|
|
|
|
+ metricAttrs := attrs[:1+len(methodAttrs)]
|
|
attrs = append(attrs, peerAttrs...)
|
|
attrs = append(attrs, peerAttrs...)
|
|
- return name, attrs
|
|
|
|
|
|
+ return name, attrs, metricAttrs
|
|
}
|
|
}
|
|
|
|
|
|
// peerAttr returns attributes about the peer address.
|
|
// peerAttr returns attributes about the peer address.
|