Merge pull request #109 from TGSAI/feature/refactor_segy_export
Refactor SEG-Y export to optimize memory usage.
tasansal authored Nov 26, 2022
2 parents 6f95fa5 + 8253db7 commit 48c1817
Showing 15 changed files with 564 additions and 303 deletions.
docker/README.md (5 changes: 5 additions & 0 deletions)
@@ -46,6 +46,11 @@ Now you can:
- Use the container as a remote interpreter.
- Run MDIO developer tools like tests, build docs, etc.

> **NOTE**:
> MDIO doesn't have to be installed here. As long as `$SRC_DIR` from the host is
> mounted to `/mdio-python` in the container, the `PYTHONPATH` variable lets the
> development environment's interpreter know where to find MDIO.
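
A quick way to verify this from inside the container (illustrative snippet; it assumes the mount and `PYTHONPATH` described in the note above):

```python
# Run inside the dev container. With $SRC_DIR mounted at /mdio-python and
# PYTHONPATH=/mdio-python/src, the live source tree is importable even though
# no MDIO package has been pip-installed.
import sys

print("/mdio-python/src" in sys.path)  # True when PYTHONPATH is set

import mdio  # resolved from the mounted source tree, not site-packages

print(mdio.__file__)  # e.g. /mdio-python/src/mdio/__init__.py
```
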
### Running Developer Tools

Since the container now has developer tools, they can be executed with this pattern:
docker/slim-bullseye-3.10-dev.Dockerfile (11 changes: 7 additions & 4 deletions)
@@ -31,20 +31,23 @@ RUN pip install \
--no-ansi

# Install Git
FROM python:3.10-slim-bullseye as git_base
FROM python:3.10-slim-bullseye as system_tools

RUN apt-get update \
&& apt-get install -y --no-install-recommends git \
&& apt-get install -y --no-install-recommends \
git \
graphviz \
&& rm -rf /var/lib/apt/lists/*

# Final Stage (git + venv)
FROM git_base
FROM system_tools

ENV PYTHONFAULTHANDLER=1 \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1 \
PATH="/opt/venv/bin:$PATH" \
SHELL=/bin/bash
SHELL=/bin/bash \
PYTHONPATH=/mdio-python/src

COPY --from=venv_base --chmod=777 /opt/venv /opt/venv

src/mdio/__init__.py (5 changes: 4 additions & 1 deletion)
@@ -19,4 +19,7 @@
]


__version__ = metadata.version("multidimio")
try:
__version__ = metadata.version("multidimio")
except metadata.PackageNotFoundError:
__version__ = "unknown"
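
The fallback matters in exactly the dev-container scenario described in `docker/README.md`: `metadata.version` looks up an *installed* distribution named `multidimio`, which doesn't exist when MDIO is imported straight from a mounted source tree. A standalone sketch of the behavior:

```python
# importlib.metadata resolves versions from installed dist-info metadata, so
# importing from a plain source tree (no `pip install`) raises
# PackageNotFoundError, which this pattern turns into a placeholder version.
from importlib import metadata

try:
    version = metadata.version("multidimio")
except metadata.PackageNotFoundError:
    version = "unknown"  # e.g. running via PYTHONPATH without installation

print(version)
```
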
src/mdio/api/io_utils.py (2 changes: 1 addition & 1 deletion)
@@ -138,4 +138,4 @@ def open_zarr_array_dask(group_handle: zarr.Group, name: str, **kwargs) -> da.Array
Zarr array opened with Dask engine.
"""
zarr_array = open_zarr_array(group_handle=group_handle, name=name)
return da.from_array(zarr_array, **kwargs)
return da.from_array(zarr_array, **kwargs, inline_array=True)
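
For context on this one-liner: with the default `inline_array=False`, `da.from_array` inserts the Zarr array into the task graph once, as a standalone key that every chunk task references; `inline_array=True` embeds the array in each task instead, which typically yields smaller, better-serializing graphs on distributed schedulers. A usage sketch (the store path and array name are placeholders):

```python
# Illustrative only: "example.zarr" and "data" are made-up names.
import dask.array as da
import zarr

group = zarr.open_group("example.zarr", mode="r")
zarr_array = group["data"]

# inline_array=True: the zarr array rides along inside every chunk task
# rather than being one shared graph key all tasks must dereference.
darr = da.from_array(zarr_array, chunks=zarr_array.chunks, inline_array=True)
print(darr.mean().compute())
```
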
src/mdio/converters/mdio.py (127 changes: 36 additions & 91 deletions)
@@ -3,20 +3,19 @@

from __future__ import annotations

import uuid
from os import path
from tempfile import TemporaryDirectory

import numpy as np
from dask.array.core import Array
from dask.base import compute_as_if_collection
from dask.core import flatten
from dask.highlevelgraph import HighLevelGraph
from tqdm.dask import TqdmCallback

from mdio import MDIOReader
from mdio.segy._workers import write_block_to_segy
from mdio.segy.blocked_io import to_segy
from mdio.segy.byte_utils import ByteOrder
from mdio.segy.byte_utils import Dtype
from mdio.segy.creation import concat_files
from mdio.segy.creation import mdio_spec_to_segy
from mdio.segy.creation import merge_partial_segy
from mdio.segy.utilities import segy_export_rechunker


try:
@@ -25,7 +24,7 @@
distributed = None


def mdio_to_segy(
def mdio_to_segy( # noqa: C901
mdio_path_or_buffer: str,
output_segy_path: str,
endian: str = "big",
@@ -62,7 +61,7 @@ def mdio_to_segy(
access_pattern: This specifies the chunk access pattern. Underlying
zarr.Array must exist. Examples: '012', '01'
out_sample_format: Output sample format.
Currently supported: {'ibm32', 'ieee32'}. Default is 'ibm32'.
Currently supported: {'ibm32', 'float32'}. Default is 'ibm32'.
storage_options: Storage options for the cloud storage backend.
Default: None (will assume anonymous access)
new_chunks: Set manual chunksize. For development purposes only.
@@ -95,7 +94,7 @@ def mdio_to_segy(
... mdio_path_or_buffer="prefix2/file.mdio",
... output_segy_path="prefix/file.segy",
... selection_mask=boolean_mask,
... out_sample_format="ieee32",
... out_sample_format="float32",
... )
"""
@@ -107,12 +106,8 @@
storage_options=storage_options,
)

ndim = mdio.n_dim

# We flatten the z-axis (time or depth) so that ieee2ibm, byte-swaps, etc.
# can run on big chunks of data.
auto_chunk = (None,) * (ndim - 2) + ("100M",) + (-1,)
new_chunks = new_chunks if new_chunks is not None else auto_chunk
if new_chunks is None:
new_chunks = segy_export_rechunker(mdio.chunks, mdio.shape, mdio._traces.dtype)

creation_args = [
mdio_path_or_buffer,
@@ -137,8 +132,6 @@
else:
mdio, sample_format = mdio_spec_to_segy(*creation_args)

num_samp = mdio.shape[-1]

live_mask = mdio.live_mask.compute()

if selection_mask is not None:
@@ -158,84 +151,36 @@
dim_slices += (slice(start, stop),)

# Lazily pull the data with limits now, and limit the mask so it's the same shape.
live_mask, headers, traces = mdio[dim_slices]
live_mask, headers, samples = mdio[dim_slices]
live_mask = live_mask.rechunk(headers.chunksize)

if selection_mask is not None:
selection_mask = selection_mask[dim_slices]
live_mask = live_mask & selection_mask

# Now we flatten the data in the slowest changing axis (i.e. 0)
# TODO: Add support for flipping these, if user wants
axis = 0

# Get new chunksizes for sequential array
seq_trc_chunks = tuple(
(dim_chunks if idx == axis else (sum(dim_chunks),))
for idx, dim_chunks in enumerate(traces.chunks)
)

# We must unify chunks with "trc_chunks" here because
# headers and live mask may have different chunking.
# We don't take the time axis for headers / live
# Still lazy computation
traces_seq = traces.rechunk(seq_trc_chunks)
headers_seq = headers.rechunk(seq_trc_chunks[:-1])
live_seq = live_mask.rechunk(seq_trc_chunks[:-1])

# Build a Dask graph to do the computation
# Name of task. Using uuid1 is important because
# we could potentially generate these from different machines
task_name = "block-to-sgy-part-" + str(uuid.uuid1())

trace_keys = flatten(traces_seq.__dask_keys__())
header_keys = flatten(headers_seq.__dask_keys__())
live_keys = flatten(live_seq.__dask_keys__())

all_keys = zip(trace_keys, header_keys, live_keys)
# Parse output type and byte order
out_dtype = Dtype[out_sample_format.upper()]
out_byteorder = ByteOrder[endian.upper()]

# tmp file root
out_dir = path.dirname(output_segy_path)

task_graph_dict = {}
block_file_paths = []
for idx, (trace_key, header_key, live_key) in enumerate(all_keys):
block_file_name = f".{idx}_{uuid.uuid1()}._segyblock"
block_file_path = path.join(out_dir, block_file_name)
block_file_paths.append(block_file_path)

block_args = (
block_file_path,
trace_key,
header_key,
live_key,
num_samp,
sample_format,
endian,
)

task_graph_dict[(task_name, idx)] = (write_block_to_segy,) + block_args

# Make actual graph
task_graph = HighLevelGraph.from_collections(
task_name,
task_graph_dict,
dependencies=[traces_seq, headers_seq, live_seq],
)

# Note this doesn't work with distributed.
tqdm_kw = dict(unit="block", dynamic_ncols=True)
block_progress = TqdmCallback(desc="Step 1 / 2 Writing Blocks", **tqdm_kw)

with block_progress:
block_exists = compute_as_if_collection(
cls=Array,
dsk=task_graph,
keys=list(task_graph_dict),
scheduler=client,
)

merge_args = [output_segy_path, block_file_paths, block_exists]
if client is not None:
_ = client.submit(merge_partial_segy, *merge_args).result()
else:
merge_partial_segy(*merge_args)
tmp_dir = TemporaryDirectory(dir=out_dir)

with tmp_dir:
with TqdmCallback(desc="Unwrapping MDIO Blocks"):
flat_files = to_segy(
samples=samples,
headers=headers,
live_mask=live_mask,
out_dtype=out_dtype,
out_byteorder=out_byteorder,
file_root=tmp_dir.name,
axis=tuple(range(1, samples.ndim)),
).compute()

final_concat = [output_segy_path] + flat_files.tolist()

if client is not None:
_ = client.submit(concat_files, final_concat).result()
else:
concat_files(final_concat, progress=True)
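
Net effect of this file's rewrite: rather than hand-assembling a `HighLevelGraph` of `write_block_to_segy` tasks, the export now rechunks with `segy_export_rechunker`, turns blocks into flat SEG-Y pieces via `to_segy`, and stitches them together with `concat_files`. The deleted auto-chunk line hints at what the rechunk step aims for; here is that idea as a standalone sketch (the helper below is illustrative, not the real `segy_export_rechunker`):

```python
# Illustrative rechunking for export: keep the sample (last) axis whole so
# dtype conversion and byte-swapping see complete traces, and let Dask size
# the next-to-last axis at roughly 100 MB blocks.
import dask.array as da


def rechunk_for_export(traces: da.Array) -> da.Array:
    ndim = traces.ndim
    # None = leave axis chunks alone; "100M" = ~100 MB auto-sized blocks;
    # -1 = collapse the axis into a single chunk.
    new_chunks = (None,) * (ndim - 2) + ("100M",) + (-1,)
    return traces.rechunk(new_chunks)


darr = da.zeros((200, 300, 1500), chunks=(64, 64, 64), dtype="float32")
print(rechunk_for_export(darr).chunks)  # sample axis becomes a single chunk
```
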
src/mdio/converters/segy.py (6 changes: 5 additions & 1 deletion)
@@ -24,7 +24,11 @@
from mdio.segy.utilities import get_grid_plan


API_VERSION = metadata.version("multidimio")
try:
API_VERSION = metadata.version("multidimio")
except metadata.PackageNotFoundError:
API_VERSION = "unknown"

BACKENDS = ["s3", "gcs", "gs", "az", "abfs"]


src/mdio/segy/_standards_common.py (4 changes: 2 additions & 2 deletions)
@@ -10,8 +10,8 @@ class SegyFloatFormat(IntEnum):
IBM32 = 1
INT32 = 2
INT16 = 3
IEEE32 = 5
IEEE64 = 6
FLOAT32 = 5
FLOAT64 = 6
INT8 = 8
INT64 = 9
UINT32 = 10
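
The rename tracks the SEG-Y standard's sample-format codes (5 is 4-byte IEEE float, 6 is 8-byte) while matching the user-facing `out_sample_format="float32"` spelling used in the converter above. A self-contained lookup sketch mirroring the enum as changed here:

```python
# Mirror of SegyFloatFormat after this change; values are the SEG-Y binary
# header sample-format codes.
from enum import IntEnum


class SegyFloatFormat(IntEnum):
    IBM32 = 1
    INT32 = 2
    INT16 = 3
    FLOAT32 = 5
    FLOAT64 = 6
    INT8 = 8
    INT64 = 9
    UINT32 = 10


# Map a format code read from a binary header straight to a member:
print(SegyFloatFormat(5).name)           # FLOAT32
print(SegyFloatFormat["FLOAT32"].value)  # 5
```
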
(The remaining 8 changed files are not shown here.)
