Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial support for Host Volumes #5923

Merged
merged 10 commits into from
Aug 9, 2019
17 changes: 17 additions & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,21 @@ func (m *MigrateStrategy) Copy() *MigrateStrategy {
return nm
}

type Volume struct {
Name string
Type string
ReadOnly bool `mapstructure:"read_only"`
Hidden bool

Config map[string]interface{}
}

type VolumeMount struct {
Volume string
Destination string
ReadOnly bool `mapstructure:"read_only"`
}

// TaskGroup is the unit of scheduling.
type TaskGroup struct {
Name *string
Expand All @@ -508,6 +523,7 @@ type TaskGroup struct {
Affinities []*Affinity
Tasks []*Task
Spreads []*Spread
Volumes map[string]*Volume
RestartPolicy *RestartPolicy
ReschedulePolicy *ReschedulePolicy
EphemeralDisk *EphemeralDisk
Expand Down Expand Up @@ -715,6 +731,7 @@ type Task struct {
Vault *Vault
Templates []*Template
DispatchPayload *DispatchPayloadConfig
VolumeMounts []*VolumeMount
Leader bool
ShutdownDelay time.Duration `mapstructure:"shutdown_delay"`
KillSignal string `mapstructure:"kill_signal"`
Expand Down
1 change: 1 addition & 0 deletions client/allocrunner/taskrunner/task_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (tr *TaskRunner) initHooks() {
newTaskDirHook(tr, hookLogger),
newLogMonHook(tr.logmonHookConfig, hookLogger),
newDispatchHook(tr.Alloc(), hookLogger),
newVolumeHook(tr, hookLogger),
newArtifactHook(tr, hookLogger),
newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger),
newDeviceHook(tr.devicemanager, hookLogger),
Expand Down
127 changes: 127 additions & 0 deletions client/allocrunner/taskrunner/volume_hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package taskrunner

import (
"context"
"fmt"

log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)

type volumeHook struct {
alloc *structs.Allocation
runner *TaskRunner
logger log.Logger
}

func newVolumeHook(runner *TaskRunner, logger log.Logger) *volumeHook {
h := &volumeHook{
alloc: runner.Alloc(),
runner: runner,
}
h.logger = logger.Named(h.Name())
return h
}

func (*volumeHook) Name() string {
return "volumes"
}

func validateHostVolumes(requestedByAlias map[string]*structs.VolumeRequest, clientVolumesByName map[string]*structs.ClientHostVolumeConfig) error {
var result error

for n, req := range requestedByAlias {
if req.Type != structs.VolumeTypeHost {
continue
}

cfg, err := structs.ParseHostVolumeConfig(req.Config)
if err != nil {
result = multierror.Append(result, fmt.Errorf("failed to parse config for %s: %v", n, err))
endocrimes marked this conversation as resolved.
Show resolved Hide resolved
continue
}

_, ok := clientVolumesByName[cfg.Source]
if !ok {
result = multierror.Append(result, fmt.Errorf("missing %s", cfg.Source))
}
}

return result
}

// hostVolumeMountConfigurations takes the users requested volume mounts,
// volumes, and the client host volume configuration and converts them into a
// format that can be used by drivers.
func (h *volumeHook) hostVolumeMountConfigurations(taskMounts []*structs.VolumeMount, taskVolumesByAlias map[string]*structs.VolumeRequest, clientVolumesByName map[string]*structs.ClientHostVolumeConfig) ([]*drivers.MountConfig, error) {
var mounts []*drivers.MountConfig
for _, m := range taskMounts {
req, ok := taskVolumesByAlias[m.Volume]
if !ok {
// Should never happen unless we misvalidated on job submission
return nil, fmt.Errorf("No group volume declaration found named: %s", m.Volume)
}

cfg, err := structs.ParseHostVolumeConfig(req.Config)
if err != nil {
return nil, fmt.Errorf("failed to parse config for %s: %v", m.Volume, err)
}

hostVolume, ok := clientVolumesByName[cfg.Source]
if !ok {
// Should never happen, but unless the client volumes were mutated during
// the execution of this hook.
return nil, fmt.Errorf("No host volume named: %s", cfg.Source)
}

mcfg := &drivers.MountConfig{
HostPath: hostVolume.Path,
TaskPath: m.Destination,
Readonly: hostVolume.ReadOnly || req.ReadOnly || m.ReadOnly,
}
mounts = append(mounts, mcfg)
}

return mounts, nil
}

func (h *volumeHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
volumes := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup).Volumes
mounts := h.runner.hookResources.getMounts()
endocrimes marked this conversation as resolved.
Show resolved Hide resolved

hostVolumes := h.runner.clientConfig.Node.HostVolumes

// Always validate volumes to ensure that we do not allow volumes to be used
// if a host is restarted and loses the host volume configuration.
if err := validateHostVolumes(volumes, hostVolumes); err != nil {
h.logger.Error("Requested Host Volume does not exist", "existing", hostVolumes, "requested", volumes)
return fmt.Errorf("host volume validation error: %v", err)
}

requestedMounts, err := h.hostVolumeMountConfigurations(req.Task.VolumeMounts, volumes, hostVolumes)
if err != nil {
h.logger.Error("Failed to generate volume mounts", "error", err)
return err
}

// Because this hook is also ran on restores, we only add mounts that do not
// already exist. Although this loop is somewhat expensive, there are only
// a small number of mounts that exist within most individual tasks. We may
// want to revisit this using a `hookdata` param to be "mount only once"
REQUESTED:
for _, m := range requestedMounts {
for _, em := range mounts {
if em.IsEqual(m) {
continue REQUESTED
}
}

mounts = append(mounts, m)
}

h.runner.hookResources.setMounts(mounts)
endocrimes marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
10 changes: 10 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,16 @@ func (c *Client) setupNode() error {
if node.Name == "" {
node.Name, _ = os.Hostname()
}
// TODO(dani): Fingerprint these to handle volumes that don't exist/have bad perms.
endocrimes marked this conversation as resolved.
Show resolved Hide resolved
if node.HostVolumes == nil {
if l := len(c.config.HostVolumes); l != 0 {
node.HostVolumes = make(map[string]*structs.ClientHostVolumeConfig, l)
for k, v := range c.config.HostVolumes {
node.HostVolumes[k] = v.Copy()
}
}
}

if node.Name == "" {
node.Name = node.ID
}
Expand Down
4 changes: 4 additions & 0 deletions client/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ type Config struct {
// AutoFetchCNIDir is the destination dir to use when auto doanloading CNI plugins.
// This directory will be appended to the CNIPath so it is searched last
AutoFetchCNIDir string

// HostVolumes is the set of configured host volumes
HostVolumes map[string]*structs.ClientHostVolumeConfig
}

func (c *Config) Copy() *Config {
Expand All @@ -254,6 +257,7 @@ func (c *Config) Copy() *Config {
nc.Node = nc.Node.Copy()
nc.Servers = helper.CopySliceString(nc.Servers)
nc.Options = helper.CopyMapStringString(nc.Options)
nc.HostVolumes = structs.CopyMapStringClientHostVolumeConfig(nc.HostVolumes)
nc.ConsulConfig = c.ConsulConfig.Copy()
nc.VaultConfig = c.VaultConfig.Copy()
return nc
Expand Down
6 changes: 6 additions & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,12 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) {
conf.ClientMinPort = uint(agentConfig.Client.ClientMinPort)
conf.DisableRemoteExec = agentConfig.Client.DisableRemoteExec

hvMap := make(map[string]*structs.ClientHostVolumeConfig, len(agentConfig.Client.HostVolumes))
for _, v := range agentConfig.Client.HostVolumes {
hvMap[v.Name] = v
}
conf.HostVolumes = hvMap

// Setup the node
conf.Node = new(structs.Node)
conf.Node.Datacenter = agentConfig.Datacenter
Expand Down
10 changes: 10 additions & 0 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ type ClientConfig struct {
// ServerJoin contains information that is used to attempt to join servers
ServerJoin *ServerJoin `hcl:"server_join"`

// HostVolumes contains information about the volumes an operator has made
// available to jobs running on this node.
HostVolumes []*structs.ClientHostVolumeConfig `hcl:"host_volume"`

// ExtraKeysHCL is used by hcl to surface unexpected keys
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`

Expand Down Expand Up @@ -1333,6 +1337,12 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig {
result.ServerJoin = result.ServerJoin.Merge(b.ServerJoin)
}

if len(a.HostVolumes) == 0 && len(b.HostVolumes) != 0 {
endocrimes marked this conversation as resolved.
Show resolved Hide resolved
result.HostVolumes = structs.CopySliceClientHostVolumeConfig(b.HostVolumes)
} else if len(b.HostVolumes) != 0 {
result.HostVolumes = structs.HostVolumeSliceMerge(a.HostVolumes, b.HostVolumes)
}

return &result
}

Expand Down
5 changes: 5 additions & 0 deletions command/agent/config_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ func extraKeys(c *Config) error {
// stats is an unused key, continue to silently ignore it
removeEqualFold(&c.Client.ExtraKeysHCL, "stats")

// Remove HostVolume extra keys
for _, hv := range c.Client.HostVolumes {
removeEqualFold(&c.Client.ExtraKeysHCL, hv.Name)
}

for _, k := range []string{"enabled_schedulers", "start_join", "retry_join", "server_join"} {
removeEqualFold(&c.ExtraKeysHCL, k)
removeEqualFold(&c.ExtraKeysHCL, "server")
Expand Down
4 changes: 4 additions & 0 deletions command/agent/config_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -81,6 +82,9 @@ var basicConfig = &Config{
GCMaxAllocs: 50,
NoHostUUID: helper.BoolToPtr(false),
DisableRemoteExec: true,
HostVolumes: []*structs.ClientHostVolumeConfig{
{Name: "tmp", Path: "/tmp"},
},
},
Server: &ServerConfig{
Enabled: true,
Expand Down
32 changes: 32 additions & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func (s *HTTPServer) ValidateJobRequest(resp http.ResponseWriter, req *http.Requ
}

job := ApiJobToStructJob(validateRequest.Job)

args := structs.JobValidateRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Expand Down Expand Up @@ -728,6 +729,26 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
}
}

if l := len(taskGroup.Volumes); l != 0 {
tg.Volumes = make(map[string]*structs.VolumeRequest, l)
for k, v := range taskGroup.Volumes {
if v.Type != structs.VolumeTypeHost {
// Ignore non-host volumes in this iteration currently.
continue
}

vol := &structs.VolumeRequest{
Name: v.Name,
Type: v.Type,
ReadOnly: v.ReadOnly,
Hidden: v.Hidden,
Config: v.Config,
}

tg.Volumes[k] = vol
}
}

if taskGroup.Update != nil {
tg.Update = &structs.UpdateStrategy{
Stagger: *taskGroup.Update.Stagger,
Expand Down Expand Up @@ -775,6 +796,17 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
structsTask.Constraints = ApiConstraintsToStructs(apiTask.Constraints)
structsTask.Affinities = ApiAffinitiesToStructs(apiTask.Affinities)

if l := len(apiTask.VolumeMounts); l != 0 {
structsTask.VolumeMounts = make([]*structs.VolumeMount, l)
for i, mount := range apiTask.VolumeMounts {
structsTask.VolumeMounts[i] = &structs.VolumeMount{
Volume: mount.Volume,
Destination: mount.Destination,
ReadOnly: mount.ReadOnly,
}
}
}

if l := len(apiTask.Services); l != 0 {
structsTask.Services = make([]*structs.Service, l)
for i, service := range apiTask.Services {
Expand Down
4 changes: 4 additions & 0 deletions command/agent/testdata/basic.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ client {
gc_max_allocs = 50
no_host_uuid = false
disable_remote_exec = true

host_volume "tmp" {
path = "/tmp"
}
}

server {
Expand Down
Loading