diff --git a/cni/network/invoker_cns.go b/cni/network/invoker_cns.go index c484824d7e..fa10f5283c 100644 --- a/cni/network/invoker_cns.go +++ b/cni/network/invoker_cns.go @@ -9,6 +9,8 @@ import ( "github.com/Azure/azure-container-networking/cni" "github.com/Azure/azure-container-networking/cni/util" "github.com/Azure/azure-container-networking/cns" + cnscli "github.com/Azure/azure-container-networking/cns/client" + "github.com/Azure/azure-container-networking/cns/fsnotify" "github.com/Azure/azure-container-networking/iptables" "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/network" @@ -17,11 +19,13 @@ import ( cniTypes "github.com/containernetworking/cni/pkg/types" cniTypesCurr "github.com/containernetworking/cni/pkg/types/100" "github.com/pkg/errors" + "go.uber.org/zap" ) var ( errEmptyCNIArgs = errors.New("empty CNI cmd args not allowed") errInvalidArgs = errors.New("invalid arg(s)") + watcherPath = "/var/run/azure-vnet/deleteIDs" ) type CNSIPAMInvoker struct { @@ -242,8 +246,19 @@ func (invoker *CNSIPAMInvoker) Delete(address *net.IPNet, nwCfg *cni.NetworkConf } if err := invoker.cnsClient.ReleaseIPAddress(context.TODO(), req); err != nil { - return errors.Wrap(err, fmt.Sprintf("failed to release IP %v with err ", address)+"%w") + var connectionErr *cnscli.ConnectionFailureErr + if errors.As(err, &connectionErr) { + addErr := fsnotify.AddFile(req.PodInterfaceID, args.ContainerID, watcherPath) + if addErr != nil { + log.Errorf("Failed to add file to watcher", zap.String("podInterfaceID", req.PodInterfaceID), zap.String("containerID", args.ContainerID), zap.Error(addErr)) + return errors.Wrap(addErr, fmt.Sprintf("failed to add file to watcher with containerID %s and podInterfaceID %s", args.ContainerID, req.PodInterfaceID)) + } + return nil + } + log.Errorf("Failed to release IP address", + zap.String("infracontainerid", req.InfraContainerID), + zap.Error(err)) + return errors.Wrap(err, fmt.Sprintf("failed to release 
IP %v using ReleaseIPs with err ", req.DesiredIPAddress)+"%w")
 	}
-
 	return nil
 }
diff --git a/cns/client/client.go b/cns/client/client.go
index 41b9e7ae56..f78aa934dd 100644
--- a/cns/client/client.go
+++ b/cns/client/client.go
@@ -54,6 +54,24 @@ type Client struct {
 	routes map[string]url.URL
 }
 
+// ConnectionFailureErr reports that a request could not be completed because
+// the underlying HTTP round trip failed. ReleaseIPAddress returns it when
+// client.Do returns an error; callers can detect it with errors.As.
+type ConnectionFailureErr struct {
+	cause error
+}
+
+// Error returns the message of the underlying transport error.
+func (e *ConnectionFailureErr) Error() string {
+	return e.cause.Error()
+}
+
+// Unwrap exposes the underlying transport error so that errors.Is and
+// errors.As can match the original cause through this wrapper.
+func (e *ConnectionFailureErr) Unwrap() error {
+	return e.cause
+}
+
 // New returns a new CNS client configured with the passed URL and timeout.
 func New(baseURL string, requestTimeout time.Duration) (*Client, error) {
 	if baseURL == "" {
@@ -291,7 +309,9 @@ func (c *Client) ReleaseIPAddress(ctx context.Context, ipconfig cns.IPConfigRequ
 	req.Header.Set(headerContentType, contentTypeJSON)
 	res, err := c.client.Do(req)
 	if err != nil {
-		return errors.Wrap(err, "http request failed")
+		return &ConnectionFailureErr{
+			cause: err,
+		}
 	}
 	defer res.Body.Close()
 
diff --git a/cns/configuration/configuration.go b/cns/configuration/configuration.go
index 2753b4f1b5..bdb533a138 100644
--- a/cns/configuration/configuration.go
+++ b/cns/configuration/configuration.go
@@ -1,4 +1,3 @@
-// Copyright Microsoft. All rights reserved.
package configuration import ( @@ -20,12 +19,24 @@ const ( ) type CNSConfig struct { + AsyncPodDeletePath string + CNIConflistFilepath string + CNIConflistScenario string ChannelMode string + EnableAsyncPodDelete bool + EnableCNIConflistGeneration bool EnablePprof bool EnableSubnetScarcity bool + EnableSwiftV2 bool InitializeFromCNI bool + KeyVaultSettings KeyVaultSettings + MSISettings MSISettings + ManageEndpointState bool ManagedSettings ManagedSettings + MellanoxMonitorIntervalSecs int MetricsBindAddress string + PopulateHomeAzCacheRetryIntervalSecs int + ProgramSNATIPTables bool SyncHostNCTimeoutMs int SyncHostNCVersionIntervalMs int TLSCertificatePath string @@ -34,16 +45,8 @@ type CNSConfig struct { TLSSubjectName string TelemetrySettings TelemetrySettings UseHTTPS bool + WatchPods bool WireserverIP string - KeyVaultSettings KeyVaultSettings - MSISettings MSISettings - ProgramSNATIPTables bool - ManageEndpointState bool - CNIConflistScenario string - EnableCNIConflistGeneration bool - CNIConflistFilepath string - PopulateHomeAzCacheRetryIntervalSecs int - MellanoxMonitorIntervalSecs int } type TelemetrySettings struct { @@ -196,4 +199,7 @@ func SetCNSConfigDefaults(config *CNSConfig) { if config.WireserverIP == "" { config.WireserverIP = "168.63.129.16" } + if config.AsyncPodDeletePath == "" { + config.AsyncPodDeletePath = "/var/run/azure-vnet/deleteIDs" + } } diff --git a/cns/configuration/configuration_test.go b/cns/configuration/configuration_test.go index 3e2e40278e..cf86882fc5 100644 --- a/cns/configuration/configuration_test.go +++ b/cns/configuration/configuration_test.go @@ -208,6 +208,7 @@ func TestSetCNSConfigDefaults(t *testing.T) { }, PopulateHomeAzCacheRetryIntervalSecs: 15, WireserverIP: "168.63.129.16", + AsyncPodDeletePath: "/var/run/azure-vnet/deleteIDs", }, }, { @@ -252,6 +253,7 @@ func TestSetCNSConfigDefaults(t *testing.T) { }, PopulateHomeAzCacheRetryIntervalSecs: 10, WireserverIP: "168.63.129.16", + AsyncPodDeletePath: 
"/var/run/azure-vnet/deleteIDs",
 			},
 		},
 	}
diff --git a/cns/fsnotify/fsnotify.go b/cns/fsnotify/fsnotify.go
new file mode 100644
index 0000000000..cd6bff28dc
--- /dev/null
+++ b/cns/fsnotify/fsnotify.go
@@ -0,0 +1,219 @@
+package fsnotify
+
+import (
+	"context"
+	"io"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/Azure/azure-container-networking/cns"
+	"github.com/fsnotify/fsnotify"
+	"github.com/pkg/errors"
+	"go.uber.org/zap"
+)
+
+// releaseIPsClient is the one CNS client call the watcher depends on:
+// asking CNS to release the IP described by an IPConfigRequest.
+type releaseIPsClient interface {
+	ReleaseIPAddress(ctx context.Context, ipconfig cns.IPConfigRequest) error
+}
+
+// watcher tracks missed Pod deletes recorded as files under path and
+// releases their IPs through cli.
+type watcher struct {
+	cli  releaseIPsClient
+	path string
+	log  *zap.Logger
+
+	// pendingDelete is the set of file names (containerIDs) still waiting to
+	// be released; lock guards it.
+	pendingDelete map[string]struct{}
+	lock          sync.Mutex
+}
+
+// New creates the AsyncDelete watcher for the directory at path, creating the
+// directory first. A failure to create the directory is logged and the
+// watcher is returned anyway.
+func New(cli releaseIPsClient, path string, logger *zap.Logger) *watcher { //nolint
+	// Add directory where intended deletes are kept. MkdirAll is used because
+	// it succeeds when the directory already exists (e.g. after a restart)
+	// and creates any missing parent directories.
+	if err := os.MkdirAll(path, 0o755); err != nil { //nolint
+		logger.Error("error making directory", zap.String("path", path), zap.Error(err))
+	}
+	return &watcher{
+		cli:           cli,
+		path:          path,
+		log:           logger,
+		pendingDelete: make(map[string]struct{}),
+	}
+}
+
+// releaseAll locks and iterates the pendingDeletes map and calls CNS to
+// release the IP for any Pod containerIDs present. When an IP is released
+// that entry is removed from the map and the file is deleted. If the file
+// fails to delete, we still remove it from the map so that we don't retry
+// it during the life of this process, but we may retry it on a subsequent
+// invocation of CNS. This is okay because calling releaseIP on an already
+// processed containerID is a no-op, and we may be able to delete the file
+// during that future retry.
+//
+// An entry whose file cannot be opened or read is never sent to CNS, because
+// without the file contents there is no podInterfaceID to release. It stays
+// queued for the next pass, unless the file no longer exists, in which case
+// the entry is dropped since it can never be read.
+func (w *watcher) releaseAll(ctx context.Context) {
+	w.lock.Lock()
+	defer w.lock.Unlock()
+	for containerID := range w.pendingDelete {
+		// read file contents
+		recordPath := w.path + "/" + containerID
+		file, err := os.Open(recordPath)
+		if err != nil {
+			w.log.Error("failed to open file", zap.String("path", recordPath), zap.Error(err))
+			if os.IsNotExist(err) {
+				// The record is gone, so retrying can never succeed.
+				delete(w.pendingDelete, containerID)
+			}
+			continue
+		}
+
+		data, errReadingFile := io.ReadAll(file)
+		file.Close()
+		if errReadingFile != nil {
+			w.log.Error("failed to read file content", zap.String("path", recordPath), zap.Error(errReadingFile))
+			continue
+		}
+		podInterfaceID := string(data)
+
+		w.log.Info("releasing IP for missed delete", zap.String("podInterfaceID", podInterfaceID), zap.String("containerID", containerID))
+		if err := w.releaseIP(ctx, podInterfaceID, containerID); err != nil {
+			w.log.Error("failed to release IP for missed delete", zap.String("containerID", containerID), zap.Error(err))
+			continue
+		}
+		w.log.Info("successfully released IP for missed delete", zap.String("containerID", containerID))
+		delete(w.pendingDelete, containerID)
+		if err := removeFile(containerID, w.path); err != nil {
+			w.log.Error("failed to remove file for missed delete", zap.Error(err))
+		}
+	}
+}
+
+// watchPendingDelete periodically checks the map for pending release IPs
+// and calls releaseAll to process the contents when present. It returns only
+// when ctx is done, with the context error wrapped.
+func (w *watcher) watchPendingDelete(ctx context.Context) error {
+	ticker := time.NewTicker(15 * time.Second) //nolint
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return errors.Wrap(ctx.Err(), "exiting watchPendingDelete")
+		case <-ticker.C:
+			// The map is written concurrently by watchFS and readFS, so even
+			// this length check has to happen under the lock.
+			w.lock.Lock()
+			n := len(w.pendingDelete)
+			w.lock.Unlock()
+			if n == 0 {
+				continue
+			}
+			w.log.Info("processing pending missed deletes", zap.Int("count", n))
+			w.releaseAll(ctx)
+		}
+	}
+}
+
+// watchFS starts the fsnotify watcher and handles events for file creation
+// or deletion in the missed pending delete directory. A file creation event
+// indicates that CNS missed the delete call for a containerID and needs
+// to process the release IP asynchronously.
+//
+// It returns an error when the watcher cannot be created, when the directory
+// cannot be added to it, when either of its channels is closed, or when ctx
+// is done.
+func (w *watcher) watchFS(ctx context.Context) error {
+	// Create new fs watcher.
+	fsw, err := fsnotify.NewWatcher()
+	if err != nil {
+		return errors.Wrap(err, "error creating fsnotify watcher")
+	}
+	defer fsw.Close()
+
+	// If the directory cannot be watched no event will ever arrive, so fail
+	// instead of listening on a watcher that watches nothing.
+	if err := fsw.Add(w.path); err != nil {
+		w.log.Error("failed to add path to fsnotify watcher", zap.String("path", w.path), zap.Error(err))
+		return errors.Wrapf(err, "failed to watch %s", w.path)
+	}
+	// Start listening for events.
+	w.log.Info("listening for events from fsnotify watcher")
+	for {
+		select {
+		case <-ctx.Done():
+			return errors.Wrap(ctx.Err(), "exiting watchFS")
+		case event, ok := <-fsw.Events:
+			if !ok {
+				return errors.New("fsnotify watcher closed")
+			}
+			if !event.Has(fsnotify.Create) {
+				// discard any event that is not a file Create
+				continue
+			}
+			w.log.Info("received create event", zap.String("event", event.Name))
+			// event.Name is the path of the created file, while the rest of
+			// the watcher keys pendingDelete by bare file name (see readFS),
+			// so strip the directory part before enqueueing.
+			w.lock.Lock()
+			w.pendingDelete[baseName(event.Name)] = struct{}{}
+			w.lock.Unlock()
+		case watcherErr, ok := <-fsw.Errors:
+			if !ok {
+				// A closed channel would otherwise be received from forever.
+				return errors.New("fsnotify watcher closed")
+			}
+			w.log.Error("fsnotify watcher error", zap.Error(watcherErr))
+		}
+	}
+}
+
+// baseName returns the final element of name: everything after the last path
+// separator, or name itself when it contains no separator.
+func baseName(name string) string {
+	for i := len(name) - 1; i >= 0; i-- {
+		if os.IsPathSeparator(name[i]) {
+			return name[i+1:]
+		}
+	}
+	return name
+}
+
+// readFS lists the directory and enqueues any missed deletes that are already
+// present on-disk. It returns an error only when the directory cannot be read.
+func (w *watcher) readFS() error {
+	w.log.Info("listing directory", zap.String("path", w.path))
+	dirContents, err := os.ReadDir(w.path)
+	if err != nil {
+		w.log.Error("error reading deleteID directory", zap.String("path", w.path), zap.Error(err))
+		return errors.Wrapf(err, "failed to read %s", w.path)
+	}
+	if len(dirContents) == 0 {
+		w.log.Info("no missed deletes found")
+		return nil
+	}
+	w.lock.Lock()
+	for _, file := range dirContents {
+		w.log.Info("adding missed delete from file", zap.String("name", file.Name()))
+		w.pendingDelete[file.Name()] = struct{}{}
+	}
+	w.lock.Unlock()
+	return nil
+}
+
+// Start starts the filesystem watcher to handle async Pod deletes.
+// Blocks until the context is closed; returns underlying fsnotify errors
+// if something goes fatally wrong.
+func (w *watcher) Start(ctx context.Context) error { + errs := make(chan error) + // Start watching for enqueued missed deletes so that we process them as soon as they arrive. + go func(errs chan<- error) { + errs <- w.watchPendingDelete(ctx) + }(errs) + + // Start watching for changes to the filesystem so that we don't miss any async deletes. + go func(errs chan<- error) { + errs <- w.watchFS(ctx) + }(errs) + + // Read the directory to enqueue any missed deletes that are already present on-disk. + if err := w.readFS(); err != nil { + return err + } + + // block until one of the goroutines returns an error + err := <-errs + return err +} + +// AddFile creates new file using the containerID as name +func AddFile(podInterfaceID, containerID, path string) error { + filepath := path + "/" + containerID + f, err := os.Create(filepath) + if err != nil { + return errors.Wrap(err, "error creating file") + } + _, writeErr := f.WriteString(podInterfaceID) + if writeErr != nil { + return errors.Wrap(writeErr, "error writing to file") + } + return errors.Wrap(f.Close(), "error adding file to directory") +} + +// removeFile removes the file based on containerID +func removeFile(containerID, path string) error { + filepath := path + "/" + containerID + if err := os.Remove(filepath); err != nil { + return errors.Wrap(err, "error deleting file") + } + return nil +} + +// call cns ReleaseIPs +func (w *watcher) releaseIP(ctx context.Context, podInterfaceID, containerID string) error { + ipconfigreq := &cns.IPConfigRequest{ + PodInterfaceID: podInterfaceID, + InfraContainerID: containerID, + } + return errors.Wrap(w.cli.ReleaseIPAddress(ctx, *ipconfigreq), "failed to release IP from CNS") +} diff --git a/cns/fsnotify/fsnotify_test.go b/cns/fsnotify/fsnotify_test.go new file mode 100644 index 0000000000..d5e081321f --- /dev/null +++ b/cns/fsnotify/fsnotify_test.go @@ -0,0 +1,87 @@ +package fsnotify + +import ( + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func 
TestAddFile(t *testing.T) {
+	// Use a per-test temporary directory: it needs no special privileges,
+	// leaves nothing behind on the host, and is removed automatically.
+	dir := t.TempDir()
+
+	type args struct {
+		podInterfaceID string
+		containerID    string
+		path           string
+	}
+	tests := []struct {
+		name    string
+		args    args
+		wantErr bool
+	}{
+		{
+			name: "no such directory, add fail",
+			args: args{
+				podInterfaceID: "123",
+				containerID:    "67890",
+				path:           dir + "/missing",
+			},
+			wantErr: true,
+		},
+		{
+			name: "added file to directory",
+			args: args{
+				podInterfaceID: "345",
+				containerID:    "12345",
+				path:           dir,
+			},
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			err := AddFile(tt.args.podInterfaceID, tt.args.containerID, tt.args.path)
+			if tt.wantErr {
+				require.Error(t, err)
+				return
+			}
+			require.NoError(t, err)
+			// The file must hold exactly the podInterfaceID that was passed in.
+			got, err := os.ReadFile(tt.args.path + "/" + tt.args.containerID)
+			require.NoError(t, err)
+			require.Equal(t, tt.args.podInterfaceID, string(got))
+		})
+	}
+}
+
+func TestWatcherRemoveFile(t *testing.T) {
+	dir := t.TempDir()
+	// Seed the file that the success case is expected to remove.
+	require.NoError(t, os.WriteFile(dir+"/67890", []byte("345"), 0o600))
+
+	type args struct {
+		containerID string
+		path        string
+	}
+	tests := []struct {
+		name    string
+		args    args
+		wantErr bool
+	}{
+		{
+			name: "remove file fail",
+			args: args{
+				containerID: "12345",
+				path:        dir + "/missing",
+			},
+			wantErr: true,
+		},
+		{
+			name: "removed file from directory",
+			args: args{
+				containerID: "67890",
+				path:        dir,
+			},
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			err := removeFile(tt.args.containerID, tt.args.path)
+			if tt.wantErr {
+				require.Error(t, err)
+				return
+			}
+			require.NoError(t, err)
+			// The file must really be gone afterwards.
+			_, statErr := os.Stat(tt.args.path + "/" + tt.args.containerID)
+			require.True(t, os.IsNotExist(statErr))
+		})
+	}
+}
diff --git a/cns/service/main.go b/cns/service/main.go
index 56c4204fd5..b12436c2a5 100644
--- a/cns/service/main.go
+++ b/cns/service/main.go
@@ -22,11 +22,13 @@ import (
 	"github.com/Azure/azure-container-networking/cnm/ipam"
 	"github.com/Azure/azure-container-networking/cnm/network"
 	"github.com/Azure/azure-container-networking/cns"
+	cnsclient "github.com/Azure/azure-container-networking/cns/client"
 	cnscli 
"github.com/Azure/azure-container-networking/cns/cmd/cli" "github.com/Azure/azure-container-networking/cns/cniconflist" "github.com/Azure/azure-container-networking/cns/cnireconciler" "github.com/Azure/azure-container-networking/cns/common" "github.com/Azure/azure-container-networking/cns/configuration" + "github.com/Azure/azure-container-networking/cns/fsnotify" "github.com/Azure/azure-container-networking/cns/healthserver" "github.com/Azure/azure-container-networking/cns/hnsclient" "github.com/Azure/azure-container-networking/cns/ipampool" @@ -82,6 +84,8 @@ const ( // envVarEnableCNIConflistGeneration enables cni conflist generation if set (value doesn't matter) envVarEnableCNIConflistGeneration = "CNS_ENABLE_CNI_CONFLIST_GENERATION" + + cnsReqTimeout = 15 * time.Second ) type cniConflistScenario string @@ -793,6 +797,25 @@ func main() { } } + if cnsconfig.EnableAsyncPodDelete { + // Start fs watcher here + cnsclient, err := cnsclient.New("", cnsReqTimeout) //nolint + if err != nil { + z.Error("failed to create cnsclient", zap.Error(err)) + } + go func() { + for { + z.Info("starting fsnotify watcher to process missed Pod deletes") + w := fsnotify.New(cnsclient, cnsconfig.AsyncPodDeletePath, z) + if err := w.Start(rootCtx); err != nil { + z.Error("failed to start fsnotify watcher, will retry", zap.Error(err)) + time.Sleep(time.Minute) + continue + } + } + }() + } + if !disableTelemetry { go logger.SendHeartBeat(rootCtx, cnsconfig.TelemetrySettings.HeartBeatIntervalInMins) go httpRestService.SendNCSnapShotPeriodically(rootCtx, cnsconfig.TelemetrySettings.SnapshotIntervalInMins) diff --git a/test/integration/manifests/cnsconfig/swiftconfigmap.yaml b/test/integration/manifests/cnsconfig/swiftconfigmap.yaml index 14a0359ffe..76bc6642c7 100644 --- a/test/integration/manifests/cnsconfig/swiftconfigmap.yaml +++ b/test/integration/manifests/cnsconfig/swiftconfigmap.yaml @@ -24,5 +24,7 @@ data: "ChannelMode": "CRD", "InitializeFromCNI": true, "ManageEndpointState": false, 
- "ProgramSNATIPTables" : false + "ProgramSNATIPTables" : false, + "EnableAsyncPodDelete": true, + "AsyncPodDeletePath": "/var/run/azure-vnet/deleteIDs" }