Skip to content

Commit

Permalink
Backport of CSI: set mounts in alloc hook resources atomically into r…
Browse files Browse the repository at this point in the history
…elease/1.4.x (#16773)

This pull request was automerged via backport-assistant
  • Loading branch information
hc-github-team-nomad-core authored Apr 3, 2023
1 parent cf1adb0 commit b169e8d
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 77 deletions.
8 changes: 5 additions & 3 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ type allocRunner struct {
// transitions.
runnerHooks []interfaces.RunnerHook

// hookState is the output of allocrunner hooks
hookState *cstructs.AllocHookResources
hookStateMu sync.RWMutex
// hookResources holds the output from allocrunner hooks so that later
// allocrunner hooks or task runner hooks can read them
hookResources *cstructs.AllocHookResources

// tasks are the set of task runners
tasks map[string]*taskrunner.TaskRunner
Expand Down Expand Up @@ -238,6 +238,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
serviceRegWrapper: config.ServiceRegWrapper,
checkStore: config.CheckStore,
getter: config.Getter,
hookResources: cstructs.NewAllocHookResources(),
}

// Create the logger based on the allocation ID
Expand Down Expand Up @@ -293,6 +294,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
ShutdownDelayCtx: ar.shutdownDelayCtx,
ServiceRegWrapper: ar.serviceRegWrapper,
Getter: ar.getter,
AllocHookResources: ar.hookResources,
}

if ar.cpusetManager != nil {
Expand Down
36 changes: 1 addition & 35 deletions client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,10 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
clientconfig "github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/nomad/structs"
)

type hookResourceSetter interface {
GetAllocHookResources() *cstructs.AllocHookResources
SetAllocHookResources(*cstructs.AllocHookResources)
}

type allocHookResourceSetter struct {
ar *allocRunner
}

func (a *allocHookResourceSetter) GetAllocHookResources() *cstructs.AllocHookResources {
a.ar.hookStateMu.RLock()
defer a.ar.hookStateMu.RUnlock()

return a.ar.hookState
}

func (a *allocHookResourceSetter) SetAllocHookResources(res *cstructs.AllocHookResources) {
a.ar.hookStateMu.Lock()
defer a.ar.hookStateMu.Unlock()

a.ar.hookState = res

// Propagate to all of the TRs within the lock to ensure consistent state.
// TODO: Refactor so TR's pull state from AR?
for _, tr := range a.ar.tasks {
tr.SetAllocHookResources(res)
}
}

// allocHealthSetter is a shim to allow the alloc health watcher hook to set
// and clear the alloc health without full access to the alloc runner state
type allocHealthSetter struct {
Expand Down Expand Up @@ -117,10 +87,6 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
// create network isolation setting shim
ns := &allocNetworkIsolationSetter{ar: ar}

// create hook resource setting shim
hrs := &allocHookResourceSetter{ar: ar}
hrs.SetAllocHookResources(&cstructs.AllocHookResources{})

// build the network manager
nm, err := newNetworkManager(ar.Alloc(), ar.driverManager)
if err != nil {
Expand Down Expand Up @@ -166,7 +132,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
}),
newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig, config.Node.Attributes),
newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, hrs, ar.clientConfig.Node.SecretID),
newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, ar.hookResources, ar.clientConfig.Node.SecretID),
newChecksHook(hookLogger, alloc, ar.checkStore, ar),
}

