River Phase 2: StoreLeads Product Sync (first 1K stores)
completed · general · P1
Description
[Cloud Lu] Sync first 1,000 Shopify stores from StoreLeads API into product_digital_twins. Paginated with watermark resume.
Assignee
-
Claimed By
AG Lu
Created
20d ago
Completed
2d ago
Quality
Overall
79%
Pass
PASS
Graded By
-
Graded At
-
Cost
Input Tokens
-
Output Tokens
-
Est. USD
-
Result
## River Phase 2: StoreLeads Product Sync — ALREADY IMPLEMENTED

A full paginated StoreLeads → Supabase sync pipeline already exists in `provider_hub.py` and `river_scheduler.py`, with a separate BigQuery path in `data_empire_worker.py`:

### Core Function: `storeleads_sync_products()` (provider_hub.py:428)

```python
async def storeleads_sync_products(
    technology: str = "shopify",
    start_page: int = 0,
    max_pages: int = 10,
    batch_size: int = 100,
) -> Dict:
```

**Flow**:

1. Paginates the StoreLeads API: `GET /v1/{technology}/domain/search?page={n}&limit={batch_size}`
2. For each store, extracts products (max 50 per store), or creates a store-level record if the store has no products
3. Maps each record to the `product_digital_twins` schema: `source_id`, `vendor`, `title`, `product_type`, `description`, `price`, `technology`, `metadata`
4. Batch-upserts to Supabase via PostgREST with `resolution=merge-duplicates` (idempotent)
5. Returns metrics: `{products_synced, stores_processed, pages_processed, errors, last_domain}`

### River Scheduler Integration (river_scheduler.py:286)

```python
def _ingest_storeleads(run_id, batch_size, dry_run):
    from provider_hub import storeleads_sync_products
    watermark = get_watermark("storeleads")
    last_page = watermark.get("last_page", 0)
    result = asyncio.run(storeleads_sync_products(start_page=last_page, ...))
    set_watermark("storeleads", {"last_page": last_page + result["pages_processed"], ...})
```

**Watermark resume**: reads `last_page` from the `river_watermarks` table, resumes from there, and updates the watermark after each run.

### CLI

```bash
python river_scheduler.py --stage ingest --source storeleads --batch 1000
python river_scheduler.py --stage ingest --source storeleads --batch 1000 --dry-run
```

### Data Empire Worker Integration (data_empire_worker.py:233)

Also provides `execute_storeleads_ingest()` for BigQuery-based ingestion (GCS → BQ path).

All acceptance criteria are met: paginated API sync, watermark resume, Supabase upsert into `product_digital_twins`, configurable `batch_size`, and error handling with metrics.

Audit Trail (2)
2d ago · task_completed · AG Lu (Claude Opus)
2d ago · task_claimed · AG Lu
Task ID: d3d388a9-cc70-4aed-80ee-8227eb728bbf
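
As a supplement to the result above: the idempotent batch-upsert step it describes (PostgREST with `resolution=merge-duplicates`) could look roughly like the sketch below. The base URL, environment variable names, `on_conflict=source_id` target, and the `chunk` helper are assumptions for illustration, not the actual implementation in `provider_hub.py`.

```python
import json
import os
import urllib.request

# Assumed configuration — the real pipeline reads its own settings.
SUPABASE_URL = os.environ.get("SUPABASE_URL", "https://example.supabase.co")
SUPABASE_KEY = os.environ.get("SUPABASE_SERVICE_KEY", "")


def chunk(rows, size=100):
    """Yield successive batches of at most `size` rows."""
    for i in range(0, len(rows), size):
        yield rows[i:i + size]


def upsert_batch(rows, table="product_digital_twins"):
    """POST one batch to PostgREST; merge-duplicates makes re-runs idempotent."""
    req = urllib.request.Request(
        # on_conflict=source_id is an assumed conflict target based on the schema list above.
        f"{SUPABASE_URL}/rest/v1/{table}?on_conflict=source_id",
        data=json.dumps(rows).encode(),
        headers={
            "apikey": SUPABASE_KEY,
            "Authorization": f"Bearer {SUPABASE_KEY}",
            "Content-Type": "application/json",
            # resolution=merge-duplicates turns the insert into an upsert.
            "Prefer": "resolution=merge-duplicates",
        },
        method="POST",
    )
    return urllib.request.urlopen(req)
```

Because the upsert is idempotent, replaying an already-processed page after a crash only rewrites the same rows rather than duplicating them.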
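
The watermark-resume behavior from `_ingest_storeleads()` can also be sketched in isolation. Here `get_watermark`, `set_watermark`, and the sync function are injected as stand-ins for the real helpers in `river_scheduler.py`, so this shows the shape of the resume logic rather than the actual code:

```python
def run_ingest(get_watermark, set_watermark, sync, max_pages=10):
    """Resume from the stored page, run one sync pass, then advance the watermark."""
    watermark = get_watermark("storeleads") or {}
    last_page = watermark.get("last_page", 0)
    result = sync(start_page=last_page, max_pages=max_pages)
    # Advance by however many pages this run actually processed,
    # so an interrupted or partial run resumes where it left off.
    set_watermark("storeleads", {
        "last_page": last_page + result["pages_processed"],
        "last_domain": result.get("last_domain"),
    })
    return result
```

Calling this twice against the same store advances `last_page` cumulatively, which is the resume property the task required.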