diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go
index ce28f5eee22..ff91f0da6d8 100644
--- a/nomad/csi_endpoint_test.go
+++ b/nomad/csi_endpoint_test.go
@@ -561,6 +561,10 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
 
 			if tc.expectedErrMsg == "" {
 				require.NoError(t, err)
+				vol, err = state.CSIVolumeByID(nil, ns, volID)
+				require.NoError(t, err)
+				require.NotNil(t, vol)
+				require.Len(t, vol.ReadAllocs, 0)
 			} else {
 				require.Error(t, err)
 				require.True(t, strings.Contains(err.Error(), tc.expectedErrMsg),
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 87c453ba855..977eec5bfbc 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -2072,6 +2072,17 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum
 			v.ModifyIndex = index
 		}
 
+		// Allocations are copy on write, so we want to keep the Allocation ID
+		// but we need to clear the pointer so that we don't store it when we
+		// write the volume to the state store. We'll get it from the db in
+		// denormalize.
+		for allocID := range v.ReadAllocs {
+			v.ReadAllocs[allocID] = nil
+		}
+		for allocID := range v.WriteAllocs {
+			v.WriteAllocs[allocID] = nil
+		}
+
 		err = txn.Insert("csi_volumes", v)
 		if err != nil {
 			return fmt.Errorf("volume insert: %v", err)
@@ -2263,6 +2274,17 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *s
 	volume.ModifyIndex = index
 
+	// Allocations are copy on write, so we want to keep the Allocation ID
+	// but we need to clear the pointer so that we don't store it when we
+	// write the volume to the state store. We'll get it from the db in
+	// denormalize.
+	for allocID := range volume.ReadAllocs {
+		volume.ReadAllocs[allocID] = nil
+	}
+	for allocID := range volume.WriteAllocs {
+		volume.WriteAllocs[allocID] = nil
+	}
+
 	if err = txn.Insert("csi_volumes", volume); err != nil {
 		return fmt.Errorf("volume update failed: %s: %v", id, err)
 	}
 
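Note: both state_store.go hunks apply the same pattern — keep the allocation IDs as map keys, nil out the pointers before inserting the volume, and let read paths re-resolve the IDs against the allocations table. A minimal, self-contained sketch of that pattern follows; the `Volume`, `allocDB`, `storeVolume`, and `denormalizeVolume` names are illustrative stand-ins, not Nomad's actual API.

```go
package main

import "fmt"

type Allocation struct {
	ID           string
	ClientStatus string
}

type Volume struct {
	ReadAllocs map[string]*Allocation
}

// allocDB stands in for the state store's allocations table.
var allocDB = map[string]*Allocation{}

// storeVolume strips the alloc pointers before the volume is persisted,
// keeping only the IDs as map keys.
func storeVolume(v *Volume) *Volume {
	for id := range v.ReadAllocs {
		v.ReadAllocs[id] = nil
	}
	return v
}

// denormalizeVolume re-resolves the stored IDs against the current
// allocations table on read.
func denormalizeVolume(v *Volume) {
	for id := range v.ReadAllocs {
		v.ReadAllocs[id] = allocDB[id]
	}
}

func main() {
	alloc := &Allocation{ID: "a1", ClientStatus: "running"}
	allocDB[alloc.ID] = alloc

	vol := storeVolume(&Volume{ReadAllocs: map[string]*Allocation{alloc.ID: alloc}})

	// The allocation changes after the volume was written...
	alloc.ClientStatus = "complete"

	// ...but a fresh read still sees the up-to-date allocation, because
	// the stored volume never held a stale snapshot of it.
	denormalizeVolume(vol)
	fmt.Println(vol.ReadAllocs["a1"].ClientStatus) // prints "complete"
}
```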
diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go
index 79d3c9cab01..ade9181fbeb 100644
--- a/nomad/structs/csi.go
+++ b/nomad/structs/csi.go
@@ -306,22 +306,14 @@ func NewCSIVolume(volumeID string, index uint64) *CSIVolume {
 }
 
 func (v *CSIVolume) newStructs() {
-	if v.Topologies == nil {
-		v.Topologies = []*CSITopology{}
-	}
-	if v.Context == nil {
-		v.Context = map[string]string{}
-	}
-	if v.Parameters == nil {
-		v.Parameters = map[string]string{}
-	}
-	if v.Secrets == nil {
-		v.Secrets = CSISecrets{}
-	}
+	v.Topologies = []*CSITopology{}
+	v.MountOptions = new(CSIMountOptions)
+	v.Secrets = CSISecrets{}
+	v.Parameters = map[string]string{}
+	v.Context = map[string]string{}
 
 	v.ReadAllocs = map[string]*Allocation{}
 	v.WriteAllocs = map[string]*Allocation{}
-
 	v.ReadClaims = map[string]*CSIVolumeClaim{}
 	v.WriteClaims = map[string]*CSIVolumeClaim{}
 	v.PastClaims = map[string]*CSIVolumeClaim{}
@@ -386,7 +378,7 @@ func (v *CSIVolume) WriteSchedulable() bool {
 func (v *CSIVolume) WriteFreeClaims() bool {
 	switch v.AccessMode {
 	case CSIVolumeAccessModeSingleNodeWriter, CSIVolumeAccessModeMultiNodeSingleWriter:
-		return len(v.WriteAllocs) == 0
+		return len(v.WriteClaims) == 0
 	case CSIVolumeAccessModeMultiNodeMultiWriter:
 		// the CSI spec doesn't allow for setting a max number of writers.
 		// we track node resource exhaustion through v.ResourceExhausted
@@ -405,25 +397,31 @@ func (v *CSIVolume) InUse() bool {
 
-// Copy returns a copy of the volume, which shares only the Topologies slice
+// Copy returns a deep copy of the volume
 func (v *CSIVolume) Copy() *CSIVolume {
-	copy := *v
-	out := &copy
-	out.newStructs()
+	out := new(CSIVolume)
+	*out = *v
+	out.newStructs() // zero-out the non-primitive structs
+
+	for _, t := range v.Topologies {
+		out.Topologies = append(out.Topologies, t.Copy())
+	}
+	if v.MountOptions != nil {
+		*out.MountOptions = *v.MountOptions
+	}
+	for k, v := range v.Secrets {
+		out.Secrets[k] = v
+	}
 
 	for k, v := range v.Parameters {
 		out.Parameters[k] = v
 	}
 	for k, v := range v.Context {
 		out.Context[k] = v
 	}
-	for k, v := range v.Secrets {
-		out.Secrets[k] = v
-	}
-	for k, v := range v.ReadAllocs {
-		out.ReadAllocs[k] = v
+	for k, alloc := range v.ReadAllocs {
+		out.ReadAllocs[k] = alloc.Copy()
 	}
-
-	for k, v := range v.WriteAllocs {
-		out.WriteAllocs[k] = v
+	for k, alloc := range v.WriteAllocs {
+		out.WriteAllocs[k] = alloc.Copy()
 	}
 
 	for k, v := range v.ReadClaims {
@@ -498,7 +496,7 @@ func (v *CSIVolume) ClaimWrite(claim *CSIVolumeClaim, alloc *Allocation) error {
 	if !v.WriteFreeClaims() {
 		// Check the blocking allocations to see if they belong to this job
 		for _, a := range v.WriteAllocs {
-			if a.Namespace != alloc.Namespace || a.JobID != alloc.JobID {
+			if a != nil && (a.Namespace != alloc.Namespace || a.JobID != alloc.JobID) {
 				return fmt.Errorf("volume max claim reached")
 			}
 		}
@@ -775,19 +773,19 @@ func (p *CSIPlugin) Copy() *CSIPlugin {
 	out.newStructs()
 
 	for k, v := range p.Controllers {
-		out.Controllers[k] = v
+		out.Controllers[k] = v.Copy()
 	}
 
 	for k, v := range p.Nodes {
-		out.Nodes[k] = v
+		out.Nodes[k] = v.Copy()
 	}
 
 	for k, v := range p.ControllerJobs {
-		out.ControllerJobs[k] = v
+		out.ControllerJobs[k] = v.Copy()
 	}
 
 	for k, v := range p.NodeJobs {
-		out.NodeJobs[k] = v
+		out.NodeJobs[k] = v.Copy()
 	}
 
 	return out
@@ -989,6 +987,14 @@ type JobDescription struct {
 
 // JobNamespacedDescriptions maps Job.ID to JobDescription
 type JobNamespacedDescriptions map[string]JobDescription
 
+func (j JobNamespacedDescriptions) Copy() JobNamespacedDescriptions {
+	copy := JobNamespacedDescriptions{}
+	for k, v := range j {
+		copy[k] = v
+	}
+	return copy
+}
+
 // JobDescriptions maps Namespace to a mapping of Job.ID to JobDescription
 type JobDescriptions map[string]JobNamespacedDescriptions
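Note: the `a != nil` guard added in the ClaimWrite hunk matters because, after the state-store change above, the alloc maps can legitimately hold nil values for entries that haven't been denormalized yet. A hypothetical sketch with simplified types (this is not Nomad's actual ClaimWrite, just the shape of the guard):

```go
package main

import "fmt"

type Allocation struct {
	Namespace string
	JobID     string
}

// blockedByOtherJob mimics the loop in ClaimWrite: a write claim is only
// refused when a *different* job already holds the volume. Entries written
// by the state store may be nil until denormalized, so dereferencing
// without the nil guard would panic.
func blockedByOtherJob(writeAllocs map[string]*Allocation, alloc *Allocation) bool {
	for _, a := range writeAllocs {
		if a != nil && (a.Namespace != alloc.Namespace || a.JobID != alloc.JobID) {
			return true
		}
	}
	return false
}

func main() {
	mine := &Allocation{Namespace: "default", JobID: "my-job"}
	writeAllocs := map[string]*Allocation{
		"a1": nil, // stored but not yet denormalized; must be skipped
		"a2": {Namespace: "default", JobID: "other-job"},
	}
	fmt.Println(blockedByOtherJob(writeAllocs, mine)) // prints "true"
}
```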
"123"}}}, + AccessMode: CSIVolumeAccessModeSingleNodeWriter, + AttachmentMode: CSIVolumeAttachmentModeBlockDevice, + MountOptions: &CSIMountOptions{FSType: "ext4", MountFlags: []string{"ro", "noatime"}}, + Secrets: CSISecrets{"mysecret": "myvalue"}, + Parameters: map[string]string{"param1": "val1"}, + Context: map[string]string{"ctx1": "val1"}, + + ReadAllocs: map[string]*Allocation{a1.ID: a1, a2.ID: nil}, + WriteAllocs: map[string]*Allocation{a3.ID: a3}, + + ReadClaims: map[string]*CSIVolumeClaim{a1.ID: c1, a2.ID: c2}, + WriteClaims: map[string]*CSIVolumeClaim{a3.ID: c3}, + PastClaims: map[string]*CSIVolumeClaim{}, + + Schedulable: true, + PluginID: "moosefs", + Provider: "n/a", + ProviderVersion: "1.0", + ControllerRequired: true, + ControllersHealthy: 2, + ControllersExpected: 2, + NodesHealthy: 4, + NodesExpected: 5, + ResourceExhausted: time.Now(), + } + + v2 := v1.Copy() + if !reflect.DeepEqual(v1, v2) { + t.Fatalf("Copy() returned an unequal Volume; got %#v; want %#v", v1, v2) + } + + v1.ReadClaims[a1.ID].State = CSIVolumeClaimStateReadyToFree + v1.ReadAllocs[a2.ID] = a2 + v1.WriteAllocs[a3.ID].ClientStatus = AllocClientStatusComplete + v1.MountOptions.FSType = "zfs" + + if v2.ReadClaims[a1.ID].State == CSIVolumeClaimStateReadyToFree { + t.Fatalf("Volume.Copy() failed; changes to original ReadClaims seen in copy") + } + if v2.ReadAllocs[a2.ID] != nil { + t.Fatalf("Volume.Copy() failed; changes to original ReadAllocs seen in copy") + } + if v2.WriteAllocs[a3.ID].ClientStatus == AllocClientStatusComplete { + t.Fatalf("Volume.Copy() failed; changes to original WriteAllocs seen in copy") + } + if v2.MountOptions.FSType == "zfs" { + t.Fatalf("Volume.Copy() failed; changes to original MountOptions seen in copy") + } + +} + func TestCSIPluginJobs(t *testing.T) { plug := NewCSIPlugin("foo", 1000) controller := &Job{