FOUNDRY
C8 Platform

[AR-HOOK-01] AskRosetta webhook for async classification results

completed · code_gen · P1

Description

Create a webhook endpoint for AskRosetta that receives async classification results and stores them in Supabase.

## Context

AskRosetta is an HTS product classification API deployed on Google Cloud Run (Zurich). The Cloudflare Worker at `oneworld_trade/deploy/askrosetta-ai/worker.js` proxies requests. The classification engine is in `c8_platform/corridor_engine/` with Flask endpoints.

The current flow is synchronous: the client calls `/api/classify` and waits for the response. For large batches we need async: the client submits a batch, gets a `job_id`, and a webhook fires when processing is done.

## What to generate

File: `c8_platform/corridor_engine/webhook_handler.py`

A Flask blueprint that provides:

### 1. POST /api/v1/webhooks/classification-complete

Receives classification results from async processing:

```json
{
  "job_id": "uuid",
  "status": "completed" | "failed" | "partial",
  "results": [
    {
      "product_id": "string",
      "product_description": "string",
      "hts_code": "6109.10.00",
      "hts_description": "T-shirts, singlets...",
      "confidence": 0.92,
      "duty_rate": 0.167,
      "source": "rosetta" | "gemini",
      "section_301_applies": false,
      "pga_flags": ["FDA"]
    }
  ],
  "metadata": {
    "total_products": 50,
    "completed": 48,
    "failed": 2,
    "processing_time_ms": 12500,
    "cost_usd": 0.15
  }
}
```

### 2. Behavior

- Validate the webhook signature (HMAC-SHA256 with a shared secret from the `WEBHOOK_SECRET` env var)
- Store results in the Supabase `classification_jobs` table (upsert by `job_id`)
- Store individual results in the `classification_results` table (batch insert)
- If the client provided a `callback_url`, forward the results there (fire-and-forget POST)
- Return 200 with `{"received": true, "job_id": "..."}`
- Idempotent: re-processing the same `job_id` updates existing records

### 3. GET /api/v1/jobs/{job_id}

Poll endpoint for checking job status:

- Returns job status, progress (completed/total), and results if done
- Auth: API key in the `X-Api-Key` header

### 4. Supabase tables (include CREATE TABLE SQL as comments)

```sql
-- classification_jobs: job_id (UUID PK), client_id, status, callback_url,
--   total_products, completed_products, created_at, completed_at, metadata (JSONB)
-- classification_results: id (UUID PK), job_id (FK), product_id,
--   product_description, hts_code, confidence, duty_rate, source,
--   pga_flags (TEXT[]), created_at
```

### Patterns from existing code (app.py)

- Flask blueprint registration
- `from supabase import create_client` for Supabase access
- Environment variables via `os.environ.get()`
- JSON response format: `jsonify({"status": "ok", ...})`
- Error handling: try/except with a 500 response

### Output

Complete Python file with a Flask blueprint, ready to register in app.py.
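The signature check in the Behavior section can be sketched with the standard library alone. This is a minimal sketch under stated assumptions: the task only fixes the algorithm (HMAC-SHA256) and the `WEBHOOK_SECRET` env var, so the hex-digest encoding and the idea of reading the signature from a request header are illustrative choices, not part of the spec.

```python
# Sketch of the HMAC-SHA256 webhook signature check. Assumptions: the
# sender signs the raw request body and transmits the signature as a hex
# digest; only the WEBHOOK_SECRET env var name comes from the task spec.
import hashlib
import hmac
import os


def verify_signature(raw_body: bytes, signature: str) -> bool:
    """Return True if `signature` matches HMAC-SHA256(secret, raw_body)."""
    secret = os.environ.get("WEBHOOK_SECRET", "")
    if not secret or not signature:
        return False
    expected = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
    # Constant-time comparison to avoid leaking the digest via timing.
    return hmac.compare_digest(expected, signature)
```

In the Flask handler this would run against `request.get_data()` (the raw bytes, not the parsed JSON) before any other processing, returning 401 on mismatch.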
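The two-endpoint shape can also be sketched end to end. In this sketch an in-memory dict stands in for the Supabase tables so the request flow is runnable without credentials; the `JOBS` store, the `ASKROSETTA_API_KEY` env var name, and the `make_app` helper are all illustrative assumptions, and the real handler would upsert via `supabase-py` instead.

```python
# Runnable sketch of the webhook + poll endpoints from the task spec.
# A dict replaces Supabase so the flow can be exercised locally; the
# env var name ASKROSETTA_API_KEY is a placeholder, not from the spec.
import os

from flask import Blueprint, Flask, jsonify, request

webhook_bp = Blueprint("webhooks", __name__)

JOBS: dict[str, dict] = {}  # stand-in for the classification_jobs table


@webhook_bp.post("/api/v1/webhooks/classification-complete")
def classification_complete():
    payload = request.get_json(silent=True) or {}
    job_id = payload.get("job_id")
    if not job_id:
        return jsonify({"error": "job_id required"}), 400
    meta = payload.get("metadata", {})
    # Upsert by job_id: re-delivery of the same job overwrites the record,
    # which gives the idempotency the spec asks for.
    JOBS[job_id] = {
        "status": payload.get("status", "completed"),
        "results": payload.get("results", []),
        "total_products": meta.get("total_products"),
        "completed_products": meta.get("completed"),
    }
    return jsonify({"received": True, "job_id": job_id}), 200


@webhook_bp.get("/api/v1/jobs/<job_id>")
def get_job(job_id):
    # API key auth via the X-Api-Key header, per section 3.
    if request.headers.get("X-Api-Key") != os.environ.get("ASKROSETTA_API_KEY"):
        return jsonify({"error": "unauthorized"}), 401
    job = JOBS.get(job_id)
    if job is None:
        return jsonify({"error": "not found"}), 404
    body = {
        "job_id": job_id,
        "status": job["status"],
        "progress": {"completed": job["completed_products"],
                     "total": job["total_products"]},
    }
    # Results are attached only once the job has finished (fully or partly).
    if job["status"] in ("completed", "partial"):
        body["results"] = job["results"]
    return jsonify(body), 200


def make_app() -> Flask:
    app = Flask(__name__)
    app.register_blueprint(webhook_bp)
    return app
```

Registering `webhook_bp` on the existing app in `app.py` would follow the same `register_blueprint` call shown in `make_app`.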
Assignee
-
Claimed By
AG Lu
Created
21d ago
Completed
2d ago

Quality

Overall
79%
Pass
PASS
Graded By
-
Graded At
-

Cost

Input Tokens
-
Output Tokens
-
Est. USD
-

Result

AR-HOOK-01 Webhook Handler — All requirements PASS. File already exists.

File: c8_platform/corridor_engine/webhook_handler.py (231 lines)

1. ✅ POST /api/v1/webhooks/classification-complete (line 92):
   - HMAC-SHA256 signature verification via WEBHOOK_SECRET env var (lines 97-108)
   - Parses job_id, status, results[], metadata from JSON body
   - Upserts to classification_jobs table (line 137)
   - Batch inserts to classification_results with dedup (lines 142-167)
   - Forwards to callback_url via fire-and-forget thread (lines 170-181)
   - Returns 200 with {"received": true, "job_id": "..."}
   - Idempotent: checks existing product_ids before inserting

2. ✅ GET /api/v1/jobs/{job_id} (line 190):
   - API key auth via X-Api-Key header
   - Returns job status, progress, metadata
   - Includes results array when status is completed/partial

3. ✅ CREATE TABLE SQL in docstring (lines 9-32):
   - classification_jobs: job_id UUID PK, client_id, status, callback_url, total/completed counts, timestamps, metadata JSONB
   - classification_results: id UUID PK, job_id FK, product_id, product_description, hts_code, confidence, duty_rate, source, pga_flags TEXT[], created_at

4. ✅ Flask Blueprint pattern (webhook_bp, line 47)
5. ✅ Lazy Supabase client init (lines 49-60)
6. ✅ Error handling: try/except with 500 response + logging

Audit Trail (3)

2d ago · task_completed · AG Lu
2d ago · task_claimed · AG Lu
21d ago · task_created · AG Lu
Task ID: 6397211c-6c17-45e9-b42a-a03dd0bc801c