mirror of https://github.com/langgenius/dify.git
feat: support pausing workflow trigger log
This commit is contained in:
parent
a1ea256e79
commit
dd6ab7c68c
|
|
@ -12,4 +12,5 @@ class SuspendLayer(GraphEngineLayer):
|
|||
pass
|
||||
|
||||
def on_graph_end(self, error: Exception | None):
|
||||
""" """
|
||||
pass
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
import logging
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
from typing import Any, ClassVar
|
||||
|
||||
from pydantic import TypeAdapter
|
||||
from sqlalchemy.orm import Session
|
||||
|
|
@ -21,6 +21,12 @@ class TriggerPostLayer(GraphEngineLayer):
|
|||
Trigger post layer.
|
||||
"""
|
||||
|
||||
_STATUS_MAP: ClassVar[dict[type[GraphEngineEvent], WorkflowTriggerStatus]] = {
|
||||
GraphRunSucceededEvent: WorkflowTriggerStatus.SUCCEEDED,
|
||||
GraphRunFailedEvent: WorkflowTriggerStatus.FAILED,
|
||||
GraphRunPausedEvent: WorkflowTriggerStatus.PAUSED,
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
cfs_plan_scheduler_entity: TriggerWorkflowCFSPlanEntity,
|
||||
|
|
@ -38,7 +44,7 @@ class TriggerPostLayer(GraphEngineLayer):
|
|||
"""
|
||||
Update trigger log with success or failure.
|
||||
"""
|
||||
if isinstance(event, GraphRunSucceededEvent | GraphRunFailedEvent):
|
||||
if isinstance(event, tuple(self._STATUS_MAP.keys())):
|
||||
with Session(db.engine) as session:
|
||||
repo = SQLAlchemyWorkflowTriggerLogRepository(session)
|
||||
trigger_log = repo.get_by_id(self.trigger_log_id)
|
||||
|
|
@ -60,11 +66,7 @@ class TriggerPostLayer(GraphEngineLayer):
|
|||
total_tokens = self.graph_runtime_state.total_tokens
|
||||
|
||||
# Update trigger log with success
|
||||
trigger_log.status = (
|
||||
WorkflowTriggerStatus.SUCCEEDED
|
||||
if isinstance(event, GraphRunSucceededEvent)
|
||||
else WorkflowTriggerStatus.FAILED
|
||||
)
|
||||
trigger_log.status = self._STATUS_MAP[type(event)]
|
||||
trigger_log.workflow_run_id = workflow_run_id
|
||||
trigger_log.outputs = TypeAdapter(dict[str, Any]).dump_json(outputs).decode()
|
||||
trigger_log.elapsed_time = elapsed_time
|
||||
|
|
@ -72,9 +74,6 @@ class TriggerPostLayer(GraphEngineLayer):
|
|||
trigger_log.finished_at = datetime.now(UTC)
|
||||
repo.update(trigger_log)
|
||||
session.commit()
|
||||
elif isinstance(event, GraphRunPausedEvent):
|
||||
# FIXME: handle the paused event
|
||||
pass
|
||||
|
||||
def on_graph_end(self, error: Exception | None) -> None:
|
||||
pass
|
||||
|
|
|
|||
|
|
@ -52,6 +52,7 @@ class WorkflowTriggerStatus(StrEnum):
|
|||
QUEUED = "queued"
|
||||
RUNNING = "running"
|
||||
SUCCEEDED = "succeeded"
|
||||
PAUSED = "paused"
|
||||
FAILED = "failed"
|
||||
RATE_LIMITED = "rate_limited"
|
||||
RETRYING = "retrying"
|
||||
|
|
|
|||
Loading…
Reference in New Issue