From c3bc60e215bcd72f9e4b0def84d3e9d779667cb7 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Tue, 6 Sep 2022 13:35:40 +1000 Subject: [PATCH] wip: refreshBuckets --- src/PolykeyAgent.ts | 73 ++++---------- src/nodes/NodeManager.ts | 213 ++++----------------------------------- src/tasks/Tasks.ts | 79 ++++++++++++--- 3 files changed, 107 insertions(+), 258 deletions(-) diff --git a/src/PolykeyAgent.ts b/src/PolykeyAgent.ts index 99ff241b5d..543175f0da 100644 --- a/src/PolykeyAgent.ts +++ b/src/PolykeyAgent.ts @@ -35,9 +35,7 @@ import * as errors from './errors'; import * as utils from './utils'; import * as keysUtils from './keys/utils'; import * as nodesUtils from './nodes/utils'; -import Scheduler from './tasks/Scheduler'; -// FIXME: update name to queue, currently conflicts -import TaskQueue from './tasks/Queue'; +import Tasks from './tasks/Tasks'; type NetworkConfig = { forwardHost?: Host; @@ -90,8 +88,7 @@ class PolykeyAgent { acl, gestaltGraph, proxy, - taskQueue, - taskScheduler, + tasks, nodeGraph, queue, nodeConnectionManager, @@ -139,8 +136,7 @@ class PolykeyAgent { acl?: ACL; gestaltGraph?: GestaltGraph; proxy?: Proxy; - taskQueue?: TaskQueue; - taskScheduler?: Scheduler; + tasks?: Tasks; nodeGraph?: NodeGraph; queue?: Queue; nodeConnectionManager?: NodeConnectionManager; @@ -292,24 +288,13 @@ class PolykeyAgent { keyManager, logger: logger.getChild(NodeGraph.name), })); - taskQueue = - taskQueue ?? - (await TaskQueue.createQueue({ + tasks = + tasks ?? + (await Tasks.createTasks({ + activeLimit: 5, db, - keyManager, - concurrencyLimit: 3, - delay: true, - fresh, - handlers: {}, - logger, - })); - taskScheduler = - taskScheduler ?? - (await Scheduler.createScheduler({ - db, - queue: taskQueue, - delay: true, fresh, + lazy: true, logger, })); queue = @@ -336,8 +321,7 @@ class PolykeyAgent { keyManager, nodeGraph, nodeConnectionManager, - scheduler: taskScheduler, - taskQueue, + tasks, logger: logger.getChild(NodeManager.name), }); await nodeManager.start(); @@ -401,8 +385,7 @@ class PolykeyAgent { await notificationsManager?.stop(); await vaultManager?.stop(); await discovery?.stop(); - await taskScheduler?.stop(); - await taskQueue?.stop(); + await tasks?.stop(); await proxy?.stop(); await gestaltGraph?.stop(); await acl?.stop(); @@ -426,8 +409,7 @@ class PolykeyAgent { gestaltGraph, proxy, nodeGraph, - taskQueue, - taskScheduler, + tasks, queue, nodeConnectionManager, nodeManager, @@ -461,8 +443,7 @@ class PolykeyAgent { public readonly gestaltGraph: GestaltGraph; public readonly proxy: Proxy; public readonly nodeGraph: NodeGraph; - public readonly taskQueue: TaskQueue; - public readonly taskScheduler: Scheduler; + public readonly tasks: Tasks; public readonly queue: Queue; public readonly nodeConnectionManager: NodeConnectionManager; public readonly nodeManager: NodeManager; @@ -488,8 +469,7 @@ class PolykeyAgent { gestaltGraph, proxy, nodeGraph, - taskQueue, - taskScheduler, + tasks, queue, nodeConnectionManager, nodeManager, @@ -514,8 +494,7 @@ class PolykeyAgent { gestaltGraph: GestaltGraph; proxy: Proxy; nodeGraph: NodeGraph; - taskQueue: TaskQueue; - taskScheduler: Scheduler; + tasks: Tasks; queue: Queue; nodeConnectionManager: NodeConnectionManager; nodeManager: NodeManager; @@ -542,8 +521,7 @@ class PolykeyAgent { this.proxy = proxy; this.discovery = discovery; this.nodeGraph = nodeGraph; - this.taskQueue = taskQueue; - this.taskScheduler = taskScheduler; + this.tasks = tasks; this.queue = queue; this.nodeConnectionManager = nodeConnectionManager; this.nodeManager = nodeManager; @@ -707,8 +685,7 @@ class PolykeyAgent { proxyPort: networkConfig_.proxyPort, tlsConfig, }); - await this.taskQueue.start({ fresh }); - await this.taskScheduler.start({ fresh }); + await this.tasks.start({ fresh, lazy: true }); await this.queue.start(); await this.nodeManager.start(); await this.nodeConnectionManager.start({ nodeManager: this.nodeManager }); @@ -718,8 +695,7 @@ class PolykeyAgent { await this.vaultManager.start({ fresh }); await this.notificationsManager.start({ fresh }); await this.sessionManager.start({ fresh }); - await this.taskQueue.startTasks(); - await this.taskScheduler.startDispatching(); + await this.tasks.startProcessing(); await this.status.finishStart({ pid: process.pid, nodeId: this.keyManager.getNodeId(), @@ -737,15 +713,13 @@ class PolykeyAgent { this.logger.warn(`Failed Starting ${this.constructor.name}`); this.events.removeAllListeners(); await this.status?.beginStop({ pid: process.pid }); - await this.taskScheduler.stopDispatching(); - await this.taskQueue.stopTasks(); + await this.tasks?.stopProcessing(); await this.sessionManager?.stop(); await this.notificationsManager?.stop(); await this.vaultManager?.stop(); await this.discovery?.stop(); await this.queue?.stop(); - await this.taskScheduler?.stop(); - await this.taskQueue?.stop(); + await this.tasks?.stop(); await this.nodeGraph?.stop(); await this.nodeConnectionManager?.stop(); await this.nodeManager?.stop(); @@ -771,8 +745,7 @@ class PolykeyAgent { this.logger.info(`Stopping ${this.constructor.name}`); this.events.removeAllListeners(); await this.status.beginStop({ pid: process.pid }); - await this.taskScheduler.stopDispatching(); - await this.taskQueue.stopTasks(); + await this.tasks.stopProcessing(); await this.sessionManager.stop(); await this.notificationsManager.stop(); await this.vaultManager.stop(); @@ -781,8 +754,7 @@ class PolykeyAgent { await this.nodeGraph.stop(); await this.nodeManager.stop(); await this.queue.stop(); - await this.taskScheduler?.stop(); - await this.taskQueue?.stop(); + await this.tasks.stop(); await this.proxy.stop(); await this.grpcServerAgent.stop(); await this.grpcServerClient.stop(); @@ -807,8 +779,7 @@ class PolykeyAgent { await this.discovery.destroy(); await this.nodeGraph.destroy(); await this.gestaltGraph.destroy(); - await this.taskScheduler.destroy(); - await this.taskQueue.destroy(); + await this.tasks.destroy(); await this.acl.destroy(); await this.sigchain.destroy(); await this.identitiesManager.destroy(); diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts index b33fddc6b7..49b1837c96 100644 --- a/src/nodes/NodeManager.ts +++ b/src/nodes/NodeManager.ts @@ -13,7 +13,8 @@ import type { DBTransaction } from '@matrixai/db'; import type { NodeId, NodeAddress } from '../nodes/types'; import type Scheduler from '../tasks/Scheduler'; import type TaskQueue from '../tasks/Queue'; -import type { TaskHandlerId } from '../tasks/types'; +import type Tasks from '../tasks/Tasks'; +import type { Task, TaskHandlerId } from '../tasks/types'; import Logger from '@matrixai/logger'; import { StartStop, ready } from '@matrixai/async-init/dist/StartStop'; import * as nodesErrors from './errors'; @@ -35,22 +36,9 @@ class NodeManager { protected keyManager: KeyManager; protected nodeConnectionManager: NodeConnectionManager; protected nodeGraph: NodeGraph; - protected scheduler: Scheduler; - protected taskQueue: TaskQueue; - // Refresh bucket timer - protected refreshBucketDeadlineMap: Map = new Map(); - protected refreshBucketTimer: NodeJS.Timer; - protected refreshBucketNext: NodeBucketIndex; - public readonly refreshBucketTimerDefault; - protected refreshBucketQueue: Set = new Set(); - protected refreshBucketQueueRunning: boolean = false; - protected refreshBucketQueueRunner: Promise; - protected refreshBucketQueuePlug_: PromiseDeconstructed = promise(); - protected refreshBucketQueueDrained_: PromiseDeconstructed = promise(); - protected refreshBucketQueuePause_: PromiseDeconstructed = promise(); - protected refreshBucketQueueAbortController: AbortController; + protected tasks: Tasks; + public readonly setNodeHandlerId = 'setNode' as TaskHandlerId; - public readonly setNodeHanderId = 'setNode' as TaskHandlerId; constructor({ db, @@ -58,8 +46,7 @@ class NodeManager { sigchain, nodeConnectionManager, nodeGraph, - scheduler, - taskQueue, + tasks, refreshBucketTimerDefault = 3600000, // 1 hour in milliseconds logger, }: { @@ -68,8 +55,7 @@ class NodeManager { sigchain: Sigchain; nodeConnectionManager: NodeConnectionManager; nodeGraph: NodeGraph; - scheduler: Scheduler; - taskQueue: TaskQueue; + tasks: Tasks; refreshBucketTimerDefault?: number; logger?: Logger; }) { @@ -79,8 +65,7 @@ class NodeManager { this.sigchain = sigchain; this.nodeConnectionManager = nodeConnectionManager; this.nodeGraph = nodeGraph; - this.scheduler = scheduler; - this.taskQueue = taskQueue; + this.tasks = tasks; this.refreshBucketTimerDefault = refreshBucketTimerDefault; } @@ -89,8 +74,8 @@ class NodeManager { this.startRefreshBucketTimers(); this.refreshBucketQueueRunner = this.startRefreshBucketQueue(); this.logger.info(`Registering handler for setNode`); - this.taskQueue.registerHandler( - this.setNodeHanderId, + this.tasks.registerHandler( + this.setNodeHandlerId, async (nodeId, nodeAddress, timeout) => this.setNode(nodeId, nodeAddress, true, false, timeout), ); @@ -102,7 +87,7 @@ class NodeManager { await this.stopRefreshBucketTimers(); await this.stopRefreshBucketQueue(); this.logger.info(`Unregistering handler for setNode`); - this.taskQueue.deregisterHandler(this.setNodeHanderId); + this.tasks.deregisterHandler(this.setNodeHandlerId); this.logger.info(`Stopped ${this.constructor.name}`); } @@ -495,15 +480,12 @@ class NodeManager { )} to queue`, ); // Re-attempt this later asynchronously by adding to the scheduler - await this.scheduler.scheduleTask( - this.setNodeHanderId, - [nodeId, nodeAddress, timeout], - undefined, - undefined, - ['setNode'], - true, - tran, - ); + await this.tasks.scheduleTask({ + handlerId: this.setNodeHandlerId, + parameters: [nodeId, nodeAddress, timeout], + path: ['setNode'], + lazy: true, + }, tran) } } } @@ -597,166 +579,15 @@ class NodeManager { await this.nodeConnectionManager.findNode(bucketRandomNodeId, { signal }); } - // Refresh bucket activity timer methods - - private startRefreshBucketTimers() { - // Setting initial bucket to refresh - this.refreshBucketNext = 0; - // Setting initial deadline - this.refreshBucketTimerReset(this.refreshBucketTimerDefault); - - for ( - let bucketIndex = 0; - bucketIndex < this.nodeGraph.nodeIdBits; - bucketIndex++ - ) { - const deadline = Date.now() + this.refreshBucketTimerDefault; - this.refreshBucketDeadlineMap.set(bucketIndex, deadline); + private async setupRefreshBucketTasks() { + // 1. Iterate over existing tasks and reset the delay + const existingTasks: Array = new Array(this.nodeGraph.nodeIdBits).fill(false); + for await (const [taskPath, task] of this.tasks.getTasksByPath(['refreshBucket'])) { + const bucket = } - } - - private async stopRefreshBucketTimers() { - clearTimeout(this.refreshBucketTimer); - } - private refreshBucketTimerReset(timeout: number) { - clearTimeout(this.refreshBucketTimer); - this.refreshBucketTimer = setTimeout(() => { - this.refreshBucketRefreshTimer(); - }, timeout); - } - - public refreshBucketUpdateDeadline(bucketIndex: NodeBucketIndex) { - // Update the map deadline - this.refreshBucketDeadlineMap.set( - bucketIndex, - Date.now() + this.refreshBucketTimerDefault, - ); - // If the bucket was pending a refresh we remove it - this.refreshBucketQueueRemove(bucketIndex); - if (bucketIndex === this.refreshBucketNext) { - // Bucket is same as next bucket, this affects the timer - this.refreshBucketRefreshTimer(); - } - } - - private refreshBucketRefreshTimer() { - // Getting new closest deadline - let closestBucket = this.refreshBucketNext; - let closestDeadline = Date.now() + this.refreshBucketTimerDefault; - const now = Date.now(); - for (const [bucketIndex, deadline] of this.refreshBucketDeadlineMap) { - // Skip any queued buckets marked by 0 deadline - if (deadline === 0) continue; - if (deadline <= now) { - // Deadline for this has already passed, we add it to the queue - this.refreshBucketQueueAdd(bucketIndex); - continue; - } - if (deadline < closestDeadline) { - closestBucket = bucketIndex; - closestDeadline = deadline; - } - } - // Working out time left - const timeout = closestDeadline - Date.now(); - this.logger.debug( - `Refreshing refreshBucket timer with new timeout ${timeout}`, - ); - // Updating timer and next - this.refreshBucketNext = closestBucket; - this.refreshBucketTimerReset(timeout); - } - - // Refresh bucket async queue methods - - public refreshBucketQueueAdd(bucketIndex: NodeBucketIndex) { - this.logger.debug(`Adding bucket ${bucketIndex} to queue`); - this.refreshBucketDeadlineMap.set(bucketIndex, 0); - this.refreshBucketQueue.add(bucketIndex); - this.refreshBucketQueueUnplug(); - } - - public refreshBucketQueueRemove(bucketIndex: NodeBucketIndex) { - this.logger.debug(`Removing bucket ${bucketIndex} from queue`); - this.refreshBucketQueue.delete(bucketIndex); - } - - public async refreshBucketQueueDrained() { - await this.refreshBucketQueueDrained_.p; - } - - public refreshBucketQueuePause() { - this.logger.debug('Pausing refreshBucketQueue'); - this.refreshBucketQueuePause_ = promise(); - } - - public refreshBucketQueueResume() { - this.logger.debug('Resuming refreshBucketQueue'); - this.refreshBucketQueuePause_.resolveP(); - } - - private async startRefreshBucketQueue(): Promise { - this.refreshBucketQueueRunning = true; - this.refreshBucketQueuePlug(); - this.refreshBucketQueueResume(); - let iterator: IterableIterator | undefined; - this.refreshBucketQueueAbortController = new AbortController(); - const pace = async () => { - // Wait if paused - await this.refreshBucketQueuePause_.p; - // Wait for plug - await this.refreshBucketQueuePlug_.p; - if (iterator == null) { - iterator = this.refreshBucketQueue[Symbol.iterator](); - } - return this.refreshBucketQueueRunning; - }; - while (await pace()) { - const bucketIndex: NodeBucketIndex = iterator?.next().value; - if (bucketIndex == null) { - // Iterator is empty, plug and continue - iterator = undefined; - this.refreshBucketQueuePlug(); - continue; - } - // Do the job - this.logger.debug( - `processing refreshBucket for bucket ${bucketIndex}, ${this.refreshBucketQueue.size} left in queue`, - ); - try { - await this.refreshBucket(bucketIndex, { - signal: this.refreshBucketQueueAbortController.signal, - }); - } catch (e) { - if (e instanceof nodesErrors.ErrorNodeAborted) break; - throw e; - } - // Remove from queue and update bucket deadline - this.refreshBucketQueue.delete(bucketIndex); - this.refreshBucketUpdateDeadline(bucketIndex); - } - this.logger.debug('startRefreshBucketQueue has ended'); - } - - private async stopRefreshBucketQueue(): Promise { - // Flag end and await queue finish - this.refreshBucketQueueAbortController.abort(); - this.refreshBucketQueueRunning = false; - this.refreshBucketQueueUnplug(); - this.refreshBucketQueueResume(); - } - - private refreshBucketQueuePlug() { - this.logger.debug('refresh bucket queue has plugged'); - this.refreshBucketQueuePlug_ = promise(); - this.refreshBucketQueueDrained_?.resolveP(); - } - private refreshBucketQueueUnplug() { - this.logger.debug('refresh bucket queue has unplugged'); - this.refreshBucketQueueDrained_ = promise(); - this.refreshBucketQueuePlug_?.resolveP(); + // 2. Recreate any missing tasks for buckets } } diff --git a/src/tasks/Tasks.ts b/src/tasks/Tasks.ts index f60965ea08..4c4dd19a73 100644 --- a/src/tasks/Tasks.ts +++ b/src/tasks/Tasks.ts @@ -186,7 +186,8 @@ class Tasks { // AND if it is cancelled, then it will be settled // so it is guaranteed to settle // tasks can only be cancelled, not removed - protected taskPromises: Map> = new Map(); + protected taskPromises: Map> = + new Map(); protected activePromises: Map> = new Map(); @@ -226,7 +227,7 @@ class Tasks { this.handlers.clear(); await this.db.clear(this.tasksDbPath); } - this.batchLimit = batchLimit != null ? Math.max(batchLimit, 1) : undefined; + this.batchLimit = batchLimit != null ? Math.max(batchLimit, 1) : undefined; const lastTaskId = await this.getLastTaskId(); this.generateTaskId = tasksUtils.createTaskIdGenerator(lastTaskId); for (const taskHandlerId in handlers) { @@ -447,14 +448,14 @@ class Tasks { }); let promise: () => Promise; if (lazy) { - // create promise when called + // Create promise when called promise = () => this.getTaskPromise(taskId); } else { - // pre-creating the promise and returning that when called + // Pre-creating the promise and returning that when called const newPromise = this.getTaskPromise(taskId, tran); tran.queueFailure((e) => { newPromise.cancel(e); - }) + }); promise = () => newPromise; } this.logger.debug( @@ -476,7 +477,10 @@ class Tasks { } @ready(new tasksErrors.ErrorSchedulerNotRunning()) - public getTaskPromise(taskId: TaskId, tran?: DBTransaction): PromiseCancellable { + public getTaskPromise( + taskId: TaskId, + tran?: DBTransaction, + ): PromiseCancellable { const taskIdString = taskId.toString() as TaskIdString; // This will return a task promise if it already exists const existingTaskPromise = this.taskPromises.get(taskIdString); @@ -489,8 +493,12 @@ class Tasks { // this.cancelTask(taskId) // FIXME: this reject is temporary - reject(new Error('TMP fast rejection for edge case, remove me when task cancellation is implemented')) - } + reject( + new Error( + 'TMP fast rejection for edge case, remove me when task cancellation is implemented', + ), + ); + }; signal.addEventListener('abort', abortListener); const resultListener = (event: TaskEvent) => { signal.removeEventListener('abort', abortListener); @@ -506,12 +514,13 @@ class Tasks { .then( (taskData) => { if (taskData == null) { - this.taskEvents.removeEventListener( - taskIdString, - resultListener, - ); + this.taskEvents.removeEventListener(taskIdString, resultListener); signal.removeEventListener('abort', abortListener); - reject(new tasksErrors.ErrorTaskMissing(`task ${taskId.toMultibase('base32hex')} was not found`)); + reject( + new tasksErrors.ErrorTaskMissing( + `task ${taskId.toMultibase('base32hex')} was not found`, + ), + ); } }, (reason) => reject(reason), @@ -523,6 +532,38 @@ class Tasks { return newTaskPromise; } + // FIXME: Stub for usage, may be replaced. + @ready(new tasksErrors.ErrorSchedulerNotRunning()) + public async getTask( + taskId: TaskId, + lazy: boolean = false, + tran?: DBTransaction, + ): Promise { + throw Error('IMP, not implemented'); + } + + // FIXME: Stub for usage, may be replaced. + @ready(new tasksErrors.ErrorSchedulerNotRunning()) + public async *getTasksByPath( + path: TaskPath, + lazy: boolean = false, + tran?: DBTransaction, + ): AsyncGenerator<[TaskPath, Task]> { + if (tran == null) { + return yield* this.db.withTransactionG((tran) => + this.getTasksByPath(path, lazy, tran), + ); + } + + for await (const [taskPath, taskIdBuffer] of tran.iterator([ + ...this.tasksPathDbPath, + ...path, + ])) { + const taskId = IdInternal.fromBuffer(taskIdBuffer); + yield [taskPath as TaskPath, await this.getTask(taskId, lazy, tran)]; + } + } + /** * Transition tasks from `scheduled` to `queued` */ @@ -597,7 +638,7 @@ class Tasks { } while (queuedTasks >= (this.batchLimit ?? Infinity)); await this.db.withTransactionF(async (tran) => { - // When the transaction commits + // When the transaction commits // trigger the queue // tran.queueSuccess(() => { // this.triggerQueuing(); @@ -940,7 +981,11 @@ class Tasks { // Removing it from active index await tran.del([...this.tasksActiveDbPath, ...kP]); const taskId = IdInternal.fromBuffer(taskIdBuffer); - this.logger.warn(`task ${taskId.toMultibase('base32hex')} was moved from Active to Queued tasks`) + this.logger.warn( + `task ${taskId.toMultibase( + 'base32hex', + )} was moved from Active to Queued tasks`, + ); } // 2. Check for any tasks in the scheduled index that are in the @@ -963,7 +1008,9 @@ class Tasks { // Remove await tran.del([...this.tasksScheduledDbPath, ...kP]); const taskId = IdInternal.fromBuffer(taskIdBuffer); - this.logger.warn(`Duplicate task ${taskId.toMultibase} was removed from scheduled tasks`) + this.logger.warn( + `Duplicate task ${taskId.toMultibase} was removed from scheduled tasks`, + ); } else { // Break from the loop, there shouldn't be more break;