From f0eb39025b83b845faa17b2a4f34b68299f68d3e Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Tue, 21 May 2024 17:55:22 +0300 Subject: [PATCH] fix: s3_input integration tests --- .ci/jobs/docker-compose.yml | 4 +- .../input/awss3/input_integration_test.go | 76 ++++++++++--------- 2 files changed, 44 insertions(+), 36 deletions(-) diff --git a/.ci/jobs/docker-compose.yml b/.ci/jobs/docker-compose.yml index e9fc43ff704..c27417158a3 100644 --- a/.ci/jobs/docker-compose.yml +++ b/.ci/jobs/docker-compose.yml @@ -6,10 +6,10 @@ services: image: busybox depends_on: localstack: { condition: service_healthy } - + localstack: container_name: "${localstack_integration_test_container}" - image: localstack/localstack:2.1.0 # Latest stable release + image: localstack/localstack:3.1.0 # Latest stable release ports: - "127.0.0.1:4566:4566" # LocalStack Gateway environment: diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 62cbc835011..af488505d0e 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -135,13 +135,22 @@ file_selectors: `, queueURL)) } -func createInput(t *testing.T, cfg *conf.C) *s3Input { +func createSQSInput(t *testing.T, cfg *conf.C) *sqsReaderInput { inputV2, err := Plugin(openTestStatestore()).Manager.Create(cfg) if err != nil { t.Fatal(err) } - return inputV2.(*s3Input) + return inputV2.(*sqsReaderInput) +} + +func createS3Input(t *testing.T, cfg *conf.C) *s3PollerInput { + inputV2, err := Plugin(openTestStatestore()).Manager.Create(cfg) + if err != nil { + t.Fatal(err) + } + + return inputV2.(*s3PollerInput) } func newV2Context() (v2.Context, func()) { @@ -235,11 +244,7 @@ func TestInputRunSQSOnLocalstack(t *testing.T) { }) // Initialize s3Input with the test config - s3Input := &s3Input{ - config: config, - awsConfig: awsCfg, - store: openTestStatestore(), - } + s3Input := newSQSReaderInput(config, awsCfg) // Run S3 Input with desired context var errGroup errgroup.Group errGroup.Go(func() error { @@ -284,7 +289,7 @@ func TestInputRunSQS(t *testing.T) { "testdata/log.txt", // Skipped (no match). ) - s3Input := createInput(t, makeTestConfigSQS(tfConfig.QueueURL)) + sqsInput := createSQSInput(t, makeTestConfigSQS(tfConfig.QueueURL)) inputCtx, cancel := newV2Context() t.Cleanup(cancel) @@ -294,23 +299,23 @@ func TestInputRunSQS(t *testing.T) { var errGroup errgroup.Group errGroup.Go(func() error { - return s3Input.Run(inputCtx, &fakePipeline{}) + return sqsInput.Run(inputCtx, &fakePipeline{}) }) if err := errGroup.Wait(); err != nil { t.Fatal(err) } - assert.EqualValues(t, s3Input.metrics.sqsMessagesReceivedTotal.Get(), 8) // S3 could batch notifications. - assert.EqualValues(t, s3Input.metrics.sqsMessagesInflight.Get(), 0) - assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) - assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. - assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) - assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) - assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) - assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) - assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end + assert.EqualValues(t, sqsInput.metrics.sqsMessagesReceivedTotal.Get(), 8) // S3 could batch notifications. + assert.EqualValues(t, sqsInput.metrics.sqsMessagesInflight.Get(), 0) + assert.EqualValues(t, sqsInput.metrics.sqsMessagesDeletedTotal.Get(), 7) + assert.EqualValues(t, sqsInput.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. + assert.EqualValues(t, sqsInput.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) + assert.EqualValues(t, sqsInput.metrics.s3ObjectsInflight.Get(), 0) + assert.EqualValues(t, sqsInput.metrics.s3ObjectsRequestedTotal.Get(), 7) + assert.EqualValues(t, sqsInput.metrics.s3EventsCreatedTotal.Get(), 12) + assert.Greater(t, sqsInput.metrics.sqsLagTime.Mean(), 0.0) + assert.EqualValues(t, sqsInput.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end } func TestInputRunS3(t *testing.T) { @@ -332,7 +337,7 @@ func TestInputRunS3(t *testing.T) { "testdata/log.txt", // Skipped (no match). ) - s3Input := createInput(t, makeTestConfigS3(tfConfig.BucketName)) + s3Input := createS3Input(t, makeTestConfigS3(tfConfig.BucketName)) inputCtx, cancel := newV2Context() t.Cleanup(cancel) @@ -353,7 +358,7 @@ func TestInputRunS3(t *testing.T) { assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3ObjectsListedTotal.Get(), 8) assert.EqualValues(t, s3Input.metrics.s3ObjectsProcessedTotal.Get(), 7) - assert.EqualValues(t, s3Input.metrics.s3ObjectsAckedTotal.Get(), 6) + assert.EqualValues(t, s3Input.metrics.s3ObjectsAckedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) } @@ -402,7 +407,10 @@ func makeAWSConfig(t *testing.T, region string) aws.Config { func drainSQS(t *testing.T, region string, queueURL string, cfg aws.Config) { sqs := &awsSQSAPI{ - client: sqs.NewFromConfig(cfg), + client: sqs.NewFromConfig(cfg, func(options *sqs.Options) { + //options.ClientLogMode = aws.LogResponseWithBody + options.Region = region + }), queueURL: queueURL, apiTimeout: 1 * time.Minute, visibilityTimeout: 30 * time.Second, @@ -523,7 +531,7 @@ func TestInputRunSNS(t *testing.T) { "testdata/log.txt", // Skipped (no match). ) - s3Input := createInput(t, makeTestConfigSQS(tfConfig.QueueURLForSNS)) + sqsInput := createSQSInput(t, makeTestConfigSQS(tfConfig.QueueURLForSNS)) inputCtx, cancel := newV2Context() t.Cleanup(cancel) @@ -533,21 +541,21 @@ func TestInputRunSNS(t *testing.T) { var errGroup errgroup.Group errGroup.Go(func() error { - return s3Input.Run(inputCtx, &fakePipeline{}) + return sqsInput.Run(inputCtx, &fakePipeline{}) }) if err := errGroup.Wait(); err != nil { t.Fatal(err) } - assert.EqualValues(t, s3Input.metrics.sqsMessagesReceivedTotal.Get(), 8) // S3 could batch notifications. - assert.EqualValues(t, s3Input.metrics.sqsMessagesInflight.Get(), 0) - assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) - assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. - assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) - assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) - assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) - assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) - assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end + assert.EqualValues(t, sqsInput.metrics.sqsMessagesReceivedTotal.Get(), 8) // S3 could batch notifications. + assert.EqualValues(t, sqsInput.metrics.sqsMessagesInflight.Get(), 0) + assert.EqualValues(t, sqsInput.metrics.sqsMessagesDeletedTotal.Get(), 7) + assert.EqualValues(t, sqsInput.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. + assert.EqualValues(t, sqsInput.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) + assert.EqualValues(t, sqsInput.metrics.s3ObjectsInflight.Get(), 0) + assert.EqualValues(t, sqsInput.metrics.s3ObjectsRequestedTotal.Get(), 7) + assert.EqualValues(t, sqsInput.metrics.s3EventsCreatedTotal.Get(), 12) + assert.Greater(t, sqsInput.metrics.sqsLagTime.Mean(), 0.0) + assert.EqualValues(t, sqsInput.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end }