Merge branch 'feat/r2' into deploy/rag-dev

This commit is contained in:
jyong 2025-06-05 15:29:37 +08:00
commit 067ec17539
1 changed files with 36 additions and 45 deletions

View File

@ -373,9 +373,6 @@ class RagPipelineService:
tenant_id=pipeline.tenant_id,
node_id=node_id,
)
workflow_node_execution.app_id = pipeline.id
workflow_node_execution.created_by = account.id
workflow_node_execution.workflow_id = draft_workflow.id
db.session.add(workflow_node_execution)
@ -409,8 +406,6 @@ class RagPipelineService:
node_id=node_id,
)
workflow_node_execution.app_id = pipeline.id
workflow_node_execution.created_by = account.id
workflow_node_execution.workflow_id = published_workflow.id
db.session.add(workflow_node_execution)
@ -568,18 +563,17 @@ class RagPipelineService:
node_run_result = None
error = e.error
workflow_node_execution = WorkflowNodeExecution()
workflow_node_execution.id = str(uuid4())
workflow_node_execution.tenant_id = tenant_id
workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value
workflow_node_execution.index = 1
workflow_node_execution.node_id = node_id
workflow_node_execution.node_type = node_instance.node_type
workflow_node_execution.title = node_instance.node_data.title
workflow_node_execution.elapsed_time = time.perf_counter() - start_at
workflow_node_execution.created_by_role = CreatorUserRole.ACCOUNT.value
workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None)
workflow_node_execution.finished_at = datetime.now(UTC).replace(tzinfo=None)
workflow_node_execution = WorkflowNodeExecution(
id=str(uuid4()),
workflow_id=node_instance.workflow_id,
index=1,
node_id=node_id,
node_type=node_instance.node_type,
title=node_instance.node_data.title,
elapsed_time=time.perf_counter() - start_at,
finished_at=datetime.now(UTC).replace(tzinfo=None),
created_at=datetime.now(UTC).replace(tzinfo=None),
)
if run_succeeded and node_run_result:
# create workflow node execution
inputs = WorkflowEntry.handle_special_values(node_run_result.inputs) if node_run_result.inputs else None
@ -590,20 +584,18 @@ class RagPipelineService:
)
outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) if node_run_result.outputs else None
workflow_node_execution.inputs = json.dumps(inputs)
workflow_node_execution.process_data = json.dumps(process_data)
workflow_node_execution.outputs = json.dumps(outputs)
workflow_node_execution.execution_metadata = (
json.dumps(jsonable_encoder(node_run_result.metadata)) if node_run_result.metadata else None
)
workflow_node_execution.inputs = inputs
workflow_node_execution.process_data = process_data
workflow_node_execution.outputs = outputs
workflow_node_execution.metadata = node_run_result.metadata
if node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED
elif node_run_result.status == WorkflowNodeExecutionStatus.EXCEPTION:
workflow_node_execution.status = WorkflowNodeExecutionStatus.EXCEPTION.value
workflow_node_execution.status = WorkflowNodeExecutionStatus.EXCEPTION
workflow_node_execution.error = node_run_result.error
else:
# create workflow node execution
workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED
workflow_node_execution.error = error
return workflow_node_execution
@ -678,18 +670,18 @@ class RagPipelineService:
break
if not datasource_node_data:
raise ValueError("Datasource node data not found")
datasource_parameters = datasource_node_data.get("datasource_parameters", {})
if datasource_parameters:
datasource_parameters_map = {
item["variable"]: item for item in datasource_parameters
variables = datasource_node_data.get("variables", {})
if variables:
variables_map = {
item["variable"]: item for item in variables
}
else:
datasource_parameters_map = {}
variables = datasource_node_data.get("variables", {})
variables_map = {}
datasource_parameters = datasource_node_data.get("datasource_parameters", {})
user_input_variables = []
for key, value in variables.items():
for key, value in datasource_parameters.items():
if not re.match(r"\{\{#([a-zA-Z0-9_]{1,50}(?:\.[a-zA-Z_][a-zA-Z0-9_]{0,29}){1,10})#\}\}", value["value"]):
user_input_variables.append(datasource_parameters_map.get(key, {}))
user_input_variables.append(variables_map.get(key, {}))
return user_input_variables
def get_draft_first_step_parameters(self, pipeline: Pipeline, node_id: str) -> list[dict]:
@ -710,18 +702,19 @@ class RagPipelineService:
break
if not datasource_node_data:
raise ValueError("Datasource node data not found")
datasource_parameters = datasource_node_data.get("datasource_parameters", {})
if datasource_parameters:
datasource_parameters_map = {
item["variable"]: item for item in datasource_parameters
variables = datasource_node_data.get("variables", {})
if variables:
variables_map = {
item["variable"]: item for item in variables
}
else:
datasource_parameters_map = {}
variables = datasource_node_data.get("variables", {})
variables = {}
datasource_parameters = datasource_node_data.get("datasource_parameters", {})
user_input_variables = []
for key, value in variables.items():
for key, value in datasource_parameters.items():
if not re.match(r"\{\{#([a-zA-Z0-9_]{1,50}(?:\.[a-zA-Z_][a-zA-Z0-9_]{0,29}){1,10})#\}\}", value["value"]):
user_input_variables.append(datasource_parameters_map.get(key, {}))
user_input_variables.append(variables_map.get(key, {}))
return user_input_variables
def get_draft_second_step_parameters(self, pipeline: Pipeline, node_id: str) -> list[dict]:
@ -845,10 +838,8 @@ class RagPipelineService:
order_config=order_config,
triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN,
)
# Convert domain models to database models
workflow_node_executions = [repository.to_db_model(node_execution) for node_execution in node_executions]
return workflow_node_executions
return list(node_executions)
@classmethod
def publish_customized_pipeline_template(cls, pipeline_id: str, args: dict):