Source code for chia.vlsi.hammer

"""chia.vlsi.hammer — Hammer-vlsi nodes.

:meth:`HammerNode.run` wraps one ``hammer-vlsi`` CLI call. Hammer's CLI is
uniform across actions (syn, par, drc, lvs, sim, power, and the *-to-* bridge
actions): configs in via repeated ``-p``, an ``--obj_dir`` for build output,
an ``-o`` output config that feeds the next action, and the action name. One
node therefore covers the whole flow, with the action as a parameter.

obj_dir is PATH-BASED: it lives on the worker that ran the action, so chained
actions (syn -> syn-to-par -> par) and report fetches
(:meth:`HammerNode.collect`) must land on the SAME worker. :class:`HammerNode`
enforces that via a placement group (see
:class:`chia.base.colocated.ColocatedNode` for the given / reserved / no-PG
construction modes).
``HammerNode.<fn>.chia_remote(...)`` (the class attribute) is the raw,
unpinned form for callers that handle placement themselves.

This module knows nothing about technologies, tools, or sites — all of that
arrives via the project's configs.
"""

import glob as _glob
import json
import logging
import os
import signal
import subprocess
from dataclasses import dataclass, field

from chia.base.ChiaFunction import ChiaFunction
from chia.base.colocated import ColocatedNode

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------

@dataclass
class HammerResult:
    """Outcome of one hammer action, as handed back to the caller.

    Attributes:
        success: Whether the action is considered to have succeeded.
        returncode: Exit status of the hammer process.
        action: Name of the action that was run.
        obj_dir: Build directory, as a path on the worker that ran the action.
        stdout: Captured standard output of the process.
        stderr: Captured standard error of the process.
        output: Parsed contents of the action's ``-o`` output config. Feed it
            to the next action (syn -> syn-to-par -> par) via
            ``config_contents``.
        listing: Manifest of every file under ``obj_dir`` after the run,
            relative path -> size in bytes. Contents stay on the worker;
            fetch them selectively with ``HammerNode.collect`` pinned to the
            same bundle.
    """

    success: bool
    returncode: int
    action: str
    obj_dir: str
    stdout: str
    stderr: str
    output: dict = field(default_factory=dict)
    listing: dict[str, int] = field(default_factory=dict)
@dataclass
class HammerCollectResult:
    """Payload returned by ``HammerNode.collect``.

    Attributes:
        obj_dir: The build directory the files were collected from.
        files: Relative path -> text contents (decoded with
            ``errors="replace"``).
        skipped: Relative path -> size for files that matched a pattern but
            were over ``max_bytes_per_file``.
        listing: Fresh manifest of ``obj_dir`` taken at collect time,
            relative path -> size in bytes.
    """

    obj_dir: str
    files: dict[str, str]
    skipped: dict[str, int]
    listing: dict[str, int]
@dataclass
class HammerMatchResult:
    """Manifest returned by ``HammerNode.list_matches``.

    Attributes:
        obj_dir: The build directory the patterns were resolved against.
        matches: ``(relpath, size)`` pairs for files within the size cap, in
            first-seen order.
        skipped: Relative path -> size for files that matched a pattern but
            were over ``max_bytes_per_file``.
    """

    obj_dir: str
    matches: list[tuple[str, int]]
    skipped: dict[str, int]
@dataclass
class HammerCollectFsResult:
    """Summary returned by ``HammerNode.collect_fs``.

    Attributes:
        obj_dir: Source directory, on the worker.
        dest_dir: Where the files were written, on the CALLER's disk.
        copied: Relative path -> bytes written; each file is at
            ``dest_dir/<relpath>``.
        skipped: Relative path -> size for files that matched a pattern but
            were over ``max_bytes_per_file``.
    """

    obj_dir: str
    dest_dir: str
    copied: dict[str, int]
    skipped: dict[str, int]
