Skip to content

Commit

Permalink
Despecialization part 2 (#2709)
Browse files Browse the repository at this point in the history
  • Loading branch information
bkamins authored Apr 13, 2021
1 parent 9a2c4d2 commit 3b45c5b
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 71 deletions.
73 changes: 36 additions & 37 deletions src/groupeddataframe/callprocessing.jl
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# Tag types used as a dispatch argument by `wrap_table` and `wrap_row`
# in place of a `Val{Bool}` type parameter.
abstract type FirstColCount end

# Tag corresponding to `firstmulticol == true`.
struct FirstMultiCol <: FirstColCount end
# Tag corresponding to `firstmulticol == false`.
struct FirstSingleCol <: FirstColCount end

# Translate the `firstmulticol` flag into the matching singleton tag:
# `true` gives `FirstMultiCol()`, `false` gives `FirstSingleCol()`.
function firstcoltype(firstmulticol::Bool)
    if firstmulticol
        return FirstMultiCol()
    end
    return FirstSingleCol()
end

# Wrapping automatically adds column names when the value returned
# by the user-provided function lacks them
# Data frames and data frame rows are returned unchanged.
function wrap(x::Union{AbstractDataFrame, DataFrameRow})
    return x
end
Expand All @@ -22,53 +30,44 @@ const ERROR_ROW_COUNT = "return value must not change its kind " *
const ERROR_COL_COUNT = "function must return only single-column values, " *
"or only multiple-column values"

# Fallback methods: a value that is not matched by any more specific
# `wrap_table` method is rejected with ERROR_ROW_COUNT.
# The `Val`-based and the `FirstSingleCol`-based signatures are kept as two
# separate methods with the same body.
wrap_table(x::Any, ::Val) =
    throw(ArgumentError(ERROR_ROW_COUNT))
wrap_table(x::Any, ::FirstSingleCol) =
    throw(ArgumentError(ERROR_ROW_COUNT))
# Table-like value (named tuple of vectors, data frame or matrix):
# accepted only when `firstmulticol` is `true`, otherwise ERROR_COL_COUNT
# is thrown. Accepted values are passed through `wrap`.
function wrap_table(x::Union{NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}},
                             AbstractDataFrame, AbstractMatrix},
                    ::Val{firstmulticol}) where firstmulticol
    firstmulticol || throw(ArgumentError(ERROR_COL_COUNT))
    return wrap(x)
end

# Plain vector: accepted only when `firstmulticol` is `false`, otherwise
# ERROR_COL_COUNT is thrown. Accepted values are passed through `wrap`.
function wrap_table(x::AbstractVector, ::Val{firstmulticol}) where firstmulticol
    firstmulticol && throw(ArgumentError(ERROR_COL_COUNT))
    return wrap(x)
end

# Generic single-row value.
# A `DataFrameRow` is accepted only when `firstmulticol` is `true`; any other
# value is accepted only when `firstmulticol` is `false`. A mismatch throws
# ERROR_COL_COUNT, otherwise the value is passed through `wrap`.
function wrap_row(x::Any, ::Val{firstmulticol}) where firstmulticol
    # NamedTuple is not possible in this branch
    # exclusive-or: throw exactly when "is a DataFrameRow" and `firstmulticol`
    # disagree (same truth table as the FirstSingleCol/FirstMultiCol methods)
    if xor(x isa DataFrameRow, firstmulticol)
        throw(ArgumentError(ERROR_COL_COUNT))
    end
    return wrap(x)
end

# Fallback for the multi-column tag: a value not matched by any more specific
# `wrap_table` method is rejected with ERROR_ROW_COUNT.
wrap_table(x::Any, ::FirstMultiCol) =
    throw(ArgumentError(ERROR_ROW_COUNT))
# Table-like value combined with the single-column tag is rejected.
wrap_table(x::Union{NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}},
                    AbstractDataFrame, AbstractMatrix}, ::FirstSingleCol) =
    throw(ArgumentError(ERROR_COL_COUNT))

# 0-dimensional array or `Ref`: unwrapped with `x[]` and returned as
# a one-field named tuple with field `x1`. Throws ERROR_COL_COUNT when
# `firstmulticol` is `true`.
function wrap_row(x::Union{AbstractArray{<:Any, 0}, Ref},
                  ::Val{firstmulticol}) where firstmulticol
    if firstmulticol
        throw(ArgumentError(ERROR_COL_COUNT))
    end
    return (x1 = x[],)
end

# Table-like value with the multi-column tag: passed through `wrap`.
function wrap_table(x::Union{NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}},
                             AbstractDataFrame, AbstractMatrix},
                    ::FirstMultiCol)
    return wrap(x)
end

# Vector with the single-column tag: passed through `wrap`.
function wrap_table(x::AbstractVector, ::FirstSingleCol)
    return wrap(x)