Expand Down
13 changes: 7 additions & 6 deletions client/allocrunner/csi_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (

hclog "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"

"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
Expand All @@ -27,7 +29,7 @@ type csiHook struct {
// interfaces implemented by the allocRunner
rpcClient RPCer
taskCapabilityGetter taskCapabilityGetter
updater hookResourceSetter
hookResources *cstructs.AllocHookResources

nodeSecret string
volumeRequests map[string]*volumeAndRequest
Expand All @@ -44,7 +46,7 @@ type taskCapabilityGetter interface {
GetTaskDriverCapabilities(string) (*drivers.Capabilities, error)
}

func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.Manager, rpcClient RPCer, taskCapabilityGetter taskCapabilityGetter, updater hookResourceSetter, nodeSecret string) *csiHook {
func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.Manager, rpcClient RPCer, taskCapabilityGetter taskCapabilityGetter, hookResources *cstructs.AllocHookResources, nodeSecret string) *csiHook {

shutdownCtx, shutdownCancelFn := context.WithCancel(context.Background())

Expand All @@ -54,7 +56,7 @@ func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.M
csimanager: csi,
rpcClient: rpcClient,
taskCapabilityGetter: taskCapabilityGetter,
updater: updater,
hookResources: hookResources,
nodeSecret: nodeSecret,
volumeRequests: map[string]*volumeAndRequest{},
minBackoffInterval: time.Second,
Expand Down Expand Up @@ -108,9 +110,8 @@ func (c *csiHook) Prerun() error {
mounts[alias] = mountInfo
}

res := c.updater.GetAllocHookResources()
res.CSIMounts = mounts
c.updater.SetAllocHookResources(res)
// make the mounts available to the taskrunner's volume_hook
c.hookResources.SetCSIMounts(mounts)

return nil
}
Expand Down
20 changes: 6 additions & 14 deletions client/allocrunner/csi_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func TestCSIHook(t *testing.T) {
MountConfigs: drivers.MountConfigSupportAll,
},
}
hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret")
hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar.res, "secret")
hook.minBackoffInterval = 1 * time.Millisecond
hook.maxBackoffInterval = 10 * time.Millisecond
hook.maxBackoffDuration = 500 * time.Millisecond
Expand All @@ -212,11 +212,11 @@ func TestCSIHook(t *testing.T) {

if tc.expectedClaimErr != nil {
require.EqualError(t, hook.Prerun(), tc.expectedClaimErr.Error())
mounts := ar.GetAllocHookResources().GetCSIMounts()
mounts := ar.res.GetCSIMounts()
require.Nil(t, mounts)
} else {
require.NoError(t, hook.Prerun())
mounts := ar.GetAllocHookResources().GetCSIMounts()
mounts := ar.res.GetCSIMounts()
require.NotNil(t, mounts)
require.Equal(t, tc.expectedMounts, mounts)
require.NoError(t, hook.Postrun())
Expand Down Expand Up @@ -308,16 +308,16 @@ func TestCSIHook_claimVolumesFromAlloc_Validation(t *testing.T) {
capFunc: tc.capFunc,
}

hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret")
hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar.res, "secret")
require.NotNil(t, hook)

if tc.expectedClaimErr != nil {
require.EqualError(t, hook.Prerun(), tc.expectedClaimErr.Error())
mounts := ar.GetAllocHookResources().GetCSIMounts()
mounts := ar.res.GetCSIMounts()
require.Nil(t, mounts)
} else {
require.NoError(t, hook.Prerun())
mounts := ar.GetAllocHookResources().GetCSIMounts()
mounts := ar.res.GetCSIMounts()
require.NotNil(t, mounts)
require.NoError(t, hook.Postrun())
}
Expand Down Expand Up @@ -431,14 +431,6 @@ type mockAllocRunner struct {
capFunc func() (*drivers.Capabilities, error)
}

func (ar mockAllocRunner) GetAllocHookResources() *cstructs.AllocHookResources {
return ar.res
}

func (ar mockAllocRunner) SetAllocHookResources(res *cstructs.AllocHookResources) {
ar.res = res
}

func (ar mockAllocRunner) GetTaskDriverCapabilities(taskName string) (*drivers.Capabilities, error) {
if ar.capFunc != nil {
return ar.capFunc()
Expand Down
7 changes: 7 additions & 0 deletions client/allocrunner/interfaces/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package interfaces

import (
"github.com/hashicorp/nomad/client/allocrunner/state"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
cstructs "github.com/hashicorp/nomad/client/structs"
)

Expand Down Expand Up @@ -32,3 +33,9 @@ type TaskStateHandler interface {
type AllocStatsReporter interface {
LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error)
}

// HookResourceSetter is used to communicate between alloc hooks and task hooks
type HookResourceSetter interface {
SetCSIMounts(map[string]*csimanager.MountInfo)
GetCSIMounts(map[string]*csimanager.MountInfo)
}
14 changes: 8 additions & 6 deletions client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ type TaskRunner struct {
// hookResources captures the resources provided by hooks
hookResources *hookResources

// allocHookResources captures the resources provided by the allocrunner hooks
allocHookResources *cstructs.AllocHookResources

// consulClient is the client used by the consul service hook for
// registering services and checks
consulServiceClient serviceregistration.Handler
Expand Down Expand Up @@ -253,8 +256,6 @@ type TaskRunner struct {
networkIsolationLock sync.Mutex
networkIsolationSpec *drivers.NetworkIsolationSpec

allocHookResources *cstructs.AllocHookResources

// serviceRegWrapper is the handler wrapper that is used by service hooks
// to perform service and check registration and deregistration.
serviceRegWrapper *wrapper.HandlerWrapper
Expand Down Expand Up @@ -329,6 +330,10 @@ type Config struct {

// Getter is an interface for retrieving artifacts.
Getter cinterfaces.ArtifactGetter

// AllocHookResources is how taskrunner hooks can get state written by
// allocrunner hooks
AllocHookResources *cstructs.AllocHookResources
}

func NewTaskRunner(config *Config) (*TaskRunner, error) {
Expand Down Expand Up @@ -368,6 +373,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
vaultClient: config.Vault,
state: tstate,
localState: state.NewLocalState(),
allocHookResources: config.AllocHookResources,
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
deviceStatsReporter: config.DeviceStatsReporter,
Expand Down Expand Up @@ -1576,10 +1582,6 @@ func (tr *TaskRunner) DriverCapabilities() (*drivers.Capabilities, error) {
return tr.driver.Capabilities()
}

func (tr *TaskRunner) SetAllocHookResources(res *cstructs.AllocHookResources) {
tr.allocHookResources = res
}

// shutdownDelayCancel is used for testing only and cancels the
// shutdownDelayCtx
func (tr *TaskRunner) shutdownDelayCancel() {
Expand Down
17 changes: 8 additions & 9 deletions client/allocrunner/taskrunner/volume_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,15 @@ func TestVolumeHook_prepareCSIVolumes(t *testing.T) {
t.Run(tc.Name, func(t *testing.T) {

tr := &TaskRunner{
task: req.Task,
driver: tc.Driver,
allocHookResources: &cstructs.AllocHookResources{
CSIMounts: map[string]*csimanager.MountInfo{
"foo": {
Source: "/mnt/my-test-volume",
},
},
},
task: req.Task,
driver: tc.Driver,
allocHookResources: cstructs.NewAllocHookResources(),
}
tr.allocHookResources.SetCSIMounts(map[string]*csimanager.MountInfo{
"foo": {
Source: "/mnt/my-test-volume",
},
})

hook := &volumeHook{
logger: testlog.HCLogger(t),
Expand Down
10 changes: 10 additions & 0 deletions client/pluginmanager/csimanager/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ type MountInfo struct {
IsDevice bool
}

func (mi *MountInfo) Copy() *MountInfo {
if mi == nil {
return nil
}

nmi := new(MountInfo)
*nmi = *mi
return nmi
}

type UsageOptions struct {
ReadOnly bool
AttachmentMode structs.CSIVolumeAttachmentMode
Expand Down
21 changes: 17 additions & 4 deletions client/structs/allochook.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,39 @@ import (
"sync"

"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/helper"
)

// AllocHookResources contains data that is provided by AllocRunner Hooks for
// consumption by TaskRunners
// consumption by TaskRunners. This should be instantiated once in the
// AllocRunner and then only accessed via getters and setters that hold the
// lock.
type AllocHookResources struct {
CSIMounts map[string]*csimanager.MountInfo
csiMounts map[string]*csimanager.MountInfo

mu sync.RWMutex
}

func NewAllocHookResources() *AllocHookResources {
return &AllocHookResources{
csiMounts: map[string]*csimanager.MountInfo{},
}
}

// GetCSIMounts returns a copy of the CSI mount info previously written by the
// CSI allocrunner hook
func (a *AllocHookResources) GetCSIMounts() map[string]*csimanager.MountInfo {
a.mu.RLock()
defer a.mu.RUnlock()

return a.CSIMounts
return helper.DeepCopyMap(a.csiMounts)
}

// SetCSIMounts stores the CSI mount info for later use by the volume taskrunner
// hook
func (a *AllocHookResources) SetCSIMounts(m map[string]*csimanager.MountInfo) {
a.mu.Lock()
defer a.mu.Unlock()

a.CSIMounts = m
a.csiMounts = m
}

0 comments on commit b169e8d

Please sign in to comment.