diff --git a/alembic/versions/20241127_c3458e1ef9aa_remove_unsafe_characters_summary.py b/alembic/versions/20241127_c3458e1ef9aa_remove_unsafe_characters_summary.py
new file mode 100644
index 000000000..05101d74f
--- /dev/null
+++ b/alembic/versions/20241127_c3458e1ef9aa_remove_unsafe_characters_summary.py
@@ -0,0 +1,32 @@
+"""Remove unsafe characters summary
+
+Revision ID: c3458e1ef9aa
+Revises: 272da5f400de
+Create Date: 2024-11-27 20:32:41.431147+00:00
+
+"""
+
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "c3458e1ef9aa"
+down_revision = "272da5f400de"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    # Remove any characters that are not XML safe from the summary_text field. The code has been
+    # updated to filter out these characters, but this cleans up any existing data.
+    # https://www.postgresql.org/docs/current/functions-matching.html#FUNCTIONS-POSIX-REGEXP
+    op.execute(
+        "UPDATE works SET summary_text = regexp_replace("
+        " summary_text, '[^\u0020-\uD7FF\u0009\u000A\u000D\uE000-\uFFFD\U00010000-\U0010FFFF]+', '', 'g'"
+        ") WHERE "
+        "summary_text ~ '[^\u0020-\uD7FF\u0009\u000A\u000D\uE000-\uFFFD\U00010000-\U0010FFFF]'"
+    )
+
+
+def downgrade() -> None:
+    # No need to do anything on downgrade.
+    pass
diff --git a/src/palace/manager/sqlalchemy/model/work.py b/src/palace/manager/sqlalchemy/model/work.py
index 9c0cc4263..b3e0fdbc9 100644
--- a/src/palace/manager/sqlalchemy/model/work.py
+++ b/src/palace/manager/sqlalchemy/model/work.py
@@ -2,11 +2,13 @@
 
 from __future__ import annotations
 
+import re
 import sys
 from collections import Counter
 from collections.abc import Sequence
 from datetime import date, datetime
 from decimal import Decimal
+from functools import cache
 from typing import TYPE_CHECKING, Any, cast
 
 import opensearchpy
@@ -609,11 +611,29 @@ def merge_into(self, other_work):
 
         other_work.calculate_presentation()
 
-    def set_summary(self, resource):
+    @classmethod
+    @cache
+    def _xml_text_sanitization_regex(cls) -> re.Pattern[str]:
+        """Return the compiled pattern matching runs of characters outside the kept ranges.
+
+        The compiled pattern is memoized with functools.cache, so it is built once per class.
+        """
+        # Source: https://stackoverflow.com/questions/8733233/filtering-out-certain-bytes-in-python
+        return re.compile(
+            r"[^\u0020-\uD7FF\u0009\u000A\u000D\uE000-\uFFFD\U00010000-\U0010FFFF]+"
+        )
+
+    def set_summary(self, resource: Resource | None) -> None:
+        """Set the summary resource and store its text with XML-incompatible characters removed.
+
+        :param resource: The summary resource; None (or no representation) gives an empty summary text.
+        """
         self.summary = resource
-        # TODO: clean up the content
         if resource and resource.representation:
-            self.summary_text = resource.representation.unicode_content
+            # Make sure that the summary text only contains characters that are XML compatible.
+            self.summary_text = self._xml_text_sanitization_regex().sub(
+                "", resource.representation.unicode_content
+            )
         else:
             self.summary_text = ""
         WorkCoverageRecord.add_for(self, operation=WorkCoverageRecord.SUMMARY_OPERATION)
diff --git a/tests/manager/sqlalchemy/model/test_edition.py b/tests/manager/sqlalchemy/model/test_edition.py
index a2d7cd4ec..0674b60a5 100644
--- a/tests/manager/sqlalchemy/model/test_edition.py
+++ b/tests/manager/sqlalchemy/model/test_edition.py
@@ -403,20 +403,32 @@ def test_set_summary(self, db: DatabaseTransactionFixture):
         work = db.work(presentation_edition=e)
         overdrive = DataSource.lookup(db.session, DataSource.OVERDRIVE)
 
-        # Set the work's summmary.
+        # Set the work's summary.
         l1, new = pool.add_link(
             Hyperlink.DESCRIPTION, None, overdrive, "text/plain", "F"
         )
         work.set_summary(l1.resource)
 
-        assert l1.resource == work.summary
-        assert "F" == work.summary_text
+        assert work.summary == l1.resource
+        assert work.summary_text == "F"
+
+        # Set the work's summary to a string that contains characters that cannot be
+        # represented in XML.
+        l2, new = pool.add_link(
+            Hyperlink.DESCRIPTION,
+            None,
+            overdrive,
+            "text/plain",
+            "\u0000💣ü🔥\u0001\u000C",
+        )
+        work.set_summary(l2.resource)
+        assert work.summary_text == "💣ü🔥"
 
         # Remove the summary.
         work.set_summary(None)
-        assert None == work.summary
-        assert "" == work.summary_text
+        assert work.summary is None
+        assert work.summary_text == ""
 
     def test_calculate_evaluate_summary_quality_with_privileged_data_sources(
         self, db: DatabaseTransactionFixture