Skip to content

Commit

Permalink
Merge pull request #94 from OpenCOMPES/90-loader-plugin-api
Browse files Browse the repository at this point in the history
90 loader plugin api
  • Loading branch information
rettigl authored Dec 3, 2022
2 parents b49ba7e + f6eee75 commit 87e9301
Show file tree
Hide file tree
Showing 29 changed files with 560 additions and 334 deletions.
7 changes: 4 additions & 3 deletions sed/calibrator/energy.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from silx.io import dictdump

from sed.binning import bin_dataframe
from sed.loader.mpes import MpesLoader
from sed.loader.base.loader import BaseLoader


class EnergyCalibrator:
Expand All @@ -48,6 +48,7 @@ class EnergyCalibrator:

def __init__(
self,
loader: BaseLoader,
biases: np.ndarray = None,
traces: np.ndarray = None,
tof: np.ndarray = None,
Expand All @@ -63,6 +64,7 @@ def __init__(
using the bin_data method.
"""

self.loader = loader
self.biases: np.ndarray = None
self.traces: np.ndarray = None
self.traces_normed: np.ndarray = None
Expand Down Expand Up @@ -217,8 +219,7 @@ def bin_data(
if bias_key is None:
bias_key = self._config.get("energy", {}).get("bias_key", "")

loader = MpesLoader(config=self._config)
dataframe = loader.read_dataframe(files=data_files)
dataframe, _ = self.loader.read_dataframe(files=data_files)
traces = bin_dataframe(
dataframe,
bins=bins,
Expand Down
3 changes: 3 additions & 0 deletions sed/config/default.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
core:
loader: dask

binning:
hist_mode: "numba"
mode: fast
Expand Down
31 changes: 23 additions & 8 deletions sed/core/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
from sed.core.dfops import apply_jitter
from sed.core.metadata import MetaHandler
from sed.diagnostics import grid_histogram
from sed.loader.loader_interface import get_loader
from sed.loader.mirrorutil import CopyTool
from sed.loader.mpes import MpesLoader

N_CPU = psutil.cpu_count()

Expand Down Expand Up @@ -59,18 +59,25 @@ def __init__(
self._coordinates: Dict[Any, Any] = {}
self.axis: Dict[Any, Any] = {}
self._attributes = MetaHandler(meta=metadata)

loader_name = self._config["core"]["loader"]
self.loader = get_loader(
loader_name=loader_name,
config=self._config,
)

self.ec = EnergyCalibrator(
loader=self.loader,
config=self._config,
)

self.mc = MomentumCorrector(
config=self._config,
)

self.dc = DelayCalibrator(
config=self._config,
)
self.ml = MpesLoader( # pylint: disable=invalid-name
config=self._config,
)

self.use_copy_tool = self._config.get("core", {}).get(
"use_copy_tool",
Expand Down Expand Up @@ -216,17 +223,25 @@ def load(
if dataframe is not None:
self._dataframe = dataframe
elif folder is not None:
self._dataframe = self.ml.read_dataframe(
# pylint: disable=unused-variable
dataframe, metadata = self.loader.read_dataframe(
folder=cast(str, self.cpy(folder)),
**kwds,
)
self._files = self.ml.files
self._dataframe = dataframe
# TODO: Implement metadata treatment
# self._attributes.add(metadata)
self._files = self.loader.files
elif files is not None:
self._dataframe = self.ml.read_dataframe(
# pylint: disable=unused-variable
dataframe, metadata = self.loader.read_dataframe(
files=cast(List[str], self.cpy(files)),
**kwds,
)
self._files = self.ml.files
self._dataframe = dataframe
# TODO: Implement metadata treatment
# self._attributes.add(metadata)
self._files = self.loader.files
else:
raise ValueError(
"Either 'dataframe', 'files' or 'folder' needs to be privided!",
Expand Down
1 change: 1 addition & 0 deletions sed/loader/base/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# base loader
Empty file added sed/loader/base/__init__.py
Empty file.
50 changes: 50 additions & 0 deletions sed/loader/base/loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""The abstract class off of which to implement loaders."""
from abc import ABC
from abc import abstractmethod
from typing import List
from typing import Sequence
from typing import Tuple

import dask.dataframe as ddf


class BaseLoader(ABC):
    """
    Abstract base class that concrete loaders must implement.
    A loader is identified by the name of the folder it lives in:
    for this BaseLoader, stored in base/loader.py, the ID is 'base'.
    """

    # pylint: disable=too-few-public-methods

    __name__ = "BaseLoader"

    # File extensions a concrete loader can parse; empty for the base class.
    supported_file_types: List[str] = []

    @abstractmethod
    def __init__(
        self,
        config: dict = None,
    ):
        # Create the default dict per instance to avoid a shared mutable
        # default.
        self._config = {} if config is None else config

        # Paths of the files handled by the most recent read_dataframe call.
        self.files: List[str] = []

    @abstractmethod
    def read_dataframe(
        self,
        files: Sequence[str] = None,
        folder: str = None,
        ftype: str = None,
        **kwds,
    ) -> Tuple[ddf.DataFrame, dict]:
        """Read data from the given files or folder.

        Concrete implementations return a dask dataframe together with a
        dictionary of metadata.
        """
        return None, None


LOADER = BaseLoader
Empty file added sed/loader/generic/__init__.py
Empty file.
104 changes: 104 additions & 0 deletions sed/loader/generic/loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""
module sed.loader.generic, code for loading files into a dask dataframe
using the generic reader utilities of dask.dataframe.
Mostly ported from https://github.com/mpes-kit/mpes.
@author: L. Rettig
"""
import os
from typing import Any
from typing import Dict
from typing import List
from typing import Sequence
from typing import Tuple

import dask.dataframe as ddf

from sed.loader.base.loader import BaseLoader
from sed.loader.utils import gather_files


class GenericLoader(BaseLoader):  # pylint: disable=too-few-public-methods
    """Dask implementation of the Loader. Reads from various file types using the
    utilities of Dask."""

    __name__ = "dask"

    supported_file_types = ["parquet", "csv", "json"]

    def __init__(
        self,
        config: dict = None,
    ):
        # Guard against the mutable-default pitfall: create the dict here.
        if config is None:
            config = {}

        self._config = config

        # Paths of the files read by the most recent read_dataframe call.
        self.files: List[str] = []

    def read_dataframe(
        self,
        files: Sequence[str] = None,
        folder: str = None,
        ftype: str = "parquet",
        **kwds,
    ) -> Tuple[ddf.DataFrame, dict]:
        """Read stored files from a folder into a dataframe.

        Args:
            files: List of file paths to read. Ignored if ``folder`` is given.
            folder: Folder path of the files. Takes priority over ``files``:
                if specified, all files of the given type in the folder are
                read in reading order and ``files`` is ignored.
            ftype: File type to read ('parquet', 'json', 'csv', etc).
            **kwds: Keyword arguments forwarded to the specific file parser
                in the ``dask.dataframe`` module (and, when a folder is
                given, also to ``gather_files``).

        Raises:
            ValueError: If neither files nor folder is provided, or if the
                file format cannot be understood.
            FileNotFoundError: If no valid files are found.

        Returns:
            Tuple of the dask dataframe read from the specified files, and a
            metadata dictionary (currently always empty).
        """
        metadata: Dict[Any, Any] = {}
        # pylint: disable=duplicate-code
        if folder is not None:
            folder = os.path.realpath(folder)
            # NOTE(review): kwds are forwarded both here and to the dask
            # reader below, mirroring the established call pattern.
            files = gather_files(
                folder=folder,
                extension=ftype,
                file_sorting=True,
                **kwds,
            )
        else:
            if files is None:
                raise ValueError(
                    "Either the folder or file path should be provided!",
                )
            files = [os.path.realpath(file) for file in files]

        # Fail before mutating state, so self.files never ends up holding an
        # empty, invalid file list.
        if not files:
            raise FileNotFoundError("No valid files found!")

        self.files = files

        # Dispatch to the matching dask reader for the known formats.
        readers = {
            "parquet": ddf.read_parquet,
            "json": ddf.read_json,
            "csv": ddf.read_csv,
        }
        reader = readers.get(ftype)
        if reader is not None:
            return (reader(files, **kwds), metadata)

        # Fall back to dask's generic table reader for other text formats.
        try:
            return (ddf.read_table(files, **kwds), metadata)
        except (TypeError, ValueError, NotImplementedError) as exc:
            # ValueError (a subclass of the previously raised bare Exception)
            # keeps existing `except Exception` callers working.
            raise ValueError(
                "The file format cannot be understood!",
            ) from exc


LOADER = GenericLoader
54 changes: 54 additions & 0 deletions sed/loader/loader_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Interface to select a specified loader
"""
import glob
import importlib.util
import os
from typing import List

from sed.loader.base.loader import BaseLoader


def get_loader(loader_name: str, config: dict = None) -> BaseLoader:
    """Helper function to get the loader object from its given name.

    Args:
        loader_name: Name of the loader (the folder it lives in under
            ``sed/loader``).
        config: Config dictionary passed to the loader's constructor.

    Raises:
        ValueError: If no loader with the given name exists.

    Returns:
        An instance of the requested loader.
    """
    if config is None:
        config = {}

    path_prefix = (
        f"{os.path.dirname(__file__)}{os.sep}"
        if os.path.dirname(__file__)
        else ""
    )
    path = os.path.join(path_prefix, loader_name, "loader.py")
    if not os.path.exists(path):
        # join() avoids the stray trailing ", " the old concatenation left
        # before the closing bracket.
        available = ", ".join(get_names_of_all_loaders())
        raise ValueError(
            f"Invalid loader {loader_name}. "
            f"Available loaders are: [{available}].",
        )

    # Import the loader module directly from its file path and instantiate
    # the class exported through its module-level LOADER symbol.
    spec = importlib.util.spec_from_file_location("loader.py", path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # type: ignore[attr-defined]
    return module.LOADER(config=config)  # type: ignore[attr-defined]


def get_names_of_all_loaders() -> List[str]:
    """Helper function to populate a list of all available loaders.

    Every subfolder of this package containing a ``loader.py`` (except the
    abstract ``base`` folder) counts as one available loader; its folder
    name is the loader's ID.

    Returns:
        List of the names of all available loaders.
    """
    # Local import: keeps the module-level dependencies unchanged.
    from pathlib import Path

    loader_dir = Path(__file__).resolve().parent
    # Taking the parent directory's name is robust against loader folders
    # that happen to contain 'loader' in their name, unlike the previous
    # rindex()-based string surgery on the raw path.
    return [
        path.parent.name
        for path in loader_dir.glob(f"*{os.sep}loader.py")
        if path.parent.name != "base"
    ]
Empty file added sed/loader/mpes/__init__.py
Empty file.
Loading

0 comments on commit 87e9301

Please sign in to comment.