Commit 3ed44a1: newer cut logic

Altay Sansal committed Nov 15, 2022
1 parent dad85da

Showing 6 changed files with 135 additions and 151 deletions.
src/mdio/__init__.py (2 changes: 1 addition & 1 deletion)
@@ -22,4 +22,4 @@
 try:
     __version__ = metadata.version("multidimio")
 except metadata.PackageNotFoundError:
-    __version__ = "unknown"
\ No newline at end of file
+    __version__ = "unknown"
src/mdio/api/io_utils.py (2 changes: 1 addition & 1 deletion)
@@ -138,4 +138,4 @@ def open_zarr_array_dask(group_handle: zarr.Group, name: str, **kwargs) -> da.Array:
         Zarr array opened with Dask engine.
     """
     zarr_array = open_zarr_array(group_handle=group_handle, name=name)
-    return da.from_array(zarr_array, **kwargs)
+    return da.from_array(zarr_array, **kwargs, inline_array=True)
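
For context on the io_utils.py change: in Dask, inline_array=True embeds the source array into every chunk task instead of shipping it as one shared key that all tasks depend on, which removes a single root node from the task graph. A minimal sketch of the call, using a hypothetical in-memory Zarr array; shapes and chunks are made up for illustration:

    import dask.array as da
    import zarr

    # Hypothetical source array standing in for an MDIO-backed Zarr array.
    source = zarr.zeros((2048, 2048), chunks=(256, 256), dtype="float32")

    # Default: the zarr array is a single key in the graph that every
    # chunk task depends on.
    lazy_default = da.from_array(source, chunks=(256, 256))

    # inline_array=True: each chunk task carries its own reference to the
    # zarr array, so there is no shared root node in the task graph.
    lazy_inline = da.from_array(source, chunks=(256, 256), inline_array=True)

    print(lazy_inline.sum().compute())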
src/mdio/converters/mdio.py (113 changes: 47 additions & 66 deletions)
@@ -8,18 +8,16 @@

 import dask.array as da
 import numpy as np
-from dask.array.core import Array
-from dask.base import compute_as_if_collection
-from dask.highlevelgraph import HighLevelGraph
-from tqdm.dask import TqdmCallback
+from tqdm.auto import tqdm

 from mdio import MDIOReader
-from mdio.segy._workers import chunk_to_sgy_stack
 from mdio.segy.byte_utils import ByteOrder
 from mdio.segy.byte_utils import Dtype
 from mdio.segy.creation import concat_files
 from mdio.segy.creation import mdio_spec_to_segy
-from mdio.segy.creation import prepare_traces
+from mdio.segy.creation import prepare_headers
+from mdio.segy.creation import prepare_samples
+from mdio.segy.creation import write_to_segy_stack


 try:
@@ -114,7 +112,7 @@ def mdio_to_segy( # noqa: C901

     # We flatten the z-axis (time or depth); so ieee2ibm, and byte-swaps etc
     # can run on big chunks of data.
-    auto_chunk = (None,) * (ndim - 2) + ("100M",) + (-1,)
+    auto_chunk = (None,) * (ndim - 1) + ("100M",)
     new_chunks = new_chunks if new_chunks is not None else auto_chunk

     creation_args = [
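
A note on the new chunk spec: None tells Dask to keep an axis's current chunking, a byte-size string like "100M" asks it to choose chunk lengths on that axis targeting roughly that many bytes, and the dropped -1 previously forced the entire last (sample) axis into every chunk. A minimal sketch, assuming Dask's chunk normalization accepts per-axis byte-size strings as it does in recent releases; the volume shape and chunks are hypothetical:

    import dask.array as da

    # Hypothetical 3-D seismic volume (inline, crossline, sample).
    vol = da.zeros((600, 600, 1500), chunks=(64, 64, 512), dtype="float32")

    ndim = vol.ndim
    # None keeps an axis's existing chunking; "100M" lets Dask size the
    # last (sample) axis so blocks land near ~100 MB each.
    auto_chunk = (None,) * (ndim - 1) + ("100M",)

    rechunked = vol.rechunk(auto_chunk)
    print(rechunked.chunks)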
@@ -166,81 +164,64 @@ def mdio_to_segy( # noqa: C901
         selection_mask = selection_mask[dim_slices]
         live_mask = live_mask & selection_mask

-    # Build a Dask graph to do the computation
-    # Name of task. Using uuid1 is important because
-    # we could potentially generate these from different machines
-    write_task_name = "write_sgy_block"
-
     # Parse output type and byte order
     out_dtype = Dtype[out_sample_format.upper()]
     out_byteorder = ByteOrder[endian.upper()]

-    traces = da.blockwise(
-        prepare_traces,
-        "ij",
-        samples,
-        "ijk",
-        headers,
-        "ij",
-        concatenate=True,
+    samples_proc = samples.map_blocks(
+        prepare_samples,
         out_dtype=out_dtype,
         out_byteorder=out_byteorder,
     )
+    headers_proc = headers.map_blocks(
+        prepare_headers,
+        out_byteorder=out_byteorder,
+    )
+
+    trace_dtype = {
+        "names": ("header", "pad", "trace"),
+        "formats": [
+            headers_proc.dtype,
+            np.dtype("int64"),
+            samples_proc.shape[-1] * samples_proc.dtype,
+        ],
+    }

-    trace_keys = traces.__dask_keys__()
-    live_keys = live_mask.__dask_keys__()
+    trace_dtype = np.dtype(trace_dtype)

     # tmp file root
     out_dir = path.dirname(output_segy_path)
     tmp_dir = TemporaryDirectory(dir=out_dir)

-    task_graph_dict = {}
-    for row in range(live_mask.blocks.shape[0]):
-        for col in range(live_mask.blocks.shape[1]):
-            block_args = (
-                trace_keys[row][col],
-                live_keys[row][col],
-                tmp_dir.name,
-                row,
-                col,
-            )
-
-            task_graph_dict[(write_task_name, row, col)] = (
-                chunk_to_sgy_stack,
-            ) + block_args
-
-    # Make actual graph
-    task_graph = HighLevelGraph.from_collections(
-        write_task_name,
-        task_graph_dict,
-        dependencies=[traces, live_mask],
+    lazy_traces = da.map_blocks(
+        write_to_segy_stack,
+        samples_proc,
+        headers_proc[..., None],
+        live_mask[..., None],
+        file_root=tmp_dir.name,
+        trace_dtype=trace_dtype,
+        drop_axis=-1,
     )

-    # Note this doesn't work with distributed.
-    tqdm_kw = dict(unit="block", dynamic_ncols=True)
-    block_progress = TqdmCallback(desc="Step 1 / 2 Writing Blocks", **tqdm_kw)
-
+    tqdm_kw = dict(
+        desc="Writing Blocks",
+        total=lazy_traces.blocks.shape[0],
+        unit="block",
+        dynamic_ncols=True,
+    )
     with tmp_dir:
-        with block_progress:
-            results = compute_as_if_collection(
-                cls=Array,
-                dsk=task_graph,
-                keys=list(task_graph_dict),
-                scheduler=client,
-            )
+        for segy_block in tqdm(lazy_traces.blocks, **tqdm_kw):
+            partial_files = segy_block.compute()

-        concat_file_paths = [output_segy_path]
+            concat_file_paths = [output_segy_path]

-        concat_list = []
-        for block in results:
-            for file, exists in block:
-                if exists:
-                    concat_list.append(file)
+            partial_list = partial_files.ravel().tolist()
+            partial_list = [path.join(tmp_dir.name, file) for file in partial_list]
+            partial_list.sort()

-        concat_list.sort()
+            concat_file_paths += partial_list

-        concat_file_paths += concat_list
-
-        if client is not None:
-            _ = client.submit(concat_files, concat_file_paths).result()
-        else:
-            concat_files(concat_file_paths)
+            if client is not None:
+                _ = client.submit(concat_files, concat_file_paths).result()
+            else:
+                concat_files(concat_file_paths)
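
Two notes on the rewritten converter. First, the trace_dtype built above packs each trace's header, pad, and samples into a single NumPy structured element, so a whole block of traces serializes with one buffer write. A standalone sketch of the same construction; the 232-byte packed header and 1500-sample trace length are assumptions for illustration (232 plus the 8-byte pad matches SEG-Y's 240-byte trace header):

    import numpy as np

    # Assumed sizes, for illustration only.
    header_dtype = np.dtype("V232")  # hypothetical packed header
    num_samples = 1500
    sample_dtype = np.dtype("float32")

    # Same construction as the commit: one structured element per trace.
    trace_dtype = np.dtype(
        {
            "names": ("header", "pad", "trace"),
            "formats": [header_dtype, np.dtype("int64"), num_samples * sample_dtype],
        }
    )

    # A block of traces is contiguous in memory and writes in one call.
    block = np.zeros(100, dtype=trace_dtype)
    print(trace_dtype.itemsize)  # 232 + 8 + 1500 * 4 == 6240 bytes per trace
    block.tofile("/tmp/example.sgyblock")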
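Second, the TqdmCallback approach is dropped because, as the deleted comment notes, it does not report progress under a distributed scheduler. The replacement iterates the array's .blocks view and computes one slab at a time, so tqdm advances as real work finishes. A minimal sketch of that pattern, with a placeholder array and reduction:

    import dask.array as da
    from tqdm.auto import tqdm

    # Placeholder array standing in for the lazy SEG-Y trace blocks.
    lazy = da.random.random((4000, 4000), chunks=(500, 500))

    tqdm_kw = dict(unit="block", dynamic_ncols=True)
    results = []
    # Iterating .blocks yields one slab per chunk row; compute() blocks
    # until that slab finishes, so tqdm tracks completed work.
    for slab in tqdm(lazy.blocks, total=lazy.blocks.shape[0], **tqdm_kw):
        results.append(slab.mean().compute())

    print(sum(results) / len(results))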
src/mdio/converters/segy.py (1 change: 1 addition & 0 deletions)
@@ -23,6 +23,7 @@
 from mdio.segy.parsers import parse_text_header
 from mdio.segy.utilities import get_grid_plan

+
 try:
     API_VERSION = metadata.version("multidimio")
 except metadata.PackageNotFoundError:
src/mdio/segy/_workers.py (55 changes: 0 additions & 55 deletions)
@@ -3,15 +3,12 @@

 from __future__ import annotations

-from os import path
 from typing import Any
 from typing import Sequence
-from uuid import uuid1

 import numpy as np
 import segyio
 from numpy.typing import ArrayLike
-from numpy.typing import NDArray
 from zarr import Array

 from mdio.constants import UINT32_MAX
@@ -199,58 +196,6 @@ def trace_worker(
     return count, chunk_sum, chunk_sum_squares, min_val, max_val


-def traces_to_file(
-    traces: NDArray,
-    live: NDArray,
-    out_path: str,
-) -> None:
-    """Write traces to binary file.
-
-    Args:
-        traces: Trace data with headers in structured array.
-        live: Live mask.
-        out_path: Path to the output file.
-    """
-    with open(out_path, mode="wb") as fp:
-        traces[live].tofile(fp)
-
-
-def chunk_to_sgy_stack(
-    traces: NDArray,
-    live: NDArray,
-    out_root: str,
-    row: int,
-    col: int,
-) -> list[tuple[str, int]]:
-    """Convert a partial chunk (block) to stack of SEG-Y traces.
-
-    Args:
-        traces: Trace data.
-        live: Live mask.
-        out_root: Root directory for output file.
-        row: Row index of chunk block within full array.
-        col: Col index of chunk block within full array.
-
-    Returns:
-        List of (path, exists) tuples created in this function.
-    """
-    block_files = []
-
-    for idx, (t, l) in enumerate(zip(traces, live)):
-        f_name = f".{row:05d}_{idx:05d}_{col:05d}_{str(uuid1())}.sgyblock"
-        f_path = path.join(out_root, f_name)
-
-        if np.count_nonzero(l) == 0:
-            block_files.append((f_path, 0))
-            continue
-
-        block_files.append((f_path, 1))
-        traces_to_file(t, l, f_path)
-
-    return block_files
-
-
 # tqdm only works properly with pool.map
 # However, we need pool.starmap because we have more than one
 # argument to make pool.map work with multiple arguments, we