
Enable multithreading with several operations in combine/select/transform #2574

Merged
nalimilan merged 23 commits into main on Jan 21, 2021

Conversation

nalimilan
Member

No description provided.

@nalimilan
Member Author

Benchmarks indicate a relatively small overhead for small data frames and a large gain with large ones when there are multiple operations. Maybe the overhead could be reduced by not using anonymous functions.

using Revise, DataFrames, BenchmarkTools, Random, PooledArrays
Random.seed!(1);
for N in (1_000, 10_000, 1_000_000, 100_000_000)
    @show N
    df = DataFrame(x=rand(1:10, N), y=rand(N));
    gd = groupby(df, :x);
    @btime combine($gd, :y => sum);
    @btime combine($gd, :y => sum, :y => maximum);
    @btime combine($gd, :y => (y -> sum(y)) => :sum, :y => (y -> maximum(y)) => :maximum);
end

# master, 1 thread (similar with 2 threads)
N = 1000
  18.067 μs (159 allocations: 12.72 KiB)
  28.645 μs (194 allocations: 14.31 KiB)
  43.973 μs (400 allocations: 35.06 KiB)
N = 10000
  26.693 μs (159 allocations: 12.72 KiB)
  55.812 μs (194 allocations: 14.31 KiB)
  103.867 μs (400 allocations: 176.94 KiB)
N = 1000000
  1.382 ms (159 allocations: 12.72 KiB)
  3.970 ms (194 allocations: 14.31 KiB)
  16.699 ms (420 allocations: 15.28 MiB)
N = 100000000
  147.389 ms (159 allocations: 12.72 KiB)
  405.261 ms (194 allocations: 14.31 KiB)
  2.523 s (420 allocations: 1.49 GiB)

# PR, 2 threads
N = 1000
  27.866 μs (179 allocations: 14.03 KiB)
  39.069 μs (219 allocations: 16.31 KiB)
  49.763 μs (445 allocations: 38.00 KiB)
N = 10000
  42.318 μs (179 allocations: 14.03 KiB)
  61.622 μs (219 allocations: 16.31 KiB)
  100.182 μs (443 allocations: 179.81 KiB)
N = 1000000
  1.412 ms (179 allocations: 14.03 KiB)
  2.957 ms (219 allocations: 16.31 KiB)
  8.945 ms (463 allocations: 15.28 MiB)
N = 100000000
  147.132 ms (179 allocations: 14.03 KiB)
  285.011 ms (220 allocations: 16.34 KiB)
  2.379 s (464 allocations: 1.49 GiB)

# PR, 1 thread
N = 1000
  22.249 μs (177 allocations: 13.97 KiB)
  32.287 μs (215 allocations: 16.19 KiB)
  55.203 μs (440 allocations: 37.84 KiB)
N = 10000
  31.565 μs (177 allocations: 13.97 KiB)
  60.558 μs (215 allocations: 16.19 KiB)
  115.944 μs (439 allocations: 179.69 KiB)
N = 1000000
  1.394 ms (177 allocations: 13.97 KiB)
  3.845 ms (215 allocations: 16.19 KiB)
  15.734 ms (459 allocations: 15.28 MiB)
N = 100000000
  146.931 ms (177 allocations: 13.97 KiB)
  408.077 ms (215 allocations: 16.19 KiB)
  2.506 s (459 allocations: 1.49 GiB)

@bkamins
Member

bkamins commented Dec 6, 2020

Yes - this is very nice. I would not be overly concerned about the overhead given its size (which is very small). Thank you for working on this!

Could you please check if the following produce a correct result (if you prefer I can write a test-set for this):

using DataFrames, Combinatorics
trans = [:id => (y -> sum(y)) => :v1, :id => (y -> 10maximum(y)) => :v2, y -> (v3=100y.id[1],), y -> (v4=fill(1000y.id[1],y.id[1]+1),)]
for p in permutations(1:4), i in 1:4
    @show combine(groupby(DataFrame(id=1:2), :id), trans[p[1:i]]...)
end

This test is meant to ensure that we correctly expand the columns when needed.

@bkamins
Member

bkamins commented Dec 7, 2020

Ah - one thing. So in this PR you propose to use all the threads that Julia was started with? I would be OK with this, but I wanted to double-check that this is the intent (earlier we considered a different strategy, but it is probably OK to do as you propose now).

@nalimilan
Member Author

Ah - one thing. So in this PR you propose to use all the threads that Julia was started with? I would be OK with this, but I wanted to double-check that this is the intent (earlier we considered a different strategy, but it is probably OK to do as you propose now).

Yes, here the overhead of starting multiple tasks is much lower so I'm not sure it's necessary to allow tweaking that. Even if other threads are busy it shouldn't be a problem. Though we'll probably have to address this issue in other places where the tradeoff is less clear.

@bkamins
Member

bkamins commented Dec 13, 2020

CI fails because with threading the exception type changes. I would unwrap CompositeException in tests and check the wrapped exception. OK?

@nalimilan
Member Author

CI fails because with threading the exception type changes. I would unwrap CompositeException in tests and check the wrapped exception. OK?

Actually as I replied above I suggest we return the right exception type, and then we don't need to change tests. What do you think?
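The approach being discussed can be sketched as follows (a hypothetical helper, not the PR's exact code): instead of letting `@sync` surface a `CompositeException` to the caller, wait on each task individually and rethrow the original exception type.

```julia
# Hedged sketch: unwrap TaskFailedException so callers see the
# original exception type rather than a CompositeException.
function wait_rethrow(tasks)
    for t in tasks
        try
            wait(t)
        catch e
            # wait throws a TaskFailedException wrapping the failed
            # task; its `result` field holds the original exception
            e isa TaskFailedException ? throw(e.task.result) : rethrow()
        end
    end
    return nothing
end
```

With this pattern, existing tests that check for a specific exception type keep working unchanged whether or not the work ran on a separate task.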

@nalimilan nalimilan marked this pull request as ready for review December 20, 2020 18:44
@bkamins
Member

bkamins commented Jan 14, 2021

so the two PRs are kind of complementary.

But how would they play together? I.e. this PR uses threads for different aggregations and #2588 uses them for a single aggregation. So the question is what would happen if they are combined.

@nalimilan
Member Author

Actually #2588 is on top of this PR, so it includes both. The idea is that each operation gets a task; then, inside operations, those that use the fast path get only one task, while those that use the slow path get one task per CPU. Since operations that use the fast path are, well, faster, this should be quite close to optimal when mixing fast-path and slow-path operations.
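The per-operation scheduling described above can be sketched as follows (hypothetical names, not the PR's exact code): one task is spawned per requested operation, and each task writes only to its own slot, so no synchronization is needed beyond the final `@sync` barrier.

```julia
# Hedged sketch: spawn one task per operation; distinct result slots
# mean the tasks never write to shared state.
function run_operations(ops, data)
    results = Vector{Any}(undef, length(ops))
    @sync for (i, op) in enumerate(ops)
        # each operation runs on its own task (possibly its own thread)
        Threads.@spawn results[i] = op(data)
    end
    return results
end
```

For example, `run_operations([sum, maximum], rand(100))` would evaluate both aggregations concurrently when Julia was started with more than one thread.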

@bkamins
Member

bkamins commented Jan 14, 2021

Makes sense. I will look into the other PR tomorrow.

@nalimilan
Member Author

Could you try with the latest commit? It should allow the GC to run every 100,000 rows, i.e. 1000 times in this example. Performance doesn't seem to be affected too much.
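The idea can be sketched as follows (an assumption about the approach, not the PR's exact code): in a tight, non-allocating loop run by a worker task, hit a GC safepoint periodically so garbage collection can proceed while the task is working.

```julia
# Hedged sketch: a non-allocating loop never reaches a safepoint on
# its own, which can block GC on other threads; insert one manually.
function copy_rows!(newcol, col, idx)
    for (k, j) in enumerate(idx)
        newcol[j] = col[k]
        # allow the GC to run roughly every 100_000 rows
        k % 100_000 == 0 && GC.safepoint()
    end
    return newcol
end
```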

@bkamins
Member

bkamins commented Jan 17, 2021

What is our strategy? You want to merge this PR first and then the other threading PR separately?

@nalimilan
Member Author

I'd say yes.

@bkamins
Member

bkamins commented Jan 17, 2021

OK - I think it is good to merge, except the minor comments I have left and resolving merge conflicts

@nalimilan
Member Author

Do you mean the GC issue is fixed with the last commit? :-)

@bkamins
Member

bkamins commented Jan 17, 2021

Yes, I cannot reproduce it now.

@bkamins
Member

bkamins commented Jan 17, 2021

I have run some more stress tests on larger data. The system chokes (even System Monitor stalls for a second in extreme memory usage cases), but I cannot get Julia "Killed". Fantastic job as usual!

Do you think we should ask someone with experience in threading for another review, or are you confident in what we have? (I have checked the "logic", but I do not know the internals of the threading implementation.)

@bkamins
Member

bkamins commented Jan 17, 2021

Can you please also add information about the use of multithreading to the docstrings and the manual? I think more advanced users would want to know how we use threads, as it might affect their workflows and their decision about how many threads to start Julia with.

@bkamins
Member

bkamins commented Jan 17, 2021

I also asked on Slack for opinions on using all available threads.

@nalimilan
Member Author

Cool!

I'm reasonably confident with the implementation, but of course it's always good to have more eyes if somebody familiar with threading is willing to look at the PR.

for j in s:e
    k += 1
    newcol[gd_idx[j]] = col[k]

@sync for i in eachindex(trans_res)
Member

can you please add a comment on the logic how we handle the threading here? (i.e. a high-level logic how we ensure that we do not have race-conditions and correctly process things). Thank you!

Member Author

I'm not sure what to say about this loop actually. Each entry can be processed in parallel because we extract its fields, do some computing based on them, and replace the original entry with the result. We don't update any external state. Is there something in particular that you feel is worth mentioning?

Member

I do not mean this loop but the logic in general, i.e. that we have to:

  1. sync idx_agg across threads
  2. do the actual insertion of columns in post-processing, as it has to be done sequentially (although we compute the columns in parallel)

maybe something more that might be considered hard to understand.
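The second point above can be sketched as follows (hypothetical code, not the PR's): result columns are computed in parallel, each task writing to its own slot, and are then inserted into the destination DataFrame sequentially, because mutating the same DataFrame from several tasks would be a race.

```julia
using DataFrames

# Hedged sketch: parallel compute, sequential insertion.
function combine_parallel(funcs, src::AbstractVector)
    cols = Vector{Vector{Float64}}(undef, length(funcs))
    @sync for (i, f) in enumerate(funcs)
        # distinct slots: safe to fill in parallel
        Threads.@spawn cols[i] = [f(src)]
    end
    out = DataFrame()
    for (i, c) in enumerate(cols)
        # sequential post-processing: only one task touches `out`
        out[!, Symbol("v", i)] = c
    end
    return out
end
```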

Member Author

Got it. I've added a comment, let me know what you think.

Member

@bkamins bkamins left a comment

Looks good. I will test the other PR when it is rebased against the current state of this one.

@@ -122,6 +122,10 @@ It is allowed to mix single values and vectors if multiple transformations
are requested. In this case single value will be repeated to match the length
of columns specified by returned vectors.

A separate task is spawned for each specified transformation, allowing for
parallel operation when several transformations are requested and Julia was
started with more than one thread.
Member

Maybe add that this means the transformation functions passed should not modify shared state of the Julia program?
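As an illustration of this suggestion (a hypothetical example, not code from the PR): since each transformation runs in its own task, passed functions must not mutate state shared with other transformations.

```julia
# Hedged illustration: `log` is shared mutable state, so the first
# transformation pair may race when run concurrently; the second is
# a pure function and is thread-safe.
log = Float64[]
unsafe = :y => (y -> (push!(log, sum(y)); sum(y)))  # push! may race
safe   = :y => sum                                  # pure: thread-safe
```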

Member Author

Good idea. I've also mentioned that this may be extended in the future so that people are not caught by surprise.

@nalimilan
Member Author

I've realized I forgot to actually enable multithreading on CI. I chose 4 threads to maximize the chances of catching synchronization bugs, and added a warning so that we detect problems with the settings instead of things silently succeeding.

@bkamins
Member

bkamins commented Jan 20, 2021

Thank you for the changes. I was testing it on 1 to 8 threads and did not catch a bug. I am OK with merging it once CI passes.

@nalimilan nalimilan merged commit cf4f3ab into main Jan 21, 2021
@nalimilan nalimilan deleted the nl/threadedops branch January 21, 2021 08:10
@bkamins
Member

bkamins commented Jan 21, 2021

OK - please let me know when you rebase the other PR and I will review. Thank you!
