From 563b82252de38353829ed3fdd05c3cbab526982d Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Wed, 15 Nov 2023 20:41:05 +0100 Subject: [PATCH 01/17] allows to store and filter by payload extras --- pgpubsub/listen.py | 4 + pgpubsub/models.py | 13 +- pgpubsub/tests/conftest.py | 14 +++ .../tests/migrations/0012_payload_extras.py | 55 +++++++++ pgpubsub/tests/test_core.py | 13 -- pgpubsub/tests/test_payload_extras.py | 114 ++++++++++++++++++ pgpubsub/triggers.py | 13 +- pyproject.toml | 1 + settings.py | 3 + 9 files changed, 214 insertions(+), 16 deletions(-) create mode 100644 pgpubsub/tests/conftest.py create mode 100644 pgpubsub/tests/migrations/0012_payload_extras.py create mode 100644 pgpubsub/tests/test_payload_extras.py diff --git a/pgpubsub/listen.py b/pgpubsub/listen.py index 1e1b440..be70587 100644 --- a/pgpubsub/listen.py +++ b/pgpubsub/listen.py @@ -4,6 +4,7 @@ import sys from typing import List, Optional, Union +from django.conf import settings from django.core.management import execute_from_command_line from django.db import connection, transaction from django.db.models import Func, Value, Q @@ -163,11 +164,14 @@ def validate(self): def process(self): logger.info( f'Processing notification for {self.channel_cls.name()}') + extras_filter = getattr(settings, 'PGPUBSUB_LISTENER_FILTER', None) + extras_filter = [extras_filter] if extras_filter else [] notification = ( Notification.objects.select_for_update( skip_locked=True).filter( Q(payload=CastToJSONB(Value(self.notification.payload))) | Q(payload=self.notification.payload), + *extras_filter, channel=self.notification.channel, ).first() ) diff --git a/pgpubsub/models.py b/pgpubsub/models.py index 5392b29..6660cda 100644 --- a/pgpubsub/models.py +++ b/pgpubsub/models.py @@ -1,6 +1,6 @@ -from typing import Type +from typing import Optional, Type -from django.db import models +from django.db import connection, connections, models try: from django.db.models import JSONField @@ -58,3 +58,12 @@ def __repr__(self): @classmethod def from_channel(cls, channel: Type[BaseChannel]): return cls.objects.filter(channel=channel.listen_safe_name()) + + @classmethod + def set_payload_extras_builder(cls, func_name: str, using: Optional[str] = None) -> None: + if using: + conn = connections[using] + else: + conn = connection + with conn.cursor() as cursor: + cursor.execute("set local pgpubsub.get_payload_extras_func = %s", (func_name,)) diff --git a/pgpubsub/tests/conftest.py b/pgpubsub/tests/conftest.py new file mode 100644 index 0000000..ce3eb8a --- /dev/null +++ b/pgpubsub/tests/conftest.py @@ -0,0 +1,14 @@ +import pytest +from django.db import connection +from pgpubsub.listen import listen_to_channels + +@pytest.fixture() +def pg_connection(): + return listen_to_channels() + + +@pytest.fixture +def tx_start_time(django_db_setup): + with connection.cursor() as cursor: + cursor.execute("SELECT now();") + return cursor.fetchone()[0] diff --git a/pgpubsub/tests/migrations/0012_payload_extras.py b/pgpubsub/tests/migrations/0012_payload_extras.py new file mode 100644 index 0000000..b629a3d --- /dev/null +++ b/pgpubsub/tests/migrations/0012_payload_extras.py @@ -0,0 +1,55 @@ +# Generated by Django 3.2.12 on 2023-11-15 13:37 + +from django.db import migrations +import pgtrigger.compiler +import pgtrigger.migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('tests', '0011_payload_stores_proper_jsonb'), + ] + + operations = [ + pgtrigger.migrations.RemoveTrigger( + model_name='author', + name='pgpubsub_160cf', + ), + pgtrigger.migrations.RemoveTrigger( + model_name='child', + name='pgpubsub_89ef9', + ), + pgtrigger.migrations.RemoveTrigger( + model_name='childofabstract', + name='pgpubsub_b1c0b', + ), + pgtrigger.migrations.RemoveTrigger( + model_name='media', + name='pgpubsub_a83de', + ), + pgtrigger.migrations.RemoveTrigger( + model_name='post', + name='pgpubsub_72091', + ), + pgtrigger.migrations.AddTrigger( + model_name='author', + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_160cf', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; get_payload_extras_func TEXT; extras JSONB;', func='\n \n payload := \'{"app": "tests", "model": "Author"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.get_payload_extras_func\', True)\n INTO get_payload_extras_func;\n IF get_payload_extras_func IS NOT NULL THEN\n EXECUTE \'SELECT \' || quote_ident(get_payload_extras_func) || \'()\'\n INTO extras;\n payload := jsonb_insert(payload, \'{extras}\', extras);\n END IF;\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_160cf\', payload);\n \n perform pg_notify(\'pgpubsub_160cf\', payload::text);\n RETURN NEW;\n ', hash='9492b9c4235c977beaea77ccd016ee81e51444f7', operation='INSERT', pgid='pgtrigger_pgpubsub_160cf_72a36', table='tests_author', when='AFTER')), + ), + pgtrigger.migrations.AddTrigger( + model_name='child', + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_89ef9', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; get_payload_extras_func TEXT; extras JSONB;', func='\n \n payload := \'{"app": "tests", "model": "Child"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.get_payload_extras_func\', True)\n INTO get_payload_extras_func;\n IF get_payload_extras_func IS NOT NULL THEN\n EXECUTE \'SELECT \' || quote_ident(get_payload_extras_func) || \'()\'\n INTO extras;\n payload := jsonb_insert(payload, \'{extras}\', extras);\n END IF;\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_89ef9\', payload);\n \n perform pg_notify(\'pgpubsub_89ef9\', payload::text);\n RETURN NEW;\n ', hash='aa9a46a1ee682b78b093d817df9dc267f3545ba0', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_89ef9_92bc1', table='tests_child', when='AFTER')), + ), + pgtrigger.migrations.AddTrigger( + model_name='childofabstract', + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_b1c0b', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; get_payload_extras_func TEXT; extras JSONB;', func='\n \n payload := \'{"app": "tests", "model": "ChildOfAbstract"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.get_payload_extras_func\', True)\n INTO get_payload_extras_func;\n IF get_payload_extras_func IS NOT NULL THEN\n EXECUTE \'SELECT \' || quote_ident(get_payload_extras_func) || \'()\'\n INTO extras;\n payload := jsonb_insert(payload, \'{extras}\', extras);\n END IF;\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_b1c0b\', payload);\n \n perform pg_notify(\'pgpubsub_b1c0b\', payload::text);\n RETURN NEW;\n ', hash='5450deaa8b49599d7d78cc8f32ddfa81553882cc', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_b1c0b_c8531', table='tests_childofabstract', when='AFTER')), + ), + pgtrigger.migrations.AddTrigger( + model_name='media', + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_a83de', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; get_payload_extras_func TEXT; extras JSONB;', func='\n \n payload := \'{"app": "tests", "model": "Media"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.get_payload_extras_func\', True)\n INTO get_payload_extras_func;\n IF get_payload_extras_func IS NOT NULL THEN\n EXECUTE \'SELECT \' || quote_ident(get_payload_extras_func) || \'()\'\n INTO extras;\n payload := jsonb_insert(payload, \'{extras}\', extras);\n END IF;\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_a83de\', payload);\n \n perform pg_notify(\'pgpubsub_a83de\', payload::text);\n RETURN NEW;\n ', hash='710bc37c522b4b09187ef6926ced675e1aa23222', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_a83de_cacbb', table='tests_media', when='AFTER')), + ), + pgtrigger.migrations.AddTrigger( + model_name='post', + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_72091', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; get_payload_extras_func TEXT; extras JSONB;', func='\n \n payload := \'{"app": "tests", "model": "Post"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.get_payload_extras_func\', True)\n INTO get_payload_extras_func;\n IF get_payload_extras_func IS NOT NULL THEN\n EXECUTE \'SELECT \' || quote_ident(get_payload_extras_func) || \'()\'\n INTO extras;\n payload := jsonb_insert(payload, \'{extras}\', extras);\n END IF;\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_72091\', payload);\n \n perform pg_notify(\'pgpubsub_72091\', payload::text);\n RETURN NEW;\n ', hash='36a4ee090402180ff69073f5fd6aa9fd5d88d56a', operation='DELETE', pgid='pgtrigger_pgpubsub_72091_67aeb', table='tests_post', when='AFTER')), + ), + ] diff --git a/pgpubsub/tests/test_core.py b/pgpubsub/tests/test_core.py index 1c8ac06..bda4740 100644 --- a/pgpubsub/tests/test_core.py +++ b/pgpubsub/tests/test_core.py @@ -7,7 +7,6 @@ import pytest from pgpubsub.listen import ( - listen_to_channels, process_notifications, listen, ) @@ -21,11 +20,6 @@ from pgpubsub.tests.models import Author, Media, Post -@pytest.fixture() -def pg_connection(): - return listen_to_channels() - - @pytest.mark.django_db(transaction=True) def test_post_fetch_notify(pg_connection): author = Author.objects.create(name='Billy') @@ -241,13 +235,6 @@ def test_media_insert_notify(pg_connection): assert 'new' in stored_notification.payload -@pytest.fixture -def tx_start_time(django_db_setup): - with connection.cursor() as cursor: - cursor.execute("SELECT now();") - return cursor.fetchone()[0] - - @pytest.mark.django_db(transaction=True) def test_persistent_notification_has_a_creation_timestamp(pg_connection, tx_start_time): Media.objects.create(name='avatar.jpg', content_type='image/png', size=15000) diff --git a/pgpubsub/tests/test_payload_extras.py b/pgpubsub/tests/test_payload_extras.py new file mode 100644 index 0000000..599157e --- /dev/null +++ b/pgpubsub/tests/test_payload_extras.py @@ -0,0 +1,114 @@ +import json +import os +from contextlib import contextmanager + +import pytest + +from django.db import connections +from django.db.models import Q +from django.db.transaction import atomic +from pgpubsub.listen import process_notifications +from pgpubsub.models import Notification +from pgpubsub.tests.channels import ( + MediaTriggerChannel, +) +from pgpubsub.tests.models import Author, Media, Post + +@pytest.fixture(scope="session") +def django_db_modify_db_settings(): + is_multitenant = True + print(f"django_db_modify_db_settings: {is_multitenant=}") + if is_multitenant: + from django.conf import settings + settings.PGPUBSUB_ENABLE_PAYLOAD_EXTRAS = True + +""" +@pytest.fixture +def is_multitenant(request): + marks = [m.name for m in request.node.iter_markers()] + if request.node.parent: + marks += [m.name for m in request.node.parent.iter_markers()] + return 'multitenant' in marks + """ + + +@pytest.mark.multitenant +@pytest.mark.django_db(transaction=True) +def test_payload_extras_are_not_added_by_default(pg_connection): + Media.objects.create(name='avatar.jpg', content_type='image/png', size=15000) + stored_notification = Notification.from_channel(channel=MediaTriggerChannel).get() + assert 'extras' not in stored_notification.payload + + pg_connection.poll() + assert 1 == len(pg_connection.notifies) + + +@pytest.fixture +def configure_payload_extras(): + @contextmanager + def configurer(func_name: str, extras: dict[str, str]): + with connections['default'].cursor() as cursor: + cursor.execute(f""" + create or replace function {func_name}() + returns JSONB + language sql + as $$ + SELECT '{json.dumps(extras)}'::JSONB + $$ + """) + yield + cursor.execute("drop function if exists get_test_payload_extras()") + + return configurer + + +@pytest.mark.parametrize("db", [None, "default"]) +@pytest.mark.django_db(transaction=True) +def test_payload_extras_are_added_if_enabled( + pg_connection, db, configure_payload_extras +): + with ( + atomic(), + configure_payload_extras( + func_name='get_test_payload_extras', + extras={'test_key': 'test-value'}, + ) + ): + Notification.set_payload_extras_builder('get_test_payload_extras', using=db) + Media.objects.create(name='avatar.jpg', content_type='image/png', size=15000) + stored_notification = Notification.from_channel(channel=MediaTriggerChannel).get() + assert stored_notification.payload['extras'] == {'test_key': 'test-value'} + + pg_connection.poll() + assert 1 == len(pg_connection.notifies) + + +@pytest.mark.django_db(transaction=True) +def test_process_notifications_gets_all_by_default(pg_connection): + Author.objects.create(name='no-filter') + assert not Post.objects.exists() + process_notifications(pg_connection) + assert 1 == Post.objects.filter(author__name='no-filter').count() + +@pytest.mark.django_db(transaction=True) +def test_process_notifications_filters_out_unmatching_notifications( + pg_connection, settings, configure_payload_extras +): + Author.objects.create(name='notmatching') + with ( + atomic(), + configure_payload_extras( + func_name='get_test_payload_extras', + extras={'test_key': 'test-value'}, + ) + ): + Notification.set_payload_extras_builder('get_test_payload_extras') + Author.objects.create(name='matching') + + settings.PGPUBSUB_LISTENER_FILTER = Q(payload__extras__test_key='test-value') + assert not Post.objects.exists() + process_notifications(pg_connection) + assert 1 == Post.objects.filter(author__name='matching').count() + assert 0 == Post.objects.filter(author__name='notmatching').count() + + diff --git a/pgpubsub/triggers.py b/pgpubsub/triggers.py index 03d26c5..494ff9f 100644 --- a/pgpubsub/triggers.py +++ b/pgpubsub/triggers.py @@ -16,7 +16,11 @@ def get_func(self, model: Type[Model]): ''' def get_declare(self, model: Type[Model]): - return [('payload', 'JSONB')] + return [ + ('payload', 'JSONB'), + ('get_payload_extras_func', 'TEXT'), + ('extras', 'JSONB') + ] def _pre_notify(self): return '' @@ -26,6 +30,13 @@ def _build_payload(self, model): payload := '{{"app": "{model._meta.app_label}", "model": "{model.__name__}"}}'::jsonb; payload := jsonb_insert(payload, '{{old}}', COALESCE(to_jsonb(OLD), 'null')); payload := jsonb_insert(payload, '{{new}}', COALESCE(to_jsonb(NEW), 'null')); + SELECT current_setting('pgpubsub.get_payload_extras_func', True) + INTO get_payload_extras_func; + IF get_payload_extras_func IS NOT NULL THEN + EXECUTE 'SELECT ' || quote_ident(get_payload_extras_func) || '()' + INTO extras; + payload := jsonb_insert(payload, '{{extras}}', extras); + END IF; ''' diff --git a/pyproject.toml b/pyproject.toml index dc43202..da79143 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -88,3 +88,4 @@ log_cli_level = "INFO" DJANGO_SETTINGS_MODULE = "settings" filterwarnings = ["ignore::DeprecationWarning:django.http.request:", "ignore::DeprecationWarning:django.utils.encoding:"] +markers = ["multitenant: uses multenant configuration"] diff --git a/settings.py b/settings.py index adf957c..bc1bace 100644 --- a/settings.py +++ b/settings.py @@ -1,3 +1,4 @@ +import os import dj_database_url SECRET_KEY = 'django-pgpubsub' @@ -17,3 +18,5 @@ DEFAULT_AUTO_FIELD = 'django.db.models.AutoField' ALLOWED_HOSTS = ['localhost', '127.0.0.1', '0.0.0.0:8000'] + +PGPUBSUB_ENABLE_PAYLOAD_EXTRAS = os.environ.get('PGPUBSUB_ENABLE_PAYLOAD_EXTRAS', "False") == "True" From f0498d30463df4d23a1dc6eebadd52dc4f94bc84 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Thu, 16 Nov 2023 13:57:43 +0100 Subject: [PATCH 02/17] listener filter as fully qualified class --- pgpubsub/listen.py | 27 +++++++++++++++++++++------ pgpubsub/listeners.py | 8 +++++++- pgpubsub/tests/test_payload_extras.py | 10 +++++++++- 3 files changed, 37 insertions(+), 8 deletions(-) diff --git a/pgpubsub/listen.py b/pgpubsub/listen.py index be70587..7f8bbd3 100644 --- a/pgpubsub/listen.py +++ b/pgpubsub/listen.py @@ -1,3 +1,4 @@ +import importlib import logging import multiprocessing import select @@ -18,6 +19,7 @@ locate_channel, registry, ) +from pgpubsub.listeners import ListenerFilterProvider from pgpubsub.models import Notification logger = logging.getLogger(__name__) @@ -155,6 +157,18 @@ class CastToJSONB(Func): template = '((%(expressions)s)::jsonb)' +def get_extra_filter() -> Q: + extra_filter_provider_fq_name = getattr(settings, 'PGPUBSUB_LISTENER_FILTER', None) + if extra_filter_provider_fq_name: + module = importlib.import_module( + '.'.join(extra_filter_provider_fq_name.split('.')[:-1]) + ) + clazz = getattr(module, extra_filter_provider_fq_name.split('.')[-1]) + extra_filter_provider: ListenerFilterProvider = clazz() + return extra_filter_provider.get_filter() + else: + return Q() + class LockableNotificationProcessor(NotificationProcessor): def validate(self): @@ -164,15 +178,16 @@ def validate(self): def process(self): logger.info( f'Processing notification for {self.channel_cls.name()}') - extras_filter = getattr(settings, 'PGPUBSUB_LISTENER_FILTER', None) - extras_filter = [extras_filter] if extras_filter else [] + payload_filter = ( + Q(payload=CastToJSONB(Value(self.notification.payload))) | + Q(payload=self.notification.payload) + ) + payload_filter &= get_extra_filter() notification = ( Notification.objects.select_for_update( skip_locked=True).filter( - Q(payload=CastToJSONB(Value(self.notification.payload))) - | Q(payload=self.notification.payload), - *extras_filter, - channel=self.notification.channel, + payload_filter, + channel=self.notification.channel, ).first() ) if notification is None: diff --git a/pgpubsub/listeners.py b/pgpubsub/listeners.py index f9a7bfd..4796dc2 100644 --- a/pgpubsub/listeners.py +++ b/pgpubsub/listeners.py @@ -1,7 +1,8 @@ from functools import wraps -from typing import Union, Type +from typing import Protocol, Type, Union import pgtrigger +from django.db.models import Q from pgtrigger import Trigger, registered from pgpubsub.channel import ( @@ -100,3 +101,8 @@ def wrapper(*args, **kwargs): return callback(*args, **kwargs) return wrapper return _trig_listener + + +class ListenerFilterProvider(Protocol): + def get_filter(self) -> Q: + ... diff --git a/pgpubsub/tests/test_payload_extras.py b/pgpubsub/tests/test_payload_extras.py index 599157e..4c40d16 100644 --- a/pgpubsub/tests/test_payload_extras.py +++ b/pgpubsub/tests/test_payload_extras.py @@ -8,6 +8,7 @@ from django.db.models import Q from django.db.transaction import atomic from pgpubsub.listen import process_notifications +from pgpubsub.listeners import ListenerFilterProvider from pgpubsub.models import Notification from pgpubsub.tests.channels import ( MediaTriggerChannel, @@ -90,6 +91,13 @@ def test_process_notifications_gets_all_by_default(pg_connection): process_notifications(pg_connection) assert 1 == Post.objects.filter(author__name='no-filter').count() + +class TestListenerFilterProvider(ListenerFilterProvider): + __test__ = False + def get_filter(self) -> Q: + return Q(payload__extras__test_key='test-value') + + @pytest.mark.django_db(transaction=True) def test_process_notifications_filters_out_unmatching_notifications( pg_connection, settings, configure_payload_extras @@ -105,7 +113,7 @@ def test_process_notifications_filters_out_unmatching_notifications( Notification.set_payload_extras_builder('get_test_payload_extras') Author.objects.create(name='matching') - settings.PGPUBSUB_LISTENER_FILTER = Q(payload__extras__test_key='test-value') + settings.PGPUBSUB_LISTENER_FILTER = 'pgpubsub.tests.test_payload_extras.TestListenerFilterProvider' assert not Post.objects.exists() process_notifications(pg_connection) assert 1 == Post.objects.filter(author__name='matching').count() From 1a256270800c8319da91350ce27d8d759a4d4171 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Thu, 16 Nov 2023 14:22:49 +0100 Subject: [PATCH 03/17] option to pass extras to the listener --- pgpubsub/channel.py | 22 ++++++++++++++++++++-- pgpubsub/tests/listeners.py | 16 ++++++++++++---- pgpubsub/tests/test_payload_extras.py | 20 ++++++++++++++++++++ 3 files changed, 52 insertions(+), 6 deletions(-) diff --git a/pgpubsub/channel.py b/pgpubsub/channel.py index 74a5895..0b27187 100644 --- a/pgpubsub/channel.py +++ b/pgpubsub/channel.py @@ -7,9 +7,10 @@ import inspect import json from pydoc import locate -from typing import Callable, Dict, Union, List +from typing import Any, Callable, Dict, Optional, Union, List from django.apps import apps +from django.conf import settings from django.core import serializers from django.core.serializers.json import DjangoJSONEncoder from django.db import models @@ -143,6 +144,20 @@ class TriggerChannel(BaseChannel): model = NotImplementedError old: models.Model new: models.Model + extras: Optional[Dict[str, Any]] = None + + @classmethod + def pass_extras_to_listeners(cls) -> bool: + return getattr(settings, 'PGPUBSUB_PASS_EXTRAS_TO_LISTENERS', False) + + @property + def signature(self): + return { + k: v for k, v in self.__dict__.items() + if k in self.__dataclass_fields__ and ( + k != 'extras' or self.pass_extras_to_listeners() + ) + } @classmethod def deserialize(cls, payload: Union[Dict, str]): @@ -167,7 +182,10 @@ def deserialize(cls, payload: Union[Dict, str]): new = next(new_deserialized_objects, None) if new is not None: new = new.object - return {'old': old, 'new': new} + fields = {'old': old, 'new': new} + if cls.pass_extras_to_listeners(): + fields['extras'] = payload_dict.get('extras', {}) + return fields @classmethod def _build_model_serializer_data(cls, payload: Dict, state: str): diff --git a/pgpubsub/tests/listeners.py b/pgpubsub/tests/listeners.py index c0dce3d..8795452 100644 --- a/pgpubsub/tests/listeners.py +++ b/pgpubsub/tests/listeners.py @@ -1,5 +1,6 @@ -from collections import defaultdict import datetime +from collections import defaultdict +from typing import Any, Dict, Optional from django.db.transaction import atomic @@ -38,17 +39,24 @@ def notify_post_owner(model_id: int, model_type: str, **kwargs): @atomic @pgpubsub.post_insert_listener(AuthorTriggerChannel) -def create_first_post_for_author(old: Author, new: Author): +def create_first_post_for_author( + old: Author, new: Author, extras: Optional[Dict[str, Any]] = None +): print(f'Creating first post for {new.name}') + content = 'Welcome! This is your first post' + if extras and 'content' in extras: + content = extras.get('content') Post.objects.create( author_id=new.pk, - content='Welcome! This is your first post', + content=content, date=datetime.date.today(), ) @pgpubsub.post_insert_listener(AuthorTriggerChannel) -def another_author_trigger(old: Author, new: Author): +def another_author_trigger( + old: Author, new: Author, extras: Optional[Dict[str, Any]] = None +): print(f'Another author trigger') diff --git a/pgpubsub/tests/test_payload_extras.py b/pgpubsub/tests/test_payload_extras.py index 4c40d16..ec63dde 100644 --- a/pgpubsub/tests/test_payload_extras.py +++ b/pgpubsub/tests/test_payload_extras.py @@ -120,3 +120,23 @@ def test_process_notifications_filters_out_unmatching_notifications( assert 0 == Post.objects.filter(author__name='notmatching').count() +@pytest.mark.django_db(transaction=True) +def test_payload_extras_are_passed_to_listener_callback( + pg_connection, settings, configure_payload_extras +): + settings.PGPUBSUB_PASS_EXTRAS_TO_LISTENERS = True + with ( + atomic(), + configure_payload_extras( + func_name='get_test_payload_extras', + extras={'content': 'overriden content'}, + ) + ): + Notification.set_payload_extras_builder('get_test_payload_extras') + Author.objects.create(name='I like overrides') + + assert not Post.objects.exists() + process_notifications(pg_connection) + post = Post.objects.all().first() + assert post is not None + assert post.content == 'overriden content' From 1777b0f48203e3ef0c5922206ec60b5ee2887383 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Thu, 16 Nov 2023 14:32:47 +0100 Subject: [PATCH 04/17] cleanup --- pgpubsub/tests/test_payload_extras.py | 27 +++++---------------------- settings.py | 2 -- 2 files changed, 5 insertions(+), 24 deletions(-) diff --git a/pgpubsub/tests/test_payload_extras.py b/pgpubsub/tests/test_payload_extras.py index ec63dde..cd2598c 100644 --- a/pgpubsub/tests/test_payload_extras.py +++ b/pgpubsub/tests/test_payload_extras.py @@ -15,23 +15,6 @@ ) from pgpubsub.tests.models import Author, Media, Post -@pytest.fixture(scope="session") -def django_db_modify_db_settings(): - is_multitenant = True - print(f"django_db_modify_db_settings: {is_multitenant=}") - if is_multitenant: - from django.conf import settings - settings.PGPUBSUB_ENABLE_PAYLOAD_EXTRAS = True - -""" -@pytest.fixture -def is_multitenant(request): - marks = [m.name for m in request.node.iter_markers()] - if request.node.parent: - marks += [m.name for m in request.node.parent.iter_markers()] - return 'multitenant' in marks - """ - @pytest.mark.multitenant @pytest.mark.django_db(transaction=True) @@ -85,7 +68,7 @@ def test_payload_extras_are_added_if_enabled( @pytest.mark.django_db(transaction=True) -def test_process_notifications_gets_all_by_default(pg_connection): +def test_process_notifications_gets_all_notifications_by_default(pg_connection): Author.objects.create(name='no-filter') assert not Post.objects.exists() process_notifications(pg_connection) @@ -99,10 +82,10 @@ def get_filter(self) -> Q: @pytest.mark.django_db(transaction=True) -def test_process_notifications_filters_out_unmatching_notifications( +def test_process_notifications_filters_out_nonmatching_notifications( pg_connection, settings, configure_payload_extras ): - Author.objects.create(name='notmatching') + Author.objects.create(name='nonmatching') with ( atomic(), configure_payload_extras( @@ -117,11 +100,11 @@ def test_process_notifications_filters_out_unmatching_notifications( assert not Post.objects.exists() process_notifications(pg_connection) assert 1 == Post.objects.filter(author__name='matching').count() - assert 0 == Post.objects.filter(author__name='notmatching').count() + assert 0 == Post.objects.filter(author__name='nonmatching').count() @pytest.mark.django_db(transaction=True) -def test_payload_extras_are_passed_to_listener_callback( +def test_payload_extras_may_be_passed_to_listener_callback( pg_connection, settings, configure_payload_extras ): settings.PGPUBSUB_PASS_EXTRAS_TO_LISTENERS = True diff --git a/settings.py b/settings.py index bc1bace..e7cceef 100644 --- a/settings.py +++ b/settings.py @@ -18,5 +18,3 @@ DEFAULT_AUTO_FIELD = 'django.db.models.AutoField' ALLOWED_HOSTS = ['localhost', '127.0.0.1', '0.0.0.0:8000'] - -PGPUBSUB_ENABLE_PAYLOAD_EXTRAS = os.environ.get('PGPUBSUB_ENABLE_PAYLOAD_EXTRAS', "False") == "True" From 19241e86882ce3ca2522ae3dd341cf6f874334a5 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Thu, 16 Nov 2023 16:24:21 +0100 Subject: [PATCH 05/17] adds docs --- docs/payload_extras.rst | 92 +++++++++++++++++++++++++++ docs/toc.rst | 1 + pgpubsub/models.py | 7 +- pgpubsub/tests/test_payload_extras.py | 8 ++- 4 files changed, 104 insertions(+), 4 deletions(-) create mode 100644 docs/payload_extras.rst diff --git a/docs/payload_extras.rst b/docs/payload_extras.rst new file mode 100644 index 0000000..511c10c --- /dev/null +++ b/docs/payload_extras.rst @@ -0,0 +1,92 @@ +.. _payload_extras: + +Payload Extras +============== + +Sometimes it is beneficial to pass some contextual information from the trigger +to the trigger listener along the payload. Examples are: + +- tracing information that allows to track complex request processing in a + multi component system +- in a multitenant system a tenant information to be able to identify the + tenant that peformed an operation that triggered a notification + + +This can be done by using **Payload Extras**. This feature includes: + +- ability to add an additional a.k.a. extra information to the payload in the + trigger +- ability to filter by the extra information in the listener process +- ability to use ``extra`` fields in the listener callbacks + + +Add ``extras`` to payload in the trigger +---------------------------------------- + +Define a postgres function that returns ``JSONB`` value that should be added to +the payload and set it using ``Notification.set_payload_extras_builder``. + +.. code-block:: python + + from pgpubsub.models import Notification + + Notification.set_payload_extras_builder('get_tracing_extras') + +The setting is effective for the current connection (by default) or till the +end of the current transanction if ``till_tx_end=True`` is specified. + +The common pattern of usage is to store tracing information as a [custom +option](https://www.postgresql.org/docs/16/runtime-config-custom.html) when the +transaction is started using ``SET LOCAL myapp.myvalue = 'value'`` and then +retrive that via ``SELECT current_setting('myapp.myvalue')`` in a function +configured via ``set_payload_extras_builder``. + +See examples of usage in ``pgpubsub.tests.test_payload_extras.py``. + + +Filter by ``extras`` field in the trigger listener +-------------------------------------------------- + +Define a class that implements ``ListenerFilterProvider`` protocol and set option +``PGPUBSUB_LISTENER_FILTER`` to its fully qualified class name. + +.. code-block:: python + + from pgpubsub.listeners import ListenerFilterProvider + + class TenantListenerFilterProvider(ListenerFilterProvider): + def get_filter(self) -> Q: + return Q(payload__extras__tenant='my-tenant') + + # django settings + PGPUBSUB_LISTENER_FILTER = 'myapp.whatever.TenantListenerFilterProvider' + +This configuration will skip any notifications that do not have ``tenant`` field +equal to ``my-tenant`` in the payload's ``extras`` field. + +Pass ``extras`` field to the trigger listener callback +------------------------------------------------------ + +To enable this set ``PGPUBSUB_PASS_EXTRAS_TO_LISTENERS`` to ``True`` in djago +settings and add a ``extras`` parameter to the listener callback. + +.. code-block:: python + + # listeners.py + import pgpubsub + from pgpubsub.tests.channels import AuthorTriggerChannel + from pgpubsub.tests.models import Author, Post + + @pgpubsub.post_insert_listener(AuthorTriggerChannel) + def create_first_post_for_author( + old: Author, new: Author, extras: Dict[str, Any] + ): + print(f'Creating first post for {new.name} with extras={extras}') + Post.objects.create( + author_id=new.pk, + content='Welcome! This is your first post', + date=datetime.date.today(), + ) + + # django settings + PGPUBSUB_PASS_EXTRAS_TO_LISTENERS = True diff --git a/docs/toc.rst b/docs/toc.rst index ce297ac..9c7ccb8 100644 --- a/docs/toc.rst +++ b/docs/toc.rst @@ -20,6 +20,7 @@ Table of Contents notifications exactly_once_messaging recovery + payload_extras .. toctree:: diff --git a/pgpubsub/models.py b/pgpubsub/models.py index 6660cda..3ce473e 100644 --- a/pgpubsub/models.py +++ b/pgpubsub/models.py @@ -60,10 +60,13 @@ def from_channel(cls, channel: Type[BaseChannel]): return cls.objects.filter(channel=channel.listen_safe_name()) @classmethod - def set_payload_extras_builder(cls, func_name: str, using: Optional[str] = None) -> None: + def set_payload_extras_builder( + cls, func_name: str, till_tx_end: bool = False, using: Optional[str] = None + ) -> None: if using: conn = connections[using] else: conn = connection + scope = "LOCAL" if till_tx_end else "SESSION" with conn.cursor() as cursor: - cursor.execute("set local pgpubsub.get_payload_extras_func = %s", (func_name,)) + cursor.execute(f"SET {scope} pgpubsub.get_payload_extras_func = %s", (func_name,)) diff --git a/pgpubsub/tests/test_payload_extras.py b/pgpubsub/tests/test_payload_extras.py index cd2598c..ae0484a 100644 --- a/pgpubsub/tests/test_payload_extras.py +++ b/pgpubsub/tests/test_payload_extras.py @@ -93,7 +93,9 @@ def test_process_notifications_filters_out_nonmatching_notifications( extras={'test_key': 'test-value'}, ) ): - Notification.set_payload_extras_builder('get_test_payload_extras') + Notification.set_payload_extras_builder( + 'get_test_payload_extras', till_tx_end=True + ) Author.objects.create(name='matching') settings.PGPUBSUB_LISTENER_FILTER = 'pgpubsub.tests.test_payload_extras.TestListenerFilterProvider' @@ -115,7 +117,9 @@ def test_payload_extras_may_be_passed_to_listener_callback( extras={'content': 'overriden content'}, ) ): - Notification.set_payload_extras_builder('get_test_payload_extras') + Notification.set_payload_extras_builder( + 'get_test_payload_extras', till_tx_end=True + ) Author.objects.create(name='I like overrides') assert not Post.objects.exists() From 621c3a06af156ff4a47f93f79f3a8c117f314ff2 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Thu, 16 Nov 2023 16:30:52 +0100 Subject: [PATCH 06/17] removes unused mark --- pgpubsub/tests/test_payload_extras.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pgpubsub/tests/test_payload_extras.py b/pgpubsub/tests/test_payload_extras.py index ae0484a..96135ff 100644 --- a/pgpubsub/tests/test_payload_extras.py +++ b/pgpubsub/tests/test_payload_extras.py @@ -16,7 +16,6 @@ from pgpubsub.tests.models import Author, Media, Post -@pytest.mark.multitenant @pytest.mark.django_db(transaction=True) def test_payload_extras_are_not_added_by_default(pg_connection): Media.objects.create(name='avatar.jpg', content_type='image/png', size=15000) From c0cc4df3db1d75242656f9e58bc1c1aa1a258a8b Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Thu, 16 Nov 2023 16:32:24 +0100 Subject: [PATCH 07/17] removes unused markers --- pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index da79143..dc43202 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -88,4 +88,3 @@ log_cli_level = "INFO" DJANGO_SETTINGS_MODULE = "settings" filterwarnings = ["ignore::DeprecationWarning:django.http.request:", "ignore::DeprecationWarning:django.utils.encoding:"] -markers = ["multitenant: uses multenant configuration"] From 31e9259430754f727e9afc80a898969a2c745bfb Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Thu, 16 Nov 2023 16:32:48 +0100 Subject: [PATCH 08/17] removes unused import --- settings.py | 1 - 1 file changed, 1 deletion(-) diff --git a/settings.py b/settings.py index e7cceef..adf957c 100644 --- a/settings.py +++ b/settings.py @@ -1,4 +1,3 @@ -import os import dj_database_url SECRET_KEY = 'django-pgpubsub' From f66011e5d20aa3b372bd39612c4eca7b2d53a464 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Thu, 16 Nov 2023 16:55:41 +0100 Subject: [PATCH 09/17] fixes typos --- docs/payload_extras.rst | 2 +- pgpubsub/tests/test_payload_extras.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/payload_extras.rst b/docs/payload_extras.rst index 511c10c..d002900 100644 --- a/docs/payload_extras.rst +++ b/docs/payload_extras.rst @@ -67,7 +67,7 @@ equal to ``my-tenant`` in the payload's ``extras`` field. Pass ``extras`` field to the trigger listener callback ------------------------------------------------------ -To enable this set ``PGPUBSUB_PASS_EXTRAS_TO_LISTENERS`` to ``True`` in djago +To enable this set ``PGPUBSUB_PASS_EXTRAS_TO_LISTENERS`` to ``True`` in django settings and add a ``extras`` parameter to the listener callback. .. code-block:: python diff --git a/pgpubsub/tests/test_payload_extras.py b/pgpubsub/tests/test_payload_extras.py index 96135ff..7261035 100644 --- a/pgpubsub/tests/test_payload_extras.py +++ b/pgpubsub/tests/test_payload_extras.py @@ -57,7 +57,9 @@ def test_payload_extras_are_added_if_enabled( extras={'test_key': 'test-value'}, ) ): - Notification.set_payload_extras_builder('get_test_payload_extras', using=db) + Notification.set_payload_extras_builder( + 'get_test_payload_extras', till_tx_end=True, using=db + ) Media.objects.create(name='avatar.jpg', content_type='image/png', size=15000) stored_notification = Notification.from_channel(channel=MediaTriggerChannel).get() assert stored_notification.payload['extras'] == {'test_key': 'test-value'} From afa7014f99aaefd56a9d0257fd6af4478dbde132 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Thu, 16 Nov 2023 20:18:57 +0100 Subject: [PATCH 10/17] changes to payload context storage --- docs/api.rst | 6 + docs/payload_context.rst | 82 +++++++++++ docs/payload_extras.rst | 92 ------------- docs/recovery.rst | 4 +- docs/toc.rst | 2 +- pgpubsub/__init__.py | 3 +- pgpubsub/channel.py | 36 +++-- pgpubsub/models.py | 14 +- pgpubsub/tests/listeners.py | 8 +- .../tests/migrations/0012_payload_context.py | 55 ++++++++ .../tests/migrations/0012_payload_extras.py | 55 -------- pgpubsub/tests/test_payload_context.py | 86 ++++++++++++ pgpubsub/tests/test_payload_extras.py | 130 ------------------ pgpubsub/triggers.py | 13 +- 14 files changed, 268 insertions(+), 318 deletions(-) create mode 100644 docs/payload_context.rst delete mode 100644 docs/payload_extras.rst create mode 100644 pgpubsub/tests/migrations/0012_payload_context.py delete mode 100644 pgpubsub/tests/migrations/0012_payload_extras.py create mode 100644 pgpubsub/tests/test_payload_context.py delete mode 100644 pgpubsub/tests/test_payload_extras.py diff --git a/docs/api.rst b/docs/api.rst index 1488888..ac3ff6b 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -18,6 +18,9 @@ Channels Listeners --------- +.. autoclass:: pgpubsub.ListenerFilterProvider + + .. autofunction:: pgpubsub.listener @@ -48,6 +51,9 @@ Listeners .. autofunction:: pgpubsub.trigger_listener +.. autofunction:: pgpubsub.set_notification_context + + Notifiers --------- diff --git a/docs/payload_context.rst b/docs/payload_context.rst new file mode 100644 index 0000000..cfd9e5b --- /dev/null +++ b/docs/payload_context.rst @@ -0,0 +1,82 @@ +.. _payload_context: + +Payload Context +=============== + +Sometimes it is beneficial to pass some contextual information from the trigger +to the trigger listener along the payload. Examples are: + +- tracing information that allows to track complex request processing in a + multi component system +- in a multitenant system a tenant information to be able to identify the + tenant that peformed an operation that triggered a notification + + +This can be done by using **Payload Context**. This feature includes: + +- ability to add an additional information to the payload in the trigger +- ability to filter by the fields in the context in the listener process +- ability to use ``context`` fields in the listener callbacks + + +Add ``context`` to payload in the trigger +----------------------------------------- + +Before doing updates that produce notifications set the context that should be +passed using ``pgpubsub.set_notification_context`` function. + +.. code-block:: python + + from pgpubsub import set_notification_context + + set_notification_context({'some-key': 'some-value'}) + +The setting is effective till the end of the current transanction. + + +Filter by ``context`` field in the trigger listener +--------------------------------------------------- + +Define a class that implements ``ListenerFilterProvider`` protocol and set +option ``PGPUBSUB_LISTENER_FILTER`` to its fully qualified class name. + +.. code-block:: python + + from pgpubsub import ListenerFilterProvider + + class TenantListenerFilterProvider(ListenerFilterProvider): + def get_filter(self) -> Q: + return Q(payload__context__tenant='my-tenant') + + # django settings + PGPUBSUB_LISTENER_FILTER = 'myapp.whatever.TenantListenerFilterProvider' + +This configuration will skip any notifications that do not have ``tenant`` field +equal to ``my-tenant`` in the payload's ``context`` field. + +Pass ``context`` field to the trigger listener callback +------------------------------------------------------- + +To enable this set ``PGPUBSUB_CONTEXT_TO_LISTENERS`` to ``True`` in django +settings and add a ``context`` parameter to the listener callback. + +.. code-block:: python + + # listeners.py + import pgpubsub + from pgpubsub.tests.channels import AuthorTriggerChannel + from pgpubsub.tests.models import Author, Post + + @pgpubsub.post_insert_listener(AuthorTriggerChannel) + def create_first_post_for_author( + old: Author, new: Author, context: Dict[str, Any] + ): + print(f'Creating first post for {new.name} with context={context}') + Post.objects.create( + author_id=new.pk, + content='Welcome! This is your first post', + date=datetime.date.today(), + ) + + # django settings + PGPUBSUB_PASS_CONTEXT_TO_LISTENERS = True diff --git a/docs/payload_extras.rst b/docs/payload_extras.rst deleted file mode 100644 index d002900..0000000 --- a/docs/payload_extras.rst +++ /dev/null @@ -1,92 +0,0 @@ -.. _payload_extras: - -Payload Extras -============== - -Sometimes it is beneficial to pass some contextual information from the trigger -to the trigger listener along the payload. Examples are: - -- tracing information that allows to track complex request processing in a - multi component system -- in a multitenant system a tenant information to be able to identify the - tenant that peformed an operation that triggered a notification - - -This can be done by using **Payload Extras**. This feature includes: - -- ability to add an additional a.k.a. extra information to the payload in the - trigger -- ability to filter by the extra information in the listener process -- ability to use ``extra`` fields in the listener callbacks - - -Add ``extras`` to payload in the trigger ----------------------------------------- - -Define a postgres function that returns ``JSONB`` value that should be added to -the payload and set it using ``Notification.set_payload_extras_builder``. - -.. code-block:: python - - from pgpubsub.models import Notification - - Notification.set_payload_extras_builder('get_tracing_extras') - -The setting is effective for the current connection (by default) or till the -end of the current transanction if ``till_tx_end=True`` is specified. - -The common pattern of usage is to store tracing information as a [custom -option](https://www.postgresql.org/docs/16/runtime-config-custom.html) when the -transaction is started using ``SET LOCAL myapp.myvalue = 'value'`` and then -retrive that via ``SELECT current_setting('myapp.myvalue')`` in a function -configured via ``set_payload_extras_builder``. - -See examples of usage in ``pgpubsub.tests.test_payload_extras.py``. - - -Filter by ``extras`` field in the trigger listener --------------------------------------------------- - -Define a class that implements ``ListenerFilterProvider`` protocol and set option -``PGPUBSUB_LISTENER_FILTER`` to its fully qualified class name. - -.. code-block:: python - - from pgpubsub.listeners import ListenerFilterProvider - - class TenantListenerFilterProvider(ListenerFilterProvider): - def get_filter(self) -> Q: - return Q(payload__extras__tenant='my-tenant') - - # django settings - PGPUBSUB_LISTENER_FILTER = 'myapp.whatever.TenantListenerFilterProvider' - -This configuration will skip any notifications that do not have ``tenant`` field -equal to ``my-tenant`` in the payload's ``extras`` field. - -Pass ``extras`` field to the trigger listener callback ------------------------------------------------------- - -To enable this set ``PGPUBSUB_PASS_EXTRAS_TO_LISTENERS`` to ``True`` in django -settings and add a ``extras`` parameter to the listener callback. - -.. code-block:: python - - # listeners.py - import pgpubsub - from pgpubsub.tests.channels import AuthorTriggerChannel - from pgpubsub.tests.models import Author, Post - - @pgpubsub.post_insert_listener(AuthorTriggerChannel) - def create_first_post_for_author( - old: Author, new: Author, extras: Dict[str, Any] - ): - print(f'Creating first post for {new.name} with extras={extras}') - Post.objects.create( - author_id=new.pk, - content='Welcome! This is your first post', - date=datetime.date.today(), - ) - - # django settings - PGPUBSUB_PASS_EXTRAS_TO_LISTENERS = True diff --git a/docs/recovery.rst b/docs/recovery.rst index 6d96cce..04c7751 100644 --- a/docs/recovery.rst +++ b/docs/recovery.rst @@ -21,8 +21,8 @@ by supplying it with the ``--recover`` option. This will tell the listening proc any missed stored notifications automatically when it starts up. -Note that this recovery option can be enabled whenever we use the `listen` management command -by supplying it with the `--recover` option. This will tell the listening processes to replay +Note that this recovery option can be enabled whenever we use the ``listen`` management command +by supplying it with the ``--recover`` option. This will tell the listening processes to replay any missed stored notifications automatically when it starts up. It is important to enable server side cursors in the django settings used by diff --git a/docs/toc.rst b/docs/toc.rst index 9c7ccb8..5cd9451 100644 --- a/docs/toc.rst +++ b/docs/toc.rst @@ -20,7 +20,7 @@ Table of Contents notifications exactly_once_messaging recovery - payload_extras + payload_context .. toctree:: diff --git a/pgpubsub/__init__.py b/pgpubsub/__init__.py index 120ef01..2ab9c0d 100644 --- a/pgpubsub/__init__.py +++ b/pgpubsub/__init__.py @@ -1,4 +1,4 @@ -from pgpubsub.channel import Channel, TriggerChannel +from pgpubsub.channel import Channel, TriggerChannel, set_notification_context from pgpubsub.listeners import ( listener, pre_save_listener, @@ -10,6 +10,7 @@ pre_delete_listener, post_delete_listener, trigger_listener, + ListenerFilterProvider, ) from pgpubsub.notify import notify, process_stored_notifications diff --git a/pgpubsub/channel.py b/pgpubsub/channel.py index 0b27187..34739d3 100644 --- a/pgpubsub/channel.py +++ b/pgpubsub/channel.py @@ -1,19 +1,19 @@ +import datetime import hashlib +import inspect +import json from abc import abstractmethod from collections import defaultdict from dataclasses import dataclass from decimal import Decimal -import datetime -import inspect -import json from pydoc import locate -from typing import Any, Callable, Dict, Optional, Union, List +from typing import Any, Callable, Dict, List, Optional, Union from django.apps import apps from django.conf import settings from django.core import serializers from django.core.serializers.json import DjangoJSONEncoder -from django.db import models +from django.db import connection, connections, models registry = defaultdict(list) @@ -144,18 +144,18 @@ class TriggerChannel(BaseChannel): model = NotImplementedError old: models.Model new: models.Model - extras: Optional[Dict[str, Any]] = None + context: Optional[Dict[str, Any]] = None @classmethod - def pass_extras_to_listeners(cls) -> bool: - return getattr(settings, 'PGPUBSUB_PASS_EXTRAS_TO_LISTENERS', False) + def pass_context_to_listeners(cls) -> bool: + return getattr(settings, 'PGPUBSUB_PASS_CONTEXT_TO_LISTENERS', False) @property def signature(self): return { k: v for k, v in self.__dict__.items() if k in self.__dataclass_fields__ and ( - k != 'extras' or self.pass_extras_to_listeners() + k != 'context' or self.pass_context_to_listeners() ) } @@ -183,8 +183,8 @@ def deserialize(cls, payload: Union[Dict, str]): if new is not None: new = new.object fields = {'old': old, 'new': new} - if cls.pass_extras_to_listeners(): - fields['extras'] = payload_dict.get('extras', {}) + if cls.pass_context_to_listeners(): + fields['context'] = payload_dict.get('context', {}) return fields @classmethod @@ -231,6 +231,20 @@ def _build_model_serializer_data(cls, payload: Dict, state: str): return model_data +def set_notification_context( + context: Dict[str, Any], using: Optional[str] = None +) -> None: + if using: + conn = connections[using] + else: + conn = connection + with conn.cursor() as cursor: + cursor.execute( + "SET LOCAL pgpubsub.notification_context = %s", + (json.dumps(context),) + ) + + def locate_channel(channel): if isinstance(channel, str): channel = locate(channel) diff --git a/pgpubsub/models.py b/pgpubsub/models.py index 3ce473e..e27e0a4 100644 --- a/pgpubsub/models.py +++ b/pgpubsub/models.py @@ -1,6 +1,6 @@ from typing import Optional, Type -from django.db import connection, connections, models +from django.db import models try: from django.db.models import JSONField @@ -58,15 +58,3 @@ def __repr__(self): @classmethod def from_channel(cls, channel: Type[BaseChannel]): return cls.objects.filter(channel=channel.listen_safe_name()) - - @classmethod - def set_payload_extras_builder( - cls, func_name: str, till_tx_end: bool = False, using: Optional[str] = None - ) -> None: - if using: - conn = connections[using] - else: - conn = connection - scope = "LOCAL" if till_tx_end else "SESSION" - with conn.cursor() as cursor: - cursor.execute(f"SET {scope} pgpubsub.get_payload_extras_func = %s", (func_name,)) diff --git a/pgpubsub/tests/listeners.py b/pgpubsub/tests/listeners.py index 8795452..dd4ecf3 100644 --- a/pgpubsub/tests/listeners.py +++ b/pgpubsub/tests/listeners.py @@ -40,12 +40,12 @@ def notify_post_owner(model_id: int, model_type: str, **kwargs): @atomic @pgpubsub.post_insert_listener(AuthorTriggerChannel) def create_first_post_for_author( - old: Author, new: Author, extras: Optional[Dict[str, Any]] = None + old: Author, new: Author, context: Optional[Dict[str, Any]] = None ): print(f'Creating first post for {new.name}') content = 'Welcome! This is your first post' - if extras and 'content' in extras: - content = extras.get('content') + if context and 'content' in context: + content = context.get('content') Post.objects.create( author_id=new.pk, content=content, @@ -55,7 +55,7 @@ def create_first_post_for_author( @pgpubsub.post_insert_listener(AuthorTriggerChannel) def another_author_trigger( - old: Author, new: Author, extras: Optional[Dict[str, Any]] = None + old: Author, new: Author, context: Optional[Dict[str, Any]] = None ): print(f'Another author trigger') diff --git a/pgpubsub/tests/migrations/0012_payload_context.py b/pgpubsub/tests/migrations/0012_payload_context.py new file mode 100644 index 0000000..40251c4 --- /dev/null +++ b/pgpubsub/tests/migrations/0012_payload_context.py @@ -0,0 +1,55 @@ +# Generated by Django 3.2.12 on 2023-11-16 13:00 + +from django.db import migrations +import pgtrigger.compiler +import pgtrigger.migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('tests', '0011_payload_stores_proper_jsonb'), + ] + + operations = [ + pgtrigger.migrations.RemoveTrigger( + model_name='author', + name='pgpubsub_160cf', + ), + pgtrigger.migrations.RemoveTrigger( + model_name='child', + name='pgpubsub_89ef9', + ), + pgtrigger.migrations.RemoveTrigger( + model_name='childofabstract', + name='pgpubsub_b1c0b', + ), + pgtrigger.migrations.RemoveTrigger( + model_name='media', + name='pgpubsub_a83de', + ), + pgtrigger.migrations.RemoveTrigger( + model_name='post', + name='pgpubsub_72091', + ), + pgtrigger.migrations.AddTrigger( + model_name='author', + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_160cf', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB;', func='\n \n payload := \'{"app": "tests", "model": "Author"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT COALESCE(current_setting(\'pgpubsub.notification_context\', True), \'{}\')::jsonb\n INTO notification_context;\n payload := jsonb_insert(payload, \'{context}\', notification_context);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_160cf\', payload);\n \n perform pg_notify(\'pgpubsub_160cf\', payload::text);\n RETURN NEW;\n ', hash='0c70137e92883450adaa16bcb45de73e23b82fb4', operation='INSERT', pgid='pgtrigger_pgpubsub_160cf_72a36', table='tests_author', when='AFTER')), + ), + pgtrigger.migrations.AddTrigger( + model_name='child', + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_89ef9', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB;', func='\n \n payload := \'{"app": "tests", "model": "Child"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT COALESCE(current_setting(\'pgpubsub.notification_context\', True), \'{}\')::jsonb\n INTO notification_context;\n payload := jsonb_insert(payload, \'{context}\', notification_context);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_89ef9\', payload);\n \n perform pg_notify(\'pgpubsub_89ef9\', payload::text);\n RETURN NEW;\n ', hash='cf798ce24042b3016c4aaba8a644bdc707b13371', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_89ef9_92bc1', table='tests_child', when='AFTER')), + ), + pgtrigger.migrations.AddTrigger( + model_name='childofabstract', + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_b1c0b', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB;', func='\n \n payload := \'{"app": "tests", "model": "ChildOfAbstract"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT COALESCE(current_setting(\'pgpubsub.notification_context\', True), \'{}\')::jsonb\n INTO notification_context;\n payload := jsonb_insert(payload, \'{context}\', notification_context);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_b1c0b\', payload);\n \n perform pg_notify(\'pgpubsub_b1c0b\', payload::text);\n RETURN NEW;\n ', hash='d4ab1811b7c894c483fc6d5269db7e49ce51158a', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_b1c0b_c8531', table='tests_childofabstract', when='AFTER')), + ), + pgtrigger.migrations.AddTrigger( + model_name='media', + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_a83de', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB;', func='\n \n payload := \'{"app": "tests", "model": "Media"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT COALESCE(current_setting(\'pgpubsub.notification_context\', True), \'{}\')::jsonb\n INTO notification_context;\n payload := jsonb_insert(payload, \'{context}\', notification_context);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_a83de\', payload);\n \n perform pg_notify(\'pgpubsub_a83de\', payload::text);\n RETURN NEW;\n ', hash='4a793ae41f14dbe46316ff95f0517dab16653ce5', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_a83de_cacbb', table='tests_media', when='AFTER')), + ), + pgtrigger.migrations.AddTrigger( + model_name='post', + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_72091', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB;', func='\n \n payload := \'{"app": "tests", "model": "Post"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT COALESCE(current_setting(\'pgpubsub.notification_context\', True), \'{}\')::jsonb\n INTO notification_context;\n payload := jsonb_insert(payload, \'{context}\', notification_context);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_72091\', payload);\n \n perform pg_notify(\'pgpubsub_72091\', payload::text);\n RETURN NEW;\n ', hash='f9c2be3e92c93fd272970de09dc7e94e5ddd02a6', operation='DELETE', pgid='pgtrigger_pgpubsub_72091_67aeb', table='tests_post', when='AFTER')), + ), + ] diff --git a/pgpubsub/tests/migrations/0012_payload_extras.py b/pgpubsub/tests/migrations/0012_payload_extras.py deleted file mode 100644 index b629a3d..0000000 --- a/pgpubsub/tests/migrations/0012_payload_extras.py +++ /dev/null @@ -1,55 +0,0 @@ -# Generated by Django 3.2.12 on 2023-11-15 13:37 - -from django.db import migrations -import pgtrigger.compiler -import pgtrigger.migrations - - -class Migration(migrations.Migration): - - dependencies = [ - ('tests', '0011_payload_stores_proper_jsonb'), - ] - - operations = [ - pgtrigger.migrations.RemoveTrigger( - model_name='author', - name='pgpubsub_160cf', - ), - pgtrigger.migrations.RemoveTrigger( - model_name='child', - name='pgpubsub_89ef9', - ), - pgtrigger.migrations.RemoveTrigger( - model_name='childofabstract', - name='pgpubsub_b1c0b', - ), - pgtrigger.migrations.RemoveTrigger( - model_name='media', - name='pgpubsub_a83de', - ), - pgtrigger.migrations.RemoveTrigger( - model_name='post', - name='pgpubsub_72091', - ), - pgtrigger.migrations.AddTrigger( - model_name='author', - trigger=pgtrigger.compiler.Trigger(name='pgpubsub_160cf', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; get_payload_extras_func TEXT; extras JSONB;', func='\n \n payload := \'{"app": "tests", "model": "Author"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.get_payload_extras_func\', True)\n INTO get_payload_extras_func;\n IF get_payload_extras_func IS NOT NULL THEN\n EXECUTE \'SELECT \' || quote_ident(get_payload_extras_func) || \'()\'\n INTO extras;\n payload := jsonb_insert(payload, \'{extras}\', extras);\n END IF;\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_160cf\', payload);\n \n perform pg_notify(\'pgpubsub_160cf\', payload::text);\n RETURN NEW;\n ', hash='9492b9c4235c977beaea77ccd016ee81e51444f7', operation='INSERT', pgid='pgtrigger_pgpubsub_160cf_72a36', table='tests_author', when='AFTER')), - ), - pgtrigger.migrations.AddTrigger( - model_name='child', - trigger=pgtrigger.compiler.Trigger(name='pgpubsub_89ef9', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; get_payload_extras_func TEXT; extras JSONB;', func='\n \n payload := \'{"app": "tests", "model": "Child"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.get_payload_extras_func\', True)\n INTO get_payload_extras_func;\n IF get_payload_extras_func IS NOT NULL THEN\n EXECUTE \'SELECT \' || quote_ident(get_payload_extras_func) || \'()\'\n INTO extras;\n payload := jsonb_insert(payload, \'{extras}\', extras);\n END IF;\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_89ef9\', payload);\n \n perform pg_notify(\'pgpubsub_89ef9\', payload::text);\n RETURN NEW;\n ', hash='aa9a46a1ee682b78b093d817df9dc267f3545ba0', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_89ef9_92bc1', table='tests_child', when='AFTER')), - ), - pgtrigger.migrations.AddTrigger( - model_name='childofabstract', - trigger=pgtrigger.compiler.Trigger(name='pgpubsub_b1c0b', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; get_payload_extras_func TEXT; extras JSONB;', func='\n \n payload := \'{"app": "tests", "model": "ChildOfAbstract"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.get_payload_extras_func\', True)\n INTO get_payload_extras_func;\n IF get_payload_extras_func IS NOT NULL THEN\n EXECUTE \'SELECT \' || quote_ident(get_payload_extras_func) || \'()\'\n INTO extras;\n payload := jsonb_insert(payload, \'{extras}\', extras);\n END IF;\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_b1c0b\', payload);\n \n perform pg_notify(\'pgpubsub_b1c0b\', payload::text);\n RETURN NEW;\n ', hash='5450deaa8b49599d7d78cc8f32ddfa81553882cc', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_b1c0b_c8531', table='tests_childofabstract', when='AFTER')), - ), - pgtrigger.migrations.AddTrigger( - model_name='media', - trigger=pgtrigger.compiler.Trigger(name='pgpubsub_a83de', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; get_payload_extras_func TEXT; extras JSONB;', func='\n \n payload := \'{"app": "tests", "model": "Media"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.get_payload_extras_func\', True)\n INTO get_payload_extras_func;\n IF get_payload_extras_func IS NOT NULL THEN\n EXECUTE \'SELECT \' || quote_ident(get_payload_extras_func) || \'()\'\n INTO extras;\n payload := jsonb_insert(payload, \'{extras}\', extras);\n END IF;\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_a83de\', payload);\n \n perform pg_notify(\'pgpubsub_a83de\', payload::text);\n RETURN NEW;\n ', hash='710bc37c522b4b09187ef6926ced675e1aa23222', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_a83de_cacbb', table='tests_media', when='AFTER')), - ), - pgtrigger.migrations.AddTrigger( - model_name='post', - trigger=pgtrigger.compiler.Trigger(name='pgpubsub_72091', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; get_payload_extras_func TEXT; extras JSONB;', func='\n \n payload := \'{"app": "tests", "model": "Post"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.get_payload_extras_func\', True)\n INTO get_payload_extras_func;\n IF get_payload_extras_func IS NOT NULL THEN\n EXECUTE \'SELECT \' || quote_ident(get_payload_extras_func) || \'()\'\n INTO extras;\n payload := jsonb_insert(payload, \'{extras}\', extras);\n END IF;\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_72091\', payload);\n \n perform pg_notify(\'pgpubsub_72091\', payload::text);\n RETURN NEW;\n ', hash='36a4ee090402180ff69073f5fd6aa9fd5d88d56a', operation='DELETE', pgid='pgtrigger_pgpubsub_72091_67aeb', table='tests_post', when='AFTER')), - ), - ] diff --git a/pgpubsub/tests/test_payload_context.py b/pgpubsub/tests/test_payload_context.py new file mode 100644 index 0000000..c28d481 --- /dev/null +++ b/pgpubsub/tests/test_payload_context.py @@ -0,0 +1,86 @@ +import json +import os + +import pytest + +import pgpubsub +from django.db import connections +from django.db.models import Q +from django.db.transaction import atomic +from pgpubsub.listen import process_notifications +from pgpubsub.listeners import ListenerFilterProvider +from pgpubsub.models import Notification +from pgpubsub.tests.channels import ( + MediaTriggerChannel, +) +from pgpubsub.tests.models import Author, Media, Post + + +@pytest.mark.django_db(transaction=True) +def test_empty_notification_context_is_stored_in_payload_by_default(pg_connection): + Media.objects.create(name='avatar.jpg', content_type='image/png', size=15000) + stored_notification = Notification.from_channel(channel=MediaTriggerChannel).get() + assert stored_notification.payload['context'] == {} + + pg_connection.poll() + assert 1 == len(pg_connection.notifies) + + +@pytest.mark.parametrize("db", [None, "default"]) +@pytest.mark.django_db(transaction=True) +def test_notification_context_is_stored_in_payload(pg_connection, db): + with atomic(): + pgpubsub.set_notification_context({'test_key': 'test-value'}, using=db) + Media.objects.create(name='avatar.jpg', content_type='image/png', size=15000) + + stored_notification = Notification.from_channel(channel=MediaTriggerChannel).get() + assert stored_notification.payload['context'] == {'test_key': 'test-value'} + + pg_connection.poll() + assert 1 == len(pg_connection.notifies) + + +@pytest.mark.django_db(transaction=True) +def test_process_notifications_gets_all_notifications_by_default(pg_connection): + Author.objects.create(name='no-filter') + assert not Post.objects.exists() + process_notifications(pg_connection) + assert 1 == Post.objects.filter(author__name='no-filter').count() + + +class TestListenerFilterProvider(ListenerFilterProvider): + __test__ = False + def get_filter(self) -> Q: + return Q(payload__context__test_key='test-value') + + +@pytest.mark.django_db(transaction=True) +def test_process_notifications_filters_out_nonmatching_notifications( + pg_connection, settings +): + Author.objects.create(name='nonmatching') + with atomic(): + pgpubsub.set_notification_context({'test_key': 'test-value'}) + Author.objects.create(name='matching') + + settings.PGPUBSUB_LISTENER_FILTER = 'pgpubsub.tests.test_payload_context.TestListenerFilterProvider' + assert not Post.objects.exists() + process_notifications(pg_connection) + assert 1 == Post.objects.filter(author__name='matching').count() + assert 0 == Post.objects.filter(author__name='nonmatching').count() + + +@pytest.mark.django_db(transaction=True) +def test_payload_context_may_be_passed_to_listener_callback( + pg_connection, settings +): + settings.PGPUBSUB_PASS_CONTEXT_TO_LISTENERS = True + with atomic(): + pgpubsub.set_notification_context({'content': 'overriden content'}) + Author.objects.create(name='I like overrides') + + assert not Post.objects.exists() + process_notifications(pg_connection) + post = Post.objects.all().first() + assert post is not None + assert post.content == 'overriden content' diff --git a/pgpubsub/tests/test_payload_extras.py b/pgpubsub/tests/test_payload_extras.py deleted file mode 100644 index 7261035..0000000 --- a/pgpubsub/tests/test_payload_extras.py +++ /dev/null @@ -1,130 +0,0 @@ -import json -import os -from contextlib import contextmanager - -import pytest - -from django.db import connections -from django.db.models import Q -from django.db.transaction import atomic -from pgpubsub.listen import process_notifications -from pgpubsub.listeners import ListenerFilterProvider -from pgpubsub.models import Notification -from pgpubsub.tests.channels import ( - MediaTriggerChannel, -) -from pgpubsub.tests.models import Author, Media, Post - - -@pytest.mark.django_db(transaction=True) -def test_payload_extras_are_not_added_by_default(pg_connection): - Media.objects.create(name='avatar.jpg', content_type='image/png', size=15000) - stored_notification = Notification.from_channel(channel=MediaTriggerChannel).get() - assert 'extras' not in stored_notification.payload - - pg_connection.poll() - assert 1 == len(pg_connection.notifies) - - -@pytest.fixture -def configure_payload_extras(): - @contextmanager - def configurer(func_name: str, extras: dict[str, str]): - with connections['default'].cursor() as cursor: - cursor.execute(f""" - create or replace function {func_name}() - returns JSONB - language sql - as $$ - SELECT '{json.dumps(extras)}'::JSONB - $$ - """) - yield - cursor.execute("drop function if exists get_test_payload_extras()") - - return configurer - - -@pytest.mark.parametrize("db", [None, "default"]) -@pytest.mark.django_db(transaction=True) -def test_payload_extras_are_added_if_enabled( - pg_connection, db, configure_payload_extras -): - with ( - atomic(), - configure_payload_extras( - func_name='get_test_payload_extras', - extras={'test_key': 'test-value'}, - ) - ): - Notification.set_payload_extras_builder( - 'get_test_payload_extras', till_tx_end=True, using=db - ) - Media.objects.create(name='avatar.jpg', content_type='image/png', size=15000) - stored_notification = Notification.from_channel(channel=MediaTriggerChannel).get() - assert stored_notification.payload['extras'] == {'test_key': 'test-value'} - - pg_connection.poll() - assert 1 == len(pg_connection.notifies) - - -@pytest.mark.django_db(transaction=True) -def test_process_notifications_gets_all_notifications_by_default(pg_connection): - Author.objects.create(name='no-filter') - assert not Post.objects.exists() - process_notifications(pg_connection) - assert 1 == Post.objects.filter(author__name='no-filter').count() - - -class TestListenerFilterProvider(ListenerFilterProvider): - __test__ = False - def get_filter(self) -> Q: - return Q(payload__extras__test_key='test-value') - - -@pytest.mark.django_db(transaction=True) -def test_process_notifications_filters_out_nonmatching_notifications( - pg_connection, settings, configure_payload_extras -): - Author.objects.create(name='nonmatching') - with ( - atomic(), - configure_payload_extras( - func_name='get_test_payload_extras', - extras={'test_key': 'test-value'}, - ) - ): - Notification.set_payload_extras_builder( - 'get_test_payload_extras', till_tx_end=True - ) - Author.objects.create(name='matching') - - settings.PGPUBSUB_LISTENER_FILTER = 'pgpubsub.tests.test_payload_extras.TestListenerFilterProvider' - assert not Post.objects.exists() - process_notifications(pg_connection) - assert 1 == Post.objects.filter(author__name='matching').count() - assert 0 == Post.objects.filter(author__name='nonmatching').count() - - -@pytest.mark.django_db(transaction=True) -def test_payload_extras_may_be_passed_to_listener_callback( - pg_connection, settings, configure_payload_extras -): - settings.PGPUBSUB_PASS_EXTRAS_TO_LISTENERS = True - with ( - atomic(), - configure_payload_extras( - func_name='get_test_payload_extras', - extras={'content': 'overriden content'}, - ) - ): - Notification.set_payload_extras_builder( - 'get_test_payload_extras', till_tx_end=True - ) - Author.objects.create(name='I like overrides') - - assert not Post.objects.exists() - process_notifications(pg_connection) - post = Post.objects.all().first() - assert post is not None - assert post.content == 'overriden content' diff --git a/pgpubsub/triggers.py b/pgpubsub/triggers.py index 494ff9f..8337604 100644 --- a/pgpubsub/triggers.py +++ b/pgpubsub/triggers.py @@ -18,8 +18,7 @@ def get_func(self, model: Type[Model]): def get_declare(self, model: Type[Model]): return [ ('payload', 'JSONB'), - ('get_payload_extras_func', 'TEXT'), - ('extras', 'JSONB') + ('notification_context', 'JSONB'), ] def _pre_notify(self): @@ -30,13 +29,9 @@ def _build_payload(self, model): payload := '{{"app": "{model._meta.app_label}", "model": "{model.__name__}"}}'::jsonb; payload := jsonb_insert(payload, '{{old}}', COALESCE(to_jsonb(OLD), 'null')); payload := jsonb_insert(payload, '{{new}}', COALESCE(to_jsonb(NEW), 'null')); - SELECT current_setting('pgpubsub.get_payload_extras_func', True) - INTO get_payload_extras_func; - IF get_payload_extras_func IS NOT NULL THEN - EXECUTE 'SELECT ' || quote_ident(get_payload_extras_func) || '()' - INTO extras; - payload := jsonb_insert(payload, '{{extras}}', extras); - END IF; + SELECT COALESCE(current_setting('pgpubsub.notification_context', True), '{{}}')::jsonb + INTO notification_context; + payload := jsonb_insert(payload, '{{context}}', notification_context); ''' From 0350b76525dfa37e9c65a2439352592328b70cf9 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Tue, 28 Nov 2023 16:18:02 +0100 Subject: [PATCH 11/17] handles empty context setting --- .../0013_empty_pg_setting_handling.py | 55 +++++++++++++++++++ pgpubsub/tests/test_payload_context.py | 14 +++++ pgpubsub/triggers.py | 9 ++- 3 files changed, 75 insertions(+), 3 deletions(-) create mode 100644 pgpubsub/tests/migrations/0013_empty_pg_setting_handling.py diff --git a/pgpubsub/tests/migrations/0013_empty_pg_setting_handling.py b/pgpubsub/tests/migrations/0013_empty_pg_setting_handling.py new file mode 100644 index 0000000..3cd0617 --- /dev/null +++ b/pgpubsub/tests/migrations/0013_empty_pg_setting_handling.py @@ -0,0 +1,55 @@ +# Generated by Django 3.2.12 on 2023-11-28 09:15 + +from django.db import migrations +import pgtrigger.compiler +import pgtrigger.migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('tests', '0012_payload_context'), + ] + + operations = [ + pgtrigger.migrations.RemoveTrigger( + model_name='author', + name='pgpubsub_160cf', + ), + pgtrigger.migrations.RemoveTrigger( + model_name='child', + name='pgpubsub_89ef9', + ), + pgtrigger.migrations.RemoveTrigger( + model_name='childofabstract', + name='pgpubsub_b1c0b', + ), + pgtrigger.migrations.RemoveTrigger( + model_name='media', + name='pgpubsub_a83de', + ), + pgtrigger.migrations.RemoveTrigger( + model_name='post', + name='pgpubsub_72091', + ), + pgtrigger.migrations.AddTrigger( + model_name='author', + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_160cf', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "Author"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_160cf\', payload);\n \n perform pg_notify(\'pgpubsub_160cf\', payload::text);\n RETURN NEW;\n ', hash='a0cc1827e0c0c6faef61951e3494bbe69a8448ed', operation='INSERT', pgid='pgtrigger_pgpubsub_160cf_72a36', table='tests_author', when='AFTER')), + ), + pgtrigger.migrations.AddTrigger( + model_name='child', + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_89ef9', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "Child"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_89ef9\', payload);\n \n perform pg_notify(\'pgpubsub_89ef9\', payload::text);\n RETURN NEW;\n ', hash='cec5f55510398cb434023b5ac3d4c0795e0fd480', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_89ef9_92bc1', table='tests_child', when='AFTER')), + ), + pgtrigger.migrations.AddTrigger( + model_name='childofabstract', + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_b1c0b', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "ChildOfAbstract"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_b1c0b\', payload);\n \n perform pg_notify(\'pgpubsub_b1c0b\', payload::text);\n RETURN NEW;\n ', hash='70a23861f292d3be26d11f30fbbca98b781e5d57', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_b1c0b_c8531', table='tests_childofabstract', when='AFTER')), + ), + pgtrigger.migrations.AddTrigger( + model_name='media', + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_a83de', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "Media"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_a83de\', payload);\n \n perform pg_notify(\'pgpubsub_a83de\', payload::text);\n RETURN NEW;\n ', hash='e2105ac0a63a4c12537a35577d6bf396d256e6db', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_a83de_cacbb', table='tests_media', when='AFTER')), + ), + pgtrigger.migrations.AddTrigger( + model_name='post', + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_72091', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "Post"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_72091\', payload);\n \n perform pg_notify(\'pgpubsub_72091\', payload::text);\n RETURN NEW;\n ', hash='132438a8b48b5ee1766405ca35a1f32895eda92d', operation='DELETE', pgid='pgtrigger_pgpubsub_72091_67aeb', table='tests_post', when='AFTER')), + ), + ] diff --git a/pgpubsub/tests/test_payload_context.py b/pgpubsub/tests/test_payload_context.py index c28d481..65c1371 100644 --- a/pgpubsub/tests/test_payload_context.py +++ b/pgpubsub/tests/test_payload_context.py @@ -39,6 +39,20 @@ def test_notification_context_is_stored_in_payload(pg_connection, db): pg_connection.poll() assert 1 == len(pg_connection.notifies) +@pytest.mark.parametrize("db", [None, "default"]) +@pytest.mark.django_db(transaction=True) +def test_notification_context_is_cleared_after_transaction_end(pg_connection, db): + with atomic(): + pgpubsub.set_notification_context({'test_key': 'test-value'}, using=db) + + Media.objects.create(name='avatar.jpg', content_type='image/png', size=15000) + + stored_notification = Notification.from_channel(channel=MediaTriggerChannel).get() + assert stored_notification.payload['context'] == {} + + pg_connection.poll() + assert 1 == len(pg_connection.notifies) + @pytest.mark.django_db(transaction=True) def test_process_notifications_gets_all_notifications_by_default(pg_connection): diff --git a/pgpubsub/triggers.py b/pgpubsub/triggers.py index 8337604..dd96e20 100644 --- a/pgpubsub/triggers.py +++ b/pgpubsub/triggers.py @@ -19,6 +19,7 @@ def get_declare(self, model: Type[Model]): return [ ('payload', 'JSONB'), ('notification_context', 'JSONB'), + ('notification_context_text', 'TEXT'), ] def _pre_notify(self): @@ -29,9 +30,11 @@ def _build_payload(self, model): payload := '{{"app": "{model._meta.app_label}", "model": "{model.__name__}"}}'::jsonb; payload := jsonb_insert(payload, '{{old}}', COALESCE(to_jsonb(OLD), 'null')); payload := jsonb_insert(payload, '{{new}}', COALESCE(to_jsonb(NEW), 'null')); - SELECT COALESCE(current_setting('pgpubsub.notification_context', True), '{{}}')::jsonb - INTO notification_context; - payload := jsonb_insert(payload, '{{context}}', notification_context); + SELECT current_setting('pgpubsub.notification_context', True) INTO notification_context_text; + IF COALESCE(notification_context_text, '') = '' THEN + notification_context_text := '{{}}'; + END IF; + payload := jsonb_insert(payload, '{{context}}', notification_context_text::jsonb); ''' From 1f7cdbd7a4fa01e2709d8499d0f4a8c296c9fceb Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Thu, 30 Nov 2023 13:43:05 +0100 Subject: [PATCH 12/17] ignores tx aborted error --- pgpubsub/channel.py | 19 +++++++++++++++---- pgpubsub/tests/test_payload_context.py | 14 +++++++++++++- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/pgpubsub/channel.py b/pgpubsub/channel.py index 34739d3..39fda17 100644 --- a/pgpubsub/channel.py +++ b/pgpubsub/channel.py @@ -14,6 +14,7 @@ from django.core import serializers from django.core.serializers.json import DjangoJSONEncoder from django.db import connection, connections, models +from django.db.utils import InternalError registry = defaultdict(list) @@ -231,6 +232,10 @@ def _build_model_serializer_data(cls, payload: Dict, state: str): return model_data +TX_ABORTED_ERROR_MESSAGE = ( + 'current transaction is aborted, commands ignored until end of transaction block' +) + def set_notification_context( context: Dict[str, Any], using: Optional[str] = None ) -> None: @@ -239,10 +244,16 @@ def set_notification_context( else: conn = connection with conn.cursor() as cursor: - cursor.execute( - "SET LOCAL pgpubsub.notification_context = %s", - (json.dumps(context),) - ) + try: + cursor.execute( + "SET LOCAL pgpubsub.notification_context = %s", + (json.dumps(context),) + ) + except InternalError as e: + if TX_ABORTED_ERROR_MESSAGE in str(e): + return + else: + raise def locate_channel(channel): diff --git a/pgpubsub/tests/test_payload_context.py b/pgpubsub/tests/test_payload_context.py index 65c1371..c58a14a 100644 --- a/pgpubsub/tests/test_payload_context.py +++ b/pgpubsub/tests/test_payload_context.py @@ -4,7 +4,7 @@ import pytest import pgpubsub -from django.db import connections +from django.db import connection, connections from django.db.models import Q from django.db.transaction import atomic from pgpubsub.listen import process_notifications @@ -39,6 +39,18 @@ def test_notification_context_is_stored_in_payload(pg_connection, db): pg_connection.poll() assert 1 == len(pg_connection.notifies) + +def test_set_notification_context_is_noop_if_transaction_needs_rollback(db): + with atomic(): + try: + with connection.cursor() as cur: + cur.execute("invalid sql") + except Exception: + pass + pgpubsub.set_notification_context({'test_key': 'test-value'}) + connection.set_rollback(True) + + @pytest.mark.parametrize("db", [None, "default"]) @pytest.mark.django_db(transaction=True) def test_notification_context_is_cleared_after_transaction_end(pg_connection, db): From 4afba939b0935bce9a792e8c61bf449f21c814a9 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Thu, 8 Feb 2024 13:13:44 +0100 Subject: [PATCH 13/17] adds filter by payload context to listener recovery --- pgpubsub/listen.py | 3 ++- pgpubsub/tests/conftest.py | 2 +- pgpubsub/tests/connection.py | 5 +++++ pgpubsub/tests/test_core.py | 17 ++++++----------- pgpubsub/tests/test_payload_context.py | 21 +++++++++++++++++++++ 5 files changed, 35 insertions(+), 13 deletions(-) create mode 100644 pgpubsub/tests/connection.py diff --git a/pgpubsub/listen.py b/pgpubsub/listen.py index 7f8bbd3..44cbf8a 100644 --- a/pgpubsub/listen.py +++ b/pgpubsub/listen.py @@ -208,9 +208,10 @@ def validate(self): def process(self): logger.info(f'Processing all notifications for channel {self.channel_cls.name()} \n') + payload_filter = Q(channel=self.notification.channel) & get_extra_filter() notifications = ( Notification.objects.select_for_update( - skip_locked=True).filter(channel=self.notification.channel).iterator() + skip_locked=True).filter(payload_filter).iterator() ) logger.info(f'Found notifications: {notifications}') for notification in notifications: diff --git a/pgpubsub/tests/conftest.py b/pgpubsub/tests/conftest.py index ce3eb8a..dd16d3c 100644 --- a/pgpubsub/tests/conftest.py +++ b/pgpubsub/tests/conftest.py @@ -1,6 +1,6 @@ import pytest from django.db import connection -from pgpubsub.listen import listen_to_channels +from pgpubsub.listen import listen_to_channels @pytest.fixture() def pg_connection(): diff --git a/pgpubsub/tests/connection.py b/pgpubsub/tests/connection.py new file mode 100644 index 0000000..c50942b --- /dev/null +++ b/pgpubsub/tests/connection.py @@ -0,0 +1,5 @@ + +def simulate_listener_does_not_receive_notifications(pg_connection): + pg_connection.notifies = [] + pg_connection.poll() + assert 0 == len(pg_connection.notifies) diff --git a/pgpubsub/tests/test_core.py b/pgpubsub/tests/test_core.py index bda4740..d1ddd8a 100644 --- a/pgpubsub/tests/test_core.py +++ b/pgpubsub/tests/test_core.py @@ -16,6 +16,7 @@ AuthorTriggerChannel, MediaTriggerChannel, ) +from pgpubsub.tests.connection import simulate_listener_does_not_receive_notifications from pgpubsub.tests.listeners import post_reads_per_date_cache from pgpubsub.tests.models import Author, Media, Post @@ -117,12 +118,6 @@ def test_author_bulk_insert_notify(pg_connection): assert [author.pk for author in authors] == list(post_authors) -def _simulate_listener_does_not_receive_notifications(pg_connection): - pg_connection.notifies = [] - pg_connection.poll() - assert 0 == len(pg_connection.notifies) - - @pytest.mark.django_db(transaction=True) def test_process_stored_notifications(pg_connection): Author.objects.create(name='Billy') @@ -130,7 +125,7 @@ def test_process_stored_notifications(pg_connection): assert 2 == len(pg_connection.notifies) assert 2 == Notification.objects.count() assert 0 == Post.objects.count() - _simulate_listener_does_not_receive_notifications(pg_connection) + simulate_listener_does_not_receive_notifications(pg_connection) process_stored_notifications() pg_connection.poll() # One notification for each lockable channel @@ -148,7 +143,7 @@ def test_recover_notifications(pg_connection): assert 2 == len(pg_connection.notifies) assert 2 == Notification.objects.count() assert 0 == Post.objects.count() - _simulate_listener_does_not_receive_notifications(pg_connection) + simulate_listener_does_not_receive_notifications(pg_connection) with patch('pgpubsub.listen.POLL', False): listen(recover=True) pg_connection.poll() @@ -164,7 +159,7 @@ def test_recover_multiple_notifications(pg_connection): assert ENTITIES_COUNT == len(pg_connection.notifies) assert ENTITIES_COUNT == Notification.objects.count() assert 0 == Post.objects.count() - _simulate_listener_does_not_receive_notifications(pg_connection) + simulate_listener_does_not_receive_notifications(pg_connection) with patch('pgpubsub.listen.POLL', False): listen(recover=True) pg_connection.poll() @@ -191,7 +186,7 @@ def test_recover_notifications_after_exception(pg_connection): assert 3 == Notification.objects.count() assert 0 == Post.objects.count() - _simulate_listener_does_not_receive_notifications(pg_connection) + simulate_listener_does_not_receive_notifications(pg_connection) with patch('pgpubsub.listen.POLL', False): listen(recover=True) pg_connection.poll() @@ -218,7 +213,7 @@ def test_recover_multiple_notifications_after_exception(pg_connection): assert GOOD_COUNT + BROKEN_COUNT == Notification.objects.count() assert 0 == Post.objects.count() - _simulate_listener_does_not_receive_notifications(pg_connection) + simulate_listener_does_not_receive_notifications(pg_connection) with patch('pgpubsub.listen.POLL', False): listen(recover=True) pg_connection.poll() diff --git a/pgpubsub/tests/test_payload_context.py b/pgpubsub/tests/test_payload_context.py index c58a14a..71bbc7d 100644 --- a/pgpubsub/tests/test_payload_context.py +++ b/pgpubsub/tests/test_payload_context.py @@ -10,9 +10,11 @@ from pgpubsub.listen import process_notifications from pgpubsub.listeners import ListenerFilterProvider from pgpubsub.models import Notification +from pgpubsub.notify import process_stored_notifications from pgpubsub.tests.channels import ( MediaTriggerChannel, ) +from pgpubsub.tests.connection import simulate_listener_does_not_receive_notifications from pgpubsub.tests.models import Author, Media, Post @@ -96,6 +98,25 @@ def test_process_notifications_filters_out_nonmatching_notifications( assert 0 == Post.objects.filter(author__name='nonmatching').count() +@pytest.mark.django_db(transaction=True) +def test_process_notifications_recovery_filters_out_nonmatching_notifications( + pg_connection, settings +): + Author.objects.create(name='nonmatching') + with atomic(): + pgpubsub.set_notification_context({'test_key': 'test-value'}) + Author.objects.create(name='matching') + + settings.PGPUBSUB_LISTENER_FILTER = 'pgpubsub.tests.test_payload_context.TestListenerFilterProvider' + assert not Post.objects.exists() + + simulate_listener_does_not_receive_notifications(pg_connection) + process_stored_notifications() + process_notifications(pg_connection) + assert 1 == Post.objects.filter(author__name='matching').count() + assert 0 == Post.objects.filter(author__name='nonmatching').count() + + @pytest.mark.django_db(transaction=True) def test_payload_context_may_be_passed_to_listener_callback( pg_connection, settings From f51c288ed451a2cd25191136345db030584ac003 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Thu, 30 Nov 2023 17:32:14 +0100 Subject: [PATCH 14/17] check that there tx is ok from django perspective --- pgpubsub/channel.py | 2 ++ pgpubsub/tests/test_payload_context.py | 23 +++++++++++++++++++---- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/pgpubsub/channel.py b/pgpubsub/channel.py index 39fda17..9ff5108 100644 --- a/pgpubsub/channel.py +++ b/pgpubsub/channel.py @@ -243,6 +243,8 @@ def set_notification_context( conn = connections[using] else: conn = connection + if conn.needs_rollback: + return with conn.cursor() as cursor: try: cursor.execute( diff --git a/pgpubsub/tests/test_payload_context.py b/pgpubsub/tests/test_payload_context.py index 71bbc7d..f5dfd86 100644 --- a/pgpubsub/tests/test_payload_context.py +++ b/pgpubsub/tests/test_payload_context.py @@ -7,6 +7,7 @@ from django.db import connection, connections from django.db.models import Q from django.db.transaction import atomic +from django.db.utils import IntegrityError from pgpubsub.listen import process_notifications from pgpubsub.listeners import ListenerFilterProvider from pgpubsub.models import Notification @@ -42,15 +43,29 @@ def test_notification_context_is_stored_in_payload(pg_connection, db): assert 1 == len(pg_connection.notifies) -def test_set_notification_context_is_noop_if_transaction_needs_rollback(db): +def test_set_notification_context_is_noop_if_there_was_error_in_transaction(db): + def execute_some_errorneous_statement(): + with connection.cursor() as cur: + cur.execute("invalid sql") + with atomic(): try: - with connection.cursor() as cur: - cur.execute("invalid sql") + execute_some_errorneous_statement() except Exception: pass pgpubsub.set_notification_context({'test_key': 'test-value'}) - connection.set_rollback(True) + connection.set_rollback(True) # need this to rollback as commit would fail + +def test_set_notification_context_is_noop_if_transaction_needs_rollback(db): + def emulate_db_error_handling_in_django(): + # In many use cases in django the low level DB exception is handled + # and the fact that it happened is recorded on the connection with + # needs_rollback + connection.needs_rollback = True + + with atomic(): + emulate_db_error_handling_in_django() + pgpubsub.set_notification_context({'test_key': 'test-value'}) @pytest.mark.parametrize("db", [None, "default"]) From 0c1ed90a28f491e1c51f4644c4e3e2d1951beedc Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Thu, 8 Feb 2024 13:27:44 +0100 Subject: [PATCH 15/17] adds support for session bound notification context --- pgpubsub/channel.py | 15 ++++++- pgpubsub/tests/test_payload_context.py | 59 +++++++++++++++++++++++--- 2 files changed, 67 insertions(+), 7 deletions(-) diff --git a/pgpubsub/channel.py b/pgpubsub/channel.py index 9ff5108..bae63e8 100644 --- a/pgpubsub/channel.py +++ b/pgpubsub/channel.py @@ -245,10 +245,23 @@ def set_notification_context( conn = connection if conn.needs_rollback: return + use_tx_bound_notification_context = getattr( + settings, 'PGPUBSUB_TX_BOUND_NOTIFICATION_CONTEXT', False + ) + if use_tx_bound_notification_context and not conn.in_atomic_block: + raise RuntimeError( + 'Transaction bound context can be only set in atomic block. ' + 'Either start transaction with `atomic` or do not use transaction bound ' + 'payload context via PGPUBSUB_TX_BOUND_NOTIFICATION_CONTEXT=False' + ) with conn.cursor() as cursor: try: + if use_tx_bound_notification_context: + scope = 'LOCAL' + else: + scope = 'SESSION' cursor.execute( - "SET LOCAL pgpubsub.notification_context = %s", + f'SET {scope} pgpubsub.notification_context = %s', (json.dumps(context),) ) except InternalError as e: diff --git a/pgpubsub/tests/test_payload_context.py b/pgpubsub/tests/test_payload_context.py index f5dfd86..88f9527 100644 --- a/pgpubsub/tests/test_payload_context.py +++ b/pgpubsub/tests/test_payload_context.py @@ -29,11 +29,41 @@ def test_empty_notification_context_is_stored_in_payload_by_default(pg_connectio assert 1 == len(pg_connection.notifies) -@pytest.mark.parametrize("db", [None, "default"]) +@pytest.fixture +def clear_notification_context(): + yield + pgpubsub.set_notification_context({}) + + +@pytest.mark.parametrize("db_alias", [None, "default"]) +@pytest.mark.parametrize("tx_bound_context", [None, False]) +@pytest.mark.django_db(transaction=True) +def test_notification_context_is_stored_in_payload( + pg_connection, settings, db_alias, tx_bound_context, clear_notification_context +): + if tx_bound_context is None: + delattr(settings, 'PGPUBSUB_TX_BOUND_NOTIFICATION_CONTEXT') + else: + settings.PGPUBSUB_TX_BOUND_NOTIFICATION_CONTEXT = tx_bound_context + + pgpubsub.set_notification_context({'test_key': 'test-value'}, + using=db_alias) + Media.objects.create(name='avatar.jpg', content_type='image/png', size=15000) + + stored_notification = Notification.from_channel(channel=MediaTriggerChannel).get() + assert stored_notification.payload['context'] == {'test_key': 'test-value'} + + pg_connection.poll() + assert 1 == len(pg_connection.notifies) + + +@pytest.mark.parametrize("db_alias", [None, "default"]) @pytest.mark.django_db(transaction=True) -def test_notification_context_is_stored_in_payload(pg_connection, db): +def test_tx_bound_notification_context_is_stored_in_payload(pg_connection, settings, db_alias): + settings.PGPUBSUB_TX_BOUND_NOTIFICATION_CONTEXT = True with atomic(): - pgpubsub.set_notification_context({'test_key': 'test-value'}, using=db) + pgpubsub.set_notification_context({'test_key': 'test-value'}, + using=db_alias) Media.objects.create(name='avatar.jpg', content_type='image/png', size=15000) stored_notification = Notification.from_channel(channel=MediaTriggerChannel).get() @@ -43,11 +73,24 @@ def test_notification_context_is_stored_in_payload(pg_connection, db): assert 1 == len(pg_connection.notifies) -def test_set_notification_context_is_noop_if_there_was_error_in_transaction(db): +@pytest.mark.django_db(transaction=True) +def test_set_notification_context_raises_outside_of_tx_if_tx_bound_context_is_used( + settings +): + settings.PGPUBSUB_TX_BOUND_NOTIFICATION_CONTEXT = True + + with pytest.raises(RuntimeError): + pgpubsub.set_notification_context({'test_key': 'test-value'}) + + +def test_set_notification_context_is_noop_if_there_was_error_in_transaction( + db, settings +): def execute_some_errorneous_statement(): with connection.cursor() as cur: cur.execute("invalid sql") + settings.PGPUBSUB_TX_BOUND_NOTIFICATION_CONTEXT = True with atomic(): try: execute_some_errorneous_statement() @@ -56,13 +99,14 @@ def execute_some_errorneous_statement(): pgpubsub.set_notification_context({'test_key': 'test-value'}) connection.set_rollback(True) # need this to rollback as commit would fail -def test_set_notification_context_is_noop_if_transaction_needs_rollback(db): +def test_set_notification_context_is_noop_if_transaction_needs_rollback(db, settings): def emulate_db_error_handling_in_django(): # In many use cases in django the low level DB exception is handled # and the fact that it happened is recorded on the connection with # needs_rollback connection.needs_rollback = True + settings.PGPUBSUB_TX_BOUND_NOTIFICATION_CONTEXT = True with atomic(): emulate_db_error_handling_in_django() pgpubsub.set_notification_context({'test_key': 'test-value'}) @@ -70,7 +114,10 @@ def emulate_db_error_handling_in_django(): @pytest.mark.parametrize("db", [None, "default"]) @pytest.mark.django_db(transaction=True) -def test_notification_context_is_cleared_after_transaction_end(pg_connection, db): +def test_notification_context_is_cleared_after_transaction_end( + pg_connection, db, settings +): + settings.PGPUBSUB_TX_BOUND_NOTIFICATION_CONTEXT = True with atomic(): pgpubsub.set_notification_context({'test_key': 'test-value'}, using=db) From 645caa0ab3d46428622399b877cda4f101c753e7 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Thu, 8 Feb 2024 13:42:07 +0100 Subject: [PATCH 16/17] document PGPUBSUB_TX_BOUND_NOTIFICATION_CONTEXT option --- docs/payload_context.rst | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/payload_context.rst b/docs/payload_context.rst index cfd9e5b..e84cd98 100644 --- a/docs/payload_context.rst +++ b/docs/payload_context.rst @@ -31,7 +31,9 @@ passed using ``pgpubsub.set_notification_context`` function. set_notification_context({'some-key': 'some-value'}) -The setting is effective till the end of the current transanction. +The setting is effective till the connection is closed. Alternatively the setting +``PGPUBSUB_TX_BOUND_NOTIFICATION_CONTEXT`` can be used to clean the context at the end +of the current transanction. Filter by ``context`` field in the trigger listener From 1e1f64855911c38283c99addaca25d7c315996ee Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Wed, 14 Feb 2024 18:11:59 +0100 Subject: [PATCH 17/17] fixes review comments --- docs/payload_context.rst | 13 +++-- pgpubsub/models.py | 2 +- .../tests/migrations/0012_payload_context.py | 12 ++-- .../0013_empty_pg_setting_handling.py | 55 ------------------- pgpubsub/triggers.py | 1 - 5 files changed, 15 insertions(+), 68 deletions(-) delete mode 100644 pgpubsub/tests/migrations/0013_empty_pg_setting_handling.py diff --git a/docs/payload_context.rst b/docs/payload_context.rst index e84cd98..02864f1 100644 --- a/docs/payload_context.rst +++ b/docs/payload_context.rst @@ -31,16 +31,19 @@ passed using ``pgpubsub.set_notification_context`` function. set_notification_context({'some-key': 'some-value'}) -The setting is effective till the connection is closed. Alternatively the setting -``PGPUBSUB_TX_BOUND_NOTIFICATION_CONTEXT`` can be used to clean the context at the end -of the current transanction. +The setting is effective till the connection is closed. Alternatively the +setting ``PGPUBSUB_TX_BOUND_NOTIFICATION_CONTEXT=True`` can be used to clean +the context at the end of the current transanction. Filter by ``context`` field in the trigger listener --------------------------------------------------- -Define a class that implements ``ListenerFilterProvider`` protocol and set -option ``PGPUBSUB_LISTENER_FILTER`` to its fully qualified class name. +Note: that the filtering is currently supported only for stored notifications that is +only for channels with ``lock_notifications = True``. + +Define a class that implements the ``ListenerFilterProvider`` protocol and set +the option ``PGPUBSUB_LISTENER_FILTER`` to its fully qualified class name. .. code-block:: python diff --git a/pgpubsub/models.py b/pgpubsub/models.py index e27e0a4..5392b29 100644 --- a/pgpubsub/models.py +++ b/pgpubsub/models.py @@ -1,4 +1,4 @@ -from typing import Optional, Type +from typing import Type from django.db import models diff --git a/pgpubsub/tests/migrations/0012_payload_context.py b/pgpubsub/tests/migrations/0012_payload_context.py index 40251c4..4e7718a 100644 --- a/pgpubsub/tests/migrations/0012_payload_context.py +++ b/pgpubsub/tests/migrations/0012_payload_context.py @@ -1,4 +1,4 @@ -# Generated by Django 3.2.12 on 2023-11-16 13:00 +# Generated by Django 3.2.12 on 2024-02-14 11:11 from django.db import migrations import pgtrigger.compiler @@ -34,22 +34,22 @@ class Migration(migrations.Migration): ), pgtrigger.migrations.AddTrigger( model_name='author', - trigger=pgtrigger.compiler.Trigger(name='pgpubsub_160cf', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB;', func='\n \n payload := \'{"app": "tests", "model": "Author"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT COALESCE(current_setting(\'pgpubsub.notification_context\', True), \'{}\')::jsonb\n INTO notification_context;\n payload := jsonb_insert(payload, \'{context}\', notification_context);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_160cf\', payload);\n \n perform pg_notify(\'pgpubsub_160cf\', payload::text);\n RETURN NEW;\n ', hash='0c70137e92883450adaa16bcb45de73e23b82fb4', operation='INSERT', pgid='pgtrigger_pgpubsub_160cf_72a36', table='tests_author', when='AFTER')), + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_160cf', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "Author"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_160cf\', payload);\n \n perform pg_notify(\'pgpubsub_160cf\', payload::text);\n RETURN NEW;\n ', hash='7054162f85ef34f32990521dcbdacecf89344dee', operation='INSERT', pgid='pgtrigger_pgpubsub_160cf_72a36', table='tests_author', when='AFTER')), ), pgtrigger.migrations.AddTrigger( model_name='child', - trigger=pgtrigger.compiler.Trigger(name='pgpubsub_89ef9', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB;', func='\n \n payload := \'{"app": "tests", "model": "Child"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT COALESCE(current_setting(\'pgpubsub.notification_context\', True), \'{}\')::jsonb\n INTO notification_context;\n payload := jsonb_insert(payload, \'{context}\', notification_context);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_89ef9\', payload);\n \n perform pg_notify(\'pgpubsub_89ef9\', payload::text);\n RETURN NEW;\n ', hash='cf798ce24042b3016c4aaba8a644bdc707b13371', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_89ef9_92bc1', table='tests_child', when='AFTER')), + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_89ef9', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "Child"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_89ef9\', payload);\n \n perform pg_notify(\'pgpubsub_89ef9\', payload::text);\n RETURN NEW;\n ', hash='9977852f482c0f7dfe178edd75433a2c6146ea25', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_89ef9_92bc1', table='tests_child', when='AFTER')), ), pgtrigger.migrations.AddTrigger( model_name='childofabstract', - trigger=pgtrigger.compiler.Trigger(name='pgpubsub_b1c0b', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB;', func='\n \n payload := \'{"app": "tests", "model": "ChildOfAbstract"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT COALESCE(current_setting(\'pgpubsub.notification_context\', True), \'{}\')::jsonb\n INTO notification_context;\n payload := jsonb_insert(payload, \'{context}\', notification_context);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_b1c0b\', payload);\n \n perform pg_notify(\'pgpubsub_b1c0b\', payload::text);\n RETURN NEW;\n ', hash='d4ab1811b7c894c483fc6d5269db7e49ce51158a', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_b1c0b_c8531', table='tests_childofabstract', when='AFTER')), + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_b1c0b', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "ChildOfAbstract"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_b1c0b\', payload);\n \n perform pg_notify(\'pgpubsub_b1c0b\', payload::text);\n RETURN NEW;\n ', hash='06c20d9ba80bfe58d86663a61cf7d6a826d308f2', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_b1c0b_c8531', table='tests_childofabstract', when='AFTER')), ), pgtrigger.migrations.AddTrigger( model_name='media', - trigger=pgtrigger.compiler.Trigger(name='pgpubsub_a83de', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB;', func='\n \n payload := \'{"app": "tests", "model": "Media"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT COALESCE(current_setting(\'pgpubsub.notification_context\', True), \'{}\')::jsonb\n INTO notification_context;\n payload := jsonb_insert(payload, \'{context}\', notification_context);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_a83de\', payload);\n \n perform pg_notify(\'pgpubsub_a83de\', payload::text);\n RETURN NEW;\n ', hash='4a793ae41f14dbe46316ff95f0517dab16653ce5', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_a83de_cacbb', table='tests_media', when='AFTER')), + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_a83de', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "Media"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_a83de\', payload);\n \n perform pg_notify(\'pgpubsub_a83de\', payload::text);\n RETURN NEW;\n ', hash='4270742f2cb8d4a8c5c8cfde41fc25b2e69d327a', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_a83de_cacbb', table='tests_media', when='AFTER')), ), pgtrigger.migrations.AddTrigger( model_name='post', - trigger=pgtrigger.compiler.Trigger(name='pgpubsub_72091', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB;', func='\n \n payload := \'{"app": "tests", "model": "Post"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT COALESCE(current_setting(\'pgpubsub.notification_context\', True), \'{}\')::jsonb\n INTO notification_context;\n payload := jsonb_insert(payload, \'{context}\', notification_context);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_72091\', payload);\n \n perform pg_notify(\'pgpubsub_72091\', payload::text);\n RETURN NEW;\n ', hash='f9c2be3e92c93fd272970de09dc7e94e5ddd02a6', operation='DELETE', pgid='pgtrigger_pgpubsub_72091_67aeb', table='tests_post', when='AFTER')), + trigger=pgtrigger.compiler.Trigger(name='pgpubsub_72091', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "Post"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_72091\', payload);\n \n perform pg_notify(\'pgpubsub_72091\', payload::text);\n RETURN NEW;\n ', hash='aecde4b8e0a62d2846b181438259a0737fcd6afb', operation='DELETE', pgid='pgtrigger_pgpubsub_72091_67aeb', table='tests_post', when='AFTER')), ), ] diff --git a/pgpubsub/tests/migrations/0013_empty_pg_setting_handling.py b/pgpubsub/tests/migrations/0013_empty_pg_setting_handling.py deleted file mode 100644 index 3cd0617..0000000 --- a/pgpubsub/tests/migrations/0013_empty_pg_setting_handling.py +++ /dev/null @@ -1,55 +0,0 @@ -# Generated by Django 3.2.12 on 2023-11-28 09:15 - -from django.db import migrations -import pgtrigger.compiler -import pgtrigger.migrations - - -class Migration(migrations.Migration): - - dependencies = [ - ('tests', '0012_payload_context'), - ] - - operations = [ - pgtrigger.migrations.RemoveTrigger( - model_name='author', - name='pgpubsub_160cf', - ), - pgtrigger.migrations.RemoveTrigger( - model_name='child', - name='pgpubsub_89ef9', - ), - pgtrigger.migrations.RemoveTrigger( - model_name='childofabstract', - name='pgpubsub_b1c0b', - ), - pgtrigger.migrations.RemoveTrigger( - model_name='media', - name='pgpubsub_a83de', - ), - pgtrigger.migrations.RemoveTrigger( - model_name='post', - name='pgpubsub_72091', - ), - pgtrigger.migrations.AddTrigger( - model_name='author', - trigger=pgtrigger.compiler.Trigger(name='pgpubsub_160cf', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "Author"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_160cf\', payload);\n \n perform pg_notify(\'pgpubsub_160cf\', payload::text);\n RETURN NEW;\n ', hash='a0cc1827e0c0c6faef61951e3494bbe69a8448ed', operation='INSERT', pgid='pgtrigger_pgpubsub_160cf_72a36', table='tests_author', when='AFTER')), - ), - pgtrigger.migrations.AddTrigger( - model_name='child', - trigger=pgtrigger.compiler.Trigger(name='pgpubsub_89ef9', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "Child"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_89ef9\', payload);\n \n perform pg_notify(\'pgpubsub_89ef9\', payload::text);\n RETURN NEW;\n ', hash='cec5f55510398cb434023b5ac3d4c0795e0fd480', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_89ef9_92bc1', table='tests_child', when='AFTER')), - ), - pgtrigger.migrations.AddTrigger( - model_name='childofabstract', - trigger=pgtrigger.compiler.Trigger(name='pgpubsub_b1c0b', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "ChildOfAbstract"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_b1c0b\', payload);\n \n perform pg_notify(\'pgpubsub_b1c0b\', payload::text);\n RETURN NEW;\n ', hash='70a23861f292d3be26d11f30fbbca98b781e5d57', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_b1c0b_c8531', table='tests_childofabstract', when='AFTER')), - ), - pgtrigger.migrations.AddTrigger( - model_name='media', - trigger=pgtrigger.compiler.Trigger(name='pgpubsub_a83de', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "Media"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_a83de\', payload);\n \n perform pg_notify(\'pgpubsub_a83de\', payload::text);\n RETURN NEW;\n ', hash='e2105ac0a63a4c12537a35577d6bf396d256e6db', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_a83de_cacbb', table='tests_media', when='AFTER')), - ), - pgtrigger.migrations.AddTrigger( - model_name='post', - trigger=pgtrigger.compiler.Trigger(name='pgpubsub_72091', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE payload JSONB; notification_context JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "Post"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_72091\', payload);\n \n perform pg_notify(\'pgpubsub_72091\', payload::text);\n RETURN NEW;\n ', hash='132438a8b48b5ee1766405ca35a1f32895eda92d', operation='DELETE', pgid='pgtrigger_pgpubsub_72091_67aeb', table='tests_post', when='AFTER')), - ), - ] diff --git a/pgpubsub/triggers.py b/pgpubsub/triggers.py index dd96e20..ebe0d9b 100644 --- a/pgpubsub/triggers.py +++ b/pgpubsub/triggers.py @@ -18,7 +18,6 @@ def get_func(self, model: Type[Model]): def get_declare(self, model: Type[Model]): return [ ('payload', 'JSONB'), - ('notification_context', 'JSONB'), ('notification_context_text', 'TEXT'), ]