Skip to content

Commit

Permalink
wip prototyping queue, tasks and timer
Browse files Browse the repository at this point in the history
  • Loading branch information
tegefaulkes committed Aug 8, 2022
1 parent 3c1b8c8 commit d1f2217
Show file tree
Hide file tree
Showing 13 changed files with 1,248 additions and 10 deletions.
20 changes: 10 additions & 10 deletions benches/gitgc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import path from 'path';
import b from 'benny';
import { suiteCommon } from './utils';

async function main () {
async function main() {
let map = new Map();
let obj = {};
let arr: any = [];
Expand All @@ -18,10 +18,10 @@ async function main () {
for (let i = 0; i < 1000; i++) {
map.delete(i);
}
for (const i of map) {
for (const _i of map) {
// NOOP
}
}
};
}),
b.add('obj', async () => {
obj = {};
Expand All @@ -32,26 +32,26 @@ async function main () {
for (let i = 0; i < 1000; i++) {
delete obj[i];
}
for (const i in obj) {
for (const _i in obj) {
// NOOP
}
};
}),
b.add('arr', async () => {
// you first have to count the number of objects
// You first have to count the number of objects
arr = [];
return async () => {
// you have to iterate for each object
// You have to iterate for each object
// then for each value in length
for (let i = 0; i < 1000; i++) {
if (i === arr.length) {
// double the vector
// Double the vector
arr.length = arr.length * 2 || 2;
}
arr[i] = { id: i, mark: false };
// arr.push({ id: i, mark: false});
// Arr.push({ id: i, mark: false});
}
// this has to iterate the length of the array
// This has to iterate the length of the array
// but stop as soon as it reaches the end
// it gets complicate, but for 5x improvement
// it could be interesting
Expand All @@ -74,7 +74,7 @@ async function main () {
for (let i = 0; i < 1000; i++) {
set.delete(i);
}
for (const i of set) {
for (const _i of set) {
// NOOP
}
};
Expand Down
165 changes: 165 additions & 0 deletions src/tasks/Queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import type { DB, DBTransaction, LevelPath } from '@matrixai/db';
import type { TaskIdString } from './types';
import type { PolykeyWorkerManagerInterface } from '../workers/types';
import type { PromiseDeconstructed } from '../types';
import Logger from '@matrixai/logger';
import {
CreateDestroyStartStop,
ready,
} from '@matrixai/async-init/dist/CreateDestroyStartStop';
import * as tasksUtils from './utils';
import * as tasksErrors from './errors';

interface Queue extends CreateDestroyStartStop {}
@CreateDestroyStartStop(
new tasksErrors.ErrorQueueRunning(),
new tasksErrors.ErrorQueueDestroyed(),
)
class Queue {

public static async createQueue({
db,
logger = new Logger(this.name),
fresh = false,
}: {
db: DB;
logger?: Logger;
fresh?: boolean;
}) {
logger.info(`Creating ${this.name}`);
const queue = new this({ db, logger });
await queue.start({ fresh });
logger.info(`Created ${this.name}`);
return queue;
}

protected logger: Logger;
protected db: DB;
protected workerManager?: PolykeyWorkerManagerInterface;
protected queueDbPath: LevelPath = [this.constructor.name];

// when the queue to execute the tasks
// the task id is generated outside
// you don't get a task id here
// you just "push" tasks there to be executed
// this is the "shared" set of promises to be maintained
protected promises: Map<TaskIdString, PromiseDeconstructed<any>> = new Map();

// /**
// * Listeners for task execution
// * When a task is executed, these listeners are synchronously executed
// * The listeners are intended for resolving or rejecting task promises
// */
// protected listeners: Map<TaskIdString, Array<TaskListener>> = new Map();

public constructor({
db,
logger
}: {
db: DB;
logger: Logger
}) {
this.logger = logger;
this.db = db;
}

public setWorkerManager(workerManager: PolykeyWorkerManagerInterface) {
this.workerManager = workerManager;
}

public unsetWorkerManager() {
delete this.workerManager;
}

public async start({
fresh = false,
}: {
fresh?: boolean;
} = {}): Promise<void> {
this.logger.info(`Starting ${this.constructor.name}`);
if (fresh) {
await this.db.clear(this.queueDbPath);
}
this.logger.info(`Started ${this.constructor.name}`);
}

public async stop(): Promise<void> {
this.logger.info(`Stopping ${this.constructor.name}`);
this.logger.info(`Stopped ${this.constructor.name}`);
}

public async destroy() {
this.logger.info(`Destroying ${this.constructor.name}`);
await this.db.clear(this.queueDbPath);
this.logger.info(`Destroyed ${this.constructor.name}`);
}

// promises are "connected" to events

/**
* IF a handler does not exist
* if the task is executed
* then an exception is thrown
* if listener exists, the exception is passed into this listener function
* if it doesn't exist, then it's just a reference exception in general, this can be logged
* There's nothing else to do
*/
@ready(new tasksErrors.ErrorSchedulerNotRunning())
protected registerListener(
taskId: TaskId,
taskListener: TaskListener
): void {
const taskIdString = taskId.toString() as TaskIdString;
const taskListeners = this.listeners.get(taskIdString);
if (taskListeners != null) {
taskListeners.push(taskListener);
} else {
this.listeners.set(taskIdString, [taskListener]);
}
}

@ready(new tasksErrors.ErrorSchedulerNotRunning())
protected deregisterListener(
taskId: TaskId,
taskListener: TaskListener
): void {
const taskIdString = taskId.toString() as TaskIdString;
const taskListeners = this.listeners.get(taskIdString);
if (taskListeners == null || taskListeners.length < 1) return;
const index = taskListeners.indexOf(taskListener);
if (index !== -1) {
taskListeners.splice(index, 1);
}
}

}

export default Queue;


// epic queue
// need to do a couple things:
// 1. integrate fast-check
// 2. integrate span checks
// 3. might also consider span logs?
// 4. open tracing observability
// 5. structured logging
// 6. async hooks to get traced promises to understand the situation
// 7. do we also get fantasy land promises? and async cancellable stuff?
// 8. task abstractions?
// need to use the db for this
// 9. priority structure
// 10. timers
// abort controller

// kinetic data structure
// the priority grows as a function of time
// order by priority <- this thing has a static value
// in a key value DB, you can maintain sorted index of values
// IDs can be lexicographically sortable

// this is a persistent queue
// of tasks that should be EXECUTED right now
// the scheduler is a persistent scheduler of scheduled tasks
// tasks get pushed from the scheduler into the queue
// the queue connects to the WorkerManager
Loading

0 comments on commit d1f2217

Please sign in to comment.