From 8b10190d32ba5cd93c876ac216bd179d17c19df8 Mon Sep 17 00:00:00 2001 From: Jonathan Green Date: Thu, 18 Apr 2024 15:27:53 -0300 Subject: [PATCH] Fix thread safety issue in patron_activity (PP-1155) (#1793) * Make patron_activity more thread safe. * Move PatronActivityThread out * Fix tests --- api/circulation.py | 142 +++++++++++++++++++------------ tests/api/test_bibliotheca.py | 16 +++- tests/api/test_circulationapi.py | 21 +++-- tests/api/test_overdrive.py | 45 +++++----- 4 files changed, 131 insertions(+), 93 deletions(-) diff --git a/api/circulation.py b/api/circulation.py index 74dedb1a32..6d1b3bb01a 100644 --- a/api/circulation.py +++ b/api/circulation.py @@ -2,12 +2,9 @@ import datetime import logging -import sys -import time from abc import ABC, abstractmethod from collections.abc import Iterable, Mapping from threading import Thread -from types import TracebackType from typing import Any, Literal, TypeVar import flask @@ -15,6 +12,7 @@ from flask_babel import lazy_gettext as _ from pydantic import PositiveInt from sqlalchemy.orm import Query +from sqlalchemy.orm.scoping import ScopedSession from api.circulation_exceptions import ( AlreadyCheckedOut, @@ -61,8 +59,9 @@ get_one, ) from core.model.integration import IntegrationConfiguration +from core.service.logging.configuration import LogLevel from core.util.datetime_helpers import utc_now -from core.util.log import LoggerMixin +from core.util.log import LoggerMixin, log_elapsed_time from core.util.problem_detail import ProblemDetail @@ -763,6 +762,80 @@ def patron_activity( ... +PatronActivityCirculationApiType = PatronActivityCirculationAPI[ + BaseCirculationApiSettings, BaseSettings +] + + +T = TypeVar("T") + + +class PatronActivityThread(Thread, LoggerMixin): + def __init__( + self, + db: Session, + api_cls: type[PatronActivityCirculationApiType], + collection_id: int | None, + patron_id: int | None, + pin: str, + ) -> None: + self.db = db + self.api_cls = api_cls + self.collection_id = collection_id + self.patron_id = patron_id + self.pin = pin + self.activity: list[LoanInfo | HoldInfo] | None = None + self.error = False + super().__init__() + + @property + def details(self) -> str: + return f"Api '{self.api_cls.__name__}' / Collection '{self.collection_id}' / Patron '{self.patron_id}'" + + def load_from_db(self, identifier: int | None, model: type[T]) -> T | None: + if identifier is None: + self.log.error(f"No {model.__name__} ID provided. ({self.details})") + return None + loaded_model = get_one(self.db, model, id=identifier) + if loaded_model is None: + self.log.error( + f"{model.__name__} with ID {identifier} not found. ({self.details})" + ) + return loaded_model + + @property + def patron(self) -> Patron | None: + return self.load_from_db(self.patron_id, Patron) + + @property + def collection(self) -> Collection | None: + return self.load_from_db(self.collection_id, Collection) + + def api(self, collection: Collection) -> PatronActivityCirculationApiType: + return self.api_cls(self.db, collection) + + @log_elapsed_time(log_level=LogLevel.debug) + def run(self) -> None: + try: + patron = self.patron + if patron is None: + return + + collection = self.collection + if collection is None: + return + api = self.api(collection) + self.activity = list(api.patron_activity(patron, self.pin)) + except: + self.log.exception(f"Error fetching patron activity. ({self.details})") + self.error = True + finally: + # In tests we don't have a scoped session here, however when this + # code is run normally we need to remove the session from the thread. + if isinstance(self.db, ScopedSession): + self.db.remove() + + class CirculationAPI(LoggerMixin): """Implement basic circulation logic and abstract away the details between different circulation APIs behind generic operations like @@ -1448,6 +1521,9 @@ def release_hold( return True + @log_elapsed_time( + message_prefix="patron_activity sync", log_level=LogLevel.debug, skip_start=True + ) def patron_activity( self, patron: Patron, pin: str ) -> tuple[list[LoanInfo], list[HoldInfo], bool]: @@ -1459,52 +1535,14 @@ def patron_activity( :return: A 2-tuple (loans, holds) containing `HoldInfo` and `LoanInfo` objects. """ - log = self.log - - class PatronActivityThread(Thread): - def __init__( - self, - api: PatronActivityCirculationAPI[ - BaseCirculationApiSettings, BaseSettings - ], - patron: Patron, - pin: str, - ) -> None: - self.api = api - self.patron = patron - self.pin = pin - self.activity: Iterable[LoanInfo | HoldInfo] | None = None - self.exception: Exception | None = None - self.trace: tuple[ - type[BaseException], BaseException, TracebackType - ] | tuple[None, None, None] | None = None - super().__init__() - - def run(self) -> None: - before = time.time() - try: - self.activity = self.api.patron_activity(self.patron, self.pin) - except Exception as e: - self.exception = e - self.trace = sys.exc_info() - after = time.time() - log.debug( - "Synced %s in %.2f sec", self.api.__class__.__name__, after - before - ) - - # While testing we are in a Session scope - # we need to only close this if api._db is a flask_scoped_session. - if getattr(self.api, "_db", None) and type(self.api._db) != Session: - # Since we are in a Thread using a flask_scoped_session - # we can assume a new Session was opened due to the thread activity. - # We must close this session to avoid connection pool leaks - self.api._db.close() - threads = [] - before = time.time() for api in list(self.api_for_collection.values()): if isinstance(api, PatronActivityCirculationAPI): - thread = PatronActivityThread(api, patron, pin) + api_cls = type(api) + collection_id = api.collection_id + thread = PatronActivityThread( + self._db, api_cls, collection_id, patron.id, pin + ) threads.append(thread) for thread in threads: thread.start() @@ -1514,16 +1552,10 @@ def run(self) -> None: holds: list[HoldInfo] = [] complete = True for thread in threads: - if thread.exception: + if thread.error: # Something went wrong, so we don't have a complete # picture of the patron's loans. complete = False - self.log.error( - "%s errored out: %s", - thread.api.__class__.__name__, - thread.exception, - exc_info=thread.trace, - ) if thread.activity: for i in thread.activity: if not isinstance(i, (LoanInfo, HoldInfo)): @@ -1538,8 +1570,6 @@ def run(self) -> None: elif isinstance(i, HoldInfo): holds.append(i) - after = time.time() - self.log.debug("Full sync took %.2f sec", after - before) return loans, holds, complete def local_loans(self, patron: Patron) -> Query[Loan]: diff --git a/tests/api/test_bibliotheca.py b/tests/api/test_bibliotheca.py index 2c41d0112e..66b8ca0ecb 100644 --- a/tests/api/test_bibliotheca.py +++ b/tests/api/test_bibliotheca.py @@ -6,7 +6,7 @@ from io import BytesIO, StringIO from typing import TYPE_CHECKING, cast from unittest import mock -from unittest.mock import MagicMock, create_autospec +from unittest.mock import MagicMock, create_autospec, patch import pytest from pymarc import parse_xml_to_array @@ -25,7 +25,13 @@ ItemListParser, PatronCirculationParser, ) -from api.circulation import CirculationAPI, FulfillmentInfo, HoldInfo, LoanInfo +from api.circulation import ( + CirculationAPI, + FulfillmentInfo, + HoldInfo, + LoanInfo, + PatronActivityThread, +) from api.circulation_exceptions import ( AlreadyCheckedOut, AlreadyOnHold, @@ -462,7 +468,11 @@ def test_sync_bookshelf(self, bibliotheca_fixture: BibliothecaAPITestFixture): bibliotheca_fixture.api.queue_response( 200, content=bibliotheca_fixture.files.sample_data("checkouts.xml") ) - circulation.sync_bookshelf(patron, "dummy pin") + + with patch.object( + PatronActivityThread, "api", return_value=bibliotheca_fixture.api + ): + circulation.sync_bookshelf(patron, "dummy pin") # The patron should have two loans and two holds. l1, l2 = patron.loans diff --git a/tests/api/test_circulationapi.py b/tests/api/test_circulationapi.py index 68aa54607d..21077ac486 100644 --- a/tests/api/test_circulationapi.py +++ b/tests/api/test_circulationapi.py @@ -2,7 +2,7 @@ import datetime from datetime import timedelta from typing import cast -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import flask import pytest @@ -17,6 +17,7 @@ FulfillmentInfo, HoldInfo, LoanInfo, + PatronActivityThread, ) from api.circulation_exceptions import ( AlreadyCheckedOut, @@ -1441,21 +1442,23 @@ def test_patron_activity( data = api_bibliotheca_files_fixture.sample_data("checkouts.xml") mock_bibliotheca.queue_response(200, content=data) - loans, holds, complete = circulation.patron_activity( - circulation_api.patron, "1234" - ) + with patch.object(PatronActivityThread, "api", return_value=mock_bibliotheca): + loans, holds, complete = circulation.patron_activity( + circulation_api.patron, "1234" + ) assert 2 == len(loans) assert 2 == len(holds) - assert True == complete + assert complete is True mock_bibliotheca.queue_response(500, content="Error") - loans, holds, complete = circulation.patron_activity( - circulation_api.patron, "1234" - ) + with patch.object(PatronActivityThread, "api", return_value=mock_bibliotheca): + loans, holds, complete = circulation.patron_activity( + circulation_api.patron, "1234" + ) assert 0 == len(loans) assert 0 == len(holds) - assert False == complete + assert complete is False def test_can_fulfill_without_loan(self, circulation_api: CirculationAPIFixture): """Can a title can be fulfilled without an active loan? It depends on diff --git a/tests/api/test_overdrive.py b/tests/api/test_overdrive.py index 0ad906e01d..1ddabb5d98 100644 --- a/tests/api/test_overdrive.py +++ b/tests/api/test_overdrive.py @@ -14,7 +14,13 @@ from requests import Response from sqlalchemy.orm.exc import StaleDataError -from api.circulation import CirculationAPI, FulfillmentInfo, HoldInfo, LoanInfo +from api.circulation import ( + CirculationAPI, + FulfillmentInfo, + HoldInfo, + LoanInfo, + PatronActivityThread, +) from api.circulation_exceptions import ( CannotFulfill, CannotHold, @@ -59,6 +65,7 @@ LicensePoolDeliveryMechanism, Measurement, MediaTypes, + Patron, Representation, RightsStatus, Subject, @@ -125,6 +132,10 @@ def sample_json(self, filename): data = self.data.sample_data(filename) return data, json.loads(data) + def sync_bookshelf(self, patron: Patron): + with patch.object(PatronActivityThread, "api", return_value=self.api): + return self.circulation.sync_bookshelf(patron, "dummy pin") + @pytest.fixture(scope="function") def overdrive_api_fixture( @@ -2603,9 +2614,7 @@ def test_sync_bookshelf_creates_local_loans( overdrive_api_fixture.api.queue_response(200, content=holds_data) patron = db.patron() - loans, holds = overdrive_api_fixture.circulation.sync_bookshelf( - patron, "dummy pin" - ) + loans, holds = overdrive_api_fixture.sync_bookshelf(patron) # All four loans in the sample data were created. assert isinstance(loans, list) @@ -2645,9 +2654,7 @@ def test_sync_bookshelf_creates_local_loans( patron.last_loan_activity_sync = None overdrive_api_fixture.api.queue_response(200, content=loans_data) overdrive_api_fixture.api.queue_response(200, content=holds_data) - loans, holds = overdrive_api_fixture.circulation.sync_bookshelf( - patron, "dummy pin" - ) + loans, holds = overdrive_api_fixture.sync_bookshelf(patron) assert isinstance(loans, list) assert 4 == len(loans) assert loans.sort() == patron.loans.sort() @@ -2678,9 +2685,7 @@ def test_sync_bookshelf_removes_loans_not_present_on_remote( # Sync with Overdrive, and the loan not present in the sample # data is removed. - loans, holds = overdrive_api_fixture.circulation.sync_bookshelf( - patron, "dummy pin" - ) + loans, holds = overdrive_api_fixture.sync_bookshelf(patron) assert isinstance(loans, list) assert 4 == len(loans) @@ -2708,9 +2713,7 @@ def test_sync_bookshelf_ignores_loans_from_other_sources( overdrive_api_fixture.api.queue_response(200, content=loans_data) overdrive_api_fixture.api.queue_response(200, content=holds_data) - loans, holds = overdrive_api_fixture.circulation.sync_bookshelf( - patron, "dummy pin" - ) + overdrive_api_fixture.sync_bookshelf(patron) assert 5 == len(patron.loans) assert gutenberg_loan in patron.loans @@ -2726,9 +2729,7 @@ def test_sync_bookshelf_creates_local_holds( overdrive_api_fixture.api.queue_response(200, content=holds_data) patron = db.patron() - loans, holds = overdrive_api_fixture.circulation.sync_bookshelf( - patron, "dummy pin" - ) + loans, holds = overdrive_api_fixture.sync_bookshelf(patron) # All four loans in the sample data were created. assert isinstance(holds, list) assert 4 == len(holds) @@ -2738,9 +2739,7 @@ def test_sync_bookshelf_creates_local_holds( patron.last_loan_activity_sync = None overdrive_api_fixture.api.queue_response(200, content=loans_data) overdrive_api_fixture.api.queue_response(200, content=holds_data) - loans, holds = overdrive_api_fixture.circulation.sync_bookshelf( - patron, "dummy pin" - ) + loans, holds = overdrive_api_fixture.sync_bookshelf(patron) assert isinstance(holds, list) assert 4 == len(holds) assert sorted(holds) == sorted(patron.holds) @@ -2766,9 +2765,7 @@ def test_sync_bookshelf_removes_holds_not_present_on_remote( overdrive_api_fixture.api.queue_response(200, content=holds_data) # The hold not present in the sample data has been removed - loans, holds = overdrive_api_fixture.circulation.sync_bookshelf( - patron, "dummy pin" - ) + loans, holds = overdrive_api_fixture.sync_bookshelf(patron) assert isinstance(holds, list) assert 4 == len(holds) assert holds == patron.holds @@ -2799,9 +2796,7 @@ def test_sync_bookshelf_ignores_holds_from_other_collections( # overdrive_api_fixture.api doesn't know about the hold, but it was not # destroyed, because it came from a different collection. - loans, holds = overdrive_api_fixture.circulation.sync_bookshelf( - patron, "dummy pin" - ) + overdrive_api_fixture.sync_bookshelf(patron) assert 5 == len(patron.holds) assert overdrive_hold in patron.holds