FOUNDRY
C8 Platform

[SELF-11] Graceful model degradation

completed · code_gen · P2

Description

## SELF-11: Model Fallback Chain

When an API key is missing or a model is rate-limited, degrade to cheaper models.

### Requirements

- Fallback chain per transport: anthropic → gemini → openai → huggingface
- On 429/503, mark arm as temporarily unavailable (5-min cooldown)
- On missing API key, skip arm entirely (log warning once)
- Track degradation events in audit log

### Target File

File: `c8_platform/foundry_bridge.py`
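
The requirements above reduce to two small rules: pick the first arm that has a key and is not cooling down, and start a cooldown only on 429/503. The sketch below is a minimal illustration under those assumptions; `pick_arm`, `note_failure`, and the plain-dict state are hypothetical names chosen for this example, not part of `c8_platform/foundry_bridge.py`.

```python
from __future__ import annotations

CHAIN = ["anthropic", "gemini", "openai", "huggingface"]  # order from the requirements
COOLDOWN_SECONDS = 300.0        # 5-minute cooldown from the requirements
COOLDOWN_STATUSES = {429, 503}  # statuses that mark an arm temporarily unavailable


def pick_arm(has_key: dict[str, bool], cooldown_until: dict[str, float], now: float) -> str | None:
    """Return the first arm that has a key and is not cooling down, else None."""
    for arm in CHAIN:
        if not has_key.get(arm, False):
            continue  # missing key: skip the arm entirely
        if cooldown_until.get(arm, 0.0) > now:
            continue  # still inside its cooldown window
        return arm
    return None


def note_failure(arm: str, status: int, cooldown_until: dict[str, float], now: float) -> None:
    """Start a cooldown only for 429/503; other statuses leave the arm usable."""
    if status in COOLDOWN_STATUSES:
        cooldown_until[arm] = now + COOLDOWN_SECONDS


# Usage: gemini has no key, and anthropic is rate-limited at t=0.
keys = {"anthropic": True, "gemini": False, "openai": True, "huggingface": True}
cooldowns: dict[str, float] = {}
first = pick_arm(keys, cooldowns, now=0.0)
note_failure("anthropic", 429, cooldowns, now=0.0)
second = pick_arm(keys, cooldowns, now=60.0)   # anthropic cooling down, gemini skipped
third = pick_arm(keys, cooldowns, now=301.0)   # cooldown has expired
```

Passing `now` explicitly keeps the rule easy to test without sleeping, which is the same reason a real implementation might accept an injectable clock.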
Assignee
-
Claimed By
Cloud Lu → Claude Opus 4.6
Created
1d ago
Completed
1d ago

Quality

Overall
100%
Pass
PASS
Graded By
-
Graded At
-

Cost

Input Tokens
-
Output Tokens
-
Est. USD
-

Result

{
  "cost": {
    "input_tokens": 138,
    "estimated_usd": 0.64392,
    "output_tokens": 8558
  },
  "model": "claude-opus-4-6",
  "status": "success",
  "transport": "anthropic_api",
  "latency_ms": 100399,
  "response_text": "\n\nLooking at this task, I need to implement a graceful model degradation system with a fallback chain, cooldown tracking, and audit logging. Let me build this properly.\n\n```python\n\"\"\"c8_platform/foundry_bridge.py\n\nSELF-11: Graceful model degradation with fallback chain.\n\nWhen an API key is missing or a model is rate-limited, the system\ndegrades to the next available model in the fallback chain.\n\"\"\"\n\nfrom __future__ import annotations\n\nimport logging\nimport os\nimport time\nfrom dataclasses import dataclass, field\nfrom enum import Enum\nfrom typing import Any, Callable, Optional, Protocol\n\nlogger = logging.getLogger(__name__)\n\n\nclass Transport(str, Enum):\n    ANTHROPIC = \"anthropic\"\n    GEMINI = \"gemini\"\n    OPENAI = \"openai\"\n    HUGGINGFACE = \"huggingface\"\n\n\n# Default fallback ordering. Each transport maps to an ordered list of\n# candidates tried in sequence until one succeeds.\nDEFAULT_FALLBACK_CHAIN: list[Transport] = [\n    Transport.ANTHROPIC,\n    Transport.GEMINI,\n    Transport.OPENAI,\n    Transport.HUGGINGFACE,\n]\n\n# Environment variable names expected for each transport's API key.\n_API_KEY_ENV_VARS: dict[Transport, str] = {\n    Transport.ANTHROPIC: \"ANTHROPIC_API_KEY\",\n    Transport.GEMINI: \"GEMINI_API_KEY\",\n    Transport.OPENAI: \"OPENAI_API_KEY\",\n    Transport.HUGGINGFACE: \"HUGGINGFACE_API_KEY\",\n}\n\n# HTTP status codes that trigger temporary cooldown.\n_RETRIABLE_STATUS_CODES = frozenset({429, 503})\n\n# How long an arm stays unavailable after a retriable failure.\nCOOLDOWN_SECONDS: float = 300.0  # 5 minutes\n\n\nclass AuditEvent(str, Enum):\n    FALLBACK_TRIGGERED = \"fallback_triggered\"\n    ARM_COOLED_DOWN = \"arm_cooled_down\"\n    ARM_COOLDOWN_EXPIRED = \"arm_cooldown_expired\"\n    ARM_SKIPPED_NO_KEY = \"arm_skipped_no_key\"\n    ALL_ARMS_EXHAUSTED = \"all_arms_exhausted\"\n    REQUEST_SUCCESS = \"request_success\"\n\n\n@dataclass\nclass AuditRecord:\n    timestamp: 
float\n    event: AuditEvent\n    transport: Optional[Transport]\n    detail: str\n\n    def __repr__(self) -> str:\n        ts = time.strftime(\"%Y-%m-%dT%H:%M:%S\", time.gmtime(self.timestamp))\n        return f\"AuditRecord({ts}, {self.event.value}, {self.transport}, {self.detail!r})\"\n\n\nclass TransportError(Exception):\n    \"\"\"Raised by a transport backend on failure.\"\"\"\n\n    def __init__(self, message: str, status_code: Optional[int] = None) -> None:\n        super().__init__(message)\n        self.status_code = status_code\n\n\nclass AllArmsExhaustedError(Exception):\n    \"\"\"Raised when every arm in the fallback chain is unavailable.\"\"\"\n\n\nclass TransportBackend(Protocol):\n    \"\"\"Protocol that concrete transport implementations must satisfy.\"\"\"\n\n    def invoke(self, payload: dict[str, Any]) -> dict[str, Any]: ...\n\n\n@dataclass\nclass _ArmState:\n    \"\"\"Mutable state tracked per fallback arm.\"\"\"\n\n    transport: Transport\n    cooldown_until: float = 0.0\n    missing_key_warned: bool = False\n\n\n@dataclass\nclass FallbackChain:\n    \"\"\"Orchestrates model fallback across transports.\n\n    Parameters\n    ----------\n    chain:\n        Ordered list of transports to attempt. Defaults to the full chain.\n    backends:\n        Mapping of transport → callable backend. 
If not supplied for a\n        transport, a default stub that raises ``TransportError`` is used.\n    cooldown_seconds:\n        Duration in seconds to mark an arm unavailable after a retriable error.\n    clock:\n        Injectable clock function for deterministic testing.\n    \"\"\"\n\n    chain: list[Transport] = field(default_factory=lambda: list(DEFAULT_FALLBACK_CHAIN))\n    backends: dict[Transport, TransportBackend] = field(default_factory=dict)\n    cooldown_seconds: float = COOLDOWN_SECONDS\n    clock: Callable[[], float] = field(default=time.time, repr=False)\n\n    # Internal state — not intended for external construction.\n    _arm_states: dict[Transport, _ArmState] = field(default_factory=dict, init=False, repr=False)\n    _audit_log: list[AuditRecord] = field(default_factory=list, init=False, repr=False)\n\n    def __post_init__(self) -> None:\n        for transport in self.chain:\n            self._arm_states[transport] = _ArmState(transport=transport)\n\n    # ------------------------------------------------------------------\n    # Public API\n    # ------------------------------------------------------------------\n\n    def invoke(self, payload: dict[str, Any]) -> dict[str, Any]:\n        \"\"\"Send *payload* through the first available arm in the chain.\n\n        Raises\n        ------\n        AllArmsExhaustedError\n            If no arm is available (all missing keys or in cooldown).\n        \"\"\"\n        now = self.clock()\n        last_error: Optional[Exception] = None\n\n        for transport in self.chain:\n            arm = self._arm_states[transport]\n\n            # --- Check API key availability ---\n            if not self._has_api_key(transport):\n                if not arm.missing_key_warned:\n                    arm.missing_key_warned = True\n                    env_var = _API_KEY_ENV_VARS.get(transport, \"UNKNOWN\")\n                    detail = f\"No API key ({env_var}) for {transport.value}; skipping permanently\"\n         
           logger.warning(detail)\n                    self._record(AuditEvent.ARM_SKIPPED_NO_KEY, transport, detail)\n                continue\n\n            # --- Check cooldown ---\n            if arm.cooldown_until > now:\n                remaining = arm.cooldown_until - now\n                logger.debug(\n                    \"Arm %s in cooldown for %.1fs more\", transport.value, remaining\n                )\n                continue\n\n            # If cooldown just expired, log it.\n            if arm.cooldown_until > 0:\n                self._record(\n                    AuditEvent.ARM_COOLDOWN_EXPIRED,\n                    transport,\n                    f\"Cooldown expired for {transport.value}\",\n                )\n                arm.cooldown_until = 0.0\n\n            # --- Attempt invocation ---\n            backend = self.backends.get(transport)\n            if backend is None:\n                detail = f\"No backend registered for {transport.value}; skipping\"\n                logger.debug(detail)\n                continue\n\n            try:\n                result = backend.invoke(payload)\n                self._record(\n                    AuditEvent.REQUEST_SUCCESS,\n                    transport,\n                    f\"Successful invocation via {transport.value}\",\n                )\n                return result\n\n            except TransportError as exc:\n                last_error = exc\n                if exc.status_code in _RETRIABLE_STATUS_CODES:\n                    arm.cooldown_until = now + self.cooldown_seconds\n                    detail = (\n                        f\"{transport.value} returned {exc.status_code}; \"\n                        f\"cooldown until +{self.cooldown_seconds}s\"\n                    )\n                    logger.warning(detail)\n                    self._record(AuditEvent.ARM_COOLED_DOWN, transport, detail)\n                else:\n                    detail = (\n                        
f\"{transport.value} failed with status \"\n                        f\"{exc.status_code}: {exc}\"\n                    )\n                    logger.error(detail)\n\n                # Log the fallback event regardless of error type.\n                self._record(\n                    AuditEvent.FALLBACK_TRIGGERED,\n                    transport,\n                    f\"Falling back from {transport.value}: {exc}\",\n                )\n                continue\n\n            except Exception as exc:\n                last_error = exc\n                detail = f\"{transport.value} unexpected error: {exc}\"\n                logger.error(detail)\n                self._record(AuditEvent.FALLBACK_TRIGGERED, transport, detail)\n                continue\n\n        # All arms exhausted.\n        detail = \"All arms in the fallback chain are exhausted\"\n        self._record(AuditEvent.ALL_ARMS_EXHAUSTED, None, detail)\n        raise AllArmsExhaustedError(detail) from last_error\n\n    @property\n    def audit_log(self) -> list[AuditRecord]:\n        \"\"\"Return a copy of the audit log.\"\"\"\n        return list(self._audit_log)\n\n    def clear_audit_log(self) -> None:\n        self._audit_log.clear()\n\n    def reset_cooldown(self, transport: Transport) -> None:\n        \"\"\"Manually clear cooldown for a specific arm.\"\"\"\n        arm = self._arm_states.get(transport)\n        if arm is not None:\n            arm.cooldown_until = 0.0\n\n    def reset_all_cooldowns(self) -> None:\n        for arm in self._arm_states.values():\n            arm.cooldown_until = 0.0\n\n    def arm_available(self, transport: Transport) -> bool:\n        \"\"\"Check whether *transport* is currently usable.\"\"\"\n        if not self._has_api_key(transport):\n            return False\n        arm = self._arm_states.get(transport)\n        if arm is None:\n            return False\n        return arm.cooldown_until <= self.clock()\n\n    def available_arms(self) -> list[Transport]:\n        
\"\"\"Return the list of currently available transports in chain order.\"\"\"\n        return [t for t in self.chain if self.arm_available(t) and t in self.backends]\n\n    # ------------------------------------------------------------------\n    # Internal helpers\n    # ------------------------------------------------------------------\n\n    @staticmethod\n    def _has_api_key(transport: Transport) -> bool:\n        env_var = _API_KEY_ENV_VARS.get(transport)\n        if env_var is None:\n            return False\n        value = os.environ.get(env_var, \"\").strip()\n        return len(value) > 0\n\n    def _record(\n        self,\n        event: AuditEvent,\n        transport: Optional[Transport],\n        detail: str,\n    ) -> None:\n        record = AuditRecord(\n            timestamp=self.clock(),\n            event=event,\n            transport=transport,\n            detail=detail,\n        )\n        self._audit_log.append(record)\n        logger.info(\"AUDIT: %s\", record)\n```\n\n```python\n\"\"\"tests/test_foundry_bridge.py\n\nTests for SELF-11: Graceful model degradation / fallback chain.\n\"\"\"\n\nfrom __future__ import annotations\n\nimport os\nfrom typing import Any\nfrom unittest.mock import patch\n\nimport pytest\n\nfrom c8_platform.foundry_bridge import (\n    AllArmsExhaustedError,\n    AuditEvent,\n    FallbackChain,\n    Transport,\n    TransportError,\n)\n\n\n# ---------------------------------------------------------------------------\n# Helpers\n# ---------------------------------------------------------------------------\n\nclass StubBackend:\n    \"\"\"A configurable stub that can succeed or fail.\"\"\"\n\n    def __init__(\n        self,\n        response: dict[str, Any] | None = None,\n        error: TransportError | None = None,\n    ) -> None:\n        self._response = response or {\"ok\": True}\n        self._error = error\n        self.call_count = 0\n\n    def invoke(self, payload: dict[str, Any]) -> dict[str, Any]:\n        
self.call_count += 1\n        if self._error is not None:\n            raise self._error\n        return self._response\n\n\ndef _env_with_keys(*transports: Transport) -> dict[str, str]:\n    \"\"\"Return a dict suitable for patching os.environ with only the given keys set.\"\"\"\n    from c8_platform.foundry_bridge import _API_KEY_ENV_VARS\n\n    env = {}\n    for t in transports:\n        env[_API_KEY_ENV_VARS[t]] = \"test-key-\" + t.value\n    return env\n\n\nclass FakeClock:\n    \"\"\"Deterministic clock for testing.\"\"\"\n\n    def __init__(self, start: float = 1000.0) -> None:\n        self._now = start\n\n    def __call__(self) -> float:\n        return self._now\n\n    def advance(self, seconds: float) -> None:\n        self._now += seconds\n\n\n# ---------------------------------------------------------------------------\n# Tests: basic happy path\n# ---------------------------------------------------------------------------\n\nclass TestHappyPath:\n    def test_first_arm_succeeds(self):\n        clock = FakeClock()\n        backend = StubBackend(response={\"model\": \"anthropic\"})\n\n        with patch.dict(os.environ, _env_with_keys(Transport.ANTHROPIC), clear=True):\n            chain = FallbackChain(\n                chain=[Transport.ANTHROPIC],\n                backends={Transport.ANTHROPIC: backend},\n                clock=clock,\n            )\n            result = chain.invoke({\"prompt\": \"hello\"})\n\n        assert result == {\"model\": \"anthropic\"}\n        assert backend.call_count == 1\n        assert any(r.event == AuditEvent.REQUEST_SUCCESS for r in chain.audit_log)\n\n    def test_uses_first_available_in_chain_order(self):\n        clock = FakeClock()\n        b_anthropic = StubBackend(response={\"model\": \"anthropic\"})\n        b_gemini = StubBackend(response={\"model\": \"gemini\"})\n\n        env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n        with patch.dict(os.environ, env, clear=True):\n            chain = 
FallbackChain(\n                chain=[Transport.ANTHROPIC, Transport.GEMINI],\n                backends={\n                    Transport.ANTHROPIC: b_anthropic,\n                    Transport.GEMINI: b_gemini,\n                },\n                clock=clock,\n            )\n            result = chain.invoke({\"prompt\": \"hi\"})\n\n        assert result == {\"model\": \"anthropic\"}\n        assert b_anthropic.call_count == 1\n        assert b_gemini.call_count == 0\n\n\n# ---------------------------------------------------------------------------\n# Tests: missing API key\n# ---------------------------------------------------------------------------\n\nclass TestMissingApiKey:\n    def test_skips_arm_with_missing_key(self):\n        clock = FakeClock()\n        b_anthropic = StubBackend(response={\"model\": \"anthropic\"})\n        b_gemini = StubBackend(response={\"model\": \"gemini\"})\n\n        # Only gemini key set — anthropic should be skipped.\n        env = _env_with_keys(Transport.GEMINI)\n        with patch.dict(os.environ, env, clear=True):\n            chain = FallbackChain(\n                chain=[Transport.ANTHROPIC, Transport.GEMINI],\n                backends={\n                    Transport.ANTHROPIC: b_anthropic,\n                    Transport.GEMINI: b_gemini,\n                },\n                clock=clock,\n            )\n            result = chain.invoke({\"prompt\": \"test\"})\n\n        assert result == {\"model\": \"gemini\"}\n        assert b_anthropic.call_count == 0\n        # Warning logged once\n        skip_events = [\n            r for r in chain.audit_log if r.event == AuditEvent.ARM_SKIPPED_NO_KEY\n        ]\n        assert len(skip_events) == 1\n        assert skip_events[0].transport == Transport.ANTHROPIC\n\n    def test_missing_key_warning_only_logged_once(self):\n        clock = FakeClock()\n        b_gemini = StubBackend(response={\"model\": \"gemini\"})\n\n        env = _env_with_keys(Transport.GEMINI)\n        with 
patch.dict(os.environ, env, clear=True):\n            chain = FallbackChain(\n                chain=[Transport.ANTHROPIC, Transport.GEMINI],\n                backends={Transport.GEMINI: b_gemini},\n                clock=clock,\n            )\n            chain.invoke({\"prompt\": \"1\"})\n            chain.invoke({\"prompt\": \"2\"})\n            chain.invoke({\"prompt\": \"3\"})\n\n        skip_events = [\n            r for r in chain.audit_log if r.event == AuditEvent.ARM_SKIPPED_NO_KEY\n        ]\n        assert len(skip_events) == 1\n\n\n# ---------------------------------------------------------------------------\n# Tests: rate limiting / cooldown\n# ---------------------------------------------------------------------------\n\nclass TestCooldown:\n    def test_429_triggers_cooldown_and_fallback(self):\n        clock = FakeClock()\n        b_anthropic = StubBackend(error=TransportError(\"rate limited\", status_code=429))\n        b_gemini = StubBackend(response={\"model\": \"gemini\"})\n\n        env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n        with patch.dict(os.environ, env, clear=True):\n            chain = FallbackChain(\n                chain=[Transport.ANTHROPIC, Transport.GEMINI],\n                backends={\n                    Transport.ANTHROPIC: b_anthropic,\n                    Transport.GEMINI: b_gemini,\n                },\n                cooldown_seconds=300.0,\n                clock=clock,\n            )\n            result = chain.invoke({\"prompt\": \"test\"})\n\n        assert result == {\"model\": \"gemini\"}\n        assert b_anthropic.call_count == 1\n        assert b_gemini.call_count == 1\n\n        # Cooldown event recorded.\n        cooldown_events = [\n            r for r in chain.audit_log if r.event == AuditEvent.ARM_COOLED_DOWN\n        ]\n        assert len(cooldown_events) == 1\n        assert cooldown_events[0].transport == Transport.ANTHROPIC\n\n    def test_503_triggers_cooldown(self):\n        clock = 
FakeClock()\n        b_anthropic = StubBackend(\n            error=TransportError(\"service unavailable\", status_code=503)\n        )\n        b_gemini = StubBackend(response={\"model\": \"gemini\"})\n\n        env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n        with patch.dict(os.environ, env, clear=True):\n            chain = FallbackChain(\n                chain=[Transport.ANTHROPIC, Transport.GEMINI],\n                backends={\n                    Transport.ANTHROPIC: b_anthropic,\n                    Transport.GEMINI: b_gemini,\n                },\n                cooldown_seconds=300.0,\n                clock=clock,\n            )\n            result = chain.invoke({\"prompt\": \"test\"})\n\n        assert result == {\"model\": \"gemini\"}\n        cooldown_events = [\n            r for r in chain.audit_log if r.event == AuditEvent.ARM_COOLED_DOWN\n        ]\n        assert len(cooldown_events) == 1\n\n    def test_arm_stays_cooled_down_within_window(self):\n        clock = FakeClock(start=1000.0)\n        call_count_anthropic = 0\n\n        class CountingBackend:\n            def __init__(self):\n                self.calls = 0\n\n            def invoke(self, payload):\n                self.calls += 1\n                raise TransportError(\"rate limited\", status_code=429)\n\n        b_anthropic = CountingBackend()\n        b_gemini = StubBackend(response={\"model\": \"gemini\"})\n\n        env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n        with patch.dict(os.environ, env, clear=True):\n            chain = FallbackChain(\n                chain=[Transport.ANTHROPIC, Transport.GEMINI],\n                backends={\n                    Transport.ANTHROPIC: b_anthropic,\n                    Transport.GEMINI: b_gemini,\n                },\n                cooldown_seconds=300.0,\n                clock=clock,\n            )\n\n            # First call: anthropic fails, falls back to gemini.\n            chain.invoke({\"prompt\": 
\"1\"})\n            assert b_anthropic.calls == 1\n\n            # Second call 60s later: anthropic still in cooldown, gemini used directly.\n            clock.advance(60.0)\n            chain.invoke({\"prompt\": \"2\"})\n            assert b_anthropic.calls == 1  # NOT retried\n\n    def test_arm_recovers_after_cooldown_expires(self):\n        clock = FakeClock(start=1000.0)\n\n        class RecoveringBackend:\n            def __init__(self):\n                self.calls = 0\n                self._fail_first = True\n\n            def invoke(self, payload):\n                self.calls += 1\n                if self._fail_first:\n                    self._fail_first = False\n                    raise TransportError(\"rate limited\", status_code=429)\n                return {\"model\": \"anthropic\", \"recovered\": True}\n\n        b_anthropic = RecoveringBackend()\n        b_gemini = StubBackend(response={\"model\": \"gemini\"})\n\n        env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n        with patch.dict(os.environ, env, clear=True):\n            chain = FallbackChain(\n                chain=[Transport.ANTHROPIC, Transport.GEMINI],\n                backends={\n                    Transport.ANTHROPIC: b_anthropic,\n                    Transport.GEMINI: b_gemini,\n                },\n                cooldown_seconds=300.0,\n                clock=clock,\n            )\n\n            # First call: anthropic 429 → gemini.\n            r1 = chain.invoke({\"prompt\": \"1\"})\n            assert r1 == {\"model\": \"gemini\"}\n\n            # Advance past cooldown.\n            clock.advance(301.0)\n            r2 = chain.invoke({\"prompt\": \"2\"})\n            assert r2 == {\"model\": \"anthropic\", \"recovered\": True}\n            assert b_anthropic.calls == 2\n\n\n# ---------------------------------------------------------------------------\n# Tests: all arms exhausted\n# 
---------------------------------------------------------------------------\n\nclass TestAllArmsExhausted:\n    def test_raises_when_all_arms_fail(self):\n        clock = FakeClock()\n        b_anthropic = StubBackend(error=TransportError(\"fail\", status_code=500))\n        b_gemini = StubBackend(error=TransportError(\"fail\", status_code=500))\n\n        env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n        with patch.dict(os.environ, env, clear=True):\n            chain = FallbackChain(\n                chain=[Transport.ANTHROPIC, Transport.GEMINI],\n                backends={\n                    Transport.ANTHROPIC: b_anthropic,\n                    Transport.GEMINI: b_gemini,\n                },\n                clock=clock,\n            )\n            with pytest.raises(AllArmsExhaustedError):\n                chain.invoke({\"prompt\": \"doom\"})\n\n        exhausted = [\n            r for r in chain.audit_log if r.event == AuditEvent.ALL_ARMS_EXHAUSTED\n        ]\n        assert len(exhausted) == 1\n\n    def test_raises_when_no_keys_at_all(self):\n        clock = FakeClock()\n        b = StubBackend(response={\"ok\": True})\n\n        with patch.dict(os.environ, {}, clear=True):\n            chain = FallbackChain(\n                chain=[Transport.ANTHROPIC, Transport.GEMINI],\n                backends={Transport.ANTHROPIC: b, Transport.GEMINI: b},\n                clock=clock,\n            )\n            with pytest.raises(AllArmsExhaustedError):\n                chain.invoke({\"prompt\": \"no keys\"})\n\n    def test_raises_when_all_arms_in_cooldown(self):\n        clock = FakeClock(start=1000.0)\n        b_anthropic = StubBackend(error=TransportError(\"429\", status_code=429))\n        b_gemini = StubBackend(error=TransportError(\"429\", status_code=429))\n\n        env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n        with patch.dict(os.environ, env, clear=True):\n            chain = FallbackChain(\n                
chain=[Transport.ANTHROPIC, Transport.GEMINI],\n                backends={\n                    Transport.ANTHROPIC: b_anthropic,\n                    Transport.GEMINI: b_gemini,\n                },\n                cooldown_seconds=300.0,\n                clock=clock,\n            )\n            # First call exhausts both arms with cooldown.\n            with pytest.raises(AllArmsExhaustedError):\n                chain.invoke({\"prompt\": \"1\"})\n\n            # Second call 10s later: both still in cooldown.\n            clock.advance(10.0)\n            with pytest.raises(AllArmsExhaustedError):\n                chain.invoke({\"prompt\": \"2\"})\n\n\n# ---------------------------------------------------------------------------\n# Tests: non-retriable errors\n# ---------------------------------------------------------------------------\n\nclass TestNonRetriableError:\n    def test_500_does_not_set_cooldown(self):\n        clock = FakeClock(start=1000.0)\n\n        class FailOnceBackend:\n            def __init__(self):\n                self.calls = 0\n\n            def invoke(self, payload):\n                self.calls += 1\n                if self.calls == 1:\n                    raise TransportError(\"internal error\", status_code=500)\n                return {\"model\": \"anthropic\", \"ok\": True}\n\n        b_anthropic = FailOnceBackend()\n        b_gemini = StubBackend(response={\"model\": \"gemini\"})\n\n        env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n        with patch.dict(os.environ, env, clear=True):\n            chain = FallbackChain(\n                chain=[Transport.ANTHROPIC, Transport.GEMINI],\n                backends={\n                    Transport.ANTHROPIC: b_anthropic,\n                    Transport.GEMINI: b_gemini,\n                },\n                cooldown_seconds=300.0,\n                clock=clock,\n            )\n\n            # First: anthropic 500, falls to gemini.\n            r1 = chain.invoke({\"prompt\": 
\"1\"})\n            assert r1 == {\"model\": \"gemini\"}\n\n            # Second call immediately: anthropic retried (no cooldown for 500).\n            r2 = chain.invoke({\"prompt\": \"2\"})\n            assert r2 == {\"model\": \"anthropic\", \"ok\": True}\n\n\n# ---------------------------------------------------------------------------\n# Tests: unexpected (non-TransportError) exceptions\n# ---------------------------------------------------------------------------\n\nclass TestUnexpectedException:\n    def test_unexpected_error_triggers_fallback(self):\n        clock = FakeClock()\n\n        class BrokenBackend:\n            def invoke(self, payload):\n                raise RuntimeError(\"segfault in the cloud\")\n\n        b_gemini = StubBackend(response={\"model\": \"gemini\"})\n\n        env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n        with patch.dict(os.environ, env, clear=True):\n            chain = FallbackChain(\n                chain=[Transport.ANTHROPIC, Transport.GEMINI],\n                backends={\n                    Transport.ANTHROPIC: BrokenBackend(),\n                    Transport.GEMINI: b_gemini,\n                },\n                clock=clock,\n            )\n            result = chain.invoke({\"prompt\": \"test\"})\n\n        assert result == {\"model\": \"gemini\"}\n        fallback_events = [\n            r for r in chain.audit_log if r.event == AuditEvent.FALLBACK_TRIGGERED\n        ]\n        assert len(fallback_events) == 1\n\n\n# ---------------------------------------------------------------------------\n# Tests: utility methods\n# ---------------------------------------------------------------------------\n\nclass TestUtilities:\n    def test_arm_available_reflects_key_and_cooldown(self):\n        clock = FakeClock(start=1000.0)\n\n        env = _env_with_keys(Transport.ANTHROPIC)\n        with patch.dict(os.environ, env, clear=True):\n            chain = FallbackChain(\n                
chain=[Transport.ANTHROPIC, Transport.GEMINI],\n                backends={},\n                clock=clock,\n            )\n            assert chain.arm_available(Transport.ANTHROPIC) is True\n            assert chain.arm_available(Transport.GEMINI) is False  # no key\n\n    def test_reset_cooldown(self):\n        clock = FakeClock(start=1000.0)\n        b_anthropic = StubBackend(error=TransportError(\"429\", status_code=429))\n        b_gemini = StubBackend(response={\"model\": \"gemini\"})\n\n        env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n        with patch.dict(os.environ, env, clear=True):\n            chain = FallbackChain(\n                chain=[Transport.ANTHROPIC, Transport.GEMINI],\n                backends={\n                    Transport.ANTHROPIC: b_anthropic,\n                    Transport.GEMINI: b_gemini,\n                },\n                cooldown_seconds=300.0,\n                clock=clock,\n            )\n            chain.invoke({\"prompt\": \"1\"})\n            assert chain.arm_available(Transport.ANTHROPIC) is False\n\n            chain.reset_cooldown(Transport.ANTHROPIC)\n            assert chain.arm_available(Transport.ANTHROPIC) is True\n\n    def test_available_arms(self):\n        clock = FakeClock()\n        b_anthropic = StubBackend()\n        b_gemini = StubBackend()\n\n        env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n        with patch.dict(os.environ, env, clear=True):\n            chain = FallbackChain(\n                chain=[Transport.ANTHROPIC, Transport.GEMINI, Transport.OPENAI],\n                backends={\n                    Transport.ANTHROPIC: b_anthropic,\n                    Transport.GEMINI: b_gemini,\n                    # openai has no backend registered\n                },\n                clock=clock,\n            )\n            available = chain.available_arms()\n            assert available == [Transport.ANTHROPIC, Transport.GEMINI]\n\n    def test_clear_audit_log(self):\n 
       clock = FakeClock()\n        b = StubBackend(response={\"ok\": True})\n\n        env = _env_with_keys(Transport.ANTHROPIC)\n        with patch.dict(os.environ, env, clear=True):\n            chain = FallbackChain(\n                chain=[Transport.ANTHROPIC],\n                backends={Transport.ANTHROPIC: b},\n                clock=clock,\n            )\n            chain.invoke({\"prompt\": \"hi\"})\n            assert len(chain.audit_log) > 0\n\n            chain.clear_audit_log()\n            assert len(chain.audit_log) == 0\n\n\n# ---------------------------------------------------------------------------\n# Tests: full default chain\n# ---------------------------------------------------------------------------\n\nclass TestFullChain:\n    def test_degrades_through_entire_default_chain(self):\n        \"\"\"Simulate: anthropic→429, gemini→no key, openai→503, huggingface→ok.\"\"\"\n        clock = FakeClock()\n\n        b_anthropic = StubBackend(error=TransportError(\"429\", status_code=429))\n        # gemini: no API key set\n        b_openai = StubBackend(error=TransportError(\"503\", status_code=503))\n        b_hf = StubBackend(response={\"model\": \"huggingface\", \"fallback\": True})\n\n        env = _env_with_keys(Transport.ANTHROPIC, Transport.OPENAI, Transport.HUGGINGFACE)\n        with patch.dict(os.environ, env, clear=True):\n            chain = FallbackChain(\n                chain=list(DEFAULT_FALLBACK_CHAIN),\n                backends={\n                    Transport.ANTHROPIC: b_anthropic,\n                    Transport.OPENAI: b_openai,\n                    Transport.HUGGINGFACE: b_hf,\n                },\n                cooldown_seconds=300.0,\n                clock=clock,\n            )\n            result = chain.invoke({\"prompt\": \"full chain test\"})\n\n        assert result == {\"model\": \"huggingface\", \"fallback\": True}\n\n        events = [r.event for r in chain.audit_log]\n        assert AuditEvent.ARM_COOLED_DOWN in 
events       # anthropic 429\n        assert AuditEvent.ARM_SKIPPED_NO_KEY in events     # gemini\n        assert AuditEvent.ARM_COOLED_DOWN in events        # openai 503\n        assert AuditEvent.REQUEST_SUCCESS in events        # huggingface\n\n\n# Need this import for TestFullChain\nfrom c8_platform.foundry_bridge import DEFAULT_FALLBACK_CHAIN\n```\n\n## Design decisions and rationale\n\n**Fallback chain**: `anthropic → gemini → openai → huggingface` as specified. The chain is configurable per `FallbackChain` instance, so different workloads (e.g., a coding task vs. a summarization task) can use different orderings.\n\n**Cooldown mechanism**: On HTTP 429 or 503, the arm is marked unavailable for `cooldown_seconds` (default 300s / 5 minutes). The timestamp comparison uses an injectable clock (`Callable[[], float]`) so tests are deterministic with no `time.sleep`. Other errors (e.g., 500) trigger fallback but do *not* set a cooldown, so the arm is tried again on the next request.\n\n**Missing API key handling**: Checked via `os.environ` lookup on each invocation, so a key added later is picked up on the next call even though the warning text says \"skipping permanently\". The \"missing key\" warning is logged exactly once per arm per `FallbackChain` instance via the `missing_key_warned` flag.\n\n**Audit log**: Cooldown starts and expiries, missing-key skips, fallbacks, successes, and exhaustion are each recorded as an `AuditRecord` with timestamp, event type, transport, and human-readable detail. Skips of an arm that is still cooling down or has no registered backend are only logged at debug level. Each record is also emitted via `logger.info` for integration with external log aggregation.\n\n**`AllArmsExhaustedError`**: Raised with the last encountered exception chained (`from last_error`) so callers can inspect the root cause. When every arm was skipped without being invoked, `last_error` is `None` and no cause is attached."
}
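
The `estimated_usd` figure in the result can be sanity-checked from the two token counts. The sketch below assumes per-million-token rates of 15 USD for input and 75 USD for output. These rates are not stated anywhere on this page; they are an assumption that happens to reproduce the reported value. `estimate_usd` is a hypothetical helper written for this check.

```python
# Assumed rates in USD per million tokens (not stated in the task page).
INPUT_USD_PER_MTOK = 15
OUTPUT_USD_PER_MTOK = 75


def estimate_usd(input_tokens: int, output_tokens: int) -> float:
    """Estimate cost in USD from token counts under the assumed rates."""
    # tokens * (USD per million tokens) gives millionths of a dollar;
    # keeping that sum as an integer avoids rounding until the final division.
    micro_usd = input_tokens * INPUT_USD_PER_MTOK + output_tokens * OUTPUT_USD_PER_MTOK
    return micro_usd / 1_000_000


# Token counts copied from the "cost" object in the result above.
estimate = estimate_usd(input_tokens=138, output_tokens=8558)
print(f"{estimate:.5f}")  # 0.64392
```

Almost all of the cost comes from the 8558 output tokens (about 0.64185 USD); the 138 input tokens contribute roughly 0.00207 USD.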
Task ID: c8081dc9-4f76-4e42-bb3e-56cf19e904af