# Building an AI-Powered Database Migration Pipeline with LangGraph and Amazon Bedrock

*How we reduced utility billing system migrations from weeks to days using orchestrated AI agents with human-in-the-loop validation*

---

## The Challenge: Database Migrations Are Painful

If you've ever been involved in a large-scale database migration, you know the drill. A customer is moving from their legacy billing system to your platform, and someone needs to figure out how their `CUST_ACCT_NBR` maps to your `customer_account_id`. Multiply that by hundreds of tables and thousands of columns, and you're looking at weeks of tedious, error-prone work.

For utility billing companies, this is a recurring challenge. Each new customer migration means:

- **2-3 weeks** of manual schema analysis
- **Dozens of mapping errors** discovered late in the process
- **Inconsistent documentation** that varies by analyst
- **Lost institutional knowledge** when team members move on

With a large-scale migration approaching (a complex utility billing database with hundreds of tables), we needed a better way.

## Why Traditional Approaches Fall Short

The obvious first thought: "Can't we just automate this with some string matching?"

We tried. Column names like `bill_dt` and `billing_date` are easy. But what about `SERV_LOC_ADDR` mapping to `premise_service_address`? Or understanding that `KWH_USAGE` in one system represents the same concept as `consumption_reading` in another?

Schema mapping isn't just about matching names; it's about understanding *meaning*. It requires:

1. **Semantic understanding** of what columns represent
2. **Domain knowledge** about utility billing concepts
3. **Judgment calls** when mappings are ambiguous
4. **Documentation** explaining why decisions were made

This is exactly the kind of task where large language models excel. But LLMs alone aren't enough. You need orchestration, state management, human oversight, and the ability to learn from corrections.

Enter LangGraph.
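To make the string-matching limitation described above concrete, here is a minimal sketch using Python's standard `difflib`. The helper name `name_similarity` is ours, for illustration only; it is not part of the pipeline. Character-level similarity tends to reward abbreviations of the same words and to penalize names that mean the same thing but share few characters.

```python
from difflib import SequenceMatcher


def name_similarity(source: str, target: str) -> float:
    """Character-level similarity in [0, 1], ignoring case (illustrative helper)."""
    return SequenceMatcher(None, source.lower(), target.lower()).ratio()


# Column pairs taken from the examples in the text above
pairs = [
    ("bill_dt", "billing_date"),
    ("SERV_LOC_ADDR", "premise_service_address"),
    ("KWH_USAGE", "consumption_reading"),
]

for source, target in pairs:
    print(f"{source} -> {target}: {name_similarity(source, target):.2f}")
```

A fixed cut-off on a score like this either rejects `KWH_USAGE` → `consumption_reading` or lets unrelated pairs through, which is why name matching alone was not enough for us.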
## What is LangGraph?

LangGraph is an orchestration framework for building stateful, multi-step AI applications. Think of it as a way to define workflows where each step can be an AI agent, a deterministic function, or a human decision point.

The key concepts are straightforward:

- **Nodes**: Individual processing steps (AI-powered or deterministic)
- **Edges**: Connections between nodes that define the flow
- **State**: Shared data that flows through the graph and persists across steps
- **Checkpointing**: Automatic state saving that enables pause/resume and recovery

Here's what makes LangGraph particularly suited for complex workflows:

```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Discovery  │────▶│   Mapper    │────▶│  Evaluator  │
└─────────────┘     └─────────────┘     └─────────────┘
                                               │
                    ┌──────────────────────────┤
                    │                          │
                    ▼                          ▼
             ┌─────────────┐            ┌─────────────┐
             │Human Review │            │SQL Generator│
             └─────────────┘            └─────────────┘
                    │                          │
                    ▼                          ▼
             ┌─────────────┐            ┌─────────────┐
             │  Rectifier  │            │ Documentor  │
             └─────────────┘            └─────────────┘
```

Unlike simple sequential pipelines, LangGraph supports non-linear execution. Execution can move forwards *and backwards*: A→B, but also B→A or C→A when needed. Low-confidence mappings? Route to human review. Human rejects the mapping entirely? Loop back to the mapper for a fresh attempt. Rectification fails? Return to the evaluator for reassessment.

This bidirectional flow is essential for real-world applications where you can't predict every path upfront. Traditional DAG-based orchestrators force you into a forward-only model, but complex workflows often need to revisit earlier steps based on what's learned downstream.

## Our Solution: A Hybrid Agent Pipeline

We built a migration assistant that combines AI intelligence with deterministic validation and human oversight. The architecture uses 8 specialized nodes, split between AI-powered agents and deterministic processors.
### The AI Agents (Powered by Claude Sonnet via Amazon Bedrock)

**Mapper Node**: The heart of the system. It analyzes source schemas, understands the semantic meaning of columns, and generates mappings to our canonical schema. Each mapping includes a confidence score (0.0-1.0) and detailed rationale.

```python
# Example mapping output
{
    "source_table": "customers",
    "source_column": "CUST_ACCT_NBR",
    "target_table": "customer",
    "target_column": "account_number",
    "confidence": 0.92,
    "rationale": "Strong semantic match for customer account identifier. Both serve as primary account references in utility billing context."
}
```

**Rectifier Node**: When humans correct a mapping, this agent applies the feedback intelligently, not just to the specific correction but to similar patterns throughout the schema.

**SQL Generator Node**: Transforms approved mappings into executable DuckDB transformation scripts, complete with data type conversions and validation queries.

**Documentor Node**: Generates 10 types of documentation automatically, from executive summaries to detailed data lineage reports.

### The Deterministic Nodes

**Discovery Node**: Parses source files (CSV, SQL DDL, Excel), extracts metadata, infers relationships, and calculates data quality metrics. No AI needed, just solid parsing logic.

**Evaluator Node**: Validates mappings against business rules. Are all required fields mapped? Are data types compatible? This catches issues before they reach production.

**Human Review Node**: The critical human-in-the-loop checkpoint. When confidence drops below threshold (default 0.80), execution pauses and presents mappings for expert review.

**Validation Node**: Final quality checks before output. Ensures transformation scripts are syntactically valid and documentation is complete.

## The Human-in-the-Loop Pattern

Here's where LangGraph really shines.
The framework provides native `interrupt()` functionality that pauses execution, saves state, and waits for human input, even if that takes days.

```python
from langgraph.types import interrupt

class HumanReviewNode:
    def execute(self, state):
        # Identify low-confidence mappings
        review_items = self._identify_review_items(
            state['mappings'],
            state['confidence_scores']
        )

        if review_items and state.get('review_mode') == 'ui':
            # Pause execution and wait for human input
            human_feedback = interrupt({
                "type": "human_review",
                "items": review_items,
                "summary": f"{len(review_items)} mappings need review"
            })

            # This line only executes after human provides feedback
            return self._apply_corrections(state, human_feedback)

        # Nothing to review (or not in UI mode): no state updates
        return {}
```

When the pipeline hits an interrupt:

1. State is automatically checkpointed
2. The caller receives structured data about what needs review
3. A human reviews and provides feedback (approve, reject, modify)
4. Execution resumes exactly where it left off

This isn't just a nice-to-have. For complex migrations, some mappings genuinely require human judgment. The key is making that judgment efficient by only surfacing the uncertain cases.

## Confidence Scoring: Knowing What You Don't Know

One of the most valuable aspects of using LLMs for this task is their ability to express uncertainty. Our confidence scoring system weighs multiple factors:

| Factor | Weight | Description |
|--------|--------|-------------|
| Semantic Similarity | 40% | How well do the names and purposes align? |
| Data Type Compatibility | 25% | Direct match vs. conversion required |
| Domain Context | 20% | Does this match utility billing patterns? |
| Historical Success | 15% | Have similar mappings worked before? |

The confidence score directly drives workflow decisions:

- **≥ 0.80**: Auto-approve, continue to SQL generation
- **0.50 - 0.79**: Route to human review
- **< 0.50**: Flag as likely incorrect, require human decision

This calibrated uncertainty is what makes the human-in-the-loop pattern practical. Without it, you'd either review everything (defeating the purpose) or review nothing (missing errors).

## Continuous Learning with DynamoDB

Every human correction is an opportunity to improve. When a reviewer modifies a mapping, we store the pattern:

```python
pattern = LearningPattern.create(
    pattern_type=PatternType.SCHEMA_MAPPING,
    source_pattern="customers.CUST_ACCT_NBR",
    target_pattern="customer.account_number",
    confidence_score=0.95,  # Boosted after human approval
    context_metadata={
        "original_confidence": 0.72,
        "human_feedback": "approve",
        "correction_reason": "Standard account identifier pattern"
    }
)
```

On subsequent migrations, the Mapper Node retrieves relevant historical patterns and includes them in the LLM context. Over time, the system learns:

- Common naming conventions across different source systems
- Domain-specific patterns (utility billing has its own vocabulary)
- Edge cases that previously required human intervention

This creates a flywheel effect. Each migration makes the next one faster and more accurate.

## Durable Execution: Surviving Failures

Real-world pipelines fail. LLM APIs time out. Servers restart. Users close their browsers mid-review. LangGraph's checkpointing handles all of this gracefully.
We use DynamoDB as our checkpoint store for production deployments:

```python
from backend.core.checkpointing import DynamoDBCheckpointSaver

checkpointer = DynamoDBCheckpointSaver(
    table_name="migration-checkpoints",
    region="us-east-1",
    ttl_days=30
)

# Excerpt: `MigrationPipeline`, `config`, and `workflow` come from the surrounding project code.
# Pipeline automatically saves state after each node
pipeline = MigrationPipeline(config)
compiled_graph = workflow.compile(checkpointer=checkpointer)
```

If a migration is interrupted (whether by an error, a timeout, or a human reviewer going to lunch), it can resume from the last checkpoint. No work is lost.

This also enables "time travel" debugging. You can inspect the state at any previous checkpoint, understand what went wrong, and even replay from an earlier point with modified inputs.

## Integration with Amazon Bedrock

We chose Amazon Bedrock for LLM access because it provides:

- **Managed infrastructure**: No GPU provisioning or model hosting
- **Multiple model options**: We use Claude Sonnet, but could switch models easily
- **Enterprise security**: Data stays within AWS, with IAM-based access control
- **Cost visibility**: Clear per-token pricing for budgeting

The integration is straightforward with LangChain's Bedrock adapter:

```python
from langchain_aws import ChatBedrock

llm = ChatBedrock(
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    region_name="us-east-1"
)
```

For embedding-based pre-filtering (matching obvious columns before invoking the LLM), we use local HuggingFace sentence-transformers by default, with Bedrock Titan as a fallback. This reduces LLM calls by up to 70% for schemas with many straightforward mappings.
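The pre-filtering idea can be sketched without any model at all: given an embedding for a source column and one for each canonical column, pick the best cosine match and route on its score. Everything below is an assumption made for illustration: the toy three-dimensional vectors, the function names (`cosine`, `best_match`, `route`), and the route labels. The 0.90 and 0.60 cut-offs mirror the thresholds discussed in the deep dive.

```python
import math
from typing import Dict, List, Tuple

Vector = List[float]


def cosine(a: Vector, b: Vector) -> float:
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def best_match(source_vec: Vector, canonical: Dict[str, Vector]) -> Tuple[str, float]:
    """Return the canonical column whose embedding is closest to the source column."""
    name = max(canonical, key=lambda n: cosine(source_vec, canonical[n]))
    return name, cosine(source_vec, canonical[name])


def route(score: float, high: float = 0.90, maybe: float = 0.60) -> str:
    """Decide how much LLM work a column needs (labels are hypothetical)."""
    if score >= high:
        return "auto_accept"          # skip the LLM entirely
    if score >= maybe:
        return "llm_with_suggestion"  # LLM confirms or rejects the candidate
    return "full_llm"                 # no usable candidate


# Toy embeddings standing in for real sentence-transformer output
canonical = {
    "customer.account_number": [0.9, 0.1, 0.0],
    "bill.due_date": [0.0, 0.2, 0.9],
}
source_vec = [0.88, 0.15, 0.05]

target, score = best_match(source_vec, canonical)
print(target, route(score))
```

Only columns that land in the last two buckets cost an LLM call, which is where the reduction in calls comes from.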
## Results and Lessons Learned

After deploying the pipeline for production migrations:

- **Migration time**: Reduced from 2-3 weeks to 2-3 days (80% improvement)
- **Mapping errors**: 90% fewer mapping errors surfacing in production
- **Documentation**: 10 document types generated automatically
- **Knowledge retention**: Patterns stored and reused across migrations

### What Worked Well

**Confidence-based routing** was the key insight. Not every mapping needs human review, and not every mapping can be fully automated. The sweet spot is routing based on model uncertainty.

**Structured prompts with domain context** dramatically improved mapping quality. Generic prompts produced generic results. Prompts that included utility billing vocabulary and patterns produced expert-level mappings.

**Checkpointing everything** saved us multiple times during development. When something went wrong, we could inspect state, fix the issue, and resume without re-running expensive LLM calls.

### What We'd Do Differently

**Start with smaller models for simple tasks**. We initially used Claude Sonnet for everything, but many tasks (like generating boilerplate documentation) work fine with smaller, faster models.

**Invest in evaluation earlier**. We built the pipeline before we had good metrics for mapping quality. Retrofitting evaluation was harder than building it in from the start.

**Design for observability from day one**. LangSmith integration and structured logging were afterthoughts. They should have been foundational.
## Getting Started with LangGraph

If you're considering LangGraph for your own AI workflows, here's the minimal setup:

```python
from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END

# Define your state
class MigrationState(TypedDict):
    source_schema: dict
    mappings: list
    confidence_scores: dict
    current_node: str

# Placeholder nodes: each returns a partial state update (swap in real logic)
def discovery_node(state: MigrationState) -> dict:
    return {"current_node": "discovery"}

def mapper_node(state: MigrationState) -> dict:
    return {"mappings": [], "confidence_scores": {}, "current_node": "mapper"}

def validator_node(state: MigrationState) -> dict:
    return {"current_node": "validator"}

# Create the graph
workflow = StateGraph(MigrationState)

# Add nodes
workflow.add_node("discovery", discovery_node)
workflow.add_node("mapper", mapper_node)
workflow.add_node("validator", validator_node)

# Define edges
workflow.set_entry_point("discovery")
workflow.add_edge("discovery", "mapper")
workflow.add_edge("mapper", "validator")
workflow.add_edge("validator", END)

# Compile with checkpointing
checkpointer = MemorySaver()
app = workflow.compile(checkpointer=checkpointer)

# Execute (a thread_id is required once a checkpointer is attached)
initial_state = {"source_schema": {}, "mappings": [], "confidence_scores": {}, "current_node": ""}
result = app.invoke(initial_state, {"configurable": {"thread_id": "migration-001"}})
```

From there, you can add:

- Conditional edges for branching logic
- `interrupt()` calls for human-in-the-loop
- Persistent checkpointers (DynamoDB, PostgreSQL) for production
- Streaming for real-time progress updates

## Conclusion

Database migration is a problem that's been around for decades, but the tools to solve it have fundamentally changed. LLMs provide the semantic understanding that rule-based systems lack. LangGraph provides the orchestration that raw LLM calls lack. And human-in-the-loop patterns provide the reliability that fully automated systems lack.

The combination of AI agents for intelligence, deterministic nodes for validation, humans for judgment, and durable execution for reliability creates something more powerful than any single approach.

For organizations facing similar challenges, this means faster migrations, fewer errors, and institutional knowledge that compounds over time. It's a pattern that applies wherever you need to combine AI capabilities with human oversight and complex workflow logic.
The code is running in production. And every correction makes the next migration a little bit smarter.

---

*Built with LangGraph, Amazon Bedrock (Claude Sonnet), and AWS infrastructure.*

# Deep Dive

The sections below go deeper; read on if you love the tech. We'll walk through each of the 8 pipeline nodes, explain the technology decisions we made, and show how state flows through the system.

## The Pipeline at a Glance

Before diving into each node, here's the complete flow:

```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Discovery  │────▶│   Mapper    │────▶│  Evaluator  │
│  (Parsing)  │     │ (AI Agent)  │     │(Validation) │
└─────────────┘     └─────────────┘     └─────────────┘
                                               │
                    ┌──────────────────────────┼──────────────────────────┐
                    │                          │                          │
                    ▼                          ▼                          │
             ┌─────────────┐            ┌─────────────┐                   │
             │Human Review │            │SQL Generator│◀──────────────────┘
             │ (Interrupt) │            │ (AI Agent)  │
             └─────────────┘            └─────────────┘
                    │                          │
                    ▼                          ▼
             ┌─────────────┐            ┌─────────────┐
             │  Rectifier  │            │ Documentor  │
             │ (AI Agent)  │            │ (AI Agent)  │
             └─────────────┘            └─────────────┘
                    │                          │
                    │                          ▼
                    │                   ┌─────────────┐
                    └──────────────────▶│ Validation  │
                                        │ (Final QA)  │
                                        └─────────────┘
```

The pipeline has 8 nodes split into two categories:

**AI-Powered Nodes (4):** Mapper, Rectifier, SQL Generator, Documentor

**Deterministic Nodes (4):** Discovery, Evaluator, Human Review, Validation

This hybrid approach is intentional. AI excels at semantic understanding and generation, but deterministic logic is better for parsing, validation, and orchestration. Let's examine each node.

---

## State Management: The Foundation

Before exploring the nodes, we need to understand how state flows through the pipeline. Managing state across an 8-node pipeline with both AI and deterministic components is complex: you need to track inputs, outputs, confidence scores, and human feedback across potentially long-running executions. This is where LangGraph's [persistence system](https://langchain-ai.github.io/langgraph/concepts/persistence/) becomes essential.
We use thread-scoped state management where each migration run maintains its own isolated state throughout the entire execution. The `TypedDict` defines the complete state schema, giving us type safety and clear contracts between nodes:

```python
from typing import Any, Dict, List, Optional, TypedDict

class MigrationState(TypedDict):
    # Input
    source_file_path: str
    source_format: str  # 'csv', 'sql', 'excel'

    # Discovery outputs
    source_schema: Dict[str, Any]
    data_quality_metrics: Dict[str, float]
    relationships: List[Dict[str, Any]]

    # Mapping outputs
    mappings: List[Dict[str, Any]]
    confidence_scores: Dict[str, float]
    mapping_rationale: Dict[str, str]

    # Rectification outputs
    corrected_mappings: List[Dict[str, Any]]
    correction_log: List[Dict[str, Any]]

    # SQL Generation outputs
    transformation_scripts: List[str]
    validation_queries: List[str]

    # Documentation outputs
    documentation: Dict[str, str]  # 10 document types

    # Evaluation outputs
    quality_metrics: Dict[str, float]
    completeness_score: float

    # Human review
    human_feedback: Optional[Dict[str, Any]]
    review_required: bool

    # System state
    current_node: str
    execution_id: str
    thread_id: str
    error_log: List[Dict[str, Any]]
    learning_patterns: List[Dict[str, Any]]
```

Each node reads from this state, performs its work, and returns a dictionary of updates. LangGraph merges these updates into the state automatically and persists them at each checkpoint.

Thread-scoped persistence means no cross-contamination between concurrent migrations, but full continuity within a single execution. The immutable update pattern makes debugging straightforward: you can inspect the state at any checkpoint to understand exactly what happened.

---

## Node 1: Discovery (Deterministic)

**Purpose:** Parse source files and extract normalized schema metadata.

**Why Deterministic:** Schema parsing is a well-defined problem. We don't need AI to read CSV headers or parse SQL DDL; we need reliable, fast parsing logic.
### Technology Decision: DuckDB for Schema Inference

For CSV files, we use DuckDB's `read_csv_auto` function with schema inference:

```python
from pathlib import Path

import duckdb

def _parse_data_csv_with_duckdb(self, file_path: str) -> DatabaseSchema:
    conn = duckdb.connect()
    table_name = Path(file_path).stem

    # DuckDB infers types from sample data
    conn.execute(f"""
        CREATE TABLE "{table_name}" AS
        SELECT * FROM read_csv_auto('{file_path}', SAMPLE_SIZE=10000)
    """)

    # Extract schema information
    schema_info = conn.execute(f'DESCRIBE "{table_name}"').fetchall()
    row_count = conn.execute(f'SELECT COUNT(*) FROM "{table_name}"').fetchone()[0]
    # (excerpt: the method goes on to build and return a DatabaseSchema)
```

DuckDB was chosen over pandas for several reasons:

- **Type inference is more accurate** with larger sample sizes
- **Memory efficient** for large files (doesn't load everything into memory)
- **SQL interface** makes it easy to calculate data quality metrics

### Data Quality Metrics

Discovery doesn't just parse; it calculates quality metrics that inform downstream decisions:

```python
# Calculate null percentage per column
null_count = conn.execute(
    f'SELECT COUNT(*) FROM "{table_name}" WHERE "{col_name}" IS NULL'
).fetchone()[0]
null_percentage = (null_count / row_count * 100) if row_count > 0 else 0.0

# Calculate unique count for cardinality analysis
unique_count = conn.execute(
    f'SELECT COUNT(DISTINCT "{col_name}") FROM "{table_name}"'
).fetchone()[0]
```

These metrics help the Mapper node make better decisions. A column with 90% nulls is probably optional. A column with unique values for every row is likely a primary key.

### Multi-Format Support

Discovery handles three input formats:

1. **CSV:** DuckDB inference for data files, or metadata parsing for schema exports
2. **SQL DDL:** `sqlparse` library for CREATE TABLE statement extraction
3. **Excel:** `openpyxl` for structured schema workbooks

The format detection is automatic based on file extension and content inspection.

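As a rough illustration of extension-then-content detection, here is a minimal sketch. The function name `detect_source_format`, the extension table, and the content heuristics are all hypothetical; the real Discovery node may inspect files quite differently.

```python
from pathlib import Path

# Hypothetical extension table; the formats match the three listed above
EXTENSION_FORMATS = {
    ".csv": "csv",
    ".sql": "sql",
    ".ddl": "sql",
    ".xlsx": "excel",
    ".xlsm": "excel",
}


def detect_source_format(filename: str, head: str = "") -> str:
    """Return 'csv', 'sql', or 'excel' from the extension, else from a text sample.

    `head` is the first chunk of the file decoded as text (may be empty).
    """
    suffix = Path(filename).suffix.lower()
    if suffix in EXTENSION_FORMATS:
        return EXTENSION_FORMATS[suffix]

    # Extension was not conclusive: fall back to inspecting the content
    sample = head.strip()
    if "CREATE TABLE" in sample.upper():
        return "sql"
    first_line = sample.splitlines()[0] if sample else ""
    if "," in first_line:
        return "csv"
    raise ValueError(f"Cannot determine source format for {filename!r}")


print(detect_source_format("billing_export.CSV"))
print(detect_source_format("legacy_schema.txt", "create table customers (id int);"))
```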

---

## Node 2: Mapper (AI-Powered)

**Purpose:** Generate semantic mappings from source schema to canonical schema with confidence scores.

**Why AI:** This is where LLMs shine. Understanding that `CUST_ACCT_NBR` means the same thing as `customer_account_id` requires semantic reasoning that rule-based systems can't match.

### Local Embeddings: The Secret Weapon

Before the LLM even sees a column, we run it through a local embedding-based pre-filter. This is one of the most impactful optimizations in the entire pipeline.

**Why Local?** We use HuggingFace's `sentence-transformers/all-MiniLM-L6-v2` model running entirely in-memory. No API calls, no network latency, no per-token costs. The model is ~90MB and processes 1,300+ columns per second on CPU.

```python
from langchain_huggingface import HuggingFaceEmbeddings

# Initialize once, reuse everywhere
embeddings_model = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
    cache_folder="./models",
    model_kwargs={'device': 'cpu'},
    encode_kwargs={'normalize_embeddings': True}  # For cosine similarity
)
```

**How It Works:** We pre-compute embeddings for all canonical schema columns, then compare each source column against them using cosine similarity:

```python
def _build_column_text(self, table: str, column: str, data_type: str) -> str:
    """Build text representation for embedding."""
    return f"table: {table} column: {column} type: {data_type}"

# Similarity thresholds drive routing decisions
HIGH_CONFIDENCE = 0.90  # Auto-accept, skip LLM entirely
MAYBE_THRESHOLD = 0.60  # Send to LLM with suggestion
# Below 0.60: Full LLM processing required
```

**The Results:** For a schema with 12,515 columns:

- Processing time: ~47 seconds (vs. 45+ minutes with Bedrock embeddings)
- Cost: $0 (vs. potentially $100+ for cloud embeddings)
- LLM call reduction: Up to 70% for schemas with obvious mappings

**Intelligent Prioritization:** Not all columns are equal.
We prioritize based on utility billing domain knowledge:

| Priority | Score | Examples |
|----------|-------|----------|
| Primary Keys | 10 | `customer_id`, `account_pk` |
| Foreign Keys | 9 | `billing_account_fk` |
| Utility Keywords | 8 | `meter_reading`, `consumption` |
| Standard Columns | 5 | `address`, `phone` |
| Generic Columns | 2 | `created_at`, `updated_by` |

**Fallback to Bedrock:** If local embeddings fail (rare), we fall back to Amazon Bedrock Titan embeddings. But in practice, local embeddings are faster, cheaper, and work offline, making them the clear default choice.

### The Mapping Prompt

The Mapper uses a carefully crafted prompt with domain context:

```text
UTILITY BILLING DOMAIN PATTERNS:
- Customer entities: customer_id, account_number, service_address
- Billing cycles: bill_date, due_date, billing_period, cycle_code
- Meter readings: meter_number, reading_date, consumption, demand
- Service types: electric, gas, water, sewer, waste
- Rate structures: rate_code, tier_level, time_of_use
```

Including domain vocabulary dramatically improves mapping accuracy. Generic prompts produce generic results.

### Confidence Scoring

Each mapping includes a calibrated confidence score based on multiple factors:

| Factor | Weight | Description |
|--------|--------|-------------|
| Semantic Similarity | 40% | Column name and purpose alignment |
| Data Type Compatibility | 25% | Direct match vs. conversion required |
| Domain Context | 20% | Utility billing pattern recognition |
| Historical Success | 15% | Similar pattern performance |

The confidence score directly drives routing decisions:

- **≥ 0.80:** Auto-approve, continue to SQL generation
- **0.50 - 0.79:** Route to human review
- **< 0.50:** Flag as likely incorrect

### Learning Database Integration

Before generating new mappings, the Mapper retrieves historical patterns:

```python
def _retrieve_historical_patterns(self, source_schema: DatabaseSchema) -> List[LearningPattern]:
    relevant_patterns = []

    # Get patterns with similar table/column names
    for table in source_schema.tables:
        table_patterns = self.learning_db.get_similar_patterns(
            source_pattern=table.name,
            pattern_type=PatternType.SCHEMA_MAPPING,
            similarity_threshold=0.6,
            limit=10
        )
        relevant_patterns.extend(table_patterns)

    return relevant_patterns
```

These patterns are included in the LLM context, allowing the model to learn from previous migrations. Over time, this creates a flywheel effect: each migration makes the next one more accurate.

### Output Structure

The Mapper returns structured JSON that downstream nodes can process:

```python
{
    "mappings": [
        {
            "source_table": "customers",
            "source_column": "CUST_ACCT_NBR",
            "target_table": "customer",
            "target_column": "account_number",
            "transformation_type": "direct",
            "transformation_logic": "Direct mapping - semantic match"
        }
    ],
    "confidence_scores": {
        "customers.CUST_ACCT_NBR": 0.92
    },
    "rationale": {
        "customers.CUST_ACCT_NBR": "Strong semantic match for customer account identifier..."
    }
}
```

---

## Node 3: Evaluator (Deterministic)

**Purpose:** Validate mappings against business rules and calculate quality metrics.

**Why Deterministic:** Validation rules are explicit. "All required fields must be mapped" doesn't need AI interpretation; it needs a checklist.

### Validation Checks

The Evaluator runs multiple validation passes:

1. **Completeness Check:** Are all source columns mapped?
2. **Type Compatibility:** Can source types convert to target types?
3. **Constraint Validation:** Are NOT NULL and foreign key constraints satisfied?
4. **Coverage Analysis:** What percentage of the canonical schema is populated?

```python
def _validate_source_schema_completeness(self, mappings, source_schema_dict):
    # Count all source elements
    # (assumes the schema dict exposes a top-level 'tables' list; the layout is not shown in this excerpt)
    source_tables = source_schema_dict.get('tables', [])
    all_source_elements = set()
    for table in source_tables:
        for column in table.get('columns', []):
            element_key = f"{table['name']}.{column['name']}"
            all_source_elements.add(element_key)

    # Source elements that have at least one mapping
    mapped_elements = {
        f"{m['source_table']}.{m['source_column']}" for m in mappings
    } & all_source_elements

    # Calculate coverage, guarding against an empty schema
    total_count = len(all_source_elements)
    mapped_count = len(mapped_elements)
    mapping_coverage = (mapped_count / total_count) * 100.0 if total_count else 0.0
    return mapping_coverage
```

### Quality Metrics

The Evaluator calculates metrics that inform the routing decision:

- **Completeness Score:** Percentage of source elements mapped
- **Average Confidence:** Mean confidence across all mappings
- **Complex Mapping Ratio:** Percentage of mappings requiring transformation

If any metric falls below threshold, the pipeline routes to Human Review.

---

## Node 4: Human Review (Interrupt-Based)

**Purpose:** Pause execution for human validation of uncertain mappings.

**Why Interrupt:** Database migrations are high-stakes operations: you can't just auto-approve uncertain mappings and hope for the best. But you also can't block a pipeline waiting for a human who might be in a meeting, on vacation, or reviewing other migrations.

This is where LangGraph's [interrupt capability](https://langchain-ai.github.io/langgraph/concepts/human_in_the_loop/) becomes essential. The `interrupt()` function pauses execution, saves state, and waits for human input, even if that takes days. The pipeline doesn't crash, doesn't lose progress, and resumes exactly where it left off.
### The Interrupt Pattern

```python
from typing import Any, Dict

from langgraph.types import interrupt

class HumanReviewNode(BaseNode):
    def execute(self, state: MigrationState) -> Dict[str, Any]:
        # Identify items requiring review
        review_items = self._identify_review_items(
            state['mappings'],
            state['confidence_scores'],
            state.get('mapping_rationale', {})
        )

        if review_items and state.get('review_mode') == 'ui':
            # This pauses execution and returns to caller
            human_feedback = interrupt({
                "type": "human_review_required",
                "items": review_items,
                "summary": f"{len(review_items)} mappings need review"
            })

            # This line only executes after human provides feedback
            return self._apply_corrections(state, human_feedback)

        # Nothing to review (or not in UI mode): no state updates
        return {}
```

When [`interrupt()`](https://langchain-ai.github.io/langgraph/how-tos/human_in_the_loop/add-human-in-the-loop/) is called:

1. Graph state is automatically checkpointed
2. Execution pauses and returns `__interrupt__` to the caller
3. The caller presents a review UI to the human
4. Human provides feedback (approve, reject, modify)
5. Caller resumes with `Command(resume=feedback)`
6. `interrupt()` returns the feedback value and execution continues

### Review Item Selection

Not every mapping needs review. The node filters based on confidence:

```python
def _identify_review_items(self, mappings, confidence_scores, rationale):
    review_items = []

    for mapping in mappings:
        source_key = f"{mapping['source_table']}.{mapping['source_column']}"
        confidence = confidence_scores.get(source_key, 0.5)

        if confidence < self.confidence_threshold:
            review_items.append({
                'mapping': mapping,
                'confidence_score': confidence,
                'rationale': rationale.get(source_key, ''),
                'suggested_action': 'review'
            })

    # Ascending sort: the least certain mappings come first
    return sorted(review_items, key=lambda x: x['confidence_score'])
```

The lowest-confidence items are presented first, making review efficient.

---

## Node 5: Rectifier (AI-Powered)

**Purpose:** Apply human corrections intelligently across similar patterns.

**Why AI:** When a human corrects one mapping, the Rectifier uses AI to identify and fix similar patterns throughout the schema. This amplifies human effort.

### Correction Strategies

The Rectifier tries multiple correction approaches in order:

```python
def _correct_single_mapping(self, mapping, candidate, execution_id):
    correction_strategies = [
        self._try_rule_based_correction,    # Fast, deterministic fixes
        self._try_ai_powered_correction,    # LLM-based reasoning
        self._try_pattern_based_correction  # Historical pattern matching
    ]

    for strategy in correction_strategies:
        result = strategy(mapping, candidate, execution_id)
        if result['success']:
            return result

    # No strategy succeeded: report failure instead of returning None
    return {'success': False, 'corrected_mapping': mapping, 'method': None}
```

### Rule-Based Corrections

Common fixes are handled without AI:

```python
def _try_rule_based_correction(self, mapping, candidate, execution_id):
    corrected_mapping = dict(mapping)  # copy, so the original survives for the log
    corrections_applied = []
    target_table = mapping.get('target_table', '')

    # Fix common table name mappings
    table_corrections = {
        'customer': 'customers',
        'account': 'accounts',
        'bill': 'bills',
        'payment': 'payments'
    }

    # `canonical_schema` is the target schema, defined outside this excerpt
    for wrong, correct in table_corrections.items():
        # Skip names that are already correct ('customer' is a substring of 'customers')
        if (wrong in target_table.lower() and target_table.lower() != correct
                and correct in canonical_schema.tables):
            corrected_mapping['target_table'] = correct
            corrections_applied.append(f"Corrected table name: {target_table} -> {correct}")

    return {
        'success': bool(corrections_applied),
        'corrected_mapping': corrected_mapping,
        'method': 'rule_based'
    }
```

### Learning from Corrections

Every correction is stored for future use:

```python
from datetime import datetime, timezone

correction_log.append({
    'source_key': source_key,
    'original_mapping': mapping,
    'corrected_mapping': correction_result['corrected_mapping'],
    'correction_method': correction_result['method'],
    'confidence_before': candidate['confidence'],
    'confidence_after': correction_result.get('confidence_after'),
    'timestamp': datetime.now(timezone.utc).isoformat()
})
```

This log feeds back into the Learning Database, improving future migrations.

---

## Node 6: SQL Generator (AI-Powered)

**Purpose:** Transform validated mappings into executable DuckDB SQL scripts.

**Why AI:** SQL generation requires understanding transformation logic, handling edge cases, and producing syntactically correct code. LLMs excel at this.
### Mapping Analysis

Before generating SQL, the node analyzes each mapping's complexity:

```python
def _analyze_mappings(self, mappings, source_schema):
    analyses = []  # one entry per mapping (return shape assumed for this excerpt)
    for mapping in mappings:
        analysis = {
            'complexity': 'simple',  # simple, medium, complex
            'transformation_requirements': [],
            'data_type_conversion': None,
            'validation_checks': []
        }

        # Determine complexity based on transformation type
        transformation_type = mapping.get('transformation_type', 'direct')
        if transformation_type in ['split', 'merge', 'calculate']:
            analysis['complexity'] = 'complex'
        elif transformation_type == 'convert':
            analysis['complexity'] = 'medium'

        analyses.append(analysis)
    return analyses
```

### Data Type Conversion

The node handles type conversions automatically:

```python
def _get_conversion_function(self, source_type: str, target_type: str) -> str:
    conversion_map = {
        ('string', 'integer'): 'CAST({} AS INTEGER)',
        ('string', 'timestamp'): 'CAST({} AS TIMESTAMP)',
        ('string', 'boolean'): "CASE WHEN UPPER({}) IN ('TRUE', '1', 'YES') THEN TRUE ELSE FALSE END",
        ('timestamp', 'date'): 'CAST({} AS DATE)'
    }
    # Unknown pairs fall back to a plain CAST to the target type
    return conversion_map.get((source_type, target_type), f'CAST({{}} AS {target_type.upper()})')
```

### Generated Output

The SQL Generator produces two types of scripts:

1. **Transformation Scripts:** INSERT INTO ... SELECT statements
2. **Validation Queries:** COUNT and comparison queries to verify data integrity

```sql
-- Transformation script for customers
INSERT INTO customers (
    account_number,
    customer_name,
    service_address
)
SELECT
    source.CUST_ACCT_NBR,
    source.CUST_NAME,
    source.SERV_ADDR
FROM source_customers source;

-- Validation query
SELECT COUNT(*) as source_count FROM source_customers;
SELECT COUNT(*) as target_count FROM customers;
```

---

## Node 7: Documentor (AI-Powered)

**Purpose:** Generate comprehensive migration documentation automatically.

**Why AI:** Documentation requires synthesizing information from multiple sources and presenting it clearly. This is a natural language generation task.

### 10 Document Types

The Documentor generates a complete documentation package:

1. **Migration Overview:** Executive summary and project scope
2. **Source Schema Analysis:** Detailed source database documentation
3. **Mapping Specification:** Complete mapping rules and rationale
4. **Transformation Guide:** Step-by-step transformation procedures
5. **Data Quality Report:** Quality metrics and issue analysis
6. **Validation Checklist:** Pre/during/post-migration validation steps
7. **Deployment Instructions:** Environment setup and execution guide
8. **Rollback Procedures:** Emergency recovery procedures
9. **Troubleshooting Guide:** Common issues and resolutions
10. **Technical Reference:** API and configuration documentation

### Parallel Generation

For performance, documents are generated in parallel using async:

```python
import asyncio

async def generate_all_documents(self, context, execution_id):
    tasks = []
    for doc_type in self.DOCUMENT_TYPES:
        task = asyncio.create_task(
            self._generate_document_async(doc_type, context, execution_id)
        )
        tasks.append(task)

    # return_exceptions=True: one failed document does not cancel the others
    results = await asyncio.gather(*tasks, return_exceptions=True)
```

### Mermaid Diagrams

The Documentor also generates visual diagrams:

```python
def _create_source_schema_diagram(self, source_schema: DatabaseSchema) -> str:
    mermaid = "```mermaid\nerDiagram\n"

    for table in source_schema.tables:
        mermaid += f"    {table.name} {{\n"
        for column in table.columns:
            key_indicator = " PK" if column.primary_key else ""
            mermaid += f"        {column.data_type} {column.name}{key_indicator}\n"
        mermaid += "    }\n"

    mermaid += "```"
    return mermaid
```

---

## Node 8: Validation (Deterministic)

**Purpose:** Final quality assurance before output.

**Why Deterministic:** Final validation is a checklist, not a judgment call. Either the SQL is syntactically valid or it isn't.
### SQL Syntax Validation

```python
from typing import Any, Dict

def _validate_sql_syntax(self, sql_script: str) -> Dict[str, Any]:
    # Lightweight heuristic checks; they do not parse the SQL
    validation_result = {'valid': True, 'errors': []}
    sql_upper = sql_script.upper()

    # Check for required SQL keywords
    required_keywords = ['INSERT INTO', 'SELECT', 'UPDATE', 'CREATE']
    has_required_keyword = any(keyword in sql_upper for keyword in required_keywords)
    if not has_required_keyword:
        validation_result['errors'].append("No recognized SQL statement found")

    # Check for balanced parentheses
    if sql_script.count('(') != sql_script.count(')'):
        validation_result['errors'].append("Unbalanced parentheses")

    # Check for dangerous patterns
    dangerous_patterns = ['DROP TABLE', 'DROP DATABASE', 'TRUNCATE']
    for pattern in dangerous_patterns:
        if pattern in sql_upper:
            validation_result['errors'].append(f"Dangerous pattern: {pattern}")

    # The script is valid only if no check recorded an error
    validation_result['valid'] = not validation_result['errors']
    return validation_result
```

### Documentation Completeness

```python
def _validate_documentation_completeness(self, documentation):
    required_documents = [
        'migration_overview', 'source_schema_analysis', 'mapping_specification',
        'transformation_guide', 'data_quality_report', 'validation_checklist',
        'deployment_instructions', 'rollback_procedures', 'troubleshooting_guide',
        'technical_reference'
    ]

    missing = [doc for doc in required_documents if doc not in documentation]
    # Percentage (0-100) of required documents that are present
    completeness_score = (len(required_documents) - len(missing)) / len(required_documents) * 100
    return {'missing': missing, 'completeness_score': completeness_score}
```

### Overall Status

The Validation node determines the final pipeline status:

- **PASS:** All validations successful
- **WARNING:** Minor issues detected but output is usable
- **FAIL:** Critical issues that block output

---

## Conditional Routing: The Glue

Traditional workflow engines force you into rigid, predefined paths. But AI pipelines are inherently unpredictable: a mapping might be perfect or terrible, and you won't know until the LLM responds.

This is where LangGraph's [conditional edges](https://langchain-ai.github.io/langgraph/concepts/low_level/#conditional-edges) become invaluable. They let the data drive the flow, routing based on runtime state rather than static configuration.
The key insight: **routing decisions should be based on runtime state, not static configuration**. Our pipeline routes based on confidence scores, completeness metrics, and quality thresholds that are only known after each node executes.

Here's how we wire up the graph using LangGraph's [`StateGraph`](https://langchain-ai.github.io/langgraph/reference/graphs/#langgraph.graph.StateGraph):

```python
from langgraph.graph import StateGraph

def _build_graph(self) -> StateGraph:
    workflow = StateGraph(MigrationState)

    # Add all nodes
    workflow.add_node("discovery", self._discovery_wrapper)
    workflow.add_node("mapper", self._mapper_wrapper)
    # ... other nodes

    # Linear edges
    workflow.add_edge("discovery", "mapper")
    workflow.add_edge("mapper", "evaluator")

    # Conditional edge from evaluator: the routing function's return value
    # is looked up in this dict to pick the next node
    workflow.add_conditional_edges(
        "evaluator",
        self._should_trigger_human_review,
        {
            "human_review": "human_review",
            "sql_generator": "sql_generator"
        }
    )

    return workflow
```

The routing function examines state and returns the next node. Notice how it checks multiple conditions; this is where business logic meets AI uncertainty:

```python
def _should_trigger_human_review(self, state: MigrationState) -> str:
    # Route to humans when average mapping confidence is below the configured threshold
    confidence_scores = state.get("confidence_scores", {})
    if confidence_scores:
        avg_confidence = sum(confidence_scores.values()) / len(confidence_scores)
        if avg_confidence < self.config.confidence_threshold:
            return "human_review"

    # Also route to humans when too much of the schema is unmapped
    completeness_score = state.get("completeness_score", 1.0)
    if completeness_score < 0.9:
        return "human_review"

    return "sql_generator"
```

Why not hardcode the confidence threshold? Because different migrations have different risk tolerances. A small internal migration might accept 0.70 confidence. A large customer migration might require 0.90. The threshold is configurable, but the routing logic is consistent.

This pattern also enables **graceful degradation**. If the LLM produces low-confidence mappings, we don't fail; we route to humans. If humans aren't available, we can queue for later.
The pipeline adapts to conditions rather than breaking.

---

## Performance Monitoring

You can't improve what you don't measure. But more importantly: **AI pipelines degrade silently**. A prompt that worked last month might produce worse results today due to model updates, data drift, or subtle changes in input patterns. Without monitoring, you won't notice until customers complain.

We wrap every node with performance tracking that captures both speed and quality:

```python
import time
from functools import wraps
from typing import Any, Dict

def create_performance_wrapper(node_name: str, node_func):
    @wraps(node_func)
    def wrapper(state: MigrationState) -> Dict[str, Any]:
        node_start_time = time.perf_counter()
        try:
            result = node_func(state)
        except Exception:
            # Record the failure so success rates are meaningful, then re-raise
            observability_manager.track_node_execution(
                node_name=node_name,
                execution_time=time.perf_counter() - node_start_time,
                success=False,
                confidence_score=None
            )
            raise
        execution_time = time.perf_counter() - node_start_time

        # Track performance
        observability_manager.track_node_execution(
            node_name=node_name,
            execution_time=execution_time,
            success=True,
            confidence_score=result.get("confidence_score")
        )

        # Check for regression
        regression_detector.check_for_regression(
            f"node_duration_{node_name}",
            execution_time
        )

        return result
    return wrapper
```

The wrapper pattern keeps monitoring concerns separate from business logic. Nodes don't know they're being monitored; they just do their job. This makes testing easier and keeps the codebase clean.

What we track and why:

- **Per-node execution times:** Identifies bottlenecks. If the Mapper suddenly takes 3x longer, something changed: maybe the schema is larger, maybe the LLM is slower, maybe we have a bug.
- **Success/failure rates:** Catches systematic issues. A node that fails 10% of the time needs investigation.
- **Confidence score trends:** The canary in the coal mine. Declining confidence often precedes quality issues.
- **Automatic regression detection:** Alerts when metrics deviate from baselines. We don't wait for users to report problems.

The regression detector deserves special mention. It maintains rolling baselines and flags when current performance deviates significantly.
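The rolling-baseline idea can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the pipeline's detector: the class name `RollingRegressionDetector` and the `window`, `min_samples` and `max_ratio` parameters are hypothetical. Only the `check_for_regression(metric, value)` call shape is taken from the wrapper above.

```python
from collections import defaultdict, deque
from statistics import mean
from typing import Deque, Dict


class RollingRegressionDetector:
    """Flags a metric when it exceeds its rolling-mean baseline by a set ratio."""

    def __init__(self, window: int = 20, min_samples: int = 5, max_ratio: float = 2.0):
        self.min_samples = min_samples  # samples needed before a baseline is trusted
        self.max_ratio = max_ratio      # e.g. 2.0 flags values above 2x the baseline
        self._history: Dict[str, Deque[float]] = defaultdict(
            lambda: deque(maxlen=window)  # keep only the most recent `window` samples
        )

    def check_for_regression(self, metric: str, value: float) -> bool:
        history = self._history[metric]
        regressed = False
        if len(history) >= self.min_samples:
            baseline = mean(history)
            regressed = baseline > 0 and value > baseline * self.max_ratio
        # Only healthy samples update the baseline, so an ongoing
        # regression cannot quietly become the new normal.
        if not regressed:
            history.append(value)
        return regressed


detector = RollingRegressionDetector(window=10, min_samples=3, max_ratio=2.0)
for duration in [1.0, 1.1, 0.9, 1.0]:
    detector.check_for_regression("node_duration_mapper", duration)

print(detector.check_for_regression("node_duration_mapper", 1.2))  # prints False
print(detector.check_for_regression("node_duration_mapper", 3.5))  # prints True
```

The ratio is the knob that separates tolerable variation from a real problem; a per-node value is a natural extension, since some nodes are noisier than others.
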
A 20% slowdown in the SQL Generator might be acceptable during a large migration, but a 200% slowdown suggests something is wrong.

---

## Checkpointing: Surviving Failures

Real-world migrations fail. Networks time out, services restart, humans go home for the day. Without durable state, you'd lose hours of LLM processing and have to start over.

LangGraph's [persistence system](https://langchain-ai.github.io/langgraph/concepts/persistence/) solves this by automatically saving state after each node completes. Each run is identified by a `thread_id` passed in the run configuration, which is what lets a later invocation resume the same migration. For our pipeline, this means a migration can survive infrastructure failures, service deployments, and multi-day human review cycles without losing progress:

```python
from langgraph.checkpoint.memory import MemorySaver

# For development: in-memory, lost when the process exits
checkpointer = MemorySaver()

# For production (DynamoDB): use this instead of MemorySaver.
# DynamoDBCheckpointSaver is not part of core LangGraph; its import is not shown.
checkpointer = DynamoDBCheckpointSaver(
    table_name="migration-checkpoints",
    region="us-east-1",
    ttl_days=30
)

# Compile graph with checkpointing
graph = workflow.compile(checkpointer=checkpointer)
```

Benefits:

- **Resume after failure:** Pick up exactly where you left off, with no re-running of expensive LLM calls
- **[Time travel](https://langchain-ai.github.io/langgraph/concepts/time-travel/) debugging:** When something goes wrong, you can inspect state at any previous checkpoint and even replay from any point. This is invaluable for debugging AI pipelines where issues might only manifest several nodes downstream
- **Human review persistence:** Reviews can take days without losing progress; the pipeline simply waits

---

## Bedrock Integration: Resilient LLM Calls

Production LLM pipelines need to handle failures gracefully. Bedrock APIs can time out, hit rate limits, or experience transient errors. Our `BedrockLLMClient` wraps all LLM calls with comprehensive retry logic.
### Exponential Backoff with Fallback Models

Every LLM invocation uses exponential backoff with configurable parameters:

```python
class BedrockLLMClient:
    def __init__(
        self,
        model_id: str = "us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        region: str = "us-east-1",
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0
    ):
        self.model_id = model_id
        self.region = region
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay

        # Fallback models in order of preference
        self.fallback_models = [
            "us.anthropic.claude-3-haiku-20240307-v1:0",
            "anthropic.claude-3-sonnet-20240229-v1:0"
        ]
```

The retry logic handles different error types intelligently:

```python
import time
from botocore.exceptions import BotoCoreError, ClientError

def invoke_with_retry(self, messages, max_retries=None, **kwargs):
    max_retries = self.max_retries if max_retries is None else max_retries
    last_error = None

    for attempt in range(max_retries + 1):
        try:
            return self.llm.invoke(messages, **kwargs)
        except (ClientError, BotoCoreError) as e:
            last_error = e
            # Only ClientError carries .response, so read it defensively
            response = getattr(e, 'response', None) or {}
            error_code = response.get('Error', {}).get('Code', 'Unknown')

            # Don't retry on validation or access errors
            if error_code in ['ValidationException', 'AccessDeniedException']:
                raise

            # Try fallback model on model-specific errors
            if error_code in ['ModelNotReadyException', 'ModelTimeoutException']:
                self._switch_to_fallback_model()
                continue

            # Exponential backoff for transient errors (no sleep after the final attempt)
            if attempt < max_retries:
                delay = min(self.base_delay * (2 ** attempt), self.max_delay)
                time.sleep(delay)

    # All attempts failed: surface the last error instead of returning None
    raise last_error
```

The backoff formula `min(base_delay * 2^attempt, max_delay)`, with `attempt` counted from zero, means:

- After the 1st failed attempt: 1 second delay
- After the 2nd failed attempt: 2 second delay
- After the 3rd failed attempt: 4 second delay
- Capped at 60 seconds maximum

### Cost Tracking

Every LLM call tracks token usage and calculates costs in real time:

```python
from dataclasses import dataclass

@dataclass
class CostTracker:
    # Claude Sonnet pricing (USD per 1K tokens)
    SONNET_INPUT_COST = 0.003
    SONNET_OUTPUT_COST = 0.015

    # Claude 3 Haiku pricing (USD per 1K tokens)
    HAIKU_INPUT_COST = 0.00025
    HAIKU_OUTPUT_COST = 0.00125

    def calculate_cost(self, model_id, input_tokens, output_tokens):
        if "sonnet" in model_id.lower():
            input_cost = (input_tokens / 1000) * self.SONNET_INPUT_COST
            output_cost = (output_tokens / 1000) * self.SONNET_OUTPUT_COST
        # ... model-specific pricing
```

This lets us monitor costs per migration and per node, catching runaway token usage before it becomes expensive.

---

## Prompt Management: Versioning and Performance Tracking

Prompts are code. They need versioning, testing, and performance tracking just like any other code artifact. Our `PromptManager` treats prompts as first-class citizens.

### Versioned Prompt Storage

Each prompt is stored as a JSON file with full version history:

```json
{
  "prompt_name": "schema_mapping",
  "versions": [
    {
      "version": "v1.0",
      "template": "Please create semantic mappings for...",
      "system_prompt": "You are an expert database schema mapping specialist...",
      "variables": ["source_context", "canonical_context", "historical_context"],
      "description": "Initial schema mapping prompt",
      "created_at": "2025-12-11T04:27:31.581920+00:00",
      "performance_metrics": {},
      "usage_count": 0,
      "success_rate": 0.0,
      "hash": "d8bd62b5ee76fc39"
    },
    {
      "version": "v1.1",
      "template": "Analyze and map the following source database schema...",
      "system_prompt": "You are an expert database schema mapping specialist for utility billing...",
      "performance_metrics": {
        "avg_confidence": 0.85,
        "avg_execution_time": 12.3
      },
      "usage_count": 247,
      "success_rate": 0.89
    }
  ]
}
```

The hash is calculated from the prompt content, making it easy to detect changes:

```python
import hashlib

def calculate_hash(self) -> str:
    # First 16 hex characters of the SHA-256 of system prompt + template
    content = f"{self.system_prompt}|{self.template}"
    return hashlib.sha256(content.encode()).hexdigest()[:16]
```

### Performance Tracking

Every prompt invocation updates performance metrics. The success rate uses an exponential moving average, while average confidence is a running mean over all uses:

```python
def update_prompt_performance(self, prompt_name, version, success, confidence_score, execution_time):
    prompt_version = self.get_prompt_version(prompt_name, version)

    # Update usage count
    prompt_version.usage_count += 1

    # Update success rate using exponential moving average
    alpha = 0.1  # Learning rate
    new_success = 1.0 if success else 0.0
    prompt_version.success_rate = (1 - alpha) * prompt_version.success_rate + alpha * new_success

    # Track average confidence (running mean); the avg_execution_time update is not shown
    if confidence_score is not None:
        current_avg = prompt_version.performance_metrics.get('avg_confidence', confidence_score)
        prompt_version.performance_metrics['avg_confidence'] = (
            (current_avg * (prompt_version.usage_count - 1) + confidence_score)
            / prompt_version.usage_count
        )
```

Because `success_rate` starts at 0.0 and `alpha` is 0.1, the moving average understates a new version's success rate during its first calls: five consecutive successes yield only about 0.41.

### Automatic Best Version Selection

When retrieving a prompt, the system can automatically select the best-performing version:

```python
def get_best_prompt_version(self, prompt_name) -> PromptVersion:
    versions = self.list_prompt_versions(prompt_name)

    # Filter versions with sufficient usage (at least 5 uses)
    tested_versions = [v for v in versions if v.usage_count >= 5]

    if not tested_versions:
        return versions[-1]  # Return latest if none tested

    # Score based on success rate (70%) and confidence (30%)
    def score_version(version):
        success_weight = 0.7
        confidence_weight = 0.3
        return (success_weight * version.success_rate +
                confidence_weight * version.performance_metrics.get('avg_confidence', 0.5))

    return max(tested_versions, key=score_version)
```

This enables gradual prompt improvement: new versions are tested alongside existing ones, and the best performer wins.

### A/B Testing Support

The prompt manager supports comparing versions:

```python
comparison = prompt_manager.compare_prompt_versions(
    prompt_name="schema_mapping",
    version_a="v1.0",
    version_b="v1.1"
)

# Returns:
# {
#     "prompt_name": "schema_mapping",
#     "version_a": {"version": "v1.0", "success_rate": 0.72, ...},
#     "version_b": {"version": "v1.1", "success_rate": 0.89, ...},
#     "winner": "v1.1",
#     "confidence": 0.17  # Difference in success rates
# }
```

---

## Key Takeaways

Building this pipeline taught us several lessons:

1. **Hybrid is better than pure AI.** Use LLMs for semantic tasks, deterministic logic for validation.
Don't force AI where rules work better.
2. **Local embeddings are a game-changer.** Running `sentence-transformers` locally gives you 1,300+ columns/second with no per-request API cost. Cloud embeddings should be a fallback, not the default.
3. **Confidence scoring is essential.** Without calibrated uncertainty, you can't route intelligently. Invest in getting this right.