diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py index 77fb8c1975..9e9f896da1 100644 --- a/api/core/app/apps/workflow/app_generator.py +++ b/api/core/app/apps/workflow/app_generator.py @@ -55,6 +55,7 @@ class WorkflowAppGenerator(BaseAppGenerator): call_depth: int, workflow_thread_pool_id: Optional[str], triggered_from: Optional[WorkflowRunTriggeredFrom] = None, + root_node_id: Optional[str] = None, ) -> Generator[Mapping | str, None, None]: ... @overload @@ -70,6 +71,7 @@ class WorkflowAppGenerator(BaseAppGenerator): call_depth: int, workflow_thread_pool_id: Optional[str], triggered_from: Optional[WorkflowRunTriggeredFrom] = None, + root_node_id: Optional[str] = None, ) -> Mapping[str, Any]: ... @overload @@ -85,6 +87,7 @@ class WorkflowAppGenerator(BaseAppGenerator): call_depth: int, workflow_thread_pool_id: Optional[str], triggered_from: Optional[WorkflowRunTriggeredFrom] = None, + root_node_id: Optional[str] = None, ) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None]]: ... def generate( @@ -99,6 +102,7 @@ class WorkflowAppGenerator(BaseAppGenerator): call_depth: int = 0, workflow_thread_pool_id: Optional[str] = None, triggered_from: Optional[WorkflowRunTriggeredFrom] = None, + root_node_id: Optional[str] = None, ) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None]]: files: Sequence[Mapping[str, Any]] = args.get("files") or [] @@ -194,6 +198,7 @@ class WorkflowAppGenerator(BaseAppGenerator): workflow_node_execution_repository=workflow_node_execution_repository, streaming=streaming, workflow_thread_pool_id=workflow_thread_pool_id, + root_node_id=root_node_id, ) def _generate( @@ -209,6 +214,7 @@ class WorkflowAppGenerator(BaseAppGenerator): streaming: bool = True, workflow_thread_pool_id: Optional[str] = None, variable_loader: VariableLoader = DUMMY_VARIABLE_LOADER, + root_node_id: Optional[str] = None, ) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]: """ Generate App response. @@ -246,6 +252,7 @@ class WorkflowAppGenerator(BaseAppGenerator): "context": context, "workflow_thread_pool_id": workflow_thread_pool_id, "variable_loader": variable_loader, + "root_node_id": root_node_id, }, ) @@ -442,6 +449,7 @@ class WorkflowAppGenerator(BaseAppGenerator): context: contextvars.Context, variable_loader: VariableLoader, workflow_thread_pool_id: Optional[str] = None, + root_node_id: Optional[str] = None, ) -> None: """ Generate worker in a new thread. @@ -485,6 +493,7 @@ class WorkflowAppGenerator(BaseAppGenerator): variable_loader=variable_loader, workflow=workflow, system_user_id=system_user_id, + root_node_id=root_node_id, ) try: diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index 4f4c1460ae..e57b582dcd 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -34,6 +34,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): workflow_thread_pool_id: Optional[str] = None, workflow: Workflow, system_user_id: str, + root_node_id: Optional[str] = None, ) -> None: super().__init__( queue_manager=queue_manager, @@ -44,6 +45,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): self.workflow_thread_pool_id = workflow_thread_pool_id self._workflow = workflow self._sys_user_id = system_user_id + self._root_node_id = root_node_id def run(self) -> None: """ @@ -93,7 +95,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): ) # init graph - graph = self._init_graph(graph_config=self._workflow.graph_dict) + graph = self._init_graph(graph_config=self._workflow.graph_dict, root_node_id=self._root_node_id) # RUN WORKFLOW workflow_entry = WorkflowEntry( diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index 948ea95e63..e60abead66 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -1,5 +1,5 @@ from collections.abc import Mapping -from typing import Any, cast +from typing import Any, Optional, cast from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.entities.queue_entities import ( @@ -79,7 +79,7 @@ class WorkflowBasedAppRunner: self._variable_loader = variable_loader self._app_id = app_id - def _init_graph(self, graph_config: Mapping[str, Any]) -> Graph: + def _init_graph(self, graph_config: Mapping[str, Any], root_node_id: Optional[str] = None) -> Graph: """ Init graph """ @@ -92,7 +92,7 @@ class WorkflowBasedAppRunner: if not isinstance(graph_config.get("edges"), list): raise ValueError("edges in workflow graph must be a list") # init graph - graph = Graph.init(graph_config=graph_config) + graph = Graph.init(graph_config=graph_config, root_node_id=root_node_id) if not graph: raise ValueError("graph not found in workflow") diff --git a/api/migrations/versions/2025_08_23_2006-994bdf7197ab_add_workflow_trigger_logs.py b/api/migrations/versions/2025_08_23_2038-4558cfabe44e_add_workflow_trigger_logs.py similarity index 94% rename from api/migrations/versions/2025_08_23_2006-994bdf7197ab_add_workflow_trigger_logs.py rename to api/migrations/versions/2025_08_23_2038-4558cfabe44e_add_workflow_trigger_logs.py index 20760983b6..79c05d0aaa 100644 --- a/api/migrations/versions/2025_08_23_2006-994bdf7197ab_add_workflow_trigger_logs.py +++ b/api/migrations/versions/2025_08_23_2038-4558cfabe44e_add_workflow_trigger_logs.py @@ -1,8 +1,8 @@ """empty message -Revision ID: 994bdf7197ab +Revision ID: 4558cfabe44e Revises: fa8b0fa6f407 -Create Date: 2025-08-23 20:06:35.995973 +Create Date: 2025-08-23 20:38:20.059323 """ from alembic import op @@ -11,7 +11,7 @@ import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = '994bdf7197ab' +revision = '4558cfabe44e' down_revision = 'fa8b0fa6f407' branch_labels = None depends_on = None @@ -25,6 +25,7 @@ def upgrade(): sa.Column('app_id', models.types.StringUUID(), nullable=False), sa.Column('workflow_id', models.types.StringUUID(), nullable=False), sa.Column('workflow_run_id', models.types.StringUUID(), nullable=True), + sa.Column('root_node_id', sa.String(length=255), nullable=True), sa.Column('trigger_type', sa.String(length=50), nullable=False), sa.Column('trigger_data', sa.Text(), nullable=False), sa.Column('inputs', sa.Text(), nullable=False), diff --git a/api/models/workflow.py b/api/models/workflow.py index f00c0030dd..38eda31c5e 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -1288,6 +1288,7 @@ class WorkflowTriggerLog(Base): - app_id (uuid) App ID - workflow_id (uuid) Workflow ID - workflow_run_id (uuid) Optional - Associated workflow run ID when execution starts + - root_node_id (string) Optional - Custom starting node ID for workflow execution - trigger_type (string) Type of trigger: webhook, schedule, plugin - trigger_data (text) Full trigger data including inputs (JSON) - inputs (text) Input parameters (JSON) @@ -1321,6 +1322,7 @@ class WorkflowTriggerLog(Base): app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False) workflow_run_id: Mapped[Optional[str]] = mapped_column(StringUUID, nullable=True) + root_node_id: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) trigger_type: Mapped[str] = mapped_column(String(50), nullable=False) trigger_data: Mapped[str] = mapped_column(sa.Text, nullable=False) # Full TriggerData as JSON diff --git a/api/services/async_workflow_service.py b/api/services/async_workflow_service.py index 448f6b2f63..5221f8f391 100644 --- a/api/services/async_workflow_service.py +++ b/api/services/async_workflow_service.py @@ -110,6 +110,7 @@ class AsyncWorkflowService: tenant_id=trigger_data.tenant_id, app_id=trigger_data.app_id, workflow_id=workflow.id, + root_node_id=trigger_data.root_node_id, trigger_type=trigger_data.trigger_type, trigger_data=trigger_data.model_dump_json(), inputs=json.dumps(dict(trigger_data.inputs)), diff --git a/api/services/workflow/entities.py b/api/services/workflow/entities.py index cfefa021b6..6c6da42bb9 100644 --- a/api/services/workflow/entities.py +++ b/api/services/workflow/entities.py @@ -25,6 +25,7 @@ class TriggerData(BaseModel): app_id: str tenant_id: str workflow_id: Optional[str] = None + root_node_id: str inputs: Mapping[str, Any] files: Sequence[Mapping[str, Any]] = Field(default_factory=list) trigger_type: WorkflowRunTriggeredFrom diff --git a/api/tasks/async_workflow_tasks.py b/api/tasks/async_workflow_tasks.py index 0612deb806..50824df078 100644 --- a/api/tasks/async_workflow_tasks.py +++ b/api/tasks/async_workflow_tasks.py @@ -128,6 +128,7 @@ def _execute_workflow_common(task_data: WorkflowTaskData) -> AsyncTriggerExecuti call_depth=0, workflow_thread_pool_id=None, triggered_from=trigger_data.trigger_type, + root_node_id=trigger_data.root_node_id, ) # Calculate elapsed time