diff --git a/api/tasks.go b/api/tasks.go index e627dae09d1..c62fdc99c39 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -500,6 +500,21 @@ func (m *MigrateStrategy) Copy() *MigrateStrategy { return nm } +type Volume struct { + Name string + Type string + ReadOnly bool `mapstructure:"read_only"` + Hidden bool + + Config map[string]interface{} +} + +type VolumeMount struct { + Volume string + Destination string + ReadOnly bool `mapstructure:"read_only"` +} + // TaskGroup is the unit of scheduling. type TaskGroup struct { Name *string @@ -508,6 +523,7 @@ type TaskGroup struct { Affinities []*Affinity Tasks []*Task Spreads []*Spread + Volumes map[string]*Volume RestartPolicy *RestartPolicy ReschedulePolicy *ReschedulePolicy EphemeralDisk *EphemeralDisk @@ -715,6 +731,7 @@ type Task struct { Vault *Vault Templates []*Template DispatchPayload *DispatchPayloadConfig + VolumeMounts []*VolumeMount Leader bool ShutdownDelay time.Duration `mapstructure:"shutdown_delay"` KillSignal string `mapstructure:"kill_signal"` diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 0419f3e54fe..9fa605498d7 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -61,6 +61,7 @@ func (tr *TaskRunner) initHooks() { newTaskDirHook(tr, hookLogger), newLogMonHook(tr.logmonHookConfig, hookLogger), newDispatchHook(tr.Alloc(), hookLogger), + newVolumeHook(tr, hookLogger), newArtifactHook(tr, hookLogger), newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger), newDeviceHook(tr.devicemanager, hookLogger), diff --git a/client/allocrunner/taskrunner/volume_hook.go b/client/allocrunner/taskrunner/volume_hook.go new file mode 100644 index 00000000000..b2c0ab3e701 --- /dev/null +++ b/client/allocrunner/taskrunner/volume_hook.go @@ -0,0 +1,127 @@ +package taskrunner + +import ( + "context" + "fmt" + + log "github.com/hashicorp/go-hclog" + multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/drivers" +) + +type volumeHook struct { + alloc *structs.Allocation + runner *TaskRunner + logger log.Logger +} + +func newVolumeHook(runner *TaskRunner, logger log.Logger) *volumeHook { + h := &volumeHook{ + alloc: runner.Alloc(), + runner: runner, + } + h.logger = logger.Named(h.Name()) + return h +} + +func (*volumeHook) Name() string { + return "volumes" +} + +func validateHostVolumes(requestedByAlias map[string]*structs.VolumeRequest, clientVolumesByName map[string]*structs.ClientHostVolumeConfig) error { + var result error + + for n, req := range requestedByAlias { + if req.Type != structs.VolumeTypeHost { + continue + } + + cfg, err := structs.ParseHostVolumeConfig(req.Config) + if err != nil { + result = multierror.Append(result, fmt.Errorf("failed to parse config for %s: %v", n, err)) + continue + } + + _, ok := clientVolumesByName[cfg.Source] + if !ok { + result = multierror.Append(result, fmt.Errorf("missing %s", cfg.Source)) + } + } + + return result +} + +// hostVolumeMountConfigurations takes the users requested volume mounts, +// volumes, and the client host volume configuration and converts them into a +// format that can be used by drivers. 
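+// For example (illustrative values only): a task volume_mount of
+// {Volume: "foo", Destination: "/mnt/foo"} whose group volume "foo" has a
+// config source resolving to a client host_volume with Path "/opt/shared"
+// produces a drivers.MountConfig{HostPath: "/opt/shared", TaskPath: "/mnt/foo"},
+// with Readonly set if any of the host volume, the volume request, or the
+// mount asks for read_only.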
+func (h *volumeHook) hostVolumeMountConfigurations(taskMounts []*structs.VolumeMount, taskVolumesByAlias map[string]*structs.VolumeRequest, clientVolumesByName map[string]*structs.ClientHostVolumeConfig) ([]*drivers.MountConfig, error) { + var mounts []*drivers.MountConfig + for _, m := range taskMounts { + req, ok := taskVolumesByAlias[m.Volume] + if !ok { + // Should never happen unless we misvalidated on job submission + return nil, fmt.Errorf("No group volume declaration found named: %s", m.Volume) + } + + cfg, err := structs.ParseHostVolumeConfig(req.Config) + if err != nil { + return nil, fmt.Errorf("failed to parse config for %s: %v", m.Volume, err) + } + + hostVolume, ok := clientVolumesByName[cfg.Source] + if !ok { + // Should never happen, but unless the client volumes were mutated during + // the execution of this hook. + return nil, fmt.Errorf("No host volume named: %s", cfg.Source) + } + + mcfg := &drivers.MountConfig{ + HostPath: hostVolume.Path, + TaskPath: m.Destination, + Readonly: hostVolume.ReadOnly || req.ReadOnly || m.ReadOnly, + } + mounts = append(mounts, mcfg) + } + + return mounts, nil +} + +func (h *volumeHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { + volumes := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup).Volumes + mounts := h.runner.hookResources.getMounts() + + hostVolumes := h.runner.clientConfig.Node.HostVolumes + + // Always validate volumes to ensure that we do not allow volumes to be used + // if a host is restarted and loses the host volume configuration. + if err := validateHostVolumes(volumes, hostVolumes); err != nil { + h.logger.Error("Requested Host Volume does not exist", "existing", hostVolumes, "requested", volumes) + return fmt.Errorf("host volume validation error: %v", err) + } + + requestedMounts, err := h.hostVolumeMountConfigurations(req.Task.VolumeMounts, volumes, hostVolumes) + if err != nil { + h.logger.Error("Failed to generate volume mounts", "error", err) + return err + } + + // Because this hook is also ran on restores, we only add mounts that do not + // already exist. Although this loop is somewhat expensive, there are only + // a small number of mounts that exist within most individual tasks. We may + // want to revisit this using a `hookdata` param to be "mount only once" +REQUESTED: + for _, m := range requestedMounts { + for _, em := range mounts { + if em.IsEqual(m) { + continue REQUESTED + } + } + + mounts = append(mounts, m) + } + + h.runner.hookResources.setMounts(mounts) + return nil +} diff --git a/client/client.go b/client/client.go index 636ed941730..b8b41c1fd3d 100644 --- a/client/client.go +++ b/client/client.go @@ -1266,6 +1266,16 @@ func (c *Client) setupNode() error { if node.Name == "" { node.Name, _ = os.Hostname() } + // TODO(dani): Fingerprint these to handle volumes that don't exist/have bad perms. + if node.HostVolumes == nil { + if l := len(c.config.HostVolumes); l != 0 { + node.HostVolumes = make(map[string]*structs.ClientHostVolumeConfig, l) + for k, v := range c.config.HostVolumes { + node.HostVolumes[k] = v.Copy() + } + } + } + if node.Name == "" { node.Name = node.ID } diff --git a/client/config/config.go b/client/config/config.go index 06bc96d4b9f..32f81a4f3f4 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -246,6 +246,9 @@ type Config struct { // AutoFetchCNIDir is the destination dir to use when auto doanloading CNI plugins. 
// This directory will be appended to the CNIPath so it is searched last AutoFetchCNIDir string + + // HostVolumes is the set of configured host volumes + HostVolumes map[string]*structs.ClientHostVolumeConfig } func (c *Config) Copy() *Config { @@ -254,6 +257,7 @@ func (c *Config) Copy() *Config { nc.Node = nc.Node.Copy() nc.Servers = helper.CopySliceString(nc.Servers) nc.Options = helper.CopyMapStringString(nc.Options) + nc.HostVolumes = structs.CopyMapStringClientHostVolumeConfig(nc.HostVolumes) nc.ConsulConfig = c.ConsulConfig.Copy() nc.VaultConfig = c.VaultConfig.Copy() return nc diff --git a/command/agent/agent.go b/command/agent/agent.go index 0b8ce926956..626acc365ca 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -470,6 +470,12 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) { conf.ClientMinPort = uint(agentConfig.Client.ClientMinPort) conf.DisableRemoteExec = agentConfig.Client.DisableRemoteExec + hvMap := make(map[string]*structs.ClientHostVolumeConfig, len(agentConfig.Client.HostVolumes)) + for _, v := range agentConfig.Client.HostVolumes { + hvMap[v.Name] = v + } + conf.HostVolumes = hvMap + // Setup the node conf.Node = new(structs.Node) conf.Node.Datacenter = agentConfig.Datacenter diff --git a/command/agent/config.go b/command/agent/config.go index 2f804855319..8ba3e0ec198 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -245,6 +245,10 @@ type ClientConfig struct { // ServerJoin contains information that is used to attempt to join servers ServerJoin *ServerJoin `hcl:"server_join"` + // HostVolumes contains information about the volumes an operator has made + // available to jobs running on this node. + HostVolumes []*structs.ClientHostVolumeConfig `hcl:"host_volume"` + // ExtraKeysHCL is used by hcl to surface unexpected keys ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"` @@ -1333,6 +1337,12 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig { result.ServerJoin = result.ServerJoin.Merge(b.ServerJoin) } + if len(a.HostVolumes) == 0 && len(b.HostVolumes) != 0 { + result.HostVolumes = structs.CopySliceClientHostVolumeConfig(b.HostVolumes) + } else if len(b.HostVolumes) != 0 { + result.HostVolumes = structs.HostVolumeSliceMerge(a.HostVolumes, b.HostVolumes) + } + return &result } diff --git a/command/agent/config_parse.go b/command/agent/config_parse.go index 225ca894210..0056af6effa 100644 --- a/command/agent/config_parse.go +++ b/command/agent/config_parse.go @@ -138,6 +138,11 @@ func extraKeys(c *Config) error { // stats is an unused key, continue to silently ignore it removeEqualFold(&c.Client.ExtraKeysHCL, "stats") + // Remove HostVolume extra keys + for _, hv := range c.Client.HostVolumes { + removeEqualFold(&c.Client.ExtraKeysHCL, hv.Name) + } + for _, k := range []string{"enabled_schedulers", "start_join", "retry_join", "server_join"} { removeEqualFold(&c.ExtraKeysHCL, k) removeEqualFold(&c.ExtraKeysHCL, "server") diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index c8593ce59c3..362bddb92c0 100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs/config" "github.com/stretchr/testify/require" ) @@ -81,6 +82,9 @@ var basicConfig = &Config{ GCMaxAllocs: 50, NoHostUUID: helper.BoolToPtr(false), DisableRemoteExec: true, + HostVolumes: 
[]*structs.ClientHostVolumeConfig{ + {Name: "tmp", Path: "/tmp"}, + }, }, Server: &ServerConfig{ Enabled: true, diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index a42ce63de9f..7271ee96d50 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -183,6 +183,7 @@ func (s *HTTPServer) ValidateJobRequest(resp http.ResponseWriter, req *http.Requ } job := ApiJobToStructJob(validateRequest.Job) + args := structs.JobValidateRequest{ Job: job, WriteRequest: structs.WriteRequest{ @@ -728,6 +729,26 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) { } } + if l := len(taskGroup.Volumes); l != 0 { + tg.Volumes = make(map[string]*structs.VolumeRequest, l) + for k, v := range taskGroup.Volumes { + if v.Type != structs.VolumeTypeHost { + // Ignore non-host volumes in this iteration currently. + continue + } + + vol := &structs.VolumeRequest{ + Name: v.Name, + Type: v.Type, + ReadOnly: v.ReadOnly, + Hidden: v.Hidden, + Config: v.Config, + } + + tg.Volumes[k] = vol + } + } + if taskGroup.Update != nil { tg.Update = &structs.UpdateStrategy{ Stagger: *taskGroup.Update.Stagger, @@ -775,6 +796,17 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) { structsTask.Constraints = ApiConstraintsToStructs(apiTask.Constraints) structsTask.Affinities = ApiAffinitiesToStructs(apiTask.Affinities) + if l := len(apiTask.VolumeMounts); l != 0 { + structsTask.VolumeMounts = make([]*structs.VolumeMount, l) + for i, mount := range apiTask.VolumeMounts { + structsTask.VolumeMounts[i] = &structs.VolumeMount{ + Volume: mount.Volume, + Destination: mount.Destination, + ReadOnly: mount.ReadOnly, + } + } + } + if l := len(apiTask.Services); l != 0 { structsTask.Services = make([]*structs.Service, l) for i, service := range apiTask.Services { diff --git a/command/agent/testdata/basic.hcl b/command/agent/testdata/basic.hcl index 97b4cd99c04..03ce1c7499f 100644 --- a/command/agent/testdata/basic.hcl +++ b/command/agent/testdata/basic.hcl @@ -89,6 +89,10 @@ client { gc_max_allocs = 50 no_host_uuid = false disable_remote_exec = true + + host_volume "tmp" { + path = "/tmp" + } } server { diff --git a/command/agent/testdata/basic.json b/command/agent/testdata/basic.json index 11af6beb4bb..6eedcb85f59 100644 --- a/command/agent/testdata/basic.json +++ b/command/agent/testdata/basic.json @@ -44,12 +44,22 @@ "client_max_port": 2000, "client_min_port": 1000, "cpu_total_compute": 4444, + "disable_remote_exec": true, "enabled": true, "gc_disk_usage_threshold": 82, "gc_inode_usage_threshold": 91, "gc_interval": "6s", "gc_max_allocs": 50, "gc_parallel_destroys": 6, + "host_volume": [ + { + "tmp": [ + { + "path": "/tmp" + } + ] + } + ], "max_kill_timeout": "10s", "meta": [ { @@ -60,7 +70,6 @@ "network_interface": "eth0", "network_speed": 100, "no_host_uuid": false, - "disable_remote_exec": true, "node_class": "linux-medium-64bit", "options": [ { @@ -137,25 +146,39 @@ "log_json": true, "log_level": "ERR", "name": "my-web", - "plugin": { - "docker": { - "args": [ - "foo", - "bar" - ], - "config": { - "foo": "bar", - "nested": { - "bam": 2 + "plugin": [ + { + "docker": [ + { + "args": [ + "foo", + "bar" + ], + "config": [ + { + "foo": "bar", + "nested": [ + { + "bam": 2 + } + ] + } + ] } - } + ] }, - "exec": { - "config": { - "foo": true + { + "exec": [ + { + "config": [ + { + "foo": true + } + ] } + ] } - }, + ], "plugin_dir": "/tmp/nomad-plugins", "ports": [ { diff --git a/jobspec/parse.go b/jobspec/parse.go index c0a7dc60878..510da3bc935 100644 --- 
a/jobspec/parse.go +++ b/jobspec/parse.go @@ -316,6 +316,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error { "spread", "network", "service", + "volume", } if err := helper.CheckHCLKeys(listVal, valid); err != nil { return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n)) @@ -337,6 +338,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error { delete(m, "spread") delete(m, "network") delete(m, "service") + delete(m, "volume") // Build the group with the basic decode var g api.TaskGroup @@ -424,6 +426,12 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error { } } + // Parse any volume declarations + if o := listVal.Filter("volume"); len(o.Items) > 0 { + if err := parseVolumes(&g.Volumes, o); err != nil { + return multierror.Prefix(err, "volume ->") + } + } // Parse tasks if o := listVal.Filter("task"); len(o.Items) > 0 { if err := parseTasks(*result.Name, *g.Name, &g.Tasks, o); err != nil { @@ -463,6 +471,101 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error { return nil } +func parseVolumes(out *map[string]*api.Volume, list *ast.ObjectList) error { + volumes := make(map[string]*api.Volume, len(list.Items)) + + for _, item := range list.Items { + n := item.Keys[0].Token.Value().(string) + valid := []string{ + "type", + "read_only", + "hidden", + "config", + } + if err := helper.CheckHCLKeys(item.Val, valid); err != nil { + return err + } + + var m map[string]interface{} + if err := hcl.DecodeObject(&m, item.Val); err != nil { + return err + } + + // TODO(dani): FIXME: this is gross but we don't have ObjectList.Filter here + var cfg map[string]interface{} + if cfgI, ok := m["config"]; ok { + cfgL, ok := cfgI.([]map[string]interface{}) + if !ok { + return fmt.Errorf("Incorrect `config` type, expected map") + } + + if len(cfgL) != 1 { + return fmt.Errorf("Expected single `config`, found %d", len(cfgL)) + } + + cfg = cfgL[0] + } + delete(m, "config") + + var result api.Volume + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + WeaklyTypedInput: true, + Result: &result, + }) + if err != nil { + return err + } + if err := dec.Decode(m); err != nil { + return err + } + + result.Name = n + result.Config = cfg + + volumes[n] = &result + } + + *out = volumes + return nil +} + +func parseVolumeMounts(out *[]*api.VolumeMount, list *ast.ObjectList) error { + mounts := make([]*api.VolumeMount, len(list.Items)) + + for i, item := range list.Items { + valid := []string{ + "volume", + "read_only", + "destination", + } + if err := helper.CheckHCLKeys(item.Val, valid); err != nil { + return err + } + + var m map[string]interface{} + if err := hcl.DecodeObject(&m, item.Val); err != nil { + return err + } + + var result api.VolumeMount + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + WeaklyTypedInput: true, + Result: &result, + }) + if err != nil { + return err + } + if err := dec.Decode(m); err != nil { + return err + } + + mounts[i] = &result + } + + *out = mounts + return nil +} + func parseRestartPolicy(final **api.RestartPolicy, list *ast.ObjectList) error { list = list.Elem() if len(list.Items) > 1 { @@ -892,6 +995,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*api.Task, list "user", "vault", "kill_signal", + "volume_mount", } if err := helper.CheckHCLKeys(listVal, valid); err != nil { return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n)) @@ -913,6 +1017,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*api.Task, list delete(m, "service") delete(m, "template") delete(m, 
"vault") + delete(m, "volume_mount") // Build the task var t api.Task @@ -995,6 +1100,14 @@ func parseTasks(jobName string, taskGroupName string, result *[]*api.Task, list } } + // Parse volume mounts + if o := listVal.Filter("volume_mount"); len(o.Items) > 0 { + if err := parseVolumeMounts(&t.VolumeMounts, o); err != nil { + return multierror.Prefix(err, fmt.Sprintf( + "'%s', volume_mount ->", n)) + } + } + // If we have resources, then parse that if o := listVal.Filter("resources"); len(o.Items) > 0 { var r api.Resources diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index a8c36f6a350..45cc6da5403 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -86,6 +86,7 @@ func TestParse(t *testing.T) { TaskGroups: []*api.TaskGroup{ { Name: helper.StringToPtr("outside"), + Tasks: []*api.Task{ { Name: "outside", @@ -110,6 +111,13 @@ func TestParse(t *testing.T) { Operand: "=", }, }, + + Volumes: map[string]*api.Volume{ + "foo": { + Name: "foo", + Type: "host", + }, + }, Affinities: []*api.Affinity{ { LTarget: "${node.datacenter}", @@ -186,6 +194,12 @@ func TestParse(t *testing.T) { }, }, }, + VolumeMounts: []*api.VolumeMount{ + { + Volume: "foo", + Destination: "/mnt/foo", + }, + }, Affinities: []*api.Affinity{ { LTarget: "${meta.foo}", diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl index 8c0f0aacacc..cd9bc8a372a 100644 --- a/jobspec/test-fixtures/basic.hcl +++ b/jobspec/test-fixtures/basic.hcl @@ -63,6 +63,10 @@ job "binstore-storagelocker" { group "binsl" { count = 5 + volume "foo" { + type = "host" + } + restart { attempts = 5 interval = "10m" @@ -142,6 +146,11 @@ job "binstore-storagelocker" { } } + volume_mount { + volume = "foo" + destination = "/mnt/foo" + } + logs { max_files = 14 max_file_size = 101 diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 59851fcf97c..b5be61de684 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1578,6 +1578,9 @@ type Node struct { // Drivers is a map of driver names to current driver information Drivers map[string]*DriverInfo + // HostVolumes is a map of host volume names to their configuration + HostVolumes map[string]*ClientHostVolumeConfig + // Raft Indexes CreateIndex uint64 ModifyIndex uint64 @@ -1622,6 +1625,7 @@ func (n *Node) Copy() *Node { nn.Events = copyNodeEvents(n.Events) nn.DrainStrategy = nn.DrainStrategy.Copy() nn.Drivers = copyNodeDrivers(n.Drivers) + nn.HostVolumes = copyNodeHostVolumes(n.HostVolumes) return nn } @@ -1653,6 +1657,21 @@ func copyNodeDrivers(drivers map[string]*DriverInfo) map[string]*DriverInfo { return c } +// copyNodeHostVolumes is a helper to copy a map of string to Volume +func copyNodeHostVolumes(volumes map[string]*ClientHostVolumeConfig) map[string]*ClientHostVolumeConfig { + l := len(volumes) + if l == 0 { + return nil + } + + c := make(map[string]*ClientHostVolumeConfig, l) + for volume, v := range volumes { + c[volume] = v.Copy() + } + + return c +} + // TerminalStatus returns if the current status is terminal and // will no longer transition. func (n *Node) TerminalStatus() bool { @@ -4659,6 +4678,9 @@ type TaskGroup struct { // Services this group provides Services []*Service + + // Volumes is a map of volumes that have been requested by the task group. 
+ Volumes map[string]*VolumeRequest } func (tg *TaskGroup) Copy() *TaskGroup { @@ -4673,6 +4695,7 @@ func (tg *TaskGroup) Copy() *TaskGroup { ntg.ReschedulePolicy = ntg.ReschedulePolicy.Copy() ntg.Affinities = CopySliceAffinities(ntg.Affinities) ntg.Spreads = CopySliceSpreads(ntg.Spreads) + ntg.Volumes = CopyMapVolumeRequest(ntg.Volumes) // Copy the network objects if tg.Networks != nil { @@ -4860,6 +4883,25 @@ func (tg *TaskGroup) Validate(j *Job) error { mErr.Errors = append(mErr.Errors, fmt.Errorf("Only one task may be marked as leader")) } + // Validate the Host Volumes + for name, decl := range tg.Volumes { + if decl.Type != VolumeTypeHost { + // TODO: Remove this error when adding new volume types + mErr.Errors = append(mErr.Errors, fmt.Errorf("Volume %s has unrecognised type %s", name, decl.Type)) + continue + } + + cfg, err := ParseHostVolumeConfig(decl.Config) + if err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Volume %s has unparseable config: %v", name, err)) + continue + } + + if cfg.Source == "" { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Volume %s has an empty source", name)) + } + } + // Validate task group and task network resources if err := tg.validateNetworks(); err != nil { outer := fmt.Errorf("Task group network validation failed: %v", err) @@ -4868,6 +4910,19 @@ func (tg *TaskGroup) Validate(j *Job) error { // Validate the tasks for _, task := range tg.Tasks { + // Validate the task does not reference undefined volume mounts + for i, mnt := range task.VolumeMounts { + if mnt.Volume == "" { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Task %s has a volume mount (%d) referencing an empty volume", task.Name, i)) + continue + } + + if _, ok := tg.Volumes[mnt.Volume]; !ok { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Task %s has a volume mount (%d) referencing undefined volume %s", task.Name, i, mnt.Volume)) + continue + } + } + if err := task.Validate(tg.EphemeralDisk, j.Type); err != nil { outer := fmt.Errorf("Task %s validation failed: %v", task.Name, err) mErr.Errors = append(mErr.Errors, outer) @@ -5616,6 +5671,10 @@ type Task struct { // task from Consul and sending it a signal to shutdown. See #2441 ShutdownDelay time.Duration + // VolumeMounts is a list of Volume name <-> mount configurations that will be + // attached to this task. + VolumeMounts []*VolumeMount + // The kill signal to use for the task. This is an optional specification, // KillSignal is the kill signal to use for the task. 
This is an optional @@ -5641,6 +5700,7 @@ func (t *Task) Copy() *Task { nt.Constraints = CopySliceConstraints(nt.Constraints) nt.Affinities = CopySliceAffinities(nt.Affinities) + nt.VolumeMounts = CopySliceVolumeMount(nt.VolumeMounts) nt.Vault = nt.Vault.Copy() nt.Resources = nt.Resources.Copy() diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 6f1fc03f634..2e02d76cd47 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -868,6 +868,75 @@ func TestTaskGroup_Validate(t *testing.T) { err = tg.Validate(j) require.Contains(t, err.Error(), "Port label http already in use") require.Contains(t, err.Error(), "Port mapped to 80 already in use") + + tg = &TaskGroup{ + Volumes: map[string]*VolumeRequest{ + "foo": { + Type: "nothost", + Config: map[string]interface{}{ + "sOuRcE": "foo", + }, + }, + }, + Tasks: []*Task{ + { + Name: "task-a", + Resources: &Resources{}, + }, + }, + } + err = tg.Validate(&Job{}) + require.Contains(t, err.Error(), `Volume foo has unrecognised type nothost`) + + tg = &TaskGroup{ + Volumes: map[string]*VolumeRequest{ + "foo": { + Type: "host", + }, + }, + Tasks: []*Task{ + { + Name: "task-a", + Resources: &Resources{}, + }, + }, + } + err = tg.Validate(&Job{}) + require.Contains(t, err.Error(), `Volume foo has an empty source`) + + tg = &TaskGroup{ + Volumes: map[string]*VolumeRequest{ + "foo": { + Type: "host", + }, + }, + Tasks: []*Task{ + { + Name: "task-a", + Resources: &Resources{}, + VolumeMounts: []*VolumeMount{ + { + Volume: "", + }, + }, + }, + { + Name: "task-b", + Resources: &Resources{}, + VolumeMounts: []*VolumeMount{ + { + Volume: "foob", + }, + }, + }, + }, + } + err = tg.Validate(&Job{}) + expected = `Task task-a has a volume mount (0) referencing an empty volume` + require.Contains(t, err.Error(), expected) + + expected = `Task task-b has a volume mount (0) referencing undefined volume foob` + require.Contains(t, err.Error(), expected) } func TestTask_Validate(t *testing.T) { diff --git a/nomad/structs/volumes.go b/nomad/structs/volumes.go new file mode 100644 index 00000000000..519c4e42e97 --- /dev/null +++ b/nomad/structs/volumes.go @@ -0,0 +1,169 @@ +package structs + +import ( + "github.com/mitchellh/copystructure" + "github.com/mitchellh/mapstructure" +) + +const ( + VolumeTypeHost = "host" +) + +// ClientHostVolumeConfig is used to configure access to host paths on a Nomad Client +type ClientHostVolumeConfig struct { + Name string `hcl:",key"` + Path string `hcl:"path"` + ReadOnly bool `hcl:"read_only"` + Hidden bool `hcl:"hidden"` +} + +func (p *ClientHostVolumeConfig) Copy() *ClientHostVolumeConfig { + if p == nil { + return nil + } + + c := new(ClientHostVolumeConfig) + *c = *p + return c +} + +func CopyMapStringClientHostVolumeConfig(m map[string]*ClientHostVolumeConfig) map[string]*ClientHostVolumeConfig { + if m == nil { + return nil + } + + nm := make(map[string]*ClientHostVolumeConfig, len(m)) + for k, v := range m { + nm[k] = v.Copy() + } + + return nm +} + +func CopySliceClientHostVolumeConfig(s []*ClientHostVolumeConfig) []*ClientHostVolumeConfig { + l := len(s) + if l == 0 { + return nil + } + + ns := make([]*ClientHostVolumeConfig, l) + for idx, cfg := range s { + ns[idx] = cfg.Copy() + } + + return ns +} + +func HostVolumeSliceMerge(a, b []*ClientHostVolumeConfig) []*ClientHostVolumeConfig { + n := make([]*ClientHostVolumeConfig, len(a)) + seenKeys := make(map[string]int, len(a)) + + for i, config := range a { + n[i] = config.Copy() + seenKeys[config.Name] = i + } 
+ + for _, config := range b { + if fIndex, ok := seenKeys[config.Name]; ok { + n[fIndex] = config.Copy() + continue + } + + n = append(n, config.Copy()) + } + + return n +} + +// HostVolumeConfig is the struct that is expected inside the `config` section +// of a `host` type volume. +type HostVolumeConfig struct { + // Source is the name of the desired HostVolume. + Source string +} + +func (h *HostVolumeConfig) Copy() *HostVolumeConfig { + if h == nil { + return nil + } + nh := new(HostVolumeConfig) + *nh = *h + return nh +} + +// VolumeRequest is a representation of a storage volume that a TaskGroup wishes to use. +type VolumeRequest struct { + Name string + Type string + ReadOnly bool + Hidden bool + + Config map[string]interface{} +} + +func (v *VolumeRequest) Copy() *VolumeRequest { + if v == nil { + return nil + } + nv := new(VolumeRequest) + *nv = *v + + if i, err := copystructure.Copy(nv.Config); err != nil { + panic(err.Error()) + } else { + nv.Config = i.(map[string]interface{}) + } + + return nv +} + +func CopyMapVolumeRequest(s map[string]*VolumeRequest) map[string]*VolumeRequest { + if s == nil { + return nil + } + + l := len(s) + c := make(map[string]*VolumeRequest, l) + for k, v := range s { + c[k] = v.Copy() + } + return c +} + +// VolumeMount represents the relationship between a destination path in a task +// and the task group volume that should be mounted there. +type VolumeMount struct { + Volume string + Destination string + ReadOnly bool +} + +func (v *VolumeMount) Copy() *VolumeMount { + if v == nil { + return nil + } + + nv := new(VolumeMount) + *nv = *v + return nv +} + +func CopySliceVolumeMount(s []*VolumeMount) []*VolumeMount { + l := len(s) + if l == 0 { + return nil + } + + c := make([]*VolumeMount, l) + for i, v := range s { + c[i] = v.Copy() + } + return c +} + +func ParseHostVolumeConfig(m map[string]interface{}) (*HostVolumeConfig, error) { + var c HostVolumeConfig + err := mapstructure.Decode(m, &c) + + return &c, err +} diff --git a/plugins/drivers/driver.go b/plugins/drivers/driver.go index f2e77005063..a1b8f2f3ee9 100644 --- a/plugins/drivers/driver.go +++ b/plugins/drivers/driver.go @@ -362,6 +362,12 @@ type MountConfig struct { Readonly bool } +func (m *MountConfig) IsEqual(o *MountConfig) bool { + return m.TaskPath == o.TaskPath && + m.HostPath == o.HostPath && + m.Readonly == o.Readonly +} + func (m *MountConfig) Copy() *MountConfig { if m == nil { return nil diff --git a/scheduler/feasible.go b/scheduler/feasible.go index d8804732846..15f444acea6 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -96,6 +96,77 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator { return NewStaticIterator(ctx, nodes) } +// HostVolumeChecker is a FeasibilityChecker which returns whether a node has +// the host volumes necessary to schedule a task group. +type HostVolumeChecker struct { + ctx Context + + // volumes is a map[HostVolumeName][]RequestedVolume. The requested volumes are + // a slice because a single task group may request the same volume multiple times. + volumes map[string][]*structs.VolumeRequest +} + +// NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes +func NewHostVolumeChecker(ctx Context) *HostVolumeChecker { + return &HostVolumeChecker{ + ctx: ctx, + } +} + +// SetVolumes takes the volumes required by a task group and updates the checker. 
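+// Only volumes of type "host" are considered; requests of other types, or
+// host requests whose config cannot be parsed, are skipped here since job
+// validation is expected to have rejected them before scheduling.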
+func (h *HostVolumeChecker) SetVolumes(volumes map[string]*structs.VolumeRequest) { + nm := make(map[string][]*structs.VolumeRequest) + + // Convert the map from map[DesiredName]Request to map[Source][]Request to improve + // lookup performance. Also filter non-host volumes. + for _, req := range volumes { + if req.Type != structs.VolumeTypeHost { + continue + } + + cfg, err := structs.ParseHostVolumeConfig(req.Config) + if err != nil { + // Could not parse host volume config, skip the volume for now. + continue + } + + nm[cfg.Source] = append(nm[cfg.Source], req) + } + h.volumes = nm +} + +func (h *HostVolumeChecker) Feasible(candidate *structs.Node) bool { + if h.hasVolumes(candidate) { + return true + } + + h.ctx.Metrics().FilterNode(candidate, "missing compatible host volumes") + return false +} + +func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool { + rLen := len(h.volumes) + hLen := len(n.HostVolumes) + + // Fast path: Requested no volumes. No need to check further. + if rLen == 0 { + return true + } + + // Fast path: Requesting more volumes than the node has, can't meet the criteria. + if rLen > hLen { + return false + } + + for source := range h.volumes { + if _, ok := n.HostVolumes[source]; !ok { + return false + } + } + + return true +} + // DriverChecker is a FeasibilityChecker which returns whether a node has the // drivers necessary to scheduler a task group. type DriverChecker struct { diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index 1481b318b59..0c74aaf7290 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -81,6 +81,88 @@ func TestRandomIterator(t *testing.T) { } } +func TestHostVolumeChecker(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + } + nodes[1].HostVolumes = map[string]*structs.ClientHostVolumeConfig{"foo": {Name: "foo"}} + nodes[2].HostVolumes = map[string]*structs.ClientHostVolumeConfig{ + "foo": {}, + "bar": {}, + } + nodes[3].HostVolumes = map[string]*structs.ClientHostVolumeConfig{ + "foo": {}, + "bar": {}, + } + nodes[4].HostVolumes = map[string]*structs.ClientHostVolumeConfig{ + "foo": {}, + "baz": {}, + } + + noVolumes := map[string]*structs.VolumeRequest{} + + volumes := map[string]*structs.VolumeRequest{ + "foo": { + Type: "host", + Config: map[string]interface{}{"source": "foo"}, + }, + "bar": { + Type: "host", + Config: map[string]interface{}{"source": "bar"}, + }, + "baz": { + Type: "nothost", + Config: map[string]interface{}{"source": "baz"}, + }, + } + + checker := NewHostVolumeChecker(ctx) + cases := []struct { + Node *structs.Node + RequestedVolumes map[string]*structs.VolumeRequest + Result bool + }{ + { // Nil Volumes, some requested + Node: nodes[0], + RequestedVolumes: volumes, + Result: false, + }, + { // Mismatched set of volumes + Node: nodes[1], + RequestedVolumes: volumes, + Result: false, + }, + { // Happy Path + Node: nodes[2], + RequestedVolumes: volumes, + Result: true, + }, + { // No Volumes requested or available + Node: nodes[3], + RequestedVolumes: noVolumes, + Result: true, + }, + { // No Volumes requested, some available + Node: nodes[4], + RequestedVolumes: noVolumes, + Result: true, + }, + } + + for i, c := range cases { + checker.SetVolumes(c.RequestedVolumes) + if act := checker.Feasible(c.Node); act != c.Result { + t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result) + } + } +} + func TestDriverChecker(t *testing.T) { _, ctx := testContext(t) 
nodes := []*structs.Node{ diff --git a/scheduler/stack.go b/scheduler/stack.go index 037caf61bc5..3e2b1b0b217 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -44,12 +44,13 @@ type GenericStack struct { ctx Context source *StaticIterator - wrappedChecks *FeasibilityWrapper - quota FeasibleIterator - jobConstraint *ConstraintChecker - taskGroupDrivers *DriverChecker - taskGroupConstraint *ConstraintChecker - taskGroupDevices *DeviceChecker + wrappedChecks *FeasibilityWrapper + quota FeasibleIterator + jobConstraint *ConstraintChecker + taskGroupDrivers *DriverChecker + taskGroupConstraint *ConstraintChecker + taskGroupDevices *DeviceChecker + taskGroupHostVolumes *HostVolumeChecker distinctHostsConstraint *DistinctHostsIterator distinctPropertyConstraint *DistinctPropertyIterator @@ -129,6 +130,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) s.taskGroupDevices.SetTaskGroup(tg) + s.taskGroupHostVolumes.SetVolumes(tg.Volumes) s.distinctHostsConstraint.SetTaskGroup(tg) s.distinctPropertyConstraint.SetTaskGroup(tg) s.wrappedChecks.SetTaskGroup(tg.Name) @@ -165,12 +167,13 @@ type SystemStack struct { ctx Context source *StaticIterator - wrappedChecks *FeasibilityWrapper - quota FeasibleIterator - jobConstraint *ConstraintChecker - taskGroupDrivers *DriverChecker - taskGroupConstraint *ConstraintChecker - taskGroupDevices *DeviceChecker + wrappedChecks *FeasibilityWrapper + quota FeasibleIterator + jobConstraint *ConstraintChecker + taskGroupDrivers *DriverChecker + taskGroupConstraint *ConstraintChecker + taskGroupDevices *DeviceChecker + taskGroupHostVolumes *HostVolumeChecker distinctPropertyConstraint *DistinctPropertyIterator binPack *BinPackIterator @@ -199,6 +202,9 @@ func NewSystemStack(ctx Context) *SystemStack { // Filter on task group constraints second s.taskGroupConstraint = NewConstraintChecker(ctx, nil) + // Filter on task group host volumes + s.taskGroupHostVolumes = NewHostVolumeChecker(ctx) + // Filter on task group devices s.taskGroupDevices = NewDeviceChecker(ctx) @@ -207,7 +213,7 @@ func NewSystemStack(ctx Context) *SystemStack { // previously been marked as eligible or ineligible. Generally this will be // checks that only needs to examine the single node to determine feasibility. jobs := []FeasibilityChecker{s.jobConstraint} - tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupDevices} + tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupHostVolumes, s.taskGroupDevices} s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs) // Filter on distinct property constraints. 
@@ -260,6 +266,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) s.taskGroupDevices.SetTaskGroup(tg) + s.taskGroupHostVolumes.SetVolumes(tg.Volumes) s.wrappedChecks.SetTaskGroup(tg.Name) s.distinctPropertyConstraint.SetTaskGroup(tg) s.binPack.SetTaskGroup(tg) diff --git a/scheduler/stack_oss.go b/scheduler/stack_oss.go index 3ef90286ff5..fcbc533a6a4 100644 --- a/scheduler/stack_oss.go +++ b/scheduler/stack_oss.go @@ -31,12 +31,15 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack { // Filter on task group devices s.taskGroupDevices = NewDeviceChecker(ctx) + // Filter on task group host volumes + s.taskGroupHostVolumes = NewHostVolumeChecker(ctx) + // Create the feasibility wrapper which wraps all feasibility checks in // which feasibility checking can be skipped if the computed node class has // previously been marked as eligible or ineligible. Generally this will be // checks that only needs to examine the single node to determine feasibility. jobs := []FeasibilityChecker{s.jobConstraint} - tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupDevices} + tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupHostVolumes, s.taskGroupDevices} s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs) // Filter on distinct host constraints.
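Taken together, these changes are exercised end to end roughly as follows. The snippets below are an illustrative sketch assembled from the test fixtures in this diff; the volume name "shared_data", the paths, and the job/group/task names are assumptions, not part of the change. An operator first exposes a host path in the client agent configuration:

client {
  enabled = true

  host_volume "shared_data" {
    path      = "/opt/shared-data"
    read_only = true
  }
}

A job then requests the volume at the group level and mounts it per task:

job "example" {
  group "cache" {
    volume "foo" {
      type = "host"

      config {
        # source must name a host_volume registered on the client
        source = "shared_data"
      }
    }

    task "server" {
      driver = "docker"

      volume_mount {
        volume      = "foo"
        destination = "/mnt/foo"
        read_only   = true
      }
    }
  }
}

With this in place, the scheduler's HostVolumeChecker filters out nodes whose HostVolumes map lacks the requested source, and the task runner's volume hook turns each volume_mount into a drivers.MountConfig, marking it read-only if the client volume, the volume request, or the mount sets read_only.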