Source code for chia.base.ChiaFunction

from __future__ import annotations

import ray
import functools
import logging
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    List,
    Optional,
    Protocol,
    Sequence,
    TypeVar,
    Union,
    cast,
    overload,
    ParamSpec
)

from ray import ObjectRef
from ray.experimental.compiled_dag_ref import CompiledDAGRef

P = ParamSpec('P')
R = TypeVar('R')


class _ChiaRemoteHandle(Protocol[P, R]):
    """Typed handle returned by :meth:`ChiaWrapped.options`.

    Parameterized on both ``P`` (call args) and ``R`` (wrapped function's
    return type) so ``chia_remote(...)`` yields ``ObjectRef[R]`` and
    ``get(ref)`` can infer ``R``.
    """
    def chia_remote(self, *args: P.args, **kwargs: P.kwargs) -> ObjectRef[R]: ...
    def chia_remote_blocking(self, *args: P.args, **kwargs: P.kwargs) -> R: ...
    def remote(self, *args: P.args, **kwargs: P.kwargs) -> ObjectRef[R]: ...


[docs] class ChiaWrapped(Protocol[P, R]): """Protocol describing the object returned by ``@ChiaFunction()``. Includes ``__get__`` overloads so that mypy treats instances of this protocol as descriptors. When accessed on a class (``obj is None``), the full ``ParamSpec`` is preserved. When accessed on an *instance* (bound-method position), the return type falls back to ``ChiaWrapped[..., R]`` because Python's type system cannot express "P minus the first argument". This avoids false positives on method calls while keeping full typing for standalone decorated functions. """ _chia_original: Callable[P, R] _chia_options: dict[str, Any] _chia_remote_func: Any _chia_remote_profiled: bool def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R: ... def chia_remote(self, *args: P.args, **kwargs: P.kwargs) -> ObjectRef[R]: ... def chia_remote_blocking(self, *args: P.args, **kwargs: P.kwargs) -> R: ... def options(self, **kwargs: Any) -> _ChiaRemoteHandle[P, R]: ... @overload def __get__(self, obj: None, objtype: type) -> ChiaWrapped[P, R]: ... @overload def __get__(self, obj: Any, objtype: type) -> ChiaWrapped[..., R]: ... def __get__(self, obj: Any, objtype: type) -> ChiaWrapped[P, R] | ChiaWrapped[..., R]: ...
[docs] class ChiaFunction: """Decorator that creates both a local and ray.remote version of a function. Usage:: @ChiaFunction(resources={"db1": 1.0}) def my_function(x, y): return x + y # Local call: my_function(1, 2) # Remote call (returns ObjectRef): ref = ChiaCallRemote(my_function, 1, 2) result = get(ref) # Or attribute-based: ref = my_function.chia_remote(1, 2) For bound methods on a class, the decorator uses a trampoline to serialize ``self`` and dispatch via ``ray.remote``:: class MyTool: @ChiaFunction() def do_work(self, x): return x * 2 tool = MyTool() ref = tool.do_work.chia_remote(tool, x=5) """ def __init__(self, **kwargs): """Accept any keyword arguments supported by ``ray.remote().options()``. Common options include ``resources``, ``num_cpus``, ``num_gpus``, ``num_returns``, ``max_retries``, ``retry_exceptions``, ``memory``, ``scheduling_strategy``, ``max_calls``, ``runtime_env``, ``name``, ``namespace``, ``lifetime``, etc. See Ray documentation for the full list. """ self.options = {k: v for k, v in kwargs.items() if v is not None} def __call__(self, func: Callable[P, R]) -> ChiaWrapped[P, R]: options_dict = self.options @functools.wraps(func) def _wrapper(*args, **kwargs): from chia.trace.profiler import get_profiler profiler = get_profiler() if profiler.enabled: info = profiler.on_local_start(func, args, kwargs) result = func(*args, **kwargs) profiler.on_local_end(info, result) return result return func(*args, **kwargs) # Treat the wrapped closure as a ChiaWrapped instance so attribute # assignments below typecheck against the Protocol. wrapper = cast(ChiaWrapped[P, R], _wrapper) wrapper._chia_original = func wrapper._chia_options = options_dict wrapper._chia_remote_func = None # lazy init wrapper._chia_remote_profiled = False # tracks trampoline variant def _try_bypass(args, kwargs, opts): """If func is bypassed, dispatch the provider to the worker. Pops _chia_tag from kwargs. Returns an ObjectRef if bypassed, None otherwise. The provider runs on the worker with the same scheduling options (placement groups, resources) as the real function, so cluster scheduling is still exercised. """ tag = kwargs.pop("_chia_tag", None) try: from chia.base.bypass import get_active_bypass except ImportError: return None b = get_active_bypass() # Forward the call args so a registered condition (set_cond) can # inspect them. The reserved kwargs (_chia_tag, hooks, display_name) # are already popped, so args/kwargs are the clean user arguments. if b is not None and b.is_bypassed(func.__name__, tag, *args, **kwargs): provider, data_path = b.get_provider_info(func.__name__) bypass_state = _get_bypass_state() remote = ray.remote(_chia_bypass_trampoline).options( name=f"BYPASS_{func.__qualname__}", **opts) return remote.remote(provider, tag, data_path, bypass_state, *args, **kwargs) return None def _get_bypass_state(): """Return serialized bypass state if bypass is active, else None. Attached to trampoline calls so workers can restore bypass for nested chia_remote() dispatches. Returns None when bypass is inactive to avoid any overhead. """ try: from chia.base.bypass import get_active_bypass except ImportError: return None b = get_active_bypass() if b is not None and b.active: return b.get_state() return None def chia_remote(*args, **kwargs): """Dispatch func via ray.remote. Lazily initializes the remote wrapper.""" display_name = kwargs.pop("_chia_display_name", None) # Read the cache key before _try_bypass pops _chia_tag from kwargs. cache_tag = kwargs.get("_chia_tag") hooks = _pop_chia_hooks(kwargs) ref = _try_bypass(args, kwargs, options_dict) if ref is not None: return _maybe_wrap_cache(ref, func.__name__, cache_tag) ref = _try_proxy(func, options_dict, _get_bypass_state(), hooks, args, kwargs, display_name) if ref is not None: return _maybe_wrap_cache(ref, func.__name__, cache_tag) from chia.trace.profiler import get_profiler profiler = get_profiler() use_profiled = profiler.enabled # Invalidate cache if profiling state changed. if (wrapper._chia_remote_func is not None and wrapper._chia_remote_profiled != use_profiled): wrapper._chia_remote_func = None if wrapper._chia_remote_func is None: trampoline = _chia_trampoline_profiled if use_profiled else _chia_trampoline remote = ray.remote(trampoline) opts = {"name": func.__qualname__, **options_dict} remote = remote.options(**opts) wrapper._chia_remote_func = remote wrapper._chia_remote_profiled = use_profiled bypass_state = _get_bypass_state() if use_profiled: call_id = profiler.next_call_id() dispatch_meta = profiler.prepare_dispatch( options_dict, args, kwargs, display_name=display_name) ref = wrapper._chia_remote_func.remote(func, call_id, dispatch_meta, bypass_state, hooks, *args, **kwargs) else: ref = wrapper._chia_remote_func.remote(func, bypass_state, hooks, *args, **kwargs) return _maybe_wrap_cache(ref, func.__name__, cache_tag) @functools.wraps(func) def chia_remote_blocking(*args, **kwargs): """Dispatch func remotely (honoring its resource options), block on the result, and return the unwrapped *value* instead of an ObjectRef. Convenience for callers that need a synchronous value rather than a ref — e.g. an MCP ``ChiaTool`` method, which must return func's actual result. ``functools.wraps(func)`` is applied so that ``inspect.signature`` (used by FastMCP to build the tool schema) reports func's real parameters rather than this wrapper's ``(*args, **kwargs)``. """ return get(chia_remote(*args, **kwargs)) def options(**override_opts): """Return a handle whose ``.remote()`` merges *override_opts* on top of the decorator-level options — mirroring Ray's ``func.options(...).remote(...)`` pattern.""" merged = {**options_dict, **{k: v for k, v in override_opts.items() if v is not None}} class _RemoteHandle: def chia_remote(self, *args, **kwargs): display_name = kwargs.pop("_chia_display_name", None) # Read the cache key before _try_bypass pops _chia_tag. cache_tag = kwargs.get("_chia_tag") hooks = _pop_chia_hooks(kwargs) ref = _try_bypass(args, kwargs, merged) if ref is not None: return _maybe_wrap_cache(ref, func.__name__, cache_tag) ref = _try_proxy(func, merged, _get_bypass_state(), hooks, args, kwargs, display_name) if ref is not None: return _maybe_wrap_cache(ref, func.__name__, cache_tag) from chia.trace.profiler import get_profiler profiler = get_profiler() use_profiled = profiler.enabled trampoline = _chia_trampoline_profiled if use_profiled else _chia_trampoline remote = ray.remote(trampoline) opts = {"name": func.__qualname__, **merged} remote = remote.options(**opts) bypass_state = _get_bypass_state() if use_profiled: call_id = profiler.next_call_id() dispatch_meta = profiler.prepare_dispatch( merged, args, kwargs, display_name=display_name) ref = remote.remote(func, call_id, dispatch_meta, bypass_state, hooks, *args, **kwargs) else: ref = remote.remote(func, bypass_state, hooks, *args, **kwargs) return _maybe_wrap_cache(ref, func.__name__, cache_tag) def remote(self, *args, **kwargs): return self.chia_remote(*args, **kwargs) def chia_remote_blocking(self, *args, **kwargs): """Like :meth:`chia_remote` but blocks and returns the value.""" return get(self.chia_remote(*args, **kwargs)) return _RemoteHandle() # setattr (not direct assignment) because ChiaWrapped declares these # as methods; mypy forbids overwriting method slots. setattr(wrapper, "chia_remote", chia_remote) setattr(wrapper, "chia_remote_blocking", chia_remote_blocking) setattr(wrapper, "options", options) return wrapper
# --------------------------------------------------------------------------- # Bypass return function — dispatched to workers when a function is bypassed. # The worker receives pre-computed mock data and just returns it. # --------------------------------------------------------------------------- def _chia_bypass_trampoline(provider, tag, data_path, _chia_bypass_state_, *args, **kwargs): """Bypass trampoline: runs the provider on the worker. Dispatched with the same scheduling options (placement group, resources) as the real function. Restores bypass state first so any nested chia_remote() calls from within the provider see the bypass config. """ _restore_bypass(_chia_bypass_state_) print(f"[BYPASS] {provider.__name__} (tag={tag})") return provider(tag, data_path, *args, **kwargs) # --------------------------------------------------------------------------- # Trampolines — execute the real function on the worker # --------------------------------------------------------------------------- def _restore_bypass(bypass_state): """Restore bypass on this worker if state was provided by the caller.""" if bypass_state is not None: from chia.base.bypass import Bypass, get_active_bypass if get_active_bypass() is None or not get_active_bypass().active: b = Bypass() b.load_state(bypass_state) # --------------------------------------------------------------------------- # Optional worker-side setup/cleanup hooks # # A chia_remote() call may pass reserved kwargs ``_chia_setup`` / # ``_chia_setup_args`` / ``_chia_cleanup`` / ``_chia_cleanup_args``. The # (cloudpicklable) setup callable runs on the worker BEFORE func, the cleanup # callable runs AFTER in a finally (so it runs even when func raises — but not # if setup itself raised). Both are side-effect only: neither receives nor can # replace func's return value. Hooks are skipped on bypassed calls (those go # through _chia_bypass_trampoline, which never runs func). # # Exception semantics: # * setup raising -> propagates; func and cleanup are skipped (nothing was # set up to tear down). The task fails with that error. # * cleanup raising -> caught and logged, never re-raised. A best-effort # teardown must not mask func's return value or func's own # exception (which a bare ``finally`` would do). # --------------------------------------------------------------------------- _logger = logging.getLogger(__name__) def _pop_chia_hooks(kwargs): """Pop the reserved hook kwargs from *kwargs*; return a hooks dict or None.""" setup = kwargs.pop("_chia_setup", None) cleanup = kwargs.pop("_chia_cleanup", None) setup_args = kwargs.pop("_chia_setup_args", ()) cleanup_args = kwargs.pop("_chia_cleanup_args", ()) if setup is None and cleanup is None: return None return { "setup": setup, "setup_args": setup_args, "cleanup": cleanup, "cleanup_args": cleanup_args, } def _run_chia_setup(hooks): """Run the setup hook. Lets exceptions propagate (fails the task).""" if hooks and hooks["setup"] is not None: hooks["setup"](*(hooks["setup_args"] or ())) def _run_chia_cleanup(hooks): """Run the cleanup hook best-effort: never raises, so a failing teardown cannot mask func's result or func's own exception. Logs on failure.""" if hooks and hooks["cleanup"] is not None: try: hooks["cleanup"](*(hooks["cleanup_args"] or ())) except Exception: # noqa: BLE001 — best-effort teardown _logger.exception( "chia cleanup hook raised; ignoring (func result/error preserved)" ) def _try_proxy(func, opts, bypass_state, hooks, args, kwargs, display_name): """Relay this dispatch through the head's DispatchProxy when running on a reverse-tunneled worker, which cannot lease workers on LAN raylets (see chia/base/dispatch_proxy.py for the full story). Returns an ObjectRef to the relayed call, or None when dispatching directly is fine.""" from chia.base.dispatch_proxy import get_dispatch_proxy, should_proxy if not should_proxy(opts): return None from chia.trace.profiler import get_profiler profiler = get_profiler() profile = None if profiler.enabled: profile = (profiler.next_call_id(), profiler.prepare_dispatch(opts, args, kwargs, display_name=display_name)) full_opts = {"name": func.__qualname__, **opts} return get_dispatch_proxy().submit.remote( func, full_opts, bypass_state, hooks, profile, args, kwargs) def _chia_trampoline(func, _chia_bypass_state_, _chia_hooks_, *args, **kwargs): """Trampoline function used by ChiaFunction to dispatch remote calls. _chia_bypass_state_ is automatically attached by chia_remote() when bypass is active, so any nested chia_remote() calls from within func can check bypass. _chia_hooks_ carries optional worker-side setup/cleanup callables. Both are attached by chia_remote() and named with underscores to avoid collisions with user function parameters. """ _restore_bypass(_chia_bypass_state_) from chia.base.pid_registry import _pid_tracking_scope _run_chia_setup(_chia_hooks_) try: with _pid_tracking_scope(): return func(*args, **kwargs) finally: _run_chia_cleanup(_chia_hooks_) def _chia_trampoline_profiled(func, call_id, dispatch_meta, _chia_bypass_state_, _chia_hooks_, *args, **kwargs): """Profiled trampoline — wraps the result with worker metadata. Emits the ``dispatch`` event at function start and the ``complete`` event at function end, both from the worker. Returns a ``_ProfiledResult`` that ``get()`` unwraps transparently. """ _restore_bypass(_chia_bypass_state_) import time as _time from chia.trace.profiler import _ProfiledResult, get_profiler # Ensure the profiler singleton on this worker is enabled. # The worker may have created a disabled singleton before the # collector actor was started. profiler = get_profiler() if not profiler.enabled: get_profiler.reset() # Gather worker metadata once. try: worker_ip = ray.util.get_node_ip_address() except Exception: worker_ip = "unknown" try: ctx = ray.get_runtime_context() worker_id = ctx.get_worker_id() node_id = ctx.get_node_id() except Exception: worker_id = "unknown" node_id = "unknown" # Emit the dispatch event from the worker at actual task start. profiler = get_profiler() display_name = dispatch_meta.get("display_name", "") if dispatch_meta else "" if profiler.enabled and dispatch_meta: profiler.on_worker_dispatch( call_id, func.__name__, worker_ip, worker_id, node_id, dispatch_meta.get("resources", {}), dispatch_meta.get("obj_ref_deps", []), dispatch_meta.get("caller_worker_id", ""), display_name=display_name, ) # perf_counter for high-res duration, time() for wall-clock timestamp from chia.base.pid_registry import _pid_tracking_scope _run_chia_setup(_chia_hooks_) try: with _pid_tracking_scope(): t0 = _time.perf_counter() result = func(*args, **kwargs) extra = profiler._pop_extra() elapsed = _time.perf_counter() - t0 end_ts = _time.time() # wall-clock end timestamp # Emit the complete event from the worker at the actual end time. if profiler.enabled: profiler.on_worker_complete(call_id, func.__name__, end_ts, elapsed, worker_ip, worker_id, node_id, extra=extra, display_name=display_name) return _ProfiledResult( value=result, worker_ip=worker_ip, worker_id=worker_id, node_id=node_id, exec_time_s=elapsed, call_id=call_id, func_name=func.__name__, display_name=display_name, extra=extra, ) finally: _run_chia_cleanup(_chia_hooks_)
[docs] def ChiaCallRemote( func: ChiaWrapped[P, R], *args: P.args, **kwargs: P.kwargs ) -> ObjectRef[R]: """Call the ray.remote version of a @ChiaFunction-decorated function. Returns a ``ray.ObjectRef[R]`` where ``R`` is the wrapped function's return type, so ``get(...)`` can infer the final value type. """ if not hasattr(func, 'chia_remote'): name = getattr(func, '__name__', repr(func)) raise TypeError(f"{name} is not decorated with @ChiaFunction") return func.chia_remote(*args, **kwargs)
[docs] class ObjectRefCallback: """An ObjectRef paired with a callback that :func:`get` runs on resolution. Lets a producer attach a post-resolution hook to the ref it hands back, so callers can stay async and simply ``get()`` it — the callback runs at fetch time and its return becomes ``get``'s result — without passing ``callback=`` themselves. ``ClaudeCodeLLM.prompt`` returns one of these (carrying ``_sync_transcript``) when ``resume_session=True``. ``.ref`` exposes the underlying ObjectRef for ``ray.wait`` / :func:`chia_wait`. """ __slots__ = ("ref", "callback") def __init__(self, ref, callback: Callable[[Any], Any]): self.ref = ref self.callback = callback
# --------------------------------------------------------------------------- # Actor handle wrapper — give a plain Ray actor the chia_remote()/get() surface # --------------------------------------------------------------------------- class _ChiaActorMethod: """One bound method of a :class:`ChiaActorHandle`. Exposes ``chia_remote`` as an alias of Ray's ``remote`` (and keeps ``remote`` working), so actor calls read the same as ``@ChiaFunction`` dispatch: ``get(handle.method.chia_remote(...))``. ``options`` is forwarded so ``handle.method.options(...).chia_remote(...)`` works too. """ __slots__ = ("_method",) def __init__(self, method): self._method = method def chia_remote(self, *args, **kwargs): return self._method.remote(*args, **kwargs) def remote(self, *args, **kwargs): return self._method.remote(*args, **kwargs) def options(self, **kwargs): return _ChiaActorMethod(self._method.options(**kwargs))
[docs] class ChiaActorHandle: """Thin proxy over a Ray actor handle that adds ``chia_remote`` to each method. Built by :func:`chia_actor`. Attribute access returns a :class:`_ChiaActorMethod`, so ``handle.method.chia_remote(...)`` dispatches the call and :func:`get` resolves it — matching the ``@ChiaFunction`` surface. ``remote`` still works, so existing ``handle.method.remote(...)`` call sites are unaffected. Note: unlike a real ``@ChiaFunction`` dispatch, ``chia_remote`` here is a plain alias for Ray's ``remote`` — no profiling, bypass, or cache wrapping is applied; the call goes straight to the actor. Use :attr:`actor` to recover the raw Ray handle (e.g. for ``ray.kill`` or identity checks). """ __slots__ = ("_actor",) def __init__(self, actor): object.__setattr__(self, "_actor", actor) @property def actor(self): """The underlying raw Ray actor handle.""" return self._actor def __getattr__(self, name): # Proxy only public method access. Never intercept dunders: cloudpickle # and copy probe __reduce_ex__/__getstate__/etc., and returning a # _ChiaActorMethod for those would corrupt serialization (the handle is # shipped to workers and passed as a task arg). if name.startswith("__"): raise AttributeError(name) return _ChiaActorMethod(getattr(self._actor, name)) def __reduce__(self): # Serialize as the wrapper around the (serializable) Ray handle so the # proxy survives being sent to a worker or passed into a ChiaFunction. return (chia_actor, (self._actor,))
[docs] def chia_actor(handle): """Wrap a Ray actor *handle* so its methods accept ``chia_remote`` and resolve via :func:`get`. Lets actor calls share the dispatch surface used by ``@ChiaFunction``:: store = chia_actor(some_actor) n = get(store.size_bytes.chia_remote()) get(store.write.chia_remote(key, value)) ``chia_remote`` here is a plain alias for Ray's ``remote`` — it does NOT run the profiling / bypass / cache machinery that ``ChiaWrapped.chia_remote`` does; the call goes straight to the actor. ``remote`` still works, so pre-existing call sites are unchanged. Recover the raw handle via ``handle.actor`` (e.g. for ``ray.kill``). Idempotent: wrapping an already wrapped handle returns it unchanged. """ if isinstance(handle, ChiaActorHandle): return handle return ChiaActorHandle(handle)
def _maybe_wrap_cache(ref, func_name, tag): """Wrap *ref* so the cache write fires when its result is fetched. When *func_name* is configured ``cache: true`` and the call carries a ``_chia_tag`` (*tag*), return an :class:`ObjectRefCallback` whose callback writes the resolved value to the head-pinned cache actor. ``get()`` resolves the inner ref through the profiler unwrap *first*, so the callback receives the final user value (not a ``_ProfiledResult``). Returns *ref* unchanged when there is no tag, the function isn't cached, no cache was started, or *ref* is already an ``ObjectRefCallback`` (``get()`` supports only one level of wrapping). """ if tag is None or isinstance(ref, ObjectRefCallback): return ref try: from chia.base.cache import get_active_cache, is_cached except ImportError: return ref if not is_cached(func_name, tag): return ref cache = get_active_cache() if cache is None: return ref def _write(value): ray.get(cache.write.remote(tag, value)) return value return ObjectRefCallback(ref, _write) @overload def get(ref: ObjectRefCallback, *, timeout: Optional[float] = None, callback: Optional[Callable[[Any], Any]] = None, _use_object_store: bool = False) -> Any: ... @overload def get(ref: Sequence[ObjectRef[R]], *, timeout: Optional[float] = None, callback: Optional[Callable[[Any], Any]] = None, _use_object_store: bool = False) -> List[R]: ... @overload def get(ref: Sequence[ObjectRef[Any]], *, timeout: Optional[float] = None, callback: Optional[Callable[[Any], Any]] = None, _use_object_store: bool = False) -> List[Any]: ... @overload def get(ref: ObjectRef[R], *, timeout: Optional[float] = None, callback: Optional[Callable[[Any], Any]] = None, _use_object_store: bool = False) -> R: ... @overload def get(ref: Sequence[CompiledDAGRef], *, timeout: Optional[float] = None, callback: Optional[Callable[[Any], Any]] = None, _use_object_store: bool = False) -> List[Any]: ... @overload def get(ref: CompiledDAGRef, *, timeout: Optional[float] = None, callback: Optional[Callable[[Any], Any]] = None, _use_object_store: bool = False) -> Any: ...
[docs] def get( ref: Union[ "ObjectRef[Any]", Sequence["ObjectRef[Any]"], CompiledDAGRef, Sequence[CompiledDAGRef], ], *, timeout: Optional[float] = None, callback: Optional[Callable[[Any], Any]] = None, _use_object_store: bool = False, ) -> Union[Any, List[Any]]: """Wrapper around ``ray.get()`` for use in Chia flows. Delegates to :func:`ray.get` and post-processes the result through the Chia profiler. Overloads mirror :func:`ray.get` so a single ``ObjectRef[R]`` returns ``R`` and a sequence of them returns ``List[R]``. ``callback``: when given, it is invoked with the resolved value (after the profiler unwrap) and its return value is what ``get`` returns. This keeps dispatch async — the ref is produced non-blocking by ``chia_remote`` — while running a post-resolution side effect/transform only when the value is fetched (e.g. ``get(ref, callback=fn)``). A :class:`ObjectRefCallback` carries its own callback, so ``get(orc)`` runs it without an explicit ``callback=``. If both are present they compose: the ref's own callback first, then the explicit ``callback``. """ from chia.trace.profiler import get_profiler # An ObjectRefCallback carries a callback alongside its ref: resolve the # underlying ref, then apply its callback, then any explicit one. Only a # single ObjectRefCallback is supported (not lists containing them) — the # chia_remote callers that produce one always get() it on its own. if isinstance(ref, ObjectRefCallback): value = get(ref.ref, timeout=timeout, _use_object_store=_use_object_store) if ref.callback is not None: value = ref.callback(value) if callback is not None: value = callback(value) return value # ray.get's public overloads omit the _use_object_store kwarg, so mypy # can't match any overload when we forward it — suppress here. raw = ray.get(ref, timeout=timeout, _use_object_store=_use_object_store) # type: ignore[call-overload] profiler = get_profiler() value = profiler.on_remote_complete(raw) if callback is not None: return callback(value) return value
[docs] def chia_cancel(ref, force=False): """Cancel a running ChiaFunction task, killing its subprocesses first. Looks up any subprocess PIDs spawned by the task, kills them on the correct remote nodes (using process group kill for ``start_new_session`` subprocesses), then calls ``ray.cancel()``. """ from chia.base.pid_registry import chia_cancel as _cancel return _cancel(ref, force=force)
# Re-export chia_wait + TrackedRef so callers have a single import surface. from chia.base.chia_wait import TrackedRef, chia_wait # noqa: E402,F401