diff --git a/api/core/workflow/README.md b/api/core/workflow/README.md index 53e910e7b6..bef19ba90b 100644 --- a/api/core/workflow/README.md +++ b/api/core/workflow/README.md @@ -11,6 +11,7 @@ This is the workflow graph engine module of Dify, implementing a queue-based dis The graph engine follows a layered architecture with strict dependency rules: 1. **Graph Engine** (`graph_engine/`) - Orchestrates workflow execution + - **Manager** - External control interface for stop/pause/resume commands - **Worker** - Node execution runtime - **Command Processing** - Handles control commands (abort, pause, resume) @@ -20,27 +21,33 @@ The graph engine follows a layered architecture with strict dependency rules: - **Layers** - Pluggable middleware (debug logging, execution limits) - **Command Channels** - Communication channels (InMemory, Redis) -2. **Graph** (`graph/`) - Graph structure and runtime state +1. **Graph** (`graph/`) - Graph structure and runtime state + - **Graph Template** - Workflow definition - **Edge** - Node connections with conditions - **Runtime State Protocol** - State management interface -3. **Nodes** (`nodes/`) - Node implementations +1. **Nodes** (`nodes/`) - Node implementations + - **Base** - Abstract node classes and variable parsing - **Specific Nodes** - LLM, Agent, Code, HTTP Request, Iteration, Loop, etc. -4. **Events** (`node_events/`) - Event system +1. **Events** (`node_events/`) - Event system + - **Base** - Event protocols - **Node Events** - Node lifecycle events -5. **Entities** (`entities/`) - Domain models +1. **Entities** (`entities/`) - Domain models + - **Variable Pool** - Variable storage - **Graph Init Params** - Initialization configuration ## Key Design Patterns ### Command Channel Pattern + External workflow control via Redis or in-memory channels: + ```python # Send stop command to running workflow channel = RedisChannel(redis_client, f"workflow:{task_id}:commands") @@ -48,7 +55,9 @@ channel.send_command(AbortCommand(reason="User requested")) ``` ### Layer System + Extensible middleware for cross-cutting concerns: + ```python engine = GraphEngine(graph) engine.add_layer(DebugLoggingLayer(level="INFO")) @@ -56,14 +65,18 @@ engine.add_layer(ExecutionLimitsLayer(max_nodes=100)) ``` ### Event-Driven Architecture + All node executions emit events for monitoring and integration: + - `NodeRunStartedEvent` - Node execution begins - `NodeRunSucceededEvent` - Node completes successfully - `NodeRunFailedEvent` - Node encounters error - `GraphRunStartedEvent/GraphRunCompletedEvent` - Workflow lifecycle ### Variable Pool + Centralized variable storage with namespace isolation: + ```python # Variables scoped by node_id pool.add(["node1", "output"], value) @@ -75,15 +88,19 @@ result = pool.get(["node1", "output"]) The codebase enforces strict layering via import-linter: 1. **Workflow Layers** (top to bottom): + - graph_engine → graph_events → graph → nodes → node_events → entities -2. **Graph Engine Internal Layers**: +1. **Graph Engine Internal Layers**: + - orchestration → command_processing → event_management → graph_traversal → domain -3. **Domain Isolation**: +1. **Domain Isolation**: + - Domain models cannot import from infrastructure layers -4. **Command Channel Independence**: +1. **Command Channel Independence**: + - InMemory and Redis channels must remain independent ## Common Tasks @@ -91,20 +108,21 @@ The codebase enforces strict layering via import-linter: ### Adding a New Node Type 1. Create node class in `nodes//` -2. Inherit from `BaseNode` or appropriate base class -3. Implement `_run()` method -4. Register in `nodes/node_mapping.py` -5. Add tests in `tests/unit_tests/core/workflow/nodes/` +1. Inherit from `BaseNode` or appropriate base class +1. Implement `_run()` method +1. Register in `nodes/node_mapping.py` +1. Add tests in `tests/unit_tests/core/workflow/nodes/` ### Implementing a Custom Layer 1. Create class inheriting from `Layer` base -2. Override lifecycle methods: `on_graph_start()`, `on_event()`, `on_graph_end()` -3. Add to engine via `engine.add_layer()` +1. Override lifecycle methods: `on_graph_start()`, `on_event()`, `on_graph_end()` +1. Add to engine via `engine.add_layer()` ### Debugging Workflow Execution Enable debug logging layer: + ```python debug_layer = DebugLoggingLayer( level="DEBUG",