-
Notifications
You must be signed in to change notification settings - Fork 2k
/
driver.go
351 lines (297 loc) · 9.62 KB
/
driver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
package driver
import (
	"context"
	"crypto/md5"
	"errors"
	"fmt"
	"io"
	"log"
	"os"
	"sort"

	"github.com/hashicorp/nomad/client/allocdir"
	"github.com/hashicorp/nomad/client/config"
	"github.com/hashicorp/nomad/client/driver/env"
	"github.com/hashicorp/nomad/client/fingerprint"
	"github.com/hashicorp/nomad/nomad/structs"

	dstructs "github.com/hashicorp/nomad/client/driver/structs"
	cstructs "github.com/hashicorp/nomad/client/structs"
)
// BuiltinDrivers maps the name of every built-in driver to the Factory that
// constructs it. These are the drivers available for allocation handling.
var BuiltinDrivers = map[string]Factory{
	"docker":   NewDockerDriver,
	"exec":     NewExecDriver,
	"java":     NewJavaDriver,
	"qemu":     NewQemuDriver,
	"raw_exec": NewRawExecDriver,
	"rkt":      NewRktDriver,
}

// DriverStatsNotImplemented is the error a driver returns when it does not
// implement stats.
var DriverStatsNotImplemented = errors.New("stats not implemented for driver")
// NewDriver instantiates the built-in driver registered under name, passing
// ctx to the driver's Factory. An error is returned when no driver with that
// name exists in BuiltinDrivers.
func NewDriver(name string, ctx *DriverContext) (Driver, error) {
	if build, found := BuiltinDrivers[name]; found {
		// Known driver: let its factory construct the instance.
		return build(ctx), nil
	}
	return nil, fmt.Errorf("unknown driver '%s'", name)
}
// Factory is the constructor signature used to instantiate a new Driver from
// a DriverContext. BuiltinDrivers maps driver names to Factory values and
// NewDriver invokes the one registered under the requested name.
type Factory func(*DriverContext) Driver
// PrestartResponse is the driver state returned by Driver.Prestart.
type PrestartResponse struct {
// CreatedResources holds the resources created by the driver.
CreatedResources *CreatedResources
// Network contains driver-specific network parameters such as the port
// map between the host and a container.
//
// Since the network configuration may not be fully populated by
// Prestart, it will only be used for creating an environment for
// Start. It will be overridden by the DriverNetwork returned by Start.
Network *cstructs.DriverNetwork
}
// NewPrestartResponse returns a PrestartResponse whose CreatedResources field
// is already initialized; Network is left nil.
func NewPrestartResponse() *PrestartResponse {
	resp := new(PrestartResponse)
	resp.CreatedResources = NewCreatedResources()
	return resp
}
// CreatedResources is a map of resources (eg downloaded images) created by a driver
// that must be cleaned up.
type CreatedResources struct {
// Resources maps a resource type to the list of values recorded for that
// type.
Resources map[string][]string
}
// NewCreatedResources returns an empty CreatedResources whose Resources map
// is allocated and ready for writes.
func NewCreatedResources() *CreatedResources {
	created := new(CreatedResources)
	created.Resources = map[string][]string{}
	return created
}
// Add records value v under resource type k. The call is a no-op when v is
// already recorded for k. The Resources map is allocated on first use if it
// is nil.
func (r *CreatedResources) Add(k, v string) {
	if r.Resources == nil {
		r.Resources = map[string][]string{k: {v}}
		return
	}

	values, found := r.Resources[k]
	if found {
		for i := range values {
			if values[i] == v {
				// Already tracked, nothing to do.
				return
			}
		}
	}

	// Either a brand-new resource type (values is nil) or a new value for a
	// known type; append covers both.
	r.Resources[k] = append(values, v)
}
// Remove deletes the first occurrence of needle from the values recorded
// under k and reports whether anything was removed.
//
// When needle was the only remaining value, the key k itself is dropped from
// the map.
func (r *CreatedResources) Remove(k, needle string) bool {
	values := r.Resources[k]

	// Locate the first matching value, if any.
	idx := -1
	for i := range values {
		if values[i] == needle {
			idx = i
			break
		}
	}
	if idx < 0 {
		return false
	}

	// Close the gap in place, then drop the key if nothing is left.
	remaining := append(values[:idx], values[idx+1:]...)
	if len(remaining) == 0 {
		delete(r.Resources, k)
	} else {
		r.Resources[k] = remaining
	}
	return true
}
// Copy returns a deep copy of the CreatedResources: both the map and every
// value slice are duplicated. A nil receiver yields nil.
func (r *CreatedResources) Copy() *CreatedResources {
	if r == nil {
		return nil
	}

	dup := &CreatedResources{
		Resources: make(map[string][]string, len(r.Resources)),
	}
	for key := range r.Resources {
		src := r.Resources[key]
		dst := make([]string, len(src))
		copy(dst, src)
		dup.Resources[key] = dst
	}
	return dup
}
// Merge folds the resources of o into r. If o is nil this method is a noop.
//
// For a key that r does not have (or has with no values) the values of o are
// copied over as-is. For a key both sides have, only the values r is missing
// are appended, in the order they appear in o.
//
// r never shares slice storage with o after the call, so later changes to
// one do not leak into the other. A nil r.Resources map is allocated on
// demand, matching the behavior of Add.
func (r *CreatedResources) Merge(o *CreatedResources) {
	if o == nil {
		return
	}

	// Writing to a nil map panics; allocate lazily when there is something
	// to merge.
	if r.Resources == nil && len(o.Resources) > 0 {
		r.Resources = make(map[string][]string, len(o.Resources))
	}

	for k, v := range o.Resources {
		// New key
		if len(r.Resources[k]) == 0 {
			// Clone rather than alias o's slice. The zero-capacity
			// prefix forces a fresh backing array for non-empty v while
			// keeping nil as nil and empty as empty.
			r.Resources[k] = append(v[:0:0], v...)
			continue
		}

		// Existing key
	OUTER:
		for _, item := range v {
			for _, existing := range r.Resources[k] {
				if item == existing {
					// Found it, move on
					continue OUTER
				}
			}

			// New item, append it
			r.Resources[k] = append(r.Resources[k], item)
		}
	}
}
// Hash returns an MD5 digest of the resource keys and their values.
//
// Keys are visited in sorted order so that two CreatedResources with the
// same contents always produce the same digest; ranging over the map
// directly would feed the hash in an unspecified order. The order of the
// values within a key is significant.
func (r *CreatedResources) Hash() []byte {
	keys := make([]string, 0, len(r.Resources))
	for k := range r.Resources {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	h := md5.New()
	for _, k := range keys {
		// Writes to a hash.Hash never return an error, so the results of
		// io.WriteString are intentionally not checked.
		io.WriteString(h, k)
		io.WriteString(h, "values")
		for i, v := range r.Resources[k] {
			io.WriteString(h, fmt.Sprintf("%d-%v", i, v))
		}
	}

	return h.Sum(nil)
}
// StartResponse is the result returned by Driver.Start.
type StartResponse struct {
// Handle to the driver's task executor for controlling the lifecycle
// of the task.
Handle DriverHandle
// Network contains driver-specific network parameters such as the port
// map between the host and a container.
//
// Network may be nil as not all drivers or configurations create
// networks.
Network *cstructs.DriverNetwork
}
// Driver is used for execution of tasks. This allows Nomad
// to support many pluggable implementations of task drivers.
// Examples could include LXC, Docker, Qemu, etc.
type Driver interface {
// Drivers must support the fingerprint interface for detection
fingerprint.Fingerprint
// Prestart prepares the task environment and performs expensive
// initialization steps like downloading images.
//
// CreatedResources may be non-nil even when an error occurs.
Prestart(*ExecContext, *structs.Task) (*PrestartResponse, error)
// Start is used to begin task execution. If error is nil,
// StartResponse.Handle will be the handle to the task's executor.
// StartResponse.Network may be nil if the task doesn't configure a
// network.
Start(ctx *ExecContext, task *structs.Task) (*StartResponse, error)
// Open is used to re-open a handle to a task, identified by the ID
// string of a previously obtained handle.
Open(ctx *ExecContext, handleID string) (DriverHandle, error)
// Cleanup is called to remove resources which were created for a task
// and no longer needed. Cleanup is not called if CreatedResources is
// nil.
//
// If Cleanup returns a recoverable error it may be retried. On retry
// it will be passed the same CreatedResources, so all successfully
// cleaned up resources should be removed or handled idempotently.
Cleanup(*ExecContext, *CreatedResources) error
// Drivers must validate their configuration
Validate(map[string]interface{}) error
// Abilities returns the abilities of the driver
Abilities() DriverAbilities
// FSIsolation returns the method of filesystem isolation used
FSIsolation() cstructs.FSIsolation
}
// DriverAbilities marks the abilities the driver has. It is the value
// returned by Driver.Abilities.
type DriverAbilities struct {
// SendSignals marks the driver as being able to send signals
SendSignals bool
// Exec marks the driver as being able to execute arbitrary commands
// such as health checks. Used by the ScriptExecutor interface.
Exec bool
}
// LogEventFn is a callback which allows Drivers to emit task events.
// NOTE(review): message and args look like a printf-style format string and
// its arguments — confirm against the callback implementations.
type LogEventFn func(message string, args ...interface{})
// DriverContext is a means to inject dependencies such as loggers, configs, and
// node attributes into a Driver without having to change the Driver interface
// each time we do it. Used in conjunction with Factory.
//
// All fields are unexported; other packages build a DriverContext through
// NewDriverContext or NewEmptyDriverContext.
type DriverContext struct {
taskName string
allocID string
config *config.Config
logger *log.Logger
node *structs.Node
// emitEvent is the callback through which the driver emits task events.
emitEvent LogEventFn
}
// NewEmptyDriverContext returns a pointer to a DriverContext in which every
// field holds its zero value.
func NewEmptyDriverContext() *DriverContext {
	return new(DriverContext)
}
// NewDriverContext builds a DriverContext from the given values. It lets
// other packages create a DriverContext while the fields themselves stay
// private to the driver package. If we want to change this later we can
// gorename all of the fields in DriverContext.
func NewDriverContext(taskName, allocID string, config *config.Config, node *structs.Node,
	logger *log.Logger, eventEmitter LogEventFn) *DriverContext {
	ctx := new(DriverContext)
	ctx.taskName = taskName
	ctx.allocID = allocID
	ctx.config = config
	ctx.node = node
	ctx.logger = logger
	ctx.emitEvent = eventEmitter
	return ctx
}
// DriverHandle is an opaque handle into a driver used for task
// manipulation
type DriverHandle interface {
// ID returns an opaque handle that can be used to re-open the handle
ID() string
// WaitCh is used to return a channel used to wait for task completion
WaitCh() chan *dstructs.WaitResult
// Update is used to update the task if possible and update task related
// configurations.
Update(task *structs.Task) error
// Kill is used to stop the task
Kill() error
// Stats returns aggregated stats of the driver
Stats() (*cstructs.TaskResourceUsage, error)
// Signal is used to send a signal to the task
Signal(s os.Signal) error
// ScriptExecutor is an interface used to execute commands such as
// health check scripts in a DriverHandle's context.
ScriptExecutor
}
// ScriptExecutor is an interface that supports Exec()ing commands in the
// driver's context. Split out of DriverHandle to ease testing.
type ScriptExecutor interface {
// Exec runs cmd with args under ctx.
// NOTE(review): the []byte and int results are presumably the command's
// output and exit code — confirm against the implementations.
Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error)
}
// ExecContext is a task's execution context. It is passed to the Driver
// methods Prestart, Start, Open and Cleanup.
type ExecContext struct {
// TaskDir contains information about the task directory structure.
TaskDir *allocdir.TaskDir
// TaskEnv contains the task's environment variables.
TaskEnv *env.TaskEnv
}
// NewExecContext creates an execution context that holds the given task
// directory and task environment.
func NewExecContext(td *allocdir.TaskDir, te *env.TaskEnv) *ExecContext {
	ec := new(ExecContext)
	ec.TaskDir = td
	ec.TaskEnv = te
	return ec
}
// mapMergeStrStr flattens the given maps into a single new map. Maps are
// applied in argument order, so on a key collision the value from the later
// map wins. The result is never nil, even when no maps are supplied.
func mapMergeStrStr(maps ...map[string]string) map[string]string {
	merged := make(map[string]string)
	for i := range maps {
		for k, v := range maps[i] {
			merged[k] = v
		}
	}
	return merged
}