Skip to content

Commit

Permalink
csi: reuse gRPC client in plugin supervisor hook
Browse files Browse the repository at this point in the history
The plugin supervisor hook creates a new gRPC client for every probe
and then throws it away. Instead, reuse the client as we do for the
plugin manager.
  • Loading branch information
tgross committed Feb 15, 2022
1 parent e898121 commit c2b7dda
Showing 1 changed file with 16 additions and 19 deletions.
35 changes: 16 additions & 19 deletions client/allocrunner/taskrunner/plugin_supervisor_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) {
}()

socketPath := filepath.Join(h.mountPoint, structs.CSISocketName)

client := csi.NewClient(socketPath, h.logger.Named("csi_client").With(
"plugin.name", h.task.CSIPluginConfig.ID,
"plugin.type", h.task.CSIPluginConfig.Type))
defer client.Close()

t := time.NewTimer(0)

// Step 1: Wait for the plugin to initially become available.
Expand All @@ -210,7 +216,7 @@ WAITFORREADY:
case <-ctx.Done():
return
case <-t.C:
pluginHealthy, err := h.supervisorLoopOnce(ctx, socketPath)
pluginHealthy, err := h.supervisorLoopOnce(ctx, client)
if err != nil || !pluginHealthy {
h.logger.Debug("CSI Plugin not ready", "error", err)

Expand All @@ -232,9 +238,9 @@ WAITFORREADY:
}

// Step 2: Register the plugin with the catalog.
deregisterPluginFn, err := h.registerPlugin(socketPath)
deregisterPluginFn, err := h.registerPlugin(client, socketPath)
if err != nil {
h.logger.Error("CSI Plugin registration failed", "error", err)
h.logger.Error("CSI plugin registration failed", "error", err)
event := structs.NewTaskEvent(structs.TaskPluginUnhealthy)
event.SetMessage(fmt.Sprintf("failed to register plugin: %s, reason: %v", h.task.CSIPluginConfig.ID, err))
h.eventEmitter.EmitEvent(event)
Expand All @@ -249,9 +255,9 @@ WAITFORREADY:
deregisterPluginFn()
return
case <-t.C:
pluginHealthy, err := h.supervisorLoopOnce(ctx, socketPath)
pluginHealthy, err := h.supervisorLoopOnce(ctx, client)
if err != nil {
h.logger.Error("CSI Plugin fingerprinting failed", "error", err)
h.logger.Error("CSI plugin fingerprinting failed", "error", err)
}

// The plugin has transitioned to a healthy state. Emit an event.
Expand Down Expand Up @@ -281,13 +287,9 @@ WAITFORREADY:
}
}

func (h *csiPluginSupervisorHook) registerPlugin(socketPath string) (func(), error) {

func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPath string) (func(), error) {
// At this point we know the plugin is ready and we can fingerprint it
// to get its vendor name and version
client := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type))
defer client.Close()

info, err := client.PluginInfo()
if err != nil {
return nil, fmt.Errorf("failed to probe plugin: %v", err)
Expand Down Expand Up @@ -351,16 +353,11 @@ func (h *csiPluginSupervisorHook) registerPlugin(socketPath string) (func(), err
}, nil
}

func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, socketPath string) (bool, error) {
_, err := os.Stat(socketPath)
if err != nil {
return false, fmt.Errorf("failed to stat socket: %v", err)
}

client := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type))
defer client.Close()
func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client csi.CSIPlugin) (bool, error) {
probeCtx, probeCancelFn := context.WithTimeout(ctx, 5*time.Second)
defer probeCancelFn()

healthy, err := client.PluginProbe(ctx)
healthy, err := client.PluginProbe(probeCtx)
if err != nil {
return false, fmt.Errorf("failed to probe plugin: %v", err)
}
Expand Down

0 comments on commit c2b7dda

Please sign in to comment.