Skip to content

Commit

Permalink
Clean up interfaces and responsibilites
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexey Kureev committed Jul 12, 2018
1 parent 9958587 commit 29e04d2
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,19 @@

'use strict';

import type {
QueueChildMessage,
WorkerPoolInterface,
WorkerInterface,
} from './types';

export default class WorkerQueueManager {
_workerPool: WorkerPoolInterface;
_queue: Array<?QueueChildMessage>;
import type {QueueChildMessage} from './types';

export default class QueueManager {
_callback: Function;
_last: Array<QueueChildMessage>;
_locks: Array<boolean>;
_numOfWorkers: number;
_offset: number;
_queue: Array<?QueueChildMessage>;

constructor(workerPool: WorkerPoolInterface) {
this._workerPool = workerPool;
constructor(numOfWorkers: number, callback: Function) {
this._callback = callback;
this._numOfWorkers = numOfWorkers;
this._queue = [];
this._last = [];
this._locks = [];
Expand All @@ -46,7 +44,7 @@ export default class WorkerQueueManager {
// }
}

_process(workerId: number): WorkerQueueManager {
_process(workerId: number): QueueManager {
if (this.isLocked(workerId)) {
return this;
}
Expand All @@ -57,15 +55,15 @@ export default class WorkerQueueManager {
return this;
}

const onEnd = (error: ?Error, result: mixed, worker: WorkerInterface) => {
job.onEnd(error, result, worker);
const onEnd = (error: ?Error, result: mixed) => {
job.onEnd(error, result);
this.unlock(workerId);
this._process(workerId);
};

this.lock(workerId);

this._workerPool.send(workerId, job.request, job.onStart, onEnd);
this._callback(workerId, job.request, job.onStart, onEnd);

job.request[1] = true;

Expand All @@ -88,29 +86,29 @@ export default class WorkerQueueManager {
return queueHead;
}

enqueue(task: QueueChildMessage, workerId?: number): WorkerQueueManager {
if (workerId != null) {
if (task.request[1]) {
return this;
}
enqueue(task: QueueChildMessage, workerId: number): QueueManager {
if (task.request[1]) {
return this;
}

if (this._queue[workerId]) {
this._last[workerId].next = task;
} else {
this._queue[workerId] = task;
}

if (this._queue[workerId]) {
this._last[workerId].next = task;
} else {
this._queue[workerId] = task;
}
this._last[workerId] = task;
this._process(workerId);

this._last[workerId] = task;
return this;
}

this._process(workerId);
} else {
const numOfWorkers = this._workerPool.getWorkers().length;
for (let i = 0; i < numOfWorkers; i++) {
const workerIdx = (this._offset + i) % numOfWorkers;
this.enqueue(task, workerIdx);
}
this._offset++;
push(task: QueueChildMessage): QueueManager {
for (let i = 0; i < this._numOfWorkers; i++) {
const workerIdx = (this._offset + i) % this._numOfWorkers;
this.enqueue(task, workerIdx);
}
this._offset++;

return this;
}
Expand Down
17 changes: 16 additions & 1 deletion packages/jest-worker/src/WorkerPool.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,17 @@
'use strict';

import BaseWorkerPool from './base/BaseWorkerPool';
import ChildProcessWorker from './workers/ChildProcessWorker';
import NodeThreadsWorker from './workers/NodeThreadsWorker';

import type {ChildMessage, OnStart, OnEnd, WorkerPoolInterface} from './types';
import type {
ChildMessage,
WorkerOptions,
OnStart,
OnEnd,
WorkerPoolInterface,
WorkerInterface,
} from './types';

class WorkerPool extends BaseWorkerPool implements WorkerPoolInterface {
send(
Expand All @@ -22,6 +31,12 @@ class WorkerPool extends BaseWorkerPool implements WorkerPoolInterface {
): void {
this.getWorkerById(workerId).send(request, onStart, onEnd);
}

createWorker(workerOptions: WorkerOptions): WorkerInterface {
return this._options.useWorkers
? new NodeThreadsWorker(workerOptions)
: new ChildProcessWorker(workerOptions);
}
}

export default WorkerPool;
26 changes: 10 additions & 16 deletions packages/jest-worker/src/base/BaseWorkerPool.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,25 @@
'use strict';

import mergeStream from 'merge-stream';
import os from 'os';
import path from 'path';

import ChildProcessWorker from '../workers/ChildProcessWorker';
import NodeThreadsWorker from '../workers/NodeThreadsWorker';
import {CHILD_MESSAGE_END} from '../types';

import type {Readable} from 'stream';
import type {FarmOptions, WorkerOptions, WorkerInterface} from '../types';
import type {WorkerPoolOptions, WorkerOptions, WorkerInterface} from '../types';

/* istanbul ignore next */
const emptyMethod = () => {};

export default class BaseWorkerPool {
_stderr: Readable;
_stdout: Readable;
_options: FarmOptions;
_options: WorkerPoolOptions;
_workers: Array<WorkerInterface>;

constructor(workerPath: string, options: FarmOptions) {
constructor(workerPath: string, options: WorkerPoolOptions) {
this._options = options;

const numWorkers = options.numWorkers || os.cpus().length - 1;
this._workers = new Array(numWorkers);
this._workers = new Array(options.numWorkers);

if (!path.isAbsolute(workerPath)) {
workerPath = require.resolve(workerPath);
Expand All @@ -42,11 +37,12 @@ export default class BaseWorkerPool {
const stdout = mergeStream();
const stderr = mergeStream();

for (let i = 0; i < numWorkers; i++) {
const {forkOptions, maxRetries} = options;

for (let i = 0; i < options.numWorkers; i++) {
const workerOptions: WorkerOptions = {
forkOptions: options.forkOptions || {},
maxRetries: options.maxRetries || 3,
useNodeWorkersIfPossible: options.useNodeWorkersIfPossible,
forkOptions,
maxRetries,
workerId: i,
workerPath,
};
Expand Down Expand Up @@ -87,9 +83,7 @@ export default class BaseWorkerPool {
}

createWorker(workerOptions: WorkerOptions): WorkerInterface {
return workerOptions.useNodeWorkersIfPossible
? new NodeThreadsWorker(workerOptions)
: new ChildProcessWorker(workerOptions);
throw Error('Missing method createWorker in WorkerPool');
}

end(): void {
Expand Down
57 changes: 35 additions & 22 deletions packages/jest-worker/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,22 @@

'use strict';

import type {WorkerPoolInterface, FarmOptions, ChildMessage} from './types';
import type {Readable} from 'stream';

import os from 'os';
import {CHILD_MESSAGE_CALL, WorkerInterface} from './types';
import WorkerPool from './WorkerPool';
import WorkerQueueManager from './WorkerQueueManager';
import QueueManager from './QueueManager';

import type {
WorkerPoolInterface,
WorkerPoolOptions,
FarmOptions,
ChildMessage,
} from './types';
import type {Readable} from 'stream';

function getExposedMethods(
workerPath: string,
options?: FarmOptions = {},
options: FarmOptions,
): $ReadOnlyArray<string> {
let exposedMethods = options.exposedMethods;

Expand Down Expand Up @@ -79,24 +85,33 @@ export default class JestWorker {
_cacheKeys: {[string]: WorkerInterface, __proto__: null};
_ending: boolean;
_options: FarmOptions;
_queueManager: WorkerQueueManager;
_queueManager: QueueManager;
_threadPool: WorkerPoolInterface;

constructor(workerPath: string, options?: FarmOptions = {}) {
constructor(workerPath: string, options?: FarmOptions) {
this._cacheKeys = Object.create(null);
this._options = Object.assign({}, options, {
useNodeWorkersIfPossible: canUseWorkerThreads(),
});
this._options = Object.assign({}, options);

const workerPoolOptions: WorkerPoolOptions = {
forkOptions: this._options.forkOptions || {},
maxRetries: this._options.maxRetries || 3,
numWorkers: os.cpus().length - 1,
useWorkers: canUseWorkerThreads(),
};

this._threadPool = this._options.WorkerPool
? new this._options.WorkerPool(workerPath, this._options)
: new WorkerPool(workerPath, this._options);
this._queueManager = new WorkerQueueManager(this._threadPool);
? new this._options.WorkerPool(workerPath, workerPoolOptions)
: new WorkerPool(workerPath, workerPoolOptions);

this._queueManager = new QueueManager(
workerPoolOptions.numWorkers,
this._threadPool.send.bind(this._threadPool),
);

this._bindExposedWorkerMethods(workerPath, this._options);
}

_bindExposedWorkerMethods(workerPath: string, options?: FarmOptions): void {
_bindExposedWorkerMethods(workerPath: string, options: FarmOptions): void {
getExposedMethods(workerPath, options).forEach(name => {
if (name.startsWith('_')) {
return;
Expand Down Expand Up @@ -135,11 +150,7 @@ export default class JestWorker {
}
};

const onEnd: onEnd = (
error: Error,
result: mixed,
worker: WorkerInterface,
) => {
const onEnd: onEnd = (error: Error, result: mixed) => {
if (error) {
reject(error);
} else {
Expand All @@ -148,9 +159,11 @@ export default class JestWorker {
};

const task = {onEnd, onStart, request};
const workerId = worker ? worker.getWorkerId() : undefined;

this._queueManager.enqueue(task, workerId);
if (worker) {
this._queueManager.enqueue(task, worker.getWorkerId());
} else {
this._queueManager.push(task);
}
});
}

Expand Down
13 changes: 9 additions & 4 deletions packages/jest-worker/src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,22 @@ export type FarmOptions = {
numWorkers?: number,
WorkerPool?: (
workerPath: string,
options?: FarmOptions,
options?: WorkerPoolOptions,
) => WorkerPoolInterface,
useNodeWorkersIfPossible?: boolean,
};

export type WorkerPoolOptions = {|
forkOptions: ForkOptions,
maxRetries: number,
numWorkers: number,
useWorkers: boolean,
|};

export type WorkerOptions = {|
forkOptions: ForkOptions,
maxRetries: number,
workerId: number,
workerPath: string,
useNodeWorkersIfPossible?: boolean,
|};

// Messages passed from the parent to the children.
Expand Down Expand Up @@ -132,7 +137,7 @@ export type ParentMessage = ParentMessageOk | ParentMessageError;

// Queue types.
export type OnStart = WorkerInterface => void;
export type OnEnd = (?Error, ?any, WorkerInterface) => void;
export type OnEnd = (?Error, ?any) => void;

export type QueueChildMessage = {|
request: ChildMessage,
Expand Down
4 changes: 2 additions & 2 deletions packages/jest-worker/src/workers/ChildProcessWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export default class ChildProcessWorker implements WorkerInterface {
onMessage(response: any /* Should be ParentMessage */) {
switch (response[0]) {
case PARENT_MESSAGE_OK:
this._onProcessEnd(null, response[1], this);
this._onProcessEnd(null, response[1]);
break;

case PARENT_MESSAGE_ERROR:
Expand All @@ -103,7 +103,7 @@ export default class ChildProcessWorker implements WorkerInterface {
}
}

this._onProcessEnd(error, null, this);
this._onProcessEnd(error, null);
break;

default:
Expand Down
4 changes: 2 additions & 2 deletions packages/jest-worker/src/workers/NodeThreadsWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export default class ExpirementalWorker implements WorkerInterface {
onMessage(response: any /* Should be ParentMessage */) {
switch (response[0]) {
case PARENT_MESSAGE_OK:
this._onProcessEnd(null, response[1], this);
this._onProcessEnd(null, response[1]);
break;

case PARENT_MESSAGE_ERROR:
Expand All @@ -92,7 +92,7 @@ export default class ExpirementalWorker implements WorkerInterface {
}
}

this._onProcessEnd(error, null, this);
this._onProcessEnd(error, null);
break;

default:
Expand Down

0 comments on commit 29e04d2

Please sign in to comment.