Skip to content

Commit

Permalink
Skipping health check on nodes if EC2 returns throttling errors (aws#485
Browse files Browse the repository at this point in the history
)
  • Loading branch information
haouc authored Oct 24, 2024
1 parent edffd4c commit 0895be1
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 3 deletions.
6 changes: 6 additions & 0 deletions controllers/core/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ func (r *NodeReconciler) Check() healthz.Checker {
return nil
}

if r.Manager.SkipHealthCheck() {
// node manager observes EC2 error on processing node, pausing reconciler check to avoid stressing the system
r.Log.Info("due to EC2 error, node controller skips node reconciler health check for now")
return nil
}

err := rcHealthz.PingWithTimeout(func(c chan<- error) {
// when the reconciler is ready, testing the reconciler with a fake node request
pingRequest := &ctrl.Request{
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 35 additions & 3 deletions pkg/node/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type manager struct {
worker asyncWorker.Worker
conditions condition.Conditions
controllerVersion string
stopHealthCheckAt time.Time
}

// Manager to perform operation on list of managed/un-managed node
Expand All @@ -66,6 +67,7 @@ type Manager interface {
UpdateNode(nodeName string) error
DeleteNode(nodeName string) error
CheckNodeForLeakedENIs(nodeName string)
SkipHealthCheck() bool
}

// AsyncOperation is operation on a node after the lock has been released.
Expand Down Expand Up @@ -96,6 +98,8 @@ type AsyncOperationJob struct {
nodeName string
}

const pausingHealthCheckDuration = 10 * time.Minute

// NewNodeManager returns a new node manager
func NewNodeManager(logger logr.Logger, resourceManager resource.ResourceManager,
wrapper api.Wrapper, worker asyncWorker.Worker, conditions condition.Conditions, controllerVersion string, healthzHandler *rcHealthz.HealthzHandler) (Manager, error) {
Expand Down Expand Up @@ -425,6 +429,10 @@ func (m *manager) performAsyncOperation(job interface{}) (ctrl.Result, error) {
utils.SendNodeEventWithNodeName(m.wrapper.K8sAPI, asyncJob.nodeName, utils.VersionNotice, fmt.Sprintf("The node is managed by VPC resource controller version %s", m.controllerVersion), v1.EventTypeNormal, m.Log)
err = asyncJob.node.InitResources(m.resourceManager)
if err != nil {
if pauseHealthCheckOnError(err) && !m.SkipHealthCheck() {
m.setStopHealthCheck()
log.Info("node manager sets a pause on health check due to observing a EC2 error", "error", err.Error())
}
log.Error(err, "removing the node from cache as it failed to initialize")
m.removeNodeSafe(asyncJob.nodeName)
// if initializing node failed, we want to make this visible although the manager will retry
Expand Down Expand Up @@ -565,12 +573,36 @@ func (m *manager) check() healthz.Checker {
randomName := uuid.New().String()
_, found := m.GetNode(randomName)
m.Log.V(1).Info("health check tested ping GetNode to check on datastore cache in node manager successfully", "TesedNodeName", randomName, "NodeFound", found)
var ping interface{}
m.worker.SubmitJob(ping)
m.Log.V(1).Info("health check tested ping SubmitJob with a nil job to check on worker queue in node manager successfully")
if m.SkipHealthCheck() {
m.Log.Info("due to EC2 error, node manager skips node worker queue health check for now")
} else {
var ping interface{}
m.worker.SubmitJob(ping)
m.Log.V(1).Info("health check tested ping SubmitJob with a nil job to check on worker queue in node manager successfully")
}
c <- nil
}, m.Log)

return err
}
}

func (m *manager) SkipHealthCheck() bool {
m.lock.RLock()
defer m.lock.RUnlock()

return time.Since(m.stopHealthCheckAt) < pausingHealthCheckDuration
}

func (m *manager) setStopHealthCheck() {
m.lock.Lock()
defer m.lock.Unlock()

m.stopHealthCheckAt = time.Now()
}

func pauseHealthCheckOnError(err error) bool {
return lo.ContainsBy(utils.PauseHealthCheckErrors, func(e string) bool {
return strings.Contains(err.Error(), e)
})
}
33 changes: 33 additions & 0 deletions pkg/node/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"errors"
"fmt"
"testing"
"time"

"github.com/aws/amazon-vpc-cni-k8s/pkg/apis/crd/v1alpha1"
rcV1alpha1 "github.com/aws/amazon-vpc-resource-controller-k8s/apis/vpcresources/v1alpha1"
Expand Down Expand Up @@ -684,6 +685,38 @@ func Test_performAsyncOperation_fail(t *testing.T) {
assert.NoError(t, err)
}

func Test_performAsyncOperation_fail_pausingHealthCheck(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mock := NewMock(ctrl, map[string]node.Node{nodeName: managedNode})

job := AsyncOperationJob{
node: mock.MockNode,
nodeName: nodeName,
op: Init,
}

mock.MockNode.EXPECT().InitResources(mock.MockResourceManager).Return(&node.ErrInitResources{
Err: errors.New("RequestLimitExceeded: Request limit exceeded.\n\tstatus code: 503, request id: 123-123-123-123-123"),
}).Times(2)
mock.MockK8sAPI.EXPECT().GetNode(nodeName).Return(v1Node, nil).Times(2)
mock.MockK8sAPI.EXPECT().BroadcastEvent(v1Node, utils.VersionNotice, fmt.Sprintf("The node is managed by VPC resource controller version %s", mock.Manager.controllerVersion), v1.EventTypeNormal).Times(2)

_, err := mock.Manager.performAsyncOperation(job)
time.Sleep(time.Millisecond * 100)
assert.True(t, mock.Manager.SkipHealthCheck())
assert.NotContains(t, mock.Manager.dataStore, nodeName) // It should be cleared from cache
assert.NoError(t, err)

time.Sleep(time.Second * 2)
_, err = mock.Manager.performAsyncOperation(job)
assert.NoError(t, err)
time.Sleep(time.Millisecond * 100)
assert.True(t, mock.Manager.SkipHealthCheck())
assert.True(t, time.Since(mock.Manager.stopHealthCheckAt) > time.Second*2 && time.Since(mock.Manager.stopHealthCheckAt) < time.Second*3)
}

// Test_isPodENICapacitySet test if the pod-eni capacity then true is returned
func Test_isPodENICapacitySet(t *testing.T) {
ctrl := gomock.NewController(t)
Expand Down
1 change: 1 addition & 0 deletions pkg/utils/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ var (
ErrInsufficientCidrBlocks = errors.New("InsufficientCidrBlocks: The specified subnet does not have enough free cidr blocks to satisfy the request")
ErrMsgProviderAndPoolNotFound = "cannot find the instance provider and pool from the cache"
NotRetryErrors = []string{InsufficientCidrBlocksReason}
PauseHealthCheckErrors = []string{"RequestLimitExceeded"}
)

// ShouldRetryOnError returns true if the error is retryable, else returns false
Expand Down

0 comments on commit 0895be1

Please sign in to comment.