Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use multithreading in row_group_slots refarray method #2661

Merged
merged 8 commits into from
Mar 22, 2021
Merged

Conversation

nalimilan
Copy link
Member

main:

julia> using Revise, DataFrames, PooledArrays, BenchmarkTools

julia> x = PooledArray(rand(1:10, 10_000_000));

julia> df = DataFrame(x=x);

julia> @btime groupby(df, :x);
  40.742 ms (38 allocations: 76.30 MiB)

julia> df.x[df.x .== 5] .= 6; # Empty group

julia> @btime groupby(df, :x);
  52.488 ms (39 allocations: 76.30 MiB)

julia> x = PooledArray(rand([missing; 1:10], 10_000_000));

julia> df = DataFrame(x=x);

julia> @btime groupby(df, :x);
  42.955 ms (42 allocations: 76.30 MiB)

julia> @btime groupby(df, :x, skipmissing=true);
  63.951 ms (42 allocations: 76.30 MiB)

PR with two threads:

julia> using Revise, DataFrames, PooledArrays, BenchmarkTools

julia> x = PooledArray(rand(1:10, 10_000_000));

julia> df = DataFrame(x=x);

julia> @btime groupby(df, :x);
  25.053 ms (113 allocations: 76.30 MiB)

julia> df.x[df.x .== 5] .= 6; # Empty group

julia> @btime groupby(df, :x);
  41.185 ms (114 allocations: 76.30 MiB)

julia> x = PooledArray(rand([missing; 1:10], 10_000_000));

julia> df = DataFrame(x=x);

julia> @btime groupby(df, :x);
  26.842 ms (116 allocations: 76.30 MiB)

julia> @btime groupby(df, :x, skipmissing=true);
  36.391 ms (116 allocations: 76.30 MiB)

src/other/utils.jl Outdated Show resolved Hide resolved
Copy link
Member

@bkamins bkamins left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. Looks good.
I assume you have mostly copy-pasted the implementation. Right?

I have left some small comments.

src/other/utils.jl Outdated Show resolved Hide resolved
@nalimilan
Copy link
Member Author

I assume you have mostly copy-pasted the implementation. Right?

tforeach is a very simplified version inspired by ThreadsX.foreach, which was itself inspired by ThreadTools. But it's so simplified that it doesn't have a lot in common:
https://github.com/tkf/ThreadsX.jl/blob/6a69562f0fb5663e5661881698d9f11f7d45dc06/src/foreach.jl#L8

@bkamins bkamins added this to the 1.0 milestone Mar 15, 2021
src/other/utils.jl Outdated Show resolved Hide resolved
# Round size up to ensure all chunks have at least `basesize` entries
# This ensures balanced sizes by avoiding a small last chunk
basesize′ = cld(len, fld(len, basesize))
@sync for p in Iterators.partition(x, basesize′)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be nice to have a good sense for "common column types" when doing operations like this. For example, any <:AbstractArray ends up using view for each partition, but for generic iterators, it actually accumulates the partitions manually, which is less efficient.

All that to say, I think we're fine here, it's just something that I pause on because I don't always have a good mental model of what the most common kinds of column types are and whether there might be hidden bottlenecks in things like ths.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But columns are always AbstractArray?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's why I think this is really a non-issue; but just gave me a pause to consider if we're really efficient here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. BTW, we don't use views are all, we just iterate over relevant indices.

test/utils.jl Outdated

@test length(x) == max(1, div(len, basesize))
@test reduce(vcat, x) === 1:len
vmin, vmax = extrema(length, x)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To fix 1.0:

Suggested change
vmin, vmax = extrema(length, x)
vmin, vmax = extrema(length(v) for v in x)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah - this pesky 1.0 LTS :)

test/utils.jl Outdated
@test length(x) == div(len, basesize)
@test x[1][1] === 1
@test x[end][end] === Int(len)
vmin, vmax = extrema(length, x)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
vmin, vmax = extrema(length, x)
vmin, vmax = extrema(length(v) for v in x)

@bkamins
Copy link
Member

bkamins commented Mar 16, 2021

@nalimilan - could you please in this PR start adding something to /benchmarks/ folder? (maybe also for the other threading PRs)? I fear that if we do not start adding the benchmarks incrementally we will never get them. Thank you!

@nalimilan
Copy link
Member Author

I've added benchmarks. I ended up using BenchmarkTools more systematically, as doing the comparisons manually was really hard. This should be easier to automate using PkgBenchmarks. Here are the results. I'm taking the median rather than the minimum as for some reason results were quite inconsistent with the latter.

PR 10 threads versus main 1 thread:

