Parallelization for embarrassingly parallel tasks #5751
Maybe this would be something better done in a ...
For Windows vs. Linux/OSX threading/multiprocessing, that is.
I think this could start with an optional keyword / option to enable parallel computation where useful. This solves the Windows/Linux issues because the default for doing parallel work can be different (or have different thresholds, like we do for numexpr).
They could all be grouped under a ...
@michaelaye can you put up a simple example that would be nice for benchmarking?
Here's an implementation using joblib: https://github.com/jreback/pandas/tree/parallel, and some timings. I had to use a pretty contrived function, actually:
pickle time outweighs the perf gains; the function is too quick, so there is no benefit here.
So you need a sufficiently slow function on a single column to make this worthwhile.
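For anyone who wants to reproduce this kind of timing, here is a minimal sketch of the per-column joblib pattern being discussed (the slow function, column count, and sizes are made up for illustration; this is not the API in the branch above):

```python
import numpy as np
import pandas as pd
from joblib import Parallel, delayed

def slow_func(col):
    # stand-in for an expensive per-column computation
    return col.rolling(100).apply(lambda x: np.percentile(x, 75))

df = pd.DataFrame(np.random.randn(50000, 8), columns=list("abcdefgh"))

# serial baseline: one column at a time
serial = df.apply(slow_func)

# parallel: one joblib task per column; each column is pickled to a worker
# process, which is exactly the overhead mentioned above
parts = Parallel(n_jobs=4)(delayed(slow_func)(df[c]) for c in df.columns)
parallel = pd.concat(parts, axis=1)
```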
Good write-up Jeff. I think pickle time is a big factor, but also the time to spawn a new process. I would envision this working with a set of compute processes that are pre-launched on startup and wait to do work. For my future parallel work I will probably use IPython.parallel across distributed HDF5 data. The problem that I often run into is slowly growing memory consumption for Python processes that live for too long. On-disk parallel access of HDF5 row chunks to speed up computation sounds great.
Yep... I don't think adding a ... HDF5 and groupby apply look like especially nice cases for enhancement with this. Please play around and give me feedback on the API (and even take a stab at a backend!)
A Cython engine is also straightforward (though it needs a slight change in the setup to compile with OpenMP support): http://docs.cython.org/src/userguide/parallelism.html
I've loaded a DataFrame with roughly 170M rows in memory (Python used ~35 GB RAM) and timed the same operation with three methods, running it overnight. The machine has 32 physical / 64 logical (hyperthreaded) cores and enough free RAM. While date conversion is a very cheap operation, it shows the overhead of these methods. The single-threaded way is the fastest, but it's quite boring to see a single core continuously running at 100% while 63 are idling. Ideally I want some kind of batching for parallel operations to reduce the overhead, e.g. always 100000 rows, or something like batchsize=100000.

```python
from datetime import datetime
from multiprocessing import Pool
from IPython.parallel import Client, interactive

@interactive
def to_date(strdate):
    return datetime.fromtimestamp(int(strdate) / 1000)

# single-threaded map
%time res['milisecondsdtnormal'] = res['miliseconds'].map(to_date)
# CPU times: user 14min 52s, sys: 2h 1min 30s, total: 2h 16min 22s
# Wall time: 2h 17min 5s

# multiprocessing.Pool
pool = Pool(processes=64)
%time res['milisecondsdtpool'] = pool.map(to_date, res['miliseconds'])
# CPU times: user 21min 37s, sys: 2min 30s, total: 24min 8s
# Wall time: 5h 40min 50s

# IPython.parallel
rc = Client()  # local 64 engines
rc[:].execute("from datetime import datetime")
%time res['milisecondsipython'] = rc[:].map_sync(to_date, res['miliseconds'])
# CPU times: user 5h 27min 4s, sys: 1h 23min 50s, total: 6h 50min 54s
# Wall time: 10h 56min 18s
```
It's not at all clear what you are timing here; the way Pool and IPython split this is exceedingly poor; they turn this type of task into a lot of scalar evaluations, where the cost of transport is MUCH higher than the evaluation time. The PR does exactly this type of batching: you need to have each processor execute a slice and work on it in a single task (for each processor), not distribute the pool like you are showing.
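To make the batching point concrete, here is a rough sketch (sizes and the conversion function mirror the example above but are otherwise arbitrary) of splitting the column into one slice per worker instead of shipping individual scalars:

```python
import numpy as np
import pandas as pd
from datetime import datetime
from multiprocessing import Pool, cpu_count

def convert_chunk(values):
    # each task converts a whole slice, so pickle/IPC cost is paid once per
    # chunk rather than once per element
    return [datetime.fromtimestamp(int(v) / 1000) for v in values]

if __name__ == "__main__":
    ms = pd.Series(np.random.randint(1_300_000_000_000, 1_400_000_000_000,
                                     size=1_000_000, dtype=np.int64))
    chunks = np.array_split(ms.values, cpu_count())   # one batch per worker
    with Pool() as pool:
        parts = pool.map(convert_chunk, chunks)
    out = pd.Series([d for part in parts for d in part], index=ms.index)
```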
@michaelaye did you have a look at this branch? https://github.com/jreback/pandas/tree/parallel
Oh, this is exciting. I've been waiting for a parallel scatter/gather apply function using IPython.parallel. Please keep us up to date on any progress here.
Indeed! It would be a great feature. I have been using concurrent.futures and that makes things pretty easy, however the cost of spooling up new processes still takes up a bunch of time. If we have IPython parallel kernels just waiting to do work with all the proper imports, passing data pointers to them and aggregating results would be fantastic.
@dragoljub you have hit the nail on the head. Do you have some code that I could hijack? I don't think it would be very hard to add this using ...
It may be overkill, but I have a notebook on using IPython.parallel here, with some quick examples: https://github.com/jseabold/zorro. Their docs are also quite good.
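A minimal sketch of the pre-launched-engines idea using IPython.parallel (now the ipyparallel package); it assumes a cluster was already started, e.g. with `ipcluster start -n 8`, and the work function is just a placeholder:

```python
from IPython.parallel import Client   # `from ipyparallel import Client` in newer releases
import numpy as np
import pandas as pd

rc = Client()
dview = rc[:]                          # DirectView over all running engines
dview.execute("import pandas as pd")   # do imports on the engines once, up front

def work(chunk):
    # placeholder for a non-trivial per-chunk computation
    return chunk.sum()

df = pd.DataFrame(np.random.randn(1_000_000, 4), columns=list("abcd"))
chunks = np.array_split(df, len(rc.ids))        # one DataFrame slice per engine
parts = dview.map_sync(work, chunks)            # ship, compute, gather
totals = pd.concat(parts, axis=1).sum(axis=1)   # combine the per-chunk results
```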
Thanks Skipper... OK, next thing: do you guys have some non-trivial examples for vbenching purposes? E.g. stuff that does actual work (and takes a non-trivial amount of time) that we can use for benchmarking? (Needs to be relatively self-contained... though obviously could use, say, statsmodels :)
I happen to be running one such problem right now. :) I'm skipping apply in favor of joblib.Parallel map. Let me see if I can make it self-contained.
Hmm, maybe it is too trivial. My actual use case takes much longer (20 obs ~ 1 s) and the data is quite big. Find the first occurrence of a word in some text. You can scale up ...
There are maybe some better examples in ...
Where the DataFrame contains rows of random starting values and you iterate over the zero axis to do poor man's global optimization.
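In case it helps, a rough self-contained stand-in for the word-search benchmark described above (word list, document length, and row count are arbitrary; scale them up until a single row takes the target fraction of a second):

```python
import numpy as np
import pandas as pd

words = np.array(["alpha", "beta", "gamma", "delta", "epsilon"])
rng = np.random.default_rng(0)

def make_doc(n_words=5000):
    # one long random "document" per row
    return " ".join(rng.choice(words, n_words))

df = pd.DataFrame({"text": [make_doc() for _ in range(200)]})

def first_occurrence(text, word="delta"):
    # deliberately slow, pure-Python scan over the tokens
    for i, tok in enumerate(text.split()):
        if tok == word:
            return i
    return -1

# the row-wise apply that a parallel backend would need to beat
result = df["text"].apply(first_occurrence)
```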
Some API inspiration: see aaply, adply, etc. (from Hadley's plyr).
I forgot that Hadley kindly re-licensed all of his code under a BSD-compatible license, so you can take more than API inspiration if for some reason you're curious.
Actually, the API really is very easy:
e.g. you just pass an engine (probably will allow you to simply pass a Client directly as an engine).
Great. You might also allow just 'ipython' and use a default ...
... passing, and will only pass with a threshold number of rows (could have a function for that too).
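To make the proposed shape concrete, here is a purely hypothetical user-level shim with an engine keyword and a row threshold (the names and dispatch are made up; the actual branch may look quite different):

```python
import pandas as pd
from joblib import Parallel, delayed

def parallel_apply(df, func, engine=None, n_jobs=-1, threshold=10000):
    # `func` is assumed to map one column (a Series) to a Series
    if engine is None or len(df) < threshold:
        # below the row threshold the parallel overhead is not worth paying
        return df.apply(func)
    if engine == "joblib":
        parts = Parallel(n_jobs=n_jobs)(delayed(func)(df[c]) for c in df.columns)
        return pd.concat(parts, axis=1)
    raise ValueError("unknown engine: %r" % (engine,))
```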
Sounds awesome. Can't wait for this. Going to be a big feature.
What is the status of this? It seems awesome. Do you just need some functions for benchmarks? I can come up with something if that's helpful / all that's needed. How long should a target function take (per row, say; that's what I always apply on)? 0.1 s? 1 s? 10 s? RAM limitations?
Well, it works for joblib, and sort of with IPython.parallel; it needs some more work/time. I am also convinced that you need a pretty intensive task for this to be really useful, e.g. the creation/comm time is non-trivial. I won't have time for this for a while, but my code is out there.
Agree with the comments above:
That's dead on. Also, I very much like the idea of an engine= argument option. This would be a huge benefit for most end users, especially those using pandas as a core dependency in their own applications: immediate parallelism across their pandas operations. Excited about this, and just amazing, all the work here. Kudos to you all.
Is it possible to integrate pandas with the @parallel decorator in ipyparallel, like the example they have with numpy? http://ipyparallel.readthedocs.org/en/latest/multiengine.html#remote-function-decorators I think, theoretically speaking, even if pandas does not support parallel computing by default, a user can still refer to mpi4py for parallel computation. It's just some more coding time if one already knows MPI.
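Along the lines of that question, a sketch of driving pandas chunks through the ipyparallel @parallel decorator (assumes a running cluster, e.g. `ipcluster start -n 4`; this is ipyparallel splitting the work, not pandas-native parallelism):

```python
import numpy as np
import pandas as pd
from ipyparallel import Client

rc = Client()
dview = rc[:]

@dview.parallel(block=True)
def row_sums(chunk):
    # each call receives one DataFrame slice
    return chunk.sum(axis=1)

df = pd.DataFrame(np.random.randn(100_000, 4), columns=list("abcd"))
chunks = np.array_split(df, len(rc.ids))      # one slice per engine
result = pd.concat(row_sums.map(chunks))      # distributed map over the slices
```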
@Aspire1Inspire2 check out dask dataframe: https://jakevdp.github.io/blog/2015/08/14/out-of-core-dataframes-in-python/
Let's just doc this to direct to dask.
I tried out groupby and apply with pandas 0.17.1, and was surprised to see the function applied in parallel. I am confused; is this feature already added and enabled by default? I am not using dask.
@heroxbd well, the GIL IS released during groupby operations. However, there is no parallelism natively / inherent in groupby. Why do you think it's in parallel?
The evidence is that all of my CPU loads go to 100% when I call groupby apply.
So if you do arithmetic ops inside ... These are performance things that pandas does without the user being specifically aware.
@jreback Ah-ha. Here it is: |
Sorry for the noise. The parallel execution comes from OpenMP used by OpenBLAS, which in turn is used by NumPy.
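For anyone who wants to confirm that diagnosis, the BLAS/OpenMP thread pools can be capped with environment variables set before NumPy is imported; CPU usage should then drop back to a single core:

```python
import os
# must be set before numpy (and hence the BLAS library) is loaded
os.environ["OMP_NUM_THREADS"] = "1"        # OpenMP threads (used by OpenBLAS)
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"        # relevant only for MKL-linked builds

import numpy as np
import pandas as pd
```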
Closing this as ...
Was there a reason, when releasing the GIL in pandas groupby operations, to only allow separate groupby and apply operations to happen concurrently, rather than computing independent group-level aggregations in parallel?
Once you have GIL-releasing groupby operations then other developers can use Pandas in parallel. It's actually quite tricky to intelligently write down the algorithms to handle all groupbys. I think that if someone wants to do this for Pandas that'd be great. It's a pretty serious undertaking though. To do this with dask.dataframe:

```python
import pandas as pd
df = ...  # your pandas code

import dask.dataframe as dd
df = dd.from_pandas(df, npartitions=20)

dd_result = df.groupby(df.A).B.mean()  # or whatever
pd_result = dd_result.compute()        # uses as many threads as you have logical cores
```

Of course, this doesn't work on all groupbys (as mentioned before, this is incredibly difficult to do generally), but it does work in many common cases.
@mrocklin thanks for the tip. How long would ...
There is a single copy involved (just to other pandas dataframes). We're effectively just doing ... Generally the ...
In particular, YMMV depending on such things as the number of groups that you are dealing with and how you partition.
Small number of groups: ...
Larger number of groups: ...
Can do even better if we actually use our index: ...
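A rough sketch of the "use the index" variant (column names, sizes, and npartitions are arbitrary): if the grouping key is a sorted index, dask knows which partition holds which groups and can avoid shuffling:

```python
import numpy as np
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({"A": np.random.randint(0, 1000, 1_000_000),
                   "B": np.random.randn(1_000_000)})

# sort on the grouping key and make it the index so partition divisions are known
ddf = dd.from_pandas(df.set_index("A").sort_index(), npartitions=20)
result = ddf.groupby(ddf.index).B.mean().compute()
```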
Note that these are pretty naive timings. This is a single computation that is split into embarrassingly parallel tasks. Generally you would use dask for multiple steps in a computation. If your data doesn't fit in memory, then dask can often help a lot more.
In an embarrassingly parallel calculation, I create many dataframes which must be dumped to disk (this is the desired output of the program). I tried doing the computation and the dumping (to HDF5) in parallel using joblib, and I ran into HDF-write trouble. Note that at this point I do not worry so much about the performance of the parallel write. An example which demonstrates the problem is in https://github.com/rbiswas4/ParallelHDFWriteProblem. What I find is that the serial case always works, but the parallel case is not reproducible: sometimes it works without a problem and sometimes it crashes, as the two log files in that repo show (one of them: https://github.com/rbiswas4/ParallelHDFWriteProblem/blob/master/demo.log.worked). Is there a better way to write to HDF files in parallel from pandas that I should use? Is this a question for other fora like joblib and PyTables?
@rbiswas4 If you want to dump a bunch of data to disk in parallel, the easiest thing to do is to create a separate HDF5 file for each process. Your approach is certainly going to result in corrupted data -- see the PyTables FAQ for more details (http://www.pytables.org/FAQ.html#can-pytables-be-used-in-concurrent-access-scenarios). You might also be interested in dask.
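A minimal sketch of the file-per-task approach described above (file names and the compute step are made up):

```python
import numpy as np
import pandas as pd
from joblib import Parallel, delayed

def compute_and_dump(i):
    # each task writes to its own file, so no two processes ever touch
    # the same HDF5 file
    df = pd.DataFrame(np.random.randn(1000, 4), columns=list("abcd"))
    df.to_hdf("out_%04d.h5" % i, key="data", mode="w")

Parallel(n_jobs=4)(delayed(compute_and_dump)(i) for i in range(16))
```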
Writing to separate files is something I have to avoid because I might end up creating too many files (inode limits). I suppose this means one of the following:
It seems the first is the best bet. And yes, I intend to see whether I should use dask. Thanks @shoyer
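If many small files are really off the table, one common pattern (a hypothetical sketch, not necessarily the option referred to above) is to parallelize only the computation and funnel every write through a single process:

```python
import numpy as np
import pandas as pd
from multiprocessing import Pool

def compute(i):
    # workers only build DataFrames; they never touch the HDF5 file
    return pd.DataFrame(np.random.randn(1000, 4), columns=list("abcd"))

if __name__ == "__main__":
    with Pool(processes=4) as pool, pd.HDFStore("out.h5", mode="w") as store:
        for df in pool.imap(compute, range(16)):
            store.append("data", df)   # all writes happen serially, here
```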
@rbiswas4, if you can write out raw HDF5 (via h5py) instead of PyTables, please have a look at SWMR, https://www.hdfgroup.org/HDF5/docNewFeatures/NewFeaturesSwmrDocs.html, available in HDF5 1.10.
I would like to promote the idea of applying multiprocessing.Pool() execution to embarrassingly parallel tasks, e.g. the application of a function to a large number of columns.
Obviously, there's some overhead to set it up, so there will be a lower limit on the number of columns above which this can be faster than the already fast Cython approach. I will be adding my performance tests to this issue later.
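A rough sketch of the proposal (the per-column function and the sizes are placeholders): one Pool task per column, with the single-process apply kept for comparison:

```python
import numpy as np
import pandas as pd
from multiprocessing import Pool, cpu_count

def per_column(col):
    # some non-trivial elementwise work on a single column
    return col.apply(lambda x: np.sqrt(abs(x)) + np.log1p(abs(x)))

if __name__ == "__main__":
    df = pd.DataFrame(np.random.randn(200_000, 50))

    with Pool(processes=cpu_count()) as pool:
        parts = pool.map(per_column, [df[c] for c in df.columns])
    result = pd.concat(parts, axis=1)

    baseline = df.apply(per_column)   # single-process equivalent
```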
I was emailing @jreback about this and he added the following remarks/complications:
links:
http://stackoverflow.com/questions/13065172/multiprocess-python-numpy-code-for-processing-data-faster
http://docs.cython.org/src/userguide/parallelism.html
http://distarray.readthedocs.org/en/v0.5.0/