Skip to content

Commit

Permalink
Updating kill timeout adheres to operator specified maximum
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed Mar 3, 2016
1 parent 225ed95 commit 6ef841b
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 155 deletions.
30 changes: 18 additions & 12 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,12 @@ func (c *DockerDriverConfig) Validate() error {
}

type dockerPID struct {
Version string
ImageID string
ContainerID string
KillTimeout time.Duration
PluginConfig *PluginReattachConfig
Version string
ImageID string
ContainerID string
KillTimeout time.Duration
MaxKillTimeout time.Duration
PluginConfig *PluginReattachConfig
}

type DockerHandle struct {
Expand All @@ -101,6 +102,7 @@ type DockerHandle struct {
containerID string
version string
killTimeout time.Duration
maxKillTimeout time.Duration
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
}
Expand Down Expand Up @@ -600,6 +602,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
d.logger.Printf("[INFO] driver.docker: started container %s", container.ID)

// Return a driver handle
maxKill := d.DriverContext.config.MaxKillTimeout
h := &DockerHandle{
client: client,
logCollector: logCollector,
Expand All @@ -610,7 +613,8 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
imageID: dockerImage.ID,
containerID: container.ID,
version: d.config.Version,
killTimeout: d.DriverContext.KillTimeout(task),
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
maxKillTimeout: maxKill,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
Expand Down Expand Up @@ -679,6 +683,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
containerID: pid.ContainerID,
version: pid.Version,
killTimeout: pid.KillTimeout,
maxKillTimeout: pid.MaxKillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
Expand All @@ -689,11 +694,12 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
func (h *DockerHandle) ID() string {
// Return a handle to the PID
pid := dockerPID{
Version: h.version,
ImageID: h.imageID,
ContainerID: h.containerID,
KillTimeout: h.killTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
Version: h.version,
ImageID: h.imageID,
ContainerID: h.containerID,
KillTimeout: h.killTimeout,
MaxKillTimeout: h.maxKillTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
}
data, err := json.Marshal(pid)
if err != nil {
Expand All @@ -712,7 +718,7 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult {

func (h *DockerHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
if err := h.logCollector.UpdateLogConfig(task.LogConfig); err != nil {
h.logger.Printf("[DEBUG] driver.docker: failed to update log config: %v", err)
}
Expand Down
19 changes: 0 additions & 19 deletions client/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"log"
"path/filepath"
"sync"
"time"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
Expand Down Expand Up @@ -84,24 +83,6 @@ func NewDriverContext(taskName string, config *config.Config, node *structs.Node
}
}

// KillTimeout returns the timeout that should be used for the task between
// signaling and killing the task.
func (d *DriverContext) KillTimeout(task *structs.Task) time.Duration {
max := d.config.MaxKillTimeout.Nanoseconds()
desired := task.KillTimeout.Nanoseconds()

// Make the minimum time between signal and kill, 1 second.
if desired == 0 {
desired = (1 * time.Second).Nanoseconds()
}

if desired < max {
return time.Duration(desired)
}

return d.config.MaxKillTimeout
}

// DriverHandle is an opaque handle into a driver used for task
// manipulation
type DriverHandle interface {
Expand Down
10 changes: 8 additions & 2 deletions client/driver/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type execHandle struct {
userPid int
allocDir *allocdir.AllocDir
killTimeout time.Duration
maxKillTimeout time.Duration
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
Expand Down Expand Up @@ -134,13 +135,15 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
d.logger.Printf("[DEBUG] driver.exec: started process via plugin with pid: %v", ps.Pid)

// Return a driver handle
maxKill := d.DriverContext.config.MaxKillTimeout
h := &execHandle{
pluginClient: pluginClient,
userPid: ps.Pid,
executor: exec,
allocDir: ctx.AllocDir,
isolationConfig: ps.IsolationConfig,
killTimeout: d.DriverContext.KillTimeout(task),
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
maxKillTimeout: maxKill,
logger: d.logger,
version: d.config.Version,
doneCh: make(chan struct{}),
Expand All @@ -153,6 +156,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
type execId struct {
Version string
KillTimeout time.Duration
MaxKillTimeout time.Duration
UserPid int
TaskDir string
AllocDir *allocdir.AllocDir
Expand Down Expand Up @@ -198,6 +202,7 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
logger: d.logger,
version: id.Version,
killTimeout: id.KillTimeout,
maxKillTimeout: id.MaxKillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
Expand All @@ -209,6 +214,7 @@ func (h *execHandle) ID() string {
id := execId{
Version: h.version,
KillTimeout: h.killTimeout,
MaxKillTimeout: h.maxKillTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
AllocDir: h.allocDir,
Expand All @@ -228,7 +234,7 @@ func (h *execHandle) WaitCh() chan *cstructs.WaitResult {

func (h *execHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
h.executor.UpdateLogConfig(task.LogConfig)

// Update is not possible
Expand Down
24 changes: 15 additions & 9 deletions client/driver/java.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ type javaHandle struct {
executor executor.Executor
isolationConfig *cstructs.IsolationConfig

taskDir string
allocDir *allocdir.AllocDir
killTimeout time.Duration
version string
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
taskDir string
allocDir *allocdir.AllocDir
killTimeout time.Duration
maxKillTimeout time.Duration
version string
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
}

// NewJavaDriver is used to create a new exec driver
Expand Down Expand Up @@ -182,14 +183,16 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
d.logger.Printf("[DEBUG] driver.java: started process with pid: %v", ps.Pid)

// Return a driver handle
maxKill := d.DriverContext.config.MaxKillTimeout
h := &javaHandle{
pluginClient: pluginClient,
executor: exec,
userPid: ps.Pid,
isolationConfig: ps.IsolationConfig,
taskDir: taskDir,
allocDir: ctx.AllocDir,
killTimeout: d.DriverContext.KillTimeout(task),
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
maxKillTimeout: maxKill,
version: d.config.Version,
logger: d.logger,
doneCh: make(chan struct{}),
Expand All @@ -210,6 +213,7 @@ func (d *JavaDriver) cgroupsMounted(node *structs.Node) bool {
type javaId struct {
Version string
KillTimeout time.Duration
MaxKillTimeout time.Duration
PluginConfig *PluginReattachConfig
IsolationConfig *cstructs.IsolationConfig
TaskDir string
Expand Down Expand Up @@ -257,6 +261,7 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
logger: d.logger,
version: id.Version,
killTimeout: id.KillTimeout,
maxKillTimeout: id.MaxKillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
Expand All @@ -269,6 +274,7 @@ func (h *javaHandle) ID() string {
id := javaId{
Version: h.version,
KillTimeout: h.killTimeout,
MaxKillTimeout: h.maxKillTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
TaskDir: h.taskDir,
Expand All @@ -289,7 +295,7 @@ func (h *javaHandle) WaitCh() chan *cstructs.WaitResult {

func (h *javaHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
h.executor.UpdateLogConfig(task.LogConfig)

// Update is not possible
Expand Down
82 changes: 44 additions & 38 deletions client/driver/qemu.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,16 @@ type QemuDriverConfig struct {

// qemuHandle is returned from Start/Open as a handle to the PID
type qemuHandle struct {
pluginClient *plugin.Client
userPid int
executor executor.Executor
allocDir *allocdir.AllocDir
killTimeout time.Duration
logger *log.Logger
version string
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
pluginClient *plugin.Client
userPid int
executor executor.Executor
allocDir *allocdir.AllocDir
killTimeout time.Duration
maxKillTimeout time.Duration
logger *log.Logger
version string
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
}

// NewQemuDriver is used to create a new exec driver
Expand Down Expand Up @@ -219,28 +220,31 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
d.logger.Printf("[INFO] Started new QemuVM: %s", vmID)

// Create and Return Handle
maxKill := d.DriverContext.config.MaxKillTimeout
h := &qemuHandle{
pluginClient: pluginClient,
executor: exec,
userPid: ps.Pid,
allocDir: ctx.AllocDir,
killTimeout: d.DriverContext.KillTimeout(task),
version: d.config.Version,
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
pluginClient: pluginClient,
executor: exec,
userPid: ps.Pid,
allocDir: ctx.AllocDir,
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
maxKillTimeout: maxKill,
version: d.config.Version,
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}

go h.run()
return h, nil
}

type qemuId struct {
Version string
KillTimeout time.Duration
UserPid int
PluginConfig *PluginReattachConfig
AllocDir *allocdir.AllocDir
Version string
KillTimeout time.Duration
MaxKillTimeout time.Duration
UserPid int
PluginConfig *PluginReattachConfig
AllocDir *allocdir.AllocDir
}

func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
Expand All @@ -264,27 +268,29 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro

// Return a driver handle
h := &qemuHandle{
pluginClient: pluginClient,
executor: executor,
userPid: id.UserPid,
allocDir: id.AllocDir,
logger: d.logger,
killTimeout: id.KillTimeout,
version: id.Version,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
pluginClient: pluginClient,
executor: executor,
userPid: id.UserPid,
allocDir: id.AllocDir,
logger: d.logger,
killTimeout: id.KillTimeout,
maxKillTimeout: id.MaxKillTimeout,
version: id.Version,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
go h.run()
return h, nil
}

func (h *qemuHandle) ID() string {
id := qemuId{
Version: h.version,
KillTimeout: h.killTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
AllocDir: h.allocDir,
Version: h.version,
KillTimeout: h.killTimeout,
MaxKillTimeout: h.maxKillTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
AllocDir: h.allocDir,
}

data, err := json.Marshal(id)
Expand All @@ -300,7 +306,7 @@ func (h *qemuHandle) WaitCh() chan *cstructs.WaitResult {

func (h *qemuHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
h.executor.UpdateLogConfig(task.LogConfig)

// Update is not possible
Expand Down
Loading

0 comments on commit 6ef841b

Please sign in to comment.