diff --git a/docs/src/man/split_apply_combine.md b/docs/src/man/split_apply_combine.md index b70509d66c..270d45e777 100644 --- a/docs/src/man/split_apply_combine.md +++ b/docs/src/man/split_apply_combine.md @@ -122,11 +122,13 @@ It is allowed to mix single values and vectors if multiple transformations are requested. In this case single value will be repeated to match the length of columns specified by returned vectors. -A separate task is spawned for each specified transformation, allowing for -parallel operation when several transformations are requested and Julia was -started with more than one thread. Passed transformation functions should -therefore not modify global variables (i.e. they should be pure), or use -locks to control parallel accesses. +A separate task is spawned for each specified transformation; each transformation +then spawns as many tasks as Julia threads, and splits processing of groups across +them (however, currently transformations with optimized implementations like `sum` +and transformations that return multiple rows use a single task for all groups). +This allows for parallel operation when Julia was started with more than one +thread. Passed transformation functions should therefore not modify global variables +(i.e. they should be pure), or use locks to control parallel accesses. To apply `function` to each row instead of whole columns, it can be wrapped in a `ByRow` struct. `cols` can be any column indexing syntax, in which case diff --git a/src/abstractdataframe/selection.jl b/src/abstractdataframe/selection.jl index 9f2eb701a7..9c72401612 100644 --- a/src/abstractdataframe/selection.jl +++ b/src/abstractdataframe/selection.jl @@ -145,10 +145,13 @@ const TRANSFORMATION_COMMON_RULES = `copycols=false`, a `SubDataFrame` is returned without copying columns. If a `GroupedDataFrame` is passed, a separate task is spawned for each - specified transformation, allowing for parallel operation when several - transformations are requested and Julia was started with more than one thread. - Passed transformation functions should therefore not modify global variables - (i.e. they should be pure), or use locks to control parallel accesses. + specified transformation; each transformation then spawns as many tasks + as Julia threads, and splits processing of groups across them + (however, currently transformations with optimized implementations like `sum` + and transformations that return multiple rows use a single task for all groups). + This allows for parallel operation when Julia was started with more than one + thread. Passed transformation functions should therefore not modify global + variables (i.e. they should be pure), or use locks to control parallel accesses. In the future, parallelism may be extended to other cases, so this requirement also holds for `DataFrame` inputs. """ diff --git a/src/groupeddataframe/complextransforms.jl b/src/groupeddataframe/complextransforms.jl index 8db068c398..5d4a157ce7 100644 --- a/src/groupeddataframe/complextransforms.jl +++ b/src/groupeddataframe/complextransforms.jl @@ -57,7 +57,7 @@ function _combine_with_first(first::Union{NamedTuple, DataFrameRow, AbstractData f, gd, incols, targetcolnames, firstmulticol) else - outcols, finalcolnames = _combine_rows_with_first!(first, initialcols, 1, 1, + outcols, finalcolnames = _combine_rows_with_first!(first, initialcols, f, gd, incols, targetcolnames, firstmulticol) end @@ -92,9 +92,126 @@ function fill_row!(row, outcols::NTuple{N, AbstractVector}, return nothing end -function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow}, +function _combine_rows_with_first_task!(tid::Integer, + rowstart::Integer, + rowend::Integer, + rownext::Integer, + outcols::NTuple{<:Any, AbstractVector}, + outcolsref::Ref{NTuple{<:Any, AbstractVector}}, + type_widened::AbstractVector{Bool}, + widen_type_lock::ReentrantLock, + f::Base.Callable, + gd::GroupedDataFrame, + starts::AbstractVector{<:Integer}, + ends::AbstractVector{<:Integer}, + incols::Union{Nothing, AbstractVector, + Tuple, NamedTuple}, + colnames::NTuple{<:Any, Symbol}, + firstmulticol::Val) + j = nothing + gdidx = gd.idx + local newoutcols + for i in rownext:rowend + row = wrap_row(do_call(f, gdidx, starts, ends, gd, incols, i), firstmulticol) + j = fill_row!(row, outcols, i, 1, colnames) + if j !== nothing # Need to widen column + # If another thread is already widening outcols, wait until it's done + lock(widen_type_lock) + try + newoutcols = outcolsref[] + # Workaround for julia#15276 + newoutcols = let i=i, j=j, newoutcols=newoutcols, row=row + ntuple(length(newoutcols)) do k + S = typeof(row[k]) + T = eltype(newoutcols[k]) + U = promote_type(S, T) + if S <: T || U <: T + newoutcols[k] + else + type_widened .= true + Tables.allocatecolumn(U, length(newoutcols[k])) + end + end + end + for k in 1:length(outcols) + if outcols[k] !== newoutcols[k] + copyto!(newoutcols[k], rowstart, + outcols[k], rowstart, i - rowstart + (k < j)) + end + end + j = fill_row!(row, newoutcols, i, j, colnames) + @assert j === nothing # eltype is guaranteed to match + + outcolsref[] = newoutcols + type_widened[tid] = false + finally + unlock(widen_type_lock) + end + return _combine_rows_with_first_task!(tid, rowstart, rowend, i+1, newoutcols, outcolsref, + type_widened, widen_type_lock, + f, gd, starts, ends, + incols, colnames, firstmulticol) + end + # If other thread widened columns, copy already processed data to new vectors + # This doesn't have to happen immediately (hence type_widened isn't atomic), + # but the more we wait the more data will have to be copied + if type_widened[tid] + lock(widen_type_lock) do + type_widened[tid] = false + newoutcols = outcolsref[] + for k in 1:length(outcols) + # Check whether this particular column has been widened + if outcols[k] !== newoutcols[k] + copyto!(newoutcols[k], rowstart, + outcols[k], rowstart, i - rowstart + 1) + end + end + end + return _combine_rows_with_first_task!(tid, rowstart, rowend, i+1, newoutcols, outcolsref, + type_widened, widen_type_lock, + f, gd, starts, ends, + incols, colnames, firstmulticol) + end + end + return outcols +end + +# CategoricalArray is thread-safe only when input and output levels are equal +# since then all values are added upfront. Otherwise, if the function returns +# CategoricalValues mixed from different pools, one thread may add values, +# which may put the invpool in an invalid state while the other one is reading from it +function isthreadsafe(outcols::NTuple{<:Any, AbstractVector}, + incols::Union{Tuple, NamedTuple}) + anycat = any(outcols) do col + T = typeof(col) + # If the first result is missing, widening can give a CategoricalArray + # if later results are CategoricalValues + eltype(col) === Missing || + (nameof(T) === :CategoricalArray && + nameof(parentmodule(T)) === :CategoricalArrays) + end + if anycat + levs = nothing + for col in incols + T = typeof(col) + if nameof(T) === :CategoricalArray && + nameof(parentmodule(T)) === :CategoricalArrays + if levs !== nothing + levs == levels(col) || return false + else + levs = levels(col) + end + end + end + end + return true +end +isthreadsafe(outcols::NTuple{<:Any, AbstractVector}, incols::AbstractVector) = + isthreadsafe(outcols, (incols,)) +isthreadsafe(outcols::NTuple{<:Any, AbstractVector}, incols::Nothing) = true + +function _combine_rows_with_first!(firstrow::Union{NamedTuple, DataFrameRow}, outcols::NTuple{N, AbstractVector}, - rowstart::Integer, colstart::Integer, f::Base.Callable, gd::GroupedDataFrame, incols::Union{Nothing, AbstractVector, Tuple, NamedTuple}, colnames::NTuple{N, Symbol}, @@ -108,31 +225,64 @@ function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow}, len == 0 && return outcols, colnames # Handle first group - j = fill_row!(first, outcols, rowstart, colstart, colnames) - @assert j === nothing # eltype is guaranteed to match - # Handle remaining groups - @inbounds for i in rowstart+1:len - row = wrap_row(do_call(f, gdidx, starts, ends, gd, incols, i), firstmulticol) - j = fill_row!(row, outcols, i, 1, colnames) - if j !== nothing # Need to widen column type - local newcols - let i = i, j = j, outcols=outcols, row=row # Workaround for julia#15276 - newcols = ntuple(length(outcols)) do k - S = typeof(row[k]) - T = eltype(outcols[k]) - U = promote_type(S, T) - if S <: T || U <: T - outcols[k] - else - copyto!(Tables.allocatecolumn(U, length(outcols[k])), - 1, outcols[k], 1, k >= j ? i-1 : i) - end + j1 = fill_row!(firstrow, outcols, 1, 1, colnames) + @assert j1 === nothing # eltype is guaranteed to match + + # Handle groups other than the first one + # Create up to one task per thread + # This has lower overhead than creating one task per group, + # but is optimal only if operations take roughly the same time for all groups + if VERSION >= v"1.4" && isthreadsafe(outcols, incols) + basesize = max(1, cld(len - 1, Threads.nthreads())) + partitions = Iterators.partition(2:len, basesize) + else + partitions = (2:len,) + end + widen_type_lock = ReentrantLock() + outcolsref = Ref{NTuple{<:Any, AbstractVector}}(outcols) + type_widened = fill(false, length(partitions)) + tasks = Vector{Task}(undef, length(partitions)) + for (tid, idx) in enumerate(partitions) + tasks[tid] = + @spawn _combine_rows_with_first_task!(tid, first(idx), last(idx), first(idx), + outcols, outcolsref, + type_widened, widen_type_lock, + f, gd, starts, ends, incols, colnames, + firstmulticol) + end + + # Workaround JuliaLang/julia#38931: + # we want to preserve the exception type thrown in user code, + # and print the backtrace corresponding to it + for t in tasks + try + wait(t) + catch e + throw(t.exception) + end + end + + # Copy data for any tasks that finished before others widened columns + oldoutcols = outcols + outcols = outcolsref[] + if outcols !== oldoutcols # first group + for k in 1:length(outcols) + outcols[k][1] = oldoutcols[k][1] + end + end + for (tid, idx) in enumerate(partitions) + if type_widened[tid] + oldoutcols = fetch(tasks[tid]) + for k in 1:length(outcols) + # Check whether this particular column has been widened + if oldoutcols[k] !== outcols[k] + copyto!(outcols[k], first(idx), oldoutcols[k], first(idx), + last(idx) - first(idx) + 1) end end - return _combine_rows_with_first!(row, newcols, i, j, - f, gd, incols, colnames, firstmulticol) end end + return outcols, colnames end diff --git a/test/grouping.jl b/test/grouping.jl index 3e1f6c33df..696bd643c7 100644 --- a/test/grouping.jl +++ b/test/grouping.jl @@ -3577,4 +3577,74 @@ end end end +@testset "result eltype widening from different tasks" begin + if VERSION < v"1.5" + Base.convert(::Type{Union{Missing, Nothing, Float64}}, x::Int) = float(x) + end + Random.seed!(1) + for y in (Any[1, missing, missing, 2, 4], + Any[1, missing, nothing, 2.1, 'a'], + Any[1, 1, missing, 1, nothing, 1, 2.1, 1, 'a'], + Any[1, 2, 3, 4, 5, 6, 2.1, missing, 'a'], + Any[1, 2, 3.1, 4, 5, 6, 2.1, missing, 'a']), + x in (1:length(y), rand(1:2, length(y)), rand(1:3, length(y))) + df = DataFrame(x=x, y1=y, y2=reverse(y)) + gd = groupby(df, :x) + res = combine(gd, :y1 => (y -> y[1]) => :y1, :y2 => (y -> y[end]) => :y2) + # sleep ensures one task will widen the result after the other is done, + # so that data has to be copied at the end + @test res ≅ + combine(gd, [:x, :y1] => ((x, y) -> (sleep((x == [5])/10); y[1])) => :y1, + [:x, :y2] => ((x, y) -> (sleep((x == [5])/10); y[end])) => :y2) ≅ + combine(gd, [:x, :y1] => ((x, y) -> (sleep(x[1]/100); y[1])) => :y1, + [:x, :y2] => ((x, y) -> (sleep(x[1]/100); y[end])) => :y2) ≅ + combine(gd, [:x, :y1] => ((x, y) -> (sleep(rand()/10); y[1])) => :y1, + [:x, :y2] => ((x, y) -> (sleep(rand()/10); y[end])) => :y2) + + if df.x == 1:nrow(df) + @test res ≅ df + end + + res = combine(gd, :y1 => (y -> (y1=y[1], y2=y[end])) => AsTable, + :y2 => (y -> (y3=y[1], y4=y[end])) => AsTable) + # sleep ensures one task will widen the result after the other is done, + # so that data has to be copied at the end + @test res ≅ + combine(gd, [:x, :y1] => ((x, y) -> (sleep((x == [5])/10); (y1=y[1], y2=y[end]))) => AsTable, + [:x, :y2] => ((x, y) -> (sleep((x == [5])/10); (y3=y[1], y4=y[end]))) => AsTable) ≅ + combine(gd, [:x, :y1] => ((x, y) -> (sleep(x[1]/100); (y1=y[1], y2=y[end]))) => AsTable, + [:x, :y2] => ((x, y) -> (sleep(x[1]/100); (y3=y[1], y4=y[end]))) => AsTable) ≅ + combine(gd, [:x, :y1] => ((x, y) -> (sleep(rand()/10); (y1=y[1], y2=y[end]))) => AsTable, + [:x, :y2] => ((x, y) -> (sleep(rand()/10); (y3=y[1], y4=y[end]))) => AsTable) + end +end + +@testset "CategoricalArray thread safety" begin + # These tests do not actually trigger multithreading bugs, + # but at least they check that the code that disables multithreading + # with CategoricalArray when levels are different works + Random.seed!(35) + df = DataFrame(x=rand(1:10, 100), + y=categorical(rand(10:15, 100)), + z=categorical(rand(0:20, 100))) + df.y2 = reverse(df.y) # Same levels + gd = groupby(df, :x) + + @test combine(gd, :y => (y -> y[1]) => :res) == + combine(gd, [:y, :y2] => ((y, x) -> y[1]) => :res) == + combine(gd, [:y, :x] => ((y, x) -> y[1]) => :res) == + combine(gd, [:y, :z] => ((y, z) -> y[1]) => :res) == + combine(gd, :y => (y -> get(y[1])) => :res) + + @test combine(gd, [:x, :y, :y2] => + ((x, y, y2) -> x[1] <= 5 ? y[1] : y2[1]) => :res) == + combine(gd, [:x, :y, :y2] => + ((x, y, y2) -> x[1] <= 5 ? get(y[1]) : get(y2[1])) => :res) + + @test combine(gd, [:x, :y, :z] => + ((x, y, z) -> x[1] <= 5 ? y[1] : z[1]) => :res) == + combine(gd, [:x, :y, :z] => + ((x, y, z) -> x[1] <= 5 ? get(y[1]) : get(z[1])) => :res) +end + end # module diff --git a/test/runtests.jl b/test/runtests.jl index ca9e1fd87e..a2f9ff1fc2 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -8,8 +8,12 @@ anyerrors = false using DataFrames, Dates, Test, Random -if VERSION > v"1.3" && Threads.nthreads() < 2 - @warn("Running with only one thread: correctness of parallel operations is not tested") +if VERSION > v"1.3" + if Threads.nthreads() < 2 + @warn("Running with only one thread: correctness of parallel operations is not tested") + else + @show Threads.nthreads() + end end my_tests = ["utils.jl",