Skip to content

Commit

Permalink
Add WARM_IP_TARGET support
Browse files Browse the repository at this point in the history
  • Loading branch information
liwenwu-amazon committed Jul 17, 2018
1 parent 1bd05b9 commit f2beafa
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 15 deletions.
98 changes: 86 additions & 12 deletions ipamd/ipamd.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
nodeIPPoolReconcileInterval = 60 * time.Second
maxK8SRetries = 12
retryK8SInterval = 5 * time.Second
noWarmIPTarget = 0
)

var (
Expand Down Expand Up @@ -195,14 +196,6 @@ func (c *IPAMContext) nodeInit() error {
for _, eni := range enis {
log.Debugf("Discovered ENI %s", eni.ENIID)

err = c.awsClient.AllocAllIPAddress(eni.ENIID)
if err != nil {
ipamdErrInc("nodeInitAllocAllIPAddressFailed", err)
//TODO need to increment ipamd err stats
log.Warn("During ipamd init: error encountered on trying to allocate all available IP addresses", err)
// fall though to add those allocated OK addresses
}

err = c.setupENI(eni.ENIID, eni)
if err != nil {
log.Errorf("Failed to setup eni %s network: %v", eni.ENIID, err)
Expand Down Expand Up @@ -310,6 +303,12 @@ func (c *IPAMContext) updateIPPoolIfRequired() {
func (c *IPAMContext) retryAllocENIIP() {
ipamdActionsInprogress.WithLabelValues("retryAllocENIIP").Add(float64(1))
defer ipamdActionsInprogress.WithLabelValues("retryAllocENIIP").Sub(float64(1))

curIPTarget, warmIPTargetDefined := c.getCurWarmIPTarget()
if warmIPTargetDefined && curIPTarget <= 0 {
log.Debugf("Skipping retry allocating ENI IP, warm IP target reached")
return
}
maxIPLimit, err := c.awsClient.GetENIipLimit()
if err != nil {
log.Infof("Failed to retrieve ENI IP limit: %v", err)
Expand All @@ -318,7 +317,12 @@ func (c *IPAMContext) retryAllocENIIP() {
eni := c.dataStore.GetENINeedsIP(maxIPLimit)
if eni != nil {
log.Debugf("Attempt again to allocate IP address for eni :%s", eni.ID)
err := c.awsClient.AllocAllIPAddress(eni.ID)
var err error
if warmIPTargetDefined {
err = c.awsClient.AllocIPAddresses(eni.ID, int64(curIPTarget))
} else {
err = c.awsClient.AllocIPAddresses(eni.ID, maxIPLimit)
}
if err != nil {
ipamdErrInc("retryAllocENIIPAllocAllIPAddressFailed", err)
log.Warn("During eni repair: error encountered on allocate IP address", err)
Expand All @@ -332,6 +336,12 @@ func (c *IPAMContext) retryAllocENIIP() {
}
c.lastNodeIPPoolAction = time.Now()
c.addENIaddressesToDataStore(ec2Addrs, eni.ID)

curIPTarget, warmIPTargetDefined := c.getCurWarmIPTarget()
if warmIPTargetDefined && curIPTarget <= 0 {
log.Debugf("Finish retry allocating ENI IP, warm IP target reached")
return
}
}
}

Expand Down Expand Up @@ -360,6 +370,13 @@ func (c *IPAMContext) increaseIPPool() {
log.Debug("Start increasing IP Pool size")
ipamdActionsInprogress.WithLabelValues("increaseIPPool").Add(float64(1))
defer ipamdActionsInprogress.WithLabelValues("increaseIPPool").Sub(float64(1))

curIPTarget, warmIPTargetDefined := c.getCurWarmIPTarget()
if warmIPTargetDefined && curIPTarget <= 0 {
log.Debugf("Skipping increase IP Pool, warm IP target reached")
return
}

maxENIs, err := c.awsClient.GetENILimit()
enisMax.Set(float64(maxENIs))

Expand Down Expand Up @@ -388,7 +405,17 @@ func (c *IPAMContext) increaseIPPool() {
return
}

err = c.awsClient.AllocAllIPAddress(eni)
maxIPLimit, err := c.awsClient.GetENIipLimit()
if err != nil {
log.Infof("Failed to retrieve ENI IP limit: %v", err)
return
}

if warmIPTargetDefined {
err = c.awsClient.AllocIPAddresses(eni, int64(curIPTarget))
} else {
err = c.awsClient.AllocIPAddresses(eni, maxIPLimit)
}
if err != nil {
log.Warnf("Failed to allocate all available ip addresses on an ENI %v", err)
// continue to proecsses those allocated ip addresses
Expand Down Expand Up @@ -542,12 +569,22 @@ func logPoolStats(total, used, currentMaxAddrsPerENI, maxAddrsPerENI int) {

//nodeIPPoolTooLow returns true if IP pool is below low threshold
func (c *IPAMContext) nodeIPPoolTooLow() bool {
curIPTarget, warmIPTargetDefined := c.getCurWarmIPTarget()
if warmIPTargetDefined && curIPTarget <= 0 {
return false
}

if warmIPTargetDefined && curIPTarget > 0 {
return true
}

// if WARM-IP-TARGET not defined fallback using number of ENIs
warmENITarget := getWarmENITarget()
total, used := c.dataStore.GetStats()
logPoolStats(total, used, c.currentMaxAddrsPerENI, c.maxAddrsPerENI)

available := total - used
return (available <= c.currentMaxAddrsPerENI*warmENITarget)
return (available <= c.maxAddrsPerENI*warmENITarget)
}

// NodeIPPoolTooHigh returns true if IP pool is above high threshold
Expand All @@ -557,8 +594,13 @@ func (c *IPAMContext) nodeIPPoolTooHigh() bool {
logPoolStats(total, used, c.currentMaxAddrsPerENI, c.maxAddrsPerENI)

available := total - used
return (available > (warmENITarget+1)*c.currentMaxAddrsPerENI)

target := getWarmIPTarget()
if target != noWarmIPTarget && target >= available {
return false
}

return (available > (warmENITarget+1)*c.maxAddrsPerENI)
}

func ipamdErrInc(fn string, err error) {
Expand Down Expand Up @@ -679,3 +721,35 @@ func (c *IPAMContext) eniIPPoolReconcile(ipPool map[string]*datastore.AddressInf
return nil

}

func getWarmIPTarget() int {
inputStr, found := os.LookupEnv("WARM_IP_TARGET")

if !found {
return noWarmIPTarget
}

if input, err := strconv.Atoi(inputStr); err == nil {
if input < 0 {
return noWarmIPTarget
}
log.Debugf("Using WARM-IP-TARGET %v", input)
return input
}
return noWarmIPTarget
}

func (c *IPAMContext) getCurWarmIPTarget() (int, bool) {
target := getWarmIPTarget()
if target == noWarmIPTarget {
// there is no WARM_IP_TARGET defined, fallback to use all IP addresses on ENI
return target, false
}

total, used := c.dataStore.GetStats()
log.Debugf("Current warm IP stats: target: %d, total: %d, used: %d",
target, total, used)
curTarget := target - (total - used)

return curTarget, true
}
65 changes: 62 additions & 3 deletions ipamd/ipamd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package ipamd

import (
"net"
"os"
"testing"

"github.com/aws/amazon-vpc-cni-k8s/ipamd/datastore"
Expand Down Expand Up @@ -101,7 +102,6 @@ func TestNodeInit(t *testing.T) {

//primaryENIid
mockAWS.EXPECT().GetPrimaryENI().Return(primaryENIid)
mockAWS.EXPECT().AllocAllIPAddress(primaryENIid).Return(nil)
attachmentID := testAttachmentID
testAddr1 := ipaddr01
testAddr2 := ipaddr02
Expand All @@ -116,7 +116,6 @@ func TestNodeInit(t *testing.T) {

//secENIid
mockAWS.EXPECT().GetPrimaryENI().Return(primaryENIid)
mockAWS.EXPECT().AllocAllIPAddress(secENIid).Return(nil)
attachmentID = testAttachmentID
testAddr11 := ipaddr11
testAddr12 := ipaddr12
Expand Down Expand Up @@ -162,7 +161,9 @@ func TestIncreaseIPPool(t *testing.T) {
mockAWS.EXPECT().GetENILimit().Return(4, nil)
mockAWS.EXPECT().AllocENI().Return(eni2, nil)

mockAWS.EXPECT().AllocAllIPAddress(eni2)
mockAWS.EXPECT().GetENIipLimit().Return(int64(5), nil)

mockAWS.EXPECT().AllocIPAddresses(eni2, int64(5))

mockAWS.EXPECT().GetAttachedENIs().Return([]awsutils.ENIMetadata{
{
Expand Down Expand Up @@ -270,3 +271,61 @@ func TestNodeIPPoolReconcile(t *testing.T) {
assert.Equal(t, len(curENIs.ENIIPPools), 0)
assert.Equal(t, curENIs.TotalIPs, 0)
}

func TestGetWarmENITarget(t *testing.T) {
ctrl, _, _, _, _ := setup(t)
defer ctrl.Finish()

os.Setenv("WARM_IP_TARGET", "5")
warmIPTarget := getWarmIPTarget()
assert.Equal(t, warmIPTarget, 5)

os.Unsetenv("WARM_IP_TARGET")
warmIPTarget = getWarmIPTarget()
assert.Equal(t, warmIPTarget, noWarmIPTarget)

os.Setenv("WARM_IP_TARGET", "non-integer-string")
warmIPTarget = getWarmIPTarget()
assert.Equal(t, warmIPTarget, noWarmIPTarget)
}

func TestGetCurWarmIPTarget(t *testing.T) {
ctrl, mockAWS, mockK8S, _, mockNetwork := setup(t)
defer ctrl.Finish()

mockContext := &IPAMContext{
awsClient: mockAWS,
k8sClient: mockK8S,
networkClient: mockNetwork,
primaryIP: make(map[string]string),
}

mockContext.dataStore = datastore.NewDataStore()

os.Unsetenv("WARM_IP_TARGET")
_, warmIPTargetDefined := mockContext.getCurWarmIPTarget()
assert.False(t, warmIPTargetDefined)

os.Setenv("WARM_IP_TARGET", "5")
curWarmIPTarget, warmIPTargetDefined := mockContext.getCurWarmIPTarget()
assert.True(t, warmIPTargetDefined)
assert.Equal(t, curWarmIPTarget, 5)

// add 2 addresses to datastore
mockContext.dataStore.AddENI("eni-1", 1, true)
mockContext.dataStore.AddENIIPv4Address("eni-1", "1.1.1.1")
mockContext.dataStore.AddENIIPv4Address("eni-1", "1.1.1.2")

curWarmIPTarget, warmIPTargetDefined = mockContext.getCurWarmIPTarget()
assert.True(t, warmIPTargetDefined)
assert.Equal(t, curWarmIPTarget, 3)

// add 3 more addresses to datastore
mockContext.dataStore.AddENIIPv4Address("eni-1", "1.1.1.3")
mockContext.dataStore.AddENIIPv4Address("eni-1", "1.1.1.4")
mockContext.dataStore.AddENIIPv4Address("eni-1", "1.1.1.5")

curWarmIPTarget, warmIPTargetDefined = mockContext.getCurWarmIPTarget()
assert.True(t, warmIPTargetDefined)
assert.Equal(t, curWarmIPTarget, 0)
}
33 changes: 33 additions & 0 deletions pkg/awsutils/awsutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ type APIs interface {
// AllocAllIPAddress allocates all ip addresses available on an eni
AllocAllIPAddress(eniID string) error

// Allocate alloactes numIPs of IP address on a eni
AllocIPAddresses(eniID string, numIPs int64) error

// GetVPCIPv4CIDR returns vpc's cidr
GetVPCIPv4CIDR() string

Expand Down Expand Up @@ -829,6 +832,36 @@ func (cache *EC2InstanceMetadataCache) GetENILimit() (int, error) {
return eniLimit, nil
}

// Allocate alloactes numIPs of IP address on a eni
func (cache *EC2InstanceMetadataCache) AllocIPAddresses(eniID string, numIPs int64) error {
var needIPs = int64(numIPs)

ipLimit, err := cache.GetENIipLimit()
if err == nil && ipLimit < int64(needIPs) {
needIPs = ipLimit
}

log.Infof("Trying to allocate %d IP address on eni %s", needIPs, eniID)

input := &ec2.AssignPrivateIpAddressesInput{
NetworkInterfaceId: aws.String(eniID),
SecondaryPrivateIpAddressCount: aws.Int64(int64(needIPs)),
}

start := time.Now()
_, err = cache.ec2SVC.AssignPrivateIpAddresses(input)
awsAPILatency.WithLabelValues("AssignPrivateIpAddresses", fmt.Sprint(err != nil)).Observe(msSince(start))
if err != nil {
awsAPIErrInc("AssignPrivateIpAddresses", err)
if containsPrivateIPAddressLimitExceededError(err) {
return nil
}
log.Errorf("Failed to allocate a private IP address %v", err)
return errors.Wrap(err, "allocate ip address: failed to allocate a private IP address")
}
return nil
}

// AllocAllIPAddress allocates all IP addresses available on eni
func (cache *EC2InstanceMetadataCache) AllocAllIPAddress(eniID string) error {
log.Infof("Trying to allocate all available ip addresses on eni: %s", eniID)
Expand Down
31 changes: 31 additions & 0 deletions pkg/awsutils/awsutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,37 @@ func TestAllocAllIPAddress(t *testing.T) {
assert.NoError(t, err)
}

func TestAllocIPAddresses(t *testing.T) {
ctrl, _, mockEC2 := setup(t)
defer ctrl.Finish()

// when required IP numbers(5) is below ENI's limit(49)
input := &ec2.AssignPrivateIpAddressesInput{
NetworkInterfaceId: aws.String("eni-id"),
SecondaryPrivateIpAddressCount: aws.Int64(5),
}
mockEC2.EXPECT().AssignPrivateIpAddresses(input).Return(nil, nil)

ins := &EC2InstanceMetadataCache{ec2SVC: mockEC2, instanceType: "r4.16xlarge"}

err := ins.AllocIPAddresses("eni-id", 5)

assert.NoError(t, err)

// when required IP numbers(60) is higher than ENI's limit(49)
input = &ec2.AssignPrivateIpAddressesInput{
NetworkInterfaceId: aws.String("eni-id"),
SecondaryPrivateIpAddressCount: aws.Int64(49),
}
mockEC2.EXPECT().AssignPrivateIpAddresses(input).Return(nil, nil)

ins = &EC2InstanceMetadataCache{ec2SVC: mockEC2, instanceType: "r4.16xlarge"}

err = ins.AllocIPAddresses("eni-id", 49)

assert.NoError(t, err)
}

func TestAllocAllIPAddressOnErr(t *testing.T) {
ctrl, _, mockEC2 := setup(t)
defer ctrl.Finish()
Expand Down
12 changes: 12 additions & 0 deletions pkg/awsutils/mocks/awsutils_mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit f2beafa

Please sign in to comment.