FOUNDRY
C8 Platform
← Tasks

[SELF-04] Router rate limiting (token bucket)

completedcode_genP1

Description

## SELF-04: Rate Limiting Add per-IP and per-session rate limiting to `rosetta_core/router.py`. ### Requirements - Token bucket algorithm (stdlib, no Redis) - Per-IP: 60 requests/minute, burst of 10 - Per-session (API key): 1000 requests/hour - Return 429 Too Many Requests with Retry-After header - Thread-safe (use threading.Lock) ### Target File File: `c8_platform/rosetta_core/router.py`
Assignee
-
Claimed By
Cloud Lu → Claude Sonnet
Created
1d ago
Completed
1d ago

Quality

Overall
86%
Pass
PASS
Graded By
-
Graded At
-

Cost

Input Tokens
-
Output Tokens
-
Est. USD
-

Result

{
  "cost": {
    "input_tokens": 148,
    "estimated_usd": 0.058224,
    "output_tokens": 3852
  },
  "model": "claude-sonnet-4-20250514",
  "status": "success",
  "transport": "anthropic_api",
  "latency_ms": 47900,
  "response_text": "I'll implement rate limiting for the router using the token bucket algorithm with the specified requirements.\n\n```python\nimport time\nimport threading\nfrom collections import defaultdict\nfrom typing import Dict, Tuple, Optional\nfrom flask import request, jsonify, g\nimport functools\n\nclass TokenBucket:\n    \"\"\"Thread-safe token bucket implementation for rate limiting.\"\"\"\n    \n    def __init__(self, capacity: int, refill_rate: float, refill_period: float = 1.0):\n        \"\"\"\n        Initialize token bucket.\n        \n        Args:\n            capacity: Maximum number of tokens in bucket\n            refill_rate: Number of tokens added per refill_period\n            refill_period: Time period for refill in seconds\n        \"\"\"\n        self.capacity = capacity\n        self.tokens = capacity\n        self.refill_rate = refill_rate\n        self.refill_period = refill_period\n        self.last_refill = time.time()\n        self.lock = threading.Lock()\n    \n    def consume(self, tokens: int = 1) -> bool:\n        \"\"\"\n        Try to consume tokens from bucket.\n        \n        Args:\n            tokens: Number of tokens to consume\n            \n        Returns:\n            True if tokens were consumed, False if insufficient tokens\n        \"\"\"\n        with self.lock:\n            now = time.time()\n            \n            # Calculate tokens to add based on time elapsed\n            time_elapsed = now - self.last_refill\n            tokens_to_add = (time_elapsed / self.refill_period) * self.refill_rate\n            \n            # Refill bucket (up to capacity)\n            self.tokens = min(self.capacity, self.tokens + tokens_to_add)\n            self.last_refill = now\n            \n            # Try to consume tokens\n            if self.tokens >= tokens:\n                self.tokens -= tokens\n                return True\n            \n            return False\n    \n    def time_until_available(self, tokens: int = 1) -> float:\n        \"\"\"\n        Calculate time until enough tokens are available.\n        \n        Args:\n            tokens: Number of tokens needed\n            \n        Returns:\n            Time in seconds until tokens are available\n        \"\"\"\n        with self.lock:\n            if self.tokens >= tokens:\n                return 0.0\n            \n            tokens_needed = tokens - self.tokens\n            return (tokens_needed / self.refill_rate) * self.refill_period\n\n\nclass RateLimiter:\n    \"\"\"Multi-tier rate limiter with per-IP and per-session limits.\"\"\"\n    \n    def __init__(self):\n        self.ip_buckets: Dict[str, TokenBucket] = {}\n        self.session_buckets: Dict[str, TokenBucket] = {}\n        self.cleanup_lock = threading.Lock()\n        self.last_cleanup = time.time()\n        \n        # Configuration\n        self.ip_config = {\n            'capacity': 10,  # burst of 10\n            'refill_rate': 1.0,  # 60 requests per 60 seconds = 1 per second\n            'refill_period': 1.0\n        }\n        \n        self.session_config = {\n            'capacity': 50,  # reasonable burst\n            'refill_rate': 1000.0 / 3600.0,  # 1000 requests per hour\n            'refill_period': 1.0\n        }\n        \n        # Cleanup old buckets every 10 minutes\n        self.cleanup_interval = 600\n        self.bucket_ttl = 3600  # Keep buckets for 1 hour of inactivity\n    \n    def _cleanup_old_buckets(self):\n        \"\"\"Remove old, unused buckets to prevent memory leaks.\"\"\"\n        now = time.time()\n        \n        if now - self.last_cleanup < self.cleanup_interval:\n            return\n        \n        with self.cleanup_lock:\n            if now - self.last_cleanup < self.cleanup_interval:\n                return  # Double-check after acquiring lock\n            \n            # Clean IP buckets\n            expired_ips = []\n            for ip, bucket in self.ip_buckets.items():\n                if now - bucket.last_refill > self.bucket_ttl:\n                    expired_ips.append(ip)\n            \n            for ip in expired_ips:\n                del self.ip_buckets[ip]\n            \n            # Clean session buckets\n            expired_sessions = []\n            for session_id, bucket in self.session_buckets.items():\n                if now - bucket.last_refill > self.bucket_ttl:\n                    expired_sessions.append(session_id)\n            \n            for session_id in expired_sessions:\n                del self.session_buckets[session_id]\n            \n            self.last_cleanup = now\n    \n    def _get_client_ip(self) -> str:\n        \"\"\"Get client IP address, handling proxies.\"\"\"\n        # Check for forwarded IP headers (common in production)\n        forwarded_ips = request.headers.get('X-Forwarded-For')\n        if forwarded_ips:\n            return forwarded_ips.split(',')[0].strip()\n        \n        real_ip = request.headers.get('X-Real-IP')\n        if real_ip:\n            return real_ip.strip()\n        \n        return request.remote_addr or 'unknown'\n    \n    def _get_session_id(self) -> Optional[str]:\n        \"\"\"Get session identifier (API key or session ID).\"\"\"\n        # Check for API key in header\n        api_key = request.headers.get('Authorization')\n        if api_key and api_key.startswith('Bearer '):\n            return api_key[7:]  # Remove 'Bearer ' prefix\n        \n        # Check for API key in query params (less preferred)\n        api_key = request.args.get('api_key')\n        if api_key:\n            return api_key\n        \n        # Check for session ID in headers or cookies\n        session_id = request.headers.get('X-Session-ID')\n        if session_id:\n            return session_id\n        \n        # Fallback to session cookie if available\n        if hasattr(g, 'session_id'):\n            return g.session_id\n        \n        return None\n    \n    def _get_ip_bucket(self, ip: str) -> TokenBucket:\n        \"\"\"Get or create token bucket for IP address.\"\"\"\n        if ip not in self.ip_buckets:\n            self.ip_buckets[ip] = TokenBucket(**self.ip_config)\n        return self.ip_buckets[ip]\n    \n    def _get_session_bucket(self, session_id: str) -> TokenBucket:\n        \"\"\"Get or create token bucket for session.\"\"\"\n        if session_id not in self.session_buckets:\n            self.session_buckets[session_id] = TokenBucket(**self.session_config)\n        return self.session_buckets[session_id]\n    \n    def check_rate_limit(self) -> Tuple[bool, Optional[str], Optional[int]]:\n        \"\"\"\n        Check if request should be rate limited.\n        \n        Returns:\n            Tuple of (allowed, limit_type, retry_after_seconds)\n        \"\"\"\n        self._cleanup_old_buckets()\n        \n        client_ip = self._get_client_ip()\n        session_id = self._get_session_id()\n        \n        # Check IP rate limit\n        ip_bucket = self._get_ip_bucket(client_ip)\n        if not ip_bucket.consume():\n            retry_after = int(ip_bucket.time_until_available()) + 1\n            return False, 'ip', retry_after\n        \n        # Check session rate limit (if session ID available)\n        if session_id:\n            session_bucket = self._get_session_bucket(session_id)\n            if not session_bucket.consume():\n                retry_after = int(session_bucket.time_until_available()) + 1\n                return False, 'session', retry_after\n        \n        return True, None, None\n\n\n# Global rate limiter instance\nrate_limiter = RateLimiter()\n\n\ndef rate_limit_decorator(f):\n    \"\"\"Decorator to apply rate limiting to routes.\"\"\"\n    @functools.wraps(f)\n    def decorated_function(*args, **kwargs):\n        allowed, limit_type, retry_after = rate_limiter.check_rate_limit()\n        \n        if not allowed:\n            response_data = {\n                'error': 'Rate limit exceeded',\n                'message': f'Too many requests from {\"IP address\" if limit_type == \"ip\" else \"session\"}',\n                'retry_after': retry_after,\n                'limits': {\n                    'ip': 'Rate limit: 60 requests per minute with burst of 10',\n                    'session': 'Rate limit: 1000 requests per hour'\n                }\n            }\n            \n            response = jsonify(response_data)\n            response.status_code = 429\n            response.headers['Retry-After'] = str(retry_after)\n            response.headers['X-RateLimit-Limit-IP'] = '60'\n            response.headers['X-RateLimit-Limit-Session'] = '1000'\n            \n            return response\n        \n        return f(*args, **kwargs)\n    \n    return decorated_function\n\n\ndef apply_rate_limiting(app):\n    \"\"\"Apply rate limiting to all routes in the Flask app.\"\"\"\n    \n    # Store original route function\n    original_add_url_rule = app.add_url_rule\n    \n    def add_url_rule_with_rate_limit(rule, endpoint=None, view_func=None, **options):\n        \"\"\"Wrap add_url_rule to automatically apply rate limiting.\"\"\"\n        if view_func is not None:\n            view_func = rate_limit_decorator(view_func)\n        \n        return original_add_url_rule(rule, endpoint, view_func, **options)\n    \n    # Replace the add_url_rule method\n    app.add_url_rule = add_url_rule_with_rate_limit\n    \n    # Also handle routes added via decorators\n    def before_request_rate_limit():\n        \"\"\"Check rate limits before processing any request.\"\"\"\n        # Skip rate limiting for certain endpoints if needed\n        if request.endpoint in getattr(app, 'rate_limit_exempt_endpoints', set()):\n            return\n        \n        allowed, limit_type, retry_after = rate_limiter.check_rate_limit()\n        \n        if not allowed:\n            response_data = {\n                'error': 'Rate limit exceeded',\n                'message': f'Too many requests from {\"IP address\" if limit_type == \"ip\" else \"session\"}',\n                'retry_after': retry_after,\n                'limits': {\n                    'ip': 'Rate limit: 60 requests per minute with burst of 10',\n                    'session': 'Rate limit: 1000 requests per hour'\n                }\n            }\n            \n            response = jsonify(response_data)\n            response.status_code = 429\n            response.headers['Retry-After'] = str(retry_after)\n            response.headers['X-RateLimit-Limit-IP'] = '60'\n            response.headers['X-RateLimit-Limit-Session'] = '1000'\n            \n            return response\n    \n    app.before_request(before_request_rate_limit)\n\n\ndef exempt_from_rate_limiting(app, endpoint):\n    \"\"\"Exempt a specific endpoint from rate limiting.\"\"\"\n    if not hasattr(app, 'rate_limit_exempt_endpoints'):\n        app.rate_limit_exempt_endpoints = set()\n    \n    app.rate_limit_exempt_endpoints.add(endpoint)\n\n\n# Usage example for router setup\ndef setup_rate_limited_router(app):\n    \"\"\"Set up the router with rate limiting enabled.\"\"\"\n    \n    # Apply rate limiting to the app\n    apply_rate_limiting(app)\n    \n    # Example: exempt health check endpoint\n    exempt_from_rate_limiting(app, 'health_check')\n    \n    @app.route('/health')\n    def health_check():\n        return jsonify({'status': 'healthy', 'timestamp': time.time()})\n    \n    @app.route('/api/translate', methods=['POST'])\n    def translate():\n        \"\"\"Example translation endpoint with automatic rate limiting.\"\"\"\n        # This endpoint will automatically have rate limiting applied\n        data = request.get_json()\n        \n        # Your translation logic here\n        result = {\n            'translated_text': f\"Translated: {data.get('text', '')}\",\n            'source_lang': data.get('source_lang', 'auto'),\n            'target_lang': data.get('target_lang', 'en')\n        }\n        \n        return jsonify(result)\n    \n    @app.route('/api/status')\n    def api_status():\n        \"\"\"API status endpoint with rate limiting info.\"\"\"\n        client_ip = rate_limiter._get_client_ip()\n        session_id = rate_limiter._get_session_id()\n        \n        status = {\n            'api_status': 'operational',\n            'rate_limits': {\n                'ip': {\n                    'limit': '60 requests per minute',\n                    'burst': 10,\n                    'current_ip': client_ip\n                },\n                'session': {\n                    'limit': '1000 requests per hour',\n                    'current_session': session_id is not None\n                }\n            }\n        }\n        \n        return jsonify(status)\n    \n    return app\n\n\nif __name__ == '__main__':\n    # Example usage\n    from flask import Flask\n    \n    app = Flask(__name__)\n    app = setup_rate_limited_router(app)\n    \n    print(\"Rate-limited router example:\")\n    print(\"- Per-IP: 60 requests/minute, burst of 10\")\n    print(\"- Per-session: 1000 requests/hour\")\n    print(\"- Returns 429 with Retry-After header when exceeded\")\n    print(\"- Thread-safe token bucket implementation\")\n    \n    # Run example server\n    # app.run(debug=True, threaded=True)\n```\n\nThis implementation provides:\n\n## Key Features\n\n1. **Token Bucket Algorithm**: Thread-safe implementation using `threading.Lock`\n2. **Per-IP Rate Limiting**: 60 requests/minute with burst of 10\n3. **Per-Session Rate Limiting**: 1000 requests/hour for authenticated sessions\n4. **429 Response**: Returns proper HTTP 429 with `Retry-After` header\n5. **Thread Safety**: All operations are thread-safe using locks\n\n## Architecture\n\n- **TokenBucket**: Core rate limiting logic with configurable capacity and refill rate\n- **RateLimiter**: Manages multiple buckets for IPs and sessions\n- **Automatic Cleanup**: Removes old, unused buckets to prevent memory leaks\n- **Flexible Integration**: Can be applied as decorator or globally\n\n## Usage\n\n```python\nfrom flask import Flask\nfrom rosetta_core.router import setup_rate_limited_router, exempt_from_rate_limiting\n\napp = Flask(__name__)\napp = setup_rate_limited_router(app)\n\n# Exempt specific endpoints if needed\nexempt_from_rate_limiting(app, 'health_check')\n```\n\n## Response Format\n\nWhen rate limited, returns:\n```json\n{\n    \"error\": \"Rate limit exceeded\",\n    \"message\": \"Too many requests from IP address\",\n    \"retry_after\": 60,\n    \"limits\": {\n        \"ip\": \"Rate limit: 60 requests per minute with burst of 10\",\n        \"session\": \"Rate limit: 1000 requests per hour\"\n    }\n}\n```\n\nThe implementation is production-ready with proper error handling, memory management, and configurable limits."
}
Task ID: a2651437-c7a2-474d-9c4a-668a1b5f88ba