Skip to content

Commit

Permalink
Replace coalescer implementation with new coalescer package
Browse files Browse the repository at this point in the history
Signed-off-by: Connor Catlett <[email protected]>
  • Loading branch information
ConnorJC3 committed May 2, 2024
1 parent c124663 commit a813afe
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 315 deletions.
46 changes: 17 additions & 29 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/awslabs/volume-modifier-for-k8s/pkg/rpc"
csi "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/coalescer"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver/internal"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util/template"
Expand Down Expand Up @@ -58,20 +59,20 @@ const isManagedByDriver = "true"

// ControllerService represents the controller service of CSI driver
type ControllerService struct {
cloud cloud.Cloud
inFlight *internal.InFlight
options *Options
modifyVolumeManager *modifyVolumeManager
cloud cloud.Cloud
inFlight *internal.InFlight
options *Options
modifyVolumeCoalescer coalescer.Coalescer[modifyVolumeRequest, int32]
rpc.UnimplementedModifyServer
}

// NewControllerService creates a new controller service
func NewControllerService(c cloud.Cloud, o *Options) *ControllerService {
return &ControllerService{
cloud: c,
options: o,
inFlight: internal.NewInFlight(),
modifyVolumeManager: newModifyVolumeManager(),
cloud: c,
options: o,
inFlight: internal.NewInFlight(),
modifyVolumeCoalescer: newModifyVolumeCoalescer(c, o),
}
}

Expand Down Expand Up @@ -557,26 +558,11 @@ func (d *ControllerService) ControllerExpandVolume(ctx context.Context, req *csi
return nil, status.Error(codes.InvalidArgument, "After round-up, volume size exceeds the limit specified")
}

responseChan := make(chan modifyVolumeResponse)
modifyVolumeRequest := modifyVolumeRequest{
newSize: newSize,
responseChan: responseChan,
}

// Intentionally not pass in context as we deal with context locally in this method
d.addModifyVolumeRequest(volumeID, &modifyVolumeRequest) //nolint:contextcheck

var actualSizeGiB int32

select {
case response := <-responseChan:
if response.err != nil {
return nil, status.Errorf(codes.Internal, "Could not resize volume %q: %v", volumeID, response.err)
} else {
actualSizeGiB = response.volumeSize
}
case <-ctx.Done():
return nil, status.Errorf(codes.Internal, "Could not resize volume %q: context cancelled", volumeID)
actualSizeGiB, err := d.modifyVolumeCoalescer.Coalesce(volumeID, modifyVolumeRequest{
newSize: newSize,
})
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not resize volume %q: %v", volumeID, err)
}

nodeExpansionRequired := true
Expand Down Expand Up @@ -605,7 +591,9 @@ func (d *ControllerService) ControllerModifyVolume(ctx context.Context, req *csi
return nil, err
}

