Skip to content

Advanced Tutorial

Tip

I am still working on the docs. Feel free to DM me on Twitter if you have questions or feedback.

The reason I built FastGQL was to address the n+1 problem I had with other frameworks.

Consider this working example using EdgeDB as the database:

Full file 👀
import json
import time
import typing as T
from uuid import UUID
import edgedb
from pydantic import TypeAdapter
from fastapi import FastAPI
from fastgql import GQL, GQLInterface, build_router
from dotenv import load_dotenv

load_dotenv()

edgedb_client = edgedb.create_async_client()

Contents = list[T.Union["Movie", "Show"]]


async def query_required_single_json(
    name: str, query: str, **variables
) -> dict[str, T.Any]:
    start = time.time()
    res = json.loads(
        await edgedb_client.query_required_single_json(query=query, **variables)
    )
    took_ms = round((time.time() - start) * 1_000, 2)
    print(f"[{name}] took {took_ms} ms")
    return res


class Account(GQL):
    id: UUID
    username: str

    async def watchlist(self, limit: int) -> Contents:
        q = """select Account {
            watchlist: { id, title, release_year := [is Movie].release_year } limit <int64>$limit
        } filter .id = <uuid>$id"""
        account_d = await query_required_single_json(
            name="account.watchlist", query=q, id=self.id, limit=limit
        )
        return TypeAdapter(Contents).validate_python(account_d["watchlist"])


class Person(GQL):
    id: UUID
    name: str

    async def filmography(self) -> Contents:
        q = """select Person {
            filmography: { id, title, release_year := [is Movie].release_year }
        } filter .id = <uuid>$id"""
        person_d = await query_required_single_json(
            name="person.filmography", query=q, id=self.id
        )
        return TypeAdapter(Contents).validate_python(person_d["filmography"])


class Content(GQLInterface):
    id: UUID
    title: str

    async def actors(self) -> list["Person"]:
        q = """select Content { actors: { id, name } } filter .id = <uuid>$id"""
        content_d = await query_required_single_json(
            name="content.actors", query=q, id=self.id
        )
        return [Person(**p) for p in content_d["actors"]]


class Movie(Content):
    release_year: int


class Show(Content):
    async def seasons(self) -> list["Season"]:
        q = """select Show { season := .<show[is Season] { id, number } } filter .id = <uuid>$id"""
        show_d = await query_required_single_json(
            name="show.seasons", query=q, id=self.id
        )
        return [Season(**s) for s in show_d["season"]]

    async def num_seasons(self) -> int:
        q = """select Show { num_seasons } filter .id = <uuid>$id"""
        show_d = await query_required_single_json(
            name="show.num_seasons", query=q, id=self.id
        )
        return show_d["num_seasons"]


class Season(GQL):
    id: UUID
    number: int

    async def show(self) -> Show:
        q = """select Season { show: { id, title } } filter .id = <uuid>$id"""
        season_d = await query_required_single_json(
            name="season.show", query=q, id=self.id
        )
        return Show(**season_d["show"])


class Query(GQL):
    @staticmethod
    async def account_by_username(username: str) -> Account:
        q = """select Account { id, username } filter .username = <str>$username"""
        account_d = await query_required_single_json(
            name="account_by_username", query=q, username=username
        )
        return Account(**account_d)


router = build_router(query_models=[Query])

app = FastAPI()

app.include_router(router, prefix="/graphql")

For each connection, we have to make a new query. For example, if your query is:

{
  accountByUsername(username: "Cameron") {
    id
    username
    watchlist(limit: 100) {
      __typename
      ...on Movie {
        id
        title
        releaseYear
        actors {
          name
        }
      }
      ... on Show {
        id
        title
      }
    }
  }
}

You get back a lot of nested data. For each nested data you get, that's another database call. For example, to get actors from a movie:

class Content(GQLInterface):
    id: UUID
    title: str

    async def actors(self) -> list["Person"]:
        q = """select Content { actors: { id, name } } filter .id = <uuid>$id"""
        content_d = await query_required_single_json(
            name="content.actors", query=q, id=self.id
        )
        return [Person(**p) for p in content_d["actors"]]

So, to execute this query, the server had to: 1) get the account by username from the database, 2) get the watchlist of that user from the database, 3) get the actor of each movie from the database

There are some solutions to make this process more efficient. One of them is using dataloaders.

However, even with a dataloader, you are still making new requests to the database for each new level of data you are requesting.

FastGQL comes with a way to solve this problem. It ships with QueryBuilder functionality. This allows you to map your GraphQL schema to your database schema, which means you can dynamically generate the exact database query you need to fulfill the client's request.

Note

Currently QueryBuilder only works with EdgeDB.

Here is a full example of the same schema, now using the QueryBuilder feature.

Full file 👀
import json
import time
import typing as T
from uuid import UUID
import edgedb
from fastapi import FastAPI
from fastgql import (
    GQL,
    GQLInterface,
    build_router,
    Link,
    Property,
    get_qb,
    QueryBuilder,
    Depends,
    Info,
    node_from_path,
)
from dotenv import load_dotenv

load_dotenv()

edgedb_client = edgedb.create_async_client()

Contents = list[T.Union["Movie", "Show"]]


def parse_raw_content(raw_content: list[dict, T.Any]) -> Contents:
    w_list: Contents = []
    for item in raw_content:
        if item["typename"] == "default::Movie":
            if movie := item.get("Movie"):
                w_list.append(Movie(**movie))
        elif item["typename"] == "default::Show":
            if show := item.get("Show"):
                w_list.append(Show(**show))
    return w_list


