Fix race condition in tunnel server startup
Several places in the code used a 5-second retry loop to wait for
Runtime.Core to be set. This created a race condition in which OnChange
handlers could be registered after the Wrangler shared informers had
already been started. When that happened, the handlers were never
called, because the informer caches they relied upon were created too
late to be started.

Fix this by requiring anything that waits for Runtime.Core to run from a
cluster controller startup hook that is guaranteed to be called before
the shared informers are started, instead of just firing it off in a
goroutine that retries until the value is set.

Signed-off-by: Brad Davidson <[email protected]>
brandond committed Apr 28, 2023
1 parent 1ca035a · commit c44d33d
Showing 3 changed files with 17 additions and 40 deletions.
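
The fix replaces polling with explicit ordering: components that need Runtime.Core register a named callback in the Runtime.ClusterControllerStarts map, and the server runs every registered callback after Runtime.Core is set but before the shared informers start. Below is a minimal sketch of that ordering; only the ClusterControllerStarts map and the callback signature are taken from the diffs, while the Runtime struct and the startup harness are simplified stand-ins.

package main

import (
	"context"
	"fmt"
)

// Runtime is a simplified stand-in for the k3s runtime config. Only the
// ClusterControllerStarts map and its callback signature come from the
// diffs below; everything else is illustrative.
type Runtime struct {
	Core                    interface{} // stands in for the wrangler core factory
	ClusterControllerStarts map[string]func(ctx context.Context)
}

func main() {
	ctx := context.Background()
	rt := &Runtime{ClusterControllerStarts: map[string]func(ctx context.Context){}}

	// Instead of `go tunnel.watch(ctx)` plus a retry loop, a component
	// registers a startup hook, as in the tunnel.go change below.
	rt.ClusterControllerStarts["tunnel-server"] = func(ctx context.Context) {
		// Safe to use rt.Core here: hooks only run after it is set.
		fmt.Println("tunnel-server: registering OnChange handlers")
	}

	rt.Core = struct{}{} // the core factory becomes available exactly once

	// The cluster controller startup path runs every hook before the
	// shared informers start, so handlers registered by the hooks are
	// guaranteed to have running informer caches behind them.
	for name, start := range rt.ClusterControllerStarts {
		fmt.Printf("running startup hook %q\n", name)
		start(ctx)
	}

	fmt.Println("starting shared informers")
}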
33 changes: 8 additions & 25 deletions pkg/cluster/managed.go
@@ -82,7 +82,7 @@ func (c *Cluster) start(ctx context.Context) error {
 	if _, err := os.Stat(resetFile); err == nil {
 		// Before removing reset file we need to delete the node passwd secret in case the node
 		// password from the previously restored snapshot differs from the current password on disk.
-		go c.deleteNodePasswdSecret(ctx)
+		c.config.Runtime.ClusterControllerStarts["node-password-secret-cleanup"] = c.deleteNodePasswdSecret
 		os.Remove(resetFile)
 	}

@@ -176,30 +176,13 @@ func (c *Cluster) setupEtcdProxy(ctx context.Context, etcdProxy etcd.Proxy) {
 
 // deleteNodePasswdSecret wipes out the node password secret after restoration
 func (c *Cluster) deleteNodePasswdSecret(ctx context.Context) {
-	t := time.NewTicker(5 * time.Second)
-	defer t.Stop()
-	for range t.C {
-		nodeName := os.Getenv("NODE_NAME")
-		if nodeName == "" {
-			logrus.Infof("waiting for node name to be set")
-			continue
+	nodeName := os.Getenv("NODE_NAME")
+	secretsClient := c.config.Runtime.Core.Core().V1().Secret()
+	if err := nodepassword.Delete(secretsClient, nodeName); err != nil {
+		if apierrors.IsNotFound(err) {
+			logrus.Debugf("node password secret is not found for node %s", nodeName)
+			return
 		}
-		// the core factory may not yet be initialized so we
-		// want to wait until it is so not to evoke a panic.
-		if c.config.Runtime.Core == nil {
-			logrus.Infof("runtime is not yet initialized")
-			continue
-		}
-		secretsClient := c.config.Runtime.Core.Core().V1().Secret()
-		if err := nodepassword.Delete(secretsClient, nodeName); err != nil {
-			if apierrors.IsNotFound(err) {
-				logrus.Debugf("node password secret is not found for node %s", nodeName)
-				return
-			}
-			logrus.Warnf("failed to delete old node password secret: %v", err)
-			continue
-		}
-		return
+		logrus.Warnf("failed to delete old node password secret: %v", err)
 	}
-
 }

19 changes: 5 additions & 14 deletions pkg/daemons/control/tunnel.go
Expand Up @@ -10,7 +10,6 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/control/proxy"
@@ -45,7 +44,7 @@ func setupTunnel(ctx context.Context, cfg *config.Control) (http.Handler, error) {
 		server: remotedialer.New(authorizer, loggingErrorWriter),
 		egress: map[string]bool{},
 	}
-	go tunnel.watch(ctx)
+	cfg.Runtime.ClusterControllerStarts["tunnel-server"] = tunnel.watch
 	return tunnel, nil
 }

@@ -112,17 +111,10 @@ func (t *TunnelServer) watch(ctx context.Context) {
 		return
 	}
 
-	for {
-		if t.config.Runtime.Core != nil {
-			t.config.Runtime.Core.Core().V1().Node().OnChange(ctx, version.Program+"-tunnel-server", t.onChangeNode)
-			switch t.config.EgressSelectorMode {
-			case config.EgressSelectorModeCluster, config.EgressSelectorModePod:
-				t.config.Runtime.Core.Core().V1().Pod().OnChange(ctx, version.Program+"-tunnel-server", t.onChangePod)
-			}
-			return
-		}
-		logrus.Infof("Tunnel server egress proxy waiting for runtime core to become available")
-		time.Sleep(5 * time.Second)
+	t.config.Runtime.Core.Core().V1().Node().OnChange(ctx, version.Program+"-tunnel-server", t.onChangeNode)
+	switch t.config.EgressSelectorMode {
+	case config.EgressSelectorModeCluster, config.EgressSelectorModePod:
+		t.config.Runtime.Core.Core().V1().Pod().OnChange(ctx, version.Program+"-tunnel-server", t.onChangePod)
 	}
 }

@@ -173,7 +165,6 @@ func (t *TunnelServer) onChangePod(podName string, pod *v1.Pod) (*v1.Pod, error) {
 		}
 	}
 	return pod, nil
-
 }
 
 // serveConnect attempts to handle the HTTP CONNECT request by dialing

5 changes: 4 additions & 1 deletion pkg/server/server.go
@@ -168,8 +168,11 @@ func apiserverControllers(ctx context.Context, sc *Context, config *Config) {
 			panic(errors.Wrapf(err, "failed to start %s leader controller", util.GetFunctionName(controller)))
 		}
 	}
+
+	// Re-run context startup after core and leader-elected controllers have started. Additional
+	// informer caches may need to start for the newly added OnChange callbacks.
 	if err := sc.Start(ctx); err != nil {
-		panic(err)
+		panic(errors.Wrap(err, "failed to start wrangler controllers"))
 	}
 }
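
The re-run of sc.Start works because controller factory startup is incremental: informer caches that are already running are left untouched, and only caches created since the previous call (here, by OnChange registrations made in the startup hooks) are started. The toy model below illustrates that behavior; the factory and cache types are hypothetical stand-ins, not the wrangler API.

package main

import (
	"context"
	"fmt"
)

// cache is a stand-in for a shared informer cache; started records
// whether its watch loop has been launched.
type cache struct {
	name    string
	started bool
}

// factory models an incremental controller factory: each Start call
// launches only the caches added since the previous call.
type factory struct {
	caches []*cache
}

// OnChange lazily creates a cache for the watched type, mirroring how
// registering a handler creates an informer cache that still needs
// to be started.
func (f *factory) OnChange(name string) {
	f.caches = append(f.caches, &cache{name: name})
}

func (f *factory) Start(ctx context.Context) error {
	for _, c := range f.caches {
		if c.started {
			continue // already-running informers are left untouched
		}
		c.started = true
		fmt.Println("starting informer cache for", c.name)
	}
	return nil
}

func main() {
	ctx := context.Background()
	f := &factory{}

	f.OnChange("node")
	f.Start(ctx) // first start: launches the node cache

	// A startup hook registers another handler later; without a second
	// Start call, its cache would never run -- the original bug.
	f.OnChange("pod")
	f.Start(ctx) // second start: launches only the new pod cache
}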

