Refactor flashloader #329

Merged
merged 80 commits into from
Jun 30, 2024

Conversation

zain-sohail
Member

@zain-sohail zain-sohail commented Jan 18, 2024

Summary
The FlashLoader is modularized into 2 classes. The fel module contains these classes and can be reused by SXP (already done in #331) and the lab setup at DESY.

'fel' module

  1. DataFrameCreator:

    • Restructured the process of generating dataframes, organizing them by channel format (electron, pulse, train).
    • Electron dataframe loading is 3x faster due to directly loading all dld channels at once from the dataset.
    • Uses the same index and dataset key behavior that SXP currently has, while remaining compatible with the current Flash structure.
  2. BufferFileHandler:

    • Manages the creation of buffer files, offering the flexibility of serial or parallel generation as needed.
    • Conducts schema checks against the configuration file for existing buffer files.
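
As a rough illustration of the schema check described above, the sketch below compares the column names expected from a configuration with those found in an existing buffer file. The names `check_schema` and `assert_schema_matches`, and the use of plain collections of column names, are assumptions made for this example; they are not taken from the `sed` code base.

```python
from typing import Iterable, Set, Tuple


def check_schema(expected: Iterable[str], existing: Iterable[str]) -> Tuple[Set[str], Set[str]]:
    """Return (missing_in_file, missing_in_config) for two collections of column names."""
    expected_set, existing_set = set(expected), set(existing)
    # Columns the config asks for but the buffer file does not contain.
    missing_in_file = expected_set - existing_set
    # Columns present in the buffer file but no longer listed in the config.
    missing_in_config = existing_set - expected_set
    return missing_in_file, missing_in_config


def assert_schema_matches(expected: Iterable[str], existing: Iterable[str]) -> None:
    """Raise ValueError if the two column collections differ (hypothetical helper)."""
    missing_in_file, missing_in_config = check_schema(expected, existing)
    if missing_in_file or missing_in_config:
        raise ValueError(
            f"Schema mismatch: missing in buffer file {sorted(missing_in_file)}, "
            f"missing in config {sorted(missing_in_config)}. "
            "Consider recreating the buffer files."
        )
```

A check like this would only make sense when existing buffer files are reused; when the files are recreated anyway, there is nothing to compare against.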

Tests should also be available.

@coveralls
Collaborator

coveralls commented Jan 18, 2024

Pull Request Test Coverage Report for Build 9733289561

Details

  • 709 of 735 (96.46%) changed or added relevant lines in 15 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage increased (+0.6%) to 92.462%

Changes Missing Coverage              Covered Lines   Changed/Added Lines   %
sed/loader/flash/buffer_handler.py    81              82                    98.78%
sed/loader/flash/utils.py             45              46                    97.83%
sed/loader/sxp/loader.py              7               8                     87.5%
tests/loader/flash/conftest.py        28              29                    96.55%
sed/loader/flash/instruments.py       3               6                     50.0%
sed/loader/utils.py                   28              31                    90.32%
sed/loader/flash/dataframe.py         85              90                    94.44%
sed/loader/flash/loader.py            68              73                    93.15%
tests/loader/test_loaders.py          16              22                    72.73%
Totals Coverage Status
Change from base Build 9680494155: 0.6%
Covered Lines: 6869
Relevant Lines: 7429

💛 - Coveralls

Member

@rettigl rettigl left a comment

I put in a few comments, but honestly this is way too large to properly review. Also, the three pull requests you opened seem to contain the same code changes in different stages of modification. I don't understand really how they relate, and which is supposed to change and contain what. I don't really see how I can review this in the current state.

sed/loader/fel/buffer.py Outdated
sed/loader/fel/buffer.py Outdated
sed/loader/fel/buffer.py Outdated
sed/loader/fel/buffer.py Outdated
sed/loader/fel/buffer.py Outdated
sed/loader/fel/dataframe.py Outdated
sed/loader/fel/parquet.py Outdated
sed/loader/fel/parquet.py Outdated
sed/loader/flash/loader.py Outdated
sed/loader/flash/loader.py Outdated
@rettigl
Member

rettigl commented Feb 1, 2024

When testing with the tutorial 4, I get the following error:

config={"core": {"paths": {
    "data_raw_dir": "../../flash_test_data/fl1user3", 
    "data_parquet_dir": "../../flash_test_data/parquet/"
}}}
sp = SedProcessor(runs=[44797], config=config, user_config=config_file, system_config={}, collect_metadata=False, force_recreate=True)
Folder config loaded from: [/mnt/pcshare/users/Laurenz/AreaB/sed/sed/tutorial/sed_config.yaml]
User config loaded from: [/mnt/pcshare/users/Laurenz/AreaB/sed/sed/tutorial/hextof_config.yaml]
Default config loaded from: [/mnt/pcshare/users/Laurenz/AreaB/sed/sed/sed/config/default.yaml]
Reading files: 5 new files of 5 total.
[Parallel(n_jobs=5)]: Using backend LokyBackend with 5 concurrent workers.
[Parallel(n_jobs=5)]: Done   2 out of   5 | elapsed:   19.7s remaining:   29.5s
[Parallel(n_jobs=5)]: Done   3 out of   5 | elapsed:   37.9s remaining:   25.3s
[Parallel(n_jobs=5)]: Done   5 out of   5 | elapsed:   38.6s remaining:    0.0s
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
Cell In [4], line 5
      1 config={"core": {"paths": {
      2     "data_raw_dir": "../../flash_test_data/fl1user3",
      3     "data_parquet_dir": "../../flash_test_data/parquet/"
      4 }}}
----> 5 sp = SedProcessor(runs=[44797], config=config, user_config=config_file, system_config={}, collect_metadata=False, force_recreate=True)

File /mnt/pcshare/users/Laurenz/AreaB/sed/sed/sed/core/processor.py:156, in SedProcessor.__init__(self, metadata, config, dataframe, files, folder, runs, collect_metadata, verbose, **kwds)
    154 # Load data if provided:
    155 if dataframe is not None or files is not None or folder is not None or runs is not None:
--> 156     self.load(
    157         dataframe=dataframe,
    158         metadata=metadata,
    159         files=files,
    160         folder=folder,
    161         runs=runs,
    162         collect_metadata=collect_metadata,
    163         **kwds,
    164     )

File /mnt/pcshare/users/Laurenz/AreaB/sed/sed/sed/core/processor.py:375, in SedProcessor.load(self, dataframe, metadata, files, folder, runs, collect_metadata, **kwds)
    367         dataframe, timed_dataframe, metadata = self.loader.read_dataframe(
    368             folders=cast(str, self.cpy(folder)),
    369             runs=runs,
   (...)
    372             **kwds,
    373         )
    374     else:
--> 375         dataframe, timed_dataframe, metadata = self.loader.read_dataframe(
    376             runs=runs,
    377             metadata=metadata,
    378             collect_metadata=collect_metadata,
    379             **kwds,
    380         )
    382 elif folder is not None:
    383     dataframe, timed_dataframe, metadata = self.loader.read_dataframe(
    384         folders=cast(str, self.cpy(folder)),
    385         metadata=metadata,
    386         collect_metadata=collect_metadata,
    387         **kwds,
    388     )

File /mnt/pcshare/users/Laurenz/AreaB/sed/sed/sed/loader/flash/loader.py:283, in FlashLoader.read_dataframe(self, files, folders, runs, ftype, metadata, collect_metadata, converted, load_parquet, save_parquet, detector, force_recreate, parquet_dir, debug, **kwds)
    278 # Default behavior is to create the buffer files and load them
    279 else:
    280     # Obtain the parquet filenames, metadata, and schema from the method
    281     # which handles buffer file creation/reading
    282     h5_paths = [Path(file) for file in self.files]
--> 283     buffer = BufferHandler(
    284         self._config["dataframe"],
    285         h5_paths,
    286         parquet_path,
    287         force_recreate,
    288         suffix=detector,
    289         debug=debug,
    290     )
    291     df = buffer.dataframe_electron
    292     df_timed = buffer.dataframe_pulse

File /mnt/pcshare/users/Laurenz/AreaB/sed/sed/sed/loader/fel/buffer.py:81, in BufferHandler.__init__(self, cfg_df, h5_paths, folder, force_recreate, prefix, suffix, debug, auto)
     78 if not force_recreate:
     79     self.schema_check()
---> 81 self.create_buffer_files(debug)
     83 self.get_filled_dataframe()

File /mnt/pcshare/users/Laurenz/AreaB/sed/sed/sed/loader/fel/buffer.py:195, in BufferHandler.create_buffer_files(self, debug)
    193         self._create_buffer_file(h5_path, parquet_path)
    194 else:
--> 195     Parallel(n_jobs=self.num_files, verbose=10)(
    196         delayed(self._create_buffer_file)(h5_path, parquet_path)
    197         for h5_path, parquet_path in zip(self.h5_to_create, self.buffer_to_create)
    198     )

File /mnt/pcshare/users/Laurenz/AreaB/sed/.pyenv/lib/python3.8/site-packages/joblib/parallel.py:1098, in Parallel.__call__(self, iterable)
   1095     self._iterating = False
   1097 with self._backend.retrieval_context():
-> 1098     self.retrieve()
   1099 # Make sure that we get a last message telling us we are done
   1100 elapsed_time = time.time() - self._start_time

File /mnt/pcshare/users/Laurenz/AreaB/sed/.pyenv/lib/python3.8/site-packages/joblib/parallel.py:975, in Parallel.retrieve(self)
    973 try:
    974     if getattr(self._backend, 'supports_timeout', False):
--> 975         self._output.extend(job.get(timeout=self.timeout))
    976     else:
    977         self._output.extend(job.get())

File /mnt/pcshare/users/Laurenz/AreaB/sed/.pyenv/lib/python3.8/site-packages/joblib/_parallel_backends.py:567, in LokyBackend.wrap_future_result(future, timeout)
    564 """Wrapper for Future.result to implement the same behaviour as
    565 AsyncResults.get from multiprocessing."""
    566 try:
--> 567     return future.result(timeout=timeout)
    568 except CfTimeoutError as e:
    569     raise TimeoutError from e

File ~/.conda/envs/.pyenv/lib/python3.8/concurrent/futures/_base.py:437, in Future.result(self, timeout)
    435     raise CancelledError()
    436 elif self._state == FINISHED:
--> 437     return self.__get_result()
    439 self._condition.wait(timeout)
    441 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File ~/.conda/envs/.pyenv/lib/python3.8/concurrent/futures/_base.py:389, in Future.__get_result(self)
    387 if self._exception:
    388     try:
--> 389         raise self._exception
    390     finally:
    391         # Break a reference cycle with the exception in self._exception
    392         self = None

KeyError: "None of [Int64Index([      0,       1,       2,       3,       4,       5,       6,\n                  7,       8,       9,\n            ...\n            1018472, 1018473, 1018474, 1018476, 1018475, 1018477, 1018478,\n            1018479, 1018480, 1018481],\n           dtype='int64', length=1018482)] are in the [columns]"

@zain-sohail
Member Author

I have figured out a way to make reviewing slightly easier. I will put all the classes in the loader file instead of spreading them over different modules, so that at least the main branch and this one are easy to compare against each other.

I will address your issues in that commit.

@zain-sohail zain-sohail linked an issue Mar 20, 2024 that may be closed by this pull request
@rettigl
Member

rettigl commented Jun 13, 2024

@zain-sohail you are still working on this; can you let me know when you consider it done, so I can review a converged version?

@zain-sohail
Member Author

zain-sohail commented Jun 13, 2024

> @zain-sohail you are still working on this; can you let me know when you consider it done, so I can review a converged version?

The remaining work is more on the feature side than on refactoring, but I put some of those features here since I didn't want to build them on the older version. I'd suggest we merge this into the V1 branch.

Basically now:

  • Instead of felloader, I'm sticking to flashloader, because in the current structure the loader should stay the same across different use cases, while only the dataframe creation or buffer handling changes slightly.
  • Removed loading and saving of df and df_timed to parquet after forward filling (basically removed the parquethandler). This was causing too many conditional chains and complicating the data flow a lot. The only major use case came from the wespe instrument, since they use some simulation data to get the correct tof etc.
  • For that, the instrument option is now used. It branches out for wespe to do the data conversion, and can be structured better as we go along.
  • All important data transformations are now tracked, as we do in processor.py.
  • All buffer files (parquet format) now have footers containing useful metadata, which is also used. Currently the details are per file, but they can be aggregated per run later.
  • The elapsed time method is finally implemented, using that metadata.
  • sectorID is no longer saved in the buffer files; it is computed after loading them.
  • Added a property to view available runs.
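
To make the elapsed-time point above more concrete, here is a minimal sketch of how per-file metadata could be turned into an elapsed time, either per file or aggregated. The metadata layout (a dict per file with a `time_stamps` entry holding a `(start, end)` tuple in seconds) and the function name `elapsed_time` are assumptions for illustration only, not the actual structure used by the loader.

```python
from typing import Dict, Tuple, Union

# Hypothetical layout: {"file name": {"time_stamps": (start_seconds, end_seconds)}}
FileMetadata = Dict[str, Tuple[float, float]]


def elapsed_time(
    file_metadata: Dict[str, FileMetadata],
    aggregate: bool = True,
) -> Union[float, Dict[str, float]]:
    """Compute elapsed seconds from per-file (start, end) time stamps.

    With aggregate=True the per-file durations are summed; otherwise a
    dict mapping each file name to its own duration is returned.
    """
    per_file = {
        name: meta["time_stamps"][1] - meta["time_stamps"][0]
        for name, meta in file_metadata.items()
    }
    return sum(per_file.values()) if aggregate else per_file
```

Summing per-file durations rather than subtracting the first start from the last end ignores any gaps between files; which of the two is wanted depends on how "elapsed time" is meant to be interpreted.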

Older changes:

  • Option to use index/dataset keys
  • a lot of testing
  • data types for columns (not sure how useful considering we usually jitter)
  • bufferhandler with schema check etc.
  • dataframecreator with concise and faster code

@zain-sohail zain-sohail changed the base branch from main to v1_feature_branch June 22, 2024 15:40
@rettigl rettigl mentioned this pull request Jun 23, 2024
12 tasks
@rettigl
Member

rettigl commented Jun 23, 2024

Can you merge/rebase and update typing before I review?

Member

@rettigl rettigl left a comment

Seems mostly fine to me. The code for generating the dataframes does indeed appear to be several times faster than the old one, but I don't completely understand it. A more fine-grained description would be helpful.
For future tracking, a benchmark of the buffer file generation would also be great.
I made some detailed comments and questions in-between.
One thing does not seem to work for me with the new version, the saving of h5:
saving data to binned.h5
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[54], line 1
----> 1 sp.save('binned.h5')

File /mnt/pcshare/users/Laurenz/AreaB/sed/sed/sed/core/processor.py:2503, in SedProcessor.save(self, faddr, **kwds)
   2497     to_tiff(
   2498         data=data,
   2499         faddr=faddr,
   2500         **kwds,
   2501     )
   2502 elif extension in (".h5", ".hdf5"):
-> 2503     to_h5(
   2504         data=data,
   2505         faddr=faddr,
   2506         **kwds,
   2507     )
   2508 elif extension in (".nxs", ".nexus"):
   2509     try:

File /mnt/pcshare/users/Laurenz/AreaB/sed/sed/sed/io/hdf5.py:128, in to_h5(data, faddr, mode)
    122     if "metadata" in data.attrs and isinstance(
    123         data.attrs["metadata"],
    124         dict,
    125     ):
    126         meta_group = h5_file.create_group("metadata")
--> 128         recursive_write_metadata(meta_group, data.attrs["metadata"])
    130 print("Saving complete!")

File /mnt/pcshare/users/Laurenz/AreaB/sed/sed/sed/io/hdf5.py:44, in recursive_write_metadata(h5group, node)
     42 elif isinstance(item, dict):
     43     group = h5group.create_group(key)
---> 44     recursive_write_metadata(group, item)
     45 else:
     46     try:

File /mnt/pcshare/users/Laurenz/AreaB/sed/sed/sed/io/hdf5.py:43, in recursive_write_metadata(h5group, node)
     41         print(f"Saved {key} as string.")
     42 elif isinstance(item, dict):
---> 43     group = h5group.create_group(key)
     44     recursive_write_metadata(group, item)
     45 else:

File /mnt/pcshare/users/Laurenz/AreaB/sed/poetry_envs/virtualenvs/sed-processor-3qnpZCFI-py3.9/lib/python3.9/site-packages/h5py/_hl/group.py:62, in Group.create_group(self, name, track_order)
     59     track_order = h5.get_config().track_order
     61 with phil:
---> 62     name, lcpl = self._e(name, lcpl=True)
     63     gcpl = Group._gcpl_crt_order if track_order else None
     64     gid = h5g.create(self.id, name, lcpl=lcpl, gcpl=gcpl)

File /mnt/pcshare/users/Laurenz/AreaB/sed/poetry_envs/virtualenvs/sed-processor-3qnpZCFI-py3.9/lib/python3.9/site-packages/h5py/_hl/base.py:206, in CommonStateObject._e(self, name, lcpl)
    204         coding = h5t.CSET_UTF8
    205 else:
--> 206     raise TypeError(f"A name should be string or bytes, not {type(name)}")
    208 if lcpl:
    209     return name, get_lcpl(coding)

TypeError: A name should be string or bytes, not <class 'int'>

The reason is int-type keys in the parquet metadata:
image
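
One possible workaround for the integer keys, sketched here under the assumption that the metadata is a nested dict, is to convert every key to a string before the metadata is written, since the traceback shows h5py rejecting non-string group names. The helper name `stringify_keys` is hypothetical, and this is not necessarily the fix adopted in this pull request.

```python
from typing import Any


def stringify_keys(node: Any) -> Any:
    """Recursively convert all dictionary keys to str, leaving values untouched."""
    if isinstance(node, dict):
        return {str(key): stringify_keys(value) for key, value in node.items()}
    # Non-dict values (numbers, strings, lists, ...) are returned as they are.
    return node
```

Applying such a conversion once, right where the parquet metadata is collected, would keep the writer itself unchanged.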

sed/loader/flash/buffer_handler.py
sed/loader/flash/buffer_handler.py Outdated
sed/loader/flash/buffer_handler.py Outdated
sed/loader/flash/buffer_handler.py
sed/loader/flash/buffer_handler.py
tests/loader/flash/test_dataframe_creator.py Outdated
tests/loader/flash/test_dataframe_creator.py Outdated
tests/loader/flash/test_flash_loader.py Outdated
tests/loader/flash/test_flash_loader.py
tests/loader/flash/test_utils.py Outdated
@rettigl
Member

rettigl commented Jun 26, 2024

I'm also often getting these kinds of warnings:
image

@zain-sohail
Member Author

> I'm also often getting these kinds of warnings: image

So far I never got such errors. But maybe it's a memory issue? as I mostly work on maxwell

@rettigl
Member

rettigl commented Jun 29, 2024

> > I'm also often getting these kinds of warnings: image
>
> So far I never got such errors. But maybe it's a memory issue? as I mostly work on maxwell

It seems to be related more to some bottleneck while creating the processes. Once the jobs start running, these warnings stop.

Member

@rettigl rettigl left a comment

Mostly LGTM. Some small comments only left.
The time_stamps are tuples, so they are being saved as str. We could either convert them to an array, or split them into two channels before saving.
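
The second option could look roughly like the following sketch, which replaces a two-element tuple with two scalar entries before saving. The function name `split_time_stamps` and the `_start`/`_end` suffixes are assumptions chosen for this example, not names from the code under review.

```python
from typing import Any, Dict


def split_time_stamps(metadata: Dict[str, Any], key: str = "time_stamps") -> Dict[str, Any]:
    """Return a copy in which a (start, end) tuple under `key` becomes two scalar entries."""
    result = dict(metadata)  # shallow copy, so the caller's dict is not modified
    value = result.get(key)
    if isinstance(value, tuple) and len(value) == 2:
        del result[key]
        result[f"{key}_start"], result[f"{key}_end"] = value
    return result
```

Scalars would then be written with their numeric type instead of falling back to a string representation of the tuple.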

tests/loader/flash/test_flash_loader.py Outdated
tests/test_processor.py Outdated
@zain-sohail zain-sohail merged commit f5b9148 into v1_feature_branch Jun 30, 2024
5 checks passed
@zain-sohail zain-sohail deleted the refactor-flashloader branch June 30, 2024 16:30
Development

Successfully merging this pull request may close these issues.

Add dtype in loaders
3 participants