Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable node events when instance type is not supported #218

Merged
merged 1 commit into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions mocks/amazon-vcp-resource-controller-k8s/pkg/node/mock_node.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/aws/ec2/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (i *ec2Instance) LoadDetails(ec2APIHelper api.EC2APIHelper) error {
i.instanceType = *instance.InstanceType
limits, ok := vpc.Limits[i.instanceType]
if !ok {
return fmt.Errorf("unsupported instance type, couldn't find ENI Limit for instance %s", i.instanceType)
return fmt.Errorf("unsupported instance type, couldn't find ENI Limit for instance %s, error: %w", i.instanceType, utils.ErrNotFound)
}

defaultCardIdx := limits.DefaultNetworkCardIndex
Expand Down
5 changes: 4 additions & 1 deletion pkg/aws/ec2/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
"fmt"
"testing"

"github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/aws/ec2/api"
mock_api "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/aws/ec2/api"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/utils"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
Expand Down Expand Up @@ -258,6 +259,8 @@ func TestEc2Instance_LoadDetails_InstanceENILimitNotFound(t *testing.T) {

err := ec2Instance.LoadDetails(mockEC2ApiHelper)
assert.NotNil(t, err)
// ensure the expected error is returned to trigger a node event
assert.ErrorIs(t, err, utils.ErrNotFound)

// Clean up
nwInterfaces.InstanceType = &instanceType
Expand Down
10 changes: 5 additions & 5 deletions pkg/node/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (m *manager) AddNode(nodeName string) error {

if shouldManage {
newNode = node.NewManagedNode(m.Log, k8sNode.Name, GetNodeInstanceID(k8sNode),
GetNodeOS(k8sNode))
GetNodeOS(k8sNode), m.wrapper.K8sAPI, m.wrapper.EC2API)
err := m.updateSubnetIfUsingENIConfig(newNode, k8sNode)
if err != nil {
return err
Expand Down Expand Up @@ -195,7 +195,7 @@ func (m *manager) UpdateNode(nodeName string) error {
case UnManagedToManaged:
log.Info("node was previously un-managed, will be added as managed node now")
cachedNode = node.NewManagedNode(m.Log, k8sNode.Name,
GetNodeInstanceID(k8sNode), GetNodeOS(k8sNode))
GetNodeInstanceID(k8sNode), GetNodeOS(k8sNode), m.wrapper.K8sAPI, m.wrapper.EC2API)
// Update the Subnet if the node has custom networking configured
err = m.updateSubnetIfUsingENIConfig(cachedNode, k8sNode)
if err != nil {
Expand Down Expand Up @@ -313,7 +313,7 @@ func (m *manager) performAsyncOperation(job interface{}) (ctrl.Result, error) {
var err error
switch asyncJob.op {
case Init:
err = asyncJob.node.InitResources(m.resourceManager, m.wrapper.EC2API)
err = asyncJob.node.InitResources(m.resourceManager)
if err != nil {
log.Error(err, "removing the node from cache as it failed to initialize")
m.removeNodeSafe(asyncJob.nodeName)
Expand All @@ -328,9 +328,9 @@ func (m *manager) performAsyncOperation(job interface{}) (ctrl.Result, error) {
asyncJob.op = Update
return m.performAsyncOperation(asyncJob)
case Update:
err = asyncJob.node.UpdateResources(m.resourceManager, m.wrapper.EC2API)
err = asyncJob.node.UpdateResources(m.resourceManager)
case Delete:
err = asyncJob.node.DeleteResources(m.resourceManager, m.wrapper.EC2API)
err = asyncJob.node.DeleteResources(m.resourceManager)
default:
m.Log.V(1).Info("no operation operation requested",
"node", asyncJob.nodeName)
Expand Down
12 changes: 6 additions & 6 deletions pkg/node/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var (
mockError = fmt.Errorf("mock error")

unManagedNode = node.NewUnManagedNode(zap.New(), nodeName, instanceID, config.OSLinux)
managedNode = node.NewManagedNode(zap.New(), nodeName, instanceID, config.OSLinux)
managedNode = node.NewManagedNode(zap.New(), nodeName, instanceID, config.OSLinux, nil, nil)

healthzHandler = healthz.NewHealthzHandler(5)
)
Expand Down Expand Up @@ -432,19 +432,19 @@ func Test_performAsyncOperation(t *testing.T) {
job.op = Init

mock.MockK8sAPI.EXPECT().AddLabelToManageNode(v1Node, config.HasTrunkAttachedLabel, config.BooleanTrue).Return(true, nil).AnyTimes()
mock.MockNode.EXPECT().InitResources(mock.MockResourceManager, mock.MockEC2API).Return(nil)
mock.MockNode.EXPECT().UpdateResources(mock.MockResourceManager, mock.MockEC2API).Return(nil)
mock.MockNode.EXPECT().InitResources(mock.MockResourceManager).Return(nil)
mock.MockNode.EXPECT().UpdateResources(mock.MockResourceManager).Return(nil)
_, err := mock.Manager.performAsyncOperation(job)
assert.Contains(t, mock.Manager.dataStore, nodeName)
assert.NoError(t, err)

job.op = Update
mock.MockNode.EXPECT().UpdateResources(mock.MockResourceManager, mock.MockEC2API).Return(nil)
mock.MockNode.EXPECT().UpdateResources(mock.MockResourceManager).Return(nil)
_, err = mock.Manager.performAsyncOperation(job)
assert.NoError(t, err)

job.op = Delete
mock.MockNode.EXPECT().DeleteResources(mock.MockResourceManager, mock.MockEC2API).Return(nil)
mock.MockNode.EXPECT().DeleteResources(mock.MockResourceManager).Return(nil)
_, err = mock.Manager.performAsyncOperation(job)
assert.NoError(t, err)

Expand All @@ -465,7 +465,7 @@ func Test_performAsyncOperation_fail(t *testing.T) {
op: Init,
}

mock.MockNode.EXPECT().InitResources(mock.MockResourceManager, mock.MockEC2API).Return(&node.ErrInitResources{})
mock.MockNode.EXPECT().InitResources(mock.MockResourceManager).Return(&node.ErrInitResources{})

_, err := mock.Manager.performAsyncOperation(job)
assert.NotContains(t, mock.Manager.dataStore, nodeName) // It should be cleared from cache
Expand Down
34 changes: 24 additions & 10 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
package node

import (
"errors"
"fmt"
"sync"

"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/k8s"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/provider"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/resource"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/utils"
v1 "k8s.io/api/core/v1"

"github.com/go-logr/logr"
)
Expand All @@ -36,6 +40,10 @@ type node struct {
managed bool
// instance stores the ec2 instance details that is shared by all the providers
instance ec2.EC2Instance
// node has reference to k8s APIs
k8sAPI k8s.K8sWrapper
// node has reference to EC2 APIs
ec2API api.EC2APIHelper
}

// ErrInitResources to wrap error messages for all errors encountered
Expand All @@ -50,9 +58,9 @@ func (e *ErrInitResources) Error() string {
}

type Node interface {
InitResources(resourceManager resource.ResourceManager, helper api.EC2APIHelper) error
DeleteResources(resourceManager resource.ResourceManager, helper api.EC2APIHelper) error
UpdateResources(resourceManager resource.ResourceManager, helper api.EC2APIHelper) error
InitResources(resourceManager resource.ResourceManager) error
DeleteResources(resourceManager resource.ResourceManager) error
UpdateResources(resourceManager resource.ResourceManager) error

UpdateCustomNetworkingSpecs(subnetID string, securityGroup []string)
IsReady() bool
Expand All @@ -63,12 +71,14 @@ type Node interface {
}

// NewManagedNode returns node managed by the controller
func NewManagedNode(log logr.Logger, nodeName string, instanceID string, os string) Node {
func NewManagedNode(log logr.Logger, nodeName string, instanceID string, os string, k8sAPI k8s.K8sWrapper, ec2API api.EC2APIHelper) Node {
return &node{
managed: true,
log: log.WithName("node resource handler").
WithValues("node name", nodeName),
instance: ec2.NewEC2Instance(nodeName, instanceID, os),
k8sAPI: k8sAPI,
ec2API: ec2API,
}
}

Expand All @@ -86,7 +96,7 @@ func NewUnManagedNode(log logr.Logger, nodeName, instanceID, os string) Node {
}

// UpdateNode refreshes the capacity if it's reset to 0
func (n *node) UpdateResources(resourceManager resource.ResourceManager, helper api.EC2APIHelper) error {
func (n *node) UpdateResources(resourceManager resource.ResourceManager) error {
n.lock.Lock()
defer n.lock.Unlock()

Expand All @@ -112,7 +122,7 @@ func (n *node) UpdateResources(resourceManager resource.ResourceManager, helper
return fmt.Errorf("failed to update one or more resources %v", errUpdates)
}

err := n.instance.UpdateCurrentSubnetAndCidrBlock(helper)
err := n.instance.UpdateCurrentSubnetAndCidrBlock(n.ec2API)
if err != nil {
n.log.Error(err, "failed to update cidr block", "instance", n.instance.Name())
}
Expand All @@ -121,12 +131,16 @@ func (n *node) UpdateResources(resourceManager resource.ResourceManager, helper
}

// InitResources initializes the resource pool and provider of all supported resources
func (n *node) InitResources(resourceManager resource.ResourceManager, helper api.EC2APIHelper) error {
func (n *node) InitResources(resourceManager resource.ResourceManager) error {
n.lock.Lock()
defer n.lock.Unlock()

err := n.instance.LoadDetails(helper)
err := n.instance.LoadDetails(n.ec2API)
if err != nil {
if errors.Is(err, utils.ErrNotFound) {
// Send a node event for users' visibility
msg := fmt.Sprintf("The instance type %s is not supported yet by the vpc resource controller", n.instance.Type())
utils.SendNodeEvent(n.k8sAPI, n.instance.Name(), "Unsupported", msg, v1.EventTypeWarning, n.log)
}
return &ErrInitResources{
Message: "failed to load instance details",
Err: err,
Expand Down Expand Up @@ -167,7 +181,7 @@ func (n *node) InitResources(resourceManager resource.ResourceManager, helper ap
}

// DeleteResources performs clean up of all the resource pools and provider of the nodes
func (n *node) DeleteResources(resourceManager resource.ResourceManager, _ api.EC2APIHelper) error {
func (n *node) DeleteResources(resourceManager resource.ResourceManager) error {
n.lock.Lock()
defer n.lock.Unlock()

Expand Down
Loading