Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move worker script #97

Merged
merged 2 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ First, you may want to take a look at the project structure and understand what
├── app # Main application directory.
│ ├── __init__.py # Initialization file for the app package.
│ ├── main.py # Main entry point of the FastAPI application.
├── worker.py # Worker script for background tasks.
│ │
│ ├── api # Folder containing API-related logic.
│ │ ├── __init__.py
Expand Down Expand Up @@ -598,11 +598,16 @@ First, you may want to take a look at the project structure and understand what
│ │ │ ├── cache_exceptions.py # Exceptions related to cache operations.
│ │ │ └── http_exceptions.py # HTTP-related exceptions.
│ │ │
│ │ └── utils # Utility functions and helpers.
│ │ ├── utils # Utility functions and helpers.
│ │ │ ├── __init__.py
│ │ │ ├── cache.py # Cache-related utilities.
│ │ │ ├── queue.py # Utilities for task queue management.
│ │ │ └── rate_limit.py # Rate limiting utilities.
│ │ │
│ │ └── worker # Worker script for background tasks.
│ │ ├── __init__.py
│ │ ├── cache.py # Cache-related utilities.
│ │ ├── queue.py # Utilities for task queue management.
│ │ └── rate_limit.py # Rate limiting utilities.
│ │ ├── settings.py # Worker configuration and settings.
│ │ └── functions.py # Async task definitions and management.
│ │
│ ├── crud # CRUD operations for the application.
│ │ ├── __init__.py
Expand Down Expand Up @@ -1242,7 +1247,7 @@ For `client-side caching`, all you have to do is let the `Settings` class define

### 5.10 ARQ Job Queues

Create the background task in `app/worker.py`:
Create the background task in `app/core/worker/functions.py`:

```python
...
Expand All @@ -1252,7 +1257,7 @@ async def sample_background_task(ctx, name: str) -> str:
return f"Task {name} is complete!"
```

Then add the function to the `WorkerSettings` class `functions` variable:
Then add the function to the `WorkerSettings` class `functions` variable in `app/core/worker/settings.py`:

```python
# -------- class --------
Expand Down
Empty file added src/app/core/worker/__init__.py
Empty file.
15 changes: 0 additions & 15 deletions src/app/worker.py → src/app/core/worker/functions.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
import asyncio
import logging
import uvloop
from arq.connections import RedisSettings
from arq.worker import Worker

from .core.config import settings

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")

REDIS_QUEUE_HOST = settings.REDIS_QUEUE_HOST
REDIS_QUEUE_PORT = settings.REDIS_QUEUE_PORT


# -------- background tasks --------
async def sample_background_task(ctx: Worker, name: str) -> str:
Expand All @@ -27,12 +21,3 @@ async def startup(ctx: Worker) -> None:

async def shutdown(ctx: Worker) -> None:
logging.info("Worker end")


# -------- class --------
class WorkerSettings:
functions = [sample_background_task]
redis_settings = RedisSettings(host=REDIS_QUEUE_HOST, port=REDIS_QUEUE_PORT)
on_startup = startup
on_shutdown = shutdown
handle_signals = False
14 changes: 14 additions & 0 deletions src/app/core/worker/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from arq.connections import RedisSettings
from ...core.config import settings
from .functions import sample_background_task, startup, shutdown

REDIS_QUEUE_HOST = settings.REDIS_QUEUE_HOST
REDIS_QUEUE_PORT = settings.REDIS_QUEUE_PORT


class WorkerSettings:
functions = [sample_background_task]
redis_settings = RedisSettings(host=REDIS_QUEUE_HOST, port=REDIS_QUEUE_PORT)
on_startup = startup
on_shutdown = shutdown
handle_signals = False