diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index 686ef7b4be..6c2d4c6c9f 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -222,6 +222,7 @@ class AdvancedChatDraftRunIterationNodeApi(Resource): logging.exception("internal server error.") raise InternalServerError() + class WorkflowDraftRunIterationNodeApi(Resource): @setup_required @login_required diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py index 0141dbec58..d1ee8bf166 100644 --- a/api/core/app/apps/advanced_chat/app_generator.py +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -33,12 +33,12 @@ logger = logging.getLogger(__name__) class AdvancedChatAppGenerator(MessageBasedAppGenerator): def generate( - self, app_model: App, - workflow: Workflow, - user: Union[Account, EndUser], - args: dict, - invoke_from: InvokeFrom, - stream: bool = True, + self, app_model: App, + workflow: Workflow, + user: Union[Account, EndUser], + args: dict, + invoke_from: InvokeFrom, + stream: bool = True, ) -> Union[dict, Generator[dict, None, None]]: """ Generate App response. @@ -120,81 +120,14 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): conversation=conversation, stream=stream ) - - def single_iteration_generate(self, app_model: App, - workflow: Workflow, - node_id: str, - user: Account, - args: dict, - stream: bool = True) \ - -> Union[dict, Generator[dict, None, None]]: - """ - Generate App response. - - :param app_model: App - :param workflow: Workflow - :param user: account or end user - :param args: request args - :param invoke_from: invoke from source - :param stream: is stream - """ - if not node_id: - raise ValueError('node_id is required') - - if args.get('inputs') is None: - raise ValueError('inputs is required') - - extras = { - "auto_generate_conversation_name": False - } - - # get conversation - conversation = None - if args.get('conversation_id'): - conversation = self._get_conversation_by_user(app_model, args.get('conversation_id'), user) - - # convert to app config - app_config = AdvancedChatAppConfigManager.get_app_config( - app_model=app_model, - workflow=workflow - ) - - # init application generate entity - application_generate_entity = AdvancedChatAppGenerateEntity( - task_id=str(uuid.uuid4()), - app_config=app_config, - conversation_id=conversation.id if conversation else None, - inputs={}, - query='', - files=[], - user_id=user.id, - stream=stream, - invoke_from=InvokeFrom.DEBUGGER, - extras=extras, - single_iteration_run=AdvancedChatAppGenerateEntity.SingleIterationRunEntity( - node_id=node_id, - inputs=args['inputs'] - ) - ) - contexts.tenant_id.set(application_generate_entity.app_config.tenant_id) - - return self._generate( - app_model=app_model, - workflow=workflow, - user=user, - invoke_from=InvokeFrom.DEBUGGER, - application_generate_entity=application_generate_entity, - conversation=conversation, - stream=stream - ) def _generate(self, app_model: App, - workflow: Workflow, - user: Union[Account, EndUser], - invoke_from: InvokeFrom, - application_generate_entity: AdvancedChatAppGenerateEntity, - conversation: Conversation = None, - stream: bool = True) \ + workflow: Workflow, + user: Union[Account, EndUser], + invoke_from: InvokeFrom, + application_generate_entity: AdvancedChatAppGenerateEntity, + conversation: Conversation = None, + stream: bool = True) \ -> Union[dict, Generator[dict, None, None]]: is_first_conversation = False if not conversation: @@ -271,30 +204,18 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): var.set(val) with flask_app.app_context(): try: - runner = AdvancedChatAppRunner() - if application_generate_entity.single_iteration_run: - single_iteration_run = application_generate_entity.single_iteration_run - runner.single_iteration_run( - app_id=application_generate_entity.app_config.app_id, - workflow_id=application_generate_entity.app_config.workflow_id, - queue_manager=queue_manager, - inputs=single_iteration_run.inputs, - node_id=single_iteration_run.node_id, - user_id=application_generate_entity.user_id - ) - else: - # get conversation and message - conversation = self._get_conversation(conversation_id) - message = self._get_message(message_id) + # get conversation and message + conversation = self._get_conversation(conversation_id) + message = self._get_message(message_id) - # chatbot app - runner = AdvancedChatAppRunner() - runner.run( - application_generate_entity=application_generate_entity, - queue_manager=queue_manager, - conversation=conversation, - message=message - ) + # chatbot app + runner = AdvancedChatAppRunner() + runner.run( + application_generate_entity=application_generate_entity, + queue_manager=queue_manager, + conversation=conversation, + message=message + ) except GenerateTaskStoppedException: pass except InvokeAuthorizationError: diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index 0de5258930..42fdb750ab 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -5,7 +5,6 @@ from collections.abc import Mapping from typing import Any, Optional, cast from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfig -from core.app.apps.advanced_chat.workflow_event_trigger_callback import WorkflowEventTriggerCallback from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.apps.base_app_runner import AppRunner from core.app.apps.workflow_logging_callback import WorkflowLoggingCallback @@ -17,7 +16,7 @@ from core.app.entities.queue_entities import QueueAnnotationReplyEvent, QueueSto from core.moderation.base import ModerationException from core.workflow.callbacks.base_workflow_callback import WorkflowCallback from core.workflow.entities.node_entities import SystemVariable, UserFrom -from core.workflow.workflow_engine_manager import WorkflowEngineManager +from core.workflow.workflow_entry import WorkflowEntry from extensions.ext_database import db from models.model import App, Conversation, EndUser, Message from models.workflow import Workflow @@ -88,17 +87,13 @@ class AdvancedChatAppRunner(AppRunner): db.session.close() - workflow_callbacks: list[WorkflowCallback] = [WorkflowEventTriggerCallback( - queue_manager=queue_manager, - workflow=workflow - )] - + workflow_callbacks: list[WorkflowCallback] = [] if bool(os.environ.get("DEBUG", 'False').lower() == 'true'): workflow_callbacks.append(WorkflowLoggingCallback()) # RUN WORKFLOW - workflow_engine_manager = WorkflowEngineManager() - workflow_engine_manager.run( + workflow_entry = WorkflowEntry() + workflow_entry.run( workflow=workflow, user_id=application_generate_entity.user_id, user_from=UserFrom.ACCOUNT @@ -116,34 +111,6 @@ class AdvancedChatAppRunner(AppRunner): call_depth=application_generate_entity.call_depth ) - def single_iteration_run(self, app_id: str, workflow_id: str, - queue_manager: AppQueueManager, - inputs: dict, node_id: str, user_id: str) -> None: - """ - Single iteration run - """ - app_record: App = db.session.query(App).filter(App.id == app_id).first() - if not app_record: - raise ValueError("App not found") - - workflow = self.get_workflow(app_model=app_record, workflow_id=workflow_id) - if not workflow: - raise ValueError("Workflow not initialized") - - workflow_callbacks = [WorkflowEventTriggerCallback( - queue_manager=queue_manager, - workflow=workflow - )] - - workflow_engine_manager = WorkflowEngineManager() - workflow_engine_manager.single_step_run_iteration_workflow_node( - workflow=workflow, - node_id=node_id, - user_id=user_id, - user_inputs=inputs, - callbacks=workflow_callbacks - ) - def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]: """ Get workflow diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index e5451ffb3b..b4ff94f59d 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -264,15 +264,6 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc elif isinstance(event, QueueNodeStartedEvent): workflow_node_execution = self._handle_node_start(event) - # search stream_generate_routes if node id is answer start at node - if not self._task_state.current_stream_generate_state and event.node_id in self._stream_generate_routes: - self._task_state.current_stream_generate_state = self._stream_generate_routes[event.node_id] - # reset current route position to 0 - self._task_state.current_stream_generate_state.current_route_position = 0 - - # generate stream outputs when node started - yield from self._generate_stream_outputs_when_node_started() - yield self._workflow_node_start_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, @@ -281,11 +272,6 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc elif isinstance(event, QueueNodeSucceededEvent | QueueNodeFailedEvent): workflow_node_execution = self._handle_node_finished(event) - # stream outputs when node finished - generator = self._generate_stream_outputs_when_node_finished() - if generator: - yield from generator - yield self._workflow_node_finish_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution @@ -351,11 +337,6 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc if delta_text is None: continue - if not self._is_stream_out_support( - event=event - ): - continue - # handle output moderation chunk should_direct_answer = self._handle_output_moderation_chunk(delta_text) if should_direct_answer: diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index cff4ba8af9..15f66b0f81 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -99,6 +99,7 @@ class AppGenerateService: node_id: str, args: Any, streaming: bool = True): + # TODO if app_model.mode == AppMode.ADVANCED_CHAT.value: workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) return AdvancedChatAppGenerator().single_iteration_generate(