"""Head-relayed task dispatch for workers on reverse-tunneled nodes.
Ray's ownership model makes the submitting worker the *owner* of a task: the
owner negotiates worker leases directly with the target raylet and serves the
result object's data. On a reverse-tunneled worker that breaks for any
task whose demands place it on a LAN node — chia's tunnels are hub-and-spoke
(tunneled worker <-> head only; see cluster/tunnel.py), and LAN raylet/worker ports are
random and may be firewalled from the tunneled machine, so the owner's lease RPC gets
"no route to host" and the task sits in PENDING_NODE_ASSIGNMENT forever even when capacity is free.
The fix: when ``chia_remote`` is called on a worker that cannot directly reach
the task's target node — a tunneled worker dispatching to a LAN node, OR a LAN
worker dispatching to a tunneled node (``local -> remote``) — the dispatch is
relayed through a ``DispatchProxy`` actor pinned to the head node. The proxy —
not the originating worker — owns the inner task, so every RPC leg rides a
link that already exists. The head is the only node with a bidirectional path
to every spoke, so it is the universal relay. ``should_proxy`` decides this
per-dispatch and fail-safe: it relays unless it can *prove* every node the task
could land on is directly reachable (see the reachability analysis below)::
    tunneled worker -> proxy actor call ..... pinned head worker ports (reverse tunnel + DNAT,
                                              the same path ProfileCollectorActor uses)
    proxy <- args from tunneled worker ...... head-side -L object-manager forward
    proxy -> LAN raylet lease ............... plain LAN traffic
    result -> tunneled worker ............... the proxy awaits the value and returns it, so
                                              the result object lives on the head; the tunneled
                                              worker fetches it via the DNAT'd head object manager
Costs and limits:
* The head relays the data — args and results each cross the network twice.
* ``num_returns > 1`` is not supported (falls back to direct dispatch, with
a warning, which may hang if the task lands on a LAN node).
* ObjectRefs nested inside proxied args are re-splatted as top-level args of
the inner task; if such a ref is *owned by a tunneled worker*, the LAN
executor cannot reach that owner and the inner task will hang. Pass values,
not refs, into nested dispatches from tasks running on tunneled workers.
* The proxy inherits the creating job's runtime_env, so it is per-job
(``chia_dispatch_proxy_<job_id>``) and detached; it idles at num_cpus=0
until the cluster is torn down.
"""
from __future__ import annotations
import logging
import os
import time
import ray
logger = logging.getLogger(__name__)
# Exported by node_setup.build_worker_script ONLY on reverse-tunneled workers,
# which makes them the discriminator for "am I on a reverse-tunneled node". The
# advertise host is the head's real IP (the head is never tunneled).
_TUNNEL_ENV = "CHIA_TOOL_RELAY_HOST"
_HEAD_IP_ENV = "CHIA_TOOL_ADVERTISE_HOST"
# Ray namespace the per-job DispatchProxy actor is registered under.
_PROXY_NAMESPACE = "chia_dispatch"
# Process-local cache of the proxy actor handle; filled lazily by
# get_dispatch_proxy() on first use.
_proxy_handle = None
# ---------------------------------------------------------------------------
# Reachability analysis
#
# Relay through the head only when the task could land on a node this worker
# cannot reach directly. The rule is FAIL-SAFE: proxy unless we can *prove*
# every possible landing node is directly reachable — a wrong "don't proxy"
# hangs the task forever (PENDING_NODE_ASSIGNMENT), while a wrong "proxy" only
# costs one head relay hop.
#
# Roles (the head-as-hub tunnel topology, cluster/tunnel.py):
# head -> reaches every node (the hub); never relays, and the proxy actor
# itself lives here so it must never recurse.
# lan -> reaches the head + other on-prem LAN nodes + itself.
# remote -> reaches only itself directly; cross-spoke and ->LAN (and, kept
# conservative, ->head) go through the relay.
# ---------------------------------------------------------------------------
# Maximum age, in seconds of time.monotonic(), of the cached node snapshot
# before _alive_nodes() refreshes it from ray.nodes().
_ROLES_TTL = 5.0
# Last snapshot of alive ray.nodes() entries; None until the first refresh.
_nodes_cache: list | None = None
# time.monotonic() reading taken when _nodes_cache was last refreshed.
_nodes_cache_ts: float = 0.0
def _alive_nodes() -> list:
    """Return the alive ``ray.nodes()`` entries, cached for ``_ROLES_TTL`` seconds.

    The snapshot is refreshed when it has never been taken or when it is at
    least ``_ROLES_TTL`` seconds old (measured with ``time.monotonic()``).

    Staleness is safe: callers treat a node missing from the snapshot as
    *unreachable* (so the dispatch is relayed), never as reachable, which
    means a stale cache cannot turn a "must proxy" into a wrong "direct".

    Returns:
        The cached list of node dicts whose ``"Alive"`` entry is truthy.
    """
    global _nodes_cache, _nodes_cache_ts
    stamp = time.monotonic()
    still_fresh = (
        _nodes_cache is not None and stamp - _nodes_cache_ts < _ROLES_TTL
    )
    if still_fresh:
        return _nodes_cache
    # Build the new snapshot locally so a failing ray.nodes() call leaves the
    # previous cache untouched.
    alive = []
    for entry in ray.nodes():
        if entry.get("Alive"):
            alive.append(entry)
    _nodes_cache = alive
    _nodes_cache_ts = stamp
    return _nodes_cache
