server.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package otgrpc
  2. import (
  3. "github.com/opentracing/opentracing-go"
  4. "github.com/opentracing/opentracing-go/ext"
  5. "github.com/opentracing/opentracing-go/log"
  6. "golang.org/x/net/context"
  7. "google.golang.org/grpc"
  8. "google.golang.org/grpc/metadata"
  9. )
  10. // OpenTracingServerInterceptor returns a grpc.UnaryServerInterceptor suitable
  11. // for use in a grpc.NewServer call.
  12. //
  13. // For example:
  14. //
  15. // s := grpc.NewServer(
  16. // ..., // (existing ServerOptions)
  17. // grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)))
  18. //
  19. // All gRPC server spans will look for an OpenTracing SpanContext in the gRPC
  20. // metadata; if found, the server span will act as the ChildOf that RPC
  21. // SpanContext.
  22. //
  23. // Root or not, the server Span will be embedded in the context.Context for the
  24. // application-specific gRPC handler(s) to access.
  25. func OpenTracingServerInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.UnaryServerInterceptor {
  26. otgrpcOpts := newOptions()
  27. otgrpcOpts.apply(optFuncs...)
  28. return func(
  29. ctx context.Context,
  30. req interface{},
  31. info *grpc.UnaryServerInfo,
  32. handler grpc.UnaryHandler,
  33. ) (resp interface{}, err error) {
  34. spanContext, err := extractSpanContext(ctx, tracer)
  35. if err != nil && err != opentracing.ErrSpanContextNotFound {
  36. // TODO: establish some sort of error reporting mechanism here. We
  37. // don't know where to put such an error and must rely on Tracer
  38. // implementations to do something appropriate for the time being.
  39. }
  40. if otgrpcOpts.inclusionFunc != nil &&
  41. !otgrpcOpts.inclusionFunc(spanContext, info.FullMethod, req, nil) {
  42. return handler(ctx, req)
  43. }
  44. serverSpan := tracer.StartSpan(
  45. info.FullMethod,
  46. ext.RPCServerOption(spanContext),
  47. gRPCComponentTag,
  48. )
  49. defer serverSpan.Finish()
  50. ctx = opentracing.ContextWithSpan(ctx, serverSpan)
  51. if otgrpcOpts.logPayloads {
  52. serverSpan.LogFields(log.Object("gRPC request", req))
  53. }
  54. resp, err = handler(ctx, req)
  55. if err == nil {
  56. if otgrpcOpts.logPayloads {
  57. serverSpan.LogFields(log.Object("gRPC response", resp))
  58. }
  59. } else {
  60. SetSpanTags(serverSpan, err, false)
  61. serverSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
  62. }
  63. if otgrpcOpts.decorator != nil {
  64. otgrpcOpts.decorator(serverSpan, info.FullMethod, req, resp, err)
  65. }
  66. return resp, err
  67. }
  68. }
  69. // OpenTracingStreamServerInterceptor returns a grpc.StreamServerInterceptor suitable
  70. // for use in a grpc.NewServer call. The interceptor instruments streaming RPCs by
  71. // creating a single span to correspond to the lifetime of the RPC's stream.
  72. //
  73. // For example:
  74. //
  75. // s := grpc.NewServer(
  76. // ..., // (existing ServerOptions)
  77. // grpc.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(tracer)))
  78. //
  79. // All gRPC server spans will look for an OpenTracing SpanContext in the gRPC
  80. // metadata; if found, the server span will act as the ChildOf that RPC
  81. // SpanContext.
  82. //
  83. // Root or not, the server Span will be embedded in the context.Context for the
  84. // application-specific gRPC handler(s) to access.
  85. func OpenTracingStreamServerInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.StreamServerInterceptor {
  86. otgrpcOpts := newOptions()
  87. otgrpcOpts.apply(optFuncs...)
  88. return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  89. spanContext, err := extractSpanContext(ss.Context(), tracer)
  90. if err != nil && err != opentracing.ErrSpanContextNotFound {
  91. // TODO: establish some sort of error reporting mechanism here. We
  92. // don't know where to put such an error and must rely on Tracer
  93. // implementations to do something appropriate for the time being.
  94. }
  95. if otgrpcOpts.inclusionFunc != nil &&
  96. !otgrpcOpts.inclusionFunc(spanContext, info.FullMethod, nil, nil) {
  97. return handler(srv, ss)
  98. }
  99. serverSpan := tracer.StartSpan(
  100. info.FullMethod,
  101. ext.RPCServerOption(spanContext),
  102. gRPCComponentTag,
  103. )
  104. defer serverSpan.Finish()
  105. ss = &openTracingServerStream{
  106. ServerStream: ss,
  107. ctx: opentracing.ContextWithSpan(ss.Context(), serverSpan),
  108. }
  109. err = handler(srv, ss)
  110. if err != nil {
  111. SetSpanTags(serverSpan, err, false)
  112. serverSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
  113. }
  114. if otgrpcOpts.decorator != nil {
  115. otgrpcOpts.decorator(serverSpan, info.FullMethod, nil, nil, err)
  116. }
  117. return err
  118. }
  119. }
  120. type openTracingServerStream struct {
  121. grpc.ServerStream
  122. ctx context.Context
  123. }
  124. func (ss *openTracingServerStream) Context() context.Context {
  125. return ss.ctx
  126. }
  127. func extractSpanContext(ctx context.Context, tracer opentracing.Tracer) (opentracing.SpanContext, error) {
  128. md, ok := metadata.FromIncomingContext(ctx)
  129. if !ok {
  130. md = metadata.New(nil)
  131. }
  132. return tracer.Extract(opentracing.HTTPHeaders, metadataReaderWriter{md})
  133. }