Source code for chia.base.tools.AsyncJobTool

from __future__ import annotations

import threading
from typing import Callable, Dict, Optional

from chia.base.tools.ChiaTool import ChiaTool


class AsyncJobTool(ChiaTool):
    """Base for MCP tools that run a long job in the background, then poll for it.

    Why this exists: a single MCP tool call that blocks for minutes holds the
    streamable-HTTP connection open with no traffic for the whole job. That
    transport can lose the response for such a long synchronous call,
    stranding the result and hanging the agent forever (observed on real
    runs). The fix is to never block the transport for long: a ``start`` call
    kicks the work off in a daemon thread and returns immediately, and a
    ``status`` call long-polls in short, bounded chunks (<=120s) until
    ``done=true``.

    Subclasses register their own start/status tool methods that delegate to
    :meth:`_job_start` and :meth:`_job_status`::

        class BuildTool(AsyncJobTool):
            def __init__(self, name, task_options=None):
                super().__init__(name, task_options=task_options)
                self.mcp.add_tool(self.build, name=f"{name}_build")
                self.mcp.add_tool(self.build_status, name=f"{name}_build_status")
                super().__post_init__()

            def build(self, targets=None) -> dict:
                return self._job_start(lambda: do_build(targets))

            def build_status(self, wait_seconds: int = 60) -> dict:
                return self._job_status(wait_seconds)

    One job runs at a time per tool instance; calling ``_job_start`` while a
    job is in flight returns ``{"started": False, "running": True}`` rather
    than starting a second.

    The ``work`` callable must return a ``dict``; on ``done=true`` its keys
    are spliced into the status result. If ``work`` raises, or returns
    something that is not a mapping, the job still completes: the status
    result then carries ``done=true`` plus an ``"error"`` key describing the
    failure, so a poller is never left waiting on a job that has already died.

    The job's threading primitives are not picklable, but ChiaTool serializes
    the tool object to the server actor (and to the LLM worker), so they are
    dropped in ``__getstate__`` and recreated in ``__setstate__``.
    """

    #: Hard cap on how long a single status poll may block, so the MCP call
    #: always returns promptly even if the caller passes a large wait.
    _MAX_POLL_SECONDS = 120

    def __init__(self, name: str, task_options: Optional[Dict] = None):
        super().__init__(name, task_options=task_options)
        self._init_job_state()

    def _init_job_state(self) -> None:
        """(Re)create the lock, done-event, result slot and thread handle."""
        self._job_lock = threading.Lock()
        self._job_done = threading.Event()
        self._job_result: Optional[dict] = None
        self._job_thread: Optional[threading.Thread] = None

    def _ensure_job_state(self) -> None:
        """Create the job-state primitives if they don't exist yet.

        ``__init__`` sets these up for the explicit-init idiom, but a subclass
        built with the ``setup()`` idiom never runs ``AsyncJobTool.__init__``
        (the auto-init brackets only ``ChiaTool.__init__``, skipping
        intermediate bases), so initialize them lazily on first use too. This
        keeps the job machinery working under both idioms with no
        per-subclass boilerplate.
        """
        if not hasattr(self, "_job_lock"):
            self._init_job_state()

    def _job_start(self, work: Callable[[], dict]) -> dict:
        """Run callable *work* (-> result dict) in a daemon thread; return now.

        Returns ``{"started": True, "running": True}`` on launch, or
        ``{"started": False, "running": True, ...}`` if a job is already
        running.

        The worker always records a result and always sets the done event,
        even when *work* raises or returns a non-mapping; in those cases the
        recorded result is ``{"error": "..."}``.
        """
        # Function-scope import: the only new name this method needs.
        from collections.abc import Mapping

        self._ensure_job_state()
        with self._job_lock:
            if self._job_thread is not None and self._job_thread.is_alive():
                return {"started": False, "running": True,
                        "note": "a job is already running here; poll status"}
            self._job_done.clear()
            self._job_result = None

            def _run() -> None:
                # Fallback for exits that bypass both branches below
                # (e.g. a BaseException such as SystemExit).
                outcome: dict = {
                    "error": "job terminated without producing a result"}
                try:
                    returned = work()
                except Exception as exc:
                    outcome = {
                        "error": f"job failed: {type(exc).__name__}: {exc}"}
                    # Re-raise so threading.excepthook still reports the
                    # traceback, exactly as an unhandled error did before.
                    raise
                else:
                    if isinstance(returned, Mapping):
                        outcome = returned
                    else:
                        outcome = {
                            "error": "job returned "
                                     f"{type(returned).__name__}; "
                                     "expected a dict"}
                finally:
                    # Always publish a result and wake pollers; otherwise a
                    # failed job would look "still running" forever.
                    with self._job_lock:
                        self._job_result = outcome
                    self._job_done.set()

            self._job_thread = threading.Thread(target=_run, daemon=True)
            self._job_thread.start()
        return {"started": True, "running": True}

    def _job_status(self, wait_seconds: int) -> dict:
        """Block up to ``wait_seconds`` (capped at ``_MAX_POLL_SECONDS`` so
        the MCP call stays short), then report.

        ``done=true`` splices in the job's result dict; otherwise returns
        ``{"done": False, "running": ...}``.
        """
        self._ensure_job_state()
        # Clamp the wait to [0, _MAX_POLL_SECONDS].
        timeout = max(0, min(int(wait_seconds), self._MAX_POLL_SECONDS))
        finished = self._job_done.wait(timeout=timeout)
        with self._job_lock:
            # The result is re-checked under the lock: a new job may have been
            # started (clearing the result) after the wait above returned.
            if finished and self._job_result is not None:
                return {"done": True, "running": False, **self._job_result}
            if self._job_thread is None:
                return {"done": False, "running": False,
                        "note": "nothing started yet"}
            return {"done": False, "running": True,
                    "note": "still running; poll again"}

    # threading primitives can't be pickled; ChiaTool pickles `self`.
    def __getstate__(self):
        """Return the parent's pickled state with the job primitives removed."""
        state = super().__getstate__()
        for k in ("_job_lock", "_job_done", "_job_result", "_job_thread"):
            state.pop(k, None)
        return state

    def __setstate__(self, state):
        """Restore the parent's state, then recreate fresh job primitives."""
        super().__setstate__(state)
        self._init_job_state()