[AR-HOOK-01] AskRosetta webhook for async classification results
completed · code_gen · P1
Description
Create a webhook endpoint for AskRosetta that receives async classification results and stores them in Supabase.
## Context
AskRosetta is an HTS product classification API deployed on Google Cloud Run (Zurich). The Cloudflare Worker at `oneworld_trade/deploy/askrosetta-ai/worker.js` proxies requests. The classification engine is in `c8_platform/corridor_engine/` with Flask endpoints.
The current flow is synchronous: the client calls `/api/classify` and waits for the response. Large batches need an asynchronous flow: the client submits a batch, receives a `job_id`, and a webhook fires when processing is done.
## What to generate
File: `c8_platform/corridor_engine/webhook_handler.py`
A Flask blueprint that:
### 1. POST /api/v1/webhooks/classification-complete
Receives classification results from async processing:
```json
{
  "job_id": "uuid",
  "status": "completed" | "failed" | "partial",
  "results": [
    {
      "product_id": "string",
      "product_description": "string",
      "hts_code": "6109.10.00",
      "hts_description": "T-shirts, singlets...",
      "confidence": 0.92,
      "duty_rate": 0.167,
      "source": "rosetta" | "gemini",
      "section_301_applies": false,
      "pga_flags": ["FDA"]
    }
  ],
  "metadata": {
    "total_products": 50,
    "completed": 48,
    "failed": 2,
    "processing_time_ms": 12500,
    "cost_usd": 0.15
  }
}
```
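A minimal sketch of how the payload shape above could be checked before anything is stored. The function name `validate_payload` is hypothetical, and the rules it enforces (non-empty `job_id`, `confidence` between 0 and 1, `source` restricted to the two values shown) are assumptions read off the example, not requirements stated by the task.

```python
from typing import Any

# Allowed values taken from the example payload above.
ALLOWED_STATUSES = {"completed", "failed", "partial"}
ALLOWED_SOURCES = {"rosetta", "gemini"}


def validate_payload(payload: Any) -> list[str]:
    """Return a list of problems; an empty list means the payload looks valid."""
    if not isinstance(payload, dict):
        return ["payload must be a JSON object"]

    errors: list[str] = []

    job_id = payload.get("job_id")
    if not isinstance(job_id, str) or not job_id:
        errors.append("job_id must be a non-empty string")

    status = payload.get("status")
    if not isinstance(status, str) or status not in ALLOWED_STATUSES:
        errors.append("status must be one of: " + ", ".join(sorted(ALLOWED_STATUSES)))

    results = payload.get("results", [])
    if not isinstance(results, list):
        return errors + ["results must be a list"]

    for i, item in enumerate(results):
        if not isinstance(item, dict):
            errors.append(f"results[{i}] must be an object")
            continue
        if not item.get("product_id"):
            errors.append(f"results[{i}].product_id is required")
        confidence = item.get("confidence")
        is_number = isinstance(confidence, (int, float)) and not isinstance(confidence, bool)
        # Assumption: confidence is a probability-like score in [0, 1].
        if not is_number or not 0 <= confidence <= 1:
            errors.append(f"results[{i}].confidence must be a number in [0, 1]")
        source = item.get("source")
        if not isinstance(source, str) or source not in ALLOWED_SOURCES:
            errors.append(f"results[{i}].source must be one of: rosetta, gemini")

    return errors
```

A handler could return a 400 response listing these messages whenever the list is non-empty, and only proceed to storage when it is empty.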
### 2. Behavior:
- Validate webhook signature (HMAC-SHA256 with shared secret from `WEBHOOK_SECRET` env var)
- Store results in Supabase `classification_jobs` table (upsert by job_id)
- Store individual results in `classification_results` table (batch insert)
- If the client provided a `callback_url`, forward the results there (fire-and-forget POST)
- Return 200 with `{"received": true, "job_id": "..."}`
- Idempotent: reprocessing the same `job_id` updates existing records instead of creating duplicates
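The signature check and the idempotent storage step from the list above could be sketched as follows. This is an illustration under stated assumptions, not the contents of `webhook_handler.py`: the signature is assumed to be the hex digest of the raw request body, `verify_signature` and `store_job` are hypothetical names, and `client` is assumed to be a Supabase client exposing the `table(...).upsert/select/insert(...).execute()` chain. Skipping already-stored `product_id` values is one simple way to avoid duplicate rows on redelivery; it does not update result rows that already exist.

```python
import hashlib
import hmac
from typing import Any


def verify_signature(raw_body: bytes, signature_hex: str, secret: str) -> bool:
    """Check a hex HMAC-SHA256 signature of the raw body in constant time."""
    expected = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()
    # Compare as bytes so a non-ASCII signature cannot raise TypeError.
    return hmac.compare_digest(expected.encode("utf-8"), signature_hex.encode("utf-8"))


def store_job(client: Any, payload: dict) -> int:
    """Upsert the job row, insert result rows not stored yet, return the insert count."""
    job_id = payload["job_id"]
    meta = payload.get("metadata") or {}

    job_row = {
        "job_id": job_id,
        "status": payload["status"],
        "total_products": meta.get("total_products"),
        "completed_products": meta.get("completed"),
        "metadata": meta,
    }
    # Upsert keyed on job_id, so a redelivered webhook updates the same row.
    client.table("classification_jobs").upsert(job_row, on_conflict="job_id").execute()

    # Look up which products this job already has, to avoid duplicate inserts.
    existing = (
        client.table("classification_results")
        .select("product_id")
        .eq("job_id", job_id)
        .execute()
    )
    seen = {row["product_id"] for row in existing.data}

    new_rows = [
        {
            "job_id": job_id,
            "product_id": r["product_id"],
            "product_description": r.get("product_description"),
            "hts_code": r.get("hts_code"),
            "confidence": r.get("confidence"),
            "duty_rate": r.get("duty_rate"),
            "source": r.get("source"),
            "pga_flags": r.get("pga_flags", []),
        }
        for r in payload.get("results", [])
        if r["product_id"] not in seen
    ]
    if new_rows:
        # One batch insert for all new rows.
        client.table("classification_results").insert(new_rows).execute()
    return len(new_rows)
```

Verifying against the raw bytes (rather than re-serialized JSON) matters because any change in whitespace or key order would alter the digest.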
### 3. GET /api/v1/jobs/{job_id}
Poll endpoint for checking job status:
- Returns job status, progress (completed/total), and results if done
- Auth: API key in `X-Api-Key` header
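As a rough sketch of the poll endpoint's logic, the two helpers below check the API key and build the response body from a stored job row. Both function names are hypothetical, the response field names (`progress`, `results`) are assumptions, and treating both `completed` and `partial` as "done" is one possible reading of the requirement.

```python
import hmac
from typing import Optional


def api_key_is_valid(provided: Optional[str], expected: Optional[str]) -> bool:
    """Compare the X-Api-Key header value with the configured key in constant time."""
    if not provided or not expected:
        return False
    return hmac.compare_digest(provided.encode("utf-8"), expected.encode("utf-8"))


def job_status_response(job: dict, results: list) -> dict:
    """Build the poll response: status, progress, and results once the job is done."""
    body = {
        "job_id": job["job_id"],
        "status": job["status"],
        "progress": {
            "completed": job.get("completed_products") or 0,
            "total": job.get("total_products") or 0,
        },
    }
    # Assumption: "completed" and "partial" both count as done.
    if job["status"] in ("completed", "partial"):
        body["results"] = results
    return body
```

Keeping these helpers free of Flask makes them easy to unit test; the route itself would only read the header, load the rows, and call them.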
### 4. Supabase tables (include CREATE TABLE SQL as comments):
```sql
-- classification_jobs: job_id (UUID PK), client_id, status, callback_url, total_products, completed_products, created_at, completed_at, metadata (JSONB)
-- classification_results: id (UUID PK), job_id (FK), product_id, product_description, hts_code, confidence, duty_rate, source, pga_flags (TEXT[]), created_at
```
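One way the column lists above might be spelled out as PostgreSQL DDL, kept in a Python string so it can sit next to the handler code. Only the column names, the keys, `JSONB`, and `TEXT[]` come from the task; every other type, default, and constraint here is an assumption.

```python
# Hypothetical DDL for the two tables; types other than UUID, JSONB and TEXT[]
# are guesses and should be adjusted to the real schema.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS classification_jobs (
    job_id             UUID PRIMARY KEY,
    client_id          TEXT,
    status             TEXT NOT NULL,
    callback_url       TEXT,
    total_products     INTEGER,
    completed_products INTEGER,
    created_at         TIMESTAMPTZ NOT NULL DEFAULT now(),
    completed_at       TIMESTAMPTZ,
    metadata           JSONB
);

CREATE TABLE IF NOT EXISTS classification_results (
    id                  UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    job_id              UUID NOT NULL REFERENCES classification_jobs (job_id),
    product_id          TEXT NOT NULL,
    product_description TEXT,
    hts_code            TEXT,
    confidence          NUMERIC,
    duty_rate           NUMERIC,
    source              TEXT,
    pga_flags           TEXT[],
    created_at          TIMESTAMPTZ NOT NULL DEFAULT now()
);
"""
```

The column list for `classification_results` does not include `hts_description` or `section_301_applies` from the payload, so this sketch leaves them out as well.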
### Patterns from existing code (app.py):
- Flask blueprint registration
- `from supabase import create_client` for Supabase access
- Environment variables via `os.environ.get()`
- JSON response format: `jsonify({"status": "ok", ...})`
- Error handling: try/except with 500 response
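Put together, the patterns listed above might look like the skeleton below. It is a sketch only: `app.py` is not shown in the task, so the blueprint name `webhook_bp`, the environment variable names `SUPABASE_URL` and `SUPABASE_KEY`, the error response shape, and the `create_app` helper are assumptions. API key auth is omitted here for brevity.

```python
import logging
import os

from flask import Blueprint, Flask, jsonify

logger = logging.getLogger(__name__)
webhook_bp = Blueprint("webhook", __name__)

_supabase = None


def get_supabase():
    """Create the Supabase client on first use, so importing the module has no side effects."""
    global _supabase
    if _supabase is None:
        from supabase import create_client  # imported lazily

        # Assumed variable names; use whatever app.py already reads.
        _supabase = create_client(
            os.environ.get("SUPABASE_URL", ""),
            os.environ.get("SUPABASE_KEY", ""),
        )
    return _supabase


@webhook_bp.route("/api/v1/jobs/<job_id>", methods=["GET"])
def get_job(job_id):
    try:
        rows = (
            get_supabase()
            .table("classification_jobs")
            .select("*")
            .eq("job_id", job_id)
            .execute()
            .data
        )
        if not rows:
            return jsonify({"status": "error", "message": "job not found"}), 404
        return jsonify({"status": "ok", "job": rows[0]})
    except Exception:
        # Pattern from the task: log the failure and answer with a 500.
        logger.exception("job lookup failed")
        return jsonify({"status": "error", "message": "internal error"}), 500


def create_app() -> Flask:
    """Stand-in for app.py: registering the blueprint is a single call."""
    app = Flask(__name__)
    app.register_blueprint(webhook_bp)
    return app
```

In the real `app.py`, only the `app.register_blueprint(webhook_bp)` line would be needed, since the application object already exists there.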
### Output: Complete Python file with Flask blueprint, ready to register in app.py.
Assignee: -
Claimed By: AG Lu
Created: 21d ago
Completed: 2d ago
Quality
Overall: 79%
Pass: PASS
Graded By: -
Graded At: -
Cost
Input Tokens: -
Output Tokens: -
Est. USD: -
Result
{
"content": "AR-HOOK-01 Webhook Handler — All requirements PASS. File already exists.\n\nFile: c8_platform/corridor_engine/webhook_handler.py (231 lines)\n\n1. ✅ POST /api/v1/webhooks/classification-complete (line 92):\n - HMAC-SHA256 signature verification via WEBHOOK_SECRET env var (lines 97-108)\n - Parses job_id, status, results[], metadata from JSON body\n - Upserts to classification_jobs table (line 137)\n - Batch inserts to classification_results with dedup (lines 142-167)\n - Forwards to callback_url via fire-and-forget thread (lines 170-181)\n - Returns 200 with {\"received\": true, \"job_id\": \"...\"}\n - Idempotent: checks existing product_ids before inserting\n\n2. ✅ GET /api/v1/jobs/{job_id} (line 190):\n - API key auth via X-Api-Key header\n - Returns job status, progress, metadata\n - Includes results array when status is completed/partial\n\n3. ✅ CREATE TABLE SQL in docstring (lines 9-32):\n - classification_jobs: job_id UUID PK, client_id, status, callback_url, total/completed counts, timestamps, metadata JSONB\n - classification_results: id UUID PK, job_id FK, product_id, product_description, hts_code, confidence, duty_rate, source, pga_flags TEXT[], created_at\n\n4. ✅ Flask Blueprint pattern (webhook_bp, line 47)\n5. ✅ Lazy Supabase client init (lines 49-60)\n6. ✅ Error handling: try/except with 500 response + logging"
}
Audit Trail (3)
2d ago · task_completed · AG Lu
2d ago · task_claimed · AG Lu
21d ago · task_created · AG Lu
Task ID: 6397211c-6c17-45e9-b42a-a03dd0bc801c