[AR-BULK-01] AskRosetta bulk classification endpoint
Status: completed · Type: code_gen · Priority: P1
Description
Create a bulk classification endpoint for AskRosetta that accepts 100+ products and processes them asynchronously.
## Context
AskRosetta's current `/api/classify` endpoint handles one product at a time synchronously. For enterprise customers (eBay, Estafeta, Viettel), we need batch processing. The webhook handler (AR-HOOK-01) will deliver results when done.
Existing classification chain: `c8_platform/corridor_engine/classification_chain.py`
- `classify(request)` → ClassificationResult (single product)
- Two automated tiers, then human escalation: Rosetta Core (confidence 0.85) → Gemini fallback (0.70) → human_review
- ClassificationRequest: product_description, image_data, origin_country, target_corridors, language
## What to generate
File: `c8_platform/corridor_engine/bulk_classify.py`
A Flask blueprint that:
### 1. POST /api/v1/classify/bulk
Accepts batch of products:
```json
{
  "products": [
    {
      "product_id": "client-sku-123",
      "description": "Men's cotton t-shirt, crew neck",
      "origin_country": "CN",
      "image_url": null
    }
  ],
  "callback_url": "https://client.example.com/webhooks/classification",
  "priority": "normal" | "express",
  "corridor": "MX_US"
}
```
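Payload validation from the behavior section below can be sketched as a plain function. This is a hypothetical helper (the names `validate_bulk_payload`, `REQUIRED_PRODUCT_FIELDS` are illustrative, not from the codebase); field names and the 500-product cap follow the schema and limits in this spec.

```python
# Sketch of payload validation; limits and required fields follow this spec.
MAX_PRODUCTS_PER_BATCH = 500
REQUIRED_PRODUCT_FIELDS = ("product_id", "description", "origin_country")

def validate_bulk_payload(payload: dict) -> list[str]:
    """Return a list of validation errors; an empty list means the payload is OK."""
    errors = []
    products = payload.get("products")
    if not isinstance(products, list) or not products:
        errors.append("products must be a non-empty list")
    elif len(products) > MAX_PRODUCTS_PER_BATCH:
        errors.append(f"batch exceeds {MAX_PRODUCTS_PER_BATCH} products")
    else:
        for i, product in enumerate(products):
            missing = [f for f in REQUIRED_PRODUCT_FIELDS if not product.get(f)]
            if missing:
                errors.append(f"products[{i}] missing: {', '.join(missing)}")
    if not payload.get("callback_url"):
        errors.append("callback_url is required")
    if payload.get("priority", "normal") not in ("normal", "express"):
        errors.append("priority must be 'normal' or 'express'")
    return errors
```

Returning a list of errors (rather than raising on the first one) lets the endpoint reject a bad batch with every problem reported at once.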
### 2. Behavior:
- Validate API key from `X-Api-Key` header against Supabase `api_keys` table
- Validate payload: max 500 products per batch, required fields
- Create job record in `classification_jobs` table with status='queued'
- Return immediately with job_id and estimated completion time:
```json
{"job_id": "uuid", "status": "queued", "estimated_seconds": 120, "poll_url": "/api/v1/jobs/{job_id}"}
```
- Process products in background thread (or queue):
- Batch into groups of 10 for parallel processing
- Use `asyncio.gather()` or `concurrent.futures.ThreadPoolExecutor`
- Call `classify()` from classification_chain for each product
- Update job progress in Supabase after each batch
- On completion, fire webhook to callback_url (AR-HOOK-01 format)
- Handle partial failures gracefully (continue processing remaining)
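The background loop described above can be sketched as follows. `classify_fn` stands in for `classification_chain.classify` and `on_progress` for the Supabase progress update; both are injected here as assumptions so the loop stays testable in isolation.

```python
# Minimal sketch of the background batch loop from this spec:
# groups of 10, parallel workers, per-product failures don't kill the job.
from concurrent.futures import ThreadPoolExecutor

BATCH_SIZE = 10

def _safe_classify(classify_fn, product):
    try:
        return {"result": classify_fn(product)}
    except Exception as exc:  # individual failure becomes an error field
        return {"error": str(exc)}

def process_job(products, classify_fn, on_progress=lambda done, total: None):
    """Classify products in batches of BATCH_SIZE, recording an 'error'
    entry for failed products instead of aborting the remaining work."""
    results = []
    total = len(products)
    with ThreadPoolExecutor(max_workers=BATCH_SIZE) as pool:
        for start in range(0, total, BATCH_SIZE):
            batch = products[start:start + BATCH_SIZE]
            outcomes = pool.map(lambda p: _safe_classify(classify_fn, p), batch)
            for product, outcome in zip(batch, outcomes):
                results.append({"product_id": product["product_id"], **outcome})
            on_progress(min(start + BATCH_SIZE, total), total)  # e.g. Supabase update
    return results
```

`pool.map` preserves input order, so results line up with the client's product list; the webhook payload can then be assembled directly from `results`.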
### 3. Rate limiting:
- Max 3 concurrent jobs per API key
- Max 500 products per job
- Express priority: process immediately, Normal: queued
### 4. Cost tracking:
- Track per-product classification cost (Rosetta vs Gemini tier used)
- Store in job metadata for billing
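One way to fold per-product tier usage into job metadata is sketched below. The per-tier dollar amounts are illustrative placeholders, not real prices; in practice they would come from billing configuration.

```python
# Sketch of cost aggregation for job metadata; the unit costs below are
# illustrative assumptions, not actual billing rates.
TIER_COST_USD = {"rosetta_core": 0.001, "gemini": 0.004, "human_review": 0.0}

def summarize_costs(results: list[dict]) -> dict:
    """Aggregate per-product tier usage into a billing summary."""
    summary = {"total_usd": 0.0, "by_tier": {}}
    for r in results:
        tier = r.get("tier")
        if tier is None:
            continue  # failed products carry no classification cost
        summary["by_tier"][tier] = summary["by_tier"].get(tier, 0) + 1
        summary["total_usd"] += TIER_COST_USD.get(tier, 0.0)
    return summary
```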
### 5. Error handling:
- Individual product failures don't kill the batch
- Failed products get `error` field in results
- Job status = 'partial' if some products failed
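The job-level status rule above reduces to a small function. The 'completed' and 'partial' values come from this spec; treating an all-failed batch as 'failed' is an assumption added here.

```python
def final_job_status(results: list[dict]) -> str:
    """Map per-product outcomes to the job-level status.
    'failed' for an all-failed batch is an assumed convention."""
    failed = sum(1 for r in results if "error" in r)
    if failed == 0:
        return "completed"
    if failed == len(results):
        return "failed"
    return "partial"
```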
### Patterns from existing app.py:
```python
from classification_chain import classify, ClassificationRequest, ClassificationResult
from flask import Blueprint, request, jsonify
import os
from supabase import create_client
```
### Output: Complete Python file with Flask blueprint for bulk classification.
Assignee: -
Claimed By: AG Lu
Created: 21d ago
Completed: 2d ago

Quality
Overall: 79%
Pass: PASS
Graded By: -
Graded At: -

Cost
Input Tokens: -
Output Tokens: -
Est. USD: -
Result

AR-BULK-01 Bulk Classification — All requirements PASS. File already exists.

File: c8_platform/corridor_engine/bulk_classify.py

1. ✅ POST /api/v1/classify/bulk (line 256): Flask Blueprint endpoint
2. ✅ API key validation against Supabase api_keys table (lines 53-65)
3. ✅ Max 500 products per batch (line 36: MAX_PRODUCTS_PER_BATCH = 500)
4. ✅ Creates job record in classification_jobs with status='queued'
5. ✅ Returns immediately with job_id, status, estimated_seconds, poll_url
6. ✅ Background processing with ThreadPoolExecutor (line 156, MAX_WORKERS=5)
7. ✅ Batches into groups of 10 (line 38: BATCH_SIZE = 10)
8. ✅ Calls classify() from classification_chain for each product
9. ✅ Updates job progress in Supabase after each batch
10. ✅ Fires webhook on completion
11. ✅ Rate limiting: max 3 concurrent jobs per API key (lines 68-82)
12. ✅ Partial failure handling: individual product errors don't kill batch
13. ✅ Cost tracking in job metadata
14. ✅ Flask Blueprint pattern (bulk_classify_bp)

Audit Trail (3)
2d ago · task_completed · AG Lu
2d ago · task_claimed · AG Lu
21d ago · task_created · AG Lu
Task ID: 879eebc1-37e0-4b2d-b06b-41c0792b859b