Skip to content

Commit

Permalink
- parallel StartMojo that respects dependencies and starts all contai…
Browse files Browse the repository at this point in the history
…ners in parallel as soon as all dependencies are resolved.
  • Loading branch information
danielwegener committed Jul 31, 2016
1 parent 083322c commit 2fa5811
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 22 deletions.
136 changes: 115 additions & 21 deletions src/main/java/io/fabric8/maven/docker/StartMojo.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
*/

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.*;
import java.util.concurrent.*;
import java.util.regex.Pattern;

import com.google.common.util.concurrent.MoreExecutors;
import io.fabric8.maven.docker.access.DockerAccess;
import io.fabric8.maven.docker.access.DockerAccessException;
import io.fabric8.maven.docker.access.PortMapping;
Expand Down Expand Up @@ -53,6 +53,9 @@ public class StartMojo extends AbstractDockerMojo {
@Parameter(property = "docker.skip.run", defaultValue = "false")
private boolean skipRun;

@Parameter(property = "docker.startParallel", defaultValue = "false")
private boolean startParallel;

// whether to block during to start. Set it via System property docker.follow
private boolean follow;

Expand Down Expand Up @@ -82,6 +85,16 @@ public class StartMojo extends AbstractDockerMojo {
@Parameter
protected String portPropertyFile;

private static final class StartedContainerImage {
public final ImageConfiguration imageConfig;
public final String containerId;

private StartedContainerImage(ImageConfiguration imageConfig, String containerId) {
this.imageConfig = imageConfig;
this.containerId = containerId;
}
}

/**
* {@inheritDoc}
*/
Expand All @@ -93,18 +106,22 @@ public synchronized void executeInternal(final ServiceHub hub) throws DockerAcce
}
getPluginContext().put(CONTEXT_KEY_START_CALLED, true);

Properties projProperties = project.getProperties();
final Properties projProperties = project.getProperties();
this.follow = Boolean.valueOf(System.getProperty("docker.follow", "false"));

QueryService queryService = hub.getQueryService();
RunService runService = hub.getRunService();
final RunService runService = hub.getRunService();

LogDispatcher dispatcher = getLogDispatcher(hub);
final LogDispatcher dispatcher = getLogDispatcher(hub);
PortMapping.PropertyWriteHelper portMappingPropertyWriteHelper = new PortMapping.PropertyWriteHelper(portPropertyFile);

boolean success = false;
PomLabel pomLabel = getPomLabel();
final PomLabel pomLabel = getPomLabel();

try {

final Queue<ImageConfiguration> imagesWaitingToStart = new ArrayDeque<>();

for (StartOrderResolver.Resolvable resolvable : runService.getImagesConfigsInOrder(queryService, getResolvedImages())) {
final ImageConfiguration imageConfig = (ImageConfiguration) resolvable;

Expand All @@ -113,34 +130,111 @@ public synchronized void executeInternal(final ServiceHub hub) throws DockerAcce

String imageName = imageConfig.getName();
checkImageWithAutoPull(hub, imageName,
getConfiguredRegistry(imageConfig,pullRegistry),imageConfig.getBuildConfiguration() == null);
getConfiguredRegistry(imageConfig, pullRegistry), imageConfig.getBuildConfiguration() == null);

RunImageConfiguration runConfig = imageConfig.getRunConfiguration();
PortMapping portMapping = runService.getPortMapping(runConfig, projProperties);
NetworkConfig config = runConfig.getNetworkingConfig();
if (autoCreateCustomNetworks && config.isCustomNetwork()) {
runService.createCustomNetworkIfNotExistant(config.getCustomNetwork());
}
imagesWaitingToStart.add(imageConfig);
}

final Set<String> startedContainers = new HashSet<>();

final ExecutorService executorService;
if (startParallel) {
executorService = Executors.newCachedThreadPool();
} else {
executorService = MoreExecutors.newDirectExecutorService();
}
final ExecutorCompletionService<StartedContainerImage> startingContainers = new ExecutorCompletionService<>(executorService);

while (!imagesWaitingToStart.isEmpty()) {
final List<ImageConfiguration> startableImages = new ArrayList<>();

for (ImageConfiguration imageWaitingToStart : imagesWaitingToStart) {
if (startedContainers.containsAll(imageWaitingToStart.getDependencies())) {
startableImages.add(imageWaitingToStart);
}
}

for (final ImageConfiguration startableImage : startableImages) {

String containerId = runService.createAndStartContainer(imageConfig, portMapping, pomLabel, projProperties);
final RunImageConfiguration runConfig = startableImage.getRunConfiguration();
final PortMapping portMapping = runService.getPortMapping(runConfig, projProperties);

if (showLogs(imageConfig)) {
dispatcher.trackContainerLog(containerId,
serviceHubFactory.getLogOutputSpecFactory().createSpec(containerId, imageConfig));
startingContainers.submit(new Callable<StartedContainerImage>() {
@Override
public StartedContainerImage call() throws Exception {
final String containerId = runService.createAndStartContainer(startableImage, portMapping, pomLabel, projProperties);

if (showLogs(startableImage)) {
dispatcher.trackContainerLog(containerId,
serviceHubFactory.getLogOutputSpecFactory().createSpec(containerId, startableImage));
}

// Wait if requested
waitIfRequested(hub,startableImage, projProperties, containerId);
WaitConfiguration waitConfig = runConfig.getWaitConfiguration();
if (waitConfig != null && waitConfig.getExec() != null && waitConfig.getExec().getPostStart() != null) {
runService.execInContainer(containerId, waitConfig.getExec().getPostStart(), startableImage);
}

return new StartedContainerImage(startableImage, containerId);
}
});
imagesWaitingToStart.remove(startableImage);
}

portMappingPropertyWriteHelper.add(portMapping, runConfig.getPortPropertyFile());

// Wait if requested
waitIfRequested(hub,imageConfig, projProperties, containerId);
WaitConfiguration waitConfig = runConfig.getWaitConfiguration();
if (waitConfig != null && waitConfig.getExec() != null && waitConfig.getExec().getPostStart() != null) {
runService.execInContainer(containerId, waitConfig.getExec().getPostStart(), imageConfig);
final Future<StartedContainerImage> imageStartResult = startingContainers.take();
try {
final StartedContainerImage startedContainerImage = imageStartResult.get();
final String containerId = startedContainerImage.containerId;
final ImageConfiguration imageConfig = startedContainerImage.imageConfig;
final RunImageConfiguration runConfig = imageConfig.getRunConfiguration();
final PortMapping portMapping = runService.getPortMapping(runConfig, projProperties);


startedContainers.add(startedContainerImage.imageConfig.getAlias());

portMappingPropertyWriteHelper.add(portMapping, runConfig.getPortPropertyFile());
// Expose container info as properties
exposeContainerProps(hub.getQueryService(), containerId, imageConfig.getAlias());

} catch (ExecutionException e) {
try {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException)e.getCause();
} else if (e.getCause() instanceof IOException) {
throw (IOException)e.getCause();
} else if (e.getCause() instanceof InterruptedException) {
throw (InterruptedException)e.getCause();
} else {
throw new RuntimeException("Start-Job failed with unexpected exception: "+e.getCause().getMessage(), e.getCause());
}
} finally {
executorService.shutdown();
try {
executorService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException ie) {
log.warn("ExecutorService did not shutdown correctly. Enforcing shutdown now!");
executorService.shutdownNow();
}
}
}
}

// Expose container info as properties
exposeContainerProps(hub.getQueryService(), containerId,imageConfig.getAlias());
if (!executorService.isShutdown()) {
executorService.shutdown();
try {
executorService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.warn("ExecutorService did not shutdown normally.");
}
}

if (follow) {
runService.addShutdownHookForStoppingContainers(keepContainer, removeVolumes, autoCreateCustomNetworks);
wait();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public List<String> getDependencies() {
addVolumes(runConfig, ret);
addLinks(runConfig, ret);
addContainerNetwork(runConfig, ret);
addDependsOn(runConfig, ret);
}
return ret;
}
Expand Down Expand Up @@ -112,6 +113,15 @@ private void addContainerNetwork(RunImageConfiguration runConfig, List<String> r
}
}

