Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

🐛 FIX: Ensure Container DB always closed after access #5123

Merged
merged 4 commits into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 100 additions & 85 deletions aiida/repository/backend/disk_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""Implementation of the ``AbstractRepositoryBackend`` using the ``disk-objectstore`` as the backend."""
import contextlib
import shutil
from typing import BinaryIO, Iterable, Iterator, List, Optional, Tuple
import typing as t

from disk_objectstore import Container

Expand All @@ -16,66 +16,79 @@


class DiskObjectStoreRepositoryBackend(AbstractRepositoryBackend):
"""Implementation of the ``AbstractRepositoryBackend`` using the ``disk-object-store`` as the backend."""
"""Implementation of the ``AbstractRepositoryBackend`` using the ``disk-object-store`` as the backend.

def __init__(self, container):
    .. note:: For certain methods, the container may create a session which should be closed after the operation is
        done to make sure the connection to the underlying sqlite database is closed. The best way to accomplish this
        is by using the container as a context manager, which will automatically call the ``close`` method when it
        exits, ensuring that the session is closed. Note that not all methods open a session and so need to close it,
        but to be on the safe side, we put every use of the container in a context manager. If no session is created,
        the ``close`` method is essentially a no-op.

"""

def __init__(self, container: Container):
type_check(container, Container)
self._container = container

def __str__(self) -> str:
"""Return the string representation of this repository."""
if self.is_initialised:
return f'DiskObjectStoreRepository: {self.container.container_id} | {self.container.get_folder()}'
with self._container as container:
return f'DiskObjectStoreRepository: {container.container_id} | {container.get_folder()}'
return 'DiskObjectStoreRepository: <uninitialised>'

@property
def uuid(self) -> Optional[str]:
def uuid(self) -> t.Optional[str]:
"""Return the unique identifier of the repository."""
if not self.is_initialised:
return None
return self.container.container_id
with self._container as container:
return container.container_id

@property
def key_format(self) -> Optional[str]:
return self.container.hash_type
def key_format(self) -> t.Optional[str]:
with self._container as container:
return container.hash_type

def initialise(self, **kwargs) -> None:
"""Initialise the repository if it hasn't already been initialised.

:param kwargs: parameters for the initialisation.
"""
self.container.init_container(**kwargs)
with self._container as container:
container.init_container(**kwargs)

@property
def is_initialised(self) -> bool:
"""Return whether the repository has been initialised."""
return self.container.is_initialised

@property
def container(self) -> Container:
return self._container
with self._container as container:
return container.is_initialised

def erase(self):
"""Delete the repository itself and all its contents."""
try:
shutil.rmtree(self.container.get_folder())
with self._container as container:
shutil.rmtree(container.get_folder())
except FileNotFoundError:
pass

def _put_object_from_filelike(self, handle: BinaryIO) -> str:
def _put_object_from_filelike(self, handle: t.BinaryIO) -> str:
"""Store the byte contents of a file in the repository.

:param handle: filelike object with the byte content to be stored.
:return: the generated fully qualified identifier for the object within the repository.
:raises TypeError: if the handle is not a byte stream.
"""
return self.container.add_streamed_object(handle)
with self._container as container:
return container.add_streamed_object(handle)

def has_objects(self, keys: List[str]) -> List[bool]:
return self.container.has_objects(keys)
def has_objects(self, keys: t.List[str]) -> t.List[bool]:
with self._container as container:
return container.has_objects(keys)

@contextlib.contextmanager
def open(self, key: str) -> Iterator[BinaryIO]:
def open(self, key: str) -> t.Iterator[t.BinaryIO]:
"""Open a file handle to an object stored under the given key.

