Source code for chia.models.opencode

"""opencode CLI LLM backend.

:class:`OpenCodeLLM` wraps the ``opencode`` CLI (https://opencode.ai) as an LLM
backend. opencode is provider-agnostic: the model is given as ``provider/model``
(e.g. ``anthropic/claude-sonnet-4-6``), and opencode runs its own server-side
agentic tool loop, so there is no client-side MCP loop here.

WARNING: experimental. Only exercised by the tests in
chia/models/tests/test_opencode.py (mocked unit tests, plus opt-in live tests).
Not validated in production. Auth is environment-driven: opencode uses its own
stored credentials (``opencode auth login``) or provider env vars.
"""

from __future__ import annotations

import json
import logging
import os
import re
import subprocess
import tempfile
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from typing import TYPE_CHECKING, List, Optional

import ray

from chia.base.ChiaFunction import ChiaFunction
from chia.base.llm_call import QueryResult, LLMCallBase

if TYPE_CHECKING:
    from chia.base.tools.ChiaTool import ChiaTool


# ---------------------------------------------------------------------------
# Exceptions
#
# A parallel taxonomy to claude.py. Kept separate so this module stands alone;
# each carries ``__reduce__`` for Ray serialization.
# ---------------------------------------------------------------------------


class OpenCodeError(Exception):
    """Root of the opencode CLI exception hierarchy.

    Records where the failure happened, a short machine-readable error type,
    the CLI exit code and the raw error text. The exception message has the
    form ``"<error_type> on <node_id>: <first 200 chars of raw_message>"``.

    Args:
        node_id: Identifier of the node the failure was observed on.
        error_type: Short category label (e.g. ``"rate_limit"``).
        exit_code: Exit code of the CLI process; ``-1`` when not supplied.
        raw_message: Unprocessed error text; only its first 200 characters
            appear in the exception message, the full text is kept on the
            instance.
    """

    def __init__(
        self,
        node_id: str,
        error_type: str,
        exit_code: int = -1,
        raw_message: str = "",
    ):
        summary = "{} on {}: {}".format(error_type, node_id, raw_message[:200])
        super().__init__(summary)
        self.node_id = node_id
        self.error_type = error_type
        self.exit_code = exit_code
        self.raw_message = raw_message

    def __reduce__(self):
        # Pickle as "call the class again with the constructor arguments" so
        # the instance can be rebuilt from its stored fields.
        ctor_args = (
            self.node_id,
            self.error_type,
            self.exit_code,
            self.raw_message,
        )
        return (type(self), ctor_args)
class RateLimitError(OpenCodeError):
    """Raised when the provider behind opencode reports a usage/rate limit.

    Args:
        node_id: Identifier of the node the failure was observed on.
        reset_time: Datetime supplied by the caller as the limit's reset time.
        raw_message: Unprocessed error text.
        exit_code: Exit code of the CLI process; ``-1`` when not supplied.
    """

    def __init__(
        self,
        node_id: str,
        reset_time: datetime,
        raw_message: str = "",
        exit_code: int = -1,
    ):
        super().__init__(
            node_id=node_id,
            error_type="rate_limit",
            exit_code=exit_code,
            raw_message=raw_message,
        )
        self.reset_time = reset_time

    def __reduce__(self):
        # Argument order mirrors this class's own __init__, not the base's.
        ctor_args = (
            self.node_id,
            self.reset_time,
            self.raw_message,
            self.exit_code,
        )
        return (type(self), ctor_args)
class AuthenticationError(OpenCodeError):
    """Raised when opencode has no/invalid credentials for the provider.

    Args:
        node_id: Identifier of the node the failure was observed on.
        exit_code: Exit code of the CLI process; ``-1`` when not supplied.
        raw_message: Unprocessed error text.
    """

    def __init__(self, node_id: str, exit_code: int = -1, raw_message: str = ""):
        super().__init__(
            node_id=node_id,
            error_type="authentication_failed",
            exit_code=exit_code,
            raw_message=raw_message,
        )

    def __reduce__(self):
        # Rebuild through this class's three-argument constructor.
        ctor_args = (self.node_id, self.exit_code, self.raw_message)
        return (type(self), ctor_args)
class BillingError(OpenCodeError):
    """Raised when the provider account has a billing/payment problem.

    Args:
        node_id: Identifier of the node the failure was observed on.
        exit_code: Exit code of the CLI process; ``-1`` when not supplied.
        raw_message: Unprocessed error text.
    """

    def __init__(self, node_id: str, exit_code: int = -1, raw_message: str = ""):
        super().__init__(
            node_id=node_id,
            error_type="billing_error",
            exit_code=exit_code,
            raw_message=raw_message,
        )

    def __reduce__(self):
        # Rebuild through this class's three-argument constructor.
        ctor_args = (self.node_id, self.exit_code, self.raw_message)
        return (type(self), ctor_args)
