Struggling with dask delayed to parallelize tidal analysis #194

Closed
rsignell-usgs opened this issue Apr 3, 2018 · 31 comments

rsignell-usgs (Member) commented Apr 3, 2018

I told @rabernat I'd post this on SO, but the simplified test I made to isolate the problem for SO ended up working, so I'm posting here in case the pangeo env or workflow has something to do with the issue.

I'm trying to use dask delayed to do an embarrassingly parallel problem: apply a time series function (tidal analysis) to each (lat, lon) location in a 3D (time, lat, lon) data cube.

This simple example worked fine:

import dask.array as da
from dask import delayed
import numpy as np
from dask.distributed import Client, LocalCluster

def mysin(x):
    return np.sin(x)

dsin = delayed(mysin)
z = np.random.random((50,100,100))

cluster = LocalCluster(n_workers=3,ncores=3)
client = Client(cluster)

nt, n, m = z.shape
zr = [(dsin(z[:,j,i])) for j in range(n) for i in range(m)]
total = delayed(zr).compute()

but my tide notebook with tidal analysis on model output loaded from Zarr looks good up to the line:

%time total = delayed(coefs).compute()

where the kernel dies.

The logs look good just before the kernel dies, but then I can't access the logs since the kernel has died. 😢

Does anyone see anything obviously wrong with my tide notebook?

ah- commented Apr 3, 2018

Haven't looked in detail, but it seems like you might have a huge number of delayed tasks?

If so, it will probably run much better with fewer, coarser tasks. Or you might be able to use dask array/xarray to do the chunking for you?

rsignell-usgs (Member Author) commented Apr 3, 2018

@ah-, yes, there are about 125K delayed tasks. But I'm not sure how to make the tasks coarser. The tidal analysis function can't take larger chunks, like multiple time series at once, so I need to run it at each (lon, lat) location. And there are lots (125K) of (lon, lat) locations.

rsignell-usgs (Member Author) commented Apr 4, 2018

So if I subsample the grid with nsub=4 instead of nsub=2 (35K delayed tasks), the kernel doesn't die, and the notebook works, producing the map of M2 amplitude below.

  • What controls whether the kernel will die (e.g. how many delayed tasks is "huge")?
  • What would I need to do differently with my workflow to get Dask to work with nsub=2?

[screenshot: map of M2 tidal amplitude]

pbranson (Member) commented Apr 4, 2018 via email

mrocklin (Member) commented Apr 4, 2018

Each task takes up about 10 kB of memory and about a millisecond of overhead:

In [1]: from dask.distributed import Client, wait

In [2]: from distributed.utils import format_bytes

In [3]: import psutil

In [4]: client = Client()

In [5]: start = psutil.Process().memory_info().rss

In [6]: format_bytes(start)
Out[6]: '104.30 MB'

In [7]: def inc(x):
   ...:     return x + 1
   ...: 

In [8]: %time futures = client.map(inc, range(100000))
CPU times: user 3.41 s, sys: 72.8 ms, total: 3.48 s
Wall time: 3.49 s

In [9]: %time _ = wait(futures)
CPU times: user 32.4 s, sys: 235 ms, total: 32.6 s
Wall time: 33.1 s

In [10]: end = psutil.Process().memory_info().rss

In [11]: format_bytes(end)
Out[11]: '1.07 GB'

In [12]: format_bytes((end - start) / len(futures))
Out[12]: '9.64 kB'

If your tasks deal with times that are shorter than this then you might consider bundling several such operations within a single task, perhaps using a for loop within your function.

Typically I watch the diagnostic dashboard and, if I see a lot of white space in the task stream plot (the central plot on the status page) and only sporadic thin vertical bars, then that is a sign that my system is spending most of its time in scheduling overhead (the 1 ms cost) rather than actual computation. See the dashboard video at this time
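As a rough sketch of that bundling idea (the analyze_batch helper, the block size, and the np.sin stand-in below are illustrative, not taken from the notebook), one task now covers a block of (lat, lon) points instead of a single point:

import numpy as np
from dask import delayed

def analyze_batch(block):
    # run the per-point analysis on every (lat, lon) column of this block and
    # return a list of results; np.sin(...).sum() stands in for the real analysis
    nt, nj, ni = block.shape
    return [np.sin(block[:, j, i]).sum() for j in range(nj) for i in range(ni)]

z = np.random.random((50, 100, 100))                     # (time, lat, lon) cube
blocks = [z[:, j:j + 10, :] for j in range(0, 100, 10)]  # 1,000 points per block

tasks = [delayed(analyze_batch)(b) for b in blocks]      # 10 tasks instead of 10,000
results = delayed(tasks).compute()

With 10 tasks of 1,000 points each, the per-task scheduler overhead becomes negligible relative to the actual computation.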

rsignell-usgs (Member Author) commented:

@mrocklin, I checked out the dashboard, and it seems that dask with 35k delayed tasks is actually working very nicely (not too much white space):

[screenshot: Dask dashboard task stream with 35K tasks]

So I then ran the tidal analysis in serial mode to see how much slower it was compared to my Dask workflow, and it took 1 hour 15 min instead of 1 min 52 s. That's a speedup of 40, so essentially perfect linear speedup with the 40 CPUs in my Dask cluster:

[screenshot: serial vs. Dask timing comparison]

So I'm pretty happy with the workflow. I just wish it worked with 100K or 500K tasks as well.

mrocklin (Member) commented Apr 4, 2018 via email

mrocklin (Member) commented Apr 4, 2018 via email

rsignell-usgs (Member Author) commented Apr 4, 2018

@mrocklin, it looks like the kernel dies when the memory on the "system" tab hits about 3.7 GB:
[screenshot: Dask dashboard system tab showing memory near 3.7 GB]

It looks like the myworker.yml config I'm using has a 6GB limit:

jovyan@jupyter-rsignell-2dusgs:~$ more myworker.yml
metadata:
spec:
  restartPolicy: Never
  containers:
  - args:
      - dask-worker
      - --nthreads
      - '2'
      - --no-bokeh
      - --memory-limit
      - 6GB
      - --death-timeout
      - '60'
    image: pangeo/worker:2018-03-28
    name: dask-worker
    securityContext:
      capabilities:
        add: [SYS_ADMIN]
      privileged: true
    env:
      - name: GCSFUSE_BUCKET
        value: pangeo-data
      - name: EXTRA_CONDA_PACKAGES
        value: utide -c conda-forge
    resources:
      limits:
        cpu: "1.75"
        memory: 6G
      requests:
        cpu: "1.75"
        memory: 6G

Does this info point to memory being the reason the kernel dies?
If so, can I just increase the limits in myworker.yml?

mrocklin (Member) commented Apr 4, 2018 via email

stale bot commented Jun 25, 2018

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale bot added the stale label Jun 25, 2018
stale bot commented Jul 2, 2018

This issue has been automatically closed because it had not seen recent activity. The issue can always be reopened at a later date.

stale bot closed this as completed Jul 2, 2018

rsignell-usgs (Member Author) commented Jan 7, 2020

I'm revisiting this topic to give a summary of how I eventually addressed this issue. I was able to make it work by splitting the large list of 500,000 dask tasks into 30,000-task chunks and letting Dask work through the chunks one at a time. This way Dask stays busy but we don't run out of scheduler memory.
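A minimal sketch of that chunked-submission pattern (the toy analyze function, the random input series, and the task counts below are stand-ins for the real list of delayed tidal-analysis calls):

import numpy as np
from dask import delayed, compute

# stand-in for the full list of delayed tidal-analysis calls ("coefs" in the notebook)
analyze = delayed(lambda ts: ts.mean())
coefs = [analyze(np.random.random(50)) for _ in range(100_000)]

chunk_size = 30_000
results = []
for start in range(0, len(coefs), chunk_size):
    # hand the scheduler one manageable batch at a time so it never has to
    # hold the entire task list at once
    results.extend(compute(*coefs[start:start + chunk_size]))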

Each chunk takes about 3.25 minutes to run, with 2 minutes taken by the scheduler and then 1.5 minutes by the workers (with 120 CPUs) to do the tasks. The tasks each take about 300 ms to run. The Dask documentation says:

The scheduler adds about one millisecond of overhead per task or Future object. While this may sound fast it’s quite slow if you run a billion tasks. If your functions run faster than 100ms or so then you might not see any speedup from using distributed computing.

30,000 tasks taking 2 minutes in the scheduler works out to 4 milliseconds per task. Obviously it's far from optimal to be spending 60% of our time in the scheduler, and it would be great to be able to follow this advice from the Dask documentation to create fewer, longer-running tasks:

A common solution is to batch your input into larger chunks.

Each task, however, is a call to a tidal analysis program with a single time series as input, so it would seem that we would need to modify the code for this program to accept multiple time series if we are to see any additional benefit.

Does this seem like an accurate assessment?

rsignell-usgs reopened this Jan 7, 2020
stale bot removed the stale label Jan 7, 2020
dcherian (Contributor) commented Jan 7, 2020

(500,000) list of dask tasks

It looks like you are effectively chunking your dataset with each chunk being size 1 in lat and lon.

If so, you could use apply_ufunc(vectorize=True) to process larger chunks. That will still be slow but you could use numba.guvectorize instead of vectorize=True to (maybe) go faster. This example notebook may help: pydata/xarray#3629 (comments on the notebook are welcome)
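For reference, a minimal sketch of what that could look like, assuming a (time, lat, lon) data cube; the m2_amplitude placeholder and variable names stand in for the real utide-based function and the notebook's data:

import numpy as np
import xarray as xr

def m2_amplitude(ts):
    # placeholder 1-D time-series function; the real version would call utide
    return np.abs(np.fft.rfft(ts)).max()

ds = xr.Dataset({"ssh": (("time", "lat", "lon"), np.random.random((50, 20, 30)))})

amp = xr.apply_ufunc(
    m2_amplitude,
    ds.ssh.chunk({"lat": 10, "lon": 10}),  # dask-backed, chunked in space only
    input_core_dims=[["time"]],            # the function consumes the time axis
    vectorize=True,                        # loop over every (lat, lon) point
    dask="parallelized",
    output_dtypes=[float],
)
result = amp.compute()                     # a (lat, lon) array of amplitudes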

guillaumeeb (Member) commented:
Have you seen https://examples.dask.org/applications/embarrassingly-parallel.html#Handling-very-large-simulation-with-Bags ?

I'm not sure this applies to your problem though, as your input data is more complex than in the example. Trying to launch all at once would probably overwhelm the scheduler as you experienced...
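A minimal sketch of that bag pattern, under the same embarrassingly-parallel assumption (analyze_point, the random time series, and the partition count are illustrative, not from the notebook):

import numpy as np
import dask.bag as db

def analyze_point(ts):
    # stand-in for the per-point tidal analysis (the utide call in the notebook)
    return float(np.mean(ts))

# one short time series per (lat, lon) point
timeseries = [np.random.random(50) for _ in range(10_000)]

bag = db.from_sequence(timeseries, npartitions=100)  # ~100 points per partition
results = bag.map(analyze_point).compute()           # uses the distributed cluster if a Client is active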

rsignell-usgs (Member Author) commented:
Thanks @dcherian and @guillaumeeb. I'm going to try recasting using that bag approach and I will report back. Also, just for reference, here is my current notebook solution.

mrocklin (Member) commented Jan 7, 2020 via email

rsignell-usgs (Member Author) commented Jan 8, 2020

@mrocklin , cool, I did not know about this capability. Here's the dask performance report you requested!

mrocklin (Member) commented Jan 8, 2020 via email

rsignell-usgs (Member Author) commented:
@mrocklin, I regenerated the dask performance report, and this time the scheduler and admin profiles are not blank!

mrocklin (Member) commented Jan 9, 2020

Thanks @rsignell-usgs! I guess what we're seeing there is that the scheduler and worker administrative threads just don't seem to be busy at all; they don't appear to be under load. You might add the following to your config to make sure that communication costs end up on the main thread, and see if that changes things qualitatively.

distributed:
  comm:
    offload: False

Looking at your task stream plot, I also notice that you're spending a lot of time in inv. If this is something like np.linalg.inv I would encourage you to look at using a solve function instead if that makes sense. They generally do the same work, but are faster and more stable.
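For illustration, the swap is usually a one-line change (a generic NumPy sketch, not the utide internals):

import numpy as np

A = np.random.random((500, 500))
b = np.random.random(500)

x_inv = np.linalg.inv(A) @ b     # forms the explicit inverse, then multiplies
x_solve = np.linalg.solve(A, b)  # factorizes A and solves A x = b directly

# the answers agree to round-off, but solve is faster and more numerically stable
assert np.allclose(x_inv, x_solve)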

rsignell-usgs (Member Author) commented:
Thanks @mrocklin, I'll check both of those things out. I'm pretty sure there is no reason to invert the matrix. So for the communications, just to make sure, I just add

distributed:
  comm:
    offload: False

to the end of my ~/.dask/config.yaml file?

rsignell-usgs (Member Author) commented Jan 10, 2020

And thanks @guillaumeeb for the tip on dask bag for this workflow. I can now run 20% faster and don't have to create ugly loops or wait for the giant list of delayed functions to get created! The bag approach is in cells [17-25] of this new notebook.

guillaumeeb (Member) commented Jan 11, 2020

Glad to hear that! Just one question though: why are you using 60,000 partitions? That's quite a lot.

mrocklin (Member) commented Jan 11, 2020 via email

rsignell-usgs (Member Author) commented:
@guillaumeeb, I tried 6,000 partitions instead of 60,000 partitions for my 500,000 tasks, which shortened my run time by only 4%, but it made my performance report 10 times smaller. Thanks for the idea! 😸

@mrocklin, adding those lines to the dask config indeed did the trick. Now I have filled-in admin tabs in my new smaller Dask performance report!

@dcherian , I did take a look at the ufunc approach you suggested, but the bag approach here seems more straightforward and likely to be comparable in performance. Do you agree?

mrocklin (Member) commented Jan 11, 2020 via email

rsignell-usgs (Member Author) commented:
I just tried running the same job on a different HPC system with the same number of cores (120), and instead of taking 2.5 minutes it takes 25 minutes. The CPUs are not the same, but what could explain an order of magnitude difference in performance for these 500 tasks?
Here is the slow performance report.

mrocklin (Member) commented Jan 14, 2020 via email

rsignell-usgs (Member Author) commented:
Understanding (and using) dask bag allowed me to solve this use case.

guziy commented Aug 24, 2021

I am doing detiding using ttide (I found it a bit faster than utide), so a similar problem. I have 1.5 million points.

I do chunking, i.e. n points (500) per task. Another thing that helped the workers start faster is the batch_size argument of client.map. But I still find that the jobs could start earlier; here is the gist of my workflow:

import logging
from pathlib import Path

import numpy as np
from dask.distributed import progress

logger = logging.getLogger(__name__)

# get_client, compute_surge_map_many, inputs_chunks and args are defined elsewhere in the script
with get_client(args) as client:

    # save the dashboard link to disk
    with Path("dashboard_link.html").open("w") as f:
        f.write(f'<html><body><a href="{client.dashboard_link}">Dashboard</a></body></html>')

    n_tasks = len(inputs_chunks)

    # launch execution
    logger.info("Submitting %d tasks", n_tasks)

    # aim for ~100 batches, i.e. a batch size of about 1% of the task count
    max_n_batches = max(int(0.01 * n_tasks), 1)
    batch_size = n_tasks // max_n_batches
    logger.info("batch_size for client.map: %d", batch_size)

    futures = client.map(compute_surge_map_many, inputs_chunks, batch_size=batch_size)
    progress(futures)

    chunk_results = client.gather(futures)

    # flatten the per-chunk result lists and stack them into one array
    vals = np.array(sum(chunk_results, start=[])).T

    logger.info("Client gathered results: type=%s", type(vals[0]))

Do you have any suggestions for a better way to calculate the batch size, or maybe other optimizations?

Currently it takes 20 minutes to process on 800 CPUs (1 thread per process), and the task stream is not as nicely filled as yours on the dashboard. It would be cool if I could write the results to one netCDF file directly from the workers, without gathering them on the client, but I read that this is not supported for netcdf4/hdf5...
