diff --git a/CHANGELOG.md b/CHANGELOG.md index f31814f1cab..36811345a69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,9 +6,11 @@ SECURITY: BUG FIXES: * agent: Only allow querying Prometheus formatted metrics if Prometheus is enabled within the config [[GH-10140](https://github.com/hashicorp/nomad/pull/10140)] * api: Added missing devices block to AllocatedTaskResources [[GH-10064](https://github.com/hashicorp/nomad/pull/10064)] + * api: Removed unimplemented `CSIVolumes.PluginList` API. [[GH-10158](https://github.com/hashicorp/nomad/issues/10158)] * cli: Fixed a bug where non-int proxy port would panic CLI [[GH-10072](https://github.com/hashicorp/nomad/issues/10072)] * cli: Fixed a bug where `nomad operator debug` incorrectly parsed https Consul API URLs. [[GH-10082](https://github.com/hashicorp/nomad/pull/10082)] * client: Fixed log formatting when killing tasks. [[GH-10135](https://github.com/hashicorp/nomad/issues/10135)] + * csi: Fixed a bug where volume with IDs that are a substring prefix of another volume could use the wrong volume for feasibility checking. [[GH-10158](https://github.com/hashicorp/nomad/issues/10158)] * scheduler: Fixed a bug where jobs requesting multiple CSI volumes could be incorrectly scheduled if only one of the volumes passed feasibility checking. [[GH-10143](https://github.com/hashicorp/nomad/issues/10143)] * ui: Fixed the rendering of interstitial components shown after processing a dynamic application sizing recommendation. [[GH-10094](https://github.com/hashicorp/nomad/pull/10094)] diff --git a/api/csi.go b/api/csi.go index 66ce2731d88..28da54bf0c8 100644 --- a/api/csi.go +++ b/api/csi.go @@ -28,11 +28,6 @@ func (v *CSIVolumes) List(q *QueryOptions) ([]*CSIVolumeListStub, *QueryMeta, er return resp, qm, nil } -// PluginList returns all CSI volumes for the specified plugin id -func (v *CSIVolumes) PluginList(pluginID string) ([]*CSIVolumeListStub, *QueryMeta, error) { - return v.List(&QueryOptions{Prefix: pluginID}) -} - // Info is used to retrieve a single CSIVolume func (v *CSIVolumes) Info(id string, q *QueryOptions) (*CSIVolume, *QueryMeta, error) { var resp CSIVolume diff --git a/command/agent/csi_endpoint.go b/command/agent/csi_endpoint.go index 15d09893586..1a2b91e8237 100644 --- a/command/agent/csi_endpoint.go +++ b/command/agent/csi_endpoint.go @@ -34,7 +34,9 @@ func (s *HTTPServer) CSIVolumesRequest(resp http.ResponseWriter, req *http.Reque if s.parse(resp, req, &args.Region, &args.QueryOptions) { return nil, nil } - + if prefix, ok := query["prefix"]; ok { + args.Prefix = prefix[0] + } if plugin, ok := query["plugin_id"]; ok { args.PluginID = plugin[0] } diff --git a/command/volume_deregister.go b/command/volume_deregister.go index 83ec35595cf..1710f3cafd8 100644 --- a/command/volume_deregister.go +++ b/command/volume_deregister.go @@ -2,8 +2,10 @@ package command import ( "fmt" + "sort" "strings" + "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/api/contexts" "github.com/posener/complete" ) @@ -91,6 +93,28 @@ func (c *VolumeDeregisterCommand) Run(args []string) int { return 1 } + // Prefix search for the volume + vols, _, err := client.CSIVolumes().List(&api.QueryOptions{Prefix: volID}) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying volumes: %s", err)) + return 1 + } + if len(vols) > 1 { + sort.Slice(vols, func(i, j int) bool { return vols[i].ID < vols[j].ID }) + out, err := csiFormatSortedVolumes(vols, shortId) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error formatting: %s", err)) + return 1 + } + c.Ui.Error(fmt.Sprintf("Prefix matched multiple volumes\n\n%s", out)) + return 1 + } + if len(vols) == 0 { + c.Ui.Error(fmt.Sprintf("No volumes(s) with prefix or ID %q found", volID)) + return 1 + } + volID = vols[0].ID + // Confirm the -force flag if force { question := fmt.Sprintf("Are you sure you want to force deregister volume %q? [y/N]", volID) diff --git a/command/volume_detach.go b/command/volume_detach.go index ec6855c75e6..16e453a43dd 100644 --- a/command/volume_detach.go +++ b/command/volume_detach.go @@ -2,8 +2,10 @@ package command import ( "fmt" + "sort" "strings" + "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/api/contexts" "github.com/posener/complete" ) @@ -112,6 +114,29 @@ func (c *VolumeDetachCommand) Run(args []string) int { // volume's claimed allocations, otherwise just use the node ID we've been // given. if len(nodes) == 0 { + + // Prefix search for the volume + vols, _, err := client.CSIVolumes().List(&api.QueryOptions{Prefix: volID}) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying volumes: %s", err)) + return 1 + } + if len(vols) > 1 { + sort.Slice(vols, func(i, j int) bool { return vols[i].ID < vols[j].ID }) + out, err := csiFormatSortedVolumes(vols, shortId) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error formatting: %s", err)) + return 1 + } + c.Ui.Error(fmt.Sprintf("Prefix matched multiple volumes\n\n%s", out)) + return 1 + } + if len(vols) == 0 { + c.Ui.Error(fmt.Sprintf("No volumes(s) with prefix or ID %q found", volID)) + return 1 + } + volID = vols[0].ID + vol, _, err := client.CSIVolumes().Info(volID, nil) if err != nil { c.Ui.Error(fmt.Sprintf("Error querying volume: %s", err)) diff --git a/command/volume_status_csi.go b/command/volume_status_csi.go index 95f6883bf75..d81191c072f 100644 --- a/command/volume_status_csi.go +++ b/command/volume_status_csi.go @@ -39,6 +39,27 @@ func (c *VolumeStatusCommand) csiStatus(client *api.Client, id string) int { return 0 } + // Prefix search for the volume + vols, _, err := client.CSIVolumes().List(&api.QueryOptions{Prefix: id}) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying volumes: %s", err)) + return 1 + } + if len(vols) > 1 { + out, err := c.csiFormatVolumes(vols) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error formatting: %s", err)) + return 1 + } + c.Ui.Error(fmt.Sprintf("Prefix matched multiple volumes\n\n%s", out)) + return 1 + } + if len(vols) == 0 { + c.Ui.Error(fmt.Sprintf("No volumes(s) with prefix or ID %q found", id)) + return 1 + } + id = vols[0].ID + // Try querying the volume vol, _, err := client.CSIVolumes().Info(id, nil) if err != nil { @@ -68,11 +89,16 @@ func (c *VolumeStatusCommand) csiFormatVolumes(vols []*api.CSIVolumeListStub) (s return out, nil } + return csiFormatSortedVolumes(vols, c.length) +} + +// Format the volumes, assumes that we're already sorted by volume ID +func csiFormatSortedVolumes(vols []*api.CSIVolumeListStub, length int) (string, error) { rows := make([]string, len(vols)+1) rows[0] = "ID|Name|Plugin ID|Schedulable|Access Mode" for i, v := range vols { rows[i+1] = fmt.Sprintf("%s|%s|%s|%t|%s", - limit(v.ID, c.length), + limit(v.ID, length), v.Name, v.PluginID, v.Schedulable, diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 19df0de9c04..a68eabb4fa6 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -120,15 +120,18 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV if err != nil { return err } + // Query all volumes var iter memdb.ResultIterator + prefix := args.Prefix + if args.NodeID != "" { - iter, err = snap.CSIVolumesByNodeID(ws, args.NodeID) + iter, err = snap.CSIVolumesByNodeID(ws, prefix, args.NodeID) } else if args.PluginID != "" { - iter, err = snap.CSIVolumesByPluginID(ws, ns, args.PluginID) + iter, err = snap.CSIVolumesByPluginID(ws, ns, prefix, args.PluginID) } else { - iter, err = snap.CSIVolumesByNamespace(ws, ns) + iter, err = snap.CSIVolumesByNamespace(ws, ns, prefix) } if err != nil { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 2317631e2cb..ba0f94712af 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2126,26 +2126,28 @@ func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error) func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*structs.CSIVolume, error) { txn := s.db.ReadTxn() - watchCh, obj, err := txn.FirstWatch("csi_volumes", "id_prefix", namespace, id) + watchCh, obj, err := txn.FirstWatch("csi_volumes", "id", namespace, id) if err != nil { - return nil, fmt.Errorf("volume lookup failed: %s %v", id, err) + return nil, fmt.Errorf("volume lookup failed for %s: %v", id, err) } - ws.Add(watchCh) if obj == nil { return nil, nil } + vol, ok := obj.(*structs.CSIVolume) + if !ok { + return nil, fmt.Errorf("volume row conversion error") + } // we return the volume with the plugins denormalized by default, // because the scheduler needs them for feasibility checking - vol := obj.(*structs.CSIVolume) return s.CSIVolumeDenormalizePluginsTxn(txn, vol.Copy()) } // CSIVolumes looks up csi_volumes by pluginID. Caller should snapshot if it // wants to also denormalize the plugins. -func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, pluginID string) (memdb.ResultIterator, error) { +func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, prefix, pluginID string) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() iter, err := txn.Get("csi_volumes", "plugin_id", pluginID) @@ -2159,7 +2161,7 @@ func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, pluginID if !ok { return false } - return v.Namespace != namespace + return v.Namespace != namespace && strings.HasPrefix(v.ID, prefix) } wrap := memdb.NewFilterIterator(iter, f) @@ -2183,7 +2185,7 @@ func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID // CSIVolumesByNodeID looks up CSIVolumes in use on a node. Caller should // snapshot if it wants to also denormalize the plugins. -func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, nodeID string) (memdb.ResultIterator, error) { +func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, prefix, nodeID string) (memdb.ResultIterator, error) { allocs, err := s.AllocsByNode(ws, nodeID) if err != nil { return nil, fmt.Errorf("alloc lookup failed: %v", err) @@ -2212,23 +2214,24 @@ func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, nodeID string) (memdb iter := NewSliceIterator() txn := s.db.ReadTxn() for id, namespace := range ids { - raw, err := txn.First("csi_volumes", "id", namespace, id) - if err != nil { - return nil, fmt.Errorf("volume lookup failed: %s %v", id, err) + if strings.HasPrefix(id, prefix) { + watchCh, raw, err := txn.FirstWatch("csi_volumes", "id", namespace, id) + if err != nil { + return nil, fmt.Errorf("volume lookup failed: %s %v", id, err) + } + ws.Add(watchCh) + iter.Add(raw) } - iter.Add(raw) } - ws.Add(iter.WatchCh()) - return iter, nil } // CSIVolumesByNamespace looks up the entire csi_volumes table -func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { +func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace, prefix string) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() - iter, err := txn.Get("csi_volumes", "id_prefix", namespace, "") + iter, err := txn.Get("csi_volumes", "id_prefix", namespace, prefix) if err != nil { return nil, fmt.Errorf("volume lookup failed: %v", err) } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 8f01cac7835..6747c3d05e2 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -2913,7 +2913,7 @@ func TestStateStore_CSIVolume(t *testing.T) { require.Error(t, err, fmt.Sprintf("volume exists: %s", v0.ID)) ws := memdb.NewWatchSet() - iter, err := state.CSIVolumesByNamespace(ws, ns) + iter, err := state.CSIVolumesByNamespace(ws, ns, "") require.NoError(t, err) slurp := func(iter memdb.ResultIterator) (vs []*structs.CSIVolume) { @@ -2932,13 +2932,13 @@ func TestStateStore_CSIVolume(t *testing.T) { require.Equal(t, 2, len(vs)) ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie") + iter, err = state.CSIVolumesByPluginID(ws, ns, "", "minnie") require.NoError(t, err) vs = slurp(iter) require.Equal(t, 1, len(vs)) ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByNodeID(ws, node.ID) + iter, err = state.CSIVolumesByNodeID(ws, "", node.ID) require.NoError(t, err) vs = slurp(iter) require.Equal(t, 1, len(vs)) @@ -2973,7 +2973,7 @@ func TestStateStore_CSIVolume(t *testing.T) { require.NoError(t, err) ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie") + iter, err = state.CSIVolumesByPluginID(ws, ns, "", "minnie") require.NoError(t, err) vs = slurp(iter) require.False(t, vs[0].WriteFreeClaims()) @@ -2982,7 +2982,7 @@ func TestStateStore_CSIVolume(t *testing.T) { err = state.CSIVolumeClaim(2, ns, vol0, claim0) require.NoError(t, err) ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie") + iter, err = state.CSIVolumesByPluginID(ws, ns, "", "minnie") require.NoError(t, err) vs = slurp(iter) require.True(t, vs[0].ReadSchedulable()) @@ -3018,13 +3018,13 @@ func TestStateStore_CSIVolume(t *testing.T) { // List, now omitting the deregistered volume ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie") + iter, err = state.CSIVolumesByPluginID(ws, ns, "", "minnie") require.NoError(t, err) vs = slurp(iter) require.Equal(t, 0, len(vs)) ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByNamespace(ws, ns) + iter, err = state.CSIVolumesByNamespace(ws, ns, "") require.NoError(t, err) vs = slurp(iter) require.Equal(t, 1, len(vs)) diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 399ab457e86..0d6346084de 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -266,7 +266,7 @@ func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) (bool, string) { // Find the count per plugin for this node, so that can enforce MaxVolumes pluginCount := map[string]int64{} - iter, err := c.ctx.State().CSIVolumesByNodeID(ws, n.ID) + iter, err := c.ctx.State().CSIVolumesByNodeID(ws, "", n.ID) if err != nil { return false, FilterConstraintCSIVolumesLookupFailed } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index a950690db44..95877569db3 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -105,7 +105,7 @@ type State interface { CSIVolumeByID(memdb.WatchSet, string, string) (*structs.CSIVolume, error) // CSIVolumeByID fetch CSI volumes, containing controller jobs - CSIVolumesByNodeID(memdb.WatchSet, string) (memdb.ResultIterator, error) + CSIVolumesByNodeID(memdb.WatchSet, string, string) (memdb.ResultIterator, error) } // Planner interface is used to submit a task allocation plan. diff --git a/vendor/github.com/hashicorp/nomad/api/csi.go b/vendor/github.com/hashicorp/nomad/api/csi.go index 66ce2731d88..28da54bf0c8 100644 --- a/vendor/github.com/hashicorp/nomad/api/csi.go +++ b/vendor/github.com/hashicorp/nomad/api/csi.go @@ -28,11 +28,6 @@ func (v *CSIVolumes) List(q *QueryOptions) ([]*CSIVolumeListStub, *QueryMeta, er return resp, qm, nil } -// PluginList returns all CSI volumes for the specified plugin id -func (v *CSIVolumes) PluginList(pluginID string) ([]*CSIVolumeListStub, *QueryMeta, error) { - return v.List(&QueryOptions{Prefix: pluginID}) -} - // Info is used to retrieve a single CSIVolume func (v *CSIVolumes) Info(id string, q *QueryOptions) (*CSIVolume, *QueryMeta, error) { var resp CSIVolume