diff --git a/api/controllers/console/__init__.py b/api/controllers/console/__init__.py index a6f803785a..853ca9e3a7 100644 --- a/api/controllers/console/__init__.py +++ b/api/controllers/console/__init__.py @@ -8,7 +8,7 @@ api = ExternalApi(bp) from . import admin, apikey, extension, feature, setup, version, ping # Import app controllers from .app import (advanced_prompt_template, annotation, app, audio, completion, conversation, generator, message, - model_config, site, statistic, workflow, workflow_app_log) + model_config, site, statistic, workflow, workflow_run, workflow_app_log) # Import auth controllers from .auth import activate, data_source_oauth, login, oauth # Import billing controllers diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index 5f03a7cd37..6f81da5691 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -15,6 +15,7 @@ from controllers.console.setup import setup_required from controllers.console.wraps import account_initialization_required from core.app.entities.app_invoke_entities import InvokeFrom from fields.workflow_fields import workflow_fields +from fields.workflow_run_fields import workflow_run_node_execution_fields from libs.helper import TimestampField, uuid_value from libs.login import current_user, login_required from models.model import App, AppMode @@ -164,18 +165,24 @@ class DraftWorkflowNodeRunApi(Resource): @login_required @account_initialization_required @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) + @marshal_with(workflow_run_node_execution_fields) def post(self, app_model: App, node_id: str): """ Run draft workflow node """ - # TODO - workflow_service = WorkflowService() - workflow_service.run_draft_workflow_node(app_model=app_model, node_id=node_id, account=current_user) + parser = reqparse.RequestParser() + parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json') + args = parser.parse_args() - # TODO - return { - "result": "success" - } + workflow_service = WorkflowService() + workflow_node_execution = workflow_service.run_draft_workflow_node( + app_model=app_model, + node_id=node_id, + user_inputs=args.get('inputs'), + account=current_user + ) + + return workflow_node_execution class PublishedWorkflowApi(Resource): @@ -291,7 +298,7 @@ api.add_resource(DraftWorkflowApi, '/apps//workflows/draft') api.add_resource(AdvancedChatDraftWorkflowRunApi, '/apps//advanced-chat/workflows/draft/run') api.add_resource(DraftWorkflowRunApi, '/apps//workflows/draft/run') api.add_resource(WorkflowTaskStopApi, '/apps//workflows/tasks//stop') -api.add_resource(DraftWorkflowNodeRunApi, '/apps//workflows/draft/nodes//run') +api.add_resource(DraftWorkflowNodeRunApi, '/apps//workflows/draft/nodes//run') api.add_resource(PublishedWorkflowApi, '/apps//workflows/published') api.add_resource(DefaultBlockConfigsApi, '/apps//workflows/default-workflow-block-configs') api.add_resource(DefaultBlockConfigApi, '/apps//workflows/default-workflow-block-configs' diff --git a/api/core/workflow/errors.py b/api/core/workflow/errors.py new file mode 100644 index 0000000000..fe79fadf66 --- /dev/null +++ b/api/core/workflow/errors.py @@ -0,0 +1,10 @@ +from core.workflow.entities.node_entities import NodeType + + +class WorkflowNodeRunFailedError(Exception): + def __init__(self, node_id: str, node_type: NodeType, node_title: str, error: str): + self.node_id = node_id + self.node_type = node_type + self.node_title = node_title + self.error = error + super().__init__(f"Node {node_title} run failed: {error}") diff --git a/api/core/workflow/nodes/base_node.py b/api/core/workflow/nodes/base_node.py index a603f484ef..dfba9d0385 100644 --- a/api/core/workflow/nodes/base_node.py +++ b/api/core/workflow/nodes/base_node.py @@ -108,7 +108,7 @@ class BaseNode(ABC): ) @classmethod - def extract_variable_selector_to_variable_mapping(cls, config: dict) -> dict: + def extract_variable_selector_to_variable_mapping(cls, config: dict) -> dict[str, list[str]]: """ Extract variable selector to variable mapping :param config: node config @@ -119,7 +119,7 @@ class BaseNode(ABC): @classmethod @abstractmethod - def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]: + def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]: """ Extract variable selector to variable mapping :param node_data: node data diff --git a/api/core/workflow/nodes/code/code_node.py b/api/core/workflow/nodes/code/code_node.py index 2f22a386e5..2c11e5ba00 100644 --- a/api/core/workflow/nodes/code/code_node.py +++ b/api/core/workflow/nodes/code/code_node.py @@ -289,7 +289,7 @@ class CodeNode(BaseNode): return transformed_result @classmethod - def _extract_variable_selector_to_variable_mapping(cls, node_data: CodeNodeData) -> dict[list[str], str]: + def _extract_variable_selector_to_variable_mapping(cls, node_data: CodeNodeData) -> dict[str, list[str]]: """ Extract variable selector to variable mapping :param node_data: node data @@ -297,5 +297,5 @@ class CodeNode(BaseNode): """ return { - variable_selector.value_selector: variable_selector.variable for variable_selector in node_data.variables - } \ No newline at end of file + variable_selector.variable: variable_selector.value_selector for variable_selector in node_data.variables + } diff --git a/api/core/workflow/nodes/direct_answer/direct_answer_node.py b/api/core/workflow/nodes/direct_answer/direct_answer_node.py index 9193bab9ee..fedbc9b2d1 100644 --- a/api/core/workflow/nodes/direct_answer/direct_answer_node.py +++ b/api/core/workflow/nodes/direct_answer/direct_answer_node.py @@ -50,10 +50,16 @@ class DirectAnswerNode(BaseNode): ) @classmethod - def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]: + def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]: """ Extract variable selector to variable mapping :param node_data: node data :return: """ - return {} + node_data = cast(cls._node_data_cls, node_data) + + variable_mapping = {} + for variable_selector in node_data.variables: + variable_mapping[variable_selector.variable] = variable_selector.value_selector + + return variable_mapping diff --git a/api/core/workflow/nodes/end/end_node.py b/api/core/workflow/nodes/end/end_node.py index 65b0b86aa0..2666ccc4f9 100644 --- a/api/core/workflow/nodes/end/end_node.py +++ b/api/core/workflow/nodes/end/end_node.py @@ -56,7 +56,7 @@ class EndNode(BaseNode): ) @classmethod - def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]: + def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]: """ Extract variable selector to variable mapping :param node_data: node data diff --git a/api/core/workflow/nodes/http_request/http_request_node.py b/api/core/workflow/nodes/http_request/http_request_node.py index 4ee76deb83..853f8fe5e3 100644 --- a/api/core/workflow/nodes/http_request/http_request_node.py +++ b/api/core/workflow/nodes/http_request/http_request_node.py @@ -48,12 +48,12 @@ class HttpRequestNode(BaseNode): @classmethod - def _extract_variable_selector_to_variable_mapping(cls, node_data: HttpRequestNodeData) -> dict[list[str], str]: + def _extract_variable_selector_to_variable_mapping(cls, node_data: HttpRequestNodeData) -> dict[str, list[str]]: """ Extract variable selector to variable mapping :param node_data: node data :return: """ return { - variable_selector.value_selector: variable_selector.variable for variable_selector in node_data.variables - } \ No newline at end of file + variable_selector.variable: variable_selector.value_selector for variable_selector in node_data.variables + } diff --git a/api/core/workflow/nodes/llm/llm_node.py b/api/core/workflow/nodes/llm/llm_node.py index 90a7755b85..41e28937ac 100644 --- a/api/core/workflow/nodes/llm/llm_node.py +++ b/api/core/workflow/nodes/llm/llm_node.py @@ -23,7 +23,7 @@ class LLMNode(BaseNode): pass @classmethod - def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]: + def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]: """ Extract variable selector to variable mapping :param node_data: node data diff --git a/api/core/workflow/nodes/start/start_node.py b/api/core/workflow/nodes/start/start_node.py index 2321e04bd4..08171457fb 100644 --- a/api/core/workflow/nodes/start/start_node.py +++ b/api/core/workflow/nodes/start/start_node.py @@ -69,7 +69,7 @@ class StartNode(BaseNode): return filtered_inputs @classmethod - def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]: + def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]: """ Extract variable selector to variable mapping :param node_data: node data diff --git a/api/core/workflow/nodes/template_transform/template_transform_node.py b/api/core/workflow/nodes/template_transform/template_transform_node.py index a037332f4b..c41f5d1030 100644 --- a/api/core/workflow/nodes/template_transform/template_transform_node.py +++ b/api/core/workflow/nodes/template_transform/template_transform_node.py @@ -72,12 +72,12 @@ class TemplateTransformNode(BaseNode): ) @classmethod - def _extract_variable_selector_to_variable_mapping(cls, node_data: TemplateTransformNodeData) -> dict[list[str], str]: + def _extract_variable_selector_to_variable_mapping(cls, node_data: TemplateTransformNodeData) -> dict[str, list[str]]: """ Extract variable selector to variable mapping :param node_data: node data :return: """ return { - variable_selector.value_selector: variable_selector.variable for variable_selector in node_data.variables + variable_selector.variable: variable_selector.value_selector for variable_selector in node_data.variables } \ No newline at end of file diff --git a/api/core/workflow/nodes/tool/tool_node.py b/api/core/workflow/nodes/tool/tool_node.py index bfa7db3943..69a97fc206 100644 --- a/api/core/workflow/nodes/tool/tool_node.py +++ b/api/core/workflow/nodes/tool/tool_node.py @@ -133,12 +133,12 @@ class ToolNode(BaseNode): @classmethod - def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]: + def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]: """ Extract variable selector to variable mapping """ return { - k.value_selector: k.variable + k.variable: k.value_selector for k in cast(ToolNodeData, node_data).tool_parameters if k.variable_type == 'selector' - } \ No newline at end of file + } diff --git a/api/core/workflow/workflow_engine_manager.py b/api/core/workflow/workflow_engine_manager.py index 0bc13cbb5a..17225c19ea 100644 --- a/api/core/workflow/workflow_engine_manager.py +++ b/api/core/workflow/workflow_engine_manager.py @@ -6,6 +6,7 @@ from core.workflow.callbacks.base_workflow_callback import BaseWorkflowCallback from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult, NodeType from core.workflow.entities.variable_pool import VariablePool, VariableValue from core.workflow.entities.workflow_entities import WorkflowNodeAndResult, WorkflowRunState +from core.workflow.errors import WorkflowNodeRunFailedError from core.workflow.nodes.base_node import BaseNode, UserFrom from core.workflow.nodes.code.code_node import CodeNode from core.workflow.nodes.direct_answer.direct_answer_node import DirectAnswerNode @@ -180,6 +181,93 @@ class WorkflowEngineManager: callbacks=callbacks ) + def single_step_run_workflow_node(self, workflow: Workflow, + node_id: str, + user_id: str, + user_inputs: dict) -> tuple[BaseNode, NodeRunResult]: + """ + Single step run workflow node + :param workflow: Workflow instance + :param node_id: node id + :param user_id: user id + :param user_inputs: user inputs + :return: + """ + # fetch node info from workflow graph + graph = workflow.graph_dict + if not graph: + raise ValueError('workflow graph not found') + + nodes = graph.get('nodes') + if not nodes: + raise ValueError('nodes not found in workflow graph') + + # fetch node config from node id + node_config = None + for node in nodes: + if node.get('id') == node_id: + node_config = node + break + + if not node_config: + raise ValueError('node id not found in workflow graph') + + # Get node class + node_cls = node_classes.get(NodeType.value_of(node_config.get('data', {}).get('type'))) + + # init workflow run state + node_instance = node_cls( + tenant_id=workflow.tenant_id, + app_id=workflow.app_id, + workflow_id=workflow.id, + user_id=user_id, + user_from=UserFrom.ACCOUNT, + config=node_config + ) + + try: + # init variable pool + variable_pool = VariablePool( + system_variables={}, + user_inputs={} + ) + + # variable selector to variable mapping + try: + variable_mapping = node_cls.extract_variable_selector_to_variable_mapping(node_config) + except NotImplementedError: + variable_mapping = {} + + for variable_key, variable_selector in variable_mapping.items(): + if variable_key not in user_inputs: + raise ValueError(f'Variable key {variable_key} not found in user inputs.') + + # fetch variable node id from variable selector + variable_node_id = variable_selector[0] + variable_key_list = variable_selector[1:] + + # append variable and value to variable pool + variable_pool.append_variable( + node_id=variable_node_id, + variable_key_list=variable_key_list, + value=user_inputs.get(variable_key) + ) + + # run node + node_run_result = node_instance.run( + variable_pool=variable_pool + ) + except Exception as e: + raise WorkflowNodeRunFailedError( + node_id=node_instance.node_id, + node_type=node_instance.node_type, + node_title=node_instance.node_data.title, + error=str(e) + ) + + return node_instance, node_run_result + + def _workflow_run_success(self, callbacks: list[BaseWorkflowCallback] = None) -> None: """ Workflow run success diff --git a/api/fields/workflow_run_fields.py b/api/fields/workflow_run_fields.py index 572f472f1f..3135d91fd3 100644 --- a/api/fields/workflow_run_fields.py +++ b/api/fields/workflow_run_fields.py @@ -34,11 +34,9 @@ workflow_run_for_list_fields = { } workflow_run_pagination_fields = { - 'page': fields.Integer, - 'limit': fields.Integer(attribute='per_page'), - 'total': fields.Integer, - 'has_more': fields.Boolean(attribute='has_next'), - 'data': fields.List(fields.Nested(workflow_run_for_list_fields), attribute='items') + 'limit': fields.Integer(attribute='limit'), + 'has_more': fields.Boolean(attribute='has_more'), + 'data': fields.List(fields.Nested(workflow_run_for_list_fields), attribute='data') } workflow_run_detail_fields = { diff --git a/api/services/workflow_run_service.py b/api/services/workflow_run_service.py index 70ce1f2ce0..1d3f93f224 100644 --- a/api/services/workflow_run_service.py +++ b/api/services/workflow_run_service.py @@ -34,26 +34,26 @@ class WorkflowRunService: if not last_workflow_run: raise ValueError('Last workflow run not exists') - conversations = base_query.filter( + workflow_runs = base_query.filter( WorkflowRun.created_at < last_workflow_run.created_at, WorkflowRun.id != last_workflow_run.id ).order_by(WorkflowRun.created_at.desc()).limit(limit).all() else: - conversations = base_query.order_by(WorkflowRun.created_at.desc()).limit(limit).all() + workflow_runs = base_query.order_by(WorkflowRun.created_at.desc()).limit(limit).all() has_more = False - if len(conversations) == limit: - current_page_first_conversation = conversations[-1] + if len(workflow_runs) == limit: + current_page_first_workflow_run = workflow_runs[-1] rest_count = base_query.filter( - WorkflowRun.created_at < current_page_first_conversation.created_at, - WorkflowRun.id != current_page_first_conversation.id + WorkflowRun.created_at < current_page_first_workflow_run.created_at, + WorkflowRun.id != current_page_first_workflow_run.id ).count() if rest_count > 0: has_more = True return InfiniteScrollPagination( - data=conversations, + data=workflow_runs, limit=limit, has_more=has_more ) diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index f8bd80a0b1..2c9c07106c 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -1,4 +1,5 @@ import json +import time from collections.abc import Generator from datetime import datetime from typing import Optional, Union @@ -9,12 +10,21 @@ from core.app.apps.base_app_queue_manager import AppQueueManager from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager from core.app.apps.workflow.app_generator import WorkflowAppGenerator from core.app.entities.app_invoke_entities import InvokeFrom +from core.model_runtime.utils.encoders import jsonable_encoder from core.workflow.entities.node_entities import NodeType +from core.workflow.errors import WorkflowNodeRunFailedError from core.workflow.workflow_engine_manager import WorkflowEngineManager from extensions.ext_database import db from models.account import Account from models.model import App, AppMode, EndUser -from models.workflow import Workflow, WorkflowType +from models.workflow import ( + CreatedByRole, + Workflow, + WorkflowNodeExecution, + WorkflowNodeExecutionStatus, + WorkflowNodeExecutionTriggeredFrom, + WorkflowType, +) from services.workflow.workflow_converter import WorkflowConverter @@ -214,6 +224,80 @@ class WorkflowService: """ AppQueueManager.set_stop_flag(task_id, invoke_from, user.id) + def run_draft_workflow_node(self, app_model: App, + node_id: str, + user_inputs: dict, + account: Account) -> WorkflowNodeExecution: + """ + Run draft workflow node + """ + # fetch draft workflow by app_model + draft_workflow = self.get_draft_workflow(app_model=app_model) + if not draft_workflow: + raise ValueError('Workflow not initialized') + + # run draft workflow node + workflow_engine_manager = WorkflowEngineManager() + start_at = time.perf_counter() + + try: + node_instance, node_run_result = workflow_engine_manager.single_step_run_workflow_node( + workflow=draft_workflow, + node_id=node_id, + user_inputs=user_inputs, + user_id=account.id, + ) + except WorkflowNodeRunFailedError as e: + workflow_node_execution = WorkflowNodeExecution( + tenant_id=app_model.tenant_id, + app_id=app_model.id, + workflow_id=draft_workflow.id, + triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value, + index=1, + node_id=e.node_id, + node_type=e.node_type.value, + title=e.node_title, + status=WorkflowNodeExecutionStatus.FAILED.value, + error=e.error, + elapsed_time=time.perf_counter() - start_at, + created_by_role=CreatedByRole.ACCOUNT.value, + created_by=account.id, + created_at=datetime.utcnow(), + finished_at=datetime.utcnow() + ) + db.session.add(workflow_node_execution) + db.session.commit() + + return workflow_node_execution + + # create workflow node execution + workflow_node_execution = WorkflowNodeExecution( + tenant_id=app_model.tenant_id, + app_id=app_model.id, + workflow_id=draft_workflow.id, + triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value, + index=1, + node_id=node_id, + node_type=node_instance.node_type.value, + title=node_instance.node_data.title, + inputs=json.dumps(node_run_result.inputs) if node_run_result.inputs else None, + process_data=json.dumps(node_run_result.process_data) if node_run_result.process_data else None, + outputs=json.dumps(node_run_result.outputs) if node_run_result.outputs else None, + execution_metadata=(json.dumps(jsonable_encoder(node_run_result.metadata)) + if node_run_result.metadata else None), + status=WorkflowNodeExecutionStatus.SUCCEEDED.value, + elapsed_time=time.perf_counter() - start_at, + created_by_role=CreatedByRole.ACCOUNT.value, + created_by=account.id, + created_at=datetime.utcnow(), + finished_at=datetime.utcnow() + ) + + db.session.add(workflow_node_execution) + db.session.commit() + + return workflow_node_execution + def convert_to_workflow(self, app_model: App, account: Account) -> App: """ Basic mode of chatbot app(expert mode) to workflow