123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- package otgrpc
- import (
- "github.com/opentracing/opentracing-go"
- "github.com/opentracing/opentracing-go/ext"
- "github.com/opentracing/opentracing-go/log"
- "golang.org/x/net/context"
- "google.golang.org/grpc"
- "google.golang.org/grpc/metadata"
- )
- // OpenTracingServerInterceptor returns a grpc.UnaryServerInterceptor suitable
- // for use in a grpc.NewServer call.
- //
- // For example:
- //
- // s := grpc.NewServer(
- // ..., // (existing ServerOptions)
- // grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)))
- //
- // All gRPC server spans will look for an OpenTracing SpanContext in the gRPC
- // metadata; if found, the server span will act as the ChildOf that RPC
- // SpanContext.
- //
- // Root or not, the server Span will be embedded in the context.Context for the
- // application-specific gRPC handler(s) to access.
- func OpenTracingServerInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.UnaryServerInterceptor {
- otgrpcOpts := newOptions()
- otgrpcOpts.apply(optFuncs...)
- return func(
- ctx context.Context,
- req interface{},
- info *grpc.UnaryServerInfo,
- handler grpc.UnaryHandler,
- ) (resp interface{}, err error) {
- spanContext, err := extractSpanContext(ctx, tracer)
- if err != nil && err != opentracing.ErrSpanContextNotFound {
- // TODO: establish some sort of error reporting mechanism here. We
- // don't know where to put such an error and must rely on Tracer
- // implementations to do something appropriate for the time being.
- }
- if otgrpcOpts.inclusionFunc != nil &&
- !otgrpcOpts.inclusionFunc(spanContext, info.FullMethod, req, nil) {
- return handler(ctx, req)
- }
- serverSpan := tracer.StartSpan(
- info.FullMethod,
- ext.RPCServerOption(spanContext),
- gRPCComponentTag,
- )
- defer serverSpan.Finish()
- ctx = opentracing.ContextWithSpan(ctx, serverSpan)
- if otgrpcOpts.logPayloads {
- serverSpan.LogFields(log.Object("gRPC request", req))
- }
- resp, err = handler(ctx, req)
- if err == nil {
- if otgrpcOpts.logPayloads {
- serverSpan.LogFields(log.Object("gRPC response", resp))
- }
- } else {
- SetSpanTags(serverSpan, err, false)
- serverSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
- }
- if otgrpcOpts.decorator != nil {
- otgrpcOpts.decorator(serverSpan, info.FullMethod, req, resp, err)
- }
- return resp, err
- }
- }
- // OpenTracingStreamServerInterceptor returns a grpc.StreamServerInterceptor suitable
- // for use in a grpc.NewServer call. The interceptor instruments streaming RPCs by
- // creating a single span to correspond to the lifetime of the RPC's stream.
- //
- // For example:
- //
- // s := grpc.NewServer(
- // ..., // (existing ServerOptions)
- // grpc.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(tracer)))
- //
- // All gRPC server spans will look for an OpenTracing SpanContext in the gRPC
- // metadata; if found, the server span will act as the ChildOf that RPC
- // SpanContext.
- //
- // Root or not, the server Span will be embedded in the context.Context for the
- // application-specific gRPC handler(s) to access.
- func OpenTracingStreamServerInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.StreamServerInterceptor {
- otgrpcOpts := newOptions()
- otgrpcOpts.apply(optFuncs...)
- return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
- spanContext, err := extractSpanContext(ss.Context(), tracer)
- if err != nil && err != opentracing.ErrSpanContextNotFound {
- // TODO: establish some sort of error reporting mechanism here. We
- // don't know where to put such an error and must rely on Tracer
- // implementations to do something appropriate for the time being.
- }
- if otgrpcOpts.inclusionFunc != nil &&
- !otgrpcOpts.inclusionFunc(spanContext, info.FullMethod, nil, nil) {
- return handler(srv, ss)
- }
- serverSpan := tracer.StartSpan(
- info.FullMethod,
- ext.RPCServerOption(spanContext),
- gRPCComponentTag,
- )
- defer serverSpan.Finish()
- ss = &openTracingServerStream{
- ServerStream: ss,
- ctx: opentracing.ContextWithSpan(ss.Context(), serverSpan),
- }
- err = handler(srv, ss)
- if err != nil {
- SetSpanTags(serverSpan, err, false)
- serverSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
- }
- if otgrpcOpts.decorator != nil {
- otgrpcOpts.decorator(serverSpan, info.FullMethod, nil, nil, err)
- }
- return err
- }
- }
- type openTracingServerStream struct {
- grpc.ServerStream
- ctx context.Context
- }
- func (ss *openTracingServerStream) Context() context.Context {
- return ss.ctx
- }
- func extractSpanContext(ctx context.Context, tracer opentracing.Tracer) (opentracing.SpanContext, error) {
- md, ok := metadata.FromIncomingContext(ctx)
- if !ok {
- md = metadata.New(nil)
- }
- return tracer.Extract(opentracing.HTTPHeaders, metadataReaderWriter{md})
- }
|