err = d.modifyVolumeWithCoalescing(ctx, volumeID, options)
_, err = d.modifyVolumeCoalescer.Coalesce(volumeID, modifyVolumeRequest{
modifyDiskOptions: *options,
})
if err != nil {
return nil, err
}
Expand Down
233 changes: 54 additions & 179 deletions pkg/driver/controller_modify_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import (
"errors"
"fmt"
"strconv"
"sync"
"time"

"github.com/awslabs/volume-modifier-for-k8s/pkg/rpc"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/coalescer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"
Expand All @@ -44,150 +44,6 @@ const (
type modifyVolumeRequest struct {
newSize int64
modifyDiskOptions cloud.ModifyDiskOptions
// Channel for sending the response to the request caller
responseChan chan modifyVolumeResponse
}

type modifyVolumeResponse struct {
volumeSize int32
err error
}

type modifyVolumeRequestHandler struct {
volumeID string
// Merged request from the requests that have been accepted for the volume
mergedRequest *modifyVolumeRequest
// Channel for sending requests to the goroutine for the volume
requestChan chan *modifyVolumeRequest
}

type modifyVolumeManager struct {
// Map of volume ID to modifyVolumeRequestHandler
requestHandlerMap sync.Map
}

func newModifyVolumeManager() *modifyVolumeManager {
return &modifyVolumeManager{
requestHandlerMap: sync.Map{},
}
}

func newModifyVolumeRequestHandler(volumeID string, request *modifyVolumeRequest) modifyVolumeRequestHandler {
requestChan := make(chan *modifyVolumeRequest)
return modifyVolumeRequestHandler{
requestChan: requestChan,
mergedRequest: request,
volumeID: volumeID,
}
}

// This function validates the new request against the merged request for the volume.
// If the new request has a volume property that's already included in the merged request and its value is different from that in the merged request,
// this function will return an error and the new request will be rejected.
func (h *modifyVolumeRequestHandler) validateModifyVolumeRequest(r *modifyVolumeRequest) error {
if r.newSize != 0 && h.mergedRequest.newSize != 0 && r.newSize != h.mergedRequest.newSize {
return fmt.Errorf("Different size was requested by a previous request. Current: %d, Requested: %d", h.mergedRequest.newSize, r.newSize)
}
if r.modifyDiskOptions.IOPS != 0 && h.mergedRequest.modifyDiskOptions.IOPS != 0 && r.modifyDiskOptions.IOPS != h.mergedRequest.modifyDiskOptions.IOPS {
return fmt.Errorf("Different IOPS was requested by a previous request. Current: %d, Requested: %d", h.mergedRequest.modifyDiskOptions.IOPS, r.modifyDiskOptions.IOPS)
}
if r.modifyDiskOptions.Throughput != 0 && h.mergedRequest.modifyDiskOptions.Throughput != 0 && r.modifyDiskOptions.Throughput != h.mergedRequest.modifyDiskOptions.Throughput {
return fmt.Errorf("Different throughput was requested by a previous request. Current: %d, Requested: %d", h.mergedRequest.modifyDiskOptions.Throughput, r.modifyDiskOptions.Throughput)
}
if r.modifyDiskOptions.VolumeType != "" && h.mergedRequest.modifyDiskOptions.VolumeType != "" && r.modifyDiskOptions.VolumeType != h.mergedRequest.modifyDiskOptions.VolumeType {
return fmt.Errorf("Different volume type was requested by a previous request. Current: %s, Requested: %s", h.mergedRequest.modifyDiskOptions.VolumeType, r.modifyDiskOptions.VolumeType)
}
return nil
}

func (h *modifyVolumeRequestHandler) mergeModifyVolumeRequest(r *modifyVolumeRequest) {
if r.newSize != 0 {
h.mergedRequest.newSize = r.newSize
}
if r.modifyDiskOptions.IOPS != 0 {
h.mergedRequest.modifyDiskOptions.IOPS = r.modifyDiskOptions.IOPS
}
if r.modifyDiskOptions.Throughput != 0 {
h.mergedRequest.modifyDiskOptions.Throughput = r.modifyDiskOptions.Throughput
}
if r.modifyDiskOptions.VolumeType != "" {
h.mergedRequest.modifyDiskOptions.VolumeType = r.modifyDiskOptions.VolumeType
}
}

// processModifyVolumeRequests method starts its execution with a timer that has modifyVolumeRequestHandlerTimeout as its timeout value.
// When the Timer times out, it calls the ec2 API to perform the volume modification. processModifyVolumeRequests method sends back the response of
// the ec2 API call to the CSI Driver main thread via response channels.
// This method receives requests from CSI driver main thread via the request channel. When a new request is received from the request channel, we first
// validate the new request. If the new request is acceptable, it will be merged with the existing request for the volume.
func (d *ControllerService) processModifyVolumeRequests(h *modifyVolumeRequestHandler, responseChans []chan modifyVolumeResponse) {
klog.V(4).InfoS("Start processing ModifyVolumeRequest for ", "volume ID", h.volumeID)
process := func(req *modifyVolumeRequest) {
if err := h.validateModifyVolumeRequest(req); err != nil {
req.responseChan <- modifyVolumeResponse{err: err}
} else {
h.mergeModifyVolumeRequest(req)
responseChans = append(responseChans, req.responseChan)
}
}

for {
select {
case req := <-h.requestChan:
process(req)
case <-time.After(d.options.ModifyVolumeRequestHandlerTimeout):
d.modifyVolumeManager.requestHandlerMap.Delete(h.volumeID)
// At this point, no new requests can come in on the request channel because it has been removed from the map
// However, the request channel may still have requests waiting on it
// Thus, process any requests still waiting in the channel
for loop := true; loop; {
select {
case req := <-h.requestChan:
process(req)
default:
loop = false
}
}
actualSizeGiB, err := d.executeModifyVolumeRequest(h.volumeID, h.mergedRequest)
for _, c := range responseChans {
select {
case c <- modifyVolumeResponse{volumeSize: actualSizeGiB, err: err}:
default:
klog.V(6).InfoS("Ignoring response channel because it has no receiver", "volumeID", h.volumeID)
}
}
return
}
}
}

// When a new request comes in, we look up requestHandlerMap using the volume ID of the request.
// If there's no ModifyVolumeRequestHandler for the volume, meaning that there’s no inflight requests for the volume, we will start a goroutine
// for the volume calling processModifyVolumeRequests method, and ModifyVolumeRequestHandler for the volume will be added to requestHandlerMap.
// If there’s ModifyVolumeRequestHandler for the volume, meaning that there is inflight request(s) for the volume, we will send the new request
// to the goroutine for the volume via the receiving channel.
// Note that each volume with inflight requests has their own goroutine which follows timeout schedule of their own.
func (d *ControllerService) addModifyVolumeRequest(volumeID string, r *modifyVolumeRequest) {
requestHandler := newModifyVolumeRequestHandler(volumeID, r)
handler, loaded := d.modifyVolumeManager.requestHandlerMap.LoadOrStore(volumeID, requestHandler)
if loaded {
h := handler.(modifyVolumeRequestHandler)
h.requestChan <- r
} else {
responseChans := []chan modifyVolumeResponse{r.responseChan}
go d.processModifyVolumeRequests(&requestHandler, responseChans)
}
}

func (d *ControllerService) executeModifyVolumeRequest(volumeID string, req *modifyVolumeRequest) (int32, error) {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
actualSizeGiB, err := d.cloud.ResizeOrModifyDisk(ctx, volumeID, req.newSize, &req.modifyDiskOptions)
if err != nil {
return 0, err
} else {
return actualSizeGiB, nil
}
}

func (d *ControllerService) GetCSIDriverModificationCapability(
Expand All @@ -202,30 +58,74 @@ func (d *ControllerService) ModifyVolumeProperties(
req *rpc.ModifyVolumePropertiesRequest,
) (*rpc.ModifyVolumePropertiesResponse, error) {
klog.V(4).InfoS("ModifyVolumeProperties called", "req", req)
if err := validateModifyVolumePropertiesRequest(req); err != nil {
return nil, err
name := req.GetName()
if name == "" {
return nil, status.Error(codes.InvalidArgument, "Volume name not provided")
}

options, err := parseModifyVolumeParameters(req.GetParameters())
if err != nil {
return nil, err
}

name := req.GetName()
err = d.modifyVolumeWithCoalescing(ctx, name, options)
_, err = d.modifyVolumeCoalescer.Coalesce(name, modifyVolumeRequest{
modifyDiskOptions: *options,
})
if err != nil {
return nil, err
}

return &rpc.ModifyVolumePropertiesResponse{}, nil
}

func validateModifyVolumePropertiesRequest(req *rpc.ModifyVolumePropertiesRequest) error {
name := req.GetName()
if name == "" {
return status.Error(codes.InvalidArgument, "Volume name not provided")
func newModifyVolumeCoalescer(c cloud.Cloud, o *Options) coalescer.Coalescer[modifyVolumeRequest, int32] {
return coalescer.New[modifyVolumeRequest, int32](o.ModifyVolumeRequestHandlerTimeout, mergeModifyVolumeRequest, executeModifyVolumeRequest(c))
}

func mergeModifyVolumeRequest(input modifyVolumeRequest, existing modifyVolumeRequest) (modifyVolumeRequest, error) {
if input.newSize != 0 {
if existing.newSize != 0 && input.newSize != existing.newSize {
return existing, fmt.Errorf("Different size was requested by a previous request. Current: %d, Requested: %d", existing.newSize, input.newSize)
}
existing.newSize = input.newSize
}
if input.modifyDiskOptions.IOPS != 0 {
if existing.modifyDiskOptions.IOPS != 0 && input.modifyDiskOptions.IOPS != existing.modifyDiskOptions.IOPS {
return existing, fmt.Errorf("Different IOPS was requested by a previous request. Current: %d, Requested: %d", existing.modifyDiskOptions.IOPS, input.modifyDiskOptions.IOPS)
}
existing.modifyDiskOptions.IOPS = input.modifyDiskOptions.IOPS
}
if input.modifyDiskOptions.Throughput != 0 {
if existing.modifyDiskOptions.Throughput != 0 && input.modifyDiskOptions.Throughput != existing.modifyDiskOptions.Throughput {
return existing, fmt.Errorf("Different throughput was requested by a previous request. Current: %d, Requested: %d", existing.modifyDiskOptions.Throughput, input.modifyDiskOptions.Throughput)
}
existing.modifyDiskOptions.Throughput = input.modifyDiskOptions.Throughput
}
if input.modifyDiskOptions.VolumeType != "" {
if existing.modifyDiskOptions.VolumeType != "" && input.modifyDiskOptions.VolumeType != existing.modifyDiskOptions.VolumeType {
return existing, fmt.Errorf("Different volume type was requested by a previous request. Current: %s, Requested: %s", existing.modifyDiskOptions.VolumeType, input.modifyDiskOptions.VolumeType)
}
existing.modifyDiskOptions.VolumeType = input.modifyDiskOptions.VolumeType
}

return existing, nil
}

func executeModifyVolumeRequest(cloud cloud.Cloud) func(string, modifyVolumeRequest) (int32, error) {
return func(volumeID string, req modifyVolumeRequest) (int32, error) {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
actualSizeGiB, err := cloud.ResizeOrModifyDisk(ctx, volumeID, req.newSize, &req.modifyDiskOptions)
if err != nil {
// Kubernetes sidecars treats "Invalid Argument" errors as infeasible and retries less aggressively
if errors.Is(err, cloud.ErrInvalidArgument) {

Check failure on line 121 in pkg/driver/controller_modify_volume.go

View workflow job for this annotation

GitHub Actions / buildx (ubuntu-latest)

cloud.ErrInvalidArgument undefined (type cloud.Cloud has no field or method ErrInvalidArgument)

Check failure on line 121 in pkg/driver/controller_modify_volume.go

View workflow job for this annotation

GitHub Actions / Generate PR Coverage

cloud.ErrInvalidArgument undefined (type cloud.Cloud has no field or method ErrInvalidArgument)
return 0, status.Errorf(codes.InvalidArgument, "Could not modify volume (invalid argument) %q: %v", volume, response.err)

Check failure on line 122 in pkg/driver/controller_modify_volume.go

View workflow job for this annotation

GitHub Actions / buildx (ubuntu-latest)

undefined: volume

Check failure on line 122 in pkg/driver/controller_modify_volume.go

View workflow job for this annotation

GitHub Actions / buildx (ubuntu-latest)

undefined: response

Check failure on line 122 in pkg/driver/controller_modify_volume.go

View workflow job for this annotation

GitHub Actions / Generate PR Coverage

undefined: volume

Check failure on line 122 in pkg/driver/controller_modify_volume.go

View workflow job for this annotation

GitHub Actions / Generate PR Coverage

undefined: response
}
return 0, status.Errorf(codes.Internal, "Could not modify volume %q: %v", volume, response.err)

Check failure on line 124 in pkg/driver/controller_modify_volume.go

View workflow job for this annotation

GitHub Actions / buildx (ubuntu-latest)

undefined: volume

Check failure on line 124 in pkg/driver/controller_modify_volume.go

View workflow job for this annotation

GitHub Actions / buildx (ubuntu-latest)

undefined: response

Check failure on line 124 in pkg/driver/controller_modify_volume.go

View workflow job for this annotation

GitHub Actions / Generate PR Coverage

undefined: volume

Check failure on line 124 in pkg/driver/controller_modify_volume.go

View workflow job for this annotation

GitHub Actions / Generate PR Coverage

undefined: response
} else {
return actualSizeGiB, nil
}
}
return nil
}

func parseModifyVolumeParameters(params map[string]string) (*cloud.ModifyDiskOptions, error) {
Expand Down Expand Up @@ -259,28 +159,3 @@ func parseModifyVolumeParameters(params map[string]string) (*cloud.ModifyDiskOpt

return &options, nil
}

func (d *ControllerService) modifyVolumeWithCoalescing(ctx context.Context, volume string, options *cloud.ModifyDiskOptions) error {
responseChan := make(chan modifyVolumeResponse)
request := modifyVolumeRequest{
modifyDiskOptions: *options,
responseChan: responseChan,
}

// Intentionally not pass in context as we deal with context locally in this method
d.addModifyVolumeRequest(volume, &request) //nolint:contextcheck

select {
case response := <-responseChan:
if response.err != nil {
if errors.Is(response.err, cloud.ErrInvalidArgument) {
return status.Errorf(codes.InvalidArgument, "Could not modify volume %q: %v", volume, response.err)
}
return status.Errorf(codes.Internal, "Could not modify volume %q: %v", volume, response.err)
}
case <-ctx.Done():
return status.Errorf(codes.Internal, "Could not modify volume %q: context cancelled", volume)
}

return nil
}
8 changes: 4 additions & 4 deletions pkg/driver/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3567,10 +3567,10 @@ func TestControllerExpandVolume(t *testing.T) {
mockCloud.EXPECT().ResizeOrModifyDisk(gomock.Any(), gomock.Eq(tc.req.GetVolumeId()), gomock.Any(), gomock.Any()).Return(retSizeGiB, nil).AnyTimes()

awsDriver := ControllerService{
cloud: mockCloud,
inFlight: internal.NewInFlight(),
options: &Options{},
modifyVolumeManager: newModifyVolumeManager(),
cloud: mockCloud,
inFlight: internal.NewInFlight(),
options: &Options{},
modifyVolumeCoalescer: newModifyVolumeCoalescer(mockCloud, &Options{}),
}

resp, err := awsDriver.ControllerExpandVolume(ctx, tc.req)
Expand Down
Loading

0 comments on commit a813afe

Please sign in to comment.