.. note:: this should only be used to open a handle to read an existing file. To write a new file use the method
Expand All @@ -88,21 +101,24 @@ def open(self, key: str) -> Iterator[BinaryIO]:
"""
super().open(key)

with self.container.get_object_stream(key) as handle:
yield handle # type: ignore[misc]
with self._container as container:
with container.get_object_stream(key) as handle:
yield handle # type: ignore[misc]

def iter_object_streams(self, keys: List[str]) -> Iterator[Tuple[str, BinaryIO]]:
with self.container.get_objects_stream_and_meta(keys) as triplets:
def iter_object_streams(self, keys: t.List[str]) -> t.Iterator[t.Tuple[str, t.BinaryIO]]:
with self._container.get_objects_stream_and_meta(keys) as triplets:
for key, stream, _ in triplets:
assert stream is not None
yield key, stream # type: ignore[misc]

def delete_objects(self, keys: List[str]) -> None:
def delete_objects(self, keys: t.List[str]) -> None:
super().delete_objects(keys)
self.container.delete_objects(keys)
with self._container as container:
container.delete_objects(keys)

def list_objects(self) -> Iterable[str]:
return self.container.list_all_objects()
def list_objects(self) -> t.Iterable[str]:
with self._container as container:
return container.list_all_objects()

def get_object_hash(self, key: str) -> str:
"""Return the SHA-256 hash of an object stored under the given key.
Expand All @@ -117,11 +133,12 @@ def get_object_hash(self, key: str) -> str:
"""
if not self.has_object(key):
raise FileNotFoundError(key)
if self.container.hash_type != 'sha256':
return super().get_object_hash(key)
with self._container as container:
if container.hash_type != 'sha256':
return super().get_object_hash(key)
return key

def maintain( # type: ignore # pylint: disable=arguments-differ,too-many-branches
def maintain( # type: ignore[override] # pylint: disable=arguments-differ,too-many-branches
self,
dry_run: bool = False,
live: bool = True,
Expand All @@ -132,21 +149,16 @@ def maintain( # type: ignore # pylint: disable=arguments-differ,too-many-branch
) -> dict:
"""Performs maintenance operations.

:param live:
if True, will only perform operations that are safe to do while the repository is in use.
:param pack_loose:
flag for forcing the packing of loose files.
:param do_repack:
flag for forcing the re-packing of already packed files.
:param clean_storage:
flag for forcing the cleaning of soft-deleted files from the repository.
:param do_vacuum:
flag for forcing the vacuuming of the internal database when cleaning the repository.
:return:
a dictionary with information on the operations performed.
        :param live: if True, will only perform operations that are safe to do while the repository is in use.
        :param pack_loose: flag for forcing the packing of loose files.
        :param do_repack: flag for forcing the re-packing of already packed files.
        :param clean_storage: flag for forcing the cleaning of soft-deleted files from the repository.
        :param do_vacuum: flag for forcing the vacuuming of the internal database when cleaning the repository.
        :return: a dictionary with information on the operations performed.
