Skip to content

Commit

Permalink
Implement topology awareness support for dynamic provisioning
Browse files Browse the repository at this point in the history
Changes includes:
1. return topology in NodeGetInfo response eg. accessible_topology =
{"com.amazon.aws.csi.ebs/zone": "us-west-2a"}
2. consumes topology requirement in CreateVolume request and creates
volume using the given zone while favoring preferred topologies
3. pick a random zone when topology requirement is not provided
(this is the case of dynamic provisioning without delayed binding)
4. Add unit test for pickAvailabilityZone()
5. Add test case for CreateDisk with zone
  • Loading branch information
Cheng Pan committed Oct 6, 2018
1 parent c4fe1ea commit 04a7c34
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 33 deletions.
57 changes: 50 additions & 7 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package cloud
import (
"errors"
"fmt"
"math/rand"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
Expand Down Expand Up @@ -78,16 +80,26 @@ var (
ErrAlreadyExists = errors.New("Resource already exists")
)

func init() {
rand.Seed(time.Now().UnixNano())
}

// Disk represents a EBS volume
type Disk struct {
VolumeID string
CapacityGiB int64
VolumeID string
CapacityGiB int64
AvailabilityZone string
}

// DiskOptions represents parameters to create an EBS volume
type DiskOptions struct {
CapacityBytes int64
Tags map[string]string
VolumeType string
IOPSPerGB int64
// the availability zone to create volume in
// if nil a random zone will be used
AvailabilityZone *string
}

// EC2 abstracts aws.EC2 to facilitate its mocking.
Expand All @@ -98,6 +110,9 @@ type EC2 interface {
DetachVolume(input *ec2.DetachVolumeInput) (*ec2.VolumeAttachment, error)
AttachVolume(input *ec2.AttachVolumeInput) (*ec2.VolumeAttachment, error)
DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error)

// Get all the zones for current region
DescribeAvailabilityZones(input *ec2.DescribeAvailabilityZonesInput) (*ec2.DescribeAvailabilityZonesOutput, error)
}

