From ad72d166ee6143e6283c10d4ffa3dbc76bedc8f9 Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Thu, 25 Jul 2019 16:42:11 +0200 Subject: [PATCH 01/17] jobspec: Add Volume and VolumeMount declarations --- jobspec/parse_group.go | 67 +++++++++++++++++++++++++++++++++ jobspec/parse_task.go | 47 +++++++++++++++++++++++ jobspec/parse_test.go | 14 +++++++ jobspec/test-fixtures/basic.hcl | 9 +++++ 4 files changed, 137 insertions(+) diff --git a/jobspec/parse_group.go b/jobspec/parse_group.go index 222fca62bc5..d7967780984 100644 --- a/jobspec/parse_group.go +++ b/jobspec/parse_group.go @@ -53,6 +53,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error { "spread", "network", "service", + "volume", } if err := helper.CheckHCLKeys(listVal, valid); err != nil { return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n)) @@ -74,6 +75,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error { delete(m, "spread") delete(m, "network") delete(m, "service") + delete(m, "volume") // Build the group with the basic decode var g api.TaskGroup @@ -161,6 +163,13 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error { } } + // Parse any volume declarations + if o := listVal.Filter("volume"); len(o.Items) > 0 { + if err := parseVolumes(&g.Volumes, o); err != nil { + return multierror.Prefix(err, "volume ->") + } + } + // Parse tasks if o := listVal.Filter("task"); len(o.Items) > 0 { if err := parseTasks(&g.Tasks, o); err != nil { @@ -274,3 +283,61 @@ func parseRestartPolicy(final **api.RestartPolicy, list *ast.ObjectList) error { *final = &result return nil } + +func parseVolumes(out *map[string]*api.Volume, list *ast.ObjectList) error { + volumes := make(map[string]*api.Volume, len(list.Items)) + + for _, item := range list.Items { + n := item.Keys[0].Token.Value().(string) + valid := []string{ + "type", + "read_only", + "hidden", + "config", + } + if err := helper.CheckHCLKeys(item.Val, valid); err != nil { + return err + } + + var m map[string]interface{} + if err := hcl.DecodeObject(&m, item.Val); err != nil { + return err + } + + // TODO(dani): FIXME: this is gross but we don't have ObjectList.Filter here + var cfg map[string]interface{} + if cfgI, ok := m["config"]; ok { + cfgL, ok := cfgI.([]map[string]interface{}) + if !ok { + return fmt.Errorf("Incorrect `config` type, expected map") + } + + if len(cfgL) != 1 { + return fmt.Errorf("Expected single `config`, found %d", len(cfgL)) + } + + cfg = cfgL[0] + } + delete(m, "config") + + var result api.Volume + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + WeaklyTypedInput: true, + Result: &result, + }) + if err != nil { + return err + } + if err := dec.Decode(m); err != nil { + return err + } + + result.Name = n + result.Config = cfg + + volumes[n] = &result + } + + *out = volumes + return nil +} diff --git a/jobspec/parse_task.go b/jobspec/parse_task.go index 425f9536505..52def23be2e 100644 --- a/jobspec/parse_task.go +++ b/jobspec/parse_task.go @@ -72,6 +72,7 @@ func parseTask(item *ast.ObjectItem) (*api.Task, error) { "vault", "kill_signal", "kind", + "volume_mount", } if err := helper.CheckHCLKeys(listVal, valid); err != nil { return nil, err @@ -93,6 +94,7 @@ func parseTask(item *ast.ObjectItem) (*api.Task, error) { delete(m, "service") delete(m, "template") delete(m, "vault") + delete(m, "volume_mount") // Build the task var t api.Task @@ -173,6 +175,14 @@ func parseTask(item *ast.ObjectItem) (*api.Task, error) { } } + // Parse volume mounts + if o := listVal.Filter("volume_mount"); len(o.Items) > 
0 { + if err := parseVolumeMounts(&t.VolumeMounts, o); err != nil { + return multierror.Prefix(err, fmt.Sprintf( + "'%s', volume_mount ->", n)) + } + } + // If we have resources, then parse that if o := listVal.Filter("resources"); len(o.Items) > 0 { var r api.Resources @@ -504,3 +514,40 @@ func parseResources(result *api.Resources, list *ast.ObjectList) error { return nil } + +func parseVolumeMounts(out *[]*api.VolumeMount, list *ast.ObjectList) error { + mounts := make([]*api.VolumeMount, len(list.Items)) + + for i, item := range list.Items { + valid := []string{ + "volume", + "read_only", + "destination", + } + if err := helper.CheckHCLKeys(item.Val, valid); err != nil { + return err + } + + var m map[string]interface{} + if err := hcl.DecodeObject(&m, item.Val); err != nil { + return err + } + + var result api.VolumeMount + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + WeaklyTypedInput: true, + Result: &result, + }) + if err != nil { + return err + } + if err := dec.Decode(m); err != nil { + return err + } + + mounts[i] = &result + } + + *out = mounts + return nil +} diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 5e7e898c288..a1107e030cf 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -86,6 +86,7 @@ func TestParse(t *testing.T) { TaskGroups: []*api.TaskGroup{ { Name: helper.StringToPtr("outside"), + Tasks: []*api.Task{ { Name: "outside", @@ -110,6 +111,13 @@ func TestParse(t *testing.T) { Operand: "=", }, }, + + Volumes: map[string]*api.Volume{ + "foo": { + Name: "foo", + Type: "host", + }, + }, Affinities: []*api.Affinity{ { LTarget: "${node.datacenter}", @@ -187,6 +195,12 @@ func TestParse(t *testing.T) { }, }, }, + VolumeMounts: []*api.VolumeMount{ + { + Volume: "foo", + Destination: "/mnt/foo", + }, + }, Affinities: []*api.Affinity{ { LTarget: "${meta.foo}", diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl index 535895a6802..2d32878ef8d 100644 --- a/jobspec/test-fixtures/basic.hcl +++ b/jobspec/test-fixtures/basic.hcl @@ -63,6 +63,10 @@ job "binstore-storagelocker" { group "binsl" { count = 5 + volume "foo" { + type = "host" + } + restart { attempts = 5 interval = "10m" @@ -143,6 +147,11 @@ job "binstore-storagelocker" { } } + volume_mount { + volume = "foo" + destination = "/mnt/foo" + } + logs { max_files = 14 max_file_size = 101 From 8514893d1dca53442b37eca58f6ce96f0c43470d Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Thu, 25 Jul 2019 16:44:08 +0200 Subject: [PATCH 02/17] structs: Add declarations of basic structs for volume support --- api/tasks.go | 17 +++ nomad/structs/structs.go | 60 +++++++++++ nomad/structs/structs_test.go | 77 ++++++++++++++ nomad/structs/volumes.go | 188 ++++++++++++++++++++++++++++++++++ 4 files changed, 342 insertions(+) create mode 100644 nomad/structs/volumes.go diff --git a/api/tasks.go b/api/tasks.go index f12f761e7d9..e044f9a9fbe 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -362,6 +362,21 @@ func (m *MigrateStrategy) Copy() *MigrateStrategy { return nm } +type Volume struct { + Name string + Type string + ReadOnly bool `mapstructure:"read_only"` + Hidden bool + + Config map[string]interface{} +} + +type VolumeMount struct { + Volume string + Destination string + ReadOnly bool `mapstructure:"read_only"` +} + // TaskGroup is the unit of scheduling. 
type TaskGroup struct { Name *string @@ -370,6 +385,7 @@ type TaskGroup struct { Affinities []*Affinity Tasks []*Task Spreads []*Spread + Volumes map[string]*Volume RestartPolicy *RestartPolicy ReschedulePolicy *ReschedulePolicy EphemeralDisk *EphemeralDisk @@ -580,6 +596,7 @@ type Task struct { Vault *Vault Templates []*Template DispatchPayload *DispatchPayloadConfig + VolumeMounts []*VolumeMount Leader bool ShutdownDelay time.Duration `mapstructure:"shutdown_delay"` KillSignal string `mapstructure:"kill_signal"` diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index c663703aa53..5deda22615d 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1575,6 +1575,9 @@ type Node struct { // Drivers is a map of driver names to current driver information Drivers map[string]*DriverInfo + // HostVolumes is a map of host volume names to their configuration + HostVolumes map[string]*ClientHostVolumeConfig + // Raft Indexes CreateIndex uint64 ModifyIndex uint64 @@ -1619,6 +1622,7 @@ func (n *Node) Copy() *Node { nn.Events = copyNodeEvents(n.Events) nn.DrainStrategy = nn.DrainStrategy.Copy() nn.Drivers = copyNodeDrivers(n.Drivers) + nn.HostVolumes = copyNodeHostVolumes(n.HostVolumes) return nn } @@ -1650,6 +1654,21 @@ func copyNodeDrivers(drivers map[string]*DriverInfo) map[string]*DriverInfo { return c } +// copyNodeHostVolumes is a helper to copy a map of string to Volume +func copyNodeHostVolumes(volumes map[string]*ClientHostVolumeConfig) map[string]*ClientHostVolumeConfig { + l := len(volumes) + if l == 0 { + return nil + } + + c := make(map[string]*ClientHostVolumeConfig, l) + for volume, v := range volumes { + c[volume] = v.Copy() + } + + return c +} + // TerminalStatus returns if the current status is terminal and // will no longer transition. func (n *Node) TerminalStatus() bool { @@ -4659,6 +4678,9 @@ type TaskGroup struct { // Services this group provides Services []*Service + + // Volumes is a map of volumes that have been requested by the task group. 
+ Volumes map[string]*VolumeRequest } func (tg *TaskGroup) Copy() *TaskGroup { @@ -4673,6 +4695,7 @@ func (tg *TaskGroup) Copy() *TaskGroup { ntg.ReschedulePolicy = ntg.ReschedulePolicy.Copy() ntg.Affinities = CopySliceAffinities(ntg.Affinities) ntg.Spreads = CopySliceSpreads(ntg.Spreads) + ntg.Volumes = CopyMapVolumeRequest(ntg.Volumes) // Copy the network objects if tg.Networks != nil { @@ -4860,6 +4883,25 @@ func (tg *TaskGroup) Validate(j *Job) error { mErr.Errors = append(mErr.Errors, fmt.Errorf("Only one task may be marked as leader")) } + // Validate the Host Volumes + for name, decl := range tg.Volumes { + if decl.Volume.Type != VolumeTypeHost { + // TODO: Remove this error when adding new volume types + mErr.Errors = append(mErr.Errors, fmt.Errorf("Volume %s has unrecognised type %s", name, decl.Volume.Type)) + continue + } + + cfg, err := ParseHostVolumeConfig(decl.Config) + if err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Volume %s has unparseable config: %v", name, err)) + continue + } + + if cfg.Source == "" { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Volume %s has an empty source", name)) + } + } + // Validate task group and task network resources if err := tg.validateNetworks(); err != nil { outer := fmt.Errorf("Task group network validation failed: %v", err) @@ -4868,6 +4910,19 @@ func (tg *TaskGroup) Validate(j *Job) error { // Validate the tasks for _, task := range tg.Tasks { + // Validate the task does not reference undefined volume mounts + for i, mnt := range task.VolumeMounts { + if mnt.Volume == "" { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Task %s has a volume mount (%d) referencing an empty volume", task.Name, i)) + continue + } + + if _, ok := tg.Volumes[mnt.Volume]; !ok { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Task %s has a volume mount (%d) referencing undefined volume %s", task.Name, i, mnt.Volume)) + continue + } + } + if err := task.Validate(tg.EphemeralDisk, j.Type); err != nil { outer := fmt.Errorf("Task %s validation failed: %v", task.Name, err) mErr.Errors = append(mErr.Errors, outer) @@ -5137,6 +5192,10 @@ type Task struct { // task from Consul and sending it a signal to shutdown. See #2441 ShutdownDelay time.Duration + // VolumeMounts is a list of Volume name <-> mount configurations that will be + // attached to this task. + VolumeMounts []*VolumeMount + // The kill signal to use for the task. This is an optional specification, // KillSignal is the kill signal to use for the task. 
This is an optional @@ -5166,6 +5225,7 @@ func (t *Task) Copy() *Task { nt.Constraints = CopySliceConstraints(nt.Constraints) nt.Affinities = CopySliceAffinities(nt.Affinities) + nt.VolumeMounts = CopySliceVolumeMount(nt.VolumeMounts) nt.Vault = nt.Vault.Copy() nt.Resources = nt.Resources.Copy() diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 5456d9db54d..5ec339bdf5e 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -868,6 +868,83 @@ func TestTaskGroup_Validate(t *testing.T) { err = tg.Validate(j) require.Contains(t, err.Error(), "Port label http already in use") require.Contains(t, err.Error(), "Port mapped to 80 already in use") + + tg = &TaskGroup{ + Volumes: map[string]*VolumeRequest{ + "foo": { + Volume: &Volume{ + Type: "nothost", + }, + Config: map[string]interface{}{ + "sOuRcE": "foo", + }, + }, + }, + Tasks: []*Task{ + { + Name: "task-a", + Resources: &Resources{}, + }, + }, + } + err = tg.Validate(&Job{}) + require.Contains(t, err.Error(), `Volume foo has unrecognised type nothost`) + + tg = &TaskGroup{ + Volumes: map[string]*VolumeRequest{ + "foo": { + Volume: &Volume{ + Type: "host", + }, + Config: nil, + }, + }, + Tasks: []*Task{ + { + Name: "task-a", + Resources: &Resources{}, + }, + }, + } + err = tg.Validate(&Job{}) + require.Contains(t, err.Error(), `Volume foo has an empty source`) + + tg = &TaskGroup{ + Volumes: map[string]*VolumeRequest{ + "foo": { + Volume: &Volume{ + Type: "host", + }, + Config: nil, + }, + }, + Tasks: []*Task{ + { + Name: "task-a", + Resources: &Resources{}, + VolumeMounts: []*VolumeMount{ + { + Volume: "", + }, + }, + }, + { + Name: "task-b", + Resources: &Resources{}, + VolumeMounts: []*VolumeMount{ + { + Volume: "foob", + }, + }, + }, + }, + } + err = tg.Validate(&Job{}) + expected = `Task task-a has a volume mount (0) referencing an empty volume` + require.Contains(t, err.Error(), expected) + + expected = `Task task-b has a volume mount (0) referencing undefined volume foob` + require.Contains(t, err.Error(), expected) } func TestTask_Validate(t *testing.T) { diff --git a/nomad/structs/volumes.go b/nomad/structs/volumes.go new file mode 100644 index 00000000000..80ea52f964f --- /dev/null +++ b/nomad/structs/volumes.go @@ -0,0 +1,188 @@ +package structs + +import ( + "github.com/mitchellh/copystructure" + "github.com/mitchellh/mapstructure" +) + +const ( + VolumeTypeHost = "host" +) + +// ClientHostVolumeConfig is used to configure access to host paths on a Nomad Client +type ClientHostVolumeConfig struct { + Name string `hcl:",key"` + Source string `hcl:"source"` + ReadOnly bool `hcl:"read_only"` + Hidden bool `hcl:"hidden"` +} + +func (p *ClientHostVolumeConfig) Copy() *ClientHostVolumeConfig { + if p == nil { + return nil + } + + c := new(ClientHostVolumeConfig) + *c = *p + return c +} + +func CopyMapStringClientHostVolumeConfig(m map[string]*ClientHostVolumeConfig) map[string]*ClientHostVolumeConfig { + if m == nil { + return nil + } + + nm := make(map[string]*ClientHostVolumeConfig, len(m)) + for k, v := range m { + nm[k] = v.Copy() + } + + return nm +} + +func HostVolumeSliceMerge(a, b []*ClientHostVolumeConfig) []*ClientHostVolumeConfig { + n := make([]*ClientHostVolumeConfig, len(a)) + seenKeys := make(map[string]struct{}, len(a)) + + for k, v := range a { + if _, ok := seenKeys[v.Name]; ok { + continue + } + n[k] = v.Copy() + seenKeys[v.Name] = struct{}{} + } + for k, v := range b { + if _, ok := seenKeys[v.Name]; ok { + continue + } + n[k] = v.Copy() + 
seenKeys[v.Name] = struct{}{} + } + + return n +} + +// HostVolumeConfig is the struct that is expected inside the `config` section +// of a `host` type volume. +type HostVolumeConfig struct { + // Source is the name of the desired HostVolume. + Source string +} + +func (h *HostVolumeConfig) Copy() *HostVolumeConfig { + if h == nil { + return nil + } + nh := new(HostVolumeConfig) + *nh = *h + return nh +} + +// Volume is a representation of a storage volume that a TaskGroup wishes to use. +type Volume struct { + Name string + Type string + ReadOnly bool + Hidden bool + + Config map[string]interface{} +} + +func (v *Volume) Copy() *Volume { + if v == nil { + return nil + } + nv := new(Volume) + *nv = *v + + if i, err := copystructure.Copy(nv.Config); err != nil { + panic(err.Error()) + } else { + nv.Config = i.(map[string]interface{}) + } + + return nv +} + +func CopyMapVolumes(s map[string]*Volume) map[string]*Volume { + if s == nil { + return nil + } + + l := len(s) + c := make(map[string]*Volume, l) + for k, v := range s { + c[k] = v.Copy() + } + return c +} + +// VolumeMount is ... +type VolumeMount struct { + Volume string + Destination string + ReadOnly bool +} + +func (v *VolumeMount) Copy() *VolumeMount { + if v == nil { + return nil + } + + nv := new(VolumeMount) + *nv = *v + return nv +} + +func CopySliceVolumeMount(s []*VolumeMount) []*VolumeMount { + l := len(s) + if l == 0 { + return nil + } + + c := make([]*VolumeMount, l) + for i, v := range s { + c[i] = v.Copy() + } + return c +} + +type VolumeRequest struct { + Volume *Volume + Config map[string]interface{} +} + +func (h *VolumeRequest) Copy() *VolumeRequest { + if h == nil { + return nil + } + + c := new(VolumeRequest) + c.Volume = h.Volume.Copy() + if i, err := copystructure.Copy(h.Config); err != nil { + panic(err.Error()) + } else { + c.Config = i.(map[string]interface{}) + } + return c +} + +func CopyMapVolumeRequest(m map[string]*VolumeRequest) map[string]*VolumeRequest { + if m == nil { + return nil + } + + l := len(m) + c := make(map[string]*VolumeRequest, l) + for k, v := range m { + c[k] = v.Copy() + } + return c +} + +func ParseHostVolumeConfig(m map[string]interface{}) (*HostVolumeConfig, error) { + var c HostVolumeConfig + err := mapstructure.Decode(m, &c) + + return &c, err +} From 86b4296f9d1bdf56433b28fb9b6eb1a51ee020f1 Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Thu, 25 Jul 2019 16:45:41 +0200 Subject: [PATCH 03/17] client: Add parsing and registration of HostVolume configuration --- client/client.go | 10 ++++++ client/config/config.go | 4 +++ command/agent/agent.go | 6 ++++ command/agent/config.go | 14 ++++++++ command/agent/config_parse.go | 5 +++ command/agent/config_parse_test.go | 4 +++ command/agent/testdata/basic.hcl | 4 +++ command/agent/testdata/basic.json | 55 +++++++++++++++++++++--------- 8 files changed, 86 insertions(+), 16 deletions(-) diff --git a/client/client.go b/client/client.go index f99c2f330ac..ace692e56e6 100644 --- a/client/client.go +++ b/client/client.go @@ -1225,6 +1225,16 @@ func (c *Client) setupNode() error { if node.Name == "" { node.Name, _ = os.Hostname() } + // TODO(dani): Fingerprint these to handle volumes that don't exist/have bad perms. 
+ if node.HostVolumes == nil { + if l := len(c.config.HostVolumes); l != 0 { + node.HostVolumes = make(map[string]*structs.ClientHostVolumeConfig, l) + for k, v := range c.config.HostVolumes { + node.HostVolumes[k] = v.Copy() + } + } + } + if node.Name == "" { node.Name = node.ID } diff --git a/client/config/config.go b/client/config/config.go index 9bac1803a03..cb7e25726e6 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -234,6 +234,9 @@ type Config struct { // for allocations in bridge networking mode. Subnet must be in CIDR // notation BridgeNetworkAllocSubnet string + + // HostVolumes is the set of configured host volumes + HostVolumes map[string]*structs.ClientHostVolumeConfig } func (c *Config) Copy() *Config { @@ -242,6 +245,7 @@ func (c *Config) Copy() *Config { nc.Node = nc.Node.Copy() nc.Servers = helper.CopySliceString(nc.Servers) nc.Options = helper.CopyMapStringString(nc.Options) + nc.HostVolumes = structs.CopyMapStringClientHostVolumeConfig(nc.HostVolumes) nc.ConsulConfig = c.ConsulConfig.Copy() nc.VaultConfig = c.VaultConfig.Copy() return nc diff --git a/command/agent/agent.go b/command/agent/agent.go index f0f2cebf528..6cfe7266bad 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -469,6 +469,12 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) { conf.ClientMinPort = uint(agentConfig.Client.ClientMinPort) conf.DisableRemoteExec = agentConfig.Client.DisableRemoteExec + hvMap := make(map[string]*structs.ClientHostVolumeConfig, len(agentConfig.Client.HostVolumes)) + for _, v := range agentConfig.Client.HostVolumes { + hvMap[v.Name] = v + } + conf.HostVolumes = hvMap + // Setup the node conf.Node = new(structs.Node) conf.Node.Datacenter = agentConfig.Datacenter diff --git a/command/agent/config.go b/command/agent/config.go index fe3f74c4bfa..2b500030747 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -245,6 +245,10 @@ type ClientConfig struct { // ServerJoin contains information that is used to attempt to join servers ServerJoin *ServerJoin `hcl:"server_join"` + // HostVolumes contains information about the volumes an operator has made + // available to jobs running on this node. 
+ HostVolumes []*structs.ClientHostVolumeConfig `hcl:"host_volume"` + // ExtraKeysHCL is used by hcl to surface unexpected keys ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"` @@ -1322,6 +1326,16 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig { result.ServerJoin = result.ServerJoin.Merge(b.ServerJoin) } + if len(a.HostVolumes) == 0 && len(b.HostVolumes) != 0 { + cc := make([]*structs.ClientHostVolumeConfig, len(b.HostVolumes)) + for k, v := range b.HostVolumes { + cc[k] = v.Copy() + } + result.HostVolumes = cc + } else if len(b.HostVolumes) != 0 { + result.HostVolumes = structs.HostVolumeSliceMerge(a.HostVolumes, b.HostVolumes) + } + return &result } diff --git a/command/agent/config_parse.go b/command/agent/config_parse.go index 225ca894210..0056af6effa 100644 --- a/command/agent/config_parse.go +++ b/command/agent/config_parse.go @@ -138,6 +138,11 @@ func extraKeys(c *Config) error { // stats is an unused key, continue to silently ignore it removeEqualFold(&c.Client.ExtraKeysHCL, "stats") + // Remove HostVolume extra keys + for _, hv := range c.Client.HostVolumes { + removeEqualFold(&c.Client.ExtraKeysHCL, hv.Name) + } + for _, k := range []string{"enabled_schedulers", "start_join", "retry_join", "server_join"} { removeEqualFold(&c.ExtraKeysHCL, k) removeEqualFold(&c.ExtraKeysHCL, "server") diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index c8593ce59c3..cbb57b83386 100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs/config" "github.com/stretchr/testify/require" ) @@ -81,6 +82,9 @@ var basicConfig = &Config{ GCMaxAllocs: 50, NoHostUUID: helper.BoolToPtr(false), DisableRemoteExec: true, + HostVolumes: []*structs.ClientHostVolumeConfig{ + {Name: "tmp", Source: "/tmp"}, + }, }, Server: &ServerConfig{ Enabled: true, diff --git a/command/agent/testdata/basic.hcl b/command/agent/testdata/basic.hcl index 97b4cd99c04..68176295e56 100644 --- a/command/agent/testdata/basic.hcl +++ b/command/agent/testdata/basic.hcl @@ -89,6 +89,10 @@ client { gc_max_allocs = 50 no_host_uuid = false disable_remote_exec = true + + host_volume "tmp" { + source = "/tmp" + } } server { diff --git a/command/agent/testdata/basic.json b/command/agent/testdata/basic.json index 11af6beb4bb..f855dd3976f 100644 --- a/command/agent/testdata/basic.json +++ b/command/agent/testdata/basic.json @@ -44,12 +44,22 @@ "client_max_port": 2000, "client_min_port": 1000, "cpu_total_compute": 4444, + "disable_remote_exec": true, "enabled": true, "gc_disk_usage_threshold": 82, "gc_inode_usage_threshold": 91, "gc_interval": "6s", "gc_max_allocs": 50, "gc_parallel_destroys": 6, + "host_volume": [ + { + "tmp": [ + { + "source": "/tmp" + } + ] + } + ], "max_kill_timeout": "10s", "meta": [ { @@ -60,7 +70,6 @@ "network_interface": "eth0", "network_speed": 100, "no_host_uuid": false, - "disable_remote_exec": true, "node_class": "linux-medium-64bit", "options": [ { @@ -137,25 +146,39 @@ "log_json": true, "log_level": "ERR", "name": "my-web", - "plugin": { - "docker": { - "args": [ - "foo", - "bar" - ], - "config": { - "foo": "bar", - "nested": { - "bam": 2 + "plugin": [ + { + "docker": [ + { + "args": [ + "foo", + "bar" + ], + "config": [ + { + "foo": "bar", + "nested": [ + { + "bam": 2 + } + ] + } + ] } - } + ] }, - "exec": { - "config": { - "foo": true + { + "exec": [ + 
{ + "config": [ + { + "foo": true + } + ] } + ] } - }, + ], "plugin_dir": "/tmp/nomad-plugins", "ports": [ { From a216daedceec43f36b2309d2281c50d58246097b Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Thu, 25 Jul 2019 16:46:18 +0200 Subject: [PATCH 04/17] api: Allow submission of jobs with volumes --- command/agent/job_endpoint.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 2ff6e48bbf0..fd857a181b2 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -189,6 +189,7 @@ func (s *HTTPServer) ValidateJobRequest(resp http.ResponseWriter, req *http.Requ } job := ApiJobToStructJob(validateRequest.Job) + args := structs.JobValidateRequest{ Job: job, WriteRequest: structs.WriteRequest{ @@ -740,6 +741,29 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) { } } + if l := len(taskGroup.Volumes); l != 0 { + tg.Volumes = make(map[string]*structs.VolumeRequest, l) + for k, v := range taskGroup.Volumes { + if v.Type != structs.VolumeTypeHost { + // Ignore non-host volumes in this iteration currently. + continue + } + + vol := &structs.Volume{ + Name: v.Name, + Type: v.Type, + ReadOnly: v.ReadOnly, + Hidden: v.Hidden, + Config: v.Config, + } + + tg.Volumes[k] = &structs.VolumeRequest{ + Volume: vol, + Config: v.Config, + } + } + } + if taskGroup.Update != nil { tg.Update = &structs.UpdateStrategy{ Stagger: *taskGroup.Update.Stagger, @@ -788,6 +812,17 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) { structsTask.Constraints = ApiConstraintsToStructs(apiTask.Constraints) structsTask.Affinities = ApiAffinitiesToStructs(apiTask.Affinities) + if l := len(apiTask.VolumeMounts); l != 0 { + structsTask.VolumeMounts = make([]*structs.VolumeMount, l) + for i, mount := range apiTask.VolumeMounts { + structsTask.VolumeMounts[i] = &structs.VolumeMount{ + Volume: mount.Volume, + Destination: mount.Destination, + ReadOnly: mount.ReadOnly, + } + } + } + if l := len(apiTask.Services); l != 0 { structsTask.Services = make([]*structs.Service, l) for i, service := range apiTask.Services { From 709abbc6756d94965850066667c08df5131c16e8 Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Thu, 25 Jul 2019 16:46:29 +0200 Subject: [PATCH 05/17] scheduler: Add a feasability checker for Host Vols --- scheduler/feasible.go | 63 +++++++++++++++++++++++++++++ scheduler/feasible_test.go | 82 ++++++++++++++++++++++++++++++++++++++ scheduler/stack.go | 33 +++++++++------ scheduler/stack_oss.go | 5 ++- 4 files changed, 169 insertions(+), 14 deletions(-) diff --git a/scheduler/feasible.go b/scheduler/feasible.go index d8804732846..efad1754a41 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -96,6 +96,69 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator { return NewStaticIterator(ctx, nodes) } +// HostVolumeChecker is a FeasibilityChecker which returns whether a node has +// the host volumes necessary to schedule a task group. +type HostVolumeChecker struct { + ctx Context + volumes map[string][]*structs.VolumeRequest +} + +// NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes +func NewHostVolumeChecker(ctx Context) *HostVolumeChecker { + return &HostVolumeChecker{ + ctx: ctx, + } +} + +// SetVolumes takes the volumes required by a task group and updates the checker. 
+func (h *HostVolumeChecker) SetVolumes(volumes map[string]*structs.VolumeRequest) { + nm := make(map[string][]*structs.VolumeRequest) + + // Convert the map from map[DesiredName]Request to map[Source][]Request to improve + // lookup performance. Also filter non-host volumes. + for _, req := range volumes { + if req.Volume.Type != "host" { + continue + } + + cfg, err := structs.ParseHostVolumeConfig(req.Config) + if err != nil { + // Could not parse host volume config, skip the volume for now. + continue + } + + nm[cfg.Source] = append(nm[cfg.Source], req) + } + h.volumes = nm +} + +func (h *HostVolumeChecker) Feasible(candidate *structs.Node) bool { + if h.hasVolumes(candidate) { + return true + } + + h.ctx.Metrics().FilterNode(candidate, "missing compatible host volumes") + return false +} + +func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool { + hLen := len(h.volumes) + nLen := len(n.HostVolumes) + + // Fast path: Requesting more volumes than the node has, can't meet the criteria. + if hLen > nLen { + return false + } + + for source := range h.volumes { + if _, ok := n.HostVolumes[source]; !ok { + return false + } + } + + return true +} + // DriverChecker is a FeasibilityChecker which returns whether a node has the // drivers necessary to scheduler a task group. type DriverChecker struct { diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index 1481b318b59..a3abe6ed879 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -81,6 +81,88 @@ func TestRandomIterator(t *testing.T) { } } +func TestHostVolumeChecker(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + } + nodes[1].HostVolumes = map[string]*structs.ClientHostVolumeConfig{"foo": {Name: "foo"}} + nodes[2].HostVolumes = map[string]*structs.ClientHostVolumeConfig{ + "foo": {}, + "bar": {}, + } + nodes[3].HostVolumes = map[string]*structs.ClientHostVolumeConfig{ + "foo": {}, + "bar": {}, + } + nodes[4].HostVolumes = map[string]*structs.ClientHostVolumeConfig{ + "foo": {}, + "baz": {}, + } + + noVolumes := map[string]*structs.VolumeRequest{} + + volumes := map[string]*structs.VolumeRequest{ + "foo": { + Volume: &structs.Volume{Type: "host"}, + Config: map[string]interface{}{"source": "foo"}, + }, + "bar": { + Volume: &structs.Volume{Type: "host"}, + Config: map[string]interface{}{"source": "bar"}, + }, + "baz": { + Volume: &structs.Volume{Type: "nothost"}, + Config: map[string]interface{}{"source": "baz"}, + }, + } + + checker := NewHostVolumeChecker(ctx) + cases := []struct { + Node *structs.Node + RequestedVolumes map[string]*structs.VolumeRequest + Result bool + }{ + { // Nil Volumes, some requested + Node: nodes[0], + RequestedVolumes: volumes, + Result: false, + }, + { // Mismatched set of volumes + Node: nodes[1], + RequestedVolumes: volumes, + Result: false, + }, + { // Happy Path + Node: nodes[2], + RequestedVolumes: volumes, + Result: true, + }, + { // No Volumes requested or available + Node: nodes[3], + RequestedVolumes: noVolumes, + Result: true, + }, + { // No Volumes requested, some available + Node: nodes[4], + RequestedVolumes: noVolumes, + Result: true, + }, + } + + for i, c := range cases { + checker.SetVolumes(c.RequestedVolumes) + if act := checker.Feasible(c.Node); act != c.Result { + t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result) + } + } +} + func TestDriverChecker(t *testing.T) { _, ctx := testContext(t) nodes := []*structs.Node{ diff 
--git a/scheduler/stack.go b/scheduler/stack.go index 037caf61bc5..3e2b1b0b217 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -44,12 +44,13 @@ type GenericStack struct { ctx Context source *StaticIterator - wrappedChecks *FeasibilityWrapper - quota FeasibleIterator - jobConstraint *ConstraintChecker - taskGroupDrivers *DriverChecker - taskGroupConstraint *ConstraintChecker - taskGroupDevices *DeviceChecker + wrappedChecks *FeasibilityWrapper + quota FeasibleIterator + jobConstraint *ConstraintChecker + taskGroupDrivers *DriverChecker + taskGroupConstraint *ConstraintChecker + taskGroupDevices *DeviceChecker + taskGroupHostVolumes *HostVolumeChecker distinctHostsConstraint *DistinctHostsIterator distinctPropertyConstraint *DistinctPropertyIterator @@ -129,6 +130,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) s.taskGroupDevices.SetTaskGroup(tg) + s.taskGroupHostVolumes.SetVolumes(tg.Volumes) s.distinctHostsConstraint.SetTaskGroup(tg) s.distinctPropertyConstraint.SetTaskGroup(tg) s.wrappedChecks.SetTaskGroup(tg.Name) @@ -165,12 +167,13 @@ type SystemStack struct { ctx Context source *StaticIterator - wrappedChecks *FeasibilityWrapper - quota FeasibleIterator - jobConstraint *ConstraintChecker - taskGroupDrivers *DriverChecker - taskGroupConstraint *ConstraintChecker - taskGroupDevices *DeviceChecker + wrappedChecks *FeasibilityWrapper + quota FeasibleIterator + jobConstraint *ConstraintChecker + taskGroupDrivers *DriverChecker + taskGroupConstraint *ConstraintChecker + taskGroupDevices *DeviceChecker + taskGroupHostVolumes *HostVolumeChecker distinctPropertyConstraint *DistinctPropertyIterator binPack *BinPackIterator @@ -199,6 +202,9 @@ func NewSystemStack(ctx Context) *SystemStack { // Filter on task group constraints second s.taskGroupConstraint = NewConstraintChecker(ctx, nil) + // Filter on task group host volumes + s.taskGroupHostVolumes = NewHostVolumeChecker(ctx) + // Filter on task group devices s.taskGroupDevices = NewDeviceChecker(ctx) @@ -207,7 +213,7 @@ func NewSystemStack(ctx Context) *SystemStack { // previously been marked as eligible or ineligible. Generally this will be // checks that only needs to examine the single node to determine feasibility. jobs := []FeasibilityChecker{s.jobConstraint} - tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupDevices} + tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupHostVolumes, s.taskGroupDevices} s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs) // Filter on distinct property constraints. 
@@ -260,6 +266,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) s.taskGroupDevices.SetTaskGroup(tg) + s.taskGroupHostVolumes.SetVolumes(tg.Volumes) s.wrappedChecks.SetTaskGroup(tg.Name) s.distinctPropertyConstraint.SetTaskGroup(tg) s.binPack.SetTaskGroup(tg) diff --git a/scheduler/stack_oss.go b/scheduler/stack_oss.go index 3ef90286ff5..fcbc533a6a4 100644 --- a/scheduler/stack_oss.go +++ b/scheduler/stack_oss.go @@ -31,12 +31,15 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack { // Filter on task group devices s.taskGroupDevices = NewDeviceChecker(ctx) + // Filter on task group host volumes + s.taskGroupHostVolumes = NewHostVolumeChecker(ctx) + // Create the feasibility wrapper which wraps all feasibility checks in // which feasibility checking can be skipped if the computed node class has // previously been marked as eligible or ineligible. Generally this will be // checks that only needs to examine the single node to determine feasibility. jobs := []FeasibilityChecker{s.jobConstraint} - tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupDevices} + tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupHostVolumes, s.taskGroupDevices} s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs) // Filter on distinct host constraints. From c3c003dbd6238169a50f1461513e5bdb4acbc134 Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Thu, 25 Jul 2019 16:48:28 +0200 Subject: [PATCH 06/17] client: Add volume_hook for mounting volumes --- .../taskrunner/task_runner_hooks.go | 1 + client/allocrunner/taskrunner/volume_hook.go | 123 ++++++++++++++++++ plugins/drivers/driver.go | 6 + 3 files changed, 130 insertions(+) create mode 100644 client/allocrunner/taskrunner/volume_hook.go diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 0419f3e54fe..9fa605498d7 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -61,6 +61,7 @@ func (tr *TaskRunner) initHooks() { newTaskDirHook(tr, hookLogger), newLogMonHook(tr.logmonHookConfig, hookLogger), newDispatchHook(tr.Alloc(), hookLogger), + newVolumeHook(tr, hookLogger), newArtifactHook(tr, hookLogger), newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger), newDeviceHook(tr.devicemanager, hookLogger), diff --git a/client/allocrunner/taskrunner/volume_hook.go b/client/allocrunner/taskrunner/volume_hook.go new file mode 100644 index 00000000000..d01a0da8b9d --- /dev/null +++ b/client/allocrunner/taskrunner/volume_hook.go @@ -0,0 +1,123 @@ +package taskrunner + +import ( + "context" + "fmt" + + log "github.com/hashicorp/go-hclog" + multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/drivers" +) + +type volumeHook struct { + alloc *structs.Allocation + runner *TaskRunner + logger log.Logger +} + +func newVolumeHook(runner *TaskRunner, logger log.Logger) *volumeHook { + h := &volumeHook{ + alloc: runner.Alloc(), + runner: runner, + } + h.logger = logger.Named(h.Name()) + return h +} + +func (*volumeHook) Name() string { + return "volumes" +} + +func validateHostVolumes(requested map[string]*structs.VolumeRequest, 
client map[string]*structs.ClientHostVolumeConfig) error { + var result error + + for n, req := range requested { + if req.Volume.Type != "host" { + continue + } + + cfg, err := structs.ParseHostVolumeConfig(req.Config) + if err != nil { + result = multierror.Append(result, fmt.Errorf("failed to parse config for %s: %v", n, err)) + } + + _, ok := client[cfg.Source] + if !ok { + result = multierror.Append(result, fmt.Errorf("missing %s", cfg.Source)) + } + } + + return result +} + +func (h *volumeHook) hostVolumeMountConfigurations(vmounts []*structs.VolumeMount, volumes map[string]*structs.VolumeRequest, client map[string]*structs.ClientHostVolumeConfig) ([]*drivers.MountConfig, error) { + var mounts []*drivers.MountConfig + for _, m := range vmounts { + req, ok := volumes[m.Volume] + if !ok { + // Should never happen unless we misvalidated on job submission + return nil, fmt.Errorf("No group volume declaration found named: %s", m.Volume) + } + + cfg, err := structs.ParseHostVolumeConfig(req.Config) + if err != nil { + return nil, fmt.Errorf("failed to parse config for %s: %v", m.Volume, err) + } + + hostVolume, ok := client[cfg.Source] + if !ok { + // Should never happen, but unless the client volumes were mutated during + // the execution of this hook. + return nil, fmt.Errorf("No host volume named: %s", cfg.Source) + } + + mcfg := &drivers.MountConfig{ + HostPath: hostVolume.Source, + TaskPath: m.Destination, + Readonly: hostVolume.ReadOnly || req.Volume.ReadOnly || m.ReadOnly, + } + mounts = append(mounts, mcfg) + } + + return mounts, nil +} + +func (h *volumeHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { + volumes := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup).Volumes + mounts := h.runner.hookResources.getMounts() + + hostVolumes := h.runner.clientConfig.Node.HostVolumes + + // Always validate volumes to ensure that we do not allow volumes to be used + // if a host is restarted and loses the host volume configuration. + if err := validateHostVolumes(volumes, hostVolumes); err != nil { + h.logger.Error("Requested Host Volume does not exist", "existing", hostVolumes, "requested", volumes) + return fmt.Errorf("host volume validation error: %v", err) + } + + requestedMounts, err := h.hostVolumeMountConfigurations(req.Task.VolumeMounts, volumes, hostVolumes) + if err != nil { + h.logger.Error("Failed to generate volume mounts", "error", err) + return err + } + + // Because this hook is also ran on restores, we only add mounts that do not + // already exist. Although this loop is somewhat expensive, there are only + // a small number of mounts that exist within most individual tasks. 
We may + // want to revisit this using a `hookdata` param to be "mount only once" +REQUESTED: + for _, m := range requestedMounts { + for _, em := range mounts { + if em.IsEqual(m) { + continue REQUESTED + } + } + + mounts = append(mounts, m) + } + + h.runner.hookResources.setMounts(mounts) + return nil +} diff --git a/plugins/drivers/driver.go b/plugins/drivers/driver.go index f2e77005063..a1b8f2f3ee9 100644 --- a/plugins/drivers/driver.go +++ b/plugins/drivers/driver.go @@ -362,6 +362,12 @@ type MountConfig struct { Readonly bool } +func (m *MountConfig) IsEqual(o *MountConfig) bool { + return m.TaskPath == o.TaskPath && + m.HostPath == o.HostPath && + m.Readonly == o.Readonly +} + func (m *MountConfig) Copy() *MountConfig { if m == nil { return nil From 0f5cf5fa91455a3e86e516a60d97de82cc1a7bfe Mon Sep 17 00:00:00 2001 From: Danielle Date: Wed, 31 Jul 2019 18:39:08 +0200 Subject: [PATCH 07/17] Update scheduler/feasible.go Co-Authored-By: Mahmood Ali --- scheduler/feasible.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scheduler/feasible.go b/scheduler/feasible.go index efad1754a41..505ad32084b 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -117,7 +117,7 @@ func (h *HostVolumeChecker) SetVolumes(volumes map[string]*structs.VolumeRequest // Convert the map from map[DesiredName]Request to map[Source][]Request to improve // lookup performance. Also filter non-host volumes. for _, req := range volumes { - if req.Volume.Type != "host" { + if req.Volume.Type != structs.VolumeTypeHost { continue } From af5d42c0580df2c7dd33d815ced9bb54397ff8a9 Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Thu, 1 Aug 2019 11:33:26 +0200 Subject: [PATCH 08/17] structs: Unify Volume and VolumeRequest --- client/allocrunner/taskrunner/volume_hook.go | 22 +++--- command/agent/config.go | 6 +- command/agent/job_endpoint.go | 7 +- nomad/structs/structs.go | 4 +- nomad/structs/structs_test.go | 14 +--- nomad/structs/volumes.go | 80 ++++++++------------ scheduler/feasible.go | 18 +++-- scheduler/feasible_test.go | 6 +- 8 files changed, 67 insertions(+), 90 deletions(-) diff --git a/client/allocrunner/taskrunner/volume_hook.go b/client/allocrunner/taskrunner/volume_hook.go index d01a0da8b9d..6c548b3f7ea 100644 --- a/client/allocrunner/taskrunner/volume_hook.go +++ b/client/allocrunner/taskrunner/volume_hook.go @@ -30,20 +30,21 @@ func (*volumeHook) Name() string { return "volumes" } -func validateHostVolumes(requested map[string]*structs.VolumeRequest, client map[string]*structs.ClientHostVolumeConfig) error { +func validateHostVolumes(requestedByAlias map[string]*structs.VolumeRequest, clientVolumesByName map[string]*structs.ClientHostVolumeConfig) error { var result error - for n, req := range requested { - if req.Volume.Type != "host" { + for n, req := range requestedByAlias { + if req.Type != structs.VolumeTypeHost { continue } cfg, err := structs.ParseHostVolumeConfig(req.Config) if err != nil { result = multierror.Append(result, fmt.Errorf("failed to parse config for %s: %v", n, err)) + continue } - _, ok := client[cfg.Source] + _, ok := clientVolumesByName[cfg.Source] if !ok { result = multierror.Append(result, fmt.Errorf("missing %s", cfg.Source)) } @@ -52,10 +53,13 @@ func validateHostVolumes(requested map[string]*structs.VolumeRequest, client map return result } -func (h *volumeHook) hostVolumeMountConfigurations(vmounts []*structs.VolumeMount, volumes map[string]*structs.VolumeRequest, client map[string]*structs.ClientHostVolumeConfig) ([]*drivers.MountConfig, 
error) { +// hostVolumeMountConfigurations takes the users requested volume mounts, +// volumes, and the client host volume configuration and converts them into a +// format that can be used by drivers. +func (h *volumeHook) hostVolumeMountConfigurations(taskMounts []*structs.VolumeMount, taskVolumesByAlias map[string]*structs.VolumeRequest, clientVolumesByName map[string]*structs.ClientHostVolumeConfig) ([]*drivers.MountConfig, error) { var mounts []*drivers.MountConfig - for _, m := range vmounts { - req, ok := volumes[m.Volume] + for _, m := range taskMounts { + req, ok := taskVolumesByAlias[m.Volume] if !ok { // Should never happen unless we misvalidated on job submission return nil, fmt.Errorf("No group volume declaration found named: %s", m.Volume) @@ -66,7 +70,7 @@ func (h *volumeHook) hostVolumeMountConfigurations(vmounts []*structs.VolumeMoun return nil, fmt.Errorf("failed to parse config for %s: %v", m.Volume, err) } - hostVolume, ok := client[cfg.Source] + hostVolume, ok := clientVolumesByName[cfg.Source] if !ok { // Should never happen, but unless the client volumes were mutated during // the execution of this hook. @@ -76,7 +80,7 @@ func (h *volumeHook) hostVolumeMountConfigurations(vmounts []*structs.VolumeMoun mcfg := &drivers.MountConfig{ HostPath: hostVolume.Source, TaskPath: m.Destination, - Readonly: hostVolume.ReadOnly || req.Volume.ReadOnly || m.ReadOnly, + Readonly: hostVolume.ReadOnly || req.ReadOnly || m.ReadOnly, } mounts = append(mounts, mcfg) } diff --git a/command/agent/config.go b/command/agent/config.go index 2b500030747..435484a7794 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -1327,11 +1327,7 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig { } if len(a.HostVolumes) == 0 && len(b.HostVolumes) != 0 { - cc := make([]*structs.ClientHostVolumeConfig, len(b.HostVolumes)) - for k, v := range b.HostVolumes { - cc[k] = v.Copy() - } - result.HostVolumes = cc + result.HostVolumes = structs.CopySliceClientHostVolumeConfig(b.HostVolumes) } else if len(b.HostVolumes) != 0 { result.HostVolumes = structs.HostVolumeSliceMerge(a.HostVolumes, b.HostVolumes) } diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index fd857a181b2..1c2ab13486e 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -749,7 +749,7 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) { continue } - vol := &structs.Volume{ + vol := &structs.VolumeRequest{ Name: v.Name, Type: v.Type, ReadOnly: v.ReadOnly, @@ -757,10 +757,7 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) { Config: v.Config, } - tg.Volumes[k] = &structs.VolumeRequest{ - Volume: vol, - Config: v.Config, - } + tg.Volumes[k] = vol } } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 5deda22615d..ef2aa5a781b 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4885,9 +4885,9 @@ func (tg *TaskGroup) Validate(j *Job) error { // Validate the Host Volumes for name, decl := range tg.Volumes { - if decl.Volume.Type != VolumeTypeHost { + if decl.Type != VolumeTypeHost { // TODO: Remove this error when adding new volume types - mErr.Errors = append(mErr.Errors, fmt.Errorf("Volume %s has unrecognised type %s", name, decl.Volume.Type)) + mErr.Errors = append(mErr.Errors, fmt.Errorf("Volume %s has unrecognised type %s", name, decl.Type)) continue } diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 5ec339bdf5e..378844ed291 100644 --- 
a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -872,9 +872,7 @@ func TestTaskGroup_Validate(t *testing.T) { tg = &TaskGroup{ Volumes: map[string]*VolumeRequest{ "foo": { - Volume: &Volume{ - Type: "nothost", - }, + Type: "nothost", Config: map[string]interface{}{ "sOuRcE": "foo", }, @@ -893,10 +891,7 @@ func TestTaskGroup_Validate(t *testing.T) { tg = &TaskGroup{ Volumes: map[string]*VolumeRequest{ "foo": { - Volume: &Volume{ - Type: "host", - }, - Config: nil, + Type: "host", }, }, Tasks: []*Task{ @@ -912,10 +907,7 @@ func TestTaskGroup_Validate(t *testing.T) { tg = &TaskGroup{ Volumes: map[string]*VolumeRequest{ "foo": { - Volume: &Volume{ - Type: "host", - }, - Config: nil, + Type: "host", }, }, Tasks: []*Task{ diff --git a/nomad/structs/volumes.go b/nomad/structs/volumes.go index 80ea52f964f..6486a1ae8aa 100644 --- a/nomad/structs/volumes.go +++ b/nomad/structs/volumes.go @@ -40,23 +40,36 @@ func CopyMapStringClientHostVolumeConfig(m map[string]*ClientHostVolumeConfig) m return nm } +func CopySliceClientHostVolumeConfig(s []*ClientHostVolumeConfig) []*ClientHostVolumeConfig { + l := len(s) + if l == 0 { + return nil + } + + ns := make([]*ClientHostVolumeConfig, l) + for idx, cfg := range s { + ns[idx] = cfg.Copy() + } + + return ns +} + func HostVolumeSliceMerge(a, b []*ClientHostVolumeConfig) []*ClientHostVolumeConfig { n := make([]*ClientHostVolumeConfig, len(a)) - seenKeys := make(map[string]struct{}, len(a)) + seenKeys := make(map[string]int, len(a)) - for k, v := range a { - if _, ok := seenKeys[v.Name]; ok { - continue - } - n[k] = v.Copy() - seenKeys[v.Name] = struct{}{} + for i, config := range a { + n[i] = config.Copy() + seenKeys[config.Name] = i } - for k, v := range b { - if _, ok := seenKeys[v.Name]; ok { + + for _, config := range b { + if fIndex, ok := seenKeys[config.Name]; ok { + n[fIndex] = config.Copy() continue } - n[k] = v.Copy() - seenKeys[v.Name] = struct{}{} + + n = append(n, config.Copy()) } return n @@ -78,8 +91,8 @@ func (h *HostVolumeConfig) Copy() *HostVolumeConfig { return nh } -// Volume is a representation of a storage volume that a TaskGroup wishes to use. -type Volume struct { +// VolumeRequest is a representation of a storage volume that a TaskGroup wishes to use. 
+type VolumeRequest struct { Name string Type string ReadOnly bool @@ -88,11 +101,11 @@ type Volume struct { Config map[string]interface{} } -func (v *Volume) Copy() *Volume { +func (v *VolumeRequest) Copy() *VolumeRequest { if v == nil { return nil } - nv := new(Volume) + nv := new(VolumeRequest) *nv = *v if i, err := copystructure.Copy(nv.Config); err != nil { @@ -104,13 +117,13 @@ func (v *Volume) Copy() *Volume { return nv } -func CopyMapVolumes(s map[string]*Volume) map[string]*Volume { +func CopyMapVolumeRequest(s map[string]*VolumeRequest) map[string]*VolumeRequest { if s == nil { return nil } l := len(s) - c := make(map[string]*Volume, l) + c := make(map[string]*VolumeRequest, l) for k, v := range s { c[k] = v.Copy() } @@ -147,39 +160,6 @@ func CopySliceVolumeMount(s []*VolumeMount) []*VolumeMount { return c } -type VolumeRequest struct { - Volume *Volume - Config map[string]interface{} -} - -func (h *VolumeRequest) Copy() *VolumeRequest { - if h == nil { - return nil - } - - c := new(VolumeRequest) - c.Volume = h.Volume.Copy() - if i, err := copystructure.Copy(h.Config); err != nil { - panic(err.Error()) - } else { - c.Config = i.(map[string]interface{}) - } - return c -} - -func CopyMapVolumeRequest(m map[string]*VolumeRequest) map[string]*VolumeRequest { - if m == nil { - return nil - } - - l := len(m) - c := make(map[string]*VolumeRequest, l) - for k, v := range m { - c[k] = v.Copy() - } - return c -} - func ParseHostVolumeConfig(m map[string]interface{}) (*HostVolumeConfig, error) { var c HostVolumeConfig err := mapstructure.Decode(m, &c) diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 505ad32084b..15f444acea6 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -99,7 +99,10 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator { // HostVolumeChecker is a FeasibilityChecker which returns whether a node has // the host volumes necessary to schedule a task group. type HostVolumeChecker struct { - ctx Context + ctx Context + + // volumes is a map[HostVolumeName][]RequestedVolume. The requested volumes are + // a slice because a single task group may request the same volume multiple times. volumes map[string][]*structs.VolumeRequest } @@ -117,7 +120,7 @@ func (h *HostVolumeChecker) SetVolumes(volumes map[string]*structs.VolumeRequest // Convert the map from map[DesiredName]Request to map[Source][]Request to improve // lookup performance. Also filter non-host volumes. for _, req := range volumes { - if req.Volume.Type != structs.VolumeTypeHost { + if req.Type != structs.VolumeTypeHost { continue } @@ -142,11 +145,16 @@ func (h *HostVolumeChecker) Feasible(candidate *structs.Node) bool { } func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool { - hLen := len(h.volumes) - nLen := len(n.HostVolumes) + rLen := len(h.volumes) + hLen := len(n.HostVolumes) + + // Fast path: Requested no volumes. No need to check further. + if rLen == 0 { + return true + } // Fast path: Requesting more volumes than the node has, can't meet the criteria. 
- if hLen > nLen { + if rLen > hLen { return false } diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index a3abe6ed879..0c74aaf7290 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -109,15 +109,15 @@ func TestHostVolumeChecker(t *testing.T) { volumes := map[string]*structs.VolumeRequest{ "foo": { - Volume: &structs.Volume{Type: "host"}, + Type: "host", Config: map[string]interface{}{"source": "foo"}, }, "bar": { - Volume: &structs.Volume{Type: "host"}, + Type: "host", Config: map[string]interface{}{"source": "bar"}, }, "baz": { - Volume: &structs.Volume{Type: "nothost"}, + Type: "nothost", Config: map[string]interface{}{"source": "baz"}, }, } From 7b7be83aefbd89ccc6b05cea842bcc9c51e793df Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Thu, 1 Aug 2019 11:55:42 +0200 Subject: [PATCH 09/17] HostVolumeConfig: Source -> Path --- client/allocrunner/taskrunner/volume_hook.go | 2 +- command/agent/config_parse_test.go | 2 +- command/agent/testdata/basic.hcl | 2 +- command/agent/testdata/basic.json | 2 +- nomad/structs/volumes.go | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/client/allocrunner/taskrunner/volume_hook.go b/client/allocrunner/taskrunner/volume_hook.go index 6c548b3f7ea..b2c0ab3e701 100644 --- a/client/allocrunner/taskrunner/volume_hook.go +++ b/client/allocrunner/taskrunner/volume_hook.go @@ -78,7 +78,7 @@ func (h *volumeHook) hostVolumeMountConfigurations(taskMounts []*structs.VolumeM } mcfg := &drivers.MountConfig{ - HostPath: hostVolume.Source, + HostPath: hostVolume.Path, TaskPath: m.Destination, Readonly: hostVolume.ReadOnly || req.ReadOnly || m.ReadOnly, } diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index cbb57b83386..362bddb92c0 100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -83,7 +83,7 @@ var basicConfig = &Config{ NoHostUUID: helper.BoolToPtr(false), DisableRemoteExec: true, HostVolumes: []*structs.ClientHostVolumeConfig{ - {Name: "tmp", Source: "/tmp"}, + {Name: "tmp", Path: "/tmp"}, }, }, Server: &ServerConfig{ diff --git a/command/agent/testdata/basic.hcl b/command/agent/testdata/basic.hcl index 68176295e56..03ce1c7499f 100644 --- a/command/agent/testdata/basic.hcl +++ b/command/agent/testdata/basic.hcl @@ -91,7 +91,7 @@ client { disable_remote_exec = true host_volume "tmp" { - source = "/tmp" + path = "/tmp" } } diff --git a/command/agent/testdata/basic.json b/command/agent/testdata/basic.json index f855dd3976f..6eedcb85f59 100644 --- a/command/agent/testdata/basic.json +++ b/command/agent/testdata/basic.json @@ -55,7 +55,7 @@ { "tmp": [ { - "source": "/tmp" + "path": "/tmp" } ] } diff --git a/nomad/structs/volumes.go b/nomad/structs/volumes.go index 6486a1ae8aa..a1fdfc6d3f3 100644 --- a/nomad/structs/volumes.go +++ b/nomad/structs/volumes.go @@ -12,7 +12,7 @@ const ( // ClientHostVolumeConfig is used to configure access to host paths on a Nomad Client type ClientHostVolumeConfig struct { Name string `hcl:",key"` - Source string `hcl:"source"` + Path string `hcl:"path"` ReadOnly bool `hcl:"read_only"` Hidden bool `hcl:"hidden"` } From 869f1da310796cb897f3f70296c41a0715bc1e4c Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Mon, 5 Aug 2019 13:33:02 +0200 Subject: [PATCH 10/17] structs: Document VolumeMount --- nomad/structs/volumes.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nomad/structs/volumes.go b/nomad/structs/volumes.go index a1fdfc6d3f3..519c4e42e97 100644 --- 
a/nomad/structs/volumes.go +++ b/nomad/structs/volumes.go @@ -130,7 +130,8 @@ func CopyMapVolumeRequest(s map[string]*VolumeRequest) map[string]*VolumeRequest return c } -// VolumeMount is ... +// VolumeMount represents the relationship between a destination path in a task +// and the task group volume that should be mounted there. type VolumeMount struct { Volume string Destination string From 346444be357d6bfc7ee882b23bae7303e514d1a8 Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Thu, 25 Jul 2019 15:43:44 +0200 Subject: [PATCH 11/17] api: Add HostVolumeInfo to response parsing --- api/nodes.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/api/nodes.go b/api/nodes.go index ed768760e68..66c3906f38b 100644 --- a/api/nodes.go +++ b/api/nodes.go @@ -436,6 +436,13 @@ type DriverInfo struct { UpdateTime time.Time } +// HostVolumeInfo is used to return metadata about a given HostVolume. +type HostVolumeInfo struct { + Source string + ReadOnly bool + Hidden bool +} + // Node is used to deserialize a node entry. type Node struct { ID string @@ -459,6 +466,7 @@ type Node struct { StatusUpdatedAt int64 Events []*NodeEvent Drivers map[string]*DriverInfo + HostVolumes map[string]*HostVolumeInfo CreateIndex uint64 ModifyIndex uint64 } From f478c8cc24c50c8efd8bbc399812518363e206d5 Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Thu, 25 Jul 2019 15:44:19 +0200 Subject: [PATCH 12/17] cli: Display host volume info in nomad node status --- command/node_status.go | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/command/node_status.go b/command/node_status.go index 934ef2f97b0..54303935a1a 100644 --- a/command/node_status.go +++ b/command/node_status.go @@ -299,6 +299,16 @@ func nodeDrivers(n *api.Node) []string { return drivers } +func nodeVolumeNames(n *api.Node) []string { + var volumes []string + for name, _ := range n.HostVolumes { + volumes = append(volumes, name) + } + + sort.Strings(volumes) + return volumes +} + func formatDrain(n *api.Node) string { if n.DrainStrategy != nil { b := new(strings.Builder) @@ -333,6 +343,7 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int { } if c.short { + basic = append(basic, fmt.Sprintf("Host Volumes|%s", strings.Join(nodeVolumeNames(node), ","))) basic = append(basic, fmt.Sprintf("Drivers|%s", strings.Join(nodeDrivers(node), ","))) c.Ui.Output(c.Colorize().Color(formatKV(basic))) } else { @@ -347,6 +358,11 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int { basic = append(basic, fmt.Sprintf("Uptime|%s", uptime.String())) } + // Emit the volume info + if !c.verbose { + basic = append(basic, fmt.Sprintf("Host Volumes|%s", strings.Join(nodeVolumeNames(node), ","))) + } + // Emit the driver info if !c.verbose { driverStatus := fmt.Sprintf("Driver Status| %s", c.outputTruncatedNodeDriverInfo(node)) @@ -356,6 +372,7 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int { c.Ui.Output(c.Colorize().Color(formatKV(basic))) if c.verbose { + c.outputNodeVolumeInfo(node) c.outputNodeDriverInfo(node) } @@ -443,6 +460,25 @@ func (c *NodeStatusCommand) outputTruncatedNodeDriverInfo(node *api.Node) string return strings.Trim(strings.Join(drivers, ","), ", ") } +func (c *NodeStatusCommand) outputNodeVolumeInfo(node *api.Node) { + c.Ui.Output(c.Colorize().Color("\n[bold]Host Volumes")) + + names := make([]string, 0, len(node.HostVolumes)) + for name := range node.HostVolumes { + names = append(names, name) + } + 
sort.Strings(names) + + output := make([]string, 0, len(names)+1) + output = append(output, "Name|ReadOnly|Hidden|Source") + + for _, volName := range names { + info := node.HostVolumes[volName] + output = append(output, fmt.Sprintf("%s|%v|%v|%s", volName, info.ReadOnly, info.Hidden, info.Source)) + } + c.Ui.Output(formatList(output)) +} + func (c *NodeStatusCommand) outputNodeDriverInfo(node *api.Node) { c.Ui.Output(c.Colorize().Color("\n[bold]Drivers")) From a86a07770e272c22f9644dbe4007717b18ad6bea Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Fri, 9 Aug 2019 12:47:09 +0200 Subject: [PATCH 13/17] command: Cleanup node-status --- command/node_status.go | 152 +++++++++++++++++++++++------------------ 1 file changed, 84 insertions(+), 68 deletions(-) diff --git a/command/node_status.go b/command/node_status.go index 54303935a1a..0199f7b242b 100644 --- a/command/node_status.go +++ b/command/node_status.go @@ -301,7 +301,7 @@ func nodeDrivers(n *api.Node) []string { func nodeVolumeNames(n *api.Node) []string { var volumes []string - for name, _ := range n.HostVolumes { + for name := range n.HostVolumes { volumes = append(volumes, name) } @@ -346,88 +346,104 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int { basic = append(basic, fmt.Sprintf("Host Volumes|%s", strings.Join(nodeVolumeNames(node), ","))) basic = append(basic, fmt.Sprintf("Drivers|%s", strings.Join(nodeDrivers(node), ","))) c.Ui.Output(c.Colorize().Color(formatKV(basic))) - } else { - // Get the host stats - hostStats, nodeStatsErr := client.Nodes().Stats(node.ID, nil) - if nodeStatsErr != nil { - c.Ui.Output("") - c.Ui.Error(fmt.Sprintf("error fetching node stats: %v", nodeStatsErr)) - } - if hostStats != nil { - uptime := time.Duration(hostStats.Uptime * uint64(time.Second)) - basic = append(basic, fmt.Sprintf("Uptime|%s", uptime.String())) - } - // Emit the volume info - if !c.verbose { - basic = append(basic, fmt.Sprintf("Host Volumes|%s", strings.Join(nodeVolumeNames(node), ","))) + // Output alloc info + if err := c.outputAllocInfo(client, node); err != nil { + c.Ui.Error(fmt.Sprintf("%s", err)) + return 1 } - // Emit the driver info - if !c.verbose { - driverStatus := fmt.Sprintf("Driver Status| %s", c.outputTruncatedNodeDriverInfo(node)) - basic = append(basic, driverStatus) - } + return 0 + } - c.Ui.Output(c.Colorize().Color(formatKV(basic))) + // Get the host stats + hostStats, nodeStatsErr := client.Nodes().Stats(node.ID, nil) + if nodeStatsErr != nil { + c.Ui.Output("") + c.Ui.Error(fmt.Sprintf("error fetching node stats: %v", nodeStatsErr)) + } + if hostStats != nil { + uptime := time.Duration(hostStats.Uptime * uint64(time.Second)) + basic = append(basic, fmt.Sprintf("Uptime|%s", uptime.String())) + } - if c.verbose { - c.outputNodeVolumeInfo(node) - c.outputNodeDriverInfo(node) - } + // When we're not running in verbose mode, then also include host volumes and + // driver info in the basic output + if !c.verbose { + basic = append(basic, fmt.Sprintf("Host Volumes|%s", strings.Join(nodeVolumeNames(node), ","))) - // Emit node events - c.outputNodeStatusEvents(node) + driverStatus := fmt.Sprintf("Driver Status| %s", c.outputTruncatedNodeDriverInfo(node)) + basic = append(basic, driverStatus) + } - // Get list of running allocations on the node - runningAllocs, err := getRunningAllocs(client, node.ID) - if err != nil { - c.Ui.Error(fmt.Sprintf("Error querying node for running allocations: %s", err)) - return 1 - } + // Output the basic info + 
c.Ui.Output(c.Colorize().Color(formatKV(basic))) - allocatedResources := getAllocatedResources(client, runningAllocs, node) - c.Ui.Output(c.Colorize().Color("\n[bold]Allocated Resources[reset]")) - c.Ui.Output(formatList(allocatedResources)) + // If we're running in verbose mode, include full host volume and driver info + if c.verbose { + c.outputNodeVolumeInfo(node) + c.outputNodeDriverInfo(node) + } - actualResources, err := getActualResources(client, runningAllocs, node) - if err == nil { - c.Ui.Output(c.Colorize().Color("\n[bold]Allocation Resource Utilization[reset]")) - c.Ui.Output(formatList(actualResources)) - } + // Emit node events + c.outputNodeStatusEvents(node) - hostResources, err := getHostResources(hostStats, node) - if err != nil { - c.Ui.Output("") - c.Ui.Error(fmt.Sprintf("error fetching node stats: %v", err)) - } - if err == nil { - c.Ui.Output(c.Colorize().Color("\n[bold]Host Resource Utilization[reset]")) - c.Ui.Output(formatList(hostResources)) - } + // Get list of running allocations on the node + runningAllocs, err := getRunningAllocs(client, node.ID) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying node for running allocations: %s", err)) + return 1 + } - if err == nil && node.NodeResources != nil && len(node.NodeResources.Devices) > 0 { - c.Ui.Output(c.Colorize().Color("\n[bold]Device Resource Utilization[reset]")) - c.Ui.Output(formatList(getDeviceResourcesForNode(hostStats.DeviceStats, node))) - } - if hostStats != nil && c.stats { - c.Ui.Output(c.Colorize().Color("\n[bold]CPU Stats[reset]")) - c.printCpuStats(hostStats) - c.Ui.Output(c.Colorize().Color("\n[bold]Memory Stats[reset]")) - c.printMemoryStats(hostStats) - c.Ui.Output(c.Colorize().Color("\n[bold]Disk Stats[reset]")) - c.printDiskStats(hostStats) - if len(hostStats.DeviceStats) > 0 { - c.Ui.Output(c.Colorize().Color("\n[bold]Device Stats[reset]")) - printDeviceStats(c.Ui, hostStats.DeviceStats) - } + allocatedResources := getAllocatedResources(client, runningAllocs, node) + c.Ui.Output(c.Colorize().Color("\n[bold]Allocated Resources[reset]")) + c.Ui.Output(formatList(allocatedResources)) + + actualResources, err := getActualResources(client, runningAllocs, node) + if err == nil { + c.Ui.Output(c.Colorize().Color("\n[bold]Allocation Resource Utilization[reset]")) + c.Ui.Output(formatList(actualResources)) + } + + hostResources, err := getHostResources(hostStats, node) + if err != nil { + c.Ui.Output("") + c.Ui.Error(fmt.Sprintf("error fetching node stats: %v", err)) + } + if err == nil { + c.Ui.Output(c.Colorize().Color("\n[bold]Host Resource Utilization[reset]")) + c.Ui.Output(formatList(hostResources)) + } + + if err == nil && node.NodeResources != nil && len(node.NodeResources.Devices) > 0 { + c.Ui.Output(c.Colorize().Color("\n[bold]Device Resource Utilization[reset]")) + c.Ui.Output(formatList(getDeviceResourcesForNode(hostStats.DeviceStats, node))) + } + if hostStats != nil && c.stats { + c.Ui.Output(c.Colorize().Color("\n[bold]CPU Stats[reset]")) + c.printCpuStats(hostStats) + c.Ui.Output(c.Colorize().Color("\n[bold]Memory Stats[reset]")) + c.printMemoryStats(hostStats) + c.Ui.Output(c.Colorize().Color("\n[bold]Disk Stats[reset]")) + c.printDiskStats(hostStats) + if len(hostStats.DeviceStats) > 0 { + c.Ui.Output(c.Colorize().Color("\n[bold]Device Stats[reset]")) + printDeviceStats(c.Ui, hostStats.DeviceStats) } } + if err := c.outputAllocInfo(client, node); err != nil { + c.Ui.Error(fmt.Sprintf("%s", err)) + return 1 + } + + return 0 +} + +func (c *NodeStatusCommand) 
outputAllocInfo(client *api.Client, node *api.Node) error {
 nodeAllocs, _, err := client.Nodes().Allocations(node.ID, nil)
 if err != nil {
- c.Ui.Error(fmt.Sprintf("Error querying node allocations: %s", err))
- return 1
+ return fmt.Errorf("Error querying node allocations: %s", err)
 }
 c.Ui.Output(c.Colorize().Color("\n[bold]Allocations[reset]"))
@@ -438,8 +454,8 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int {
 c.formatDeviceAttributes(node)
 c.formatMeta(node)
 }
- return 0
+ return nil
 }

 func (c *NodeStatusCommand) outputTruncatedNodeDriverInfo(node *api.Node) string {

From 065688da40053c99730a06c6b4bc052008ac919a Mon Sep 17 00:00:00 2001
From: Danielle Lancashire
Date: Wed, 8 May 2019 13:14:24 +0200
Subject: [PATCH 14/17] acl: Add HostVolume ACLs

This adds an initial implementation of ACLs for HostVolumes. Because
HostVolumes are a cluster-wide resource, they cannot be tied to a
namespace; instead we allow similar wildcard definitions based on their
names, tied to a set of capabilities. Initially, the only available
capabilities are deny and mount. These may be extended in the future to
allow read-fs, mount-readonly and similar capabilities.
---
 acl/acl.go         | 135 ++++++++++++++++++++++++++++++++++++++++-----
 acl/acl_test.go    |  56 ++++++++++++++++++-
 acl/policy.go      |  79 ++++++++++++++++++++++++--
 acl/policy_test.go |  28 ++++++++++
 4 files changed, 276 insertions(+), 22 deletions(-)

diff --git a/acl/acl.go b/acl/acl.go
index f5a76ea03c4..11aee174c13 100644
--- a/acl/acl.go
+++ b/acl/acl.go
@@ -51,6 +51,13 @@ type ACL struct {
 // We use an iradix for the purposes of ordered iteration.
 wildcardNamespaces *iradix.Tree

+ // hostVolumes maps a named host volume to a capabilitySet
+ hostVolumes *iradix.Tree
+
+ // wildcardHostVolumes maps a glob pattern of host volume names to a capabilitySet
+ // We use an iradix for the purposes of ordered iteration.
+ wildcardHostVolumes *iradix.Tree
+
 agent string
 node string
 operator string
@@ -83,6 +90,8 @@ func NewACL(management bool, policies []*Policy) (*ACL, error) {
 acl := &ACL{}
 nsTxn := iradix.New().Txn()
 wnsTxn := iradix.New().Txn()
+ hvTxn := iradix.New().Txn()
+ whvTxn := iradix.New().Txn()

 for _, policy := range policies {
 NAMESPACES:
@@ -128,6 +137,49 @@ func NewACL(management bool, policies []*Policy) (*ACL, error) {
 }
 }

+ HOSTVOLUMES:
+ for _, hv := range policy.HostVolumes {
+ // Should the volume be matched using a glob?
+ globDefinition := strings.Contains(hv.Name, "*") + + // Check for existing capabilities + var capabilities capabilitySet + + if globDefinition { + raw, ok := whvTxn.Get([]byte(hv.Name)) + if ok { + capabilities = raw.(capabilitySet) + } else { + capabilities = make(capabilitySet) + whvTxn.Insert([]byte(hv.Name), capabilities) + } + } else { + raw, ok := hvTxn.Get([]byte(hv.Name)) + if ok { + capabilities = raw.(capabilitySet) + } else { + capabilities = make(capabilitySet) + hvTxn.Insert([]byte(hv.Name), capabilities) + } + } + + // Deny always takes precedence + if capabilities.Check(HostVolumeCapabilityDeny) { + continue + } + + // Add in all the capabilities + for _, cap := range hv.Capabilities { + if cap == HostVolumeCapabilityDeny { + // Overwrite any existing capabilities + capabilities.Clear() + capabilities.Set(HostVolumeCapabilityDeny) + continue HOSTVOLUMES + } + capabilities.Set(cap) + } + } + // Take the maximum privilege for agent, node, and operator if policy.Agent != nil { acl.agent = maxPrivilege(acl.agent, policy.Agent.Policy) @@ -146,6 +198,9 @@ func NewACL(management bool, policies []*Policy) (*ACL, error) { // Finalize the namespaces acl.namespaces = nsTxn.Commit() acl.wildcardNamespaces = wnsTxn.Commit() + acl.hostVolumes = hvTxn.Commit() + acl.wildcardHostVolumes = whvTxn.Commit() + return acl, nil } @@ -162,7 +217,7 @@ func (a *ACL) AllowNamespaceOperation(ns string, op string) bool { } // Check for a matching capability set - capabilities, ok := a.matchingCapabilitySet(ns) + capabilities, ok := a.matchingNamespaceCapabilitySet(ns) if !ok { return false } @@ -179,7 +234,45 @@ func (a *ACL) AllowNamespace(ns string) bool { } // Check for a matching capability set - capabilities, ok := a.matchingCapabilitySet(ns) + capabilities, ok := a.matchingNamespaceCapabilitySet(ns) + if !ok { + return false + } + + // Check if the capability has been granted + if len(capabilities) == 0 { + return false + } + + return !capabilities.Check(PolicyDeny) +} + +// AllowHostVolumeOperation checks if a given operation is allowed for a host volume +func (a *ACL) AllowHostVolumeOperation(hv string, op string) bool { + // Hot path management tokens + if a.management { + return true + } + + // Check for a matching capability set + capabilities, ok := a.matchingHostVolumeCapabilitySet(hv) + if !ok { + return false + } + + // Check if the capability has been granted + return capabilities.Check(op) +} + +// AllowHostVolume checks if any operations are allowed for a HostVolume +func (a *ACL) AllowHostVolume(ns string) bool { + // Hot path management tokens + if a.management { + return true + } + + // Check for a matching capability set + capabilities, ok := a.matchingHostVolumeCapabilitySet(ns) if !ok { return false } @@ -192,12 +285,12 @@ func (a *ACL) AllowNamespace(ns string) bool { return !capabilities.Check(PolicyDeny) } -// matchingCapabilitySet looks for a capabilitySet that matches the namespace, +// matchingNamespaceCapabilitySet looks for a capabilitySet that matches the namespace, // if no concrete definitions are found, then we return the closest matching // glob. // The closest matching glob is the one that has the smallest character // difference between the namespace and the glob. 
-func (a *ACL) matchingCapabilitySet(ns string) (capabilitySet, bool) { +func (a *ACL) matchingNamespaceCapabilitySet(ns string) (capabilitySet, bool) { // Check for a concrete matching capability set raw, ok := a.namespaces.Get([]byte(ns)) if ok { @@ -205,18 +298,34 @@ func (a *ACL) matchingCapabilitySet(ns string) (capabilitySet, bool) { } // We didn't find a concrete match, so lets try and evaluate globs. - return a.findClosestMatchingGlob(ns) + return a.findClosestMatchingGlob(a.wildcardNamespaces, ns) +} + +// matchingHostVolumeCapabilitySet looks for a capabilitySet that matches the host volume name, +// if no concrete definitions are found, then we return the closest matching +// glob. +// The closest matching glob is the one that has the smallest character +// difference between the volume name and the glob. +func (a *ACL) matchingHostVolumeCapabilitySet(name string) (capabilitySet, bool) { + // Check for a concrete matching capability set + raw, ok := a.hostVolumes.Get([]byte(name)) + if ok { + return raw.(capabilitySet), true + } + + // We didn't find a concrete match, so lets try and evaluate globs. + return a.findClosestMatchingGlob(a.wildcardHostVolumes, name) } type matchingGlob struct { - ns string + name string difference int capabilitySet capabilitySet } -func (a *ACL) findClosestMatchingGlob(ns string) (capabilitySet, bool) { +func (a *ACL) findClosestMatchingGlob(radix *iradix.Tree, ns string) (capabilitySet, bool) { // First, find all globs that match. - matchingGlobs := a.findAllMatchingWildcards(ns) + matchingGlobs := findAllMatchingWildcards(radix, ns) // If none match, let's return. if len(matchingGlobs) == 0 { @@ -238,19 +347,19 @@ func (a *ACL) findClosestMatchingGlob(ns string) (capabilitySet, bool) { return matchingGlobs[0].capabilitySet, true } -func (a *ACL) findAllMatchingWildcards(ns string) []matchingGlob { +func findAllMatchingWildcards(radix *iradix.Tree, name string) []matchingGlob { var matches []matchingGlob - nsLen := len(ns) + nsLen := len(name) - a.wildcardNamespaces.Root().Walk(func(bk []byte, iv interface{}) bool { + radix.Root().Walk(func(bk []byte, iv interface{}) bool { k := string(bk) v := iv.(capabilitySet) - isMatch := glob.Glob(k, ns) + isMatch := glob.Glob(k, name) if isMatch { pair := matchingGlob{ - ns: k, + name: k, difference: nsLen - len(k) + strings.Count(k, glob.GLOB), capabilitySet: v, } diff --git a/acl/acl_test.go b/acl/acl_test.go index 52320823397..b819bc8afce 100644 --- a/acl/acl_test.go +++ b/acl/acl_test.go @@ -314,6 +314,56 @@ func TestWildcardNamespaceMatching(t *testing.T) { } } +func TestWildcardHostVolumeMatching(t *testing.T) { + tests := []struct { + Policy string + Allow bool + }{ + { // Wildcard matches + Policy: `host_volume "prod-api-*" { policy = "write" }`, + Allow: true, + }, + { // Non globbed volumes are not wildcards + Policy: `host_volume "prod-api" { policy = "write" }`, + Allow: false, + }, + { // Concrete matches take precedence + Policy: `host_volume "prod-api-services" { policy = "deny" } + host_volume "prod-api-*" { policy = "write" }`, + Allow: false, + }, + { + Policy: `host_volume "prod-api-*" { policy = "deny" } + host_volume "prod-api-services" { policy = "write" }`, + Allow: true, + }, + { // The closest character match wins + Policy: `host_volume "*-api-services" { policy = "deny" } + host_volume "prod-api-*" { policy = "write" }`, // 4 vs 8 chars + Allow: false, + }, + { + Policy: `host_volume "prod-api-*" { policy = "write" } + host_volume "*-api-services" { policy = "deny" }`, // 4 vs 8 
chars + Allow: false, + }, + } + + for _, tc := range tests { + t.Run(tc.Policy, func(t *testing.T) { + assert := assert.New(t) + + policy, err := Parse(tc.Policy) + assert.NoError(err) + assert.NotNil(policy.HostVolumes) + + acl, err := NewACL(false, []*Policy{policy}) + assert.Nil(err) + + assert.Equal(tc.Allow, acl.AllowHostVolume("prod-api-services")) + }) + } +} func TestACL_matchingCapabilitySet_returnsAllMatches(t *testing.T) { tests := []struct { Policy string @@ -351,8 +401,8 @@ func TestACL_matchingCapabilitySet_returnsAllMatches(t *testing.T) { assert.Nil(err) var namespaces []string - for _, cs := range acl.findAllMatchingWildcards(tc.NS) { - namespaces = append(namespaces, cs.ns) + for _, cs := range findAllMatchingWildcards(acl.wildcardNamespaces, tc.NS) { + namespaces = append(namespaces, cs.name) } assert.Equal(tc.MatchingGlobs, namespaces) @@ -404,7 +454,7 @@ func TestACL_matchingCapabilitySet_difference(t *testing.T) { acl, err := NewACL(false, []*Policy{policy}) assert.Nil(err) - matches := acl.findAllMatchingWildcards(tc.NS) + matches := findAllMatchingWildcards(acl.wildcardNamespaces, tc.NS) assert.Equal(tc.Difference, matches[0].difference) }) } diff --git a/acl/policy.go b/acl/policy.go index b6efaa642ca..3743a74c8e0 100644 --- a/acl/policy.go +++ b/acl/policy.go @@ -21,6 +21,7 @@ const ( // The Policy stanza is a short hand for granting several of these. When capabilities are // combined we take the union of all capabilities. If the deny capability is present, it // takes precedence and overwrites all other capabilities. + NamespaceCapabilityDeny = "deny" NamespaceCapabilityListJobs = "list-jobs" NamespaceCapabilityReadJob = "read-job" @@ -38,20 +39,36 @@ var ( validNamespace = regexp.MustCompile("^[a-zA-Z0-9-*]{1,128}$") ) +const ( + // The following are the fine-grained capabilities that can be granted for a volume set. + // The Policy stanza is a short hand for granting several of these. When capabilities are + // combined we take the union of all capabilities. If the deny capability is present, it + // takes precedence and overwrites all other capabilities. + + HostVolumeCapabilityDeny = "deny" + HostVolumeCapabilityMount = "mount" +) + +var ( + validVolume = regexp.MustCompile("^[a-zA-Z0-9-*]{1,128}$") +) + // Policy represents a parsed HCL or JSON policy. type Policy struct { - Namespaces []*NamespacePolicy `hcl:"namespace,expand"` - Agent *AgentPolicy `hcl:"agent"` - Node *NodePolicy `hcl:"node"` - Operator *OperatorPolicy `hcl:"operator"` - Quota *QuotaPolicy `hcl:"quota"` - Raw string `hcl:"-"` + Namespaces []*NamespacePolicy `hcl:"namespace,expand"` + HostVolumes []*HostVolumePolicy `hcl:"host_volume,expand"` + Agent *AgentPolicy `hcl:"agent"` + Node *NodePolicy `hcl:"node"` + Operator *OperatorPolicy `hcl:"operator"` + Quota *QuotaPolicy `hcl:"quota"` + Raw string `hcl:"-"` } // IsEmpty checks to make sure that at least one policy has been set and is not // comprised of only a raw policy. 
func (p *Policy) IsEmpty() bool { return len(p.Namespaces) == 0 && + len(p.HostVolumes) == 0 && p.Agent == nil && p.Node == nil && p.Operator == nil && @@ -65,6 +82,13 @@ type NamespacePolicy struct { Capabilities []string } +// HostVolumePolicy is the policy for a specific named host volume +type HostVolumePolicy struct { + Name string `hcl:",key"` + Policy string + Capabilities []string +} + type AgentPolicy struct { Policy string } @@ -134,6 +158,28 @@ func expandNamespacePolicy(policy string) []string { } } +func isHostVolumeCapabilityValid(cap string) bool { + switch cap { + case HostVolumeCapabilityDeny, HostVolumeCapabilityMount: + return true + default: + return false + } +} + +func expandHostVolumePolicy(policy string) []string { + switch policy { + case PolicyDeny: + return []string{HostVolumeCapabilityDeny} + case PolicyRead: + return []string{HostVolumeCapabilityDeny} + case PolicyWrite: + return []string{HostVolumeCapabilityMount} + default: + return nil + } +} + // Parse is used to parse the specified ACL rules into an // intermediary set of policies, before being compiled into // the ACL @@ -178,6 +224,27 @@ func Parse(rules string) (*Policy, error) { } } + for _, hv := range p.HostVolumes { + if !validVolume.MatchString(hv.Name) { + return nil, fmt.Errorf("Invalid host volume name: %#v", hv) + } + if hv.Policy != "" && !isPolicyValid(hv.Policy) { + return nil, fmt.Errorf("Invalid host volume policy: %#v", hv) + } + for _, cap := range hv.Capabilities { + if !isHostVolumeCapabilityValid(cap) { + return nil, fmt.Errorf("Invalid host volume capability '%s': %#v", cap, hv) + } + } + + // Expand the short hand policy to the capabilities and + // add to any existing capabilities + if hv.Policy != "" { + extraCap := expandHostVolumePolicy(hv.Policy) + hv.Capabilities = append(hv.Capabilities, extraCap...) 
+ }
+ }
+
 if p.Agent != nil && !isPolicyValid(p.Agent.Policy) {
 return nil, fmt.Errorf("Invalid agent policy: %#v", p.Agent)
 }
diff --git a/acl/policy_test.go b/acl/policy_test.go
index 4665d0d4585..d5ef42e4be6 100644
--- a/acl/policy_test.go
+++ b/acl/policy_test.go
@@ -199,6 +199,34 @@ func TestParse(t *testing.T) {
 },
 },
 },
+ {
+ `
+ host_volume "production-tls-*" {
+ capabilities = ["mount"]
+ }
+ `,
+ "",
+ &Policy{
+ HostVolumes: []*HostVolumePolicy{
+ {
+ Name: "production-tls-*",
+ Policy: "",
+ Capabilities: []string{
+ HostVolumeCapabilityMount,
+ },
+ },
+ },
+ },
+ },
+ {
+ `
+ host_volume "volume has a space" {
+ capabilities = ["mount"]
+ }
+ `,
+ "Invalid host volume name",
+ nil,
+ },
 }

 for idx, tc := range tcases {

From 547a454b01e852fb3d9db80081d12eaea0526b1d Mon Sep 17 00:00:00 2001
From: Danielle Lancashire
Date: Thu, 25 Jul 2019 16:32:19 +0200
Subject: [PATCH 15/17] job_endpoint: Validate volume permissions

---
 nomad/job_endpoint.go      |  18 ++++
 nomad/job_endpoint_test.go | 118 ++++++++++++++++++++++++++++---------
 nomad/mock/acl.go          |  20 +++++++
 3 files changed, 129 insertions(+), 27 deletions(-)

diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go
index 88bbd478013..f01231adbd4 100644
--- a/nomad/job_endpoint.go
+++ b/nomad/job_endpoint.go
@@ -89,6 +89,24 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
 if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) {
 return structs.ErrPermissionDenied
 }
+ // Validate Volume Permissions
+ for _, tg := range args.Job.TaskGroups {
+ for _, vol := range tg.Volumes {
+ if vol.Type != structs.VolumeTypeHost {
+ return structs.ErrPermissionDenied
+ }
+
+ cfg, err := structs.ParseHostVolumeConfig(vol.Config)
+ if err != nil {
+ return structs.ErrPermissionDenied
+ }
+
+ if !aclObj.AllowHostVolumeOperation(cfg.Source, acl.HostVolumeCapabilityMount) {
+ return structs.ErrPermissionDenied
+ }
+ }
+ }
+
 // Check if override is set and we do not have permissions
 if args.PolicyOverride {
 if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySentinelOverride) {
diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go
index c98084587cd..a9bfd9ae26b 100644
--- a/nomad/job_endpoint_test.go
+++ b/nomad/job_endpoint_test.go
@@ -106,44 +106,108 @@ func TestJobEndpoint_Register(t *testing.T) {

 func TestJobEndpoint_Register_ACL(t *testing.T) {
 t.Parallel()
+
 s1, root := TestACLServer(t, func(c *Config) {
 c.NumSchedulers = 0 // Prevent automatic dequeue
 })
 defer s1.Shutdown()
- codec := rpcClient(t, s1)
 testutil.WaitForLeader(t, s1.RPC)

- // Create the register request
- job := mock.Job()
- req := &structs.JobRegisterRequest{
- Job: job,
- WriteRequest: structs.WriteRequest{Region: "global"},
- }
+ newVolumeJob := func() *structs.Job {
+ j := mock.Job()
+ tg := j.TaskGroups[0]
+ tg.Volumes = map[string]*structs.VolumeRequest{
+ "ca-certs": {
+ Type: structs.VolumeTypeHost,
+ Config: map[string]interface{}{
+ "source": "prod-ca-certs",
+ },
+ },
+ }

- // Try without a token, expect failure
- var resp structs.JobRegisterResponse
- if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err == nil {
- t.Fatalf("expected error")
- }
+ tg.Tasks[0].VolumeMounts = []*structs.VolumeMount{
+ {
+ Volume: "ca-certs",
+ Destination: "/etc/ca-certificates",
+ ReadOnly: true,
+ },
+ }

- // Try with a token
- req.AuthToken = root.SecretID
- if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
- t.Fatalf("err: %v", err)
- }
- if resp.Index == 0 {
- t.Fatalf("bad index: %d", resp.Index)
+ return j
 }

- // Check for the node in the FSM
- state := s1.fsm.State()
- ws := memdb.NewWatchSet()
- out, err := state.JobByID(ws, job.Namespace, job.ID)
- if err != nil {
- t.Fatalf("err: %v", err)
+ submitJobPolicy := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob, acl.NamespaceCapabilitySubmitJob})
+
+ submitJobToken := mock.CreatePolicyAndToken(t, s1.State(), 1001, "test-submit-job", submitJobPolicy)
+
+ volumesPolicy := mock.HostVolumePolicy("prod-*", "", []string{acl.HostVolumeCapabilityMount})
+
+ submitJobWithVolumesToken := mock.CreatePolicyAndToken(t, s1.State(), 1002, "test-submit-volumes", submitJobPolicy+"\n"+volumesPolicy)
+
+ cases := []struct {
+ Name string
+ Job *structs.Job
+ Token string
+ ErrExpected bool
+ }{
+ {
+ Name: "without a token",
+ Job: mock.Job(),
+ Token: "",
+ ErrExpected: true,
+ },
+ {
+ Name: "with a token",
+ Job: mock.Job(),
+ Token: root.SecretID,
+ ErrExpected: false,
+ },
+ {
+ Name: "with a token that can submit a job, but not use a required volume",
+ Job: newVolumeJob(),
+ Token: submitJobToken.SecretID,
+ ErrExpected: true,
+ },
+ {
+ Name: "with a token that can submit a job, and use all required volumes",
+ Job: newVolumeJob(),
+ Token: submitJobWithVolumesToken.SecretID,
+ ErrExpected: false,
+ },
 }
- if out == nil {
- t.Fatalf("expected job")
+
+ for _, tt := range cases {
+ t.Run(tt.Name, func(t *testing.T) {
+ codec := rpcClient(t, s1)
+ req := &structs.JobRegisterRequest{
+ Job: tt.Job,
+ WriteRequest: structs.WriteRequest{Region: "global"},
+ }
+ req.AuthToken = tt.Token
+
+ // Attempt to register the job with the case's token
+ var resp structs.JobRegisterResponse
+ err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
+
+ // If we expected an error, then the job should _not_ be registered.
+ if tt.ErrExpected {
+ require.Error(t, err, "expected error")
+ return
+ }
+
+ if !tt.ErrExpected {
+ require.NoError(t, err, "unexpected error")
+ }
+
+ require.NotEqual(t, 0, resp.Index)
+
+ state := s1.fsm.State()
+ ws := memdb.NewWatchSet()
+ out, err := state.JobByID(ws, tt.Job.Namespace, tt.Job.ID)
+ require.NoError(t, err)
+ require.NotNil(t, out)
+ require.Equal(t, tt.Job.TaskGroups, out.TaskGroups)
+ })
 }
 }
diff --git a/nomad/mock/acl.go b/nomad/mock/acl.go
index 3166db8cbb1..599bed4b548 100644
--- a/nomad/mock/acl.go
+++ b/nomad/mock/acl.go
@@ -38,6 +38,26 @@ func NamespacePolicy(namespace string, policy string, capabilities []string) str
 return policyHCL
 }

+// HostVolumePolicy is a helper for generating the policy hcl for a given
+// host-volume. Either policy or capabilities may be nil but not both.
+func HostVolumePolicy(vol string, policy string, capabilities []string) string {
+ policyHCL := fmt.Sprintf("host_volume %q {", vol)
+ if policy != "" {
+ policyHCL += fmt.Sprintf("\n\tpolicy = %q", policy)
+ }
+ if len(capabilities) != 0 {
+ for i, s := range capabilities {
+ if !strings.HasPrefix(s, "\"") {
+ capabilities[i] = strconv.Quote(s)
+ }
+ }
+
+ policyHCL += fmt.Sprintf("\n\tcapabilities = [%v]", strings.Join(capabilities, ","))
+ }
+ policyHCL += "\n}"
+ return policyHCL
+}
+
 // AgentPolicy is a helper for generating the hcl for a given agent policy.
func AgentPolicy(policy string) string { return fmt.Sprintf("agent {\n\tpolicy = %q\n}\n", policy) From e57cf06b0d70020866c8217b0effcb80d26cc2cb Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Mon, 12 Aug 2019 15:41:14 +0200 Subject: [PATCH 16/17] fixup rebase --- jobspec/parse_task.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/jobspec/parse_task.go b/jobspec/parse_task.go index 52def23be2e..7edccb43071 100644 --- a/jobspec/parse_task.go +++ b/jobspec/parse_task.go @@ -178,8 +178,7 @@ func parseTask(item *ast.ObjectItem) (*api.Task, error) { // Parse volume mounts if o := listVal.Filter("volume_mount"); len(o.Items) > 0 { if err := parseVolumeMounts(&t.VolumeMounts, o); err != nil { - return multierror.Prefix(err, fmt.Sprintf( - "'%s', volume_mount ->", n)) + return nil, multierror.Prefix(err, "volume_mount ->") } } From c486143ced1c8a5df87cba0c70558a0555cc464f Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Mon, 12 Aug 2019 16:22:27 +0200 Subject: [PATCH 17/17] Copy documentation to api/tasks --- api/tasks.go | 7 +++++-- client/config/config.go | 2 +- jobspec/parse_group.go | 8 ++++---- jobspec/parse_test.go | 2 +- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/api/tasks.go b/api/tasks.go index e044f9a9fbe..b3022766202 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -362,7 +362,8 @@ func (m *MigrateStrategy) Copy() *MigrateStrategy { return nm } -type Volume struct { +// VolumeRequest is a representation of a storage volume that a TaskGroup wishes to use. +type VolumeRequest struct { Name string Type string ReadOnly bool `mapstructure:"read_only"` @@ -371,6 +372,8 @@ type Volume struct { Config map[string]interface{} } +// VolumeMount represents the relationship between a destination path in a task +// and the task group volume that should be mounted there. type VolumeMount struct { Volume string Destination string @@ -385,7 +388,7 @@ type TaskGroup struct { Affinities []*Affinity Tasks []*Task Spreads []*Spread - Volumes map[string]*Volume + Volumes map[string]*VolumeRequest RestartPolicy *RestartPolicy ReschedulePolicy *ReschedulePolicy EphemeralDisk *EphemeralDisk diff --git a/client/config/config.go b/client/config/config.go index cb7e25726e6..d6e09fde999 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -235,7 +235,7 @@ type Config struct { // notation BridgeNetworkAllocSubnet string - // HostVolumes is the set of configured host volumes + // HostVolumes is a map of the configured host volumes by name. 
HostVolumes map[string]*structs.ClientHostVolumeConfig } diff --git a/jobspec/parse_group.go b/jobspec/parse_group.go index d7967780984..a835732de56 100644 --- a/jobspec/parse_group.go +++ b/jobspec/parse_group.go @@ -284,8 +284,8 @@ func parseRestartPolicy(final **api.RestartPolicy, list *ast.ObjectList) error { return nil } -func parseVolumes(out *map[string]*api.Volume, list *ast.ObjectList) error { - volumes := make(map[string]*api.Volume, len(list.Items)) +func parseVolumes(out *map[string]*api.VolumeRequest, list *ast.ObjectList) error { + volumes := make(map[string]*api.VolumeRequest, len(list.Items)) for _, item := range list.Items { n := item.Keys[0].Token.Value().(string) @@ -304,7 +304,7 @@ func parseVolumes(out *map[string]*api.Volume, list *ast.ObjectList) error { return err } - // TODO(dani): FIXME: this is gross but we don't have ObjectList.Filter here + // TODO(dani): this is gross but we don't have ObjectList.Filter here var cfg map[string]interface{} if cfgI, ok := m["config"]; ok { cfgL, ok := cfgI.([]map[string]interface{}) @@ -320,7 +320,7 @@ func parseVolumes(out *map[string]*api.Volume, list *ast.ObjectList) error { } delete(m, "config") - var result api.Volume + var result api.VolumeRequest dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ WeaklyTypedInput: true, Result: &result, diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index a1107e030cf..f9f009c0702 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -112,7 +112,7 @@ func TestParse(t *testing.T) { }, }, - Volumes: map[string]*api.Volume{ + Volumes: map[string]*api.VolumeRequest{ "foo": { Name: "foo", Type: "host",