Skip to content

Commit

Permalink
add storage of conversion_parametesr to specsscan, add tests for it a…
Browse files Browse the repository at this point in the history
…nd an example to tutorial 1
  • Loading branch information
rettigl committed Mar 8, 2024
1 parent a1e1946 commit 04e24c1
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 65 deletions.
104 changes: 56 additions & 48 deletions specsanalyzer/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def convert_image(
kinetic_energy: float,
pass_energy: float,
work_function: float,
conversion_dict: dict = None,
conversion_parameters: dict = None,
**kwds,
) -> xr.DataArray:
"""Converts an imagin in physical unit data, angle vs energy
Expand All @@ -109,75 +109,80 @@ def convert_image(
kinetic_energy (float): set analyser kinetic energy
pass_energy (float): set analyser pass energy
work_function (float): set analyser work function
conversion_dict (dict, optional): dictionary of conversion parameters, overwriting
determination from calib2d file. Defaults to None.
conversion_parameters (dict, optional): dictionary of conversion parameters,
overwriting determination from calib2d file. Defaults to None.
Returns:
xr.DataArray: xarray containg the corrected data and kinetic and angle axis
"""
if conversion_dict is None:
conversion_dict = {}
if conversion_parameters is None:
conversion_parameters = {}
else:
conversion_parameters = conversion_parameters.copy()

