From 3b45c5bfe94ab15a98bb5ef8efcf6574e4adbfa1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bogumi=C5=82=20Kami=C5=84ski?= Date: Tue, 13 Apr 2021 12:05:45 +0200 Subject: [PATCH] Despecialization part 2 (#2709) --- src/groupeddataframe/callprocessing.jl | 73 +++++++++++------------ src/groupeddataframe/complextransforms.jl | 53 ++++++++++------ src/groupeddataframe/fastaggregates.jl | 37 ++++++++---- src/groupeddataframe/splitapplycombine.jl | 4 +- test/grouping.jl | 7 +++ 5 files changed, 103 insertions(+), 71 deletions(-) diff --git a/src/groupeddataframe/callprocessing.jl b/src/groupeddataframe/callprocessing.jl index 859987d83d..e52def8dcf 100644 --- a/src/groupeddataframe/callprocessing.jl +++ b/src/groupeddataframe/callprocessing.jl @@ -1,3 +1,11 @@ +abstract type FirstColCount end + +struct FirstMultiCol <: FirstColCount end +struct FirstSingleCol <: FirstColCount end + +firstcoltype(firstmulticol::Bool) = + firstmulticol ? FirstMultiCol() : FirstSingleCol() + # Wrapping automatically adds column names when the value returned # by the user-provided function lacks them wrap(x::Union{AbstractDataFrame, DataFrameRow}) = x @@ -22,53 +30,44 @@ const ERROR_ROW_COUNT = "return value must not change its kind " * const ERROR_COL_COUNT = "function must return only single-column values, " * "or only multiple-column values" -wrap_table(x::Any, ::Val) = +wrap_table(x::Any, ::FirstSingleCol) = throw(ArgumentError(ERROR_ROW_COUNT)) -function wrap_table(x::Union{NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}, - AbstractDataFrame, AbstractMatrix}, - ::Val{firstmulticol}) where firstmulticol - if !firstmulticol - throw(ArgumentError(ERROR_COL_COUNT)) - end - return wrap(x) -end - -function wrap_table(x::AbstractVector, ::Val{firstmulticol}) where firstmulticol - if firstmulticol - throw(ArgumentError(ERROR_COL_COUNT)) - end - return wrap(x) -end - -function wrap_row(x::Any, ::Val{firstmulticol}) where firstmulticol - # NamedTuple is not possible in this branch - if (x isa DataFrameRow) ⊻ firstmulticol - throw(ArgumentError(ERROR_COL_COUNT)) - end - return wrap(x) -end - -function wrap_row(x::Union{AbstractArray{<:Any, 0}, Ref}, - ::Val{firstmulticol}) where firstmulticol - if firstmulticol +wrap_table(x::Any, ::FirstMultiCol) = + throw(ArgumentError(ERROR_ROW_COUNT)) +wrap_table(x::Union{NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}, + AbstractDataFrame, AbstractMatrix}, ::FirstSingleCol) = throw(ArgumentError(ERROR_COL_COUNT)) - end - return (x1 = x[],) -end - +wrap_table(x::Union{NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}, + AbstractDataFrame, AbstractMatrix}, ::FirstMultiCol) = + wrap(x) +wrap_table(x::AbstractVector, ::FirstSingleCol) = wrap(x) +wrap_table(x::AbstractVector, ::FirstMultiCol) = throw(ArgumentError(ERROR_COL_COUNT)) + +wrap_row(x::DataFrameRow, ::FirstSingleCol) = throw(ArgumentError(ERROR_COL_COUNT)) +wrap_row(x::DataFrameRow, ::FirstMultiCol) = wrap(x) +wrap_row(x::Any, ::FirstSingleCol) = wrap(x) +# NamedTuple is not possible in this branch +wrap_row(x::Any, ::FirstMultiCol) = throw(ArgumentError(ERROR_COL_COUNT)) + +wrap_row(x::Union{AbstractArray{<:Any, 0}, Ref}, ::FirstMultiCol) = + throw(ArgumentError(ERROR_COL_COUNT)) +wrap_row(x::Union{AbstractArray{<:Any, 0}, Ref}, ::FirstSingleCol) = + (x1 = x[],) # note that also NamedTuple() is correctly captured by this definition # as it is more specific than the one below wrap_row(::Union{AbstractVecOrMat, AbstractDataFrame, - NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}}, ::Val) = + NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}}, ::FirstSingleCol) = + throw(ArgumentError(ERROR_ROW_COUNT)) +wrap_row(::Union{AbstractVecOrMat, AbstractDataFrame, + NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}}, ::FirstMultiCol) = throw(ArgumentError(ERROR_ROW_COUNT)) -function wrap_row(x::NamedTuple, ::Val{firstmulticol}) where firstmulticol +wrap_row(x::NamedTuple, ::FirstSingleCol) = throw(ArgumentError(ERROR_COL_COUNT)) + +function wrap_row(x::NamedTuple, ::FirstMultiCol) if any(v -> v isa AbstractVector, x) throw(ArgumentError("mixing single values and vectors in a named tuple is not allowed")) end - if !firstmulticol - throw(ArgumentError(ERROR_COL_COUNT)) - end return x end diff --git a/src/groupeddataframe/complextransforms.jl b/src/groupeddataframe/complextransforms.jl index ae6151f4f4..9585574d20 100644 --- a/src/groupeddataframe/complextransforms.jl +++ b/src/groupeddataframe/complextransforms.jl @@ -17,13 +17,14 @@ function _combine_multicol((firstres,)::Ref{Any}, wfun::Ref{Any}, gd::GroupedDat idx_agg = NOTHING_IDX_AGG end return _combine_with_first(Ref{Any}(wrap(firstres)), wfun, gd, wincols, - Val(firstmulticol), idx_agg) + firstmulticol, idx_agg) end function _combine_with_first((first,)::Ref{Any}, (f,)::Ref{Any}, gd::GroupedDataFrame, (incols,)::Ref{Any}, - firstmulticol::Val, idx_agg::Vector{Int}) + firstmulticol::Bool, idx_agg::Vector{Int}) + @assert first isa Union{NamedTuple, DataFrameRow, AbstractDataFrame} @assert f isa Base.Callable @assert incols isa Union{Nothing, AbstractVector, Tuple, NamedTuple} @assert first isa Union{NamedTuple, DataFrameRow, AbstractDataFrame} @@ -38,9 +39,9 @@ function _combine_with_first((first,)::Ref{Any}, elseif first isa DataFrameRow n = length(gd) eltys = [eltype(parent(first)[!, i]) for i in parentcols(index(first))] - elseif firstmulticol == Val(false) && first[1] isa Union{AbstractArray{<:Any, 0}, Ref} + elseif !firstmulticol && first[1] isa Union{AbstractArray{<:Any, 0}, Ref} extrude = true - first = wrap_row(first[1], firstmulticol) + first = wrap_row(first[1], firstcoltype(firstmulticol)) n = length(gd) eltys = (typeof(first[1]),) else # other NamedTuple giving a single row @@ -60,10 +61,14 @@ function _combine_with_first((first,)::Ref{Any}, NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}} outcols, finalcolnames = _combine_tables_with_first!(first, initialcols, idx, 1, 1, f, gd, incols, targetcolnames, - firstmulticol) + firstcoltype(firstmulticol)) else - outcols, finalcolnames = _combine_rows_with_first!(first, initialcols, - f, gd, incols, targetcolnames, + outcols, finalcolnames = _combine_rows_with_first!(Ref{Any}(first), + Ref{Any}(initialcols), + Ref{Any}(f), + gd, + Ref{Any}(incols), + Ref{Any}(targetcolnames), firstmulticol) end return idx, outcols, collect(Symbol, finalcolnames) @@ -114,12 +119,13 @@ function _combine_rows_with_first_task!(tid::Integer, incols::Union{Nothing, AbstractVector, Tuple, NamedTuple}, colnames::NTuple{<:Any, Symbol}, - firstmulticol::Val) + firstmulticol::FirstColCount) j = nothing gdidx = gd.idx local newoutcols for i in rownext:rowend - row = wrap_row(do_call(f, gdidx, starts, ends, gd, incols, i), firstmulticol) + row = wrap_row(do_call(f, gdidx, starts, ends, gd, incols, i), + firstmulticol) j = fill_row!(row, outcols, i, 1, colnames) if j !== nothing # Need to widen column # If another thread is already widening outcols, wait until it's done @@ -157,7 +163,8 @@ function _combine_rows_with_first_task!(tid::Integer, return _combine_rows_with_first_task!(tid, rowstart, rowend, i+1, newoutcols, outcolsref, type_widened, widen_type_lock, f, gd, starts, ends, - incols, colnames, firstmulticol) + incols, colnames, + firstmulticol) end # If other thread widened columns, copy already processed data to new vectors # This doesn't have to happen immediately (hence type_widened isn't atomic), @@ -177,7 +184,8 @@ function _combine_rows_with_first_task!(tid::Integer, return _combine_rows_with_first_task!(tid, rowstart, rowend, i+1, newoutcols, outcolsref, type_widened, widen_type_lock, f, gd, starts, ends, - incols, colnames, firstmulticol) + incols, colnames, + firstmulticol) end end return outcols @@ -217,12 +225,19 @@ isthreadsafe(outcols::NTuple{<:Any, AbstractVector}, incols::AbstractVector) = isthreadsafe(outcols, (incols,)) isthreadsafe(outcols::NTuple{<:Any, AbstractVector}, incols::Nothing) = true -function _combine_rows_with_first!(firstrow::Union{NamedTuple, DataFrameRow}, - outcols::NTuple{N, AbstractVector}, - f::Base.Callable, gd::GroupedDataFrame, - incols::Union{Nothing, AbstractVector, Tuple, NamedTuple}, - colnames::NTuple{N, Symbol}, - firstmulticol::Val) where N +function _combine_rows_with_first!((firstrow,)::Ref{Any}, + (outcols,)::Ref{Any}, + (f,)::Ref{Any}, + gd::GroupedDataFrame, + (incols,)::Ref{Any}, + (colnames,)::Ref{Any}, + firstmulticol::Bool) + @assert firstrow isa Union{NamedTuple, DataFrameRow} + @assert outcols isa NTuple{N, AbstractVector} where N + @assert f isa Base.Callable + @assert incols isa Union{Nothing, AbstractVector, Tuple, NamedTuple} + @assert colnames isa NTuple{N, Symbol} where N + @assert length(colnames) == length(outcols) len = length(gd) gdidx = gd.idx starts = gd.starts @@ -255,7 +270,7 @@ function _combine_rows_with_first!(firstrow::Union{NamedTuple, DataFrameRow}, outcols, outcolsref, type_widened, widen_type_lock, f, gd, starts, ends, incols, colnames, - firstmulticol) + firstcoltype(firstmulticol)) end # Workaround JuliaLang/julia#38931: @@ -336,7 +351,7 @@ function _combine_tables_with_first!(first::Union{AbstractDataFrame, f::Base.Callable, gd::GroupedDataFrame, incols::Union{Nothing, AbstractVector, Tuple, NamedTuple}, colnames::NTuple{N, Symbol}, - firstmulticol::Val) where N + firstmulticol::FirstColCount) where N len = length(gd) gdidx = gd.idx starts = gd.starts diff --git a/src/groupeddataframe/fastaggregates.jl b/src/groupeddataframe/fastaggregates.jl index 83353f9321..c4b3f817fe 100644 --- a/src/groupeddataframe/fastaggregates.jl +++ b/src/groupeddataframe/fastaggregates.jl @@ -157,19 +157,9 @@ function copyto_widen!(res::AbstractVector{T}, x::AbstractVector) where T return res end -function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Bool, - incol::AbstractVector, gd::GroupedDataFrame) - n = length(gd) - if adjust !== nothing || checkempty - counts = zeros(Int, n) - end - groups = gd.groups - @static if VERSION >= v"1.4" - batchsize = Threads.nthreads() > 1 ? 100_000 : typemax(Int) - batches = Iterators.partition(eachindex(incol, groups), batchsize) - else - batches = (eachindex(incol, groups),) - end +function groupreduce!_helper(res::AbstractVector, f, op, condf, adjust, checkempty::Bool, + incol::AbstractVector, groups::Vector{Int}, counts::Vector{Int}, + batches) for batch in batches # Allow other tasks to do garbage collection while this one runs @static if VERSION >= v"1.4" @@ -186,12 +176,33 @@ function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Boo else res[gix] = op(res[gix], f(x, gix)) end + # this check is optimized out by constant propagation if adjust !== nothing || checkempty counts[gix] += 1 end end end end +end + +function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Bool, + incol::AbstractVector, gd::GroupedDataFrame) + n = length(gd) + if adjust !== nothing || checkempty + counts = zeros(Int, n) + else + counts = Int[] + end + groups = gd.groups + @static if VERSION >= v"1.4" + batchsize = Threads.nthreads() > 1 ? 100_000 : typemax(Int) + batches = Iterators.partition(eachindex(incol, groups), batchsize) + else + batches = (eachindex(incol, groups),) + end + + groupreduce!_helper(res, f, op, condf, adjust, checkempty, + incol, groups, counts, batches) # handle the case of an unitialized reduction if eltype(res) === Any if op === Base.add_sum diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index ad4f61cff5..e2336689fe 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -354,7 +354,7 @@ function _combine_process_pair_symbol(optional_i::Bool, # idx. Currently we do it only for single-row return values otherwise we pass # NOTHING_IDX_AGG to signal that idx has to be computed in _combine_with_first idx, outcols, _ = _combine_with_first(Ref{Any}(wrap(firstres)), wfun, gd, wincols, - Val(firstmulticol), + firstmulticol, firstres isa AbstractVector ? NOTHING_IDX_AGG : idx_agg[]) @assert length(outcols) == 1 outcol = outcols[1] @@ -393,7 +393,7 @@ function _combine_process_pair_astable(optional_i::Bool, @assert only(wincols) isa Union{Tuple, NamedTuple} if firstres isa AbstractVector idx, outcol_vec, _ = _combine_with_first(Ref{Any}(wrap(firstres)), wfun, gd, wincols, - Val(firstmulticol), NOTHING_IDX_AGG) + firstmulticol, NOTHING_IDX_AGG) @assert length(outcol_vec) == 1 res = outcol_vec[1] @assert length(res) > 0 diff --git a/test/grouping.jl b/test/grouping.jl index 1e6e8af43d..6a4f0afaee 100644 --- a/test/grouping.jl +++ b/test/grouping.jl @@ -3886,4 +3886,11 @@ end @test df2.x_prod isa Vector{Int} end +@testset "extra tests of wrapper corner cases" begin + df = DataFrame(a=1:2) + gdf = groupby(df, :a) + @test_throws ArgumentError combine(gdf, x -> x.a[1] == 1 ? 1 : x[1, :]) + @test_throws ArgumentError combine(gdf, x -> x.a[1] == 1 ? (a=1, b=2) : Ref(1)) +end + end # module