Skip to content

API Reference

IntentKit - A Python library for building hierarchical intent classification and execution systems.

This library provides:

- Tree-based intent architecture with classifier and intent nodes
- IntentGraph for multi-intent routing and splitting
- Context-aware execution with dependency tracking
- Multiple AI service backends (OpenAI, Anthropic, Google AI, Ollama)
- Interactive visualization of execution paths

ClassifierNode

Bases: TreeNode

Intermediate node that uses a classifier to select child nodes.

Source code in intent_kit/classifiers/node.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
class ClassifierNode(TreeNode):
    """Intermediate node that delegates routing to a classifier callable.

    The classifier receives the raw user input, the candidate children,
    and a context dict, and returns the child node to execute (or None
    when it cannot decide).
    """

    def __init__(
        self,
        name: Optional[str],
        classifier: Callable[
            [str, List["TreeNode"], Optional[Dict[str, Any]]], Optional["TreeNode"]
        ],
        children: List["TreeNode"],
        description: str = "",
        parent: Optional["TreeNode"] = None,
        remediation_strategies: Optional[List[Union[str, RemediationStrategy]]] = None,
    ):
        super().__init__(
            name=name, description=description, children=children, parent=parent
        )
        self.remediation_strategies = remediation_strategies or []
        self.classifier = classifier

    @property
    def node_type(self) -> NodeType:
        """Get the type of this node."""
        return NodeType.CLASSIFIER

    def execute(
        self, user_input: str, context: Optional[IntentContext] = None
    ) -> ExecutionResult:
        """Pick a child via the classifier and run it, remediating on routing failure."""
        # Context data is not forwarded to the classifier yet; reserved for future use.
        classifier_context: Dict[str, Any] = {}
        selected = self.classifier(user_input, self.children, classifier_context)

        if selected:
            # Success path: delegate to the chosen child and wrap its result.
            self.logger.debug(
                f"Classifier at '{self.name}' routed input to '{selected.name}'."
            )
            child_result = selected.execute(user_input, context)
            return ExecutionResult(
                success=True,
                node_name=self.name,
                node_path=self.get_path(),
                node_type=NodeType.CLASSIFIER,
                input=user_input,
                output=child_result.output,  # surface the child's actual output
                error=None,
                params={
                    "chosen_child": selected.name,
                    "available_children": [child.name for child in self.children],
                },
                children_results=[child_result],
            )

        # Routing failed: log it, build the error, and attempt remediation.
        self.logger.error(
            f"Classifier at '{self.name}' (Path: {'.'.join(self.get_path())}) could not route input."
        )
        routing_error = ExecutionError(
            error_type="ClassifierRoutingError",
            message=f"Classifier at '{self.name}' could not route input.",
            node_name=self.name,
            node_path=self.get_path(),
        )

        remediation_result = self._execute_remediation_strategies(
            user_input=user_input, context=context, original_error=routing_error
        )
        if remediation_result:
            return remediation_result

        # No remediation succeeded; report the original routing failure.
        return ExecutionResult(
            success=False,
            node_name=self.name,
            node_path=self.get_path(),
            node_type=NodeType.CLASSIFIER,
            input=user_input,
            output=None,
            error=routing_error,
            params=None,
            children_results=[],
        )

    def _execute_remediation_strategies(
        self,
        user_input: str,
        context: Optional[IntentContext] = None,
        original_error: Optional[ExecutionError] = None,
    ) -> Optional[ExecutionResult]:
        """Execute remediation strategies for classifier failures.

        Strategies are tried in order; the first successful result is
        returned. Returns None when none are configured or all fail.
        """
        if not self.remediation_strategies:
            return None

        for item in self.remediation_strategies:
            # Resolve string IDs through the registry; accept strategy objects as-is.
            if isinstance(item, str):
                resolved = get_remediation_strategy(item)
                if not resolved:
                    self.logger.warning(
                        f"Remediation strategy '{item}' not found in registry"
                    )
                    continue
            elif isinstance(item, RemediationStrategy):
                resolved = item
            else:
                self.logger.warning(
                    f"Invalid remediation strategy type: {type(item)}"
                )
                continue

            try:
                outcome = resolved.execute(
                    node_name=self.name or "unknown",
                    user_input=user_input,
                    context=context,
                    original_error=original_error,
                    classifier_func=self.classifier,
                    available_children=self.children,
                )
                if outcome and outcome.success:
                    self.logger.info(
                        f"Remediation strategy '{resolved.name}' succeeded for {self.name}"
                    )
                    return outcome
                else:
                    self.logger.warning(
                        f"Remediation strategy '{resolved.name}' failed for {self.name}"
                    )
            except Exception as e:
                self.logger.error(
                    f"Remediation strategy '{resolved.name}' error for {self.name}: {type(e).__name__}: {str(e)}"
                )

        self.logger.error(f"All remediation strategies failed for {self.name}")
        return None

node_type property

Get the type of this node.

HandlerNode

Bases: TreeNode

Leaf node representing an executable handler with argument extraction and validation.

