diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index 8a68cafad8..30d383ec02 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -129,18 +129,14 @@ class DraftWorkflowRunApi(Resource): args=args, invoke_from=InvokeFrom.DEBUGGER ) + + return compact_response(response) except ValueError as e: raise e except Exception as e: logging.exception("internal server error.") raise InternalServerError() - def generate() -> Generator: - yield from response - - return Response(stream_with_context(generate()), status=200, - mimetype='text/event-stream') - class WorkflowTaskStopApi(Resource): @setup_required diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 9c06f516a5..db22607146 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -235,36 +235,39 @@ class AdvancedChatAppGenerateTaskPipeline: yield self._yield_response(response) elif isinstance(event, QueueStopEvent | QueueWorkflowFinishedEvent): - if isinstance(event, QueueWorkflowFinishedEvent): + if isinstance(event, QueueStopEvent): + workflow_run = self._get_workflow_run(self._task_state.workflow_run_id) + else: workflow_run = self._get_workflow_run(event.workflow_run_id) - if workflow_run.status == WorkflowRunStatus.SUCCEEDED.value: - outputs = workflow_run.outputs_dict - self._task_state.answer = outputs.get('text', '') - else: - err_event = QueueErrorEvent(error=ValueError(f'Run failed: {workflow_run.error}')) - data = self._error_to_stream_response_data(self._handle_error(err_event)) - yield self._yield_response(data) - break - workflow_run_response = { - 'event': 'workflow_finished', - 'task_id': self._application_generate_entity.task_id, - 'workflow_run_id': event.workflow_run_id, - 'data': { - 'id': workflow_run.id, - 'workflow_id': workflow_run.workflow_id, - 'status': workflow_run.status, - 'outputs': workflow_run.outputs_dict, - 'error': workflow_run.error, - 'elapsed_time': workflow_run.elapsed_time, - 'total_tokens': workflow_run.total_tokens, - 'total_steps': workflow_run.total_steps, - 'created_at': int(workflow_run.created_at.timestamp()), - 'finished_at': int(workflow_run.finished_at.timestamp()) - } + if workflow_run.status == WorkflowRunStatus.SUCCEEDED.value: + outputs = workflow_run.outputs_dict + self._task_state.answer = outputs.get('text', '') + else: + err_event = QueueErrorEvent(error=ValueError(f'Run failed: {workflow_run.error}')) + data = self._error_to_stream_response_data(self._handle_error(err_event)) + yield self._yield_response(data) + break + + workflow_run_response = { + 'event': 'workflow_finished', + 'task_id': self._application_generate_entity.task_id, + 'workflow_run_id': event.workflow_run_id, + 'data': { + 'id': workflow_run.id, + 'workflow_id': workflow_run.workflow_id, + 'status': workflow_run.status, + 'outputs': workflow_run.outputs_dict, + 'error': workflow_run.error, + 'elapsed_time': workflow_run.elapsed_time, + 'total_tokens': workflow_run.total_tokens, + 'total_steps': workflow_run.total_steps, + 'created_at': int(workflow_run.created_at.timestamp()), + 'finished_at': int(workflow_run.finished_at.timestamp()) } + } - yield self._yield_response(workflow_run_response) + yield self._yield_response(workflow_run_response) # response moderation if self._output_moderation_handler: diff --git a/api/core/app/apps/message_based_app_queue_manager.py b/api/core/app/apps/message_based_app_queue_manager.py index ed9475502d..13644c99ae 100644 --- a/api/core/app/apps/message_based_app_queue_manager.py +++ b/api/core/app/apps/message_based_app_queue_manager.py @@ -2,6 +2,7 @@ from core.app.apps.base_app_queue_manager import AppQueueManager from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.queue_entities import ( AppQueueEvent, + MessageQueueMessage, QueueMessage, ) @@ -20,7 +21,7 @@ class MessageBasedAppQueueManager(AppQueueManager): self._message_id = str(message_id) def construct_queue_message(self, event: AppQueueEvent) -> QueueMessage: - return QueueMessage( + return MessageQueueMessage( task_id=self._task_id, message_id=self._message_id, conversation_id=self._conversation_id, diff --git a/api/core/app/apps/workflow/app_queue_manager.py b/api/core/app/apps/workflow/app_queue_manager.py index 0f9b0a1c78..5cf1e58913 100644 --- a/api/core/app/apps/workflow/app_queue_manager.py +++ b/api/core/app/apps/workflow/app_queue_manager.py @@ -3,6 +3,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.queue_entities import ( AppQueueEvent, QueueMessage, + WorkflowQueueMessage, ) @@ -16,7 +17,7 @@ class WorkflowAppQueueManager(AppQueueManager): self._app_mode = app_mode def construct_queue_message(self, event: AppQueueEvent) -> QueueMessage: - return QueueMessage( + return WorkflowQueueMessage( task_id=self._task_id, app_mode=self._app_mode, event=event diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index bcd5a4ba3d..a48640766a 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -86,7 +86,7 @@ class WorkflowAppGenerateTaskPipeline: workflow_run = self._get_workflow_run(event.workflow_run_id) if workflow_run.status == WorkflowRunStatus.SUCCEEDED.value: - outputs = workflow_run.outputs + outputs = workflow_run.outputs_dict self._task_state.answer = outputs.get('text', '') else: raise self._handle_error(QueueErrorEvent(error=ValueError(f'Run failed: {workflow_run.error}'))) @@ -136,12 +136,11 @@ class WorkflowAppGenerateTaskPipeline: break elif isinstance(event, QueueWorkflowStartedEvent): self._task_state.workflow_run_id = event.workflow_run_id - workflow_run = self._get_workflow_run(event.workflow_run_id) response = { 'event': 'workflow_started', 'task_id': self._application_generate_entity.task_id, - 'workflow_run_id': event.workflow_run_id, + 'workflow_run_id': workflow_run.id, 'data': { 'id': workflow_run.id, 'workflow_id': workflow_run.workflow_id, @@ -198,7 +197,7 @@ class WorkflowAppGenerateTaskPipeline: workflow_run = self._get_workflow_run(event.workflow_run_id) if workflow_run.status == WorkflowRunStatus.SUCCEEDED.value: - outputs = workflow_run.outputs + outputs = workflow_run.outputs_dict self._task_state.answer = outputs.get('text', '') else: err_event = QueueErrorEvent(error=ValueError(f'Run failed: {workflow_run.error}')) @@ -228,6 +227,9 @@ class WorkflowAppGenerateTaskPipeline: yield self._yield_response(replace_response) + # save workflow app log + self._save_workflow_app_log() + workflow_run_response = { 'event': 'workflow_finished', 'task_id': self._application_generate_entity.task_id, @@ -295,7 +297,13 @@ class WorkflowAppGenerateTaskPipeline: :param workflow_run_id: workflow run id :return: """ - return db.session.query(WorkflowRun).filter(WorkflowRun.id == workflow_run_id).first() + workflow_run = db.session.query(WorkflowRun).filter(WorkflowRun.id == workflow_run_id).first() + if workflow_run: + # Because the workflow_run will be modified in the sub-thread, + # and the first query in the main thread will cache the entity, + # you need to expire the entity after the query + db.session.expire(workflow_run) + return workflow_run def _get_workflow_node_execution(self, workflow_node_execution_id: str) -> WorkflowNodeExecution: """ @@ -303,7 +311,21 @@ class WorkflowAppGenerateTaskPipeline: :param workflow_node_execution_id: workflow node execution id :return: """ - return db.session.query(WorkflowNodeExecution).filter(WorkflowNodeExecution.id == workflow_node_execution_id).first() + workflow_node_execution = (db.session.query(WorkflowNodeExecution) + .filter(WorkflowNodeExecution.id == workflow_node_execution_id).first()) + if workflow_node_execution: + # Because the workflow_node_execution will be modified in the sub-thread, + # and the first query in the main thread will cache the entity, + # you need to expire the entity after the query + db.session.expire(workflow_node_execution) + return workflow_node_execution + + def _save_workflow_app_log(self) -> None: + """ + Save workflow app log. + :return: + """ + pass # todo def _handle_chunk(self, text: str) -> dict: """ diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index 38f9638eaa..67ed13d721 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -176,7 +176,20 @@ class QueueMessage(BaseModel): QueueMessage entity """ task_id: str - message_id: str - conversation_id: str app_mode: str event: AppQueueEvent + + +class MessageQueueMessage(QueueMessage): + """ + MessageQueueMessage entity + """ + message_id: str + conversation_id: str + + +class WorkflowQueueMessage(QueueMessage): + """ + WorkflowQueueMessage entity + """ + pass diff --git a/api/models/workflow.py b/api/models/workflow.py index 0883d0ef13..9768c364dd 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -143,7 +143,7 @@ class Workflow(db.Model): return [] # get user_input_form from start node - return start_node.get('variables', []) + return start_node.get('data', {}).get('variables', []) class WorkflowRunTriggeredFrom(Enum):