Source code for chia.aws.s3

"""S3 object-store client bound to one bucket, exposed as a chia service-pattern node.

``S3Node`` binds to one
bucket at construction and exposes a small, synchronous API. Errors are raised
as typed exceptions (no in-band ``success: bool``); see ``S3Error`` and its
subclasses. Transient failures (5xx, throttling, network errors) are retried
once before raising.

Usage::

    from chia.aws.s3 import S3Node, S3NotFoundError

    node = S3Node("my-bucket", region="us-west-2")
    node.put_bytes("results/run1.json", b"{}")
    data = node.get_bytes("results/run1.json")
    node.upload_file("/tmp/waves.vcd", "waves/run1.vcd")

For anything beyond this surface (presigned URLs, multipart tuning, ...),
drop down to the raw boto3 client via ``node._client``.
"""

from __future__ import annotations

import logging
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable

import boto3
from boto3.exceptions import RetriesExceededError, S3UploadFailedError
from botocore.config import Config as BotoConfig
from botocore.exceptions import (
    ClientError,
    ConnectionError as BotoConnectionError,
    HTTPClientError,
    NoCredentialsError,
)

from chia.cluster.log import get_logger


# ---------------------------------------------------------------------------
# Exceptions
# ---------------------------------------------------------------------------

class S3Error(Exception):
    """Root of the S3 node exception hierarchy.

    The AWS error code, when there is one, is stored on ``code``; it is an
    empty string otherwise.
    """

    def __init__(self, message: str = "", code: str = ""):
        super().__init__(message)
        self.code = code
class S3AuthError(S3Error):
    """Credentials are missing or invalid, or access was denied (401/403/AccessDenied)."""
class S3NotFoundError(S3Error):
    """The object or bucket does not exist (404/NoSuchKey/NoSuchBucket)."""
class S3RequestError(S3Error):
    """A client-side error that is not transient (other 4xx, e.g. BucketAlreadyExists)."""
class S3ServerError(S3Error):
    """A 5xx/throttling response that survived one retry, or a network/timeout failure."""
# head_object/head_bucket are HEAD requests and carry no error body, so
# botocore reports bare numeric codes ("404", "403") for them; those are
# listed here next to the named codes.
_NOT_FOUND_CODES = frozenset(("NoSuchKey", "NoSuchBucket", "404", "NotFound"))

_AUTH_CODES = frozenset((
    "AccessDenied",
    "InvalidAccessKeyId",
    "SignatureDoesNotMatch",
    "ExpiredToken",
    "TokenRefreshRequired",
    "403",
    "Forbidden",
))

_TRANSIENT_CODES = frozenset((
    "InternalError",
    "ServiceUnavailable",
    "SlowDown",
    "RequestTimeout",
    "Throttling",
    "ThrottlingException",
    "RequestLimitExceeded",
    "RequestTimeTooSkewed",
))


# ---------------------------------------------------------------------------
# Node
# ---------------------------------------------------------------------------
@dataclass()
class S3ObjectInfo:
    """One row of a ``list`` result: serializable metadata for a single S3 object."""

    key: str
    size: int
    # ISO-8601 string
    last_modified: str
    # surrounding quotes stripped
    etag: str
