Skip to content

Commit

Permalink
CSI: set mounts in alloc hook resources atomically (#16722)
Browse files Browse the repository at this point in the history
The allocrunner has a facility for passing data written by allocrunner hooks to
taskrunner hooks. Currently the only consumers of this facility are the
allocrunner CSI hook (which writes data) and the taskrunner volume hook (which
reads that same data).

The allocrunner hook for CSI volumes doesn't set the alloc hook resources
atomically. Instead, it gets the current resources and then writes a new version
back. Because the CSI hook is currently the only writer and all readers happen
long afterwards, this should be safe but #16623 shows there's some sequence of
events during restore where this breaks down.

Refactor hook resources so that hook data is accessed via setters and getters
that hold the mutex.
  • Loading branch information
tgross authored Apr 3, 2023
1 parent db2a4e5 commit f3fc54a
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 77 deletions.
8 changes: 5 additions & 3 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ type allocRunner struct {
// transitions.
runnerHooks []interfaces.RunnerHook

// hookState is the output of allocrunner hooks
hookState *cstructs.AllocHookResources
hookStateMu sync.RWMutex
// hookResources holds the output from allocrunner hooks so that later
// allocrunner hooks or task runner hooks can read them
hookResources *cstructs.AllocHookResources

// tasks are the set of task runners
tasks map[string]*taskrunner.TaskRunner
Expand Down Expand Up @@ -238,6 +238,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
serviceRegWrapper: config.ServiceRegWrapper,
checkStore: config.CheckStore,
getter: config.Getter,
hookResources: cstructs.NewAllocHookResources(),
}

// Create the logger based on the allocation ID
Expand Down Expand Up @@ -293,6 +294,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
ShutdownDelayCtx: ar.shutdownDelayCtx,
ServiceRegWrapper: ar.serviceRegWrapper,
Getter: ar.getter,
AllocHookResources: ar.hookResources,
}

if ar.cpusetManager != nil {
Expand Down
36 changes: 1 addition & 35 deletions client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,10 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
clientconfig "github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/nomad/structs"
)

type hookResourceSetter interface {
GetAllocHookResources() *cstructs.AllocHookResources
SetAllocHookResources(*cstructs.AllocHookResources)
}

type allocHookResourceSetter struct {
ar *allocRunner
}

func (a *allocHookResourceSetter) GetAllocHookResources() *cstructs.AllocHookResources {
a.ar.hookStateMu.RLock()
defer a.ar.hookStateMu.RUnlock()

return a.ar.hookState
}

func (a *allocHookResourceSetter) SetAllocHookResources(res *cstructs.AllocHookResources) {
a.ar.hookStateMu.Lock()
defer a.ar.hookStateMu.Unlock()

a.ar.hookState = res

// Propagate to all of the TRs within the lock to ensure consistent state.
// TODO: Refactor so TR's pull state from AR?
for _, tr := range a.ar.tasks {
tr.SetAllocHookResources(res)
}
}

// allocHealthSetter is a shim to allow the alloc health watcher hook to set
// and clear the alloc health without full access to the alloc runner state
type allocHealthSetter struct {
Expand Down Expand Up @@ -117,10 +87,6 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
// create network isolation setting shim
ns := &allocNetworkIsolationSetter{ar: ar}

// create hook resource setting shim
hrs := &allocHookResourceSetter{ar: ar}
hrs.SetAllocHookResources(&cstructs.AllocHookResources{})

// build the network manager
nm, err := newNetworkManager(ar.Alloc(), ar.driverManager)
if err != nil {
Expand Down Expand Up @@ -166,7 +132,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
}),
newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig, config.Node.Attributes),
newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, hrs, ar.clientConfig.Node.SecretID),
newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, ar.hookResources, ar.clientConfig.Node.SecretID),
newChecksHook(hookLogger, alloc, ar.checkStore, ar),
}

