diff --git a/alembic/versions/20230831_1c566151741f_remove_self_hosted_from_licensepools.py b/alembic/versions/20230831_1c566151741f_remove_self_hosted_from_licensepools.py new file mode 100644 index 0000000000..2b2d0406a8 --- /dev/null +++ b/alembic/versions/20230831_1c566151741f_remove_self_hosted_from_licensepools.py @@ -0,0 +1,31 @@ +"""Remove self_hosted from licensepools + +Revision ID: 1c566151741f +Revises: 2b672c6fb2b9 +Create Date: 2023-08-31 16:13:54.935093+00:00 + +""" +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision = "1c566151741f" +down_revision = "2b672c6fb2b9" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.drop_index("ix_licensepools_self_hosted", table_name="licensepools") + op.drop_column("licensepools", "self_hosted") + + +def downgrade() -> None: + op.add_column( + "licensepools", + sa.Column("self_hosted", sa.BOOLEAN(), autoincrement=False, nullable=False), + ) + op.create_index( + "ix_licensepools_self_hosted", "licensepools", ["self_hosted"], unique=False + ) diff --git a/api/admin/controller/admin_search.py b/api/admin/controller/admin_search.py index cc7c7c79b9..e2aa1fddbc 100644 --- a/api/admin/controller/admin_search.py +++ b/api/admin/controller/admin_search.py @@ -47,7 +47,6 @@ def _unzip(cls, values: List[Tuple[str, int]]) -> dict: def _search_field_values_cached(self, collection_ids: List[int]) -> dict: licenses_filter = or_( LicensePool.open_access == True, - LicensePool.self_hosted == True, LicensePool.licenses_owned != 0, ) diff --git a/api/admin/dashboard_stats.py b/api/admin/dashboard_stats.py index f5a9973c74..07df5a1c08 100644 --- a/api/admin/dashboard_stats.py +++ b/api/admin/dashboard_stats.py @@ -34,7 +34,6 @@ class Statistics: LicensePool.open_access == False, ) OPEN_ACCESS_FILTER = LicensePool.open_access == True - SELF_HOSTED_FILTER = LicensePool.self_hosted == True AT_LEAST_ONE_LENDABLE_FILTER = or_( UNLIMITED_LICENSE_FILTER, OPEN_ACCESS_FILTER, @@ -67,7 +66,10 @@ def _gather_collection_stats(self, collection: Collection) -> CollectionInventor metered_license_title_count = _count(self.METERED_LICENSE_FILTER) unlimited_license_title_count = _count(self.UNLIMITED_LICENSE_FILTER) open_access_title_count = _count(self.OPEN_ACCESS_FILTER) - self_hosted_title_count = _count(self.SELF_HOSTED_FILTER) + # TODO: We no longer support self-hosted books, so this should always be 0. + # this value is still included in the response for backwards compatibility, + # but should be removed in a future release. + self_hosted_title_count = 0 at_least_one_loanable_count = _count(self.AT_LEAST_ONE_LENDABLE_FILTER) licenses_owned_count, licenses_available_count = map( diff --git a/api/base_controller.py b/api/base_controller.py index 6e48345e70..55a9830440 100644 --- a/api/base_controller.py +++ b/api/base_controller.py @@ -3,7 +3,7 @@ from flask_babel import lazy_gettext as _ from werkzeug.datastructures import Authorization -from core.model import Library, Loan, Patron, get_one +from core.model import Library, Patron from core.util.problem_detail import ProblemDetail from .circulation_exceptions import * @@ -104,29 +104,6 @@ def authenticate(self): data = self.manager.authentication_for_opds_document return Response(data, 401, headers) - def library_through_external_loan_identifier(self, loan_external_identifier): - """Look up the library the user is trying to access using a loan's external identifier. - We assume that the external identifier is globally unique which is true, for example, - in the case of using Readium LCP. - - :param loan_external_identifier: External identifier of the patron's loan - :type loan_external_identifier: basestring - - :return: Library the patron is trying to access - :rtype: Library - """ - self.manager.reload_settings_if_changed() - - loan = get_one(self._db, Loan, external_identifier=loan_external_identifier) - - if loan is None: - return LOAN_NOT_FOUND - - library = loan.patron.library - flask.request.library = library - - return library - def library_for_request(self, library_short_name): """Look up the library the user is trying to access. diff --git a/api/circulation.py b/api/circulation.py index 1b4bf43f5e..078bea0613 100644 --- a/api/circulation.py +++ b/api/circulation.py @@ -37,13 +37,11 @@ ConfigurationFormItemType, FormField, ) -from core.mirror import MirrorUploader from core.model import ( CirculationEvent, Collection, DataSource, DeliveryMechanism, - ExternalIntegrationLink, Hold, Library, LicensePool, @@ -943,39 +941,6 @@ def can_revoke_hold(self, licensepool: LicensePool, hold: Hold) -> bool: return True return False - def _try_to_sign_fulfillment_link( - self, licensepool: LicensePool, fulfillment: FulfillmentInfo - ) -> FulfillmentInfo: - """Tries to sign the fulfillment URL (only works in the case when the collection has mirrors set up) - - :param licensepool: License pool - :param fulfillment: Fulfillment info - - :return: Fulfillment info with a possibly signed URL - """ - mirror_types = [ExternalIntegrationLink.PROTECTED_ACCESS_BOOKS] - mirror = next( - iter( - [ - MirrorUploader.for_collection(licensepool.collection, mirror_type) - for mirror_type in mirror_types - ] - ) - ) - - if mirror: - signed_url = mirror.sign_url(fulfillment.content_link) - - self.log.info( - "Fulfillment link {} has been signed and translated into {}".format( - fulfillment.content_link, signed_url - ) - ) - - fulfillment.content_link = signed_url - - return fulfillment - def _collect_event( self, patron: Optional[Patron], @@ -1103,11 +1068,7 @@ def borrow( now = utc_now() api = self.api_for_license_pool(licensepool) - if ( - licensepool.open_access - or licensepool.self_hosted - or (not api and licensepool.unlimited_access) - ): + if licensepool.open_access or (not api and licensepool.unlimited_access): # We can 'loan' open-access content ourselves just by # putting a row in the database. __transaction = self._db.begin_nested() @@ -1538,11 +1499,7 @@ def fulfill( api = self.api_for_license_pool(licensepool) - if ( - licensepool.open_access - or licensepool.self_hosted - or (not api and licensepool.unlimited_access) - ): + if licensepool.open_access or (not api and licensepool.unlimited_access): # We ignore the vendor-specific arguments when doing # open-access fulfillment, because we just don't support # partial fulfillment of open-access content. @@ -1555,10 +1512,6 @@ def fulfill( patron, pin, licensepool, delivery_mechanism, fulfillment ) - if licensepool.self_hosted: - fulfillment = self._try_to_sign_fulfillment_link( - licensepool, fulfillment - ) else: if not api: raise CannotFulfill() @@ -1669,7 +1622,7 @@ def revoke_loan( ) if loan is not None: api = self.api_for_license_pool(licensepool) - if not (api is None or licensepool.open_access or licensepool.self_hosted): + if not (api is None or licensepool.open_access): try: api.checkin(patron, pin, licensepool) except NotCheckedOut as e: @@ -1703,7 +1656,7 @@ def release_hold( license_pool=licensepool, on_multiple="interchangeable", ) - if not licensepool.open_access and not licensepool.self_hosted: + if not licensepool.open_access: api = self.api_for_license_pool(licensepool) if api is None: raise TypeError(f"No api for licensepool: {licensepool}") diff --git a/api/controller.py b/api/controller.py index 420408384c..743697e886 100644 --- a/api/controller.py +++ b/api/controller.py @@ -455,10 +455,6 @@ def setup_one_time_controllers(self): self.patron_auth_token = PatronAuthTokenController(self) self.playtime_entries = PlaytimeEntriesController(self) - from api.lcp.controller import LCPController - - self.lcp_controller = LCPController(self) - def setup_configuration_dependent_controllers(self): """Set up all the controllers that depend on the current site configuration. @@ -762,7 +758,6 @@ def apply_borrowing_policy(self, patron, license_pool): and license_pool.licenses_available == 0 and not license_pool.open_access and not license_pool.unlimited_access - and not license_pool.self_hosted ): return FORBIDDEN_BY_POLICY.detailed( _("Library policy prohibits the placement of holds."), status_code=403 diff --git a/api/integration/registry/license_providers.py b/api/integration/registry/license_providers.py index ef5c539f90..eee96900e9 100644 --- a/api/integration/registry/license_providers.py +++ b/api/integration/registry/license_providers.py @@ -32,7 +32,6 @@ def __init__(self) -> None: from api.axis import Axis360API from api.bibliotheca import BibliothecaAPI from api.enki import EnkiAPI - from api.lcp.collection import LCPAPI from api.odilo import OdiloAPI from api.odl import ODLAPI from api.odl2 import ODL2API @@ -47,7 +46,6 @@ def __init__(self) -> None: self.register(OPDSForDistributorsAPI, canonical=OPDSForDistributorsAPI.NAME) self.register(ODLAPI, canonical=ODLAPI.NAME) self.register(ODL2API, canonical=ODL2API.NAME) - self.register(LCPAPI, canonical=LCPAPI.NAME) class OpenAccessLicenseProvidersRegistry(IntegrationRegistry["OPDSImporter"]): diff --git a/api/lcp/__init__.py b/api/lcp/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/api/lcp/collection.py b/api/lcp/collection.py deleted file mode 100644 index f238ad66cc..0000000000 --- a/api/lcp/collection.py +++ /dev/null @@ -1,368 +0,0 @@ -import datetime -import json -from io import BytesIO - -from flask import send_file -from sqlalchemy import or_ - -from api.circulation import BaseCirculationAPI, FulfillmentInfo, LoanInfo -from api.lcp.encrypt import LCPEncryptionSettings -from api.lcp.hash import HasherFactory -from api.lcp.server import LCPServer, LCPServerSettings -from core.integration.base import HasLibraryIntegrationConfiguration -from core.integration.settings import BaseSettings -from core.lcp.credential import LCPCredentialFactory -from core.model import ( - Collection, - DeliveryMechanism, - ExternalIntegration, - LicensePool, - Loan, - get_one, -) -from core.model.configuration import HasExternalIntegration -from core.util.datetime_helpers import utc_now - - -class LCPFulfilmentInfo(FulfillmentInfo): - """Sends LCP licenses as fulfilment info""" - - def __init__( - self, - identifier, - collection, - data_source_name, - identifier_type, - content_link=None, - content_type=None, - content=None, - content_expires=None, - ): - """Initializes a new instance of LCPFulfilmentInfo class - - :param identifier: Identifier - :type identifier: string - - :param collection: Collection - :type collection: Collection - - :param data_source_name: Data source's name - :type data_source_name: string - - :param identifier_type: Identifier's type - :type identifier_type: string - - :param content_link: Content link - :type content_link: Optional[string] - - :param content_link: Identifier's type - :type content_link: string - - :param content: Identifier's type - :type content: Any - - :param content_expires: Time when the content expires - :type content_expires: Optional[datetime.datetime] - """ - super().__init__( - collection, - data_source_name, - identifier_type, - identifier, - content_link, - content_type, - content, - content_expires, - ) - - @property - def as_response(self): - """Returns LCP license as a Flask response - - :return: LCP license as a Flask response - :rtype: Response - """ - return send_file( - BytesIO(json.dumps(self.content)), - mimetype=DeliveryMechanism.LCP_DRM, - as_attachment=True, - attachment_filename=f"{self.identifier}.lcpl", - ) - - -class LCPSettings(LCPEncryptionSettings, LCPServerSettings): - pass - - -class LCPLibrarySettings(BaseSettings): - pass - - -class LCPAPI( - BaseCirculationAPI, HasExternalIntegration, HasLibraryIntegrationConfiguration -): - """Implements LCP workflow""" - - NAME = ExternalIntegration.LCP - SERVICE_NAME = "LCP" - DESCRIPTION = "Manually imported collection protected using Readium LCP DRM" - - @classmethod - def settings_class(cls): - return LCPSettings - - @classmethod - def library_settings_class(cls): - return LCPLibrarySettings - - def label(self): - return self.NAME - - def description(self): - return self.DESCRIPTION - - def __init__(self, db, collection): - """Initializes a new instance of LCPAPI class - - :param db: Database session - :type db: sqlalchemy.orm.session.Session - - :param collection: Book collection - :type collection: Collection - """ - if collection.protocol != ExternalIntegration.LCP: - raise ValueError( - "Collection protocol is {} but must be LCPAPI".format( - collection.protocol - ) - ) - - self._db = db - self._collection_id = collection.id - self._lcp_server_instance = None - - def internal_format(self, delivery_mechanism): - """Look up the internal format for this delivery mechanism or - raise an exception. - - :param delivery_mechanism: A LicensePoolDeliveryMechanism - :type delivery_mechanism: LicensePoolDeliveryMechanism - """ - return delivery_mechanism - - @property - def collection(self): - """Returns an associated Collection object - - :return: Associated Collection object - :rtype: Collection - """ - return Collection.by_id(self._db, id=self._collection_id) - - def external_integration(self, db): - """Returns an external integration associated with this object - - :param db: Database session - :type db: sqlalchemy.orm.session.Session - - :return: External integration associated with this object - :rtype: core.model.configuration.ExternalIntegration - """ - return self.collection.external_integration - - def _create_lcp_server(self): - """Creates a new instance of LCPServer - - :return: New instance of LCPServer - :rtype: LCPServer - """ - - hasher_factory = HasherFactory() - credential_factory = LCPCredentialFactory() - lcp_server = LCPServer( - self.configuration, - hasher_factory, - credential_factory, - ) - - return lcp_server - - @property - def _lcp_server(self): - """Returns an instance of LCPServer - - :return: Instance of LCPServer - :rtype: LCPServer - """ - if self._lcp_server_instance is None: - self._lcp_server_instance = self._create_lcp_server() - - return self._lcp_server_instance - - def checkout(self, patron, pin, licensepool, internal_format): - """Checks out a book on behalf of a patron - - :param patron: A Patron object for the patron who wants to check out the book - :type patron: Patron - - :param pin: The patron's alleged password - :type pin: string - - :param licensepool: Contains lending info as well as link to parent Identifier - :type licensepool: LicensePool - - :param internal_format: Represents the patron's desired book format. - :type internal_format: Any - - :return: a LoanInfo object - :rtype: LoanInfo - """ - days = self.collection.default_loan_period(patron.library) - today = utc_now() - expires = today + datetime.timedelta(days=days) - loan = get_one( - self._db, - Loan, - patron=patron, - license_pool=licensepool, - on_multiple="interchangeable", - ) - - if loan: - license = self._lcp_server.get_license( - self._db, loan.external_identifier, patron - ) - else: - license = self._lcp_server.generate_license( - self._db, licensepool.identifier.identifier, patron, today, expires - ) - - loan = LoanInfo( - licensepool.collection, - licensepool.data_source.name, - identifier_type=licensepool.identifier.type, - identifier=licensepool.identifier.identifier, - start_date=today, - end_date=expires, - fulfillment_info=None, - external_identifier=license["id"], - ) - - return loan - - def fulfill( - self, - patron, - pin, - licensepool, - internal_format=None, - part=None, - fulfill_part_url=None, - ): - """Get the actual resource file to the patron. - - :param patron: A Patron object for the patron who wants to check out the book - :type patron: Patron - - :param pin: The patron's alleged password - :type pin: string - - :param licensepool: Contains lending info as well as link to parent Identifier - :type licensepool: LicensePool - - :param internal_format: A vendor-specific name indicating the format requested by the patron - :type internal_format: - - :param part: A vendor-specific identifier indicating that the - patron wants to fulfill one specific part of the book - (e.g. one chapter of an audiobook), not the whole thing - :type part: Any - - :param fulfill_part_url: A function that takes one argument (a - vendor-specific part identifier) and returns the URL to use - when fulfilling that part - :type fulfill_part_url: Any - - :return: a FulfillmentInfo object - :rtype: FulfillmentInfo - """ - loan = get_one( - self._db, - Loan, - patron=patron, - license_pool=licensepool, - on_multiple="interchangeable", - ) - license = self._lcp_server.get_license( - self._db, loan.external_identifier, patron - ) - fulfillment_info = LCPFulfilmentInfo( - licensepool.identifier.identifier, - licensepool.collection, - licensepool.data_source.name, - licensepool.identifier.type, - content_link=None, - content_type=DeliveryMechanism.LCP_DRM, - content=license, - content_expires=None, - ) - - return fulfillment_info - - def patron_activity(self, patron, pin): - """Returns patron's loans - - :param patron: A Patron object for the patron who wants to check out the book - :type patron: Patron - - :param pin: The patron's alleged password - :type pin: string - - :return: List of patron's loans - :rtype: List[LoanInfo] - """ - now = utc_now() - loans = ( - self._db.query(Loan) - .join(LicensePool) - .join(Collection) - .filter( - Collection.id == self._collection_id, - Loan.patron == patron, - or_(Loan.start is None, Loan.start <= now), - or_(Loan.end is None, Loan.end > now), - ) - ) - - loan_info_objects = [] - - for loan in loans: - licensepool = get_one(self._db, LicensePool, id=loan.license_pool_id) - - loan_info_objects.append( - LoanInfo( - collection=self.collection, - data_source_name=licensepool.data_source.name, - identifier_type=licensepool.identifier.type, - identifier=licensepool.identifier.identifier, - start_date=loan.start, - end_date=loan.end, - fulfillment_info=None, - external_identifier=loan.external_identifier, - ) - ) - - return loan_info_objects - - # TODO: Implement place_hold and release_hold (https://jira.nypl.org/browse/SIMPLY-3013) - def release_hold(self, patron, pin, licensepool): - raise NotImplementedError() - - def place_hold(self, patron, pin, licensepool, notification_email_address): - raise NotImplementedError() - - def checkin(self, patron, pin, licensepool): - raise NotImplementedError() - - def update_availability(self, licensepool): - pass diff --git a/api/lcp/controller.py b/api/lcp/controller.py deleted file mode 100644 index c254ca6bff..0000000000 --- a/api/lcp/controller.py +++ /dev/null @@ -1,138 +0,0 @@ -import logging - -import flask - -from api.admin.problem_details import MISSING_COLLECTION -from api.controller import CirculationManagerController -from api.lcp.factory import LCPServerFactory -from core.lcp.credential import LCPCredentialFactory, LCPUnhashedPassphrase -from core.model import Collection, ExternalIntegration, Session -from core.util.problem_detail import ProblemDetail - - -class LCPController(CirculationManagerController): - """Contains API endpoints related to LCP workflow""" - - def __init__(self, manager): - """Initializes a new instance of LCPController class - - :param manager: CirculationManager object - :type manager: CirculationManager - """ - super().__init__(manager) - - self._logger = logging.getLogger(__name__) - self._credential_factory = LCPCredentialFactory() - self._lcp_server_factory = LCPServerFactory() - - def _get_patron(self): - """Returns a patron associated with the request (if any) - - :return: Patron associated with the request (if any) - :rtype: core.model.patron.Patron - """ - self._logger.info( - "Started fetching an authenticated patron associated with the request" - ) - - patron = self.authenticated_patron_from_request() - - self._logger.info( - "Finished fetching an authenticated patron associated with the request: {}".format( - patron - ) - ) - - return patron - - def _get_lcp_passphrase(self, patron) -> LCPUnhashedPassphrase: - """Returns a patron's LCP passphrase - - :return: Patron's LCP passphrase - :rtype: string - """ - db = Session.object_session(patron) - - self._logger.info("Started fetching a patron's LCP passphrase") - - lcp_passphrase = self._credential_factory.get_patron_passphrase(db, patron) - - self._logger.info( - f"Finished fetching a patron's LCP passphrase: {lcp_passphrase}" - ) - - return lcp_passphrase - - def _get_lcp_collection(self, patron, collection_name): - """Returns an LCP collection for a specified library - NOTE: We assume that there is only ONE LCP collection per library - - :param patron: Patron object - :type patron: core.model.patron.Patron - - :param collection_name: Name of the collection - :type collection_name: string - - :return: LCP collection - :rtype: core.model.collection.Collection - """ - db = Session.object_session(patron) - lcp_collection, _ = Collection.by_name_and_protocol( - db, collection_name, ExternalIntegration.LCP - ) - - if not lcp_collection or lcp_collection not in patron.library.collections: - return MISSING_COLLECTION - - return lcp_collection - - def get_lcp_passphrase(self): - """Returns an LCP passphrase for the authenticated patron - - :return: Flask response containing the LCP passphrase for the authenticated patron - :rtype: Response - """ - self._logger.info("Started fetching a patron's LCP passphrase") - - patron = self._get_patron() - lcp_passphrase = self._get_lcp_passphrase(patron) - - self._logger.info( - "Finished fetching a patron's LCP passphrase: {}".format( - lcp_passphrase.text - ) - ) - - response = flask.jsonify({"passphrase": lcp_passphrase.text}) - - return response - - def get_lcp_license(self, collection_name, license_id): - """Returns an LCP license with the specified ID - - :param collection_name: Name of the collection - :type collection_name: string - - :param license_id: License ID - :type license_id: string - - :return: Flask response containing the LCP license with the specified ID - :rtype: string - """ - self._logger.info(f"Started fetching license # {license_id}") - - patron = self._get_patron() - lcp_collection = self._get_lcp_collection(patron, collection_name) - - if isinstance(lcp_collection, ProblemDetail): - return lcp_collection - - lcp_api = self.circulation.api_for_collection.get(lcp_collection.id) - lcp_server = self._lcp_server_factory.create(lcp_api) - - db = Session.object_session(patron) - lcp_license = lcp_server.get_license(db, license_id, patron) - - self._logger.info(f"Finished fetching license # {license_id}: {lcp_license}") - - return flask.jsonify(lcp_license) diff --git a/api/lcp/encrypt.py b/api/lcp/encrypt.py deleted file mode 100644 index dfb158800e..0000000000 --- a/api/lcp/encrypt.py +++ /dev/null @@ -1,521 +0,0 @@ -import json -import logging -import os -import re -import subprocess -from json import JSONEncoder -from typing import Optional - -from flask_babel import lazy_gettext as _ - -from api.lcp import utils -from core.exceptions import BaseError -from core.integration.settings import ( - BaseSettings, - ConfigurationFormItem, - ConfigurationFormItemType, - FormField, -) -from core.model.integration import IntegrationConfiguration - - -class LCPEncryptionException(BaseError): - """Raised in the case of any errors occurring during LCP encryption process""" - - -class LCPEncryptionConstants: - DEFAULT_LCPENCRYPT_LOCATION = "/go/bin/lcpencrypt" - DEFAULT_LCPENCRYPT_DOCKER_IMAGE = "readium/lcpencrypt" - - -class LCPEncryptionSettings(BaseSettings): - lcpencrypt_location: str = FormField( - default=LCPEncryptionConstants.DEFAULT_LCPENCRYPT_LOCATION, - form=ConfigurationFormItem( - label=_("lcpencrypt's location"), - description=_( - "Full path to the local lcpencrypt binary. " - "The default value is {}".format( - LCPEncryptionConstants.DEFAULT_LCPENCRYPT_LOCATION - ) - ), - type=ConfigurationFormItemType.TEXT, - required=False, - ), - ) - - lcpencrypt_output_directory: Optional[str] = FormField( - form=ConfigurationFormItem( - label=_("lcpencrypt's output directory"), - description=_( - "Full path to the directory where lcpencrypt stores encrypted content. " - "If not set encrypted books will be stored in lcpencrypt's working directory" - ), - type=ConfigurationFormItemType.TEXT, - required=False, - ) - ) - - -class LCPEncryptionResult: - """Represents an output sent by lcpencrypt""" - - CONTENT_ID = "content-id" - CONTENT_ENCRYPTION_KEY = "content-encryption-key" - PROTECTED_CONTENT_LOCATION = "protected-content-location" - PROTECTED_CONTENT_LENGTH = "protected-content-length" - PROTECTED_CONTENT_SHA256 = "protected-content-sha256" - PROTECTED_CONTENT_DISPOSITION = "protected-content-disposition" - PROTECTED_CONTENT_TYPE = "protected-content-type" - - def __init__( - self, - content_id, - content_encryption_key, - protected_content_location, - protected_content_disposition, - protected_content_type, - protected_content_length, - protected_content_sha256, - ): - """Initializes a new instance of LCPEncryptorResult class - - :param: content_id: Content identifier - :type content_id: Optional[string] - - :param: content_encryption_key: Content encryption key - :type content_encryption_key: Optional[string] - - :param: protected_content_location: Complete file path of the encrypted content - :type protected_content_location: Optional[string] - - :param: protected_content_disposition: File name of the encrypted content - :type protected_content_disposition: Optional[string] - - :param: protected_content_type: Media type of the encrypted content - :type protected_content_type: Optional[string] - - :param: protected_content_length: Size of the encrypted content - :type protected_content_length: Optional[string] - - :param: protected_content_sha256: Hash of the encrypted content - :type protected_content_sha256: Optional[string] - """ - self._content_id = content_id - self._content_encryption_key = content_encryption_key - self._protected_content_location = protected_content_location - self._protected_content_disposition = protected_content_disposition - self._protected_content_type = protected_content_type - self._protected_content_length = protected_content_length - self._protected_content_sha256 = protected_content_sha256 - - @property - def content_id(self): - """Returns a content encryption key - - :return: Content encryption key - :rtype: Optional[string] - """ - return self._content_id - - @property - def content_encryption_key(self): - """Returns a content identifier - - :return: Content identifier - :rtype: Optional[string] - """ - return self._content_encryption_key - - @property - def protected_content_location(self): - """Returns a complete file path of the encrypted content - - :return: Complete file path of the encrypted content - :rtype: Optional[string] - """ - return self._protected_content_location - - @property - def protected_content_disposition(self): - """Returns a file name of the encrypted content - - :return: File name of the encrypted content - :rtype: Optional[string] - """ - return self._protected_content_disposition - - @property - def protected_content_type(self): - """Returns a media type of the encrypted content - - :return: Media type of the encrypted content - :rtype: Optional[string] - """ - return self._protected_content_type - - @property - def protected_content_length(self): - """Returns a size of the encrypted content - - :return: Size of the encrypted content - :rtype: Optional[string] - """ - return self._protected_content_length - - @property - def protected_content_sha256(self): - """Returns a hash of the encrypted content - - :return: Hash of the encrypted content - :rtype: Optional[string] - """ - return self._protected_content_sha256 - - @classmethod - def from_dict(cls, result_dict): - """Creates an LCPEncryptorResult object from a Python dictionary - - :param result_dict: Python dictionary containing an lcpencrypt output - :type result_dict: Dict - - :return: LCPEncryptorResult object - :rtype: LCPEncryptionResult - """ - content_id = result_dict.get(cls.CONTENT_ID) - content_encryption_key = result_dict.get(cls.CONTENT_ENCRYPTION_KEY) - protected_content_location = result_dict.get(cls.PROTECTED_CONTENT_LOCATION) - protected_content_length = result_dict.get(cls.PROTECTED_CONTENT_LENGTH) - protected_content_sha256 = result_dict.get(cls.PROTECTED_CONTENT_SHA256) - protected_content_disposition = result_dict.get( - cls.PROTECTED_CONTENT_DISPOSITION - ) - protected_content_type = result_dict.get(cls.PROTECTED_CONTENT_TYPE) - - return cls( - content_id=content_id, - content_encryption_key=content_encryption_key, - protected_content_location=protected_content_location, - protected_content_disposition=protected_content_disposition, - protected_content_type=protected_content_type, - protected_content_length=protected_content_length, - protected_content_sha256=protected_content_sha256, - ) - - def __eq__(self, other): - """Compares two LCPEncryptorResult objects - - :param other: LCPEncryptorResult object - :type other: LCPEncryptionResult - - :return: Boolean value indicating whether two items are equal - :rtype: bool - """ - if not isinstance(other, LCPEncryptionResult): - return False - - return ( - self.content_id == other.content_id - and self.content_encryption_key == other.content_encryption_key - and self.protected_content_location == other.protected_content_location - and self.protected_content_length == other.protected_content_length - and self.protected_content_sha256 == other.protected_content_sha256 - and self.protected_content_disposition - == other.protected_content_disposition - and self.protected_content_type == other.protected_content_type - ) - - def __repr__(self): - """Returns a string representation of a LCPEncryptorResult object - - :return: string representation of a LCPEncryptorResult object - :rtype: string - """ - return ( - "".format( - self.content_id, - self.content_encryption_key, - self.protected_content_location, - self.protected_content_length, - self.protected_content_sha256, - self.protected_content_disposition, - self.protected_content_type, - ) - ) - - -class LCPEncryptorResultJSONEncoder(JSONEncoder): - """Serializes LCPEncryptorResult as a JSON object""" - - def default(self, result): - """Serializers a Subject object to JSON - - :param result: LCPEncryptorResult object - :type result: LCPEncryptionResult - - :return: String containing JSON representation of the LCPEncryptorResult object - :rtype: string - """ - if not isinstance(result, LCPEncryptionResult): - raise ValueError("result must have type LCPEncryptorResult") - - result = { - "content-id": result.content_id, - "content-encryption-key": result.content_encryption_key, - "protected-content-location": result.protected_content_location, - "protected-content-length": result.protected_content_length, - "protected-content-sha256": result.protected_content_sha256, - "protected-content-disposition": result.protected_content_disposition, - "protected-content-type": result.protected_content_type, - } - - return result - - -class LCPEncryptor: - """Wrapper around lcpencrypt tool containing logic to run it locally and in a Docker container""" - - class Parameters: - """Parses input parameters for lcpencrypt""" - - def __init__( - self, - file_path: str, - identifier: str, - configuration: IntegrationConfiguration, - ): - """Initializes a new instance of Parameters class - - :param file_path: File path to the book to be encrypted - - :param identifier: Book's identifier - - :param configuration: IntegrationConfiguration instance - """ - self._lcpencrypt_location = configuration.settings_dict.get( - "lcpencrypt_location" - ) - self._input_file_path = str(file_path) - self._content_id = str(identifier) - - output_directory = configuration.settings_dict.get( - "lcpencrypt_output_directory" - ) - - self._output_file_path = None - - if output_directory: - _, input_extension = os.path.splitext(file_path) - target_extension = utils.get_target_extension(input_extension) - output_file_path = os.path.join( - output_directory, - identifier + target_extension - if target_extension not in identifier - else identifier, - ) - - self._output_file_path = output_file_path - - @property - def lcpencrypt_location(self): - """Returns location of lcpencrypt binary - - :return: Location of lcpencrypt binary - :rtype: string - """ - return self._lcpencrypt_location - - @property - def input_file_path(self): - """Returns path of the input file - - :return: Path of the input file - :rtype: string - """ - return self._input_file_path - - @property - def content_id(self): - """Returns content ID - - :return: Content ID - :rtype: string - """ - return self._content_id - - @property - def output_file_path(self): - """Returns path of the output file - - :return: Path of the output file - :rtype: string - """ - return self._output_file_path - - def to_array(self): - """Returns parameters in an array - - :return: Parameters in an array - :rtype: List - """ - parameters = [ - self._lcpencrypt_location, - "-input", - self._input_file_path, - "-contentid", - self._content_id, - ] - - if self._output_file_path: - parameters.extend(["-output", self._output_file_path]) - - return parameters - - OUTPUT_REGEX = re.compile(r"(\{.+\})?(.+)", re.DOTALL) - - def __init__(self, configuration: IntegrationConfiguration): - """Initializes a new instance of LCPEncryptor class - - :param configuration: The integration configuration of the collection - """ - self._logger = logging.getLogger(__name__) - self.configuration = configuration - - def _lcpencrypt_exists_locally(self): - """Returns a Boolean value indicating whether lcpencrypt exists locally""" - return os.path.isfile( - self.configuration.settings_dict.get("lcpencrypt_location") - ) - - def _parse_output(self, output): - """Parses lcpencrypt's output - - :param output: lcpencrypt's output - :type output: string - - :return: Encryption result - :rtype: LCPEncryptionResult - """ - bracket_index = output.find("{") - - if bracket_index > 0: - output = output[bracket_index:] - - match = self.OUTPUT_REGEX.match(output) - - if not match: - raise LCPEncryptionException("Output has a wrong format") - - match_groups = match.groups() - - if not match_groups: - raise LCPEncryptionException("Output has a wrong format") - - if not match_groups[0]: - raise LCPEncryptionException(match_groups[1].strip()) - - json_output = match_groups[0] - json_result = json.loads(json_output) - result = LCPEncryptionResult.from_dict(json_result) - - if ( - not result.protected_content_length - or not result.protected_content_sha256 - or not result.content_encryption_key - ): - raise LCPEncryptionException("Encryption failed") - - return result - - def _run_lcpencrypt_locally( - self, file_path: str, identifier: str - ) -> LCPEncryptionResult: - """Runs lcpencrypt using a local binary - - :param file_path: File path to the book to be encrypted - :type file_path: string - - :param identifier: Book's identifier - :type identifier: string - - :return: Encryption result - :rtype: LCPEncryptionResult - """ - self._logger.info( - "Started running a local lcpencrypt binary. File path: {}. Identifier: {}".format( - file_path, identifier - ) - ) - - parameters = LCPEncryptor.Parameters(file_path, identifier, self.configuration) - - try: - if parameters.output_file_path: - self._logger.info( - "Creating a directory tree for {}".format( - parameters.output_file_path - ) - ) - - output_directory = os.path.dirname(parameters.output_file_path) - - if not os.path.exists(output_directory): - os.makedirs(output_directory) - - self._logger.info( - "Directory tree {} has been successfully created".format( - output_directory - ) - ) - - self._logger.info( - "Running lcpencrypt using the following parameters: {}".format( - parameters.to_array() - ) - ) - - output = subprocess.check_output(parameters.to_array()) - result = self._parse_output(output) - except Exception as exception: - self._logger.exception( - "An unhandled exception occurred during running a local lcpencrypt binary" - ) - - raise LCPEncryptionException(str(exception), inner_exception=exception) - - self._logger.info( - "Finished running a local lcpencrypt binary. File path: {}. Identifier: {}. Result: {}".format( - file_path, identifier, result - ) - ) - - return result - - def encrypt(self, db, file_path, identifier): - """Encrypts a book - - :param db: Database session - :type db: sqlalchemy.orm.session.Session - - :param file_path: File path to the book to be encrypted - :type file_path: string - - :param identifier: Book's identifier - :type identifier: string - - :return: Encryption result - :rtype: LCPEncryptionResult - """ - if self._lcpencrypt_exists_locally(): - result = self._run_lcpencrypt_locally(file_path, identifier) - - return result - else: - raise NotImplementedError() diff --git a/api/lcp/factory.py b/api/lcp/factory.py deleted file mode 100644 index 3c4f785abf..0000000000 --- a/api/lcp/factory.py +++ /dev/null @@ -1,26 +0,0 @@ -from api.lcp.hash import HasherFactory -from api.lcp.server import LCPServer -from core.lcp.credential import LCPCredentialFactory - - -class LCPServerFactory: - """Creates a new instance of LCPServer""" - - def create(self, integration_association) -> LCPServer: - """Creates a new instance of LCPServer - - :param integration_association: Association with an external integration - :type integration_association: core.model.configuration.HasExternalIntegration - - :return: New instance of LCPServer - :rtype: LCPServer - """ - hasher_factory = HasherFactory() - credential_factory = LCPCredentialFactory() - lcp_server = LCPServer( - integration_association.configuration, - hasher_factory, - credential_factory, - ) - - return lcp_server diff --git a/api/lcp/importer.py b/api/lcp/importer.py deleted file mode 100644 index 2bedfbbbef..0000000000 --- a/api/lcp/importer.py +++ /dev/null @@ -1,32 +0,0 @@ -class LCPImporter: - """Class implementing LCP import workflow""" - - def __init__(self, lcp_encryptor, lcp_server): - """Initializes a new instance of LCPImporter class - - :param lcp_encryptor: LCPEncryptor object - :type lcp_encryptor: encrypt.LCPEncryptor - - :param lcp_server: LCPServer object - :type lcp_server: server.LCPServer - """ - self._lcp_encryptor = lcp_encryptor - self._lcp_server = lcp_server - - def import_book(self, db, file_path, identifier): - """Encrypts a book and sends a notification to the LCP server - - :param db: Database session - :type db: sqlalchemy.orm.session.Session - - :param file_path: File path to the book to be encrypted - :type file_path: string - - :param identifier: Book's identifier - :type identifier: string - - :return: Encryption result - :rtype: LCPEncryptionResult - """ - encrypted_content = self._lcp_encryptor.encrypt(db, file_path, identifier) - self._lcp_server.add_content(db, encrypted_content) diff --git a/api/lcp/mirror.py b/api/lcp/mirror.py deleted file mode 100644 index 729daa5e36..0000000000 --- a/api/lcp/mirror.py +++ /dev/null @@ -1,170 +0,0 @@ -import tempfile - -from flask_babel import lazy_gettext as _ -from sqlalchemy.orm import Session - -from api.lcp.encrypt import LCPEncryptor -from api.lcp.hash import HasherFactory -from api.lcp.importer import LCPImporter -from api.lcp.server import LCPServer, LCPServerSettings -from core.lcp.credential import LCPCredentialFactory -from core.mirror import MirrorUploader -from core.model import Collection, ExternalIntegration -from core.model.collection import HasExternalIntegrationPerCollection -from core.model.configuration import ConfigurationAttributeType, ConfigurationMetadata -from core.s3 import MinIOUploader, MinIOUploaderConfiguration, S3UploaderConfiguration - - -class LCPMirrorConfiguration(S3UploaderConfiguration): - endpoint_url = ConfigurationMetadata( - key=MinIOUploaderConfiguration.endpoint_url.key, - label=_("Endpoint URL"), - description=_("S3 endpoint URL"), - type=ConfigurationAttributeType.TEXT, - required=False, - ) - - -class LCPMirror(MinIOUploader, HasExternalIntegrationPerCollection): - """Implements LCP import workflow: - 1. Encrypts unencrypted books using lcpencrypt - 2. Sends encrypted books to the LCP License Server - 3. LCP License Server generates license metadata and uploads encrypted books to the encrypted_repository - """ - - NAME = ExternalIntegration.LCP - SETTINGS = [ - S3UploaderConfiguration.access_key.to_settings(), - S3UploaderConfiguration.secret_key.to_settings(), - S3UploaderConfiguration.protected_access_content_bucket.to_settings(), - S3UploaderConfiguration.s3_region.to_settings(), - S3UploaderConfiguration.s3_addressing_style.to_settings(), - S3UploaderConfiguration.s3_presigned_url_expiration.to_settings(), - S3UploaderConfiguration.url_template.to_settings(), - LCPMirrorConfiguration.endpoint_url.to_settings(), - ] - - def __init__(self, integration): - """Initializes a new instance of LCPMirror class - - :param integration: External integration containing mirror's properties - :type integration: ExternalIntegration - """ - super().__init__(integration) - - self._lcp_importer_instance = None - - def _create_lcp_importer(self, collection): - """Creates a new instance of LCPImporter - - :param collection: Collection object - :type collection: Collection - - :return: New instance of LCPImporter - :rtype: LCPImporter - """ - configuration = collection.integration_configuration - hasher_factory = HasherFactory() - credential_factory = LCPCredentialFactory() - lcp_encryptor = LCPEncryptor(configuration) - lcp_server = LCPServer( - lambda: LCPServerSettings(**configuration.settings_dict), - hasher_factory, - credential_factory, - ) - lcp_importer = LCPImporter(lcp_encryptor, lcp_server) - - return lcp_importer - - def collection_external_integration(self, collection): - """Returns an external integration associated with the collection - - :param collection: Collection - :type collection: core.model.Collection - - :return: External integration associated with the collection - :rtype: core.model.configuration.ExternalIntegration - """ - db = Session.object_session(collection) - external_integration = ( - db.query(ExternalIntegration) - .join(Collection) - .filter(Collection.id == collection.id) - .one() - ) - - return external_integration - - def cover_image_root(self, bucket, data_source, scaled_size=None): - raise NotImplementedError() - - def marc_file_root(self, bucket, library): - raise NotImplementedError() - - def book_url( - self, - identifier, - extension=".epub", - open_access=False, - data_source=None, - title=None, - ): - """Returns the path to the hosted EPUB file for the given identifier.""" - bucket = self.get_bucket( - S3UploaderConfiguration.OA_CONTENT_BUCKET_KEY - if open_access - else S3UploaderConfiguration.PROTECTED_CONTENT_BUCKET_KEY - ) - root = self.content_root(bucket) - book_url = root + self.key_join([identifier.identifier]) - - return book_url - - def cover_image_url(self, data_source, identifier, filename, scaled_size=None): - raise NotImplementedError() - - def marc_file_url(self, library, lane, end_time, start_time=None): - raise NotImplementedError() - - def mirror_one(self, representation, mirror_to, collection=None): - """Uploads an encrypted book to the encrypted_repository via LCP License Server - - :param representation: Book's representation - :type representation: Representation - - :param mirror_to: Mirror URL - :type mirror_to: string - - :param collection: Collection - :type collection: Optional[Collection] - """ - db = Session.object_session(representation) - bucket = self.get_bucket(S3UploaderConfiguration.PROTECTED_CONTENT_BUCKET_KEY) - content_root = self.content_root(bucket) - identifier = mirror_to.replace(content_root, "") - lcp_importer = self._create_lcp_importer(collection) - - # First, we need to copy unencrypted book's content to a temporary file - with tempfile.NamedTemporaryFile( - suffix=representation.extension(representation.media_type) - ) as temporary_file: - temporary_file.write(representation.content_fh().read()) - temporary_file.flush() - - # Secondly, we execute import: - # 1. Encrypt the temporary file containing the unencrypted book using lcpencrypt - # 2. Send the encrypted book to the LCP License Server - # 3. LCP License Server generates license metadata - # 4. LCP License Server uploads the encrypted book to the encrypted_repository (S3 or EFS) - lcp_importer.import_book(db, temporary_file.name, identifier) - - # Thirdly, we remove unencrypted content from the database - transaction = db.begin_nested() - representation.content = None - transaction.commit() - - def do_upload(self, representation): - raise NotImplementedError() - - -MirrorUploader.IMPLEMENTATION_REGISTRY[LCPMirror.NAME] = LCPMirror diff --git a/api/lcp/server.py b/api/lcp/server.py deleted file mode 100644 index 18fe56fcbb..0000000000 --- a/api/lcp/server.py +++ /dev/null @@ -1,357 +0,0 @@ -from __future__ import annotations - -import json -import os -import urllib.parse -from typing import TYPE_CHECKING, Callable, Optional - -import requests -from flask_babel import lazy_gettext as _ -from pydantic import PositiveInt -from requests.auth import HTTPBasicAuth - -from api.lcp import utils -from api.lcp.encrypt import LCPEncryptionResult, LCPEncryptorResultJSONEncoder -from api.lcp.hash import HasherFactory, HashingAlgorithm -from core.integration.settings import ( - BaseSettings, - ConfigurationFormItem, - ConfigurationFormItemType, - FormField, -) -from core.lcp.credential import ( - LCPCredentialFactory, - LCPHashedPassphrase, - LCPUnhashedPassphrase, -) - -if TYPE_CHECKING: - pass - - -class LCPServerConstants: - DEFAULT_PAGE_SIZE = 100 - DEFAULT_PASSPHRASE_HINT = ( - "If you do not remember your passphrase, please contact your administrator" - ) - DEFAULT_ENCRYPTION_ALGORITHM = HashingAlgorithm.SHA256.value - - -class LCPServerSettings(BaseSettings): - lcpserver_url: str = FormField( - form=ConfigurationFormItem( - label=_("LCP License Server's URL"), - description=_("URL of the LCP License Server"), - type=ConfigurationFormItemType.TEXT, - required=True, - ) - ) - - lcpserver_user: str = FormField( - form=ConfigurationFormItem( - label=_("LCP License Server's user"), - description=_("Name of the user used to connect to the LCP License Server"), - type=ConfigurationFormItemType.TEXT, - required=True, - ) - ) - - lcpserver_password: str = FormField( - form=ConfigurationFormItem( - label=_("LCP License Server's password"), - description=_( - "Password of the user used to connect to the LCP License Server" - ), - type=ConfigurationFormItemType.TEXT, - required=True, - ) - ) - - lcpserver_input_directory: str = FormField( - form=ConfigurationFormItem( - label=_("LCP License Server's input directory"), - description=_( - "Full path to the directory containing encrypted books. " - "This directory should be the same as lcpencrypt's output directory" - ), - type=ConfigurationFormItemType.TEXT, - required=True, - ) - ) - - lcpserver_page_size: Optional[PositiveInt] = FormField( - default=LCPServerConstants.DEFAULT_PAGE_SIZE, - form=ConfigurationFormItem( - label=_("LCP License Server's page size"), - description=_("Number of licences returned by the server"), - type=ConfigurationFormItemType.NUMBER, - required=False, - ), - ) - - provider_name: str = FormField( - form=ConfigurationFormItem( - label=_("LCP service provider's identifier"), - description=_("URI that identifies the provider in an unambiguous way"), - type=ConfigurationFormItemType.TEXT, - required=True, - ) - ) - - passphrase_hint: Optional[str] = FormField( - default=LCPServerConstants.DEFAULT_PASSPHRASE_HINT, - form=ConfigurationFormItem( - label=_("Passphrase hint"), - description=_("Hint proposed to the user for selecting their passphrase"), - type=ConfigurationFormItemType.TEXT, - required=False, - ), - ) - - encryption_algorithm: Optional[str] = FormField( - default=LCPServerConstants.DEFAULT_ENCRYPTION_ALGORITHM, - form=ConfigurationFormItem( - label=_("Passphrase encryption algorithm"), - description=_("Algorithm used for encrypting the passphrase"), - type=ConfigurationFormItemType.SELECT, - required=False, - options=ConfigurationFormItemType.options_from_enum(HashingAlgorithm), - ), - ) - - max_printable_pages: Optional[PositiveInt] = FormField( - form=ConfigurationFormItem( - label=_("Maximum number or printable pages"), - description=_( - "Maximum number of pages that can be printed over the lifetime of the license" - ), - type=ConfigurationFormItemType.NUMBER, - required=False, - ), - ) - - max_copiable_pages: Optional[PositiveInt] = FormField( - form=ConfigurationFormItem( - label=_("Maximum number or copiable characters"), - description=_( - "Maximum number of characters that can be copied to the clipboard" - ), - type=ConfigurationFormItemType.NUMBER, - required=False, - ), - ) - - -class LCPServer: - """Wrapper around LCP License Server's API""" - - def __init__( - self, - get_configuration: Callable[[], BaseSettings], - hasher_factory: HasherFactory, - credential_factory: LCPCredentialFactory, - ): - """Initializes a new instance of LCPServer class - - :param get_configuration: Factory responsible for providing configuration objects from the database - :param hasher_factory: Factory responsible for creating Hasher implementations - :param credential_factory: Factory responsible for creating Hasher implementations - """ - self.get_configuration = get_configuration - self._hasher_factory = hasher_factory - self._credential_factory = credential_factory - self._hasher_instance = None - - def _get_hasher(self): - """Returns a Hasher instance - - :return: Hasher instance - :rtype: hash.Hasher - """ - if self._hasher_instance is None: - self._hasher_instance = self._hasher_factory.create( - self.get_configuration().encryption_algorithm - ) - - return self._hasher_instance - - def _create_partial_license(self, db, patron, license_start=None, license_end=None): - """Creates a partial LCP license used an input by the LCP License Server for generation of LCP licenses - - :param patron: Patron object - :type patron: Patron - - :param license_start: Date and time when the license begins - :type license_start: Optional[datetime.datetime] - - :param license_end: Date and time when the license ends - :type license_end: Optional[datetime.datetime] - - :return: Partial LCP license - :rtype: Dict - """ - hasher = self._get_hasher() - unhashed_passphrase: LCPUnhashedPassphrase = ( - self._credential_factory.get_patron_passphrase(db, patron) - ) - hashed_passphrase: LCPHashedPassphrase = unhashed_passphrase.hash(hasher) - self._credential_factory.set_hashed_passphrase(db, patron, hashed_passphrase) - - config = self.get_configuration() - partial_license = { - "provider": config.provider_name, - "encryption": { - "user_key": { - "text_hint": config.passphrase_hint, - "hex_value": hashed_passphrase.hashed, - } - }, - } - - if patron: - partial_license["user"] = { - "id": self._credential_factory.get_patron_id(db, patron) - } - - rights_fields = [ - license_start, - license_end, - config.max_printable_pages, - config.max_copiable_pages, - ] - - if any( - [ - rights_field is not None and rights_field != "" - for rights_field in rights_fields - ] - ): - partial_license["rights"] = {} - - if license_start: - partial_license["rights"]["start"] = utils.format_datetime(license_start) - if license_end: - partial_license["rights"]["end"] = utils.format_datetime(license_end) - if config.max_printable_pages is not None and config.max_printable_pages != "": - partial_license["rights"]["print"] = int(config.max_printable_pages) - if config.max_copiable_pages is not None and config.max_copiable_pages != "": - partial_license["rights"]["copy"] = int(config.max_copiable_pages) - - return partial_license - - @staticmethod - def _send_request(configuration, method, path, payload, json_encoder=None): - """Sends a request to the LCP License Server - - :param path: URL path part - :type path: string - - :param payload: Dictionary containing request's payload (should be JSON compatible) - :type payload: Union[Dict, object] - - :param json_encoder: JSON encoder - :type json_encoder: JSONEncoder - - :return: Dictionary containing LCP License Server's response - :rtype: Dict - """ - json_payload = json.dumps(payload, cls=json_encoder) - url = urllib.parse.urljoin(configuration.lcpserver_url, path) - response = requests.request( - method, - url, - data=json_payload, - headers={"Content-Type": "application/json"}, - auth=HTTPBasicAuth( - configuration.lcpserver_user, configuration.lcpserver_password - ), - ) - - response.raise_for_status() - - return response - - def add_content(self, db, encrypted_content): - """Notifies LCP License Server about new encrypted content - - :param db: Database session - :type db: sqlalchemy.orm.session.Session - - :param encrypted_content: LCPEncryptionResult object containing information about encrypted content - :type encrypted_content: LCPEncryptionResult - """ - config = self.get_configuration() - content_location = os.path.join( - config.lcpserver_input_directory, - encrypted_content.protected_content_disposition, - ) - payload = LCPEncryptionResult( - content_id=encrypted_content.content_id, - content_encryption_key=encrypted_content.content_encryption_key, - protected_content_location=content_location, - protected_content_disposition=encrypted_content.protected_content_disposition, - protected_content_type=encrypted_content.protected_content_type, - protected_content_length=encrypted_content.protected_content_length, - protected_content_sha256=encrypted_content.protected_content_sha256, - ) - path = f"/contents/{encrypted_content.content_id}" - - self._send_request(config, "put", path, payload, LCPEncryptorResultJSONEncoder) - - def generate_license(self, db, content_id, patron, license_start, license_end): - """Generates a new LCP license - - :param db: Database session - :type db: sqlalchemy.orm.session.Session - - :param content_id: Unique content ID - :type content_id: string - - :param patron: Patron object - :type patron: Patron - - :param license_start: Unique patron ID - :type license_start: string - - :param license_start: Date and time when the license begins - :type license_start: datetime.datetime - - :param license_end: Date and time when the license ends - :type license_end: datetime.datetime - - :return: LCP license - :rtype: Dict - """ - partial_license_payload = self._create_partial_license( - db, patron, license_start, license_end - ) - path = f"contents/{content_id}/license" - response = self._send_request( - self.get_configuration(), "post", path, partial_license_payload - ) - - return response.json() - - def get_license(self, db, license_id, patron): - """Returns an existing license - - :param db: Database session - :type db: sqlalchemy.orm.session.Session - - :param license_id: License's ID - :type license_id: int - - :param patron: Patron object - :type patron: Patron - - :return: Existing license - :rtype: string - """ - partial_license_payload = self._create_partial_license(db, patron) - path = f"licenses/{license_id}" - - response = self._send_request( - self.get_configuration(), "post", path, partial_license_payload - ) - - return response.json() diff --git a/api/lcp/utils.py b/api/lcp/utils.py deleted file mode 100644 index 89a37153c9..0000000000 --- a/api/lcp/utils.py +++ /dev/null @@ -1,49 +0,0 @@ -from core.lcp.exceptions import LCPError - - -def format_datetime(datetime_value): - """Converts a datetime value into a string using the format which Go understands - - :param datetime_value: Datetime value - :type datetime_value: datetime.datetime - - :return: String representation of the datetime value - :rtype: string - """ - datetime_string_value = datetime_value.strftime("%Y-%m-%dT%H:%M:%S") - - # NOTE: Go can parse only strings where the timezone contains a colon (e.g., -07:00) - # Unfortunately, Python doesn't support such format and we have to do it manually - # We assume that all the dates are in UTC - datetime_string_value += "+00:00" - - return datetime_string_value - - -def get_target_extension(input_extension): - if input_extension == ".epub": - target_extension = ".epub" - elif input_extension == ".pdf": - target_extension = ".lcpdf" - elif input_extension == ".lpf": - target_extension = ".audiobook" - elif input_extension == ".audiobook": - target_extension = ".audiobook" - else: - raise LCPError(f'Unknown extension "{input_extension}"') - - return target_extension - - -def bind_method(instance, func, as_name=None): - """Bind the function *func* to *instance*, with either provided name *as_name* - or the existing name of *func*. The provided *func* should accept the - instance as the first argument, i.e. "self". - """ - if as_name is None: - as_name = func.__name__ - - bound_method = func.__get__(instance, instance.__class__) - setattr(instance, as_name, bound_method) - - return bound_method diff --git a/api/routes.py b/api/routes.py index 840bd4812a..ed07b6c1c7 100644 --- a/api/routes.py +++ b/api/routes.py @@ -140,42 +140,6 @@ def decorated(*args, **kwargs): return decorated -def has_library_through_external_loan_identifier( - parameter_name="external_loan_identifier", -): - """Decorator to get a library using the loan's external identifier. - - :param parameter_name: Name of the parameter holding the loan's external identifier - :type parameter_name: string - - :return: Decorated function - :rtype: Callable - """ - - def decorator(func): - @wraps(func) - def wrapper(*args, **kwargs): - if parameter_name in kwargs: - external_loan_identifier = kwargs[parameter_name] - else: - external_loan_identifier = None - - library = ( - app.manager.index_controller.library_through_external_loan_identifier( - external_loan_identifier - ) - ) - - if isinstance(library, ProblemDetail): - return library.response - else: - return func(*args, **kwargs) - - return wrapper - - return decorator - - def allows_library(f): """Decorator similar to @has_library but if there is no library short name, then don't set the request library. @@ -650,23 +614,6 @@ def saml_callback(): ) -@app.route("//lcp/licenses//hint") -@app.route("//lcp/licenses//hint") -@has_library_through_external_loan_identifier(parameter_name="license_id") -@requires_auth -@returns_problem_detail -def lcp_passphrase(collection_name, license_id): - return app.manager.lcp_controller.get_lcp_passphrase() - - -@app.route("//lcp/licenses/") -@has_library_through_external_loan_identifier(parameter_name="license_id") -@requires_auth -@returns_problem_detail -def lcp_license(collection_name, license_id): - return app.manager.lcp_controller.get_lcp_license(collection_name, license_id) - - # Loan notifications for ODL distributors, eg. Feedbooks @library_route("/odl_notify/", methods=["GET", "POST"]) @has_library diff --git a/api/s3_analytics_provider.py b/api/s3_analytics_provider.py index d3182e32d2..f294cc3a5d 100644 --- a/api/s3_analytics_provider.py +++ b/api/s3_analytics_provider.py @@ -151,7 +151,10 @@ def _create_event_object( "patrons_in_hold_queue": license_pool.patrons_in_hold_queue if license_pool else None, - "self_hosted": license_pool.self_hosted if license_pool else None, + # TODO: We no longer support self-hosted books, so this should always be False. + # this value is still included in the response for backwards compatibility, + # but should be removed in a future release. + "self_hosted": False, "title": work.title if work else None, "author": work.author if work else None, "series": work.series if work else None, diff --git a/core/lane.py b/core/lane.py index 89b2df9ffc..ff57db7bd5 100644 --- a/core/lane.py +++ b/core/lane.py @@ -812,7 +812,6 @@ def modify_database_query(self, _db, qu): available_now = or_( LicensePool.open_access == True, - LicensePool.self_hosted == True, LicensePool.unlimited_access, LicensePool.licenses_available > 0, ) @@ -822,13 +821,10 @@ def modify_database_query(self, _db, qu): elif self.availability == self.AVAILABLE_ALL: availability_clause = or_( LicensePool.open_access == True, - LicensePool.self_hosted == True, LicensePool.licenses_owned > 0, LicensePool.unlimited_access, ) elif self.availability == self.AVAILABLE_OPEN_ACCESS: - # TODO: self-hosted content could be allowed here - # depending on what exactly the wording is. availability_clause = LicensePool.open_access == True elif self.availability == self.AVAILABLE_NOT_NOW: # The book must be licensed but currently unavailable. diff --git a/core/model/collection.py b/core/model/collection.py index efdb80da37..df102b004d 100644 --- a/core/model/collection.py +++ b/core/model/collection.py @@ -892,7 +892,6 @@ def restrict_to_ready_deliverable_works( LicensePool.licenses_owned > 0, LicensePool.open_access, LicensePool.unlimited_access, - LicensePool.self_hosted, ) ) @@ -906,7 +905,6 @@ def restrict_to_ready_deliverable_works( or_( LicensePool.licenses_available > 0, LicensePool.open_access, - LicensePool.self_hosted, LicensePool.unlimited_access, ) ) diff --git a/core/model/configuration.py b/core/model/configuration.py index 96df379648..b5713b1e65 100644 --- a/core/model/configuration.py +++ b/core/model/configuration.py @@ -181,7 +181,6 @@ class ExternalIntegration(Base): FEEDBOOKS = DataSourceConstants.FEEDBOOKS ODL = "ODL" ODL2 = "ODL 2.0" - LCP = DataSourceConstants.LCP PROQUEST = DataSourceConstants.PROQUEST # These protocols were used on the Content Server when mirroring diff --git a/core/model/constants.py b/core/model/constants.py index 6590df0533..230b7f3944 100644 --- a/core/model/constants.py +++ b/core/model/constants.py @@ -40,7 +40,6 @@ class DataSourceConstants: FEEDBOOKS = "FeedBooks" BIBBLIO = "Bibblio" ENKI = "Enki" - LCP = "LCP" PROQUEST = "ProQuest" DEPRECATED_NAMES = {"3M": BIBLIOTHECA} diff --git a/core/model/licensing.py b/core/model/licensing.py index 6ef35eaaff..788fe095af 100644 --- a/core/model/licensing.py +++ b/core/model/licensing.py @@ -260,9 +260,6 @@ class LicensePool(Base): licenses_reserved = Column(Integer, default=0) patrons_in_hold_queue = Column(Integer, default=0) - # Set to True for collections imported using MirrorUploaded - self_hosted = Column(Boolean, index=True, nullable=False, default=False) - # This lets us cache the work of figuring out the best open access # link for this LicensePool. _open_access_download_url = Column("open_access_download_url", Unicode) diff --git a/core/model/listeners.py b/core/model/listeners.py index b3e4e5e7ed..2b46cfca7a 100644 --- a/core/model/listeners.py +++ b/core/model/listeners.py @@ -162,7 +162,6 @@ def licensepool_collection_change(target, value, oldvalue, initiator): @event.listens_for(LicensePool.open_access, "set") -@event.listens_for(LicensePool.self_hosted, "set") def licensepool_storage_status_change(target, value, oldvalue, initiator): """A Work may need to have its search document re-indexed if one of its LicensePools changes its open-access status. diff --git a/core/model/work.py b/core/model/work.py index 4621d02932..3dd8f8edef 100644 --- a/core/model/work.py +++ b/core/model/work.py @@ -1216,7 +1216,7 @@ def active_license_pool(self, library: Library | None = None) -> LicensePool | N # We have an unlimited source for this book. # There's no need to keep looking. break - elif p.unlimited_access or p.self_hosted: + elif p.unlimited_access: active_license_pool = p elif ( edition and edition.title and p.licenses_owned and p.licenses_owned > 0 @@ -1693,24 +1693,15 @@ def _set_value(parent, key, target): if doc.license_pools: for item in doc.license_pools: if not ( - item.open_access - or item.unlimited_access - or item.self_hosted - or item.licenses_owned > 0 + item.open_access or item.unlimited_access or item.licenses_owned > 0 ): continue lc: dict = {} _set_value(item, "licensepools", lc) # lc["availability_time"] = getattr(item, "availability_time").timestamp() - lc["available"] = ( - item.unlimited_access - or item.self_hosted - or item.licenses_available > 0 - ) - lc["licensed"] = ( - item.unlimited_access or item.self_hosted or item.licenses_owned > 0 - ) + lc["available"] = item.unlimited_access or item.licenses_available > 0 + lc["licensed"] = item.unlimited_access or item.licenses_owned > 0 if doc.presentation_edition: lc["medium"] = doc.presentation_edition.medium lc["licensepool_id"] = item.id @@ -1895,7 +1886,6 @@ def explicit_bool(label, t): "available", or_( LicensePool.unlimited_access, - LicensePool.self_hosted, LicensePool.licenses_available > 0, ), ), @@ -1903,7 +1893,6 @@ def explicit_bool(label, t): "licensed", or_( LicensePool.unlimited_access, - LicensePool.self_hosted, LicensePool.licenses_owned > 0, ), ), @@ -1922,7 +1911,6 @@ def explicit_bool(label, t): or_( LicensePool.open_access, LicensePool.unlimited_access, - LicensePool.self_hosted, LicensePool.licenses_owned > 0, ), ) diff --git a/core/opds.py b/core/opds.py index 962eaa2c8d..942bf29114 100644 --- a/core/opds.py +++ b/core/opds.py @@ -1794,7 +1794,6 @@ def license_tags(cls, license_pool, loan, hold): elif ( license_pool.open_access or license_pool.unlimited_access - or license_pool.self_hosted or (license_pool.licenses_available > 0 and license_pool.licenses_owned > 0) ): status = "available" @@ -1811,11 +1810,7 @@ def license_tags(cls, license_pool, loan, hold): tags.append(availability_tag) # Open-access pools do not need to display or . - if ( - license_pool.open_access - or license_pool.unlimited_access - or license_pool.self_hosted - ): + if license_pool.open_access or license_pool.unlimited_access: return tags holds_kw = dict() diff --git a/docker/Dockerfile.baseimage b/docker/Dockerfile.baseimage index 6fe06a4d69..7353686365 100644 --- a/docker/Dockerfile.baseimage +++ b/docker/Dockerfile.baseimage @@ -3,12 +3,6 @@ # image is a long process, and we don't want to wait for it to build every time # we push a change to the code base. -############################################################################### -# This is a builder image that is used to build the lcpencrypt binary. -FROM golang:1.17 AS lcp-builder - -RUN go get -v github.com/readium/readium-lcp-server/lcpencrypt - ############################################################################### # This is the main base image build. It is based on phusion/baseimage, which is # a minimal Ubuntu image. Eventually I'd like to switch to using the official @@ -19,9 +13,6 @@ RUN go get -v github.com/readium/readium-lcp-server/lcpencrypt # https://github.com/phusion/baseimage-docker FROM phusion/baseimage:focal-1.2.0 As baseimage -# Copy LCP binary from builder into image. -COPY --from=lcp-builder /go/bin/lcpencrypt /go/bin/lcpencrypt - # Make sure base system is up to date RUN apt-get update && \ apt-get upgrade -y --no-install-recommends -o Dpkg::Options::="--force-confold" && \ diff --git a/tests/api/admin/controller/test_admin_search_controller.py b/tests/api/admin/controller/test_admin_search_controller.py index c13048c7da..6d34f0d19e 100644 --- a/tests/api/admin/controller/test_admin_search_controller.py +++ b/tests/api/admin/controller/test_admin_search_controller.py @@ -143,16 +143,3 @@ def test_different_license_types(self, admin_search_fixture: AdminSearchFixture) ) assert "Horror" in response["genres"] assert "Spanish" in response["languages"] - - # Same goes for self hosted titles - pool.open_access = False - pool.self_hosted = True - with admin_search_fixture.admin_ctrl_fixture.request_context_with_library_and_admin( - "/", - library=admin_search_fixture.admin_ctrl_fixture.ctrl.db.default_library(), - ): - response = ( - admin_search_fixture.manager.admin_search_controller.search_field_values() - ) - assert "Horror" in response["genres"] - assert "Spanish" in response["languages"] diff --git a/tests/api/lcp/__init__.py b/tests/api/lcp/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/tests/api/lcp/lcp_strings.py b/tests/api/lcp/lcp_strings.py deleted file mode 100644 index 25bb83e4f0..0000000000 --- a/tests/api/lcp/lcp_strings.py +++ /dev/null @@ -1,138 +0,0 @@ -EXISTING_BOOK_FILE_PATH = "/books/ebook.epub" -NOT_EXISTING_BOOK_FILE_PATH = "/books/notexistingbook.epub" - -BOOK_IDENTIFIER = "EBOOK" - -CONTENT_ENCRYPTION_KEY = "+RulyN2G8MfAahNEO/Xz0TwBT5xMzvbFFHqqWGPrO3M=" -PROTECTED_CONTENT_LOCATION = ( - "/opt/readium/files/encrypted/1f162bc2-be6f-42a9-8153-96d675418ff1.epub" -) -PROTECTED_CONTENT_DISPOSITION = "1f162bc2-be6f-42a9-8153-96d675418ff1.epub" -PROTECTED_CONTENT_TYPE = "application/epub+zip" -PROTECTED_CONTENT_LENGTH = 798385 -PROTECTED_CONTENT_SHA256 = ( - "e058281cbc11bae29451e5e2c8003efa1164c3f6dde6dcc003c8bb79e2acb88f" -) - - -LCPENCRYPT_NOT_EXISTING_DIRECTORY_RESULT = """Error opening input file, for more information type 'lcpencrypt -help' ; level 30 -open {}: no such file or directory -""".format( - NOT_EXISTING_BOOK_FILE_PATH -) - -LCPENCRYPT_FAILED_ENCRYPTION_RESULT = """{{ - "content-id": "{0}", - "content-encryption-key": null, - "protected-content-location": "{1}", - "protected-content-length": null, - "protected-content-sha256": null, - "protected-content-disposition": "{2}" -}} -Encryption was successful -""".format( - BOOK_IDENTIFIER, PROTECTED_CONTENT_LOCATION, NOT_EXISTING_BOOK_FILE_PATH -) - -LCPENCRYPT_SUCCESSFUL_ENCRYPTION_RESULT = """{{ - "content-id": "{0}", - "content-encryption-key": "{1}", - "protected-content-location": "{2}", - "protected-content-length": {3}, - "protected-content-sha256": "{4}", - "protected-content-disposition": "{5}", - "protected-content-type": "{6}" -}} -Encryption was successful -""".format( - BOOK_IDENTIFIER, - CONTENT_ENCRYPTION_KEY, - PROTECTED_CONTENT_LOCATION, - PROTECTED_CONTENT_LENGTH, - PROTECTED_CONTENT_SHA256, - PROTECTED_CONTENT_DISPOSITION, - PROTECTED_CONTENT_TYPE, -) - -LCPENCRYPT_FAILED_LCPSERVER_NOTIFICATION = """Error notifying the License Server; level 60 -lcp server error 401""" - -LCPENCRYPT_SUCCESSFUL_NOTIFICATION_RESULT = """License Server was notified -{{ - "content-id": "{0}", - "content-encryption-key": "{1}", - "protected-content-location": "{2}", - "protected-content-length": {3}, - "protected-content-sha256": "{4}", - "protected-content-disposition": "{5}", - "protected-content-type": "{6}" -}} -Encryption was successful -""".format( - BOOK_IDENTIFIER, - CONTENT_ENCRYPTION_KEY, - PROTECTED_CONTENT_LOCATION, - PROTECTED_CONTENT_LENGTH, - PROTECTED_CONTENT_SHA256, - PROTECTED_CONTENT_DISPOSITION, - PROTECTED_CONTENT_TYPE, -) - - -LCPSERVER_LICENSE = """ -{ - "provider": "http://circulation.manager", - "id": "e99be177-4902-426a-9b96-0872ae877e2f", - "issued": "2020-08-18T15:04:39Z", - "encryption": { - "profile": "http://readium.org/lcp/basic-profile", - "content_key": { - "algorithm": "http://www.w3.org/2001/04/xmlenc#aes256-cbc", - "encrypted_value": "rYjD9ijFELcraQvdeChvvI21ceHwF3XXN6e4tQpoCbDnnekb9UeGZVlocqANwJ28S0QnJPQk0EnDD6KEIS4dzw==" - }, - "user_key": { - "algorithm": "http://www.w3.org/2001/04/xmlenc#sha256", - "text_hint": "Not very helpful hint", - "key_check": "zf2gU5H8+JIYVbJB2AyotuAq+Fc6xQo85bkhqtWqIU4EVzewwv6HdHgUXvRZB+zp1yZdCTlQvbhA4SQv5oydCQ==" - } - }, - "links": [{ - "rel": "hint", - "href": "http://testfrontend:8991/static/hint.html" - }, { - "rel": "publication", - "href": "http://localhost:9000/books/9780231543973", - "type": "application/pdf+lcp", - "title": "9780231543973.lcpdf", - "length": 1703749, - "hash": "6657273fe78fb29472a0027c08254f57e58b61fe435c30978c00aacd55247bfd" - }, { - "rel": "status", - "href": "http://lsdserver:8990/licenses/e99be177-4902-426a-9b96-0872ae877e2f/status", - "type": "application/vnd.readium.license.status.v1.0+json" - }], - "user": { - "id": "1" - }, - "rights": { - "print": 10, - "copy": 2048, - "start": "2020-08-18T15:04:38Z", - "end": "2020-09-08T15:04:38Z" - }, - "signature": { - "certificate": "MIIFpTCCA42gAwIBAgIBATANBgkqhkiG9w0BAQsFADBnMQswCQYDVQQGEwJGUjEOMAwGA1UEBxMFUGFyaXMxDzANBgNVBAoTBkVEUkxhYjESMBAGA1UECxMJTENQIFRlc3RzMSMwIQYDVQQDExpFRFJMYWIgUmVhZGl1bSBMQ1AgdGVzdCBDQTAeFw0xNjAzMjUwMzM3MDBaFw0yNjAzMjMwNzM3MDBaMIGQMQswCQYDVQQGEwJGUjEOMAwGA1UEBxMFUGFyaXMxDzANBgNVBAoTBkVEUkxhYjESMBAGA1UECxMJTENQIFRlc3RzMSIwIAYDVQQDExlUZXN0IHByb3ZpZGVyIGNlcnRpZmljYXRlMSgwJgYJKoZIhvcNAQkBFhlsYXVyZW50LmxlbWV1ckBlZHJsYWIub3JnMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAq/gFXdvKb+EOzsEkHcoSOcPQmNzivzf+9NOJcxWi1/BwuxqAAPv+4LKoLz89U1xx5TE1swL11BsEkIdVYrjl1RiYRa8YV4bb4xyMTm8lm39P16H1fG7Ep8yyoVuN6LT3WT2xHGp2jYU8I2nW78cyYApAWAuiMc3epeIOxC2mKgf1pGnaX9j5l/Rx8hhxULqoHIHpR8e1eVRC7tgAz4Oy5qeLxGoL4S+GK/11eRlDO37whAWaMRbPnJDqqi8Z0Beovf6jmdoUTJdcPZZ9kFdtPsWjPNNHDldPuJBtCd7lupc0K4pClJSqtJKyxs05Yeb1j7kbs/i3grdlUcxz0zOaPN1YzrzOO7GLEWUnIe+LwVXAeUseHedOexITyDQXXCqMoQw/BC6ApGzR0FynC6ojq98tStYGJAGbKBN/9p20CvYf4/hmPU3fFkImWguPIoeJT//0rz+nSynykeEVtORRIcdyOnX2rL03xxBW7qlTlUXOfQk5oLIWXBW9Z2Q63MPWi8jQhSI0jC12iEqCT54xKRHNWKr04at9pJL85M0bDCbBH/jJ+AIbVx02ewtXcWgWTgK9vgSPN5kRCwIGaV9PMS193KHfNpGqV45EKrfP8U2nvNDeyqLqAN5847ABSW7UmA5Kj/x5uGxIWu9MUKjZlT0FpepswFvMMo1InLHANMcCAwEAAaMyMDAwDAYDVR0TAQH/BAIwADALBgNVHQ8EBAMCBaAwEwYDVR0lBAwwCgYIKwYBBQUHAwEwDQYJKoZIhvcNAQELBQADggIBAEGAqzHsCbrfQwlWas3q66FG/xbiOYQxpngA4CZWKQzJJDyOFgWEihW+H6NlSIH8076srpIZByjEGXZfOku4NH4DGNOj6jQ9mEfEwbrvCoEVHQf5YXladXpKqZgEB2FKeJVjC7yplelBtjBpSo23zhG/o3/Bj7zRySL6gUCewn7z/DkxM6AshDE4HKQxjxp7stpESev+0VTL813WXvwzmucr94H1VPrasFyVzQHj4Ib+Id1OAmgfzst0vSZyX6bjAuiN9yrs7wze5cAYTaswWr7GAnAZ/r1Z3PiDp50qaGRhHqJ+lRAhihpFP+ZjsYWRqnxZnDzJkJ6RZAHi2a3VN8x5WhOUMTf3JZcFVheDmA4SaEjAZAHU8zUxx1Fstjc8GJcjTwWxCsVM2aREBKXAYDhPTVLRKt6PyQxB0GxjDZZSvGI9uXn6S5wvjuE4T2TUwbJeGHqJr4FNpXVQ2XNww+sV2QSiAwrlORm8HNXqavj4rqz1PkUySXJ6b7zbjZoiACq4C7zb70tRYDyCfLTYtaTL3UK2Sa9ePSl0Fe6QfcqlGjalrqOo4GI6oqbAIkIXocHHksbLx0mIMSEWQOax+DqXhsl8tNGVwa5EiUSy83Sc0LyYXoWA35q8dugbkeNnY94rNG/hYKeci1VHhyg4rqxEeVwfBx121JqQSs+hHGKt", - "value": "pbfPRtb4oDT+1Q8nVrZuFrP/uCFqDG+/+jC3pUJfp+iLU+cBVWNCmciADVuq25UkpNOdiTAre8Xjglz1WVV+2AZjiLEaKjQZN0kjYLFjxSC67vUcHc6g5KpAQQTHSbjed5LAjShJWeVkIGQxQFP1a1o+cky8y1tzzRWoZZjCQHTj2ob621cAYgw39z2mj+oKm/vPIYbCrIlahSvjBMCOkWTOoRNZIuqnapRUv25OB9JQeqJzvotTOQvoxZpFg5q3EEmkZAIW55u6XBRaP9CvIAlDuCzevOVT1CojeyVPlP2nWs8b9oBp77S/SYEK0ZYMWMQ0S4LnAB8CNHdGEmF4+jvhrAAOgwpsiMRH0eMQAGZnzPUSKYIr/RqSd7Mp53nFn4a18dGcBgRxipCnVPafU+B7HwWcvkYBu4idlN3tFH1fjPl18yz0qHa8+RlTIyyw73CGQ8SUAY87BLO8tmBKihP+FePqnPX1Fbp6MprI6K4/GkWZoOe3n1oauVLIe7T0CRsA5rar2loUlIJsfESDj5tFnSh4UOeHA0ewHrzDS2qtdFL7sREZ/CnlDJPr0wuZB+uAyECrWe5FuQpEiSP2vxi9ROvTeZuUhphVghFPvBwunlL3AB/6GXkbnlKSJUAb3wiRNWk3r0ilVu9ORSsdq00IzShHGyy8DMVP+5dXSU4=", - "algorithm": "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256" - } -} -""" - -LCPSERVER_URL = "http://localhost:8989" -LCPSERVER_USER = "lcp" -LCPSERVER_PASSWORD = "secretpassword" -LCPSERVER_INPUT_DIRECTORY = "/opt/readium/encrypted" - -CONTENT_ID = "1" -TEXT_HINT = "Not very helpful hint" -PROVIDER_NAME = "http://circulation.manager" diff --git a/tests/api/lcp/test_collection.py b/tests/api/lcp/test_collection.py deleted file mode 100644 index 2aea735dc2..0000000000 --- a/tests/api/lcp/test_collection.py +++ /dev/null @@ -1,355 +0,0 @@ -import datetime -import json -from unittest.mock import MagicMock, create_autospec, patch - -import pytest -from freezegun import freeze_time - -from api.lcp.collection import LCPAPI, LCPFulfilmentInfo -from api.lcp.server import LCPServer, LCPServerConstants -from core.model import DataSource, ExternalIntegration -from core.model.configuration import HasExternalIntegration -from core.util.datetime_helpers import utc_now -from tests.api.lcp import lcp_strings -from tests.fixtures.database import DatabaseTransactionFixture - - -class LCPAPIFixture: - def __init__(self, db: DatabaseTransactionFixture): - self.db = db - self.lcp_collection = self.db.collection(protocol=ExternalIntegration.LCP) - self.integration = self.lcp_collection.external_integration - - integration_association = create_autospec(spec=HasExternalIntegration) - integration_association.external_integration = MagicMock( - return_value=self.integration - ) - - -@pytest.fixture(scope="function") -def lcp_api_fixture(db: DatabaseTransactionFixture) -> LCPAPIFixture: - return LCPAPIFixture(db) - - -class TestLCPAPI: - @freeze_time("2020-01-01 00:00:00") - def test_checkout_without_existing_loan(self, lcp_api_fixture): - # Arrange - lcp_api = LCPAPI(lcp_api_fixture.db.session, lcp_api_fixture.lcp_collection) - patron = lcp_api_fixture.db.patron() - days = lcp_api_fixture.lcp_collection.default_loan_period(patron.library) - start_date = utc_now() - end_date = start_date + datetime.timedelta(days=days) - data_source = DataSource.lookup( - lcp_api_fixture.db.session, DataSource.LCP, autocreate=True - ) - data_source_name = data_source.name - edition = lcp_api_fixture.db.edition( - data_source_name=data_source_name, identifier_id=lcp_strings.CONTENT_ID - ) - license_pool = lcp_api_fixture.db.licensepool( - edition=edition, - data_source_name=data_source_name, - collection=lcp_api_fixture.lcp_collection, - ) - lcp_license = json.loads(lcp_strings.LCPSERVER_LICENSE) - lcp_server_mock = create_autospec(spec=LCPServer) - lcp_server_mock.generate_license = MagicMock(return_value=lcp_license) - - configuration = lcp_api_fixture.lcp_collection.integration_configuration - - with patch("api.lcp.collection.LCPServer") as lcp_server_constructor: - lcp_server_constructor.return_value = lcp_server_mock - - DatabaseTransactionFixture.set_settings( - configuration, "lcpserver_url", lcp_strings.LCPSERVER_URL - ) - DatabaseTransactionFixture.set_settings( - configuration, "lcpserver_user", lcp_strings.LCPSERVER_USER - ) - DatabaseTransactionFixture.set_settings( - configuration, "lcpserver_password", lcp_strings.LCPSERVER_PASSWORD - ) - DatabaseTransactionFixture.set_settings( - configuration, - "lcpserver_input_directory", - lcp_strings.LCPSERVER_INPUT_DIRECTORY, - ) - DatabaseTransactionFixture.set_settings( - configuration, "provider_name", lcp_strings.PROVIDER_NAME - ) - DatabaseTransactionFixture.set_settings( - configuration, "passphrase_hint", lcp_strings.TEXT_HINT - ) - DatabaseTransactionFixture.set_settings( - configuration, - "encryption_algorithm", - LCPServerConstants.DEFAULT_ENCRYPTION_ALGORITHM, - ) - - # Act - loan = lcp_api.checkout(patron, "pin", license_pool, "internal format") - - # Assert - assert loan.collection_id == lcp_api_fixture.lcp_collection.id - assert ( - loan.collection(lcp_api_fixture.db.session) - == lcp_api_fixture.lcp_collection - ) - assert loan.license_pool(lcp_api_fixture.db.session) == license_pool - assert loan.data_source_name == data_source_name - assert loan.identifier_type == license_pool.identifier.type - assert loan.external_identifier == lcp_license["id"] - assert loan.start_date == start_date - assert loan.end_date == end_date - - lcp_server_mock.generate_license.assert_called_once_with( - lcp_api_fixture.db.session, - lcp_strings.CONTENT_ID, - patron, - start_date, - end_date, - ) - - @freeze_time("2020-01-01 00:00:00") - def test_checkout_with_existing_loan(self, lcp_api_fixture): - # Arrange - lcp_api = LCPAPI(lcp_api_fixture.db.session, lcp_api_fixture.lcp_collection) - patron = lcp_api_fixture.db.patron() - days = lcp_api_fixture.lcp_collection.default_loan_period(patron.library) - start_date = utc_now() - end_date = start_date + datetime.timedelta(days=days) - data_source = DataSource.lookup( - lcp_api_fixture.db.session, DataSource.LCP, autocreate=True - ) - data_source_name = data_source.name - edition = lcp_api_fixture.db.edition( - data_source_name=data_source_name, identifier_id=lcp_strings.CONTENT_ID - ) - license_pool = lcp_api_fixture.db.licensepool( - edition=edition, - data_source_name=data_source_name, - collection=lcp_api_fixture.lcp_collection, - ) - lcp_license = json.loads(lcp_strings.LCPSERVER_LICENSE) - lcp_server_mock = create_autospec(spec=LCPServer) - lcp_server_mock.get_license = MagicMock(return_value=lcp_license) - loan_identifier = "e99be177-4902-426a-9b96-0872ae877e2f" - - license_pool.loan_to(patron, external_identifier=loan_identifier) - - configuration = lcp_api_fixture.lcp_collection.integration_configuration - with patch("api.lcp.collection.LCPServer") as lcp_server_constructor: - lcp_server_constructor.return_value = lcp_server_mock - - DatabaseTransactionFixture.set_settings( - configuration, "lcpserver_url", lcp_strings.LCPSERVER_URL - ) - DatabaseTransactionFixture.set_settings( - configuration, "lcpserver_user", lcp_strings.LCPSERVER_USER - ) - DatabaseTransactionFixture.set_settings( - configuration, "lcpserver_password", lcp_strings.LCPSERVER_PASSWORD - ) - DatabaseTransactionFixture.set_settings( - configuration, - "lcpserver_input_directory", - lcp_strings.LCPSERVER_INPUT_DIRECTORY, - ) - DatabaseTransactionFixture.set_settings( - configuration, "provider_name", lcp_strings.PROVIDER_NAME - ) - DatabaseTransactionFixture.set_settings( - configuration, "passphrase_hint", lcp_strings.TEXT_HINT - ) - DatabaseTransactionFixture.set_settings( - configuration, - "encryption_algorithm", - LCPServerConstants.DEFAULT_ENCRYPTION_ALGORITHM, - ) - - # Act - loan = lcp_api.checkout(patron, "pin", license_pool, "internal format") - - # Assert - assert loan.collection_id == lcp_api_fixture.lcp_collection.id - assert ( - loan.collection(lcp_api_fixture.db.session) - == lcp_api_fixture.lcp_collection - ) - assert loan.license_pool(lcp_api_fixture.db.session) == license_pool - assert loan.data_source_name == data_source_name - assert loan.identifier_type == license_pool.identifier.type - assert loan.external_identifier == loan_identifier - assert loan.start_date == start_date - assert loan.end_date == end_date - - lcp_server_mock.get_license.assert_called_once_with( - lcp_api_fixture.db.session, loan_identifier, patron - ) - - @freeze_time("2020-01-01 00:00:00") - def test_fulfil(self, lcp_api_fixture): - # Arrange - lcp_api = LCPAPI(lcp_api_fixture.db.session, lcp_api_fixture.lcp_collection) - patron = lcp_api_fixture.db.patron() - days = lcp_api_fixture.lcp_collection.default_loan_period(patron.library) - today = utc_now() - expires = today + datetime.timedelta(days=days) - data_source = DataSource.lookup( - lcp_api_fixture.db.session, DataSource.LCP, autocreate=True - ) - data_source_name = data_source.name - license_pool = lcp_api_fixture.db.licensepool( - edition=None, - data_source_name=data_source_name, - collection=lcp_api_fixture.lcp_collection, - ) - lcp_license = json.loads(lcp_strings.LCPSERVER_LICENSE) - lcp_server_mock = create_autospec(spec=LCPServer) - lcp_server_mock.get_license = MagicMock(return_value=lcp_license) - - configuration = lcp_api_fixture.lcp_collection.integration_configuration - with patch("api.lcp.collection.LCPServer") as lcp_server_constructor: - lcp_server_constructor.return_value = lcp_server_mock - - DatabaseTransactionFixture.set_settings( - configuration, "lcpserver_url", lcp_strings.LCPSERVER_URL - ) - DatabaseTransactionFixture.set_settings( - configuration, "lcpserver_user", lcp_strings.LCPSERVER_USER - ) - DatabaseTransactionFixture.set_settings( - configuration, "lcpserver_password", lcp_strings.LCPSERVER_PASSWORD - ) - DatabaseTransactionFixture.set_settings( - configuration, - "lcpserver_input_directory", - lcp_strings.LCPSERVER_INPUT_DIRECTORY, - ) - - DatabaseTransactionFixture.set_settings( - configuration, "provider_name", lcp_strings.PROVIDER_NAME - ) - DatabaseTransactionFixture.set_settings( - configuration, "passphrase_hint", lcp_strings.TEXT_HINT - ) - DatabaseTransactionFixture.set_settings( - configuration, - "encryption_algorithm", - LCPServerConstants.DEFAULT_ENCRYPTION_ALGORITHM, - ) - - # Act - license_pool.loan_to( - patron, - start=today, - end=expires, - external_identifier=lcp_license["id"], - ) - fulfilment_info = lcp_api.fulfill( - patron, "pin", license_pool, "internal format" - ) - - # Assert - assert isinstance(fulfilment_info, LCPFulfilmentInfo) == True - assert fulfilment_info.collection_id == lcp_api_fixture.lcp_collection.id - assert ( - fulfilment_info.collection(lcp_api_fixture.db.session) - == lcp_api_fixture.lcp_collection - ) - assert ( - fulfilment_info.license_pool(lcp_api_fixture.db.session) == license_pool - ) - assert fulfilment_info.data_source_name == data_source_name - assert fulfilment_info.identifier_type == license_pool.identifier.type - - lcp_server_mock.get_license.assert_called_once_with( - lcp_api_fixture.db.session, lcp_license["id"], patron - ) - - def test_patron_activity_returns_correct_result(self, lcp_api_fixture): - # Arrange - lcp_api = LCPAPI(lcp_api_fixture.db.session, lcp_api_fixture.lcp_collection) - - # 1. Correct loan - patron = lcp_api_fixture.db.patron() - days = lcp_api_fixture.lcp_collection.default_loan_period(patron.library) - today = utc_now() - expires = today + datetime.timedelta(days=days) - data_source = DataSource.lookup( - lcp_api_fixture.db.session, DataSource.LCP, autocreate=True - ) - data_source_name = data_source.name - external_identifier = "1" - license_pool = lcp_api_fixture.db.licensepool( - edition=None, - data_source_name=data_source_name, - collection=lcp_api_fixture.lcp_collection, - ) - license_pool.loan_to( - patron, start=today, end=expires, external_identifier=external_identifier - ) - - # 2. Loan from a different collection - other_collection = lcp_api_fixture.db.collection( - protocol=ExternalIntegration.LCP - ) - other_external_identifier = "2" - other_license_pool = lcp_api_fixture.db.licensepool( - edition=None, data_source_name=data_source_name, collection=other_collection - ) - other_license_pool.loan_to( - patron, - start=today, - end=expires, - external_identifier=other_external_identifier, - ) - - # 3. Other patron's loan - other_patron = lcp_api_fixture.db.patron() - other_license_pool = lcp_api_fixture.db.licensepool( - edition=None, data_source_name=data_source_name, collection=other_collection - ) - other_license_pool.loan_to(other_patron, start=today, end=expires) - - # 4. Expired loan - other_license_pool = lcp_api_fixture.db.licensepool( - edition=None, - data_source_name=data_source_name, - collection=lcp_api_fixture.lcp_collection, - ) - other_license_pool.loan_to( - patron, start=today, end=today - datetime.timedelta(days=1) - ) - - # 5. Not started loan - other_license_pool = lcp_api_fixture.db.licensepool( - edition=None, - data_source_name=data_source_name, - collection=lcp_api_fixture.lcp_collection, - ) - other_license_pool.loan_to( - patron, - start=today + datetime.timedelta(days=1), - end=today + datetime.timedelta(days=2), - ) - - # Act - loans = lcp_api.patron_activity(patron, "pin") - - # Assert - assert len(loans) == 1 - - loan = loans[0] - assert loan.collection_id == lcp_api_fixture.lcp_collection.id - assert ( - loan.collection(lcp_api_fixture.db.session) - == lcp_api_fixture.lcp_collection - ) - assert loan.license_pool(lcp_api_fixture.db.session) == license_pool - assert loan.data_source_name == data_source_name - assert loan.identifier_type == license_pool.identifier.type - assert loan.external_identifier == external_identifier - assert loan.start_date == today - assert loan.end_date == expires diff --git a/tests/api/lcp/test_controller.py b/tests/api/lcp/test_controller.py deleted file mode 100644 index 6bf2b7d3b6..0000000000 --- a/tests/api/lcp/test_controller.py +++ /dev/null @@ -1,155 +0,0 @@ -import json -from unittest.mock import MagicMock, call, create_autospec, patch - -from flask import request - -from api.lcp.collection import LCPAPI -from api.lcp.controller import LCPController -from api.lcp.factory import LCPServerFactory -from api.lcp.server import LCPServer -from core.external_search import MockExternalSearchIndex -from core.lcp.credential import LCPCredentialFactory, LCPUnhashedPassphrase -from core.model import ExternalIntegration -from core.model.library import Library -from tests.api.lcp import lcp_strings -from tests.api.mockapi.circulation import MockCirculationAPI, MockCirculationManager -from tests.fixtures.api_controller import ControllerFixture - -manager_api_cls = dict( - circulationapi_cls=MockCirculationAPI, - externalsearch_cls=MockExternalSearchIndex, -) - - -class TestLCPController: - def test_get_lcp_passphrase_returns_the_same_passphrase_for_authenticated_patron( - self, controller_fixture: ControllerFixture - ): - # Arrange - expected_passphrase = LCPUnhashedPassphrase( - "1cde00b4-bea9-48fc-819b-bd17c578a22c" - ) - - with patch( - "api.lcp.controller.LCPCredentialFactory" - ) as credential_factory_constructor_mock: - credential_factory = create_autospec(spec=LCPCredentialFactory) - credential_factory.get_patron_passphrase = MagicMock( - return_value=expected_passphrase - ) - credential_factory_constructor_mock.return_value = credential_factory - - patron = controller_fixture.default_patron - manager = MockCirculationManager(controller_fixture.db.session) - controller = LCPController(manager) - controller.authenticated_patron_from_request = MagicMock( # type: ignore - return_value=patron - ) - - url = "http://circulationmanager.org/lcp/hint" - - with controller_fixture.app.test_request_context(url): - request.library: Library = controller_fixture.db.default_library() # type: ignore - - # Act - result1 = controller.get_lcp_passphrase() - result2 = controller.get_lcp_passphrase() - - # Assert - for result in [result1, result2]: - assert result.status_code == 200 - assert ("passphrase" in result.json) == True - assert result.json["passphrase"] == expected_passphrase.text - - credential_factory.get_patron_passphrase.assert_has_calls( - [ - call(controller_fixture.db.session, patron), - call(controller_fixture.db.session, patron), - ] - ) - - def test_get_lcp_license_returns_problem_detail_when_collection_is_missing( - self, controller_fixture - ): - # Arrange - missing_collection_name = "missing-collection" - license_id = "e99be177-4902-426a-9b96-0872ae877e2f" - expected_license = json.loads(lcp_strings.LCPSERVER_LICENSE) - lcp_server = create_autospec(spec=LCPServer) - lcp_server.get_license = MagicMock(return_value=expected_license) - library = controller_fixture.db.default_library() - lcp_collection = controller_fixture.db.collection( - LCPAPI.NAME, ExternalIntegration.LCP - ) - library.collections.append(lcp_collection) - - with patch( - "api.lcp.controller.LCPServerFactory" - ) as lcp_server_factory_constructor_mock: - lcp_server_factory = create_autospec(spec=LCPServerFactory) - lcp_server_factory.create = MagicMock(return_value=lcp_server) - lcp_server_factory_constructor_mock.return_value = lcp_server_factory - - patron = controller_fixture.default_patron - manager = MockCirculationManager(controller_fixture.db.session) - controller = LCPController(manager) - controller.authenticated_patron_from_request = MagicMock( - return_value=patron - ) - - url = "http://circulationmanager.org/{}/licenses{}".format( - missing_collection_name, license_id - ) - - with controller_fixture.app.test_request_context(url): - request.library = controller_fixture.db.default_library() - - # Act - result = controller.get_lcp_license(missing_collection_name, license_id) - - # Assert - assert result.status_code == 404 - - def test_get_lcp_license_returns_the_same_license_for_authenticated_patron( - self, controller_fixture - ): - # Arrange - license_id = "e99be177-4902-426a-9b96-0872ae877e2f" - expected_license = json.loads(lcp_strings.LCPSERVER_LICENSE) - lcp_server = create_autospec(spec=LCPServer) - lcp_server.get_license = MagicMock(return_value=expected_license) - library = controller_fixture.db.default_library() - lcp_collection = controller_fixture.db.collection( - LCPAPI.NAME, ExternalIntegration.LCP - ) - library.collections.append(lcp_collection) - - with patch( - "api.lcp.controller.LCPServerFactory" - ) as lcp_server_factory_constructor_mock: - lcp_server_factory = create_autospec(spec=LCPServerFactory) - lcp_server_factory.create = MagicMock(return_value=lcp_server) - lcp_server_factory_constructor_mock.return_value = lcp_server_factory - - patron = controller_fixture.default_patron - manager = MockCirculationManager(controller_fixture.db.session) - controller = LCPController(manager) - controller.authenticated_patron_from_request = MagicMock( - return_value=patron - ) - - url = "http://circulationmanager.org/{}/licenses{}".format( - LCPAPI.NAME, license_id - ) - - with controller_fixture.app.test_request_context(url): - request.library = controller_fixture.db.default_library() - - # Act - result1 = controller.get_lcp_license(LCPAPI.NAME, license_id) - result2 = controller.get_lcp_license(LCPAPI.NAME, license_id) - - # Assert - for result in [result1, result2]: - assert result.status_code == 200 - assert result.json == expected_license diff --git a/tests/api/lcp/test_encrypt.py b/tests/api/lcp/test_encrypt.py deleted file mode 100644 index 684caf8ce6..0000000000 --- a/tests/api/lcp/test_encrypt.py +++ /dev/null @@ -1,156 +0,0 @@ -from unittest.mock import patch - -import pytest -from pyfakefs.fake_filesystem_unittest import Patcher - -from api.lcp.collection import LCPAPI -from api.lcp.encrypt import ( - LCPEncryptionConstants, - LCPEncryptionException, - LCPEncryptionResult, - LCPEncryptor, -) -from core.integration.goals import Goals -from core.model import Identifier -from core.model.integration import IntegrationConfiguration -from tests.api.lcp import lcp_strings -from tests.fixtures.database import DatabaseTransactionFixture - - -class LCPEncryptFixture: - db: DatabaseTransactionFixture - integration: IntegrationConfiguration - - def __init__(self, db: DatabaseTransactionFixture): - self.db = db - self.integration = self.db.integration_configuration( - protocol=LCPAPI.NAME, goal=Goals.LICENSE_GOAL - ) - - -@pytest.fixture(scope="function") -def lcp_encrypt_fixture(db: DatabaseTransactionFixture) -> LCPEncryptFixture: - return LCPEncryptFixture(db) - - -class TestLCPEncryptor: - @pytest.mark.parametrize( - "_, file_path, lcpencrypt_output, expected_result, expected_exception, create_file", - [ - ( - "non_existing_directory", - lcp_strings.NOT_EXISTING_BOOK_FILE_PATH, - lcp_strings.LCPENCRYPT_NOT_EXISTING_DIRECTORY_RESULT, - None, - LCPEncryptionException( - lcp_strings.LCPENCRYPT_NOT_EXISTING_DIRECTORY_RESULT.strip() - ), - False, - ), - ( - "failed_encryption", - lcp_strings.NOT_EXISTING_BOOK_FILE_PATH, - lcp_strings.LCPENCRYPT_FAILED_ENCRYPTION_RESULT, - None, - LCPEncryptionException("Encryption failed"), - True, - ), - ( - "successful_encryption", - lcp_strings.EXISTING_BOOK_FILE_PATH, - lcp_strings.LCPENCRYPT_SUCCESSFUL_ENCRYPTION_RESULT, - LCPEncryptionResult( - content_id=lcp_strings.BOOK_IDENTIFIER, - content_encryption_key=lcp_strings.CONTENT_ENCRYPTION_KEY, - protected_content_location=lcp_strings.PROTECTED_CONTENT_LOCATION, - protected_content_disposition=lcp_strings.PROTECTED_CONTENT_DISPOSITION, - protected_content_type=lcp_strings.PROTECTED_CONTENT_TYPE, - protected_content_length=lcp_strings.PROTECTED_CONTENT_LENGTH, - protected_content_sha256=lcp_strings.PROTECTED_CONTENT_SHA256, - ), - None, - True, - ), - ( - "failed_lcp_server_notification", - lcp_strings.EXISTING_BOOK_FILE_PATH, - lcp_strings.LCPENCRYPT_FAILED_LCPSERVER_NOTIFICATION, - None, - LCPEncryptionException( - lcp_strings.LCPENCRYPT_FAILED_LCPSERVER_NOTIFICATION.strip() - ), - True, - ), - ( - "successful_lcp_server_notification", - lcp_strings.EXISTING_BOOK_FILE_PATH, - lcp_strings.LCPENCRYPT_SUCCESSFUL_NOTIFICATION_RESULT, - LCPEncryptionResult( - content_id=lcp_strings.BOOK_IDENTIFIER, - content_encryption_key=lcp_strings.CONTENT_ENCRYPTION_KEY, - protected_content_location=lcp_strings.PROTECTED_CONTENT_LOCATION, - protected_content_disposition=lcp_strings.PROTECTED_CONTENT_DISPOSITION, - protected_content_type=lcp_strings.PROTECTED_CONTENT_TYPE, - protected_content_length=lcp_strings.PROTECTED_CONTENT_LENGTH, - protected_content_sha256=lcp_strings.PROTECTED_CONTENT_SHA256, - ), - None, - True, - ), - ], - ) - def test_local_lcpencrypt( - self, - lcp_encrypt_fixture: LCPEncryptFixture, - _, - file_path, - lcpencrypt_output, - expected_result, - expected_exception, - create_file, - ): - # Arrange - # integration_owner = create_autospec(spec=HasIntegrationConfiguration) - # integration_owner.integration_configuration = MagicMock( - # return_value=lcp_encrypt_fixture.integration - # ) - configuration = lcp_encrypt_fixture.integration - encryptor = LCPEncryptor(configuration) - identifier = Identifier(identifier=lcp_strings.BOOK_IDENTIFIER) - - DatabaseTransactionFixture.set_settings( - configuration, - "lcpencrypt_location", - LCPEncryptionConstants.DEFAULT_LCPENCRYPT_LOCATION, - ) - - with Patcher() as patcher: - assert patcher.fs is not None - patcher.fs.create_file(LCPEncryptionConstants.DEFAULT_LCPENCRYPT_LOCATION) - - if create_file: - patcher.fs.create_file(file_path) - - with patch("subprocess.check_output") as subprocess_check_output_mock: - subprocess_check_output_mock.return_value = lcpencrypt_output - - if expected_exception: - with pytest.raises( - expected_exception.__class__ - ) as exception_metadata: - encryptor.encrypt( - lcp_encrypt_fixture.db.session, - file_path, - identifier.identifier, - ) - - # Assert - assert exception_metadata.value == expected_exception - else: - # Assert - result = encryptor.encrypt( - lcp_encrypt_fixture.db.session, - file_path, - identifier.identifier, - ) - assert result == expected_result diff --git a/tests/api/lcp/test_importer.py b/tests/api/lcp/test_importer.py deleted file mode 100644 index 3a054fc2f8..0000000000 --- a/tests/api/lcp/test_importer.py +++ /dev/null @@ -1,36 +0,0 @@ -from unittest.mock import MagicMock, create_autospec - -import sqlalchemy - -from api.lcp.encrypt import LCPEncryptionResult, LCPEncryptor -from api.lcp.importer import LCPImporter -from api.lcp.server import LCPServer - - -class TestLCPImporter: - def test_import_book(self): - # Arrange - file_path = "/opt/readium/raw_books/book.epub" - identifier = "123456789" - encrypted_content = LCPEncryptionResult( - content_id="1", - content_encryption_key="12345", - protected_content_location="/opt/readium/files/encrypted", - protected_content_disposition="encrypted_book", - protected_content_type="application/epub+zip", - protected_content_length=12345, - protected_content_sha256="12345", - ) - lcp_encryptor = create_autospec(spec=LCPEncryptor) - lcp_encryptor.encrypt = MagicMock(return_value=encrypted_content) - lcp_server = create_autospec(spec=LCPServer) - lcp_server.add_content = MagicMock() - importer = LCPImporter(lcp_encryptor, lcp_server) - db = create_autospec(spec=sqlalchemy.orm.session.Session) - - # Act - importer.import_book(db, file_path, identifier) - - # Assert - lcp_encryptor.encrypt.assert_called_once_with(db, file_path, identifier) - lcp_server.add_content.assert_called_once_with(db, encrypted_content) diff --git a/tests/api/lcp/test_mirror.py b/tests/api/lcp/test_mirror.py deleted file mode 100644 index 9f7ba84c51..0000000000 --- a/tests/api/lcp/test_mirror.py +++ /dev/null @@ -1,81 +0,0 @@ -from unittest.mock import ANY, create_autospec, patch - -import pytest - -from api.lcp.importer import LCPImporter -from api.lcp.mirror import LCPMirror -from core.model import ( - Collection, - DataSource, - ExternalIntegration, - Identifier, - Representation, -) -from core.s3 import MinIOUploaderConfiguration, S3UploaderConfiguration -from tests.fixtures.database import DatabaseTransactionFixture - - -class LCPMirrorFixture: - db: DatabaseTransactionFixture - lcp_collection: Collection - lcp_mirror: LCPMirror - - def __init__(self, db: DatabaseTransactionFixture): - self.db = db - - settings = { - S3UploaderConfiguration.PROTECTED_CONTENT_BUCKET_KEY: "encrypted-books", - MinIOUploaderConfiguration.ENDPOINT_URL: "http://minio", - } - integration = self.db.external_integration( - ExternalIntegration.LCP, - goal=ExternalIntegration.STORAGE_GOAL, - settings=settings, - ) - self.lcp_collection = self.db.collection(protocol=ExternalIntegration.LCP) - self.lcp_mirror = LCPMirror(integration) - - -@pytest.fixture(scope="function") -def lcp_mirror_fixture(db: DatabaseTransactionFixture) -> LCPMirrorFixture: - return LCPMirrorFixture(db) - - -class TestLCPMirror: - def test_book_url(self, lcp_mirror_fixture: LCPMirrorFixture): - # Arrange - data_source = DataSource.lookup( - lcp_mirror_fixture.db.session, DataSource.LCP, autocreate=True - ) - identifier = Identifier(identifier="12345", type=Identifier.ISBN) - - # Act - result = lcp_mirror_fixture.lcp_mirror.book_url( - identifier, data_source=data_source - ) - - # Assert - assert result == "http://encrypted-books.minio/12345" - - def test_mirror_one(self, lcp_mirror_fixture: LCPMirrorFixture): - # Arrange - expected_identifier = "12345" - mirror_url = "http://encrypted-books.minio/" + expected_identifier - lcp_importer = create_autospec(spec=LCPImporter) - representation, _ = lcp_mirror_fixture.db.representation( - media_type=Representation.EPUB_MEDIA_TYPE, content="12345" - ) - - # Act - with patch("api.lcp.mirror.LCPImporter") as lcp_importer_constructor: - lcp_importer_constructor.return_value = lcp_importer - lcp_mirror_fixture.lcp_mirror.mirror_one( - representation, - mirror_to=mirror_url, - collection=lcp_mirror_fixture.lcp_collection, - ) - - # Assert - lcp_importer.import_book.assert_called_once_with( - lcp_mirror_fixture.db.session, ANY, expected_identifier - ) diff --git a/tests/api/lcp/test_server.py b/tests/api/lcp/test_server.py deleted file mode 100644 index ddbe544ef7..0000000000 --- a/tests/api/lcp/test_server.py +++ /dev/null @@ -1,319 +0,0 @@ -from __future__ import annotations - -import datetime -import json -import os -import urllib.parse -from typing import Literal -from unittest.mock import MagicMock - -import pytest -import requests_mock - -from api.lcp import utils -from api.lcp.encrypt import LCPEncryptionResult -from api.lcp.hash import HasherFactory -from api.lcp.server import LCPServer, LCPServerConstants, LCPServerSettings -from core.lcp.credential import LCPCredentialFactory, LCPUnhashedPassphrase -from core.model.collection import Collection -from core.model.configuration import ExternalIntegration -from tests.api.lcp import lcp_strings -from tests.fixtures.database import DatabaseTransactionFixture - - -class LCPServerFixture: - db: DatabaseTransactionFixture - lcp_collection: Collection - integration: ExternalIntegration - hasher_factory: HasherFactory - credential_factory: LCPCredentialFactory - lcp_server: LCPServer - - def __init__(self, db: DatabaseTransactionFixture): - self.db = db - self.lcp_collection = self.db.collection(protocol=ExternalIntegration.LCP) - self.configuration = self.lcp_collection.integration_configuration - DatabaseTransactionFixture.set_settings( - self.configuration, "lcpserver_input_directory", "/tmp" - ) - self.hasher_factory = HasherFactory() - self.credential_factory = LCPCredentialFactory() - self.lcp_server = LCPServer( - lambda: LCPServerSettings(**self.configuration.settings_dict), - self.hasher_factory, - self.credential_factory, - ) - - -@pytest.fixture(scope="function") -def lcp_server_fixture(db: DatabaseTransactionFixture) -> LCPServerFixture: - return LCPServerFixture(db) - - -class TestLCPServer: - @pytest.mark.parametrize( - "_, input_directory", - [ - ("non_empty_input_directory", "/tmp/encrypted_books"), - ], - ) - def test_add_content( - self, - lcp_server_fixture: LCPServerFixture, - _: Literal["empty_input_directory", "non_empty_input_directory"], - input_directory: Literal["", "/tmp/encrypted_books"], - ): - # Arrange - lcp_server = LCPServer( - lambda: LCPServerSettings(**lcp_server_fixture.configuration.settings_dict), - lcp_server_fixture.hasher_factory, - lcp_server_fixture.credential_factory, - ) - encrypted_content = LCPEncryptionResult( - content_id=lcp_strings.CONTENT_ID, - content_encryption_key="12345", - protected_content_location="/opt/readium/files/encrypted", - protected_content_disposition="encrypted_book", - protected_content_type="application/epub+zip", - protected_content_length=12345, - protected_content_sha256="12345", - ) - expected_protected_content_disposition = os.path.join( - input_directory, encrypted_content.protected_content_disposition - ) - - configuration = lcp_server_fixture.configuration - DatabaseTransactionFixture.set_settings( - configuration, "lcpserver_url", lcp_strings.LCPSERVER_URL - ) - DatabaseTransactionFixture.set_settings( - configuration, "lcpserver_user", lcp_strings.LCPSERVER_USER - ) - DatabaseTransactionFixture.set_settings( - configuration, "lcpserver_password", lcp_strings.LCPSERVER_PASSWORD - ) - DatabaseTransactionFixture.set_settings( - configuration, "lcpserver_input_directory", input_directory - ) - DatabaseTransactionFixture.set_settings( - configuration, "provider_name", lcp_strings.PROVIDER_NAME - ) - DatabaseTransactionFixture.set_settings( - configuration, "passphrase_hint", lcp_strings.TEXT_HINT - ) - DatabaseTransactionFixture.set_settings( - configuration, - "encryption_algorithm", - LCPServerConstants.DEFAULT_ENCRYPTION_ALGORITHM, - ) - - with requests_mock.Mocker() as request_mock: - url = urllib.parse.urljoin( - lcp_strings.LCPSERVER_URL, f"/contents/{lcp_strings.CONTENT_ID}" - ) - request_mock.put(url) - - # Act - lcp_server.add_content(lcp_server_fixture.db.session, encrypted_content) - - # Assert - assert request_mock.called == True - - json_request = json.loads(request_mock.last_request.text) - assert json_request["content-id"] == encrypted_content.content_id - assert ( - json_request["content-encryption-key"] - == encrypted_content.content_encryption_key - ) - assert ( - json_request["protected-content-location"] - == expected_protected_content_disposition - ) - assert ( - json_request["protected-content-disposition"] - == encrypted_content.protected_content_disposition - ) - assert ( - json_request["protected-content-type"] - == encrypted_content.protected_content_type - ) - assert ( - json_request["protected-content-length"] - == encrypted_content.protected_content_length - ) - assert ( - json_request["protected-content-sha256"] - == encrypted_content.protected_content_sha256 - ) - - @pytest.mark.parametrize( - "_, license_start, license_end, max_printable_pages, max_copiable_pages", - [ - ("none_rights", None, None, None, None), - ( - "license_start", - datetime.datetime(2020, 1, 1, 00, 00, 00), - None, - None, - None, - ), - ( - "license_end", - None, - datetime.datetime(2020, 12, 31, 23, 59, 59), - None, - None, - ), - ("max_printable_pages", None, None, 10, None), - ("max_printable_pages_empty_max_copiable_pages", None, None, 10, ""), - ("empty_max_printable_pages", None, None, "", None), - ("max_copiable_pages", None, None, None, 1024), - ("empty_max_printable_pages_max_copiable_pages", None, None, "", 1024), - ("empty_max_copiable_pages", None, None, None, ""), - ( - "dates", - datetime.datetime(2020, 1, 1, 00, 00, 00), - datetime.datetime(2020, 12, 31, 23, 59, 59), - None, - None, - ), - ( - "full_rights", - datetime.datetime(2020, 1, 1, 00, 00, 00), - datetime.datetime(2020, 12, 31, 23, 59, 59), - 10, - 1024, - ), - ], - ) - def test_generate_license( - self, - lcp_server_fixture: LCPServerFixture, - _: Literal[ - "none_rights", - "license_start", - "license_end", - "max_printable_pages", - "max_printable_pages_empty_max_copiable_pages", - "empty_max_printable_pages", - "max_copiable_pages", - "empty_max_printable_pages_max_copiable_pages", - "empty_max_copiable_pages", - "dates", - "full_rights", - ], - license_start: datetime.datetime | None, - license_end: datetime.datetime | None, - max_printable_pages: Literal[10, ""] | None, - max_copiable_pages: Literal["", 1024] | None, - ): - # Arrange - patron = lcp_server_fixture.db.patron() - expected_patron_id = "52a190d1-cd69-4794-9d7a-1ec50392697f" - expected_patron_passphrase = LCPUnhashedPassphrase( - "52a190d1-cd69-4794-9d7a-1ec50392697a" - ) - expected_patron_key = lcp_server_fixture.hasher_factory.create( - LCPServerConstants.DEFAULT_ENCRYPTION_ALGORITHM - ).hash(expected_patron_passphrase.text) - - configuration = lcp_server_fixture.configuration - DatabaseTransactionFixture.set_settings( - configuration, "lcpserver_url", lcp_strings.LCPSERVER_URL - ) - DatabaseTransactionFixture.set_settings( - configuration, "lcpserver_user", lcp_strings.LCPSERVER_USER - ) - DatabaseTransactionFixture.set_settings( - configuration, "lcpserver_password", lcp_strings.LCPSERVER_PASSWORD - ) - DatabaseTransactionFixture.set_settings( - configuration, "provider_name", lcp_strings.PROVIDER_NAME - ) - DatabaseTransactionFixture.set_settings( - configuration, "passphrase_hint", lcp_strings.TEXT_HINT - ) - DatabaseTransactionFixture.set_settings( - configuration, - "encryption_algorithm", - LCPServerConstants.DEFAULT_ENCRYPTION_ALGORITHM, - ) - DatabaseTransactionFixture.set_settings( - configuration, "max_printable_pages", max_printable_pages - ) - DatabaseTransactionFixture.set_settings( - configuration, "max_copiable_pages", max_copiable_pages - ) - - lcp_server_fixture.credential_factory.get_patron_id = MagicMock( # type: ignore - return_value=expected_patron_id - ) - lcp_server_fixture.credential_factory.get_patron_passphrase = MagicMock( # type: ignore - return_value=expected_patron_passphrase - ) - - with requests_mock.Mocker() as request_mock: - url = urllib.parse.urljoin( - lcp_strings.LCPSERVER_URL, - f"/contents/{lcp_strings.CONTENT_ID}/license", - ) - request_mock.post(url, json=lcp_strings.LCPSERVER_LICENSE) - - # Act - license = lcp_server_fixture.lcp_server.generate_license( - lcp_server_fixture.db.session, - lcp_strings.CONTENT_ID, - patron, - license_start, - license_end, - ) - - # Assert - assert request_mock.called == True - assert license == lcp_strings.LCPSERVER_LICENSE - - json_request = json.loads(request_mock.last_request.text) - assert json_request["provider"] == lcp_strings.PROVIDER_NAME - assert json_request["user"]["id"] == expected_patron_id - assert ( - json_request["encryption"]["user_key"]["text_hint"] - == lcp_strings.TEXT_HINT - ) - assert ( - json_request["encryption"]["user_key"]["hex_value"] - == expected_patron_key - ) - - if license_start is not None: - assert json_request["rights"]["start"] == utils.format_datetime( - license_start - ) - if license_end is not None: - assert json_request["rights"]["end"] == utils.format_datetime( - license_end - ) - if max_printable_pages is not None and max_printable_pages != "": - assert json_request["rights"]["print"] == max_printable_pages - if max_copiable_pages is not None and max_copiable_pages != "": - assert json_request["rights"]["copy"] == max_copiable_pages - - all_rights_fields_are_empty = all( - [ - rights_field is None or rights_field == "" - for rights_field in [ - license_start, - license_end, - max_printable_pages, - max_copiable_pages, - ] - ] - ) - if all_rights_fields_are_empty: - assert ("rights" in json_request) == False - - lcp_server_fixture.credential_factory.get_patron_id.assert_called_once_with( - lcp_server_fixture.db.session, patron - ) - lcp_server_fixture.credential_factory.get_patron_passphrase.assert_called_once_with( - lcp_server_fixture.db.session, patron - ) diff --git a/tests/api/test_circulationapi.py b/tests/api/test_circulationapi.py index d3070b0771..cb42636d3d 100644 --- a/tests/api/test_circulationapi.py +++ b/tests/api/test_circulationapi.py @@ -172,21 +172,6 @@ def test_borrow_sends_analytics_event(self, circulation_api: CirculationAPIFixtu loan, hold, is_new = self.borrow(circulation_api) assert 3 == circulation_api.analytics.count - def test_borrowing_of_self_hosted_book_succeeds( - self, circulation_api: CirculationAPIFixture - ): - # Arrange - circulation_api.pool.self_hosted = True - - # Act - loan, hold, is_new = self.borrow(circulation_api) - - # Assert - assert True == is_new - assert circulation_api.pool == loan.license_pool - assert circulation_api.patron == loan.patron - assert hold is None - def test_borrowing_of_unlimited_access_book_succeeds( self, circulation_api: CirculationAPIFixture ): @@ -1132,14 +1117,9 @@ def yes_we_can(*args, **kwargs): result = try_to_fulfill() assert fulfillment == result - @pytest.mark.parametrize( - "open_access, self_hosted", [(True, False), (False, True), (False, False)] - ) - def test_revoke_loan( - self, circulation_api: CirculationAPIFixture, open_access, self_hosted - ): + @pytest.mark.parametrize("open_access", [True, False]) + def test_revoke_loan(self, circulation_api: CirculationAPIFixture, open_access): circulation_api.pool.open_access = open_access - circulation_api.pool.self_hosted = self_hosted circulation_api.patron.last_loan_activity_sync = utc_now() circulation_api.pool.loan_to(circulation_api.patron) @@ -1157,14 +1137,9 @@ def test_revoke_loan( assert 1 == circulation_api.analytics.count assert CirculationEvent.CM_CHECKIN == circulation_api.analytics.event_type - @pytest.mark.parametrize( - "open_access, self_hosted", [(True, False), (False, True), (False, False)] - ) - def test_release_hold( - self, circulation_api: CirculationAPIFixture, open_access, self_hosted - ): + @pytest.mark.parametrize("open_access", [True, False]) + def test_release_hold(self, circulation_api: CirculationAPIFixture, open_access): circulation_api.pool.open_access = open_access - circulation_api.pool.self_hosted = self_hosted circulation_api.patron.last_loan_activity_sync = utc_now() circulation_api.pool.on_hold_to(circulation_api.patron) diff --git a/tests/api/test_controller_base.py b/tests/api/test_controller_base.py index 8f8d4e9b8b..b80e51c4f5 100644 --- a/tests/api/test_controller_base.py +++ b/tests/api/test_controller_base.py @@ -460,7 +460,6 @@ def test_apply_borrowing_policy_succeeds_for_unlimited_access_books( ) [pool] = work.license_pools pool.open_access = False - pool.self_hosted = False pool.unlimited_access = True # Act @@ -471,31 +470,6 @@ def test_apply_borrowing_policy_succeeds_for_unlimited_access_books( # Assert assert problem is None - def test_apply_borrowing_policy_succeeds_for_self_hosted_books( - self, circulation_fixture: CirculationControllerFixture - ): - with circulation_fixture.request_context_with_library("/"): - # Arrange - patron = circulation_fixture.controller.authenticated_patron( - circulation_fixture.valid_credentials - ) - work = circulation_fixture.db.work( - with_license_pool=True, with_open_access_download=False - ) - [pool] = work.license_pools - pool.licenses_available = 0 - pool.licenses_owned = 0 - pool.open_access = False - pool.self_hosted = True - - # Act - problem = circulation_fixture.controller.apply_borrowing_policy( - patron, pool - ) - - # Assert - assert problem is None - def test_apply_borrowing_policy_when_holds_prohibited( self, circulation_fixture: CirculationControllerFixture, diff --git a/tests/core/models/test_collection.py b/tests/core/models/test_collection.py index c053371431..b85247f201 100644 --- a/tests/core/models/test_collection.py +++ b/tests/core/models/test_collection.py @@ -843,9 +843,7 @@ def test_restrict_to_ready_deliverable_works( ): """A partial test of restrict_to_ready_deliverable_works. - This test covers the following cases: - 1. The bit that excludes audiobooks from certain data sources. - 2. Makes sure that self-hosted books and books with unlimited access are not get filtered out that come. + This test covers the bit that excludes audiobooks from certain data sources. The other cases are tested indirectly in lane.py, but could use a more explicit test here. """ @@ -869,20 +867,6 @@ def test_restrict_to_ready_deliverable_works( ) feedbooks_audiobook.presentation_edition.medium = Edition.AUDIO_MEDIUM - DataSource.lookup(db.session, DataSource.LCP, autocreate=True) - self_hosted_lcp_book = db.work( - data_source_name=DataSource.LCP, - title="Self-hosted LCP book", - with_license_pool=True, - self_hosted=True, - ) - unlimited_access_book = db.work( - data_source_name=DataSource.LCP, - title="Self-hosted LCP book", - with_license_pool=True, - unlimited_access=True, - ) - def expect(qu, works): """Modify the query `qu` by calling restrict_to_ready_deliverable_works(), then verify that @@ -912,8 +896,6 @@ def expect(qu, works): overdrive_ebook, overdrive_audiobook, feedbooks_audiobook, - self_hosted_lcp_book, - unlimited_access_book, ], ) # Putting a data source in the list excludes its audiobooks, but @@ -924,12 +906,10 @@ def expect(qu, works): [ overdrive_ebook, feedbooks_audiobook, - self_hosted_lcp_book, - unlimited_access_book, ], ) setting.value = json.dumps([DataSource.OVERDRIVE, DataSource.FEEDBOOKS]) - expect(qu, [overdrive_ebook, self_hosted_lcp_book, unlimited_access_book]) + expect(qu, [overdrive_ebook]) def test_delete(self, example_collection_fixture: ExampleCollectionFixture): """Verify that Collection.delete will only operate on collections diff --git a/tests/core/models/test_listeners.py b/tests/core/models/test_listeners.py index 230d537c50..2fc0c6e7c8 100644 --- a/tests/core/models/test_listeners.py +++ b/tests/core/models/test_listeners.py @@ -246,10 +246,6 @@ class TestListeners: "works_when_open_access_property_changes", functools.partial(_set_property, property_name="open_access"), ), - ( - "works_when_self_hosted_property_changes", - functools.partial(_set_property, property_name="self_hosted"), - ), ], ) def test_licensepool_storage_status_change( diff --git a/tests/core/models/test_work.py b/tests/core/models/test_work.py index fdddb39d5d..f15f495d86 100644 --- a/tests/core/models/test_work.py +++ b/tests/core/models/test_work.py @@ -1444,7 +1444,6 @@ def test_unlimited_access_books_are_available_by_default( work = db.work(presentation_edition=edition) pool.open_access = False - pool.self_hosted = False pool.unlimited_access = True # Make sure all of this will show up in a database query. @@ -1459,35 +1458,6 @@ def test_unlimited_access_books_are_available_by_default( assert licensepools[0]["open_access"] == False assert licensepools[0]["available"] == True - def test_self_hosted_books_are_available_by_default( - self, db: DatabaseTransactionFixture - ): - # Set up an edition and work. - edition, pool = db.edition( - authors=[ - db.fresh_str(), - db.fresh_str(), - ], - with_license_pool=True, - ) - work = db.work(presentation_edition=edition) - - pool.licenses_owned = 0 - pool.licenses_available = 0 - pool.self_hosted = True - - # Make sure all of this will show up in a database query. - db.session.flush() - - search_doc = work.to_search_document() - - # Each LicensePool for the Work is listed in - # the 'licensepools' section. - licensepools = search_doc["licensepools"] - assert 1 == len(licensepools) - assert licensepools[0]["open_access"] == False - assert licensepools[0]["available"] == True - def test_target_age_string(self, db: DatabaseTransactionFixture): work = db.work() work.target_age = NumericRange(7, 8, "[]") diff --git a/tests/core/test_external_search.py b/tests/core/test_external_search.py index 93dceda06e..5ca325cc7e 100644 --- a/tests/core/test_external_search.py +++ b/tests/core/test_external_search.py @@ -662,7 +662,6 @@ def _populate_works( with_license_pool=True, collection=result.tiny_collection, ) - result.tiny_book.license_pools[0].self_hosted = True # Both collections contain 'The Adventures of Sherlock # Holmes", but each collection licenses the book through a diff --git a/tests/core/test_lane.py b/tests/core/test_lane.py index 039570c12d..10224a46d4 100644 --- a/tests/core/test_lane.py +++ b/tests/core/test_lane.py @@ -919,9 +919,6 @@ def test_modify_database_query(self, db: DatabaseTransactionFixture): # reasons why a book might or might not be 'available'. open_access = db.work(with_open_access_download=True, title="open access") open_access.quality = 1 - self_hosted = db.work( - with_license_pool=True, self_hosted=True, title="self hosted" - ) unlimited_access = db.work( with_license_pool=True, unlimited_access=True, title="unlimited access" ) @@ -949,11 +946,11 @@ def test_modify_database_query(self, db: DatabaseTransactionFixture): for availability, expect in [ ( Facets.AVAILABLE_NOW, - [open_access, available, self_hosted, unlimited_access], + [open_access, available, unlimited_access], ), ( Facets.AVAILABLE_ALL, - [open_access, available, not_available, self_hosted, unlimited_access], + [open_access, available, not_available, unlimited_access], ), (Facets.AVAILABLE_NOT_NOW, [not_available]), ]: @@ -970,7 +967,7 @@ def test_modify_database_query(self, db: DatabaseTransactionFixture): for collection, expect in [ ( Facets.COLLECTION_FULL, - [open_access, available, self_hosted, unlimited_access], + [open_access, available, unlimited_access], ), (Facets.COLLECTION_FEATURED, [open_access]), ]: diff --git a/tests/core/test_opds.py b/tests/core/test_opds.py index 9a4ec75306..6a595d81d0 100644 --- a/tests/core/test_opds.py +++ b/tests/core/test_opds.py @@ -1993,7 +1993,6 @@ def test_license_tags_show_unlimited_access_books( # Arrange edition, pool = db.edition(with_license_pool=True) pool.open_access = False - pool.self_hosted = False pool.unlimited_access = True # Act @@ -2021,23 +2020,6 @@ def test_unlimited_access_pool_loan(self, db: DatabaseTransactionFixture): assert "since" in tag.attrib assert "until" not in tag.attrib - def test_license_tags_show_self_hosted_books(self, db: DatabaseTransactionFixture): - - # Arrange - edition, pool = db.edition(with_license_pool=True) - pool.self_hosted = True - pool.open_access = False - pool.licenses_available = 0 - pool.licenses_owned = 0 - - # Act - tags = AcquisitionFeed.license_tags(pool, None, None) - - # Assert - assert 1 == len(tags) - assert "status" in tags[0].attrib - assert "available" == tags[0].attrib["status"] - def test_single_entry(self, db: DatabaseTransactionFixture): session = db.session diff --git a/tests/core/test_s3_analytics_provider.py b/tests/core/test_s3_analytics_provider.py index e0696ff23f..e07e0b45a7 100644 --- a/tests/core/test_s3_analytics_provider.py +++ b/tests/core/test_s3_analytics_provider.py @@ -298,7 +298,7 @@ def test_analytics_data_with_associated_license_pool_is_correctly_stored_in_s3( assert license_pool.licenses_available == event["licenses_available"] assert license_pool.licenses_reserved == event["licenses_reserved"] assert license_pool.patrons_in_hold_queue == event["patrons_in_hold_queue"] - assert license_pool.self_hosted == event["self_hosted"] + assert False == event["self_hosted"] assert work.title == event["title"] assert work.series == event["series"] assert work.series_position == event["series_position"] diff --git a/tests/fixtures/database.py b/tests/fixtures/database.py index 647caf9412..ec1362ac24 100644 --- a/tests/fixtures/database.py +++ b/tests/fixtures/database.py @@ -343,7 +343,6 @@ def work( presentation_edition=None, collection=None, data_source_name=None, - self_hosted=False, unlimited_access=False, ): """Create a Work. @@ -378,16 +377,12 @@ def work( data_source_name=data_source_name, series=series, collection=collection, - self_hosted=self_hosted, unlimited_access=unlimited_access, ) if with_license_pool: presentation_edition, pool = presentation_edition if with_open_access_download: pool.open_access = True - if self_hosted: - pool.open_access = False - pool.self_hosted = True if unlimited_access: pool.open_access = False pool.unlimited_access = True @@ -446,7 +441,6 @@ def edition( series=None, collection=None, publication_date=None, - self_hosted=False, unlimited_access=False, ): id = identifier_id or self.fresh_str() @@ -486,7 +480,6 @@ def edition( data_source_name=data_source_name, with_open_access_download=with_open_access_download, collection=collection, - self_hosted=self_hosted, unlimited_access=unlimited_access, ) @@ -502,7 +495,6 @@ def licensepool( with_open_access_download=False, set_edition_as_presentation=False, collection=None, - self_hosted=False, unlimited_access=False, ): source = DataSource.lookup(self.session, data_source_name) @@ -518,7 +510,6 @@ def licensepool( data_source=source, collection=collection, availability_time=utc_now(), - self_hosted=self_hosted, unlimited_access=unlimited_access, )