Subscriptions

To create a subscription, you can directly call the subscribe method on the schema. This method is async and must be awaited.

import asyncio
from datetime import datetime
from graphene import ObjectType, String, Schema, Field

# Every schema requires a query.
class Query(ObjectType):
    hello = String()

    def resolve_hello(root, info):
        return "Hello, world!"

class Subscription(ObjectType):
    time_of_day = String()

    async def subscribe_time_of_day(root, info):
        while True:
            yield datetime.now().isoformat()
            await asyncio.sleep(1)

schema = Schema(query=Query, subscription=Subscription)

async def main(schema):
    subscription = 'subscription { timeOfDay }'
    result = await schema.subscribe(subscription)
    async for item in result:
        print(item.data['timeOfDay'])

asyncio.run(main(schema))

The result is an async iterator which yields items in the same manner as a query.