From 1d5467d3298ea589c975ac35d83c96051889f98c Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Mon, 21 Dec 2020 15:16:11 +0100 Subject: [PATCH 01/11] Multithreaded custom grouped operations with single-row result Spawn one task per thread in `_combine_rows_with_first!` so that custom grouped operations that return a single row are run in parallel. This is optimal if operations take about the same time for all groups. Spawning one task per group could be faster if these times vary a lot, but the overhead would be larger: we could add this as an option later. The implementation is somewhat tricky as output columns need to be reallocated when a new return type is detected. --- src/groupeddataframe/complextransforms.jl | 150 ++++++++++++++++++---- src/other/utils.jl | 31 +++++ 2 files changed, 157 insertions(+), 24 deletions(-) diff --git a/src/groupeddataframe/complextransforms.jl b/src/groupeddataframe/complextransforms.jl index 8db068c398..e66e6dbb0f 100644 --- a/src/groupeddataframe/complextransforms.jl +++ b/src/groupeddataframe/complextransforms.jl @@ -57,7 +57,7 @@ function _combine_with_first(first::Union{NamedTuple, DataFrameRow, AbstractData f, gd, incols, targetcolnames, firstmulticol) else - outcols, finalcolnames = _combine_rows_with_first!(first, initialcols, 1, 1, + outcols, finalcolnames = _combine_rows_with_first!(first, initialcols, f, gd, incols, targetcolnames, firstmulticol) end @@ -92,9 +92,87 @@ function fill_row!(row, outcols::NTuple{N, AbstractVector}, return nothing end +function _combine_rows_with_first_task!(tid::Integer, + idx::AbstractVector{<:Integer}, + outcols::NTuple{<:Any, AbstractVector}, + outcolsref::Ref{NTuple{<:Any, AbstractVector}}, + type_widened::AbstractVector{Bool}, + widen_type_lock::ReentrantLock, + i::Integer, + f::Base.Callable, + gd::GroupedDataFrame, + starts::AbstractVector{<:Integer}, + ends::AbstractVector{<:Integer}, + incols::Union{Nothing, AbstractVector, + Tuple, NamedTuple}, + colnames::NTuple{<:Any, Symbol}, + firstmulticol::Val) + j = nothing + gdidx = gd.idx + local newoutcols + for i in idx + row = wrap_row(do_call(f, gdidx, starts, ends, gd, incols, i), firstmulticol) + j = fill_row!(row, outcols, i, 1, colnames) + if j !== nothing # Need to widen column + # If another thread is already widening outcols, wait until it's done + lock(widen_type_lock) + try + newoutcols = outcolsref[] + # If another thread widened outcols, try again + # to see whether new eltype is wide enough + if newoutcols !== outcols + j = fill_row!(row, newoutcols, i, j, colnames) + end + while j !== nothing + # Workaround for julia#15276 + newoutcols = let i=i, j=j, outcols=outcols, row=row, idx=idx + ntuple(length(outcols)) do k + S = typeof(row[k]) + T = eltype(outcols[k]) + U = promote_type(S, T) + if S <: T || U <: T + outcols[k] + else + copyto!(Tables.allocatecolumn(U, length(outcols[k])), + idx[1], outcols[k], idx[1], i - idx[1] + (k < j)) + end + end + end + outcolsref[] = newoutcols + j = fill_row!(row, newoutcols, i, j, colnames) + type_widened .= true + type_widened[tid] = false + end + finally + unlock(widen_type_lock) + end + return _combine_rows_with_first_task!(tid, idx, newoutcols, outcolsref, + type_widened, widen_type_lock, + i+1, f, gd, starts, ends, + incols, colnames, firstmulticol) + end + # If other thread widened columns, copy already processed data to new vectors + # This doesn't have to happen immediately (hence type_widened isn't atomic), + # but the more we wait the more data will have to be copied + if type_widened[tid] + lock(widen_type_lock) do + type_widened[tid] = false + newoutcols = outcolsref[] + for k in 1:length(outcols) + copyto!(newoutcols[k], idx[1], outcols[k], idx[1], i - idx[1] + 1) + end + end + return _combine_rows_with_first_task!(tid, idx, newoutcols, outcolsref, + type_widened, widen_type_lock, + i, f, gd, starts, ends, + incols, colnames, firstmulticol) + end + end + return outcols +end + function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow}, outcols::NTuple{N, AbstractVector}, - rowstart::Integer, colstart::Integer, f::Base.Callable, gd::GroupedDataFrame, incols::Union{Nothing, AbstractVector, Tuple, NamedTuple}, colnames::NTuple{N, Symbol}, @@ -107,32 +185,56 @@ function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow}, # handle empty GroupedDataFrame len == 0 && return outcols, colnames - # Handle first group - j = fill_row!(first, outcols, rowstart, colstart, colnames) - @assert j === nothing # eltype is guaranteed to match - # Handle remaining groups - @inbounds for i in rowstart+1:len - row = wrap_row(do_call(f, gdidx, starts, ends, gd, incols, i), firstmulticol) - j = fill_row!(row, outcols, i, 1, colnames) - if j !== nothing # Need to widen column type - local newcols - let i = i, j = j, outcols=outcols, row=row # Workaround for julia#15276 - newcols = ntuple(length(outcols)) do k - S = typeof(row[k]) - T = eltype(outcols[k]) - U = promote_type(S, T) - if S <: T || U <: T - outcols[k] - else - copyto!(Tables.allocatecolumn(U, length(outcols[k])), - 1, outcols[k], 1, k >= j ? i-1 : i) - end + # Handle groups other than the first one (which is handled below) + # Create up to one task per thread + # This has lower overhead than creating one task per group, + # but is optimal only if operations take roughly the same time for all groups + basesize = max(1, (len - 1) ÷ Threads.nthreads()) + partitions = collect(_partition(2:len, basesize)) + widen_type_lock = ReentrantLock() + outcolsref = Ref{NTuple{<:Any, AbstractVector}}(outcols) + type_widened = fill(false, length(partitions)) + tasks = similar(partitions, Task) + for tid in 1:length(partitions) + idx = partitions[tid] + tasks[tid] = + @spawn _combine_rows_with_first_task!(tid, idx, outcols, outcolsref, + type_widened, widen_type_lock, + 1, f, gd, starts, ends, incols, colnames, + firstmulticol) + end + + # Workaround JuliaLang/julia#38931: + # we want to preserve the exception type thrown in user code, + # and print the backtrace corresponding to it + for t in tasks + try + wait(t) + catch e + throw(t.exception) + end + end + + # Copy data for any tasks that finished before others widened columns + outcols = outcolsref[] + for tid in 1:length(partitions) + if type_widened[tid] + idx = partitions[tid] + oldoutcols = fetch(tasks[tid]) + for k in 1:length(outcols) + if oldoutcols[k] !== outcols[k] + copyto!(outcols[k], idx[1], oldoutcols[k], idx[1], + idx[end] - idx[1] + 1) end end - return _combine_rows_with_first!(row, newcols, i, j, - f, gd, incols, colnames, firstmulticol) end end + + # Handle first group + # This is done at the end to write directly to the final outcols + j1 = fill_row!(first, outcols, 1, 1, colnames) + @assert j1 === nothing # eltype is guaranteed to match + return outcols, colnames end diff --git a/src/other/utils.jl b/src/other/utils.jl index e13627748c..337c858f7a 100644 --- a/src/other/utils.jl +++ b/src/other/utils.jl @@ -83,3 +83,34 @@ else end funname(c::ComposedFunction) = Symbol(funname(c.outer), :_, funname(c.inner)) + + +if VERSION >= v"1.4" + const _partition = Iterators.partition +else + function _partition(xs, n) + @assert firstindex(xs) == 1 + m = cld(length(xs), n) + return (view(xs, i*n+1:min((i+1)*n, length(xs))) for i in 0:m-1) + end +end + +function tforeach(f, xs::AbstractArray; basesize::Integer) + nt = Threads.nthreads() + if nt > 1 && length(xs) > basesize + # Ensure we don't create more than 10 times more tasks than available threads + basesize′ = min(basesize, length(xs) ÷ nt * 10) + @sync for p in _partition(eachindex(xs), basesize′) + Threads.@spawn begin + for i in p + f(@inbounds xs[i]) + end + end + end + else + for i in eachindex(xs) + f(@inbounds xs[i]) + end + end + return +end \ No newline at end of file From 13f4f76137c5fab31c29d1220223180b1c2012ba Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Fri, 1 Jan 2021 12:25:58 +0100 Subject: [PATCH 02/11] Add tests, fix and simplify code --- src/groupeddataframe/complextransforms.jl | 50 ++++++++++++----------- test/grouping.jl | 12 ++++++ 2 files changed, 38 insertions(+), 24 deletions(-) diff --git a/src/groupeddataframe/complextransforms.jl b/src/groupeddataframe/complextransforms.jl index e66e6dbb0f..efa317faee 100644 --- a/src/groupeddataframe/complextransforms.jl +++ b/src/groupeddataframe/complextransforms.jl @@ -118,31 +118,30 @@ function _combine_rows_with_first_task!(tid::Integer, lock(widen_type_lock) try newoutcols = outcolsref[] - # If another thread widened outcols, try again - # to see whether new eltype is wide enough - if newoutcols !== outcols - j = fill_row!(row, newoutcols, i, j, colnames) - end - while j !== nothing - # Workaround for julia#15276 - newoutcols = let i=i, j=j, outcols=outcols, row=row, idx=idx - ntuple(length(outcols)) do k - S = typeof(row[k]) - T = eltype(outcols[k]) - U = promote_type(S, T) - if S <: T || U <: T - outcols[k] - else - copyto!(Tables.allocatecolumn(U, length(outcols[k])), - idx[1], outcols[k], idx[1], i - idx[1] + (k < j)) - end + # Workaround for julia#15276 + newoutcols = let i=i, j=j, newoutcols=newoutcols, row=row, idx=idx + ntuple(length(newoutcols)) do k + S = typeof(row[k]) + T = eltype(newoutcols[k]) + U = promote_type(S, T) + if S <: T || U <: T + newoutcols[k] + else + Tables.allocatecolumn(U, length(newoutcols[k])) end end - outcolsref[] = newoutcols - j = fill_row!(row, newoutcols, i, j, colnames) - type_widened .= true - type_widened[tid] = false end + j = fill_row!(row, newoutcols, i, j, colnames) + @assert j === nothing # eltype is guaranteed to match + for k in 1:length(outcols) + if outcols[k] !== newoutcols[k] + copyto!(newoutcols[k], idx[1], + outcols[k], idx[1], i - idx[1] + 1) + end + end + outcolsref[] = newoutcols + type_widened .= true + type_widened[tid] = false finally unlock(widen_type_lock) end @@ -159,12 +158,15 @@ function _combine_rows_with_first_task!(tid::Integer, type_widened[tid] = false newoutcols = outcolsref[] for k in 1:length(outcols) - copyto!(newoutcols[k], idx[1], outcols[k], idx[1], i - idx[1] + 1) + if outcols[k] !== newoutcols[k] + copyto!(newoutcols[k], idx[1], + outcols[k], idx[1], i - idx[1] + 1) + end end end return _combine_rows_with_first_task!(tid, idx, newoutcols, outcolsref, type_widened, widen_type_lock, - i, f, gd, starts, ends, + i+1, f, gd, starts, ends, incols, colnames, firstmulticol) end end diff --git a/test/grouping.jl b/test/grouping.jl index 3e1f6c33df..e73e5b751c 100644 --- a/test/grouping.jl +++ b/test/grouping.jl @@ -3577,4 +3577,16 @@ end end end +@testset "result eltype widening from different tasks" begin + for df in (DataFrame(x=1:5, y=Any[1, missing, nothing, 2.0, 'a']), + DataFrame(x=1:9, y=Any[1, 1, missing, 1, nothing, 1, 2.0, 1, 'a']), + DataFrame(x=1:9, y=Any[1, 2, 3, 4, 5, 6, 2.0, missing, 'a'])) + gd = groupby(df, :x) + @test combine(gd, :y => (y -> y[1]) => :y) ≅ df + # sleep ensures one task will widen the result after the other is done, + # so that data has to be copied at the end + @test combine(gd, [:x, :y] => ((x, y) -> (sleep(x == [5]); y[1])) => :y) ≅ df + end +end + end # module From 135512bccdc50c10702f27840a63551cb8227ed2 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Thu, 21 Jan 2021 09:18:36 +0100 Subject: [PATCH 03/11] Drop tforeach --- src/other/utils.jl | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/src/other/utils.jl b/src/other/utils.jl index 337c858f7a..e13627748c 100644 --- a/src/other/utils.jl +++ b/src/other/utils.jl @@ -83,34 +83,3 @@ else end funname(c::ComposedFunction) = Symbol(funname(c.outer), :_, funname(c.inner)) - - -if VERSION >= v"1.4" - const _partition = Iterators.partition -else - function _partition(xs, n) - @assert firstindex(xs) == 1 - m = cld(length(xs), n) - return (view(xs, i*n+1:min((i+1)*n, length(xs))) for i in 0:m-1) - end -end - -function tforeach(f, xs::AbstractArray; basesize::Integer) - nt = Threads.nthreads() - if nt > 1 && length(xs) > basesize - # Ensure we don't create more than 10 times more tasks than available threads - basesize′ = min(basesize, length(xs) ÷ nt * 10) - @sync for p in _partition(eachindex(xs), basesize′) - Threads.@spawn begin - for i in p - f(@inbounds xs[i]) - end - end - end - else - for i in eachindex(xs) - f(@inbounds xs[i]) - end - end - return -end \ No newline at end of file From c5930a955996c14da6fbe2635601b21a50ea4175 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Thu, 21 Jan 2021 10:11:55 +0100 Subject: [PATCH 04/11] Fix failure, small cleanup --- src/groupeddataframe/complextransforms.jl | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/groupeddataframe/complextransforms.jl b/src/groupeddataframe/complextransforms.jl index efa317faee..44aff93a4c 100644 --- a/src/groupeddataframe/complextransforms.jl +++ b/src/groupeddataframe/complextransforms.jl @@ -191,14 +191,17 @@ function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow}, # Create up to one task per thread # This has lower overhead than creating one task per group, # but is optimal only if operations take roughly the same time for all groups - basesize = max(1, (len - 1) ÷ Threads.nthreads()) - partitions = collect(_partition(2:len, basesize)) + @static if VERSION >= v"1.4" + basesize = max(1, (len - 1) ÷ Threads.nthreads()) + partitions = Iterators.partition(2:len, basesize) + else + partitions = (2:len,) + end widen_type_lock = ReentrantLock() outcolsref = Ref{NTuple{<:Any, AbstractVector}}(outcols) type_widened = fill(false, length(partitions)) - tasks = similar(partitions, Task) - for tid in 1:length(partitions) - idx = partitions[tid] + tasks = Vector{Task}(undef, length(partitions)) + for (tid, idx) in enumerate(partitions) tasks[tid] = @spawn _combine_rows_with_first_task!(tid, idx, outcols, outcolsref, type_widened, widen_type_lock, @@ -219,9 +222,8 @@ function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow}, # Copy data for any tasks that finished before others widened columns outcols = outcolsref[] - for tid in 1:length(partitions) + for (tid, idx) in enumerate(partitions) if type_widened[tid] - idx = partitions[tid] oldoutcols = fetch(tasks[tid]) for k in 1:length(outcols) if oldoutcols[k] !== outcols[k] From 30a650c38bfc4b515d10c73ce9c4744d9179e09f Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Thu, 21 Jan 2021 10:23:59 +0100 Subject: [PATCH 05/11] Docs --- docs/src/man/split_apply_combine.md | 12 +++++++----- src/abstractdataframe/selection.jl | 11 +++++++---- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/docs/src/man/split_apply_combine.md b/docs/src/man/split_apply_combine.md index b70509d66c..270d45e777 100644 --- a/docs/src/man/split_apply_combine.md +++ b/docs/src/man/split_apply_combine.md @@ -122,11 +122,13 @@ It is allowed to mix single values and vectors if multiple transformations are requested. In this case single value will be repeated to match the length of columns specified by returned vectors. -A separate task is spawned for each specified transformation, allowing for -parallel operation when several transformations are requested and Julia was -started with more than one thread. Passed transformation functions should -therefore not modify global variables (i.e. they should be pure), or use -locks to control parallel accesses. +A separate task is spawned for each specified transformation; each transformation +then spawns as many tasks as Julia threads, and splits processing of groups across +them (however, currently transformations with optimized implementations like `sum` +and transformations that return multiple rows use a single task for all groups). +This allows for parallel operation when Julia was started with more than one +thread. Passed transformation functions should therefore not modify global variables +(i.e. they should be pure), or use locks to control parallel accesses. To apply `function` to each row instead of whole columns, it can be wrapped in a `ByRow` struct. `cols` can be any column indexing syntax, in which case diff --git a/src/abstractdataframe/selection.jl b/src/abstractdataframe/selection.jl index 9f2eb701a7..9c72401612 100644 --- a/src/abstractdataframe/selection.jl +++ b/src/abstractdataframe/selection.jl @@ -145,10 +145,13 @@ const TRANSFORMATION_COMMON_RULES = `copycols=false`, a `SubDataFrame` is returned without copying columns. If a `GroupedDataFrame` is passed, a separate task is spawned for each - specified transformation, allowing for parallel operation when several - transformations are requested and Julia was started with more than one thread. - Passed transformation functions should therefore not modify global variables - (i.e. they should be pure), or use locks to control parallel accesses. + specified transformation; each transformation then spawns as many tasks + as Julia threads, and splits processing of groups across them + (however, currently transformations with optimized implementations like `sum` + and transformations that return multiple rows use a single task for all groups). + This allows for parallel operation when Julia was started with more than one + thread. Passed transformation functions should therefore not modify global + variables (i.e. they should be pure), or use locks to control parallel accesses. In the future, parallelism may be extended to other cases, so this requirement also holds for `DataFrame` inputs. """ From d94539f0c8cb14020c2d1fceb3b405c2aa918635 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Thu, 21 Jan 2021 18:14:08 +0100 Subject: [PATCH 06/11] Fixes --- src/groupeddataframe/complextransforms.jl | 43 ++++++++++++----------- test/grouping.jl | 33 ++++++++++++++--- 2 files changed, 51 insertions(+), 25 deletions(-) diff --git a/src/groupeddataframe/complextransforms.jl b/src/groupeddataframe/complextransforms.jl index 44aff93a4c..b4742066a5 100644 --- a/src/groupeddataframe/complextransforms.jl +++ b/src/groupeddataframe/complextransforms.jl @@ -93,12 +93,13 @@ function fill_row!(row, outcols::NTuple{N, AbstractVector}, end function _combine_rows_with_first_task!(tid::Integer, - idx::AbstractVector{<:Integer}, + rowstart::Integer, + rowend::Integer, + i::Integer, outcols::NTuple{<:Any, AbstractVector}, outcolsref::Ref{NTuple{<:Any, AbstractVector}}, type_widened::AbstractVector{Bool}, widen_type_lock::ReentrantLock, - i::Integer, f::Base.Callable, gd::GroupedDataFrame, starts::AbstractVector{<:Integer}, @@ -110,7 +111,7 @@ function _combine_rows_with_first_task!(tid::Integer, j = nothing gdidx = gd.idx local newoutcols - for i in idx + for i in i:rowend row = wrap_row(do_call(f, gdidx, starts, ends, gd, incols, i), firstmulticol) j = fill_row!(row, outcols, i, 1, colnames) if j !== nothing # Need to widen column @@ -119,7 +120,7 @@ function _combine_rows_with_first_task!(tid::Integer, try newoutcols = outcolsref[] # Workaround for julia#15276 - newoutcols = let i=i, j=j, newoutcols=newoutcols, row=row, idx=idx + newoutcols = let i=i, j=j, newoutcols=newoutcols, row=row ntuple(length(newoutcols)) do k S = typeof(row[k]) T = eltype(newoutcols[k]) @@ -131,23 +132,24 @@ function _combine_rows_with_first_task!(tid::Integer, end end end - j = fill_row!(row, newoutcols, i, j, colnames) - @assert j === nothing # eltype is guaranteed to match for k in 1:length(outcols) if outcols[k] !== newoutcols[k] - copyto!(newoutcols[k], idx[1], - outcols[k], idx[1], i - idx[1] + 1) + copyto!(newoutcols[k], rowstart, + outcols[k], rowstart, i - rowstart + (k < j)) end end + j = fill_row!(row, newoutcols, i, j, colnames) + @assert j === nothing # eltype is guaranteed to match + outcolsref[] = newoutcols type_widened .= true type_widened[tid] = false finally unlock(widen_type_lock) end - return _combine_rows_with_first_task!(tid, idx, newoutcols, outcolsref, + return _combine_rows_with_first_task!(tid, rowstart, rowend, i+1, newoutcols, outcolsref, type_widened, widen_type_lock, - i+1, f, gd, starts, ends, + f, gd, starts, ends, incols, colnames, firstmulticol) end # If other thread widened columns, copy already processed data to new vectors @@ -159,21 +161,21 @@ function _combine_rows_with_first_task!(tid::Integer, newoutcols = outcolsref[] for k in 1:length(outcols) if outcols[k] !== newoutcols[k] - copyto!(newoutcols[k], idx[1], - outcols[k], idx[1], i - idx[1] + 1) + copyto!(newoutcols[k], rowstart, + outcols[k], rowstart, i - rowstart + 1) end end end - return _combine_rows_with_first_task!(tid, idx, newoutcols, outcolsref, + return _combine_rows_with_first_task!(tid, rowstart, rowend, i+1, newoutcols, outcolsref, type_widened, widen_type_lock, - i+1, f, gd, starts, ends, + f, gd, starts, ends, incols, colnames, firstmulticol) end end return outcols end -function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow}, +function _combine_rows_with_first!(firstrow::Union{NamedTuple, DataFrameRow}, outcols::NTuple{N, AbstractVector}, f::Base.Callable, gd::GroupedDataFrame, incols::Union{Nothing, AbstractVector, Tuple, NamedTuple}, @@ -203,9 +205,10 @@ function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow}, tasks = Vector{Task}(undef, length(partitions)) for (tid, idx) in enumerate(partitions) tasks[tid] = - @spawn _combine_rows_with_first_task!(tid, idx, outcols, outcolsref, + @spawn _combine_rows_with_first_task!(tid, first(idx), last(idx), first(idx), + outcols, outcolsref, type_widened, widen_type_lock, - 1, f, gd, starts, ends, incols, colnames, + f, gd, starts, ends, incols, colnames, firstmulticol) end @@ -227,8 +230,8 @@ function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow}, oldoutcols = fetch(tasks[tid]) for k in 1:length(outcols) if oldoutcols[k] !== outcols[k] - copyto!(outcols[k], idx[1], oldoutcols[k], idx[1], - idx[end] - idx[1] + 1) + copyto!(outcols[k], first(idx), oldoutcols[k], first(idx), + last(idx) - first(idx) + 1) end end end @@ -236,7 +239,7 @@ function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow}, # Handle first group # This is done at the end to write directly to the final outcols - j1 = fill_row!(first, outcols, 1, 1, colnames) + j1 = fill_row!(firstrow, outcols, 1, 1, colnames) @assert j1 === nothing # eltype is guaranteed to match return outcols, colnames diff --git a/test/grouping.jl b/test/grouping.jl index e73e5b751c..45dd456efa 100644 --- a/test/grouping.jl +++ b/test/grouping.jl @@ -3578,14 +3578,37 @@ end end @testset "result eltype widening from different tasks" begin - for df in (DataFrame(x=1:5, y=Any[1, missing, nothing, 2.0, 'a']), - DataFrame(x=1:9, y=Any[1, 1, missing, 1, nothing, 1, 2.0, 1, 'a']), - DataFrame(x=1:9, y=Any[1, 2, 3, 4, 5, 6, 2.0, missing, 'a'])) + if VERSION < v"1.5" + Base.convert(::Type{Union{Missing, Nothing, Float64}}, x::Int) = float(x) + end + Random.seed!(1) + for y in (Any[1, missing, missing, 2, 4], + Any[1, missing, nothing, 2.1, 'a'], + Any[1, 1, missing, 1, nothing, 1, 2.1, 1, 'a'], + Any[1, 2, 3, 4, 5, 6, 2.1, missing, 'a'], + Any[1, 2, 3.1, 4, 5, 6, 2.1, missing, 'a']), + x in (1:length(y), rand(1:2, length(y)), rand(1:3, length(y))) + df = DataFrame(x=x, y=y) gd = groupby(df, :x) - @test combine(gd, :y => (y -> y[1]) => :y) ≅ df + res = combine(gd, :y => (y -> y[1]) => :y) + # sleep ensures one task will widen the result after the other is done, + # so that data has to be copied at the end + @test res ≅ + combine(gd, [:x, :y] => ((x, y) -> (sleep((x == [5])/10); y[1])) => :y) ≅ + combine(gd, [:x, :y] => ((x, y) -> (sleep(x[1]/100); y[1])) => :y) ≅ + combine(gd, [:x, :y] => ((x, y) -> (sleep(rand()/10); y[1])) => :y) + + if df.x == 1:nrow(df) + @test res ≅ df + end + + res = combine(gd, :y => (y -> (y1=y[1], y2=last(y))) => AsTable) # sleep ensures one task will widen the result after the other is done, # so that data has to be copied at the end - @test combine(gd, [:x, :y] => ((x, y) -> (sleep(x == [5]); y[1])) => :y) ≅ df + @test res ≅ + combine(gd, [:x, :y] => ((x, y) -> (sleep((x == [5])/10); (y1=y[1], y2=last(y)))) => AsTable) ≅ + combine(gd, [:x, :y] => ((x, y) -> (sleep(x[1]/100); (y1=y[1], y2=last(y)))) => AsTable) ≅ + combine(gd, [:x, :y] => ((x, y) -> (sleep(rand()/10); (y1=y[1], y2=last(y)))) => AsTable) end end From 77a0d72e375eb4dd8479657eaff497433bc0576e Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Thu, 28 Jan 2021 22:35:46 +0100 Subject: [PATCH 07/11] Review fixes --- src/groupeddataframe/complextransforms.jl | 10 +++++---- test/grouping.jl | 25 +++++++++++++++-------- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/groupeddataframe/complextransforms.jl b/src/groupeddataframe/complextransforms.jl index b4742066a5..9b922bbe20 100644 --- a/src/groupeddataframe/complextransforms.jl +++ b/src/groupeddataframe/complextransforms.jl @@ -95,7 +95,7 @@ end function _combine_rows_with_first_task!(tid::Integer, rowstart::Integer, rowend::Integer, - i::Integer, + rownext::Integer, outcols::NTuple{<:Any, AbstractVector}, outcolsref::Ref{NTuple{<:Any, AbstractVector}}, type_widened::AbstractVector{Bool}, @@ -111,7 +111,7 @@ function _combine_rows_with_first_task!(tid::Integer, j = nothing gdidx = gd.idx local newoutcols - for i in i:rowend + for i in rownext:rowend row = wrap_row(do_call(f, gdidx, starts, ends, gd, incols, i), firstmulticol) j = fill_row!(row, outcols, i, 1, colnames) if j !== nothing # Need to widen column @@ -128,6 +128,7 @@ function _combine_rows_with_first_task!(tid::Integer, if S <: T || U <: T newoutcols[k] else + type_widened .= true Tables.allocatecolumn(U, length(newoutcols[k])) end end @@ -142,7 +143,6 @@ function _combine_rows_with_first_task!(tid::Integer, @assert j === nothing # eltype is guaranteed to match outcolsref[] = newoutcols - type_widened .= true type_widened[tid] = false finally unlock(widen_type_lock) @@ -160,6 +160,7 @@ function _combine_rows_with_first_task!(tid::Integer, type_widened[tid] = false newoutcols = outcolsref[] for k in 1:length(outcols) + # Check whether this particular column has been widened if outcols[k] !== newoutcols[k] copyto!(newoutcols[k], rowstart, outcols[k], rowstart, i - rowstart + 1) @@ -194,7 +195,7 @@ function _combine_rows_with_first!(firstrow::Union{NamedTuple, DataFrameRow}, # This has lower overhead than creating one task per group, # but is optimal only if operations take roughly the same time for all groups @static if VERSION >= v"1.4" - basesize = max(1, (len - 1) ÷ Threads.nthreads()) + basesize = max(1, cld(len - 1, Threads.nthreads())) partitions = Iterators.partition(2:len, basesize) else partitions = (2:len,) @@ -229,6 +230,7 @@ function _combine_rows_with_first!(firstrow::Union{NamedTuple, DataFrameRow}, if type_widened[tid] oldoutcols = fetch(tasks[tid]) for k in 1:length(outcols) + # Check whether this particular column has been widened if oldoutcols[k] !== outcols[k] copyto!(outcols[k], first(idx), oldoutcols[k], first(idx), last(idx) - first(idx) + 1) diff --git a/test/grouping.jl b/test/grouping.jl index 45dd456efa..e2a2c9d116 100644 --- a/test/grouping.jl +++ b/test/grouping.jl @@ -3588,27 +3588,34 @@ end Any[1, 2, 3, 4, 5, 6, 2.1, missing, 'a'], Any[1, 2, 3.1, 4, 5, 6, 2.1, missing, 'a']), x in (1:length(y), rand(1:2, length(y)), rand(1:3, length(y))) - df = DataFrame(x=x, y=y) + df = DataFrame(x=x, y1=y, y2=y) gd = groupby(df, :x) - res = combine(gd, :y => (y -> y[1]) => :y) + res = combine(gd, :y1 => (y -> y[1]) => :y1, :y2 => (y -> y[end]) => :y2) # sleep ensures one task will widen the result after the other is done, # so that data has to be copied at the end @test res ≅ - combine(gd, [:x, :y] => ((x, y) -> (sleep((x == [5])/10); y[1])) => :y) ≅ - combine(gd, [:x, :y] => ((x, y) -> (sleep(x[1]/100); y[1])) => :y) ≅ - combine(gd, [:x, :y] => ((x, y) -> (sleep(rand()/10); y[1])) => :y) + combine(gd, [:x, :y1] => ((x, y) -> (sleep((x == [5])/10); y[1])) => :y1, + [:x, :y2] => ((x, y) -> (sleep((x == [5])/10); y[end])) => :y2) ≅ + combine(gd, [:x, :y1] => ((x, y) -> (sleep(x[1]/100); y[1])) => :y1, + [:x, :y2] => ((x, y) -> (sleep(x[1]/100); y[end])) => :y2) ≅ + combine(gd, [:x, :y1] => ((x, y) -> (sleep(rand()/10); y[1])) => :y1, + [:x, :y2] => ((x, y) -> (sleep(rand()/10); y[end])) => :y2) if df.x == 1:nrow(df) @test res ≅ df end - res = combine(gd, :y => (y -> (y1=y[1], y2=last(y))) => AsTable) + res = combine(gd, :y1 => (y -> (y1=y[1], y2=y[end])) => AsTable, + :y2 => (y -> (y3=y[1], y4=y[end])) => AsTable) # sleep ensures one task will widen the result after the other is done, # so that data has to be copied at the end @test res ≅ - combine(gd, [:x, :y] => ((x, y) -> (sleep((x == [5])/10); (y1=y[1], y2=last(y)))) => AsTable) ≅ - combine(gd, [:x, :y] => ((x, y) -> (sleep(x[1]/100); (y1=y[1], y2=last(y)))) => AsTable) ≅ - combine(gd, [:x, :y] => ((x, y) -> (sleep(rand()/10); (y1=y[1], y2=last(y)))) => AsTable) + combine(gd, [:x, :y1] => ((x, y) -> (sleep((x == [5])/10); (y1=y[1], y2=y[end]))) => AsTable, + [:x, :y2] => ((x, y) -> (sleep((x == [5])/10); (y3=y[1], y4=y[end]))) => AsTable) ≅ + combine(gd, [:x, :y1] => ((x, y) -> (sleep(x[1]/100); (y1=y[1], y2=y[end]))) => AsTable, + [:x, :y2] => ((x, y) -> (sleep(x[1]/100); (y3=y[1], y4=y[end]))) => AsTable) ≅ + combine(gd, [:x, :y1] => ((x, y) -> (sleep(rand()/10); (y1=y[1], y2=y[end]))) => AsTable, + [:x, :y2] => ((x, y) -> (sleep(rand()/10); (y3=y[1], y4=y[end]))) => AsTable) end end From 7c97723d1e469cd753b1bb810eb216d601d73519 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Thu, 28 Jan 2021 22:59:48 +0100 Subject: [PATCH 08/11] Better test --- test/grouping.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/grouping.jl b/test/grouping.jl index e2a2c9d116..8f5f30b3f7 100644 --- a/test/grouping.jl +++ b/test/grouping.jl @@ -3588,7 +3588,7 @@ end Any[1, 2, 3, 4, 5, 6, 2.1, missing, 'a'], Any[1, 2, 3.1, 4, 5, 6, 2.1, missing, 'a']), x in (1:length(y), rand(1:2, length(y)), rand(1:3, length(y))) - df = DataFrame(x=x, y1=y, y2=y) + df = DataFrame(x=x, y1=y, y2=reverse(y)) gd = groupby(df, :x) res = combine(gd, :y1 => (y -> y[1]) => :y1, :y2 => (y -> y[end]) => :y2) # sleep ensures one task will widen the result after the other is done, From 7d26ba289a5f9e5a4cec12ad5d8acf7787a24fc0 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sat, 6 Feb 2021 15:56:39 +0100 Subject: [PATCH 09/11] Print number of threads --- test/runtests.jl | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index ca9e1fd87e..a2f9ff1fc2 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -8,8 +8,12 @@ anyerrors = false using DataFrames, Dates, Test, Random -if VERSION > v"1.3" && Threads.nthreads() < 2 - @warn("Running with only one thread: correctness of parallel operations is not tested") +if VERSION > v"1.3" + if Threads.nthreads() < 2 + @warn("Running with only one thread: correctness of parallel operations is not tested") + else + @show Threads.nthreads() + end end my_tests = ["utils.jl", From 9f97dfb526d03fdc1e7ff69e7782fb17b3d63016 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sat, 6 Feb 2021 18:34:02 +0100 Subject: [PATCH 10/11] Add tests for CategoricalArrays thread safety --- test/grouping.jl | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/test/grouping.jl b/test/grouping.jl index 8f5f30b3f7..4e0e2eb9f8 100644 --- a/test/grouping.jl +++ b/test/grouping.jl @@ -3619,4 +3619,32 @@ end end end +@testset "CategoricalArray thread safety" begin + Random.seed!(35) + df = DataFrame(x=rand(1:10, 100), + y=categorical(rand(1:10, 100)), + z=categorical(rand(100:200, 100))) + gd = groupby(df, :x) + + @test combine(gd, :y => (y -> first(y)) => :y) == + combine(gd, [:y, :z] => ((y, z) -> first(y)) => :y) == + combine(gd, :y => (y -> get(first(y))) => :y) + + @test combine(gd, [:x, :y, :z] => + ((x, y, z) -> first(x) <= 5 ? first(y) : first(z)) => :y) == + combine(gd, [:x, :y, :z] => + ((x, y, z) -> first(x) <= 5 ? get(first(y)) : get(first(z))) => :y) + + # Check that assigning CategoricalValue never mutates the target pool + # We rely on this behavior to avoid corruption when multiple threads call + # getindex and setindex! on the same array, possibly with CategoricalValues + # with different pools + x = categorical([1, 2, 3]) + y = categorical([1, 2, 4]) + pool = x.pool + x[1] = y[1] + @test x.pool !== pool + @test levels(pool) == 1:3 +end + end # module From 8b6fe9b87cfc523da2f47e71a2071c60923b21ee Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sun, 7 Feb 2021 12:51:12 +0100 Subject: [PATCH 11/11] Disable multithreading with CategoricalArrays with different levels --- src/groupeddataframe/complextransforms.jl | 53 ++++++++++++++++++++--- test/grouping.jl | 36 +++++++-------- 2 files changed, 64 insertions(+), 25 deletions(-) diff --git a/src/groupeddataframe/complextransforms.jl b/src/groupeddataframe/complextransforms.jl index 9b922bbe20..5d4a157ce7 100644 --- a/src/groupeddataframe/complextransforms.jl +++ b/src/groupeddataframe/complextransforms.jl @@ -176,6 +176,40 @@ function _combine_rows_with_first_task!(tid::Integer, return outcols end +# CategoricalArray is thread-safe only when input and output levels are equal +# since then all values are added upfront. Otherwise, if the function returns +# CategoricalValues mixed from different pools, one thread may add values, +# which may put the invpool in an invalid state while the other one is reading from it +function isthreadsafe(outcols::NTuple{<:Any, AbstractVector}, + incols::Union{Tuple, NamedTuple}) + anycat = any(outcols) do col + T = typeof(col) + # If the first result is missing, widening can give a CategoricalArray + # if later results are CategoricalValues + eltype(col) === Missing || + (nameof(T) === :CategoricalArray && + nameof(parentmodule(T)) === :CategoricalArrays) + end + if anycat + levs = nothing + for col in incols + T = typeof(col) + if nameof(T) === :CategoricalArray && + nameof(parentmodule(T)) === :CategoricalArrays + if levs !== nothing + levs == levels(col) || return false + else + levs = levels(col) + end + end + end + end + return true +end +isthreadsafe(outcols::NTuple{<:Any, AbstractVector}, incols::AbstractVector) = + isthreadsafe(outcols, (incols,)) +isthreadsafe(outcols::NTuple{<:Any, AbstractVector}, incols::Nothing) = true + function _combine_rows_with_first!(firstrow::Union{NamedTuple, DataFrameRow}, outcols::NTuple{N, AbstractVector}, f::Base.Callable, gd::GroupedDataFrame, @@ -190,11 +224,15 @@ function _combine_rows_with_first!(firstrow::Union{NamedTuple, DataFrameRow}, # handle empty GroupedDataFrame len == 0 && return outcols, colnames - # Handle groups other than the first one (which is handled below) + # Handle first group + j1 = fill_row!(firstrow, outcols, 1, 1, colnames) + @assert j1 === nothing # eltype is guaranteed to match + + # Handle groups other than the first one # Create up to one task per thread # This has lower overhead than creating one task per group, # but is optimal only if operations take roughly the same time for all groups - @static if VERSION >= v"1.4" + if VERSION >= v"1.4" && isthreadsafe(outcols, incols) basesize = max(1, cld(len - 1, Threads.nthreads())) partitions = Iterators.partition(2:len, basesize) else @@ -225,7 +263,13 @@ function _combine_rows_with_first!(firstrow::Union{NamedTuple, DataFrameRow}, end # Copy data for any tasks that finished before others widened columns + oldoutcols = outcols outcols = outcolsref[] + if outcols !== oldoutcols # first group + for k in 1:length(outcols) + outcols[k][1] = oldoutcols[k][1] + end + end for (tid, idx) in enumerate(partitions) if type_widened[tid] oldoutcols = fetch(tasks[tid]) @@ -239,11 +283,6 @@ function _combine_rows_with_first!(firstrow::Union{NamedTuple, DataFrameRow}, end end - # Handle first group - # This is done at the end to write directly to the final outcols - j1 = fill_row!(firstrow, outcols, 1, 1, colnames) - @assert j1 === nothing # eltype is guaranteed to match - return outcols, colnames end diff --git a/test/grouping.jl b/test/grouping.jl index 4e0e2eb9f8..696bd643c7 100644 --- a/test/grouping.jl +++ b/test/grouping.jl @@ -3620,31 +3620,31 @@ end end @testset "CategoricalArray thread safety" begin + # These tests do not actually trigger multithreading bugs, + # but at least they check that the code that disables multithreading + # with CategoricalArray when levels are different works Random.seed!(35) df = DataFrame(x=rand(1:10, 100), - y=categorical(rand(1:10, 100)), - z=categorical(rand(100:200, 100))) + y=categorical(rand(10:15, 100)), + z=categorical(rand(0:20, 100))) + df.y2 = reverse(df.y) # Same levels gd = groupby(df, :x) - @test combine(gd, :y => (y -> first(y)) => :y) == - combine(gd, [:y, :z] => ((y, z) -> first(y)) => :y) == - combine(gd, :y => (y -> get(first(y))) => :y) + @test combine(gd, :y => (y -> y[1]) => :res) == + combine(gd, [:y, :y2] => ((y, x) -> y[1]) => :res) == + combine(gd, [:y, :x] => ((y, x) -> y[1]) => :res) == + combine(gd, [:y, :z] => ((y, z) -> y[1]) => :res) == + combine(gd, :y => (y -> get(y[1])) => :res) + + @test combine(gd, [:x, :y, :y2] => + ((x, y, y2) -> x[1] <= 5 ? y[1] : y2[1]) => :res) == + combine(gd, [:x, :y, :y2] => + ((x, y, y2) -> x[1] <= 5 ? get(y[1]) : get(y2[1])) => :res) @test combine(gd, [:x, :y, :z] => - ((x, y, z) -> first(x) <= 5 ? first(y) : first(z)) => :y) == + ((x, y, z) -> x[1] <= 5 ? y[1] : z[1]) => :res) == combine(gd, [:x, :y, :z] => - ((x, y, z) -> first(x) <= 5 ? get(first(y)) : get(first(z))) => :y) - - # Check that assigning CategoricalValue never mutates the target pool - # We rely on this behavior to avoid corruption when multiple threads call - # getindex and setindex! on the same array, possibly with CategoricalValues - # with different pools - x = categorical([1, 2, 3]) - y = categorical([1, 2, 4]) - pool = x.pool - x[1] = y[1] - @test x.pool !== pool - @test levels(pool) == 1:3 + ((x, y, z) -> x[1] <= 5 ? get(y[1]) : get(z[1])) => :res) end end # module