julia> leaves(judge(median(grouping_results_main), median(grouping_results)))
36-element Vector{Any}:
 (Any["refpool", (10000, 100, "PooledArray", "skipmissing=false")], TrialJudgement(-67.02% => improvement))
 (Any["refpool", (10, 100000, "CategoricalArray", "skipmissing=true")], TrialJudgement(-0.85% => invariant))
 (Any["refpool", (10000, 100000, "CategoricalArray", "skipmissing=false")], TrialJudgement(+7.02% => regression))
 (Any["refpool", (10, 100, "PooledArray", "skipmissing=true")], TrialJudgement(-66.93% => improvement))
 (Any["refpool", (10000, 100, "PooledArray", "empty group")], TrialJudgement(-67.29% => improvement))
 (Any["refpool", (10000, 10000000, "CategoricalArray", "skipmissing=false")], TrialJudgement(+3.44% => invariant))
 (Any["refpool", (10, 100000, "PooledArray", "skipmissing=false")], TrialJudgement(+1.72% => invariant))
 (Any["refpool", (10, 10000000, "CategoricalArray", "skipmissing=true")], TrialJudgement(-28.75% => improvement))
 (Any["refpool", (10000, 100, "CategoricalArray", "empty group")], TrialJudgement(-68.22% => improvement))
 (Any["refpool", (10000, 100000, "CategoricalArray", "skipmissing=true")], TrialJudgement(-1.00% => invariant))
 (Any["refpool", (10, 100000, "PooledArray", "empty group")], TrialJudgement(-4.63% => invariant))
 (Any["refpool", (10000, 100, "PooledArray", "skipmissing=true")], TrialJudgement(-65.23% => improvement))
 (Any["refpool", (10, 100, "CategoricalArray", "skipmissing=false")], TrialJudgement(-68.95% => improvement))
 (Any["refpool", (10, 10000000, "PooledArray", "skipmissing=false")], TrialJudgement(-42.16% => improvement))
 (Any["refpool", (10, 100000, "CategoricalArray", "empty group")], TrialJudgement(+6.91% => regression))
 (Any["refpool", (10000, 100000, "PooledArray", "skipmissing=false")], TrialJudgement(+5.11% => regression))
 (Any["refpool", (10000, 10000000, "CategoricalArray", "skipmissing=true")], TrialJudgement(-29.63% => improvement))
 (Any["refpool", (10, 10000000, "PooledArray", "empty group")], TrialJudgement(-39.39% => improvement))
 (Any["refpool", (10, 100000, "PooledArray", "skipmissing=true")], TrialJudgement(-1.25% => invariant))
 (Any["refpool", (10, 10000000, "CategoricalArray", "empty group")], TrialJudgement(-51.84% => improvement))
 (Any["refpool", (10000, 100000, "PooledArray", "empty group")], TrialJudgement(+1.60% => invariant))
 (Any["refpool", (10000, 100, "CategoricalArray", "skipmissing=false")], TrialJudgement(-71.81% => improvement))
 (Any["refpool", (10, 100, "CategoricalArray", "skipmissing=true")], TrialJudgement(-65.20% => improvement))
 (Any["refpool", (10000, 100000, "CategoricalArray", "empty group")], TrialJudgement(+4.60% => invariant))
 (Any["refpool", (10000, 10000000, "PooledArray", "skipmissing=false")], TrialJudgement(-37.56% => improvement))
 (Any["refpool", (10, 10000000, "PooledArray", "skipmissing=true")], TrialJudgement(-39.75% => improvement))
 (Any["refpool", (10000, 10000000, "PooledArray", "empty group")], TrialJudgement(-33.09% => improvement))
 (Any["refpool", (10000, 100000, "PooledArray", "skipmissing=true")], TrialJudgement(+0.40% => invariant))
 (Any["refpool", (10, 100, "PooledArray", "skipmissing=false")], TrialJudgement(-66.95% => improvement))
 (Any["refpool", (10, 100000, "CategoricalArray", "skipmissing=false")], TrialJudgement(-10.23% => improvement))
 (Any["refpool", (10000, 10000000, "CategoricalArray", "empty group")], TrialJudgement(+5.86% => regression))
 (Any["refpool", (10000, 100, "CategoricalArray", "skipmissing=true")], TrialJudgement(-69.29% => improvement))
 (Any["refpool", (10, 100, "PooledArray", "empty group")], TrialJudgement(-69.21% => improvement))
 (Any["refpool", (10000, 10000000, "PooledArray", "skipmissing=true")], TrialJudgement(-33.82% => improvement))
 (Any["refpool", (10, 10000000, "CategoricalArray", "skipmissing=false")], TrialJudgement(-52.23% => improvement))
 (Any["refpool", (10, 100, "CategoricalArray", "empty group")], TrialJudgement(-72.82% => improvement))

PR 1 thread versus main 1 thread:

