Skip to content

Commit

Permalink
Fix PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bhapas committed Jul 25, 2023
1 parent e46df6c commit 26dff85
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 8 deletions.
1 change: 1 addition & 0 deletions x-pack/filebeat/input/awss3/_meta/terraform/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ This directory contains a Terraform module that creates the AWS resources needed
for executing the integration tests for the `aws-s3` Filebeat input. It creates
an S3 bucket and SQS queue and configures S3 `ObjectCreated:*` notifications to
be delivered to SQS. It also creates a second S3 bucket, SNS topic, SQS queue and configures S3 `ObjectCreated:*` notifications to be delivered to SNS and also creates a subscription for this SNS topic to SQS queue to automatically place messages sent to SNS topic in SQS queue.

## Cloud AWS environment

It outputs configuration information that is consumed by the tests to
Expand Down
4 changes: 0 additions & 4 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,10 +418,6 @@ func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) {

t := time.NewTicker(time.Minute)
defer t.Stop()
// Initialize the metric to 0 at the start of the one minute time interval to avoid
// giving misleading metric value -1 even though SQS messages are processed.
// The value will be updated every minute
receiver.metrics.sqsMessagesWaiting.Set(int64(0))
for {
select {
case <-ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestInputRunSQSOnLocalstack(t *testing.T) {
// Ensure SQS is empty before testing.
drainSQS(t, region, queueUrl, awsCfg)

//Upload test files to S3 to generate SQS notification
// Upload test files to S3 to generate SQS notification
s3Client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
o.UsePathStyle = true
})
Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/awss3/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers
s3ObjectsInflight: monitoring.NewUint(reg, "s3_objects_inflight_gauge"),
s3ObjectProcessingTime: metrics.NewUniformSample(1024),
}

// Initializing the sqs_messages_waiting_gauge value to -1 so that we can distinguish between no messages waiting (0) and never collected / error collecting (-1).
out.sqsMessagesWaiting.Set(int64(-1))
adapter.NewGoMetrics(reg, "sqs_message_processing_time", adapter.Accept).
Expand Down
3 changes: 0 additions & 3 deletions x-pack/filebeat/input/awss3/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,6 @@ func (r *sqsReader) Receive(ctx context.Context) error {
// Process each SQS message asynchronously with a goroutine.
r.log.Debugf("Received %v SQS messages.", len(msgs))
r.metrics.sqsMessagesReceivedTotal.Add(uint64(len(msgs)))
// Initialize the sqs_message_waiting_gauge to 0 to indicate that that SQS messages are received.
// PollSqsWaitingMetric shall reassign the value if there are messages waiting
r.metrics.sqsMessagesWaiting.Set(int64(0))
workerWg.Add(len(msgs))

for _, msg := range msgs {
Expand Down

0 comments on commit 26dff85

Please sign in to comment.