diff --git a/client/driver/docker.go b/client/driver/docker.go index 48897077c03..028589d4c0f 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -83,11 +83,12 @@ func (c *DockerDriverConfig) Validate() error { } type dockerPID struct { - Version string - ImageID string - ContainerID string - KillTimeout time.Duration - PluginConfig *PluginReattachConfig + Version string + ImageID string + ContainerID string + KillTimeout time.Duration + MaxKillTimeout time.Duration + PluginConfig *PluginReattachConfig } type DockerHandle struct { @@ -101,6 +102,7 @@ type DockerHandle struct { containerID string version string killTimeout time.Duration + maxKillTimeout time.Duration waitCh chan *cstructs.WaitResult doneCh chan struct{} } @@ -600,6 +602,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle d.logger.Printf("[INFO] driver.docker: started container %s", container.ID) // Return a driver handle + maxKill := d.DriverContext.config.MaxKillTimeout h := &DockerHandle{ client: client, logCollector: logCollector, @@ -610,7 +613,8 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle imageID: dockerImage.ID, containerID: container.ID, version: d.config.Version, - killTimeout: d.DriverContext.KillTimeout(task), + killTimeout: GetKillTimeout(task.KillTimeout, maxKill), + maxKillTimeout: maxKill, doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } @@ -679,6 +683,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er containerID: pid.ContainerID, version: pid.Version, killTimeout: pid.KillTimeout, + maxKillTimeout: pid.MaxKillTimeout, doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } @@ -689,11 +694,12 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er func (h *DockerHandle) ID() string { // Return a handle to the PID pid := dockerPID{ - Version: h.version, - ImageID: h.imageID, - ContainerID: h.containerID, - KillTimeout: h.killTimeout, - PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), + Version: h.version, + ImageID: h.imageID, + ContainerID: h.containerID, + KillTimeout: h.killTimeout, + MaxKillTimeout: h.maxKillTimeout, + PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), } data, err := json.Marshal(pid) if err != nil { @@ -712,7 +718,7 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult { func (h *DockerHandle) Update(task *structs.Task) error { // Store the updated kill timeout. - h.killTimeout = task.KillTimeout + h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout) if err := h.logCollector.UpdateLogConfig(task.LogConfig); err != nil { h.logger.Printf("[DEBUG] driver.docker: failed to update log config: %v", err) } diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index b0cbd615216..9e641c219b3 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -152,18 +152,19 @@ func TestDockerDriver_Handle(t *testing.T) { defer pluginClient.Kill() h := &DockerHandle{ - version: "version", - imageID: "imageid", - logCollector: logCollector, - pluginClient: pluginClient, - containerID: "containerid", - killTimeout: 5 * time.Nanosecond, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + version: "version", + imageID: "imageid", + logCollector: logCollector, + pluginClient: pluginClient, + containerID: "containerid", + killTimeout: 5 * time.Nanosecond, + maxKillTimeout: 15 * time.Nanosecond, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } actual := h.ID() - expected := fmt.Sprintf("DOCKER:{\"Version\":\"version\",\"ImageID\":\"imageid\",\"ContainerID\":\"containerid\",\"KillTimeout\":5,\"PluginConfig\":{\"Pid\":%d,\"AddrNet\":\"unix\",\"AddrName\":\"%s\"}}", + expected := fmt.Sprintf("DOCKER:{\"Version\":\"version\",\"ImageID\":\"imageid\",\"ContainerID\":\"containerid\",\"KillTimeout\":5,\"MaxKillTimeout\":15,\"PluginConfig\":{\"Pid\":%d,\"AddrNet\":\"unix\",\"AddrName\":\"%s\"}}", pluginClient.ReattachConfig().Pid, pluginClient.ReattachConfig().Addr.String()) if actual != expected { t.Errorf("Expected `%s`, found `%s`", expected, actual) diff --git a/client/driver/driver.go b/client/driver/driver.go index 7d13292339b..c4e8f772640 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -5,7 +5,6 @@ import ( "log" "path/filepath" "sync" - "time" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" @@ -84,24 +83,6 @@ func NewDriverContext(taskName string, config *config.Config, node *structs.Node } } -// KillTimeout returns the timeout that should be used for the task between -// signaling and killing the task. -func (d *DriverContext) KillTimeout(task *structs.Task) time.Duration { - max := d.config.MaxKillTimeout.Nanoseconds() - desired := task.KillTimeout.Nanoseconds() - - // Make the minimum time between signal and kill, 1 second. - if desired == 0 { - desired = (1 * time.Second).Nanoseconds() - } - - if desired < max { - return time.Duration(desired) - } - - return d.config.MaxKillTimeout -} - // DriverHandle is an opaque handle into a driver used for task // manipulation type DriverHandle interface { diff --git a/client/driver/driver_test.go b/client/driver/driver_test.go index 89beb40bf28..d1ce7963c0c 100644 --- a/client/driver/driver_test.go +++ b/client/driver/driver_test.go @@ -66,24 +66,6 @@ func testDriverContexts(task *structs.Task) (*DriverContext, *ExecContext) { return driverCtx, execCtx } -func TestDriver_KillTimeout(t *testing.T) { - expected := 1 * time.Second - task := &structs.Task{Name: "foo", KillTimeout: expected} - ctx, _ := testDriverContexts(task) - ctx.config.MaxKillTimeout = 10 * time.Second - - if actual := ctx.KillTimeout(task); expected != actual { - t.Fatalf("KillTimeout(%v) returned %v; want %v", task, actual, expected) - } - - expected = 10 * time.Second - task = &structs.Task{KillTimeout: 11 * time.Second} - - if actual := ctx.KillTimeout(task); expected != actual { - t.Fatalf("KillTimeout(%v) returned %v; want %v", task, actual, expected) - } -} - func TestDriver_GetTaskEnv(t *testing.T) { t.Parallel() task := &structs.Task{ diff --git a/client/driver/exec.go b/client/driver/exec.go index cf8e021cc75..b134cc3decd 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -42,6 +42,7 @@ type execHandle struct { userPid int allocDir *allocdir.AllocDir killTimeout time.Duration + maxKillTimeout time.Duration logger *log.Logger waitCh chan *cstructs.WaitResult doneCh chan struct{} @@ -134,13 +135,15 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, d.logger.Printf("[DEBUG] driver.exec: started process via plugin with pid: %v", ps.Pid) // Return a driver handle + maxKill := d.DriverContext.config.MaxKillTimeout h := &execHandle{ pluginClient: pluginClient, userPid: ps.Pid, executor: exec, allocDir: ctx.AllocDir, isolationConfig: ps.IsolationConfig, - killTimeout: d.DriverContext.KillTimeout(task), + killTimeout: GetKillTimeout(task.KillTimeout, maxKill), + maxKillTimeout: maxKill, logger: d.logger, version: d.config.Version, doneCh: make(chan struct{}), @@ -153,6 +156,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, type execId struct { Version string KillTimeout time.Duration + MaxKillTimeout time.Duration UserPid int TaskDir string AllocDir *allocdir.AllocDir @@ -198,6 +202,7 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro logger: d.logger, version: id.Version, killTimeout: id.KillTimeout, + maxKillTimeout: id.MaxKillTimeout, doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } @@ -209,6 +214,7 @@ func (h *execHandle) ID() string { id := execId{ Version: h.version, KillTimeout: h.killTimeout, + MaxKillTimeout: h.maxKillTimeout, PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), UserPid: h.userPid, AllocDir: h.allocDir, @@ -228,7 +234,7 @@ func (h *execHandle) WaitCh() chan *cstructs.WaitResult { func (h *execHandle) Update(task *structs.Task) error { // Store the updated kill timeout. - h.killTimeout = task.KillTimeout + h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout) h.executor.UpdateLogConfig(task.LogConfig) // Update is not possible diff --git a/client/driver/java.go b/client/driver/java.go index b9908356b16..e662052678b 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -47,13 +47,14 @@ type javaHandle struct { executor executor.Executor isolationConfig *cstructs.IsolationConfig - taskDir string - allocDir *allocdir.AllocDir - killTimeout time.Duration - version string - logger *log.Logger - waitCh chan *cstructs.WaitResult - doneCh chan struct{} + taskDir string + allocDir *allocdir.AllocDir + killTimeout time.Duration + maxKillTimeout time.Duration + version string + logger *log.Logger + waitCh chan *cstructs.WaitResult + doneCh chan struct{} } // NewJavaDriver is used to create a new exec driver @@ -182,6 +183,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, d.logger.Printf("[DEBUG] driver.java: started process with pid: %v", ps.Pid) // Return a driver handle + maxKill := d.DriverContext.config.MaxKillTimeout h := &javaHandle{ pluginClient: pluginClient, executor: exec, @@ -189,7 +191,8 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, isolationConfig: ps.IsolationConfig, taskDir: taskDir, allocDir: ctx.AllocDir, - killTimeout: d.DriverContext.KillTimeout(task), + killTimeout: GetKillTimeout(task.KillTimeout, maxKill), + maxKillTimeout: maxKill, version: d.config.Version, logger: d.logger, doneCh: make(chan struct{}), @@ -210,6 +213,7 @@ func (d *JavaDriver) cgroupsMounted(node *structs.Node) bool { type javaId struct { Version string KillTimeout time.Duration + MaxKillTimeout time.Duration PluginConfig *PluginReattachConfig IsolationConfig *cstructs.IsolationConfig TaskDir string @@ -257,6 +261,7 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro logger: d.logger, version: id.Version, killTimeout: id.KillTimeout, + maxKillTimeout: id.MaxKillTimeout, doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } @@ -269,6 +274,7 @@ func (h *javaHandle) ID() string { id := javaId{ Version: h.version, KillTimeout: h.killTimeout, + MaxKillTimeout: h.maxKillTimeout, PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), UserPid: h.userPid, TaskDir: h.taskDir, @@ -289,7 +295,7 @@ func (h *javaHandle) WaitCh() chan *cstructs.WaitResult { func (h *javaHandle) Update(task *structs.Task) error { // Store the updated kill timeout. - h.killTimeout = task.KillTimeout + h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout) h.executor.UpdateLogConfig(task.LogConfig) // Update is not possible diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 1ec44f4f183..e4efe585888 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -44,15 +44,16 @@ type QemuDriverConfig struct { // qemuHandle is returned from Start/Open as a handle to the PID type qemuHandle struct { - pluginClient *plugin.Client - userPid int - executor executor.Executor - allocDir *allocdir.AllocDir - killTimeout time.Duration - logger *log.Logger - version string - waitCh chan *cstructs.WaitResult - doneCh chan struct{} + pluginClient *plugin.Client + userPid int + executor executor.Executor + allocDir *allocdir.AllocDir + killTimeout time.Duration + maxKillTimeout time.Duration + logger *log.Logger + version string + waitCh chan *cstructs.WaitResult + doneCh chan struct{} } // NewQemuDriver is used to create a new exec driver @@ -219,16 +220,18 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, d.logger.Printf("[INFO] Started new QemuVM: %s", vmID) // Create and Return Handle + maxKill := d.DriverContext.config.MaxKillTimeout h := &qemuHandle{ - pluginClient: pluginClient, - executor: exec, - userPid: ps.Pid, - allocDir: ctx.AllocDir, - killTimeout: d.DriverContext.KillTimeout(task), - version: d.config.Version, - logger: d.logger, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + pluginClient: pluginClient, + executor: exec, + userPid: ps.Pid, + allocDir: ctx.AllocDir, + killTimeout: GetKillTimeout(task.KillTimeout, maxKill), + maxKillTimeout: maxKill, + version: d.config.Version, + logger: d.logger, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() @@ -236,11 +239,12 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, } type qemuId struct { - Version string - KillTimeout time.Duration - UserPid int - PluginConfig *PluginReattachConfig - AllocDir *allocdir.AllocDir + Version string + KillTimeout time.Duration + MaxKillTimeout time.Duration + UserPid int + PluginConfig *PluginReattachConfig + AllocDir *allocdir.AllocDir } func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { @@ -264,15 +268,16 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro // Return a driver handle h := &qemuHandle{ - pluginClient: pluginClient, - executor: executor, - userPid: id.UserPid, - allocDir: id.AllocDir, - logger: d.logger, - killTimeout: id.KillTimeout, - version: id.Version, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + pluginClient: pluginClient, + executor: executor, + userPid: id.UserPid, + allocDir: id.AllocDir, + logger: d.logger, + killTimeout: id.KillTimeout, + maxKillTimeout: id.MaxKillTimeout, + version: id.Version, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -280,11 +285,12 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro func (h *qemuHandle) ID() string { id := qemuId{ - Version: h.version, - KillTimeout: h.killTimeout, - PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), - UserPid: h.userPid, - AllocDir: h.allocDir, + Version: h.version, + KillTimeout: h.killTimeout, + MaxKillTimeout: h.maxKillTimeout, + PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), + UserPid: h.userPid, + AllocDir: h.allocDir, } data, err := json.Marshal(id) @@ -300,7 +306,7 @@ func (h *qemuHandle) WaitCh() chan *cstructs.WaitResult { func (h *qemuHandle) Update(task *structs.Task) error { // Store the updated kill timeout. - h.killTimeout = task.KillTimeout + h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout) h.executor.UpdateLogConfig(task.LogConfig) // Update is not possible diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 00cdafc30a2..f029525b589 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -35,15 +35,16 @@ type RawExecDriver struct { // rawExecHandle is returned from Start/Open as a handle to the PID type rawExecHandle struct { - version string - pluginClient *plugin.Client - userPid int - executor executor.Executor - killTimeout time.Duration - allocDir *allocdir.AllocDir - logger *log.Logger - waitCh chan *cstructs.WaitResult - doneCh chan struct{} + version string + pluginClient *plugin.Client + userPid int + executor executor.Executor + killTimeout time.Duration + maxKillTimeout time.Duration + allocDir *allocdir.AllocDir + logger *log.Logger + waitCh chan *cstructs.WaitResult + doneCh chan struct{} } // NewRawExecDriver is used to create a new raw exec driver @@ -125,27 +126,30 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl d.logger.Printf("[DEBUG] driver.raw_exec: started process with pid: %v", ps.Pid) // Return a driver handle + maxKill := d.DriverContext.config.MaxKillTimeout h := &rawExecHandle{ - pluginClient: pluginClient, - executor: exec, - userPid: ps.Pid, - killTimeout: d.DriverContext.KillTimeout(task), - allocDir: ctx.AllocDir, - version: d.config.Version, - logger: d.logger, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + pluginClient: pluginClient, + executor: exec, + userPid: ps.Pid, + killTimeout: GetKillTimeout(task.KillTimeout, maxKill), + maxKillTimeout: maxKill, + allocDir: ctx.AllocDir, + version: d.config.Version, + logger: d.logger, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil } type rawExecId struct { - Version string - KillTimeout time.Duration - UserPid int - PluginConfig *PluginReattachConfig - AllocDir *allocdir.AllocDir + Version string + KillTimeout time.Duration + MaxKillTimeout time.Duration + UserPid int + PluginConfig *PluginReattachConfig + AllocDir *allocdir.AllocDir } func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { @@ -168,15 +172,16 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e // Return a driver handle h := &rawExecHandle{ - pluginClient: pluginClient, - executor: executor, - userPid: id.UserPid, - logger: d.logger, - killTimeout: id.KillTimeout, - allocDir: id.AllocDir, - version: id.Version, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + pluginClient: pluginClient, + executor: executor, + userPid: id.UserPid, + logger: d.logger, + killTimeout: id.KillTimeout, + maxKillTimeout: id.MaxKillTimeout, + allocDir: id.AllocDir, + version: id.Version, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -184,11 +189,12 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e func (h *rawExecHandle) ID() string { id := rawExecId{ - Version: h.version, - KillTimeout: h.killTimeout, - PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), - UserPid: h.userPid, - AllocDir: h.allocDir, + Version: h.version, + KillTimeout: h.killTimeout, + MaxKillTimeout: h.maxKillTimeout, + PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), + UserPid: h.userPid, + AllocDir: h.allocDir, } data, err := json.Marshal(id) @@ -204,7 +210,7 @@ func (h *rawExecHandle) WaitCh() chan *cstructs.WaitResult { func (h *rawExecHandle) Update(task *structs.Task) error { // Store the updated kill timeout. - h.killTimeout = task.KillTimeout + h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout) h.executor.UpdateLogConfig(task.LogConfig) // Update is not possible diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 574fc74f202..b4762012221 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -55,23 +55,25 @@ type RktDriverConfig struct { // rktHandle is returned from Start/Open as a handle to the PID type rktHandle struct { - pluginClient *plugin.Client - executorPid int - executor executor.Executor - allocDir *allocdir.AllocDir - logger *log.Logger - killTimeout time.Duration - waitCh chan *cstructs.WaitResult - doneCh chan struct{} + pluginClient *plugin.Client + executorPid int + executor executor.Executor + allocDir *allocdir.AllocDir + logger *log.Logger + killTimeout time.Duration + maxKillTimeout time.Duration + waitCh chan *cstructs.WaitResult + doneCh chan struct{} } // rktPID is a struct to map the pid running the process to the vm image on // disk type rktPID struct { - PluginConfig *PluginReattachConfig - AllocDir *allocdir.AllocDir - ExecutorPid int - KillTimeout time.Duration + PluginConfig *PluginReattachConfig + AllocDir *allocdir.AllocDir + ExecutorPid int + KillTimeout time.Duration + MaxKillTimeout time.Duration } // NewRktDriver is used to create a new exec driver @@ -227,15 +229,17 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e } d.logger.Printf("[DEBUG] driver.rkt: started ACI %q with: %v", img, cmdArgs) + maxKill := d.DriverContext.config.MaxKillTimeout h := &rktHandle{ - pluginClient: pluginClient, - executor: exec, - executorPid: ps.Pid, - allocDir: ctx.AllocDir, - logger: d.logger, - killTimeout: d.DriverContext.KillTimeout(task), - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + pluginClient: pluginClient, + executor: exec, + executorPid: ps.Pid, + allocDir: ctx.AllocDir, + logger: d.logger, + killTimeout: GetKillTimeout(task.KillTimeout, maxKill), + maxKillTimeout: maxKill, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -244,18 +248,18 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { // Parse the handle pidBytes := []byte(strings.TrimPrefix(handleID, "Rkt:")) - qpid := &rktPID{} - if err := json.Unmarshal(pidBytes, qpid); err != nil { + id := &rktPID{} + if err := json.Unmarshal(pidBytes, id); err != nil { return nil, fmt.Errorf("failed to parse Rkt handle '%s': %v", handleID, err) } pluginConfig := &plugin.ClientConfig{ - Reattach: qpid.PluginConfig.PluginConfig(), + Reattach: id.PluginConfig.PluginConfig(), } executor, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) if err != nil { d.logger.Println("[ERROR] driver.rkt: error connecting to plugin so destroying plugin pid and user pid") - if e := destroyPlugin(qpid.PluginConfig.Pid, qpid.ExecutorPid); e != nil { + if e := destroyPlugin(id.PluginConfig.Pid, id.ExecutorPid); e != nil { d.logger.Printf("[ERROR] driver.rkt: error destroying plugin and executor pid: %v", e) } return nil, fmt.Errorf("error connecting to plugin: %v", err) @@ -263,14 +267,15 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error // Return a driver handle h := &rktHandle{ - pluginClient: pluginClient, - executorPid: qpid.ExecutorPid, - allocDir: qpid.AllocDir, - executor: executor, - logger: d.logger, - killTimeout: qpid.KillTimeout, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + pluginClient: pluginClient, + executorPid: id.ExecutorPid, + allocDir: id.AllocDir, + executor: executor, + logger: d.logger, + killTimeout: id.KillTimeout, + maxKillTimeout: id.MaxKillTimeout, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() @@ -280,10 +285,11 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error func (h *rktHandle) ID() string { // Return a handle to the PID pid := &rktPID{ - PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), - KillTimeout: h.killTimeout, - ExecutorPid: h.executorPid, - AllocDir: h.allocDir, + PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), + KillTimeout: h.killTimeout, + MaxKillTimeout: h.maxKillTimeout, + ExecutorPid: h.executorPid, + AllocDir: h.allocDir, } data, err := json.Marshal(pid) if err != nil { @@ -298,7 +304,7 @@ func (h *rktHandle) WaitCh() chan *cstructs.WaitResult { func (h *rktHandle) Update(task *structs.Task) error { // Store the updated kill timeout. - h.killTimeout = task.KillTimeout + h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout) h.executor.UpdateLogConfig(task.LogConfig) // Update is not possible diff --git a/client/driver/utils.go b/client/driver/utils.go index ce2530555cd..e8921c54227 100644 --- a/client/driver/utils.go +++ b/client/driver/utils.go @@ -5,6 +5,7 @@ import ( "io" "os" "strings" + "time" "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-plugin" @@ -109,3 +110,26 @@ func validateCommand(command, argField string) error { return nil } + +// GetKillTimeout returns the kill timeout to use given the tasks desired kill +// timeout and the operator configured max kill timeout. +func GetKillTimeout(desired, max time.Duration) time.Duration { + maxNanos := max.Nanoseconds() + desiredNanos := desired.Nanoseconds() + + // Make the minimum time between signal and kill, 1 second. + if desiredNanos <= 0 { + desiredNanos = (1 * time.Second).Nanoseconds() + } + + // Protect against max not being set properly. + if maxNanos <= 0 { + maxNanos = (10 * time.Second).Nanoseconds() + } + + if desiredNanos < maxNanos { + return time.Duration(desiredNanos) + } + + return max +} diff --git a/client/driver/utils_test.go b/client/driver/utils_test.go new file mode 100644 index 00000000000..7ea78c680c9 --- /dev/null +++ b/client/driver/utils_test.go @@ -0,0 +1,22 @@ +package driver + +import ( + "testing" + "time" +) + +func TestDriver_KillTimeout(t *testing.T) { + expected := 1 * time.Second + max := 10 * time.Second + + if actual := GetKillTimeout(expected, max); expected != actual { + t.Fatalf("GetKillTimeout() returned %v; want %v", actual, expected) + } + + expected = 10 * time.Second + input := 11 * time.Second + + if actual := GetKillTimeout(input, max); expected != actual { + t.Fatalf("KillTimeout() returned %v; want %v", actual, expected) + } +}