Skip to content

Commit

Permalink
Merge pull request #438 from MatrixAI/feature-queue
Browse files Browse the repository at this point in the history
Feature TaskManager Scheduler and Queue and Context Decorators for Timed and Cancellable
  • Loading branch information
tegefaulkes authored Sep 13, 2022
2 parents 5d90a03 + 5647b39 commit a722510
Show file tree
Hide file tree
Showing 38 changed files with 7,692 additions and 104 deletions.
2 changes: 1 addition & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@
"@typescript-eslint/consistent-type-imports": ["error"],
"@typescript-eslint/consistent-type-exports": ["error"],
"no-throw-literal": "off",
"@typescript-eslint/no-throw-literal": ["error"],
"@typescript-eslint/no-throw-literal": "off",
"@typescript-eslint/no-floating-promises": ["error", {
"ignoreVoid": true,
"ignoreIIFE": true
Expand Down
218 changes: 124 additions & 94 deletions package-lock.json

Large diffs are not rendered by default.

11 changes: 7 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
"postbuild": "shx cp -fR src/proto dist && shx cp -f src/notifications/*.json dist/notifications/ && shx cp -f src/claims/*.json dist/claims/ && shx cp -f src/status/*.json dist/status/",
"postversion": "npm install --package-lock-only --ignore-scripts --silent",
"ts-node": "ts-node",
"ts-node-inspect": "node --require ts-node/register --inspect",
"test": "jest",
"lint": "eslint '{src,tests,scripts,benches}/**/*.{js,ts}'",
"lintfix": "eslint '{src,tests,scripts,benches}/**/*.{js,ts}' --fix",
Expand All @@ -77,6 +78,7 @@
},
"dependencies": {
"@grpc/grpc-js": "1.6.7",
"@matrixai/async-cancellable": "^1.0.2",
"@matrixai/async-init": "^1.8.2",
"@matrixai/async-locks": "^3.1.2",
"@matrixai/db": "^5.0.3",
Expand All @@ -85,6 +87,7 @@
"@matrixai/logger": "^3.0.0",
"@matrixai/resources": "^1.1.4",
"@matrixai/workers": "^1.3.6",
"@matrixai/timer": "^1.0.0",
"ajv": "^7.0.4",
"bip39": "^3.0.3",
"canonicalize": "^1.0.5",
Expand Down Expand Up @@ -117,14 +120,14 @@
"@types/google-protobuf": "^3.7.4",
"@types/jest": "^28.1.3",
"@types/nexpect": "^0.4.31",
"@types/node": "^16.11.7",
"@types/node": "^16.11.57",
"@types/node-forge": "^0.10.4",
"@types/pako": "^1.0.2",
"@types/prompts": "^2.0.13",
"@types/readable-stream": "^2.3.11",
"@types/uuid": "^8.3.0",
"@typescript-eslint/eslint-plugin": "^5.23.0",
"@typescript-eslint/parser": "^5.23.0",
"@typescript-eslint/eslint-plugin": "^5.36.2",
"@typescript-eslint/parser": "^5.36.2",
"babel-jest": "^28.1.3",
"benny": "^3.7.1",
"common-tags": "^1.8.2",
Expand All @@ -151,6 +154,6 @@
"ts-node": "^10.9.1",
"tsconfig-paths": "^3.9.0",
"typedoc": "^0.22.15",
"typescript": "^4.5.2"
"typescript": "^4.7.4"
}
}
47 changes: 47 additions & 0 deletions src/contexts/decorators/cancellable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import type { ContextCancellable } from '../types';
import { setupCancellable } from '../functions/cancellable';
import * as contextsUtils from '../utils';

function cancellable(lazy: boolean = false) {
return <
T extends TypedPropertyDescriptor<
(...params: Array<any>) => PromiseLike<any>
>,
>(
target: any,
key: string | symbol,
descriptor: T,
): T => {
// Target is instance prototype for instance methods // or the class prototype for static methods
const targetName = target['name'] ?? target.constructor.name;
const f = descriptor['value'];
if (typeof f !== 'function') {
throw new TypeError(
`\`${targetName}.${key.toString()}\` is not a function`,
);
}
const contextIndex = contextsUtils.getContextIndex(target, key, targetName);
descriptor['value'] = function (...args) {
let ctx: Partial<ContextCancellable> = args[contextIndex];
if (ctx === undefined) {
ctx = {};
args[contextIndex] = ctx;
}
// Runtime type check on the context parameter
contextsUtils.checkContextCancellable(ctx, key, targetName);
return setupCancellable(
(_, ...args) => f.apply(this, args),
lazy,
ctx,
args,
);
};
// Preserve the name
Object.defineProperty(descriptor['value'], 'name', {
value: typeof key === 'symbol' ? `[${key.description}]` : key,
});
return descriptor;
};
}

export default cancellable;
18 changes: 18 additions & 0 deletions src/contexts/decorators/context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import * as contextsUtils from '../utils';

/**
* Context parameter decorator
* It is only allowed to be used once
*/
function context(target: any, key: string | symbol, index: number) {
const targetName = target['name'] ?? target.constructor.name;
const method = target[key];
if (contextsUtils.contexts.has(method)) {
throw new TypeError(
`\`${targetName}.${key.toString()}\` redeclares \`@context\` decorator`,
);
}
contextsUtils.contexts.set(method, index);
}

export default context;
4 changes: 4 additions & 0 deletions src/contexts/decorators/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export { default as context } from './context';
export { default as cancellable } from './cancellable';
export { default as timed } from './timed';
export { default as timedCancellable } from './timedCancellable';
145 changes: 145 additions & 0 deletions src/contexts/decorators/timed.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import type { ContextTimed } from '../types';
import { setupTimedContext } from '../functions/timed';
import * as contextsUtils from '../utils';
import * as contextsErrors from '../errors';
import * as utils from '../../utils';

