gql.transport.common.base

class gql.transport.common.base.SubscriptionTransportBase(*, adapter: AdapterConnection, connect_timeout: Optional[Union[int, float]] = 10, close_timeout: Optional[Union[int, float]] = 10, keep_alive_timeout: Optional[Union[int, float]] = None)

Bases: AsyncTransport

abstract Async Transport used to implement different subscription protocols (mainly websockets).

__init__(*, adapter: AdapterConnection, connect_timeout: Optional[Union[int, float]] = 10, close_timeout: Optional[Union[int, float]] = 10, keep_alive_timeout: Optional[Union[int, float]] = None) None

Initialize the transport with the given parameters.

Parameters
  • adapter – The connection dependency adapter

  • connect_timeout – Timeout in seconds for the establishment of the connection. If None is provided this will wait forever.

  • close_timeout – Timeout in seconds for the close. If None is provided this will wait forever.

  • keep_alive_timeout – Optional Timeout in seconds to receive a sign of liveness from the server.

property response_headers: Dict[str, str]
async subscribe(document: DocumentNode, variable_values: Optional[Dict[str, Any]] = None, operation_name: Optional[str] = None, send_stop: Optional[bool] = True) AsyncGenerator[ExecutionResult, None]

Send a query and receive the results using a python async generator.

The query can be a graphql query, mutation or subscription.

The results are sent as an ExecutionResult object.

async execute(document: DocumentNode, variable_values: Optional[Dict[str, Any]] = None, operation_name: Optional[str] = None) ExecutionResult

Execute the provided document AST against the configured remote server using the current session.

Send a query but close the async generator as soon as we have the first answer.

The result is sent as an ExecutionResult object.

async connect() None

Coroutine which will:

  • connect to the websocket address

  • send the init message

  • wait for the connection acknowledge from the server

  • create an asyncio task which will be used to receive and parse the answers

Should be cleaned with a call to the close coroutine

async close() None

Coroutine used to Close an established connection

async wait_closed() None
property url: str
property connect_args: Dict[str, Any]