Skip to content

Commit

Permalink
feat: CNS/CNI async pod delete (#2183)
Browse files Browse the repository at this point in the history
* initial changes for cni/cns delete deadlock

* add logs and set watcher path

* working fswatcher, removing extra debug lines

* watcher changes for azure-ipam

* remove additional logger from fsnotify and address comments

* /deleteIDs directory as part of cnsconfig

* add feature flag for async delete

* adds some unit test + remove changes for azure-ipam(split pr, dependency conflicts)

* update ut

* update uts

* swift configmap update

* fix configmap for test

* addressing comments

* fix lint

* adding cause to connection error struct

* connectionerr lint

* addressing comments, change watchfs to watcher method

* add ctx to releaseIP func

* log containerID in failure to add watcher, exit select if context is cancelled

* fix logs in network.go after rebase

* catch release ip error in invoker_cns.go

* retry on failure to release ip

* lint fix

* rework asyncdelete watcher

Signed-off-by: Evan Baker <[email protected]>

* include podinterfaceID in file for releaseIP

* close file before delete

---------

Signed-off-by: Evan Baker <[email protected]>
Co-authored-by: Evan Baker <[email protected]>
  • Loading branch information
camrynl and rbtr authored Sep 26, 2023
1 parent 5d25eb1 commit 5370f32
Show file tree
Hide file tree
Showing 8 changed files with 378 additions and 14 deletions.
19 changes: 17 additions & 2 deletions cni/network/invoker_cns.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/Azure/azure-container-networking/cni"
"github.com/Azure/azure-container-networking/cni/util"
"github.com/Azure/azure-container-networking/cns"
cnscli "github.com/Azure/azure-container-networking/cns/client"
"github.com/Azure/azure-container-networking/cns/fsnotify"
"github.com/Azure/azure-container-networking/iptables"
"github.com/Azure/azure-container-networking/log"
"github.com/Azure/azure-container-networking/network"
Expand All @@ -17,11 +19,13 @@ import (
cniTypes "github.com/containernetworking/cni/pkg/types"
cniTypesCurr "github.com/containernetworking/cni/pkg/types/100"
"github.com/pkg/errors"
"go.uber.org/zap"
)

var (
errEmptyCNIArgs = errors.New("empty CNI cmd args not allowed")
errInvalidArgs = errors.New("invalid arg(s)")
watcherPath = "/var/run/azure-vnet/deleteIDs"
)

type CNSIPAMInvoker struct {
Expand Down Expand Up @@ -242,8 +246,19 @@ func (invoker *CNSIPAMInvoker) Delete(address *net.IPNet, nwCfg *cni.NetworkConf
}

if err := invoker.cnsClient.ReleaseIPAddress(context.TODO(), req); err != nil {
return errors.Wrap(err, fmt.Sprintf("failed to release IP %v with err ", address)+"%w")
var connectionErr *cnscli.ConnectionFailureErr
if errors.As(err, &connectionErr) {
addErr := fsnotify.AddFile(req.PodInterfaceID, args.ContainerID, watcherPath)
if addErr != nil {
log.Errorf("Failed to add file to watcher", zap.String("podInterfaceID", req.PodInterfaceID), zap.String("containerID", args.ContainerID), zap.Error(addErr))
return errors.Wrap(addErr, fmt.Sprintf("failed to add file to watcher with containerID %s and podInterfaceID %s", args.ContainerID, req.PodInterfaceID))
}
return nil
}
log.Errorf("Failed to release IP address",
zap.String("infracontainerid", req.InfraContainerID),
zap.Error(err))
return errors.Wrap(err, fmt.Sprintf("failed to release IP %v using ReleaseIPs with err ", req.DesiredIPAddress)+"%w")
}

return nil
}
12 changes: 11 additions & 1 deletion cns/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ type Client struct {
routes map[string]url.URL
}

type ConnectionFailureErr struct {
cause error
}

func (e *ConnectionFailureErr) Error() string {
return e.cause.Error()
}

// New returns a new CNS client configured with the passed URL and timeout.
func New(baseURL string, requestTimeout time.Duration) (*Client, error) {
if baseURL == "" {
Expand Down Expand Up @@ -291,7 +299,9 @@ func (c *Client) ReleaseIPAddress(ctx context.Context, ipconfig cns.IPConfigRequ
req.Header.Set(headerContentType, contentTypeJSON)
res, err := c.client.Do(req)
if err != nil {
return errors.Wrap(err, "http request failed")
return &ConnectionFailureErr{
cause: err,
}
}
defer res.Body.Close()

Expand Down
26 changes: 16 additions & 10 deletions cns/configuration/configuration.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// Copyright Microsoft. All rights reserved.
package configuration

import (
Expand All @@ -20,12 +19,24 @@ const (
)

type CNSConfig struct {
AsyncPodDeletePath string
CNIConflistFilepath string
CNIConflistScenario string
ChannelMode string
EnableAsyncPodDelete bool
EnableCNIConflistGeneration bool
EnablePprof bool
EnableSubnetScarcity bool
EnableSwiftV2 bool
InitializeFromCNI bool
KeyVaultSettings KeyVaultSettings
MSISettings MSISettings
ManageEndpointState bool
ManagedSettings ManagedSettings
MellanoxMonitorIntervalSecs int
MetricsBindAddress string
PopulateHomeAzCacheRetryIntervalSecs int
ProgramSNATIPTables bool
SyncHostNCTimeoutMs int
SyncHostNCVersionIntervalMs int
TLSCertificatePath string
Expand All @@ -34,16 +45,8 @@ type CNSConfig struct {
TLSSubjectName string
TelemetrySettings TelemetrySettings
UseHTTPS bool
WatchPods bool
WireserverIP string
KeyVaultSettings KeyVaultSettings
MSISettings MSISettings
ProgramSNATIPTables bool
ManageEndpointState bool
CNIConflistScenario string
EnableCNIConflistGeneration bool
CNIConflistFilepath string
PopulateHomeAzCacheRetryIntervalSecs int
MellanoxMonitorIntervalSecs int
}

type TelemetrySettings struct {
Expand Down Expand Up @@ -196,4 +199,7 @@ func SetCNSConfigDefaults(config *CNSConfig) {
if config.WireserverIP == "" {
config.WireserverIP = "168.63.129.16"
}
if config.AsyncPodDeletePath == "" {
config.AsyncPodDeletePath = "/var/run/azure-vnet/deleteIDs"
}
}
2 changes: 2 additions & 0 deletions cns/configuration/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func TestSetCNSConfigDefaults(t *testing.T) {
},
PopulateHomeAzCacheRetryIntervalSecs: 15,
WireserverIP: "168.63.129.16",
AsyncPodDeletePath: "/var/run/azure-vnet/deleteIDs",
},
},
{
Expand Down Expand Up @@ -252,6 +253,7 @@ func TestSetCNSConfigDefaults(t *testing.T) {
},
PopulateHomeAzCacheRetryIntervalSecs: 10,
WireserverIP: "168.63.129.16",
AsyncPodDeletePath: "/var/run/azure-vnet/deleteIDs",
},
},
}
Expand Down
219 changes: 219 additions & 0 deletions cns/fsnotify/fsnotify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
package fsnotify

import (
"context"
"io"
"os"
"sync"
"time"

"github.com/Azure/azure-container-networking/cns"
"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
"go.uber.org/zap"
)

type releaseIPsClient interface {
ReleaseIPAddress(ctx context.Context, ipconfig cns.IPConfigRequest) error
}

type watcher struct {
cli releaseIPsClient
path string
log *zap.Logger

pendingDelete map[string]struct{}
lock sync.Mutex
}

// Create the AsyncDelete watcher.
func New(cli releaseIPsClient, path string, logger *zap.Logger) *watcher { //nolint
// Add directory where intended deletes are kept
if err := os.Mkdir(path, 0o755); err != nil { //nolint
logger.Error("error making directory", zap.String("path", path), zap.Error(err))
}
return &watcher{
cli: cli,
path: path,
log: logger,
pendingDelete: make(map[string]struct{}),
}
}

// releaseAll locks and iterates the pendingDeletes map and calls CNS to
// release the IP for any Pod containerIDs present. When an IP is released
// that entry is removed from the map and the file is deleted. If the file
// fails to delete, we still remove it from the map so that we don't retry
// it during the life of this process, but we may retry it on a subsequent
// invocation of CNS. This is okay because calling releaseIP on an already
// processed containerID is a no-op, and we may be able to delete the file
// during that future retry.
func (w *watcher) releaseAll(ctx context.Context) {
w.lock.Lock()
defer w.lock.Unlock()
for containerID := range w.pendingDelete {
// read file contents
filepath := w.path + "/" + containerID
file, err := os.Open(filepath)
if err != nil {
w.log.Error("failed to open file", zap.Error(err))
}

data, errReadingFile := io.ReadAll(file)
if errReadingFile != nil {
w.log.Error("failed to read file content", zap.Error(errReadingFile))
}
file.Close()
podInterfaceID := string(data)

w.log.Info("releasing IP for missed delete", zap.String("podInterfaceID", podInterfaceID), zap.String("containerID", containerID))
if err := w.releaseIP(ctx, podInterfaceID, containerID); err != nil {
w.log.Error("failed to release IP for missed delete", zap.String("containerID", containerID), zap.Error(err))
continue
}
w.log.Info("successfully released IP for missed delete", zap.String("containerID", containerID))
delete(w.pendingDelete, containerID)
if err := removeFile(containerID, w.path); err != nil {
w.log.Error("failed to remove file for missed delete", zap.Error(err))
}
}
}

// watchPendingDelete periodically checks the map for pending release IPs
// and calls releaseAll to process the contents when present.
func (w *watcher) watchPendingDelete(ctx context.Context) error {
ticker := time.NewTicker(15 * time.Second) //nolint
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "exiting watchPendingDelete")
case <-ticker.C:
n := len(w.pendingDelete)
if n == 0 {
continue
}
w.log.Info("processing pending missed deletes", zap.Int("count", n))
w.releaseAll(ctx)
}
}
}

