-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #467 : Clone euxfel data file structure
- Loading branch information
Showing
4 changed files
with
264 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
import sys | ||
from argparse import ArgumentParser | ||
from pathlib import Path | ||
from typing import Union | ||
|
||
import h5py | ||
|
||
from .utils import progress_bar | ||
|
||
__all__ = ["copy_structure"] | ||
|
||
|
||
def progress(processed, total, *, show=True): | ||
"""Show progress information""" | ||
if not show: | ||
return | ||
|
||
pbar = progress_bar(processed, total) | ||
if sys.stderr.isatty(): | ||
# "\x1b[2K": delete whole line, "\x1b[1A": move up cursor | ||
print("\x1b[2K\x1b[1A\x1b[2K\x1b[1A", file=sys.stderr) | ||
print(pbar, file=sys.stderr) | ||
|
||
|
||
class Cloner: | ||
def __init__(self, input, output, *, run_data=False, control_data=False): | ||
self.run_data = run_data | ||
self.control_data = control_data | ||
self.visited = {} | ||
|
||
if output.file.mode == "r": | ||
raise ValueError("Output file must be writeable.") | ||
self.visit(input, output) | ||
|
||
@staticmethod | ||
def _copy_attrs(input, output): | ||
for key, value in input.attrs.items(): | ||
output.attrs.create(key, value) | ||
|
||
def visit(self, obj, output): | ||
if obj.name != "/": | ||
link = obj.file.get(obj.name, getlink=True) | ||
if isinstance(link, h5py.SoftLink): | ||
# note this works only for SoftLinks. ExternalLink object's | ||
# name is not the name of the path, but the targeted file's path | ||
output[obj.name] = link | ||
return | ||
|
||
obj_id = h5py.h5o.get_info(obj.id).addr | ||
|
||
if obj_id in self.visited: | ||
# Hardlink to an object we've already seen | ||
output[obj.name] = output[self.visited[obj_id]] | ||
return | ||
|
||
self.visited[obj_id] = obj.name | ||
|
||
if isinstance(obj, h5py.Dataset): | ||
if ( | ||
obj.name.startswith("/INSTRUMENT") | ||
or (obj.name.startswith("/CONTROL") and not self.control_data) | ||
or (obj.name.startswith("/RUN") and not self.run_data) | ||
): | ||
output_obj = output.create_dataset_like(obj.name, obj) | ||
else: | ||
# note: consider using h5py.File.copy once a bug causing | ||
# segfault for dataset with attributes is fixed, | ||
# see: https://github.com/HDFGroup/hdf5/issues/2414 | ||
output_obj = output.create_dataset_like(obj.name, obj, data=obj[()]) | ||
self._copy_attrs(obj, output_obj) | ||
elif isinstance(obj, h5py.Group): | ||
if obj == obj.file: | ||
# root object | ||
output_obj = output["/"] | ||
else: | ||
output_obj = output.create_group(obj.name) | ||
self._copy_attrs(obj, output_obj) | ||
|
||
for name, child in obj.items(): | ||
if child.file != obj.file: | ||
# external link | ||
output[f'{obj.name}/{name}'] = obj.get(name, getlink=True) | ||
else: | ||
self.visit(child, output) | ||
else: | ||
# unknown type | ||
return | ||
|
||
|
||
def copy_structure( | ||
input: Union[Path, str], | ||
output: Union[Path, str], | ||
*, | ||
run_data=False, | ||
control_data=False, | ||
term_progress=False, | ||
) -> None: | ||
"""Clone EuXFEL HDF5 file structure without any of its data. | ||
Clone the input file or files present the input directory. | ||
The cloned files will be written to output. | ||
args: | ||
run_data: Copy data in RUN group if set to True | ||
control_data: Copy data in CONTROL group if set to True | ||
term_progress: show progress in terminal if set to True | ||
""" | ||
if isinstance(input, str): | ||
input = Path(input) | ||
input = input.expanduser() | ||
|
||
if isinstance(output, str): | ||
output = Path(output) | ||
output = output.expanduser() | ||
|
||
if not output.is_dir(): | ||
raise ValueError(f"The given output directory does not exist: {output}") | ||
|
||
if h5py.is_hdf5(input): | ||
if output == input.parent: | ||
raise ValueError("Input and output must be different directories.") | ||
Cloner( | ||
h5py.File(input), | ||
h5py.File(output / input.name, "w"), | ||
run_data=run_data, | ||
control_data=control_data, | ||
) | ||
elif input.is_dir(): | ||
if output == input: | ||
raise ValueError("Input and output must be different directories.") | ||
# clone all hdf5 file present in the given directory | ||
h5files = [f for f in input.glob("*") if h5py.is_hdf5(f)] | ||
|
||
progress(0, len(h5files), show=term_progress) | ||
for n, file_ in enumerate(h5files, start=1): | ||
Cloner( | ||
h5py.File(file_), | ||
h5py.File(output / file_.name, "w"), | ||
run_data=run_data, | ||
control_data=control_data, | ||
) | ||
progress(n, len(h5files), show=term_progress) | ||
else: | ||
raise ValueError(f"invalid input: {input}") | ||
|
||
|
||
def main(argv=None): | ||
ap = ArgumentParser("Clone EuXFEL HDF5 files but with empty datasets.") | ||
ap.add_argument("input", type=str, help="Path to an HDF5 file or a directory.") | ||
ap.add_argument( | ||
"output", type=str, help="Output directory to write the cloned files." | ||
) | ||
ap.add_argument( | ||
"--copy-run-data", | ||
"-cr", | ||
action="store_true", | ||
default=False, | ||
help="Copy data present in the RUN group.", | ||
) | ||
ap.add_argument( | ||
"--copy-control-data", | ||
"-cc", | ||
action="store_true", | ||
default=False, | ||
help="Copy data present in the CONTROL group.", | ||
) | ||
|
||
args = ap.parse_args(argv) | ||
|
||
print(f"Cloning file(s) structure:\ninput: {args.input}\nOutput: {args.output}\n") | ||
copy_structure( | ||
args.input, | ||
args.output, | ||
run_data=args.copy_run_data, | ||
control_data=args.copy_control_data, | ||
term_progress=True, | ||
) | ||
print("Done.") | ||
|
||
|
||
if __name__ == "__main__": | ||
main(sys.argv[1:]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
from pathlib import Path | ||
|
||
import h5py | ||
import numpy as np | ||
|
||
from extra_data.copy import copy_structure, main | ||
|
||
|
||
def test_copy_structure(tmp_path, mock_sa3_control_data): | ||
xgm = "SA3_XTD10_XGM/XGM/DOOCS" | ||
xgm_intensity = f"INSTRUMENT/{xgm}:output/data/intensityTD" | ||
xgm_flux = f"CONTROL/{xgm}/pulseEnergy/photonFlux/value" | ||
|
||
ext_file = 'ext-data.h5' | ||
ext_path = 'some/data' | ||
with h5py.File(mock_sa3_control_data, "a") as f: | ||
# add some data | ||
ds = f[xgm_intensity] | ||
ds[:] = np.ones(ds.shape, ds.dtype) | ||
ds = f[xgm_flux] | ||
ds[:] = np.ones(ds.shape, ds.dtype) | ||
# add softlink | ||
f["group/SOFTLINKED"] = h5py.SoftLink(f"/{xgm_intensity}") | ||
# add hardlink | ||
f['group/HARDLINKED'] = ds | ||
# add external link | ||
with h5py.File(Path(mock_sa3_control_data).parent / ext_file, 'w') as g: | ||
g[ext_path] = [1] | ||
f['group/EXTLINK'] = h5py.ExternalLink(ext_file, ext_path) | ||
|
||
copy_structure(mock_sa3_control_data, tmp_path, control_data=True) | ||
|
||
inp = h5py.File(mock_sa3_control_data) | ||
out = h5py.File(tmp_path / mock_sa3_control_data.rpartition("/")[-1]) | ||
slink = out.get("group/SOFTLINKED", getlink=True) | ||
extlink = out.get('group/EXTLINK', getlink=True) | ||
|
||
# softlinks are copied | ||
assert isinstance(slink, h5py.SoftLink) | ||
assert slink.path == f"/{xgm_intensity}" | ||
# hardlink | ||
assert out['group/HARDLINKED'] == out[xgm_flux] | ||
# external link | ||
assert extlink.filename == ext_file | ||
assert extlink.path == ext_path | ||
# data is not copied | ||
assert out[xgm_intensity].shape == inp[xgm_intensity].shape | ||
assert out[xgm_intensity].dtype == inp[xgm_intensity].dtype | ||
assert (out[xgm_intensity][()] == 0).all() | ||
# attributes are copied | ||
assert out[xgm_intensity].attrs["unitName"] == "joule" | ||
# control data is copied | ||
assert out[xgm_flux].shape == inp[xgm_flux].shape | ||
assert out[xgm_flux].dtype == inp[xgm_flux].dtype | ||
assert (out[xgm_flux][()] == 1).all() | ||
# run data is not copied | ||
assert out[f"RUN/{xgm}/classId/value"].dtype == h5py.string_dtype() | ||
assert out[f"RUN/{xgm}/classId/value"][()] == [b""] | ||
|
||
|
||
def test_copy_run(tmp_path, mock_spb_proc_run): | ||
copy_structure(mock_spb_proc_run, tmp_path) | ||
|
||
inp_files = list(Path(mock_spb_proc_run).glob('*.h5')) | ||
out_files = list(tmp_path.glob('*.h5')) | ||
assert len(inp_files) == len(out_files) | ||
|
||
|
||
def test_cli(tmp_path, mock_scs_run): | ||
# smoke test | ||
main([mock_scs_run, str(tmp_path)]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters