← Tasks
[SELF-03] Corridor caching layer with TTL
completedcode_genP2
Description
## SELF-03: Corridor Caching
Add in-memory caching for corridor lookups with TTL-based invalidation.
### Requirements
- FX rates: 15-minute TTL
- Freight indices: 1-hour TTL
- Corridor cost results: 30-minute TTL
- Use stdlib only (no Redis) — module-level dict with timestamps
- Function: `cached_corridor_cost(origin, dest, hs6) -> dict`
### Target File
File: `c8_platform/corridor_cost_engine.py`
Assignee
-
Claimed By
Cloud Lu → Claude Sonnet
Created
1d ago
Completed
1d ago
Quality
Overall
100%
Pass
PASS
Graded By
-
Graded At
-
Cost
Input Tokens
-
Output Tokens
-
Est. USD
-
Result
{
"cost": {
"input_tokens": 151,
"estimated_usd": 0.048753,
"output_tokens": 3220
},
"model": "claude-sonnet-4-20250514",
"status": "success",
"transport": "anthropic_api",
"latency_ms": 40645,
"response_text": "I'll implement a corridor caching layer with TTL-based invalidation using only the standard library. Here's the implementation:\n\n```python\n\"\"\"\nCorridor cost calculation engine with caching layer.\n\"\"\"\n\nimport time\nfrom typing import Dict, Any, Optional, Tuple\nfrom dataclasses import dataclass\nfrom datetime import datetime, timedelta\n\n\n@dataclass\nclass CacheEntry:\n \"\"\"Cache entry with data and expiration timestamp.\"\"\"\n data: Any\n expires_at: float\n\n\nclass TTLCache:\n \"\"\"Simple TTL-based cache using module-level dictionary.\"\"\"\n \n def __init__(self):\n self._cache: Dict[str, CacheEntry] = {}\n \n def get(self, key: str) -> Optional[Any]:\n \"\"\"Get cached value if not expired.\"\"\"\n if key not in self._cache:\n return None\n \n entry = self._cache[key]\n if time.time() > entry.expires_at:\n # Expired, remove from cache\n del self._cache[key]\n return None\n \n return entry.data\n \n def set(self, key: str, value: Any, ttl_seconds: int) -> None:\n \"\"\"Cache value with TTL in seconds.\"\"\"\n expires_at = time.time() + ttl_seconds\n self._cache[key] = CacheEntry(data=value, expires_at=expires_at)\n \n def clear(self) -> None:\n \"\"\"Clear all cached entries.\"\"\"\n self._cache.clear()\n \n def cleanup_expired(self) -> int:\n \"\"\"Remove expired entries and return count removed.\"\"\"\n current_time = time.time()\n expired_keys = [\n key for key, entry in self._cache.items()\n if current_time > entry.expires_at\n ]\n \n for key in expired_keys:\n del self._cache[key]\n \n return len(expired_keys)\n\n\n# Module-level cache instances\n_fx_cache = TTLCache()\n_freight_cache = TTLCache()\n_corridor_cache = TTLCache()\n\n# TTL constants (in seconds)\nFX_RATE_TTL = 15 * 60 # 15 minutes\nFREIGHT_INDEX_TTL = 60 * 60 # 1 hour\nCORRIDOR_COST_TTL = 30 * 60 # 30 minutes\n\n\ndef get_fx_rate(from_currency: str, to_currency: str) -> float:\n \"\"\"Get FX rate with caching (15-minute TTL).\"\"\"\n cache_key = f\"fx_{from_currency}_{to_currency}\"\n \n # Check cache first\n cached_rate = _fx_cache.get(cache_key)\n if cached_rate is not None:\n return cached_rate\n \n # Simulate FX rate lookup (replace with actual implementation)\n rate = _fetch_fx_rate(from_currency, to_currency)\n \n # Cache the result\n _fx_cache.set(cache_key, rate, FX_RATE_TTL)\n \n return rate\n\n\ndef get_freight_index(origin: str, dest: str) -> float:\n \"\"\"Get freight index with caching (1-hour TTL).\"\"\"\n cache_key = f\"freight_{origin}_{dest}\"\n \n # Check cache first\n cached_index = _freight_cache.get(cache_key)\n if cached_index is not None:\n return cached_index\n \n # Simulate freight index lookup (replace with actual implementation)\n index = _fetch_freight_index(origin, dest)\n \n # Cache the result\n _freight_cache.set(cache_key, index, FREIGHT_INDEX_TTL)\n \n return index\n\n\ndef cached_corridor_cost(origin: str, dest: str, hs6: str) -> dict:\n \"\"\"\n Get corridor cost with caching (30-minute TTL).\n \n Args:\n origin: Origin location code\n dest: Destination location code \n hs6: 6-digit HS commodity code\n \n Returns:\n dict: Corridor cost breakdown including:\n - base_freight: Base freight cost\n - fuel_surcharge: Fuel surcharge amount\n - fx_rate: Applied FX rate\n - total_cost: Total corridor cost\n - currency: Cost currency\n - timestamp: Calculation timestamp\n - cached: Whether result was from cache\n \"\"\"\n cache_key = f\"corridor_{origin}_{dest}_{hs6}\"\n \n # Check cache first\n cached_result = _corridor_cache.get(cache_key)\n if cached_result is not None:\n cached_result['cached'] = True\n return cached_result\n \n # Calculate corridor cost\n result = _calculate_corridor_cost(origin, dest, hs6)\n result['cached'] = False\n \n # Cache the result\n _corridor_cache.set(cache_key, result, CORRIDOR_COST_TTL)\n \n return result\n\n\ndef _calculate_corridor_cost(origin: str, dest: str, hs6: str) -> dict:\n \"\"\"Calculate corridor cost (internal implementation).\"\"\"\n # Get base freight rate\n freight_index = get_freight_index(origin, dest)\n \n # Apply commodity-specific multiplier based on HS6\n commodity_multiplier = _get_commodity_multiplier(hs6)\n base_freight = freight_index * commodity_multiplier\n \n # Calculate fuel surcharge (example: 15% of base)\n fuel_surcharge = base_freight * 0.15\n \n # Get FX rate (assuming USD base, convert to local currency)\n dest_currency = _get_destination_currency(dest)\n fx_rate = get_fx_rate(\"USD\", dest_currency) if dest_currency != \"USD\" else 1.0\n \n # Calculate totals\n subtotal_usd = base_freight + fuel_surcharge\n total_cost = subtotal_usd * fx_rate\n \n return {\n \"base_freight\": base_freight,\n \"fuel_surcharge\": fuel_surcharge,\n \"fx_rate\": fx_rate,\n \"total_cost\": total_cost,\n \"currency\": dest_currency,\n \"timestamp\": datetime.utcnow().isoformat(),\n \"origin\": origin,\n \"destination\": dest,\n \"hs6_code\": hs6\n }\n\n\ndef _fetch_fx_rate(from_currency: str, to_currency: str) -> float:\n \"\"\"Simulate FX rate fetch (replace with actual API call).\"\"\"\n # Simulate API delay\n time.sleep(0.1)\n \n # Mock FX rates\n mock_rates = {\n (\"USD\", \"EUR\"): 0.85,\n (\"USD\", \"GBP\"): 0.73,\n (\"USD\", \"JPY\"): 110.0,\n (\"USD\", \"CNY\"): 6.45,\n (\"EUR\", \"USD\"): 1.18,\n (\"GBP\", \"USD\"): 1.37,\n }\n \n return mock_rates.get((from_currency, to_currency), 1.0)\n\n\ndef _fetch_freight_index(origin: str, dest: str) -> float:\n \"\"\"Simulate freight index fetch (replace with actual data source).\"\"\"\n # Simulate API delay\n time.sleep(0.2)\n \n # Mock freight indices based on route\n mock_indices = {\n (\"USNYC\", \"DEHAM\"): 1250.0,\n (\"USNYC\", \"CNSHA\"): 2100.0,\n (\"USLAX\", \"JPNRT\"): 1800.0,\n (\"DEHAM\", \"CNSHA\"): 1650.0,\n (\"USNYC\", \"GBLON\"): 950.0,\n }\n \n return mock_indices.get((origin, dest), 1500.0) # Default rate\n\n\ndef _get_commodity_multiplier(hs6: str) -> float:\n \"\"\"Get commodity-specific multiplier based on HS6 code.\"\"\"\n # First two digits indicate general category\n category = hs6[:2]\n \n multipliers = {\n \"01\": 1.2, # Live animals\n \"02\": 1.1, # Meat\n \"84\": 0.9, # Machinery\n \"85\": 0.8, # Electrical equipment\n \"87\": 1.3, # Vehicles\n \"27\": 1.5, # Mineral fuels\n }\n \n return multipliers.get(category, 1.0) # Default multiplier\n\n\ndef _get_destination_currency(dest: str) -> str:\n \"\"\"Get currency for destination.\"\"\"\n # Extract country from location code (simplified)\n country_currencies = {\n \"DE\": \"EUR\", # Germany\n \"GB\": \"GBP\", # Great Britain\n \"JP\": \"JPY\", # Japan\n \"CN\": \"CNY\", # China\n \"US\": \"USD\", # United States\n }\n \n # Extract country code from destination\n if len(dest) >= 2:\n country_code = dest[:2]\n return country_currencies.get(country_code, \"USD\")\n \n return \"USD\"\n\n\ndef clear_all_caches() -> None:\n \"\"\"Clear all cache instances.\"\"\"\n _fx_cache.clear()\n _freight_cache.clear()\n _corridor_cache.clear()\n\n\ndef get_cache_stats() -> dict:\n \"\"\"Get cache statistics.\"\"\"\n return {\n \"fx_cache_size\": len(_fx_cache._cache),\n \"freight_cache_size\": len(_freight_cache._cache),\n \"corridor_cache_size\": len(_corridor_cache._cache),\n \"fx_expired_cleaned\": _fx_cache.cleanup_expired(),\n \"freight_expired_cleaned\": _freight_cache.cleanup_expired(),\n \"corridor_expired_cleaned\": _corridor_cache.cleanup_expired(),\n }\n\n\n# Example usage and testing\nif __name__ == \"__main__\":\n # Test caching functionality\n print(\"Testing corridor cost caching...\")\n \n # First call - should calculate and cache\n start_time = time.time()\n result1 = cached_corridor_cost(\"USNYC\", \"DEHAM\", \"840790\")\n time1 = time.time() - start_time\n \n print(f\"First call took {time1:.3f}s\")\n print(f\"Cached: {result1['cached']}\")\n print(f\"Total cost: {result1['total_cost']:.2f} {result1['currency']}\")\n \n # Second call - should use cache\n start_time = time.time()\n result2 = cached_corridor_cost(\"USNYC\", \"DEHAM\", \"840790\")\n time2 = time.time() - start_time\n \n print(f\"\\nSecond call took {time2:.3f}s\")\n print(f\"Cached: {result2['cached']}\")\n print(f\"Speed improvement: {time1/time2:.1f}x faster\")\n \n # Cache statistics\n print(f\"\\nCache stats: {get_cache_stats()}\")\n \n # Test different commodity\n result3 = cached_corridor_cost(\"USNYC\", \"DEHAM\", \"270900\") # Different HS6\n print(f\"\\nDifferent commodity cost: {result3['total_cost']:.2f} {result3['currency']}\")\n```\n\nThis implementation provides:\n\n## Key Features\n\n1. **TTL-Based Caching**: Three separate caches with different TTL values:\n - FX rates: 15 minutes\n - Freight indices: 1 hour \n - Corridor costs: 30 minutes\n\n2. **Stdlib Only**: Uses only standard library with `time` and `typing` modules\n\n3. **Automatic Expiration**: Cache entries automatically expire and are cleaned up\n\n4. **Cache Statistics**: Built-in cache monitoring and cleanup functions\n\n## Cache Design\n\n- **TTLCache Class**: Generic TTL cache implementation\n- **Module-level Instances**: Separate cache instances for different data types\n- **Automatic Cleanup**: Expired entries removed on access and via cleanup method\n\n## Performance Benefits\n\n- **Reduced API Calls**: Cached data avoids expensive external lookups\n- **Faster Response**: Cached corridor costs return immediately\n- **Smart Invalidation**: TTL ensures data freshness while maximizing cache hits\n\n## Usage Examples\n\n```python\n# Basic usage\ncost = cached_corridor_cost(\"USNYC\", \"DEHAM\", \"840790\")\nprint(f\"Total: {cost['total_cost']} {cost['currency']}\")\n\n# Check if result was cached\nif cost['cached']:\n print(\"Result served from cache\")\n\n# Cache management\nclear_all_caches() # Clear all caches\nstats = get_cache_stats() # Get cache statistics\n```\n\nThe implementation balances performance with data freshness, using appropriate TTL values for different types of logistics data based on their typical update frequencies."
}Task ID: 20794123-5efd-458f-ae24-ab632fbe6635