In-memory realization of dask-based objects can hang process when using multi-threaded dask config and high thread count #19

Open
grovduck opened this issue May 13, 2024 · 5 comments
Labels
bug Something isn't working

Comments

@grovduck
Member

grovduck commented May 13, 2024

When running example code with sknnr-spatial based on the suggested workflow from @aazuspan in #18, I'm encountering an issue where the Python process hangs. This only seems to occur with dask's multi-threaded (default) scheduler. The single-threaded and multi-process schedulers work as expected, as does a local cluster.

To reproduce the issue on my machine (with 36 threads available), I can run the following code:

from multiprocessing.pool import ThreadPool

import dask

from sknnr import GNNRegressor

from sknnr_spatial.datasets import load_swo_ecoplot
from sknnr_spatial import wrap

dask.config.set(pool=ThreadPool(36))

X_img, X, y = load_swo_ecoplot(as_dataset=True)
est = wrap(GNNRegressor(n_neighbors=7)).fit(X, y)
pred = est.predict(X_img)

# This line can trigger the hanging process
x = pred.compute()

I am running this from the project's default hatch configuration. It does not hang every time, but it hangs roughly once every 4-5 runs. When the process hangs, I can see that the Python process is still running, typically with high CPU usage but a static level of RAM (about 250MB). I can use "End Task" to kill the process.
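Because the hang is intermittent, it can help to measure how often it happens instead of re-running by hand. The following is a minimal sketch using only the standard library; the function name `count_hangs`, the script name `repro.py`, and the timeout values are illustrative assumptions, not part of sknnr-spatial. A timeout cannot distinguish a true hang from a very slow run, so the limit should sit well above the normal run time.

```python
import subprocess
import sys


def count_hangs(cmd, runs=15, timeout_s=120.0):
    """Run `cmd` repeatedly and count the runs that exceed `timeout_s`."""
    hangs = 0
    for _ in range(runs):
        try:
            # subprocess.run kills the child process when the timeout expires,
            # so a hung run does not have to be ended by hand.
            subprocess.run(cmd, timeout=timeout_s, check=False)
        except subprocess.TimeoutExpired:
            hangs += 1
    return hangs


# Example (repro.py is a hypothetical file holding the script above):
# n = count_hangs([sys.executable, "repro.py"], runs=15, timeout_s=120)
# print(f"{n} of 15 runs hung")
```

Running each attempt in a fresh interpreter keeps the runs independent, which matches how the script was being tested manually.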

Attempted Fixes

Working with @aazuspan, we have tried a number of things to resolve this issue.

Create xarray dataset from scratch rather than reading from disk

We thought that the issue might be related to the way that the xarray dataset was being created from disk. To test this, we created a synthetic xarray.Dataset from scratch.

from multiprocessing.pool import ThreadPool

import dask
import numpy as np
import xarray as xr

from sknnr import GNNRegressor
from sknnr.datasets import load_swo_ecoplot

from sknnr_spatial import wrap

dask.config.set(pool=ThreadPool(36))
X, y = load_swo_ecoplot(return_X_y=True, as_frame=True)
arrs = [np.random.normal(mn, sd, (128, 128)) for mn, sd in zip(X.mean(), X.std())]
X_img = xr.Dataset(
    data_vars={k: (["x", "y"], v) for k, v in zip(X.columns, arrs)},
    coords={"x": np.arange(128), "y": np.arange(128)},
)
est = wrap(GNNRegressor(n_neighbors=7)).fit(X, y)
pred = est.predict(X_img)
pred.compute()

The script did not hang for 15 repeated attempts.

Compute the X_img on its own

For this test, we read from disk, but we only test whether the X_img can be "computed" without hanging.

from multiprocessing.pool import ThreadPool
import dask

from sknnr_spatial.datasets import load_swo_ecoplot

dask.config.set(pool=ThreadPool(36))
X_img, X, y = load_swo_ecoplot(as_dataset=True)
X_img.compute()

The script did not hang for 15 repeated attempts.

Use an estimator other than GNNRegressor

We can use a different predictor to see if the issue is specific to the GNNRegressor. We tried using sknnr.EuclideanKNNRegressor.

from multiprocessing.pool import ThreadPool
import dask

from sknnr import EuclideanKNNRegressor

from sknnr_spatial.datasets import load_swo_ecoplot
from sknnr_spatial import wrap

dask.config.set(pool=ThreadPool(36))
X_img, X, y = load_swo_ecoplot(as_dataset=True)
est = wrap(EuclideanKNNRegressor(n_neighbors=7)).fit(X, y)
pred = est.predict(X_img)
pred.compute()

The script hung on the first attempt.

Disable file locks when using rioxarray.open_rasterio

Based on a rioxarray issue submitted by Tom Augspurger, I tried to set lock=False (per Tom's fix) when reading the raster data. This meant modifying datasets._base on line 48 to add the lock=False argument.

From:

    da = rioxarray.open_rasterio(path, chunks=chunks).rename(var_name).squeeze()

to:

    da = (
        rioxarray.open_rasterio(path, lock=False, chunks=chunks)
        .rename(var_name)
        .squeeze()
    )

This did not resolve the issue as the script hung on the first attempt.

Reduce the number of threads

On a whim, I reduced the number of threads used in calculation from 36 to 10 (chosen arbitrarily).

from multiprocessing.pool import ThreadPool

import dask

from sknnr import GNNRegressor

from sknnr_spatial.datasets import load_swo_ecoplot
from sknnr_spatial import wrap

dask.config.set(pool=ThreadPool(10))
X_img, X, y = load_swo_ecoplot(as_dataset=True)
est = wrap(GNNRegressor(n_neighbors=7)).fit(X, y)
pred = est.predict(X_img)
x = pred.compute()

When the number of threads is set to 10, the script does not hang for 15 repeated attempts.
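If capping the pool size turns out to be a useful stopgap, the cap can be computed rather than hard-coded. This is only a sketch: the helper names are hypothetical, and the default limit of 10 is simply the arbitrary value tried above, not a known safe threshold.

```python
import os
from multiprocessing.pool import ThreadPool


def capped_thread_count(limit=10, available=None):
    """Return the number of worker threads to use, never more than `limit`."""
    if available is None:
        # os.cpu_count() can return None when the count cannot be determined.
        available = os.cpu_count() or 1
    return max(1, min(limit, available))


def make_capped_pool(limit=10):
    """Build a ThreadPool whose size is capped at `limit`."""
    return ThreadPool(capped_thread_count(limit))


# Usage with dask (requires dask to be installed):
# import dask
# dask.config.set(pool=make_capped_pool(10))
```

On a 36-thread machine this yields a 10-thread pool, while smaller machines keep their full thread count.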

Introduce logging

In perhaps the most bizarre "fix" of all, I added logging to the script to see if I could get any information about where the script was hanging:

import logging
from multiprocessing.pool import ThreadPool
import dask

from sknnr import GNNRegressor

from sknnr_spatial.datasets import load_swo_ecoplot
from sknnr_spatial import wrap

logging.basicConfig(
    level=logging.DEBUG,
    format="[%(asctime)s - %(threadName)s - %(levelname)s] %(message)s",
)


dask.config.set(pool=ThreadPool(36))
X_img, X, y = load_swo_ecoplot(as_dataset=True)
est = wrap(GNNRegressor(n_neighbors=7)).fit(X, y)
pred = est.predict(X_img)
pred.compute()

For reasons I can't explain, when I added logging, the script did not hang for 15 repeated attempts. I can also tell that all 36 threads are being used based on the output of the logging, i.e.

[2024-05-13 11:47:26,247 - Thread-29 (worker) - DEBUG] Window: Window(col_off=64, row_off=64, width=64, height=64)
[2024-05-13 11:47:26,248 - Thread-20 (worker) - DEBUG] Window: Window(col_off=0, row_off=64, width=64, height=64)
[2024-05-13 11:47:26,249 - Thread-35 (worker) - DEBUG] IO window xoff=0.0 yoff=64.0 width=64.0 height=64.0
[2024-05-13 11:47:26,250 - Thread-34 (worker) - DEBUG] IO window xoff=64.0 yoff=64.0 width=64.0 height=64.0
[2024-05-13 11:47:26,252 - Thread-26 (worker) - DEBUG] IO window xoff=0.0 yoff=64.0 width=64.0 height=64.0
[2024-05-13 11:47:26,255 - Thread-25 (worker) - DEBUG] IO window xoff=64.0 yoff=64.0 width=64.0 height=64.0
[2024-05-13 11:47:26,258 - Thread-32 (worker) - DEBUG] IO window xoff=0.0 yoff=64.0 width=64.0 height=64.0
[2024-05-13 11:47:26,260 - Thread-33 (worker) - DEBUG] IO window xoff=0.0 yoff=64.0 width=64.0 height=64.0
[2024-05-13 11:47:26,262 - Thread-22 (worker) - DEBUG] IO window xoff=64.0 yoff=64.0 width=64.0 height=64.0
[2024-05-13 11:47:26,263 - Thread-1 (worker) - DEBUG] all_valid: False

This one may be a total red herring, though.
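Since adding logging seems to change the behavior, another way to see where the threads are stuck is the standard library's `faulthandler` module, which can write every thread's stack to a file after a timeout without adding any logging calls to the code path. This is a sketch under assumptions: the helper name, timeout, and output path are hypothetical, and it has not been tried against this particular hang.

```python
import faulthandler


def dump_stacks_if_stuck(work, timeout_s, log_path):
    """Run `work()`; if it is still running after `timeout_s` seconds, write
    the stack of every thread to `log_path` without stopping the process."""
    with open(log_path, "w") as log_file:
        faulthandler.dump_traceback_later(timeout_s, file=log_file, exit=False)
        try:
            return work()
        finally:
            # Disarm the watchdog so nothing is written after `work` returns.
            faulthandler.cancel_dump_traceback_later()


# Usage with the script above (the file name is arbitrary):
# dump_stacks_if_stuck(lambda: pred.compute(), 60, "hang_stacks.txt")
```

If the computation hangs, the file should show which call each worker thread is blocked in, which may help tell file-lock contention apart from a stall inside a numerical library.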

Status

Unfortunately, this issue appears to only occur on processes with a high number of threads. @aazuspan is not able to reproduce this issue in any configuration, which makes this problematic to test.

Workarounds

Luckily, using any of these dask-based schedulers appears to work correctly:

  • Single-threaded schedulers, i.e. dask.config.set(scheduler="single-threaded")
  • Multi-process schedulers, i.e. dask.config.set(scheduler="processes")
  • Using a local cluster, i.e.
    from dask.distributed import Client
    client = Client()
    
@grovduck grovduck added the bug Something isn't working label May 13, 2024
@aazuspan
Contributor

Thanks for the detailed write-up! Just to broadly summarize, it seems like the current takeaways are (please correct me if I'm wrong):

  • Threads are definitely related as the issue never occurs with <=10 threads or with non-thread schedulers.
  • File access is probably related as the issue doesn't occur with generated images.
  • sklearn is probably related as the issue doesn't occur when computing unmodified images.

> For reasons I can't explain, when I added logging, the script did not hang for 15 repeated attempts.

This one is definitely a head scratcher... Maybe it's possible that the locks used in the logging module to ensure thread-safety are somehow preventing or resolving an issue with threads locking each other, but that's way beyond my understanding.

I'm fine with leaving this for now as it seems to be an isolated issue with a simple workaround.

@grovduck
Member Author

> Thanks for the detailed write-up! Just to broadly summarize, it seems like the current takeaways are (please correct me if I'm wrong):
>
>   • Threads are definitely related as the issue never occurs with <=10 threads or with non-thread schedulers.
>   • File access is probably related as the issue doesn't occur with generated images.
>   • sklearn is probably related as the issue doesn't occur when computing unmodified images.

Yes, I think this is a good synopsis of where I think we are.

> Maybe it's possible that the locks used in the logging module to ensure thread-safety are somehow preventing or resolving an issue

Ooo, good find. I'm sure I don't understand either.

@aazuspan
Contributor

aazuspan commented Jun 28, 2024

@grovduck I just got an indefinite hang trying to compute GNNRegressor.predict from the large SWO raster in a Python script. I was using the default threads scheduler, and wrapping the computation in timeit.timeit:

import timeit

from sknnr import GNNRegressor

from sknnr_spatial import wrap
from sknnr_spatial.datasets import load_swo_ecoplot

if __name__ == "__main__":
    X_img, X, y = load_swo_ecoplot(as_dataset=True, large_rasters=True)
    est = wrap(GNNRegressor()).fit(X, y)

    print(f"predict: {timeit.timeit(lambda: est.predict(X_img).compute(), number=1):.03f}s")
    print(f"kneighbors: {timeit.timeit(lambda: [x.compute() for x in est.kneighbors(X_img)], number=1):.03f}s")

Interestingly, I got a warning that I haven't seen before:

OpenBLAS warning: precompiled NUM_THREADS exceeded, adding auxiliary array for thread metadata.

That warning led me to this thread, where one of the Dask maintainers suggested that Dask and BLAS (which I believe is used internally by sklearn for kmeans) can interact poorly because Dask will create many threads, which will in turn create many threads via BLAS. He only mentioned performance issues, but it seems possible that this might also lead to threading deadlocks.
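As a back-of-the-envelope illustration of that oversubscription concern, the worst case can be estimated by multiplying the two levels of parallelism. This is a simplification: it assumes every Dask worker thread calls into BLAS at the same moment and that each call uses its full thread budget, which OpenBLAS may not actually do. The function name is hypothetical.

```python
def worst_case_threads(dask_workers, blas_threads_per_call):
    """Upper bound on busy threads if every Dask worker thread calls into
    BLAS at once and each call uses its full BLAS thread budget."""
    return dask_workers * blas_threads_per_call


# Numbers taken from this issue: a 36-thread Dask pool, and the limit of 24
# threads that the OpenBLAS warning suggests.
print(worst_case_threads(36, 24))  # 864
print(worst_case_threads(36, 1))   # 36
```

With BLAS limited to one thread per call, the total stays at the size of the Dask pool.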

I was able to solve my hanging with this suggestion to use threadpool_limits (the accepted solution of setting env variables didn't work for some reason). Interestingly, setting the Dask config to a small ThreadPool didn't help my issue, so maybe this is unrelated...
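A minimal sketch of the `threadpool_limits` approach is shown below. The wrapper names are hypothetical and the fallback to a no-op when threadpoolctl is missing is an assumption made so the snippet runs anywhere; in practice it may be better to fail loudly if the package is not installed.

```python
from contextlib import nullcontext

try:
    from threadpoolctl import threadpool_limits
except ImportError:  # threadpoolctl is a third-party package
    threadpool_limits = None


def single_threaded_blas():
    """Context manager that limits BLAS libraries to one thread, or does
    nothing if threadpoolctl is not installed."""
    if threadpool_limits is None:
        return nullcontext()
    return threadpool_limits(limits=1, user_api="blas")


def compute_with_blas_limit(compute):
    """Call `compute()` while BLAS is restricted to a single thread."""
    with single_threaded_blas():
        return compute()


# Usage with the estimator from the script above:
# result = compute_with_blas_limit(lambda: est.predict(X_img).compute())
```

The limit is applied around the `.compute()` call rather than around `.predict()`, because with Dask the numerical work only runs when the result is computed.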

I'm curious if you've ever noticed that OpenBLAS warning on your end?

EDIT: I don't want to derail this, but I'm also getting OpenBLAS crashes on the ufunc branch due to allocating "too many memory regions". Seems related to scikit-learn/scikit-learn#20539, which suggests a fix is incoming in scipy 1.9. This is also resolved by the threadpoolctl fix above. No idea why this is only popping up now...

@grovduck
Member Author

grovduck commented Jul 1, 2024

@aazuspan, running your code above with the addition of the context manager with threadpool_limits(limits=1, user_api="blas") (as in #28), I am seeing this warning. This is the first time I remember encountering it, even counting when I first wrote up this issue. Here is my output using the ufunc branch:

OpenBLAS warning: precompiled NUM_THREADS exceeded, adding auxiliary array for thread metadata.
To avoid this warning, please rebuild your copy of OpenBLAS with a larger NUM_THREADS setting
or set the environment variable OPENBLAS_NUM_THREADS to 24 or lower
predict: 13.412s
kneighbors: 21.881s

I then switched back to the main branch and ran the example from the initial post on this issue (saved as a script called initial_bug.py) and got this output:

(sknnr-spatial) PS D:\code\sknnr-spatial> git checkout main
Switched to branch 'main'
Your branch is up to date with 'origin/main'.
(sknnr-spatial) PS D:\code\sknnr-spatial> python extra\initial_bug.py
(sknnr-spatial) PS D:\code\sknnr-spatial> python extra\initial_bug.py
(sknnr-spatial) PS D:\code\sknnr-spatial> python extra\initial_bug.py
(sknnr-spatial) PS D:\code\sknnr-spatial> python extra\initial_bug.py
(sknnr-spatial) PS D:\code\sknnr-spatial> python extra\initial_bug.py
(sknnr-spatial) PS D:\code\sknnr-spatial> python extra\initial_bug.py
OpenBLAS warning: precompiled NUM_THREADS exceeded, adding auxiliary array for thread metadata.
To avoid this warning, please rebuild your copy of OpenBLAS with a larger NUM_THREADS setting
or set the environment variable OPENBLAS_NUM_THREADS to 24 or lower
(sknnr-spatial) PS D:\code\sknnr-spatial> python extra\initial_bug.py
(sknnr-spatial) PS D:\code\sknnr-spatial> python extra\initial_bug.py

So it succeeded without issues or warnings on runs 1-5 and 7, threw the warning (but completed) on the sixth, and hung on the eighth. So even though I don't remember it, it's possible that this warning cropped up during my initial testing, although I would have thought I would have mentioned it.

EDIT: Substituting your context manager (with threadpool_limits(limits=1, user_api="blas")) in for dask.config.set(pool=ThreadPool(36)) in the initial_bug.py script, I'm not able to reproduce the hanging, so that seems like a likely fix for this issue?

@aazuspan
Contributor

aazuspan commented Jul 1, 2024

Thanks for the details @grovduck! It's nice to know this isn't isolated to my machine, but I'm at a loss for why this is just now popping up for both of us, and why it appears inconsistently.

> Substituting your context manager (with threadpool_limits(limits=1, user_api="blas")) in for dask.config.set(pool=ThreadPool(36)) in the initial_bug.py script, I'm not able to reproduce the hanging, so that seems like a likely fix for this issue?

Great, it seems like that strongly suggests that this is all related to threading and BLAS (somehow). I'd suggest we leave this open for now and I'll dive a little deeper into the research to try and figure out what's happening and why, but if the threadpool_limits works consistently on your end, that seems like a good workaround. I guess we could consider hard-coding that into our wrapped estimator methods, although that might be a little heavy-handed since it shouldn't be necessary for non-thread schedulers...

EDIT: Dask best practices recommend limiting OpenBLAS threads to 1 prior to parallelizing NumPy operations, to avoid performance issues. The NumPy docs also call out OpenBLAS parallelization as a potential issue when parallelizing with Dask or sklearn:

> Both MKL and OpenBLAS will use multi-threading for function calls like np.dot ... It typically yields better performance, but can also be harmful - for example when using another level of parallelization with Dask, scikit-learn or multiprocessing.

EDIT 2: I've never encountered OpenBLAS warnings or errors on my Linux machine, but did find that limiting OpenBLAS to 1 thread reduced prediction time from ~44s to ~23s.
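For completeness, the environment-variable route to the same limit can be sketched as follows. The helper name is hypothetical. One possible reason the variables did not seem to work earlier (an assumption, not confirmed in this thread) is timing: BLAS libraries typically read these variables when they are first loaded, so setting them after NumPy has been imported may have no effect.

```python
import os
import sys

# Thread-count variables read by OpenBLAS, MKL, and OpenMP respectively.
BLAS_THREAD_VARS = ("OPENBLAS_NUM_THREADS", "MKL_NUM_THREADS", "OMP_NUM_THREADS")


def limit_blas_threads_via_env(n_threads=1):
    """Set the BLAS thread-count variables; return True if NumPy has not been
    imported yet, i.e. the settings can still take effect in this process."""
    for var in BLAS_THREAD_VARS:
        os.environ[var] = str(n_threads)
    return "numpy" not in sys.modules


# Call this at the very top of a script, before importing numpy, sklearn,
# or sknnr:
# if not limit_blas_threads_via_env(1):
#     print("NumPy was already imported; the limit may be ignored")
```

Unlike `threadpool_limits`, this applies for the whole lifetime of the process and cannot be scoped to a single computation.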
