diff --git a/pkg/cloudprovider/aws/instance.go b/pkg/cloudprovider/aws/instance.go index d57b610a1dda..c30962a03ffc 100644 --- a/pkg/cloudprovider/aws/instance.go +++ b/pkg/cloudprovider/aws/instance.go @@ -24,9 +24,10 @@ import ( "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/awslabs/karpenter/pkg/cloudprovider" - "go.uber.org/multierr" + "go.uber.org/multierr" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" ) const ( @@ -100,10 +101,7 @@ func (p *InstanceProvider) Create(ctx context.Context, if err != nil { return nil, fmt.Errorf("creating fleet %w", err) } - if count := len(createFleetOutput.Instances); count != 1 { - return nil, combineFleetErrors(createFleetOutput.Errors) - } - if count := len(createFleetOutput.Instances[0].InstanceIds); count != 1 { + if len(createFleetOutput.Instances) != 1 || len(createFleetOutput.Instances[0].InstanceIds) != 1 { return nil, combineFleetErrors(createFleetOutput.Errors) } return createFleetOutput.Instances[0].InstanceIds[0], nil @@ -134,8 +132,12 @@ func getInstanceID(node *v1.Node) (*string, error) { } func combineFleetErrors(errors []*ec2.CreateFleetError) (errs error) { + unique := sets.NewString() for _, err := range errors { - errs = multierr.Append(errs, fmt.Errorf("%s", *err.ErrorCode)) + unique.Insert(aws.StringValue(err.ErrorCode)) + } + for _, errorCode := range unique.List() { + errs = multierr.Append(errs, fmt.Errorf(errorCode)) } - return errs + return fmt.Errorf("with fleet errs, %w", errs) } diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index 1b36cb87d7ba..b8242e811d12 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -476,7 +476,7 @@ var _ = Describe("Allocation", func() { It("should not schedule a pod with an invalid subnet", func() { provisioner.Spec.InstanceTypes = []string{"m5.large"} // limit instance type to simplify ConsistOf checks ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningFailed(ctx, env.Client, controller, provisioner, + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.PendingPod(test.PodOptions{NodeSelector: map[string]string{SubnetTagKeyLabel: "Invalid"}}), ) // Assertions @@ -562,7 +562,7 @@ var _ = Describe("Allocation", func() { }) It("should not schedule a pod with an invalid security group", func() { ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningFailed(ctx, env.Client, controller, provisioner, + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.PendingPod(test.PodOptions{NodeSelector: map[string]string{SecurityGroupTagKeyLabel: "Invalid"}}), ) // Assertions diff --git a/pkg/controllers/allocation/controller.go b/pkg/controllers/allocation/controller.go index 970a9f8d9a26..acda6749214f 100644 --- a/pkg/controllers/allocation/controller.go +++ b/pkg/controllers/allocation/controller.go @@ -22,10 +22,11 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha3" "github.com/awslabs/karpenter/pkg/cloudprovider" "github.com/awslabs/karpenter/pkg/packing" + "github.com/awslabs/karpenter/pkg/utils/result" + "go.uber.org/multierr" "golang.org/x/time/rate" "knative.dev/pkg/logging" - "go.uber.org/multierr" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" @@ -93,23 +94,21 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco // 3. Filter pods pods, err := c.Filter.GetProvisionablePods(ctx, provisioner) if err != nil { - return reconcile.Result{}, fmt.Errorf("filtering pods, %w", err) + return result.RetryIfError(ctx, fmt.Errorf("filtering pods, %w", err)) } if len(pods) == 0 { return reconcile.Result{}, nil } - logging.FromContext(ctx).Infof("Found %d provisionable pods", len(pods)) - // 4. Group by constraints constraintGroups, err := c.Constraints.Group(ctx, provisioner, pods) if err != nil { - return reconcile.Result{}, fmt.Errorf("building constraint groups, %w", err) + return result.RetryIfError(ctx, fmt.Errorf("building constraint groups, %w", err)) } // 5. Get Instance Types Options instanceTypes, err := c.CloudProvider.GetInstanceTypes(ctx) if err != nil { - return reconcile.Result{}, fmt.Errorf("getting instance types, %w", err) + return result.RetryIfError(ctx, fmt.Errorf("getting instance types, %w", err)) } // 6. Binpack each group @@ -128,7 +127,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco return c.Binder.Bind(ctx, node, packing.Pods) }) }) - return reconcile.Result{}, multierr.Combine(errs...) + return result.RetryIfError(ctx, multierr.Combine(errs...)) } func (c *Controller) Register(ctx context.Context, m manager.Manager) error { diff --git a/pkg/controllers/allocation/filter.go b/pkg/controllers/allocation/filter.go index fdd39113c522..d10511dacd9d 100644 --- a/pkg/controllers/allocation/filter.go +++ b/pkg/controllers/allocation/filter.go @@ -54,6 +54,7 @@ func (f *Filter) GetProvisionablePods(ctx context.Context, provisioner *v1alpha3 } provisionable = append(provisionable, ptr.Pod(p)) } + logging.FromContext(ctx).Infof("Found %d provisionable pods", len(provisionable)) return provisionable, nil } diff --git a/pkg/test/expectations/expectations.go b/pkg/test/expectations/expectations.go index c67ea36743a1..ce1029de594a 100644 --- a/pkg/test/expectations/expectations.go +++ b/pkg/test/expectations/expectations.go @@ -122,23 +122,6 @@ func ExpectProvisioningSucceeded(ctx context.Context, c client.Client, reconcile return result } -func ExpectProvisioningFailed(ctx context.Context, c client.Client, reconciler reconcile.Reconciler, provisioner *v1alpha3.Provisioner, pods ...*v1.Pod) []*v1.Pod { - for _, pod := range pods { - ExpectCreatedWithStatus(c, pod) - } - ExpectReconcileFailed(ctx, reconciler, client.ObjectKeyFromObject(provisioner)) - result := []*v1.Pod{} - for _, pod := range pods { - result = append(result, ExpectPodExists(c, pod.GetName(), pod.GetNamespace())) - } - return result -} - -func ExpectReconcileFailed(ctx context.Context, reconciler reconcile.Reconciler, key client.ObjectKey) { - _, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) - Expect(err).To(HaveOccurred()) -} - func ExpectReconcileSucceeded(ctx context.Context, reconciler reconcile.Reconciler, key client.ObjectKey) { _, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/utils/result/result.go b/pkg/utils/result/result.go new file mode 100644 index 000000000000..b5cf52d9b449 --- /dev/null +++ b/pkg/utils/result/result.go @@ -0,0 +1,17 @@ +package result + +import ( + "context" + + "go.uber.org/multierr" + "knative.dev/pkg/logging" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// RetryIfError logs any errors and requeues if not nil. Supports multierr unwrapping. +func RetryIfError(ctx context.Context, err error) (reconcile.Result, error) { + for _, err := range multierr.Errors(err) { + logging.FromContext(ctx).Errorf("Failed allocation, %s", err.Error()) + } + return reconcile.Result{Requeue: err != nil}, nil +}