Chess DE Project
Introduction
This is a data engineering project that ingests data from the Chess.com API into DuckDB, transforms the data using dbt on DuckDB, and visualises the results on a Streamlit dashboard.
Architecture
The data engineering project stack contains the following:
- dltHub: Ingestion Layer to load the data into the data warehouse
- Dagster: To schedule and orchestrate the DAGs
- Postgres: To store and persist Dagster run history and metadata
- DuckDB: Data Warehouse
- Streamlit: Dashboard Layer
Architecture Diagram

Project Structure
.
├── chess_dagster
│   ├── chess_dbt
│   │   ├── dbt_project.yml
│   │   ├── models/
│   │   ├── tests/
│   │   ├── udf/
│   │   ├── ...
│   ├── chess_dlt
│   │   ├── .dlt/
│   │   ├── __init__.py
│   │   ├── data_contracts.py
│   ├── chess_etl
│   │   ├── assets_dbt.py
│   │   ├── assets_dlt.py
│   │   ├── definitions.py
│   │   ├── resources.py
│   ├── dagster.yaml
│   ├── Dockerfile_dagster
│   ├── Dockerfile_user_code
│   ├── profiles.yml
│   ├── pyproject.toml
│   ├── workspace.yaml
├── data
│   ├── chess.duckdb
│   ├── ...
├── streamlit_dashboard
│   ├── app.py
│   ├── Dockerfile
│   └── requirements.txt
├── .env.example
├── docker-compose-dashboard.yml
├── docker-compose.yml
├── Makefile
└── README.md
Running Chess Dashboard
Run locally
To run locally, clone the repo, create a `.env` file and run the following commands to start the data pipeline:
git clone https://github.com/jkwd/chess_dashboard.git
cd chess_dashboard
- `make init` to create a `.env` file from `.env.example`
- Edit the `CHESS_USERNAME` in the `.env` file to your username
- `make up` to build and start the docker containers
- Go to `http://localhost:3000` to view the Dagster UI
- Materialize all assets
- Go to `http://localhost:8501` to view the Streamlit Dashboard
- `make down` to stop the containers
GitHub Codespaces
- Fork/clone `https://github.com/jkwd/chess_dashboard.git` to your own repository
- Open the repository in Codespaces
- `make init` to create a `.env` file from `.env.example`
- Edit the `CHESS_USERNAME` in the `.env` file to your username
- `make up` to build and start the docker containers
- Find the forwarded addresses in the `PORTS` section of the code editor
- Go to the forwarded address for port `3000` to view the Dagster UI
- Materialize all assets
- Go to the forwarded address for port `8501` to view the Streamlit Dashboard
- `make down` to stop the containers
- Stop/delete the Codespace when you are done
Running Dagster Job
Click on the Assets tab at the top

Click on View global asset lineage at the top right of the page

Click on Materialize All
Implementation
Dagster as an Orchestrator
Taken directly from the docs: Dagster is an open-source data orchestrator built for data engineers, with integrated lineage, observability, a declarative programming model, and best-in-class testability.
There are many components to Dagster, but for this project we mainly focus on the following:
- Asset: The logical unit of data, e.g. a table, dataset or ML model. Assets form the nodes in the lineage graph. In this project, the assets are the dlt asset and the dbt asset.
- Resource: External dependencies such as APIs, databases, or anything outside of Dagster. In this project, the resources are DuckDB, dlt and dbt.
- Jobs: A subset of assets, e.g. `some_job_1` can contain assets `A, B, C` and `some_job_2` can contain assets `W, X, Y, Z`.
- Schedule: A way to automate jobs/assets at a given interval (e.g. run every day at 0000 UTC).
- Definitions: The top-level construct of the Dagster project which contains references to all the objects, e.g. Assets, Resources, Schedules, Jobs, etc. (a minimal sketch tying these together follows this list).
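To make these concepts concrete, here is a minimal sketch (not the project's actual code) of how an asset, a job, a schedule and the definitions fit together; the asset name and its logic are made up for illustration:

```python
from dagster import Definitions, ScheduleDefinition, asset, define_asset_job


@asset
def toy_games():
    # A stand-in asset; in this project the real assets come from dlt and dbt.
    return [{"game_id": 1}]


# A job is a subset of assets; here it selects just the one asset above.
toy_job = define_asset_job(name="toy_job", selection=[toy_games])

# A schedule runs the job every day at 0000 UTC.
toy_schedule = ScheduleDefinition(job=toy_job, cron_schedule="0 0 * * *")

# Definitions ties the assets, jobs and schedules together for the Dagster app.
defs = Definitions(assets=[toy_games], jobs=[toy_job], schedules=[toy_schedule])
```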
DuckDB as the Data Warehouse
In simple terms, DuckDB is the online analytical processing (OLAP) version of SQLite. It is easy to install with a `pip install duckdb` and it's completely embedded within the host system. It is free, fast, portable and has lots of features. It is simple and perfect for a single-node project like this.
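As a quick illustration of how the embedded workflow feels (the file path, table and data below are placeholders, not the project's):

```python
import duckdb

# Opens (or creates) a database file next to your code; there is no server to run.
con = duckdb.connect("example.duckdb")
con.sql("create table if not exists games (id integer, result varchar)")
con.sql("insert into games values (1, 'win'), (2, 'loss')")
print(con.sql("select result, count(*) as n from games group by result").fetchall())
```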
Ingestion using dlt+dagster
dlt allows us to load data from a source system to a destination system using Python.
There are 4 main components to dlt:
- Source: The group of resources we plan to get the data from, e.g. an API endpoint or a Postgres DB
- Resource: A function that yields the data
- Destination: The location we want the data to land in, e.g. S3, Snowflake, DuckDB
- Pipeline: The main building block of dlt which orchestrates the loading of data from your source into your destination in three discrete steps (a toy run is sketched after this list):
  - Extracts data from the source to the hard drive
  - Inspects and normalizes your data and computes a schema compatible with your destination, e.g. `{"items": {"id": 1}}` will become `items__id`. You can control the normalization phase and apply data schema contracts.
  - Loads the data into the destination and runs schema migrations if necessary
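Here is a minimal sketch of those three steps with a toy record (the pipeline, dataset and table names are arbitrary):

```python
import dlt

# A pipeline wires data to a destination; here the destination is a local DuckDB file.
pipeline = dlt.pipeline(
    pipeline_name="demo_pipeline",
    destination="duckdb",
    dataset_name="demo_raw",
)

# Extract -> normalize -> load: the nested dict is flattened into an `items__id` column.
load_info = pipeline.run([{"items": {"id": 1}}], table_name="example")
print(load_info)
```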
Configuring the Source
A source can consist of a group of resources. So let’s start with the resource first:
# chess_dagster/chess_dlt/__init__.py
@dlt.resource(write_disposition="replace", columns=PlayersGames)
def player_games(username: str) -> Generator[Any, Any, Any]:
    """
    Yields player's `username` games.

    Args:
        username: str: Player username to retrieve games for.

    Yields:
        Generator[Any, Any, Any]: A generator that returns a list of games for a player.
    """
    def _get_player_archives(username: str) -> List:
        """
        Returns url to game archives for a specified player username.

        Args:
            username: str: Player username to retrieve archives for.

        Returns:
            List: List of player archive data.
        """
        data = requests.get(f"https://api.chess.com/pub/player/{username}/games/archives")
        return data.json().get("archives", [])

    # get archives in parallel by decorating the http request with defer
    @dlt.defer
    def _get_games(url: str) -> List[Dict[str, Any]]:
        """
        Returns games of the specified player username from the given archive url.

        Args:
            url: str: URL to the archive to retrieve games from in the format
                https://api.chess.com/pub/player/{username}/games/{YYYY}/{MM}

        Returns:
            List: List of player's games data.
        """
        logger.info(f"Getting games from {url}")
        try:
            games = requests.get(url).json().get("games", [])
            return games  # type: ignore
        except requests.HTTPError as http_err:
            # sometimes archives are not available and the error seems to be permanent
            if http_err.response.status_code == 404:
                return []
            raise
        except Exception as err:
            logger.error(f"Unexpected error: {err}")
            raise

    archives = _get_player_archives(username)
    for url in archives:
        # the `url` format is https://api.chess.com/pub/player/{username}/games/{YYYY}/{MM}
        # get the filtered archive
        yield _get_games(url)
Here we have 1 resource, `player_games`, which gets the games of the player. We have configured 2 things:
- `write_disposition="replace"`: Every time we get new data, we replace the entire table.
- `columns=PlayersGames`: This applies a data schema contract, using Pydantic, during the normalize phase of the pipeline to ensure that the API returns the columns that we expect and in the right data types (a sketch of such a contract follows this list).
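For illustration, a contract like `PlayersGames` could look roughly like the sketch below. The real model lives in `data_contracts.py` and its fields will differ; the columns shown here are only a guessed subset of the Chess.com games payload:

```python
# Illustrative sketch only: the project's actual contract is defined in
# chess_dagster/chess_dlt/data_contracts.py with its own set of fields.
from pydantic import BaseModel


class PlayersGames(BaseModel):
    url: str
    pgn: str
    time_control: str
    rated: bool
    end_time: int
```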
Once we have the resource data, we can plug it into the source as such:
# chess_dagster/chess_dlt/__init__.py
@dlt.source(name="chess")
def source(username: str):
    return (
        player_games(username=username)
    )
As mentioned earlier, a `dlt.source` consists of a group of resources, so if there is another resource we just need to add it to the returned tuple.
The dagster asset created from the source and resource will have the name `dlt_<source_def_name>_<resource_def_name>`, so in this project it would be identified as `dlt_source_player_games`. However, if you remember, when configuring `@dlt.source(name="chess")` we passed the additional `name` parameter, which overrides the source function name. So it finally resolves to `dlt_chess_player_games`, which is how downstream assets reference it.
Now that we have the source and resources done, let’s integrate dlt with dagster.
Integrating dlt with dagster
You can follow this guide to integrate dagster+dlt.
This requires:
- Creating the dagster resource for dlt (not to be confused with the dlt resource)
- Creating the dlt asset
- Setting the dlt asset and the dagster resource for dlt in the dagster definition
# chess_dagster/chess_etl/resources.py
from dagster_embedded_elt.dlt import DagsterDltResource
dlt_resource = DagsterDltResource()
Configuring the dagster resource for dlt is as simple as the 2 lines above. This resource will then be used in the Dagster definition. We will come back to the Dagster definition in a later section, after configuring the dbt asset and resources as well.
# chess_dagster/chess_etl/assets_dlt.py
@dlt_assets(
    dlt_source=source(
        username=os.getenv("CHESS_USERNAME")
    ),
    dlt_pipeline=pipeline(
        pipeline_name="chess_pipeline",
        destination=destinations.duckdb(os.getenv('CHESS_DB')),  # The path to the .duckdb file
        dataset_name=SCHEMA_RAW,  # This is the table schema in duckdb.
    ),
    name="chess",  # This is the table catalog in duckdb.
)
def chess_dlt_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)
The above code creates the dlt asset. dlt allows us to focus on getting the data into the required format: the creation of the schema and table, and the insert/overwrite/deletion of data, are all handled by dlt.
In DuckDB, the table details will be as follows once the data has been loaded, given that `SCHEMA_RAW='chess_data_raw'`:

| table_catalog | table_schema   | table_name   |
|---------------|----------------|--------------|
| chess         | chess_data_raw | player_games |
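Once a run has completed, you can sanity-check the loaded table directly from Python, for example as below (this assumes the DuckDB file sits at `data/chess.duckdb` as in the project structure; adjust the path to your setup):

```python
import duckdb

con = duckdb.connect("data/chess.duckdb", read_only=True)
# List the tables dlt created, then count the loaded rows.
print(con.sql(
    "select table_catalog, table_schema, table_name from information_schema.tables"
).fetchall())
print(con.sql("select count(*) from chess.chess_data_raw.player_games").fetchone())
```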
Transformation using dbt+dagster
dbt is a transformation workflow that allows you to modularize and centralize your analytics code, while also providing your data team with guardrails typically found in software engineering workflows. It allows engineers to transform data in the warehouse more effectively and it's the T in the ELT framework.
There are many components to dbt, but for this project we mainly focus on the following:
- Models: SQL/python queries that define data transformations
- Tests: Ensure data quality by running validations on the models
- Documentation: Documentation of the table and columns as well as providing the data lineage
Configuring the dbt project
# dbt_project.yml
name: 'chess_dbt'
version: '1.0.0'
profile: 'chess_dbt'

model-paths: ["models"]
seed-paths: ["seeds"]
test-paths: ["tests"]
analysis-paths: ["analysis"]
macro-paths: ["macros"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_modules"
  - "logs"

require-dbt-version: [">=1.0.0", "<2.0.0"]

vars:
  username: "{{env_var('CHESS_USERNAME')}}"

models:
  chess_dbt:
    materialized: table
    staging:
      materialized: view
# profiles.yml
chess_dbt:
  target: dev
  outputs:
    dev:
      ...
    ci:
      ...
    prod:
      type: duckdb
      path: "{{ env_var('CHESS_DB') }}"
      threads: 1
      module_paths:
        - "{{ env_var('DAGSTER_APP') }}/chess_dbt/udf"
      plugins:
        # Custom module in the udf directory that defines SQL UDFs written in Python
        # at the start of the dbt run
        - module: my_custom_functions
Above are the configurations for the `dbt_project.yml` and `profiles.yml`.
Configuring the dbt sources to point to the dlt assets
# schema.yml
sources:
  - name: chess_source
    schema: chess_data_raw
    tables:
      - name: player_games
        meta:
          dagster:
            asset_key: ["dlt_chess_player_games"]
        columns:
          ...
Earlier, I explained that the resulting dagster asset from dlt will be `dlt_chess_player_games`. When configuring the dbt source, we need to point that source to the dagster asset for the lineage to work, so we introduce the `meta` block with the necessary dagster `asset_key`.
Using python UDFs
In the `profiles.yml` you may notice something less familiar under `module_paths` and `plugins`. DuckDB allows us to register Python UDFs which can then be used as a function in our SQL models. So we create the UDF in the `my_custom_functions.py` file located at the specified `module_paths`.
# chess_dagster/chess_dbt/udf/my_custom_functions.py
from duckdb import DuckDBPyConnection
from dbt.adapters.duckdb.plugins import BasePlugin
from dbt.adapters.duckdb.utils import TargetConfig

import chess.pgn
from io import StringIO
import chess


def pgn_to_fens_udf(pgn: str) -> list[str]:
    """Takes in a PGN and goes move by move to get the FEN of the board at each move.
    Returns a list of fen strings.

    Args:
        pgn (str): pgn of the game

    Returns:
        arr (list[str]): fen strings of the board at each move
    """
    pgn_header = pgn.split('\n\n')[0]
    pgn_moves = pgn.split('\n\n')[1]

    if 'Chess960' not in pgn_header:
        pgn = pgn_moves

    arr = []
    game = chess.pgn.read_game(StringIO(pgn)).game()
    board = game.board()
    for move in game.mainline_moves():
        board.push(move)
        fen = board.fen()
        arr.append(fen)

    return arr


# The python module that you create must have a class named "Plugin"
# which extends the `dbt.adapters.duckdb.plugins.BasePlugin` class.
class Plugin(BasePlugin):
    def configure_connection(self, conn: DuckDBPyConnection):
        conn.create_function("pgn_to_fens_udf", pgn_to_fens_udf)
Above is an example of how we can configure the UDF. In the SQL model, we can do something like the following to use it:
select
pgn_to_fens_udf('some_pgn') as fens
from some_table
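To try the UDF outside of dbt, you can register the same function on a plain DuckDB connection, for example with the quick check below (it assumes `duckdb` and `python-chess` are installed and that `my_custom_functions.py` is importable from the working directory):

```python
import duckdb

# Hypothetical import path; adjust to wherever my_custom_functions.py lives locally.
from my_custom_functions import pgn_to_fens_udf

con = duckdb.connect()  # in-memory database
# Same registration call that the dbt-duckdb plugin makes; types are inferred from the hints.
con.create_function("pgn_to_fens_udf", pgn_to_fens_udf)

# A tiny PGN with a header block and two moves.
pgn = '[Event "Test"]\n\n1. e4 e5 *'
print(con.execute("select pgn_to_fens_udf(?) as fens", [pgn]).fetchone()[0])
```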
Technically, dbt has released Python models, which would have allowed us to create the model in Python instead of using a UDF with SQL. However, by doing this we may restrict ourselves unintentionally: as dbt is a more SQL-first framework, some features such as dbt unit tests are only supported with SQL models.
Integrating dbt with dagster
You can follow this guide to integrate dagster+dbt.
This requires:
- Creating the dagster resource for dbt
- Creating the dbt asset in Dagster
- Setting the dbt asset and the dagster resource for dbt in the dagster definition
# chess_dagster/chess_etl/resources.py
from dagster_dbt import DbtCliResource
from pathlib import Path
import os

HOME_DIR = os.getenv("HOME")

dbt_project_dir = Path(__file__).joinpath("..", "..", "chess_dbt").resolve()
dbt_resource = DbtCliResource(project_dir=os.fspath(dbt_project_dir),
                              profiles_dir=os.path.join(HOME_DIR, ".dbt"),
                              global_config_flags=["--log-format-file", "text"],
                              target="prod")

# If DAGSTER_DBT_PARSE_PROJECT_ON_LOAD is set, a manifest will be created at run time.
# Otherwise, we expect a manifest to be present in the project's target directory.
if os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD"):
    dbt_manifest_path = (
        dbt_resource.cli(
            ["--quiet", "parse"],
            target_path=Path("target"),
        )
        .wait()  # wait for the parse command to finish as dbt_resource.cli() creates a subprocess
        .target_path.joinpath("manifest.json")
    )
else:
    dbt_manifest_path = dbt_project_dir.joinpath("target", "manifest.json")
Setting up the dbt resource is not as straightforward as dlt's. Like any other dbt project, we may need to specify the path to the `dbt_project.yml` and `profiles.yml`.
In addition to these 2 files, Dagster also expects the dbt `manifest.json` to be present. The manifest contains all the dbt models, tests, macros, etc. and it'll be used by Dagster to create the respective Dagster assets and display them on the lineage graph in the UI. We have `DAGSTER_DBT_PARSE_PROJECT_ON_LOAD` in our `.env` file to give us the flexibility to provide our own manifest file or to parse one at "runtime".
# chess_dagster/chess_etl/assets_dbt.py
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

from chess_etl.resources import dbt_manifest_path


@dbt_assets(manifest=dbt_manifest_path)
def chess_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build", "--target", "prod"], context=context).stream()
Using the `manifest.json` and the `DbtCliResource`, we can now define our Dagster dbt asset. We can specify the dbt command that we want in `dbt.cli()`.
Dagster definitions to integrate dlt+dbt into Dagster
# chess_dagster/chess_etl/definitions.py
from dagster import (
    Definitions,
    ScheduleDefinition,
    define_asset_job,
)

from chess_etl.assets_dlt import chess_dlt_assets
from chess_etl.assets_dbt import chess_dbt_assets
from chess_etl.resources import dlt_resource, dbt_resource

all_assets_job = define_asset_job(name="all_assets_job")

daily_refresh_schedule = ScheduleDefinition(
    job=all_assets_job, cron_schedule="0 0 * * *"
)

# Must be last
defs = Definitions(
    assets=[chess_dlt_assets, chess_dbt_assets],
    resources={
        "dlt": dlt_resource,
        "dbt": dbt_resource,
    },
    schedules=[daily_refresh_schedule],
    jobs=[all_assets_job]
)
In the definitions, we will define the 2 resources and assets. We can also create a Job and a Schedule for that Job. Everything will be part of the `Definitions()` that will be used to power the Dagster app.
Deploying Dagster on Docker
You can follow this guide and this GitHub example to deploy the Dagster project on Docker.

Dagster allows us to separate the user code from the system code. This allows for separation of concerns, and any crashes in the user code will not crash Dagster. Each user code repository and the Dagster system can also run in their own Python environments, which reduces the dependencies between them.
You may view the code in the links provided to set up Dagster on Docker.
Streamlit Dashboard
Once the data has been populated, we can view the results in the Streamlit dashboard at `http://localhost:8501/`.
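As a rough idea of what the dashboard layer involves, a minimal Streamlit page reading from DuckDB could look like the sketch below (the real `app.py` is more elaborate, and the table queried here is only a placeholder; the dashboard would normally read from the dbt models):

```python
# Illustrative sketch only, not the project's actual app.py.
import duckdb
import streamlit as st

st.title("Chess Dashboard")

con = duckdb.connect("data/chess.duckdb", read_only=True)
# Placeholder query against the raw table; swap in the relevant dbt model for real charts.
df = con.sql("select * from chess.chess_data_raw.player_games limit 100").df()
st.dataframe(df)
```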
CI Checks
name: Docker CI Workflow

on:
  workflow_dispatch:
  push:
    branches: [ "main" ]
  pull_request:
    branches: [ "main" ]

jobs:
  run-ci-tests:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Create .env file
        run: cp .env.example .env
      - name: Check docker-compose
        run: docker compose version
      - name: Check docker-compose format
        run: docker compose -f docker-compose.yml config
      - name: Install Make
        run: sudo apt-get install make
      - name: Build the docker compose images
        run: make build
      - name: Run python lint
        run: make lint-python
      - name: Run SQL lint
        run: make lint-sql
      - name: Run pytest
        run: make pytest
      - name: Run dbt unit test
        run: make dbt-unit-test
      - name: Run e2e
        run: make run

  generate-dbt-docs:
    runs-on: ubuntu-latest
    needs: run-ci-tests
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: print working directory
        run: pwd
      - name: list directory
        run: ls -l
      - name: Install dbt
        run: pip3 install dbt-duckdb==1.9.0
      - name: Verify dbt
        run: dbt --version
      - name: dbt parse
        working-directory: ./chess_dagster/chess_dbt
        env:
          CHESS_USERNAME: magnuscarlsen
        run: |
          dbt parse --profiles-dir .. --project-dir . --target ci
      - name: generate dbt docs
        working-directory: ./chess_dagster/chess_dbt
        env:
          CHESS_USERNAME: magnuscarlsen
        run: |
          dbt docs generate --profiles-dir .. --project-dir . --empty-catalog --no-compile --target ci
          cd target
          mkdir ${{ github.workspace }}/docs
          cp *.json *.html ${{ github.workspace }}/docs
          ls -ltra ${{ github.workspace }}/docs
      - name: "Upload pages to artifact"
        uses: actions/upload-pages-artifact@v3
        with:
          path: ${{ github.workspace }}/docs
      - name: "Zip artifact"
        run: zip -jrq docs.zip ${{ github.workspace }}/docs
      - name: "Upload artifact for deployment job"
        uses: actions/upload-artifact@v4
        with:
          name: docs
          path: docs.zip

  # Deploy to Github pages
  deploy-to-github-pages:
    # Add a dependency to the build job
    needs: generate-dbt-docs
    # Grant GITHUB_TOKEN the permissions required to make a Pages deployment
    permissions:
      pages: write      # to deploy to Pages
      id-token: write   # to verify the deployment originates from an appropriate source
    # Deploy to the github-pages environment
    environment:
      name: github-pages
      url: ${{ steps.deployment.outputs.page_url }}
    # Specify runner + deployment step
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to GitHub Pages
        id: deployment
        uses: actions/deploy-pages@v4  # or the latest "vX.X.X" version tag for this action
For CI, we use GitHub Actions to perform the checks and publish the dbt docs for reference:
- Lint Python using ruff
- Lint SQL using SQLFluff
- Run pytest for Python-based unit tests
- Run dbt unit tests
- Run the Dagster job end-to-end to ensure everything works (the job is relatively fast)
- Generate dbt docs
- Publish dbt docs to GitHub Pages (you can view the generated dbt docs here)
Steps 1-5 are run in the docker container to ensure consistency in the results.