Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature TaskManager Scheduler and Queue and Context Decorators for Timed and Cancellable #438

Merged
merged 32 commits into from
Sep 13, 2022

Conversation

tegefaulkes
Copy link
Contributor

@tegefaulkes tegefaulkes commented Aug 8, 2022

Description

At the same time, we may apply the DB to new a generic queuing system. Automatic indexing still isn't available yet, and we may only provide that after transparent encryption/decryption is implemented in the DB. So for now any DB usage has to continue using its own indexes. However SI transactions will help realise this, since we can now be user that any reads and writes are all consistent within the transaction body.

Queue does not use WorkerManager, it doesn't know about the nature of the task, instead it executes tasks according to a user-specified concurrency limit, task handlers can decide on their own accord to execute tasks by passing it to the WorkerManager for parallel processing, this allows tasks to be IO-bound to be sent into the scheduler/queue without worrying about it being inefficiently sent to the worker manager

Issues Fixed

Tasks

Asynchronous Promise Cancellation and Timer:

  • 1. Timer class with integration into PromiseCancellable
  • 2. tests for Timer
    • timer can be cancelled without unhandled promise rejection
    • timer defaults to eager but can also be lazily cancelled
    • timer does not keep the event loop live
  • 3. @timed decorator using Timer and AbortSignal
  • 4. tests for the timed decorator testing Signal and AbortController
    • runtime validation
    • decorator syntax
    • decorator expiry
    • decorator propagation
    • explicit cancellation or signal abortion
    • timed error stack provides information on where the decorated function is called
  • 4. timed HOF variant to allow function composition
  • 5. @cancellable decorator using AbortSignal and PromiseCancellable
  • 6. tests for the cancellable decorator
  • 7. cancellable HOF variant
  • [ ] 8. @transactional decorator using DBTransaction - not doing due to Feature TaskManager Scheduler and Queue and Context Decorators for Timed and Cancellable #438 (comment)
  • [ ] 9. tests for @transactional
  • [ ] 10. transactional HOF variant
  • 11. timedCancellable as he combination of timed and cancellable

Tasks System:

  • 1. Scheduler
    • Processing task loop has to involve a while loop (even if it is using an async generator)
    • Processing task loop should be blocked based on a Lock from async-locks
    • Use the term "dispatch" for when the processing task loop dispatches the task to the queue and it has to start an independent transaction context here
    • Processing task loop has to be serialised, and the loop should be started or unblocked when there there a newly scheduled tasks AND when tasks already exist in the scheduled DB
    • Processing task loop should be blocked when it has nothing to do, and the time of blocking should depend on a setTimeout where the delay is set to the "next" task to be scheduled
    • Each iteration of the processing task loop should batch-read tasks from the scheduled index sublevel using the async iterable, and on each read, it should immediately dispatch to to the queue, this should be done with await, but take care not to wait for the task to complete
    • Don't use Date.now() always using performance.now() when comparing task schedule time
    • Use TaskPath to represent the arbitrary label for tasks, so this allows domain to look up multiple tasks based on a specific path. Note that when returning the task data, you will need to return tuples of [KeyPath, Task]... or just yield Task where the path: TaskPath allows users to know what the true path of the task is. The TaskPath should be an alias of LevelPath. Note that this means you can use Array<string | Buffer> to query or schedule new tasks, but the returned TaskPath would always need to be Array<Buffer>.
    • Enable the ability to "reschedule" tasks, by allowing on to update the start time of tasks. This is required by NodeGraph.
    • Scheduling tasks should be eager by default.
    • Add a TaskDeadline to set the deadline of how long the task is allowed to run for. This will be used by the timed HOF to ensure that background tasks are given a specific deadline. Default it to Infinity to allow infinite time.
    • Unscheduling a task is done by allowing users to cancel the task, by first getting the task object, then doing task.promise.cancel().
    • [ ] At the end of PolykeyAgent.start you need to do scheduler.startProcessing(), it should be a lazy creation and lazy start from the beginning to allow handlers to be registered. This is why PolykeyAgent would be creating a lazy scheduler so it doesn't start the processing loop. - this will be done in Integrate TaskManager into NodeGraph and Discovery #445
    • Remove all the getTaskData calls, replace with getTask and getTasks. Furthermore when scheduler and queue has been separated, you'll have scheduler.getTask and queue.getTask, it may just be wrapping the queue.getTask method. The scheduler getTask can get all tasks, but Queue can get only the active and inactive tasks (the queued tasks).
  • 2. Queue
    • Encapsulate an new EventTarget, where you can emit events and listen for events, these events, are going to be about the task fulfillment or rejection.
    • Event handler should only be registered when the task promise is interrogated for using the then operator.
    • The queue sublevels should be 2 partitions - inactive to active. This allows you select the tasks that are still inactive. The inactive partition may actually be just tasks that is shared between the Scheduler and Queue. It's important to distinguish inactive vs active tasks, where active tasks are live in-memory executing tasks.
    • The index into the queue should have the Priority. Have a sublevel like Queue/tasks/PriorityIndex/ScheduleTime/TaskId. This is a static priority index.
    • Event listeners should be using callback style API: (e, result). Then check if if (e != null). Then you can resolve.
  • 3. Task
    • Task should be a POJO record, not a class, in order to communicate that it is a value-level object, not some sort of special domain object. It's not live. It's just data. The original reason for Task to be a class was to make it a promise. But we can't do this due to await s.scheduleTask() awaiting both the scheduling and the processing. Now that we have gone to using a promise property, the original reason is no longer valid, so we can go back to using a record for Task so that is clear that this is a POJO data, and not some code.
    • Users do not get access to TaskData, that's encapsulated within the Task POJO record
    • The promise property is a special property, that when the then is called, that's when the event listeners are assigned in the queue. This can be forced if the scheduling of the task is not lazy. The lazy boolean has to be false by default, meaning eager scheduling is the default.
    • A task can be cancelled, the .promise should be a PromiseCancellable enabling one to do task.promise.cancel(). Doing this is independent of whether the event listeners are attached or not. It needs to be capable of cancelling a task that is only scheduled, or a task that is queued and inactive, or queued and active. The cancellability of the background task is always "lazy". The task is never early rejected.
    • TaskId encoded as string should be base32hex. You need put this as an encoding function into the utils. TaskIdEncoded. This is used when emitting events as the event name must be a string or symbol, can't pass a buffer into it. This will maintain the sortability of the task id.
  • 4. Testing
    • [ ] Start using timer mocks to mock the performance.now() - do not use mocking here, it is too complicated, instead prefer using minimal timeouts between 25 to 1000 millisecond timeouts
    • Apply fast-check here to test for concurrent race conditions, but also for normal behaviour of scheduling lots of tasks at different times
    • Create a tests to check persistence between start/stop and creations.
    • Update the concurrency is enforced test to do the following
      • random list of tasks
      • tasks are randomly scheduled
      • random number of activeLimit
      • random spot checks for the current active tasks <= active limit using set timeout to check.
      • This can be used as a framework for other tests.
    • called exactly 4 times, rename and use fastCheck to use a random set of tasks.
    • can await a task promise resolve/reject, use fast check to randomize tasks and check them, make a 3rd test combining the two.
    • task promise for invalid task should throw should check for a task to fail if no handler exists, should check both no handler existed, or handler was de-registered.
    • Eager and lazy of the promise can not use fast-check, just assign task, give delay, await. If it was eager it should resolve, lazy should reject.
    • Try exposing the Tasks.getTaskPromise.
    • Add a tests for seeding incomplete active tasks and the startup repair.
    • stopping the tasks system should gracefully stop all tasks, this should abort and await for finish.
    • tests for taskPaths if iterating over all or subset of paths
    • tests for getTasks
    • tests for updating tasks
      • while still scheduled
      • still queues
      • delay changes should update timer
      • checking for any transaction conflicts.
    • TaskPromises should be singletons, check if promises gotten from getTaskPromise are the same for the same task.
    • test for explicit cancellation
      • while scheduled
      • queued
      • active
      • check that any existing taskPromises rejects.
    • Check for if scheduled delay is respected
      • check for 0 delay should
      • NaN delay should be an exception.
      • infinity delay should lead to an exception.
    • Schedule many tasks with 0 delay and check if priority is respected.
    • Test for deadlines causing abortion and cleaning up tasks.
    • Check for unique tasks on task ID
    • test with timer mocking to generate tasks, move the clock backwards and check new tasks are strictly greater than. infix operators will work for this.
    • Generic fast check tests for concurrent API usage to check robustness.
    • Test task retrying
    • Benchmarking.

