Skip to content

Commit

Permalink
Refactor runner (jestjs#3166)
Browse files Browse the repository at this point in the history
* Improve runJest by breaking up the promise chain into functions.

* Use async/await.

* Add a TestSequencer and “Test” type to clarify responsibilities and improve type interfaces.

* async/await in runTests.

* Minor cleanups

* Fix the test.

* Move `runCLI` into the cli folder.

* Updates to TestSequencer + test the code.
  • Loading branch information
cpojer authored and skovhus committed Apr 29, 2017
1 parent 6badc04 commit 6da58b6
Show file tree
Hide file tree
Showing 9 changed files with 532 additions and 346 deletions.
260 changes: 86 additions & 174 deletions packages/jest-cli/src/TestRunner.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import type {
SerializableError as TestError,
TestResult,
} from 'types/TestResult';
import type {Config, Path} from 'types/Config';
import type {Config} from 'types/Config';
import type {HasteContext, HasteFS} from 'types/HasteMap';
import type {RunnerContext} from 'types/Reporters';
import type {Test, Tests} from 'types/TestRunner';
import type BaseReporter from './reporters/BaseReporter';

const {formatExecError} = require('jest-message-util');
const fs = require('graceful-fs');
const getCacheFilePath = require('jest-haste-map').getCacheFilePath;

const DefaultReporter = require('./reporters/DefaultReporter');
const NotifyReporter = require('./reporters/NotifyReporter');
const SummaryReporter = require('./reporters/SummaryReporter');
Expand All @@ -33,9 +33,7 @@ const throat = require('throat');
const workerFarm = require('worker-farm');
const TestWatcher = require('./TestWatcher');

const FAIL = 0;
const SLOW_TEST_TIME = 3000;
const SUCCESS = 1;

class CancelRun extends Error {
constructor(message: ?string) {
Expand All @@ -49,9 +47,8 @@ type Options = {|
getTestSummary: () => string,
|};

type OnRunFailure = (path: string, err: TestError) => void;

type OnTestResult = (path: string, result: TestResult) => void;
type OnTestFailure = (test: Test, err: TestError) => void;
type OnTestSuccess = (test: Test, result: TestResult) => void;

const TEST_WORKER_PATH = require.resolve('./TestWorker');

Expand All @@ -61,7 +58,6 @@ class TestRunner {
_options: Options;
_startRun: () => *;
_dispatcher: ReporterDispatcher;
_testPerformanceCache: Object;

constructor(
hasteContext: HasteContext,
Expand All @@ -78,10 +74,6 @@ class TestRunner {
this._options = options;
this._startRun = startRun;
this._setupReporters();

// Map from testFilePath -> time it takes to run the test. Used to
// optimally schedule bigger test runs.
this._testPerformanceCache = {};
}

addReporter(reporter: BaseReporter) {
Expand All @@ -92,95 +84,36 @@ class TestRunner {
this._dispatcher.unregister(ReporterClass);
}

_getTestPerformanceCachePath() {
const config = this._config;
return getCacheFilePath(config.cacheDirectory, 'perf-cache-' + config.name);
}

_sortTests(testPaths: Array<string>) {
// When running more tests than we have workers available, sort the tests
// by size - big test files usually take longer to complete, so we run
// them first in an effort to minimize worker idle time at the end of a
// long test run.
//
// After a test run we store the time it took to run a test and on
// subsequent runs we use that to run the slowest tests first, yielding the
// fastest results.
try {
if (this._config.cache) {
this._testPerformanceCache = JSON.parse(
fs.readFileSync(this._getTestPerformanceCachePath(), 'utf8'),
);
} else {
this._testPerformanceCache = {};
}
} catch (e) {
this._testPerformanceCache = {};
}

const cache = this._testPerformanceCache;
async runTests(tests: Tests, watcher: TestWatcher) {
const timings = [];
const stats = {};
const getFileSize = filePath =>
stats[filePath] || (stats[filePath] = fs.statSync(filePath).size);
const getTestRunTime = filePath => {
if (cache[filePath]) {
return cache[filePath][0] === FAIL ? Infinity : cache[filePath][1];
}
return null;
};

testPaths = testPaths.sort((pathA, pathB) => {
const timeA = getTestRunTime(pathA);
const timeB = getTestRunTime(pathB);
if (timeA != null && timeB != null) {
return timeA < timeB ? 1 : -1;
}
return getFileSize(pathA) < getFileSize(pathB) ? 1 : -1;
});

testPaths.forEach(filePath => {
const timing = cache[filePath] && cache[filePath][1];
if (timing) {
timings.push(timing);
tests.forEach(test => {
if (test.duration) {
timings.push(test.duration);
}
});

return {testPaths, timings};
}

_cacheTestResults(aggregatedResults: AggregatedResult) {
const cache = this._testPerformanceCache;
aggregatedResults.testResults.forEach(test => {
if (test && !test.skipped) {
const perf = test.perfStats;
cache[test.testFilePath] = [
test.numFailingTests ? FAIL : SUCCESS,
perf.end - perf.start || 0,
];
}
});
fs.writeFileSync(
this._getTestPerformanceCachePath(),
JSON.stringify(cache),
);
}

runTests(paths: Array<string>, watcher: TestWatcher) {
const config = this._config;
const {testPaths, timings} = this._sortTests(paths);
const aggregatedResults = createAggregatedResults(testPaths.length);
const aggregatedResults = createAggregatedResults(tests.length);
const estimatedTime = Math.ceil(
getEstimatedTime(timings, this._options.maxWorkers) / 1000,
);

const onResult = (testPath: Path, testResult: TestResult) => {
// Run in band if we only have one test or one worker available.
// If we are confident from previous runs that the tests will finish quickly
// we also run in band to reduce the overhead of spawning workers.
const runInBand = this._options.maxWorkers <= 1 ||
tests.length <= 1 ||
(tests.length <= 20 &&
timings.length > 0 &&
timings.every(timing => timing < SLOW_TEST_TIME));

const onResult = (test: Test, testResult: TestResult) => {
if (watcher.isInterrupted()) {
return;
}
if (testResult.testResults.length === 0) {
const message = 'Your test suite must contain at least one test.';
onFailure(testPath, {
onFailure(test, {
message,
stack: new Error(message).stack,
});
Expand All @@ -191,26 +124,20 @@ class TestRunner {
this._bailIfNeeded(aggregatedResults, watcher);
};

const onFailure = (testPath: Path, error: TestError) => {
const onFailure = (test: Test, error: TestError) => {
if (watcher.isInterrupted()) {
return;
}
const testResult = buildFailureTestResult(testPath, error);
testResult.failureMessage = formatExecError(testResult, config, testPath);
const testResult = buildFailureTestResult(test.path, error);
testResult.failureMessage = formatExecError(
testResult,
test.config,
test.path,
);
addResult(aggregatedResults, testResult);
this._dispatcher.onTestResult(config, testResult, aggregatedResults);
};

// Run in band if we only have one test or one worker available.
// If we are confident from previous runs that the tests will finish quickly
// we also run in band to reduce the overhead of spawning workers.
const shouldRunInBand = () =>
this._options.maxWorkers <= 1 ||
testPaths.length <= 1 ||
(testPaths.length <= 20 &&
timings.length > 0 &&
timings.every(timing => timing < SLOW_TEST_TIME));

const updateSnapshotState = () => {
const status = snapshot.cleanup(
this._hasteContext.hasteFS,
Expand All @@ -224,74 +151,66 @@ class TestRunner {
aggregatedResults.snapshot.filesRemoved));
};

const runInBand = shouldRunInBand();

this._dispatcher.onRunStart(config, aggregatedResults, {
estimatedTime,
showStatus: !runInBand,
});

const testRun = runInBand
? this._createInBandTestRun(testPaths, watcher, onResult, onFailure)
: this._createParallelTestRun(testPaths, watcher, onResult, onFailure);
try {
await (runInBand
? this._createInBandTestRun(tests, watcher, onResult, onFailure)
: this._createParallelTestRun(tests, watcher, onResult, onFailure));
} catch (error) {
if (!watcher.isInterrupted()) {
throw error;
}
}

return testRun
.catch(error => {
if (!watcher.isInterrupted()) {
throw error;
}
})
.then(() => {
updateSnapshotState();
aggregatedResults.wasInterrupted = watcher.isInterrupted();
updateSnapshotState();
aggregatedResults.wasInterrupted = watcher.isInterrupted();

this._dispatcher.onRunComplete(config, aggregatedResults);
this._dispatcher.onRunComplete(config, aggregatedResults);

const anyTestFailures = !(aggregatedResults.numFailedTests === 0 &&
aggregatedResults.numRuntimeErrorTestSuites === 0);
const anyReporterErrors = this._dispatcher.hasErrors();
const anyTestFailures = !(aggregatedResults.numFailedTests === 0 &&
aggregatedResults.numRuntimeErrorTestSuites === 0);
const anyReporterErrors = this._dispatcher.hasErrors();

aggregatedResults.success = !(anyTestFailures ||
aggregatedResults.snapshot.failure ||
anyReporterErrors);
aggregatedResults.success = !(anyTestFailures ||
aggregatedResults.snapshot.failure ||
anyReporterErrors);

this._cacheTestResults(aggregatedResults);
return aggregatedResults;
});
return aggregatedResults;
}

_createInBandTestRun(
testPaths: Array<Path>,
tests: Tests,
watcher: TestWatcher,
onResult: OnTestResult,
onFailure: OnRunFailure,
onResult: OnTestSuccess,
onFailure: OnTestFailure,
) {
const mutex = throat(1);
return testPaths.reduce(
(promise, path) =>
mutex(() =>
promise
.then(() => {
if (watcher.isInterrupted()) {
throw new CancelRun();
}

this._dispatcher.onTestStart(this._config, path);
return runTest(path, this._config, this._hasteContext.resolver);
})
.then(result => onResult(path, result))
.catch(err => onFailure(path, err))),
return tests.reduce(
(promise, test) => mutex(() => promise
.then(() => {
if (watcher.isInterrupted()) {
throw new CancelRun();
}

this._dispatcher.onTestStart(test.config, test.path);
return runTest(test.path, test.config, this._hasteContext.resolver);
})
.then(result => onResult(test, result))
.catch(err => onFailure(test, err))),
Promise.resolve(),
);
}

_createParallelTestRun(
testPaths: Array<Path>,
tests: Tests,
watcher: TestWatcher,
onResult: OnTestResult,
onFailure: OnRunFailure,
onResult: OnTestSuccess,
onFailure: OnTestFailure,
) {
const config = this._config;
const farm = workerFarm(
{
autoStart: true,
Expand All @@ -306,23 +225,22 @@ class TestRunner {

// Send test suites to workers continuously instead of all at once to track
// the start time of individual tests.
const runTestInWorker = ({config, path}) =>
mutex(() => {
if (watcher.isInterrupted()) {
return Promise.reject();
}
this._dispatcher.onTestStart(config, path);
return worker({
config,
path,
rawModuleMap: watcher.isWatchMode()
? this._hasteContext.moduleMap.getRawModuleMap()
: null,
});
const runTestInWorker = ({config, path}) => mutex(() => {
if (watcher.isInterrupted()) {
return Promise.reject();
}
this._dispatcher.onTestStart(config, path);
return worker({
config,
path,
rawModuleMap: watcher.isWatchMode()
? this._hasteContext.moduleMap.getRawModuleMap()
: null,
});
});

const onError = (err, path) => {
onFailure(path, err);
const onError = (err, test) => {
onFailure(test, err);
if (err.type === 'ProcessTerminatedError') {
console.error(
'A worker process has quit unexpectedly! ' +
Expand All @@ -341,15 +259,13 @@ class TestRunner {
});

const runAllTests = Promise.all(
testPaths.map(path => {
return runTestInWorker({config, path})
.then(testResult => onResult(path, testResult))
.catch(error => onError(error, path));
}),
tests.map(test =>
runTestInWorker(test)
.then(testResult => onResult(test, testResult))
.catch(error => onError(error, test))),
);

const cleanup = () => workerFarm.end(farm);

return Promise.race([runAllTests, onInterrupt]).then(cleanup, cleanup);
}

Expand Down Expand Up @@ -499,8 +415,6 @@ const buildFailureTestResult = (
};
};

// Proxy class that holds all reporter and dispatchers events to each
// of them.
class ReporterDispatcher {
_disabled: boolean;
_reporters: Array<BaseReporter>;
Expand Down Expand Up @@ -563,11 +477,9 @@ const getEstimatedTime = (timings, workers) => {
}

const max = Math.max.apply(null, timings);
if (timings.length <= workers) {
return max;
}

return Math.max(timings.reduce((sum, time) => sum + time) / workers, max);
return timings.length <= workers
? max
: Math.max(timings.reduce((sum, time) => sum + time) / workers, max);
};

module.exports = TestRunner;
Loading

0 comments on commit 6da58b6

Please sign in to comment.