Skip to content

Commit

Permalink
Merge pull request #45 from leakingtapan/add-context
Browse files Browse the repository at this point in the history
Update cloud provider interface to take in context
  • Loading branch information
k8s-ci-robot authored Oct 8, 2018
2 parents c4fe1ea + af3bcdd commit 799bb24
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 102 deletions.
68 changes: 35 additions & 33 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package cloud

import (
"context"
"errors"
"fmt"

Expand All @@ -25,6 +26,7 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/golang/glog"
Expand Down Expand Up @@ -92,23 +94,23 @@ type DiskOptions struct {

// EC2 abstracts aws.EC2 to facilitate its mocking.
type EC2 interface {
DescribeVolumes(input *ec2.DescribeVolumesInput) (*ec2.DescribeVolumesOutput, error)
CreateVolume(input *ec2.CreateVolumeInput) (*ec2.Volume, error)
DeleteVolume(input *ec2.DeleteVolumeInput) (*ec2.DeleteVolumeOutput, error)
DetachVolume(input *ec2.DetachVolumeInput) (*ec2.VolumeAttachment, error)
AttachVolume(input *ec2.AttachVolumeInput) (*ec2.VolumeAttachment, error)
DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error)
DescribeVolumesWithContext(ctx aws.Context, input *ec2.DescribeVolumesInput, opts ...request.Option) (*ec2.DescribeVolumesOutput, error)
CreateVolumeWithContext(ctx aws.Context, input *ec2.CreateVolumeInput, opts ...request.Option) (*ec2.Volume, error)
DeleteVolumeWithContext(ctx aws.Context, input *ec2.DeleteVolumeInput, opts ...request.Option) (*ec2.DeleteVolumeOutput, error)
DetachVolumeWithContext(ctx aws.Context, input *ec2.DetachVolumeInput, opts ...request.Option) (*ec2.VolumeAttachment, error)
AttachVolumeWithContext(ctx aws.Context, input *ec2.AttachVolumeInput, opts ...request.Option) (*ec2.VolumeAttachment, error)
DescribeInstancesWithContext(ctx aws.Context, input *ec2.DescribeInstancesInput, opts ...request.Option) (*ec2.DescribeInstancesOutput, error)
}

type Cloud interface {
GetMetadata() MetadataService
CreateDisk(volumeName string, diskOptions *DiskOptions) (disk *Disk, err error)
DeleteDisk(volumeID string) (success bool, err error)
AttachDisk(volumeID string, nodeID string) (devicePath string, err error)
DetachDisk(volumeID string, nodeID string) (err error)
GetDiskByName(name string, capacityBytes int64) (disk *Disk, err error)
GetDiskByID(volumeID string) (disk *Disk, err error)
IsExistInstance(nodeID string) (sucess bool)
CreateDisk(ctx context.Context, volumeName string, diskOptions *DiskOptions) (disk *Disk, err error)
DeleteDisk(ctx context.Context, volumeID string) (success bool, err error)
AttachDisk(ctx context.Context, volumeID string, nodeID string) (devicePath string, err error)
DetachDisk(ctx context.Context, volumeID string, nodeID string) (err error)
GetDiskByName(ctx context.Context, name string, capacityBytes int64) (disk *Disk, err error)
GetDiskByID(ctx context.Context, volumeID string) (disk *Disk, err error)
IsExistInstance(ctx context.Context, nodeID string) (sucess bool)
}

type cloud struct {
Expand Down Expand Up @@ -155,7 +157,7 @@ func (c *cloud) GetMetadata() MetadataService {
return c.metadata
}

func (c *cloud) CreateDisk(volumeName string, diskOptions *DiskOptions) (*Disk, error) {
func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *DiskOptions) (*Disk, error) {
var createType string
var iops int64
capacityGiB := util.BytesToGiB(diskOptions.CapacityBytes)
Expand Down Expand Up @@ -198,7 +200,7 @@ func (c *cloud) CreateDisk(volumeName string, diskOptions *DiskOptions) (*Disk,
request.Iops = aws.Int64(iops)
}

response, err := c.ec2.CreateVolume(request)
response, err := c.ec2.CreateVolumeWithContext(ctx, request)
if err != nil {
return nil, fmt.Errorf("could not create volume in EC2: %v", err)
}
Expand All @@ -216,9 +218,9 @@ func (c *cloud) CreateDisk(volumeName string, diskOptions *DiskOptions) (*Disk,
return &Disk{CapacityGiB: size, VolumeID: volumeID}, nil
}

func (c *cloud) DeleteDisk(volumeID string) (bool, error) {
func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
request := &ec2.DeleteVolumeInput{VolumeId: &volumeID}
if _, err := c.ec2.DeleteVolume(request); err != nil {
if _, err := c.ec2.DeleteVolumeWithContext(ctx, request); err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == "InvalidVolume.NotFound" {
return false, ErrNotFound
Expand All @@ -229,8 +231,8 @@ func (c *cloud) DeleteDisk(volumeID string) (bool, error) {
return true, nil
}

func (c *cloud) AttachDisk(volumeID, nodeID string) (string, error) {
instance, err := c.getInstance(nodeID)
func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string, error) {
instance, err := c.getInstance(ctx, nodeID)
if err != nil {
return "", err
}
Expand All @@ -248,7 +250,7 @@ func (c *cloud) AttachDisk(volumeID, nodeID string) (string, error) {
VolumeId: aws.String(volumeID),
}

resp, err := c.ec2.AttachVolume(request)
resp, err := c.ec2.AttachVolumeWithContext(ctx, request)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == "VolumeInUse" {
Expand Down Expand Up @@ -291,8 +293,8 @@ func (c *cloud) AttachDisk(volumeID, nodeID string) (string, error) {
return device.Path, nil
}

func (c *cloud) DetachDisk(volumeID, nodeID string) error {
instance, err := c.getInstance(nodeID)
func (c *cloud) DetachDisk(ctx context.Context, volumeID, nodeID string) error {
instance, err := c.getInstance(ctx, nodeID)
if err != nil {
return err
}
Expand All @@ -313,15 +315,15 @@ func (c *cloud) DetachDisk(volumeID, nodeID string) error {
VolumeId: aws.String(volumeID),
}

_, err = c.ec2.DetachVolume(request)
_, err = c.ec2.DetachVolumeWithContext(ctx, request)
if err != nil {
return fmt.Errorf("could not detach volume %q from node %q: %v", volumeID, nodeID, err)
}

return nil
}

func (c *cloud) GetDiskByName(name string, capacityBytes int64) (*Disk, error) {
func (c *cloud) GetDiskByName(ctx context.Context, name string, capacityBytes int64) (*Disk, error) {
request := &ec2.DescribeVolumesInput{
Filters: []*ec2.Filter{
{
Expand All @@ -331,7 +333,7 @@ func (c *cloud) GetDiskByName(name string, capacityBytes int64) (*Disk, error) {
},
}

volume, err := c.getVolume(request)
volume, err := c.getVolume(ctx, request)
if err != nil {
return nil, err
}
Expand All @@ -347,14 +349,14 @@ func (c *cloud) GetDiskByName(name string, capacityBytes int64) (*Disk, error) {
}, nil
}

func (c *cloud) GetDiskByID(volumeID string) (*Disk, error) {
func (c *cloud) GetDiskByID(ctx context.Context, volumeID string) (*Disk, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{
aws.String(volumeID),
},
}

volume, err := c.getVolume(request)
volume, err := c.getVolume(ctx, request)
if err != nil {
return nil, err
}
Expand All @@ -365,20 +367,20 @@ func (c *cloud) GetDiskByID(volumeID string) (*Disk, error) {
}, nil
}

func (c *cloud) IsExistInstance(nodeID string) bool {
instance, err := c.getInstance(nodeID)
func (c *cloud) IsExistInstance(ctx context.Context, nodeID string) bool {
instance, err := c.getInstance(ctx, nodeID)
if err != nil || instance == nil {
return false
}
return true
}

func (c *cloud) getVolume(request *ec2.DescribeVolumesInput) (*ec2.Volume, error) {
func (c *cloud) getVolume(ctx context.Context, request *ec2.DescribeVolumesInput) (*ec2.Volume, error) {
var volumes []*ec2.Volume
var nextToken *string

for {
response, err := c.ec2.DescribeVolumes(request)
response, err := c.ec2.DescribeVolumesWithContext(ctx, request)
if err != nil {
return nil, err
}
Expand All @@ -401,15 +403,15 @@ func (c *cloud) getVolume(request *ec2.DescribeVolumesInput) (*ec2.Volume, error
return volumes[0], nil
}

func (c *cloud) getInstance(nodeID string) (*ec2.Instance, error) {
func (c *cloud) getInstance(ctx context.Context, nodeID string) (*ec2.Instance, error) {
instances := []*ec2.Instance{}
request := &ec2.DescribeInstancesInput{
InstanceIds: []*string{&nodeID},
}

var nextToken *string
for {
response, err := c.ec2.DescribeInstances(request)
response, err := c.ec2.DescribeInstancesWithContext(ctx, request)
if err != nil {
return nil, fmt.Errorf("error listing AWS instances: %q", err)
}
Expand Down
36 changes: 22 additions & 14 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package cloud

import (
"context"
"fmt"
"strings"
"testing"
Expand Down Expand Up @@ -76,9 +77,10 @@ func TestCreateDisk(t *testing.T) {
}
}

mockEC2.EXPECT().CreateVolume(gomock.Any()).Return(vol, tc.expErr)
ctx := context.Background()
mockEC2.EXPECT().CreateVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(vol, tc.expErr)

disk, err := c.CreateDisk(tc.volumeName, tc.diskOptions)
disk, err := c.CreateDisk(ctx, tc.volumeName, tc.diskOptions)
if err != nil {
if tc.expErr == nil {
t.Fatalf("CreateDisk() failed: expected no error, got: %v", err)
Expand Down Expand Up @@ -134,9 +136,10 @@ func TestDeleteDisk(t *testing.T) {
mockEC2 := mocks.NewMockEC2(mockCtrl)
c := newCloud(mockEC2)

mockEC2.EXPECT().DeleteVolume(gomock.Any()).Return(&ec2.DeleteVolumeOutput{}, tc.expErr)
ctx := context.Background()
mockEC2.EXPECT().DeleteVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.DeleteVolumeOutput{}, tc.expErr)

ok, err := c.DeleteDisk(tc.volumeID)
ok, err := c.DeleteDisk(ctx, tc.volumeID)
if err != nil && tc.expErr == nil {
t.Fatalf("DeleteDisk() failed: expected no error, got: %v", err)
}
Expand Down Expand Up @@ -180,10 +183,11 @@ func TestAttachDisk(t *testing.T) {
mockEC2 := mocks.NewMockEC2(mockCtrl)
c := newCloud(mockEC2)

mockEC2.EXPECT().DescribeInstances(gomock.Any()).Return(newDescribeInstancesOutput(tc.nodeID), nil)
mockEC2.EXPECT().AttachVolume(gomock.Any()).Return(&ec2.VolumeAttachment{}, tc.expErr)
ctx := context.Background()
mockEC2.EXPECT().DescribeInstancesWithContext(gomock.Eq(ctx), gomock.Any()).Return(newDescribeInstancesOutput(tc.nodeID), nil)
mockEC2.EXPECT().AttachVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.VolumeAttachment{}, tc.expErr)

devicePath, err := c.AttachDisk(tc.volumeID, tc.nodeID)
devicePath, err := c.AttachDisk(ctx, tc.volumeID, tc.nodeID)
if err != nil {
if tc.expErr == nil {
t.Fatalf("AttachDisk() failed: expected no error, got: %v", err)
Expand Down Expand Up @@ -228,10 +232,11 @@ func TestDetachDisk(t *testing.T) {
mockEC2 := mocks.NewMockEC2(mockCtrl)
c := newCloud(mockEC2)

mockEC2.EXPECT().DescribeInstances(gomock.Any()).Return(newDescribeInstancesOutput(tc.nodeID), nil)
mockEC2.EXPECT().DetachVolume(gomock.Any()).Return(&ec2.VolumeAttachment{}, tc.expErr)
ctx := context.Background()
mockEC2.EXPECT().DescribeInstancesWithContext(gomock.Eq(ctx), gomock.Any()).Return(newDescribeInstancesOutput(tc.nodeID), nil)
mockEC2.EXPECT().DetachVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.VolumeAttachment{}, tc.expErr)

err := c.DetachDisk(tc.volumeID, tc.nodeID)
err := c.DetachDisk(ctx, tc.volumeID, tc.nodeID)
if err != nil {
if tc.expErr == nil {
t.Fatalf("DetachDisk() failed: expected no error, got: %v", err)
Expand Down Expand Up @@ -277,9 +282,11 @@ func TestGetDiskByName(t *testing.T) {
VolumeId: aws.String(tc.volumeName),
Size: aws.Int64(util.BytesToGiB(tc.volumeCapacity)),
}
mockEC2.EXPECT().DescribeVolumes(gomock.Any()).Return(&ec2.DescribeVolumesOutput{Volumes: []*ec2.Volume{vol}}, tc.expErr)

disk, err := c.GetDiskByName(tc.volumeName, tc.volumeCapacity)
ctx := context.Background()
mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.DescribeVolumesOutput{Volumes: []*ec2.Volume{vol}}, tc.expErr)

disk, err := c.GetDiskByName(ctx, tc.volumeName, tc.volumeCapacity)
if err != nil {
if tc.expErr == nil {
t.Fatalf("GetDiskByName() failed: expected no error, got: %v", err)
Expand Down Expand Up @@ -321,7 +328,8 @@ func TestGetDiskByID(t *testing.T) {
mockEC2 := mocks.NewMockEC2(mockCtrl)
c := newCloud(mockEC2)

mockEC2.EXPECT().DescribeVolumes(gomock.Any()).Return(
ctx := context.Background()
mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Eq(ctx), gomock.Any()).Return(
&ec2.DescribeVolumesOutput{
Volumes: []*ec2.Volume{
{VolumeId: aws.String(tc.volumeID)},
Expand All @@ -330,7 +338,7 @@ func TestGetDiskByID(t *testing.T) {
tc.expErr,
)

disk, err := c.GetDiskByID(tc.volumeID)
disk, err := c.GetDiskByID(ctx, tc.volumeID)
if err != nil {
if tc.expErr == nil {
t.Fatalf("GetDisk() failed: expected no error, got: %v", err)
Expand Down
15 changes: 8 additions & 7 deletions pkg/cloud/fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package cloud

import (
"context"
"fmt"
"math/rand"
"time"
Expand Down Expand Up @@ -47,7 +48,7 @@ func (c *FakeCloudProvider) GetMetadata() MetadataService {
return c.m
}

func (c *FakeCloudProvider) CreateDisk(volumeName string, diskOptions *DiskOptions) (*Disk, error) {
func (c *FakeCloudProvider) CreateDisk(ctx context.Context, volumeName string, diskOptions *DiskOptions) (*Disk, error) {
r1 := rand.New(rand.NewSource(time.Now().UnixNano()))
d := &fakeDisk{
Disk: &Disk{
Expand All @@ -60,7 +61,7 @@ func (c *FakeCloudProvider) CreateDisk(volumeName string, diskOptions *DiskOptio
return d.Disk, nil
}

func (c *FakeCloudProvider) DeleteDisk(volumeID string) (bool, error) {
func (c *FakeCloudProvider) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
for volName, f := range c.disks {
if f.Disk.VolumeID == volumeID {
delete(c.disks, volName)
Expand All @@ -69,19 +70,19 @@ func (c *FakeCloudProvider) DeleteDisk(volumeID string) (bool, error) {
return true, nil
}

func (c *FakeCloudProvider) AttachDisk(volumeID, nodeID string) (string, error) {
func (c *FakeCloudProvider) AttachDisk(ctx context.Context, volumeID, nodeID string) (string, error) {
if _, ok := c.pub[volumeID]; ok {
return "", ErrAlreadyExists
}
c.pub[volumeID] = nodeID
return "/dev/xvdbc", nil
}

func (c *FakeCloudProvider) DetachDisk(volumeID, nodeID string) error {
func (c *FakeCloudProvider) DetachDisk(ctx context.Context, volumeID, nodeID string) error {
return nil
}

func (c *FakeCloudProvider) GetDiskByName(name string, capacityBytes int64) (*Disk, error) {
func (c *FakeCloudProvider) GetDiskByName(ctx context.Context, name string, capacityBytes int64) (*Disk, error) {
var disks []*fakeDisk
for _, d := range c.disks {
for key, value := range d.tags {
Expand All @@ -101,7 +102,7 @@ func (c *FakeCloudProvider) GetDiskByName(name string, capacityBytes int64) (*Di
return nil, nil
}

func (c *FakeCloudProvider) GetDiskByID(volumeID string) (*Disk, error) {
func (c *FakeCloudProvider) GetDiskByID(ctx context.Context, volumeID string) (*Disk, error) {
for _, f := range c.disks {
if f.Disk.VolumeID == volumeID {
return f.Disk, nil
Expand All @@ -110,7 +111,7 @@ func (c *FakeCloudProvider) GetDiskByID(volumeID string) (*Disk, error) {
return nil, ErrNotFound
}

func (c *FakeCloudProvider) IsExistInstance(nodeID string) bool {
func (c *FakeCloudProvider) IsExistInstance(ctx context.Context, nodeID string) bool {
if nodeID != c.m.GetInstanceID() {
return false
}
Expand Down
Loading

0 comments on commit 799bb24

Please sign in to comment.