From c6b3c0c9f4ef329268d223ec5ba5fc8b62493716 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Mon, 8 Aug 2022 18:12:49 +1000 Subject: [PATCH] wip prototyping queue, tasks and timer --- package-lock.json | 25 +- package.json | 3 +- src/contexts/decorators/cancellable.ts | 4 + src/contexts/decorators/context.ts | 18 + src/contexts/decorators/index.ts | 4 + src/contexts/decorators/timed.ts | 128 +++++ src/contexts/decorators/transactional.ts | 0 src/contexts/errors.ts | 13 + src/contexts/index.ts | 4 + src/contexts/types.ts | 20 + src/contexts/utils.ts | 5 + src/tasks/Queue.ts | 193 ++++++++ src/tasks/Scheduler.ts | 538 ++++++++++++++++++++++ src/tasks/Task.ts | 134 ++++++ src/tasks/errors.ts | 79 ++++ src/tasks/index.ts | 4 + src/tasks/types.ts | 104 +++++ src/tasks/utils.ts | 47 ++ src/timer/Timer.ts | 193 ++++++++ src/timer/errors.ts | 13 + src/timer/index.ts | 2 + src/tracing/Trace.ts | 2 + src/types.ts | 6 + src/utils/PromiseCancellable.ts | 244 ++++++++++ src/utils/index.ts | 1 + src/utils/utils.ts | 7 + test-abort.ts | 28 ++ test-cancellable.ts | 94 ++++ test-constructor.ts | 25 + test-dec.ts | 56 +++ test-p.ts | 22 + test-promises.ts | 35 ++ test-timer-async.ts | 133 ++++++ test-transaction-decorator.ts | 100 ++++ tests/contexts/decorators/context.test.ts | 27 ++ tests/contexts/decorators/timed.test.ts | 97 ++++ tests/tasks/Scheduler.test.ts | 72 +++ tests/tasks/utils.test.ts | 29 ++ tests/timer/Timer.test.ts | 109 +++++ tests/utils.test.ts | 1 + tsconfig.build.json | 3 +- tsconfig.json | 6 +- 42 files changed, 2617 insertions(+), 11 deletions(-) create mode 100644 src/contexts/decorators/cancellable.ts create mode 100644 src/contexts/decorators/context.ts create mode 100644 src/contexts/decorators/index.ts create mode 100644 src/contexts/decorators/timed.ts create mode 100644 src/contexts/decorators/transactional.ts create mode 100644 src/contexts/errors.ts create mode 100644 src/contexts/index.ts create mode 100644 src/contexts/types.ts create mode 100644 src/contexts/utils.ts create mode 100644 src/tasks/Queue.ts create mode 100644 src/tasks/Scheduler.ts create mode 100644 src/tasks/Task.ts create mode 100644 src/tasks/errors.ts create mode 100644 src/tasks/index.ts create mode 100644 src/tasks/types.ts create mode 100644 src/tasks/utils.ts create mode 100644 src/timer/Timer.ts create mode 100644 src/timer/errors.ts create mode 100644 src/timer/index.ts create mode 100644 src/tracing/Trace.ts create mode 100644 src/utils/PromiseCancellable.ts create mode 100644 test-abort.ts create mode 100644 test-cancellable.ts create mode 100644 test-constructor.ts create mode 100644 test-dec.ts create mode 100644 test-p.ts create mode 100644 test-promises.ts create mode 100644 test-timer-async.ts create mode 100644 test-transaction-decorator.ts create mode 100644 tests/contexts/decorators/context.test.ts create mode 100644 tests/contexts/decorators/timed.test.ts create mode 100644 tests/tasks/Scheduler.test.ts create mode 100644 tests/tasks/utils.test.ts create mode 100644 tests/timer/Timer.test.ts diff --git a/package-lock.json b/package-lock.json index 835225da2..20e91d198 100644 --- a/package-lock.json +++ b/package-lock.json @@ -38,6 +38,7 @@ "pako": "^1.0.11", "prompts": "^2.4.1", "readable-stream": "^3.6.0", + "real-cancellable-promise": "^1.1.1", "resource-counter": "^1.2.4", "threads": "^1.6.5", "utp-native": "^2.5.3", @@ -54,7 +55,7 @@ "@types/google-protobuf": "^3.7.4", "@types/jest": "^28.1.3", "@types/nexpect": "^0.4.31", - "@types/node": "^16.11.7", + "@types/node": "^16.11.49", "@types/node-forge": "^0.10.4", "@types/pako": "^1.0.2", "@types/prompts": "^2.0.13", @@ -3027,9 +3028,9 @@ } }, "node_modules/@types/node": { - "version": "16.11.39", - "resolved": "https://registry.npmjs.org/@types/node/-/node-16.11.39.tgz", - "integrity": "sha512-K0MsdV42vPwm9L6UwhIxMAOmcvH/1OoVkZyCgEtVu4Wx7sElGloy/W7kMBNe/oJ7V/jW9BVt1F6RahH6e7tPXw==" + "version": "16.11.49", + "resolved": "https://registry.npmjs.org/@types/node/-/node-16.11.49.tgz", + "integrity": "sha512-Abq9fBviLV93OiXMu+f6r0elxCzRwc0RC5f99cU892uBITL44pTvgvEqlRlPRi8EGcO1z7Cp8A4d0s/p3J/+Nw==" }, "node_modules/@types/node-forge": { "version": "0.10.10", @@ -9913,6 +9914,11 @@ "node": ">= 6" } }, + "node_modules/real-cancellable-promise": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/real-cancellable-promise/-/real-cancellable-promise-1.1.1.tgz", + "integrity": "sha512-vxanUX4Aff5sRX6Rb1CSeCDWhO20L0hKQXWTLOYbtRo9WYFMjlhEBX0E75iz3+7ucrmFdPpDolwLC7L65P7hag==" + }, "node_modules/rechoir": { "version": "0.6.2", "resolved": "https://registry.npmjs.org/rechoir/-/rechoir-0.6.2.tgz", @@ -13749,9 +13755,9 @@ } }, "@types/node": { - "version": "16.11.39", - "resolved": "https://registry.npmjs.org/@types/node/-/node-16.11.39.tgz", - "integrity": "sha512-K0MsdV42vPwm9L6UwhIxMAOmcvH/1OoVkZyCgEtVu4Wx7sElGloy/W7kMBNe/oJ7V/jW9BVt1F6RahH6e7tPXw==" + "version": "16.11.49", + "resolved": "https://registry.npmjs.org/@types/node/-/node-16.11.49.tgz", + "integrity": "sha512-Abq9fBviLV93OiXMu+f6r0elxCzRwc0RC5f99cU892uBITL44pTvgvEqlRlPRi8EGcO1z7Cp8A4d0s/p3J/+Nw==" }, "@types/node-forge": { "version": "0.10.10", @@ -18882,6 +18888,11 @@ "util-deprecate": "^1.0.1" } }, + "real-cancellable-promise": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/real-cancellable-promise/-/real-cancellable-promise-1.1.1.tgz", + "integrity": "sha512-vxanUX4Aff5sRX6Rb1CSeCDWhO20L0hKQXWTLOYbtRo9WYFMjlhEBX0E75iz3+7ucrmFdPpDolwLC7L65P7hag==" + }, "rechoir": { "version": "0.6.2", "resolved": "https://registry.npmjs.org/rechoir/-/rechoir-0.6.2.tgz", diff --git a/package.json b/package.json index 29403fed3..0fe7b96c5 100644 --- a/package.json +++ b/package.json @@ -105,6 +105,7 @@ "pako": "^1.0.11", "prompts": "^2.4.1", "readable-stream": "^3.6.0", + "real-cancellable-promise": "^1.1.1", "resource-counter": "^1.2.4", "threads": "^1.6.5", "utp-native": "^2.5.3", @@ -117,7 +118,7 @@ "@types/google-protobuf": "^3.7.4", "@types/jest": "^28.1.3", "@types/nexpect": "^0.4.31", - "@types/node": "^16.11.7", + "@types/node": "^16.11.49", "@types/node-forge": "^0.10.4", "@types/pako": "^1.0.2", "@types/prompts": "^2.0.13", diff --git a/src/contexts/decorators/cancellable.ts b/src/contexts/decorators/cancellable.ts new file mode 100644 index 000000000..b35248414 --- /dev/null +++ b/src/contexts/decorators/cancellable.ts @@ -0,0 +1,4 @@ +// let's attempt the cancellable one as well +// it requires the promise +// we can avoid needing to use this in EFS for now +// it's specific to PK diff --git a/src/contexts/decorators/context.ts b/src/contexts/decorators/context.ts new file mode 100644 index 000000000..f2e88ee44 --- /dev/null +++ b/src/contexts/decorators/context.ts @@ -0,0 +1,18 @@ +import * as contextsUtils from '../utils'; + +/** + * Context parameter decorator + * It is only allowed to be used once + */ +function context(target: Object, key: string | symbol, index: number) { + const targetName = (target['name'] ?? target.constructor.name); + const method = target[key]; + if (contextsUtils.contexts.has(method)) { + throw new TypeError( + `\`${targetName}.${key.toString()}\` redeclares \`@context\` decorator` + ); + } + contextsUtils.contexts.set(method, index); +} + +export default context; diff --git a/src/contexts/decorators/index.ts b/src/contexts/decorators/index.ts new file mode 100644 index 000000000..c3527bdad --- /dev/null +++ b/src/contexts/decorators/index.ts @@ -0,0 +1,4 @@ +export { default as context } from './context'; +// export { default as cancellable }, * from './cancellable'; +export { default as timed } from './timed'; +// export { default as transactional }, * from './transactional'; diff --git a/src/contexts/decorators/timed.ts b/src/contexts/decorators/timed.ts new file mode 100644 index 000000000..7c6210234 --- /dev/null +++ b/src/contexts/decorators/timed.ts @@ -0,0 +1,128 @@ +import * as contextsUtils from '../utils'; +import * as contextsErrors from '../errors'; +import Timer from '../../timer/Timer'; +import * as timerErrors from '../../timer/errors'; +import { + AsyncFunction, + GeneratorFunction, + AsyncGeneratorFunction +} from '../../utils'; + +/** + * Timed method decorator + */ +function timed(delay: number = Infinity) { + return ( + target: any, + key: string | symbol, + descriptor: TypedPropertyDescriptor<(...params: any[]) => any> + ): TypedPropertyDescriptor<(...params: any[]) => any> => { + const targetName = (target['name'] ?? target.constructor.name); + const f = descriptor['value']; + if (typeof f !== 'function') { + throw new TypeError(`\`${targetName}.${key.toString()}\` is not a function`); + } + const contextIndex = contextsUtils.contexts.get(target[key]); + if (contextIndex == null) { + throw new TypeError(`\`${targetName}.${key.toString()}\` does not have a \`@context\` parameter decorator`); + } + const wrap = (that: any, params: Array) => { + const context = params[contextIndex]; + if (context !== undefined && (typeof context !== 'object' || context === null)) { + throw new TypeError( + `\`${targetName}.${key.toString()}\` decorated \`@context\` parameter is not a context object` + ); + } + if (context?.timer !== undefined && !(context.timer instanceof Timer)) { + throw new TypeError( + `\`${targetName}.${key.toString()}\` decorated \`@context\` parameter's \`timer\` property is not an instance of \`Timer\`` + ); + } + if (context?.signal !== undefined && !(context.signal instanceof AbortSignal)) { + throw new TypeError( + `\`${targetName}.${key.toString()}\` decorated \`@context\` parameter's \`signal\` property is not an instance of \`AbortSignal\`` + ); + } + // Now `context: { timer: Timer | undefined; signal: AbortSignal | undefined } | undefined` + if ( + context === undefined || + context.timer === undefined && context.signal === undefined + ) { + const abortController = new AbortController(); + const timer = new Timer({ + delay, + handler: () => void abortController.abort(new contextsErrors.ErrorContextsTimerExpired) + }); + params[contextIndex] = (context !== undefined) ? context : {}; + params[contextIndex].signal = abortController.signal; + params[contextIndex].timer = timer; + const result = f.apply(that, params); + timer.catch((e) => { + // Ignore cancellation + if (!(e instanceof timerErrors.ErrorTimerCancelled)) { + throw e; + } + }); + timer.cancel(); + return result; + } else if ( + context.timer === undefined && + context.signal instanceof AbortSignal + ) { + const abortController = new AbortController(); + const timer = new Timer({ + delay, + handler: () => void abortController.abort(new contextsErrors.ErrorContextsTimerExpired) + }); + context.signal.onabort = () => void abortController.abort(context.signal.reason); + params[contextIndex].signal = abortController.signal; + params[contextIndex].timer = timer; + const result = f.apply(that, params); + timer.catch((e) => { + // Ignore cancellation + if (!(e instanceof timerErrors.ErrorTimerCancelled)) { + throw e; + } + }); + timer.cancel(); + return result; + } else if ( + context.timer instanceof Timer && + context.signal === undefined + ) { + const abortController = new AbortController(); + context.timer.then(() => void abortController.abort(new contextsErrors.ErrorContextsTimerExpired)); + params[contextIndex].signal = abortController.signal; + return f.apply(that, params); + } else if ( + context.timer instanceof Timer && context.signal instanceof AbortSignal + ) { + return f.apply(that, params); + } + }; + if (f instanceof AsyncFunction) { + descriptor['value'] = async function (...params) { + return wrap(this, params); + }; + } else if (f instanceof GeneratorFunction) { + descriptor['value'] = function* (...params) { + return yield* wrap(this, params); + }; + } else if (f instanceof AsyncGeneratorFunction) { + descriptor['value'] = async function* (...params) { + return yield* wrap(this, params); + }; + } else { + descriptor['value'] = function (...params) { + return wrap(this, params); + }; + } + // Preserve the name + Object.defineProperty(descriptor['value'], 'name', { + value: (typeof key === 'symbol') ? `[${key.description}]` : key + }); + return descriptor; + }; +} + +export default timed; diff --git a/src/contexts/decorators/transactional.ts b/src/contexts/decorators/transactional.ts new file mode 100644 index 000000000..e69de29bb diff --git a/src/contexts/errors.ts b/src/contexts/errors.ts new file mode 100644 index 000000000..3e13a1906 --- /dev/null +++ b/src/contexts/errors.ts @@ -0,0 +1,13 @@ +import { ErrorPolykey, sysexits } from '../errors'; + +class ErrorContexts extends ErrorPolykey {} + +class ErrorContextsTimerExpired extends ErrorContexts { + static description = 'Aborted due to timer expiration'; + exitCode = sysexits.UNAVAILABLE; +} + +export { + ErrorContexts, + ErrorContextsTimerExpired +}; diff --git a/src/contexts/index.ts b/src/contexts/index.ts new file mode 100644 index 000000000..9432815a9 --- /dev/null +++ b/src/contexts/index.ts @@ -0,0 +1,4 @@ +export * from './decorators'; +export * from './utils'; +export * as types from './types'; +export * as errors from './errors'; diff --git a/src/contexts/types.ts b/src/contexts/types.ts new file mode 100644 index 000000000..57f1cb9ea --- /dev/null +++ b/src/contexts/types.ts @@ -0,0 +1,20 @@ +import type { DBTransaction } from '@matrixai/db'; +import type Timer from '../timer/Timer'; + +type ContextCancellable = { + signal: AbortSignal; +}; + +type ContextTimed = ContextCancellable & { + timer: Timer; +}; + +type ContextTransactional = { + tran: DBTransaction; +}; + +export type { + ContextCancellable, + ContextTimed, + ContextTransactional +}; diff --git a/src/contexts/utils.ts b/src/contexts/utils.ts new file mode 100644 index 000000000..be6d3e291 --- /dev/null +++ b/src/contexts/utils.ts @@ -0,0 +1,5 @@ +const contexts = new WeakMap(); + +export { + contexts, +}; diff --git a/src/tasks/Queue.ts b/src/tasks/Queue.ts new file mode 100644 index 000000000..f4c41cc15 --- /dev/null +++ b/src/tasks/Queue.ts @@ -0,0 +1,193 @@ +import type { DB, DBTransaction, LevelPath } from '@matrixai/db'; +import type { TaskId, TaskIdString, TaskPriority, TaskFunction } from './types'; +import type { PromiseDeconstructed } from '../types'; +import Logger from '@matrixai/logger'; +import { + CreateDestroyStartStop, + ready, +} from '@matrixai/async-init/dist/CreateDestroyStartStop'; +import * as tasksUtils from './utils'; +import * as tasksErrors from './errors'; + +interface Queue extends CreateDestroyStartStop {} +@CreateDestroyStartStop( + new tasksErrors.ErrorQueueRunning(), + new tasksErrors.ErrorQueueDestroyed(), +) +class Queue { + public static async createQueue({ + db, + concurrencyLimit = Number.POSITIVE_INFINITY, + logger = new Logger(this.name), + fresh = false, + }: { + db: DB; + concurrencyLimit?: number; + logger?: Logger; + fresh?: boolean; + }) { + logger.info(`Creating ${this.name}`); + const queue = new this({ db, concurrencyLimit, logger }); + await queue.start({ fresh }); + logger.info(`Created ${this.name}`); + return queue; + } + + public concurrencyLimit: number; + + protected logger: Logger; + protected db: DB; + protected queueDbPath: LevelPath = [this.constructor.name]; + // when the queue to execute the tasks + // the task id is generated outside + // you don't get a task id here + // you just "push" tasks there to be executed + // this is the "shared" set of promises to be maintained + protected promises: Map> = new Map(); + // /** + // * Listeners for task execution + // * When a task is executed, these listeners are synchronously executed + // * The listeners are intended for resolving or rejecting task promises + // */ + // protected listeners: Map> = new Map(); + + public constructor({ + db, + concurrencyLimit, + logger + }: { + db: DB; + concurrencyLimit: number, + logger: Logger + }) { + this.logger = logger; + this.concurrencyLimit = concurrencyLimit; + this.db = db; + } + + public async start({ + fresh = false, + }: { + fresh?: boolean; + } = {}): Promise { + this.logger.info(`Starting ${this.constructor.name}`); + if (fresh) { + await this.db.clear(this.queueDbPath); + } + this.logger.info(`Started ${this.constructor.name}`); + } + + public async stop(): Promise { + this.logger.info(`Stopping ${this.constructor.name}`); + this.logger.info(`Stopped ${this.constructor.name}`); + } + + public async destroy() { + this.logger.info(`Destroying ${this.constructor.name}`); + await this.db.clear(this.queueDbPath); + this.logger.info(`Destroyed ${this.constructor.name}`); + } + + // promises are "connected" to events + + // when tasks are "dispatched" to the queue + // they are actually put into a persistent system + // then we proceed to execution + + // a task here is a function + // this is already managed by the Scheduler + // along with the actual function itself? + // we also have a priority + + // t is a task + // but it's actually just a function + // and in this case + // note that we are "passing" in the parameters at this point + // but it is any function + // () => taskHandler(parameters) + + // it returns a "task" + // that should be used like a "lazy" promise + // the actual task function depends on the situation + // don't we need to know actual metadata + // wait a MINUTE + // if we are "persisting" it + // do we persist it here? + + public async pushTask(taskF: TaskFunction, priority: TaskPriority) { + // this needs to proceed to push it into an in-memory queue + // and maintain a concurrency limit? + // my issue is that whatever does the persistence + // will need to execute it with the parmaeters and the task handler + // so by the time it is in memory as a `taskF` + // then no persistence makes sense anymore + } + + + /** + * IF a handler does not exist + * if the task is executed + * then an exception is thrown + * if listener exists, the exception is passed into this listener function + * if it doesn't exist, then it's just a reference exception in general, this can be logged + * There's nothing else to do + */ + // @ready(new tasksErrors.ErrorSchedulerNotRunning()) + // protected registerListener( + // taskId: TaskId, + // taskListener: TaskListener + // ): void { + // const taskIdString = taskId.toString() as TaskIdString; + // const taskListeners = this.listeners.get(taskIdString); + // if (taskListeners != null) { + // taskListeners.push(taskListener); + // } else { + // this.listeners.set(taskIdString, [taskListener]); + // } + // } + + // @ready(new tasksErrors.ErrorSchedulerNotRunning()) + // protected deregisterListener( + // taskId: TaskId, + // taskListener: TaskListener + // ): void { + // const taskIdString = taskId.toString() as TaskIdString; + // const taskListeners = this.listeners.get(taskIdString); + // if (taskListeners == null || taskListeners.length < 1) return; + // const index = taskListeners.indexOf(taskListener); + // if (index !== -1) { + // taskListeners.splice(index, 1); + // } + // } + +} + +export default Queue; + + +// epic queue +// need to do a couple things: +// 1. integrate fast-check +// 2. integrate span checks +// 3. might also consider span logs? +// 4. open tracing observability +// 5. structured logging +// 6. async hooks to get traced promises to understand the situation +// 7. do we also get fantasy land promises? and async cancellable stuff? +// 8. task abstractions? +// need to use the db for this +// 9. priority structure +// 10. timers +// abort controller + +// kinetic data structure +// the priority grows as a function of time +// order by priority <- this thing has a static value +// in a key value DB, you can maintain sorted index of values +// IDs can be lexicographically sortable + +// this is a persistent queue +// of tasks that should be EXECUTED right now +// the scheduler is a persistent scheduler of scheduled tasks +// tasks get pushed from the scheduler into the queue +// the queue connects to the WorkerManager diff --git a/src/tasks/Scheduler.ts b/src/tasks/Scheduler.ts new file mode 100644 index 000000000..7725685e1 --- /dev/null +++ b/src/tasks/Scheduler.ts @@ -0,0 +1,538 @@ +import type { DB, DBTransaction, LevelPath, KeyPath } from '@matrixai/db'; +import type { + TaskId, + TaskHandlerId, + TaskHandler, + TaskData, + TaskInfo, + TaskIdString, + TaskParameters, + TaskDelay, + TaskTimestamp +} from './types'; +import type KeyManager from '../keys/KeyManager'; +import type { PolykeyWorkerManagerInterface } from '../workers/types'; +import type { POJO, Callback, PromiseDeconstructed } from '../types'; +import Logger from '@matrixai/logger'; +import { IdInternal } from '@matrixai/id'; +import { extractTs } from '@matrixai/id/dist/IdSortable'; +import { + CreateDestroyStartStop, + ready, +} from '@matrixai/async-init/dist/CreateDestroyStartStop'; +import lexi from 'lexicographic-integer'; +import Queue from './Queue'; +import * as tasksUtils from './utils'; +import * as tasksErrors from './errors'; +import * as utils from '../utils'; + +interface Scheduler extends CreateDestroyStartStop {} +@CreateDestroyStartStop( + new tasksErrors.ErrorSchedulerRunning(), + new tasksErrors.ErrorSchedulerDestroyed(), +) +class Scheduler { + /** + * Create the scheduler, which will create its own Queue + * This will automatically start the scheduler + * If the scheduler needs to be started after the fact + * Make sure to construct it, and then call `start` manually + */ + public static async createScheduler({ + db, + keyManager, + queue, + logger = new Logger(this.name), + handlers = {}, + delay = false, + fresh = false, + }: { + db: DB; + keyManager: KeyManager; + queue?: Queue; + logger?: Logger; + handlers?: Record; + delay?: boolean; + fresh?: boolean; + }): Promise { + logger.info(`Creating ${this.name}`); + queue = queue ?? await Queue.createQueue({ + db, + logger: logger.getChild(Queue.name), + fresh + }); + const scheduler = new this({ db, keyManager, queue, logger }); + await scheduler.start({ handlers, delay, fresh }); + logger.info(`Created ${this.name}`); + return scheduler; + } + + protected logger: Logger; + protected db: DB; + protected keyManager: KeyManager; + protected queue: Queue; + protected handlers: Map = new Map(); + protected generateTaskId: () => TaskId; + + // TODO: swap this out for the timer system later + + protected processingTimer?: ReturnType; + protected processingTimerTimestamp?: number; + + protected schedulerDbPath: LevelPath = [this.constructor.name]; + + /** + * Last Task Id + */ + protected schedulerLastTaskIdPath: KeyPath = [...this.schedulerDbPath, 'lastTaskId']; + + /** + * Tasks collection + * `tasks/{TaskId} -> {json(Task)}` + */ + protected schedulerTasksDbPath: LevelPath = [...this.schedulerDbPath, 'tasks']; + + /** + * Tasks scheduled by time + * `time/{lexi(TaskTimestamp + TaskDelay)} -> {raw(TaskId)}` + */ + protected schedulerTimeDbPath: LevelPath = [...this.schedulerDbPath, 'time']; + + /** + * Tasks queued for execution + * `pending/{lexi(TaskPriority)}/{lexi(TaskTimestamp)} -> {raw(TaskId)}` + */ + protected schedulerPendingDbPath: LevelPath = [...this.schedulerDbPath, 'pending']; + + /** + * Task handlers + * `handlers/{TaskHandlerId}/{TaskId} -> {raw(TaskId)}` + */ + protected schedulerHandlersDbPath: LevelPath = [...this.schedulerDbPath, 'handlers']; + + public constructor({ + db, + keyManager, + queue, + logger + }: { + db: DB; + keyManager: KeyManager; + queue: Queue; + logger: Logger + }) { + this.logger = logger; + this.db = db; + this.keyManager = keyManager; + this.queue = queue; + } + + public get isProcessing(): boolean { + return this.processingTimer != null; + } + + public async start({ + handlers = {}, + delay = false, + fresh = false, + }: { + handlers?: Record; + delay?: boolean; + fresh?: boolean; + } = {}): Promise { + this.logger.info(`Starting ${this.constructor.name}`); + if (fresh) { + this.handlers.clear(); + // this.promises.clear(); + await this.db.clear(this.schedulerDbPath); + } + for (const taskHandlerId in handlers) { + this.handlers.set(taskHandlerId as TaskHandlerId, handlers[taskHandlerId]); + } + const lastTaskId = await this.getLastTaskId(); + this.generateTaskId = tasksUtils.createTaskIdGenerator( + this.keyManager.getNodeId(), + lastTaskId, + ); + // Flip this to true to delay the processing start + // if task handler registration is done after the scheduler is created + if (!delay) { + await this.startProcessing(); + } + this.logger.info(`Started ${this.constructor.name}`); + } + + /** + * Stop the scheduler + * This does not clear the handlers nor promises + * This maintains any registered handlers and awaiting promises + */ + public async stop(): Promise { + this.logger.info(`Stopping ${this.constructor.name}`); + await this.stopProcessing(); + this.logger.info(`Stopped ${this.constructor.name}`); + } + + /** + * Destroys the scheduler + * This must first clear all handlers + * Then it needs to cancel all promises + * Finally destroys all underlying state + */ + public async destroy() { + this.logger.info(`Destroying ${this.constructor.name}`); + this.handlers.clear(); + + // TODO: cancel the task promises so that any function awaiting may receive a cancellation + // this.promises.clear(); + + await this.db.clear(this.schedulerDbPath); + this.logger.info(`Destroyed ${this.constructor.name}`); + } + + /** + * Starts the processing of the work + */ + @ready(new tasksErrors.ErrorSchedulerNotRunning()) + public async startProcessing(timeout?: number): Promise { + // If already started, do nothing + if (this.processingTimer != null) return; + + // We actually need to find ht elast task + + await this.db.withTransactionF( + async (tran) => { + // we use the transaction here + // and we use it run our tasks + // every "execution" involves running it here + return; + } + ); + + // when we "pop" a task + // it is actually to peek at the latest task + // then to set a timeout + // the process is that we find tasks that are worth executing right now + // then we dispatch to execution + + + this.processingTimer = setTimeout(() => { + + }, 1000); + } + + @ready(new tasksErrors.ErrorSchedulerNotRunning()) + public async stopProcessing(): Promise { + clearTimeout(this.processingTimer); + delete this.processingTimer; + } + + public getHandler(handlerId: TaskHandlerId): TaskHandler | undefined { + return this.handlers.get(handlerId); + } + + public getHandlers(): Record { + return Object.fromEntries(this.handlers); + } + + /** + * Registers a handler for tasks with the same `TaskHandlerId` + * If tasks are dispatched without their respective handler, + * the scheduler will throw `tasksErrors.ErrorSchedulerHandlerMissing` + */ + public registerHandler(handlerId: TaskHandlerId, handler: TaskHandler) { + this.handlers.set(handlerId, handler); + } + + /** + * Deregisters a handler + */ + public deregisterHandler(handlerId: TaskHandlerId) { + this.handlers.delete(handlerId); + } + + /** + * Gets a scheduled task data + */ + @ready(new tasksErrors.ErrorSchedulerNotRunning()) + public async getTaskData( + taskId: TaskId, + tran?: DBTransaction + ): Promise { + return await (tran ?? this.db).get( + [...this.schedulerTasksDbPath, taskId.toBuffer()] + ); + } + + /** + * Gets all scheduled task datas + * Tasks are sorted by the `TaskId` + */ + @ready(new tasksErrors.ErrorSchedulerNotRunning()) + public async *getTaskDatas( + order: 'asc' | 'desc' = 'asc', + tran?: DBTransaction + ): AsyncGenerator<[TaskId, TaskData]> { + if (tran == null) { + return yield* this.db.withTransactionG( + (tran) => this.getTaskDatas(...arguments, tran) + ); + } + for await (const [keyPath, taskData] of tran.iterator( + this.schedulerTasksDbPath, + { valueAsBuffer: false, reverse: order !== 'asc' } + )) { + const taskId = IdInternal.fromBuffer(keyPath[0] as Buffer); + yield [ + taskId, + taskData + ]; + } + } + + // /** + // * Gets a task abstraction + // */ + // @ready(new tasksErrors.ErrorSchedulerNotRunning()) + // public async getTask(id: TaskId, tran?: DBTransaction) { + // const taskData = await (tran ?? this.db).get([...this.queueTasksDbPath, id.toBuffer()]); + // if (taskData == null) { + // return; + // } + // const { p: taskP, resolveP, rejectP } = utils.promise(); + + // // can we standardise on the unified listener + // // that is 1 listener for every task is created automatically + // // if 1000 tasks are inserted into the DB + // // 1000 listeners are created automatically? + + // // we can either... + // // A standardise on the listener + // // B standardise on the promise + + // // if the creation of the promise is lazy + // // then one can standardise on the promise + // // the idea being if the promise exists, just return the promise + // // if it doesn't exist, then first check if the task id still exists + // // if so, create a promise out of that lazily + // // now you need an object map locking to prevent race conditions on promise creation + // // then there's only ever 1 promise for a given task + // // any other cases, they always give back the same promise + + + // const listener = (taskError, taskResult) => { + // if (taskError != null) { + // rejectP(taskError); + // } else { + // resolveP(taskResult); + // } + // this.deregisterListener(id, listener); + // }; + // this.registerListener(id, listener); + // return taskP; + // } + + + // protected async getTaskP(taskId: TaskId, lazy: boolean, tran: DBTransaction) { + // // does that mean we don't extend the promise? + // // we just make it look like it + // // i guess it works too + // // if not lazy, then we do it immediately + // // we would assert that we already have this + // // so if it is not lazy + // // it is not necessary to do this? + + // if (!lazy) { + // const taskData = await tran.get([...this.queueTasksDbPath, taskId.toBuffer()]); + // if (taskData == null) { + // return; + // } + + // } + + // const { p: taskP, resolveP, rejectP } = utils.promise(); + // const listener = (taskError, taskResult) => { + // if (taskError != null) { + // rejectP(taskError); + // } else { + // resolveP(taskResult); + // } + // this.deregisterListener(id, listener); + // }; + // this.registerListener(id, listener); + + // // can we say that taskP + // return taskP; + // } + + @ready(new tasksErrors.ErrorSchedulerNotRunning()) + public async scheduleTask( + handlerId: TaskHandlerId, + parameters: TaskParameters = [], + delay: TaskDelay = 0, + priority: number = 0, + lazy: boolean = false, + tran?: DBTransaction + ): Promise { + if (tran == null) { + return this.db.withTransactionF( + (tran) => this.scheduleTask(handlerId, parameters, delay, priority, lazy, tran) + ); + } + const taskId = this.generateTaskId(); + // Timestamp extracted from `IdSortable` is a floating point in seconds + // with subsecond fractionals, multiply it by 1000 gives us milliseconds + const taskTimestamp = Math.trunc(extractTs(taskId) * 1000) as TaskTimestamp; + const taskPriority = tasksUtils.toPriority(priority); + const taskData = { + handlerId, + parameters, + timestamp: taskTimestamp, + delay, + priority: taskPriority, + }; + + const taskIdBuffer = taskId.toBuffer(); + + // Save the task + await tran.put([...this.schedulerTasksDbPath, taskIdBuffer], taskData); + // Save the last task ID + await tran.put(this.schedulerLastTaskIdPath, taskIdBuffer, true); + // Save the scheduled time + const taskScheduledLexi = Buffer.from(lexi.pack(taskTimestamp + delay)); + await tran.put( + [...this.schedulerTimeDbPath, taskScheduledLexi], + taskId.toBuffer(), + true + ); + + // do we do this? + // new Task() + + + + if (!lazy) { + // task().then(onFullfill, onReject).finally(onFinally) + // const { p: taskP, resolveP: resolveTaskP, rejectP: rejectTaskP } = utils.promise(); + // this.promises.set( + // taskId.toString() as TaskIdString, + // { + // taskP, + // resolveTaskP, + // rejectTaskP + // } + // ); + // const taskListener = (taskError, taskResult) => { + // if (taskError != null) { + // resolveTaskP(taskError); + // } else { + // rejectTaskP(taskResult); + // } + // this.deregisterListener(taskId, taskListener); + // }; + // this.registerListener(taskId, taskListener); + } + + + const task = { + id: taskId, + ...taskData, + then: async (onFulfilled, onRejected) => { + + // if this taskP already exists + // then there's no need to set it up? + const taskP = this.promises.get(taskId.toString() as TaskIdString); + // this is going to be bound to somnething? + // we need to create a promise for it? + // but this means you start doing this by default + + } + }; + + // return [ + // { + // id: taskId, + // ...taskData, + // } + // ]; + + // TODO: trigger the processing of the task? + + // TODO: reset the timeout in case the timeouts have been exhausted + // if the timeouts haven't been started or stopped + // scheduling a task can be halted + // you should "set the next setTimeout" + // depending if the setTimeout is already set + // if set, it will reset + + if (this.processingTimer != null) { + // proceed to update the processing timer + // startProcessing will peek at the next task + // and start timing out there + // if the timeout isn't given + // it will instead be given and do it there + // it depends on the timer + // seems like if the timer exists + + // we can "reset" t othe current time + // reschedules it + // but that's not what we want to do + // we want to clear that timeout + // and schedule a new TIMER + // if this overrides that timer + // but we don't know how much time + // or when this is scheduled to run? + + await this.startProcessing(); + } + + } + + // we have to start the loop + // the `setTimeout` is what actually starts the execution + // Pop up the next highest priority + + // when pushing a task + // it is "scheduled" + // but that is not what happens here + + // instead scheduling is triggered in 2 ways + // one by starting the system + // and another when a task is entered into the system + // in both cases, a trigger takes place + + public async popTask(tran?: DBTransaction) { + if (tran == null) { + return this.db.withTransactionF( + (tran) => this.popTask.apply(this, [...arguments, tran]) + ); + } + let taskId: TaskId | undefined; + let taskData: TaskData | undefined; + for await (const [, taskIdBuffer] of tran.iterator( + this.schedulerTimeDbPath, + { + limit: 1, + keys: false + } + )) { + taskId = IdInternal.fromBuffer(taskIdBuffer); + taskData = await tran.get( + [...this.schedulerTasksDbPath, taskIdBuffer] + ); + } + return { + id: taskId, + ...taskData + }; + } + + @ready(new tasksErrors.ErrorSchedulerNotRunning(), false, ['starting']) + public async getLastTaskId(tran?: DBTransaction): Promise { + const lastTaskIdBuffer = await (tran ?? this.db).get(this.schedulerLastTaskIdPath, true); + if (lastTaskIdBuffer == null) return; + return IdInternal.fromBuffer(lastTaskIdBuffer); + } +} + +export default Scheduler; diff --git a/src/tasks/Task.ts b/src/tasks/Task.ts new file mode 100644 index 000000000..3b8fef7c1 --- /dev/null +++ b/src/tasks/Task.ts @@ -0,0 +1,134 @@ +import type Queue from "./Scheduler"; +import type { TaskId, TaskData, TaskHandlerId, TaskTimestamp, TaskDelay, TaskPriority, TaskHandler, TaskParameters } from "./types"; +import type { DeepReadonly } from '../types'; + +class Task extends Promise { + public readonly id: TaskId; + public readonly handlerId: TaskHandlerId; + public readonly parameters: DeepReadonly; + public readonly timestamp: TaskTimestamp; + public readonly delay: TaskDelay; + public readonly priority: TaskPriority; + + protected queue: Queue; + protected resolveP: (value: T | PromiseLike) => void; + protected rejectP: (reason?: any) => void; + + constructor( + queue: Queue, + id: TaskId, + handlerId: TaskHandlerId, + parameters: TaskParameters, + timestamp: TaskTimestamp, + delay: TaskDelay, + priority: TaskPriority, + ) { + let resolveP, rejectP; + super((resolve, reject) => { + resolveP = resolve; + rejectP = reject; + }); + this.resolveP = resolveP; + this.rejectP = rejectP; + + // I'm not sure about the queue + // but if this is the reference here + // then we need to add the event handler into the queue to wait for this + this.queue = queue; + + this.id = id; + this.handlerId = handlerId; + this.parameters = parameters; + this.timestamp = timestamp; + this.delay = delay; + this.priority = priority; + } + + public toJSON(): TaskData & { id: TaskId } { + return { + id: this.id, + handlerId: this.handlerId, + // TODO: change this to `structuredClone` when available + parameters: JSON.parse(JSON.stringify(this.parameters)), + timestamp: this.timestamp, + delay: this.delay, + priority: this.priority, + }; + } + + /** + * This is called when `await` is used + */ + public async then( + onFulfilled?: ((value: T) => TResult1 | PromiseLike) | undefined | null, + onRejected?: ((reason: any) => TResult2 | PromiseLike) | undefined | null + ): Promise { + + // this is the promise now + // we can say that we only do what is needed + // we can make this `then` asynchronous + // do we use the same db + // or ask the Task to have the same capability? + + + + return undefined as any; + } + + // public then( + // onFulfilled?: ((value: T) => TResult1 | PromiseLike) | undefined | null, + // onRejected?: ((reason: any) => TResult2 | PromiseLike) | undefined | null, + // ): Promise { + + // // these callbacks + // // how are they supposed to be used? + // // this is a promise + // return undefined as any; + + // } + +// then( +// onfulfilled?: ((value: T) => TResult1 | PromiseLike) | undefined | null, +// onrejected?: ((reason: any) => TResult2 | PromiseLike) | undefined | null): +// Promise; + + // public catch () { + + // } + + // public finally () { + + // } + +} + +// const t = new Task(); + +const p = new Promise((resolve, reject) => { + resolve(); +}); + +p.then +// p.catch +// p.finally +// /** +// * Represents the completion of an asynchronous operation +// */ +// interface Promise { +// /** +// * Attaches callbacks for the resolution and/or rejection of the Promise. +// * @param onfulfilled The callback to execute when the Promise is resolved. +// * @param onrejected The callback to execute when the Promise is rejected. +// * @returns A Promise for the completion of which ever callback is executed. +// */ + +// /** +// * Attaches a callback for only the rejection of the Promise. +// * @param onrejected The callback to execute when the Promise is rejected. +// * @returns A Promise for the completion of the callback. +// */ +// catch(onrejected?: ((reason: any) => TResult | PromiseLike) | undefined | null): Promise; +// } + + +export default Task; diff --git a/src/tasks/errors.ts b/src/tasks/errors.ts new file mode 100644 index 000000000..d33620344 --- /dev/null +++ b/src/tasks/errors.ts @@ -0,0 +1,79 @@ +import { ErrorPolykey, sysexits } from '../errors'; + +class ErrorTasks extends ErrorPolykey {} + +class ErrorScheduler extends ErrorTasks {} + +class ErrorSchedulerRunning extends ErrorScheduler { + static description = 'Scheduler is running'; + exitCode = sysexits.USAGE; +} + +class ErrorSchedulerNotRunning extends ErrorScheduler { + static description = 'Scheduler is not running'; + exitCode = sysexits.USAGE; +} + +class ErrorSchedulerDestroyed extends ErrorScheduler { + static description = 'Scheduler is destroyed'; + exitCode = sysexits.USAGE; +} + +class ErrorSchedulerHandlerMissing extends ErrorScheduler { + static description = 'Scheduler task handler is not registered'; + exitCode = sysexits.USAGE; +} + +class ErrorQueue extends ErrorTasks {} + +class ErrorQueueRunning extends ErrorQueue { + static description = 'Queue is running'; + exitCode = sysexits.USAGE; +} + +class ErrorQueueNotRunning extends ErrorQueue { + static description = 'Queue is not running'; + exitCode = sysexits.USAGE; +} + +class ErrorQueueDestroyed extends ErrorQueue { + static description = 'Queue is destroyed'; + exitCode = sysexits.USAGE; +} + +class ErrorTask extends ErrorTasks { + static description = 'Task error'; + exitCode = sysexits.USAGE; +} + +class ErrorTaskRejected extends ErrorTask { + static description = 'Task handler threw an exception'; + exitCode = sysexits.USAGE; +} + +class ErrorTaskCancelled extends ErrorTask { + static description = 'Task has been cancelled'; + exitCode = sysexits.USAGE; +} + +class ErrorTaskMissing extends ErrorTask { + static description = 'Task does not (or never) existed anymore, it may have been fulfilled or cancelled'; + exitCode = sysexits.USAGE; +} + +export { + ErrorTasks, + ErrorScheduler, + ErrorSchedulerRunning, + ErrorSchedulerNotRunning, + ErrorSchedulerDestroyed, + ErrorSchedulerHandlerMissing, + ErrorQueue, + ErrorQueueRunning, + ErrorQueueNotRunning, + ErrorQueueDestroyed, + ErrorTask, + ErrorTaskRejected, + ErrorTaskCancelled, + ErrorTaskMissing, +}; diff --git a/src/tasks/index.ts b/src/tasks/index.ts new file mode 100644 index 000000000..ae900e45b --- /dev/null +++ b/src/tasks/index.ts @@ -0,0 +1,4 @@ +export { default as Scheduler } from './Scheduler'; +export * as types from './types'; +export * as utils from './utils'; +export * as errors from './errors'; diff --git a/src/tasks/types.ts b/src/tasks/types.ts new file mode 100644 index 000000000..02f1d2928 --- /dev/null +++ b/src/tasks/types.ts @@ -0,0 +1,104 @@ +import type { Id } from '@matrixai/id'; +import type { POJO, Opaque, Callback } from '../types'; + +type TaskId = Opaque<'TaskId', Id>; +type TaskIdString = Opaque<'TaskIdString', string>; +type TaskIdEncoded = Opaque<'TaskIdEncoded', string>; + +/** + * Timestamp unix time in milliseconds + */ +type TaskTimestamp = number; + +/** + * Timestamp is millisecond number >= 0 + */ +type TaskDelay = number; + +type TaskParameters = Array; + +/** + * Task priority is an `uint8` [0 to 255] + * Where `0` is the highest priority and `255` is the lowest priority + */ +type TaskPriority = Opaque<'TaskPriority', number>; + +/** + * Task data to be persisted + */ +type TaskData = { + handlerId: TaskHandlerId; + parameters: TaskParameters; + timestamp: TaskTimestamp; + delay: TaskDelay; + priority: TaskPriority; +}; + +/** + * Task information that is returned to the user + */ +type TaskInfo = TaskData & { + id: TaskId; +}; + +type TaskHandlerId = Opaque<'TaskHandlerId', string>; + +type TaskHandler< + P extends Array = [], + R = any +> = (...params: P) => Promise; + + +/** + * Task function is the result of a lambda abstraction of applying + * `TaskHandler` to its respective parameters + * This is what gets executed + */ +type TaskFunction = () => Promise; + + +// type TaskListener = Callback<[taskResult: any], void>; +// Make Task something that can be awaited on +// but when you "make" a promise or reference it +// you're for a promise +// that will resolve an event occurs +// or reject when an event occurs +// and the result of the execution +// now the exeuction of the event itself is is going to return ap romise +// something must be lisetning to it +// If you have a Record +// it has to be TaskIdString +// you can store things in it +// type X = Record; +// Task is the lowest level +// TaskData is low level +// TaskInfo is high level +// TaskId +// Task <- lazy promise +// TaskData <- low level data of a task (does not include id) +// TaskInfo <- high level (includes id) +// This is a lazy promise +// it's a promise of something that may not yet immediately executed +// type TaskPromise = Promise; +// Consider these variants... (should standardise what these are to be used) +// Task +// Tasks (usually a record, sometimes an array) +// TaskData - lower level data of a task +// TaskInfo - higher level information that is inclusive of data +// type TaskData = Record; + +export type { + TaskId, + TaskIdString, + TaskIdEncoded, + // Task, + TaskData, + TaskInfo, + TaskHandlerId, + TaskHandler, + TaskPriority, + // TaskListener + TaskParameters, + TaskTimestamp, + TaskDelay +}; diff --git a/src/tasks/utils.ts b/src/tasks/utils.ts new file mode 100644 index 000000000..aeea37f17 --- /dev/null +++ b/src/tasks/utils.ts @@ -0,0 +1,47 @@ +import type { TaskId, TaskPriority } from './types'; +import type { NodeId } from '../nodes/types'; +import { IdSortable } from '@matrixai/id'; + +/** + * Generates TaskId + * TaskIds are lexicographically sortable 128 bit IDs + * They are strictly monotonic and unique with respect to the `nodeId` + * When the `NodeId` changes, make sure to regenerate this generator + */ +function createTaskIdGenerator(nodeId: NodeId, lastTaskId?: TaskId) { + const generator = new IdSortable({ + lastId: lastTaskId, + nodeId, + }); + return () => generator.get(); +} + +/** + * Converts `int8` to flipped `uint8` task priority + * Clips number to between -128 to 127 inclusive + */ +function toPriority(n: number): TaskPriority { + n = Math.min(n, 127); + n = Math.max(n, -128); + n *= -1 + n -= 1 + n += 128; + return n as TaskPriority; +} + +/** + * Converts flipped `uint8` task priority to `int8` + */ +function fromPriority(p: TaskPriority): number { + let n = p - 128; + n += 1; + // Prevent returning `-0` + if (n !== 0) n *= -1; + return n; +} + +export { + createTaskIdGenerator, + toPriority, + fromPriority +}; diff --git a/src/timer/Timer.ts b/src/timer/Timer.ts new file mode 100644 index 000000000..5a0a46984 --- /dev/null +++ b/src/timer/Timer.ts @@ -0,0 +1,193 @@ +import { performance } from 'perf_hooks'; +import { CreateDestroy } from '@matrixai/async-init/dist/CreateDestroy'; +import * as timerErrors from './errors'; + +/** + * Unlike `setTimeout` or `setInterval`, + * this will not keep the NodeJS event loop alive + */ +interface Timer extends CreateDestroy {} +@CreateDestroy() +class Timer implements Promise { + public static createTimer({ + handler, + delay = 0, + }: { + handler?: () => T; + delay?: number; + } = {}): Timer { + return new this({handler, delay}); + } + + /** + * Delay in milliseconds + * This may be `Infinity` + */ + public readonly delay: number; + + /** + * Timestamp when this is constructed + * Guaranteed to be weakly monotonic within the process lifetime + * Compare this with `performance.now()` not `Date.now()` + */ + public readonly timestamp: Date; + + /** + * Timestamp when this is scheduled to finish and execute the handler + * Guaranteed to be weakly monotonic within the process lifetime + * Compare this with `performance.now()` not `Date.now()` + */ + public readonly scheduled?: Date; + + /** + * Handler to be executed + */ + protected handler?: () => T; + + /** + * Deconstructed promise + */ + protected p: Promise; + + /** + * Resolve deconstructed promise + */ + protected resolveP: (value?: T) => void; + + /** + * Reject deconstructed promise + */ + protected rejectP: (reason?: timerErrors.ErrorTimer) => void; + + /** + * Internal timeout reference + */ + protected timeoutRef?: ReturnType; + + /** + * Whether the timer has timed out + * This is only `true` when the timer resolves + * If the timer rejects, this stays `false` + */ + protected _status: 'resolved' | 'rejected' | null = null; + + constructor({ + handler, + delay = 0, + }: { + handler?: () => T; + delay?: number; + } = {}) { + // Clip to delay >= 0 + delay = Math.max(delay, 0); + // Coerce NaN to minimal delay of 0 + if (isNaN(delay)) delay = 0; + this.handler = handler; + this.delay = delay; + this.p = new Promise((resolve, reject) => { + this.resolveP = resolve.bind(this.p); + this.rejectP = reject.bind(this.p); + }); + // If the delay is Infinity, there is no `setTimeout` + // therefore this promise will never resolve + // it may still reject however + if (isFinite(delay)) { + this.timeoutRef = setTimeout(() => void this.destroy('resolve'), delay); + if (typeof this.timeoutRef.unref === 'function') { + // Do not keep the event loop alive + this.timeoutRef.unref(); + } + this.timestamp = new Date( + performance.timeOrigin + performance.now() + ); + this.scheduled = new Date( + this.timestamp.getTime() + delay + ); + } else { + // There is no `setTimeout` nor `setInterval` + // so the event loop will not be kept alive + this.timestamp = new Date( + performance.timeOrigin + performance.now() + ); + } + } + + public get [Symbol.toStringTag](): string { + return this.constructor.name; + } + + public get status(): 'resolved' | 'rejected' | null { + return this._status; + } + + public async destroy(type: 'resolve' | 'reject' = 'resolve'): Promise { + clearTimeout(this.timeoutRef); + delete this.timeoutRef; + if (type ==='resolve') { + this._status = 'resolved'; + if (this.handler != null) { + this.resolveP(this.handler()); + } else { + this.resolveP(); + } + } else if (type === 'reject') { + this._status = 'rejected'; + this.rejectP(new timerErrors.ErrorTimerCancelled()); + } + } + + /** + * Gets the remaining time in milliseconds + * This will return `Infinity` if `delay` is `Infinity` + * This will return `0` if status is `resolved` or `rejected` + */ + public getTimeout(): number { + if (this._status !== null) return 0; + if (this.scheduled == null) return Infinity; + return Math.max( + Math.trunc(this.scheduled.getTime() - ( + performance.timeOrigin + performance.now() + )), + 0, + ); + } + + /** + * To remaining time as a string + * This may return `'Infinity'` if `this.delay` is `Infinity` + */ + public toString(): string { + return this.getTimeout().toString(); + } + + /** + * To remaining time as a number + * This may return `Infinity` if `this.delay` is `Infinity` + */ + public valueOf(): number { + return this.getTimeout(); + } + + public then( + onFulfilled?: ((value: T) => TResult1 | PromiseLike) | undefined | null, + onRejected?: ((reason: any) => TResult2 | PromiseLike) | undefined | null + ): Promise { + return this.p.then(onFulfilled, onRejected); + } + + public catch( + onRejected?: ((reason: any) => TResult | PromiseLike) | undefined | null + ): Promise { + return this.p.catch(onRejected); + } + + public finally(onFinally?: (() => void) | undefined | null): Promise { + return this.p.finally(onFinally); + } + + public cancel() { + void this.destroy('reject'); + } +} + +export default Timer; diff --git a/src/timer/errors.ts b/src/timer/errors.ts new file mode 100644 index 000000000..74dd112d6 --- /dev/null +++ b/src/timer/errors.ts @@ -0,0 +1,13 @@ +import { ErrorPolykey, sysexits } from '../errors'; + +class ErrorTimer extends ErrorPolykey {} + +class ErrorTimerCancelled extends ErrorTimer { + static description = 'Timer is cancelled'; + exitCode = sysexits.USAGE; +} + +export { + ErrorTimer, + ErrorTimerCancelled, +}; diff --git a/src/timer/index.ts b/src/timer/index.ts new file mode 100644 index 000000000..641d7a25d --- /dev/null +++ b/src/timer/index.ts @@ -0,0 +1,2 @@ +export { default as Timer } from './Timer'; +export * as errors from './errors'; diff --git a/src/tracing/Trace.ts b/src/tracing/Trace.ts new file mode 100644 index 000000000..dd930ad72 --- /dev/null +++ b/src/tracing/Trace.ts @@ -0,0 +1,2 @@ +// creates Traces +// should be used to create `Span` diff --git a/src/types.ts b/src/types.ts index d0d73eef5..216f4fc49 100644 --- a/src/types.ts +++ b/src/types.ts @@ -45,6 +45,11 @@ interface ToString { toString(): string; } +/** + * Recursive readonly + */ +type DeepReadonly = { readonly [K in keyof T]: DeepReadonly }; + /** * Wrap a type to be reference counted * Useful for when we need to garbage collect data @@ -122,6 +127,7 @@ export type { Initial, InitialParameters, ToString, + DeepReadonly, Ref, Timer, PromiseDeconstructed, diff --git a/src/utils/PromiseCancellable.ts b/src/utils/PromiseCancellable.ts new file mode 100644 index 000000000..539ee3811 --- /dev/null +++ b/src/utils/PromiseCancellable.ts @@ -0,0 +1,244 @@ +type PromiseCancellableController = ((signal: AbortSignal) => void) | AbortController; + +class PromiseCancellable extends Promise { + public static resolve(): PromiseCancellable; + public static resolve(value: T | PromiseLike): PromiseCancellable; + public static resolve(value?: T | PromiseLike): PromiseCancellable { + return super.resolve(value) as PromiseCancellable; + } + + public static reject(reason?: any): PromiseCancellable { + return super.reject(reason) as PromiseCancellable; + } + + public static all( + values: T, + controller?: PromiseCancellableController + ): PromiseCancellable<{ -readonly [P in keyof T]: Awaited }>; + public static all( + values: Iterable>, + controller?: PromiseCancellableController + ): PromiseCancellable[]>; + public static all( + values: Iterable>, + controller?: PromiseCancellableController + ): PromiseCancellable[]> { + const p = super.all(values) as PromiseCancellable[]>; + if (typeof controller === 'function') { + controller(p.abortController.signal); + } else if (controller != null) { + p.abortController = controller; + } else { + p.abortController.signal.onabort = () => { + p.reject(p.abortController.signal.reason); + }; + } + return p; + } + + public static allSettled( + values: T, + controller?: PromiseCancellableController + ): PromiseCancellable<{ -readonly [P in keyof T]: PromiseSettledResult> }>; + public static allSettled( + values: Iterable>, + controller?: PromiseCancellableController + ): PromiseCancellable>[]>; + public static allSettled( + values: Iterable>, + controller?: PromiseCancellableController + ): PromiseCancellable>[]> { + const p = super.allSettled(values) as PromiseCancellable>[]>; + if (typeof controller === 'function') { + controller(p.abortController.signal); + } else if (controller != null) { + p.abortController = controller; + } else { + p.abortController.signal.onabort = () => { + p.reject(p.abortController.signal.reason); + }; + } + return p; + } + + public static race( + values: T, + controller?: PromiseCancellableController + ): PromiseCancellable>; + public static race( + values: Iterable>, + controller?: PromiseCancellableController + ): PromiseCancellable>; + public static race( + values: Iterable>, + controller?: PromiseCancellableController + ): PromiseCancellable> { + const p = super.race(values) as PromiseCancellable>; + if (typeof controller === 'function') { + controller(p.abortController.signal); + } else if (controller != null) { + p.abortController = controller; + } else { + p.abortController.signal.onabort = () => { + p.reject(p.abortController.signal.reason); + }; + } + return p; + } + + public static any( + values: T, + controller?: PromiseCancellableController + ): Promise>; + public static any( + values: Iterable>, + controller?: PromiseCancellableController + ): Promise>; + public static any( + values: Iterable>, + controller?: PromiseCancellableController + ): PromiseCancellable> { + const p = super.any(values) as PromiseCancellable>; + if (typeof controller === 'function') { + controller(p.abortController.signal); + } else if (controller != null) { + p.abortController = controller; + } else { + p.abortController.signal.onabort = () => { + p.reject(p.abortController.signal.reason); + }; + } + return p; + } + + public static from( + p: PromiseLike, + controller?: PromiseCancellableController + ): PromiseCancellable { + if (p instanceof PromiseCancellable) return p; + if (typeof controller === 'function') { + return new this((resolve, reject, signal) => { + controller(signal); + void p.then(resolve, reject) + }); + } else if (controller != null) { + return new this((resolve, reject) => { + void p.then(resolve, reject) + }, controller); + } else { + return new this((resolve, reject, signal) => { + signal.onabort = () => { + reject(signal.reason); + }; + void p.then(resolve, reject) + }); + } + } + + protected readonly reject: (reason?: any) => void; + protected abortController: AbortController; + + public constructor( + executor: ( + resolve: (value: T | PromiseLike) => void, + reject: (reason?: any) => void, + signal: AbortSignal + ) => void, + abortController: AbortController = new AbortController + ) { + let reject_: (reason?: any) => void; + super((resolve, reject) => { + reject_ = reject; + executor(resolve, reject, abortController.signal); + }); + this.reject = reject_!; + this.abortController = abortController; + } + + public get [Symbol.toStringTag](): string { + return this.constructor.name; + } + + public cancel(reason?: any): void { + this.abortController.abort(reason); + } + + public then( + onFulfilled?: ((value: T, signal: AbortSignal) => TResult1 | PromiseLike) | undefined | null, + onRejected?: ((reason: any, signal: AbortSignal) => TResult2 | PromiseLike) | undefined | null, + controller?: PromiseCancellableController + ): PromiseCancellable { + let signal; + let onFulfilled_; + let onRejected_; + if (typeof onFulfilled === 'function') { + onFulfilled_ = (value: T) => onFulfilled(value, signal); + } + if (typeof onRejected === 'function') { + onRejected_ = (reason: any) => onRejected(reason, signal); + } + const p = super.then( + onFulfilled_, + onRejected_ + ) as PromiseCancellable; + if (typeof controller === 'function') { + controller(p.abortController.signal); + } else if (controller != null) { + p.abortController = controller; + } else { + p.abortController.signal.onabort = () => { + p.reject(p.abortController.signal.reason); + }; + } + signal = p.abortController.signal; + return p; + } + + public catch( + onRejected?: ((reason: any, signal: AbortSignal) => TResult | PromiseLike) | undefined | null, + controller?: PromiseCancellableController + ): PromiseCancellable { + let signal; + let onRejected_; + if (typeof onRejected === 'function') { + onRejected_ = (reason: any) => onRejected(reason, signal); + } + const p = super.catch(onRejected_) as PromiseCancellable; + if (typeof controller === 'function') { + controller(p.abortController.signal); + } else if (controller != null) { + p.abortController = controller; + } else { + p.abortController.signal.onabort = () => { + p.reject(p.abortController.signal.reason); + }; + } + signal = p.abortController.signal; + return p; + } + + public finally( + onFinally?: ((signal: AbortSignal) => void) | undefined | null, + controller?: PromiseCancellableController + ): PromiseCancellable { + let signal; + let onFinally_; + if (typeof onFinally === 'function') { + onFinally_ = () => onFinally(signal); + } + const p = super.finally(onFinally_) as PromiseCancellable; + if (typeof controller === 'function') { + controller(p.abortController.signal); + } else if (controller != null) { + p.abortController = controller; + } else { + p.abortController.signal.onabort = () => { + p.reject(p.abortController.signal.reason); + }; + } + signal = p.abortController.signal; + return p; + } +} + +export default PromiseCancellable; diff --git a/src/utils/index.ts b/src/utils/index.ts index 2ee8414ff..712e50b42 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -1,4 +1,5 @@ export { default as sysexits } from './sysexits'; +export { default as PromiseCancellable } from './PromiseCancellable'; export * from './utils'; export * from './matchers'; export * from './binary'; diff --git a/src/utils/utils.ts b/src/utils/utils.ts index f7c904194..4b99a9eb1 100644 --- a/src/utils/utils.ts +++ b/src/utils/utils.ts @@ -309,6 +309,10 @@ function debounce

