Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add maybetake! and tryput! #41966

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open

add maybetake! and tryput! #41966

wants to merge 5 commits into from

Conversation

tkf
Copy link
Member

@tkf tkf commented Aug 23, 2021

This PR adds two APIs for the channel types:

maybetake!(channel) -> Some(value) or nothing
tryput!(channel, v) -> success::Bool

Motivation

Currently, handling closed channel is rather tedious:

try
    value = take!(channel)
    # handle nonempty channel
catch err
    if isa(err, InvalidStateException) && err.state === :closed
        # handle empty channel
    end
    rethrow()
end

Furthermore, it arguably relies on the implementation detail of Channel since it is not specified when InvalidStateException is thrown and what the err.state field means. More importantly, it makes hard to implement AbstractChannel without committing to the exception-based close-event handling.

To fix the issue, this PR adds two APIs maybetake! and tryput!.

This pattern composes well. For example, it makes implementing the same API for RemoteChannel very straightforward. We can now trivially derive iterate for arbitrary channels (close #33555).

Naming

The asymmetry in the function names is for emphasizing that returned value has different meaning for take and put. Although we could have maybeput!(channel, v) -> Some(v) or nothing, it is likely overly monadic for most of the use cases; e.g., if tryput!(...) seems to be a good idiom to support. However, it is important that it is possible to add such functions. For generic collections, it is meaningful to have, e.g., maybepush!(xs::T, x) -> ans::Union{Some{T},Nothing}. This will be useful for composable mutate-or-widen pattern and can play an important role when exposing immutable collection support #31630 in an extensible manner (ref: BangBang.jl; Efficient and safe approaches to mutation in data parallelism).

Note that the naming scheme in the PR is consistent with trylock but not with tryparse. If this PR is merged, it may be a good idea to add the recommendation of using maybe prefix for Union{Some,Nothing}-valued function.

An alternative approach may be to add trytake!(ch1) -> (success, value or nothing). However, there is no pre-existing function with this type of returned value with the try prefix (note: @atomicreplace does not have the try prefix). Furthermore, Union{Some,Nothing}-valued functions compose well with @something:

while true
    v = @something(
        maybetake!(ch1),
        maybetake!(ch2),
        break,
    )
    process(v)
end

"Non-blocking" variant is outside of the scope

Note also that this PR does not implement the "non-blocking" version of put/take. However, it is conceivable to add maybetake_nowait! and tryput_nowait! in the future (with a warning in the docstring that they are "racy" APIs and it is discouraged to use them, unless you know what you are doing.)

base/channels.jl Outdated Show resolved Hide resolved
Co-authored-by: Julian Samaroo <[email protected]>
@vtjnash
Copy link
Member

vtjnash commented Aug 25, 2021

I am not sure I understand the use case or symmetry with tryput, but maybetake seems quite useful (instead of calling it iterate(foo))

@tkf
Copy link
Member Author

tkf commented Aug 25, 2021

I think tryput! is useful when the taker dictates the "length" of the channel (e.g., taking the first N items from an infinite stream). It's also useful for "request-reply pattern" where you can use close as a "negative acknowledgement". For example, here's a unique ID server implementation:

idserver() =
    Channel{Channel{Int}}(Inf) do request
        for i in 1:typemax(Int)
            while true
                reply = @something(maybetake!(request), return)
                tryput!(reply, i) && break
            end
        end
    end

with client API

function getid_promise(request)
    reply = Channel{Int}()
    put!(request, reply)
    return reply
end

getid(request) = take!(getid_promise(request))

function idserver(f)
    request = idserver()
    try
        f(request)
    finally
        close(request)
    end
end

The client can call close(reply) to indicate that it lost the interest on the reply:

using Test

idserver() do request
    @test getid(request) == 1
    @test getid(request) == 2
    close(getid_promise(request))  # cancel
    @test getid(request) == 3  # id not wasted
end

For an ID server, it's probably not required to ensure that IDs are not wasted. But imagine writing a lock server where you need to ensure that lock requester takes the reply and hence eventually calls unlock.

@tkf
Copy link
Member Author

tkf commented Aug 26, 2021

Here's what I meant by lock server. This is from Concurrent Programming in ML. This is not really an efficient way of writing a lock. But I think it is a good example for demonstrating non-trivial channel-based programming. In particular, you can see tryput! in various places.

It has a server task behind a channel managing a set of locks identified by an integer. The client sends AcquireRequest and ReleaseRequest to acquire and release the lock respectively:

struct AcquireRequest
    id::Int
    reply::Channel{Nothing}
end

struct ReleaseRequest
    id::Int
end

const LockRequest = Union{AcquireRequest,ReleaseRequest}

lockserver() =
    Channel{LockRequest}(Inf) do request
        locked = Set{Int}()
        waiters = Dict{Int,Vector{Channel{Nothing}}}()
        try
            while true
                msg = @something(maybetake!(request), return)
                if msg isa AcquireRequest
                    (; id, reply) = msg
                    try
                        if id in locked
                            push!(get!(() -> Channel{Nothing}[], waiters, id), reply)
                        else
                            # Don't lock it if the client is not taking it:
                            if tryput!(reply, nothing)
                                push!(locked, id)
                            end
                        end
                    catch
                        close(reply)
                        rethrow()
                    end
                elseif msg isa ReleaseRequest
                    list = get(waiters, msg.id, nothing)
                    if list !== nothing
                        while !isempty(list)
                            reply = popfirst!(list)
                            if tryput!(reply, nothing)
                                break
                            end
                        end
                        if isempty(list)
                            delete!(waiters, msg.id)
                            list = nothing
                        end
                    end
                    if list === nothing
                        delete!(locked, msg.id)
                    end
                end
            end
        finally
            for replies in values(waiters)
                foreach(close, replies)
            end
        end
    end

The client-side functions can be wrapped in an AbstractLock API:

struct LockEndpoint <: Base.AbstractLock
    request::Channel{LockRequest}
    id::Int
end

function locking(lck::LockEndpoint)
    reply = Channel{Nothing}()
    msg = AcquireRequest(lck.id, reply)
    put!(lck.request, msg)
    return reply
end

function Base.lock(lck::LockEndpoint)
    reply = locking(lck)
    try
        take!(reply)
    finally
        close(reply)
    end
end

function Base.unlock(lck::LockEndpoint)
    msg = ReleaseRequest(lck.id)
    put!(lck.request, msg)
end

function lockserver(f)
    request = lockserver()
    id = Threads.Atomic{Int}(0)
    makelock() = LockEndpoint(request, Threads.atomic_add!(id, 1))
    try
        f(makelock)
    finally
        close(request)
    end
end

Here's a simple usage/smoke test:

lockserver() do makelock
    l1 = makelock()
    l2 = makelock()
    donothing() = nothing
    lock(donothing, l1)
    lock(donothing, l2)
    lock(donothing, l1)
    lock(donothing, l1)
    lock(donothing, l2)
end

Here's an example for sending the acquire requests through multiple endpoints and then acquire at most one lock and cancel others:

function lockfirst(locks::AbstractVector{LockEndpoint})
    winner = Channel{Pair{LockEndpoint,Channel{Nothing}}}(1)
    replies = Channel{Channel{Nothing}}(Inf)
    local wl
    @sync begin
        for l in locks
            @async try
                reply = locking(l)
                if !tryput!(replies, reply)
                    close(reply)
                    return
                end
                try
                    wait(reply)
                catch
                    return
                end
                tryput!(winner, l => reply) || close(reply)
            catch
                close(winner)
                rethrow()
            end
        end
        try
            wl, reply = fetch(winner)
            close(winner)  # cancel others
            take!(reply)  # take the lock
        finally
            close(replies)
            foreach(close, replies)
            close(winner)
        end
    end
    return wl
end

lockserver() do makelock1
    lockserver() do makelock2
        lockserver() do makelock3
            locks = [
                [makelock1() for _ in 1:2]
                [makelock2() for _ in 1:2]
                [makelock3() for _ in 1:2]
            ]
            ntasks = 100
            stats = Vector{Vector{Pair{UInt,Int}}}(undef, ntasks)
            @sync begin
                for i in 1:ntasks
                    Threads.@spawn begin
                        locked = Dict{UInt,Int}()
                        for _ in 1:100
                            l = lockfirst(locks)
                            unlock(l)

                            k = objectid(l)
                            locked[k] = get!(locked, k, 0) + 1
                        end
                        stats[i] = sort!(collect(locked))
                    end
                end
            end
            @info "Stats" stats
        end
    end
end

@vtjnash
Copy link
Member

vtjnash commented Nov 12, 2021

In terms of API, perhaps this would be more consistent to call trytake!, to go with the pattern set by tryparse and trylock and tryput!, instead of going with the pattern of Base.maybeview (to exhaustively list all functions that appear to currently match that naming pattern)?

base/channels.jl Outdated Show resolved Hide resolved
@vtjnash vtjnash changed the title RFC: Add maybetake! and tryput! add maybetake! and tryput! Nov 12, 2021
@vtjnash
Copy link
Member

vtjnash commented Nov 12, 2021

Removing RFC, since the function is useful, though may need a decision to be made on the name

@tkf
Copy link
Member Author

tkf commented Nov 13, 2021

x-ref #34821 for naming

@jonas-schulze
Copy link
Contributor

In terms of API, perhaps this would be more consistent to call trytake!, to go with the pattern set by tryparse and trylock and tryput!, instead of going with the pattern of Base.maybeview (to exhaustively list all functions that appear to currently match that naming pattern)?

I agree that consistency is important, but I'm somewhat sad that Union{Some,Nothing}, which is essentially the same as Maybe from Haskell, follows the try* naming scheme instead of maybe*. 😢

@vtjnash
Copy link
Member

vtjnash commented Mar 15, 2022

bump?

@tkf
Copy link
Member Author

tkf commented Apr 25, 2022

I'm suggesting to use Union{Ok,Err}-valued functions for this. See #45080

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants