← Tasks
BRIDGE-7: River Learning Loop ← API Classification Writeback
completed · code_gen · P1 · REVENUE-26
Description
Close the flywheel: every Ship API classification writes back to the Data Empire, making the next call cheaper.
Flow:
1. Ship /classify returns result to customer
2. Async: write classification to river_classifications (audit trail)
3. Async: if confidence >= 0.85, write to product_digital_twins (grows Bridge 1 cache)
4. Async: if confidence >= 0.90, write to oracle_knowledge_base (grows Bridge 2 KB)
5. River Learn stage (Stage 6): aggregate metrics, update PTM trust scores
6. Next classification cycle: more cache hits, fewer LLM calls
This is the competitive moat: every API call makes the system smarter.
Competitors need 10.5M classified products to start — we grow ours with
every customer interaction.
Implementation:
- Pub/Sub: publish classification.completed event after every /classify
- River subscriber: ingest completed classifications
- Writeback logic: confidence-gated insertion to twins + KB
- Deduplication: UPSERT by pattern_id to avoid bloating the tables with duplicate rows
Output: Pub/Sub writeback publisher + River ingest subscriber + upsert logic
Blocked by: Ship service (06c034a5), Bridge 1 (e42b3ad8)
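The confidence gating in steps 2-4 can be sketched as a small routing function. This is an illustrative sketch only: the table names come from the task description above, while the function name and return shape are hypothetical, not the actual River implementation.

```typescript
// Sketch of the confidence-gated writeback from the task description.
// Thresholds (0.85, 0.90) are the ones specified above; writebackTargets
// is a hypothetical helper name.
function writebackTargets(confidence: number): string[] {
  const targets = ["river_classifications"]; // step 2: audit trail, always written
  if (confidence >= 0.85) targets.push("product_digital_twins"); // step 3: Bridge 1 cache
  if (confidence >= 0.90) targets.push("oracle_knowledge_base"); // step 4: Bridge 2 KB
  return targets;
}
```

A 0.92-confidence classification lands in all three tables; a 0.87 one grows the twin cache but not the knowledge base; anything lower is audit-only.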
Assignee
-
Claimed By
Cloud Lu → Claude Sonnet
Created
7h ago
Completed
7h ago
Quality
Overall
71%
Pass
PASS
Graded By
-
Graded At
-
Cost
Input Tokens
-
Output Tokens
-
Est. USD
-
Result
```json
{
  "cost": {
    "input_tokens": 318,
    "estimated_usd": 0.061869,
    "output_tokens": 4061
  },
  "model": "claude-sonnet-4-20250514",
  "status": "success",
  "transport": "anthropic_api",
  "latency_ms": 51399
}
```

Response:

# BRIDGE-7: River Learning Loop Implementation

## Architecture Overview

```
Ship API → Classification → Writeback Loop → Smarter System
   ↓              ↓               ↓                 ↑
Customer     Confidence     Data Growth     Competitive Moat
```

## Core Components

### 1. Pub/Sub Event Publisher

```typescript
// services/ship/src/events/classification-publisher.ts
import { randomUUID } from 'crypto';
import { PubSub } from '@google-cloud/pubsub';
import { ClassificationEvent, ClassificationResult } from '../types';

export class ClassificationEventPublisher {
  private pubsub: PubSub;
  private topicName = 'classification-completed';

  constructor() {
    this.pubsub = new PubSub();
  }

  async publishClassification(result: ClassificationResult): Promise<void> {
    const event: ClassificationEvent = {
      eventId: randomUUID(),
      timestamp: new Date().toISOString(),
      customerId: result.customerId,
      classification: {
        patternId: result.patternId,
        productName: result.productName,
        confidence: result.confidence,
        attributes: result.attributes,
        source: result.source, // 'cache' | 'llm' | 'hybrid'
        processingTimeMs: result.processingTimeMs
      },
      metadata: {
        apiVersion: '1.0',
        region: process.env.REGION || 'us-central1'
      }
    };

    const topic = this.pubsub.topic(this.topicName);
    const messageBuffer = Buffer.from(JSON.stringify(event));

    await topic.publishMessage({
      data: messageBuffer,
      attributes: {
        eventType: 'classification.completed',
        confidence: result.confidence.toString(),
        source: result.source
      }
    });
  }
}
```

### 2. River Learning Subscriber

```typescript
// services/river/src/subscribers/classification-learner.ts
import { Message } from '@google-cloud/pubsub';
import { ClassificationEvent } from '../types';
import { DataWritebackService } from '../services/data-writeback';
import { MetricsAggregator } from '../services/metrics-aggregator';

export class ClassificationLearningSubscriber {
  private writebackService: DataWritebackService;
  private metricsAggregator: MetricsAggregator;

  constructor(
    writebackService: DataWritebackService,
    metricsAggregator: MetricsAggregator
  ) {
    this.writebackService = writebackService;
    this.metricsAggregator = metricsAggregator;
  }

  async processClassification(message: Message): Promise<void> {
    try {
      const event: ClassificationEvent = JSON.parse(message.data.toString());

      // 1. Always write audit trail
      await this.writeAuditTrail(event);

      // 2. Confidence-gated writeback
      await this.executeWriteback(event);

      // 3. Update metrics and trust scores
      await this.updateMetrics(event);

      message.ack();
    } catch (error) {
      console.error('Classification learning error:', error);
      message.nack();
    }
  }

  private async writeAuditTrail(event: ClassificationEvent): Promise<void> {
    await this.writebackService.writeClassificationAudit({
      classification_id: event.eventId,
      customer_id: event.customerId,
      pattern_id: event.classification.patternId,
      product_name: event.classification.productName,
      confidence: event.classification.confidence,
      attributes: event.classification.attributes,
      source: event.classification.source,
      processing_time_ms: event.classification.processingTimeMs,
      timestamp: event.timestamp
    });
  }

  private async executeWriteback(event: ClassificationEvent): Promise<void> {
    const { classification } = event;

    // Bridge 1 Cache: High confidence classifications
    if (classification.confidence >= 0.85) {
      await this.writebackService.upsertDigitalTwin({
        pattern_id: classification.patternId,
        product_name: classification.productName,
        attributes: classification.attributes,
        confidence: classification.confidence,
        last_updated: event.timestamp,
        usage_count: 1 // Will increment on conflict
      });
    }

    // Bridge 2 Knowledge Base: Very high confidence
    if (classification.confidence >= 0.90) {
      await this.writebackService.upsertKnowledgeBase({
        pattern_id: classification.patternId,
        canonical_attributes: classification.attributes,
        confidence_score: classification.confidence,
        validation_count: 1,
        last_validated: event.timestamp
      });
    }
  }

  private async updateMetrics(event: ClassificationEvent): Promise<void> {
    await this.metricsAggregator.recordClassification({
      source: event.classification.source,
      confidence: event.classification.confidence,
      processingTime: event.classification.processingTimeMs,
      customerId: event.customerId,
      timestamp: event.timestamp
    });
  }
}
```

### 3. Data Writeback Service

```typescript
// services/river/src/services/data-writeback.ts
import { Database } from '../database';
import {
  ClassificationAudit,
  DigitalTwinUpsert,
  KnowledgeBaseUpsert
} from '../types';

export class DataWritebackService {
  constructor(private db: Database) {}

  async writeClassificationAudit(audit: ClassificationAudit): Promise<void> {
    await this.db.query(`
      INSERT INTO river_classifications (
        classification_id, customer_id, pattern_id, product_name,
        confidence, attributes, source, processing_time_ms, timestamp
      ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
    `, [
      audit.classification_id,
      audit.customer_id,
      audit.pattern_id,
      audit.product_name,
      audit.confidence,
      JSON.stringify(audit.attributes),
      audit.source,
      audit.processing_time_ms,
      audit.timestamp
    ]);
  }

  async upsertDigitalTwin(twin: DigitalTwinUpsert): Promise<void> {
    await this.db.query(`
      INSERT INTO product_digital_twins (
        pattern_id, product_name, attributes, confidence,
        last_updated, usage_count, created_at
      ) VALUES ($1, $2, $3, $4, $5, $6, NOW())
      ON CONFLICT (pattern_id) DO UPDATE SET
        attributes = EXCLUDED.attributes,
        confidence = GREATEST(product_digital_twins.confidence, EXCLUDED.confidence),
        last_updated = EXCLUDED.last_updated,
        usage_count = product_digital_twins.usage_count + 1
    `, [
      twin.pattern_id,
      twin.product_name,
      JSON.stringify(twin.attributes),
      twin.confidence,
      twin.last_updated,
      twin.usage_count
    ]);
  }

  async upsertKnowledgeBase(kb: KnowledgeBaseUpsert): Promise<void> {
    await this.db.query(`
      INSERT INTO oracle_knowledge_base (
        pattern_id, canonical_attributes, confidence_score,
        validation_count, last_validated, created_at
      ) VALUES ($1, $2, $3, $4, $5, NOW())
      ON CONFLICT (pattern_id) DO UPDATE SET
        canonical_attributes = EXCLUDED.canonical_attributes,
        confidence_score = GREATEST(oracle_knowledge_base.confidence_score, EXCLUDED.confidence_score),
        validation_count = oracle_knowledge_base.validation_count + 1,
        last_validated = EXCLUDED.last_validated
    `, [
      kb.pattern_id,
      JSON.stringify(kb.canonical_attributes),
      kb.confidence_score,
      kb.validation_count,
      kb.last_validated
    ]);
  }
}
```

### 4. Metrics Aggregator

```typescript
// services/river/src/services/metrics-aggregator.ts
import { Database } from '../database';
import { ClassificationMetric } from '../types';

export class MetricsAggregator {
  constructor(private db: Database) {}

  async recordClassification(metric: ClassificationMetric): Promise<void> {
    // Update hourly aggregates
    await this.db.query(`
      INSERT INTO classification_metrics_hourly (
        hour_bucket, total_classifications, cache_hits, llm_calls,
        avg_confidence, avg_processing_time_ms, unique_customers
      ) VALUES (
        date_trunc('hour', $1::timestamp), 1,
        CASE WHEN $2 = 'cache' THEN 1 ELSE 0 END,
        CASE WHEN $2 = 'llm' THEN 1 ELSE 0 END,
        $3, $4, 1
      )
      ON CONFLICT (hour_bucket) DO UPDATE SET
        total_classifications = classification_metrics_hourly.total_classifications + 1,
        cache_hits = classification_metrics_hourly.cache_hits +
          CASE WHEN $2 = 'cache' THEN 1 ELSE 0 END,
        llm_calls = classification_metrics_hourly.llm_calls +
          CASE WHEN $2 = 'llm' THEN 1 ELSE 0 END,
        avg_confidence = (
          classification_metrics_hourly.avg_confidence *
          classification_metrics_hourly.total_classifications + $3
        ) / (classification_metrics_hourly.total_classifications + 1),
        avg_processing_time_ms = (
          classification_metrics_hourly.avg_processing_time_ms *
          classification_metrics_hourly.total_classifications + $4
        ) / (classification_metrics_hourly.total_classifications + 1)
    `, [
      metric.timestamp,
      metric.source,
      metric.confidence,
      metric.processingTime
    ]);

    // Update Pattern Trust Model (PTM) scores
    await this.updatePTMScores(metric);
  }

  private async updatePTMScores(metric: ClassificationMetric): Promise<void> {
    // This is where we update trust scores based on classification patterns
    // Higher confidence + faster processing = higher trust
    const trustDelta = this.calculateTrustDelta(metric);

    await this.db.query(`
      UPDATE oracle_knowledge_base
      SET trust_score = LEAST(1.0, trust_score + $1)
      WHERE pattern_id IN (
        SELECT pattern_id FROM river_classifications
        WHERE customer_id = $2
          AND timestamp >= NOW() - INTERVAL '1 hour'
      )
    `, [trustDelta, metric.customerId]);
  }

  private calculateTrustDelta(metric: ClassificationMetric): number {
    // Trust increases with high confidence, fast processing
    const confidenceBonus = (metric.confidence - 0.5) * 0.01;
    const speedBonus = metric.processingTime < 100 ? 0.005 : 0;
    return Math.max(0, confidenceBonus + speedBonus);
  }
}
```

## Database Schema Extensions

```sql
-- services/river/migrations/007_learning_loop.sql

-- Audit trail for all classifications
CREATE TABLE river_classifications (
  id BIGSERIAL PRIMARY KEY,
  classification_id UUID UNIQUE NOT NULL,
  customer_id VARCHAR(255) NOT NULL,
  pattern_id VARCHAR(255) NOT NULL,
  product_name TEXT NOT NULL,
  confidence DECIMAL(4,3) NOT NULL,
  attributes JSONB NOT NULL,
  source VARCHAR(20) NOT NULL CHECK (source IN ('cache', 'llm', 'hybrid')),
  processing_time_ms INTEGER NOT NULL,
  timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX idx_river_classifications_customer ON river_classifications(customer_id);
CREATE INDEX idx_river_classifications_pattern ON river_classifications(pattern_id);
CREATE INDEX idx_river_classifications_timestamp ON river_classifications(timestamp);
CREATE INDEX idx_river_classifications_confidence ON river_classifications(confidence);

-- Hourly metrics aggregation
CREATE TABLE classification_metrics_hourly (
  hour_bucket TIMESTAMP WITH TIME ZONE PRIMARY KEY,
  total_classifications INTEGER NOT NULL DEFAULT 0,
  cache_hits INTEGER NOT NULL DEFAULT 0,
  llm_calls INTEGER NOT NULL DEFAULT 0,
  avg_confidence DECIMAL(4,3) NOT NULL DEFAULT 0,
  avg_processing_time_ms INTEGER NOT NULL DEFAULT 0,
  unique_customers INTEGER NOT NULL DEFAULT 0,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Add trust scores to knowledge base
ALTER TABLE oracle_knowledge_base
ADD COLUMN trust_score DECIMAL(4,3) DEFAULT 0.500;

CREATE INDEX idx_oracle_kb_trust_score ON oracle_knowledge_base(trust_score);
```

## Integration Points

### Ship Service Integration

```typescript
// services/ship/src/controllers/classify.ts (modification)
import { ClassificationEventPublisher } from '../events/classification-publisher';

export class ClassifyController {
  private publisher: ClassificationEventPublisher;

  constructor(private classificationService: ClassificationService) {
    this.publisher = new ClassificationEventPublisher();
  }

  async classify(req: Request, res: Response): Promise<void> {
    const result = await this.classificationService.classify(req.body);

    // Return to customer immediately
    res.json(result);

    // Async: publish for learning loop
    this.publisher.publishClassification(result).catch(error => {
      console.error('Failed to publish classification event:', error);
      // Don't fail the request - this is async learning
    });
  }
}
```

## Deployment Configuration

```yaml
# services/river/k8s/classification-subscriber.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: river-classification-subscriber
spec:
  replicas: 3
  selector:
    matchLabels:
      app: river-classification-subscriber
  template:
    metadata:
      labels:
        app: river-classification-subscriber
    spec:
      containers:
      - name: subscriber
        image: gcr.io/PROJECT_ID/river-classification-subscriber
        env:
        - name: SUBSCRIPTION_NAME
          value: "classification-completed-sub"
        - name: MAX_MESSAGES
          value: "100"
        - name: ACK_DEADLINE_SECONDS
          value: "60"
        resources:
          requests:
            memory: "256Mi"
            cpu: "200m"
          limits:
            memory: "512Mi"
            cpu: "500m"
```

## Monitoring & Observability

```typescript
// Metrics to track the learning loop effectiveness
const learningMetrics = {
  // Input metrics
  classificationsProcessed: counter('classifications_processed_total'),
  confidenceDistribution: histogram('classification_confidence_distribution'),

  // Writeback metrics
  digitalTwinsGrown: counter('digital_twins_grown_total'),
  knowledgeBaseGrown: counter('knowledge_base_grown_total'),

  // Output metrics (competitive moat)
  cacheHitRate: gauge('cache_hit_rate'),
  avgProcessingTime: gauge('avg_processing_time_ms'),
  llmCallReduction: gauge('llm_call_reduction_percent')
};
```

## Success Metrics

1. **Cache Hit Rate**: Should increase over time as twins grow
2. **Processing Speed**: Faster responses as cache coverage improves
3. **LLM Cost Reduction**: Fewer expensive LLM calls needed
4. **Knowledge Quality**: Higher confidence scores in knowledge base
5. **Competitive Moat**: Growing classification database vs competitors

This creates the self-reinforcing flywheel: every API call makes the next one faster and cheaper, building an increasingly insurmountable competitive advantage.

Audit Trail (1)
7h ago · task_created · AG Lu
Task ID: a26d7047-dd03-4e31-afb3-0780fd94ef04