"""chia.base.colocated — placement-group plumbing for path-based node classes.
Any subsystem whose artifacts live on a worker's filesystem (a gem5 checkout, a
hammer obj_dir, etc.) can pin a family of ``@ChiaFunction`` members to one
hammer obj_dir, etc) can pin a family of ``@ChiaFunction`` members to one
placement-group bundle and guarantee they land on the same worker.
"""
from __future__ import annotations
import logging
import ray
from ray.util.placement_group import (
placement_group as _placement_group,
remove_placement_group as _remove_placement_group,
)
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
logger = logging.getLogger(__name__)
[docs]
class PinnedChiaFn:
    """Wrap a ``@ChiaFunction`` so its ``.chia_remote`` is pre-pinned.

    The wrapper stores a set of scheduling options (typically a
    placement-group scheduling strategy) and applies them through the wrapped
    function's own ``.options()``. When the option set is empty nothing is
    applied: ``.chia_remote`` and ``.options()`` fall straight through to the
    raw function, so the caller's own placement applies.

    (Lifted from ``chia.simulators.gem5._PinnedChiaFn``; gem5 can migrate here.)
    """

    def __init__(self, fn, scheduling_opts: dict):
        """Bind *fn* to *scheduling_opts*.

        Args:
            fn: the function object to wrap; must expose ``.options(**kw)``
                and ``.chia_remote``.
            scheduling_opts: options to pre-apply. A falsy value (``None`` or
                an empty dict) means "no pinning".
        """
        self._fn = fn
        # Keep a private copy so later mutation of the caller's dict cannot
        # change what this wrapper applies.
        if scheduling_opts:
            self._opts = dict(scheduling_opts)
        else:
            self._opts = {}

        if self._opts:
            pinned = fn.options(**self._opts)
            self.chia_remote = pinned.chia_remote
        else:
            self.chia_remote = fn.chia_remote

    def options(self, **overrides):
        """Return the wrapped function with *overrides* layered over the pinning.

        Keys in *overrides* win over the stored scheduling options. If the
        combined option set is empty, the raw function itself is returned.
        """
        combined = dict(self._opts)
        combined.update(overrides)
        if not combined:
            return self._fn
        return self._fn.options(**combined)

    def __call__(self, *args, **kwargs):
        """Invoke the underlying function directly, forwarding all arguments."""
        return self._fn(*args, **kwargs)
