csi: make volume GC in job deregister safely async #7632

Merged · 3 commits · Apr 6, 2020
6 changes: 3 additions & 3 deletions client/csi_endpoint.go
@@ -143,12 +143,12 @@ func (c *CSI) ControllerDetachVolume(req *structs.ClientCSIControllerDetachVolum
csiReq := req.ToCSIRequest()

// Submit the request for a volume to the CSI Plugin.
ctx, cancelFn := c.requestContext()
ctx, cancelFn := context.WithTimeout(context.Background(), 30*time.Second)
defer cancelFn()
// CSI ControllerUnpublishVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
_, err = plugin.ControllerUnpublishVolume(ctx, csiReq,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithPerRetryTimeout(10*time.Second),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if err != nil {
@@ -190,7 +190,7 @@ func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, re
AccessMode: string(req.AccessMode),
}

err = mounter.UnmountVolume(ctx, req.VolumeID, req.AllocID, usageOpts)
err = mounter.UnmountVolume(ctx, req.VolumeID, req.ExternalID, req.AllocID, usageOpts)
if err != nil {
return err
}
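Note (not part of the diff): the detach RPC now runs under a standalone 30-second context rather than the endpoint's request context, with a 10-second budget per gRPC attempt. A minimal sketch of how the two bounds compose, assuming the same `grpc_retry` call options; the real endpoint passes them to `plugin.ControllerUnpublishVolume`:

```go
package main

import (
	"context"
	"fmt"
	"time"

	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
)

func main() {
	// Outer bound: the whole detach operation is capped at 30s regardless
	// of how the individual attempts go.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Per-attempt bounds, mirroring the call options in the diff: up to 3
	// attempts of 10s each, with exponential backoff starting at 100ms.
	// The outer 30s context still cuts everything off in the worst case.
	opts := []grpc_retry.CallOption{
		grpc_retry.WithPerRetryTimeout(10 * time.Second),
		grpc_retry.WithMax(3),
		grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100 * time.Millisecond)),
	}

	// In the endpoint these options are passed straight through to
	// plugin.ControllerUnpublishVolume(ctx, csiReq, opts...).
	deadline, _ := ctx.Deadline()
	fmt.Printf("detach must finish by %s; %d retry options set\n",
		deadline.Format(time.RFC3339), len(opts))
}
```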
2 changes: 1 addition & 1 deletion client/pluginmanager/csimanager/interface.go
@@ -42,7 +42,7 @@ func (u *UsageOptions) ToFS() string {

type VolumeMounter interface {
MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *UsageOptions, publishContext map[string]string) (*MountInfo, error)
UnmountVolume(ctx context.Context, volID, allocID string, usageOpts *UsageOptions) error
UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usageOpts *UsageOptions) error
}

type Manager interface {
14 changes: 7 additions & 7 deletions client/pluginmanager/csimanager/volume.go
@@ -259,15 +259,15 @@ func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume,
// once for each staging path that a volume has been staged under.
// It is safe to call multiple times and a plugin is required to return OK if
// the volume has been unstaged or was never staged on the node.
func (v *volumeManager) unstageVolume(ctx context.Context, volID string, usage *UsageOptions) error {
func (v *volumeManager) unstageVolume(ctx context.Context, volID, remoteID string, usage *UsageOptions) error {
logger := hclog.FromContext(ctx)
logger.Trace("Unstaging volume")
stagingPath := v.stagingDirForVolume(v.containerMountPoint, volID, usage)

// CSI NodeUnstageVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
return v.plugin.NodeUnstageVolume(ctx,
volID,
remoteID,
stagingPath,
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
grpc_retry.WithMax(3),
@@ -288,12 +288,12 @@ func combineErrors(maybeErrs ...error) error {
return result.ErrorOrNil()
}

func (v *volumeManager) unpublishVolume(ctx context.Context, volID, allocID string, usage *UsageOptions) error {
func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions) error {
pluginTargetPath := v.allocDirForVolume(v.containerMountPoint, volID, allocID, usage)

// CSI NodeUnpublishVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
rpcErr := v.plugin.NodeUnpublishVolume(ctx, volID, pluginTargetPath,
rpcErr := v.plugin.NodeUnpublishVolume(ctx, remoteID, pluginTargetPath,
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
@@ -325,16 +325,16 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, volID, allocID stri
return rpcErr
}

func (v *volumeManager) UnmountVolume(ctx context.Context, volID, allocID string, usage *UsageOptions) (err error) {
func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions) (err error) {
logger := v.logger.With("volume_id", volID, "alloc_id", allocID)
ctx = hclog.WithContext(ctx, logger)

err = v.unpublishVolume(ctx, volID, allocID, usage)
err = v.unpublishVolume(ctx, volID, remoteID, allocID, usage)

if err == nil {
canRelease := v.usageTracker.Free(allocID, volID, usage)
if v.requiresStaging && canRelease {
err = v.unstageVolume(ctx, volID, usage)
err = v.unstageVolume(ctx, volID, remoteID, usage)
}
}

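Note (not part of the diff): throughout this file `volID` (Nomad's volume ID) still keys the staging/alloc mount paths and the usage tracker, while `remoteID` is what the node plugin receives in NodeUnstageVolume and NodeUnpublishVolume. The tests below obtain it via `vol.RemoteID()`; a sketch of the fallback that method is assumed to implement (field and method names mirror the diff, the body is an assumption rather than a copy of the Nomad source):

```go
package main

import "fmt"

// CSIVolume sketches just the two identifiers that matter here.
type CSIVolume struct {
	ID         string // Nomad's volume ID: keys local mount paths and claim tracking
	ExternalID string // the storage provider's ID: what CSI RPCs expect
}

// RemoteID is assumed to prefer the provider's external ID and fall back to
// the Nomad ID for volumes registered without one.
func (v *CSIVolume) RemoteID() string {
	if v.ExternalID == "" {
		return v.ID
	}
	return v.ExternalID
}

func main() {
	vol := &CSIVolume{ID: "ebs-vol0", ExternalID: "vol-0123456789abcdef0"}
	// Local paths and the usage tracker keep using vol.ID; the plugin's
	// unstage/unpublish calls get vol.RemoteID().
	fmt.Println(vol.ID, "->", vol.RemoteID())
}
```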
8 changes: 5 additions & 3 deletions client/pluginmanager/csimanager/volume_test.go
@@ -236,7 +236,8 @@ func TestVolumeManager_unstageVolume(t *testing.T) {
manager := newVolumeManager(testlog.HCLogger(t), eventer, csiFake, tmpPath, tmpPath, true)
ctx := context.Background()

err := manager.unstageVolume(ctx, tc.Volume.ID, tc.UsageOptions)
err := manager.unstageVolume(ctx,
tc.Volume.ID, tc.Volume.RemoteID(), tc.UsageOptions)

if tc.ExpectedErr != nil {
require.EqualError(t, err, tc.ExpectedErr.Error())
@@ -416,7 +417,8 @@ func TestVolumeManager_unpublishVolume(t *testing.T) {
manager := newVolumeManager(testlog.HCLogger(t), eventer, csiFake, tmpPath, tmpPath, true)
ctx := context.Background()

err := manager.unpublishVolume(ctx, tc.Volume.ID, tc.Allocation.ID, tc.UsageOptions)
err := manager.unpublishVolume(ctx,
tc.Volume.ID, tc.Volume.RemoteID(), tc.Allocation.ID, tc.UsageOptions)

if tc.ExpectedErr != nil {
require.EqualError(t, err, tc.ExpectedErr.Error())
@@ -476,7 +478,7 @@ func TestVolumeManager_MountVolumeEvents(t *testing.T) {
require.Equal(t, "true", e.Details["success"])
events = events[1:]

err = manager.UnmountVolume(ctx, vol.ID, alloc.ID, usage)
err = manager.UnmountVolume(ctx, vol.ID, vol.RemoteID(), alloc.ID, usage)
require.NoError(t, err)

require.Equal(t, 1, len(events))
13 changes: 7 additions & 6 deletions client/structs/csi.go
@@ -31,7 +31,7 @@ type CSIControllerQuery struct {
}

type ClientCSIControllerValidateVolumeRequest struct {
VolumeID string
VolumeID string // note: this is the external ID

AttachmentMode structs.CSIVolumeAttachmentMode
AccessMode structs.CSIVolumeAccessMode
@@ -43,7 +43,7 @@ type ClientCSIControllerValidateVolumeResponse struct {
}

type ClientCSIControllerAttachVolumeRequest struct {
// The ID of the volume to be used on a node.
// The external ID of the volume to be used on a node.
// This field is REQUIRED.
VolumeID string

@@ -137,10 +137,11 @@ type ClientCSIControllerDetachVolumeResponse struct{}
// a Nomad client to tell a CSI node plugin on that client to perform
// NodeUnpublish and NodeUnstage.
type ClientCSINodeDetachVolumeRequest struct {
PluginID string // ID of the plugin that manages the volume (required)
VolumeID string // ID of the volume to be unpublished (required)
AllocID string // ID of the allocation we're unpublishing for (required)
NodeID string // ID of the Nomad client targeted
PluginID string // ID of the plugin that manages the volume (required)
VolumeID string // ID of the volume to be unpublished (required)
AllocID string // ID of the allocation we're unpublishing for (required)
NodeID string // ID of the Nomad client targeted
ExternalID string // External ID of the volume to be unpublished (required)

// These fields should match the original volume request so that
// we can find the mount points on the client
10 changes: 8 additions & 2 deletions e2e/csi/csi.go
@@ -104,8 +104,13 @@ func (tc *CSIVolumesTest) TestEBSVolumeClaim(f *framework.F) {
// Shutdown the writer so we can run a reader.
// we could mount the EBS volume with multi-attach, but we
// want this test to exercise the unpublish workflow.
// this runs the equivalent of 'nomad job stop -purge'
nomadClient.Jobs().Deregister(writeJobID, true, nil)
//
// TODO(tgross): we should pass true here to run the equivalent
// of 'nomad job stop -purge' but this makes the test really
// racy. Once the unmount hang problem with -purge is fixed,
// we can restore this.
nomadClient.Jobs().Deregister(writeJobID, false, nil)
e2eutil.WaitForAllocStopped(t, nomadClient, writeAllocID)

// deploy a job so we can read from the volume
readJobID := "read-ebs-" + uuid[0:8]
@@ -179,6 +184,7 @@ func (tc *CSIVolumesTest) TestEFSVolumeClaim(f *framework.F) {
// does not.
// this runs the equivalent of 'nomad job stop'
nomadClient.Jobs().Deregister(writeJobID, false, nil)
e2eutil.WaitForAllocStopped(t, nomadClient, writeAllocID)

// deploy a job that reads from the volume.
readJobID := "read-efs-" + uuid[0:8]
23 changes: 23 additions & 0 deletions e2e/e2eutil/utils.go
@@ -162,6 +162,29 @@ func WaitForAllocNotPending(t *testing.T, nomadClient *api.Client, allocID strin
})
}

func WaitForAllocStopped(t *testing.T, nomadClient *api.Client, allocID string) {
testutil.WaitForResultRetries(retries, func() (bool, error) {
time.Sleep(time.Millisecond * 100)
alloc, _, err := nomadClient.Allocations().Info(allocID, nil)
if err != nil {
return false, err
}
switch alloc.ClientStatus {
case structs.AllocClientStatusComplete:
return true, nil
case structs.AllocClientStatusFailed:
return true, nil
case structs.AllocClientStatusLost:
return true, nil
default:
return false, fmt.Errorf("expected stopped alloc, but was: %s",
alloc.ClientStatus)
}
}, func(err error) {
t.Fatalf("failed to wait on alloc: %v", err)
})
}

func AllocIDsFromAllocationListStubs(allocs []*api.AllocationListStub) []string {
allocIDs := make([]string, 0, len(allocs))
for _, alloc := range allocs {
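Note (not part of the diff): a usage sketch of the new helper, showing the deregister-then-wait sequencing the CSI e2e tests above now rely on; the wrapper function and its name are illustrative only:

```go
package e2esketch

import (
	"testing"

	"github.com/hashicorp/nomad/api"
	"github.com/hashicorp/nomad/e2e/e2eutil"
)

// stopWriterThenWait sketches the sequencing from the CSI e2e tests: stop
// the writer job without -purge, then block until its allocation reaches a
// terminal client status before reusing the volume with a reader job.
func stopWriterThenWait(t *testing.T, nomadClient *api.Client, writeJobID, writeAllocID string) {
	if _, _, err := nomadClient.Jobs().Deregister(writeJobID, false, nil); err != nil {
		t.Fatalf("deregister failed: %v", err)
	}
	e2eutil.WaitForAllocStopped(t, nomadClient, writeAllocID)
}
```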
145 changes: 66 additions & 79 deletions nomad/core_sched.go
@@ -157,11 +157,6 @@ OUTER:
c.logger.Debug("job GC found eligible objects",
"jobs", len(gcJob), "evals", len(gcEval), "allocs", len(gcAlloc))

// Clean up any outstanding volume claims
if err := c.volumeClaimReap(gcJob, eval.LeaderACL); err != nil {
return err
}

// Reap the evals and allocs
if err := c.evalReap(gcEval, gcAlloc); err != nil {
return err
@@ -720,90 +715,64 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime time.Time,
func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
c.logger.Trace("garbage collecting unclaimed CSI volume claims")

// JobID smuggled in with the eval's own JobID
var jobID string
evalJobID := strings.Split(eval.JobID, ":")
if len(evalJobID) != 2 {
c.logger.Error("volume gc called without jobID")
// Volume ID smuggled in with the eval's own JobID
evalVolID := strings.Split(eval.JobID, ":")
if len(evalVolID) != 3 {
c.logger.Error("volume gc called without volID")
return nil
}

jobID = evalJobID[1]
job, err := c.srv.State().JobByID(nil, eval.Namespace, jobID)
if err != nil || job == nil {
c.logger.Trace(
"cannot find job to perform volume claim GC. it may have been garbage collected",
"job", jobID)
return nil
}
return c.volumeClaimReap([]*structs.Job{job}, eval.LeaderACL)
volID := evalVolID[1]
runningAllocs := evalVolID[2] == "purge"
return volumeClaimReap(c.srv, volID, eval.Namespace,
c.srv.config.Region, eval.LeaderACL, runningAllocs)
}

// volumeClaimReap contacts the leader and releases volume claims from terminal allocs
func (c *CoreScheduler) volumeClaimReap(jobs []*structs.Job, leaderACL string) error {
return volumeClaimReap(c.srv, c.logger, jobs, leaderACL, false)
}
func volumeClaimReap(srv RPCServer, volID, namespace, region, leaderACL string, runningAllocs bool) error {

// volumeClaimReap contacts the leader and releases volume claims from terminal allocs
func volumeClaimReap(srv *Server, logger log.Logger, jobs []*structs.Job, leaderACL string, runningAllocs bool) error {
ws := memdb.NewWatchSet()
var result *multierror.Error

for _, job := range jobs {
logger.Trace("garbage collecting unclaimed CSI volume claims for job", "job", job.ID)
for _, taskGroup := range job.TaskGroups {
for _, tgVolume := range taskGroup.Volumes {
if tgVolume.Type != structs.VolumeTypeCSI {
continue // filter to just CSI volumes
}
volID := tgVolume.Source
vol, err := srv.State().CSIVolumeByID(ws, job.Namespace, volID)
if err != nil {
result = multierror.Append(result, err)
continue
}
if vol == nil {
logger.Trace("cannot find volume to be GC'd. it may have been deregistered",
"volume", volID)
continue
}
vol, err = srv.State().CSIVolumeDenormalize(ws, vol)
if err != nil {
result = multierror.Append(result, err)
continue
}
vol, err := srv.State().CSIVolumeByID(ws, namespace, volID)
if err != nil {
return err
}
if vol == nil {
return nil
}
vol, err = srv.State().CSIVolumeDenormalize(ws, vol)
if err != nil {
return err
}

plug, err := srv.State().CSIPluginByID(ws, vol.PluginID)
if err != nil {
result = multierror.Append(result, err)
continue
}
plug, err := srv.State().CSIPluginByID(ws, vol.PluginID)
if err != nil {
return err
}

gcClaims, nodeClaims := collectClaimsToGCImpl(vol, runningAllocs)

for _, claim := range gcClaims {
nodeClaims, err = volumeClaimReapImpl(srv,
&volumeClaimReapArgs{
vol: vol,
plug: plug,
allocID: claim.allocID,
nodeID: claim.nodeID,
mode: claim.mode,
region: job.Region,
namespace: job.Namespace,
leaderACL: leaderACL,
nodeClaims: nodeClaims,
},
)
if err != nil {
result = multierror.Append(result, err)
continue
}
}
}
gcClaims, nodeClaims := collectClaimsToGCImpl(vol, runningAllocs)

var result *multierror.Error
for _, claim := range gcClaims {
nodeClaims, err = volumeClaimReapImpl(srv,
&volumeClaimReapArgs{
vol: vol,
plug: plug,
allocID: claim.allocID,
nodeID: claim.nodeID,
mode: claim.mode,
namespace: namespace,
region: region,
leaderACL: leaderACL,
nodeClaims: nodeClaims,
},
)
if err != nil {
result = multierror.Append(result, err)
continue
}
}
return result.ErrorOrNil()

}

type gcClaimRequest struct {
@@ -862,7 +831,8 @@ func volumeClaimReapImpl(srv RPCServer, args *volumeClaimReapArgs) (map[string]i
// operations or releasing the claim.
nReq := &cstructs.ClientCSINodeDetachVolumeRequest{
PluginID: args.plug.ID,
VolumeID: vol.RemoteID(),
VolumeID: vol.ID,
ExternalID: vol.RemoteID(),
AllocID: args.allocID,
NodeID: nodeID,
AttachmentMode: vol.AttachmentMode,
@@ -880,13 +850,30 @@ func volumeClaimReapImpl(srv RPCServer, args *volumeClaimReapArgs) (map[string]i
// on the node need it, but we also only want to make this
// call at most once per node
if vol.ControllerRequired && args.nodeClaims[nodeID] < 1 {

// we need to get the CSI Node ID, which is not the same as
// the Nomad Node ID
ws := memdb.NewWatchSet()
targetNode, err := srv.State().NodeByID(ws, nodeID)
if err != nil {
return args.nodeClaims, err
}
if targetNode == nil {
return args.nodeClaims, fmt.Errorf("%s: %s",
structs.ErrUnknownNodePrefix, nodeID)
}
targetCSIInfo, ok := targetNode.CSINodePlugins[args.plug.ID]
if !ok {
return args.nodeClaims, fmt.Errorf("Failed to find NodeInfo for node: %s", targetNode.ID)
}

controllerNodeID, err := nodeForControllerPlugin(srv.State(), args.plug)
if err != nil || nodeID == "" {
if err != nil || controllerNodeID == "" {
return args.nodeClaims, err
}
cReq := &cstructs.ClientCSIControllerDetachVolumeRequest{
VolumeID: vol.RemoteID(),
ClientCSINodeID: nodeID,
ClientCSINodeID: targetCSIInfo.NodeInfo.ID,
}
cReq.PluginID = args.plug.ID
cReq.ControllerNodeID = controllerNodeID
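Note (not part of the diff): the GC eval now smuggles the volume ID, not the job ID, so claim reaping no longer depends on the job still being in state. `csiVolumeClaimGC` above expects a three-part `<gc-job>:<volume-id>:<purge-marker>` JobID; a decoding sketch under that assumption (the producer side and the exact prefix constant are not shown in this diff):

```go
package main

import (
	"fmt"
	"strings"
)

// parseVolumeGCEvalID is a sketch, not Nomad's helper: it decodes the
// "<gc-core-job>:<volume-id>:<purge-marker>" format that csiVolumeClaimGC
// splits apart, returning the volume ID and whether claims on running
// allocations should also be reaped (the 'purge' case).
func parseVolumeGCEvalID(evalJobID string) (volID string, runningAllocs bool, err error) {
	parts := strings.Split(evalJobID, ":")
	if len(parts) != 3 {
		return "", false, fmt.Errorf("malformed volume GC eval JobID: %q", evalJobID)
	}
	return parts[1], parts[2] == "purge", nil
}

func main() {
	volID, purge, err := parseVolumeGCEvalID("csi-volume-claim-gc:ebs-vol0:purge")
	fmt.Println(volID, purge, err) // ebs-vol0 true <nil>
}
```

The other notable fix in the final hunk: the controller detach request now sends the CSI plugin's own node ID (`CSINodePlugins[pluginID].NodeInfo.ID`) as `ClientCSINodeID` instead of the Nomad client's node ID, and the empty-ID guard now checks `controllerNodeID` rather than `nodeID`.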