
distributed Dask queue: producer/consumer #5843

Open
4 tasks
AmineDiro opened this issue Feb 21, 2022 · 5 comments
Comments

@AmineDiro

AmineDiro commented Feb 21, 2022

Hello, 😄

I'm a data scientist/data engineer working on a workflow where I need to process a huge number of documents, so I tried using distributed.Queue to write a producer/consumer pattern with Dask distributed.

I might be missing something about how to correctly implement the producer/consumer pattern with distributed, but here are some features that could be added to distributed.Queue. I would also like to help write this class, so thanks for your guidance:

  • The queue is not garbage collected: calling del q doesn't actually free the distributed memory
  • We may need a distributed q.join() that blocks until all queue items have been processed
  • Batched gets don't have a "last batch" mode that returns fewer objects than batch_size when the queue runs dry
  • Spawning a "pool of consumers" in a distributed system can be tricky: the machines producing are also the ones waiting for tasks to be added? Is there a way to load-balance producers and consumers in the Dask scheduler?
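For reference, the join() and "last batch" semantics requested in the second and third bullets mirror what Python's standard-library queue already offers in a single process. A minimal sketch using stdlib queue.Queue and threading (not distributed.Queue; the drain() helper is hypothetical, illustrating the requested behavior):

```python
import queue
import threading

def drain(q, batch_size, timeout=0.1):
    """Return up to batch_size items; fewer if the queue empties first
    (the 'last batch' mode requested above)."""
    batch = []
    for _ in range(batch_size):
        try:
            batch.append(q.get(timeout=timeout))
        except queue.Empty:
            break
    return batch

q = queue.Queue()
results = []

def consumer():
    while True:
        batch = drain(q, batch_size=4)
        if not batch:
            break  # queue ran dry: stop consuming
        results.extend(item * 2 for item in batch)
        for _ in batch:
            q.task_done()  # lets q.join() unblock once all work is done

for i in range(10):  # producer: 10 items means batches of 4, 4, then 2
    q.put(i)

t = threading.Thread(target=consumer)
t.start()
q.join()   # blocks until every item has been marked done
t.join()
print(sorted(results))
```

A distributed q.join() would need the same task_done() bookkeeping, but tracked on the scheduler rather than in one process.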

Thanks a lot, and I hope these features make sense. I hope I can be of some help in advancing this class!

@fjetter
Member

fjetter commented Feb 21, 2022

Welcome @AmineDiro !

Thanks for your great question. We typically use https://dask.discourse.group/ for questions and discussions around usage; it's more likely to get community responses over there :)

We try to reserve the GitHub tracker for bug reports, feature requests, etc.

@AmineDiro
Author

Hello @fjetter, thanks for your response!

I'll clean up this issue by submitting a clear feature request for distributed.Queue and move my question to the forum!

@pavithraes
Member

@crusaderky Based on your answer on #5671 (thanks for that!), I believe this feature request may be inaccurate and can be closed?

Jim also mentioned that Queues were designed to store intermediate futures, small metadata information, etc., and not actual data which might overload the scheduler.

@crusaderky
Collaborator

crusaderky commented Mar 2, 2022

There is some consensus for the design in #5671, although not a final one.
The design of distributed.Queue is problematic; the OP and Jim listed the reasons above. It is also intrinsically bound to the concept of long-running tasks, which is itself a problematic design, as I explained in the initial post of #5671.
There should be a discussion about the long-term future of Queue, but it should be made clear to users that it is a very bad idea to use it to implement textbook producer/consumer patterns.

@crusaderky
Collaborator

The OP does raise a few valid points. Namely, it is correct that descoping a Queue on the client side, as opposed to explicitly calling close(), creates a memory leak on the cluster. This is definitely a bug: queues should be treated like Future objects.
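Until descoping triggers cleanup, one workaround is to always close the queue explicitly; Python's stdlib contextlib.closing guarantees close() runs even when an exception occurs. A minimal sketch with a hypothetical stand-in class (StubQueue is not distributed.Queue; the thread above only establishes that the real Queue has a close() method):

```python
from contextlib import closing

class StubQueue:
    """Stand-in for distributed.Queue: any object with close() works
    with contextlib.closing. On a real cluster you would construct
    a distributed.Queue instead of this stub."""
    def __init__(self):
        self.closed = False
        self.items = []

    def put(self, item):
        self.items.append(item)

    def close(self):
        self.closed = True  # in distributed, this releases cluster-side state

q = StubQueue()
with closing(q):      # close() is guaranteed on block exit, even on error
    q.put("doc-1")
    q.put("doc-2")

assert q.closed       # relying on `del q` alone would leak instead
```

This makes cleanup explicit and deterministic rather than depending on client-side garbage collection.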
