From 97bb9125af95f5cfc74a78323ad257029b209440 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Wed, 10 Mar 2021 11:37:55 -0500 Subject: [PATCH 1/3] csi: CSIVolumeByID should not use prefix query Callers of `CSIVolumeByID` are generally assuming they should receive a single volume. This potentially results in feasibility checking being performed against the wrong volume if a volume's ID is a prefix substring of other volume (for example: "test" and "testing"). --- nomad/state/state_store.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index f4738e2d70d..fdef18e3ae5 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2126,20 +2126,20 @@ func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error) func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*structs.CSIVolume, error) { txn := s.db.ReadTxn() - watchCh, obj, err := txn.FirstWatch("csi_volumes", "id_prefix", namespace, id) + obj, err := txn.First("csi_volumes", "id", namespace, id) if err != nil { - return nil, fmt.Errorf("volume lookup failed: %s %v", id, err) + return nil, fmt.Errorf("volume lookup failed for %s: %v", id, err) } - - ws.Add(watchCh) - if obj == nil { return nil, nil } + vol, ok := obj.(*structs.CSIVolume) + if !ok { + return nil, fmt.Errorf("volume row conversion error") + } // we return the volume with the plugins denormalized by default, // because the scheduler needs them for feasibility checking - vol := obj.(*structs.CSIVolume) return s.CSIVolumeDenormalizePluginsTxn(txn, vol.Copy()) } From 25782f32c8566de61fc5fe4de36f63a06faded17 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Wed, 10 Mar 2021 14:26:23 -0500 Subject: [PATCH 2/3] csi: reimplement prefix matching Removing the incorrect prefix matching from `CSIVolumeByID` breaks prefix matching in the command line client. Add the required elements for prefix matching to the commands and API. --- CHANGELOG.md | 2 ++ api/csi.go | 5 ---- command/agent/csi_endpoint.go | 4 +++- command/volume_deregister.go | 20 ++++++++++++++++ command/volume_detach.go | 21 +++++++++++++++++ command/volume_status_csi.go | 24 +++++++++++++++++++- nomad/csi_endpoint.go | 10 +++++--- nomad/state/state_store.go | 20 ++++++++-------- nomad/state/state_store_test.go | 14 ++++++------ scheduler/feasible.go | 2 +- scheduler/scheduler.go | 2 +- vendor/github.com/hashicorp/nomad/api/csi.go | 5 ---- 12 files changed, 96 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e82d70345f..b40607644ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,9 +3,11 @@ BUG FIXES: * agent: Only allow querying Prometheus formatted metrics if Prometheus is enabled within the config [[GH-10140](https://github.com/hashicorp/nomad/pull/10140)] * api: Added missing devices block to AllocatedTaskResources [[GH-10064](https://github.com/hashicorp/nomad/pull/10064)] + * api: Removed unimplemented `CSIVolumes.PluginList` API. [[GH-10158](https://github.com/hashicorp/nomad/issues/10158)] * cli: Fixed a bug where non-int proxy port would panic CLI [[GH-10072](https://github.com/hashicorp/nomad/issues/10072)] * cli: Fixed a bug where `nomad operator debug` incorrectly parsed https Consul API URLs. [[GH-10082](https://github.com/hashicorp/nomad/pull/10082)] * client: Fixed log formatting when killing tasks. [[GH-10135](https://github.com/hashicorp/nomad/issues/10135)] + * csi: Fixed a bug where volume with IDs that are a substring prefix of another volume could use the wrong volume for feasibility checking. [[GH-10158](https://github.com/hashicorp/nomad/issues/10158)] * scheduler: Fixed a bug where jobs requesting multiple CSI volumes could be incorrectly scheduled if only one of the volumes passed feasibility checking. [[GH-10143](https://github.com/hashicorp/nomad/issues/10143)] * ui: Fixed the rendering of interstitial components shown after processing a dynamic application sizing recommendation. [[GH-10094](https://github.com/hashicorp/nomad/pull/10094)] diff --git a/api/csi.go b/api/csi.go index 66ce2731d88..28da54bf0c8 100644 --- a/api/csi.go +++ b/api/csi.go @@ -28,11 +28,6 @@ func (v *CSIVolumes) List(q *QueryOptions) ([]*CSIVolumeListStub, *QueryMeta, er return resp, qm, nil } -// PluginList returns all CSI volumes for the specified plugin id -func (v *CSIVolumes) PluginList(pluginID string) ([]*CSIVolumeListStub, *QueryMeta, error) { - return v.List(&QueryOptions{Prefix: pluginID}) -} - // Info is used to retrieve a single CSIVolume func (v *CSIVolumes) Info(id string, q *QueryOptions) (*CSIVolume, *QueryMeta, error) { var resp CSIVolume diff --git a/command/agent/csi_endpoint.go b/command/agent/csi_endpoint.go index 15d09893586..1a2b91e8237 100644 --- a/command/agent/csi_endpoint.go +++ b/command/agent/csi_endpoint.go @@ -34,7 +34,9 @@ func (s *HTTPServer) CSIVolumesRequest(resp http.ResponseWriter, req *http.Reque if s.parse(resp, req, &args.Region, &args.QueryOptions) { return nil, nil } - + if prefix, ok := query["prefix"]; ok { + args.Prefix = prefix[0] + } if plugin, ok := query["plugin_id"]; ok { args.PluginID = plugin[0] } diff --git a/command/volume_deregister.go b/command/volume_deregister.go index 83ec35595cf..6456e513691 100644 --- a/command/volume_deregister.go +++ b/command/volume_deregister.go @@ -2,8 +2,10 @@ package command import ( "fmt" + "sort" "strings" + "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/api/contexts" "github.com/posener/complete" ) @@ -91,6 +93,24 @@ func (c *VolumeDeregisterCommand) Run(args []string) int { return 1 } + // Prefix search for the volume + vols, _, err := client.CSIVolumes().List(&api.QueryOptions{Prefix: volID}) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying volumes: %s", err)) + return 1 + } + if len(vols) > 1 { + sort.Slice(vols, func(i, j int) bool { return vols[i].ID < vols[j].ID }) + out, err := csiFormatSortedVolumes(vols, shortId) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error formatting: %s", err)) + return 1 + } + c.Ui.Error(fmt.Sprintf("Prefix matched multiple volumes\n\n%s", out)) + return 1 + } + volID = vols[0].ID + // Confirm the -force flag if force { question := fmt.Sprintf("Are you sure you want to force deregister volume %q? [y/N]", volID) diff --git a/command/volume_detach.go b/command/volume_detach.go index ec6855c75e6..14d89b3c080 100644 --- a/command/volume_detach.go +++ b/command/volume_detach.go @@ -2,8 +2,10 @@ package command import ( "fmt" + "sort" "strings" + "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/api/contexts" "github.com/posener/complete" ) @@ -112,6 +114,25 @@ func (c *VolumeDetachCommand) Run(args []string) int { // volume's claimed allocations, otherwise just use the node ID we've been // given. if len(nodes) == 0 { + + // Prefix search for the volume + vols, _, err := client.CSIVolumes().List(&api.QueryOptions{Prefix: volID}) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying volumes: %s", err)) + return 1 + } + if len(vols) > 1 { + sort.Slice(vols, func(i, j int) bool { return vols[i].ID < vols[j].ID }) + out, err := csiFormatSortedVolumes(vols, shortId) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error formatting: %s", err)) + return 1 + } + c.Ui.Error(fmt.Sprintf("Prefix matched multiple volumes\n\n%s", out)) + return 1 + } + volID = vols[0].ID + vol, _, err := client.CSIVolumes().Info(volID, nil) if err != nil { c.Ui.Error(fmt.Sprintf("Error querying volume: %s", err)) diff --git a/command/volume_status_csi.go b/command/volume_status_csi.go index 95f6883bf75..c625d57b586 100644 --- a/command/volume_status_csi.go +++ b/command/volume_status_csi.go @@ -39,6 +39,23 @@ func (c *VolumeStatusCommand) csiStatus(client *api.Client, id string) int { return 0 } + // Prefix search for the volume + vols, _, err := client.CSIVolumes().List(&api.QueryOptions{Prefix: id}) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying volumes: %s", err)) + return 1 + } + if len(vols) > 1 { + out, err := c.csiFormatVolumes(vols) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error formatting: %s", err)) + return 1 + } + c.Ui.Error(fmt.Sprintf("Prefix matched multiple volumes\n\n%s", out)) + return 1 + } + id = vols[0].ID + // Try querying the volume vol, _, err := client.CSIVolumes().Info(id, nil) if err != nil { @@ -68,11 +85,16 @@ func (c *VolumeStatusCommand) csiFormatVolumes(vols []*api.CSIVolumeListStub) (s return out, nil } + return csiFormatSortedVolumes(vols, c.length) +} + +// Format the volumes, assumes that we're already sorted by volume ID +func csiFormatSortedVolumes(vols []*api.CSIVolumeListStub, length int) (string, error) { rows := make([]string, len(vols)+1) rows[0] = "ID|Name|Plugin ID|Schedulable|Access Mode" for i, v := range vols { rows[i+1] = fmt.Sprintf("%s|%s|%s|%t|%s", - limit(v.ID, c.length), + limit(v.ID, length), v.Name, v.PluginID, v.Schedulable, diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 19df0de9c04..e5eb0902d33 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -120,15 +120,19 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV if err != nil { return err } + // TODO: handle prefix arg + // Query all volumes var iter memdb.ResultIterator + prefix := args.Prefix + if args.NodeID != "" { - iter, err = snap.CSIVolumesByNodeID(ws, args.NodeID) + iter, err = snap.CSIVolumesByNodeID(ws, prefix, args.NodeID) } else if args.PluginID != "" { - iter, err = snap.CSIVolumesByPluginID(ws, ns, args.PluginID) + iter, err = snap.CSIVolumesByPluginID(ws, ns, prefix, args.PluginID) } else { - iter, err = snap.CSIVolumesByNamespace(ws, ns) + iter, err = snap.CSIVolumesByNamespace(ws, ns, prefix) } if err != nil { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index fdef18e3ae5..73e605b619d 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2145,7 +2145,7 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st // CSIVolumes looks up csi_volumes by pluginID. Caller should snapshot if it // wants to also denormalize the plugins. -func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, pluginID string) (memdb.ResultIterator, error) { +func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, prefix, pluginID string) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() iter, err := txn.Get("csi_volumes", "plugin_id", pluginID) @@ -2159,7 +2159,7 @@ func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, pluginID if !ok { return false } - return v.Namespace != namespace + return v.Namespace != namespace && strings.HasPrefix(v.ID, prefix) } wrap := memdb.NewFilterIterator(iter, f) @@ -2183,7 +2183,7 @@ func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID // CSIVolumesByNodeID looks up CSIVolumes in use on a node. Caller should // snapshot if it wants to also denormalize the plugins. -func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, nodeID string) (memdb.ResultIterator, error) { +func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, prefix, nodeID string) (memdb.ResultIterator, error) { allocs, err := s.AllocsByNode(ws, nodeID) if err != nil { return nil, fmt.Errorf("alloc lookup failed: %v", err) @@ -2212,11 +2212,13 @@ func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, nodeID string) (memdb iter := NewSliceIterator() txn := s.db.ReadTxn() for id, namespace := range ids { - raw, err := txn.First("csi_volumes", "id", namespace, id) - if err != nil { - return nil, fmt.Errorf("volume lookup failed: %s %v", id, err) + if strings.HasPrefix(id, prefix) { + raw, err := txn.First("csi_volumes", "id", namespace, id) + if err != nil { + return nil, fmt.Errorf("volume lookup failed: %s %v", id, err) + } + iter.Add(raw) } - iter.Add(raw) } ws.Add(iter.WatchCh()) @@ -2225,10 +2227,10 @@ func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, nodeID string) (memdb } // CSIVolumesByNamespace looks up the entire csi_volumes table -func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { +func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace, prefix string) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() - iter, err := txn.Get("csi_volumes", "id_prefix", namespace, "") + iter, err := txn.Get("csi_volumes", "id_prefix", namespace, prefix) if err != nil { return nil, fmt.Errorf("volume lookup failed: %v", err) } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index a8875dc11d1..113c2ade02b 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -2913,7 +2913,7 @@ func TestStateStore_CSIVolume(t *testing.T) { require.Error(t, err, fmt.Sprintf("volume exists: %s", v0.ID)) ws := memdb.NewWatchSet() - iter, err := state.CSIVolumesByNamespace(ws, ns) + iter, err := state.CSIVolumesByNamespace(ws, ns, "") require.NoError(t, err) slurp := func(iter memdb.ResultIterator) (vs []*structs.CSIVolume) { @@ -2932,13 +2932,13 @@ func TestStateStore_CSIVolume(t *testing.T) { require.Equal(t, 2, len(vs)) ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie") + iter, err = state.CSIVolumesByPluginID(ws, ns, "", "minnie") require.NoError(t, err) vs = slurp(iter) require.Equal(t, 1, len(vs)) ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByNodeID(ws, node.ID) + iter, err = state.CSIVolumesByNodeID(ws, "", node.ID) require.NoError(t, err) vs = slurp(iter) require.Equal(t, 1, len(vs)) @@ -2973,7 +2973,7 @@ func TestStateStore_CSIVolume(t *testing.T) { require.NoError(t, err) ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie") + iter, err = state.CSIVolumesByPluginID(ws, ns, "", "minnie") require.NoError(t, err) vs = slurp(iter) require.False(t, vs[0].WriteFreeClaims()) @@ -2982,7 +2982,7 @@ func TestStateStore_CSIVolume(t *testing.T) { err = state.CSIVolumeClaim(2, ns, vol0, claim0) require.NoError(t, err) ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie") + iter, err = state.CSIVolumesByPluginID(ws, ns, "", "minnie") require.NoError(t, err) vs = slurp(iter) require.True(t, vs[0].ReadSchedulable()) @@ -3018,13 +3018,13 @@ func TestStateStore_CSIVolume(t *testing.T) { // List, now omitting the deregistered volume ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie") + iter, err = state.CSIVolumesByPluginID(ws, ns, "", "minnie") require.NoError(t, err) vs = slurp(iter) require.Equal(t, 0, len(vs)) ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByNamespace(ws, ns) + iter, err = state.CSIVolumesByNamespace(ws, ns, "") require.NoError(t, err) vs = slurp(iter) require.Equal(t, 1, len(vs)) diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 399ab457e86..0d6346084de 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -266,7 +266,7 @@ func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) (bool, string) { // Find the count per plugin for this node, so that can enforce MaxVolumes pluginCount := map[string]int64{} - iter, err := c.ctx.State().CSIVolumesByNodeID(ws, n.ID) + iter, err := c.ctx.State().CSIVolumesByNodeID(ws, "", n.ID) if err != nil { return false, FilterConstraintCSIVolumesLookupFailed } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index a950690db44..95877569db3 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -105,7 +105,7 @@ type State interface { CSIVolumeByID(memdb.WatchSet, string, string) (*structs.CSIVolume, error) // CSIVolumeByID fetch CSI volumes, containing controller jobs - CSIVolumesByNodeID(memdb.WatchSet, string) (memdb.ResultIterator, error) + CSIVolumesByNodeID(memdb.WatchSet, string, string) (memdb.ResultIterator, error) } // Planner interface is used to submit a task allocation plan. diff --git a/vendor/github.com/hashicorp/nomad/api/csi.go b/vendor/github.com/hashicorp/nomad/api/csi.go index 66ce2731d88..28da54bf0c8 100644 --- a/vendor/github.com/hashicorp/nomad/api/csi.go +++ b/vendor/github.com/hashicorp/nomad/api/csi.go @@ -28,11 +28,6 @@ func (v *CSIVolumes) List(q *QueryOptions) ([]*CSIVolumeListStub, *QueryMeta, er return resp, qm, nil } -// PluginList returns all CSI volumes for the specified plugin id -func (v *CSIVolumes) PluginList(pluginID string) ([]*CSIVolumeListStub, *QueryMeta, error) { - return v.List(&QueryOptions{Prefix: pluginID}) -} - // Info is used to retrieve a single CSIVolume func (v *CSIVolumes) Info(id string, q *QueryOptions) (*CSIVolume, *QueryMeta, error) { var resp CSIVolume From 5836f3bd1b5c794dd2f2b9a9162fe2767295623d Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 18 Mar 2021 08:16:21 -0400 Subject: [PATCH 3/3] resolve comments from code review --- command/volume_deregister.go | 4 ++++ command/volume_detach.go | 4 ++++ command/volume_status_csi.go | 4 ++++ nomad/csi_endpoint.go | 1 - nomad/state/state_store.go | 9 +++++---- 5 files changed, 17 insertions(+), 5 deletions(-) diff --git a/command/volume_deregister.go b/command/volume_deregister.go index 6456e513691..1710f3cafd8 100644 --- a/command/volume_deregister.go +++ b/command/volume_deregister.go @@ -109,6 +109,10 @@ func (c *VolumeDeregisterCommand) Run(args []string) int { c.Ui.Error(fmt.Sprintf("Prefix matched multiple volumes\n\n%s", out)) return 1 } + if len(vols) == 0 { + c.Ui.Error(fmt.Sprintf("No volumes(s) with prefix or ID %q found", volID)) + return 1 + } volID = vols[0].ID // Confirm the -force flag diff --git a/command/volume_detach.go b/command/volume_detach.go index 14d89b3c080..16e453a43dd 100644 --- a/command/volume_detach.go +++ b/command/volume_detach.go @@ -131,6 +131,10 @@ func (c *VolumeDetachCommand) Run(args []string) int { c.Ui.Error(fmt.Sprintf("Prefix matched multiple volumes\n\n%s", out)) return 1 } + if len(vols) == 0 { + c.Ui.Error(fmt.Sprintf("No volumes(s) with prefix or ID %q found", volID)) + return 1 + } volID = vols[0].ID vol, _, err := client.CSIVolumes().Info(volID, nil) diff --git a/command/volume_status_csi.go b/command/volume_status_csi.go index c625d57b586..d81191c072f 100644 --- a/command/volume_status_csi.go +++ b/command/volume_status_csi.go @@ -54,6 +54,10 @@ func (c *VolumeStatusCommand) csiStatus(client *api.Client, id string) int { c.Ui.Error(fmt.Sprintf("Prefix matched multiple volumes\n\n%s", out)) return 1 } + if len(vols) == 0 { + c.Ui.Error(fmt.Sprintf("No volumes(s) with prefix or ID %q found", id)) + return 1 + } id = vols[0].ID // Try querying the volume diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index e5eb0902d33..a68eabb4fa6 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -120,7 +120,6 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV if err != nil { return err } - // TODO: handle prefix arg // Query all volumes var iter memdb.ResultIterator diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 73e605b619d..53d7ce902b8 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2126,10 +2126,12 @@ func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error) func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*structs.CSIVolume, error) { txn := s.db.ReadTxn() - obj, err := txn.First("csi_volumes", "id", namespace, id) + watchCh, obj, err := txn.FirstWatch("csi_volumes", "id", namespace, id) if err != nil { return nil, fmt.Errorf("volume lookup failed for %s: %v", id, err) } + ws.Add(watchCh) + if obj == nil { return nil, nil } @@ -2213,16 +2215,15 @@ func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, prefix, nodeID string txn := s.db.ReadTxn() for id, namespace := range ids { if strings.HasPrefix(id, prefix) { - raw, err := txn.First("csi_volumes", "id", namespace, id) + watchCh, raw, err := txn.FirstWatch("csi_volumes", "id", namespace, id) if err != nil { return nil, fmt.Errorf("volume lookup failed: %s %v", id, err) } + ws.Add(watchCh) iter.Add(raw) } } - ws.Add(iter.WatchCh()) - return iter, nil }