def _classify(node: dict) -> str:
"""Role of *node*: ``head`` / ``remote`` / ``lan``."""
res = node.get("Resources", {})
if "node:__internal_head__" in res: # Ray's reserved head marker
return "head"
if str(node.get("NodeName", "")).startswith("127."): # tunnel loopback ip
return "remote"
return "lan"
def _role_from_env() -> str:
    """Guess this worker's role from the environment.

    Used as a fallback when the current node is missing from the node
    snapshot.

    Returns:
        ``"remote"`` when both the ``_TUNNEL_ENV`` and ``_HEAD_IP_ENV``
        variables are set in ``os.environ``, otherwise ``"lan"``.
    """
    tunneled = all(var in os.environ for var in (_TUNNEL_ENV, _HEAD_IP_ENV))
    return "remote" if tunneled else "lan"
def _target_reachable(my_role: str, my_id: str, target_id: str, roles: dict) -> bool:
"""Can a worker of *my_role* at *my_id* directly own a task on *target_id*?
An unknown target (not in the current snapshot) is treated as unreachable.
"""
if my_role == "head":
return True
if target_id == my_id:
return True # self is always reachable
tr = roles.get(target_id)
if tr is None:
return False
if my_role == "lan":
return tr in ("head", "lan") # all on the on-prem LAN
return False # remote: only itself (handled above)
def _resolve_targets(opts: dict, nodes: list) -> set | None:
"""Node ids the task could land on, or ``None`` if undeterminable.
Only two high-confidence cases are resolved; everything else (soft
affinity, SPREAD, placement groups, unconstrained) returns ``None`` so the
caller relays. ``None`` means "can't prove → relay".
"""
ss = opts.get("scheduling_strategy")
try:
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
except Exception:
NodeAffinitySchedulingStrategy = () # type: ignore[assignment]
if NodeAffinitySchedulingStrategy and isinstance(ss, NodeAffinitySchedulingStrategy):
# Hard pin → exactly one target; a soft pin can fall back anywhere.
if getattr(ss, "soft", True):
return None
return {str(ss.node_id)}
if ss is not None:
return None # a scheduling strategy we don't model
# Custom-resource constraint (chia's node-type markers: ec2 / gcp /
# verilator_run / ...). ``resources=`` holds only custom resources; num_cpus
# etc. are separate opts and don't confine to a node *type*.
custom = {k: v for k, v in (opts.get("resources") or {}).items()
if not (k.startswith("node:") or k.startswith("accelerator_type:"))}
if not custom:
return None # unconstrained → could be anywhere
cands = {
str(n["NodeID"]) for n in nodes
if all(n.get("Resources", {}).get(k, 0) >= v for k, v in custom.items())
}
return cands or None
def _directly_reachable(opts: dict) -> bool:
    """Decide whether this dispatch can skip the head relay.

    Args:
        opts: Task options, forwarded to ``_resolve_targets``.

    Returns:
        ``True`` only if the current worker is on the head, or every node the
        dispatch could land on is directly reachable from the current worker.
        ``False`` when the landing nodes cannot be determined.
    """
    here = str(ray.get_runtime_context().get_node_id())
    snapshot = _alive_nodes()
    role_by_id = {}
    for entry in snapshot:
        role_by_id[str(entry["NodeID"])] = _classify(entry)

    # The current node may be missing from the snapshot; fall back to the
    # environment-based role in that case.
    own_role = role_by_id.get(here) or _role_from_env()
    if own_role == "head":
        return True

    landing = _resolve_targets(opts, snapshot)
    if landing is None:
        return False
    for target in landing:
        if not _target_reachable(own_role, here, target, role_by_id):
            return False
    return True
