Skip to content

Commit

Permalink
feat: add delay queues support (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
allisson authored Nov 19, 2022
1 parent 6d766e6 commit 62b5db6
Show file tree
Hide file tree
Showing 11 changed files with 447 additions and 206 deletions.
129 changes: 129 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Simple queue system based on FastAPI and PostgreSQL.
- Message filtering support.
- Dead queue support.
- Redrive support (move messages between queues).
- Delay queues support.
- Prometheus metrics support.
- Simplicity, it does the minimum necessary, it will not have an authentication/permission scheme among other things.

Expand Down Expand Up @@ -148,6 +149,7 @@ content-type: application/json
"message_retention_seconds":1209600,
"message_filters":null,
"message_max_deliveries":null,
"delivery_delay_seconds":null,
"created_at":"2022-10-05T21:51:44.684743",
"updated_at":"2022-10-05T21:51:44.684743"
}
Expand Down Expand Up @@ -353,6 +355,7 @@ content-type: application/json
]
},
"message_max_deliveries":null,
"delivery_delay_seconds":null,
"created_at":"2022-10-05T23:42:10.322336",
"updated_at":"2022-10-05T23:42:10.322336"
}
Expand Down Expand Up @@ -479,6 +482,7 @@ content-type: application/json
"message_retention_seconds":1209600,
"message_filters":null,
"message_max_deliveries":null,
"delivery_delay_seconds":null,
"created_at":"2022-10-06T00:33:04.707829",
"updated_at":"2022-10-06T00:33:04.707829"
}
Expand Down Expand Up @@ -511,6 +515,7 @@ content-type: application/json
"message_retention_seconds":1209600,
"message_filters":null,
"message_max_deliveries":2,
"delivery_delay_seconds":null,
"created_at":"2022-10-05T21:51:44.684743",
"updated_at":"2022-10-06T01:05:56.066023"
}
Expand Down Expand Up @@ -747,6 +752,130 @@ content-type: application/json
{"num_undelivered_messages":2,"oldest_unacked_message_age_seconds":787}
```

## Delay queues

Delay queues let you postpone the delivery of new messages to consumers for a number of seconds.

```bash
curl -i -X 'POST' \
'http://localhost:8000/queues' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"id": "delayed-events",
"topic_id": "events",
"ack_deadline_seconds": 30,
"message_retention_seconds": 1209600,
"delivery_delay_seconds": 30
}'

HTTP/1.1 201 Created
date: Sat, 19 Nov 2022 17:53:42 GMT
server: uvicorn
content-length: 291
content-type: application/json

{
"id":"delayed-events",
"topic_id":"events",
"dead_queue_id":null,
"ack_deadline_seconds":30,
"message_retention_seconds":1209600,
"message_filters":null,
"message_max_deliveries":null,
"delivery_delay_seconds":30,
"created_at":"2022-11-19T17:53:42.858140",
"updated_at":"2022-11-19T17:53:42.858140"
}
```

```bash
curl -i -X 'POST' \
'http://localhost:8000/topics/events/messages' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"data": {"event_name": "event3", "success": true},
"attributes": {"event_name": "event3"}
}'

HTTP/1.1 201 Created
date: Sat, 19 Nov 2022 18:02:08 GMT
server: uvicorn
content-length: 359
content-type: application/json

{
"data":[
{
"id":"ed42463c-7725-4c3d-9de0-142bb0074be6",
"queue_id":"delayed-events",
"data":{
"event_name":"event3",
"success":true
},
"attributes":{
"event_name":"event3"
},
"delivery_attempts":0,
"expired_at":"2022-12-03T18:02:09.435449",
"scheduled_at":"2022-11-19T18:02:39.435449",
"created_at":"2022-11-19T18:02:09.435449",
"updated_at":"2022-11-19T18:02:09.435449"
}
]
}
```

```bash
curl -i -X 'GET' \
'http://localhost:8000/queues/delayed-events/messages' \
-H 'accept: application/json'

HTTP/1.1 200 OK
date: Sat, 19 Nov 2022 18:02:24 GMT
server: uvicorn
content-length: 11
content-type: application/json

