Skip to content

Commit

Permalink
Merge pull request #943 from terascope/fix-worker-es-failure
Browse files Browse the repository at this point in the history
Fixes for better error handling and prevention for stuck workers
  • Loading branch information
godber authored Jan 14, 2019
2 parents 1c26348 + d32595a commit f038089
Show file tree
Hide file tree
Showing 25 changed files with 539 additions and 353 deletions.
2 changes: 1 addition & 1 deletion e2e/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"@terascope/fetch-github-release": "^0.6.0",
"bluebird": "^3.5.3",
"bunyan": "^1.8.12",
"elasticsearch": "^15.1.1",
"elasticsearch": "^15.3.0",
"jest": "^23.6.0",
"jest-extended": "^0.11.0",
"lodash": "^4.17.11",
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"@types/socket.io": "^2.1.2",
"@types/socket.io-client": "^1.4.32",
"@types/uuid": "^3.4.4",
"lerna": "^3.10.1",
"lerna": "^3.10.5",
"rimraf": "^2.6.3",
"typescript": "^3.2.2"
},
Expand All @@ -53,7 +53,7 @@
"lerna-alias": "^3.0.2",
"semver": "^5.6.0",
"ts-jest": "^23.10.5",
"tslint": "^5.12.0"
"tslint": "^5.12.1"
},
"workspaces": [
"packages/*",
Expand Down
1 change: 1 addition & 0 deletions packages/elasticsearch-api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
return function _errorHandlerFn(err) {
const isRejectedError = _.get(err, 'body.error.type') === 'es_rejected_execution_exception';
// const isConnectionError = _.get(err, 'message') === 'No Living connections';

if (isRejectedError) {
// for now we only retry rejected executions; the "No Living connections" issue is not handled here
retry();
Expand Down
2 changes: 1 addition & 1 deletion packages/job-components/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
"jest-fixtures": "^0.6.0",
"rimraf": "^2.6.3",
"ts-jest": "^23.10.5",
"tslint": "^5.12.0",
"tslint": "^5.12.1",
"tslint-config-airbnb": "^5.11.1",
"typescript": "^3.2.2"
},
Expand Down
2 changes: 1 addition & 1 deletion packages/teraslice-cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"syncprompt": "^2.0.0",
"teraslice-client-js": "^0.7.0",
"terminal-kit": "^1.26.10",
"tty-table": "^2.6.12",
"tty-table": "^2.6.14",
"yargs": "^12.0.2"
},
"devDependencies": {
Expand Down
5 changes: 2 additions & 3 deletions packages/teraslice-messaging/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@terascope/teraslice-messaging",
"version": "0.2.6",
"version": "0.3.0",
"publishConfig": {
"access": "public"
},
Expand Down Expand Up @@ -38,7 +38,6 @@
},
"dependencies": {
"@terascope/queue": "^1.1.6",
"bluebird": "^3.5.3",
"debug": "^4.1.1",
"nanoid": "^2.0.0",
"p-event": "^2.1.0",
Expand All @@ -55,7 +54,7 @@
"jest-extended": "^0.11.0",
"rimraf": "^2.6.3",
"ts-jest": "^23.10.5",
"tslint": "^5.12.0",
"tslint": "^5.12.1",
"tslint-config-airbnb": "^5.11.1",
"typescript": "^3.2.2"
},
Expand Down
19 changes: 16 additions & 3 deletions packages/teraslice-messaging/src/execution-controller/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { isString, pickBy } from 'lodash';
import * as core from '../messenger';
import * as i from './interfaces';

const ONE_MIN = 60 * 1000;

export class Client extends core.Client {
public workerId: string;

Expand Down Expand Up @@ -81,18 +83,29 @@ export class Client extends core.Client {
});
}