/**
* Timed method decorator
*/
function timed(
delay: number = Infinity,
errorTimeoutConstructor: new () => Error = contextsErrors.ErrorContextsTimedTimeOut,
) {
return (
target: any,
key: string | symbol,
descriptor: TypedPropertyDescriptor<(...params: Array<any>) => any>,
) => {
// Target is instance prototype for instance methods
// or the class prototype for static methods
const targetName = target['name'] ?? target.constructor.name;
const f = descriptor['value'];
if (typeof f !== 'function') {
throw new TypeError(
`\`${targetName}.${key.toString()}\` is not a function`,
);
}
const contextIndex = contextsUtils.getContextIndex(target, key, targetName);
if (f instanceof utils.AsyncFunction) {
descriptor['value'] = async function (...args) {
let ctx: Partial<ContextTimed> = args[contextIndex];
if (ctx === undefined) {
ctx = {};
args[contextIndex] = ctx;
}
// Runtime type check on the context parameter
contextsUtils.checkContextTimed(ctx, key, targetName);
const teardownContext = setupTimedContext(
delay,
errorTimeoutConstructor,
ctx,
);
try {
return await f.apply(this, args);
} finally {
teardownContext();
}
};
} else if (f instanceof utils.GeneratorFunction) {
descriptor['value'] = function* (...args) {
let ctx: Partial<ContextTimed> = args[contextIndex];
if (ctx === undefined) {
ctx = {};
args[contextIndex] = ctx;
}
// Runtime type check on the context parameter
contextsUtils.checkContextTimed(ctx, key, targetName);
const teardownContext = setupTimedContext(
delay,
errorTimeoutConstructor,
ctx,
);
try {
return yield* f.apply(this, args);
} finally {
teardownContext();
}
};
} else if (f instanceof utils.AsyncGeneratorFunction) {
descriptor['value'] = async function* (...args) {
let ctx: Partial<ContextTimed> = args[contextIndex];
if (ctx === undefined) {
ctx = {};
args[contextIndex] = ctx;
}
// Runtime type check on the context parameter
contextsUtils.checkContextTimed(ctx, key, targetName);
const teardownContext = setupTimedContext(
delay,
errorTimeoutConstructor,
ctx,
);
try {
return yield* f.apply(this, args);
} finally {
teardownContext();
}
};
} else {
descriptor['value'] = function (...args) {
let ctx: Partial<ContextTimed> = args[contextIndex];
if (ctx === undefined) {
ctx = {};
args[contextIndex] = ctx;
}
// Runtime type check on the context parameter
contextsUtils.checkContextTimed(ctx, key, targetName);
const teardownContext = setupTimedContext(
delay,
errorTimeoutConstructor,
ctx,
);
const result = f.apply(this, args);
if (utils.isPromiseLike(result)) {
return result.then(
(r) => {
teardownContext();
return r;
},
(e) => {
teardownContext();
throw e;
},
);
} else if (utils.isGenerator(result)) {
return (function* () {
try {
return yield* result;
} finally {
teardownContext();
}
})();
} else if (utils.isAsyncGenerator(result)) {
return (async function* () {
try {
return yield* result;
} finally {
teardownContext();
}
})();
} else {
teardownContext();
return result;
}
};
}
// Preserve the name
Object.defineProperty(descriptor['value'], 'name', {
value: typeof key === 'symbol' ? `[${key.description}]` : key,
});
return descriptor;
};
}

export default timed;
55 changes: 55 additions & 0 deletions src/contexts/decorators/timedCancellable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import type { ContextTimed } from '../types';
import { setupTimedCancellable } from '../functions/timedCancellable';
import * as contextsUtils from '../utils';
import * as contextsErrors from '../errors';

function timedCancellable(
lazy: boolean = false,
delay: number = Infinity,
errorTimeoutConstructor: new () => Error = contextsErrors.ErrorContextsTimedTimeOut,
) {
return <
T extends TypedPropertyDescriptor<
(...params: Array<any>) => PromiseLike<any>
>,
>(
target: any,
key: string | symbol,
descriptor: T,
) => {
// Target is instance prototype for instance methods
// or the class prototype for static methods
const targetName: string = target['name'] ?? target.constructor.name;
const f = descriptor['value'];
if (typeof f !== 'function') {
throw new TypeError(
`\`${targetName}.${key.toString()}\` is not a function`,
);
}
const contextIndex = contextsUtils.getContextIndex(target, key, targetName);
descriptor['value'] = function (...args) {
let ctx: Partial<ContextTimed> = args[contextIndex];
if (ctx === undefined) {
ctx = {};
args[contextIndex] = ctx;
}
// Runtime type check on the context parameter
contextsUtils.checkContextTimed(ctx, key, targetName);
return setupTimedCancellable(
(_, ...args) => f.apply(this, args),
lazy,
delay,
errorTimeoutConstructor,
ctx,
args,
);
};
// Preserve the name
Object.defineProperty(descriptor['value'], 'name', {
value: typeof key === 'symbol' ? `[${key.description}]` : key,
});
return descriptor;
};
}

export default timedCancellable;
10 changes: 10 additions & 0 deletions src/contexts/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { ErrorPolykey, sysexits } from '../errors';

class ErrorContexts<T> extends ErrorPolykey<T> {}

class ErrorContextsTimedTimeOut<T> extends ErrorContexts<T> {
static description = 'Aborted due to timer expiration';
exitCode = sysexits.UNAVAILABLE;
}

export { ErrorContexts, ErrorContextsTimedTimeOut };
Loading

0 comments on commit a722510

Please sign in to comment.