Source code in intent_kit/handlers/node.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
class HandlerNode(TreeNode):
    """Leaf node representing an executable handler with argument extraction and validation."""

    def __init__(
        self,
        name: Optional[str],
        param_schema: Dict[str, Type],
        handler: Callable[..., Any],
        arg_extractor: Callable[[str, Optional[Dict[str, Any]]], Dict[str, Any]],
        context_inputs: Optional[Set[str]] = None,
        context_outputs: Optional[Set[str]] = None,
        input_validator: Optional[Callable[[Dict[str, Any]], bool]] = None,
        output_validator: Optional[Callable[[Any], bool]] = None,
        description: str = "",
        parent: Optional["TreeNode"] = None,
        remediation_strategies: Optional[List[Union[str, RemediationStrategy]]] = None,
    ):
        """Create a handler leaf.

        Args:
            name: Node name (may be None).
            param_schema: Mapping of parameter name -> expected type, used by
                _validate_types to coerce/check extracted arguments.
            handler: Callable invoked with the validated parameters (plus
                ``context=`` when a context is supplied to execute()).
            arg_extractor: Callable that pulls parameters out of the raw user
                input, given the subset of context fields in ``context_inputs``.
            context_inputs: Context keys read before execution.
            context_outputs: Context keys this handler declares it writes.
            input_validator: Optional predicate over the extracted params.
            output_validator: Optional predicate over the handler output.
            description: Human-readable description.
            parent: Parent tree node, if any.
            remediation_strategies: Strategy IDs or objects tried, in order,
                when the handler call itself raises.
        """
        super().__init__(name=name, description=description, children=[], parent=parent)
        self.param_schema = param_schema
        self.handler = handler
        self.arg_extractor = arg_extractor
        self.context_inputs = context_inputs or set()
        self.context_outputs = context_outputs or set()
        self.input_validator = input_validator
        self.output_validator = output_validator
        self.context_dependencies = declare_dependencies(
            inputs=self.context_inputs,
            outputs=self.context_outputs,
            description=f"Context dependencies for intent '{self.name}'",
        )

        # Store remediation strategies
        self.remediation_strategies = remediation_strategies or []

    @property
    def node_type(self) -> NodeType:
        """Get the type of this node."""
        return NodeType.HANDLER

    def _exception_error(self, e: Exception) -> ExecutionError:
        """Wrap a raised exception in an ExecutionError attributed to this node."""
        return ExecutionError(
            error_type=type(e).__name__,
            message=str(e),
            node_name=self.name,
            node_path=self.get_path(),
        )

    def _failure_result(
        self,
        user_input: str,
        error: ExecutionError,
        params: Optional[Dict[str, Any]] = None,
        output: Any = None,
    ) -> ExecutionResult:
        """Build a failed ExecutionResult for this node (shared by all error paths)."""
        return ExecutionResult(
            success=False,
            node_name=self.name,
            node_path=self.get_path(),
            node_type=NodeType.HANDLER,
            input=user_input,
            output=output,
            error=error,
            params=params,
            children_results=[],
        )

    def execute(
        self, user_input: str, context: Optional[IntentContext] = None
    ) -> ExecutionResult:
        """Extract arguments, validate them, run the handler, validate output.

        Pipeline: argument extraction -> optional input validation -> type
        validation -> handler call (with remediation strategies on failure)
        -> optional output validation. Any stage failure returns a failed
        ExecutionResult rather than raising.
        """
        # 1) Argument extraction; requested context fields are copied into a
        #    plain dict so the extractor never touches the live context.
        try:
            context_dict: Optional[Dict[str, Any]] = None
            if context:
                context_dict = {
                    key: context.get(key)
                    for key in self.context_inputs
                    if context.has(key)
                }
            extracted_params = self.arg_extractor(user_input, context_dict or {})
        except Exception as e:
            self.logger.error(
                f"Argument extraction failed for intent '{self.name}' (Path: {'.'.join(self.get_path())}): {type(e).__name__}: {str(e)}"
            )
            return self._failure_result(user_input, self._exception_error(e))

        # 2) Optional caller-supplied input validation.
        if self.input_validator:
            try:
                if not self.input_validator(extracted_params):
                    self.logger.error(
                        f"Input validation failed for intent '{self.name}' (Path: {'.'.join(self.get_path())})"
                    )
                    return self._failure_result(
                        user_input,
                        ExecutionError(
                            error_type="InputValidationError",
                            message="Input validation failed",
                            node_name=self.name,
                            node_path=self.get_path(),
                        ),
                        params=extracted_params,
                    )
            except Exception as e:
                self.logger.error(
                    f"Input validation error for intent '{self.name}' (Path: {'.'.join(self.get_path())}): {type(e).__name__}: {str(e)}"
                )
                return self._failure_result(
                    user_input, self._exception_error(e), params=extracted_params
                )

        # 3) Type coercion/validation against param_schema.
        try:
            validated_params = self._validate_types(extracted_params)
        except Exception as e:
            self.logger.error(
                f"Type validation error for intent '{self.name}' (Path: {'.'.join(self.get_path())}): {type(e).__name__}: {str(e)}"
            )
            return self._failure_result(
                user_input, self._exception_error(e), params=extracted_params
            )

        # 4) Handler invocation; failures first go through remediation.
        try:
            if context is not None:
                output = self.handler(**validated_params, context=context)
            else:
                output = self.handler(**validated_params)
        except Exception as e:
            self.logger.error(
                f"Handler execution error for intent '{self.name}' (Path: {'.'.join(self.get_path())}): {type(e).__name__}: {str(e)}"
            )

            error = self._exception_error(e)
            remediation_result = self._execute_remediation_strategies(
                user_input=user_input,
                context=context,
                original_error=error,
                validated_params=validated_params,
            )
            if remediation_result:
                return remediation_result

            # If no remediation succeeded, return the original error.
            return self._failure_result(
                user_input, error, params=validated_params
            )

        # 5) Optional output validation; note the (possibly invalid) output
        #    is still included in the failure result for inspection.
        if self.output_validator:
            try:
                if not self.output_validator(output):
                    self.logger.error(
                        f"Output validation failed for intent '{self.name}' (Path: {'.'.join(self.get_path())})"
                    )
                    return self._failure_result(
                        user_input,
                        ExecutionError(
                            error_type="OutputValidationError",
                            message="Output validation failed",
                            node_name=self.name,
                            node_path=self.get_path(),
                        ),
                        params=validated_params,
                        output=output,
                    )
            except Exception as e:
                self.logger.error(
                    f"Output validation error for intent '{self.name}' (Path: {'.'.join(self.get_path())}): {type(e).__name__}: {str(e)}"
                )
                return self._failure_result(
                    user_input,
                    self._exception_error(e),
                    params=validated_params,
                    output=output,
                )

        return ExecutionResult(
            success=True,
            node_name=self.name,
            node_path=self.get_path(),
            node_type=NodeType.HANDLER,
            input=user_input,
            output=output,
            error=None,
            params=validated_params,
            children_results=[],
        )

    def _execute_remediation_strategies(
        self,
        user_input: str,
        context: Optional[IntentContext] = None,
        original_error: Optional[ExecutionError] = None,
        validated_params: Optional[Dict[str, Any]] = None,
    ) -> Optional[ExecutionResult]:
        """Execute remediation strategies in order until one succeeds.

        Returns the first successful ExecutionResult, or None when none are
        configured or all strategies fail.
        """
        if not self.remediation_strategies:
            return None

        for strategy_item in self.remediation_strategies:
            strategy: Optional[RemediationStrategy] = None

            if isinstance(strategy_item, str):
                # String ID - get from registry
                strategy = get_remediation_strategy(strategy_item)
                if not strategy:
                    self.logger.warning(
                        f"Remediation strategy '{strategy_item}' not found in registry"
                    )
                    continue
            elif isinstance(strategy_item, RemediationStrategy):
                # Direct strategy object
                strategy = strategy_item
            else:
                self.logger.warning(
                    f"Invalid remediation strategy type: {type(strategy_item)}"
                )
                continue

            try:
                result = strategy.execute(
                    node_name=self.name or "unknown",
                    user_input=user_input,
                    context=context,
                    original_error=original_error,
                    handler_func=self.handler,
                    validated_params=validated_params,
                )
                if result and result.success:
                    self.logger.info(
                        f"Remediation strategy '{strategy.name}' succeeded for {self.name}"
                    )
                    return result
                else:
                    self.logger.warning(
                        f"Remediation strategy '{strategy.name}' failed for {self.name}"
                    )
            except Exception as e:
                self.logger.error(
                    f"Remediation strategy '{strategy.name}' error for {self.name}: {type(e).__name__}: {str(e)}"
                )

        self.logger.error(f"All remediation strategies failed for {self.name}")
        return None

    def _validate_types(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Check/coerce params against param_schema.

        str values must already be str; int/float values are coerced with
        int()/float(); bool accepts real bools or the strings
        "true"/"1"/"yes"/"on" (case-insensitive). Raises Exception on any
        missing or unconvertible parameter. Keys not in param_schema are
        dropped from the returned dict.
        """
        validated_params = {}
        for param_name, expected_type in self.param_schema.items():
            if param_name not in params:
                self.logger.error(
                    f"Missing required parameter '{param_name}' for intent '{self.name}' (Path: {'.'.join(self.get_path())})"
                )
                raise Exception(f"Missing required parameter '{param_name}'")
            param_value = params[param_name]
            if isinstance(expected_type, type) and expected_type is str:
                # Strings are not coerced: a non-str value is an error.
                if not isinstance(param_value, str):
                    self.logger.error(
                        f"Parameter '{param_name}' must be a string, got {type(param_value).__name__} for intent '{self.name}' (Path: {'.'.join(self.get_path())})"
                    )
                    raise Exception(
                        f"Parameter '{param_name}' must be a string, got {type(param_value).__name__}"
                    )
            elif isinstance(expected_type, type) and expected_type is int:
                try:
                    param_value = int(param_value)
                except (ValueError, TypeError) as e:
                    self.logger.error(
                        f"Parameter '{param_name}' must be an integer, got {type(param_value).__name__} for intent '{self.name}' (Path: {'.'.join(self.get_path())}): {type(e).__name__}: {str(e)}"
                    )
                    raise Exception(
                        f"Parameter '{param_name}' must be an integer, got {type(param_value).__name__}: {type(e).__name__}: {str(e)}"
                    )
            elif isinstance(expected_type, type) and expected_type is float:
                try:
                    param_value = float(param_value)
                except (ValueError, TypeError) as e:
                    self.logger.error(
                        f"Parameter '{param_name}' must be a number, got {type(param_value).__name__} for intent '{self.name}' (Path: {'.'.join(self.get_path())}): {type(e).__name__}: {str(e)}"
                    )
                    raise Exception(
                        f"Parameter '{param_name}' must be a number, got {type(param_value).__name__}: {type(e).__name__}: {str(e)}"
                    )
            elif isinstance(expected_type, type) and expected_type is bool:
                # Strings are interpreted leniently; anything else must be a real bool.
                if isinstance(param_value, str):
                    param_value = param_value.lower() in ("true", "1", "yes", "on")
                elif not isinstance(param_value, bool):
                    self.logger.error(
                        f"Parameter '{param_name}' must be a boolean, got {type(param_value).__name__} for intent '{self.name}' (Path: {'.'.join(self.get_path())})"
                    )
                    raise Exception(
                        f"Parameter '{param_name}' must be a boolean, got {type(param_value).__name__}"
                    )
            validated_params[param_name] = param_value
        return validated_params

node_type property

Get the type of this node.

IntentContext

Thread-safe context object for sharing state between workflow steps.

Features:

- Field-level locking for concurrent access
- Complete audit trail of all operations
- Error tracking with detailed information
- Session-based isolation
- Type-safe field access

Source code in intent_kit/context/__init__.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
class IntentContext:
    """
    Thread-safe context object for sharing state between workflow steps.

    Features:
    - Field-level locking for concurrent access
    - Complete audit trail of all operations
    - Error tracking with detailed information
    - Session-based isolation
    - Type-safe field access
    """

    def __init__(self, session_id: Optional[str] = None, debug: bool = False):
        """
        Initialize a new IntentContext.

        Args:
            session_id: Unique identifier for this context session
            debug: Enable debug logging
        """
        # A random UUID is used when no (or a falsy) session id is supplied.
        self.session_id = session_id or str(uuid.uuid4())
        # key -> ContextField; each field carries its own lock and metadata.
        self._fields: Dict[str, ContextField] = {}
        # Audit trail of get/set/delete/clear operations (see _log_history).
        self._history: List[ContextHistoryEntry] = []
        # Errors recorded via add_error().
        self._errors: List[ContextErrorEntry] = []
        # Coarse lock guarding the three collections above.
        self._global_lock = Lock()
        self._debug = debug
        self.logger = Logger(__name__)

        if self._debug:
            self.logger.info(
                f"Created IntentContext with session_id: {self.session_id}"
            )

    def get(self, key: str, default: Any = None) -> Any:
        """
        Get a value from context with field-level locking.

        Args:
            key: The field key to retrieve
            default: Default value if key doesn't exist

        Returns:
            The field value or default
        """
        # Two-phase locking: the global lock protects the map lookup, the
        # per-field lock protects the value read.
        with self._global_lock:
            if key not in self._fields:
                if self._debug:
                    self.logger.debug(
                        f"Key '{key}' not found, returning default: {default}"
                    )
                self._log_history("get", key, default, None)
                return default
            field = self._fields[key]

        # NOTE(review): the global lock is released before the field lock is
        # taken, so a concurrent delete() can remove the key here and this
        # read will still return the last value — confirm that is acceptable.
        with field.lock:
            value = field.value
            if self._debug:
                self.logger.debug(f"Retrieved '{key}' = {value}")
            # NOTE(review): this appends to _history while holding only the
            # field lock (not _global_lock); CPython's GIL makes list.append
            # atomic, but confirm if other interpreters matter.
            self._log_history("get", key, value, None)
            return value

    def set(self, key: str, value: Any, modified_by: Optional[str] = None) -> None:
        """
        Set a value in context with field-level locking and history tracking.

        Args:
            key: The field key to set
            value: The value to store
            modified_by: Identifier for who/what modified this field
        """
        # The whole operation (create-or-update plus history entry) runs
        # under the global lock.
        with self._global_lock:
            if key not in self._fields:
                self._fields[key] = ContextField(value)
                # Set modified_by for new fields
                self._fields[key].modified_by = modified_by
                if self._debug:
                    self.logger.debug(f"Created new field '{key}' = {value}")
            else:
                # Existing field: also take its per-field lock before mutating.
                field = self._fields[key]
                with field.lock:
                    old_value = field.value
                    field.value = value
                    field.last_modified = datetime.now()
                    field.modified_by = modified_by
                    if self._debug:
                        self.logger.debug(
                            f"Updated field '{key}' from {old_value} to {value}"
                        )

            self._log_history("set", key, value, modified_by)

    def delete(self, key: str, modified_by: Optional[str] = None) -> bool:
        """
        Delete a field from context.

        Args:
            key: The field key to delete
            modified_by: Identifier for who/what deleted this field

        Returns:
            True if field was deleted, False if it didn't exist
        """
        # Note: the attempt is logged to history even when the key is absent.
        with self._global_lock:
            if key not in self._fields:
                if self._debug:
                    self.logger.debug(f"Attempted to delete non-existent key '{key}'")
                self._log_history("delete", key, None, modified_by)
                return False

            del self._fields[key]
            if self._debug:
                self.logger.debug(f"Deleted field '{key}'")
            self._log_history("delete", key, None, modified_by)
            return True

    def has(self, key: str) -> bool:
        """
        Check if a field exists in context.

        Args:
            key: The field key to check

        Returns:
            True if field exists, False otherwise
        """
        with self._global_lock:
            return key in self._fields

    def keys(self) -> Set[str]:
        """
        Get all field keys in the context.

        Returns:
            Set of all field keys
        """
        # Returns a snapshot copy, not a live view.
        with self._global_lock:
            return set(self._fields.keys())

    def get_history(
        self, key: Optional[str] = None, limit: Optional[int] = None
    ) -> List[ContextHistoryEntry]:
        """
        Get the history of context operations.

        Args:
            key: Filter history to specific key (optional)
            limit: Maximum number of entries to return (optional)

        Returns:
            List of history entries
        """
        with self._global_lock:
            if key:
                filtered_history = [
                    entry for entry in self._history if entry.key == key
                ]
            else:
                filtered_history = self._history.copy()

            # limit keeps the *most recent* entries; a limit of 0 is falsy
            # and therefore treated as "no limit".
            if limit:
                filtered_history = filtered_history[-limit:]

            return filtered_history

    def get_field_metadata(self, key: str) -> Optional[Dict[str, Any]]:
        """
        Get metadata for a specific field.

        Args:
            key: The field key

        Returns:
            Dictionary with field metadata or None if field doesn't exist
        """
        with self._global_lock:
            if key not in self._fields:
                return None

            # Read without the per-field lock; the global lock prevents the
            # field being replaced, though a concurrent set() on this field
            # could interleave — values here are a best-effort snapshot.
            field = self._fields[key]
            return {
                "created_at": field.created_at,
                "last_modified": field.last_modified,
                "modified_by": field.modified_by,
                "value": field.value,
            }

    def clear(self, modified_by: Optional[str] = None) -> None:
        """
        Clear all fields from context.

        Args:
            modified_by: Identifier for who/what cleared the context
        """
        # History and errors are preserved; only the field store is emptied.
        with self._global_lock:
            keys = list(self._fields.keys())
            self._fields.clear()
            if self._debug:
                self.logger.debug(f"Cleared all fields: {keys}")
            self._log_history("clear", "ALL", None, modified_by)

    def _log_history(
        self, action: str, key: str, value: Any, modified_by: Optional[str]
    ) -> None:
        """Log an operation to the history."""
        # Callers are expected to hold an appropriate lock; get() calls this
        # under only the field lock (see NOTE there).
        entry = ContextHistoryEntry(
            timestamp=datetime.now(),
            action=action,
            key=key,
            value=value,
            modified_by=modified_by,
            session_id=self.session_id,
        )
        self._history.append(entry)

    def add_error(
        self,
        node_name: str,
        user_input: str,
        error_message: str,
        error_type: str,
        params: Optional[Dict[str, Any]] = None,
    ) -> None:
        """
        Add an error to the context error log.

        Args:
            node_name: Name of the node where the error occurred
            user_input: The user input that caused the error
            error_message: The error message
            error_type: The type of error
            params: Optional parameters that were being processed
        """
        with self._global_lock:
            error_entry = ContextErrorEntry(
                timestamp=datetime.now(),
                node_name=node_name,
                user_input=user_input,
                error_message=error_message,
                error_type=error_type,
                # format_exc() captures the traceback of the exception being
                # handled; it is only meaningful when called from an except
                # block — otherwise it records a placeholder.
                stack_trace=traceback.format_exc(),
                params=params,
                session_id=self.session_id,
            )
            self._errors.append(error_entry)

            if self._debug:
                self.logger.error(
                    f"Added error to context: {node_name}: {error_message}"
                )

    def get_errors(
        self, node_name: Optional[str] = None, limit: Optional[int] = None
    ) -> List[ContextErrorEntry]:
        """
        Get errors from the context error log.

        Args:
            node_name: Filter errors by node name (optional)
            limit: Maximum number of errors to return (optional)

        Returns:
            List of error entries
        """
        with self._global_lock:
            filtered_errors = self._errors.copy()

            if node_name:
                filtered_errors = [
                    error for error in filtered_errors if error.node_name == node_name
                ]

            # As in get_history(), limit keeps the most recent entries and
            # a limit of 0 is treated as "no limit".
            if limit:
                filtered_errors = filtered_errors[-limit:]

            return filtered_errors

    def clear_errors(self) -> None:
        """Clear all errors from the context."""
        with self._global_lock:
            error_count = len(self._errors)
            self._errors.clear()
            if self._debug:
                self.logger.debug(f"Cleared {error_count} errors from context")

    def error_count(self) -> int:
        """Get the total number of errors in the context."""
        with self._global_lock:
            return len(self._errors)

    def __str__(self) -> str:
        """String representation of the context."""
        # Snapshot the counts under the lock, then format outside it.
        with self._global_lock:
            field_count = len(self._fields)
            history_count = len(self._history)
            error_count = len(self._errors)

        return f"IntentContext(session_id={self.session_id}, fields={field_count}, history={history_count}, errors={error_count})"

    def __repr__(self) -> str:
        """Detailed string representation of the context."""
        return self.__str__()

__init__(session_id=None, debug=False)

Initialize a new IntentContext.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session_id` | `Optional[str]` | Unique identifier for this context session | `None` |
| `debug` | `bool` | Enable debug logging | `False` |
Source code in intent_kit/context/__init__.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def __init__(self, session_id: Optional[str] = None, debug: bool = False):
    """Set up an empty, lock-protected context.

    Args:
        session_id: Unique identifier for this context session; a random
            UUID is generated when omitted (or falsy).
        debug: Enable debug logging.
    """
    self._fields: Dict[str, ContextField] = {}
    self._history: List[ContextHistoryEntry] = []
    self._errors: List[ContextErrorEntry] = []
    self._global_lock = Lock()
    self._debug = debug
    self.session_id = session_id or str(uuid.uuid4())
    self.logger = Logger(__name__)

    if self._debug:
        self.logger.info(
            f"Created IntentContext with session_id: {self.session_id}"
        )

__repr__()

Detailed string representation of the context.

Source code in intent_kit/context/__init__.py
352
353
354
def __repr__(self) -> str:
    """Detailed string representation of the context."""
    return self.__str__()

__str__()

String representation of the context.

Source code in intent_kit/context/__init__.py
343
344
345
346
347
348
349
350
def __str__(self) -> str:
    """String representation of the context."""
    with self._global_lock:
        field_count = len(self._fields)
        history_count = len(self._history)
        error_count = len(self._errors)

    return f"IntentContext(session_id={self.session_id}, fields={field_count}, history={history_count}, errors={error_count})"

add_error(node_name, user_input, error_message, error_type, params=None)

Add an error to the context error log.

Parameters:

Name Type Description Default
node_name str

Name of the node where the error occurred

required
user_input str

The user input that caused the error

required
error_message str

The error message

required
error_type str

The type of error

required
params Optional[Dict[str, Any]]

Optional parameters that were being processed

None
Source code in intent_kit/context/__init__.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
def add_error(
    self,
    node_name: str,
    user_input: str,
    error_message: str,
    error_type: str,
    params: Optional[Dict[str, Any]] = None,
) -> None:
    """Record a failure in the context's error log.

    Args:
        node_name: Name of the node where the error occurred.
        user_input: The user input that caused the error.
        error_message: The error message.
        error_type: The type of error.
        params: Optional parameters that were being processed.
    """
    # Build the entry outside the lock; only the append needs protection.
    entry = ContextErrorEntry(
        timestamp=datetime.now(),
        node_name=node_name,
        user_input=user_input,
        error_message=error_message,
        error_type=error_type,
        stack_trace=traceback.format_exc(),
        params=params,
        session_id=self.session_id,
    )
    with self._global_lock:
        self._errors.append(entry)

        if self._debug:
            self.logger.error(
                f"Added error to context: {node_name}: {error_message}"
            )

clear(modified_by=None)

Clear all fields from context.

Parameters:

Name Type Description Default
modified_by Optional[str]

Identifier for who/what cleared the context

None
Source code in intent_kit/context/__init__.py
240
241
242
243
244
245
246
247
248
249
250
251
252
def clear(self, modified_by: Optional[str] = None) -> None:
    """Remove every field from the context and record the operation.

    Args:
        modified_by: Identifier for who/what cleared the context.
    """
    with self._global_lock:
        keys = list(self._fields)
        self._fields.clear()
        if self._debug:
            self.logger.debug(f"Cleared all fields: {keys}")
        # A single "clear ALL" history entry stands in for the individual keys.
        self._log_history("clear", "ALL", None, modified_by)

clear_errors()

Clear all errors from the context.

Source code in intent_kit/context/__init__.py
330
331
332
333
334
335
336
def clear_errors(self) -> None:
    """Empty the error log, logging how many entries were removed when debugging."""
    with self._global_lock:
        error_count = len(self._errors)
        # Slice assignment clears in place, preserving list identity.
        self._errors[:] = []
        if self._debug:
            self.logger.debug(f"Cleared {error_count} errors from context")

delete(key, modified_by=None)

Delete a field from context.

Parameters:

Name Type Description Default
key str

The field key to delete

required
modified_by Optional[str]

Identifier for who/what deleted this field

None

Returns:

Type Description
bool

True if field was deleted, False if it didn't exist

Source code in intent_kit/context/__init__.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def delete(self, key: str, modified_by: Optional[str] = None) -> bool:
    """Remove a single field from the context.

    Args:
        key: The field key to delete.
        modified_by: Identifier for who/what deleted this field.

    Returns:
        True if field was deleted, False if it didn't exist.
    """
    with self._global_lock:
        existed = key in self._fields
        if existed:
            del self._fields[key]
            if self._debug:
                self.logger.debug(f"Deleted field '{key}'")
        elif self._debug:
            self.logger.debug(f"Attempted to delete non-existent key '{key}'")
        # History records the attempt whether or not the key existed.
        self._log_history("delete", key, None, modified_by)
        return existed

error_count()

Get the total number of errors in the context.

Source code in intent_kit/context/__init__.py
338
339
340
341
def error_count(self) -> int:
    """Return the number of error entries currently recorded."""
    with self._global_lock:
        count = len(self._errors)
    return count

get(key, default=None)

Get a value from context with field-level locking.

Parameters:

Name Type Description Default
key str

The field key to retrieve

required
default Any

Default value if key doesn't exist

None

Returns:

Type Description
Any

The field value or default

Source code in intent_kit/context/__init__.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def get(self, key: str, default: Any = None) -> Any:
    """Fetch a field's value, falling back to *default* when absent.

    Args:
        key: The field key to retrieve.
        default: Default value if key doesn't exist.

    Returns:
        The field value or default.
    """
    # Global lock only guards the dict lookup; the field's own lock guards
    # the value read, so unrelated fields don't block each other.
    with self._global_lock:
        field = self._fields.get(key)
        if field is None:
            if self._debug:
                self.logger.debug(
                    f"Key '{key}' not found, returning default: {default}"
                )
            self._log_history("get", key, default, None)
            return default

    with field.lock:
        value = field.value
        if self._debug:
            self.logger.debug(f"Retrieved '{key}' = {value}")
        self._log_history("get", key, value, None)
        return value

get_errors(node_name=None, limit=None)

Get errors from the context error log.

Parameters:

Name Type Description Default
node_name Optional[str]

Filter errors by node name (optional)

None
limit Optional[int]

Maximum number of errors to return (optional)

None

Returns:

Type Description
List[ContextErrorEntry]

List of error entries

Source code in intent_kit/context/__init__.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
def get_errors(
    self, node_name: Optional[str] = None, limit: Optional[int] = None
) -> List[ContextErrorEntry]:
    """
    Get errors from the context error log.

    Args:
        node_name: Filter errors by node name (optional)
        limit: Maximum number of errors to return (optional); the most
            recent entries are kept. A limit of 0 returns an empty list.

    Returns:
        List of error entries
    """
    with self._global_lock:
        filtered_errors = self._errors.copy()

        if node_name:
            filtered_errors = [
                error for error in filtered_errors if error.node_name == node_name
            ]

        # Test against None (not truthiness): an explicit limit of 0 must
        # return an empty list rather than silently disabling the limit.
        if limit is not None:
            filtered_errors = filtered_errors[-limit:] if limit > 0 else []

        return filtered_errors

get_field_metadata(key)

Get metadata for a specific field.

Parameters:

Name Type Description Default
key str

The field key

required

Returns:

Type Description
Optional[Dict[str, Any]]

Dictionary with field metadata or None if field doesn't exist

Source code in intent_kit/context/__init__.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
def get_field_metadata(self, key: str) -> Optional[Dict[str, Any]]:
    """Return bookkeeping metadata for one field, or None when absent.

    Args:
        key: The field key.

    Returns:
        Dictionary with field metadata or None if field doesn't exist.
    """
    with self._global_lock:
        field = self._fields.get(key)
        if field is None:
            return None
        return {
            "created_at": field.created_at,
            "last_modified": field.last_modified,
            "modified_by": field.modified_by,
            "value": field.value,
        }

get_history(key=None, limit=None)

Get the history of context operations.

Parameters:

Name Type Description Default
key Optional[str]

Filter history to specific key (optional)

None
limit Optional[int]

Maximum number of entries to return (optional)

None

Returns:

Type Description
List[ContextHistoryEntry]

List of history entries

Source code in intent_kit/context/__init__.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def get_history(
    self, key: Optional[str] = None, limit: Optional[int] = None
) -> List[ContextHistoryEntry]:
    """
    Get the history of context operations.

    Args:
        key: Filter history to specific key (optional)
        limit: Maximum number of entries to return (optional); the most
            recent entries are kept. A limit of 0 returns an empty list.

    Returns:
        List of history entries
    """
    with self._global_lock:
        if key:
            filtered_history = [
                entry for entry in self._history if entry.key == key
            ]
        else:
            filtered_history = self._history.copy()

        # Test against None (not truthiness): an explicit limit of 0 must
        # return an empty list rather than silently disabling the limit.
        if limit is not None:
            filtered_history = filtered_history[-limit:] if limit > 0 else []

        return filtered_history

has(key)

Check if a field exists in context.

Parameters:

Name Type Description Default
key str

The field key to check

required

Returns:

Type Description
bool

True if field exists, False otherwise

Source code in intent_kit/context/__init__.py
169
170
171
172
173
174
175
176
177
178
179
180
def has(self, key: str) -> bool:
    """Report whether *key* currently exists in the context.

    Args:
        key: The field key to check.

    Returns:
        True if field exists, False otherwise.
    """
    with self._global_lock:
        found = key in self._fields
    return found

keys()

Get all field keys in the context.

Returns:

Type Description
Set[str]

Set of all field keys

Source code in intent_kit/context/__init__.py
182
183
184
185
186
187
188
189
190
def keys(self) -> Set[str]:
    """Snapshot of all field keys currently stored in the context.

    Returns:
        Set of all field keys.
    """
    with self._global_lock:
        return {key for key in self._fields}

set(key, value, modified_by=None)

Set a value in context with field-level locking and history tracking.

Parameters:

Name Type Description Default
key str

The field key to set

required
value Any

The value to store

required
modified_by Optional[str]

Identifier for who/what modified this field

None
Source code in intent_kit/context/__init__.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def set(self, key: str, value: Any, modified_by: Optional[str] = None) -> None:
    """Create or update a field, recording the change in history.

    Args:
        key: The field key to set.
        value: The value to store.
        modified_by: Identifier for who/what modified this field.
    """
    with self._global_lock:
        field = self._fields.get(key)
        if field is None:
            # New field: construct it and stamp the author.
            new_field = ContextField(value)
            new_field.modified_by = modified_by
            self._fields[key] = new_field
            if self._debug:
                self.logger.debug(f"Created new field '{key}' = {value}")
        else:
            # Existing field: mutate under its own lock.
            with field.lock:
                old_value = field.value
                field.value = value
                field.last_modified = datetime.now()
                field.modified_by = modified_by
                if self._debug:
                    self.logger.debug(
                        f"Updated field '{key}' from {old_value} to {value}"
                    )

        self._log_history("set", key, value, modified_by)

IntentGraph

The root-level dispatcher for user input.

The graph contains root nodes that can handle different types of intents. Input splitting happens in isolation and routes to appropriate root nodes. Trees emerge naturally from the parent-child relationships between nodes.

Source code in intent_kit/graph/intent_graph.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
class IntentGraph:
    """
    The root-level dispatcher for user input.

    The graph contains root nodes that can handle different types of intents.
    Input splitting happens in isolation and routes to appropriate root nodes.
    Trees emerge naturally from the parent-child relationships between nodes.
    """

    def __init__(
        self,
        root_nodes: Optional[List[TreeNode]] = None,
        splitter: Optional[SplitterFunction] = None,
        visualize: bool = False,
        llm_config: Optional[dict] = None,
        debug_context: bool = False,
        context_trace: bool = False,
    ):
        """Build an IntentGraph dispatcher.

        Args:
            root_nodes: List of root nodes that can handle intents.
            splitter: Function to use for splitting intents; when omitted a
                pass-through splitter that never splits is installed.
            visualize: If True, render the final output to an interactive graph HTML file.
            llm_config: LLM configuration for chunk classification (optional).
            debug_context: If True, enable context debugging and state tracking.
            context_trace: If True, enable detailed context tracing with timestamps.
        """
        self.logger = Logger(__name__)
        self.visualize = visualize
        self.llm_config = llm_config
        self.debug_context = debug_context
        self.context_trace = context_trace
        self.root_nodes: List[TreeNode] = root_nodes or []

        if splitter is not None:
            self.splitter: SplitterFunction = splitter
        else:

            def pass_through_splitter(
                user_input: str, debug: bool = False
            ) -> List[IntentChunk]:
                """Pass-through splitter that doesn't split the input."""
                return [user_input]

            self.splitter = pass_through_splitter

    def add_root_node(self, root_node: TreeNode, validate: bool = True) -> None:
        """Append a root node, optionally validating the resulting graph.

        Args:
            root_node: The root node to add.
            validate: Whether to validate the graph after adding the node.

        Raises:
            ValueError: If root_node is not a TreeNode.
            GraphValidationError: If post-add validation fails (the node is
                removed again before re-raising).
        """
        if not isinstance(root_node, TreeNode):
            raise ValueError("Root node must be a TreeNode")

        self.root_nodes.append(root_node)
        self.logger.info(f"Added root node: {root_node.name}")

        if not validate:
            return

        try:
            self.validate_graph()
        except GraphValidationError as e:
            self.logger.error(
                f"Graph validation failed after adding root node: {e.message}"
            )
            # Roll back the insertion so a failed validation leaves the graph untouched.
            self.root_nodes.remove(root_node)
            raise e
        else:
            self.logger.info("Graph validation passed after adding root node")

    def remove_root_node(self, root_node: TreeNode) -> None:
        """Remove a root node from the graph, warning when it is absent.

        Args:
            root_node: The root node to remove.
        """
        try:
            self.root_nodes.remove(root_node)
        except ValueError:
            self.logger.warning(f"Root node '{root_node.name}' not found for removal")
        else:
            self.logger.info(f"Removed root node: {root_node.name}")

    def list_root_nodes(self) -> List[str]:
        """Return the names of all registered root nodes, in order.

        Returns:
            List of root node names.
        """
        names: List[str] = []
        for root in self.root_nodes:
            names.append(root.name)
        return names

    def validate_graph(
        self, validate_routing: bool = True, validate_types: bool = True
    ) -> Dict[str, Any]:
        """Validate the graph structure and routing constraints.

        Args:
            validate_routing: Whether to validate splitter-to-classifier routing.
            validate_types: Whether to validate node types.

        Returns:
            Dictionary containing validation results and statistics.

        Raises:
            GraphValidationError: If validation fails.
        """
        self.logger.info("Validating graph structure...")

        # Gather every reachable node from each root.
        all_nodes = []
        for root in self.root_nodes:
            all_nodes += self._collect_all_nodes([root])

        if validate_types:
            validate_node_types(all_nodes)
        if validate_routing:
            validate_splitter_routing(all_nodes)

        # Comprehensive stats are computed regardless of the two flags above.
        stats = validate_graph_structure(all_nodes)

        self.logger.info("Graph validation completed successfully")
        return stats

    def validate_splitter_routing(self) -> None:
        """Check that every splitter node routes only to classifier nodes.

        Raises:
            GraphValidationError: If any splitter node routes to a non-classifier node.
        """
        collected: list = []
        for root in self.root_nodes:
            collected.extend(self._collect_all_nodes([root]))

        # Delegates to the module-level validator of the same name.
        validate_splitter_routing(collected)

    def _collect_all_nodes(self, nodes: List[TreeNode]) -> List[TreeNode]:
        """Walk the graph from *nodes* and return each reachable node exactly once.

        Iterative preorder depth-first traversal; node_id de-duplicates shared
        subtrees and guards against cycles.
        """
        collected: List[TreeNode] = []
        seen = set()
        # Push roots reversed so pops come off in the original order.
        stack = list(reversed(nodes))

        while stack:
            node = stack.pop()
            if node.node_id in seen:
                continue
            seen.add(node.node_id)
            collected.append(node)
            # Children reversed as well, preserving left-to-right visitation.
            stack.extend(reversed(node.children))

        return collected

    def _call_splitter(
        self,
        user_input: str,
        debug: bool,
        context: Optional[IntentContext] = None,
        **splitter_kwargs,
    ) -> list:
        """Invoke the configured splitter and normalize its output.

        Args:
            user_input: The input string to process.
            debug: Whether to enable debug logging.
            context: Optional context object (accepted but not passed to the splitter).
            **splitter_kwargs: Additional arguments for the splitter.

        Returns:
            List of intent chunks.
        """
        chunks = self.splitter(user_input, debug, **splitter_kwargs)
        # The splitter may return any Sequence; unpack into a concrete list.
        return [*chunks]

    def _route_chunk_to_root_node(
        self, chunk: str, debug: bool = False
    ) -> Optional[TreeNode]:
        """Pick the root node best matched to a single chunk.

        Matching is case-insensitive: first by the node's name appearing in
        the chunk, then by any declared keywords; otherwise the first root
        node is used as a fallback.

        Args:
            chunk: The intent chunk to route.
            debug: Whether to enable debug logging.

        Returns:
            The root node to handle this chunk, or None if no match found.
        """
        if not self.root_nodes:
            return None

        lowered = chunk.lower()

        for candidate in self.root_nodes:
            # Direct name match takes priority over keywords.
            if candidate.name.lower() in lowered:
                if debug:
                    self.logger.info(
                        f"Routed chunk '{chunk}' to root node '{candidate.name}' by name match"
                    )
                return candidate

            # Keywords are optional per node; absent means no keyword routing.
            for keyword in getattr(candidate, "keywords", []):
                if keyword.lower() in lowered:
                    if debug:
                        self.logger.info(
                            f"Routed chunk '{chunk}' to root node '{candidate.name}' by keyword '{keyword}'"
                        )
                    return candidate

        fallback = self.root_nodes[0]
        if debug:
            self.logger.info(
                f"No specific match for chunk '{chunk}', using first root node '{fallback.name}' as fallback"
            )
        return fallback

    def route(
        self,
        user_input: str,
        context: Optional[IntentContext] = None,
        debug: bool = False,
        debug_context: Optional[bool] = None,
        context_trace: Optional[bool] = None,
        **splitter_kwargs,
    ) -> ExecutionResult:
        """
        Route user input through the graph with optional context support.

        Args:
            user_input: The input string to process
            context: Optional context object for state sharing
            debug: Whether to print debug information
            debug_context: Override graph-level debug_context setting
            context_trace: Override graph-level context_trace setting
            **splitter_kwargs: Additional arguments to pass to the splitter

        Returns:
            ExecutionResult containing aggregated results and errors from all matched taxonomies
        """
        # Use method parameters if provided, otherwise use graph-level settings
        debug_context_enabled = (
            debug_context if debug_context is not None else self.debug_context
        )
        context_trace_enabled = (
            context_trace if context_trace is not None else self.context_trace
        )

        if debug:
            self.logger.info(f"Processing input: {user_input}")
            if context:
                self.logger.info(f"Using context: {context}")
            if debug_context_enabled:
                self.logger.info("Context debugging enabled")
            if context_trace_enabled:
                self.logger.info("Context tracing enabled")

        # Split the input into chunks
        try:
            intent_chunks = self._call_splitter(
                user_input=user_input, debug=debug, **splitter_kwargs
            )

        except Exception as e:
            self.logger.error(f"Splitter error: {e}")
            return ExecutionResult(
                success=False,
                params=None,
                children_results=[],
                node_name="splitter",
                node_path=[],
                node_type=NodeType.SPLITTER,
                input=user_input,
                output=None,
                error=ExecutionError(
                    error_type="SplitterError",
                    message=str(e),
                    node_name="splitter",
                    node_path=[],
                ),
            )

        if debug:
            self.logger.info(f"Intent chunks: {intent_chunks}")

        # If no chunks were found, return error
        if not intent_chunks:
            if debug:
                self.logger.warning("No intent chunks found")
            return ExecutionResult(
                success=False,
                params=None,
                children_results=[],
                node_name="no_intent",
                node_path=[],
                node_type=NodeType.UNHANDLED_CHUNK,
                input=user_input,
                output=None,
                error=ExecutionError(
                    error_type="NoIntentFound",
                    message="No intent chunks found",
                    node_name="unhandled_chunk",
                    node_path=[],
                ),
            )

        # Route each chunk to an appropriate root node
        children_results = []
        all_errors = []
        all_outputs = []
        all_params = []

        # Use a queue to process chunks, with recursion limit
        chunks_to_process = list(intent_chunks)  # Copy the list
        processed_chunks: set = set()  # Track processed chunks to avoid infinite loops
        max_recursion_depth = 10  # Prevent infinite recursion

        while chunks_to_process and len(processed_chunks) < max_recursion_depth:
            chunk = chunks_to_process.pop(0)

            # Create a unique identifier for this chunk to avoid infinite loops
            # Use a robust hashing strategy that works for both hashable (e.g. str)
            # and unhashable (e.g. dict) chunk objects.
            # Converting to `repr` preserves enough uniqueness while ensuring the
            # object is hashable.
            chunk_id = hash(repr(chunk))
            if chunk_id in processed_chunks:
                continue  # Skip if we've already processed this exact chunk
            processed_chunks.add(chunk_id)

            # Handle both string and dict chunks
            if isinstance(chunk, str):
                chunk_text = chunk
            elif isinstance(chunk, dict) and "text" in chunk:
                chunk_text = chunk["text"]
            else:
                chunk_text = str(chunk)

            if debug:
                self.logger.info(f"Classifying chunk: '{chunk_text}'")
            classification = classify_intent_chunk(chunk, self.llm_config)
            action = classification.get("action")

            if action == IntentAction.HANDLE:
                # Route to root node as before
                root_node = self._route_chunk_to_root_node(chunk_text, debug)
                if root_node is None:
                    error_result = ExecutionResult(
                        success=False,
                        params=None,
                        children_results=[],
                        node_name="no_root_node",
                        node_path=[],
                        node_type=NodeType.UNKNOWN,
                        input=chunk_text,
                        output=None,
                        error=ExecutionError(
                            error_type="NoRootNodeFound",
                            message=f"No root node found for chunk: '{chunk_text}'",
                            node_name="no_root_node",
                            node_path=[],
                        ),
                    )
                    children_results.append(error_result)
                    all_errors.append(f"No root node found for chunk: '{chunk_text}'")
                    if debug:
                        self.logger.error(
                            f"No root node found for chunk: '{chunk_text}'"
                        )
                    continue
                try:
                    # Context debugging: capture state before execution
                    context_state_before = None
                    if debug_context_enabled and context:
                        context_state_before = self._capture_context_state(
                            context, f"before_{root_node.name}"
                        )

                    result = root_node.execute(chunk_text, context=context)

                    # Context debugging: capture state after execution
                    if debug_context_enabled and context:
                        context_state_after = self._capture_context_state(
                            context, f"after_{root_node.name}"
                        )
                        self._log_context_changes(
                            context_state_before,
                            context_state_after,
                            root_node.name,
                            debug,
                            context_trace_enabled,
                        )

                    if debug:
                        self.logger.info(
                            f"Root node '{root_node.name}' result: {result}"
                        )
                    children_results.append(result)
                    if result.success and result.output is not None:
                        all_outputs.append(result.output)
                    if result.params is not None:
                        all_params.append(result.params)
                    if result.error:
                        all_errors.append(
                            f"Root node '{root_node.name}': {result.error.message}"
                        )
                except Exception as e:
                    error_message = str(e)
                    error_type = type(e).__name__
                    error_result = ExecutionResult(
                        success=False,
                        params=None,
                        children_results=[],
                        node_name="unknown",
                        node_path=[],
                        node_type=NodeType.UNKNOWN,
                        input=chunk_text,
                        output=None,
                        error=ExecutionError(
                            error_type=error_type,
                            message=error_message,
                            node_name="unknown",
                            node_path=[],
                        ),
                    )
                    children_results.append(error_result)
                    all_errors.append(
                        f"Root node '{root_node.name}' failed: {error_message}"
                    )
                    if debug:
                        self.logger.error(f"Root node '{root_node.name}' failed: {e}")
            elif action == IntentAction.SPLIT:
                # Recursively split and route
                if debug:
                    self.logger.info(f"Recursively splitting chunk: '{chunk_text}'")
                sub_chunks = self._call_splitter(chunk_text, debug, **splitter_kwargs)
                # Add sub_chunks to the front of the queue for processing
                chunks_to_process = sub_chunks + chunks_to_process
            elif action == IntentAction.CLARIFY:
                # Stub: Add a result indicating clarification is needed
                error_result = ExecutionResult(
                    success=False,
                    params=None,
                    children_results=[],
                    node_name="clarify",
                    node_path=[],
                    node_type=NodeType.CLARIFY,
                    input=chunk_text,
                    output=None,
                    error=ExecutionError(
                        error_type="ClarificationNeeded",
                        message=f"Clarification needed for chunk: '{chunk_text}'",
                        node_name="clarify",
                        node_path=[],
                    ),
                )
                children_results.append(error_result)
                all_errors.append(f"Clarification needed for chunk: '{chunk_text}'")
                if debug:
                    self.logger.warning(
                        f"Clarification needed for chunk: '{chunk_text}'"
                    )
            elif action == IntentAction.REJECT:
                # Stub: Add a result indicating rejection
                error_result = ExecutionResult(
                    success=False,
                    params=None,
                    children_results=[],
                    node_name="reject",
                    node_path=[],
                    node_type=NodeType.UNKNOWN,
                    input=chunk_text,
                    output=None,
                    error=ExecutionError(
                        error_type="RejectedChunk",
                        message=f"Rejected chunk: '{chunk_text}'",
                        node_name="reject",
                        node_path=[],
                    ),
                )
                children_results.append(error_result)
                all_errors.append(f"Rejected chunk: '{chunk_text}'")
                if debug:
                    self.logger.warning(f"Rejected chunk: '{chunk_text}'")
            else:
                # Unknown action
                error_result = ExecutionResult(
                    success=False,
                    params=None,
                    children_results=[],
                    node_name="unknown_action",
                    node_path=[],
                    node_type=NodeType.UNKNOWN,
                    input=chunk_text,
                    output=None,
                    error=ExecutionError(
                        error_type="UnknownAction",
                        message=f"Unknown action for chunk: '{chunk_text}'",
                        node_name="unknown_action",
                        node_path=[],
                    ),
                )
                children_results.append(error_result)
                all_errors.append(f"Unknown action for chunk: '{chunk_text}'")
                if debug:
                    self.logger.error(f"Unknown action for chunk: '{chunk_text}'")

        # Check if we hit the recursion limit
        if len(processed_chunks) >= max_recursion_depth:
            error_result = ExecutionResult(
                success=False,
                params=None,
                children_results=[],
                node_name="recursion_limit",
                node_path=[],
                node_type=NodeType.UNKNOWN,
                input=user_input,
                output=None,
                error=ExecutionError(
                    error_type="RecursionLimitExceeded",
                    message=f"Recursion limit exceeded ({max_recursion_depth} chunks processed)",
                    node_name="recursion_limit",
                    node_path=[],
                ),
            )
            children_results.append(error_result)
            all_errors.append(
                f"Recursion limit exceeded ({max_recursion_depth} chunks processed)"
            )
            if debug:
                self.logger.error(
                    f"Recursion limit exceeded ({max_recursion_depth} chunks processed)"
                )

        # Determine overall success and create aggregated result
        overall_success = len(all_errors) == 0 and len(children_results) > 0

        # Aggregate outputs and params
        aggregated_output = (
            all_outputs
            if len(all_outputs) > 1
            else (all_outputs[0] if all_outputs else None)
        )
        aggregated_params = (
            all_params
            if len(all_params) > 1
            else (all_params[0] if all_params else None)
        )

        # Ensure params is a dict or None
        if aggregated_params is not None and not isinstance(aggregated_params, dict):
            aggregated_params = {"params": aggregated_params}

        # Create aggregated error if there are any errors
        aggregated_error = None
        if all_errors:
            aggregated_error = ExecutionError(
                error_type="AggregatedErrors",
                message="; ".join(all_errors),
                node_name="intent_graph",
                node_path=[],
            )

        # Create visualization if requested
        visualization_html = None
        if self.visualize:
            try:
                html_path = self._render_execution_graph(children_results, user_input)
                visualization_html = html_path
            except Exception as e:
                self.logger.error(f"Visualization failed: {e}")
                visualization_html = None

        # Add visualization to output if available
        if visualization_html:
            if aggregated_output is None:
                aggregated_output = {"visualization_html": visualization_html}
            elif isinstance(aggregated_output, dict):
                aggregated_output["visualization_html"] = visualization_html
            else:
                aggregated_output = {
                    "output": aggregated_output,
                    "visualization_html": visualization_html,
                }

        if debug:
            self.logger.info(f"Final aggregated result: {overall_success}")

        return ExecutionResult(
            success=overall_success,
            params=aggregated_params,
            children_results=children_results,
            node_name="intent_graph",
            node_path=[],
            node_type=NodeType.GRAPH,
            input=user_input,
            output=aggregated_output,
            error=aggregated_error,
        )

    def _render_execution_graph(
        self, children_results: list[ExecutionResult], user_input: str
    ) -> str:
        """
        Render the execution path as an interactive HTML graph and return the file path.

        Args:
            children_results: Per-chunk ExecutionResults; each result and its
                recursive children become one graph node apiece, linked in
                visitation order.
            user_input: The original user input; its hash is used to build the
                output filename.

        Returns:
            Filesystem path of the generated HTML file (under
            ``./intentkit_graphs`` in the current working directory).

        Raises:
            ImportError: If the optional visualization dependencies
                (networkx/pyvis) are not installed.
            Exception: Any error from graph construction or file writing is
                logged and re-raised.
        """
        if not VIZ_AVAILABLE:
            raise ImportError(
                "networkx and pyvis are required for visualization. Please install with: uv pip install 'intent-kit[viz]'"
            )

        try:
            # Import here to ensure it's available
            from pyvis.network import Network

            # Build the graph from the execution path
            net = Network(height="600px", width="100%", directed=True, notebook=False)
            net.barnes_hut()
            execution_paths = []

            # Extract execution paths from all children results
            for result in children_results:
                # Add the current result to the path
                execution_paths.append(
                    {
                        "node_name": result.node_name,
                        "node_type": result.node_type,
                        "success": result.success,
                        "input": result.input,
                        "output": result.output,
                        "error": result.error,
                        "params": result.params,
                    }
                )

                # Add child results recursively
                for child_result in result.children_results:
                    child_paths = self._extract_execution_paths(child_result)
                    execution_paths.extend(child_paths)

            # NOTE(review): the loop above appends one entry per result, so this
            # list can only be empty when children_results itself is empty — in
            # which case the rebuild below also produces nothing. The error-only
            # fallback therefore looks unreachable; confirm original intent.
            if not execution_paths:
                # fallback to errors
                execution_paths = []
                for result in children_results:
                    if result.error:
                        execution_paths.append(
                            {
                                "node_name": result.node_name,
                                "node_type": "error",
                                "error": result.error,
                            }
                        )

            # Add nodes and edges. Nodes are chained linearly in visitation
            # order (each node gets an edge from the previously added one),
            # not according to the actual parent/child tree structure.
            last_node_id = None
            for idx, node in enumerate(execution_paths):
                node_id = f"{node['node_name']}_{idx}"
                label = f"{node['node_name']}\n{node['node_type']}"
                if node.get("error"):
                    label += f"\nERROR: {node['error']}"
                elif node.get("output"):
                    label += f"\nOutput: {str(node['output'])[:40]}"
                # Color coding
                # NOTE(review): node["node_type"] is a NodeType value upstream;
                # comparing against plain strings here only matches if NodeType
                # is a str-based enum — confirm against the NodeType definition.
                if node["node_type"] == "error":
                    color = "#ffcccc"  # red
                elif node["node_type"] == "classifier":
                    color = "#99ccff"  # blue
                elif node["node_type"] == "intent":
                    color = "#ccffcc"  # green
                else:
                    color = "#ccccff"  # fallback
                net.add_node(node_id, label=label, color=color)
                if last_node_id is not None:
                    net.add_edge(last_node_id, node_id)
                last_node_id = node_id
            if not execution_paths:
                net.add_node("no_path", label="No execution path", color="#cccccc")

            # Save to HTML file
            html_dir = os.path.join(os.getcwd(), "intentkit_graphs")
            os.makedirs(html_dir, exist_ok=True)
            # NOTE(review): hash() of a str is randomized per interpreter run
            # (PYTHONHASHSEED), so the same input yields different filenames
            # across runs — use a stable digest if reproducibility is wanted.
            html_path = os.path.join(
                html_dir, f"intent_graph_{abs(hash(user_input)) % 100000}.html"
            )

            # Generate HTML and write to file manually
            html_content = net.generate_html()
            with open(html_path, "w", encoding="utf-8") as f:
                f.write(html_content)

            return html_path
        except Exception as e:
            self.logger.error(f"Failed to render graph: {e}")
            raise

    def _extract_execution_paths(self, result: ExecutionResult) -> list:
        """
        Flatten an ExecutionResult tree into a list of path-node dicts.

        Traversal is depth-first with the current node emitted before its
        children, matching the order nodes were executed.

        Args:
            result: The ExecutionResult to extract paths from

        Returns:
            List of execution path nodes (dicts with name/type/status fields)
        """
        # Snapshot the current node's execution details.
        node_entry = {
            "node_name": result.node_name,
            "node_type": result.node_type,
            "success": result.success,
            "input": result.input,
            "output": result.output,
            "error": result.error,
            "params": result.params,
        }

        # Current node first, then every child subtree in order.
        collected = [node_entry]
        for child in result.children_results:
            collected.extend(self._extract_execution_paths(child))
        return collected

    def _capture_context_state(
        self, context: IntentContext, label: str
    ) -> Dict[str, Any]:
        """
        Capture the current state of the context for debugging without adding to history.

        Args:
            context: The context to capture
            label: Label for this state capture

        Returns:
            Dictionary containing context state (timestamp, label, session id,
            per-field values/metadata, and field/history/error counts)
        """
        snapshot: Dict[str, Any] = {
            "timestamp": datetime.now().isoformat(),
            "label": label,
            "session_id": context.session_id,
            "fields": {},
            "field_count": len(context.keys()),
            "history_count": len(context.get_history()),
            "error_count": context.error_count(),
        }

        # Read fields straight off the context internals (under its locks) so
        # the capture itself does not register GET operations in the history.
        with context._global_lock:
            for field_key, field_obj in context._fields.items():
                with field_obj.lock:
                    snapshot["fields"][field_key] = {
                        "value": field_obj.value,
                        "metadata": {
                            "created_at": field_obj.created_at,
                            "last_modified": field_obj.last_modified,
                            "modified_by": field_obj.modified_by,
                            "value": field_obj.value,
                        },
                    }

        return snapshot

    def _log_context_changes(
        self,
        state_before: Optional[Dict[str, Any]],
        state_after: Optional[Dict[str, Any]],
        node_name: str,
        debug: bool,
        context_trace: bool,
    ) -> None:
        """
        Log context changes between before and after node execution.

        Args:
            state_before: Context state before execution
            state_after: Context state after execution
            node_name: Name of the node that was executed
            debug: Whether debug logging is enabled
            context_trace: Whether detailed context tracing is enabled
        """
        if not state_before or not state_after:
            return

        # Basic context change logging
        if debug:
            field_count_before = state_before.get("field_count", 0)
            field_count_after = state_after.get("field_count", 0)

            if field_count_after > field_count_before:
                new_fields = set(state_after["fields"].keys()) - set(
                    state_before["fields"].keys()
                )
                self.logger.info(
                    f"Node '{node_name}' added {len(new_fields)} new context fields: {new_fields}"
                )
            elif field_count_after < field_count_before:
                removed_fields = set(state_before["fields"].keys()) - set(
                    state_after["fields"].keys()
                )
                self.logger.info(
                    f"Node '{node_name}' removed {len(removed_fields)} context fields: {removed_fields}"
                )

        # Detailed context tracing
        if context_trace:
            self._log_detailed_context_trace(state_before, state_after, node_name)

    def _log_detailed_context_trace(
        self, state_before: Dict[str, Any], state_after: Dict[str, Any], node_name: str
    ) -> None:
        """
        Log detailed context trace with field-level changes.

        Args:
            state_before: Context state before execution
            state_after: Context state after execution
            node_name: Name of the node that was executed
        """
        fields_before = state_before.get("fields", {})
        fields_after = state_after.get("fields", {})

        # Find changed fields
        changed_fields = []
        for key in set(fields_before.keys()) | set(fields_after.keys()):
            value_before = (
                fields_before.get(key, {}).get("value")
                if key in fields_before
                else None
            )
            value_after = (
                fields_after.get(key, {}).get("value") if key in fields_after else None
            )

            if value_before != value_after:
                changed_fields.append(
                    {
                        "key": key,
                        "before": value_before,
                        "after": value_after,
                        "action": (
                            "modified"
                            if key in fields_before and key in fields_after
                            else "added" if key in fields_after else "removed"
                        ),
                    }
                )

        if changed_fields:
            self.logger.info(f"Context trace for node '{node_name}':")
            for change in changed_fields:
                self.logger.info(
                    f"  {change['action'].upper()}: {change['key']} = {change['after']} (was: {change['before']})"
                )
        else:
            self.logger.info(
                f"Context trace for node '{node_name}': No changes detected"
            )

__init__(root_nodes=None, splitter=None, visualize=False, llm_config=None, debug_context=False, context_trace=False)

Initialize the IntentGraph with root nodes.

Parameters:

Name Type Description Default
root_nodes Optional[List[TreeNode]]

List of root nodes that can handle intents

None
splitter Optional[SplitterFunction]

Function to use for splitting intents (default: pass-through splitter)

None
visualize bool

If True, render the final output to an interactive graph HTML file

False
llm_config Optional[dict]

LLM configuration for chunk classification (optional)

None
debug_context bool

If True, enable context debugging and state tracking

False
context_trace bool

If True, enable detailed context tracing with timestamps

False
Source code in intent_kit/graph/intent_graph.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def __init__(
    self,
    root_nodes: Optional[List[TreeNode]] = None,
    splitter: Optional[SplitterFunction] = None,
    visualize: bool = False,
    llm_config: Optional[dict] = None,
    debug_context: bool = False,
    context_trace: bool = False,
):
    """
    Initialize the IntentGraph with root nodes.

    Args:
        root_nodes: List of root nodes that can handle intents
        splitter: Function to use for splitting intents (default: pass-through splitter)
        visualize: If True, render the final output to an interactive graph HTML file
        llm_config: LLM configuration for chunk classification (optional)
        debug_context: If True, enable context debugging and state tracking
        context_trace: If True, enable detailed context tracing with timestamps
    """
    self.root_nodes: List[TreeNode] = root_nodes or []

    if splitter is not None:
        self.splitter = splitter
    else:
        # No splitter supplied: fall back to one that leaves the input whole.
        def pass_through_splitter(
            user_input: str, debug: bool = False
        ) -> List[IntentChunk]:
            """Pass-through splitter that doesn't split the input."""
            return [user_input]

        self.splitter: SplitterFunction = pass_through_splitter

    # Per-graph logger plus behavior flags for visualization and debugging.
    self.logger = Logger(__name__)
    self.visualize = visualize
    self.llm_config = llm_config
    self.debug_context = debug_context
    self.context_trace = context_trace

add_root_node(root_node, validate=True)

Add a root node to the graph.

Parameters:

Name Type Description Default
root_node TreeNode

The root node to add

required
validate bool

Whether to validate the graph after adding the node

True
Source code in intent_kit/graph/intent_graph.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def add_root_node(self, root_node: TreeNode, validate: bool = True) -> None:
    """
    Add a root node to the graph.

    Args:
        root_node: The root node to add
        validate: Whether to validate the graph after adding the node

    Raises:
        ValueError: If root_node is not a TreeNode.
        GraphValidationError: If validation fails after the node is added
            (the node is removed again before re-raising).
    """
    # Reject anything that is not a TreeNode up front.
    if not isinstance(root_node, TreeNode):
        raise ValueError("Root node must be a TreeNode")

    self.root_nodes.append(root_node)
    self.logger.info(f"Added root node: {root_node.name}")

    if not validate:
        return

    # Validate the whole graph; roll back the append if validation fails.
    try:
        self.validate_graph()
    except GraphValidationError as e:
        self.logger.error(
            f"Graph validation failed after adding root node: {e.message}"
        )
        self.root_nodes.remove(root_node)
        raise e
    else:
        self.logger.info("Graph validation passed after adding root node")

list_root_nodes()

List all root node names.

Returns:

Type Description
List[str]

List of root node names

Source code in intent_kit/graph/intent_graph.py
131
132
133
134
135
136
137
138
def list_root_nodes(self) -> List[str]:
    """
    List all root node names.

    Returns:
        List of root node names
    """
    # Collect the name attribute of each registered root, in insertion order.
    names: List[str] = []
    for root in self.root_nodes:
        names.append(root.name)
    return names

remove_root_node(root_node)

Remove a root node from the graph.

Parameters:

Name Type Description Default
root_node TreeNode

The root node to remove

required
Source code in intent_kit/graph/intent_graph.py
118
119
120
121
122
123
124
125
126
127
128
129
def remove_root_node(self, root_node: TreeNode) -> None:
    """
    Remove a root node from the graph.

    Logs a warning (rather than raising) when the node is not registered.

    Args:
        root_node: The root node to remove
    """
    # EAFP: attempt the removal and handle the not-found case.
    try:
        self.root_nodes.remove(root_node)
    except ValueError:
        self.logger.warning(f"Root node '{root_node.name}' not found for removal")
    else:
        self.logger.info(f"Removed root node: {root_node.name}")

route(user_input, context=None, debug=False, debug_context=None, context_trace=None, **splitter_kwargs)

Route user input through the graph with optional context support.

Parameters:

Name Type Description Default
user_input str

The input string to process

required
context Optional[IntentContext]

Optional context object for state sharing

None
debug bool

Whether to print debug information

False
debug_context Optional[bool]

Override graph-level debug_context setting

None
context_trace Optional[bool]

Override graph-level context_trace setting

None
**splitter_kwargs

Additional arguments to pass to the splitter

{}

Returns:

Type Description
ExecutionResult

ExecutionResult containing aggregated results and errors from all matched taxonomies

Source code in intent_kit/graph/intent_graph.py
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
def route(
    self,
    user_input: str,
    context: Optional[IntentContext] = None,
    debug: bool = False,
    debug_context: Optional[bool] = None,
    context_trace: Optional[bool] = None,
    **splitter_kwargs,
) -> ExecutionResult:
    """
    Route user input through the graph with optional context support.

    The input is split into intent chunks, each chunk is classified into an
    action (handle / split / clarify / reject), and handled chunks are routed
    to a matching root node for execution. Per-chunk results are aggregated
    into a single ExecutionResult.

    Args:
        user_input: The input string to process
        context: Optional context object for state sharing
        debug: Whether to print debug information
        debug_context: Override graph-level debug_context setting
        context_trace: Override graph-level context_trace setting
        **splitter_kwargs: Additional arguments to pass to the splitter

    Returns:
        ExecutionResult containing aggregated results and errors from all matched taxonomies
    """
    # Use method parameters if provided, otherwise use graph-level settings
    debug_context_enabled = (
        debug_context if debug_context is not None else self.debug_context
    )
    context_trace_enabled = (
        context_trace if context_trace is not None else self.context_trace
    )

    if debug:
        self.logger.info(f"Processing input: {user_input}")
        if context:
            self.logger.info(f"Using context: {context}")
        if debug_context_enabled:
            self.logger.info("Context debugging enabled")
        if context_trace_enabled:
            self.logger.info("Context tracing enabled")

    # Split the input into chunks
    try:
        intent_chunks = self._call_splitter(
            user_input=user_input, debug=debug, **splitter_kwargs
        )

    except Exception as e:
        self.logger.error(f"Splitter error: {e}")
        return ExecutionResult(
            success=False,
            params=None,
            children_results=[],
            node_name="splitter",
            node_path=[],
            node_type=NodeType.SPLITTER,
            input=user_input,
            output=None,
            error=ExecutionError(
                error_type="SplitterError",
                message=str(e),
                node_name="splitter",
                node_path=[],
            ),
        )

    if debug:
        self.logger.info(f"Intent chunks: {intent_chunks}")

    # If no chunks were found, return error
    if not intent_chunks:
        if debug:
            self.logger.warning("No intent chunks found")
        return ExecutionResult(
            success=False,
            params=None,
            children_results=[],
            node_name="no_intent",
            node_path=[],
            node_type=NodeType.UNHANDLED_CHUNK,
            input=user_input,
            output=None,
            error=ExecutionError(
                error_type="NoIntentFound",
                message="No intent chunks found",
                node_name="unhandled_chunk",
                node_path=[],
            ),
        )

    # Route each chunk to an appropriate root node
    children_results = []
    all_errors = []
    all_outputs = []
    all_params = []

    # Use a queue to process chunks, with recursion limit
    chunks_to_process = list(intent_chunks)  # Copy the list
    processed_chunks: set = set()  # Track processed chunks to avoid infinite loops
    max_recursion_depth = 10  # Prevent infinite recursion

    while chunks_to_process and len(processed_chunks) < max_recursion_depth:
        chunk = chunks_to_process.pop(0)

        # Create a unique identifier for this chunk to avoid infinite loops
        # Use a robust hashing strategy that works for both hashable (e.g. str)
        # and unhashable (e.g. dict) chunk objects.
        # Converting to `repr` preserves enough uniqueness while ensuring the
        # object is hashable.
        chunk_id = hash(repr(chunk))
        if chunk_id in processed_chunks:
            continue  # Skip if we've already processed this exact chunk
        processed_chunks.add(chunk_id)

        # Handle both string and dict chunks
        if isinstance(chunk, str):
            chunk_text = chunk
        elif isinstance(chunk, dict) and "text" in chunk:
            chunk_text = chunk["text"]
        else:
            chunk_text = str(chunk)

        if debug:
            self.logger.info(f"Classifying chunk: '{chunk_text}'")
        classification = classify_intent_chunk(chunk, self.llm_config)
        action = classification.get("action")

        if action == IntentAction.HANDLE:
            # Route to root node as before
            root_node = self._route_chunk_to_root_node(chunk_text, debug)
            if root_node is None:
                error_result = ExecutionResult(
                    success=False,
                    params=None,
                    children_results=[],
                    node_name="no_root_node",
                    node_path=[],
                    node_type=NodeType.UNKNOWN,
                    input=chunk_text,
                    output=None,
                    error=ExecutionError(
                        error_type="NoRootNodeFound",
                        message=f"No root node found for chunk: '{chunk_text}'",
                        node_name="no_root_node",
                        node_path=[],
                    ),
                )
                children_results.append(error_result)
                all_errors.append(f"No root node found for chunk: '{chunk_text}'")
                if debug:
                    self.logger.error(
                        f"No root node found for chunk: '{chunk_text}'"
                    )
                continue
            try:
                # Context debugging: capture state before execution
                context_state_before = None
                if debug_context_enabled and context:
                    context_state_before = self._capture_context_state(
                        context, f"before_{root_node.name}"
                    )

                result = root_node.execute(chunk_text, context=context)

                # Context debugging: capture state after execution
                if debug_context_enabled and context:
                    context_state_after = self._capture_context_state(
                        context, f"after_{root_node.name}"
                    )
                    self._log_context_changes(
                        context_state_before,
                        context_state_after,
                        root_node.name,
                        debug,
                        context_trace_enabled,
                    )

                if debug:
                    self.logger.info(
                        f"Root node '{root_node.name}' result: {result}"
                    )
                children_results.append(result)
                if result.success and result.output is not None:
                    all_outputs.append(result.output)
                if result.params is not None:
                    all_params.append(result.params)
                if result.error:
                    all_errors.append(
                        f"Root node '{root_node.name}': {result.error.message}"
                    )
            except Exception as e:
                error_message = str(e)
                error_type = type(e).__name__
                error_result = ExecutionResult(
                    success=False,
                    params=None,
                    children_results=[],
                    node_name="unknown",
                    node_path=[],
                    node_type=NodeType.UNKNOWN,
                    input=chunk_text,
                    output=None,
                    error=ExecutionError(
                        error_type=error_type,
                        message=error_message,
                        node_name="unknown",
                        node_path=[],
                    ),
                )
                children_results.append(error_result)
                all_errors.append(
                    f"Root node '{root_node.name}' failed: {error_message}"
                )
                if debug:
                    self.logger.error(f"Root node '{root_node.name}' failed: {e}")
        elif action == IntentAction.SPLIT:
            # Recursively split and route
            if debug:
                self.logger.info(f"Recursively splitting chunk: '{chunk_text}'")
            sub_chunks = self._call_splitter(chunk_text, debug, **splitter_kwargs)
            # Add sub_chunks to the front of the queue for processing
            chunks_to_process = sub_chunks + chunks_to_process
        elif action == IntentAction.CLARIFY:
            # Stub: Add a result indicating clarification is needed
            error_result = ExecutionResult(
                success=False,
                params=None,
                children_results=[],
                node_name="clarify",
                node_path=[],
                node_type=NodeType.CLARIFY,
                input=chunk_text,
                output=None,
                error=ExecutionError(
                    error_type="ClarificationNeeded",
                    message=f"Clarification needed for chunk: '{chunk_text}'",
                    node_name="clarify",
                    node_path=[],
                ),
            )
            children_results.append(error_result)
            all_errors.append(f"Clarification needed for chunk: '{chunk_text}'")
            if debug:
                self.logger.warning(
                    f"Clarification needed for chunk: '{chunk_text}'"
                )
        elif action == IntentAction.REJECT:
            # Stub: Add a result indicating rejection
            error_result = ExecutionResult(
                success=False,
                params=None,
                children_results=[],
                node_name="reject",
                node_path=[],
                node_type=NodeType.UNKNOWN,
                input=chunk_text,
                output=None,
                error=ExecutionError(
                    error_type="RejectedChunk",
                    message=f"Rejected chunk: '{chunk_text}'",
                    node_name="reject",
                    node_path=[],
                ),
            )
            children_results.append(error_result)
            all_errors.append(f"Rejected chunk: '{chunk_text}'")
            if debug:
                self.logger.warning(f"Rejected chunk: '{chunk_text}'")
        else:
            # Unknown action
            error_result = ExecutionResult(
                success=False,
                params=None,
                children_results=[],
                node_name="unknown_action",
                node_path=[],
                node_type=NodeType.UNKNOWN,
                input=chunk_text,
                output=None,
                error=ExecutionError(
                    error_type="UnknownAction",
                    message=f"Unknown action for chunk: '{chunk_text}'",
                    node_name="unknown_action",
                    node_path=[],
                ),
            )
            children_results.append(error_result)
            all_errors.append(f"Unknown action for chunk: '{chunk_text}'")
            if debug:
                self.logger.error(f"Unknown action for chunk: '{chunk_text}'")

    # Check if we hit the recursion limit.
    # Only report an error when chunks are actually left unprocessed:
    # draining the queue after exactly `max_recursion_depth` chunks is
    # normal termination, not an overflow. (The previous check flagged
    # that fully-successful case as a failure.)
    if chunks_to_process and len(processed_chunks) >= max_recursion_depth:
        error_result = ExecutionResult(
            success=False,
            params=None,
            children_results=[],
            node_name="recursion_limit",
            node_path=[],
            node_type=NodeType.UNKNOWN,
            input=user_input,
            output=None,
            error=ExecutionError(
                error_type="RecursionLimitExceeded",
                message=f"Recursion limit exceeded ({max_recursion_depth} chunks processed)",
                node_name="recursion_limit",
                node_path=[],
            ),
        )
        children_results.append(error_result)
        all_errors.append(
            f"Recursion limit exceeded ({max_recursion_depth} chunks processed)"
        )
        if debug:
            self.logger.error(
                f"Recursion limit exceeded ({max_recursion_depth} chunks processed)"
            )

    # Determine overall success and create aggregated result
    overall_success = len(all_errors) == 0 and len(children_results) > 0

    # Aggregate outputs and params: unwrap single values, keep lists for many
    aggregated_output = (
        all_outputs
        if len(all_outputs) > 1
        else (all_outputs[0] if all_outputs else None)
    )
    aggregated_params = (
        all_params
        if len(all_params) > 1
        else (all_params[0] if all_params else None)
    )

    # Ensure params is a dict or None
    if aggregated_params is not None and not isinstance(aggregated_params, dict):
        aggregated_params = {"params": aggregated_params}

    # Create aggregated error if there are any errors
    aggregated_error = None
    if all_errors:
        aggregated_error = ExecutionError(
            error_type="AggregatedErrors",
            message="; ".join(all_errors),
            node_name="intent_graph",
            node_path=[],
        )

    # Create visualization if requested (best-effort: failure is logged,
    # never propagated)
    visualization_html = None
    if self.visualize:
        try:
            html_path = self._render_execution_graph(children_results, user_input)
            visualization_html = html_path
        except Exception as e:
            self.logger.error(f"Visualization failed: {e}")
            visualization_html = None

    # Add visualization to output if available
    if visualization_html:
        if aggregated_output is None:
            aggregated_output = {"visualization_html": visualization_html}
        elif isinstance(aggregated_output, dict):
            aggregated_output["visualization_html"] = visualization_html
        else:
            aggregated_output = {
                "output": aggregated_output,
                "visualization_html": visualization_html,
            }

    if debug:
        self.logger.info(f"Final aggregated result: {overall_success}")

    return ExecutionResult(
        success=overall_success,
        params=aggregated_params,
        children_results=children_results,
        node_name="intent_graph",
        node_path=[],
        node_type=NodeType.GRAPH,
        input=user_input,
        output=aggregated_output,
        error=aggregated_error,
    )

validate_graph(validate_routing=True, validate_types=True)

Validate the graph structure and routing constraints.

Parameters:

Name Type Description Default
validate_routing bool

Whether to validate splitter-to-classifier routing

True
validate_types bool

Whether to validate node types

True

Returns:

Type Description
Dict[str, Any]

Dictionary containing validation results and statistics

Raises:

Type Description
GraphValidationError

If validation fails

Source code in intent_kit/graph/intent_graph.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def validate_graph(
    self, validate_routing: bool = True, validate_types: bool = True
) -> Dict[str, Any]:
    """
    Validate the graph structure and routing constraints.

    Args:
        validate_routing: Whether to validate splitter-to-classifier routing
        validate_types: Whether to validate node types

    Returns:
        Dictionary containing validation results and statistics

    Raises:
        GraphValidationError: If validation fails
    """
    self.logger.info("Validating graph structure...")

    # Flatten every node reachable from each configured root.
    all_nodes = []
    for root in self.root_nodes:
        all_nodes += self._collect_all_nodes([root])

    # Optional checks; each validator raises GraphValidationError on failure.
    if validate_types:
        validate_node_types(all_nodes)
    if validate_routing:
        validate_splitter_routing(all_nodes)

    # Comprehensive structural statistics for the caller.
    stats = validate_graph_structure(all_nodes)

    self.logger.info("Graph validation completed successfully")
    return stats

validate_splitter_routing()

Validate that all splitter nodes only route to classifier nodes.

Raises:

Type Description
GraphValidationError

If any splitter node routes to a non-classifier node

Source code in intent_kit/graph/intent_graph.py
177
178
179
180
181
182
183
184
185
186
187
188
def validate_splitter_routing(self) -> None:
    """
    Validate that all splitter nodes only route to classifier nodes.

    Raises:
        GraphValidationError: If any splitter node routes to a non-classifier node
    """
    # Collect every reachable node, then delegate to the module-level
    # validator of the same name.
    collected = []
    for node in self.root_nodes:
        collected.extend(self._collect_all_nodes([node]))
    validate_splitter_routing(collected)

IntentGraphBuilder

Builder class for creating IntentGraph instances with a fluent interface.

Source code in intent_kit/builder.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class IntentGraphBuilder:
    """Builder class for creating IntentGraph instances with a fluent interface."""

    def __init__(self):
        # Configuration accumulated by the fluent setters below.
        self._root_node: Optional[TreeNode] = None
        self._splitter = None
        self._debug_context = False
        self._context_trace = False

    def root(self, node: TreeNode) -> "IntentGraphBuilder":
        """Set the root node for the intent graph.

        Args:
            node: The root TreeNode to use for the graph

        Returns:
            Self for method chaining
        """
        self._root_node = node
        return self

    def splitter(self, splitter_func) -> "IntentGraphBuilder":
        """Set a custom splitter function for the intent graph.

        Args:
            splitter_func: Function to use for splitting intents

        Returns:
            Self for method chaining
        """
        self._splitter = splitter_func
        return self

    def debug_context(self, enabled: bool = True) -> "IntentGraphBuilder":
        """Enable context debugging for the intent graph.

        Args:
            enabled: Whether to enable context debugging

        Returns:
            Self for method chaining
        """
        self._debug_context = enabled
        return self

    def context_trace(self, enabled: bool = True) -> "IntentGraphBuilder":
        """Enable detailed context tracing for the intent graph.

        Args:
            enabled: Whether to enable context tracing

        Returns:
            Self for method chaining
        """
        self._context_trace = enabled
        return self

    def build(self) -> IntentGraph:
        """Build and return the IntentGraph instance.

        Returns:
            Configured IntentGraph instance

        Raises:
            ValueError: If no root node has been set
        """
        if self._root_node is None:
            raise ValueError("No root node set. Call .root() before .build()")

        # Assemble constructor arguments; the splitter is only passed when
        # one was explicitly configured.
        graph_kwargs = {
            "debug_context": self._debug_context,
            "context_trace": self._context_trace,
        }
        if self._splitter:
            graph_kwargs["splitter"] = self._splitter

        graph = IntentGraph(**graph_kwargs)
        graph.add_root_node(self._root_node)
        return graph

build()

Build and return the IntentGraph instance.

Returns:

Type Description
IntentGraph

Configured IntentGraph instance

Raises:

Type Description
ValueError

If no root node has been set

Source code in intent_kit/builder.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def build(self) -> IntentGraph:
    """Build and return the IntentGraph instance.

    Returns:
        Configured IntentGraph instance

    Raises:
        ValueError: If no root node has been set
    """
    if self._root_node is None:
        raise ValueError("No root node set. Call .root() before .build()")

    # Assemble constructor arguments; only include the splitter when one
    # was explicitly configured.
    graph_kwargs = {
        "debug_context": self._debug_context,
        "context_trace": self._context_trace,
    }
    if self._splitter:
        graph_kwargs["splitter"] = self._splitter

    graph = IntentGraph(**graph_kwargs)
    graph.add_root_node(self._root_node)
    return graph

context_trace(enabled=True)

Enable detailed context tracing for the intent graph.

Parameters:

Name Type Description Default
enabled bool

Whether to enable context tracing

True

Returns:

Type Description
IntentGraphBuilder

Self for method chaining

Source code in intent_kit/builder.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def context_trace(self, enabled: bool = True) -> "IntentGraphBuilder":
    """Toggle detailed context tracing on the graph being built.

    Args:
        enabled: Whether to enable context tracing

    Returns:
        Self for method chaining
    """
    self._context_trace = enabled
    return self

debug_context(enabled=True)

Enable context debugging for the intent graph.

Parameters:

Name Type Description Default
enabled bool

Whether to enable context debugging

True

Returns:

Type Description
IntentGraphBuilder

Self for method chaining

Source code in intent_kit/builder.py
82
83
84
85
86
87
88
89
90
91
92
def debug_context(self, enabled: bool = True) -> "IntentGraphBuilder":
    """Toggle context debugging on the graph being built.

    Args:
        enabled: Whether to enable context debugging

    Returns:
        Self for method chaining
    """
    self._debug_context = enabled
    return self

root(node)

Set the root node for the intent graph.

Parameters:

Name Type Description Default
node TreeNode

The root TreeNode to use for the graph

required

Returns:

Type Description
IntentGraphBuilder

Self for method chaining

Source code in intent_kit/builder.py
33
34
35
36
37
38
39
40
41
42
43
def root(self, node: TreeNode) -> "IntentGraphBuilder":
    """Record the root node that the built graph will start from.

    Args:
        node: The root TreeNode to use for the graph

    Returns:
        Self for method chaining
    """
    self._root_node = node
    return self

splitter(splitter_func)

Set a custom splitter function for the intent graph.

Parameters:

Name Type Description Default
splitter_func

Function to use for splitting intents

required

Returns:

Type Description
IntentGraphBuilder

Self for method chaining

Source code in intent_kit/builder.py
45
46
47
48
49
50
51
52
53
54
55
def splitter(self, splitter_func) -> "IntentGraphBuilder":
    """Set a custom splitter function for the intent graph.

    Args:
        splitter_func: Function to use for splitting intents

    Returns:
        Self for method chaining
    """
    self._splitter = splitter_func
    return self

LLMFactory

Factory for creating LLM clients.

Source code in intent_kit/services/llm_factory.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class LLMFactory:
    """Factory for creating LLM clients."""

    @staticmethod
    def create_client(llm_config: Dict[str, Any]):
        """
        Create an LLM client based on the configuration.

        Args:
            llm_config: Dictionary with keys:
                - provider: "openai", "anthropic", "google", "openrouter", "ollama"
                - api_key: API key for the provider (not required for ollama)
                - model: Model name (optional, uses defaults)
                - max_tokens: Maximum tokens (optional)
                - temperature: Temperature (optional)
                - base_url: Base URL for ollama (optional, defaults to localhost:11434)

        Returns:
            LLM client instance

        Raises:
            ValueError: If provider is not supported or config is invalid
        """
        if not llm_config:
            raise ValueError("LLM config cannot be empty")

        provider = llm_config.get("provider")
        api_key = llm_config.get("api_key")

        if not provider:
            raise ValueError("LLM config must include 'provider'")

        # Provider names are matched case-insensitively.
        provider = provider.lower()

        # Handle Ollama separately since it doesn't require an API key
        if provider == "ollama":
            base_url = llm_config.get("base_url", "http://localhost:11434")
            return OllamaClient(base_url=base_url)

        # For other providers, API key is required.
        # Note: this must be an f-string so the provider name is interpolated
        # into the message (it was previously a plain string with a literal
        # "{provider}" placeholder).
        if not api_key:
            raise ValueError(
                f"LLM config must include 'api_key' for provider: {provider}"
            )

        if provider == "openai":
            return OpenAIClient(api_key=api_key)
        elif provider == "anthropic":
            return AnthropicClient(api_key=api_key)
        elif provider == "google":
            return GoogleClient(api_key=api_key)
        elif provider == "openrouter":
            return OpenRouterClient(api_key=api_key)
        else:
            raise ValueError(f"Unsupported LLM provider: {provider}")

    @staticmethod
    def generate_with_config(llm_config: Dict[str, Any], prompt: str) -> str:
        """
        Generate text using the specified LLM configuration.

        Args:
            llm_config: LLM configuration dictionary
            prompt: Text prompt to send to the LLM

        Returns:
            Generated text response
        """
        client = LLMFactory.create_client(llm_config)

        # Extract optional parameters
        model = llm_config.get("model")

        # For now, we'll use the default generate method
        # In the future, we can extend this to pass additional parameters
        if model:
            return client.generate(prompt, model=model)
        else:
            return client.generate(prompt)

create_client(llm_config) staticmethod

Create an LLM client based on the configuration.

Parameters:

Name Type Description Default
llm_config Dict[str, Any]

Dictionary with keys: - provider: "openai", "anthropic", "google", "openrouter", "ollama" - api_key: API key for the provider (not required for ollama) - model: Model name (optional, uses defaults) - max_tokens: Maximum tokens (optional) - temperature: Temperature (optional) - base_url: Base URL for ollama (optional, defaults to localhost:11434)

required

Returns:

Type Description

LLM client instance

Raises:

Type Description
ValueError

If provider is not supported or config is invalid

Source code in intent_kit/services/llm_factory.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@staticmethod
def create_client(llm_config: Dict[str, Any]):
    """
    Create an LLM client based on the configuration.

    Args:
        llm_config: Dictionary with keys:
            - provider: "openai", "anthropic", "google", "openrouter", "ollama"
            - api_key: API key for the provider (not required for ollama)
            - model: Model name (optional, uses defaults)
            - max_tokens: Maximum tokens (optional)
            - temperature: Temperature (optional)
            - base_url: Base URL for ollama (optional, defaults to localhost:11434)

    Returns:
        LLM client instance

    Raises:
        ValueError: If provider is not supported or config is invalid
    """
    if not llm_config:
        raise ValueError("LLM config cannot be empty")

    provider = llm_config.get("provider")
    api_key = llm_config.get("api_key")

    if not provider:
        raise ValueError("LLM config must include 'provider'")

    provider = provider.lower()

    # Handle Ollama separately since it doesn't require an API key
    if provider == "ollama":
        base_url = llm_config.get("base_url", "http://localhost:11434")
        return OllamaClient(base_url=base_url)

    # For other providers, API key is required
    if not api_key:
        raise ValueError(
            "LLM config must include 'api_key' for provider: {provider}"
        )

    if provider == "openai":
        return OpenAIClient(api_key=api_key)
    elif provider == "anthropic":
        return AnthropicClient(api_key=api_key)
    elif provider == "google":
        return GoogleClient(api_key=api_key)
    elif provider == "openrouter":
        return OpenRouterClient(api_key=api_key)
    else:
        raise ValueError(f"Unsupported LLM provider: {provider}")

generate_with_config(llm_config, prompt) staticmethod

Generate text using the specified LLM configuration.

Parameters:

Name Type Description Default
llm_config Dict[str, Any]

LLM configuration dictionary

required
prompt str

Text prompt to send to the LLM

required

Returns:

Type Description
str

Generated text response

Source code in intent_kit/services/llm_factory.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@staticmethod
def generate_with_config(llm_config: Dict[str, Any], prompt: str) -> str:
    """
    Generate text using the specified LLM configuration.

    Args:
        llm_config: LLM configuration dictionary
        prompt: Text prompt to send to the LLM

    Returns:
        Generated text response
    """
    client = LLMFactory.create_client(llm_config)

    # The model is the only optional parameter forwarded today; other
    # config keys may be plumbed through in the future.
    model = llm_config.get("model")
    if model is None or not model:
        return client.generate(prompt)
    return client.generate(prompt, model=model)

NodeType

Bases: Enum

Enumeration of valid node types in the intent tree.

Source code in intent_kit/node/enums.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
class NodeType(Enum):
    """Enumeration of valid node types in the intent tree."""

    # Fallback when a node's kind cannot be determined
    UNKNOWN = "unknown"

    # Concrete node roles within the tree
    HANDLER = "handler"
    CLASSIFIER = "classifier"
    SPLITTER = "splitter"
    CLARIFY = "clarify"
    GRAPH = "graph"

    # Synthetic type, used only when reporting execution results
    UNHANDLED_CHUNK = "unhandled_chunk"

SplitterNode

Bases: TreeNode

Node that splits user input into multiple intent chunks.

Source code in intent_kit/splitters/node.py
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class SplitterNode(TreeNode):
    """Node that splits user input into multiple intent chunks.

    The configured ``splitter_function`` breaks the raw input into chunks;
    each chunk is then offered to this node's children in order until one
    handles it successfully.
    """

    def __init__(
        self,
        name: Optional[str],
        splitter_function,
        children: List["TreeNode"],
        description: str = "",
        parent: Optional["TreeNode"] = None,
        llm_client=None,
    ):
        """Initialize the splitter node.

        Args:
            name: Optional node name used in logs and result paths.
            splitter_function: Callable invoked as ``fn(user_input, debug=...)``
                returning a sequence of chunks; each chunk is either a plain
                value (stringified) or a dict with a ``"chunk_text"`` key.
            children: Child nodes chunks are routed to, in order.
            description: Human-readable description of this node.
            parent: Optional parent node.
            llm_client: Optional LLM client stored for splitters that need one.
        """
        super().__init__(
            name=name, description=description, children=children, parent=parent
        )
        self.splitter_function = splitter_function
        self.llm_client = llm_client

    @property
    def node_type(self) -> NodeType:
        """Get the type of this node."""
        return NodeType.SPLITTER

    def execute(
        self, user_input: str, context: Optional[IntentContext] = None
    ) -> ExecutionResult:
        """Split the input into chunks and route each chunk to a child.

        Overall success is True when at least one chunk was handled by a
        child. Per-chunk outcomes — including synthesized UNHANDLED_CHUNK
        failures — are collected in ``children_results``.
        """
        try:
            intent_chunks = self.splitter_function(user_input, debug=False)
            if not intent_chunks:
                # Nothing to route: report an explicit failure result.
                self.logger.warning(f"Splitter '{self.name}' found no intent chunks")
                return ExecutionResult(
                    success=False,
                    node_name=self.name,
                    node_path=self.get_path(),
                    node_type=NodeType.SPLITTER,
                    input=user_input,
                    output=None,
                    error=ExecutionError(
                        error_type="NoIntentChunksFound",
                        message="No intent chunks found after splitting",
                        node_name=self.name,
                        node_path=self.get_path(),
                    ),
                    params={"intent_chunks": []},
                    children_results=[],
                )
            self.logger.debug(
                f"Splitter '{self.name}' found {len(intent_chunks)} chunks: {intent_chunks}"
            )
            children_results = []
            all_outputs = []
            for chunk in intent_chunks:
                # Chunks may be dicts ({"chunk_text": ...}) or plain values.
                if isinstance(chunk, dict) and "chunk_text" in chunk:
                    chunk_text = str(chunk["chunk_text"])
                else:
                    chunk_text = str(chunk)
                handled = False
                # First-success routing: try children in order, stop at the
                # first child whose execute() reports success.
                for child in self.children:
                    try:
                        child_result = child.execute(chunk_text, context)
                        if child_result.success:
                            children_results.append(child_result)
                            all_outputs.append(child_result.output)
                            handled = True
                            break
                    except Exception as e:
                        # A failing child only disqualifies itself for this
                        # chunk; keep trying the remaining children.
                        self.logger.debug(
                            f"Child '{child.name}' failed to handle chunk '{chunk_text}': {e}"
                        )
                        continue
                if not handled:
                    # Record a synthetic failure result so unhandled chunks
                    # are visible in the execution trace.
                    error_result = ExecutionResult(
                        success=False,
                        node_name=f"unhandled_chunk_{chunk_text[:20]}",
                        node_path=self.get_path()
                        + [f"unhandled_chunk_{chunk_text[:20]}"],
                        node_type=NodeType.UNHANDLED_CHUNK,
                        input=chunk_text,
                        output=None,
                        error=ExecutionError(
                            error_type="UnhandledChunk",
                            message=f"No child node could handle chunk: '{chunk_text}'",
                            node_name=self.name,
                            node_path=self.get_path(),
                        ),
                        params={"chunk": chunk_text},
                        children_results=[],
                    )
                    children_results.append(error_result)
            successful_results = [r for r in children_results if r.success]
            # Partial success counts as success; unhandled chunks remain
            # visible in children_results.
            overall_success = len(successful_results) > 0
            return ExecutionResult(
                success=overall_success,
                node_name=self.name,
                node_path=self.get_path(),
                node_type=NodeType.SPLITTER,
                input=user_input,
                output=all_outputs if all_outputs else None,
                error=None,
                params={
                    "intent_chunks": intent_chunks,
                    "chunks_processed": len(intent_chunks),
                    "chunks_handled": len(successful_results),
                },
                children_results=children_results,
            )
        except Exception as e:
            # Splitter-level failure (e.g. splitter_function raised).
            self.logger.error(f"Splitter execution error for '{self.name}': {e}")
            return ExecutionResult(
                success=False,
                node_name=self.name,
                node_path=self.get_path(),
                node_type=NodeType.SPLITTER,
                input=user_input,
                output=None,
                error=ExecutionError.from_exception(
                    e, self.name, self.get_path(), node_id=self.node_id
                ),
                params=None,
                children_results=[],
            )

node_type property

Get the type of this node.

TreeNode

Bases: Node, ABC

Base class for all nodes in the intent tree.

Source code in intent_kit/node/base.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class TreeNode(Node, ABC):
    """Base class for all nodes in the intent tree.

    Subclasses override ``node_type`` and implement ``execute``.
    """

    def __init__(
        self,
        *,
        name: Optional[str] = None,
        description: str,
        children: Optional[List["TreeNode"]] = None,
        parent: Optional["TreeNode"] = None,
    ):
        """Initialize the tree node.

        Args:
            name: Optional node name; logging falls back to "unnamed_node".
            description: Human-readable description (required, keyword-only).
            children: Optional child nodes; a defensive copy is stored and
                each child's ``parent`` is re-pointed at this node.
            parent: Optional parent node.
        """
        super().__init__(name=name, parent=parent)
        self.logger = Logger(name or "unnamed_node")
        self.description = description
        # Copy so later mutation of the caller's list doesn't affect us.
        self.children: List["TreeNode"] = list(children) if children else []
        for child in self.children:
            child.parent = self

    @property
    def node_type(self) -> NodeType:
        """Get the type of this node. Override in subclasses."""
        return NodeType.UNKNOWN

    @abstractmethod
    def execute(
        self, user_input: str, context: Optional[IntentContext] = None
    ) -> ExecutionResult:
        """Execute the node with the given user input and optional context."""
        pass

node_type property

Get the type of this node. Override in subclasses.

execute(user_input, context=None) abstractmethod

Execute the node with the given user input and optional context.

Source code in intent_kit/node/base.py
68
69
70
71
72
73
@abstractmethod
def execute(
    self, user_input: str, context: Optional[IntentContext] = None
) -> ExecutionResult:
    """Execute the node with the given user input and optional context."""
    pass

create_intent_graph(root_node)

Create an IntentGraph with the given root node.

Parameters:

Name Type Description Default
root_node TreeNode

The root TreeNode for the graph

required

Returns:

Type Description
IntentGraph

Configured IntentGraph instance

Source code in intent_kit/builder.py
374
375
376
377
378
379
380
381
382
383
def create_intent_graph(root_node: TreeNode) -> IntentGraph:
    """Create an IntentGraph with the given root node.

    Args:
        root_node: The root TreeNode for the graph

    Returns:
        Configured IntentGraph instance
    """
    # Thin convenience wrapper over the fluent builder API.
    builder = IntentGraphBuilder()
    return builder.root(root_node).build()

create_llm_arg_extractor(llm_config, extraction_prompt, param_schema)

Create an LLM-powered argument extractor function.

Parameters:

Name Type Description Default
llm_config Dict[str, Any]

LLM configuration dictionary

required
extraction_prompt str

Prompt template for argument extraction

required
param_schema Dict[str, Any]

Parameter schema defining expected parameters

required

Returns:

Type Description
Callable[[str, Optional[Dict[str, Any]]], Dict[str, Any]]

Argument extractor function that can be used with HandlerNode

Source code in intent_kit/classifiers/llm_classifier.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def create_llm_arg_extractor(
    llm_config: Dict[str, Any], extraction_prompt: str, param_schema: Dict[str, Any]
) -> Callable[[str, Optional[Dict[str, Any]]], Dict[str, Any]]:
    """
    Create an LLM-powered argument extractor function.

    Args:
        llm_config: LLM configuration dictionary
        extraction_prompt: Prompt template for argument extraction
        param_schema: Parameter schema defining expected parameters

    Returns:
        Argument extractor function that can be used with HandlerNode
    """

    def llm_arg_extractor(
        user_input: str, context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        LLM-powered argument extractor that extracts parameters from user input.

        Args:
            user_input: User's input text
            context: Optional context information to include in the prompt

        Returns:
            Dictionary of extracted parameters (empty on any failure)
        """
        try:
            # Fold caller-supplied context into the prompt so the LLM can
            # use it while extracting parameters.
            context_info = ""
            if context:
                pieces = ["\n\nAvailable Context Information:\n"]
                for key, value in context.items():
                    pieces.append(f"- {key}: {value}\n")
                pieces.append(
                    "\nUse this context information to help extract more accurate parameters."
                )
                context_info = "".join(pieces)

            # Describe each expected parameter by name and type.
            param_descriptions = "\n".join(
                f"- {param_name}: {param_type.__name__}"
                for param_name, param_type in param_schema.items()
            )

            prompt = extraction_prompt.format(
                user_input=user_input,
                param_descriptions=param_descriptions,
                param_names=", ".join(param_schema.keys()),
                context_info=context_info,
            )

            # Never log the real API key.
            safe_config = dict(llm_config)
            if "api_key" in safe_config:
                safe_config["api_key"] = "***OBFUSCATED***"
            logger.debug(f"LLM arg extractor config: {safe_config}")
            logger.debug(f"LLM arg extractor prompt: {prompt}")
            response = LLMFactory.generate_with_config(llm_config, prompt)

            # Parse "param_name: value" lines from the response; only keys
            # present in the schema are kept. (Could become JSON parsing.)
            extracted_params: Dict[str, Any] = {}
            for raw_line in response.strip().split("\n"):
                stripped = raw_line.strip()
                if ":" not in stripped:
                    continue
                key, _, value = stripped.partition(":")
                key = key.strip()
                if key in param_schema:
                    extracted_params[key] = value.strip()

            logger.debug(f"LLM arg extractor extracted: {extracted_params}")
            return extracted_params

        except Exception as e:
            # Extraction is best-effort: fail soft with an empty dict.
            logger.error(f"LLM arg extractor error: {str(e)}")
            return {}

    return llm_arg_extractor

create_llm_classifier(llm_config, classification_prompt, node_descriptions)

Create an LLM-powered classifier function.

Parameters:

Name Type Description Default
llm_config Dict[str, Any]

LLM configuration dictionary

required
classification_prompt str

Prompt template for classification

required
node_descriptions List[str]

List of descriptions for each child node

required

Returns:

Type Description
Callable[[str, List[TreeNode], Optional[Dict[str, Any]]], Optional[TreeNode]]

Classifier function that can be used with ClassifierNode

Source code in intent_kit/classifiers/llm_classifier.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def create_llm_classifier(
    llm_config: Dict[str, Any], classification_prompt: str, node_descriptions: List[str]
) -> Callable[[str, List[TreeNode], Optional[Dict[str, Any]]], Optional[TreeNode]]:
    """
    Create an LLM-powered classifier function.

    NOTE(review): ``node_descriptions`` is currently unused — the returned
    classifier rebuilds descriptions from ``children`` at call time.
    Confirm whether the parameter is intentional API surface.

    Args:
        llm_config: LLM configuration dictionary
        classification_prompt: Prompt template for classification
        node_descriptions: List of descriptions for each child node

    Returns:
        Classifier function that can be used with ClassifierNode
    """

    def llm_classifier(
        user_input: str,
        children: List[TreeNode],
        context: Optional[Dict[str, Any]] = None,
    ) -> Optional[TreeNode]:
        """
        LLM-powered classifier that selects the most appropriate child node.

        Children are presented to the LLM as a 1-based numbered list; the
        response is parsed back into a 0-based index into ``children``.

        Args:
            user_input: User's input text
            children: List of available child nodes
            context: Optional context information to include in the prompt

        Returns:
            Selected child node or None if no match
        """
        try:
            # Build context information for the prompt
            context_info = ""
            if context:
                context_info = "\n\nAvailable Context Information:\n"
                for key, value in context.items():
                    context_info += f"- {key}: {value}\n"
                context_info += "\nUse this context information to make better classification decisions."

            # Build the classification prompt (children numbered from 1)
            prompt = classification_prompt.format(
                user_input=user_input,
                node_descriptions="\n".join(
                    [
                        f"{i+1}. {child.name}: {child.description}"
                        for i, child in enumerate(children)
                    ]
                ),
                num_nodes=len(children),
                context_info=context_info,
            )

            # Get LLM response
            response = LLMFactory.generate_with_config(llm_config, prompt)

            # Parse the response to get the selected node index
            # Expect response to be a number (1-based index)
            try:
                # Try to extract just the number from the response
                response_text = response.strip()

                # Look for patterns like "Your choice (number only): 3" or "The choice is: 3"
                # Make patterns more specific to avoid matching context numbers like years/timestamps
                number_patterns = [
                    # Match 1-2 digit numbers after "choice"
                    r"choice.*?(\d{1,2})",
                    # Match 1-2 digit numbers after "answer"
                    r"answer.*?(\d{1,2})",
                    # Match 1-2 digit numbers after "number"
                    r"number.*?(\d{1,2})",
                    # Match 1-2 digit numbers after "select"
                    r"select.*?(\d{1,2})",
                    # Match 1-2 digit numbers after "option"
                    r"option.*?(\d{1,2})",
                    r"^(\d{1,2})$",  # Match standalone 1-2 digit numbers
                    # Match 1-2 digit numbers with optional whitespace
                    r"^(\d{1,2})\s*$",
                ]

                # Patterns are tried in order; the first match wins.
                selected_index = None
                for pattern in number_patterns:
                    match = re.search(
                        pattern, response_text, re.IGNORECASE | re.MULTILINE
                    )
                    if match:
                        # Convert to 0-based
                        selected_index = int(match.group(1)) - 1
                        break

                # If no pattern matched, try to parse the entire response as a number
                if selected_index is None:
                    # Clean up the response text - remove markdown formatting, asterisks, etc.
                    cleaned_text = re.sub(r"[^\d]", "", response_text)
                    if cleaned_text:
                        # Only take the first 1-2 digits to avoid long numbers like years
                        first_digits = cleaned_text[:2]
                        if first_digits.isdigit():
                            # Convert to 0-based
                            selected_index = int(first_digits) - 1

                if selected_index is not None and 0 <= selected_index < len(children):
                    return children[selected_index]
                else:
                    # Out-of-range (or unparseable) index: log details and
                    # let the caller fall through to remediation/None.
                    logger.warning(
                        f"LLM returned invalid index: {selected_index} (valid range: 0-{len(children)-1})"
                    )
                    logger.warning(
                        f"Available children: {[child.name for child in children]}"
                    )
                    logger.warning(f"Raw LLM response: {response_text}")
                    return None
            except ValueError as e:
                # Defensive: int() conversions above raised.
                logger.warning(
                    f"LLM response could not be parsed as integer: {response}"
                )
                logger.warning(f"Parse error: {e}")
                return None

        except Exception as e:
            # Any other failure (LLM call, prompt formatting) fails soft.
            logger.error(f"LLM classifier error: {str(e)}")
            return None

    return llm_classifier

get_context_dependencies(graph)

Analyze the full dependency map for all nodes in a graph.

Parameters:

Name Type Description Default
graph Any

IntentGraph instance to analyze

required

Returns:

Type Description
Dict[str, ContextDependencies]

Dictionary mapping node names to their context dependencies

Source code in intent_kit/context/debug.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def get_context_dependencies(graph: Any) -> Dict[str, ContextDependencies]:
    """
    Analyze the full dependency map for all nodes in a graph.

    Args:
        graph: IntentGraph instance to analyze

    Returns:
        Dictionary mapping node names to their context dependencies
    """
    # Flatten every subtree reachable from the graph's root nodes.
    all_nodes = [
        node
        for root_node in graph.root_nodes
        for node in _collect_all_nodes([root_node])
    ]

    # Keep only nodes that actually report context dependencies.
    dependencies: Dict[str, ContextDependencies] = {}
    for node in all_nodes:
        node_deps = _analyze_node_dependencies(node)
        if node_deps:
            dependencies[node.name] = node_deps

    return dependencies

handler(*, name, description, handler_func, param_schema, llm_config=None, extraction_prompt=None, context_inputs=None, context_outputs=None, input_validator=None, output_validator=None, remediation_strategies=None)

Create a handler node with automatic argument extraction.

Parameters:

Name Type Description Default
name str

Name of the handler node

required
description str

Description of what this handler does

required
handler_func Callable[..., Any]

Function to execute when this handler is triggered

required
param_schema Dict[str, Type]

Dictionary mapping parameter names to their types

required
llm_config Optional[Dict[str, Any]]

Optional LLM configuration for LLM-based argument extraction. If not provided, uses a simple rule-based extractor.

None
extraction_prompt Optional[str]

Optional custom prompt for LLM argument extraction

None
context_inputs Optional[Set[str]]

Optional set of context keys this handler reads from

None
context_outputs Optional[Set[str]]

Optional set of context keys this handler writes to

None
input_validator Optional[Callable[[Dict[str, Any]], bool]]

Optional function to validate extracted parameters

None
output_validator Optional[Callable[[Any], bool]]

Optional function to validate handler output

None

Returns:

Type Description
TreeNode

Configured HandlerNode

Example

greet_handler = handler( ... name="greet", ... description="Greet the user", ... handler_func=lambda name: f"Hello {name}!", ... param_schema={"name": str}, ... llm_config=LLM_CONFIG ... )

Source code in intent_kit/builder.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def handler(
    *,
    name: str,
    description: str,
    handler_func: Callable[..., Any],
    param_schema: Dict[str, Type],
    llm_config: Optional[Dict[str, Any]] = None,
    extraction_prompt: Optional[str] = None,
    context_inputs: Optional[Set[str]] = None,
    context_outputs: Optional[Set[str]] = None,
    input_validator: Optional[Callable[[Dict[str, Any]], bool]] = None,
    output_validator: Optional[Callable[[Any], bool]] = None,
    remediation_strategies: Optional[List[Union[str, "RemediationStrategy"]]] = None,
) -> TreeNode:
    """Create a handler node with automatic argument extraction.

    Args:
        name: Name of the handler node
        description: Description of what this handler does
        handler_func: Function to execute when this handler is triggered
        param_schema: Dictionary mapping parameter names to their types
        llm_config: Optional LLM configuration for LLM-based argument extraction.
                   If not provided, uses a simple rule-based extractor.
        extraction_prompt: Optional custom prompt for LLM argument extraction
        context_inputs: Optional set of context keys this handler reads from
        context_outputs: Optional set of context keys this handler writes to
        input_validator: Optional function to validate extracted parameters
        output_validator: Optional function to validate handler output
        remediation_strategies: Optional remediation strategies applied on failure

    Returns:
        Configured HandlerNode

    Example:
        >>> greet_handler = handler(
        ...     name="greet",
        ...     description="Greet the user",
        ...     handler_func=lambda name: f"Hello {name}!",
        ...     param_schema={"name": str},
        ...     llm_config=LLM_CONFIG
        ... )
    """
    if llm_config:
        # Use LLM-based extraction; fall back to the library's default
        # prompt when the caller did not supply one.
        arg_extractor = create_llm_arg_extractor(
            llm_config,
            extraction_prompt or get_default_extraction_prompt(),
            param_schema,
        )
    else:
        # Rule-based fallback extractor. Hoist the import out of the
        # per-parameter loop where the original performed it repeatedly.
        import re

        def _numeric_fallback(param_name: str, param_type: Type) -> Any:
            """Default value for a numeric parameter that couldn't be parsed."""
            # Common calculator-style parameter names get demo-friendly
            # defaults; everything else defaults to zero.
            if param_name in ["a", "first"]:
                return param_type(10)
            if param_name in ["b", "second"]:
                return param_type(5)
            return param_type(0)

        def simple_arg_extractor(
            text: str, context: Optional[Dict[str, Any]] = None
        ) -> Dict[str, Any]:
            """Simple rule-based argument extractor."""
            extracted: Dict[str, Any] = {}

            for param_name, param_type in param_schema.items():
                if isinstance(param_type, type) and param_type is str:
                    if param_name.lower() in ["name", "location", "operation"]:
                        # Heuristic: these parameters are usually the last word.
                        words = text.split()
                        if words:
                            extracted[param_name] = words[-1]
                    else:
                        # Default: use the entire text for string params.
                        extracted[param_name] = text.strip()
                elif isinstance(param_type, type) and param_type in [int, float]:
                    # Numeric parameters: take the first number in the text.
                    numbers = re.findall(r"\d+(?:\.\d+)?", text)
                    if numbers:
                        try:
                            extracted[param_name] = param_type(numbers[0])
                        except ValueError:
                            # numbers is non-empty here, so only a failed
                            # conversion can land us in the fallback.
                            extracted[param_name] = _numeric_fallback(
                                param_name, param_type
                            )
                    else:
                        extracted[param_name] = _numeric_fallback(
                            param_name, param_type
                        )
                elif isinstance(param_type, type) and param_type is bool:
                    extracted[param_name] = True  # type: ignore
                else:
                    # Unknown/unsupported types get a None placeholder.
                    extracted[param_name] = None  # type: ignore

            return extracted

        arg_extractor = simple_arg_extractor

    return HandlerNode(
        name=name,
        param_schema=param_schema,
        handler=handler_func,
        arg_extractor=arg_extractor,
        context_inputs=context_inputs,
        context_outputs=context_outputs,
        input_validator=input_validator,
        output_validator=output_validator,
        description=description,
        remediation_strategies=remediation_strategies,
    )

keyword_classifier(user_input, children, context=None)

A simple classifier that selects the first child whose name appears in the user input.

Parameters:

Name Type Description Default
user_input str

The input string to process

required
children list[TreeNode]

List of possible child nodes

required
context Optional[Dict[str, Any]]

Optional context dictionary (unused in this classifier)

None

Returns:

Type Description
Optional[TreeNode]

The first matching child node, or None if no match is found

Source code in intent_kit/classifiers/keyword.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def keyword_classifier(
    user_input: str, children: list[TreeNode], context: Optional[Dict[str, Any]] = None
) -> Optional[TreeNode]:
    """
    A simple classifier that selects the first child whose name appears in the user input.

    Args:
        user_input: The input string to process
        children: List of possible child nodes
        context: Optional context dictionary (unused in this classifier)

    Returns:
        The first matching child node, or None if no match is found
    """
    # Case-insensitive substring match against each child's name, in order.
    haystack = user_input.lower()
    return next(
        (child for child in children if child.name.lower() in haystack),
        None,
    )

llm_classifier(*, name, children, llm_config, classification_prompt=None, description='', remediation_strategies=None)

Create an LLM-powered classifier node with auto-wired children descriptions.

Parameters:

Name Type Description Default
name str

Name of the classifier node

required
children List[TreeNode]

List of child nodes to classify between

required
llm_config Dict[str, Any]

LLM configuration for classification

required
classification_prompt Optional[str]

Optional custom classification prompt

None
description str

Optional description of the classifier

''

Returns:

Type Description
TreeNode

Configured ClassifierNode with auto-wired children descriptions

Example

classifier = llm_classifier( ... name="root", ... children=[greet_handler, calc_handler, weather_handler], ... llm_config=LLM_CONFIG ... )

Source code in intent_kit/builder.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
def llm_classifier(
    *,
    name: str,
    children: List[TreeNode],
    llm_config: Dict[str, Any],
    classification_prompt: Optional[str] = None,
    description: str = "",
    remediation_strategies: Optional[List[Union[str, "RemediationStrategy"]]] = None,
) -> TreeNode:
    """Create an LLM-powered classifier node with auto-wired children descriptions.

    Args:
        name: Name of the classifier node
        children: List of child nodes to classify between
        llm_config: LLM configuration for classification
        classification_prompt: Optional custom classification prompt
        description: Optional description of the classifier
        remediation_strategies: Optional remediation strategies for failures

    Returns:
        Configured ClassifierNode with auto-wired children descriptions

    Example:
        >>> classifier = llm_classifier(
        ...     name="root",
        ...     children=[greet_handler, calc_handler, weather_handler],
        ...     llm_config=LLM_CONFIG
        ... )
    """
    if not children:
        raise ValueError("llm_classifier requires at least one child node")

    # Build one description line per child so the classifier prompt can
    # tell them apart; fall back to the bare name when undescribed.
    node_descriptions = []
    for child in children:
        child_description = getattr(child, "description", None)
        if child_description:
            node_descriptions.append(f"{child.name}: {child_description}")
        else:
            node_descriptions.append(child.name)
            logger.warning(
                f"Child node '{child.name}' has no description, using name as fallback"
            )

    prompt = classification_prompt or get_default_classification_prompt()
    classifier_fn = create_llm_classifier(llm_config, prompt, node_descriptions)

    classifier_node = ClassifierNode(
        name=name,
        classifier=classifier_fn,
        children=children,
        description=description,
        remediation_strategies=remediation_strategies,
    )

    # Explicitly re-point each child's parent at the new classifier node.
    for child in children:
        child.parent = classifier_node

    return classifier_node

llm_splitter_node(*, name, children, llm_config, description='')

Create an LLM-powered splitter node for multi-intent handling.

Parameters:

Name Type Description Default
name str

Name of the splitter node

required
children List[TreeNode]

List of child nodes to route to

required
llm_config Dict[str, Any]

LLM configuration for splitting

required
description str

Optional description of the splitter

''

Returns:

Type Description
TreeNode

Configured SplitterNode with LLM-powered splitting

Example

splitter = llm_splitter_node( ... name="multi_intent_splitter", ... children=[classifier_node], ... llm_config=LLM_CONFIG ... )

Source code in intent_kit/builder.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
def llm_splitter_node(
    *,
    name: str,
    children: List[TreeNode],
    llm_config: Dict[str, Any],
    description: str = "",
) -> TreeNode:
    """Create an LLM-powered splitter node for multi-intent handling.

    Args:
        name: Name of the splitter node
        children: List of child nodes to route to
        llm_config: LLM configuration for splitting
        description: Optional description of the splitter

    Returns:
        Configured SplitterNode with LLM-powered splitting

    Example:
        >>> splitter = llm_splitter_node(
        ...     name="multi_intent_splitter",
        ...     children=[classifier_node],
        ...     llm_config=LLM_CONFIG
        ... )
    """
    # Extract the client once. Previously the wrapper re-read
    # llm_config["llm_client"] on every invocation while SplitterNode
    # captured it at construction time; capturing it here keeps the
    # wrapper and the node consistently using the same client instance.
    llm_client = llm_config.get("llm_client")

    # Wrapper adapts llm_splitter to the splitter_function signature by
    # supplying the captured LLM client.
    def llm_splitter_wrapper(
        user_input: str, debug: bool = False
    ) -> Sequence[IntentChunk]:
        return llm_splitter(user_input, debug, llm_client)

    splitter_node = SplitterNode(
        name=name,
        splitter_function=llm_splitter_wrapper,
        children=children,
        description=description,
        llm_client=llm_client,
    )

    # Set parent reference for all children to this splitter node so
    # upward tree traversal (e.g. for debugging/visualization) works.
    for child in children:
        child.parent = splitter_node

    return splitter_node

rule_splitter_node(*, name, children, description='')

Create a rule-based splitter node for multi-intent handling.

Parameters:

Name Type Description Default
name str

Name of the splitter node

required
children List[TreeNode]

List of child nodes to route to

required
description str

Optional description of the splitter

''

Returns:

Type Description
TreeNode

Configured SplitterNode with rule-based splitting

Example

splitter = rule_splitter_node( ... name="rule_based_splitter", ... children=[classifier_node], ... )

Source code in intent_kit/builder.py
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
def rule_splitter_node(
    *, name: str, children: List[TreeNode], description: str = ""
) -> TreeNode:
    """Create a rule-based splitter node for multi-intent handling.

    Args:
        name: Name of the splitter node
        children: List of child nodes to route to
        description: Optional description of the splitter

    Returns:
        Configured SplitterNode with rule-based splitting

    Example:
        >>> splitter = rule_splitter_node(
        ...     name="rule_based_splitter",
        ...     children=[classifier_node],
        ... )
    """
    node = SplitterNode(
        name=name,
        splitter_function=rule_splitter,
        children=children,
        description=description,
    )

    # Point every child back at the new splitter so upward traversal works.
    for child in children:
        child.parent = node

    return node

trace_context_execution(graph, user_input, context, output_format='console')

Generate a detailed execution trace with context state changes.

Parameters:

Name Type Description Default
graph Any

IntentGraph instance

required
user_input str

The user input that was processed

required
context IntentContext

Context object with execution history

required
output_format str

Output format ("console", "json")

'console'

Returns:

Type Description
str

Formatted execution trace

Source code in intent_kit/context/debug.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def trace_context_execution(
    graph: Any, user_input: str, context: IntentContext, output_format: str = "console"
) -> str:
    """
    Generate a detailed execution trace with context state changes.

    Args:
        graph: IntentGraph instance
        user_input: The user input that was processed
        context: Context object with execution history
        output_format: Output format ("console", "json")

    Returns:
        Formatted execution trace
    """
    # Capture history BEFORE we start reading context to avoid feedback loop
    history_before_debug: List[ContextHistoryEntry] = context.get_history()

    # Capture context state without adding to history
    context_state = _capture_full_context_state(context)

    # Tally set/get/delete operation counts in a single pass over the
    # history (previously three separate scans of the same list).
    op_counts = {"set": 0, "get": 0, "delete": 0}
    for entry in history_before_debug:
        action = getattr(entry, "action", None)
        if action in op_counts:
            op_counts[action] += 1

    # Cast to satisfy mypy
    cast_dict = cast(Dict[str, Any], context_state["history_summary"])
    cast_dict.update(
        {
            "total_entries": len(history_before_debug),
            "set_operations": op_counts["set"],
            "get_operations": op_counts["get"],
            "delete_operations": op_counts["delete"],
        }
    )

    trace_data = {
        "timestamp": datetime.now().isoformat(),
        "user_input": user_input,
        "session_id": context.session_id,
        "execution_summary": {
            "total_fields": len(context.keys()),
            "history_entries": len(history_before_debug),
            "error_count": context.error_count(),
        },
        "context_state": context_state,
        "history": _format_context_history(history_before_debug),
    }

    if output_format == "json":
        # default=str stringifies any non-JSON-native values (e.g. datetimes)
        json_str = json.dumps(trace_data, indent=2, default=str)
        return json_str
    else:  # console format
        return _format_console_trace(trace_data)

validate_context_flow(graph, context)

Validate the context flow for a graph and context.

Source code in intent_kit/context/debug.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def validate_context_flow(graph: Any, context: IntentContext) -> Dict[str, Any]:
    """
    Validate the context flow for a graph and context.

    Checks each node's declared context dependencies against the keys
    currently present in the context, recording any missing inputs.
    """
    dependencies = get_context_dependencies(graph)
    results: Dict[str, Any] = {
        "valid": True,
        "missing_dependencies": {},
        "available_fields": set(context.keys()),
        "total_nodes": len(dependencies),
        "nodes_with_dependencies": 0,
        "warnings": [],
    }

    for node_name, deps in dependencies.items():
        node_check = _validate_node_dependencies(deps, context)
        if not node_check["valid"]:
            # One failing node invalidates the whole flow; record its gaps.
            results["valid"] = False
            results["missing_dependencies"][node_name] = node_check["missing_inputs"]

        if deps.inputs or deps.outputs:
            results["nodes_with_dependencies"] += 1

    return results