diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index 30d383ec02..5f03a7cd37 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -147,9 +147,12 @@ class WorkflowTaskStopApi(Resource): """ Stop workflow task """ - # TODO workflow_service = WorkflowService() - workflow_service.stop_workflow_task(app_model=app_model, task_id=task_id, account=current_user) + workflow_service.stop_workflow_task( + task_id=task_id, + user=current_user, + invoke_from=InvokeFrom.DEBUGGER + ) return { "result": "success" diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py index 92286c9af0..ed45e2ba8a 100644 --- a/api/core/app/apps/advanced_chat/app_generator.py +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -11,7 +11,7 @@ from core.app.app_config.features.file_upload.manager import FileUploadConfigMan from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager from core.app.apps.advanced_chat.app_runner import AdvancedChatAppRunner from core.app.apps.advanced_chat.generate_task_pipeline import AdvancedChatAppGenerateTaskPipeline -from core.app.apps.base_app_queue_manager import AppQueueManager, ConversationTaskStoppedException, PublishFrom +from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedException, PublishFrom from core.app.apps.message_based_app_generator import MessageBasedAppGenerator from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom @@ -123,11 +123,13 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): worker_thread.start() # return response or stream generator - return self._handle_response( + return self._handle_advanced_chat_response( application_generate_entity=application_generate_entity, + workflow=workflow, queue_manager=queue_manager, conversation=conversation, message=message, + user=user, stream=stream ) @@ -159,7 +161,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): conversation=conversation, message=message ) - except ConversationTaskStoppedException: + except GenerateTaskStoppedException: pass except InvokeAuthorizationError: queue_manager.publish_error( @@ -177,33 +179,40 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): finally: db.session.remove() - def _handle_response(self, application_generate_entity: AdvancedChatAppGenerateEntity, - queue_manager: AppQueueManager, - conversation: Conversation, - message: Message, - stream: bool = False) -> Union[dict, Generator]: + def _handle_advanced_chat_response(self, application_generate_entity: AdvancedChatAppGenerateEntity, + workflow: Workflow, + queue_manager: AppQueueManager, + conversation: Conversation, + message: Message, + user: Union[Account, EndUser], + stream: bool = False) -> Union[dict, Generator]: """ Handle response. :param application_generate_entity: application generate entity + :param workflow: workflow :param queue_manager: queue manager :param conversation: conversation :param message: message + :param user: account or end user :param stream: is stream :return: """ # init generate task pipeline generate_task_pipeline = AdvancedChatAppGenerateTaskPipeline( application_generate_entity=application_generate_entity, + workflow=workflow, queue_manager=queue_manager, conversation=conversation, - message=message + message=message, + user=user, + stream=stream ) try: - return generate_task_pipeline.process(stream=stream) + return generate_task_pipeline.process() except ValueError as e: if e.args[0] == "I/O operation on closed file.": # ignore this error - raise ConversationTaskStoppedException() + raise GenerateTaskStoppedException() else: logger.exception(e) raise e diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index 077f0c2de0..3279e00355 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -1,6 +1,6 @@ import logging import time -from typing import cast +from typing import Optional, cast from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfig from core.app.apps.advanced_chat.workflow_event_trigger_callback import WorkflowEventTriggerCallback @@ -8,16 +8,14 @@ from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.apps.base_app_runner import AppRunner from core.app.entities.app_invoke_entities import ( AdvancedChatAppGenerateEntity, - InvokeFrom, ) from core.app.entities.queue_entities import QueueAnnotationReplyEvent, QueueStopEvent, QueueTextChunkEvent from core.moderation.base import ModerationException from core.workflow.entities.node_entities import SystemVariable from core.workflow.workflow_engine_manager import WorkflowEngineManager from extensions.ext_database import db -from models.account import Account -from models.model import App, Conversation, EndUser, Message -from models.workflow import WorkflowRunTriggeredFrom +from models.model import App, Conversation, Message +from models.workflow import Workflow logger = logging.getLogger(__name__) @@ -46,7 +44,7 @@ class AdvancedChatAppRunner(AppRunner): if not app_record: raise ValueError("App not found") - workflow = WorkflowEngineManager().get_workflow(app_model=app_record, workflow_id=app_config.workflow_id) + workflow = self.get_workflow(app_model=app_record, workflow_id=app_config.workflow_id) if not workflow: raise ValueError("Workflow not initialized") @@ -74,19 +72,10 @@ class AdvancedChatAppRunner(AppRunner): ): return - # fetch user - if application_generate_entity.invoke_from in [InvokeFrom.DEBUGGER, InvokeFrom.EXPLORE]: - user = db.session.query(Account).filter(Account.id == application_generate_entity.user_id).first() - else: - user = db.session.query(EndUser).filter(EndUser.id == application_generate_entity.user_id).first() - # RUN WORKFLOW workflow_engine_manager = WorkflowEngineManager() workflow_engine_manager.run_workflow( workflow=workflow, - triggered_from=WorkflowRunTriggeredFrom.DEBUGGING - if application_generate_entity.invoke_from == InvokeFrom.DEBUGGER else WorkflowRunTriggeredFrom.APP_RUN, - user=user, user_inputs=inputs, system_inputs={ SystemVariable.QUERY: query, @@ -99,6 +88,20 @@ class AdvancedChatAppRunner(AppRunner): )] ) + def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]: + """ + Get workflow + """ + # fetch workflow by workflow_id + workflow = db.session.query(Workflow).filter( + Workflow.tenant_id == app_model.tenant_id, + Workflow.app_id == app_model.id, + Workflow.id == workflow_id + ).first() + + # return workflow + return workflow + def handle_input_moderation(self, queue_manager: AppQueueManager, app_record: App, app_generate_entity: AdvancedChatAppGenerateEntity, diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index db22607146..18bc9c8008 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -4,9 +4,10 @@ import time from collections.abc import Generator from typing import Optional, Union -from pydantic import BaseModel +from pydantic import BaseModel, Extra from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom +from core.app.apps.workflow_based_generate_task_pipeline import WorkflowBasedGenerateTaskPipeline from core.app.entities.app_invoke_entities import ( AdvancedChatAppGenerateEntity, InvokeFrom, @@ -16,25 +17,35 @@ from core.app.entities.queue_entities import ( QueueErrorEvent, QueueMessageFileEvent, QueueMessageReplaceEvent, - QueueNodeFinishedEvent, + QueueNodeFailedEvent, QueueNodeStartedEvent, + QueueNodeSucceededEvent, QueuePingEvent, QueueRetrieverResourcesEvent, QueueStopEvent, QueueTextChunkEvent, - QueueWorkflowFinishedEvent, + QueueWorkflowFailedEvent, QueueWorkflowStartedEvent, + QueueWorkflowSucceededEvent, ) from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError from core.model_runtime.entities.llm_entities import LLMUsage from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError from core.moderation.output_moderation import ModerationRule, OutputModeration from core.tools.tool_file_manager import ToolFileManager -from core.workflow.entities.node_entities import NodeType +from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeType, SystemVariable from events.message_event import message_was_created from extensions.ext_database import db -from models.model import Conversation, Message, MessageFile -from models.workflow import WorkflowNodeExecution, WorkflowNodeExecutionStatus, WorkflowRun, WorkflowRunStatus +from models.account import Account +from models.model import Conversation, EndUser, Message, MessageFile +from models.workflow import ( + Workflow, + WorkflowNodeExecution, + WorkflowNodeExecutionStatus, + WorkflowRun, + WorkflowRunStatus, + WorkflowRunTriggeredFrom, +) from services.annotation_service import AppAnnotationService logger = logging.getLogger(__name__) @@ -47,41 +58,63 @@ class TaskState(BaseModel): answer: str = "" metadata: dict = {} usage: LLMUsage - workflow_run_id: Optional[str] = None + + workflow_run: Optional[WorkflowRun] = None + start_at: Optional[float] = None + total_tokens: int = 0 + total_steps: int = 0 + + current_node_execution: Optional[WorkflowNodeExecution] = None + current_node_execution_start_at: Optional[float] = None + + class Config: + """Configuration for this pydantic object.""" + + extra = Extra.forbid + arbitrary_types_allowed = True -class AdvancedChatAppGenerateTaskPipeline: +class AdvancedChatAppGenerateTaskPipeline(WorkflowBasedGenerateTaskPipeline): """ AdvancedChatAppGenerateTaskPipeline is a class that generate stream output and state management for Application. """ def __init__(self, application_generate_entity: AdvancedChatAppGenerateEntity, + workflow: Workflow, queue_manager: AppQueueManager, conversation: Conversation, - message: Message) -> None: + message: Message, + user: Union[Account, EndUser], + stream: bool) -> None: """ Initialize GenerateTaskPipeline. :param application_generate_entity: application generate entity + :param workflow: workflow :param queue_manager: queue manager :param conversation: conversation :param message: message + :param user: user + :param stream: stream """ self._application_generate_entity = application_generate_entity + self._workflow = workflow self._queue_manager = queue_manager self._conversation = conversation self._message = message + self._user = user self._task_state = TaskState( usage=LLMUsage.empty_usage() ) self._start_at = time.perf_counter() self._output_moderation_handler = self._init_output_moderation() + self._stream = stream - def process(self, stream: bool) -> Union[dict, Generator]: + def process(self) -> Union[dict, Generator]: """ Process generate task pipeline. :return: """ - if stream: + if self._stream: return self._process_stream_response() else: return self._process_blocking_response() @@ -112,22 +145,17 @@ class AdvancedChatAppGenerateTaskPipeline: self._task_state.answer = annotation.content elif isinstance(event, QueueWorkflowStartedEvent): - self._task_state.workflow_run_id = event.workflow_run_id - elif isinstance(event, QueueNodeFinishedEvent): - workflow_node_execution = self._get_workflow_node_execution(event.workflow_node_execution_id) - if workflow_node_execution.status == WorkflowNodeExecutionStatus.SUCCEEDED.value: - if workflow_node_execution.node_type == NodeType.LLM.value: - outputs = workflow_node_execution.outputs_dict - usage_dict = outputs.get('usage', {}) - self._task_state.metadata['usage'] = usage_dict - elif isinstance(event, QueueStopEvent | QueueWorkflowFinishedEvent): - if isinstance(event, QueueWorkflowFinishedEvent): - workflow_run = self._get_workflow_run(event.workflow_run_id) - if workflow_run.status == WorkflowRunStatus.SUCCEEDED.value: - outputs = workflow_run.outputs - self._task_state.answer = outputs.get('text', '') - else: - raise self._handle_error(QueueErrorEvent(error=ValueError(f'Run failed: {workflow_run.error}'))) + self._on_workflow_start() + elif isinstance(event, QueueNodeStartedEvent): + self._on_node_start(event) + elif isinstance(event, QueueNodeSucceededEvent | QueueNodeFailedEvent): + self._on_node_finished(event) + elif isinstance(event, QueueStopEvent | QueueWorkflowSucceededEvent | QueueWorkflowFailedEvent): + self._on_workflow_finished(event) + workflow_run = self._task_state.workflow_run + + if workflow_run.status != WorkflowRunStatus.SUCCEEDED.value: + raise self._handle_error(QueueErrorEvent(error=ValueError(f'Run failed: {workflow_run.error}'))) # response moderation if self._output_moderation_handler: @@ -173,8 +201,9 @@ class AdvancedChatAppGenerateTaskPipeline: yield self._yield_response(data) break elif isinstance(event, QueueWorkflowStartedEvent): - workflow_run = self._get_workflow_run(event.workflow_run_id) - self._task_state.workflow_run_id = workflow_run.id + self._on_workflow_start() + workflow_run = self._task_state.workflow_run + response = { 'event': 'workflow_started', 'task_id': self._application_generate_entity.task_id, @@ -188,7 +217,9 @@ class AdvancedChatAppGenerateTaskPipeline: yield self._yield_response(response) elif isinstance(event, QueueNodeStartedEvent): - workflow_node_execution = self._get_workflow_node_execution(event.workflow_node_execution_id) + self._on_node_start(event) + workflow_node_execution = self._task_state.current_node_execution + response = { 'event': 'node_started', 'task_id': self._application_generate_entity.task_id, @@ -204,8 +235,10 @@ class AdvancedChatAppGenerateTaskPipeline: } yield self._yield_response(response) - elif isinstance(event, QueueNodeFinishedEvent): - workflow_node_execution = self._get_workflow_node_execution(event.workflow_node_execution_id) + elif isinstance(event, QueueNodeSucceededEvent | QueueNodeFailedEvent): + self._on_node_finished(event) + workflow_node_execution = self._task_state.current_node_execution + if workflow_node_execution.status == WorkflowNodeExecutionStatus.SUCCEEDED.value: if workflow_node_execution.node_type == NodeType.LLM.value: outputs = workflow_node_execution.outputs_dict @@ -234,16 +267,11 @@ class AdvancedChatAppGenerateTaskPipeline: } yield self._yield_response(response) - elif isinstance(event, QueueStopEvent | QueueWorkflowFinishedEvent): - if isinstance(event, QueueStopEvent): - workflow_run = self._get_workflow_run(self._task_state.workflow_run_id) - else: - workflow_run = self._get_workflow_run(event.workflow_run_id) + elif isinstance(event, QueueStopEvent | QueueWorkflowSucceededEvent | QueueWorkflowFailedEvent): + self._on_workflow_finished(event) + workflow_run = self._task_state.workflow_run - if workflow_run.status == WorkflowRunStatus.SUCCEEDED.value: - outputs = workflow_run.outputs_dict - self._task_state.answer = outputs.get('text', '') - else: + if workflow_run.status != WorkflowRunStatus.SUCCEEDED.value: err_event = QueueErrorEvent(error=ValueError(f'Run failed: {workflow_run.error}')) data = self._error_to_stream_response_data(self._handle_error(err_event)) yield self._yield_response(data) @@ -252,7 +280,7 @@ class AdvancedChatAppGenerateTaskPipeline: workflow_run_response = { 'event': 'workflow_finished', 'task_id': self._application_generate_entity.task_id, - 'workflow_run_id': event.workflow_run_id, + 'workflow_run_id': workflow_run.id, 'data': { 'id': workflow_run.id, 'workflow_id': workflow_run.workflow_id, @@ -390,6 +418,102 @@ class AdvancedChatAppGenerateTaskPipeline: else: continue + def _on_workflow_start(self) -> None: + self._task_state.start_at = time.perf_counter() + + workflow_run = self._init_workflow_run( + workflow=self._workflow, + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING + if self._application_generate_entity.invoke_from == InvokeFrom.DEBUGGER + else WorkflowRunTriggeredFrom.APP_RUN, + user=self._user, + user_inputs=self._application_generate_entity.inputs, + system_inputs={ + SystemVariable.QUERY: self._message.query, + SystemVariable.FILES: self._application_generate_entity.files, + SystemVariable.CONVERSATION: self._conversation.id, + } + ) + + self._task_state.workflow_run = workflow_run + + def _on_node_start(self, event: QueueNodeStartedEvent) -> None: + workflow_node_execution = self._init_node_execution_from_workflow_run( + workflow_run=self._task_state.workflow_run, + node_id=event.node_id, + node_type=event.node_type, + node_title=event.node_data.title, + node_run_index=event.node_run_index, + predecessor_node_id=event.predecessor_node_id + ) + + self._task_state.current_node_execution = workflow_node_execution + self._task_state.current_node_execution_start_at = time.perf_counter() + self._task_state.total_steps += 1 + + def _on_node_finished(self, event: QueueNodeSucceededEvent | QueueNodeFailedEvent) -> None: + if isinstance(event, QueueNodeSucceededEvent): + workflow_node_execution = self._workflow_node_execution_success( + workflow_node_execution=self._task_state.current_node_execution, + start_at=self._task_state.current_node_execution_start_at, + inputs=event.inputs, + process_data=event.process_data, + outputs=event.outputs, + execution_metadata=event.execution_metadata + ) + + if event.execution_metadata and event.execution_metadata.get(NodeRunMetadataKey.TOTAL_TOKENS): + self._task_state.total_tokens += ( + int(event.execution_metadata.get(NodeRunMetadataKey.TOTAL_TOKENS))) + + if workflow_node_execution.node_type == NodeType.LLM.value: + outputs = workflow_node_execution.outputs_dict + usage_dict = outputs.get('usage', {}) + self._task_state.metadata['usage'] = usage_dict + else: + workflow_node_execution = self._workflow_node_execution_failed( + workflow_node_execution=self._task_state.current_node_execution, + start_at=self._task_state.current_node_execution_start_at, + error=event.error + ) + + self._task_state.current_node_execution = workflow_node_execution + + def _on_workflow_finished(self, event: QueueStopEvent | QueueWorkflowSucceededEvent | QueueWorkflowFailedEvent) -> None: + if isinstance(event, QueueStopEvent): + workflow_run = self._workflow_run_failed( + workflow_run=self._task_state.workflow_run, + start_at=self._task_state.start_at, + total_tokens=self._task_state.total_tokens, + total_steps=self._task_state.total_steps, + status=WorkflowRunStatus.STOPPED, + error='Workflow stopped.' + ) + elif isinstance(event, QueueWorkflowFailedEvent): + workflow_run = self._workflow_run_failed( + workflow_run=self._task_state.workflow_run, + start_at=self._task_state.start_at, + total_tokens=self._task_state.total_tokens, + total_steps=self._task_state.total_steps, + status=WorkflowRunStatus.FAILED, + error=event.error + ) + else: + workflow_run = self._workflow_run_success( + workflow_run=self._task_state.workflow_run, + start_at=self._task_state.start_at, + total_tokens=self._task_state.total_tokens, + total_steps=self._task_state.total_steps, + outputs=self._task_state.current_node_execution.outputs + if self._task_state.current_node_execution else None + ) + + self._task_state.workflow_run = workflow_run + + if workflow_run.status == WorkflowRunStatus.SUCCEEDED.value: + outputs = workflow_run.outputs_dict + self._task_state.answer = outputs.get('text', '') + def _get_workflow_run(self, workflow_run_id: str) -> WorkflowRun: """ Get workflow run. @@ -397,11 +521,6 @@ class AdvancedChatAppGenerateTaskPipeline: :return: """ workflow_run = db.session.query(WorkflowRun).filter(WorkflowRun.id == workflow_run_id).first() - if workflow_run: - # Because the workflow_run will be modified in the sub-thread, - # and the first query in the main thread will cache the entity, - # you need to expire the entity after the query - db.session.expire(workflow_run) return workflow_run def _get_workflow_node_execution(self, workflow_node_execution_id: str) -> WorkflowNodeExecution: @@ -412,11 +531,6 @@ class AdvancedChatAppGenerateTaskPipeline: """ workflow_node_execution = (db.session.query(WorkflowNodeExecution) .filter(WorkflowNodeExecution.id == workflow_node_execution_id).first()) - if workflow_node_execution: - # Because the workflow_node_execution will be modified in the sub-thread, - # and the first query in the main thread will cache the entity, - # you need to expire the entity after the query - db.session.expire(workflow_node_execution) return workflow_node_execution def _save_message(self) -> None: @@ -428,7 +542,7 @@ class AdvancedChatAppGenerateTaskPipeline: self._message.answer = self._task_state.answer self._message.provider_response_latency = time.perf_counter() - self._start_at - self._message.workflow_run_id = self._task_state.workflow_run_id + self._message.workflow_run_id = self._task_state.workflow_run.id if self._task_state.metadata and self._task_state.metadata.get('usage'): usage = LLMUsage(**self._task_state.metadata['usage']) diff --git a/api/core/app/apps/advanced_chat/workflow_event_trigger_callback.py b/api/core/app/apps/advanced_chat/workflow_event_trigger_callback.py index 8f72305bb1..d9c8a2c96d 100644 --- a/api/core/app/apps/advanced_chat/workflow_event_trigger_callback.py +++ b/api/core/app/apps/advanced_chat/workflow_event_trigger_callback.py @@ -1,14 +1,19 @@ +from typing import Optional + from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.entities.queue_entities import ( - QueueNodeFinishedEvent, + QueueNodeFailedEvent, QueueNodeStartedEvent, + QueueNodeSucceededEvent, QueueTextChunkEvent, - QueueWorkflowFinishedEvent, + QueueWorkflowFailedEvent, QueueWorkflowStartedEvent, + QueueWorkflowSucceededEvent, ) from core.workflow.callbacks.base_workflow_callback import BaseWorkflowCallback +from core.workflow.entities.base_node_data_entities import BaseNodeData from core.workflow.entities.node_entities import NodeType -from models.workflow import Workflow, WorkflowNodeExecution, WorkflowRun +from models.workflow import Workflow class WorkflowEventTriggerCallback(BaseWorkflowCallback): @@ -17,39 +22,91 @@ class WorkflowEventTriggerCallback(BaseWorkflowCallback): self._queue_manager = queue_manager self._streamable_node_ids = self._fetch_streamable_node_ids(workflow.graph_dict) - def on_workflow_run_started(self, workflow_run: WorkflowRun) -> None: + def on_workflow_run_started(self) -> None: """ Workflow run started """ self._queue_manager.publish( - QueueWorkflowStartedEvent(workflow_run_id=workflow_run.id), + QueueWorkflowStartedEvent(), PublishFrom.APPLICATION_MANAGER ) - def on_workflow_run_finished(self, workflow_run: WorkflowRun) -> None: + def on_workflow_run_succeeded(self) -> None: """ - Workflow run finished + Workflow run succeeded """ self._queue_manager.publish( - QueueWorkflowFinishedEvent(workflow_run_id=workflow_run.id), + QueueWorkflowSucceededEvent(), PublishFrom.APPLICATION_MANAGER ) - def on_workflow_node_execute_started(self, workflow_node_execution: WorkflowNodeExecution) -> None: + def on_workflow_run_failed(self, error: str) -> None: + """ + Workflow run failed + """ + self._queue_manager.publish( + QueueWorkflowFailedEvent( + error=error + ), + PublishFrom.APPLICATION_MANAGER + ) + + def on_workflow_node_execute_started(self, node_id: str, + node_type: NodeType, + node_data: BaseNodeData, + node_run_index: int = 1, + predecessor_node_id: Optional[str] = None) -> None: """ Workflow node execute started """ self._queue_manager.publish( - QueueNodeStartedEvent(workflow_node_execution_id=workflow_node_execution.id), + QueueNodeStartedEvent( + node_id=node_id, + node_type=node_type, + node_data=node_data, + node_run_index=node_run_index, + predecessor_node_id=predecessor_node_id + ), PublishFrom.APPLICATION_MANAGER ) - def on_workflow_node_execute_finished(self, workflow_node_execution: WorkflowNodeExecution) -> None: + def on_workflow_node_execute_succeeded(self, node_id: str, + node_type: NodeType, + node_data: BaseNodeData, + inputs: Optional[dict] = None, + process_data: Optional[dict] = None, + outputs: Optional[dict] = None, + execution_metadata: Optional[dict] = None) -> None: """ - Workflow node execute finished + Workflow node execute succeeded """ self._queue_manager.publish( - QueueNodeFinishedEvent(workflow_node_execution_id=workflow_node_execution.id), + QueueNodeSucceededEvent( + node_id=node_id, + node_type=node_type, + node_data=node_data, + inputs=inputs, + process_data=process_data, + outputs=outputs, + execution_metadata=execution_metadata + ), + PublishFrom.APPLICATION_MANAGER + ) + + def on_workflow_node_execute_failed(self, node_id: str, + node_type: NodeType, + node_data: BaseNodeData, + error: str) -> None: + """ + Workflow node execute failed + """ + self._queue_manager.publish( + QueueNodeFailedEvent( + node_id=node_id, + node_type=node_type, + node_data=node_data, + error=error + ), PublishFrom.APPLICATION_MANAGER ) diff --git a/api/core/app/apps/agent_chat/app_generator.py b/api/core/app/apps/agent_chat/app_generator.py index 6d27620a09..700a340c96 100644 --- a/api/core/app/apps/agent_chat/app_generator.py +++ b/api/core/app/apps/agent_chat/app_generator.py @@ -11,7 +11,7 @@ from core.app.app_config.easy_ui_based_app.model_config.converter import ModelCo from core.app.app_config.features.file_upload.manager import FileUploadConfigManager from core.app.apps.agent_chat.app_config_manager import AgentChatAppConfigManager from core.app.apps.agent_chat.app_runner import AgentChatAppRunner -from core.app.apps.base_app_queue_manager import AppQueueManager, ConversationTaskStoppedException, PublishFrom +from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedException, PublishFrom from core.app.apps.message_based_app_generator import MessageBasedAppGenerator from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager from core.app.entities.app_invoke_entities import AgentChatAppGenerateEntity, InvokeFrom @@ -177,7 +177,7 @@ class AgentChatAppGenerator(MessageBasedAppGenerator): conversation=conversation, message=message ) - except ConversationTaskStoppedException: + except GenerateTaskStoppedException: pass except InvokeAuthorizationError: queue_manager.publish_error( diff --git a/api/core/app/apps/base_app_queue_manager.py b/api/core/app/apps/base_app_queue_manager.py index 289567fe5d..43a44819f9 100644 --- a/api/core/app/apps/base_app_queue_manager.py +++ b/api/core/app/apps/base_app_queue_manager.py @@ -11,11 +11,8 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.queue_entities import ( AppQueueEvent, QueueErrorEvent, - QueueMessage, - QueueMessageEndEvent, QueuePingEvent, QueueStopEvent, - QueueWorkflowFinishedEvent, ) from extensions.ext_redis import redis_client @@ -103,22 +100,16 @@ class AppQueueManager: :return: """ self._check_for_sqlalchemy_models(event.dict()) - - message = self.construct_queue_message(event) - - self._q.put(message) - - if isinstance(event, QueueStopEvent - | QueueErrorEvent - | QueueMessageEndEvent - | QueueWorkflowFinishedEvent): - self.stop_listen() - - if pub_from == PublishFrom.APPLICATION_MANAGER and self._is_stopped(): - raise ConversationTaskStoppedException() + self._publish(event, pub_from) @abstractmethod - def construct_queue_message(self, event: AppQueueEvent) -> QueueMessage: + def _publish(self, event: AppQueueEvent, pub_from: PublishFrom) -> None: + """ + Publish event to queue + :param event: + :param pub_from: + :return: + """ raise NotImplementedError @classmethod @@ -182,5 +173,5 @@ class AppQueueManager: "that cause thread safety issues is not allowed.") -class ConversationTaskStoppedException(Exception): +class GenerateTaskStoppedException(Exception): pass diff --git a/api/core/app/apps/chat/app_generator.py b/api/core/app/apps/chat/app_generator.py index 7ddf8dfe32..317d045c04 100644 --- a/api/core/app/apps/chat/app_generator.py +++ b/api/core/app/apps/chat/app_generator.py @@ -9,7 +9,7 @@ from pydantic import ValidationError from core.app.app_config.easy_ui_based_app.model_config.converter import ModelConfigConverter from core.app.app_config.features.file_upload.manager import FileUploadConfigManager -from core.app.apps.base_app_queue_manager import AppQueueManager, ConversationTaskStoppedException, PublishFrom +from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedException, PublishFrom from core.app.apps.chat.app_config_manager import ChatAppConfigManager from core.app.apps.chat.app_runner import ChatAppRunner from core.app.apps.message_based_app_generator import MessageBasedAppGenerator @@ -177,7 +177,7 @@ class ChatAppGenerator(MessageBasedAppGenerator): conversation=conversation, message=message ) - except ConversationTaskStoppedException: + except GenerateTaskStoppedException: pass except InvokeAuthorizationError: queue_manager.publish_error( diff --git a/api/core/app/apps/completion/app_generator.py b/api/core/app/apps/completion/app_generator.py index 7150bee3ce..b948938aac 100644 --- a/api/core/app/apps/completion/app_generator.py +++ b/api/core/app/apps/completion/app_generator.py @@ -9,7 +9,7 @@ from pydantic import ValidationError from core.app.app_config.easy_ui_based_app.model_config.converter import ModelConfigConverter from core.app.app_config.features.file_upload.manager import FileUploadConfigManager -from core.app.apps.base_app_queue_manager import AppQueueManager, ConversationTaskStoppedException, PublishFrom +from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedException, PublishFrom from core.app.apps.completion.app_config_manager import CompletionAppConfigManager from core.app.apps.completion.app_runner import CompletionAppRunner from core.app.apps.message_based_app_generator import MessageBasedAppGenerator @@ -166,7 +166,7 @@ class CompletionAppGenerator(MessageBasedAppGenerator): queue_manager=queue_manager, message=message ) - except ConversationTaskStoppedException: + except GenerateTaskStoppedException: pass except InvokeAuthorizationError: queue_manager.publish_error( diff --git a/api/core/app/apps/message_based_app_generator.py b/api/core/app/apps/message_based_app_generator.py index 3dee68b5e1..0e76c96ff7 100644 --- a/api/core/app/apps/message_based_app_generator.py +++ b/api/core/app/apps/message_based_app_generator.py @@ -7,7 +7,7 @@ from sqlalchemy import and_ from core.app.app_config.entities import EasyUIBasedAppModelConfigFrom from core.app.apps.base_app_generator import BaseAppGenerator -from core.app.apps.base_app_queue_manager import AppQueueManager, ConversationTaskStoppedException +from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedException from core.app.apps.easy_ui_based_generate_task_pipeline import EasyUIBasedGenerateTaskPipeline from core.app.entities.app_invoke_entities import ( AdvancedChatAppGenerateEntity, @@ -60,7 +60,7 @@ class MessageBasedAppGenerator(BaseAppGenerator): return generate_task_pipeline.process(stream=stream) except ValueError as e: if e.args[0] == "I/O operation on closed file.": # ignore this error - raise ConversationTaskStoppedException() + raise GenerateTaskStoppedException() else: logger.exception(e) raise e diff --git a/api/core/app/apps/message_based_app_queue_manager.py b/api/core/app/apps/message_based_app_queue_manager.py index 13644c99ae..6d0a71f495 100644 --- a/api/core/app/apps/message_based_app_queue_manager.py +++ b/api/core/app/apps/message_based_app_queue_manager.py @@ -1,9 +1,14 @@ -from core.app.apps.base_app_queue_manager import AppQueueManager +from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedException, PublishFrom from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.queue_entities import ( AppQueueEvent, MessageQueueMessage, + QueueErrorEvent, QueueMessage, + QueueMessageEndEvent, + QueueStopEvent, + QueueWorkflowFailedEvent, + QueueWorkflowSucceededEvent, ) @@ -28,3 +33,31 @@ class MessageBasedAppQueueManager(AppQueueManager): app_mode=self._app_mode, event=event ) + + def _publish(self, event: AppQueueEvent, pub_from: PublishFrom) -> None: + """ + Publish event to queue + :param event: + :param pub_from: + :return: + """ + message = MessageQueueMessage( + task_id=self._task_id, + message_id=self._message_id, + conversation_id=self._conversation_id, + app_mode=self._app_mode, + event=event + ) + + self._q.put(message) + + if isinstance(event, QueueStopEvent + | QueueErrorEvent + | QueueMessageEndEvent + | QueueWorkflowSucceededEvent + | QueueWorkflowFailedEvent): + self.stop_listen() + + if pub_from == PublishFrom.APPLICATION_MANAGER and self._is_stopped(): + raise GenerateTaskStoppedException() + diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py index 891ca4c2be..d3303047ca 100644 --- a/api/core/app/apps/workflow/app_generator.py +++ b/api/core/app/apps/workflow/app_generator.py @@ -9,7 +9,7 @@ from pydantic import ValidationError from core.app.app_config.features.file_upload.manager import FileUploadConfigManager from core.app.apps.base_app_generator import BaseAppGenerator -from core.app.apps.base_app_queue_manager import AppQueueManager, ConversationTaskStoppedException, PublishFrom +from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedException, PublishFrom from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager from core.app.apps.workflow.app_queue_manager import WorkflowAppQueueManager from core.app.apps.workflow.app_runner import WorkflowAppRunner @@ -95,7 +95,9 @@ class WorkflowAppGenerator(BaseAppGenerator): # return response or stream generator return self._handle_response( application_generate_entity=application_generate_entity, + workflow=workflow, queue_manager=queue_manager, + user=user, stream=stream ) @@ -117,7 +119,7 @@ class WorkflowAppGenerator(BaseAppGenerator): application_generate_entity=application_generate_entity, queue_manager=queue_manager ) - except ConversationTaskStoppedException: + except GenerateTaskStoppedException: pass except InvokeAuthorizationError: queue_manager.publish_error( @@ -136,19 +138,25 @@ class WorkflowAppGenerator(BaseAppGenerator): db.session.remove() def _handle_response(self, application_generate_entity: WorkflowAppGenerateEntity, + workflow: Workflow, queue_manager: AppQueueManager, + user: Union[Account, EndUser], stream: bool = False) -> Union[dict, Generator]: """ Handle response. :param application_generate_entity: application generate entity + :param workflow: workflow :param queue_manager: queue manager + :param user: account or end user :param stream: is stream :return: """ # init generate task pipeline generate_task_pipeline = WorkflowAppGenerateTaskPipeline( application_generate_entity=application_generate_entity, + workflow=workflow, queue_manager=queue_manager, + user=user, stream=stream ) @@ -156,7 +164,7 @@ class WorkflowAppGenerator(BaseAppGenerator): return generate_task_pipeline.process() except ValueError as e: if e.args[0] == "I/O operation on closed file.": # ignore this error - raise ConversationTaskStoppedException() + raise GenerateTaskStoppedException() else: logger.exception(e) raise e diff --git a/api/core/app/apps/workflow/app_queue_manager.py b/api/core/app/apps/workflow/app_queue_manager.py index 5cf1e58913..f448138b53 100644 --- a/api/core/app/apps/workflow/app_queue_manager.py +++ b/api/core/app/apps/workflow/app_queue_manager.py @@ -1,8 +1,12 @@ -from core.app.apps.base_app_queue_manager import AppQueueManager +from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedException, PublishFrom from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.queue_entities import ( AppQueueEvent, - QueueMessage, + QueueErrorEvent, + QueueMessageEndEvent, + QueueStopEvent, + QueueWorkflowFailedEvent, + QueueWorkflowSucceededEvent, WorkflowQueueMessage, ) @@ -16,9 +20,27 @@ class WorkflowAppQueueManager(AppQueueManager): self._app_mode = app_mode - def construct_queue_message(self, event: AppQueueEvent) -> QueueMessage: - return WorkflowQueueMessage( + def _publish(self, event: AppQueueEvent, pub_from: PublishFrom) -> None: + """ + Publish event to queue + :param event: + :param pub_from: + :return: + """ + message = WorkflowQueueMessage( task_id=self._task_id, app_mode=self._app_mode, event=event ) + + self._q.put(message) + + if isinstance(event, QueueStopEvent + | QueueErrorEvent + | QueueMessageEndEvent + | QueueWorkflowSucceededEvent + | QueueWorkflowFailedEvent): + self.stop_listen() + + if pub_from == PublishFrom.APPLICATION_MANAGER and self._is_stopped(): + raise GenerateTaskStoppedException() diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index 132282ffe3..59a385cb38 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -1,13 +1,12 @@ import logging import time -from typing import cast +from typing import Optional, cast from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.apps.workflow.app_config_manager import WorkflowAppConfig from core.app.apps.workflow.workflow_event_trigger_callback import WorkflowEventTriggerCallback from core.app.entities.app_invoke_entities import ( AppGenerateEntity, - InvokeFrom, WorkflowAppGenerateEntity, ) from core.app.entities.queue_entities import QueueStopEvent, QueueTextChunkEvent @@ -16,9 +15,8 @@ from core.moderation.input_moderation import InputModeration from core.workflow.entities.node_entities import SystemVariable from core.workflow.workflow_engine_manager import WorkflowEngineManager from extensions.ext_database import db -from models.account import Account -from models.model import App, EndUser -from models.workflow import WorkflowRunTriggeredFrom +from models.model import App +from models.workflow import Workflow logger = logging.getLogger(__name__) @@ -43,7 +41,7 @@ class WorkflowAppRunner: if not app_record: raise ValueError("App not found") - workflow = WorkflowEngineManager().get_workflow(app_model=app_record, workflow_id=app_config.workflow_id) + workflow = self.get_workflow(app_model=app_record, workflow_id=app_config.workflow_id) if not workflow: raise ValueError("Workflow not initialized") @@ -59,19 +57,10 @@ class WorkflowAppRunner: ): return - # fetch user - if application_generate_entity.invoke_from in [InvokeFrom.DEBUGGER, InvokeFrom.EXPLORE]: - user = db.session.query(Account).filter(Account.id == application_generate_entity.user_id).first() - else: - user = db.session.query(EndUser).filter(EndUser.id == application_generate_entity.user_id).first() - # RUN WORKFLOW workflow_engine_manager = WorkflowEngineManager() workflow_engine_manager.run_workflow( workflow=workflow, - triggered_from=WorkflowRunTriggeredFrom.DEBUGGING - if application_generate_entity.invoke_from == InvokeFrom.DEBUGGER else WorkflowRunTriggeredFrom.APP_RUN, - user=user, user_inputs=inputs, system_inputs={ SystemVariable.FILES: files @@ -82,6 +71,20 @@ class WorkflowAppRunner: )] ) + def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]: + """ + Get workflow + """ + # fetch workflow by workflow_id + workflow = db.session.query(Workflow).filter( + Workflow.tenant_id == app_model.tenant_id, + Workflow.app_id == app_model.id, + Workflow.id == workflow_id + ).first() + + # return workflow + return workflow + def handle_input_moderation(self, queue_manager: AppQueueManager, app_record: App, app_generate_entity: WorkflowAppGenerateEntity, diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index a48640766a..721124c4c5 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -4,28 +4,35 @@ import time from collections.abc import Generator from typing import Optional, Union -from pydantic import BaseModel +from pydantic import BaseModel, Extra from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom +from core.app.apps.workflow_based_generate_task_pipeline import WorkflowBasedGenerateTaskPipeline from core.app.entities.app_invoke_entities import ( + InvokeFrom, WorkflowAppGenerateEntity, ) from core.app.entities.queue_entities import ( QueueErrorEvent, QueueMessageReplaceEvent, - QueueNodeFinishedEvent, + QueueNodeFailedEvent, QueueNodeStartedEvent, + QueueNodeSucceededEvent, QueuePingEvent, QueueStopEvent, QueueTextChunkEvent, - QueueWorkflowFinishedEvent, + QueueWorkflowFailedEvent, QueueWorkflowStartedEvent, + QueueWorkflowSucceededEvent, ) from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError from core.moderation.output_moderation import ModerationRule, OutputModeration +from core.workflow.entities.node_entities import NodeRunMetadataKey, SystemVariable from extensions.ext_database import db -from models.workflow import WorkflowNodeExecution, WorkflowRun, WorkflowRunStatus +from models.account import Account +from models.model import EndUser +from models.workflow import Workflow, WorkflowNodeExecution, WorkflowRun, WorkflowRunStatus, WorkflowRunTriggeredFrom logger = logging.getLogger(__name__) @@ -36,24 +43,44 @@ class TaskState(BaseModel): """ answer: str = "" metadata: dict = {} - workflow_run_id: Optional[str] = None + + workflow_run: Optional[WorkflowRun] = None + start_at: Optional[float] = None + total_tokens: int = 0 + total_steps: int = 0 + + current_node_execution: Optional[WorkflowNodeExecution] = None + current_node_execution_start_at: Optional[float] = None + + class Config: + """Configuration for this pydantic object.""" + + extra = Extra.forbid + arbitrary_types_allowed = True -class WorkflowAppGenerateTaskPipeline: +class WorkflowAppGenerateTaskPipeline(WorkflowBasedGenerateTaskPipeline): """ WorkflowAppGenerateTaskPipeline is a class that generate stream output and state management for Application. """ def __init__(self, application_generate_entity: WorkflowAppGenerateEntity, + workflow: Workflow, queue_manager: AppQueueManager, + user: Union[Account, EndUser], stream: bool) -> None: """ Initialize GenerateTaskPipeline. :param application_generate_entity: application generate entity + :param workflow: workflow :param queue_manager: queue manager + :param user: user + :param stream: is stream """ self._application_generate_entity = application_generate_entity + self._workflow = workflow self._queue_manager = queue_manager + self._user = user self._task_state = TaskState() self._start_at = time.perf_counter() self._output_moderation_handler = self._init_output_moderation() @@ -79,17 +106,15 @@ class WorkflowAppGenerateTaskPipeline: if isinstance(event, QueueErrorEvent): raise self._handle_error(event) - elif isinstance(event, QueueStopEvent | QueueWorkflowFinishedEvent): - if isinstance(event, QueueStopEvent): - workflow_run = self._get_workflow_run(self._task_state.workflow_run_id) - else: - workflow_run = self._get_workflow_run(event.workflow_run_id) - - if workflow_run.status == WorkflowRunStatus.SUCCEEDED.value: - outputs = workflow_run.outputs_dict - self._task_state.answer = outputs.get('text', '') - else: - raise self._handle_error(QueueErrorEvent(error=ValueError(f'Run failed: {workflow_run.error}'))) + elif isinstance(event, QueueWorkflowStartedEvent): + self._on_workflow_start() + elif isinstance(event, QueueNodeStartedEvent): + self._on_node_start(event) + elif isinstance(event, QueueNodeSucceededEvent | QueueNodeFailedEvent): + self._on_node_finished(event) + elif isinstance(event, QueueStopEvent | QueueWorkflowSucceededEvent | QueueWorkflowFailedEvent): + self._on_workflow_finished(event) + workflow_run = self._task_state.workflow_run # response moderation if self._output_moderation_handler: @@ -100,10 +125,12 @@ class WorkflowAppGenerateTaskPipeline: public_event=False ) + # save workflow app log + self._save_workflow_app_log() + response = { - 'event': 'workflow_finished', 'task_id': self._application_generate_entity.task_id, - 'workflow_run_id': event.workflow_run_id, + 'workflow_run_id': workflow_run.id, 'data': { 'id': workflow_run.id, 'workflow_id': workflow_run.workflow_id, @@ -135,8 +162,9 @@ class WorkflowAppGenerateTaskPipeline: yield self._yield_response(data) break elif isinstance(event, QueueWorkflowStartedEvent): - self._task_state.workflow_run_id = event.workflow_run_id - workflow_run = self._get_workflow_run(event.workflow_run_id) + self._on_workflow_start() + workflow_run = self._task_state.workflow_run + response = { 'event': 'workflow_started', 'task_id': self._application_generate_entity.task_id, @@ -150,7 +178,9 @@ class WorkflowAppGenerateTaskPipeline: yield self._yield_response(response) elif isinstance(event, QueueNodeStartedEvent): - workflow_node_execution = self._get_workflow_node_execution(event.workflow_node_execution_id) + self._on_node_start(event) + workflow_node_execution = self._task_state.current_node_execution + response = { 'event': 'node_started', 'task_id': self._application_generate_entity.task_id, @@ -166,8 +196,10 @@ class WorkflowAppGenerateTaskPipeline: } yield self._yield_response(response) - elif isinstance(event, QueueNodeFinishedEvent): - workflow_node_execution = self._get_workflow_node_execution(event.workflow_node_execution_id) + elif isinstance(event, QueueNodeSucceededEvent | QueueNodeFailedEvent): + self._on_node_finished(event) + workflow_node_execution = self._task_state.current_node_execution + response = { 'event': 'node_finished', 'task_id': self._application_generate_entity.task_id, @@ -190,20 +222,9 @@ class WorkflowAppGenerateTaskPipeline: } yield self._yield_response(response) - elif isinstance(event, QueueStopEvent | QueueWorkflowFinishedEvent): - if isinstance(event, QueueStopEvent): - workflow_run = self._get_workflow_run(self._task_state.workflow_run_id) - else: - workflow_run = self._get_workflow_run(event.workflow_run_id) - - if workflow_run.status == WorkflowRunStatus.SUCCEEDED.value: - outputs = workflow_run.outputs_dict - self._task_state.answer = outputs.get('text', '') - else: - err_event = QueueErrorEvent(error=ValueError(f'Run failed: {workflow_run.error}')) - data = self._error_to_stream_response_data(self._handle_error(err_event)) - yield self._yield_response(data) - break + elif isinstance(event, QueueStopEvent | QueueWorkflowSucceededEvent | QueueWorkflowFailedEvent): + self._on_workflow_finished(event) + workflow_run = self._task_state.workflow_run # response moderation if self._output_moderation_handler: @@ -219,7 +240,7 @@ class WorkflowAppGenerateTaskPipeline: replace_response = { 'event': 'text_replace', 'task_id': self._application_generate_entity.task_id, - 'workflow_run_id': self._task_state.workflow_run_id, + 'workflow_run_id': self._task_state.workflow_run.id, 'data': { 'text': self._task_state.answer } @@ -233,7 +254,7 @@ class WorkflowAppGenerateTaskPipeline: workflow_run_response = { 'event': 'workflow_finished', 'task_id': self._application_generate_entity.task_id, - 'workflow_run_id': event.workflow_run_id, + 'workflow_run_id': workflow_run.id, 'data': { 'id': workflow_run.id, 'workflow_id': workflow_run.workflow_id, @@ -244,7 +265,7 @@ class WorkflowAppGenerateTaskPipeline: 'total_tokens': workflow_run.total_tokens, 'total_steps': workflow_run.total_steps, 'created_at': int(workflow_run.created_at.timestamp()), - 'finished_at': int(workflow_run.finished_at.timestamp()) + 'finished_at': int(workflow_run.finished_at.timestamp()) if workflow_run.finished_at else None } } @@ -279,7 +300,7 @@ class WorkflowAppGenerateTaskPipeline: response = { 'event': 'text_replace', 'task_id': self._application_generate_entity.task_id, - 'workflow_run_id': self._task_state.workflow_run_id, + 'workflow_run_id': self._task_state.workflow_run.id, 'data': { 'text': event.text } @@ -291,6 +312,95 @@ class WorkflowAppGenerateTaskPipeline: else: continue + def _on_workflow_start(self) -> None: + self._task_state.start_at = time.perf_counter() + + workflow_run = self._init_workflow_run( + workflow=self._workflow, + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING + if self._application_generate_entity.invoke_from == InvokeFrom.DEBUGGER + else WorkflowRunTriggeredFrom.APP_RUN, + user=self._user, + user_inputs=self._application_generate_entity.inputs, + system_inputs={ + SystemVariable.FILES: self._application_generate_entity.files + } + ) + + self._task_state.workflow_run = workflow_run + + def _on_node_start(self, event: QueueNodeStartedEvent) -> None: + workflow_node_execution = self._init_node_execution_from_workflow_run( + workflow_run=self._task_state.workflow_run, + node_id=event.node_id, + node_type=event.node_type, + node_title=event.node_data.title, + node_run_index=event.node_run_index, + predecessor_node_id=event.predecessor_node_id + ) + + self._task_state.current_node_execution = workflow_node_execution + self._task_state.current_node_execution_start_at = time.perf_counter() + self._task_state.total_steps += 1 + + def _on_node_finished(self, event: QueueNodeSucceededEvent | QueueNodeFailedEvent) -> None: + if isinstance(event, QueueNodeSucceededEvent): + workflow_node_execution = self._workflow_node_execution_success( + workflow_node_execution=self._task_state.current_node_execution, + start_at=self._task_state.current_node_execution_start_at, + inputs=event.inputs, + process_data=event.process_data, + outputs=event.outputs, + execution_metadata=event.execution_metadata + ) + + if event.execution_metadata and event.execution_metadata.get(NodeRunMetadataKey.TOTAL_TOKENS): + self._task_state.total_tokens += ( + int(event.execution_metadata.get(NodeRunMetadataKey.TOTAL_TOKENS))) + else: + workflow_node_execution = self._workflow_node_execution_failed( + workflow_node_execution=self._task_state.current_node_execution, + start_at=self._task_state.current_node_execution_start_at, + error=event.error + ) + + self._task_state.current_node_execution = workflow_node_execution + + def _on_workflow_finished(self, event: QueueStopEvent | QueueWorkflowSucceededEvent | QueueWorkflowFailedEvent) -> None: + if isinstance(event, QueueStopEvent): + workflow_run = self._workflow_run_failed( + workflow_run=self._task_state.workflow_run, + start_at=self._task_state.start_at, + total_tokens=self._task_state.total_tokens, + total_steps=self._task_state.total_steps, + status=WorkflowRunStatus.STOPPED, + error='Workflow stopped.' + ) + elif isinstance(event, QueueWorkflowFailedEvent): + workflow_run = self._workflow_run_failed( + workflow_run=self._task_state.workflow_run, + start_at=self._task_state.start_at, + total_tokens=self._task_state.total_tokens, + total_steps=self._task_state.total_steps, + status=WorkflowRunStatus.FAILED, + error=event.error + ) + else: + workflow_run = self._workflow_run_success( + workflow_run=self._task_state.workflow_run, + start_at=self._task_state.start_at, + total_tokens=self._task_state.total_tokens, + total_steps=self._task_state.total_steps, + outputs=self._task_state.current_node_execution.outputs + if self._task_state.current_node_execution else None + ) + + self._task_state.workflow_run = workflow_run + + if workflow_run.status == WorkflowRunStatus.SUCCEEDED.value: + outputs = workflow_run.outputs_dict + self._task_state.answer = outputs.get('text', '') + def _get_workflow_run(self, workflow_run_id: str) -> WorkflowRun: """ Get workflow run. @@ -298,11 +408,6 @@ class WorkflowAppGenerateTaskPipeline: :return: """ workflow_run = db.session.query(WorkflowRun).filter(WorkflowRun.id == workflow_run_id).first() - if workflow_run: - # Because the workflow_run will be modified in the sub-thread, - # and the first query in the main thread will cache the entity, - # you need to expire the entity after the query - db.session.expire(workflow_run) return workflow_run def _get_workflow_node_execution(self, workflow_node_execution_id: str) -> WorkflowNodeExecution: @@ -313,11 +418,6 @@ class WorkflowAppGenerateTaskPipeline: """ workflow_node_execution = (db.session.query(WorkflowNodeExecution) .filter(WorkflowNodeExecution.id == workflow_node_execution_id).first()) - if workflow_node_execution: - # Because the workflow_node_execution will be modified in the sub-thread, - # and the first query in the main thread will cache the entity, - # you need to expire the entity after the query - db.session.expire(workflow_node_execution) return workflow_node_execution def _save_workflow_app_log(self) -> None: @@ -335,7 +435,7 @@ class WorkflowAppGenerateTaskPipeline: """ response = { 'event': 'text_chunk', - 'workflow_run_id': self._task_state.workflow_run_id, + 'workflow_run_id': self._task_state.workflow_run.id, 'task_id': self._application_generate_entity.task_id, 'data': { 'text': text @@ -398,7 +498,6 @@ class WorkflowAppGenerateTaskPipeline: return { 'event': 'error', 'task_id': self._application_generate_entity.task_id, - 'workflow_run_id': self._task_state.workflow_run_id, **data } diff --git a/api/core/app/apps/workflow/workflow_event_trigger_callback.py b/api/core/app/apps/workflow/workflow_event_trigger_callback.py index 12b93518ed..318466711a 100644 --- a/api/core/app/apps/workflow/workflow_event_trigger_callback.py +++ b/api/core/app/apps/workflow/workflow_event_trigger_callback.py @@ -1,14 +1,19 @@ +from typing import Optional + from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.entities.queue_entities import ( - QueueNodeFinishedEvent, + QueueNodeFailedEvent, QueueNodeStartedEvent, + QueueNodeSucceededEvent, QueueTextChunkEvent, - QueueWorkflowFinishedEvent, + QueueWorkflowFailedEvent, QueueWorkflowStartedEvent, + QueueWorkflowSucceededEvent, ) from core.workflow.callbacks.base_workflow_callback import BaseWorkflowCallback +from core.workflow.entities.base_node_data_entities import BaseNodeData from core.workflow.entities.node_entities import NodeType -from models.workflow import Workflow, WorkflowNodeExecution, WorkflowRun +from models.workflow import Workflow class WorkflowEventTriggerCallback(BaseWorkflowCallback): @@ -17,39 +22,91 @@ class WorkflowEventTriggerCallback(BaseWorkflowCallback): self._queue_manager = queue_manager self._streamable_node_ids = self._fetch_streamable_node_ids(workflow.graph_dict) - def on_workflow_run_started(self, workflow_run: WorkflowRun) -> None: + def on_workflow_run_started(self) -> None: """ Workflow run started """ self._queue_manager.publish( - QueueWorkflowStartedEvent(workflow_run_id=workflow_run.id), + QueueWorkflowStartedEvent(), PublishFrom.APPLICATION_MANAGER ) - def on_workflow_run_finished(self, workflow_run: WorkflowRun) -> None: + def on_workflow_run_succeeded(self) -> None: """ - Workflow run finished + Workflow run succeeded """ self._queue_manager.publish( - QueueWorkflowFinishedEvent(workflow_run_id=workflow_run.id), + QueueWorkflowSucceededEvent(), PublishFrom.APPLICATION_MANAGER ) - def on_workflow_node_execute_started(self, workflow_node_execution: WorkflowNodeExecution) -> None: + def on_workflow_run_failed(self, error: str) -> None: + """ + Workflow run failed + """ + self._queue_manager.publish( + QueueWorkflowFailedEvent( + error=error + ), + PublishFrom.APPLICATION_MANAGER + ) + + def on_workflow_node_execute_started(self, node_id: str, + node_type: NodeType, + node_data: BaseNodeData, + node_run_index: int = 1, + predecessor_node_id: Optional[str] = None) -> None: """ Workflow node execute started """ self._queue_manager.publish( - QueueNodeStartedEvent(workflow_node_execution_id=workflow_node_execution.id), + QueueNodeStartedEvent( + node_id=node_id, + node_type=node_type, + node_data=node_data, + node_run_index=node_run_index, + predecessor_node_id=predecessor_node_id + ), PublishFrom.APPLICATION_MANAGER ) - def on_workflow_node_execute_finished(self, workflow_node_execution: WorkflowNodeExecution) -> None: + def on_workflow_node_execute_succeeded(self, node_id: str, + node_type: NodeType, + node_data: BaseNodeData, + inputs: Optional[dict] = None, + process_data: Optional[dict] = None, + outputs: Optional[dict] = None, + execution_metadata: Optional[dict] = None) -> None: """ - Workflow node execute finished + Workflow node execute succeeded """ self._queue_manager.publish( - QueueNodeFinishedEvent(workflow_node_execution_id=workflow_node_execution.id), + QueueNodeSucceededEvent( + node_id=node_id, + node_type=node_type, + node_data=node_data, + inputs=inputs, + process_data=process_data, + outputs=outputs, + execution_metadata=execution_metadata + ), + PublishFrom.APPLICATION_MANAGER + ) + + def on_workflow_node_execute_failed(self, node_id: str, + node_type: NodeType, + node_data: BaseNodeData, + error: str) -> None: + """ + Workflow node execute failed + """ + self._queue_manager.publish( + QueueNodeFailedEvent( + node_id=node_id, + node_type=node_type, + node_data=node_data, + error=error + ), PublishFrom.APPLICATION_MANAGER ) diff --git a/api/core/app/apps/workflow_based_generate_task_pipeline.py b/api/core/app/apps/workflow_based_generate_task_pipeline.py new file mode 100644 index 0000000000..3e9a7b9e1f --- /dev/null +++ b/api/core/app/apps/workflow_based_generate_task_pipeline.py @@ -0,0 +1,202 @@ +import json +import time +from datetime import datetime +from typing import Optional, Union + +from core.model_runtime.utils.encoders import jsonable_encoder +from core.workflow.entities.node_entities import NodeType +from extensions.ext_database import db +from models.account import Account +from models.model import EndUser +from models.workflow import ( + CreatedByRole, + Workflow, + WorkflowNodeExecution, + WorkflowNodeExecutionStatus, + WorkflowNodeExecutionTriggeredFrom, + WorkflowRun, + WorkflowRunStatus, + WorkflowRunTriggeredFrom, +) + + +class WorkflowBasedGenerateTaskPipeline: + def _init_workflow_run(self, workflow: Workflow, + triggered_from: WorkflowRunTriggeredFrom, + user: Union[Account, EndUser], + user_inputs: dict, + system_inputs: Optional[dict] = None) -> WorkflowRun: + """ + Init workflow run + :param workflow: Workflow instance + :param triggered_from: triggered from + :param user: account or end user + :param user_inputs: user variables inputs + :param system_inputs: system inputs, like: query, files + :return: + """ + max_sequence = db.session.query(db.func.max(WorkflowRun.sequence_number)) \ + .filter(WorkflowRun.tenant_id == workflow.tenant_id) \ + .filter(WorkflowRun.app_id == workflow.app_id) \ + .scalar() or 0 + new_sequence_number = max_sequence + 1 + + # init workflow run + workflow_run = WorkflowRun( + tenant_id=workflow.tenant_id, + app_id=workflow.app_id, + sequence_number=new_sequence_number, + workflow_id=workflow.id, + type=workflow.type, + triggered_from=triggered_from.value, + version=workflow.version, + graph=workflow.graph, + inputs=json.dumps({**user_inputs, **jsonable_encoder(system_inputs)}), + status=WorkflowRunStatus.RUNNING.value, + created_by_role=(CreatedByRole.ACCOUNT.value + if isinstance(user, Account) else CreatedByRole.END_USER.value), + created_by=user.id + ) + + db.session.add(workflow_run) + db.session.commit() + + return workflow_run + + def _workflow_run_success(self, workflow_run: WorkflowRun, + start_at: float, + total_tokens: int, + total_steps: int, + outputs: Optional[dict] = None) -> WorkflowRun: + """ + Workflow run success + :param workflow_run: workflow run + :param start_at: start time + :param total_tokens: total tokens + :param total_steps: total steps + :param outputs: outputs + :return: + """ + workflow_run.status = WorkflowRunStatus.SUCCEEDED.value + workflow_run.outputs = outputs + workflow_run.elapsed_time = time.perf_counter() - start_at + workflow_run.total_tokens = total_tokens + workflow_run.total_steps = total_steps + workflow_run.finished_at = datetime.utcnow() + + db.session.commit() + + return workflow_run + + def _workflow_run_failed(self, workflow_run: WorkflowRun, + start_at: float, + total_tokens: int, + total_steps: int, + status: WorkflowRunStatus, + error: str) -> WorkflowRun: + """ + Workflow run failed + :param workflow_run: workflow run + :param start_at: start time + :param total_tokens: total tokens + :param total_steps: total steps + :param status: status + :param error: error message + :return: + """ + workflow_run.status = status.value + workflow_run.error = error + workflow_run.elapsed_time = time.perf_counter() - start_at + workflow_run.total_tokens = total_tokens + workflow_run.total_steps = total_steps + workflow_run.finished_at = datetime.utcnow() + + db.session.commit() + + return workflow_run + + def _init_node_execution_from_workflow_run(self, workflow_run: WorkflowRun, + node_id: str, + node_type: NodeType, + node_title: str, + node_run_index: int = 1, + predecessor_node_id: Optional[str] = None) -> WorkflowNodeExecution: + """ + Init workflow node execution from workflow run + :param workflow_run: workflow run + :param node_id: node id + :param node_type: node type + :param node_title: node title + :param node_run_index: run index + :param predecessor_node_id: predecessor node id if exists + :return: + """ + # init workflow node execution + workflow_node_execution = WorkflowNodeExecution( + tenant_id=workflow_run.tenant_id, + app_id=workflow_run.app_id, + workflow_id=workflow_run.workflow_id, + triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value, + workflow_run_id=workflow_run.id, + predecessor_node_id=predecessor_node_id, + index=node_run_index, + node_id=node_id, + node_type=node_type.value, + title=node_title, + status=WorkflowNodeExecutionStatus.RUNNING.value, + created_by_role=workflow_run.created_by_role, + created_by=workflow_run.created_by + ) + + db.session.add(workflow_node_execution) + db.session.commit() + + return workflow_node_execution + + def _workflow_node_execution_success(self, workflow_node_execution: WorkflowNodeExecution, + start_at: float, + inputs: Optional[dict] = None, + process_data: Optional[dict] = None, + outputs: Optional[dict] = None, + execution_metadata: Optional[dict] = None) -> WorkflowNodeExecution: + """ + Workflow node execution success + :param workflow_node_execution: workflow node execution + :param start_at: start time + :param inputs: inputs + :param process_data: process data + :param outputs: outputs + :param execution_metadata: execution metadata + :return: + """ + workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value + workflow_node_execution.elapsed_time = time.perf_counter() - start_at + workflow_node_execution.inputs = json.dumps(inputs) if inputs else None + workflow_node_execution.process_data = json.dumps(process_data) if process_data else None + workflow_node_execution.outputs = json.dumps(outputs) if outputs else None + workflow_node_execution.execution_metadata = json.dumps(jsonable_encoder(execution_metadata)) \ + if execution_metadata else None + workflow_node_execution.finished_at = datetime.utcnow() + + db.session.commit() + + return workflow_node_execution + + def _workflow_node_execution_failed(self, workflow_node_execution: WorkflowNodeExecution, + start_at: float, + error: str) -> WorkflowNodeExecution: + """ + Workflow node execution failed + :param workflow_node_execution: workflow node execution + :param start_at: start time + :param error: error message + :return: + """ + workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value + workflow_node_execution.error = error + workflow_node_execution.elapsed_time = time.perf_counter() - start_at + workflow_node_execution.finished_at = datetime.utcnow() + + db.session.commit() + + return workflow_node_execution diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index 67ed13d721..0ea7744b58 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -1,9 +1,11 @@ from enum import Enum -from typing import Any +from typing import Any, Optional from pydantic import BaseModel from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk +from core.workflow.entities.base_node_data_entities import BaseNodeData +from core.workflow.entities.node_entities import NodeType class QueueEvent(Enum): @@ -16,9 +18,11 @@ class QueueEvent(Enum): MESSAGE_REPLACE = "message_replace" MESSAGE_END = "message_end" WORKFLOW_STARTED = "workflow_started" - WORKFLOW_FINISHED = "workflow_finished" + WORKFLOW_SUCCEEDED = "workflow_succeeded" + WORKFLOW_FAILED = "workflow_failed" NODE_STARTED = "node_started" - NODE_FINISHED = "node_finished" + NODE_SUCCEEDED = "node_succeeded" + NODE_FAILED = "node_failed" RETRIEVER_RESOURCES = "retriever_resources" ANNOTATION_REPLY = "annotation_reply" AGENT_THOUGHT = "agent_thought" @@ -96,15 +100,21 @@ class QueueWorkflowStartedEvent(AppQueueEvent): QueueWorkflowStartedEvent entity """ event = QueueEvent.WORKFLOW_STARTED - workflow_run_id: str -class QueueWorkflowFinishedEvent(AppQueueEvent): +class QueueWorkflowSucceededEvent(AppQueueEvent): """ - QueueWorkflowFinishedEvent entity + QueueWorkflowSucceededEvent entity """ - event = QueueEvent.WORKFLOW_FINISHED - workflow_run_id: str + event = QueueEvent.WORKFLOW_SUCCEEDED + + +class QueueWorkflowFailedEvent(AppQueueEvent): + """ + QueueWorkflowFailedEvent entity + """ + event = QueueEvent.WORKFLOW_FAILED + error: str class QueueNodeStartedEvent(AppQueueEvent): @@ -112,17 +122,45 @@ class QueueNodeStartedEvent(AppQueueEvent): QueueNodeStartedEvent entity """ event = QueueEvent.NODE_STARTED - workflow_node_execution_id: str + + node_id: str + node_type: NodeType + node_data: BaseNodeData + node_run_index: int = 1 + predecessor_node_id: Optional[str] = None -class QueueNodeFinishedEvent(AppQueueEvent): +class QueueNodeSucceededEvent(AppQueueEvent): """ - QueueNodeFinishedEvent entity + QueueNodeSucceededEvent entity """ - event = QueueEvent.NODE_FINISHED - workflow_node_execution_id: str + event = QueueEvent.NODE_SUCCEEDED + + node_id: str + node_type: NodeType + node_data: BaseNodeData + + inputs: Optional[dict] = None + process_data: Optional[dict] = None + outputs: Optional[dict] = None + execution_metadata: Optional[dict] = None + + error: Optional[str] = None + + +class QueueNodeFailedEvent(AppQueueEvent): + """ + QueueNodeFailedEvent entity + """ + event = QueueEvent.NODE_FAILED + + node_id: str + node_type: NodeType + node_data: BaseNodeData + + error: str + - class QueueAgentThoughtEvent(AppQueueEvent): """ QueueAgentThoughtEvent entity diff --git a/api/core/workflow/callbacks/base_workflow_callback.py b/api/core/workflow/callbacks/base_workflow_callback.py index 3866bf2c15..cf2915ed86 100644 --- a/api/core/workflow/callbacks/base_workflow_callback.py +++ b/api/core/workflow/callbacks/base_workflow_callback.py @@ -1,34 +1,63 @@ from abc import ABC, abstractmethod +from typing import Optional -from models.workflow import WorkflowNodeExecution, WorkflowRun +from core.workflow.entities.base_node_data_entities import BaseNodeData +from core.workflow.entities.node_entities import NodeType class BaseWorkflowCallback(ABC): @abstractmethod - def on_workflow_run_started(self, workflow_run: WorkflowRun) -> None: + def on_workflow_run_started(self) -> None: """ Workflow run started """ raise NotImplementedError @abstractmethod - def on_workflow_run_finished(self, workflow_run: WorkflowRun) -> None: + def on_workflow_run_succeeded(self) -> None: """ - Workflow run finished + Workflow run succeeded """ raise NotImplementedError @abstractmethod - def on_workflow_node_execute_started(self, workflow_node_execution: WorkflowNodeExecution) -> None: + def on_workflow_run_failed(self, error: str) -> None: + """ + Workflow run failed + """ + raise NotImplementedError + + @abstractmethod + def on_workflow_node_execute_started(self, node_id: str, + node_type: NodeType, + node_data: BaseNodeData, + node_run_index: int = 1, + predecessor_node_id: Optional[str] = None) -> None: """ Workflow node execute started """ raise NotImplementedError @abstractmethod - def on_workflow_node_execute_finished(self, workflow_node_execution: WorkflowNodeExecution) -> None: + def on_workflow_node_execute_succeeded(self, node_id: str, + node_type: NodeType, + node_data: BaseNodeData, + inputs: Optional[dict] = None, + process_data: Optional[dict] = None, + outputs: Optional[dict] = None, + execution_metadata: Optional[dict] = None) -> None: """ - Workflow node execute finished + Workflow node execute succeeded + """ + raise NotImplementedError + + @abstractmethod + def on_workflow_node_execute_failed(self, node_id: str, + node_type: NodeType, + node_data: BaseNodeData, + error: str) -> None: + """ + Workflow node execute failed """ raise NotImplementedError @@ -38,4 +67,3 @@ class BaseWorkflowCallback(ABC): Publish text chunk """ raise NotImplementedError - diff --git a/api/core/workflow/entities/workflow_entities.py b/api/core/workflow/entities/workflow_entities.py index 8c15cb95cd..6c2adfe0fb 100644 --- a/api/core/workflow/entities/workflow_entities.py +++ b/api/core/workflow/entities/workflow_entities.py @@ -1,22 +1,32 @@ +from typing import Optional + +from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.variable_pool import VariablePool -from models.workflow import WorkflowNodeExecution, WorkflowRun +from core.workflow.nodes.base_node import BaseNode +from models.workflow import Workflow + + +class WorkflowNodeAndResult: + node: BaseNode + result: Optional[NodeRunResult] = None + + def __init__(self, node: BaseNode, result: Optional[NodeRunResult] = None): + self.node = node + self.result = result class WorkflowRunState: - workflow_run: WorkflowRun + workflow: Workflow start_at: float user_inputs: dict variable_pool: VariablePool total_tokens: int = 0 - workflow_node_executions: list[WorkflowNodeExecution] = [] + workflow_nodes_and_results: list[WorkflowNodeAndResult] = [] - def __init__(self, workflow_run: WorkflowRun, - start_at: float, - user_inputs: dict, - variable_pool: VariablePool) -> None: - self.workflow_run = workflow_run + def __init__(self, workflow: Workflow, start_at: float, user_inputs: dict, variable_pool: VariablePool): + self.workflow = workflow self.start_at = start_at self.user_inputs = user_inputs self.variable_pool = variable_pool diff --git a/api/core/workflow/nodes/direct_answer/direct_answer_node.py b/api/core/workflow/nodes/direct_answer/direct_answer_node.py index bc6e4bd800..971cbe536e 100644 --- a/api/core/workflow/nodes/direct_answer/direct_answer_node.py +++ b/api/core/workflow/nodes/direct_answer/direct_answer_node.py @@ -43,7 +43,7 @@ class DirectAnswerNode(BaseNode): # publish answer as stream for word in answer: self.publish_text_chunk(word) - time.sleep(0.01) + time.sleep(0.01) # todo sleep 0.01 return NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED, diff --git a/api/core/workflow/workflow_engine_manager.py b/api/core/workflow/workflow_engine_manager.py index 19dac76631..628df4ac5f 100644 --- a/api/core/workflow/workflow_engine_manager.py +++ b/api/core/workflow/workflow_engine_manager.py @@ -1,13 +1,11 @@ -import json import time -from datetime import datetime -from typing import Optional, Union +from typing import Optional -from core.model_runtime.utils.encoders import jsonable_encoder +from core.app.apps.base_app_queue_manager import GenerateTaskStoppedException from core.workflow.callbacks.base_workflow_callback import BaseWorkflowCallback from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult, NodeType from core.workflow.entities.variable_pool import VariablePool, VariableValue -from core.workflow.entities.workflow_entities import WorkflowRunState +from core.workflow.entities.workflow_entities import WorkflowNodeAndResult, WorkflowRunState from core.workflow.nodes.base_node import BaseNode from core.workflow.nodes.code.code_node import CodeNode from core.workflow.nodes.direct_answer.direct_answer_node import DirectAnswerNode @@ -21,18 +19,9 @@ from core.workflow.nodes.start.start_node import StartNode from core.workflow.nodes.template_transform.template_transform_node import TemplateTransformNode from core.workflow.nodes.tool.tool_node import ToolNode from core.workflow.nodes.variable_assigner.variable_assigner_node import VariableAssignerNode -from extensions.ext_database import db -from models.account import Account -from models.model import App, EndUser from models.workflow import ( - CreatedByRole, Workflow, - WorkflowNodeExecution, WorkflowNodeExecutionStatus, - WorkflowNodeExecutionTriggeredFrom, - WorkflowRun, - WorkflowRunStatus, - WorkflowRunTriggeredFrom, WorkflowType, ) @@ -53,20 +42,6 @@ node_classes = { class WorkflowEngineManager: - def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]: - """ - Get workflow - """ - # fetch workflow by workflow_id - workflow = db.session.query(Workflow).filter( - Workflow.tenant_id == app_model.tenant_id, - Workflow.app_id == app_model.id, - Workflow.id == workflow_id - ).first() - - # return workflow - return workflow - def get_default_configs(self) -> list[dict]: """ Get default block configs @@ -100,16 +75,12 @@ class WorkflowEngineManager: return default_config def run_workflow(self, workflow: Workflow, - triggered_from: WorkflowRunTriggeredFrom, - user: Union[Account, EndUser], user_inputs: dict, system_inputs: Optional[dict] = None, callbacks: list[BaseWorkflowCallback] = None) -> None: """ Run workflow :param workflow: Workflow instance - :param triggered_from: triggered from - :param user: account or end user :param user_inputs: user variables inputs :param system_inputs: system inputs, like: query, files :param callbacks: workflow callbacks @@ -130,18 +101,13 @@ class WorkflowEngineManager: raise ValueError('edges in workflow graph must be a list') # init workflow run - workflow_run = self._init_workflow_run( - workflow=workflow, - triggered_from=triggered_from, - user=user, - user_inputs=user_inputs, - system_inputs=system_inputs, - callbacks=callbacks - ) + if callbacks: + for callback in callbacks: + callback.on_workflow_run_started() # init workflow run state workflow_run_state = WorkflowRunState( - workflow_run=workflow_run, + workflow=workflow, start_at=time.perf_counter(), user_inputs=user_inputs, variable_pool=VariablePool( @@ -166,7 +132,7 @@ class WorkflowEngineManager: has_entry_node = True # max steps 30 reached - if len(workflow_run_state.workflow_node_executions) > 30: + if len(workflow_run_state.workflow_nodes_and_results) > 30: raise ValueError('Max steps 30 reached.') # or max execution time 10min reached @@ -188,14 +154,14 @@ class WorkflowEngineManager: if not has_entry_node: self._workflow_run_failed( - workflow_run_state=workflow_run_state, error='Start node not found in workflow graph.', callbacks=callbacks ) return + except GenerateTaskStoppedException as e: + return except Exception as e: self._workflow_run_failed( - workflow_run_state=workflow_run_state, error=str(e), callbacks=callbacks ) @@ -203,112 +169,33 @@ class WorkflowEngineManager: # workflow run success self._workflow_run_success( - workflow_run_state=workflow_run_state, callbacks=callbacks ) - def _init_workflow_run(self, workflow: Workflow, - triggered_from: WorkflowRunTriggeredFrom, - user: Union[Account, EndUser], - user_inputs: dict, - system_inputs: Optional[dict] = None, - callbacks: list[BaseWorkflowCallback] = None) -> WorkflowRun: - """ - Init workflow run - :param workflow: Workflow instance - :param triggered_from: triggered from - :param user: account or end user - :param user_inputs: user variables inputs - :param system_inputs: system inputs, like: query, files - :param callbacks: workflow callbacks - :return: - """ - max_sequence = db.session.query(db.func.max(WorkflowRun.sequence_number)) \ - .filter(WorkflowRun.tenant_id == workflow.tenant_id) \ - .filter(WorkflowRun.app_id == workflow.app_id) \ - .scalar() or 0 - new_sequence_number = max_sequence + 1 - - # init workflow run - workflow_run = WorkflowRun( - tenant_id=workflow.tenant_id, - app_id=workflow.app_id, - sequence_number=new_sequence_number, - workflow_id=workflow.id, - type=workflow.type, - triggered_from=triggered_from.value, - version=workflow.version, - graph=workflow.graph, - inputs=json.dumps({**user_inputs, **jsonable_encoder(system_inputs)}), - status=WorkflowRunStatus.RUNNING.value, - created_by_role=(CreatedByRole.ACCOUNT.value - if isinstance(user, Account) else CreatedByRole.END_USER.value), - created_by=user.id - ) - - db.session.add(workflow_run) - db.session.commit() - - if callbacks: - for callback in callbacks: - callback.on_workflow_run_started(workflow_run) - - return workflow_run - - def _workflow_run_success(self, workflow_run_state: WorkflowRunState, - callbacks: list[BaseWorkflowCallback] = None) -> WorkflowRun: + def _workflow_run_success(self, callbacks: list[BaseWorkflowCallback] = None) -> None: """ Workflow run success - :param workflow_run_state: workflow run state :param callbacks: workflow callbacks :return: """ - workflow_run = workflow_run_state.workflow_run - workflow_run.status = WorkflowRunStatus.SUCCEEDED.value - - # fetch last workflow_node_executions - last_workflow_node_execution = workflow_run_state.workflow_node_executions[-1] - if last_workflow_node_execution: - workflow_run.outputs = last_workflow_node_execution.outputs - - workflow_run.elapsed_time = time.perf_counter() - workflow_run_state.start_at - workflow_run.total_tokens = workflow_run_state.total_tokens - workflow_run.total_steps = len(workflow_run_state.workflow_node_executions) - workflow_run.finished_at = datetime.utcnow() - - db.session.commit() if callbacks: for callback in callbacks: - callback.on_workflow_run_finished(workflow_run) + callback.on_workflow_run_succeeded() - return workflow_run - - def _workflow_run_failed(self, workflow_run_state: WorkflowRunState, - error: str, - callbacks: list[BaseWorkflowCallback] = None) -> WorkflowRun: + def _workflow_run_failed(self, error: str, + callbacks: list[BaseWorkflowCallback] = None) -> None: """ Workflow run failed - :param workflow_run_state: workflow run state :param error: error message :param callbacks: workflow callbacks :return: """ - workflow_run = workflow_run_state.workflow_run - workflow_run.status = WorkflowRunStatus.FAILED.value - workflow_run.error = error - workflow_run.elapsed_time = time.perf_counter() - workflow_run_state.start_at - workflow_run.total_tokens = workflow_run_state.total_tokens - workflow_run.total_steps = len(workflow_run_state.workflow_node_executions) - workflow_run.finished_at = datetime.utcnow() - - db.session.commit() - if callbacks: for callback in callbacks: - callback.on_workflow_run_finished(workflow_run) - - return workflow_run + callback.on_workflow_run_failed( + error=error + ) def _get_next_node(self, graph: dict, predecessor_node: Optional[BaseNode] = None, @@ -384,18 +271,24 @@ class WorkflowEngineManager: def _run_workflow_node(self, workflow_run_state: WorkflowRunState, node: BaseNode, predecessor_node: Optional[BaseNode] = None, - callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution: - # init workflow node execution - start_at = time.perf_counter() - workflow_node_execution = self._init_node_execution_from_workflow_run( - workflow_run_state=workflow_run_state, + callbacks: list[BaseWorkflowCallback] = None) -> None: + if callbacks: + for callback in callbacks: + callback.on_workflow_node_execute_started( + node_id=node.node_id, + node_type=node.node_type, + node_data=node.node_data, + node_run_index=len(workflow_run_state.workflow_nodes_and_results) + 1, + predecessor_node_id=predecessor_node.node_id if predecessor_node else None + ) + + workflow_nodes_and_result = WorkflowNodeAndResult( node=node, - predecessor_node=predecessor_node, - callbacks=callbacks + result=None ) - # add to workflow node executions - workflow_run_state.workflow_node_executions.append(workflow_node_execution) + # add to workflow_nodes_and_results + workflow_run_state.workflow_nodes_and_results.append(workflow_nodes_and_result) # run node, result must have inputs, process_data, outputs, execution_metadata node_run_result = node.run( @@ -406,24 +299,34 @@ class WorkflowEngineManager: if node_run_result.status == WorkflowNodeExecutionStatus.FAILED: # node run failed - self._workflow_node_execution_failed( - workflow_node_execution=workflow_node_execution, - start_at=start_at, - error=node_run_result.error, - callbacks=callbacks - ) + if callbacks: + for callback in callbacks: + callback.on_workflow_node_execute_failed( + node_id=node.node_id, + node_type=node.node_type, + node_data=node.node_data, + error=node_run_result.error + ) + raise ValueError(f"Node {node.node_data.title} run failed: {node_run_result.error}") # set end node output if in chat self._set_end_node_output_if_in_chat(workflow_run_state, node, node_run_result) + workflow_nodes_and_result.result = node_run_result + # node run success - self._workflow_node_execution_success( - workflow_node_execution=workflow_node_execution, - start_at=start_at, - result=node_run_result, - callbacks=callbacks - ) + if callbacks: + for callback in callbacks: + callback.on_workflow_node_execute_succeeded( + node_id=node.node_id, + node_type=node.node_type, + node_data=node.node_data, + inputs=node_run_result.inputs, + process_data=node_run_result.process_data, + outputs=node_run_result.outputs, + execution_metadata=node_run_result.metadata + ) if node_run_result.outputs: for variable_key, variable_value in node_run_result.outputs.items(): @@ -438,105 +341,9 @@ class WorkflowEngineManager: if node_run_result.metadata and node_run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS): workflow_run_state.total_tokens += int(node_run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS)) - return workflow_node_execution - - def _init_node_execution_from_workflow_run(self, workflow_run_state: WorkflowRunState, - node: BaseNode, - predecessor_node: Optional[BaseNode] = None, - callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution: - """ - Init workflow node execution from workflow run - :param workflow_run_state: workflow run state - :param node: current node - :param predecessor_node: predecessor node if exists - :param callbacks: workflow callbacks - :return: - """ - workflow_run = workflow_run_state.workflow_run - - # init workflow node execution - workflow_node_execution = WorkflowNodeExecution( - tenant_id=workflow_run.tenant_id, - app_id=workflow_run.app_id, - workflow_id=workflow_run.workflow_id, - triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value, - workflow_run_id=workflow_run.id, - predecessor_node_id=predecessor_node.node_id if predecessor_node else None, - index=len(workflow_run_state.workflow_node_executions) + 1, - node_id=node.node_id, - node_type=node.node_type.value, - title=node.node_data.title, - status=WorkflowNodeExecutionStatus.RUNNING.value, - created_by_role=workflow_run.created_by_role, - created_by=workflow_run.created_by - ) - - db.session.add(workflow_node_execution) - db.session.commit() - - if callbacks: - for callback in callbacks: - callback.on_workflow_node_execute_started(workflow_node_execution) - - return workflow_node_execution - - def _workflow_node_execution_success(self, workflow_node_execution: WorkflowNodeExecution, - start_at: float, - result: NodeRunResult, - callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution: - """ - Workflow node execution success - :param workflow_node_execution: workflow node execution - :param start_at: start time - :param result: node run result - :param callbacks: workflow callbacks - :return: - """ - workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value - workflow_node_execution.elapsed_time = time.perf_counter() - start_at - workflow_node_execution.inputs = json.dumps(result.inputs) if result.inputs else None - workflow_node_execution.process_data = json.dumps(result.process_data) if result.process_data else None - workflow_node_execution.outputs = json.dumps(result.outputs) if result.outputs else None - workflow_node_execution.execution_metadata = json.dumps(jsonable_encoder(result.metadata)) \ - if result.metadata else None - workflow_node_execution.finished_at = datetime.utcnow() - - db.session.commit() - - if callbacks: - for callback in callbacks: - callback.on_workflow_node_execute_finished(workflow_node_execution) - - return workflow_node_execution - - def _workflow_node_execution_failed(self, workflow_node_execution: WorkflowNodeExecution, - start_at: float, - error: str, - callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution: - """ - Workflow node execution failed - :param workflow_node_execution: workflow node execution - :param start_at: start time - :param error: error message - :param callbacks: workflow callbacks - :return: - """ - workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value - workflow_node_execution.error = error - workflow_node_execution.elapsed_time = time.perf_counter() - start_at - workflow_node_execution.finished_at = datetime.utcnow() - - db.session.commit() - - if callbacks: - for callback in callbacks: - callback.on_workflow_node_execute_finished(workflow_node_execution) - - return workflow_node_execution - def _set_end_node_output_if_in_chat(self, workflow_run_state: WorkflowRunState, node: BaseNode, - node_run_result: NodeRunResult): + node_run_result: NodeRunResult) -> None: """ Set end node output if in chat :param workflow_run_state: workflow run state @@ -544,21 +351,19 @@ class WorkflowEngineManager: :param node_run_result: node run result :return: """ - if workflow_run_state.workflow_run.type == WorkflowType.CHAT.value and node.node_type == NodeType.END: - workflow_node_execution_before_end = workflow_run_state.workflow_node_executions[-2] - if workflow_node_execution_before_end: - if workflow_node_execution_before_end.node_type == NodeType.LLM.value: + if workflow_run_state.workflow.type == WorkflowType.CHAT.value and node.node_type == NodeType.END: + workflow_nodes_and_result_before_end = workflow_run_state.workflow_nodes_and_results[-2] + if workflow_nodes_and_result_before_end: + if workflow_nodes_and_result_before_end.node.node_type == NodeType.LLM.value: if not node_run_result.outputs: node_run_result.outputs = {} - node_run_result.outputs['text'] = workflow_node_execution_before_end.outputs_dict.get('text') - elif workflow_node_execution_before_end.node_type == NodeType.DIRECT_ANSWER.value: + node_run_result.outputs['text'] = workflow_nodes_and_result_before_end.result.outputs.get('text') + elif workflow_nodes_and_result_before_end.node.node_type == NodeType.DIRECT_ANSWER.value: if not node_run_result.outputs: node_run_result.outputs = {} - node_run_result.outputs['text'] = workflow_node_execution_before_end.outputs_dict.get('answer') - - return node_run_result + node_run_result.outputs['text'] = workflow_nodes_and_result_before_end.result.outputs.get('answer') def _append_variables_recursively(self, variable_pool: VariablePool, node_id: str, diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 833c22cdff..f8bd80a0b1 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -5,6 +5,7 @@ from typing import Optional, Union from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator +from core.app.apps.base_app_queue_manager import AppQueueManager from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager from core.app.apps.workflow.app_generator import WorkflowAppGenerator from core.app.entities.app_invoke_entities import InvokeFrom @@ -44,10 +45,14 @@ class WorkflowService: if not app_model.workflow_id: return None - workflow_engine_manager = WorkflowEngineManager() - # fetch published workflow by workflow_id - return workflow_engine_manager.get_workflow(app_model, app_model.workflow_id) + workflow = db.session.query(Workflow).filter( + Workflow.tenant_id == app_model.tenant_id, + Workflow.app_id == app_model.id, + Workflow.id == app_model.workflow_id + ).first() + + return workflow def sync_draft_workflow(self, app_model: App, graph: dict, @@ -201,6 +206,14 @@ class WorkflowService: return response + def stop_workflow_task(self, task_id: str, + user: Union[Account, EndUser], + invoke_from: InvokeFrom) -> None: + """ + Stop workflow task + """ + AppQueueManager.set_stop_flag(task_id, invoke_from, user.id) + def convert_to_workflow(self, app_model: App, account: Account) -> App: """ Basic mode of chatbot app(expert mode) to workflow