Skip to content

Commit

Permalink
Batch EC2 DescribeVolumesModifications API calls
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSirenko committed Mar 29, 2024
1 parent ca30b31 commit 0ccce07
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 34 deletions.
126 changes: 107 additions & 19 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,12 @@ type snapshotBatcherType int

// batcherManager maintains a collection of batchers for different types of tasks.
type batcherManager struct {
volumeIDBatcher *batcher.Batcher[string, *types.Volume]
volumeTagBatcher *batcher.Batcher[string, *types.Volume]
instanceIDBatcher *batcher.Batcher[string, *types.Instance]
snapshotIDBatcher *batcher.Batcher[string, *types.Snapshot]
snapshotTagBatcher *batcher.Batcher[string, *types.Snapshot]
volumeIDBatcher *batcher.Batcher[string, *types.Volume]
volumeTagBatcher *batcher.Batcher[string, *types.Volume]
instanceIDBatcher *batcher.Batcher[string, *types.Instance]
snapshotIDBatcher *batcher.Batcher[string, *types.Snapshot]
snapshotTagBatcher *batcher.Batcher[string, *types.Snapshot]
volumeModificationIDBatcher *batcher.Batcher[string, *types.VolumeModification]
}

type cloud struct {
Expand Down Expand Up @@ -358,6 +359,9 @@ func newBatcherManager(svc EC2API) *batcherManager {
snapshotTagBatcher: batcher.New(1000, 300*time.Millisecond, func(names []string) (map[string]*types.Snapshot, error) {
return execBatchDescribeSnapshots(svc, names, snapshotTagBatcher)
}),
volumeModificationIDBatcher: batcher.New(500, 300*time.Millisecond, func(names []string) (map[string]*types.VolumeModification, error) {
return execBatchDescribeVolumesModifications(svc, names)
}),
}
}

Expand Down Expand Up @@ -638,6 +642,58 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
return &Disk{CapacityGiB: size, VolumeID: volumeID, AvailabilityZone: zone, SnapshotID: snapshotID, OutpostArn: outpostArn}, nil
}

// execBatchDescribeVolumesModifications executes a batched DescribeVolumesModifications API call
func execBatchDescribeVolumesModifications(svc EC2API, input []string) (map[string]*types.VolumeModification, error) {
klog.V(7).InfoS("execBatchDescribeVolumeModifications", "volumeIds", input)
request := &ec2.DescribeVolumesModificationsInput{
VolumeIds: input,
}

ctx, cancel := context.WithTimeout(context.Background(), batchDescribeTimeout)
defer cancel()

resp, err := describeVolumesModifications(ctx, svc, request)
if err != nil {
return nil, err
}

result := make(map[string]*types.VolumeModification)

for _, i := range resp {
volumeModification := i
result[*volumeModification.VolumeId] = &volumeModification
}

klog.V(7).InfoS("execBatchDescribeVolumeModifications: success", "result", result)
return result, nil
}

// batchDescribeVolumesModifications processes a DescribeVolumesModifications request by queuing the task and waiting for the result.
func (c *cloud) batchDescribeVolumesModifications(request *ec2.DescribeVolumesModificationsInput) (*types.VolumeModification, error) {
var task string

if len(request.VolumeIds) == 1 && request.VolumeIds[0] != "" {
task = request.VolumeIds[0]
} else {
return nil, fmt.Errorf("batchDescribeVolumesModifications: invalid request, request: %v", request)
}

ch := make(chan batcher.BatchResult[*types.VolumeModification])

b := c.bm.volumeModificationIDBatcher
b.AddTask(task, ch)

r := <-ch

if r.Err != nil {
return nil, r.Err
}
if r.Result == nil {
return nil, VolumeNotBeingModified
}
return r.Result, nil
}

// ResizeOrModifyDisk resizes an EBS volume in GiB increments, rouding up to the next possible allocatable unit, and/or modifies an EBS
// volume with the parameters in ModifyDiskOptions.
// The resizing operation is performed only when newSizeBytes != 0.
Expand Down Expand Up @@ -705,7 +761,7 @@ func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
return true, nil
}

// executes a batched DescribeInstances API call
// execBatchDescribeInstances executes a batched DescribeInstances API call
func execBatchDescribeInstances(svc EC2API, input []string) (map[string]*types.Instance, error) {
klog.V(7).InfoS("execBatchDescribeInstances", "instanceIds", input)
request := &ec2.DescribeInstancesInput{
Expand Down Expand Up @@ -1598,7 +1654,7 @@ func (c *cloud) waitForVolumeModification(ctx context.Context, volumeID string)
}

waitErr := wait.ExponentialBackoff(backoff, func() (bool, error) {
m, err := c.getLatestVolumeModification(ctx, volumeID)
m, err := c.getLatestVolumeModification(ctx, volumeID, true)
// Consider volumes that have never been modified as done
if err != nil && errors.Is(err, VolumeNotBeingModified) {
return true, nil
Expand All @@ -1621,25 +1677,55 @@ func (c *cloud) waitForVolumeModification(ctx context.Context, volumeID string)
return nil
}

func describeVolumesModifications(ctx context.Context, svc EC2API, request *ec2.DescribeVolumesModificationsInput) ([]types.VolumeModification, error) {
volumeModifications := []types.VolumeModification{}
var nextToken *string
for {
response, err := svc.DescribeVolumesModifications(ctx, request)
if err != nil {
if isAWSErrorModificationNotFound(err) {
return nil, VolumeNotBeingModified
}
return nil, fmt.Errorf("error describing volume modifications: %w", err)
}

volumeModifications = append(volumeModifications, response.VolumesModifications...)

nextToken = response.NextToken
if aws.ToString(nextToken) == "" {
break
}
request.NextToken = nextToken
}
return volumeModifications, nil
}

// getLatestVolumeModification returns the last modification of the volume.
func (c *cloud) getLatestVolumeModification(ctx context.Context, volumeID string) (*types.VolumeModification, error) {
func (c *cloud) getLatestVolumeModification(ctx context.Context, volumeID string, isBatched bool) (*types.VolumeModification, error) {
request := &ec2.DescribeVolumesModificationsInput{
VolumeIds: []string{volumeID},
}
mod, err := c.ec2.DescribeVolumesModifications(ctx, request)
if err != nil {
if isAWSErrorModificationNotFound(err) {

// TODO Q: I see this as the cleanest way to NOT batch certain DVM calls.
// TODO Q Cont: Would making a separate batcher with maxEntries 1 or maxDelay 0 be cleaner?
if c.bm == nil || !isBatched {
mod, err := c.ec2.DescribeVolumesModifications(ctx, request)
if err != nil {
if isAWSErrorModificationNotFound(err) {
return nil, VolumeNotBeingModified
}
return nil, fmt.Errorf("error describing modifications in volume %q: %w", volumeID, err)
}

volumeMods := mod.VolumesModifications
if len(volumeMods) == 0 {
return nil, VolumeNotBeingModified
}
return nil, fmt.Errorf("error describing modifications in volume %q: %w", volumeID, err)
}

volumeMods := mod.VolumesModifications
if len(volumeMods) == 0 {
return nil, VolumeNotBeingModified
return &volumeMods[len(volumeMods)-1], nil // TODO Q Check for nil pointer shenanigan
} else {
return c.batchDescribeVolumesModifications(request)
}

return &volumeMods[len(volumeMods)-1], nil
}

// randomAvailabilityZone returns a random zone from the given region
Expand Down Expand Up @@ -1710,7 +1796,8 @@ func (c *cloud) validateModifyVolume(ctx context.Context, volumeID string, newSi
}
oldSizeGiB := *volume.Size

latestMod, err := c.getLatestVolumeModification(ctx, volumeID)
// This call must NOT be batched because a missing volume modification will return client error
latestMod, err := c.getLatestVolumeModification(ctx, volumeID, false)
if err != nil && !errors.Is(err, VolumeNotBeingModified) {
return true, oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, err)
}
Expand All @@ -1733,6 +1820,7 @@ func (c *cloud) validateModifyVolume(ctx context.Context, volumeID string, newSi

// At this point, we know we are starting a new volume modification
// If we're asked to modify a volume to its current state, ignore the request and immediately return a success
// This is because as of March 2024, EC2 ModifyVolume calls that don't change any parameters still modify the volume
if !needsVolumeModification(*volume, newSizeGiB, options) {
klog.V(5).InfoS("[Debug] Skipping modification for volume due to matching stats", "volumeID", volumeID)
// Wait for any existing modifications to prevent race conditions where DescribeVolume(s) returns the new
Expand Down
126 changes: 111 additions & 15 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,17 +217,15 @@ func executeDescribeVolumesTest(t *testing.T, c *cloud, volumeIDs, volumeNames [
wg.Add(1)
r[i] = make(chan *types.Volume, 1)
e[i] = make(chan error, 1)
go func(req *ec2.DescribeVolumesInput, resultCh chan *types.Volume, errCh chan error) {
go func(resultCh chan *types.Volume, errCh chan error) {
defer wg.Done()
volume, err := c.batchDescribeVolumes(req)
volume, err := c.batchDescribeVolumes(request)
if err != nil {
errCh <- err
return
}
resultCh <- volume
// passing `request` as a parameter to create a copy
// TODO remove after govet stops complaining about https://github.com/golang/go/discussions/56010
}(request, r[i], e[i])
}(r[i], e[i])
}

wg.Wait()
Expand Down Expand Up @@ -323,17 +321,15 @@ func executeDescribeInstancesTest(t *testing.T, c *cloud, instanceIds []string,
r[i] = make(chan types.Instance, 1)
e[i] = make(chan error, 1)

go func(req *ec2.DescribeInstancesInput, resultCh chan types.Instance, errCh chan error) {
go func(resultCh chan types.Instance, errCh chan error) {
defer wg.Done()
instance, err := c.batchDescribeInstances(req)
instance, err := c.batchDescribeInstances(request)
if err != nil {
errCh <- err
return
}
resultCh <- *instance
// passing `request` as a parameter to create a copy
// TODO remove after govet stops complaining about https://github.com/golang/go/discussions/56010
}(request, r[i], e[i])
}(r[i], e[i])
}

wg.Wait()
Expand Down Expand Up @@ -499,17 +495,15 @@ func executeDescribeSnapshotsTest(t *testing.T, c *cloud, snapshotIDs, snapshotN
r[i] = make(chan *types.Snapshot, 1)
e[i] = make(chan error, 1)

go func(req *ec2.DescribeSnapshotsInput, resultCh chan *types.Snapshot, errCh chan error) {
go func(resultCh chan *types.Snapshot, errCh chan error) {
defer wg.Done()
snapshot, err := c.batchDescribeSnapshots(req)
snapshot, err := c.batchDescribeSnapshots(request)
if err != nil {
errCh <- err
return
}
resultCh <- snapshot
// passing `request` as a parameter to create a copy
// TODO remove after govet stops complaining about https://github.com/golang/go/discussions/56010
}(request, r[i], e[i])
}(r[i], e[i])
}

wg.Wait()
Expand All @@ -533,6 +527,108 @@ func executeDescribeSnapshotsTest(t *testing.T, c *cloud, snapshotIDs, snapshotN
}
}

func TestBatchDescribeVolumesModifications(t *testing.T) {
testCases := []struct {
name string
volumeIds []string
mockFunc func(mockEC2 *MockEC2API, expErr error, volumeModifications []types.VolumeModification)
expErr error
}{
{
name: "success: volumeModification by ID",
volumeIds: []string{"vol-001", "vol-002", "vol-003"},
mockFunc: func(mockEC2 *MockEC2API, expErr error, volumeModifications []types.VolumeModification) {
volumeModificationsOutput := &ec2.DescribeVolumesModificationsOutput{VolumesModifications: volumeModifications}
mockEC2.EXPECT().DescribeVolumesModifications(gomock.Any(), gomock.Any()).Return(volumeModificationsOutput, expErr).Times(1)
},
expErr: nil,
},
{
name: "fail: EC2 API generic error",
volumeIds: []string{"vol-001", "vol-002", "vol-003"},
mockFunc: func(mockEC2 *MockEC2API, expErr error, volumeModifications []types.VolumeModification) {
mockEC2.EXPECT().DescribeVolumesModifications(gomock.Any(), gomock.Any()).Return(nil, expErr).Times(1)
},
expErr: fmt.Errorf("generic EC2 API error"),
},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

mockEC2 := NewMockEC2API(mockCtrl)
c := newCloud(mockEC2)
cloudInstance := c.(*cloud)
cloudInstance.bm = newBatcherManager(cloudInstance.ec2)

// Setup mocks
var volumeModifications []types.VolumeModification
for _, volumeId := range tc.volumeIds {
volumeModifications = append(volumeModifications, types.VolumeModification{VolumeId: aws.String(volumeId)})
}
tc.mockFunc(mockEC2, tc.expErr, volumeModifications)

executeDescribeVolumesModificationsTest(t, cloudInstance, tc.volumeIds, tc.expErr)
})
}
}

func executeDescribeVolumesModificationsTest(t *testing.T, c *cloud, volumeIds []string, expErr error) {
var wg sync.WaitGroup

getRequestForID := func(id string) *ec2.DescribeVolumesModificationsInput {
return &ec2.DescribeVolumesModificationsInput{VolumeIds: []string{id}}
}

requests := make([]*ec2.DescribeVolumesModificationsInput, 0, len(volumeIds))
for _, volumeId := range volumeIds {
requests = append(requests, getRequestForID(volumeId))
}

r := make([]chan types.VolumeModification, len(requests))
e := make([]chan error, len(requests))

for i, request := range requests {
wg.Add(1)
r[i] = make(chan types.VolumeModification, 1)
e[i] = make(chan error, 1)

go func(resultCh chan types.VolumeModification, errCh chan error) {
defer wg.Done()
volumeModification, err := c.batchDescribeVolumesModifications(request)
if err != nil {
errCh <- err
return
}
resultCh <- *volumeModification
}(r[i], e[i])
}

wg.Wait()

for i := range requests {
select {
case result := <-r[i]:
if &result == (&types.VolumeModification{}) {
t.Errorf("Received nil result for a request")
}
case err := <-e[i]:
if expErr == nil {
t.Errorf("Error while processing request: %v", err)
}
if !errors.Is(err, expErr) {
t.Errorf("Expected error %v, but got %v", expErr, err)
}
default:
t.Errorf("Did not receive a result or an error for a request")
}
}
}

func TestCreateDisk(t *testing.T) {
testCases := []struct {
name string
Expand Down

0 comments on commit 0ccce07

Please sign in to comment.