-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
sqs_s3_event.go
375 lines (328 loc) · 12 KB
/
sqs_s3_event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package awss3
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/aws/smithy-go"
"go.uber.org/multierr"
"github.com/elastic/beats/v7/libbeat/beat"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
"github.com/elastic/elastic-agent-libs/logp"
)
const (
	// sqsApproximateReceiveCountAttribute is the SQS message attribute that
	// reports approximately how many times the message has been received.
	sqsApproximateReceiveCountAttribute = "ApproximateReceiveCount"
	// sqsSentTimestampAttribute is the SQS message attribute holding the
	// send time as epoch milliseconds.
	sqsSentTimestampAttribute = "SentTimestamp"
	// Error codes returned by the SQS API when a receipt handle is no
	// longer usable for visibility changes (see keepalive).
	sqsInvalidParameterValueErrorCode = "InvalidParameterValue"
	sqsReceiptHandleIsInvalidErrCode  = "ReceiptHandleIsInvalid"
)
// nonRetryableError marks an error as permanent: retrying the SQS message
// cannot fix it, so the message should be deleted rather than returned to
// the queue.
type nonRetryableError struct {
	Err error
}

// Unwrap exposes the wrapped error to errors.Is / errors.As.
func (e *nonRetryableError) Unwrap() error {
	return e.Err
}

// Error implements the error interface.
func (e *nonRetryableError) Error() string {
	return "non-retryable error: " + e.Err.Error()
}

// Is reports whether err is itself a *nonRetryableError. errors.Is performs
// the unwrapping, so only the concrete type is inspected here.
func (e *nonRetryableError) Is(err error) bool {
	switch err.(type) { //nolint:errorlint // errors.Is handles wrapped errors; only the concrete type matters here.
	case *nonRetryableError:
		return true
	default:
		return false
	}
}

// nonRetryableErrorWrap marks err as non-retryable. If err is already
// marked (anywhere in its chain), it is returned unchanged to avoid
// double wrapping.
func nonRetryableErrorWrap(err error) error {
	if errors.Is(err, &nonRetryableError{}) {
		return err
	}
	return &nonRetryableError{Err: err}
}
// s3EventsV2 is the notification message that Amazon S3 sends to notify of S3 changes.
// This was derived from the version 2.2 schema.
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
// If the notification message is sent from SNS to SQS, then Records will be
// replaced by TopicArn and Message fields.
type s3EventsV2 struct {
	TopicArn string      `json:"TopicArn"` // SNS topic ARN; non-empty only for S3 -> SNS -> SQS deliveries.
	Message  string      `json:"Message"`  // Embedded JSON notification when delivered via SNS.
	Records  []s3EventV2 `json:"Records"`  // S3 change records for direct S3 -> SQS notifications.
}
// s3EventV2 is a S3 change notification event. Only the subset of the
// schema fields used by this input is declared.
type s3EventV2 struct {
	AWSRegion   string `json:"awsRegion"`
	Provider    string `json:"provider"`
	EventName   string `json:"eventName"`   // e.g. an "ObjectCreated:*" type — see isObjectCreatedEvents.
	EventSource string `json:"eventSource"` // Expected to be "aws:s3" for handled events.
	S3          struct {
		Bucket struct {
			Name string `json:"name"`
			ARN  string `json:"arn"`
		} `json:"bucket"`
		Object struct {
			// Key is URL-encoded as delivered; it is unescaped in getS3Info.
			Key string `json:"key"`
		} `json:"object"`
	} `json:"s3"`
}
// sqsS3EventProcessor processes SQS messages containing S3 change event
// notifications and dispatches the referenced S3 objects for processing.
type sqsS3EventProcessor struct {
	s3ObjectHandler      s3ObjectHandlerFactory // Creates a per-object processor for each S3 event.
	sqsVisibilityTimeout time.Duration          // Visibility timeout periodically renewed by the keepalive loop.
	maxReceiveCount      int                    // Receive-count threshold for poison-pill handling; 0 disables the check.
	sqs                  sqsAPI                 // SQS client used for delete and visibility-change operations.
	pipeline             beat.Pipeline          // Pipeline creates clients for publishing events.
	log                  *logp.Logger
	warnOnce             sync.Once     // Limits the non-ObjectCreated warning to once per processor.
	metrics              *inputMetrics // Never nil; a stub is installed by the constructor when omitted.
	script               *script       // Optional notification-parsing script; overrides format autodetection.
}
// newSQSS3EventProcessor returns a processor that reads S3 event
// notifications from SQS messages and dispatches the referenced S3 objects
// for processing. A nil metrics value is replaced with a no-op stub.
//
// NOTE(review): maxWorkers is not stored or read anywhere in this file —
// confirm whether it is still needed by callers or can be removed.
func newSQSS3EventProcessor(
	log *logp.Logger,
	metrics *inputMetrics,
	sqs sqsAPI,
	script *script,
	sqsVisibilityTimeout time.Duration,
	maxReceiveCount int,
	pipeline beat.Pipeline,
	s3 s3ObjectHandlerFactory,
	maxWorkers int,
) *sqsS3EventProcessor {
	if metrics == nil {
		// Metrics are optional; install a stub so method code can update
		// them unconditionally.
		metrics = newInputMetrics("", nil, 0)
	}
	p := &sqsS3EventProcessor{
		log:                  log,
		metrics:              metrics,
		script:               script,
		sqs:                  sqs,
		sqsVisibilityTimeout: sqsVisibilityTimeout,
		maxReceiveCount:      maxReceiveCount,
		pipeline:             pipeline,
		s3ObjectHandler:      s3,
	}
	return p
}
// ProcessSQS handles one SQS message end to end: it parses the S3 event
// notification in the body, processes each referenced S3 object, then
// disposes of the message — delete on success or on a non-retryable error,
// otherwise leave it to reappear on the queue after the visibility timeout.
// A keepalive goroutine extends the message's visibility while processing
// is in flight.
func (p *sqsS3EventProcessor) ProcessSQS(ctx context.Context, msg *types.Message) error {
	log := p.log.With(
		"message_id", *msg.MessageId,
		"message_receipt_time", time.Now().UTC())

	keepaliveCtx, keepaliveCancel := context.WithCancel(ctx)
	defer keepaliveCancel()

	// Start SQS keepalive worker.
	var keepaliveWg sync.WaitGroup
	keepaliveWg.Add(1)
	go p.keepalive(keepaliveCtx, log, &keepaliveWg, msg)

	receiveCount := getSQSReceiveCount(msg.Attributes)
	if receiveCount == 1 {
		// Only contribute to the sqs_lag_time histogram on the first message
		// to avoid skewing the metric when processing retries.
		if s, found := msg.Attributes[sqsSentTimestampAttribute]; found {
			if sentTimeMillis, err := strconv.ParseInt(s, 10, 64); err == nil {
				sentTime := time.UnixMilli(sentTimeMillis)
				p.metrics.sqsLagTime.Update(time.Since(sentTime).Nanoseconds())
			}
		}
	}

	handles, processingErr := p.processS3Events(ctx, log, *msg.Body)

	// Stop keepalive routine before changing visibility.
	keepaliveCancel()
	keepaliveWg.Wait()

	// No error. Delete SQS.
	if processingErr == nil {
		// context.Background() (not ctx) so the delete is not aborted by
		// cancellation after the objects were already processed.
		if msgDelErr := p.sqs.DeleteMessage(context.Background(), msg); msgDelErr != nil {
			return fmt.Errorf("failed deleting message from SQS queue (it may be reprocessed): %w", msgDelErr)
		}
		p.metrics.sqsMessagesDeletedTotal.Inc()
		// SQS message finished and deleted, finalize s3 objects
		if finalizeErr := p.finalizeS3Objects(handles); finalizeErr != nil {
			return fmt.Errorf("failed finalizing message from SQS queue (manual cleanup is required): %w", finalizeErr)
		}
		return nil
	}

	if p.maxReceiveCount > 0 && !errors.Is(processingErr, &nonRetryableError{}) {
		// Prevent poison pill messages from consuming all workers. Check how
		// many times this message has been received before making a disposition.
		if receiveCount >= p.maxReceiveCount {
			processingErr = nonRetryableErrorWrap(fmt.Errorf(
				"sqs ApproximateReceiveCount <%v> exceeds threshold %v: %w",
				receiveCount, p.maxReceiveCount, processingErr))
		}
	}

	// An error that reprocessing cannot correct. Delete SQS.
	if errors.Is(processingErr, &nonRetryableError{}) {
		if msgDelErr := p.sqs.DeleteMessage(context.Background(), msg); msgDelErr != nil {
			return multierr.Combine(
				fmt.Errorf("failed processing SQS message (attempted to delete message): %w", processingErr),
				fmt.Errorf("failed deleting message from SQS queue (it may be reprocessed): %w", msgDelErr),
			)
		}
		p.metrics.sqsMessagesDeletedTotal.Inc()
		return fmt.Errorf("failed processing SQS message (message was deleted): %w", processingErr)
	}

	// An error that may be resolved by letting the visibility timeout
	// expire thereby putting the message back on SQS. If a dead letter
	// queue is enabled then the message will eventually placed on the DLQ
	// after maximum receives is reached.
	p.metrics.sqsMessagesReturnedTotal.Inc()
	return fmt.Errorf("failed processing SQS message (it will return to queue after visibility timeout): %w", processingErr)
}
// keepalive periodically extends the visibility timeout of msg so that it
// stays hidden from other consumers while processing is in flight. It runs
// until ctx is cancelled or SQS reports the receipt handle is no longer
// valid (e.g. the message was deleted or its visibility already expired).
func (p *sqsS3EventProcessor) keepalive(ctx context.Context, log *logp.Logger, wg *sync.WaitGroup, msg *types.Message) {
	defer wg.Done()

	// Renew at half the visibility timeout to leave headroom for the
	// ChangeMessageVisibility call itself.
	ticker := time.NewTicker(p.sqsVisibilityTimeout / 2)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		log.Debugw("Extending SQS message visibility timeout.",
			"visibility_timeout", p.sqsVisibilityTimeout,
			"expires_at", time.Now().UTC().Add(p.sqsVisibilityTimeout))
		p.metrics.sqsVisibilityTimeoutExtensionsTotal.Inc()

		// Renew visibility.
		err := p.sqs.ChangeMessageVisibility(ctx, msg, p.sqsVisibilityTimeout)
		if err == nil {
			continue
		}
		var apiError smithy.APIError
		if !errors.As(err, &apiError) {
			// Transient failure; keep trying on the next tick.
			continue
		}
		switch apiError.ErrorCode() {
		case sqsReceiptHandleIsInvalidErrCode, sqsInvalidParameterValueErrorCode:
			log.Warnw("Failed to extend message visibility timeout "+
				"because SQS receipt handle is no longer valid. "+
				"Stopping SQS message keepalive routine.", "error", err)
			return
		}
	}
}
// getS3Notifications parses an SQS message body into the list of S3 change
// events it describes. A configured parsing script takes precedence over
// the built-in format autodetection. Both direct S3 -> SQS notifications
// and S3 -> SNS -> SQS envelopes are supported.
func (p *sqsS3EventProcessor) getS3Notifications(body string) ([]s3EventV2, error) {
	// A user-supplied script overrides autodetection entirely.
	if p.script != nil {
		return p.script.run(body)
	}

	// NOTE: If AWS introduces a V3 schema this will need updated to handle that schema.
	var notification s3EventsV2
	if err := json.NewDecoder(strings.NewReader(body)).Decode(&notification); err != nil {
		p.log.Debugw("Invalid SQS message body.", "sqs_message_body", body)
		return nil, fmt.Errorf("failed to decode SQS message body as an S3 notification: %w", err)
	}

	// A non-empty TopicArn means this is an SNS envelope; the real S3
	// notification is the JSON document carried in Message.
	if notification.TopicArn != "" {
		if err := json.NewDecoder(strings.NewReader(notification.Message)).Decode(&notification); err != nil {
			p.log.Debugw("Invalid SQS message body.", "sqs_message_body", body)
			return nil, fmt.Errorf("failed to decode SQS message body as an S3 notification: %w", err)
		}
	}

	if notification.Records == nil {
		p.log.Debugw("Invalid SQS message body: missing Records field", "sqs_message_body", body)
		return nil, errors.New("the message is an invalid S3 notification: missing Records field")
	}

	return p.getS3Info(notification)
}
// getS3Info filters the notification records down to ObjectCreated events
// and URL-unescapes each object key. Records of other event types are
// skipped, with a single warning logged per processor lifetime.
func (p *sqsS3EventProcessor) getS3Info(events s3EventsV2) ([]s3EventV2, error) {
	out := make([]s3EventV2, 0, len(events.Records))
	for _, record := range events.Records {
		if !p.isObjectCreatedEvents(record) {
			// Warn only once to avoid log spam on busy queues.
			p.warnOnce.Do(func() {
				p.log.Warnf("Received S3 notification for %q event type, but "+
					"only 'ObjectCreated:*' types are handled. It is recommended "+
					"that you update the S3 Event Notification configuration to "+
					"only include ObjectCreated event types to save resources.",
					record.EventName)
			})
			continue
		}

		// Unescape s3 key name. For example, convert "%3D" back to "=".
		unescaped, err := url.QueryUnescape(record.S3.Object.Key)
		if err != nil {
			return nil, fmt.Errorf("url unescape failed for '%v': %w", record.S3.Object.Key, err)
		}
		record.S3.Object.Key = unescaped
		out = append(out, record)
	}
	return out, nil
}
// isObjectCreatedEvents reports whether event is an S3 "ObjectCreated:*"
// notification; events from other sources or of other types are ignored.
func (*sqsS3EventProcessor) isObjectCreatedEvents(event s3EventV2) bool {
	if event.EventSource != "aws:s3" {
		return false
	}
	return strings.HasPrefix(event.EventName, "ObjectCreated:")
}
// processS3Events parses the SQS message body into S3 events and processes
// each referenced S3 object, publishing the resulting events through a
// pipeline client scoped to this call. The returned handles (one per
// successfully processed object) are non-nil only when every event was
// handled, so the caller can finalize them after deleting the SQS message.
func (p *sqsS3EventProcessor) processS3Events(ctx context.Context, log *logp.Logger, body string) ([]s3ObjectHandler, error) {
	s3Events, err := p.getS3Notifications(body)
	if err != nil {
		if errors.Is(err, context.Canceled) {
			// Messages that are in-flight at shutdown should be returned to SQS.
			return nil, err
		}
		// A body that failed to parse will never parse on retry; mark the
		// error non-retryable so the message gets deleted, not requeued.
		return nil, &nonRetryableError{err}
	}
	log.Debugf("SQS message contained %d S3 event notifications.", len(s3Events))
	defer log.Debug("End processing SQS S3 event notifications.")

	if len(s3Events) == 0 {
		return nil, nil
	}

	// Create a pipeline client scoped to this goroutine.
	client, err := p.pipeline.ConnectWith(beat.ClientConfig{
		EventListener: awscommon.NewEventACKHandler(),
		Processing: beat.ProcessingConfig{
			// This input only produces events with basic types so normalization
			// is not required.
			EventNormalization: boolPtr(false),
		},
	})
	if err != nil {
		return nil, err
	}
	defer client.Close()

	// Wait for all events to be ACKed before proceeding.
	acker := awscommon.NewEventACKTracker(ctx)
	defer acker.Wait()

	var errs []error
	var handles []s3ObjectHandler
	for i, event := range s3Events {
		s3Processor := p.s3ObjectHandler.Create(ctx, log, client, acker, event)
		if s3Processor == nil {
			// The factory returned no processor for this event; skip it.
			continue
		}

		// Process S3 object (download, parse, create events).
		if err := s3Processor.ProcessS3Object(); err != nil {
			errs = append(errs, fmt.Errorf(
				"failed processing S3 event for object key %q in bucket %q (object record %d of %d in SQS notification): %w",
				event.S3.Object.Key, event.S3.Bucket.Name, i+1, len(s3Events), err))
		} else {
			handles = append(handles, s3Processor)
		}
	}

	// Make sure all s3 events were processed successfully
	if len(handles) == len(s3Events) {
		return handles, multierr.Combine(errs...)
	}
	return nil, multierr.Combine(errs...)
}
// finalizeS3Objects invokes FinalizeS3Object on every handle, collecting
// all failures into a single combined error (nil when all succeed).
func (p *sqsS3EventProcessor) finalizeS3Objects(handles []s3ObjectHandler) error {
	var errs []error
	total := len(handles)
	for i, handle := range handles {
		err := handle.FinalizeS3Object()
		if err == nil {
			continue
		}
		errs = append(errs, fmt.Errorf(
			"failed finalizing S3 event (object record %d of %d in SQS notification): %w",
			i+1, total, err))
	}
	return multierr.Combine(errs...)
}
// getSQSReceiveCount returns the SQS ApproximateReceiveCount attribute as
// an int. If the attribute is absent or not a valid integer, -1 is
// returned.
func getSQSReceiveCount(attributes map[string]string) int {
	s, found := attributes[sqsApproximateReceiveCountAttribute]
	if !found {
		return -1
	}
	receiveCount, err := strconv.Atoi(s)
	if err != nil {
		return -1
	}
	return receiveCount
}