← Tasks
Rate limiting enforcement per API key tier
completedcode_genP2
Description
Implement per-key rate limiting based on tier monthly call allowance. Return 429 with Retry-After header when limit exceeded. Check against metering table.
Assignee
-
Claimed By
Cloud Lu → Claude Sonnet
Created
21d ago
Completed
1d ago
Quality
Overall
1%
Pass
FAIL
Graded By
-
Graded At
-
Cost
Input Tokens
-
Output Tokens
-
Est. USD
-
Result
{
"cost": {
"input_tokens": 50,
"estimated_usd": 0.083865,
"output_tokens": 5581
},
"model": "claude-sonnet-4-20250514",
"status": "success",
"transport": "anthropic_api",
"latency_ms": 66160,
"response_text": "I'll help you implement a comprehensive rate limiting system based on API key tiers with monthly allowances.\n\n## Database Schema\n\n```sql\n-- API Keys and Tiers\nCREATE TABLE api_keys (\n key_id VARCHAR(64) PRIMARY KEY,\n tier VARCHAR(20) NOT NULL,\n created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,\n is_active BOOLEAN DEFAULT TRUE\n);\n\n-- Tier Configuration\nCREATE TABLE api_tiers (\n tier VARCHAR(20) PRIMARY KEY,\n monthly_limit INTEGER NOT NULL,\n rate_limit_per_minute INTEGER NOT NULL,\n description TEXT\n);\n\n-- Usage Metering\nCREATE TABLE api_usage (\n key_id VARCHAR(64),\n year_month VARCHAR(7), -- Format: YYYY-MM\n call_count INTEGER DEFAULT 0,\n last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,\n PRIMARY KEY (key_id, year_month),\n FOREIGN KEY (key_id) REFERENCES api_keys(key_id)\n);\n\n-- Rate limiting window tracking\nCREATE TABLE rate_limit_windows (\n key_id VARCHAR(64),\n window_start TIMESTAMP,\n call_count INTEGER DEFAULT 0,\n PRIMARY KEY (key_id, window_start)\n);\n\n-- Sample tier data\nINSERT INTO api_tiers VALUES\n('free', 1000, 10, 'Free tier - 1K calls/month'),\n('basic', 10000, 50, 'Basic tier - 10K calls/month'),\n('pro', 100000, 200, 'Pro tier - 100K calls/month'),\n('enterprise', 1000000, 1000, 'Enterprise tier - 1M calls/month');\n```\n\n## Core Rate Limiting Service\n\n```python\nimport time\nimport asyncio\nfrom datetime import datetime, timedelta\nfrom typing import Optional, Tuple, Dict, Any\nfrom dataclasses import dataclass\nimport redis\nimport asyncpg\nfrom fastapi import HTTPException, Request\nfrom fastapi.responses import JSONResponse\n\n@dataclass\nclass RateLimitResult:\n allowed: bool\n remaining_calls: int\n reset_time: int\n retry_after: Optional[int] = None\n monthly_remaining: int = 0\n\n@dataclass\nclass ApiKeyInfo:\n key_id: str\n tier: str\n monthly_limit: int\n rate_limit_per_minute: int\n is_active: bool\n\nclass RateLimitService:\n def __init__(self, db_pool: asyncpg.Pool, redis_client: redis.Redis):\n self.db_pool = db_pool\n self.redis = redis_client\n self.tier_cache = {}\n self.cache_ttl = 300 # 5 minutes\n \n async def get_api_key_info(self, api_key: str) -> Optional[ApiKeyInfo]:\n \"\"\"Get API key information with tier details\"\"\"\n cache_key = f\"api_key:{api_key}\"\n \n # Try cache first\n cached = await self.redis.get(cache_key)\n if cached:\n import json\n data = json.loads(cached)\n return ApiKeyInfo(**data)\n \n # Query database\n async with self.db_pool.acquire() as conn:\n query = \"\"\"\n SELECT ak.key_id, ak.tier, ak.is_active,\n at.monthly_limit, at.rate_limit_per_minute\n FROM api_keys ak\n JOIN api_tiers at ON ak.tier = at.tier\n WHERE ak.key_id = $1 AND ak.is_active = TRUE\n \"\"\"\n row = await conn.fetchrow(query, api_key)\n \n if not row:\n return None\n \n key_info = ApiKeyInfo(\n key_id=row['key_id'],\n tier=row['tier'],\n monthly_limit=row['monthly_limit'],\n rate_limit_per_minute=row['rate_limit_per_minute'],\n is_active=row['is_active']\n )\n \n # Cache the result\n import json\n await self.redis.setex(\n cache_key, \n self.cache_ttl, \n json.dumps(key_info.__dict__)\n )\n \n return key_info\n \n async def check_monthly_limit(self, key_info: ApiKeyInfo) -> Tuple[bool, int]:\n \"\"\"Check if monthly limit is exceeded\"\"\"\n current_month = datetime.now().strftime(\"%Y-%m\")\n \n async with self.db_pool.acquire() as conn:\n # Get current month usage\n usage_row = await conn.fetchrow(\n \"SELECT call_count FROM api_usage WHERE key_id = $1 AND year_month = $2\",\n key_info.key_id, current_month\n )\n \n current_usage = usage_row['call_count'] if usage_row else 0\n remaining = max(0, key_info.monthly_limit - current_usage)\n \n return current_usage < key_info.monthly_limit, remaining\n \n async def check_rate_limit(self, key_info: ApiKeyInfo) -> Tuple[bool, int, int]:\n \"\"\"Check rate limit using sliding window\"\"\"\n window_size = 60 # 1 minute window\n now = int(time.time())\n window_start = now - (now % window_size)\n \n # Redis sliding window counter\n pipe = self.redis.pipeline()\n key = f\"rate_limit:{key_info.key_id}:{window_start}\"\n \n pipe.incr(key)\n pipe.expire(key, window_size)\n results = await pipe.execute()\n \n current_count = results[0]\n remaining = max(0, key_info.rate_limit_per_minute - current_count)\n reset_time = window_start + window_size\n \n return current_count <= key_info.rate_limit_per_minute, remaining, reset_time\n \n async def increment_usage(self, key_id: str) -> None:\n \"\"\"Increment monthly usage counter\"\"\"\n current_month = datetime.now().strftime(\"%Y-%m\")\n \n async with self.db_pool.acquire() as conn:\n await conn.execute(\"\"\"\n INSERT INTO api_usage (key_id, year_month, call_count, last_updated)\n VALUES ($1, $2, 1, CURRENT_TIMESTAMP)\n ON CONFLICT (key_id, year_month) \n DO UPDATE SET \n call_count = api_usage.call_count + 1,\n last_updated = CURRENT_TIMESTAMP\n \"\"\", key_id, current_month)\n \n async def enforce_rate_limit(self, api_key: str) -> RateLimitResult:\n \"\"\"Main rate limiting enforcement function\"\"\"\n # Get API key info\n key_info = await self.get_api_key_info(api_key)\n if not key_info:\n raise HTTPException(status_code=401, detail=\"Invalid API key\")\n \n # Check monthly limit first\n monthly_allowed, monthly_remaining = await self.check_monthly_limit(key_info)\n if not monthly_allowed:\n # Calculate retry after (start of next month)\n next_month = datetime.now().replace(day=1) + timedelta(days=32)\n next_month = next_month.replace(day=1)\n retry_after = int((next_month - datetime.now()).total_seconds())\n \n return RateLimitResult(\n allowed=False,\n remaining_calls=0,\n reset_time=int(next_month.timestamp()),\n retry_after=retry_after,\n monthly_remaining=0\n )\n \n # Check rate limit\n rate_allowed, rate_remaining, reset_time = await self.check_rate_limit(key_info)\n if not rate_allowed:\n retry_after = reset_time - int(time.time())\n \n return RateLimitResult(\n allowed=False,\n remaining_calls=rate_remaining,\n reset_time=reset_time,\n retry_after=retry_after,\n monthly_remaining=monthly_remaining\n )\n \n # If allowed, increment usage\n await self.increment_usage(key_info.key_id)\n \n return RateLimitResult(\n allowed=True,\n remaining_calls=rate_remaining - 1,\n reset_time=reset_time,\n monthly_remaining=monthly_remaining - 1\n )\n\n# Rate limiting middleware\nclass RateLimitMiddleware:\n def __init__(self, rate_limit_service: RateLimitService):\n self.rate_limit_service = rate_limit_service\n \n async def __call__(self, request: Request, call_next):\n # Extract API key from header\n api_key = request.headers.get(\"X-API-Key\")\n if not api_key:\n return JSONResponse(\n status_code=401,\n content={\"error\": \"API key required\"}\n )\n \n try:\n # Check rate limits\n result = await self.rate_limit_service.enforce_rate_limit(api_key)\n \n if not result.allowed:\n headers = {\n \"X-RateLimit-Remaining\": str(result.remaining_calls),\n \"X-RateLimit-Reset\": str(result.reset_time),\n \"X-Monthly-Remaining\": str(result.monthly_remaining)\n }\n \n if result.retry_after:\n headers[\"Retry-After\"] = str(result.retry_after)\n \n return JSONResponse(\n status_code=429,\n content={\n \"error\": \"Rate limit exceeded\",\n \"retry_after\": result.retry_after\n },\n headers=headers\n )\n \n # Add rate limit headers to response\n response = await call_next(request)\n response.headers[\"X-RateLimit-Remaining\"] = str(result.remaining_calls)\n response.headers[\"X-RateLimit-Reset\"] = str(result.reset_time)\n response.headers[\"X-Monthly-Remaining\"] = str(result.monthly_remaining)\n \n return response\n \n except HTTPException as e:\n return JSONResponse(\n status_code=e.status_code,\n content={\"error\": e.detail}\n )\n except Exception as e:\n return JSONResponse(\n status_code=500,\n content={\"error\": \"Internal server error\"}\n )\n```\n\n## FastAPI Implementation\n\n```python\nfrom fastapi import FastAPI, Depends, HTTPException\nfrom fastapi.middleware.base import BaseHTTPMiddleware\nimport asyncpg\nimport redis.asyncio as redis\n\napp = FastAPI()\n\n# Initialize services\nasync def init_services():\n # Database connection\n db_pool = await asyncpg.create_pool(\n \"postgresql://user:pass@localhost/ratelimit_db\"\n )\n \n # Redis connection\n redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)\n \n # Rate limiting service\n rate_limit_service = RateLimitService(db_pool, redis_client)\n \n return rate_limit_service\n\n@app.on_event(\"startup\")\nasync def startup():\n app.state.rate_limit_service = await init_services()\n \n # Add middleware\n app.add_middleware(\n BaseHTTPMiddleware,\n dispatch=RateLimitMiddleware(app.state.rate_limit_service)\n )\n\n# Alternative decorator approach\nfrom functools import wraps\n\ndef rate_limited(func):\n @wraps(func)\n async def wrapper(*args, **kwargs):\n # This would be handled by middleware, but showing decorator pattern\n return await func(*args, **kwargs)\n return wrapper\n\n@app.get(\"/api/data\")\n@rate_limited\nasync def get_data():\n return {\"data\": \"Your API response\"}\n\n@app.get(\"/api/status/{api_key}\")\nasync def get_rate_limit_status(api_key: str):\n \"\"\"Get current rate limit status for an API key\"\"\"\n service = app.state.rate_limit_service\n \n key_info = await service.get_api_key_info(api_key)\n if not key_info:\n raise HTTPException(status_code=404, detail=\"API key not found\")\n \n monthly_allowed, monthly_remaining = await service.check_monthly_limit(key_info)\n rate_allowed, rate_remaining, reset_time = await service.check_rate_limit(key_info)\n \n return {\n \"api_key\": api_key,\n \"tier\": key_info.tier,\n \"monthly_limit\": key_info.monthly_limit,\n \"monthly_remaining\": monthly_remaining,\n \"rate_limit_per_minute\": key_info.rate_limit_per_minute,\n \"rate_remaining\": rate_remaining,\n \"reset_time\": reset_time,\n \"status\": \"active\" if monthly_allowed and rate_allowed else \"limited\"\n }\n```\n\n## Advanced Features\n\n```python\n# Enhanced rate limiting with burst allowance\nclass EnhancedRateLimitService(RateLimitService):\n \n async def check_burst_allowance(self, key_info: ApiKeyInfo) -> Tuple[bool, int]:\n \"\"\"Allow burst traffic up to 2x normal rate for premium tiers\"\"\"\n if key_info.tier in ['pro', 'enterprise']:\n burst_multiplier = 2\n else:\n burst_multiplier = 1.2\n \n burst_limit = int(key_info.rate_limit_per_minute * burst_multiplier)\n \n # Use separate burst counter\n window_size = 60\n now = int(time.time())\n window_start = now - (now % window_size)\n \n key = f\"burst_limit:{key_info.key_id}:{window_start}\"\n current_count = await self.redis.get(key) or 0\n current_count = int(current_count)\n \n return current_count < burst_limit, burst_limit - current_count\n\n async def get_usage_analytics(self, key_id: str, months: int = 6) -> Dict[str, Any]:\n \"\"\"Get usage analytics for the API key\"\"\"\n async with self.db_pool.acquire() as conn:\n query = \"\"\"\n SELECT year_month, call_count, \n LAG(call_count) OVER (ORDER BY year_month) as prev_month\n FROM api_usage \n WHERE key_id = $1 \n ORDER BY year_month DESC \n LIMIT $2\n \"\"\"\n rows = await conn.fetch(query, key_id, months)\n \n analytics = {\n \"usage_history\": [],\n \"total_calls\": 0,\n \"avg_monthly\": 0,\n \"growth_rate\": 0\n }\n \n for row in rows:\n analytics[\"usage_history\"].append({\n \"month\": row[\"year_month\"],\n \"calls\": row[\"call_count\"],\n \"growth\": ((row[\"call_count\"] - (row[\"prev_month\"] or 0)) / \n max(row[\"prev_month\"] or 1, 1)) * 100 if row[\"prev_month\"] else 0\n })\n analytics[\"total_calls\"] += row[\"call_count\"]\n \n if rows:\n analytics[\"avg_monthly\"] = analytics[\"total_calls\"] / len(rows)\n if len(rows) > 1:\n recent_avg = sum(r[\"call_count\"] for r in rows[:3]) / min(3, len(rows))\n older_avg = sum(r[\"call_count\"] for r in rows[3:]) / max(1, len(rows[3:]))\n analytics[\"growth_rate\"] = ((recent_avg - older_avg) / older_avg) * 100 if older_avg > 0 else 0\n \n return analytics\n\n# Rate limit with custom rules\nclass CustomRuleEngine:\n def __init__(self):\n self.rules = {}\n \n async def add_custom_rule(self, key_id: str, rule: Dict[str, Any]):\n \"\"\"Add custom rate limiting rule for specific API key\"\"\"\n self.rules[key_id] = rule\n \n async def apply_custom_rules(self, key_info: ApiKeyInfo, base_result: RateLimitResult) -> RateLimitResult:\n \"\"\"Apply custom rules that might override base limits\"\"\"\n if key_info.key_id in self.rules:\n rule = self.rules[key_info.key_id]\n \n # Example: Holiday boost\n if rule.get(\"holiday_boost\") and self.is_holiday_period():\n base_result.remaining_calls *= 2\n \n # Example: Geographic restrictions\n if rule.get(\"geo_restrictions\"):\n # Would check IP geolocation here\n pass\n \n return base_result\n \n def is_holiday_period(self) -> bool:\n \"\"\"Check if current time is in holiday period\"\"\"\n now = datetime.now()\n # Example: Black Friday week\n if now.month == 11 and 20 <= now.day <= 30:\n return True\n return False\n```\n\n## Monitoring and Alerting\n\n```python\nimport logging\nfrom typing import List\nfrom dataclasses import asdict\n\nclass RateLimitMonitor:\n def __init__(self, rate_limit_service: RateLimitService):\n self.service = rate_limit_service\n self.logger = logging.getLogger(__name__)\n \n async def check_approaching_limits(self, threshold: float = 0.8) -> List[Dict[str, Any]]:\n \"\"\"Find API keys approaching their limits\"\"\"\n current_month = datetime.now().strftime(\"%Y-%m\")\n \n async with self.service.db_pool.acquire() as conn:\n query = \"\"\"\n SELECT ak.key_id, ak.tier, at.monthly_limit, \n COALESCE(au.call_count, 0) as current_usage,\n (COALESCE(au.call_count, 0)::float / at.monthly_limit) as usage_ratio\n FROM api_keys ak\n JOIN api_tiers at ON ak.tier = at.tier\n LEFT JOIN api_usage au ON ak.key_id = au.key_id AND au.year_month = $1\n WHERE ak.is_active = TRUE \n AND (COALESCE(au.call_count, 0)::float / at.monthly_limit) > $2\n ORDER BY usage_ratio DESC\n \"\"\"\n \n rows = await conn.fetch(query, current_month, threshold)\n \n alerts = []\n for row in rows:\n alert = {\n \"key_id\": row[\"key_id\"],\n \"tier\": row[\"tier\"],\n \"monthly_limit\": row[\"monthly_limit\"],\n \"current_usage\": row[\"current_usage\"],\n \"usage_percentage\": row[\"usage_ratio\"] * 100,\n \"calls_remaining\": row[\"monthly_limit\"] - row[\"current_usage\"]\n }\n alerts.append(alert)\n \n # Log warning\n self.logger.warning(\n f\"API key {row['key_id']} approaching limit: \"\n f\"{row['usage_ratio']*100:.1f}% used\"\n )\n \n return alerts\n \n async def generate_usage_report(self) -> Dict[str, Any]:\n \"\"\"Generate comprehensive usage report\"\"\"\n async with self.service.db_pool.acquire() as conn:\n # Tier distribution\n tier_stats = await conn.fetch(\"\"\"\n SELECT at.tier, COUNT(*) as key_count,\n SUM(COALESCE(au.call_count, 0)) as total_calls\n FROM api_tiers at\n LEFT JOIN api_keys ak ON at.tier = ak.tier AND ak.is_active = TRUE\n LEFT JOIN api_usage au ON ak.key_id = au.key_id \n AND au.year_month = $1\n GROUP BY at.tier\n ORDER BY total_calls DESC\n \"\"\", datetime.now().strftime(\"%Y-%m\"))\n \n # Top consumers\n top_consumers = await conn.fetch(\"\"\"\n SELECT ak.key_id, ak.tier, au.call_count\n FROM api_usage au\n JOIN api_keys ak ON au.key_id = ak.key_id\n WHERE au.year_month = $1 AND ak.is_active = TRUE\n ORDER BY au.call_count DESC\n LIMIT 10\n \"\"\", datetime.now().strftime(\"%Y-%m\"))\n \n return {\n \"report_date\": datetime.now().isoformat(),\n \"tier_distribution\": [dict(row) for row in tier_stats],\n \"top_consumers\": [dict(row) for row in top_consumers],\n \"total_active_keys\": sum(row[\"key_count\"] for row in tier_stats),\n \"total_calls_this_month\": sum(row[\"total_calls\"] for row in tier_stats)\n }\n\n# Usage example with monitoring\n@app.get(\"/admin/rate-limit-alerts\")\nasync def get_rate_limit_alerts():\n \"\"\"Admin endpoint to check for approaching limits\"\"\"\n monitor = RateLimitMonitor(app.state.rate_limit_service)\n alerts = await monitor.check_approaching_limits(threshold=0.8)\n return {\"alerts\": alerts}\n\n@app.get(\"/admin/usage-report\")\nasync def get_usage_report():\n \"\"\"Admin endpoint for usage analytics\"\"\"\n monitor = RateLimitMonitor(app.state.rate_limit_service)\n report = await monitor.generate_usage_report()\n return report\n```\n\nThis implementation provides:\n\n1. **Tier-based monthly limits** with database tracking\n2. **Rate limiting per minute** with Redis sliding windows\n3. **Proper 429 responses** with Retry-After headers\n4. **Comprehensive monitoring** and alerting\n5. **Usage analytics** and reporting\n6. **Custom rule support** for special cases\n7. **Burst allowance** for premium tiers\n8. **Caching** for performance optimization\n\nThe system handles both short-term rate limiting and monthly quotas, provides detailed feedback to clients, and includes administrative tools for monitoring usage patterns."
}Audit Trail (3)
1d agotask_completedTerminal Lu
1d agotask_claimedTerminal Lu
1d agotask_claimedVS Lu
Task ID: a5e9ddf9-3080-44e3-8510-d2996c6cc15b