Skip to content

Commit

Permalink
✨ Connections manager for SQLAlchemy module (#135)
Browse files Browse the repository at this point in the history
  • Loading branch information
perdy authored and migduroli committed Sep 3, 2024
1 parent a2ef280 commit 869711d
Show file tree
Hide file tree
Showing 11 changed files with 899 additions and 239 deletions.
147 changes: 117 additions & 30 deletions flama/ddd/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from flama.ddd import types
from flama.ddd.repositories import AbstractRepository, SQLAlchemyRepository
from flama.exceptions import ApplicationError

if t.TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncConnection
Expand All @@ -17,95 +18,181 @@


class WorkerType(abc.ABCMeta):
"""Metaclass for workers.
It will gather all the repositories defined in the class as class attributes as a single dictionary under the name
`_repositories`.
"""

def __new__(mcs, name: str, bases: t.Tuple[type], namespace: t.Dict[str, t.Any]):
namespace["_repositories"] = types.Repositories(
{
k: v
for k, v in namespace.get("__annotations__", {}).items()
if inspect.isclass(v) and issubclass(v, AbstractRepository)
if mcs._is_abstract_worker(namespace) and "__annotations__" in namespace:
namespace["_repositories"] = types.Repositories(
{
k: v
for k, v in namespace["__annotations__"].items()
if inspect.isclass(v) and issubclass(v, AbstractRepository)
}
)

namespace["__annotations__"] = {
k: v for k, v in namespace["__annotations__"].items() if k not in namespace["_repositories"]
}
)

return super().__new__(mcs, name, bases, namespace)

@staticmethod
def _is_abstract_worker(namespace: t.Dict[str, t.Any]) -> bool:
return namespace.get("__module__") != "flama.ddd.workers" or namespace.get("__qualname__") != "AbstractWorker"


class AbstractWorker(abc.ABC, metaclass=WorkerType):
"""Abstract class for workers.
It will be used to define the workers for the application. A worker consists of a set of repositories that will be
used to interact with entities and a mechanism for isolate a single unit of work.
"""

class AbstractWorker(abc.ABC):
_repositories: t.ClassVar[t.Dict[str, t.Type[AbstractRepository]]]

def __init__(self, app: t.Optional["Flama"] = None):
"""Initialize the worker.
It will receive the application instance as a parameter.
:param app: Application instance.
"""
self._app = app

@property
def app(self) -> "Flama":
assert self._app, "Worker not initialized"
"""Application instance.
:return: Application instance.
"""
if not self._app:
raise ApplicationError("Worker not initialized")

return self._app

@app.setter
def app(self, app: "Flama"):
def app(self, app: "Flama") -> None:
"""Set the application instance.
:param app: Application instance.
"""
self._app = app

@app.deleter
def app(self):
def app(self) -> None:
"""Delete the application instance."""
self._app = None

@abc.abstractmethod
async def __aenter__(self) -> "AbstractWorker":
async def begin(self) -> None:
"""Start a unit of work."""
...

@abc.abstractmethod
async def __aexit__(self, exc_type, exc_val, exc_tb):
async def end(self, *, rollback: bool = False) -> None:
"""End a unit of work.
:param rollback: If the unit of work should be rolled back.
"""
...

async def __aenter__(self) -> "AbstractWorker":
"""Start a unit of work."""
await self.begin()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
"""End a unit of work."""
await self.end(rollback=exc_type is not None)

@abc.abstractmethod
async def commit(self) -> None:
"""Commit the unit of work."""
...

@abc.abstractmethod
async def rollback(self) -> None:
"""Rollback the unit of work."""
...


class SQLAlchemyWorker(AbstractWorker, metaclass=WorkerType):
class SQLAlchemyWorker(AbstractWorker):
_repositories: t.ClassVar[t.Dict[str, t.Type[SQLAlchemyRepository]]]
_connection: "AsyncConnection"
_transaction: "AsyncTransaction"

@property
def connection(self) -> "AsyncConnection":
"""Connection to the database.
:return: Connection to the database.
:raises AttributeError: If the connection is not initialized.
"""
try:
return self._connection
except AttributeError:
raise AttributeError("Connection not initialized")

async def begin(self):
self._connection = self.app.sqlalchemy.engine.connect()
await self._connection.__aenter__()
self._transaction = self._connection.begin()
await self._transaction
@property
def transaction(self) -> "AsyncTransaction":
"""Database transaction.
async def close(self):
if hasattr(self, "_transaction"):
await self._transaction.__aexit__(None, None, None)
del self._transaction
:return: Database transaction.
:raises AttributeError: If the transaction is not started.
"""
try:
return self._transaction
except AttributeError:
raise AttributeError("Transaction not started")

if hasattr(self, "_connection"):
await self._connection.__aexit__(None, None, None)
del self._connection
async def begin_transaction(self) -> None:
"""Open a connection and begin a transaction."""

self._connection = await self.app.sqlalchemy.open_connection()
self._transaction = await self.app.sqlalchemy.begin_transaction(self._connection)

async def end_transaction(self, *, rollback: bool = False) -> None:
"""End a transaction and close the connection.
:param rollback: If the transaction should be rolled back.
:raises AttributeError: If the connection is not initialized or the transaction is not started.
"""
await self.app.sqlalchemy.end_transaction(self.transaction, rollback=rollback)
del self._transaction

await self.app.sqlalchemy.close_connection(self.connection)
del self._connection

async def begin(self) -> None:
"""Start a unit of work.
Initialize the connection, begin a transaction, and create the repositories.
"""
await self.begin_transaction()

async def __aenter__(self):
await self.begin()
for repository, repository_class in self._repositories.items():
setattr(self, repository, repository_class(self.connection))
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def end(self, *, rollback: bool = False) -> None:
"""End a unit of work.
Close the connection, commit or rollback the transaction, and delete the repositories.
:param rollback: If the unit of work should be rolled back.
"""
await self.end_transaction(rollback=rollback)

for repository in self._repositories.keys():
delattr(self, repository)

async def commit(self):
"""Commit the unit of work."""
await self.connection.commit()

async def rollback(self):
"""Rollback the unit of work."""
await self.connection.rollback()
4 changes: 4 additions & 0 deletions flama/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ class ApplicationError(Exception):
...


class SQLAlchemyError(ApplicationError):
...


class DecodeError(Exception):
"""
Raised by a Codec when `decode` fails due to malformed syntax.
Expand Down
46 changes: 39 additions & 7 deletions flama/resources/workers.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,64 @@
import typing as t

from flama.ddd import SQLAlchemyWorker
from flama.exceptions import ApplicationError

if t.TYPE_CHECKING:
from flama import Flama
from flama.ddd.repositories import SQLAlchemyTableRepository


class FlamaWorker(SQLAlchemyWorker):
_repositories: t.ClassVar[t.Dict[str, t.Type["SQLAlchemyTableRepository"]]]
"""The worker used by Flama Resources."""

_repositories: t.Dict[str, t.Type["SQLAlchemyTableRepository"]]

def __init__(self, app: t.Optional["Flama"] = None):
"""Initialize the worker.
This special worker is used to handle the repositories created by Flama Resources.
:param app: The application instance.
"""

super().__init__(app)
self._repositories = {}
self._init_repositories: t.Optional[t.Dict[str, "SQLAlchemyTableRepository"]] = None

@property
def repositories(self) -> t.Dict[str, "SQLAlchemyTableRepository"]:
assert self._init_repositories, "Repositories not initialized"
"""Get the initialized repositories.
:retirns: The initialized repositories.
:raises ApplicationError: If the repositories are not initialized.
"""
if not self._init_repositories:
raise ApplicationError("Repositories not initialized")

return self._init_repositories

async def __aenter__(self):
await self.begin()
async def begin(self) -> None:
"""Start a unit of work.
Initialize the connection, begin a transaction, and create the repositories.
"""
await self.begin_transaction()
self._init_repositories = {r: cls(self.connection) for r, cls in self._repositories.items()}
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def end(self, *, rollback: bool = False) -> None:
"""End a unit of work.
Close the connection, commit or rollback the transaction, and delete the repositories.
:param rollback: If the unit of work should be rolled back.
"""
await self.end_transaction(rollback=rollback)
del self._init_repositories

def add_repository(self, name: str, cls: t.Type["SQLAlchemyTableRepository"]) -> None:
"""Register a repository.
:param name: The name of the repository.
:param cls: The class of the repository.
"""
self._repositories[name] = cls
Loading

0 comments on commit 869711d

Please sign in to comment.