Skip to content

Commit

Permalink
Merge pull request #1154 from dcbw/shared-informer
Browse files Browse the repository at this point in the history
Performance and efficiency improvements in daemon/server mode
  • Loading branch information
dougbtv authored Sep 15, 2023
2 parents 02ce071 + d9c06e9 commit ddb977f
Show file tree
Hide file tree
Showing 364 changed files with 31,563 additions and 304 deletions.
98 changes: 30 additions & 68 deletions cmd/multus-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"syscall"
"time"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilwait "k8s.io/apimachinery/pkg/util/wait"

"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/logging"
Expand Down Expand Up @@ -58,9 +57,6 @@ func main() {
os.Exit(4)
}

configWatcherDoneChannel := make(chan struct{})
serverDoneChannel := make(chan struct{})
multusConfigFile := ""
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -87,65 +83,42 @@ func main() {
logging.Verbosef("Readiness Indicator file check done!")
}

if err := startMultusDaemon(ctx, daemonConf, serverDoneChannel); err != nil {
logging.Panicf("failed start the multus thick-plugin listener: %v", err)
os.Exit(3)
}

// Wait until daemon ready
logging.Verbosef("API readiness check")
if waitUntilAPIReady(daemonConf.SocketDir) != nil {
logging.Panicf("failed to ready multus-daemon socket: %v", err)
os.Exit(1)
}
logging.Verbosef("API readiness check done!")

// Generate multus CNI config from current CNI config
var configManager *config.Manager
var ignoreReadinessIndicator bool
if multusConf.MultusConfigFile == "auto" {
if multusConf.CNIVersion == "" {
_ = logging.Errorf("the CNI version is a mandatory parameter when the '-multus-config-file=auto' option is used")
}

var configManager *config.Manager
if multusConf.MultusMasterCni == "" {
configManager, err = config.NewManager(*multusConf, multusConf.MultusAutoconfigDir, multusConf.ForceCNIVersion)
} else {
configManager, err = config.NewManagerWithExplicitPrimaryCNIPlugin(
*multusConf, multusConf.MultusAutoconfigDir, multusConf.MultusMasterCni, multusConf.ForceCNIVersion)
}
// Generate multus CNI config from current CNI config
configManager, err = config.NewManager(*multusConf)
if err != nil {
_ = logging.Errorf("failed to create the configuration manager for the primary CNI plugin: %v", err)
os.Exit(2)
}

if multusConf.OverrideNetworkName {
if err := configManager.OverrideNetworkName(); err != nil {
_ = logging.Errorf("could not override the network name: %v", err)
}
}

generatedMultusConfig, err := configManager.GenerateConfig()
if err != nil {
_ = logging.Errorf("failed to generated the multus configuration: %v", err)
}
logging.Verbosef("Generated MultusCNI config: %s", generatedMultusConfig)

multusConfigFile, err = configManager.PersistMultusConfig(generatedMultusConfig)
if err != nil {
_ = logging.Errorf("failed to persist the multus configuration: %v", err)
}

go func(ctx context.Context, doneChannel chan<- struct{}) {
if err := configManager.MonitorPluginConfiguration(ctx, doneChannel); err != nil {
_ = logging.Errorf("error watching file: %v", err)
}
}(ctx, configWatcherDoneChannel)
// ConfigManager watches the readiness indicator file (if configured)
// and exits the daemon when that is removed. The CNIServer does
// not need to re-do that check every CNI operation
ignoreReadinessIndicator = true
} else {
if err := copyUserProvidedConfig(multusConf.MultusConfigFile, multusConf.CniConfigDir); err != nil {
logging.Errorf("failed to copy the user provided configuration %s: %v", multusConf.MultusConfigFile, err)
}
}

if err := startMultusDaemon(ctx, daemonConf, ignoreReadinessIndicator); err != nil {
logging.Panicf("failed start the multus thick-plugin listener: %v", err)
os.Exit(3)
}

// Wait until daemon ready
logging.Verbosef("API readiness check")
if waitUntilAPIReady(daemonConf.SocketDir) != nil {
logging.Panicf("failed to ready multus-daemon socket: %v", err)
os.Exit(1)
}
logging.Verbosef("API readiness check done!")

signalCh := make(chan os.Signal, 16)
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
Expand All @@ -156,15 +129,11 @@ func main() {
}()

var wg sync.WaitGroup
if multusConf.MultusConfigFile == "auto" {
wg.Add(1)
go func() {
<-configWatcherDoneChannel
logging.Verbosef("ConfigWatcher done")
logging.Verbosef("Delete old config @ %v", multusConfigFile)
os.Remove(multusConfigFile)
wg.Done()
}()
if configManager != nil {
if err := configManager.Start(ctx, &wg); err != nil {
_ = logging.Errorf("failed to start config manager: %v", err)
os.Exit(3)
}
}

wg.Wait()
Expand All @@ -181,7 +150,7 @@ func waitUntilAPIReady(socketPath string) error {
})
}

