-
Notifications
You must be signed in to change notification settings - Fork 507
/
invoke.go
409 lines (344 loc) · 12.8 KB
/
invoke.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
package grpcurl
import (
"bytes"
"context"
"fmt"
"io"
"strings"
"sync"
"sync/atomic"
"github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 we have to import this because it appears in exported API
"github.com/golang/protobuf/proto" //lint:ignore SA1019 we have to import this because it appears in exported API
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/dynamic"
"github.com/jhump/protoreflect/dynamic/grpcdynamic"
"github.com/jhump/protoreflect/grpcreflect"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// InvocationEventHandler is a bag of callbacks for handling events that occur in the course
// of invoking an RPC. The callbacks are generally called in the order they are listed below.
//
// Note that the handler does not itself supply request data: request messages come from the
// RequestSupplier that is passed to InvokeRPC alongside the handler. For bi-directional
// streaming RPCs that supplier runs on a separate goroutine from the one invoking these
// callbacks, so any state shared between the two must be thread-safe.
type InvocationEventHandler interface {
// OnResolveMethod is called with a descriptor of the method that is being invoked. It is
// called once the method has been located, before any request is sent.
OnResolveMethod(*desc.MethodDescriptor)
// OnSendHeaders is called with the request metadata that is being sent. It is called
// before the RPC is actually issued.
OnSendHeaders(metadata.MD)
// OnReceiveHeaders is called when response headers have been received. For streaming
// RPCs it is skipped if the headers cannot be retrieved from the stream.
OnReceiveHeaders(metadata.MD)
// OnReceiveResponse is called for each response message received. For unary and
// client-streaming RPCs (which have a single response) it is only called when the
// final status is OK.
OnReceiveResponse(proto.Message)
// OnReceiveTrailers is called when response trailers and final RPC status have been received.
OnReceiveTrailers(*status.Status, metadata.MD)
}
// RequestMessageSupplier is a function that is called to retrieve request
// messages for a GRPC operation. Each call returns the bytes of one request
// message, which InvokeRpc decodes as JSON (via jsonpb.Unmarshal), or a non-nil
// error. Errors are passed through unchanged, so returning io.EOF signals that
// there are no more messages. This type is deprecated and will be removed in
// a future release.
//
// Deprecated: This is only used with the deprecated InvokeRpc. Instead, use
// RequestSupplier with InvokeRPC.
type RequestMessageSupplier func() ([]byte, error)
// InvokeRpc invokes the given method over the given gRPC connection. It is a thin
// compatibility shim around InvokeRPC: the only difference is the shape of the request
// supplier, which here returns JSON-encoded bytes instead of populating a message.
// This function is deprecated and will be removed in a future release.
//
// Deprecated: use InvokeRPC instead.
func InvokeRpc(ctx context.Context, source DescriptorSource, cc *grpc.ClientConn, methodName string,
	headers []string, handler InvocationEventHandler, requestData RequestMessageSupplier) error {

	// Adapt the legacy byte-oriented supplier to the message-populating form that
	// InvokeRPC expects: fetch the next chunk of JSON and unmarshal it into the message.
	// Supplier errors (including io.EOF) are forwarded untouched.
	supplier := func(msg proto.Message) error {
		raw, err := requestData()
		if err != nil {
			return err
		}
		return jsonpb.Unmarshal(bytes.NewReader(raw), msg)
	}

	return InvokeRPC(ctx, source, cc, methodName, headers, handler, supplier)
}
// RequestSupplier is a function that is called to populate messages for a gRPC operation. The
// function should populate the given message or return a non-nil error. If the supplier has no
// more messages, it should return io.EOF. When it returns io.EOF, it should not in any way
// modify the given message argument. Any other non-nil error aborts the invocation and is
// reported as an error getting request data.
type RequestSupplier func(proto.Message) error
// InvokeRPC invokes the named method over the given gRPC channel. The descriptor source is
// consulted to resolve the service and method (and therefore the request and response message
// types). The given headers are sent as request metadata, and the event handler is notified as
// the invocation proceeds.
//
// The methodName must have the form 'service/method' or 'service.method'.
//
// The requestData function supplies the messages to send and should return io.EOF once there
// is no more request data. If the method is a unary or server-streaming RPC (i.e. exactly one
// request message) and there is no request data (i.e. the first invocation of the function
// returns io.EOF), then an empty request message is sent.
//
// If the requestData function and the event handler coordinate or share any state, they should
// be thread-safe: for bi-directional streaming RPCs, requestData is called from a different
// goroutine than the one invoking event callbacks (one goroutine sends request messages while
// another consumes the response messages).
//
// A non-nil error is returned when the method cannot be resolved, when request data cannot be
// obtained, or when the call fails with an error that is not a gRPC status. RPC outcomes that
// carry a gRPC status are generally reported through handler.OnReceiveTrailers instead.
func InvokeRPC(ctx context.Context, source DescriptorSource, ch grpcdynamic.Channel, methodName string,
	headers []string, handler InvocationEventHandler, requestData RequestSupplier) error {

	md := MetadataFromHeaders(headers)

	svc, mth := parseSymbol(methodName)
	if svc == "" || mth == "" {
		return fmt.Errorf("given method name %q is not in expected format: 'service/method' or 'service.method'", methodName)
	}

	dsc, err := source.FindSymbol(svc)
	if err != nil {
		// If the lookup failure carries a gRPC status, keep its code in the returned
		// error; otherwise fall back to a plain error.
		errStatus, hasStatus := status.FromError(err)
		missing := isNotFoundError(err)
		if hasStatus {
			if missing {
				return status.Errorf(errStatus.Code(), "target server does not expose service %q: %s", svc, errStatus.Message())
			}
			return status.Errorf(errStatus.Code(), "failed to query for service descriptor %q: %s", svc, errStatus.Message())
		}
		if missing {
			return fmt.Errorf("target server does not expose service %q", svc)
		}
		return fmt.Errorf("failed to query for service descriptor %q: %v", svc, err)
	}

	// The symbol exists, but it must actually be a service.
	sd, isService := dsc.(*desc.ServiceDescriptor)
	if !isService {
		return fmt.Errorf("target server does not expose service %q", svc)
	}
	mtd := sd.FindMethodByName(mth)
	if mtd == nil {
		return fmt.Errorf("service %q does not include a method named %q", svc, mth)
	}

	handler.OnResolveMethod(mtd)

	// Also download any applicable extensions, for both the request and the response
	// type, so that user-provided data can be fully parsed.
	var ext dynamic.ExtensionRegistry
	alreadyFetched := map[string]bool{}
	for _, msgType := range []*desc.MessageDescriptor{mtd.GetInputType(), mtd.GetOutputType()} {
		if err = fetchAllExtensions(source, &ext, msgType, alreadyFetched); err != nil {
			return fmt.Errorf("error resolving server extensions for message %s: %v", msgType.GetFullyQualifiedName(), err)
		}
	}

	msgFactory := dynamic.NewMessageFactoryWithExtensionRegistry(&ext)
	req := msgFactory.NewMessage(mtd.GetInputType())

	handler.OnSendHeaders(md)
	ctx = metadata.NewOutgoingContext(ctx, md)

	stub := grpcdynamic.NewStubWithMessageFactory(ch, msgFactory)

	// Cancel the call's context on the way out, whichever path is taken.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Dispatch on the streaming shape of the method.
	clientStreams := mtd.IsClientStreaming()
	serverStreams := mtd.IsServerStreaming()
	switch {
	case clientStreams && serverStreams:
		return invokeBidi(ctx, stub, mtd, handler, requestData, req)
	case clientStreams:
		return invokeClientStream(ctx, stub, mtd, handler, requestData, req)
	case serverStreams:
		return invokeServerStream(ctx, stub, mtd, handler, requestData, req)
	default:
		return invokeUnary(ctx, stub, mtd, handler, requestData, req)
	}
}
// invokeUnary performs a unary RPC. It asks requestData for at most one message (leaving req
// as given if there is none), treats a second message as a usage error, issues the call, and
// then reports headers, the response (only when the status is OK), and the final status and
// trailers to the handler.
//
// It returns a non-nil error if request data cannot be obtained, if more than one request
// message is supplied, or if the call fails with an error that is not a gRPC status. A call
// that fails with a gRPC status is reported via the handler and nil is returned.
func invokeUnary(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,
	requestData RequestSupplier, req proto.Message) error {

	switch firstErr := requestData(req); firstErr {
	case io.EOF:
		// No request data at all: req is sent as it was handed to us.
	case nil:
		// Got one message; verify there is no second message, which is a usage error.
		switch extraErr := requestData(req); extraErr {
		case io.EOF:
			// Exactly one message, as required.
		case nil:
			return fmt.Errorf("method %q is a unary RPC, but request data contained more than 1 message", md.GetFullyQualifiedName())
		default:
			return fmt.Errorf("error getting request data: %v", extraErr)
		}
	default:
		return fmt.Errorf("error getting request data: %v", firstErr)
	}

	// Now we can actually invoke the RPC, capturing response metadata along the way.
	var (
		headers  metadata.MD
		trailers metadata.MD
	)
	resp, callErr := stub.InvokeRpc(ctx, md, req, grpc.Trailer(&trailers), grpc.Header(&headers))

	rpcStatus, isStatus := status.FromError(callErr)
	if !isStatus {
		// Error codes sent from the server get reported through the handler below.
		// Anything else is a failure of the call itself, so bail out here.
		return fmt.Errorf("grpc call for %q failed: %v", md.GetFullyQualifiedName(), callErr)
	}

	handler.OnReceiveHeaders(headers)
	if rpcStatus.Code() == codes.OK {
		handler.OnReceiveResponse(resp)
	}
	handler.OnReceiveTrailers(rpcStatus, trailers)

	return nil
}
// invokeClientStream performs a client-streaming RPC. It opens the stream, uploads every
// message produced by requestData (reusing req for each one), then closes the send side and
// waits for the single response. Headers, the response (only when the status is OK), and the
// final status and trailers are reported to the handler.
//
// It returns a non-nil error if request data cannot be obtained or if the call fails with an
// error that is not a gRPC status. Failures that carry a gRPC status are always reported via
// handler.OnReceiveTrailers, including the case where the stream could not be opened at all
// (in which case there are no trailers and nil metadata is passed), and nil is returned.
func invokeClientStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,
	requestData RequestSupplier, req proto.Message) error {

	// invoke the RPC!
	str, err := stub.InvokeRpcClientStream(ctx, md)

	// Upload each request message in the stream. The loop is skipped entirely if the
	// stream could not be opened.
	var resp proto.Message
	for err == nil {
		err = requestData(req)
		if err == io.EOF {
			// No more request data: finish the upload and collect the response.
			resp, err = str.CloseAndReceive()
			break
		}
		if err != nil {
			return fmt.Errorf("error getting request data: %v", err)
		}

		err = str.SendMsg(req)
		if err == io.EOF {
			// We get EOF on send if the server says "go away"
			// We have to use CloseAndReceive to get the actual code
			resp, err = str.CloseAndReceive()
			break
		}

		// Clear the message so the supplier populates a clean one next time around.
		req.Reset()
	}

	// finally, process response data
	stat, ok := status.FromError(err)
	if !ok {
		// Error codes sent from the server will get printed differently below.
		// So just bail for other kinds of errors here.
		return fmt.Errorf("grpc call for %q failed: %v", md.GetFullyQualifiedName(), err)
	}

	if str == nil {
		// The stream was never established, so there are no headers, response, or
		// trailers to report. Still surface the status to the handler (as invokeUnary
		// does) so that the failure is not silently dropped.
		handler.OnReceiveTrailers(stat, nil)
		return nil
	}

	if respHeaders, hdrErr := str.Header(); hdrErr == nil {
		handler.OnReceiveHeaders(respHeaders)
	}
	if stat.Code() == codes.OK {
		handler.OnReceiveResponse(resp)
	}
	handler.OnReceiveTrailers(stat, str.Trailer())

	return nil
}
// invokeServerStream performs a server-streaming RPC. It asks requestData for at most one
// message (leaving req as given if there is none), treats a second message as a usage error,
// opens the stream with that request, and then passes every response message to the handler
// until the stream ends. Headers and the final status and trailers are also reported to the
// handler.
//
// It returns a non-nil error if request data cannot be obtained, if more than one request
// message is supplied, or if the call fails with an error that is not a gRPC status. Failures
// that carry a gRPC status are always reported via handler.OnReceiveTrailers, including the
// case where the stream could not be opened at all (in which case there are no trailers and
// nil metadata is passed), and nil is returned.
func invokeServerStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,
	requestData RequestSupplier, req proto.Message) error {

	err := requestData(req)
	if err != nil && err != io.EOF {
		return fmt.Errorf("error getting request data: %v", err)
	}
	if err != io.EOF {
		// verify there is no second message, which is a usage error
		err := requestData(req)
		if err == nil {
			return fmt.Errorf("method %q is a server-streaming RPC, but request data contained more than 1 message", md.GetFullyQualifiedName())
		} else if err != io.EOF {
			return fmt.Errorf("error getting request data: %v", err)
		}
	}

	// Now we can actually invoke the RPC!
	str, err := stub.InvokeRpcServerStream(ctx, md, req)

	if str != nil {
		if respHeaders, hdrErr := str.Header(); hdrErr == nil {
			handler.OnReceiveHeaders(respHeaders)
		}
	}

	// Download each response message. The loop is skipped entirely if the stream could
	// not be opened.
	for err == nil {
		var resp proto.Message
		resp, err = str.RecvMsg()
		if err != nil {
			if err == io.EOF {
				// Normal end of stream, not a failure.
				err = nil
			}
			break
		}
		handler.OnReceiveResponse(resp)
	}

	stat, ok := status.FromError(err)
	if !ok {
		// Error codes sent from the server will get printed differently below.
		// So just bail for other kinds of errors here.
		return fmt.Errorf("grpc call for %q failed: %v", md.GetFullyQualifiedName(), err)
	}

	if str == nil {
		// The stream was never established, so there are no trailers to report. Still
		// surface the status to the handler (as invokeUnary does) so that the failure
		// is not silently dropped.
		handler.OnReceiveTrailers(stat, nil)
		return nil
	}
	handler.OnReceiveTrailers(stat, str.Trailer())

	return nil
}
// invokeBidi performs a bi-directional streaming RPC. After opening the stream it starts a
// goroutine that uploads every message produced by requestData (reusing req for each one) and
// then closes the send side, while the calling goroutine reports headers and passes each
// response message to the handler until the stream ends. The final status and trailers are
// then reported to the handler.
//
// Because requestData runs on the sender goroutine while handler callbacks run on the calling
// goroutine, any state they share must be thread-safe.
//
// It returns a non-nil error if request data cannot be obtained or if the call fails with an
// error that is not a gRPC status. Failures that carry a gRPC status are always reported via
// handler.OnReceiveTrailers, including the case where the stream could not be opened at all
// (in which case there are no trailers and nil metadata is passed), and nil is returned. The
// function does not return until the sender goroutine has finished.
func invokeBidi(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,
	requestData RequestSupplier, req proto.Message) error {

	// A private cancellation scope lets the sender abort the whole call when it cannot
	// obtain request data.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// invoke the RPC!
	str, err := stub.InvokeRpcBidiStream(ctx, md)

	var wg sync.WaitGroup
	var sendErr atomic.Value

	// Deferred calls run last-in-first-out, so this waits for the sender goroutine
	// before the deferred cancel above fires.
	defer wg.Wait()

	if err == nil {
		wg.Add(1)
		go func() {
			defer wg.Done()

			// Concurrently upload each request message in the stream
			var err error
			for err == nil {
				err = requestData(req)

				if err == io.EOF {
					// No more request data: half-close the stream.
					err = str.CloseSend()
					break
				}
				if err != nil {
					// Record the failure BEFORE cancelling. Cancelling is what lets
					// the receive loop below give up, after which it reads sendErr;
					// storing first guarantees it sees this error rather than
					// reporting only the resulting cancellation.
					sendErr.Store(fmt.Errorf("error getting request data: %v", err))
					cancel()
					return
				}

				err = str.SendMsg(req)

				// Clear the message so the supplier populates a clean one next time.
				req.Reset()
			}

			if err != nil {
				sendErr.Store(err)
			}
		}()
	}

	if str != nil {
		if respHeaders, hdrErr := str.Header(); hdrErr == nil {
			handler.OnReceiveHeaders(respHeaders)
		}
	}

	// Download each response message. The loop is skipped entirely if the stream could
	// not be opened.
	for err == nil {
		var resp proto.Message
		resp, err = str.RecvMsg()
		if err != nil {
			if err == io.EOF {
				// Normal end of stream, not a failure.
				err = nil
			}
			break
		}
		handler.OnReceiveResponse(resp)
	}

	// A failure on the sending side takes precedence over whatever the receive loop saw.
	// An io.EOF from sending is ignored here.
	if se, ok := sendErr.Load().(error); ok && se != io.EOF {
		err = se
	}

	stat, ok := status.FromError(err)
	if !ok {
		// Error codes sent from the server will get printed differently below.
		// So just bail for other kinds of errors here.
		return fmt.Errorf("grpc call for %q failed: %v", md.GetFullyQualifiedName(), err)
	}

	if str == nil {
		// The stream was never established, so there are no trailers to report. Still
		// surface the status to the handler (as invokeUnary does) so that the failure
		// is not silently dropped.
		handler.OnReceiveTrailers(stat, nil)
		return nil
	}
	handler.OnReceiveTrailers(stat, str.Trailer())

	return nil
}
// notFoundError is an error whose message says that some named element could not be found.
// Its distinct type lets isNotFoundError recognize it.
type notFoundError string

// notFound builds a notFoundError whose message has the form "<kind> not found: <name>".
func notFound(kind, name string) error {
	msg := fmt.Sprintf("%s not found: %s", kind, name)
	return notFoundError(msg)
}

// Error implements the error interface; the message is the underlying string itself.
func (nf notFoundError) Error() string {
	return string(nf)
}
func isNotFoundError(err error) bool {
if grpcreflect.IsElementNotFoundError(err) {
return true
}
_, ok := err.(notFoundError)
return ok
}
// parseSymbol splits a fully-qualified method name into its service and method parts. The
// split happens at the last '/' if there is one, otherwise at the last '.'. If the input
// contains neither separator, two empty strings are returned. Either part may be empty when
// the separator is the first or last character.
func parseSymbol(svcAndMethod string) (string, string) {
	idx := strings.LastIndexByte(svcAndMethod, '/')
	if idx == -1 {
		// No slash: fall back to the 'service.method' form.
		idx = strings.LastIndexByte(svcAndMethod, '.')
	}
	if idx == -1 {
		return "", ""
	}
	return svcAndMethod[:idx], svcAndMethod[idx+1:]
}