Merge pull request #227 from OpenCOMPES/time_stamped_data2
Time stamped data
rettigl authored Nov 21, 2023
2 parents 6c6fc18 + 5dc117a commit 59f095f
Showing 12 changed files with 634 additions and 54 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/documentation.yml
@@ -74,12 +74,13 @@ jobs:
# path: $GITHUB_WORKSPACE/_build
# key: ${{ runner.os }}-docs

- name: download WSe2 data
- name: download RAW data
# if: steps.cache-primes.outputs.cache-hit != 'true'
run: |
cd $GITHUB_WORKSPACE/docs/tutorial
curl -L --output ./WSe2.zip https://zenodo.org/record/6369728/files/WSe2.zip
unzip -o ./WSe2.zip -d .
curl -L --output ./TaS2.zip https://zenodo.org/records/10160182/files/TaS2.zip
- name: build Sphinx docs
run: poetry run sphinx-build -b html $GITHUB_WORKSPACE/docs $GITHUB_WORKSPACE/_build
1 change: 1 addition & 0 deletions docs/index.rst
@@ -9,6 +9,7 @@ Single-Event DataFrame (SED) documentation
tutorial/1_binning_fake_data
tutorial/2_conversion_pipeline_for_example_time-resolved_ARPES_data
tutorial/3_metadata_collection_and_export_to_NeXus
tutorial/6_binning_with_time-stamped_data

.. toctree::
:maxdepth: 1
2 changes: 1 addition & 1 deletion sed/calibrator/momentum.py
@@ -173,7 +173,7 @@ def load_data(
self.bin_ranges.append(
(
data.coords[axis][0].values,
data.coords[axis][-1].values,
2 * data.coords[axis][-1].values - data.coords[axis][-2].values, # endpoint
),
)
else:
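For context, the new upper bound extends the bin range by one coordinate step beyond the last coordinate, so the final pixel is covered as a closed endpoint. A minimal sketch of the arithmetic, using hypothetical, evenly spaced coordinates:

import numpy as np

coords = np.arange(512, dtype=float)  # hypothetical detector axis: 0, 1, ..., 511
step = coords[-1] - coords[-2]        # 1.0
upper = 2 * coords[-1] - coords[-2]   # equals coords[-1] + step, i.e. 512.0
bin_range = (coords[0], upper)        # (0.0, 512.0) instead of the former (0.0, 511.0)
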
6 changes: 4 additions & 2 deletions sed/config/mpes_example_config.yaml
@@ -156,9 +156,9 @@ momentum:
# default momentum calibration
calibration:
# x momentum scaling factor
kx_scale: 0.012389400615413859
kx_scale: 0.010729535670610963
# y momentum scaling factor
ky_scale: 0.012389400615413859
ky_scale: 0.010729535670610963
# x BZ center pixel
x_center: 256.0
# y BZ center pixel
@@ -216,6 +216,8 @@ histogram:
ranges: [[0, 1800], [0, 1800], [128000, 138000], [0, 32000]]

metadata:
# URL of the epics archiver request engine
archiver_url: "http://aa0.fhi-berlin.mpg.de:17668/retrieval/data/getData.json?pv="
# EPICS channels to collect from EPICS archiver
epics_pvs: ["KTOF:Lens:Extr:I", "trARPES:Carving:TEMP_RBV", "trARPES:XGS600:PressureAC:P_RD", "KTOF:Lens:UDLD:V", "KTOF:Lens:Sample:V", "KTOF:Apertures:m1.RBV", "KTOF:Apertures:m2.RBV", "KTOF:Apertures:m3.RBV", "trARPES:Carving:TRX.RBV", "trARPES:Carving:TRY.RBV", "trARPES:Carving:TRZ.RBV", "trARPES:Carving:THT.RBV", "trARPES:Carving:PHI.RBV", "trARPES:Carving:OMG.RBV"]
# hdf5 attribute containing the field aperture "in" motor position
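As a reading aid, the get_archiver_data helper added further down concatenates this archiver_url (note the trailing ?pv=) with a channel name and an ISO-8601 time window. A sketch of the resulting request string, with a hypothetical time window:

archiver_url = "http://aa0.fhi-berlin.mpg.de:17668/retrieval/data/getData.json?pv="
channel = "trARPES:Carving:TEMP_RBV"  # one of the epics_pvs listed above
iso_from, iso_to = "2023-11-20T10:00:00", "2023-11-20T11:00:00"  # hypothetical window
req_str = archiver_url + channel + "&from=" + iso_from + "Z&to=" + iso_to + "Z"
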
42 changes: 42 additions & 0 deletions sed/core/dfops.py
@@ -112,6 +112,48 @@ def apply_filter(
return out_df


def add_time_stamped_data(
    df: dask.dataframe.DataFrame,
    time_stamps: np.ndarray,
    data: np.ndarray,
    dest_column: str,
    time_stamp_column: str,
    **kwds,
) -> dask.dataframe.DataFrame:
    """Add data in the form of timestamp/value pairs to the dataframe by interpolating
    to the time stamps in the dataframe.

    Args:
        df (dask.dataframe.DataFrame): Dataframe to use.
        time_stamps (np.ndarray): Time stamps of the values to add.
        data (np.ndarray): Values corresponding to the time stamps in time_stamps.
        dest_column (str): Destination column name.
        time_stamp_column (str): Time stamp column name.

    Returns:
        dask.dataframe.DataFrame: Dataframe with added column.
    """
    if time_stamp_column not in df.columns:
        raise ValueError(f"{time_stamp_column} not found in dataframe!")

    if len(time_stamps) != len(data):
        raise ValueError("time_stamps and data have to be of same length!")

    def interpolate_timestamps(
        df: dask.dataframe.DataFrame,
    ) -> dask.dataframe.DataFrame:
        df_timestamps = df[time_stamp_column]
        df[dest_column] = np.interp(df_timestamps, time_stamps, data)
        return df

    if not isinstance(df, dask.dataframe.DataFrame):
        raise ValueError("This function only works for Dask Dataframes!")

    df = df.map_partitions(interpolate_timestamps, **kwds)

    return df


def map_columns_2d(
df: Union[pd.DataFrame, dask.dataframe.DataFrame],
map_2d: Callable,
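A minimal usage sketch for the new dfops helper, assuming a Dask dataframe that already carries a per-event time stamp column (column name and all values below are hypothetical):

import dask.dataframe as dd
import numpy as np
import pandas as pd

from sed.core.dfops import add_time_stamped_data

# hypothetical event dataframe with per-event time stamps (epoch seconds)
events = pd.DataFrame({"timeStamps": np.linspace(1_700_000_000, 1_700_000_100, 1000)})
ddf = dd.from_pandas(events, npartitions=4)

# slowly varying quantity sampled at its own time stamps, e.g. a temperature reading
ts = np.array([1_700_000_000.0, 1_700_000_050.0, 1_700_000_100.0])
temperature = np.array([80.0, 85.0, 90.0])

ddf = add_time_stamped_data(
    ddf,
    time_stamps=ts,
    data=temperature,
    dest_column="sampleTemperature",
    time_stamp_column="timeStamps",
)
print(ddf["sampleTemperature"].compute().describe())
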
77 changes: 76 additions & 1 deletion sed/core/processor.py
@@ -26,6 +26,7 @@
from sed.core.config import parse_config
from sed.core.config import save_config
from sed.core.dfops import apply_filter
from sed.core.dfops import add_time_stamped_data
from sed.core.dfops import apply_jitter
from sed.core.metadata import MetaHandler
from sed.diagnostics import grid_histogram
@@ -34,6 +35,8 @@
from sed.io import to_tiff
from sed.loader import CopyTool
from sed.loader import get_loader
from sed.loader.mpes.loader import get_archiver_data
from sed.loader.mpes.loader import MpesLoader

N_CPU = psutil.cpu_count()

@@ -119,7 +122,10 @@ def __init__(
)

self.ec = EnergyCalibrator(
loader=self.loader,
loader=get_loader(
loader_name=loader_name,
config=self._config,
),
config=self._config,
)

@@ -1715,6 +1721,75 @@ def add_jitter(
metadata.append(col)
self._attributes.add(metadata, "jittering", duplicate_policy="append")

    def add_time_stamped_data(
        self,
        dest_column: str,
        time_stamps: np.ndarray = None,
        data: np.ndarray = None,
        archiver_channel: str = None,
        **kwds,
    ):
        """Add data in the form of timestamp/value pairs to the dataframe by interpolating
        to the time stamps in the dataframe. The time-stamped data can either be provided,
        or fetched from an EPICS archiver instance.

        Args:
            dest_column (str): Destination column name.
            time_stamps (np.ndarray, optional): Time stamps of the values to add. If omitted,
                time stamps are retrieved from the EPICS archiver.
            data (np.ndarray, optional): Values corresponding to the time stamps in
                time_stamps. If omitted, data are retrieved from the EPICS archiver.
            archiver_channel (str, optional): EPICS archiver channel from which to retrieve
                data. Either this or data and time_stamps have to be present.
            **kwds: additional keyword arguments passed to add_time_stamped_data.
        """
        time_stamp_column = kwds.pop(
            "time_stamp_column",
            self._config["dataframe"].get("time_stamp_alias", ""),
        )

        if time_stamps is None and data is None:
            if archiver_channel is None:
                raise ValueError(
                    "Either archiver_channel or both time_stamps and data have to be present!",
                )
            if self.loader.__name__ != "mpes":
                raise NotImplementedError(
                    "This function is currently only implemented for the mpes loader!",
                )
            ts_from, ts_to = cast(MpesLoader, self.loader).get_start_and_end_time()
            # get channel data with +-5 seconds safety margin
            time_stamps, data = get_archiver_data(
                archiver_url=self._config["metadata"].get("archiver_url", ""),
                archiver_channel=archiver_channel,
                ts_from=ts_from - 5,
                ts_to=ts_to + 5,
            )

        self._dataframe = add_time_stamped_data(
            self._dataframe,
            time_stamps=time_stamps,
            data=data,
            dest_column=dest_column,
            time_stamp_column=time_stamp_column,
            **kwds,
        )
        if self._timed_dataframe is not None:
            if time_stamp_column in self._timed_dataframe:
                self._timed_dataframe = add_time_stamped_data(
                    self._timed_dataframe,
                    time_stamps=time_stamps,
                    data=data,
                    dest_column=dest_column,
                    time_stamp_column=time_stamp_column,
                    **kwds,
                )
        metadata: List[Any] = []
        metadata.append(dest_column)
        metadata.append(time_stamps)
        metadata.append(data)
        self._attributes.add(metadata, "time_stamped_data", duplicate_policy="append")

def pre_binning(
self,
df_partitions: int = 100,
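A usage sketch for the new processor method. The processor class name (SedProcessor), its constructor arguments, and all paths, channels, and values below are assumptions for illustration; only the add_time_stamped_data calls are taken from this diff:

import numpy as np

from sed import SedProcessor  # assumed public entry point of the sed package

# hypothetical raw-data folder and config path
sp = SedProcessor(
    folder="/path/to/mpes/raw",
    config="sed/config/mpes_example_config.yaml",
)

# variant 1: fetch the channel from the EPICS archiver configured under metadata/archiver_url,
# restricted to the run's time window (plus the +-5 s margin added by the method)
sp.add_time_stamped_data(
    dest_column="sampleTemperature",
    archiver_channel="trARPES:Carving:TEMP_RBV",
)

# variant 2: supply explicit timestamp/value pairs instead of querying the archiver
time_stamps = np.array([1_700_000_000.0, 1_700_000_060.0, 1_700_000_120.0])  # hypothetical
pump_power = np.array([1.0, 1.2, 1.1])                                       # hypothetical
sp.add_time_stamped_data(
    dest_column="pumpPower",
    time_stamps=time_stamps,
    data=pump_power,
)
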
90 changes: 60 additions & 30 deletions sed/loader/mpes/loader.py
@@ -369,7 +369,7 @@ def hdf5_to_timed_array(
# need to correct for the time it took to write the file
start_time -= len(ms_marker) / 1000

time_stamp_data = start_time + ms_marker / 1000
time_stamp_data = start_time + np.arange(len(ms_marker)) / 1000

data_list.append(time_stamp_data)

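The corrected line stamps the i-th row of the timed array i milliseconds after the (write-time corrected) start time, one entry per millisecond marker, instead of dividing the marker values themselves by 1000. A minimal sketch with hypothetical values:

import numpy as np

start_time = 1_700_000_000.0            # hypothetical start time, epoch seconds
ms_marker = np.array([10, 25, 31, 48])  # hypothetical millisecond markers
time_stamp_data = start_time + np.arange(len(ms_marker)) / 1000
# -> start, start + 0.001 s, start + 0.002 s, start + 0.003 s
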
@@ -441,6 +441,34 @@ def get_elapsed_time(
return secs


def get_archiver_data(
    archiver_url: str,
    archiver_channel: str,
    ts_from: float,
    ts_to: float,
) -> Tuple[np.ndarray, np.ndarray]:
    """Extract time stamps and corresponding data from an EPICS archiver instance.

    Args:
        archiver_url (str): URL of the archiver data extraction interface.
        archiver_channel (str): EPICS channel to extract data for.
        ts_from (float): Starting time stamp of the range of interest.
        ts_to (float): Ending time stamp of the range of interest.

    Returns:
        Tuple[np.ndarray, np.ndarray]: The extracted time stamps and corresponding data.
    """
    iso_from = datetime.datetime.utcfromtimestamp(ts_from).isoformat()
    iso_to = datetime.datetime.utcfromtimestamp(ts_to).isoformat()
    req_str = archiver_url + archiver_channel + "&from=" + iso_from + "Z&to=" + iso_to + "Z"
    with urllib.request.urlopen(req_str) as req:
        data = json.load(req)
        secs = [x["secs"] + x["nanos"] * 1e-9 for x in data[0]["data"]]
        vals = [x["val"] for x in data[0]["data"]]

    return (np.asarray(secs), np.asarray(vals))


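A usage sketch for the new archiver helper, assuming the archiver endpoint from the example config is reachable; the time window and channel are hypothetical:

from sed.loader.mpes.loader import get_archiver_data

ts_from, ts_to = 1_700_470_800.0, 1_700_474_400.0  # hypothetical one-hour window, epoch seconds
secs, vals = get_archiver_data(
    archiver_url="http://aa0.fhi-berlin.mpg.de:17668/retrieval/data/getData.json?pv=",
    archiver_channel="trARPES:Carving:TEMP_RBV",
    ts_from=ts_from,
    ts_to=ts_to,
)
print(vals.mean())  # e.g. the average sample temperature over the window
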
class MpesLoader(BaseLoader):
"""Mpes implementation of the Loader. Reads from h5 files or folders of the
SPECS Metis 1000 (FHI Berlin)
@@ -645,6 +673,28 @@ def get_files_from_run_id(
# Return the list of found files
return files

    def get_start_and_end_time(self) -> Tuple[float, float]:
        """Extract the start and end time stamps from the loaded files.

        Returns:
            Tuple[float, float]: A tuple containing the start and end time stamps.
        """
        h5file = h5py.File(self.files[0])
        timestamps = hdf5_to_array(
            h5file,
            group_names=self._config["dataframe"]["hdf5_groupnames"],
            time_stamps=True,
        )
        ts_from = timestamps[-1][1]
        h5file = h5py.File(self.files[-1])
        timestamps = hdf5_to_array(
            h5file,
            group_names=self._config["dataframe"]["hdf5_groupnames"],
            time_stamps=True,
        )
        ts_to = timestamps[-1][-1]
        return (ts_from, ts_to)

def gather_metadata(
self,
files: Sequence[str],
@@ -666,21 +716,7 @@ def gather_metadata(
print("Gathering metadata from different locations")
# Read events in with ms time stamps
print("Collecting time stamps...")

h5file = h5py.File(files[0])
timestamps = hdf5_to_array(
h5file,
group_names=self._config["dataframe"]["hdf5_groupnames"],
time_stamps=True,
)
ts_from = timestamps[-1][1]
h5file = h5py.File(files[-1])
timestamps = hdf5_to_array(
h5file,
group_names=self._config["dataframe"]["hdf5_groupnames"],
time_stamps=True,
)
ts_to = timestamps[-1][-1]
(ts_from, ts_to) = self.get_start_and_end_time()

metadata["timing"] = {
"acquisition_start": datetime.datetime.utcfromtimestamp(ts_from)
@@ -709,28 +745,22 @@ def gather_metadata(

print("Collecting data from the EPICS archive...")
# Get metadata from Epics archive if not present already
start = datetime.datetime.utcfromtimestamp(ts_from).isoformat()
end = datetime.datetime.utcfromtimestamp(ts_to).isoformat()
epics_channels = self._config["metadata"]["epics_pvs"]

start = datetime.datetime.utcfromtimestamp(ts_from).isoformat()

channels_missing = set(epics_channels) - set(
metadata["file"].keys(),
)
for channel in channels_missing:
try:
req_str = (
"http://aa0.fhi-berlin.mpg.de:17668/retrieval/data/getData.json?pv="
+ channel
+ "&from="
+ start
+ "Z&to="
+ end
+ "Z"
_, vals = get_archiver_data(
archiver_url=self._config["metadata"].get("archiver_url"),
archiver_channel=channel,
ts_from=ts_from,
ts_to=ts_to,
)
with urllib.request.urlopen(req_str) as req:
data = json.load(req)
vals = [x["val"] for x in data[0]["data"]]
metadata["file"][f"{channel}"] = np.mean(vals)
metadata["file"][f"{channel}"] = np.mean(vals)

except IndexError:
metadata["file"][f"{channel}"] = np.nan
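Finally, a sketch of calling the new get_start_and_end_time directly, which gather_metadata now uses to derive the run's time window. The config path and data folder are hypothetical, and assigning loader.files by hand stands in for the file discovery normally done when loading data:

import glob

from sed.core.config import parse_config
from sed.loader.mpes.loader import MpesLoader

config = parse_config("sed/config/mpes_example_config.yaml")  # hypothetical path
loader = MpesLoader(config=config)

# get_start_and_end_time reads the first and last entry of loader.files
loader.files = sorted(glob.glob("/path/to/mpes/raw/*.h5"))

ts_from, ts_to = loader.get_start_and_end_time()
print(f"acquisition window: {ts_to - ts_from:.1f} s")
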