class InvalidRequestError(OpenCodeError):
    """Raised for a malformed request.

    Covers a bad model string, invalid config, unknown agent and similar
    problems with the request itself.

    Args:
        node_id: Identifier of the node the failure was observed on.
        exit_code: Exit code of the CLI process; ``-1`` when not supplied.
        raw_message: Unprocessed error text.
    """

    def __init__(self, node_id: str, exit_code: int = -1, raw_message: str = ""):
        super().__init__(
            node_id=node_id,
            error_type="invalid_request",
            exit_code=exit_code,
            raw_message=raw_message,
        )

    def __reduce__(self):
        # Rebuild through this class's three-argument constructor.
        ctor_args = (self.node_id, self.exit_code, self.raw_message)
        return (type(self), ctor_args)
class ServerError(OpenCodeError):
    """Raised for a transient provider/server-side failure.

    Covers 5xx responses, an overloaded provider and connection problems.

    Args:
        node_id: Identifier of the node the failure was observed on.
        exit_code: Exit code of the CLI process; ``-1`` when not supplied.
        raw_message: Unprocessed error text.
        retry_after: Optional retry delay supplied by the caller; stored
            as-is on the instance.
    """

    def __init__(
        self,
        node_id: str,
        exit_code: int = -1,
        raw_message: str = "",
        retry_after: Optional[int] = None,
    ):
        super().__init__(
            node_id=node_id,
            error_type="server_error",
            exit_code=exit_code,
            raw_message=raw_message,
        )
        self.retry_after = retry_after

    def __reduce__(self):
        # Include retry_after so the extra field survives a pickle round trip.
        ctor_args = (
            self.node_id,
            self.exit_code,
            self.raw_message,
            self.retry_after,
        )
        return (type(self), ctor_args)
class MaxOutputTokensError(OpenCodeError):
    """Raised when the response was truncated at the output token limit.

    Args:
        node_id: Identifier of the node the failure was observed on.
        exit_code: Exit code of the CLI process; ``-1`` when not supplied.
        raw_message: Unprocessed error text.
        partial_text: Whatever response text the caller had when the limit
            was hit; stored as-is on the instance.
    """

    def __init__(
        self,
        node_id: str,
        exit_code: int = -1,
        raw_message: str = "",
        partial_text: str = "",
    ):
        super().__init__(
            node_id=node_id,
            error_type="max_output_tokens",
            exit_code=exit_code,
            raw_message=raw_message,
        )
        self.partial_text = partial_text

    def __reduce__(self):
        # Include partial_text so the extra field survives a pickle round trip.
        ctor_args = (
            self.node_id,
            self.exit_code,
            self.raw_message,
            self.partial_text,
        )
        return (type(self), ctor_args)
class UnknownOpenCodeError(OpenCodeError):
    """Raised for an opencode CLI error that could not be classified.

    Args:
        node_id: Identifier of the node the failure was observed on.
        exit_code: Exit code of the CLI process; ``-1`` when not supplied.
        raw_message: Unprocessed error text.
        stderr: Standard-error text supplied by the caller; stored as-is on
            the instance.
    """

    def __init__(
        self,
        node_id: str,
        exit_code: int = -1,
        raw_message: str = "",
        stderr: str = "",
    ):
        super().__init__(
            node_id=node_id,
            error_type="unknown",
            exit_code=exit_code,
            raw_message=raw_message,
        )
        self.stderr = stderr

    def __reduce__(self):
        # Include stderr so the extra field survives a pickle round trip.
        ctor_args = (
            self.node_id,
            self.exit_code,
            self.raw_message,
            self.stderr,
        )
        return (type(self), ctor_args)
# ---------------------------------------------------------------------------
# Session-id parser
# ---------------------------------------------------------------------------

# Fallback pattern used when no JSON event carries a usable ``sessionID``.
_SESSION_ID_RE = re.compile(r"\bses_[A-Za-z0-9]+\b")