[docs]
class ColocatedNode:
    """Base for node classes whose member ChiaFunctions must share one worker.

    Subclasses declare:

    * ``_MEMBER_FNS``: names of class-level ``@ChiaFunction`` members that
      ``__init__`` re-binds into per-instance :class:`PinnedChiaFn` wrappers,
      so ``node.<fn>.chia_remote(...)`` lands on this node's bundle while
      ``Subclass.<fn>.chia_remote(...)`` (the class attribute) stays unpinned.
    * ``_DEFAULT_BUNDLE``: resource shape of a self-reserved bundle.

    Pinning never alters a member's resource demands — the placement group only
    changes where they are satisfied from (the bundle's reservation instead of
    the node's free pool). A bundle that cannot satisfy every member is
    therefore allowed: construction logs a warning naming the members that
    don't fit (derived from each member's own ``@ChiaFunction`` options), and
    dispatching one of them later raises ``ValueError`` at submission.

    Placement is decided once at construction:

    * ``placement_group`` given -> pin members to ``bundle_index`` of it
      (the node will NOT release a PG it did not create).
    * none + ``require_colocated=True`` -> reserve a 1-bundle
      ``_DEFAULT_BUNDLE`` PG (owned + released by :meth:`close`).
    * none + ``require_colocated=False`` -> no pinning; the caller schedules
      each ``.chia_remote`` / ``.options(...)`` call itself.

    Usable as a context manager so a self-reserved PG is released on exit.
    If construction fails after a PG was self-reserved, that PG is released
    before the error propagates.
    """

    _MEMBER_FNS: tuple[str, ...] = ()
    _DEFAULT_BUNDLE: dict = {"CPU": 1}

    def __init__(
        self,
        placement_group=None,
        require_colocated: bool = True,
        *,
        bundle_index: int = 0,
        reserve_bundle: dict | None = None,
        pg_strategy: str = "STRICT_PACK",
        wait_for_pg: bool = True,
        pg_ready_timeout_s: float | None = None,
    ):
        """Set up placement and bind the member functions.

        Args:
            placement_group: an existing Ray ``PlacementGroup`` to schedule onto.
                If given, ``require_colocated`` is moot (placement is already
                fixed) and this node will not remove the PG on close.
            require_colocated: when no PG is given, reserve one so all members
                co-locate. When False, leave placement to the caller.
            bundle_index: which bundle of a caller-supplied PG to pin to. A
                self-reserved PG has exactly one bundle, so the index is
                forced to 0 in that case.
            reserve_bundle: resource shape of a self-reserved bundle
                (default ``_DEFAULT_BUNDLE``). A bundle too small for some
                members is allowed — those members just can't dispatch through
                this node (a construction-time warning lists them).
            pg_strategy: placement strategy for a self-reserved PG.
            wait_for_pg: block on ``pg.ready()`` for a self-reserved PG so the
                node is usable immediately.
            pg_ready_timeout_s: optional timeout for that wait.

        Raises:
            Whatever the readiness wait or member binding raises. A
            self-reserved PG is removed before the exception propagates.
        """
        self._owns_pg = False
        self._bundle_index = bundle_index
        self._pg = None
        self._sched_opts = {}

        if placement_group is not None:
            self._pg = placement_group
        elif require_colocated:
            bundle = reserve_bundle or dict(self._DEFAULT_BUNDLE)
            self._pg = _placement_group([bundle], strategy=pg_strategy)
            self._owns_pg = True
            self._bundle_index = 0

        # From here on a self-reserved PG exists. If anything below fails the
        # caller never receives an instance to close(), so the reservation
        # must be released here or it would be leaked.
        try:
            if self._owns_pg and wait_for_pg:
                ray.get(self._pg.ready(), timeout=pg_ready_timeout_s)

            if self._pg is not None:
                self._sched_opts = {
                    "scheduling_strategy": PlacementGroupSchedulingStrategy(
                        placement_group=self._pg,
                        placement_group_bundle_index=self._bundle_index,
                    )
                }

            # Re-bind each class-level @ChiaFunction into a pinned instance
            # member: node.<fn>.chia_remote == <fn>.options(<sched>).chia_remote
            for name in self._MEMBER_FNS:
                setattr(
                    self,
                    name,
                    PinnedChiaFn(getattr(type(self), name), self._sched_opts),
                )
            self._warn_unsatisfiable_members()
        except BaseException:
            # BaseException so an interrupt during the readiness wait also
            # releases the reservation. close() is a no-op for a PG this node
            # does not own.
            try:
                self.close()
            except Exception:
                logger.exception(
                    "failed to release placement group after construction error"
                )
            raise

    def _member_demands(self) -> dict[str, dict]:
        """Per-member resource demands, derived from each ``@ChiaFunction``'s
        decorator options: custom ``resources`` plus ``num_cpus`` (which Ray
        defaults to 1 for tasks).

        A missing or ``None`` ``_chia_options``, ``num_cpus`` or ``resources``
        is treated as "not specified".
        """
        # NOTE(review): only CPU and custom resources are considered; options
        # such as ``num_gpus`` / ``memory`` are not part of this advisory check
        # — confirm whether they should be.
        demands = {}
        for name in self._MEMBER_FNS:
            opts = getattr(getattr(type(self), name), "_chia_options", None) or {}
            num_cpus = opts.get("num_cpus")
            cpu = 1 if num_cpus is None else num_cpus
            demands[name] = {"CPU": cpu, **(opts.get("resources") or {})}
        return demands

    def _warn_unsatisfiable_members(self) -> None:
        """Warn (never raise) for members whose demands cannot fit the pinned
        bundle(s) — dispatching such a member through this node raises
        ``ValueError`` at submission. Advisory only: constructing a node
        against a PG that serves a subset of its members is legitimate.

        A ``bundle_index`` of ``-1`` is checked against every bundle of the PG
        (a member fits if any bundle can hold it).
        """
        if self._pg is None:
            return
        try:
            specs = self._pg.bundle_specs
        except Exception:
            return  # PG metadata unavailable; skip the advisory check

        if self._bundle_index == -1:
            bundles = specs
        else:
            try:
                bundles = [specs[self._bundle_index]]
            except (IndexError, KeyError, TypeError):
                # Stay advisory: a bad index must not turn this check into a
                # constructor failure.
                logger.warning(
                    f"{type(self).__name__}: bundle_index {self._bundle_index} "
                    f"does not address a bundle of {specs}; skipping the "
                    f"member fit check"
                )
                return

        for name, demand in self._member_demands().items():
            fits = any(
                all(b.get(res, 0) >= v for res, v in demand.items())
                for b in bundles
            )
            if not fits:
                logger.warning(
                    f"{type(self).__name__}.{name} demands {demand}, which does "
                    f"not fit the pinned bundle(s) {bundles}; dispatching it via "
                    f"this node will raise ValueError at submission"
                )

    # -- placement-group lifecycle --------------------------------------------
    @property
    def placement_group(self):
        """The PG members are pinned to (None when the caller handles placement)."""
        return self._pg

    @property
    def owns_placement_group(self) -> bool:
        """True iff this node reserved its PG and will release it on close()."""
        return self._owns_pg

    @property
    def task_options(self) -> dict:
        """Scheduling opts to co-locate an actor (e.g. a ``ChiaTool``) with this
        node's bundle. Empty when the node has no placement group
        (``require_colocated=False``) — there is no shared placement to inherit,
        so an actor given these opts would not be pinned.

        Returns a fresh dict on each access; mutating it does not affect the
        node.
        """
        return dict(self._sched_opts)

    def close(self) -> None:
        """Release the PG iff this node reserved it. Idempotent.

        Only the PG handle and ownership flag are reset. Member wrappers and
        :attr:`task_options` built at construction are left untouched and so
        still refer to the released PG.
        """
        if self._owns_pg and self._pg is not None:
            _remove_placement_group(self._pg)
            self._pg = None
            self._owns_pg = False

    def __enter__(self):
        """Return the node itself for use in a ``with`` block."""
        return self

    def __exit__(self, *exc) -> None:
        """Release a self-reserved PG on exit; exceptions are not suppressed."""
        self.close()