From 29b1b8d6b89ff7cab60581dd413732b4732fd232 Mon Sep 17 00:00:00 2001 From: Alex Ciminian Date: Tue, 29 Jan 2013 15:02:47 +0200 Subject: [PATCH 1/6] added spot instances stubs --- .../amazon/activities/RunSpotInstances.java | 34 +++++++++++++++++++ .../activities/RunSpotInstancesLiveTest.java | 20 +++++++++++ 2 files changed, 54 insertions(+) create mode 100644 providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java create mode 100644 providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunSpotInstancesLiveTest.java diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java new file mode 100644 index 0000000..80d95ef --- /dev/null +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java @@ -0,0 +1,34 @@ +/* +* Copyright (c) 2012 S.C. Axemblr Software Solutions S.R.L +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package com.axemblr.provisionr.amazon.activities; + +import com.amazonaws.services.ec2.AmazonEC2; +import com.axemblr.provisionr.amazon.core.ProviderClientCache; +import com.axemblr.provisionr.api.pool.Pool; +import org.activiti.engine.delegate.DelegateExecution; + + +public class RunSpotInstances extends AmazonActivity { + + public RunSpotInstances(ProviderClientCache cache) { + super(cache); + } + + @Override + public void execute(AmazonEC2 client, Pool pool, DelegateExecution execution) throws Exception { + //To change body of implemented methods use File | Settings | File Templates. + } +} diff --git a/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunSpotInstancesLiveTest.java b/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunSpotInstancesLiveTest.java new file mode 100644 index 0000000..0684d86 --- /dev/null +++ b/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunSpotInstancesLiveTest.java @@ -0,0 +1,20 @@ +/* +* Copyright (c) 2012 S.C. Axemblr Software Solutions S.R.L +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package com.axemblr.provisionr.amazon.activities; + +public class RunSpotInstancesLiveTest extends AmazonActivityLiveTest { +} From fae8220ef8d6d46e8d262f05e6e75b097a84b1f6 Mon Sep 17 00:00:00 2001 From: Alex Ciminian Date: Tue, 29 Jan 2013 16:57:45 +0200 Subject: [PATCH 2/6] creating a spot instance request --- .../amazon/activities/RunInstances.java | 118 ++++++++++++++++++ .../activities/RunOnDemandInstances.java | 80 ++---------- .../amazon/activities/RunSpotInstances.java | 31 ++++- 3 files changed, 155 insertions(+), 74 deletions(-) create mode 100644 providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunInstances.java diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunInstances.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunInstances.java new file mode 100644 index 0000000..f8154a3 --- /dev/null +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunInstances.java @@ -0,0 +1,118 @@ +package com.axemblr.provisionr.amazon.activities; + +import java.io.IOException; +import java.util.Arrays; + +import net.schmizz.sshj.common.Base64; + +import org.activiti.engine.delegate.DelegateExecution; +import org.activiti.engine.delegate.VariableScope; + +import com.amazonaws.AmazonWebServiceRequest; +import com.amazonaws.services.ec2.model.LaunchSpecification; +import com.amazonaws.services.ec2.model.RequestSpotInstancesRequest; +import com.amazonaws.services.ec2.model.RunInstancesRequest; +import com.axemblr.provisionr.amazon.ProcessVariables; +import com.axemblr.provisionr.amazon.core.ImageTable; +import com.axemblr.provisionr.amazon.core.ImageTableQuery; +import com.axemblr.provisionr.amazon.core.KeyPairs; +import com.axemblr.provisionr.amazon.core.ProviderClientCache; +import com.axemblr.provisionr.amazon.core.SecurityGroups; +import com.axemblr.provisionr.amazon.options.ProviderOptions; +import com.axemblr.provisionr.amazon.options.SoftwareOptions; +import com.axemblr.provisionr.api.pool.Pool; +import com.axemblr.provisionr.api.provider.Provider; +import com.google.common.base.Charsets; +import com.google.common.base.Throwables; +import com.google.common.io.Resources; + +public abstract class RunInstances extends AmazonActivity { + + public static final String DEFAULT_ARCH = "amd64"; + public static final String DEFAULT_TYPE = "instance-store"; + + + protected RunInstances(ProviderClientCache providerClientCache) { + super(providerClientCache); + } + + protected RunInstancesRequest createOnDemandInstancesRequest(Pool pool, DelegateExecution execution) + throws IOException { + return (RunInstancesRequest) createRequest(pool, execution, false); + } + + protected RequestSpotInstancesRequest createSpotInstancesRequest(Pool pool, DelegateExecution execution) + throws IOException { + return (RequestSpotInstancesRequest) createRequest(pool, execution, true); + } + + private AmazonWebServiceRequest createRequest(Pool pool, DelegateExecution execution, boolean spot) + throws IOException { + final String businessKey = execution.getProcessBusinessKey(); + + final String securityGroupName = SecurityGroups.formatNameFromBusinessKey(businessKey); + final String keyPairName = KeyPairs.formatNameFromBusinessKey(businessKey); + + final String instanceType = pool.getHardware().getType(); + final String imageId = getImageIdFromProcessVariablesOrQueryImageTable( + execution, pool.getProvider(), instanceType); + + final String userData = Resources.toString(Resources.getResource(RunInstances.class, + "/com/axemblr/provisionr/amazon/userdata.sh"), Charsets.UTF_8); + + if (spot) { + LaunchSpecification ls = new LaunchSpecification(); + ls.setInstanceType(instanceType); + ls.setKeyName(keyPairName); + ls.setImageId(imageId); + ls.setSecurityGroups(Arrays.asList(new String[] {securityGroupName})); + ls.setUserData(Base64.encodeBytes(userData.getBytes(Charsets.UTF_8))); + return new RequestSpotInstancesRequest() + .withLaunchSpecification(ls) + .withInstanceCount(pool.getExpectedSize()); + } else { + return new RunInstancesRequest() + .withClientToken(businessKey) + .withSecurityGroups(securityGroupName) + .withKeyName(keyPairName) + .withInstanceType(instanceType) + .withImageId(imageId) + .withMinCount(pool.getMinSize()) + .withMaxCount(pool.getExpectedSize()) + .withUserData(Base64.encodeBytes(userData.getBytes(Charsets.UTF_8))); + } + } + + private String getImageIdFromProcessVariablesOrQueryImageTable( + VariableScope execution, Provider provider, String instanceType + ) { + final String imageId = (String) execution.getVariable(ProcessVariables.CACHED_IMAGE_ID); + if (imageId != null) { + return imageId; + } + + ImageTable imageTable; + try { + imageTable = ImageTable.fromCsvResource("/com/axemblr/provisionr/amazon/ubuntu.csv"); + } catch (IOException e) { + throw Throwables.propagate(e); + } + + final String region = provider.getOptionOr(ProviderOptions.REGION, ProviderOptions.DEFAULT_REGION); + final String version = provider.getOptionOr(SoftwareOptions.BASE_OPERATING_SYSTEM_VERSION, + SoftwareOptions.DEFAULT_BASE_OPERATING_SYSTEM_VERSION); + + ImageTableQuery query = imageTable.query() + .filterBy("region", region) + .filterBy("version", version) + .filterBy("arch", DEFAULT_ARCH); + + if (instanceType.equals("t1.micro")) { + query.filterBy("type", "ebs"); + } else { + query.filterBy("type", DEFAULT_TYPE); + } + + return query.singleResult(); + } +} diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunOnDemandInstances.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunOnDemandInstances.java index aec3931..6407b81 100644 --- a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunOnDemandInstances.java +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunOnDemandInstances.java @@ -16,34 +16,24 @@ package com.axemblr.provisionr.amazon.activities; +import java.io.IOException; +import java.util.List; + +import org.activiti.engine.delegate.DelegateExecution; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.amazonaws.services.ec2.AmazonEC2; import com.amazonaws.services.ec2.model.Instance; import com.amazonaws.services.ec2.model.RunInstancesRequest; import com.amazonaws.services.ec2.model.RunInstancesResult; import com.axemblr.provisionr.amazon.ProcessVariables; -import com.axemblr.provisionr.amazon.core.ImageTable; -import com.axemblr.provisionr.amazon.core.ImageTableQuery; -import com.axemblr.provisionr.amazon.core.KeyPairs; import com.axemblr.provisionr.amazon.core.ProviderClientCache; -import com.axemblr.provisionr.amazon.core.SecurityGroups; -import com.axemblr.provisionr.amazon.options.ProviderOptions; -import com.axemblr.provisionr.amazon.options.SoftwareOptions; import com.axemblr.provisionr.api.pool.Pool; -import com.axemblr.provisionr.api.provider.Provider; -import com.google.common.base.Charsets; import com.google.common.base.Function; -import com.google.common.base.Throwables; import com.google.common.collect.Lists; -import com.google.common.io.Resources; -import java.io.IOException; -import java.util.List; -import net.schmizz.sshj.common.Base64; -import org.activiti.engine.delegate.DelegateExecution; -import org.activiti.engine.delegate.VariableScope; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class RunOnDemandInstances extends AmazonActivity { +public class RunOnDemandInstances extends RunInstances { private static final Logger LOG = LoggerFactory.getLogger(RunOnDemandInstances.class); @@ -56,28 +46,8 @@ public RunOnDemandInstances(ProviderClientCache cache) { @Override public void execute(AmazonEC2 client, Pool pool, DelegateExecution execution) throws IOException { - final String businessKey = execution.getProcessBusinessKey(); - - final String securityGroupName = SecurityGroups.formatNameFromBusinessKey(businessKey); - final String keyPairName = KeyPairs.formatNameFromBusinessKey(businessKey); - - final String instanceType = pool.getHardware().getType(); - final String imageId = getImageIdFromProcessVariablesOrQueryImageTable( - execution, pool.getProvider(), instanceType); - - final String userData = Resources.toString(Resources.getResource(RunOnDemandInstances.class, - "/com/axemblr/provisionr/amazon/userdata.sh"), Charsets.UTF_8); - - final RunInstancesRequest request = new RunInstancesRequest() - .withClientToken(businessKey) - .withSecurityGroups(securityGroupName) - .withKeyName(keyPairName) - .withInstanceType(instanceType) - .withImageId(imageId) - .withMinCount(pool.getMinSize()) - .withMaxCount(pool.getExpectedSize()) - .withUserData(Base64.encodeBytes(userData.getBytes(Charsets.UTF_8))); + final RunInstancesRequest request = createOnDemandInstancesRequest(pool, execution); // TODO allow for more options (e.g. monitoring & termination protection etc.) LOG.info(">> Sending RunInstances request: {}", request); @@ -92,38 +62,6 @@ public void execute(AmazonEC2 client, Pool pool, DelegateExecution execution) th collectInstanceIdsAsList(result.getReservation().getInstances())); } - private String getImageIdFromProcessVariablesOrQueryImageTable( - VariableScope execution, Provider provider, String instanceType - ) { - final String imageId = (String) execution.getVariable(ProcessVariables.CACHED_IMAGE_ID); - if (imageId != null) { - return imageId; - } - - ImageTable imageTable; - try { - imageTable = ImageTable.fromCsvResource("/com/axemblr/provisionr/amazon/ubuntu.csv"); - } catch (IOException e) { - throw Throwables.propagate(e); - } - - final String region = provider.getOptionOr(ProviderOptions.REGION, ProviderOptions.DEFAULT_REGION); - final String version = provider.getOptionOr(SoftwareOptions.BASE_OPERATING_SYSTEM_VERSION, - SoftwareOptions.DEFAULT_BASE_OPERATING_SYSTEM_VERSION); - - ImageTableQuery query = imageTable.query() - .filterBy("region", region) - .filterBy("version", version) - .filterBy("arch", DEFAULT_ARCH); - - if (instanceType.equals("t1.micro")) { - query.filterBy("type", "ebs"); - } else { - query.filterBy("type", DEFAULT_TYPE); - } - - return query.singleResult(); - } private List collectInstanceIdsAsList(List instances) { /* Make a copy as an ArrayList to force lazy collection evaluation */ diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java index 80d95ef..59bfb9b 100644 --- a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java @@ -15,13 +15,22 @@ */ package com.axemblr.provisionr.amazon.activities; +import java.util.List; + +import org.activiti.engine.delegate.DelegateExecution; + import com.amazonaws.services.ec2.AmazonEC2; +import com.amazonaws.services.ec2.model.RequestSpotInstancesRequest; +import com.amazonaws.services.ec2.model.RequestSpotInstancesResult; +import com.amazonaws.services.ec2.model.SpotInstanceRequest; +import com.axemblr.provisionr.amazon.ProcessVariables; import com.axemblr.provisionr.amazon.core.ProviderClientCache; import com.axemblr.provisionr.api.pool.Pool; -import org.activiti.engine.delegate.DelegateExecution; +import com.google.common.base.Function; +import com.google.common.collect.Lists; -public class RunSpotInstances extends AmazonActivity { +public class RunSpotInstances extends RunInstances { public RunSpotInstances(ProviderClientCache cache) { super(cache); @@ -29,6 +38,22 @@ public RunSpotInstances(ProviderClientCache cache) { @Override public void execute(AmazonEC2 client, Pool pool, DelegateExecution execution) throws Exception { - //To change body of implemented methods use File | Settings | File Templates. + final RequestSpotInstancesRequest request = createSpotInstancesRequest(pool, execution); + + RequestSpotInstancesResult requestResult = client.requestSpotInstances(request); + + execution.setVariable(ProcessVariables.INSTANCE_IDS, + collectSpotInstanceRequestIds(requestResult.getSpotInstanceRequests())); + } + + private List collectSpotInstanceRequestIds(List requestResponses) { + /* Make a copy as an ArrayList to force lazy collection evaluation */ + return Lists.newArrayList(Lists.transform(requestResponses, + new Function() { + @Override + public String apply(SpotInstanceRequest instanceRequest) { + return instanceRequest.getSpotInstanceRequestId(); + } + })); } } From 6fba680728af894b4021b1ee76fb7c63b05ca862 Mon Sep 17 00:00:00 2001 From: Alex Ciminian Date: Tue, 29 Jan 2013 17:24:05 +0200 Subject: [PATCH 3/6] checking if spot instances are open or not --- .../amazon/activities/RunSpotInstances.java | 59 ++++++++++++++++++- 1 file changed, 57 insertions(+), 2 deletions(-) diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java index 59bfb9b..e901beb 100644 --- a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java @@ -15,11 +15,15 @@ */ package com.axemblr.provisionr.amazon.activities; +import java.util.ArrayList; import java.util.List; import org.activiti.engine.delegate.DelegateExecution; +import com.amazonaws.AmazonServiceException; import com.amazonaws.services.ec2.AmazonEC2; +import com.amazonaws.services.ec2.model.DescribeSpotInstanceRequestsRequest; +import com.amazonaws.services.ec2.model.DescribeSpotInstanceRequestsResult; import com.amazonaws.services.ec2.model.RequestSpotInstancesRequest; import com.amazonaws.services.ec2.model.RequestSpotInstancesResult; import com.amazonaws.services.ec2.model.SpotInstanceRequest; @@ -42,8 +46,59 @@ public void execute(AmazonEC2 client, Pool pool, DelegateExecution execution) th RequestSpotInstancesResult requestResult = client.requestSpotInstances(request); - execution.setVariable(ProcessVariables.INSTANCE_IDS, - collectSpotInstanceRequestIds(requestResult.getSpotInstanceRequests())); + List spotInstanceRequestIds = + collectSpotInstanceRequestIds(requestResult.getSpotInstanceRequests()); + + // Create a variable that will track whether there are any + // requests still in the open state. + boolean anyOpen; + List instanceIds = new ArrayList(); + do { + // Create the describeRequest object with all of the request ids + // to monitor (e.g. that we started). + DescribeSpotInstanceRequestsRequest describeRequest = new DescribeSpotInstanceRequestsRequest(); + describeRequest.setSpotInstanceRequestIds(spotInstanceRequestIds); + + // Initialize the anyOpen variable to false - which assumes there + // are no requests open unless we find one that is still open. + anyOpen = false; + + try { + // Retrieve all of the requests we want to monitor. + DescribeSpotInstanceRequestsResult describeResult = client.describeSpotInstanceRequests(describeRequest); + List describeResponses = describeResult.getSpotInstanceRequests(); + + // Look through each request and determine if they are all in + // the active state. + for (SpotInstanceRequest describeResponse : describeResponses) { + if (describeResponse.getState().equals("open")) { + anyOpen = true; + break; + } + // Add the instance id to the list we will + // eventually terminate. + if (describeResponse.getState().equals("active")) { + instanceIds.add(describeResponse.getInstanceId()); + } + } + } catch (AmazonServiceException e) { + // If we have an exception, ensure we don't break out of + // the loop. This prevents the scenario where there was + // blip on the wire. + anyOpen = true; + } + + // TODO: check that this timeout is ok + try { + // Sleep for 60 seconds. + Thread.sleep(60*1000); + } catch (Exception e) { + // Do nothing because it woke up early. + } + } while (anyOpen); + + + execution.setVariable(ProcessVariables.INSTANCE_IDS, instanceIds); } private List collectSpotInstanceRequestIds(List requestResponses) { From 073465c82c49a80a82a5e5fa2525e59eb349e840 Mon Sep 17 00:00:00 2001 From: Alex Ciminian Date: Tue, 29 Jan 2013 15:02:47 +0200 Subject: [PATCH 4/6] Initial draft for spot instance requests from the Amazon tutorial. --- .../activities/RunInstancesLiveTest.java | 66 +++++++++++++++++ .../RunOnDemandInstancesLiveTest.java | 74 ++----------------- .../activities/RunSpotInstancesLiveTest.java | 25 ++++++- 3 files changed, 98 insertions(+), 67 deletions(-) create mode 100644 providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunInstancesLiveTest.java diff --git a/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunInstancesLiveTest.java b/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunInstancesLiveTest.java new file mode 100644 index 0000000..c194f2b --- /dev/null +++ b/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunInstancesLiveTest.java @@ -0,0 +1,66 @@ +package com.axemblr.provisionr.amazon.activities; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.activiti.engine.delegate.DelegateExecution; + +import com.axemblr.provisionr.api.access.AdminAccess; +import com.axemblr.provisionr.api.hardware.Hardware; +import com.axemblr.provisionr.api.network.Network; +import com.axemblr.provisionr.api.network.Rule; +import com.axemblr.provisionr.api.pool.Pool; +import com.axemblr.provisionr.core.CoreProcessVariables; + +public abstract class RunInstancesLiveTest extends AmazonActivityLiveTest { + + protected DelegateExecution execution; + protected Pool pool; + + @Override + @SuppressWarnings("unchecked") + public void setUp() throws Exception { + super.setUp(); + + execution = mock(DelegateExecution.class); + pool = mock(Pool.class); + + final AdminAccess adminAccess = AdminAccess.builder() + .username("admin") + .publicKey(getResourceAsString("keys/test.pub")) + .privateKey(getResourceAsString("keys/test")) + .createAdminAccess(); + + final Network network = Network.builder().addRules( + Rule.builder().anySource().tcp().port(22).createRule()).createNetwork(); + + final Hardware hardware = Hardware.builder().type("t1.micro").createHardware(); + + when(pool.getProvider()).thenReturn(provider); + when(pool.getAdminAccess()).thenReturn(adminAccess); + when(pool.getNetwork()).thenReturn(network); + + when(pool.getMinSize()).thenReturn(1); + when(pool.getExpectedSize()).thenReturn(1); + + when(pool.getHardware()).thenReturn(hardware); + + when(execution.getProcessBusinessKey()).thenReturn(BUSINESS_KEY); + when(execution.getVariable(CoreProcessVariables.POOL)).thenReturn(pool); + + executeActivitiesInSequence(execution, + EnsureKeyPairExists.class, + EnsureSecurityGroupExists.class + ); + } + + @Override + @SuppressWarnings("unchecked") + public void tearDown() throws Exception { + executeActivitiesInSequence(execution, + DeleteSecurityGroup.class, + DeleteKeyPair.class + ); + super.tearDown(); + } +} diff --git a/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunOnDemandInstancesLiveTest.java b/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunOnDemandInstancesLiveTest.java index 113bdca..e2a4123 100644 --- a/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunOnDemandInstancesLiveTest.java +++ b/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunOnDemandInstancesLiveTest.java @@ -16,81 +16,23 @@ package com.axemblr.provisionr.amazon.activities; -import com.amazonaws.services.ec2.model.DescribeInstancesRequest; -import com.amazonaws.services.ec2.model.DescribeInstancesResult; -import com.amazonaws.services.ec2.model.TerminateInstancesRequest; -import com.axemblr.provisionr.core.CoreProcessVariables; -import com.axemblr.provisionr.test.ProcessVariablesCollector; -import com.axemblr.provisionr.amazon.ProcessVariables; -import com.axemblr.provisionr.api.access.AdminAccess; -import com.axemblr.provisionr.api.hardware.Hardware; -import com.axemblr.provisionr.api.network.Network; -import com.axemblr.provisionr.api.network.Rule; -import com.axemblr.provisionr.api.pool.Pool; -import java.util.List; -import org.activiti.engine.delegate.DelegateExecution; import static org.fest.assertions.api.Assertions.assertThat; -import org.junit.Test; -import org.mockito.Matchers; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class RunOnDemandInstancesLiveTest extends AmazonActivityLiveTest { - - private DelegateExecution execution; - private Pool pool; - - @Override - @SuppressWarnings("unchecked") - public void setUp() throws Exception { - super.setUp(); - - execution = mock(DelegateExecution.class); - pool = mock(Pool.class); - - final AdminAccess adminAccess = AdminAccess.builder() - .username("admin") - .publicKey(getResourceAsString("keys/test.pub")) - .privateKey(getResourceAsString("keys/test")) - .createAdminAccess(); - - final Network network = Network.builder().addRules( - Rule.builder().anySource().tcp().port(22).createRule()).createNetwork(); - final Hardware hardware = Hardware.builder().type("t1.micro").createHardware(); - - when(pool.getProvider()).thenReturn(provider); - when(pool.getAdminAccess()).thenReturn(adminAccess); - when(pool.getNetwork()).thenReturn(network); - - when(pool.getMinSize()).thenReturn(1); - when(pool.getExpectedSize()).thenReturn(1); - - when(pool.getHardware()).thenReturn(hardware); +import java.util.List; - when(execution.getProcessBusinessKey()).thenReturn(BUSINESS_KEY); - when(execution.getVariable(CoreProcessVariables.POOL)).thenReturn(pool); +import org.junit.Test; - executeActivitiesInSequence(execution, - EnsureKeyPairExists.class, - EnsureSecurityGroupExists.class - ); - } +import com.amazonaws.services.ec2.model.DescribeInstancesRequest; +import com.amazonaws.services.ec2.model.DescribeInstancesResult; +import com.amazonaws.services.ec2.model.TerminateInstancesRequest; +import com.axemblr.provisionr.amazon.ProcessVariables; +import com.axemblr.provisionr.test.ProcessVariablesCollector; - @Override - @SuppressWarnings("unchecked") - public void tearDown() throws Exception { - executeActivitiesInSequence(execution, - DeleteSecurityGroup.class, - DeleteKeyPair.class - ); - super.tearDown(); - } +public class RunOnDemandInstancesLiveTest extends RunInstancesLiveTest { @Test public void testRunInstances() throws Exception { diff --git a/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunSpotInstancesLiveTest.java b/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunSpotInstancesLiveTest.java index 0684d86..52c9982 100644 --- a/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunSpotInstancesLiveTest.java +++ b/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunSpotInstancesLiveTest.java @@ -16,5 +16,28 @@ package com.axemblr.provisionr.amazon.activities; -public class RunSpotInstancesLiveTest extends AmazonActivityLiveTest { +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +public class RunSpotInstancesLiveTest extends RunInstancesLiveTest { + + @Override + public void setUp() throws Exception { + super.setUp(); + + // TODO: is adding the bid as an option for the pool ok? + Map options = new HashMap(); + options.put("bid", "0.03"); + when(pool.getOptions()).thenReturn(options); + } + + @Test + public void testRunSpotInstances() { + // TODO + } + } From 73c90a05feb812db56d6f9b431863232ee0ac691 Mon Sep 17 00:00:00 2001 From: Alex Ciminian Date: Thu, 31 Jan 2013 16:13:21 +0200 Subject: [PATCH 5/6] Moved the logic for spot requests in separate activities. Working process test for this case. --- .gitignore | 2 + .../commands/CreatePoolCommand.java | 39 +- .../amazon/AmazonProvisionrLiveTest.java | 19 +- .../provisionr/amazon/AmazonProvisionr.java | 4 + .../provisionr/amazon/ProcessVariables.java | 42 + .../AllInstancesMatchPredicate.java | 22 +- .../AllSpotRequestsMatchPredicate.java | 83 ++ .../amazon/activities/AmazonActivity.java | 20 + .../amazon/activities/CancelSpotRequests.java | 42 + .../activities/CheckAllRequestsAreActive.java | 35 + .../activities/CheckNoRequestsAreOpen.java | 26 + .../GetInstanceIdsFromSpotRequests.java | 44 + .../activities/PublishListOfMachines.java | 7 +- .../amazon/activities/RunInstances.java | 30 +- .../amazon/activities/RunSpotInstances.java | 95 +-- .../amazon/activities/TerminateInstances.java | 5 +- .../amazon/options/ProviderOptions.java | 3 + .../activiti/amazonPoolManagement.bpmn20.xml | 793 +++++++++++------- .../resources/OSGI-INF/blueprint/context.xml | 32 +- .../activities/RunSpotInstancesLiveTest.java | 96 ++- 20 files changed, 1056 insertions(+), 383 deletions(-) create mode 100644 providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AllSpotRequestsMatchPredicate.java create mode 100644 providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/CancelSpotRequests.java create mode 100644 providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/CheckAllRequestsAreActive.java create mode 100644 providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/CheckNoRequestsAreOpen.java create mode 100644 providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/GetInstanceIdsFromSpotRequests.java diff --git a/.gitignore b/.gitignore index 53989c5..b666d83 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,5 @@ target core/.xml felix-cache .metadata +test-support/.externalToolBuilders +test-support/maven-eclipse.xml diff --git a/karaf/commands/src/main/java/com/axemblr/provisionr/commands/CreatePoolCommand.java b/karaf/commands/src/main/java/com/axemblr/provisionr/commands/CreatePoolCommand.java index 05490c8..7b6c647 100644 --- a/karaf/commands/src/main/java/com/axemblr/provisionr/commands/CreatePoolCommand.java +++ b/karaf/commands/src/main/java/com/axemblr/provisionr/commands/CreatePoolCommand.java @@ -16,6 +16,18 @@ package com.axemblr.provisionr.commands; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.File; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; + +import org.apache.felix.gogo.commands.Command; +import org.apache.felix.gogo.commands.Option; +import org.apache.karaf.shell.console.OsgiCommandSupport; + import com.axemblr.provisionr.api.Provisionr; import com.axemblr.provisionr.api.access.AdminAccess; import com.axemblr.provisionr.api.hardware.Hardware; @@ -29,21 +41,12 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; import com.google.common.base.Optional; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.io.Files; -import java.io.File; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Set; -import org.apache.felix.gogo.commands.Command; -import org.apache.felix.gogo.commands.Option; -import org.apache.karaf.shell.console.OsgiCommandSupport; /** * A typical call looks like this: @@ -68,6 +71,10 @@ public class CreatePoolCommand extends OsgiCommandSupport { @Option(name = "-h", aliases = "--hardware-type", description = "Virtual machine hardware type") private String hardwareType = "t1.micro"; + + @Option(name = "-b", aliases = "--bid", description = "Bid for Amazon Spot Instance. If specified, requests" + + "spot instances, otherwise defaults to on demand instances.") + private Float spotBid; @Option(name = "--port", description = "Firewall port that need to be open for any TCP traffic " + "(multi-valued). SSH (22) is always open by default.", multiValued = true) @@ -109,7 +116,16 @@ Pool createPoolFromArgumentsAndServiceDefaults(Provisionr service) { final Optional defaultProvider = service.getDefaultProvider(); checkArgument(defaultProvider.isPresent(), String.format("please configure a default provider " + "by editing etc/com.axemblr.provisionr.%s.cfg", id)); + + Provider provider; + if (spotBid != null) { + provider = defaultProvider.get().toBuilder() + .option("spotBid", spotBid.toString()).createProvider(); + } else { + provider = defaultProvider.get(); + } + /* Always allow ICMP and ssh traffic by default */ final Network network = Network.builder().addRules( Rule.builder().anySource().icmp().createRule(), @@ -122,8 +138,9 @@ Pool createPoolFromArgumentsAndServiceDefaults(Provisionr service) { final Software software = Software.builder().packages(packages).createSoftware(); + final Pool pool = Pool.builder() - .provider(defaultProvider.get()) + .provider(provider) .hardware(hardware) .software(software) .network(network) @@ -132,7 +149,7 @@ Pool createPoolFromArgumentsAndServiceDefaults(Provisionr service) { .expectedSize(size) .cacheBaseImage(cacheBaseImage) .createPool(); - + if (template != null) { for (PoolTemplate candidate : templates) { if (candidate.getId().equalsIgnoreCase(template)) { diff --git a/providers/amazon-tests/src/test/java/com/axemblr/provisionr/amazon/AmazonProvisionrLiveTest.java b/providers/amazon-tests/src/test/java/com/axemblr/provisionr/amazon/AmazonProvisionrLiveTest.java index 16c87d5..e9cffb2 100644 --- a/providers/amazon-tests/src/test/java/com/axemblr/provisionr/amazon/AmazonProvisionrLiveTest.java +++ b/providers/amazon-tests/src/test/java/com/axemblr/provisionr/amazon/AmazonProvisionrLiveTest.java @@ -77,16 +77,31 @@ public Option[] configuration() throws Exception { } @Test - public void startProvisioningProcess() throws Exception { + public void startProvisioningProcessForOnDemandInstances() throws Exception { + startProvisioningProcess(null); + } + + @Test + public void startProvisioningProcessForSpotInstances() throws Exception { + startProvisioningProcess("0.04"); + } + + private void startProvisioningProcess(String spotBid) throws Exception { waitForProcessDeployment(AmazonProvisionr.MANAGEMENT_PROCESS_KEY); final Provisionr provisionr = getOsgiService(Provisionr.class, 5000); - final Provider provider = collectProviderCredentialsFromSystemProperties() + Provider provider = collectProviderCredentialsFromSystemProperties() .option(ProviderOptions.REGION, getProviderProperty( ProviderOptions.REGION, ProviderOptions.DEFAULT_REGION)) .createProvider(); + if (spotBid != null) { + provider = provider.toBuilder() + .option(ProviderOptions.SPOT_BID, spotBid) + .createProvider(); + } + final Network network = Network.builder().addRules( Rule.builder().anySource().icmp().createRule(), Rule.builder().anySource().port(22).protocol(Protocol.TCP).createRule() diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/AmazonProvisionr.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/AmazonProvisionr.java index 30b0d73..739314e 100644 --- a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/AmazonProvisionr.java +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/AmazonProvisionr.java @@ -17,6 +17,7 @@ package com.axemblr.provisionr.amazon; import com.axemblr.provisionr.amazon.config.DefaultProviderConfig; +import com.axemblr.provisionr.amazon.options.ProviderOptions; import com.axemblr.provisionr.api.pool.Machine; import com.axemblr.provisionr.api.pool.Pool; import com.axemblr.provisionr.api.provider.Provider; @@ -75,6 +76,9 @@ public String startPoolManagementProcess(String businessKey, Pool pool) { arguments.put(CoreProcessVariables.POOL, pool); arguments.put(CoreProcessVariables.PROVIDER, getId()); arguments.put(CoreProcessVariables.POOL_BUSINESS_KEY, businessKey); + + /* needed because the Activiti EL doesn't work as expected and properties can't be read from the pool. */ + arguments.put(ProcessVariables.SPOT_BID, pool.getProvider().getOption(ProviderOptions.SPOT_BID)); /* Authenticate as kermit to make the process visible in the Explorer UI */ processEngine.getIdentityService().setAuthenticatedUserId(CoreConstants.ACTIVITI_EXPLORER_DEFAULT_USER); diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/ProcessVariables.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/ProcessVariables.java index 29cdb6b..0afcb5c 100644 --- a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/ProcessVariables.java +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/ProcessVariables.java @@ -34,6 +34,48 @@ private ProcessVariables() { */ public static final String RESERVATION_ID = "reservationId"; + /** + * The amount the user is willing to pay for spot instances in the + * Amazon pool he's trying to start. If set, the request is for spot + * instances, if null the request is for on demand instances. + * + * @see com.axemblr.provisionr.amazon.activities.RunSpotInstances + */ + public static final String SPOT_BID = "spotBid"; + + /** + * Flag that gets set when the process attempts to send spot requests + * for the first time. Because the describe call is not consistent + * until a reasonable delay passes, this will be used to timeout the + * Activiti retries so that the requests are not resent if they were + * successful. + * + * @see com.axemblr.provisionr.amazon.activities.RunSpotInstances + */ + public static final String SPOT_REQUESTS_SENT = "spotRequestsSent"; + + /** + * List of request IDs as returned by Amazon for spot instances. These need to + * be followed up to get the actual instance IDs. + * + * @see com.axemblr.provisionr.amazon.activities.RunSpotInstances + */ + public static final String SPOT_INSTANCE_REQUEST_IDS = "spotInstanceRequestIds"; + + /** + * Have all spot instance requests been handled by Amazon? (none are pending) + * + * @see com.axemblr.provisionr.amazon.activities.CheckNoRequestsAreOpen + */ + public static final String NO_SPOT_INSTANCE_REQUESTS_OPEN = "noSpotInstanceRequestsOpen"; + + /** + * Are all spot instance requests in an active state? (none cancelled, none terminated) + * + * @see com.axemblr.provisionr.amazon.activities.CheckAllRequestsAreActive + */ + public static final String ALL_SPOT_INSTANCE_REQUESTS_ACTIVE = "allSpotInstanceRequestsActive"; + /** * List of instance IDs as returned by Amazon * diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AllInstancesMatchPredicate.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AllInstancesMatchPredicate.java index c0caa16..4dbadc2 100644 --- a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AllInstancesMatchPredicate.java +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AllInstancesMatchPredicate.java @@ -16,23 +16,29 @@ package com.axemblr.provisionr.amazon.activities; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.List; + +import org.activiti.engine.delegate.DelegateExecution; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.amazonaws.AmazonServiceException; import com.amazonaws.services.ec2.AmazonEC2; import com.amazonaws.services.ec2.model.DescribeInstancesRequest; import com.amazonaws.services.ec2.model.DescribeInstancesResult; import com.amazonaws.services.ec2.model.Instance; +import com.amazonaws.services.ec2.model.Reservation; +import com.amazonaws.services.ec2.model.SpotInstanceRequest; import com.axemblr.provisionr.amazon.ProcessVariables; import com.axemblr.provisionr.amazon.core.ProviderClientCache; import com.axemblr.provisionr.api.pool.Pool; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; +import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; -import java.util.List; -import org.activiti.engine.delegate.DelegateExecution; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; public abstract class AllInstancesMatchPredicate extends AmazonActivity { @@ -57,9 +63,9 @@ public void execute(AmazonEC2 client, Pool pool, DelegateExecution execution) th try { DescribeInstancesResult result = client.describeInstances(new DescribeInstancesRequest() .withInstanceIds(instanceIds)); - checkState(result.getReservations().size() == 1, "the instance ids are part of multiple reservations"); - List instances = result.getReservations().get(0).getInstances(); + List instances = collectInstancesFromReservations(result.getReservations()); + if (Iterables.all(instances, predicate)) { LOG.info(">> All {} instances match predicate {} ", instanceIds, predicate); execution.setVariable(resultVariable, true); diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AllSpotRequestsMatchPredicate.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AllSpotRequestsMatchPredicate.java new file mode 100644 index 0000000..531a5c1 --- /dev/null +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AllSpotRequestsMatchPredicate.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2012 S.C. Axemblr Software Solutions S.R.L + * Copyright (c) 2012 S.C. Axemblr Software Solutions S.R.L + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.axemblr.provisionr.amazon.activities; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.List; + +import org.activiti.engine.delegate.DelegateExecution; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.ec2.AmazonEC2; +import com.amazonaws.services.ec2.model.DescribeSpotInstanceRequestsRequest; +import com.amazonaws.services.ec2.model.DescribeSpotInstanceRequestsResult; +import com.amazonaws.services.ec2.model.SpotInstanceRequest; +import com.axemblr.provisionr.amazon.ProcessVariables; +import com.axemblr.provisionr.amazon.core.ProviderClientCache; +import com.axemblr.provisionr.api.pool.Pool; +import com.google.common.base.Predicate; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; + +public class AllSpotRequestsMatchPredicate extends AmazonActivity { + + private static final Logger LOG = LoggerFactory.getLogger(AllSpotRequestsMatchPredicate.class); + + protected final String resultVariable; + private final Predicate predicate; + + protected AllSpotRequestsMatchPredicate(ProviderClientCache cache, String resultVariable, + Predicate predicate) { + super(cache); + this.resultVariable = checkNotNull(resultVariable, "resultVariable is null"); + this.predicate = checkNotNull(predicate, "predicate is null"); + } + + @Override + public void execute(AmazonEC2 client, Pool pool, DelegateExecution execution) throws Exception { + + LOG.info(">> Checking if all spot requests match predicate {}", predicate); + + @SuppressWarnings("unchecked") + List requestIds = (List) execution.getVariable(ProcessVariables.SPOT_INSTANCE_REQUEST_IDS); + checkNotNull(requestIds, "process variable '{}' not found", ProcessVariables.SPOT_INSTANCE_REQUEST_IDS); + + DescribeSpotInstanceRequestsRequest describeRequest = new DescribeSpotInstanceRequestsRequest(); + describeRequest.setSpotInstanceRequestIds(requestIds); + + try { + // Retrieve all of the requests we want to monitor. + DescribeSpotInstanceRequestsResult describeResult = client.describeSpotInstanceRequests(describeRequest); + List requests = describeResult.getSpotInstanceRequests(); + + if (Iterables.all(requests, predicate)) { + LOG.info(">> All {} requests match predicate {} ", requests, predicate); + execution.setVariable(resultVariable, true); + } else { + LOG.info("<< Not all requests {} match predicate {}", requests, predicate); + execution.setVariable(resultVariable, false); + } + } catch (AmazonServiceException exception) { + // couldn't find relevant error codes, so we always propagate the exception + throw Throwables.propagate(exception); + } + } +} diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AmazonActivity.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AmazonActivity.java index 47ac0e3..11a3a30 100644 --- a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AmazonActivity.java +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AmazonActivity.java @@ -16,10 +16,18 @@ package com.axemblr.provisionr.amazon.activities; +import java.util.List; + import com.amazonaws.services.ec2.AmazonEC2; +import com.amazonaws.services.ec2.model.Instance; +import com.amazonaws.services.ec2.model.Reservation; import com.axemblr.provisionr.amazon.core.ProviderClientCache; import com.axemblr.provisionr.api.pool.Pool; import com.axemblr.provisionr.core.CoreProcessVariables; +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + import static com.google.common.base.Preconditions.checkNotNull; import org.activiti.engine.delegate.DelegateExecution; import org.activiti.engine.delegate.JavaDelegate; @@ -52,4 +60,16 @@ public void execute(DelegateExecution execution) throws Exception { execute(providerClientCache.getUnchecked(pool.getProvider()), pool, execution); } + + protected List collectInstancesFromReservations(List reservation) { + /* Make a copy as an ArrayList to force lazy collection evaluation */ + List> allInstances = Lists.newArrayList(Lists.transform(reservation, new Function>() { + @Override + public List apply(Reservation reservation) { + return reservation.getInstances(); + } + })); + + return Lists.newArrayList(Iterables.concat(allInstances)); + } } diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/CancelSpotRequests.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/CancelSpotRequests.java new file mode 100644 index 0000000..6361794 --- /dev/null +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/CancelSpotRequests.java @@ -0,0 +1,42 @@ +package com.axemblr.provisionr.amazon.activities; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.List; + +import org.activiti.engine.delegate.DelegateExecution; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.ec2.AmazonEC2; +import com.amazonaws.services.ec2.model.CancelSpotInstanceRequestsRequest; +import com.axemblr.provisionr.amazon.AmazonProvisionr; +import com.axemblr.provisionr.amazon.ProcessVariables; +import com.axemblr.provisionr.amazon.core.ProviderClientCache; +import com.axemblr.provisionr.api.pool.Pool; + +public class CancelSpotRequests extends AmazonActivity { + + public static final Logger LOG = LoggerFactory.getLogger(AmazonProvisionr.class); + + public CancelSpotRequests(ProviderClientCache providerClientCache) { + super(providerClientCache); + } + + @Override + public void execute(AmazonEC2 client, Pool pool, DelegateExecution execution) throws Exception { + @SuppressWarnings("unchecked") + List requests = (List) execution.getVariable(ProcessVariables.SPOT_INSTANCE_REQUEST_IDS); + checkNotNull(requests, "process variable '{}' not found", ProcessVariables.SPOT_INSTANCE_REQUEST_IDS); + try { + if (requests.size() > 0) { + client.cancelSpotInstanceRequests(new CancelSpotInstanceRequestsRequest() + .withSpotInstanceRequestIds(requests)); + } + } catch (AmazonServiceException exception) { + LOG.warn("There was an error cancelling your spot instance requests: {}", exception); + throw exception; + } + } +} diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/CheckAllRequestsAreActive.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/CheckAllRequestsAreActive.java new file mode 100644 index 0000000..22fe199 --- /dev/null +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/CheckAllRequestsAreActive.java @@ -0,0 +1,35 @@ +package com.axemblr.provisionr.amazon.activities; + +import org.activiti.engine.delegate.DelegateExecution; + +import com.amazonaws.services.ec2.AmazonEC2; +import com.amazonaws.services.ec2.model.SpotInstanceRequest; +import com.axemblr.provisionr.amazon.ProcessVariables; +import com.axemblr.provisionr.amazon.core.ProviderClientCache; +import com.axemblr.provisionr.api.pool.Pool; +import com.google.common.base.Predicate; + +public class CheckAllRequestsAreActive extends AllSpotRequestsMatchPredicate { + + public static class RequestIsActive implements Predicate { + + @Override + public boolean apply(SpotInstanceRequest request) { + return "active".equalsIgnoreCase(request.getState()); + } + + @Override + public String toString() { + return "RequestIsActive{}"; + } + } + + public CheckAllRequestsAreActive(ProviderClientCache cache) { + super(cache, ProcessVariables.ALL_SPOT_INSTANCE_REQUESTS_ACTIVE, new RequestIsActive()); + } + + @Override + public void execute(AmazonEC2 client, Pool pool, DelegateExecution execution) throws Exception { + super.execute(client, pool, execution); + } +} diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/CheckNoRequestsAreOpen.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/CheckNoRequestsAreOpen.java new file mode 100644 index 0000000..4641e1f --- /dev/null +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/CheckNoRequestsAreOpen.java @@ -0,0 +1,26 @@ +package com.axemblr.provisionr.amazon.activities; + +import com.amazonaws.services.ec2.model.SpotInstanceRequest; +import com.axemblr.provisionr.amazon.ProcessVariables; +import com.axemblr.provisionr.amazon.core.ProviderClientCache; +import com.google.common.base.Predicate; + +public class CheckNoRequestsAreOpen extends AllSpotRequestsMatchPredicate { + + public static class RequestIsNotOpen implements Predicate { + + @Override + public boolean apply(SpotInstanceRequest request) { + return !"open".equalsIgnoreCase(request.getState()); + } + + @Override + public String toString() { + return "RequestIsNotOpen{}"; + } + } + + public CheckNoRequestsAreOpen(ProviderClientCache cache) { + super(cache, ProcessVariables.NO_SPOT_INSTANCE_REQUESTS_OPEN, new RequestIsNotOpen()); + } +} diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/GetInstanceIdsFromSpotRequests.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/GetInstanceIdsFromSpotRequests.java new file mode 100644 index 0000000..64b613b --- /dev/null +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/GetInstanceIdsFromSpotRequests.java @@ -0,0 +1,44 @@ +package com.axemblr.provisionr.amazon.activities; + +import java.util.ArrayList; +import java.util.List; + +import org.activiti.engine.delegate.DelegateExecution; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazonaws.services.ec2.AmazonEC2; +import com.amazonaws.services.ec2.model.DescribeSpotInstanceRequestsRequest; +import com.amazonaws.services.ec2.model.DescribeSpotInstanceRequestsResult; +import com.amazonaws.services.ec2.model.SpotInstanceRequest; +import com.axemblr.provisionr.amazon.ProcessVariables; +import com.axemblr.provisionr.amazon.core.ProviderClientCache; +import com.axemblr.provisionr.api.pool.Pool; + +public class GetInstanceIdsFromSpotRequests extends AmazonActivity { + + private static final Logger LOG = LoggerFactory.getLogger(GetInstanceIdsFromSpotRequests.class); + + public GetInstanceIdsFromSpotRequests(ProviderClientCache providerClientCache) { + super(providerClientCache); + } + + @Override + public void execute(AmazonEC2 client, Pool pool, DelegateExecution execution) throws Exception { + LOG.info(">> retrieving instance Ids from spot request Ids"); + + @SuppressWarnings("unchecked") + List requestIds = + (List) execution.getVariable(ProcessVariables.SPOT_INSTANCE_REQUEST_IDS); + DescribeSpotInstanceRequestsResult result = client.describeSpotInstanceRequests( + new DescribeSpotInstanceRequestsRequest().withSpotInstanceRequestIds(requestIds)); + List instanceIds = new ArrayList(); + for (SpotInstanceRequest spotRequest : result.getSpotInstanceRequests()) { + if (spotRequest.getInstanceId() != null) { + instanceIds.add(spotRequest.getInstanceId()); + } + } + execution.setVariable(ProcessVariables.INSTANCE_IDS, instanceIds); + } + +} diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/PublishListOfMachines.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/PublishListOfMachines.java index fd5742b..30adafd 100644 --- a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/PublishListOfMachines.java +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/PublishListOfMachines.java @@ -59,12 +59,11 @@ public void execute(AmazonEC2 client, Pool pool, DelegateExecution execution) th LOG.info(">> Describing instances {}", instanceIds); DescribeInstancesResult result = client.describeInstances(new DescribeInstancesRequest() .withInstanceIds(instanceIds)); - checkArgument(result.getReservations().size() == 1, "found more than one reservation"); - Reservation reservation = result.getReservations().get(0); - LOG.info("<< Got one reservation with {} running instances", reservation.getInstances().size()); + LOG.info("<< Got the following reservations: {}", result.getReservations()); - List machines = Lists.transform(reservation.getInstances(), + List instances = collectInstancesFromReservations(result.getReservations()); + List machines = Lists.transform(instances, new Function() { @Override public Machine apply(Instance instance) { diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunInstances.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunInstances.java index f8154a3..34fed89 100644 --- a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunInstances.java +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunInstances.java @@ -1,7 +1,10 @@ package com.axemblr.provisionr.amazon.activities; +import static com.google.common.base.Preconditions.checkNotNull; + import java.io.IOException; import java.util.Arrays; +import java.util.Calendar; import net.schmizz.sshj.common.Base64; @@ -12,6 +15,7 @@ import com.amazonaws.services.ec2.model.LaunchSpecification; import com.amazonaws.services.ec2.model.RequestSpotInstancesRequest; import com.amazonaws.services.ec2.model.RunInstancesRequest; +import com.amazonaws.services.ec2.model.SpotInstanceType; import com.axemblr.provisionr.amazon.ProcessVariables; import com.axemblr.provisionr.amazon.core.ImageTable; import com.axemblr.provisionr.amazon.core.ImageTableQuery; @@ -61,15 +65,23 @@ private AmazonWebServiceRequest createRequest(Pool pool, DelegateExecution execu "/com/axemblr/provisionr/amazon/userdata.sh"), Charsets.UTF_8); if (spot) { - LaunchSpecification ls = new LaunchSpecification(); - ls.setInstanceType(instanceType); - ls.setKeyName(keyPairName); - ls.setImageId(imageId); - ls.setSecurityGroups(Arrays.asList(new String[] {securityGroupName})); - ls.setUserData(Base64.encodeBytes(userData.getBytes(Charsets.UTF_8))); - return new RequestSpotInstancesRequest() - .withLaunchSpecification(ls) - .withInstanceCount(pool.getExpectedSize()); + Calendar validUntil = Calendar.getInstance(); + validUntil.add(Calendar.MINUTE, 10); + final String spotPrice = checkNotNull(pool.getProvider().getOption(ProviderOptions.SPOT_BID), + "The bid for spot instances was not specified"); + LaunchSpecification ls = new LaunchSpecification() + .withInstanceType(instanceType) + .withKeyName(keyPairName) + .withImageId(imageId) + .withSecurityGroups(Arrays.asList(new String[] { securityGroupName })) + .withUserData(Base64.encodeBytes(userData.getBytes(Charsets.UTF_8))); + return new RequestSpotInstancesRequest() + .withSpotPrice(spotPrice) + .withLaunchSpecification(ls) + .withLaunchGroup(businessKey) + .withInstanceCount(pool.getExpectedSize()) + .withType(SpotInstanceType.OneTime) + .withValidUntil(validUntil.getTime()); } else { return new RunInstancesRequest() .withClientToken(businessKey) diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java index e901beb..14ae45e 100644 --- a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java @@ -15,15 +15,18 @@ */ package com.axemblr.provisionr.amazon.activities; -import java.util.ArrayList; +import static com.google.common.base.Preconditions.checkNotNull; + import java.util.List; import org.activiti.engine.delegate.DelegateExecution; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.amazonaws.AmazonServiceException; import com.amazonaws.services.ec2.AmazonEC2; import com.amazonaws.services.ec2.model.DescribeSpotInstanceRequestsRequest; import com.amazonaws.services.ec2.model.DescribeSpotInstanceRequestsResult; +import com.amazonaws.services.ec2.model.Filter; import com.amazonaws.services.ec2.model.RequestSpotInstancesRequest; import com.amazonaws.services.ec2.model.RequestSpotInstancesResult; import com.amazonaws.services.ec2.model.SpotInstanceRequest; @@ -31,74 +34,54 @@ import com.axemblr.provisionr.amazon.core.ProviderClientCache; import com.axemblr.provisionr.api.pool.Pool; import com.google.common.base.Function; +import com.google.common.base.Optional; import com.google.common.collect.Lists; public class RunSpotInstances extends RunInstances { + private static final Logger LOG = LoggerFactory.getLogger(RunSpotInstances.class); + public RunSpotInstances(ProviderClientCache cache) { super(cache); } @Override public void execute(AmazonEC2 client, Pool pool, DelegateExecution execution) throws Exception { - final RequestSpotInstancesRequest request = createSpotInstancesRequest(pool, execution); - - RequestSpotInstancesResult requestResult = client.requestSpotInstances(request); - - List spotInstanceRequestIds = - collectSpotInstanceRequestIds(requestResult.getSpotInstanceRequests()); - - // Create a variable that will track whether there are any - // requests still in the open state. - boolean anyOpen; - List instanceIds = new ArrayList(); - do { - // Create the describeRequest object with all of the request ids - // to monitor (e.g. that we started). - DescribeSpotInstanceRequestsRequest describeRequest = new DescribeSpotInstanceRequestsRequest(); - describeRequest.setSpotInstanceRequestIds(spotInstanceRequestIds); + /* before sending a new request, we check to see if we already registered + a launch group with the process ID, if yes, we don't re-send the request */ + final String businessKey = execution.getProcessBusinessKey(); + + /* we timeout if requests have already been sent - the activity is being retried. */ + Optional alreadySent = Optional.fromNullable( + execution.getVariable(ProcessVariables.SPOT_INSTANCE_REQUEST_IDS)); + if (alreadySent.isPresent()) { + try { + Thread.sleep(60 * 1000); + } catch (InterruptedException exception) { + LOG.warn("Timeout to make describe calls consistent was interrupted", exception); + } + } - // Initialize the anyOpen variable to false - which assumes there - // are no requests open unless we find one that is still open. - anyOpen = false; + DescribeSpotInstanceRequestsResult result = client.describeSpotInstanceRequests( + new DescribeSpotInstanceRequestsRequest() + .withFilters(new Filter() + .withName("launch-group").withValues(businessKey) + .withName("state").withValues("open", "active"))); + List pending = result.getSpotInstanceRequests(); + if (pending.size() > 0) { + LOG.info("Not resending spot instance requests {} for businessKey: {}.", pending, businessKey); + execution.setVariable(ProcessVariables.SPOT_INSTANCE_REQUEST_IDS, + collectSpotInstanceRequestIds(pending)); + return; + } - try { - // Retrieve all of the requests we want to monitor. - DescribeSpotInstanceRequestsResult describeResult = client.describeSpotInstanceRequests(describeRequest); - List describeResponses = describeResult.getSpotInstanceRequests(); + final RequestSpotInstancesRequest request = createSpotInstancesRequest(pool, execution); + execution.setVariable(ProcessVariables.SPOT_REQUESTS_SENT, true); + RequestSpotInstancesResult requestResult = client.requestSpotInstances(request); + List spotInstanceRequestIds = collectSpotInstanceRequestIds(requestResult.getSpotInstanceRequests()); - // Look through each request and determine if they are all in - // the active state. - for (SpotInstanceRequest describeResponse : describeResponses) { - if (describeResponse.getState().equals("open")) { - anyOpen = true; - break; - } - // Add the instance id to the list we will - // eventually terminate. - if (describeResponse.getState().equals("active")) { - instanceIds.add(describeResponse.getInstanceId()); - } - } - } catch (AmazonServiceException e) { - // If we have an exception, ensure we don't break out of - // the loop. This prevents the scenario where there was - // blip on the wire. - anyOpen = true; - } - - // TODO: check that this timeout is ok - try { - // Sleep for 60 seconds. - Thread.sleep(60*1000); - } catch (Exception e) { - // Do nothing because it woke up early. - } - } while (anyOpen); - - - execution.setVariable(ProcessVariables.INSTANCE_IDS, instanceIds); + execution.setVariable(ProcessVariables.SPOT_INSTANCE_REQUEST_IDS, spotInstanceRequestIds); } private List collectSpotInstanceRequestIds(List requestResponses) { diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/TerminateInstances.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/TerminateInstances.java index 458acf9..3f6b13e 100644 --- a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/TerminateInstances.java +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/TerminateInstances.java @@ -22,7 +22,6 @@ import com.axemblr.provisionr.amazon.core.ProviderClientCache; import com.axemblr.provisionr.api.pool.Pool; import static com.google.common.base.Preconditions.checkNotNull; -import java.util.Arrays; import java.util.List; import org.activiti.engine.delegate.DelegateExecution; import org.slf4j.Logger; @@ -46,6 +45,8 @@ public void execute(AmazonEC2 client, Pool pool, DelegateExecution execution) { checkNotNull(instanceIds, "process variable '{}' not found", ProcessVariables.INSTANCE_IDS); LOG.info(">> Terminating instances: {}", instanceIds); - client.terminateInstances(new TerminateInstancesRequest().withInstanceIds(instanceIds)); + if (instanceIds.size() > 0) { + client.terminateInstances(new TerminateInstancesRequest().withInstanceIds(instanceIds)); + } } } diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/options/ProviderOptions.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/options/ProviderOptions.java index b4e5892..613a8f9 100644 --- a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/options/ProviderOptions.java +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/options/ProviderOptions.java @@ -16,6 +16,8 @@ package com.axemblr.provisionr.amazon.options; +import com.google.common.base.Optional; + public class ProviderOptions { private ProviderOptions() { @@ -24,5 +26,6 @@ private ProviderOptions() { public static final String REGION = "region"; public static final String DEFAULT_REGION = "us-east-1"; + public static final String SPOT_BID = "spotBid"; } diff --git a/providers/amazon/src/main/resources/OSGI-INF/activiti/amazonPoolManagement.bpmn20.xml b/providers/amazon/src/main/resources/OSGI-INF/activiti/amazonPoolManagement.bpmn20.xml index 4b8efad..06dc3d9 100644 --- a/providers/amazon/src/main/resources/OSGI-INF/activiti/amazonPoolManagement.bpmn20.xml +++ b/providers/amazon/src/main/resources/OSGI-INF/activiti/amazonPoolManagement.bpmn20.xml @@ -2,7 +2,8 @@ - + + @@ -22,7 +23,7 @@ - + @@ -35,14 +36,14 @@ - - + + - + - + PT10M @@ -53,24 +54,24 @@ - + - + - + - + - + @@ -83,14 +84,14 @@ - - - + + + - + PT15M @@ -98,45 +99,44 @@ - - - - - - - - + + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + - + @@ -149,428 +149,657 @@ - + + + + + + + + + + + + + + PT15S + + + + + + + + + + + + + + PT10M + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + 10 minutes timeout (PT10M) static - + Check all instances are running by using batch API operations - + Activated by terminatePoolEvent signal (external) - + Activated by terminatePoolOnTimeoutEvent signal thrown by boundary timers - + Throws terminatePoolOnTimeoutEvent signal - + Throws terminatePoolOnTimeoutEvent signal - - - - - - + + + + + - + Wait to 5 seconds and check again - + - + TODO: extend loop to monitor machines + + + + Check that all requests have been acknowledged by Amazon. + + + + + 10 minutes timeout - + + + Throws terminatePoolOnSpotRequestError signal + + + + Throws terminatePoolOnSpotRequestError signal + + + + Activated by the terminatePoolOnSpotRequestError signalo newline at end of file diff --git a/providers/amazon/src/main/resources/OSGI-INF/blueprint/context.xml b/providers/amazon/src/main/resources/OSGI-INF/blueprint/context.xml index b456d58..66d1ebd 100644 --- a/providers/amazon/src/main/resources/OSGI-INF/blueprint/context.xml +++ b/providers/amazon/src/main/resources/OSGI-INF/blueprint/context.xml @@ -80,7 +80,7 @@ - + @@ -99,6 +99,36 @@ + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunSpotInstancesLiveTest.java b/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunSpotInstancesLiveTest.java index 52c9982..b072688 100644 --- a/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunSpotInstancesLiveTest.java +++ b/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunSpotInstancesLiveTest.java @@ -16,28 +16,108 @@ package com.axemblr.provisionr.amazon.activities; +import static org.fest.assertions.api.Assertions.assertThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazonaws.services.ec2.model.DescribeSpotInstanceRequestsRequest; +import com.amazonaws.services.ec2.model.DescribeSpotInstanceRequestsResult; +import com.amazonaws.services.ec2.model.Filter; +import com.axemblr.provisionr.amazon.ProcessVariables; +import com.axemblr.provisionr.amazon.options.ProviderOptions; +import com.axemblr.provisionr.test.ProcessVariablesCollector; public class RunSpotInstancesLiveTest extends RunInstancesLiveTest { + private static final Logger LOG = LoggerFactory.getLogger(RunSpotInstancesLiveTest.class); + + /** + * This should be set a bit higher than the on demand instance + * price to avoid the situation in which the test fails because + * the spot bid is too low. + */ + public static String AMAZON_SPOT_BID = "0.04"; + + /** + * The timeout is needed because the describe calls don't + * immediately return all the spot requests, we need to wait a while. + */ + private static int TIMEOUT = 60 * 1000; + @Override public void setUp() throws Exception { - super.setUp(); - - // TODO: is adding the bid as an option for the pool ok? - Map options = new HashMap(); - options.put("bid", "0.03"); - when(pool.getOptions()).thenReturn(options); + super.setUp(); + + final String region = getProviderProperty(ProviderOptions.REGION, ProviderOptions.DEFAULT_REGION); + provider = collectProviderCredentialsFromSystemProperties() + .option(ProviderOptions.REGION, region) + .option(ProviderOptions.SPOT_BID, AMAZON_SPOT_BID) + .createProvider(); + when(pool.getProvider()).thenReturn(provider); } @Test - public void testRunSpotInstances() { - // TODO + public void testRunSpotInstances() throws Exception { + ProcessVariablesCollector collector = new ProcessVariablesCollector(); + collector.install(execution); + + activity.execute(execution); + + @SuppressWarnings("unchecked") + ArgumentCaptor> argument = (ArgumentCaptor>) + (Object) ArgumentCaptor.forClass(List.class); + verify(execution).setVariable(eq(ProcessVariables.SPOT_INSTANCE_REQUEST_IDS), argument.capture()); + when(execution.getVariable(ProcessVariables.SPOT_INSTANCE_REQUEST_IDS)).thenReturn(argument.getValue()); + timeout(TIMEOUT); + + // shouldn't do anything + activity.execute(execution); + + timeout(TIMEOUT); + + DescribeSpotInstanceRequestsResult result = client.describeSpotInstanceRequests( + new DescribeSpotInstanceRequestsRequest().withFilters(new Filter() + .withName("launch-group").withValues(BUSINESS_KEY))); + + assertThat(result.getSpotInstanceRequests()).hasSize(1); + + timeout(TIMEOUT); + } + + @SuppressWarnings("unchecked") + @Override + public void tearDown() throws Exception { + // cleanup any pending requests or instances + ArgumentCaptor> argument = (ArgumentCaptor>) + (Object) ArgumentCaptor.forClass(List.class); + + executeActivitiesInSequence(execution, + CancelSpotRequests.class, + GetInstanceIdsFromSpotRequests.class); + + verify(execution).setVariable(eq(ProcessVariables.INSTANCE_IDS), argument.capture()); + when(execution.getVariable(ProcessVariables.INSTANCE_IDS)).thenReturn(argument.getValue()); + + executeActivitiesInSequence(execution, TerminateInstances.class); + super.tearDown(); + } + + private void timeout(int ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException exception) { + LOG.info("Prematurely woken up"); + } } } From 77471692b827fe07aa5b077db76a22857466bc34 Mon Sep 17 00:00:00 2001 From: Alex Ciminian Date: Fri, 8 Feb 2013 14:57:05 +0200 Subject: [PATCH 6/6] Fixed review issues - added generic option for provider-specific properties with tests - fixed formatting and comments - better collection handling with Guava - used polling & stopwatches instead of just sleeping when waiting for spot requests --- .../commands/CreatePoolCommand.java | 66 ++++++++++++------- .../commands/CreatePoolCommandTest.java | 43 ++++++++++-- .../AllSpotRequestsMatchPredicate.java | 1 - .../amazon/activities/AmazonActivity.java | 6 +- .../amazon/activities/RunInstances.java | 61 ++++++++--------- .../amazon/activities/RunSpotInstances.java | 37 ++++++----- .../activities/RunSpotInstancesLiveTest.java | 62 +++++++---------- 7 files changed, 159 insertions(+), 117 deletions(-) diff --git a/karaf/commands/src/main/java/com/axemblr/provisionr/commands/CreatePoolCommand.java b/karaf/commands/src/main/java/com/axemblr/provisionr/commands/CreatePoolCommand.java index 7b6c647..516f1d8 100644 --- a/karaf/commands/src/main/java/com/axemblr/provisionr/commands/CreatePoolCommand.java +++ b/karaf/commands/src/main/java/com/axemblr/provisionr/commands/CreatePoolCommand.java @@ -19,15 +19,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import java.io.File; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Set; - -import org.apache.felix.gogo.commands.Command; -import org.apache.felix.gogo.commands.Option; -import org.apache.karaf.shell.console.OsgiCommandSupport; - import com.axemblr.provisionr.api.Provisionr; import com.axemblr.provisionr.api.access.AdminAccess; import com.axemblr.provisionr.api.hardware.Hardware; @@ -43,11 +34,23 @@ import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.io.Files; +import java.io.File; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; + +import org.apache.felix.gogo.commands.Command; +import org.apache.felix.gogo.commands.Option; +import org.apache.karaf.shell.console.OsgiCommandSupport; + /** * A typical call looks like this: *

@@ -71,10 +74,11 @@ public class CreatePoolCommand extends OsgiCommandSupport { @Option(name = "-h", aliases = "--hardware-type", description = "Virtual machine hardware type") private String hardwareType = "t1.micro"; - - @Option(name = "-b", aliases = "--bid", description = "Bid for Amazon Spot Instance. If specified, requests" + - "spot instances, otherwise defaults to on demand instances.") - private Float spotBid; + + @Option(name = "-o", aliases = "--provider-options", description = "Provider-specific options (multi-valued)." + + "Expects either the key=value format or just plain key. If value is not specified, defaults to 'true'." + + "Supported values: spotBid=x.xxx (Amazon).", multiValued = true) + private List providerOptions = Lists.newArrayList(); @Option(name = "--port", description = "Firewall port that need to be open for any TCP traffic " + "(multi-valued). SSH (22) is always open by default.", multiValued = true) @@ -116,16 +120,15 @@ Pool createPoolFromArgumentsAndServiceDefaults(Provisionr service) { final Optional defaultProvider = service.getDefaultProvider(); checkArgument(defaultProvider.isPresent(), String.format("please configure a default provider " + "by editing etc/com.axemblr.provisionr.%s.cfg", id)); - - Provider provider; - if (spotBid != null) { - provider = defaultProvider.get().toBuilder() - .option("spotBid", spotBid.toString()).createProvider(); - } else { - provider = defaultProvider.get(); - } - + /* append the provider options that were passed in and rebuild the default provider */ + // TODO: this currently does not support overriding default options, it will throw an exception + Map options = ImmutableMap.builder() + .putAll(defaultProvider.get().getOptions()) // default options + .putAll(parseProviderOptions(providerOptions)) // options added by the user + .build(); + Provider provider = defaultProvider.get().toBuilder().options(options).createProvider(); + /* Always allow ICMP and ssh traffic by default */ final Network network = Network.builder().addRules( Rule.builder().anySource().icmp().createRule(), @@ -138,7 +141,7 @@ Pool createPoolFromArgumentsAndServiceDefaults(Provisionr service) { final Software software = Software.builder().packages(packages).createSoftware(); - + final Pool pool = Pool.builder() .provider(provider) .hardware(hardware) @@ -149,7 +152,7 @@ Pool createPoolFromArgumentsAndServiceDefaults(Provisionr service) { .expectedSize(size) .cacheBaseImage(cacheBaseImage) .createPool(); - + if (template != null) { for (PoolTemplate candidate : templates) { if (candidate.getId().equalsIgnoreCase(template)) { @@ -162,6 +165,16 @@ Pool createPoolFromArgumentsAndServiceDefaults(Provisionr service) { return pool; } + private Map parseProviderOptions(List providerOptions) { + Map result = Maps.newHashMap(); + for (String option : providerOptions) { + String[] parts = option.split("="); + String value = parts.length > 1 ? parts[1] : "true"; + result.put(parts[0], value); + } + return result; + } + private Set formatPortsAsIngressRules() { ImmutableSet.Builder rules = ImmutableSet.builder(); for (int port : ports) { @@ -220,6 +233,11 @@ void setPackages(List packages) { this.packages = ImmutableList.copyOf(packages); } + @VisibleForTesting + void setProviderOptions(List providerOptions) { + this.providerOptions = ImmutableList.copyOf(providerOptions); + } + @VisibleForTesting void setCacheBaseImage(boolean cacheBaseImage) { this.cacheBaseImage = cacheBaseImage; diff --git a/karaf/commands/src/test/java/com/axemblr/provisionr/commands/CreatePoolCommandTest.java b/karaf/commands/src/test/java/com/axemblr/provisionr/commands/CreatePoolCommandTest.java index eeeae22..8cdc7d1 100644 --- a/karaf/commands/src/test/java/com/axemblr/provisionr/commands/CreatePoolCommandTest.java +++ b/karaf/commands/src/test/java/com/axemblr/provisionr/commands/CreatePoolCommandTest.java @@ -16,23 +16,31 @@ package com.axemblr.provisionr.commands; +import static org.fest.assertions.api.Assertions.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyMap; +import static org.mockito.Matchers.anyMapOf; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import com.axemblr.provisionr.api.Provisionr; import com.axemblr.provisionr.api.access.AdminAccess; import com.axemblr.provisionr.api.pool.Pool; import com.axemblr.provisionr.api.provider.Provider; +import com.axemblr.provisionr.api.provider.ProviderBuilder; import com.axemblr.provisionr.core.templates.JenkinsTemplate; import com.axemblr.provisionr.core.templates.PoolTemplate; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; + import org.apache.felix.service.command.CommandSession; -import static org.fest.assertions.api.Assertions.assertThat; import org.junit.Test; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; public class CreatePoolCommandTest { @@ -89,7 +97,13 @@ protected AdminAccess collectCurrentUserCredentialsForAdminAccess() { command.setTemplate(template.getId()); Provisionr service = mock(Provisionr.class); - when(service.getDefaultProvider()).thenReturn(Optional.of(mock(Provider.class))); + Provider provider = mock(Provider.class); + ProviderBuilder providerBuilder = mock(ProviderBuilder.class); + when(providerBuilder.options(anyMapOf(String.class, String.class))).thenReturn(providerBuilder); + when(providerBuilder.createProvider()).thenReturn(provider); + when(provider.toBuilder()).thenReturn(providerBuilder); + + when(service.getDefaultProvider()).thenReturn(Optional.of(provider)); Pool pool = command.createPoolFromArgumentsAndServiceDefaults(service); @@ -97,6 +111,25 @@ protected AdminAccess collectCurrentUserCredentialsForAdminAccess() { assertThat(pool.getSoftware().getPackages()).contains("jenkins").contains("git-core"); } + @Test + public void testProviderSpecificOptions() { + CreatePoolCommand command = new CreatePoolCommand(Collections.emptyList(), + Collections.emptyList()); + command.setId("service"); + command.setKey("key"); + command.setProviderOptions(Lists.newArrayList("spotBid=0.07")); + + Provisionr service = mock(Provisionr.class); + // TODO: consider refactoring this with an argument captor instead of + // using an actual object + Provider provider = new ProviderBuilder().id("id").endpoint("endpoint") + .accessKey("aKey").secretKey("sKey").createProvider(); + when(service.getDefaultProvider()).thenReturn(Optional.of(provider)); + + Pool pool = command.createPoolFromArgumentsAndServiceDefaults(service); + assertThat(pool.getProvider().getOption("spotBid")).isEqualTo("0.07"); + } + private Provisionr newProvisionrMockWithId(String id) { Provisionr service = mock(Provisionr.class); when(service.getId()).thenReturn(id); diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AllSpotRequestsMatchPredicate.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AllSpotRequestsMatchPredicate.java index 531a5c1..6588337 100644 --- a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AllSpotRequestsMatchPredicate.java +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AllSpotRequestsMatchPredicate.java @@ -1,5 +1,4 @@ /* - * Copyright (c) 2012 S.C. Axemblr Software Solutions S.R.L * Copyright (c) 2012 S.C. Axemblr Software Solutions S.R.L * * Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AmazonActivity.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AmazonActivity.java index 11a3a30..6d8d27a 100644 --- a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AmazonActivity.java +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/AmazonActivity.java @@ -60,15 +60,15 @@ public void execute(DelegateExecution execution) throws Exception { execute(providerClientCache.getUnchecked(pool.getProvider()), pool, execution); } - + protected List collectInstancesFromReservations(List reservation) { /* Make a copy as an ArrayList to force lazy collection evaluation */ - List> allInstances = Lists.newArrayList(Lists.transform(reservation, new Function>() { + List> allInstances = Lists.transform(reservation, new Function>() { @Override public List apply(Reservation reservation) { return reservation.getInstances(); } - })); + }); return Lists.newArrayList(Iterables.concat(allInstances)); } diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunInstances.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunInstances.java index 34fed89..ec5ac13 100644 --- a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunInstances.java +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunInstances.java @@ -28,30 +28,31 @@ import com.axemblr.provisionr.api.provider.Provider; import com.google.common.base.Charsets; import com.google.common.base.Throwables; +import com.google.common.collect.Lists; import com.google.common.io.Resources; public abstract class RunInstances extends AmazonActivity { - public static final String DEFAULT_ARCH = "amd64"; + public static final String DEFAULT_ARCH = "amd64"; public static final String DEFAULT_TYPE = "instance-store"; - - protected RunInstances(ProviderClientCache providerClientCache) { - super(providerClientCache); - } - - protected RunInstancesRequest createOnDemandInstancesRequest(Pool pool, DelegateExecution execution) - throws IOException { - return (RunInstancesRequest) createRequest(pool, execution, false); - } - - protected RequestSpotInstancesRequest createSpotInstancesRequest(Pool pool, DelegateExecution execution) - throws IOException { - return (RequestSpotInstancesRequest) createRequest(pool, execution, true); - } - - private AmazonWebServiceRequest createRequest(Pool pool, DelegateExecution execution, boolean spot) - throws IOException { + + protected RunInstances(ProviderClientCache providerClientCache) { + super(providerClientCache); + } + + protected RunInstancesRequest createOnDemandInstancesRequest(Pool pool, DelegateExecution execution) + throws IOException { + return (RunInstancesRequest) createRequest(pool, execution, false); + } + + protected RequestSpotInstancesRequest createSpotInstancesRequest(Pool pool, DelegateExecution execution) + throws IOException { + return (RequestSpotInstancesRequest) createRequest(pool, execution, true); + } + + private AmazonWebServiceRequest createRequest(Pool pool, DelegateExecution execution, boolean spot) + throws IOException { final String businessKey = execution.getProcessBusinessKey(); final String securityGroupName = SecurityGroups.formatNameFromBusinessKey(businessKey); @@ -73,7 +74,7 @@ private AmazonWebServiceRequest createRequest(Pool pool, DelegateExecution execu .withInstanceType(instanceType) .withKeyName(keyPairName) .withImageId(imageId) - .withSecurityGroups(Arrays.asList(new String[] { securityGroupName })) + .withSecurityGroups(Lists.newArrayList(securityGroupName)) .withUserData(Base64.encodeBytes(userData.getBytes(Charsets.UTF_8))); return new RequestSpotInstancesRequest() .withSpotPrice(spotPrice) @@ -83,18 +84,18 @@ private AmazonWebServiceRequest createRequest(Pool pool, DelegateExecution execu .withType(SpotInstanceType.OneTime) .withValidUntil(validUntil.getTime()); } else { - return new RunInstancesRequest() - .withClientToken(businessKey) - .withSecurityGroups(securityGroupName) - .withKeyName(keyPairName) - .withInstanceType(instanceType) - .withImageId(imageId) - .withMinCount(pool.getMinSize()) - .withMaxCount(pool.getExpectedSize()) - .withUserData(Base64.encodeBytes(userData.getBytes(Charsets.UTF_8))); + return new RunInstancesRequest() + .withClientToken(businessKey) + .withSecurityGroups(securityGroupName) + .withKeyName(keyPairName) + .withInstanceType(instanceType) + .withImageId(imageId) + .withMinCount(pool.getMinSize()) + .withMaxCount(pool.getExpectedSize()) + .withUserData(Base64.encodeBytes(userData.getBytes(Charsets.UTF_8))); } - } - + } + private String getImageIdFromProcessVariablesOrQueryImageTable( VariableScope execution, Provider provider, String instanceType ) { diff --git a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java index 14ae45e..16766f4 100644 --- a/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java +++ b/providers/amazon/src/main/java/com/axemblr/provisionr/amazon/activities/RunSpotInstances.java @@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import java.util.List; +import java.util.concurrent.TimeUnit; import org.activiti.engine.delegate.DelegateExecution; import org.slf4j.Logger; @@ -35,7 +36,9 @@ import com.axemblr.provisionr.api.pool.Pool; import com.google.common.base.Function; import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Uninterruptibles; public class RunSpotInstances extends RunInstances { @@ -55,27 +58,27 @@ public void execute(AmazonEC2 client, Pool pool, DelegateExecution execution) th /* we timeout if requests have already been sent - the activity is being retried. */ Optional alreadySent = Optional.fromNullable( execution.getVariable(ProcessVariables.SPOT_INSTANCE_REQUEST_IDS)); + if (alreadySent.isPresent()) { - try { - Thread.sleep(60 * 1000); - } catch (InterruptedException exception) { - LOG.warn("Timeout to make describe calls consistent was interrupted", exception); + DescribeSpotInstanceRequestsRequest describeRequest = new DescribeSpotInstanceRequestsRequest() + .withFilters(new Filter() + .withName("launch-group").withValues(businessKey) + .withName("state").withValues("open", "active")); + Stopwatch stopwatch = new Stopwatch().start(); + while (stopwatch.elapsedTime(TimeUnit.MINUTES) < 2) { + DescribeSpotInstanceRequestsResult result = client.describeSpotInstanceRequests(describeRequest); + List pending = result.getSpotInstanceRequests(); + if (pending.size() > 0) { + LOG.info("Not resending spot instance requests {} for businessKey: {}.", pending, businessKey); + execution.setVariable(ProcessVariables.SPOT_INSTANCE_REQUEST_IDS, + collectSpotInstanceRequestIds(pending)); + return; + } + LOG.info("The describe call has not returned anything yet, waiting 20s and retrying."); + Uninterruptibles.sleepUninterruptibly(20, TimeUnit.SECONDS); } } - DescribeSpotInstanceRequestsResult result = client.describeSpotInstanceRequests( - new DescribeSpotInstanceRequestsRequest() - .withFilters(new Filter() - .withName("launch-group").withValues(businessKey) - .withName("state").withValues("open", "active"))); - List pending = result.getSpotInstanceRequests(); - if (pending.size() > 0) { - LOG.info("Not resending spot instance requests {} for businessKey: {}.", pending, businessKey); - execution.setVariable(ProcessVariables.SPOT_INSTANCE_REQUEST_IDS, - collectSpotInstanceRequestIds(pending)); - return; - } - final RequestSpotInstancesRequest request = createSpotInstancesRequest(pool, execution); execution.setVariable(ProcessVariables.SPOT_REQUESTS_SENT, true); RequestSpotInstancesResult requestResult = client.requestSpotInstances(request); diff --git a/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunSpotInstancesLiveTest.java b/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunSpotInstancesLiveTest.java index b072688..63b41cc 100644 --- a/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunSpotInstancesLiveTest.java +++ b/providers/amazon/src/test/java/com/axemblr/provisionr/amazon/activities/RunSpotInstancesLiveTest.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -36,10 +37,9 @@ import com.axemblr.provisionr.amazon.ProcessVariables; import com.axemblr.provisionr.amazon.options.ProviderOptions; import com.axemblr.provisionr.test.ProcessVariablesCollector; +import com.google.common.util.concurrent.Uninterruptibles; public class RunSpotInstancesLiveTest extends RunInstancesLiveTest { - - private static final Logger LOG = LoggerFactory.getLogger(RunSpotInstancesLiveTest.class); /** * This should be set a bit higher than the on demand instance @@ -47,53 +47,49 @@ public class RunSpotInstancesLiveTest extends RunInstancesLiveTest> argument = (ArgumentCaptor>) (Object) ArgumentCaptor.forClass(List.class); verify(execution).setVariable(eq(ProcessVariables.SPOT_INSTANCE_REQUEST_IDS), argument.capture()); when(execution.getVariable(ProcessVariables.SPOT_INSTANCE_REQUEST_IDS)).thenReturn(argument.getValue()); - timeout(TIMEOUT); + /* The timeout is needed because the describe calls don't return immediately. */ + // TODO: see if we can eliminate this after adding the process variables conditions + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MINUTES); // shouldn't do anything activity.execute(execution); - - timeout(TIMEOUT); - + + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MINUTES); + DescribeSpotInstanceRequestsResult result = client.describeSpotInstanceRequests( new DescribeSpotInstanceRequestsRequest().withFilters(new Filter() .withName("launch-group").withValues(BUSINESS_KEY))); - + assertThat(result.getSpotInstanceRequests()).hasSize(1); - - timeout(TIMEOUT); - } - + /* we also need to sleep before the teardown */ + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MINUTES); + } + @SuppressWarnings("unchecked") @Override public void tearDown() throws Exception { @@ -101,7 +97,7 @@ public void tearDown() throws Exception { ArgumentCaptor> argument = (ArgumentCaptor>) (Object) ArgumentCaptor.forClass(List.class); - executeActivitiesInSequence(execution, + executeActivitiesInSequence(execution, CancelSpotRequests.class, GetInstanceIdsFromSpotRequests.class); @@ -111,13 +107,5 @@ public void tearDown() throws Exception { executeActivitiesInSequence(execution, TerminateInstances.class); super.tearDown(); } - - private void timeout(int ms) { - try { - Thread.sleep(ms); - } catch (InterruptedException exception) { - LOG.info("Prematurely woken up"); - } - } - + }