Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clone euxfel data file structure #467

Merged
merged 9 commits into from
Dec 5, 2023
Merged
182 changes: 182 additions & 0 deletions extra_data/copy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
import sys
from argparse import ArgumentParser
from pathlib import Path
from typing import Union

import h5py

from .utils import progress_bar

__all__ = ["copy_structure"]


def progress(processed, total, *, show=True):
"""Show progress information"""
if not show:
return

Check warning on line 16 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L15-L16

Added lines #L15 - L16 were not covered by tests

pbar = progress_bar(processed, total)
if sys.stderr.isatty():

Check warning on line 19 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L18-L19

Added lines #L18 - L19 were not covered by tests
# "\x1b[2K": delete whole line, "\x1b[1A": move up cursor
print("\x1b[2K\x1b[1A\x1b[2K\x1b[1A", file=sys.stderr)
print(pbar, file=sys.stderr)

Check warning on line 22 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L21-L22

Added lines #L21 - L22 were not covered by tests


class Cloner:
def __init__(self, input, output, *, run_data=False, control_data=False):
self.run_data = run_data
self.control_data = control_data
self.visited = {}

if output.file.mode == "r":
raise ValueError("Output file must be writeable.")

Check warning on line 32 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L32

Added line #L32 was not covered by tests
self.visit(input, output)

@staticmethod
def _copy_attrs(input, output):
for key, value in input.attrs.items():
output.attrs.create(key, value)

def visit(self, obj, output):
if obj.name != "/":
link = obj.file.get(obj.name, getlink=True)
tmichela marked this conversation as resolved.
Show resolved Hide resolved
if isinstance(link, h5py.SoftLink):
output[obj.name] = h5py.SoftLink(link.path)
return
elif isinstance(link, h5py.ExternalLink):
# TODO do we want to support external links?
# this *might* work, but external softlinks may point to non reacheable data
# with h5py.File(link.filename) as ext:
# Cloner(ext[link.path], output[obj.name], run_data=self.run_data, control_data=self.control_data)
Fixed Show fixed Hide fixed
tmichela marked this conversation as resolved.
Show resolved Hide resolved
return

Check warning on line 51 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L51

Added line #L51 was not covered by tests

obj_id = h5py.h5o.get_info(obj.id).addr

if obj_id in self.visited:
# Hardlink to an object we've already seen
output[obj.name] = output[self.visited[obj_id]]
return

Check warning on line 58 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L57-L58

Added lines #L57 - L58 were not covered by tests

self.visited[obj_id] = obj.name

if isinstance(obj, h5py.Dataset):
if (
obj.name.startswith("/INSTRUMENT")
or (obj.name.startswith("/CONTROL") and not self.control_data)
or (obj.name.startswith("/RUN") and not self.run_data)
):
output_obj = output.create_dataset_like(obj.name, obj)
else:
# note: consider using h5py.File.copy once a bug causing
# segfault for dataset with attributes is fixed,
# see: https://github.com/HDFGroup/hdf5/issues/2414
output_obj = output.create_dataset_like(obj.name, obj, data=obj[()])
self._copy_attrs(obj, output_obj)
elif isinstance(obj, h5py.Group):
if obj == obj.file:
# root object
output_obj = output["/"]
else:
output_obj = output.create_group(obj.name)
self._copy_attrs(obj, output_obj)

for child in obj.values():
self.visit(child, output)
else:
# unknown type
return

Check warning on line 87 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L87

Added line #L87 was not covered by tests


def copy_structure(
input: Union[Path, str],
output: Union[Path, str],
*,
run_data=False,
control_data=False,
term_progress=False,
) -> None:
"""Clone EuXFEL HDF5 file structure without any of its data.

Clone the input file or files present the input directory.
The cloned files will be written to output.

args:
run_data: Copy data in RUN group if set to True
control_data: Copy data in CONTROL group if set to True
term_progress: show progress in terminal if set to True
"""
if isinstance(input, str):
input = Path(input)
input = input.expanduser()

if isinstance(output, str):
output = Path(output)

Check warning on line 113 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L113

Added line #L113 was not covered by tests
output = output.expanduser()

if not output.is_dir():
raise ValueError(f"The given output directory does not exist: {output}")

Check warning on line 117 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L117

Added line #L117 was not covered by tests

if h5py.is_hdf5(input):
if output == input.parent:
raise ValueError("Input and output must be different directories.")

Check warning on line 121 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L121

Added line #L121 was not covered by tests
Cloner(
h5py.File(input),
h5py.File(output / input.name, "w"),
run_data=run_data,
control_data=control_data,
)
elif input.is_dir():
if output == input:
raise ValueError("Input and output must be different directories.")

Check warning on line 130 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L128-L130

Added lines #L128 - L130 were not covered by tests
# clone all hdf5 file present in the given directory
h5files = [f for f in input.glob("*") if h5py.is_hdf5(f)]

Check warning on line 132 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L132

Added line #L132 was not covered by tests

progress(0, len(h5files), show=term_progress)
for n, file_ in enumerate(h5files, start=1):
Cloner(

Check warning on line 136 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L134-L136

Added lines #L134 - L136 were not covered by tests
h5py.File(file_),
h5py.File(output / file_.name, "w"),
run_data=run_data,
control_data=control_data,
)
progress(n, len(h5files), show=term_progress)

Check warning on line 142 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L142

Added line #L142 was not covered by tests
else:
raise ValueError(f"invalid input: {input}")

Check warning on line 144 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L144

Added line #L144 was not covered by tests


