mirror of https://github.com/langgenius/dify.git
Merge remote-tracking branch 'origin/main' into feat/trigger

Resolve merge conflict in use-workflow.ts:
- Keep trigger branch workflow-entry utilities imports
- Preserve SUPPORT_OUTPUT_VARS_NODE from main branch
- Remove unused PARALLEL_DEPTH_LIMIT import

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

commit d68a9f1850
@@ -471,7 +471,6 @@ INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH=4000
 WORKFLOW_MAX_EXECUTION_STEPS=500
 WORKFLOW_MAX_EXECUTION_TIME=1200
 WORKFLOW_CALL_MAX_DEPTH=5
-WORKFLOW_PARALLEL_DEPTH_LIMIT=3
 MAX_VARIABLE_SIZE=204800

 # GraphEngine Worker Pool Configuration
@@ -588,11 +588,6 @@ class WorkflowConfig(BaseSettings):
         default=5,
     )

-    WORKFLOW_PARALLEL_DEPTH_LIMIT: PositiveInt = Field(
-        description="Maximum allowed depth for nested parallel executions",
-        default=3,
-    )
-
     MAX_VARIABLE_SIZE: PositiveInt = Field(
         description="Maximum size in bytes for a single variable in workflows. Default to 200 KB.",
         default=200 * 1024,
@@ -9,7 +9,6 @@ from sqlalchemy.orm import Session
 from werkzeug.exceptions import Forbidden, InternalServerError, NotFound

 import services
-from configs import dify_config
 from controllers.console import api, console_ns
 from controllers.console.app.error import ConversationCompletedError, DraftWorkflowNotExist, DraftWorkflowNotSync
 from controllers.console.app.wraps import get_app_model
@@ -799,24 +798,6 @@ class ConvertToWorkflowApi(Resource):
         }


-@console_ns.route("/apps/<uuid:app_id>/workflows/draft/config")
-class WorkflowConfigApi(Resource):
-    """Resource for workflow configuration."""
-
-    @api.doc("get_workflow_config")
-    @api.doc(description="Get workflow configuration")
-    @api.doc(params={"app_id": "Application ID"})
-    @api.response(200, "Workflow configuration retrieved successfully")
-    @setup_required
-    @login_required
-    @account_initialization_required
-    @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
-    def get(self, app_model: App):
-        return {
-            "parallel_depth_limit": dify_config.WORKFLOW_PARALLEL_DEPTH_LIMIT,
-        }
-
-
 @console_ns.route("/apps/<uuid:app_id>/workflows")
 class PublishedAllWorkflowApi(Resource):
     @api.doc("get_all_published_workflows")
@@ -9,7 +9,6 @@ from sqlalchemy.orm import Session
 from werkzeug.exceptions import Forbidden, InternalServerError, NotFound

 import services
-from configs import dify_config
 from controllers.console import api
 from controllers.console.app.error import (
     ConversationCompletedError,
@@ -609,18 +608,6 @@ class DefaultRagPipelineBlockConfigApi(Resource):
         return rag_pipeline_service.get_default_block_config(node_type=block_type, filters=filters)


-class RagPipelineConfigApi(Resource):
-    """Resource for rag pipeline configuration."""
-
-    @setup_required
-    @login_required
-    @account_initialization_required
-    def get(self, pipeline_id):
-        return {
-            "parallel_depth_limit": dify_config.WORKFLOW_PARALLEL_DEPTH_LIMIT,
-        }
-
-
 class PublishedAllRagPipelineApi(Resource):
     @setup_required
     @login_required
@@ -985,10 +972,6 @@ api.add_resource(
     DraftRagPipelineApi,
     "/rag/pipelines/<uuid:pipeline_id>/workflows/draft",
 )
-api.add_resource(
-    RagPipelineConfigApi,
-    "/rag/pipelines/<uuid:pipeline_id>/workflows/draft/config",
-)
 api.add_resource(
     DraftRagPipelineRunApi,
     "/rag/pipelines/<uuid:pipeline_id>/workflows/draft/run",
@@ -551,7 +551,7 @@ class AdvancedChatAppGenerateTaskPipeline:
                 total_steps=validated_state.node_run_steps,
                 outputs=event.outputs,
                 exceptions_count=event.exceptions_count,
-                conversation_id=None,
+                conversation_id=self._conversation_id,
                 trace_manager=trace_manager,
                 external_trace_id=self._application_generate_entity.extras.get("external_trace_id"),
             )
@@ -1,38 +1,28 @@
-import json
 import logging
 from collections.abc import Sequence
-from urllib.parse import urljoin

-from opentelemetry.trace import Link, Status, StatusCode
-from sqlalchemy import select
-from sqlalchemy.orm import Session, sessionmaker
+from sqlalchemy.orm import sessionmaker

 from core.ops.aliyun_trace.data_exporter.traceclient import (
     TraceClient,
+    build_endpoint,
     convert_datetime_to_nanoseconds,
     convert_to_span_id,
     convert_to_trace_id,
-    create_link,
     generate_span_id,
 )
-from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData
+from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData, TraceMetadata
 from core.ops.aliyun_trace.entities.semconv import (
     GEN_AI_COMPLETION,
-    GEN_AI_FRAMEWORK,
     GEN_AI_MODEL_NAME,
     GEN_AI_PROMPT,
     GEN_AI_PROMPT_TEMPLATE_TEMPLATE,
     GEN_AI_PROMPT_TEMPLATE_VARIABLE,
     GEN_AI_RESPONSE_FINISH_REASON,
-    GEN_AI_SESSION_ID,
-    GEN_AI_SPAN_KIND,
     GEN_AI_SYSTEM,
     GEN_AI_USAGE_INPUT_TOKENS,
     GEN_AI_USAGE_OUTPUT_TOKENS,
     GEN_AI_USAGE_TOTAL_TOKENS,
-    GEN_AI_USER_ID,
-    INPUT_VALUE,
-    OUTPUT_VALUE,
     RETRIEVAL_DOCUMENT,
     RETRIEVAL_QUERY,
     TOOL_DESCRIPTION,
@@ -40,6 +30,15 @@ from core.ops.aliyun_trace.entities.semconv import (
     TOOL_PARAMETERS,
     GenAISpanKind,
 )
+from core.ops.aliyun_trace.utils import (
+    create_common_span_attributes,
+    create_links_from_trace_id,
+    create_status_from_error,
+    extract_retrieval_documents,
+    get_user_id_from_message_data,
+    get_workflow_node_status,
+    serialize_json_data,
+)
 from core.ops.base_trace_instance import BaseTraceInstance
 from core.ops.entities.config_entity import AliyunConfig
 from core.ops.entities.trace_entity import (
@@ -52,12 +51,11 @@ from core.ops.entities.trace_entity import (
     ToolTraceInfo,
     WorkflowTraceInfo,
 )
-from core.rag.models.document import Document
 from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
 from core.workflow.entities import WorkflowNodeExecution
-from core.workflow.enums import NodeType, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
+from core.workflow.enums import NodeType, WorkflowNodeExecutionMetadataKey
 from extensions.ext_database import db
-from models import Account, App, EndUser, TenantAccountJoin, WorkflowNodeExecutionTriggeredFrom
+from models import WorkflowNodeExecutionTriggeredFrom

 logger = logging.getLogger(__name__)

@@ -68,8 +66,7 @@ class AliyunDataTrace(BaseTraceInstance):
         aliyun_config: AliyunConfig,
     ):
         super().__init__(aliyun_config)
-        base_url = aliyun_config.endpoint.rstrip("/")
-        endpoint = urljoin(base_url, f"adapt_{aliyun_config.license_key}/api/otlp/traces")
+        endpoint = build_endpoint(aliyun_config.endpoint, aliyun_config.license_key)
         self.trace_client = TraceClient(service_name=aliyun_config.app_name, endpoint=endpoint)

     def trace(self, trace_info: BaseTraceInfo):
@@ -95,423 +92,422 @@ class AliyunDataTrace(BaseTraceInstance):
         try:
             return self.trace_client.get_project_url()
         except Exception as e:
-            logger.info("Aliyun get run url failed: %s", str(e), exc_info=True)
-            raise ValueError(f"Aliyun get run url failed: {str(e)}")
+            logger.info("Aliyun get project url failed: %s", str(e), exc_info=True)
+            raise ValueError(f"Aliyun get project url failed: {str(e)}")

     def workflow_trace(self, trace_info: WorkflowTraceInfo):
-        trace_id = convert_to_trace_id(trace_info.workflow_run_id)
-        links = []
-        if trace_info.trace_id:
-            links.append(create_link(trace_id_str=trace_info.trace_id))
-        workflow_span_id = convert_to_span_id(trace_info.workflow_run_id, "workflow")
-        self.add_workflow_span(trace_id, workflow_span_id, trace_info, links)
+        trace_metadata = TraceMetadata(
+            trace_id=convert_to_trace_id(trace_info.workflow_run_id),
+            workflow_span_id=convert_to_span_id(trace_info.workflow_run_id, "workflow"),
+            session_id=trace_info.metadata.get("conversation_id") or "",
+            user_id=str(trace_info.metadata.get("user_id") or ""),
+            links=create_links_from_trace_id(trace_info.trace_id),
+        )
+
+        self.add_workflow_span(trace_info, trace_metadata)

         workflow_node_executions = self.get_workflow_node_executions(trace_info)
         for node_execution in workflow_node_executions:
-            node_span = self.build_workflow_node_span(node_execution, trace_id, trace_info, workflow_span_id)
+            node_span = self.build_workflow_node_span(node_execution, trace_info, trace_metadata)
             self.trace_client.add_span(node_span)
     def message_trace(self, trace_info: MessageTraceInfo):
         message_data = trace_info.message_data
         if message_data is None:
             return

         message_id = trace_info.message_id
+        user_id = get_user_id_from_message_data(message_data)
+        status = create_status_from_error(trace_info.error)

-        user_id = message_data.from_account_id
-        if message_data.from_end_user_id:
-            end_user_data: EndUser | None = (
-                db.session.query(EndUser).where(EndUser.id == message_data.from_end_user_id).first()
-            )
-            if end_user_data is not None:
-                user_id = end_user_data.session_id
+        trace_metadata = TraceMetadata(
+            trace_id=convert_to_trace_id(message_id),
+            workflow_span_id=0,
+            session_id=trace_info.metadata.get("conversation_id") or "",
+            user_id=user_id,
+            links=create_links_from_trace_id(trace_info.trace_id),
+        )

-        status: Status = Status(StatusCode.OK)
-        if trace_info.error:
-            status = Status(StatusCode.ERROR, trace_info.error)
-
-        trace_id = convert_to_trace_id(message_id)
-        links = []
-        if trace_info.trace_id:
-            links.append(create_link(trace_id_str=trace_info.trace_id))
+        inputs_json = serialize_json_data(trace_info.inputs)
+        outputs_str = str(trace_info.outputs)

         message_span_id = convert_to_span_id(message_id, "message")
         message_span = SpanData(
-            trace_id=trace_id,
+            trace_id=trace_metadata.trace_id,
             parent_span_id=None,
             span_id=message_span_id,
             name="message",
             start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
             end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
-            attributes={
-                GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id") or "",
-                GEN_AI_USER_ID: str(user_id),
-                GEN_AI_SPAN_KIND: GenAISpanKind.CHAIN.value,
-                GEN_AI_FRAMEWORK: "dify",
-                INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
-                OUTPUT_VALUE: str(trace_info.outputs),
-            },
+            attributes=create_common_span_attributes(
+                session_id=trace_metadata.session_id,
+                user_id=trace_metadata.user_id,
+                span_kind=GenAISpanKind.CHAIN,
+                inputs=inputs_json,
+                outputs=outputs_str,
+            ),
             status=status,
-            links=links,
+            links=trace_metadata.links,
         )
         self.trace_client.add_span(message_span)

-        app_model_config = getattr(trace_info.message_data, "app_model_config", {})
+        app_model_config = getattr(message_data, "app_model_config", {})
         pre_prompt = getattr(app_model_config, "pre_prompt", "")
-        inputs_data = getattr(trace_info.message_data, "inputs", {})
+        inputs_data = getattr(message_data, "inputs", {})

         llm_span = SpanData(
-            trace_id=trace_id,
+            trace_id=trace_metadata.trace_id,
             parent_span_id=message_span_id,
             span_id=convert_to_span_id(message_id, "llm"),
             name="llm",
             start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
             end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
             attributes={
-                GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id") or "",
-                GEN_AI_USER_ID: str(user_id),
-                GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value,
-                GEN_AI_FRAMEWORK: "dify",
+                **create_common_span_attributes(
+                    session_id=trace_metadata.session_id,
+                    user_id=trace_metadata.user_id,
+                    span_kind=GenAISpanKind.LLM,
+                    inputs=inputs_json,
+                    outputs=outputs_str,
+                ),
                 GEN_AI_MODEL_NAME: trace_info.metadata.get("ls_model_name") or "",
                 GEN_AI_SYSTEM: trace_info.metadata.get("ls_provider") or "",
                 GEN_AI_USAGE_INPUT_TOKENS: str(trace_info.message_tokens),
                 GEN_AI_USAGE_OUTPUT_TOKENS: str(trace_info.answer_tokens),
                 GEN_AI_USAGE_TOTAL_TOKENS: str(trace_info.total_tokens),
-                GEN_AI_PROMPT_TEMPLATE_VARIABLE: json.dumps(inputs_data, ensure_ascii=False),
+                GEN_AI_PROMPT_TEMPLATE_VARIABLE: serialize_json_data(inputs_data),
                 GEN_AI_PROMPT_TEMPLATE_TEMPLATE: pre_prompt,
-                GEN_AI_PROMPT: json.dumps(trace_info.inputs, ensure_ascii=False),
-                GEN_AI_COMPLETION: str(trace_info.outputs),
-                INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
-                OUTPUT_VALUE: str(trace_info.outputs),
+                GEN_AI_PROMPT: inputs_json,
+                GEN_AI_COMPLETION: outputs_str,
             },
             status=status,
+            links=trace_metadata.links,
         )
         self.trace_client.add_span(llm_span)
     def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo):
         if trace_info.message_data is None:
             return

         message_id = trace_info.message_id

-        trace_id = convert_to_trace_id(message_id)
-        links = []
-        if trace_info.trace_id:
-            links.append(create_link(trace_id_str=trace_info.trace_id))
+        trace_metadata = TraceMetadata(
+            trace_id=convert_to_trace_id(message_id),
+            workflow_span_id=0,
+            session_id=trace_info.metadata.get("conversation_id") or "",
+            user_id=str(trace_info.metadata.get("user_id") or ""),
+            links=create_links_from_trace_id(trace_info.trace_id),
+        )
+
+        documents_data = extract_retrieval_documents(trace_info.documents)
+        documents_json = serialize_json_data(documents_data)
+        inputs_str = str(trace_info.inputs)

         dataset_retrieval_span = SpanData(
-            trace_id=trace_id,
+            trace_id=trace_metadata.trace_id,
             parent_span_id=convert_to_span_id(message_id, "message"),
             span_id=generate_span_id(),
             name="dataset_retrieval",
             start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
             end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
             attributes={
-                GEN_AI_SPAN_KIND: GenAISpanKind.RETRIEVER.value,
-                GEN_AI_FRAMEWORK: "dify",
-                RETRIEVAL_QUERY: str(trace_info.inputs),
-                RETRIEVAL_DOCUMENT: json.dumps(documents_data, ensure_ascii=False),
-                INPUT_VALUE: str(trace_info.inputs),
-                OUTPUT_VALUE: json.dumps(documents_data, ensure_ascii=False),
+                **create_common_span_attributes(
+                    session_id=trace_metadata.session_id,
+                    user_id=trace_metadata.user_id,
+                    span_kind=GenAISpanKind.RETRIEVER,
+                    inputs=inputs_str,
+                    outputs=documents_json,
+                ),
+                RETRIEVAL_QUERY: inputs_str,
+                RETRIEVAL_DOCUMENT: documents_json,
             },
-            links=links,
+            links=trace_metadata.links,
         )
         self.trace_client.add_span(dataset_retrieval_span)

     def tool_trace(self, trace_info: ToolTraceInfo):
         if trace_info.message_data is None:
             return

         message_id = trace_info.message_id
+        status = create_status_from_error(trace_info.error)

-        status: Status = Status(StatusCode.OK)
-        if trace_info.error:
-            status = Status(StatusCode.ERROR, trace_info.error)
+        trace_metadata = TraceMetadata(
+            trace_id=convert_to_trace_id(message_id),
+            workflow_span_id=0,
+            session_id=trace_info.metadata.get("conversation_id") or "",
+            user_id=str(trace_info.metadata.get("user_id") or ""),
+            links=create_links_from_trace_id(trace_info.trace_id),
+        )

-        trace_id = convert_to_trace_id(message_id)
-        links = []
-        if trace_info.trace_id:
-            links.append(create_link(trace_id_str=trace_info.trace_id))
+        tool_config_json = serialize_json_data(trace_info.tool_config)
+        tool_inputs_json = serialize_json_data(trace_info.tool_inputs)
+        inputs_json = serialize_json_data(trace_info.inputs)

         tool_span = SpanData(
-            trace_id=trace_id,
+            trace_id=trace_metadata.trace_id,
             parent_span_id=convert_to_span_id(message_id, "message"),
             span_id=generate_span_id(),
             name=trace_info.tool_name,
             start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
             end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
             attributes={
-                GEN_AI_SPAN_KIND: GenAISpanKind.TOOL.value,
-                GEN_AI_FRAMEWORK: "dify",
+                **create_common_span_attributes(
+                    session_id=trace_metadata.session_id,
+                    user_id=trace_metadata.user_id,
+                    span_kind=GenAISpanKind.TOOL,
+                    inputs=inputs_json,
+                    outputs=str(trace_info.tool_outputs),
+                ),
                 TOOL_NAME: trace_info.tool_name,
-                TOOL_DESCRIPTION: json.dumps(trace_info.tool_config, ensure_ascii=False),
-                TOOL_PARAMETERS: json.dumps(trace_info.tool_inputs, ensure_ascii=False),
-                INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
-                OUTPUT_VALUE: str(trace_info.tool_outputs),
+                TOOL_DESCRIPTION: tool_config_json,
+                TOOL_PARAMETERS: tool_inputs_json,
             },
             status=status,
-            links=links,
+            links=trace_metadata.links,
         )
         self.trace_client.add_span(tool_span)
     def get_workflow_node_executions(self, trace_info: WorkflowTraceInfo) -> Sequence[WorkflowNodeExecution]:
-        # through workflow_run_id get all_nodes_execution using repository
-        session_factory = sessionmaker(bind=db.engine)
-        # Find the app's creator account
-        with Session(db.engine, expire_on_commit=False) as session:
-            # Get the app to find its creator
-            app_id = trace_info.metadata.get("app_id")
-            if not app_id:
-                raise ValueError("No app_id found in trace_info metadata")
-            app_stmt = select(App).where(App.id == app_id)
-            app = session.scalar(app_stmt)
-            if not app:
-                raise ValueError(f"App with id {app_id} not found")
+        app_id = trace_info.metadata.get("app_id")
+        if not app_id:
+            raise ValueError("No app_id found in trace_info metadata")

-            if not app.created_by:
-                raise ValueError(f"App with id {app_id} has no creator (created_by is None)")
-            account_stmt = select(Account).where(Account.id == app.created_by)
-            service_account = session.scalar(account_stmt)
-            if not service_account:
-                raise ValueError(f"Creator account with id {app.created_by} not found for app {app_id}")
-            current_tenant = (
-                session.query(TenantAccountJoin).filter_by(account_id=service_account.id, current=True).first()
-            )
-            if not current_tenant:
-                raise ValueError(f"Current tenant not found for account {service_account.id}")
-            service_account.set_tenant_id(current_tenant.tenant_id)
+        service_account = self.get_service_account_with_tenant(app_id)

+        session_factory = sessionmaker(bind=db.engine)
         workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
             session_factory=session_factory,
             user=service_account,
             app_id=app_id,
             triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
         )
-        # Get all executions for this workflow run
-        workflow_node_executions = workflow_node_execution_repository.get_by_workflow_run(
-            workflow_run_id=trace_info.workflow_run_id
-        )
-        return workflow_node_executions
+
+        return workflow_node_execution_repository.get_by_workflow_run(workflow_run_id=trace_info.workflow_run_id)
     def build_workflow_node_span(
-        self, node_execution: WorkflowNodeExecution, trace_id: int, trace_info: WorkflowTraceInfo, workflow_span_id: int
+        self, node_execution: WorkflowNodeExecution, trace_info: WorkflowTraceInfo, trace_metadata: TraceMetadata
     ):
         try:
             if node_execution.node_type == NodeType.LLM:
-                node_span = self.build_workflow_llm_span(trace_id, workflow_span_id, trace_info, node_execution)
+                node_span = self.build_workflow_llm_span(trace_info, node_execution, trace_metadata)
             elif node_execution.node_type == NodeType.KNOWLEDGE_RETRIEVAL:
-                node_span = self.build_workflow_retrieval_span(trace_id, workflow_span_id, trace_info, node_execution)
+                node_span = self.build_workflow_retrieval_span(trace_info, node_execution, trace_metadata)
             elif node_execution.node_type == NodeType.TOOL:
-                node_span = self.build_workflow_tool_span(trace_id, workflow_span_id, trace_info, node_execution)
+                node_span = self.build_workflow_tool_span(trace_info, node_execution, trace_metadata)
             else:
-                node_span = self.build_workflow_task_span(trace_id, workflow_span_id, trace_info, node_execution)
+                node_span = self.build_workflow_task_span(trace_info, node_execution, trace_metadata)
             return node_span
         except Exception as e:
             logger.debug("Error occurred in build_workflow_node_span: %s", e, exc_info=True)
             return None

-    def get_workflow_node_status(self, node_execution: WorkflowNodeExecution) -> Status:
-        span_status: Status = Status(StatusCode.UNSET)
-        if node_execution.status == WorkflowNodeExecutionStatus.SUCCEEDED:
-            span_status = Status(StatusCode.OK)
-        elif node_execution.status in [WorkflowNodeExecutionStatus.FAILED, WorkflowNodeExecutionStatus.EXCEPTION]:
-            span_status = Status(StatusCode.ERROR, str(node_execution.error))
-        return span_status
-
     def build_workflow_task_span(
-        self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution
+        self, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution, trace_metadata: TraceMetadata
     ) -> SpanData:
+        inputs_json = serialize_json_data(node_execution.inputs)
+        outputs_json = serialize_json_data(node_execution.outputs)
         return SpanData(
-            trace_id=trace_id,
-            parent_span_id=workflow_span_id,
+            trace_id=trace_metadata.trace_id,
+            parent_span_id=trace_metadata.workflow_span_id,
             span_id=convert_to_span_id(node_execution.id, "node"),
             name=node_execution.title,
             start_time=convert_datetime_to_nanoseconds(node_execution.created_at),
             end_time=convert_datetime_to_nanoseconds(node_execution.finished_at),
-            attributes={
-                GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id") or "",
-                GEN_AI_SPAN_KIND: GenAISpanKind.TASK.value,
-                GEN_AI_FRAMEWORK: "dify",
-                INPUT_VALUE: json.dumps(node_execution.inputs, ensure_ascii=False),
-                OUTPUT_VALUE: json.dumps(node_execution.outputs, ensure_ascii=False),
-            },
-            status=self.get_workflow_node_status(node_execution),
+            attributes=create_common_span_attributes(
+                session_id=trace_metadata.session_id,
+                user_id=trace_metadata.user_id,
+                span_kind=GenAISpanKind.TASK,
+                inputs=inputs_json,
+                outputs=outputs_json,
+            ),
+            status=get_workflow_node_status(node_execution),
+            links=trace_metadata.links,
         )
     def build_workflow_tool_span(
-        self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution
+        self, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution, trace_metadata: TraceMetadata
     ) -> SpanData:
         tool_des = {}
         if node_execution.metadata:
             tool_des = node_execution.metadata.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO, {})
+
+        inputs_json = serialize_json_data(node_execution.inputs or {})
+        outputs_json = serialize_json_data(node_execution.outputs)
+
         return SpanData(
-            trace_id=trace_id,
-            parent_span_id=workflow_span_id,
+            trace_id=trace_metadata.trace_id,
+            parent_span_id=trace_metadata.workflow_span_id,
             span_id=convert_to_span_id(node_execution.id, "node"),
             name=node_execution.title,
             start_time=convert_datetime_to_nanoseconds(node_execution.created_at),
             end_time=convert_datetime_to_nanoseconds(node_execution.finished_at),
             attributes={
-                GEN_AI_SPAN_KIND: GenAISpanKind.TOOL.value,
-                GEN_AI_FRAMEWORK: "dify",
+                **create_common_span_attributes(
+                    session_id=trace_metadata.session_id,
+                    user_id=trace_metadata.user_id,
+                    span_kind=GenAISpanKind.TOOL,
+                    inputs=inputs_json,
+                    outputs=outputs_json,
+                ),
                 TOOL_NAME: node_execution.title,
-                TOOL_DESCRIPTION: json.dumps(tool_des, ensure_ascii=False),
-                TOOL_PARAMETERS: json.dumps(node_execution.inputs or {}, ensure_ascii=False),
-                INPUT_VALUE: json.dumps(node_execution.inputs or {}, ensure_ascii=False),
-                OUTPUT_VALUE: json.dumps(node_execution.outputs, ensure_ascii=False),
+                TOOL_DESCRIPTION: serialize_json_data(tool_des),
+                TOOL_PARAMETERS: inputs_json,
             },
-            status=self.get_workflow_node_status(node_execution),
+            status=get_workflow_node_status(node_execution),
+            links=trace_metadata.links,
         )

     def build_workflow_retrieval_span(
-        self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution
+        self, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution, trace_metadata: TraceMetadata
     ) -> SpanData:
-        input_value = ""
-        if node_execution.inputs:
-            input_value = str(node_execution.inputs.get("query", ""))
-        output_value = ""
-        if node_execution.outputs:
-            output_value = json.dumps(node_execution.outputs.get("result", []), ensure_ascii=False)
+        input_value = str(node_execution.inputs.get("query", "")) if node_execution.inputs else ""
+        output_value = serialize_json_data(node_execution.outputs.get("result", [])) if node_execution.outputs else ""

         return SpanData(
-            trace_id=trace_id,
-            parent_span_id=workflow_span_id,
+            trace_id=trace_metadata.trace_id,
+            parent_span_id=trace_metadata.workflow_span_id,
             span_id=convert_to_span_id(node_execution.id, "node"),
             name=node_execution.title,
             start_time=convert_datetime_to_nanoseconds(node_execution.created_at),
             end_time=convert_datetime_to_nanoseconds(node_execution.finished_at),
             attributes={
-                GEN_AI_SPAN_KIND: GenAISpanKind.RETRIEVER.value,
-                GEN_AI_FRAMEWORK: "dify",
+                **create_common_span_attributes(
+                    session_id=trace_metadata.session_id,
+                    user_id=trace_metadata.user_id,
+                    span_kind=GenAISpanKind.RETRIEVER,
+                    inputs=input_value,
+                    outputs=output_value,
+                ),
                 RETRIEVAL_QUERY: input_value,
                 RETRIEVAL_DOCUMENT: output_value,
-                INPUT_VALUE: input_value,
-                OUTPUT_VALUE: output_value,
             },
-            status=self.get_workflow_node_status(node_execution),
+            status=get_workflow_node_status(node_execution),
+            links=trace_metadata.links,
         )
     def build_workflow_llm_span(
-        self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution
+        self, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution, trace_metadata: TraceMetadata
     ) -> SpanData:
         process_data = node_execution.process_data or {}
         outputs = node_execution.outputs or {}
         usage_data = process_data.get("usage", {}) if "usage" in process_data else outputs.get("usage", {})

+        prompts_json = serialize_json_data(process_data.get("prompts", []))
+        text_output = str(outputs.get("text", ""))
+
         return SpanData(
-            trace_id=trace_id,
-            parent_span_id=workflow_span_id,
+            trace_id=trace_metadata.trace_id,
+            parent_span_id=trace_metadata.workflow_span_id,
             span_id=convert_to_span_id(node_execution.id, "node"),
             name=node_execution.title,
             start_time=convert_datetime_to_nanoseconds(node_execution.created_at),
             end_time=convert_datetime_to_nanoseconds(node_execution.finished_at),
             attributes={
-                GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id") or "",
-                GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value,
-                GEN_AI_FRAMEWORK: "dify",
+                **create_common_span_attributes(
+                    session_id=trace_metadata.session_id,
+                    user_id=trace_metadata.user_id,
+                    span_kind=GenAISpanKind.LLM,
+                    inputs=prompts_json,
+                    outputs=text_output,
+                ),
                 GEN_AI_MODEL_NAME: process_data.get("model_name") or "",
                 GEN_AI_SYSTEM: process_data.get("model_provider") or "",
                 GEN_AI_USAGE_INPUT_TOKENS: str(usage_data.get("prompt_tokens", 0)),
                 GEN_AI_USAGE_OUTPUT_TOKENS: str(usage_data.get("completion_tokens", 0)),
                 GEN_AI_USAGE_TOTAL_TOKENS: str(usage_data.get("total_tokens", 0)),
-                GEN_AI_PROMPT: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
-                GEN_AI_COMPLETION: str(outputs.get("text", "")),
+                GEN_AI_PROMPT: prompts_json,
+                GEN_AI_COMPLETION: text_output,
                 GEN_AI_RESPONSE_FINISH_REASON: outputs.get("finish_reason") or "",
-                INPUT_VALUE: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
-                OUTPUT_VALUE: str(outputs.get("text", "")),
             },
-            status=self.get_workflow_node_status(node_execution),
+            status=get_workflow_node_status(node_execution),
+            links=trace_metadata.links,
         )
-    def add_workflow_span(
-        self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, links: Sequence[Link]
-    ):
+    def add_workflow_span(self, trace_info: WorkflowTraceInfo, trace_metadata: TraceMetadata):
         message_span_id = None
         if trace_info.message_id:
             message_span_id = convert_to_span_id(trace_info.message_id, "message")
-        user_id = trace_info.metadata.get("user_id")
-        status: Status = Status(StatusCode.OK)
-        if trace_info.error:
-            status = Status(StatusCode.ERROR, trace_info.error)
-        if message_span_id:  # chatflow
+        status = create_status_from_error(trace_info.error)
+
+        inputs_json = serialize_json_data(trace_info.workflow_run_inputs)
+        outputs_json = serialize_json_data(trace_info.workflow_run_outputs)
+
+        if message_span_id:
             message_span = SpanData(
-                trace_id=trace_id,
+                trace_id=trace_metadata.trace_id,
                 parent_span_id=None,
                 span_id=message_span_id,
                 name="message",
                 start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
                 end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
-                attributes={
-                    GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id") or "",
-                    GEN_AI_USER_ID: str(user_id),
-                    GEN_AI_SPAN_KIND: GenAISpanKind.CHAIN.value,
-                    GEN_AI_FRAMEWORK: "dify",
-                    INPUT_VALUE: trace_info.workflow_run_inputs.get("sys.query") or "",
-                    OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False),
-                },
+                attributes=create_common_span_attributes(
+                    session_id=trace_metadata.session_id,
+                    user_id=trace_metadata.user_id,
+                    span_kind=GenAISpanKind.CHAIN,
+                    inputs=trace_info.workflow_run_inputs.get("sys.query") or "",
+                    outputs=outputs_json,
+                ),
                 status=status,
-                links=links,
+                links=trace_metadata.links,
             )
             self.trace_client.add_span(message_span)

         workflow_span = SpanData(
-            trace_id=trace_id,
+            trace_id=trace_metadata.trace_id,
             parent_span_id=message_span_id,
-            span_id=workflow_span_id,
+            span_id=trace_metadata.workflow_span_id,
             name="workflow",
             start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
             end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
-            attributes={
-                GEN_AI_USER_ID: str(user_id),
-                GEN_AI_SPAN_KIND: GenAISpanKind.CHAIN.value,
-                GEN_AI_FRAMEWORK: "dify",
-                INPUT_VALUE: json.dumps(trace_info.workflow_run_inputs, ensure_ascii=False),
-                OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False),
-            },
+            attributes=create_common_span_attributes(
+                session_id=trace_metadata.session_id,
+                user_id=trace_metadata.user_id,
+                span_kind=GenAISpanKind.CHAIN,
+                inputs=inputs_json,
+                outputs=outputs_json,
+            ),
             status=status,
-            links=links,
+            links=trace_metadata.links,
        )
        self.trace_client.add_span(workflow_span)
     def suggested_question_trace(self, trace_info: SuggestedQuestionTraceInfo):
         message_id = trace_info.message_id
-        status: Status = Status(StatusCode.OK)
-        if trace_info.error:
-            status = Status(StatusCode.ERROR, trace_info.error)
+        status = create_status_from_error(trace_info.error)

-        trace_id = convert_to_trace_id(message_id)
-        links = []
-        if trace_info.trace_id:
-            links.append(create_link(trace_id_str=trace_info.trace_id))
+        trace_metadata = TraceMetadata(
+            trace_id=convert_to_trace_id(message_id),
+            workflow_span_id=0,
+            session_id=trace_info.metadata.get("conversation_id") or "",
+            user_id=str(trace_info.metadata.get("user_id") or ""),
+            links=create_links_from_trace_id(trace_info.trace_id),
+        )
+
+        inputs_json = serialize_json_data(trace_info.inputs)
+        suggested_question_json = serialize_json_data(trace_info.suggested_question)

         suggested_question_span = SpanData(
-            trace_id=trace_id,
+            trace_id=trace_metadata.trace_id,
             parent_span_id=convert_to_span_id(message_id, "message"),
             span_id=convert_to_span_id(message_id, "suggested_question"),
             name="suggested_question",
             start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
             end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
             attributes={
-                GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value,
-                GEN_AI_FRAMEWORK: "dify",
+                **create_common_span_attributes(
+                    session_id=trace_metadata.session_id,
+                    user_id=trace_metadata.user_id,
+                    span_kind=GenAISpanKind.LLM,
+                    inputs=inputs_json,
+                    outputs=suggested_question_json,
+                ),
                 GEN_AI_MODEL_NAME: trace_info.metadata.get("ls_model_name") or "",
                 GEN_AI_SYSTEM: trace_info.metadata.get("ls_provider") or "",
-                GEN_AI_PROMPT: json.dumps(trace_info.inputs, ensure_ascii=False),
-                GEN_AI_COMPLETION: json.dumps(trace_info.suggested_question, ensure_ascii=False),
-                INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
-                OUTPUT_VALUE: json.dumps(trace_info.suggested_question, ensure_ascii=False),
+                GEN_AI_PROMPT: inputs_json,
+                GEN_AI_COMPLETION: suggested_question_json,
             },
             status=status,
-            links=links,
+            links=trace_metadata.links,
         )
         self.trace_client.add_span(suggested_question_span)
-
-
-def extract_retrieval_documents(documents: list[Document]):
-    documents_data = []
-    for document in documents:
-        document_data = {
-            "content": document.page_content,
-            "metadata": {
-                "dataset_id": document.metadata.get("dataset_id"),
-                "doc_id": document.metadata.get("doc_id"),
-                "document_id": document.metadata.get("document_id"),
-            },
-            "score": document.metadata.get("score"),
-        }
-        documents_data.append(document_data)
-    return documents_data
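Taken together, the changes above replace per-method boilerplate (ID conversion, link building, status mapping, repeated json.dumps calls) with one TraceMetadata value built per trace plus a set of shared helpers. A minimal sketch of the resulting construction path, using only names that appear in this diff (simplified; not the full implementation):

    from core.ops.aliyun_trace.data_exporter.traceclient import (
        convert_to_span_id,
        convert_to_trace_id,
    )
    from core.ops.aliyun_trace.entities.aliyun_trace_entity import TraceMetadata
    from core.ops.aliyun_trace.utils import create_links_from_trace_id

    def build_trace_metadata(trace_info) -> TraceMetadata:
        # Built once per workflow run; every span builder reads from it instead of
        # recomputing trace/span IDs, session, user, and links.
        return TraceMetadata(
            trace_id=convert_to_trace_id(trace_info.workflow_run_id),
            workflow_span_id=convert_to_span_id(trace_info.workflow_run_id, "workflow"),
            session_id=trace_info.metadata.get("conversation_id") or "",
            user_id=str(trace_info.metadata.get("user_id") or ""),
            links=create_links_from_trace_id(trace_info.trace_id),
        )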
@@ -7,6 +7,8 @@ import uuid
 from collections import deque
 from collections.abc import Sequence
 from datetime import datetime
+from typing import Final
+from urllib.parse import urljoin

 import httpx
 from opentelemetry import trace as trace_api
@@ -20,8 +22,12 @@ from opentelemetry.trace import Link, SpanContext, TraceFlags
 from configs import dify_config
 from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData

-INVALID_SPAN_ID = 0x0000000000000000
-INVALID_TRACE_ID = 0x00000000000000000000000000000000
+INVALID_SPAN_ID: Final[int] = 0x0000000000000000
+INVALID_TRACE_ID: Final[int] = 0x00000000000000000000000000000000
+DEFAULT_TIMEOUT: Final[int] = 5
+DEFAULT_MAX_QUEUE_SIZE: Final[int] = 1000
+DEFAULT_SCHEDULE_DELAY_SEC: Final[int] = 5
+DEFAULT_MAX_EXPORT_BATCH_SIZE: Final[int] = 50

 logger = logging.getLogger(__name__)

@@ -31,9 +37,9 @@ class TraceClient:
         self,
         service_name: str,
         endpoint: str,
-        max_queue_size: int = 1000,
-        schedule_delay_sec: int = 5,
-        max_export_batch_size: int = 50,
+        max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE,
+        schedule_delay_sec: int = DEFAULT_SCHEDULE_DELAY_SEC,
+        max_export_batch_size: int = DEFAULT_MAX_EXPORT_BATCH_SIZE,
     ):
         self.endpoint = endpoint
         self.resource = Resource(
@@ -63,9 +69,9 @@ class TraceClient:
     def export(self, spans: Sequence[ReadableSpan]):
         self.exporter.export(spans)

-    def api_check(self):
+    def api_check(self) -> bool:
         try:
-            response = httpx.head(self.endpoint, timeout=5)
+            response = httpx.head(self.endpoint, timeout=DEFAULT_TIMEOUT)
             if response.status_code == 405:
                 return True
             else:
@@ -75,12 +81,13 @@ class TraceClient:
             logger.debug("AliyunTrace API check failed: %s", str(e))
             raise ValueError(f"AliyunTrace API check failed: {str(e)}")

-    def get_project_url(self):
+    def get_project_url(self) -> str:
         return "https://arms.console.aliyun.com/#/llm"

-    def add_span(self, span_data: SpanData):
+    def add_span(self, span_data: SpanData | None) -> None:
+        if span_data is None:
+            return
+
         span: ReadableSpan = self.span_builder.build_span(span_data)
         with self.condition:
             if len(self.queue) == self.max_queue_size:
@@ -92,14 +99,14 @@ class TraceClient:
             if len(self.queue) >= self.max_export_batch_size:
                 self.condition.notify()

-    def _worker(self):
+    def _worker(self) -> None:
         while not self.done:
             with self.condition:
                 if len(self.queue) < self.max_export_batch_size and not self.done:
                     self.condition.wait(timeout=self.schedule_delay_sec)
             self._export_batch()

-    def _export_batch(self):
+    def _export_batch(self) -> None:
         spans_to_export: list[ReadableSpan] = []
         with self.condition:
             while len(spans_to_export) < self.max_export_batch_size and self.queue:
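The worker/export pair above is a bounded-queue batcher: producers notify the condition variable once a full batch is queued, and the worker otherwise wakes on a timer to flush partial batches. A condensed, generic sketch of that pattern (illustrative, not the full TraceClient):

    import threading
    from collections import deque

    class BatchExporter:
        def __init__(self, batch_size: int = 50, delay_sec: int = 5):
            self.queue: deque = deque()
            self.batch_size = batch_size
            self.delay_sec = delay_sec
            self.condition = threading.Condition()
            self.done = False

        def add(self, item) -> None:
            with self.condition:
                self.queue.append(item)
                if len(self.queue) >= self.batch_size:
                    self.condition.notify()  # full batch ready: wake the worker early

        def worker(self) -> None:
            while not self.done:
                with self.condition:
                    if len(self.queue) < self.batch_size and not self.done:
                        self.condition.wait(timeout=self.delay_sec)  # timed flush for partial batches
                    batch = [self.queue.popleft() for _ in range(min(self.batch_size, len(self.queue)))]
                if batch:
                    print(f"exporting {len(batch)} item(s)")  # stand-in for exporter.export(batch)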
@@ -111,7 +118,7 @@ class TraceClient:
         except Exception as e:
             logger.debug("Error exporting spans: %s", e)

-    def shutdown(self):
+    def shutdown(self) -> None:
         with self.condition:
             self.done = True
             self.condition.notify_all()
@@ -121,7 +128,7 @@ class TraceClient:


 class SpanBuilder:
-    def __init__(self, resource):
+    def __init__(self, resource: Resource) -> None:
         self.resource = resource
         self.instrumentation_scope = InstrumentationScope(
             __name__,
@@ -167,8 +174,12 @@ class SpanBuilder:


 def create_link(trace_id_str: str) -> Link:
-    placeholder_span_id = 0x0000000000000000
-    trace_id = int(trace_id_str, 16)
+    placeholder_span_id = INVALID_SPAN_ID
+    try:
+        trace_id = int(trace_id_str, 16)
+    except ValueError as e:
+        raise ValueError(f"Invalid trace ID format: {trace_id_str}") from e
+
     span_context = SpanContext(
         trace_id=trace_id, span_id=placeholder_span_id, is_remote=False, trace_flags=TraceFlags(TraceFlags.SAMPLED)
     )
@@ -184,26 +195,29 @@ def generate_span_id() -> int:


 def convert_to_trace_id(uuid_v4: str | None) -> int:
+    if uuid_v4 is None:
+        raise ValueError("UUID cannot be None")
     try:
         uuid_obj = uuid.UUID(uuid_v4)
         return uuid_obj.int
-    except Exception as e:
-        raise ValueError(f"Invalid UUID input: {e}")
+    except ValueError as e:
+        raise ValueError(f"Invalid UUID input: {uuid_v4}") from e


 def convert_string_to_id(string: str | None) -> int:
     if not string:
         return generate_span_id()
     hash_bytes = hashlib.sha256(string.encode("utf-8")).digest()
-    id = int.from_bytes(hash_bytes[:8], byteorder="big", signed=False)
-    return id
+    return int.from_bytes(hash_bytes[:8], byteorder="big", signed=False)


 def convert_to_span_id(uuid_v4: str | None, span_type: str) -> int:
+    if uuid_v4 is None:
+        raise ValueError("UUID cannot be None")
     try:
         uuid_obj = uuid.UUID(uuid_v4)
-    except Exception as e:
-        raise ValueError(f"Invalid UUID input: {e}")
+    except ValueError as e:
+        raise ValueError(f"Invalid UUID input: {uuid_v4}") from e
     combined_key = f"{uuid_obj.hex}-{span_type}"
     return convert_string_to_id(combined_key)
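The conversion helpers above make trace and span IDs deterministic functions of Dify's UUIDs: the 128-bit trace ID is simply the UUID's integer value, and a span ID is the first 8 bytes of a SHA-256 digest over "<uuid_hex>-<span_type>". A self-contained sketch of the same derivation (the run ID here is illustrative):

    import hashlib
    import uuid

    def derive_trace_id(uuid_v4: str) -> int:
        # 128-bit trace ID: the UUID's integer value, stable across processes.
        return uuid.UUID(uuid_v4).int

    def derive_span_id(uuid_v4: str, span_type: str) -> int:
        # 64-bit span ID: first 8 bytes of SHA-256 over "<uuid_hex>-<span_type>",
        # so the "workflow" span for a given run always gets the same ID.
        key = f"{uuid.UUID(uuid_v4).hex}-{span_type}"
        return int.from_bytes(hashlib.sha256(key.encode("utf-8")).digest()[:8], byteorder="big", signed=False)

    run_id = str(uuid.uuid4())  # illustrative
    assert derive_span_id(run_id, "workflow") == derive_span_id(run_id, "workflow")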
@@ -212,5 +226,11 @@ def convert_datetime_to_nanoseconds(start_time_a: datetime | None) -> int | None
     if start_time_a is None:
         return None
     timestamp_in_seconds = start_time_a.timestamp()
-    timestamp_in_nanoseconds = int(timestamp_in_seconds * 1e9)
-    return timestamp_in_nanoseconds
+    return int(timestamp_in_seconds * 1e9)
+
+
+def build_endpoint(base_url: str, license_key: str) -> str:
+    if "log.aliyuncs.com" in base_url:  # cms2.0 endpoint
+        return urljoin(base_url, f"adapt_{license_key}/api/v1/traces")
+    else:  # xtrace endpoint
+        return urljoin(base_url, f"adapt_{license_key}/api/otlp/traces")
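build_endpoint chooses the upload path by hostname: base URLs containing log.aliyuncs.com (the SLS/CMS 2.0 backend) get adapt_<license_key>/api/v1/traces, and anything else is treated as an xtrace endpoint and gets adapt_<license_key>/api/otlp/traces. For example (placeholder license key, host taken from the tests later in this diff):

    from urllib.parse import urljoin

    # xtrace endpoint: host does not contain "log.aliyuncs.com"
    urljoin("http://tracing-cn-heyuan.arms.aliyuncs.com", "adapt_lk/api/otlp/traces")
    # -> 'http://tracing-cn-heyuan.arms.aliyuncs.com/adapt_lk/api/otlp/traces'

Note that urljoin resolves the relative path against the base URL, so a base URL that already carries a path (like the cms2.0 examples in the tests) has its last path segment replaced rather than appended to; this appears to be why the endpoint validator below is relaxed to allow paths.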
@@ -1,18 +1,33 @@
 from collections.abc import Sequence
+from dataclasses import dataclass
+from typing import Any

 from opentelemetry import trace as trace_api
 from opentelemetry.sdk.trace import Event, Status, StatusCode
 from pydantic import BaseModel, Field


+@dataclass
+class TraceMetadata:
+    """Metadata for trace operations, containing common attributes for all spans in a trace."""
+
+    trace_id: int
+    workflow_span_id: int
+    session_id: str
+    user_id: str
+    links: list[trace_api.Link]
+
+
 class SpanData(BaseModel):
     """Data model for span information in Aliyun trace system."""

     model_config = {"arbitrary_types_allowed": True}

     trace_id: int = Field(..., description="The unique identifier for the trace.")
     parent_span_id: int | None = Field(None, description="The ID of the parent span, if any.")
     span_id: int = Field(..., description="The unique identifier for this span.")
     name: str = Field(..., description="The name of the span.")
-    attributes: dict[str, str] = Field(default_factory=dict, description="Attributes associated with the span.")
+    attributes: dict[str, Any] = Field(default_factory=dict, description="Attributes associated with the span.")
     events: Sequence[Event] = Field(default_factory=list, description="Events recorded in the span.")
     links: Sequence[trace_api.Link] = Field(default_factory=list, description="Links to other spans.")
     status: Status = Field(default=Status(StatusCode.UNSET), description="The status of the span.")
@@ -1,56 +1,37 @@
 from enum import StrEnum
+from typing import Final

-# public
-GEN_AI_SESSION_ID = "gen_ai.session.id"
+# Public attributes
+GEN_AI_SESSION_ID: Final[str] = "gen_ai.session.id"
+GEN_AI_USER_ID: Final[str] = "gen_ai.user.id"
+GEN_AI_USER_NAME: Final[str] = "gen_ai.user.name"
+GEN_AI_SPAN_KIND: Final[str] = "gen_ai.span.kind"
+GEN_AI_FRAMEWORK: Final[str] = "gen_ai.framework"

-GEN_AI_USER_ID = "gen_ai.user.id"
+# Chain attributes
+INPUT_VALUE: Final[str] = "input.value"
+OUTPUT_VALUE: Final[str] = "output.value"

-GEN_AI_USER_NAME = "gen_ai.user.name"
+# Retriever attributes
+RETRIEVAL_QUERY: Final[str] = "retrieval.query"
+RETRIEVAL_DOCUMENT: Final[str] = "retrieval.document"

-GEN_AI_SPAN_KIND = "gen_ai.span.kind"
+# LLM attributes
+GEN_AI_MODEL_NAME: Final[str] = "gen_ai.model_name"
+GEN_AI_SYSTEM: Final[str] = "gen_ai.system"
+GEN_AI_USAGE_INPUT_TOKENS: Final[str] = "gen_ai.usage.input_tokens"
+GEN_AI_USAGE_OUTPUT_TOKENS: Final[str] = "gen_ai.usage.output_tokens"
+GEN_AI_USAGE_TOTAL_TOKENS: Final[str] = "gen_ai.usage.total_tokens"
+GEN_AI_PROMPT_TEMPLATE_TEMPLATE: Final[str] = "gen_ai.prompt_template.template"
+GEN_AI_PROMPT_TEMPLATE_VARIABLE: Final[str] = "gen_ai.prompt_template.variable"
+GEN_AI_PROMPT: Final[str] = "gen_ai.prompt"
+GEN_AI_COMPLETION: Final[str] = "gen_ai.completion"
+GEN_AI_RESPONSE_FINISH_REASON: Final[str] = "gen_ai.response.finish_reason"

-GEN_AI_FRAMEWORK = "gen_ai.framework"
-
-
-# Chain
-INPUT_VALUE = "input.value"
-
-OUTPUT_VALUE = "output.value"
-
-
-# Retriever
-RETRIEVAL_QUERY = "retrieval.query"
-
-RETRIEVAL_DOCUMENT = "retrieval.document"
-
-
-# LLM
-GEN_AI_MODEL_NAME = "gen_ai.model_name"
-
-GEN_AI_SYSTEM = "gen_ai.system"
-
-GEN_AI_USAGE_INPUT_TOKENS = "gen_ai.usage.input_tokens"
-
-GEN_AI_USAGE_OUTPUT_TOKENS = "gen_ai.usage.output_tokens"
-
-GEN_AI_USAGE_TOTAL_TOKENS = "gen_ai.usage.total_tokens"
-
-GEN_AI_PROMPT_TEMPLATE_TEMPLATE = "gen_ai.prompt_template.template"
-
-GEN_AI_PROMPT_TEMPLATE_VARIABLE = "gen_ai.prompt_template.variable"
-
-GEN_AI_PROMPT = "gen_ai.prompt"
-
-GEN_AI_COMPLETION = "gen_ai.completion"
-
-GEN_AI_RESPONSE_FINISH_REASON = "gen_ai.response.finish_reason"
-
-# Tool
-TOOL_NAME = "tool.name"
-
-TOOL_DESCRIPTION = "tool.description"
-
-TOOL_PARAMETERS = "tool.parameters"
+# Tool attributes
+TOOL_NAME: Final[str] = "tool.name"
+TOOL_DESCRIPTION: Final[str] = "tool.description"
+TOOL_PARAMETERS: Final[str] = "tool.parameters"


 class GenAISpanKind(StrEnum):
@@ -0,0 +1,95 @@
+import json
+from typing import Any
+
+from opentelemetry.trace import Link, Status, StatusCode
+
+from core.ops.aliyun_trace.entities.semconv import (
+    GEN_AI_FRAMEWORK,
+    GEN_AI_SESSION_ID,
+    GEN_AI_SPAN_KIND,
+    GEN_AI_USER_ID,
+    INPUT_VALUE,
+    OUTPUT_VALUE,
+    GenAISpanKind,
+)
+from core.rag.models.document import Document
+from core.workflow.entities import WorkflowNodeExecution
+from core.workflow.enums import WorkflowNodeExecutionStatus
+from extensions.ext_database import db
+from models import EndUser
+
+# Constants
+DEFAULT_JSON_ENSURE_ASCII = False
+DEFAULT_FRAMEWORK_NAME = "dify"
+
+
+def get_user_id_from_message_data(message_data) -> str:
+    user_id = message_data.from_account_id
+    if message_data.from_end_user_id:
+        end_user_data: EndUser | None = (
+            db.session.query(EndUser).where(EndUser.id == message_data.from_end_user_id).first()
+        )
+        if end_user_data is not None:
+            user_id = end_user_data.session_id
+    return user_id
+
+
+def create_status_from_error(error: str | None) -> Status:
+    if error:
+        return Status(StatusCode.ERROR, error)
+    return Status(StatusCode.OK)
+
+
+def get_workflow_node_status(node_execution: WorkflowNodeExecution) -> Status:
+    if node_execution.status == WorkflowNodeExecutionStatus.SUCCEEDED:
+        return Status(StatusCode.OK)
+    if node_execution.status in [WorkflowNodeExecutionStatus.FAILED, WorkflowNodeExecutionStatus.EXCEPTION]:
+        return Status(StatusCode.ERROR, str(node_execution.error))
+    return Status(StatusCode.UNSET)
+
+
+def create_links_from_trace_id(trace_id: str | None) -> list[Link]:
+    from core.ops.aliyun_trace.data_exporter.traceclient import create_link
+
+    links = []
+    if trace_id:
+        links.append(create_link(trace_id_str=trace_id))
+    return links
+
+
+def extract_retrieval_documents(documents: list[Document]) -> list[dict[str, Any]]:
+    documents_data = []
+    for document in documents:
+        document_data = {
+            "content": document.page_content,
+            "metadata": {
+                "dataset_id": document.metadata.get("dataset_id"),
+                "doc_id": document.metadata.get("doc_id"),
+                "document_id": document.metadata.get("document_id"),
+            },
+            "score": document.metadata.get("score"),
+        }
+        documents_data.append(document_data)
+    return documents_data
+
+
+def serialize_json_data(data: Any, ensure_ascii: bool = DEFAULT_JSON_ENSURE_ASCII) -> str:
+    return json.dumps(data, ensure_ascii=ensure_ascii)
+
+
+def create_common_span_attributes(
+    session_id: str = "",
+    user_id: str = "",
+    span_kind: str = GenAISpanKind.CHAIN,
+    framework: str = DEFAULT_FRAMEWORK_NAME,
+    inputs: str = "",
+    outputs: str = "",
+) -> dict[str, Any]:
+    return {
+        GEN_AI_SESSION_ID: session_id,
+        GEN_AI_USER_ID: user_id,
+        GEN_AI_SPAN_KIND: span_kind,
+        GEN_AI_FRAMEWORK: framework,
+        INPUT_VALUE: inputs,
+        OUTPUT_VALUE: outputs,
+    }
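These helpers collect what every trace method previously inlined. A short usage sketch of the shared-attribute builder (all values illustrative):

    from core.ops.aliyun_trace.entities.semconv import GenAISpanKind
    from core.ops.aliyun_trace.utils import create_common_span_attributes, serialize_json_data

    attrs = create_common_span_attributes(
        session_id="conv-123",  # illustrative conversation id
        user_id="user-456",     # illustrative user id
        span_kind=GenAISpanKind.CHAIN,
        inputs=serialize_json_data({"query": "hello"}),
        outputs=serialize_json_data({"answer": "hi"}),
    )
    # attrs carries the six shared keys (session, user, span kind, framework,
    # input.value, output.value); span builders either pass it directly or spread
    # it with ** and add kind-specific attributes such as TOOL_NAME on top.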
@@ -191,7 +191,8 @@ class AliyunConfig(BaseTracingConfig):
     @field_validator("endpoint")
     @classmethod
     def endpoint_validator(cls, v, info: ValidationInfo):
-        return cls.validate_endpoint_url(v, "https://tracing-analysis-dc-hz.aliyuncs.com")
+        # aliyun uses two URL formats, which may include a URL path
+        return validate_url_with_path(v, "https://tracing-analysis-dc-hz.aliyuncs.com")


 OPS_FILE_PATH = "ops_trace/"
@@ -170,7 +170,6 @@ INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH=4000
 WORKFLOW_MAX_EXECUTION_STEPS=500
 WORKFLOW_MAX_EXECUTION_TIME=1200
 WORKFLOW_CALL_MAX_DEPTH=5
-WORKFLOW_PARALLEL_DEPTH_LIMIT=3
 MAX_VARIABLE_SIZE=204800

 # App configuration
@@ -40,8 +40,6 @@ def test_dify_config(monkeypatch: pytest.MonkeyPatch):
     # annotated field with configured value
     assert config.HTTP_REQUEST_MAX_WRITE_TIMEOUT == 30

-    assert config.WORKFLOW_PARALLEL_DEPTH_LIMIT == 3
-
     # values from pyproject.toml
     assert Version(config.project.version) >= Version("1.0.0")

@@ -329,20 +329,20 @@ class TestAliyunConfig:
         assert config.endpoint == "https://tracing-analysis-dc-hz.aliyuncs.com"

     def test_endpoint_validation_with_path(self):
-        """Test endpoint validation normalizes URL by removing path"""
+        """Test endpoint validation preserves path for Aliyun endpoints"""
         config = AliyunConfig(
             license_key="test_license", endpoint="https://tracing-analysis-dc-hz.aliyuncs.com/api/v1/traces"
         )
-        assert config.endpoint == "https://tracing-analysis-dc-hz.aliyuncs.com"
+        assert config.endpoint == "https://tracing-analysis-dc-hz.aliyuncs.com/api/v1/traces"

     def test_endpoint_validation_invalid_scheme(self):
         """Test endpoint validation rejects invalid schemes"""
-        with pytest.raises(ValidationError, match="URL scheme must be one of"):
+        with pytest.raises(ValidationError, match="URL must start with https:// or http://"):
             AliyunConfig(license_key="test_license", endpoint="ftp://invalid.tracing-analysis-dc-hz.aliyuncs.com")

     def test_endpoint_validation_no_scheme(self):
         """Test endpoint validation rejects URLs without scheme"""
-        with pytest.raises(ValidationError, match="URL scheme must be one of"):
+        with pytest.raises(ValidationError, match="URL must start with https:// or http://"):
             AliyunConfig(license_key="test_license", endpoint="invalid.tracing-analysis-dc-hz.aliyuncs.com")

     def test_license_key_required(self):
@@ -350,6 +350,23 @@ class TestAliyunConfig:
         with pytest.raises(ValidationError):
             AliyunConfig(license_key="", endpoint="https://tracing-analysis-dc-hz.aliyuncs.com")

+    def test_valid_endpoint_format_examples(self):
+        """Test valid endpoint format examples from comments"""
+        valid_endpoints = [
+            # cms2.0 public endpoint
+            "https://proj-xtrace-123456-cn-heyuan.cn-heyuan.log.aliyuncs.com/apm/trace/opentelemetry",
+            # cms2.0 intranet endpoint
+            "https://proj-xtrace-123456-cn-heyuan.cn-heyuan-intranet.log.aliyuncs.com/apm/trace/opentelemetry",
+            # xtrace public endpoint
+            "http://tracing-cn-heyuan.arms.aliyuncs.com",
+            # xtrace intranet endpoint
+            "http://tracing-cn-heyuan-internal.arms.aliyuncs.com",
+        ]
+
+        for endpoint in valid_endpoints:
+            config = AliyunConfig(license_key="test_license", endpoint=endpoint)
+            assert config.endpoint == endpoint
+
+
 class TestConfigIntegration:
     """Integration tests for configuration classes"""
@@ -382,7 +399,7 @@ class TestConfigIntegration:
         assert arize_config.endpoint == "https://arize.com"
         assert phoenix_with_path_config.endpoint == "https://app.phoenix.arize.com/s/dify-integration"
         assert phoenix_without_path_config.endpoint == "https://app.phoenix.arize.com"
-        assert aliyun_config.endpoint == "https://tracing-analysis-dc-hz.aliyuncs.com"
+        assert aliyun_config.endpoint == "https://tracing-analysis-dc-hz.aliyuncs.com/api/v1/traces"

     def test_project_default_values(self):
         """Test that project default values are set correctly"""
@@ -881,7 +881,6 @@ WORKFLOW_MAX_EXECUTION_STEPS=500
 WORKFLOW_MAX_EXECUTION_TIME=1200
 WORKFLOW_CALL_MAX_DEPTH=5
 MAX_VARIABLE_SIZE=204800
-WORKFLOW_PARALLEL_DEPTH_LIMIT=3
 WORKFLOW_FILE_UPLOAD_LIMIT=10

 # GraphEngine Worker Pool Configuration
@@ -402,7 +402,6 @@ x-shared-env: &shared-api-worker-env
   WORKFLOW_MAX_EXECUTION_TIME: ${WORKFLOW_MAX_EXECUTION_TIME:-1200}
   WORKFLOW_CALL_MAX_DEPTH: ${WORKFLOW_CALL_MAX_DEPTH:-5}
   MAX_VARIABLE_SIZE: ${MAX_VARIABLE_SIZE:-204800}
-  WORKFLOW_PARALLEL_DEPTH_LIMIT: ${WORKFLOW_PARALLEL_DEPTH_LIMIT:-3}
   WORKFLOW_FILE_UPLOAD_LIMIT: ${WORKFLOW_FILE_UPLOAD_LIMIT:-10}
   GRAPH_ENGINE_MIN_WORKERS: ${GRAPH_ENGINE_MIN_WORKERS:-1}
   GRAPH_ENGINE_MAX_WORKERS: ${GRAPH_ENGINE_MAX_WORKERS:-10}
@@ -1,6 +1,7 @@
 import {
   createContext,
   useContext,
+  useEffect,
   useRef,
 } from 'react'
 import {
@@ -18,13 +19,11 @@ type Shape = {

 export const createFileStore = (
   value: FileEntity[] = [],
-  onChange?: (files: FileEntity[]) => void,
 ) => {
   return create<Shape>(set => ({
     files: value ? [...value] : [],
     setFiles: (files) => {
       set({ files })
-      onChange?.(files)
     },
   }))
 }
@@ -55,9 +54,35 @@ export const FileContextProvider = ({
   onChange,
 }: FileProviderProps) => {
   const storeRef = useRef<FileStore | undefined>(undefined)
+  const onChangeRef = useRef<FileProviderProps['onChange']>(onChange)
+  const isSyncingRef = useRef(false)
 
   if (!storeRef.current)
-    storeRef.current = createFileStore(value, onChange)
+    storeRef.current = createFileStore(value)
+
+  // keep latest onChange
+  useEffect(() => {
+    onChangeRef.current = onChange
+  }, [onChange])
+
+  // subscribe to store changes and call latest onChange
+  useEffect(() => {
+    const store = storeRef.current!
+    const unsubscribe = store.subscribe((state: Shape) => {
+      if (isSyncingRef.current) return
+      onChangeRef.current?.(state.files)
+    })
+    return unsubscribe
+  }, [])
+
+  // sync external value into internal store when value changes
+  useEffect(() => {
+    const store = storeRef.current!
+    const nextFiles = value ? [...value] : []
+    isSyncingRef.current = true
+    store.setState({ files: nextFiles })
+    isSyncingRef.current = false
+  }, [value])
 
   return (
     <FileContext.Provider value={storeRef.current}>
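Note: the refactor above decouples change propagation from the store factory. Instead of passing onChange into createFileStore, the provider subscribes to the store and forwards updates through a ref (so the latest callback is always used), while isSyncingRef stops external value syncs from echoing back out through onChange. A minimal standalone sketch of the same pattern, assuming a plain zustand store (FileEntity here is a stand-in type):

    import { create } from 'zustand'

    type FileEntity = { id: string; name: string } // stand-in for the real type
    type Shape = {
      files: FileEntity[]
      setFiles: (files: FileEntity[]) => void
    }

    const store = create<Shape>(set => ({
      files: [],
      setFiles: files => set({ files }),
    }))

    let isSyncing = false
    const onChange = (files: FileEntity[]) => console.log('changed:', files.length)

    // Every store write notifies onChange, unless it is an external sync.
    const unsubscribe = store.subscribe((state) => {
      if (isSyncing) return
      onChange(state.files)
    })

    // External value sync: write state without firing onChange.
    isSyncing = true
    store.setState({ files: [{ id: '1', name: 'a.txt' }] })
    isSyncing = false

    unsubscribe()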
@@ -14,16 +14,6 @@ export const usePipelineConfig = () => {
   const pipelineId = useStore(s => s.pipelineId)
   const workflowStore = useWorkflowStore()
 
-  const handleUpdateWorkflowConfig = useCallback((config: Record<string, any>) => {
-    const { setWorkflowConfig } = workflowStore.getState()
-
-    setWorkflowConfig(config)
-  }, [workflowStore])
-  useWorkflowConfig(
-    pipelineId ? `/rag/pipelines/${pipelineId}/workflows/draft/config` : '',
-    handleUpdateWorkflowConfig,
-  )
-
   const handleUpdateNodesDefaultConfigs = useCallback((nodesDefaultConfigs: Record<string, any> | Record<string, any>[]) => {
     const { setNodesDefaultConfigs } = workflowStore.getState()
     let res: Record<string, any> = {}
@@ -33,13 +33,6 @@ export const useWorkflowInit = () => {
     workflowStore.setState({ appId: appDetail.id, appName: appDetail.name })
   }, [appDetail.id, workflowStore])
 
-  const handleUpdateWorkflowConfig = useCallback((config: Record<string, any>) => {
-    const { setWorkflowConfig } = workflowStore.getState()
-
-    setWorkflowConfig(config)
-  }, [workflowStore])
-  useWorkflowConfig(`/apps/${appDetail.id}/workflows/draft/config`, handleUpdateWorkflowConfig)
-
   const handleUpdateWorkflowFileUploadConfig = useCallback((config: FileUploadConfigResponse) => {
     const { setFileUploadConfig } = workflowStore.getState()
     setFileUploadConfig(config)
@@ -35,8 +35,6 @@ export const NODE_LAYOUT_HORIZONTAL_PADDING = 60
 export const NODE_LAYOUT_VERTICAL_PADDING = 60
 export const NODE_LAYOUT_MIN_DISTANCE = 100
 
-export const PARALLEL_DEPTH_LIMIT = 3
-
 export const RETRIEVAL_OUTPUT_STRUCT = `{
   "content": "",
   "title": "",
@@ -72,7 +72,7 @@ export const useNodesInteractions = () => {
   const reactflow = useReactFlow()
   const { store: workflowHistoryStore } = useWorkflowHistoryStore()
   const { handleSyncWorkflowDraft } = useNodesSyncDraft()
-  const { checkNestedParallelLimit, getAfterNodesInSameBranch } = useWorkflow()
+  const { getAfterNodesInSameBranch } = useWorkflow()
   const { getNodesReadOnly } = useNodesReadOnly()
   const { getWorkflowReadOnly } = useWorkflowReadOnly()
   const { handleSetHelpline } = useHelpline()
@@ -438,21 +438,13 @@ export const useNodesInteractions = () => {
         draft.push(newEdge)
       })
 
-      if (checkNestedParallelLimit(newNodes, newEdges, targetNode)) {
-        setNodes(newNodes)
-        setEdges(newEdges)
+      setNodes(newNodes)
+      setEdges(newEdges)
 
-        handleSyncWorkflowDraft()
-        saveStateToHistory(WorkflowHistoryEvent.NodeConnect, {
-          nodeId: targetNode?.id,
-        })
-      }
-      else {
-        const { setConnectingNodePayload, setEnteringNodePayload }
-          = workflowStore.getState()
-        setConnectingNodePayload(undefined)
-        setEnteringNodePayload(undefined)
-      }
+      handleSyncWorkflowDraft()
+      saveStateToHistory(WorkflowHistoryEvent.NodeConnect, {
+        nodeId: targetNode?.id,
+      })
     },
     [
       getNodesReadOnly,
@@ -460,7 +452,6 @@ export const useNodesInteractions = () => {
       workflowStore,
       handleSyncWorkflowDraft,
      saveStateToHistory,
-      checkNestedParallelLimit,
     ],
   )
 
@@ -936,13 +927,8 @@ export const useNodesInteractions = () => {
         if (newEdge) draft.push(newEdge)
       })
 
-      if (checkNestedParallelLimit(newNodes, newEdges, prevNode)) {
-        setNodes(newNodes)
-        setEdges(newEdges)
-      }
-      else {
-        return false
-      }
+      setNodes(newNodes)
+      setEdges(newEdges)
     }
     if (!prevNodeId && nextNodeId) {
       const nextNodeIndex = nodes.findIndex(node => node.id === nextNodeId)
@@ -1089,17 +1075,11 @@ export const useNodesInteractions = () => {
         draft.push(newEdge)
       })
 
-      if (checkNestedParallelLimit(newNodes, newEdges, nextNode)) {
-        setNodes(newNodes)
-        setEdges(newEdges)
-      }
-      else {
-        return false
-      }
+      setNodes(newNodes)
+      setEdges(newEdges)
     }
     else {
-      if (checkNestedParallelLimit(newNodes, edges)) setNodes(newNodes)
-      else return false
+      setNodes(newNodes)
     }
   }
   if (prevNodeId && nextNodeId) {
@@ -1299,7 +1279,6 @@ export const useNodesInteractions = () => {
       saveStateToHistory,
       workflowStore,
       getAfterNodesInSameBranch,
-      checkNestedParallelLimit,
       nodesMetaDataMap,
     ],
   )
@@ -2,7 +2,6 @@ import {
   useCallback,
 } from 'react'
 import { uniqBy } from 'lodash-es'
-import { useTranslation } from 'react-i18next'
 import {
   getIncomers,
   getOutgoers,
|
|||
useStore,
|
||||
useWorkflowStore,
|
||||
} from '../store'
|
||||
import { getParallelInfo } from '../utils'
|
||||
import {
|
||||
getWorkflowEntryNode,
|
||||
isWorkflowEntryNode,
|
||||
} from '../utils/workflow-entry'
|
||||
import {
|
||||
PARALLEL_DEPTH_LIMIT,
|
||||
SUPPORT_OUTPUT_VARS_NODE,
|
||||
} from '../constants'
|
||||
import type { IterationNodeType } from '../nodes/iteration/types'
|
||||
|
|
@@ -48,7 +45,6 @@ import {
 import { CUSTOM_ITERATION_START_NODE } from '@/app/components/workflow/nodes/iteration-start/constants'
 import { CUSTOM_LOOP_START_NODE } from '@/app/components/workflow/nodes/loop-start/constants'
 import { basePath } from '@/utils/var'
-import { MAX_PARALLEL_LIMIT } from '@/config'
 import { useNodesMetaData } from '.'
 
 export const useIsChatMode = () => {
@@ -58,9 +54,7 @@ export const useIsChatMode = () => {
 }
 
 export const useWorkflow = () => {
-  const { t } = useTranslation()
   const store = useStoreApi()
-  const workflowStore = useWorkflowStore()
   const { getAvailableBlocks } = useAvailableBlocks()
   const { nodesMap } = useNodesMetaData()
 
@@ -322,20 +316,6 @@ export const useWorkflow = () => {
     return isUsed
   }, [isVarUsedInNodes])
 
-  const checkParallelLimit = useCallback((nodeId: string, nodeHandle = 'source') => {
-    const {
-      edges,
-    } = store.getState()
-    const connectedEdges = edges.filter(edge => edge.source === nodeId && edge.sourceHandle === nodeHandle)
-    if (connectedEdges.length > MAX_PARALLEL_LIMIT - 1) {
-      const { setShowTips } = workflowStore.getState()
-      setShowTips(t('workflow.common.parallelTip.limit', { num: MAX_PARALLEL_LIMIT }))
-      return false
-    }
-
-    return true
-  }, [store, workflowStore, t])
-
   const getRootNodesById = useCallback((nodeId: string) => {
     const {
       getNodes,
@@ -406,33 +386,6 @@ export const useWorkflow = () => {
     return startNodes
   }, [nodesMap, getRootNodesById])
 
-  const checkNestedParallelLimit = useCallback((nodes: Node[], edges: Edge[], targetNode?: Node) => {
-    const startNodes = getStartNodes(nodes, targetNode)
-
-    for (let i = 0; i < startNodes.length; i++) {
-      const {
-        parallelList,
-        hasAbnormalEdges,
-      } = getParallelInfo(startNodes[i], nodes, edges)
-      const { workflowConfig } = workflowStore.getState()
-
-      if (hasAbnormalEdges)
-        return false
-
-      for (let i = 0; i < parallelList.length; i++) {
-        const parallel = parallelList[i]
-
-        if (parallel.depth > (workflowConfig?.parallel_depth_limit || PARALLEL_DEPTH_LIMIT)) {
-          const { setShowTips } = workflowStore.getState()
-          setShowTips(t('workflow.common.parallelTip.depthLimit', { num: (workflowConfig?.parallel_depth_limit || PARALLEL_DEPTH_LIMIT) }))
-          return false
-        }
-      }
-    }
-
-    return true
-  }, [t, workflowStore, getStartNodes])
-
   const isValidConnection = useCallback(({ source, sourceHandle, target }: Connection) => {
     const {
       edges,
@@ -442,9 +395,6 @@ export const useWorkflow = () => {
     const sourceNode: Node = nodes.find(node => node.id === source)!
     const targetNode: Node = nodes.find(node => node.id === target)!
 
-    if (!checkParallelLimit(source!, sourceHandle || 'source'))
-      return false
-
     if (sourceNode.type === CUSTOM_NOTE_NODE || targetNode.type === CUSTOM_NOTE_NODE)
       return false
 
@@ -477,7 +427,7 @@ export const useWorkflow = () => {
     }
 
     return !hasCycle(targetNode)
-  }, [store, checkParallelLimit, getAvailableBlocks])
+  }, [store, getAvailableBlocks])
 
   const getNode = useCallback((nodeId?: string) => {
     const { getNodes } = store.getState()
@@ -496,8 +446,6 @@ export const useWorkflow = () => {
     isVarUsedInNodes,
     removeUsedVarInNodes,
     isNodeVarsUsedInNodes,
-    checkParallelLimit,
-    checkNestedParallelLimit,
     isValidConnection,
     getBeforeNodeById,
     getIterationNodeChildren,
@@ -71,7 +71,6 @@ import PanelContextmenu from './panel-contextmenu'
 import NodeContextmenu from './node-contextmenu'
 import SelectionContextmenu from './selection-contextmenu'
 import SyncingDataModal from './syncing-data-modal'
-import LimitTips from './limit-tips'
 import { setupScrollToNodeListener } from './utils/node-navigation'
 import {
   useStore,
@@ -378,7 +377,6 @@ export const Workflow: FC<WorkflowProps> = memo(({
           />
         )
       }
-      <LimitTips />
       {children}
       <ReactFlow
         nodeTypes={nodeTypes}
@@ -1,39 +0,0 @@
-import {
-  RiAlertFill,
-  RiCloseLine,
-} from '@remixicon/react'
-import { useStore } from './store'
-import ActionButton from '@/app/components/base/action-button'
-
-const LimitTips = () => {
-  const showTips = useStore(s => s.showTips)
-  const setShowTips = useStore(s => s.setShowTips)
-
-  if (!showTips)
-    return null
-
-  return (
-    <div className='absolute bottom-16 left-1/2 z-[9] flex h-10 -translate-x-1/2 items-center rounded-xl border border-components-panel-border bg-components-panel-bg-blur p-2 shadow-md'>
-      <div
-        className='absolute inset-0 rounded-xl opacity-[0.4]'
-        style={{
-          background: 'linear-gradient(92deg, rgba(247, 144, 9, 0.25) 0%, rgba(255, 255, 255, 0.00) 100%)',
-        }}
-      ></div>
-      <div className='flex h-5 w-5 items-center justify-center'>
-        <RiAlertFill className='h-4 w-4 text-text-warning-secondary' />
-      </div>
-      <div className='system-xs-medium mx-1 px-1 text-text-primary'>
-        {showTips}
-      </div>
-      <ActionButton
-        className='z-[1]'
-        onClick={() => setShowTips('')}
-      >
-        <RiCloseLine className='h-4 w-4' />
-      </ActionButton>
-    </div>
-  )
-}
-
-export default LimitTips
@@ -12,7 +12,6 @@ import {
   useAvailableBlocks,
   useNodesInteractions,
   useNodesReadOnly,
-  useWorkflow,
 } from '@/app/components/workflow/hooks'
 import BlockSelector from '@/app/components/workflow/block-selector'
 import type {
@@ -39,7 +38,6 @@ const Add = ({
   const { handleNodeAdd } = useNodesInteractions()
   const { nodesReadOnly } = useNodesReadOnly()
   const { availableNextBlocks } = useAvailableBlocks(nodeData.type, nodeData.isInIteration || nodeData.isInLoop)
-  const { checkParallelLimit } = useWorkflow()
 
   const handleSelect = useCallback<OnSelectBlock>((type, toolDefaultValue) => {
     handleNodeAdd(
@@ -52,14 +50,11 @@ const Add = ({
         prevNodeSourceHandle: sourceHandle,
       },
     )
-  }, [nodeId, sourceHandle, handleNodeAdd])
+  }, [handleNodeAdd])
 
   const handleOpenChange = useCallback((newOpen: boolean) => {
-    if (newOpen && !checkParallelLimit(nodeId, sourceHandle))
-      return
-
     setOpen(newOpen)
-  }, [checkParallelLimit, nodeId, sourceHandle])
+  }, [])
 
   const tip = useMemo(() => {
     if (isFailBranch)
@@ -22,7 +22,6 @@ import {
   useIsChatMode,
   useNodesInteractions,
   useNodesReadOnly,
-  useWorkflow,
 } from '../../../hooks'
 import {
   useStore,
@@ -132,7 +131,6 @@ export const NodeSourceHandle = memo(({
   const { availableNextBlocks } = useAvailableBlocks(data.type, data.isInIteration || data.isInLoop)
   const isConnectable = !!availableNextBlocks.length
   const isChatMode = useIsChatMode()
-  const { checkParallelLimit } = useWorkflow()
 
   const connected = data._connectedSourceHandleIds?.includes(handleId)
   const handleOpenChange = useCallback((v: boolean) => {
@@ -140,9 +138,8 @@ export const NodeSourceHandle = memo(({
   }, [])
   const handleHandleClick = useCallback((e: MouseEvent) => {
     e.stopPropagation()
-    if (checkParallelLimit(id, handleId))
-      setOpen(v => !v)
-  }, [checkParallelLimit, id, handleId])
+    setOpen(v => !v)
+  }, [])
   const handleSelect = useCallback((type: BlockEnum, toolDefaultValue?: ToolDefaultValue) => {
     handleNodeAdd(
       {
@@ -43,6 +43,7 @@ import type { WebhookTriggerNodeType } from '@/app/components/workflow/nodes/tri
 
 import {
   AGENT_OUTPUT_STRUCT,
+  FILE_STRUCT,
   HTTP_REQUEST_OUTPUT_STRUCT,
   KNOWLEDGE_RETRIEVAL_OUTPUT_STRUCT,
   LLM_OUTPUT_STRUCT,
@@ -139,6 +140,10 @@ export const varTypeToStructType = (type: VarType): Type => {
       [VarType.boolean]: Type.boolean,
       [VarType.object]: Type.object,
       [VarType.array]: Type.array,
+      [VarType.arrayString]: Type.array,
+      [VarType.arrayNumber]: Type.array,
+      [VarType.arrayObject]: Type.array,
+      [VarType.arrayFile]: Type.array,
     } as any
   )[type] || Type.string
 )
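Note: with the four entries added above, every array-typed VarType now maps to Type.array instead of falling through to the Type.string default. A hedged sketch of the lookup's behavior (the enum members below are illustrative stand-ins, not the real definitions):

    enum VarType { string = 'string', arrayString = 'array[string]', arrayFile = 'array[file]' }
    enum Type { string = 'string', array = 'array' }

    const varTypeToStructType = (type: VarType): Type => (
      ({
        [VarType.string]: Type.string,
        [VarType.arrayString]: Type.array,
        [VarType.arrayFile]: Type.array,
      } as Record<string, Type>)[type] || Type.string
    )

    // Before the change, array subtypes fell through to the default:
    varTypeToStructType(VarType.arrayString) // Type.array (previously Type.string)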
@@ -283,15 +288,6 @@ const findExceptVarInObject = (
         children: filteredObj.children,
       }
     })
-
-    if (isFile && Array.isArray(childrenResult)) {
-      if (childrenResult.length === 0) {
-        childrenResult = OUTPUT_FILE_SUB_VARIABLES.map(key => ({
-          variable: key,
-          type: key === 'size' ? VarType.number : VarType.string,
-        }))
-      }
-    }
   }
   else {
     childrenResult = []
@@ -607,17 +603,15 @@ const formatItem = (
           variable: outputKey,
           type:
             output.type === 'array'
-              ? (`Array[${
-                output.items?.type
-                  ? output.items.type.slice(0, 1).toLocaleUpperCase()
-                  + output.items.type.slice(1)
-                  : 'Unknown'
+              ? (`Array[${output.items?.type
+                ? output.items.type.slice(0, 1).toLocaleUpperCase()
+                + output.items.type.slice(1)
+                : 'Unknown'
               }]` as VarType)
-              : (`${
-                output.type
-                  ? output.type.slice(0, 1).toLocaleUpperCase()
-                  + output.type.slice(1)
-                  : 'Unknown'
+              : (`${output.type
+                ? output.type.slice(0, 1).toLocaleUpperCase()
+                + output.type.slice(1)
+                : 'Unknown'
               }` as VarType),
         })
       },
@@ -711,9 +705,10 @@ const formatItem = (
   const children = (() => {
     if (isFile) {
       return OUTPUT_FILE_SUB_VARIABLES.map((key) => {
+        const def = FILE_STRUCT.find(c => c.variable === key)
         return {
           variable: key,
-          type: key === 'size' ? VarType.number : VarType.string,
+          type: def?.type || VarType.string,
         }
       })
     }
@@ -735,9 +730,10 @@ const formatItem = (
     if (isFile) {
       return {
         children: OUTPUT_FILE_SUB_VARIABLES.map((key) => {
+          const def = FILE_STRUCT.find(c => c.variable === key)
           return {
             variable: key,
-            type: key === 'size' ? VarType.number : VarType.string,
+            type: def?.type || VarType.string,
           }
         }),
       }
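Note: both hunks above replace the hard-coded `key === 'size' ? number : string` guess with a lookup into FILE_STRUCT, so file sub-variable types stay in sync with the canonical struct definition. A minimal sketch of the lookup; the values given for FILE_STRUCT and OUTPUT_FILE_SUB_VARIABLES are illustrative assumptions:

    enum VarType { string = 'string', number = 'number' }

    // Illustrative stand-ins; the real constants live in workflow/constants.
    const FILE_STRUCT = [
      { variable: 'name', type: VarType.string },
      { variable: 'size', type: VarType.number },
    ]
    const OUTPUT_FILE_SUB_VARIABLES = ['name', 'size', 'url'] as const

    const children = OUTPUT_FILE_SUB_VARIABLES.map((key) => {
      const def = FILE_STRUCT.find(c => c.variable === key)
      // Keys missing from FILE_STRUCT fall back to string.
      return { variable: key, type: def?.type || VarType.string }
    })
    // -> [{ name, string }, { size, number }, { url, string }]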
@@ -18,7 +18,6 @@ import { Type } from '../../../llm/types'
 import PickerStructurePanel from '@/app/components/workflow/nodes/_base/components/variable/object-child-tree-panel/picker'
 import { isSpecialVar, varTypeToStructType } from './utils'
 import type { Field } from '@/app/components/workflow/nodes/llm/types'
-import { FILE_STRUCT } from '@/app/components/workflow/constants'
 import { noop } from 'lodash-es'
 import { CodeAssistant, MagicEdit } from '@/app/components/base/icons/src/vender/line/general'
 import ManageInputField from './manage-input-field'
@@ -106,8 +105,9 @@ const Item: FC<ItemProps> = ({
 
   const objStructuredOutput: StructuredOutput | null = useMemo(() => {
     if (!isObj) return null
-    const properties: Record<string, Field> = {};
-    (isFile ? FILE_STRUCT : (itemData.children as Var[])).forEach((c) => {
+    const properties: Record<string, Field> = {}
+    const childrenVars = (itemData.children as Var[]) || []
+    childrenVars.forEach((c) => {
       properties[c.variable] = {
         type: varTypeToStructType(c.type),
       }
@@ -120,7 +120,7 @@ const Item: FC<ItemProps> = ({
         additionalProperties: false,
       },
     }
-  }, [isFile, isObj, itemData.children])
+  }, [isObj, itemData.children])
 
   const structuredOutput = (() => {
     if (isStructureOutput)
@@ -448,4 +448,5 @@ const VarReferenceVars: FC<Props> = ({
     </>
   )
 }
+
 export default React.memo(VarReferenceVars)
@@ -55,6 +55,7 @@ const Panel: FC<NodePanelProps<ListFilterNodeType>> = ({
           value={inputs.variable || []}
           onChange={handleVarChanges}
           filterVar={filterVar}
           isSupportFileVar={false}
+          typePlaceHolder='Array'
         />
       </Field>
@@ -29,10 +29,6 @@ export type WorkflowSliceShape = {
   setControlPromptEditorRerenderKey: (controlPromptEditorRerenderKey: number) => void
   showImportDSLModal: boolean
   setShowImportDSLModal: (showImportDSLModal: boolean) => void
-  showTips: string
-  setShowTips: (showTips: string) => void
-  workflowConfig?: Record<string, any>
-  setWorkflowConfig: (workflowConfig: Record<string, any>) => void
   fileUploadConfig?: FileUploadConfigResponse
   setFileUploadConfig: (fileUploadConfig: FileUploadConfigResponse) => void
 }
@@ -59,10 +55,6 @@ export const createWorkflowSlice: StateCreator<WorkflowSliceShape> = set => ({
   setControlPromptEditorRerenderKey: controlPromptEditorRerenderKey => set(() => ({ controlPromptEditorRerenderKey })),
   showImportDSLModal: false,
   setShowImportDSLModal: showImportDSLModal => set(() => ({ showImportDSLModal })),
-  showTips: '',
-  setShowTips: showTips => set(() => ({ showTips })),
-  workflowConfig: undefined,
-  setWorkflowConfig: workflowConfig => set(() => ({ workflowConfig })),
   fileUploadConfig: undefined,
   setFileUploadConfig: fileUploadConfig => set(() => ({ fileUploadConfig })),
 })
@@ -1,12 +1,8 @@
 import {
-  getConnectedEdges,
-  getIncomers,
   getOutgoers,
 } from 'reactflow'
 import { v4 as uuid4 } from 'uuid'
 import {
-  groupBy,
-  isEqual,
   uniqBy,
 } from 'lodash-es'
 import type {
@@ -182,158 +178,6 @@ export const changeNodesAndEdgesId = (nodes: Node[], edges: Edge[]) => {
   return [newNodes, newEdges] as [Node[], Edge[]]
 }
 
-type ParallelInfoItem = {
-  parallelNodeId: string
-  depth: number
-  isBranch?: boolean
-}
-type NodeParallelInfo = {
-  parallelNodeId: string
-  edgeHandleId: string
-  depth: number
-}
-type NodeHandle = {
-  node: Node
-  handle: string
-}
-type NodeStreamInfo = {
-  upstreamNodes: Set<string>
-  downstreamEdges: Set<string>
-}
-export const getParallelInfo = (startNode: Node, nodes: Node[], edges: Edge[]) => {
-  if (!startNode)
-    throw new Error('Start node not found')
-
-  const parallelList = [] as ParallelInfoItem[]
-  const nextNodeHandles = [{ node: startNode, handle: 'source' }]
-  let hasAbnormalEdges = false
-
-  const traverse = (firstNodeHandle: NodeHandle) => {
-    const nodeEdgesSet = {} as Record<string, Set<string>>
-    const totalEdgesSet = new Set<string>()
-    const nextHandles = [firstNodeHandle]
-    const streamInfo = {} as Record<string, NodeStreamInfo>
-    const parallelListItem = {
-      parallelNodeId: '',
-      depth: 0,
-    } as ParallelInfoItem
-    const nodeParallelInfoMap = {} as Record<string, NodeParallelInfo>
-    nodeParallelInfoMap[firstNodeHandle.node.id] = {
-      parallelNodeId: '',
-      edgeHandleId: '',
-      depth: 0,
-    }
-
-    while (nextHandles.length) {
-      const currentNodeHandle = nextHandles.shift()!
-      const { node: currentNode, handle: currentHandle = 'source' } = currentNodeHandle
-      const currentNodeHandleKey = currentNode.id
-      const connectedEdges = edges.filter(edge => edge.source === currentNode.id && edge.sourceHandle === currentHandle)
-      const connectedEdgesLength = connectedEdges.length
-      const outgoers = nodes.filter(node => connectedEdges.some(edge => edge.target === node.id))
-      const incomers = getIncomers(currentNode, nodes, edges)
-
-      if (!streamInfo[currentNodeHandleKey]) {
-        streamInfo[currentNodeHandleKey] = {
-          upstreamNodes: new Set<string>(),
-          downstreamEdges: new Set<string>(),
-        }
-      }
-
-      if (nodeEdgesSet[currentNodeHandleKey]?.size > 0 && incomers.length > 1) {
-        const newSet = new Set<string>()
-        for (const item of totalEdgesSet) {
-          if (!streamInfo[currentNodeHandleKey].downstreamEdges.has(item))
-            newSet.add(item)
-        }
-        if (isEqual(nodeEdgesSet[currentNodeHandleKey], newSet)) {
-          parallelListItem.depth = nodeParallelInfoMap[currentNode.id].depth
-          nextNodeHandles.push({ node: currentNode, handle: currentHandle })
-          break
-        }
-      }
-
-      if (nodeParallelInfoMap[currentNode.id].depth > parallelListItem.depth)
-        parallelListItem.depth = nodeParallelInfoMap[currentNode.id].depth
-
-      outgoers.forEach((outgoer) => {
-        const outgoerConnectedEdges = getConnectedEdges([outgoer], edges).filter(edge => edge.source === outgoer.id)
-        const sourceEdgesGroup = groupBy(outgoerConnectedEdges, 'sourceHandle')
-        const incomers = getIncomers(outgoer, nodes, edges)
-
-        if (outgoers.length > 1 && incomers.length > 1)
-          hasAbnormalEdges = true
-
-        Object.keys(sourceEdgesGroup).forEach((sourceHandle) => {
-          nextHandles.push({ node: outgoer, handle: sourceHandle })
-        })
-        if (!outgoerConnectedEdges.length)
-          nextHandles.push({ node: outgoer, handle: 'source' })
-
-        const outgoerKey = outgoer.id
-        if (!nodeEdgesSet[outgoerKey])
-          nodeEdgesSet[outgoerKey] = new Set<string>()
-
-        if (nodeEdgesSet[currentNodeHandleKey]) {
-          for (const item of nodeEdgesSet[currentNodeHandleKey])
-            nodeEdgesSet[outgoerKey].add(item)
-        }
-
-        if (!streamInfo[outgoerKey]) {
-          streamInfo[outgoerKey] = {
-            upstreamNodes: new Set<string>(),
-            downstreamEdges: new Set<string>(),
-          }
-        }
-
-        if (!nodeParallelInfoMap[outgoer.id]) {
-          nodeParallelInfoMap[outgoer.id] = {
-            ...nodeParallelInfoMap[currentNode.id],
-          }
-        }
-
-        if (connectedEdgesLength > 1) {
-          const edge = connectedEdges.find(edge => edge.target === outgoer.id)!
-          nodeEdgesSet[outgoerKey].add(edge.id)
-          totalEdgesSet.add(edge.id)
-
-          streamInfo[currentNodeHandleKey].downstreamEdges.add(edge.id)
-          streamInfo[outgoerKey].upstreamNodes.add(currentNodeHandleKey)
-
-          for (const item of streamInfo[currentNodeHandleKey].upstreamNodes)
-            streamInfo[item].downstreamEdges.add(edge.id)
-
-          if (!parallelListItem.parallelNodeId)
-            parallelListItem.parallelNodeId = currentNode.id
-
-          const prevDepth = nodeParallelInfoMap[currentNode.id].depth + 1
-          const currentDepth = nodeParallelInfoMap[outgoer.id].depth
-
-          nodeParallelInfoMap[outgoer.id].depth = Math.max(prevDepth, currentDepth)
-        }
-        else {
-          for (const item of streamInfo[currentNodeHandleKey].upstreamNodes)
-            streamInfo[outgoerKey].upstreamNodes.add(item)
-
-          nodeParallelInfoMap[outgoer.id].depth = nodeParallelInfoMap[currentNode.id].depth
-        }
-      })
-    }
-
-    parallelList.push(parallelListItem)
-  }
-
-  while (nextNodeHandles.length) {
-    const nodeHandle = nextNodeHandles.shift()!
-    traverse(nodeHandle)
-  }
-
-  return {
-    parallelList,
-    hasAbnormalEdges,
-  }
-}
-
 export const hasErrorHandleNode = (nodeType?: BlockEnum) => {
   return nodeType === BlockEnum.LLM || nodeType === BlockEnum.Tool || nodeType === BlockEnum.HttpRequest || nodeType === BlockEnum.Code
 }
@@ -4,12 +4,12 @@ const translation = {
     title: '空白の知識パイプライン',
     description: 'データ処理と構造を完全に制御できるカスタムパイプラインをゼロから作成します。',
   },
-  backToKnowledge: '知識に戻る',
+  backToKnowledge: 'ナレッジベースに戻る',
   caution: '注意',
   importDSL: 'DSLファイルからインポートする',
   errorTip: 'ナレッジベースの作成に失敗しました',
-  createKnowledge: '知識を創造する',
-  successTip: '知識ベースが正常に作成されました',
+  createKnowledge: 'ナレッジベースを作成する',
+  successTip: 'ナレッジベースが正常に作成されました',
   },
   templates: {
     customized: 'カスタマイズされた',
@@ -21,10 +21,10 @@ const translation = {
     preview: 'プレビュー',
     dataSource: 'データソース',
     editInfo: '情報を編集する',
-    exportPipeline: '輸出パイプライン',
+    exportPipeline: 'パイプラインをエクスポートする',
     saveAndProcess: '保存して処理する',
     backToDataSource: 'データソースに戻る',
-    useTemplate: 'この知識パイプラインを使用してください',
+    useTemplate: 'このナレッジパイプラインを使用してください',
     process: 'プロセス',
   },
   deletePipeline: {
@@ -37,7 +37,7 @@ const translation = {
       tip: '<CustomLink>ドキュメントに移動</CustomLink>して、ドキュメントを追加または管理してください。',
     },
     error: {
-      message: '知識パイプラインの公開に失敗しました',
+      message: 'ナレッジパイプラインの公開に失敗しました',
     },
   },
   publishTemplate: {
@@ -147,19 +147,19 @@ const translation = {
       content: 'この操作は永久的です。以前の方法に戻すことはできません。変換することを確認してください。',
     },
     warning: 'この操作は元に戻せません。',
-    title: '知識パイプラインに変換する',
+    title: 'ナレッジパイプラインに変換する',
     successMessage: 'データセットをパイプラインに正常に変換しました',
     errorMessage: 'データセットをパイプラインに変換できませんでした',
-    descriptionChunk1: '既存の知識ベースを文書処理のためにナレッジパイプラインを使用するように変換できます。',
+    descriptionChunk1: '既存のナレッジベースを文書処理のためにナレッジパイプラインを使用するように変換できます。',
    descriptionChunk2: '— よりオープンで柔軟なアプローチを採用し、私たちのマーケットプレイスからのプラグインへのアクセスを提供します。これにより、すべての将来のドキュメントに新しい処理方法が適用されることになります。',
   },
-  knowledgeNameAndIcon: '知識の名前とアイコン',
+  knowledgeNameAndIcon: 'ナレッジの名前とアイコン',
   inputField: '入力フィールド',
   pipelineNameAndIcon: 'パイプライン名とアイコン',
   knowledgePermissions: '権限',
   knowledgeNameAndIconPlaceholder: 'ナレッジベースの名前を入力してください',
   editPipelineInfo: 'パイプライン情報を編集する',
-  knowledgeDescription: '知識の説明',
+  knowledgeDescription: 'ナレッジベースの説明',
   knowledgeDescriptionPlaceholder: 'このナレッジベースに何が含まれているかを説明してください。詳細な説明は、AIがデータセットの内容により正確にアクセスできるようにします。空の場合、Difyはデフォルトのヒット戦略を使用します。(オプション)',
 }
 
@@ -91,6 +91,7 @@ const remoteImageURLs = [hasSetWebPrefix ? new URL(`${process.env.NEXT_PUBLIC_WE
 /** @type {import('next').NextConfig} */
 const nextConfig = {
+  basePath: process.env.NEXT_PUBLIC_BASE_PATH || '',
   transpilePackages: ['echarts', 'zrender'],
   turbopack: {
     rules: codeInspectorPlugin({
       bundler: 'turbopack'