Skip to content

Commit

Permalink
[Issue bradleyjkemp#96] this is a PoC to solve Issue bradleyjkemp#96.
Browse files Browse the repository at this point in the history
  • Loading branch information
marcellanz committed May 1, 2020
1 parent 200d055 commit 0977fe2
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 8 deletions.
41 changes: 36 additions & 5 deletions grpc-dump/dump/dump_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,43 @@ import (
"fmt"
"io"
"strings"
"sync"

"github.com/bradleyjkemp/grpc-tools/internal"
"github.com/bradleyjkemp/grpc-tools/internal/proto_decoder"

"github.com/bradleyjkemp/grpc-tools/internal/proto_descriptor"
"github.com/golang/protobuf/jsonpb"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/dynamic"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

// dump interceptor implements a gRPC.StreamingServerInterceptor that dumps all RPC details
var mu sync.Mutex

type pbm struct {
*dynamic.Message
}

func (p *pbm) MarshalJSON() ([]byte, error) {
fd := make([]*desc.FileDescriptor, 0)
proto_descriptor.MsgDesc.Lock()
defer proto_descriptor.MsgDesc.Unlock()
for _, d := range proto_descriptor.MsgDesc.Desc {
fd = append(fd, d.GetFile())
}
return p.MarshalJSONPB(
&jsonpb.Marshaler{
AnyResolver: dynamic.AnyResolver(
dynamic.NewMessageFactoryWithDefaults(),
fd...,
),
})
}

func dumpInterceptor(logger logrus.FieldLogger, output io.Writer, decoder proto_decoder.MessageDecoder) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
dss := &recordedServerStream{ServerStream: ss}
Expand All @@ -31,6 +57,7 @@ func dumpInterceptor(logger logrus.FieldLogger, output io.Writer, decoder proto_

fullMethod := strings.Split(info.FullMethod, "/")
md, _ := metadata.FromIncomingContext(ss.Context())
dss.Lock()
rpc := internal.RPC{
Service: fullMethod[1],
Method: fullMethod[2],
Expand All @@ -42,15 +69,19 @@ func dumpInterceptor(logger logrus.FieldLogger, output io.Writer, decoder proto_
}

var err error
for _, message := range rpc.Messages {
message.Message, err = decoder.Decode(info.FullMethod, message)
for i := range rpc.Messages {
msg, err := decoder.Decode(info.FullMethod, rpc.Messages[i])
if err != nil {
logger.WithError(err).Warn("Failed to decode message")
}
rpc.Messages[i].Message = &pbm{msg}
}
dump, err := json.Marshal(rpc)
if err != nil {
logger.WithError(err).Fatal("Failed to marshal rpc")
}

dump, _ := json.Marshal(rpc)
fmt.Fprintln(output, string(dump))
dss.Unlock()
return rpcErr
}
}
6 changes: 5 additions & 1 deletion internal/proto_decoder/unknown_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package proto_decoder

import (
"fmt"
"regexp"

"github.com/bradleyjkemp/grpc-tools/internal"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/desc/builder"
"github.com/jhump/protoreflect/dynamic"
"github.com/pkg/errors"
"regexp"
)

// When we don't have an actual proto message descriptor, this takes a best effort
Expand Down Expand Up @@ -110,6 +111,9 @@ func (u *unknownFieldResolver) enrichMessage(descriptor *builder.MessageBuilder,
if err != nil {
return errors.Wrap(err, "failed to convert nested message to dynamic")
}
if dynamicNestedMessage == nil {
return nil
}

err = u.enrichMessage(nestedMessageDescriptor, dynamicNestedMessage)
if err != nil {
Expand Down
26 changes: 24 additions & 2 deletions internal/proto_descriptor/protos.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package proto_descriptor

import (
"fmt"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/desc/protoparse"
"os"
"path/filepath"
"sync"

"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/desc/protoparse"
)

func LoadProtoDescriptors(descriptorPaths ...string) (map[string]*desc.MethodDescriptor, error) {
Expand All @@ -21,6 +23,13 @@ func LoadProtoDescriptors(descriptorPaths ...string) (map[string]*desc.MethodDes
return convertDescriptorsToMap(descriptors), nil
}

type MessageDesc struct {
Desc map[string]*desc.MessageDescriptor
sync.Mutex
}

var MsgDesc = MessageDesc{Desc: make(map[string]*desc.MessageDescriptor, 0)}

// recursively walks through all files in the given directories and
// finds .proto files that contains service definitions
func LoadProtoDirectories(roots ...string) (map[string]*desc.MethodDescriptor, error) {
Expand All @@ -45,6 +54,14 @@ func LoadProtoDirectories(roots ...string) (map[string]*desc.MethodDescriptor, e
fmt.Fprintf(os.Stderr, "Skipping %s due to parse error %s", path, err)
return nil
}
fileDescs, err := parser.ParseFiles(relpath)
for _, fileDesc := range fileDescs {
for _, mt := range fileDesc.GetMessageTypes() {
MsgDesc.Lock()
MsgDesc.Desc[mt.GetFullyQualifiedName()] = mt
MsgDesc.Unlock()
}
}
if len(descs[0].Service) > 0 {
// this file is interesting so
fileDesc, err := parser.ParseFiles(relpath)
Expand All @@ -53,6 +70,11 @@ func LoadProtoDirectories(roots ...string) (map[string]*desc.MethodDescriptor, e
return nil
}
servicesFiles = append(servicesFiles, fileDesc[0])
for _, smt := range fileDesc[0].GetMessageTypes() {
MsgDesc.Lock()
MsgDesc.Desc[smt.GetFullyQualifiedName()] = smt
MsgDesc.Unlock()
}
}
}
return nil
Expand Down

0 comments on commit 0977fe2

Please sign in to comment.