merge workflow pasuing

This commit is contained in:
Yeuoly 2025-10-30 14:54:14 +08:00
parent aa3b16a136
commit ffc3c61d00
6 changed files with 23 additions and 19 deletions

View File

@ -56,7 +56,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
call_depth: int,
triggered_from: Optional[WorkflowRunTriggeredFrom] = None,
root_node_id: Optional[str] = None,
layers: Optional[Sequence[GraphEngineLayer]] = None,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
) -> Generator[Mapping[str, Any] | str, None, None]: ...
@overload
@ -72,7 +72,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
call_depth: int,
triggered_from: Optional[WorkflowRunTriggeredFrom] = None,
root_node_id: Optional[str] = None,
layers: Optional[Sequence[GraphEngineLayer]] = None,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
) -> Mapping[str, Any]: ...
@overload
@ -88,7 +88,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
call_depth: int,
triggered_from: Optional[WorkflowRunTriggeredFrom] = None,
root_node_id: Optional[str] = None,
layers: Optional[Sequence[GraphEngineLayer]] = None,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]: ...
def generate(
@ -103,7 +103,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
call_depth: int = 0,
triggered_from: Optional[WorkflowRunTriggeredFrom] = None,
root_node_id: Optional[str] = None,
layers: Optional[Sequence[GraphEngineLayer]] = None,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]:
files: Sequence[Mapping[str, Any]] = args.get("files") or []
@ -202,7 +202,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
workflow_node_execution_repository=workflow_node_execution_repository,
streaming=streaming,
root_node_id=root_node_id,
layers=layers,
graph_engine_layers=graph_engine_layers,
)
def resume(self, *, workflow_run_id: str) -> None:
@ -224,7 +224,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
streaming: bool = True,
variable_loader: VariableLoader = DUMMY_VARIABLE_LOADER,
root_node_id: Optional[str] = None,
layers: Optional[Sequence[GraphEngineLayer]] = None,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]:
"""
Generate App response.
@ -263,7 +263,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
"root_node_id": root_node_id,
"workflow_execution_repository": workflow_execution_repository,
"workflow_node_execution_repository": workflow_node_execution_repository,
"layers": layers,
"graph_engine_layers": graph_engine_layers,
},
)
@ -458,7 +458,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
workflow_execution_repository: WorkflowExecutionRepository,
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
root_node_id: Optional[str] = None,
layers: Optional[Sequence[GraphEngineLayer]] = None,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
) -> None:
"""
Generate worker in a new thread.
@ -503,7 +503,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository,
root_node_id=root_node_id,
layers=layers,
graph_engine_layers=graph_engine_layers,
)
try:

View File

@ -41,12 +41,13 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
root_node_id: Optional[str] = None,
workflow_execution_repository: WorkflowExecutionRepository,
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
layers: Optional[Sequence[GraphEngineLayer]] = None,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
):
super().__init__(
queue_manager=queue_manager,
variable_loader=variable_loader,
app_id=application_generate_entity.app_config.app_id,
graph_engine_layers=graph_engine_layers,
)
self.application_generate_entity = application_generate_entity
self._workflow = workflow
@ -54,7 +55,6 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
self._root_node_id = root_node_id
self._workflow_execution_repository = workflow_execution_repository
self._workflow_node_execution_repository = workflow_node_execution_repository
self._layers = layers or []
def run(self):
"""

View File

@ -3,12 +3,11 @@ from datetime import UTC, datetime
from typing import Any, ClassVar
from pydantic import TypeAdapter
from sqlalchemy.orm import Session
from sqlalchemy.orm import Session, sessionmaker
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.graph_events.base import GraphEngineEvent
from core.workflow.graph_events.graph import GraphRunFailedEvent, GraphRunPausedEvent, GraphRunSucceededEvent
from models.engine import db
from models.enums import WorkflowTriggerStatus
from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository
from tasks.workflow_cfs_scheduler.cfs_scheduler import AsyncWorkflowCFSPlanEntity
@ -32,10 +31,12 @@ class TriggerPostLayer(GraphEngineLayer):
cfs_plan_scheduler_entity: AsyncWorkflowCFSPlanEntity,
start_time: datetime,
trigger_log_id: str,
session_maker: sessionmaker[Session],
):
self.trigger_log_id = trigger_log_id
self.start_time = start_time
self.cfs_plan_scheduler_entity = cfs_plan_scheduler_entity
self.session_maker = session_maker
def on_graph_start(self):
pass
@ -45,7 +46,7 @@ class TriggerPostLayer(GraphEngineLayer):
Update trigger log with success or failure.
"""
if isinstance(event, tuple(self._STATUS_MAP.keys())):
with Session(db.engine) as session:
with self.session_maker() as session:
repo = SQLAlchemyWorkflowTriggerLogRepository(session)
trigger_log = repo.get_by_id(self.trigger_log_id)
if not trigger_log:
@ -62,7 +63,10 @@ class TriggerPostLayer(GraphEngineLayer):
outputs = self.graph_runtime_state.outputs
workflow_run_id = outputs.get("workflow_run_id")
# BASICLY, workflow_execution_id is the same as workflow_run_id
workflow_run_id = self.graph_runtime_state.system_variable.workflow_execution_id
assert workflow_run_id, "Workflow run id is not set"
total_tokens = self.graph_runtime_state.total_tokens
# Update trigger log with success

View File

@ -14,9 +14,9 @@ from sqlalchemy.orm import Session, sessionmaker
from configs import dify_config
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
from core.app.engine_layers.timeslice_layer import TimeSliceLayer
from core.app.engine_layers.trigger_post_layer import TriggerPostLayer
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.layers.timeslice_layer import TimeSliceLayer
from core.app.layers.trigger_post_layer import TriggerPostLayer
from extensions.ext_database import db
from models.account import Account
from models.enums import CreatorUserRole, WorkflowTriggerStatus
@ -145,9 +145,9 @@ def _execute_workflow_common(
call_depth=0,
triggered_from=trigger_data.trigger_from,
root_node_id=trigger_data.root_node_id,
layers=[
graph_engine_layers=[
TimeSliceLayer(cfs_plan_scheduler),
TriggerPostLayer(cfs_plan_scheduler_entity, start_time, trigger_log.id),
TriggerPostLayer(cfs_plan_scheduler_entity, start_time, trigger_log.id, session_factory),
],
)