func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf, done chan struct{}) error {
func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf, ignoreReadinessIndicator bool) error {
if user, err := user.Current(); err != nil || user.Uid != "0" {
return fmt.Errorf("failed to run multus-daemon with root: %v, now running in uid: %s", err, user.Uid)
}
Expand All @@ -190,7 +159,7 @@ func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf,
return fmt.Errorf("failed to prepare the cni-socket for communicating with the shim: %w", err)
}

server, err := srv.NewCNIServer(daemonConfig, daemonConfig.ConfigFileContents)
server, err := srv.NewCNIServer(daemonConfig, daemonConfig.ConfigFileContents, ignoreReadinessIndicator)
if err != nil {
return fmt.Errorf("failed to create the server: %v", err)
}
Expand All @@ -208,15 +177,8 @@ func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf,
return fmt.Errorf("failed to start the CNI server using socket %s. Reason: %+v", api.SocketPath(daemonConfig.SocketDir), err)
}

server.SetKeepAlivesEnabled(false)
go func() {
utilwait.UntilWithContext(ctx, func(ctx context.Context) {
logging.Debugf("open for business")
if err := server.Serve(l); err != nil {
utilruntime.HandleError(fmt.Errorf("CNI server Serve() failed: %v", err))
}
}, 0)
}()
server.Start(ctx, l)

go func() {
<-ctx.Done()
server.Shutdown(context.Background())
Expand Down
4 changes: 2 additions & 2 deletions cmd/multus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func main() {

skel.PluginMain(
func(args *skel.CmdArgs) error {
result, err := multus.CmdAdd(args, nil, nil)
result, err := multus.CmdAdd(args, nil, nil, nil)
if err != nil {
return err
}
Expand All @@ -54,6 +54,6 @@ func main() {
func(args *skel.CmdArgs) error {
return multus.CmdCheck(args, nil, nil)
},
func(args *skel.CmdArgs) error { return multus.CmdDel(args, nil, nil) },
func(args *skel.CmdArgs) error { return multus.CmdDel(args, nil, nil, nil) },
cniversion.All, "meta-plugin that delegates to other CNI plugins")
}
6 changes: 6 additions & 0 deletions deployments/multus-daemonset-thick.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ rules:
- pods/status
verbs:
- get
- list
- update
- apiGroups:
- ""
Expand Down Expand Up @@ -183,6 +184,11 @@ spec:
- name: hostroot
mountPath: /hostroot
mountPropagation: HostToContainer
env:
- name: MULTUS_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
initContainers:
- name: install-multus-binary
image: ghcr.io/k8snetworkplumbingwg/multus-cni:snapshot-thick
Expand Down
6 changes: 6 additions & 0 deletions e2e/templates/multus-daemonset-thick.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ rules:
- pods/status
verbs:
- get
- list
- update
- apiGroups:
- ""
Expand Down Expand Up @@ -157,6 +158,11 @@ spec:
- name: multus-daemon-config
mountPath: /etc/cni/net.d/multus.d
readOnly: true
env:
- name: MULTUS_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
initContainers:
- name: install-multus-shim
image: localhost:5000/multus:e2e
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.0.0
k8s.io/api v0.22.8
k8s.io/apimachinery v0.22.8
k8s.io/client-go v0.22.8
k8s.io/client-go v1.5.2
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.60.1 // indirect
k8s.io/kube-openapi v0.0.0-20220413171646-5e7f5fdc6da6 // indirect
Expand Down
19 changes: 12 additions & 7 deletions pkg/k8sclient/k8sclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,14 @@ func TryLoadPodDelegates(pod *v1.Pod, conf *types.NetConf, clientInfo *ClientInf
// InClusterK8sClient returns the `k8s.ClientInfo` struct to use to connect to
// the k8s API.
func InClusterK8sClient() (*ClientInfo, error) {
config, err := rest.InClusterConfig()
clientInfo, err := GetK8sClient("", nil)
if err != nil {
return nil, err
}

logging.Debugf("InClusterK8sClient: in cluster config: %+v", config)
return NewClientInfo(config)
if clientInfo == nil {
return nil, fmt.Errorf("failed to create in-cluster kube client")
}
return clientInfo, err
}

// GetK8sClient gets client info from kubeconfig
Expand Down Expand Up @@ -440,13 +441,17 @@ func GetK8sClient(kubeconfig string, kubeClient *ClientInfo) (*ClientInfo, error
config.ContentType = "application/vnd.kubernetes.protobuf"
// Set the config timeout to one minute.
config.Timeout = time.Minute
// Allow multus (especially in server mode) to make more concurrent requests
// to reduce client-side throttling
config.QPS = 50
config.Burst = 50

return NewClientInfo(config)
return newClientInfo(config)
}

// NewClientInfo returns a `ClientInfo` from a configuration created from an
// newClientInfo returns a `ClientInfo` from a configuration created from an
// existing kubeconfig file.
func NewClientInfo(config *rest.Config) (*ClientInfo, error) {
func newClientInfo(config *rest.Config) (*ClientInfo, error) {
client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit ddb977f

Please sign in to comment.