def parse_session_id(stdout: str) -> Optional[str]:
    """Pull the opencode session id out of ``run --format json`` stdout.

    Each event is ``{type, sessionID, part:{sessionID, ...}}``; the opening
    ``step_start`` line reliably carries it. Falls back to a regex scan over
    the whole output if the JSON shape changes.

    Lines that are blank, not valid JSON, or valid JSON but not an object
    (a bare number, string, list or ``null``) are skipped rather than treated
    as events.

    Args:
        stdout: Full captured stdout of ``opencode run --format json``.

    Returns:
        The first non-empty string session id found, or ``None`` when neither
        the JSON events nor the regex fallback yield one.
    """
    for raw_line in stdout.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue
        # json.loads happily returns ints/lists/None; only objects are events.
        if not isinstance(event, dict):
            continue

        sid = event.get("sessionID")
        if not sid:
            part = event.get("part")
            if isinstance(part, dict):
                sid = part.get("sessionID")
        # The id is later passed on a command line, so insist on a string.
        if isinstance(sid, str) and sid:
            return sid

    match = _SESSION_ID_RE.search(stdout)
    return match.group(0) if match else None
def parse_run_error(stdout: str) -> Optional[dict]:
    """Pull the first structured error out of ``run --format json`` stdout.

    opencode emits ``{"type":"error", "sessionID":..., "error":{name, data}}``
    events for failures that happen before/around the model request — notably
    an unknown model id, which surfaces as
    ``{"name":"UnknownError","data":{"message": "Model not found: ..."}}``.
    These never reach the session export's ``messages[].info.error`` because
    no assistant message is ever created, so the run stream is the *only*
    place they appear (confirmed against opencode 1.15.13).

    Genuine provider errors (e.g. APIError 401) appear in *both* the run
    stream and the export; the export copy is richer (full
    ``responseHeaders``), so callers prefer it and use this only as a
    fallback.

    Lines that are blank, not valid JSON, or valid JSON but not an object
    are skipped.

    Args:
        stdout: Full captured stdout of ``opencode run --format json``.

    Returns:
        The ``error`` dict (``{name, data}``) of the first ``type == "error"``
        event whose ``error`` is a dict containing ``"name"``, else ``None``.
    """
    for raw_line in stdout.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue
        # json.loads can return ints/lists/None; only objects are events.
        if not isinstance(event, dict) or event.get("type") != "error":
            continue
        err = event.get("error")
        if isinstance(err, dict) and "name" in err:
            return err
    return None
class OpenCodeLLM(LLMCallBase):
    """Wraps the ``opencode`` CLI as an LLM backend.

    Each :meth:`prompt` call runs ``opencode run`` (to create a session and
    get its id) then ``opencode export`` (to read the assistant response +
    usage from opencode's local DB). Returns the same :class:`QueryResult`
    shape as the other backends; ``returncode`` is the ``run`` exit code
    (``-1`` when the run exited 0 but produced no session id).

    Args:
        model: ``provider/model`` string passed as ``--model``; ``None`` lets
            opencode use its configured default.
        system_message: System prompt, installed as the agent prompt in the
            generated opencode config.
        timeout_seconds: Timeout applied to each CLI subprocess.
        retries: Number of attempts :meth:`prompt` makes.
        logging_name: Logger name, also used in the transcript file name.
        logging_level: Stored on the instance.
        log_dir: Directory for the transcript log; ``None`` disables it.
        opencode_bin: Executable name/path of the opencode CLI.
        agent_name: Name of the agent defined in the generated config and
            selected with ``--agent``.
        work_dir: Passed as ``--dir`` when set.
        extra_cli_args: Extra arguments appended to ``opencode run`` before
            the message.
    """

    # Substrings that mark an APIError message as a billing/quota problem.
    _BILLING_KEYWORDS = (
        "billing", "quota", "payment", "credit", "subscription", "plan",
    )
    # Longest tool-call argument / tool-result text copied into the trace.
    _TRACE_CLIP = 2000

    def __init__(
        self,
        model: Optional[str] = None,
        system_message: str = "",
        timeout_seconds: int = 600,
        retries: int = 3,
        logging_name: str = "opencode",
        logging_level: int = logging.DEBUG,
        log_dir: Optional[str] = None,
        opencode_bin: str = "opencode",
        agent_name: str = "chia",
        work_dir: Optional[str] = None,
        extra_cli_args: Optional[List[str]] = None,
    ):
        super().__init__(system_message=system_message)
        self.logging_level = logging_level
        self.logging_name = logging_name
        self.retries = retries
        self.timeout_seconds = timeout_seconds
        self.model = model
        self.opencode_bin = opencode_bin
        self.agent_name = agent_name
        self.work_dir = work_dir
        self.extra_cli_args = extra_cli_args or []
        self.logger = logging.getLogger(logging_name)
        # Usage metadata and structured error of the most recent attempt.
        self._last_metadata: dict = {}
        self._last_export_error: Optional[dict] = None

        self.logger.warning(
            "OpenCodeLLM is experimental: only exercised by unit tests so far, "
            "not validated in production."
        )

        # opencode falls back to its own configured default model when none is
        # passed on the CLI, so model is optional here — just say so.
        if self.model is None:
            self.logger.info(
                "OpenCodeLLM: no model specified; opencode will use its "
                "configured default model."
            )

        self._log_dir = log_dir
        if log_dir is not None:
            os.makedirs(log_dir, exist_ok=True)
            run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
            self._log_prefix = os.path.join(log_dir, f"{logging_name}_{run_id}")
        else:
            self._log_prefix = None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    @ChiaFunction(resources={"opencode_creds": 0.01})
    def prompt(
        self,
        user_message: str,
        tools: Optional[List[ChiaTool]] = None,
    ) -> QueryResult:
        """Send *user_message* to opencode and return the response.

        Makes up to ``self.retries`` attempts. Each attempt runs opencode,
        records usage metadata (reported to the profiler when it is enabled),
        and classifies the outcome with :meth:`_classify_error`.

        Args:
            user_message: Prompt text, passed to ``opencode run`` as the
                positional message argument.
            tools: Tools exposed to opencode as remote MCP servers. ``None``
                and an empty list both mean "no tools".

        Returns:
            :class:`QueryResult` with ``success=True`` when opencode ran
            cleanly, or an empty one with ``success=False`` and
            ``returncode=-1`` when every attempt failed with a retryable
            error.

        Raises:
            RateLimitError / AuthenticationError / BillingError /
            InvalidRequestError: propagate immediately, never retried.
            MaxOutputTokensError: retried only when it occurs on the first
                attempt; raised on any later attempt.

        ServerError (with exponential backoff), UnknownOpenCodeError,
        ``subprocess.TimeoutExpired`` and any other exception are logged and
        retried; when attempts run out they are NOT re-raised — the failed
        ``QueryResult`` described above is returned instead.
        """
        import time as _time

        from chia.trace.profiler import get_profiler

        # Normalise once. The annotation allows None, which previously blew up
        # in the metadata loop below *after* a successful run; this also avoids
        # sharing a mutable default list across calls.
        tools = list(tools) if tools else []
        profiler = get_profiler()

        for attempt in range(self.retries):
            attempts_left = self.retries - (attempt + 1)
            try:
                self._last_metadata = {}
                self._last_export_error = None

                cli = self._run_opencode(user_message, tools)

                self._last_metadata["model"] = self.model or "<opencode default>"
                self._last_metadata["tools"] = [
                    {
                        "name": t.name,
                        "hostname": getattr(t, "hostname", None),
                        "port": getattr(t, "port", None),
                        "node_id": getattr(t, "node_id", None),
                    }
                    for t in tools
                ]
                if profiler.enabled and self._last_metadata:
                    profiler.add_info(self._last_metadata)

                # Raises a typed error unless the run was clean.
                self._classify_error(cli, export_error=self._last_export_error)

                cli.success = True
                return cli

            # -- Never retry: propagate immediately --
            except (RateLimitError, AuthenticationError, BillingError,
                    InvalidRequestError):
                raise

            # -- Retry once: stochastic generation may produce shorter output --
            except MaxOutputTokensError:
                if attempt == 0:
                    self.logger.warning(
                        "Max output tokens on attempt %d/%d, retrying once",
                        attempt + 1, self.retries,
                    )
                    continue
                raise

            # -- Retry with exponential backoff: transient service issue --
            except ServerError:
                if attempts_left > 0:
                    backoff = min(5 * 2 ** attempt, 60)
                    self.logger.warning(
                        "Server error on attempt %d/%d, backing off %ds",
                        attempt + 1, self.retries, backoff,
                    )
                    _time.sleep(backoff)
                else:
                    # Nothing follows the last attempt, so don't sleep for it.
                    self.logger.warning(
                        "Server error on attempt %d/%d, no attempts left",
                        attempt + 1, self.retries,
                    )

            except UnknownOpenCodeError as exc:
                self.logger.warning(
                    "Unknown error on attempt %d/%d: %s",
                    attempt + 1, self.retries, exc,
                )

            except subprocess.TimeoutExpired:
                self.logger.warning(
                    "Timeout on attempt %d/%d", attempt + 1, self.retries,
                )

            except Exception as exc:
                self.logger.warning(
                    "Unexpected error on attempt %d/%d: %s",
                    attempt + 1, self.retries, exc,
                )

        return QueryResult(
            result="", returncode=-1, stderr="", stream_result="", success=False
        )

    def _get_node_id(self) -> str:
        """Return the current Ray node id, or ``"unknown"`` if unavailable."""
        try:
            return ray.get_runtime_context().get_node_id()
        except Exception:
            return "unknown"

    @staticmethod
    def _coerce_status(value) -> Optional[int]:
        """Return *value* as an int HTTP status, or ``None`` if it isn't one."""
        if isinstance(value, bool):
            return None
        if isinstance(value, int):
            return value
        if isinstance(value, float) and value.is_integer():
            return int(value)
        if isinstance(value, str) and value.strip().isdigit():
            return int(value.strip())
        return None

    def _classify_error(self, cli: QueryResult,
                        export_error: Optional[dict] = None) -> None:
        """Inspect *cli* and *export_error* and raise a typed error if wrong.

        ``opencode run`` almost always exits 0 even on failure (opencode bug
        #14551), so the exit code alone is not trustworthy. Logic:

        1. Clean run (exit 0, non-empty result, no structured error) -> return.
        2. Structured ``export_error`` (a ``{name, data}`` object from the
           session export or run stream) -> map to a typed error. This is the
           only reliable signal and the only thing we classify on.
        3. Any other failure (no structured error: a CLI/process-level failure
           on stderr, or an empty response) -> :class:`UnknownOpenCodeError`
           with the raw stderr attached.

        We deliberately do NOT keyword-match stderr: opencode emits its real
        errors as structured JSON (handled by (2)), so stderr only carries
        CLI/usage text, and guessing a type from it was imprecise — better to
        surface it honestly as unknown.

        Note the guard requires ``not export_error``: a structured error is
        honored even on an exit-0 run that returned partial text.

        Malformed fields inside *export_error* (non-dict ``data`` or
        ``responseHeaders``, non-string ``message``, string ``statusCode``)
        are normalised instead of raising an unrelated ``TypeError`` /
        ``AttributeError``.

        Raises:
            RateLimitError: status 429; ``reset_time`` honours a numeric
                ``Retry-After`` header, else defaults to 60 s from now.
            AuthenticationError: ``ProviderAuthError`` or status 401/403.
            BillingError: ``APIError`` whose message contains a billing keyword.
            MaxOutputTokensError: ``ContextOverflowError`` or
                ``MessageOutputLengthError``.
            ServerError: ``APIError`` with status >= 500 or ``isRetryable``.
            InvalidRequestError: any other ``APIError``.
            UnknownOpenCodeError: everything else.
        """
        if cli.returncode == 0 and cli.result and not export_error:
            return

        node_id = self._get_node_id()

        # -- Path A: structured error from the session export (preferred) --
        if export_error:
            name = export_error.get("name", "")
            data = export_error.get("data")
            if not isinstance(data, dict):
                data = {}
            message = data.get("message") or ""
            if not isinstance(message, str):
                message = str(message)
            status = self._coerce_status(data.get("statusCode"))

            # Rate limit — honor the provider's Retry-After when present.
            if status == 429:
                headers = data.get("responseHeaders")
                if not isinstance(headers, dict):
                    headers = {}
                # Header names are case-insensitive.
                retry_after = next(
                    (v for k, v in headers.items()
                     if str(k).lower() == "retry-after"),
                    None,
                )
                delay_seconds = 60
                if retry_after:
                    try:
                        delay_seconds = max(0, int(retry_after))
                    except (ValueError, TypeError):
                        # e.g. an HTTP-date value: keep the 60 s default.
                        pass
                reset_time = datetime.now(timezone.utc) + timedelta(
                    seconds=delay_seconds,
                )
                raise RateLimitError(
                    node_id=node_id,
                    reset_time=reset_time,
                    raw_message=message,
                    exit_code=cli.returncode,
                )

            # Authentication.
            if name == "ProviderAuthError" or status in (401, 403):
                raise AuthenticationError(node_id, cli.returncode, message)

            # Billing / quota — APIError whose message names a billing problem.
            if name == "APIError" and message:
                lowered = message.lower()
                if any(kw in lowered for kw in self._BILLING_KEYWORDS):
                    raise BillingError(node_id, cli.returncode, message)

            # Output token limit / context overflow.
            if name in ("ContextOverflowError", "MessageOutputLengthError"):
                raise MaxOutputTokensError(
                    node_id, cli.returncode, message, partial_text=cli.result,
                )

            # Server error (5xx or explicitly retryable) vs. invalid request.
            if name == "APIError":
                if (status and status >= 500) or data.get("isRetryable"):
                    raise ServerError(
                        node_id, exit_code=cli.returncode, raw_message=message,
                    )
                raise InvalidRequestError(node_id, cli.returncode, message)

            # Any other structured error (MessageAbortedError, UnknownError, ...).
            raise UnknownOpenCodeError(
                node_id, cli.returncode, message or str(export_error),
                stderr=cli.stderr,
            )

        # No structured error: a CLI/process-level failure (its message is on
        # stderr) or an empty response. opencode surfaces its real errors as
        # structured JSON (handled above), so there's nothing reliable to
        # classify here — report it honestly as unknown with the stderr attached.
        raise UnknownOpenCodeError(
            node_id, cli.returncode, cli.stderr[:300] or "empty response",
            stderr=cli.stderr,
        )

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------

    def _build_config(self, tools: List[ChiaTool]) -> dict:
        """Build the opencode config (written to OPENCODE_CONFIG).

        Defines the agent named ``self.agent_name`` carrying our system prompt
        (or a generic default when the prompt is empty), and one remote MCP
        server per ChiaTool, keyed by tool name. A tool without a ``port``
        attribute is assumed to listen on 8000.
        """
        cfg: dict = {
            "$schema": "https://opencode.ai/config.json",
            "agent": {
                self.agent_name: {
                    "mode": "primary",
                    "prompt": self.system_message or "You are a helpful assistant.",
                }
            },
        }
        if tools:
            mcp: dict = {}
            for tool in tools:
                port = getattr(tool, "port", 8000)
                mcp[tool.name] = {
                    "type": "remote",
                    "url": f"http://{tool.hostname}:{port}/{tool.name}/mcp",
                    "enabled": True,
                }
            cfg["mcp"] = mcp
        return cfg

    def _build_run_cmd(self, user_message: str) -> list:
        """Build the ``opencode run`` argv (message is the last positional arg)."""
        cmd = [
            self.opencode_bin, "run",
            "--format", "json",
            "--agent", self.agent_name,
            "--dangerously-skip-permissions",
        ]
        if self.model:  # omit --model so opencode uses its configured default
            cmd += ["--model", self.model]
        if self.work_dir:
            cmd += ["--dir", self.work_dir]
        if self.extra_cli_args:
            cmd += self.extra_cli_args
        cmd.append(user_message)
        return cmd

    def _run_opencode(
        self,
        user_message: str,
        tools: Optional[List[ChiaTool]] = None,
    ) -> QueryResult:
        """Run ``opencode run`` then ``opencode export`` and assemble a QueryResult.

        Side effects: sets ``self._last_export_error`` (and, when an export
        was attempted, ``self._last_metadata``), and appends to the transcript
        log when logging to a directory is enabled.

        Returns:
            On a failed run (non-zero exit or no session id): an empty-result
            ``QueryResult`` carrying the run's stderr and raw stdout.
            Otherwise: the final assistant text with the rendered stream trace
            and ``returncode=0``.

        Raises:
            subprocess.TimeoutExpired: if ``opencode run`` times out.
        """
        tools = tools or []
        cfg = self._build_config(tools)

        cfg_fh = tempfile.NamedTemporaryFile(
            mode="w", suffix=".json", prefix="opencode_cfg_", delete=False
        )
        cfg_path = cfg_fh.name
        try:
            # Inside the try so a failed dump still closes and removes the file.
            with cfg_fh:
                json.dump(cfg, cfg_fh)

            # opencode picks up our config via OPENCODE_CONFIG; disable project
            # config so a stray opencode.json in cwd can't shadow it. Stored
            # credentials / provider env vars in the inherited environment
            # provide auth (we don't touch them).
            env = dict(os.environ)
            env["OPENCODE_CONFIG"] = cfg_path
            env["OPENCODE_DISABLE_PROJECT_CONFIG"] = "1"

            run_cmd = self._build_run_cmd(user_message)
            self.logger.info("Running: %s ...", " ".join(run_cmd[:6]))

            # Capture via a file, not a pipe: a large run stream would otherwise
            # be truncated at 64 KiB (see _capture), which can drop the trailing
            # error events parse_run_error looks for.
            run = self._capture(run_cmd, env)
        finally:
            try:
                os.unlink(cfg_path)
            except OSError:
                pass

        session_id = parse_session_id(run.stdout)
        # Errors that happen before an assistant message exists (e.g. unknown
        # model) only appear as `type:"error"` events in the run stream, never
        # in the export — capture them here so they can be classified too.
        run_error = parse_run_error(run.stdout)

        # A failed run (non-zero, or no session created) → return so the caller
        # classifies. Surface any run-stream error so it isn't lost. Don't
        # attempt an export without a session id.
        if run.returncode != 0 or session_id is None:
            if run.returncode != 0:
                self.logger.warning(
                    "opencode run exited %d: %s", run.returncode, run.stderr[:500]
                )
            self._last_export_error = run_error
            return QueryResult(
                result="",
                returncode=run.returncode if run.returncode != 0 else -1,
                stderr=run.stderr or "no session id in opencode output",
                stream_result=run.stdout,
            )

        export = self._run_export(session_id, env)
        final_text, meta, stream, export_error = self._extract_from_export(export)
        self._last_metadata = meta
        # Prefer the export's error (richer — full responseHeaders); fall back to
        # the run-stream error for pre-request failures the export never records.
        self._last_export_error = export_error or run_error

        if self._log_prefix is not None:
            self._append_log(session_id, user_message, stream)

        return QueryResult(
            result=final_text,
            returncode=0,
            stderr=run.stderr,
            stream_result=stream,
        )

    def _append_log(self, session_id: str, user_message: str, stream: str) -> None:
        """Append one session transcript to ``<log prefix>.log`` (best effort).

        The user message is clipped to 500 characters. A write failure is
        logged and swallowed: losing a transcript must not discard a response
        that was already obtained (the caller would otherwise retry the run).
        """
        truncated = user_message[:500] + ("..." if len(user_message) > 500 else "")
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        try:
            # Explicit UTF-8: model output routinely contains non-ASCII text.
            with open(f"{self._log_prefix}.log", "a", encoding="utf-8") as f:
                f.write("=" * 80 + "\n")
                f.write(f"[{stamp}] session {session_id}\n")
                f.write("=" * 80 + "\n\n")
                f.write(f"[User Message]\n{truncated}\n\n")
                f.write(stream)
                f.write("-" * 80 + "\n\n")
        except OSError as exc:
            self.logger.warning("Could not write opencode transcript log: %s", exc)

    def _capture(self, cmd: list, env: dict) -> SimpleNamespace:
        """Run *cmd* capturing stdout to a temp FILE and return it.

        Returns a ``SimpleNamespace(returncode, stdout, stderr)`` (the same
        shape ``subprocess.run`` would, so callers read ``.stdout`` etc.
        unchanged).

        Why a file instead of ``capture_output=True``: opencode truncates its
        stdout at the OS pipe buffer (64 KiB on Linux) and still exits 0 when
        stdout is a pipe, so large ``run`` streams / ``export`` payloads come
        back cut mid-JSON and unparseable. A regular file has no such limit.
        stderr is small, so it stays on a pipe. ``stdin=DEVNULL`` because
        ``run`` blocks on an open stdin pipe.

        Output is decoded as UTF-8 with undecodable bytes replaced, so the
        result does not depend on the process locale.

        Raises:
            subprocess.TimeoutExpired: propagates to the caller; the temp file
                is still removed.
        """
        out_fh = tempfile.NamedTemporaryFile(
            mode="w", suffix=".out", prefix="opencode_out_", delete=False,
            encoding="utf-8",
        )
        out_path = out_fh.name
        try:
            # The child writes straight to the file descriptor; closing the
            # handle afterwards makes the content visible to the re-open below.
            with out_fh:
                proc = subprocess.run(
                    cmd,
                    stdin=subprocess.DEVNULL,
                    stdout=out_fh,
                    stderr=subprocess.PIPE,
                    text=True,
                    encoding="utf-8",
                    errors="replace",
                    timeout=self.timeout_seconds,
                    env=env,
                )
            with open(out_path, "r", encoding="utf-8", errors="replace") as in_fh:
                stdout = in_fh.read()
            return SimpleNamespace(
                returncode=proc.returncode, stdout=stdout, stderr=proc.stderr or ""
            )
        finally:
            try:
                os.unlink(out_path)
            except OSError:
                pass

    def _run_export(self, session_id: str, env: dict) -> dict:
        """``opencode export <id>`` → parsed session JSON (``{}`` on failure).

        "Failure" covers a timeout, a non-zero exit, non-JSON output, and JSON
        whose top level is not an object; each is logged as a warning.
        """
        cmd = [self.opencode_bin, "export", session_id]
        try:
            proc = self._capture(cmd, env)
        except subprocess.TimeoutExpired:
            self.logger.warning("opencode export timed out for %s", session_id)
            return {}
        if proc.returncode != 0:
            self.logger.warning(
                "opencode export exited %d: %s", proc.returncode, proc.stderr[:300]
            )
            return {}
        try:
            payload = json.loads(proc.stdout)
        except json.JSONDecodeError:
            self.logger.warning("opencode export returned non-JSON for %s", session_id)
            return {}
        if not isinstance(payload, dict):
            # Honour the declared return type so callers can use .get safely.
            self.logger.warning(
                "opencode export returned a non-object payload for %s", session_id
            )
            return {}
        return payload

    def _clip(self, text: str) -> str:
        """Truncate *text* to ``_TRACE_CLIP`` chars, marking the cut."""
        if len(text) > self._TRACE_CLIP:
            return text[:self._TRACE_CLIP] + "\n... [truncated]"
        return text

    def _extract_from_export(
        self, export: dict
    ) -> tuple[str, dict, str, Optional[dict]]:
        """Pull final assistant text, usage metadata, a stream trace, and any error.

        Export shape::

            {info, messages: [{info:{role, tokens, cost, error}, parts: [...]}]}

        Parts: ``{type:"text", text}``, ``{type:"reasoning", text}``,
        ``{type:"tool", tool, state:{status, input, output}}``, plus
        ``step-start``/``step-finish``. The final answer is the text of the
        last assistant message that has any text part; tokens/cost are summed
        across assistant messages. Metadata entries that end up zero are
        dropped. Tool arguments and results are clipped to 2000 characters in
        the trace.

        Malformed pieces (non-dict export/message/info/part, null fields) are
        skipped or treated as empty rather than raising.

        Returns:
            ``(text, metadata, stream, export_error)`` where *export_error* is
            the ``{name, data}`` dict from the first assistant message carrying
            an ``info.error`` (opencode's discriminated error object), or
            ``None``. This matters because ``opencode run`` almost always exits
            0 even on failure (opencode bug #14551), so the exit code alone
            can't be trusted — the structured error in the export is the
            reliable signal.
        """
        stream_parts: list[str] = []
        meta = {"input_tokens": 0, "output_tokens": 0, "reasoning_tokens": 0,
                "cache_read": 0, "cache_write": 0, "cost_usd": 0.0, "num_turns": 0}
        last_assistant_text = ""
        export_error = None

        messages = export.get("messages") if isinstance(export, dict) else None
        if not isinstance(messages, list):
            messages = []

        for msg in messages:
            if not isinstance(msg, dict):
                continue
            info = msg.get("info")
            if not isinstance(info, dict):
                info = {}
            # Only assistant turns contribute text, usage, or errors.
            if info.get("role") != "assistant":
                continue
            parts = msg.get("parts")
            if not isinstance(parts, list):
                parts = []

            if export_error is None:
                err = info.get("error")
                if isinstance(err, dict) and "name" in err:
                    export_error = err

            meta["num_turns"] += 1
            tok = info.get("tokens")
            if not isinstance(tok, dict):
                tok = {}
            meta["input_tokens"] += tok.get("input", 0) or 0
            meta["output_tokens"] += tok.get("output", 0) or 0
            meta["reasoning_tokens"] += tok.get("reasoning", 0) or 0
            cache = tok.get("cache")
            if not isinstance(cache, dict):
                cache = {}
            meta["cache_read"] += cache.get("read", 0) or 0
            meta["cache_write"] += cache.get("write", 0) or 0
            meta["cost_usd"] += info.get("cost", 0) or 0

            turn_text: list[str] = []
            for p in parts:
                if not isinstance(p, dict):
                    continue
                ptype = p.get("type")
                if ptype == "text":
                    txt = p.get("text") or ""
                    turn_text.append(txt)
                    stream_parts.append(f"[Response]\n{txt}\n\n")
                elif ptype == "reasoning":
                    stream_parts.append(f"[Thinking]\n{p.get('text', '')}\n\n")
                elif ptype == "tool":
                    state = p.get("state")
                    if not isinstance(state, dict):
                        state = {}
                    args = self._clip(json.dumps(state.get("input", {})))
                    stream_parts.append(
                        f"[Tool Call: {p.get('tool', 'unknown')}]\nArgs: {args}\n\n"
                    )
                    out = state.get("output", "")
                    if not isinstance(out, str):
                        out = json.dumps(out)
                    out = self._clip(out)
                    if out:
                        stream_parts.append(f"[Tool Result]\n{out}\n\n")
            if turn_text:
                last_assistant_text = "".join(turn_text)

        # Drop zero-valued counters so the metadata only lists what was used.
        meta = {k: v for k, v in meta.items() if v}
        return last_assistant_text, meta, "".join(stream_parts), export_error