Skip to content

Commit

Permalink
Merge pull request #9637 from hakman/aws-eventual-consistency
Browse files Browse the repository at this point in the history
Cleanup AWS EC2 eventual consistency warnings
  • Loading branch information
k8s-ci-robot authored Jul 29, 2020
2 parents ae23a2a + 3a11207 commit be78301
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 119 deletions.
4 changes: 2 additions & 2 deletions pkg/resources/ops/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ func DeleteResources(cloud fi.Cloud, resourceMap map[string]*resources.Resource)
if err != nil {
mutex.Lock()
if awsresources.IsDependencyViolation(err) {
fmt.Printf("%s\tstill has dependencies, will retry: %v\n", human, err)
klog.V(4).Infof("API call made when had dependency: %s", human)
fmt.Printf("%s\tstill has dependencies, will retry\n", human)
klog.V(4).Infof("resource %q generated a dependency error: %v", human, err)
} else {
fmt.Printf("%s\terror deleting resources, will retry: %v\n", human, err)
}
Expand Down
8 changes: 7 additions & 1 deletion upup/pkg/fi/cloudup/awstasks/autoscalinggroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,13 @@ func (v *AutoscalingGroup) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Autos

// @step: attempt to create the autoscaling group for us
if _, err := t.Cloud.Autoscaling().CreateAutoScalingGroup(request); err != nil {
return fmt.Errorf("error creating AutoscalingGroup: %v", err)
code := awsup.AWSErrorCode(err)
message := awsup.AWSErrorMessage(err)
if code == "ValidationError" && strings.Contains(message, "Invalid IAM Instance Profile name") {
klog.V(4).Infof("error creating AutoscalingGroup: %s", message)
return fi.NewTryAgainLaterError("waiting for the IAM Instance Profile to be propagated")
}
return fmt.Errorf("error creating AutoScalingGroup: %s", message)
}

// @step: attempt to enable the metrics for us
Expand Down
26 changes: 0 additions & 26 deletions upup/pkg/fi/cloudup/awstasks/iaminstanceprofile.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package awstasks

import (
"fmt"
"time"

"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
Expand Down Expand Up @@ -124,31 +123,6 @@ func (_ *IAMInstanceProfile) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *IAM

e.ID = response.InstanceProfile.InstanceProfileId
e.Name = response.InstanceProfile.InstanceProfileName

// IAM instance profile seems to be highly asynchronous
// and if we don't wait creating dependent resources fail
attempt := 0
for {
if attempt > 10 {
klog.Warningf("unable to retrieve newly-created IAM instance profile %q; timed out", *e.Name)
break
}

ip, err := findIAMInstanceProfile(t.Cloud, *e.Name)
if err != nil {
klog.Warningf("ignoring error while retrieving newly-created IAM instance profile %q: %v", *e.Name, err)
}

if ip != nil {
// Found
klog.V(4).Infof("Found IAM instance profile %q", *e.Name)
break
}

// TODO: Use a real backoff algorithm
time.Sleep(3 * time.Second)
attempt++
}
}

// TODO: Should we use path as our tag?
Expand Down
34 changes: 7 additions & 27 deletions upup/pkg/fi/cloudup/awstasks/launchconfiguration.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ import (
"math"
"sort"
"strings"
"time"

"k8s.io/kops/pkg/apis/kops"

"k8s.io/kops/pkg/featureflag"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
Expand Down Expand Up @@ -354,32 +352,14 @@ func (_ *LaunchConfiguration) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *La
request.InstanceMonitoring = &autoscaling.InstanceMonitoring{Enabled: fi.Bool(false)}
}

attempt := 0
maxAttempts := 10
for {
attempt++

klog.V(8).Infof("AWS CreateLaunchConfiguration %s", aws.StringValue(request.LaunchConfigurationName))
_, err = t.Cloud.Autoscaling().CreateLaunchConfiguration(request)
if err == nil {
break
if _, err = t.Cloud.Autoscaling().CreateLaunchConfiguration(request); err != nil {
code := awsup.AWSErrorCode(err)
message := awsup.AWSErrorMessage(err)
if code == "ValidationError" && strings.Contains(message, "Invalid IamInstanceProfile") {
klog.V(4).Infof("error creating LaunchConfiguration: %s", message)
return fi.NewTryAgainLaterError("waiting for the IAM Instance Profile to be propagated")
}

if awsup.AWSErrorCode(err) == "ValidationError" {
message := awsup.AWSErrorMessage(err)
if strings.Contains(message, "not authorized") || strings.Contains(message, "Invalid IamInstance") {
if attempt > maxAttempts {
return fmt.Errorf("IAM instance profile not yet created/propagated (original error: %v)", message)
}
klog.V(4).Infof("got an error indicating that the IAM instance profile %q is not ready: %q", fi.StringValue(e.IAMInstanceProfile.Name), message)
klog.Infof("waiting for IAM instance profile %q to be ready", fi.StringValue(e.IAMInstanceProfile.Name))
time.Sleep(10 * time.Second)
continue
}
klog.V(4).Infof("ErrorCode=%q, Message=%q", awsup.AWSErrorCode(err), awsup.AWSErrorMessage(err))
}

return fmt.Errorf("error creating AutoscalingLaunchConfiguration: %v", err)
return fmt.Errorf("error creating LaunchConfiguration: %s", message)
}

e.ID = fi.String(launchConfigurationName)
Expand Down
28 changes: 2 additions & 26 deletions upup/pkg/fi/cloudup/awstasks/launchtemplate_target_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"sort"
"strings"
"time"

"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
Expand Down Expand Up @@ -142,31 +141,8 @@ func (t *LaunchTemplate) RenderAWS(c *awsup.AWSAPITarget, a, ep, changes *Launch
}
}
// @step: attempt to create the launch template
err = func() error {
for attempt := 0; attempt < 10; attempt++ {
if _, err = c.Cloud.EC2().CreateLaunchTemplate(input); err == nil {
return nil
}

if awsup.AWSErrorCode(err) == "ValidationError" {
message := awsup.AWSErrorMessage(err)
if strings.Contains(message, "not authorized") || strings.Contains(message, "Invalid IamInstance") {
if attempt > 10 {
return fmt.Errorf("IAM instance profile not yet created/propagated (original error: %v)", message)
}
klog.V(4).Infof("got an error indicating that the IAM instance profile %q is not ready: %q", fi.StringValue(ep.IAMInstanceProfile.Name), message)

time.Sleep(5 * time.Second)
continue
}
klog.V(4).Infof("ErrorCode=%q, Message=%q", awsup.AWSErrorCode(err), awsup.AWSErrorMessage(err))
}
}

return err
}()
if err != nil {
return fmt.Errorf("failed to create aws launch template: %s", err)
if _, err = c.Cloud.EC2().CreateLaunchTemplate(input); err != nil {
return fmt.Errorf("error creating LaunchTemplate: %v", err)
}

ep.ID = fi.String(name)
Expand Down
23 changes: 0 additions & 23 deletions upup/pkg/fi/cloudup/awstasks/natgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,29 +272,6 @@ func (e *NatGateway) Run(c *fi.Context) error {
return fi.DefaultDeltaRunMethod(e, c)
}

// waitAvailable waits for the NAT Gateway identified by e.ID to become
// available, using the EC2 WaitUntilNatGatewayAvailable call.
//
// A NAT Gateway can take a long time (up to roughly 5 minutes) to become
// available after it has been created, and callers have to wait until it is
// actually up. An error is returned when the gateway has no ID or when the
// wait itself fails.
//
// TODO: Cache availability status
func (e *NatGateway) waitAvailable(cloud awsup.AWSCloud) error {
	gatewayID := aws.StringValue(e.ID)
	if gatewayID == "" {
		return fmt.Errorf("NAT Gateway %q did not have ID", aws.StringValue(e.Name))
	}

	klog.Infof("Waiting for NAT Gateway %q to be available (this often takes about 5 minutes)", gatewayID)

	request := &ec2.DescribeNatGatewaysInput{NatGatewayIds: []*string{e.ID}}
	if err := cloud.EC2().WaitUntilNatGatewayAvailable(request); err != nil {
		return fmt.Errorf("error waiting for NAT Gateway %q to be available: %v", gatewayID, err)
	}

	return nil
}

func (_ *NatGateway) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *NatGateway) error {
// New NGW

Expand Down
27 changes: 15 additions & 12 deletions upup/pkg/fi/cloudup/awstasks/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,6 @@ func (_ *Route) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Route) error {
} else if e.InternetGateway != nil {
request.GatewayId = checkNotNil(e.InternetGateway.ID)
} else if e.NatGateway != nil {
if err := e.NatGateway.waitAvailable(t.Cloud); err != nil {
return err
}

request.NatGatewayId = checkNotNil(e.NatGateway.ID)
}

Expand All @@ -175,7 +171,13 @@ func (_ *Route) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Route) error {

response, err := t.Cloud.EC2().CreateRoute(request)
if err != nil {
return fmt.Errorf("error creating Route: %v", err)
code := awsup.AWSErrorCode(err)
message := awsup.AWSErrorMessage(err)
if code == "InvalidNatGatewayID.NotFound" {
klog.V(4).Infof("error creating Route: %s", message)
return fi.NewTryAgainLaterError("waiting for the NAT Gateway to be created")
}
return fmt.Errorf("error creating Route: %s", message)
}

if !aws.BoolValue(response.Return) {
Expand All @@ -191,10 +193,6 @@ func (_ *Route) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Route) error {
} else if e.InternetGateway != nil {
request.GatewayId = checkNotNil(e.InternetGateway.ID)
} else if e.NatGateway != nil {
if err := e.NatGateway.waitAvailable(t.Cloud); err != nil {
return err
}

request.NatGatewayId = checkNotNil(e.NatGateway.ID)
}

Expand All @@ -204,9 +202,14 @@ func (_ *Route) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Route) error {

klog.V(2).Infof("Updating Route with RouteTable:%q CIDR:%q", *e.RouteTable.ID, *e.CIDR)

_, err := t.Cloud.EC2().ReplaceRoute(request)
if err != nil {
return fmt.Errorf("error updating Route: %v", err)
// This is the update path (ReplaceRoute), so failures are reported as
// "updating" rather than "creating" the Route. A NAT Gateway that EC2 does not
// know about yet is surfaced as a TryAgainLaterError instead of a plain error.
if _, err := t.Cloud.EC2().ReplaceRoute(request); err != nil {
	code := awsup.AWSErrorCode(err)
	message := awsup.AWSErrorMessage(err)
	if code == "InvalidNatGatewayID.NotFound" {
		klog.V(4).Infof("error updating Route: %s", message)
		return fi.NewTryAgainLaterError("waiting for the NAT Gateway to be created")
	}
	return fmt.Errorf("error updating Route: %s", message)
}
}

Expand Down
15 changes: 15 additions & 0 deletions upup/pkg/fi/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,18 @@ func NewExistsAndWarnIfChangesError(message string) *ExistsAndWarnIfChangesError

// ExistsAndWarnIfChangesError implementation of the error interface.
func (e *ExistsAndWarnIfChangesError) Error() string { return e.msg }

// TryAgainLaterError is the custom error used when a task needs to fail with
// a message and be tried again later.
type TryAgainLaterError struct {
	// msg is the text returned by Error.
	msg string
}

// NewTryAgainLaterError builds a TryAgainLaterError carrying the given message.
func NewTryAgainLaterError(message string) *TryAgainLaterError {
	return &TryAgainLaterError{msg: message}
}

// Error implements the error interface by returning the stored message.
func (e *TryAgainLaterError) Error() string {
	return e.msg
}
8 changes: 6 additions & 2 deletions upup/pkg/fi/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ func (e *executor) RunTasks(taskMap map[string]Task) error {
}

remaining := time.Second * time.Duration(int(time.Until(ts.deadline).Seconds()))
klog.Warningf("error running task %q (%v remaining to succeed): %v", ts.key, remaining, err)
if _, ok := err.(*TryAgainLaterError); ok {
klog.Infof("Task %q not ready: %v", ts.key, err)
} else {
klog.Warningf("error running task %q (%v remaining to succeed): %v", ts.key, remaining, err)
}
errors = append(errors, err)
ts.lastError = err
} else {
Expand All @@ -140,7 +144,7 @@ func (e *executor) RunTasks(taskMap map[string]Task) error {
// Logic error!
panic("did not make progress executing tasks; but no errors reported")
}
klog.Infof("No progress made, sleeping before retrying %d failed task(s)", len(errors))
klog.Infof("No progress made, sleeping before retrying %d task(s)", len(errors))
time.Sleep(e.options.WaitAfterAllTasksFailed)
}
}
Expand Down

0 comments on commit be78301

Please sign in to comment.