Multithreaded custom grouped operations with single-row result #2588

Merged
merged 11 commits into from
Feb 10, 2021
12 changes: 7 additions & 5 deletions docs/src/man/split_apply_combine.md
@@ -122,11 +122,13 @@ It is allowed to mix single values and vectors if multiple transformations
are requested. In this case single value will be repeated to match the length
of columns specified by returned vectors.

- A separate task is spawned for each specified transformation, allowing for
- parallel operation when several transformations are requested and Julia was
- started with more than one thread. Passed transformation functions should
- therefore not modify global variables (i.e. they should be pure), or use
- locks to control parallel accesses.
+ A separate task is spawned for each specified transformation; each transformation
+ then spawns as many tasks as Julia threads, and splits processing of groups across
+ them (however, currently transformations with optimized implementations like `sum`
+ and transformations that return multiple rows use a single task for all groups).
+ This allows for parallel operation when Julia was started with more than one
+ thread. Passed transformation functions should therefore not modify global variables
+ (i.e. they should be pure), or use locks to control parallel accesses.

To apply `function` to each row instead of whole columns, it can be wrapped in a
`ByRow` struct. `cols` can be any column indexing syntax, in which case
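
As an illustration of the requirement stated in this hunk, here is a minimal sketch of a transformation function that touches shared state and guards it with a lock. It uses only Base: `calls`, `calls_lock`, `counted_sum` and `groups` are hypothetical names, and the one-task-per-group loop merely imitates concurrent invocation; it is not how DataFrames.jl schedules its work.

```julia
calls = Ref(0)                 # hypothetical shared state
calls_lock = ReentrantLock()   # guards every access to `calls`

# Hypothetical transformation function: sums one group's values
# and counts how many times it has been called.
function counted_sum(y)
    lock(calls_lock) do
        calls[] += 1           # only one task at a time may update the counter
    end
    return sum(y)
end

# Stand-ins for the per-group column slices a transformation would receive.
groups = [collect(1:n) for n in 1:1000]

# One task per group, imitating concurrent calls of the same function.
tasks = [Threads.@spawn counted_sum(g) for g in groups]
results = fetch.(tasks)
```

Without the lock, the increments could race and the counter could end up below the number of groups when Julia runs with several threads. With DataFrames.jl, a function of this kind would be passed in the usual way, e.g. `combine(gd, :y => counted_sum => :total)`.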
11 changes: 7 additions & 4 deletions src/abstractdataframe/selection.jl
@@ -145,10 +145,13 @@ const TRANSFORMATION_COMMON_RULES =
`copycols=false`, a `SubDataFrame` is returned without copying columns.

If a `GroupedDataFrame` is passed, a separate task is spawned for each
- specified transformation, allowing for parallel operation when several
- transformations are requested and Julia was started with more than one thread.
- Passed transformation functions should therefore not modify global variables
- (i.e. they should be pure), or use locks to control parallel accesses.
+ specified transformation; each transformation then spawns as many tasks
+ as Julia threads, and splits processing of groups across them
+ (however, currently transformations with optimized implementations like `sum`
+ and transformations that return multiple rows use a single task for all groups).
+ This allows for parallel operation when Julia was started with more than one
+ thread. Passed transformation functions should therefore not modify global
+ variables (i.e. they should be pure), or use locks to control parallel accesses.
In the future, parallelism may be extended to other cases, so this requirement
also holds for `DataFrame` inputs.
"""
16 changes: 9 additions & 7 deletions src/groupeddataframe/complextransforms.jl
@@ -191,14 +191,17 @@ function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow},
# Create up to one task per thread
# This has lower overhead than creating one task per group,
# but is optimal only if operations take roughly the same time for all groups
- basesize = max(1, (len - 1) ÷ Threads.nthreads())
- partitions = collect(_partition(2:len, basesize))
+ @static if VERSION >= v"1.4"
+     basesize = max(1, (len - 1) ÷ Threads.nthreads())
@bkamins (Member), Jan 24, 2021:

This is incorrect: `÷` rounds down, so we spawn one task too many. E.g.:

julia> max(1, (20 - 1) ÷ 4)
4

julia> collect(Iterators.partition(2:20, 4))
5-element Array{UnitRange{Int64},1}:
 2:5
 6:9
 10:13
 14:17
 18:20

Member Author:

Whoops. This probably slows things down significantly.

Member:

When you change this could you please report the benchmarks? Thank you.
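
The off-by-one described in this thread can be reproduced with Base alone. The sketch below compares the floor-based chunk size from the diff with a ceiling-based one computed with `cld`. The `cld` variant is only an assumption about one possible fix, not necessarily the change that was eventually merged; `nchunks` is a hypothetical helper and `nt` stands in for `Threads.nthreads()`.

```julia
# Number of chunks produced when splitting `2:len` into pieces of `basesize`.
nchunks(len, basesize) = length(collect(Iterators.partition(2:len, basesize)))

len, nt = 20, 4   # values from the example in this thread

floor_size = max(1, (len - 1) ÷ nt)     # chunk size as computed in the diff
ceil_size  = max(1, cld(len - 1, nt))   # ceiling division (assumed alternative)

floor_chunks = nchunks(len, floor_size)  # one chunk more than `nt`
ceil_chunks  = nchunks(len, ceil_size)   # no more than `nt` chunks
```

Rounding the chunk size up means `nt` chunks always cover the `len - 1` items, so the number of chunks never exceeds `nt`.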

+     partitions = Iterators.partition(2:len, basesize)
+ else
+     partitions = (2:len,)
+ end
widen_type_lock = ReentrantLock()
Member:

Why do you create a new lock instead of using the `GroupedDataFrame` lock? Is the other lock used in the part of the code that processes different transformations in parallel?

Member Author:

In theory, AFAICT, we don't have to take `gd.lazy_lock`, i.e. it should be possible to compute indices in parallel without conflicting with what we're doing here. But yes, since the code here requires indices to have been computed, `gd.lazy_lock` will never be locked when we get here, so it makes no difference and I can reuse `gd.lazy_lock` for simplicity.

Member:

After thinking about it, I think it is actually OK to use a separate lock. The reason is that if you run two transformations that each produce one row, you want separate locks for them.

Still - as commented below - it would be good to have a benchmark of something like:

combine($gd, :y => (y -> sum(y)) => :y1, :y => (y -> sum(y)) => :y2);

(so that we can see what happens to the performance when we run in parallel two transformations that themselves get run in parallel)

Member Author:

Ah yes that's probably why I used separate locks. :-)

I'll run more benchmarks after fixing the PR.
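
The argument for separate locks made in this thread can be sketched with Base alone. In the hypothetical `run_transformation` below, each call creates its own `ReentrantLock`, so two transformations running at the same time never wait on each other's lock. All names are illustrative, and the lock here guards a plain `Dict` rather than the column-widening logic of the actual PR.

```julia
# Hypothetical stand-in for one transformation: apply `f` to every group,
# splitting the groups across up to `ntasks` tasks.
function run_transformation(f, groups, ntasks)
    results = Dict{Int, Any}()
    own_lock = ReentrantLock()   # created per call, never shared with other transformations
    basesize = max(1, cld(length(groups), ntasks))
    chunks = collect(Iterators.partition(eachindex(groups), basesize))
    tasks = map(chunks) do idx
        Threads.@spawn for i in idx
            val = f(groups[i])       # computed outside the lock
            lock(own_lock) do
                results[i] = val     # Dict is not thread-safe, so writes are guarded
            end
        end
    end
    foreach(wait, tasks)
    return results
end

groups = [collect(1:n) for n in 1:100]

# Two transformations run concurrently, each of them itself split across tasks.
t1 = Threads.@spawn run_transformation(sum, groups, 4)
t2 = Threads.@spawn run_transformation(maximum, groups, 4)
sums, maxima = fetch(t1), fetch(t2)
```

Had both calls shared a single lock, every write in one transformation would also block writes in the other, which is the contention the separate locks are meant to avoid.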

outcolsref = Ref{NTuple{<:Any, AbstractVector}}(outcols)
type_widened = fill(false, length(partitions))
- tasks = similar(partitions, Task)
- for tid in 1:length(partitions)
-     idx = partitions[tid]
+ tasks = Vector{Task}(undef, length(partitions))
+ for (tid, idx) in enumerate(partitions)
tasks[tid] =
@spawn _combine_rows_with_first_task!(tid, idx, outcols, outcolsref,
type_widened, widen_type_lock,
@@ -219,9 +222,8 @@ function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow},

# Copy data for any tasks that finished before others widened columns
outcols = outcolsref[]
- for tid in 1:length(partitions)
+ for (tid, idx) in enumerate(partitions)
if type_widened[tid]
-     idx = partitions[tid]
oldoutcols = fetch(tasks[tid])
for k in 1:length(outcols)
if oldoutcols[k] !== outcols[k]