From 91906c9cdc6156f99af7edd2de8cb897bc5e3993 Mon Sep 17 00:00:00 2001 From: Kylie Meli Date: Mon, 3 Apr 2023 09:27:01 -0400 Subject: [PATCH] Adding aws-s3 metric for sqs worker utilization (#34793) --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-aws-s3.asciidoc | 1 + x-pack/filebeat/input/awss3/input.go | 12 +- .../input/awss3/input_benchmark_test.go | 10 +- .../input/awss3/input_integration_test.go | 2 + x-pack/filebeat/input/awss3/metrics.go | 121 ++++++++++++++++-- x-pack/filebeat/input/awss3/metrics_test.go | 79 +++++++++++- x-pack/filebeat/input/awss3/s3.go | 2 +- x-pack/filebeat/input/awss3/s3_objects.go | 4 +- .../filebeat/input/awss3/s3_objects_test.go | 14 +- x-pack/filebeat/input/awss3/s3_test.go | 4 +- x-pack/filebeat/input/awss3/sqs.go | 7 +- x-pack/filebeat/input/awss3/sqs_s3_event.go | 3 +- .../filebeat/input/awss3/sqs_s3_event_test.go | 16 +-- 14 files changed, 229 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1adf4c125ff..8b0a41c9b6b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -244,6 +244,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Add Basic Authentication support on constructed requests to CEL input {issue}34609[34609] {pull}34689[34689] - Add string manipulation extensions to CEL input {issue}34610[34610] {pull}34689[34689] - Add unix socket log parsing for nginx ingress_controller {pull}34732[34732] +- Added metric `sqs_worker_utilization` for aws-s3 input. {pull}34793[34793] - Improve CEL input documentation {pull}34831[34831] - Add metrics documentation for CEL and AWS CloudWatch inputs. {issue}34887[34887] {pull}34889[34889] - Register MIME handlers for CSV types in CEL input. {pull}34934[34934] diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 40b412fab61..50bc1fe7f99 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -738,6 +738,7 @@ observe the activity of the input. | `sqs_messages_returned_total` | Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). | `sqs_messages_deleted_total` | Number of SQS messages deleted. | `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. +| `sqs_worker_utilization` | Rate of SQS worker utilization over previous 5 seconds. 0 indicates idle, 1 indicates all workers utilized. | `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). | `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. | `s3_objects_requested_total` | Number of S3 objects downloaded. diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 669cf7f53e6..bc14a594a44 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -129,7 +129,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { } defer receiver.metrics.Close() - // Poll sqs waiting metric periodically in the background. 
+ // Poll metrics periodically in the background go pollSqsWaitingMetric(ctx, receiver) if err := receiver.Receive(ctx); err != nil { @@ -208,9 +208,9 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s if err != nil { return nil, err } - in.metrics = newInputMetrics(ctx.ID, nil) - s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), in.metrics, s3API, fileSelectors, in.config.BackupConfig) - sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), in.metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, pipeline, s3EventHandlerFactory) + in.metrics = newInputMetrics(ctx.ID, nil, in.config.MaxNumberOfMessages) + s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), in.metrics, s3API, fileSelectors, in.config.BackupConfig, in.config.MaxNumberOfMessages) + sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), in.metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, pipeline, s3EventHandlerFactory, in.config.MaxNumberOfMessages) sqsReader := newSQSReader(log.Named("sqs"), in.metrics, sqsAPI, in.config.MaxNumberOfMessages, sqsMessageHandler) return sqsReader, nil @@ -281,8 +281,8 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli if len(in.config.FileSelectors) == 0 { fileSelectors = []fileSelectorConfig{{ReaderConfig: in.config.ReaderConfig}} } - in.metrics = newInputMetrics(ctx.ID, nil) - s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), in.metrics, s3API, fileSelectors, in.config.BackupConfig) + in.metrics = newInputMetrics(ctx.ID, nil, in.config.MaxNumberOfMessages) + s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), in.metrics, s3API, fileSelectors, in.config.BackupConfig, in.config.MaxNumberOfMessages) s3Poller := newS3Poller(log.Named("s3_poller"), in.metrics, s3API, diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index 2199d2b4980..d0a25410638 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -211,14 +211,14 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR return testing.Benchmark(func(b *testing.B) { log := logp.NewLogger(inputName) metricRegistry := monitoring.NewRegistry() - metrics := newInputMetrics("test_id", metricRegistry) + metrics := newInputMetrics("test_id", metricRegistry, maxMessagesInflight) sqsAPI := newConstantSQS() s3API := newConstantS3(t) pipeline := &fakePipeline{} conf := makeBenchmarkConfig(t) - s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, conf.FileSelectors, backupConfig{}) - sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, pipeline, s3EventHandlerFactory) + s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, conf.FileSelectors, backupConfig{}, maxMessagesInflight) + sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, pipeline, s3EventHandlerFactory, maxMessagesInflight) sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, maxMessagesInflight, sqsMessageHandler) ctx, cancel := context.WithCancel(context.Background()) @@ -302,7 +302,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult log.Infof("benchmark with %d number of 
workers", numberOfWorkers) metricRegistry := monitoring.NewRegistry() - metrics := newInputMetrics("test_id", metricRegistry) + metrics := newInputMetrics("test_id", metricRegistry, numberOfWorkers) client := pubtest.NewChanClientWithCallback(100, func(event beat.Event) { event.Private.(*awscommon.EventACKTracker).ACK() @@ -348,7 +348,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult return } - s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, config.FileSelectors, backupConfig{}) + s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, config.FileSelectors, backupConfig{}, numberOfWorkers) s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, client, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", listPrefix, "region", "provider", numberOfWorkers, time.Second) if err := s3Poller.Poll(ctx); err != nil { diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 29d43aca8e7..ad4fd7dc5ea 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -192,6 +192,7 @@ func TestInputRunSQS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) + assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) } func TestInputRunS3(t *testing.T) { @@ -430,4 +431,5 @@ func TestInputRunSNS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) + assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) } diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 3dcce8eee14..28819054d53 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -5,27 +5,46 @@ package awss3 import ( + "context" "io" + "math" + "sync" + "time" "github.com/rcrowley/go-metrics" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent-libs/monitoring/adapter" + "github.com/elastic/go-concert/timed" ) +// currentTime returns the current time. This exists to allow unit tests +// simulate the passage of time. +var currentTime = time.Now + type inputMetrics struct { registry *monitoring.Registry unregister func() + ctx context.Context // ctx signals when to stop the sqs worker utilization goroutine. + cancel context.CancelFunc // cancel cancels the ctx context. + + sqsMaxMessagesInflight int // Maximum number of SQS workers allowed. + sqsWorkerUtilizationMutex sync.Mutex // Guards the sqs worker utilization fields. + sqsWorkerUtilizationLastUpdate time.Time // Time of the last SQS worker utilization calculation. + sqsWorkerUtilizationCurrentPeriod time.Duration // Elapsed execution duration of any SQS workers that completed during the current period. + sqsWorkerIDCounter uint64 // Counter used to assigned unique IDs to SQS workers. + sqsWorkerStartTimes map[uint64]time.Time // Map of SQS worker ID to the time at which the worker started. 
- sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully). - sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions. - sqsMessagesInflight *monitoring.Uint // Number of SQS messages inflight (gauge). - sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). - sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted. - sqsMessagesWaiting *monitoring.Int // Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. - sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). - sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. + sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully). + sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions. + sqsMessagesInflight *monitoring.Uint // Number of SQS messages inflight (gauge). + sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). + sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted. + sqsMessagesWaiting *monitoring.Int // Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. + sqsWorkerUtilization *monitoring.Float // Rate of SQS worker utilization over previous 5 seconds. 0 indicates idle, 1 indicates all workers utilized. + sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). + sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. s3ObjectsRequestedTotal *monitoring.Uint // Number of S3 objects downloaded. s3ObjectsAckedTotal *monitoring.Uint // Number of S3 objects processed that were fully ACKed. @@ -37,8 +56,9 @@ type inputMetrics struct { s3ObjectProcessingTime metrics.Sample // Histogram of the elapsed S3 object processing times in nanoseconds (start of download to completion of parsing). } -// Close removes the metrics from the registry. +// Close cancels the context and removes the metrics from the registry. func (m *inputMetrics) Close() { + m.cancel() m.unregister() } @@ -54,17 +74,90 @@ func (m *inputMetrics) setSQSMessagesWaiting(count int64) { m.sqsMessagesWaiting.Set(count) } -func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetrics { +// beginSQSWorker tracks the start of a new SQS worker. The returned ID +// must be used to call endSQSWorker when the worker finishes. It also +// increments the sqsMessagesInflight counter. +func (m *inputMetrics) beginSQSWorker() (id uint64) { + m.sqsMessagesInflight.Inc() + + m.sqsWorkerUtilizationMutex.Lock() + defer m.sqsWorkerUtilizationMutex.Unlock() + m.sqsWorkerIDCounter++ + m.sqsWorkerStartTimes[m.sqsWorkerIDCounter] = currentTime() + return m.sqsWorkerIDCounter +} + +// endSQSWorker is used to signal that the specified worker has +// finished. 
This is used to update the SQS worker utilization metric.
+// It also decrements the sqsMessagesInflight counter and updates the
+// sqsMessageProcessingTime histogram.
+func (m *inputMetrics) endSQSWorker(id uint64) {
+ m.sqsMessagesInflight.Dec()
+
+ m.sqsWorkerUtilizationMutex.Lock()
+ defer m.sqsWorkerUtilizationMutex.Unlock()
+ now := currentTime()
+ start := m.sqsWorkerStartTimes[id]
+ delete(m.sqsWorkerStartTimes, id)
+ m.sqsMessageProcessingTime.Update(now.Sub(start).Nanoseconds())
+ if start.Before(m.sqsWorkerUtilizationLastUpdate) {
+ m.sqsWorkerUtilizationCurrentPeriod += now.Sub(m.sqsWorkerUtilizationLastUpdate)
+ } else {
+ m.sqsWorkerUtilizationCurrentPeriod += now.Sub(start)
+ }
+}
+
+// updateSqsWorkerUtilization updates the sqsWorkerUtilization metric.
+// This is invoked periodically to compute the utilization level of the
+// SQS workers. A value of 0 indicates that no workers were utilized
+// during the period, and 1 indicates that all workers were fully
+// utilized during the period.
+func (m *inputMetrics) updateSqsWorkerUtilization() {
+ m.sqsWorkerUtilizationMutex.Lock()
+ defer m.sqsWorkerUtilizationMutex.Unlock()
+
+ now := currentTime()
+ lastPeriodDuration := now.Sub(m.sqsWorkerUtilizationLastUpdate)
+ maxUtilization := float64(m.sqsMaxMessagesInflight) * lastPeriodDuration.Seconds()
+
+ for _, startTime := range m.sqsWorkerStartTimes {
+ // If the worker started before the current period then only count the
+ // time elapsed since the last update. Otherwise, it started during the
+ // current period, so count the time elapsed since it started.
+ if startTime.Before(m.sqsWorkerUtilizationLastUpdate) {
+ m.sqsWorkerUtilizationCurrentPeriod += lastPeriodDuration
+ } else {
+ m.sqsWorkerUtilizationCurrentPeriod += now.Sub(startTime)
+ }
+ }
+
+ utilization := math.Round(m.sqsWorkerUtilizationCurrentPeriod.Seconds()/maxUtilization*1000) / 1000
+ if utilization > 1 {
+ utilization = 1
+ }
+ m.sqsWorkerUtilization.Set(utilization)
+ m.sqsWorkerUtilizationCurrentPeriod = 0
+ m.sqsWorkerUtilizationLastUpdate = now
+}
+
+func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers int) *inputMetrics {
 reg, unreg := inputmon.NewInputRegistry(inputName, id, optionalParent)
+ ctx, cancel := context.WithCancel(context.Background())
 
 out := &inputMetrics{
 registry: reg,
 unregister: unreg,
+ ctx: ctx,
+ cancel: cancel,
+ sqsMaxMessagesInflight: maxWorkers,
+ sqsWorkerStartTimes: map[uint64]time.Time{},
+ sqsWorkerUtilizationLastUpdate: currentTime(),
 sqsMessagesReceivedTotal: monitoring.NewUint(reg, "sqs_messages_received_total"),
 sqsVisibilityTimeoutExtensionsTotal: monitoring.NewUint(reg, "sqs_visibility_timeout_extensions_total"),
 sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"),
 sqsMessagesReturnedTotal: monitoring.NewUint(reg, "sqs_messages_returned_total"),
 sqsMessagesDeletedTotal: monitoring.NewUint(reg, "sqs_messages_deleted_total"),
+ sqsWorkerUtilization: monitoring.NewFloat(reg, "sqs_worker_utilization"),
 sqsMessageProcessingTime: metrics.NewUniformSample(1024),
 sqsLagTime: metrics.NewUniformSample(1024),
 s3ObjectsRequestedTotal: monitoring.NewUint(reg, "s3_objects_requested_total"),
@@ -82,6 +175,14 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetri
 Register("histogram", metrics.NewHistogram(out.sqsLagTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
 adapter.NewGoMetrics(reg, "s3_object_processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.s3ObjectProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible. + + // Periodically update the sqs worker utilization metric. + //nolint:errcheck // This never returns an error. + go timed.Periodic(ctx, 5*time.Second, func() error { + out.updateSqsWorkerUtilization() + return nil + }) + return out } diff --git a/x-pack/filebeat/input/awss3/metrics_test.go b/x-pack/filebeat/input/awss3/metrics_test.go index c40a6dd4a9e..5d3132d5da9 100644 --- a/x-pack/filebeat/input/awss3/metrics_test.go +++ b/x-pack/filebeat/input/awss3/metrics_test.go @@ -6,7 +6,11 @@ package awss3 import ( "testing" + "time" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/elastic-agent-libs/monitoring" ) @@ -17,10 +21,83 @@ import ( func TestInputMetricsClose(t *testing.T) { reg := monitoring.NewRegistry() - metrics := newInputMetrics("aws-s3-aws.cloudfront_logs-8b312b5f-9f99-492c-b035-3dff354a1f01", reg) + metrics := newInputMetrics("aws-s3-aws.cloudfront_logs-8b312b5f-9f99-492c-b035-3dff354a1f01", reg, 1) metrics.Close() reg.Do(monitoring.Full, func(s string, _ interface{}) { t.Errorf("registry should be empty, but found %v", s) }) } + +func TestInputMetricsSQSWorkerUtilization(t *testing.T) { + const interval = 5000 + + t.Run("worker ends before one interval", func(t *testing.T) { + fakeTimeMs.Store(0) + defer useFakeCurrentTimeThenReset()() + + reg := monitoring.NewRegistry() + metrics := newInputMetrics("test", reg, 1) + metrics.Close() + + id := metrics.beginSQSWorker() + fakeTimeMs.Add(2500) + metrics.endSQSWorker(id) + + fakeTimeMs.Store(1 * interval) + metrics.updateSqsWorkerUtilization() + assert.Equal(t, 0.5, metrics.sqsWorkerUtilization.Get()) + }) + t.Run("worker ends mid interval", func(t *testing.T) { + fakeTimeMs.Store(0) + defer useFakeCurrentTimeThenReset()() + + reg := monitoring.NewRegistry() + metrics := newInputMetrics("test", reg, 1) + metrics.Close() + + fakeTimeMs.Add(4000) + id := metrics.beginSQSWorker() + + fakeTimeMs.Store(1 * interval) + metrics.updateSqsWorkerUtilization() + + fakeTimeMs.Add(1000) + metrics.endSQSWorker(id) + + fakeTimeMs.Store(2 * interval) + metrics.updateSqsWorkerUtilization() + assert.Equal(t, 0.2, metrics.sqsWorkerUtilization.Get()) + }) + t.Run("running worker goes longer than an interval", func(t *testing.T) { + fakeTimeMs.Store(0) + defer useFakeCurrentTimeThenReset()() + + reg := monitoring.NewRegistry() + metrics := newInputMetrics("test", reg, 1) + metrics.Close() + + id := metrics.beginSQSWorker() + + fakeTimeMs.Store(1 * interval) + metrics.updateSqsWorkerUtilization() + assert.Equal(t, 1.0, metrics.sqsWorkerUtilization.Get()) + + fakeTimeMs.Store(2 * interval) + metrics.updateSqsWorkerUtilization() + assert.Equal(t, 1.0, metrics.sqsWorkerUtilization.Get()) + + metrics.endSQSWorker(id) + }) +} + +var fakeTimeMs = &atomic.Int64{} + +func useFakeCurrentTimeThenReset() (reset func()) { + currentTime = func() time.Time { + return time.UnixMilli(fakeTimeMs.Load()) + } + return func() { + currentTime = time.Now + } +} diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index d07af133f0a..817255f3795 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -77,7 +77,7 @@ func newS3Poller(log *logp.Logger, bucketPollInterval time.Duration, ) *s3Poller { if metrics == nil { - metrics = newInputMetrics("", 
monitoring.NewRegistry()) + metrics = newInputMetrics("", monitoring.NewRegistry(), numberOfWorkers) } return &s3Poller{ numberOfWorkers: numberOfWorkers, diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index a8dd796c68b..ef66181e9a3 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -45,9 +45,9 @@ type s3ObjectProcessorFactory struct { backupConfig backupConfig } -func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3API, sel []fileSelectorConfig, backupConfig backupConfig) *s3ObjectProcessorFactory { +func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3API, sel []fileSelectorConfig, backupConfig backupConfig, maxWorkers int) *s3ObjectProcessorFactory { if metrics == nil { - metrics = newInputMetrics("", monitoring.NewRegistry()) + metrics = newInputMetrics("", monitoring.NewRegistry(), maxWorkers) } if len(sel) == 0 { sel = []fileSelectorConfig{ diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index b85fe43eff0..61b6124cd9a 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -144,7 +144,7 @@ func TestS3ObjectProcessor(t *testing.T) { GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). Return(nil, errFakeConnectivityFailure) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}, 1) ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).ProcessS3Object() require.Error(t, err) @@ -166,7 +166,7 @@ func TestS3ObjectProcessor(t *testing.T) { GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). 
Return(nil, nil) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}, 1) ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).ProcessS3Object() require.Error(t, err) @@ -193,7 +193,7 @@ func TestS3ObjectProcessor(t *testing.T) { Times(2), ) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}, 1) ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).ProcessS3Object() require.NoError(t, err) @@ -219,7 +219,7 @@ func TestS3ObjectProcessor(t *testing.T) { Return(nil, nil), ) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg, 1) ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).FinalizeS3Object() require.NoError(t, err) @@ -249,7 +249,7 @@ func TestS3ObjectProcessor(t *testing.T) { Return(nil, nil), ) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg, 1) ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).FinalizeS3Object() require.NoError(t, err) @@ -276,7 +276,7 @@ func TestS3ObjectProcessor(t *testing.T) { Return(nil, nil), ) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg, 1) ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).FinalizeS3Object() require.NoError(t, err) @@ -322,7 +322,7 @@ func _testProcessS3Object(t testing.TB, file, contentType string, numEvents int, Times(numEvents), ) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, selectors, backupConfig{}) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, selectors, backupConfig{}, 1) ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).ProcessS3Object() diff --git a/x-pack/filebeat/input/awss3/s3_test.go b/x-pack/filebeat/input/awss3/s3_test.go index 99c88a6f9c6..1872388c8ce 100644 --- a/x-pack/filebeat/input/awss3/s3_test.go +++ b/x-pack/filebeat/input/awss3/s3_test.go @@ -125,7 +125,7 @@ func TestS3Poller(t *testing.T) { GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key5")). 
Return(nil, errFakeConnectivityFailure) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, nil, backupConfig{}) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, nil, backupConfig{}, numberOfWorkers) receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", "provider", numberOfWorkers, pollInterval) require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx)) assert.Equal(t, numberOfWorkers, receiver.workerSem.Available()) @@ -248,7 +248,7 @@ func TestS3Poller(t *testing.T) { GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key5")). Return(nil, errFakeConnectivityFailure) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, nil, backupConfig{}) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, nil, backupConfig{}, numberOfWorkers) receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", "provider", numberOfWorkers, pollInterval) require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx)) assert.Equal(t, numberOfWorkers, receiver.workerSem.Available()) diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index 1a62d0a4976..7b652e6c160 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -35,7 +35,7 @@ type sqsReader struct { func newSQSReader(log *logp.Logger, metrics *inputMetrics, sqs sqsAPI, maxMessagesInflight int, msgHandler sqsProcessor) *sqsReader { if metrics == nil { - metrics = newInputMetrics("", monitoring.NewRegistry()) + metrics = newInputMetrics("", monitoring.NewRegistry(), maxMessagesInflight) } return &sqsReader{ maxMessagesInflight: maxMessagesInflight, @@ -79,13 +79,12 @@ func (r *sqsReader) Receive(ctx context.Context) error { // Process each SQS message asynchronously with a goroutine. 
r.log.Debugf("Received %v SQS messages.", len(msgs)) r.metrics.sqsMessagesReceivedTotal.Add(uint64(len(msgs))) - r.metrics.sqsMessagesInflight.Add(uint64(len(msgs))) workerWg.Add(len(msgs)) for _, msg := range msgs { go func(msg types.Message, start time.Time) { + id := r.metrics.beginSQSWorker() defer func() { - r.metrics.sqsMessagesInflight.Dec() - r.metrics.sqsMessageProcessingTime.Update(time.Since(start).Nanoseconds()) + r.metrics.endSQSWorker(id) workerWg.Done() r.workerSem.Release(1) }() diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go index 0ab0e3376e5..08700351e99 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go @@ -105,9 +105,10 @@ func newSQSS3EventProcessor( maxReceiveCount int, pipeline beat.Pipeline, s3 s3ObjectHandlerFactory, + maxWorkers int, ) *sqsS3EventProcessor { if metrics == nil { - metrics = newInputMetrics("", monitoring.NewRegistry()) + metrics = newInputMetrics("", monitoring.NewRegistry(), maxWorkers) } return &sqsS3EventProcessor{ s3ObjectHandler: s3, diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go index b1020cd4943..8209d298031 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go @@ -50,7 +50,7 @@ func TestSQSS3EventProcessor(t *testing.T) { mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&msg)).Return(nil), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory, 5) require.NoError(t, p.ProcessSQS(ctx, &msg)) }) @@ -73,7 +73,7 @@ func TestSQSS3EventProcessor(t *testing.T) { mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&invalidBodyMsg)).Return(nil), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory, 5) err := p.ProcessSQS(ctx, &invalidBodyMsg) require.Error(t, err) t.Log(err) @@ -95,7 +95,7 @@ func TestSQSS3EventProcessor(t *testing.T) { mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&emptyRecordsMsg)).Return(nil), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory, 5) require.NoError(t, p.ProcessSQS(ctx, &emptyRecordsMsg)) }) @@ -127,7 +127,7 @@ func TestSQSS3EventProcessor(t *testing.T) { mockS3Handler.EXPECT().FinalizeS3Object().Return(nil), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, visibilityTimeout, 5, mockBeatPipeline, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, visibilityTimeout, 5, mockBeatPipeline, mockS3HandlerFactory, 5) require.NoError(t, p.ProcessSQS(ctx, &msg)) }) @@ -150,7 +150,7 @@ func TestSQSS3EventProcessor(t *testing.T) { mockClient.EXPECT().Close(), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, 
time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory, 5) err := p.ProcessSQS(ctx, &msg) t.Log(err) require.Error(t, err) @@ -181,7 +181,7 @@ func TestSQSS3EventProcessor(t *testing.T) { mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&msg)).Return(nil), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory, 5) err := p.ProcessSQS(ctx, &msg) t.Log(err) require.Error(t, err) @@ -227,7 +227,7 @@ func TestSqsProcessor_keepalive(t *testing.T) { mockAPI.EXPECT().ChangeMessageVisibility(gomock.Any(), gomock.Eq(&msg), gomock.Eq(visibilityTimeout)). Times(1).Return(tc.Err) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, visibilityTimeout, 5, mockBeatPipeline, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, visibilityTimeout, 5, mockBeatPipeline, mockS3HandlerFactory, 5) var wg sync.WaitGroup wg.Add(1) p.keepalive(ctx, p.log, &wg, &msg) @@ -239,7 +239,7 @@ func TestSqsProcessor_keepalive(t *testing.T) { func TestSqsProcessor_getS3Notifications(t *testing.T) { require.NoError(t, logp.TestingSetup()) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, nil, nil, time.Minute, 5, nil, nil) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, nil, nil, time.Minute, 5, nil, nil, 5) t.Run("s3 key is url unescaped", func(t *testing.T) { msg := newSQSMessage(newS3Event("Happy+Face.jpg"))
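
The arithmetic behind the new metric is easiest to see in isolation: the value reported each period is the summed busy time of all SQS workers divided by the maximum available worker time, max_number_of_messages * period, rounded to three decimal places and clamped to 1. The standalone sketch below mirrors that computation outside of Filebeat; the helper name `utilization` and its parameters are illustrative and not part of this patch.

package main

import (
	"fmt"
	"math"
	"time"
)

// utilization mirrors updateSqsWorkerUtilization: summed worker busy time
// in the period, divided by (maxWorkers * period), rounded to three
// decimal places and clamped to 1.
func utilization(busy time.Duration, maxWorkers int, period time.Duration) float64 {
	capacity := float64(maxWorkers) * period.Seconds()
	u := math.Round(busy.Seconds()/capacity*1000) / 1000
	if u > 1 {
		u = 1
	}
	return u
}

func main() {
	// Five workers over a 5s period supply 25s of capacity, so 10s of
	// summed busy time reports as 0.4 utilization.
	fmt.Println(utilization(10*time.Second, 5, 5*time.Second))

	// One worker busy for the whole period is fully utilized.
	fmt.Println(utilization(5*time.Second, 1, 5*time.Second))
}

For example, with the default of 5 workers and the 5-second reporting period, a sustained reading near 1 suggests the input is worker-bound and max_number_of_messages may need to be raised.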