← Tasks
[SELF-05] Structured error responses
completed · code_gen · P1
Description
## SELF-05: Error Handling
Add structured error responses to `rosetta_core/router.py`.
### Requirements
- Error format: `{"error": {"code": "RATE_LIMITED", "message": "...", "retry_after": 30}}`
- Error codes: RATE_LIMITED, INVALID_INPUT, PROVIDER_ERROR, TIMEOUT, INTERNAL
- Graceful degradation: if primary model fails, fall back to cheaper model
- Log all errors with request_id for traceability
### Target File
File: `c8_platform/rosetta_core/router.py`
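The required error envelope can be sketched with a small helper; `make_error` below is a hypothetical illustration of the format, not a function from the existing router:

```python
from typing import Any, Dict, Optional

# The five error codes listed in the requirements
ERROR_CODES = {"RATE_LIMITED", "INVALID_INPUT", "PROVIDER_ERROR", "TIMEOUT", "INTERNAL"}

def make_error(code: str, message: str, retry_after: Optional[int] = None) -> Dict[str, Any]:
    """Build the structured error envelope specified by SELF-05."""
    if code not in ERROR_CODES:
        raise ValueError(f"unknown error code: {code}")
    body: Dict[str, Any] = {"code": code, "message": message}
    if retry_after is not None:
        # retry_after is only meaningful for retryable errors such as RATE_LIMITED
        body["retry_after"] = retry_after
    return {"error": body}

print(make_error("RATE_LIMITED", "Too many requests", retry_after=30))
# → {'error': {'code': 'RATE_LIMITED', 'message': 'Too many requests', 'retry_after': 30}}
```

Omitting `retry_after` when it is `None` keeps the envelope minimal for non-retryable codes like `INVALID_INPUT`.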
Assignee
-
Claimed By
Cloud Lu → Claude Sonnet
Created
1d ago
Completed
1d ago
Quality
Overall
76%
Pass
PASS
Graded By
-
Graded At
-
Cost
Input Tokens
-
Output Tokens
-
Est. USD
-
Result
{
"cost": {
"input_tokens": 155,
"estimated_usd": 0.066585,
"output_tokens": 4408
},
"model": "claude-sonnet-4-20250514",
"status": "success",
"transport": "anthropic_api",
"latency_ms": 53799,
"response_text": "I'll implement structured error handling in the router with the specified error format, codes, and graceful degradation.\n\n```python\nimport asyncio\nimport logging\nimport time\nimport uuid\nfrom typing import Dict, Any, Optional, List\nfrom enum import Enum\nfrom dataclasses import dataclass\n\nlogger = logging.getLogger(__name__)\n\nclass ErrorCode(Enum):\n RATE_LIMITED = \"RATE_LIMITED\"\n INVALID_INPUT = \"INVALID_INPUT\"\n PROVIDER_ERROR = \"PROVIDER_ERROR\"\n TIMEOUT = \"TIMEOUT\"\n INTERNAL = \"INTERNAL\"\n\n@dataclass\nclass RosettaError(Exception):\n code: ErrorCode\n message: str\n retry_after: Optional[int] = None\n provider: Optional[str] = None\n request_id: Optional[str] = None\n \n def to_dict(self) -> Dict[str, Any]:\n error_dict = {\n \"code\": self.code.value,\n \"message\": self.message\n }\n if self.retry_after is not None:\n error_dict[\"retry_after\"] = self.retry_after\n return {\"error\": error_dict}\n\nclass ModelTier(Enum):\n PREMIUM = \"premium\"\n STANDARD = \"standard\"\n BUDGET = \"budget\"\n\n@dataclass\nclass ModelConfig:\n name: str\n provider: str\n tier: ModelTier\n cost_per_token: float\n max_tokens: int\n timeout: int = 30\n\nclass RosettaRouter:\n def __init__(self):\n self.providers = {}\n self.model_configs = self._init_model_configs()\n self.rate_limits = {}\n \n def _init_model_configs(self) -> Dict[str, ModelConfig]:\n \"\"\"Initialize model configurations with fallback tiers\"\"\"\n return {\n \"gpt-4\": ModelConfig(\"gpt-4\", \"openai\", ModelTier.PREMIUM, 0.06, 8192),\n \"gpt-3.5-turbo\": ModelConfig(\"gpt-3.5-turbo\", \"openai\", ModelTier.STANDARD, 0.002, 4096),\n \"claude-2\": ModelConfig(\"claude-2\", \"anthropic\", ModelTier.PREMIUM, 0.032, 8192),\n \"claude-instant\": ModelConfig(\"claude-instant\", \"anthropic\", ModelTier.BUDGET, 0.0016, 8192),\n \"llama-70b\": ModelConfig(\"llama-70b\", \"together\", ModelTier.STANDARD, 0.0009, 4096),\n \"llama-7b\": ModelConfig(\"llama-7b\", \"together\", 
ModelTier.BUDGET, 0.0002, 4096),\n }\n \n def register_provider(self, name: str, provider_instance):\n \"\"\"Register a provider instance\"\"\"\n self.providers[name] = provider_instance\n logger.info(f\"Registered provider: {name}\")\n \n def _generate_request_id(self) -> str:\n \"\"\"Generate unique request ID for traceability\"\"\"\n return str(uuid.uuid4())\n \n def _get_fallback_models(self, primary_model: str) -> List[str]:\n \"\"\"Get fallback models in order of preference (cheaper tiers)\"\"\"\n if primary_model not in self.model_configs:\n return []\n \n primary_config = self.model_configs[primary_model]\n primary_provider = primary_config.provider\n \n # Find fallback models from same provider, cheaper tiers\n fallbacks = []\n for model_name, config in self.model_configs.items():\n if (config.provider == primary_provider and \n config.tier.value != primary_config.tier.value and\n config.cost_per_token < primary_config.cost_per_token):\n fallbacks.append(model_name)\n \n # Sort by cost (cheapest first for fallback)\n fallbacks.sort(key=lambda m: self.model_configs[m].cost_per_token)\n return fallbacks\n \n def _check_rate_limit(self, provider: str, request_id: str) -> Optional[RosettaError]:\n \"\"\"Check if provider is rate limited\"\"\"\n current_time = time.time()\n \n if provider in self.rate_limits:\n limit_info = self.rate_limits[provider]\n if current_time < limit_info['reset_time']:\n retry_after = int(limit_info['reset_time'] - current_time)\n error = RosettaError(\n code=ErrorCode.RATE_LIMITED,\n message=f\"Rate limit exceeded for provider {provider}\",\n retry_after=retry_after,\n provider=provider,\n request_id=request_id\n )\n logger.warning(\n f\"Rate limit hit - Request ID: {request_id}, Provider: {provider}, \"\n f\"Retry after: {retry_after}s\"\n )\n return error\n \n return None\n \n def _validate_request(self, messages: List[Dict], model: str, request_id: str) -> Optional[RosettaError]:\n \"\"\"Validate incoming request\"\"\"\n if not 
messages:\n return RosettaError(\n code=ErrorCode.INVALID_INPUT,\n message=\"Messages cannot be empty\",\n request_id=request_id\n )\n \n if model not in self.model_configs:\n return RosettaError(\n code=ErrorCode.INVALID_INPUT,\n message=f\"Unsupported model: {model}\",\n request_id=request_id\n )\n \n # Validate message format\n for i, msg in enumerate(messages):\n if not isinstance(msg, dict) or 'role' not in msg or 'content' not in msg:\n return RosettaError(\n code=ErrorCode.INVALID_INPUT,\n message=f\"Invalid message format at index {i}\",\n request_id=request_id\n )\n \n return None\n \n async def _call_model_with_timeout(self, provider_instance, messages: List[Dict], \n model: str, request_id: str, **kwargs) -> Dict[str, Any]:\n \"\"\"Call model with timeout handling\"\"\"\n config = self.model_configs[model]\n \n try:\n response = await asyncio.wait_for(\n provider_instance.complete(messages, model=model, **kwargs),\n timeout=config.timeout\n )\n return response\n \n except asyncio.TimeoutError:\n error = RosettaError(\n code=ErrorCode.TIMEOUT,\n message=f\"Request timed out after {config.timeout}s\",\n provider=config.provider,\n request_id=request_id\n )\n logger.error(\n f\"Timeout error - Request ID: {request_id}, Model: {model}, \"\n f\"Timeout: {config.timeout}s\"\n )\n raise error\n \n except Exception as e:\n # Handle provider-specific errors\n if \"rate\" in str(e).lower() and \"limit\" in str(e).lower():\n # Set rate limit info\n self.rate_limits[config.provider] = {\n 'reset_time': time.time() + 60 # Default 60s backoff\n }\n error = RosettaError(\n code=ErrorCode.RATE_LIMITED,\n message=str(e),\n retry_after=60,\n provider=config.provider,\n request_id=request_id\n )\n else:\n error = RosettaError(\n code=ErrorCode.PROVIDER_ERROR,\n message=f\"Provider error: {str(e)}\",\n provider=config.provider,\n request_id=request_id\n )\n \n logger.error(\n f\"Provider error - Request ID: {request_id}, Model: {model}, \"\n f\"Provider: {config.provider}, 
Error: {str(e)}\"\n )\n raise error\n \n async def route_request(self, messages: List[Dict], model: str, **kwargs) -> Dict[str, Any]:\n \"\"\"\n Route request with structured error handling and graceful degradation\n \"\"\"\n request_id = self._generate_request_id()\n \n logger.info(f\"Processing request - Request ID: {request_id}, Model: {model}\")\n \n try:\n # Validate request\n validation_error = self._validate_request(messages, model, request_id)\n if validation_error:\n logger.error(f\"Validation failed - Request ID: {request_id}, Error: {validation_error.message}\")\n raise validation_error\n \n config = self.model_configs[model]\n provider_name = config.provider\n \n # Check if provider is registered\n if provider_name not in self.providers:\n error = RosettaError(\n code=ErrorCode.PROVIDER_ERROR,\n message=f\"Provider {provider_name} not available\",\n provider=provider_name,\n request_id=request_id\n )\n logger.error(f\"Provider not available - Request ID: {request_id}, Provider: {provider_name}\")\n raise error\n \n # Get primary provider\n provider_instance = self.providers[provider_name]\n \n # Check rate limits for primary model\n rate_limit_error = self._check_rate_limit(provider_name, request_id)\n if rate_limit_error:\n raise rate_limit_error\n \n # Try primary model first\n try:\n response = await self._call_model_with_timeout(\n provider_instance, messages, model, request_id, **kwargs\n )\n \n logger.info(f\"Request successful - Request ID: {request_id}, Model: {model}\")\n return {\n \"response\": response,\n \"model_used\": model,\n \"request_id\": request_id\n }\n \n except RosettaError as e:\n # If it's a rate limit or provider error, try fallback\n if e.code in [ErrorCode.RATE_LIMITED, ErrorCode.PROVIDER_ERROR, ErrorCode.TIMEOUT]:\n logger.warning(\n f\"Primary model failed, attempting fallback - Request ID: {request_id}, \"\n f\"Primary model: {model}, Error: {e.message}\"\n )\n \n # Attempt graceful degradation\n fallback_models = 
self._get_fallback_models(model)\n \n for fallback_model in fallback_models:\n try:\n fallback_config = self.model_configs[fallback_model]\n fallback_provider = self.providers[fallback_config.provider]\n \n # Check rate limits for fallback\n fallback_rate_error = self._check_rate_limit(fallback_config.provider, request_id)\n if fallback_rate_error:\n continue # Try next fallback\n \n logger.info(\n f\"Trying fallback model - Request ID: {request_id}, \"\n f\"Fallback model: {fallback_model}\"\n )\n \n response = await self._call_model_with_timeout(\n fallback_provider, messages, fallback_model, request_id, **kwargs\n )\n \n logger.info(\n f\"Fallback successful - Request ID: {request_id}, \"\n f\"Model used: {fallback_model} (fallback from {model})\"\n )\n \n return {\n \"response\": response,\n \"model_used\": fallback_model,\n \"fallback_from\": model,\n \"request_id\": request_id\n }\n \n except RosettaError:\n # Continue to next fallback\n logger.warning(\n f\"Fallback model failed - Request ID: {request_id}, \"\n f\"Fallback model: {fallback_model}\"\n )\n continue\n \n # No fallbacks worked, raise original error\n logger.error(\n f\"All fallback attempts failed - Request ID: {request_id}, \"\n f\"Primary model: {model}, Fallbacks tried: {fallback_models}\"\n )\n \n raise e\n \n except RosettaError:\n # Re-raise structured errors\n raise\n except Exception as e:\n # Handle unexpected errors\n error = RosettaError(\n code=ErrorCode.INTERNAL,\n message=f\"Internal error: {str(e)}\",\n request_id=request_id\n )\n logger.error(\n f\"Internal error - Request ID: {request_id}, Error: {str(e)}\",\n exc_info=True\n )\n raise error\n \n async def handle_request(self, messages: List[Dict], model: str, **kwargs) -> Dict[str, Any]:\n \"\"\"\n Public interface that returns structured error responses\n \"\"\"\n try:\n return await self.route_request(messages, model, **kwargs)\n except RosettaError as e:\n return e.to_dict()\n except Exception as e:\n # Fallback for any 
unhandled errors\n error = RosettaError(\n code=ErrorCode.INTERNAL,\n message=\"An unexpected error occurred\"\n )\n return error.to_dict()\n\n# Example usage and testing\nif __name__ == \"__main__\":\n import json\n \n # Mock provider for testing\n class MockProvider:\n def __init__(self, name: str, should_fail: bool = False, fail_type: str = \"none\"):\n self.name = name\n self.should_fail = should_fail\n self.fail_type = fail_type\n \n async def complete(self, messages: List[Dict], model: str, **kwargs) -> Dict[str, Any]:\n if self.should_fail:\n if self.fail_type == \"rate_limit\":\n raise Exception(\"Rate limit exceeded\")\n elif self.fail_type == \"timeout\":\n await asyncio.sleep(35) # Longer than timeout\n else:\n raise Exception(\"Provider error\")\n \n return {\n \"choices\": [{\n \"message\": {\n \"role\": \"assistant\",\n \"content\": f\"Response from {model} via {self.name}\"\n }\n }]\n }\n \n async def test_router():\n router = RosettaRouter()\n \n # Register mock providers\n router.register_provider(\"openai\", MockProvider(\"openai\"))\n router.register_provider(\"anthropic\", MockProvider(\"anthropic\"))\n \n # Test successful request\n print(\"=== Test 1: Successful Request ===\")\n result = await router.handle_request(\n messages=[{\"role\": \"user\", \"content\": \"Hello\"}],\n model=\"gpt-4\"\n )\n print(json.dumps(result, indent=2))\n \n # Test with failing provider and fallback\n print(\"\\n=== Test 2: Provider Failure with Fallback ===\")\n router.register_provider(\"openai\", MockProvider(\"openai\", should_fail=True))\n \n result = await router.handle_request(\n messages=[{\"role\": \"user\", \"content\": \"Hello\"}],\n model=\"gpt-4\"\n )\n print(json.dumps(result, indent=2))\n \n # Test invalid input\n print(\"\\n=== Test 3: Invalid Input ===\")\n result = await router.handle_request(\n messages=[],\n model=\"gpt-4\"\n )\n print(json.dumps(result, indent=2))\n \n # Test unsupported model\n print(\"\\n=== Test 4: Unsupported Model 
===\")\n result = await router.handle_request(\n messages=[{\"role\": \"user\", \"content\": \"Hello\"}],\n model=\"unknown-model\"\n )\n print(json.dumps(result, indent=2))\n \n # Run tests\n asyncio.run(test_router())\n```\n\nThis implementation provides:\n\n## Key Features\n\n1. **Structured Error Format**: All errors follow the format `{\"error\": {\"code\": \"...\", \"message\": \"...\", \"retry_after\": ...}}`\n\n2. **Error Codes**: Implements all required error codes:\n - `RATE_LIMITED`: With retry_after field\n - `INVALID_INPUT`: For validation errors\n - `PROVIDER_ERROR`: For provider-specific issues\n - `TIMEOUT`: For request timeouts\n - `INTERNAL`: For unexpected errors\n\n3. **Graceful Degradation**: \n - Automatically falls back to cheaper models from the same provider\n - Tries multiple fallbacks in order of cost\n - Maintains request traceability through fallbacks\n\n4. **Request Traceability**:\n - Every request gets a unique `request_id`\n - All errors and logs include the request_id\n - Comprehensive logging at each step\n\n5. **Rate Limit Handling**:\n - Tracks rate limits per provider\n - Provides retry_after information\n - Respects rate limits in fallback attempts\n\n6. **Timeout Management**:\n - Per-model timeout configuration\n - Proper async timeout handling\n - Clear timeout error messages\n\nThe router now provides robust error handling while maintaining the ability to gracefully degrade to cheaper models when the primary model fails, all while keeping detailed logs for debugging and monitoring."
}
Task ID: 9c9f01f3-14ff-4824-b11b-752b716e56a5