diff --git a/api/services/rag_pipeline/rag_pipeline.py b/api/services/rag_pipeline/rag_pipeline.py index b6de92f201..5541349edb 100644 --- a/api/services/rag_pipeline/rag_pipeline.py +++ b/api/services/rag_pipeline/rag_pipeline.py @@ -373,9 +373,6 @@ class RagPipelineService: tenant_id=pipeline.tenant_id, node_id=node_id, ) - - workflow_node_execution.app_id = pipeline.id - workflow_node_execution.created_by = account.id workflow_node_execution.workflow_id = draft_workflow.id db.session.add(workflow_node_execution) @@ -409,8 +406,6 @@ class RagPipelineService: node_id=node_id, ) - workflow_node_execution.app_id = pipeline.id - workflow_node_execution.created_by = account.id workflow_node_execution.workflow_id = published_workflow.id db.session.add(workflow_node_execution) @@ -568,18 +563,17 @@ class RagPipelineService: node_run_result = None error = e.error - workflow_node_execution = WorkflowNodeExecution() - workflow_node_execution.id = str(uuid4()) - workflow_node_execution.tenant_id = tenant_id - workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value - workflow_node_execution.index = 1 - workflow_node_execution.node_id = node_id - workflow_node_execution.node_type = node_instance.node_type - workflow_node_execution.title = node_instance.node_data.title - workflow_node_execution.elapsed_time = time.perf_counter() - start_at - workflow_node_execution.created_by_role = CreatorUserRole.ACCOUNT.value - workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None) - workflow_node_execution.finished_at = datetime.now(UTC).replace(tzinfo=None) + workflow_node_execution = WorkflowNodeExecution( + id=str(uuid4()), + workflow_id=node_instance.workflow_id, + index=1, + node_id=node_id, + node_type=node_instance.node_type, + title=node_instance.node_data.title, + elapsed_time=time.perf_counter() - start_at, + finished_at=datetime.now(UTC).replace(tzinfo=None), + created_at=datetime.now(UTC).replace(tzinfo=None), + ) if run_succeeded and node_run_result: # create workflow node execution inputs = WorkflowEntry.handle_special_values(node_run_result.inputs) if node_run_result.inputs else None @@ -590,20 +584,18 @@ class RagPipelineService: ) outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) if node_run_result.outputs else None - workflow_node_execution.inputs = json.dumps(inputs) - workflow_node_execution.process_data = json.dumps(process_data) - workflow_node_execution.outputs = json.dumps(outputs) - workflow_node_execution.execution_metadata = ( - json.dumps(jsonable_encoder(node_run_result.metadata)) if node_run_result.metadata else None - ) + workflow_node_execution.inputs = inputs + workflow_node_execution.process_data = process_data + workflow_node_execution.outputs = outputs + workflow_node_execution.metadata = node_run_result.metadata if node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED: - workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value + workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED elif node_run_result.status == WorkflowNodeExecutionStatus.EXCEPTION: - workflow_node_execution.status = WorkflowNodeExecutionStatus.EXCEPTION.value + workflow_node_execution.status = WorkflowNodeExecutionStatus.EXCEPTION workflow_node_execution.error = node_run_result.error else: # create workflow node execution - workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value + workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED workflow_node_execution.error = error return workflow_node_execution @@ -678,18 +670,18 @@ class RagPipelineService: break if not datasource_node_data: raise ValueError("Datasource node data not found") - datasource_parameters = datasource_node_data.get("datasource_parameters", {}) - if datasource_parameters: - datasource_parameters_map = { - item["variable"]: item for item in datasource_parameters + variables = datasource_node_data.get("variables", {}) + if variables: + variables_map = { + item["variable"]: item for item in variables } else: - datasource_parameters_map = {} - variables = datasource_node_data.get("variables", {}) + variables_map = {} + datasource_parameters = datasource_node_data.get("datasource_parameters", {}) user_input_variables = [] - for key, value in variables.items(): + for key, value in datasource_parameters.items(): if not re.match(r"\{\{#([a-zA-Z0-9_]{1,50}(?:\.[a-zA-Z_][a-zA-Z0-9_]{0,29}){1,10})#\}\}", value["value"]): - user_input_variables.append(datasource_parameters_map.get(key, {})) + user_input_variables.append(variables_map.get(key, {})) return user_input_variables def get_draft_first_step_parameters(self, pipeline: Pipeline, node_id: str) -> list[dict]: @@ -710,18 +702,19 @@ class RagPipelineService: break if not datasource_node_data: raise ValueError("Datasource node data not found") - datasource_parameters = datasource_node_data.get("datasource_parameters", {}) - if datasource_parameters: - datasource_parameters_map = { - item["variable"]: item for item in datasource_parameters + variables = datasource_node_data.get("variables", {}) + if variables: + variables_map = { + item["variable"]: item for item in variables } else: - datasource_parameters_map = {} - variables = datasource_node_data.get("variables", {}) + variables = {} + datasource_parameters = datasource_node_data.get("datasource_parameters", {}) + user_input_variables = [] - for key, value in variables.items(): + for key, value in datasource_parameters.items(): if not re.match(r"\{\{#([a-zA-Z0-9_]{1,50}(?:\.[a-zA-Z_][a-zA-Z0-9_]{0,29}){1,10})#\}\}", value["value"]): - user_input_variables.append(datasource_parameters_map.get(key, {})) + user_input_variables.append(variables_map.get(key, {})) return user_input_variables def get_draft_second_step_parameters(self, pipeline: Pipeline, node_id: str) -> list[dict]: @@ -845,10 +838,8 @@ class RagPipelineService: order_config=order_config, triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN, ) - # Convert domain models to database models - workflow_node_executions = [repository.to_db_model(node_execution) for node_execution in node_executions] - return workflow_node_executions + return list(node_executions) @classmethod def publish_customized_pipeline_template(cls, pipeline_id: str, args: dict):