"""
from aiida.backends.control import MAINTAIN_LOGGER
DOSTORE_LOGGER = MAINTAIN_LOGGER.getChild('disk_object_store') # pylint: disable=invalid-name

logger = MAINTAIN_LOGGER.getChild('disk_object_store')

if live and (do_repack or clean_storage or do_vacuum):
overrides = {'do_repack': do_repack, 'clean_storage': clean_storage, 'do_vacuum': do_vacuum}
Expand All @@ -164,52 +176,55 @@ def maintain( # type: ignore # pylint: disable=arguments-differ,too-many-branch
clean_storage = True if clean_storage is None else clean_storage
do_vacuum = True if do_vacuum is None else do_vacuum

if pack_loose:
files_numb = self.container.count_objects()['loose']
files_size = self.container.get_total_size()['total_size_loose'] * BYTES_TO_MB
DOSTORE_LOGGER.report(f'Packing all loose files ({files_numb} files occupying {files_size} MB) ...')
if not dry_run:
self.container.pack_all_loose()

if do_repack:
files_numb = self.container.count_objects()['packed']
files_size = self.container.get_total_size()['total_size_packfiles_on_disk'] * BYTES_TO_MB
DOSTORE_LOGGER.report(
f'Re-packing all pack files ({files_numb} files in packs, occupying {files_size} MB) ...'
)
if not dry_run:
self.container.repack()

if clean_storage:
DOSTORE_LOGGER.report(f'Cleaning the repository database (with `vacuum={do_vacuum}`) ...')
if not dry_run:
self.container.clean_storage(vacuum=do_vacuum)


def get_info( # type: ignore # pylint: disable=arguments-differ
with self._container as container:
if pack_loose:
files_numb = container.count_objects()['loose']
files_size = container.get_total_size()['total_size_loose'] * BYTES_TO_MB
logger.report(f'Packing all loose files ({files_numb} files occupying {files_size} MB) ...')
if not dry_run:
container.pack_all_loose()

if do_repack:
files_numb = container.count_objects()['packed']
files_size = container.get_total_size()['total_size_packfiles_on_disk'] * BYTES_TO_MB
logger.report(f'Re-packing all pack files ({files_numb} files in packs, occupying {files_size} MB) ...')
if not dry_run:
container.repack()

if clean_storage:
logger.report(f'Cleaning the repository database (with `vacuum={do_vacuum}`) ...')
if not dry_run:
container.clean_storage(vacuum=do_vacuum)


def get_info( # type: ignore[override] # pylint: disable=arguments-differ
self,
statistics=False,
) -> dict:
) -> t.Dict[str, t.Union[int, str, t.Dict[str, int], t.Dict[str, float]]]:
"""Return information on configuration and content of the repository."""
output_info: t.Dict[str, t.Union[int, str, t.Dict[str, int], t.Dict[str, float]]] = {}

with self._container as container:
output_info['SHA-hash algorithm'] = container.hash_type
output_info['Compression algorithm'] = container.compression_algorithm

if not statistics:
return output_info

output_info = {}
output_info['SHA-hash algorithm'] = self.container.hash_type
output_info['Compression algorithm'] = self.container.compression_algorithm
files_data = container.count_objects()
size_data = container.get_total_size()

if not statistics:
return output_info
output_info['Packs'] = files_data['pack_files']

files_data = self.container.count_objects()
size_data = self.container.get_total_size()
output_info['Objects'] = {
'unpacked': files_data['loose'],
'packed': files_data['packed'],
}

output_info['Packs'] = files_data['pack_files'] # type: ignore
output_info['Size (MB)'] = {
'unpacked': size_data['total_size_loose'] * BYTES_TO_MB,
'packed': size_data['total_size_packfiles_on_disk'] * BYTES_TO_MB,
'other': size_data['total_size_packindexes_on_disk'] * BYTES_TO_MB,
}

output_info['Objects'] = { # type: ignore
'unpacked': files_data['loose'],
'packed': files_data['packed'],
}
output_info['Size (MB)'] = { # type: ignore
'unpacked': size_data['total_size_loose'] * BYTES_TO_MB,
'packed': size_data['total_size_packfiles_on_disk'] * BYTES_TO_MB,
'other': size_data['total_size_packindexes_on_disk'] * BYTES_TO_MB,
}
return output_info
2 changes: 1 addition & 1 deletion docs/source/intro/about.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ AiiDA is built to support and streamline the four core pillars of the ADES model
* **Data provenance:** AiiDA automatically tracks and records inputs, outputs and metadata of all calculations and workflows in extensive provenance graphs that preserve the full lineage of all data.
* **Advanced queries:** AiiDA's query language enables fast graph queries on millions of nodes.
* **Plugin interface:** AiiDA can support via plugins any computational code and data analytics tool, data type, scheduler, connection mode, etc. (see `public plugin repository <https://aiidateam.github.io/aiida-registry/>`__).
* **HPC interface:** AiiDA can seamlessly deal with heterogeneous and remote computing resources; it works with many schedulers out of the box (`SLURM <https://slurm.schedmd.com>`__, `PBS Pro <https://www.pbspro.org/>`__, `torque <http://www.adaptivecomputing.com/products/torque/>`__, `SGE <http://gridscheduler.sourceforge.net/>`__ or `LSF <https://www.ibm.com/support/knowledgecenter/SSETD4/product_welcome_platform_lsf.html>`__).
* **HPC interface:** AiiDA can seamlessly deal with heterogeneous and remote computing resources; it works with many schedulers out of the box (`SLURM <https://slurm.schedmd.com>`__, `PBS Pro <https://www.pbspro.org/>`__, `torque <http://www.adaptivecomputing.com/products/torque/>`__, `SGE <http://gridscheduler.sourceforge.net/>`__ or `LSF <https://www.ibm.com/docs/en/spectrum-lsf>`__).
* **Open science:** AiiDA allows to export both full databases and selected subsets, to be shared with collaborators or made available and browsable online on the `Archive <https://archive.materialscloud.org/>`__ and `Explore <https://www.materialscloud.org/explore>`__ sections of `Materials Cloud <https://www.materialscloud.org>`__.
* **Open source:** AiiDA is released under the `MIT open-source license <https://github.com/aiidateam/aiida-core/blob/develop/LICENSE.txt>`__.

Expand Down
12 changes: 2 additions & 10 deletions tests/orm/nodes/data/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""Tests for array related functions."""
"""Tests for the :mod:`aiida.orm.nodes.data.array.array` module."""
import numpy
import pytest

