diff --git a/api/core/app/layers/conversation_variable_persist_layer.py b/api/core/app/layers/conversation_variable_persist_layer.py index d5e6b04a4a..9125236af6 100644 --- a/api/core/app/layers/conversation_variable_persist_layer.py +++ b/api/core/app/layers/conversation_variable_persist_layer.py @@ -8,6 +8,7 @@ scope updates that matter to chat applications. """ import logging +from typing import override from core.workflow.system_variables import SystemVariableKey, get_system_text from core.workflow.variable_prefixes import CONVERSATION_VARIABLE_NODE_ID @@ -23,9 +24,11 @@ class ConversationVariablePersistenceLayer(GraphEngineLayer): super().__init__() self._conversation_variable_updater = conversation_variable_updater + @override def on_graph_start(self) -> None: pass + @override def on_event(self, event: GraphEngineEvent) -> None: if not isinstance(event, NodeRunVariableUpdatedEvent): return @@ -44,5 +47,6 @@ class ConversationVariablePersistenceLayer(GraphEngineLayer): self._conversation_variable_updater.update(conversation_id=conversation_id, variable=event.variable) + @override def on_graph_end(self, error: Exception | None) -> None: pass diff --git a/api/core/app/layers/pause_state_persist_layer.py b/api/core/app/layers/pause_state_persist_layer.py index 9811f9f830..d651721899 100644 --- a/api/core/app/layers/pause_state_persist_layer.py +++ b/api/core/app/layers/pause_state_persist_layer.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Annotated, Literal, Self +from typing import Annotated, Literal, Self, override from pydantic import BaseModel, Field from sqlalchemy import Engine @@ -83,6 +83,7 @@ class PauseStatePersistenceLayer(GraphEngineLayer): def _get_repo(self) -> APIWorkflowRunRepository: return DifyAPIRepositoryFactory.create_api_workflow_run_repository(self._session_maker) + @override def on_graph_start(self) -> None: """ Called when graph execution starts. @@ -92,6 +93,7 @@ class PauseStatePersistenceLayer(GraphEngineLayer): """ pass + @override def on_event(self, event: GraphEngineEvent) -> None: """ Called for every event emitted by the engine. @@ -132,6 +134,7 @@ class PauseStatePersistenceLayer(GraphEngineLayer): pause_reasons=event.reasons, ) + @override def on_graph_end(self, error: Exception | None) -> None: """ Called when graph execution ends. diff --git a/api/core/app/layers/suspend_layer.py b/api/core/app/layers/suspend_layer.py index 1a79a9f843..3e28303a7d 100644 --- a/api/core/app/layers/suspend_layer.py +++ b/api/core/app/layers/suspend_layer.py @@ -1,3 +1,5 @@ +from typing import override + from graphon.graph_engine.layers import GraphEngineLayer from graphon.graph_events import GraphEngineEvent, GraphRunPausedEvent @@ -9,9 +11,11 @@ class SuspendLayer(GraphEngineLayer): super().__init__() self._paused = False + @override def on_graph_start(self): self._paused = False + @override def on_event(self, event: GraphEngineEvent): """ Handle the paused event, stash runtime state into storage and wait for resume. @@ -19,6 +23,7 @@ class SuspendLayer(GraphEngineLayer): if isinstance(event, GraphRunPausedEvent): self._paused = True + @override def on_graph_end(self, error: Exception | None): """ """ self._paused = False diff --git a/api/core/app/layers/timeslice_layer.py b/api/core/app/layers/timeslice_layer.py index bb9fc1b6fa..094c21944d 100644 --- a/api/core/app/layers/timeslice_layer.py +++ b/api/core/app/layers/timeslice_layer.py @@ -1,6 +1,6 @@ import logging import uuid -from typing import ClassVar +from typing import ClassVar, override from apscheduler.schedulers.background import BackgroundScheduler # type: ignore @@ -63,6 +63,7 @@ class TimeSliceLayer(GraphEngineLayer): except Exception: logger.exception("scheduler error during check if the workflow need to be suspended") + @override def on_graph_start(self): """ Start timer to check if the workflow need to be suspended. @@ -78,9 +79,11 @@ class TimeSliceLayer(GraphEngineLayer): id=self.schedule_id, ) + @override def on_event(self, event: GraphEngineEvent): pass + @override def on_graph_end(self, error: Exception | None) -> None: self.stopped = True # remove the scheduler diff --git a/api/core/app/layers/trigger_post_layer.py b/api/core/app/layers/trigger_post_layer.py index b60fe82ffe..65b8af6706 100644 --- a/api/core/app/layers/trigger_post_layer.py +++ b/api/core/app/layers/trigger_post_layer.py @@ -1,6 +1,6 @@ import logging from datetime import UTC, datetime -from typing import Any, ClassVar +from typing import Any, ClassVar, override from pydantic import TypeAdapter @@ -37,9 +37,11 @@ class TriggerPostLayer(GraphEngineLayer): self.start_time = start_time self.cfs_plan_scheduler_entity = cfs_plan_scheduler_entity + @override def on_graph_start(self): pass + @override def on_event(self, event: GraphEngineEvent): """ Update trigger log with success or failure. @@ -82,5 +84,6 @@ class TriggerPostLayer(GraphEngineLayer): repo.update(trigger_log) session.commit() + @override def on_graph_end(self, error: Exception | None) -> None: pass diff --git a/api/core/app/workflow/layers/persistence.py b/api/core/app/workflow/layers/persistence.py index c5dba65232..619590c81e 100644 --- a/api/core/app/workflow/layers/persistence.py +++ b/api/core/app/workflow/layers/persistence.py @@ -12,7 +12,7 @@ state. from collections.abc import Mapping from dataclasses import dataclass from datetime import datetime -from typing import Any, Union +from typing import Any, Union, override from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity from core.helper.trace_id_helper import ParentTraceContext @@ -98,12 +98,14 @@ class WorkflowPersistenceLayer(GraphEngineLayer): # ------------------------------------------------------------------ # GraphEngineLayer lifecycle # ------------------------------------------------------------------ + @override def on_graph_start(self) -> None: self._workflow_execution = None self._node_execution_cache.clear() self._node_snapshots.clear() self._node_sequence = 0 + @override def on_event(self, event: GraphEngineEvent) -> None: match event: case GraphRunStartedEvent(): @@ -131,6 +133,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer): case NodeRunPauseRequestedEvent(): self._handle_node_pause_requested(event) + @override def on_graph_end(self, error: Exception | None) -> None: return