optimized segy export logic for N-D datasets
tasansal committed Nov 19, 2022
1 parent 3d9108e commit 47ea835
Showing 2 changed files with 75 additions and 36 deletions.
77 changes: 59 additions & 18 deletions src/mdio/segy/blocked_io.py
@@ -210,40 +210,48 @@ def to_zarr(
     return stats
 
 
-def segy_row_concat(
-    partial_rows: NDArray,
+def segy_concat(
+    partial_files: NDArray,
     axis: tuple[int] = None,
     keepdims: bool = None,
 ) -> NDArray:
     """Aggregate partial SEG-Y blocks on disk, preserving order.
 
     Used in conjunction with tree reduction. It takes an array of
     file names, which preserves the adjacency of blocks, and then
     combines adjacent blocks while flattening for SEG-Y.
 
     For the `axis` and `keepdims` parameters, see the
     `dask.array.reduction` documentation.
 
     Args:
-        partial_rows: Array containing paths to parts of a SEG-Y row.
+        partial_files: Array containing paths to parts of a SEG-Y row.
         axis: Which axes to concatenate on.
         keepdims: Keep the original dimensionality after merging.
 
     Returns:
         Concatenated file name array. Dimensions depend on `keepdims`.
     """
-    n_rows = partial_rows.shape[0]
-    concat_rows = np.full_like(partial_rows, fill_value="missing", shape=n_rows)
+    concat_shape = partial_files.shape[0]
+    concat_paths = np.full_like(partial_files, fill_value="missing", shape=concat_shape)
 
     # Fast path if all data in block is missing.
-    if np.all(partial_rows == "missing"):
-        return np.expand_dims(concat_rows, axis) if keepdims else concat_rows
+    if np.all(partial_files == "missing"):
+        return np.expand_dims(concat_paths, axis) if keepdims else concat_paths
 
-    # Concat rows and update concat_rows result.
-    for idx, row_parts in enumerate(partial_rows):
-        row_missing = row_parts == "missing"
+    # Flatten and concat section files to a single root file at first axis.
+    for index, section_paths in enumerate(partial_files):
+        section_paths = section_paths.ravel()
+        section_missing = section_paths == "missing"
 
-        if np.all(row_missing):
+        if np.all(section_missing):
             continue
 
-        row_valid_paths = np.extract(~row_missing, row_parts).tolist()
-        row_concat_file = concat_files(row_valid_paths)
-        concat_rows[idx] = row_concat_file
+        section_valid_paths = np.extract(~section_missing, section_paths).tolist()
+        section_concat_files = concat_files(section_valid_paths)
+        concat_paths[index] = section_concat_files
 
-    return np.expand_dims(concat_rows, axis) if keepdims else concat_rows
+    return np.expand_dims(concat_paths, axis) if keepdims else concat_paths
 
 
 def to_segy(
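
To make the merge semantics concrete, here is a minimal, runnable sketch of what one `segy_concat` call does to a block of partial-file paths. The `concat_files` helper here is a stand-in: in MDIO it appends the partial binary files on disk and returns the surviving path, while in this sketch string joining stands in for that merge so the ordering stays visible.

import numpy as np

def concat_files(paths):
    # Stand-in for MDIO's on-disk merger; joining names shows the
    # order in which the real helper would append the files.
    return "+".join(paths)

# One reduction block: 2 sections, each holding 2 partial-file paths.
# "missing" marks blocks that had no live traces.
partial_files = np.array(
    [["part_a", "part_b"],
     ["part_c", "missing"]],
    dtype="U32",
)

# Mirrors segy_concat: one merged path per first-axis entry.
concat_paths = np.full_like(partial_files, fill_value="missing", shape=2)
for index, section_paths in enumerate(partial_files):
    section_paths = section_paths.ravel()
    section_missing = section_paths == "missing"
    if np.all(section_missing):
        continue
    valid_paths = np.extract(~section_missing, section_paths).tolist()
    concat_paths[index] = concat_files(valid_paths)

print(concat_paths)  # ['part_a+part_b' 'part_c']
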
@@ -255,7 +263,40 @@ def to_segy(
     file_root: str,
     axis: tuple[int] | None = None,
 ) -> Array:
-    """Convert MDIO blocks to SEG-Y parts.
+    r"""Convert MDIO blocks to SEG-Y parts.
+
+    This uses a tree reduction algorithm. Blocks are written out
+    in parallel via multiple workers, and then adjacent blocks are
+    tracked and merged on disk via the `segy_concat` function.
+    Adjacent blocks are merged hierarchically, and order is preserved.
+
+    Assume an array with shape (4, 3, 2) and chunk sizes (1, 1, 2).
+    The chunk indices for this array would be:
+
+    (0, 0, 0) (0, 1, 0) (0, 2, 0)
+    (1, 0, 0) (1, 1, 0) (1, 2, 0)
+    (2, 0, 0) (2, 1, 0) (2, 2, 0)
+    (3, 0, 0) (3, 1, 0) (3, 2, 0)
+
+    Let's rename them to this for convenience:
+
+    a b c
+    d e f
+    g h i
+    j k l
+
+    The tree gets formed this way (the fourth row follows the same
+    pattern):
+
+    a b c  d e f  g h i
+    \ / |  \ / |  \ / |
+    ab  c  de  f  gh  i
+     \  /   \  /   \  /
+     abc    def    ghi
+
+    The function returns the file names of these concatenated files.
+    They can then be combined to form the sequence "abcdefghi", which
+    is what we want.
+
+    The same algorithm extrapolates to higher dimensions.
 
     Args:
         samples: Sample array.
@@ -265,7 +306,7 @@
         out_dtype: Desired output data type.
         out_byteorder: Desired output data byte order.
         file_root: Root directory to write partial SEG-Y files.
-        axis: Which axes to merge on.
+        axis: Which axes to merge on, excluding the sample axis.
 
     Returns:
         Array containing final, flattened SEG-Y blocks.
@@ -292,7 +333,7 @@ def to_segy(

     result = _tree_reduce(
         trace_files,
-        segy_row_concat,
+        segy_concat,
         axis,
         keepdims=False,
         dtype=trace_files.dtype,
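
As a rough illustration of the order-preserving tree reduction described in the docstring above, here is a toy sketch. `tree_concat` is hypothetical (the real code relies on dask's internal `_tree_reduce` and merges files on disk); plain string concatenation stands in for the merge.

def tree_concat(items, split_every=2):
    # Pairwise (tree) merge that preserves left-to-right order,
    # mimicking how adjacent partial SEG-Y files are combined.
    while len(items) > 1:
        items = [
            "".join(items[i : i + split_every])
            for i in range(0, len(items), split_every)
        ]
    return items[0]

# One row of the docstring example: chunks a, b, c -> "ab", "c" -> "abc".
print(tree_concat(list("abc")))  # abc

# Reducing each row, then combining the row results, yields the full
# flattened sequence, as the docstring describes.
rows = [tree_concat(list(row)) for row in ("abc", "def", "ghi")]
print("".join(rows))  # abcdefghi
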
34 changes: 16 additions & 18 deletions src/mdio/segy/creation.py
@@ -154,10 +154,10 @@ def write_to_segy_stack(
"""
# Make output array with string type. We need to know
# the length of the string ahead of time.
# Last axis can be written as sequential, so we collapse that to 1.
mock_path = path.join(file_root, uuid4().hex)
paths_shape = (live.shape[0],) + (1,) * (samples.ndim - 1)
paths_dtype = f"U{len(mock_path)}"

paths_shape = live.shape[:-1] + (1,)
part_segy_paths = np.full(paths_shape, fill_value="missing", dtype=paths_dtype)

# Fast path to return if no live traces.
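
A quick sketch of what the new `paths_shape` logic does for an N-D live mask (toy shape; the real shapes come from the MDIO grid):

import numpy as np

# A 4 x 3 grid of blocks with 2 traces along the last (sequential) axis.
live = np.ones((4, 3, 2), dtype=bool)

# The last axis is written sequentially into one file, so it collapses to 1.
paths_shape = live.shape[:-1] + (1,)
print(paths_shape)  # (4, 3, 1)

# np.ndindex walks the N-1 leading axes in C order, preserving trace order.
for index in np.ndindex(live.shape[:-1]):
    part_live = live[index]  # 1-D slice along the sequential axis
    assert part_live.shape == (2,)
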
@@ -179,29 +179,27 @@
         },
     )
 
-    # Iterate first axis, rows, with headers and live mask.
-    # We will flatten along first axis to write to SEG-Y.
-    trace_parts = zip(samples, headers, live)
-    for row_idx, (row_samp, row_head, row_live) in enumerate(trace_parts):
-        row_n_live = np.count_nonzero(row_live)
+    # Iterate over the N-1 leading axes of the live mask. The last axis
+    # can be written without worrying about order because it is sequential.
+    for index in np.ndindex(live.shape[:-1]):
+        part_live = live[index]
+        num_live = np.count_nonzero(part_live)
 
-        if row_n_live == 0:
+        if num_live == 0:
             continue
 
-        # Generate file name and append to return list.
+        # Generate a unique file name and append it to the return array.
         file_path = path.join(file_root, uuid4().hex)
-        part_segy_paths[row_idx] = file_path
-
-        # Interleave traces
-        row_trace = np.empty(row_n_live, dtype=trace_dtype)
-
-        row_trace["header"] = row_head[row_live]
-        row_trace["trace"] = row_samp[row_live]
-        row_trace["pad"] = 0
+        part_segy_paths[index] = file_path
+
+        # Interleave samples and headers.
+        part_traces = np.empty(num_live, dtype=trace_dtype)
+        part_traces["header"] = headers[index][part_live]
+        part_traces["trace"] = samples[index][part_live]
+        part_traces["pad"] = 0
 
         # Write sequence.
         with open(file_path, mode="wb") as fp:
-            row_trace.tofile(fp)
+            part_traces.tofile(fp)
 
     return part_segy_paths
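
For reference, here is a hedged sketch of the interleaving step. The field names ("header", "pad", "trace") come from the code above, but the exact field sizes are assumptions for illustration; the real `trace_dtype` is assembled earlier in creation.py from the dataset.

import numpy as np

num_live = 3     # live traces in this block (toy value)
num_samples = 4  # samples per trace (toy value)

# Assumed layout: 236 header bytes + 4 pad bytes = a 240-byte SEG-Y
# trace header, followed by the samples.
trace_dtype = np.dtype(
    {
        "names": ["header", "pad", "trace"],
        "formats": [("i2", 118), "i4", ("f4", num_samples)],
    }
)

part_traces = np.empty(num_live, dtype=trace_dtype)
part_traces["header"] = 0
part_traces["pad"] = 0
part_traces["trace"] = np.arange(num_live * num_samples).reshape(num_live, -1)

# tofile() would write these records back-to-back: header, pad, samples,
# header, pad, samples, ... tobytes() produces the same byte stream.
raw = part_traces.tobytes()
assert len(raw) == num_live * (240 + 4 * num_samples)
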

