Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add shutdown listener #645

Merged
merged 1 commit into from
Oct 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ipamd/datastore/data_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,6 @@ func (ds *DataStore) RemoveUnusedENIFromStore(warmIPTarget int) string {

deletableENI := ds.getDeletableENI(warmIPTarget)
if deletableENI == nil {
log.Debugf("No ENI can be deleted at this time")
mogren marked this conversation as resolved.
Show resolved Hide resolved
return ""
}

Expand Down
33 changes: 26 additions & 7 deletions ipamd/ipamd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

log "github.com/cihub/seelog"
Expand Down Expand Up @@ -161,10 +162,10 @@ type IPAMContext struct {
primaryIP map[string]string
lastNodeIPPoolAction time.Time
lastDecreaseIPPool time.Time

// reconcileCooldownCache keeps timestamps of the last time an IP address was unassigned from an ENI,
// so that we don't reconcile and add it back too quickly if IMDS lags behind reality.
reconcileCooldownCache ReconcileCooldownCache
terminating int32 // Flag to warn that the pod is about to shut down.
}

// Keep track of recently freed IPs to avoid reading stale EC2 metadata
Expand Down Expand Up @@ -231,10 +232,13 @@ func New(k8sapiClient k8sapi.K8SAPIs, eniConfig *eniconfig.ENIConfigController)
log.Errorf("Failed to initialize awsutil interface %v", err)
return nil, errors.Wrap(err, "ipamd: can not initialize with AWS SDK interface")
}

c.awsClient = client

c.primaryIP = make(map[string]string)
c.reconcileCooldownCache.cache = make(map[string]time.Time)
c.warmENITarget = getWarmENITarget()
c.warmIPTarget = getWarmIPTarget()
c.useCustomNetworking = UseCustomNetworkCfg()

err = c.nodeInit()
if err != nil {
Expand Down Expand Up @@ -265,10 +269,6 @@ func (c *IPAMContext) nodeInit() error {
}
ipMax.Set(float64(c.maxIPsPerENI * c.maxENI))

c.useCustomNetworking = UseCustomNetworkCfg()
c.primaryIP = make(map[string]string)
c.reconcileCooldownCache.cache = make(map[string]time.Time)

enis, err := c.awsClient.GetAttachedENIs()
if err != nil {
log.Error("Failed to retrieve ENI info")
Expand Down Expand Up @@ -453,11 +453,16 @@ func (c *IPAMContext) decreaseIPPool(interval time.Duration) {

// tryFreeENI always tries to free one ENI
func (c *IPAMContext) tryFreeENI() {
if c.isTerminating() {
log.Debug("AWS CNI is terminating, not detaching any ENIs")
return
}

eni := c.dataStore.RemoveUnusedENIFromStore(c.warmIPTarget)
if eni == "" {
log.Info("No ENI to remove, all ENIs have IPs in use")
return
}

log.Debugf("Start freeing ENI %s", eni)
err := c.awsClient.FreeENI(eni)
if err != nil {
Expand Down Expand Up @@ -560,6 +565,11 @@ func (c *IPAMContext) increaseIPPool() {
return
}

if c.isTerminating() {
log.Debug("AWS CNI is terminating, will not try to attach any new IPs or ENIs right now")
return
}

// Try to add more IPs to existing ENIs first.
increasedPool, err := c.tryAssignIPs()
if err != nil {
Expand Down Expand Up @@ -1050,6 +1060,15 @@ func (c *IPAMContext) ipTargetState() (short int, over int, enabled bool) {
return short, over, true
}

// setTerminating atomically sets the terminating flag.
func (c *IPAMContext) setTerminating() {
atomic.StoreInt32(&c.terminating, 1)
}

func (c *IPAMContext) isTerminating() bool {
return atomic.LoadInt32(&c.terminating) > 0
}

// GetConfigForDebug returns the active values of the configuration env vars (for debugging purposes).
func GetConfigForDebug() map[string]interface{} {
return map[string]interface{}{
Expand Down
6 changes: 6 additions & 0 deletions ipamd/ipamd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func TestNodeInit(t *testing.T) {
maxENI: 4,
warmENITarget: 1,
warmIPTarget: 3,
primaryIP: make(map[string]string),
terminating: int32(0),
networkClient: mockNetwork}

eni1 := awsutils.ENIMetadata{
Expand Down Expand Up @@ -175,6 +177,7 @@ func testIncreaseIPPool(t *testing.T, useENIConfig bool) {
useCustomNetworking: UseCustomNetworkCfg(),
eniConfig: mockENIConfig,
primaryIP: make(map[string]string),
terminating: int32(0),
}

mockContext.dataStore = datastore.NewDataStore()
Expand Down Expand Up @@ -252,6 +255,7 @@ func TestTryAddIPToENI(t *testing.T) {
networkClient: mockNetwork,
eniConfig: mockENIConfig,
primaryIP: make(map[string]string),
terminating: int32(0),
}

mockContext.dataStore = datastore.NewDataStore()
Expand Down Expand Up @@ -310,6 +314,7 @@ func TestNodeIPPoolReconcile(t *testing.T) {
k8sClient: mockK8S,
networkClient: mockNetwork,
primaryIP: make(map[string]string),
terminating: int32(0),
}

mockContext.dataStore = datastore.NewDataStore()
Expand Down Expand Up @@ -397,6 +402,7 @@ func TestGetWarmIPTargetState(t *testing.T) {
k8sClient: mockK8S,
networkClient: mockNetwork,
primaryIP: make(map[string]string),
terminating: int32(0),
}

mockContext.dataStore = datastore.NewDataStore()
Expand Down
21 changes: 21 additions & 0 deletions ipamd/rpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ package ipamd

import (
"net"
"os"
"os/signal"
"syscall"

"github.com/pkg/errors"

Expand Down Expand Up @@ -124,9 +127,27 @@ func (c *IPAMContext) RunRPCHandler() error {
healthpb.RegisterHealthServer(s, hs)
// Register reflection service on gRPC server.
reflection.Register(s)
// Add shutdown hook
go c.shutdownListener(s)
if err := s.Serve(lis); err != nil {
log.Errorf("Failed to start server on gRPC port: %v", err)
return errors.Wrap(err, "ipamd: failed to start server on gPRC port")
}
return nil
}

// shutdownListener - Listen to signals and set ipamd to be in status "terminating"
func (c *IPAMContext) shutdownListener(s *grpc.Server) {
log.Info("Setting up shutdown hook.")
sig := make(chan os.Signal, 1)

// Interrupt signal sent from terminal
signal.Notify(sig, syscall.SIGINT)
// Terminate signal sent from Kubernetes
signal.Notify(sig, syscall.SIGTERM)

<-sig
log.Info("Received shutdown signal, setting 'terminating' to true")
// We received an interrupt signal, shut down.
c.setTerminating()
}
2 changes: 1 addition & 1 deletion scripts/install-aws.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ if [[ -f /host/etc/cni/net.d/aws.conf ]]; then
fi

echo "====== Starting amazon-k8s-agent ======"
/app/aws-k8s-agent
exec /app/aws-k8s-agent