CSI: remove prefix matching from CSIVolumeByID and fix CLI prefix matching #10158

Merged: 3 commits, Mar 18, 2021
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -3,9 +3,11 @@
BUG FIXES:
* agent: Only allow querying Prometheus formatted metrics if Prometheus is enabled within the config [[GH-10140](https://github.com/hashicorp/nomad/pull/10140)]
* api: Added missing devices block to AllocatedTaskResources [[GH-10064](https://github.com/hashicorp/nomad/pull/10064)]
* api: Removed unimplemented `CSIVolumes.PluginList` API. [[GH-10158](https://github.com/hashicorp/nomad/issues/10158)]
* cli: Fixed a bug where a non-integer proxy port would cause the CLI to panic [[GH-10072](https://github.com/hashicorp/nomad/issues/10072)]
* cli: Fixed a bug where `nomad operator debug` incorrectly parsed https Consul API URLs. [[GH-10082](https://github.com/hashicorp/nomad/pull/10082)]
* client: Fixed log formatting when killing tasks. [[GH-10135](https://github.com/hashicorp/nomad/issues/10135)]
* csi: Fixed a bug where volumes with IDs that are a prefix of another volume's ID could use the wrong volume for feasibility checking. [[GH-10158](https://github.com/hashicorp/nomad/issues/10158)]
* scheduler: Fixed a bug where jobs requesting multiple CSI volumes could be incorrectly scheduled if only one of the volumes passed feasibility checking. [[GH-10143](https://github.com/hashicorp/nomad/issues/10143)]
* ui: Fixed the rendering of interstitial components shown after processing a dynamic application sizing recommendation. [[GH-10094](https://github.com/hashicorp/nomad/pull/10094)]
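The csi entry above describes a lookup-by-prefix bug: resolving an exact volume ID through a prefix index can silently return a different volume whose ID merely starts with the queried string. A minimal sketch of the failure mode, with hypothetical helper names standing in for the memdb `id_prefix` and `id` index lookups:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// firstByPrefix is a stand-in for the old FirstWatch("csi_volumes", "id_prefix", ...)
// behavior: it returns the first volume whose ID starts with the queried string.
func firstByPrefix(ids []string, query string) string {
	sort.Strings(ids) // memdb iterates its radix tree in sorted order
	for _, id := range ids {
		if strings.HasPrefix(id, query) {
			return id
		}
	}
	return ""
}

// firstExact models the fixed lookup on the exact "id" index.
func firstExact(ids []string, query string) string {
	for _, id := range ids {
		if id == query {
			return id
		}
	}
	return ""
}

func main() {
	ids := []string{"volume-10"} // "volume-1" was never registered
	fmt.Println(firstByPrefix(ids, "volume-1")) // old behavior: matches "volume-10", the wrong volume
	fmt.Println(firstExact(ids, "volume-1"))    // fixed behavior: not found
}
```

With an exact-index lookup the scheduler gets "volume not found" instead of feasibility-checking against an unrelated volume that happens to share the prefix.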

5 changes: 0 additions & 5 deletions api/csi.go
@@ -28,11 +28,6 @@ func (v *CSIVolumes) List(q *QueryOptions) ([]*CSIVolumeListStub, *QueryMeta, er
return resp, qm, nil
}

// PluginList returns all CSI volumes for the specified plugin id
func (v *CSIVolumes) PluginList(pluginID string) ([]*CSIVolumeListStub, *QueryMeta, error) {
return v.List(&QueryOptions{Prefix: pluginID})
@tgross (Member, Author) commented on Mar 10, 2021:

CSIVolumes.List has always been how you get this info, and this code has never worked. It has no callers either, so I'm removing it here. I can't imagine this will break anyone's code given that it didn't work, but tagging @cgbaker as a reviewer to see if he has a different opinion.

Contributor replied:

Sounds good to me; we're not using it in the Terraform provider, just List.

}

// Info is used to retrieve a single CSIVolume
func (v *CSIVolumes) Info(id string, q *QueryOptions) (*CSIVolume, *QueryMeta, error) {
var resp CSIVolume
4 changes: 3 additions & 1 deletion command/agent/csi_endpoint.go
@@ -34,7 +34,9 @@ func (s *HTTPServer) CSIVolumesRequest(resp http.ResponseWriter, req *http.Reque
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}

if prefix, ok := query["prefix"]; ok {
args.Prefix = prefix[0]
}
if plugin, ok := query["plugin_id"]; ok {
args.PluginID = plugin[0]
}
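The endpoint change above copies an optional `prefix` query parameter into the RPC arguments alongside the existing `plugin_id` handling. A self-contained sketch of that parsing pattern using only the standard library; the `volumeListArgs` struct is illustrative, not Nomad's actual request type:

```go
package main

import (
	"fmt"
	"net/url"
)

// volumeListArgs is a hypothetical stand-in for the RPC request arguments.
type volumeListArgs struct {
	Prefix   string
	PluginID string
}

// parseVolumeQuery mirrors the handler's pattern: each optional parameter is
// copied into the args struct only if it is present in the query string.
func parseVolumeQuery(rawQuery string) (volumeListArgs, error) {
	var args volumeListArgs
	query, err := url.ParseQuery(rawQuery)
	if err != nil {
		return args, err
	}
	if prefix, ok := query["prefix"]; ok {
		args.Prefix = prefix[0]
	}
	if plugin, ok := query["plugin_id"]; ok {
		args.PluginID = plugin[0]
	}
	return args, nil
}

func main() {
	args, _ := parseVolumeQuery("prefix=volume-1&plugin_id=ebs")
	fmt.Printf("%+v\n", args) // {Prefix:volume-1 PluginID:ebs}
}
```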
24 changes: 24 additions & 0 deletions command/volume_deregister.go
@@ -2,8 +2,10 @@ package command

import (
"fmt"
"sort"
"strings"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)
@@ -91,6 +93,28 @@ func (c *VolumeDeregisterCommand) Run(args []string) int {
return 1
}

// Prefix search for the volume
Contributor commented:

🏆 these are so helpful to have...

vols, _, err := client.CSIVolumes().List(&api.QueryOptions{Prefix: volID})
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying volumes: %s", err))
return 1
}
if len(vols) > 1 {
sort.Slice(vols, func(i, j int) bool { return vols[i].ID < vols[j].ID })
out, err := csiFormatSortedVolumes(vols, shortId)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error formatting: %s", err))
return 1
}
c.Ui.Error(fmt.Sprintf("Prefix matched multiple volumes\n\n%s", out))
return 1
}
if len(vols) == 0 {
c.Ui.Error(fmt.Sprintf("No volume(s) with prefix or ID %q found", volID))
return 1
}
volID = vols[0].ID
tgross marked this conversation as resolved.

// Confirm the -force flag
if force {
question := fmt.Sprintf("Are you sure you want to force deregister volume %q? [y/N]", volID)
25 changes: 25 additions & 0 deletions command/volume_detach.go
@@ -2,8 +2,10 @@ package command

import (
"fmt"
"sort"
"strings"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)
@@ -112,6 +114,29 @@ func (c *VolumeDetachCommand) Run(args []string) int {
// volume's claimed allocations, otherwise just use the node ID we've been
// given.
if len(nodes) == 0 {

// Prefix search for the volume
vols, _, err := client.CSIVolumes().List(&api.QueryOptions{Prefix: volID})
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying volumes: %s", err))
return 1
}
if len(vols) > 1 {
sort.Slice(vols, func(i, j int) bool { return vols[i].ID < vols[j].ID })
out, err := csiFormatSortedVolumes(vols, shortId)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error formatting: %s", err))
return 1
}
c.Ui.Error(fmt.Sprintf("Prefix matched multiple volumes\n\n%s", out))
return 1
}
if len(vols) == 0 {
c.Ui.Error(fmt.Sprintf("No volume(s) with prefix or ID %q found", volID))
return 1
}
volID = vols[0].ID
tgross marked this conversation as resolved.

vol, _, err := client.CSIVolumes().Info(volID, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying volume: %s", err))
28 changes: 27 additions & 1 deletion command/volume_status_csi.go
@@ -39,6 +39,27 @@ func (c *VolumeStatusCommand) csiStatus(client *api.Client, id string) int {
return 0
}

// Prefix search for the volume
vols, _, err := client.CSIVolumes().List(&api.QueryOptions{Prefix: id})
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying volumes: %s", err))
return 1
}
if len(vols) > 1 {
out, err := c.csiFormatVolumes(vols)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error formatting: %s", err))
return 1
}
c.Ui.Error(fmt.Sprintf("Prefix matched multiple volumes\n\n%s", out))
return 1
}
if len(vols) == 0 {
c.Ui.Error(fmt.Sprintf("No volume(s) with prefix or ID %q found", id))
return 1
}
id = vols[0].ID
tgross marked this conversation as resolved.

// Try querying the volume
vol, _, err := client.CSIVolumes().Info(id, nil)
if err != nil {
@@ -68,11 +89,16 @@ func (c *VolumeStatusCommand) csiFormatVolumes(vols []*api.CSIVolumeListStub) (s
return out, nil
}

return csiFormatSortedVolumes(vols, c.length)
}

// csiFormatSortedVolumes formats the volume rows; it assumes the input is
// already sorted by volume ID.
func csiFormatSortedVolumes(vols []*api.CSIVolumeListStub, length int) (string, error) {
rows := make([]string, len(vols)+1)
rows[0] = "ID|Name|Plugin ID|Schedulable|Access Mode"
for i, v := range vols {
rows[i+1] = fmt.Sprintf("%s|%s|%s|%t|%s",
limit(v.ID, c.length),
limit(v.ID, length),
v.Name,
v.PluginID,
v.Schedulable,
9 changes: 6 additions & 3 deletions nomad/csi_endpoint.go
@@ -120,15 +120,18 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV
if err != nil {
return err
}

// Query all volumes
var iter memdb.ResultIterator

prefix := args.Prefix

if args.NodeID != "" {
iter, err = snap.CSIVolumesByNodeID(ws, args.NodeID)
iter, err = snap.CSIVolumesByNodeID(ws, prefix, args.NodeID)
} else if args.PluginID != "" {
iter, err = snap.CSIVolumesByPluginID(ws, ns, args.PluginID)
iter, err = snap.CSIVolumesByPluginID(ws, ns, prefix, args.PluginID)
} else {
iter, err = snap.CSIVolumesByNamespace(ws, ns)
iter, err = snap.CSIVolumesByNamespace(ws, ns, prefix)
}

if err != nil {
33 changes: 18 additions & 15 deletions nomad/state/state_store.go
@@ -2126,26 +2126,28 @@ func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error)
func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*structs.CSIVolume, error) {
txn := s.db.ReadTxn()

watchCh, obj, err := txn.FirstWatch("csi_volumes", "id_prefix", namespace, id)
watchCh, obj, err := txn.FirstWatch("csi_volumes", "id", namespace, id)
if err != nil {
return nil, fmt.Errorf("volume lookup failed: %s %v", id, err)
return nil, fmt.Errorf("volume lookup failed for %s: %v", id, err)
}

ws.Add(watchCh)

if obj == nil {
return nil, nil
}
vol, ok := obj.(*structs.CSIVolume)
if !ok {
return nil, fmt.Errorf("volume row conversion error")
}

// we return the volume with the plugins denormalized by default,
// because the scheduler needs them for feasibility checking
vol := obj.(*structs.CSIVolume)
return s.CSIVolumeDenormalizePluginsTxn(txn, vol.Copy())
}

// CSIVolumes looks up csi_volumes by pluginID. Caller should snapshot if it
// wants to also denormalize the plugins.
func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, pluginID string) (memdb.ResultIterator, error) {
func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, prefix, pluginID string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

iter, err := txn.Get("csi_volumes", "plugin_id", pluginID)
@@ -2159,7 +2161,7 @@ func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, pluginID
if !ok {
return false
}
return v.Namespace != namespace
	return v.Namespace != namespace || !strings.HasPrefix(v.ID, prefix)
}

wrap := memdb.NewFilterIterator(iter, f)
@@ -2183,7 +2185,7 @@ func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID

// CSIVolumesByNodeID looks up CSIVolumes in use on a node. Caller should
// snapshot if it wants to also denormalize the plugins.
func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, nodeID string) (memdb.ResultIterator, error) {
func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, prefix, nodeID string) (memdb.ResultIterator, error) {
allocs, err := s.AllocsByNode(ws, nodeID)
if err != nil {
return nil, fmt.Errorf("alloc lookup failed: %v", err)
@@ -2212,23 +2214,24 @@ func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, nodeID string) (memdb
iter := NewSliceIterator()
txn := s.db.ReadTxn()
for id, namespace := range ids {
raw, err := txn.First("csi_volumes", "id", namespace, id)
if err != nil {
return nil, fmt.Errorf("volume lookup failed: %s %v", id, err)
if strings.HasPrefix(id, prefix) {
watchCh, raw, err := txn.FirstWatch("csi_volumes", "id", namespace, id)
if err != nil {
return nil, fmt.Errorf("volume lookup failed: %s %v", id, err)
}
ws.Add(watchCh)
iter.Add(raw)
}
iter.Add(raw)
}

ws.Add(iter.WatchCh())

return iter, nil
}

// CSIVolumesByNamespace looks up the entire csi_volumes table
func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace, prefix string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

iter, err := txn.Get("csi_volumes", "id_prefix", namespace, "")
iter, err := txn.Get("csi_volumes", "id_prefix", namespace, prefix)
if err != nil {
return nil, fmt.Errorf("volume lookup failed: %v", err)
}
14 changes: 7 additions & 7 deletions nomad/state/state_store_test.go
@@ -2913,7 +2913,7 @@ func TestStateStore_CSIVolume(t *testing.T) {
require.Error(t, err, fmt.Sprintf("volume exists: %s", v0.ID))

ws := memdb.NewWatchSet()
iter, err := state.CSIVolumesByNamespace(ws, ns)
iter, err := state.CSIVolumesByNamespace(ws, ns, "")
require.NoError(t, err)

slurp := func(iter memdb.ResultIterator) (vs []*structs.CSIVolume) {
@@ -2932,13 +2932,13 @@ func TestStateStore_CSIVolume(t *testing.T) {
require.Equal(t, 2, len(vs))

ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie")
iter, err = state.CSIVolumesByPluginID(ws, ns, "", "minnie")
require.NoError(t, err)
vs = slurp(iter)
require.Equal(t, 1, len(vs))

ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByNodeID(ws, node.ID)
iter, err = state.CSIVolumesByNodeID(ws, "", node.ID)
require.NoError(t, err)
vs = slurp(iter)
require.Equal(t, 1, len(vs))
@@ -2973,7 +2973,7 @@ func TestStateStore_CSIVolume(t *testing.T) {
require.NoError(t, err)

ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie")
iter, err = state.CSIVolumesByPluginID(ws, ns, "", "minnie")
require.NoError(t, err)
vs = slurp(iter)
require.False(t, vs[0].WriteFreeClaims())
@@ -2982,7 +2982,7 @@ func TestStateStore_CSIVolume(t *testing.T) {
err = state.CSIVolumeClaim(2, ns, vol0, claim0)
require.NoError(t, err)
ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie")
iter, err = state.CSIVolumesByPluginID(ws, ns, "", "minnie")
require.NoError(t, err)
vs = slurp(iter)
require.True(t, vs[0].ReadSchedulable())
@@ -3018,13 +3018,13 @@ func TestStateStore_CSIVolume(t *testing.T) {

// List, now omitting the deregistered volume
ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie")
iter, err = state.CSIVolumesByPluginID(ws, ns, "", "minnie")
require.NoError(t, err)
vs = slurp(iter)
require.Equal(t, 0, len(vs))

ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByNamespace(ws, ns)
iter, err = state.CSIVolumesByNamespace(ws, ns, "")
require.NoError(t, err)
vs = slurp(iter)
require.Equal(t, 1, len(vs))
2 changes: 1 addition & 1 deletion scheduler/feasible.go
@@ -266,7 +266,7 @@ func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) (bool, string) {

// Find the count per plugin for this node, so that can enforce MaxVolumes
pluginCount := map[string]int64{}
iter, err := c.ctx.State().CSIVolumesByNodeID(ws, n.ID)
iter, err := c.ctx.State().CSIVolumesByNodeID(ws, "", n.ID)
if err != nil {
return false, FilterConstraintCSIVolumesLookupFailed
}
2 changes: 1 addition & 1 deletion scheduler/scheduler.go
@@ -105,7 +105,7 @@ type State interface {
CSIVolumeByID(memdb.WatchSet, string, string) (*structs.CSIVolume, error)

// CSIVolumesByNodeID fetches the CSI volumes in use on a node
CSIVolumesByNodeID(memdb.WatchSet, string) (memdb.ResultIterator, error)
CSIVolumesByNodeID(memdb.WatchSet, string, string) (memdb.ResultIterator, error)
}

// Planner interface is used to submit a task allocation plan.
5 changes: 0 additions & 5 deletions vendor/github.com/hashicorp/nomad/api/csi.go

(Generated file; diff not rendered by default.)