Async permanent session¶
Sometimes you want to have a single permanent reconnecting async session to a GraphQL backend,
and that can be difficult to manage manually with the `async with client as session` syntax.
It is now possible to have a single reconnecting session by calling the `connect_async` method of `Client` with the `reconnecting=True` argument.
# Create a session from the client which will reconnect automatically.
# This session can be kept in a class, for example, to provide a way
# to execute GraphQL queries from many different places.
session = await client.connect_async(reconnecting=True)
# You can call the execute or subscribe methods on this session.
result = await session.execute(query)
# When you want the connection to close (for cleanup),
# call close_async on the client.
await client.close_async()
When you use `reconnecting=True`, gql will watch the exceptions generated
during the execute and subscribe calls and, if it detects a `TransportClosed` exception
(indicating that the link to the underlying transport is broken),
it will try to reconnect to the backend again.
Retries¶
Connection retries¶
With `reconnecting=True`, gql will use the backoff module to repeatedly try to connect with
exponential backoff and jitter with a maximum delay of 60 seconds by default.
You can change the default reconnecting profile by providing your own
backoff decorator to the `retry_connect` argument.
# Here wait a maximum of 5 minutes (300 seconds) between connection retries
retry_connect = backoff.on_exception(
backoff.expo, # wait generator (here: exponential backoff)
Exception, # which exceptions should cause a retry (here: everything)
max_value=300, # max wait time in seconds
)
# Pass the custom decorator to connect_async through retry_connect
session = await client.connect_async(
reconnecting=True,
retry_connect=retry_connect,
)
Execution retries¶
With `reconnecting=True`, by default we will also retry up to 5 times
when an exception happens during an execute call (to manage a possible loss in the connection
to the transport).
There is no retry in case of a `TransportQueryError` exception, as it indicates that
the connection to the backend is working correctly.
You can change the default execute retry profile by providing your own
backoff decorator to the `retry_execute` argument.
# Here allow only 3 tries for execute calls
retry_execute = backoff.on_exception(
backoff.expo,
Exception,
max_tries=3,
)
# Pass the custom decorator to connect_async through retry_execute
session = await client.connect_async(
reconnecting=True,
retry_execute=retry_execute,
)
If you don’t want any retry on the execute calls, you can disable the retries with `retry_execute=False`.
Note
If you want to retry even with `TransportQueryError` exceptions,
then you need to make your own backoff decorator on your own method:
@backoff.on_exception(backoff.expo, Exception, max_tries=3)
async def execute_with_retry(session, query):
    """Execute ``query`` on ``session`` and return its result.

    The backoff decorator is configured with ``Exception`` and
    ``max_tries=3``, so any exception raised by the call triggers a retry
    with exponential waits between attempts.
    """
    result = await session.execute(query)
    return result
Subscription retries¶
There is no `retry_subscribe` argument, as it is not feasible with async generators.
If you want retries for your subscriptions, then you can do it yourself
with backoff decorators on your methods.
@backoff.on_exception(
    backoff.expo,
    Exception,
    max_tries=3,
    giveup=lambda exc: isinstance(exc, TransportQueryError),
)
async def execute_subscription1(session):
    """Print every result received from the ``subscription1`` subscription.

    The backoff decorator is configured with ``Exception`` and
    ``max_tries=3``; its ``giveup`` predicate matches
    ``TransportQueryError``, so that exception is not retried.
    """
    async for payload in session.subscribe(subscription1):
        print(payload)
FastAPI example¶
# First install fastapi and uvicorn:
#
# pip install fastapi uvicorn
#
# then run:
#
# uvicorn fastapi_async:app --reload
import logging
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from gql import Client, gql
from gql.client import ReconnectingAsyncClientSession
from gql.transport.aiohttp import AIOHTTPTransport
# Configure logging at DEBUG level for this example.
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)
# One module-level client shared by all request handlers; startup_event()
# opens its reconnecting session and shutdown_event() closes it.
transport = AIOHTTPTransport(url="https://countries.trevorblades.com/graphql")
client = Client(transport=transport)
# Query selecting a continent's name and code plus the name and capital of
# each of its countries; the continent is chosen through the $code variable.
query = gql(
"""
query getContinentInfo($code: ID!) {
continent(code:$code) {
name
code
countries {
name
capital
}
}
}
"""
)
app = FastAPI()
@app.on_event("startup")
async def startup_event():
    """Open the reconnecting GraphQL session when the application starts."""
    print("Connecting to GraphQL backend")

    # The returned session is not stored here; handlers reach it through
    # client.session.
    await client.connect_async(reconnecting=True)

    print("End of startup")