{"data":[]}
```

Wait for 30 seconds and try again:

```bash
curl -i -X 'GET' \
'http://localhost:8000/queues/delayed-events/messages' \
-H 'accept: application/json'

HTTP/1.1 200 OK
date: Sat, 19 Nov 2022 18:02:58 GMT
server: uvicorn
content-length: 359
content-type: application/json

{
"data":[
{
"id":"ed42463c-7725-4c3d-9de0-142bb0074be6",
"queue_id":"delayed-events",
"data":{
"success":true,
"event_name":"event3"
},
"attributes":{
"event_name":"event3"
},
"delivery_attempts":1,
"expired_at":"2022-12-03T18:02:09.435449",
"scheduled_at":"2022-11-19T18:03:28.853622",
"created_at":"2022-11-19T18:02:09.435449",
"updated_at":"2022-11-19T18:02:58.853622"
}
]
}
```

## Prometheus metrics

You can enable prometheus metrics using the environment variable `fastqueue_enable_prometheus_metrics='true'`.
Expand Down
28 changes: 28 additions & 0 deletions alembic/versions/002_add_delivery_delay_seconds.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""Auto generated
Revision ID: 4c4cb56442a9
Revises: fe94d60449f2
Create Date: 2022-11-19 14:16:04.956922
"""
import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision = "4c4cb56442a9"  # unique id of this migration
down_revision = "fe94d60449f2"  # parent migration in the linear chain
branch_labels = None  # no named branch for this revision
depends_on = None  # no cross-branch dependencies


def upgrade() -> None:
    """Add the nullable ``delivery_delay_seconds`` column to the ``queues`` table."""
    delay_column = sa.Column("delivery_delay_seconds", sa.Integer(), nullable=True)
    op.add_column("queues", delay_column)


def downgrade() -> None:
    """Revert the migration by dropping ``delivery_delay_seconds`` from ``queues``."""
    op.drop_column("queues", "delivery_delay_seconds")
2 changes: 2 additions & 0 deletions env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ fastqueue_max_message_retention_seconds='1209600'
fastqueue_min_message_max_deliveries='1'
fastqueue_max_message_max_deliveries='1000'
fastqueue_queue_cleanup_interval_seconds='60'
fastqueue_min_delivery_delay_seconds='1'
fastqueue_max_delivery_delay_seconds='900'

fastqueue_enable_prometheus_metrics='false'
2 changes: 2 additions & 0 deletions env.sample.default
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ fastqueue_max_message_retention_seconds='1209600'
fastqueue_min_message_max_deliveries='1'
fastqueue_max_message_max_deliveries='1000'
fastqueue_queue_cleanup_interval_seconds='60'
fastqueue_min_delivery_delay_seconds='1'
fastqueue_max_delivery_delay_seconds='900'

fastqueue_enable_prometheus_metrics='false'
2 changes: 2 additions & 0 deletions fastqueue/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class Settings(BaseSettings):
min_message_max_deliveries: int = 1
max_message_max_deliveries: int = 1000
queue_cleanup_interval_seconds: int = 60
min_delivery_delay_seconds: int = 1
max_delivery_delay_seconds: int = 900

# prometheus metrics
enable_prometheus_metrics: bool = False
Expand Down
1 change: 1 addition & 0 deletions fastqueue/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class Queue(Base):
message_retention_seconds = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
message_filters = sqlalchemy.Column(postgresql.JSONB, nullable=True)
message_max_deliveries = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)
delivery_delay_seconds = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)
created_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False)
updated_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False)

Expand Down
7 changes: 7 additions & 0 deletions fastqueue/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ class CreateQueueSchema(Schema):
message_max_deliveries: int | None = Field(
None, ge=settings.min_message_max_deliveries, le=settings.max_message_max_deliveries
)
delivery_delay_seconds: int | None = Field(
None, ge=settings.min_delivery_delay_seconds, le=settings.max_delivery_delay_seconds
)

@root_validator()
def message_max_deliveries_is_required_for_dead_queue_id(cls, values):
Expand All @@ -74,6 +77,9 @@ class UpdateQueueSchema(Schema):
message_max_deliveries: int | None = Field(
None, ge=settings.min_message_max_deliveries, le=settings.max_message_max_deliveries
)
delivery_delay_seconds: int | None = Field(
None, ge=settings.min_delivery_delay_seconds, le=settings.max_delivery_delay_seconds
)

@root_validator()
def message_max_deliveries_is_required_for_dead_queue_id(cls, values):
Expand All @@ -88,6 +94,7 @@ class QueueSchema(Schema):
message_retention_seconds: int
message_filters: dict[str, list[str]] | None
message_max_deliveries: int | None
delivery_delay_seconds: int | None
created_at: datetime
updated_at: datetime

Expand Down
17 changes: 14 additions & 3 deletions fastqueue/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def create(cls, data: CreateQueueSchema, session: Session) -> QueueSchema:
message_retention_seconds=data.message_retention_seconds,
message_filters=data.message_filters,
message_max_deliveries=data.message_max_deliveries,
delivery_delay_seconds=data.delivery_delay_seconds,
created_at=now,
updated_at=now,
)
Expand All @@ -143,6 +144,7 @@ def update(cls, id: str, data: UpdateQueueSchema, session: Session) -> QueueSche
queue.message_retention_seconds = data.message_retention_seconds
queue.message_filters = data.message_filters
queue.message_max_deliveries = data.message_max_deliveries
queue.delivery_delay_seconds = data.delivery_delay_seconds
queue.created_at = queue.created_at
queue.updated_at = datetime.utcnow()
session.commit()
Expand Down Expand Up @@ -212,11 +214,14 @@ def _cleanup_move_messages_to_dead_queue(cls, queue: QueueSchema, session: Sessi
Message.delivery_attempts >= queue.message_max_deliveries,
]
now = datetime.utcnow()
scheduled_at = now
if dead_queue.delivery_delay_seconds is not None:
scheduled_at = now + timedelta(seconds=dead_queue.delivery_delay_seconds)
update_data = {
"queue_id": queue.dead_queue_id,
"delivery_attempts": 0,
"expired_at": now + timedelta(seconds=dead_queue.message_retention_seconds),
"scheduled_at": now,
"scheduled_at": scheduled_at,
"updated_at": now,
}
session.query(Message).filter(*delivery_attempts_filter).update(update_data)
Expand All @@ -235,12 +240,15 @@ def redrive(cls, id: str, data: RedriveQueueSchema, session: Session) -> None:
queue = get_model(model=Queue, filters={"id": id}, session=session)
destination_queue = get_model(model=Queue, filters={"id": data.destination_queue_id}, session=session)
now = datetime.utcnow()
scheduled_at = now
if destination_queue.delivery_delay_seconds is not None:
scheduled_at = now + timedelta(seconds=destination_queue.delivery_delay_seconds)
filters = get_filters_for_consume(queue, now)
update_data = {
"queue_id": destination_queue.id,
"delivery_attempts": 0,
"expired_at": now + timedelta(seconds=destination_queue.message_retention_seconds),
"scheduled_at": now,
"scheduled_at": scheduled_at,
"updated_at": now,
}
session.query(Message).filter(*filters).update(update_data)
Expand Down Expand Up @@ -279,14 +287,17 @@ def create(cls, topic_id: str, data: CreateMessageSchema, session: Session) -> L
continue

now = datetime.utcnow()
scheduled_at = now
if queue.delivery_delay_seconds is not None:
scheduled_at = now + timedelta(seconds=queue.delivery_delay_seconds)
message = Message(
id=uuid.uuid4().hex,
queue_id=queue.id,
data=data.data,
attributes=data.attributes,
delivery_attempts=0,
expired_at=now + timedelta(seconds=queue.message_retention_seconds),
scheduled_at=now,
scheduled_at=scheduled_at,
created_at=now,
updated_at=now,
)
Expand Down
Loading

0 comments on commit 62b5db6

Please sign in to comment.