Async permanent session

Sometimes you want to have a single permanent reconnecting async session to a GraphQL backend, and that can be difficult to manage manually with the async with client as session syntax.

It is now possible to have a single reconnecting session using the connect_async method of Client with a reconnecting=True argument.

# Create a session from the client which will reconnect automatically.
# This session can be kept in a class, for example, to provide a way
# to execute GraphQL queries from many different places.
session = await client.connect_async(reconnecting=True)

# You can call the execute or subscribe methods on this session.
result = await session.execute(query)

# When you want the connection to close (for cleanup),
# call close_async on the client.
await client.close_async()

When you use reconnecting=True, gql will watch the exceptions generated during the execute and subscribe calls and, if it detects a TransportClosed exception (indicating that the link to the underlying transport is broken), it will try to reconnect to the backend again.

Retries

Connection retries

With reconnecting=True, gql will use the backoff module to repeatedly try to connect with exponential backoff and jitter with a maximum delay of 60 seconds by default.

You can change the default reconnecting profile by providing your own backoff decorator to the retry_connect argument.

# Here we wait at most 5 minutes (300 seconds) between connection retries
retry_connect = backoff.on_exception(
    backoff.expo,  # wait generator (here: exponential backoff)
    Exception,     # which exceptions should cause a retry (here: everything)
    max_value=300, # max wait time in seconds
)
session = await client.connect_async(
    reconnecting=True,
    retry_connect=retry_connect,
)

Execution retries

With reconnecting=True, by default we will also retry up to 5 times when an exception happens during an execute call (to manage a possible loss in the connection to the transport).

There is no retry in case of a TransportQueryError exception as it indicates that the connection to the backend is working correctly.

You can change the default execute retry profile by providing your own backoff decorator to the retry_execute argument.

# Here only 3 tries for execute calls
retry_execute = backoff.on_exception(
    backoff.expo,
    Exception,
    max_tries=3,
    # Do not retry on TransportQueryError: the backend answered, so the
    # connection itself is working.
    giveup=lambda e: isinstance(e, TransportQueryError),
)
session = await client.connect_async(
    reconnecting=True,
    retry_execute=retry_execute,
)

If you don’t want any retry on the execute calls, you can disable the retries with retry_execute=False.

Subscription retries

There is no retry_subscribe as it is not feasible with async generators. If you want retries for your subscriptions, then you can do it yourself with backoff decorators on your methods.

@backoff.on_exception(
    backoff.expo,
    Exception,
    max_tries=3,
    giveup=lambda exc: isinstance(exc, TransportQueryError),
)
async def execute_subscription1(session):
    """Print each result of ``subscription1`` received through ``session``.

    The backoff decorator re-runs the whole coroutine (at most 3 tries,
    exponential wait) on any exception except ``TransportQueryError``.
    """
    results = session.subscribe(subscription1)
    async for item in results:
        print(item)

FastAPI example

# First install fastapi and uvicorn:
#
# pip install fastapi uvicorn
#
# then run:
#
# uvicorn fastapi_async:app --reload

import logging

from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse

from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport

# Log from DEBUG level up; get_continent reports its errors at DEBUG.
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)

# HTTP transport for the GraphQL endpoint used by this example.
transport = AIOHTTPTransport(url="https://countries.trevorblades.com/graphql")

# Single module-level client; its permanent session is opened in
# startup_event and closed in shutdown_event.
client = Client(transport=transport)

# Query returning a continent's name and code plus the name and capital
# of each of its countries.
query = gql(
    """
query getContinentInfo($code: ID!) {
  continent(code:$code) {
    name
    code
    countries  {
      name
      capital
    }
  }
}
"""
)

app = FastAPI()


@app.on_event("startup")
async def startup_event():
    """Open the permanent, auto-reconnecting GraphQL session at app startup."""
    print("Connecting to GraphQL backend")

    # The returned session is not stored here; the request handlers reach
    # it through client.session.
    await client.connect_async(reconnecting=True)
    print("End of startup")


