Databases
API reference for chia.database. These pages are generated from the docstrings in the source, so they stay in sync with the code.
Base
chia.database.base — shared machinery and contract for database nodes.
DatabaseNode is the abstract base for engine-specific database
nodes (SQLiteNode,
PostgresNode). It owns everything
that is engine-agnostic:
- the three placement modes (placement group via ColocatedNode, or a hard NodeAffinity pin via node_id= / pin_to_current_node=True),
- the _DbBoundChiaFn re-binding loop that injects the node’s locator (a file path, a DSN, …) and default connect_opts into every member call,
- spawned-tool tracking and the tools-then-placement-group close().
It also declares each common member as an abstract staticmethod, so the
family is polymorphic: DatabaseNode cannot be instantiated, an engine
subclass missing a member fails at construction (not at first dispatch),
and engine-blind code can be written against the base:
def record(db: DatabaseNode, sql: str, params: tuple) -> None:
    get(db.execute.chia_remote(sql, params))
The subclasses override every stub with a real @ChiaFunction;
the stubs carry the canonical signatures and semantics.
Portability caveat: the members share names, signatures, result types
(ExecResult, list[dict]), and DatabaseNode.transaction()’s
params-type dispatch — but SQL text is dialect-specific. At minimum the
parameter placeholders differ (see each subclass’s paramstyle: sqlite is
qmark — ? / :name — and postgres is format — %s /
%(name)s), and DDL idioms often do too. Engine-blind code must either
render SQL per db.paramstyle or stick to statements valid in both
dialects.
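As a rough illustration of "render SQL per db.paramstyle", the sketch below converts SQL written with positional ? placeholders into the target style. The helper name render_for is hypothetical (it is not part of chia), and the conversion is deliberately naive: it assumes no ? appears inside string literals or comments.

```python
def render_for(paramstyle: str, sql_qmark: str) -> str:
    """Render SQL written with positional ``?`` placeholders for a paramstyle.

    Hypothetical helper, not a chia API. Assumes ``?`` never occurs inside
    a string literal or comment.
    """
    if paramstyle == "qmark":
        return sql_qmark
    if paramstyle == "format":
        # With %-style placeholders a literal % must be doubled,
        # so escape those before turning ? into %s.
        return sql_qmark.replace("%", "%%").replace("?", "%s")
    raise ValueError(f"unsupported paramstyle: {paramstyle!r}")


sql = "INSERT INTO t (k, v) VALUES (?, ?)"
for_sqlite = render_for("qmark", sql)
for_postgres = render_for("format", sql)
```

Engine-blind code could then pass render_for(db.paramstyle, sql) to a member; statements whose DDL idioms differ between engines still need per-dialect text.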
Contract notes the subclasses follow (asserted by their tests, not by the
base): write members are declared max_retries=0 (Ray task replay would
double-apply a committed-but-unreturned non-idempotent write), and read
members open their connection with an engine-appropriate read-only guard.
- class chia.database.base.ExecResult(rowcount: int, lastrowid: int | None = None, rows: list[dict] | None = None)
Bases: object
Outcome of one write statement, uniform across engines.
rowcount is the DBAPI cursor.rowcount (the total for executemany). lastrowid is sqlite-only (None for executemany and non-INSERT statements; always None on postgres, where you use RETURNING and read rows instead). rows carries fetched result rows when the statement returned any (e.g. postgres INSERT ... RETURNING id).
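A minimal sketch of how engine-blind code might read a new row's id from either field. ExecResultSketch is a stand-in that mirrors the documented fields (it is not the real class), and inserted_id is a hypothetical helper.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ExecResultSketch:
    """Stand-in with the same three fields as the documented ExecResult."""
    rowcount: int
    lastrowid: int | None = None
    rows: list[dict] | None = None


def inserted_id(res: ExecResultSketch, column: str = "id") -> int | None:
    """Return the new row's id from whichever field the engine filled in."""
    if res.rows:
        # postgres path: INSERT ... RETURNING id attaches the row here
        return res.rows[0][column]
    # sqlite path: single-row INSERT sets lastrowid (None otherwise)
    return res.lastrowid


sqlite_style = ExecResultSketch(rowcount=1, lastrowid=7)
postgres_style = ExecResultSketch(rowcount=1, rows=[{"id": 42}])
```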
- class chia.database.base.DatabaseNode(locator: str, placement_group=None, require_colocated: bool = True, *, node_id: str | None = None, pin_to_current_node: bool = False, bundle_index: int = 0, reserve_bundle: dict | None = None, pg_strategy: str = 'STRICT_PACK', wait_for_pg: bool = True, pg_ready_timeout_s: float | None = None, connect_opts: dict | None = None)
Bases: ColocatedNode, ABC
Abstract base for engine-specific database nodes.
Subclasses must define:
  - every abstract member below as a @staticmethod @ChiaFunction taking the engine’s locator as the explicit first argument,
  - paramstyle: the PEP 249 placeholder style of the engine’s SQL,
  - spawn_query_tool: the engine’s LLM-facing MCP tool factory,
and may extend _MEMBER_FNS with engine-specific members (e.g. SQLiteNode appends wal_checkpoint); the binding loop picks the extras up automatically.
Set up placement and bind the member functions.
- Parameters:
locator – the engine-specific database address (an absolute file path for sqlite, a DSN/conninfo string for postgres), already validated/normalized by the subclass.
placement_group / require_colocated / bundle_index / reserve_bundle / pg_strategy / wait_for_pg / pg_ready_timeout_s – as in ColocatedNode.
node_id – pin every member to this Ray node via a hard NodeAffinitySchedulingStrategy(soft=False) instead of a placement group, for state that already lives on a known machine. Mutually exclusive with placement_group.
pin_to_current_node – convenience for node_id=<caller's node>. Requires an initialized Ray context.
connect_opts – engine-specific connection defaults injected into every member call; per-call connect_opts= overrides.
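The injection of locator and connect_opts can be pictured with a plain-Python analogy. This is not chia's _DbBoundChiaFn; bind_member and the echoing execute below are hypothetical, and the sketch only shows the documented behavior that node-level defaults are merged with per-call overrides, with the per-call values winning.

```python
from __future__ import annotations

import functools


def bind_member(fn, locator: str, default_opts: dict | None):
    """Return a callable that injects locator and merged connect_opts into fn."""
    @functools.wraps(fn)
    def bound(*args, connect_opts: dict | None = None, **kwargs):
        # Node defaults first, then per-call overrides on top.
        merged = {**(default_opts or {}), **(connect_opts or {})}
        return fn(locator, *args, connect_opts=merged, **kwargs)
    return bound


def execute(locator, sql, params=(), *, connect_opts=None):
    """Hypothetical member that just echoes what it was called with."""
    return locator, sql, params, connect_opts


bound_execute = bind_member(
    execute, "/abs/path/state.db", {"busy_timeout_s": 30, "wal": True}
)
plain_call = bound_execute("SELECT 1")
override_call = bound_execute("SELECT 1", connect_opts={"busy_timeout_s": 5})
```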
- paramstyle: str = ''
PEP 249 paramstyle of the SQL this node’s members expect (e.g. “qmark” for sqlite, “format” for postgres).
- close() -> None
Stop spawned tools, then release the PG iff this node reserved it.
Tools are stopped BEFORE the PG is removed — a tool’s server actor may be pinned to this PG, so tearing the PG down first would orphan it. Idempotent.
- abstractmethod static init_schema(locator: str, script: str, *, connect_opts: dict | None = None) -> None
Create the database (if creatable) and run a multi-statement DDL script. Idempotent CREATE ... IF NOT EXISTS scripts are encouraged.
- abstractmethod static executescript(locator: str, script: str, *, connect_opts: dict | None = None) -> None
Run a multi-statement SQL script (no parameters).
- abstractmethod static add_column_if_missing(locator: str, table: str, column: str, type_decl: str, *, connect_opts: dict | None = None) -> bool
ALTER TABLE ... ADD COLUMN iff absent; True iff added. Identifiers are regex-validated before any SQL interpolation.
- abstractmethod static schema(locator: str, *, connect_opts: dict | None = None) -> str
Human-readable rendering of the current tables/indexes.
- abstractmethod static execute(locator: str, sql: str, params: tuple | dict = (), *, connect_opts: dict | None = None) -> ExecResult
Run one statement in its own committed transaction. Declared with max_retries=0 (non-idempotent-replay guard).
- abstractmethod static executemany(locator: str, sql: str, seq_of_params: list, *, connect_opts: dict | None = None) -> ExecResult
DBAPI executemany in one committed transaction; rowcount is the total across all parameter sets.
- abstractmethod static transaction(locator: str, ops: list, *, connect_opts: dict | None = None) -> list[ExecResult]
Run ops atomically (all or nothing) in one transaction.
Each op is (sql, params) where the params TYPE selects the form: tuple/dict -> execute, list -> executemany, None -> bare execute. Any error rolls the whole batch back and re-raises.
- abstractmethod static query(locator: str, sql: str, params: tuple | dict = (), *, limit: int | None = None, connect_opts: dict | None = None) -> list[dict]
Rows as list[dict]; limit=N caps the fetch. Opened with the engine’s read-only guard, so accidental writes raise.
- abstractmethod static query_one(locator: str, sql: str, params: tuple | dict = (), *, connect_opts: dict | None = None) -> dict | None
First row as a dict, or None.
- abstractmethod static query_value(locator: str, sql: str, params: tuple | dict = (), *, default: Any = None, connect_opts: dict | None = None) -> Any
First column of the first row, or default when there is no row. (An aggregate over an empty table yields a NULL row, returned as None, not default.)
Postgres Node
chia.database.postgres_node — generic PostgreSQL client node for chia flows.
SQL placeholders are psycopg’s
paramstyle = "format" (%s positional, %(name)s named) — SQL
written for SQLiteNode (? / :name) is NOT portable verbatim.
Placement: Colocation is NOT required for correctness — any
worker can reach the server over TCP — so require_colocated defaults to
False (members dispatch unpinned). The placement modes are still
available for latency or topology reasons (placement_group=,
node_id=, pin_to_current_node=True).
Server: this node does not manage one. The easy path:
docker run -d --name chia-pg -p 5432:5432 \
    -e POSTGRES_PASSWORD=chia postgres:16
db = PostgresNode("postgresql://postgres:chia@<host>:5432/postgres")
A managed PostgresServerNode(ColocatedNode) (initdb / pg_ctl with the
data dir on one worker’s local disk — where colocation genuinely matters
for postgres) is deliberately deferred: it is blocked on distributing
postgres binaries to worker images, and running the official docker image
on the target host is usually the better answer.
Driver: psycopg (v3) via the optional extra pip install
'chia[postgres]'. All psycopg imports are lazy (inside function bodies),
so importing this module — and constructing a node from a string DSN —
works without the driver; only executing members (locally or on a worker)
needs it.
Concurrency model: each chia_remote call opens a fresh connection. Defaults:
autocommit=True with explicit conn.transaction() blocks for writes,
lock_timeout = 30 s, 30 s connect timeout. Override per
node or per call via connect_opts (keys: connect_timeout_s,
lock_timeout_ms, statement_timeout_ms, application_name).
Read members run with default_transaction_read_only = on — a guard
against accidental writes, not an adversarial boundary.
Write semantics: write members are declared max_retries=0 (Ray task
replay would double-apply a committed-but-unreturned non-idempotent write).
lastrowid has no meaning on postgres — use INSERT ... RETURNING id
and read ExecResult.rows:
res = get(db.execute.chia_remote(
    "INSERT INTO t (v) VALUES (%s) RETURNING id", ("x",)))
new_id = res.rows[0]["id"]
LLM tool read-only layering: PostgresNode.spawn_query_tool() connects
with default_transaction_read_only=on by default, which is fine for cooperative
LLMs. The real boundary is credentials: pass a GRANT SELECT-only
role’s DSN via spawn_query_tool(..., dsn=readonly_dsn).
- class chia.database.postgres_node.PostgresNode(dsn: str | dict, placement_group=None, require_colocated: bool = False, *, node_id: str | None = None, pin_to_current_node: bool = False, bundle_index: int = 0, reserve_bundle: dict | None = None, pg_strategy: str = 'STRICT_PACK', wait_for_pg: bool = True, pg_ready_timeout_s: float | None = None, connect_opts: dict | None = None)
Bases: DatabaseNode
Generic PostgreSQL client node over one server/database (by DSN).
All members are @staticmethod @ChiaFunction taking dsn as the explicit first argument; __init__ re-binds them so instance calls inject self.dsn (and the node’s connect_opts) automatically:
db = PostgresNode("postgresql://postgres:chia@dbhost:5432/postgres")
get(db.init_schema.chia_remote("CREATE TABLE IF NOT EXISTS t ..."))
get(db.execute.chia_remote("INSERT INTO t (v) VALUES (%s)", ("x",)))
rows = get(db.query.chia_remote("SELECT * FROM t"))
See the module docstring for the server story, paramstyle, and the read-only layering of the LLM tool.
Normalize dsn and defer to DatabaseNode.
- Parameters:
dsn – libpq conninfo string (postgresql://user:pw@host/db or host=... dbname=...) or a dict of conninfo parts (dict form requires psycopg locally).
require_colocated – defaults to False: postgres clients reach the server from any worker, so members dispatch unpinned unless a placement mode is requested.
connect_opts – connection defaults injected into every member call (keys: connect_timeout_s, lock_timeout_ms, statement_timeout_ms, application_name); per-call connect_opts= overrides.
(Remaining args as in DatabaseNode.)
- paramstyle: str = 'format'
PEP 249 paramstyle of the SQL this node’s members expect (e.g. “qmark” for sqlite, “format” for postgres).
- spawn_query_tool(name: str, *, read_write: bool = False, dsn: str | None = None, **tool_kwargs) -> PostgresQueryTool
Create a PostgresQueryTool for this node’s database.
No placement is required, since any worker can reach the server, so this works on an unpinned node (the tool actor is then scheduled wherever Ray likes). Pass dsn= to give the tool different credentials (e.g. a GRANT SELECT-only role, which is the real read-only boundary). The returned tool is tracked and stopped by close(). tool_kwargs forward to PostgresQueryTool (e.g. row_limit, cell_char_limit, total_char_limit, schemas).
- static init_schema(dsn: str, script: str, *, connect_opts: dict | None = None) -> None
Run a multi-statement DDL script in one transaction. Idempotent CREATE ... IF NOT EXISTS scripts are encouraged. (psycopg runs multi-statement strings only when no parameters are passed, which is the case here.)
- static executescript(dsn: str, script: str, *, connect_opts: dict | None = None) -> None
Alias-level sibling of init_schema() (postgres has no file/dirs to create, so the two are identical here; both exist to satisfy the DatabaseNode contract).
- static add_column_if_missing(dsn: str, table: str, column: str, type_decl: str, *, connect_opts: dict | None = None) -> bool
ALTER TABLE ... ADD COLUMN IF NOT EXISTS; True iff added. table may be schema-qualified (myschema.t; default schema public). Identifiers and type_decl are regex-validated before any SQL interpolation (and before the psycopg import, so validation errors don’t depend on the driver).
- static schema(dsn: str, *, schemas: tuple = ('public',), connect_opts: dict | None = None) -> str
Human-readable table/index listing from information_schema and pg_indexes for the given schemas. A simple rendering, not pg_dump fidelity.
- static execute(dsn: str, sql: str, params: tuple | dict = (), *, connect_opts: dict | None = None) -> ExecResult
Run one statement in its own committed transaction. When the statement returns rows (RETURNING, or a SELECT), they are attached as ExecResult.rows.
- static executemany(dsn: str, sql: str, seq_of_params: list, *, connect_opts: dict | None = None) -> ExecResult
executemany in one committed transaction; rowcount is the total across all parameter sets.
- static transaction(dsn: str, ops: list, *, connect_opts: dict | None = None) -> list[ExecResult]
Run ops atomically in one transaction.
Each op is (sql, params) where the params TYPE selects the form:
  - tuple or dict -> execute(sql, params) (single statement)
  - list -> executemany(sql, params) (bulk rows)
  - None -> execute(sql) (no params)
NOTE: single-statement params MUST be a tuple/dict; a list means executemany. Any error rolls the whole batch back and re-raises (surfacing as a Ray task error at get()). Explicit BEGIN/COMMIT inside op SQL is unsupported. Returns one ExecResult per op (rows attached for ops that return them).
- static query(dsn: str, sql: str, params: tuple | dict = (), *, limit: int | None = None, connect_opts: dict | None = None) -> list[dict]
Rows as list[dict]. limit=N caps the fetch (protects the object store from unbounded SELECTs); default None fetches all. The session runs with default_transaction_read_only = on, so an accidental write raises ReadOnlySqlTransaction.
- class chia.database.postgres_node.PostgresQueryTool(name: str, dsn: str, task_options: dict | None = None, row_limit: int = 100, cell_char_limit: int = 4096, total_char_limit: int = 32768, read_write: bool = False, schemas: tuple = ('public',))
Bases: ChiaTool
MCP tool exposing SQL over a PostgreSQL database.
query / schema connect with default_transaction_read_only=on in the session options, a weak guard against LLM-authored writes (SQL containing SET default_transaction_read_only = off flips it). For an adversarial boundary, construct the tool with a GRANT SELECT-only role’s DSN. Errors are caught and returned as text; row and byte caps keep the tool result bounded even for an unfiltered SELECT *. With read_write=True an execute tool is additionally registered for single-statement writes.
This class is deliberately self-contained (lazy psycopg imports, no module-level helpers) so it can be copied verbatim into a driver’s __main__ as a by-value-pickling escape hatch for worker images whose chia install predates this module.
Initializes ChiaTool with a name and optional resource requirements.
- query(sql: str) -> str
Execute sql against the database in a read-only session.
Returns a pipe-delimited markdown-ish table with row/cell/byte caps and a trailing NOTE: line when a cap fires.
Sqlite Node
chia.database.sqlite_node — generic colocated SQLite store for chia flows.
SQLiteNode is a DatabaseNode (and thus
a ColocatedNode) whose member
@ChiaFunctions all operate on one SQLite database file. Because
every member is pinned to the same machine, they all see the same file —
that colocation guarantee is what makes a plain on-disk SQLite database a
safe shared store for a distributed chia flow. SQL placeholders are
sqlite’s paramstyle = "qmark" (? positional, :name named); see
PostgresNode for the client-server
sibling.
Placement modes (decided once at construction):
- default / placement_group=...: standard ColocatedNode pinning; a fresh DB lands wherever the bundle does and all members co-locate there.
- node_id=... / pin_to_current_node=True: hard NodeAffinity pin, for a DB file that already lives on a known machine (e.g. the head node’s local disk, as in gem5align’s alignment.db).
Concurrency model: each chia_remote call may run in a different worker
process on the pinned machine, so every member opens a fresh connection.
Defaults: WAL journal mode (readers never block the writer), 30 s busy
timeout, BEGIN IMMEDIATE write transactions (concurrent writers queue on
the lock instead of failing mid-transaction), synchronous=NORMAL,
foreign_keys=ON. Override per node or per call via connect_opts
(keys: busy_timeout_s, wal, synchronous, foreign_keys).
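The connection defaults above map onto standard sqlite3 settings. The sketch below is one plausible way to apply them with the stdlib driver; connect_with_defaults is a hypothetical helper (the module's own connection code is not shown in this reference), and the keyword names simply reuse the documented connect_opts keys.

```python
import os
import sqlite3
import tempfile


def connect_with_defaults(db_path: str, *, busy_timeout_s: float = 30.0,
                          wal: bool = True, synchronous: str = "NORMAL",
                          foreign_keys: bool = True) -> sqlite3.Connection:
    """Open a fresh connection and apply the documented defaults."""
    # isolation_level=None leaves BEGIN/COMMIT to the caller,
    # which is what an explicit BEGIN IMMEDIATE needs.
    conn = sqlite3.connect(db_path, timeout=busy_timeout_s, isolation_level=None)
    if wal:
        conn.execute("PRAGMA journal_mode=WAL")
    # PRAGMA values cannot be bound as parameters; these are trusted options.
    conn.execute(f"PRAGMA synchronous={synchronous}")
    conn.execute(f"PRAGMA foreign_keys={'ON' if foreign_keys else 'OFF'}")
    return conn


# Use a file on local disk: WAL does not apply to in-memory databases.
path = os.path.join(tempfile.mkdtemp(), "state.db")
conn = connect_with_defaults(path)
journal_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
fk_enabled = conn.execute("PRAGMA foreign_keys").fetchone()[0]
sync_level = conn.execute("PRAGMA synchronous").fetchone()[0]
conn.close()
```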
Warning
Never point db_path at NFS or other shared/network storage. WAL’s
shared-memory coordination only works between processes on one machine
with a local filesystem — which is exactly what the colocation guarantee
provides. On network filesystems WAL can corrupt the database.
Data guidance: rows travel through the Ray object store as list[dict]
(BLOB columns round-trip as bytes). Store large artifacts as files and
put paths in the DB; multi-MB blobs inflate the object store and every
get().
Write semantics: write members are declared with max_retries=0 — Ray’s
task replay after a worker death would double-apply a committed-but-
unreturned non-idempotent write. Callers that know a write is idempotent
can opt back in via node.execute.options(max_retries=...).
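To see why replay is dangerous for a non-idempotent write, the sketch below simulates a task that commits and then dies before returning. The retry loop is a plain-Python stand-in for automatic task replay (it is not Ray code), and all names are hypothetical.

```python
import sqlite3


def flaky_increment(conn: sqlite3.Connection, fail: bool) -> None:
    """Commit a non-idempotent write, then optionally die before returning."""
    conn.execute("UPDATE counter SET n = n + 1")
    conn.commit()
    if fail:
        raise ConnectionError("worker died after commit")


def run_with_retries(conn: sqlite3.Connection, max_retries: int) -> None:
    """Replay the task up to max_retries times after a failure."""
    for attempt in range(max_retries + 1):
        try:
            # Only the first attempt fails, and only after its commit.
            flaky_increment(conn, fail=(attempt == 0))
            return
        except ConnectionError:
            if attempt == max_retries:
                raise


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE counter (n INTEGER)")
conn.execute("INSERT INTO counter VALUES (0)")
conn.commit()

run_with_retries(conn, max_retries=1)
n_after_replay = conn.execute("SELECT n FROM counter").fetchone()[0]
```

The caller asked for one increment, but n_after_replay is 2 because the first attempt had already committed. With max_retries=0 the failure surfaces to the caller instead, who can check the database state before deciding what to do.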
Example — gem5align’s AlignmentDB expressed on this node:
db = SQLiteNode("/abs/path/alignment.db", pin_to_current_node=True)
get(db.init_schema.chia_remote(ITERATIONS_SCHEMA + BENCH_SCHEMA))
get(db.add_column_if_missing.chia_remote("iterations", "base_rev", "TEXT"))
tool = db.spawn_query_tool("align_db")  # read-only SQL for LLMs
# insert_iteration: INSERT + DELETE + bulk INSERT, atomically
get(db.transaction.chia_remote([
    ("INSERT OR REPLACE INTO iterations (...) VALUES (?, ...)", iter_row),
    ("DELETE FROM benchmark_results WHERE entry_id = ?", (entry_id,)),
    ("INSERT INTO benchmark_results (...) VALUES (?, ...)", bench_rows),
]))
max_iter = get(db.query_value.chia_remote(
    "SELECT MAX(iteration) FROM iterations", default=-1))
Composite reads with Python-side logic (an AlignmentDB load_entry /
best_per_benchmark) belong in a domain subclass — append them to
_MEMBER_FNS and the binding machinery picks them up:
class AlignmentDBNode(SQLiteNode):
    _MEMBER_FNS = SQLiteNode._MEMBER_FNS + ("load_entry",)

    @staticmethod
    @ChiaFunction(num_cpus=0.1)
    def load_entry(db_path: str, entry_id: str, *,
                   connect_opts: dict | None = None) -> dict | None:
        conn = _connect(db_path, read_only=True, connect_opts=connect_opts)
        ...
- chia.database.sqlite_node.SQLiteExecResult
Back-compat alias — the original name for the shared result dataclass.
- class chia.database.sqlite_node.SQLiteNode(db_path: str | PathLike, placement_group=None, require_colocated: bool = True, *, node_id: str | None = None, pin_to_current_node: bool = False, bundle_index: int = 0, reserve_bundle: dict | None = None, pg_strategy: str = 'STRICT_PACK', wait_for_pg: bool = True, pg_ready_timeout_s: float | None = None, connect_opts: dict | None = None)
Bases: DatabaseNode
Colocated node exposing generic SQLite operations over one DB file.
All members are @staticmethod @ChiaFunction taking db_path as the explicit first argument; __init__ re-binds them so instance calls inject self.db_path (and the node’s connect_opts) automatically:
with SQLiteNode("/abs/path/state.db") as db:  # reserves a PG
    get(db.init_schema.chia_remote("CREATE TABLE IF NOT EXISTS t ..."))
    get(db.execute.chia_remote("INSERT INTO t VALUES (?)", (1,)))
    rows = get(db.query.chia_remote("SELECT * FROM t"))
See the module docstring for placement modes, concurrency model, and the AlignmentDB mapping example.
Validate db_path and defer to DatabaseNode.
- Parameters:
db_path – absolute path of the database file on the target machine (a relative path would resolve against a Ray worker’s cwd). Created on first write if absent.
connect_opts – connection defaults injected into every member call (keys: busy_timeout_s, wal, synchronous, foreign_keys); per-call connect_opts= overrides.
(Remaining args as in DatabaseNode.)
- paramstyle: str = 'qmark'
PEP 249 paramstyle of the SQL this node’s members expect (e.g. “qmark” for sqlite, “format” for postgres).
- spawn_query_tool(name: str, *, read_write: bool = False, **tool_kwargs) -> SQLiteQueryTool
Create a SQLiteQueryTool co-located with this node.
Exposes read-only SQL (and, with read_write=True, single-statement writes) over MCP, pinned to the same machine as the members so the tool reads the same DB file. The returned tool is tracked and stopped by close(). tool_kwargs forward to SQLiteQueryTool (e.g. row_limit, cell_char_limit, total_char_limit).
- static init_schema(db_path: str, script: str, *, connect_opts: dict | None = None) -> None
Create parent dirs and the DB file, then run script via executescript. Idempotent CREATE TABLE IF NOT EXISTS-style scripts are encouraged; executescript semantics apply (the script may contain its own BEGIN/COMMIT).
- static executescript(db_path: str, script: str, *, connect_opts: dict | None = None) -> None
Run a multi-statement SQL script (no params, a sqlite3 limitation). Like init_schema() without the create-dirs step.
- static add_column_if_missing(db_path: str, table: str, column: str, type_decl: str, *, connect_opts: dict | None = None) -> bool
ALTER TABLE ADD COLUMN iff the column is absent; True iff added. table / column must be plain identifiers and type_decl a benign type expression, because they are interpolated into the SQL (identifiers cannot be parameters).
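A minimal sketch of the validate-then-interpolate pattern described above, using stdlib sqlite3 on an open connection. The regexes and the use of PRAGMA table_info are assumptions chosen for illustration; the member's actual validation rules are not shown in this reference.

```python
import re
import sqlite3

# Assumed patterns: plain identifiers, and a conservative type expression.
_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")
_TYPE = re.compile(r"[A-Za-z0-9_ (),]+")


def add_column_if_missing(conn: sqlite3.Connection, table: str,
                          column: str, type_decl: str) -> bool:
    """Add the column only when absent; return True iff it was added."""
    # Validate before any string interpolation: identifiers cannot be
    # bound as SQL parameters.
    if not (_IDENT.fullmatch(table) and _IDENT.fullmatch(column)
            and _TYPE.fullmatch(type_decl)):
        raise ValueError("unsafe identifier or type declaration")
    # Column 1 of each PRAGMA table_info row is the column name.
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    if column in existing:
        return False
    conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {type_decl}")
    conn.commit()
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE iterations (iteration INTEGER)")
first = add_column_if_missing(conn, "iterations", "base_rev", "TEXT")
second = add_column_if_missing(conn, "iterations", "base_rev", "TEXT")
```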
- static schema(db_path: str, *, connect_opts: dict | None = None) -> str
CREATE TABLE / CREATE INDEX statements from sqlite_master.
- static execute(db_path: str, sql: str, params: tuple | dict = (), *, connect_opts: dict | None = None) -> ExecResult
Run one statement in its own committed transaction.
- static executemany(db_path: str, sql: str, seq_of_params: list, *, connect_opts: dict | None = None) -> ExecResult
executemany in one committed transaction; rowcount is the total across all parameter sets.
- static transaction(db_path: str, ops: list, *, connect_opts: dict | None = None) -> list[ExecResult]
Run ops atomically: BEGIN IMMEDIATE; each op in order; COMMIT.
Each op is (sql, params) where the params TYPE selects the form:
  - tuple or dict -> execute(sql, params) (single statement)
  - list -> executemany(sql, params) (bulk rows)
  - None -> execute(sql) (no params)
NOTE: single-statement params MUST be a tuple/dict; a list means executemany. Any sqlite error rolls the whole batch back and re-raises (surfacing as a Ray task error at get()). Explicit BEGIN/COMMIT inside op SQL is unsupported. Returns one SQLiteExecResult per op.
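The params-type dispatch and the all-or-nothing behavior can be sketched directly on a stdlib sqlite3 connection. run_transaction is a hypothetical stand-in that returns plain rowcounts rather than result objects; it illustrates the documented semantics, not the member's actual code.

```python
from __future__ import annotations

import sqlite3


def run_transaction(conn: sqlite3.Connection, ops: list) -> list[int]:
    """Run ops in one BEGIN IMMEDIATE transaction; return each op's rowcount."""
    rowcounts = []
    conn.execute("BEGIN IMMEDIATE")
    try:
        for sql, params in ops:
            if params is None:                       # bare execute
                cur = conn.execute(sql)
            elif isinstance(params, list):           # list means executemany
                cur = conn.executemany(sql, params)
            elif isinstance(params, (tuple, dict)):  # single statement
                cur = conn.execute(sql, params)
            else:
                raise TypeError(f"unsupported params type: {type(params).__name__}")
            rowcounts.append(cur.rowcount)
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")                     # undo the whole batch
        raise
    return rowcounts


# isolation_level=None: the connection never issues BEGIN on its own.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT NOT NULL)")
counts = run_transaction(conn, [
    ("INSERT INTO t (v) VALUES (?)", ("a",)),
    ("INSERT INTO t (v) VALUES (?)", [("b",), ("c",)]),
    ("DELETE FROM t WHERE v = 'a'", None),
])
```

Passing a list where a single parameter tuple was intended would route the op to executemany, which is why the NOTE above insists on tuple/dict for single statements.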
- static query(db_path: str, sql: str, params: tuple | dict = (), *, limit: int | None = None, connect_opts: dict | None = None) -> list[dict]
Rows as list[dict]. limit=N caps the fetch (protects the object store from unbounded SELECTs); default None fetches all. The connection is opened with PRAGMA query_only=ON, so an accidental write raises OperationalError.
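The read-only guard and the fetch cap can both be reproduced with stdlib sqlite3. The query function below is a hypothetical stand-in operating on an already open connection; it assumes the cap is applied with fetchmany, which the reference does not confirm.

```python
import sqlite3


def query(conn: sqlite3.Connection, sql: str, params=(), *, limit=None) -> list:
    """Return rows as dicts; limit=N fetches at most N rows."""
    cur = conn.execute(sql, params)
    names = [d[0] for d in cur.description]
    fetched = cur.fetchall() if limit is None else cur.fetchmany(limit)
    return [dict(zip(names, row)) for row in fetched]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (v TEXT)")
conn.executemany("INSERT INTO t VALUES (?)", [("a",), ("b",), ("c",)])
conn.commit()

# From here on, any write through this connection raises.
conn.execute("PRAGMA query_only=ON")

capped = query(conn, "SELECT v FROM t ORDER BY v", limit=2)

try:
    conn.execute("INSERT INTO t VALUES ('d')")
    write_error = None
except sqlite3.OperationalError as exc:
    write_error = exc
```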
- static query_one(db_path: str, sql: str, params: tuple | dict = (), *, connect_opts: dict | None = None) -> dict | None
First row as a dict, or None.
- static query_value(db_path: str, sql: str, params: tuple | dict = (), *, default: Any = None, connect_opts: dict | None = None) -> Any
First column of the first row, or default when there is no row (e.g. query_value("SELECT MAX(iteration) FROM t", default=-1)). Note that an aggregate over an empty table yields a NULL row, returned as None, not default.
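The difference between "no row" and "a row holding NULL" is easy to check with stdlib sqlite3. The query_value function below is a hypothetical stand-in with the documented semantics; the COALESCE variant is one common way to get a fallback for an empty-table aggregate.

```python
import sqlite3


def query_value(conn: sqlite3.Connection, sql: str, params=(), *, default=None):
    """First column of the first row, or default when no row comes back."""
    row = conn.execute(sql, params).fetchone()
    return default if row is None else row[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE iterations (iteration INTEGER)")

# No row at all: default applies.
no_row = query_value(conn, "SELECT iteration FROM iterations LIMIT 1", default=-1)
# Aggregate over an empty table: one row containing NULL, so default is ignored.
null_row = query_value(conn, "SELECT MAX(iteration) FROM iterations", default=-1)
# Pushing the fallback into SQL covers the aggregate case.
coalesced = query_value(conn, "SELECT COALESCE(MAX(iteration), -1) FROM iterations")
```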
- class chia.database.sqlite_node.SQLiteQueryTool(name: str, db_path: str, task_options: dict | None = None, row_limit: int = 100, cell_char_limit: int = 4096, total_char_limit: int = 32768, read_write: bool = False)
Bases: ChiaTool
MCP tool exposing SQL over a SQLite DB file (generalized from gem5align’s AlignmentDbQueryTool).
query / schema open sqlite3.connect(..., uri=True, mode=ro), the hard safety boundary against LLM-authored SQL: any INSERT / UPDATE / DELETE / DDL raises OperationalError, which is caught and returned as text. Row and byte caps keep the tool result bounded even when the LLM writes an unfiltered SELECT *. With read_write=True an execute tool is additionally registered for single-statement writes.
WAL caveat: a mode=ro open can fail if the -shm sidecar needs recovery; the error string is returned to the LLM. A co-resident writer (the owning SQLiteNode) normally keeps the sidecars live.
This class is deliberately self-contained (stdlib sqlite3 only, no module-level helpers) so it can be copied verbatim into a driver’s __main__ as a by-value-pickling escape hatch for worker images whose chia install predates this module (see the module docstring).
Initializes ChiaTool with a name and optional resource requirements.
- query(sql: str) -> str
Execute sql read-only against the database.
Returns a pipe-delimited markdown-ish table. Rows capped at self.row_limit; cells truncated to self.cell_char_limit chars with a … marker; total result capped at self.total_char_limit bytes. A trailing NOTE: line flags any cap that fired so the caller can narrow the query.
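The three caps and the trailing NOTE: line can be sketched as a pure formatting function. format_rows is hypothetical, the exact table layout and note wording are assumptions, and for simplicity the total cap counts characters rather than bytes.

```python
from __future__ import annotations


def format_rows(rows: list[dict], *, row_limit: int = 100,
                cell_char_limit: int = 4096,
                total_char_limit: int = 32768) -> str:
    """Render rows as a pipe-delimited table with row, cell, and total caps."""
    if not rows:
        return "(no rows)"
    notes = []
    shown = rows[:row_limit]
    if len(rows) > row_limit:
        notes.append(f"row cap hit, showing {row_limit} of {len(rows)} fetched rows")

    columns = list(shown[0].keys())
    lines = [" | ".join(columns)]
    cell_truncated = False
    for row in shown:
        cells = []
        for col in columns:
            text = str(row[col])
            if len(text) > cell_char_limit:
                text = text[:cell_char_limit] + "…"   # mark the cut
                cell_truncated = True
            cells.append(text)
        lines.append(" | ".join(cells))
    if cell_truncated:
        notes.append(f"some cells truncated to {cell_char_limit} chars")

    body = "\n".join(lines)
    if len(body) > total_char_limit:
        body = body[:total_char_limit]
        notes.append(f"output truncated to {total_char_limit} chars")
    if notes:
        # One trailing line tells the caller which caps fired.
        body += "\nNOTE: " + "; ".join(notes)
    return body


sample = [{"id": i, "v": "x" * 10} for i in range(5)]
table = format_rows(sample, row_limit=2, cell_char_limit=4)
```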