Expand Down
13 changes: 7 additions & 6 deletions client/allocrunner/csi_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (

hclog "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"

"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
Expand All @@ -27,7 +29,7 @@ type csiHook struct {
// interfaces implemented by the allocRunner
rpcClient RPCer
taskCapabilityGetter taskCapabilityGetter
updater hookResourceSetter
hookResources *cstructs.AllocHookResources

nodeSecret string
volumeRequests map[string]*volumeAndRequest
Expand All @@ -44,7 +46,7 @@ type taskCapabilityGetter interface {
GetTaskDriverCapabilities(string) (*drivers.Capabilities, error)
}

func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.Manager, rpcClient RPCer, taskCapabilityGetter taskCapabilityGetter, updater hookResourceSetter, nodeSecret string) *csiHook {
func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.Manager, rpcClient RPCer, taskCapabilityGetter taskCapabilityGetter, hookResources *cstructs.AllocHookResources, nodeSecret string) *csiHook {

shutdownCtx, shutdownCancelFn := context.WithCancel(context.Background())

Expand All @@ -54,7 +56,7 @@ func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.M
csimanager: csi,
rpcClient: rpcClient,
taskCapabilityGetter: taskCapabilityGetter,
updater: updater,
hookResources: hookResources,
nodeSecret: nodeSecret,
volumeRequests: map[string]*volumeAndRequest{},
minBackoffInterval: time.Second,
Expand Down Expand Up @@ -108,9 +110,8 @@ func (c *csiHook) Prerun() error {
mounts[alias] = mountInfo
}

res := c.updater.GetAllocHookResources()
res.CSIMounts = mounts
c.updater.SetAllocHookResources(res)
// make the mounts available to the taskrunner's volume_hook
c.hookResources.SetCSIMounts(mounts)

return nil
}
Expand Down
20 changes: 6 additions & 14 deletions client/allocrunner/csi_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func TestCSIHook(t *testing.T) {
MountConfigs: drivers.MountConfigSupportAll,
},
}
hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret")
hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar.res, "secret")
hook.minBackoffInterval = 1 * time.Millisecond
hook.maxBackoffInterval = 10 * time.Millisecond
hook.maxBackoffDuration = 500 * time.Millisecond
Expand All @@ -212,11 +212,11 @@ func TestCSIHook(t *testing.T) {

if tc.expectedClaimErr != nil {
require.EqualError(t, hook.Prerun(), tc.expectedClaimErr.Error())
mounts := ar.GetAllocHookResources().GetCSIMounts()
mounts := ar.res.GetCSIMounts()
require.Nil(t, mounts)
} else {
require.NoError(t, hook.Prerun())
mounts := ar.GetAllocHookResources().GetCSIMounts()
mounts := ar.res.GetCSIMounts()
require.NotNil(t, mounts)
require.Equal(t, tc.expectedMounts, mounts)
require.NoError(t, hook.Postrun())
Expand Down Expand Up @@ -308,16 +308,16 @@ func TestCSIHook_claimVolumesFromAlloc_Validation(t *testing.T) {
capFunc: tc.capFunc,
}

hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret")
hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar.res, "secret")
require.NotNil(t, hook)

if tc.expectedClaimErr != nil {
require.EqualError(t, hook.Prerun(), tc.expectedClaimErr.Error())
mounts := ar.GetAllocHookResources().GetCSIMounts()
mounts := ar.res.GetCSIMounts()
require.Nil(t, mounts)
} else {
require.NoError(t, hook.Prerun())
mounts := ar.GetAllocHookResources().GetCSIMounts()
mounts := ar.res.GetCSIMounts()
require.NotNil(t, mounts)
require.NoError(t, hook.Postrun())
}
Expand Down Expand Up @@ -431,14 +431,6 @@ type mockAllocRunner struct {
capFunc func() (*drivers.Capabilities, error)
}

func (ar mockAllocRunner) GetAllocHookResources() *cstructs.AllocHookResources {
return ar.res
}

func (ar mockAllocRunner) SetAllocHookResources(res *cstructs.AllocHookResources) {
ar.res = res
}

func (ar mockAllocRunner) GetTaskDriverCapabilities(taskName string) (*drivers.Capabilities, error) {
if ar.capFunc != nil {
return ar.capFunc()
Expand Down
7 changes: 7 additions & 0 deletions client/allocrunner/interfaces/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package interfaces

import (
"github.com/hashicorp/nomad/client/allocrunner/state"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
cstructs "github.com/hashicorp/nomad/client/structs"
)

Expand Down Expand Up @@ -32,3 +33,9 @@ type TaskStateHandler interface {
type AllocStatsReporter interface {
LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error)
}

// HookResourceSetter is used to communicate between alloc hooks and task hooks
type HookResourceSetter interface {
SetCSIMounts(map[string]*csimanager.MountInfo)
GetCSIMounts(map[string]*csimanager.MountInfo)
}
14 changes: 8 additions & 6 deletions client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ type TaskRunner struct {
// hookResources captures the resources provided by hooks
hookResources *hookResources

// allocHookResources captures the resources provided by the allocrunner hooks
allocHookResources *cstructs.AllocHookResources

// consulClient is the client used by the consul service hook for
// registering services and checks
consulServiceClient serviceregistration.Handler
Expand Down Expand Up @@ -253,8 +256,6 @@ type TaskRunner struct {
networkIsolationLock sync.Mutex
networkIsolationSpec *drivers.NetworkIsolationSpec

allocHookResources *cstructs.AllocHookResources

// serviceRegWrapper is the handler wrapper that is used by service hooks
// to perform service and check registration and deregistration.
serviceRegWrapper *wrapper.HandlerWrapper
Expand Down Expand Up @@ -329,6 +330,10 @@ type Config struct {

// Getter is an interface for retrieving artifacts.
Getter cinterfaces.ArtifactGetter

// AllocHookResources is how taskrunner hooks can get state written by
// allocrunner hooks
AllocHookResources *cstructs.AllocHookResources
}

func NewTaskRunner(config *Config) (*TaskRunner, error) {
Expand Down Expand Up @@ -368,6 +373,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
vaultClient: config.Vault,
state: tstate,
localState: state.NewLocalState(),
allocHookResources: config.AllocHookResources,
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
deviceStatsReporter: config.DeviceStatsReporter,
Expand Down Expand Up @@ -1586,10 +1592,6 @@ func (tr *TaskRunner) DriverCapabilities() (*drivers.Capabilities, error) {
return tr.driver.Capabilities()
}

func (tr *TaskRunner) SetAllocHookResources(res *cstructs.AllocHookResources) {
tr.allocHookResources = res
}

// shutdownDelayCancel is used for testing only and cancels the
// shutdownDelayCtx
func (tr *TaskRunner) shutdownDelayCancel() {
Expand Down
17 changes: 8 additions & 9 deletions client/allocrunner/taskrunner/volume_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,15 @@ func TestVolumeHook_prepareCSIVolumes(t *testing.T) {
t.Run(tc.Name, func(t *testing.T) {

tr := &TaskRunner{
task: req.Task,
driver: tc.Driver,
allocHookResources: &cstructs.AllocHookResources{
CSIMounts: map[string]*csimanager.MountInfo{
"foo": {
Source: "/mnt/my-test-volume",
},
},
},
task: req.Task,
driver: tc.Driver,
allocHookResources: cstructs.NewAllocHookResources(),
}
tr.allocHookResources.SetCSIMounts(map[string]*csimanager.MountInfo{
"foo": {
Source: "/mnt/my-test-volume",
},
})

hook := &volumeHook{
logger: testlog.HCLogger(t),
Expand Down
10 changes: 10 additions & 0 deletions client/pluginmanager/csimanager/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ type MountInfo struct {
IsDevice bool
}

func (mi *MountInfo) Copy() *MountInfo {
if mi == nil {
return nil
}

nmi := new(MountInfo)
*nmi = *mi
return nmi
}

type UsageOptions struct {
ReadOnly bool
AttachmentMode structs.CSIVolumeAttachmentMode
Expand Down
21 changes: 17 additions & 4 deletions client/structs/allochook.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,39 @@ import (
"sync"

"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/helper"
)

// AllocHookResources contains data that is provided by AllocRunner Hooks for
// consumption by TaskRunners
// consumption by TaskRunners. This should be instantiated once in the
// AllocRunner and then only accessed via getters and setters that hold the
// lock.
type AllocHookResources struct {
CSIMounts map[string]*csimanager.MountInfo
csiMounts map[string]*csimanager.MountInfo

mu sync.RWMutex
}

func NewAllocHookResources() *AllocHookResources {
return &AllocHookResources{
csiMounts: map[string]*csimanager.MountInfo{},
}
}

// GetCSIMounts returns a copy of the CSI mount info previously written by the
// CSI allocrunner hook
func (a *AllocHookResources) GetCSIMounts() map[string]*csimanager.MountInfo {
a.mu.RLock()
defer a.mu.RUnlock()

return a.CSIMounts
return helper.DeepCopyMap(a.csiMounts)
}

// SetCSIMounts stores the CSI mount info for later use by the volume taskrunner
// hook
func (a *AllocHookResources) SetCSIMounts(m map[string]*csimanager.MountInfo) {
a.mu.Lock()
defer a.mu.Unlock()

a.CSIMounts = m
a.csiMounts = m
}

0 comments on commit f3fc54a

Please sign in to comment.