diff --git a/benchmarks/grouping_performance.jl b/benchmarks/grouping_performance.jl
new file mode 100644
index 0000000000..671e222fc1
--- /dev/null
+++ b/benchmarks/grouping_performance.jl
@@ -0,0 +1,52 @@
+using DataFrames
+using CategoricalArrays
+using PooledArrays
+using BenchmarkTools
+using Random
+
+Random.seed!(1)
+
+grouping_benchmarks = BenchmarkGroup()
+
+# `refpool`/`refarray` optimized grouping method
+refpool_benchmarks = grouping_benchmarks["refpool"] = BenchmarkGroup()
+
+for k in (10, 10_000), n in (100, 100_000, 10_000_000)
+    for x in (PooledArray(rand(1:k, n)),
+              CategoricalArray(rand(1:k, n)),
+              PooledArray(rand([missing; 1:k], n)),
+              CategoricalArray(rand([missing; 1:k], n)))
+        df = DataFrame(x=x)
+
+        refpool_benchmarks[k, n, nameof(typeof(x)), "skipmissing=false"] =
+            @benchmarkable groupby($df, :x)
+
+        # Skipping missing values
+        refpool_benchmarks[k, n, nameof(typeof(x)), "skipmissing=true"] =
+            @benchmarkable groupby($df, :x, skipmissing=true)
+
+        # Empty group which requires adjusting group indices
+        replace!(df.x, 5 => 6)
+        refpool_benchmarks[k, n, nameof(typeof(x)), "empty group"] =
+            @benchmarkable groupby($df, :x)
+    end
+end
+
+# If a cache of tuned parameters already exists, use it; otherwise, tune and cache
+# the benchmark parameters. Reusing cached parameters is faster and more reliable
+# than re-tuning `grouping_benchmarks` every time the file is included.
+paramspath = joinpath(dirname(@__FILE__), "params.json")
+
+if isfile(paramspath)
+    loadparams!(grouping_benchmarks, BenchmarkTools.load(paramspath)[1], :evals);
+else
+    tune!(grouping_benchmarks)
+    BenchmarkTools.save(paramspath, params(grouping_benchmarks));
+end
+
+grouping_results = run(grouping_benchmarks, verbose=true)
+# using Serialization
+# serialize("grouping_results.jls", grouping_results)
+# leaves(judge(median(grouping_results1), median(grouping_results2)))
+# leaves(regressions(judge(median(grouping_results1), median(grouping_results2))))
+# leaves(improvements(judge(median(grouping_results1), median(grouping_results2))))
\ No newline at end of file
diff --git a/docs/src/lib/internals.md b/docs/src/lib/internals.md
index 852037c545..b67c11f365 100644
--- a/docs/src/lib/internals.md
+++ b/docs/src/lib/internals.md
@@ -15,4 +15,5 @@ gennames
 getmaxwidths
 ourshow
 ourstrwidth
+tforeach
 ```
diff --git a/src/dataframerow/utils.jl b/src/dataframerow/utils.jl
index 4d16e36e72..fb13357b77 100644
--- a/src/dataframerow/utils.jl
+++ b/src/dataframerow/utils.jl
@@ -338,46 +338,53 @@ function row_group_slots(cols::NTuple{N, AbstractVector},
             end
             refmap
         end
-        @inbounds for i in eachindex(groups)
-            local refs_i
-            let i=i # Workaround for julia#15276
-                refs_i = map(c -> c[i], refarrays)
-            end
-            vals = map((m, r, s, fi) -> m[r-fi+1] * s, refmaps, refs_i, strides, firstinds)
-            j = sum(vals) + 1
-            # x < 0 happens with -1 in refmap, which corresponds to missing
-            if skipmissing && any(x -> x < 0, vals)
-                j = 0
-            else
-                seen[j] = true
+        tforeach(eachindex(groups), basesize=1_000_000) do i
+            @inbounds begin
+                local refs_i
+                let i=i # Workaround for julia#15276
+                    refs_i = map(c -> c[i], refarrays)
+                end
+                vals = map((m, r, s, fi) -> m[r-fi+1] * s, refmaps, refs_i, strides, firstinds)
+                j = sum(vals) + 1
+                # x < 0 happens with -1 in refmap, which corresponds to missing
+                if skipmissing && any(x -> x < 0, vals)
+                    j = 0
+                else
+                    seen[j] = true
+                end
+                groups[i] = j
             end
-            groups[i] = j
         end
     else
-        @inbounds for i in eachindex(groups)
-            local refs_i
-            let i=i # Workaround for julia#15276
-                refs_i = map(refarrays, missinginds) do ref, missingind
-                    r = Int(ref[i])
-                    if skipmissing
-                        return r == missingind ? -1 : (r > missingind ? r-1 : r)
-                    else
-                        return r
+        tforeach(eachindex(groups), basesize=1_000_000) do i
+            @inbounds begin
+                local refs_i
+                let i=i # Workaround for julia#15276
+                    refs_i = map(refarrays, missinginds) do ref, missingind
+                        r = Int(ref[i])
+                        if skipmissing
+                            return r == missingind ? -1 : (r > missingind ? r-1 : r)
+                        else
+                            return r
+                        end
                     end
                 end
+                vals = map((r, s, fi) -> (r-fi) * s, refs_i, strides, firstinds)
+                j = sum(vals) + 1
+                # x < 0 happens with -1, which corresponds to missing
+                if skipmissing && any(x -> x < 0, vals)
+                    j = 0
+                else
+                    seen[j] = true
+                end
+                groups[i] = j
             end
-            vals = map((r, s, fi) -> (r-fi) * s, refs_i, strides, firstinds)
-            j = sum(vals) + 1
-            # x < 0 happens with -1, which corresponds to missing
-            if skipmissing && any(x -> x < 0, vals)
-                j = 0
-            else
-                seen[j] = true
-            end
-            groups[i] = j
         end
     end
-    if !all(seen) # Compress group indices to remove unused ones
+    # If some groups are unused, compress group indices to drop them
+    # sum(seen) is faster than all(seen) when not short-circuiting,
+    # and short-circuiting would only happen in the slower case anyway
+    if sum(seen) < length(seen)
        oldngroups = ngroups
        remap = zeros(Int, ngroups)
        ngroups = 0
diff --git a/src/other/utils.jl b/src/other/utils.jl
index e13627748c..05c1be7028 100644
--- a/src/other/utils.jl
+++ b/src/other/utils.jl
@@ -83,3 +83,48 @@ else
 end
 
 funname(c::ComposedFunction) = Symbol(funname(c.outer), :_, funname(c.inner))
+
+# Compute chunks of indices, each with at least `basesize` entries
+# This method ensures balanced sizes by avoiding a small last chunk
+function split_indices(len::Integer, basesize::Integer)
+    len′ = Int64(len) # Avoid overflow on 32-bit machines
+    np = max(1, div(len, basesize))
+    return (Int(1 + ((i - 1) * len′) ÷ np):Int((i * len′) ÷ np) for i in 1:np)
+end
+
+"""
+    tforeach(f, x::AbstractArray; basesize::Integer)
+
+Apply function `f` to each entry in `x` in parallel, spawning
+one separate task for each block of at least `basesize` entries.
+
+A number of tasks higher than `Threads.nthreads()` may be spawned,
+since this can allow for more efficient load balancing when
+some threads are busy (nested parallelism).
+"""
+function tforeach(f, x::AbstractArray; basesize::Integer)
+    @assert firstindex(x) == 1
+
+    @static if VERSION >= v"1.4"
+        nt = Threads.nthreads()
+        len = length(x)
+        if nt > 1 && len > basesize
+            @sync for p in split_indices(len, basesize)
+                Threads.@spawn begin
+                    for i in p
+                        f(@inbounds x[i])
+                    end
+                end
+            end
+        else
+            for i in eachindex(x)
+                f(@inbounds x[i])
+            end
+        end
+    else
+        for i in eachindex(x)
+            f(@inbounds x[i])
+        end
+    end
+    return
+end
diff --git a/test/grouping.jl b/test/grouping.jl
index 91db3e5abb..1faf95ccd3 100644
--- a/test/grouping.jl
+++ b/test/grouping.jl
@@ -3831,4 +3831,22 @@ end
                   ((x, y, z) -> x[1] <= 5 ? unwrap(y[1]) : unwrap(z[1])) => :res)
 end
 
+@testset "groupby multithreading" begin
+    for x in (PooledArray(rand(1:10, 1_100_000)),
+              PooledArray(rand([1:9; missing], 1_100_000))),
+        y in (PooledArray(rand(["a", "b", "c", "d"], 1_100_000)),
+              PooledArray(rand(["a"; "b"; "c"; missing], 1_100_000)))
+        df = DataFrame(x=x, y=y)
+
+        # Checks are done by groupby_checked
+        @test length(groupby_checked(df, :x)) == 10
+        @test length(groupby_checked(df, :x, skipmissing=true)) ==
+            length(unique(skipmissing(x)))
+
+        @test length(groupby_checked(df, [:x, :y])) == 40
+        @test length(groupby_checked(df, [:x, :y], skipmissing=true)) ==
+            length(unique(skipmissing(x))) * length(unique(skipmissing(y)))
+    end
+end
+
 end # module
diff --git a/test/utils.jl b/test/utils.jl
index 0db4577030..f5efeaf6d9 100644
--- a/test/utils.jl
+++ b/test/utils.jl
@@ -101,4 +101,28 @@ end
     @test fetch(t) === true
 end
 
+@testset "split_indices" begin
+    for len in 0:12
+        basesize = 10
+        x = DataFrames.split_indices(len, basesize)
+
+        @test length(x) == max(1, div(len, basesize))
+        @test reduce(vcat, x) === 1:len
+        vmin, vmax = extrema(length(v) for v in x)
+        @test vmin + 1 == vmax || vmin == vmax
+        @test len < basesize || vmin >= basesize
+    end
+
+    # Check overflow on 32-bit
+    len = typemax(Int32)
+    basesize = 100_000_000
+    x = collect(DataFrames.split_indices(len, basesize))
+    @test length(x) == div(len, basesize)
+    @test x[1][1] === 1
+    @test x[end][end] === Int(len)
+    vmin, vmax = extrema(length(v) for v in x)
+    @test vmin + 1 == vmax || vmin == vmax
+    @test len < basesize || vmin >= basesize
+end
+
 end # module
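
A minimal usage sketch for the new internal `split_indices` and `tforeach` helpers added in src/other/utils.jl, assuming the patch is applied and Julia was started with several threads (e.g. `julia -t 4`); the lengths and `basesize` values below are illustrative only, not taken from the patch.

    using DataFrames

    # split_indices chunks 1:len into balanced blocks of at least `basesize` entries
    collect(DataFrames.split_indices(10, 4))   # [1:5, 6:10] -- two chunks, no tiny remainder
    collect(DataFrames.split_indices(3, 4))    # [1:3]       -- always at least one chunk

    # tforeach applies `f` to every element, spawning one task per chunk
    # when the input is long enough and more than one thread is available
    out = zeros(Int, 1_000_000)
    DataFrames.tforeach(1:1_000_000, basesize=250_000) do i
        out[i] = i^2   # each task writes a disjoint block, so there is no data race
    end
    out[10] == 100     # true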