diff --git a/extra_data/copy.py b/extra_data/copy.py
new file mode 100644
index 00000000..9a8de783
--- /dev/null
+++ b/extra_data/copy.py
@@ -0,0 +1,182 @@
+import sys
+from argparse import ArgumentParser
+from pathlib import Path
+from typing import Union
+
+import h5py
+
+from .utils import progress_bar
+
+__all__ = ["copy_structure"]
+
+
+def progress(processed, total, *, show=True):
+    """Show progress information"""
+    if not show:
+        return
+
+    pbar = progress_bar(processed, total)
+    if sys.stderr.isatty():
+        # "\x1b[2K": delete whole line, "\x1b[1A": move up cursor
+        print("\x1b[2K\x1b[1A\x1b[2K\x1b[1A", file=sys.stderr)
+    print(pbar, file=sys.stderr)
+
+
+class Cloner:
+    def __init__(self, input, output, *, run_data=False, control_data=False):
+        self.run_data = run_data
+        self.control_data = control_data
+        self.visited = {}
+
+        if output.file.mode == "r":
+            raise ValueError("Output file must be writeable.")
+        self.visit(input, output)
+
+    @staticmethod
+    def _copy_attrs(input, output):
+        for key, value in input.attrs.items():
+            output.attrs.create(key, value)
+
+    def visit(self, obj, output):
+        if obj.name != "/":
+            link = obj.file.get(obj.name, getlink=True)
+            if isinstance(link, h5py.SoftLink):
+                # note: this only works for SoftLinks. An ExternalLink's resolved
+                # name is its path inside the target file, not the link's path here.
+                output[obj.name] = link
+                return
+
+        obj_id = h5py.h5o.get_info(obj.id).addr
+
+        if obj_id in self.visited:
+            # Hardlink to an object we've already seen
+            output[obj.name] = output[self.visited[obj_id]]
+            return
+
+        self.visited[obj_id] = obj.name
+
+        if isinstance(obj, h5py.Dataset):
+            if (
+                obj.name.startswith("/INSTRUMENT")
+                or (obj.name.startswith("/CONTROL") and not self.control_data)
+                or (obj.name.startswith("/RUN") and not self.run_data)
+            ):
+                output_obj = output.create_dataset_like(obj.name, obj)
+            else:
+                # note: consider using h5py.File.copy once a bug causing
+                # segfaults for datasets with attributes is fixed,
+                # see: https://github.com/HDFGroup/hdf5/issues/2414
+                output_obj = output.create_dataset_like(obj.name, obj, data=obj[()])
+            self._copy_attrs(obj, output_obj)
+        elif isinstance(obj, h5py.Group):
+            if obj == obj.file:
+                # root object
+                output_obj = output["/"]
+            else:
+                output_obj = output.create_group(obj.name)
+            self._copy_attrs(obj, output_obj)
+
+            for name, child in obj.items():
+                if child.file != obj.file:
+                    # external link
+                    output[f'{obj.name}/{name}'] = obj.get(name, getlink=True)
+                else:
+                    self.visit(child, output)
+        else:
+            # unknown type
+            return
+
+
+def copy_structure(
+    input: Union[Path, str],
+    output: Union[Path, str],
+    *,
+    run_data=False,
+    control_data=False,
+    term_progress=False,
+) -> None:
+    """Clone EuXFEL HDF5 file structure without any of its data.
+
+    Clone the input file, or all HDF5 files present in the input directory.
+    The cloned files are written to the output directory.
+
+    Args:
+        run_data: Copy data in the RUN group if set to True
+        control_data: Copy data in the CONTROL group if set to True
+        term_progress: Show progress in the terminal if set to True
+    """
+    if isinstance(input, str):
+        input = Path(input)
+    input = input.expanduser()
+
+    if isinstance(output, str):
+        output = Path(output)
+    output = output.expanduser()
+
+    if not output.is_dir():
+        raise ValueError(f"The given output directory does not exist: {output}")
+
+    if h5py.is_hdf5(input):
+        if output == input.parent:
+            raise ValueError("Input and output must be different directories.")
+        Cloner(
+            h5py.File(input),
+            h5py.File(output / input.name, "w"),
+            run_data=run_data,
+            control_data=control_data,
+        )
+    elif input.is_dir():
+        if output == input:
+            raise ValueError("Input and output must be different directories.")
+        # clone all HDF5 files present in the given directory
+        h5files = [f for f in input.glob("*") if h5py.is_hdf5(f)]
+
+        progress(0, len(h5files), show=term_progress)
+        for n, file_ in enumerate(h5files, start=1):
+            Cloner(
+                h5py.File(file_),
+                h5py.File(output / file_.name, "w"),
+                run_data=run_data,
+                control_data=control_data,
+            )
+            progress(n, len(h5files), show=term_progress)
+    else:
+        raise ValueError(f"Invalid input: {input}")
+
+
+def main(argv=None):
+    ap = ArgumentParser(description="Clone EuXFEL HDF5 files but with empty datasets.")
+    ap.add_argument("input", type=str, help="Path to an HDF5 file or a directory.")
+    ap.add_argument(
+        "output", type=str, help="Output directory to write the cloned files."
+    )
+    ap.add_argument(
+        "--copy-run-data",
+        "-cr",
+        action="store_true",
+        default=False,
+        help="Copy data present in the RUN group.",
+    )
+    ap.add_argument(
+        "--copy-control-data",
+        "-cc",
+        action="store_true",
+        default=False,
+        help="Copy data present in the CONTROL group.",
+    )
+
+    args = ap.parse_args(argv)
+
+    print(f"Cloning file(s) structure:\nInput: {args.input}\nOutput: {args.output}\n")
+    copy_structure(
+        args.input,
+        args.output,
+        run_data=args.copy_run_data,
+        control_data=args.copy_control_data,
+        term_progress=True,
+    )
+    print("Done.")
+
+
+if __name__ == "__main__":
+    main(sys.argv[1:])
diff --git a/extra_data/tests/test_copy.py b/extra_data/tests/test_copy.py
new file mode 100644
index 00000000..65ab4bde
--- /dev/null
+++ b/extra_data/tests/test_copy.py
@@ -0,0 +1,71 @@
+from pathlib import Path
+
+import h5py
+import numpy as np
+
+from extra_data.copy import copy_structure, main
+
+
+def test_copy_structure(tmp_path, mock_sa3_control_data):
+    xgm = "SA3_XTD10_XGM/XGM/DOOCS"
+    xgm_intensity = f"INSTRUMENT/{xgm}:output/data/intensityTD"
+    xgm_flux = f"CONTROL/{xgm}/pulseEnergy/photonFlux/value"
+
+    ext_file = 'ext-data.h5'
+    ext_path = 'some/data'
+    with h5py.File(mock_sa3_control_data, "a") as f:
+        # add some data
+        ds = f[xgm_intensity]
+        ds[:] = np.ones(ds.shape, ds.dtype)
+        ds = f[xgm_flux]
+        ds[:] = np.ones(ds.shape, ds.dtype)
+        # add softlink
+        f["group/SOFTLINKED"] = h5py.SoftLink(f"/{xgm_intensity}")
+        # add hardlink
+        f['group/HARDLINKED'] = ds
+        # add external link
+        with h5py.File(Path(mock_sa3_control_data).parent / ext_file, 'w') as g:
+            g[ext_path] = [1]
+        f['group/EXTLINK'] = h5py.ExternalLink(ext_file, ext_path)
+
+    copy_structure(mock_sa3_control_data, tmp_path, control_data=True)
+
+    inp = h5py.File(mock_sa3_control_data)
+    out = h5py.File(tmp_path / mock_sa3_control_data.rpartition("/")[-1])
+    slink = out.get("group/SOFTLINKED", getlink=True)
+    extlink = out.get('group/EXTLINK', getlink=True)
+
+    # softlinks are copied
+    assert isinstance(slink, h5py.SoftLink)
+    assert slink.path == f"/{xgm_intensity}"
+    # hardlink
+    assert out['group/HARDLINKED'] == out[xgm_flux]
+    # external link
+    assert extlink.filename == ext_file
+    assert extlink.path == ext_path
+    # data is not copied
+    assert out[xgm_intensity].shape == inp[xgm_intensity].shape
+    assert out[xgm_intensity].dtype == inp[xgm_intensity].dtype
+    assert (out[xgm_intensity][()] == 0).all()
+    # attributes are copied
+    assert out[xgm_intensity].attrs["unitName"] == "joule"
+    # control data is copied
+    assert out[xgm_flux].shape == inp[xgm_flux].shape
+    assert out[xgm_flux].dtype == inp[xgm_flux].dtype
+    assert (out[xgm_flux][()] == 1).all()
+    # run data is not copied
+    assert out[f"RUN/{xgm}/classId/value"].dtype == h5py.string_dtype()
+    assert out[f"RUN/{xgm}/classId/value"][()] == [b""]
+
+
+def test_copy_run(tmp_path, mock_spb_proc_run):
+    copy_structure(mock_spb_proc_run, tmp_path)
+
+    inp_files = list(Path(mock_spb_proc_run).glob('*.h5'))
+    out_files = list(tmp_path.glob('*.h5'))
+    assert len(inp_files) == len(out_files)
+
+
+def test_cli(tmp_path, mock_scs_run):
+    # smoke test
+    main([mock_scs_run, str(tmp_path)])
diff --git a/extra_data/utils.py b/extra_data/utils.py
index 8ccc39d2..592c08f3 100644
--- a/extra_data/utils.py
+++ b/extra_data/utils.py
@@ -9,14 +9,23 @@
 """
 
 import os
+from shutil import get_terminal_size
 
 
 def available_cpu_cores():
     # This process may be restricted to a subset of the cores on the machine;
     # sched_getaffinity() tells us which on some Unix flavours (inc Linux)
-    if hasattr(os, 'sched_getaffinity'):
+    if hasattr(os, "sched_getaffinity"):
         return len(os.sched_getaffinity(0))
     else:
         # Fallback, inc on Windows
         ncpu = os.cpu_count() or 2
         return min(ncpu, 8)
+
+
+def progress_bar(done, total, suffix=" "):
+    line = f"Progress: {done}/{total}{suffix}[{{}}]"
+    length = min(get_terminal_size().columns - len(line), 50)
+    filled = int(length * done // total)
+    bar = "#" * filled + " " * (length - filled)
+    return line.format(bar)
diff --git a/extra_data/validation.py b/extra_data/validation.py
index 3cf02c44..caf2b3a0 100644
--- a/extra_data/validation.py
+++ b/extra_data/validation.py
@@ -4,13 +4,12 @@
 import numpy as np
 import os
 import os.path as osp
-from shutil import get_terminal_size
 from signal import signal, SIGINT, SIG_IGN
 import sys
 
 from .reader import H5File, FileAccess
 from .run_files_map import RunFilesMap
-
+from .utils import progress_bar
 
 class ValidationError(Exception):
     def __init__(self, problems):
@@ -212,14 +211,6 @@ def check_index_contiguous(firsts, counts, record):
         ))
 
 
-def progress_bar(done, total, suffix=' '):
-    line = f'Progress: {done}/{total}{suffix}[{{}}]'
-    length = min(get_terminal_size().columns - len(line), 50)
-    filled = int(length * done // total)
-    bar = '#' * filled + ' ' * (length - filled)
-    return line.format(bar)
-
-
def _check_file(args):
     runpath, filename = args
     filepath = osp.join(runpath, filename)
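
A quick usage sketch, not part of the patch: the paths below are hypothetical and the snippet only assumes that this patched extra_data is importable. It clones the structure of every HDF5 file in a run directory into an existing output directory, then inspects the clones.

# Usage sketch: hypothetical paths; assumes the patched extra_data is installed.
from pathlib import Path

import h5py

from extra_data.copy import copy_structure

run_dir = "/data/raw/r0123"            # hypothetical input run directory
out_dir = Path("r0123-structure")      # must already exist and differ from the input
out_dir.mkdir(exist_ok=True)

# Clone every HDF5 file in run_dir. INSTRUMENT datasets are always created
# empty; CONTROL and RUN datasets are likewise left empty unless control_data
# / run_data are set. Links and attributes are preserved.
copy_structure(run_dir, out_dir, control_data=True, term_progress=True)

# The clones keep shapes and dtypes, so structure can be inspected without the bulk data.
for clone in sorted(out_dir.glob("*.h5")):
    with h5py.File(clone) as f:
        n_sources = len(f["INSTRUMENT"]) if "INSTRUMENT" in f else 0
        print(clone.name, n_sources, "instrument sources")

From a shell, roughly the same operation would be python -m extra_data.copy /data/raw/r0123 r0123-structure --copy-control-data, relying on the __main__ guard around main() above; whether the package also registers a dedicated console script is not shown in this diff.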