Skip to content

Commit

Permalink
pass device path on publish from controller
Browse files Browse the repository at this point in the history
When the node plugin receives a publish (== mount) call, it needs to know
what device needs to be mounted. Up to now, this was done by an API request
to LINSTOR. With the introduction of more aggressive caching, this often
meant the request would end in a 404 if multiple volumes where mounted in
short succession.

To fix this, we move the retrieval of the device path out of the node plugin
entirely. We can simply pass the device path from the ControllerPublish() call
down using the publish context.

We also remove the request for the device path from the NodeExpand() call:
we have to look at the local mount table so we get the device name from there.

With this change, the node plugin should no longer need to request LINSTOR API
outside the node/storage pool information, which is not affected by this caching
issue.

Signed-off-by: Moritz Wanzenböck <[email protected]>
  • Loading branch information
WanzenBug authored and rck committed May 16, 2024
1 parent d42c0d9 commit 08a17ce
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 49 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Pass device paths down from CSI Controller on publish, reducing LINSTOR API requests from the CSI Node.

## [1.6.0] - 2024-05-02

### Added
Expand Down
56 changes: 39 additions & 17 deletions pkg/client/linstor.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,19 +463,19 @@ func (s *Linstor) GetLegacyVolumeParameters(ctx context.Context, volId string) (
}

