Skip to content

Commit

Permalink
csi: reimplement prefix matching
Browse files Browse the repository at this point in the history
Removing the incorrect prefix matching from `CSIVolumeByID` breaks prefix
matching in the command line client. Add the required elements for prefix
matching to the commands and API.
  • Loading branch information
tgross committed Mar 10, 2021
1 parent 97bb912 commit 25782f3
Show file tree
Hide file tree
Showing 12 changed files with 96 additions and 33 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
BUG FIXES:
* agent: Only allow querying Prometheus formatted metrics if Prometheus is enabled within the config [[GH-10140](https://github.com/hashicorp/nomad/pull/10140)]
* api: Added missing devices block to AllocatedTaskResources [[GH-10064](https://github.com/hashicorp/nomad/pull/10064)]
* api: Removed unimplemented `CSIVolumes.PluginList` API. [[GH-10158](https://github.com/hashicorp/nomad/issues/10158)]
* cli: Fixed a bug where non-int proxy port would panic CLI [[GH-10072](https://github.com/hashicorp/nomad/issues/10072)]
* cli: Fixed a bug where `nomad operator debug` incorrectly parsed https Consul API URLs. [[GH-10082](https://github.com/hashicorp/nomad/pull/10082)]
* client: Fixed log formatting when killing tasks. [[GH-10135](https://github.com/hashicorp/nomad/issues/10135)]
* csi: Fixed a bug where volume with IDs that are a substring prefix of another volume could use the wrong volume for feasibility checking. [[GH-10158](https://github.com/hashicorp/nomad/issues/10158)]
* scheduler: Fixed a bug where jobs requesting multiple CSI volumes could be incorrectly scheduled if only one of the volumes passed feasibility checking. [[GH-10143](https://github.com/hashicorp/nomad/issues/10143)]
* ui: Fixed the rendering of interstitial components shown after processing a dynamic application sizing recommendation. [[GH-10094](https://github.com/hashicorp/nomad/pull/10094)]

Expand Down
5 changes: 0 additions & 5 deletions api/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ func (v *CSIVolumes) List(q *QueryOptions) ([]*CSIVolumeListStub, *QueryMeta, er
return resp, qm, nil
}

// PluginList returns all CSI volumes for the specified plugin id
func (v *CSIVolumes) PluginList(pluginID string) ([]*CSIVolumeListStub, *QueryMeta, error) {
return v.List(&QueryOptions{Prefix: pluginID})
}

// Info is used to retrieve a single CSIVolume
func (v *CSIVolumes) Info(id string, q *QueryOptions) (*CSIVolume, *QueryMeta, error) {
var resp CSIVolume
Expand Down
4 changes: 3 additions & 1 deletion command/agent/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ func (s *HTTPServer) CSIVolumesRequest(resp http.ResponseWriter, req *http.Reque
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}

if prefix, ok := query["prefix"]; ok {
args.Prefix = prefix[0]
}
if plugin, ok := query["plugin_id"]; ok {
args.PluginID = plugin[0]
}
Expand Down
20 changes: 20 additions & 0 deletions command/volume_deregister.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package command

import (
"fmt"
"sort"
"strings"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)
Expand Down Expand Up @@ -91,6 +93,24 @@ func (c *VolumeDeregisterCommand) Run(args []string) int {
return 1
}

// Prefix search for the volume
vols, _, err := client.CSIVolumes().List(&api.QueryOptions{Prefix: volID})
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying volumes: %s", err))
return 1
}
if len(vols) > 1 {
sort.Slice(vols, func(i, j int) bool { return vols[i].ID < vols[j].ID })
out, err := csiFormatSortedVolumes(vols, shortId)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error formatting: %s", err))
return 1
}
c.Ui.Error(fmt.Sprintf("Prefix matched multiple volumes\n\n%s", out))
return 1
}
volID = vols[0].ID

// Confirm the -force flag
if force {
question := fmt.Sprintf("Are you sure you want to force deregister volume %q? [y/N]", volID)
Expand Down
21 changes: 21 additions & 0 deletions command/volume_detach.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package command

import (
"fmt"
"sort"
"strings"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)
Expand Down Expand Up @@ -112,6 +114,25 @@ func (c *VolumeDetachCommand) Run(args []string) int {
// volume's claimed allocations, otherwise just use the node ID we've been
// given.
if len(nodes) == 0 {

// Prefix search for the volume
vols, _, err := client.CSIVolumes().List(&api.QueryOptions{Prefix: volID})
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying volumes: %s", err))
return 1
}
if len(vols) > 1 {
sort.Slice(vols, func(i, j int) bool { return vols[i].ID < vols[j].ID })
out, err := csiFormatSortedVolumes(vols, shortId)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error formatting: %s", err))
return 1
}
c.Ui.Error(fmt.Sprintf("Prefix matched multiple volumes\n\n%s", out))
return 1
}
volID = vols[0].ID

vol, _, err := client.CSIVolumes().Info(volID, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying volume: %s", err))
Expand Down
24 changes: 23 additions & 1 deletion command/volume_status_csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,23 @@ func (c *VolumeStatusCommand) csiStatus(client *api.Client, id string) int {
return 0
}

// Prefix search for the volume
vols, _, err := client.CSIVolumes().List(&api.QueryOptions{Prefix: id})
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying volumes: %s", err))
return 1
}
if len(vols) > 1 {
out, err := c.csiFormatVolumes(vols)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error formatting: %s", err))
return 1
}
c.Ui.Error(fmt.Sprintf("Prefix matched multiple volumes\n\n%s", out))
return 1
}
id = vols[0].ID

// Try querying the volume
vol, _, err := client.CSIVolumes().Info(id, nil)
if err != nil {
Expand Down Expand Up @@ -68,11 +85,16 @@ func (c *VolumeStatusCommand) csiFormatVolumes(vols []*api.CSIVolumeListStub) (s
return out, nil
}

return csiFormatSortedVolumes(vols, c.length)
}

// Format the volumes, assumes that we're already sorted by volume ID
func csiFormatSortedVolumes(vols []*api.CSIVolumeListStub, length int) (string, error) {
rows := make([]string, len(vols)+1)
rows[0] = "ID|Name|Plugin ID|Schedulable|Access Mode"
for i, v := range vols {
rows[i+1] = fmt.Sprintf("%s|%s|%s|%t|%s",
limit(v.ID, c.length),
limit(v.ID, length),
v.Name,
v.PluginID,
v.Schedulable,
Expand Down
10 changes: 7 additions & 3 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,19 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV
if err != nil {
return err
}
// TODO: handle prefix arg

// Query all volumes
var iter memdb.ResultIterator

prefix := args.Prefix

if args.NodeID != "" {
iter, err = snap.CSIVolumesByNodeID(ws, args.NodeID)
iter, err = snap.CSIVolumesByNodeID(ws, prefix, args.NodeID)
} else if args.PluginID != "" {
iter, err = snap.CSIVolumesByPluginID(ws, ns, args.PluginID)
iter, err = snap.CSIVolumesByPluginID(ws, ns, prefix, args.PluginID)
} else {
iter, err = snap.CSIVolumesByNamespace(ws, ns)
iter, err = snap.CSIVolumesByNamespace(ws, ns, prefix)
}

if err != nil {
Expand Down
20 changes: 11 additions & 9 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2145,7 +2145,7 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st

// CSIVolumes looks up csi_volumes by pluginID. Caller should snapshot if it
// wants to also denormalize the plugins.
func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, pluginID string) (memdb.ResultIterator, error) {
func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, prefix, pluginID string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

iter, err := txn.Get("csi_volumes", "plugin_id", pluginID)
Expand All @@ -2159,7 +2159,7 @@ func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, pluginID
if !ok {
return false
}
return v.Namespace != namespace
return v.Namespace != namespace && strings.HasPrefix(v.ID, prefix)
}

wrap := memdb.NewFilterIterator(iter, f)
Expand All @@ -2183,7 +2183,7 @@ func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID

// CSIVolumesByNodeID looks up CSIVolumes in use on a node. Caller should
// snapshot if it wants to also denormalize the plugins.
func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, nodeID string) (memdb.ResultIterator, error) {
func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, prefix, nodeID string) (memdb.ResultIterator, error) {
allocs, err := s.AllocsByNode(ws, nodeID)
if err != nil {
return nil, fmt.Errorf("alloc lookup failed: %v", err)
Expand Down Expand Up @@ -2212,11 +2212,13 @@ func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, nodeID string) (memdb
iter := NewSliceIterator()
txn := s.db.ReadTxn()
for id, namespace := range ids {
raw, err := txn.First("csi_volumes", "id", namespace, id)
if err != nil {
return nil, fmt.Errorf("volume lookup failed: %s %v", id, err)
if strings.HasPrefix(id, prefix) {
raw, err := txn.First("csi_volumes", "id", namespace, id)
if err != nil {
return nil, fmt.Errorf("volume lookup failed: %s %v", id, err)
}
iter.Add(raw)
}
iter.Add(raw)
}

ws.Add(iter.WatchCh())
Expand All @@ -2225,10 +2227,10 @@ func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, nodeID string) (memdb
}

// CSIVolumesByNamespace looks up the entire csi_volumes table
func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace, prefix string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

iter, err := txn.Get("csi_volumes", "id_prefix", namespace, "")
iter, err := txn.Get("csi_volumes", "id_prefix", namespace, prefix)
if err != nil {
return nil, fmt.Errorf("volume lookup failed: %v", err)
}
Expand Down
14 changes: 7 additions & 7 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2913,7 +2913,7 @@ func TestStateStore_CSIVolume(t *testing.T) {
require.Error(t, err, fmt.Sprintf("volume exists: %s", v0.ID))

ws := memdb.NewWatchSet()
iter, err := state.CSIVolumesByNamespace(ws, ns)
iter, err := state.CSIVolumesByNamespace(ws, ns, "")
require.NoError(t, err)

slurp := func(iter memdb.ResultIterator) (vs []*structs.CSIVolume) {
Expand All @@ -2932,13 +2932,13 @@ func TestStateStore_CSIVolume(t *testing.T) {
require.Equal(t, 2, len(vs))

ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie")
iter, err = state.CSIVolumesByPluginID(ws, ns, "", "minnie")
require.NoError(t, err)
vs = slurp(iter)
require.Equal(t, 1, len(vs))

ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByNodeID(ws, node.ID)
iter, err = state.CSIVolumesByNodeID(ws, "", node.ID)
require.NoError(t, err)
vs = slurp(iter)
require.Equal(t, 1, len(vs))
Expand Down Expand Up @@ -2973,7 +2973,7 @@ func TestStateStore_CSIVolume(t *testing.T) {
require.NoError(t, err)

ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie")
iter, err = state.CSIVolumesByPluginID(ws, ns, "", "minnie")
require.NoError(t, err)
vs = slurp(iter)
require.False(t, vs[0].WriteFreeClaims())
Expand All @@ -2982,7 +2982,7 @@ func TestStateStore_CSIVolume(t *testing.T) {
err = state.CSIVolumeClaim(2, ns, vol0, claim0)
require.NoError(t, err)
ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie")
iter, err = state.CSIVolumesByPluginID(ws, ns, "", "minnie")
require.NoError(t, err)
vs = slurp(iter)
require.True(t, vs[0].ReadSchedulable())
Expand Down Expand Up @@ -3018,13 +3018,13 @@ func TestStateStore_CSIVolume(t *testing.T) {

// List, now omitting the deregistered volume
ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie")
iter, err = state.CSIVolumesByPluginID(ws, ns, "", "minnie")
require.NoError(t, err)
vs = slurp(iter)
require.Equal(t, 0, len(vs))

ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByNamespace(ws, ns)
iter, err = state.CSIVolumesByNamespace(ws, ns, "")
require.NoError(t, err)
vs = slurp(iter)
require.Equal(t, 1, len(vs))
Expand Down
2 changes: 1 addition & 1 deletion scheduler/feasible.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) (bool, string) {

// Find the count per plugin for this node, so that can enforce MaxVolumes
pluginCount := map[string]int64{}
iter, err := c.ctx.State().CSIVolumesByNodeID(ws, n.ID)
iter, err := c.ctx.State().CSIVolumesByNodeID(ws, "", n.ID)
if err != nil {
return false, FilterConstraintCSIVolumesLookupFailed
}
Expand Down
2 changes: 1 addition & 1 deletion scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type State interface {
CSIVolumeByID(memdb.WatchSet, string, string) (*structs.CSIVolume, error)

// CSIVolumeByID fetch CSI volumes, containing controller jobs
CSIVolumesByNodeID(memdb.WatchSet, string) (memdb.ResultIterator, error)
CSIVolumesByNodeID(memdb.WatchSet, string, string) (memdb.ResultIterator, error)
}

// Planner interface is used to submit a task allocation plan.
Expand Down
5 changes: 0 additions & 5 deletions vendor/github.com/hashicorp/nomad/api/csi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 25782f3

Please sign in to comment.