Skip to content

Commit

Permalink
Adding aws-s3 metric for approximate messages waiting (#34488)
Browse files Browse the repository at this point in the history
  • Loading branch information
kgeller authored and chrisberkhout committed Jun 1, 2023
1 parent 5eceee8 commit aa5aeb7
Show file tree
Hide file tree
Showing 10 changed files with 302 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add pagination support for Salesforce module. {issue}34057[34057] {pull}34065[34065]
- Allow users to redact sensitive data from CEL input debug logs. {pull}34302[34302]
- Added support for HTTP destination override to Google Cloud Storage input. {pull}34413[34413]
- Added metric `sqs_messages_waiting_gauge` for aws-s3 input. {pull}34488[34488]
- Add support for new Rabbitmq timestamp format for logs {pull}34211[34211]
- Allow user configuration of timezone offset in Cisco ASA and FTD modules. {pull}34436[34436]
- Allow user configuration of timezone offset in Checkpoint module. {pull}34472[34472]
Expand Down
6 changes: 6 additions & 0 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,11 @@ In case `delete_after_backup` is set the following permission is required as wel
s3:DeleteObject
----

In case optional SQS metric `sqs_messages_waiting_gauge` is desired, the following permission is required:
----
sqs:GetQueueAttributes
----

[float]
=== S3 and SQS setup

Expand Down Expand Up @@ -732,6 +737,7 @@ observe the activity of the input.
| `sqs_messages_inflight_gauge` | Number of SQS messages inflight (gauge).
| `sqs_messages_returned_total` | Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes).
| `sqs_messages_deleted_total` | Number of SQS messages deleted.
| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes.
| `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
| `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds.
| `s3_objects_requested_total` | Number of S3 objects downloaded.
Expand Down
36 changes: 35 additions & 1 deletion x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ package awss3

import (
"context"
"errors"
"fmt"
"net/url"
"strings"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/smithy-go"

"github.com/elastic/beats/v7/filebeat/beater"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
Expand All @@ -24,7 +27,10 @@ import (
"github.com/elastic/go-concert/unison"
)

const inputName = "aws-s3"
const (
inputName = "aws-s3"
sqsAccessDeniedErrorCode = "AccessDeniedException"
)

func Plugin(store beater.StateStore) v2.Plugin {
return v2.Plugin{
Expand Down Expand Up @@ -123,6 +129,9 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
}
defer receiver.metrics.Close()

// Poll sqs waiting metric periodically in the background.
go pollSqsWaitingMetric(ctx, receiver)

if err := receiver.Receive(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -376,5 +385,30 @@ func getProviderFromDomain(endpoint string, ProviderOverride string) string {
return "unknown"
}

func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) {
t := time.NewTicker(time.Minute)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
count, err := receiver.GetApproximateMessageCount(ctx)

var apiError smithy.APIError
if errors.As(err, &apiError) {
switch apiError.ErrorCode() {
case sqsAccessDeniedErrorCode:
// stop polling if auth error is encountered
receiver.metrics.setSQSMessagesWaiting(int64(count))
return
}
}

receiver.metrics.setSQSMessagesWaiting(int64(count))
}
}
}

// boolPtr returns a pointer to b.
func boolPtr(b bool) *bool { return &b }
4 changes: 4 additions & 0 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (*constantSQS) ChangeMessageVisibility(ctx context.Context, msg *sqsTypes.M
return nil
}

func (c *constantSQS) GetQueueAttributes(ctx context.Context, attr []sqsTypes.QueueAttributeName) (map[string]string, error) {
return map[string]string{}, nil
}

type s3PagerConstant struct {
mutex *sync.Mutex
objects []s3Types.Object
Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func TestInputRunSQS(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed.
assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0)
assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
Expand Down Expand Up @@ -424,6 +425,7 @@ func TestInputRunSNS(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed.
assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0)
assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
Expand Down
24 changes: 24 additions & 0 deletions x-pack/filebeat/input/awss3/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type sqsAPI interface {
sqsReceiver
sqsDeleter
sqsVisibilityChanger
sqsAttributeGetter
}

type sqsReceiver interface {
Expand All @@ -55,6 +56,10 @@ type sqsVisibilityChanger interface {
ChangeMessageVisibility(ctx context.Context, msg *types.Message, timeout time.Duration) error
}

type sqsAttributeGetter interface {
GetQueueAttributes(ctx context.Context, attr []types.QueueAttributeName) (map[string]string, error)
}

type sqsProcessor interface {
// ProcessSQS processes and SQS message. It takes fully ownership of the
// given message and is responsible for updating the message's visibility
Expand Down Expand Up @@ -197,6 +202,25 @@ func (a *awsSQSAPI) ChangeMessageVisibility(ctx context.Context, msg *types.Mess
return nil
}

func (a *awsSQSAPI) GetQueueAttributes(ctx context.Context, attr []types.QueueAttributeName) (map[string]string, error) {
ctx, cancel := context.WithTimeout(ctx, a.apiTimeout)
defer cancel()

attributeOutput, err := a.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
AttributeNames: attr,
QueueUrl: awssdk.String(a.queueURL),
})

if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
err = fmt.Errorf("api_timeout exceeded: %w", err)
}
return nil, fmt.Errorf("sqs GetQueueAttributes failed: %w", err)
}

return attributeOutput.Attributes, nil
}

// ------
// AWS S3 implementation
// ------
Expand Down
15 changes: 15 additions & 0 deletions x-pack/filebeat/input/awss3/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ import (
)

type inputMetrics struct {
registry *monitoring.Registry
unregister func()

sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully).
sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions.
sqsMessagesInflight *monitoring.Uint // Number of SQS messages inflight (gauge).
sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes).
sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted.
sqsMessagesWaiting *monitoring.Int // Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes.
sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds.

Expand All @@ -40,10 +42,23 @@ func (m *inputMetrics) Close() {
m.unregister()
}

func (m *inputMetrics) setSQSMessagesWaiting(count int64) {
if m.sqsMessagesWaiting == nil {
// if metric not initialized, and count is -1, do nothing
if count == -1 {
return
}
m.sqsMessagesWaiting = monitoring.NewInt(m.registry, "sqs_messages_waiting_gauge")
}

m.sqsMessagesWaiting.Set(count)
}

func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetrics {
reg, unreg := inputmon.NewInputRegistry(inputName, id, optionalParent)

out := &inputMetrics{
registry: reg,
unregister: unreg,
sqsMessagesReceivedTotal: monitoring.NewUint(reg, "sqs_messages_received_total"),
sqsVisibilityTimeoutExtensionsTotal: monitoring.NewUint(reg, "sqs_visibility_timeout_extensions_total"),
Expand Down
Loading

0 comments on commit aa5aeb7

Please sign in to comment.