Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using a single session with FastAPI #179

Closed
antonkravc opened this issue Dec 7, 2020 · 11 comments · Fixed by #324
Closed

Using a single session with FastAPI #179

antonkravc opened this issue Dec 7, 2020 · 11 comments · Fixed by #324
Labels
type: feature A new feature

Comments

@antonkravc
Copy link

I have a Hasura GraphQL engine as a server with a few small services acting like webhooks for business logic and handling database events. One of theses services has a REST API and needs to retrieve data from the GraphQL engine or run mutations.
Due to performance concerns I have decided to rewrite one of the services with FastAPI in order to leverage async.

I am quite new to async in Python in general which is why I took my time to go through gql documentation.
It is my understanding that it is ideal to keep a single async client session throughout the life span of the service. It is also my understanding that the only way of getting an async session is using the async with client as session: syntax.

That poses the question of how can I wrap the whole app inside of the async with. Or perhaps I am missing something and there is a better way of doing this.

@leszekhanusz leszekhanusz added the type: question or discussion Issue discussing or asking a question about gql label Dec 7, 2020
@leszekhanusz
Copy link
Collaborator

There are multiple ways to structure your application.

You can simply use different async task running with the provided session, as described here

Or you could be more fancy and create an instance of a custom Class which will keep your session variable as a property, which will reconnect if the connection fails and warn your code that it needs to reinitialize some info...

Or you could be hackish and call aenter and aexit manually (not advised)

@antonkravc
Copy link
Author

antonkravc commented Dec 8, 2020

So this example is very similar to what I have working right now.

from gql import Client, gql

from fastapi import FastAPI
from gql.transport.aiohttp import AIOHTTPTransport


QUERY = gql(
    """
    query getSomething {
        foo {
            bar
        }
    }
    """
)

MUTATION = gql(
    """
    mutation doSomething {
        action {
            result
        }
    }
    """
)


class GraphQLEngineClient:
    def __init__(self):
        self._client = Client(
            transport=AIOHTTPTransport(
                url='http://graphql-engine/v1/graphql'
            ),
            fetch_schema_from_transport=True,
        )

    async def get_something(self):
        async with self._client as session:
            return (
                await session.execute(document=QUERY)
            ).get('foo')

    async def do_something(self):
        async with self._client as session:
            return (
                await session.execute(document=MUTATION)
            ).get('action')


client = GraphQLEngineClient()
app = FastAPI()


@app.get('/')
async def get_something():
    return await client.get_something()


@app.post('/')
async def do_something():
    return await client.do_something()

I am aware that you can pass a session around as a parameter but the question is where it should instantiated. There is no obvious entry point that can be wrapped with a async with.

@leszekhanusz
Copy link
Collaborator

leszekhanusz commented Dec 8, 2020

Something like this (untested):

class GraphQLEngineClient:
    def __init__(self):
        self._client = Client(
            transport=AIOHTTPTransport(
                url='http://graphql-engine/v1/graphql'
            ),
            fetch_schema_from_transport=True,
        )
        self._session = None
        self._connect_task = None
        self._close_event = asyncio.Event()

    @backoff.on_exception(backoff.expo, Exception, max_time=300)
    async def _connection(self):
        print ("Connecting")

        try:
            async with self._client as session:
                self._session = session
                print ("Connected")

                # Wait for the close event
                self._close_event.clear()
                await self._close_event.wait()

                print ("Closed")
                return

        finally:
            self._session = None
            print ("Disconnected")

    def connect(self):
        if not self._connect_task:
            self._connect_task = asyncio.create_task(self._connection())

    def close(self):
        self._close_event.set()

    async def execute(self, document):
        # Here, check that session is not None and add
        # necessary try/catch
        # You could also add retries if needed and possibly wait for a reconnection
        # Or you could wrap your execute function with backoff
        return await self._session.execute(document)

    async def get_something(self):
        return (await self.execute(document=QUERY)).get('foo')

    async def do_something(self):
        return (await self.execute(document=MUTATION)).get('action')


client = GraphQLEngineClient()
client.connect()

@leszekhanusz
Copy link
Collaborator

@antonkravc did it solve your question, could we close this issue?

@utd-simlab
Copy link

@leszekhanusz I'm working on a similar use case and trying to use your example. Could you give a more complete example on how to use this? I'm still not sure how to run both the connect method and the other methods.

@leszekhanusz
Copy link
Collaborator

I now realize that I made some mistakes and that it is a little bit more complex than I thought.
The problem if you create a session in a separate task like I did is that when the connection fails the _connection task is not aware that the connection has failed (the code always in the async with), so you need to detect a connection failure in the execute and warn the connection task to close and reconnect again.
I'll make a complete example for the next weekend.

@leszekhanusz
Copy link
Collaborator

Here is a complete executable example.
You'll need to install the aioconsole dependency to use it.
It is a program used to the a Continent name from a continent code using GraphQL, with the session saved in the class.