[docs]
def should_proxy(opts: dict | None = None) -> bool:
    """True when this dispatch must be relayed through the head DispatchProxy.

    Relays iff the task could land on a node the current worker cannot reach
    directly (see :func:`_directly_reachable`). This both enables LAN-origin
    dispatches to tunneled nodes (``local -> remote``) and lets provably-direct
    dispatches skip the single-actor relay. The head never relays.

    Args:
        opts: Task options for the dispatch (may be ``None`` or empty).
            ``num_returns`` is read here; the rest is passed to the
            reachability analysis.

    Returns:
        ``False`` when Ray is not initialized, when the dispatch is provably
        direct, or when ``num_returns`` is anything other than ``None`` / ``1``
        (the proxy cannot relay multi-return tasks). ``True`` otherwise.
    """
    if not ray.is_initialized():
        return False
    try:
        needs_relay = not _directly_reachable(opts or {})
    except Exception:
        # On any analysis failure, preserve the original behavior: only
        # reverse-tunneled workers relay (never hangs a remote worker).
        logger.debug("dispatch-proxy reachability check failed; "
                     "falling back to env rule", exc_info=True)
        needs_relay = _role_from_env() == "remote"
    # Multi-return tasks are always dispatched directly, but only warn when a
    # relay was actually needed: on the head, or for a provably-direct
    # dispatch, the "may hang" message would be noise.
    if needs_relay and opts and opts.get("num_returns") not in (None, 1):
        logger.warning(
            "chia dispatch proxy does not support num_returns > 1; dispatching "
            "directly (this may hang if the task schedules on an unreachable node)")
        return False
    return needs_relay
@ray.remote(num_cpus=0, max_restarts=-1)
class DispatchProxy:
    """Head-pinned relay: re-dispatches chia trampolines so the head owns them."""

    async def submit(self, func, opts, bypass_state, hooks, profile, args, kwargs):
        """Re-dispatch one chia call from inside the proxy and await its result.

        Args:
            func: The function handed to the trampoline as its first argument.
            opts: Ray task options applied to the inner task via ``.options``.
            bypass_state: Forwarded unchanged to the trampoline.
            hooks: Forwarded unchanged to the trampoline.
            profile: ``None`` for a plain dispatch, or a
                ``(call_id, dispatch_meta)`` pair selecting the profiled
                trampoline.
            args: Positional arguments splatted into the inner task.
            kwargs: Keyword arguments splatted into the inner task.

        Returns:
            The awaited value of the inner task. For a profiled dispatch the
            _ProfiledResult passes through unchanged; the caller's get()
            unwraps it exactly as it would for a direct dispatch.
        """
        from chia.base.ChiaFunction import _chia_trampoline, _chia_trampoline_profiled

        if profile is None:
            trampoline = _chia_trampoline
            leading = (func, bypass_state, hooks)
        else:
            call_id, dispatch_meta = profile
            trampoline = _chia_trampoline_profiled
            leading = (func, call_id, dispatch_meta, bypass_state, hooks)
        # The task is submitted from inside this actor, so the proxy (not the
        # originating worker) is the submitter of the inner task.
        inner = ray.remote(trampoline).options(**opts)
        return await inner.remote(*leading, *args, **kwargs)
[docs]
def get_dispatch_proxy():
    """Get-or-create this job's head-pinned DispatchProxy actor.

    The handle is cached in the module-level ``_proxy_handle`` after the first
    call, so later calls in the same process return it without touching Ray.

    Returns:
        The DispatchProxy actor handle named ``chia_dispatch_proxy_<job_id>``
        in the ``_PROXY_NAMESPACE`` namespace.
    """
    # NOTE(review): the cache is not keyed by job id; presumably a process
    # only ever serves one job -- confirm for drivers that re-init Ray.
    global _proxy_handle
    if _proxy_handle is not None:
        return _proxy_handle

    job_id = ray.get_runtime_context().get_job_id()
    actor_options = dict(
        name=f"chia_dispatch_proxy_{job_id}",
        namespace=_PROXY_NAMESPACE,
        get_if_exists=True,
        lifetime="detached",
        # Pin via Ray's reserved head-node resource. NOT node:<head_ip>:
        # co-located --net=host container raylets share the head's IP and
        # advertise the same node:<ip> resource, so an IP pin can land the
        # proxy in a container raylet -- whose random-range worker ports
        # both collide on the shared host and are unreachable from tunneled
        # workers (only the head raylet's pinned 40000-40099 are
        # tunneled+DNAT'd).
        resources={"node:__internal_head__": 0.001},
        max_concurrency=64,
        # Parity with chia's normal tasks (ray default max_retries=3):
        # ride out a proxy restart instead of surfacing ACTOR_UNAVAILABLE.
        max_task_retries=3,
    )
    _proxy_handle = DispatchProxy.options(**actor_options).remote()
    return _proxy_handle