Performance degradation of repeated aggregations with distributed scheduler and client.persist() on a single machine #332

Open
gbrener opened this issue Apr 28, 2017 · 21 comments

Comments

@gbrener
Contributor

gbrener commented Apr 28, 2017

The following code illustrates a performance issue with dask's distributed scheduler when combined with client.persist(df) on a single machine:

import multiprocessing
import dask.dataframe as dd
import datashader
from dask import distributed

cluster = distributed.LocalCluster(n_workers=multiprocessing.cpu_count(), threads_per_worker=1)
dask_client = distributed.Client(cluster)
dask_df = dd.from_pandas(df, npartitions=multiprocessing.cpu_count())  # df is a pandas DataFrame loaded earlier
...
cvs = datashader.Canvas(...)
agg = cvs.points(dask_df, ...)

When we insert the following line before agg = cvs.points(dask_df, ...):

dask_df = dask_client.persist(dask_df)

The runtime performance of cvs.points() gets much worse than when persist was not used.

@gbrener gbrener changed the title Reduced performance with distributed scheduler with client.persist() Reduced performance with distributed scheduler with client.persist() on a single machine Apr 28, 2017
@martindurant

To be clear, persist should improve performance when there is more than one computation on the same data, since it prevents having to load the data into memory more than once. In the above example, the data is dead baggage after it has been used, and would have been freed if not for the persist. Furthermore (I'm not sure if this is the case here), if the aggregation touches only some of the data, persist would still have loaded all of it.

@gbrener
Contributor Author

gbrener commented Apr 28, 2017

@martindurant I'm seeing the issue with repeated aggregations on the same dataset - both when the aggregation touches some of the data and when it touches all of it. It would be nice if someone could confirm; here are the commands I'm using (in the datashader/examples directory, after following the instructions at the top of filetimes.sh):

With threaded scheduler + df.persist():

$ python filetimes.py times/tinycensus.snappy.parq  dask    census meterswest metersnorth race --debug --cache=persist
DEBUG: Cache "persist" mode enabled
DEBUG: "threaded" scheduler is enabled
DEBUG: Memory usage (before read):      140.312576 MB
DEBUG: read_parquet(times/tinycensus.snappy.parq, columns=['meterswest', 'metersnorth', 'race'], index=False)
DEBUG: Force-loading Dask dataframe
DEBUG: Memory usage (after read):       156.065792 MB
DEBUG: Index              80
metersnorth    245344
meterswest     245344
race            30998
dtype: int64
DEBUG: DataFrame size:                  0.521766 MB
DEBUG: column "meterswest" dtype: float64
DEBUG: column "metersnorth" dtype: float64
DEBUG: column "race" dtype: category
DEBUG: Memory usage (after agg1):       157.933568 MB
DEBUG: Memory usage (after agg2):       174.067712 MB
times/tinycensus.snappy.parq dask    Aggregate1:001.45 (001.23+000.22)  Aggregate2:000.01  In:00000335355  Out:00000033122  Total:003.99

With distributed scheduler + client.persist(df):

$ python filetimes.py times/tinycensus.snappy.parq  dask    census meterswest metersnorth race --debug --cache=persist --distributed
DEBUG: Cache "persist" mode enabled
DEBUG: "distributed" scheduler is enabled
DEBUG: Memory usage (before read):      142.39744 MB
DEBUG: read_parquet(times/tinycensus.snappy.parq, index=False, columns=['meterswest', 'metersnorth', 'race'])
DEBUG: Memory usage (after read):       143.036416 MB
DEBUG: Index              80
metersnorth    245344
meterswest     245344
race            30998
dtype: int64
DEBUG: DataFrame size:                  0.521766 MB
DEBUG: column "meterswest" dtype: float64
DEBUG: column "metersnorth" dtype: float64
DEBUG: column "race" dtype: category
DEBUG: Memory usage (after agg1):       145.743872 MB
DEBUG: Memory usage (after agg2):       160.063488 MB
times/tinycensus.snappy.parq dask    Aggregate1:001.55 (000.01+001.54)  Aggregate2:000.27  In:00000335355  Out:00000033122  Total:009.88
Received signal 15, shutting down

Note that because both aggregations take so long for the distributed+persist configuration, overall runtime more than doubles.

@mrocklin

@gbrener note that persist is non-blocking. If you are profiling it might be worthwhile to call distributed.wait(dask_df) after calling persist to wait until dask finishes loading in that data.
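
A minimal sketch of that suggestion (synthetic data; the column names and sizes here are placeholders, not the census benchmark):

import multiprocessing
import numpy as np
import pandas as pd
import dask.dataframe as dd
import datashader as ds
from dask import distributed

cluster = distributed.LocalCluster(n_workers=multiprocessing.cpu_count(), threads_per_worker=1)
client = distributed.Client(cluster)

df = pd.DataFrame({'meterswest': np.random.uniform(size=1000000),
                   'metersnorth': np.random.uniform(size=1000000)})
dask_df = dd.from_pandas(df, npartitions=multiprocessing.cpu_count())

dask_df = client.persist(dask_df)  # non-blocking: returns immediately
distributed.wait(dask_df)          # block until every partition is in worker memory

cvs = ds.Canvas(plot_width=900, plot_height=525)
agg = cvs.points(dask_df, 'meterswest', 'metersnorth')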

@gbrener
Contributor Author

gbrener commented May 1, 2017

Thanks for the tip @mrocklin. I tried your suggestion and the issue remains. Here is more output, comparing two runs with the distributed scheduler, one with dask_df = client.persist(df); distributed.wait(dask_df), and the other with no persistence/waiting:

With persisting/waiting:

$ python filetimes.py times/census.snappy.parq  dask    census meterswest metersnorth race --debug --cache=persist --distributed
DEBUG: Cache "persist" mode enabled
DEBUG: "distributed" scheduler is enabled
DEBUG: Memory usage (before read):      141.836288 MB
DEBUG: read_parquet(times/census.snappy.parq, columns=['meterswest', 'metersnorth', 'race'], index=False)
DEBUG: Memory usage (after read):       143.29856 MB
DEBUG: Index                 560
metersnorth    2453400032
meterswest     2453400032
race            306677314
dtype: int64
DEBUG: DataFrame size:                  5213.477938 MB
DEBUG: column "meterswest" dtype: float64
DEBUG: column "metersnorth" dtype: float64
DEBUG: column "race" dtype: category
DEBUG: Memory usage (after agg1):       2191.958016 MB
DEBUG: Memory usage (after agg2):       2191.958016 MB
times/census.snappy.parq     dask    Aggregate1:066.17 (010.00+056.17)  Aggregate2:046.07  In:02377123600  Out:00000625044  Total:122.35
Received signal 15, shutting down

Without persisting/waiting:

$ python filetimes.py times/census.snappy.parq  dask    census meterswest metersnorth race --debug --distributed
DEBUG: Cache disabled
DEBUG: "distributed" scheduler is enabled
DEBUG: Memory usage (before read):      141.942784 MB
DEBUG: read_parquet(times/census.snappy.parq, columns=['meterswest', 'metersnorth', 'race'], index=False)
DEBUG: Memory usage (after read):       142.544896 MB
DEBUG: Index                 560
metersnorth    2453400032
meterswest     2453400032
race            306677314
dtype: int64
DEBUG: DataFrame size:                  5213.477938 MB
DEBUG: column "meterswest" dtype: float64
DEBUG: column "metersnorth" dtype: float64
DEBUG: column "race" dtype: category
DEBUG: Memory usage (after agg1):       2281.30816 MB
DEBUG: Memory usage (after agg2):       2281.30816 MB
times/census.snappy.parq     dask    Aggregate1:023.96 (000.01+023.95)  Aggregate2:019.76  In:02377123600  Out:00000625044  Total:061.97
Received signal 15, shutting down

The odd thing (to me) is not necessarily that it takes more time to load (persist) the data initially, but that it takes more time to do aggregations after the data is persisted. Maybe I'm just not using the feature properly; here is the relevant code for anyone who's interested: https://github.com/bokeh/datashader/blob/dd_scheduler/examples/filetimes.py#L94-L98
Based on the localhost:8787/status output, it appears that there are points where a lot of data is being transferred between the worker processes. I wonder if client.persist() is forcing the data to be allocated to each process less efficiently than it otherwise would be, causing additional transfers during later aggregations.

Does client.persist() load all of the data into each worker process, or only some of it?
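
(A quick way to check, as a sketch - dask_client and dask_df as in the snippet at the top of the issue; the distributed client can report where the persisted pieces actually live:)

dask_df = dask_client.persist(dask_df)
distributed.wait(dask_df)

print(dask_client.has_what())  # worker address -> keys held by that worker
print(dask_client.who_has())   # key -> list of workers holding it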

@martindurant

This is mixing two issues. The original issue above is load+persist+calculate being slower than load+calculate even when there is enough memory to go around, which should be investigated. Aside from perhaps the time spent allocating enough memory to hold the persisted values, they should be the same speed (and subsequent calculations on the same data should be much faster).

The second issue is specific to the census/datashader pipeline, which is almost certainly to do with running out of physical memory, and it makes sense that persist makes a potentially big difference here. For reference, the in-memory data is over 5GB, but the aggregation takes up 2GB more (plus any intermediates) in the workers and is then copied to the client process. This is much bigger than it should be. I don't know what the peak memory during the pandas aggregation on the same data is; it would also be worth checking with memory_profiler.
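
A rough sketch of that check with memory_profiler (path and columns assumed to match the benchmark; memory_usage samples the process while the aggregation runs):

import fastparquet
import datashader as ds
from memory_profiler import memory_usage

df = fastparquet.ParquetFile('times/census.snappy.parq').to_pandas(
    columns=['meterswest', 'metersnorth', 'race'])

def run_agg():
    cvs = ds.Canvas(plot_width=900, plot_height=525)
    return cvs.points(df, 'meterswest', 'metersnorth')

peak = max(memory_usage((run_agg, (), {}), interval=0.1))
print('peak memory during aggregation: %.1f MB' % peak)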

@gbrener gbrener changed the title Reduced performance with distributed scheduler with client.persist() on a single machine Performance degradation of repeated aggregations with distributed scheduler with client.persist() on a single machine May 1, 2017
@gbrener gbrener changed the title Performance degradation of repeated aggregations with distributed scheduler with client.persist() on a single machine Performance degradation of repeated aggregations with distributed scheduler and client.persist() on a single machine May 1, 2017
@gbrener
Contributor Author

gbrener commented May 1, 2017

@martindurant , I just updated the title. Hopefully that makes it clearer that this specific issue is about the second one you're referring to.

@jbednar
Copy link
Member

jbednar commented May 1, 2017

The aggregation should involve about 4MB of data, not 2GB, because it's just a 900x525 array of doubles. Something seems fishy.

@martindurant

martindurant commented May 2, 2017

Memory plot when running in pure-pandas mode (script extract below):

[figure_1: memory usage over time during the pure-pandas run]

The data size is ~5GB, but there are regular excursions to almost 8GB. Since there are four per aggregation, I suspect these come from calculating max/min on the axes; snakeviz seems to confirm this. I note with interest the following:

In [7]: %time df.metersnorth.min()  # pandas with NaN checking
CPU times: user 1.58 s, sys: 898 ms, total: 2.48 s
Wall time: 2.5 s
Out[7]: 2818291.5

In [8]: %time df.metersnorth.values.min()  # pure numpy
CPU times: user 167 ms, sys: 1.58 ms, total: 169 ms
Wall time: 167 ms
Out[8]: 2818291.5

I don't know what datashader has to say about null-handling, but it would be well worthwhile to handle the nulls only once up-front rather than pay the cost every time.

Here is the script I used for pure-pandas:

import fastparquet
import datashader as ds
import time

plot_width = 900
plot_height = int(900*7.0/12)
t0 = time.time()
df = fastparquet.ParquetFile('census.parquet').to_pandas()
t1 = time.time()
print('Load', round(t1-t0, 1))
cvs = ds.Canvas(5, 5)  # first aggregation, on a tiny 5x5 canvas
agg = cvs.points(df, 'meterswest', 'metersnorth')
t2 = time.time()
print('Agg1', round(t2-t1, 1))
cvs = ds.Canvas(plot_width, plot_height)  # second aggregation, at full resolution
agg = cvs.points(df, 'meterswest', 'metersnorth')
t3 = time.time()
print('Agg2', round(t3-t2, 1))

-EDIT-

In [4]: @numba.njit()   # assumes `import numba` and `import numpy as np`
   ...: def nan_min_max(arr):
   ...:     mi = arr[0]
   ...:     ma = arr[0]
   ...:     for i in range(1, len(arr)):
   ...:         v = arr[i]
   ...:         if np.isnan(v):
   ...:             continue
   ...:         if v > ma:
   ...:             ma = v
   ...:         if v < mi:
   ...:             mi = v
   ...:     return ma, mi
In [11]: %time nan_min_max(np.asarray(df.metersnorth.values))
CPU times: user 327 ms, sys: 1.23 ms, total: 328 ms
Wall time: 327 ms
Out[11]: (6335972.0, 2818291.5)

@gbrener
Contributor Author

gbrener commented May 2, 2017

@martindurant, I appreciate the snippet. There is different behavior in datashader for handling dask dataframes versus pandas dataframes; the benchmark code uses dd.read_parquet to read the parquet file; would you mind doing the same so that we're comparing apples-to-apples?

What are the specs of the machine you are using (OS, RAM, CPU)?

@martindurant

Well, the point is that profiling pure pandas is far, far easier to do, and it illustrates the important point: doing max/min on a pandas column comes at a memory cost. When we persist and then do max/min in dask, we pay that memory cost plus some additional overhead to move data between workers and copy some to the client.

Given that it takes 10s to do max/min four times on the whole pandas dataframe, and >>10s for either complete aggregation, I'd say ~4s for the dask aggregations is very reasonable indeed.

I'm saying that instead of trying to tweak dask's memory usage for this specific data size and physically available memory, we should first find the easy win, either by assuming a policy around NaNs or by using custom compiled code, something like the numba function above. Combining both max/min runs into the same function, I get ~500ms runtime, instead of 10s for pure pandas. You won't get such a speedup from dask.

@gbrener
Contributor Author

gbrener commented May 2, 2017

@martindurant it sounds like we're in agreement that the additional overhead after client.persist() is likely due to dask moving data between the workers and the client. Do you have any advice for when client.persist() should be used with the distributed scheduler? Perhaps a use-case where it outperforms the distributed scheduler on its own?

@martindurant

The extra time is due to running out of memory when forcing all data to be held in processes, and then having some additional memory usage due to copying of intermediates and aggregate products.
If your data fits into memory as a pandas dataframe, it's always going to be hard to beat its optimized aggregations like count and max, which essentially work as fast as you can push data through the memory bus (caveat the NaN handling, above).

Some simple guidelines for dask:

  • dask always comes with some overhead, and simple in-memory operations may well be faster without it
  • use when you have more data than fits in memory; without persist, it will be loaded in chunks for each computation
  • use when you have computations that are CPU-intensive and can be expressed in parallel algorithms, and especially when they don't require a lot of temporary working memory
  • use on a cluster to be able to use more cores and memory than you reasonably could on one machine
  • for IO-intensive operations, parallelism may well give the full multiplier until the connection is saturated, at which point adding more workers only adds overhead.

Use persist when:

  • you intend to reuse some data for multiple calculations, but obtaining that data is expensive. This is typical for, say, a dataframe which requires multiple operations to get it into canonical form, but then has multiple queries to run on it (see the sketch after this list).
  • There must be enough memory for each of the subsequent queries for this to be efficient; if there is plenty of memory, the data could also be replicated between workers. How much impact this makes depends on the specifics of the job.
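
An illustrative sketch of that pattern (placeholder path and column names; expensive preparation done once, then several queries against the persisted result):

import dask.dataframe as dd
from dask import distributed

client = distributed.Client()            # local cluster by default
ddf = dd.read_parquet('data.parq')       # lazy: nothing is loaded yet
clean = ddf[ddf.value >= 0].dropna()     # expensive canonicalisation, still lazy

clean = client.persist(clean)            # materialise once across the workers
distributed.wait(clean)

# Subsequent queries reuse the in-memory partitions instead of re-reading and re-cleaning.
total = clean.value.sum().compute()
counts = clean.groupby('category').size().compute()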

@jbednar
Member

jbednar commented May 2, 2017

@martindurant, thanks for all that! I've just discussed some of this briefly with Greg, and I think there are several issues raised here he'll be splitting out separately.

For what it's worth, we do not need any special handling for NaNs; the original point data that we are dealing with should never be NaN in the first place, so we can simply assert that there are no NaNs and use whatever function is fastest with no NaNs.

That said, I'm still not getting why there is a non-trivial memory cost to doing max/min on a pandas column, so there is clearly something I'm missing. Why would computing max involve anything other than scanning all the values and keeping the largest? Even if it's threaded, it seems like it shouldn't need more than the number of threads' worth of data (e.g. keeping four partial maxes and comparing them later), so why would the memory required be in gigabytes?

@gbrener
Contributor Author

gbrener commented May 2, 2017

@martindurant , thank you for the clarifications. Can you please provide a working example showing client.persist() used with the distributed scheduler on a single machine, outperforming the distributed scheduler alone? Once I have that I can close this issue.

@martindurant

"Why would computing max involve anything other than scanning all the values and keeping the largest?" - honestly, I don't know what pandas can be doing during this time. From experience, pandas copies data at the most innocuous and inconvenient commands.
If you are certain there should be no nans, the min/max of the underlying numpy array seems favourite; @mrocklin , can you extract a dask array like df[column].values and expect min/max to work efficiently (without copy)? The other option is to write the numba function, which can also handle nans and do min/max in one shot, but the dask code will require a little modification for the combination of the partial results, similar to the code for the main aggregation.

btw: one obvious way to avoid the cost of calculating the bounds is to provide them up-front. Then the benchmark is of something different, of course, but it would seem reasonable in the proposed situation where the bounds are the same for both aggs.
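
A sketch of that (bounds computed once up-front and passed to the Canvas via its x_range/y_range arguments, using the same file as the pandas script above):

import fastparquet
import datashader as ds

df = fastparquet.ParquetFile('census.parquet').to_pandas()

# Compute the extents once, outside the timed aggregations.
x_range = (df.meterswest.min(), df.meterswest.max())
y_range = (df.metersnorth.min(), df.metersnorth.max())

# With explicit ranges, neither aggregation needs to rescan the data for its bounds.
cvs = ds.Canvas(5, 5, x_range=x_range, y_range=y_range)
agg1 = cvs.points(df, 'meterswest', 'metersnorth')
cvs = ds.Canvas(900, int(900*7.0/12), x_range=x_range, y_range=y_range)
agg2 = cvs.points(df, 'meterswest', 'metersnorth')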

@martindurant

Also, I think we probably have the answer for why cachey was doing so much better: it was caching not just the dataset itself, but the max/min values, for instant recall.

@mrocklin

mrocklin commented May 2, 2017

@mrocklin , can you extract a dask array like df[column].values and expect min/max to work efficiently (without copy)?

I think so. We're just calling s.values under the hood. Normal caveats for per-task overheads for Dask apply.
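
For illustration, a sketch of that route (dask_df as defined earlier; .values gives a dask array backed by the same partitions):

import dask

x = dask_df['meterswest'].values             # dask.array view of the column
xmin, xmax = dask.compute(x.min(), x.max())  # shared graph, no pandas NaN-skipping overhead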

@jbednar
Member

jbednar commented May 2, 2017

Also, I think we probably have the answer for why cachey was doing so much better: it was caching not just the dataset itself, but the max/min values, for instant recall.

If that's true, then we can surely do something about that! But I'm not sure why that would have varied so much between different ways of loading the data, which was the original finding with cachey; surely the min and max would be preserved regardless of what else might have been cached.

@martindurant

martindurant commented May 2, 2017

Care to try this one? https://gist.github.com/martindurant/b23dcb8eb205e769707c1f58c46d13e0
I get the best timings yet:

Persisted 9.6
Agg1 1.9807648658752441
Agg2 0.7665679454803467

...of course this isn't very flexible, but I thought you might appreciate it, @gbrener. This might give you hints. (btw: not sure I have my x and y axes the right way around, but you know what I mean)

(add nogil to the numba decorator, and it's equally fast with threads instead of processes)

@gbrener
Contributor Author

gbrener commented May 6, 2017

Thanks for the example @martindurant. I closed #336 (the min/max NaN-checking issue) after adding some speed improvements to the bounds-calculation code in datashader (#344), including @memoize and np.nanmin/np.nanmax (for the dask dataframe side) and using numba to do only one pass through the data (for the pandas dataframe side). I had difficulties combining numba and dask for this area of the code, so that optimization is only present for pandas, but if someone else wants to take a stab at it, please be my guest :)

@martindurant

Yes, to use numba on the dask side, you would need to write code to combine the results for each partition. This is exactly what already happens for the actual aggregation, so similar code is there somewhere.
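
A hypothetical sketch of that combination step (nan_min_max is the numba function from earlier in the thread, ddf a dask dataframe; one delayed call per partition, then a trivial reduction on the client):

import dask

@dask.delayed
def partition_bounds(pdf):
    ma, mi = nan_min_max(pdf['meterswest'].values)  # numba kernel defined above
    return mi, ma

partials = dask.compute(*[partition_bounds(part) for part in ddf.to_delayed()])
xmin = min(lo for lo, hi in partials)
xmax = max(hi for lo, hi in partials)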