type Cloud interface {
Expand Down Expand Up @@ -156,8 +171,10 @@ func (c *cloud) GetMetadata() MetadataService {
}

func (c *cloud) CreateDisk(volumeName string, diskOptions *DiskOptions) (*Disk, error) {
var createType string
var iops int64
var (
createType string
iops int64
)
capacityGiB := util.BytesToGiB(diskOptions.CapacityBytes)

switch diskOptions.VolumeType {
Expand Down Expand Up @@ -187,9 +204,21 @@ func (c *cloud) CreateDisk(volumeName string, diskOptions *DiskOptions) (*Disk,
Tags: tags,
}

m := c.GetMetadata()
var (
zone string
err error
)
if diskOptions.AvailabilityZone == nil {
zone, err = c.pickRandomAvailabilityZone()
if err != nil {
return nil, err
}
} else {
zone = *diskOptions.AvailabilityZone
}

request := &ec2.CreateVolumeInput{
AvailabilityZone: aws.String(m.GetAvailabilityZone()),
AvailabilityZone: aws.String(zone),
Size: aws.Int64(capacityGiB),
VolumeType: aws.String(createType),
TagSpecifications: []*ec2.TagSpecification{&tagSpec},
Expand All @@ -213,7 +242,7 @@ func (c *cloud) CreateDisk(volumeName string, diskOptions *DiskOptions) (*Disk,
return nil, fmt.Errorf("disk size was not returned by CreateVolume")
}

return &Disk{CapacityGiB: size, VolumeID: volumeID}, nil
return &Disk{CapacityGiB: size, VolumeID: volumeID, AvailabilityZone: zone}, nil
}

func (c *cloud) DeleteDisk(volumeID string) (bool, error) {
Expand Down Expand Up @@ -433,3 +462,17 @@ func (c *cloud) getInstance(nodeID string) (*ec2.Instance, error) {

return instances[0], nil
}

func (c *cloud) pickRandomAvailabilityZone() (string, error) {
output, err := c.ec2.DescribeAvailabilityZones(&ec2.DescribeAvailabilityZonesInput{})
if err != nil {
return "", err
}

var zones []string
for _, zone := range output.AvailabilityZones {
zones = append(zones, *zone.ZoneName)
}

return zones[rand.Int()%len(zones)], nil
}
46 changes: 42 additions & 4 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,23 @@ func TestCreateDisk(t *testing.T) {
name: "success: normal",
volumeName: "vol-test-name",
diskOptions: &DiskOptions{
CapacityBytes: util.GiBToBytes(1),
Tags: map[string]string{VolumeNameTagKey: "vol-test"},
CapacityBytes: util.GiBToBytes(1),
Tags: map[string]string{VolumeNameTagKey: "vol-test"},
AvailabilityZone: nil,
},
expDisk: &Disk{
VolumeID: "vol-test",
CapacityGiB: 1,
},
expErr: nil,
},
{
name: "success: normal with provided zone",
volumeName: "vol-test-name",
diskOptions: &DiskOptions{
CapacityBytes: util.GiBToBytes(1),
Tags: map[string]string{VolumeNameTagKey: "vol-test"},
AvailabilityZone: stringPtr("us-west-2"),
},
expDisk: &Disk{
VolumeID: "vol-test",
Expand All @@ -55,8 +70,9 @@ func TestCreateDisk(t *testing.T) {
name: "fail: CreateVolume returned an error",
volumeName: "vol-test-name-error",
diskOptions: &DiskOptions{
CapacityBytes: util.GiBToBytes(1),
Tags: map[string]string{VolumeNameTagKey: "vol-test"},
CapacityBytes: util.GiBToBytes(1),
Tags: map[string]string{VolumeNameTagKey: "vol-test"},
AvailabilityZone: nil,
},
expErr: fmt.Errorf("CreateVolume generic error"),
},
Expand All @@ -78,6 +94,24 @@ func TestCreateDisk(t *testing.T) {

mockEC2.EXPECT().CreateVolume(gomock.Any()).Return(vol, tc.expErr)

if tc.diskOptions.AvailabilityZone == nil {
describeAvailabilityZonesResp := &ec2.DescribeAvailabilityZonesOutput{
AvailabilityZones: []*ec2.AvailabilityZone{
&ec2.AvailabilityZone{
ZoneName: aws.String("us-west-2a"),
},
&ec2.AvailabilityZone{
ZoneName: aws.String("us-west-2b"),
},
&ec2.AvailabilityZone{
ZoneName: aws.String("us-west-2c"),
},
},
}

mockEC2.EXPECT().DescribeAvailabilityZones(gomock.Any()).Return(describeAvailabilityZonesResp, nil)
}

disk, err := c.CreateDisk(tc.volumeName, tc.diskOptions)
if err != nil {
if tc.expErr == nil {
Expand Down Expand Up @@ -369,3 +403,7 @@ func newDescribeInstancesOutput(nodeID string) *ec2.DescribeInstancesOutput {
}},
}
}

func stringPtr(str string) *string {
return &str
}
16 changes: 14 additions & 2 deletions pkg/cloud/mocks/mock_ec2.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions pkg/cloud/mocks/mock_ec2metadata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 49 additions & 16 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,23 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}
}

if disk == nil {
opts := &cloud.DiskOptions{
CapacityBytes: volSizeBytes,
Tags: map[string]string{cloud.VolumeNameTagKey: volName},
}
newDisk, err := d.cloud.CreateDisk(volName, opts)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not create volume %q: %v", volName, err)
}
disk = newDisk
// volume exists already
if disk != nil {
return newCreateVolumeResponse(disk), nil
}

return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Id: disk.VolumeID,
CapacityBytes: util.GiBToBytes(disk.CapacityGiB),
},
}, nil
// create a new volume
zone := pickAvailabilityZone(req.GetAccessibilityRequirements())
opts := &cloud.DiskOptions{
CapacityBytes: volSizeBytes,
AvailabilityZone: zone,
Tags: map[string]string{cloud.VolumeNameTagKey: volName},
}
disk, err = d.cloud.CreateDisk(volName, opts)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not create volume %q: %v", volName, err)
}
return newCreateVolumeResponse(disk), nil
}