Final checklist

  • Domain specific tests
  • Full tests
  • Updated inline-comment documentation
  • Lint fixed
  • Squash and rebased
  • Sanity check the final build

@ghost
Copy link

ghost commented Aug 8, 2022

👇 Click on the image for a new way to code review
  • Make big changes easier — review code in small groups of related files

  • Know where to start — see the whole change at a glance

  • Take a code tour — explore the change with an interactive tour

  • Make comments and review — all fully sync’ed with github

    Try it now!

Review these changes using an interactive CodeSee Map

Legend

CodeSee Map Legend

@CMCDragonkai
Copy link
Member

Now that the Queue is relieved of scheduling duties, there's a question. What manages the persistence of due tasks? That is tasks that has been dispatched in the sense that they are supposed to execute "now", but they haven't finished yet, so we should persist it in case we need to retry in the event of the agent process shutting down?

At this I was thinking that this means Queue has its own state, where it will maintain a level of pending tasks. But now I'm thinking, the state management can be centralised by the scheduler, while the Queue can be purely in memory.

In such a situation the Queue can be similar to https://nicolas-van.github.io/modern-async/modern-async/1.1.0/Queue.html. And it won't need access to the database.

@CMCDragonkai
Copy link
Member

The TaskPriority will be a number between 0 and 255 inclusive where 0 is the highest priority an 255 is the lowest priority.

However users input an int8 number between -128 to 127, where 0 is the default priority.

This allows us to set a base low priority or base high priority.

The tasksUtils.toPriority and tasksUtils.fromPriority has been implemented to do this.

@CMCDragonkai
Copy link
Member

So in attempting to work out the Task type, I tried to reset the processing timer.

This led me to flesh out timer/TimeOut class.

At this I was attempting to extend Promise<T> as usual.

But I hit some problems with this... namely extending classes also inherits static methods, and TimeOut isn't supposed to be used like a Promise. Therefore instead I followed a Thenable interface by just adding the then method to it. At the same time, my tests with extending the Promise appears to not work as the await just failed with:

TypeError: Promise resolve or reject function is not callable

Not sure what this is about.

At any case, I swapped to working with a promise property instead and creating a deconstructed promise inside the TimeOut object.

Now it is possible to create TimeOut and await on them like a promise, although they are not a promise.

TypeScript does have a special type for "thenable" things. They are PromiseLike interface. Many of the promise functions fulfill actually support PromiseLike and not Promise directly.

This means, we can say that TimeOut implements the PromiseLike interface.

I find that if we were to say that it implements the Promise<T> interface, this would be different from extending the class Promise, it would mean you could also catch it. I may try that later by adding a corresponding catch method.

There was a problem with the constructor types. I want the default type parameter of TimeOut to be void. Because the default handler of TimeOut will actually be a noop handler.

Unfortunately constructor overloading doesn't support generic parameters: microsoft/TypeScript#35387, so it doesn't actually enforce the handler return type atm. There are some solutions that involve some type hackery by separating the constructor type from the instance type, like imagine having a TimeOutConstructor type similar to ArrayConstructor, then you can create overloaded new members with type parameters, but this is not important atm.