@app.on_event("shutdown")
async def shutdown_event():
    """Close the GraphQL connection when the application shuts down."""
    print("Shutting down GraphQL permanent connection...")

    await client.close_async()

    print("Shutting down GraphQL permanent connection... done")
# Continent codes accepted by the /continent/{continent_code} route and
# listed as links on the index page.
continent_codes = [
"AF",
"AN",
"AS",
"EU",
"NA",
"OC",
"SA",
]
@app.get("/", response_class=HTMLResponse)
def get_root():
    """Return an HTML index page with one link per known continent code."""
    anchors = (f'<a href="continent/{code}">{code}</a>' for code in continent_codes)
    continent_links = ", ".join(anchors)

    return f"""
<html>
<head>
<title>Continents</title>
</head>
<body>
Continents: {continent_links}
</body>
</html>
"""
@app.get("/continent/{continent_code}")
async def get_continent(continent_code):
    """Return the GraphQL query result for one continent.

    Args:
        continent_code: Code taken from the URL path; must be one of
            ``continent_codes``.

    Returns:
        The result of executing ``query`` with ``continent_code`` as the
        ``code`` variable.

    Raises:
        HTTPException: With status 404 when the code is unknown, or with
            status 503 when any exception occurs while querying the backend.
            The 503 error is chained to the original exception.
    """
    if continent_code not in continent_codes:
        raise HTTPException(status_code=404, detail="Continent not found")

    try:
        # The assert sits inside the try, so an unexpected session type is
        # reported as 503 like any other backend failure.
        assert isinstance(client.session, ReconnectingAsyncClientSession)

        result = await client.session.execute(
            query, variable_values={"code": continent_code}
        )
    except Exception as e:
        # Lazy %-formatting: the message is only built if DEBUG is enabled.
        log.debug("get_continent Error: %s", e)
        # Chain the cause so the original failure stays visible in tracebacks.
        raise HTTPException(
            status_code=503, detail="GraphQL backend unavailable"
        ) from e

    return result
Console example¶
import asyncio
import logging
from typing import Optional
from aioconsole import ainput
from gql import Client, gql
from gql.client import AsyncClientSession
from gql.transport.aiohttp import AIOHTTPTransport
# Configure logging at INFO level for this example.
logging.basicConfig(level=logging.INFO)
# GraphQL query text: looks up a continent through the $code variable and
# selects its name.
GET_CONTINENT_NAME = """
query getContinentName ($code: ID!) {
continent (code: $code) {
name
}
}
"""
class GraphQLContinentClient:
    """Wrapper keeping one gql client and its reconnecting session together."""

    def __init__(self):
        http_transport = AIOHTTPTransport(url="https://countries.trevorblades.com/")
        self._client = Client(transport=http_transport)
        # Stays None until connect() has been awaited.
        self._session: Optional[AsyncClientSession] = None
        self.get_continent_name_query = gql(GET_CONTINENT_NAME)

    async def connect(self):
        """Open the reconnecting session and keep it on the instance."""
        self._session = await self._client.connect_async(reconnecting=True)

    async def close(self):
        """Close the underlying client connection."""
        await self._client.close_async()

    async def get_continent_name(self, code):
        """Return the ``name`` field of the continent identified by ``code``.

        Requires connect() to have been awaited first: the method asserts
        that the session is set.
        """
        assert self._session is not None

        answer = await self._session.execute(
            self.get_continent_name_query,
            variable_values={"code": code},
        )

        continent = answer.get("continent")
        return continent.get("name")  # type: ignore
async def main():
    """Prompt for continent codes on the console and print their names.

    Opens the reconnecting session, then reads lines until the user enters
    ``exit``. Valid codes are looked up through the GraphQL client; any
    exception raised by a lookup is printed and the loop continues.

    The session is closed in a ``finally`` clause so that it is released
    even when the prompt loop ends with an exception or is cancelled.
    """
    continent_client = GraphQLContinentClient()
    continent_codes = ["AF", "AN", "AS", "EU", "NA", "OC", "SA"]

    await continent_client.connect()

    try:
        while True:
            answer = await ainput("\nPlease enter a continent code or 'exit':")
            answer = answer.strip()

            if answer == "exit":
                break
            elif answer in continent_codes:
                try:
                    continent_name = await continent_client.get_continent_name(answer)
                    print(f"The continent name is {continent_name}\n")
                except Exception as exc:
                    # Report the failure and keep prompting instead of exiting.
                    print(f"Received exception {exc} while trying to get continent name")
            else:
                print(f"Please enter a valid continent code from {continent_codes}")
    finally:
        await continent_client.close()
asyncio.run(main())