Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue with locking when reading from multiple threads with Dask #214

Closed
TomAugspurger opened this issue Jan 26, 2021 · 13 comments · Fixed by #224
Closed

Issue with locking when reading from multiple threads with Dask #214

TomAugspurger opened this issue Jan 26, 2021 · 13 comments · Fixed by #224
Labels
proposal Idea for a new feature.

Comments

@TomAugspurger
Copy link
Contributor

TomAugspurger commented Jan 26, 2021

Currently, rioxarray.open_rasterio builds a DataArray with a _file_obj that's an xarray CacheFileManager that eventually calls rasterio.open on the user-provided input (typically a filename). When using rioxarray with chunks=True to do parallel reads from multiple threads this can cause workers to wait for a lock, since GDAL file objects are not safe to be read from multiple threads.

This snippet demonstrates the issue by creating and then reading a (4, 2560, 2560) TIF. It uses a SerializableLock subclass that adds a bit of logging when locks are requested and acquired.

# file: rioxarray-locks.py
import rasterio
import rioxarray
import numpy as np
import logging
import threading
import time
import dask
from dask.utils import SerializableLock

logger = logging.getLogger("lock")
logger.setLevel(logging.INFO)
handler = logging.FileHandler("lock.log")
handler.setLevel(logging.INFO)
logger.addHandler(handler)


class InstrumentedLock(SerializableLock):
    """A lock that logs acquisition attempts and completions."""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
    def __enter__(self, *args, **kwargs):
        ident = threading.get_ident()
        t0 = time.perf_counter()
        logger.info("GET lock %d from %d", id(self.lock), ident)
        super().__enter__()
        logger.info("GOT lock %d from %d waited %f", id(self.lock), ident, time.perf_counter() - t0)


if __name__ == "__main__":
    # Generate a tiff file. Has 4 bands that could be read in parallel.
    a = np.random.randint(0, 255, size=(4, 10 * 256, 10 * 256), dtype="uint8")
    transform = rasterio.Affine(30.0, 0.0, 358485.0, 0.0, -30.0, 4265115.0)
    count, width, height = a.shape

    with rasterio.open('example.tif', 'w',
                       width=width, height=height, count=count,
                       dtype=rasterio.uint8,
                       crs='+proj=latlong', transform=transform) as dst:
        for band in range(count):
            dst.write(a[band], band + 1)


    lock = InstrumentedLock()
    ds = rioxarray.open_rasterio("example.tif", chunks=True, lock=lock)

    logger.info("-------------- read meatadata ------------------")

    with dask.config.set(scheduler="threads"):
        ds.mean().compute()

Here's the output to `lock.log:

GET lock 139663598009152 from 139664510379840
GOT lock 139663598009152 from 139664510379840 waited 0.000443
GET lock 139663598009152 from 139664510379840
GOT lock 139663598009152 from 139664510379840 waited 0.000405
-------------- read meatadata ------------------
GET lock 139663598009152 from 139663571498752
GET lock 139663598009152 from 139663563106048
GET lock 139663598009152 from 139663554713344
GET lock 139663598009152 from 139663343875840
GOT lock 139663598009152 from 139663571498752 waited 0.001341
GOT lock 139663598009152 from 139663563106048 waited 0.703704
GOT lock 139663598009152 from 139663554713344 waited 0.711287
GOT lock 139663598009152 from 139663343875840 waited 0.714647

So the section after --- read metadata --- is what matters. We see all the threads try to get the same lock. The first succeeds quickly. But the rest all need to wait. Interestingly, it seems that someone (probably GDAL) has cached the array , so the actual reads of the second through 4th bands are fast (shown by the waits all being around 0.7s). I don't know enough about GDAL / rasterio to say when pieces of data are read and cached, vs. when they're just not read period, but I suspect it depends on the internal chunking of the TIF file and the chunking pattern of the request.

Another note: this only affects multi-threaded reads. If we change dask.config.set(scheduler="processes"), we don't see any waiting for locks. That's because the lock objects themselves are different after they've been serialized.

GET lock 140529993845664 from 140530906224448
GOT lock 140529993845664 from 140530906224448 waited 0.000643
GET lock 140529993845664 from 140530906224448
GOT lock 140529993845664 from 140530906224448 waited 0.000487
-------------- read meatadata ------------------
GET lock 140451225032720 from 140452137330496
GOT lock 140451225032720 from 140452137330496 waited 0.000512
GET lock 140156806203456 from 140157718505280
GOT lock 140156806203456 from 140157718505280 waited 0.001993
GET lock 140098670495760 from 140099582793536
GOT lock 140098670495760 from 140099582793536 waited 0.003202
GET lock 140257691564096 from 140258603865920
GOT lock 140257691564096 from 140258603865920 waited 0.000851

As for potential solutions: a candidate is to pass around URIs like example.tif rather than cached file objects. Then each thread can open its own File object. I think this would have to be an option controlled by a parameter, since the exact tradeoff between the cost of opening a new file vs. reusing an open file object can't be known ahead of time.

I haven't looked closely at whether an implementation like this is easily doable, but if there's interest I can take a look.

