feat(graph_engine): add abstraction layer and dump/load methods for the ready queue.

-LAN- 2025-09-14 04:19:24 +08:00
parent efa5f35277
commit 4cdc19fd05
8 changed files with 367 additions and 7 deletions

api/core/workflow/README.md Normal file

@@ -0,0 +1,114 @@
# Workflow
## Project Overview
This is the workflow graph engine module of Dify, implementing a queue-based distributed workflow execution system. The engine handles agentic AI workflows with support for parallel execution, node iteration, conditional logic, and external command control.
## Architecture
### Core Components
The graph engine follows a layered architecture with strict dependency rules:
1. **Graph Engine** (`graph_engine/`) - Orchestrates workflow execution
- **Manager** - External control interface for stop/pause/resume commands
- **Worker** - Node execution runtime
- **Command Processing** - Handles control commands (abort, pause, resume)
- **Event Management** - Event propagation and layer notifications
- **Graph Traversal** - Edge processing and skip propagation
- **Response Coordinator** - Path tracking and session management
- **Layers** - Pluggable middleware (debug logging, execution limits)
- **Command Channels** - Communication channels (InMemory, Redis)
2. **Graph** (`graph/`) - Graph structure and runtime state
- **Graph Template** - Workflow definition
- **Edge** - Node connections with conditions
- **Runtime State Protocol** - State management interface
3. **Nodes** (`nodes/`) - Node implementations
- **Base** - Abstract node classes and variable parsing
- **Specific Nodes** - LLM, Agent, Code, HTTP Request, Iteration, Loop, etc.
4. **Events** (`node_events/`) - Event system
- **Base** - Event protocols
- **Node Events** - Node lifecycle events
5. **Entities** (`entities/`) - Domain models
- **Variable Pool** - Variable storage
- **Graph Init Params** - Initialization configuration
## Key Design Patterns
### Command Channel Pattern
External workflow control via Redis or in-memory channels:
```python
# Send stop command to running workflow
channel = RedisChannel(redis_client, f"workflow:{task_id}:commands")
channel.send_command(AbortCommand(reason="User requested"))
```
### Layer System
Extensible middleware for cross-cutting concerns:
```python
engine = GraphEngine(graph)
engine.add_layer(DebugLoggingLayer(level="INFO"))
engine.add_layer(ExecutionLimitsLayer(max_nodes=100))
```
### Event-Driven Architecture
All node executions emit events for monitoring and integration; a small dispatch sketch follows this list:
- `NodeRunStartedEvent` - Node execution begins
- `NodeRunSucceededEvent` - Node completes successfully
- `NodeRunFailedEvent` - Node encounters error
- `GraphRunStartedEvent/GraphRunCompletedEvent` - Workflow lifecycle
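Consumers typically branch on the concrete event class, for example inside a layer's `on_event()` hook. A minimal sketch is shown below; the imports match this commit's diffs, but the `node_id` attribute is an assumption used only for illustration:

```python
# Sketch only: GraphNodeEventBase and NodeRunFailedEvent are imported in this
# commit's worker diff; the `node_id` field name is assumed, not guaranteed.
from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent


def report_failures(event: GraphNodeEventBase) -> None:
    """Illustrative handler: reacts to failures, ignores everything else."""
    if isinstance(event, NodeRunFailedEvent):
        print(f"node {getattr(event, 'node_id', '<unknown>')} failed")
```

Started and succeeded events can be matched the same way with `isinstance` checks on their respective classes.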
### Variable Pool
Centralized variable storage with namespace isolation:
```python
# Variables scoped by node_id
pool.add(["node1", "output"], value)
result = pool.get(["node1", "output"])
```
## Import Architecture Rules
The codebase enforces strict layering via import-linter:
1. **Workflow Layers** (top to bottom):
- graph_engine → graph_events → graph → nodes → node_events → entities
2. **Graph Engine Internal Layers**:
- orchestration → command_processing → event_management → graph_traversal → domain
3. **Domain Isolation**:
- Domain models cannot import from infrastructure layers
4. **Command Channel Independence**:
- InMemory and Redis channels must remain independent
## Common Tasks
### Adding a New Node Type
1. Create node class in `nodes/<node_type>/`
2. Inherit from `BaseNode` or appropriate base class
3. Implement the `_run()` method (see the sketch after this list)
4. Register in `nodes/node_mapping.py`
5. Add tests in `tests/unit_tests/core/workflow/nodes/`
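A minimal sketch of such a node follows. Only the import path is taken from this commit's worker diff; the class name, input accessor, and return shape are hypothetical, and the real base-class contract defines its own `_run()` signature and result objects.

```python
# Hypothetical node sketch; `_get_input` and the returned dict are made up
# for illustration and do not reflect the real base-class contract.
from core.workflow.nodes.base.node import Node


class UppercaseNode(Node):
    """Toy node that upper-cases a single string input."""

    def _run(self):
        text = self._get_input("text")  # hypothetical input accessor
        return {"outputs": {"result": text.upper()}}
```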
### Implementing a Custom Layer
1. Create class inheriting from `Layer` base
2. Override lifecycle methods: `on_graph_start()`, `on_event()`, `on_graph_end()`
3. Add it to the engine via `engine.add_layer()` (a minimal sketch follows)
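A minimal sketch, assuming the `GraphEngineLayer` base exposed by `graph_engine.layers.base` (seen in this commit's imports) and no-argument lifecycle hooks; the real method signatures may differ:

```python
import time

# The base-class import path follows `from .layers.base import GraphEngineLayer`
# in this commit's graph_engine diff; lifecycle signatures below are assumed.
from core.workflow.graph_engine.layers.base import GraphEngineLayer


class TimingLayer(GraphEngineLayer):
    """Illustrative layer that times a run and counts observed events."""

    def __init__(self) -> None:
        super().__init__()
        self._started_at = 0.0
        self._event_count = 0

    def on_graph_start(self) -> None:
        self._started_at = time.monotonic()

    def on_event(self, event) -> None:
        self._event_count += 1

    def on_graph_end(self) -> None:
        elapsed = time.monotonic() - self._started_at
        print(f"run took {elapsed:.2f}s, {self._event_count} events observed")
```

It is then attached like any other layer: `engine.add_layer(TimingLayer())`.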
### Debugging Workflow Execution
Enable the debug logging layer and attach it to the engine:
```python
debug_layer = DebugLoggingLayer(
    level="DEBUG",
    include_inputs=True,
    include_outputs=True,
)
engine.add_layer(debug_layer)
```


@@ -38,6 +38,7 @@ from .graph_traversal import EdgeProcessor, SkipPropagator
 from .layers.base import GraphEngineLayer
 from .orchestration import Dispatcher, ExecutionCoordinator
 from .protocols.command_channel import CommandChannel
+from .ready_queue import InMemoryReadyQueue
 from .response_coordinator import ResponseStreamCoordinator
 from .worker_management import WorkerPool
@@ -104,7 +105,7 @@ class GraphEngine:
         # === Execution Queues ===
         # Queue for nodes ready to execute
-        self._ready_queue: queue.Queue[str] = queue.Queue()
+        self._ready_queue = InMemoryReadyQueue()
         # Queue for events generated during execution
         self._event_queue: queue.Queue[GraphNodeEventBase] = queue.Queue()


@@ -2,7 +2,6 @@
 Graph state manager that combines node, edge, and execution tracking.
 """
-import queue
 import threading
 from collections.abc import Sequence
 from typing import TypedDict, final
@@ -10,6 +9,8 @@ from typing import TypedDict, final
 from core.workflow.enums import NodeState
 from core.workflow.graph import Edge, Graph
+from .ready_queue import ReadyQueue
+
 class EdgeStateAnalysis(TypedDict):
     """Analysis result for edge states."""
@@ -21,7 +22,7 @@ class EdgeStateAnalysis(TypedDict):
 @final
 class GraphStateManager:
-    def __init__(self, graph: Graph, ready_queue: queue.Queue[str]) -> None:
+    def __init__(self, graph: Graph, ready_queue: ReadyQueue) -> None:
         """
         Initialize the state manager.


@@ -0,0 +1,11 @@
"""
Ready queue implementations for GraphEngine.

This package contains the protocol and implementations for managing
the queue of nodes ready for execution.
"""

from .in_memory import InMemoryReadyQueue
from .protocol import ReadyQueue

__all__ = ["InMemoryReadyQueue", "ReadyQueue"]


@@ -0,0 +1,142 @@
"""
In-memory implementation of the ReadyQueue protocol.

This implementation wraps Python's standard queue.Queue and adds
serialization capabilities for state storage.
"""

import queue
from typing import final


@final
class InMemoryReadyQueue:
    """
    In-memory ready queue implementation with serialization support.

    This implementation uses Python's queue.Queue internally and provides
    methods to serialize and restore the queue state.
    """

    def __init__(self, maxsize: int = 0) -> None:
        """
        Initialize the in-memory ready queue.

        Args:
            maxsize: Maximum size of the queue (0 for unlimited)
        """
        self._queue: queue.Queue[str] = queue.Queue(maxsize=maxsize)

    def put(self, item: str) -> None:
        """
        Add a node ID to the ready queue.

        Args:
            item: The node ID to add to the queue
        """
        self._queue.put(item)

    def get(self, timeout: float | None = None) -> str:
        """
        Retrieve and remove a node ID from the queue.

        Args:
            timeout: Maximum time to wait for an item (None for blocking)

        Returns:
            The node ID retrieved from the queue

        Raises:
            queue.Empty: If timeout expires and no item is available
        """
        if timeout is None:
            return self._queue.get(block=True)
        return self._queue.get(timeout=timeout)

    def task_done(self) -> None:
        """
        Indicate that a previously retrieved task is complete.

        Used by worker threads to signal task completion for
        join() synchronization.
        """
        self._queue.task_done()

    def empty(self) -> bool:
        """
        Check if the queue is empty.

        Returns:
            True if the queue has no items, False otherwise
        """
        return self._queue.empty()

    def qsize(self) -> int:
        """
        Get the approximate size of the queue.

        Returns:
            The approximate number of items in the queue
        """
        return self._queue.qsize()

    def dumps(self) -> dict[str, object]:
        """
        Serialize the queue state for storage.

        Returns:
            A dictionary containing the serialized queue state
        """
        # Extract all items from the queue without removing them
        items: list[str] = []
        temp_items: list[str] = []

        # Drain the queue temporarily to get all items
        while not self._queue.empty():
            try:
                item = self._queue.get_nowait()
                temp_items.append(item)
                items.append(item)
            except queue.Empty:
                break

        # Put items back in the same order
        for item in temp_items:
            self._queue.put(item)

        return {
            "type": "InMemoryReadyQueue",
            "version": "1.0",
            "items": items,
            "maxsize": self._queue.maxsize,
        }

    def loads(self, data: dict[str, object]) -> None:
        """
        Restore the queue state from serialized data.

        Args:
            data: The serialized queue state to restore
        """
        if data.get("type") != "InMemoryReadyQueue":
            raise ValueError(f"Invalid serialized data type: {data.get('type')}")

        if data.get("version") != "1.0":
            raise ValueError(f"Unsupported version: {data.get('version')}")

        # Clear the current queue
        while not self._queue.empty():
            try:
                self._queue.get_nowait()
            except queue.Empty:
                break

        # Restore items
        items = data.get("items", [])
        if not isinstance(items, list):
            raise ValueError("Invalid items data: expected list")

        for item in items:
            if not isinstance(item, str):
                raise ValueError(f"Invalid item type: expected str, got {type(item).__name__}")
            self._queue.put(item)
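A brief round-trip sketch for the new `dumps()`/`loads()` methods; the absolute import path is inferred from this commit's package layout and may differ in practice:

```python
from core.workflow.graph_engine.ready_queue import InMemoryReadyQueue

ready = InMemoryReadyQueue()
ready.put("node_1")
ready.put("node_2")

# dumps() drains and re-enqueues internally, so the queue keeps its items.
state = ready.dumps()
assert state["items"] == ["node_1", "node_2"]

# A fresh queue can be restored from the serialized state, e.g. when
# resuming a previously interrupted workflow run.
restored = InMemoryReadyQueue()
restored.loads(state)
assert restored.get(timeout=1) == "node_1"
```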


@@ -0,0 +1,88 @@
"""
ReadyQueue protocol for GraphEngine node execution queue.

This protocol defines the interface for managing the queue of nodes ready
for execution, supporting both in-memory and persistent storage scenarios.
"""

from typing import Protocol


class ReadyQueue(Protocol):
    """
    Protocol for managing nodes ready for execution in GraphEngine.

    This protocol defines the interface that any ready queue implementation
    must provide, enabling both in-memory queues and persistent queues
    that can be serialized for state storage.
    """

    def put(self, item: str) -> None:
        """
        Add a node ID to the ready queue.

        Args:
            item: The node ID to add to the queue
        """
        ...

    def get(self, timeout: float | None = None) -> str:
        """
        Retrieve and remove a node ID from the queue.

        Args:
            timeout: Maximum time to wait for an item (None for blocking)

        Returns:
            The node ID retrieved from the queue

        Raises:
            queue.Empty: If timeout expires and no item is available
        """
        ...

    def task_done(self) -> None:
        """
        Indicate that a previously retrieved task is complete.

        Used by worker threads to signal task completion for
        join() synchronization.
        """
        ...

    def empty(self) -> bool:
        """
        Check if the queue is empty.

        Returns:
            True if the queue has no items, False otherwise
        """
        ...

    def qsize(self) -> int:
        """
        Get the approximate size of the queue.

        Returns:
            The approximate number of items in the queue
        """
        ...

    def dumps(self) -> dict[str, object]:
        """
        Serialize the queue state for storage.

        Returns:
            A dictionary containing the serialized queue state
            that can be persisted and later restored
        """
        ...

    def loads(self, data: dict[str, object]) -> None:
        """
        Restore the queue state from serialized data.

        Args:
            data: The serialized queue state to restore
        """
        ...


@@ -22,6 +22,8 @@ from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent
 from core.workflow.nodes.base.node import Node
 from libs.flask_utils import preserve_flask_contexts
+from .ready_queue import ReadyQueue
+
 @final
 class Worker(threading.Thread):
@@ -35,7 +37,7 @@ class Worker(threading.Thread):
     def __init__(
         self,
-        ready_queue: queue.Queue[str],
+        ready_queue: ReadyQueue,
         event_queue: queue.Queue[GraphNodeEventBase],
         graph: Graph,
         worker_id: int = 0,
@@ -46,7 +48,7 @@ class Worker(threading.Thread):
         Initialize worker thread.
 
         Args:
-            ready_queue: Queue containing node IDs ready for execution
+            ready_queue: Ready queue containing node IDs ready for execution
             event_queue: Queue for pushing execution events
             graph: Graph containing nodes to execute
             worker_id: Unique identifier for this worker


@@ -14,6 +14,7 @@ from configs import dify_config
 from core.workflow.graph import Graph
 from core.workflow.graph_events import GraphNodeEventBase
+from ..ready_queue import ReadyQueue
 from ..worker import Worker
 
 logger = logging.getLogger(__name__)
@@ -35,7 +36,7 @@ class WorkerPool:
     def __init__(
         self,
-        ready_queue: queue.Queue[str],
+        ready_queue: ReadyQueue,
         event_queue: queue.Queue[GraphNodeEventBase],
         graph: Graph,
         flask_app: "Flask | None" = None,
@@ -49,7 +50,7 @@ class WorkerPool:
         Initialize the simple worker pool.
 
         Args:
-            ready_queue: Queue of nodes ready for execution
+            ready_queue: Ready queue for nodes ready for execution
             event_queue: Queue for worker events
             graph: The workflow graph
             flask_app: Optional Flask app for context preservation