DAG Examples¶
This guide provides comprehensive examples of DAG patterns, including node reuse, complex routing, and advanced use cases.
Overview¶
DAGs in Intent Kit support: - Node Reuse - Share nodes across multiple execution paths - Complex Routing - Conditional logic and branching - Parallel Execution - Multiple paths executed simultaneously - Context Propagation - Rich context flows through all paths
Basic DAG Patterns¶
Simple Linear Flow¶
from intent_kit import DAGBuilder, run_dag
from intent_kit.core.context import DefaultContext
def greet(name: str, **kwargs) -> str:
return f"Hello {name}!"
def get_weather(city: str, **kwargs) -> str:
return f"Weather in {city} is sunny"
# Create DAG
builder = DAGBuilder()
builder.with_default_llm_config({
"provider": "openrouter",
"model": "google/gemma-2-9b-it"
})
# Add nodes
builder.add_node("classifier", "classifier",
output_labels=["greet", "weather"],
description="Route to appropriate action")
builder.add_node("extract_name", "extractor",
param_schema={"name": str},
description="Extract name from greeting")
builder.add_node("extract_city", "extractor",
param_schema={"city": str},
description="Extract city from weather request")
builder.add_node("greet_action", "action",
action=greet,
description="Greet the user")
builder.add_node("weather_action", "action",
action=get_weather,
description="Get weather information")
# Connect nodes
builder.add_edge("classifier", "extract_name", "greet")
builder.add_edge("extract_name", "greet_action", "success")
builder.add_edge("classifier", "extract_city", "weather")
builder.add_edge("extract_city", "weather_action", "success")
builder.set_entrypoints(["classifier"])
# Build and execute
dag = builder.build()
context = DefaultContext()
result, context = run_dag(dag, "Hello Alice")
print(result.data) # "Hello Alice!"
result, context = run_dag(dag, "What's the weather in New York?", context)
print(result.data) # "Weather in New York is sunny"
Node Reuse Patterns¶
Shared Extractor¶
from intent_kit import DAGBuilder
def book_flight(origin: str, destination: str, date: str) -> str:
return f"Flight booked from {origin} to {destination} on {date}"
def book_hotel(city: str, check_in: str, check_out: str) -> str:
return f"Hotel booked in {city} from {check_in} to {check_out}"
# Create DAG with shared extractor
builder = DAGBuilder()
# Shared location extractor used by multiple paths
builder.add_node("extract_location", "extractor",
param_schema={"location": str},
description="Extract location information",
output_key="location_params")
# Flight booking path
builder.add_node("flight_classifier", "classifier",
output_labels=["book_flight"],
description="Classify flight booking requests")
builder.add_node("extract_flight_details", "extractor",
param_schema={"origin": str, "destination": str, "date": str},
description="Extract flight details")
builder.add_node("flight_action", "action",
action=book_flight,
description="Book flight")
# Hotel booking path
builder.add_node("hotel_classifier", "classifier",
output_labels=["book_hotel"],
description="Classify hotel booking requests")
builder.add_node("extract_hotel_details", "extractor",
param_schema={"city": str, "check_in": str, "check_out": str},
description="Extract hotel details")
builder.add_node("hotel_action", "action",
action=book_hotel,
description="Book hotel")
# Connect with shared extractor
builder.add_edge("flight_classifier", "extract_location", "book_flight")
builder.add_edge("extract_location", "extract_flight_details", "success")
builder.add_edge("extract_flight_details", "flight_action", "success")
builder.add_edge("hotel_classifier", "extract_location", "book_hotel")
builder.add_edge("extract_location", "extract_hotel_details", "success")
builder.add_edge("extract_hotel_details", "hotel_action", "success")
builder.set_entrypoints(["flight_classifier", "hotel_classifier"])
Shared Classifier¶
from intent_kit import DAGBuilder
def process_order(product: str, quantity: int) -> str:
return f"Order processed: {quantity} x {product}"
def check_inventory(product: str) -> str:
return f"Inventory check for {product}: In stock"
def calculate_price(product: str, quantity: int) -> str:
return f"Price for {quantity} x {product}: $99.99"
# Create DAG with shared classifier
builder = DAGBuilder()
# Shared classifier for product-related requests
builder.add_node("product_classifier", "classifier",
output_labels=["order", "inventory", "price"],
description="Classify product-related requests")
# Shared product extractor
builder.add_node("extract_product", "extractor",
param_schema={"product": str},
description="Extract product information")
# Order processing path
builder.add_node("extract_order_details", "extractor",
param_schema={"product": str, "quantity": int},
description="Extract order details")
builder.add_node("order_action", "action",
action=process_order,
description="Process order")
# Inventory check path
builder.add_node("inventory_action", "action",
action=check_inventory,
description="Check inventory")
# Price calculation path
builder.add_node("extract_price_details", "extractor",
param_schema={"product": str, "quantity": int},
description="Extract price calculation details")
builder.add_node("price_action", "action",
action=calculate_price,
description="Calculate price")
# Connect with shared classifier and extractor
builder.add_edge("product_classifier", "extract_product", "order")
builder.add_edge("extract_product", "extract_order_details", "success")
builder.add_edge("extract_order_details", "order_action", "success")
builder.add_edge("product_classifier", "extract_product", "inventory")
builder.add_edge("extract_product", "inventory_action", "success")
builder.add_edge("product_classifier", "extract_product", "price")
builder.add_edge("extract_product", "extract_price_details", "success")
builder.add_edge("extract_price_details", "price_action", "success")
builder.set_entrypoints(["product_classifier"])
Complex Routing Patterns¶
Conditional Branching¶
from intent_kit import DAGBuilder
def process_urgent_request(details: str) -> str:
return f"URGENT: {details} - Escalated to priority queue"
def process_normal_request(details: str) -> str:
return f"Normal: {details} - Added to standard queue"
def process_low_priority_request(details: str) -> str:
return f"Low priority: {details} - Added to background queue"
# Create DAG with conditional branching
builder = DAGBuilder()
# Main classifier
builder.add_node("request_classifier", "classifier",
output_labels=["urgent", "normal", "low"],
description="Classify request priority")
# Priority-specific extractors
builder.add_node("extract_urgent_details", "extractor",
param_schema={"details": str, "priority": str},
description="Extract urgent request details")
builder.add_node("extract_normal_details", "extractor",
param_schema={"details": str},
description="Extract normal request details")
builder.add_node("extract_low_details", "extractor",
param_schema={"details": str},
description="Extract low priority request details")
# Priority-specific actions
builder.add_node("urgent_action", "action",
action=process_urgent_request,
description="Process urgent request")
builder.add_node("normal_action", "action",
action=process_normal_request,
description="Process normal request")
builder.add_node("low_action", "action",
action=process_low_priority_request,
description="Process low priority request")
# Connect with conditional branching
builder.add_edge("request_classifier", "extract_urgent_details", "urgent")
builder.add_edge("extract_urgent_details", "urgent_action", "success")
builder.add_edge("request_classifier", "extract_normal_details", "normal")
builder.add_edge("extract_normal_details", "normal_action", "success")
builder.add_edge("request_classifier", "extract_low_details", "low")
builder.add_edge("extract_low_details", "low_action", "success")
builder.set_entrypoints(["request_classifier"])
Multi-Step Validation¶
from intent_kit import DAGBuilder
def validate_user(user_id: str) -> str:
return f"User {user_id} validated"
def validate_payment(payment_info: str) -> str:
return f"Payment {payment_info} validated"
def process_purchase(user_id: str, product: str, payment_info: str) -> str:
return f"Purchase completed: {product} for user {user_id}"
def request_authentication() -> str:
return "Please authenticate to continue"
def request_payment_info() -> str:
return "Please provide payment information"
# Create DAG with multi-step validation
builder = DAGBuilder()
# Initial classifier
builder.add_node("purchase_classifier", "classifier",
output_labels=["purchase"],
description="Classify purchase requests")
# Extract purchase details
builder.add_node("extract_purchase_details", "extractor",
param_schema={"user_id": str, "product": str, "payment_info": str},
description="Extract purchase details")
# Validation nodes
builder.add_node("user_validator", "classifier",
output_labels=["valid", "invalid"],
description="Validate user")
builder.add_node("payment_validator", "classifier",
output_labels=["valid", "invalid"],
description="Validate payment")
# Actions
builder.add_node("user_validation_action", "action",
action=validate_user,
description="Validate user")
builder.add_node("payment_validation_action", "action",
action=validate_payment,
description="Validate payment")
builder.add_node("purchase_action", "action",
action=process_purchase,
description="Process purchase")
builder.add_node("auth_clarification", "clarification",
clarification_message="Please authenticate to continue",
available_options=["Login", "Register", "Cancel"],
output_key="auth_response")
builder.add_node("payment_clarification", "clarification",
clarification_message="Please provide payment information",
available_options=["Credit Card", "PayPal", "Cancel"],
output_key="payment_response")
# Connect multi-step validation
builder.add_edge("purchase_classifier", "extract_purchase_details", "purchase")
builder.add_edge("extract_purchase_details", "user_validator", "success")
builder.add_edge("user_validator", "user_validation_action", "valid")
builder.add_edge("user_validator", "auth_clarification", "invalid")
builder.add_edge("auth_clarification", "user_validator", "Login")
builder.add_edge("user_validation_action", "payment_validator", "success")
builder.add_edge("payment_validator", "payment_validation_action", "valid")
builder.add_edge("payment_validator", "payment_clarification", "invalid")
builder.add_edge("payment_clarification", "payment_validator", "Credit Card")
builder.add_edge("payment_validation_action", "purchase_action", "success")
builder.set_entrypoints(["purchase_classifier"])
Advanced Patterns¶
Parallel Processing¶
from intent_kit import DAGBuilder
def analyze_sentiment(text: str) -> str:
return f"Sentiment analysis: {text} is positive"
def extract_keywords(text: str) -> str:
return f"Keywords extracted: {text}"
def summarize_text(text: str) -> str:
return f"Summary: {text[:50]}..."
def combine_analysis(sentiment: str, keywords: str, summary: str) -> str:
return f"Combined analysis: {sentiment}, {keywords}, {summary}"
# Create DAG with parallel processing
builder = DAGBuilder()
# Main classifier
builder.add_node("text_classifier", "classifier",
output_labels=["analyze"],
description="Classify text analysis requests")
# Extract text
builder.add_node("extract_text", "extractor",
param_schema={"text": str},
description="Extract text for analysis")
# Parallel analysis nodes
builder.add_node("sentiment_analyzer", "action",
action=analyze_sentiment,
description="Analyze sentiment")
builder.add_node("keyword_extractor", "action",
action=extract_keywords,
description="Extract keywords")
builder.add_node("text_summarizer", "action",
action=summarize_text,
description="Summarize text")
# Combine results
builder.add_node("analysis_combiner", "action",
action=combine_analysis,
description="Combine analysis results")
# Connect parallel processing
builder.add_edge("text_classifier", "extract_text", "analyze")
builder.add_edge("extract_text", "sentiment_analyzer", "success")
builder.add_edge("extract_text", "keyword_extractor", "success")
builder.add_edge("extract_text", "text_summarizer", "success")
# All parallel nodes feed into combiner
builder.add_edge("sentiment_analyzer", "analysis_combiner", "success")
builder.add_edge("keyword_extractor", "analysis_combiner", "success")
builder.add_edge("text_summarizer", "analysis_combiner", "success")
builder.set_entrypoints(["text_classifier"])
Error Handling and Recovery¶
from intent_kit import DAGBuilder
def process_data(data: str) -> str:
return f"Data processed: {data}"
def fallback_processing(data: str) -> str:
return f"Fallback processing: {data}"
def log_error(error: str) -> str:
return f"Error logged: {error}"
def retry_processing(data: str) -> str:
return f"Retry processing: {data}"
# Create DAG with error handling
builder = DAGBuilder()
# Main classifier
builder.add_node("data_classifier", "classifier",
output_labels=["process"],
description="Classify data processing requests")
# Extract data
builder.add_node("extract_data", "extractor",
param_schema={"data": str},
description="Extract data for processing")
# Processing nodes
builder.add_node("data_processor", "classifier",
output_labels=["success", "error", "retry"],
description="Process data with error handling")
builder.add_node("main_processor", "action",
action=process_data,
description="Main data processor")
builder.add_node("fallback_processor", "action",
action=fallback_processing,
description="Fallback processor")
builder.add_node("error_logger", "action",
action=log_error,
description="Log errors")
builder.add_node("retry_processor", "action",
action=retry_processing,
description="Retry processing")
# Connect with error handling
builder.add_edge("data_classifier", "extract_data", "process")
builder.add_edge("extract_data", "data_processor", "success")
builder.add_edge("data_processor", "main_processor", "success")
builder.add_edge("data_processor", "fallback_processor", "error")
builder.add_edge("data_processor", "retry_processor", "retry")
# Error logging
builder.add_edge("main_processor", "error_logger", "error")
builder.add_edge("fallback_processor", "error_logger", "error")
builder.add_edge("retry_processor", "error_logger", "error")
builder.set_entrypoints(["data_classifier"])
Real-World Examples¶
Customer Support System¶
from intent_kit import DAGBuilder
def route_ticket(category: str, priority: str) -> str:
return f"Ticket routed to {category} team with {priority} priority"
def create_ticket(user_id: str, issue: str) -> str:
return f"Support ticket created for user {user_id}: {issue}"
def escalate_ticket(ticket_id: str, reason: str) -> str:
return f"Ticket {ticket_id} escalated: {reason}"
def resolve_ticket(ticket_id: str, solution: str) -> str:
return f"Ticket {ticket_id} resolved: {solution}"
# Create customer support DAG
builder = DAGBuilder()
# Main classifier
builder.add_node("support_classifier", "classifier",
output_labels=["create", "escalate", "resolve", "status"],
description="Classify support requests")
# Shared extractors
builder.add_node("extract_user_info", "extractor",
param_schema={"user_id": str},
description="Extract user information")
builder.add_node("extract_ticket_info", "extractor",
param_schema={"ticket_id": str},
description="Extract ticket information")
# Specific extractors
builder.add_node("extract_issue_details", "extractor",
param_schema={"issue": str, "category": str, "priority": str},
description="Extract issue details")
builder.add_node("extract_escalation_details", "extractor",
param_schema={"ticket_id": str, "reason": str},
description="Extract escalation details")
builder.add_node("extract_resolution_details", "extractor",
param_schema={"ticket_id": str, "solution": str},
description="Extract resolution details")
# Actions
builder.add_node("ticket_creator", "action",
action=create_ticket,
description="Create support ticket")
builder.add_node("ticket_router", "action",
action=route_ticket,
description="Route ticket to appropriate team")
builder.add_node("ticket_escalator", "action",
action=escalate_ticket,
description="Escalate ticket")
builder.add_node("ticket_resolver", "action",
action=resolve_ticket,
description="Resolve ticket")
# Connect customer support flow
builder.add_edge("support_classifier", "extract_user_info", "create")
builder.add_edge("extract_user_info", "extract_issue_details", "success")
builder.add_edge("extract_issue_details", "ticket_creator", "success")
builder.add_edge("ticket_creator", "ticket_router", "success")
builder.add_edge("support_classifier", "extract_ticket_info", "escalate")
builder.add_edge("extract_ticket_info", "extract_escalation_details", "success")
builder.add_edge("extract_escalation_details", "ticket_escalator", "success")
builder.add_edge("support_classifier", "extract_ticket_info", "resolve")
builder.add_edge("extract_ticket_info", "extract_resolution_details", "success")
builder.add_edge("extract_resolution_details", "ticket_resolver", "success")
builder.set_entrypoints(["support_classifier"])
E-commerce Recommendation System¶
from intent_kit import DAGBuilder
def get_product_recommendations(category: str, user_preferences: str) -> str:
return f"Recommendations for {category}: {user_preferences}"
def get_personalized_offers(user_id: str, purchase_history: str) -> str:
return f"Personalized offers for user {user_id}: {purchase_history}"
def analyze_user_behavior(user_id: str, behavior_data: str) -> str:
return f"Behavior analysis for user {user_id}: {behavior_data}"
def combine_recommendations(products: str, offers: str, insights: str) -> str:
return f"Combined recommendations: {products}, {offers}, {insights}"
# Create recommendation DAG
builder = DAGBuilder()
# Main classifier
builder.add_node("recommendation_classifier", "classifier",
output_labels=["products", "offers", "insights", "combined"],
description="Classify recommendation requests")
# Shared user extractor
builder.add_node("extract_user", "extractor",
param_schema={"user_id": str},
description="Extract user information")
# Specific extractors
builder.add_node("extract_preferences", "extractor",
param_schema={"category": str, "user_preferences": str},
description="Extract user preferences")
builder.add_node("extract_purchase_history", "extractor",
param_schema={"user_id": str, "purchase_history": str},
description="Extract purchase history")
builder.add_node("extract_behavior", "extractor",
param_schema={"user_id": str, "behavior_data": str},
description="Extract behavior data")
# Actions
builder.add_node("product_recommender", "action",
action=get_product_recommendations,
description="Get product recommendations")
builder.add_node("offer_generator", "action",
action=get_personalized_offers,
description="Generate personalized offers")
builder.add_node("behavior_analyzer", "action",
action=analyze_user_behavior,
description="Analyze user behavior")
builder.add_node("recommendation_combiner", "action",
action=combine_recommendations,
description="Combine all recommendations")
# Connect recommendation flow
builder.add_edge("recommendation_classifier", "extract_user", "products")
builder.add_edge("extract_user", "extract_preferences", "success")
builder.add_edge("extract_preferences", "product_recommender", "success")
builder.add_edge("recommendation_classifier", "extract_user", "offers")
builder.add_edge("extract_user", "extract_purchase_history", "success")
builder.add_edge("extract_purchase_history", "offer_generator", "success")
builder.add_edge("recommendation_classifier", "extract_user", "insights")
builder.add_edge("extract_user", "extract_behavior", "success")
builder.add_edge("extract_behavior", "behavior_analyzer", "success")
# Combined recommendations
builder.add_edge("recommendation_classifier", "extract_user", "combined")
builder.add_edge("extract_user", "extract_preferences", "success")
builder.add_edge("extract_preferences", "product_recommender", "success")
builder.add_edge("product_recommender", "recommendation_combiner", "success")
builder.add_edge("extract_user", "extract_purchase_history", "success")
builder.add_edge("extract_purchase_history", "offer_generator", "success")
builder.add_edge("offer_generator", "recommendation_combiner", "success")
builder.add_edge("extract_user", "extract_behavior", "success")
builder.add_edge("extract_behavior", "behavior_analyzer", "success")
builder.add_edge("behavior_analyzer", "recommendation_combiner", "success")
builder.set_entrypoints(["recommendation_classifier"])
Best Practices¶
1. Node Naming¶
# Use descriptive, consistent naming
builder.add_node("user_authentication_classifier", "classifier", ...)
builder.add_node("extract_user_credentials", "extractor", ...)
builder.add_node("validate_user_credentials", "action", ...)
2. Error Handling¶
# Always include error handling paths
builder.add_node("data_processor", "classifier",
output_labels=["success", "error", "retry"],
description="Process data with error handling")
3. Context Management¶
# Use context to pass data between nodes
context = DefaultContext()
context.set("user_id", "user123")
context.set("session_id", "session456")
result = dag.execute("Process my request", context)
4. Performance Optimization¶
# Use shared nodes for common operations
builder.add_node("shared_text_processor", "extractor",
param_schema={"text": str},
description="Shared text processing")