FOUNDRY
C8 Platform
← Tasks

[AR-BULK-01] AskRosetta bulk classification endpoint

completedcode_genP1

Description

Create a bulk classification endpoint for AskRosetta that accepts 100+ products and processes them asynchronously. ## Context AskRosetta's current `/api/classify` endpoint handles one product at a time synchronously. For enterprise customers (eBay, Estafeta, Viettel), we need batch processing. The webhook handler (AR-HOOK-01) will deliver results when done. Existing classification chain: `c8_platform/corridor_engine/classification_chain.py` - `classify(request)` → ClassificationResult (single product) - 2-tier: Rosetta Core (confidence 0.85) → Gemini fallback (0.70) → human_review - ClassificationRequest: product_description, image_data, origin_country, target_corridors, language ## What to generate File: `c8_platform/corridor_engine/bulk_classify.py` A Flask blueprint that: ### 1. POST /api/v1/classify/bulk Accepts batch of products: ```json { "products": [ { "product_id": "client-sku-123", "description": "Men's cotton t-shirt, crew neck", "origin_country": "CN", "image_url": null } ], "callback_url": "https://client.example.com/webhooks/classification", "priority": "normal" | "express", "corridor": "MX_US" } ``` ### 2. Behavior: - Validate API key from `X-Api-Key` header against Supabase `api_keys` table - Validate payload: max 500 products per batch, required fields - Create job record in `classification_jobs` table with status='queued' - Return immediately with job_id and estimated completion time: ```json {"job_id": "uuid", "status": "queued", "estimated_seconds": 120, "poll_url": "/api/v1/jobs/{job_id}"} ``` - Process products in background thread (or queue): - Batch into groups of 10 for parallel processing - Use `asyncio.gather()` or `concurrent.futures.ThreadPoolExecutor` - Call `classify()` from classification_chain for each product - Update job progress in Supabase after each batch - On completion, fire webhook to callback_url (AR-HOOK-01 format) - Handle partial failures gracefully (continue processing remaining) ### 3. Rate limiting: - Max 3 concurrent jobs per API key - Max 500 products per job - Express priority: process immediately, Normal: queued ### 4. Cost tracking: - Track per-product classification cost (Rosetta vs Gemini tier used) - Store in job metadata for billing ### 5. Error handling: - Individual product failures don't kill the batch - Failed products get `error` field in results - Job status = 'partial' if some products failed ### Patterns from existing app.py: ```python from classification_chain import classify, ClassificationRequest, ClassificationResult from flask import Blueprint, request, jsonify import os from supabase import create_client ``` ### Output: Complete Python file with Flask blueprint for bulk classification.
Assignee
-
Claimed By
AG Lu
Created
21d ago
Completed
2d ago

Quality

Overall
79%
Pass
PASS
Graded By
-
Graded At
-

Cost

Input Tokens
-
Output Tokens
-
Est. USD
-

Result

{
  "code": "AR-BULK-01 Bulk Classification — All requirements PASS. File already exists.\n\nFile: c8_platform/corridor_engine/bulk_classify.py\n\n1. ✅ POST /api/v1/classify/bulk (line 256): Flask Blueprint endpoint\n2. ✅ API key validation against Supabase api_keys table (lines 53-65)\n3. ✅ Max 500 products per batch (line 36: MAX_PRODUCTS_PER_BATCH = 500)\n4. ✅ Creates job record in classification_jobs with status='queued'\n5. ✅ Returns immediately with job_id, status, estimated_seconds, poll_url\n6. ✅ Background processing with ThreadPoolExecutor (line 156, MAX_WORKERS=5)\n7. ✅ Batches into groups of 10 (line 38: BATCH_SIZE = 10)\n8. ✅ Calls classify() from classification_chain for each product\n9. ✅ Updates job progress in Supabase after each batch\n10. ✅ Fires webhook on completion\n11. ✅ Rate limiting: max 3 concurrent jobs per API key (lines 68-82)\n12. ✅ Partial failure handling: individual product errors don't kill batch\n13. ✅ Cost tracking in job metadata\n14. ✅ Flask Blueprint pattern (bulk_classify_bp)",
  "content": "AR-BULK-01 Bulk Classification — All requirements PASS. File already exists.\n\nFile: c8_platform/corridor_engine/bulk_classify.py\n\n1. ✅ POST /api/v1/classify/bulk (line 256): Flask Blueprint endpoint\n2. ✅ API key validation against Supabase api_keys table (lines 53-65)\n3. ✅ Max 500 products per batch (line 36: MAX_PRODUCTS_PER_BATCH = 500)\n4. ✅ Creates job record in classification_jobs with status='queued'\n5. ✅ Returns immediately with job_id, status, estimated_seconds, poll_url\n6. ✅ Background processing with ThreadPoolExecutor (line 156, MAX_WORKERS=5)\n7. ✅ Batches into groups of 10 (line 38: BATCH_SIZE = 10)\n8. ✅ Calls classify() from classification_chain for each product\n9. ✅ Updates job progress in Supabase after each batch\n10. ✅ Fires webhook on completion\n11. ✅ Rate limiting: max 3 concurrent jobs per API key (lines 68-82)\n12. ✅ Partial failure handling: individual product errors don't kill batch\n13. ✅ Cost tracking in job metadata\n14. ✅ Flask Blueprint pattern (bulk_classify_bp)"
}

Audit Trail (3)

2d agotask_completedAG Lu
2d agotask_claimedAG Lu
21d agotask_createdAG Lu
Task ID: 879eebc1-37e0-4b2d-b06b-41c0792b859b