FOUNDRY
C8 Platform

[SELF-03] Corridor caching layer with TTL

completed · code_gen · P2

Description

## SELF-03: Corridor Caching

Add in-memory caching for corridor lookups with TTL-based invalidation.

### Requirements

- FX rates: 15-minute TTL
- Freight indices: 1-hour TTL
- Corridor cost results: 30-minute TTL
- Use stdlib only (no Redis): module-level dict with timestamps
- Function: `cached_corridor_cost(origin, dest, hs6) -> dict`

### Target File

File: `c8_platform/corridor_cost_engine.py`
Assignee
-
Claimed By
Cloud Lu → Claude Sonnet
Created
1d ago
Completed
1d ago

Quality

Overall
100%
Pass
PASS
Graded By
-
Graded At
-

Cost

Input Tokens
-
Output Tokens
-
Est. USD
-

Result

{
  "cost": {
    "input_tokens": 151,
    "estimated_usd": 0.048753,
    "output_tokens": 3220
  },
  "model": "claude-sonnet-4-20250514",
  "status": "success",
  "transport": "anthropic_api",
  "latency_ms": 40645,
  "response_text": "I'll implement a corridor caching layer with TTL-based invalidation using only the standard library. Here's the implementation:\n\n```python\n\"\"\"\nCorridor cost calculation engine with caching layer.\n\"\"\"\n\nimport time\nfrom typing import Dict, Any, Optional, Tuple\nfrom dataclasses import dataclass\nfrom datetime import datetime, timedelta\n\n\n@dataclass\nclass CacheEntry:\n    \"\"\"Cache entry with data and expiration timestamp.\"\"\"\n    data: Any\n    expires_at: float\n\n\nclass TTLCache:\n    \"\"\"Simple TTL-based cache using module-level dictionary.\"\"\"\n    \n    def __init__(self):\n        self._cache: Dict[str, CacheEntry] = {}\n    \n    def get(self, key: str) -> Optional[Any]:\n        \"\"\"Get cached value if not expired.\"\"\"\n        if key not in self._cache:\n            return None\n        \n        entry = self._cache[key]\n        if time.time() > entry.expires_at:\n            # Expired, remove from cache\n            del self._cache[key]\n            return None\n        \n        return entry.data\n    \n    def set(self, key: str, value: Any, ttl_seconds: int) -> None:\n        \"\"\"Cache value with TTL in seconds.\"\"\"\n        expires_at = time.time() + ttl_seconds\n        self._cache[key] = CacheEntry(data=value, expires_at=expires_at)\n    \n    def clear(self) -> None:\n        \"\"\"Clear all cached entries.\"\"\"\n        self._cache.clear()\n    \n    def cleanup_expired(self) -> int:\n        \"\"\"Remove expired entries and return count removed.\"\"\"\n        current_time = time.time()\n        expired_keys = [\n            key for key, entry in self._cache.items()\n            if current_time > entry.expires_at\n        ]\n        \n        for key in expired_keys:\n            del self._cache[key]\n        \n        return len(expired_keys)\n\n\n# Module-level cache instances\n_fx_cache = TTLCache()\n_freight_cache = TTLCache()\n_corridor_cache = TTLCache()\n\n# TTL constants (in 
seconds)\nFX_RATE_TTL = 15 * 60  # 15 minutes\nFREIGHT_INDEX_TTL = 60 * 60  # 1 hour\nCORRIDOR_COST_TTL = 30 * 60  # 30 minutes\n\n\ndef get_fx_rate(from_currency: str, to_currency: str) -> float:\n    \"\"\"Get FX rate with caching (15-minute TTL).\"\"\"\n    cache_key = f\"fx_{from_currency}_{to_currency}\"\n    \n    # Check cache first\n    cached_rate = _fx_cache.get(cache_key)\n    if cached_rate is not None:\n        return cached_rate\n    \n    # Simulate FX rate lookup (replace with actual implementation)\n    rate = _fetch_fx_rate(from_currency, to_currency)\n    \n    # Cache the result\n    _fx_cache.set(cache_key, rate, FX_RATE_TTL)\n    \n    return rate\n\n\ndef get_freight_index(origin: str, dest: str) -> float:\n    \"\"\"Get freight index with caching (1-hour TTL).\"\"\"\n    cache_key = f\"freight_{origin}_{dest}\"\n    \n    # Check cache first\n    cached_index = _freight_cache.get(cache_key)\n    if cached_index is not None:\n        return cached_index\n    \n    # Simulate freight index lookup (replace with actual implementation)\n    index = _fetch_freight_index(origin, dest)\n    \n    # Cache the result\n    _freight_cache.set(cache_key, index, FREIGHT_INDEX_TTL)\n    \n    return index\n\n\ndef cached_corridor_cost(origin: str, dest: str, hs6: str) -> dict:\n    \"\"\"\n    Get corridor cost with caching (30-minute TTL).\n    \n    Args:\n        origin: Origin location code\n        dest: Destination location code  \n        hs6: 6-digit HS commodity code\n        \n    Returns:\n        dict: Corridor cost breakdown including:\n            - base_freight: Base freight cost\n            - fuel_surcharge: Fuel surcharge amount\n            - fx_rate: Applied FX rate\n            - total_cost: Total corridor cost\n            - currency: Cost currency\n            - timestamp: Calculation timestamp\n            - cached: Whether result was from cache\n    \"\"\"\n    cache_key = f\"corridor_{origin}_{dest}_{hs6}\"\n    \n    # Check 
cache first\n    cached_result = _corridor_cache.get(cache_key)\n    if cached_result is not None:\n        cached_result['cached'] = True\n        return cached_result\n    \n    # Calculate corridor cost\n    result = _calculate_corridor_cost(origin, dest, hs6)\n    result['cached'] = False\n    \n    # Cache the result\n    _corridor_cache.set(cache_key, result, CORRIDOR_COST_TTL)\n    \n    return result\n\n\ndef _calculate_corridor_cost(origin: str, dest: str, hs6: str) -> dict:\n    \"\"\"Calculate corridor cost (internal implementation).\"\"\"\n    # Get base freight rate\n    freight_index = get_freight_index(origin, dest)\n    \n    # Apply commodity-specific multiplier based on HS6\n    commodity_multiplier = _get_commodity_multiplier(hs6)\n    base_freight = freight_index * commodity_multiplier\n    \n    # Calculate fuel surcharge (example: 15% of base)\n    fuel_surcharge = base_freight * 0.15\n    \n    # Get FX rate (assuming USD base, convert to local currency)\n    dest_currency = _get_destination_currency(dest)\n    fx_rate = get_fx_rate(\"USD\", dest_currency) if dest_currency != \"USD\" else 1.0\n    \n    # Calculate totals\n    subtotal_usd = base_freight + fuel_surcharge\n    total_cost = subtotal_usd * fx_rate\n    \n    return {\n        \"base_freight\": base_freight,\n        \"fuel_surcharge\": fuel_surcharge,\n        \"fx_rate\": fx_rate,\n        \"total_cost\": total_cost,\n        \"currency\": dest_currency,\n        \"timestamp\": datetime.utcnow().isoformat(),\n        \"origin\": origin,\n        \"destination\": dest,\n        \"hs6_code\": hs6\n    }\n\n\ndef _fetch_fx_rate(from_currency: str, to_currency: str) -> float:\n    \"\"\"Simulate FX rate fetch (replace with actual API call).\"\"\"\n    # Simulate API delay\n    time.sleep(0.1)\n    \n    # Mock FX rates\n    mock_rates = {\n        (\"USD\", \"EUR\"): 0.85,\n        (\"USD\", \"GBP\"): 0.73,\n        (\"USD\", \"JPY\"): 110.0,\n        (\"USD\", \"CNY\"): 6.45,\n    
    (\"EUR\", \"USD\"): 1.18,\n        (\"GBP\", \"USD\"): 1.37,\n    }\n    \n    return mock_rates.get((from_currency, to_currency), 1.0)\n\n\ndef _fetch_freight_index(origin: str, dest: str) -> float:\n    \"\"\"Simulate freight index fetch (replace with actual data source).\"\"\"\n    # Simulate API delay\n    time.sleep(0.2)\n    \n    # Mock freight indices based on route\n    mock_indices = {\n        (\"USNYC\", \"DEHAM\"): 1250.0,\n        (\"USNYC\", \"CNSHA\"): 2100.0,\n        (\"USLAX\", \"JPNRT\"): 1800.0,\n        (\"DEHAM\", \"CNSHA\"): 1650.0,\n        (\"USNYC\", \"GBLON\"): 950.0,\n    }\n    \n    return mock_indices.get((origin, dest), 1500.0)  # Default rate\n\n\ndef _get_commodity_multiplier(hs6: str) -> float:\n    \"\"\"Get commodity-specific multiplier based on HS6 code.\"\"\"\n    # First two digits indicate general category\n    category = hs6[:2]\n    \n    multipliers = {\n        \"01\": 1.2,  # Live animals\n        \"02\": 1.1,  # Meat\n        \"84\": 0.9,  # Machinery\n        \"85\": 0.8,  # Electrical equipment\n        \"87\": 1.3,  # Vehicles\n        \"27\": 1.5,  # Mineral fuels\n    }\n    \n    return multipliers.get(category, 1.0)  # Default multiplier\n\n\ndef _get_destination_currency(dest: str) -> str:\n    \"\"\"Get currency for destination.\"\"\"\n    # Extract country from location code (simplified)\n    country_currencies = {\n        \"DE\": \"EUR\",  # Germany\n        \"GB\": \"GBP\",  # Great Britain\n        \"JP\": \"JPY\",  # Japan\n        \"CN\": \"CNY\",  # China\n        \"US\": \"USD\",  # United States\n    }\n    \n    # Extract country code from destination\n    if len(dest) >= 2:\n        country_code = dest[:2]\n        return country_currencies.get(country_code, \"USD\")\n    \n    return \"USD\"\n\n\ndef clear_all_caches() -> None:\n    \"\"\"Clear all cache instances.\"\"\"\n    _fx_cache.clear()\n    _freight_cache.clear()\n    _corridor_cache.clear()\n\n\ndef get_cache_stats() -> dict:\n    
\"\"\"Get cache statistics.\"\"\"\n    return {\n        \"fx_cache_size\": len(_fx_cache._cache),\n        \"freight_cache_size\": len(_freight_cache._cache),\n        \"corridor_cache_size\": len(_corridor_cache._cache),\n        \"fx_expired_cleaned\": _fx_cache.cleanup_expired(),\n        \"freight_expired_cleaned\": _freight_cache.cleanup_expired(),\n        \"corridor_expired_cleaned\": _corridor_cache.cleanup_expired(),\n    }\n\n\n# Example usage and testing\nif __name__ == \"__main__\":\n    # Test caching functionality\n    print(\"Testing corridor cost caching...\")\n    \n    # First call - should calculate and cache\n    start_time = time.time()\n    result1 = cached_corridor_cost(\"USNYC\", \"DEHAM\", \"840790\")\n    time1 = time.time() - start_time\n    \n    print(f\"First call took {time1:.3f}s\")\n    print(f\"Cached: {result1['cached']}\")\n    print(f\"Total cost: {result1['total_cost']:.2f} {result1['currency']}\")\n    \n    # Second call - should use cache\n    start_time = time.time()\n    result2 = cached_corridor_cost(\"USNYC\", \"DEHAM\", \"840790\")\n    time2 = time.time() - start_time\n    \n    print(f\"\\nSecond call took {time2:.3f}s\")\n    print(f\"Cached: {result2['cached']}\")\n    print(f\"Speed improvement: {time1/time2:.1f}x faster\")\n    \n    # Cache statistics\n    print(f\"\\nCache stats: {get_cache_stats()}\")\n    \n    # Test different commodity\n    result3 = cached_corridor_cost(\"USNYC\", \"DEHAM\", \"270900\")  # Different HS6\n    print(f\"\\nDifferent commodity cost: {result3['total_cost']:.2f} {result3['currency']}\")\n```\n\nThis implementation provides:\n\n## Key Features\n\n1. **TTL-Based Caching**: Three separate caches with different TTL values:\n   - FX rates: 15 minutes\n   - Freight indices: 1 hour  \n   - Corridor costs: 30 minutes\n\n2. **Stdlib Only**: Uses only standard library with `time` and `typing` modules\n\n3. 
**Automatic Expiration**: Cache entries automatically expire and are cleaned up\n\n4. **Cache Statistics**: Built-in cache monitoring and cleanup functions\n\n## Cache Design\n\n- **TTLCache Class**: Generic TTL cache implementation\n- **Module-level Instances**: Separate cache instances for different data types\n- **Automatic Cleanup**: Expired entries removed on access and via cleanup method\n\n## Performance Benefits\n\n- **Reduced API Calls**: Cached data avoids expensive external lookups\n- **Faster Response**: Cached corridor costs return immediately\n- **Smart Invalidation**: TTL ensures data freshness while maximizing cache hits\n\n## Usage Examples\n\n```python\n# Basic usage\ncost = cached_corridor_cost(\"USNYC\", \"DEHAM\", \"840790\")\nprint(f\"Total: {cost['total_cost']} {cost['currency']}\")\n\n# Check if result was cached\nif cost['cached']:\n    print(\"Result served from cache\")\n\n# Cache management\nclear_all_caches()  # Clear all caches\nstats = get_cache_stats()  # Get cache statistics\n```\n\nThe implementation balances performance with data freshness, using appropriate TTL values for different types of logistics data based on their typical update frequencies."
}
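In the recorded `response_text`, `get_cache_stats()` builds its dict in order, so the three `*_cache_size` values are read before `cleanup_expired()` runs and can include entries that the same call then removes. A minimal sketch of a purge-then-count ordering follows, assuming plain dict stores of `key -> (expires_at, value)`; `purge_expired`, `cache_stats`, and the `Store` alias are hypothetical names and not part of the recorded output.

```python
import time
from typing import Any, Dict, Hashable, Optional, Tuple

# Assumed store layout: key -> (expiry timestamp, cached value).
Store = Dict[Hashable, Tuple[float, Any]]


def purge_expired(store: Store, now: float) -> int:
    """Delete entries whose expiry is at or before `now`; return how many were removed."""
    expired = [key for key, (expires_at, _) in store.items() if now >= expires_at]
    for key in expired:
        del store[key]
    return len(expired)


def cache_stats(stores: Dict[str, Store], now: Optional[float] = None) -> Dict[str, int]:
    """Purge each store first, then report its size, so sizes count live entries only."""
    current = time.monotonic() if now is None else now
    stats: Dict[str, int] = {}
    for name, store in stores.items():
        stats[f"{name}_expired_cleaned"] = purge_expired(store, current)
        stats[f"{name}_cache_size"] = len(store)
    return stats
```

With this ordering, a store holding one expired and one live entry reports a size of 1 and one cleaned entry, rather than a size of 2.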
Task ID: 20794123-5efd-458f-ae24-ab632fbe6635