diff --git a/controllers/core/node_controller.go b/controllers/core/node_controller.go
index 6e750eec..d8b9b78b 100644
--- a/controllers/core/node_controller.go
+++ b/controllers/core/node_controller.go
@@ -168,6 +168,12 @@ func (r *NodeReconciler) Check() healthz.Checker {
 		return nil
 	}
 
+	if r.Manager.SkipHealthCheck() {
+		// the node manager observed an EC2 error while processing a node; pause the reconciler check to avoid stressing the system
+		r.Log.Info("due to an EC2 error, the node controller skips the node reconciler health check for now")
+		return nil
+	}
+
 	err := rcHealthz.PingWithTimeout(func(c chan<- error) {
 		// when the reconciler is ready, testing the reconciler with a fake node request
 		pingRequest := &ctrl.Request{
diff --git a/mocks/amazon-vcp-resource-controller-k8s/pkg/node/manager/mock_manager.go b/mocks/amazon-vcp-resource-controller-k8s/pkg/node/manager/mock_manager.go
index 092caf34..cf22bdfa 100644
--- a/mocks/amazon-vcp-resource-controller-k8s/pkg/node/manager/mock_manager.go
+++ b/mocks/amazon-vcp-resource-controller-k8s/pkg/node/manager/mock_manager.go
@@ -102,6 +102,20 @@ func (mr *MockManagerMockRecorder) GetNode(arg0 interface{}) *gomock.Call {
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNode", reflect.TypeOf((*MockManager)(nil).GetNode), arg0)
 }
 
+// SkipHealthCheck mocks base method.
+func (m *MockManager) SkipHealthCheck() bool {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "SkipHealthCheck")
+	ret0, _ := ret[0].(bool)
+	return ret0
+}
+
+// SkipHealthCheck indicates an expected call of SkipHealthCheck.
+func (mr *MockManagerMockRecorder) SkipHealthCheck() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SkipHealthCheck", reflect.TypeOf((*MockManager)(nil).SkipHealthCheck))
+}
+
 // UpdateNode mocks base method.
 func (m *MockManager) UpdateNode(arg0 string) error {
 	m.ctrl.T.Helper()
diff --git a/pkg/node/manager/manager.go b/pkg/node/manager/manager.go
index 2759e775..b6539877 100644
--- a/pkg/node/manager/manager.go
+++ b/pkg/node/manager/manager.go
@@ -57,6 +57,7 @@ type manager struct {
 	worker            asyncWorker.Worker
 	conditions        condition.Conditions
 	controllerVersion string
+	stopHealthCheckAt time.Time
 }
 
 // Manager to perform operation on list of managed/un-managed node
@@ -66,6 +67,7 @@ type Manager interface {
 	UpdateNode(nodeName string) error
 	DeleteNode(nodeName string) error
 	CheckNodeForLeakedENIs(nodeName string)
+	SkipHealthCheck() bool
 }
 
 // AsyncOperation is operation on a node after the lock has been released.
@@ -96,6 +98,8 @@ type AsyncOperationJob struct {
 	nodeName string
 }
 
+const pausingHealthCheckDuration = 10 * time.Minute
+
 // NewNodeManager returns a new node manager
 func NewNodeManager(logger logr.Logger, resourceManager resource.ResourceManager, wrapper api.Wrapper, worker asyncWorker.Worker,
 	conditions condition.Conditions, controllerVersion string, healthzHandler *rcHealthz.HealthzHandler) (Manager, error) {
@@ -425,6 +429,10 @@ func (m *manager) performAsyncOperation(job interface{}) (ctrl.Result, error) {
 		utils.SendNodeEventWithNodeName(m.wrapper.K8sAPI, asyncJob.nodeName, utils.VersionNotice, fmt.Sprintf("The node is managed by VPC resource controller version %s", m.controllerVersion), v1.EventTypeNormal, m.Log)
 		err = asyncJob.node.InitResources(m.resourceManager)
 		if err != nil {
+			if pauseHealthCheckOnError(err) && !m.SkipHealthCheck() {
+				m.setStopHealthCheck()
+				log.Info("node manager sets a pause on health check due to observing an EC2 error", "error", err.Error())
+			}
 			log.Error(err, "removing the node from cache as it failed to initialize")
 			m.removeNodeSafe(asyncJob.nodeName)
 			// if initializing node failed, we want to make this visible although the manager will retry
@@ -565,12 +573,36 @@ func (m *manager) check() healthz.Checker {
 			randomName := uuid.New().String()
 			_, found := m.GetNode(randomName)
 			m.Log.V(1).Info("health check tested ping GetNode to check on datastore cache in node manager successfully", "TesedNodeName", randomName, "NodeFound", found)
-			var ping interface{}
-			m.worker.SubmitJob(ping)
-			m.Log.V(1).Info("health check tested ping SubmitJob with a nil job to check on worker queue in node manager successfully")
+			if m.SkipHealthCheck() {
+				m.Log.Info("due to an EC2 error, the node manager skips the node worker queue health check for now")
+			} else {
+				var ping interface{}
+				m.worker.SubmitJob(ping)
+				m.Log.V(1).Info("health check tested ping SubmitJob with a nil job to check on worker queue in node manager successfully")
+			}
 			c <- nil
 		}, m.Log)
 
 		return err
 	}
 }
+
+func (m *manager) SkipHealthCheck() bool {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
+
+	return time.Since(m.stopHealthCheckAt) < pausingHealthCheckDuration
+}
+
+func (m *manager) setStopHealthCheck() {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	m.stopHealthCheckAt = time.Now()
+}
+
+func pauseHealthCheckOnError(err error) bool {
+	return lo.ContainsBy(utils.PauseHealthCheckErrors, func(e string) bool {
+		return strings.Contains(err.Error(), e)
+	})
+}
diff --git a/pkg/node/manager/manager_test.go b/pkg/node/manager/manager_test.go
index c8450927..a580f125 100644
--- a/pkg/node/manager/manager_test.go
+++ b/pkg/node/manager/manager_test.go
@@ -17,6 +17,7 @@ import (
 	"errors"
 	"fmt"
 	"testing"
+	"time"
 
 	"github.com/aws/amazon-vpc-cni-k8s/pkg/apis/crd/v1alpha1"
 	rcV1alpha1 "github.com/aws/amazon-vpc-resource-controller-k8s/apis/vpcresources/v1alpha1"
@@ -684,6 +685,38 @@ func Test_performAsyncOperation_fail(t *testing.T) {
 	assert.NoError(t, err)
 }
 
+func Test_performAsyncOperation_fail_pausingHealthCheck(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	mock := NewMock(ctrl, map[string]node.Node{nodeName: managedNode})
+
+	job := AsyncOperationJob{
+		node:     mock.MockNode,
+		nodeName: nodeName,
+		op:       Init,
+	}
+
+	mock.MockNode.EXPECT().InitResources(mock.MockResourceManager).Return(&node.ErrInitResources{
+		Err: errors.New("RequestLimitExceeded: Request limit exceeded.\n\tstatus code: 503, request id: 123-123-123-123-123"),
+	}).Times(2)
+	mock.MockK8sAPI.EXPECT().GetNode(nodeName).Return(v1Node, nil).Times(2)
+	mock.MockK8sAPI.EXPECT().BroadcastEvent(v1Node, utils.VersionNotice, fmt.Sprintf("The node is managed by VPC resource controller version %s", mock.Manager.controllerVersion), v1.EventTypeNormal).Times(2)
+
+	_, err := mock.Manager.performAsyncOperation(job)
+	time.Sleep(time.Millisecond * 100)
+	assert.True(t, mock.Manager.SkipHealthCheck())
+	assert.NotContains(t, mock.Manager.dataStore, nodeName) // It should be cleared from cache
+	assert.NoError(t, err)
+
+	time.Sleep(time.Second * 2)
+	_, err = mock.Manager.performAsyncOperation(job)
+	assert.NoError(t, err)
+	time.Sleep(time.Millisecond * 100)
+	assert.True(t, mock.Manager.SkipHealthCheck())
+	assert.True(t, time.Since(mock.Manager.stopHealthCheckAt) > time.Second*2 && time.Since(mock.Manager.stopHealthCheckAt) < time.Second*3)
+}
+
 // Test_isPodENICapacitySet test if the pod-eni capacity then true is returned
 func Test_isPodENICapacitySet(t *testing.T) {
 	ctrl := gomock.NewController(t)
diff --git a/pkg/utils/errors.go b/pkg/utils/errors.go
index 0b89093d..a4458d42 100644
--- a/pkg/utils/errors.go
+++ b/pkg/utils/errors.go
@@ -23,6 +23,7 @@ var (
 	ErrInsufficientCidrBlocks     = errors.New("InsufficientCidrBlocks: The specified subnet does not have enough free cidr blocks to satisfy the request")
 	ErrMsgProviderAndPoolNotFound = "cannot find the instance provider and pool from the cache"
 	NotRetryErrors                = []string{InsufficientCidrBlocksReason}
+	PauseHealthCheckErrors        = []string{"RequestLimitExceeded"}
 )
 
 // ShouldRetryOnError returns true if the error is retryable, else returns false
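
Reviewer note: the following is a minimal, self-contained sketch of the pause-window pattern this diff introduces, not the controller's code. The names checker, observe, pauseWindow, and pauseErrors are illustrative, and the window is shortened from the PR's 10 minutes to 2 seconds so the demo runs quickly. It shows the two properties the PR relies on: the zero time.Time means the check is never skipped before an error is observed, and the timestamp is only set once per window because observe re-checks SkipHealthCheck first.

package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"
)

// pauseWindow is how long health checks stay paused after a throttling
// error; the PR uses 10 minutes, shortened here for the demo (assumption).
const pauseWindow = 2 * time.Second

// pauseErrors mirrors the role of utils.PauseHealthCheckErrors: error
// substrings that should pause the health check.
var pauseErrors = []string{"RequestLimitExceeded"}

type checker struct {
	mu        sync.RWMutex
	stoppedAt time.Time
}

// SkipHealthCheck reports whether we are still inside the pause window.
// With the zero-value stoppedAt, time.Since is enormous, so the check is
// never skipped before the first matching error is seen.
func (c *checker) SkipHealthCheck() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return time.Since(c.stoppedAt) < pauseWindow
}

// observe records the pause timestamp when err matches a throttling error
// and no pause is already in effect, mirroring the !m.SkipHealthCheck()
// guard in performAsyncOperation.
func (c *checker) observe(err error) {
	for _, s := range pauseErrors {
		if strings.Contains(err.Error(), s) && !c.SkipHealthCheck() {
			c.mu.Lock()
			c.stoppedAt = time.Now()
			c.mu.Unlock()
			return
		}
	}
}

func main() {
	c := &checker{}
	fmt.Println(c.SkipHealthCheck()) // false: no error observed yet

	c.observe(errors.New("RequestLimitExceeded: Request limit exceeded"))
	fmt.Println(c.SkipHealthCheck()) // true: inside the pause window

	time.Sleep(pauseWindow)
	fmt.Println(c.SkipHealthCheck()) // false: the window has elapsed
}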