From 8e77ca7f8be9d101c7149d041b8cc1714c002bb0 Mon Sep 17 00:00:00 2001
From: Andrew Kroh
Date: Mon, 17 May 2021 19:19:02 -0400
Subject: [PATCH] [Filebeat] Instrument aws-s3 with metrics (#25711)

* Instrument aws-s3 with metrics

Diagnosing performance issues with the aws-s3 input is difficult, so this
instruments the input with metrics to make it easier. These are the
metrics that are added:

- Number of SQS messages received (not necessarily fully processed).
- Number of SQS visibility timeout extensions.
- Number of SQS messages inflight (gauge).
- Number of SQS messages returned to the queue (happens implicitly on
  errors after the visibility timeout passes).
- Number of SQS messages deleted.
- Histogram of the elapsed SQS processing times in nanoseconds (time of
  receipt to time of delete/return).
- Number of S3 objects downloaded.
- Number of S3 bytes processed.
- Number of events created from processing S3 data.
- Number of S3 objects inflight (gauge).
- Histogram of the elapsed S3 object processing times in nanoseconds
  (start of download to completion of parsing).

The metrics are structured as:

  dataset.<id>:
    id=<id>
    input=aws-s3
    sqs_messages_received_total
    sqs_visibility_timeout_extensions_total
    sqs_messages_inflight_gauge
    sqs_messages_returned_total
    sqs_messages_deleted_total
    sqs_message_processing_time.histogram
    s3_objects_requested_total
    s3_bytes_processed_total
    s3_events_created_total
    s3_objects_inflight_gauge
    s3_object_processing_time.histogram

The v2 input logger was updated to include the input ID so that log lines
can be correlated with metrics when an explicit `id` is not set in the
input config.
---
 CHANGELOG.next.asciidoc                  |   1 +
 filebeat/input/v2/compat/compat.go       |   2 +-
 .../docs/inputs/input-aws-s3.asciidoc    | 158 +++++++++++-------
 x-pack/filebeat/input/awss3/collector.go |  51 +++++-
 x-pack/filebeat/input/awss3/input.go     |   4 +
 x-pack/filebeat/input/awss3/metrics.go   |  79 +++++++++
 6 files changed, 220 insertions(+), 75 deletions(-)
 create mode 100644 x-pack/filebeat/input/awss3/metrics.go

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 9fc182da2ca..abfcc1a4ef4 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -589,6 +589,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Update PanOS module to parse Global Protect & User ID logs. {issue}24722[24722] {issue}24724[24724] {pull}24927[24927]
 - Add HMAC signature validation support for http_endpoint input. {pull}24918[24918]
 - Add multiline support to aws-s3 input. {issue}25249[25249] {pull}25710[25710]
+- Add monitoring metrics to the `aws-s3` input. {pull}25711[25711]
 
 *Heartbeat*
 
diff --git a/filebeat/input/v2/compat/compat.go b/filebeat/input/v2/compat/compat.go
index ad2d2a16e8f..012fb3d9a2b 100644
--- a/filebeat/input/v2/compat/compat.go
+++ b/filebeat/input/v2/compat/compat.go
@@ -93,7 +93,7 @@ func (f *factory) Create(
 
 	return &runner{
 		id:    id,
-		log:   f.log.Named(input.Name()),
+		log:   f.log.Named(input.Name()).With("id", id),
 		agent: &f.info,
 		sig:   ctxtool.WithCancelContext(context.Background()),
 		input: input,
diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
index 894a0aff926..da27e908fbd 100644
--- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
+++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -11,17 +11,17 @@
 AWS S3
 ++++
 
-Use the `aws-s3` input to retrieve logs from S3 objects that are pointed by messages
-from specific SQS queues. This input can, for example, be used to receive S3
-server access logs to monitor detailed records for the requests that are made to
-a bucket.
+Use the `aws-s3` input to retrieve logs from S3 objects that are pointed to by
+messages from specific SQS queues. This input can, for example, be used to
+receive S3 server access logs to monitor detailed records for the requests that
+are made to a bucket.
 
-When processing a s3 object which pointed by a sqs message, if half of the set
+When processing an S3 object pointed to by an SQS message, if half of the set
 visibility timeout passed and the processing is still ongoing, then the
-visibility timeout of that sqs message will be reset to make sure the message
+visibility timeout of that SQS message will be reset to make sure the message
 does not go back to the queue in the middle of the processing. If there are
-errors happening during the processing of the s3 object, then the process will be
-stopped and the sqs message will be returned back to the queue.
+errors happening during the processing of the S3 object, then the process will
+be stopped and the SQS message will be returned to the queue.
 
 ["source","yaml",subs="attributes"]
 ----
@@ -39,9 +39,9 @@ The `aws-s3` input supports the following configuration options plus the
 ==== `api_timeout`
 
 The maximum duration of the AWS API call. If it exceeds the timeout, the AWS API
-call will be interrupted.
-The default AWS API call timeout for a message is 120 seconds. The minimum
-is 0 seconds. The maximum is half of the visibility timeout value.
+call will be interrupted. The default AWS API call timeout for a message is 120
+seconds. The minimum is 0 seconds. The maximum is half of the visibility timeout
+value.
 
 [id="input-{type}-buffer_size"]
 [float]
 ==== `buffer_size`
 characters. This only applies to non-JSON logs. See <<_encoding_5>>.
 
 ==== `expand_event_list_from_field`
 
 If the fileset using this input expects to receive multiple messages bundled
-under a specific field then the config option expand_event_list_from_field value
-can be assigned the name of the field. This setting will be able to split the
-messages under the group value into separate events. For example, CloudTrail logs
-are in JSON format and events are found under the JSON object "Records".
+under a specific field, then the config option `expand_event_list_from_field`
+can be assigned the name of that field. This setting splits the messages under
+the group value into separate events. For example, CloudTrail logs are in JSON
+format and events are found under the JSON object "Records".
 
 ["source","json"]
 ----
 {
    "Records": [
       {
 ...
       }
    ]
 }
 ----
 
-Note: When `expand_event_list_from_field` parameter is given in the config, aws-s3
-input will assume the logs are in JSON format and decode them as JSON. Content
-type will not be checked.
-If a file has "application/json" content-type, `expand_event_list_from_field`
-becomes required to read the json file.
+Note: When the `expand_event_list_from_field` parameter is given in the config,
+the aws-s3 input assumes the logs are in JSON format and decodes them as JSON.
+The content type is not checked. If a file has the "application/json" content
+type, `expand_event_list_from_field` becomes required to read the JSON file.
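+
+For example, a configuration like the following (the queue URL is
+illustrative) splits CloudTrail events bundled under `Records` into separate
+events:
+
+["source","yaml",subs="attributes"]
+----
+{beatname_lc}.inputs:
+- type: aws-s3
+  # Illustrative queue URL; substitute your own queue.
+  queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/cloudtrail-queue
+  expand_event_list_from_field: Records
+----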
 
 [float]
 ==== `file_selectors`
 
-If the SQS queue will have events that correspond to files that
-{beatname_uc} shouldn't process `file_selectors` can be used to limit
-the files that are downloaded. This is a list of selectors which are
-made up of `regex` and `expand_event_list_from_field` options. The
-`regex` should match the S3 object key in the SQS message, and the
-optional `expand_event_list_from_field` is the same as the global
-setting. If `file_selectors` is given, then any global
-`expand_event_list_from_field` value is ignored in favor of the ones
-specified in the `file_selectors`. Regex syntax is the same as the Go
-language. Files that don't match one of the regexes won't be
-processed. <>, <>,
-<> and <> may also be
-set for each file selector.
+If the SQS queue has events that correspond to files that {beatname_uc}
+shouldn't process, `file_selectors` can be used to limit the files that are
+downloaded. This is a list of selectors, each made up of `regex` and
+`expand_event_list_from_field` options. The `regex` should match the S3 object
+key in the SQS message, and the optional `expand_event_list_from_field` is the
+same as the global setting. If `file_selectors` is given, then any global
+`expand_event_list_from_field` value is ignored in favor of the ones specified
+in the `file_selectors`. Regex syntax is the same as in the Go language. Files
+that don't match one of the regexes won't be processed.
+<>, <>,
+<>, and <> may also be set for
+each file selector.
 
 ["source", "yml"]
 ----
 file_selectors:
-  - regex: '^AWSLogs/\d+/CloudTrail/'
+  - regex: '/CloudTrail/'
+    expand_event_list_from_field: 'Records'
+  - regex: '/CloudTrail-Digest/'
+  - regex: '/CloudTrail-Insight/'
     expand_event_list_from_field: 'Records'
-  - regex: '^AWSLogs/\d+/CloudTrail-Digest'
 ----
 
 [float]
 ==== `fips_enabled`
 
-Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. For example: `s3-fips.us-gov-east-1.amazonaws.com`.
+Enabling this option changes the service name from `s3` to `s3-fips` for
+connecting to the correct service endpoint. For example:
+`s3-fips.us-gov-east-1.amazonaws.com`.
 
 [id="input-{type}-max_bytes"]
 [float]
 ==== `max_bytes`
 
-The maximum number of bytes that a single log message can have. All
-bytes after `max_bytes` are discarded and not sent. This setting is
-especially useful for multiline log messages, which can get
-large. This only applies to non-JSON logs. The default is 10MB
-(10485760).
+The maximum number of bytes that a single log message can have. All bytes after
+`max_bytes` are discarded and not sent. This setting is especially useful for
+multiline log messages, which can get large. This only applies to non-JSON logs.
+The default is 10MB (10485760).
 
 [float]
 ==== `max_number_of_messages`
+
 The maximum number of messages to return. Amazon SQS never returns more messages
-than this value (however, fewer messages might be returned).
-Valid values: 1 to 10. Default: 5.
+than this value (however, fewer messages might be returned). Valid values: 1 to
+10. Default: 5.
 
 [id="input-{type}-multiline"]
 [float]
@@ -146,10 +148,9 @@
 ==== `multiline`
 
 beta[]
 
-Options that control how {beatname_uc} deals with log messages that
-span multiple lines. This only applies to non-JSON logs. See
-<> for more information about configuring
-multiline options.
+Options that control how {beatname_uc} deals with log messages that span
+multiple lines. This only applies to non-JSON logs. See <>
+for more information about configuring multiline options.
 
 [float]
 ==== `queue_url`
@@ -159,22 +160,24 @@
 
 URL of the AWS SQS queue that messages will be received from. Required.
 
 [float]
 ==== `visibility_timeout`
 
-The duration that the received messages are hidden from subsequent
-retrieve requests after being retrieved by a ReceiveMessage request.
-This value needs to be a lot bigger than {beatname_uc} collection frequency so
-if it took too long to read the s3 log, this sqs message will not be reprocessed.
-The default visibility timeout for a message is 300 seconds. The minimum
-is 0 seconds. The maximum is 12 hours.
+The duration that the received messages are hidden from subsequent retrieve
+requests after being retrieved by a ReceiveMessage request. This value needs to
+be much longer than the time it takes {beatname_uc} to read and process the S3
+log, so that the SQS message is not reprocessed while processing is still in
+progress. The default visibility timeout for a message is 300 seconds. The
+minimum is 0 seconds. The maximum is 12 hours.
 
 [float]
 ==== `aws credentials`
 
-In order to make AWS API calls, `aws-s3` input requires AWS credentials.Please see
- <> for more details.
+In order to make AWS API calls, the `aws-s3` input requires AWS credentials.
+Please see <> for more details.
 
 [float]
 === AWS Permissions
+
 Specific AWS permissions are required for IAM user to access SQS and S3:
+
 ----
 s3:GetObject
 sqs:ReceiveMessage
@@ -184,6 +187,7 @@
 sqs:ChangeMessageVisibility
 sqs:DeleteMessage
 ----
 
 [float]
 === S3 and SQS setup
+
 Enable bucket notification: any new object creation in S3 bucket will also
 create a notification through SQS. Please see
 https://docs.aws.amazon.com/AmazonS3/latest/dev/ways-to-add-notification-config-to-bucket.html#step1-create-sqs-queue-for-notification[create-sqs-queue-for-notification]
 for more details.
 
@@ -191,25 +195,49 @@
 [float]
 === Parallel Processing
+
 Multiple Filebeat instances can read from the same SQS queues at the same time.
 To horizontally scale processing when there are large amounts of log data
 flowing into an S3 bucket, you can run multiple {beatname_uc} instances that
 read from the same SQS queues at the same time. No additional configuration is
 required.
 
-Using SQS ensures that each message in the queue is processed only once
-even when multiple {beatname_uc} instances are running in parallel. To prevent
+Using SQS ensures that each message in the queue is processed only once even
+when multiple {beatname_uc} instances are running in parallel. To prevent
 {beatname_uc} from receiving and processing the message more than once, set the
 visibility timeout.
 
-The visibility timeout begins when SQS returns a message to Filebeat.
-During this time, Filebeat processes and deletes the message. However, if
-Filebeat fails before deleting the message and your system doesn't call the
-DeleteMessage action for that message before the visibility timeout expires, the
-message becomes visible to other {beatname_uc} instances, and the message is
-received again. By default, the visibility timeout is set to 5 minutes for aws-s3
-input in {beatname_uc}. 5 minutes is sufficient time for {beatname_uc} to read
-SQS messages and process related s3 log files.
+The visibility timeout begins when SQS returns a message to Filebeat. During
+this time, Filebeat processes and deletes the message. However, if Filebeat
+fails before deleting the message and your system doesn't call the DeleteMessage
+action for that message before the visibility timeout expires, the message
+becomes visible to other {beatname_uc} instances, and the message is received
+again. By default, the visibility timeout is set to 5 minutes for the aws-s3
+input in {beatname_uc}. Five minutes is sufficient time for {beatname_uc} to
+read SQS messages and process the related S3 log files.
+
+[float]
+=== Metrics
+
+This input exposes metrics under the <>.
+These metrics are exposed under the `/dataset` path. They can be used to
+observe the activity of the input.
+
+[options="header"]
+|=======
+| Metric | Description
+| `sqs_messages_received_total` | Number of SQS messages received (not necessarily fully processed).
+| `sqs_visibility_timeout_extensions_total` | Number of SQS visibility timeout extensions.
+| `sqs_messages_inflight_gauge` | Number of SQS messages inflight (gauge).
+| `sqs_messages_returned_total` | Number of SQS messages returned to the queue (happens implicitly on errors after the visibility timeout passes).
+| `sqs_messages_deleted_total` | Number of SQS messages deleted.
+| `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
+| `s3_objects_requested_total` | Number of S3 objects downloaded.
+| `s3_bytes_processed_total` | Number of S3 bytes processed.
+| `s3_events_created_total` | Number of events created from processing S3 data.
+| `s3_objects_inflight_gauge` | Number of S3 objects inflight (gauge).
+| `s3_object_processing_time` | Histogram of the elapsed S3 object processing times in nanoseconds (start of download to completion of parsing).
+|=======
 
 [id="{beatname_lc}-input-{type}-common-options"]
 include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]
diff --git a/x-pack/filebeat/input/awss3/collector.go b/x-pack/filebeat/input/awss3/collector.go
index 3735467385c..28edf97f7f3 100644
--- a/x-pack/filebeat/input/awss3/collector.go
+++ b/x-pack/filebeat/input/awss3/collector.go
@@ -40,6 +40,7 @@ import (
 type s3Collector struct {
 	cancellation context.Context
 	logger       *logp.Logger
+	metrics      *inputMetrics
 
 	config            *config
 	visibilityTimeout int64
@@ -89,7 +90,7 @@ type s3Context struct {
 	mux  sync.Mutex
 	refs int
 	err  error // first error witnessed or multi error
-	errC chan error
+	errC chan<- error
 }
 
 // The duration (in seconds) for which the call waits for a message to arrive
@@ -116,6 +117,7 @@ func (c *s3Collector) run() {
 			c.logger.Debug("no message received from SQS")
 			continue
 		}
+		c.metrics.sqsMessagesReceivedTotal.Add(uint64(len(output.Messages)))
 
 		// process messages received from sqs, get logs from s3 and create events
 		c.processor(c.config.QueueURL, output.Messages, c.visibilityTimeout, c.s3, c.sqs)
@@ -125,23 +127,30 @@
 func (c *s3Collector) processor(queueURL string, messages []sqs.Message, visibilityTimeout int64, svcS3 s3iface.ClientAPI, svcSQS sqsiface.ClientAPI) {
 	var grp unison.MultiErrGroup
 	numMessages := len(messages)
-	c.logger.Debugf("Processing %v messages", numMessages)
+	c.logger.Debugf("Processing %v SQS messages", numMessages)
+	c.metrics.sqsMessagesInflight.Add(uint64(len(messages)))
 
 	// process messages received from sqs
 	for i := range messages {
 		i := i
 		errC := make(chan error)
+		start := time.Now()
 		grp.Go(func() (err error) {
 			return c.processMessage(svcS3, messages[i], errC)
 		})
 		grp.Go(func() (err error) {
+			defer func() {
+				c.metrics.sqsMessagesInflight.Dec()
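+				// Record the elapsed time from when the SQS message was
+				// received until it was deleted or returned to the queue.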
+				c.metrics.sqsMessageProcessingTime.Update(time.Since(start).Nanoseconds())
+			}()
 			return c.processorKeepAlive(svcSQS, messages[i], queueURL, visibilityTimeout, errC)
 		})
 	}
 	grp.Wait()
 }
 
-func (c *s3Collector) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, errC chan error) error {
+func (c *s3Collector) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, errC chan<- error) error {
 	s3Infos, err := c.handleSQSMessage(message)
 	if err != nil {
 		c.logger.Error(fmt.Errorf("handleSQSMessage failed: %w", err))
@@ -159,7 +167,17 @@
 	return nil
 }
 
-func (c *s3Collector) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Message, queueURL string, visibilityTimeout int64, errC chan error) error {
+func (c *s3Collector) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Message, queueURL string, visibilityTimeout int64, errC <-chan error) error {
+	var deletedSQSMessage bool
+	defer func() {
+		if deletedSQSMessage {
+			c.metrics.sqsMessagesDeletedTotal.Inc()
+		} else {
+			// This happens implicitly after the visibility timeout expires.
+			c.metrics.sqsMessagesReturnedTotal.Inc()
+		}
+	}()
+
 	for {
 		select {
 		case <-c.cancellation.Done():
@@ -175,8 +193,9 @@
 			err := c.changeVisibilityTimeout(queueURL, visibilityTimeout, svcSQS, message.ReceiptHandle)
 			if err != nil {
 				c.logger.Error(fmt.Errorf("SQS ChangeMessageVisibilityRequest failed: %w", err))
+			} else {
+				c.logger.Infof("Message visibility timeout updated to %v", visibilityTimeout)
 			}
-			c.logger.Infof("Message visibility timeout updated to %v", visibilityTimeout)
 		} else {
 			// When ACK done, message will be deleted. Or when message is
 			// not s3 ObjectCreated event related(handleSQSMessage function
@@ -186,6 +205,8 @@
 			// will return empty s3Infos) then it should be removed from
 			// the queue.
 			err := c.deleteMessage(queueURL, *message.ReceiptHandle, svcSQS)
 			if err != nil {
 				c.logger.Error(fmt.Errorf("deleteMessages failed: %w", err))
+			} else {
+				deletedSQSMessage = true
 			}
 		}
 		return err
@@ -196,8 +217,10 @@
 			err := c.changeVisibilityTimeout(queueURL, visibilityTimeout, svcSQS, message.ReceiptHandle)
 			if err != nil {
 				c.logger.Error(fmt.Errorf("SQS ChangeMessageVisibilityRequest failed: %w", err))
+			} else {
+				c.logger.Infof("Message visibility timeout updated to %v seconds", visibilityTimeout)
+				c.metrics.sqsVisibilityTimeoutExtensionsTotal.Inc()
 			}
-			c.logger.Infof("Message visibility timeout updated to %v seconds", visibilityTimeout)
 			return err
 		}
 	}
@@ -326,7 +349,7 @@ func (c *s3Collector) handleSQSMessage(m sqs.Message) ([]s3Info, error) {
 	return s3Infos, nil
 }
 
-func (c *s3Collector) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC chan error) error {
+func (c *s3Collector) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC chan<- error) error {
 	s3Ctx := &s3Context{
 		refs: 1,
 		errC: errC,
@@ -358,6 +381,16 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
 	ctx, cancelFn := context.WithTimeout(c.cancellation, c.config.APITimeout)
 	defer cancelFn()
 
+	{
+		c.metrics.s3ObjectsRequestedTotal.Inc()
+		c.metrics.s3ObjectsInflight.Inc()
+		start := time.Now()
+		defer func() {
+			c.metrics.s3ObjectsInflight.Dec()
+			c.metrics.s3ObjectProcessingTime.Update(time.Since(start).Nanoseconds())
+		}()
+	}
+
 	resp, err := req.Send(ctx)
 	if err != nil {
 		if awsErr, ok := err.(awserr.Error); ok {
@@ -378,8 +411,7 @@
 	defer resp.Body.Close()
 
-	bodyReader := bufio.NewReader(resp.Body)
-
+	bodyReader := bufio.NewReader(newMonitoredReader(resp.Body, c.metrics.s3BytesProcessedTotal))
 	isS3ObjGzipped, err := isStreamGzipped(bodyReader)
 	if err != nil {
 		c.logger.Error(fmt.Errorf("could not determine if S3 object is gzipped: %w", err))
@@ -552,6 +584,7 @@ func (c *s3Collector) convertJSONToEvent(jsonFields interface{}, offset int, obj
 
 func (c *s3Collector) forwardEvent(event beat.Event) error {
 	c.publisher.Publish(event)
+	c.metrics.s3EventsCreatedTotal.Inc()
 	return c.cancellation.Err()
 }
 
diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go
index bdb5976bf2d..fcb22bb6510 100644
--- a/x-pack/filebeat/input/awss3/input.go
+++ b/x-pack/filebeat/input/awss3/input.go
@@ -16,6 +16,7 @@ import (
 	"github.com/elastic/beats/v7/libbeat/common"
 	"github.com/elastic/beats/v7/libbeat/common/acker"
 	"github.com/elastic/beats/v7/libbeat/feature"
+	"github.com/elastic/beats/v7/libbeat/monitoring"
 	awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
 	"github.com/elastic/go-concert/ctxtool"
 )
@@ -66,6 +67,7 @@ func (in *s3Input) Run(ctx v2.Context, pipeline beat.Pipeline) error {
 		return err
 	}
 
+	defer collector.metrics.Close()
 	defer collector.publisher.Close()
 
 	collector.run()
@@ -114,6 +116,7 @@ func (in *s3Input) createCollector(ctx v2.Context, pipeline beat.Pipeline) (*s3C
 	log.Debug("s3 service name = ", s3Servicename)
 	log.Debug("s3 input config max_number_of_messages = ", in.config.MaxNumberOfMessages)
 	log.Debug("s3 input config endpoint = ", in.config.AwsConfig.Endpoint)
+	metricRegistry := monitoring.GetNamespace("dataset").GetRegistry()
 	return &s3Collector{
 		cancellation: ctxtool.FromCanceller(ctx.Cancelation),
 		logger:       log,
@@ -122,6 +125,7 @@ func (in *s3Input) createCollector(ctx v2.Context, pipeline beat.Pipeline) (*s3C
 		visibilityTimeout: visibilityTimeout,
 		sqs:               sqs.New(awscommon.EnrichAWSConfigWithEndpoint(in.config.AwsConfig.Endpoint, "sqs", regionName, awsConfig)),
 		s3:                s3.New(awscommon.EnrichAWSConfigWithEndpoint(in.config.AwsConfig.Endpoint, s3Servicename, regionName, awsConfig)),
+		metrics:           newInputMetrics(metricRegistry, ctx.ID),
 	}, nil
 }
 
diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go
new file mode 100644
index 00000000000..57f56561268
--- /dev/null
+++ b/x-pack/filebeat/input/awss3/metrics.go
@@ -0,0 +1,79 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package awss3
+
+import (
+	"io"
+
+	"github.com/rcrowley/go-metrics"
+
+	"github.com/elastic/beats/v7/libbeat/monitoring"
+	"github.com/elastic/beats/v7/libbeat/monitoring/adapter"
+)
+
+type inputMetrics struct {
+	id     string               // Input ID.
+	parent *monitoring.Registry // Parent registry holding this input's ID as a key.
+
+	sqsMessagesReceivedTotal            *monitoring.Uint // Number of SQS messages received (not necessarily fully processed).
+	sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions.
+	sqsMessagesInflight                 *monitoring.Uint // Number of SQS messages inflight (gauge).
+	sqsMessagesReturnedTotal            *monitoring.Uint // Number of SQS messages returned to the queue (happens implicitly on errors after the visibility timeout passes).
+	sqsMessagesDeletedTotal             *monitoring.Uint // Number of SQS messages deleted.
+	sqsMessageProcessingTime            metrics.Sample   // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
+
+	s3ObjectsRequestedTotal *monitoring.Uint // Number of S3 objects downloaded.
+	s3BytesProcessedTotal   *monitoring.Uint // Number of S3 bytes processed.
+	s3EventsCreatedTotal    *monitoring.Uint // Number of events created from processing S3 data.
+	s3ObjectsInflight       *monitoring.Uint // Number of S3 objects inflight (gauge).
+	s3ObjectProcessingTime  metrics.Sample   // Histogram of the elapsed S3 object processing times in nanoseconds (start of download to completion of parsing).
+}
+
+// Close removes the metrics from the registry.
+func (m *inputMetrics) Close() {
+	m.parent.Remove(m.id)
+}
+
+func newInputMetrics(parent *monitoring.Registry, id string) *inputMetrics {
+	reg := parent.NewRegistry(id)
+	monitoring.NewString(reg, "input").Set(inputName)
+	monitoring.NewString(reg, "id").Set(id)
+	out := &inputMetrics{
+		id:                                  id,
+		parent:                              parent,
+		sqsMessagesReceivedTotal:            monitoring.NewUint(reg, "sqs_messages_received_total"),
+		sqsVisibilityTimeoutExtensionsTotal: monitoring.NewUint(reg, "sqs_visibility_timeout_extensions_total"),
+		sqsMessagesInflight:                 monitoring.NewUint(reg, "sqs_messages_inflight_gauge"),
+		sqsMessagesReturnedTotal:            monitoring.NewUint(reg, "sqs_messages_returned_total"),
+		sqsMessagesDeletedTotal:             monitoring.NewUint(reg, "sqs_messages_deleted_total"),
+		sqsMessageProcessingTime:            metrics.NewUniformSample(1024),
+		s3ObjectsRequestedTotal:             monitoring.NewUint(reg, "s3_objects_requested_total"),
+		s3BytesProcessedTotal:               monitoring.NewUint(reg, "s3_bytes_processed_total"),
+		s3EventsCreatedTotal:                monitoring.NewUint(reg, "s3_events_created_total"),
+		s3ObjectsInflight:                   monitoring.NewUint(reg, "s3_objects_inflight_gauge"),
+		s3ObjectProcessingTime:              metrics.NewUniformSample(1024),
+	}
+	adapter.NewGoMetrics(reg, "sqs_message_processing_time", adapter.Accept).
+		Register("histogram", metrics.NewHistogram(out.sqsMessageProcessingTime))
+	adapter.NewGoMetrics(reg, "s3_object_processing_time", adapter.Accept).
+		Register("histogram", metrics.NewHistogram(out.s3ObjectProcessingTime))
+	return out
+}
+
+// monitoredReader implements io.Reader and counts the number of bytes read.
+type monitoredReader struct {
+	reader         io.Reader
+	totalBytesRead *monitoring.Uint
+}
+
+func newMonitoredReader(r io.Reader, metric *monitoring.Uint) *monitoredReader {
+	return &monitoredReader{reader: r, totalBytesRead: metric}
+}
+
+func (m *monitoredReader) Read(p []byte) (int, error) {
+	n, err := m.reader.Read(p)
+	m.totalBytesRead.Add(uint64(n))
+	return n, err
+}
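
Not part of the patch: a minimal sketch of how the byte-counting reader could
be exercised in a package test. The file name and test are hypothetical, and
the sketch assumes it compiles in the same `awss3` package so it can reach the
unexported `newMonitoredReader`:

// metrics_test.go (hypothetical file, for illustration only)
package awss3

import (
	"strings"
	"testing"

	"github.com/elastic/beats/v7/libbeat/monitoring"
)

func TestMonitoredReaderCountsBytes(t *testing.T) {
	reg := monitoring.NewRegistry()
	total := monitoring.NewUint(reg, "s3_bytes_processed_total")

	// Drain ten bytes through the monitored reader in small chunks.
	r := newMonitoredReader(strings.NewReader("0123456789"), total)
	buf := make([]byte, 3)
	for {
		if _, err := r.Read(buf); err != nil {
			break // io.EOF once the underlying reader is drained
		}
	}

	if got := total.Get(); got != 10 {
		t.Fatalf("expected 10 bytes counted, got %d", got)
	}
}

Because the counter is registered on the input's registry, bytes counted this
way surface as `dataset.<id>.s3_bytes_processed_total` on the monitoring
endpoint.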