From 99daca78e9ef2207b78e922ad393cadf9a8ab019 Mon Sep 17 00:00:00 2001
From: Paweł Ordyna
Date: Thu, 17 Aug 2023 06:21:42 +0200
Subject: [PATCH] make it possible to manually set chunks when loading dask
arrays (#1477)
* make it possible to manually set chunks when loading dask arrays
* Apply suggestions from code review
Copyright and typos in setting chunks for dask arrays.
Co-authored-by: Axel Huebl
---
docs/source/analysis/dask.rst | 3 +
src/binding/python/openpmd_api/DaskArray.py | 102 +++++++++++---------
2 files changed, 57 insertions(+), 48 deletions(-)
diff --git a/docs/source/analysis/dask.rst b/docs/source/analysis/dask.rst
index e00dce0e98..432bdd5790 100644
--- a/docs/source/analysis/dask.rst
+++ b/docs/source/analysis/dask.rst
@@ -41,6 +41,9 @@ The central Python API calls to convert to DASK datatypes are the ``ParticleSpec
# note: no series.flush() needed
+The ``to_dask_array`` method will automatically set Dask array chunking based on the available chunks in the read data set.
+The default behavior can be overridden by passing an additional keyword argument ``chunks``; see the `dask.array.from_array documentation <https://docs.dask.org/en/stable/generated/dask.array.from_array.html>`__ for more details.
+For example, to chunk only along the outermost axis in a 3D dataset using the default Dask array chunk size, call ``to_dask_array(chunks={0: 'auto', 1: -1, 2: -1})``.
Example
-------
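To illustrate the ``chunks`` keyword documented in the hunk above, here is a minimal usage sketch. The file name, iteration number, and the ``E``/``x`` record names are hypothetical placeholders for an existing openPMD series; only ``to_dask_array`` and its ``chunks`` argument come from this patch.

    import openpmd_api as io

    # hypothetical series and record names, chosen only for illustration
    series = io.Series("simData_%T.h5", io.Access.read_only)
    rc = series.iterations[100].meshes["E"]["x"]

    # default: Dask chunking derived from rc.available_chunks()
    a = rc.to_dask_array()

    # override: chunk only along the outermost axis of a 3D dataset,
    # keeping each of the two inner axes in a single chunk
    b = rc.to_dask_array(chunks={0: 'auto', 1: -1, 2: -1})

    print(a.chunks, b.chunks)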
diff --git a/src/binding/python/openpmd_api/DaskArray.py b/src/binding/python/openpmd_api/DaskArray.py
index 8c6fc3001b..709b668be8 100644
--- a/src/binding/python/openpmd_api/DaskArray.py
+++ b/src/binding/python/openpmd_api/DaskArray.py
@@ -1,8 +1,8 @@
"""
This file is part of the openPMD-api.
-Copyright 2021 openPMD contributors
-Authors: Axel Huebl
+Copyright 2021-2023 openPMD contributors
+Authors: Axel Huebl, Pawel Ordyna
License: LGPLv3+
"""
import math
@@ -49,7 +49,7 @@ def __getitem__(self, slices):
return data
-def record_component_to_daskarray(record_component):
+def record_component_to_daskarray(record_component, chunks=None):
"""
Load a RecordComponent into a Dask.array.
@@ -57,6 +57,10 @@ def record_component_to_daskarray(record_component):
----------
record_component : openpmd_api.Record_Component
A record component class in openPMD-api.
+ chunks : chunks parameter to pass to dask.array.from_array.
+ See the dask documentation for more details.
+ When set to None (default), the chunking will be automatically
+ determined based on record_component.available_chunks().
Returns
-------
@@ -84,54 +88,56 @@ def record_component_to_daskarray(record_component):
if not found_dask:
raise ImportError("dask NOT found. Install dask for Dask DataFrame "
"support.")
-
- # get optimal chunks
- chunks = record_component.available_chunks()
-
- # sort and prepare the chunks for Dask's array API
- # https://docs.dask.org/en/latest/array-chunks.html
- # https://docs.dask.org/en/latest/array-api.html?highlight=from_array#other-functions
- # sorted and unique
- offsets_per_dim = list(map(list, zip(*[chunk.offset for chunk in chunks])))
- offsets_sorted_unique_per_dim = [sorted(set(o)) for o in offsets_per_dim]
-
- # print("offsets_sorted_unique_per_dim=",
- # list(offsets_sorted_unique_per_dim))
-
- # case 1: PIConGPU static load balancing (works with Dask assumptions,
- # chunk option no. 3)
- # all chunks in the same column have the same column width although
- # individual columns have different widths
- # case 2: AMReX boxes
- # all chunks are multiple of a common block size, offsets are a multiple
- # of a common blocksize
- # problem: too limited description in Dask
- # https://github.com/dask/dask/issues/7475
- # work-around: create smaller chunks (this incurs a read cost) by forcing
- # into case 1
- # (this can lead to larger blocks than using the gcd of the
- # extents aka AMReX block size)
- common_chunk_widths_per_dim = list()
- for d, offsets_in_dim in enumerate(offsets_sorted_unique_per_dim):
- # print("d=", d, offsets_in_dim, record_component.shape[d])
- offsets_in_dim_arr = np.array(offsets_in_dim)
- # note: this is in the right order of rows/columns, contrary to a
- # sorted extent list from chunks
- extents_in_dim = np.zeros_like(offsets_in_dim_arr)
- extents_in_dim[:-1] = offsets_in_dim_arr[1:]
- extents_in_dim[-1] = record_component.shape[d]
- if len(extents_in_dim) > 1:
- extents_in_dim[:-1] -= offsets_in_dim_arr[:-1]
- extents_in_dim[-1] -= offsets_in_dim_arr[-1]
- # print("extents_in_dim=", extents_in_dim)
- common_chunk_widths_per_dim.append(tuple(extents_in_dim))
-
- common_chunk_widths_per_dim = tuple(common_chunk_widths_per_dim)
- # print("common_chunk_widths_per_dim=", common_chunk_widths_per_dim)
+ if chunks is None:
+ # get optimal chunks
+ chunks = record_component.available_chunks()
+
+ # sort and prepare the chunks for Dask's array API
+ # https://docs.dask.org/en/latest/array-chunks.html
+ # https://docs.dask.org/en/latest/array-api.html?highlight=from_array#other-functions
+ # sorted and unique
+ offsets_per_dim = list(
+ map(list, zip(*[chunk.offset for chunk in chunks])))
+ offsets_sorted_unique_per_dim = [
+ sorted(set(o)) for o in offsets_per_dim]
+
+ # print("offsets_sorted_unique_per_dim=",
+ # list(offsets_sorted_unique_per_dim))
+
+ # case 1: PIConGPU static load balancing (works with Dask assumptions,
+ # chunk option no. 3)
+ # all chunks in the same column have the same column width although
+ # individual columns have different widths
+ # case 2: AMReX boxes
+ # all chunks are multiple of a common block size, offsets
+ # are a multiple of a common blocksize
+ # problem: too limited description in Dask
+ # https://github.com/dask/dask/issues/7475
+ # work-around: create smaller chunks (this incurs a read cost)
+ # by forcing into case 1
+ # (this can lead to larger blocks than using
+ # the gcd of the extents aka AMReX block size)
+ common_chunk_widths_per_dim = list()
+ for d, offsets_in_dim in enumerate(offsets_sorted_unique_per_dim):
+ # print("d=", d, offsets_in_dim, record_component.shape[d])
+ offsets_in_dim_arr = np.array(offsets_in_dim)
+ # note: this is in the right order of rows/columns, contrary to a
+ # sorted extent list from chunks
+ extents_in_dim = np.zeros_like(offsets_in_dim_arr)
+ extents_in_dim[:-1] = offsets_in_dim_arr[1:]
+ extents_in_dim[-1] = record_component.shape[d]
+ if len(extents_in_dim) > 1:
+ extents_in_dim[:-1] -= offsets_in_dim_arr[:-1]
+ extents_in_dim[-1] -= offsets_in_dim_arr[-1]
+ # print("extents_in_dim=", extents_in_dim)
+ common_chunk_widths_per_dim.append(tuple(extents_in_dim))
+
+ chunks = tuple(common_chunk_widths_per_dim)
+ # print("chunks=", chunks)
da = from_array(
DaskRecordComponent(record_component),
- chunks=common_chunk_widths_per_dim,
+ chunks=chunks,
# name=None,
asarray=True,
fancy=False,
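The comments in the hunk above describe how per-dimension chunk widths are derived from the unique, sorted chunk offsets. The following standalone sketch replays that derivation on toy data; the shape and offsets are made up for illustration and stand in for record_component.available_chunks() and record_component.shape.

    import numpy as np

    # toy 2D dataset of shape (8, 6) written as four chunks in a 2x2 grid
    # with uneven row heights; only the chunk offsets are needed here
    shape = (8, 6)
    chunk_offsets = [(0, 0), (0, 3), (5, 0), (5, 3)]

    # unique, sorted offsets per dimension
    offsets_per_dim = list(map(list, zip(*chunk_offsets)))
    offsets_sorted_unique_per_dim = [sorted(set(o)) for o in offsets_per_dim]

    common_chunk_widths_per_dim = []
    for d, offsets_in_dim in enumerate(offsets_sorted_unique_per_dim):
        offsets_in_dim_arr = np.array(offsets_in_dim)
        # width of each block = distance to the next offset,
        # last block extends to the end of the dataset
        extents_in_dim = np.zeros_like(offsets_in_dim_arr)
        extents_in_dim[:-1] = offsets_in_dim_arr[1:]
        extents_in_dim[-1] = shape[d]
        if len(extents_in_dim) > 1:
            extents_in_dim[:-1] -= offsets_in_dim_arr[:-1]
        extents_in_dim[-1] -= offsets_in_dim_arr[-1]
        common_chunk_widths_per_dim.append(tuple(extents_in_dim))

    # result in Dask's "block sizes per dimension" form
    print(tuple(common_chunk_widths_per_dim))  # ((5, 3), (3, 3))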