From 940b5c00d9df43bc28e9557467c38fdf216e8d88 Mon Sep 17 00:00:00 2001 From: Tiationg Kho Date: Tue, 10 Dec 2024 15:55:49 -0800 Subject: [PATCH 1/2] Add sqs client retryer --- cmd/node-termination-handler.go | 5 +- pkg/monitor/sqsevent/sqs-retryer.go | 48 ++++++++++ pkg/monitor/sqsevent/sqs-retryer_test.go | 116 +++++++++++++++++++++++ 3 files changed, 166 insertions(+), 3 deletions(-) create mode 100644 pkg/monitor/sqsevent/sqs-retryer.go create mode 100644 pkg/monitor/sqsevent/sqs-retryer_test.go diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go index fc906d4a..700543ea 100644 --- a/cmd/node-termination-handler.go +++ b/cmd/node-termination-handler.go @@ -16,7 +16,6 @@ package main import ( "context" "fmt" - "github.com/aws/aws-node-termination-handler/pkg/monitor/asglifecycle" "os" "os/signal" "strings" @@ -31,6 +30,7 @@ import ( "github.com/aws/aws-node-termination-handler/pkg/interruptioneventstore" "github.com/aws/aws-node-termination-handler/pkg/logging" "github.com/aws/aws-node-termination-handler/pkg/monitor" + "github.com/aws/aws-node-termination-handler/pkg/monitor/asglifecycle" "github.com/aws/aws-node-termination-handler/pkg/monitor/rebalancerecommendation" "github.com/aws/aws-node-termination-handler/pkg/monitor/scheduledevent" "github.com/aws/aws-node-termination-handler/pkg/monitor/spotitn" @@ -43,7 +43,6 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/autoscaling" "github.com/aws/aws-sdk-go/service/ec2" - "github.com/aws/aws-sdk-go/service/sqs" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "k8s.io/apimachinery/pkg/util/wait" @@ -223,7 +222,7 @@ func main() { QueueURL: nthConfig.QueueURL, InterruptionChan: interruptionChan, CancelChan: cancelChan, - SQS: sqs.New(sess), + SQS: sqsevent.GetSqsClient(sess), ASG: autoscaling.New(sess), EC2: ec2.New(sess), BeforeCompleteLifecycleAction: func() { <-time.After(completeLifecycleActionDelay) }, diff --git 
a/pkg/monitor/sqsevent/sqs-retryer.go b/pkg/monitor/sqsevent/sqs-retryer.go new file mode 100644 index 00000000..c11d746a --- /dev/null +++ b/pkg/monitor/sqsevent/sqs-retryer.go @@ -0,0 +1,48 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package sqsevent + +import ( + "strings" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/client" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sqs" +) + +type SqsRetryer struct { + client.DefaultRetryer +} + +func (r SqsRetryer) ShouldRetry(req *request.Request) bool { + return r.DefaultRetryer.ShouldRetry(req) || + (req.Error != nil && strings.Contains(req.Error.Error(), "connection reset")) +} + +func GetSqsClient(sess *session.Session) *sqs.SQS { + return sqs.New(sess, &aws.Config{ + Retryer: SqsRetryer{ + DefaultRetryer: client.DefaultRetryer{ + NumMaxRetries: client.DefaultRetryerMaxNumRetries, + MinRetryDelay: client.DefaultRetryerMinRetryDelay, + MaxRetryDelay: 1200 * time.Millisecond, + MinThrottleDelay: client.DefaultRetryerMinThrottleDelay, + MaxThrottleDelay: 1200 * time.Millisecond, + }, + }, + }) +} diff --git a/pkg/monitor/sqsevent/sqs-retryer_test.go b/pkg/monitor/sqsevent/sqs-retryer_test.go new file mode 100644 index 00000000..d097f930 --- /dev/null +++ b/pkg/monitor/sqsevent/sqs-retryer_test.go @@ -0,0 +1,116 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+//
+// Licensed under the Apache License, Version 2.0 (the "License"). You may
+// not use this file except in compliance with the License. A copy of the
+// License is located at
+//
+//     http://aws.amazon.com/apache2.0/
+//
+// or in the "license" file accompanying this file. This file is distributed
+// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+// express or implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package sqsevent_test
+
+import (
+	"fmt"
+	"net"
+	"testing"
+	"time"
+
+	"github.com/aws/aws-node-termination-handler/pkg/monitor/sqsevent"
+	h "github.com/aws/aws-node-termination-handler/pkg/test"
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/awserr"
+	"github.com/aws/aws-sdk-go/aws/client"
+	"github.com/aws/aws-sdk-go/aws/request"
+	"github.com/aws/aws-sdk-go/aws/session"
+)
+
+type temporaryError struct {
+	error
+	temp bool
+}
+
+func TestGetSqsClient(t *testing.T) {
+	retryer := getSqsRetryer(t)
+
+	h.Equals(t, client.DefaultRetryerMaxNumRetries, retryer.NumMaxRetries)
+	h.Equals(t, time.Duration(1200*time.Millisecond), retryer.MaxRetryDelay)
+}
+
+func TestShouldRetry(t *testing.T) {
+	retryer := getSqsRetryer(t)
+
+	testCases := []struct {
+		name        string
+		req         *request.Request
+		shouldRetry bool
+	}{
+		{
+			name: "AWS throttling error",
+			req: &request.Request{
+				Error: awserr.New("ThrottlingException", "Rate exceeded", nil),
+			},
+			shouldRetry: true,
+		},
+		{
+			name: "AWS validation error",
+			req: &request.Request{
+				Error: awserr.New("ValidationError", "Invalid parameter", nil),
+			},
+			shouldRetry: false,
+		},
+		{
+			name: "read connection reset by peer error",
+			req: &request.Request{
+				Error: &temporaryError{
+					error: &net.OpError{
+						Op:  "read",
+						Err: fmt.Errorf("read: connection reset by peer"),
+					},
+					temp: false,
+				}},
+			shouldRetry: true,
+		},
+		{
+			name: "read unknown error",
+			req: &request.Request{
+				Error: &temporaryError{
+					error: &net.OpError{
+						Op:  "read",
+						Err: fmt.Errorf("read unknown error"),
+					},
+					temp: false,
+				}},
+			shouldRetry: false,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			result := retryer.ShouldRetry(tc.req)
+			h.Equals(t, tc.shouldRetry, result)
+		})
+	}
+}
+
+func getSqsRetryer(t *testing.T) sqsevent.SqsRetryer {
+	sess, err := session.NewSession(&aws.Config{
+		Region: aws.String("us-east-1"),
+	})
+	h.Ok(t, err)
+
+	sqsClient := sqsevent.GetSqsClient(sess)
+	h.Assert(t, sqsClient.Client.Config.Region != nil, "Region should not be nil")
+	h.Equals(t, "us-east-1", *sqsClient.Client.Config.Region)
+
+	retryer, ok := sqsClient.Client.Config.Retryer.(sqsevent.SqsRetryer)
+	h.Assert(t, ok, "Retryer should be of type SqsRetryer")
+	return retryer
+}
+
+func (e *temporaryError) Temporary() bool {
+	return e.temp
+}

From d7809a88ac7d24eea7c67fc90a37aa2cfb8f61e4 Mon Sep 17 00:00:00 2001
From: Tiationg Kho
Date: Thu, 12 Dec 2024 10:10:04 -0800
Subject: [PATCH 2/2] Add doc comment to sqs retryer

---
 pkg/monitor/sqsevent/sqs-retryer.go | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/pkg/monitor/sqsevent/sqs-retryer.go b/pkg/monitor/sqsevent/sqs-retryer.go
index c11d746a..d740d758 100644
--- a/pkg/monitor/sqsevent/sqs-retryer.go
+++ b/pkg/monitor/sqsevent/sqs-retryer.go
@@ -24,19 +24,30 @@ import (
 	"github.com/aws/aws-sdk-go/service/sqs"
 )
 
+// SqsRetryer embeds the SDK's client.DefaultRetryer and additionally retries
+// requests whose error message contains "connection reset".
 type SqsRetryer struct {
 	client.DefaultRetryer
 }
 
+// ShouldRetry reports whether req should be retried. It returns true when the
+// embedded DefaultRetryer would retry, or when req.Error is non-nil and its
+// message contains "connection reset".
 func (r SqsRetryer) ShouldRetry(req *request.Request) bool {
 	return r.DefaultRetryer.ShouldRetry(req) ||
 		(req.Error != nil && strings.Contains(req.Error.Error(), "connection reset"))
 }
 
+// GetSqsClient returns an SQS client for sess whose retryer is an SqsRetryer.
+// The retry count and the minimum delays are the SDK defaults; the maximum
+// retry delay and maximum throttle delay are both set to 1200ms.
 func GetSqsClient(sess *session.Session) *sqs.SQS {
 	return sqs.New(sess, &aws.Config{
 		Retryer: SqsRetryer{
 			DefaultRetryer: client.DefaultRetryer{
+				// Monitor polls SQS for events every 2 seconds.
+				// NOTE(review): the 1200ms caps on MaxRetryDelay and MaxThrottleDelay
+				// presumably keep each retry wait shorter than that interval - confirm.
 				NumMaxRetries:    client.DefaultRetryerMaxNumRetries,
 				MinRetryDelay:    client.DefaultRetryerMinRetryDelay,
 				MaxRetryDelay:    1200 * time.Millisecond,