
Chunked processing across multiple raster (geoTIF) files #2314

Closed
lmadaus opened this issue Jul 25, 2018 · 11 comments · Fixed by #7671

Comments

@lmadaus

lmadaus commented Jul 25, 2018

Looking for guidance on best practices, or forcing dask/xarray to work in the way I want it to work :)

Task: I have hundreds of GeoTIFF files (500 for this test case), each containing a single 2-D x-y array (~10000 x 10000). I need to read these all in and apply a function to each x-y point, whose argument is a 1-D array populated by the data from that same x-y point across all the files. The function is an optimization fit at each x-y point, so it's not easily vectorizable, but each point's calculation is independent.

Because each point can be treated independently, the idea is to split the dataset into chunks in the x-y directions and let Dask cycle through and farm out the task of applying the function to each chunk in turn. Here's how I'm currently trying to implement this:

import xarray as xr
import numpy as np

# Get list of files, set chunksize
filelist = ['image_001.tif', 'image_002.tif', ... 'image_500.tif']
chunksize=300 # Have explored a range of chunksizes from 250-1000

# Open rasters individually in the absence of an open_mfdataset equivalent for rasterio
# Alternatively tried chunking or not in x and y dimensions
# Chunking at this stage blows up the task graph and eats up memory quickly
# but see expected behavior below for reasoning behind trying to chunk at this stage
#rasterlist = [xr.open_rasterio(x, chunks={'x': chunksize, 'y': chunksize}) for x in filelist]
rasterlist = [xr.open_rasterio(x, chunks={'x': None, 'y': None}) for x in filelist]

# Concatenate along band dimension
merged = xr.concat(rasterlist, dim='band')

# Rechunk
merged = merged.chunk({'band': None, 'x': chunksize, 'y': chunksize})

# Use dask map_blocks to apply a function to each chunk (lazy until compute is called)
output = merged.data.map_blocks(myfunction, dtype=np.float32, drop_axis=0)
output = output.compute()
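For reference, myfunction itself isn't shown anywhere in this thread; with drop_axis=0, map_blocks expects it to take a (band, y, x) chunk and return a (y, x) array. A hypothetical per-pixel fit of that shape might look like:

import numpy as np

def myfunction(block):
    # Hypothetical stand-in: map_blocks passes one chunk of shape (band, y, x);
    # with drop_axis=0 the return value must have shape (y, x)
    nband, ny, nx = block.shape
    t = np.arange(nband, dtype=np.float64)
    out = np.empty((ny, nx), dtype=np.float32)
    for j in range(ny):
        for i in range(nx):
            # e.g. a simple least-squares trend fit of the per-pixel series
            slope, _ = np.polyfit(t, block[:, j, i].astype(np.float64), 1)
            out[j, i] = slope
    return out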

Problem description

What happens now is that all the raster datasets appear to be read into memory in full immediately, without respect to the final chunk alignment needed for the calculation. Then there is a lot of memory usage and communication overhead as the chunks get realigned during the concatenation and rechunk operations before myfunction is applied. This blows up the memory requirements, exceeding 300 GB of RAM on a 48-worker machine (the dataset on disk is ~100 GB). Below is a screenshot of the Dask dashboard right before workers start dying with memory-limit / can't-dump-to-disk errors.
[Screenshot: Dask dashboard, 2018-07-24 3:56 PM]

Expected Output

What I'm looking for in this is an out-of-core computation where a single worker stays with a single chunk of the data all the way through to the end. Each "task" would then be a sequence of:

  • Open each of the raster input files and extract only the spatial chunk we were assigned for this task
  • Concatenate these for the computation
  • Perform this computation on this chunk
  • Return the output array for this chunk to be aggregated back together by Dask
  • Release data from memory; move on to the next spatial chunk

I could manually sort this all out with a bunch of dask.delayed tasks, keeping track of the indices/locations of each chunk as they were returned so I could "reassemble" them later. However, is there an easier way to do this directly through xarray/dask, via some ordering of commands I haven't found yet? Does the netCDF open_mfdataset function handle this sort of idea for multiple netCDF files in a way that could be adapted for rasterio?
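For what it's worth, the manual dask.delayed version described above might look roughly like the following sketch. It reuses filelist, chunksize, and myfunction from the script above, assumes the 10000 x 10000 extent mentioned earlier, and uses rasterio windowed reads so each task only touches one spatial tile of each file:

import dask
import dask.array as da
import numpy as np
import rasterio
from rasterio.windows import Window

def read_window(path, window):
    # Read only the requested spatial window from one file
    with rasterio.open(path) as src:
        return src.read(1, window=window)

def read_and_fit(files, window):
    # Stack this window from every file along a leading "band" axis,
    # then run the per-pixel fit on the (band, y, x) stack
    stack = np.stack([read_window(f, window) for f in files])
    return myfunction(stack)

ny = nx = 10000  # assumed spatial extent
tiles = []
for y0 in range(0, ny, chunksize):
    row = []
    for x0 in range(0, nx, chunksize):
        w = min(chunksize, nx - x0)
        h = min(chunksize, ny - y0)
        win = Window(x0, y0, w, h)
        tile = dask.delayed(read_and_fit)(filelist, win)
        row.append(da.from_delayed(tile, shape=(h, w), dtype=np.float32))
    tiles.append(row)

# Reassemble the per-tile results into the full (y, x) output
output = da.block(tiles).compute()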

Output of xr.show_versions()

INSTALLED VERSIONS
------------------
commit: None
python: 3.6.3.final.0
python-bits: 64
OS: Linux
OS-release: 4.9.87-linuxkit-aufs
machine: x86_64
processor:
byteorder: little
LC_ALL: None
LANG: C.UTF-
LOCALE: None.None

xarray: 0.10.7
pandas: 0.23.2
numpy: 1.14.5
scipy: 1.1.0
netCDF4: 1.4.0
h5netcdf: 0.6.1
h5py: 2.8.0
Nio: None
zarr: 2.2.0
bottleneck: 1.2.1
cyordereddict: None
dask: 0.18.1
distributed: 1.22.0
matplotlib: 2.2.2
cartopy: None
seaborn: None
setuptools: 36.5.0.post20170921
pip: 9.0.1
conda: 4.5.5
pytest: None
IPython: 6.4.0
sphinx: None

@jhamman
Member

jhamman commented Aug 29, 2018

pinging @scottyhq and @darothen who have both been exploring similar use cases here. I think you all met at the recent pangeo meeting.

@darothen

Can you provide a gdalinfo of one of the GeoTiffs? I'm still working on some documentation for use cases with cloud-optimized GeoTiffs to supplement @scottyhq's fantastic example notebook. One of the wrinkles I'm tracking down and trying to document is when exactly the GDAL->rasterio->dask->xarray pipeline eagerly loads the entire file versus when it defers reading or reads subsets of files. So far, it seems that if the GeoTiff is appropriately chunked ahead of time (when it's written to disk), things basically work "automagically."

@shoyer
Member

shoyer commented Aug 30, 2018

I think the explicit chunk() call is the source of your woes here. That creates a bunch of tasks to reshard your data that require loading the entire array into memory. If you're using dask-distributed, I think the large intermediate outputs would get cached to disk but this fails if you're using the simpler multithreaded scheduler.

If you drop the line that calls .chunk() and manually index your array to pull out a single time-series before calling map_blocks, does that work properly? e.g., something like merged.isel(x=0, y=0).data.map_blocks(myfunction) (nevermind, this is probably not a great idea)

@scottyhq
Contributor

As @darothen mentioned, the first thing is to check that the geotiffs themselves are tiled (otherwise I'm guessing that open_rasterio() will open the entire thing). You can do this with:

import rasterio

with rasterio.open('image_001.tif') as src:
    # A tiled file will show 'tiled': True along with 'blockxsize'/'blockysize'
    print(src.profile)

Here is the mentioned example notebook which works for tiled geotiffs stored on google cloud:
https://github.com/scottyhq/pangeo-example-notebooks/tree/binderfy

You can use the 'launch binder' button to run it with a pangeo dask-kubernetes cluster, or just read through the landsat8-cog-ndvi.ipynb notebook.

@shoyer
Member

shoyer commented Aug 30, 2018

I see now that you are using dask-distributed, but I guess there are still too many intermediate outputs here to do a single rechunk operation.

The crude but effective way to solve this problem would be to loop over spatial tiles, using an indexing operation to pull out only a limited extent, run the calculation on each tile, and then reassemble the tiles at the end. To see if this will work, you might try computing a single time-series on your merged dataset before calling .chunk(), e.g., merged.isel(x=0, y=0).compute().
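A rough sketch of that loop (not from the thread), reusing merged, chunksize, and myfunction from the original post; the reassembly here is deliberately simple:

import numpy as np

rows = []
for y0 in range(0, merged.sizes['y'], chunksize):
    row = []
    for x0 in range(0, merged.sizes['x'], chunksize):
        # Pull out one spatial tile, load it, and run the fit on it
        tile = merged.isel(x=slice(x0, x0 + chunksize),
                           y=slice(y0, y0 + chunksize)).compute()
        row.append(myfunction(tile.data))   # (y, x) result for this tile
    rows.append(np.hstack(row))             # stitch tiles along x
output = np.vstack(rows)                    # stitch rows along y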

In theory, I think using chunks in open_rasterio should achieve exactly what you want here (assuming the geotiffs are tiled), but as you note it makes for a giant task graph. To balance this tradeoff, I might try picking a very large initial chunksize, e.g., xr.open_rasterio(x, chunks={'x': 3500, 'y': 3500}). This would effectively split the "rechunk" operation into 9 entirely independent parts.

@lmadaus
Author

lmadaus commented Aug 30, 2018

Thanks for all the suggestions!

An update from when I originally posted this. In line with @shoyer, @darothen and @scottyhq's comments, we've tested the code using both cloud-optimized GeoTIFF files and regular GeoTIFFs, and it does perform better with the cloud-optimized form, though it still appears to "over-eagerly" load more than just what is being worked on. With the cloud-optimized form, performance is much better when we specify a chunking strategy on the initial open_rasterio call that aligns with the files' internal tile sizes, e.g.
rasterlist = [xr.open_rasterio(x, chunks={'x': 256, 'y': 256}) for x in filelist]
vs.
rasterlist = [xr.open_rasterio(x, chunks={'x': None, 'y': None}) for x in filelist]

The result is a larger task graph (and much more time spent building the task graph), but more cases where we don't run into memory problems. There still appears to be a lot more memory used than I expect, but I'm actively exploring options.

We've also noticed better performance using a k8s Dask cluster distributed across multiple "independent" workers as opposed to using a LocalCluster on a single large machine. As in, with the distributed cluster the "myfunction" (fit) operation starts happening on chunks well before the entire dataset is loaded, whereas in the LocalCluster it still tends not to begin until all chunks have been loaded in. Not exactly sure what would cause that...

I'm intrigued by @shoyer 's last suggestion of an "intermediate" chunking step. Will test that and potentially the manual iteration over the tiles. Thanks for all the suggestions and thoughts!

@pblankenau2

Has there been any progress on this issue? I am bumping into the same problem.

@shaprann

This particular use case is extremely common when working with spatio-temporal data. Can anyone suggest a good workaround for this?

@darothen

Hi @shaprann, I haven't revisited this exact workflow recently, but one really good option (if you can manage the intermediate storage cost) would be to use new tools like https://github.com/pangeo-data/rechunker to pre-process and prepare your data archive prior to analysis.
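For illustration, a minimal sketch of that kind of pre-processing step, assuming the 500-image stack has already been written to a Zarr array chunked one image at a time; the store paths, chunk sizes, and memory budget are all hypothetical:

import zarr
from rechunker import rechunk

source = zarr.open("stack_time_chunked.zarr")   # e.g. chunks of (1, 10000, 10000)
plan = rechunk(
    source,
    target_chunks=(500, 300, 300),              # whole time axis together, small spatial tiles
    max_mem="4GB",
    target_store="stack_space_chunked.zarr",
    temp_store="rechunker_tmp.zarr",
)
plan.execute()                                  # runs as a dask computation

After that, the per-pixel fit can be mapped over the spatially chunked store without any expensive rechunking at compute time.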

@gjoseph92

Just noticed this issue; people needing to do this sort of thing might want to look at stackstac (especially playing with the chunks= parameter) or odc-stac for loading the data. The graph will be cleaner than what you'd get from xr.concat([xr.open_rasterio(...) for ...]).
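As a concrete (hypothetical) example of the stackstac route, assuming your imagery is available through a public STAC API; the catalog URL, collection, assets, and bounding box below are only illustrative:

import pystac_client
import stackstac

# Search any STAC API that serves cloud-optimized GeoTIFF assets
catalog = pystac_client.Client.open("https://earth-search.aws.element84.com/v1")
items = catalog.search(
    collections=["sentinel-2-l2a"],
    bbox=[-105.1, 39.9, -105.0, 40.0],
    max_items=50,
).item_collection()

# Lazily build a (time, band, y, x) dask-backed DataArray; the spatial chunk size
# controls how much of each COG a single task reads
stack = stackstac.stack(items, assets=["red", "nir"], chunksize=2048)

From there, the per-pixel fit can be applied with the same map_blocks pattern as in the original post.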

> still appears to "over-eagerly" load more than just what is being worked on

FYI, this is basically expected behavior for distributed, see:

@dcherian
Contributor

We've deleted the internal rasterio backend in favor of rioxarray. If this issue is still relevant, please migrate the discussion to the rioxarray repo.