private void addDependsOn(RunImageConfiguration runConfig, List<String> ret) {
// Only used in required networks.
if (runConfig.getDependsOn() != null && runConfig.getNetworkingConfig().isCustomNetwork()) {
for (String[] link : EnvUtil.splitOnLastColon(runConfig.getDependsOn())) {
ret.add(link[0]);
}
}
}

public boolean isDataImage() {
// If there is no explicit run configuration, its a data image
// TODO: Probably add an explicit property so that a user can indicated whether it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public class RunImageConfiguration {
@Parameter
private String domainname;

// container domain name
@Parameter
private List<String> dependsOn;

// container entry point
@Parameter
private Arguments entrypoint;
Expand Down Expand Up @@ -171,6 +175,10 @@ public String getDomainname() {
return domainname;
}

public List<String> getDependsOn() {
return dependsOn;
}

public String getUser() {
return user;
}
Expand Down Expand Up @@ -390,6 +398,11 @@ public Builder network(NetworkConfig networkConfig) {
return this;
}

public Builder dependsOn(List<String> dependsOn) {
config.dependsOn = dependsOn;
return this;
}

public Builder dns(List<String> dns) {
config.dns = dns;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public enum ConfigKey {
NOCACHE,
OPTIMISE,
CMD,
DEPENDS_ON,
DOMAINNAME,
DNS,
DNS_SEARCH,
Expand Down Expand Up @@ -134,4 +135,4 @@ public String asPropertyKey(String prefix) {
public String asPropertyKey() {
return DEFAULT_PREFIX + "." + key;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ private RunImageConfiguration extractRunConfiguration(String prefix, Properties
.securityOpts(listWithPrefix(prefix, SECURITY_OPTS, properties))
.cmd(withPrefix(prefix, CMD, properties))
.dns(listWithPrefix(prefix, DNS, properties))
.dependsOn(listWithPrefix(prefix, DEPENDS_ON, properties))
.net(withPrefix(prefix, NET, properties))
.network(extractNetworkConfig(prefix, properties))
.dnsSearch(listWithPrefix(prefix, DNS_SEARCH, properties))
Expand Down

0 comments on commit 2fa5811

Please sign in to comment.