julia> leaves(judge(median(grouping_results_main), median(grouping_results)))
36-element Vector{Any}:
 (Any["refpool", (10000, 100, "PooledArray", "skipmissing=false")], TrialJudgement(+0.09% => invariant))
 (Any["refpool", (10, 100000, "CategoricalArray", "skipmissing=true")], TrialJudgement(+0.96% => invariant))
 (Any["refpool", (10000, 100000, "CategoricalArray", "skipmissing=false")], TrialJudgement(+0.97% => invariant))
 (Any["refpool", (10, 100, "PooledArray", "skipmissing=true")], TrialJudgement(-5.04% => improvement))
 (Any["refpool", (10000, 100, "PooledArray", "empty group")], TrialJudgement(-0.09% => invariant))
 (Any["refpool", (10000, 10000000, "CategoricalArray", "skipmissing=false")], TrialJudgement(-2.39% => invariant))
 (Any["refpool", (10, 100000, "PooledArray", "skipmissing=false")], TrialJudgement(+1.42% => invariant))
 (Any["refpool", (10, 10000000, "CategoricalArray", "skipmissing=true")], TrialJudgement(-0.47% => invariant))
 (Any["refpool", (10000, 100, "CategoricalArray", "empty group")], TrialJudgement(-1.51% => invariant))
 (Any["refpool", (10000, 100000, "CategoricalArray", "skipmissing=true")], TrialJudgement(+0.87% => invariant))
 (Any["refpool", (10, 100000, "PooledArray", "empty group")], TrialJudgement(+1.44% => invariant))
 (Any["refpool", (10000, 100, "PooledArray", "skipmissing=true")], TrialJudgement(-5.27% => improvement))
 (Any["refpool", (10, 100, "CategoricalArray", "skipmissing=false")], TrialJudgement(-1.57% => invariant))
 (Any["refpool", (10, 10000000, "PooledArray", "skipmissing=false")], TrialJudgement(+0.16% => invariant))
 (Any["refpool", (10, 100000, "CategoricalArray", "empty group")], TrialJudgement(+1.10% => invariant))
 (Any["refpool", (10000, 100000, "PooledArray", "skipmissing=false")], TrialJudgement(+1.75% => invariant))
 (Any["refpool", (10000, 10000000, "CategoricalArray", "skipmissing=true")], TrialJudgement(-0.28% => invariant))
 (Any["refpool", (10, 10000000, "PooledArray", "empty group")], TrialJudgement(+0.11% => invariant))
 (Any["refpool", (10, 100000, "PooledArray", "skipmissing=true")], TrialJudgement(+1.96% => invariant))
 (Any["refpool", (10, 10000000, "CategoricalArray", "empty group")], TrialJudgement(-0.64% => invariant))
 (Any["refpool", (10000, 100000, "PooledArray", "empty group")], TrialJudgement(+1.52% => invariant))
 (Any["refpool", (10000, 100, "CategoricalArray", "skipmissing=false")], TrialJudgement(-1.30% => invariant))
 (Any["refpool", (10, 100, "CategoricalArray", "skipmissing=true")], TrialJudgement(-7.19% => improvement))
 (Any["refpool", (10000, 100000, "CategoricalArray", "empty group")], TrialJudgement(+0.84% => invariant))
 (Any["refpool", (10000, 10000000, "PooledArray", "skipmissing=false")], TrialJudgement(+1.45% => invariant))
 (Any["refpool", (10, 10000000, "PooledArray", "skipmissing=true")], TrialJudgement(-0.32% => invariant))
 (Any["refpool", (10000, 10000000, "PooledArray", "empty group")], TrialJudgement(+1.32% => invariant))
 (Any["refpool", (10000, 100000, "PooledArray", "skipmissing=true")], TrialJudgement(+1.96% => invariant))
 (Any["refpool", (10, 100, "PooledArray", "skipmissing=false")], TrialJudgement(+0.26% => invariant))
 (Any["refpool", (10, 100000, "CategoricalArray", "skipmissing=false")], TrialJudgement(+0.90% => invariant))
 (Any["refpool", (10000, 10000000, "CategoricalArray", "empty group")], TrialJudgement(-0.04% => invariant))
 (Any["refpool", (10000, 100, "CategoricalArray", "skipmissing=true")], TrialJudgement(-7.07% => improvement))
 (Any["refpool", (10, 100, "PooledArray", "empty group")], TrialJudgement(+0.25% => invariant))
 (Any["refpool", (10000, 10000000, "PooledArray", "skipmissing=true")], TrialJudgement(+3.22% => invariant))
 (Any["refpool", (10, 10000000, "CategoricalArray", "skipmissing=false")], TrialJudgement(-1.97% => invariant))
 (Any["refpool", (10, 100, "CategoricalArray", "empty group")], TrialJudgement(-1.45% => invariant))

@bkamins
Copy link
Member

bkamins commented Mar 21, 2021

Looks good to merge. Thank you!

@nalimilan nalimilan merged commit 872ec18 into main Mar 22, 2021
@nalimilan nalimilan deleted the nl/rowgroupslots branch March 22, 2021 08:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants