# Source code for chia.base.tools.util

from __future__ import annotations

from contextlib import AsyncExitStack, asynccontextmanager
from typing import TYPE_CHECKING, List

if TYPE_CHECKING:
    from fastapi import FastAPI
    from mcp.server.fastmcp import FastMCP

def make_router_lifespan(mcp_instances: List[FastMCP]):
    """Build a lifespan that keeps every MCP session manager running together.

    This is useful when multiple MCP ASGI apps are mounted under one FastAPI
    ingress app (e.g. ``/gem5``, ``/chia``). Mounting should be done outside
    the lifespan; the lifespan only manages session manager lifetimes.

    Args:
        mcp_instances: The MCP servers whose ``session_manager.run()`` async
            context managers should be active while the app is serving. The
            list is read when the lifespan is entered, not when this factory
            is called.

    Returns:
        An async context manager factory taking the application object,
        suitable for passing as a ``lifespan`` callable.
    """

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # AsyncExitStack holds a variable number of async context managers
        # open at once and unwinds them in reverse order on exit, including
        # when a later session manager fails to start.
        async with AsyncExitStack() as stack:
            for mcp in mcp_instances:
                await stack.enter_async_context(mcp.session_manager.run())
            # All session managers are now active; hand control back to the
            # application until shutdown.
            yield

    return lifespan