Skip to content

Commit

Permalink
backport of commit 2ae8711
Browse files Browse the repository at this point in the history
  • Loading branch information
tgross committed Jul 20, 2023
1 parent 189cdeb commit ac07ac1
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 8 deletions.
8 changes: 8 additions & 0 deletions .changelog/17996.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
```release-note:bug
csi: Fixed a bug in sending concurrent requests to CSI controller plugins by serializing them per plugin
```

```release-note:bug
csi: Fixed a bug where CSI controller requests could be sent to unhealthy plugins
```

```release-note:bug
csi: Fixed a bug where CSI controller requests could not be sent to controllers on nodes ineligible for scheduling
```
42 changes: 34 additions & 8 deletions nomad/client_csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package nomad

import (
"errors"
"fmt"
"sort"
"strings"
Expand Down Expand Up @@ -275,21 +276,46 @@ func (a *ClientCSI) clientIDsForController(pluginID string) ([]string, error) {

clientIDs := []string{}

if len(plugin.Controllers) == 0 {
return nil, fmt.Errorf("failed to find instances of controller plugin %q", pluginID)
}

var merr error
for clientID, controller := range plugin.Controllers {
if !controller.IsController() || !controller.Healthy {
// we don't have separate types for CSIInfo depending on
// whether it's a controller or node. this error shouldn't
// make it to production but is to aid developers during
// development
if !controller.IsController() {
// we don't have separate types for CSIInfo depending on whether
// it's a controller or node. this error should never make it to
// production
merr = errors.Join(merr, fmt.Errorf(
"plugin instance %q is not a controller but was registered as one - this is always a bug", controller.AllocID))
continue
}

if !controller.Healthy {
merr = errors.Join(merr, fmt.Errorf(
"plugin instance %q is not healthy", controller.AllocID))
continue
}

node, err := getNodeForRpc(snap, clientID)
if err == nil && node != nil && node.Ready() {
clientIDs = append(clientIDs, clientID)
if err != nil || node == nil {
merr = errors.Join(merr, fmt.Errorf(
"cannot find node %q for plugin instance %q", clientID, controller.AllocID))
continue
}

if node.Status != structs.NodeStatusReady {
merr = errors.Join(merr, fmt.Errorf(
"node %q for plugin instance %q is not ready", clientID, controller.AllocID))
continue
}

clientIDs = append(clientIDs, clientID)
}

if len(clientIDs) == 0 {
return nil, fmt.Errorf("failed to find clients running controller plugin %q", pluginID)
return nil, fmt.Errorf("failed to find clients running controller plugin %q: %v",
pluginID, merr)
}

// Many plugins don't handle concurrent requests as described in the spec,
Expand Down

0 comments on commit ac07ac1

Please sign in to comment.