def main(argv=None):
ap = ArgumentParser("Clone EuXFEL HDF5 files but with empty datasets.")
ap.add_argument("input", type=str, help="Path to an HDF5 file or a directory.")
ap.add_argument(

Check warning on line 150 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L148-L150

Added lines #L148 - L150 were not covered by tests
"output", type=str, help="Output directory to write the cloned files."
)
ap.add_argument(

Check warning on line 153 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L153

Added line #L153 was not covered by tests
"--copy-run-data",
"-cr",
action="store_true",
default=False,
help="Copy data present in the RUN group.",
)
ap.add_argument(

Check warning on line 160 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L160

Added line #L160 was not covered by tests
"--copy-control-data",
"-cc",
action="store_true",
default=False,
help="Copy data present in the CONTROL group.",
)

args = ap.parse_args()

Check warning on line 168 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L168

Added line #L168 was not covered by tests

print(f"Cloning file(s) structure:\ninput: {args.input}\nOutput: {args.output}\n")
copy_structure(

Check warning on line 171 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L170-L171

Added lines #L170 - L171 were not covered by tests
args.input,
args.output,
run_data=args.copy_run_data,
control_data=args.copy_control_data,
term_progress=True,
)
print("Done.")

Check warning on line 178 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L178

Added line #L178 was not covered by tests


if __name__ == "__main__":
main(sys.argv[1:])

Check warning on line 182 in extra_data/copy.py

View check run for this annotation

Codecov / codecov/patch

extra_data/copy.py#L182

Added line #L182 was not covered by tests
47 changes: 47 additions & 0 deletions extra_data/tests/test_copy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import h5py
import numpy as np

from extra_data.copy import copy_structure


def test_copy_structure(tmp_path, mock_sa3_control_data):
xgm = "SA3_XTD10_XGM/XGM/DOOCS"
xgm_intensity = f"INSTRUMENT/{xgm}:output/data/intensityTD"
xgm_flux = f"CONTROL/{xgm}/pulseEnergy/photonFlux/value"
with h5py.File(mock_sa3_control_data, "a") as f:
# add softlink
f[f"LINKED/{xgm_intensity}"] = h5py.SoftLink(f"/{xgm_intensity}")
# add some data
ds = f[xgm_intensity]
ds[:] = np.ones(ds.shape, ds.dtype)
ds = f[xgm_flux]
ds[:] = np.ones(ds.shape, ds.dtype)

copy_structure(mock_sa3_control_data, tmp_path, control_data=True)

inp = h5py.File(mock_sa3_control_data)
out = h5py.File(tmp_path / mock_sa3_control_data.rpartition("/")[-1])
slink = out.get(f"LINKED/{xgm_intensity}", getlink=True)

# softlinks are copied
assert isinstance(slink, h5py.SoftLink)
assert slink.path == f"/{xgm_intensity}"
# data is not copied
assert out[xgm_intensity].shape == inp[xgm_intensity].shape
assert out[xgm_intensity].dtype == inp[xgm_intensity].dtype
assert (out[xgm_intensity][()] == 0).all()
# attributes are copied
assert out[xgm_intensity].attrs["unitName"] == "joule"
# control data is copied
assert out[xgm_flux].shape == inp[xgm_flux].shape
assert out[xgm_flux].dtype == inp[xgm_flux].dtype
assert (out[xgm_flux][()] == 1).all()
# run data is not copied
assert out[f"RUN/{xgm}/classId/value"].dtype == h5py.string_dtype()
assert out[f"RUN/{xgm}/classId/value"][()] == [b""]

# TODO test hardlinks


def test_copy_run():
...
Fixed Show fixed Hide fixed
11 changes: 10 additions & 1 deletion extra_data/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,23 @@
"""

import os
from shutil import get_terminal_size


def available_cpu_cores():
# This process may be restricted to a subset of the cores on the machine;
# sched_getaffinity() tells us which on some Unix flavours (inc Linux)
if hasattr(os, 'sched_getaffinity'):
if hasattr(os, "sched_getaffinity"):
return len(os.sched_getaffinity(0))
else:
# Fallback, inc on Windows
ncpu = os.cpu_count() or 2
return min(ncpu, 8)


def progress_bar(done, total, suffix=" "):
line = f"Progress: {done}/{total}{suffix}[{{}}]"
length = min(get_terminal_size().columns - len(line), 50)
filled = int(length * done // total)
bar = "#" * filled + " " * (length - filled)
return line.format(bar)

Check warning on line 31 in extra_data/utils.py

View check run for this annotation

Codecov / codecov/patch

extra_data/utils.py#L27-L31

Added lines #L27 - L31 were not covered by tests
11 changes: 1 addition & 10 deletions extra_data/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
import numpy as np
import os
import os.path as osp
from shutil import get_terminal_size
from signal import signal, SIGINT, SIG_IGN
import sys

from .reader import H5File, FileAccess
from .run_files_map import RunFilesMap

from .utils import progress_bar

class ValidationError(Exception):
def __init__(self, problems):
Expand Down Expand Up @@ -212,14 +211,6 @@ def check_index_contiguous(firsts, counts, record):
))


def progress_bar(done, total, suffix=' '):
line = f'Progress: {done}/{total}{suffix}[{{}}]'
length = min(get_terminal_size().columns - len(line), 50)
filled = int(length * done // total)
bar = '#' * filled + ' ' * (length - filled)
return line.format(bar)


def _check_file(args):
runpath, filename = args
filepath = osp.join(runpath, filename)
Expand Down