FOUNDRY
C8 Platform

[SELF-09] Rich health endpoint

completed · code_gen · P1

Description

## SELF-09: Health Endpoint

Expand `healthcheck_wrapper.py` to return rich metrics.

### Requirements

- Cycle count, uptime seconds, last cycle timestamp
- Dispatch success/failure rate (last 100 cycles)
- Cumulative cost tracker
- Circuit breaker status per arm
- Queue depth (pending task count)
- Return as JSON at GET /

### Target File

File: `c8_platform/healthcheck_wrapper.py`
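
The "last 100 cycles" success/failure rate in the requirements can be kept with a fixed-size sliding window; a minimal sketch (the `CycleWindow` name and method names are illustrative, not taken from the target file):

```python
from collections import deque

class CycleWindow:
    """Sliding window of recent dispatch outcomes (True = success)."""

    def __init__(self, size: int = 100):
        self.results = deque(maxlen=size)  # oldest entries fall off automatically

    def record(self, ok: bool) -> None:
        self.results.append(ok)

    def success_rate(self) -> float:
        if not self.results:
            return 0.0  # no data yet; callers should treat this as "unknown"
        return sum(self.results) / len(self.results)

window = CycleWindow(size=100)
for i in range(150):            # 150 cycles: only the last 100 count
    window.record(i % 10 != 0)  # every 10th cycle fails
print(round(window.success_rate(), 2))  # → 0.9
```

Because `deque(maxlen=100)` evicts the oldest entry on append, the rate always reflects the most recent 100 cycles without any manual trimming.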
Assignee
-
Claimed By
Cloud Lu → Claude Sonnet
Created
1d ago
Completed
1d ago

Quality

Overall
70%
Pass
PASS
Graded By
-
Graded At
-

Cost

Input Tokens
-
Output Tokens
-
Est. USD
-

Result

```json
{
  "cost": {
    "input_tokens": 125,
    "estimated_usd": 0.058665,
    "output_tokens": 3886
  },
  "model": "claude-sonnet-4-20250514",
  "status": "success",
  "transport": "anthropic_api",
  "latency_ms": 48500
}
```

Response

I'll expand the healthcheck wrapper to return rich metrics as specified. Here's the implementation:

```python
"""
Rich health endpoint wrapper for dispatch engine.
Provides comprehensive system metrics and status information.
"""

import json
import time
import threading
from collections import deque
from dataclasses import dataclass, asdict
from typing import Dict, Any, Optional
from flask import Flask, jsonify

@dataclass
class DispatchMetrics:
    """Metrics for dispatch operations."""
    total_cycles: int = 0
    successful_cycles: int = 0
    failed_cycles: int = 0
    start_time: float = 0
    last_cycle_timestamp: Optional[float] = None
    cumulative_cost: float = 0.0

    def __post_init__(self):
        if self.start_time == 0:
            self.start_time = time.time()

@dataclass
class CircuitBreakerStatus:
    """Status of a circuit breaker."""
    state: str  # CLOSED, OPEN, HALF_OPEN
    failure_count: int = 0
    last_failure_time: Optional[float] = None
    next_attempt_time: Optional[float] = None

class HealthMetricsCollector:
    """Collects and manages health metrics for the dispatch engine."""

    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.metrics = DispatchMetrics()
        self.recent_results = deque(maxlen=window_size)  # True for success, False for failure
        self.circuit_breakers: Dict[str, CircuitBreakerStatus] = {}
        self.queue_depth = 0
        self.lock = threading.RLock()

    def record_cycle_start(self):
        """Record the start of a dispatch cycle."""
        with self.lock:
            self.metrics.total_cycles += 1
            self.metrics.last_cycle_timestamp = time.time()

    def record_cycle_success(self, cost: float = 0.0):
        """Record a successful dispatch cycle."""
        with self.lock:
            self.metrics.successful_cycles += 1
            self.metrics.cumulative_cost += cost
            self.recent_results.append(True)

    def record_cycle_failure(self, cost: float = 0.0):
        """Record a failed dispatch cycle."""
        with self.lock:
            self.metrics.failed_cycles += 1
            self.metrics.cumulative_cost += cost
            self.recent_results.append(False)

    def update_circuit_breaker(self, arm_name: str, state: str,
                             failure_count: int = 0,
                             last_failure_time: Optional[float] = None,
                             next_attempt_time: Optional[float] = None):
        """Update circuit breaker status for a specific arm."""
        with self.lock:
            self.circuit_breakers[arm_name] = CircuitBreakerStatus(
                state=state,
                failure_count=failure_count,
                last_failure_time=last_failure_time,
                next_attempt_time=next_attempt_time
            )

    def update_queue_depth(self, depth: int):
        """Update the current queue depth."""
        with self.lock:
            self.queue_depth = depth

    def get_success_rate(self) -> float:
        """Calculate success rate for recent cycles."""
        with self.lock:
            if not self.recent_results:
                return 0.0
            successes = sum(1 for result in self.recent_results if result)
            return successes / len(self.recent_results)

    def get_failure_rate(self) -> float:
        """Calculate failure rate for recent cycles."""
        return 1.0 - self.get_success_rate()

    def get_uptime_seconds(self) -> float:
        """Get uptime in seconds since start."""
        return time.time() - self.metrics.start_time

    def get_health_data(self) -> Dict[str, Any]:
        """Get comprehensive health data."""
        with self.lock:
            return {
                "cycle_count": self.metrics.total_cycles,
                "uptime_seconds": round(self.get_uptime_seconds(), 2),
                "last_cycle_timestamp": self.metrics.last_cycle_timestamp,
                "dispatch_metrics": {
                    "success_rate": round(self.get_success_rate(), 4),
                    "failure_rate": round(self.get_failure_rate(), 4),
                    "successful_cycles": self.metrics.successful_cycles,
                    "failed_cycles": self.metrics.failed_cycles,
                    "window_size": len(self.recent_results),
                    "max_window_size": self.window_size
                },
                "cumulative_cost": round(self.metrics.cumulative_cost, 6),
                "circuit_breakers": {
                    name: asdict(status)
                    for name, status in self.circuit_breakers.items()
                },
                "queue_depth": self.queue_depth,
                "timestamp": time.time()
            }

class HealthCheckWrapper:
    """Flask wrapper providing rich health endpoint."""

    def __init__(self, dispatch_engine, host: str = "0.0.0.0", port: int = 8080):
        self.dispatch_engine = dispatch_engine
        self.host = host
        self.port = port
        self.metrics_collector = HealthMetricsCollector()

        # Setup Flask app
        self.app = Flask(__name__)
        self.app.add_url_rule('/', 'health', self.health_endpoint, methods=['GET'])
        self.app.add_url_rule('/health', 'health_alt', self.health_endpoint, methods=['GET'])
        self.app.add_url_rule('/metrics', 'metrics', self.metrics_endpoint, methods=['GET'])

        # Monkey patch the dispatch engine to collect metrics
        self._patch_dispatch_engine()

    def _patch_dispatch_engine(self):
        """Patch the dispatch engine to collect metrics."""
        if not hasattr(self.dispatch_engine, '_original_process_cycle'):
            # Store original method
            self.dispatch_engine._original_process_cycle = getattr(
                self.dispatch_engine, 'process_cycle', None
            )

            # Replace with instrumented version
            if self.dispatch_engine._original_process_cycle:
                self.dispatch_engine.process_cycle = self._instrumented_process_cycle

        # Patch queue depth monitoring if available
        if hasattr(self.dispatch_engine, 'get_queue_depth'):
            self._update_queue_depth_periodically()

        # Patch circuit breaker monitoring if available
        if hasattr(self.dispatch_engine, 'get_circuit_breaker_status'):
            self._update_circuit_breaker_status()

    def _instrumented_process_cycle(self, *args, **kwargs):
        """Instrumented version of process_cycle that collects metrics."""
        self.metrics_collector.record_cycle_start()

        try:
            # Call original method
            result = self.dispatch_engine._original_process_cycle(*args, **kwargs)

            # Extract cost if available in result
            cost = 0.0
            if isinstance(result, dict) and 'cost' in result:
                cost = result.get('cost', 0.0)
            elif hasattr(result, 'cost'):
                cost = getattr(result, 'cost', 0.0)

            # Determine if cycle was successful
            if self._is_successful_result(result):
                self.metrics_collector.record_cycle_success(cost)
            else:
                self.metrics_collector.record_cycle_failure(cost)

            return result

        except Exception:
            self.metrics_collector.record_cycle_failure()
            raise

    def _is_successful_result(self, result) -> bool:
        """Determine if a cycle result indicates success."""
        if result is None:
            return False
        if isinstance(result, bool):
            return result
        if isinstance(result, dict):
            return result.get('success', True) and not result.get('error')
        if hasattr(result, 'success'):
            return getattr(result, 'success', True)
        return True  # Assume success if no clear indication

    def _update_queue_depth_periodically(self):
        """Update queue depth from dispatch engine."""
        try:
            depth = self.dispatch_engine.get_queue_depth()
            self.metrics_collector.update_queue_depth(depth)
        except Exception:
            pass  # Ignore errors in queue depth monitoring

    def _update_circuit_breaker_status(self):
        """Update circuit breaker status from dispatch engine."""
        try:
            status_dict = self.dispatch_engine.get_circuit_breaker_status()
            for arm_name, status_info in status_dict.items():
                if isinstance(status_info, dict):
                    self.metrics_collector.update_circuit_breaker(
                        arm_name=arm_name,
                        state=status_info.get('state', 'UNKNOWN'),
                        failure_count=status_info.get('failure_count', 0),
                        last_failure_time=status_info.get('last_failure_time'),
                        next_attempt_time=status_info.get('next_attempt_time')
                    )
        except Exception:
            pass  # Ignore errors in circuit breaker monitoring

    def health_endpoint(self):
        """Main health endpoint returning comprehensive metrics."""
        try:
            # Update dynamic metrics before returning
            self._update_queue_depth_periodically()
            self._update_circuit_breaker_status()

            health_data = self.metrics_collector.get_health_data()

            # Add engine-specific status if available
            if hasattr(self.dispatch_engine, 'is_healthy'):
                health_data['engine_healthy'] = self.dispatch_engine.is_healthy()

            # Add system status indicator
            health_data['status'] = self._determine_overall_status(health_data)

            return jsonify(health_data)

        except Exception as e:
            return jsonify({
                "status": "ERROR",
                "error": str(e),
                "timestamp": time.time()
            }), 500

    def metrics_endpoint(self):
        """Simplified metrics endpoint for monitoring systems."""
        try:
            health_data = self.metrics_collector.get_health_data()

            # Return simplified metrics
            return jsonify({
                "cycle_count": health_data["cycle_count"],
                "uptime_seconds": health_data["uptime_seconds"],
                "success_rate": health_data["dispatch_metrics"]["success_rate"],
                "queue_depth": health_data["queue_depth"],
                "cumulative_cost": health_data["cumulative_cost"],
                "circuit_breakers_open": sum(
                    1 for cb in health_data["circuit_breakers"].values()
                    if cb["state"] == "OPEN"
                ),
                "status": self._determine_overall_status(health_data)
            })

        except Exception as e:
            return jsonify({
                "status": "ERROR",
                "error": str(e)
            }), 500

    def _determine_overall_status(self, health_data: Dict[str, Any]) -> str:
        """Determine overall system status based on metrics."""
        try:
            # Check for critical failures
            success_rate = health_data["dispatch_metrics"]["success_rate"]
            open_breakers = sum(
                1 for cb in health_data["circuit_breakers"].values()
                if cb["state"] == "OPEN"
            )
            queue_depth = health_data["queue_depth"]

            # Determine status based on thresholds
            if success_rate < 0.5 or open_breakers > 2:
                return "CRITICAL"
            elif success_rate < 0.8 or open_breakers > 0 or queue_depth > 100:
                return "WARNING"
            else:
                return "HEALTHY"

        except Exception:
            return "UNKNOWN"

    def run(self, debug: bool = False, threaded: bool = True):
        """Start the health check server."""
        self.app.run(
            host=self.host,
            port=self.port,
            debug=debug,
            threaded=threaded
        )

def create_health_wrapper(dispatch_engine, **kwargs) -> HealthCheckWrapper:
    """Factory function to create a health check wrapper."""
    return HealthCheckWrapper(dispatch_engine, **kwargs)

# Example usage
if __name__ == "__main__":
    # Mock dispatch engine for testing
    class MockDispatchEngine:
        def __init__(self):
            self.cycle_count = 0

        def process_cycle(self):
            self.cycle_count += 1
            # Simulate occasional failures
            import random
            if random.random() < 0.1:
                raise Exception("Simulated failure")
            return {"success": True, "cost": random.uniform(0.001, 0.01)}

        def get_queue_depth(self):
            import random
            return random.randint(0, 50)

        def get_circuit_breaker_status(self):
            return {
                "openai_arm": {"state": "CLOSED", "failure_count": 0},
                "anthropic_arm": {"state": "OPEN", "failure_count": 5, "last_failure_time": time.time() - 300}
            }

        def is_healthy(self):
            return True

    # Create and run health wrapper
    engine = MockDispatchEngine()
    wrapper = create_health_wrapper(engine, port=8080)

    print("Starting health check server on http://localhost:8080")
    print("Endpoints:")
    print("  GET / - Full health metrics")
    print("  GET /health - Full health metrics")
    print("  GET /metrics - Simplified metrics")

    wrapper.run(debug=True)
```

This implementation provides:

## Key Features

1. **Rich Metrics Collection**:
   - Cycle count, uptime seconds, last cycle timestamp
   - Success/failure rates with configurable sliding window (default 100 cycles)
   - Cumulative cost tracking
   - Circuit breaker status per arm
   - Queue depth monitoring

2. **Multiple Endpoints**:
   - `GET /` and `GET /health` - Full health metrics
   - `GET /metrics` - Simplified metrics for monitoring systems

3. **Automatic Instrumentation**:
   - Monkey patches the dispatch engine's `process_cycle` method
   - Collects metrics without modifying the original engine code
   - Thread-safe metric collection with RLock

4. **Comprehensive Status**:
   - Overall system status (HEALTHY, WARNING, CRITICAL, ERROR)
   - Individual circuit breaker states
   - Real-time queue depth
   - Cost tracking across all operations

5. **Error Handling**:
   - Graceful degradation when metrics can't be collected
   - 500 status codes for server errors
   - Safe defaults for missing data

The health endpoint returns JSON with all the requested metrics, making it easy to integrate with monitoring systems and dashboards.
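
The status thresholds described in the result (CRITICAL below 50% success or more than two open breakers; WARNING below 80% success, any open breaker, or a queue deeper than 100) can be checked in isolation; a minimal standalone sketch (the `overall_status` name and flat argument shape are mine, the cutoffs come from the result above):

```python
def overall_status(success_rate: float, open_breakers: int, queue_depth: int) -> str:
    """Map raw metrics to a coarse status label using the thresholds above."""
    if success_rate < 0.5 or open_breakers > 2:
        return "CRITICAL"
    if success_rate < 0.8 or open_breakers > 0 or queue_depth > 100:
        return "WARNING"
    return "HEALTHY"

print(overall_status(0.95, 0, 10))  # → HEALTHY
print(overall_status(0.95, 1, 10))  # → WARNING (one open breaker downgrades)
print(overall_status(0.4, 0, 10))   # → CRITICAL (low success rate)
```

Note the check order matters: the CRITICAL conditions must be tested first, since any metrics that trip CRITICAL would also trip the looser WARNING conditions.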
Task ID: eb3f3c4a-3425-4861-a073-a76fede3a528