end

# Vector with the multi-column tag: rejected.
function wrap_table(x::AbstractVector, ::FirstMultiCol)
    throw(ArgumentError(ERROR_COL_COUNT))
end

# `DataFrameRow` with the single-column tag: rejected.
function wrap_row(x::DataFrameRow, ::FirstSingleCol)
    throw(ArgumentError(ERROR_COL_COUNT))
end

# `DataFrameRow` with the multi-column tag: passed through `wrap`.
function wrap_row(x::DataFrameRow, ::FirstMultiCol)
    return wrap(x)
end

# Any other value with the single-column tag: passed through `wrap`.
function wrap_row(x::Any, ::FirstSingleCol)
    return wrap(x)
end

# Any other value with the multi-column tag: rejected.
# NamedTuple is not possible in this branch
function wrap_row(x::Any, ::FirstMultiCol)
    throw(ArgumentError(ERROR_COL_COUNT))
end

# 0-dimensional array or `Ref` with the multi-column tag: rejected.
function wrap_row(x::Union{AbstractArray{<:Any, 0}, Ref}, ::FirstMultiCol)
    throw(ArgumentError(ERROR_COL_COUNT))
end

# 0-dimensional array or `Ref` with the single-column tag: the wrapped value
# is extracted with `x[]` and returned as a one-field named tuple `(x1 = ...,)`.
function wrap_row(x::Union{AbstractArray{<:Any, 0}, Ref}, ::FirstSingleCol)
    unwrapped = x[]
    return (x1 = unwrapped,)
end
# note that also NamedTuple() is correctly captured by this definition
# as it is more specific than the one below
# Vectors, matrices, data frames and named tuples of vectors are not valid
# single-row values: every variant below throws ERROR_ROW_COUNT regardless of
# the second argument. One method is defined per tag type (`Val`,
# `FirstSingleCol`, `FirstMultiCol`).
wrap_row(::Union{AbstractVecOrMat, AbstractDataFrame,
                 NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}}, ::Val) =
    throw(ArgumentError(ERROR_ROW_COUNT))
wrap_row(::Union{AbstractVecOrMat, AbstractDataFrame,
                 NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}}, ::FirstSingleCol) =
    throw(ArgumentError(ERROR_ROW_COUNT))
wrap_row(::Union{AbstractVecOrMat, AbstractDataFrame,
                 NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}}, ::FirstMultiCol) =
    throw(ArgumentError(ERROR_ROW_COUNT))

# Named tuple returned as a single row, `Val`-tagged variant.
# Throws if any field is a vector (mixing scalars and vectors is not allowed)
# and throws ERROR_COL_COUNT when `firstmulticol` is `false`; otherwise
# returns `x` unchanged.
function wrap_row(x::NamedTuple, ::Val{firstmulticol}) where firstmulticol
    if any(v -> v isa AbstractVector, x)
        throw(ArgumentError("mixing single values and vectors in a named tuple is not allowed"))
    end
    if !firstmulticol
        throw(ArgumentError(ERROR_COL_COUNT))
    end
    return x
end

# Named tuple with the single-column tag: rejected.
wrap_row(x::NamedTuple, ::FirstSingleCol) = throw(ArgumentError(ERROR_COL_COUNT))

# Named tuple with the multi-column tag: returned unchanged unless one of its
# fields is a vector. No `firstmulticol` check is needed here because the
# single-column case is handled by its own method above.
function wrap_row(x::NamedTuple, ::FirstMultiCol)
    if any(v -> v isa AbstractVector, x)
        throw(ArgumentError("mixing single values and vectors in a named tuple is not allowed"))
    end
    return x
end

Expand Down
53 changes: 34 additions & 19 deletions src/groupeddataframe/complextransforms.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ function _combine_multicol((firstres,)::Ref{Any}, wfun::Ref{Any}, gd::GroupedDat
idx_agg = NOTHING_IDX_AGG
end
return _combine_with_first(Ref{Any}(wrap(firstres)), wfun, gd, wincols,
Val(firstmulticol), idx_agg)
firstmulticol, idx_agg)
end

