From 67fe7d360b10bbd0ddc495cd3f280a0106fce0d3 Mon Sep 17 00:00:00 2001 From: Thomas Kluyver Date: Tue, 26 Nov 2024 18:12:54 +0000 Subject: [PATCH] Split xtdf image data into 5 GB chunks for reading --- extra_data/components.py | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/extra_data/components.py b/extra_data/components.py index 0f7f5803..3846b937 100644 --- a/extra_data/components.py +++ b/extra_data/components.py @@ -1,6 +1,7 @@ """Interfaces to data from specific instruments """ import logging +import math import re from collections.abc import Iterable from copy import copy @@ -312,7 +313,7 @@ def _select_trains(cls, data, mod_data_counts, min_modules): return data.select_trains(by_id[train_ids]) @staticmethod - def _split_align_chunk(chunk, target_train_ids: np.ndarray): + def _split_align_chunk(chunk, target_train_ids: np.ndarray, length_limit=np.inf): """ Split up a source chunk to align with parts of a joined array. @@ -328,6 +329,9 @@ def _split_align_chunk(chunk, target_train_ids: np.ndarray): target_train_ids: numpy.ndarray Train ID index for target array to align chunk data to. Train IDs may occur more than once in here. + length_limit: int + Maximum length of slices (stop - start) to yield. Larger slices will + be split up into several pieces. Unlimited by default. """ # Expand the list of train IDs to one per frame chunk_tids = np.repeat(chunk.train_ids, chunk.counts.astype(np.intp)) @@ -353,14 +357,16 @@ def _split_align_chunk(chunk, target_train_ids: np.ndarray): else: n_match = len(chunk_tids) - # Select the matching data - chunk_match_end = chunk_match_start + n_match - tgt_end = tgt_start + n_match - - yield slice(tgt_start, tgt_end), slice(chunk_match_start, chunk_match_end) + # Split the matched data if needed for length_limit + n_batches = max(math.ceil(n_match / length_limit), 1) + for i in range(n_batches): + start = i * n_match // n_batches + stop = (i + 1) * n_match // n_batches + yield (slice(tgt_start + start, tgt_start + stop), + slice(chunk_match_start + start, chunk_match_start + stop)) # Prepare remaining data in the chunk for the next match - chunk_match_start = chunk_match_end + chunk_match_start += n_match chunk_tids = chunk_tids[n_match:] @property @@ -1157,8 +1163,15 @@ def _sel_frames(self): def _read_chunk(self, chunk: DataChunk, mod_out, roi): """Read per-pulse data from file into an output array (of 1 module)""" + # Limit to 5 GB sections of the dataset at once, so the temporary + # arrays used in the workaround below are not too large. + nbytes_frame = chunk.dataset.dtype.itemsize + for dim in chunk.dataset.shape[1:]: + nbytes_frame *= dim + frame_limit = 5 * (1024 ** 3) // nbytes_frame + for tgt_slice, chunk_slice in self.det._split_align_chunk( - chunk, self.det.train_ids_perframe + chunk, self.det.train_ids_perframe, length_limit=frame_limit ): inc_pulses_chunk = self._sel_frames[tgt_slice] if inc_pulses_chunk.sum() == 0: # No data from this chunk selected
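
Note on the batching arithmetic in _split_align_chunk: a matched run of n_match frames is cut into ceil(n_match / length_limit) near-equal pieces, so no piece exceeds length_limit and piece sizes differ by at most one frame. A minimal sketch of that arithmetic (split_evenly is a hypothetical helper for illustration only, not part of extra_data):

    import math

    def split_evenly(n_match, length_limit=math.inf):
        # Mirror the patch: use enough batches that none exceeds length_limit,
        # with integer arithmetic spreading any remainder across the pieces.
        n_batches = max(math.ceil(n_match / length_limit), 1)
        for i in range(n_batches):
            start = i * n_match // n_batches
            stop = (i + 1) * n_match // n_batches
            yield start, stop

    # 10 matched frames with a limit of 4 -> pieces of 3, 3 and 4 frames
    print(list(split_evenly(10, length_limit=4)))  # [(0, 3), (3, 6), (6, 10)]

With the default length_limit of infinity, ceil(n_match / inf) is 0, so the max(..., 1) keeps a single batch and exactly one pair of slices is yielded, matching the previous behaviour.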
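
Note on the 5 GB budget in _read_chunk: frame_limit converts the byte budget of 5 GiB (5 * 1024**3 bytes) into a frame count by dividing it by the size of one frame, i.e. the dtype's itemsize times every dimension after the first. A rough sketch of that calculation, where frames_per_5gib is a hypothetical helper and the 512 x 128 float32 frame shape is only an illustrative assumption:

    import numpy as np

    def frames_per_5gib(shape, dtype):
        # shape is (frames, ...); everything after the first axis
        # belongs to a single frame.
        nbytes_frame = np.dtype(dtype).itemsize
        for dim in shape[1:]:
            nbytes_frame *= dim
        return 5 * (1024 ** 3) // nbytes_frame

    # e.g. corrected module frames of 512 x 128 float32 pixels
    # -> 262144 bytes per frame, so 20480 frames per 5 GiB read
    print(frames_per_5gib((1000, 512, 128), np.float32))  # 20480

The limit only caps the temporary arrays used in the read workaround; any chunk already smaller than the budget is still read in a single piece.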