diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index 6e77f50e65..4f8df6bcec 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -14,7 +14,7 @@ from controllers.console.setup import setup_required from controllers.console.wraps import account_initialization_required from core.app.entities.app_invoke_entities import InvokeFrom from fields.workflow_fields import workflow_fields -from libs.helper import uuid_value +from libs.helper import TimestampField, uuid_value from libs.login import current_user, login_required from models.model import App, AppMode from services.workflow_service import WorkflowService @@ -56,7 +56,7 @@ class DraftWorkflowApi(Resource): args = parser.parse_args() workflow_service = WorkflowService() - workflow_service.sync_draft_workflow( + workflow = workflow_service.sync_draft_workflow( app_model=app_model, graph=args.get('graph'), features=args.get('features'), @@ -64,7 +64,8 @@ class DraftWorkflowApi(Resource): ) return { - "result": "success" + "result": "success", + "updated_at": TimestampField().format(workflow.updated_at) } diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index 920adcfb79..898091f52c 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -82,7 +82,7 @@ class AdvancedChatAppRunner(AppRunner): # RUN WORKFLOW workflow_engine_manager = WorkflowEngineManager() - result_generator = workflow_engine_manager.run_workflow( + workflow_engine_manager.run_workflow( app_model=app_record, workflow=workflow, triggered_from=WorkflowRunTriggeredFrom.DEBUGGING @@ -97,11 +97,6 @@ class AdvancedChatAppRunner(AppRunner): callbacks=[WorkflowEventTriggerCallback(queue_manager=queue_manager)] ) - for result in result_generator: - # todo handle workflow and node event - pass - - def handle_input_moderation(self, queue_manager: AppQueueManager, app_record: App, app_generate_entity: AdvancedChatAppGenerateEntity, diff --git a/api/core/workflow/entities/base_node_data_entities.py b/api/core/workflow/entities/base_node_data_entities.py index 32b93ea094..afa6ddff04 100644 --- a/api/core/workflow/entities/base_node_data_entities.py +++ b/api/core/workflow/entities/base_node_data_entities.py @@ -1,7 +1,11 @@ from abc import ABC +from typing import Optional from pydantic import BaseModel class BaseNodeData(ABC, BaseModel): - pass + type: str + + title: str + desc: Optional[str] = None diff --git a/api/core/workflow/entities/workflow_entities.py b/api/core/workflow/entities/workflow_entities.py new file mode 100644 index 0000000000..21126caf30 --- /dev/null +++ b/api/core/workflow/entities/workflow_entities.py @@ -0,0 +1,16 @@ +from decimal import Decimal + +from core.workflow.entities.variable_pool import VariablePool +from models.workflow import WorkflowNodeExecution, WorkflowRun + + +class WorkflowRunState: + workflow_run: WorkflowRun + start_at: float + variable_pool: VariablePool + + total_tokens: int = 0 + total_price: Decimal = Decimal(0) + currency: str = "USD" + + workflow_node_executions: list[WorkflowNodeExecution] = [] diff --git a/api/core/workflow/nodes/base_node.py b/api/core/workflow/nodes/base_node.py index 6f28a3f104..314dfb8f22 100644 --- a/api/core/workflow/nodes/base_node.py +++ b/api/core/workflow/nodes/base_node.py @@ -1,21 +1,25 @@ from abc import abstractmethod from typing import Optional +from core.workflow.callbacks.base_callback import BaseWorkflowCallback from core.workflow.entities.base_node_data_entities import BaseNodeData from core.workflow.entities.node_entities import NodeType from core.workflow.entities.variable_pool import VariablePool class BaseNode: - _node_type: NodeType _node_data_cls: type[BaseNodeData] + _node_type: NodeType + + node_id: str + node_data: BaseNodeData def __init__(self, config: dict) -> None: - self._node_id = config.get("id") - if not self._node_id: + self.node_id = config.get("id") + if not self.node_id: raise ValueError("Node ID is required.") - self._node_data = self._node_data_cls(**config.get("data", {})) + self.node_data = self._node_data_cls(**config.get("data", {})) @abstractmethod def _run(self, variable_pool: Optional[VariablePool] = None, @@ -29,11 +33,13 @@ class BaseNode: raise NotImplementedError def run(self, variable_pool: Optional[VariablePool] = None, - run_args: Optional[dict] = None) -> dict: + run_args: Optional[dict] = None, + callbacks: list[BaseWorkflowCallback] = None) -> dict: """ Run node entry :param variable_pool: variable pool :param run_args: run args + :param callbacks: callbacks :return: """ if variable_pool is None and run_args is None: @@ -52,3 +58,11 @@ class BaseNode: :return: """ return {} + + @property + def node_type(self) -> NodeType: + """ + Get node type + :return: + """ + return self._node_type diff --git a/api/core/workflow/nodes/start/entities.py b/api/core/workflow/nodes/start/entities.py index 25b27cf192..64687db042 100644 --- a/api/core/workflow/nodes/start/entities.py +++ b/api/core/workflow/nodes/start/entities.py @@ -1,5 +1,3 @@ -from typing import Optional - from core.app.app_config.entities import VariableEntity from core.workflow.entities.base_node_data_entities import BaseNodeData from core.workflow.entities.node_entities import NodeType @@ -22,6 +20,4 @@ class StartNodeData(BaseNodeData): """ type: str = NodeType.START.value - title: str - desc: Optional[str] = None variables: list[VariableEntity] = [] diff --git a/api/core/workflow/nodes/start/start_node.py b/api/core/workflow/nodes/start/start_node.py index e218cced3d..74d8541436 100644 --- a/api/core/workflow/nodes/start/start_node.py +++ b/api/core/workflow/nodes/start/start_node.py @@ -7,8 +7,8 @@ from core.workflow.nodes.start.entities import StartNodeData class StartNode(BaseNode): - _node_type = NodeType.START _node_data_cls = StartNodeData + node_type = NodeType.START def _run(self, variable_pool: Optional[VariablePool] = None, run_args: Optional[dict] = None) -> dict: diff --git a/api/core/workflow/workflow_engine_manager.py b/api/core/workflow/workflow_engine_manager.py index 3ad36fe1d2..0ec93dd4b2 100644 --- a/api/core/workflow/workflow_engine_manager.py +++ b/api/core/workflow/workflow_engine_manager.py @@ -1,9 +1,12 @@ import json -from collections.abc import Generator +import time from typing import Optional, Union from core.workflow.callbacks.base_callback import BaseWorkflowCallback from core.workflow.entities.node_entities import NodeType +from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_entities import WorkflowRunState +from core.workflow.nodes.base_node import BaseNode from core.workflow.nodes.code.code_node import CodeNode from core.workflow.nodes.direct_answer.direct_answer_node import DirectAnswerNode from core.workflow.nodes.end.end_node import EndNode @@ -19,7 +22,16 @@ from core.workflow.nodes.variable_assigner.variable_assigner_node import Variabl from extensions.ext_database import db from models.account import Account from models.model import App, EndUser -from models.workflow import CreatedByRole, Workflow, WorkflowRun, WorkflowRunStatus, WorkflowRunTriggeredFrom +from models.workflow import ( + CreatedByRole, + Workflow, + WorkflowNodeExecution, + WorkflowNodeExecutionStatus, + WorkflowNodeExecutionTriggeredFrom, + WorkflowRun, + WorkflowRunStatus, + WorkflowRunTriggeredFrom, +) node_classes = { NodeType.START: StartNode, @@ -114,7 +126,7 @@ class WorkflowEngineManager: user: Union[Account, EndUser], user_inputs: dict, system_inputs: Optional[dict] = None, - callbacks: list[BaseWorkflowCallback] = None) -> Generator: + callbacks: list[BaseWorkflowCallback] = None) -> None: """ Run workflow :param app_model: App instance @@ -140,11 +152,66 @@ class WorkflowEngineManager: system_inputs=system_inputs ) + # init workflow run state + workflow_run_state = WorkflowRunState( + workflow_run=workflow_run, + start_at=time.perf_counter(), + variable_pool=VariablePool( + system_variables=system_inputs, + ) + ) + if callbacks: for callback in callbacks: callback.on_workflow_run_started(workflow_run) - pass + # fetch start node + start_node = self._get_entry_node(graph) + if not start_node: + self._workflow_run_failed( + workflow_run_state=workflow_run_state, + error='Start node not found in workflow graph', + callbacks=callbacks + ) + return + + try: + predecessor_node = None + current_node = start_node + while True: + # run workflow + self._run_workflow_node( + workflow_run_state=workflow_run_state, + node=current_node, + predecessor_node=predecessor_node, + callbacks=callbacks + ) + + if current_node.node_type == NodeType.END: + break + + # todo fetch next node until end node finished or no next node + current_node = None + + if not current_node: + break + + predecessor_node = current_node + # or max steps 30 reached + # or max execution time 10min reached + except Exception as e: + self._workflow_run_failed( + workflow_run_state=workflow_run_state, + error=str(e), + callbacks=callbacks + ) + return + + # workflow run success + self._workflow_run_success( + workflow_run_state=workflow_run_state, + callbacks=callbacks + ) def _init_workflow_run(self, workflow: Workflow, triggered_from: WorkflowRunTriggeredFrom, @@ -184,7 +251,7 @@ class WorkflowEngineManager: status=WorkflowRunStatus.RUNNING.value, created_by_role=(CreatedByRole.ACCOUNT.value if isinstance(user, Account) else CreatedByRole.END_USER.value), - created_by_id=user.id + created_by=user.id ) db.session.add(workflow_run) @@ -195,6 +262,33 @@ class WorkflowEngineManager: return workflow_run + def _workflow_run_failed(self, workflow_run_state: WorkflowRunState, + error: str, + callbacks: list[BaseWorkflowCallback] = None) -> WorkflowRun: + """ + Workflow run failed + :param workflow_run_state: workflow run state + :param error: error message + :param callbacks: workflow callbacks + :return: + """ + workflow_run = workflow_run_state.workflow_run + workflow_run.status = WorkflowRunStatus.FAILED.value + workflow_run.error = error + workflow_run.elapsed_time = time.perf_counter() - workflow_run_state.start_at + workflow_run.total_tokens = workflow_run_state.total_tokens + workflow_run.total_price = workflow_run_state.total_price + workflow_run.currency = workflow_run_state.currency + workflow_run.total_steps = len(workflow_run_state.workflow_node_executions) + + db.session.commit() + + if callbacks: + for callback in callbacks: + callback.on_workflow_run_finished(workflow_run) + + return workflow_run + def _get_entry_node(self, graph: dict) -> Optional[StartNode]: """ Get entry node @@ -210,3 +304,83 @@ class WorkflowEngineManager: return StartNode(config=node_config) return None + + def _run_workflow_node(self, workflow_run_state: WorkflowRunState, + node: BaseNode, + predecessor_node: Optional[BaseNode] = None, + callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution: + # init workflow node execution + start_at = time.perf_counter() + workflow_node_execution = self._init_node_execution_from_workflow_run( + workflow_run_state=workflow_run_state, + node=node, + predecessor_node=predecessor_node, + ) + + # add to workflow node executions + workflow_run_state.workflow_node_executions.append(workflow_node_execution) + + try: + # run node, result must have inputs, process_data, outputs, execution_metadata + node_run_result = node.run( + variable_pool=workflow_run_state.variable_pool, + callbacks=callbacks + ) + except Exception as e: + # node run failed + self._workflow_node_execution_failed( + workflow_node_execution=workflow_node_execution, + error=str(e), + callbacks=callbacks + ) + raise + + # node run success + self._workflow_node_execution_success( + workflow_node_execution=workflow_node_execution, + result=node_run_result, + callbacks=callbacks + ) + + return workflow_node_execution + + def _init_node_execution_from_workflow_run(self, workflow_run_state: WorkflowRunState, + node: BaseNode, + predecessor_node: Optional[BaseNode] = None, + callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution: + """ + Init workflow node execution from workflow run + :param workflow_run_state: workflow run state + :param node: current node + :param predecessor_node: predecessor node if exists + :param callbacks: workflow callbacks + :return: + """ + workflow_run = workflow_run_state.workflow_run + + # init workflow node execution + workflow_node_execution = WorkflowNodeExecution( + tenant_id=workflow_run.tenant_id, + app_id=workflow_run.app_id, + workflow_id=workflow_run.workflow_id, + triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value, + workflow_run_id=workflow_run.id, + predecessor_node_id=predecessor_node.node_id if predecessor_node else None, + index=len(workflow_run_state.workflow_node_executions) + 1, + node_id=node.node_id, + node_type=node.node_type.value, + title=node.node_data.title, + type=node.node_type.value, + status=WorkflowNodeExecutionStatus.RUNNING.value, + created_by_role=workflow_run.created_by_role, + created_by=workflow_run.created_by + ) + + db.session.add(workflow_node_execution) + db.session.commit() + + if callbacks: + for callback in callbacks: + callback.on_workflow_node_execute_started(workflow_node_execution) + + return workflow_node_execution diff --git a/api/libs/helper.py b/api/libs/helper.py index a35f4ad471..3eb14c50f0 100644 --- a/api/libs/helper.py +++ b/api/libs/helper.py @@ -15,7 +15,7 @@ def run(script): class TimestampField(fields.Raw): - def format(self, value): + def format(self, value) -> int: return int(value.timestamp()) diff --git a/web/.husky/pre-commit b/web/.husky/pre-commit index 1f8ae9a8d3..4bc7fb77ab 100755 --- a/web/.husky/pre-commit +++ b/web/.husky/pre-commit @@ -31,14 +31,16 @@ if $api_modified; then pip install ruff fi - ruff check ./api - result=$? + ruff check ./api || status=$? - if [ $result -ne 0 ]; then + status=${status:-0} + + + if [ $status -ne 0 ]; then + echo "Ruff linter on api module error, exit code: $status" echo "Please run 'dev/reformat' to fix the fixable linting errors." + exit 1 fi - - exit $result fi if $web_modified; then