FOUNDRY
C8 Platform
← Tasks

BRIDGE-7: River Learning Loop ← API Classification Writeback

completedcode_genP1REVENUE-26

Description

Close the flywheel: every Ship API classification writes back to the Data Empire, making the next call cheaper. Flow: 1. Ship /classify returns result to customer 2. Async: write classification to river_classifications (audit trail) 3. Async: if confidence >= 0.85, write to product_digital_twins (grows Bridge 1 cache) 4. Async: if confidence >= 0.90, write to oracle_knowledge_base (grows Bridge 2 KB) 5. River Learn stage (Stage 6): aggregate metrics, update PTM trust scores 6. Next classification cycle: more cache hits, fewer LLM calls This is the competitive moat: every API call makes the system smarter. Competitors need 10.5M classified products to start — we grow ours with every customer interaction. Implementation: - Pub/Sub: publish classification.completed event after every /classify - River subscriber: ingest completed classifications - Writeback logic: confidence-gated insertion to twins + KB - Deduplication: UPSERT by pattern_id to avoid bloating Output: Pub/Sub writeback publisher + River ingest subscriber + upsert logic Blocked by: Ship service (06c034a5), Bridge 1 (e42b3ad8)
Assignee
-
Claimed By
Cloud Lu → Claude Sonnet
Created
7h ago
Completed
7h ago

Quality

Overall
71%
Pass
PASS
Graded By
-
Graded At
-

Cost

Input Tokens
-
Output Tokens
-
Est. USD
-

Result