from aiida.manage.manager import get_manager
from aiida.orm import ArrayData, load_node


@pytest.mark.usefixtures('clear_database_before_test')
def test_read_stored():
"""Test the `parse_formula` utility function."""
"""Test reading an array from an ``ArrayData`` after storing and loading it."""
array = numpy.array([1, 2, 3, 4, 5, 6, 7, 8, 9])
node = ArrayData()
node.set_array('array', array)
Expand All @@ -29,10 +28,3 @@ def test_read_stored():

loaded = load_node(node.uuid)
assert numpy.array_equal(loaded.get_array('array'), array)

# Now pack all the files in the repository
container = get_manager().get_backend().get_repository().container
container.pack_all_loose()

loaded = load_node(node.uuid)
assert numpy.array_equal(loaded.get_array('array'), array)
6 changes: 3 additions & 3 deletions tests/repository/backend/test_disk_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def test_erase(repository, generate_directory):

assert repository.has_object(key)

dirpath = pathlib.Path(repository.container.get_folder())
dirpath = pathlib.Path(repository._container.get_folder()) # pylint: disable=protected-access
repository.erase()

assert not dirpath.exists()
Expand Down Expand Up @@ -207,7 +207,7 @@ def test_list_objects(repository, generate_directory):
def test_key_format(repository):
"""Test the ``key_format`` property."""
repository.initialise()
assert repository.key_format == repository.container.hash_type
assert repository.key_format == repository._container.hash_type # pylint: disable=protected-access


def test_get_info(populated_repository):
Expand Down Expand Up @@ -268,7 +268,7 @@ def test_get_info(populated_repository):
def test_maintain(populated_repository, kwargs, output_info):
"""Test the ``maintain`` method."""
populated_repository.maintain(**kwargs)
file_info = populated_repository.container.count_objects()
file_info = populated_repository._container.count_objects() # pylint: disable=protected-access
assert file_info['loose'] == output_info['unpacked']
assert file_info['packed'] == output_info['packed']

Expand Down
9 changes: 6 additions & 3 deletions tests/tools/archive/test_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@
import io
import os

import pytest

from aiida import orm
from aiida.tools.archive import create_archive, import_archive


@pytest.mark.usefixtures('clear_database_before_test')
def test_export_repository(aiida_profile, tmp_path):
"""Test exporting a node with files in the repository."""
from aiida.manage.manager import get_manager

aiida_profile.reset_db()
repository = get_manager().get_backend().get_repository()

node = orm.Data()
node.put_object_from_filelike(io.BytesIO(b'file_a'), 'file_a')
Expand All @@ -32,8 +35,8 @@ def test_export_repository(aiida_profile, tmp_path):
create_archive([node], filename=filepath)

aiida_profile.reset_db()
container = get_manager().get_backend().get_repository().container
container.init_container(clear=True)
repository.erase()
repository.initialise()
import_archive(filepath)

loaded = orm.load_node(uuid=node_uuid)
Expand Down