@app.on_event("shutdown")
async def shutdown_event():
    """Close the client's permanent GraphQL connection at app shutdown."""
    print("Shutting down GraphQL permanent connection...")
    await client.close_async()
    print("Shutting down GraphQL permanent connection... done")


# Continent codes accepted by the routes below.
continent_codes = ["AF", "AN", "AS", "EU", "NA", "OC", "SA"]


# Index page: an HTML document with one link per known continent code.
@app.get("/", response_class=HTMLResponse)
def get_root():
    # Build the anchors lazily and let str.join consume them.
    anchors = (
        f'<a href="continent/{code}">{code}</a>' for code in continent_codes
    )
    continent_links = ", ".join(anchors)

    return f"""
    <html>
        <head>
            <title>Continents</title>
        </head>
        <body>
            Continents: {continent_links}
        </body>
    </html>
"""


# Route handler: return the GraphQL data for one continent.
#
# Responds with 404 when continent_code is not in continent_codes, and with
# 503 when executing the query on the permanent session raises any exception.
@app.get("/continent/{continent_code}")
async def get_continent(continent_code: str):

    # Reject unknown codes before touching the backend.
    if continent_code not in continent_codes:
        raise HTTPException(status_code=404, detail="Continent not found")

    try:
        result = await client.session.execute(
            query, variable_values={"code": continent_code}
        )
    except Exception as e:
        log.debug(f"get_continent Error: {e}")
        # Chain the original error so the root cause stays attached to the
        # HTTPException (__cause__) instead of looking like a second failure
        # that happened while handling the first.
        raise HTTPException(
            status_code=503, detail="GraphQL backend unavailable"
        ) from e

    return result

Console example

import asyncio
import logging

from aioconsole import ainput

from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport

logging.basicConfig(level=logging.INFO)

# GraphQL document: fetch the name of the continent identified by $code.
GET_CONTINENT_NAME = """
    query getContinentName ($code: ID!) {
      continent (code: $code) {
        name
      }
    }
"""


class GraphQLContinentClient:
    """Small wrapper around a gql ``Client`` exposing continent lookups.

    ``connect`` opens a reconnecting session that is then reused by every
    query until ``close`` is called.
    """

    def __init__(self):
        transport = AIOHTTPTransport(url="https://countries.trevorblades.com/")
        self._client = Client(transport=transport)
        # Set by connect(); stays None until then.
        self._session = None

        self.get_continent_name_query = gql(GET_CONTINENT_NAME)

    async def connect(self):
        """Open the reconnecting session and keep it on the instance."""
        session = await self._client.connect_async(reconnecting=True)
        self._session = session

    async def close(self):
        """Close the client's permanent connection."""
        await self._client.close_async()

    async def get_continent_name(self, code):
        """Return the name of the continent identified by ``code``."""
        response = await self._session.execute(
            self.get_continent_name_query,
            variable_values={"code": code},
        )
        continent = response.get("continent")
        return continent.get("name")


async def main():
    """Interactive console loop: print continent names until 'exit' is typed.

    The permanent session is closed in a ``finally`` clause so that it is
    released even when the loop ends through an exception raised while
    waiting for input, or through a task cancellation, rather than through
    the 'exit' command.
    """
    continent_client = GraphQLContinentClient()

    continent_codes = ["AF", "AN", "AS", "EU", "NA", "OC", "SA"]

    await continent_client.connect()

    try:
        while True:

            answer = await ainput("\nPlease enter a continent code or 'exit':")
            answer = answer.strip()

            if answer == "exit":
                break
            elif answer in continent_codes:

                # A failed query is reported and the loop keeps running.
                try:
                    continent_name = await continent_client.get_continent_name(
                        answer
                    )
                    print(f"The continent name is {continent_name}\n")
                except Exception as exc:
                    print(
                        f"Received exception {exc} while trying to get continent name"
                    )

            else:
                print(f"Please enter a valid continent code from {continent_codes}")
    finally:
        # Always release the permanent connection, whatever ended the loop.
        await continent_client.close()


# Script entry point: run the interactive loop until the user types 'exit'.
asyncio.run(main())