Add communications bandwidth to benchmarks #938
Conversation
Two new utility functions are added for printing benchmark results, avoiding the need to rearrange individual whitespace or separator lengths whenever a new, longer row is added, and removing the need to count spaces by hand given the code's indentation.
The existing throughput value only gives an overview of the total data processed for the complete workflow, with no insight into communications; the new bandwidth value provides that. Additionally, the common peer-to-peer bandwidth computation has been moved into a utility function.
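As a rough idea of what such print helpers could look like: the names print_key_value and print_separator match the calls visible in the diff below, but the column width, separator length, and exact formatting here are assumptions, not the actual implementation.

def print_key_value(key, value, key_length=25):
    """Print a `key | value` row with a fixed-width key column (width is an assumption)."""
    print(f"{key: <{key_length}} | {value}")

def print_separator(separator="-", length=80):
    """Print a horizontal rule made of `separator` characters (length is an assumption)."""
    print(separator * length)

# Hypothetical usage:
print_separator(separator="=")
print_key_value(key="Throughput", value="1.07 GiB/s +/- 209.63 MiB/s")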
Codecov Report
@@            Coverage Diff             @@
##           branch-22.08    #938   +/- ##
==============================================
  Coverage              ?    0.00%
==============================================
  Files                 ?       16
  Lines                 ?     2092
  Branches              ?        0
==============================================
  Hits                  ?        0
  Misses                ?     2092
  Partials              ?        0

Continue to review full report at Codecov.
key="Bandwidth", | ||
value=f"{format_bytes(bandwidths_all.mean())}/s +/- " | ||
f"{format_bytes(bandwidths_all.std())}/s", | ||
) |
FWIW, and this is just the same as before, the arithmetic mean is generally the wrong thing for rate-like data; the more correct option is the harmonic mean, len(xs)/sum(1/xs).
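As a back-of-the-envelope illustration (hypothetical numbers, not from this PR): two transfers of 9 GiB each, one at 1 GiB/s and one at 9 GiB/s, take 10 s in total, i.e. 1.8 GiB/s overall, which is what the harmonic mean reports, whereas the arithmetic mean says 5 GiB/s.

import numpy as np

rates = np.array([1.0, 9.0])                   # GiB/s, hypothetical per-transfer rates
arithmetic = rates.mean()                      # 5.0 GiB/s -- overstates the overall rate
harmonic = len(rates) / np.sum(1.0 / rates)    # 1.8 GiB/s
# Cross-check: 18 GiB moved in 9 s + 1 s = 10 s is indeed 1.8 GiB/s.
print(arithmetic, harmonic, 18.0 / np.sum(9.0 / rates))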
Yeah, I was actually wondering whether that was correct but didn't spend the time investigating the right way to compute it. A harmonic mean does indeed make sense. However, it looks depressing 🙁
Bandwidth | 10.18 GiB/s +/- 8.16 GiB/s
BandwidthH | 3.47 GiB/s +/- 1.25 GiB/s
We seemingly go down from 10.18 GiB/s to 3.47 GiB/s; of course the former is incorrect anyway, but it is sad.
I have now replaced the mean and standard deviation in 268f893. I am not sure whether the standard deviation is correct, can you double-check?
Standard deviations of inverse random variables are a huge can of worms :(, but as per this stats.stackexchange post there are a few options. Suppose we have our rates x_1, ..., x_n, and let y_i = 1/x_i with mean ȳ and variance Var(y). Then the variance of the harmonic mean H = 1/ȳ is approximately Var(H) ≈ Var(y) / (n * ȳ^4).
The above-linked post shows that this is a good estimate for uniformly distributed data at least.
So I think we can calculate the harmonic mean and standard deviation as
import numpy as np

def hmean(data):
    """Harmonic mean of `data`."""
    return 1 / np.mean(1 / data)

def hstd(data):
    """Approximate standard deviation of the harmonic mean of `data`."""
    rmean = np.mean(1 / data)
    rvar = np.var(1 / data)
    return np.sqrt(rvar / (len(data) * rmean ** 4))
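Hypothetical usage with the per-transfer bandwidths collected by the benchmark (the values below are made up):

from dask.utils import format_bytes

bandwidths_all = np.asarray([8.5e9, 12.1e9, 9.7e9, 15.3e9, 7.9e9])  # bytes/s, made-up
print(f"{format_bytes(hmean(bandwidths_all))}/s +/- {format_bytes(hstd(bandwidths_all))}/s")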
Thanks, I saw that yesterday but wasn't totally sure it would necessarily be correct either. I've made the changes in d120cd3, thanks for the code as well!
Minor queries
)
print_key_value(
    key="Throughput (Comms only)",
    value=f"{format_bytes(bandwidths_all.mean())} +/- "
Whereas this bandwidth measure is the mean of all the p2p data movement observed by dask?
Correct.
print_key_value(
    key="Throughput (Comms only)",
    value=f"{format_bytes(bandwidths_all.mean())} +/- "
    f"{format_bytes(bandwidths_all.std())}",
If args.backend == "explicit-comms", I think these will all be empty (since dask doesn't record p2p transfers in those cases), c.f. lines 329 and onwards below.
Doesn't seem to be the case, although it's possible this is only for normal message passing (e.g., heartbeats):
$ DASK_DISTRIBUTED__COMM__COMPRESSION=None OPENBLAS_NUM_THREADS=1 python dask_cuda/benchmarks/local_cudf_merge.py -p ucx -d 0,1,2,3,4,5,6,7 -c 1_000_000 --runs 30 -b explicit-comms
...
Throughput | 1.07 GiB/s +/- 209.63 MiB/s
Bandwidth | 103.27 MiB/s +/- 90.80 MiB/s
So this must be background messaging since I don't think that the bandwidth can be lower than the throughput if we're actually measuring data movement of the computation.
It can. Consider, for example, a problem that processes 10GB of data in one second with 8 workers, i.e., 10GB/s throughput. Communications could take 90% of the total time at a rate of 100MB/s in each direction; with 8 workers that still keeps the bandwidth at only 1600MB/s. Therefore, there's no direct relationship between the two, not in this benchmark at least. You could also have a problem with no communication whatsoever, which would still give some throughput, but the bandwidth would be 0.
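A back-of-the-envelope version of those numbers (purely illustrative):

data_processed = 10_000                        # MB processed by the whole workflow
elapsed = 1.0                                  # seconds
throughput = data_processed / elapsed          # 10000 MB/s, i.e. 10 GB/s
per_direction = 100                            # MB/s on each link
aggregate_bandwidth = per_direction * 2 * 8    # 2 directions x 8 workers = 1600 MB/s
print(throughput, aggregate_bandwidth)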
Despite the above, I think you are right that we need to review explicit-comms anyway; over time this benchmark has grown organically, possibly without always considering the differences posed by explicit-comms.
It can. Consider, for example, a problem that processes 10GB of data in one second with 8 workers, i.e., 10GB/s throughput. Communications could take 90% of the total time at a rate of 100MB/s in each direction; with 8 workers that still keeps the bandwidth at only 1600MB/s. Therefore, there's no direct relationship between the two, not in this benchmark at least. You could also have a problem with no communication whatsoever, which would still give some throughput, but the bandwidth would be 0.
Thanks, I wasn't thinking very hard; as you note, a more extreme example is one where all the data shuffling is local to each worker (high throughput but tiny bandwidth).
Despite the above, I think you are right that we need to review explicit-comms anyway; over time this benchmark has grown organically, possibly without always considering the differences posed by explicit-comms.
Perhaps let's discuss in #940 to pin down exactly what we want to be measuring (since that changes some of the things that are measured/saved)
print_separator(separator="=") | ||
print_key_value( | ||
key="Throughput", | ||
value=f"{format_bytes(t_p.mean())} +/- {format_bytes(t_p.std()) }", | ||
value=f"{format_bytes(hmean(t_p))}/s +/- {format_bytes(hstd(t_p))}/s", |
I had a check to compare with how standard errors are propagated for reciprocals, and these formulae are consistent with that approach. In this case data_processed is the same for every run and only the times vary, so we can compute the mean throughput as data_processed / np.mean(times) and the standard deviation as data_processed * np.std(times) / np.mean(times)**2. But these are just special cases of the harmonic approach, so I'm happy.
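For what it's worth, a quick numerical check of the mean identity, with made-up run times and reusing the hmean helper from the earlier comment:

import numpy as np

data_processed = 4e9                      # bytes, identical for every run (hypothetical)
times = np.array([1.9, 2.1, 2.0, 2.3])    # seconds, made-up
throughputs = data_processed / times
# With a constant numerator, the harmonic mean of the per-run throughputs
# reduces to data_processed / mean(times).
assert np.isclose(hmean(throughputs), data_processed / np.mean(times))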
nitpicks, but otherwise fine
dask_cuda/benchmarks/local_cupy.py
t = format_time(took)
print_key_value(key=f"{t}", value=f"{npartitions}")
times = []
for idx, (took, npartitions) in enumerate(took_list):
idx is unused.
Done in dc1a587
t = format_time(took)
print_key_value(key=f"{t}", value=f"{npartitions}")
times = []
for idx, (took, npartitions) in enumerate(took_list):
idx is unused.
Done in dc1a587
times = np.asarray(times)
bandwidths_all = np.asarray(bandwidths_all)
print_separator(separator="=")
print_key_value(
I think we can deduce throughput for these operations as well, but it would need to be done for each op in turn, and we'd need to pick a cache model. For example, does (A + A.T).sum() move A.nbytes of data or 2*A.nbytes (or more)?
The amount of data that gets moved (if by "move" you mean transferring between workers) in the cuDF benchmarks is irrelevant for the throughput; instead, the throughput is based on data_processed, which is a sum of the DataFrame sizes:

dask-cuda/dask_cuda/benchmarks/local_cudf_merge.py, lines 169 to 170 in a2e3223:

data_processed = len(ddf_base) * sum([t.itemsize for t in ddf_base.dtypes])
data_processed += len(ddf_other) * sum([t.itemsize for t in ddf_other.dtypes])
In the same manner, I believe the equivalent data_processed for the CuPy benchmarks would be the sum of the sizes of the input arrays, e.g., x.nbytes for

x = rs.random((args.size, args.size), chunks=args.chunk_size).persist()

and x.nbytes + y.nbytes for

dask-cuda/dask_cuda/benchmarks/local_cupy.py, lines 44 to 45 in a2e3223:

x = rs.random((args.size, args.size), chunks=args.chunk_size).persist()
y = rs.random((args.size, args.size), chunks=args.chunk_size).persist()
IOW, A.T is not an input and thus doesn't get accounted for in data_processed. Does the above make sense?
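A sketch of that accounting with stand-in inputs (the array sizes here are arbitrary, not the benchmark defaults, and the variable names are illustrative):

import dask.array as da

x = da.random.random((8192, 8192), chunks=2048)
y = da.random.random((8192, 8192), chunks=2048)
# Count only the bytes of the inputs; intermediates such as x.T are not included.
data_processed_transpose_sum = x.nbytes            # e.g. for (x + x.T).sum()
data_processed_two_inputs = x.nbytes + y.nbytes    # e.g. for ops that read both x and y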
I meant data movement in the local processing, but I guess this is a slightly philosophical argument. I was thinking from the point of view of "what is the streaming data cost?": e.g., if I just compute B = A.T then I need to stream A and stream the output to B, so the data movement is 2*A.nbytes (on CPUs at least it would normally be 3*A.nbytes, since caches are usually write-allocate, unless we hint that the output can be done with a non-temporal store). This kind of metric tells us something about the performance on the hardware.
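To make the two accounting choices concrete (sizes are illustrative, not from the benchmark):

import numpy as np

A = np.random.random((4096, 4096))
input_bytes = A.nbytes               # "input size" accounting used for data_processed
streaming_bytes = 2 * A.nbytes       # B = A.T: read A once, write B once
write_allocate_bytes = 3 * A.nbytes  # typical CPU caches also read B's lines before writing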
But I agree that a throughput of "this is how much data I started with, how fast can I process it?" is also a good metric, and is generally the one we actually care about since that's the wallclock time question.
In conclusion: happy to go with sum of input array sizes.
Yeah, I agree with your statement on data movement, but I think this can get impractical quite fast when we need to compute it for various different operations. Since you agree, let's go with the input array sizes for now, which is now addressed in dc1a587.
Based on @wence-'s review; probably good to address Lawrence's comments before merging in.
@wence- I think all your suggestions/requests have been addressed, do you think we're good to merge?
Yes, go ahead.
@gpucibot merge
The existing throughput value only gives an overview of the total data processed for the complete workflow, with no insight into communications; the new bandwidth value provides that. Additionally, the common peer-to-peer bandwidth computation has been moved into a utility function.
Depends on #937