func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
Expand Down Expand Up @@ -253,3 +252,37 @@ func (d *Driver) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequ
func (d *Driver) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

// pickAvailabilityZone selects 1 zone given topology requirement.
func pickAvailabilityZone(requirement *csi.TopologyRequirement) *string {
if requirement == nil {
return nil
}
for _, topology := range requirement.GetPreferred() {
zone, exists := topology.GetSegments()[topologyKey]
if exists {
return &zone
}
}
for _, topology := range requirement.GetRequisite() {
zone, exists := topology.GetSegments()[topologyKey]
if exists {
return &zone
}
}
return nil
}

func newCreateVolumeResponse(disk *cloud.Disk) *csi.CreateVolumeResponse {
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Id: disk.VolumeID,
CapacityBytes: util.GiBToBytes(disk.CapacityGiB),
AccessibleTopology: []*csi.Topology{
&csi.Topology{
Segments: map[string]string{topologyKey: disk.AvailabilityZone},
},
},
},
}
}
74 changes: 73 additions & 1 deletion pkg/driver/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestCreateVolume(t *testing.T) {
t.Fatalf("Could not get error status code from error: %v", srvErr)
}
if srvErr.Code() != tc.expErrCode {
t.Fatalf("Expected error code %d, got %d", tc.expErrCode, srvErr.Code())
t.Fatalf("Expected error code %d, got %d message %s", tc.expErrCode, srvErr.Code(), srvErr.Message())
}
continue
}
Expand Down Expand Up @@ -235,3 +235,75 @@ func TestDeleteVolume(t *testing.T) {
}
}
}

func TestPickAvailabilityZone(t *testing.T) {
expZone := "us-west-2b"
testCases := []struct {
name string
requirement *csi.TopologyRequirement
expZone *string
}{
{
name: "Pick from preferred",
requirement: &csi.TopologyRequirement{
Requisite: []*csi.Topology{
&csi.Topology{
Segments: map[string]string{topologyKey: expZone},
},
},
Preferred: []*csi.Topology{
&csi.Topology{
Segments: map[string]string{topologyKey: expZone},
},
},
},
expZone: stringPtr(expZone),
},
{
name: "Pick from requisite",
requirement: &csi.TopologyRequirement{
Requisite: []*csi.Topology{
&csi.Topology{
Segments: map[string]string{topologyKey: expZone},
},
},
},
expZone: stringPtr(expZone),
},
{
name: "Pick from empty topology",
requirement: &csi.TopologyRequirement{
Preferred: []*csi.Topology{&csi.Topology{}},
Requisite: []*csi.Topology{&csi.Topology{}},
},
expZone: nil,
},

{
name: "Topology Requirement is nil",
requirement: nil,
expZone: nil,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual := pickAvailabilityZone(tc.requirement)
if tc.expZone == nil {
if actual != nil {
t.Fatalf("Expected zone to be nil, got %v", actual)
}
} else {
if *actual != *tc.expZone {
t.Fatalf("Expected zone %v, got zone: %v", tc.expZone, actual)

}
}
})
}

}

func stringPtr(str string) *string {
return &str
}
1 change: 1 addition & 0 deletions pkg/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
const (
driverName = "com.amazon.aws.csi.ebs"
vendorVersion = "0.0.1" // FIXME
topologyKey = driverName + "/zone"
)

type Driver struct {
Expand Down
Loading

0 comments on commit 04a7c34

Please sign in to comment.