It has become quite ugly and is probably full of race conditions and deadlock bugs but it is an example (don't use this in production without testing).
The GraphQL session is started in a separate tasks when you run the connect coroutine.
I used 4 events in order to communicate to and from this task.
If we notice that the Transport is closed when we try to execute a request, then we activate the reconnect_request event so that the connection task can reconnect.

import backoff
import asyncio
import logging
from aioconsole import ainput

from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
from gql.transport.exceptions import TransportClosed

logging.basicConfig(level=logging.INFO)

GET_CONTINENT_NAME = """
    query getContinentName ($code: ID!) {
      continent (code: $code) {
        name
      }
    }
"""

class GraphQLContinentClient:
    def __init__(self):
        self._client = Client(transport=AIOHTTPTransport(url="https://countries.trevorblades.com/"))
        self._session = None
        self._connect_task = None

        self._close_request_event = asyncio.Event()
        self._reconnect_request_event = asyncio.Event()

        self._connected_event = asyncio.Event()
        self._closed_event = asyncio.Event()

        self.get_continent_name_query = gql(GET_CONTINENT_NAME)

    @backoff.on_exception(backoff.expo, Exception, max_time=300)
    async def _connection_loop(self):

        while True:
            print ("Connecting")
            try:
                async with self._client as session:
                    self._session = session
                    print ("Connected")
                    self._connected_event.set()

                    # Wait for the close or reconnect event
                    self._close_request_event.clear()
                    self._reconnect_request_event.clear()

                    close_event_task = asyncio.create_task(self._close_request_event.wait())
                    reconnect_event_task = asyncio.create_task(self._reconnect_request_event.wait())

                    events = [close_event_task, reconnect_event_task]

                    done, pending = await asyncio.wait(events, return_when=asyncio.FIRST_COMPLETED)

                    for task in pending:
                        task.cancel()

                    if close_event_task in done:
                        # If we received a closed event, then we go out of the loop
                        break

                    # If we received a reconnect event, then we disconnect and connect again

            finally:
                self._session = None
                print ("Disconnected")

        print ("Closed")
        self._closed_event.set()

    async def connect(self):
        print('connect()')
        if self._connect_task:
            print ("Already connected")
        else:
            self._connected_event.clear()
            self._connect_task = asyncio.create_task(self._connection_loop())
            await asyncio.wait_for(self._connected_event.wait(), timeout=10.0)

    async def close(self):
        print('close()')
        self._connect_task = None
        self._closed_event.clear()
        self._close_request_event.set()
        await asyncio.wait_for(self._closed_event.wait(), timeout=10.0)

    @backoff.on_exception(backoff.expo, Exception, max_tries=3)
    async def execute(self, *args, **kwargs):
        try:
            answer = await self._session.execute(*args, **kwargs)
        except TransportClosed:
            self._reconnect_request_event.set()
            raise

        return answer

    async def get_continent_name(self, code):
        params = {
            'code': code
        }

        answer = await self.execute(self.get_continent_name_query, variable_values=params)

        return answer.get('continent').get('name')

async def main():
    continent_client = GraphQLContinentClient()

    continent_codes = ['AF', 'AN', 'AS', 'EU', 'NA', 'OC', 'SA']

    await continent_client.connect()

    while True:

        answer = await ainput("\nPlease enter a continent code or 'exit':")
        answer = answer.strip()

        if answer == "exit":
            break
        elif answer in continent_codes:

            try:
                continent_name = await continent_client.get_continent_name(answer)
                print (f'The continent name is {continent_name}\n')
            except Exception as exc:
                print (f'Received exception {exc} while trying to get continent name')

        else:
            print (f'Please enter a valid continent code from {continent_codes}')

    await continent_client.close()

asyncio.run(main())

@leszekhanusz
Copy link
Collaborator

This should be made easier.

After the stable version has been done, I propose to add a new interface to create permanent sessions:

permanent_session = client.permanentSession()

await permanent_session.connect()

# Add your queries and subscriptions here, the permanent_session can be
# kept in a class and will automatically reconnect if needed

await permanent_session.close()

@leszekhanusz leszekhanusz added type: feature A new feature and removed type: question or discussion Issue discussing or asking a question about gql labels Dec 10, 2021
@yoshua0x
Copy link

Just here to share my own experience with a similar use case and share my excitement for permanent sessions.

I recently had to do some hackery to get an async implementation working with FastAPI since it requires the client session to passed in as a sub-dependency – which is a FastAPI construct – or else it interrupts the FastAPI event loop. Permanent sessions would improve this implementation.

Side note: Keep up the amazing work on the project! I've been long awaiting for a mature async GraphQL client in the Python ecosystem.

@leszekhanusz
Copy link
Collaborator

This is now implemented in #324
I'll still need to add some tests to keep the coverage at 100%
You can already try it and provide your feedback. Async code can be tricky and I'll need some real world use feedback.

@motybz
Copy link

motybz commented Jun 22, 2022

Hi @leszekhanusz, thank you for your awesome work.

I tested (stress test) it with my FastAPI project and it solve all of the session handling
Thank you very much!

I'm looking forward to the official release

(I used https://locust.io/ for the stress test)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type: feature A new feature
Projects
None yet
Development

Successfully merging a pull request may close this issue.

5 participants