Close sessions properly after usage
The session created during initialisation of the container was never
properly closed. Up to py3.12 this unclosed session was garbage
collected because it was unreferenced. With py3.13, however, the
sessions are no longer garbage collected and thus remain open,
resulting in an open file descriptor on the `pack.idx` for each
initialisation of the container.

This commit fixes the leak by keeping track of the session that
initialises the container in `_container_session`. The attribute
`_session` is renamed to `_operation_session` for a clearer
distinction between the two session types.
agoscinski committed Feb 4, 2025
1 parent 73f14e3 commit 2ecde9c
Showing 5 changed files with 89 additions and 80 deletions.
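
The leak described above can be reproduced with a short script. The following is a minimal sketch, not part of the commit: it assumes a Linux system (it counts entries in /proc/self/fd) and a writable scratch directory at the hypothetical path /tmp/demo-container.

# Sketch only: reproduces the descriptor leak this commit fixes.
import os

from disk_objectstore import Container


def open_fd_count() -> int:
    """Return the number of file descriptors currently open in this process (Linux only)."""
    return len(os.listdir("/proc/self/fd"))


container = Container("/tmp/demo-container")
before = open_fd_count()

for _ in range(50):
    # Each initialisation opens a session on the pack-index SQLite DB. Before this
    # commit that session was only ever released by the garbage collector, which on
    # py3.13 no longer happens, so one descriptor leaked per iteration.
    container.init_container(clear=True)

print(open_fd_count() - before)  # grows with the loop on py3.13 before this fix

# With this commit, close() disposes the tracked sessions and releases the
# underlying SQLite file descriptors; using the container as a context manager
# gives the same guarantee.
container.close()
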
128 changes: 71 additions & 57 deletions disk_objectstore/container.py
@@ -17,6 +17,7 @@
from pathlib import Path
from typing import TYPE_CHECKING, overload

from sqlalchemy.engine import Connection, Engine
from sqlalchemy.orm.session import Session
from sqlalchemy.sql import func
from sqlalchemy.sql.expression import delete, select, text, update
@@ -119,8 +120,13 @@ def __init__(self, folder: str | Path) -> None:
:param folder: the path to a folder that will host this object-store container.
"""
self._folder = Path(folder).resolve()
# Will be populated by the _get_session function
self._session: Session | None = None
# This session is used to send read/write operations to the
# database. It can be reused, but may also be closed any time an
# operation has finished.
self._operation_session: Session | None = None
# This session stays alive after initialisation and is only closed
# when the container is closed.
self._container_session: Session | None = None

# These act as caches and will be populated by the corresponding properties
# IMPORTANT! IF YOU ADD MORE, REMEMBER TO CLEAR THEM IN `init_container()`!
@@ -133,9 +139,25 @@ def get_folder(self) -> Path:

def close(self) -> None:
"""Close open files (in particular, the connection to the SQLite DB)."""
if self._session is not None:
self._session.close()
self._session = None
if self._operation_session is not None:
binding = self._operation_session.bind
self._operation_session.close()
if isinstance(binding, Engine):
binding.dispose()
elif isinstance(binding, Connection):
binding.invalidate()
binding.close()

[Codecov / codecov/patch warning: added lines #L147 - L149 were not covered by tests]
self._operation_session = None

if self._container_session is not None:
binding = self._container_session.bind
self._container_session.close()
if isinstance(binding, Engine):
binding.dispose()
elif isinstance(binding, Connection):
binding.invalidate()
binding.close()

[Codecov / codecov/patch warning: added lines #L157 - L159 were not covered by tests]
self._container_session = None

def __enter__(self) -> Container:
"""Return a context manager that will close the session when exiting the context."""
@@ -180,42 +202,29 @@ def _get_config_file(self) -> Path:
"""Return the path to the container config file."""
return self._folder / "config.json"

@overload
def _get_session(
self, create: bool = False, raise_if_missing: Literal[True] = True
) -> Session:
...
def _get_container_session(self) -> Session:
"""Return the container session to connect to the pack-index SQLite DB.
@overload
def _get_session(
self, create: bool = False, raise_if_missing: Literal[False] = False
) -> Session | None:
...
This session should not be closed until the container has been closed.
"""
if self._container_session is None:
self._container_session = get_session(
self._get_pack_index_path(),
create=True,
)
return self._container_session

def _get_session(
self, create: bool = False, raise_if_missing: bool = False
) -> Session | None:
"""Return a new session to connect to the pack-index SQLite DB.
def _get_operation_session(self) -> Session:
"""Return an operation session to access the SQLite file.
:param create: if True, creates the sqlite file and schema.
:param raise_if_missing: ignored if create==True. If create==False, and the index file
is missing, either raise an exception (FileNotFoundError) if this flag is True, or return None
This session can be reused if not closed.
"""
return get_session(
self._get_pack_index_path(),
create=create,
raise_if_missing=raise_if_missing,
)

def _get_cached_session(self) -> Session:
"""Return the SQLAlchemy session to access the SQLite file,
reusing the same one."""
# We want to catch both if it's missing, and if it's None
# the latter means that in the previous run the pack file was missing
# but maybe by now it has been created!
if self._session is None:
self._session = self._get_session(create=False, raise_if_missing=True)
return self._session
if self._operation_session is None:
self._operation_session = get_session(
self._get_pack_index_path(),
create=False,
)
return self._operation_session

def _get_loose_path_from_hashkey(self, hashkey: str) -> Path:
"""Return the path of a loose object on disk containing the data of a given hash key.
@@ -332,6 +341,7 @@ def init_container(
raise ValueError(f'Unknown hash type "{hash_type}"')

if clear:
self.close()
if self._folder.exists():
shutil.rmtree(self._folder)

@@ -391,7 +401,7 @@ def init_container(
]:
os.makedirs(folder)

self._get_session(create=True)
self._get_container_session()

def _get_repository_config(self) -> dict[str, int | str]:
"""Return the repository config."""
@@ -584,7 +594,7 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too
# Currently ordering in the DB (it's ordered across all packs, but this should not be
# a problem as we then split them by pack). To be checked, performance-wise, if it's better
# to order in python instead
session = self._get_cached_session()
session = self._get_operation_session()

obj_reader: StreamReadBytesType

@@ -729,18 +739,18 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too
# If they are not, the object does not exist.
if loose_not_found:
# IMPORTANT. I need to close the session (and flush the
# self._session cache) to refresh the DB, otherwise since I am
# self._operation_session cache) to refresh the DB, otherwise since I am
# reading in WAL mode, I will be keeping to read from the "old"
# state of the DB.
# Note that this is an expensive operation!
# This means that asking for non-existing objects will be
# slow.
if self._session is not None:
self._session.close()
self._session = None
if self._operation_session is not None:
self._operation_session.close()
self._operation_session = None

packs = defaultdict(list)
session = self._get_cached_session()
session = self._get_operation_session()
if len(loose_not_found) <= self._MAX_CHUNK_ITERATE_LENGTH:
for chunk in chunk_iterator(
loose_not_found, size=self._IN_SQL_MAX_LENGTH
@@ -1069,7 +1079,7 @@ def count_objects(self) -> ObjectCount:
In particular, it returns the number of loose objects,
of packed objects, and the number of pack files."""

number_packed = self._get_cached_session().scalar(
number_packed = self._get_operation_session().scalar(
select(func.count()).select_from(Obj)
)
return ObjectCount(
@@ -1122,7 +1132,7 @@ def get_total_size(self) -> TotalSize:
"""
retval = {}

session = self._get_cached_session()
session = self._get_operation_session()
# COALESCE is used to return 0 if there are no results, rather than None
# SQL's COALESCE returns the first non-null result
retval["total_size_packed"] = session.scalar(
@@ -1227,7 +1237,7 @@ def list_all_objects(self) -> Iterator[str]:
loose_objects = set(self._list_loose())

# Let us initialise a session
session = self._get_cached_session()
session = self._get_operation_session()

# This variable stored the last PK that we saw. We are assuming that PKs are always positive integers.
# NOTE: We don't use limit+offset, but a filter on the last PK being > than the last PK seen.
@@ -1368,7 +1378,7 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches,too-man

loose_objects = set(self._list_loose())
pack_int_id = self._get_pack_id_to_write_to()
session = self._get_cached_session()
session = self._get_operation_session()

# I first skip all loose hashkeys that already exist in the pack.
# Packing should be performed by a single process at a given time as a
@@ -1640,7 +1650,7 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b
# without affecting the original list, and it's from the end so it's fast
working_stream_list = list(stream_list[::-1])
pack_int_id = self._get_pack_id_to_write_to()
session = self._get_cached_session()
session = self._get_operation_session()

if no_holes:
if callback:
@@ -1916,6 +1926,10 @@ def add_objects_to_pack( # pylint: disable=too-many-arguments
:return: a list of object hash keys
"""
if not self.is_initialised:
raise ValueError(
"Invalid use of function, please first initialise the container."
)
[Codecov / codecov/patch warning: added line #L1930 was not covered by tests]
stream_list: list[StreamSeekBytesType] = [
io.BytesIO(content) for content in content_list
]
@@ -1981,7 +1995,7 @@ def _vacuum(self) -> None:
"""
# VACUUM cannot be performed from within a transaction
# see: https://github.com/sqlalchemy/sqlalchemy/discussions/6959
session = self._get_cached_session()
session = self._get_operation_session()
session.execute(text("COMMIT"))
session.execute(text("VACUUM"))
# ensure sqlalchemy knows to open a new transaction for the next execution
@@ -2074,7 +2088,7 @@ def clean_storage( # pylint: disable=too-many-branches,too-many-locals
# Force reload of the session to get the most up-to-date packed objects
self.close()

session = self._get_cached_session()
session = self._get_operation_session()
# I search now for all loose hash keys that exist also in the packs
existing_packed_hashkeys = []
if len(loose_objects) <= self._MAX_CHUNK_ITERATE_LENGTH:
@@ -2169,7 +2183,7 @@ def import_objects( # pylint: disable=too-many-locals,too-many-statements,too-m
# see issue #94.
# NOTE: I need to wrap in the `yield_first_element` iterator since it returns a list of lists
sorted_packed = yield_first_element(
self._get_cached_session().execute(
self._get_operation_session().execute(
text("SELECT hashkey FROM db_object ORDER BY hashkey")
)
)
@@ -2331,7 +2345,7 @@ def import_objects( # pylint: disable=too-many-locals,too-many-statements,too-m

# Since I called the `add_objects_to_pack` without committing (gives a boost for performance),
# I need now to commit to save what I've been doing.
self._get_cached_session().commit()
self._get_operation_session().commit()

return old_new_obj_hashkey_mapping

@@ -2406,7 +2420,7 @@ def callback(self, action, value):
invalid_sizes = []
overlapping = []

session = self._get_cached_session()
session = self._get_operation_session()

if callback:
# If we have a callback, compute the total count of objects in this pack
@@ -2511,7 +2525,7 @@ def validate(self, callback: Callable | None = None) -> ValidationIssues:
if callback:
callback(action="close", value=None)

session = self._get_cached_session()
session = self._get_operation_session()

all_pack_ids = sorted(
{res[0] for res in session.execute(select(Obj.pack_id).distinct())}
@@ -2585,7 +2599,7 @@ def delete_objects(self, hashkeys: list[str]) -> list[str | Any]:
# No loose object: it's OK
pass

session = self._get_cached_session()
session = self._get_operation_session()

# Operate in chunks, due to the SQLite limits
# (see comment above the definition of self._IN_SQL_MAX_LENGTH)
@@ -2676,7 +2690,7 @@ def repack_pack( # pylint: disable=too-many-branches,too-many-statements,too-ma
self._REPACK_PACK_ID, allow_repack_pack=True
).exists(), f"The repack pack '{self._REPACK_PACK_ID}' already exists, probably a previous repacking aborted?"

session = self._get_cached_session()
session = self._get_operation_session()

one_object_in_pack = session.execute(
select(Obj.id).where(Obj.pack_id == pack_id).limit(1)
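
For reference, the teardown added to close() follows a generic SQLAlchemy pattern: closing a session only returns its connection to the pool, so the session's bind has to be released as well, either by disposing an Engine or by invalidating and closing a Connection. Below is a simplified, self-contained sketch of that pattern (plain SQLAlchemy, not disk_objectstore API).

from sqlalchemy import create_engine
from sqlalchemy.engine import Connection, Engine
from sqlalchemy.orm import Session


def close_session_and_release_bind(session: Session) -> None:
    """Close a session and release whatever it is bound to."""
    binding = session.bind
    session.close()
    if isinstance(binding, Engine):
        # Dispose of the engine's connection pool, closing the SQLite file.
        binding.dispose()
    elif isinstance(binding, Connection):
        # Mark the raw DBAPI connection as unusable, then close it.
        binding.invalidate()
        binding.close()


# Usage with a throwaway in-memory database:
engine = create_engine("sqlite://", future=True)
session = Session(bind=engine)
close_session_and_release_bind(session)
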
11 changes: 2 additions & 9 deletions disk_objectstore/database.py
@@ -1,6 +1,5 @@
"""Models for the container index file (SQLite DB)."""
from pathlib import Path
from typing import Optional

from sqlalchemy import Boolean, Column, Integer, String, create_engine, event
from sqlalchemy.orm import declarative_base, sessionmaker
@@ -31,19 +30,13 @@ class Obj(Base): # pylint: disable=too-few-public-methods
) # integer ID of the pack in which this entry is stored


def get_session(
path: Path, create: bool = False, raise_if_missing: bool = False
) -> Optional[Session]:
def get_session(path: Path, create: bool = False) -> Session:
"""Return a new session to connect to the pack-index SQLite DB.
:param create: if True, creates the sqlite file and schema.
:param raise_if_missing: ignored if create==True. If create==False, and the index file
is missing, either raise an exception (FileNotFoundError) if this flag is True, or return None
"""
if not create and not path.exists():
if raise_if_missing:
raise FileNotFoundError("Pack index does not exist")
return None
raise FileNotFoundError("Pack index does not exist")

engine = create_engine(f"sqlite:///{path}", future=True)

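
With the simplified signature, get_session() always returns a Session and raises FileNotFoundError when the index file is missing and create is False. A short usage sketch follows; the index path is hypothetical.

from pathlib import Path

from disk_objectstore.database import get_session

index_path = Path("/tmp/demo-container/packs.idx")  # hypothetical location

try:
    session = get_session(index_path, create=False)
except FileNotFoundError:
    # The index does not exist yet: create the SQLite file and its schema.
    session = get_session(index_path, create=True)

session.close()
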
2 changes: 1 addition & 1 deletion tests/concurrent_tests/periodic_worker.py
@@ -316,7 +316,7 @@ def main(
)
print(f"Exists Loose: {loose_path.exists()}")
session = (
container._get_cached_session() # pylint: disable=protected-access
container._get_operation_session() # pylint: disable=protected-access
)
stmt = select(
Obj.pack_id,
2 changes: 1 addition & 1 deletion tests/test_cli.py
@@ -223,7 +223,7 @@ def test_backup(temp_container, temp_dir, remote, verbosity):
if verbosity:
args += [f"--verbosity={verbosity}"]

result = CliRunner().invoke(cli.backup, args, obj=obj)
result = CliRunner().invoke(cli.backup, args, obj=obj, catch_exceptions=False)

assert result.exit_code == 0
assert path.exists()