From cc2217ced1dd549dbbed0abbd048caac6150ecf7 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 22 Oct 2020 12:45:17 -0600 Subject: [PATCH] Check context.Canceled and fix s3 input config (#22036) --- .../_meta/config/filebeat.inputs.reference.xpack.yml.tmpl | 4 ++-- x-pack/filebeat/filebeat.reference.yml | 4 ++-- x-pack/filebeat/input/s3/collector.go | 2 ++ x-pack/filebeat/input/s3/input.go | 8 +++++++- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl index 16964b2c84e..f083b4c814b 100644 --- a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl +++ b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl @@ -67,8 +67,8 @@ #session_token: '${AWS_SESSION_TOKEN:"”}' #credential_profile_name: test-s3-input - # Queue urls (required) to receive queue messages from - #queue_urls: ["https://sqs.us-east-1.amazonaws.com/1234/test-s3-logs-queue"] + # Queue url (required) to receive queue messages from + #queue_url: "https://sqs.us-east-1.amazonaws.com/1234/test-s3-logs-queue" # The duration (in seconds) that the received messages are hidden from subsequent # retrieve requests after being retrieved by a ReceiveMessage request. diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index f10a46aa20e..80bfacbf2c3 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -2410,8 +2410,8 @@ filebeat.inputs: #session_token: '${AWS_SESSION_TOKEN:"”}' #credential_profile_name: test-s3-input - # Queue urls (required) to receive queue messages from - #queue_urls: ["https://sqs.us-east-1.amazonaws.com/1234/test-s3-logs-queue"] + # Queue url (required) to receive queue messages from + #queue_url: "https://sqs.us-east-1.amazonaws.com/1234/test-s3-logs-queue" # The duration (in seconds) that the received messages are hidden from subsequent # retrieve requests after being retrieved by a ReceiveMessage request. diff --git a/x-pack/filebeat/input/s3/collector.go b/x-pack/filebeat/input/s3/collector.go index c3d3114c723..9596b5ab23f 100644 --- a/x-pack/filebeat/input/s3/collector.go +++ b/x-pack/filebeat/input/s3/collector.go @@ -153,8 +153,10 @@ func (c *s3Collector) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs. for { select { case <-c.cancellation.Done(): + fmt.Println("------- c.cancellation.Done()") return nil case err := <-errC: + fmt.Println("------- err = ", err) if err != nil { if err == context.DeadlineExceeded { c.logger.Info("Context deadline exceeded, updating visibility timeout") diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 36f160d759e..a3f19f66327 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -5,6 +5,7 @@ package s3 import ( + "context" "fmt" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -67,7 +68,12 @@ func (in *s3Input) Run(ctx v2.Context, pipeline beat.Pipeline) error { defer collector.publisher.Close() collector.run() - return ctx.Cancelation.Err() + + if ctx.Cancelation.Err() == context.Canceled { + return nil + } else { + return ctx.Cancelation.Err() + } } func (in *s3Input) createCollector(ctx v2.Context, pipeline beat.Pipeline) (*s3Collector, error) {