Advanced Tutorial
Tip
I am still working on the docs. Feel free to DM me on Twitter if you have questions or feedback.
The reason I built FastGQL was to address the n+1
problem I had with other frameworks.
Consider this working example using EdgeDB
as the database:
Full file 👀
import json
import time
import typing as T
from uuid import UUID
import edgedb
from pydantic import TypeAdapter
from fastapi import FastAPI
from fastgql import GQL, GQLInterface, build_router
from dotenv import load_dotenv
# Load variables from a .env file into the environment (python-dotenv).
load_dotenv()
# Module-level async EdgeDB client shared by every resolver below.
edgedb_client = edgedb.create_async_client()
# A list whose items are Movie or Show objects (forward references; both
# classes are defined further down).
Contents = list[T.Union["Movie", "Show"]]
async def query_required_single_json(
    name: str, query: str, **variables
) -> dict[str, T.Any]:
    """Run a query through the shared EdgeDB client and decode its JSON result.

    Args:
        name: Label printed next to the timing, to tell queries apart.
        query: The EdgeQL query text.
        **variables: Query parameters, forwarded to the client as keyword
            arguments.

    Returns:
        The JSON payload returned by the client, parsed with ``json.loads``.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments the way time.time() differences can.
    start = time.perf_counter()
    res = json.loads(
        await edgedb_client.query_required_single_json(query=query, **variables)
    )
    took_ms = round((time.perf_counter() - start) * 1_000, 2)
    print(f"[{name}] took {took_ms} ms")
    return res
class Account(GQL):
    # A user account. The watchlist is not stored on the object; it is
    # fetched with its own database query when the field is resolved.
    id: UUID
    username: str

    async def watchlist(self, limit: int) -> Contents:
        # release_year is selected through the `[is Movie]` type filter, so it
        # is only populated for Movie entries.
        edgeql = (
            "select Account {\n"
            "watchlist: { id, title, release_year := [is Movie].release_year } limit <int64>$limit\n"
            "} filter .id = <uuid>$id"
        )
        payload = await query_required_single_json(
            name="account.watchlist", query=edgeql, id=self.id, limit=limit
        )
        # Validate the raw dicts against the Movie/Show union.
        contents_adapter = TypeAdapter(Contents)
        return contents_adapter.validate_python(payload["watchlist"])
class Person(GQL):
    # A person; their filmography is fetched with a separate database query
    # when the field is resolved.
    id: UUID
    name: str

    async def filmography(self) -> Contents:
        # release_year is selected through the `[is Movie]` type filter, so it
        # is only populated for Movie entries.
        edgeql = (
            "select Person {\n"
            "filmography: { id, title, release_year := [is Movie].release_year }\n"
            "} filter .id = <uuid>$id"
        )
        payload = await query_required_single_json(
            name="person.filmography", query=edgeql, id=self.id
        )
        # Validate the raw dicts against the Movie/Show union.
        contents_adapter = TypeAdapter(Contents)
        return contents_adapter.validate_python(payload["filmography"])
class Content(GQLInterface):
    # Interface with the fields shared by Movie and Show.
    id: UUID
    title: str

    async def actors(self) -> list["Person"]:
        # One database round trip per Content object whose actors are requested.
        edgeql = "select Content { actors: { id, name } } filter .id = <uuid>$id"
        payload = await query_required_single_json(
            name="content.actors", query=edgeql, id=self.id
        )
        people = []
        for raw_person in payload["actors"]:
            people.append(Person(**raw_person))
        return people
class Movie(Content):
    # Content subtype that additionally carries the release year.
    release_year: int
class Show(Content):
    # Content subtype; both fields below are resolved with their own query.

    async def seasons(self) -> list["Season"]:
        # `.<show[is Season]` follows the `show` link backwards from Season.
        edgeql = "select Show { season := .<show[is Season] { id, number } } filter .id = <uuid>$id"
        payload = await query_required_single_json(
            name="show.seasons", query=edgeql, id=self.id
        )
        seasons = []
        for raw_season in payload["season"]:
            seasons.append(Season(**raw_season))
        return seasons

    async def num_seasons(self) -> int:
        edgeql = "select Show { num_seasons } filter .id = <uuid>$id"
        payload = await query_required_single_json(
            name="show.num_seasons", query=edgeql, id=self.id
        )
        return payload["num_seasons"]
class Season(GQL):
    # One season of a show; the parent show is fetched on demand.
    id: UUID
    number: int

    async def show(self) -> Show:
        edgeql = "select Season { show: { id, title } } filter .id = <uuid>$id"
        payload = await query_required_single_json(
            name="season.show", query=edgeql, id=self.id
        )
        raw_show = payload["show"]
        return Show(**raw_show)
class Query(GQL):
    # Root query type passed to build_router.

    @staticmethod
    async def account_by_username(username: str) -> Account:
        # Only id and username are selected here; nested fields such as the
        # watchlist are resolved later by Account's own methods.
        edgeql = "select Account { id, username } filter .username = <str>$username"
        row = await query_required_single_json(
            name="account_by_username", query=edgeql, username=username
        )
        return Account(**row)
# Build the GraphQL router from the Query root type.
router = build_router(query_models=[Query])
app = FastAPI()
# Mount the GraphQL router under the /graphql path prefix.
app.include_router(router, prefix="/graphql")
For each connection, we have to make a new query. For example, if your query is:
{
  accountByUsername(username: "Cameron") {
    id
    username
    watchlist(limit: 100) {
      __typename
      ... on Movie {
        id
        title
        releaseYear
        actors {
          name
        }
      }
      ... on Show {
        id
        title
      }
    }
  }
}
You get back a lot of nested data, and each nested field you request triggers another database call. For example, to get the actors of a movie:
class Content(GQLInterface):
    # Interface with the fields shared by Movie and Show.
    id: UUID
    title: str

    async def actors(self) -> list["Person"]:
        # One database round trip per Content object whose actors are requested.
        edgeql = "select Content { actors: { id, name } } filter .id = <uuid>$id"
        payload = await query_required_single_json(
            name="content.actors", query=edgeql, id=self.id
        )
        people = []
        for raw_person in payload["actors"]:
            people.append(Person(**raw_person))
        return people
So, to execute this query, the server had to: 1) get the account by username from the database, 2) get the watchlist of that user from the database, and 3) get the actors of each movie from the database, with a separate call for every movie.
There are some solutions to make this process more efficient. One of them is using dataloaders.
However, even with a dataloader, you are still making new requests to the database for each new level of data you are requesting.
FastGQL comes with a way to solve this problem. It ships with QueryBuilder
functionality. This allows you to map your GraphQL schema to your database schema, which means you can dynamically generate the exact database query you need to fulfill the client's request.
Note
Currently, QueryBuilder only works with EdgeDB.
Here is a full example of the same schema, now using the QueryBuilder
feature.
Full file 👀
import json
import time
import typing as T
from uuid import UUID
import edgedb
from fastapi import FastAPI
from fastgql import (
GQL,
GQLInterface,
build_router,
Link,
Property,
get_qb,
QueryBuilder,
Depends,
Info,
node_from_path,
)
from dotenv import load_dotenv
# Load variables from a .env file into the environment (python-dotenv).
load_dotenv()
# Module-level async EdgeDB client shared by the query helper below.
edgedb_client = edgedb.create_async_client()
# A list whose items are Movie or Show objects (forward references; both
# classes are defined further down).
Contents = list[T.Union["Movie", "Show"]]
def parse_raw_content(raw_content: list[dict[str, T.Any]]) -> Contents:
    """Convert raw polymorphic query results into Movie and Show objects.

    Each item must carry a "typename" key. When it is "default::Movie" or
    "default::Show", the item's fields are read from the sub-dict stored under
    "Movie" or "Show" respectively.

    Args:
        raw_content: Raw dicts as returned by the database query.

    Returns:
        The parsed objects in input order. Items with any other typename, or
        whose sub-dict is missing or empty, are skipped.

    Raises:
        KeyError: If an item has no "typename" key.
    """
    w_list: Contents = []
    for item in raw_content:
        typename = item["typename"]
        if typename == "default::Movie":
            if movie := item.get("Movie"):
                w_list.append(Movie(**movie))
        elif typename == "default::Show":
            if show := item.get("Show"):
                w_list.append(Show(**show))
    return w_list
async def query_required_single_json(
    name: str, query: str, **variables
) -> dict[str, T.Any]:
    """Run a query through the shared EdgeDB client and decode its JSON result.

    Args:
        name: Label printed next to the timing, to tell queries apart.
        query: The EdgeQL query text.
        **variables: Query parameters, forwarded to the client as keyword
            arguments.

    Returns:
        The JSON payload returned by the client, parsed with ``json.loads``.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments the way time.time() differences can.
    start = time.perf_counter()
    res = json.loads(
        await edgedb_client.query_required_single_json(query=query, **variables)
    )
    took_ms = round((time.perf_counter() - start) * 1_000, 2)
    print(f"[{name}] took {took_ms} ms")
    return res