if "apply_fft_filter" not in conversion_dict.keys():
conversion_dict["apply_fft_filter"] = kwds.pop(
if "apply_fft_filter" not in conversion_parameters.keys():
conversion_parameters["apply_fft_filter"] = kwds.pop(
"apply_fft_filter",
self._config.get("apply_fft_filter", False),
)
if "binning" not in conversion_dict.keys():
conversion_dict["binning"] = kwds.pop("binning", self._config.get("binning", 1))
if "rotation_angle" not in conversion_dict.keys():
conversion_dict["rotation_angle"] = kwds.pop(
if "binning" not in conversion_parameters.keys():
conversion_parameters["binning"] = kwds.pop("binning", self._config.get("binning", 1))
if "rotation_angle" not in conversion_parameters.keys():
conversion_parameters["rotation_angle"] = kwds.pop(
"rotation_angle",
self._config.get("rotation_angle", 0),
)

if conversion_dict["apply_fft_filter"]:
if conversion_parameters["apply_fft_filter"]:
try:
if "fft_filter_peaks" not in conversion_dict.keys():
conversion_dict["fft_filter_peaks"] = kwds.pop(
if "fft_filter_peaks" not in conversion_parameters.keys():
conversion_parameters["fft_filter_peaks"] = kwds.pop(
"fft_filter_peaks",
self._config["fft_filter_peaks"],
)
img = fourier_filter_2d(raw_img, conversion_dict["fft_filter_peaks"])
img = fourier_filter_2d(raw_img, conversion_parameters["fft_filter_peaks"])
except KeyError:
img = raw_img
conversion_parameters["apply_fft_filter"] = False
else:
img = raw_img

if conversion_dict["rotation_angle"]:
img_rotated = imutils.rotate(img, angle=conversion_dict["rotation_angle"])
if conversion_parameters["rotation_angle"]:
img_rotated = imutils.rotate(img, angle=conversion_parameters["rotation_angle"])
img = img_rotated

if "lens_mode" not in conversion_dict.keys():
conversion_dict["lens_mode"] = lens_mode
conversion_dict["kinetic_energy"] = kinetic_energy
conversion_dict["pass_energy"] = pass_energy
conversion_dict["work_function"] = work_function
if "lens_mode" not in conversion_parameters.keys():
conversion_parameters["lens_mode"] = lens_mode
conversion_parameters["kinetic_energy"] = kinetic_energy
conversion_parameters["pass_energy"] = pass_energy
conversion_parameters["work_function"] = work_function
# Determine conversion parameters from calib2d
(
conversion_dict["a_inner"],
conversion_dict["da_matrix"],
conversion_dict["retardation_ratio"],
conversion_dict["source"],
conversion_dict["dims"],
conversion_parameters["a_inner"],
conversion_parameters["da_matrix"],
conversion_parameters["retardation_ratio"],
conversion_parameters["source"],
conversion_parameters["dims"],
) = get_damatrix_fromcalib2d(
lens_mode=lens_mode,
kinetic_energy=kinetic_energy,
pass_energy=pass_energy,
work_function=work_function,
calib2d_dict=self._calib2d,
)
conversion_dict["e_shift"] = np.array(self._calib2d["eShift"])
conversion_dict["de1"] = [self._calib2d["De1"]]
conversion_dict["e_range"] = self._calib2d["eRange"]
conversion_dict["a_range"] = self._calib2d[lens_mode]["default"]["aRange"]
conversion_dict["pixel_size"] = self._config["pixel_size"] * self._config["binning"]
conversion_dict["magnification"] = self._config["magnification"]
conversion_dict["angle_offset_px"] = kwds.get(
conversion_parameters["e_shift"] = np.array(self._calib2d["eShift"])
conversion_parameters["de1"] = [self._calib2d["De1"]]
conversion_parameters["e_range"] = self._calib2d["eRange"]
conversion_parameters["a_range"] = self._calib2d[lens_mode]["default"]["aRange"]
conversion_parameters["pixel_size"] = (
self._config["pixel_size"] * self._config["binning"]
)
conversion_parameters["magnification"] = self._config["magnification"]
conversion_parameters["angle_offset_px"] = kwds.get(
"angle_offset_px",
self._config.get("angle_offset_px", 0),
)
conversion_dict["energy_offset_px"] = kwds.get(
conversion_parameters["energy_offset_px"] = kwds.get(
"energy_offset_px",
self._config.get("energy_offset_px", 0),
)
Expand Down Expand Up @@ -211,16 +216,16 @@ def convert_image(
pass_energy=pass_energy,
nx_pixels=img.shape[1],
ny_pixels=img.shape[0],
pixel_size=conversion_dict["pixel_size"],
magnification=conversion_dict["magnification"],
e_shift=conversion_dict["e_shift"],
de1=conversion_dict["de1"],
e_range=conversion_dict["e_range"],
a_range=conversion_dict["a_range"],
a_inner=conversion_dict["a_inner"],
da_matrix=conversion_dict["da_matrix"],
angle_offset_px=conversion_dict["angle_offset_px"],
energy_offset_px=conversion_dict["energy_offset_px"],
pixel_size=conversion_parameters["pixel_size"],
magnification=conversion_parameters["magnification"],
e_shift=conversion_parameters["e_shift"],
de1=conversion_parameters["de1"],
e_range=conversion_parameters["e_range"],
a_range=conversion_parameters["a_range"],
a_inner=conversion_parameters["a_inner"],
da_matrix=conversion_parameters["da_matrix"],
angle_offset_px=conversion_parameters["angle_offset_px"],
energy_offset_px=conversion_parameters["energy_offset_px"],
)

# save the config parameters for later use collect the info in a new nested dictionary
Expand Down Expand Up @@ -262,9 +267,12 @@ def convert_image(

data_array = xr.DataArray(
data=conv_img,
coords={conversion_dict["dims"][0]: angle_axis, conversion_dict["dims"][1]: ek_axis},
dims=conversion_dict["dims"],
attrs={"conversion_parameters": conversion_dict},
coords={
conversion_parameters["dims"][0]: angle_axis,
conversion_parameters["dims"][1]: ek_axis,
},
dims=conversion_parameters["dims"],
attrs={"conversion_parameters": conversion_parameters},
)

# Handle cropping based on parameters stored in correction dictionary
Expand Down
19 changes: 10 additions & 9 deletions specsscan/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,11 @@ def load_scan(
)

self._scan_info = parse_info_to_dict(scan_path)
config_meta = copy.deepcopy(self.config)

loader_dict = {
"iterations": iterations,
"scan_path": scan_path,
"raw_data": data,
"convert_config": config_meta["spa_params"],
}

(scan_type, lens_mode, kin_energy, pass_energy, work_function) = (
Expand Down Expand Up @@ -198,7 +196,7 @@ def load_scan(
**kwds,
)
else:
xr_list = []
xr_list: list[xr.DataArray] = []
for image in data:
xr_list.append(
self.spa.convert_image(
Expand Down Expand Up @@ -226,6 +224,8 @@ def load_scan(
else:
res_xarray = res_xarray.transpose("Angle", "Ekin", dim)

conversion_metadata = res_xarray.attrs.pop("conversion_parameters")

# rename coords and store mapping information, if available
coordinate_mapping = self._config.get("coordinate_mapping", {})
coordinate_depends = self._config.get("coordinate_depends", {})
Expand Down Expand Up @@ -268,6 +268,7 @@ def load_scan(
collect_metadata=collect_metadata,
),
**{"loader": loader_dict},
**{"conversion_parameters": conversion_metadata},
)

res_xarray.attrs["metadata"] = self.metadata
Expand Down Expand Up @@ -352,16 +353,11 @@ def check_scan(
)

self._scan_info = parse_info_to_dict(scan_path)
config_meta = copy.deepcopy(self.config)

loader_dict = {
"delays": delays,
"scan_path": scan_path,
"raw_data": load_images( # AVG data
scan_path,
df_lut,
),
"convert_config": config_meta["spa_params"],
"raw_data": data,
"check_scan": True,
}

Expand Down Expand Up @@ -391,6 +387,8 @@ def check_scan(
self.spa.print_msg = False
self.spa.print_msg = True

conversion_metadata = xr_list[0].attrs["conversion_parameters"]

dims = get_coords(
scan_path=scan_path,
scan_type=scan_type,
Expand Down Expand Up @@ -424,6 +422,7 @@ def check_scan(
collect_metadata=collect_metadata,
),
**{"loader": loader_dict},
**{"conversion_parameters": conversion_metadata},
)
if metadata is not None:
self.metadata.update(**metadata)
Expand Down Expand Up @@ -556,6 +555,7 @@ def process_sweep_scan(
work_function,
**kwds,
)
conversion_parameters = converted.attrs["conversion_parameters"]
# check for crop parameters
if (
not {"ang_range_min", "ang_range_max", "ek_range_min", "ek_range_max"}.issubset(
Expand Down Expand Up @@ -596,5 +596,6 @@ def process_sweep_scan(
self.spa.print_msg = True
# Strip first and last energy points, as they are not fully filled
data = data[:, 1:-1]
data.attrs["conversion_parameters"] = conversion_parameters

return data
2 changes: 1 addition & 1 deletion tests/test_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ def test_conversion_from_dict():
pass_energy=pass_energy,
work_function=work_function,
apply_fft_filter=False,
conversion_dict=conversion_parameters,
conversion_parameters=conversion_parameters,
)

# Calculate the average intensity of the image, neglect the noisy parts
Expand Down
54 changes: 52 additions & 2 deletions tests/test_specsscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,63 @@ def test_conversion_3d():
assert res_xarray.sum(axis=(0, 1, 2)) == res_xarray2.sum(axis=(0, 1, 2))

with pytest.raises(IndexError):
sps.check_scan(
sps.load_scan(
scan=4450,
delays=range(1, 20),
iterations=range(1, 20),
path=test_dir,
)


def test_conversion_from_convert_dict():
"""Test the conversion without calib2d file, using passen conversion dictionary parameters"""
sps = SpecsScan(
config={},
user_config={},
system_config={},
)
with pytest.raises(ValueError):
res_xarray = sps.load_scan(
scan=4450,
path=test_dir,
)

conversion_parameters = {
"lens_mode": "WideAngleMode",
"kinetic_energy": 21.9,
"pass_energy": 30.0,
"work_function": 4.558,
"a_inner": 15.0,
"da_matrix": np.array(
[
[0.70585613, 0.74383533, 0.7415424],
[-0.00736453, 0.05832768, 0.14868587],
[-0.00759583, -0.04533556, -0.09021117],
[-0.00180035, 0.00814881, 0.01743308],
],
),
"retardation_ratio": 0.5780666666666666,
"source": "interpolated as 0.4386666666666681*[email protected] + 0.5613333333333319*[email protected]", # noqa
"dims": ["Angle", "Ekin"],
"e_shift": np.array([-0.05, 0.0, 0.05]),
"de1": [0.0033],
"e_range": [-0.066, 0.066],
"a_range": [-15.0, 15.0],
"pixel_size": 0.0258,
"magnification": 4.54,
"angle_offset_px": 0,
"energy_offset_px": 0,
}

res_xarray = sps.load_scan(
scan=4450,
path=test_dir,
conversion_parameters=conversion_parameters,
)

for key in conversion_parameters.keys():
assert key in res_xarray.attrs["metadata"]["conversion_parameters"]


def test_checkscan():
"""Test the check_scan function"""
sps = SpecsScan(
Expand Down
Loading

0 comments on commit 04e24c1

Please sign in to comment.