diff --git a/api/core/workflow/README.md b/api/core/workflow/README.md new file mode 100644 index 0000000000..53e910e7b6 --- /dev/null +++ b/api/core/workflow/README.md @@ -0,0 +1,114 @@ +# Workflow + +## Project Overview + +This is the workflow graph engine module of Dify, implementing a queue-based distributed workflow execution system. The engine handles agentic AI workflows with support for parallel execution, node iteration, conditional logic, and external command control. + +## Architecture + +### Core Components + +The graph engine follows a layered architecture with strict dependency rules: + +1. **Graph Engine** (`graph_engine/`) - Orchestrates workflow execution + - **Manager** - External control interface for stop/pause/resume commands + - **Worker** - Node execution runtime + - **Command Processing** - Handles control commands (abort, pause, resume) + - **Event Management** - Event propagation and layer notifications + - **Graph Traversal** - Edge processing and skip propagation + - **Response Coordinator** - Path tracking and session management + - **Layers** - Pluggable middleware (debug logging, execution limits) + - **Command Channels** - Communication channels (InMemory, Redis) + +2. **Graph** (`graph/`) - Graph structure and runtime state + - **Graph Template** - Workflow definition + - **Edge** - Node connections with conditions + - **Runtime State Protocol** - State management interface + +3. **Nodes** (`nodes/`) - Node implementations + - **Base** - Abstract node classes and variable parsing + - **Specific Nodes** - LLM, Agent, Code, HTTP Request, Iteration, Loop, etc. + +4. **Events** (`node_events/`) - Event system + - **Base** - Event protocols + - **Node Events** - Node lifecycle events + +5. 
**Entities** (`entities/`) - Domain models + - **Variable Pool** - Variable storage + - **Graph Init Params** - Initialization configuration + +## Key Design Patterns + +### Command Channel Pattern +External workflow control via Redis or in-memory channels: +```python +# Send stop command to running workflow +channel = RedisChannel(redis_client, f"workflow:{task_id}:commands") +channel.send_command(AbortCommand(reason="User requested")) +``` + +### Layer System +Extensible middleware for cross-cutting concerns: +```python +engine = GraphEngine(graph) +engine.add_layer(DebugLoggingLayer(level="INFO")) +engine.add_layer(ExecutionLimitsLayer(max_nodes=100)) +``` + +### Event-Driven Architecture +All node executions emit events for monitoring and integration: +- `NodeRunStartedEvent` - Node execution begins +- `NodeRunSucceededEvent` - Node completes successfully +- `NodeRunFailedEvent` - Node encounters error +- `GraphRunStartedEvent/GraphRunCompletedEvent` - Workflow lifecycle + +### Variable Pool +Centralized variable storage with namespace isolation: +```python +# Variables scoped by node_id +pool.add(["node1", "output"], value) +result = pool.get(["node1", "output"]) +``` + +## Import Architecture Rules + +The codebase enforces strict layering via import-linter: + +1. **Workflow Layers** (top to bottom): + - graph_engine → graph_events → graph → nodes → node_events → entities + +2. **Graph Engine Internal Layers**: + - orchestration → command_processing → event_management → graph_traversal → domain + +3. **Domain Isolation**: + - Domain models cannot import from infrastructure layers + +4. **Command Channel Independence**: + - InMemory and Redis channels must remain independent + +## Common Tasks + +### Adding a New Node Type + +1. Create node class in `nodes/<node_type>/` +2. Inherit from `BaseNode` or appropriate base class +3. Implement `_run()` method +4. Register in `nodes/node_mapping.py` +5. 
Add tests in `tests/unit_tests/core/workflow/nodes/` + +### Implementing a Custom Layer + +1. Create class inheriting from `Layer` base +2. Override lifecycle methods: `on_graph_start()`, `on_event()`, `on_graph_end()` +3. Add to engine via `engine.add_layer()` + +### Debugging Workflow Execution + +Enable debug logging layer: +```python +debug_layer = DebugLoggingLayer( + level="DEBUG", + include_inputs=True, + include_outputs=True +) +``` diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index ff56605d3d..6e58d19fd6 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -38,6 +38,7 @@ from .graph_traversal import EdgeProcessor, SkipPropagator from .layers.base import GraphEngineLayer from .orchestration import Dispatcher, ExecutionCoordinator from .protocols.command_channel import CommandChannel +from .ready_queue import InMemoryReadyQueue from .response_coordinator import ResponseStreamCoordinator from .worker_management import WorkerPool @@ -104,7 +105,7 @@ class GraphEngine: # === Execution Queues === # Queue for nodes ready to execute - self._ready_queue: queue.Queue[str] = queue.Queue() + self._ready_queue = InMemoryReadyQueue() # Queue for events generated during execution self._event_queue: queue.Queue[GraphNodeEventBase] = queue.Queue() diff --git a/api/core/workflow/graph_engine/graph_state_manager.py b/api/core/workflow/graph_engine/graph_state_manager.py index efc3992ac9..22a3a826fc 100644 --- a/api/core/workflow/graph_engine/graph_state_manager.py +++ b/api/core/workflow/graph_engine/graph_state_manager.py @@ -2,7 +2,6 @@ Graph state manager that combines node, edge, and execution tracking. 
""" -import queue import threading from collections.abc import Sequence from typing import TypedDict, final @@ -10,6 +9,8 @@ from typing import TypedDict, final from core.workflow.enums import NodeState from core.workflow.graph import Edge, Graph +from .ready_queue import ReadyQueue + class EdgeStateAnalysis(TypedDict): """Analysis result for edge states.""" @@ -21,7 +22,7 @@ class EdgeStateAnalysis(TypedDict): @final class GraphStateManager: - def __init__(self, graph: Graph, ready_queue: queue.Queue[str]) -> None: + def __init__(self, graph: Graph, ready_queue: ReadyQueue) -> None: """ Initialize the state manager. diff --git a/api/core/workflow/graph_engine/ready_queue/__init__.py b/api/core/workflow/graph_engine/ready_queue/__init__.py new file mode 100644 index 0000000000..9b890880f5 --- /dev/null +++ b/api/core/workflow/graph_engine/ready_queue/__init__.py @@ -0,0 +1,11 @@ +""" +Ready queue implementations for GraphEngine. + +This package contains the protocol and implementations for managing +the queue of nodes ready for execution. +""" + +from .in_memory import InMemoryReadyQueue +from .protocol import ReadyQueue + +__all__ = ["InMemoryReadyQueue", "ReadyQueue"] diff --git a/api/core/workflow/graph_engine/ready_queue/in_memory.py b/api/core/workflow/graph_engine/ready_queue/in_memory.py new file mode 100644 index 0000000000..90df9a0096 --- /dev/null +++ b/api/core/workflow/graph_engine/ready_queue/in_memory.py @@ -0,0 +1,142 @@ +""" +In-memory implementation of the ReadyQueue protocol. + +This implementation wraps Python's standard queue.Queue and adds +serialization capabilities for state storage. +""" + +import queue +from typing import final + + +@final +class InMemoryReadyQueue: + """ + In-memory ready queue implementation with serialization support. + + This implementation uses Python's queue.Queue internally and provides + methods to serialize and restore the queue state. 
+ """ + + def __init__(self, maxsize: int = 0) -> None: + """ + Initialize the in-memory ready queue. + + Args: + maxsize: Maximum size of the queue (0 for unlimited) + """ + self._queue: queue.Queue[str] = queue.Queue(maxsize=maxsize) + + def put(self, item: str) -> None: + """ + Add a node ID to the ready queue. + + Args: + item: The node ID to add to the queue + """ + self._queue.put(item) + + def get(self, timeout: float | None = None) -> str: + """ + Retrieve and remove a node ID from the queue. + + Args: + timeout: Maximum time to wait for an item (None for blocking) + + Returns: + The node ID retrieved from the queue + + Raises: + queue.Empty: If timeout expires and no item is available + """ + if timeout is None: + return self._queue.get(block=True) + return self._queue.get(timeout=timeout) + + def task_done(self) -> None: + """ + Indicate that a previously retrieved task is complete. + + Used by worker threads to signal task completion for + join() synchronization. + """ + self._queue.task_done() + + def empty(self) -> bool: + """ + Check if the queue is empty. + + Returns: + True if the queue has no items, False otherwise + """ + return self._queue.empty() + + def qsize(self) -> int: + """ + Get the approximate size of the queue. + + Returns: + The approximate number of items in the queue + """ + return self._queue.qsize() + + def dumps(self) -> dict[str, object]: + """ + Serialize the queue state for storage. 
+ + Returns: + A dictionary containing the serialized queue state + """ + # Extract all items from the queue without removing them + items: list[str] = [] + temp_items: list[str] = [] + + # Drain the queue temporarily to get all items + while not self._queue.empty(): + try: + item = self._queue.get_nowait() + temp_items.append(item) + items.append(item) + except queue.Empty: + break + + # Put items back in the same order + for item in temp_items: + self._queue.put(item) + + return { + "type": "InMemoryReadyQueue", + "version": "1.0", + "items": items, + "maxsize": self._queue.maxsize, + } + + def loads(self, data: dict[str, object]) -> None: + """ + Restore the queue state from serialized data. + + Args: + data: The serialized queue state to restore + """ + if data.get("type") != "InMemoryReadyQueue": + raise ValueError(f"Invalid serialized data type: {data.get('type')}") + + if data.get("version") != "1.0": + raise ValueError(f"Unsupported version: {data.get('version')}") + + # Clear the current queue + while not self._queue.empty(): + try: + self._queue.get_nowait() + except queue.Empty: + break + + # Restore items + items = data.get("items", []) + if not isinstance(items, list): + raise ValueError("Invalid items data: expected list") + + for item in items: + if not isinstance(item, str): + raise ValueError(f"Invalid item type: expected str, got {type(item).__name__}") + self._queue.put(item) diff --git a/api/core/workflow/graph_engine/ready_queue/protocol.py b/api/core/workflow/graph_engine/ready_queue/protocol.py new file mode 100644 index 0000000000..0e457bcf05 --- /dev/null +++ b/api/core/workflow/graph_engine/ready_queue/protocol.py @@ -0,0 +1,88 @@ +""" +ReadyQueue protocol for GraphEngine node execution queue. + +This protocol defines the interface for managing the queue of nodes ready +for execution, supporting both in-memory and persistent storage scenarios. 
+""" + +from typing import Protocol + + +class ReadyQueue(Protocol): + """ + Protocol for managing nodes ready for execution in GraphEngine. + + This protocol defines the interface that any ready queue implementation + must provide, enabling both in-memory queues and persistent queues + that can be serialized for state storage. + """ + + def put(self, item: str) -> None: + """ + Add a node ID to the ready queue. + + Args: + item: The node ID to add to the queue + """ + ... + + def get(self, timeout: float | None = None) -> str: + """ + Retrieve and remove a node ID from the queue. + + Args: + timeout: Maximum time to wait for an item (None for blocking) + + Returns: + The node ID retrieved from the queue + + Raises: + queue.Empty: If timeout expires and no item is available + """ + ... + + def task_done(self) -> None: + """ + Indicate that a previously retrieved task is complete. + + Used by worker threads to signal task completion for + join() synchronization. + """ + ... + + def empty(self) -> bool: + """ + Check if the queue is empty. + + Returns: + True if the queue has no items, False otherwise + """ + ... + + def qsize(self) -> int: + """ + Get the approximate size of the queue. + + Returns: + The approximate number of items in the queue + """ + ... + + def dumps(self) -> dict[str, object]: + """ + Serialize the queue state for storage. + + Returns: + A dictionary containing the serialized queue state + that can be persisted and later restored + """ + ... + + def loads(self, data: dict[str, object]) -> None: + """ + Restore the queue state from serialized data. + + Args: + data: The serialized queue state to restore + """ + ... 
diff --git a/api/core/workflow/graph_engine/worker.py b/api/core/workflow/graph_engine/worker.py index e7462309c9..42c9b936dd 100644 --- a/api/core/workflow/graph_engine/worker.py +++ b/api/core/workflow/graph_engine/worker.py @@ -22,6 +22,8 @@ from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent from core.workflow.nodes.base.node import Node from libs.flask_utils import preserve_flask_contexts +from .ready_queue import ReadyQueue + @final class Worker(threading.Thread): @@ -35,7 +37,7 @@ class Worker(threading.Thread): def __init__( self, - ready_queue: queue.Queue[str], + ready_queue: ReadyQueue, event_queue: queue.Queue[GraphNodeEventBase], graph: Graph, worker_id: int = 0, @@ -46,7 +48,7 @@ class Worker(threading.Thread): Initialize worker thread. Args: - ready_queue: Queue containing node IDs ready for execution + ready_queue: Ready queue containing node IDs ready for execution event_queue: Queue for pushing execution events graph: Graph containing nodes to execute worker_id: Unique identifier for this worker diff --git a/api/core/workflow/graph_engine/worker_management/worker_pool.py b/api/core/workflow/graph_engine/worker_management/worker_pool.py index 00328fbda1..a9aada9ea5 100644 --- a/api/core/workflow/graph_engine/worker_management/worker_pool.py +++ b/api/core/workflow/graph_engine/worker_management/worker_pool.py @@ -14,6 +14,7 @@ from configs import dify_config from core.workflow.graph import Graph from core.workflow.graph_events import GraphNodeEventBase +from ..ready_queue import ReadyQueue from ..worker import Worker logger = logging.getLogger(__name__) @@ -35,7 +36,7 @@ class WorkerPool: def __init__( self, - ready_queue: queue.Queue[str], + ready_queue: ReadyQueue, event_queue: queue.Queue[GraphNodeEventBase], graph: Graph, flask_app: "Flask | None" = None, @@ -49,7 +50,7 @@ class WorkerPool: Initialize the simple worker pool. 
Args: - ready_queue: Queue of nodes ready for execution + ready_queue: Ready queue for nodes ready for execution event_queue: Queue for worker events graph: The workflow graph flask_app: Optional Flask app for context preservation