Question: Support sqlalchemy AsyncSession ? #324
I would be willing to help with a PR, if the maintainers are interested 😉 My question about a possible PR is whether the asyncio part should be its own subpackage, similar to how SQLAlchemy handles it (https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html), or whether another kind of integration is preferable 😉 Once the general structure is agreed on, I would start with a draft PR 😉

# example_schema.py
import graphene
from graphene import relay
from graphene_sqlalchemy import SQLAlchemyObjectType
from graphene_sqlalchemy.utils import get_session
from sqlalchemy import Column, String

from base import Base


class UserSql(Base):
    __tablename__ = "users"

    id = Column(String, primary_key=True)
    username = Column(String, nullable=False)


class User(SQLAlchemyObjectType):
    class Meta:
        model = UserSql
        interfaces = (relay.Node,)

    @classmethod
    async def get_node(cls, info, id):
        # The AsyncSession is taken from the GraphQL context.
        session = get_session(info.context)
        return await session.get(cls._meta.model, id)


class Query(graphene.ObjectType):
    node = relay.Node.Field()


example_schema = graphene.Schema(query=Query)

# session.py
from sqlalchemy.ext.asyncio import AsyncSession, async_scoped_session, create_async_engine
from sqlalchemy.orm import sessionmaker

# `settings` and `get_request_id` are not shown in this comment.
_engine = create_async_engine(settings.database_url)


class SessionProvider:
    def __init__(self, engine=None):
        if engine is None:
            engine = _engine
        self.engine = engine
        self._session_class = sessionmaker(bind=engine, class_=AsyncSession)

    def create_session(self, **kwargs) -> AsyncSession:
        return self._session_class(**kwargs)

    def create_scoped_session(self) -> async_scoped_session:
        return async_scoped_session(self._session_class, scopefunc=get_request_id)


session_provider = SessionProvider(engine=_engine)

# test file
import pytest
from graphql_relay import from_global_id, to_global_id

from example_schema import UserSql, example_schema
from base import Base
from session import _engine, session_provider


@pytest.mark.asyncio
async def test_get_node(setup_example_db, session):
    global_id = to_global_id("User", 0)
    executed = await example_schema.execute_async(
        """
        query Node($id: ID!) {
          node(id: $id) {
            ... on User {
              id
            }
          }
        }
        """,
        variable_values={"id": global_id},
        context_value={"session": session},
    )
    assert executed.errors is None
    user = executed.data["node"]
    user_id = from_global_id(user["id"])[1]
    assert user_id == str(0)


@pytest.fixture
def example_users():
    return [UserSql(id=0, username="karen"), UserSql(id=1, username="peter")]


@pytest.fixture
async def session():
    session = session_provider.create_session()
    yield session
    await session.close()


@pytest.fixture
async def setup_example_db(example_users, session):
    # Recreate the tables, then insert the example rows.
    async with _engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)
    async with session:
        session.add_all(example_users)
        await session.commit()
The idea of implementing async sessions is great.
From what I found, relationships should be fine as long as they are not loaded lazily, since lazy loading would conflict with this section of the SQLAlchemy docs: https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html#preventing-implicit-io-when-using-asyncsession. So setting lazy in the relationships to
We should clearly communicate the lack of support for certain lazy-loading techniques, then. It seems that batching isn't working with SQLAlchemy 1.4, so maybe that should be fixed first, since an async query should definitely be compatible with all the work done on the N+1 problem. Check batching.py; it seems to use a single session derived from the query context.
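For readers unfamiliar with the batching mentioned here, the following is a minimal, generic sketch of how an async loader can avoid the N+1 problem by collecting keys and resolving them in one round trip. It is not taken from batching.py; `BatchLoader`, `fetch_usernames`, and the in-memory `fake_table` are hypothetical stand-ins, and error handling is omitted.

```python
import asyncio


class BatchLoader:
    """Collects keys requested before the next event-loop step and loads them together."""

    def __init__(self, batch_fn):
        self._batch_fn = batch_fn  # async callable: list of keys -> list of values
        self._pending = {}         # key -> Future that will hold that key's value
        self._task = None          # dispatch task for the current batch, if any

    def load(self, key):
        loop = asyncio.get_running_loop()
        if key not in self._pending:
            self._pending[key] = loop.create_future()
        if self._task is None:
            # The task only starts on the next loop iteration, so sibling
            # resolvers can still register their keys in this batch.
            self._task = loop.create_task(self._dispatch())
        return self._pending[key]

    async def _dispatch(self):
        pending, self._pending = self._pending, {}
        self._task = None
        keys = list(pending)
        values = await self._batch_fn(keys)
        for key, value in zip(keys, values):
            pending[key].set_result(value)


calls = []  # records each batch so the number of "queries" is visible


async def fetch_usernames(ids):
    # Hypothetical stand-in for a single "SELECT ... WHERE id IN (...)" query.
    calls.append(list(ids))
    fake_table = {"0": "karen", "1": "peter"}
    return [fake_table.get(i) for i in ids]


async def main():
    loader = BatchLoader(fetch_usernames)
    return await asyncio.gather(loader.load("0"), loader.load("1"), loader.load("0"))


names = asyncio.run(main())
```

Three `load` calls result in a single call to `fetch_usernames`. In a real integration, the batch function would run its query on the session taken from the request context.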
I very much agree on all points 👍 I will check whether I can come up with something that moves this to the meta class 👍 I will look at how the async mutations are done to see whether I can draw inspiration from that. The same goes for the batching part 🙂
Hey, I've looked into it again.
Hey all, thank you for the response.

@classmethod
def get_query(cls, model, info, sort=None, **args):
    query = get_query(model, info.context)
    if sort is not None:
        if not isinstance(sort, list):
            sort = [sort]
        sort_args = []
        # ensure consistent handling of graphene Enums, enum values and
        # plain strings
        for item in sort:
            if isinstance(item, enum.Enum):
                sort_args.append(item.value.value)
            elif isinstance(item, EnumValue):
                sort_args.append(item.value)
            else:
                sort_args.append(item)
        query = query.order_by(*sort_args)
    return query

to

@classmethod
async def get_query(cls, model, info, sort=None, **args):
    # Start from a plain select so that a query is returned even without sort.
    query = select(model)
    if sort is not None:
        if not isinstance(sort, list):
            sort = [sort]
        sort_args = []
        # ensure consistent handling of graphene Enums, enum values and
        # plain strings
        for item in sort:
            if isinstance(item, enum.Enum):
                sort_args.append(item.value.value)
            elif isinstance(item, EnumValue):
                sort_args.append(item.value)
            else:
                sort_args.append(item)
        query = query.order_by(*sort_args)
    return query

and any updates in the library code.
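One consequence of declaring `get_query` with `async def` is that a call no longer returns the query itself but a coroutine, so every caller has to `await` it. The sketch below illustrates this with hypothetical stand-in classes (`SyncType`, `AsyncType`) and a plain string in place of a real query object; it is not graphene-sqlalchemy code.

```python
import asyncio
import inspect


class SyncType:
    @classmethod
    def get_query(cls, model):
        # Stand-in for building a query object synchronously.
        return f"SELECT * FROM {model}"


class AsyncType:
    @classmethod
    async def get_query(cls, model):
        # Same body, but declared with `async def`.
        return f"SELECT * FROM {model}"


async def main():
    sync_result = SyncType.get_query("users")
    pending = AsyncType.get_query("users")  # the body has not run yet
    is_coroutine = inspect.iscoroutine(pending)
    async_result = await pending  # awaiting runs the body and yields the value
    return sync_result, is_coroutine, async_result


sync_result, is_coroutine, async_result = asyncio.run(main())
```

Code that treats the return value as a query without awaiting it, for example by calling `.order_by(...)` on it, would fail because a coroutine has no such method.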
Implemented in #350, thanks to @jendrikjoe
This issue has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related topics referencing this issue.
SQLAlchemy 1.4 supports asynchronous I/O. I would like to know whether there are plans to support async SQLAlchemy 1.4+. Will this be implemented in graphene-sqlalchemy?