Furthermore, there's also a new structuredClone available: https://developer.mozilla.org/en-US/docs/Web/API/structuredClone, which I was hoping to use for Task, but can't yet as it is not available in v16 LTS.

@CMCDragonkai
Copy link
Member

It works:

class TimeOut<T = void> implements Promise<T> {

I wonder if we use only PromiseLike, does await within try catch and finally work?

@CMCDragonkai
Copy link
Member

The TimeOut still requires some metadata on when exactly in the future the handler is due to be processed.

We can do it in 2 ways:

  1. A set millisecond timestamp in the future based on the delay and the current time, most likely using performance.now()
  2. The delta which is the time still needed before processing the timer.

I'm wondering if performance.now() is needed, or does setTimeout use a different time? If we use performance.now() it will be based on the time since the process started which is likely to be more accurate than any kind of clock time which could be reset while the process is live. So I think going with 1. is more accurate.

This means using t.timestamp giving us the the process time to execute. Or t.timestamp meaning when the TimeOut was created, then t.delay indicating how long to delay for. Then we could provide it with respect to real time with performance.timeOrigin.

@CMCDragonkai
Copy link
Member

Ok all changes relating to the PromiseCancellable should be removed in favour of just importing @matrixai/async-cancellable. However I haven't yet released that yet, so you can just remove those changes and focus on the 2 things:

  1. Scheduler, Queue and Task
  2. Timer

The Timer we can move till later, I'll work on the contexts and decorators, and you can take over the scheduler, queue and task.

@CMCDragonkai CMCDragonkai changed the base branch from feature-concurrent-state to staging August 23, 2022 02:22
@CMCDragonkai
Copy link
Member

I've updated the base branch to be staging now.

@tegefaulkes
Copy link
Contributor Author

Ah, I was about to update that. This has been squashed and rebased on staging now.

@CMCDragonkai
Copy link
Member

I'm doing the PromiseCancellable here: MatrixAI/js-async-cancellable#1

@CMCDragonkai
Copy link
Member

Remove utils/PromiseCancellable.ts and remove it from utils/index.ts, and also remove it from tests/utils.test.ts.

@CMCDragonkai
Copy link
Member

There would be 3 domains being prototyped here:

  1. src/tasks
  2. src/timer
  3. src/tracing

Right now timer still requires updates based on the changes I had discovered as part of js-async-cancellable. And the tracing is not yet started.

@CMCDragonkai
Copy link
Member

Both the Task and Timer should not extend the native promise. They aren't actually promises. They should look like them. So they should be implements Promise<T>.

We could also just implement the PromiseLike<T> but it doesn't hurt to also implement the catch and finally methods as well as the Symbol.toStringTag.

@CMCDragonkai
Copy link
Member

I suggest start designing this assuming lazy=false, and then see how we can optimise it with lazy=true.

@tegefaulkes
Copy link
Contributor Author

All the tests are fixed now. Just need to remove the timer domain and use the new library.

@CMCDragonkai
Copy link
Member

CMCDragonkai commented Sep 12, 2022

I ran into a problem with this. When a task failed with ErrorTaskRetry the taskPromise never finished. In the logic, when the handler throws a ErrorTaskRetry then it re-queues the task but never dispatches an event for the taskPromise.

This is intentional. ErrorTaskRetry is only intended to requeue the task, as the task has not actually completed. Anything awaiting on that task promise should just stop evaluation (and nodejs will ignore subsequent code chained on that task promise) if there's no more event listeners and just shutdown the process.

If the domains need to do something else, they should then do whatever fancy work they need by scheduling new tasks.

Plus the order in the PK agent should be:

