Skip to content

Commit

Permalink
create Django transport and channel
Browse files Browse the repository at this point in the history
  • Loading branch information
thuibr committed Jan 19, 2025
1 parent a0175b0 commit 4df3ff8
Show file tree
Hide file tree
Showing 11 changed files with 363 additions and 1 deletion.
3 changes: 2 additions & 1 deletion docs/includes/introduction.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ Transport Comparison
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
| *Pyro* | Virtual | Yes | Yes [#f1]_ | No | No | No |
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
| *Django* | Virtual | Yes | Yes | Yes | No | No |
+---------------+----------+------------+------------+---------------+--------------+-----------------------+


.. [#f1] Declarations only kept in memory, so exchanges/queues
Expand Down Expand Up @@ -264,4 +266,3 @@ There are some concepts you should be familiar with before starting:
zero or more words. For example `"*.stock.#"` matches the
routing keys `"usd.stock"` and `"eur.stock.db"` but not
`"stock.nasdaq"`.

3 changes: 3 additions & 0 deletions docs/userguide/connections.rst
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ All of these are valid URLs:
# Using Pyro with name server running on 'localhost'
pyro://localhost/kombu.broker
# Using Django
django:///
The query part of the URL can also be used to set options, e.g.:

Expand Down
1 change: 1 addition & 0 deletions kombu/transport/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def supports_librabbitmq() -> bool | None:
'azureservicebus': 'kombu.transport.azureservicebus:Transport',
'pyro': 'kombu.transport.pyro:Transport',
'gcpubsub': 'kombu.transport.gcpubsub:Transport',
'django': 'kombu.transport.django_kombu.transport.Transport',
}

_transport_cache = {}
Expand Down
9 changes: 9 additions & 0 deletions kombu/transport/django_kombu/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from __future__ import annotations

from django.apps import AppConfig


class KombuConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "django_kombu"
label = "kombu"
106 changes: 106 additions & 0 deletions kombu/transport/django_kombu/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Generated by Django 4.2.17 on 2025-01-19 17:08

from __future__ import annotations

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):

initial = True

dependencies = []

operations = [
migrations.CreateModel(
name="Exchange",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("name", models.CharField(max_length=200, unique=True)),
],
),
migrations.CreateModel(
name="Queue",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("name", models.CharField(max_length=200, unique=True)),
],
),
migrations.CreateModel(
name="Message",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("visible", models.BooleanField(db_index=True, default=True)),
(
"sent_at",
models.DateTimeField(auto_now_add=True, db_index=True, null=True),
),
("message", models.TextField()),
("version", models.PositiveIntegerField(default=1)),
(
"queue",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="messages",
to="django_kombu.queue",
),
),
],
),
migrations.CreateModel(
name="Binding",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("routing_key", models.CharField(max_length=255, null=True)),
(
"exchange",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="bindings",
to="django_kombu.exchange",
),
),
(
"queue",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="bindings",
to="django_kombu.queue",
),
),
],
),
]
Empty file.
39 changes: 39 additions & 0 deletions kombu/transport/django_kombu/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from __future__ import annotations

from django.db import models


class Queue(models.Model):
name = models.CharField(max_length=200, unique=True)

def __str__(self):
return self.name


class Message(models.Model):
visible = models.BooleanField(default=True, db_index=True)
sent_at = models.DateTimeField(null=True, db_index=True, auto_now_add=True)
message = models.TextField(null=False)
version = models.PositiveIntegerField(null=False, default=1)
queue = models.ForeignKey(Queue, on_delete=models.CASCADE, related_name="messages")

def __str__(self):
return f"{self.sent_at} {self.message} {self.queue_id}"


class Exchange(models.Model):
name = models.CharField(max_length=200, unique=True)

def __str__(self):
return f"{self.name}"


class Binding(models.Model):
queue = models.ForeignKey(Queue, on_delete=models.CASCADE, related_name="bindings")
exchange = models.ForeignKey(
Exchange, on_delete=models.CASCADE, related_name="bindings"
)
routing_key = models.CharField(max_length=255, null=True)

def __str__(self):
return f"Binding: {self.queue.name} -> {self.exchange.name} with routing_key {self.routing_key}"
122 changes: 122 additions & 0 deletions kombu/transport/django_kombu/transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
"""Django Transport module for kombu.
Kombu transport using Django ORM as the message store.
Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: No
Connection String
=================
.. code-block::
django:///
"""

from __future__ import annotations

import json
import logging
from queue import Empty

from django.db import transaction

from kombu.transport import virtual
from kombu.transport.django_kombu.models import (Binding, Exchange, Message,
Queue)

VERSION = (0, 0, 1)
__version__ = ".".join(map(str, VERSION))

logger = logging.getLogger(__name__)


class Channel(virtual.Channel):
"""The channel class."""

supports_fanout = True
sep = "-"