# --------------------------------------------------------------------------- # Worker-side helpers (module-level so they resolve by import on the worker) # --------------------------------------------------------------------------- def _list_files(obj_dir: str) -> dict[str, int]: """Manifest of every file under obj_dir: relative path -> size in bytes.""" listing: dict[str, int] = {} for root, _dirs, names in os.walk(obj_dir): for name in names: path = os.path.join(root, name) try: listing[os.path.relpath(path, obj_dir)] = os.path.getsize(path) except OSError: pass # dangling symlink etc. return listing def _match_files( base_dir: str, patterns: list[str], max_bytes_per_file: int | None, ) -> tuple[list[tuple[str, str, int]], dict[str, int]]: """Resolve *patterns* (globs relative to *base_dir*, ``**`` recursive) to matching files. Returns ``(matches, skipped)`` where ``matches`` is a list of ``(relpath, abspath, size)`` for files within the size cap — deduped across overlapping patterns, in first-seen order — and ``skipped`` maps relpath -> size for files matched but over ``max_bytes_per_file`` (``None`` or 0 disables the cap). Shared by :meth:`HammerNode.collect` and :meth:`HammerNode.collect_fs` so their glob/dedup/cap semantics are identical. """ matches: list[tuple[str, str, int]] = [] skipped: dict[str, int] = {} seen: set[str] = set() for pattern in patterns: for path in _glob.glob(os.path.join(base_dir, pattern), recursive=True): if not os.path.isfile(path): continue rel = os.path.relpath(path, base_dir) if rel in seen: continue try: size = os.path.getsize(path) except OSError: continue seen.add(rel) if max_bytes_per_file and (size > max_bytes_per_file): skipped[rel] = size continue matches.append((rel, path, size)) return matches, skipped # --------------------------------------------------------------------------- # HammerNode # ---------------------------------------------------------------------------
class HammerNode(ColocatedNode):
    """Hammer run / collect primitives sharing one placement.

    The members are ``@staticmethod @ChiaFunction(resources={"hammer": 1})``;
    ``__init__`` re-binds each into a per-instance pinned form so
    ``node.<fn>.chia_remote(...)`` lands on this node's bundle::

        with HammerNode() as node:   # reserves a {"CPU": 1, "hammer": 1} PG
            syn = get(node.run.chia_remote(
                "syn", configs=[...], obj_dir="/scratch/build/run1"))
            rpts = get(node.collect.chia_remote(
                syn.obj_dir, ["syn-rundir/reports/**"]))
            par = get(node.run.chia_remote("syn-to-par", ...))   # same worker

    ``collect`` requires the default bundle's hammer slot, so a collect
    dispatched while a run executes on this bundle waits for it in the default
    case — it cannot read half-written reports.
    """

    # Pinned @ChiaFunction members. collect_fs is intentionally absent: it is a
    # caller-side orchestrator (a plain method) that drives list_matches /
    # read_chunk, not a task that runs on the worker.
    _MEMBER_FNS = ("run", "collect", "list_matches", "read_chunk")
    # NOTE(review): presumably the bundle ColocatedNode reserves when it
    # creates the placement group itself — confirm against ColocatedNode.
    _DEFAULT_BUNDLE = {"CPU": 1, "hammer": 1}
