From 7ccf858ce61606ea4518a64ace44e4bbee4591b5 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Wed, 7 Jan 2026 21:47:23 +0800 Subject: [PATCH] fix(workflow): pass correct user_from/invoke_from into graph init (#30637) --- api/core/app/apps/advanced_chat/app_runner.py | 16 +++++----- api/core/app/apps/pipeline/pipeline_runner.py | 31 ++++++++++++------- api/core/app/apps/workflow/app_runner.py | 16 +++++----- api/core/app/apps/workflow_app_runner.py | 14 +++++++-- 4 files changed, 49 insertions(+), 28 deletions(-) diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index a2ae8dec5b..d636548f2b 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -39,7 +39,6 @@ from extensions.ext_database import db from extensions.ext_redis import redis_client from extensions.otel import WorkflowAppRunnerHandler, trace_span from models import Workflow -from models.enums import UserFrom from models.model import App, Conversation, Message, MessageAnnotation from models.workflow import ConversationVariable from services.conversation_variable_updater import ConversationVariableUpdater @@ -106,6 +105,11 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): if not app_record: raise ValueError("App not found") + invoke_from = self.application_generate_entity.invoke_from + if self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run: + invoke_from = InvokeFrom.DEBUGGER + user_from = self._resolve_user_from(invoke_from) + if self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run: # Handle single iteration or single loop run graph, variable_pool, graph_runtime_state = self._prepare_single_node_execution( @@ -158,6 +162,8 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): workflow_id=self._workflow.id, tenant_id=self._workflow.tenant_id, user_id=self.application_generate_entity.user_id, + user_from=user_from, + invoke_from=invoke_from, ) db.session.close() @@ -175,12 +181,8 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): graph=graph, graph_config=self._workflow.graph_dict, user_id=self.application_generate_entity.user_id, - user_from=( - UserFrom.ACCOUNT - if self.application_generate_entity.invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} - else UserFrom.END_USER - ), - invoke_from=self.application_generate_entity.invoke_from, + user_from=user_from, + invoke_from=invoke_from, call_depth=self.application_generate_entity.call_depth, variable_pool=variable_pool, graph_runtime_state=graph_runtime_state, diff --git a/api/core/app/apps/pipeline/pipeline_runner.py b/api/core/app/apps/pipeline/pipeline_runner.py index 4be9e01fbf..0157521ae9 100644 --- a/api/core/app/apps/pipeline/pipeline_runner.py +++ b/api/core/app/apps/pipeline/pipeline_runner.py @@ -73,9 +73,15 @@ class PipelineRunner(WorkflowBasedAppRunner): """ app_config = self.application_generate_entity.app_config app_config = cast(PipelineConfig, app_config) + invoke_from = self.application_generate_entity.invoke_from + + if self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run: + invoke_from = InvokeFrom.DEBUGGER + + user_from = self._resolve_user_from(invoke_from) user_id = None - if self.application_generate_entity.invoke_from in {InvokeFrom.WEB_APP, InvokeFrom.SERVICE_API}: + if invoke_from in {InvokeFrom.WEB_APP, InvokeFrom.SERVICE_API}: end_user = db.session.query(EndUser).where(EndUser.id == self.application_generate_entity.user_id).first() if end_user: user_id = end_user.session_id @@ -117,7 +123,7 @@ class PipelineRunner(WorkflowBasedAppRunner): dataset_id=self.application_generate_entity.dataset_id, datasource_type=self.application_generate_entity.datasource_type, datasource_info=self.application_generate_entity.datasource_info, - invoke_from=self.application_generate_entity.invoke_from.value, + invoke_from=invoke_from.value, ) rag_pipeline_variables = [] @@ -149,6 +155,8 @@ class PipelineRunner(WorkflowBasedAppRunner): graph_runtime_state=graph_runtime_state, start_node_id=self.application_generate_entity.start_node_id, workflow=workflow, + user_from=user_from, + invoke_from=invoke_from, ) # RUN WORKFLOW @@ -159,12 +167,8 @@ class PipelineRunner(WorkflowBasedAppRunner): graph=graph, graph_config=workflow.graph_dict, user_id=self.application_generate_entity.user_id, - user_from=( - UserFrom.ACCOUNT - if self.application_generate_entity.invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} - else UserFrom.END_USER - ), - invoke_from=self.application_generate_entity.invoke_from, + user_from=user_from, + invoke_from=invoke_from, call_depth=self.application_generate_entity.call_depth, graph_runtime_state=graph_runtime_state, variable_pool=variable_pool, @@ -210,7 +214,12 @@ class PipelineRunner(WorkflowBasedAppRunner): return workflow def _init_rag_pipeline_graph( - self, workflow: Workflow, graph_runtime_state: GraphRuntimeState, start_node_id: str | None = None + self, + workflow: Workflow, + graph_runtime_state: GraphRuntimeState, + start_node_id: str | None = None, + user_from: UserFrom = UserFrom.ACCOUNT, + invoke_from: InvokeFrom = InvokeFrom.SERVICE_API, ) -> Graph: """ Init pipeline graph @@ -253,8 +262,8 @@ class PipelineRunner(WorkflowBasedAppRunner): workflow_id=workflow.id, graph_config=graph_config, user_id=self.application_generate_entity.user_id, - user_from=UserFrom.ACCOUNT, - invoke_from=InvokeFrom.SERVICE_API, + user_from=user_from, + invoke_from=invoke_from, call_depth=0, ) diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index 894e6f397a..8dbdc1d58c 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -20,7 +20,6 @@ from core.workflow.workflow_entry import WorkflowEntry from extensions.ext_redis import redis_client from extensions.otel import WorkflowAppRunnerHandler, trace_span from libs.datetime_utils import naive_utc_now -from models.enums import UserFrom from models.workflow import Workflow logger = logging.getLogger(__name__) @@ -74,7 +73,12 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): workflow_execution_id=self.application_generate_entity.workflow_execution_id, ) + invoke_from = self.application_generate_entity.invoke_from # if only single iteration or single loop run is requested + if self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run: + invoke_from = InvokeFrom.DEBUGGER + user_from = self._resolve_user_from(invoke_from) + if self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run: graph, variable_pool, graph_runtime_state = self._prepare_single_node_execution( workflow=self._workflow, @@ -102,6 +106,8 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): workflow_id=self._workflow.id, tenant_id=self._workflow.tenant_id, user_id=self.application_generate_entity.user_id, + user_from=user_from, + invoke_from=invoke_from, root_node_id=self._root_node_id, ) @@ -120,12 +126,8 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): graph=graph, graph_config=self._workflow.graph_dict, user_id=self.application_generate_entity.user_id, - user_from=( - UserFrom.ACCOUNT - if self.application_generate_entity.invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} - else UserFrom.END_USER - ), - invoke_from=self.application_generate_entity.invoke_from, + user_from=user_from, + invoke_from=invoke_from, call_depth=self.application_generate_entity.call_depth, variable_pool=variable_pool, graph_runtime_state=graph_runtime_state, diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index 0e125b3538..7adf3504ac 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -77,10 +77,18 @@ class WorkflowBasedAppRunner: self._app_id = app_id self._graph_engine_layers = graph_engine_layers + @staticmethod + def _resolve_user_from(invoke_from: InvokeFrom) -> UserFrom: + if invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER}: + return UserFrom.ACCOUNT + return UserFrom.END_USER + def _init_graph( self, graph_config: Mapping[str, Any], graph_runtime_state: GraphRuntimeState, + user_from: UserFrom, + invoke_from: InvokeFrom, workflow_id: str = "", tenant_id: str = "", user_id: str = "", @@ -105,8 +113,8 @@ class WorkflowBasedAppRunner: workflow_id=workflow_id, graph_config=graph_config, user_id=user_id, - user_from=UserFrom.ACCOUNT, - invoke_from=InvokeFrom.SERVICE_API, + user_from=user_from, + invoke_from=invoke_from, call_depth=0, ) @@ -250,7 +258,7 @@ class WorkflowBasedAppRunner: graph_config=graph_config, user_id="", user_from=UserFrom.ACCOUNT, - invoke_from=InvokeFrom.SERVICE_API, + invoke_from=InvokeFrom.DEBUGGER, call_depth=0, )