// Attach idempotently creates a resource on the given node.
func (s *Linstor) Attach(ctx context.Context, volId, node string, rwxBlock bool) error {
func (s *Linstor) Attach(ctx context.Context, volId, node string, rwxBlock bool) (string, error) {
s.log.WithFields(logrus.Fields{
"volume": volId,
"targetNode": node,
}).Info("attaching volume")

ress, err := s.client.Resources.GetAll(ctx, volId)
if err != nil {
return err
return "", err
}

if len(ress) == 0 {
return fmt.Errorf("failed to attach resource with no deployed replica")
return "", fmt.Errorf("failed to attach resource with no deployed replica")
}

var existingRes *lapi.Resource
Expand All @@ -497,7 +497,7 @@ func (s *Linstor) Attach(ctx context.Context, volId, node string, rwxBlock bool)
}

if otherResInUse >= 2 {
return fmt.Errorf("two other resources already InUse")
return "", fmt.Errorf("two other resources already InUse")
}

if otherResInUse > 0 && rwxBlock {
Expand All @@ -507,11 +507,11 @@ func (s *Linstor) Attach(ctx context.Context, volId, node string, rwxBlock bool)

err = s.client.ResourceDefinitions.Modify(ctx, volId, rdPropsModify)
if err != nil {
return err
return "", err
}
}

propsModify := lapi.GenericPropsModify{OverrideProps: map[string]string{}}
overrideProps := map[string]string{}

// If the resource is already on the node, don't worry about attaching, unless we also need to activate it.
if existingRes == nil || slice.ContainsString(existingRes.Flags, lapiconsts.FlagRscInactive) {
Expand All @@ -523,7 +523,7 @@ func (s *Linstor) Attach(ctx context.Context, volId, node string, rwxBlock bool)
s.log.WithError(err).Info("fall back to manual diskless creation after make-available refused")

if disklessFlag == "" {
return fmt.Errorf("resource does not support diskless attachment")
return "", fmt.Errorf("resource does not support diskless attachment")
}

rCreate := lapi.ResourceCreate{Resource: lapi.Resource{
Expand All @@ -536,35 +536,44 @@ func (s *Linstor) Attach(ctx context.Context, volId, node string, rwxBlock bool)
}

if err != nil {
return err
return "", err
}

if existingRes == nil {
propsModify.OverrideProps[linstor.PropertyCreatedFor] = linstor.CreatedForTemporaryDisklessAttach
overrideProps[linstor.PropertyCreatedFor] = linstor.CreatedForTemporaryDisklessAttach
}

newRsc, err := s.client.Resources.Get(ctx, volId, node)
if err != nil {
return err
return "", err
}

existingRes = &newRsc
}

err = s.client.Resources.ModifyVolume(ctx, volId, node, 0, propsModify)
if err != nil {
return err
if len(overrideProps) > 0 {
err = s.client.Resources.ModifyVolume(ctx, volId, node, 0, lapi.GenericPropsModify{
OverrideProps: overrideProps,
})
if err != nil {
return "", err
}
}

if slice.ContainsString(existingRes.Flags, lapiconsts.FlagDelete) {
return &DeleteInProgressError{
return "", &DeleteInProgressError{
Operation: "attach volume",
Kind: "resource",
Name: volId,
}
}

return nil
vol, err := s.client.Resources.GetVolume(ctx, volId, node, 0)
if err != nil {
return "", err
}

return vol.DevicePath, nil
}

// getDisklessFlag inspects a resource to determine the right diskless flag to use.
Expand Down Expand Up @@ -2117,9 +2126,22 @@ func (s *Linstor) GetVolumeStats(path string) (volume.VolumeStats, error) {
}, nil
}

func (s *Linstor) NodeExpand(source, target string) error {
func (s *Linstor) NodeExpand(target string) error {
mounts, err := s.mounter.List()
if err != nil {
return fmt.Errorf("failed to list mount points: %w", err)
}

i := slices.IndexFunc(mounts, func(m mount.MountPoint) bool {
return m.Path == target
})
if i == -1 {
// Mark this explicitly as a "NotExist" error so upper layers can pass it to CSI appropriately
return fmt.Errorf("mount point '%s' not found: %w", target, os.ErrNotExist)
}

resizer := mount.NewResizeFs(s.mounter.Exec)
_, err := resizer.Resize(source, target)
_, err = resizer.Resize(mounts[i].Device, target)
return err
}

Expand Down
19 changes: 12 additions & 7 deletions pkg/client/linstor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ const (

func TestLinstor_Attach(t *testing.T) {
var (
ResourceModifyReadWrite = lapi.GenericPropsModify{OverrideProps: map[string]string{}}
ResourceModifyReadWriteWithTemporaryAttach = lapi.GenericPropsModify{OverrideProps: map[string]string{linstor.PropertyCreatedFor: linstor.CreatedForTemporaryDisklessAttach}}
)

Expand All @@ -173,12 +172,13 @@ func TestLinstor_Attach(t *testing.T) {

m.ExpectedCalls = []*mock.Call{
{Method: "GetAll", Arguments: mock.Arguments{mock.Anything, ExampleResourceID}, ReturnArguments: mock.Arguments{rv, rvErr}},
{Method: "ModifyVolume", Arguments: mock.Arguments{mock.Anything, ExampleResourceID, "node-2", 0, ResourceModifyReadWrite}, ReturnArguments: mock.Arguments{nil}},
{Method: "GetVolume", Arguments: mock.Arguments{mock.Anything, ExampleResourceID, "node-2", 0}, ReturnArguments: mock.Arguments{lapi.Volume{DevicePath: "/dev/vol1"}, nil}},
}
cl := Linstor{client: &lc.HighLevelClient{Client: &lapi.Client{Resources: &m}}, log: logrus.WithField("test", t.Name())}

err := cl.Attach(context.Background(), ExampleResourceID, "node-2", false)
path, err := cl.Attach(context.Background(), ExampleResourceID, "node-2", false)
assert.NoError(t, err)
assert.Equal(t, "/dev/vol1", path)
m.AssertExpectations(t)
})

Expand All @@ -191,11 +191,13 @@ func TestLinstor_Attach(t *testing.T) {
{Method: "Get", Arguments: mock.Arguments{mock.Anything, ExampleResourceID, "node-3"}, ReturnArguments: mock.Arguments{lapi.Resource{}, nil}},
{Method: "MakeAvailable", Arguments: mock.Arguments{mock.Anything, ExampleResourceID, "node-3", lapi.ResourceMakeAvailable{Diskful: false}}, ReturnArguments: mock.Arguments{nil}},
{Method: "ModifyVolume", Arguments: mock.Arguments{mock.Anything, ExampleResourceID, "node-3", 0, ResourceModifyReadWriteWithTemporaryAttach}, ReturnArguments: mock.Arguments{nil}},
{Method: "GetVolume", Arguments: mock.Arguments{mock.Anything, ExampleResourceID, "node-3", 0}, ReturnArguments: mock.Arguments{lapi.Volume{DevicePath: "/dev/vol1"}, nil}},
}
cl := Linstor{client: &lc.HighLevelClient{Client: &lapi.Client{Resources: &m}}, log: logrus.WithField("test", t.Name())}

err := cl.Attach(context.Background(), ExampleResourceID, "node-3", false)
path, err := cl.Attach(context.Background(), ExampleResourceID, "node-3", false)
assert.NoError(t, err)
assert.Equal(t, "/dev/vol1", path)
m.AssertExpectations(t)
})

Expand All @@ -209,11 +211,13 @@ func TestLinstor_Attach(t *testing.T) {
{Method: "MakeAvailable", Arguments: mock.Arguments{mock.Anything, ExampleResourceID, "node-3", lapi.ResourceMakeAvailable{Diskful: false}}, ReturnArguments: mock.Arguments{lapi.NotFoundError}},
{Method: "Create", Arguments: mock.Arguments{mock.Anything, lapi.ResourceCreate{Resource: lapi.Resource{Name: ExampleResourceID, NodeName: "node-3", Flags: []string{lapiconsts.FlagDrbdDiskless}}}}, ReturnArguments: mock.Arguments{nil}},
{Method: "ModifyVolume", Arguments: mock.Arguments{mock.Anything, ExampleResourceID, "node-3", 0, ResourceModifyReadWriteWithTemporaryAttach}, ReturnArguments: mock.Arguments{nil}},
{Method: "GetVolume", Arguments: mock.Arguments{mock.Anything, ExampleResourceID, "node-3", 0}, ReturnArguments: mock.Arguments{lapi.Volume{DevicePath: "/dev/vol1"}, nil}},
}
cl := Linstor{client: &lc.HighLevelClient{Client: &lapi.Client{Resources: &m}}, log: logrus.WithField("test", t.Name())}

err := cl.Attach(context.Background(), ExampleResourceID, "node-3", false)
path, err := cl.Attach(context.Background(), ExampleResourceID, "node-3", false)
assert.NoError(t, err)
assert.Equal(t, "/dev/vol1", path)
m.AssertExpectations(t)
})

Expand All @@ -225,12 +229,13 @@ func TestLinstor_Attach(t *testing.T) {
{Method: "GetAll", Arguments: mock.Arguments{mock.Anything, ExampleResourceID}, ReturnArguments: mock.Arguments{rv, rvErr}},
{Method: "Get", Arguments: mock.Arguments{mock.Anything, ExampleResourceID, "node-2"}, ReturnArguments: mock.Arguments{lapi.Resource{}, nil}},
{Method: "MakeAvailable", Arguments: mock.Arguments{mock.Anything, ExampleResourceID, "node-2", lapi.ResourceMakeAvailable{Diskful: false}}, ReturnArguments: mock.Arguments{nil}},
{Method: "ModifyVolume", Arguments: mock.Arguments{mock.Anything, ExampleResourceID, "node-2", 0, ResourceModifyReadWrite}, ReturnArguments: mock.Arguments{nil}},
{Method: "GetVolume", Arguments: mock.Arguments{mock.Anything, ExampleResourceID, "node-2", 0}, ReturnArguments: mock.Arguments{lapi.Volume{DevicePath: "/dev/vol1"}, nil}},
}
cl := Linstor{client: &lc.HighLevelClient{Client: &lapi.Client{Resources: &m}}, log: logrus.WithField("test", t.Name())}

err := cl.Attach(context.Background(), ExampleResourceID, "node-2", false)
path, err := cl.Attach(context.Background(), ExampleResourceID, "node-2", false)
assert.NoError(t, err)
assert.Equal(t, "/dev/vol1", path)
m.AssertExpectations(t)
})
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/client/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ func (s *MockStorage) VolFromVol(ctx context.Context, sourceVol, vol *volume.Inf
return nil
}

