Skip to content

Commit

Permalink
Use refpool optimized method for integer grouping (#2610)
Browse files Browse the repository at this point in the history
  • Loading branch information
nalimilan authored Jan 31, 2021
1 parent 11b6aff commit f05fc73
Show file tree
Hide file tree
Showing 3 changed files with 293 additions and 91 deletions.
124 changes: 106 additions & 18 deletions src/dataframerow/utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ function hashrows_col!(h::Vector{UInt},
# which is always zero
# also when the number of values in the pool is more than half the length
# of the vector avoid using this path. 50% is roughly based on benchmarks
if firstcol && 2 * length(rp) < length(v)
if firstcol && Int64(2) * length(rp) < length(v)
hashes = Vector{UInt}(undef, length(rp))
@inbounds for (i, v) in zip(eachindex(hashes), rp)
hashes[i] = hash(v)
Expand Down Expand Up @@ -94,6 +94,85 @@ isequal_row(cols1::Tuple{Vararg{AbstractVector}}, r1::Int,
isequal(cols1[1][r1], cols2[1][r2]) &&
isequal_row(Base.tail(cols1), r1, Base.tail(cols2), r2)

# IntegerRefarray and IntegerRefPool are two complementary view types that allow
# wrapping arrays with Union{Real, Missing} eltype to satisfy the DataAPI.refpool
# and DataAPI.refarray API when calling row_group_slots.
# IntegerRefarray converts values to Int and replaces missing with an integer
# (set by the caller to the maximum value + 1)
# IntegerRefPool subtracts the minimum value - 1 and replaces back the maximum
# value + 1 to missing. This ensures all values are in 1:length(refpool), while
# row_group_slots knows the number of (potential) groups via length(refpool)
# and is able to skip missing values when skipmissing=true

struct IntegerRefarray{T<:AbstractArray} <: AbstractVector{Int}
x::T
offset::Int
replacement::Int
end

Base.size(x::IntegerRefarray) = size(x.x)
Base.axes(x::IntegerRefarray) = axes(x.x)
Base.IndexStyle(::Type{<:IntegerRefarray{T}}) where {T} = Base.IndexStyle(T)
@inline function Base.getindex(x::IntegerRefarray, i)
@boundscheck checkbounds(x.x, i)
@inbounds v = x.x[i]
if eltype(x.x) >: Missing && v === missing
return x.replacement
else
# Overflow is guaranteed not to happen by checks before calling the constructor
return Int(v - x.offset)
end
end

struct IntegerRefpool{T<:Union{Int, Missing}} <: AbstractVector{T}
max::Int
function IntegerRefpool{T}(max::Integer) where T<:Union{Int, Missing}
@assert max < typemax(Int) # to store missing values as max + 1
new{T}(max)
end
end

Base.size(x::IntegerRefpool{T}) where {T} = (x.max + (T >: Missing),)
Base.axes(x::IntegerRefpool{T}) where {T} = (Base.OneTo(x.max + (T >: Missing)),)
Base.IndexStyle(::Type{<:IntegerRefpool}) = Base.IndexLinear()
@inline function Base.getindex(x::IntegerRefpool{T}, i::Real) where T
@boundscheck checkbounds(x, i)
if T >: Missing && i == x.max + 1
return missing
else
return Int(i)
end
end
Base.allunique(::IntegerRefpool) = true
Base.issorted(::IntegerRefpool) = true

function refpool_and_array(x::AbstractArray)
refpool = DataAPI.refpool(x)
refarray = DataAPI.refarray(x)

if refpool !== nothing
return refpool, refarray
elseif x isa AbstractArray{<:Union{Real, Missing}} &&
all(v -> ismissing(v) | isinteger(v), x) &&
!isempty(skipmissing(x))
minval, maxval = extrema(skipmissing(x))
ngroups = big(maxval) - big(minval) + 1
# Threshold chosen with the same rationale as the row_group_slots refpool method:
# refpool approach is faster but we should not allocate too much memory either
# We also have to avoid overflow, including with ngroups + 1 for missing values
# (note that it would be possible to allow minval and maxval to be outside of the
# range supported by Int by adding a type parameter for minval to IntegerRefarray)
if typemin(Int) < minval <= maxval < typemax(Int) &&
ngroups + 1 <= Int64(2) * length(x) <= typemax(Int)
T = eltype(x) >: Missing ? Union{Int, Missing} : Int
refpool′ = IntegerRefpool{T}(Int(ngroups))
refarray′ = IntegerRefarray(x, Int(minval) - 1, Int(ngroups) + 1)
return refpool′, refarray′
end
end
return nothing, nothing
end

# Helper function for RowGroupDict.
# Returns a tuple:
# 1) the highest group index in the `groups` vector
Expand All @@ -103,16 +182,21 @@ isequal_row(cols1::Tuple{Vararg{AbstractVector}}, r1::Int,
# 4) whether groups are already sorted
# Optional `groups` vector is set to the group indices of each row (starting at 1)
# With skipmissing=true, rows with missing values are attributed index 0.
row_group_slots(cols::Tuple{Vararg{AbstractVector}},
hash::Val = Val(true),
groups::Union{Vector{Int}, Nothing} = nothing,
skipmissing::Bool = false,
sort::Bool = false)::Tuple{Int, Vector{UInt}, Vector{Int}, Bool} =
row_group_slots(cols, DataAPI.refpool.(cols), hash, groups, skipmissing, sort)
function row_group_slots(cols::Tuple{Vararg{AbstractVector}},
hash::Val = Val(true),
groups::Union{Vector{Int}, Nothing} = nothing,
skipmissing::Bool = false,
sort::Bool = false)::Tuple{Int, Vector{UInt}, Vector{Int}, Bool}
rpa = refpool_and_array.(cols)
refpools = first.(rpa)
refarrays = last.(rpa)
row_group_slots(cols, refpools, refarrays, hash, groups, skipmissing, sort)
end

# Generic fallback method based on open adressing hash table
function row_group_slots(cols::Tuple{Vararg{AbstractVector}},
refpools::Any,
refpools::Any, # Ignored
refarrays::Any, # Ignored
hash::Val = Val(true),
groups::Union{Vector{Int}, Nothing} = nothing,
skipmissing::Bool = false,
Expand Down Expand Up @@ -163,8 +247,12 @@ function row_group_slots(cols::Tuple{Vararg{AbstractVector}},
end

# Optimized method for arrays for which DataAPI.refpool is defined and returns an AbstractVector
function row_group_slots(cols::NTuple{N, <:AbstractVector},
refpools::NTuple{N, <:AbstractVector},
function row_group_slots(cols::NTuple{N, AbstractVector},
refpools::NTuple{N, AbstractVector},
refarrays::NTuple{N,
Union{AbstractVector{<:Real},
Missings.EachReplaceMissing{
<:AbstractVector{<:Union{Real, Missing}}}}},
hash::Val{false},
groups::Union{Vector{Int}, Nothing} = nothing,
skipmissing::Bool = false,
Expand All @@ -173,7 +261,6 @@ function row_group_slots(cols::NTuple{N, <:AbstractVector},
# and this method needs to allocate a groups vector anyway
@assert groups !== nothing && all(col -> length(col) == length(groups), cols)

refs = map(DataAPI.refarray, cols)
missinginds = map(refpools) do refpool
eltype(refpool) >: Missing ?
something(findfirst(ismissing, refpool), lastindex(refpool)+1) : lastindex(refpool)+1
Expand Down Expand Up @@ -201,16 +288,17 @@ function row_group_slots(cols::NTuple{N, <:AbstractVector},
# but it needs to remain reasonable compared with the size of the data frame.
anydups = !all(allunique, refpools)
if prod(big.(ngroupstup)) > typemax(Int) ||
ngroups > 2 * length(groups) ||
ngroups > Int64(2) * length(groups) ||
anydups
# In the simplest case, we can work directly with the reference codes
newcols = (skipmissing && any(refpool -> eltype(refpool) >: Missing, refpools)) ||
!(refarrays isa NTuple{<:Any, AbstractVector}) ||
sort ||
anydups ? cols : refs
anydups ? cols : refarrays
return invoke(row_group_slots,
Tuple{Tuple{Vararg{AbstractVector}}, Any, Val,
Tuple{Tuple{Vararg{AbstractVector}}, Any, Any, Val,
Union{Vector{Int}, Nothing}, Bool, Bool},
newcols, refpools, hash, groups, skipmissing, sort)
newcols, refpools, refarrays, hash, groups, skipmissing, sort)
end

seen = fill(false, ngroups)
Expand Down Expand Up @@ -253,7 +341,7 @@ function row_group_slots(cols::NTuple{N, <:AbstractVector},
@inbounds for i in eachindex(groups)
local refs_i
let i=i # Workaround for julia#15276
refs_i = map(c -> c[i], refs)
refs_i = map(c -> c[i], refarrays)
end
vals = map((m, r, s, fi) -> m[r-fi+1] * s, refmaps, refs_i, strides, firstinds)
j = sum(vals) + 1
Expand All @@ -269,7 +357,7 @@ function row_group_slots(cols::NTuple{N, <:AbstractVector},
@inbounds for i in eachindex(groups)
local refs_i
let i=i # Workaround for julia#15276
refs_i = map(refs, missinginds) do ref, missingind
refs_i = map(refarrays, missinginds) do ref, missingind
r = Int(ref[i])
if skipmissing
return r == missingind ? -1 : (r > missingind ? r-1 : r)
Expand Down Expand Up @@ -322,7 +410,7 @@ function compute_indices(groups::AbstractVector{<:Integer}, ngroups::Integer)

# group start positions in a sorted table
starts = Vector{Int}(undef, ngroups+1)
if length(starts) > 1
if length(starts) > 0
starts[1] = 1
@inbounds for i in 1:ngroups
starts[i+1] = starts[i] + stops[i]
Expand Down
9 changes: 4 additions & 5 deletions test/data.jl
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,14 @@ const ≅ = isequal
d3 = randn(N)
d4 = randn(N)
df7 = DataFrame([d1, d2, d3], [:d1, :d2, :d3])
ref_d1 = unique(d1)

#test_group("groupby")
gd = groupby(df7, :d1)
@test length(gd) == 2
@test gd[1][:, :d2] d2[d1 .== ref_d1[1]]
@test gd[2][:, :d2] d2[d1 .== ref_d1[2]]
@test gd[1][:, :d3] == d3[d1 .== ref_d1[1]]
@test gd[2][:, :d3] == d3[d1 .== ref_d1[2]]
@test gd[1][:, :d2] d2[d1 .== 1]
@test gd[2][:, :d2] d2[d1 .== 2]
@test gd[1][:, :d3] == d3[d1 .== 1]
@test gd[2][:, :d3] == d3[d1 .== 2]

g1 = groupby(df7, [:d1, :d2])
g2 = groupby(df7, [:d2, :d1])
Expand Down
Loading

0 comments on commit f05fc73

Please sign in to comment.