Source code for chia.base.tools.AsyncBashTool
from __future__ import annotations
import os
import signal
import subprocess
from typing import Dict, Optional
from chia.base.tools.AsyncJobTool import AsyncJobTool
#: Keep returned output bounded — a long build/test run can emit megabytes.
_MAX_OUTPUT_LINES = 600
def _tail(text: str, n: int = _MAX_OUTPUT_LINES) -> str:
return "\n".join(text.splitlines()[-n:])
[docs]
class AsyncBashTool(AsyncJobTool):
"""MCP bash tool for commands that may run for minutes, built on AsyncJobTool.
Like :class:`chia.base.tools.BashTool.BashTool` it runs shell commands in a
working directory, but a single command here may run long enough (a full
build, a test suite) that a *synchronous* MCP call loses its response on the
streamable-HTTP transport and hangs the agent. So this tool never blocks the
transport for long: ``<name>_run`` starts the command in the background and
waits only a short, bounded slice for it to finish; if it is still running it
returns ``done=false`` and the agent polls ``<name>_run_status`` until
``done=true``.
Fast commands therefore complete inside the single ``run`` call (they finish
within the initial wait); only genuinely long commands require a poll. One
command runs at a time per tool instance — start a command, then poll it to
completion before starting the next (a second ``run`` while one is in flight
returns ``started=false``).
Result dict on completion: ``{exit_code: int, output: str}`` (combined
stdout+stderr, tail-truncated). Mirrors :class:`BashTool`'s combined-stream
behaviour. ``work_dir`` and ``env`` are plain attributes and pickle normally;
the job/threading primitives are dropped + recreated by ``AsyncJobTool``.
"""
def __init__(
self,
name: str,
work_dir: str = "/",
env: Optional[Dict[str, str]] = None,
task_options: Optional[Dict] = None,
):
super().__init__(name, task_options=task_options)
self.work_dir = work_dir
self.env = dict(env) if env else None
self.mcp.add_tool(self.run, name=f"{name}_run")
self.mcp.add_tool(self.run_status, name=f"{name}_run_status")
super().__post_init__()
def _exec(self, command: str) -> dict:
"""Run *command* to completion in its own process group; return result.
``start_new_session=True`` + ``killpg`` on failure so a child that keeps
the stdout pipe open can't wedge ``communicate()`` forever (same hazard,
and fix, as :class:`BashTool` / the build helpers)."""
run_env = {**os.environ, **self.env} if self.env else None
try:
proc = subprocess.Popen(
command, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, text=True, cwd=self.work_dir,
start_new_session=True, env=run_env)
except Exception as e: # noqa: BLE001
return {"exit_code": -1, "output": f"failed to spawn: {e}"}
try:
out, _ = proc.communicate()
return {"exit_code": proc.returncode, "output": _tail(out or "") or "(no output)"}
except Exception as e: # noqa: BLE001
try:
os.killpg(proc.pid, signal.SIGKILL)
except ProcessLookupError:
pass
return {"exit_code": -1, "output": f"error: {e}"}
[docs]
def run(self, command: str, wait_seconds: int = 60) -> dict:
"""Run a bash command in the BACKGROUND, waiting up to *wait_seconds*
(capped at 120) for it to finish.
If it finishes in time, returns ``done=true`` with ``{exit_code,
output}``. If it is still running, returns ``done=false`` — then poll
``<name>_run_status`` until ``done=true``. One command at a time: if a
command is already running, returns ``started=false`` and you should poll
status instead of starting another.
"""
start = self._job_start(lambda: self._exec(command))
if not start.get("started"):
return {**start, "done": False}
return self._job_status(wait_seconds)
[docs]
def run_status(self, wait_seconds: int = 60) -> dict:
"""Wait up to *wait_seconds* (capped at 120) for the in-flight command,
then report. ``done=true`` splices in ``{exit_code, output}``."""
return self._job_status(wait_seconds)