diff --git a/api/admin/controller/settings.py b/api/admin/controller/settings.py
index 5ab4bda7a5..e70070d4ab 100644
--- a/api/admin/controller/settings.py
+++ b/api/admin/controller/settings.py
@@ -465,17 +465,6 @@ def _get_collection_protocols(self, provider_apis):
         protocols = self._get_integration_protocols(
             provider_apis, protocol_name_attr="NAME"
         )
-        protocols.append(
-            {
-                "name": ExternalIntegration.MANUAL,
-                "label": _("Manual import"),
-                "description": _(
-                    "Books will be manually added to the circulation manager, "
-                    "not imported automatically through a protocol."
-                ),
-                "settings": [],
-            }
-        )
         return protocols
diff --git a/api/onix.py b/api/onix.py
deleted file mode 100644
index 5b327cb484..0000000000
--- a/api/onix.py
+++ /dev/null
@@ -1,420 +0,0 @@
-import logging
-from enum import Enum
-
-import dateutil.parser
-from lxml import etree
-
-from core.classifier import Classifier
-from core.metadata_layer import (
-    CirculationData,
-    ContributorData,
-    IdentifierData,
-    LinkData,
-    Metadata,
-    SubjectData,
-)
-from core.model import (
-    Classification,
-    Contributor,
-    EditionConstants,
-    Hyperlink,
-    Identifier,
-    LicensePool,
-    Representation,
-    Subject,
-)
-from core.util.datetime_helpers import to_utc
-from core.util.xmlparser import XMLParser
-
-
-class UsageStatus(Enum):
-    UNLIMITED = "01"
-    LIMITED = "02"
-    PROHIBITED = "03"
-
-
-class UsageUnit(Enum):
-    COPIES = "01"
-    CHARACTERS = "02"
-    WORDS = "03"
-    PAGES = "04"
-    PERCENTAGE = "05"
-    DEVICES = "06"
-    CONCURRENT_USERS = "07"
-    PERCENTAGE_PER_TIME_PERIOD = "08"
-    DAYS = "09"
-    TIMES = "10"
-
-
-class ONIXExtractor:
-    """Transform an ONIX file into a list of Metadata objects."""
-
-    # TODO: '20' indicates a semicolon-separated list of freeform tags,
-    # which could also be useful.
-    SUBJECT_TYPES = {
-        "01": Classifier.DDC,
-        "03": Classifier.LCC,
-        "04": Classifier.LCSH,
-        "10": Classifier.BISAC,
-        "12": Classifier.BIC,
-    }
-
-    AUDIENCE_TYPES = {
-        "01": Classifier.AUDIENCE_ADULT,  # General/trade for adult audience
-        "02": Classifier.AUDIENCE_CHILDREN,  # (not for educational purpose)
-        "03": Classifier.AUDIENCE_YOUNG_ADULT,  # (not for educational purpose)
-        "04": Classifier.AUDIENCE_CHILDREN,  # Primary and secondary/elementary and high school
-        "05": Classifier.AUDIENCE_ADULT,  # College/higher education
-        "06": Classifier.AUDIENCE_ADULT,  # Professional and scholarly
-        "07": Classifier.AUDIENCE_ADULT,  # ESL
-        "08": Classifier.AUDIENCE_ADULT,  # Adult education
-        "09": Classifier.AUDIENCE_ADULT,  # Second language teaching other than English
-    }
-
-    CONTRIBUTOR_TYPES = {
-        "A01": Contributor.AUTHOR_ROLE,
-        "A02": Contributor.AUTHOR_ROLE,  # 'With or as told to'
-        "A03": Contributor.AUTHOR_ROLE,  # Screenplay author
-        "A04": Contributor.LYRICIST_ROLE,  # Libretto author for an opera
-        "A05": Contributor.LYRICIST_ROLE,
-        "A06": Contributor.COMPOSER_ROLE,
-        "A07": Contributor.ILLUSTRATOR_ROLE,  # Visual artist who is the primary creator of the work
-        "A08": Contributor.PHOTOGRAPHER_ROLE,
-        "A09": Contributor.AUTHOR_ROLE,  # 'Created by'
-        "A10": Contributor.UNKNOWN_ROLE,  # 'From an idea by'
-        "A11": Contributor.DESIGNER_ROLE,
-        "A12": Contributor.ILLUSTRATOR_ROLE,
-        "A13": Contributor.PHOTOGRAPHER_ROLE,
-        "A14": Contributor.AUTHOR_ROLE,  # Author of the text for a work that is primarily photos or illustrations
-        "A15": Contributor.INTRODUCTION_ROLE,  # Preface author
-        "A16": Contributor.UNKNOWN_ROLE,  # Prologue author
-        "A17": Contributor.UNKNOWN_ROLE,  # Summary author
-        "A18": Contributor.UNKNOWN_ROLE,  # Supplement author
-        "A19": Contributor.AFTERWORD_ROLE,  # Afterword author
-        "A20": Contributor.UNKNOWN_ROLE,  # Author of notes or annotations
-        "A21": Contributor.UNKNOWN_ROLE,  # Author of commentary on main text
-        "A22": Contributor.UNKNOWN_ROLE,  # Epilogue author
-        "A23": Contributor.FOREWORD_ROLE,
-        "A24": Contributor.INTRODUCTION_ROLE,
-        "A25": Contributor.UNKNOWN_ROLE,  # Author/compiler of footnotes
-        "A26": Contributor.UNKNOWN_ROLE,  # Author of memoir accompanying main text
-        "A27": Contributor.UNKNOWN_ROLE,  # Person who carried out experiments reported in the text
-        "A29": Contributor.INTRODUCTION_ROLE,  # Author of introduction and notes
-        "A30": Contributor.UNKNOWN_ROLE,  # Writer of computer programs ancillary to the text
-        "A31": Contributor.LYRICIST_ROLE,  # 'Book and lyrics by'
-        "A32": Contributor.CONTRIBUTOR_ROLE,  # 'Contributions by'
-        "A33": Contributor.UNKNOWN_ROLE,  # Appendix author
-        "A34": Contributor.UNKNOWN_ROLE,  # Compiler of index
-        "A35": Contributor.ARTIST_ROLE,  # 'Drawings by'
-        "A36": Contributor.ARTIST_ROLE,  # Cover artist
-        "A37": Contributor.UNKNOWN_ROLE,  # Responsible for preliminary work on which the work is based
-        "A38": Contributor.UNKNOWN_ROLE,  # Author of the first edition who is not an author of the current edition
-        "A39": Contributor.UNKNOWN_ROLE,  # 'Maps by'
-        "A40": Contributor.ARTIST_ROLE,  # 'Inked or colored by'
-        "A41": Contributor.UNKNOWN_ROLE,  # 'Paper engineering by'
-        "A42": Contributor.UNKNOWN_ROLE,  # 'Continued by'
-        "A43": Contributor.UNKNOWN_ROLE,  # Interviewer
-        "A44": Contributor.UNKNOWN_ROLE,  # Interviewee
-        "A45": Contributor.AUTHOR_ROLE,  # Writer of dialogue, captions in a comic book
-        "A46": Contributor.ARTIST_ROLE,  # Inker
-        "A47": Contributor.ARTIST_ROLE,  # Colorist
-        "A48": Contributor.ARTIST_ROLE,  # Letterer
-        "A51": Contributor.UNKNOWN_ROLE,  # 'Research by'
-        "A99": Contributor.UNKNOWN_ROLE,  # 'Other primary creator'
-        "B01": Contributor.EDITOR_ROLE,
-        "B02": Contributor.EDITOR_ROLE,  # 'Revised by'
-        "B03": Contributor.UNKNOWN_ROLE,  # 'Retold by'
-        "B04": Contributor.UNKNOWN_ROLE,  # 'Abridged by'
-        "B05": Contributor.ADAPTER_ROLE,
-        "B06": Contributor.TRANSLATOR_ROLE,
-        "B07": Contributor.UNKNOWN_ROLE,  # 'As told by'
-        "B08": Contributor.TRANSLATOR_ROLE,  # With commentary on the translation
-        "B09": Contributor.EDITOR_ROLE,  # Series editor
-        "B10": Contributor.TRANSLATOR_ROLE,  # 'Edited and translated by'
-        "B11": Contributor.EDITOR_ROLE,  # Editor-in-chief
-        "B12": Contributor.EDITOR_ROLE,  # Guest editor
-        "B13": Contributor.EDITOR_ROLE,  # Volume editor
-        "B14": Contributor.EDITOR_ROLE,  # Editorial board member
-        "B15": Contributor.EDITOR_ROLE,  # 'Editorial coordination by'
-        "B16": Contributor.EDITOR_ROLE,  # Managing editor
-        "B17": Contributor.EDITOR_ROLE,  # Founding editor of a serial publication
-        "B18": Contributor.EDITOR_ROLE,  # 'Prepared for publication by'
-        "B19": Contributor.EDITOR_ROLE,  # Associate editor
-        "B20": Contributor.EDITOR_ROLE,  # Consultant editor
-        "B21": Contributor.EDITOR_ROLE,  # General editor
-        "B22": Contributor.UNKNOWN_ROLE,  # 'Dramatized by'
-        "B23": Contributor.EDITOR_ROLE,  # 'General rapporteur'
-        "B24": Contributor.EDITOR_ROLE,  # Literary editor
-        "B25": Contributor.COMPOSER_ROLE,  # 'Arranged by (music)'
-        "B26": Contributor.EDITOR_ROLE,  # Technical editor
-        "B27": Contributor.UNKNOWN_ROLE,  # Thesis advisor
-        "B28": Contributor.UNKNOWN_ROLE,  # Thesis examiner
-        "B29": Contributor.EDITOR_ROLE,  # Scientific editor
-        "B30": Contributor.UNKNOWN_ROLE,  # Historical advisor
-        "B31": Contributor.UNKNOWN_ROLE,  # Editor of the first edition who is not an editor of the current edition
-        "B99": Contributor.EDITOR_ROLE,  # Other type of adaptation or editing
-        "C01": Contributor.UNKNOWN_ROLE,  # 'Compiled by'
-        "C02": Contributor.UNKNOWN_ROLE,  # 'Selected by'
-        "C03": Contributor.UNKNOWN_ROLE,  # 'Non-text material selected by'
-        "C04": Contributor.UNKNOWN_ROLE,  # 'Curated by'
-        "C99": Contributor.UNKNOWN_ROLE,  # Other type of compilation
-        "D01": Contributor.PRODUCER_ROLE,
-        "D02": Contributor.DIRECTOR_ROLE,
-        "D03": Contributor.MUSICIAN_ROLE,  # Conductor
-        "D04": Contributor.UNKNOWN_ROLE,  # Choreographer
-        "D05": Contributor.DIRECTOR_ROLE,  # Other type of direction
-        "E01": Contributor.ACTOR_ROLE,
-        "E02": Contributor.PERFORMER_ROLE,  # Dancer
-        "E03": Contributor.NARRATOR_ROLE,  # 'Narrator'
-        "E04": Contributor.UNKNOWN_ROLE,  # Commentator
-        "E05": Contributor.PERFORMER_ROLE,  # Vocal soloist
-        "E06": Contributor.PERFORMER_ROLE,  # Instrumental soloist
-        "E07": Contributor.NARRATOR_ROLE,  # Reader of recorded text, as in an audiobook
-        "E08": Contributor.PERFORMER_ROLE,  # Name of a musical group in a performing role
-        "E09": Contributor.PERFORMER_ROLE,  # Speaker
-        "E10": Contributor.UNKNOWN_ROLE,  # Presenter
-        "E99": Contributor.PERFORMER_ROLE,  # Other type of performer
-        "F01": Contributor.PHOTOGRAPHER_ROLE,  # 'Filmed/photographed by'
-        "F02": Contributor.EDITOR_ROLE,  # 'Editor (film or video)'
-        "F99": Contributor.UNKNOWN_ROLE,  # Other type of recording
-        "Z01": Contributor.UNKNOWN_ROLE,  # 'Assisted by'
-        "Z02": Contributor.UNKNOWN_ROLE,  # 'Honored/dedicated to'
-        "Z99": Contributor.UNKNOWN_ROLE,  # Other creative responsibility
-    }
-
-    PRODUCT_CONTENT_TYPES = {
-        "10": EditionConstants.BOOK_MEDIUM,  # Text (eye-readable)
-        "01": EditionConstants.AUDIO_MEDIUM,  # Audiobook
-    }
-
-    _logger = logging.getLogger(__name__)
-
-    @classmethod
-    def parse(cls, file, data_source_name, default_medium=None):
-        metadata_records = []
-
-        # TODO: ONIX has plain language 'reference names' and short tags that
-        # may be used interchangeably. This code currently only handles short tags,
-        # and it's not comprehensive.
-
-        parser = XMLParser()
-        tree = etree.parse(file)
-        root = tree.getroot()
-
-        for record in root.findall("product"):
-            title = parser.text_of_optional_subtag(
-                record, "descriptivedetail/titledetail/titleelement/b203"
-            )
-            if not title:
-                title_prefix = parser.text_of_optional_subtag(
-                    record, "descriptivedetail/titledetail/titleelement/b030"
-                )
-                title_without_prefix = parser.text_of_optional_subtag(
-                    record, "descriptivedetail/titledetail/titleelement/b031"
-                )
-                if title_prefix and title_without_prefix:
-                    title = title_prefix + " " + title_without_prefix
-
-            medium = parser.text_of_optional_subtag(record, "b385")
-
-            if not medium and default_medium:
-                medium = default_medium
-            else:
-                medium = cls.PRODUCT_CONTENT_TYPES.get(
-                    medium, EditionConstants.BOOK_MEDIUM
-                )
-
-            subtitle = parser.text_of_optional_subtag(
-                record, "descriptivedetail/titledetail/titleelement/b029"
-            )
-            language = (
-                parser.text_of_optional_subtag(
-                    record, "descriptivedetail/language/b252"
-                )
-                or "eng"
-            )
-            publisher = parser.text_of_optional_subtag(
-                record, "publishingdetail/publisher/b081"
-            )
-            imprint = parser.text_of_optional_subtag(
-                record, "publishingdetail/imprint/b079"
-            )
-            if imprint == publisher:
-                imprint = None
-
-            publishing_date = parser.text_of_optional_subtag(
-                record, "publishingdetail/publishingdate/b306"
-            )
-            issued = None
-            if publishing_date:
-                issued = dateutil.parser.isoparse(publishing_date)
-                if issued.tzinfo is None:
-                    cls._logger.warning(
-                        "Publishing date {} does not contain timezone information. Assuming UTC.".format(
-                            publishing_date
-                        )
-                    )
-                issued = to_utc(issued)
-
-            identifier_tags = parser._xpath(record, "productidentifier")
-            identifiers = []
-            primary_identifier = None
-            for tag in identifier_tags:
-                type = parser.text_of_subtag(tag, "b221")
-                if type == "02" or type == "15":
-                    primary_identifier = IdentifierData(
-                        Identifier.ISBN, parser.text_of_subtag(tag, "b244")
-                    )
-                    identifiers.append(primary_identifier)
-
-            subject_tags = parser._xpath(record, "descriptivedetail/subject")
-            subjects = []
-
-            weight = Classification.TRUSTED_DISTRIBUTOR_WEIGHT
-            for tag in subject_tags:
-                type = parser.text_of_subtag(tag, "b067")
-                if type in cls.SUBJECT_TYPES:
-                    b069 = parser.text_of_optional_subtag(tag, "b069")
-
-                    if b069:
-                        subjects.append(
-                            SubjectData(cls.SUBJECT_TYPES[type], b069, weight=weight)
-                        )
-
-            audience_tags = parser._xpath(record, "descriptivedetail/audience/b204")
-            audiences = []
-            for tag in audience_tags:
-                if tag.text in cls.AUDIENCE_TYPES:
-                    subjects.append(
-                        SubjectData(
-                            Subject.FREEFORM_AUDIENCE,
-                            cls.AUDIENCE_TYPES[tag.text],
-                            weight=weight,
-                        )
-                    )
-
-            # TODO: We don't handle ONIX unnamed and alternatively named contributors.
-            contributor_tags = parser._xpath(record, "descriptivedetail/contributor")
-            contributors = []
-            for tag in contributor_tags:
-                type = parser.text_of_subtag(tag, "b035")
-                if type in cls.CONTRIBUTOR_TYPES:
-                    person_name_display = parser.text_of_optional_subtag(tag, "b036")
-                    person_name_inverted = parser.text_of_optional_subtag(tag, "b037")
-                    corp_name_display = parser.text_of_optional_subtag(tag, "b047")
-                    corp_name_inverted = parser.text_of_optional_subtag(tag, "x443")
-                    bio = parser.text_of_optional_subtag(tag, "b044")
-                    family_name = None
-                    if person_name_display or person_name_inverted:
-                        display_name = person_name_display
-                        sort_name = person_name_inverted
-                        family_name = parser.text_of_optional_subtag(tag, "b040")
-                    elif corp_name_display or corp_name_inverted:
-                        display_name = corp_name_display
-                        # Sort form for corporate name might just be the display name
-                        sort_name = corp_name_inverted or corp_name_display
-                    else:
-                        sort_name = display_name = None
-                    contributors.append(
-                        ContributorData(
-                            sort_name=sort_name,
-                            display_name=display_name,
-                            family_name=family_name,
-                            roles=[cls.CONTRIBUTOR_TYPES[type]],
-                            biography=bio,
-                        )
-                    )
-
-            collateral_tags = parser._xpath(record, "collateraldetail/textcontent")
-            links = []
-            for tag in collateral_tags:
-                type = parser.text_of_subtag(tag, "x426")
-                # TODO: '03' is the summary in the example I'm testing, but that
-                # might not be generally true.
-                if type == "03":
-                    text = parser.text_of_subtag(tag, "d104")
-                    links.append(
-                        LinkData(
-                            rel=Hyperlink.DESCRIPTION,
-                            media_type=Representation.TEXT_HTML_MEDIA_TYPE,
-                            content=text,
-                        )
-                    )
-
-            usage_constraint_tags = parser._xpath(
-                record, "descriptivedetail/epubusageconstraint"
-            )
-            licenses_owned = LicensePool.UNLIMITED_ACCESS
-
-            if usage_constraint_tags:
-                cls._logger.debug(
-                    "Found {} EpubUsageConstraint tags".format(
-                        len(usage_constraint_tags)
-                    )
-                )
-
-            for usage_constraint_tag in usage_constraint_tags:
-                usage_status = parser.text_of_subtag(usage_constraint_tag, "x319")
-
-                cls._logger.debug(f"EpubUsageStatus: {usage_status}")
-
-                if usage_status == UsageStatus.PROHIBITED.value:
-                    raise Exception("The content is prohibited")
-                elif usage_status == UsageStatus.LIMITED.value:
-                    usage_limit_tags = parser._xpath(
-                        record, "descriptivedetail/epubusageconstraint/epubusagelimit"
-                    )
-
-                    cls._logger.debug(
-                        f"Found {len(usage_limit_tags)} EpubUsageLimit tags"
-                    )
-
-                    if not usage_limit_tags:
-                        continue
-
-                    [usage_limit_tag] = usage_limit_tags
-
-                    usage_unit = parser.text_of_subtag(usage_limit_tag, "x321")
-
-                    cls._logger.debug(f"EpubUsageUnit: {usage_unit}")
-
-                    if (
-                        usage_unit == UsageUnit.COPIES.value
-                        or usage_unit == UsageUnit.CONCURRENT_USERS.value
-                    ):
-                        quantity_limit = parser.text_of_subtag(usage_limit_tag, "x320")
-
-                        cls._logger.debug(f"Quantity: {quantity_limit}")
-
-                        if licenses_owned == LicensePool.UNLIMITED_ACCESS:
-                            licenses_owned = 0
-
-                        licenses_owned += int(quantity_limit)
-
-            metadata_records.append(
-                Metadata(
-                    data_source=data_source_name,
-                    title=title,
-                    subtitle=subtitle,
-                    language=language,
-                    medium=medium,
-                    publisher=publisher,
-                    imprint=imprint,
-                    issued=issued,
-                    primary_identifier=primary_identifier,
-                    identifiers=identifiers,
-                    subjects=subjects,
-                    contributors=contributors,
-                    links=links,
-                    circulation=CirculationData(
-                        data_source_name,
-                        primary_identifier,
-                        licenses_owned=licenses_owned,
-                        licenses_available=licenses_owned,
-                        licenses_reserved=0,
-                        patrons_in_hold_queue=0,
-                    ),
-                )
-            )
-
-        return metadata_records
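Aside: the epubusageconstraint handling deleted above is the subtlest part of the extractor, and the rule it implements is worth spelling out. An unconstrained record is unlimited access; a LIMITED constraint whose limit is expressed in copies or concurrent users adds its quantity to the license count; a PROHIBITED constraint aborts the import; any other limit unit (days, pages, and so on) leaves the record unlimited. A minimal, self-contained sketch of that reduction (licenses_for is a hypothetical helper written for illustration, and -1 stands in for the LicensePool.UNLIMITED_ACCESS sentinel):

    UNLIMITED_ACCESS = -1  # stand-in for LicensePool.UNLIMITED_ACCESS
    COUNTED_UNITS = {"01", "07"}  # UsageUnit.COPIES, UsageUnit.CONCURRENT_USERS

    def licenses_for(constraints):
        # constraints: (status, unit, quantity) triples pulled from the
        # x319/x321/x320 tags, as in ONIXExtractor.parse above.
        owned = UNLIMITED_ACCESS
        for status, unit, quantity in constraints:
            if status == "03":  # UsageStatus.PROHIBITED
                raise Exception("The content is prohibited")
            if status == "02" and unit in COUNTED_UNITS:  # UsageStatus.LIMITED
                owned = (0 if owned == UNLIMITED_ACCESS else owned) + int(quantity)
        return owned

    assert licenses_for([]) == UNLIMITED_ACCESS
    assert licenses_for([("02", "01", "20")]) == 20  # the 20-copy test fixture below
    assert licenses_for([("02", "09", "30")]) == UNLIMITED_ACCESS  # day-based limit is ignored
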
diff --git a/bin/directory_import b/bin/directory_import
deleted file mode 100755
index 51baf60bdb..0000000000
--- a/bin/directory_import
+++ /dev/null
@@ -1,14 +0,0 @@
-#!/usr/bin/env python
-"""Import books into a collection from local disk storage."""
-import os
-import sys
-
-bin_dir = os.path.split(__file__)[0]
-package_dir = os.path.join(bin_dir, "..")
-sys.path.append(os.path.abspath(package_dir))
-
-# NOTE: We need to import it explicitly to initialize MirrorUploader.IMPLEMENTATION_REGISTRY
-from api.lcp import mirror  # noqa: autoflake
-from scripts import DirectoryImportScript
-
-DirectoryImportScript().run()
diff --git a/core/metadata_layer.py b/core/metadata_layer.py
index 15c4ef54e4..9290b68d0f 100644
--- a/core/metadata_layer.py
+++ b/core/metadata_layer.py
@@ -9,17 +9,15 @@ import csv
 import datetime
 import logging
-import re
 from collections import defaultdict
 from typing import List, Optional
 
 from dateutil.parser import parse
-from pymarc import MARCReader
 from sqlalchemy.orm.session import Session
 from sqlalchemy.sql.expression import and_, or_
 
 from .analytics import Analytics
-from .classifier import NO_NUMBER, NO_VALUE, Classifier
+from .classifier import NO_NUMBER, NO_VALUE
 from .model import (
     Classification,
     Collection,
@@ -46,9 +44,9 @@ from .model.configuration import ExternalIntegrationLink
 from .model.licensing import LicenseFunctions, LicenseStatus
 from .util import LanguageCodes
-from .util.datetime_helpers import strptime_utc, to_utc, utc_now
+from .util.datetime_helpers import to_utc, utc_now
 from .util.median import median
-from .util.personal_names import display_name_to_sort_name, name_tidy
+from .util.personal_names import display_name_to_sort_name
 
 
 class ReplacementPolicy:
@@ -2380,108 +2378,3 @@ def _date_field(self, row, field_name):
                 self.log.warning('Could not parse date "%s"' % value)
                 value = None
         return value
-
-
-class MARCExtractor:
-
-    """Transform a MARC file into a list of Metadata objects.
-
-    This is not totally general, but it's a good start.
-    """
-
-    # Common things found in a MARC record after the name of the author
-    # which we would like to remove.
-    END_OF_AUTHOR_NAME_RES = [
-        re.compile(r",\s+[0-9]+-"),  # Birth year
-        re.compile(r",\s+active "),
-        re.compile(r",\s+graf,"),
-        re.compile(r",\s+author."),
-    ]
-
-    @classmethod
-    def name_cleanup(cls, name):
-        # Turn 'Dante Alighieri, 1265-1321, author.'
-        # into 'Dante Alighieri'.
-        for regex in cls.END_OF_AUTHOR_NAME_RES:
-            match = regex.search(name)
-            if match:
-                name = name[: match.start()]
-                break
-        name = name_tidy(name)
-        return name
-
-    @classmethod
-    def parse_year(cls, value):
-        """Handle a publication year that may not be in the right format."""
-        for format in ("%Y", "%Y."):
-            try:
-                return strptime_utc(value, format)
-            except ValueError:
-                continue
-        return None
-
-    @classmethod
-    def parse(cls, file, data_source_name, default_medium_type=None):
-        reader = MARCReader(file)
-        metadata_records = []
-
-        for record in reader:
-            title = record.title
-            if title.endswith(" /"):
-                title = title[: -len(" /")]
-            issued_year = cls.parse_year(record.pubyear)
-            publisher = record.publisher
-            if publisher.endswith(","):
-                publisher = publisher[:-1]
-
-            links = []
-            summary = record.notes[0]["a"]
-
-            if summary:
-                summary_link = LinkData(
-                    rel=Hyperlink.DESCRIPTION,
-                    media_type=Representation.TEXT_PLAIN,
-                    content=summary,
-                )
-                links.append(summary_link)
-
-            isbn = record["020"]["a"].split(" ")[0]
-            primary_identifier = IdentifierData(Identifier.ISBN, isbn)
-
-            subjects = [
-                SubjectData(
-                    Classifier.FAST,
-                    subject["a"],
-                )
-                for subject in record.subjects
-            ]
-
-            author = record.author
-            if author:
-                author = cls.name_cleanup(author)
-                author_names = [author]
-            else:
-                author_names = ["Anonymous"]
-            contributors = [
-                ContributorData(
-                    sort_name=author,
-                    roles=[Contributor.AUTHOR_ROLE],
-                )
-                for author in author_names
-            ]
-
-            metadata_records.append(
-                Metadata(
-                    data_source=data_source_name,
-                    title=title,
-                    language="eng",
-                    medium=Edition.BOOK_MEDIUM,
-                    publisher=publisher,
-                    issued=issued_year,
-                    primary_identifier=primary_identifier,
-                    subjects=subjects,
-                    contributors=contributors,
-                    links=links,
-                )
-            )
-        return metadata_records
diff --git a/core/model/configuration.py b/core/model/configuration.py
index 264d7c3dd2..96df379648 100644
--- a/core/model/configuration.py
+++ b/core/model/configuration.py
@@ -182,7 +182,6 @@ class ExternalIntegration(Base):
     ODL = "ODL"
     ODL2 = "ODL 2.0"
     LCP = DataSourceConstants.LCP
-    MANUAL = DataSourceConstants.MANUAL
     PROQUEST = DataSourceConstants.PROQUEST
 
     # These protocols were used on the Content Server when mirroring
@@ -201,7 +200,6 @@ class ExternalIntegration(Base):
         AXIS_360,
         GUTENBERG,
         ENKI,
-        MANUAL,
     ]
 
     # Some integrations with LICENSE_GOAL imply that the data and
diff --git a/scripts.py b/scripts.py
index 33ae1124bd..f088686dfb 100644
--- a/scripts.py
+++ b/scripts.py
@@ -24,7 +24,6 @@ from api.marc import LibraryAnnotator as MARCLibraryAnnotator
 from api.novelist import NoveListAPI
 from api.nyt import NYTBestSellerAPI
-from api.onix import ONIXExtractor
 from api.opds_for_distributors import (
     OPDSForDistributorsImporter,
     OPDSForDistributorsImportMonitor,
@@ -36,35 +35,21 @@ from core.lane import Facets, FeaturedFacets, Lane, Pagination
 from core.log import LogConfiguration
 from core.marc import MARCExporter
-from core.metadata_layer import (
-    CirculationData,
-    FormatData,
-    LinkData,
-    MARCExtractor,
-    ReplacementPolicy,
-)
-from core.mirror import MirrorUploader
 from core.model import (
     LOCK_ID_DB_INIT,
     CachedMARCFile,
     CirculationEvent,
-    Collection,
     ConfigurationSetting,
     Contribution,
     DataSource,
-    DeliveryMechanism,
     Edition,
-    EditionConstants,
     ExternalIntegration,
     Hold,
-    Hyperlink,
     Identifier,
     Library,
     LicensePool,
     Loan,
     Patron,
-    Representation,
-    RightsStatus,
     SessionManager,
     get_one,
     pg_advisory_lock,
@@ -72,7 +57,6 @@ from core.model.configuration import ExternalIntegrationLink
 from core.opds import AcquisitionFeed
 from core.scripts import (
-    CollectionType,
     IdentifierInputScript,
     LaneSweeperScript,
     LibraryInputScript,
@@ -1177,558 +1161,6 @@ class OPDSForDistributorsReaperScript(OPDSImportScript):
     PROTOCOL = OPDSForDistributorsImporter.NAME
 
-
-class DirectoryImportScript(TimestampScript):
-    """Import some books into a collection, based on a file containing
-    metadata and directories containing ebook and cover files.
-    """
-
-    name = "Import new titles from a directory on disk"
-
-    @classmethod
-    def arg_parser(cls, _db):
-        parser = argparse.ArgumentParser()
-        parser.add_argument(
-            "--collection-name",
-            help="Titles will be imported into a collection with this name. The collection will be created if it does not already exist.",
-            required=True,
-        )
-        parser.add_argument(
-            "--collection-type",
-            help="Collection type. Valid values are: OPEN_ACCESS (default), PROTECTED_ACCESS, LCP.",
-            type=CollectionType,
-            choices=list(CollectionType),
-            default=CollectionType.OPEN_ACCESS,
-        )
-        parser.add_argument(
-            "--data-source-name",
-            help="All data associated with this import activity will be recorded as originating with this data source. The data source will be created if it does not already exist.",
-            required=True,
-        )
-        parser.add_argument(
-            "--metadata-file",
-            help="Path to a file containing MARC or ONIX 3.0 metadata for every title in the collection",
-            required=True,
-        )
-        parser.add_argument(
-            "--metadata-format",
-            help='Format of the metadata file ("marc" or "onix")',
-            default="marc",
-        )
-        parser.add_argument(
-            "--cover-directory",
-            help="Directory containing a full-size cover image for every title in the collection.",
-        )
-        parser.add_argument(
-            "--ebook-directory",
-            help="Directory containing an EPUB or PDF file for every title in the collection.",
-            required=True,
-        )
-        RS = RightsStatus
-        rights_uris = ", ".join(RS.OPEN_ACCESS)
-        parser.add_argument(
-            "--rights-uri",
-            help="A URI explaining the rights status of the works being uploaded. Acceptable values: %s"
-            % rights_uris,
-            required=True,
-        )
-        parser.add_argument(
-            "--dry-run",
-            help="Show what would be imported, but don't actually do the import.",
-            action="store_true",
-        )
-        parser.add_argument(
-            "--default-medium-type",
-            help="Default medium type used in the case when it's not explicitly specified in a metadata file. "
-            "Valid values are: {}.".format(
-                ", ".join(EditionConstants.FULFILLABLE_MEDIA)
-            ),
-            type=str,
-            choices=EditionConstants.FULFILLABLE_MEDIA,
-        )
-
-        return parser
-
-    def do_run(self, cmd_args=None):
-        parser = self.arg_parser(self._db)
-        parsed = parser.parse_args(cmd_args)
-        collection_name = parsed.collection_name
-        collection_type = parsed.collection_type
-        data_source_name = parsed.data_source_name
-        metadata_file = parsed.metadata_file
-        metadata_format = parsed.metadata_format
-        cover_directory = parsed.cover_directory
-        ebook_directory = parsed.ebook_directory
-        rights_uri = parsed.rights_uri
-        dry_run = parsed.dry_run
-        default_medium_type = parsed.default_medium_type
-
-        return self.run_with_arguments(
-            collection_name=collection_name,
-            collection_type=collection_type,
-            data_source_name=data_source_name,
-            metadata_file=metadata_file,
-            metadata_format=metadata_format,
-            cover_directory=cover_directory,
-            ebook_directory=ebook_directory,
-            rights_uri=rights_uri,
-            dry_run=dry_run,
-            default_medium_type=default_medium_type,
-        )
-
-    def run_with_arguments(
-        self,
-        collection_name,
-        collection_type,
-        data_source_name,
-        metadata_file,
-        metadata_format,
-        cover_directory,
-        ebook_directory,
-        rights_uri,
-        dry_run,
-        default_medium_type=None,
-    ):
-        if dry_run:
-            self.log.warning(
-                "This is a dry run. No files will be uploaded and nothing will change in the database."
-            )
-
-        collection, mirrors = self.load_collection(
-            collection_name, collection_type, data_source_name
-        )
-
-        if not collection or not mirrors:
-            return
-
-        self.timestamp_collection = collection
-
-        if dry_run:
-            mirrors = None
-
-        self_hosted_collection = collection_type in (
-            CollectionType.OPEN_ACCESS,
-            CollectionType.PROTECTED_ACCESS,
-        )
-        replacement_policy = ReplacementPolicy.from_license_source(self._db)
-        replacement_policy.mirrors = mirrors
-        metadata_records = self.load_metadata(
-            metadata_file, metadata_format, data_source_name, default_medium_type
-        )
-        for metadata in metadata_records:
-            _, licensepool = self.work_from_metadata(
-                collection,
-                collection_type,
-                metadata,
-                replacement_policy,
-                cover_directory,
-                ebook_directory,
-                rights_uri,
-            )
-
-            licensepool.self_hosted = True if self_hosted_collection else False
-
-        if not dry_run:
-            self._db.commit()
-
-    def load_collection(self, collection_name, collection_type, data_source_name):
-        """Locate a Collection with the given name.
-
-        If the collection is found, it will be associated
-        with the given data source and configured with existing
-        covers and books mirror configurations.
-
-        :param collection_name: Name of the Collection.
-        :type collection_name: string
-
-        :param collection_type: Type of the collection: open access/protected access.
-        :type collection_type: CollectionType
-
-        :param data_source_name: Associate this data source with
-            the Collection if it does not already have a data source.
-            A DataSource object will be created if necessary.
-        :type data_source_name: string
-
-        :return: A 2-tuple (Collection, list of MirrorUploader instances)
-        :rtype: Tuple[Collection, List[MirrorUploader]]
-        """
-        collection, is_new = Collection.by_name_and_protocol(
-            self._db,
-            collection_name,
-            ExternalIntegration.LCP
-            if collection_type == CollectionType.LCP
-            else ExternalIntegration.MANUAL,
-        )
-
-        if is_new:
-            self.log.error(
-                "An existing collection must be used and should be set up before running this script."
-            )
-            return None, None
-
-        mirrors = dict(covers_mirror=None, books_mirror=None)
-
-        types = [
-            ExternalIntegrationLink.COVERS,
-            ExternalIntegrationLink.OPEN_ACCESS_BOOKS
-            if collection_type == CollectionType.OPEN_ACCESS
-            else ExternalIntegrationLink.PROTECTED_ACCESS_BOOKS,
-        ]
-        for type in types:
-            mirror_for_type = MirrorUploader.for_collection(collection, type)
-            if not mirror_for_type:
-                self.log.error(
-                    "An existing %s mirror integration should be assigned to the collection before running the script."
-                    % type
-                )
-                return None, None
-            mirrors[type] = mirror_for_type
-
-        data_source = DataSource.lookup(
-            self._db, data_source_name, autocreate=True, offers_licenses=True
-        )
-        settings = collection.integration_configuration.settings_dict.copy()
-        settings[Collection.DATA_SOURCE_NAME_SETTING] = data_source.name
-
-        return collection, mirrors
-
-    def load_metadata(
-        self, metadata_file, metadata_format, data_source_name, default_medium_type
-    ):
-        """Read a metadata file and convert the data into Metadata records."""
-        metadata_records = []
-
-        if metadata_format == "marc":
-            extractor = MARCExtractor()
-        elif metadata_format == "onix":
-            extractor = ONIXExtractor()
-
-        with open(metadata_file) as f:
-            metadata_records.extend(
-                extractor.parse(f, data_source_name, default_medium_type)
-            )
-        return metadata_records
-
-    def work_from_metadata(
-        self, collection, collection_type, metadata, policy, *args, **kwargs
-    ):
-        """Creates a Work instance from metadata
-
-        :param collection: Target collection
-        :type collection: Collection
-
-        :param collection_type: Collection's type: open access/protected access
-        :type collection_type: CollectionType
-
-        :param metadata: Book's metadata
-        :type metadata: Metadata
-
-        :param policy: Replacement policy
-        :type policy: ReplacementPolicy
-
-        :return: A 2-tuple of (Work object, LicensePool object)
-        :rtype: Tuple[core.model.work.Work, LicensePool]
-        """
-        self.annotate_metadata(collection_type, metadata, policy, *args, **kwargs)
-
-        if not metadata.circulation:
-            # We cannot actually provide access to the book so there
-            # is no point in proceeding with the import.
-            return
-
-        edition, new = metadata.edition(self._db)
-        metadata.apply(edition, collection, replace=policy)
-        [pool] = [x for x in edition.license_pools if x.collection == collection]
-        if new:
-            self.log.info("Created new edition for %s", edition.title)
-        else:
-            self.log.info("Updating existing edition for %s", edition.title)
-
-        work, ignore = pool.calculate_work()
-        if work:
-            work.set_presentation_ready()
-            self.log.info(f"FINALIZED {work.title}/{work.author}/{work.sort_author}")
-        return work, pool
-
-    def annotate_metadata(
-        self,
-        collection_type,
-        metadata,
-        policy,
-        cover_directory,
-        ebook_directory,
-        rights_uri,
-    ):
-        """Add a CirculationData and possibly an extra LinkData to `metadata`
-
-        :param collection_type: Collection's type: open access/protected access
-        :type collection_type: CollectionType
-
-        :param metadata: Book's metadata
-        :type metadata: Metadata
-
-        :param policy: Replacement policy
-        :type policy: ReplacementPolicy
-
-        :param cover_directory: Directory containing book covers
-        :type cover_directory: string
-
-        :param ebook_directory: Directory containing books
-        :type ebook_directory: string
-
-        :param rights_uri: URI explaining the rights status of the works being uploaded
-        :type rights_uri: string
-        """
-        identifier, ignore = metadata.primary_identifier.load(self._db)
-        data_source = metadata.data_source(self._db)
-        mirrors = policy.mirrors
-
-        circulation_data = self.load_circulation_data(
-            collection_type,
-            identifier,
-            data_source,
-            ebook_directory,
-            mirrors,
-            metadata.title,
-            rights_uri,
-        )
-        if not circulation_data:
-            # There is no point in continuing.
-            return
-
-        if metadata.circulation:
-            circulation_data.licenses_owned = metadata.circulation.licenses_owned
-            circulation_data.licenses_available = (
-                metadata.circulation.licenses_available
-            )
-            circulation_data.licenses_reserved = metadata.circulation.licenses_reserved
-            circulation_data.patrons_in_hold_queue = (
-                metadata.circulation.patrons_in_hold_queue
-            )
-            circulation_data.licenses = metadata.circulation.licenses
-
-        metadata.circulation = circulation_data
-
-        # If a cover image is available, add it to the Metadata
-        # as a link.
-        cover_link = None
-        if cover_directory:
-            cover_link = self.load_cover_link(
-                identifier, data_source, cover_directory, mirrors
-            )
-        if cover_link:
-            metadata.links.append(cover_link)
-        else:
-            logging.info(
-                "Proceeding with import even though %r has no cover.", identifier
-            )
-
-    def load_circulation_data(
-        self,
-        collection_type,
-        identifier,
-        data_source,
-        ebook_directory,
-        mirrors,
-        title,
-        rights_uri,
-    ):
-        """Loads an actual copy of a book from disk
-
-        :param collection_type: Collection's type: open access/protected access
-        :type collection_type: CollectionType
-
-        :param identifier: Book's identifier
-        :type identifier: core.model.identifier.Identifier
-
-        :param data_source: DataSource object
-        :type data_source: DataSource
-
-        :param ebook_directory: Directory containing books
-        :type ebook_directory: string
-
-        :param mirrors: Dictionary containing mirrors for books and their covers
-        :type mirrors: Dict[string, MirrorUploader]
-
-        :param title: Book's title
-        :type title: string
-
-        :param rights_uri: URI explaining the rights status of the works being uploaded
-        :type rights_uri: string
-
-        :return: A CirculationData that contains the book as an open-access
-            download, or None if no such book can be found
-        :rtype: CirculationData
-        """
-        ignore, book_media_type, book_content = self._locate_file(
-            identifier.identifier,
-            ebook_directory,
-            Representation.COMMON_EBOOK_EXTENSIONS,
-            "ebook file",
-        )
-        if not book_content:
-            # We couldn't find an actual copy of the book, so there is
-            # no point in proceeding.
-            return
-
-        book_mirror = (
-            mirrors[
-                ExternalIntegrationLink.OPEN_ACCESS_BOOKS
-                if collection_type == CollectionType.OPEN_ACCESS
-                else ExternalIntegrationLink.PROTECTED_ACCESS_BOOKS
-            ]
-            if mirrors
-            else None
-        )
-
-        # Use the S3 storage for books.
-        if book_mirror:
-            book_url = book_mirror.book_url(
-                identifier,
-                "." + Representation.FILE_EXTENSIONS[book_media_type],
-                open_access=collection_type == CollectionType.OPEN_ACCESS,
-                data_source=data_source,
-                title=title,
-            )
-        else:
-            # This is a dry run and we won't be mirroring anything.
-            book_url = (
-                identifier.identifier
-                + "."
-                + Representation.FILE_EXTENSIONS[book_media_type]
-            )
-
-        book_link_rel = (
-            Hyperlink.OPEN_ACCESS_DOWNLOAD
-            if collection_type == CollectionType.OPEN_ACCESS
-            else Hyperlink.GENERIC_OPDS_ACQUISITION
-        )
-        book_link = LinkData(
-            rel=book_link_rel,
-            href=book_url,
-            media_type=book_media_type,
-            content=book_content,
-        )
-        formats = [
-            FormatData(
-                content_type=book_media_type,
-                drm_scheme=DeliveryMechanism.LCP_DRM
-                if collection_type == CollectionType.LCP
-                else DeliveryMechanism.NO_DRM,
-                link=book_link,
-            )
-        ]
-        circulation_data = CirculationData(
-            data_source=data_source.name,
-            primary_identifier=identifier,
-            links=[book_link],
-            formats=formats,
-            default_rights_uri=rights_uri,
-        )
-        return circulation_data
-
-    def load_cover_link(self, identifier, data_source, cover_directory, mirrors):
-        """Load an actual book cover from disk.
-
-        :return: A LinkData containing a cover of the book, or None
-            if no book cover can be found.
-        """
-        cover_filename, cover_media_type, cover_content = self._locate_file(
-            identifier.identifier,
-            cover_directory,
-            Representation.COMMON_IMAGE_EXTENSIONS,
-            "cover image",
-        )
-
-        if not cover_content:
-            return None
-        cover_filename = (
-            identifier.identifier
-            + "."
-            + Representation.FILE_EXTENSIONS[cover_media_type]
-        )
-
-        # Use an S3 storage mirror specifically for covers.
-        if mirrors and mirrors[ExternalIntegrationLink.COVERS]:
-            cover_url = mirrors[ExternalIntegrationLink.COVERS].cover_image_url(
-                data_source, identifier, cover_filename
-            )
-        else:
-            # This is a dry run and we won't be mirroring anything.
-            cover_url = cover_filename
-
-        cover_link = LinkData(
-            rel=Hyperlink.IMAGE,
-            href=cover_url,
-            media_type=cover_media_type,
-            content=cover_content,
-        )
-        return cover_link
-
-    @classmethod
-    def _locate_file(
-        cls,
-        base_filename,
-        directory,
-        extensions,
-        file_type="file",
-        mock_filesystem_operations=None,
-    ):
-        """Find an acceptable file in the given directory.
-
-        :param base_filename: A string to be used as the base of the filename.
-
-        :param directory: Look for a file in this directory.
-
-        :param extensions: Any of these extensions for the file is
-            acceptable.
-
-        :param file_type: Human-readable description of the type of
-            file we're looking for. This is used only in a log warning if
-            no file can be found.
-
-        :param mock_filesystem_operations: A test may pass in a
-            2-tuple of functions to replace os.path.exists and the 'open'
-            function.
-
-        :return: A 3-tuple. (None, None, None) if no file can be
-            found; otherwise (filename, media_type, contents).
-        """
-        if mock_filesystem_operations:
-            exists_f, open_f = mock_filesystem_operations
-        else:
-            exists_f = os.path.exists
-            open_f = open
-
-        success_path = None
-        media_type = None
-        attempts = []
-        for extension in extensions:
-            for ext in (extension, extension.upper()):
-                if not ext.startswith("."):
-                    ext = "." + ext
-                filename = base_filename + ext
-                path = os.path.join(directory, filename)
-                attempts.append(path)
-                if exists_f(path):
-                    media_type = Representation.MEDIA_TYPE_FOR_EXTENSION.get(
-                        ext.lower()
-                    )
-                    content = None
-                    with open_f(path, "rb") as fh:
-                        content = fh.read()
-                    return filename, media_type, content
-
-        # If we went through that whole loop without returning,
-        # we have failed.
-        logging.warning(
-            "Could not find %s for %s. Looked in: %s",
-            file_type,
-            base_filename,
-            ", ".join(attempts),
-        )
-        return None, None, None
-
-
 class LaneResetScript(LibraryInputScript):
     """Reset a library's lanes based on language configuration or estimates
     of the library's current collection."""
diff --git a/tests/api/conftest.py b/tests/api/conftest.py
index fc1309236c..20d39e22d0 100644
--- a/tests/api/conftest.py
+++ b/tests/api/conftest.py
@@ -31,7 +31,6 @@
     "tests.fixtures.files",
     "tests.fixtures.flask",
     "tests.fixtures.library",
-    "tests.fixtures.marc_files",
    "tests.fixtures.odl",
     "tests.fixtures.opds2_files",
     "tests.fixtures.opds_files",
diff --git a/tests/api/lcp/test_collection.py b/tests/api/lcp/test_collection.py
index 8ccd26977f..2aea735dc2 100644
--- a/tests/api/lcp/test_collection.py
+++ b/tests/api/lcp/test_collection.py
@@ -293,7 +293,7 @@ def test_patron_activity_returns_correct_result(self, lcp_api_fixture):
 
         # 2. Loan from a different collection
         other_collection = lcp_api_fixture.db.collection(
-            protocol=ExternalIntegration.MANUAL
+            protocol=ExternalIntegration.LCP
         )
         other_external_identifier = "2"
         other_license_pool = lcp_api_fixture.db.licensepool(
diff --git a/tests/api/test_onix.py b/tests/api/test_onix.py
deleted file mode 100644
index 9e80ed80b0..0000000000
--- a/tests/api/test_onix.py
+++ /dev/null
@@ -1,92 +0,0 @@
-from io import BytesIO
-
-import pytest
-
-from api.onix import ONIXExtractor
-from core.classifier import Classifier
-from core.metadata_layer import CirculationData
-from core.model import Classification, Edition, Identifier, LicensePool
-from core.util.datetime_helpers import datetime_utc
-
-from ..fixtures.api_onix_files import ONIXFilesFixture
-
-
-class TestONIXExtractor:
-    def test_parser(self, api_onix_files_fixture: ONIXFilesFixture):
-        """Parse an ONIX file into Metadata objects."""
-
-        file = api_onix_files_fixture.sample_data("onix_example.xml")
-        metadata_records = ONIXExtractor().parse(BytesIO(file), "MIT Press")
-
-        assert 2 == len(metadata_records)
-
-        record = metadata_records[0]
-        assert "Safe Spaces, Brave Spaces" == record.title
-        assert "Diversity and Free Expression in Education" == record.subtitle
-        assert "Palfrey, John" == record.contributors[0].sort_name
-        assert "John Palfrey" == record.contributors[0].display_name
-        assert "Palfrey" == record.contributors[0].family_name
-        assert "Head of School at Phillips Academy" in record.contributors[0].biography
-        assert "The MIT Press" == record.publisher
-        assert None == record.imprint
-        assert "9780262343664" == record.primary_identifier.identifier
-        assert Identifier.ISBN == record.primary_identifier.type
-        assert "eng" == record.language
-        assert datetime_utc(2017, 10, 6) == record.issued
-        subjects = record.subjects
-        assert 7 == len(subjects)
-        assert "EDU015000" == subjects[0].identifier
-        assert Classifier.AUDIENCE_ADULT == subjects[-1].identifier
-        assert Classifier.BISAC == subjects[0].type
-        assert Classification.TRUSTED_DISTRIBUTOR_WEIGHT == subjects[0].weight
-        assert Edition.BOOK_MEDIUM == record.medium
-        assert 2017 == record.issued.year
-
-        assert 1 == len(record.links)
-        assert (
-            "the essential democratic values of diversity and free expression"
-            in record.links[0].content
-        )
-
-        record = metadata_records[1]
-        assert Edition.AUDIO_MEDIUM == record.medium
-        assert "The Test Corporation" == record.contributors[0].display_name
-        assert "Test Corporation, The" == record.contributors[0].sort_name
-
-    @pytest.mark.parametrize(
-        "name,file_name,licenses_number",
-        [
-            ("limited_usage_status", "onix_3_usage_constraints_example.xml", 20),
-            (
-                "unlimited_usage_status",
-                "onix_3_usage_constraints_with_unlimited_usage_status.xml",
-                LicensePool.UNLIMITED_ACCESS,
-            ),
-            (
-                "wrong_usage_unit",
-                "onix_3_usage_constraints_example_with_day_usage_unit.xml",
-                LicensePool.UNLIMITED_ACCESS,
-            ),
-        ],
-    )
-    def test_parse_parses_correctly_onix_3_usage_constraints(
-        self, name, file_name, licenses_number, api_onix_files_fixture: ONIXFilesFixture
-    ):
-        # Arrange
-        file = api_onix_files_fixture.sample_data(file_name)
-
-        # Act
-        metadata_records = ONIXExtractor().parse(
-            BytesIO(file), "ONIX 3 Usage Constraints Example"
-        )
-
-        # Assert
-        assert len(metadata_records) == 1
-
-        [metadata_record] = metadata_records
-
-        assert (metadata_record.circulation is not None) == True
-        assert isinstance(metadata_record.circulation, CirculationData) == True
-        assert metadata_record.circulation.licenses_owned == licenses_number
-        assert metadata_record.circulation.licenses_available == licenses_number
diff --git a/tests/api/test_scripts.py b/tests/api/test_scripts.py
index 80a615dede..db2e62a92c 100644
--- a/tests/api/test_scripts.py
+++ b/tests/api/test_scripts.py
@@ -1,6 +1,5 @@
 from __future__ import annotations
 
-import contextlib
 import datetime
 import logging
 from io import StringIO
@@ -24,29 +23,18 @@
 )
 from core.lane import Facets, FeaturedFacets, Pagination, WorkList
 from core.marc import MARCExporter
-from core.metadata_layer import IdentifierData, Metadata, ReplacementPolicy
-from core.mirror import MirrorUploader
 from core.model import (
     LOCK_ID_DB_INIT,
     CachedMARCFile,
     ConfigurationSetting,
     Credential,
     DataSource,
-    DeliveryMechanism,
-    EditionConstants,
     ExternalIntegration,
-    Hyperlink,
-    Identifier,
-    LicensePool,
-    Representation,
-    RightsStatus,
     SessionManager,
     create,
 )
 from core.model.configuration import ExternalIntegrationLink
 from core.opds import AcquisitionFeed
-from core.s3 import MockS3Uploader
-from core.scripts import CollectionType
 from core.util.datetime_helpers import datetime_utc, utc_now
 from core.util.flask_util import OPDSFeedResponse, Response
 from scripts import (
@@ -55,7 +43,6 @@
     CacheMARCFiles,
     CacheOPDSGroupFeedPerLane,
     CacheRepresentationPerLane,
-    DirectoryImportScript,
     GenerateShortTokenScript,
     InstanceInitializationScript,
     LanguageListScript,
@@ -68,7 +55,6 @@ if TYPE_CHECKING:
     from tests.fixtures.authenticator import SimpleAuthIntegrationFixture
     from tests.fixtures.database import DatabaseTransactionFixture
-    from tests.fixtures.sample_covers import SampleCoversFixture
     from tests.fixtures.search import ExternalSearchFixture
@@ -819,623 +805,6 @@ def test_languages(self, db: DatabaseTransactionFixture):
         assert ["tgl 1 (Tagalog)"] == output
 
 
-class MockDirectoryImportScript(DirectoryImportScript):
-    """Mock a filesystem to make it easier to test DirectoryImportScript."""
-
-    def __init__(self, _db, mock_filesystem={}):
-        super().__init__(_db)
-        self.mock_filesystem = mock_filesystem
-        self._locate_file_args = None
-
-    def _locate_file(self, identifier, directory, extensions, file_type):
-        self._locate_file_args = (identifier, directory, extensions, file_type)
-        return self.mock_filesystem.get(directory, (None, None, None))
-
-
-class TestDirectoryImportScript:
-    def test_do_run(self, db: DatabaseTransactionFixture):
-        # Calling do_run with command-line arguments parses the
-        # arguments and calls run_with_arguments.
-
-        class Mock(DirectoryImportScript):
-            def run_with_arguments(self, *args, **kwargs):
-                self.ran_with = kwargs
-
-        script = Mock(db.session)
-        script.do_run(
-            cmd_args=[
-                "--collection-name=coll1",
-                "--data-source-name=ds1",
-                "--metadata-file=metadata",
-                "--metadata-format=marc",
-                "--cover-directory=covers",
-                "--ebook-directory=ebooks",
-                "--rights-uri=rights",
-                "--dry-run",
-                f"--default-medium-type={EditionConstants.AUDIO_MEDIUM}",
-            ]
-        )
-        assert {
-            "collection_name": "coll1",
-            "collection_type": CollectionType.OPEN_ACCESS,
-            "data_source_name": "ds1",
-            "metadata_file": "metadata",
-            "metadata_format": "marc",
-            "cover_directory": "covers",
-            "ebook_directory": "ebooks",
-            "rights_uri": "rights",
-            "dry_run": True,
-            "default_medium_type": EditionConstants.AUDIO_MEDIUM,
-        } == script.ran_with
-
-    def test_run_with_arguments(self, db: DatabaseTransactionFixture):
-
-        metadata1 = object()
-        metadata2 = object()
-        collection = db.default_collection()
-        mirrors = object()
-        work = object()
-        licensepool = LicensePool()
-
-        class Mock(DirectoryImportScript):
-            """Mock the methods called by run_with_arguments."""
-
-            def __init__(self, _db):
-                super(DirectoryImportScript, self).__init__(_db)
-                self.load_collection_calls = []
-                self.load_metadata_calls = []
-                self.work_from_metadata_calls = []
-
-            def load_collection(self, *args):
-                self.load_collection_calls.append(args)
-                return collection, mirrors
-
-            def load_metadata(self, *args, **kwargs):
-                self.load_metadata_calls.append(args)
-                return [metadata1, metadata2]
-
-            def work_from_metadata(self, *args):
-                self.work_from_metadata_calls.append(args)
-                return work, licensepool
-
-        # First, try a dry run.
-
-        # Make a change to a model object so we can track when the
-        # session is committed.
-        db.default_collection().name = "changed"
-
-        script = Mock(db.session)
-        basic_args = [
-            "collection name",
-            CollectionType.OPEN_ACCESS,
-            "data source name",
-            "metadata file",
-            "marc",
-            "cover directory",
-            "ebook directory",
-            "rights URI",
-        ]
-        script.run_with_arguments(
-            *(basic_args + [True] + [EditionConstants.BOOK_MEDIUM])
-        )
-
-        # load_collection was called with the collection and data source names.
-        assert [
-            ("collection name", CollectionType.OPEN_ACCESS, "data source name")
-        ] == script.load_collection_calls
-
-        # load_metadata was called with the metadata file and data source name.
-        assert [
-            ("metadata file", "marc", "data source name", EditionConstants.BOOK_MEDIUM)
-        ] == script.load_metadata_calls
-
-        # work_from_metadata was called twice, once on each metadata
-        # object.
-        [
-            (coll1, t1, o1, policy1, c1, e1, r1),
-            (coll2, t2, o2, policy2, c2, e2, r2),
-        ] = script.work_from_metadata_calls
-
-        assert coll1 == db.default_collection()
-        assert coll1 == coll2
-
-        assert o1 == metadata1
-        assert o2 == metadata2
-
-        assert c1 == "cover directory"
-        assert c1 == c2
-
-        assert e1 == "ebook directory"
-        assert e1 == e2
-
-        assert "rights URI" == r1
-        assert r1 == r2
-
-        # Since this is a dry run, the ReplacementPolicy has no mirror
-        # set.
-        for policy in (policy1, policy2):
-            assert None == policy.mirrors
-            assert True == policy.links
-            assert True == policy.formats
-            assert True == policy.contributions
-            assert True == policy.rights
-
-        # Now try it not as a dry run.
-        script = Mock(db.session)
-        script.run_with_arguments(*(basic_args + [False]))
-
-        # This time, the ReplacementPolicy has a mirror set
-        # appropriately.
-        [
-            (coll1, t1, o1, policy1, c1, e1, r1),
-            (coll1, t2, o2, policy2, c2, e2, r2),
-        ] = script.work_from_metadata_calls
-        for policy in policy1, policy2:
-            assert mirrors == policy.mirrors
-
-        # timestamp_collection has been set to the Collection that will be
-        # used when a Timestamp is created for this script.
-        assert db.default_collection() == script.timestamp_collection
-
-    def test_load_collection_setting_mirrors(self, db: DatabaseTransactionFixture):
-        # Calling load_collection does not create a new collection.
-        script = DirectoryImportScript(db.session)
-        collection, mirrors = script.load_collection(
-            "New collection", CollectionType.OPEN_ACCESS, "data source name"
-        )
-        assert None == collection
-        assert None == mirrors
-
-        existing_collection = db.collection(
-            name="some collection", protocol=ExternalIntegration.MANUAL
-        )
-
-        collection, mirrors = script.load_collection(
-            "some collection", CollectionType.OPEN_ACCESS, "data source name"
-        )
-
-        # No covers or books mirrors were created beforehand for this collection
-        # so nothing is returned.
-        assert None == collection
-        assert None == mirrors
-
-        # Both mirrors need to be set up or else nothing is returned.
-        storage1 = db.external_integration(
-            ExternalIntegration.S3,
-            ExternalIntegration.STORAGE_GOAL,
-            username="name",
-            password="password",
-        )
-        external_integration_link = db.external_integration_link(
-            integration=existing_collection.external_integration,
-            other_integration=storage1,
-            purpose=ExternalIntegrationLink.COVERS,
-        )
-
-        collection, mirrors = script.load_collection(
-            "some collection", CollectionType.OPEN_ACCESS, "data source name"
-        )
-        assert None == collection
-        assert None == mirrors
-
-        # Create another storage and assign it for the books mirror
-        storage2 = db.external_integration(
-            ExternalIntegration.S3,
-            ExternalIntegration.STORAGE_GOAL,
-            username="name",
-            password="password",
-        )
-        external_integration_link = db.external_integration_link(
-            integration=existing_collection.external_integration,
-            other_integration=storage2,
-            purpose=ExternalIntegrationLink.OPEN_ACCESS_BOOKS,
-        )
-
-        collection, mirrors = script.load_collection(
-            "some collection", CollectionType.OPEN_ACCESS, "data source name"
-        )
-        assert collection == existing_collection
-        assert isinstance(mirrors[ExternalIntegrationLink.COVERS], MirrorUploader)
-        assert isinstance(
-            mirrors[ExternalIntegrationLink.OPEN_ACCESS_BOOKS], MirrorUploader
-        )
-
-    def test_work_from_metadata(
-        self, db: DatabaseTransactionFixture, sample_covers_fixture: SampleCoversFixture
-    ):
-        # Validate the ability to create a new Work from appropriate metadata.
-
-        class Mock(MockDirectoryImportScript):
-            """In this test we need to verify that annotate_metadata
-            was called but did nothing.
-            """
-
-            def annotate_metadata(self, collection_type, metadata, *args, **kwargs):
-                metadata.annotated = True
-                return super().annotate_metadata(
-                    collection_type, metadata, *args, **kwargs
-                )
-
-        identifier = IdentifierData(Identifier.GUTENBERG_ID, "1003")
-        identifier_obj, ignore = identifier.load(db.session)
-        metadata = Metadata(
-            DataSource.GUTENBERG, primary_identifier=identifier, title="A book"
-        )
-        metadata.annotated = False  # type: ignore
-        datasource = DataSource.lookup(db.session, DataSource.GUTENBERG)
-        policy = ReplacementPolicy.from_license_source(db.session)
-        mirrors = dict(books_mirror=MockS3Uploader(), covers_mirror=MockS3Uploader())
-        mirror_type_books = ExternalIntegrationLink.OPEN_ACCESS_BOOKS
-        mirror_type_covers = ExternalIntegrationLink.COVERS
-        policy.mirrors = mirrors
-
-        # Here, work_from_metadata calls annotate_metadata, but does
-        # not actually import anything because there are no files 'on
-        # disk' and thus no way to actually get the book.
-        collection = db.default_collection()
-        collection_type = CollectionType.OPEN_ACCESS
-        shared_args = (
-            collection_type,
-            metadata,
-            policy,
-            "cover directory",
-            "ebook directory",
-            RightsStatus.CC0,
-        )
-        # args = (collection, *shared_args)
-        script = Mock(db.session)
-        assert None == script.work_from_metadata(collection, *shared_args)
-        assert True == metadata.annotated  # type: ignore
-
-        # Now let's try it with some files 'on disk'.
-        with open(
-            sample_covers_fixture.sample_cover_path("test-book-cover.png"), "rb"
-        ) as fh:
-            image = fh.read()
-        mock_filesystem = {
-            "cover directory": ("cover.jpg", Representation.JPEG_MEDIA_TYPE, image),
-            "ebook directory": (
-                "book.epub",
-                Representation.EPUB_MEDIA_TYPE,
-                "I'm an EPUB.",
-            ),
-        }
-        script = MockDirectoryImportScript(db.session, mock_filesystem=mock_filesystem)  # type: ignore
-        work, licensepool_for_work = script.work_from_metadata(collection, *shared_args)
-
-        # Get the edition that was created for this book. It should have
-        # already been created by `script.work_from_metadata`.
-        edition, is_new_edition = metadata.edition(db.session)
-        assert False == is_new_edition
-
-        # We have created a book. It has a cover image, which has a
-        # thumbnail.
-        assert "A book" == work.title
-        assert (
-            work.cover_full_url
-            == "https://test-cover-bucket.s3.amazonaws.com/Gutenberg/Gutenberg%20ID/1003/1003.jpg"
-        )
-        assert (
-            work.cover_thumbnail_url
-            == "https://test-cover-bucket.s3.amazonaws.com/scaled/300/Gutenberg/Gutenberg%20ID/1003/1003.png"
-        )
-        assert 1 == len(work.license_pools)
-        assert 1 == len(edition.license_pools)
-        assert 1 == len(
-            [lp for lp in edition.license_pools if lp.collection == collection]
-        )
-        [pool] = work.license_pools
-        assert licensepool_for_work == pool
-        assert (
-            pool.open_access_download_url
-            == "https://test-content-bucket.s3.amazonaws.com/Gutenberg/Gutenberg%20ID/1003/A%20book.epub"
-        )
-        assert RightsStatus.CC0 == pool.delivery_mechanisms[0].rights_status.uri
-
-        # The two mock S3Uploaders have records of 'uploading' all these files
-        # to S3. The "books" mirror has the epubs and the "covers" mirror
-        # contains all the images.
-        [epub] = mirrors[mirror_type_books].uploaded
-        [full, thumbnail] = mirrors[mirror_type_covers].uploaded
-        assert epub.url == pool.open_access_download_url
-        assert full.url == work.cover_full_url
-        assert thumbnail.url == work.cover_thumbnail_url
-
-        # The EPUB Representation was cleared out after the upload, to
-        # save database space.
-        assert b"I'm an EPUB." == mirrors[mirror_type_books].content[0]
-        assert None == epub.content
-
-        # Now attempt to get a work for a different collection, but with
-        # the same metadata.
-        # Even though there will be two license pools associated with the
-        # work's presentation edition, the call should be successful.
-        collection2 = db.collection("second collection")
-        work2, licensepool_for_work2 = script.work_from_metadata(
-            collection2, *shared_args
-        )
-
-        # The presentation edition should be the same for both works.
-        edition2 = work2.presentation_edition
-        assert edition == edition2
-
-        # The licensepool from which the work is calculated should be
-        # associated with collection2.
-        assert licensepool_for_work2.collection == collection2
-
-        # The work and its presentation edition should both have two licensepools,
-        # one for each collection.
-        assert 2 == len(work2.license_pools)
-        assert 2 == len(edition2.license_pools)
-        assert 1 == len(
-            [lp for lp in edition2.license_pools if lp.collection == collection2]
-        )
-
-    def test_annotate_metadata(self, db: DatabaseTransactionFixture):
-        """Verify that annotate_metadata calls load_circulation_data
-        and load_cover_link appropriately.
-        """
-
-        # First, test an unsuccessful annotation.
-        class MockNoCirculationData(DirectoryImportScript):
-            """Do nothing when load_circulation_data is called. Explode if
-            load_cover_link is called.
-            """
-
-            def load_circulation_data(self, *args):
-                self.load_circulation_data_args = args
-                return None
-
-            def load_cover_link(self, *args):
-                raise Exception("Explode!")
-
-        collection_type = CollectionType.OPEN_ACCESS
-        gutenberg = DataSource.lookup(db.session, DataSource.GUTENBERG)
-        identifier = IdentifierData(Identifier.GUTENBERG_ID, "11111")
-        identifier_obj, ignore = identifier.load(db.session)
-        metadata = Metadata(
-            title=db.fresh_str(), data_source=gutenberg, primary_identifier=identifier
-        )
-        mirrors = object()
-        policy = ReplacementPolicy(mirrors=mirrors)
-        cover_directory = object()
-        ebook_directory = object()
-        rights_uri = object()
-
-        script = MockNoCirculationData(db.session)
-        args = (
-            collection_type,
-            metadata,
-            policy,
-            cover_directory,
-            ebook_directory,
-            rights_uri,
-        )
-        script.annotate_metadata(*args)
-
-        # load_circulation_data was called.
-        assert (
-            collection_type,
-            identifier_obj,
-            gutenberg,
-            ebook_directory,
-            mirrors,
-            metadata.title,
-            rights_uri,
-        ) == script.load_circulation_data_args
-
-        # But because load_circulation_data returned None,
-        # metadata.circulation_data was not modified and
-        # load_cover_link was not called (which would have raised an
-        # exception).
-        assert None == metadata.circulation
-
-        # Test a successful annotation with no cover image.
-        class MockNoCoverLink(DirectoryImportScript):
-            """Return an object when load_circulation_data is called.
-            Do nothing when load_cover_link is called.
-            """
-
-            def load_circulation_data(self, *args):
-                return "Some circulation data"
-
-            def load_cover_link(self, *args):
-                self.load_cover_link_args = args
-                return None
-
-        script = MockNoCoverLink(db.session)  # type: ignore
-        script.annotate_metadata(*args)
-
-        # The Metadata object was annotated with the return value of
-        # load_circulation_data.
-        assert "Some circulation data" == metadata.circulation
-
-        # load_cover_link was called.
-        assert (
-            identifier_obj,
-            gutenberg,
-            cover_directory,
-            mirrors,
-        ) == script.load_cover_link_args  # type: ignore
-
-        # But since it provided no cover link, metadata.links was empty.
-        assert [] == metadata.links
-
-        # Finally, test a completely successful annotation.
-        class MockWithCoverLink(DirectoryImportScript):
-            """Mock success for both load_circulation_data
-            and load_cover_link.
-            """
-
-            def load_circulation_data(self, *args):
-                return "Some circulation data"
-
-            def load_cover_link(self, *args):
-                return "A cover link"
-
-        metadata.circulation = None
-        script = MockWithCoverLink(db.session)  # type: ignore
-        script.annotate_metadata(*args)
-
-        assert "Some circulation data" == metadata.circulation
-        assert ["A cover link"] == metadata.links
-
-    def test_load_circulation_data(self, db: DatabaseTransactionFixture):
-        # Create a directory import script with an empty mock filesystem.
-        script = MockDirectoryImportScript(db.session, {})
-
-        identifier = db.identifier(Identifier.GUTENBERG_ID, "2345")
-        gutenberg = DataSource.lookup(db.session, DataSource.GUTENBERG)
-        mirrors = dict(books_mirror=MockS3Uploader(), covers_mirror=None)
-        args = (
-            CollectionType.OPEN_ACCESS,
-            identifier,
-            gutenberg,
-            "ebooks",
-            mirrors,
-            "Name of book",
-            "rights URI",
-        )
-
-        # There is nothing on the mock filesystem, so in this case
-        # load_circulation_data returns None.
-        assert None == script.load_circulation_data(*args)
-
-        # But we tried.
-        assert (
-            "2345",
-            "ebooks",
-            Representation.COMMON_EBOOK_EXTENSIONS,
-            "ebook file",
-        ) == script._locate_file_args
-
-        # Try another script that has a populated mock filesystem.
-        mock_filesystem = {
-            "ebooks": ("book.epub", Representation.EPUB_MEDIA_TYPE, "I'm an EPUB.")
-        }
-        script = MockDirectoryImportScript(db.session, mock_filesystem)
-
-        # Now _locate_file finds something on the mock filesystem, and
-        # load_circulation_data loads it into a fully populated
-        # CirculationData object.
-        circulation = script.load_circulation_data(*args)
-        assert identifier == circulation.primary_identifier(db.session)
-        assert gutenberg == circulation.data_source(db.session)
-        assert "rights URI" == circulation.default_rights_uri
-
-        # The CirculationData has an open-access link associated with it.
-        [link] = circulation.links
-        assert Hyperlink.OPEN_ACCESS_DOWNLOAD == link.rel
-        assert (
-            link.href
-            == "https://test-content-bucket.s3.amazonaws.com/Gutenberg/Gutenberg%20ID/2345/Name%20of%20book.epub"
-        )
-        assert Representation.EPUB_MEDIA_TYPE == link.media_type
-        assert "I'm an EPUB." == link.content
-
-        # This open-access link will be made available through a
-        # delivery mechanism described by this FormatData.
-        [format] = circulation.formats
-        assert link == format.link
-        assert link.media_type == format.content_type
-        assert DeliveryMechanism.NO_DRM == format.drm_scheme
-
-    def test_load_cover_link(self, db: DatabaseTransactionFixture):
-        # Create a directory import script with an empty mock filesystem.
-        script = MockDirectoryImportScript(db.session, {})
-
-        identifier = db.identifier(Identifier.GUTENBERG_ID, "2345")
-        gutenberg = DataSource.lookup(db.session, DataSource.GUTENBERG)
-        mirrors = dict(covers_mirror=MockS3Uploader(), books_mirror=None)
-        args = (identifier, gutenberg, "covers", mirrors)
-
-        # There is nothing on the mock filesystem, so in this case
-        # load_cover_link returns None.
-        assert None == script.load_cover_link(*args)
-
-        # But we tried.
-        assert (
-            "2345",
-            "covers",
-            Representation.COMMON_IMAGE_EXTENSIONS,
-            "cover image",
-        ) == script._locate_file_args
-
-        # Try another script that has a populated mock filesystem.
-        mock_filesystem = {
-            "covers": ("acover.jpeg", Representation.JPEG_MEDIA_TYPE, "I'm an image.")
-        }
-        script = MockDirectoryImportScript(db.session, mock_filesystem)
-        link = script.load_cover_link(*args)
-        assert Hyperlink.IMAGE == link.rel
-        assert (
-            link.href
-            == "https://test-cover-bucket.s3.amazonaws.com/Gutenberg/Gutenberg%20ID/2345/2345.jpg"
-        )
-        assert Representation.JPEG_MEDIA_TYPE == link.media_type
-        assert "I'm an image." == link.content
-
-    def test_locate_file(self):
-        """Test the ability of DirectoryImportScript._locate_file
-        to find files on a mock filesystem.
-        """
-        # Create a mock filesystem with a single file.
-        mock_filesystem = {"directory/thefile.JPEG": "The contents"}
-
-        def mock_exists(path):
-            return path in mock_filesystem
-
-        @contextlib.contextmanager
-        def mock_open(path, mode="r"):
-            yield StringIO(mock_filesystem[path])
-
-        mock_filesystem_operations = mock_exists, mock_open
-
-        def assert_not_found(base_filename, directory, extensions):
-            """Verify that the given set of arguments to
-            _locate_file() does not find anything.
-            """
-            result = DirectoryImportScript._locate_file(
-                base_filename,
-                directory,
-                extensions,
-                file_type="some file",
-                mock_filesystem_operations=mock_filesystem_operations,
-            )
-            assert (None, None, None) == result
-
-        def assert_found(base_filename, directory, extensions):
-            """Verify that the given set of arguments to _locate_file()
-            finds and loads the single file on the mock filesystem.
-            """
-            result = DirectoryImportScript._locate_file(
-                base_filename,
-                directory,
-                extensions,
-                file_type="some file",
-                mock_filesystem_operations=mock_filesystem_operations,
-            )
-            assert (
-                "thefile.JPEG",
-                Representation.JPEG_MEDIA_TYPE,
-                "The contents",
-            ) == result
-
-        # As long as the file and directory match we have some flexibility
-        # regarding the extensions we look for.
-        assert_found("thefile", "directory", [".jpeg"])
-        assert_found("thefile", "directory", [".JPEG"])
-        assert_found("thefile", "directory", ["jpeg"])
-        assert_found("thefile", "directory", ["JPEG"])
-        assert_found("thefile", "directory", [".another-extension", ".jpeg"])
-
-        # But file, directory, and (flexible) extension must all match.
- assert_not_found("anotherfile", "directory", [".jpeg"]) - assert_not_found("thefile", "another_directory", [".jpeg"]) - assert_not_found("thefile", "directory", [".another-extension"]) - assert_not_found("thefile", "directory", []) - - class TestNovelistSnapshotScript: def mockNoveListAPI(self, *args, **kwargs): self.called_with = (args, kwargs) diff --git a/tests/core/conftest.py b/tests/core/conftest.py index 8c3ecb2987..4f494afddc 100644 --- a/tests/core/conftest.py +++ b/tests/core/conftest.py @@ -3,7 +3,6 @@ "tests.fixtures.csv_files", "tests.fixtures.database", "tests.fixtures.library", - "tests.fixtures.marc_files", "tests.fixtures.opds2_files", "tests.fixtures.opds_files", "tests.fixtures.overdrive", diff --git a/tests/core/models/test_configuration.py b/tests/core/models/test_configuration.py index 552a766221..0f90faaa2a 100644 --- a/tests/core/models/test_configuration.py +++ b/tests/core/models/test_configuration.py @@ -755,7 +755,7 @@ def test_delete( db = example_externalintegration_fixture.database_fixture integration1 = db.external_integration( - ExternalIntegration.MANUAL, + ExternalIntegration.LCP, ExternalIntegration.LICENSE_GOAL, libraries=[db.default_library()], ) diff --git a/tests/core/test_metadata.py b/tests/core/test_metadata.py index a5fe92375b..0c3cd1b3d0 100644 --- a/tests/core/test_metadata.py +++ b/tests/core/test_metadata.py @@ -6,14 +6,13 @@ import pytest from core.analytics import Analytics -from core.classifier import NO_NUMBER, NO_VALUE, Classifier +from core.classifier import NO_NUMBER, NO_VALUE from core.metadata_layer import ( CirculationData, ContributorData, CSVMetadataImporter, IdentifierData, LinkData, - MARCExtractor, MeasurementData, Metadata, ReplacementPolicy, @@ -37,11 +36,10 @@ ) from core.model.configuration import ExternalIntegrationLink from core.s3 import MockS3Uploader -from core.util.datetime_helpers import datetime_utc, strptime_utc, utc_now +from core.util.datetime_helpers import datetime_utc, utc_now from tests.core.mock import DummyHTTPClient, LogCaptureHandler from tests.fixtures.csv_files import CSVFilesFixture from tests.fixtures.database import DatabaseTransactionFixture -from tests.fixtures.marc_files import MARCFilesFixture from tests.fixtures.sample_covers import SampleCoversFixture @@ -1936,48 +1934,3 @@ def test_success(self, db: DatabaseTransactionFixture): # with the identifier of the audiobook equivalent_identifiers = [x.output for x in identifier.equivalencies] assert [book.primary_identifier] == equivalent_identifiers - - -class TestMARCExtractor: - def test_parse_year(self): - m = MARCExtractor.parse_year - nineteen_hundred = strptime_utc("1900", "%Y") - assert nineteen_hundred == m("1900") - assert nineteen_hundred == m("1900.") - assert None == m("not a year") - - def test_parser(self, marc_files_fixture: MARCFilesFixture): - """Parse a MARC file into Metadata objects.""" - - file = marc_files_fixture.sample_data("ils_plympton_01.mrc") - metadata_records = MARCExtractor.parse(file, "Plympton") - - assert 36 == len(metadata_records) - - record = metadata_records[1] - assert "Strange Case of Dr Jekyll and Mr Hyde" == record.title - assert "Stevenson, Robert Louis" == record.contributors[0].sort_name - assert "Recovering the Classics" in record.publisher - assert "9781682280041" == record.primary_identifier.identifier - assert Identifier.ISBN == record.primary_identifier.type - subjects = record.subjects - assert 2 == len(subjects) - for s in subjects: - assert Classifier.FAST == s.type - assert "Canon" in 
subjects[0].identifier - assert Edition.BOOK_MEDIUM == record.medium - assert 2015 == record.issued.year - assert "eng" == record.language - - assert 1 == len(record.links) - assert ( - "Utterson and Enfield are worried about their friend" - in record.links[0].content - ) - - def test_name_cleanup(self): - """Test basic name cleanup techniques.""" - m = MARCExtractor.name_cleanup - assert "Dante Alighieri" == m("Dante Alighieri, 1265-1321, author.") - assert "Stevenson, Robert Louis" == m("Stevenson, Robert Louis.") - assert "Wells, H.G." == m("Wells, H.G.") diff --git a/tests/fixtures/marc_files.py b/tests/fixtures/marc_files.py deleted file mode 100644 index 3259a467e2..0000000000 --- a/tests/fixtures/marc_files.py +++ /dev/null @@ -1,16 +0,0 @@ -import pytest - -from tests.fixtures.files import FilesFixture - - -class MARCFilesFixture(FilesFixture): - """A fixture providing access to MARC files.""" - - def __init__(self): - super().__init__("marc") - - -@pytest.fixture() -def marc_files_fixture() -> MARCFilesFixture: - """A fixture providing access to MARC files.""" - return MARCFilesFixture()
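
Note: the deleted test_locate_file above relies on a filesystem-injection pattern that is easy to miss when reading the diff. The self-contained sketch below illustrates that same idea under assumed names: locate_file, fake_filesystem, and fake_open are hypothetical stand-ins for this illustration only, not the real DirectoryImportScript._locate_file, which also takes a file_type label and returns a (filename, media type, content) triple.

import contextlib
from io import StringIO


def locate_file(base_filename, directory, extensions, exists, opener):
    # Try base_filename plus each allowed extension, tolerating a missing
    # leading dot and either letter case, as the deleted assertions expect.
    for extension in extensions:
        if not extension.startswith("."):
            extension = "." + extension
        for candidate in (extension.lower(), extension.upper()):
            path = f"{directory}/{base_filename}{candidate}"
            if exists(path):
                with opener(path) as fh:
                    return path, fh.read()
    return None, None


# An in-memory stand-in for the real filesystem, mirroring the tests above.
fake_filesystem = {"directory/thefile.JPEG": "The contents"}


@contextlib.contextmanager
def fake_open(path, mode="r"):
    yield StringIO(fake_filesystem[path])


# Found: the extension matches case-insensitively, with or without a dot.
assert locate_file(
    "thefile", "directory", ["jpeg"], fake_filesystem.__contains__, fake_open
) == ("directory/thefile.JPEG", "The contents")
# Not found: file, directory, and extension must all match.
assert locate_file(
    "thefile", "directory", [".png"], fake_filesystem.__contains__, fake_open
) == (None, None)

Because the existence check and the opener are passed in as plain callables, the tests never touch the real disk; a dict and a StringIO are enough to exercise every branch.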