function _combine_with_first((first,)::Ref{Any},
(f,)::Ref{Any}, gd::GroupedDataFrame,
(incols,)::Ref{Any},
firstmulticol::Val, idx_agg::Vector{Int})
firstmulticol::Bool, idx_agg::Vector{Int})
@assert first isa Union{NamedTuple, DataFrameRow, AbstractDataFrame}
@assert f isa Base.Callable
@assert incols isa Union{Nothing, AbstractVector, Tuple, NamedTuple}
@assert first isa Union{NamedTuple, DataFrameRow, AbstractDataFrame}
Expand All @@ -38,9 +39,9 @@ function _combine_with_first((first,)::Ref{Any},
elseif first isa DataFrameRow
n = length(gd)
eltys = [eltype(parent(first)[!, i]) for i in parentcols(index(first))]
elseif firstmulticol == Val(false) && first[1] isa Union{AbstractArray{<:Any, 0}, Ref}
elseif !firstmulticol && first[1] isa Union{AbstractArray{<:Any, 0}, Ref}
extrude = true
first = wrap_row(first[1], firstmulticol)
first = wrap_row(first[1], firstcoltype(firstmulticol))
n = length(gd)
eltys = (typeof(first[1]),)
else # other NamedTuple giving a single row
Expand All @@ -60,10 +61,14 @@ function _combine_with_first((first,)::Ref{Any},
NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}}
outcols, finalcolnames = _combine_tables_with_first!(first, initialcols, idx, 1, 1,
f, gd, incols, targetcolnames,
firstmulticol)
firstcoltype(firstmulticol))
else
outcols, finalcolnames = _combine_rows_with_first!(first, initialcols,
f, gd, incols, targetcolnames,
outcols, finalcolnames = _combine_rows_with_first!(Ref{Any}(first),
Ref{Any}(initialcols),
Ref{Any}(f),
gd,
Ref{Any}(incols),
Ref{Any}(targetcolnames),
firstmulticol)
end
return idx, outcols, collect(Symbol, finalcolnames)
Expand Down Expand Up @@ -114,12 +119,13 @@ function _combine_rows_with_first_task!(tid::Integer,
incols::Union{Nothing, AbstractVector,
Tuple, NamedTuple},
colnames::NTuple{<:Any, Symbol},
firstmulticol::Val)
firstmulticol::FirstColCount)
j = nothing
gdidx = gd.idx
local newoutcols
for i in rownext:rowend
row = wrap_row(do_call(f, gdidx, starts, ends, gd, incols, i), firstmulticol)
row = wrap_row(do_call(f, gdidx, starts, ends, gd, incols, i),
firstmulticol)
j = fill_row!(row, outcols, i, 1, colnames)
if j !== nothing # Need to widen column
# If another thread is already widening outcols, wait until it's done
Expand Down Expand Up @@ -157,7 +163,8 @@ function _combine_rows_with_first_task!(tid::Integer,
return _combine_rows_with_first_task!(tid, rowstart, rowend, i+1, newoutcols, outcolsref,
type_widened, widen_type_lock,
f, gd, starts, ends,
incols, colnames, firstmulticol)
incols, colnames,
firstmulticol)
end
# If other thread widened columns, copy already processed data to new vectors
# This doesn't have to happen immediately (hence type_widened isn't atomic),
Expand All @@ -177,7 +184,8 @@ function _combine_rows_with_first_task!(tid::Integer,
return _combine_rows_with_first_task!(tid, rowstart, rowend, i+1, newoutcols, outcolsref,
type_widened, widen_type_lock,
f, gd, starts, ends,
incols, colnames, firstmulticol)
incols, colnames,
firstmulticol)
end
end
return outcols
Expand Down Expand Up @@ -217,12 +225,19 @@ isthreadsafe(outcols::NTuple{<:Any, AbstractVector}, incols::AbstractVector) =
isthreadsafe(outcols, (incols,))
isthreadsafe(outcols::NTuple{<:Any, AbstractVector}, incols::Nothing) = true

function _combine_rows_with_first!(firstrow::Union{NamedTuple, DataFrameRow},
outcols::NTuple{N, AbstractVector},
f::Base.Callable, gd::GroupedDataFrame,
incols::Union{Nothing, AbstractVector, Tuple, NamedTuple},
colnames::NTuple{N, Symbol},
firstmulticol::Val) where N
function _combine_rows_with_first!((firstrow,)::Ref{Any},
(outcols,)::Ref{Any},
(f,)::Ref{Any},
gd::GroupedDataFrame,
(incols,)::Ref{Any},
(colnames,)::Ref{Any},
firstmulticol::Bool)
@assert firstrow isa Union{NamedTuple, DataFrameRow}
@assert outcols isa NTuple{N, AbstractVector} where N
@assert f isa Base.Callable
@assert incols isa Union{Nothing, AbstractVector, Tuple, NamedTuple}
@assert colnames isa NTuple{N, Symbol} where N
@assert length(colnames) == length(outcols)
len = length(gd)
gdidx = gd.idx
starts = gd.starts
Expand Down Expand Up @@ -255,7 +270,7 @@ function _combine_rows_with_first!(firstrow::Union{NamedTuple, DataFrameRow},
outcols, outcolsref,
type_widened, widen_type_lock,
f, gd, starts, ends, incols, colnames,
firstmulticol)
firstcoltype(firstmulticol))
end