async waitForSlice(fn: i.WaitUntilFn = () => false, interval = 100): Promise<i.Slice|undefined> {
async waitForSlice(fn: i.WaitUntilFn = () => false, timeoutMs = 2 * ONE_MIN): Promise<i.Slice|undefined> {
this.sendAvailable();

const startTime = Date.now();

const isExpired = () => {
const elapsed = Date.now() - startTime;
if (elapsed < timeoutMs) return false;

// mark the worker as unavailable so the next call has to send a new available message
this.sendUnavailable();
return true;
};

const slice = await new Promise((resolve) => {
this.once('execution:slice:new', onMessage);

const intervalId = setInterval(() => {
if (this.serverShutdown || !this.ready || fn()) {
if (this.serverShutdown || !this.ready || fn() || isExpired()) {
this.removeListener('execution:slice:new', onMessage);
finish();
}
}, interval);
}, 100);

function onMessage(msg: core.EventMessage) {
finish(msg.payload as i.Slice);
Expand Down
28 changes: 22 additions & 6 deletions packages/teraslice-messaging/test/execution-controller-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import 'jest-extended';

import bluebird from 'bluebird';
import findPort from './helpers/find-port';
import { pDelay, findPort } from './helpers';
import {
formatURL,
newMsgId,
Expand Down Expand Up @@ -159,7 +158,7 @@ describe('ExecutionController', () => {
},
});

await bluebird.delay(100);
await pDelay(100);

expect(sliceComplete).toHaveBeenCalled();

Expand Down Expand Up @@ -201,7 +200,7 @@ describe('ExecutionController', () => {
error: 'hello'
});

await bluebird.delay(100);
await pDelay(100);

expect(sliceFailure).toHaveBeenCalled();
if (msg == null) {
Expand Down Expand Up @@ -246,7 +245,7 @@ describe('ExecutionController', () => {

const slice = client.waitForSlice(() => (Date.now() - stopAt) > 0);

await bluebird.delay(500);
await pDelay(500);

expect(server.queue.exists('workerId', workerId)).toBeTrue();

Expand All @@ -273,10 +272,27 @@ describe('ExecutionController', () => {
}
});

await bluebird.delay(100);
await pDelay(100);

expect(server.activeWorkerCount).toBe(0);
});

describe('when no slice is sent from the server', () => {
    it('should handle the timeout properly', async () => {
        await client.sendAvailable();

        // The stop condition only trips after 2 seconds, so the 500ms
        // timeout handed to waitForSlice is what has to end the wait.
        const deadline = Date.now() + 2000;
        const pastDeadline = () => Date.now() > deadline;
        const pendingSlice = client.waitForSlice(pastDeadline, 500);

        // Wait slightly longer than the timeout before inspecting the result.
        await pDelay(600);

        // No slice was dispatched, so the wait resolves with nothing.
        const received = await pendingSlice;
        expect(received).toBeUndefined();

        // After the timeout the worker is no longer counted or marked as available.
        expect(server.activeWorkerCount).toBe(0);
        expect(client.available).toBeFalse();
    });
});
});

describe('when the client is set as unavailable', () => {
Expand Down
5 changes: 5 additions & 0 deletions packages/teraslice-messaging/test/helpers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { promisify } from 'util';
import findPort from './find-port';

/**
 * Promise-returning form of `setTimeout`, built with `util.promisify`.
 * The specs use it as `await pDelay(ms)` to pause for a number of milliseconds.
 */
export const pDelay = promisify(setTimeout);
export { findPort };
5 changes: 2 additions & 3 deletions packages/teraslice-messaging/test/messenger-spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import 'jest-extended';

import bluebird from 'bluebird';
import http from 'http';
import { Message } from '../src/messenger';
import { Messenger, formatURL, newMsgId } from '../src';
import findPort from './helpers/find-port';
import { findPort, pDelay } from './helpers';

describe('Messenger', () => {
describe('->Core', () => {
Expand Down Expand Up @@ -386,7 +385,7 @@ describe('Messenger', () => {

// @ts-ignore
server.handleResponse(server.server.to(clientId), 'hello', async () => {
await bluebird.delay(1000);
await pDelay(1000);
});

try {
Expand Down
2 changes: 1 addition & 1 deletion packages/teraslice-test-harness/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
"jest-extended": "^0.11.0",
"rimraf": "^2.6.3",
"ts-jest": "^23.10.5",
"tslint": "^5.12.0",
"tslint": "^5.12.1",
"tslint-config-airbnb": "^5.11.1",
"typescript": "^3.2.2"
},
Expand Down
10 changes: 9 additions & 1 deletion packages/teraslice/lib/cluster/services/execution.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,19 @@ module.exports = function module(context, { clusterMasterServer }) {
function stopExecution(exId, timeout, excludeNode) {
return getExecutionContext(exId)
.then((execution) => {
const isTerminal = _isTerminalStatus(execution);
const isTerminal = _isTerminalStatus(execution._status);
if (isTerminal) {
logger.info(`execution ${exId} is in terminal status "${execution._status}", it cannot be stopped`);
return true;
}

if (execution._status === 'stopping') {
logger.info('execution is already stopping...');
// we are kicking this off in the background, not part of the promise chain
executionHasStopped(exId);
return true;
}

logger.debug(`stopping execution ${exId}...`, _.pickBy({ timeout, excludeNode }));
return setExecutionStatus(exId, 'stopping')
.then(() => clusterService.stopExecution(exId, timeout, excludeNode))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ module.exports = function module(context, indexName, recordType, idField, _bulkS

return elasticsearch.bulkSend(bulkRequest)
.then((results) => {
logger.info(`Flushed ${results.items.length} records to index ${indexName}`);
logger.debug(`Flushed ${results.items.length} records to index ${indexName}`);
})
.catch((err) => {
const errMsg = parseError(err);
Expand Down
5 changes: 3 additions & 2 deletions packages/teraslice/lib/cluster/storage/execution.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ module.exports = function module(context) {
// verify the current status to make sure it can be updated to the desired status
function verifyStatusUpdate(exId, desiredStatus) {
if (!desiredStatus || !_isValidStatus(desiredStatus)) {
return Promise.reject(new Error(`Invalid Job status: "${desiredStatus}"`));
const error = new Error(`Invalid Job status: "${desiredStatus}"`);
error.statusCode = 422;
return Promise.reject(error);
}


return getStatus(exId)
.then((status) => {
// setting the status to the value it already has shouldn't throw an error
Expand Down
Loading

0 comments on commit f038089

Please sign in to comment.