Skip to content

Commit

Permalink
tests: add new task runner test helper
Browse files Browse the repository at this point in the history
Adds a new helper and removes a duplicated test.
  • Loading branch information
schmichael committed Feb 13, 2019
1 parent fa9537f commit 38da167
Showing 1 changed file with 23 additions and 81 deletions.
104 changes: 23 additions & 81 deletions client/allocrunner/taskrunner/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,22 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
return conf, trCleanup
}

// runTestTaskRunner runs a TaskRunner and returns its configuration as well as
// a cleanup function that ensures the runner is stopped and cleaned up. Tests
// which need to change the Config *must* use testTaskRunnerConfig instead.
func runTestTaskRunner(t *testing.T, alloc *structs.Allocation, taskName string) (*TaskRunner, *Config, func()) {
config, cleanup := testTaskRunnerConfig(t, alloc, taskName)

tr, err := NewTaskRunner(config)
require.NoError(t, err)
go tr.Run()

return tr, config, func() {
tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
cleanup()
}
}

// TestTaskRunner_Restore asserts restoring a running task does not rerun the
// task.
func TestTaskRunner_Restore_Running(t *testing.T) {
Expand Down Expand Up @@ -170,7 +186,6 @@ func TestTaskRunner_TaskEnv(t *testing.T) {
"common_user": "somebody",
}
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Meta = map[string]string{
"foo": "bar",
}
Expand All @@ -181,15 +196,9 @@ func TestTaskRunner_TaskEnv(t *testing.T) {
"stdout_string": `${node.region} ${NOMAD_META_foo} ${NOMAD_META_common_user}`,
}

conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name)
defer cleanup()

// Run the first TaskRunner
tr, err := NewTaskRunner(conf)
require.NoError(err)
go tr.Run()
defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))

// Wait for task to complete
select {
case <-tr.WaitCh():
Expand All @@ -208,48 +217,6 @@ func TestTaskRunner_TaskEnv(t *testing.T) {
assert.Equal(t, "global bar somebody", mockCfg.StdoutString)
}

func TestTaskRunner_TaskConfig(t *testing.T) {
t.Parallel()
require := require.New(t)

alloc := mock.BatchAlloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"

//// Use interpolation from both node attributes and meta vars
//task.Config = map[string]interface{}{
// "run_for": "1ms",
//}

conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
defer cleanup()

// Run the first TaskRunner
tr, err := NewTaskRunner(conf)
require.NoError(err)
go tr.Run()
defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))

// Wait for task to complete
select {
case <-tr.WaitCh():
case <-time.After(3 * time.Second):
}

// Get the mock driver plugin
driverPlugin, err := conf.DriverManager.Dispense(mockdriver.PluginID.Name)
require.NoError(err)
mockDriver := driverPlugin.(*mockdriver.Driver)

// Assert its config has been properly interpolated
driverCfg, mockCfg := mockDriver.GetTaskConfig()
require.NotNil(driverCfg)
require.NotNil(mockCfg)
assert.Equal(t, alloc.Job.Name, driverCfg.JobName)
assert.Equal(t, alloc.TaskGroup, driverCfg.TaskGroupName)
assert.Equal(t, alloc.Job.TaskGroups[0].Tasks[0].Name, driverCfg.Name)
}

// Test that devices get sent to the driver
func TestTaskRunner_DevicePropogation(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -473,16 +440,11 @@ func TestTaskRunner_ShutdownDelay(t *testing.T) {
// No shutdown escape hatch for this delay, so don't set it too high
task.ShutdownDelay = 1000 * time.Duration(testutil.TestMultiplier()) * time.Millisecond

conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name)
defer cleanup()

mockConsul := conf.Consul.(*consul.MockConsulServiceClient)

tr, err := NewTaskRunner(conf)
require.NoError(t, err)
go tr.Run()
defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))

// Wait for the task to start
testWaitForTaskToStart(t, tr)

Expand Down Expand Up @@ -568,14 +530,9 @@ func TestTaskRunner_Dispatch_Payload(t *testing.T) {
compressed := snappy.Encode(nil, expected)
alloc.Job.Payload = compressed

conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
tr, _, cleanup := runTestTaskRunner(t, alloc, task.Name)
defer cleanup()

tr, err := NewTaskRunner(conf)
require.NoError(t, err)
go tr.Run()
defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))

// Wait for it to finish
testutil.WaitForResult(func() (bool, error) {
ts := tr.TaskState()
Expand Down Expand Up @@ -610,14 +567,9 @@ func TestTaskRunner_SignalFailure(t *testing.T) {
"signal_error": errMsg,
}

conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
tr, _, cleanup := runTestTaskRunner(t, alloc, task.Name)
defer cleanup()

tr, err := NewTaskRunner(conf)
require.NoError(t, err)
go tr.Run()
defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))

testWaitForTaskToStart(t, tr)

require.EqualError(t, tr.Signal(&structs.TaskEvent{}, "SIGINT"), errMsg)
Expand All @@ -635,14 +587,9 @@ func TestTaskRunner_RestartTask(t *testing.T) {
"run_for": "10m",
}

conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
tr, _, cleanup := runTestTaskRunner(t, alloc, task.Name)
defer cleanup()

tr, err := NewTaskRunner(conf)
require.NoError(t, err)
go tr.Run()
defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))

testWaitForTaskToStart(t, tr)

// Restart task. Send a RestartSignal event like check watcher. Restart
Expand Down Expand Up @@ -977,14 +924,9 @@ func TestTaskRunner_Download_List(t *testing.T) {
}
task.Artifacts = []*structs.TaskArtifact{&artifact1, &artifact2}

conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name)
defer cleanup()

tr, err := NewTaskRunner(conf)
require.NoError(t, err)
defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
go tr.Run()

// Wait for task to run and exit
select {
case <-tr.WaitCh():
Expand All @@ -1004,7 +946,7 @@ func TestTaskRunner_Download_List(t *testing.T) {
assert.Equal(t, structs.TaskTerminated, state.Events[4].Type)

// Check that both files exist.
_, err = os.Stat(filepath.Join(conf.TaskDir.Dir, f1))
_, err := os.Stat(filepath.Join(conf.TaskDir.Dir, f1))
require.NoErrorf(t, err, "%v not downloaded", f1)

_, err = os.Stat(filepath.Join(conf.TaskDir.Dir, f2))
Expand Down

0 comments on commit 38da167

Please sign in to comment.