Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
CMCDragonkai committed Aug 11, 2022
1 parent 1bbea4b commit c715cf3
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 50 deletions.
112 changes: 70 additions & 42 deletions src/tasks/Queue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import type { DB, DBTransaction, LevelPath } from '@matrixai/db';
import type { TaskIdString } from './types';
import type { PolykeyWorkerManagerInterface } from '../workers/types';
import type { TaskId, TaskIdString, TaskPriority, TaskFunction } from './types';
import type { PromiseDeconstructed } from '../types';
import Logger from '@matrixai/logger';
import {
Expand All @@ -16,35 +15,35 @@ interface Queue extends CreateDestroyStartStop {}
new tasksErrors.ErrorQueueDestroyed(),
)
class Queue {

public static async createQueue({
db,
concurrencyLimit = Number.POSITIVE_INFINITY,
logger = new Logger(this.name),
fresh = false,
}: {
db: DB;
concurrencyLimit?: number;
logger?: Logger;
fresh?: boolean;
}) {
logger.info(`Creating ${this.name}`);
const queue = new this({ db, logger });
const queue = new this({ db, concurrencyLimit, logger });
await queue.start({ fresh });
logger.info(`Created ${this.name}`);
return queue;
}

public concurrencyLimit: number;

protected logger: Logger;
protected db: DB;
protected workerManager?: PolykeyWorkerManagerInterface;
protected queueDbPath: LevelPath = [this.constructor.name];

// when the queue to execute the tasks
// the task id is generated outside
// you don't get a task id here
// you just "push" tasks there to be executed
// this is the "shared" set of promises to be maintained
protected promises: Map<TaskIdString, PromiseDeconstructed<any>> = new Map();

// /**
// * Listeners for task execution
// * When a task is executed, these listeners are synchronously executed
Expand All @@ -54,23 +53,18 @@ class Queue {

public constructor({
db,
concurrencyLimit,
logger
}: {
db: DB;
concurrencyLimit: number,
logger: Logger
}) {
this.logger = logger;
this.concurrencyLimit = concurrencyLimit;
this.db = db;
}

public setWorkerManager(workerManager: PolykeyWorkerManagerInterface) {
this.workerManager = workerManager;
}

public unsetWorkerManager() {
delete this.workerManager;
}

public async start({
fresh = false,
}: {
Expand All @@ -96,6 +90,40 @@ class Queue {

// promises are "connected" to events

// when tasks are "dispatched" to the queue
// they are actually put into a persistent system
// then we proceed to execution

// a task here is a function
// this is already managed by the Scheduler
// along with the actual function itself?
// we also have a priority

// t is a task
// but it's actually just a function
// and in this case
// note that we are "passing" in the parameters at this point
// but it is any function
// () => taskHandler(parameters)

// it returns a "task"
// that should be used like a "lazy" promise
// the actual task function depends on the situation
// don't we need to know actual metadata
// wait a MINUTE
// if we are "persisting" it
// do we persist it here?

public async pushTask(taskF: TaskFunction, priority: TaskPriority) {
// this needs to proceed to push it into an in-memory queue
// and maintain a concurrency limit?
// my issue is that whatever does the persistence
// will need to execute it with the parmaeters and the task handler
// so by the time it is in memory as a `taskF`
// then no persistence makes sense anymore
}


/**
* IF a handler does not exist
* if the task is executed
Expand All @@ -104,33 +132,33 @@ class Queue {
* if it doesn't exist, then it's just a reference exception in general, this can be logged
* There's nothing else to do
*/
@ready(new tasksErrors.ErrorSchedulerNotRunning())
protected registerListener(
taskId: TaskId,
taskListener: TaskListener
): void {
const taskIdString = taskId.toString() as TaskIdString;
const taskListeners = this.listeners.get(taskIdString);
if (taskListeners != null) {
taskListeners.push(taskListener);
} else {
this.listeners.set(taskIdString, [taskListener]);
}
}

@ready(new tasksErrors.ErrorSchedulerNotRunning())
protected deregisterListener(
taskId: TaskId,
taskListener: TaskListener
): void {
const taskIdString = taskId.toString() as TaskIdString;
const taskListeners = this.listeners.get(taskIdString);
if (taskListeners == null || taskListeners.length < 1) return;
const index = taskListeners.indexOf(taskListener);
if (index !== -1) {
taskListeners.splice(index, 1);
}
}
// @ready(new tasksErrors.ErrorSchedulerNotRunning())
// protected registerListener(
// taskId: TaskId,
// taskListener: TaskListener
// ): void {
// const taskIdString = taskId.toString() as TaskIdString;
// const taskListeners = this.listeners.get(taskIdString);
// if (taskListeners != null) {
// taskListeners.push(taskListener);
// } else {
// this.listeners.set(taskIdString, [taskListener]);
// }
// }

// @ready(new tasksErrors.ErrorSchedulerNotRunning())
// protected deregisterListener(
// taskId: TaskId,
// taskListener: TaskListener
// ): void {
// const taskIdString = taskId.toString() as TaskIdString;
// const taskListeners = this.listeners.get(taskIdString);
// if (taskListeners == null || taskListeners.length < 1) return;
// const index = taskListeners.indexOf(taskListener);
// if (index !== -1) {
// taskListeners.splice(index, 1);
// }
// }

}

Expand Down
8 changes: 0 additions & 8 deletions src/tasks/Scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,6 @@ class Scheduler {
return this.processingTimer != null;
}

public setWorkerManager(workerManager: PolykeyWorkerManagerInterface): void {
this.queue?.setWorkerManager(workerManager);
}

public unsetWorkerManager(): void {
this.queue?.unsetWorkerManager();
}

public async start({
handlers = {},
delay = false,
Expand Down
12 changes: 12 additions & 0 deletions src/tasks/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ type TaskHandler<
R = any
> = (...params: P) => Promise<R>;

type TaskPriority = number;


/**
* Task function is the result of a lambda abstraction of applying
* `TaskHandler` to its respective parameters
* This is what gets executed
*/
type TaskFunction<T> = () => Promise<T>;


// type TaskListener = Callback<[taskResult: any], void>;
// Make Task something that can be awaited on
// but when you "make" a promise or reference it
Expand Down Expand Up @@ -68,5 +79,6 @@ export type {
TaskInfo,
TaskHandlerId,
TaskHandler,
TaskPriority,
// TaskListener
};

0 comments on commit c715cf3

Please sign in to comment.