async def query_required_single_json(
    name: str, query: str, **variables
) -> dict[str, T.Any]:
    start = time.time()
    res = json.loads(
        await edgedb_client.query_required_single_json(query=query, **variables)
    )
    took_ms = round((time.time() - start) * 1_000, 2)
    print(f"[{name}] took {took_ms} ms")
    return res


class AccountPageInfo(GQL):
    has_next_page: bool
    has_previous_page: bool
    start_cursor: str | None
    end_cursor: str | None


class AccountEdge(GQL):
    cursor: str
    node: "Account"


class AccountConnection(GQL):
    page_info: AccountPageInfo
    edges: list[AccountEdge]
    total_count: int


def update_watchlist(child_qb: QueryBuilder, limit: int) -> None:
    child_qb.set_limit(limit)


class Account(GQL):
    def __init__(self, **data):
        super().__init__(**data)
        self._data = data

    id: T.Annotated[UUID, Property(db_name="id")] = None
    username: T.Annotated[str, Property(db_name="username")] = None

    async def watchlist(
        self, info: Info, limit: int
    ) -> T.Annotated[Contents, Link(db_name="watchlist", update_qbs=update_watchlist)]:
        return parse_raw_content(raw_content=self._data[info.path[-1]])


class Content(GQLInterface):
    def __init__(self, **data):
        super().__init__(**data)
        self._data = data

    id: T.Annotated[UUID, Property(db_name="id")] = None
    title: T.Annotated[str, Property(db_name="title")] = None

    async def actors(
        self, info: Info
    ) -> T.Annotated[list["Person"], Link(db_name="actors")]:
        return [Person(**p) for p in self._data[info.path[-1]]]


class Movie(Content):
    release_year: T.Annotated[int, Property(db_name="release_year")] = None


class Show(Content):
    num_seasons: T.Annotated[int, Property(db_name="num_seasons")] = None

    async def seasons(
        self, info: Info
    ) -> T.Annotated[list["Season"], Link(db_name="<show[is Season]")]:
        return [Season(**s) for s in self._data[info.path[-1]]]


class Season(GQL):
    def __init__(self, **data):
        super().__init__(**data)
        self._data = data

    id: T.Annotated[UUID, Property(db_name="id")] = None
    number: T.Annotated[int, Property(db_name="number")] = None

    async def show(self, info: Info) -> T.Annotated[Show, Link(db_name="show")]:
        return Show(**self._data[info.path[-1]])


class Person(GQL):
    def __init__(self, **data):
        super().__init__(**data)
        self._data = data

    id: T.Annotated[UUID, Property(db_name="id")] = None
    name: T.Annotated[str, Property(db_name="name")] = None

    async def filmography(
        self, info: Info
    ) -> T.Annotated[Contents, Link(db_name="filmography")]:
        return parse_raw_content(raw_content=self._data[info.path[-1]])


AccountEdge.model_rebuild()


class Query(GQL):
    @staticmethod
    async def account_by_username(
        username: str, qb: QueryBuilder = Depends(get_qb)
    ) -> Account:
        s, v = qb.build()
        q = f"""select Account {s} filter .username = <str>$username"""
        print(q)
        account_d = await query_required_single_json(
            name="account_by_username", query=q, username=username, **v
        )
        return Account(**account_d)

    @staticmethod
    async def account_connection(
        info: Info,
        *,
        before: str | None = None,
        after: str | None = None,
        first: int,
    ) -> AccountConnection:
        qb: QueryBuilder = await Account.qb_config.from_info(
            info=info, node=node_from_path(node=info.node, path=["edges", "node"])
        )
        qb.fields.add("username")
        variables = {"first": first}
        filter_list: list[str] = []
        if before:
            filter_list.append(".username > <str>$before")
            variables["before"] = before
        if after:
            filter_list.append(".username < <str>$after")
            variables["after"] = after
        if filter_list:
            filter_s = f'filter {" and ".join(filter_list)} '
        else:
            filter_s = ""
        qb.add_variables(variables, replace=False)
        s, v = qb.build()
        q = f"""
        with
            all_accounts := (select Account),
            _first := <int16>$first,
            accounts := (select all_accounts {filter_s}order by .username desc limit _first),
        select {{
            total_count := count(all_accounts),
            accounts := accounts {s}
        }}
        """
        connection_d = await query_required_single_json(
            name="account_connection", query=q, **v
        )
        total_count = connection_d["total_count"]
        _accounts = [Account(**d) for d in connection_d["accounts"]]
        connection = AccountConnection(
            page_info=AccountPageInfo(
                has_next_page=len(_accounts) == first and total_count > first,
                has_previous_page=after is not None,
                start_cursor=_accounts[0].username if _accounts else None,
                end_cursor=_accounts[-1].username if _accounts else None,
            ),
            total_count=total_count,
            edges=[
                AccountEdge(node=account, cursor=account.username)
                for account in _accounts
            ],
        )
        return connection


router = build_router(query_models=[Query])

app = FastAPI()

app.include_router(router, prefix="/graphql")

Now this same query:

{
  accountByUsername(username: "Cameron") {
    id
    username
    watchlist(limit: 100) {
      __typename
      ...on Movie {
        id
        title
        releaseYear
        actors {
          name
        }
      }
      ... on Show {
        id
        title
      }
    }
  }
}
executes with only one call to the database that looks like this:
select Account { id, username, watchlist: { typename := .__type__.name, Movie := (select [is Movie] { __typename := .__type__.name, id, release_year, title, actors: { name } }), Show := (select [is Show] { __typename := .__type__.name, id, title }) } LIMIT <int32>$limit } filter .username = <str>$username

The original query took around 180ms to execute and make 6 database calls.

The new query using QueryBuilders takes less than 30ms to execute and only makes one database call!

For this small example, the results are not so dramatic. But in production, on large datasets, the speed advantage can easily be 10x.