class AccountPageInfo(GQL):
    # Pagination metadata attached to an AccountConnection.
    has_next_page: bool
    has_previous_page: bool
    # Cursors of the first and last returned edge; None when the page is empty.
    start_cursor: str | None
    end_cursor: str | None
class AccountEdge(GQL):
    # One entry of an AccountConnection: an account plus its cursor.
    cursor: str
    # Forward reference: Account is defined later in the file.
    node: "Account"
class AccountConnection(GQL):
    # A page of accounts together with pagination info and the overall count.
    page_info: AccountPageInfo
    edges: list[AccountEdge]
    total_count: int
def update_watchlist(child_qb: QueryBuilder, limit: int) -> None:
    """Apply the `limit` argument to the watchlist's child query builder.

    Registered through ``Link(update_qbs=...)`` on ``Account.watchlist``.
    """
    child_qb.set_limit(limit)
class Account(GQL):
    # Account type annotated with database mappings: Property marks plain
    # fields, Link marks related objects.

    def __init__(self, **data):
        super().__init__(**data)
        # Keep the raw row so link resolvers can read nested results from it
        # instead of running another query.
        self._data = data

    id: T.Annotated[UUID, Property(db_name="id")] = None
    username: T.Annotated[str, Property(db_name="username")] = None

    async def watchlist(
        self, info: Info, limit: int
    ) -> T.Annotated[Contents, Link(db_name="watchlist", update_qbs=update_watchlist)]:
        # The last element of info.path is the key under which this field's
        # data sits in the raw row.
        field_key = info.path[-1]
        return parse_raw_content(raw_content=self._data[field_key])
