Feature request: non-blocking take! #45902

Open
mjg0 opened this issue Jul 1, 2022 · 11 comments

Comments

@mjg0

mjg0 commented Jul 1, 2022

It would be nice to have a non-blocking take! to pull work from a Channel without hanging.

My particular use case is for a program with small units of work that can't be grouped into larger sets because each iteration relies on the results of the last. With so little to do for each unit of work, spawning a task for every one is a lot of overhead, so I would like to use long-lived tasks that repeatedly pull work from a shared queue until the simulation is finished. Here's some pseudocode demonstrating the concept:

# Create a workqueue
workqueue = Channel{Blah}(blah)
# Spawn worker threads that repeatedly pull work from the queue
for _ in 2:Threads.nthreads()
    Threads.@spawn while true
        try
            work = take!(workqueue)
            dowork(work)
        catch e
            # take! throws InvalidStateException(:closed) once the channel is closed and empty
            e isa InvalidStateException && e.state === :closed && break
            rethrow(e)
        end
    end
end
# Actual work loop
while whatever
    # Put work into the queue (put! takes the channel first, then the value)
    for i in something
        put!(workqueue, i)
    end
    # Do main thread stuff
    dosomethingwhilewaiting()
    # If there's still work, help out
    while workremains # using an atomic int as a counter, for example
        work = nonblockingtake!(workqueue) # <<< THE CALL (the requested function; it does not exist yet)
        work !== nothing && dowork(work)
    end
end
# Tell worker threads we're done
close(workqueue)
# Tell worker threads we're done
close(workqueue)

For this particular case, the body of the loop in which the main thread takes work from the queue could conceptually be replaced by yield(), but that doesn't seem to perform well from what I've seen. Even if yield() could work in that case, though, I have to imagine there are other cases where not having a non-blocking take! makes life harder.

@JeffBezanson
Member

I guess it's possible to write a trytake! that returns Some or Nothing, but it doesn't seem like the best way to synchronize in this case. For example, you could have whoever takes the last item close the channel, then the main thread just waits for the worker threads using @sync. To keep all the threads busy, just spawn more tasks, i.e. 1:nthreads instead of 2:nthreads.
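A minimal sketch of the `trytake!` described above, assuming a buffered `Channel` (capacity greater than zero). The name `trytake!` is hypothetical and not part of Base; the sketch only uses `lock`/`unlock` on the channel, `isready`, and `take!`. Holding the channel's own lock across the check and the take is what stops another task from grabbing the last item in between.

```julia
"""
    trytake!(ch::Channel) -> Union{Some, Nothing}

Hypothetical non-blocking take!: returns `Some(value)` if an item was available,
or `nothing` if the channel is currently empty (or closed and drained).
Assumes a buffered channel, e.g. `Channel{T}(n)` with `n > 0`.
"""
function trytake!(ch::Channel)
    # Hold the channel's lock so no other task can take the last item between
    # the isready check and take! (take! re-acquires the same reentrant lock).
    lock(ch)
    try
        isready(ch) || return nothing   # nothing available right now: don't block
        return Some(take!(ch))          # an item is buffered, so this returns immediately
    finally
        unlock(ch)
    end
end
```

Returning `Some(value)` rather than the bare value keeps a stored `nothing` distinguishable from "queue empty". In the OP's helper loop this would read `w = trytake!(workqueue); w === nothing || dowork(something(w))`, which still spins whenever the queue is empty but items are still in flight on other threads.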

@mjg0
Author

mjg0 commented Jul 1, 2022

Wouldn't that require creating a new Channel for each iteration, though? Maybe that's fine, but it seems like it would be a lot of overhead, both in creating the Channel and passing the new Channel to the running workers.

@fredrikekre
Member

Xref #41966, which implements maybetake! and tryput!.

@JeffBezanson
Member

No, I don't see that it would require multiple Channels.

@JeffBezanson
Member

Note maybetake! in that PR is still blocking.

@mjg0
Author

mjg0 commented Jul 1, 2022

I may be misunderstanding your suggestion--if the Channel is closed when the queue is exhausted on each iteration:

while !done
    workqueue = Channel{Blah}(blah)
    for i=1:something
        put!(workqueue, i)
    end
    # workqueue gets closed once it's exhausted for this iteration
end

...doesn't that mean I'd need another? Or is there a way to reconstitute a Channel after it's closed?

I'm also not sure where I would put an @sync--if the workers persist across iterations, that would require that the number of iterations be known from the start, and that I could put all the work in the queue all at once before starting, right?
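One way to keep a single long-lived queue across an unknown number of iterations, without closing it each time and without busy-waiting, is to have the workers report completions on a second channel that the main task blocks on. This is an illustrative sketch, not the approach proposed in the thread; `run_iterations`, the `done` channel, and the squaring stand-in for `dowork` are all hypothetical.

```julia
# Hypothetical sketch: persistent workers, one work queue reused across iterations.
function run_iterations(niters::Integer, items_per_iter::Integer, nworkers::Integer)
    work = Channel{Int}(Inf)   # one long-lived queue, never recreated
    done = Channel{Int}(Inf)   # workers report each finished item here
    workers = map(1:nworkers) do _
        # `for item in work` ends cleanly once `work` is closed and drained
        Threads.@spawn for item in work
            put!(done, item * item)   # stand-in for dowork(item)
        end
    end
    total = 0
    for _ in 1:niters
        for i in 1:items_per_iter
            put!(work, i)
        end
        # Block (no spinning) until every item queued in this iteration is finished
        for _ in 1:items_per_iter
            total += take!(done)
        end
    end
    close(work)               # closed only once, after the final iteration
    foreach(wait, workers)    # let the workers exit their loops
    return total
end
```

Each iteration's results are all collected before the next batch is queued, which matches the constraint that one iteration depends on the previous one, and the number of iterations does not need to be known up front.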

@elextr

elextr commented Jul 1, 2022

I think the key point @JeffBezanson made above was:

To keep all the threads busy, just spawn more tasks, i.e. 1:nthreads instead of 2:nthreads.

In other words, only the workers do work: spawning one extra worker keeps every thread busy, so the main thread doesn't have to help out and doesn't need a non-blocking take.

[Putting words in Jeff's mouth which he may spit out :-)]

@mjg0
Author

mjg0 commented Jul 1, 2022

@elextr ah, gotcha--that was the original plan, but I'm not sure how to keep the main thread out of the way (rather than just busy-waiting); I tried yield():

...
while workremains
    yield()
end
...

...but it gave very poor performance, with usually about 40% of the time spent in yield itself when I profiled with two threads.

@elextr

elextr commented Jul 1, 2022

...but it gave very poor performance, with usually about 40% of the time spent in yield itself when I profiled with two threads.

That would be expected: it's consuming CPU constantly because it's non-blocking.
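A sketch of replacing the `while workremains; yield(); end` spin with a blocking wait: a small countdown latch built on `Threads.Condition`. The type and function names (`Countdown`, `countdown!`, `await_zero`, `rearm!`) are hypothetical; the idea is that the main task sleeps in `wait` until the last worker decrements the counter to zero.

```julia
# Hypothetical countdown latch that the main task can block on.
mutable struct Countdown
    remaining::Int
    cond::Threads.Condition
end

Countdown(n::Integer) = Countdown(Int(n), Threads.Condition())

# Called by a worker after finishing one work item.
function countdown!(c::Countdown)
    lock(c.cond)
    try
        c.remaining -= 1
        c.remaining == 0 && notify(c.cond)   # wake waiters once the last item is done
    finally
        unlock(c.cond)
    end
    return nothing
end

# Called by the main task instead of spinning on yield().
function await_zero(c::Countdown)
    lock(c.cond)
    try
        while c.remaining > 0
            wait(c.cond)   # releases the lock while blocked; no busy-waiting
        end
    finally
        unlock(c.cond)
    end
    return nothing
end

# Re-arm for the next iteration (only when no worker is still counting down).
function rearm!(c::Countdown, n::Integer)
    lock(c.cond)
    try
        c.remaining = n
    finally
        unlock(c.cond)
    end
    return nothing
end
```

Per iteration, the main task would call `rearm!(latch, n)` before queuing `n` items, each worker calls `countdown!(latch)` after finishing an item, and the main task calls `await_zero(latch)` where it previously spun on `yield()`.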

Make type Blah capable of indicating "last item", @spawn the putter as well as the takers, and have the putter put "last item" when it finishes all the actual work requests. Then whichever task reads "last item" closes the channel, and all the other tasks exit (nicely, not via rethrow IMHO). Then the whole shebang can be waited for by the main thread using @sync.

At least that's what I think Jeff suggested.

[Edit: or use this for the putter]

@JeffBezanson
Copy link
Member

Yes that is what I meant! It probably isn't necessary to define a sentinel type though; you could use nothing or just a separate boolean flag (with synchronization).
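The sentinel scheme described above, using `nothing` as the "last item" marker, might look roughly like this. It is a sketch under assumptions: work items are `Int`s, the "work" is just summing them into a `Threads.Atomic{Int}`, and `sum_with_sentinel` is a hypothetical name. Workers iterate the channel with `for`, which ends quietly once the channel is closed and drained, so no exception handling is needed.

```julia
# Hypothetical sketch of the "last item" sentinel pattern with @sync.
function sum_with_sentinel(items, nworkers::Integer)
    queue = Channel{Union{Int,Nothing}}(32)
    total = Threads.Atomic{Int}(0)
    @sync begin
        # Producer: put every real item, then `nothing` as the "last item" marker.
        Threads.@spawn begin
            for i in items
                put!(queue, i)
            end
            put!(queue, nothing)
        end
        # Workers: loop until the channel is closed and drained.
        for _ in 1:nworkers
            Threads.@spawn for item in queue
                if item === nothing
                    close(queue)   # whoever sees the marker closes the channel
                    break
                end
                Threads.atomic_add!(total, item)   # stand-in for dowork(item)
            end
        end
    end   # main task blocks here until the producer and all workers finish
    return total[]
end
```

Because the channel is FIFO and the producer puts the marker last, every real item has already been taken by the time any worker sees `nothing`; the other workers then fall out of their `for` loops when the channel is closed.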

@JeffBezanson
Member

Also, as the OP indicated, the best solution would be to @spawn each work item (after all, we already have a multi-threaded work queue!). Now, of course that will always be slower than a multi-threaded loop where each thread does many iterations, but if it's much slower than manually pulling items from a channel I think we basically have a performance bug, so we really should fix that.
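For comparison, the per-item `@spawn` approach, with each step depending on all results of the previous one, could be sketched as follows. `advance` and `run_steps_spawned` are hypothetical stand-ins for the OP's `dowork` and outer loop; this shows the structure only and makes no performance claims.

```julia
# Hypothetical per-item work; stands in for the OP's `dowork`.
advance(x::Int) = x + 1

function run_steps_spawned(state::Vector{Int}, nsteps::Integer)
    for _ in 1:nsteps
        # One task per item; Julia's scheduler spreads them over the available threads.
        tasks = [Threads.@spawn advance(x) for x in state]
        # fetch waits for each task; the next step needs all of this step's results.
        state = map(fetch, tasks)
    end
    return state
end
```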
