-
Notifications
You must be signed in to change notification settings - Fork 223
/
client.go
295 lines (254 loc) · 7.54 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
package client
import (
"context"
"errors"
"fmt"
"io"
"runtime"
"sync"
"go.uber.org/zap"
"github.com/cloudevents/sdk-go/v2/binding"
cecontext "github.com/cloudevents/sdk-go/v2/context"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/protocol"
)
// Client interface defines the runtime contract the CloudEvents client supports.
type Client interface {
	// Send will transmit the given event over the client's configured transport.
	// The returned protocol.Result describes the outcome of the transmission.
	Send(ctx context.Context, event event.Event) protocol.Result

	// Request will transmit the given event over the client's configured
	// transport and return any response event, together with the
	// protocol.Result describing the outcome of the exchange.
	Request(ctx context.Context, event event.Event) (*event.Event, protocol.Result)

	// StartReceiver will register the provided function for callback on receipt
	// of a cloudevent. It will also start the underlying protocol as it has
	// been configured.
	//
	// This call is blocking.
	//
	// Valid fn signatures are:
	//   - func()
	//   - func() error
	//   - func(context.Context)
	//   - func(context.Context) protocol.Result
	//   - func(event.Event)
	//   - func(event.Event) protocol.Result
	//   - func(context.Context, event.Event)
	//   - func(context.Context, event.Event) protocol.Result
	//   - func(event.Event) *event.Event
	//   - func(event.Event) (*event.Event, protocol.Result)
	//   - func(context.Context, event.Event) *event.Event
	//   - func(context.Context, event.Event) (*event.Event, protocol.Result)
	StartReceiver(ctx context.Context, fn interface{}) error
}
// New builds a Client around the given transport object and then applies the
// supplied client options in order.
//
// obj is probed for each of the protocol.Sender, protocol.Requester,
// protocol.Responder, protocol.Receiver and protocol.Opener interfaces. Every
// interface it implements is wired into the client; the others stay nil. The
// first option that returns an error aborts construction, and that error is
// returned with a nil Client.
func New(obj interface{}, opts ...Option) (Client, error) {
	c := &ceClient{
		// GOMAXPROCS(0) only reports the current setting; it does not change it.
		pollGoroutines:       runtime.GOMAXPROCS(0),
		observabilityService: noopObservabilityService{},
	}

	// A failed assertion yields the nil interface value, which is exactly the
	// "capability not available" state of a freshly created client.
	c.sender, _ = obj.(protocol.Sender)
	c.requester, _ = obj.(protocol.Requester)
	c.responder, _ = obj.(protocol.Responder)
	c.receiver, _ = obj.(protocol.Receiver)
	c.opener, _ = obj.(protocol.Opener)

	if err := c.applyOptions(opts...); err != nil {
		return nil, err
	}
	return c, nil
}
// ceClient is the Client implementation returned by New. The protocol fields
// are populated from the transport object handed to New, depending on which
// protocol interfaces that object implements; any of them may be nil.
type ceClient struct {
	// sender is used by Send; Send fails with "sender not set" when it is nil.
	sender protocol.Sender
	// requester is used by Request; Request fails with "requester not set"
	// when it is nil.
	requester protocol.Requester
	// receiver is polled by StartReceiver when no responder is available.
	receiver protocol.Receiver
	// responder is polled by StartReceiver in preference to receiver.
	responder protocol.Responder
	// Optional. When set, StartReceiver calls OpenInbound on it after the
	// polling goroutines have been started.
	opener protocol.Opener

	// observabilityService records send/request activity and is handed to the
	// receive invoker. New initialises it to noopObservabilityService{}.
	observabilityService ObservabilityService

	// inboundContextDecorators are passed to the receive invoker built by
	// StartReceiver.
	inboundContextDecorators []func(context.Context, binding.Message) context.Context
	// outboundContextDecorators are applied, in order, to the context of every
	// Send and Request call.
	outboundContextDecorators []func(context.Context) context.Context
	// invoker is non-nil only while StartReceiver is running.
	invoker Invoker
	// receiverMu is held by StartReceiver for its whole duration.
	receiverMu sync.Mutex
	// eventDefaulterFns are applied, in order, to outgoing events in Send and
	// Request, and are also passed to the receive invoker.
	eventDefaulterFns []EventDefaulter
	// pollGoroutines is the number of polling goroutines StartReceiver starts.
	// New initialises it to runtime.GOMAXPROCS(0).
	pollGoroutines int
	// blockingCallback makes each polling goroutine run the callback inline
	// instead of in a goroutine of its own.
	blockingCallback bool
	// ackMalformedEvent is forwarded to newReceiveInvoker.
	// NOTE(review): presumably controls whether malformed inbound events are
	// acknowledged — confirm against the invoker implementation.
	ackMalformedEvent bool
}
// applyOptions runs every option against the client in the order given and
// stops at the first one that reports an error, returning that error. It
// returns nil when all options succeed or when no options are supplied.
func (c *ceClient) applyOptions(opts ...Option) error {
	for i := range opts {
		err := opts[i](c)
		if err != nil {
			return err
		}
	}
	return nil
}
// Send transmits e through the configured protocol.Sender.
//
// The context is first passed through every outbound context decorator and the
// event through every event defaulter, in registration order. The resulting
// event must pass Validate; otherwise the validation error is returned and
// nothing is sent. The observability service is told about the send and its
// completion callback receives the sender's result.
//
// It returns an error when the client has no sender, the validation error when
// the event is invalid, and otherwise whatever the sender returned.
func (c *ceClient) Send(ctx context.Context, e event.Event) protocol.Result {
	if c.sender == nil {
		return errors.New("sender not set")
	}

	for _, decorate := range c.outboundContextDecorators {
		ctx = decorate(ctx)
	}
	// Ranging over an empty slice is a no-op, so no length guard is needed.
	for _, applyDefaults := range c.eventDefaulterFns {
		e = applyDefaults(ctx, e)
	}

	if invalid := e.Validate(); invalid != nil {
		return invalid
	}

	// Event has been defaulted and validated, record we are going to perform send.
	ctx, done := c.observabilityService.RecordSendingEvent(ctx, e)
	result := c.sender.Send(ctx, (*binding.EventMessage)(&e))
	done(result)
	return result
}
// Request transmits e through the configured protocol.Requester and tries to
// turn the reply into an event.
//
// The context is first passed through every outbound context decorator and the
// event through every event defaulter, in registration order. The resulting
// event must pass Validate; otherwise the validation error is returned and
// nothing is sent.
//
// Return values:
//   - (nil, error) when the client has no requester, the event is invalid, or
//     the protocol reports the request as undelivered;
//   - (resp, result) when the reply was converted to an event; result is
//     whatever the requester returned;
//   - (nil, receipt) when the reply could not be converted; the receipt is an
//     ACK that carries the conversion error and the requester's result.
//
// A non-nil reply message is always finished with the final result before
// Request returns. The observability callback obtained from
// RecordRequestEvent is invoked on every path after the request was attempted.
func (c *ceClient) Request(ctx context.Context, e event.Event) (*event.Event, protocol.Result) {
	var resp *event.Event
	var err error

	if c.requester == nil {
		err = errors.New("requester not set")
		return nil, err
	}

	for _, f := range c.outboundContextDecorators {
		ctx = f(ctx)
	}
	for _, fn := range c.eventDefaulterFns {
		e = fn(ctx, e)
	}

	if err = e.Validate(); err != nil {
		return nil, err
	}

	// Event has been defaulted and validated, record we are going to perform request.
	ctx, cb := c.observabilityService.RecordRequestEvent(ctx, e)

	// If provided a requester, use it to do request/response.
	var msg binding.Message
	msg, err = c.requester.Request(ctx, (*binding.EventMessage)(&e))
	if msg != nil {
		// The closure reads err when it runs, i.e. the final result of this
		// call, not the value err holds at this point.
		defer func() {
			if err := msg.Finish(err); err != nil {
				cecontext.LoggerFrom(ctx).Warnw("failed calling message.Finish", zap.Error(err))
			}
		}()
	}
	if protocol.IsUndelivered(err) {
		// Complete the observability record on this path too; previously the
		// callback was dropped here, so whatever RecordRequestEvent started was
		// never told that the request had ended.
		cb(err, nil)
		return nil, err
	}

	// try to turn msg into an event, it might not work and that is ok.
	if rs, rserr := binding.ToEvent(ctx, msg); rserr != nil {
		cecontext.LoggerFrom(ctx).Debugw("response: failed calling ToEvent", zap.Error(rserr), zap.Any("resp", msg))
		// If the protocol returns no error, it is an ACK on the request, but we had
		// issues turning the response into an event, so make an ACK Result and pass
		// down the ToEvent error as well.
		err = protocol.NewReceipt(true, "failed to convert response into event: %v\n%w", rserr, err)
	} else {
		resp = rs
	}

	// Runs before the deferred msg.Finish above, as in every other exit path.
	cb(err, resp)
	return resp, err
}
// StartReceiver sets up the given fn to handle Receive.
// See Client.StartReceiver for details. This is a blocking call.
//
// fn is wrapped in a receive invoker, which must be compatible with what the
// protocol offers: a receiver-style callback needs a protocol.Receiver and a
// responder-style callback needs a protocol.Responder. pollGoroutines
// goroutines are then started; each repeatedly pulls a message from the
// responder (preferred) or the receiver and hands it to the invoker, either
// inline (blockingCallback) or in a goroutine of its own. If an opener is
// configured, its OpenInbound is called after the pollers have been started.
//
// A poller stops when the protocol returns io.EOF, or when it returns any
// other error after ctx has been cancelled. StartReceiver returns once all
// pollers and outstanding callbacks have finished. It returns an error when
// the callback cannot be used with this client, or the wrapped OpenInbound
// error when opening the inbound connection failed; otherwise nil.
func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	c.receiverMu.Lock()
	defer c.receiverMu.Unlock()

	if c.invoker != nil {
		return fmt.Errorf("client already has a receiver")
	}

	invoker, err := newReceiveInvoker(
		fn,
		c.observabilityService,
		c.inboundContextDecorators,
		c.eventDefaulterFns,
		c.ackMalformedEvent,
	)
	if err != nil {
		return err
	}
	if invoker.IsReceiver() && c.receiver == nil {
		return fmt.Errorf("mismatched receiver callback without protocol.Receiver supported by protocol")
	}
	if invoker.IsResponder() && c.responder == nil {
		return fmt.Errorf("mismatched receiver callback without protocol.Responder supported by protocol")
	}
	// Check this before publishing the invoker: returning after c.invoker was
	// set but before its reset was deferred would leave the client permanently
	// reporting "client already has a receiver".
	if c.responder == nil && c.receiver == nil {
		return errors.New("responder nor receiver set")
	}

	c.invoker = invoker
	defer func() {
		c.invoker = nil
	}()

	// Start Polling.
	var wg sync.WaitGroup
	for i := 0; i < c.pollGoroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				var msg binding.Message
				var respFn protocol.ResponseFn
				var err error

				if c.responder != nil {
					msg, respFn, err = c.responder.Respond(ctx)
				} else if c.receiver != nil {
					msg, err = c.receiver.Receive(ctx)
					respFn = noRespFn
				}

				if err == io.EOF { // Normal close
					return
				}

				if err != nil {
					cecontext.LoggerFrom(ctx).Warn("Error while receiving a message: ", err)
					// Once the context is done there is nothing left to poll
					// for. Without this check, a protocol that reports
					// cancellation with anything other than io.EOF would keep
					// this loop spinning and StartReceiver would never return.
					if ctx.Err() != nil {
						return
					}
					continue
				}

				callback := func() {
					if err := c.invoker.Invoke(ctx, msg, respFn); err != nil {
						cecontext.LoggerFrom(ctx).Warn("Error while handling a message: ", err)
					}
				}

				if c.blockingCallback {
					callback()
				} else {
					// Do not block on the invoker. The callback is still
					// tracked by wg so StartReceiver waits for it to finish.
					wg.Add(1)
					go func() {
						defer wg.Done()
						callback()
					}()
				}
			}
		}()
	}

	// Start the opener, if set.
	if c.opener != nil {
		if err = c.opener.OpenInbound(ctx); err != nil {
			err = fmt.Errorf("error while opening the inbound connection: %w", err)
			// Cancel the shared context so the pollers are asked to stop.
			cancel()
		}
	}

	wg.Wait()

	return err
}
// noRespFn is used to simply forward the protocol.Result for receivers that aren't responders.
// StartReceiver installs it as the response function when it polls a
// protocol.Receiver. The context, message and transformers are ignored; the
// given result is returned unchanged.
func noRespFn(_ context.Context, _ binding.Message, r protocol.Result, _ ...binding.Transformer) error {
	return r
}