class Content(GQLInterface):
    # Interface with the fields shared by Movie and Show, annotated with
    # their database mappings.

    def __init__(self, **data):
        super().__init__(**data)
        # Keep the raw row so link resolvers can read nested results from it.
        self._data = data

    id: T.Annotated[UUID, Property(db_name="id")] = None
    title: T.Annotated[str, Property(db_name="title")] = None

    async def actors(
        self, info: Info
    ) -> T.Annotated[list["Person"], Link(db_name="actors")]:
        # The last element of info.path is the key under which this field's
        # data sits in the raw row.
        raw_people = self._data[info.path[-1]]
        return [Person(**raw_person) for raw_person in raw_people]
class Movie(Content):
    # Content subtype that additionally maps the release_year property.
    release_year: T.Annotated[int, Property(db_name="release_year")] = None
class Show(Content):
    # Content subtype that maps num_seasons and links to its seasons.
    num_seasons: T.Annotated[int, Property(db_name="num_seasons")] = None

    async def seasons(
        self, info: Info
    ) -> T.Annotated[list["Season"], Link(db_name="<show[is Season]")]:
        # The last element of info.path is the key under which this field's
        # data sits in the raw row.
        raw_seasons = self._data[info.path[-1]]
        return [Season(**raw_season) for raw_season in raw_seasons]
class Season(GQL):
    # One season of a show, annotated with its database mappings.

    def __init__(self, **data):
        super().__init__(**data)
        # Keep the raw row so the `show` resolver can read nested results.
        self._data = data

    id: T.Annotated[UUID, Property(db_name="id")] = None
    number: T.Annotated[int, Property(db_name="number")] = None

    async def show(self, info: Info) -> T.Annotated[Show, Link(db_name="show")]:
        # The last element of info.path is the key under which this field's
        # data sits in the raw row.
        raw_show = self._data[info.path[-1]]
        return Show(**raw_show)
