From 47ea8355dbe7327b042fcebe6ff10bfd9ff660c3 Mon Sep 17 00:00:00 2001 From: Altay Sansal Date: Fri, 18 Nov 2022 21:24:17 -0600 Subject: [PATCH] optimized segy export logic for N-D datasets --- src/mdio/segy/blocked_io.py | 77 ++++++++++++++++++++++++++++--------- src/mdio/segy/creation.py | 34 ++++++++-------- 2 files changed, 75 insertions(+), 36 deletions(-) diff --git a/src/mdio/segy/blocked_io.py b/src/mdio/segy/blocked_io.py index 177d0988..1e315ba5 100644 --- a/src/mdio/segy/blocked_io.py +++ b/src/mdio/segy/blocked_io.py @@ -210,40 +210,48 @@ def to_zarr( return stats -def segy_row_concat( - partial_rows: NDArray, +def segy_concat( + partial_files: NDArray, axis: tuple[int] = None, keepdims: bool = None, ) -> NDArray: """Aggregate partial SEG-Y blocks on disk, preserving order. + Used in conjunction with tree reduction. It will take an array + of file names, which preserved the adjacency of blocks, and then + combines adjacent blocks while flattening for SEG-Y. + + For `axis` and `keepdims` parameters, please see `dask.array.reduce` + documentation. + Args: - partial_rows: Array containing paths to parts of a SEG-Y row. + partial_files: Array containing paths to parts of a SEG-Y row. axis: Which axes to concatenate on. keepdims: Keep the original dimensionality after merging. Returns: Concatenated file name array. Dimensions depend on `keepdims`. """ - n_rows = partial_rows.shape[0] - concat_rows = np.full_like(partial_rows, fill_value="missing", shape=n_rows) + concat_shape = partial_files.shape[0] + concat_paths = np.full_like(partial_files, fill_value="missing", shape=concat_shape) # Fast path if all data in block is missing. - if np.all(partial_rows == "missing"): - return np.expand_dims(concat_rows, axis) if keepdims else concat_rows + if np.all(partial_files == "missing"): + return np.expand_dims(concat_paths, axis) if keepdims else concat_paths - # Concat rows and update concat_rows result. - for idx, row_parts in enumerate(partial_rows): - row_missing = row_parts == "missing" + # Flatten and concat section files to a single root file at first axis. + for index, section_paths in enumerate(partial_files): + section_paths = section_paths.ravel() + section_missing = section_paths == "missing" - if np.all(row_missing): + if np.all(section_missing): continue - row_valid_paths = np.extract(~row_missing, row_parts).tolist() - row_concat_file = concat_files(row_valid_paths) - concat_rows[idx] = row_concat_file + section_valid_paths = np.extract(~section_missing, section_paths).tolist() + section_concat_files = concat_files(section_valid_paths) + concat_paths[index] = section_concat_files - return np.expand_dims(concat_rows, axis) if keepdims else concat_rows + return np.expand_dims(concat_paths, axis) if keepdims else concat_paths def to_segy( @@ -255,7 +263,40 @@ def to_segy( file_root: str, axis: tuple[int] | None = None, ) -> Array: - """Convert MDIO blocks to SEG-Y parts. + r"""Convert MDIO blocks to SEG-Y parts. + + This uses as a tree reduction algorithm. Blocks are written out + in parallel via multiple workers, and then adjacent blocks are + tracked and merged on disk via the `segy_concat` function. The + adjacent are hierarchically merged, and it preserves order. + + Assume array with shape (4, 3, 2) with chunk sizes (1, 1, 1, 2). + The chunk indices for this array would be: + + (0, 0, 0) (0, 1, 0) (0, 2, 0) + (1, 0, 0) (1, 1, 0) (1, 2, 0) + (2, 0, 0) (2, 1, 0) (2, 2, 0) + (3, 0, 0) (3, 1, 0) (3, 2, 0) + + let's rename them to this for convenience: + + a b c + d e f + g h i + j k l + + The tree gets formed this way: + a b c d e f g h i + \/ | \/ | \/ | + ab c de f gh i + \/ \/ \/ + abc def ghi + + The module will return file names associated with these + concatenated files. Then they can be combined to form the + sequence "abcdefghi" which is what we want. + + The above algorithm extrapolates to higher dimensions. Args: samples: Sample array. @@ -265,7 +306,7 @@ def to_segy( out_dtype: Desired output data type. out_byteorder: Desired output data byte order. file_root: Root directory to write partial SEG-Y files. - axis: Which axes to merge on. + axis: Which axes to merge on. Excluding sample axis. Returns: Array containing final, flattened SEG-Y blocks. @@ -292,7 +333,7 @@ def to_segy( result = _tree_reduce( trace_files, - segy_row_concat, + segy_concat, axis, keepdims=False, dtype=trace_files.dtype, diff --git a/src/mdio/segy/creation.py b/src/mdio/segy/creation.py index 3eea8f1d..9eb306f7 100644 --- a/src/mdio/segy/creation.py +++ b/src/mdio/segy/creation.py @@ -154,10 +154,10 @@ def write_to_segy_stack( """ # Make output array with string type. We need to know # the length of the string ahead of time. + # Last axis can be written as sequential, so we collapse that to 1. mock_path = path.join(file_root, uuid4().hex) - paths_shape = (live.shape[0],) + (1,) * (samples.ndim - 1) paths_dtype = f"U{len(mock_path)}" - + paths_shape = live.shape[:-1] + (1,) part_segy_paths = np.full(paths_shape, fill_value="missing", dtype=paths_dtype) # Fast path to return if no live traces. @@ -179,29 +179,27 @@ def write_to_segy_stack( }, ) - # Iterate first axis, rows, with headers and live mask. - # We will flatten along first axis to write to SEG-Y. - trace_parts = zip(samples, headers, live) - for row_idx, (row_samp, row_head, row_live) in enumerate(trace_parts): - row_n_live = np.count_nonzero(row_live) + # Iterate on N-1 axes of live mask. Last axis can be written + # without worrying about order because it is sequential. + for index in np.ndindex(live.shape[:-1]): + part_live = live[index] + num_live = np.count_nonzero(part_live) - if row_n_live == 0: + if num_live == 0: continue - # Generate file name and append to return list. + # Generate unique file name and append to return list. file_path = path.join(file_root, uuid4().hex) - part_segy_paths[row_idx] = file_path - - # Interleave traces - row_trace = np.empty(row_n_live, dtype=trace_dtype) + part_segy_paths[index] = file_path - row_trace["header"] = row_head[row_live] - row_trace["trace"] = row_samp[row_live] - row_trace["pad"] = 0 + # Interleave samples and headers + part_traces = np.empty(num_live, dtype=trace_dtype) + part_traces["header"] = headers[index][part_live] + part_traces["trace"] = samples[index][part_live] + part_traces["pad"] = 0 - # Write sequence. with open(file_path, mode="wb") as fp: - row_trace.tofile(fp) + part_traces.tofile(fp) return part_segy_paths