class S3Node:
    """Client for one S3 bucket.

    Service-pattern node (head-node only, not a Ray task). Bind to one bucket
    at construction; all methods are synchronous and raise typed exceptions
    on failure. Credentials come from the default boto3 chain unless
    overridden — see ``__init__`` for the resolution order.
    """

    logging_name = "S3Node"

    # Retry policy applied by ``_call``: total tries per operation (the first
    # try plus one retry, mirroring GithubIssuesNode._request) and the pause
    # between tries, in seconds.
    _MAX_ATTEMPTS = 2
    _RETRY_DELAY_SECONDS = 2

    def __init__(
        self,
        bucket: str,
        region: str | None = None,
        profile: str | None = None,
        aws_access_key_id: str | None = None,
        aws_secret_access_key: str | None = None,
        aws_session_token: str | None = None,
        timeout_seconds: int = 60,
        logging_level: int = logging.DEBUG,
    ):
        """Bind to ``bucket``.

        Credential resolution, in order of precedence:

        1. Explicit keys — pass ``aws_access_key_id`` and
           ``aws_secret_access_key`` together (plus ``aws_session_token`` for
           temporary/STS credentials). Use this to ship credentials by value
           into an environment with no ``~/.aws`` or instance role, e.g. a
           docker worker::

               key, secret, token = load_aws_creds(creds_dir)   # on the head node
               node = S3Node(bucket,                            # on the worker
                             aws_access_key_id=key,
                             aws_secret_access_key=secret,
                             aws_session_token=token or None)

           Empty strings are treated as "not provided" (so a blank
           ``load_aws_creds`` tuple falls through to the default chain), but
           passing only one of key/secret, a token without both keys, or
           explicit keys together with ``profile`` raises ``ValueError``.
        2. ``profile`` — a named profile from ``~/.aws``.
        3. Neither — boto3's default chain (env vars, ``~/.aws``, instance
           metadata / IAM role). The right choice on the head node and on EC2.

        Credentials are resolved lazily by boto3: nothing is validated here,
        and missing/invalid credentials surface as ``S3AuthError`` on the
        first call.

        Args:
            bucket: Name of the bucket every method operates on.
            region: Region handed to the boto3 session. ``self.region`` falls
                back to ``"us-east-1"`` when the session resolves none.
            profile: Named profile handed to the boto3 session.
            aws_access_key_id: Explicit access key (see above).
            aws_secret_access_key: Explicit secret key (see above).
            aws_session_token: Explicit session token (see above).
            timeout_seconds: Used as both the connect and the read timeout of
                the underlying client.
            logging_level: Level set on the ``aws.s3`` logger.

        Raises:
            ValueError: On an inconsistent combination of explicit keys,
                token and ``profile``.
        """
        # Blank strings count as "not provided".
        aws_access_key_id = aws_access_key_id or None
        aws_secret_access_key = aws_secret_access_key or None
        aws_session_token = aws_session_token or None

        if (aws_access_key_id is None) != (aws_secret_access_key is None):
            raise ValueError(
                "aws_access_key_id and aws_secret_access_key must be "
                "provided together")
        if aws_session_token and aws_access_key_id is None:
            raise ValueError(
                "aws_session_token requires aws_access_key_id and "
                "aws_secret_access_key")
        if profile and aws_access_key_id:
            raise ValueError(
                "pass either profile or explicit aws_* keys, not both")

        self.bucket = bucket
        self._session = boto3.session.Session(
            profile_name=profile,
            region_name=region,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            aws_session_token=aws_session_token,
        )
        self.region = self._session.region_name or "us-east-1"

        self.logger = get_logger("aws.s3")
        self.logger.setLevel(logging_level)

        # botocore's built-in retries are disabled so the node's explicit
        # retry-once in ``_call`` is the only retry policy in play.
        # ``total_max_attempts`` counts the initial request, so 1 means
        # "never retry"; ``max_attempts`` counts retries *after* the initial
        # request and therefore cannot express that with a value of 1.
        self._client = self._session.client(
            "s3",
            config=BotoConfig(
                connect_timeout=timeout_seconds,
                read_timeout=timeout_seconds,
                retries={"total_max_attempts": 1, "mode": "standard"},
            ),
        )

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def upload_file(self, local_path: str | Path, key: str) -> None:
        """Upload a local file to ``key`` (multipart for large files)."""
        local = Path(local_path)
        self._call(
            f"upload_file {local} -> s3://{self.bucket}/{key}",
            self._client.upload_file,
            str(local),
            self.bucket,
            key,
        )

    def download_file(self, key: str, local_path: str | Path) -> None:
        """Download ``key`` to a local path, creating parent directories."""
        local = Path(local_path)
        local.parent.mkdir(parents=True, exist_ok=True)
        self._call(
            f"download_file s3://{self.bucket}/{key} -> {local}",
            self._client.download_file,
            self.bucket,
            key,
            str(local),
        )

    def put_bytes(self, key: str, data: bytes, content_type: str | None = None) -> None:
        """Write ``data`` to ``key``.

        ``ContentType`` is only sent when ``content_type`` is not ``None``.
        """
        kwargs: dict[str, Any] = {"Bucket": self.bucket, "Key": key, "Body": data}
        if content_type is not None:
            kwargs["ContentType"] = content_type
        self._call(f"put_object s3://{self.bucket}/{key}",
                   self._client.put_object, **kwargs)

    def get_bytes(self, key: str) -> bytes:
        """Return the contents of ``key``."""

        def _get() -> bytes:
            # The body is read inside the retried callable, so a failure
            # while streaming goes through the same error mapping as the
            # request itself.
            resp = self._client.get_object(Bucket=self.bucket, Key=key)
            return resp["Body"].read()

        return self._call(f"get_object s3://{self.bucket}/{key}", _get)

    def list(self, prefix: str = "", max_keys: int | None = None) -> list[S3ObjectInfo]:
        """List objects under ``prefix``, up to ``max_keys`` (None = all).

        Includes zero-byte ``.../`` folder-marker keys if present; callers
        that don't want them should filter on ``key.endswith("/")``.

        A ``max_keys`` of zero or less returns an empty list without issuing
        a request.
        """
        if max_keys is not None and max_keys <= 0:
            return []

        def _list() -> list[S3ObjectInfo]:
            paginator = self._client.get_paginator("list_objects_v2")
            out: list[S3ObjectInfo] = []
            for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
                for obj in page.get("Contents", []):
                    out.append(S3ObjectInfo(
                        key=obj["Key"],
                        size=obj["Size"],
                        last_modified=obj["LastModified"].isoformat(),
                        etag=obj["ETag"].strip('"'),
                    ))
                    # Checked per object so the result never exceeds the cap
                    # and no further pages are fetched once it is reached.
                    if max_keys is not None and len(out) >= max_keys:
                        return out
            return out

        return self._call(f"list_objects_v2 s3://{self.bucket}/{prefix}", _list)

    def exists(self, key: str) -> bool:
        """Return whether ``key`` exists.

        A 403 raises ``S3AuthError`` rather than reporting ``False``.
        """
        try:
            self._call(f"head_object s3://{self.bucket}/{key}",
                       self._client.head_object, Bucket=self.bucket, Key=key)
        except S3NotFoundError:
            return False
        return True

    def delete(self, key: str) -> None:
        """Delete ``key``. Idempotent — S3 does not error on a missing key."""
        self._call(f"delete_object s3://{self.bucket}/{key}",
                   self._client.delete_object, Bucket=self.bucket, Key=key)

    def ensure_bucket(self) -> bool:
        """Create the bound bucket if it does not exist.

        Returns True if the bucket was created, False if it already existed.
        Raises ``S3AuthError`` if the bucket exists but is not accessible.
        """
        try:
            self._call(f"head_bucket s3://{self.bucket}",
                       self._client.head_bucket, Bucket=self.bucket)
            return False
        except S3NotFoundError:
            pass

        self.logger.info("Creating S3 bucket: %s", self.bucket)
        create_kwargs: dict[str, Any] = {"Bucket": self.bucket}
        # A location constraint is only sent for regions other than us-east-1.
        if self.region != "us-east-1":
            create_kwargs["CreateBucketConfiguration"] = {
                "LocationConstraint": self.region
            }
        try:
            self._call(f"create_bucket s3://{self.bucket}",
                       self._client.create_bucket, **create_kwargs)
        except S3RequestError as exc:
            if exc.code == "BucketAlreadyOwnedByYou":
                return False  # lost a creation race; the bucket exists
            raise
        return True

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------

    def _call(self, op: str, fn: Callable[..., Any], /, *args: Any, **kwargs: Any) -> Any:
        """Invoke a boto3 call, retrying once on transient failures and raising typed errors.

        Args:
            op: Human-readable description of the operation, used in log and
                error messages only.
            fn: Callable to invoke with ``*args`` / ``**kwargs``.

        Returns:
            Whatever ``fn`` returns.

        Raises:
            S3AuthError: No credentials were found, or the error maps to auth.
            S3NotFoundError: The error maps to "not found".
            S3RequestError: Any other non-transient client error.
            S3ServerError: A transient or network failure that persisted
                through the retry.
        """
        attempts = self._MAX_ATTEMPTS
        for attempt in range(attempts):
            can_retry = attempt + 1 < attempts
            try:
                return fn(*args, **kwargs)
            except NoCredentialsError as exc:
                raise S3AuthError(f"no AWS credentials found for {op}") from exc
            except S3UploadFailedError as exc:
                # The transfer manager wraps the original ClientError
                # (implicitly chained, so check __context__ too).
                cause = exc.__cause__ or exc.__context__
                if isinstance(cause, ClientError):
                    mapped = self._map_client_error(op, cause)
                else:
                    mapped = S3ServerError(f"upload failed on {op}: {exc}")
                if isinstance(mapped, S3ServerError) and can_retry:
                    self.logger.warning(
                        "Transient failure on %s (attempt %d), retrying: %s",
                        op, attempt + 1, exc)
                    time.sleep(self._RETRY_DELAY_SECONDS)
                    continue
                raise mapped from exc
            except (BotoConnectionError, HTTPClientError, RetriesExceededError) as exc:
                if can_retry:
                    self.logger.warning(
                        "Network error on %s (attempt %d), retrying: %s",
                        op, attempt + 1, exc)
                    time.sleep(self._RETRY_DELAY_SECONDS)
                    continue
                raise S3ServerError(f"network error on {op}: {exc}") from exc
            except ClientError as exc:
                mapped = self._map_client_error(op, exc)
                if isinstance(mapped, S3ServerError) and can_retry:
                    self.logger.warning(
                        "Transient %s on %s (attempt %d), retrying",
                        mapped.code or "error", op, attempt + 1)
                    time.sleep(self._RETRY_DELAY_SECONDS)
                    continue
                raise mapped from exc
        raise S3ServerError(f"exhausted retries for {op}")  # unreachable

    def _map_client_error(self, op: str, exc: ClientError) -> S3Error:
        """Map a botocore ClientError to the matching typed exception.

        Classification order is not-found, then auth, then transient; anything
        left over becomes ``S3RequestError``. The exception is returned, not
        raised, so the caller decides whether to retry.
        """
        err = exc.response.get("Error", {})
        code = err.get("Code", "")
        message = err.get("Message", "")
        status = exc.response.get("ResponseMetadata", {}).get("HTTPStatusCode", 0)
        detail = f"{code or status} on {op}: {message or exc}"

        if code in _NOT_FOUND_CODES or status == 404:
            return S3NotFoundError(detail, code=code)
        # NOTE(review): this check runs before the transient one, so a code
        # listed in _TRANSIENT_CODES that arrives with a 401/403 status is
        # classified as an auth error and never retried — confirm intended.
        if code in _AUTH_CODES or status in (401, 403):
            return S3AuthError(detail, code=code)
        if code in _TRANSIENT_CODES or 500 <= status < 600:
            return S3ServerError(detail, code=code)
        return S3RequestError(detail, code=code)