Skip to content

Commit

Permalink
first step in cleaning up parallel runner for streams #1281
Browse files Browse the repository at this point in the history
introduced processor and subscriber interfaces based on the java reactive streams / spi
much better parallel exec: only one executor service is needed instead of two
no more flaky countdown-latch, using completable-future and try-catch blocks to avoid runner hang
todo: fix dynamic scenario to return proper lazy iterator, but now we have the foundation for that
todo: simplify the feature-execution-unit and scenario-execution-unit
since now the parallel-processor does more of the async orchestration
also todo: make runner-options --> feature-context --> hierarchy more sane
  • Loading branch information
ptrthomas committed Sep 12, 2020
1 parent 5203a3e commit 5347f02
Show file tree
Hide file tree
Showing 11 changed files with 301 additions and 136 deletions.
2 changes: 1 addition & 1 deletion examples/jobserver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<maven.compiler.version>3.6.0</maven.compiler.version>
<karate.version>0.9.6</karate.version>
<karate.version>2.0.0</karate.version>
</properties>

<dependencies>
Expand Down
110 changes: 68 additions & 42 deletions karate-core/src/main/java/com/intuit/karate/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import com.intuit.karate.core.HtmlFeatureReport;
import com.intuit.karate.core.HtmlReport;
import com.intuit.karate.core.HtmlSummaryReport;
import com.intuit.karate.core.ParallelProcessor;
import com.intuit.karate.core.ScenarioExecutionUnit;
import com.intuit.karate.core.Subscriber;
import com.intuit.karate.core.Tags;
import com.intuit.karate.job.JobConfig;
import com.intuit.karate.job.JobServer;
Expand All @@ -45,9 +47,10 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -293,53 +296,79 @@ public static Results parallel(Builder options) {
if (options.hooks != null) {
options.hooks.forEach(h -> h.beforeAll(results));
}
ExecutorService featureExecutor = Executors.newFixedThreadPool(threadCount, Executors.privilegedThreadFactory());
ExecutorService scenarioExecutor = Executors.newWorkStealingPool(threadCount);
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
ExecutorService featureExecutor = Executors.newWorkStealingPool(threadCount);
List<CompletableFuture> futures = new ArrayList();
CompletableFuture latch = new CompletableFuture();
Subscriber<CompletableFuture> subscriber = new Subscriber<CompletableFuture>() {
@Override
public void onNext(CompletableFuture result) {
futures.add(result);
}

@Override
public void onComplete() {
latch.complete(Boolean.TRUE);
}
};
List<Resource> resources = options.resolveResources();
try {
int count = resources.size();
CountDownLatch latch = new CountDownLatch(count);
List<FeatureResult> featureResults = new ArrayList(count);
for (int i = 0; i < count; i++) {
Resource resource = resources.get(i);
int index = i + 1;
Feature feature = FeatureParser.parse(resource);
feature.setCallName(options.scenarioName);
feature.setCallLine(resource.getLine());
FeatureContext featureContext = new FeatureContext(null, feature, options.tagSelector());
CallContext callContext = CallContext.forAsync(feature, options.hooks, options.hookFactory, null, false);
ExecutionContext execContext = new ExecutionContext(results, results.getStartTime(), featureContext, callContext, reportDir,
r -> featureExecutor.submit(r), scenarioExecutor, Thread.currentThread().getContextClassLoader());
featureResults.add(execContext.result);
if (jobServer != null) {
List<ScenarioExecutionUnit> units = feature.getScenarioExecutionUnits(execContext);
jobServer.addFeature(execContext, units, () -> {
onFeatureDone(results, execContext, reportDir, index, count);
latch.countDown();
});
} else {
FeatureExecutionUnit unit = new FeatureExecutionUnit(execContext);
unit.setNext(() -> {
onFeatureDone(results, execContext, reportDir, index, count);
latch.countDown();
});
featureExecutor.submit(unit);
int count = resources.size();
List<FeatureResult> featureResults = new ArrayList(count);
ParallelProcessor<Resource, CompletableFuture> processor = new ParallelProcessor<Resource, CompletableFuture>(featureExecutor, resources.iterator()) {
int index = 0;

@Override
public Iterator<CompletableFuture> process(Resource resource) {
CompletableFuture future = new CompletableFuture();
try {
Feature feature = FeatureParser.parse(resource);
feature.setCallName(options.scenarioName);
feature.setCallLine(resource.getLine());
FeatureContext featureContext = new FeatureContext(null, feature, options.tagSelector());
CallContext callContext = CallContext.forAsync(feature, options.hooks, options.hookFactory, null, false);
ExecutionContext execContext = new ExecutionContext(results, results.getStartTime(), featureContext, callContext, reportDir,
r -> featureExecutor.submit(r), featureExecutor, classLoader);
featureResults.add(execContext.result);
if (jobServer != null) {
List<ScenarioExecutionUnit> units = feature.getScenarioExecutionUnits(execContext);
jobServer.addFeature(execContext, units, () -> {
onFeatureDone(results, execContext, reportDir, ++index, count);
future.complete(Boolean.TRUE);
});
} else {
FeatureExecutionUnit unit = new FeatureExecutionUnit(execContext);
unit.setNext(() -> {
onFeatureDone(results, execContext, reportDir, ++index, count);
future.complete(Boolean.TRUE);
});
unit.run();
}
} catch (Exception e) {
future.complete(Boolean.FALSE);
LOGGER.error("runner failed: {}", e.getMessage());
results.setFailureReason(e);
}
return Collections.singletonList(future).iterator();
}

};
try {
if (jobServer != null) {
jobServer.startExecutors();
}
LOGGER.info("waiting for parallel features to complete ...");
processor.subscribe(subscriber);
latch.join();
CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]);
CompletableFuture allFutures = CompletableFuture.allOf(futuresArray);
LOGGER.info("waiting for {} parallel features to complete ...", futuresArray.length);
if (options.timeoutMinutes > 0) {
latch.await(options.timeoutMinutes, TimeUnit.MINUTES);
if (latch.getCount() > 0) {
LOGGER.warn("parallel execution timed out after {} minutes, features remaining: {}",
options.timeoutMinutes, latch.getCount());
}
allFutures.get(options.timeoutMinutes, TimeUnit.MINUTES);
} else {
latch.await();
allFutures.join();
}
LOGGER.info("all features complete");
results.stopTimer();
featureExecutor.shutdownNow();
HtmlSummaryReport summary = new HtmlSummaryReport();
for (FeatureResult result : featureResults) {
int scenarioCount = result.getScenarioCount();
Expand Down Expand Up @@ -367,11 +396,8 @@ public static Results parallel(Builder options) {
options.hooks.forEach(h -> h.afterAll(results));
}
} catch (Exception e) {
LOGGER.error("karate parallel runner failed: ", e.getMessage());
LOGGER.error("runner failed: {}", e);
results.setFailureReason(e);
} finally {
featureExecutor.shutdownNow();
scenarioExecutor.shutdownNow();
}
return results;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
package com.intuit.karate.core;

import com.intuit.karate.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.Iterator;
import org.slf4j.LoggerFactory;

/**
Expand All @@ -41,17 +39,15 @@ public class FeatureExecutionUnit implements Runnable {
public final ExecutionContext exec;
private final boolean parallelScenarios;

private CountDownLatch latch;
private List<ScenarioExecutionUnit> units;
private List<ScenarioResult> results;
private Iterator<ScenarioExecutionUnit> units;
private Runnable next;

public FeatureExecutionUnit(ExecutionContext exec) {
this.exec = exec;
parallelScenarios = exec.scenarioExecutor != null;
}

public List<ScenarioExecutionUnit> getScenarioExecutionUnits() {
public Iterator<ScenarioExecutionUnit> getScenarioExecutionUnits() {
return units;
}

Expand All @@ -67,16 +63,13 @@ public void init() {
hookResult = false;
}
if (hookResult == false) {
units = Collections.EMPTY_LIST;
units = Collections.emptyIterator();
}
}
}
if (units == null) { // no hook failed
units = exec.featureContext.feature.getScenarioExecutionUnits(exec);
units = exec.featureContext.feature.getScenarioExecutionUnits(exec).iterator();
}
int count = units.size();
results = new ArrayList(count);
latch = new CountDownLatch(count);
}

public void setNext(Runnable next) {
Expand All @@ -90,30 +83,49 @@ public void run() {
if (units == null) {
init();
}
for (ScenarioExecutionUnit unit : units) {
if (isSelected(unit) && run(unit)) {
// unit.next should count down latch when done
} else { // un-selected / failed scenario
latch.countDown();
Subscriber<ScenarioResult> subscriber = new Subscriber<ScenarioResult>() {
@Override
public void onNext(ScenarioResult result) {
exec.result.addResult(result);
}
}
try {
latch.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
stop();
if (next != null) {
next.run();
}

@Override
public void onComplete() {
stop();
if (next != null) {
next.run();
}
}
};
ParallelProcessor<ScenarioExecutionUnit, ScenarioResult> processor = new ParallelProcessor<ScenarioExecutionUnit, ScenarioResult>(exec.scenarioExecutor, units) {
@Override
public Iterator<ScenarioResult> process(ScenarioExecutionUnit unit) {
if (isSelected(unit) && !unit.result.isFailed()) { // can happen for dynamic scenario outlines with a failed background !
unit.run();
// we also hold a reference to the last scenario-context that executed
// for cases where the caller needs a result
lastContextExecuted = unit.getContext();
return Collections.singletonList(unit.result).iterator();
} else {
return Collections.emptyIterator();
}
}

@Override
public boolean runSync(ScenarioExecutionUnit unit) {
if (!parallelScenarios) {
return true;
}
Tags tags = unit.scenario.getTagsEffective();
return tags.valuesFor("parallel").isAnyOf("false");
}

};
processor.subscribe(subscriber);
}

// extracted for junit 5
public void stop() {
// this is where the feature gets "populated" with stats
// but best of all, the original order is retained
for (ScenarioResult sr : results) {
exec.result.addResult(sr);
}
if (lastContextExecuted != null) {
// set result map that caller will see
exec.result.setResultVars(lastContextExecuted.vars);
Expand Down Expand Up @@ -175,28 +187,4 @@ public static boolean isSelected(FeatureContext fc, Scenario scenario, Logger lo
return false;
}

public boolean run(ScenarioExecutionUnit unit) {
// this is an elegant solution to retaining the order of scenarios
// in the final report - even if they run in parallel !
results.add(unit.result);
if (unit.result.isFailed()) { // can happen for dynamic scenario outlines with a failed background !
return false;
}
Tags tags = unit.scenario.getTagsEffective();
unit.setNext(() -> {
latch.countDown();
// we also hold a reference to the last scenario-context that executed
// for cases where the caller needs a result
lastContextExecuted = unit.getContext(); // IMPORTANT: will handle if actions is null
});
boolean sequential = !parallelScenarios || tags.valuesFor("parallel").isAnyOf("false");
// main
if (sequential) {
unit.run();
} else {
exec.scenarioExecutor.submit(unit);
}
return true;
}

}
Loading

0 comments on commit 5347f02

Please sign in to comment.