diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 8a58fa80180..80224b0ecc0 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -128,9 +128,9 @@ type allocRunner struct { // transitions. runnerHooks []interfaces.RunnerHook - // hookState is the output of allocrunner hooks - hookState *cstructs.AllocHookResources - hookStateMu sync.RWMutex + // hookResources holds the output from allocrunner hooks so that later + // allocrunner hooks or task runner hooks can read them + hookResources *cstructs.AllocHookResources // tasks are the set of task runners tasks map[string]*taskrunner.TaskRunner @@ -238,6 +238,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { serviceRegWrapper: config.ServiceRegWrapper, checkStore: config.CheckStore, getter: config.Getter, + hookResources: cstructs.NewAllocHookResources(), } // Create the logger based on the allocation ID @@ -293,6 +294,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error { ShutdownDelayCtx: ar.shutdownDelayCtx, ServiceRegWrapper: ar.serviceRegWrapper, Getter: ar.getter, + AllocHookResources: ar.hookResources, } if ar.cpusetManager != nil { diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index 1f68c6e8f78..5e59c659e40 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -7,40 +7,10 @@ import ( multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocrunner/interfaces" clientconfig "github.com/hashicorp/nomad/client/config" - cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/nomad/structs" ) -type hookResourceSetter interface { - GetAllocHookResources() *cstructs.AllocHookResources - SetAllocHookResources(*cstructs.AllocHookResources) -} - -type allocHookResourceSetter struct { - ar *allocRunner -} - -func (a *allocHookResourceSetter) GetAllocHookResources() *cstructs.AllocHookResources { - a.ar.hookStateMu.RLock() - defer a.ar.hookStateMu.RUnlock() - - return a.ar.hookState -} - -func (a *allocHookResourceSetter) SetAllocHookResources(res *cstructs.AllocHookResources) { - a.ar.hookStateMu.Lock() - defer a.ar.hookStateMu.Unlock() - - a.ar.hookState = res - - // Propagate to all of the TRs within the lock to ensure consistent state. - // TODO: Refactor so TR's pull state from AR? - for _, tr := range a.ar.tasks { - tr.SetAllocHookResources(res) - } -} - // allocHealthSetter is a shim to allow the alloc health watcher hook to set // and clear the alloc health without full access to the alloc runner state type allocHealthSetter struct { @@ -117,10 +87,6 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { // create network isolation setting shim ns := &allocNetworkIsolationSetter{ar: ar} - // create hook resource setting shim - hrs := &allocHookResourceSetter{ar: ar} - hrs.SetAllocHookResources(&cstructs.AllocHookResources{}) - // build the network manager nm, err := newNetworkManager(ar.Alloc(), ar.driverManager) if err != nil { @@ -166,7 +132,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { }), newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig, config.Node.Attributes), newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig), - newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, hrs, ar.clientConfig.Node.SecretID), + newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, ar.hookResources, ar.clientConfig.Node.SecretID), newChecksHook(hookLogger, alloc, ar.checkStore, ar), } diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go index 0a09b469595..e9af3ad1b42 100644 --- a/client/allocrunner/csi_hook.go +++ b/client/allocrunner/csi_hook.go @@ -9,7 +9,9 @@ import ( hclog "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/client/pluginmanager/csimanager" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" @@ -27,7 +29,7 @@ type csiHook struct { // interfaces implemented by the allocRunner rpcClient RPCer taskCapabilityGetter taskCapabilityGetter - updater hookResourceSetter + hookResources *cstructs.AllocHookResources nodeSecret string volumeRequests map[string]*volumeAndRequest @@ -44,7 +46,7 @@ type taskCapabilityGetter interface { GetTaskDriverCapabilities(string) (*drivers.Capabilities, error) } -func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.Manager, rpcClient RPCer, taskCapabilityGetter taskCapabilityGetter, updater hookResourceSetter, nodeSecret string) *csiHook { +func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.Manager, rpcClient RPCer, taskCapabilityGetter taskCapabilityGetter, hookResources *cstructs.AllocHookResources, nodeSecret string) *csiHook { shutdownCtx, shutdownCancelFn := context.WithCancel(context.Background()) @@ -54,7 +56,7 @@ func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.M csimanager: csi, rpcClient: rpcClient, taskCapabilityGetter: taskCapabilityGetter, - updater: updater, + hookResources: hookResources, nodeSecret: nodeSecret, volumeRequests: map[string]*volumeAndRequest{}, minBackoffInterval: time.Second, @@ -108,9 +110,8 @@ func (c *csiHook) Prerun() error { mounts[alias] = mountInfo } - res := c.updater.GetAllocHookResources() - res.CSIMounts = mounts - c.updater.SetAllocHookResources(res) + // make the mounts available to the taskrunner's volume_hook + c.hookResources.SetCSIMounts(mounts) return nil } diff --git a/client/allocrunner/csi_hook_test.go b/client/allocrunner/csi_hook_test.go index bb5362b9537..3aa6f6c07d1 100644 --- a/client/allocrunner/csi_hook_test.go +++ b/client/allocrunner/csi_hook_test.go @@ -203,7 +203,7 @@ func TestCSIHook(t *testing.T) { MountConfigs: drivers.MountConfigSupportAll, }, } - hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret") + hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar.res, "secret") hook.minBackoffInterval = 1 * time.Millisecond hook.maxBackoffInterval = 10 * time.Millisecond hook.maxBackoffDuration = 500 * time.Millisecond @@ -212,11 +212,11 @@ func TestCSIHook(t *testing.T) { if tc.expectedClaimErr != nil { require.EqualError(t, hook.Prerun(), tc.expectedClaimErr.Error()) - mounts := ar.GetAllocHookResources().GetCSIMounts() + mounts := ar.res.GetCSIMounts() require.Nil(t, mounts) } else { require.NoError(t, hook.Prerun()) - mounts := ar.GetAllocHookResources().GetCSIMounts() + mounts := ar.res.GetCSIMounts() require.NotNil(t, mounts) require.Equal(t, tc.expectedMounts, mounts) require.NoError(t, hook.Postrun()) @@ -308,16 +308,16 @@ func TestCSIHook_claimVolumesFromAlloc_Validation(t *testing.T) { capFunc: tc.capFunc, } - hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret") + hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar.res, "secret") require.NotNil(t, hook) if tc.expectedClaimErr != nil { require.EqualError(t, hook.Prerun(), tc.expectedClaimErr.Error()) - mounts := ar.GetAllocHookResources().GetCSIMounts() + mounts := ar.res.GetCSIMounts() require.Nil(t, mounts) } else { require.NoError(t, hook.Prerun()) - mounts := ar.GetAllocHookResources().GetCSIMounts() + mounts := ar.res.GetCSIMounts() require.NotNil(t, mounts) require.NoError(t, hook.Postrun()) } @@ -431,14 +431,6 @@ type mockAllocRunner struct { capFunc func() (*drivers.Capabilities, error) } -func (ar mockAllocRunner) GetAllocHookResources() *cstructs.AllocHookResources { - return ar.res -} - -func (ar mockAllocRunner) SetAllocHookResources(res *cstructs.AllocHookResources) { - ar.res = res -} - func (ar mockAllocRunner) GetTaskDriverCapabilities(taskName string) (*drivers.Capabilities, error) { if ar.capFunc != nil { return ar.capFunc() diff --git a/client/allocrunner/interfaces/runner.go b/client/allocrunner/interfaces/runner.go index 4eb1fbe3c8a..9fe9ed28457 100644 --- a/client/allocrunner/interfaces/runner.go +++ b/client/allocrunner/interfaces/runner.go @@ -2,6 +2,7 @@ package interfaces import ( "github.com/hashicorp/nomad/client/allocrunner/state" + "github.com/hashicorp/nomad/client/pluginmanager/csimanager" cstructs "github.com/hashicorp/nomad/client/structs" ) @@ -32,3 +33,9 @@ type TaskStateHandler interface { type AllocStatsReporter interface { LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) } + +// HookResourceSetter is used to communicate between alloc hooks and task hooks +type HookResourceSetter interface { + SetCSIMounts(map[string]*csimanager.MountInfo) + GetCSIMounts(map[string]*csimanager.MountInfo) +} diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 12fa88ed6bf..52402af0ca9 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -175,6 +175,9 @@ type TaskRunner struct { // hookResources captures the resources provided by hooks hookResources *hookResources + // allocHookResources captures the resources provided by the allocrunner hooks + allocHookResources *cstructs.AllocHookResources + // consulClient is the client used by the consul service hook for // registering services and checks consulServiceClient serviceregistration.Handler @@ -253,8 +256,6 @@ type TaskRunner struct { networkIsolationLock sync.Mutex networkIsolationSpec *drivers.NetworkIsolationSpec - allocHookResources *cstructs.AllocHookResources - // serviceRegWrapper is the handler wrapper that is used by service hooks // to perform service and check registration and deregistration. serviceRegWrapper *wrapper.HandlerWrapper @@ -329,6 +330,10 @@ type Config struct { // Getter is an interface for retrieving artifacts. Getter cinterfaces.ArtifactGetter + + // AllocHookResources is how taskrunner hooks can get state written by + // allocrunner hooks + AllocHookResources *cstructs.AllocHookResources } func NewTaskRunner(config *Config) (*TaskRunner, error) { @@ -368,6 +373,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { vaultClient: config.Vault, state: tstate, localState: state.NewLocalState(), + allocHookResources: config.AllocHookResources, stateDB: config.StateDB, stateUpdater: config.StateUpdater, deviceStatsReporter: config.DeviceStatsReporter, @@ -1576,10 +1582,6 @@ func (tr *TaskRunner) DriverCapabilities() (*drivers.Capabilities, error) { return tr.driver.Capabilities() } -func (tr *TaskRunner) SetAllocHookResources(res *cstructs.AllocHookResources) { - tr.allocHookResources = res -} - // shutdownDelayCancel is used for testing only and cancels the // shutdownDelayCtx func (tr *TaskRunner) shutdownDelayCancel() { diff --git a/client/allocrunner/taskrunner/volume_hook_test.go b/client/allocrunner/taskrunner/volume_hook_test.go index 0bfff5edb90..7b4d27ca0ae 100644 --- a/client/allocrunner/taskrunner/volume_hook_test.go +++ b/client/allocrunner/taskrunner/volume_hook_test.go @@ -132,16 +132,15 @@ func TestVolumeHook_prepareCSIVolumes(t *testing.T) { t.Run(tc.Name, func(t *testing.T) { tr := &TaskRunner{ - task: req.Task, - driver: tc.Driver, - allocHookResources: &cstructs.AllocHookResources{ - CSIMounts: map[string]*csimanager.MountInfo{ - "foo": { - Source: "/mnt/my-test-volume", - }, - }, - }, + task: req.Task, + driver: tc.Driver, + allocHookResources: cstructs.NewAllocHookResources(), } + tr.allocHookResources.SetCSIMounts(map[string]*csimanager.MountInfo{ + "foo": { + Source: "/mnt/my-test-volume", + }, + }) hook := &volumeHook{ logger: testlog.HCLogger(t), diff --git a/client/pluginmanager/csimanager/interface.go b/client/pluginmanager/csimanager/interface.go index 9f7e067c89e..0e4a1f03a11 100644 --- a/client/pluginmanager/csimanager/interface.go +++ b/client/pluginmanager/csimanager/interface.go @@ -13,6 +13,16 @@ type MountInfo struct { IsDevice bool } +func (mi *MountInfo) Copy() *MountInfo { + if mi == nil { + return nil + } + + nmi := new(MountInfo) + *nmi = *mi + return nmi +} + type UsageOptions struct { ReadOnly bool AttachmentMode structs.CSIVolumeAttachmentMode diff --git a/client/structs/allochook.go b/client/structs/allochook.go index 59c56c0f7e8..3a694d85abc 100644 --- a/client/structs/allochook.go +++ b/client/structs/allochook.go @@ -4,26 +4,39 @@ import ( "sync" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" + "github.com/hashicorp/nomad/helper" ) // AllocHookResources contains data that is provided by AllocRunner Hooks for -// consumption by TaskRunners +// consumption by TaskRunners. This should be instantiated once in the +// AllocRunner and then only accessed via getters and setters that hold the +// lock. type AllocHookResources struct { - CSIMounts map[string]*csimanager.MountInfo + csiMounts map[string]*csimanager.MountInfo mu sync.RWMutex } +func NewAllocHookResources() *AllocHookResources { + return &AllocHookResources{ + csiMounts: map[string]*csimanager.MountInfo{}, + } +} + +// GetCSIMounts returns a copy of the CSI mount info previously written by the +// CSI allocrunner hook func (a *AllocHookResources) GetCSIMounts() map[string]*csimanager.MountInfo { a.mu.RLock() defer a.mu.RUnlock() - return a.CSIMounts + return helper.DeepCopyMap(a.csiMounts) } +// SetCSIMounts stores the CSI mount info for later use by the volume taskrunner +// hook func (a *AllocHookResources) SetCSIMounts(m map[string]*csimanager.MountInfo) { a.mu.Lock() defer a.mu.Unlock() - a.CSIMounts = m + a.csiMounts = m }