[docs] @staticmethod @ChiaFunction(resources={"hammer": 1}) def run( action: str, configs: list[str] | None = None, config_contents: dict[str, str] | None = None, obj_dir: str = "build", extra_args: list[str] | None = None, hammer_bin: str = "hammer-vlsi", timeout_seconds: int = 86400, ) -> HammerResult: """Run one ``hammer-vlsi`` action on a worker. Args: action: Any action hammer-vlsi accepts: "syn", "par", "syn-to-par", ... configs: Paths to config YAML/JSON files that exist *on the worker* (baked into the image or mounted), passed as ``-p`` in order (later files override earlier ones). config_contents: filename -> YAML/JSON text, written into ``obj_dir/configs/`` on the worker and appended as ``-p`` after ``configs``. This is how a flow ships configs (or a previous action's ``output``) by value to a remote worker. obj_dir: Hammer build directory on the worker. extra_args: Extra CLI args, inserted before the action. hammer_bin: The hammer executable, or a custom CLIDriver script. timeout_seconds: Wall-clock limit for the subprocess. """ obj_dir = os.path.abspath(obj_dir) os.makedirs(obj_dir, exist_ok=True) config_args = [] for path in configs or []: config_args += ["-p", path] if config_contents: staged = os.path.join(obj_dir, "configs") os.makedirs(staged, exist_ok=True) for filename, text in config_contents.items(): path = os.path.join(staged, filename) with open(path, "w") as f: f.write(text) config_args += ["-p", path] output_json = os.path.join(obj_dir, f"{action}-output.json") cmd = [hammer_bin, *config_args, "--obj_dir", obj_dir, "-o", output_json, *(extra_args or []), action] logger.info(f"Running: {' '.join(cmd)}") # start_new_session puts the whole tool tree in one process group; # chia's pid_registry tracks the pgid so chia_cancel() can kill it. 
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, start_new_session=True) try: stdout, stderr = proc.communicate(timeout=timeout_seconds) except subprocess.TimeoutExpired: os.killpg(proc.pid, signal.SIGKILL) stdout, stderr = proc.communicate() stderr += f"\nhammer {action} timed out after {timeout_seconds}s" logger.error(f"hammer {action} timed out after {timeout_seconds}s") output = {} if proc.returncode == 0 and os.path.isfile(output_json): with open(output_json) as f: output = json.load(f) if proc.returncode != 0: logger.error(f"hammer {action} failed (rc={proc.returncode}); " f"stderr tail: {stderr[-500:] if stderr else '(empty)'}") return HammerResult( success=proc.returncode == 0, returncode=proc.returncode, action=action, obj_dir=obj_dir, stdout=stdout, stderr=stderr, output=output, listing=_list_files(obj_dir), )
[docs] @staticmethod @ChiaFunction(resources={"hammer": 1}) def collect( obj_dir: str, patterns: list[str], max_bytes_per_file: int | None = None, ) -> HammerCollectResult: """Fetch text files from a previous action's obj_dir on this worker. Dispatch via the pinned instance member (``node.collect.chia_remote``) so it lands on the worker that owns obj_dir — an unpinned call may not. Args: obj_dir: The build directory a previous action ran in. patterns: Globs relative to obj_dir (``**`` is recursive), e.g. ``["syn-rundir/reports/**", "syn-rundir/*.log"]``. Files matched by multiple patterns appear once. max_bytes_per_file: When set, files over this size are recorded in ``skipped`` instead of shipped through the object store — protects against a glob accidentally matching a netlist. ``None`` (and 0, the falsy edge) means no cap: everything matched is shipped. """ obj_dir = os.path.abspath(obj_dir) matches, skipped = _match_files(obj_dir, patterns, max_bytes_per_file) files: dict[str, str] = {} for rel, path, _size in matches: with open(path, errors="replace") as f: files[rel] = f.read() if skipped: logger.warning( f"hammer collect skipped {len(skipped)} file(s) over " f"{max_bytes_per_file} bytes: {sorted(skipped)[:5]}" ) return HammerCollectResult( obj_dir=obj_dir, files=files, skipped=skipped, listing=_list_files(obj_dir), )
[docs] @staticmethod @ChiaFunction(resources={"hammer": 1}) def list_matches( obj_dir: str, patterns: list[str], max_bytes_per_file: int | None = None, ) -> HammerMatchResult: """Resolve *patterns* against obj_dir on the worker and return the manifest of matching files (relpath + size) without reading contents. The planning half of :meth:`collect_fs`: it tells the caller what to stream and how big each file is. Same glob/dedup/cap rules as :meth:`collect`. """ obj_dir = os.path.abspath(obj_dir) matches, skipped = _match_files(obj_dir, patterns, max_bytes_per_file) return HammerMatchResult( obj_dir=obj_dir, matches=[(rel, size) for rel, _path, size in matches], skipped=skipped, )
[docs] @staticmethod @ChiaFunction(resources={"hammer": 1}) def read_chunk(obj_dir: str, rel: str, offset: int, length: int) -> bytes: """Read ``length`` bytes at ``offset`` from ``obj_dir/rel`` on the worker. The transfer primitive behind :meth:`collect_fs`; returns ``b""`` at or past EOF. ``rel`` is confined to obj_dir.""" obj_dir = os.path.abspath(obj_dir) path = os.path.abspath(os.path.join(obj_dir, rel)) if path != obj_dir and not path.startswith(obj_dir + os.sep): raise ValueError(f"rel {rel!r} escapes obj_dir") with open(path, "rb") as f: f.seek(offset) return f.read(length)
[docs] def collect_fs( self, obj_dir: str, patterns: list[str], dest_dir: str, max_bytes_per_file: int | None = None, chunk_bytes: int = 16 * 1024 * 1024, ) -> HammerCollectFsResult: """Stream matching files from a previous action's obj_dir onto the filesystem of THIS (the calling) process, a chunk at a time. Unlike :meth:`collect`, which returns every file's contents in one object-store payload, this writes each file to ``dest_dir`` on the caller's local disk incrementally — peak memory is ~``chunk_bytes``, not the size of the whole collection. Use it to pull large report trees / gate-level netlists back from the worker to a machine that does NOT share a filesystem with it. This is a caller-side orchestrator, not a ``@ChiaFunction``: it runs wherever you call it and writes to that machine's disk, pulling bytes from the obj_dir worker via the node's pinned :meth:`list_matches` / :meth:`read_chunk` members. It therefore needs a placement group so both members hit the one worker that owns obj_dir — construct the node with ``require_colocated=True`` or pass ``placement_group=...``. Files keep their path relative to obj_dir: a match at ``obj_dir/syn-rundir/reports/x.rpt`` lands at ``dest_dir/syn-rundir/reports/x.rpt``. Args: obj_dir: The build directory a previous action ran in (on the worker). patterns: Globs relative to obj_dir (``**`` recursive); same matching/dedup/cap rules as :meth:`collect`. dest_dir: Destination directory on the calling machine; created as needed. max_bytes_per_file: When set, files over this size are recorded in ``skipped`` and not streamed. ``None`` (and 0) means no cap. chunk_bytes: Bytes per worker read — the memory bound per file. """ from chia.base.ChiaFunction import get if not self._sched_opts: raise RuntimeError( "collect_fs needs a placement group so list_matches and " "read_chunk hit the same worker; construct HammerNode with " "require_colocated=True or pass placement_group=..." 
) dest_dir = os.path.abspath(dest_dir) manifest = get(self.list_matches.chia_remote( obj_dir, patterns, max_bytes_per_file)) copied: dict[str, int] = {} for rel, _size in manifest.matches: dst = os.path.join(dest_dir, rel) os.makedirs(os.path.dirname(dst), exist_ok=True) written = 0 with open(dst, "wb") as out: while True: data = get(self.read_chunk.chia_remote( manifest.obj_dir, rel, written, chunk_bytes)) if not data: break out.write(data) written += len(data) del data # free the chunk before fetching the next copied[rel] = written if manifest.skipped: logger.warning( f"hammer collect_fs skipped {len(manifest.skipped)} file(s) over " f"{max_bytes_per_file} bytes: {sorted(manifest.skipped)[:5]}" ) logger.info(f"hammer collect_fs streamed {len(copied)} file(s) to {dest_dir}") return HammerCollectFsResult( obj_dir=manifest.obj_dir, dest_dir=dest_dir, copied=copied, skipped=manifest.skipped, )