Reduce memory consumption during SEG-Y export #34

Closed
tasansal opened this issue Sep 1, 2022 · 2 comments · Fixed by #109
tasansal commented Sep 1, 2022

The distributed workers flatten the chunks along the first dimension to write to SEG-Y.

For huge files (>2 TB), this uses a lot of memory during export.

The output sharding strategy needs to be optimized:

# We must unify chunks with "trc_chunks" here because
# headers and live mask may have different chunking.
# We don't take the time axis for headers / live
# Still lazy computation
traces_seq = traces.rechunk(seq_trc_chunks)
headers_seq = headers.rechunk(seq_trc_chunks[:-1])
live_seq = live_mask.rechunk(seq_trc_chunks[:-1])

# Build a Dask graph to do the computation
# Name of task. Using uuid1 is important because
# we could potentially generate these from different machines
task_name = "block-to-sgy-part-" + str(uuid.uuid1())

trace_keys = flatten(traces_seq.__dask_keys__())
header_keys = flatten(headers_seq.__dask_keys__())
live_keys = flatten(live_seq.__dask_keys__())

all_keys = zip(trace_keys, header_keys, live_keys)

# tmp file root
out_dir = path.dirname(output_segy_path)

task_graph_dict = {}
block_file_paths = []
for idx, (trace_key, header_key, live_key) in enumerate(all_keys):
    block_file_name = f".{idx}_{uuid.uuid1()}._segyblock"
    block_file_path = path.join(out_dir, block_file_name)
    block_file_paths.append(block_file_path)

    block_args = (
        block_file_path,
        trace_key,
        header_key,
        live_key,
        num_samp,
        sample_format,
        endian,
    )

    task_graph_dict[(task_name, idx)] = (write_block_to_segy,) + block_args

# Make actual graph
task_graph = HighLevelGraph.from_collections(
    task_name,
    task_graph_dict,
    dependencies=[traces_seq, headers_seq, live_seq],
)

# Note this doesn't work with distributed.
tqdm_kw = dict(unit="block", dynamic_ncols=True)
block_progress = TqdmCallback(desc="Step 1 / 2 Writing Blocks", **tqdm_kw)

with block_progress:
    block_exists = compute_as_if_collection(
        cls=Array,
        dsk=task_graph,
        keys=list(task_graph_dict),
        scheduler=client,
    )

merge_args = [output_segy_path, block_file_paths, block_exists]
if client is not None:
    _ = client.submit(merge_partial_segy, *merge_args).result()
else:
    merge_partial_segy(*merge_args)

and the final merge step:

def merge_partial_segy(output_segy_path, block_file_paths, block_exists):
    ...
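The two-step pattern above (write each chunk to its own temp file, then concatenate the files in order) can be sketched in a minimal, self-contained form. The `write_block` and `merge_blocks` helpers below are hypothetical stand-ins for `write_block_to_segy` and `merge_partial_segy`, and a small 1-D array stands in for the flattened traces; the point is only that each chunk is materialized and written independently, so no worker ever needs the whole array in memory.

```python
import os
import tempfile
import uuid

import dask
import dask.array as da
import numpy as np


def write_block(block_path, block):
    """Write one materialized chunk to its own temp file (stand-in helper)."""
    block.astype("float32").tofile(block_path)
    return True


def merge_blocks(output_path, block_paths, block_exists):
    """Concatenate the per-block files, in order, into the final file."""
    with open(output_path, "wb") as out:
        for block_path, ok in zip(block_paths, block_exists):
            assert ok, f"block missing: {block_path}"
            with open(block_path, "rb") as f:
                out.write(f.read())
            os.remove(block_path)


arr = da.arange(16, chunks=4)  # stand-in for the flattened traces

out_dir = tempfile.mkdtemp()
output_path = os.path.join(out_dir, "out.bin")

# One delayed write task per chunk, mirroring the per-key task graph above.
block_paths, tasks = [], []
for idx, block in enumerate(arr.to_delayed().ravel()):
    block_path = os.path.join(out_dir, f".{idx}_{uuid.uuid1()}._block")
    block_paths.append(block_path)
    tasks.append(dask.delayed(write_block)(block_path, block))

block_exists = dask.compute(*tasks)            # step 1: write blocks
merge_blocks(output_path, block_paths, block_exists)  # step 2: merge

result = np.fromfile(output_path, dtype="float32")
```

Each task holds only one chunk at a time, but note that the intermediate files temporarily double the on-disk footprint, and the merge itself streams the blocks sequentially.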

@tasansal tasansal added the performance Performance label Sep 1, 2022
tasansal commented Oct 7, 2022

ref Dask Community Post

tasansal commented Nov 3, 2022

This brings significant improvements to memory usage:

dask/distributed#7128

import dask
import distributed

# worker-saturation expects a number (or the string "inf"), not "1.0"
with dask.config.set({"distributed.scheduler.worker-saturation": 1.0}):
    client = distributed.Client(...)
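The effect of the context manager can be checked without spinning up a cluster by reading the value back with `dask.config.get` (a minimal sketch; the setting only influences scheduling once a `distributed` scheduler is actually created inside the block):

```python
import dask

# Inside the block the override is visible; outside, the default is restored.
with dask.config.set({"distributed.scheduler.worker-saturation": 1.0}):
    saturation = dask.config.get("distributed.scheduler.worker-saturation")
```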