def _open(self):
pass

def _put(self, queue, message, **kwargs):
queue_instance, _ = Queue.objects.get_or_create(name=queue)
queue_instance.messages.create(message=json.dumps(message))

def _get(self, queue, timeout=None):
with transaction.atomic():
try:
queue_instance = Queue.objects.get(name=queue)
except Queue.DoesNotExist:
raise Empty()
message_instance = (
Message.objects.select_for_update(skip_locked=True)
.filter(visible=True, queue=queue_instance)
.order_by("sent_at", "id")
.first()
)
if message_instance is not None:
message_instance.visible = False
message_instance.save(update_fields=["visible"])
msg = message_instance.message
return json.loads(msg)
raise Empty()

def _purge(self, queue):
try:
queue_instance = Queue.objects.get(name=queue)
except Queue.DoesNotExist:
return
queue_instance.messages.all().delete()

def _queue_bind(self, exchange, routing_key, pattern, queue):
queue_instance, _ = Queue.objects.get_or_create(name=queue)
exchange_instance, _ = Exchange.objects.get_or_create(name=exchange)
binding, created = Binding.objects.get_or_create(
queue=queue_instance,
exchange=exchange_instance,
routing_key=routing_key,
)
if created:
logger.debug(f"Binding created: {binding}")
else:
logger.debug(f"Binding already exists: {binding}")

def _put_fanout(self, exchange, message, routing_key, **kwargs):
exchange_instance = Exchange.objects.get(name=exchange)
queues = Queue.objects.filter(
bindings__exchange=exchange_instance, bindings__routing_key=routing_key
)
logger.debug(
f"Found {len(queues)} queues bound to fanout exchange {exchange_instance.name}"
)
for queue in queues:
# Publish the message to each bound queue
logger.debug(f"Publishing message to fanout queue: {queue.name}")
self._put(queue.name, message)

def get_table(self, exchange):
exchange_instance = Exchange.objects.get(name=exchange)
bindings = exchange_instance.bindings.all()
return [(binding.routing_key, "", binding.queue.name) for binding in bindings]


class Transport(virtual.Transport):
"""The transport class"""

Channel = Channel

can_parse_url = True
driver_type = "django"
driver_name = "django"

implements = virtual.Transport.implements.extend(
exchange_type=frozenset(["direct", "topic", "fanout"])
)
1 change: 1 addition & 0 deletions requirements/extras/django.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
django>=4.2.18
1 change: 1 addition & 0 deletions requirements/test-ci.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ urllib3>=1.26.16; sys_platform != 'win32'
-r extras/sqlalchemy.txt
-r extras/etcd.txt
-r extras/gcpubsub.txt
-r extras/django.txt
79 changes: 79 additions & 0 deletions t/unit/transport/test_django.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from __future__ import annotations

from queue import Empty
from unittest.mock import patch

import django
import pytest
from django.conf import settings
from django.core.management import call_command

from kombu import Connection

settings.configure(
INSTALLED_APPS=("kombu.transport.django_kombu",),
DATABASES={"default": {"ENGINE": "django.db.backends.sqlite3", "NAME": ":memory:"}},
)
django.setup()
call_command("migrate", database="default")

# Need to import after setting up Django
from kombu.transport.django_kombu.models import Message, Queue # noqa: E402


@pytest.fixture
def channel():
conn = Connection("django:///")
conn.connect()
channel = conn.channel()

yield channel

channel._purge("celery")
conn.release()


def test_url_parser():
with patch("kombu.transport.django_kombu.transport.Channel._open"):
url = "django:///"
Connection(url).connect()


def test_simple_queueing(channel):
channel._put("celery", "DATA_SIMPLE_QUEUEING")
assert channel._get("celery") == "DATA_SIMPLE_QUEUEING"


def test_queueing_multiple(channel):
channel._put("celery", "DATA_SIMPLE_QUEUEING")
channel._put("celery", "DATA_SIMPLE_QUEUEING2")

assert channel._get("celery") == "DATA_SIMPLE_QUEUEING"
assert channel._get("celery") == "DATA_SIMPLE_QUEUEING2"


def test__get_queue_that_does_not_exist(channel):
with pytest.raises(Empty):
channel._get("queue_that_does_not_exist")


def test__get_queue_when_empty(channel):
channel._put("celery", "MESSAGE")
channel._get("celery")
with pytest.raises(Empty):
channel._get("celery")


def test__purge_queue_that_does_not_exist(channel):
channel._purge("queue_that_does_not_exist") # does not raise an Exception


def test_queue_name(channel):
queue = Queue.objects.create(name="celery_queue")
assert "celery_queue" in str(queue)


def test_message_name(channel):
queue = Queue.objects.create(name="another_queue")
message = Message.objects.create(message="message", queue=queue)
assert "message" in str(message)

0 comments on commit 4df3ff8

Please sign in to comment.