cc @scottyhq as well, in case this interests you :) xref to pangeo-data/cog-best-practices#2 and https://github.com/pangeo-data/cog-best-practices/blob/main/rasterio-concurrency.ipynb.

@TomAugspurger TomAugspurger added the proposal Idea for a new feature. label Jan 26, 2021
@snowman2
Copy link
Member

a candidate is to pass around URIs like example.tif rather than cached file objects. Then each thread can open its own File object. I think this would have to be an option controlled by a parameter, since the exact tradeoff between the cost of opening a new file vs. reusing an open file object can't be known ahead of time.

I had issues passing around the file-like objects in #210 and it was resolved by passing around the path. I think passing around the path for reading will likely be the the most flexible and stable solution, even if it is not the most performant. It will likely also enable multi-core reading of the data. So, making that the default would be fine with me, If you want to look into it, I say 🚀

@snowman2
Copy link
Member

It will likely also enable multi-core reading of the data

Apparently this was already possible. With the tip from @djhoese of passing in the different lock types, it works: https://corteva.github.io/rioxarray/latest/examples/dask_read_write.html. However, once this issue is resolved, I suspect we will be able to remove locks entirely when reading the data.

@TomAugspurger
Copy link
Contributor Author

It will likely also enable multi-core reading of the data

Apparently this was already possible.

Hmm, at least in my experiments (very rough notebook at https://nbviewer.jupyter.org/gist/TomAugspurger/73224c1aed6671085dfafdf778790e56) I didn't find this to be the case. You could read from multiple processes in parallel just fine, but not with multiple threads from within a single process (My use case had multiple threads reading from blob storage to ensure that a GPU was getting data fast enough).

@snowman2
Copy link
Member

You could read from multiple processes in parallel just fine, but not with multiple threads from within a single process

That is interesting. Is there a lock that works with multi-thread + multi-core workflows such as dask.distributed.Lock?

@TomAugspurger
Copy link
Contributor Author

Mmm the default lock used in rioxarray, the SerializableLock "works" in the sense that things don't segfault from multiple threads reading at once. But it's not necessarily the highest performance read since threads will need to wait for eachother

Consider a setup with

  1. 4 dask workers (four separate processes)
  2. With 2 threads per worker

So your cluster has 8 cores. With lock=True (or a SerializableLock instance) you'll end up with four separate locks, one per worker, after they've been serialized. Within each worker the two threads will take turns reading. So you can have at most 4 threads reading at once (one per worker).

That's different from a dask.distributed.Lock, which is global to the cluster. With that you'd only have 1 thread reading at once.

I haven't looked yet, but it might be necessary to have a global lock when writing. I don't know if it's safe to write chunks of a COG from multiple processes at once.

@snowman2
Copy link
Member

it might be necessary to have a global lock when writing

This is definitely the case.

@snowman2
Copy link
Member

Mmm the default lock used in rioxarray, the SerializableLock "works" in the sense that things don't segfault from multiple threads reading at once. But it's not necessarily the highest performance read since threads will need to wait for eachother

This is really good information. I think having all of this documented in the dask section would be helpful for users to be able to make that choice.

@snowman2
Copy link
Member

snowman2 commented Feb 5, 2021

Version 0.2 is now released with the change: https://corteva.github.io/rioxarray/stable/examples/read-locks.html

@jessjaco
Copy link

I'm trying to understand the use of this, particularly when lock=True. With some test code, I'm getting an error from xarray:

import rioxarray

url = (
    "https://naipeuwest.blob.core.windows.net/naip/v002/md/2013/md_100cm_2013/"
    "39076/m_3907617_ne_18_1_20130924.tif"
)
ds = rioxarray.open_rasterio(url, lock=True, chunks=(4, "auto", -1))
   File "<stdin>", line 1, in <module>
  File "/venv/lib/python3.8/site-packages/rioxarray/_io.py", line 751, in open_rasterio
    riods = manager.acquire()
  File "/venv/lib/python3.8/site-packages/xarray/backends/file_manager.py", line 181, in acquire
    file, _ = self._acquire_with_cache_info(needs_lock)
  File "/venv/lib/python3.8/site-packages/xarray/backends/file_manager.py", line 197, in _acquire_with_cache_info
    with self._optional_lock(needs_lock):
  File "/opt/conda/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/venv/lib/python3.8/site-packages/xarray/backends/file_manager.py", line 161, in _optional_lock
    with self._lock:
AttributeError: __enter__

In the corresponding rioxarray code it looks like True is just being passed as the lock argument to CachingFileManager. But the in the xarray docs and code it doesn't look like True is a valid argument

@TomAugspurger
Copy link
Contributor Author

I think you're supposed to pass an actual lock. Like threading.Lock().

@snowman2
Copy link
Member

@jessjaco
Copy link

Ok, yeah that's what it looks like. The open_rasterio documentation (https://corteva.github.io/rioxarray/stable/rioxarray.html#rioxarray-open-rasterio) was throwing me off though, because it says lock=True is a valid option.

@snowman2
Copy link
Member

it says lock=True is a valid option.

Good catch 👍. Probably leftovers from another idea. Mind opening a separate issue for that?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
proposal Idea for a new feature.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants