From 3ed44a1a5d6cb74ad91643f310b9507a94c57e75 Mon Sep 17 00:00:00 2001 From: Altay Sansal Date: Mon, 14 Nov 2022 21:49:40 -0600 Subject: [PATCH] newer cut logic --- src/mdio/__init__.py | 2 +- src/mdio/api/io_utils.py | 2 +- src/mdio/converters/mdio.py | 113 +++++++++++++++--------------------- src/mdio/converters/segy.py | 1 + src/mdio/segy/_workers.py | 55 ------------------ src/mdio/segy/creation.py | 113 +++++++++++++++++++++++++++--------- 6 files changed, 135 insertions(+), 151 deletions(-) diff --git a/src/mdio/__init__.py b/src/mdio/__init__.py index 499b6587..21397ee9 100644 --- a/src/mdio/__init__.py +++ b/src/mdio/__init__.py @@ -22,4 +22,4 @@ try: __version__ = metadata.version("multidimio") except metadata.PackageNotFoundError: - __version__ = "unknown" \ No newline at end of file + __version__ = "unknown" diff --git a/src/mdio/api/io_utils.py b/src/mdio/api/io_utils.py index 64d8356a..1d01a1bf 100644 --- a/src/mdio/api/io_utils.py +++ b/src/mdio/api/io_utils.py @@ -138,4 +138,4 @@ def open_zarr_array_dask(group_handle: zarr.Group, name: str, **kwargs) -> da.Ar Zarr array opened with Dask engine. """ zarr_array = open_zarr_array(group_handle=group_handle, name=name) - return da.from_array(zarr_array, **kwargs) + return da.from_array(zarr_array, **kwargs, inline_array=True) diff --git a/src/mdio/converters/mdio.py b/src/mdio/converters/mdio.py index 9f8fa2e1..5fcc0c4c 100644 --- a/src/mdio/converters/mdio.py +++ b/src/mdio/converters/mdio.py @@ -8,18 +8,16 @@ import dask.array as da import numpy as np -from dask.array.core import Array -from dask.base import compute_as_if_collection -from dask.highlevelgraph import HighLevelGraph -from tqdm.dask import TqdmCallback +from tqdm.auto import tqdm from mdio import MDIOReader -from mdio.segy._workers import chunk_to_sgy_stack from mdio.segy.byte_utils import ByteOrder from mdio.segy.byte_utils import Dtype from mdio.segy.creation import concat_files from mdio.segy.creation import mdio_spec_to_segy -from mdio.segy.creation import prepare_traces +from mdio.segy.creation import prepare_headers +from mdio.segy.creation import prepare_samples +from mdio.segy.creation import write_to_segy_stack try: @@ -114,7 +112,7 @@ def mdio_to_segy( # noqa: C901 # We flatten the z-axis (time or depth); so ieee2ibm, and byte-swaps etc # can run on big chunks of data. - auto_chunk = (None,) * (ndim - 2) + ("100M",) + (-1,) + auto_chunk = (None,) * (ndim - 1) + ("100M",) new_chunks = new_chunks if new_chunks is not None else auto_chunk creation_args = [ @@ -166,81 +164,64 @@ def mdio_to_segy( # noqa: C901 selection_mask = selection_mask[dim_slices] live_mask = live_mask & selection_mask - # Build a Dask graph to do the computation - # Name of task. 
Using uuid1 is important because - # we could potentially generate these from different machines - write_task_name = "write_sgy_block" - + # Parse output type and byte order out_dtype = Dtype[out_sample_format.upper()] out_byteorder = ByteOrder[endian.upper()] - traces = da.blockwise( - prepare_traces, - "ij", - samples, - "ijk", - headers, - "ij", - concatenate=True, + samples_proc = samples.map_blocks( + prepare_samples, out_dtype=out_dtype, out_byteorder=out_byteorder, ) + headers_proc = headers.map_blocks( + prepare_headers, + out_byteorder=out_byteorder, + ) + + trace_dtype = { + "names": ("header", "pad", "trace"), + "formats": [ + headers_proc.dtype, + np.dtype("int64"), + samples_proc.shape[-1] * samples_proc.dtype, + ], + } - trace_keys = traces.__dask_keys__() - live_keys = live_mask.__dask_keys__() + trace_dtype = np.dtype(trace_dtype) # tmp file root out_dir = path.dirname(output_segy_path) tmp_dir = TemporaryDirectory(dir=out_dir) - task_graph_dict = {} - for row in range(live_mask.blocks.shape[0]): - for col in range(live_mask.blocks.shape[1]): - block_args = ( - trace_keys[row][col], - live_keys[row][col], - tmp_dir.name, - row, - col, - ) - - task_graph_dict[(write_task_name, row, col)] = ( - chunk_to_sgy_stack, - ) + block_args - - # Make actual graph - task_graph = HighLevelGraph.from_collections( - write_task_name, - task_graph_dict, - dependencies=[traces, live_mask], + lazy_traces = da.map_blocks( + write_to_segy_stack, + samples_proc, + headers_proc[..., None], + live_mask[..., None], + file_root=tmp_dir.name, + trace_dtype=trace_dtype, + drop_axis=-1, ) - # Note this doesn't work with distributed. - tqdm_kw = dict(unit="block", dynamic_ncols=True) - block_progress = TqdmCallback(desc="Step 1 / 2 Writing Blocks", **tqdm_kw) - + tqdm_kw = dict( + desc="Writing Blocks", + total=lazy_traces.blocks.shape[0], + unit="block", + dynamic_ncols=True, + ) with tmp_dir: - with block_progress: - results = compute_as_if_collection( - cls=Array, - dsk=task_graph, - keys=list(task_graph_dict), - scheduler=client, - ) + for segy_block in tqdm(lazy_traces.blocks, **tqdm_kw): + partial_files = segy_block.compute() - concat_file_paths = [output_segy_path] + concat_file_paths = [output_segy_path] - concat_list = [] - for block in results: - for file, exists in block: - if exists: - concat_list.append(file) + partial_list = partial_files.ravel().tolist() + partial_list = [path.join(tmp_dir.name, file) for file in partial_list] + partial_list.sort() - concat_list.sort() + concat_file_paths += partial_list - concat_file_paths += concat_list - - if client is not None: - _ = client.submit(concat_files, concat_file_paths).result() - else: - concat_files(concat_file_paths) + if client is not None: + _ = client.submit(concat_files, concat_file_paths).result() + else: + concat_files(concat_file_paths) diff --git a/src/mdio/converters/segy.py b/src/mdio/converters/segy.py index bc6252b6..97308fd7 100644 --- a/src/mdio/converters/segy.py +++ b/src/mdio/converters/segy.py @@ -23,6 +23,7 @@ from mdio.segy.parsers import parse_text_header from mdio.segy.utilities import get_grid_plan + try: API_VERSION = metadata.version("multidimio") except metadata.PackageNotFoundError: diff --git a/src/mdio/segy/_workers.py b/src/mdio/segy/_workers.py index 0ec49520..135cf8ba 100644 --- a/src/mdio/segy/_workers.py +++ b/src/mdio/segy/_workers.py @@ -3,15 +3,12 @@ from __future__ import annotations -from os import path from typing import Any from typing import Sequence -from uuid import uuid1 import numpy as np 
import segyio from numpy.typing import ArrayLike -from numpy.typing import NDArray from zarr import Array from mdio.constants import UINT32_MAX @@ -199,58 +196,6 @@ def trace_worker( return count, chunk_sum, chunk_sum_squares, min_val, max_val -def traces_to_file( - traces: NDArray, - live: NDArray, - out_path: str, -) -> None: - """Write traces to binary file. - - Args: - traces: Trace data with headers in structured array. - live: Live mask. - out_path: Path to the output file. - """ - with open(out_path, mode="wb") as fp: - traces[live].tofile(fp) - - -def chunk_to_sgy_stack( - traces: NDArray, - live: NDArray, - out_root: str, - row: int, - col: int, -) -> list[tuple[str, int]]: - """Convert a partial chunk (block) to stack of SEG-Y traces. - - Args: - traces: Trace data. - live: Live mask. - out_root: Root directory for output file. - row: Row index of chunk block within full array. - col: Col index of chunk block within full array. - - Returns: - List of (path, exists) tuples created in this function. - - """ - block_files = [] - - for idx, (t, l) in enumerate(zip(traces, live)): - f_name = f".{row:05d}_{idx:05d}_{col:05d}_{str(uuid1())}.sgyblock" - f_path = path.join(out_root, f_name) - - if np.count_nonzero(l) == 0: - block_files.append((f_path, 0)) - continue - - block_files.append((f_path, 1)) - traces_to_file(t, l, f_path) - - return block_files - - # tqdm only works properly with pool.map # However, we need pool.starmap because we have more than one # argument to make pool.map work with multiple arguments, we diff --git a/src/mdio/segy/creation.py b/src/mdio/segy/creation.py index de457f0a..35dbb599 100644 --- a/src/mdio/segy/creation.py +++ b/src/mdio/segy/creation.py @@ -4,6 +4,7 @@ from __future__ import annotations import os +from os import path from shutil import copyfileobj from time import sleep @@ -116,33 +117,106 @@ def mdio_spec_to_segy( return mdio, out_sample_format -def interleave_traces( +def write_to_segy_stack( samples: NDArray, headers: NDArray, + live: NDArray, trace_dtype: DTypeLike, + file_root: str, + block_info=None, ) -> NDArray: """Interleave separate headers and traces together. Args: samples: Array containing the trace samples. headers: Array containing the trace headers. + live: Array containing the trace live mask. trace_dtype: Structured dtype of the interleaved output. + file_root: Root directory to write partial SEG-Y files. + block_info: Dask array specific metadata. Returns: - Traces with headers and samples combined into structured array. + Jagged array containing file names for partial data. """ - traces = np.empty(headers.shape, dtype=trace_dtype) + headers = np.squeeze(headers) + live = np.squeeze(live) - traces["header"] = headers - traces["trace"] = samples - traces["pad"] = 0 + if block_info is None: + return np.empty(live.shape, dtype="object") - return traces + # Get global position from sample data, ignore last sample axis. + array_loc = block_info[0]["array-location"][:-1] + # Iterate first axis (to be flattened) with headers and live mask. + valid_files = [] + for idx, (line_samp, line_hdr, line_live) in enumerate(zip(samples, headers, live)): + n_live = np.count_nonzero(line_live) -def prepare_traces( + if n_live == 0: + continue + + # Generate first axis sequence number. + # Generate rest of the axes sequence start numbers. + first_dim_sequence = [f"{array_loc[0][0] + idx:09}"] + other_dim_sequence = [f"{loc[0]:09}" for loc in array_loc[1:]] + + # Generate file name and append to return list. 
+ line_idx = first_dim_sequence + other_dim_sequence + file_name = "_".join(line_idx) + file_path = path.join(file_root, file_name) + valid_files.append(file_name) + + # Interleave traces + line_trc = np.empty(n_live, dtype=trace_dtype) + + line_trc["header"] = line_hdr[line_live] + line_trc["trace"] = line_samp[line_live] + line_trc["pad"] = 0 + + # Write sequence. + with open(file_path, mode="wb") as fp: + line_trc.tofile(fp) + + return np.asarray([valid_files], dtype="object") + + +def check_byteswap(array: NDArray, out_byteorder: ByteOrder) -> NDArray: + """Check input byteorder and swap if user wants the other. + + Args: + array: Array containing the data. + out_byteorder: Desired output data byte order. + + Returns: + Original or byte-order swapped array. + """ + in_byteorder = get_byteorder(array) + + if in_byteorder != out_byteorder: + array.byteswap(inplace=True) + array = array.newbyteorder() + + return array + + +def prepare_headers(headers: NDArray, out_byteorder: ByteOrder) -> NDArray: + """Prepare headers to be written to SEG-Y. + + They usually only need a byte-swap. + + Args: + headers: Array containing the trace headers. + out_byteorder: Desired output data byte order. + + Returns: + New header array with pre-processing applied. + """ + headers = check_byteswap(headers, out_byteorder) + return headers + + +def prepare_samples( samples: NDArray, - headers: NDArray, out_dtype: Dtype, out_byteorder: ByteOrder, ) -> NDArray: @@ -153,7 +227,6 @@ def prepare_traces( Args: samples: Array containing the trace samples. - headers: Array containing the trace headers. out_dtype: Desired output data type. out_byteorder: Desired output data byte order. @@ -166,25 +239,9 @@ def prepare_traces( else: samples = samples.astype(out_dtype, copy=False) - trace_dtype = { - "names": ("header", "pad", "trace"), - "formats": [ - headers.dtype, - np.dtype("int64"), - samples.shape[-1] * samples.dtype, - ], - } - trace_dtype = np.dtype(trace_dtype) - - traces = interleave_traces(samples, headers, trace_dtype) - - in_byteorder = get_byteorder(headers) - - if in_byteorder != out_byteorder: - traces.byteswap(inplace=True) - traces = traces.newbyteorder() + samples = check_byteswap(samples, out_byteorder) - return traces + return samples # TODO: Abstract this to support various implementations by
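
The new export path above replaces the hand-built Dask HighLevelGraph with map_blocks calls that write one partial file per block and then concatenate the partials in order. Below is a minimal standalone sketch of that pattern, assuming only standard dask and numpy; write_block, the ".part" naming, and the toy data are hypothetical stand-ins for write_to_segy_stack and its zero-padded sequence names, not the MDIO implementation.

import os
from tempfile import TemporaryDirectory

import dask.array as da
import numpy as np


def write_block(block, file_root, block_info=None):
    """Write one block to its own partial file; return the file name.

    Illustrative stand-in for write_to_segy_stack; not the MDIO API.
    """
    if block_info is None:
        # Dask calls the function once without block_info to infer metadata.
        return np.empty((1,), dtype="object")

    # Global start index of this block along the first axis.
    start = block_info[0]["array-location"][0][0]
    file_name = f"{start:09d}.part"

    block.tofile(os.path.join(file_root, file_name))
    return np.asarray([file_name], dtype="object")


data = da.arange(1_000, chunks=100)

with TemporaryDirectory() as tmp_dir:
    names = da.map_blocks(
        write_block, data, file_root=tmp_dir, dtype="object", chunks=(1,)
    ).compute()
    # Zero-padded names sort into global order, ready for concatenation.
    print(sorted(names.tolist()))

Zero-padding the global block offset in each partial file name is what lets a plain lexicographic sort restore global trace order before concat_files stitches the partials back together.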