Skip to content

Commit

Permalink
Implement Scherer & Scott dual queue (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
tkf authored Sep 29, 2021
1 parent 0621002 commit d270582
Show file tree
Hide file tree
Showing 11 changed files with 355 additions and 6 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ ConcurrentCollections.jl provides the following lock-free collections for Julia
≥ 1.7:

* [`DualLinkedConcurrentRingQueue`](https://juliaconcurrent.github.io/ConcurrentCollections.jl/dev/#ConcurrentCollections.DualLinkedConcurrentRingQueue)
* [`DualLinkedQueue`](https://juliaconcurrent.github.io/ConcurrentCollections.jl/dev/#ConcurrentCollections.DualLinkedQueue)
* [`LinkedConcurrentRingQueue`](https://juliaconcurrent.github.io/ConcurrentCollections.jl/dev/#ConcurrentCollections.LinkedConcurrentRingQueue)
* [`ConcurrentQueue`](https://juliaconcurrent.github.io/ConcurrentCollections.jl/dev/#ConcurrentCollections.ConcurrentQueue)
* [`ConcurrentStack`](https://juliaconcurrent.github.io/ConcurrentCollections.jl/dev/#ConcurrentCollections.ConcurrentStack)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ function setup(; kwargs...)
suite["channel"] = @benchmarkable hotpotato!(Channel{Bool}(Inf); $kwargs...)
suite["dlcrq"] =
@benchmarkable hotpotato!(DualLinkedConcurrentRingQueue{Bool}(); $kwargs...)
suite["dlq"] = @benchmarkable hotpotato!(DualLinkedQueue{Bool}(); $kwargs...)
return suite
end

Expand Down
1 change: 1 addition & 0 deletions docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

```@docs
DualLinkedConcurrentRingQueue
DualLinkedQueue
LinkedConcurrentRingQueue
ConcurrentQueue
ConcurrentStack
Expand Down
3 changes: 3 additions & 0 deletions src/ConcurrentCollections.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export
ConcurrentStack,
Delete,
DualLinkedConcurrentRingQueue,
DualLinkedQueue,
Keep,
LinkedConcurrentRingQueue,
WorkStealingDeque,
Expand Down Expand Up @@ -70,6 +71,7 @@ include("msqueue.jl")
include("stack.jl")
include("lcrq.jl")
include("dlcrq.jl")
include("ssqueue.jl")
include("misc.jl")

end # module Implementations
Expand All @@ -78,6 +80,7 @@ using .Implementations:
ConcurrentQueue,
ConcurrentStack,
DualLinkedConcurrentRingQueue,
DualLinkedQueue,
LinkedConcurrentRingQueue,
WorkStealingDeque

Expand Down
2 changes: 1 addition & 1 deletion src/docs/DualLinkedConcurrentRingQueue.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
A concurrent queue with "almost" nonblocking `push!` and `popfirst!`. Calling
`popfirst!` on an empty queue waits for a `push!` in another task.

See also: [`LinkedConcurrentRingQueue`](@ref)
See also: [`LinkedConcurrentRingQueue`](@ref), [`DualLinkedQueue`](@ref)

# Examples
```julia
Expand Down
37 changes: 37 additions & 0 deletions src/docs/DualLinkedQueue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
DualLinkedQueue{T}()

A concurrent queue with nonblocking `push!` and `popfirst!`. Calling
`popfirst!` on an empty queue waits for a `push!` in another task.

[`DualLinkedConcurrentRingQueue`](@ref) provides a faster dual queue with a
larger memory footprint.

# Examples
```julia
julia> using ConcurrentCollections

julia> q = DualLinkedQueue{Int}();

julia> push!(q, 111);

julia> push!(q, 222);

julia> popfirst!(q) # first-in first-out
111

julia> popfirst!(q)
222
```

# Extended help

Since `popfirst!` blocks when called on an empty queue, a `DualLinkedQueue` acts
almost like an unbounded `Base.Channel`. However, `DualLinkedQueue` does not
support `close` or blocking on `push!` when exceeding a bound.

`DualLinkedQueue` implements the dual queue by Scherer and Scott (2004):

> Scherer, William N., and Michael L. Scott. “Nonblocking Concurrent Data
> Structures with Condition Synchronization.” In Distributed Computing, edited
> by Rachid Guerraoui, 174–87. Lecture Notes in Computer Science. Berlin,
> Heidelberg: Springer, 2004. <https://doi.org/10.1007/978-3-540-30186-8_13>.
1 change: 1 addition & 0 deletions src/msqueue.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mutable struct MSQNode{T}
MSQNode{T}(next::Union{MSQNode{T},Nothing}, value::T) where {T} = new{T}(next, value)
end

# TODO: rename it to ConcurrentLinkedQueue?
mutable struct ConcurrentQueue{T}
@atomic head::MSQNode{T}
@atomic tail::MSQNode{T}
Expand Down
257 changes: 257 additions & 0 deletions src/ssqueue.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
struct IsData end
struct IsAntiData end
const PolarityTrait = Union{IsData,IsAntiData}

Base.adjoint(::IsData) = IsAntiData()
Base.adjoint(::IsAntiData) = IsData()

abstract type AbstractSSQNode{T} end

mutable struct SSQDataNode{T} <: AbstractSSQNode{T}
@atomic next::Union{AbstractSSQNode{T},Nothing}
value::T

SSQDataNode{T}() where {T} = new{T}(nothing)
SSQDataNode{T}(next::Union{AbstractSSQNode{T},Nothing}, value::T) where {T} =
new{T}(next, value)
end

mutable struct SSQWaiterNode{T} <: AbstractSSQNode{T}
@atomic next::Union{AbstractSSQNode{T},Nothing}
value::Waiter{T}

SSQWaiterNode{T}() where {T} = new{T}(nothing)
SSQWaiterNode{T}(next::Union{AbstractSSQNode{T},Nothing}, value::Waiter{T}) where {T} =
new{T}(next, value)
end

const SSQNode{T} = Union{SSQDataNode{T},SSQWaiterNode{T}}

polarityof(::SSQDataNode) = IsData
polarityof(::SSQWaiterNode) = IsAntiData
ssqnodetype(::IsData, ::Type{T}) where {T} = SSQDataNode{T}
ssqnodetype(::IsAntiData, ::Type{T}) where {T} = SSQWaiterNode{T}

#=
mutable struct SSQNode{Polarity<:PolarityTrait,T,V<:Union{T,Waiter{T}}}
@atomic next::Union{SSQNode{IsData,T,T},SSQNode{IsAntiData,T,Waiter{T}},Nothing}
value::V
SSQNode{Polarity,T}() where {Polarity,T} = new{Polarity,T,vtype(Polarity(), T)}(nothing)
function SSQNode{Polarity,T}(
next::Union{SSQNode{<:Any,T},Nothing},
value::Union{T,Waiter{T}},
) where {Polarity,T}
V = vtype(Polarity(), T)
value = value::V
return new{Polarity,T,V}(next, value)
end
end
@inline SSQNode{Polarity,T,V}(
next::Union{SSQNode{<:Any,T},Nothing},
value::Union{T,Waiter{T}},
) where {Polarity,T,V} = SSQNode{Polarity,T}(next, value)::SSQNode{Polarity,T,V}
polarityof(::SSQNode{Polarity}) where {Polarity} = Polarity
=#

mutable struct DualLinkedQueue{T}
@atomic head::SSQNode{T}
@atomic tail::SSQNode{T}
end

Base.eltype(::Type{DualLinkedQueue{T}}) where {T} = T

DualLinkedQueue() = DualLinkedQueue{Any}()
function DualLinkedQueue{T}() where {T}
n = SSQDataNode{T}()
return DualLinkedQueue{T}(n, n)
end

function denqueue!(
queue::DualLinkedQueue{T},
x::Union{T,Waiter{T}},
polarity::PolarityTrait,
) where {T}
local node::Union{Nothing,SSQNode{T}} = nothing

head = @atomic queue.head
tail = @atomic queue.tail
while true
next = (@atomic head.next)::Union{Nothing,SSQNode{T}}
head′ = @atomic queue.head
if head !== head′ # snapshot failed
head = head′
tail = @atomic queue.tail
continue
end
if head === tail
last = next
should_enqueue = true
elseif polarity isa polarityof(next)
last = @atomic tail.next
should_enqueue = true
else
last = nothing
should_enqueue = false
end

if should_enqueue
tail′ = @atomic queue.tail
if tail′ !== tail # snapshot failed
tail = tail′
continue
end
if last !== nothing
old, ok = @atomicreplace(queue.tail, tail => last)
tail = ok ? next : old
continue
end
node = if node === nothing
ssqnodetype(polarity, T)(nothing, x)
else
node
end::ssqnodetype(polarity, T)
last, ok = @atomicreplace(tail.next, nothing => node)
if ok
@atomicreplace(queue.tail, tail => node)
return nothing
end
last = last::SSQNode{T} # can be any polarity
old, ok = @atomicreplace(queue.tail, tail => last)
tail = ok ? last : old
else
next = next::ssqnodetype(polarity', T)
value = next.value
head, ok = @atomicreplace(queue.head, head => next)
if ok
return Some(value)
end
tail = @atomic queue.tail
end
end
end

function Base.push!(queue::DualLinkedQueue{T}, x) where {T}
x = convert(T, x)
while true
y = denqueue!(queue, x, IsData())
if y isa Some
w = something(y)::Waiter{T}
tryput!(w, x) || continue
else
y::Nothing
end
return queue
end
end

function Base.popfirst!(queue::DualLinkedQueue{T}) where {T}
# TODO: cache Waiter
w = Waiter{T}()
y = denqueue!(queue, w, IsAntiData())
if y isa Some
return something(y)
else
y::Nothing
x = fetch(w)
return x::T
end
end

Base.IteratorSize(::Type{<:DualLinkedQueue}) = Base.SizeUnknown()

function Base.iterate(queue::DualLinkedQueue{T}) where {T}
head = @atomic queue.head
node = @atomic head.next
if node === nothing
return nothing
else
if node isa SSQWaiterNode
return nothing
else
return (node.value, node)
end
end
end

function Base.iterate(::DualLinkedQueue{T}, prev::SSQDataNode) where {T}
node = (@atomic prev.next)::Union{Nothing,SSQDataNode}
if node === nothing
return nothing
else
return (node.value, node)
end
end

struct NodeIterator{T}
queue::T
end

Base.IteratorSize(::Type{<:NodeIterator}) = Base.SizeUnknown()

Base.eltype(::Type{NodeIterator{<:DualLinkedQueue{T}}}) where {T} = SSQNode{T}

function Base.iterate(
iter::NodeIterator{DualLinkedQueue{T}},
prev::SSQNode{T} = let queue = iter.queue
@atomic queue.head
end,
) where {T}
node = (@atomic prev.next)::Union{Nothing,SSQNode{T}}
if node === nothing
return nothing
else
return (node, node)
end
end

function check_invariance(queue::DualLinkedQueue{T}) where {T}
isdata = nothing
for node in NodeIterator(queue)
if isdata === nothing
isdata = node isa SSQDataNode
elseif isdata !=′ (node isa SSQDataNode)
return false
end
end
return true
end

function summaryinfo(queue::DualLinkedQueue{T}) where {T}
counter = Ref(0)
isdata = Ref(true)
for node in NodeIterator(queue)
isdata[] = !(node isa SSQWaiterNode)
counter[] += 1
end
return (; nitems = counter[], isdata = isdata[])
end

function Base.summary(io::IO, queue::DualLinkedQueue)
show(io, MIME"text/plain"(), typeof(queue))
nitems, isdata = summaryinfo(queue)
s = nitems > 1 ? "s" : ""
print(io, " with ", nitems, isdata ? " data item$s" : " waiter$s")
end

function Base.show(io::IO, ::MIME"text/plain", queue::DualLinkedQueue)
summary(io, queue)
eio = IOContext(io, :typeinfo => eltype(queue), :limit => true, :compact => true)
n = 0
for x in queue
if n == 0
println(io, ":")
else
println(io)
end
n += 1
if n > 3
print(io, "")
return
end
print(io, " ")
show(eio, MIME"text/plain"(), x)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ include("test_doctest.jl")
include("test_lcrq.jl")
include("test_mpcrq.jl")
include("test_msqueue.jl")
include("test_ssqueue.jl")
include("test_tsstack.jl")
include("test_work_stealing_deque.jl")

Expand Down
9 changes: 4 additions & 5 deletions test/ConcurrentCollectionsTests/src/test_dlcrq.jl
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,16 @@ function test_concurrent_push_pop(ntrials = 100)
@withprogress name = "concurrent push-pop" begin
@testset for trial in 1:ntrials
@logprogress (trial - 1) / ntrials
check_concurrent_push_pop()
q = DualLinkedConcurrentRingQueue{Int}(; log2ringsize = 5)
# q = Channel{Int}(Inf)
check_concurrent_push_pop!(q)
end
end
end

function check_concurrent_push_pop()
function check_concurrent_push_pop!(q; nitems = 2^20)
nsend = cld(Threads.nthreads(), 2)
nrecv = max(1, Threads.nthreads() - nsend)
q = DualLinkedConcurrentRingQueue{Int}(; log2ringsize = 5)
# q = Channel{Int}(Inf)
nitems = 2^20
received = concurrent_push_pop!(q, nitems, nsend, nrecv)
allreceived = reduce(vcat, received)
sort!(allreceived)
Expand Down
Loading

0 comments on commit d270582

Please sign in to comment.