-
Notifications
You must be signed in to change notification settings - Fork 2k
/
lifecycle.go
186 lines (148 loc) · 4.81 KB
/
lifecycle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package taskrunner
import (
"context"
"time"
"github.com/hashicorp/nomad/nomad/structs"
)
// Restart restarts a task that is already running.
//
// It returns ErrTaskNotRunning when the runner has no task state yet, or when
// the task is pending or dead. Otherwise it delegates to restartImpl, which
// blocks until the existing task exits or ctx is canceled.
func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
	tr.logger.Trace("Restart requested", "failure", failure, "event", event.GoString())

	// Short-circuit evaluation guarantees state is non-nil before its fields
	// are inspected.
	state := tr.TaskState()
	if state == nil || state.State == structs.TaskStatePending || state.State == structs.TaskStateDead {
		return ErrTaskNotRunning
	}

	return tr.restartImpl(ctx, event, failure)
}
// ForceRestart restarts a task that is already running, or reruns it if it is
// dead but its Run() method has not yet completed.
//
// It returns ErrTaskNotRunning when there is no task state or local state, when
// the task is pending, or when the task is dead and Run() has already
// completed. Otherwise it delegates to restartImpl, which blocks until the
// existing task exits or ctx is canceled.
//
// Callers must restart the AllocRunner taskCoordinator beforehand to make sure
// the task will be able to run again.
func (tr *TaskRunner) ForceRestart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
	tr.logger.Trace("Force restart requested", "failure", failure, "event", event.GoString())

	state := tr.TaskState()
	if state == nil {
		return ErrTaskNotRunning
	}

	// Snapshot the local state under the state lock so RunComplete can be
	// read without holding the lock.
	tr.stateLock.Lock()
	local := tr.localState.Copy()
	tr.stateLock.Unlock()

	if local == nil {
		return ErrTaskNotRunning
	}

	switch {
	case state.State == structs.TaskStatePending:
		return ErrTaskNotRunning
	case state.State == structs.TaskStateDead && local.RunComplete:
		// A dead task may only be rerun while its Run() method is still active.
		return ErrTaskNotRunning
	}

	return tr.restartImpl(ctx, event, failure)
}
// restartImpl implements the task restart process.
//
// It should never be called directly as it doesn't verify if the task state
// allows for a restart; Restart and ForceRestart perform that check before
// calling it.
//
// Sequence: emit event, record the restart in the restart tracker, wake the
// Run() loop if the task is dead, and then, only if a driver handle exists,
// run the pre-kill hooks, kill the task, and wait until it exits or ctx is
// canceled.
//
// Returns ErrTaskNotRunning if there is no task state, or the error from
// handle.WaitCh if the wait channel cannot be obtained; otherwise nil.
//
// NOTE(review): this returns nil when ctx is canceled before the task exits,
// and when killTask fails (the failure is only logged). Kill returns ctx.Err()
// on cancellation — confirm the difference is intentional.
func (tr *TaskRunner) restartImpl(ctx context.Context, event *structs.TaskEvent, failure bool) error {
	// Re-read the task state; only a missing (nil) state is rejected here.
	taskState := tr.TaskState()
	if taskState == nil {
		return ErrTaskNotRunning
	}
	// Emit the event since it may take a long time to kill
	tr.EmitEvent(event)
	// Tell the restart tracker that a restart triggered the exit
	tr.restartTracker.SetRestartTriggered(failure)
	// Signal a restart to unblock tasks that are in the "dead" state, but
	// don't block since the channel is buffered. Only one signal is enough to
	// notify the tr.Run() loop.
	// The channel must be signaled after SetRestartTriggered is called so the
	// tr.Run() loop runs again.
	if taskState.State == structs.TaskStateDead {
		select {
		case tr.restartCh <- struct{}{}:
		default:
			// A signal is already pending; dropping this one is fine.
		}
	}
	// Grab the handle to see if the task is still running and needs to be
	// killed. Without a handle there is nothing to kill or wait for.
	handle := tr.getDriverHandle()
	if handle == nil {
		return nil
	}
	// Run the pre-kill hooks prior to restarting the task
	tr.preKill()
	// Grab a handle to the wait channel that will timeout with context cancelation
	// _before_ killing the task.
	waitCh, err := handle.WaitCh(ctx)
	if err != nil {
		return err
	}
	// Kill the task using an exponential backoff in-case of failures.
	if _, err := tr.killTask(handle, waitCh); err != nil {
		// We couldn't successfully destroy the resource created.
		tr.logger.Error("failed to kill task. Resources may have been leaked", "error", err)
	}
	// Block until the task exits or the caller's context is canceled; both
	// paths fall through to return nil.
	select {
	case <-waitCh:
	case <-ctx.Done():
	}
	return nil
}
// Exec runs cmd with args through the task's driver handle, passing timeout
// through to the handle. It returns the handle's output, exit code, and error
// unchanged, or (nil, 0, ErrTaskNotRunning) when the task has no driver handle.
func (tr *TaskRunner) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) {
	tr.logger.Trace("Exec requested")

	if h := tr.getDriverHandle(); h != nil {
		return h.Exec(timeout, cmd, args)
	}
	return nil, 0, ErrTaskNotRunning
}
// Signal emits event and then sends signal s to the task through its driver
// handle, returning the handle's result. When the task has no driver handle it
// returns ErrTaskNotRunning without emitting the event.
func (tr *TaskRunner) Signal(event *structs.TaskEvent, s string) error {
	tr.logger.Trace("Signal requested", "signal", s)

	h := tr.getDriverHandle()
	if h == nil {
		return ErrTaskNotRunning
	}

	// Record the event before the signal is delivered.
	tr.EmitEvent(event)
	return h.Signal(s)
}
// Kill kills a task. State is set to dead.
//
// It cancels the runner's kill context, emits event when it is non-nil, and
// then blocks until tr.WaitCh() is ready or ctx is canceled. On cancellation
// it returns ctx.Err(); otherwise it returns the recorded kill error from
// getKillErr.
func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error {
	tr.logger.Trace("Kill requested")

	// Canceling the kill context breaks the runner out of any restart delay
	// or its main run loop.
	tr.killCtxCancel()

	if event != nil {
		tr.logger.Trace("Kill event", "event_type", event.Type, "event_reason", event.KillReason)
		tr.EmitEvent(event)
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-tr.WaitCh():
		return tr.getKillErr()
	}
}
// IsRunning reports whether the task currently has a driver handle.
func (tr *TaskRunner) IsRunning() bool {
	handle := tr.getDriverHandle()
	return handle != nil
}