# Workaround JuliaLang/julia#38931:
Expand Down Expand Up @@ -336,7 +351,7 @@ function _combine_tables_with_first!(first::Union{AbstractDataFrame,
f::Base.Callable, gd::GroupedDataFrame,
incols::Union{Nothing, AbstractVector, Tuple, NamedTuple},
colnames::NTuple{N, Symbol},
firstmulticol::Val) where N
firstmulticol::FirstColCount) where N
len = length(gd)
gdidx = gd.idx
starts = gd.starts
Expand Down
37 changes: 24 additions & 13 deletions src/groupeddataframe/fastaggregates.jl
Original file line number Diff line number Diff line change
Expand Up @@ -157,19 +157,9 @@ function copyto_widen!(res::AbstractVector{T}, x::AbstractVector) where T
return res
end

function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Bool,
incol::AbstractVector, gd::GroupedDataFrame)
n = length(gd)
if adjust !== nothing || checkempty
counts = zeros(Int, n)
end
groups = gd.groups
@static if VERSION >= v"1.4"
batchsize = Threads.nthreads() > 1 ? 100_000 : typemax(Int)
batches = Iterators.partition(eachindex(incol, groups), batchsize)
else
batches = (eachindex(incol, groups),)
end
function groupreduce!_helper(res::AbstractVector, f, op, condf, adjust, checkempty::Bool,
incol::AbstractVector, groups::Vector{Int}, counts::Vector{Int},
batches)
for batch in batches
# Allow other tasks to do garbage collection while this one runs
@static if VERSION >= v"1.4"
Expand All @@ -186,12 +176,33 @@ function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Boo
else
res[gix] = op(res[gix], f(x, gix))
end
# this check is optimized out by constant propagation
if adjust !== nothing || checkempty
counts[gix] += 1
end
end
end
end
end

function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Bool,
incol::AbstractVector, gd::GroupedDataFrame)
n = length(gd)
if adjust !== nothing || checkempty
counts = zeros(Int, n)
else
counts = Int[]
end
groups = gd.groups
@static if VERSION >= v"1.4"
batchsize = Threads.nthreads() > 1 ? 100_000 : typemax(Int)
batches = Iterators.partition(eachindex(incol, groups), batchsize)
else
batches = (eachindex(incol, groups),)
end

groupreduce!_helper(res, f, op, condf, adjust, checkempty,
incol, groups, counts, batches)
# handle the case of an uninitialized reduction
if eltype(res) === Any
if op === Base.add_sum
Expand Down
4 changes: 2 additions & 2 deletions src/groupeddataframe/splitapplycombine.jl
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ function _combine_process_pair_symbol(optional_i::Bool,
# idx. Currently we do it only for single-row return values otherwise we pass
# NOTHING_IDX_AGG to signal that idx has to be computed in _combine_with_first
idx, outcols, _ = _combine_with_first(Ref{Any}(wrap(firstres)), wfun, gd, wincols,
Val(firstmulticol),
firstmulticol,
firstres isa AbstractVector ? NOTHING_IDX_AGG : idx_agg[])
@assert length(outcols) == 1
outcol = outcols[1]
Expand Down Expand Up @@ -393,7 +393,7 @@ function _combine_process_pair_astable(optional_i::Bool,
@assert only(wincols) isa Union{Tuple, NamedTuple}
if firstres isa AbstractVector
idx, outcol_vec, _ = _combine_with_first(Ref{Any}(wrap(firstres)), wfun, gd, wincols,
Val(firstmulticol), NOTHING_IDX_AGG)
firstmulticol, NOTHING_IDX_AGG)
@assert length(outcol_vec) == 1
res = outcol_vec[1]
@assert length(res) > 0
Expand Down
7 changes: 7 additions & 0 deletions test/grouping.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3886,4 +3886,11 @@ end
@test df2.x_prod isa Vector{Int}
end

@testset "extra tests of wrapper corner cases" begin
    # Two groups (a == 1 and a == 2), so the function below can return
    # a different kind of value for each of them.
    df = DataFrame(a=1:2)
    gdf = groupby(df, :a)

    # group a == 1 returns a scalar, the other group returns a DataFrameRow
    scalar_then_row = function (x)
        if x.a[1] == 1
            return 1
        else
            return x[1, :]
        end
    end
    @test_throws ArgumentError combine(gdf, scalar_then_row)

    # group a == 1 returns a NamedTuple, the other group returns a Ref
    namedtuple_then_ref = function (x)
        if x.a[1] == 1
            return (a=1, b=2)
        else
            return Ref(1)
        end
    end
    @test_throws ArgumentError combine(gdf, namedtuple_then_ref)
end

end # module

0 comments on commit 3b45c5b

Please sign in to comment.