diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 828002968f8..139786b4452 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -2,12 +2,15 @@ package allocrunner import ( "fmt" + "io/ioutil" "os" + "path/filepath" "testing" "time" "github.com/hashicorp/consul/api" "github.com/hashicorp/nomad/client/allochealth" + "github.com/hashicorp/nomad/client/allocwatcher" cconsul "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/command/agent/consul" @@ -625,3 +628,152 @@ func TestAllocRunner_Destroy(t *testing.T) { require.Failf(t, "expected NotExist error", "found %v", err) } } + +func TestAllocRunner_SimpleRun(t *testing.T) { + t.Parallel() + + alloc := mock.BatchAlloc() + + conf, cleanup := testAllocRunnerConfig(t, alloc) + defer cleanup() + ar, err := NewAllocRunner(conf) + require.NoError(t, err) + go ar.Run() + defer destroy(ar) + + // Wait for alloc to be running + testutil.WaitForResult(func() (bool, error) { + state := ar.AllocState() + + if state.ClientStatus != structs.AllocClientStatusComplete { + return false, fmt.Errorf("got status %v; want %v", state.ClientStatus, structs.AllocClientStatusComplete) + } + + for t, s := range state.TaskStates { + if s.FinishedAt.IsZero() { + return false, fmt.Errorf("task %q has zero FinishedAt value", t) + } + } + + return true, nil + }, func(err error) { + require.NoError(t, err) + }) + +} + +// TestAllocRunner_MoveAllocDir asserts that a rescheduled +// allocation copies ephemeral disk content from previous alloc run +func TestAllocRunner_MoveAllocDir(t *testing.T) { + t.Parallel() + + // Step 1: start and run a task + alloc := mock.BatchAlloc() + conf, cleanup := testAllocRunnerConfig(t, alloc) + defer cleanup() + ar, err := NewAllocRunner(conf) + require.NoError(t, err) + ar.Run() + defer destroy(ar) + + require.Equal(t, structs.AllocClientStatusComplete, ar.AllocState().ClientStatus) + + // Step 2. Modify its directory + task := alloc.Job.TaskGroups[0].Tasks[0] + dataFile := filepath.Join(ar.allocDir.SharedDir, "data", "data_file") + ioutil.WriteFile(dataFile, []byte("hello world"), os.ModePerm) + taskDir := ar.allocDir.TaskDirs[task.Name] + taskLocalFile := filepath.Join(taskDir.LocalDir, "local_file") + ioutil.WriteFile(taskLocalFile, []byte("good bye world"), os.ModePerm) + + // Step 3. Start a new alloc + alloc2 := mock.BatchAlloc() + alloc2.PreviousAllocation = alloc.ID + alloc2.Job.TaskGroups[0].EphemeralDisk.Sticky = true + + conf2, cleanup := testAllocRunnerConfig(t, alloc2) + conf2.PrevAllocWatcher, conf2.PrevAllocMigrator = allocwatcher.NewAllocWatcher(allocwatcher.Config{ + Alloc: alloc2, + PreviousRunner: ar, + Logger: conf2.Logger, + }) + defer cleanup() + ar2, err := NewAllocRunner(conf2) + require.NoError(t, err) + + ar2.Run() + defer destroy(ar2) + + require.Equal(t, structs.AllocClientStatusComplete, ar2.AllocState().ClientStatus) + + // Ensure that data from ar was moved to ar2 + dataFile = filepath.Join(ar2.allocDir.SharedDir, "data", "data_file") + fileInfo, _ := os.Stat(dataFile) + require.NotNilf(t, fileInfo, "file %q not found", dataFile) + + taskDir = ar2.allocDir.TaskDirs[task.Name] + taskLocalFile = filepath.Join(taskDir.LocalDir, "local_file") + fileInfo, _ = os.Stat(taskLocalFile) + require.NotNilf(t, fileInfo, "file %q not found", dataFile) + +} + +// TestAllocRuner_HandlesArtifactFailure ensures that if one task in a task group is +// retrying fetching an artifact, other tasks in the group should be able +// to proceed. +func TestAllocRunner_HandlesArtifactFailure(t *testing.T) { + t.Parallel() + + alloc := mock.BatchAlloc() + alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{ + Mode: structs.RestartPolicyModeFail, + Attempts: 1, + Delay: time.Nanosecond, + Interval: time.Hour, + } + + // Create a new task with a bad artifact + badtask := alloc.Job.TaskGroups[0].Tasks[0].Copy() + badtask.Name = "bad" + badtask.Artifacts = []*structs.TaskArtifact{ + {GetterSource: "http://127.0.0.1:0/foo/bar/baz"}, + } + + alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, badtask) + alloc.AllocatedResources.Tasks["bad"] = &structs.AllocatedTaskResources{ + Cpu: structs.AllocatedCpuResources{ + CpuShares: 500, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 256, + }, + } + + conf, cleanup := testAllocRunnerConfig(t, alloc) + defer cleanup() + ar, err := NewAllocRunner(conf) + require.NoError(t, err) + go ar.Run() + defer destroy(ar) + + testutil.WaitForResult(func() (bool, error) { + state := ar.AllocState() + + switch state.ClientStatus { + case structs.AllocClientStatusComplete, structs.AllocClientStatusFailed: + return true, nil + default: + return false, fmt.Errorf("got status %v but want terminal", state.ClientStatus) + } + + }, func(err error) { + require.NoError(t, err) + }) + + state := ar.AllocState() + require.Equal(t, structs.AllocClientStatusFailed, state.ClientStatus) + require.Equal(t, structs.TaskStateDead, state.TaskStates["web"].State) + require.True(t, state.TaskStates["web"].Successful()) + require.Equal(t, structs.TaskStateDead, state.TaskStates["bad"].State) + require.True(t, state.TaskStates["bad"].Failed) +} diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 2dedc011f4e..7eed9742add 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -1238,6 +1238,323 @@ func TestTaskRunner_Run_RecoverableStartError(t *testing.T) { require.Equal(t, structs.TaskNotRestarting, state.Events[5].Type) } +// TestTaskRunner_Template_Artifact asserts that tasks can use artifacts as templates. +func TestTaskRunner_Template_Artifact(t *testing.T) { + t.Parallel() + + ts := httptest.NewServer(http.FileServer(http.Dir("."))) + defer ts.Close() + + alloc := mock.BatchAlloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + f1 := "task_runner.go" + f2 := "test" + task.Artifacts = []*structs.TaskArtifact{ + {GetterSource: fmt.Sprintf("%s/%s", ts.URL, f1)}, + } + task.Templates = []*structs.Template{ + { + SourcePath: f1, + DestPath: "local/test", + ChangeMode: structs.TemplateChangeModeNoop, + }, + } + + conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + defer cleanup() + + tr, err := NewTaskRunner(conf) + require.NoError(t, err) + defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + go tr.Run() + + // Wait for task to run and exit + select { + case <-tr.WaitCh(): + case <-time.After(15 * time.Second * time.Duration(testutil.TestMultiplier())): + require.Fail(t, "timed out waiting for task runner to exit") + } + + state := tr.TaskState() + require.Equal(t, structs.TaskStateDead, state.State) + require.True(t, state.Successful()) + require.False(t, state.Failed) + + artifactsDownloaded := false + for _, e := range state.Events { + if e.Type == structs.TaskDownloadingArtifacts { + artifactsDownloaded = true + } + } + assert.True(t, artifactsDownloaded, "expected artifacts downloaded events") + + // Check that both files exist. + _, err = os.Stat(filepath.Join(conf.TaskDir.Dir, f1)) + require.NoErrorf(t, err, "%v not downloaded", f1) + + _, err = os.Stat(filepath.Join(conf.TaskDir.LocalDir, f2)) + require.NoErrorf(t, err, "%v not rendered", f2) +} + +// TestTaskRunner_Template_NewVaultToken asserts that a new vault token is +// created when rendering template and that it is revoked on alloc completion +func TestTaskRunner_Template_NewVaultToken(t *testing.T) { + t.Parallel() + + alloc := mock.BatchAlloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Templates = []*structs.Template{ + { + EmbeddedTmpl: `{{key "foo"}}`, + DestPath: "local/test", + ChangeMode: structs.TemplateChangeModeNoop, + }, + } + task.Vault = &structs.Vault{Policies: []string{"default"}} + + conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + defer cleanup() + + tr, err := NewTaskRunner(conf) + require.NoError(t, err) + defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + go tr.Run() + + // Wait for a Vault token + var token string + testutil.WaitForResult(func() (bool, error) { + token = tr.getVaultToken() + + if token == "" { + return false, fmt.Errorf("No Vault token") + } + + return true, nil + }, func(err error) { + require.NoError(t, err) + }) + + vault := conf.Vault.(*vaultclient.MockVaultClient) + renewalCh, ok := vault.RenewTokens()[token] + require.True(t, ok, "no renewal channel for token") + + renewalCh <- fmt.Errorf("Test killing") + close(renewalCh) + + var token2 string + testutil.WaitForResult(func() (bool, error) { + token2 = tr.getVaultToken() + + if token2 == "" { + return false, fmt.Errorf("No Vault token") + } + + if token2 == token { + return false, fmt.Errorf("token wasn't recreated") + } + + return true, nil + }, func(err error) { + require.NoError(t, err) + }) + + // Check the token was revoked + testutil.WaitForResult(func() (bool, error) { + if len(vault.StoppedTokens()) != 1 { + return false, fmt.Errorf("Expected a stopped token: %v", vault.StoppedTokens()) + } + + if a := vault.StoppedTokens()[0]; a != token { + return false, fmt.Errorf("got stopped token %q; want %q", a, token) + } + + return true, nil + }, func(err error) { + require.NoError(t, err) + }) + +} + +// TestTaskRunner_VaultManager_Restart asserts that the alloc is restarted when the alloc +// derived vault token expires, when task is configured with Restart change mode +func TestTaskRunner_VaultManager_Restart(t *testing.T) { + t.Parallel() + + alloc := mock.BatchAlloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Config = map[string]interface{}{ + "run_for": "10s", + } + task.Vault = &structs.Vault{ + Policies: []string{"default"}, + ChangeMode: structs.VaultChangeModeRestart, + } + + conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + defer cleanup() + + tr, err := NewTaskRunner(conf) + require.NoError(t, err) + defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + go tr.Run() + + testWaitForTaskToStart(t, tr) + + tr.vaultTokenLock.Lock() + token := tr.vaultToken + tr.vaultTokenLock.Unlock() + + require.NotEmpty(t, token) + + vault := conf.Vault.(*vaultclient.MockVaultClient) + renewalCh, ok := vault.RenewTokens()[token] + require.True(t, ok, "no renewal channel for token") + + renewalCh <- fmt.Errorf("Test killing") + close(renewalCh) + + testutil.WaitForResult(func() (bool, error) { + state := tr.TaskState() + + if len(state.Events) == 0 { + return false, fmt.Errorf("no events yet") + } + + foundRestartSignal, foundRestarting := false, false + for _, e := range state.Events { + switch e.Type { + case structs.TaskRestartSignal: + foundRestartSignal = true + case structs.TaskRestarting: + foundRestarting = true + } + } + + if !foundRestartSignal { + return false, fmt.Errorf("no restart signal event yet: %#v", state.Events) + } + + if !foundRestarting { + return false, fmt.Errorf("no restarting event yet: %#v", state.Events) + } + + lastEvent := state.Events[len(state.Events)-1] + if lastEvent.Type != structs.TaskStarted { + return false, fmt.Errorf("expected last event to be task starting but was %#v", lastEvent) + } + return true, nil + }, func(err error) { + require.NoError(t, err) + }) +} + +// TestTaskRunner_VaultManager_Signal asserts that the alloc is signalled when the alloc +// derived vault token expires, when task is configured with signal change mode +func TestTaskRunner_VaultManager_Signal(t *testing.T) { + t.Parallel() + + alloc := mock.BatchAlloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Config = map[string]interface{}{ + "run_for": "10s", + } + task.Vault = &structs.Vault{ + Policies: []string{"default"}, + ChangeMode: structs.VaultChangeModeSignal, + ChangeSignal: "SIGUSR1", + } + + conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + defer cleanup() + + tr, err := NewTaskRunner(conf) + require.NoError(t, err) + defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + go tr.Run() + + testWaitForTaskToStart(t, tr) + + tr.vaultTokenLock.Lock() + token := tr.vaultToken + tr.vaultTokenLock.Unlock() + + require.NotEmpty(t, token) + + vault := conf.Vault.(*vaultclient.MockVaultClient) + renewalCh, ok := vault.RenewTokens()[token] + require.True(t, ok, "no renewal channel for token") + + renewalCh <- fmt.Errorf("Test killing") + close(renewalCh) + + testutil.WaitForResult(func() (bool, error) { + state := tr.TaskState() + + if len(state.Events) == 0 { + return false, fmt.Errorf("no events yet") + } + + foundSignaling := false + for _, e := range state.Events { + if e.Type == structs.TaskSignaling { + foundSignaling = true + } + } + + if !foundSignaling { + return false, fmt.Errorf("no signaling event yet: %#v", state.Events) + } + + return true, nil + }, func(err error) { + require.NoError(t, err) + }) + +} + +// TestTaskRunner_UnregisterConsul_Retries asserts a task is unregistered from +// Consul when waiting to be retried. +func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) { + t.Parallel() + + alloc := mock.Alloc() + // Make the restart policy try one ctx.update + alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{ + Attempts: 1, + Interval: 10 * time.Minute, + Delay: time.Nanosecond, + Mode: structs.RestartPolicyModeFail, + } + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + task.Config = map[string]interface{}{ + "exit_code": "1", + "run_for": "1ns", + } + + conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + defer cleanup() + + tr, err := NewTaskRunner(conf) + require.NoError(t, err) + defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + tr.Run() + + state := tr.TaskState() + require.Equal(t, structs.TaskStateDead, state.State) + + consul := conf.Consul.(*consulapi.MockConsulServiceClient) + consulOps := consul.GetOps() + require.Len(t, consulOps, 6) + // pattern: add followed by two removals + require.Equal(t, "add", consulOps[0].Op) + require.Equal(t, "remove", consulOps[1].Op) + require.Equal(t, "remove", consulOps[2].Op) + require.Equal(t, "add", consulOps[3].Op) + require.Equal(t, "remove", consulOps[4].Op) + require.Equal(t, "remove", consulOps[5].Op) +} + // testWaitForTaskToStart waits for the task to be running or fails the test func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) { testutil.WaitForResult(func() (bool, error) { diff --git a/client/allocrunner/taskrunner/vault_hook.go b/client/allocrunner/taskrunner/vault_hook.go index e14446a7466..016fbf6108f 100644 --- a/client/allocrunner/taskrunner/vault_hook.go +++ b/client/allocrunner/taskrunner/vault_hook.go @@ -254,7 +254,7 @@ OUTER: case structs.VaultChangeModeRestart: const noFailure = false h.lifecycle.Restart(h.ctx, - structs.NewTaskEvent(structs.TaskRestarting). + structs.NewTaskEvent(structs.TaskRestartSignal). SetDisplayMessage("Vault: new Vault token acquired"), false) case structs.VaultChangeModeNoop: fallthrough