( }; } +const AsyncFunction = (async () => {}).constructor; +const GeneratorFunction = function* () {}.constructor; +const AsyncGeneratorFunction = async function* () {}.constructor; + export { getDefaultNodePath, never, @@ -331,4 +335,7 @@ export { asyncIterableArray, bufferSplit, debounce, + AsyncFunction, + GeneratorFunction, + AsyncGeneratorFunction, }; diff --git a/test-abort.ts b/test-abort.ts new file mode 100644 index 000000000..c4b421214 --- /dev/null +++ b/test-abort.ts @@ -0,0 +1,28 @@ +const abc = new AbortController(); + +abc.signal.onabort = (e: Event) => { + console.log(e); + console.log(this); + // @ts-ignore + console.log(e.target.reason); + // @ts-ignore + console.log(e.currentTarget.reason); + console.log('REASON:', abc.signal.reason); +}; + +// so we just need th reason +// cause it's not going to be there +// currentTarget is where the listener is attached +// target is where it was emitted +// it was emitted on the original reason +// ok I think i understand + + +abc.abort('oh'); + + + + +// abc.signal.onabort + +// onabort: ((this: AbortSignal, ev: Event) => any) | null; diff --git a/test-cancellable.ts b/test-cancellable.ts new file mode 100644 index 000000000..d2ec7df92 --- /dev/null +++ b/test-cancellable.ts @@ -0,0 +1,94 @@ +type PromiseLikeCancellable = PromiseLike & { cancel(): void }; + +function isPromiseLikeCancellable(value: unknown): value is PromiseLikeCancellable { + return typeof value === 'object' && + value !== null && + typeof value['then'] === 'function' && + typeof value['cancel'] === 'function'; +} + +type PromiseCancelHandler = ( + reason: any, + reject: (reason?: any) => void, +) => void; + +class PromiseCancellable extends Promise { + public static from( + p: PromiseLike, + onCancel?: PromiseCancelHandler + ): PromiseCancellable { + return new this( + (resolve, reject) => void p.then(resolve, reject), + onCancel + ); + } + + protected reject: (reason?: any) => void; + protected onCancel: PromiseCancelHandler; + + public constructor( + executor: ( + resolve: (value: T | PromiseLike) => void, + reject: (reason?: any) => void + ) => void, + onCancel: PromiseCancelHandler = (reason, reject) => void reject(reason) + ) { + let reject_; + super((resolve, reject) => { + reject_ = reject; + executor(resolve, reject); + }); + this.reject = reject_; + this.onCancel = onCancel; + } + + public cancel(reason?: any): void { + this.onCancel(reason, this.reject); + } +} + +function f(ctx: { signal: AbortSignal }): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + resolve('result'); + }, 1000); + ctx.signal.onabort = () => { + clearTimeout(timeout); + console.log('ABORTING SIDE EFFECT'); + if (ctx.signal.reason === undefined) { + reject(new Error('Aborted F')); + } else { + reject(new Error('Aborted F', { cause: ctx.signal.reason })); + } + }; + }); +} + +async function main () { + + const abortController = new AbortController(); + const p = f({ signal: abortController.signal }); + const pC = PromiseCancellable.from( + p, + (reason) => { + abortController.abort(reason); + } + ); + + // Ok the reason can now be anything + pC.cancel('The Reason'); + + try { + const r = await pC; + console.log('GOT THE RESULT', r); + } catch (e) { + console.log('GOT THE ERROR', typeof e, e); + } + +} + +// all(values: Iterable>): Promise[]>; +// all(values: T): Promise<{ -readonly [P in keyof T]: Awaited }>; +// Promise.all + +void main(); diff --git a/test-constructor.ts b/test-constructor.ts new file mode 100644 index 000000000..837463425 --- /dev/null +++ b/test-constructor.ts @@ -0,0 +1,25 @@ +class Parent { + static createParent() { + // when you call `this`, + // it constructs the instnace + // this can fail, because the child class can + // take over the constructor signature + return new this(); + } + constructor() { + console.log('parent'); + } +} + +class Child extends Parent { + constructor(b: number) { + console.log('What is b', b + 1); + super(); + } +} + +const p = Child.createParent(); + +console.log(p); + +// Ok I get it diff --git a/test-dec.ts b/test-dec.ts new file mode 100644 index 000000000..deca30746 --- /dev/null +++ b/test-dec.ts @@ -0,0 +1,56 @@ +import Timer from './src/timer/Timer'; +import { context, timer } from './src/timer/utils'; + +// class C { + +// // f(b: string, a: string, opts?: { timer?: Timer}): void; +// @timer(1000) +// f(b: string, @context a: string, @context opts: { timer: Timer }): void { +// console.log('DONE', opts?.timer instanceof Timer); +// } + +// // g(b: string, a: string, opts?: { timer?: Timer}): Promise; +// // @timer(1000) +// // async g(b: string, a: string, @context opts: { timer: Timer }): Promise { +// // console.log('DONE', opts?.timer instanceof Timer); +// // } + +// } + +// const c = new C(); +// c.f('lol', 'abc'); +// const AsyncFunction = (async () => {}).constructor; +// const GeneratorFunction = function* () {}.constructor; +// const AsyncGeneratorFunction = async function* () {}.constructor; +// console.log(c.g instanceof AsyncFunction); + + +// try { + + // class A { + + // static f(@context context: string, @context b: string) { + // } + + // // public f(@context context: string, b: number) { + + // // } + // } + + // instance name doesn't make sense + // lol + + + // a[s] + + // a.f + // A.f + +// } catch (e) { +// console.log(e.message); + +// } + +// function lol(@context context: string) { + +// } diff --git a/test-p.ts b/test-p.ts new file mode 100644 index 000000000..206d44276 --- /dev/null +++ b/test-p.ts @@ -0,0 +1,22 @@ +async function main () { + + const p = Promise.resolve(1); + const p2 = p.then((v) => { + console.log('DO SOME COMPUTATION'); + return new Promise((resolve, reject) => { + setTimeout(() => resolve('abc'), 1000); + reject(new Error('yea')); + }); + }); + + try { + await p2; + } catch (e) { + console.log('ERROR', e); + } + + console.log('DONE'); + +} + +void main(); diff --git a/test-promises.ts b/test-promises.ts new file mode 100644 index 000000000..546d21f9b --- /dev/null +++ b/test-promises.ts @@ -0,0 +1,35 @@ +async function main () { + + const p = new Promise((resolve, reject) => { + setTimeout(() => { + reject(new Error('oh no')) + }, 500); + setTimeout(resolve, 1000); + }); + + const f = async () => { + await p; + return 'f'; + }; + + const g = async () => { + await p; + return 'g'; + }; + + const r = await Promise.allSettled([ + f(), + g() + ]); + + console.log(r); + + // @ts-ignore + console.log(r[0].reason === r[1].reason); // This is `true` + + // The same exception object is thrown to all awaiters + +} + +void main(); + diff --git a/test-timer-async.ts b/test-timer-async.ts new file mode 100644 index 000000000..577059cba --- /dev/null +++ b/test-timer-async.ts @@ -0,0 +1,133 @@ +import { DBTransaction } from '@matrixai/db'; +import Timer from './src/timer/Timer'; + +// `cancellable` + +// function f (options?: { signal?: AbortSignal }) { + +// } + +// Functions that should be timed out + +// This is better +// additionally +// function f(timer: Timer | number = Infinity) { +// timer = (timer instanceof Timer) ? timer : new Timer({ delay: timer }); +// } + +// function g(options?: { signal?: AbortSignal }) { + + + +// } + +// it returns a new Promise() +// class CustomPromise +// then () { return new CustomPromise() } +// so all the functions don't return `this` +// but it's always a new promise +// and that's what we have done here +// which is to say +// as oon as you bind into the Task or into the Timer +// it is no longer a Task or a Timer +// it is infact just a normal Promise +// and it is not the same as the original reference + +class X { + f(tran?: number): number; + f(tran: number): number { + return tran; + } +} + +const x = new X(); + +x.f + + +async function sub() { + const t = new Timer(); + console.log(t instanceof Timer); + return t.then((v) => { + console.log('inside then', v); + return 'DONE'; + }); +} + +// const p = Promise.resolve(1); + +// class CustomPromise extends Promise { +// doSomething() { +// console.log('DID SOMETHING'); +// } +// } + +// async function f(): CustomPromise { +// return CustomPromise.resolve(1); +// // const t = new Timer(); +// // return t; +// } + +// function g() { +// return CustomPromise.resolve(1); +// // const t = new Timer(); +// // return t; +// } + + +async function main () { + + // console.log(f() instanceof CustomPromise); + // console.log(g() instanceof CustomPromise); + + // // @ts-ignore + // // f().doSomething() + // // @ts-ignore + // g().doSomething() + + + // console.log(p === f()); + + // console.log(p === g()); + + // const p = f(); + // console.log(p instanceof Timer); + + // const t = g(); + // console.log(t instanceof Timer); + + + // const p = sub(); + + // console.log(typeof p); + + // console.log(p instanceof Timer); + + // const r = await p; + + // console.log(typeof r); + +} + +void main(); + +// cancel +// cancelAsync + +// implements Promise +// implements CancellablePromise + +// cancellable promise + + + + + +// function allTogether(timer: Timer | number = Infinity, tran?: DBTransaction) { +// if (tran == null) { return this.db.withTransactionF( +// (tran) => t(timer, tran) +// ); +// } +// timer = (timer instanceof Timer) ? timer : new Timer({ delay: timer }); +// } + diff --git a/test-transaction-decorator.ts b/test-transaction-decorator.ts new file mode 100644 index 000000000..701488fb0 --- /dev/null +++ b/test-transaction-decorator.ts @@ -0,0 +1,100 @@ +const AsyncFunction = (async () => {}).constructor; +const GeneratorFunction = function* () {}.constructor; +const AsyncGeneratorFunction = async function* () {}.constructor; + +// function transaction any> (callback: T): T { +// return ((...args: [...any, number]) => { +// // we take all the arguments +// // it is necessary to take the last argument +// // how many arguments are we talking about +// // if it is undefined? +// // how does it work? +// if (args[args.length - 1] === undefined) { +// this.db.withTransactionF( +// (tran) => { +// return f.apply(this, ...args, tran); +// } +// ); + + +// } + +// }) as any; +// } + +// transactional (vs transaction) +// then we use the db +// @transaction(this.db) + +function transaction(db: { a: string }) { + return ( + target: any, + key: string, + descriptor: TypedPropertyDescriptor<(tran: string) => any> + ) => { + const f = descriptor.value; + if (typeof f !== 'function') { + throw new TypeError(`${key} is not a function`); + } + descriptor.value = function (tran: string) { + return f.call(this, tran ?? db.a); + }; + return descriptor; + }; +} + +// function dec(f: () => string) { +// return () => f(); +// } + +// // the method signature takes tran: DBTransaction +// // but the decorator cannot change the type +// // it would still be "required" + +// const task = ( +// target: any, +// propertyKey: string, +// descriptor: TypedPropertyDescriptor<(name: string) => void>, +// ) => { +// const original = descriptor.value +// descriptor.value = function () { +// return original?.call(this, 'Mark') +// } +// return descriptor +// } + + +class X { + + protected y: string = 'parameter'; + + @transaction({a: 'abc'}) + public async foo(name?: string): Promise { + return name! + this.y; + } + + @transaction({a: 'blah'}) + public bar(name?: string): string { + return name!; + } + + // @task + // public done(name: string) { + + // } + +} + +const x = new X(); + +async function main () { + + const r = await x.foo('OVER'); + console.log(r); + + + +} + + +void main(); diff --git a/tests/contexts/decorators/context.test.ts b/tests/contexts/decorators/context.test.ts new file mode 100644 index 000000000..30ff5dedb --- /dev/null +++ b/tests/contexts/decorators/context.test.ts @@ -0,0 +1,27 @@ +import context from '@/contexts/decorators/context'; +import * as contextsUtils from '@/contexts/utils'; + +describe('contexts/utils', () => { + test('context parameter decorator', () => { + class C { + f(@context _a: any) { } + g(_a: any, @context _b: any) { } + h(_a: any, _b: any, @context ..._rest: Array) {} + } + expect(contextsUtils.contexts.get(C.prototype.f)).toBe(0); + expect(contextsUtils.contexts.get(C.prototype.g)).toBe(1); + expect(contextsUtils.contexts.get(C.prototype.h)).toBe(2); + const c = new C(); + expect(contextsUtils.contexts.get(c.f)).toBe(0); + expect(contextsUtils.contexts.get(c.g)).toBe(1); + expect(contextsUtils.contexts.get(c.h)).toBe(2); + }); + test('context parameter decorator can only be used once', () => { + expect(() => { + class C { + f(@context _a: any, @context _b: any) { } + } + new C(); + }).toThrow(TypeError); + }); +}); diff --git a/tests/contexts/decorators/timed.test.ts b/tests/contexts/decorators/timed.test.ts new file mode 100644 index 000000000..3f0239b9a --- /dev/null +++ b/tests/contexts/decorators/timed.test.ts @@ -0,0 +1,97 @@ +import context from '@/contexts/decorators/context'; +import timed from '@/contexts/decorators/timed'; +import Timer from '@/timer/Timer'; +import { + AsyncFunction, + GeneratorFunction, + AsyncGeneratorFunction +} from '@/utils'; + +describe('context/decorators/timed', () => { + test('timed decorator', async () => { + const s = Symbol('sym'); + class X { + a(ctx?: { signal?: AbortSignal; timer?: Timer }, check?: (t: Timer) => any): void; + @timed(1000) + a(@context ctx: { signal: AbortSignal; timer: Timer }, check?: (t: Timer) => any): void { + expect(ctx.signal).toBeInstanceOf(AbortSignal); + expect(ctx.timer).toBeInstanceOf(Timer); + if (check != null) check(ctx.timer); + } + + b(ctx?: { signal?: AbortSignal; timer?: Timer }, check?: (t: Timer) => any): Promise; + @timed(Infinity) + async b(@context ctx: { signal: AbortSignal; timer: Timer }, check?: (t: Timer) => any): Promise { + expect(ctx.signal).toBeInstanceOf(AbortSignal); + expect(ctx.timer).toBeInstanceOf(Timer); + if (check != null) check(ctx.timer); + } + + c(ctx?: { signal?: AbortSignal; timer?: Timer }, check?: (t: Timer) => any): Generator; + @timed(0) + *c(@context ctx: { signal: AbortSignal; timer: Timer }, check?: (t: Timer) => any): Generator { + expect(ctx.signal).toBeInstanceOf(AbortSignal); + expect(ctx.timer).toBeInstanceOf(Timer); + if (check != null) check(ctx.timer); + } + + d(ctx?: { signal?: AbortSignal; timer?: Timer }, check?: (t: Timer) => any): AsyncGenerator; + @timed(NaN) + async *d(@context ctx: { signal: AbortSignal; timer: Timer }, check?: (t: Timer) => any): AsyncGenerator { + expect(ctx.signal).toBeInstanceOf(AbortSignal); + expect(ctx.timer).toBeInstanceOf(Timer); + if (check != null) check(ctx.timer); + } + + [s](ctx?: { signal?: AbortSignal; timer?: Timer }, check?: (t: Timer) => any): void; + @timed() + [s](@context ctx: { signal: AbortSignal; timer: Timer }, check?: (t: Timer) => any): void { + expect(ctx.signal).toBeInstanceOf(AbortSignal); + expect(ctx.timer).toBeInstanceOf(Timer); + if (check != null) check(ctx.timer); + } + } + const x = new X(); + x.a(); + x.a({}); + x.a({ timer: new Timer({ delay: 100 }) }, (t) => { + expect(t.delay).toBe(100); + }); + expect(x.a).toBeInstanceOf(Function); + expect(x.a.name).toBe('a'); + await x.b(); + await x.b({}); + await x.b({ timer: new Timer({ delay: 50 })}, (t) => { + expect(t.delay).toBe(50); + }); + expect(x.b).toBeInstanceOf(AsyncFunction); + expect(x.b.name).toBe('b'); + for (const _ of x.c()) { } + for (const _ of x.c({})) { } + for (const _ of x.c( + { timer: new Timer({ delay: 150 }) }, + (t) => { + expect(t.delay).toBe(150) + } + )) { } + expect(x.c).toBeInstanceOf(GeneratorFunction); + expect(x.c.name).toBe('c'); + for await (const _ of x.d()) { } + for await (const _ of x.d({})) { } + for await (const _ of x.d( + { timer: new Timer({ delay: 200 })}, + (t) => { + expect(t.delay).toBe(200) + } + )) { } + expect(x.d).toBeInstanceOf(AsyncGeneratorFunction); + expect(x.d.name).toBe('d'); + x[s](); + x[s]({}); + x[s]({ timer: new Timer({ delay: 250 }) }, (t) => { + expect(t.delay).toBe(250); + }); + expect(x[s]).toBeInstanceOf(Function); + expect(x[s].name).toBe('[sym]'); + }); +}); diff --git a/tests/tasks/Scheduler.test.ts b/tests/tasks/Scheduler.test.ts new file mode 100644 index 000000000..a8ad8a593 --- /dev/null +++ b/tests/tasks/Scheduler.test.ts @@ -0,0 +1,72 @@ +import os from 'os'; +import path from 'path'; +import fs from 'fs'; +import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; +import { DB } from '@matrixai/db'; +import KeyManager from '@/keys/KeyManager'; +import Queue from '@/tasks/Queue'; +import Scheduler from '@/tasks/Scheduler'; +import * as keysUtils from '@/keys/utils'; +import { globalRootKeyPems } from '../fixtures/globalRootKeyPems'; + +describe(Scheduler.name, () => { + const password = 'password'; + const logger = new Logger(`${Scheduler.name} test`, LogLevel.WARN, [ + new StreamHandler(), + ]); + let keyManager: KeyManager; + let dbKey: Buffer; + let dbPath: string; + let db: DB; + beforeAll(async () => { + dataDir = await fs.promises.mkdtemp( + path.join(os.tmpdir(), 'polykey-test-'), + ); + const keysPath = `${dataDir}/keys`; + keyManager = await KeyManager.createKeyManager({ + password, + keysPath, + logger, + privateKeyPemOverride: globalRootKeyPems[0], + }); + dbKey = await keysUtils.generateKey(); + dbPath = `${dataDir}/db`; + }); + beforeEach(async () => { + db = await DB.createDB({ + dbPath, + logger, + crypto: { + key: dbKey, + ops: { + encrypt: keysUtils.encryptWithKey, + decrypt: keysUtils.decryptWithKey, + }, + }, + }); + }); + afterEach(async () => { + await db.stop(); + await db.destroy(); + }); + test('do it', async () => { + const queue = await Scheduler.createScheduler({ + db, + keyManager, + logger, + }); + + await queue.registerHandler('somename', async () => { + console.log('hi'); + }); + + const result = await queue.pushTask('somename', [], 1000); + + console.log(result); + + + + await queue.stop(); + await queue.destroy(); + }); +}); diff --git a/tests/tasks/utils.test.ts b/tests/tasks/utils.test.ts new file mode 100644 index 000000000..9bf3e1cab --- /dev/null +++ b/tests/tasks/utils.test.ts @@ -0,0 +1,29 @@ +import type { TaskPriority } from '@/tasks/types'; +import * as tasksUtils from '@/tasks/utils'; + +describe('tasks/utils', () => { + test('encode priority from `int8` to flipped `uint8`', () => { + expect(tasksUtils.toPriority(128)).toBe(0); + expect(tasksUtils.toPriority(127)).toBe(0); + expect(tasksUtils.toPriority(126)).toBe(1); + expect(tasksUtils.toPriority(2)).toBe(125); + expect(tasksUtils.toPriority(1)).toBe(126); + expect(tasksUtils.toPriority(0)).toBe(127); + expect(tasksUtils.toPriority(-1)).toBe(128); + expect(tasksUtils.toPriority(-2)).toBe(129); + expect(tasksUtils.toPriority(-127)).toBe(254); + expect(tasksUtils.toPriority(-128)).toBe(255); + expect(tasksUtils.toPriority(-129)).toBe(255); + }); + test('decode from priority from flipped `uint8` to `int8`', () => { + expect(tasksUtils.fromPriority(0 as TaskPriority)).toBe(127); + expect(tasksUtils.fromPriority(1 as TaskPriority)).toBe(126); + expect(tasksUtils.fromPriority(125 as TaskPriority)).toBe(2); + expect(tasksUtils.fromPriority(126 as TaskPriority)).toBe(1); + expect(tasksUtils.fromPriority(127 as TaskPriority)).toBe(0); + expect(tasksUtils.fromPriority(128 as TaskPriority)).toBe(-1); + expect(tasksUtils.fromPriority(129 as TaskPriority)).toBe(-2); + expect(tasksUtils.fromPriority(254 as TaskPriority)).toBe(-127); + expect(tasksUtils.fromPriority(255 as TaskPriority)).toBe(-128); + }); +}); diff --git a/tests/timer/Timer.test.ts b/tests/timer/Timer.test.ts new file mode 100644 index 000000000..e32fed0b8 --- /dev/null +++ b/tests/timer/Timer.test.ts @@ -0,0 +1,109 @@ +import { performance } from 'perf_hooks'; +import { Timer } from '@/timer'; +import * as timerErrors from '@/timer/errors'; +import { sleep } from '@/utils'; + +describe(Timer.name, () => { + test('timer is thenable and awaitable', async () => { + const t1 = new Timer(); + expect(await t1).toBeUndefined(); + expect(t1.status).toBe('resolved'); + const t2 = new Timer(); + await expect(t2).resolves.toBeUndefined(); + expect(t2.status).toBe('resolved'); + }); + test('timer delays', async () => { + const t1 = new Timer({ delay: 20, handler: () => 1 }); + const t2 = new Timer({ delay: 10, handler: () => 2 }); + const result = await Promise.any([t1, t2]); + expect(result).toBe(2); + }); + test('timer handlers', async () => { + const t1 = new Timer({ handler: () => 123 }); + expect(await t1).toBe(123); + expect(t1.status).toBe('resolved'); + const t2 = new Timer({ delay: 100, handler: () => '123' }); + expect(await t2).toBe('123'); + expect(t2.status).toBe('resolved'); + }); + test('timer cancellation', async () => { + const t1 = new Timer({ delay: 100 }); + t1.cancel(); + await expect(t1).rejects.toThrow(timerErrors.ErrorTimerCancelled); + expect(t1.status).toBe('rejected'); + const t2 = new Timer({ delay: 100 }); + const results = await Promise.all([ + (async () => { + try { + await t2; + } catch (e) { + return e; + } + })(), + (async () => { + t2.cancel(); + })() + ]); + expect(results[0]).toBeInstanceOf(timerErrors.ErrorTimerCancelled); + expect(t2.status).toBe('rejected'); + }); + test('timer timestamps', async () => { + const start = new Date(performance.timeOrigin + performance.now()); + await sleep(10); + const t = new Timer({ delay: 100 }); + expect(t.status).toBeNull(); + expect(t.timestamp).toBeAfter(start); + expect(t.scheduled).toBeAfter(start); + expect(t.scheduled).toBeAfterOrEqualTo(t.timestamp); + const delta = t.scheduled!.getTime() - t.timestamp.getTime(); + expect(t.getTimeout()).toBeLessThanOrEqual(delta); + }); + test('timer primitive string and number', () => { + const t1 = new Timer(); + expect(t1.valueOf()).toBe(0); + expect(+t1).toBe(0); + expect(t1.toString()).toBe('0'); + expect(`${t1}`).toBe('0'); + const t2 = new Timer({ delay: 100 }); + expect(t2.valueOf()).toBePositive(); + expect(+t2).toBePositive(); + expect(t2.toString()).toMatch(/\d+/); + expect(`${t2}`).toMatch(/\d+/); + }); + test('timer with infinite delay', async () => { + const t1 = new Timer({ delay: Infinity }); + expect(t1.delay).toBe(Infinity); + expect(t1.scheduled).toBeUndefined(); + expect(t1.getTimeout()).toBe(Infinity); + expect(t1.valueOf()).toBe(Infinity); + expect(+t1).toBe(Infinity); + expect(t1.toString()).toBe('Infinity'); + expect(`${t1}`).toBe('Infinity'); + t1.cancel(); + await expect(t1).rejects.toThrow(timerErrors.ErrorTimerCancelled); + }); + test('timer does not keep event loop alive', async () => { + const f = async (timer: Timer | number = globalThis.maxTimeout) => { + timer = (timer instanceof Timer) ? timer : new Timer({ delay: timer }); + }; + const g = async (timer: Timer | number = Infinity) => { + timer = (timer instanceof Timer) ? timer : new Timer({ delay: timer }); + }; + await f(); + await f(); + await f(); + await g(); + await g(); + await g(); + }); + test('timer lifecycle', async () => { + const t1 = Timer.createTimer({ delay: 1000 }); + await t1.destroy('resolve'); + expect(t1.status).toBe('resolved'); + await expect(t1).resolves.toBeUndefined(); + const t2 = Timer.createTimer({ delay: 1000 }); + await t2.destroy('reject'); + expect(t2.status).toBe('rejected'); + await expect(t2).rejects.toThrow(timerErrors.ErrorTimerCancelled); + }); +}); diff --git a/tests/utils.test.ts b/tests/utils.test.ts index a4de7648b..01c7685ea 100644 --- a/tests/utils.test.ts +++ b/tests/utils.test.ts @@ -2,6 +2,7 @@ import os from 'os'; import path from 'path'; import process from 'process'; import * as utils from '@/utils'; +import PromiseCancellable from '@/utils/PromiseCancellable'; describe('utils', () => { test('getting default node path', () => { diff --git a/tsconfig.build.json b/tsconfig.build.json index 724de4425..22a623378 100644 --- a/tsconfig.build.json +++ b/tsconfig.build.json @@ -8,6 +8,7 @@ "exclude": [ "./tests/**/*", "./scripts/**/*", - "./benches/**/*" + "./benches/**/*", + "./*" ] } diff --git a/tsconfig.json b/tsconfig.json index 2fffd2833..8c6f9511f 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -9,12 +9,13 @@ "strictNullChecks": true, "noImplicitAny": false, "experimentalDecorators": true, + "emitDecoratorMetadata": true, "esModuleInterop": true, "allowSyntheticDefaultImports": true, "resolveJsonModule": true, "moduleResolution": "node", "module": "CommonJS", - "target": "ES2021", + "target": "ES2022", "baseUrl": "./src", "paths": { "@": ["index"], @@ -27,7 +28,8 @@ "./src/**/*.json", "./tests/**/*", "./scripts/**/*", - "./benches/**/*" + "./benches/**/*", + "./*" ], "ts-node": { "require": ["tsconfig-paths/register"],