diff --git a/docs/source/analysis/dask.rst b/docs/source/analysis/dask.rst index e00dce0e98..432bdd5790 100644 --- a/docs/source/analysis/dask.rst +++ b/docs/source/analysis/dask.rst @@ -41,6 +41,9 @@ The central Python API calls to convert to DASK datatypes are the ``ParticleSpec # note: no series.flush() needed +The ``to_dask_array`` method will automatically set Dask array chunking based on the available chunks in the read data set. +The default behavior can be overridden by passing an additional keyword argument ``chunks``, see the `dask.array.from_array documentation `__ for more details. +For example, to chunk only along the outermost axis in a 3D dataset using the default Dask array chunk size, call ``to_dask_array(chunks={0: 'auto', 1: -1, 2: -1})``. Example ------- diff --git a/src/binding/python/openpmd_api/DaskArray.py b/src/binding/python/openpmd_api/DaskArray.py index 8c6fc3001b..709b668be8 100644 --- a/src/binding/python/openpmd_api/DaskArray.py +++ b/src/binding/python/openpmd_api/DaskArray.py @@ -1,8 +1,8 @@ """ This file is part of the openPMD-api. -Copyright 2021 openPMD contributors -Authors: Axel Huebl +Copyright 2021-2023 openPMD contributors +Authors: Axel Huebl, Pawel Ordyna License: LGPLv3+ """ import math @@ -49,7 +49,7 @@ def __getitem__(self, slices): return data -def record_component_to_daskarray(record_component): +def record_component_to_daskarray(record_component, chunks=None): """ Load a RecordComponent into a Dask.array. @@ -57,6 +57,10 @@ def record_component_to_daskarray(record_component): ---------- record_component : openpmd_api.Record_Component A record component class in openPMD-api. + chunks : chunks parameter to pass to dask.array.from_array. + See dask documentation for more details. + When set to None (default) the chunking will be automaticaly + determined based on record_component.available_chunks(). Returns ------- @@ -84,54 +88,56 @@ def record_component_to_daskarray(record_component): if not found_dask: raise ImportError("dask NOT found. Install dask for Dask DataFrame " "support.") - - # get optimal chunks - chunks = record_component.available_chunks() - - # sort and prepare the chunks for Dask's array API - # https://docs.dask.org/en/latest/array-chunks.html - # https://docs.dask.org/en/latest/array-api.html?highlight=from_array#other-functions - # sorted and unique - offsets_per_dim = list(map(list, zip(*[chunk.offset for chunk in chunks]))) - offsets_sorted_unique_per_dim = [sorted(set(o)) for o in offsets_per_dim] - - # print("offsets_sorted_unique_per_dim=", - # list(offsets_sorted_unique_per_dim)) - - # case 1: PIConGPU static load balancing (works with Dask assumptions, - # chunk option no. 3) - # all chunks in the same column have the same column width although - # individual columns have different widths - # case 2: AMReX boxes - # all chunks are multiple of a common block size, offsets are a multiple - # of a common blocksize - # problem: too limited description in Dask - # https://github.com/dask/dask/issues/7475 - # work-around: create smaller chunks (this incurs a read cost) by forcing - # into case 1 - # (this can lead to larger blocks than using the gcd of the - # extents aka AMReX block size) - common_chunk_widths_per_dim = list() - for d, offsets_in_dim in enumerate(offsets_sorted_unique_per_dim): - # print("d=", d, offsets_in_dim, record_component.shape[d]) - offsets_in_dim_arr = np.array(offsets_in_dim) - # note: this is in the right order of rows/columns, contrary to a - # sorted extent list from chunks - extents_in_dim = np.zeros_like(offsets_in_dim_arr) - extents_in_dim[:-1] = offsets_in_dim_arr[1:] - extents_in_dim[-1] = record_component.shape[d] - if len(extents_in_dim) > 1: - extents_in_dim[:-1] -= offsets_in_dim_arr[:-1] - extents_in_dim[-1] -= offsets_in_dim_arr[-1] - # print("extents_in_dim=", extents_in_dim) - common_chunk_widths_per_dim.append(tuple(extents_in_dim)) - - common_chunk_widths_per_dim = tuple(common_chunk_widths_per_dim) - # print("common_chunk_widths_per_dim=", common_chunk_widths_per_dim) + if chunks is None: + # get optimal chunks + chunks = record_component.available_chunks() + + # sort and prepare the chunks for Dask's array API + # https://docs.dask.org/en/latest/array-chunks.html + # https://docs.dask.org/en/latest/array-api.html?highlight=from_array#other-functions + # sorted and unique + offsets_per_dim = list( + map(list, zip(*[chunk.offset for chunk in chunks]))) + offsets_sorted_unique_per_dim = [ + sorted(set(o)) for o in offsets_per_dim] + + # print("offsets_sorted_unique_per_dim=", + # list(offsets_sorted_unique_per_dim)) + + # case 1: PIConGPU static load balancing (works with Dask assumptions, + # chunk option no. 3) + # all chunks in the same column have the same column width although + # individual columns have different widths + # case 2: AMReX boxes + # all chunks are multiple of a common block size, offsets + # are a multiple of a common blocksize + # problem: too limited description in Dask + # https://github.com/dask/dask/issues/7475 + # work-around: create smaller chunks (this incurs a read cost) + # by forcing into case 1 + # (this can lead to larger blocks than using + # the gcd of the extents aka AMReX block size) + common_chunk_widths_per_dim = list() + for d, offsets_in_dim in enumerate(offsets_sorted_unique_per_dim): + # print("d=", d, offsets_in_dim, record_component.shape[d]) + offsets_in_dim_arr = np.array(offsets_in_dim) + # note: this is in the right order of rows/columns, contrary to a + # sorted extent list from chunks + extents_in_dim = np.zeros_like(offsets_in_dim_arr) + extents_in_dim[:-1] = offsets_in_dim_arr[1:] + extents_in_dim[-1] = record_component.shape[d] + if len(extents_in_dim) > 1: + extents_in_dim[:-1] -= offsets_in_dim_arr[:-1] + extents_in_dim[-1] -= offsets_in_dim_arr[-1] + # print("extents_in_dim=", extents_in_dim) + common_chunk_widths_per_dim.append(tuple(extents_in_dim)) + + chunks = tuple(common_chunk_widths_per_dim) + # print("chunks=", chunks) da = from_array( DaskRecordComponent(record_component), - chunks=common_chunk_widths_per_dim, + chunks=chunks, # name=None, asarray=True, fancy=False,