diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index da0ffd929e6..040b06ca5f1 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -212,6 +212,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Add pagination support for Salesforce module. {issue}34057[34057] {pull}34065[34065] - Allow users to redact sensitive data from CEL input debug logs. {pull}34302[34302] - Added support for HTTP destination override to Google Cloud Storage input. {pull}34413[34413] +- Added metric `sqs_messages_waiting_gauge` for aws-s3 input. {pull}34488[34488] - Add support for new Rabbitmq timestamp format for logs {pull}34211[34211] - Allow user configuration of timezone offset in Cisco ASA and FTD modules. {pull}34436[34436] - Allow user configuration of timezone offset in Checkpoint module. {pull}34472[34472] diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index bc165ccba29..40b412fab61 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -497,6 +497,11 @@ In case `delete_after_backup` is set the following permission is required as wel s3:DeleteObject ---- +In case optional SQS metric `sqs_messages_waiting_gauge` is desired, the following permission is required: +---- +sqs:GetQueueAttributes +---- + [float] === S3 and SQS setup @@ -732,6 +737,7 @@ observe the activity of the input. | `sqs_messages_inflight_gauge` | Number of SQS messages inflight (gauge). | `sqs_messages_returned_total` | Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). | `sqs_messages_deleted_total` | Number of SQS messages deleted. +| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. | `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). | `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. | `s3_objects_requested_total` | Number of S3 objects downloaded. diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 4178661c42b..669cf7f53e6 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -6,13 +6,16 @@ package awss3 import ( "context" + "errors" "fmt" "net/url" "strings" + "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/smithy-go" "github.com/elastic/beats/v7/filebeat/beater" v2 "github.com/elastic/beats/v7/filebeat/input/v2" @@ -24,7 +27,10 @@ import ( "github.com/elastic/go-concert/unison" ) -const inputName = "aws-s3" +const ( + inputName = "aws-s3" + sqsAccessDeniedErrorCode = "AccessDeniedException" +) func Plugin(store beater.StateStore) v2.Plugin { return v2.Plugin{ @@ -123,6 +129,9 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { } defer receiver.metrics.Close() + // Poll sqs waiting metric periodically in the background. + go pollSqsWaitingMetric(ctx, receiver) + if err := receiver.Receive(ctx); err != nil { return err } @@ -376,5 +385,30 @@ func getProviderFromDomain(endpoint string, ProviderOverride string) string { return "unknown" } +func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { + t := time.NewTicker(time.Minute) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + count, err := receiver.GetApproximateMessageCount(ctx) + + var apiError smithy.APIError + if errors.As(err, &apiError) { + switch apiError.ErrorCode() { + case sqsAccessDeniedErrorCode: + // stop polling if auth error is encountered + receiver.metrics.setSQSMessagesWaiting(int64(count)) + return + } + } + + receiver.metrics.setSQSMessagesWaiting(int64(count)) + } + } +} + // boolPtr returns a pointer to b. func boolPtr(b bool) *bool { return &b } diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index ee113c4c303..2199d2b4980 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -67,6 +67,10 @@ func (*constantSQS) ChangeMessageVisibility(ctx context.Context, msg *sqsTypes.M return nil } +func (c *constantSQS) GetQueueAttributes(ctx context.Context, attr []sqsTypes.QueueAttributeName) (map[string]string, error) { + return map[string]string{}, nil +} + type s3PagerConstant struct { mutex *sync.Mutex objects []s3Types.Object diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index d950e87ff69..29d43aca8e7 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -187,6 +187,7 @@ func TestInputRunSQS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) + assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) @@ -424,6 +425,7 @@ func TestInputRunSNS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) + assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) diff --git a/x-pack/filebeat/input/awss3/interfaces.go b/x-pack/filebeat/input/awss3/interfaces.go index 64e522168b6..1f1390c4f2f 100644 --- a/x-pack/filebeat/input/awss3/interfaces.go +++ b/x-pack/filebeat/input/awss3/interfaces.go @@ -41,6 +41,7 @@ type sqsAPI interface { sqsReceiver sqsDeleter sqsVisibilityChanger + sqsAttributeGetter } type sqsReceiver interface { @@ -55,6 +56,10 @@ type sqsVisibilityChanger interface { ChangeMessageVisibility(ctx context.Context, msg *types.Message, timeout time.Duration) error } +type sqsAttributeGetter interface { + GetQueueAttributes(ctx context.Context, attr []types.QueueAttributeName) (map[string]string, error) +} + type sqsProcessor interface { // ProcessSQS processes and SQS message. It takes fully ownership of the // given message and is responsible for updating the message's visibility @@ -197,6 +202,25 @@ func (a *awsSQSAPI) ChangeMessageVisibility(ctx context.Context, msg *types.Mess return nil } +func (a *awsSQSAPI) GetQueueAttributes(ctx context.Context, attr []types.QueueAttributeName) (map[string]string, error) { + ctx, cancel := context.WithTimeout(ctx, a.apiTimeout) + defer cancel() + + attributeOutput, err := a.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ + AttributeNames: attr, + QueueUrl: awssdk.String(a.queueURL), + }) + + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + err = fmt.Errorf("api_timeout exceeded: %w", err) + } + return nil, fmt.Errorf("sqs GetQueueAttributes failed: %w", err) + } + + return attributeOutput.Attributes, nil +} + // ------ // AWS S3 implementation // ------ diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index a1e9f4d4423..3dcce8eee14 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -15,6 +15,7 @@ import ( ) type inputMetrics struct { + registry *monitoring.Registry unregister func() sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully). @@ -22,6 +23,7 @@ type inputMetrics struct { sqsMessagesInflight *monitoring.Uint // Number of SQS messages inflight (gauge). sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted. + sqsMessagesWaiting *monitoring.Int // Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. @@ -40,10 +42,23 @@ func (m *inputMetrics) Close() { m.unregister() } +func (m *inputMetrics) setSQSMessagesWaiting(count int64) { + if m.sqsMessagesWaiting == nil { + // if metric not initialized, and count is -1, do nothing + if count == -1 { + return + } + m.sqsMessagesWaiting = monitoring.NewInt(m.registry, "sqs_messages_waiting_gauge") + } + + m.sqsMessagesWaiting.Set(count) +} + func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetrics { reg, unreg := inputmon.NewInputRegistry(inputName, id, optionalParent) out := &inputMetrics{ + registry: reg, unregister: unreg, sqsMessagesReceivedTotal: monitoring.NewUint(reg, "sqs_messages_received_total"), sqsVisibilityTimeoutExtensionsTotal: monitoring.NewUint(reg, "sqs_visibility_timeout_extensions_total"), diff --git a/x-pack/filebeat/input/awss3/mock_interfaces_test.go b/x-pack/filebeat/input/awss3/mock_interfaces_test.go index 39889de990c..b976cf00ebc 100644 --- a/x-pack/filebeat/input/awss3/mock_interfaces_test.go +++ b/x-pack/filebeat/input/awss3/mock_interfaces_test.go @@ -73,6 +73,21 @@ func (mr *MockSQSAPIMockRecorder) DeleteMessage(ctx, msg interface{}) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteMessage", reflect.TypeOf((*MockSQSAPI)(nil).DeleteMessage), ctx, msg) } +// GetQueueAttributes mocks base method. +func (m *MockSQSAPI) GetQueueAttributes(ctx context.Context, attr []types.QueueAttributeName) (map[string]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetQueueAttributes", ctx, attr) + ret0, _ := ret[0].(map[string]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetQueueAttributes indicates an expected call of GetQueueAttributes. +func (mr *MockSQSAPIMockRecorder) GetQueueAttributes(ctx, attr interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetQueueAttributes", reflect.TypeOf((*MockSQSAPI)(nil).GetQueueAttributes), ctx, attr) +} + // ReceiveMessage mocks base method. func (m *MockSQSAPI) ReceiveMessage(ctx context.Context, maxMessages int) ([]types.Message, error) { m.ctrl.T.Helper() @@ -200,6 +215,44 @@ func (mr *MocksqsVisibilityChangerMockRecorder) ChangeMessageVisibility(ctx, msg return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChangeMessageVisibility", reflect.TypeOf((*MocksqsVisibilityChanger)(nil).ChangeMessageVisibility), ctx, msg, timeout) } +// MocksqsAttributeGetter is a mock of sqsAttributeGetter interface. +type MocksqsAttributeGetter struct { + ctrl *gomock.Controller + recorder *MocksqsAttributeGetterMockRecorder +} + +// MocksqsAttributeGetterMockRecorder is the mock recorder for MocksqsAttributeGetter. +type MocksqsAttributeGetterMockRecorder struct { + mock *MocksqsAttributeGetter +} + +// NewMocksqsAttributeGetter creates a new mock instance. +func NewMocksqsAttributeGetter(ctrl *gomock.Controller) *MocksqsAttributeGetter { + mock := &MocksqsAttributeGetter{ctrl: ctrl} + mock.recorder = &MocksqsAttributeGetterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MocksqsAttributeGetter) EXPECT() *MocksqsAttributeGetterMockRecorder { + return m.recorder +} + +// GetQueueAttributes mocks base method. +func (m *MocksqsAttributeGetter) GetQueueAttributes(ctx context.Context, attr []types.QueueAttributeName) (map[string]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetQueueAttributes", ctx, attr) + ret0, _ := ret[0].(map[string]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetQueueAttributes indicates an expected call of GetQueueAttributes. +func (mr *MocksqsAttributeGetterMockRecorder) GetQueueAttributes(ctx, attr interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetQueueAttributes", reflect.TypeOf((*MocksqsAttributeGetter)(nil).GetQueueAttributes), ctx, attr) +} + // MockSQSProcessor is a mock of sqsProcessor interface. type MockSQSProcessor struct { ctrl *gomock.Controller @@ -260,21 +313,7 @@ func (m *MockS3API) EXPECT() *MockS3APIMockRecorder { return m.recorder } -// GetObject mocks base method. -func (m *MockS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetObject", ctx, bucket, key) - ret0, _ := ret[0].(*s3.GetObjectOutput) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetObject indicates an expected call of GetObject. -func (mr *MockS3APIMockRecorder) GetObject(ctx, bucket, key interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObject", reflect.TypeOf((*MockS3API)(nil).GetObject), ctx, bucket, key) -} - +// CopyObject mocks base method. func (m *MockS3API) CopyObject(ctx context.Context, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CopyObject", ctx, from_bucket, to_bucket, from_key, to_key) @@ -283,11 +322,13 @@ func (m *MockS3API) CopyObject(ctx context.Context, from_bucket, to_bucket, from return ret0, ret1 } +// CopyObject indicates an expected call of CopyObject. func (mr *MockS3APIMockRecorder) CopyObject(ctx, from_bucket, to_bucket, from_key, to_key interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CopyObject", reflect.TypeOf((*MockS3API)(nil).GetObject), ctx, from_bucket, to_bucket, from_key, to_key) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CopyObject", reflect.TypeOf((*MockS3API)(nil).CopyObject), ctx, from_bucket, to_bucket, from_key, to_key) } +// DeleteObject mocks base method. func (m *MockS3API) DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteObject", ctx, bucket, key) @@ -296,9 +337,25 @@ func (m *MockS3API) DeleteObject(ctx context.Context, bucket, key string) (*s3.D return ret0, ret1 } +// DeleteObject indicates an expected call of DeleteObject. func (mr *MockS3APIMockRecorder) DeleteObject(ctx, bucket, key interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockS3API)(nil).GetObject), ctx, bucket, key) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockS3API)(nil).DeleteObject), ctx, bucket, key) +} + +// GetObject mocks base method. +func (m *MockS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetObject", ctx, bucket, key) + ret0, _ := ret[0].(*s3.GetObjectOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetObject indicates an expected call of GetObject. +func (mr *MockS3APIMockRecorder) GetObject(ctx, bucket, key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObject", reflect.TypeOf((*MockS3API)(nil).GetObject), ctx, bucket, key) } // ListObjectsPaginator mocks base method. @@ -353,6 +410,59 @@ func (mr *Mocks3GetterMockRecorder) GetObject(ctx, bucket, key interface{}) *gom return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObject", reflect.TypeOf((*Mocks3Getter)(nil).GetObject), ctx, bucket, key) } +// Mocks3Mover is a mock of s3Mover interface. +type Mocks3Mover struct { + ctrl *gomock.Controller + recorder *Mocks3MoverMockRecorder +} + +// Mocks3MoverMockRecorder is the mock recorder for Mocks3Mover. +type Mocks3MoverMockRecorder struct { + mock *Mocks3Mover +} + +// NewMocks3Mover creates a new mock instance. +func NewMocks3Mover(ctrl *gomock.Controller) *Mocks3Mover { + mock := &Mocks3Mover{ctrl: ctrl} + mock.recorder = &Mocks3MoverMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *Mocks3Mover) EXPECT() *Mocks3MoverMockRecorder { + return m.recorder +} + +// CopyObject mocks base method. +func (m *Mocks3Mover) CopyObject(ctx context.Context, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CopyObject", ctx, from_bucket, to_bucket, from_key, to_key) + ret0, _ := ret[0].(*s3.CopyObjectOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CopyObject indicates an expected call of CopyObject. +func (mr *Mocks3MoverMockRecorder) CopyObject(ctx, from_bucket, to_bucket, from_key, to_key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CopyObject", reflect.TypeOf((*Mocks3Mover)(nil).CopyObject), ctx, from_bucket, to_bucket, from_key, to_key) +} + +// DeleteObject mocks base method. +func (m *Mocks3Mover) DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteObject", ctx, bucket, key) + ret0, _ := ret[0].(*s3.DeleteObjectOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DeleteObject indicates an expected call of DeleteObject. +func (mr *Mocks3MoverMockRecorder) DeleteObject(ctx, bucket, key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*Mocks3Mover)(nil).DeleteObject), ctx, bucket, key) +} + // Mocks3Lister is a mock of s3Lister interface. type Mocks3Lister struct { ctrl *gomock.Controller @@ -507,32 +617,32 @@ func (m *MockS3ObjectHandler) EXPECT() *MockS3ObjectHandlerMockRecorder { return m.recorder } -// ProcessS3Object mocks base method. -func (m *MockS3ObjectHandler) ProcessS3Object() error { +// FinalizeS3Object mocks base method. +func (m *MockS3ObjectHandler) FinalizeS3Object() error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ProcessS3Object") + ret := m.ctrl.Call(m, "FinalizeS3Object") ret0, _ := ret[0].(error) return ret0 } -// ProcessS3Object indicates an expected call of ProcessS3Object. -func (mr *MockS3ObjectHandlerMockRecorder) ProcessS3Object() *gomock.Call { +// FinalizeS3Object indicates an expected call of FinalizeS3Object. +func (mr *MockS3ObjectHandlerMockRecorder) FinalizeS3Object() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessS3Object", reflect.TypeOf((*MockS3ObjectHandler)(nil).ProcessS3Object)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FinalizeS3Object", reflect.TypeOf((*MockS3ObjectHandler)(nil).FinalizeS3Object)) } // ProcessS3Object mocks base method. -func (m *MockS3ObjectHandler) FinalizeS3Object() error { +func (m *MockS3ObjectHandler) ProcessS3Object() error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FinalizeS3Object") + ret := m.ctrl.Call(m, "ProcessS3Object") ret0, _ := ret[0].(error) return ret0 } // ProcessS3Object indicates an expected call of ProcessS3Object. -func (mr *MockS3ObjectHandlerMockRecorder) FinalizeS3Object() *gomock.Call { +func (mr *MockS3ObjectHandlerMockRecorder) ProcessS3Object() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FinalizeS3Object", reflect.TypeOf((*MockS3ObjectHandler)(nil).FinalizeS3Object)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessS3Object", reflect.TypeOf((*MockS3ObjectHandler)(nil).ProcessS3Object)) } // Wait mocks base method. diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index f5f175d4f6d..1a62d0a4976 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -7,6 +7,7 @@ package awss3 import ( "context" "errors" + "strconv" "sync" "time" @@ -19,7 +20,8 @@ import ( ) const ( - sqsRetryDelay = 10 * time.Second + sqsRetryDelay = 10 * time.Second + sqsApproximateNumberOfMessages = "ApproximateNumberOfMessages" ) type sqsReader struct { @@ -107,3 +109,15 @@ func (r *sqsReader) Receive(ctx context.Context) error { } return ctx.Err() } + +func (r *sqsReader) GetApproximateMessageCount(ctx context.Context) (int, error) { + attributes, err := r.sqs.GetQueueAttributes(ctx, []types.QueueAttributeName{sqsApproximateNumberOfMessages}) + if err == nil { + if c, found := attributes[sqsApproximateNumberOfMessages]; found { + if messagesCount, err := strconv.Atoi(c); err == nil { + return messagesCount, nil + } + } + } + return -1, err +} diff --git a/x-pack/filebeat/input/awss3/sqs_test.go b/x-pack/filebeat/input/awss3/sqs_test.go index d798f6e782e..fe0c731da06 100644 --- a/x-pack/filebeat/input/awss3/sqs_test.go +++ b/x-pack/filebeat/input/awss3/sqs_test.go @@ -23,7 +23,10 @@ import ( const testTimeout = 10 * time.Second -var errFakeConnectivityFailure = errors.New("fake connectivity failure") +var ( + errFakeConnectivityFailure = errors.New("fake connectivity failure") + errFakeGetAttributeFailute = errors.New("something went wrong") +) func TestSQSReceiver(t *testing.T) { err := logp.TestingSetup() @@ -109,6 +112,65 @@ func TestSQSReceiver(t *testing.T) { }) } +func TestGetApproximateMessageCount(t *testing.T) { + err := logp.TestingSetup() + assert.Nil(t, err) + + const maxMessages = 5 + const count = 500 + attrName := []types.QueueAttributeName{sqsApproximateNumberOfMessages} + attr := map[string]string{"ApproximateNumberOfMessages": "500"} + + t.Run("GetApproximateMessageCount success", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctrl, ctx := gomock.WithContext(ctx, t) + defer ctrl.Finish() + mockAPI := NewMockSQSAPI(ctrl) + mockMsgHandler := NewMockSQSProcessor(ctrl) + + gomock.InOrder( + mockAPI.EXPECT(). + GetQueueAttributes(gomock.Any(), gomock.Eq(attrName)). + Times(1). + DoAndReturn(func(_ context.Context, _ []types.QueueAttributeName) (map[string]string, error) { + return attr, nil + }), + ) + + receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler) + receivedCount, err := receiver.GetApproximateMessageCount(ctx) + assert.Equal(t, count, receivedCount) + assert.Nil(t, err) + }) + + t.Run("GetApproximateMessageCount error", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctrl, ctx := gomock.WithContext(ctx, t) + defer ctrl.Finish() + + mockAPI := NewMockSQSAPI(ctrl) + mockMsgHandler := NewMockSQSProcessor(ctrl) + + gomock.InOrder( + mockAPI.EXPECT(). + GetQueueAttributes(gomock.Any(), gomock.Eq(attrName)). + Times(1). + DoAndReturn(func(_ context.Context, _ []types.QueueAttributeName) (map[string]string, error) { + return nil, errFakeGetAttributeFailute + }), + ) + + receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler) + receivedCount, err := receiver.GetApproximateMessageCount(ctx) + assert.Equal(t, -1, receivedCount) + assert.NotNil(t, err) + }) +} + func newSQSMessage(events ...s3EventV2) types.Message { body, err := json.Marshal(s3EventsV2{Records: events}) if err != nil {