Base Runtime
API reference for chia.base. These pages are generated from the docstrings in the source, so they stay in sync with the code.
Bypass
General-purpose bypass for ChiaFunction tasks.
When bypass is active, a function is still dispatched through Ray (testing cluster scheduling, placement groups, resource allocation) but the real computation is replaced with pre-recorded data.
How it works
The bypass check happens on the caller side (in chia_remote()).
When a function is bypassed:
1. The caller computes the mock data using the registered provider.
2. Instead of dispatching the real function, it dispatches a trivial _bypass_return(data) function with the same scheduling options (placement group, resources, etc.).
3. The worker receives the pre-computed data and just returns it.
Usage
from chia.base.bypass import Bypass
# Create a bypass instance — always, as part of loop setup.
# If yaml_path is None, bypass does nothing (all functions run normally).
bypass = Bypass(yaml_path=args.bypass_config)
# Register project-specific providers that build the correct dataclasses.
bypass.set_provider("my_function", my_provider_fn)
# Optionally gate the bypass on a runtime condition (default: always on).
bypass.set_cond("my_function", my_cond_fn)
# Done — the ChiaFunction trampoline checks the active bypass automatically.
YAML format
bypass:
  func_name:
    bypass: true
    data: /path/to/file.json  # optional
  # shorthand (bypass, no data):
  simple_function: true
  # not listed = not bypassed (default, no error)
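The two entry forms above (full mapping and boolean shorthand) can be normalized into one shape after parsing. The following is a minimal sketch, not the library's loader: `normalize_bypass_config` is a hypothetical helper, and the `parsed` dict stands in for whatever a YAML parser would return for the file shown above.

```python
from typing import Any, Dict, Optional


def normalize_bypass_config(parsed: Dict[str, Any]) -> Dict[str, Dict[str, Optional[Any]]]:
    """Expand shorthand entries (`name: true`) into the full mapping form."""
    normalized: Dict[str, Dict[str, Optional[Any]]] = {}
    for func_name, entry in (parsed.get("bypass") or {}).items():
        if isinstance(entry, dict):
            normalized[func_name] = {
                "bypass": bool(entry.get("bypass", False)),
                "data": entry.get("data"),
            }
        else:
            # Shorthand: a bare boolean means "bypass, no data".
            normalized[func_name] = {"bypass": bool(entry), "data": None}
    return normalized


# Stand-in for the parsed YAML shown above (assumed parser output).
parsed = {
    "bypass": {
        "func_name": {"bypass": True, "data": "/path/to/file.json"},
        "simple_function": True,
    }
}
config = normalize_bypass_config(parsed)
```

Functions that do not appear in `config` are simply not bypassed, which matches the "not listed" default described above.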
Provider signature
The provider runs on the worker (same scheduling as the real function):
def my_provider(tag, data_path, *args, **kwargs) -> ReturnType
tag: the _chia_tag from the caller (str or None)
data_path: the YAML data field (str or None)
*args, **kwargs: the original function arguments
Resolution when a function is bypassed:
1. Provider registered -> provider(tag, data_path, *args, **kwargs)
2. Only data_path -> returns file contents (default file provider)
3. Neither -> error
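The three-way resolution above can be sketched in plain Python. This is an illustration only: `resolve_provider` and `default_file_provider` are hypothetical stand-ins, and returning the file as raw text is an assumption about what "file contents" means.

```python
from typing import Any, Callable, Dict, Optional, Tuple


def default_file_provider(tag: Optional[str], data_path: str, *args: Any, **kwargs: Any) -> str:
    """Stand-in default provider: return the raw text of the data file."""
    with open(data_path, "r", encoding="utf-8") as handle:
        return handle.read()


def resolve_provider(
    func_name: str,
    providers: Dict[str, Callable[..., Any]],
    data_paths: Dict[str, Optional[str]],
) -> Tuple[Callable[..., Any], Optional[str]]:
    """Pick what to dispatch for a bypassed function, in the documented order."""
    data_path = data_paths.get(func_name)
    if func_name in providers:
        # 1. A registered provider always wins; data_path may be None.
        return providers[func_name], data_path
    if data_path is not None:
        # 2. Only a data path: fall back to the default file provider.
        return default_file_provider, data_path
    # 3. Neither a provider nor a data path: this is an error.
    raise KeyError(f"no provider or data path for {func_name!r}")


def my_provider(tag, data_path, *args, **kwargs):
    return {"tag": tag, "source": data_path}


providers = {"my_function": my_provider}
data_paths = {"my_function": None, "replayed": "/path/to/file.json"}
```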
Condition gate
A function may also register a condition via set_cond. It runs on the
caller as the last gate in is_bypassed — after the bypass flag, the
provider/data check, and the tag patterns — and decides whether the bypass
actually happens:
def my_cond(tag, data_path, *args, **kwargs) -> bool
A falsy return means the call runs for real instead of being bypassed. With no
condition registered the default is to bypass (True). Useful to gate on
runtime state, e.g. only replay from the cache when the value is present.
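The order of the gates can be sketched as a pure function. This is a simplified model, not the real is_bypassed: `is_bypassed_sketch` and the `entry` dict layout are hypothetical, and matching tags with `re.fullmatch` is an assumption borrowed from the cache documentation on this page.

```python
import re
from typing import Any, Callable, Dict, Optional


def is_bypassed_sketch(
    entry: Optional[Dict[str, Any]],
    has_provider: bool,
    tag: Optional[str] = None,
    cond: Optional[Callable[..., Any]] = None,
    *args: Any,
    **kwargs: Any,
) -> bool:
    """Apply the gates in order: flag, provider/data, tag patterns, condition."""
    if not entry or not entry.get("bypass"):
        return False                      # gate 1: bypass flag from the YAML
    data_path = entry.get("data")
    if not has_provider and data_path is None:
        return False                      # gate 2: nothing to replay from
    patterns = entry.get("tags")
    if patterns:
        if tag is None or not any(re.fullmatch(p, tag) for p in patterns):
            return False                  # gate 3: tag patterns (if configured)
    if cond is None:
        return True                       # no condition registered: bypass
    return bool(cond(tag, data_path, *args, **kwargs))  # gate 4: last gate


cond_calls = []


def never_bypass(tag, data_path, *args, **kwargs):
    cond_calls.append(tag)  # record that the condition actually ran
    return False


entry = {"bypass": True, "data": None, "tags": ["iter.*"]}
```

Because the condition is the last gate, a call whose tag does not match never reaches it, so an expensive condition is not evaluated for calls that would not be bypassed anyway.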
- chia.base.bypass.get_active_bypass()[source]
Return the active Bypass instance, or None if bypass is not in use.
- class chia.base.bypass.Bypass(yaml_path: str | None = None)[source]
Bases: object
Controls which ChiaFunction tasks return pre-recorded data.
Create one instance per loop run, always, as part of setup:
bypass = Bypass(yaml_path=args.bypass_config)
If yaml_path is None, bypass does nothing: is_bypassed() returns False for all functions. No errors, no special handling.
If yaml_path is a path, the YAML is parsed and functions listed with bypass: true will be bypassed (if they also have a provider or data path).
The constructor registers itself as the active bypass instance. The ChiaFunction trampoline checks this automatically.
- file_server()[source]
Return the bypass file server actor handle, creating it if needed.
Custom providers can use this to read arbitrary files from the node that owns this Bypass:
def my_provider(tag, data_path, *args, **kwargs):
    server = get_active_bypass().file_server()
    text = ray.get(server.get_text.remote(data_path))
    ...
- set_provider(func_name: str, provider: Callable) None[source]
Register a provider for func_name.
The provider runs on the worker (not the head node) and replaces the real function when it is bypassed. It is called as:
provider(tag, data_path, *args, **kwargs) -> return_value
tag: the _chia_tag from the caller (str or None)
data_path: the YAML data field for this function (str or None)
*args, **kwargs: the original function arguments, passed through
- set_cond(func_name: str, cond: Callable) None[source]
Register a bypass condition for func_name.
The condition is an additional gate, evaluated on the caller as the last step of is_bypassed(), after the bypass flag, the provider/data check, and the tag patterns. It is called as:
cond(tag, data_path, *args, **kwargs) -> bool
with the same arguments as the provider (set_provider()). If it returns a falsy value the call is not bypassed and runs for real; if it returns truthy the bypass proceeds. When no condition is registered for a function the default is True (no extra gate).
Use it to make the bypass decision depend on runtime state, e.g. only replay from the cache when the value is actually present:
def cache_hit(tag, data_path, *args, **kwargs):
    return ray.get(get_active_cache().has.remote(tag))

bypass.set_cond("run_verilator_test", cache_hit)
- property active: bool
True if any functions are configured for bypass.
- is_bypassed(func_name: str, tag: str | None = None, *args, **kwargs) bool[source]
Should func_name be bypassed on this call?
- Returns True only when:
1. bypass[name] is True (from YAML)
2. There is a provider or a data_path
3. (if a tags list is configured) tag matches a pattern
4. (if a condition is registered) the condition returns truthy
The tag parameter identifies a specific call (e.g. “iter0_opt2”). If the YAML has a tags list for this function, only calls whose tag matches a pattern are bypassed. If there is no tags list, all calls are bypassed regardless of tag.
The optional *args, **kwargs are the original call arguments; they are forwarded to a registered condition (see set_cond()). The condition is the last gate: it runs only after tag matching has already passed, and a falsy return means the call runs for real. With no condition registered the default is to bypass (True). A condition may invoke remote calls (e.g. a cache lookup), so callers that use is_bypassed() as a cheap predicate should be aware it can do work when a condition is registered.
Functions not listed in the YAML return False (not bypassed). If no YAML was loaded, always returns False.
- get_provider_info(func_name: str) tuple[Callable, str | None][source]
Return (provider, data_path) for dispatching on the worker.
- Resolution:
1. Provider registered -> (provider, data_path_or_None)
2. Only data_path (no provider) -> (_default_file_provider, data_path)
3. Neither -> KeyError
The provider is executed on the worker, not here. This method only looks up what to dispatch.
- get_state() dict[source]
Return a serializable dict of the bypass state, including providers.
Providers and conditions are serialized via cloudpickle (Ray’s default). This works for module-level functions and most common cases. Lambdas/closures capturing unpicklable objects will fail with a clear error.
The file server actor handle is included so workers can read files from the node that owns this Bypass.
- load_state(state: dict) None[source]
Restore bypass state from a dict (from get_state()).
Cache
Head-node output cache for ChiaFunction tasks.
A small key/value store, pinned to the head node as a Ray actor, that pickles
arbitrary Python objects keyed by a string tag, with an LRU byte budget and a
warm-start scan from disk. It has two halves that work together with the
chia.base.bypass mechanism:
Write is automatic. A function marked cache: true in the YAML has its real-run output written to the cache automatically by ChiaFunction (via an ObjectRefCallback that fires when get() resolves the result), keyed by the call’s _chia_tag. No user code is needed.
Read is manual, via bypass. The cache does not auto-serve. To replay a cached value, register a bypass provider that reads it back:
def cache_provider(tag, data_path, *args, **kwargs):
    hit, value = ray.get(get_active_cache().read.remote(tag))
    if not hit:
        raise KeyError(f"cache miss for tag {tag!r}")
    return value

bypass.set_provider("run_verilator_test", cache_provider)
Cache = write-through populate; bypass = read path. They share the tag.
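To make the "LRU byte budget" idea concrete, here is a minimal in-process sketch. It is not the actor's implementation: `LruByteCache` is a hypothetical class that keeps pickled blobs in memory, whereas the real cache is a Ray actor backed by files on disk. Only the method names read and has are taken from the calls shown on this page.

```python
import pickle
from collections import OrderedDict
from typing import Any, Tuple


class LruByteCache:
    """Tag-keyed store of pickled values with a least-recently-used byte budget."""

    def __init__(self, budget_bytes: int) -> None:
        self.budget_bytes = budget_bytes
        self._blobs: "OrderedDict[str, bytes]" = OrderedDict()
        self._used = 0

    def write(self, tag: str, value: Any) -> None:
        blob = pickle.dumps(value)
        if tag in self._blobs:
            self._used -= len(self._blobs.pop(tag))
        self._blobs[tag] = blob
        self._used += len(blob)
        # Evict least recently used entries until the budget is respected.
        # A single blob larger than the budget ends up evicting itself.
        while self._used > self.budget_bytes and self._blobs:
            _, evicted = self._blobs.popitem(last=False)
            self._used -= len(evicted)

    def has(self, tag: str) -> bool:
        return tag in self._blobs

    def read(self, tag: str) -> Tuple[bool, Any]:
        blob = self._blobs.get(tag)
        if blob is None:
            return False, None
        self._blobs.move_to_end(tag)  # a read marks the entry as recently used
        return True, pickle.loads(blob)


# Budget for exactly two equally sized payloads.
payload_size = len(pickle.dumps(b"x" * 100))
cache = LruByteCache(budget_bytes=2 * payload_size)
cache.write("iter0", b"a" * 100)
cache.write("iter1", b"b" * 100)
cache.read("iter0")               # iter0 becomes the most recently used
cache.write("iter2", b"c" * 100)  # over budget: iter1 is evicted
```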
Usage
from chia.base.cache import start_cache
# Call once on the driver after ray.init(). Idempotent.
start_cache(size=4, units="GB", cache_dir_path="/data/chia_cache",
yaml_path=args.bypass_config)
YAML format (same file as bypass, parallel cache: section)
cache:
  run_verilator_test:
    cache: true
    tags: ["iter.*"]  # optional; mirror bypass tag patterns
  # shorthand (cache, no tags):
  build_megaboom: true
- chia.base.cache.start_cache(size: float, cache_dir_path: str, units: str = 'B', yaml_path: str | None = None, namespace: str | None = None)[source]
Create (or find) the head-pinned cache actor. Idempotent.
Call from the driver after ray.init(). Mirrors chia.trace.profiler.start_collector().
The actor is created detached so it is reachable from workers in a different Ray job (e.g. a bypass provider running on a remote worker); a plain named actor is only visible within its creating job. Pair it with an explicit namespace so cross-job lookups resolve it. stop_cache still tears it down; cross-run reuse remains the on-disk pickles + warm start, not a lingering live actor.
- Parameters:
size – Numeric budget; multiplied by UNITS[units] to get bytes.
cache_dir_path – Directory for the pickle files (on the head node).
units – One of UNITS (default "B").
yaml_path – Optional path to the bypass/cache YAML; the cache: section opts functions into automatic caching.
namespace – Optional Ray namespace for the named actor.
- Returns:
The Cache actor handle.
- chia.base.cache.stop_cache(namespace: str | None = None) None[source]
Kill the cache actor. Idempotent.
Cross-run reuse comes from the on-disk pickles + warm-start scan, not from a persisted live actor, so killing the actor loses nothing on disk.
- chia.base.cache.get_active_cache(namespace: str | None = None)[source]
Return the cache actor handle, or None if no cache was started.
Always resolves the current named actor via ray.get_actor; no process-local handle is cached. A worker (e.g. inside a bypass provider or a nested dispatch) reaches the cache with no state threading, and a cache that was restarted/replaced is picked up automatically rather than serving a stale, dead handle.
- chia.base.cache.is_cached(func_name: str, tag: str | None = None) bool[source]
Should func_name’s output be cached on this call?
True only when the function is configured cache: true and, if tags: patterns are present, the tag matches one of them under re.fullmatch. Mirrors chia.base.bypass.Bypass.is_bypassed(). The config is read from the actor (the source of truth) so it never goes stale across a cache restart; only tagged dispatches reach this path (see _maybe_wrap_cache).
Chia Function
- class chia.base.ChiaFunction.ChiaWrapped(*args, **kwargs)[source]
Bases: Protocol[P, R]
Protocol describing the object returned by @ChiaFunction().
Includes __get__ overloads so that mypy treats instances of this protocol as descriptors. When accessed on a class (obj is None), the full ParamSpec is preserved. When accessed on an instance (bound-method position), the return type falls back to ChiaWrapped[..., R] because Python’s type system cannot express “P minus the first argument”. This avoids false positives on method calls while keeping full typing for standalone decorated functions.
- class chia.base.ChiaFunction.ChiaFunction(**kwargs)[source]
Bases: object
Decorator that creates both a local and ray.remote version of a function.
Usage:
@ChiaFunction(resources={"db1": 1.0})
def my_function(x, y):
    return x + y

# Local call:
my_function(1, 2)

# Remote call (returns ObjectRef):
ref = ChiaCallRemote(my_function, 1, 2)
result = get(ref)

# Or attribute-based:
ref = my_function.chia_remote(1, 2)
For bound methods on a class, the decorator uses a trampoline to serialize self and dispatch via ray.remote:
class MyTool:
    @ChiaFunction()
    def do_work(self, x):
        return x * 2

tool = MyTool()
ref = tool.do_work.chia_remote(tool, x=5)
Accept any keyword arguments supported by ray.remote().options().
Common options include resources, num_cpus, num_gpus, num_returns, max_retries, retry_exceptions, memory, scheduling_strategy, max_calls, runtime_env, name, namespace, lifetime, etc. See Ray documentation for the full list.
- chia.base.ChiaFunction.ChiaCallRemote(func: ~chia.base.ChiaFunction.ChiaWrapped[~P, ~chia.base.ChiaFunction.R], *args: ~typing.~P, **kwargs: ~typing.~P) ObjectRef[R][source]
Call the ray.remote version of a @ChiaFunction-decorated function.
Returns a ray.ObjectRef[R] where R is the wrapped function’s return type, so get(...) can infer the final value type.
- class chia.base.ChiaFunction.ObjectRefCallback(ref, callback: Callable[[Any], Any])[source]
Bases: object
An ObjectRef paired with a callback that get() runs on resolution.
Lets a producer attach a post-resolution hook to the ref it hands back, so callers can stay async and simply get() it (the callback runs at fetch time and its return becomes get’s result) without passing callback= themselves. ClaudeCodeLLM.prompt returns one of these (carrying _sync_transcript) when resume_session=True. .ref exposes the underlying ObjectRef for ray.wait / chia_wait().
- class chia.base.ChiaFunction.ChiaActorHandle(actor)[source]
Bases: object
Thin proxy over a Ray actor handle that adds chia_remote to each method.
Built by chia_actor(). Attribute access returns a _ChiaActorMethod, so handle.method.chia_remote(...) dispatches the call and get() resolves it, matching the @ChiaFunction surface. remote still works, so existing handle.method.remote(...) call sites are unaffected.
Note: unlike a real @ChiaFunction dispatch, chia_remote here is a plain alias for Ray’s remote: no profiling, bypass, or cache wrapping is applied; the call goes straight to the actor. Use actor to recover the raw Ray handle (e.g. for ray.kill or identity checks).
- property actor
The underlying raw Ray actor handle.
- chia.base.ChiaFunction.chia_actor(handle)[source]
Wrap a Ray actor handle so its methods accept chia_remote and resolve via get().
Lets actor calls share the dispatch surface used by @ChiaFunction:
store = chia_actor(some_actor)
n = get(store.size_bytes.chia_remote())
get(store.write.chia_remote(key, value))
chia_remote here is a plain alias for Ray’s remote; it does NOT run the profiling / bypass / cache machinery that ChiaWrapped.chia_remote does; the call goes straight to the actor. remote still works, so pre-existing call sites are unchanged. Recover the raw handle via handle.actor (e.g. for ray.kill). Idempotent: wrapping an already wrapped handle returns it unchanged.
- chia.base.ChiaFunction.get(ref: ObjectRefCallback, *, timeout: float | None = None, callback: Callable[[Any], Any] | None = None, _use_object_store: bool = False) Any[source]
- chia.base.ChiaFunction.get(ref: Sequence[ObjectRef[R]], *, timeout: float | None = None, callback: Callable[[Any], Any] | None = None, _use_object_store: bool = False) List[R]
- chia.base.ChiaFunction.get(ref: Sequence[ObjectRef[Any]], *, timeout: float | None = None, callback: Callable[[Any], Any] | None = None, _use_object_store: bool = False) List[Any]
- chia.base.ChiaFunction.get(ref: ObjectRef[R], *, timeout: float | None = None, callback: Callable[[Any], Any] | None = None, _use_object_store: bool = False) R
- chia.base.ChiaFunction.get(ref: Sequence[CompiledDAGRef], *, timeout: float | None = None, callback: Callable[[Any], Any] | None = None, _use_object_store: bool = False) List[Any]
- chia.base.ChiaFunction.get(ref: CompiledDAGRef, *, timeout: float | None = None, callback: Callable[[Any], Any] | None = None, _use_object_store: bool = False) Any
Wrapper around ray.get() for use in Chia flows.
Delegates to ray.get() and post-processes the result through the Chia profiler. Overloads mirror ray.get() so a single ObjectRef[R] returns R and a sequence of them returns List[R].
callback: when given, it is invoked with the resolved value (after the profiler unwrap) and its return value is what get returns. This keeps dispatch async (the ref is produced non-blocking by chia_remote) while running a post-resolution side effect/transform only when the value is fetched (e.g. get(ref, callback=fn)).
An ObjectRefCallback carries its own callback, so get(orc) runs it without an explicit callback=. If both are present they compose: the ref’s own callback first, then the explicit callback.
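The callback composition can be illustrated without Ray. In this sketch a `concurrent.futures.Future` stands in for an ObjectRef, and `RefWithCallback` and `get_sketch` are hypothetical stand-ins for ObjectRefCallback and get. It assumes that "compose" means the explicit callback receives the return value of the ref's own callback.

```python
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class RefWithCallback:
    """Stand-in for ObjectRefCallback: a ref plus a hook run at fetch time."""
    ref: Future
    callback: Callable[[Any], Any]


def get_sketch(ref: Any, *, timeout: Optional[float] = None,
               callback: Optional[Callable[[Any], Any]] = None) -> Any:
    own_callback = None
    if isinstance(ref, RefWithCallback):
        own_callback = ref.callback
        ref = ref.ref
    value = ref.result(timeout=timeout)   # blocks only here, at fetch time
    if own_callback is not None:
        value = own_callback(value)       # the ref's own callback runs first
    if callback is not None:
        value = callback(value)           # then the explicit callback
    return value


future: Future = Future()
future.set_result(20)
orc = RefWithCallback(future, lambda v: v + 1)
composed = get_sketch(orc, callback=lambda v: v * 2)
```

The producer decides what happens at resolution time by choosing the wrapper, while the consumer still only calls the fetch function.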
- chia.base.ChiaFunction.chia_cancel(ref, force=False)[source]
Cancel a running ChiaFunction task, killing its subprocesses first.
Looks up any subprocess PIDs spawned by the task, kills them on the correct remote nodes (using process group kill for start_new_session subprocesses), then calls ray.cancel().
Chia Wait
ray.wait wrapper with stuck-PENDING_NODE_ASSIGNMENT detection + retry.
Ray’s owner-side NormalTaskSubmitter sends
RequestWorkerLease RPCs without a client-side timeout. If the chosen
raylet is wedged but TCP-reachable (silent), the lease slot stays full
forever and downstream tasks of the same scheduling key get throttled at
the owner. chia_wait detects
the pattern (PENDING_NODE_ASSIGNMENT older than pending_timeout while
the cluster has free resources) and optionally cancels + resubmits.
Use a TrackedRef to pair an ObjectRef with the closure that can
re-dispatch it. chia_wait mirrors ray.wait’s (ready, pending)
return shape but operates on TrackedRefs.
- class chia.base.chia_wait.TrackedRef(ref: ObjectRef, submit_fn: Callable[[], ObjectRef] | None = None, label: str = '', submitted_at: float = 0.0, retries: int = 0)[source]
Bases: object
ObjectRef + resubmit closure + submission timestamp.
submit_fn must accept no arguments and produce a fresh ObjectRef equivalent to the original submission (same function, args, options). Capture call-site state via functools.partial or a closure.
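A short sketch of capturing call-site state with functools.partial follows. Everything here is a stand-in: `TrackedRefSketch` only copies the documented field names, `fake_dispatch` is a hypothetical submission function that returns a token instead of an ObjectRef, and `resubmit_in_place` models the in-place mutation performed on retry.

```python
import functools
import itertools
import time
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class TrackedRefSketch:
    ref: Any
    submit_fn: Optional[Callable[[], Any]] = None
    label: str = ""
    submitted_at: float = 0.0
    retries: int = 0


_submission_ids = itertools.count()


def fake_dispatch(design: str, seed: int) -> tuple:
    """Hypothetical dispatch: every submission yields a distinct token."""
    return (design, seed, next(_submission_ids))


def resubmit_in_place(tr: TrackedRefSketch) -> None:
    """Replace the ref with a fresh submission, keeping the same object."""
    tr.ref = tr.submit_fn()
    tr.submitted_at = time.time()
    tr.retries += 1


# The partial freezes the arguments, so submit() needs no parameters.
submit = functools.partial(fake_dispatch, "megaboom", seed=7)
tracked = TrackedRefSketch(ref=submit(), submit_fn=submit,
                           label="megaboom/7", submitted_at=time.time())
first_ref = tracked.ref
resubmit_in_place(tracked)
```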
- chia.base.chia_wait.chia_wait(tracked: list[TrackedRef], *, num_returns: int = 1, timeout: float | None = None, pending_timeout: float | None = None, retry: bool = False, max_retries: int = 1, require_demand_absent: bool = False, min_free_fraction: float = 0.5, cancel_on_stuck: bool = False, print_logs: bool = False) tuple[list[TrackedRef], list[TrackedRef]][source]
Drop-in replacement for ray.wait() that handles stuck-PENDING tasks.
Returns (ready, pending) lists of TrackedRef after at most num_returns complete or timeout seconds elapse, with the same semantics as ray.wait. Additionally, when pending_timeout is set, any pending TrackedRef whose submission age exceeds pending_timeout is classified as “stuck” iff:
1. Ray state API reports state == PENDING_NODE_ASSIGNMENT, AND
2. ray.available_resources() covers the task’s required resources (i.e. it could schedule somewhere if Ray were healthy), AND
3. for every required resource r, available[r] / cluster_total[r] >= min_free_fraction (cluster-side starvation check: distinguishes a wedged raylet, where the resource is sitting unused, from a busy cluster where tasks are merely waiting on the owner-side per-class budget), AND
4. (when require_demand_absent=True) the cluster has no pending demand reported for this task’s resource fingerprint.
- For each stuck TrackedRef:
If retry=True, tr.submit_fn is set, and tr.retries < max_retries: ray.cancel(tr.ref, force=True) (best effort), then call tr.submit_fn() for a fresh ref. Mutate tr in place (ref / submitted_at / retries++). The old ref is orphaned; its rate-limit slot may stay burnt until the wedged raylet’s TCP eventually resets.
Else: log a warning and leave tr in pending unchanged.
The returned pending list contains the same TrackedRef objects passed in (possibly with mutated ref / submitted_at after retry); callers can keep parallel bookkeeping keyed on identity.
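The stuck classification can be written as a pure predicate over values that the real function would obtain from Ray. This is a sketch under assumptions: `is_stuck` is a hypothetical helper, and the resource dictionaries are passed in directly instead of being queried from the cluster.

```python
from typing import Dict


def is_stuck(
    state: str,
    age_s: float,
    pending_timeout: float,
    required: Dict[str, float],
    available: Dict[str, float],
    cluster_total: Dict[str, float],
    min_free_fraction: float = 0.5,
    require_demand_absent: bool = False,
    demand_pending: bool = False,
) -> bool:
    """Return True only when every documented condition holds."""
    if age_s <= pending_timeout:
        return False                      # not old enough to be suspicious
    if state != "PENDING_NODE_ASSIGNMENT":
        return False
    for name, amount in required.items():
        free = available.get(name, 0.0)
        if free < amount:
            return False                  # the cluster could not run it anyway
        total = cluster_total.get(name, 0.0)
        if total <= 0 or free / total < min_free_fraction:
            return False                  # busy cluster, not a wedged raylet
    if require_demand_absent and demand_pending:
        return False
    return True


wedged = is_stuck("PENDING_NODE_ASSIGNMENT", age_s=120, pending_timeout=60,
                  required={"CPU": 1}, available={"CPU": 6}, cluster_total={"CPU": 8})
busy = is_stuck("PENDING_NODE_ASSIGNMENT", age_s=120, pending_timeout=60,
                required={"CPU": 1}, available={"CPU": 2}, cluster_total={"CPU": 8})
```

In the second call only a quarter of the CPUs are free, which is below the default min_free_fraction of 0.5, so the task counts as waiting on a busy cluster and not as stuck.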
Chia KV Store
Detached, head-node-pinned Ray actor that stores arbitrary key/value pairs for the cluster to read and write live.
Use this whenever a pipeline needs a small, operator-adjustable piece of shared state — e.g. concurrency knobs, feature flags, or rate limits — without rebuilding the detached-actor scaffolding each time.
Because the actor is lifetime="detached", its values persist across
pipeline restarts. Any Ray client on the cluster can look it up by name
and namespace:
from chia.base.chia_kv_store import get_or_create_kv_store, query_kv
handle = get_or_create_kv_store(
name="my_pipeline_knobs", namespace="my_pipeline",
num_parallel=2, stage="warmup",
)
n = query_kv(handle, "num_parallel", default=2)
An operator can adjust values live from any machine with Ray access:
python -m chia.base.chia_kv_store \
--name my_pipeline_knobs --namespace my_pipeline --set num_parallel 4
python -m chia.base.chia_kv_store \
--name my_pipeline_knobs --namespace my_pipeline --get num_parallel
- chia.base.chia_kv_store.get_or_create_kv_store(name: str, namespace: str, **defaults: Any) ActorHandle[source]
Attach to the detached actor if one already exists, otherwise create it pinned to the current Ray node.
The actor is scheduled with NodeAffinitySchedulingStrategy bound to whatever node id ray.get_runtime_context().get_node_id() returns at call time. When invoked from a pipeline driver on the head node, that pins the actor to the head.
- Parameters:
name – Ray actor name.
namespace – Ray actor namespace.
**defaults – Initial values used only when creating a fresh actor. Ignored when attaching to an existing actor.
- Returns:
A Ray ActorHandle to the ChiaKVStore.
- chia.base.chia_kv_store.query_kv(handle, key: str, default: Any = None, timeout: float = 60.0) Any[source]
Read key from the actor with a hard timeout.
Intended for hot loops that should never wedge on a broken actor: any exception (actor dead, timeout, serialization error) or a None handle falls back to default.
- Parameters:
handle – ActorHandle returned by get_or_create_kv_store(). May be None, which is treated the same as an unreachable actor.
key – Key to read.
default – Value returned on any error, timeout, or missing key.
timeout – Hard timeout in seconds for the underlying ray.get.
- Returns:
The stored value, or default on any failure.
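The fall-back contract can be sketched without a running actor. `FakeKVHandle` and `query_kv_sketch` are hypothetical stand-ins: the fake handle raises where the real one would time out or fail, and a missing key is modelled as a KeyError, which is an assumption about how the real store reports it.

```python
from typing import Any, Dict, Optional


class FakeKVHandle:
    """Hypothetical stand-in for the actor handle; read() may raise."""

    def __init__(self, values: Dict[str, Any], broken: bool = False) -> None:
        self._values = dict(values)
        self._broken = broken

    def read(self, key: str, timeout: float) -> Any:
        if self._broken:
            raise TimeoutError(f"no reply within {timeout}s")
        return self._values[key]  # raises KeyError for a missing key


def query_kv_sketch(handle: Optional[FakeKVHandle], key: str,
                    default: Any = None, timeout: float = 60.0) -> Any:
    """Never raise: any failure, or a None handle, yields the default."""
    if handle is None:
        return default
    try:
        return handle.read(key, timeout)
    except Exception:
        return default


healthy = FakeKVHandle({"num_parallel": 4})
broken = FakeKVHandle({"num_parallel": 4}, broken=True)
```

A hot loop can therefore call the query on every iteration and keep running with the default when the store is unreachable.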
Colocated
chia.base.colocated — placement-group plumbing for path-based node classes.
Any subsystem whose artifacts live on a worker’s filesystem (a gem5 checkout, a
hammer obj_dir, etc) can pin a family of @ChiaFunction members to one
placement-group bundle and guarantee they land on the same worker.
- class chia.base.colocated.PinnedChiaFn(fn, scheduling_opts: dict)[source]
Bases: object
Exposes a @ChiaFunction with .chia_remote pre-pinned to a placement-group bundle. Resource requirements are carried over unchanged by .options(); with no scheduling opts it delegates to the raw function so the caller’s own placement applies.
(Lifted from chia.simulators.gem5._PinnedChiaFn; gem5 can migrate here.)
- class chia.base.colocated.ColocatedNode(placement_group=None, require_colocated: bool = True, *, bundle_index: int = 0, reserve_bundle: dict | None = None, pg_strategy: str = 'STRICT_PACK', wait_for_pg: bool = True, pg_ready_timeout_s: float | None = None)[source]
Bases: object
Base for node classes whose member ChiaFunctions must share one worker.
Subclasses declare:
_MEMBER_FNS: names of class-level @ChiaFunction members that __init__ re-binds into per-instance PinnedChiaFn wrappers, so node.<fn>.chia_remote(...) lands on this node’s bundle while Subclass.<fn>.chia_remote(...) (the class attribute) stays unpinned.
_DEFAULT_BUNDLE: resource shape of a self-reserved bundle.
Pinning never alters a member’s resource demands; the placement group only changes where they are satisfied from (the bundle’s reservation instead of the node’s free pool). A bundle that cannot satisfy every member is therefore allowed: construction logs a warning naming the members that don’t fit (derived from each member’s own @ChiaFunction options), and dispatching one of them later raises ValueError at submission.
Placement is decided once at construction:
placement_group given -> pin members to bundle_index of it (the node will NOT release a PG it did not create).
none + require_colocated=True -> reserve a 1-bundle _DEFAULT_BUNDLE PG (owned + released by close()).
none + require_colocated=False -> no pinning; the caller schedules each .chia_remote / .options(...) call itself.
Usable as a context manager so a self-reserved PG is released on exit.
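The construction-time decision and the bundle-fit warning can be modelled in a few lines. This is a sketch, not the class's code: `PlacementDecision`, `decide_placement` and `members_that_do_not_fit` are hypothetical names, and a plain dict stands in for a reserved placement group.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class PlacementDecision:
    placement_group: Optional[Any]
    owns_placement_group: bool
    pinned: bool


def decide_placement(placement_group: Optional[Any], require_colocated: bool,
                     default_bundle: Dict[str, float]) -> PlacementDecision:
    if placement_group is not None:
        # Caller-supplied PG: pin to it, but never release it.
        return PlacementDecision(placement_group, owns_placement_group=False, pinned=True)
    if require_colocated:
        # Self-reserved one-bundle PG (a dict stands in for the real object).
        reserved = {"bundles": [dict(default_bundle)], "strategy": "STRICT_PACK"}
        return PlacementDecision(reserved, owns_placement_group=True, pinned=True)
    # No pinning: the caller schedules every call itself.
    return PlacementDecision(None, owns_placement_group=False, pinned=False)


def members_that_do_not_fit(bundle: Dict[str, float],
                            member_demands: Dict[str, Dict[str, float]]) -> List[str]:
    """Names of members whose own resource demands exceed the bundle."""
    return sorted(
        name for name, demand in member_demands.items()
        if any(bundle.get(res, 0.0) < amount for res, amount in demand.items())
    )


bundle = {"CPU": 4.0}
demands = {
    "build": {"CPU": 4.0},
    "simulate": {"CPU": 8.0},
    "report": {"CPU": 1.0, "GPU": 1.0},
}
misfits = members_that_do_not_fit(bundle, demands)
```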
Set up placement and bind the member functions.
- Parameters:
placement_group – an existing Ray PlacementGroup to schedule onto. If given, require_colocated is moot (placement is already fixed) and this node will not remove the PG on close.
require_colocated – when no PG is given, reserve one so all members co-locate. When False, leave placement to the caller.
bundle_index – which bundle of the (given or reserved) PG to pin to.
reserve_bundle – resource shape of a self-reserved bundle (default _DEFAULT_BUNDLE). A bundle too small for some members is allowed; those members just can’t dispatch through this node (a construction-time warning lists them).
pg_strategy – placement strategy for a self-reserved PG.
wait_for_pg – block on pg.ready() for a self-reserved PG so the node is usable immediately.
pg_ready_timeout_s – optional timeout for that wait.
- property placement_group
The PG members are pinned to (None when the caller handles placement).
- property owns_placement_group: bool
True iff this node reserved its PG and will release it on close().
- property task_options: dict
Scheduling opts to co-locate an actor (e.g. a ChiaTool) with this node’s bundle. Empty when the node has no placement group (require_colocated=False): there is no shared placement to inherit, so an actor given these opts would not be pinned.
Dispatch Proxy
Head-relayed task dispatch for workers on reverse-tunneled nodes.
Ray’s ownership model makes the submitting worker the owner of a task: the owner negotiates worker leases directly with the target raylet and serves the result object’s data. On a reverse-tunneled worker that breaks for any task whose demands place it on a LAN node. chia’s tunnels are hub-and-spoke (tunneled worker <-> head only; see cluster/tunnel.py), and LAN raylet/worker ports are random and may be firewalled from the tunneled machine, so the owner’s lease RPC gets “no route to host” and the task sits in PENDING_NODE_ASSIGNMENT forever even when capacity is free.
The fix: when chia_remote is called on a worker that cannot directly reach
the task’s target node — a tunneled worker dispatching to a LAN node, OR a LAN
worker dispatching to a tunneled node (local -> remote) — the dispatch is
relayed through a DispatchProxy actor pinned to the head node. The proxy —
not the originating worker — owns the inner task, so every RPC leg rides a
link that already exists. The head is the only node with a bidirectional path
to every spoke, so it is the universal relay. should_proxy decides this
per-dispatch and fail-safe: it relays unless it can prove every node the task
could land on is directly reachable (see the reachability analysis below):
tunneled worker -> proxy actor call ..... pinned head worker ports (reverse tunnel + DNAT,
the same path ProfileCollectorActor uses)
proxy <- args from tunneled worker ...... head-side -L object-manager forward
proxy -> LAN raylet lease ... plain LAN traffic
result -> tunneled worker ............... the proxy awaits the value and returns it, so
the result object lives on the head; tunneled worker
fetches it via the DNAT'd head object manager
Costs and limits:
The head relays the data — args and results each cross the network twice.
num_returns > 1 is not supported (falls back to direct dispatch, with a warning, which may hang if the task lands on a LAN node).
ObjectRefs nested inside proxied args are re-splatted as top-level args of the inner task; if such a ref is owned by a tunneled worker, the LAN executor cannot reach that owner and the inner task will hang. Pass values, not refs, into nested dispatches from tunneled worker-resident tasks.
The proxy inherits the creating job’s runtime_env, so it is per-job (chia_dispatch_proxy_<job_id>) and detached; it idles at num_cpus=0 until the cluster is torn down.
- chia.base.dispatch_proxy.should_proxy(opts: dict | None = None) bool[source]
True when this dispatch must be relayed through the head DispatchProxy.
Relays iff the task could land on a node the current worker cannot reach directly (see _directly_reachable()). This both enables LAN-origin dispatches to tunneled nodes (local -> remote) and lets provably-direct dispatches skip the single-actor relay. The head never relays.
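The fail-safe shape of this decision can be sketched as follows. This is only a model: `should_proxy_sketch` is hypothetical, and the candidate and reachable node sets are passed in as plain values, whereas the real function derives them from the task options and the cluster layout.

```python
from typing import Iterable, Optional, Set


def should_proxy_sketch(on_head: bool,
                        candidate_nodes: Optional[Iterable[str]],
                        reachable_nodes: Set[str]) -> bool:
    """Relay unless every node the task could land on is provably reachable."""
    if on_head:
        return False                      # the head never relays
    if candidate_nodes is None:
        return True                       # candidates unknown: relay to be safe
    candidates = set(candidate_nodes)
    if not candidates:
        return True                       # nothing provable: relay to be safe
    return not candidates.issubset(reachable_nodes)


lan_peers = {"lan-1", "lan-2"}
direct = should_proxy_sketch(False, ["lan-1"], lan_peers)
relayed = should_proxy_sketch(False, ["lan-1", "tunnel-9"], lan_peers)
```

The default outcome is to relay, so a gap in the reachability information costs an extra hop through the head instead of a task that hangs.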
PID Registry
PID tracking and cancellation for ChiaFunction remote tasks.
Tracks subprocess PIDs spawned during @ChiaFunction remote execution
and provides chia_cancel() to kill those process trees before
cancelling the Ray task.
The Popen hook is scoped per-task via threading.local so concurrent
Ray tasks on the same worker don’t interfere with each other.
- class chia.base.pid_registry.PidRegistryActor[source]
Bases: object
Ray actor that maps task IDs to subprocess PIDs.
Created lazily on first registration, looked up by workers and the driver via _get_registry().
- kill_all(grace: float = 25.0) int[source]
Kill all tracked subprocess PIDs across all nodes.
Dispatches remote kill tasks and waits for completion. Each kill sends SIGTERM, polls for the process to exit, and only escalates to SIGKILL if it is still alive after grace seconds. Returns the number of PIDs targeted.
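The escalation pattern (SIGTERM, poll, SIGKILL after the grace period) looks roughly like this for a single local child process. It is a sketch under assumptions: `terminate_with_grace` is a hypothetical helper that works on a `subprocess.Popen` object, while the registry works on raw PIDs across nodes.

```python
import subprocess
import sys
import time


def terminate_with_grace(proc: subprocess.Popen, grace: float = 25.0,
                         poll_interval: float = 0.05) -> str:
    """Ask the process to exit, and force-kill it only after `grace` seconds."""
    if proc.poll() is not None:
        return "already-exited"
    proc.terminate()                      # SIGTERM on POSIX
    deadline = time.monotonic() + grace
    while time.monotonic() < deadline:
        if proc.poll() is not None:
            return "terminated"
        time.sleep(poll_interval)
    proc.kill()                           # SIGKILL on POSIX
    proc.wait()
    return "killed"


# Usage: start a child that would sleep for a minute, then stop it.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
outcome = terminate_with_grace(child, grace=5.0)
```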
- chia.base.pid_registry.chia_cancel(ref: ObjectRef, force: bool = False) None[source]
Cancel a running ChiaFunction task, killing its subprocesses first.
Looks up any subprocess PIDs spawned by the task, kills them on the correct remote nodes (using process group kill for start_new_session subprocesses), then calls ray.cancel().
- Parameters:
ref – The ObjectRef returned by chia_remote().
force – Passed through to ray.cancel(). If True, the Ray worker is killed; if False (default), a TaskCancelledError is raised cooperatively.
LLM Call
- class chia.base.llm_call.QueryResult(result: str, returncode: int, stderr: str, stream_result: str, success: bool = False)[source]
Bases: object
Structured result from prompting an LLM or agent.
- Parameters:
result (str) – The final response from the LLM or agent
returncode (int) – The returncode from running the prompt
stderr (str) – The stderr output from running the prompt (CLIs only)
stream_result (str) – The full transcript of all turns of the LLM or agent
success (bool) – Whether the prompt completed successfully
- class chia.base.llm_call.LLMCallBase(system_message: str)[source]
Bases: ABC
Polymorphic base container for generic LLM and agent configuration traits and behavior. Easy to switch between different backing providers, servers, and CLIs.
- abstractmethod prompt(user_message: str, tools: List[ChiaTool] | None = []) QueryResult[source]
Send a prompt to this LLM
- Parameters:
user_message (str) – Message used to prompt the LLM
tools (Optional[List[ChiaTool]]) – Tools available to the LLM during the call
MCP tool servers
Chia Tool
- class chia.base.tools.ChiaTool.ToolInfo(name: 'str', port: 'int', node_id: 'str')[source]
Bases: object
- class chia.base.tools.ChiaTool.ChiaTool(name: str, task_options: Dict | None = None, logging_level=10)[source]
Bases: object
Base class for MCP tool servers deployed onto Ray workers.
Subclasses define a setup() method, which calls:
self.mcp.add_tool(self.method, name=...) (one or more times)
to register functions as tools with instances of this ChiaTool.
A subclass can shut down the tool server with:
self.stop()
This tells the actor to shut down uvicorn, then kills the actor. Because start and stop run in the same actor process, the uvicorn server reference is always reachable.
The resulting MCP endpoint is at:
http://{self.hostname}:{self.port}/{self.name}/mcp
Example subclass:
class BashTool(ChiaTool):
    def setup(self):
        self.mcp.add_tool(self.run_command, name=f"{self.name}_run_command")

    def run_command(self, command: str) -> str:
        ...
Alternatively, instead of a setup method a subclass can define an __init__ method which must do the following:
def __init__(self, name, task_options):
    super().__init__(name, task_options=task_options)
    # Registers fns with self.mcp.add_tool
    super().__post_init__()
Initializes ChiaTool with a name and optional resource requirements.
- setup(*args, **kwargs)[source]
Hook for the auto-constructed style: override to register tools and set instance state, instead of writing __init__.
A subclass that defines setup and no __init__ is given an __init__ automatically (see __init_subclass__()) that runs ChiaTool.__init__ before and __post_init__ after setup, so the contract with the subclass can't be written incorrectly. Inside setup the base __init__ has already run, so self.name / self.mcp are available:
class BashTool(ChiaTool):
    def setup(self, work_dir="/"):
        self.work_dir = work_dir
        self.mcp.add_tool(self.run_command, name=f"{self.name}_run_command")
Positional/keyword args from the constructor (other than name, task_options, and logging_level, which the base consumes) are forwarded here. Multi-level subclasses override setup and call super().setup(...) to chain.
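The auto-constructed style described above can be pictured with a small stand-alone sketch. This is not the ChiaTool source: ToolBase, its registered list (standing in for self.mcp), and the ready flag are hypothetical names chosen for illustration, and the real __init_subclass__ may differ in detail.

```python
class ToolBase:
    """Hypothetical stand-in for ChiaTool; only the init contract is modelled."""

    def __init__(self, name, task_options=None, logging_level=10):
        self.name = name
        self.task_options = task_options
        self.logging_level = logging_level
        self.registered = []  # stand-in for tools registered on self.mcp

    def __post_init__(self):
        self.ready = True  # stand-in for whatever finalisation the base does

    def setup(self, *args, **kwargs):
        """Default hook: nothing to register."""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Only synthesise __init__ when the subclass wrote setup() but no __init__.
        if "__init__" in cls.__dict__ or "setup" not in cls.__dict__:
            return

        def auto_init(self, name, *args, task_options=None, logging_level=10, **kw):
            # Base init first, so self.name is available inside setup().
            ToolBase.__init__(self, name, task_options=task_options,
                              logging_level=logging_level)
            # Remaining constructor arguments are forwarded to setup().
            self.setup(*args, **kw)
            ToolBase.__post_init__(self)

        cls.__init__ = auto_init


class EchoTool(ToolBase):
    def setup(self, work_dir="/"):
        self.work_dir = work_dir
        self.registered.append(f"{self.name}_run_command")
```

Constructing EchoTool("bash", work_dir="/tmp") runs the base initialiser, then setup, then the post-init step, in that order, without the subclass having to spell the sequence out.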
- chia.base.tools.ChiaTool.resolve_tool_url(url: str) str[source]
Rewrite a tool URL so it is routable from the current node.
On tunnelled EC2 workers CHIA_TOOL_ADVERTISE_HOST and CHIA_TOOL_RELAY_HOST are set. Tool URLs advertise the head's real IP (CHIA_TOOL_ADVERTISE_HOST), which is only directly reachable from the local network. EC2 workers must connect via a reverse-tunnel relay instead, so this function replaces the host portion of the URL with the relay loopback (CHIA_TOOL_RELAY_HOST).
On non-tunnelled nodes (or when the env vars are absent) the URL is returned unchanged.
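A minimal sketch of the rewrite, for illustration only. The function name resolve_tool_url_sketch and the env parameter are hypothetical, and the check that the URL's host equals the advertised host is an assumption; the actual function may rewrite the host under different conditions.

```python
import os
from urllib.parse import urlsplit, urlunsplit


def resolve_tool_url_sketch(url: str, env=os.environ) -> str:
    """Swap the advertised host for the relay host, keeping port and path."""
    advertise = env.get("CHIA_TOOL_ADVERTISE_HOST")
    relay = env.get("CHIA_TOOL_RELAY_HOST")
    if not advertise or not relay:
        return url  # non-tunnelled node: nothing to rewrite

    parts = urlsplit(url)
    if parts.hostname != advertise:
        return url  # assumption: only URLs pointing at the advertised host change

    netloc = relay if parts.port is None else f"{relay}:{parts.port}"
    return urlunsplit(parts._replace(netloc=netloc))
```

With CHIA_TOOL_ADVERTISE_HOST=10.0.0.5 and CHIA_TOOL_RELAY_HOST=127.0.0.1, the URL http://10.0.0.5:8001/bash/mcp becomes http://127.0.0.1:8001/bash/mcp; with either variable unset, the input is returned as-is.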
- chia.base.tools.ChiaTool.start_router(tool: ChiaTool, ip_address: str, base_port: int = 8000, max_tries: int = 100) int[source]
Start MCP tool servers using a local uvicorn instance.
Uses a plain uvicorn server (background thread) instead of Ray Serve so that multiple nodes can each host their own independent tool servers without route-prefix conflicts.
Tries ports starting from base_port, skipping any that are already in use (e.g. host-networked Docker containers sharing the same port space).
Returns the port that was successfully bound.
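The port scan can be sketched as follows. This covers only the "try ports from base_port, skip the ones in use" step, not the uvicorn startup; find_free_port is a hypothetical helper name, and probing by binding a throwaway socket is an assumption about how in-use ports are detected.

```python
import socket


def find_free_port(ip_address: str, base_port: int = 8000, max_tries: int = 100) -> int:
    """Return the first port in [base_port, base_port + max_tries) that can be bound."""
    for port in range(base_port, base_port + max_tries):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.bind((ip_address, port))
            except OSError:
                continue  # port already in use (or otherwise unavailable); try the next
            return port
    raise RuntimeError(
        f"no free port in {base_port}..{base_port + max_tries - 1} on {ip_address}"
    )
```

A probe like this releases the port before the server binds it, so another process can claim it in between; retrying the actual server bind on failure avoids that gap.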
Chia Tool Template
- class chia.base.tools.ChiaToolTemplate.ChiaToolTemplate(name, *args, task_options=None, logging_level=10, **kwargs)[source]
Bases:
ChiaTool
Initializes ChiaTool with a name and optional resource requirements.
Bash Tool
- class chia.base.tools.BashTool.BashTool(name: str, work_dir: str = '/', timeout_seconds: int = 120, task_options: Dict | None = None)[source]
Bases:
ChiaTool
MCP tool that executes bash commands in its deployed container.
When constructed with task_options, the tools are set up on machines using a setup function which is called remotely with those task_options.
Initializes ChiaTool with a name and optional resource requirements.
Async Bash Tool
- class chia.base.tools.AsyncBashTool.AsyncBashTool(name: str, work_dir: str = '/', env: Dict[str, str] | None = None, task_options: Dict | None = None)[source]
Bases:
AsyncJobTool
MCP bash tool for commands that may run for minutes, built on AsyncJobTool.
Like chia.base.tools.BashTool.BashTool it runs shell commands in a working directory, but a single command here may run long enough (a full build, a test suite) that a synchronous MCP call loses its response on the streamable-HTTP transport and hangs the agent. So this tool never blocks the transport for long: <name>_run starts the command in the background and waits only a short, bounded slice for it to finish; if it is still running it returns done=false and the agent polls <name>_run_status until done=true.
Fast commands therefore complete inside the single run call (they finish within the initial wait); only genuinely long commands require a poll. One command runs at a time per tool instance: start a command, then poll it to completion before starting the next (a second run while one is in flight returns started=false).
Result dict on completion: {exit_code: int, output: str} (combined stdout+stderr, tail-truncated). Mirrors BashTool's combined-stream behaviour.
work_dir and env are plain attributes and pickle normally; the job/threading primitives are dropped and recreated by AsyncJobTool.
Initializes ChiaTool with a name and optional resource requirements.
- run(command: str, wait_seconds: int = 60) dict[source]
Run a bash command in the BACKGROUND, waiting up to wait_seconds (capped at 120) for it to finish.
If it finishes in time, returns done=true with {exit_code, output}. If it is still running, returns done=false; then poll <name>_run_status until done=true. One command at a time: if a command is already running, returns started=false and you should poll status instead of starting another.
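From the caller's side, the run-then-poll protocol might look like the sketch below. The helper run_to_completion, its max_polls guard, and the assumption that the status tool accepts a wait_seconds argument are illustrative; run and run_status stand for whatever client functions invoke the <name>_run and <name>_run_status tools.

```python
from typing import Callable


def run_to_completion(
    run: Callable[[str, int], dict],
    run_status: Callable[[int], dict],
    command: str,
    wait_seconds: int = 60,
    max_polls: int = 1000,
) -> dict:
    """Start a command, then poll status until the tool reports done=true."""
    result = run(command, wait_seconds)
    if result.get("started") is False:
        # A previous command is still in flight; the caller should poll that one.
        raise RuntimeError("a command is already running; poll its status first")

    polls = 0
    while not result.get("done"):
        if polls >= max_polls:
            raise TimeoutError(f"command not done after {max_polls} polls")
        result = run_status(wait_seconds)  # each poll is itself a bounded wait
        polls += 1
    return result  # contains exit_code and output once done
```

Fast commands return from the first call with done=true, so the loop body never runs for them.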
Async Job Tool
- class chia.base.tools.AsyncJobTool.AsyncJobTool(name: str, task_options: Dict | None = None)[source]
Bases:
ChiaTool
Base for MCP tools that run a long job in the background, then poll for it.
Why this exists: a single MCP tool call that blocks for minutes holds the streamable-HTTP connection open with no traffic for the whole job. That transport can lose the response for such a long synchronous call, stranding the result and hanging the agent forever (observed on real runs). The fix is to never block the transport for long: a start call kicks the work off in a daemon thread and returns immediately, and a status call long-polls in short, bounded chunks (<=120s) until done=true.
Subclasses register their own start/status tool methods that delegate to _job_start() and _job_status():
class BuildTool(AsyncJobTool):
    def __init__(self, name, task_options=None):
        super().__init__(name, task_options=task_options)
        self.mcp.add_tool(self.build, name=f"{name}_build")
        self.mcp.add_tool(self.build_status, name=f"{name}_build_status")
        super().__post_init__()

    def build(self, targets=None) -> dict:
        return self._job_start(lambda: do_build(targets))

    def build_status(self, wait_seconds: int = 60) -> dict:
        return self._job_status(wait_seconds)
One job runs at a time per tool instance; calling _job_start while a job is in flight returns {"started": False, "running": True} rather than starting a second. The work callable must return a dict; on done=true its keys are spliced into the status result.
The job's threading primitives are not picklable, but ChiaTool serializes the tool object to the server actor (and to the LLM worker), so they are dropped in __getstate__ and recreated in __setstate__.
Initializes ChiaTool with a name and optional resource requirements.
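The start/status mechanism and the pickling workaround can be illustrated with a stand-alone sketch. BackgroundJob and its attribute names are hypothetical, the error handling is an assumption, and the real _job_start() / _job_status() may return additional keys.

```python
import threading


class BackgroundJob:
    """Hypothetical sketch: one background job at a time, polled in bounded waits."""

    def __init__(self):
        self._result = {}
        self._running = False
        self._init_primitives()

    def _init_primitives(self):
        self._lock = threading.Lock()
        self._done = threading.Event()
        self._thread = None

    def start(self, work) -> dict:
        with self._lock:
            if self._running:
                return {"started": False, "running": True}
            self._running = True
            self._result = {}
            self._done.clear()
            self._thread = threading.Thread(target=self._run, args=(work,), daemon=True)
            self._thread.start()
        return {"started": True}

    def _run(self, work):
        try:
            self._result = work()  # work must return a dict
        except Exception as exc:  # assumption: failures are reported as a result key
            self._result = {"error": repr(exc)}
        finally:
            with self._lock:
                self._running = False
            self._done.set()

    def status(self, wait_seconds: int = 60) -> dict:
        if self._thread is None:
            return {"done": False, "running": False}  # nothing was ever started
        # Bounded long-poll: never wait more than 120 seconds per call.
        if not self._done.wait(timeout=min(wait_seconds, 120)):
            return {"done": False}
        return {"done": True, **self._result}

    def __getstate__(self):
        # Locks, events, and threads cannot be pickled, so leave them out.
        state = self.__dict__.copy()
        for key in ("_lock", "_done", "_thread"):
            state.pop(key, None)
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._running = False
        self._init_primitives()  # fresh primitives in the receiving process
```

Because each status call returns within a bounded time, no single request holds the transport open for the full length of the job.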
Util
- chia.base.tools.util.make_router_lifespan(mcp_instances: List[FastMCP])[source]
A lifespan which runs all MCP session managers concurrently.
This is useful when multiple MCP ASGI apps are mounted under one FastAPI ingress app (e.g. /gem5, /chia). Mounting should be done outside the lifespan; this only manages session manager lifetimes.
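One way such a lifespan could be assembled is with contextlib.AsyncExitStack, sketched below. It assumes each instance exposes session_manager.run() as an async context manager, as FastMCP in the MCP Python SDK does once its streamable-HTTP app has been created; make_lifespan_sketch is a hypothetical name and the actual implementation may differ.

```python
import contextlib


def make_lifespan_sketch(mcp_instances):
    """Build a lifespan that keeps every MCP session manager running together."""

    @contextlib.asynccontextmanager
    async def lifespan(app):
        async with contextlib.AsyncExitStack() as stack:
            # Enter each session manager; all stay active while the app serves.
            for mcp in mcp_instances:
                await stack.enter_async_context(mcp.session_manager.run())
            yield
            # On exit the stack closes the managers in reverse order.

    return lifespan
```

The returned function would be passed as the lifespan of the ingress app, with the individual MCP apps mounted separately, outside the lifespan.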