feat/trigger: support specifying root node (#24388)

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Yeuoly 2025-08-23 20:44:03 +08:00 committed by GitHub
parent 6aed7e3ff4
commit a7b558b38b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 24 additions and 7 deletions

View File

@ -55,6 +55,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
call_depth: int,
workflow_thread_pool_id: Optional[str],
triggered_from: Optional[WorkflowRunTriggeredFrom] = None,
root_node_id: Optional[str] = None,
) -> Generator[Mapping | str, None, None]: ...
@overload
@ -70,6 +71,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
call_depth: int,
workflow_thread_pool_id: Optional[str],
triggered_from: Optional[WorkflowRunTriggeredFrom] = None,
root_node_id: Optional[str] = None,
) -> Mapping[str, Any]: ...
@overload
@ -85,6 +87,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
call_depth: int,
workflow_thread_pool_id: Optional[str],
triggered_from: Optional[WorkflowRunTriggeredFrom] = None,
root_node_id: Optional[str] = None,
) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None]]: ...
def generate(
@ -99,6 +102,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
call_depth: int = 0,
workflow_thread_pool_id: Optional[str] = None,
triggered_from: Optional[WorkflowRunTriggeredFrom] = None,
root_node_id: Optional[str] = None,
) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None]]:
files: Sequence[Mapping[str, Any]] = args.get("files") or []
@ -194,6 +198,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
workflow_node_execution_repository=workflow_node_execution_repository,
streaming=streaming,
workflow_thread_pool_id=workflow_thread_pool_id,
root_node_id=root_node_id,
)
def _generate(
@ -209,6 +214,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
streaming: bool = True,
workflow_thread_pool_id: Optional[str] = None,
variable_loader: VariableLoader = DUMMY_VARIABLE_LOADER,
root_node_id: Optional[str] = None,
) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]:
"""
Generate App response.
@ -246,6 +252,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
"context": context,
"workflow_thread_pool_id": workflow_thread_pool_id,
"variable_loader": variable_loader,
"root_node_id": root_node_id,
},
)
@ -442,6 +449,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
context: contextvars.Context,
variable_loader: VariableLoader,
workflow_thread_pool_id: Optional[str] = None,
root_node_id: Optional[str] = None,
) -> None:
"""
Generate worker in a new thread.
@ -485,6 +493,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
variable_loader=variable_loader,
workflow=workflow,
system_user_id=system_user_id,
root_node_id=root_node_id,
)
try:

View File

@ -34,6 +34,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
workflow_thread_pool_id: Optional[str] = None,
workflow: Workflow,
system_user_id: str,
root_node_id: Optional[str] = None,
) -> None:
super().__init__(
queue_manager=queue_manager,
@ -44,6 +45,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
self.workflow_thread_pool_id = workflow_thread_pool_id
self._workflow = workflow
self._sys_user_id = system_user_id
self._root_node_id = root_node_id
def run(self) -> None:
"""
@ -93,7 +95,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
)
# init graph
graph = self._init_graph(graph_config=self._workflow.graph_dict)
graph = self._init_graph(graph_config=self._workflow.graph_dict, root_node_id=self._root_node_id)
# RUN WORKFLOW
workflow_entry = WorkflowEntry(

View File

@ -1,5 +1,5 @@
from collections.abc import Mapping
from typing import Any, cast
from typing import Any, Optional, cast
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.entities.queue_entities import (
@ -79,7 +79,7 @@ class WorkflowBasedAppRunner:
self._variable_loader = variable_loader
self._app_id = app_id
def _init_graph(self, graph_config: Mapping[str, Any]) -> Graph:
def _init_graph(self, graph_config: Mapping[str, Any], root_node_id: Optional[str] = None) -> Graph:
"""
Init graph
"""
@ -92,7 +92,7 @@ class WorkflowBasedAppRunner:
if not isinstance(graph_config.get("edges"), list):
raise ValueError("edges in workflow graph must be a list")
# init graph
graph = Graph.init(graph_config=graph_config)
graph = Graph.init(graph_config=graph_config, root_node_id=root_node_id)
if not graph:
raise ValueError("graph not found in workflow")

View File

@ -1,8 +1,8 @@
"""empty message
Revision ID: 994bdf7197ab
Revision ID: 4558cfabe44e
Revises: fa8b0fa6f407
Create Date: 2025-08-23 20:06:35.995973
Create Date: 2025-08-23 20:38:20.059323
"""
from alembic import op
@ -11,7 +11,7 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '994bdf7197ab'
revision = '4558cfabe44e'
down_revision = 'fa8b0fa6f407'
branch_labels = None
depends_on = None
@ -25,6 +25,7 @@ def upgrade():
sa.Column('app_id', models.types.StringUUID(), nullable=False),
sa.Column('workflow_id', models.types.StringUUID(), nullable=False),
sa.Column('workflow_run_id', models.types.StringUUID(), nullable=True),
sa.Column('root_node_id', sa.String(length=255), nullable=True),
sa.Column('trigger_type', sa.String(length=50), nullable=False),
sa.Column('trigger_data', sa.Text(), nullable=False),
sa.Column('inputs', sa.Text(), nullable=False),

View File

@ -1288,6 +1288,7 @@ class WorkflowTriggerLog(Base):
- app_id (uuid) App ID
- workflow_id (uuid) Workflow ID
- workflow_run_id (uuid) Optional - Associated workflow run ID when execution starts
- root_node_id (string) Optional - Custom starting node ID for workflow execution
- trigger_type (string) Type of trigger: webhook, schedule, plugin
- trigger_data (text) Full trigger data including inputs (JSON)
- inputs (text) Input parameters (JSON)
@ -1321,6 +1322,7 @@ class WorkflowTriggerLog(Base):
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
workflow_run_id: Mapped[Optional[str]] = mapped_column(StringUUID, nullable=True)
root_node_id: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
trigger_type: Mapped[str] = mapped_column(String(50), nullable=False)
trigger_data: Mapped[str] = mapped_column(sa.Text, nullable=False) # Full TriggerData as JSON

View File

@ -110,6 +110,7 @@ class AsyncWorkflowService:
tenant_id=trigger_data.tenant_id,
app_id=trigger_data.app_id,
workflow_id=workflow.id,
root_node_id=trigger_data.root_node_id,
trigger_type=trigger_data.trigger_type,
trigger_data=trigger_data.model_dump_json(),
inputs=json.dumps(dict(trigger_data.inputs)),

View File

@ -25,6 +25,7 @@ class TriggerData(BaseModel):
app_id: str
tenant_id: str
workflow_id: Optional[str] = None
root_node_id: str
inputs: Mapping[str, Any]
files: Sequence[Mapping[str, Any]] = Field(default_factory=list)
trigger_type: WorkflowRunTriggeredFrom

View File

@ -128,6 +128,7 @@ def _execute_workflow_common(task_data: WorkflowTaskData) -> AsyncTriggerExecuti
call_depth=0,
workflow_thread_pool_id=None,
triggered_from=trigger_data.trigger_type,
root_node_id=trigger_data.root_node_id,
)
# Calculate elapsed time