class Person(GQL):
    # A person, annotated with database mappings for its fields and links.

    def __init__(self, **data):
        super().__init__(**data)
        # Keep the raw row so the `filmography` resolver can read nested results.
        self._data = data

    id: T.Annotated[UUID, Property(db_name="id")] = None
    name: T.Annotated[str, Property(db_name="name")] = None

    async def filmography(
        self, info: Info
    ) -> T.Annotated[Contents, Link(db_name="filmography")]:
        # The last element of info.path is the key under which this field's
        # data sits in the raw row.
        field_key = info.path[-1]
        return parse_raw_content(raw_content=self._data[field_key])
# AccountEdge.node is annotated with the forward reference "Account"; rebuild
# the model now that Account has been defined.
AccountEdge.model_rebuild()
class Query(GQL):
    # Root query type passed to build_router. Each resolver builds one EdgeQL
    # query from the QueryBuilder and returns the whole result at once.

    @staticmethod
    async def account_by_username(
        username: str, qb: QueryBuilder = Depends(get_qb)
    ) -> Account:
        # qb.build() yields the shape to select plus the variables it needs.
        shape, shape_variables = qb.build()
        edgeql = f"select Account {shape} filter .username = <str>$username"
        print(edgeql)
        row = await query_required_single_json(
            name="account_by_username",
            query=edgeql,
            username=username,
            **shape_variables,
        )
        return Account(**row)

    @staticmethod
    async def account_connection(
        info: Info,
        *,
        before: str | None = None,
        after: str | None = None,
        first: int,
    ) -> AccountConnection:
        # The Account shape is requested under edges -> node, so the builder
        # is created from that sub-node of the incoming query.
        qb: QueryBuilder = await Account.qb_config.from_info(
            info=info, node=node_from_path(node=info.node, path=["edges", "node"])
        )
        # username doubles as the cursor, so it is always selected.
        qb.fields.add("username")

        variables = {"first": first}
        conditions: list[str] = []
        if before:
            conditions.append(".username > <str>$before")
            variables["before"] = before
        if after:
            conditions.append(".username < <str>$after")
            variables["after"] = after
        # The trailing space keeps the clause separated from "order by".
        filter_clause = "filter " + " and ".join(conditions) + " " if conditions else ""

        qb.add_variables(variables, replace=False)
        shape, query_variables = qb.build()
        edgeql = (
            "\n"
            "with\n"
            "all_accounts := (select Account),\n"
            "_first := <int16>$first,\n"
            f"accounts := (select all_accounts {filter_clause}order by .username desc limit _first),\n"
            "select {\n"
            "total_count := count(all_accounts),\n"
            f"accounts := accounts {shape}\n"
            "}\n"
        )
        payload = await query_required_single_json(
            name="account_connection", query=edgeql, **query_variables
        )

        total = payload["total_count"]
        accounts = [Account(**raw_account) for raw_account in payload["accounts"]]
        page_info = AccountPageInfo(
            has_next_page=len(accounts) == first and total > first,
            has_previous_page=after is not None,
            start_cursor=accounts[0].username if accounts else None,
            end_cursor=accounts[-1].username if accounts else None,
        )
        edges = [
            AccountEdge(node=account, cursor=account.username) for account in accounts
        ]
        return AccountConnection(page_info=page_info, total_count=total, edges=edges)
# Build the GraphQL router from the Query root type.
router = build_router(query_models=[Query])
app = FastAPI()
# Mount the GraphQL router under the /graphql path prefix.
app.include_router(router, prefix="/graphql")
Now this same query is resolved with the single generated EdgeQL query shown after it:
{
  accountByUsername(username: "Cameron") {
    id
    username
    watchlist(limit: 100) {
      __typename
      ... on Movie {
        id
        title
        releaseYear
        actors {
          name
        }
      }
      ... on Show {
        id
        title
      }
    }
  }
}
select Account { id, username, watchlist: { typename := .__type__.name, Movie := (select [is Movie] { __typename := .__type__.name, id, release_year, title, actors: { name } }), Show := (select [is Show] { __typename := .__type__.name, id, title }) } LIMIT <int32>$limit } filter .username = <str>$username
The original query took around 180ms to execute and made 6 database calls.
The new query using QueryBuilders
takes less than 30ms to execute and only makes one database call!
For this small example, the results are not so dramatic. But in production, on large datasets, the speed advantage can easily be 10x.