Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of CSI: make plugin health_timeout configurable in csi_plugin stanza into release/1.2.x #13367

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/13340.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvements
csi: Made the CSI Plugin supervisor health check configurable with a new CSI Stanza health_timeout field
```
8 changes: 8 additions & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,10 +1037,18 @@ type TaskCSIPluginConfig struct {
//
// Default is /csi.
MountDir string `mapstructure:"mount_dir" hcl:"mount_dir,optional"`

// HealthTimeout is the time after which the CSI plugin tasks will be killed
// if the CSI Plugin is not healthy.
HealthTimeout time.Duration `mapstructure:"health_timeout" hcl:"health_timeout,optional"`
}

func (t *TaskCSIPluginConfig) Canonicalize() {
if t.MountDir == "" {
t.MountDir = "/csi"
}

if t.HealthTimeout == 0 {
t.HealthTimeout = 30 * time.Second
}
}
8 changes: 6 additions & 2 deletions client/allocrunner/taskrunner/plugin_supervisor_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPlugi
pluginRoot := filepath.Join(config.clientStateDirPath, "csi",
string(task.CSIPluginConfig.Type), task.CSIPluginConfig.ID)

if task.CSIPluginConfig.HealthTimeout == 0 {
task.CSIPluginConfig.HealthTimeout = 30 * time.Second
}

shutdownCtx, cancelFn := context.WithCancel(context.Background())

hook := &csiPluginSupervisorHook{
Expand Down Expand Up @@ -231,7 +235,7 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) {

// We're in Poststart at this point, so if we can't connect within
// this deadline, assume it's broken so we can restart the task
startCtx, startCancelFn := context.WithTimeout(ctx, 30*time.Second)
startCtx, startCancelFn := context.WithTimeout(ctx, h.task.CSIPluginConfig.HealthTimeout)
defer startCancelFn()

var err error
Expand Down Expand Up @@ -414,7 +418,7 @@ func (h *csiPluginSupervisorHook) kill(ctx context.Context, reason error) {
if err := h.lifecycle.Kill(ctx,
structs.NewTaskEvent(structs.TaskKilling).
SetFailsTask().
SetDisplayMessage("CSI plugin did not become healthy before timeout"),
SetDisplayMessage(fmt.Sprintf("CSI plugin did not become healthy before configured %v health timeout", h.task.CSIPluginConfig.HealthTimeout.String())),
); err != nil {
h.logger.Error("failed to kill task", "kill_reason", reason, "error", err)
}
Expand Down
1 change: 1 addition & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,7 @@ func ApiCSIPluginConfigToStructsCSIPluginConfig(apiConfig *api.TaskCSIPluginConf
sc.ID = apiConfig.ID
sc.Type = structs.CSIPluginType(apiConfig.Type)
sc.MountDir = apiConfig.MountDir
sc.HealthTimeout = apiConfig.HealthTimeout
return sc
}

Expand Down
12 changes: 10 additions & 2 deletions jobspec/parse_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,20 @@ func parseTask(item *ast.ObjectItem, keys []string) (*api.Task, error) {
i := o.Elem().Items[0]

var m map[string]interface{}
var cfg api.TaskCSIPluginConfig
if err := hcl.DecodeObject(&m, i.Val); err != nil {
return nil, err
}

var cfg api.TaskCSIPluginConfig
if err := mapstructure.WeakDecode(m, &cfg); err != nil {
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: &cfg,
})
if err != nil {
return nil, err
}
if err := dec.Decode(m); err != nil {
return nil, err
}

Expand Down
7 changes: 4 additions & 3 deletions jobspec/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,9 +625,10 @@ func TestParse(t *testing.T) {
Name: "binstore",
Driver: "docker",
CSIPluginConfig: &api.TaskCSIPluginConfig{
ID: "org.hashicorp.csi",
Type: api.CSIPluginTypeMonolith,
MountDir: "/csi/test",
ID: "org.hashicorp.csi",
Type: api.CSIPluginTypeMonolith,
MountDir: "/csi/test",
HealthTimeout: 1 * time.Minute,
},
},
},
Expand Down
7 changes: 4 additions & 3 deletions jobspec/test-fixtures/csi-plugin.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ job "binstore-storagelocker" {
driver = "docker"

csi_plugin {
id = "org.hashicorp.csi"
type = "monolith"
mount_dir = "/csi/test"
id = "org.hashicorp.csi"
type = "monolith"
mount_dir = "/csi/test"
health_timeout = "1m"
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions nomad/structs/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ type TaskCSIPluginConfig struct {
// to be created by the plugin, and will provide references into
// "MountDir/CSIIntermediaryDirname/{VolumeName}/{AllocID} for mounts.
MountDir string

// HealthTimeout is the time after which the CSI plugin tasks will be killed
// if the CSI Plugin is not healthy.
HealthTimeout time.Duration `mapstructure:"health_timeout" hcl:"health_timeout,optional"`
}

func (t *TaskCSIPluginConfig) Copy() *TaskCSIPluginConfig {
Expand Down
19 changes: 13 additions & 6 deletions website/content/docs/job-specification/csi_plugin.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ to claim [volumes][csi_volumes].

```hcl
csi_plugin {
id = "csi-hostpath"
type = "monolith"
mount_dir = "/csi"
id = "csi-hostpath"
type = "monolith"
mount_dir = "/csi"
health_timeout = "30s"
}
```

Expand All @@ -43,6 +44,11 @@ csi_plugin {
container where the plugin will expect a Unix domain socket for
bidirectional communication with Nomad.

- `health_timeout` `(duration: <optional>)` - The duration that
the plugin supervisor will wait before restarting an unhealthy
CSI plugin. Must be a duration value such as `30s` or `2m`.
Defaults to `30s` if not set.

~> **Note:** Plugins running as `node` or `monolith` require root
privileges (or `CAP_SYS_ADMIN` on Linux) to mount volumes on the
host. With the Docker task driver, you can use the `privileged = true`
Expand Down Expand Up @@ -112,10 +118,11 @@ job "plugin-efs" {
}

csi_plugin {
id = "aws-efs0"
type = "node"
mount_dir = "/csi" # this path /csi matches the --endpoint
id = "aws-efs0"
type = "node"
mount_dir = "/csi" # this path /csi matches the --endpoint
# argument for the container
health_timeout = "30s"
}
}
}
Expand Down