func (s *MockStorage) Attach(ctx context.Context, volId, node string, rwxBlock bool) error {
func (s *MockStorage) Attach(ctx context.Context, volId, node string, rwxBlock bool) (string, error) {
s.assignedVolumes[volId] = append(s.assignedVolumes[volId], volume.Assignment{Node: node, Path: "/dev/" + volId})
return nil
return "/dev/" + volId, nil
}

func (s *MockStorage) Detach(ctx context.Context, volId, node string) error {
Expand Down Expand Up @@ -285,7 +285,7 @@ func (s *MockStorage) GetVolumeStats(path string) (volume.VolumeStats, error) {
return volume.VolumeStats{}, nil
}

func (s *MockStorage) NodeExpand(source, target string) error {
func (s *MockStorage) NodeExpand(target string) error {
_, err := os.Stat(target)
if err != nil {
return err
Expand Down
46 changes: 26 additions & 20 deletions pkg/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package driver

import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -356,18 +357,25 @@ func (d Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolum
volCtx.MountOptions = append(volCtx.MountOptions, "nouuid")
}

assignment, err := d.Assignments.FindAssignmentOnNode(ctx, req.GetVolumeId(), d.nodeID)
if err != nil {
return nil, status.Errorf(codes.Internal, "NodePublishVolume failed for %s: %v", req.GetVolumeId(), err)
}
publishCtx := PublishContextFromMap(req.GetPublishContext())
if publishCtx == nil {
assignment, err := d.Assignments.FindAssignmentOnNode(ctx, req.GetVolumeId(), d.nodeID)
if err != nil {
return nil, status.Errorf(codes.Internal, "NodePublishVolume failed for %s: %v", req.GetVolumeId(), err)
}

if assignment == nil {
return nil, status.Errorf(codes.NotFound, "NodePublishVolume failed for %s: assignment not found", req.GetVolumeId())
}

if assignment == nil {
return nil, status.Errorf(codes.NotFound, "NodePublishVolume failed for %s: assignment not found", req.GetVolumeId())
publishCtx = &PublishContext{
DevicePath: assignment.Path,
}
}

ro := req.GetReadonly() || req.GetVolumeCapability().GetAccessMode().GetMode() == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY

err = d.Mounter.Mount(ctx, assignment.Path, req.GetTargetPath(), fsType, ro, volCtx.MountOptions)
err = d.Mounter.Mount(ctx, publishCtx.DevicePath, req.GetTargetPath(), fsType, ro, volCtx.MountOptions)
if err != nil {
return nil, status.Errorf(codes.Internal, "NodePublishVolume failed for %s: %v", req.GetVolumeId(), err)
}
Expand Down Expand Up @@ -657,13 +665,17 @@ func (d Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controller
// ReadWriteMany block volume
rwxBlock := req.VolumeCapability.AccessMode.GetMode() == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER && req.VolumeCapability.GetBlock() != nil

err = d.Assignments.Attach(ctx, req.GetVolumeId(), req.GetNodeId(), rwxBlock)
devPath, err := d.Assignments.Attach(ctx, req.GetVolumeId(), req.GetNodeId(), rwxBlock)
if err != nil {
return nil, status.Errorf(codes.Internal,
"ControllerPublishVolume failed for %s: %v", req.GetVolumeId(), err)
}

return &csi.ControllerPublishVolumeResponse{}, nil
return &csi.ControllerPublishVolumeResponse{
PublishContext: (&PublishContext{
DevicePath: devPath,
}).ToMap(),
}, nil
}

// ControllerUnpublishVolume https://github.com/container-storage-interface/spec/blob/v1.9.0/spec.md#controllerunpublishvolume
Expand Down Expand Up @@ -1112,19 +1124,13 @@ func (d Driver) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeR
return nil, missingAttr("NodeExpandVolume", req.GetVolumeId(), "TargetPath")
}

assignment, err := d.Assignments.FindAssignmentOnNode(ctx, req.GetVolumeId(), d.nodeID)
err := d.Expander.NodeExpand(req.GetVolumePath())
if err != nil {
return nil, status.Errorf(codes.Internal, "NodeExpandVolume - get assignment failed for volume %s node: %s: %v", req.GetVolumeId(), d.nodeID, err)
}

if assignment == nil {
return nil, status.Errorf(codes.NotFound, "NodeExpandVolume - resource-definitions %s not found", req.GetVolumeId())
}
if errors.Is(err, os.ErrNotExist) {
return nil, status.Errorf(codes.NotFound, "NodePublishVolume failed for %s: mount not found", req.GetVolumeId())
}

err = d.Expander.NodeExpand(assignment.Path, req.GetVolumePath())
if err != nil {
return nil, status.Errorf(codes.Internal, "NodeExpandVolume - expand volume fail source %s target %s, err: %v.",
assignment.Path, req.GetVolumePath(), err)
return nil, status.Errorf(codes.Internal, "NodeExpandVolume - expand volume failed for target %s, err: %v", req.GetVolumePath(), err)
}

return &csi.NodeExpandVolumeResponse{}, nil
Expand Down
32 changes: 32 additions & 0 deletions pkg/driver/publish_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package driver

import (
"github.com/piraeusdatastore/linstor-csi/pkg/linstor"
)

const (
PublishContextMarker = linstor.ParameterNamespace + "/uses-publish-context"
DevicePath = linstor.ParameterNamespace + "/device-path"
)

type PublishContext struct {
DevicePath string
}

func PublishContextFromMap(ctx map[string]string) *PublishContext {
_, ok := ctx[PublishContextMarker]
if !ok {
return nil
}

return &PublishContext{
DevicePath: ctx[DevicePath],
}
}

func (p *PublishContext) ToMap() map[string]string {
return map[string]string{
PublishContextMarker: "true",
DevicePath: p.DevicePath,
}
}
3 changes: 3 additions & 0 deletions pkg/driver/volume_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type VolumeContext struct {
MountOptions []string
PostMountXfsOptions string
RemoteAccessPolicy volume.RemoteAccessPolicy
DevicePath string
}

// NewVolumeContext creates a new default volume context, which does not specify any fancy mkfs/mount/post-mount options
Expand Down Expand Up @@ -56,6 +57,7 @@ func VolumeContextFromMap(ctx map[string]string) (*VolumeContext, error) {
MountOptions: mountOpts,
PostMountXfsOptions: ctx[PostMountXfsOpts],
RemoteAccessPolicy: policy,
DevicePath: ctx[DevicePath],
}, nil
}

Expand All @@ -70,6 +72,7 @@ func (v *VolumeContext) ToMap() (map[string]string, error) {
MountOptions: encodeMountOpts(v.MountOptions),
PostMountXfsOpts: v.PostMountXfsOptions,
RemoteAccessPolicyOpts: string(policy),
DevicePath: v.DevicePath,
}, nil
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/volume/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type SnapshotCreateDeleter interface {
// AttacherDettacher handles operations relating to volume accessiblity on nodes.
type AttacherDettacher interface {
Querier
Attach(ctx context.Context, volId, node string, rwxBlock bool) error
Attach(ctx context.Context, volId, node string, rwxBlock bool) (string, error)
Detach(ctx context.Context, volId, node string) error
NodeAvailable(ctx context.Context, node string) error
FindAssignmentOnNode(ctx context.Context, volId, node string) (*Assignment, error)
Expand Down Expand Up @@ -143,7 +143,9 @@ type NodeInformer interface {

// Expander handles the resizing operations for volumes.
type Expander interface {
NodeExpand(source, target string) error
// NodeExpand runs the appropriate resize operation on the target path.
// Must return os.ErrNotExist if the path does not exist or is not a valid mount point.
NodeExpand(target string) error
ControllerExpand(ctx context.Context, vol *Info) error
}

Expand Down

0 comments on commit 08a17ce

Please sign in to comment.