{
  "cost": {
    "input_tokens": 318,
    "estimated_usd": 0.061869,
    "output_tokens": 4061
  },
  "model": "claude-sonnet-4-20250514",
  "status": "success",
  "transport": "anthropic_api",
  "latency_ms": 51399,
  "response_text": "# BRIDGE-7: River Learning Loop Implementation\n\n## Architecture Overview\n\n```\nShip API → Classification → Writeback Loop → Smarter System\n    ↓           ↓              ↓              ↑\nCustomer    Confidence    Data Growth    Competitive Moat\n```\n\n## Core Components\n\n### 1. Pub/Sub Event Publisher\n\n```typescript\n// services/ship/src/events/classification-publisher.ts\nimport { PubSub } from '@google-cloud/pubsub';\nimport { ClassificationEvent, ClassificationResult } from '../types';\n\nexport class ClassificationEventPublisher {\n  private pubsub: PubSub;\n  private topicName = 'classification-completed';\n\n  constructor() {\n    this.pubsub = new PubSub();\n  }\n\n  async publishClassification(result: ClassificationResult): Promise<void> {\n    const event: ClassificationEvent = {\n      eventId: crypto.randomUUID(),\n      timestamp: new Date().toISOString(),\n      customerId: result.customerId,\n      classification: {\n        patternId: result.patternId,\n        productName: result.productName,\n        confidence: result.confidence,\n        attributes: result.attributes,\n        source: result.source, // 'cache' | 'llm' | 'hybrid'\n        processingTimeMs: result.processingTimeMs\n      },\n      metadata: {\n        apiVersion: '1.0',\n        region: process.env.REGION || 'us-central1'\n      }\n    };\n\n    const topic = this.pubsub.topic(this.topicName);\n    const messageBuffer = Buffer.from(JSON.stringify(event));\n    \n    await topic.publishMessage({\n      data: messageBuffer,\n      attributes: {\n        eventType: 'classification.completed',\n        confidence: result.confidence.toString(),\n        source: result.source\n      }\n    });\n  }\n}\n```\n\n### 2. River Learning Subscriber\n\n```typescript\n// services/river/src/subscribers/classification-learner.ts\nimport { Message } from '@google-cloud/pubsub';\nimport { ClassificationEvent } from '../types';\nimport { DataWritebackService } from '../services/data-writeback';\nimport { MetricsAggregator } from '../services/metrics-aggregator';\n\nexport class ClassificationLearningSubscriber {\n  private writebackService: DataWritebackService;\n  private metricsAggregator: MetricsAggregator;\n\n  constructor(\n    writebackService: DataWritebackService,\n    metricsAggregator: MetricsAggregator\n  ) {\n    this.writebackService = writebackService;\n    this.metricsAggregator = metricsAggregator;\n  }\n\n  async processClassification(message: Message): Promise<void> {\n    try {\n      const event: ClassificationEvent = JSON.parse(message.data.toString());\n      \n      // 1. Always write audit trail\n      await this.writeAuditTrail(event);\n      \n      // 2. Confidence-gated writeback\n      await this.executeWriteback(event);\n      \n      // 3. Update metrics and trust scores\n      await this.updateMetrics(event);\n      \n      message.ack();\n    } catch (error) {\n      console.error('Classification learning error:', error);\n      message.nack();\n    }\n  }\n\n  private async writeAuditTrail(event: ClassificationEvent): Promise<void> {\n    await this.writebackService.writeClassificationAudit({\n      classification_id: event.eventId,\n      customer_id: event.customerId,\n      pattern_id: event.classification.patternId,\n      product_name: event.classification.productName,\n      confidence: event.classification.confidence,\n      attributes: event.classification.attributes,\n      source: event.classification.source,\n      processing_time_ms: event.classification.processingTimeMs,\n      timestamp: event.timestamp\n    });\n  }\n\n  private async executeWriteback(event: ClassificationEvent): Promise<void> {\n    const { classification } = event;\n    \n    // Bridge 1 Cache: High confidence classifications\n    if (classification.confidence >= 0.85) {\n      await this.writebackService.upsertDigitalTwin({\n        pattern_id: classification.patternId,\n        product_name: classification.productName,\n        attributes: classification.attributes,\n        confidence: classification.confidence,\n        last_updated: event.timestamp,\n        usage_count: 1 // Will increment on conflict\n      });\n    }\n\n    // Bridge 2 Knowledge Base: Very high confidence\n    if (classification.confidence >= 0.90) {\n      await this.writebackService.upsertKnowledgeBase({\n        pattern_id: classification.patternId,\n        canonical_attributes: classification.attributes,\n        confidence_score: classification.confidence,\n        validation_count: 1,\n        last_validated: event.timestamp\n      });\n    }\n  }\n\n  private async updateMetrics(event: ClassificationEvent): Promise<void> {\n    await this.metricsAggregator.recordClassification({\n      source: event.classification.source,\n      confidence: event.classification.confidence,\n      processingTime: event.classification.processingTimeMs,\n      customerId: event.customerId,\n      timestamp: event.timestamp\n    });\n  }\n}\n```\n\n### 3. Data Writeback Service\n\n```typescript\n// services/river/src/services/data-writeback.ts\nimport { Database } from '../database';\nimport { \n  ClassificationAudit, \n  DigitalTwinUpsert, \n  KnowledgeBaseUpsert \n} from '../types';\n\nexport class DataWritebackService {\n  constructor(private db: Database) {}\n\n  async writeClassificationAudit(audit: ClassificationAudit): Promise<void> {\n    await this.db.query(`\n      INSERT INTO river_classifications (\n        classification_id, customer_id, pattern_id, product_name,\n        confidence, attributes, source, processing_time_ms, timestamp\n      ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)\n    `, [\n      audit.classification_id,\n      audit.customer_id,\n      audit.pattern_id,\n      audit.product_name,\n      audit.confidence,\n      JSON.stringify(audit.attributes),\n      audit.source,\n      audit.processing_time_ms,\n      audit.timestamp\n    ]);\n  }\n\n  async upsertDigitalTwin(twin: DigitalTwinUpsert): Promise<void> {\n    await this.db.query(`\n      INSERT INTO product_digital_twins (\n        pattern_id, product_name, attributes, confidence, \n        last_updated, usage_count, created_at\n      ) VALUES ($1, $2, $3, $4, $5, $6, NOW())\n      ON CONFLICT (pattern_id) DO UPDATE SET\n        attributes = EXCLUDED.attributes,\n        confidence = GREATEST(product_digital_twins.confidence, EXCLUDED.confidence),\n        last_updated = EXCLUDED.last_updated,\n        usage_count = product_digital_twins.usage_count + 1\n    `, [\n      twin.pattern_id,\n      twin.product_name,\n      JSON.stringify(twin.attributes),\n      twin.confidence,\n      twin.last_updated,\n      twin.usage_count\n    ]);\n  }\n\n  async upsertKnowledgeBase(kb: KnowledgeBaseUpsert): Promise<void> {\n    await this.db.query(`\n      INSERT INTO oracle_knowledge_base (\n        pattern_id, canonical_attributes, confidence_score,\n        validation_count, last_validated, created_at\n      ) VALUES ($1, $2, $3, $4, $5, NOW())\n      ON CONFLICT (pattern_id) DO UPDATE SET\n        canonical_attributes = EXCLUDED.canonical_attributes,\n        confidence_score = GREATEST(oracle_knowledge_base.confidence_score, EXCLUDED.confidence_score),\n        validation_count = oracle_knowledge_base.validation_count + 1,\n        last_validated = EXCLUDED.last_validated\n    `, [\n      kb.pattern_id,\n      JSON.stringify(kb.canonical_attributes),\n      kb.confidence_score,\n      kb.validation_count,\n      kb.last_validated\n    ]);\n  }\n}\n```\n\n### 4. Metrics Aggregator\n\n```typescript\n// services/river/src/services/metrics-aggregator.ts\nimport { Database } from '../database';\nimport { ClassificationMetric } from '../types';\n\nexport class MetricsAggregator {\n  constructor(private db: Database) {}\n\n  async recordClassification(metric: ClassificationMetric): Promise<void> {\n    // Update hourly aggregates\n    await this.db.query(`\n      INSERT INTO classification_metrics_hourly (\n        hour_bucket, total_classifications, cache_hits, llm_calls,\n        avg_confidence, avg_processing_time_ms, unique_customers\n      ) VALUES (\n        date_trunc('hour', $1::timestamp), 1,\n        CASE WHEN $2 = 'cache' THEN 1 ELSE 0 END,\n        CASE WHEN $2 = 'llm' THEN 1 ELSE 0 END,\n        $3, $4, 1\n      )\n      ON CONFLICT (hour_bucket) DO UPDATE SET\n        total_classifications = classification_metrics_hourly.total_classifications + 1,\n        cache_hits = classification_metrics_hourly.cache_hits + \n          CASE WHEN $2 = 'cache' THEN 1 ELSE 0 END,\n        llm_calls = classification_metrics_hourly.llm_calls + \n          CASE WHEN $2 = 'llm' THEN 1 ELSE 0 END,\n        avg_confidence = (\n          classification_metrics_hourly.avg_confidence * \n          classification_metrics_hourly.total_classifications + $3\n        ) / (classification_metrics_hourly.total_classifications + 1),\n        avg_processing_time_ms = (\n          classification_metrics_hourly.avg_processing_time_ms * \n          classification_metrics_hourly.total_classifications + $4\n        ) / (classification_metrics_hourly.total_classifications + 1)\n    `, [\n      metric.timestamp,\n      metric.source,\n      metric.confidence,\n      metric.processingTime\n    ]);\n\n    // Update Pattern Trust Model (PTM) scores\n    await this.updatePTMScores(metric);\n  }\n\n  private async updatePTMScores(metric: ClassificationMetric): Promise<void> {\n    // This is where we update trust scores based on classification patterns\n    // Higher confidence + faster processing = higher trust\n    const trustDelta = this.calculateTrustDelta(metric);\n    \n    await this.db.query(`\n      UPDATE oracle_knowledge_base \n      SET trust_score = LEAST(1.0, trust_score + $1)\n      WHERE pattern_id IN (\n        SELECT pattern_id FROM river_classifications \n        WHERE customer_id = $2 \n        AND timestamp >= NOW() - INTERVAL '1 hour'\n      )\n    `, [trustDelta, metric.customerId]);\n  }\n\n  private calculateTrustDelta(metric: ClassificationMetric): number {\n    // Trust increases with high confidence, fast processing\n    const confidenceBonus = (metric.confidence - 0.5) * 0.01;\n    const speedBonus = metric.processingTime < 100 ? 0.005 : 0;\n    return Math.max(0, confidenceBonus + speedBonus);\n  }\n}\n```\n\n## Database Schema Extensions\n\n```sql\n-- services/river/migrations/007_learning_loop.sql\n\n-- Audit trail for all classifications\nCREATE TABLE river_classifications (\n    id BIGSERIAL PRIMARY KEY,\n    classification_id UUID UNIQUE NOT NULL,\n    customer_id VARCHAR(255) NOT NULL,\n    pattern_id VARCHAR(255) NOT NULL,\n    product_name TEXT NOT NULL,\n    confidence DECIMAL(4,3) NOT NULL,\n    attributes JSONB NOT NULL,\n    source VARCHAR(20) NOT NULL CHECK (source IN ('cache', 'llm', 'hybrid')),\n    processing_time_ms INTEGER NOT NULL,\n    timestamp TIMESTAMP WITH TIME ZONE NOT NULL,\n    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n\nCREATE INDEX idx_river_classifications_customer ON river_classifications(customer_id);\nCREATE INDEX idx_river_classifications_pattern ON river_classifications(pattern_id);\nCREATE INDEX idx_river_classifications_timestamp ON river_classifications(timestamp);\nCREATE INDEX idx_river_classifications_confidence ON river_classifications(confidence);\n\n-- Hourly metrics aggregation\nCREATE TABLE classification_metrics_hourly (\n    hour_bucket TIMESTAMP WITH TIME ZONE PRIMARY KEY,\n    total_classifications INTEGER NOT NULL DEFAULT 0,\n    cache_hits INTEGER NOT NULL DEFAULT 0,\n    llm_calls INTEGER NOT NULL DEFAULT 0,\n    avg_confidence DECIMAL(4,3) NOT NULL DEFAULT 0,\n    avg_processing_time_ms INTEGER NOT NULL DEFAULT 0,\n    unique_customers INTEGER NOT NULL DEFAULT 0,\n    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n\n-- Add trust scores to knowledge base\nALTER TABLE oracle_knowledge_base \nADD COLUMN trust_score DECIMAL(4,3) DEFAULT 0.500;\n\nCREATE INDEX idx_oracle_kb_trust_score ON oracle_knowledge_base(trust_score);\n```\n\n## Integration Points\n\n### Ship Service Integration\n\n```typescript\n// services/ship/src/controllers/classify.ts (modification)\nimport { ClassificationEventPublisher } from '../events/classification-publisher';\n\nexport class ClassifyController {\n  private publisher: ClassificationEventPublisher;\n\n  constructor() {\n    this.publisher = new ClassificationEventPublisher();\n  }\n\n  async classify(req: Request, res: Response): Promise<void> {\n    const result = await this.classificationService.classify(req.body);\n    \n    // Return to customer immediately\n    res.json(result);\n    \n    // Async: publish for learning loop\n    this.publisher.publishClassification(result).catch(error => {\n      console.error('Failed to publish classification event:', error);\n      // Don't fail the request - this is async learning\n    });\n  }\n}\n```\n\n## Deployment Configuration\n\n```yaml\n# services/river/k8s/classification-subscriber.yaml\napiVersion: apps/v1\nkind: Deployment\nmetadata:\n  name: river-classification-subscriber\nspec:\n  replicas: 3\n  selector:\n    matchLabels:\n      app: river-classification-subscriber\n  template:\n    spec:\n      containers:\n      - name: subscriber\n        image: gcr.io/PROJECT_ID/river-classification-subscriber\n        env:\n        - name: SUBSCRIPTION_NAME\n          value: \"classification-completed-sub\"\n        - name: MAX_MESSAGES\n          value: \"100\"\n        - name: ACK_DEADLINE_SECONDS\n          value: \"60\"\n        resources:\n          requests:\n            memory: \"256Mi\"\n            cpu: \"200m\"\n          limits:\n            memory: \"512Mi\"\n            cpu: \"500m\"\n```\n\n## Monitoring & Observability\n\n```typescript\n// Metrics to track the learning loop effectiveness\nconst learningMetrics = {\n  // Input metrics\n  classificationsProcessed: counter('classifications_processed_total'),\n  confidenceDistribution: histogram('classification_confidence_distribution'),\n  \n  // Writeback metrics  \n  digitalTwinsGrown: counter('digital_twins_grown_total'),\n  knowledgeBaseGrown: counter('knowledge_base_grown_total'),\n  \n  // Output metrics (competitive moat)\n  cacheHitRate: gauge('cache_hit_rate'),\n  avgProcessingTime: gauge('avg_processing_time_ms'),\n  llmCallReduction: gauge('llm_call_reduction_percent')\n};\n```\n\n## Success Metrics\n\n1. **Cache Hit Rate**: Should increase over time as twins grow\n2. **Processing Speed**: Faster responses as cache coverage improves  \n3. **LLM Cost Reduction**: Fewer expensive LLM calls needed\n4. **Knowledge Quality**: Higher confidence scores in knowledge base\n5. **Competitive Moat**: Growing classification database vs competitors\n\nThis creates the self-reinforcing flywheel: every API call makes the next one faster and cheaper, building an increasingly insurmountable competitive advantage."
}

Audit Trail (1)

7h agotask_createdAG Lu
Task ID: a26d7047-dd03-4e31-afb3-0780fd94ef04