From 9c945551acafdafa852ec54312c1c13fc36e58c4 Mon Sep 17 00:00:00 2001 From: liwen wu Date: Sat, 7 Jul 2018 16:29:56 +0000 Subject: [PATCH] Add WARM_IP_TARGET support --- ipamd/ipamd.go | 98 ++++++++++++++++++++++++---- ipamd/ipamd_test.go | 65 +++++++++++++++++- pkg/awsutils/awsutils.go | 33 ++++++++++ pkg/awsutils/awsutils_test.go | 31 +++++++++ pkg/awsutils/mocks/awsutils_mocks.go | 12 ++++ 5 files changed, 224 insertions(+), 15 deletions(-) diff --git a/ipamd/ipamd.go b/ipamd/ipamd.go index 3e568832de9..fa199d1220f 100644 --- a/ipamd/ipamd.go +++ b/ipamd/ipamd.go @@ -47,6 +47,7 @@ const ( nodeIPPoolReconcileInterval = 60 * time.Second maxK8SRetries = 12 retryK8SInterval = 5 * time.Second + noWarmIPTarget = 0 ) var ( @@ -180,14 +181,6 @@ func (c *IPAMContext) nodeInit() error { for _, eni := range enis { log.Debugf("Discovered ENI %s", eni.ENIID) - err = c.awsClient.AllocAllIPAddress(eni.ENIID) - if err != nil { - ipamdErrInc("nodeInitAllocAllIPAddressFailed", err) - //TODO need to increment ipamd err stats - log.Warn("During ipamd init: error encountered on trying to allocate all available IP addresses", err) - // fall though to add those allocated OK addresses - } - err = c.setupENI(eni.ENIID, eni) if err != nil { log.Errorf("Failed to setup eni %s network: %v", eni.ENIID, err) @@ -295,6 +288,12 @@ func (c *IPAMContext) updateIPPoolIfRequired() { func (c *IPAMContext) retryAllocENIIP() { ipamdActionsInprogress.WithLabelValues("retryAllocENIIP").Add(float64(1)) defer ipamdActionsInprogress.WithLabelValues("retryAllocENIIP").Sub(float64(1)) + + curIPTarget, warmIPTargetDefined := c.getCurWarmIPTarget() + if warmIPTargetDefined && curIPTarget <= 0 { + log.Debugf("Skipping retry allocating ENI IP, warm IP target reached") + return + } maxIPLimit, err := c.awsClient.GetENIipLimit() if err != nil { log.Infof("Failed to retrieve ENI IP limit: %v", err) @@ -303,7 +302,12 @@ func (c *IPAMContext) retryAllocENIIP() { eni := c.dataStore.GetENINeedsIP(maxIPLimit) if eni != nil { log.Debugf("Attempt again to allocate IP address for eni :%s", eni.ID) - err := c.awsClient.AllocAllIPAddress(eni.ID) + var err error + if warmIPTargetDefined { + err = c.awsClient.AllocIPAddresses(eni.ID, int64(curIPTarget)) + } else { + err = c.awsClient.AllocIPAddresses(eni.ID, maxIPLimit) + } if err != nil { ipamdErrInc("retryAllocENIIPAllocAllIPAddressFailed", err) log.Warn("During eni repair: error encountered on allocate IP address", err) @@ -317,6 +321,12 @@ func (c *IPAMContext) retryAllocENIIP() { } c.lastNodeIPPoolAction = time.Now() c.addENIaddressesToDataStore(ec2Addrs, eni.ID) + + curIPTarget, warmIPTargetDefined := c.getCurWarmIPTarget() + if warmIPTargetDefined && curIPTarget <= 0 { + log.Debugf("Finish retry allocating ENI IP, warm IP target reached") + return + } } } @@ -345,6 +355,13 @@ func (c *IPAMContext) increaseIPPool() { log.Debug("Start increasing IP Pool size") ipamdActionsInprogress.WithLabelValues("increaseIPPool").Add(float64(1)) defer ipamdActionsInprogress.WithLabelValues("increaseIPPool").Sub(float64(1)) + + curIPTarget, warmIPTargetDefined := c.getCurWarmIPTarget() + if warmIPTargetDefined && curIPTarget <= 0 { + log.Debugf("Skipping increase IP Pool, warm IP target reached") + return + } + maxENIs, err := c.awsClient.GetENILimit() enisMax.Set(float64(maxENIs)) @@ -373,7 +390,17 @@ func (c *IPAMContext) increaseIPPool() { return } - err = c.awsClient.AllocAllIPAddress(eni) + maxIPLimit, err := c.awsClient.GetENIipLimit() + if err != nil { + log.Infof("Failed to retrieve ENI IP limit: %v", err) + return + } + + if warmIPTargetDefined { + err = c.awsClient.AllocIPAddresses(eni, int64(curIPTarget)) + } else { + err = c.awsClient.AllocIPAddresses(eni, maxIPLimit) + } if err != nil { log.Warnf("Failed to allocate all available ip addresses on an ENI %v", err) // continue to proecsses those allocated ip addresses @@ -527,12 +554,22 @@ func logPoolStats(total, used, currentMaxAddrsPerENI, maxAddrsPerENI int) { //nodeIPPoolTooLow returns true if IP pool is below low threshold func (c *IPAMContext) nodeIPPoolTooLow() bool { + curIPTarget, warmIPTargetDefined := c.getCurWarmIPTarget() + if warmIPTargetDefined && curIPTarget <= 0 { + return false + } + + if warmIPTargetDefined && curIPTarget > 0 { + return true + } + + // if WARM-IP-TARGET not defined fallback using number of ENIs warmENITarget := getWarmENITarget() total, used := c.dataStore.GetStats() logPoolStats(total, used, c.currentMaxAddrsPerENI, c.maxAddrsPerENI) available := total - used - return (available <= c.currentMaxAddrsPerENI*warmENITarget) + return (available <= c.maxAddrsPerENI*warmENITarget) } // NodeIPPoolTooHigh returns true if IP pool is above high threshold @@ -542,8 +579,13 @@ func (c *IPAMContext) nodeIPPoolTooHigh() bool { logPoolStats(total, used, c.currentMaxAddrsPerENI, c.maxAddrsPerENI) available := total - used - return (available > (warmENITarget+1)*c.currentMaxAddrsPerENI) + target := getWarmIPTarget() + if target != noWarmIPTarget && target >= available { + return false + } + + return (available > (warmENITarget+1)*c.maxAddrsPerENI) } func ipamdErrInc(fn string, err error) { @@ -664,3 +706,35 @@ func (c *IPAMContext) eniIPPoolReconcile(ipPool map[string]*datastore.AddressInf return nil } + +func getWarmIPTarget() int { + inputStr, found := os.LookupEnv("WARM_IP_TARGET") + + if !found { + return noWarmIPTarget + } + + if input, err := strconv.Atoi(inputStr); err == nil { + if input < 0 { + return noWarmIPTarget + } + log.Debugf("Using WARM-IP-TARGET %v", input) + return input + } + return noWarmIPTarget +} + +func (c *IPAMContext) getCurWarmIPTarget() (int, bool) { + target := getWarmIPTarget() + if target == noWarmIPTarget { + // there is no WARM_IP_TARGET defined, fallback to use all IP addresses on ENI + return target, false + } + + total, used := c.dataStore.GetStats() + log.Debugf("Current warm IP stats: target: %d, total: %d, used: %d", + target, total, used) + curTarget := target - (total - used) + + return curTarget, true +} diff --git a/ipamd/ipamd_test.go b/ipamd/ipamd_test.go index ea180772170..d2afdf4d6eb 100644 --- a/ipamd/ipamd_test.go +++ b/ipamd/ipamd_test.go @@ -15,6 +15,7 @@ package ipamd import ( "net" + "os" "testing" "github.com/aws/amazon-vpc-cni-k8s/ipamd/datastore" @@ -101,7 +102,6 @@ func TestNodeInit(t *testing.T) { //primaryENIid mockAWS.EXPECT().GetPrimaryENI().Return(primaryENIid) - mockAWS.EXPECT().AllocAllIPAddress(primaryENIid).Return(nil) attachmentID := testAttachmentID testAddr1 := ipaddr01 testAddr2 := ipaddr02 @@ -116,7 +116,6 @@ func TestNodeInit(t *testing.T) { //secENIid mockAWS.EXPECT().GetPrimaryENI().Return(primaryENIid) - mockAWS.EXPECT().AllocAllIPAddress(secENIid).Return(nil) attachmentID = testAttachmentID testAddr11 := ipaddr11 testAddr12 := ipaddr12 @@ -162,7 +161,9 @@ func TestIncreaseIPPool(t *testing.T) { mockAWS.EXPECT().GetENILimit().Return(4, nil) mockAWS.EXPECT().AllocENI().Return(eni2, nil) - mockAWS.EXPECT().AllocAllIPAddress(eni2) + mockAWS.EXPECT().GetENIipLimit().Return(int64(5), nil) + + mockAWS.EXPECT().AllocIPAddresses(eni2, int64(5)) mockAWS.EXPECT().GetAttachedENIs().Return([]awsutils.ENIMetadata{ { @@ -270,3 +271,61 @@ func TestNodeIPPoolReconcile(t *testing.T) { assert.Equal(t, len(curENIs.ENIIPPools), 0) assert.Equal(t, curENIs.TotalIPs, 0) } + +func TestGetWarmENITarget(t *testing.T) { + ctrl, _, _, _, _ := setup(t) + defer ctrl.Finish() + + os.Setenv("WARM_IP_TARGET", "5") + warmIPTarget := getWarmIPTarget() + assert.Equal(t, warmIPTarget, 5) + + os.Unsetenv("WARM_IP_TARGET") + warmIPTarget = getWarmIPTarget() + assert.Equal(t, warmIPTarget, noWarmIPTarget) + + os.Setenv("WARM_IP_TARGET", "non-integer-string") + warmIPTarget = getWarmIPTarget() + assert.Equal(t, warmIPTarget, noWarmIPTarget) +} + +func TestGetCurWarmIPTarget(t *testing.T) { + ctrl, mockAWS, mockK8S, _, mockNetwork := setup(t) + defer ctrl.Finish() + + mockContext := &IPAMContext{ + awsClient: mockAWS, + k8sClient: mockK8S, + networkClient: mockNetwork, + primaryIP: make(map[string]string), + } + + mockContext.dataStore = datastore.NewDataStore() + + os.Unsetenv("WARM_IP_TARGET") + _, warmIPTargetDefined := mockContext.getCurWarmIPTarget() + assert.False(t, warmIPTargetDefined) + + os.Setenv("WARM_IP_TARGET", "5") + curWarmIPTarget, warmIPTargetDefined := mockContext.getCurWarmIPTarget() + assert.True(t, warmIPTargetDefined) + assert.Equal(t, curWarmIPTarget, 5) + + // add 2 addresses to datastore + mockContext.dataStore.AddENI("eni-1", 1, true) + mockContext.dataStore.AddENIIPv4Address("eni-1", "1.1.1.1") + mockContext.dataStore.AddENIIPv4Address("eni-1", "1.1.1.2") + + curWarmIPTarget, warmIPTargetDefined = mockContext.getCurWarmIPTarget() + assert.True(t, warmIPTargetDefined) + assert.Equal(t, curWarmIPTarget, 3) + + // add 3 more addresses to datastore + mockContext.dataStore.AddENIIPv4Address("eni-1", "1.1.1.3") + mockContext.dataStore.AddENIIPv4Address("eni-1", "1.1.1.4") + mockContext.dataStore.AddENIIPv4Address("eni-1", "1.1.1.5") + + curWarmIPTarget, warmIPTargetDefined = mockContext.getCurWarmIPTarget() + assert.True(t, warmIPTargetDefined) + assert.Equal(t, curWarmIPTarget, 0) +} diff --git a/pkg/awsutils/awsutils.go b/pkg/awsutils/awsutils.go index 02e77836fd4..4423139bc92 100644 --- a/pkg/awsutils/awsutils.go +++ b/pkg/awsutils/awsutils.go @@ -107,6 +107,9 @@ type APIs interface { // AllocAllIPAddress allocates all ip addresses available on an eni AllocAllIPAddress(eniID string) error + // Allocate alloactes numIPs of IP address on a eni + AllocIPAddresses(eniID string, numIPs int64) error + // GetVPCIPv4CIDR returns vpc's cidr GetVPCIPv4CIDR() string @@ -826,6 +829,36 @@ func (cache *EC2InstanceMetadataCache) GetENILimit() (int, error) { return eniLimit, nil } +// Allocate alloactes numIPs of IP address on a eni +func (cache *EC2InstanceMetadataCache) AllocIPAddresses(eniID string, numIPs int64) error { + var needIPs = int64(numIPs) + + ipLimit, err := cache.GetENIipLimit() + if err == nil && ipLimit < int64(needIPs) { + needIPs = ipLimit + } + + log.Infof("Trying to allocate %d IP address on eni %s", needIPs, eniID) + + input := &ec2.AssignPrivateIpAddressesInput{ + NetworkInterfaceId: aws.String(eniID), + SecondaryPrivateIpAddressCount: aws.Int64(int64(needIPs)), + } + + start := time.Now() + _, err = cache.ec2SVC.AssignPrivateIpAddresses(input) + awsAPILatency.WithLabelValues("AssignPrivateIpAddresses", fmt.Sprint(err != nil)).Observe(msSince(start)) + if err != nil { + awsAPIErrInc("AssignPrivateIpAddresses", err) + if containsPrivateIPAddressLimitExceededError(err) { + return nil + } + log.Errorf("Failed to allocate a private IP address %v", err) + return errors.Wrap(err, "allocate ip address: failed to allocate a private IP address") + } + return nil +} + // AllocAllIPAddress allocates all IP addresses available on eni func (cache *EC2InstanceMetadataCache) AllocAllIPAddress(eniID string) error { log.Infof("Trying to allocate all available ip addresses on eni: %s", eniID) diff --git a/pkg/awsutils/awsutils_test.go b/pkg/awsutils/awsutils_test.go index 113880c5a99..b8f6e4aafc7 100644 --- a/pkg/awsutils/awsutils_test.go +++ b/pkg/awsutils/awsutils_test.go @@ -552,6 +552,37 @@ func TestAllocAllIPAddress(t *testing.T) { assert.NoError(t, err) } +func TestAllocIPAddresses(t *testing.T) { + ctrl, _, mockEC2, _ := setup(t) + defer ctrl.Finish() + + // when required IP numbers(5) is below ENI's limit(49) + input := &ec2.AssignPrivateIpAddressesInput{ + NetworkInterfaceId: aws.String("eni-id"), + SecondaryPrivateIpAddressCount: aws.Int64(5), + } + mockEC2.EXPECT().AssignPrivateIpAddresses(input).Return(nil, nil) + + ins := &EC2InstanceMetadataCache{ec2SVC: mockEC2, instanceType: "r4.16xlarge"} + + err := ins.AllocIPAddresses("eni-id", 5) + + assert.NoError(t, err) + + // when required IP numbers(60) is higher than ENI's limit(49) + input = &ec2.AssignPrivateIpAddressesInput{ + NetworkInterfaceId: aws.String("eni-id"), + SecondaryPrivateIpAddressCount: aws.Int64(49), + } + mockEC2.EXPECT().AssignPrivateIpAddresses(input).Return(nil, nil) + + ins = &EC2InstanceMetadataCache{ec2SVC: mockEC2, instanceType: "r4.16xlarge"} + + err = ins.AllocIPAddresses("eni-id", 49) + + assert.NoError(t, err) +} + func TestAllocAllIPAddressOnErr(t *testing.T) { ctrl, _, mockEC2, _ := setup(t) defer ctrl.Finish() diff --git a/pkg/awsutils/mocks/awsutils_mocks.go b/pkg/awsutils/mocks/awsutils_mocks.go index 2b44b028f08..009027f9851 100644 --- a/pkg/awsutils/mocks/awsutils_mocks.go +++ b/pkg/awsutils/mocks/awsutils_mocks.go @@ -85,6 +85,18 @@ func (mr *MockAPIsMockRecorder) AllocIPAddress(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AllocIPAddress", reflect.TypeOf((*MockAPIs)(nil).AllocIPAddress), arg0) } +// AllocIPAddresses mocks base method +func (m *MockAPIs) AllocIPAddresses(arg0 string, arg1 int64) error { + ret := m.ctrl.Call(m, "AllocIPAddresses", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// AllocIPAddresses indicates an expected call of AllocIPAddresses +func (mr *MockAPIsMockRecorder) AllocIPAddresses(arg0, arg1 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AllocIPAddresses", reflect.TypeOf((*MockAPIs)(nil).AllocIPAddresses), arg0, arg1) +} + // DescribeENI mocks base method func (m *MockAPIs) DescribeENI(arg0 string) ([]*ec2.NetworkInterfacePrivateIpAddress, *string, error) { ret := m.ctrl.Call(m, "DescribeENI", arg0)