  1. Create TaskManager
  2. Create Domains
  3. Start Processing
  4. Stop Processing
  5. Stop Tasks
  6. Stop Domains
  7. Stop TaskManager

So during the stopping of the domains and tasks, it's possible to interact with the task manager by scheduling new tasks.

If in the future we have a need to have a way of requeuing a task while also settling the task promise... then a different exception is needed for this, as it also has to decide whether the task promise fulfils or rejects. And this decision should be decided later. The ErrorTaskRetry is simple to understand at this point in the time and should stay the same.

@CMCDragonkai
Copy link
Member

All the tests are fixed now. Just need to remove the timer domain and use the new library.

Yes, the library is released as @matrixai/timer at 1.0.0. The src/timer and tests/timer can be removed in favour of that. I'll still need to update the contexts library in turn to use it and I'm fixing an problem of timer/signal inheritance: #297 (comment)

@tegefaulkes
Copy link
Contributor Author

tegefaulkes commented Sep 12, 2022

Looks like everything has been addressed now. Just need any final changes, clean up and squash the new commits. Then we should be good to merge. I think you're still working on some contexts stuff @CMCDragonkai ?

There are still some tasks un-ticked for tests. They are low priority and will be addressed in a separate issue?

In the mean time I'll be working on #445 .

@CMCDragonkai CMCDragonkai force-pushed the feature-queue branch 6 times, most recently from eaf0281 to 4375f9e Compare September 12, 2022 14:54
@CMCDragonkai
Copy link
Member

The timedCancellable is integrated and working.

tegefaulkes and others added 6 commits September 13, 2022 00:59
…uld do nothing

There are 3 properties for the `timed` wrapper:

A. If timer times out, signal is aborted
B. If signal is aborted, timer is cancelled
C. If timer is owned by the wrapper, then it must be cancelled when the
target finishes

There are 4 cases where the wrapper is used and where the properties are
applied:

  1. Nothing is inherited - A B C
  2. Signal is inherited - A B C
  3. Timer is inherited - A
  4. Both signal and timer are inherited - A*

B and C are only applied to case 1 and 2, because that's when the `Timer`
is owned by the wrapper.

*Case 4 is a special case, because the timer and signal are inherited,
so it is assumed that the handlers are already setup betwen the timer
and signal.
…factored out common functionality in contexts domain
@CMCDragonkai
Copy link
Member

CMCDragonkai commented Sep 12, 2022

@tegefaulkes

Ok I see there's still some stuff to clean up in the tests. This is in the latest WIP commit.

Will review the tasks tests tomorrow in case anything else needs change.

@tegefaulkes in the mean time, can you make sure that the CI/CD is passing for this branch right now, so it is ready for merge to staging?

@CMCDragonkai CMCDragonkai marked this pull request as ready for review September 12, 2022 15:05
@tegefaulkes
Copy link
Contributor Author

Cleaned up TaskManager.test.ts and fixed the type bug with Discovery. All tests are passing locally so we should be good after a CI run.

@CMCDragonkai CMCDragonkai changed the title Feature tasks scheduler and queue Feature TaskManager Scheduler and Queue and Context Decorators for Timed and Cancellable Sep 13, 2022
@tegefaulkes
Copy link
Contributor Author

All tests are passing except for a single timeout in check:test nat.

  ● endpoint independent NAT traversal › node1 behind EIM NAT connects to node2 behind EIM NAT via seed node
    thrown: "Exceeded timeout of 40000 ms for a test.
    Use jest.setTimeout(newTimeout) to increase the timeout value, if this is a long-running test."
      352 |     globalThis.defaultTimeout * 2,
      353 |   );
    > 354 |   testUtils.testIf(supportsNatTesting)(
          |                                       ^
      355 |     'node1 behind EIM NAT connects to node2 behind EIM NAT via seed node',
      356 |     async () => {
      357 |       const {
      at tests/nat/endpointIndependentNAT.test.ts:354:39
      at Object.<anonymous> (tests/nat/endpointIndependentNAT.test.ts:15:1)

I've re-triggered it but since it's not the scope of this PR I think we can merge.

@tegefaulkes tegefaulkes merged commit a722510 into staging Sep 13, 2022
@tegefaulkes
Copy link
Contributor Author

Merged!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
2 participants