From dd6ab7c68cca6bc8b56f9269e3edd02cfcd5190b Mon Sep 17 00:00:00 2001 From: Yeuoly Date: Wed, 22 Oct 2025 11:45:05 +0800 Subject: [PATCH] feat: support pausing workflow trigger log --- api/core/app/engine_layers/suspend_layer.py | 1 + .../app/engine_layers/trigger_post_layer.py | 19 +++++++++---------- api/models/enums.py | 1 + 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/api/core/app/engine_layers/suspend_layer.py b/api/core/app/engine_layers/suspend_layer.py index 65f1db35c7..f981523149 100644 --- a/api/core/app/engine_layers/suspend_layer.py +++ b/api/core/app/engine_layers/suspend_layer.py @@ -12,4 +12,5 @@ class SuspendLayer(GraphEngineLayer): pass def on_graph_end(self, error: Exception | None): + """ """ pass diff --git a/api/core/app/engine_layers/trigger_post_layer.py b/api/core/app/engine_layers/trigger_post_layer.py index ad757d5ffb..e5bf1e0a60 100644 --- a/api/core/app/engine_layers/trigger_post_layer.py +++ b/api/core/app/engine_layers/trigger_post_layer.py @@ -1,6 +1,6 @@ import logging from datetime import UTC, datetime -from typing import Any +from typing import Any, ClassVar from pydantic import TypeAdapter from sqlalchemy.orm import Session @@ -21,6 +21,12 @@ class TriggerPostLayer(GraphEngineLayer): Trigger post layer. """ + _STATUS_MAP: ClassVar[dict[type[GraphEngineEvent], WorkflowTriggerStatus]] = { + GraphRunSucceededEvent: WorkflowTriggerStatus.SUCCEEDED, + GraphRunFailedEvent: WorkflowTriggerStatus.FAILED, + GraphRunPausedEvent: WorkflowTriggerStatus.PAUSED, + } + def __init__( self, cfs_plan_scheduler_entity: TriggerWorkflowCFSPlanEntity, @@ -38,7 +44,7 @@ class TriggerPostLayer(GraphEngineLayer): """ Update trigger log with success or failure. """ - if isinstance(event, GraphRunSucceededEvent | GraphRunFailedEvent): + if isinstance(event, tuple(self._STATUS_MAP.keys())): with Session(db.engine) as session: repo = SQLAlchemyWorkflowTriggerLogRepository(session) trigger_log = repo.get_by_id(self.trigger_log_id) @@ -60,11 +66,7 @@ class TriggerPostLayer(GraphEngineLayer): total_tokens = self.graph_runtime_state.total_tokens # Update trigger log with success - trigger_log.status = ( - WorkflowTriggerStatus.SUCCEEDED - if isinstance(event, GraphRunSucceededEvent) - else WorkflowTriggerStatus.FAILED - ) + trigger_log.status = self._STATUS_MAP[type(event)] trigger_log.workflow_run_id = workflow_run_id trigger_log.outputs = TypeAdapter(dict[str, Any]).dump_json(outputs).decode() trigger_log.elapsed_time = elapsed_time @@ -72,9 +74,6 @@ class TriggerPostLayer(GraphEngineLayer): trigger_log.finished_at = datetime.now(UTC) repo.update(trigger_log) session.commit() - elif isinstance(event, GraphRunPausedEvent): - # FIXME: handle the paused event - pass def on_graph_end(self, error: Exception | None) -> None: pass diff --git a/api/models/enums.py b/api/models/enums.py index fc9d71a2bd..da35a34123 100644 --- a/api/models/enums.py +++ b/api/models/enums.py @@ -52,6 +52,7 @@ class WorkflowTriggerStatus(StrEnum): QUEUED = "queued" RUNNING = "running" SUCCEEDED = "succeeded" + PAUSED = "paused" FAILED = "failed" RATE_LIMITED = "rate_limited" RETRYING = "retrying"