Hack stream-to-packed
GeigerJ2 committed May 17, 2024
1 parent be0db3c commit 297d87f
Showing 4 changed files with 88 additions and 2 deletions.
49 changes: 49 additions & 0 deletions src/aiida/cmdline/commands/cmd_profile.py
@@ -197,3 +197,52 @@ def profile_delete(force, delete_data, profiles):

get_config().delete_profile(profile.name, delete_storage=delete_data)
echo.echo_success(f'Profile `{profile.name}` was deleted.')

@verdi_profile.command('flush')
@arguments.PROFILES(required=True)
@options.FORCE(help='Skip any prompts for confirmation.')
@click.option(
    '--dry-run',
    is_flag=True,
    help='Run the maintenance in dry-run mode to print the actions that would be taken without actually executing them.',
)
def profile_flush(force, profiles, dry_run):
    """Delete the data of one or more profiles.

    The PROFILES argument takes one or more profile names whose stored data will be deleted.
    """

    from aiida.manage.manager import get_manager
    from aiida.orm import Group, Node, QueryBuilder
    from aiida.tools import delete_nodes

    manager = get_manager()
    storage = manager.get_profile_storage()

    for profile in profiles:
        if not force:
            echo.echo_warning('This operation cannot be undone, are you sure you want to continue?', nl=False)

            if not click.confirm(''):
                echo.echo_report(f'Deletion of `{profile.name}` cancelled.')
                continue

        # Delete nodes
        qb = QueryBuilder()
        qb.append(Node)
        nodes = qb.all()
        node_ids = [node[0].pk for node in nodes]
        delete_nodes(node_ids, dry_run=dry_run)

        # Delete groups
        groups = Group.collection.all()
        for group in groups:
            Group.collection.delete(group.pk)

        storage.maintain(full=True, dry_run=False, compress=dry_run)

        # Users and Computers?
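The trailing question could be answered the same way the groups are handled; a hedged sketch, assuming `Computer.collection.delete` mirrors the `Group.collection.delete` call above (deleting users is likely not exposed this way, since a profile keeps its default user):

from aiida.orm import Computer

# Hypothetical follow-up, not part of this commit: flush computers like groups.
for computer in Computer.collection.all():
    Computer.collection.delete(computer.pk)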
2 changes: 1 addition & 1 deletion src/aiida/repository/backend/abstract.py
@@ -80,7 +80,7 @@ def put_object_from_filelike(self, handle: BinaryIO) -> str:
            and not self.is_readable_byte_stream(handle)
        ):
            raise TypeError(f'handle does not seem to be a byte stream: {type(handle)}.')
-        return self._put_object_from_filelike(handle)
+        return self._put_object_from_filelike_packed(handle)

    @abc.abstractmethod
    def _put_object_from_filelike(self, handle: BinaryIO) -> str:
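For reference, a minimal sketch of the companion hook this rerouting implies on the abstract class (the note in imports.py below says _put_object_from_filelike_packed was defined on AbstractRepositoryBackend); the signature is an assumption, and a default that falls back to the loose path would keep backends without pack support working:

import abc
from typing import BinaryIO

class AbstractRepositoryBackend(abc.ABC):
    @abc.abstractmethod
    def _put_object_from_filelike(self, handle: BinaryIO) -> str:
        """Store the byte contents of a file as a loose object."""

    def _put_object_from_filelike_packed(self, handle: BinaryIO) -> str:
        """Store the byte contents directly in a pack; assumed default falls back to the loose path."""
        return self._put_object_from_filelike(handle)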
11 changes: 11 additions & 0 deletions src/aiida/repository/backend/disk_object_store.py
@@ -87,9 +87,20 @@ def _put_object_from_filelike(self, handle: t.BinaryIO) -> str:
        :return: the generated fully qualified identifier for the object within the repository.
        :raises TypeError: if the handle is not a byte stream.
        """
        # Hack tripwire: fail loudly if the loose (non-packed) write path is ever taken.
        raise SystemExit("using normal from_filelike")
        with self._container as container:
            return container.add_streamed_object(handle)

    def _put_object_from_filelike_packed(self, handle: t.BinaryIO) -> str:
        """Store the byte contents of a file in the repository, writing directly to a pack.

        :param handle: filelike object with the byte content to be stored.
        :return: the generated fully qualified identifier for the object within the repository.
        :raises TypeError: if the handle is not a byte stream.
        """
        with self._container as container:
            return container.add_streamed_object_to_pack(handle)

    def has_objects(self, keys: t.List[str]) -> t.List[bool]:
        with self._container as container:
            return container.has_objects(keys)
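The two methods map onto the two write paths of the underlying disk-objectstore container: add_streamed_object writes one loose file per object, while pack writes append the stream straight into a pack file. A minimal standalone sketch of the difference, using the plural add_streamed_objects_to_pack that disk-objectstore provides (the singular add_streamed_object_to_pack called above is presumably a companion helper introduced alongside this hack):

import io
from disk_objectstore import Container

container = Container('/tmp/demo-container')
if not container.is_initialised:
    container.init_container()

# Loose write path: one file per object under loose/.
loose_key = container.add_streamed_object(io.BytesIO(b'loose content'))

# Packed write path: streams appended directly into a pack file.
packed_keys = container.add_streamed_objects_to_pack([io.BytesIO(b'packed content')])

print(loose_key, packed_keys)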
28 changes: 27 additions & 1 deletion src/aiida/tools/archive/imports.py
@@ -195,13 +195,17 @@ def import_archive(
        )
        new_repo_keys = _get_new_object_keys(archive_format.key_format, backend_from, backend, query_params)

        # IMPORT_LOGGER.report('HELLO')
        if test_run:
            # exit before we write anything to the database or repository
            raise ImportTestRun('test run complete')

        # now the transaction has been successfully populated, but not committed, we add the repository files
        # if the commit fails, this is not so much an issue, since the files can be removed on repo maintenance
-        _add_files_to_repo(backend_from, backend, new_repo_keys)
+        # _add_files_to_repo(backend_from, backend, new_repo_keys)
        IMPORT_LOGGER.report('Using: _add_files_to_repo_packed')

        _add_files_to_repo_packed(backend_from, backend, new_repo_keys)

        IMPORT_LOGGER.report('Committing transaction to database...')

@@ -1183,3 +1187,25 @@ def _add_files_to_repo(backend_from: StorageBackend, backend_to: StorageBackend,
                    f'Archive repository key is different to backend key: {key!r} != {backend_key!r}'
                )
            progress.update()

# This probably has no effect here; instead, I defined _put_object_from_filelike_packed in
# AbstractRepositoryBackend.
def _add_files_to_repo_packed(backend_from: StorageBackend, backend_to: StorageBackend, new_keys: Set[str]) -> None:
    """Add the new files to the repository, streaming them directly into packs."""
    if not new_keys:
        raise Exception("No new keys.")

    repository_to = backend_to.get_repository()
    repository_from = backend_from.get_repository()
    # raise Exception(f"from: {type(repository_from)} | to: {type(repository_to)}")
    # E.g. from: ZipfileBackendRepository to DiskObjectStoreRepositoryBackend
    with get_progress_reporter()(desc='Adding archive files to repository', total=len(new_keys)) as progress:
        for key, handle in repository_from.iter_object_streams(new_keys):  # type: ignore[arg-type]
            backend_key = repository_to.put_object_from_filelike(handle)
            if backend_key != key:
                raise ImportValidationError(
                    f'Archive repository key is different to backend key: {key!r} != {backend_key!r}'
                )
            progress.update()
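Since the point of the hack is to bypass loose objects entirely during import, the effect can be checked on the profile container afterwards; a small sketch, where the container path is an assumption that depends on the profile's storage configuration:

from disk_objectstore import Container

# Hypothetical path; point this at the profile repository's container directory.
container = Container('/path/to/.aiida/repository/profile-name/container')
print(container.count_objects())  # after a packed import, the loose count should remain zero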
