[SELF-11] Graceful model degradation
completed · code_gen · P2
Description
## SELF-11: Model Fallback Chain
When an API key is missing or a provider is rate-limited, degrade to cheaper models.
### Requirements
- Fallback chain per transport: anthropic → gemini → openai → huggingface
- On 429/503, mark arm as temporarily unavailable (5-min cooldown)
- On missing API key, skip arm entirely (log warning once)
- Track degradation events in audit log
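
The requirements above can be sketched as a small arm tracker. This is a minimal illustration rather than the contents of `foundry_bridge.py`: the `ArmTracker` class, its `keys` mapping, and the audit tuple layout are all hypothetical names chosen for the example, and the clock is injected so the cooldown can be exercised without sleeping.

```python
import logging
import time
from typing import Callable, Optional

logger = logging.getLogger(__name__)

CHAIN = ["anthropic", "gemini", "openai", "huggingface"]  # order from the requirements
COOLDOWN_SECONDS = 300.0      # 5-minute cooldown
RETRIABLE_STATUS = {429, 503}


class ArmTracker:
    """Hypothetical helper that decides which arm to try next."""

    def __init__(
        self,
        keys: dict[str, str],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.keys = keys                      # transport name -> API key (assumed shape)
        self.clock = clock
        self.cooldown_until: dict[str, float] = {}
        self.warned: set[str] = set()         # arms already warned about a missing key
        self.audit: list[tuple[float, str, str]] = []  # (time, event, transport)

    def report_failure(self, transport: str, status: int) -> None:
        """Start a cooldown only for 429/503; other statuses leave the arm usable."""
        if status in RETRIABLE_STATUS:
            now = self.clock()
            self.cooldown_until[transport] = now + COOLDOWN_SECONDS
            self.audit.append((now, "cooldown", transport))

    def next_arm(self) -> Optional[str]:
        """Return the first arm with a key and no active cooldown, else None."""
        now = self.clock()
        for transport in CHAIN:
            if not self.keys.get(transport):
                if transport not in self.warned:   # warn and audit once per arm
                    self.warned.add(transport)
                    logger.warning("no API key for %s; skipping", transport)
                    self.audit.append((now, "skipped_no_key", transport))
                continue
            if self.cooldown_until.get(transport, float("-inf")) > now:
                continue
            return transport
        return None
```

A caller would ask `next_arm()` before each request and call `report_failure()` with the HTTP status when a request fails. Keeping the cooldown as an absolute deadline per arm means expiry needs no timer: the arm becomes eligible again as soon as the clock passes the stored value.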
### Target File
File: `c8_platform/foundry_bridge.py`
Assignee
-
Claimed By
Cloud Lu → Claude Opus 4.6
Created
1d ago
Completed
1d ago
Quality
Overall
100%
Pass
PASS
Graded By
-
Graded At
-
Cost
Input Tokens
-
Output Tokens
-
Est. USD
-
Result
{
"cost": {
"input_tokens": 138,
"estimated_usd": 0.64392,
"output_tokens": 8558
},
"model": "claude-opus-4-6",
"status": "success",
"transport": "anthropic_api",
"latency_ms": 100399,
"response_text": "\n\nLooking at this task, I need to implement a graceful model degradation system with a fallback chain, cooldown tracking, and audit logging. Let me build this properly.\n\n```python\n\"\"\"c8_platform/foundry_bridge.py\n\nSELF-11: Graceful model degradation with fallback chain.\n\nWhen an API key is missing or a model is rate-limited, the system\ndegrades to the next available model in the fallback chain.\n\"\"\"\n\nfrom __future__ import annotations\n\nimport logging\nimport os\nimport time\nfrom dataclasses import dataclass, field\nfrom enum import Enum\nfrom typing import Any, Callable, Optional, Protocol\n\nlogger = logging.getLogger(__name__)\n\n\nclass Transport(str, Enum):\n ANTHROPIC = \"anthropic\"\n GEMINI = \"gemini\"\n OPENAI = \"openai\"\n HUGGINGFACE = \"huggingface\"\n\n\n# Default fallback ordering. Each transport maps to an ordered list of\n# candidates tried in sequence until one succeeds.\nDEFAULT_FALLBACK_CHAIN: list[Transport] = [\n Transport.ANTHROPIC,\n Transport.GEMINI,\n Transport.OPENAI,\n Transport.HUGGINGFACE,\n]\n\n# Environment variable names expected for each transport's API key.\n_API_KEY_ENV_VARS: dict[Transport, str] = {\n Transport.ANTHROPIC: \"ANTHROPIC_API_KEY\",\n Transport.GEMINI: \"GEMINI_API_KEY\",\n Transport.OPENAI: \"OPENAI_API_KEY\",\n Transport.HUGGINGFACE: \"HUGGINGFACE_API_KEY\",\n}\n\n# HTTP status codes that trigger temporary cooldown.\n_RETRIABLE_STATUS_CODES = frozenset({429, 503})\n\n# How long an arm stays unavailable after a retriable failure.\nCOOLDOWN_SECONDS: float = 300.0 # 5 minutes\n\n\nclass AuditEvent(str, Enum):\n FALLBACK_TRIGGERED = \"fallback_triggered\"\n ARM_COOLED_DOWN = \"arm_cooled_down\"\n ARM_COOLDOWN_EXPIRED = \"arm_cooldown_expired\"\n ARM_SKIPPED_NO_KEY = \"arm_skipped_no_key\"\n ALL_ARMS_EXHAUSTED = \"all_arms_exhausted\"\n REQUEST_SUCCESS = \"request_success\"\n\n\n@dataclass\nclass AuditRecord:\n timestamp: float\n event: AuditEvent\n transport: Optional[Transport]\n 
detail: str\n\n def __repr__(self) -> str:\n ts = time.strftime(\"%Y-%m-%dT%H:%M:%S\", time.gmtime(self.timestamp))\n return f\"AuditRecord({ts}, {self.event.value}, {self.transport}, {self.detail!r})\"\n\n\nclass TransportError(Exception):\n \"\"\"Raised by a transport backend on failure.\"\"\"\n\n def __init__(self, message: str, status_code: Optional[int] = None) -> None:\n super().__init__(message)\n self.status_code = status_code\n\n\nclass AllArmsExhaustedError(Exception):\n \"\"\"Raised when every arm in the fallback chain is unavailable.\"\"\"\n\n\nclass TransportBackend(Protocol):\n \"\"\"Protocol that concrete transport implementations must satisfy.\"\"\"\n\n def invoke(self, payload: dict[str, Any]) -> dict[str, Any]: ...\n\n\n@dataclass\nclass _ArmState:\n \"\"\"Mutable state tracked per fallback arm.\"\"\"\n\n transport: Transport\n cooldown_until: float = 0.0\n missing_key_warned: bool = False\n\n\n@dataclass\nclass FallbackChain:\n \"\"\"Orchestrates model fallback across transports.\n\n Parameters\n ----------\n chain:\n Ordered list of transports to attempt. Defaults to the full chain.\n backends:\n Mapping of transport → callable backend. 
If not supplied for a\n transport, a default stub that raises ``TransportError`` is used.\n cooldown_seconds:\n Duration in seconds to mark an arm unavailable after a retriable error.\n clock:\n Injectable clock function for deterministic testing.\n \"\"\"\n\n chain: list[Transport] = field(default_factory=lambda: list(DEFAULT_FALLBACK_CHAIN))\n backends: dict[Transport, TransportBackend] = field(default_factory=dict)\n cooldown_seconds: float = COOLDOWN_SECONDS\n clock: Callable[[], float] = field(default=time.time, repr=False)\n\n # Internal state — not intended for external construction.\n _arm_states: dict[Transport, _ArmState] = field(default_factory=dict, init=False, repr=False)\n _audit_log: list[AuditRecord] = field(default_factory=list, init=False, repr=False)\n\n def __post_init__(self) -> None:\n for transport in self.chain:\n self._arm_states[transport] = _ArmState(transport=transport)\n\n # ------------------------------------------------------------------\n # Public API\n # ------------------------------------------------------------------\n\n def invoke(self, payload: dict[str, Any]) -> dict[str, Any]:\n \"\"\"Send *payload* through the first available arm in the chain.\n\n Raises\n ------\n AllArmsExhaustedError\n If no arm is available (all missing keys or in cooldown).\n \"\"\"\n now = self.clock()\n last_error: Optional[Exception] = None\n\n for transport in self.chain:\n arm = self._arm_states[transport]\n\n # --- Check API key availability ---\n if not self._has_api_key(transport):\n if not arm.missing_key_warned:\n arm.missing_key_warned = True\n env_var = _API_KEY_ENV_VARS.get(transport, \"UNKNOWN\")\n detail = f\"No API key ({env_var}) for {transport.value}; skipping permanently\"\n logger.warning(detail)\n self._record(AuditEvent.ARM_SKIPPED_NO_KEY, transport, detail)\n continue\n\n # --- Check cooldown ---\n if arm.cooldown_until > now:\n remaining = arm.cooldown_until - now\n logger.debug(\n \"Arm %s in cooldown for %.1fs more\", 
transport.value, remaining\n )\n continue\n\n # If cooldown just expired, log it.\n if arm.cooldown_until > 0:\n self._record(\n AuditEvent.ARM_COOLDOWN_EXPIRED,\n transport,\n f\"Cooldown expired for {transport.value}\",\n )\n arm.cooldown_until = 0.0\n\n # --- Attempt invocation ---\n backend = self.backends.get(transport)\n if backend is None:\n detail = f\"No backend registered for {transport.value}; skipping\"\n logger.debug(detail)\n continue\n\n try:\n result = backend.invoke(payload)\n self._record(\n AuditEvent.REQUEST_SUCCESS,\n transport,\n f\"Successful invocation via {transport.value}\",\n )\n return result\n\n except TransportError as exc:\n last_error = exc\n if exc.status_code in _RETRIABLE_STATUS_CODES:\n arm.cooldown_until = now + self.cooldown_seconds\n detail = (\n f\"{transport.value} returned {exc.status_code}; \"\n f\"cooldown until +{self.cooldown_seconds}s\"\n )\n logger.warning(detail)\n self._record(AuditEvent.ARM_COOLED_DOWN, transport, detail)\n else:\n detail = (\n f\"{transport.value} failed with status \"\n f\"{exc.status_code}: {exc}\"\n )\n logger.error(detail)\n\n # Log the fallback event regardless of error type.\n self._record(\n AuditEvent.FALLBACK_TRIGGERED,\n transport,\n f\"Falling back from {transport.value}: {exc}\",\n )\n continue\n\n except Exception as exc:\n last_error = exc\n detail = f\"{transport.value} unexpected error: {exc}\"\n logger.error(detail)\n self._record(AuditEvent.FALLBACK_TRIGGERED, transport, detail)\n continue\n\n # All arms exhausted.\n detail = \"All arms in the fallback chain are exhausted\"\n self._record(AuditEvent.ALL_ARMS_EXHAUSTED, None, detail)\n raise AllArmsExhaustedError(detail) from last_error\n\n @property\n def audit_log(self) -> list[AuditRecord]:\n \"\"\"Return a copy of the audit log.\"\"\"\n return list(self._audit_log)\n\n def clear_audit_log(self) -> None:\n self._audit_log.clear()\n\n def reset_cooldown(self, transport: Transport) -> None:\n \"\"\"Manually clear cooldown for a 
specific arm.\"\"\"\n arm = self._arm_states.get(transport)\n if arm is not None:\n arm.cooldown_until = 0.0\n\n def reset_all_cooldowns(self) -> None:\n for arm in self._arm_states.values():\n arm.cooldown_until = 0.0\n\n def arm_available(self, transport: Transport) -> bool:\n \"\"\"Check whether *transport* is currently usable.\"\"\"\n if not self._has_api_key(transport):\n return False\n arm = self._arm_states.get(transport)\n if arm is None:\n return False\n return arm.cooldown_until <= self.clock()\n\n def available_arms(self) -> list[Transport]:\n \"\"\"Return the list of currently available transports in chain order.\"\"\"\n return [t for t in self.chain if self.arm_available(t) and t in self.backends]\n\n # ------------------------------------------------------------------\n # Internal helpers\n # ------------------------------------------------------------------\n\n @staticmethod\n def _has_api_key(transport: Transport) -> bool:\n env_var = _API_KEY_ENV_VARS.get(transport)\n if env_var is None:\n return False\n value = os.environ.get(env_var, \"\").strip()\n return len(value) > 0\n\n def _record(\n self,\n event: AuditEvent,\n transport: Optional[Transport],\n detail: str,\n ) -> None:\n record = AuditRecord(\n timestamp=self.clock(),\n event=event,\n transport=transport,\n detail=detail,\n )\n self._audit_log.append(record)\n logger.info(\"AUDIT: %s\", record)\n```\n\n```python\n\"\"\"tests/test_foundry_bridge.py\n\nTests for SELF-11: Graceful model degradation / fallback chain.\n\"\"\"\n\nfrom __future__ import annotations\n\nimport os\nfrom typing import Any\nfrom unittest.mock import patch\n\nimport pytest\n\nfrom c8_platform.foundry_bridge import (\n AllArmsExhaustedError,\n AuditEvent,\n FallbackChain,\n Transport,\n TransportError,\n)\n\n\n# ---------------------------------------------------------------------------\n# Helpers\n# ---------------------------------------------------------------------------\n\nclass StubBackend:\n \"\"\"A configurable 
stub that can succeed or fail.\"\"\"\n\n def __init__(\n self,\n response: dict[str, Any] | None = None,\n error: TransportError | None = None,\n ) -> None:\n self._response = response or {\"ok\": True}\n self._error = error\n self.call_count = 0\n\n def invoke(self, payload: dict[str, Any]) -> dict[str, Any]:\n self.call_count += 1\n if self._error is not None:\n raise self._error\n return self._response\n\n\ndef _env_with_keys(*transports: Transport) -> dict[str, str]:\n \"\"\"Return a dict suitable for patching os.environ with only the given keys set.\"\"\"\n from c8_platform.foundry_bridge import _API_KEY_ENV_VARS\n\n env = {}\n for t in transports:\n env[_API_KEY_ENV_VARS[t]] = \"test-key-\" + t.value\n return env\n\n\nclass FakeClock:\n \"\"\"Deterministic clock for testing.\"\"\"\n\n def __init__(self, start: float = 1000.0) -> None:\n self._now = start\n\n def __call__(self) -> float:\n return self._now\n\n def advance(self, seconds: float) -> None:\n self._now += seconds\n\n\n# ---------------------------------------------------------------------------\n# Tests: basic happy path\n# ---------------------------------------------------------------------------\n\nclass TestHappyPath:\n def test_first_arm_succeeds(self):\n clock = FakeClock()\n backend = StubBackend(response={\"model\": \"anthropic\"})\n\n with patch.dict(os.environ, _env_with_keys(Transport.ANTHROPIC), clear=True):\n chain = FallbackChain(\n chain=[Transport.ANTHROPIC],\n backends={Transport.ANTHROPIC: backend},\n clock=clock,\n )\n result = chain.invoke({\"prompt\": \"hello\"})\n\n assert result == {\"model\": \"anthropic\"}\n assert backend.call_count == 1\n assert any(r.event == AuditEvent.REQUEST_SUCCESS for r in chain.audit_log)\n\n def test_uses_first_available_in_chain_order(self):\n clock = FakeClock()\n b_anthropic = StubBackend(response={\"model\": \"anthropic\"})\n b_gemini = StubBackend(response={\"model\": \"gemini\"})\n\n env = _env_with_keys(Transport.ANTHROPIC, 
Transport.GEMINI)\n with patch.dict(os.environ, env, clear=True):\n chain = FallbackChain(\n chain=[Transport.ANTHROPIC, Transport.GEMINI],\n backends={\n Transport.ANTHROPIC: b_anthropic,\n Transport.GEMINI: b_gemini,\n },\n clock=clock,\n )\n result = chain.invoke({\"prompt\": \"hi\"})\n\n assert result == {\"model\": \"anthropic\"}\n assert b_anthropic.call_count == 1\n assert b_gemini.call_count == 0\n\n\n# ---------------------------------------------------------------------------\n# Tests: missing API key\n# ---------------------------------------------------------------------------\n\nclass TestMissingApiKey:\n def test_skips_arm_with_missing_key(self):\n clock = FakeClock()\n b_anthropic = StubBackend(response={\"model\": \"anthropic\"})\n b_gemini = StubBackend(response={\"model\": \"gemini\"})\n\n # Only gemini key set — anthropic should be skipped.\n env = _env_with_keys(Transport.GEMINI)\n with patch.dict(os.environ, env, clear=True):\n chain = FallbackChain(\n chain=[Transport.ANTHROPIC, Transport.GEMINI],\n backends={\n Transport.ANTHROPIC: b_anthropic,\n Transport.GEMINI: b_gemini,\n },\n clock=clock,\n )\n result = chain.invoke({\"prompt\": \"test\"})\n\n assert result == {\"model\": \"gemini\"}\n assert b_anthropic.call_count == 0\n # Warning logged once\n skip_events = [\n r for r in chain.audit_log if r.event == AuditEvent.ARM_SKIPPED_NO_KEY\n ]\n assert len(skip_events) == 1\n assert skip_events[0].transport == Transport.ANTHROPIC\n\n def test_missing_key_warning_only_logged_once(self):\n clock = FakeClock()\n b_gemini = StubBackend(response={\"model\": \"gemini\"})\n\n env = _env_with_keys(Transport.GEMINI)\n with patch.dict(os.environ, env, clear=True):\n chain = FallbackChain(\n chain=[Transport.ANTHROPIC, Transport.GEMINI],\n backends={Transport.GEMINI: b_gemini},\n clock=clock,\n )\n chain.invoke({\"prompt\": \"1\"})\n chain.invoke({\"prompt\": \"2\"})\n chain.invoke({\"prompt\": \"3\"})\n\n skip_events = [\n r for r in chain.audit_log if 
r.event == AuditEvent.ARM_SKIPPED_NO_KEY\n ]\n assert len(skip_events) == 1\n\n\n# ---------------------------------------------------------------------------\n# Tests: rate limiting / cooldown\n# ---------------------------------------------------------------------------\n\nclass TestCooldown:\n def test_429_triggers_cooldown_and_fallback(self):\n clock = FakeClock()\n b_anthropic = StubBackend(error=TransportError(\"rate limited\", status_code=429))\n b_gemini = StubBackend(response={\"model\": \"gemini\"})\n\n env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n with patch.dict(os.environ, env, clear=True):\n chain = FallbackChain(\n chain=[Transport.ANTHROPIC, Transport.GEMINI],\n backends={\n Transport.ANTHROPIC: b_anthropic,\n Transport.GEMINI: b_gemini,\n },\n cooldown_seconds=300.0,\n clock=clock,\n )\n result = chain.invoke({\"prompt\": \"test\"})\n\n assert result == {\"model\": \"gemini\"}\n assert b_anthropic.call_count == 1\n assert b_gemini.call_count == 1\n\n # Cooldown event recorded.\n cooldown_events = [\n r for r in chain.audit_log if r.event == AuditEvent.ARM_COOLED_DOWN\n ]\n assert len(cooldown_events) == 1\n assert cooldown_events[0].transport == Transport.ANTHROPIC\n\n def test_503_triggers_cooldown(self):\n clock = FakeClock()\n b_anthropic = StubBackend(\n error=TransportError(\"service unavailable\", status_code=503)\n )\n b_gemini = StubBackend(response={\"model\": \"gemini\"})\n\n env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n with patch.dict(os.environ, env, clear=True):\n chain = FallbackChain(\n chain=[Transport.ANTHROPIC, Transport.GEMINI],\n backends={\n Transport.ANTHROPIC: b_anthropic,\n Transport.GEMINI: b_gemini,\n },\n cooldown_seconds=300.0,\n clock=clock,\n )\n result = chain.invoke({\"prompt\": \"test\"})\n\n assert result == {\"model\": \"gemini\"}\n cooldown_events = [\n r for r in chain.audit_log if r.event == AuditEvent.ARM_COOLED_DOWN\n ]\n assert len(cooldown_events) == 1\n\n def 
test_arm_stays_cooled_down_within_window(self):\n clock = FakeClock(start=1000.0)\n\n class CountingBackend:\n def __init__(self):\n self.calls = 0\n\n def invoke(self, payload):\n self.calls += 1\n raise TransportError(\"rate limited\", status_code=429)\n\n b_anthropic = CountingBackend()\n b_gemini = StubBackend(response={\"model\": \"gemini\"})\n\n env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n with patch.dict(os.environ, env, clear=True):\n chain = FallbackChain(\n chain=[Transport.ANTHROPIC, Transport.GEMINI],\n backends={\n Transport.ANTHROPIC: b_anthropic,\n Transport.GEMINI: b_gemini,\n },\n cooldown_seconds=300.0,\n clock=clock,\n )\n\n # First call: anthropic fails, falls back to gemini.\n chain.invoke({\"prompt\": \"1\"})\n assert b_anthropic.calls == 1\n\n # Second call 60s later: anthropic still in cooldown, gemini used directly.\n clock.advance(60.0)\n chain.invoke({\"prompt\": \"2\"})\n assert b_anthropic.calls == 1 # NOT retried\n\n def test_arm_recovers_after_cooldown_expires(self):\n clock = FakeClock(start=1000.0)\n\n class RecoveringBackend:\n def __init__(self):\n self.calls = 0\n self._fail_first = True\n\n def invoke(self, payload):\n self.calls += 1\n if self._fail_first:\n self._fail_first = False\n raise TransportError(\"rate limited\", status_code=429)\n return {\"model\": \"anthropic\", \"recovered\": True}\n\n b_anthropic = RecoveringBackend()\n b_gemini = StubBackend(response={\"model\": \"gemini\"})\n\n env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n with patch.dict(os.environ, env, clear=True):\n chain = FallbackChain(\n chain=[Transport.ANTHROPIC, Transport.GEMINI],\n backends={\n Transport.ANTHROPIC: b_anthropic,\n Transport.GEMINI: b_gemini,\n },\n cooldown_seconds=300.0,\n clock=clock,\n )\n\n # First call: anthropic 429 → gemini.\n r1 = chain.invoke({\"prompt\": \"1\"})\n assert r1 == {\"model\": \"gemini\"}\n\n # Advance past cooldown.\n clock.advance(301.0)\n r2 = 
chain.invoke({\"prompt\": \"2\"})\n assert r2 == {\"model\": \"anthropic\", \"recovered\": True}\n assert b_anthropic.calls == 2\n\n\n# ---------------------------------------------------------------------------\n# Tests: all arms exhausted\n# ---------------------------------------------------------------------------\n\nclass TestAllArmsExhausted:\n def test_raises_when_all_arms_fail(self):\n clock = FakeClock()\n b_anthropic = StubBackend(error=TransportError(\"fail\", status_code=500))\n b_gemini = StubBackend(error=TransportError(\"fail\", status_code=500))\n\n env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n with patch.dict(os.environ, env, clear=True):\n chain = FallbackChain(\n chain=[Transport.ANTHROPIC, Transport.GEMINI],\n backends={\n Transport.ANTHROPIC: b_anthropic,\n Transport.GEMINI: b_gemini,\n },\n clock=clock,\n )\n with pytest.raises(AllArmsExhaustedError):\n chain.invoke({\"prompt\": \"doom\"})\n\n exhausted = [\n r for r in chain.audit_log if r.event == AuditEvent.ALL_ARMS_EXHAUSTED\n ]\n assert len(exhausted) == 1\n\n def test_raises_when_no_keys_at_all(self):\n clock = FakeClock()\n b = StubBackend(response={\"ok\": True})\n\n with patch.dict(os.environ, {}, clear=True):\n chain = FallbackChain(\n chain=[Transport.ANTHROPIC, Transport.GEMINI],\n backends={Transport.ANTHROPIC: b, Transport.GEMINI: b},\n clock=clock,\n )\n with pytest.raises(AllArmsExhaustedError):\n chain.invoke({\"prompt\": \"no keys\"})\n\n def test_raises_when_all_arms_in_cooldown(self):\n clock = FakeClock(start=1000.0)\n b_anthropic = StubBackend(error=TransportError(\"429\", status_code=429))\n b_gemini = StubBackend(error=TransportError(\"429\", status_code=429))\n\n env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n with patch.dict(os.environ, env, clear=True):\n chain = FallbackChain(\n chain=[Transport.ANTHROPIC, Transport.GEMINI],\n backends={\n Transport.ANTHROPIC: b_anthropic,\n Transport.GEMINI: b_gemini,\n },\n cooldown_seconds=300.0,\n 
clock=clock,\n )\n # First call exhausts both arms with cooldown.\n with pytest.raises(AllArmsExhaustedError):\n chain.invoke({\"prompt\": \"1\"})\n\n # Second call 10s later: both still in cooldown.\n clock.advance(10.0)\n with pytest.raises(AllArmsExhaustedError):\n chain.invoke({\"prompt\": \"2\"})\n\n\n# ---------------------------------------------------------------------------\n# Tests: non-retriable errors\n# ---------------------------------------------------------------------------\n\nclass TestNonRetriableError:\n def test_500_does_not_set_cooldown(self):\n clock = FakeClock(start=1000.0)\n\n class FailOnceBackend:\n def __init__(self):\n self.calls = 0\n\n def invoke(self, payload):\n self.calls += 1\n if self.calls == 1:\n raise TransportError(\"internal error\", status_code=500)\n return {\"model\": \"anthropic\", \"ok\": True}\n\n b_anthropic = FailOnceBackend()\n b_gemini = StubBackend(response={\"model\": \"gemini\"})\n\n env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n with patch.dict(os.environ, env, clear=True):\n chain = FallbackChain(\n chain=[Transport.ANTHROPIC, Transport.GEMINI],\n backends={\n Transport.ANTHROPIC: b_anthropic,\n Transport.GEMINI: b_gemini,\n },\n cooldown_seconds=300.0,\n clock=clock,\n )\n\n # First: anthropic 500, falls to gemini.\n r1 = chain.invoke({\"prompt\": \"1\"})\n assert r1 == {\"model\": \"gemini\"}\n\n # Second call immediately: anthropic retried (no cooldown for 500).\n r2 = chain.invoke({\"prompt\": \"2\"})\n assert r2 == {\"model\": \"anthropic\", \"ok\": True}\n\n\n# ---------------------------------------------------------------------------\n# Tests: unexpected (non-TransportError) exceptions\n# ---------------------------------------------------------------------------\n\nclass TestUnexpectedException:\n def test_unexpected_error_triggers_fallback(self):\n clock = FakeClock()\n\n class BrokenBackend:\n def invoke(self, payload):\n raise RuntimeError(\"segfault in the cloud\")\n\n b_gemini = 
StubBackend(response={\"model\": \"gemini\"})\n\n env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n with patch.dict(os.environ, env, clear=True):\n chain = FallbackChain(\n chain=[Transport.ANTHROPIC, Transport.GEMINI],\n backends={\n Transport.ANTHROPIC: BrokenBackend(),\n Transport.GEMINI: b_gemini,\n },\n clock=clock,\n )\n result = chain.invoke({\"prompt\": \"test\"})\n\n assert result == {\"model\": \"gemini\"}\n fallback_events = [\n r for r in chain.audit_log if r.event == AuditEvent.FALLBACK_TRIGGERED\n ]\n assert len(fallback_events) == 1\n\n\n# ---------------------------------------------------------------------------\n# Tests: utility methods\n# ---------------------------------------------------------------------------\n\nclass TestUtilities:\n def test_arm_available_reflects_key_and_cooldown(self):\n clock = FakeClock(start=1000.0)\n\n env = _env_with_keys(Transport.ANTHROPIC)\n with patch.dict(os.environ, env, clear=True):\n chain = FallbackChain(\n chain=[Transport.ANTHROPIC, Transport.GEMINI],\n backends={},\n clock=clock,\n )\n assert chain.arm_available(Transport.ANTHROPIC) is True\n assert chain.arm_available(Transport.GEMINI) is False # no key\n\n def test_reset_cooldown(self):\n clock = FakeClock(start=1000.0)\n b_anthropic = StubBackend(error=TransportError(\"429\", status_code=429))\n b_gemini = StubBackend(response={\"model\": \"gemini\"})\n\n env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n with patch.dict(os.environ, env, clear=True):\n chain = FallbackChain(\n chain=[Transport.ANTHROPIC, Transport.GEMINI],\n backends={\n Transport.ANTHROPIC: b_anthropic,\n Transport.GEMINI: b_gemini,\n },\n cooldown_seconds=300.0,\n clock=clock,\n )\n chain.invoke({\"prompt\": \"1\"})\n assert chain.arm_available(Transport.ANTHROPIC) is False\n\n chain.reset_cooldown(Transport.ANTHROPIC)\n assert chain.arm_available(Transport.ANTHROPIC) is True\n\n def test_available_arms(self):\n clock = FakeClock()\n b_anthropic = 
StubBackend()\n b_gemini = StubBackend()\n\n env = _env_with_keys(Transport.ANTHROPIC, Transport.GEMINI)\n with patch.dict(os.environ, env, clear=True):\n chain = FallbackChain(\n chain=[Transport.ANTHROPIC, Transport.GEMINI, Transport.OPENAI],\n backends={\n Transport.ANTHROPIC: b_anthropic,\n Transport.GEMINI: b_gemini,\n # openai has no backend registered\n },\n clock=clock,\n )\n available = chain.available_arms()\n assert available == [Transport.ANTHROPIC, Transport.GEMINI]\n\n def test_clear_audit_log(self):\n clock = FakeClock()\n b = StubBackend(response={\"ok\": True})\n\n env = _env_with_keys(Transport.ANTHROPIC)\n with patch.dict(os.environ, env, clear=True):\n chain = FallbackChain(\n chain=[Transport.ANTHROPIC],\n backends={Transport.ANTHROPIC: b},\n clock=clock,\n )\n chain.invoke({\"prompt\": \"hi\"})\n assert len(chain.audit_log) > 0\n\n chain.clear_audit_log()\n assert len(chain.audit_log) == 0\n\n\n# ---------------------------------------------------------------------------\n# Tests: full default chain\n# ---------------------------------------------------------------------------\n\nclass TestFullChain:\n def test_degrades_through_entire_default_chain(self):\n \"\"\"Simulate: anthropic→429, gemini→no key, openai→503, huggingface→ok.\"\"\"\n clock = FakeClock()\n\n b_anthropic = StubBackend(error=TransportError(\"429\", status_code=429))\n # gemini: no API key set\n b_openai = StubBackend(error=TransportError(\"503\", status_code=503))\n b_hf = StubBackend(response={\"model\": \"huggingface\", \"fallback\": True})\n\n env = _env_with_keys(Transport.ANTHROPIC, Transport.OPENAI, Transport.HUGGINGFACE)\n with patch.dict(os.environ, env, clear=True):\n chain = FallbackChain(\n # chain omitted: it defaults to DEFAULT_FALLBACK_CHAIN\n backends={\n Transport.ANTHROPIC: b_anthropic,\n Transport.OPENAI: b_openai,\n Transport.HUGGINGFACE: b_hf,\n },\n cooldown_seconds=300.0,\n clock=clock,\n )\n result = chain.invoke({\"prompt\": \"full chain test\"})\n\n assert result == 
{\"model\": \"huggingface\", \"fallback\": True}\n\n events = [r.event for r in chain.audit_log]\n assert events.count(AuditEvent.ARM_COOLED_DOWN) == 2 # anthropic 429, openai 503\n assert AuditEvent.ARM_SKIPPED_NO_KEY in events # gemini\n assert AuditEvent.REQUEST_SUCCESS in events # huggingface\n```\n\n## Design decisions and rationale\n\n**Fallback chain**: `anthropic → gemini → openai → huggingface` as specified. The chain is configurable per instance, so different callers (e.g., a coding task vs. a summarization task) can use different orderings.\n\n**Cooldown mechanism**: On HTTP 429 or 503, the arm is marked unavailable for `cooldown_seconds` (default 300s / 5 minutes). The timestamp comparison uses an injectable clock (`Callable[[], float]`) so tests are deterministic with no `time.sleep`. Non-retriable errors (e.g., 500) trigger fallback but do *not* set a cooldown, so the arm will be retried on the next request.\n\n**Missing API key handling**: Checked via `os.environ` lookup on each invocation (so keys can be hot-loaded), but the \"missing key\" warning is logged exactly once per arm per `FallbackChain` instance via the `missing_key_warned` flag.\n\n**Audit log**: Cooldowns, cooldown expiries, missing-key skips (once per arm), fallbacks, successes, and exhaustion are each recorded as an `AuditRecord` with timestamp, event type, transport, and human-readable detail. The log is also emitted via `logging.info` for integration with external log aggregation.\n\n**`AllArmsExhaustedError`**: Raised with the last encountered exception chained (`from last_error`) so callers can inspect the root cause."
}
Task ID: c8081dc9-4f76-4e42-bb3e-56cf19e904af