mirror of https://github.com/langgenius/dify.git
Merge 384a53e820 into 9a6b4147bc
This commit is contained in:
commit
ec5394e1b7
|
|
@ -256,6 +256,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
|
||||||
single_iteration_run=AdvancedChatAppGenerateEntity.SingleIterationRunEntity(
|
single_iteration_run=AdvancedChatAppGenerateEntity.SingleIterationRunEntity(
|
||||||
node_id=node_id, inputs=args["inputs"]
|
node_id=node_id, inputs=args["inputs"]
|
||||||
),
|
),
|
||||||
|
workflow_run_id=str(uuid.uuid4()),
|
||||||
)
|
)
|
||||||
contexts.plugin_tool_providers.set({})
|
contexts.plugin_tool_providers.set({})
|
||||||
contexts.plugin_tool_providers_lock.set(threading.Lock())
|
contexts.plugin_tool_providers_lock.set(threading.Lock())
|
||||||
|
|
@ -339,6 +340,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
|
||||||
invoke_from=InvokeFrom.DEBUGGER,
|
invoke_from=InvokeFrom.DEBUGGER,
|
||||||
extras={"auto_generate_conversation_name": False},
|
extras={"auto_generate_conversation_name": False},
|
||||||
single_loop_run=AdvancedChatAppGenerateEntity.SingleLoopRunEntity(node_id=node_id, inputs=args["inputs"]),
|
single_loop_run=AdvancedChatAppGenerateEntity.SingleLoopRunEntity(node_id=node_id, inputs=args["inputs"]),
|
||||||
|
workflow_run_id=str(uuid.uuid4()),
|
||||||
)
|
)
|
||||||
contexts.plugin_tool_providers.set({})
|
contexts.plugin_tool_providers.set({})
|
||||||
contexts.plugin_tool_providers_lock.set(threading.Lock())
|
contexts.plugin_tool_providers_lock.set(threading.Lock())
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,10 @@ from core.variables.variables import VariableUnion
|
||||||
from core.workflow.enums import WorkflowType
|
from core.workflow.enums import WorkflowType
|
||||||
from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel
|
from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel
|
||||||
from core.workflow.graph_engine.layers.base import GraphEngineLayer
|
from core.workflow.graph_engine.layers.base import GraphEngineLayer
|
||||||
from core.workflow.graph_engine.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
|
from core.workflow.graph_engine.layers.persistence import (
|
||||||
|
PersistenceWorkflowInfo,
|
||||||
|
WorkflowPersistenceLayer,
|
||||||
|
)
|
||||||
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
|
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
|
||||||
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
|
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
|
||||||
from core.workflow.runtime import GraphRuntimeState, VariablePool
|
from core.workflow.runtime import GraphRuntimeState, VariablePool
|
||||||
|
|
@ -109,6 +112,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
||||||
workflow=self._workflow,
|
workflow=self._workflow,
|
||||||
single_iteration_run=self.application_generate_entity.single_iteration_run,
|
single_iteration_run=self.application_generate_entity.single_iteration_run,
|
||||||
single_loop_run=self.application_generate_entity.single_loop_run,
|
single_loop_run=self.application_generate_entity.single_loop_run,
|
||||||
|
system_variables=system_inputs,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
inputs = self.application_generate_entity.inputs
|
inputs = self.application_generate_entity.inputs
|
||||||
|
|
@ -186,6 +190,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
||||||
|
|
||||||
self._queue_manager.graph_runtime_state = graph_runtime_state
|
self._queue_manager.graph_runtime_state = graph_runtime_state
|
||||||
|
|
||||||
|
if not self.application_generate_entity.is_single_stepping_container_nodes():
|
||||||
persistence_layer = WorkflowPersistenceLayer(
|
persistence_layer = WorkflowPersistenceLayer(
|
||||||
application_generate_entity=self.application_generate_entity,
|
application_generate_entity=self.application_generate_entity,
|
||||||
workflow_info=PersistenceWorkflowInfo(
|
workflow_info=PersistenceWorkflowInfo(
|
||||||
|
|
@ -198,7 +203,6 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
||||||
workflow_node_execution_repository=self._workflow_node_execution_repository,
|
workflow_node_execution_repository=self._workflow_node_execution_repository,
|
||||||
trace_manager=self.application_generate_entity.trace_manager,
|
trace_manager=self.application_generate_entity.trace_manager,
|
||||||
)
|
)
|
||||||
|
|
||||||
workflow_entry.graph_engine.layer(persistence_layer)
|
workflow_entry.graph_engine.layer(persistence_layer)
|
||||||
for layer in self._graph_engine_layers:
|
for layer in self._graph_engine_layers:
|
||||||
workflow_entry.graph_engine.layer(layer)
|
workflow_entry.graph_engine.layer(layer)
|
||||||
|
|
|
||||||
|
|
@ -92,19 +92,7 @@ class PipelineRunner(WorkflowBasedAppRunner):
|
||||||
|
|
||||||
db.session.close()
|
db.session.close()
|
||||||
|
|
||||||
# if only single iteration run is requested
|
|
||||||
if self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run:
|
|
||||||
# Handle single iteration or single loop run
|
|
||||||
graph, variable_pool, graph_runtime_state = self._prepare_single_node_execution(
|
|
||||||
workflow=workflow,
|
|
||||||
single_iteration_run=self.application_generate_entity.single_iteration_run,
|
|
||||||
single_loop_run=self.application_generate_entity.single_loop_run,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
inputs = self.application_generate_entity.inputs
|
|
||||||
files = self.application_generate_entity.files
|
files = self.application_generate_entity.files
|
||||||
|
|
||||||
# Create a variable pool.
|
|
||||||
system_inputs = SystemVariable(
|
system_inputs = SystemVariable(
|
||||||
files=files,
|
files=files,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
|
|
@ -120,6 +108,19 @@ class PipelineRunner(WorkflowBasedAppRunner):
|
||||||
invoke_from=self.application_generate_entity.invoke_from.value,
|
invoke_from=self.application_generate_entity.invoke_from.value,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# if only single iteration run is requested
|
||||||
|
if self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run:
|
||||||
|
# Handle single iteration or single loop run
|
||||||
|
graph, variable_pool, graph_runtime_state = self._prepare_single_node_execution(
|
||||||
|
workflow=workflow,
|
||||||
|
single_iteration_run=self.application_generate_entity.single_iteration_run,
|
||||||
|
single_loop_run=self.application_generate_entity.single_loop_run,
|
||||||
|
system_variables=system_inputs,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
inputs = self.application_generate_entity.inputs
|
||||||
|
|
||||||
|
# Create a variable pool.
|
||||||
rag_pipeline_variables = []
|
rag_pipeline_variables = []
|
||||||
if workflow.rag_pipeline_variables:
|
if workflow.rag_pipeline_variables:
|
||||||
for v in workflow.rag_pipeline_variables:
|
for v in workflow.rag_pipeline_variables:
|
||||||
|
|
@ -171,7 +172,7 @@ class PipelineRunner(WorkflowBasedAppRunner):
|
||||||
)
|
)
|
||||||
|
|
||||||
self._queue_manager.graph_runtime_state = graph_runtime_state
|
self._queue_manager.graph_runtime_state = graph_runtime_state
|
||||||
|
if not self.application_generate_entity.is_single_stepping_container_nodes():
|
||||||
persistence_layer = WorkflowPersistenceLayer(
|
persistence_layer = WorkflowPersistenceLayer(
|
||||||
application_generate_entity=self.application_generate_entity,
|
application_generate_entity=self.application_generate_entity,
|
||||||
workflow_info=PersistenceWorkflowInfo(
|
workflow_info=PersistenceWorkflowInfo(
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,10 @@ from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerat
|
||||||
from core.workflow.enums import WorkflowType
|
from core.workflow.enums import WorkflowType
|
||||||
from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel
|
from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel
|
||||||
from core.workflow.graph_engine.layers.base import GraphEngineLayer
|
from core.workflow.graph_engine.layers.base import GraphEngineLayer
|
||||||
from core.workflow.graph_engine.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
|
from core.workflow.graph_engine.layers.persistence import (
|
||||||
|
PersistenceWorkflowInfo,
|
||||||
|
WorkflowPersistenceLayer,
|
||||||
|
)
|
||||||
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
|
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
|
||||||
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
|
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
|
||||||
from core.workflow.runtime import GraphRuntimeState, VariablePool
|
from core.workflow.runtime import GraphRuntimeState, VariablePool
|
||||||
|
|
@ -80,6 +83,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
|
||||||
workflow=self._workflow,
|
workflow=self._workflow,
|
||||||
single_iteration_run=self.application_generate_entity.single_iteration_run,
|
single_iteration_run=self.application_generate_entity.single_iteration_run,
|
||||||
single_loop_run=self.application_generate_entity.single_loop_run,
|
single_loop_run=self.application_generate_entity.single_loop_run,
|
||||||
|
system_variables=system_inputs,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
inputs = self.application_generate_entity.inputs
|
inputs = self.application_generate_entity.inputs
|
||||||
|
|
@ -132,6 +136,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
|
||||||
command_channel=command_channel,
|
command_channel=command_channel,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if not self.application_generate_entity.is_single_stepping_container_nodes():
|
||||||
persistence_layer = WorkflowPersistenceLayer(
|
persistence_layer = WorkflowPersistenceLayer(
|
||||||
application_generate_entity=self.application_generate_entity,
|
application_generate_entity=self.application_generate_entity,
|
||||||
workflow_info=PersistenceWorkflowInfo(
|
workflow_info=PersistenceWorkflowInfo(
|
||||||
|
|
|
||||||
|
|
@ -130,6 +130,8 @@ class WorkflowBasedAppRunner:
|
||||||
workflow: Workflow,
|
workflow: Workflow,
|
||||||
single_iteration_run: Any | None = None,
|
single_iteration_run: Any | None = None,
|
||||||
single_loop_run: Any | None = None,
|
single_loop_run: Any | None = None,
|
||||||
|
*,
|
||||||
|
system_variables: SystemVariable | None = None,
|
||||||
) -> tuple[Graph, VariablePool, GraphRuntimeState]:
|
) -> tuple[Graph, VariablePool, GraphRuntimeState]:
|
||||||
"""
|
"""
|
||||||
Prepare graph, variable pool, and runtime state for single node execution
|
Prepare graph, variable pool, and runtime state for single node execution
|
||||||
|
|
@ -147,9 +149,10 @@ class WorkflowBasedAppRunner:
|
||||||
ValueError: If neither single_iteration_run nor single_loop_run is specified
|
ValueError: If neither single_iteration_run nor single_loop_run is specified
|
||||||
"""
|
"""
|
||||||
# Create initial runtime state with variable pool containing environment variables
|
# Create initial runtime state with variable pool containing environment variables
|
||||||
|
system_variables = system_variables or SystemVariable.empty()
|
||||||
graph_runtime_state = GraphRuntimeState(
|
graph_runtime_state = GraphRuntimeState(
|
||||||
variable_pool=VariablePool(
|
variable_pool=VariablePool(
|
||||||
system_variables=SystemVariable.empty(),
|
system_variables=system_variables,
|
||||||
user_inputs={},
|
user_inputs={},
|
||||||
environment_variables=workflow.environment_variables,
|
environment_variables=workflow.environment_variables,
|
||||||
),
|
),
|
||||||
|
|
@ -220,7 +223,7 @@ class WorkflowBasedAppRunner:
|
||||||
# filter nodes only in the specified node type (iteration or loop)
|
# filter nodes only in the specified node type (iteration or loop)
|
||||||
main_node_config = next((n for n in graph_config.get("nodes", []) if n.get("id") == node_id), None)
|
main_node_config = next((n for n in graph_config.get("nodes", []) if n.get("id") == node_id), None)
|
||||||
start_node_id = main_node_config.get("data", {}).get("start_node_id") if main_node_config else None
|
start_node_id = main_node_config.get("data", {}).get("start_node_id") if main_node_config else None
|
||||||
node_configs = [
|
base_node_configs = [
|
||||||
node
|
node
|
||||||
for node in graph_config.get("nodes", [])
|
for node in graph_config.get("nodes", [])
|
||||||
if node.get("id") == node_id
|
if node.get("id") == node_id
|
||||||
|
|
@ -228,26 +231,74 @@ class WorkflowBasedAppRunner:
|
||||||
or (start_node_id and node.get("id") == start_node_id)
|
or (start_node_id and node.get("id") == start_node_id)
|
||||||
]
|
]
|
||||||
|
|
||||||
graph_config["nodes"] = node_configs
|
# Build a base graph config (without synthetic entry) to keep node-level context minimal.
|
||||||
|
base_graph_config = graph_config.copy()
|
||||||
|
base_graph_config["nodes"] = base_node_configs
|
||||||
|
|
||||||
node_ids = [node.get("id") for node in node_configs]
|
node_ids = [node.get("id") for node in base_node_configs if isinstance(node.get("id"), str)]
|
||||||
|
|
||||||
# filter edges only in the specified node type
|
# filter edges only in the specified node type
|
||||||
edge_configs = [
|
base_edge_configs = [
|
||||||
edge
|
edge
|
||||||
for edge in graph_config.get("edges", [])
|
for edge in graph_config.get("edges", [])
|
||||||
if (edge.get("source") is None or edge.get("source") in node_ids)
|
if (edge.get("source") is None or edge.get("source") in node_ids)
|
||||||
and (edge.get("target") is None or edge.get("target") in node_ids)
|
and (edge.get("target") is None or edge.get("target") in node_ids)
|
||||||
]
|
]
|
||||||
|
|
||||||
graph_config["edges"] = edge_configs
|
base_graph_config["edges"] = base_edge_configs
|
||||||
|
|
||||||
|
# Inject a synthetic start node so Graph validation accepts the single-node graph
|
||||||
|
# (loop/iteration nodes are containers and cannot serve as graph roots).
|
||||||
|
synthetic_start_node_id = f"{node_id}_single_step_start"
|
||||||
|
synthetic_start_node = {
|
||||||
|
"id": synthetic_start_node_id,
|
||||||
|
"type": "custom",
|
||||||
|
"data": {
|
||||||
|
"type": NodeType.START,
|
||||||
|
"title": "Start",
|
||||||
|
"desc": "Synthetic start for single-step run",
|
||||||
|
"version": "1",
|
||||||
|
"variables": [],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
synthetic_end_node_id = f"{node_id}_single_step_end"
|
||||||
|
synthetic_end_node = {
|
||||||
|
"id": synthetic_end_node_id,
|
||||||
|
"type": "custom",
|
||||||
|
"data": {
|
||||||
|
"type": NodeType.END,
|
||||||
|
"title": "End",
|
||||||
|
"desc": "Synthetic end for single-step run",
|
||||||
|
"version": "1",
|
||||||
|
"outputs": [],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
graph_config_with_entry = base_graph_config.copy()
|
||||||
|
graph_config_with_entry["nodes"] = [*base_node_configs, synthetic_start_node, synthetic_end_node]
|
||||||
|
graph_config_with_entry["edges"] = [
|
||||||
|
*base_edge_configs,
|
||||||
|
{
|
||||||
|
"source": synthetic_start_node_id,
|
||||||
|
"target": node_id,
|
||||||
|
"sourceHandle": "source",
|
||||||
|
"targetHandle": "target",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"source": node_id,
|
||||||
|
"target": synthetic_end_node_id,
|
||||||
|
"sourceHandle": "source",
|
||||||
|
"targetHandle": "target",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
# Create required parameters for Graph.init
|
# Create required parameters for Graph.init
|
||||||
graph_init_params = GraphInitParams(
|
graph_init_params = GraphInitParams(
|
||||||
tenant_id=workflow.tenant_id,
|
tenant_id=workflow.tenant_id,
|
||||||
app_id=self._app_id,
|
app_id=self._app_id,
|
||||||
workflow_id=workflow.id,
|
workflow_id=workflow.id,
|
||||||
graph_config=graph_config,
|
graph_config=base_graph_config,
|
||||||
user_id="",
|
user_id="",
|
||||||
user_from=UserFrom.ACCOUNT,
|
user_from=UserFrom.ACCOUNT,
|
||||||
invoke_from=InvokeFrom.SERVICE_API,
|
invoke_from=InvokeFrom.SERVICE_API,
|
||||||
|
|
@ -260,14 +311,16 @@ class WorkflowBasedAppRunner:
|
||||||
)
|
)
|
||||||
|
|
||||||
# init graph
|
# init graph
|
||||||
graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=node_id)
|
graph = Graph.init(
|
||||||
|
graph_config=graph_config_with_entry, node_factory=node_factory, root_node_id=synthetic_start_node_id
|
||||||
|
)
|
||||||
|
|
||||||
if not graph:
|
if not graph:
|
||||||
raise ValueError("graph not found in workflow")
|
raise ValueError("graph not found in workflow")
|
||||||
|
|
||||||
# fetch node config from node id
|
# fetch node config from node id
|
||||||
target_node_config = None
|
target_node_config = None
|
||||||
for node in node_configs:
|
for node in base_node_configs:
|
||||||
if node.get("id") == node_id:
|
if node.get("id") == node_id:
|
||||||
target_node_config = node
|
target_node_config = node
|
||||||
break
|
break
|
||||||
|
|
|
||||||
|
|
@ -228,6 +228,9 @@ class AdvancedChatAppGenerateEntity(ConversationAppGenerateEntity):
|
||||||
|
|
||||||
single_loop_run: SingleLoopRunEntity | None = None
|
single_loop_run: SingleLoopRunEntity | None = None
|
||||||
|
|
||||||
|
def is_single_stepping_container_nodes(self) -> bool:
|
||||||
|
return self.single_iteration_run is not None or self.single_loop_run is not None
|
||||||
|
|
||||||
|
|
||||||
class WorkflowAppGenerateEntity(AppGenerateEntity):
|
class WorkflowAppGenerateEntity(AppGenerateEntity):
|
||||||
"""
|
"""
|
||||||
|
|
@ -258,6 +261,9 @@ class WorkflowAppGenerateEntity(AppGenerateEntity):
|
||||||
|
|
||||||
single_loop_run: SingleLoopRunEntity | None = None
|
single_loop_run: SingleLoopRunEntity | None = None
|
||||||
|
|
||||||
|
def is_single_stepping_container_nodes(self) -> bool:
|
||||||
|
return self.single_iteration_run is not None or self.single_loop_run is not None
|
||||||
|
|
||||||
|
|
||||||
class RagPipelineGenerateEntity(WorkflowAppGenerateEntity):
|
class RagPipelineGenerateEntity(WorkflowAppGenerateEntity):
|
||||||
"""
|
"""
|
||||||
|
|
|
||||||
|
|
@ -149,18 +149,49 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]):
|
||||||
return isinstance(variable, NoneSegment) or len(variable.value) == 0
|
return isinstance(variable, NoneSegment) or len(variable.value) == 0
|
||||||
|
|
||||||
def _handle_empty_iteration(self, variable: ArraySegment | NoneSegment) -> Generator[NodeEventBase, None, None]:
|
def _handle_empty_iteration(self, variable: ArraySegment | NoneSegment) -> Generator[NodeEventBase, None, None]:
|
||||||
|
started_at = naive_utc_now()
|
||||||
|
inputs: dict[str, object] = {"iterator_selector": []}
|
||||||
|
usage = LLMUsage.empty_usage()
|
||||||
|
|
||||||
|
yield IterationStartedEvent(
|
||||||
|
start_at=started_at,
|
||||||
|
inputs=inputs,
|
||||||
|
metadata={"iteration_length": 0},
|
||||||
|
)
|
||||||
|
|
||||||
# Try our best to preserve the type information.
|
# Try our best to preserve the type information.
|
||||||
if isinstance(variable, ArraySegment):
|
if isinstance(variable, ArraySegment):
|
||||||
output = variable.model_copy(update={"value": []})
|
output = variable.model_copy(update={"value": []})
|
||||||
else:
|
else:
|
||||||
output = ArrayAnySegment(value=[])
|
output = ArrayAnySegment(value=[])
|
||||||
|
|
||||||
|
yield IterationSucceededEvent(
|
||||||
|
start_at=started_at,
|
||||||
|
inputs=inputs,
|
||||||
|
outputs={"output": []},
|
||||||
|
steps=0,
|
||||||
|
metadata={
|
||||||
|
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens,
|
||||||
|
WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price,
|
||||||
|
WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency,
|
||||||
|
WorkflowNodeExecutionMetadataKey.ITERATION_DURATION_MAP: {},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
yield StreamCompletedEvent(
|
yield StreamCompletedEvent(
|
||||||
node_run_result=NodeRunResult(
|
node_run_result=NodeRunResult(
|
||||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||||
# TODO(QuantumGhost): is it possible to compute the type of `output`
|
# TODO(QuantumGhost): is it possible to compute the type of `output`
|
||||||
# from graph definition?
|
# from graph definition?
|
||||||
outputs={"output": output},
|
outputs={"output": output},
|
||||||
|
inputs=inputs,
|
||||||
|
metadata={
|
||||||
|
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens,
|
||||||
|
WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price,
|
||||||
|
WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency,
|
||||||
|
WorkflowNodeExecutionMetadataKey.ITERATION_DURATION_MAP: {},
|
||||||
|
},
|
||||||
|
llm_usage=usage,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,245 @@
|
||||||
|
from types import SimpleNamespace
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
|
import core.app.apps.workflow.app_runner as workflow_app_runner
|
||||||
|
from core.app.apps.workflow.app_runner import WorkflowAppRunner, WorkflowBasedAppRunner
|
||||||
|
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||||
|
from core.workflow.enums import NodeType, SystemVariableKey, WorkflowType
|
||||||
|
from core.workflow.system_variable import SystemVariable
|
||||||
|
|
||||||
|
|
||||||
|
def test_prepare_single_iteration_injects_system_variables_and_fake_workflow():
|
||||||
|
node_id = "iteration_node"
|
||||||
|
execution_id = "workflow-exec-123"
|
||||||
|
|
||||||
|
workflow = SimpleNamespace(
|
||||||
|
id="workflow-id",
|
||||||
|
tenant_id="tenant-id",
|
||||||
|
app_id="app-id",
|
||||||
|
environment_variables=[],
|
||||||
|
graph_dict={
|
||||||
|
"nodes": [
|
||||||
|
{
|
||||||
|
"id": node_id,
|
||||||
|
"type": "custom",
|
||||||
|
"data": {
|
||||||
|
"type": NodeType.ITERATION,
|
||||||
|
"title": "Iteration",
|
||||||
|
"version": "1",
|
||||||
|
"iterator_selector": ["start", "items"],
|
||||||
|
"output_selector": [node_id, "output"],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"edges": [],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
runner = WorkflowBasedAppRunner(queue_manager=MagicMock(), app_id="app-id")
|
||||||
|
|
||||||
|
system_inputs = SystemVariable(app_id="app-id", workflow_id="workflow-id", workflow_execution_id=execution_id)
|
||||||
|
|
||||||
|
graph, _, runtime_state = runner._prepare_single_node_execution(
|
||||||
|
workflow=workflow,
|
||||||
|
single_iteration_run=SimpleNamespace(node_id=node_id, inputs={"input_selector": [1, 2, 3]}),
|
||||||
|
system_variables=system_inputs,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert runtime_state.variable_pool.system_variables.workflow_execution_id == execution_id
|
||||||
|
assert runtime_state.variable_pool.get_by_prefix("sys")[SystemVariableKey.WORKFLOW_EXECUTION_ID] == execution_id
|
||||||
|
assert graph.root_node.id == f"{node_id}_single_step_start"
|
||||||
|
assert f"{node_id}_single_step_end" in graph.nodes
|
||||||
|
|
||||||
|
|
||||||
|
def test_prepare_single_loop_injects_system_variables_and_fake_workflow():
|
||||||
|
node_id = "loop_node"
|
||||||
|
execution_id = "workflow-exec-456"
|
||||||
|
|
||||||
|
workflow = SimpleNamespace(
|
||||||
|
id="workflow-id",
|
||||||
|
tenant_id="tenant-id",
|
||||||
|
app_id="app-id",
|
||||||
|
environment_variables=[],
|
||||||
|
graph_dict={
|
||||||
|
"nodes": [
|
||||||
|
{
|
||||||
|
"id": node_id,
|
||||||
|
"type": "custom",
|
||||||
|
"data": {
|
||||||
|
"type": NodeType.LOOP,
|
||||||
|
"title": "Loop",
|
||||||
|
"version": "1",
|
||||||
|
"loop_count": 1,
|
||||||
|
"break_conditions": [],
|
||||||
|
"logical_operator": "and",
|
||||||
|
"loop_variables": [],
|
||||||
|
"outputs": {},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"edges": [],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
runner = WorkflowBasedAppRunner(queue_manager=MagicMock(), app_id="app-id")
|
||||||
|
|
||||||
|
system_inputs = SystemVariable(app_id="app-id", workflow_id="workflow-id", workflow_execution_id=execution_id)
|
||||||
|
|
||||||
|
graph, _, runtime_state = runner._prepare_single_node_execution(
|
||||||
|
workflow=workflow,
|
||||||
|
single_loop_run=SimpleNamespace(node_id=node_id, inputs={}),
|
||||||
|
system_variables=system_inputs,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert runtime_state.variable_pool.system_variables.workflow_execution_id == execution_id
|
||||||
|
assert graph.root_node.id == f"{node_id}_single_step_start"
|
||||||
|
assert f"{node_id}_single_step_end" in graph.nodes
|
||||||
|
|
||||||
|
|
||||||
|
class DummyCommandChannel:
|
||||||
|
def fetch_commands(self):
|
||||||
|
return []
|
||||||
|
|
||||||
|
def send_command(self, command):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _empty_graph_engine_run(self):
|
||||||
|
if False: # pragma: no cover
|
||||||
|
yield None
|
||||||
|
|
||||||
|
|
||||||
|
def _build_generate_entity(*, single_iteration_run=None, single_loop_run=None):
|
||||||
|
if isinstance(single_iteration_run, dict):
|
||||||
|
single_iteration_run = SimpleNamespace(**single_iteration_run)
|
||||||
|
if isinstance(single_loop_run, dict):
|
||||||
|
single_loop_run = SimpleNamespace(**single_loop_run)
|
||||||
|
|
||||||
|
base = SimpleNamespace(
|
||||||
|
app_config=SimpleNamespace(app_id="app-id", workflow_id="workflow-id"),
|
||||||
|
workflow_execution_id="workflow-exec-id",
|
||||||
|
files=[],
|
||||||
|
user_id="user-id",
|
||||||
|
inputs={},
|
||||||
|
invoke_from=InvokeFrom.DEBUGGER,
|
||||||
|
call_depth=0,
|
||||||
|
task_id="task-id",
|
||||||
|
trace_manager=None,
|
||||||
|
single_iteration_run=single_iteration_run,
|
||||||
|
single_loop_run=single_loop_run,
|
||||||
|
)
|
||||||
|
|
||||||
|
def is_single_stepping_container_nodes():
|
||||||
|
return base.single_iteration_run is not None or base.single_loop_run is not None
|
||||||
|
|
||||||
|
base.is_single_stepping_container_nodes = is_single_stepping_container_nodes # type: ignore[attr-defined]
|
||||||
|
return base
|
||||||
|
|
||||||
|
|
||||||
|
def test_workflow_runner_attaches_persistence_for_full_run(monkeypatch):
|
||||||
|
from core.workflow.graph_engine.graph_engine import GraphEngine
|
||||||
|
|
||||||
|
monkeypatch.setattr(GraphEngine, "run", _empty_graph_engine_run)
|
||||||
|
persistence_ctor = MagicMock(name="persistence_layer_ctor")
|
||||||
|
monkeypatch.setattr(workflow_app_runner, "WorkflowPersistenceLayer", persistence_ctor)
|
||||||
|
monkeypatch.setattr(workflow_app_runner, "RedisChannel", lambda *args, **kwargs: DummyCommandChannel())
|
||||||
|
|
||||||
|
queue_manager = MagicMock()
|
||||||
|
workflow = SimpleNamespace(
|
||||||
|
id="workflow-id",
|
||||||
|
tenant_id="tenant-id",
|
||||||
|
app_id="app-id",
|
||||||
|
type=WorkflowType.WORKFLOW,
|
||||||
|
version="1",
|
||||||
|
graph_dict={
|
||||||
|
"nodes": [
|
||||||
|
{
|
||||||
|
"id": "start",
|
||||||
|
"type": "custom",
|
||||||
|
"data": {"type": NodeType.START, "title": "Start", "version": "1", "variables": []},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": "end",
|
||||||
|
"type": "custom",
|
||||||
|
"data": {"type": NodeType.END, "title": "End", "version": "1", "outputs": []},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
"edges": [
|
||||||
|
{"source": "start", "target": "end", "sourceHandle": "source", "targetHandle": "target"},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
environment_variables=[],
|
||||||
|
)
|
||||||
|
generate_entity = _build_generate_entity()
|
||||||
|
generate_entity.inputs = {"input": "value"}
|
||||||
|
|
||||||
|
runner = WorkflowAppRunner(
|
||||||
|
application_generate_entity=generate_entity,
|
||||||
|
queue_manager=queue_manager,
|
||||||
|
variable_loader=MagicMock(),
|
||||||
|
workflow=workflow,
|
||||||
|
system_user_id="system-user-id",
|
||||||
|
root_node_id=None,
|
||||||
|
workflow_execution_repository=MagicMock(),
|
||||||
|
workflow_node_execution_repository=MagicMock(),
|
||||||
|
graph_engine_layers=(),
|
||||||
|
)
|
||||||
|
|
||||||
|
runner.run()
|
||||||
|
|
||||||
|
assert persistence_ctor.call_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_workflow_runner_skips_persistence_for_single_step(monkeypatch):
|
||||||
|
from core.workflow.graph_engine.graph_engine import GraphEngine
|
||||||
|
|
||||||
|
monkeypatch.setattr(GraphEngine, "run", _empty_graph_engine_run)
|
||||||
|
persistence_ctor = MagicMock(name="persistence_layer_ctor")
|
||||||
|
monkeypatch.setattr(workflow_app_runner, "WorkflowPersistenceLayer", persistence_ctor)
|
||||||
|
monkeypatch.setattr(workflow_app_runner, "RedisChannel", lambda *args, **kwargs: DummyCommandChannel())
|
||||||
|
|
||||||
|
queue_manager = MagicMock()
|
||||||
|
workflow = SimpleNamespace(
|
||||||
|
id="workflow-id",
|
||||||
|
tenant_id="tenant-id",
|
||||||
|
app_id="app-id",
|
||||||
|
type=WorkflowType.WORKFLOW,
|
||||||
|
version="1",
|
||||||
|
graph_dict={
|
||||||
|
"nodes": [
|
||||||
|
{
|
||||||
|
"id": "loop",
|
||||||
|
"type": "custom",
|
||||||
|
"data": {
|
||||||
|
"type": NodeType.LOOP,
|
||||||
|
"title": "Loop",
|
||||||
|
"version": "1",
|
||||||
|
"loop_count": 1,
|
||||||
|
"break_conditions": [],
|
||||||
|
"logical_operator": "and",
|
||||||
|
"loop_variables": [],
|
||||||
|
"outputs": {},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"edges": [],
|
||||||
|
},
|
||||||
|
environment_variables=[],
|
||||||
|
)
|
||||||
|
generate_entity = _build_generate_entity(single_loop_run={"node_id": "loop", "inputs": {}})
|
||||||
|
|
||||||
|
runner = WorkflowAppRunner(
|
||||||
|
application_generate_entity=generate_entity,
|
||||||
|
queue_manager=queue_manager,
|
||||||
|
variable_loader=MagicMock(),
|
||||||
|
workflow=workflow,
|
||||||
|
system_user_id="system-user-id",
|
||||||
|
root_node_id=None,
|
||||||
|
workflow_execution_repository=MagicMock(),
|
||||||
|
workflow_node_execution_repository=MagicMock(),
|
||||||
|
graph_engine_layers=(),
|
||||||
|
)
|
||||||
|
|
||||||
|
runner.run()
|
||||||
|
|
||||||
|
assert persistence_ctor.call_count == 0
|
||||||
|
|
@ -0,0 +1,51 @@
|
||||||
|
from core.workflow.entities import GraphInitParams
|
||||||
|
from core.workflow.graph_events import (
|
||||||
|
NodeRunIterationStartedEvent,
|
||||||
|
NodeRunIterationSucceededEvent,
|
||||||
|
NodeRunSucceededEvent,
|
||||||
|
)
|
||||||
|
from core.workflow.nodes.iteration.iteration_node import IterationNode
|
||||||
|
from core.workflow.runtime import GraphRuntimeState, VariablePool
|
||||||
|
from core.workflow.system_variable import SystemVariable
|
||||||
|
|
||||||
|
|
||||||
|
def test_iteration_node_emits_iteration_events_when_iterator_empty():
|
||||||
|
init_params = GraphInitParams(
|
||||||
|
tenant_id="tenant",
|
||||||
|
app_id="app",
|
||||||
|
workflow_id="workflow",
|
||||||
|
graph_config={},
|
||||||
|
user_id="user",
|
||||||
|
user_from="account",
|
||||||
|
invoke_from="debugger",
|
||||||
|
call_depth=0,
|
||||||
|
)
|
||||||
|
runtime_state = GraphRuntimeState(
|
||||||
|
variable_pool=VariablePool(system_variables=SystemVariable.empty(), user_inputs={}),
|
||||||
|
start_at=0.0,
|
||||||
|
)
|
||||||
|
runtime_state.variable_pool.add(("start", "items"), [])
|
||||||
|
|
||||||
|
node = IterationNode(
|
||||||
|
id="iteration-node",
|
||||||
|
config={
|
||||||
|
"id": "iteration-node",
|
||||||
|
"data": {
|
||||||
|
"title": "Iteration",
|
||||||
|
"iterator_selector": ["start", "items"],
|
||||||
|
"output_selector": ["iteration-node", "output"],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
graph_init_params=init_params,
|
||||||
|
graph_runtime_state=runtime_state,
|
||||||
|
)
|
||||||
|
|
||||||
|
events = list(node.run())
|
||||||
|
|
||||||
|
assert any(isinstance(event, NodeRunIterationStartedEvent) for event in events)
|
||||||
|
|
||||||
|
iteration_succeeded_event = next(event for event in events if isinstance(event, NodeRunIterationSucceededEvent))
|
||||||
|
assert iteration_succeeded_event.steps == 0
|
||||||
|
assert iteration_succeeded_event.outputs == {"output": []}
|
||||||
|
|
||||||
|
assert any(isinstance(event, NodeRunSucceededEvent) for event in events)
|
||||||
|
|
@ -791,8 +791,12 @@ const useOneStepRun = <T>({
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
const { data: iterationData } = params
|
const { data: iterationData } = params
|
||||||
_runResult.created_by = iterationData.created_by.name
|
const nextRunResult = {
|
||||||
setRunResult(_runResult)
|
..._runResult,
|
||||||
|
created_by: (iterationData.created_by as any)?.name || '',
|
||||||
|
}
|
||||||
|
_runResult = nextRunResult
|
||||||
|
setRunResult(nextRunResult)
|
||||||
},
|
},
|
||||||
onIterationStart: (params) => {
|
onIterationStart: (params) => {
|
||||||
const newIterationRunResult = produce(_iterationResult, (draft) => {
|
const newIterationRunResult = produce(_iterationResult, (draft) => {
|
||||||
|
|
@ -894,8 +898,12 @@ const useOneStepRun = <T>({
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
const { data: loopData } = params
|
const { data: loopData } = params
|
||||||
_runResult.created_by = loopData.created_by.name
|
const nextRunResult = {
|
||||||
setRunResult(_runResult)
|
..._runResult,
|
||||||
|
created_by: (loopData.created_by as any)?.name || '',
|
||||||
|
}
|
||||||
|
_runResult = nextRunResult
|
||||||
|
setRunResult(nextRunResult)
|
||||||
},
|
},
|
||||||
onLoopStart: (params) => {
|
onLoopStart: (params) => {
|
||||||
const newLoopRunResult = produce(_loopResult, (draft) => {
|
const newLoopRunResult = produce(_loopResult, (draft) => {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue