diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index 71ecaa47e..4d14ab2c3 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -27,6 +27,7 @@ import ( "github.com/awslabs/volume-modifier-for-k8s/pkg/rpc" csi "github.com/container-storage-interface/spec/lib/go/csi" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud" + "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/coalescer" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver/internal" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util/template" @@ -58,20 +59,20 @@ const isManagedByDriver = "true" // ControllerService represents the controller service of CSI driver type ControllerService struct { - cloud cloud.Cloud - inFlight *internal.InFlight - options *Options - modifyVolumeManager *modifyVolumeManager + cloud cloud.Cloud + inFlight *internal.InFlight + options *Options + modifyVolumeCoalescer coalescer.Coalescer[modifyVolumeRequest, int32] rpc.UnimplementedModifyServer } // NewControllerService creates a new controller service func NewControllerService(c cloud.Cloud, o *Options) *ControllerService { return &ControllerService{ - cloud: c, - options: o, - inFlight: internal.NewInFlight(), - modifyVolumeManager: newModifyVolumeManager(), + cloud: c, + options: o, + inFlight: internal.NewInFlight(), + modifyVolumeCoalescer: newModifyVolumeCoalescer(c, o), } } @@ -557,26 +558,11 @@ func (d *ControllerService) ControllerExpandVolume(ctx context.Context, req *csi return nil, status.Error(codes.InvalidArgument, "After round-up, volume size exceeds the limit specified") } - responseChan := make(chan modifyVolumeResponse) - modifyVolumeRequest := modifyVolumeRequest{ - newSize: newSize, - responseChan: responseChan, - } - - // Intentionally not pass in context as we deal with context locally in this method - d.addModifyVolumeRequest(volumeID, &modifyVolumeRequest) //nolint:contextcheck - - var actualSizeGiB int32 - - select { - case response := <-responseChan: - if response.err != nil { - return nil, status.Errorf(codes.Internal, "Could not resize volume %q: %v", volumeID, response.err) - } else { - actualSizeGiB = response.volumeSize - } - case <-ctx.Done(): - return nil, status.Errorf(codes.Internal, "Could not resize volume %q: context cancelled", volumeID) + actualSizeGiB, err := d.modifyVolumeCoalescer.Coalesce(volumeID, modifyVolumeRequest{ + newSize: newSize, + }) + if err != nil { + return nil, status.Errorf(codes.Internal, "Could not resize volume %q: %v", volumeID, err) } nodeExpansionRequired := true @@ -605,7 +591,9 @@ func (d *ControllerService) ControllerModifyVolume(ctx context.Context, req *csi return nil, err } - err = d.modifyVolumeWithCoalescing(ctx, volumeID, options) + _, err = d.modifyVolumeCoalescer.Coalesce(volumeID, modifyVolumeRequest{ + modifyDiskOptions: *options, + }) if err != nil { return nil, err } diff --git a/pkg/driver/controller_modify_volume.go b/pkg/driver/controller_modify_volume.go index 9e033f39d..94d8aeb85 100644 --- a/pkg/driver/controller_modify_volume.go +++ b/pkg/driver/controller_modify_volume.go @@ -21,11 +21,11 @@ import ( "errors" "fmt" "strconv" - "sync" "time" "github.com/awslabs/volume-modifier-for-k8s/pkg/rpc" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud" + "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/coalescer" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/klog/v2" @@ -44,150 +44,6 @@ const ( type modifyVolumeRequest struct { newSize int64 modifyDiskOptions cloud.ModifyDiskOptions - // Channel for sending the response to the request caller - responseChan chan modifyVolumeResponse -} - -type modifyVolumeResponse struct { - volumeSize int32 - err error -} - -type modifyVolumeRequestHandler struct { - volumeID string - // Merged request from the requests that have been accepted for the volume - mergedRequest *modifyVolumeRequest - // Channel for sending requests to the goroutine for the volume - requestChan chan *modifyVolumeRequest -} - -type modifyVolumeManager struct { - // Map of volume ID to modifyVolumeRequestHandler - requestHandlerMap sync.Map -} - -func newModifyVolumeManager() *modifyVolumeManager { - return &modifyVolumeManager{ - requestHandlerMap: sync.Map{}, - } -} - -func newModifyVolumeRequestHandler(volumeID string, request *modifyVolumeRequest) modifyVolumeRequestHandler { - requestChan := make(chan *modifyVolumeRequest) - return modifyVolumeRequestHandler{ - requestChan: requestChan, - mergedRequest: request, - volumeID: volumeID, - } -} - -// This function validates the new request against the merged request for the volume. -// If the new request has a volume property that's already included in the merged request and its value is different from that in the merged request, -// this function will return an error and the new request will be rejected. -func (h *modifyVolumeRequestHandler) validateModifyVolumeRequest(r *modifyVolumeRequest) error { - if r.newSize != 0 && h.mergedRequest.newSize != 0 && r.newSize != h.mergedRequest.newSize { - return fmt.Errorf("Different size was requested by a previous request. Current: %d, Requested: %d", h.mergedRequest.newSize, r.newSize) - } - if r.modifyDiskOptions.IOPS != 0 && h.mergedRequest.modifyDiskOptions.IOPS != 0 && r.modifyDiskOptions.IOPS != h.mergedRequest.modifyDiskOptions.IOPS { - return fmt.Errorf("Different IOPS was requested by a previous request. Current: %d, Requested: %d", h.mergedRequest.modifyDiskOptions.IOPS, r.modifyDiskOptions.IOPS) - } - if r.modifyDiskOptions.Throughput != 0 && h.mergedRequest.modifyDiskOptions.Throughput != 0 && r.modifyDiskOptions.Throughput != h.mergedRequest.modifyDiskOptions.Throughput { - return fmt.Errorf("Different throughput was requested by a previous request. Current: %d, Requested: %d", h.mergedRequest.modifyDiskOptions.Throughput, r.modifyDiskOptions.Throughput) - } - if r.modifyDiskOptions.VolumeType != "" && h.mergedRequest.modifyDiskOptions.VolumeType != "" && r.modifyDiskOptions.VolumeType != h.mergedRequest.modifyDiskOptions.VolumeType { - return fmt.Errorf("Different volume type was requested by a previous request. Current: %s, Requested: %s", h.mergedRequest.modifyDiskOptions.VolumeType, r.modifyDiskOptions.VolumeType) - } - return nil -} - -func (h *modifyVolumeRequestHandler) mergeModifyVolumeRequest(r *modifyVolumeRequest) { - if r.newSize != 0 { - h.mergedRequest.newSize = r.newSize - } - if r.modifyDiskOptions.IOPS != 0 { - h.mergedRequest.modifyDiskOptions.IOPS = r.modifyDiskOptions.IOPS - } - if r.modifyDiskOptions.Throughput != 0 { - h.mergedRequest.modifyDiskOptions.Throughput = r.modifyDiskOptions.Throughput - } - if r.modifyDiskOptions.VolumeType != "" { - h.mergedRequest.modifyDiskOptions.VolumeType = r.modifyDiskOptions.VolumeType - } -} - -// processModifyVolumeRequests method starts its execution with a timer that has modifyVolumeRequestHandlerTimeout as its timeout value. -// When the Timer times out, it calls the ec2 API to perform the volume modification. processModifyVolumeRequests method sends back the response of -// the ec2 API call to the CSI Driver main thread via response channels. -// This method receives requests from CSI driver main thread via the request channel. When a new request is received from the request channel, we first -// validate the new request. If the new request is acceptable, it will be merged with the existing request for the volume. -func (d *ControllerService) processModifyVolumeRequests(h *modifyVolumeRequestHandler, responseChans []chan modifyVolumeResponse) { - klog.V(4).InfoS("Start processing ModifyVolumeRequest for ", "volume ID", h.volumeID) - process := func(req *modifyVolumeRequest) { - if err := h.validateModifyVolumeRequest(req); err != nil { - req.responseChan <- modifyVolumeResponse{err: err} - } else { - h.mergeModifyVolumeRequest(req) - responseChans = append(responseChans, req.responseChan) - } - } - - for { - select { - case req := <-h.requestChan: - process(req) - case <-time.After(d.options.ModifyVolumeRequestHandlerTimeout): - d.modifyVolumeManager.requestHandlerMap.Delete(h.volumeID) - // At this point, no new requests can come in on the request channel because it has been removed from the map - // However, the request channel may still have requests waiting on it - // Thus, process any requests still waiting in the channel - for loop := true; loop; { - select { - case req := <-h.requestChan: - process(req) - default: - loop = false - } - } - actualSizeGiB, err := d.executeModifyVolumeRequest(h.volumeID, h.mergedRequest) - for _, c := range responseChans { - select { - case c <- modifyVolumeResponse{volumeSize: actualSizeGiB, err: err}: - default: - klog.V(6).InfoS("Ignoring response channel because it has no receiver", "volumeID", h.volumeID) - } - } - return - } - } -} - -// When a new request comes in, we look up requestHandlerMap using the volume ID of the request. -// If there's no ModifyVolumeRequestHandler for the volume, meaning that there’s no inflight requests for the volume, we will start a goroutine -// for the volume calling processModifyVolumeRequests method, and ModifyVolumeRequestHandler for the volume will be added to requestHandlerMap. -// If there’s ModifyVolumeRequestHandler for the volume, meaning that there is inflight request(s) for the volume, we will send the new request -// to the goroutine for the volume via the receiving channel. -// Note that each volume with inflight requests has their own goroutine which follows timeout schedule of their own. -func (d *ControllerService) addModifyVolumeRequest(volumeID string, r *modifyVolumeRequest) { - requestHandler := newModifyVolumeRequestHandler(volumeID, r) - handler, loaded := d.modifyVolumeManager.requestHandlerMap.LoadOrStore(volumeID, requestHandler) - if loaded { - h := handler.(modifyVolumeRequestHandler) - h.requestChan <- r - } else { - responseChans := []chan modifyVolumeResponse{r.responseChan} - go d.processModifyVolumeRequests(&requestHandler, responseChans) - } -} - -func (d *ControllerService) executeModifyVolumeRequest(volumeID string, req *modifyVolumeRequest) (int32, error) { - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - actualSizeGiB, err := d.cloud.ResizeOrModifyDisk(ctx, volumeID, req.newSize, &req.modifyDiskOptions) - if err != nil { - return 0, err - } else { - return actualSizeGiB, nil - } } func (d *ControllerService) GetCSIDriverModificationCapability( @@ -202,8 +58,9 @@ func (d *ControllerService) ModifyVolumeProperties( req *rpc.ModifyVolumePropertiesRequest, ) (*rpc.ModifyVolumePropertiesResponse, error) { klog.V(4).InfoS("ModifyVolumeProperties called", "req", req) - if err := validateModifyVolumePropertiesRequest(req); err != nil { - return nil, err + name := req.GetName() + if name == "" { + return nil, status.Error(codes.InvalidArgument, "Volume name not provided") } options, err := parseModifyVolumeParameters(req.GetParameters()) @@ -211,8 +68,9 @@ func (d *ControllerService) ModifyVolumeProperties( return nil, err } - name := req.GetName() - err = d.modifyVolumeWithCoalescing(ctx, name, options) + _, err = d.modifyVolumeCoalescer.Coalesce(name, modifyVolumeRequest{ + modifyDiskOptions: *options, + }) if err != nil { return nil, err } @@ -220,12 +78,54 @@ func (d *ControllerService) ModifyVolumeProperties( return &rpc.ModifyVolumePropertiesResponse{}, nil } -func validateModifyVolumePropertiesRequest(req *rpc.ModifyVolumePropertiesRequest) error { - name := req.GetName() - if name == "" { - return status.Error(codes.InvalidArgument, "Volume name not provided") +func newModifyVolumeCoalescer(c cloud.Cloud, o *Options) coalescer.Coalescer[modifyVolumeRequest, int32] { + return coalescer.New[modifyVolumeRequest, int32](o.ModifyVolumeRequestHandlerTimeout, mergeModifyVolumeRequest, executeModifyVolumeRequest(c)) +} + +func mergeModifyVolumeRequest(input modifyVolumeRequest, existing modifyVolumeRequest) (modifyVolumeRequest, error) { + if input.newSize != 0 { + if existing.newSize != 0 && input.newSize != existing.newSize { + return existing, fmt.Errorf("Different size was requested by a previous request. Current: %d, Requested: %d", existing.newSize, input.newSize) + } + existing.newSize = input.newSize + } + if input.modifyDiskOptions.IOPS != 0 { + if existing.modifyDiskOptions.IOPS != 0 && input.modifyDiskOptions.IOPS != existing.modifyDiskOptions.IOPS { + return existing, fmt.Errorf("Different IOPS was requested by a previous request. Current: %d, Requested: %d", existing.modifyDiskOptions.IOPS, input.modifyDiskOptions.IOPS) + } + existing.modifyDiskOptions.IOPS = input.modifyDiskOptions.IOPS + } + if input.modifyDiskOptions.Throughput != 0 { + if existing.modifyDiskOptions.Throughput != 0 && input.modifyDiskOptions.Throughput != existing.modifyDiskOptions.Throughput { + return existing, fmt.Errorf("Different throughput was requested by a previous request. Current: %d, Requested: %d", existing.modifyDiskOptions.Throughput, input.modifyDiskOptions.Throughput) + } + existing.modifyDiskOptions.Throughput = input.modifyDiskOptions.Throughput + } + if input.modifyDiskOptions.VolumeType != "" { + if existing.modifyDiskOptions.VolumeType != "" && input.modifyDiskOptions.VolumeType != existing.modifyDiskOptions.VolumeType { + return existing, fmt.Errorf("Different volume type was requested by a previous request. Current: %s, Requested: %s", existing.modifyDiskOptions.VolumeType, input.modifyDiskOptions.VolumeType) + } + existing.modifyDiskOptions.VolumeType = input.modifyDiskOptions.VolumeType + } + + return existing, nil +} + +func executeModifyVolumeRequest(c cloud.Cloud) func(string, modifyVolumeRequest) (int32, error) { + return func(volumeID string, req modifyVolumeRequest) (int32, error) { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + actualSizeGiB, err := c.ResizeOrModifyDisk(ctx, volumeID, req.newSize, &req.modifyDiskOptions) + if err != nil { + // Kubernetes sidecars treats "Invalid Argument" errors as infeasible and retries less aggressively + if errors.Is(err, cloud.ErrInvalidArgument) { + return 0, status.Errorf(codes.InvalidArgument, "Could not modify volume (invalid argument) %q: %v", volumeID, err) + } + return 0, status.Errorf(codes.Internal, "Could not modify volume %q: %v", volumeID, err) + } else { + return actualSizeGiB, nil + } } - return nil } func parseModifyVolumeParameters(params map[string]string) (*cloud.ModifyDiskOptions, error) { @@ -259,28 +159,3 @@ func parseModifyVolumeParameters(params map[string]string) (*cloud.ModifyDiskOpt return &options, nil } - -func (d *ControllerService) modifyVolumeWithCoalescing(ctx context.Context, volume string, options *cloud.ModifyDiskOptions) error { - responseChan := make(chan modifyVolumeResponse) - request := modifyVolumeRequest{ - modifyDiskOptions: *options, - responseChan: responseChan, - } - - // Intentionally not pass in context as we deal with context locally in this method - d.addModifyVolumeRequest(volume, &request) //nolint:contextcheck - - select { - case response := <-responseChan: - if response.err != nil { - if errors.Is(response.err, cloud.ErrInvalidArgument) { - return status.Errorf(codes.InvalidArgument, "Could not modify volume %q: %v", volume, response.err) - } - return status.Errorf(codes.Internal, "Could not modify volume %q: %v", volume, response.err) - } - case <-ctx.Done(): - return status.Errorf(codes.Internal, "Could not modify volume %q: context cancelled", volume) - } - - return nil -} diff --git a/pkg/driver/controller_test.go b/pkg/driver/controller_test.go index 9ae985b63..678d749ac 100644 --- a/pkg/driver/controller_test.go +++ b/pkg/driver/controller_test.go @@ -3567,10 +3567,10 @@ func TestControllerExpandVolume(t *testing.T) { mockCloud.EXPECT().ResizeOrModifyDisk(gomock.Any(), gomock.Eq(tc.req.GetVolumeId()), gomock.Any(), gomock.Any()).Return(retSizeGiB, nil).AnyTimes() awsDriver := ControllerService{ - cloud: mockCloud, - inFlight: internal.NewInFlight(), - options: &Options{}, - modifyVolumeManager: newModifyVolumeManager(), + cloud: mockCloud, + inFlight: internal.NewInFlight(), + options: &Options{}, + modifyVolumeCoalescer: newModifyVolumeCoalescer(mockCloud, &Options{}), } resp, err := awsDriver.ControllerExpandVolume(ctx, tc.req) diff --git a/pkg/driver/request_coalescing_test.go b/pkg/driver/request_coalescing_test.go index f44c578eb..326ebc689 100644 --- a/pkg/driver/request_coalescing_test.go +++ b/pkg/driver/request_coalescing_test.go @@ -77,10 +77,6 @@ func TestVolumeModificationWithCoalescing(t *testing.T) { name: "duplicate requests", testFunction: testDuplicateRequest, }, - { - name: "context timeout", - testFunction: testContextTimeout, - }, { name: "timing", testFunction: testResponseReturnTiming, @@ -121,13 +117,14 @@ func testBasicRequestCoalescingSuccess(t *testing.T, executor modifyVolumeExecut return newSize, nil }) + options := &Options{ + ModifyVolumeRequestHandlerTimeout: 2 * time.Second, + } awsDriver := ControllerService{ - cloud: mockCloud, - inFlight: internal.NewInFlight(), - options: &Options{ - ModifyVolumeRequestHandlerTimeout: 2 * time.Second, - }, - modifyVolumeManager: newModifyVolumeManager(), + cloud: mockCloud, + inFlight: internal.NewInFlight(), + options: options, + modifyVolumeCoalescer: newModifyVolumeCoalescer(mockCloud, options), } var wg sync.WaitGroup @@ -175,13 +172,14 @@ func testRequestFail(t *testing.T, executor modifyVolumeExecutor) { return 0, fmt.Errorf("ResizeOrModifyDisk failed") }) + options := &Options{ + ModifyVolumeRequestHandlerTimeout: 2 * time.Second, + } awsDriver := ControllerService{ - cloud: mockCloud, - inFlight: internal.NewInFlight(), - options: &Options{ - ModifyVolumeRequestHandlerTimeout: 2 * time.Second, - }, - modifyVolumeManager: newModifyVolumeManager(), + cloud: mockCloud, + inFlight: internal.NewInFlight(), + options: options, + modifyVolumeCoalescer: newModifyVolumeCoalescer(mockCloud, options), } var wg sync.WaitGroup @@ -243,13 +241,16 @@ func testPartialFail(t *testing.T, executor modifyVolumeExecutor) { return newSize, nil }) + options := &Options{ + ModifyVolumeRequestHandlerTimeout: 2 * time.Second, + } awsDriver := ControllerService{ cloud: mockCloud, inFlight: internal.NewInFlight(), options: &Options{ ModifyVolumeRequestHandlerTimeout: 2 * time.Second, }, - modifyVolumeManager: newModifyVolumeManager(), + modifyVolumeCoalescer: newModifyVolumeCoalescer(mockCloud, options), } var wg sync.WaitGroup @@ -324,13 +325,14 @@ func testSequentialRequests(t *testing.T, executor modifyVolumeExecutor) { return newSize, nil }).Times(2) + options := &Options{ + ModifyVolumeRequestHandlerTimeout: 2 * time.Second, + } awsDriver := ControllerService{ - cloud: mockCloud, - inFlight: internal.NewInFlight(), - options: &Options{ - ModifyVolumeRequestHandlerTimeout: 2 * time.Second, - }, - modifyVolumeManager: newModifyVolumeManager(), + cloud: mockCloud, + inFlight: internal.NewInFlight(), + options: options, + modifyVolumeCoalescer: newModifyVolumeCoalescer(mockCloud, options), } var wg sync.WaitGroup @@ -381,13 +383,14 @@ func testDuplicateRequest(t *testing.T, executor modifyVolumeExecutor) { return newSize, nil }) + options := &Options{ + ModifyVolumeRequestHandlerTimeout: 2 * time.Second, + } awsDriver := ControllerService{ - cloud: mockCloud, - inFlight: internal.NewInFlight(), - options: &Options{ - ModifyVolumeRequestHandlerTimeout: 2 * time.Second, - }, - modifyVolumeManager: newModifyVolumeManager(), + cloud: mockCloud, + inFlight: internal.NewInFlight(), + options: options, + modifyVolumeCoalescer: newModifyVolumeCoalescer(mockCloud, options), } var wg sync.WaitGroup @@ -421,74 +424,6 @@ func testDuplicateRequest(t *testing.T, executor modifyVolumeExecutor) { wg.Wait() } -// TestContextTimeout tests request failing due to context cancellation and the behavior of the following request. -func testContextTimeout(t *testing.T, executor modifyVolumeExecutor) { - const NewVolumeType = "gp3" - const NewSize = 5 * util.GiB - volumeID := t.Name() - - mockCtl := gomock.NewController(t) - defer mockCtl.Finish() - - mockCloud := cloud.NewMockCloud(mockCtl) - mockCloud.EXPECT().ResizeOrModifyDisk(gomock.Any(), gomock.Eq(volumeID), gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, volumeID string, newSize int64, options *cloud.ModifyDiskOptions) (int64, error) { - klog.InfoS("ResizeOrModifyDisk called", "volumeID", volumeID, "newSize", newSize, "options", options) - time.Sleep(3 * time.Second) - - // Controller could decide to coalesce the timed out request, or to drop it - if newSize != 0 && newSize != NewSize { - t.Errorf("newSize incorrect") - } else if options.VolumeType != NewVolumeType { - t.Errorf("volumeType incorrect") - } - - return newSize, nil - }) - - awsDriver := ControllerService{ - cloud: mockCloud, - inFlight: internal.NewInFlight(), - options: &Options{ - ModifyVolumeRequestHandlerTimeout: 2 * time.Second, - }, - modifyVolumeManager: newModifyVolumeManager(), - } - - var wg sync.WaitGroup - wg.Add(2) - - ctx, cancel := context.WithCancel(context.Background()) - go wrapTimeout(t, "ControllerExpandVolume timed out", func() { - _, err := awsDriver.ControllerExpandVolume(ctx, &csi.ControllerExpandVolumeRequest{ - VolumeId: volumeID, - CapacityRange: &csi.CapacityRange{ - RequiredBytes: NewSize, - }, - }) - if err == nil { - t.Error("ControllerExpandVolume should return err because context is cancelled") - } - wg.Done() - }) - - // Cancel the context (simulate a "sidecar timeout") - time.Sleep(500 * time.Millisecond) - cancel() - - go wrapTimeout(t, "Modify timed out", func() { - err := executor(context.Background(), awsDriver, volumeID, map[string]string{ - ModificationKeyVolumeType: NewVolumeType, - }) - - if err != nil { - t.Error("Modify returned error") - } - wg.Done() - }) - - wg.Wait() -} - // TestResponseReturnTiming tests the caller of request coalescing blocking until receiving response from cloud.ResizeOrModifyDisk func testResponseReturnTiming(t *testing.T, executor modifyVolumeExecutor) { const NewVolumeType = "gp3" @@ -510,13 +445,14 @@ func testResponseReturnTiming(t *testing.T, executor modifyVolumeExecutor) { return newSize, nil }) + options := &Options{ + ModifyVolumeRequestHandlerTimeout: 2 * time.Second, + } awsDriver := ControllerService{ - cloud: mockCloud, - inFlight: internal.NewInFlight(), - options: &Options{ - ModifyVolumeRequestHandlerTimeout: 2 * time.Second, - }, - modifyVolumeManager: newModifyVolumeManager(), + cloud: mockCloud, + inFlight: internal.NewInFlight(), + options: options, + modifyVolumeCoalescer: newModifyVolumeCoalescer(mockCloud, options), } var wg sync.WaitGroup