Base Runtime

API reference for chia.base. These pages are generated from the docstrings in the source, so they stay in sync with the code.

Bypass

General-purpose bypass for ChiaFunction tasks.

When bypass is active, a function is still dispatched through Ray (testing cluster scheduling, placement groups, resource allocation) but the real computation is replaced with pre-recorded data.

How it works

The bypass check happens on the caller side (in chia_remote()). When a function is bypassed:

  1. The caller resolves which provider (or data path) stands in for the function.

  2. Instead of dispatching the real function, it dispatches a trivial _bypass_return task with the same scheduling options (placement group, resources, etc.).

  3. The worker runs the provider and returns its result in place of the real computation.

Usage

from chia.base.bypass import Bypass

# Create a bypass instance — always, as part of loop setup.
# If yaml_path is None, bypass does nothing (all functions run normally).
bypass = Bypass(yaml_path=args.bypass_config)

# Register project-specific providers that build the correct dataclasses.
bypass.set_provider("my_function", my_provider_fn)

# Optionally gate the bypass on a runtime condition (default: always on).
bypass.set_cond("my_function", my_cond_fn)

# Done — the ChiaFunction trampoline checks the active bypass automatically.

YAML format

bypass:
  func_name:
    bypass: true
    data: /path/to/file.json  # optional
    tags: ["iter.*"]          # optional; only calls whose tag matches a pattern are bypassed

  # shorthand (bypass, no data):
  simple_function: true

  # not listed = not bypassed (default, no error)
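The YAML above mixes a full form and a shorthand. The sketch below shows one way to normalize them into a uniform per-function entry. It assumes the file has already been parsed (e.g. with PyYAML's yaml.safe_load) and that the `bypass:` mapping is passed in. `normalize_bypass_section` and `listed_for_bypass` are hypothetical helpers, not chia's loader.

```python
from __future__ import annotations

from typing import Any


def normalize_bypass_section(section: dict[str, Any] | None) -> dict[str, dict[str, Any]]:
    """Expand a parsed `bypass:` mapping into one uniform entry per function (hypothetical)."""
    normalized: dict[str, dict[str, Any]] = {}
    for func_name, entry in (section or {}).items():
        if isinstance(entry, bool):
            # Shorthand: `simple_function: true` means "bypass, no data file".
            normalized[func_name] = {"bypass": entry, "data": None, "tags": None}
        elif isinstance(entry, dict):
            normalized[func_name] = {
                "bypass": bool(entry.get("bypass", False)),
                "data": entry.get("data"),  # optional path, None when absent
                "tags": entry.get("tags"),  # optional list of tag patterns
            }
        else:
            raise ValueError(f"unsupported bypass entry for {func_name!r}: {entry!r}")
    return normalized


def listed_for_bypass(config: dict[str, dict[str, Any]], func_name: str) -> bool:
    # Functions that are not listed are simply not bypassed (no error).
    return config.get(func_name, {}).get("bypass", False)
```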

Provider signature

The provider runs on the worker (same scheduling as the real function):

def my_provider(tag, data_path, *args, **kwargs) -> ReturnType
  • tag: the _chia_tag from the caller (str or None)

  • data_path: the YAML data field (str or None)

  • *args, **kwargs: the original function arguments

Resolution when a function is bypassed:

1. Provider registered -> provider(tag, data_path, *args, **kwargs)
2. Only data_path      -> returns file contents (default file provider)
3. Neither             -> error
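The three-step resolution can be sketched as a plain lookup. Here `resolve_provider` and `default_file_provider` are hypothetical names. The stand-in file provider reads the file locally, whereas the real default provider is only documented as returning the file contents.

```python
from __future__ import annotations

from pathlib import Path
from typing import Any, Callable, Optional


def default_file_provider(tag: Optional[str], data_path: str, *args: Any, **kwargs: Any) -> str:
    # Hypothetical stand-in: read the recorded file from the local filesystem.
    return Path(data_path).read_text()


def resolve_provider(providers: dict[str, Callable[..., Any]],
                     data_paths: dict[str, Optional[str]],
                     func_name: str) -> tuple[Callable[..., Any], Optional[str]]:
    """Pick what runs in place of a bypassed function, in the documented order."""
    provider = providers.get(func_name)
    data_path = data_paths.get(func_name)
    if provider is not None:
        return provider, data_path                # 1. a registered provider wins
    if data_path is not None:
        return default_file_provider, data_path   # 2. fall back to the data file
    # 3. neither: nothing to replay
    raise KeyError(f"{func_name!r} is bypassed but has neither a provider nor data")
```

The caller would then invoke `provider(tag, data_path, *args, **kwargs)` with the original arguments.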

Condition gate

A function may also register a condition via set_cond. It runs on the caller as the last gate in is_bypassed — after the bypass flag, the provider/data check, and the tag patterns — and decides whether the bypass actually happens:

def my_cond(tag, data_path, *args, **kwargs) -> bool

A falsy return means the call runs for real instead of being bypassed. With no condition registered the default is to bypass (True). Useful to gate on runtime state, e.g. only replay from the cache when the value is present.

chia.base.bypass.get_active_bypass()

Return the active Bypass instance, or None if bypass is not in use.

class chia.base.bypass.Bypass(yaml_path: str | None = None)

Bases: object

Controls which ChiaFunction tasks return pre-recorded data.

Create one instance per loop run, always, as part of setup:

bypass = Bypass(yaml_path=args.bypass_config)

If yaml_path is None, bypass does nothing — is_bypassed() returns False for all functions. No errors, no special handling.

If yaml_path is a path, the YAML is parsed and functions listed with bypass: true will be bypassed (if they also have a provider or data path).

The constructor registers itself as the active bypass instance. The ChiaFunction trampoline checks this automatically.

file_server()

Return the bypass file server actor handle, creating it if needed.

Custom providers can use this to read arbitrary files from the node that owns this Bypass:

import ray
from chia.base.bypass import get_active_bypass

def my_provider(tag, data_path, *args, **kwargs):
    server = get_active_bypass().file_server()
    text = ray.get(server.get_text.remote(data_path))
    ...

set_provider(func_name: str, provider: Callable) -> None

Register a provider for func_name.

The provider runs on the worker (not the head node) and replaces the real function when it is bypassed. It is called as:

provider(tag, data_path, *args, **kwargs) -> return_value
  • tag: the _chia_tag from the caller (str or None)

  • data_path: the YAML data field for this function (str or None)

  • *args, **kwargs: the original function arguments, passed through

set_cond(func_name: str, cond: Callable) -> None

Register a bypass condition for func_name.

The condition is an additional gate, evaluated on the caller as the last step of is_bypassed() — after the bypass flag, the provider/data check, and the tag patterns. It is called as:

cond(tag, data_path, *args, **kwargs) -> bool

with the same arguments as the provider (set_provider()). If it returns a falsy value the call is not bypassed and runs for real; if it returns truthy the bypass proceeds. When no condition is registered for a function the default is True (no extra gate).

Use it to make the bypass decision depend on runtime state, e.g. only replay from the cache when the value is actually present:

import ray
from chia.base.cache import get_active_cache

def cache_hit(tag, data_path, *args, **kwargs):
    return ray.get(get_active_cache().has.remote(tag))

bypass.set_cond("run_verilator_test", cache_hit)

property active: bool

True if any functions are configured for bypass.

is_bypassed(func_name: str, tag: str | None = None, *args, **kwargs) -> bool

Should func_name be bypassed on this call?

Returns True only when:
  1. bypass[name] is True (from YAML)

  2. There is a provider or a data_path

  3. (if a tags list is configured) tag matches a pattern

  4. (if a condition is registered) the condition returns truthy

The tag parameter identifies a specific call (e.g. “iter0_opt2”). If the YAML has a tags list for this function, only calls whose tag matches a pattern are bypassed. If no tags list, all calls are bypassed regardless of tag.

The optional *args, **kwargs are the original call arguments; they are forwarded to a registered condition (see set_cond()). The condition is the last gate: it runs only after tag matching has already passed, and a falsy return means the call runs for real. With no condition registered the default is to bypass (True). A condition may invoke remote calls (e.g. a cache lookup), so callers that use is_bypassed() as a cheap predicate should be aware it can do work when a condition is registered.

Functions not listed in the YAML return False (not bypassed). If no YAML was loaded, always returns False.
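The four gates can be sketched as one function that applies them in the documented order, so the condition only runs once the cheap checks have passed. This is a hypothetical model (`should_bypass` is not chia's code). It makes two assumptions: tag patterns use re.fullmatch, as the cache's is_cached does, and an untagged call never matches a configured tags list.

```python
from __future__ import annotations

import re
from typing import Any, Callable, Optional


def should_bypass(entry: Optional[dict[str, Any]], has_provider: bool, tag: Optional[str],
                  cond: Optional[Callable[..., Any]] = None,
                  call_args: tuple = (), call_kwargs: Optional[dict[str, Any]] = None) -> bool:
    """Apply the is_bypassed() gates in order (hypothetical sketch)."""
    # Gate 1: the function must be listed with bypass: true.
    if not entry or not entry.get("bypass"):
        return False
    # Gate 2: there must be something to replay (a provider or a data path).
    data_path = entry.get("data")
    if not has_provider and data_path is None:
        return False
    # Gate 3: only when a tags list is configured, the tag must fully match a pattern.
    patterns = entry.get("tags")
    if patterns and (tag is None or not any(re.fullmatch(p, tag) for p in patterns)):
        return False
    # Gate 4: the optional runtime condition runs last; falsy means "run for real".
    if cond is not None:
        return bool(cond(tag, data_path, *call_args, **(call_kwargs or {})))
    return True
```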

get_provider_info(func_name: str) -> tuple[Callable, str | None]

Return (provider, data_path) for dispatching on the worker.

Resolution:
  1. Provider registered -> (provider, data_path_or_None)

  2. Only data_path (no provider) -> (_default_file_provider, data_path)

  3. Neither -> KeyError

The provider is executed on the worker, not here. This method only looks up what to dispatch.

get_state() -> dict

Return a serializable dict of the bypass state, including providers.

Providers and conditions are serialized via cloudpickle (Ray’s default). This works for module-level functions and most common cases. Lambdas/closures capturing unpicklable objects will fail with a clear error.

The file server actor handle is included so workers can read files from the node that owns this Bypass.

load_state(state: dict) -> None

Restore bypass state from a dict returned by get_state().

Cache

Head-node output cache for ChiaFunction tasks.

A small key/value store, pinned to the head node as a Ray actor, that pickles arbitrary Python objects keyed by a string tag, with an LRU byte budget and a warm-start scan from disk. It has two halves that work together with the chia.base.bypass mechanism:

  • Write is automatic. A function marked cache: true in the YAML has its real-run output written to the cache automatically by ChiaFunction (via an ObjectRefCallback that fires when get() resolves the result), keyed by the call’s _chia_tag. No user code is needed.

  • Read is manual, via bypass. The cache does not auto-serve. To replay a cached value, register a bypass provider that reads it back:

    import ray
    from chia.base.cache import get_active_cache

    def cache_provider(tag, data_path, *args, **kwargs):
        hit, value = ray.get(get_active_cache().read.remote(tag))
        if not hit:
            raise KeyError(f"cache miss for tag {tag!r}")
        return value

    bypass.set_provider("run_verilator_test", cache_provider)

    Cache = write-through populate; bypass = read path. They share the tag.
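The storage policy described above (pickled values keyed by tag, with an LRU byte budget) can be sketched in memory as follows. This is a hypothetical model (`ByteBudgetLRU` is not chia's actor). It omits the on-disk pickle files, the warm-start scan and the Ray actor. `read` returns `(hit, value)` to match the shape the cache provider above unpacks.

```python
from __future__ import annotations

import pickle
from collections import OrderedDict
from typing import Any


class ByteBudgetLRU:
    """In-memory LRU store bounded by total pickled bytes (hypothetical sketch)."""

    def __init__(self, budget_bytes: int) -> None:
        self.budget_bytes = budget_bytes
        self._blobs: OrderedDict[str, bytes] = OrderedDict()  # oldest first
        self._used = 0

    def write(self, tag: str, value: Any) -> None:
        blob = pickle.dumps(value)
        if tag in self._blobs:
            self._used -= len(self._blobs.pop(tag))  # drop the stale value
        if len(blob) > self.budget_bytes:
            return  # can never fit; skip instead of evicting everything
        self._blobs[tag] = blob
        self._used += len(blob)
        while self._used > self.budget_bytes:
            _, evicted = self._blobs.popitem(last=False)  # least recently used first
            self._used -= len(evicted)

    def read(self, tag: str) -> tuple[bool, Any]:
        blob = self._blobs.get(tag)
        if blob is None:
            return False, None
        self._blobs.move_to_end(tag)  # mark as most recently used
        return True, pickle.loads(blob)

    def size_bytes(self) -> int:
        return self._used
```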

Usage

from chia.base.cache import start_cache

# Call once on the driver after ray.init(). Idempotent.
start_cache(size=4, units="GB", cache_dir_path="/data/chia_cache",
            yaml_path=args.bypass_config)

YAML format (same file as bypass, parallel cache: section)

cache:
  run_verilator_test:
    cache: true
    tags: ["iter.*"]   # optional; mirror bypass tag patterns

  # shorthand (cache, no tags):
  build_megaboom: true

chia.base.cache.start_cache(size: float, cache_dir_path: str, units: str = 'B', yaml_path: str | None = None, namespace: str | None = None)

Create (or find) the head-pinned cache actor. Idempotent.

Call from the driver after ray.init(). Mirrors chia.trace.profiler.start_collector().

The actor is created detached so it is reachable from workers in a different Ray job (e.g. a bypass provider running on a remote worker) — a plain named actor is only visible within its creating job. Pair it with an explicit namespace so cross-job lookups resolve it. stop_cache still tears it down; cross-run reuse remains the on-disk pickles + warm start, not a lingering live actor.

Parameters:
  • size – Numeric budget; multiplied by UNITS[units] to get bytes.

  • cache_dir_path – Directory for the pickle files (on the head node).

  • units – One of UNITS (default "B").

  • yaml_path – Optional path to the bypass/cache YAML; the cache: section opts functions into automatic caching.

  • namespace – Optional Ray namespace for the named actor.

Returns:

The Cache actor handle.

chia.base.cache.stop_cache(namespace: str | None = None) -> None

Kill the cache actor. Idempotent.

Cross-run reuse comes from the on-disk pickles + warm-start scan, not from a persisted live actor, so killing the actor loses nothing on disk.

chia.base.cache.get_active_cache(namespace: str | None = None)

Return the cache actor handle, or None if no cache was started.

Always resolves the current named actor via ray.get_actor — no process-local handle is cached. A worker (e.g. inside a bypass provider or a nested dispatch) reaches the cache with no state threading, and a cache that was restarted/replaced is picked up automatically rather than serving a stale, dead handle.

chia.base.cache.is_cached(func_name: str, tag: str | None = None) -> bool

Should func_name’s output be cached on this call?

True only when the function is configured cache: true and, if tags: patterns are present, tag matches one of them under re.fullmatch. Mirrors chia.base.bypass.Bypass.is_bypassed(). The config is read from the actor (the source of truth), so it never goes stale across a cache restart; only tagged dispatches reach this path (see _maybe_wrap_cache).
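Because matching uses re.fullmatch, a pattern must cover the whole tag, not just a prefix. A hypothetical model of the check (`should_cache` is an illustrative name, not the library function) that also accepts the `true` shorthand:

```python
from __future__ import annotations

import re
from typing import Any, Optional, Union


def should_cache(entry: Union[bool, dict[str, Any], None], tag: Optional[str]) -> bool:
    """cache: true, plus a full-match against tags when a tags list exists (sketch)."""
    if entry is True:
        entry = {"cache": True}  # shorthand form, e.g. `build_megaboom: true`
    if not isinstance(entry, dict) or not entry.get("cache"):
        return False
    patterns = entry.get("tags")
    if not patterns:
        return True  # no tags list: every call qualifies
    return tag is not None and any(re.fullmatch(p, tag) for p in patterns)
```

With tags: ["iter0"], the tag "iter0_opt2" is not cached, even though re.match("iter0", "iter0_opt2") would succeed.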

Chia Function

class chia.base.ChiaFunction.ChiaWrapped(*args, **kwargs)

Bases: Protocol[P, R]

Protocol describing the object returned by @ChiaFunction().

Includes __get__ overloads so that mypy treats instances of this protocol as descriptors. When accessed on a class (obj is None), the full ParamSpec is preserved. When accessed on an instance (bound-method position), the return type falls back to ChiaWrapped[..., R] because Python’s type system cannot express “P minus the first argument”. This avoids false positives on method calls while keeping full typing for standalone decorated functions.

class chia.base.ChiaFunction.ChiaFunction(**kwargs)

Bases: object

Decorator that creates both a local and ray.remote version of a function.

Usage:

from chia.base.ChiaFunction import ChiaCallRemote, ChiaFunction, get

@ChiaFunction(resources={"db1": 1.0})
def my_function(x, y):
    return x + y

# Local call:
my_function(1, 2)

# Remote call (returns ObjectRef):
ref = ChiaCallRemote(my_function, 1, 2)
result = get(ref)

# Or attribute-based:
ref = my_function.chia_remote(1, 2)

For bound methods on a class, the decorator uses a trampoline to serialize self and dispatch via ray.remote:

class MyTool:
    @ChiaFunction()
    def do_work(self, x):
        return x * 2

tool = MyTool()
ref = tool.do_work.chia_remote(tool, x=5)

Accepts any keyword arguments supported by ray.remote().options().

Common options include resources, num_cpus, num_gpus, num_returns, max_retries, retry_exceptions, memory, scheduling_strategy, max_calls, runtime_env, name, namespace, lifetime, etc. See Ray documentation for the full list.
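The "local plus remote" shape of the decorator can be illustrated without Ray. In the toy analogue below, a thread pool stands in for the cluster and a Future stands in for an ObjectRef. `toy_chia_function` and `chia_options` are hypothetical names. The sketch deliberately leaves out the trampoline, profiling, bypass and cache handling.

```python
from __future__ import annotations

import functools
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable

# A thread pool stands in for the Ray cluster in this sketch.
_EXECUTOR = ThreadPoolExecutor(max_workers=4)


def toy_chia_function(**options: Any) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Keep fn callable locally and attach an async .chia_remote (hypothetical analogue)."""
    def decorate(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)
        def local(*args: Any, **kwargs: Any) -> Any:
            return fn(*args, **kwargs)  # plain in-process call

        def chia_remote(*args: Any, **kwargs: Any) -> Future:
            # Non-blocking: returns a handle immediately, like an ObjectRef.
            return _EXECUTOR.submit(fn, *args, **kwargs)

        local.chia_remote = chia_remote
        local.chia_options = dict(options)  # the real decorator forwards these to Ray
        return local
    return decorate


@toy_chia_function(num_cpus=1)
def add(x: int, y: int) -> int:
    return x + y
```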

chia.base.ChiaFunction.ChiaCallRemote(func: ChiaWrapped[P, R], *args: P.args, **kwargs: P.kwargs) -> ObjectRef[R]

Call the ray.remote version of a @ChiaFunction-decorated function.

Returns a ray.ObjectRef[R] where R is the wrapped function’s return type, so get(...) can infer the final value type.

class chia.base.ChiaFunction.ObjectRefCallback(ref, callback: Callable[[Any], Any])

Bases: object

An ObjectRef paired with a callback that get() runs on resolution.

Lets a producer attach a post-resolution hook to the ref it hands back, so callers can stay async and simply get() it — the callback runs at fetch time and its return becomes get’s result — without passing callback= themselves. ClaudeCodeLLM.prompt returns one of these (carrying _sync_transcript) when resume_session=True.

.ref exposes the underlying ObjectRef for ray.wait / chia_wait().

class chia.base.ChiaFunction.ChiaActorHandle(actor)

Bases: object

Thin proxy over a Ray actor handle that adds chia_remote to each method.

Built by chia_actor(). Attribute access returns a _ChiaActorMethod, so handle.method.chia_remote(...) dispatches the call and get() resolves it — matching the @ChiaFunction surface. remote still works, so existing handle.method.remote(...) call sites are unaffected.

Note: unlike a real @ChiaFunction dispatch, chia_remote here is a plain alias for Ray’s remote — no profiling, bypass, or cache wrapping is applied; the call goes straight to the actor. Use actor to recover the raw Ray handle (e.g. for ray.kill or identity checks).

property actor

The underlying raw Ray actor handle.

chia.base.ChiaFunction.chia_actor(handle)

Wrap a Ray actor handle so its methods accept chia_remote and resolve via get().

Lets actor calls share the dispatch surface used by @ChiaFunction:

store = chia_actor(some_actor)
n = get(store.size_bytes.chia_remote())
get(store.write.chia_remote(key, value))

chia_remote here is a plain alias for Ray’s remote — it does NOT run the profiling / bypass / cache machinery that ChiaWrapped.chia_remote does; the call goes straight to the actor. remote still works, so pre-existing call sites are unchanged. Recover the raw handle via handle.actor (e.g. for ray.kill). Idempotent: wrapping an already wrapped handle returns it unchanged.
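The proxy behaviour described above can be modelled in a few lines. `MethodProxy`, `ActorProxy` and `wrap_actor` are hypothetical names, and the fake actor in the usage stands in for a Ray handle. The sketch shows three properties: chia_remote is a bare alias of remote, the raw handle stays reachable, and wrapping is idempotent.

```python
from __future__ import annotations

from typing import Any


class MethodProxy:
    """Wraps one actor method; chia_remote is a plain alias for remote."""

    def __init__(self, method: Any) -> None:
        self._method = method

    def remote(self, *args: Any, **kwargs: Any) -> Any:
        return self._method.remote(*args, **kwargs)

    # Alias only: no profiling, bypass, or cache wrapping happens here.
    chia_remote = remote


class ActorProxy:
    """Attribute access returns a MethodProxy for the wrapped actor's method."""

    def __init__(self, actor: Any) -> None:
        self._actor = actor

    @property
    def actor(self) -> Any:
        return self._actor  # raw handle, e.g. for ray.kill or identity checks

    def __getattr__(self, name: str) -> MethodProxy:
        # Only called for names not found on the proxy itself.
        return MethodProxy(getattr(self._actor, name))


def wrap_actor(handle: Any) -> ActorProxy:
    # Idempotent: an already wrapped handle is returned unchanged.
    return handle if isinstance(handle, ActorProxy) else ActorProxy(handle)
```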

chia.base.ChiaFunction.get(ref: ObjectRefCallback, *, timeout: float | None = None, callback: Callable[[Any], Any] | None = None, _use_object_store: bool = False) -> Any
chia.base.ChiaFunction.get(ref: Sequence[ObjectRef[R]], *, timeout: float | None = None, callback: Callable[[Any], Any] | None = None, _use_object_store: bool = False) -> List[R]
chia.base.ChiaFunction.get(ref: Sequence[ObjectRef[Any]], *, timeout: float | None = None, callback: Callable[[Any], Any] | None = None, _use_object_store: bool = False) -> List[Any]
chia.base.ChiaFunction.get(ref: ObjectRef[R], *, timeout: float | None = None, callback: Callable[[Any], Any] | None = None, _use_object_store: bool = False) -> R
chia.base.ChiaFunction.get(ref: Sequence[CompiledDAGRef], *, timeout: float | None = None, callback: Callable[[Any], Any] | None = None, _use_object_store: bool = False) -> List[Any]
chia.base.ChiaFunction.get(ref: CompiledDAGRef, *, timeout: float | None = None, callback: Callable[[Any], Any] | None = None, _use_object_store: bool = False) -> Any

Wrapper around ray.get() for use in Chia flows.

Delegates to ray.get() and post-processes the result through the Chia profiler. Overloads mirror ray.get() so a single ObjectRef[R] returns R and a sequence of them returns List[R].

callback: when given, it is invoked with the resolved value (after the profiler unwrap) and its return value is what get returns. This keeps dispatch async — the ref is produced non-blocking by chia_remote — while running a post-resolution side effect/transform only when the value is fetched (e.g. get(ref, callback=fn)).

An ObjectRefCallback carries its own callback, so get(orc) runs it without an explicit callback=. If both are present they compose: the ref’s own callback runs first, then the explicit callback.
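The composition order matters whenever both callbacks transform the value. In the hypothetical model below, `RefWithCallback` stands in for ObjectRefCallback and a `resolve` function stands in for ray.get() plus the profiler unwrap. It shows the ref's own callback running before the explicit one.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class RefWithCallback:
    """Hypothetical stand-in for ObjectRefCallback: a ref plus its own hook."""
    ref: Any
    callback: Callable[[Any], Any]


def toy_get(ref: Any, *, resolve: Callable[[Any], Any],
            callback: Optional[Callable[[Any], Any]] = None) -> Any:
    own_callback = None
    if isinstance(ref, RefWithCallback):
        own_callback, ref = ref.callback, ref.ref
    value = resolve(ref)  # stands in for ray.get() plus the profiler unwrap
    if own_callback is not None:
        value = own_callback(value)  # the ref's own callback runs first
    if callback is not None:
        value = callback(value)      # then the explicit callback=
    return value
```

With a stored value of 10, an own callback `v + 1` and an explicit callback `v * 2`, the result is 22. The reverse order would give 21.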

chia.base.ChiaFunction.chia_cancel(ref, force=False)

Cancel a running ChiaFunction task, killing its subprocesses first.

Looks up any subprocess PIDs spawned by the task, kills them on the correct remote nodes (using process group kill for start_new_session subprocesses), then calls ray.cancel().

Chia Wait

ray.wait wrapper with stuck-PENDING_NODE_ASSIGNMENT detection + retry.

Ray’s owner-side NormalTaskSubmitter sends RequestWorkerLease RPCs without a client-side timeout. If the chosen raylet is wedged but TCP-reachable (silent), the lease slot stays full forever and downstream tasks of the same scheduling key get throttled at the owner. chia_wait detects the pattern (PENDING_NODE_ASSIGNMENT older than pending_timeout while the cluster has free resources) and optionally cancels + resubmits.

Use a TrackedRef to pair an ObjectRef with the closure that can re-dispatch it. chia_wait mirrors ray.wait’s (ready, pending) return shape but operates on TrackedRefs.

class chia.base.chia_wait.TrackedRef(ref: ObjectRef, submit_fn: Callable[[], ObjectRef] | None = None, label: str = '', submitted_at: float = 0.0, retries: int = 0)

Bases: object

ObjectRef + resubmit closure + submission timestamp.

submit_fn must accept no arguments and produce a fresh ObjectRef equivalent to the original submission (same function, args, options). Capture call-site state via functools.partial or a closure.

chia.base.chia_wait.chia_wait(tracked: list[TrackedRef], *, num_returns: int = 1, timeout: float | None = None, pending_timeout: float | None = None, retry: bool = False, max_retries: int = 1, require_demand_absent: bool = False, min_free_fraction: float = 0.5, cancel_on_stuck: bool = False, print_logs: bool = False) -> tuple[list[TrackedRef], list[TrackedRef]]

Drop-in replacement for ray.wait() that handles stuck-PENDING tasks.

Returns (ready, pending) lists of TrackedRef once num_returns refs are ready or timeout seconds elapse, with the same semantics as ray.wait. Additionally, when pending_timeout is set, any pending TrackedRef whose submission age exceeds pending_timeout is classified as “stuck” iff:

  1. Ray state API reports state == PENDING_NODE_ASSIGNMENT, AND

  2. ray.available_resources() covers the task’s required resources (i.e. it could schedule somewhere if Ray were healthy), AND

  3. for every required resource r, available[r] / cluster_total[r] >= min_free_fraction (cluster-side starvation check — distinguishes a wedged raylet, where the resource is sitting unused, from a busy cluster where tasks are merely waiting on the owner-side per-class budget), AND

  4. (when require_demand_absent=True) the cluster has no pending demand reported for this task’s resource fingerprint.
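The classification rule above reduces to a pure predicate once the inputs have been gathered. Those inputs are the task state, its age, the required and available resources, the cluster totals, and whether demand is reported. `looks_stuck` is a hypothetical name, and collecting the inputs from Ray's state API is left out.

```python
from __future__ import annotations

from typing import Mapping


def looks_stuck(state: str, age_s: float, pending_timeout: float,
                required: Mapping[str, float], available: Mapping[str, float],
                total: Mapping[str, float], min_free_fraction: float = 0.5,
                demand_present: bool = False, require_demand_absent: bool = False) -> bool:
    """Hypothetical model of chia_wait's stuck test on pre-collected inputs."""
    if age_s <= pending_timeout:
        return False  # not old enough to judge yet
    if state != "PENDING_NODE_ASSIGNMENT":
        return False  # 1. must be waiting for node assignment
    for res, need in required.items():
        free = available.get(res, 0.0)
        if free < need:
            return False  # 2. the cluster could not place it even if healthy
        if total.get(res, 0.0) <= 0 or free / total[res] < min_free_fraction:
            return False  # 3. the resource is genuinely busy, not stranded
    if require_demand_absent and demand_present:
        return False  # 4. pending demand is reported for this resource shape
    return True
```

With 8 of 10 CPUs free, the task is flagged. With only 2 of 10 free (20% < 50%), it is treated as a busy cluster and left alone.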

For each stuck TrackedRef:
  • If retry=True, tr.submit_fn is set, and tr.retries < max_retries: ray.cancel(tr.ref, force=True) (best effort), then call tr.submit_fn() for a fresh ref. Mutate tr in place (ref/submitted_at/retries++). The old ref is orphaned — its rate-limit slot may stay burnt until the wedged raylet’s TCP eventually resets.

  • Else: log a warning and leave tr in pending unchanged.

The returned pending list contains the same TrackedRef objects passed in (possibly with mutated ref/submitted_at after retry); callers can keep parallel bookkeeping keyed on identity.
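The retry path mutates the TrackedRef in place, which is why callers can key bookkeeping on identity. The sketch below uses a hypothetical `Tracked` dataclass that mirrors TrackedRef's fields, plus a fake submitter in place of a real dispatch. It also shows the functools.partial pattern recommended for submit_fn.

```python
from __future__ import annotations

import functools
import itertools
import time
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Tracked:
    """Hypothetical mirror of TrackedRef's fields."""
    ref: Any
    submit_fn: Optional[Callable[[], Any]] = None
    label: str = ""
    submitted_at: float = 0.0
    retries: int = 0


def retry_stuck(tr: Tracked, max_retries: int, cancel: Callable[[Any], None]) -> bool:
    """Resubmit a stuck entry in place; return True if a fresh ref was issued."""
    if tr.submit_fn is None or tr.retries >= max_retries:
        return False  # caller logs a warning and leaves tr pending
    try:
        cancel(tr.ref)  # best effort; the old ref is orphaned either way
    except Exception:
        pass
    tr.ref = tr.submit_fn()  # same function, args, and options as before
    tr.submitted_at = time.monotonic()
    tr.retries += 1
    return True


# Hypothetical submitter: each call stands in for a fresh remote dispatch.
_ids = itertools.count()


def fake_submit(job: str) -> str:
    return f"{job}-ref{next(_ids)}"


tr = Tracked(ref=fake_submit("sim"), submit_fn=functools.partial(fake_submit, "sim"), label="sim")
```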

Chia KV Store

Detached, head-node-pinned Ray actor that stores arbitrary key/value pairs for the cluster to read and write live.

Use this whenever a pipeline needs a small, operator-adjustable piece of shared state — e.g. concurrency knobs, feature flags, or rate limits — without rebuilding the detached-actor scaffolding each time.

Because the actor is lifetime="detached", its values persist across pipeline restarts. Any Ray client on the cluster can look it up by name and namespace:

from chia.base.chia_kv_store import get_or_create_kv_store, query_kv

handle = get_or_create_kv_store(
    name="my_pipeline_knobs", namespace="my_pipeline",
    num_parallel=2, stage="warmup",
)
n = query_kv(handle, "num_parallel", default=2)

An operator can adjust values live from any machine with Ray access:

python -m chia.base.chia_kv_store \
    --name my_pipeline_knobs --namespace my_pipeline --set num_parallel 4
python -m chia.base.chia_kv_store \
    --name my_pipeline_knobs --namespace my_pipeline --get num_parallel

chia.base.chia_kv_store.get_or_create_kv_store(name: str, namespace: str, **defaults: Any) -> ActorHandle

Attach to the detached actor if one already exists, otherwise create it pinned to the current Ray node.

The actor is scheduled with NodeAffinitySchedulingStrategy bound to whatever node id ray.get_runtime_context().get_node_id() returns at call time. When invoked from a pipeline driver on the head node, that pins the actor to the head.

Parameters:
  • name – Ray actor name.

  • namespace – Ray actor namespace.

  • **defaults – Initial values used only when creating a fresh actor. Ignored when attaching to an existing actor.

Returns:

A Ray ActorHandle to the ChiaKVStore.
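The attach-or-create contract can be sketched with a plain registry keyed by (namespace, name). The point it illustrates is that defaults only seed a fresh store and are ignored on attach. `get_or_create_store` is a hypothetical name. In Ray, the attach step would typically be ray.get_actor(name, namespace=namespace), and creation would use .options(name=..., namespace=..., lifetime="detached", scheduling_strategy=...). The sketch does not model that.

```python
from __future__ import annotations

from typing import Any

# (namespace, name) -> store; a dict stands in for the detached actor.
_REGISTRY: dict[tuple[str, str], dict[str, Any]] = {}


def get_or_create_store(name: str, namespace: str, **defaults: Any) -> dict[str, Any]:
    key = (namespace, name)
    if key in _REGISTRY:
        return _REGISTRY[key]  # attach: defaults are ignored
    store = dict(defaults)     # create: defaults seed the fresh store
    _REGISTRY[key] = store
    return store
```

A second call after an operator has changed a value sees the live value, not the caller's default.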

chia.base.chia_kv_store.query_kv(handle, key: str, default: Any = None, timeout: float = 60.0) -> Any

Read key from the actor with a hard timeout.

Intended for hot loops that should never wedge on a broken actor: any exception (actor dead, timeout, serialization error) or a None handle falls back to default.

Parameters:
  • handle – ActorHandle returned by get_or_create_kv_store(). May be None — treated the same as an unreachable actor.

  • key – Key to read.

  • default – Value returned on any error, timeout, or missing key.

  • timeout – Hard timeout in seconds for the underlying ray.get.

Returns:

The stored value, or default on any failure.
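The "never wedge" behaviour amounts to a bounded wait plus a catch-all fallback. In the hypothetical sketch below, a thread pool and a `read` callable stand in for the actor call. It assumes a missing key reads back as None. A timed-out read keeps running in its worker thread, but the caller is released.

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional

_POOL = ThreadPoolExecutor(max_workers=2)


def query_with_fallback(read: Optional[Callable[[str], Any]], key: str,
                        default: Any = None, timeout: float = 60.0) -> Any:
    if read is None:
        return default  # no handle: treated like an unreachable actor
    try:
        value = _POOL.submit(read, key).result(timeout=timeout)
    except Exception:  # timeout, dead actor, serialization error, ...
        return default
    return default if value is None else value  # assumption: None means "missing"
```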

Colocated

chia.base.colocated — placement-group plumbing for path-based node classes.

Any subsystem whose artifacts live on a worker’s filesystem (a gem5 checkout, a hammer obj_dir, etc.) can pin a family of @ChiaFunction members to one placement-group bundle and guarantee they land on the same worker.

class chia.base.colocated.PinnedChiaFn(fn, scheduling_opts: dict)

Bases: object

Exposes a @ChiaFunction with .chia_remote pre-pinned to a placement-group bundle. Resource requirements are carried over unchanged by .options(); with no scheduling opts it delegates to the raw function so the caller’s own placement applies.

(Lifted from chia.simulators.gem5._PinnedChiaFn; gem5 can migrate here.)

options(**overrides)

Layer extra Ray options on top of the node’s pinning.

class chia.base.colocated.ColocatedNode(placement_group=None, require_colocated: bool = True, *, bundle_index: int = 0, reserve_bundle: dict | None = None, pg_strategy: str = 'STRICT_PACK', wait_for_pg: bool = True, pg_ready_timeout_s: float | None = None)

Bases: object

Base for node classes whose member ChiaFunctions must share one worker.

Subclasses declare:

  • _MEMBER_FNS: names of class-level @ChiaFunction members that __init__ re-binds into per-instance PinnedChiaFn wrappers, so node.<fn>.chia_remote(...) lands on this node’s bundle while Subclass.<fn>.chia_remote(...) (the class attribute) stays unpinned.

  • _DEFAULT_BUNDLE: resource shape of a self-reserved bundle.

Pinning never alters a member’s resource demands — the placement group only changes where they are satisfied from (the bundle’s reservation instead of the node’s free pool). A bundle that cannot satisfy every member is therefore allowed: construction logs a warning naming the members that don’t fit (derived from each member’s own @ChiaFunction options), and dispatching one of them later raises ValueError at submission.
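The construction-time warning reduces to a per-member comparison against the bundle. Each member is checked on its own, not summed, because a member that does not fit simply cannot dispatch through the node. `members_that_dont_fit` is a hypothetical helper. It assumes each member's demands have already been turned into a resource dict such as {"CPU": 4}.

```python
from __future__ import annotations

from typing import Mapping


def members_that_dont_fit(bundle: Mapping[str, float],
                          member_demands: Mapping[str, Mapping[str, float]]) -> list[str]:
    """Names of members whose own demands exceed what the bundle reserves."""
    return sorted(
        name for name, demand in member_demands.items()
        # A resource absent from the bundle counts as zero reserved.
        if any(amount > bundle.get(res, 0.0) for res, amount in demand.items())
    )
```

For a bundle of {"CPU": 4}, a member needing 8 CPUs and a member needing a GPU would both be named in the warning.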

Placement is decided once at construction:

  • placement_group given -> pin members to bundle_index of it (the node will NOT release a PG it did not create).

  • none + require_colocated=True -> reserve a 1-bundle _DEFAULT_BUNDLE PG (owned + released by close()).

  • none + require_colocated=False -> no pinning; the caller schedules each .chia_remote / .options(...) call itself.

Usable as a context manager so a self-reserved PG is released on exit.

Set up placement and bind the member functions.

Parameters:
  • placement_group – an existing Ray PlacementGroup to schedule onto. If given, require_colocated is moot (placement is already fixed) and this node will not remove the PG on close.

  • require_colocated – when no PG is given, reserve one so all members co-locate. When False, leave placement to the caller.

  • bundle_index – which bundle of the (given or reserved) PG to pin to.

  • reserve_bundle – resource shape of a self-reserved bundle (default _DEFAULT_BUNDLE). A bundle too small for some members is allowed — those members just can’t dispatch through this node (a construction-time warning lists them).

  • pg_strategy – placement strategy for a self-reserved PG.

  • wait_for_pg – block on pg.ready() for a self-reserved PG so the node is usable immediately.

  • pg_ready_timeout_s – optional timeout for that wait.

property placement_group

The PG members are pinned to (None when the caller handles placement).

property owns_placement_group: bool

True iff this node reserved its PG and will release it on close().

property task_options: dict

Scheduling opts to co-locate an actor (e.g. a ChiaTool) with this node’s bundle. Empty when the node has no placement group (require_colocated=False) — there is no shared placement to inherit, so an actor given these opts would not be pinned.

close() -> None

Release the PG iff this node reserved it. Idempotent.

Dispatch Proxy

Head-relayed task dispatch for workers on reverse-tunneled nodes.

Ray’s ownership model makes the submitting worker the owner of a task: the owner negotiates worker leases directly with the target raylet and serves the result object’s data. On a reverse-tunneled worker this breaks for any task whose demands place it on a LAN node. Chia’s tunnels are hub-and-spoke (tunneled worker <-> head only; see cluster/tunnel.py), and LAN raylet/worker ports are random and may be firewalled from the tunneled machine. The owner’s lease RPC therefore gets “no route to host”, and the task sits in PENDING_NODE_ASSIGNMENT forever, even when capacity is free.

The fix: when chia_remote is called on a worker that cannot directly reach the task’s target node (a tunneled worker dispatching to a LAN node, or a LAN worker dispatching to a tunneled node, i.e. local -> remote), the dispatch is relayed through a DispatchProxy actor pinned to the head node. The proxy, not the originating worker, owns the inner task, so every RPC leg rides a link that already exists. The head is the only node with a bidirectional path to every spoke, so it is the universal relay. should_proxy makes this decision per dispatch and fails safe: it relays unless it can prove that every node the task could land on is directly reachable (see the reachability analysis below):

tunneled worker -> proxy actor call ..... pinned head worker ports (reverse tunnel + DNAT,
                                          the same path ProfileCollectorActor uses)
proxy <- args from tunneled worker ...... head-side -L object-manager forward
proxy -> LAN raylet lease ............... plain LAN traffic
result -> tunneled worker ............... the proxy awaits the value and returns it, so
                                          the result object lives on the head; tunneled worker
                                          fetches it via the DNAT'd head object manager

Costs and limits:

  • The head relays the data — args and results each cross the network twice.

  • num_returns > 1 is not supported (falls back to direct dispatch, with a warning, which may hang if the task lands on a LAN node).

  • ObjectRefs nested inside proxied args are re-splatted as top-level args of the inner task; if such a ref is owned by a tunneled worker, the LAN executor cannot reach that owner and the inner task will hang. Pass values, not refs, into nested dispatches from tunneled worker-resident tasks.

  • The proxy inherits the creating job’s runtime_env, so it is per-job (chia_dispatch_proxy_<job_id>) and detached; it idles at num_cpus=0 until the cluster is torn down.

chia.base.dispatch_proxy.should_proxy(opts: dict | None = None) -> bool

True when this dispatch must be relayed through the head DispatchProxy.

Relays iff the task could land on a node the current worker cannot reach directly (see _directly_reachable()). This both enables LAN-origin dispatches to tunneled nodes (local -> remote) and lets provably-direct dispatches skip the single-actor relay. The head never relays.
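The fail-safe rule can be stated as a small predicate. `needs_relay` is a hypothetical name. The set of nodes a task could land on, and the set the current worker can reach, are taken as given inputs; computing them is the job of _directly_reachable() and is not modelled. Treating an unknown or empty candidate set as "relay" is this sketch's reading of "relays unless it can prove".

```python
from __future__ import annotations

from typing import Iterable, Optional


def needs_relay(is_head: bool, candidate_nodes: Optional[Iterable[str]],
                directly_reachable: set[str]) -> bool:
    """Relay unless every possible target node is provably reachable (sketch)."""
    if is_head:
        return False  # the head already has a path to every spoke
    if candidate_nodes is None:
        return True   # placement unknown: cannot prove reachability, so relay
    nodes = list(candidate_nodes)
    if not nodes:
        return True   # nothing proven reachable: fail safe
    return not all(node in directly_reachable for node in nodes)
```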

chia.base.dispatch_proxy.get_dispatch_proxy()

Get-or-create this job’s head-pinned DispatchProxy actor.

PID Registry

PID tracking and cancellation for ChiaFunction remote tasks.

Tracks subprocess PIDs spawned during @ChiaFunction remote execution and provides chia_cancel() to kill those process trees before cancelling the Ray task.

The Popen hook is scoped per-task via threading.local so concurrent Ray tasks on the same worker don’t interfere with each other.

class chia.base.pid_registry.PidRegistryActor

Bases: object

Ray actor that maps task IDs to subprocess PIDs.

Created lazily on first registration, looked up by workers and the driver via _get_registry().

kill_all(grace: float = 25.0) -> int

Kill all tracked subprocess PIDs across all nodes.

Dispatches remote kill tasks and waits for completion. Each kill sends SIGTERM, polls for the process to exit, and only escalates to SIGKILL if it is still alive after grace seconds. Returns the number of PIDs targeted.
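The per-PID escalation (SIGTERM, poll, then SIGKILL after grace seconds) can be sketched as follows. This is a POSIX-only hypothetical helper (`terminate_with_grace`), not the registry's code. The signal sender, liveness check, clock and sleep are injectable so the logic can be exercised without real processes. For start_new_session subprocesses, `send` could be os.killpg with a process-group id instead of os.kill.

```python
from __future__ import annotations

import os
import signal
import time
from typing import Callable


def _pid_alive(pid: int) -> bool:
    # Signal 0 only checks existence. Note that an unreaped zombie child still counts as alive.
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # exists but is owned by another user
    return True


def terminate_with_grace(pid: int, grace: float = 25.0, poll: float = 0.1,
                         send: Callable[[int, int], None] = os.kill,
                         alive: Callable[[int], bool] = _pid_alive,
                         clock: Callable[[], float] = time.monotonic,
                         sleep: Callable[[float], None] = time.sleep) -> str:
    """SIGTERM first; escalate to SIGKILL only if still alive after `grace` seconds."""
    try:
        send(pid, signal.SIGTERM)
    except ProcessLookupError:
        return "gone"
    deadline = clock() + grace
    while clock() < deadline:
        if not alive(pid):
            return "terminated"
        sleep(poll)
    if not alive(pid):
        return "terminated"
    try:
        send(pid, signal.SIGKILL)
    except ProcessLookupError:
        return "terminated"  # exited between the check and the kill
    return "killed"
```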

chia.base.pid_registry.chia_cancel(ref: ObjectRef, force: bool = False) -> None

Cancel a running ChiaFunction task, killing its subprocesses first.

Looks up any subprocess PIDs spawned by the task, kills them on the correct remote nodes (using process group kill for start_new_session subprocesses), then calls ray.cancel().

Parameters:
  • ref – The ObjectRef returned by chia_remote().

  • force – Passed through to ray.cancel(). If True, the Ray worker is killed; if False (default), a TaskCancelledError is raised cooperatively.

LLM Call

class chia.base.llm_call.QueryResult(result: str, returncode: int, stderr: str, stream_result: str, success: bool = False)

Bases: object

Structured result from prompting an LLM or agent.

Parameters:
  • result (str) – The final response from the LLM or agent

  • returncode (int) – The returncode from running the prompt

  • stderr (str) – The stderr output from running the prompt (CLIs only)

  • stream_result (str) – The full transcript of all turns of the LLM or agent

  • success (bool) – Whether the prompt completed successfully

class chia.base.llm_call.LLMCallBase(system_message: str)

Bases: ABC

Polymorphic base container for generic LLM and agent configuration traits and behavior. It makes it easy to switch between different backing providers, servers, and CLIs.

abstractmethod prompt(user_message: str, tools: List[ChiaTool] | None = []) -> QueryResult

Send a prompt to this LLM

Parameters:
  • user_message (str) – Message used to prompt the LLM

  • tools (Optional[List[ChiaTool]]) – Tools available to the LLM during the call
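The polymorphic contract means a caller only depends on prompt() returning a result object. The hypothetical sketch below mirrors QueryResult's documented fields in a `Result` dataclass and uses an abstract `LLMBase` with a toy `EchoLLM` backend; none of these are chia classes. It uses None rather than a list literal as the tools default, so no mutable default is shared between calls.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass
class Result:
    """Hypothetical mirror of QueryResult's documented fields."""
    result: str
    returncode: int
    stderr: str
    stream_result: str
    success: bool = False


class LLMBase(ABC):
    def __init__(self, system_message: str) -> None:
        self.system_message = system_message

    @abstractmethod
    def prompt(self, user_message: str, tools: Optional[List[Any]] = None) -> Result:
        """Backends implement this; callers never depend on which backend runs."""


class EchoLLM(LLMBase):
    """Toy backend that echoes the prompt instead of calling a model."""

    def prompt(self, user_message: str, tools: Optional[List[Any]] = None) -> Result:
        reply = f"{self.system_message}: {user_message}"
        return Result(result=reply, returncode=0, stderr="", stream_result=reply, success=True)
```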

MCP tool servers

Chia Tool

class chia.base.tools.ChiaTool.ToolInfo(name: str, port: int, node_id: str)

Bases: object

class chia.base.tools.ChiaTool.ChiaTool(name: str, task_options: Dict | None = None, logging_level=10)

Bases: object

Base class for MCP tool servers deployed onto Ray workers.

Subclasses define a setup() method that calls self.mcp.add_tool(self.method, name=...) one or more times to register methods as tools on this ChiaTool instance.

A subclass can shut down the tool server with self.stop(), which tells the actor to shut down uvicorn and then kills the actor. Because start and stop run in the same actor process, the uvicorn server reference is always reachable.

The resulting MCP endpoint is at:

http://{self.hostname}:{self.port}/{self.name}/mcp

Example subclass:

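from chia.base.tools.ChiaTool import ChiaTool

class BashTool(ChiaTool):
    def setup(self):
        # self.name is set by ChiaTool.__init__ before setup() runs.
        self.mcp.add_tool(self.run_command, name=f"{self.name}_run_command")

    def run_command(self, command: str) -> str:
        ...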

Alternatively, instead of a setup method, a subclass can define an __init__ method, which must do the following:

def __init__(self, name, task_options):
    super().__init__(name, task_options=task_options)
    # Registers fns with self.mcp.add_tool
    super().__post_init__()

Initializes ChiaTool with a name and optional resource requirements.

setup(*args, **kwargs)

Hook for the auto-constructed style — override to register tools and set instance state, instead of writing __init__.

A subclass that defines setup and no __init__ is given an __init__ automatically (see __init_subclass__()) that runs ChiaTool.__init__ before and __post_init__ after setup, so the contract with the subclass can’t be written incorrectly. Inside setup the base __init__ has already run, so self.name / self.mcp are available:

class BashTool(ChiaTool):
    def setup(self, work_dir="/"):
        self.work_dir = work_dir
        self.mcp.add_tool(self.run_command, name=f"{self.name}_run_command")

Positional/keyword args from the constructor (other than name, task_options, and logging_level, which the base consumes) are forwarded here. Multi-level subclasses override setup and call super().setup(...) to chain.
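The auto-constructed style can be sketched with __init_subclass__. ToolBase and FakeMCP below are hypothetical stand-ins for ChiaTool and its FastMCP server. FakeMCP only records tools, and __post_init__ just sets a flag instead of deploying anything. What the sketch shows is the ordering: base __init__, then setup(), then __post_init__().

```python
from typing import Any, Callable, Dict, Optional


class FakeMCP:
    """Stand-in for the MCP server object: just records registered tools."""

    def __init__(self) -> None:
        self.tools: Dict[str, Callable[..., Any]] = {}

    def add_tool(self, fn: Callable[..., Any], name: str) -> None:
        self.tools[name] = fn


class ToolBase:
    """Hypothetical stand-in for ChiaTool showing the auto-__init__ contract."""

    def __init__(self, name: str, task_options: Optional[dict] = None,
                 logging_level: int = 10) -> None:
        self.name = name
        self.task_options = task_options
        self.logging_level = logging_level
        self.mcp = FakeMCP()
        self.started = False

    def __post_init__(self) -> None:
        self.started = True  # the real class would deploy the server here

    def setup(self, *args: Any, **kwargs: Any) -> None:
        pass

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Synthesize __init__ only when the subclass defines setup but no __init__.
        if "setup" in cls.__dict__ and "__init__" not in cls.__dict__:
            def __init__(self, name: str, *args: Any, task_options: Optional[dict] = None,
                         logging_level: int = 10, **kw: Any) -> None:
                ToolBase.__init__(self, name, task_options=task_options,
                                  logging_level=logging_level)
                self.setup(*args, **kw)  # extra constructor args go to setup
                self.__post_init__()
            cls.__init__ = __init__


class BashLikeTool(ToolBase):
    def setup(self, work_dir: str = "/") -> None:
        self.work_dir = work_dir
        self.mcp.add_tool(self.run_command, name=f"{self.name}_run_command")

    def run_command(self, command: str) -> str:
        return f"would run {command!r} in {self.work_dir}"
```

Because the wrapper is generated by the base class, a subclass cannot forget the __post_init__ call or run it before its tools are registered.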

stop()

Stop the tool’s MCP server and clean up resources.

chia.base.tools.ChiaTool.resolve_tool_url(url: str) → str

Rewrite a tool URL so it is routable from the current node.

On tunnelled EC2 workers, CHIA_TOOL_ADVERTISE_HOST and CHIA_TOOL_RELAY_HOST are set. Tool URLs advertise the head’s real IP (CHIA_TOOL_ADVERTISE_HOST), which is only directly reachable from the local network. EC2 workers must instead connect via a reverse-tunnel relay, so this function replaces the host portion of the URL with the relay loopback address (CHIA_TOOL_RELAY_HOST).

On non-tunnelled nodes (or when the env vars are absent) the URL is returned unchanged.
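Here is a sketch of the rewrite. It assumes a node is tunnelled exactly when both variables are set and that the relay listens on the same port as the original URL. resolve_tool_url_sketch and its env parameter are hypothetical. It handles IPv4 hosts and hostnames only, since an IPv6 relay address would need brackets.

```python
import os
from typing import Mapping, Optional
from urllib.parse import urlsplit, urlunsplit


def resolve_tool_url_sketch(url: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Swap the URL's host for the relay host on tunnelled nodes."""
    env = os.environ if env is None else env
    # Assumption: a node counts as tunnelled only when both variables are set.
    if not env.get("CHIA_TOOL_ADVERTISE_HOST") or not env.get("CHIA_TOOL_RELAY_HOST"):
        return url  # non-tunnelled node: unchanged
    relay = env["CHIA_TOOL_RELAY_HOST"]
    parts = urlsplit(url)
    # Replace only the host; keep scheme, port, path (e.g. /{name}/mcp) and query.
    netloc = relay if parts.port is None else f"{relay}:{parts.port}"
    return urlunsplit(parts._replace(netloc=netloc))
```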

chia.base.tools.ChiaTool.start_router(tool: ChiaTool, ip_address: str, base_port: int = 8000, max_tries: int = 100) → int

Start MCP tool servers using a local uvicorn instance.

Uses a plain uvicorn server (background thread) instead of Ray Serve so that multiple nodes can each host their own independent tool servers without route-prefix conflicts.

Tries ports starting from base_port, skipping any that are already in use (e.g. host-networked Docker containers sharing the same port space).

Returns the port that was successfully bound.

chia.base.tools.ChiaTool.stop_router(tool_name: str) → bool

Stop a running uvicorn server by tool name.

Looks up the server in the worker-side _active_servers registry, signals it to exit, waits for the thread, and releases the port. Returns True if the server was found and stopped.
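The start/stop pair can be approximated with the standard library. This sketch swaps uvicorn for socketserver.ThreadingTCPServer and uses a hypothetical _active_servers dict keyed by tool name. start_server and stop_server are illustrative names, not the chia API. Binding the real server inside the port loop, rather than probing with a throwaway socket first, avoids a race in which another process grabs the port between the probe and the bind.

```python
import socketserver
import threading
from typing import Dict, Tuple


class _Server(socketserver.ThreadingTCPServer):
    allow_reuse_address = True  # lets a restarted server rebind promptly
    daemon_threads = True


class _Hello(socketserver.StreamRequestHandler):
    def handle(self) -> None:
        self.wfile.write(b"ok\n")  # placeholder for the real MCP app


# Hypothetical worker-side registry: tool name -> (server, serving thread).
_active_servers: Dict[str, Tuple[_Server, threading.Thread]] = {}


def start_server(name: str, ip_address: str, base_port: int = 8000, max_tries: int = 100) -> int:
    """Bind the first free port in [base_port, base_port + max_tries) and serve in a thread."""
    for port in range(base_port, base_port + max_tries):
        try:
            server = _Server((ip_address, port), _Hello)
        except OSError:
            continue  # already in use (e.g. a host-networked container)
        thread = threading.Thread(target=server.serve_forever, daemon=True)
        thread.start()
        _active_servers[name] = (server, thread)
        return port
    raise RuntimeError(f"no free port in [{base_port}, {base_port + max_tries})")


def stop_server(name: str) -> bool:
    """Signal the server to exit, wait for its thread, and release the port."""
    entry = _active_servers.pop(name, None)
    if entry is None:
        return False
    server, thread = entry
    server.shutdown()      # makes serve_forever() return
    thread.join(timeout=5)
    server.server_close()  # closes the listening socket, freeing the port
    return True
```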

Chia Tool Template

class chia.base.tools.ChiaToolTemplate.ChiaToolTemplate(name, *args, task_options=None, logging_level=10, **kwargs)

Bases: ChiaTool

Initializes ChiaTool with a name and optional resource requirements.

setup()

Setup hook for the CHIA tool (see ChiaTool.setup()).

hello_world_tool(name: str) → str

Example tool method that takes a name as input and returns a greeting string.

Parameters:
  • name (str) – The name to include in the greeting.

goodbye_world_tool(name: str) → str

Example tool method that takes a name as input and returns a goodbye string.

Parameters:
  • name (str) – The name to include in the goodbye message.

Bash Tool

class chia.base.tools.BashTool.BashTool(name: str, work_dir: str = '/', timeout_seconds: int = 120, task_options: Dict | None = None)

Bases: ChiaTool

MCP tool that executes bash commands in its deployed container.

When constructed with task_options, the tools are set up on the target machines by a setup function that is called remotely with those task_options.

Initializes ChiaTool with a name and optional resource requirements.

run_command(command: str) → str

Run a bash command and return combined stdout/stderr.
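Here is a minimal sketch of the combined-stream behaviour. It assumes bash is on PATH and that a timeout is reported in the returned text rather than raised. run_command_sketch is a hypothetical name; timeout_seconds mirrors the constructor parameter.

```python
import subprocess


def run_command_sketch(command: str, work_dir: str = "/", timeout_seconds: int = 120) -> str:
    """Run `command` under bash in work_dir; return stdout and stderr as one stream."""
    try:
        proc = subprocess.run(
            ["bash", "-c", command],
            cwd=work_dir,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge stderr into stdout, in write order
            text=True,
            timeout=timeout_seconds,
        )
    except subprocess.TimeoutExpired as exc:
        # exc.output is bytes (or None) even though text=True was requested.
        partial = exc.output or b""
        if isinstance(partial, bytes):
            partial = partial.decode(errors="replace")
        return f"{partial}[timed out after {timeout_seconds}s]\n"
    return proc.stdout
```

Redirecting stderr into the stdout pipe, rather than capturing the two separately, preserves the order in which the command wrote its lines.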

Async Bash Tool

class chia.base.tools.AsyncBashTool.AsyncBashTool(name: str, work_dir: str = '/', env: Dict[str, str] | None = None, task_options: Dict | None = None)

Bases: AsyncJobTool

MCP bash tool for commands that may run for minutes, built on AsyncJobTool.

Like chia.base.tools.BashTool.BashTool, it runs shell commands in a working directory. A single command here, however, may run long enough (a full build, a test suite) that a synchronous MCP call can lose its response on the streamable-HTTP transport and hang the agent. So this tool never blocks the transport for long: <name>_run starts the command in the background and waits only a short, bounded slice for it to finish. If the command is still running, it returns done=false, and the agent polls <name>_run_status until done=true.

Fast commands therefore complete inside the single run call (they finish within the initial wait); only genuinely long commands require a poll. One command runs at a time per tool instance — start a command, then poll it to completion before starting the next (a second run while one is in flight returns started=false).

Result dict on completion: {exit_code: int, output: str} (combined stdout+stderr, tail-truncated), mirroring BashTool’s combined-stream behaviour. work_dir and env are plain attributes and pickle normally; the job/threading primitives are dropped and recreated by AsyncJobTool.

Initializes ChiaTool with a name and optional resource requirements.

run(command: str, wait_seconds: int = 60) → dict

Run a bash command in the BACKGROUND, waiting up to wait_seconds (capped at 120) for it to finish.

If it finishes in time, returns done=true with {exit_code, output}. If it is still running, returns done=false — then poll <name>_run_status until done=true. One command at a time: if a command is already running, returns started=false and you should poll status instead of starting another.

run_status(wait_seconds: int = 60) → dict

Wait up to wait_seconds (capped at 120) for the in-flight command, then report. When done=true, {exit_code, output} are spliced into the result.
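On the agent side, the run/poll contract reduces to a small loop. run_until_done and FakeAsyncBash are hypothetical. The fake stands in for the two MCP tools and reports done only after a set number of status polls. max_polls is an added safety bound that the tool itself does not describe.

```python
from typing import Any, Callable, Dict


def run_until_done(
    run: Callable[..., Dict[str, Any]],
    run_status: Callable[..., Dict[str, Any]],
    command: str,
    wait_seconds: int = 60,
    max_polls: int = 1000,
) -> Dict[str, Any]:
    """Start `command`, then poll status until the tool reports done=True."""
    reply = run(command, wait_seconds=wait_seconds)
    if reply.get("started") is False:
        # One command at a time: the in-flight one must be polled to completion first.
        raise RuntimeError("a command is already running; poll run_status instead")
    polls = 0
    while not reply.get("done"):
        if polls >= max_polls:
            raise TimeoutError(f"command still running after {polls} status polls")
        reply = run_status(wait_seconds=wait_seconds)
        polls += 1
    return reply


class FakeAsyncBash:
    """Hypothetical in-process fake that finishes after `polls` status calls."""

    def __init__(self, polls: int) -> None:
        self.polls_left = polls
        self.status_calls = 0
        self.command = ""

    def run(self, command: str, wait_seconds: int = 60) -> Dict[str, Any]:
        self.command = command
        return self._report()

    def run_status(self, wait_seconds: int = 60) -> Dict[str, Any]:
        self.status_calls += 1
        self.polls_left -= 1
        return self._report()

    def _report(self) -> Dict[str, Any]:
        if self.polls_left > 0:
            return {"done": False}
        return {"done": True, "exit_code": 0, "output": f"ran {self.command}\n"}
```

A fast command finishes inside the initial run call and needs no status poll at all. Only long commands pay for the extra round trips.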

Async Job Tool

class chia.base.tools.AsyncJobTool.AsyncJobTool(name: str, task_options: Dict | None = None)

Bases: ChiaTool

Base for MCP tools that run a long job in the background, then poll for it.

Why this exists: a single MCP tool call that blocks for minutes holds the streamable-HTTP connection open with no traffic for the whole job. That transport can lose the response for such a long synchronous call, stranding the result and hanging the agent forever (observed on real runs). The fix is to never block the transport for long: a start call kicks the work off in a daemon thread and returns immediately, and a status call long-polls in short, bounded chunks (<=120s) until done=true.

Subclasses register their own start/status tool methods that delegate to _job_start() and _job_status():

from chia.base.tools.AsyncJobTool import AsyncJobTool

class BuildTool(AsyncJobTool):
    def __init__(self, name, task_options=None):
        super().__init__(name, task_options=task_options)
        self.mcp.add_tool(self.build, name=f"{name}_build")
        self.mcp.add_tool(self.build_status, name=f"{name}_build_status")
        super().__post_init__()

    def build(self, targets=None) -> dict:
        # do_build stands in for your own build routine; it must return a dict.
        return self._job_start(lambda: do_build(targets))

    def build_status(self, wait_seconds: int = 60) -> dict:
        return self._job_status(wait_seconds)

One job runs at a time per tool instance; calling _job_start while a job is in flight returns {"started": False, "running": True} rather than starting a second. The work callable must return a dict; on done=true its keys are spliced into the status result.

The job’s threading primitives are not picklable, but ChiaTool serializes the tool object to the server actor (and to the LLM worker), so they are dropped in __getstate__ and recreated in __setstate__.

Initializes ChiaTool with a name and optional resource requirements.
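Here is a standalone sketch of the one-job-at-a-time start/status pattern, including the pickling hooks. BackgroundJob is a hypothetical stand-in for the machinery behind _job_start() and _job_status(). Its result dicts mirror the documented keys. Catching exceptions into an "error" key is an assumption.

```python
import threading
from typing import Any, Callable, Dict, Optional

MAX_WAIT_SECONDS = 120  # per-call cap on how long a status poll may block


class BackgroundJob:
    """Hypothetical stand-in for the start/status machinery."""

    def __init__(self, name: str) -> None:
        self.name = name  # ordinary, picklable state
        self._make_primitives()

    def _make_primitives(self) -> None:
        self._lock = threading.Lock()
        self._done = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._result: Dict[str, Any] = {}

    def start(self, work: Callable[[], Dict[str, Any]]) -> Dict[str, Any]:
        """Kick `work` off in a daemon thread and return immediately."""
        with self._lock:
            if self._thread is not None and not self._done.is_set():
                return {"started": False, "running": True}
            self._done.clear()

            def runner() -> None:
                try:
                    result = work()
                except Exception as exc:  # report failures instead of hanging pollers
                    result = {"error": repr(exc)}
                self._result = result
                self._done.set()

            self._thread = threading.Thread(target=runner, daemon=True)
            self._thread.start()
            return {"started": True, "running": True}

    def status(self, wait_seconds: int = 60) -> Dict[str, Any]:
        """Block for at most min(wait_seconds, MAX_WAIT_SECONDS), then report."""
        if self._thread is None:
            return {"done": False, "running": False}  # nothing started yet
        if self._done.wait(timeout=max(0, min(wait_seconds, MAX_WAIT_SECONDS))):
            return {"done": True, **self._result}  # splice the work's keys in
        return {"done": False, "running": True}

    def __getstate__(self) -> Dict[str, Any]:
        # Locks, events and threads cannot be pickled, so drop them here...
        state = self.__dict__.copy()
        for key in ("_lock", "_done", "_thread", "_result"):
            del state[key]
        return state

    def __setstate__(self, state: Dict[str, Any]) -> None:
        # ...and recreate fresh ones on the receiving side.
        self.__dict__.update(state)
        self._make_primitives()
```

Keeping name in the pickled state matters. pickle skips __setstate__ when __getstate__ returns a false value such as an empty dict, which would leave the unpickled copy without its primitives.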

Util

chia.base.tools.util.make_router_lifespan(mcp_instances: List[FastMCP])

A lifespan which runs all MCP session managers concurrently.

This is useful when multiple MCP ASGI apps are mounted under one FastAPI ingress app (e.g. /gem5, /chia). Mounting should be done outside the lifespan; this only manages session manager lifetimes.