-
-
Notifications
You must be signed in to change notification settings - Fork 564
/
Copy pathhandler.go
149 lines (130 loc) Β· 3.63 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package grpc
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"goa.design/goa"
)
// Inspired from https://github.com/go-kit/kit/blob/1c17eccf28596f5a2c59314f7923ca66301b90e4/transport/grpc/server.go
type (
// UnaryHandler handles a unary RPC. The request and response types are
// protocol buffer message types.
UnaryHandler interface {
// Handle handles a unary RPC.
//
// It takes a protocol buffer message type and returns a protocol buffer
// message type and any error when executing the RPC.
Handle(ctx context.Context, reqpb interface{}) (respb interface{}, err error)
}
// StreamHandler handles a streaming RPC. The stream may be client-side,
// server-side, or bidirectional.
StreamHandler interface {
// Handle handles a streaming RPC.
//
// input contains the goa endpoint payload type (if any) and goa generated
// endpoint stream interface.
Handle(ctx context.Context, input interface{}) (err error)
// Decode decodes the protocol buffer message type and incoming metadata to
// the goa type. For client-side and bidirectional streams, the request
// message type will be nil.
Decode(ctx context.Context, reqpb interface{}) (req interface{}, err error)
}
unaryHandler struct {
endpoint goa.Endpoint
decoder RequestDecoder
encoder ResponseEncoder
}
streamHandler struct {
endpoint goa.Endpoint
decoder RequestDecoder
}
)
// NewUnaryHandler returns a handler to handle unary gRPC endpoints.
func NewUnaryHandler(e goa.Endpoint, dec RequestDecoder, enc ResponseEncoder) UnaryHandler {
return &unaryHandler{
endpoint: e,
decoder: dec,
encoder: enc,
}
}
// NewStreamHandler returns a handler to handle streaming gRPC endpoints.
func NewStreamHandler(e goa.Endpoint, dec RequestDecoder) StreamHandler {
return &streamHandler{
endpoint: e,
decoder: dec,
}
}
// Handle serves a gRPC request.
func (h *unaryHandler) Handle(ctx context.Context, reqpb interface{}) (interface{}, error) {
var (
req interface{}
err error
)
{
if h.decoder != nil {
// Decode gRPC request message and incoming metadata
md, _ := metadata.FromIncomingContext(ctx)
if req, err = h.decoder(ctx, reqpb, md); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
}
}
var (
resp interface{}
)
{
// Invoke goa endpoint
if resp, err = h.endpoint(ctx, req); err != nil {
return nil, err
}
}
var (
respb interface{}
hdr = metadata.MD{}
trlr = metadata.MD{}
)
{
if h.encoder != nil {
// Encode gRPC response
if respb, err = h.encoder(ctx, resp, &hdr, &trlr); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
}
}
}
// Encode gRPC headers
if len(hdr) > 0 {
if err := grpc.SendHeader(ctx, hdr); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
}
}
// Encode gRPC trailers
if len(trlr) > 0 {
if err := grpc.SetTrailer(ctx, trlr); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
}
}
return respb, err
}
// Decode decodes the request message and incoming metadata into goa type.
func (h *streamHandler) Decode(ctx context.Context, reqpb interface{}) (interface{}, error) {
var (
req interface{}
err error
)
{
if h.decoder != nil {
md, _ := metadata.FromIncomingContext(ctx)
if req, err = h.decoder(ctx, reqpb, md); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
}
}
return req, nil
}
// Handle serves a gRPC request.
func (h *streamHandler) Handle(ctx context.Context, stream interface{}) error {
_, err := h.endpoint(ctx, stream)
return err
}