AI & Machine Learning Engineering

Agentic Workflows: Building Self-Correcting Loops with LangGraph and CrewAI State Machines

MatterAI Agent
14 min read


Self-correcting loops enable AI agents to iteratively improve their outputs through automated feedback and retry mechanisms. This guide covers implementing these patterns using LangGraph's StateGraph architecture and CrewAI's agent orchestration capabilities.

Core Architecture

LangGraph provides low-level workflow control through StateGraph and conditional edges. CrewAI handles high-level agent orchestration and task delegation. Combined, they create robust self-correcting systems.

State Management Pattern

Define persistent state to track iteration counts, error logs, intermediate results, and token usage:

from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph
from langchain_openai import ChatOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from openai import RateLimitError, APIConnectionError

llm = ChatOpenAI(model="gpt-4")

class AgentState(TypedDict):
    messages: List[str]
    iterations: int
    errors: List[str]
    current_output: Optional[str]
    should_retry: bool
    total_tokens: int
    quality_score: float
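
Every node below returns a dict that carries the full state forward, so it helps to see the update pattern in plain Python first. A minimal sketch (`make_initial_state` is a hypothetical helper, not part of LangGraph):

```python
# Hypothetical helper: build a fresh AgentState dict and apply the
# spread-style update the workflow nodes below use.
def make_initial_state(task: str) -> dict:
    return {
        "messages": [task],
        "iterations": 0,
        "errors": [],
        "current_output": None,
        "should_retry": False,
        "total_tokens": 0,
        "quality_score": 0.0,
    }

state = make_initial_state("Write a fibonacci function")
# A node returning {**state, "iterations": ...} yields an updated copy
# without mutating the original state
updated = {**state, "iterations": state["iterations"] + 1}
```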

LangGraph Self-Correcting Implementation

Safe Code Validation

Replace direct code execution with AST-based validation to prevent injection attacks:

import ast
from typing import Tuple

ALLOWED_MODULES = {"math", "json", "re", "collections", "itertools", "functools"}

def validate_code_safety(code: str) -> Tuple[bool, str]:
    """Validate code safety using AST analysis. Returns (is_safe, error_message)."""
    if not code or not code.strip():
        return False, "Empty code provided"
    
    try:
        tree = ast.parse(code)
    except SyntaxError as e:
        return False, f"Syntax error: {e}"
    
    dangerous_nodes = []
    
    for node in ast.walk(tree):
        # Block dangerous function calls
        if isinstance(node, ast.Call):
            if isinstance(node.func, ast.Name):
                func_name = node.func.id
                if func_name in {"exec", "eval", "compile", "open", "input", "__import__"}:
                    dangerous_nodes.append(f"Forbidden function: {func_name}")
        
        # Block imports not in allowlist (check every alias, not just the first)
        if isinstance(node, (ast.Import, ast.ImportFrom)):
            if isinstance(node, ast.ImportFrom):
                module_names = [node.module or ""]
            else:
                module_names = [alias.name for alias in node.names]
            for module_name in module_names:
                root_module = module_name.split(".")[0] if module_name else ""
                if root_module not in ALLOWED_MODULES:
                    dangerous_nodes.append(f"Import not allowed: {module_name}")
        
        # Block access to dangerous dunder methods only (not all underscore-prefixed)
        if isinstance(node, ast.Attribute):
            if node.attr.startswith("__") and node.attr.endswith("__"):
                dangerous_nodes.append(f"Access to dunder method: {node.attr}")
    
    if dangerous_nodes:
        return False, "; ".join(dangerous_nodes)
    
    return True, ""

def execute_validated_code(code: str) -> dict:
    """Safely execute validated code in a restricted namespace."""
    is_safe, error = validate_code_safety(code)
    if not is_safe:
        raise ValueError(f"Code validation failed: {error}")
    
    def safe_import(name, *args, **kwargs):
        # Only permit imports that passed the AST allowlist
        if name.split(".")[0] not in ALLOWED_MODULES:
            raise ImportError(f"Import not allowed: {name}")
        return __import__(name, *args, **kwargs)
    
    # Restricted builtins: a fully empty dict would also block the allowed
    # imports, so expose only a guarded __import__
    execution_namespace = {"__builtins__": {"__import__": safe_import}}
    try:
        exec(code, execution_namespace)
        return {"success": True, "namespace": execution_namespace}
    except Exception as e:
        return {"success": False, "error": str(e)}
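
The core of the AST check can be condensed into a few lines for intuition. A sketch (`find_forbidden_calls` is a hypothetical name, not part of the guide's API):

```python
import ast

# Condensed version of the AST walk above: collect calls to forbidden builtins.
def find_forbidden_calls(code: str, forbidden=frozenset({"exec", "eval", "open"})):
    hits = []
    for node in ast.walk(ast.parse(code)):
        if (isinstance(node, ast.Call)
                and isinstance(node.func, ast.Name)
                and node.func.id in forbidden):
            hits.append(node.func.id)
    return hits
```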

Resilient LLM Calls with Retry Logic

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((RateLimitError, APIConnectionError)),
    reraise=True
)
def invoke_llm_with_retry(prompt: str) -> str:
    """Invoke LLM with exponential backoff for rate limits and network errors."""
    response = llm.invoke(prompt)
    return response.content
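
To make the retry policy concrete: tenacity's wait_exponential sleeps roughly multiplier * 2**attempt seconds after each failed attempt, clamped to [min, max]. A sketch of the resulting schedule (`backoff_schedule` is a hypothetical helper, for illustration only):

```python
def backoff_schedule(multiplier=1.0, min_s=2.0, max_s=10.0, attempts=3):
    # Approximate sleep before retry n: multiplier * 2**n, clamped to [min_s, max_s].
    # With stop_after_attempt(3) there are at most two sleeps.
    return [min(max_s, max(min_s, multiplier * 2 ** n)) for n in range(1, attempts)]
```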

def generate_node(state: AgentState) -> AgentState:
    """Generate solution based on current state."""
    try:
        if state["iterations"] == 0:
            output = invoke_llm_with_retry(state["messages"][-1])
        else:
            error_context = "\n".join(state["errors"])
            prompt = f"Previous errors: {error_context}\nTask: {state['messages'][-1]}"
            output = invoke_llm_with_retry(prompt)
        
        # Track token usage
        token_estimate = len(output.split()) * 1.3  # Rough approximation
        
        return {
            **state,
            "current_output": output,
            "should_retry": False,
            "total_tokens": state.get("total_tokens", 0) + int(token_estimate)
        }
    except (RateLimitError, APIConnectionError) as e:
        return {
            **state,
            "should_retry": True,
            "errors": state["errors"] + [f"API Error: {str(e)}"]
        }

def validate_node(state: AgentState) -> AgentState:
    """Validate output and determine if correction needed."""
    try:
        is_safe, error_message = validate_code_safety(state["current_output"])
        if is_safe:
            return {
                **state,
                "should_retry": False,
                "errors": []
            }
        else:
            return {
                **state,
                "should_retry": True,
                "errors": state["errors"] + [error_message]
            }
    except Exception as e:
        return {
            **state,
            "should_retry": True,
            "errors": state["errors"] + [str(e)]
        }

Conditional Edge Logic

Implement the self-correction loop using conditional edges:

def should_continue(state: AgentState) -> str:
    """Determine if loop should continue or end."""
    max_iterations = 5
    max_tokens = 50000
    
    if state["iterations"] >= max_iterations:
        return "end"
    if state.get("total_tokens", 0) >= max_tokens:
        return "end"
    elif state["should_retry"]:
        return "generate"
    else:
        return "end"

def increment_iterations(state: AgentState) -> AgentState:
    return {**state, "iterations": state["iterations"] + 1}

# Build workflow
workflow = StateGraph(AgentState)
workflow.add_node("generate", generate_node)
workflow.add_node("validate", validate_node)
workflow.add_node("increment", increment_iterations)

workflow.add_edge("__start__", "generate")
workflow.add_conditional_edges(
    "generate",
    lambda state: "validate" if state.get("current_output") else "end",
    {
        "validate": "validate",
        "end": "__end__"
    }
)
workflow.add_edge("validate", "increment")
workflow.add_conditional_edges(
    "increment",
    should_continue,
    {
        "generate": "generate",
        "end": "__end__"
    }
)
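
Stripped of LangGraph, the routing above amounts to a generate → validate → increment cycle. A plain-Python sketch of the same control flow (`run_loop` is a hypothetical helper, for intuition only):

```python
# Hypothetical sketch of the graph's control flow: generate, validate,
# increment, then either loop back or stop on success / iteration cap.
def run_loop(state, generate, validate, max_iterations=5):
    while True:
        state = generate(state)
        if not state.get("current_output"):
            return state  # generation failed outright
        state = validate(state)
        state = {**state, "iterations": state["iterations"] + 1}
        if state["iterations"] >= max_iterations or not state["should_retry"]:
            return state
```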

CrewAI Integration

CrewAI as LangGraph Node

Embed CrewAI crews as executable nodes within LangGraph workflows. Note that Agent parameters must be passed as keyword arguments:

from crewai import Agent, Task, Crew

def crewai_node(state: AgentState) -> AgentState:
    """Execute CrewAI workflow as graph node."""
    # Define agents with explicit keyword arguments
    researcher = Agent(
        role="Researcher",
        goal="Gather relevant information",
        backstory="Expert researcher with web search capabilities",
        verbose=False
    )
    
    writer = Agent(
        role="Writer",
        goal="Create comprehensive content",
        backstory="Technical writer specializing in AI topics",
        verbose=False
    )
    
    # Define tasks with pre-existing agent references
    research_task = Task(
        description=f"Research: {state['messages'][-1]}",
        expected_output="Comprehensive research summary",
        agent=researcher
    )
    
    writing_task = Task(
        description="Write comprehensive content based on research",
        expected_output="Well-structured technical content",
        agent=writer
    )
    
    crew = Crew(
        agents=[researcher, writer],
        tasks=[research_task, writing_task],
        verbose=False
    )
    
    try:
        result = crew.kickoff()
        # Properly handle CrewOutput object - preserve metadata
        output_text = result.raw if hasattr(result, 'raw') else str(result)
        return {
            **state,
            "current_output": output_text,
            "should_retry": False
        }
    except Exception as e:
        return {
            **state,
            "errors": state["errors"] + [f"CrewAI error: {str(e)}"],
            "should_retry": True
        }
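
The output-normalization step in the node above can be isolated and tested without CrewAI installed. A sketch (`extract_output` and `FakeCrewOutput` are hypothetical names mirroring the logic):

```python
def extract_output(result):
    # CrewAI's kickoff() returns a CrewOutput whose .raw attribute holds the
    # final text; fall back to str() for plain results.
    return result.raw if hasattr(result, "raw") else str(result)

class FakeCrewOutput:
    raw = "research summary"
```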

Self-Correction Patterns

Error-Driven Correction

Implement feedback loops that learn from execution failures:

def analyze_errors(errors: List[str]) -> List[str]:
    """Classify error types from error messages."""
    patterns = []
    for error in errors:
        error_lower = error.lower()
        if "syntax" in error_lower or "syntaxerror" in error_lower:
            patterns.append("syntax_error")
        elif "logic" in error_lower or "assertion" in error_lower:
            patterns.append("logic_error")
        elif "timeout" in error_lower:
            patterns.append("timeout_error")
        elif "api" in error_lower or "rate" in error_lower:
            patterns.append("api_error")
        else:
            patterns.append("unknown_error")
    return list(set(patterns))

def adaptive_correction(state: AgentState) -> AgentState:
    """Adapt correction strategy based on error patterns."""
    error_patterns = analyze_errors(state["errors"])
    
    if "syntax_error" in error_patterns:
        correction_prompt = "Fix syntax errors and validate code structure"
    elif "logic_error" in error_patterns:
        correction_prompt = "Review algorithm logic and edge cases"
    elif "api_error" in error_patterns:
        correction_prompt = "Handle API errors gracefully with fallbacks"
    else:
        correction_prompt = "General improvement and optimization"
    
    enhanced_prompt = f"{state['messages'][-1]}\nCorrection focus: {correction_prompt}"
    
    return {
        **state,
        "messages": state["messages"] + [enhanced_prompt]
    }
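
An equivalent, table-driven variant of the classifier keeps the substring rules in data, so new error categories need no new branches. A sketch (names hypothetical):

```python
# Rules are checked in order; the first matching substring wins.
ERROR_RULES = [
    ("syntax_error", ("syntax", "syntaxerror")),
    ("logic_error", ("logic", "assertion")),
    ("timeout_error", ("timeout",)),
    ("api_error", ("api", "rate")),
]

def classify_error(message: str) -> str:
    lower = message.lower()
    for label, needles in ERROR_RULES:
        if any(n in lower for n in needles):
            return label
    return "unknown_error"
```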

Quality Gates with LLM-as-Judge

Replace arbitrary scoring with structured LLM evaluation:

from langchain_core.prompts import ChatPromptTemplate
import json
import re

QUALITY_RUBRIC = """
Evaluate the following code output on a scale of 0-10 for each criterion:
1. Correctness: Does it solve the stated problem?
2. Readability: Is it well-structured and documented?
3. Efficiency: Does it use appropriate algorithms?
4. Safety: Does it handle edge cases and errors?

Output format: {"correctness": N, "readability": N, "efficiency": N, "safety": N, "overall": N}
"""

def parse_json_safely(json_str: str) -> dict:
    """Safely parse JSON with robust error handling."""
    try:
        # Try direct JSON parsing first
        return json.loads(json_str)
    except json.JSONDecodeError:
        try:
            # Try extracting JSON from markdown code blocks
            json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', json_str, re.DOTALL)
            if json_match:
                return json.loads(json_match.group(1))
        except (json.JSONDecodeError, AttributeError):
            pass
        
        try:
            # Try finding JSON-like structure in text
            json_match = re.search(r'({.*})', json_str, re.DOTALL)
            if json_match:
                return json.loads(json_match.group(1))
        except (json.JSONDecodeError, AttributeError):
            pass
    
    # Return fallback if all parsing attempts fail
    return {"overall": 5.0, "reason": "JSON parsing failed"}
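
The markdown-fence fallback above can be exercised in isolation. A condensed sketch (`extract_fenced_json` is a hypothetical name):

```python
import json
import re

def extract_fenced_json(text: str):
    # Mirror the second fallback: pull a JSON object out of a ``` block.
    m = re.search(r'```(?:json)?\s*({.*?})\s*```', text, re.DOTALL)
    return json.loads(m.group(1)) if m else None

reply = 'Here are the scores:\n```json\n{"overall": 8}\n```'
```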

def evaluate_quality_llm(output: str, original_task: str) -> dict:
    """Use LLM to evaluate output quality against rubric."""
    if not output or len(output.strip()) == 0:
        return {"overall": 0.0, "reason": "Empty output"}
    
    # Pass messages directly: the rubric contains literal braces that
    # ChatPromptTemplate's format step would misread as template variables
    messages = [
        ("system", QUALITY_RUBRIC),
        ("human", f"Original task: {original_task}\n\nOutput to evaluate:\n{output}")
    ]
    
    try:
        response = llm.invoke(messages)
        content = response.content
        
        # Parse JSON response with robust error handling
        scores = parse_json_safely(content)
        return scores
    except Exception as e:
        # Fallback to basic heuristics
        return {"overall": 5.0, "reason": f"LLM evaluation failed: {str(e)}"}

def quality_gate(state: AgentState) -> AgentState:
    """Evaluate output quality and decide on continuation."""
    quality_result = evaluate_quality_llm(
        state["current_output"],
        state["messages"][0]
    )
    
    quality_score = quality_result.get("overall", 0) / 10.0
    
    if quality_score >= 0.8:
        return {**state, "should_retry": False, "quality_score": quality_score}
    elif state["iterations"] < 3:
        return {**state, "should_retry": True, "quality_score": quality_score}
    else:
        return {**state, "should_retry": False, "quality_score": quality_score}
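
The gate's decision logic reduces to a small pure function that is easy to unit-test. A sketch (`gate_decision` is a hypothetical name):

```python
def gate_decision(overall_score: float, iterations: int,
                  threshold: float = 0.8, max_retries: int = 3) -> str:
    # Rubric scores are 0-10; normalize to 0-1 as quality_gate does.
    quality = overall_score / 10.0
    if quality >= threshold:
        return "accept"
    # Below threshold: retry until the iteration budget runs out
    return "retry" if iterations < max_retries else "accept_best_effort"
```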

Observability and Monitoring

LangSmith Integration

Enable tracing for production debugging:

import os

# Configure LangSmith tracing (LANGSMITH_API_KEY must also be set)
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_PROJECT"] = "agentic-workflows"

# LangSmith automatically captures traces when these environment variables are set
# Access traces at https://smith.langchain.com

OpenTelemetry for Custom Metrics

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Configure OpenTelemetry
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

def traced_generate_node(state: AgentState) -> AgentState:
    with tracer.start_as_current_span("generate_node") as span:
        span.set_attribute("iterations", state["iterations"])
        result = generate_node(state)
        span.set_attribute("output_length", len(result.get("current_output", "")))
        return result

Cost Management

Track and limit token usage across iterations:

class CostManagedState(TypedDict):
    messages: List[str]
    iterations: int
    errors: List[str]
    current_output: Optional[str]
    should_retry: bool
    total_tokens: int
    max_tokens: int
    estimated_cost_usd: float

# Pricing per 1K tokens (adjust for your model)
TOKEN_COSTS = {
    "gpt-4": {"input": 0.03, "output": 0.06},
    "gpt-3.5-turbo": {"input": 0.001, "output": 0.002}
}

def calculate_cost(tokens: int, model: str = "gpt-4") -> float:
    """Estimate cost in USD, treating all tracked tokens at the input rate."""
    rates = TOKEN_COSTS.get(model, TOKEN_COSTS["gpt-4"])
    return (tokens / 1000) * rates["input"]

def cost_aware_should_continue(state: CostManagedState) -> str:
    """Check iteration limits and cost thresholds."""
    max_iterations = 5
    max_cost_usd = 1.0
    
    current_cost = calculate_cost(state.get("total_tokens", 0))
    
    if state["iterations"] >= max_iterations:
        return "end"
    if current_cost >= max_cost_usd:
        return "end"
    if state.get("total_tokens", 0) >= state.get("max_tokens", 50000):
        return "end"
    elif state["should_retry"]:
        return "generate"
    else:
        return "end"
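
If prompt and completion tokens are tracked separately, both rates from the pricing table apply. A sketch (`estimate_cost_usd` is a hypothetical extension of calculate_cost):

```python
# Pricing per 1K tokens (adjust for your model)
TOKEN_COSTS = {
    "gpt-4": {"input": 0.03, "output": 0.06},
    "gpt-3.5-turbo": {"input": 0.001, "output": 0.002},
}

def estimate_cost_usd(input_tokens: int, output_tokens: int,
                      model: str = "gpt-4") -> float:
    # Bill prompt tokens at the input rate and completion tokens at the output rate
    rates = TOKEN_COSTS[model]
    return (input_tokens / 1000) * rates["input"] + (output_tokens / 1000) * rates["output"]
```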

Production Deployment

State Persistence for Distributed Systems

Use database-backed checkpointers for production deployments:

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
import asyncio

# Development: In-memory checkpointer
def get_dev_checkpointer():
    return InMemorySaver()

# Configuration
DB_URI = "postgresql://user:password@localhost:5432/langgraph_state"

# Initial state for a run
initial_state = {
    "messages": ["Write a Python function to calculate fibonacci numbers"],
    "iterations": 0,
    "errors": [],
    "current_output": None,
    "should_retry": False,
    "total_tokens": 0,
    "quality_score": 0.0
}

# Production: PostgreSQL-backed checkpointer for distributed systems.
# AsyncPostgresSaver.from_conn_string returns an async context manager, so
# keep the connection open for the lifetime of the compiled workflow.
async def run_workflow():
    async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
        await checkpointer.setup()  # Run schema migrations (first run only)
        production_workflow = workflow.compile(checkpointer=checkpointer)
        return await production_workflow.ainvoke(
            initial_state,
            config={"configurable": {"thread_id": "production_session_1"}}
        )

# Entry point: asyncio.run(run_workflow())

Thread ID Management for State Recovery

import uuid
import hashlib
from datetime import datetime, timezone

def create_thread_id(user_id: str, session_id: str = None, timestamp: str = None) -> str:
    """
    Generate deterministic thread ID for state recovery with collision prevention.
    Handles edge cases for concurrent sessions and empty inputs.
    """
    # Handle edge cases
    if not user_id or not isinstance(user_id, str):
        raise ValueError("user_id must be a non-empty string")
    
    if not session_id:
        session_id = str(uuid.uuid4())
    
    if not timestamp:
        # datetime.utcnow() is deprecated; use an explicit UTC timezone
        timestamp = datetime.now(timezone.utc).isoformat()
    
    # Create unique identifier to prevent collisions
    unique_string = f"{user_id}_{session_id}_{timestamp}"
    
    # Use hash for consistent length and collision resistance
    thread_hash = hashlib.sha256(unique_string.encode()).hexdigest()[:16]
    
    return f"{user_id}_{thread_hash}"

async def resume_workflow(thread_id: str, production_workflow):
    """Resume a previously interrupted workflow from its latest checkpoint."""
    config = {"configurable": {"thread_id": thread_id}}
    try:
        snapshot = await production_workflow.aget_state(config)
        if not snapshot.values:
            raise ValueError(f"No checkpoint history found for thread_id: {thread_id}")
        # Passing None as the input resumes from the latest checkpoint
        return await production_workflow.ainvoke(None, config=config)
    except ValueError:
        raise
    except Exception as e:
        raise RuntimeError(f"Failed to resume workflow: {e}") from e
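
The determinism that makes thread-based recovery work can be checked directly. A condensed copy of the hashing scheme above (`thread_id` is a hypothetical shorthand):

```python
import hashlib

def thread_id(user_id: str, session_id: str, timestamp: str) -> str:
    # Same inputs always yield the same ID; any changed component changes it.
    digest = hashlib.sha256(f"{user_id}_{session_id}_{timestamp}".encode()).hexdigest()[:16]
    return f"{user_id}_{digest}"

a = thread_id("alice", "s1", "2024-01-01T00:00:00")
b = thread_id("alice", "s1", "2024-01-01T00:00:00")
c = thread_id("alice", "s2", "2024-01-01T00:00:00")
```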

Implementation Best Practices

  1. State Persistence: Use a PostgreSQL-backed checkpointer such as AsyncPostgresSaver for distributed production systems; reserve InMemorySaver for development
  2. Error Classification: Categorize errors to apply targeted corrections
  3. Iteration Limits: Set maximum retry attempts and cost thresholds to prevent runaway workflows
  4. Quality Metrics: Use LLM-as-judge with structured rubrics and robust JSON parsing
  5. Cost Management: Track token usage and implement cost limits per workflow
  6. Observability: Enable LangSmith tracing and OpenTelemetry metrics for production debugging
  7. Security: Never execute unvalidated code; use AST analysis with appropriate underscore handling
  8. Thread Safety: Implement robust thread ID management for concurrent sessions
  9. Output Handling: Preserve CrewAI metadata by accessing .raw attribute instead of string casting
  10. Error Handling: Implement comprehensive error handling for all external API calls
