Skip to content

Commit

Permalink
Multithreaded custom grouped operations with single-row result
Browse files Browse the repository at this point in the history
Spawn up to one task per thread in `_combine_rows_with_first!` so that custom
grouped operations that return a single row run in parallel. This is optimal when
the operation takes about the same time for every group. Spawning one task per
group could be faster when these times vary a lot, but its overhead would be
higher; this could be added as an option later.

The implementation is somewhat tricky as output columns need to be reallocated
when a new return type is detected.
  • Loading branch information
nalimilan committed Dec 27, 2020
1 parent 4a12320 commit d2bef41
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 24 deletions.
150 changes: 126 additions & 24 deletions src/groupeddataframe/complextransforms.jl
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ function _combine_with_first(first::Union{NamedTuple, DataFrameRow, AbstractData
f, gd, incols, targetcolnames,
firstmulticol)
else
outcols, finalcolnames = _combine_rows_with_first!(first, initialcols, 1, 1,
outcols, finalcolnames = _combine_rows_with_first!(first, initialcols,
f, gd, incols, targetcolnames,
firstmulticol)
end
Expand Down Expand Up @@ -92,9 +92,87 @@ function fill_row!(row, outcols::NTuple{N, AbstractVector},
return nothing
end

# Worker run by one task: call `f` on every group listed in `idx` and store the
# resulting single row at the group's position in the output columns.
#
# Arguments
# - `tid`: position of this task in `type_widened`.
# - `idx`: group indices handled by this task; the `copyto!` offsets below
#   assume they form a contiguous increasing range.
# - `outcols`: columns this invocation writes to.
# - `outcolsref`: most recent set of columns shared by all tasks; only read or
#   replaced while holding `widen_type_lock`.
# - `type_widened`: per-task flags; `type_widened[t]` is set when columns were
#   replaced and task `t` has not copied its rows to the new columns yet.
# - `i`: first group index still to be processed. Entries of `idx` smaller than
#   `i` are skipped, so a caller passing a value not greater than `idx[1]`
#   processes all of `idx`.
# - remaining arguments are forwarded to `do_call`, `wrap_row` and `fill_row!`.
#
# Returns the columns that hold the rows written by this task. These may differ
# from `outcolsref[]` if another task replaced columns after this one was done.
#
# Whenever the columns change, the function calls itself with the new tuple
# (presumably so that the loop gets compiled for the new column types).
function _combine_rows_with_first_task!(tid::Integer,
                                        idx::AbstractVector{<:Integer},
                                        outcols::NTuple{<:Any, AbstractVector},
                                        outcolsref::Ref{NTuple{<:Any, AbstractVector}},
                                        type_widened::AbstractVector{Bool},
                                        widen_type_lock::ReentrantLock,
                                        i::Integer,
                                        f::Base.Callable,
                                        gd::GroupedDataFrame,
                                        starts::AbstractVector{<:Integer},
                                        ends::AbstractVector{<:Integer},
                                        incols::Union{Nothing, AbstractVector,
                                                      Tuple, NamedTuple},
                                        colnames::NTuple{<:Any, Symbol},
                                        firstmulticol::Val)
    isempty(idx) && return outcols
    # The loop variable below is a new binding that shadows the argument `i`,
    # so the resume position has to be saved under another name
    rownext = i
    rowstart = idx[1]
    gdidx = gd.idx
    local newoutcols
    for i in idx
        # Rows before `rownext` were already written by a previous invocation:
        # do not call `f` on these groups a second time
        i < rownext && continue
        row = wrap_row(do_call(f, gdidx, starts, ends, gd, incols, i), firstmulticol)
        # `j` is the first column which cannot hold the value (columns before `j`
        # have been written for row `i`), or `nothing` if the whole row was stored
        j = fill_row!(row, outcols, i, 1, colnames)
        if j !== nothing # Need to widen column
            # If another thread is already widening outcols, wait until it's done
            lock(widen_type_lock)
            try
                # Always widen starting from the most recent shared columns so that
                # columns widened by other tasks in the meantime are not lost
                latest = outcolsref[]
                # Workaround for julia#15276
                newoutcols = let latest=latest, row=row
                    ntuple(length(latest)) do k
                        S = typeof(row[k])
                        T = eltype(latest[k])
                        U = promote_type(S, T)
                        if S <: T || U <: T
                            latest[k]
                        else
                            # A new vector is allocated: all tasks will have
                            # to copy their rows to it
                            type_widened .= true
                            Tables.allocatecolumn(U, length(latest[k]))
                        end
                    end
                end
                # Copy rows already written by this task to columns that changed.
                # Row `i` itself has only been written for columns before `j`.
                for k in 1:length(outcols)
                    if newoutcols[k] !== outcols[k]
                        copyto!(newoutcols[k], rowstart,
                                outcols[k], rowstart, i - rowstart + (k < j))
                    end
                end
                j = fill_row!(row, newoutcols, i, j, colnames)
                @assert j === nothing # eltype is guaranteed to match
                outcolsref[] = newoutcols
                # This task is now in sync with the shared columns
                type_widened[tid] = false
            finally
                unlock(widen_type_lock)
            end
            return _combine_rows_with_first_task!(tid, idx, newoutcols, outcolsref,
                                                  type_widened, widen_type_lock,
                                                  i + 1, f, gd, starts, ends,
                                                  incols, colnames, firstmulticol)
        end
        # If other thread widened columns, copy already processed data to new vectors
        # This doesn't have to happen immediately (hence type_widened isn't atomic),
        # but the more we wait the more data will have to be copied
        if type_widened[tid]
            lock(widen_type_lock)
            try
                type_widened[tid] = false
                newoutcols = outcolsref[]
                for k in 1:length(outcols)
                    # Only columns which have been replaced need copying
                    if newoutcols[k] !== outcols[k]
                        copyto!(newoutcols[k], rowstart,
                                outcols[k], rowstart, i - rowstart + 1)
                    end
                end
            finally
                unlock(widen_type_lock)
            end
            # Row `i` has been stored and copied: resume at the next row
            return _combine_rows_with_first_task!(tid, idx, newoutcols, outcolsref,
                                                  type_widened, widen_type_lock,
                                                  i + 1, f, gd, starts, ends,
                                                  incols, colnames, firstmulticol)
        end
    end
    return outcols
end

function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow},
outcols::NTuple{N, AbstractVector},
rowstart::Integer, colstart::Integer,
f::Base.Callable, gd::GroupedDataFrame,
incols::Union{Nothing, AbstractVector, Tuple, NamedTuple},
colnames::NTuple{N, Symbol},
Expand All @@ -107,32 +185,56 @@ function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow},
# handle empty GroupedDataFrame
len == 0 && return outcols, colnames

# Handle first group
j = fill_row!(first, outcols, rowstart, colstart, colnames)
@assert j === nothing # eltype is guaranteed to match
# Handle remaining groups
@inbounds for i in rowstart+1:len
row = wrap_row(do_call(f, gdidx, starts, ends, gd, incols, i), firstmulticol)
j = fill_row!(row, outcols, i, 1, colnames)
if j !== nothing # Need to widen column type
local newcols
let i = i, j = j, outcols=outcols, row=row # Workaround for julia#15276
newcols = ntuple(length(outcols)) do k
S = typeof(row[k])
T = eltype(outcols[k])
U = promote_type(S, T)
if S <: T || U <: T
outcols[k]
else
copyto!(Tables.allocatecolumn(U, length(outcols[k])),
1, outcols[k], 1, k >= j ? i-1 : i)
end
# Handle groups other than the first one (which is handled below)
# Create up to one task per thread
# This has lower overhead than creating one task per group,
# but is optimal only if operations take roughly the same time for all groups
basesize = max(1, (len - 1) ÷ Threads.nthreads())
partitions = collect(_partition(2:len, basesize))
widen_type_lock = ReentrantLock()
outcolsref = Ref{NTuple{<:Any, AbstractVector}}(outcols)
type_widened = fill(false, length(partitions))
tasks = similar(partitions, Task)
for tid in 1:length(partitions)
idx = partitions[tid]
tasks[tid] =
@spawn _combine_rows_with_first_task!(tid, idx, outcols, outcolsref,
type_widened, widen_type_lock,
1, f, gd, starts, ends, incols, colnames,
firstmulticol)
end

# Workaround JuliaLang/julia#38931:
# we want to preserve the exception type thrown in user code,
# and print the backtrace corresponding to it
for t in tasks
try
wait(t)
catch e
throw(t.exception)
end
end

# Copy data for any tasks that finished before others widened columns
outcols = outcolsref[]
for tid in 1:length(partitions)
if type_widened[tid]
idx = partitions[tid]
oldoutcols = fetch(tasks[tid])
for k in 1:length(outcols)
if oldoutcols[k] !== outcols[k]
copyto!(outcols[k], idx[1], oldoutcols[k], idx[1],
idx[end] - idx[1] + 1)
end
end
return _combine_rows_with_first!(row, newcols, i, j,
f, gd, incols, colnames, firstmulticol)
end
end

# Handle first group
# This is done at the end to write directly to the final outcols
j1 = fill_row!(first, outcols, 1, 1, colnames)
@assert j1 === nothing # eltype is guaranteed to match

return outcols, colnames
end

Expand Down
31 changes: 31 additions & 0 deletions src/other/utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,34 @@ else
end

# Name of a composed function: the names of its outer and inner components,
# joined by an underscore.
function funname(c::ComposedFunction)
    outername = funname(c.outer)
    innername = funname(c.inner)
    return Symbol(outername, :_, innername)
end


# `_partition(xs, n)` iterates over consecutive chunks of `xs` holding at most
# `n` elements each. From Julia 1.4 on it is `Iterators.partition` itself;
# older versions use a fallback which yields views of `xs` and only accepts
# collections whose first index is 1.
if VERSION < v"1.4"
    function _partition(xs, n)
        @assert firstindex(xs) == 1
        nchunks = cld(length(xs), n)
        return (view(xs, ((c - 1) * n + 1):min(c * n, length(xs))) for c in 1:nchunks)
    end
else
    const _partition = Iterators.partition
end

# Call `f` on each element of `xs`, using several tasks when more than one
# thread is available and `xs` has more than `basesize` elements.
#
# In the threaded case the indices of `xs` are split into chunks of at least
# `basesize` elements and one task is spawned per chunk, so `f` may be called
# concurrently from several tasks and in no particular order across chunks.
# Otherwise elements are processed sequentially in index order.
#
# All spawned tasks are waited for before returning. Returns `nothing`.
function tforeach(f, xs::AbstractArray; basesize::Integer)
    nt = Threads.nthreads()
    len = length(xs)
    if nt > 1 && len > basesize
        # Ensure we don't create more than 10 times more tasks than available
        # threads: chunks are enlarged (never shrunk) until at most `10 * nt` of
        # them are needed. The lower bound of 1 protects against a zero chunk
        # size, which `_partition` would reject.
        chunksize = max(1, basesize, cld(len, 10 * nt))
        @sync for p in _partition(eachindex(xs), chunksize)
            Threads.@spawn begin
                for i in p
                    f(@inbounds xs[i])
                end
            end
        end
    else
        for i in eachindex(xs)
            f(@inbounds xs[i])
        end
    end
    return nothing
end

0 comments on commit d2bef41

Please sign in to comment.