// watchFS starts the fsnotify watcher and handles events for file creation
// or deletion in the missed pending delete directory. A file creation event
// indicates that CNS missed the delete call for a containerID and needs
// to process the release IP asynchronously.
func (w *watcher) watchFS(ctx context.Context) error {
// Create new fs watcher.
watcher, err := fsnotify.NewWatcher()
if err != nil {
return errors.Wrap(err, "error creating fsnotify watcher")
}
defer watcher.Close()

err = watcher.Add(w.path)
if err != nil {
w.log.Error("failed to add path to fsnotify watcher", zap.String("path", w.path), zap.Error(err))
}
// Start listening for events.
w.log.Info("listening for events from fsnotify watcher")
for {
select {
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "exiting watchFS")
case event, ok := <-watcher.Events:
if !ok {
return errors.New("fsnotify watcher closed")
}
if !event.Has(fsnotify.Create) {
// discard any event that is not a file Create
continue
}
w.log.Info("received create event", zap.String("event", event.Name))
w.lock.Lock()
w.pendingDelete[event.Name] = struct{}{}
w.lock.Unlock()
case watcherErr := <-watcher.Errors:
w.log.Error("fsnotify watcher error", zap.Error(watcherErr))
}
}
}

// readFS lists the directory and enqueues any missed deletes that are already
// present on-disk.
func (w *watcher) readFS() error {
w.log.Info("listing directory", zap.String("path", w.path))
dirContents, err := os.ReadDir(w.path)
if err != nil {
w.log.Error("error reading deleteID directory", zap.String("path", w.path), zap.Error(err))
return errors.Wrapf(err, "failed to read %s", w.path)
}
if len(dirContents) == 0 {
w.log.Info("no missed deletes found")
return nil
}
w.lock.Lock()
for _, file := range dirContents {
w.log.Info("adding missed delete from file", zap.String("name", file.Name()))
w.pendingDelete[file.Name()] = struct{}{}
}
w.lock.Unlock()
return nil
}

// WatchFS starts the filesystem watcher to handle async Pod deletes.
// Blocks until the context is closed; returns underlying fsnotify errors
// if something goes fatally wrong.
func (w *watcher) Start(ctx context.Context) error {
errs := make(chan error)
// Start watching for enqueued missed deletes so that we process them as soon as they arrive.
go func(errs chan<- error) {
errs <- w.watchPendingDelete(ctx)
}(errs)

// Start watching for changes to the filesystem so that we don't miss any async deletes.
go func(errs chan<- error) {
errs <- w.watchFS(ctx)
}(errs)

// Read the directory to enqueue any missed deletes that are already present on-disk.
if err := w.readFS(); err != nil {
return err
}

// block until one of the goroutines returns an error
err := <-errs
return err
}

// AddFile creates new file using the containerID as name
func AddFile(podInterfaceID, containerID, path string) error {
filepath := path + "/" + containerID
f, err := os.Create(filepath)
if err != nil {
return errors.Wrap(err, "error creating file")
}
_, writeErr := f.WriteString(podInterfaceID)
if writeErr != nil {
return errors.Wrap(writeErr, "error writing to file")
}
return errors.Wrap(f.Close(), "error adding file to directory")
}

// removeFile removes the file based on containerID
func removeFile(containerID, path string) error {
filepath := path + "/" + containerID
if err := os.Remove(filepath); err != nil {
return errors.Wrap(err, "error deleting file")
}
return nil
}

// call cns ReleaseIPs
func (w *watcher) releaseIP(ctx context.Context, podInterfaceID, containerID string) error {
ipconfigreq := &cns.IPConfigRequest{
PodInterfaceID: podInterfaceID,
InfraContainerID: containerID,
}
return errors.Wrap(w.cli.ReleaseIPAddress(ctx, *ipconfigreq), "failed to release IP from CNS")
}
Loading

0 comments on commit 5370f32

Please sign in to comment.