
fix: destroy issue and use maximum workers #44

Merged
merged 7 commits into from
Jan 26, 2023

Conversation

Aslemammad
Member

@Aslemammad Aslemammad commented Jan 13, 2023

Resolves #43
Resolves #42

@overlookmotel I did the maximumWorkers trick, but I don't have a reproduction for the halting issue; I'd like to see one so I can write a proper test case for it.

@Aslemammad Aslemammad changed the base branch from current to main January 13, 2023 17:08
@@ -263,3 +264,22 @@ test('workerId should never be duplicated', async () => {
await pool.destroy()
await sleep(5000)
}, 30000)

/* test('isolateWorkers: true with minThreads of 0 should not halt(#42)', async () => {
const minThreads = 0,
Member Author


@overlookmotel see this bit, it's 0 but it does not halt.


Thanks for working on this. I'll try to put together a failing test. I definitely saw this halting behavior in "real world". But give me a few days - work is crazy at the moment.


@overlookmotel overlookmotel Jan 14, 2023


Ah I think I know what it is. This bug occurs when you have tasks waiting in the queue when a task completes. It never drains the queue.

If you add maxThreads + 1 tasks to the queue synchronously, then you'll see the last task never gets run.

```js
const promises = [];
for (let i = 0; i < maxThreads + 1; i++) {
  promises.push(pool.run({}));
}
const promiseForAll = Promise.all(promises);
await promiseForAll; // `promiseForAll` never resolves
```

I just tried to create a PR with a failing test, but I've already forked piscina, so GitHub won't let me fork tinypool too - you can't fork a fork as well as the origin, it seems.

@@ -1,6 +1,6 @@
'use strict'


I think you can remove 'use strict' if this file is ESM.

src/index.ts Outdated
```diff
@@ -843,7 +843,7 @@ class ThreadPool {
     // When `isolateWorkers` is enabled, remove the worker after task is finished
     if (this.options.isolateWorkers && taskInfo.workerInfo) {
       this._removeWorker(taskInfo.workerInfo)
-      this._ensureMinimumWorkers()
+      this._ensureMaximumWorkers()
```

@overlookmotel overlookmotel Jan 14, 2023


This will solve the problem, but will create more workers than required.

```js
const pool = new Tinypool({
  filename: 'worker.js',
  minThreads: 0,
  maxThreads: 8,
  isolateWorkers: true,
});

await pool.run({});
```

In above example:

  1. Pool is created with no workers initially.
  2. pool.run() creates 1 worker to run the task.
  3. When task completes, _ensureMaximumWorkers() creates 8 new workers.
  4. None of these workers are ever given a task.
  5. They'll shut themselves down again shortly after starting (depending on idleTimeout option).

A more ideal solution would be that when a task completes:

  • Check if either:
    • there are any tasks in the queue which don't already have a worker being started for them
    • or number of workers is less than minThreads.
  • If neither condition is true, do nothing.
  • If either condition is true, create only 1 new worker.
  • When that worker starts it will run the next queued task (if there is one).

The first step is the tricky part. I don't think it's as simple as `this.taskQueue.length > 0`, because of the circumstance where there are tasks in the queue, but they all already have a new worker being started for them.

I tried, but I can't figure out the logic of how a task is marked as "awaiting worker to start".
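The replenishment check described above could be sketched roughly like this. This is a standalone illustration, not tinypool's actual code: `taskQueue`, `startingWorkerCount`, `workers`, and `_addNewWorker` are hypothetical names for the state the real pool would need to track.

```javascript
// Hypothetical sketch of the "create at most one worker" idea.
// All field names on `pool` are illustrative, not tinypool's API.
function maybeAddWorkerAfterTaskCompletes(pool) {
  // Queued tasks that no already-starting worker is earmarked for.
  const unservedTasks = pool.taskQueue.length - pool.startingWorkerCount;
  const belowMinimum = pool.workers.size < pool.options.minThreads;

  if (unservedTasks > 0 || belowMinimum) {
    // Create exactly one worker; once it comes online it will
    // pick up the next queued task, if there is one.
    pool._addNewWorker();
  }
}
```

The hard part this glosses over is exactly what the comment above says: knowing `startingWorkerCount`, i.e. which queued tasks already have a worker being started for them.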

Comment on lines +605 to +612
```ts
_ensureEnoughWorkersForTaskQueue(): void {
  while (
    this.workers.size < this.taskQueue.size &&
    this.workers.size < this.options.maxThreads
  ) {
    this._addNewWorker()
  }
}
```
Member Author


@overlookmotel Do you think this solution is an optimal one?

Member Author


I believe it's not wrong to create workers even for tasks that already have one being started, since we don't have that info! And I think the second condition puts a limit on the number of workers, which can be helpful. I'd like to hear your opinion.
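As a sanity check, the effect of the loop can be modeled with a toy counter (a standalone sketch, not tinypool code):

```javascript
// Toy model of _ensureEnoughWorkersForTaskQueue: workers grow
// until they match the queue size or hit maxThreads.
function ensureEnoughWorkers(workerCount, queueSize, maxThreads) {
  while (workerCount < queueSize && workerCount < maxThreads) {
    workerCount++; // stands in for this._addNewWorker()
  }
  return workerCount;
}
```

With `maxThreads = 8`, nine queued tasks cap out at 8 workers, while three queued tasks get exactly 3, so unlike `_ensureMaximumWorkers` the loop never creates more workers than there are queued tasks.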

Member Author


@overlookmotel Let me know if you're there.


I'm really sorry but I can't help more. As I mentioned on #42, I am very tight for time. The complexity (which we're discussing here) was exactly why I didn't feel I could make a PR myself. I find piscina's code pretty hard to disentangle.

But my guess is that your solution above is not quite right, and will create new workers when none are actually needed in some cases.

My suggestion would be to look at the existing code for how it's handled when a thread is terminated due to use of an AbortController. That results in the same situation (worker removed from pool; do we need to add a new one?), so I imagine the code for that contains a good solution.

Member Author


Sure! OK, thank you so much! I'll investigate it.

Member Author


In the AbortController handling, the ensureMinimumWorkers approach is used, which is not right for our case!
I think let's stick with the _ensureEnoughWorkersForTaskQueue approach for now and see how it goes! Thanks for your help.

@Aslemammad Aslemammad linked an issue Jan 26, 2023 that may be closed by this pull request
@Aslemammad Aslemammad merged commit 03fbfdf into main Jan 26, 2023
@Aslemammad Aslemammad deleted the fix-destroy-and-maximum-workers branch January 26, 2023 21:52
@purepear

Hi there 👋 , I followed your tweet and tested the latest Vitest 0.28.3 which uses tinypool 0.3.1.

I raised this PR that adds a new GHA workflow:

  1. The tests ran using 1 worker (I visually observed the tests go one by one) and took 1m 16s.
  2. I set minThreads and maxThreads to 2 and ran the tests again; this time it took 50s and I could observe the tests running 2 by 2.

```js
maxThreads: process.env.CI ? 2 : undefined,
minThreads: process.env.CI ? 2 : undefined,
```

Let me know if I can further help with the debugging.
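For reference, a minimal sketch of where such overrides sit in a Vitest config file. This assumes Vitest's `test.minThreads`/`test.maxThreads` options and is an illustration, not the exact config from the PR above:

```javascript
// vitest.config.js — sketch; assumes Vitest's test.minThreads /
// test.maxThreads options, which are forwarded to tinypool.
import { defineConfig } from 'vitest/config'

export default defineConfig({
  test: {
    // Pin the pool to 2 workers on CI; let tinypool decide locally.
    maxThreads: process.env.CI ? 2 : undefined,
    minThreads: process.env.CI ? 2 : undefined,
  },
})
```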

Successfully merging this pull request may close these issues.

Errors, when destroying a pool
isolateWorkers option can result in not utilizing all CPUs
3 participants