From b1f0a11d84ef06b8923faab0ee81618669fd6c84 Mon Sep 17 00:00:00 2001 From: zyssyz123 <916125788@qq.com> Date: Mon, 25 May 2026 18:08:58 +0800 Subject: [PATCH] feat: output declaration and inspector (#36618) Co-authored-by: Claude Opus 4.7 (1M context) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- api/core/workflow/node_factory.py | 8 + .../workflow/nodes/agent_v2/agent_node.py | 348 +++++++++++++----- .../nodes/agent_v2/file_tenant_validator.py | 46 +++ .../agent_v2/output_failure_orchestrator.py | 201 ++++++++++ .../nodes/agent_v2/output_type_checker.py | 244 ++++++++++++ .../nodes/agent_v2/runtime_request_builder.py | 78 +++- .../workflow/nodes/agent_v2/validators.py | 17 +- api/models/agent_config_entities.py | 220 ++++++++++- api/openapi/markdown/console-swagger.md | 64 +++- api/services/agent/composer_service.py | 35 ++ .../nodes/agent_v2/test_agent_node.py | 9 + .../agent_v2/test_file_tenant_validator.py | 53 +++ .../test_output_failure_orchestrator.py | 229 ++++++++++++ .../agent_v2/test_output_type_checker.py | 256 +++++++++++++ .../agent_v2/test_runtime_request_builder.py | 76 ++++ .../nodes/agent_v2/test_validators.py | 6 +- .../agent/test_agent_composer_entities.py | 28 +- .../services/agent/test_agent_services.py | 89 +++++ .../generated/api/console/apps/types.gen.ts | 27 +- .../generated/api/console/apps/zod.gen.ts | 81 +++- 20 files changed, 1957 insertions(+), 158 deletions(-) create mode 100644 api/core/workflow/nodes/agent_v2/file_tenant_validator.py create mode 100644 api/core/workflow/nodes/agent_v2/output_failure_orchestrator.py create mode 100644 api/core/workflow/nodes/agent_v2/output_type_checker.py create mode 100644 api/tests/unit_tests/core/workflow/nodes/agent_v2/test_file_tenant_validator.py create mode 100644 api/tests/unit_tests/core/workflow/nodes/agent_v2/test_output_failure_orchestrator.py create mode 100644 api/tests/unit_tests/core/workflow/nodes/agent_v2/test_output_type_checker.py 
diff --git a/api/core/workflow/node_factory.py b/api/core/workflow/node_factory.py index baaa536a5c..4a27b2c623 100644 --- a/api/core/workflow/node_factory.py +++ b/api/core/workflow/node_factory.py @@ -472,6 +472,9 @@ class DifyNodeFactory(NodeFactory): if issubclass(node_class, DifyAgentNode): from clients.agent_backend import AgentBackendRunEventAdapter, AgentBackendRunRequestBuilder from clients.agent_backend.factory import create_agent_backend_run_client + from core.workflow.nodes.agent_v2.file_tenant_validator import UploadFileTenantValidator + from core.workflow.nodes.agent_v2.output_failure_orchestrator import OutputFailureOrchestrator + from core.workflow.nodes.agent_v2.output_type_checker import PerOutputTypeChecker return { "binding_resolver": WorkflowAgentBindingResolver(), @@ -486,6 +489,11 @@ class DifyNodeFactory(NodeFactory): ), "event_adapter": AgentBackendRunEventAdapter(), "output_adapter": WorkflowAgentOutputAdapter(), + # Stage 4 §5/§7: per-output validation + failure orchestration. The + # tenant validator only queries upload_files when an output carries a + # file ref, so it stays cheap when outputs contain none. 
+ "type_checker": PerOutputTypeChecker(file_validator=UploadFileTenantValidator()), + "failure_orchestrator": OutputFailureOrchestrator(), } return { "strategy_resolver": self._agent_strategy_resolver, diff --git a/api/core/workflow/nodes/agent_v2/agent_node.py b/api/core/workflow/nodes/agent_v2/agent_node.py index 0409579a74..c209c6b1de 100644 --- a/api/core/workflow/nodes/agent_v2/agent_node.py +++ b/api/core/workflow/nodes/agent_v2/agent_node.py @@ -7,9 +7,11 @@ from clients.agent_backend import ( AgentBackendError, AgentBackendHTTPError, AgentBackendInternalEventType, + AgentBackendRunCancelledInternalEvent, AgentBackendRunClient, AgentBackendRunEventAdapter, AgentBackendRunFailedInternalEvent, + AgentBackendRunPausedInternalEvent, AgentBackendRunSucceededInternalEvent, AgentBackendStreamError, AgentBackendStreamInternalEvent, @@ -21,10 +23,18 @@ from core.workflow.system_variables import SystemVariableKey, get_system_text from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from graphon.node_events import NodeEventBase, NodeRunResult, StreamCompletedEvent from graphon.nodes.base.node import Node +from models.agent_config_entities import WorkflowNodeJobConfig from .binding_resolver import WorkflowAgentBindingError, WorkflowAgentBindingResolver from .entities import DifyAgentNodeData from .output_adapter import WorkflowAgentOutputAdapter +from .output_failure_orchestrator import ( + FailedOutput, + OutputFailureDecision, + OutputFailureKind, + OutputFailureOrchestrator, +) +from .output_type_checker import OutputTypeCheckOutcome, PerOutputTypeChecker from .runtime_request_builder import ( WorkflowAgentRuntimeBuildContext, WorkflowAgentRuntimeRequestBuilder, @@ -36,6 +46,17 @@ if TYPE_CHECKING: from graphon.runtime import GraphRuntimeState +# Stage 4 §5+§7: the terminal events that `_consume_event_stream` may return. 
+# Stream + started events are filtered out before we yield; transport errors +# are surfaced as a separate StreamCompletedEvent in the second tuple slot. +_TerminalAgentBackendEvent = ( + AgentBackendRunSucceededInternalEvent + | AgentBackendRunFailedInternalEvent + | AgentBackendRunCancelledInternalEvent + | AgentBackendRunPausedInternalEvent +) + + class DifyAgentNode(Node[DifyAgentNodeData]): node_type = BuiltinNodeTypes.AGENT @@ -51,6 +72,8 @@ class DifyAgentNode(Node[DifyAgentNodeData]): agent_backend_client: AgentBackendRunClient, event_adapter: AgentBackendRunEventAdapter, output_adapter: WorkflowAgentOutputAdapter, + type_checker: PerOutputTypeChecker, + failure_orchestrator: OutputFailureOrchestrator, ) -> None: super().__init__( node_id=node_id, @@ -63,6 +86,8 @@ class DifyAgentNode(Node[DifyAgentNodeData]): self._agent_backend_client = agent_backend_client self._event_adapter = event_adapter self._output_adapter = output_adapter + self._type_checker = type_checker + self._failure_orchestrator = failure_orchestrator @classmethod def version(cls) -> str: @@ -86,6 +111,7 @@ class DifyAgentNode(Node[DifyAgentNodeData]): } } + # ──── Setup: resolve binding once + extract declared outputs for stage 4 checks ──── try: bundle = self._binding_resolver.resolve( tenant_id=dify_ctx.tenant_id, @@ -93,32 +119,6 @@ class DifyAgentNode(Node[DifyAgentNodeData]): workflow_id=workflow_id, node_id=self._node_id, ) - runtime_request = self._runtime_request_builder.build( - WorkflowAgentRuntimeBuildContext( - dify_context=dify_ctx, - workflow_id=workflow_id, - workflow_run_id=workflow_run_id, - node_id=self._node_id, - node_execution_id=self.id, - variable_pool=self.graph_runtime_state.variable_pool, - binding=bundle.binding, - agent=bundle.agent, - snapshot=bundle.snapshot, - ) - ) - inputs = {"agent_backend_request": runtime_request.redacted_request} - metadata = dict(runtime_request.metadata) - process_data = { - "agent_id": bundle.agent.id, - "agent_config_snapshot_id": 
bundle.snapshot.id, - "binding_id": bundle.binding.id, - } - create_response = self._agent_backend_client.create_run(runtime_request.request) - metadata["agent_backend"] = { - **dict(metadata.get("agent_backend") or {}), - "run_id": create_response.run_id, - "status": create_response.status, - } except WorkflowAgentBindingError as error: yield self._failure_event( inputs=inputs, @@ -128,37 +128,195 @@ class DifyAgentNode(Node[DifyAgentNodeData]): error_type=error.error_code, ) return - except WorkflowAgentRuntimeRequestBuildError as error: - yield self._failure_event( - inputs=inputs, - process_data=process_data, - metadata=metadata, - error=str(error), - error_type=error.error_code, + + process_data = { + "agent_id": bundle.agent.id, + "agent_config_snapshot_id": bundle.snapshot.id, + "binding_id": bundle.binding.id, + } + + # Stage 4 §4.1 (D-3): use effective outputs so defaults flow through both + # the backend request and the post-run type check. + node_job = WorkflowNodeJobConfig.model_validate(bundle.binding.node_job_config_dict) + effective_outputs = list( + WorkflowAgentRuntimeRequestBuilder.effective_declared_outputs(list(node_job.declared_outputs)) + ) + outputs_by_name = {o.name: o for o in effective_outputs} + + # ──── Retry loop (Stage 4 §7) ──── + attempt = 0 + while True: + try: + runtime_request = self._runtime_request_builder.build( + WorkflowAgentRuntimeBuildContext( + dify_context=dify_ctx, + workflow_id=workflow_id, + workflow_run_id=workflow_run_id, + node_id=self._node_id, + node_execution_id=self.id, + variable_pool=self.graph_runtime_state.variable_pool, + binding=bundle.binding, + agent=bundle.agent, + snapshot=bundle.snapshot, + attempt=attempt, + ) + ) + except WorkflowAgentRuntimeRequestBuildError as error: + yield self._failure_event( + inputs=inputs, + process_data=process_data, + metadata=metadata, + error=str(error), + error_type=error.error_code, + ) + return + except Exception as error: + yield self._failure_event( + inputs=inputs, 
+ process_data=process_data, + metadata=metadata, + error=str(error), + error_type="agent_workflow_node_runtime_error", + ) + return + + # Capture inputs only from the first attempt so retry doesn't churn the + # node's "inputs" payload that ends up in the workflow detail view. + if attempt == 0: + inputs = {"agent_backend_request": runtime_request.redacted_request} + metadata = dict(runtime_request.metadata) + metadata["attempt"] = attempt + + try: + create_response = self._agent_backend_client.create_run(runtime_request.request) + except AgentBackendError as error: + yield self._failure_event( + inputs=inputs, + process_data=process_data, + metadata=metadata, + error=str(error), + error_type=self._agent_backend_error_type(error), + ) + return + + metadata["agent_backend"] = { + **dict(metadata.get("agent_backend") or {}), + "run_id": create_response.run_id, + "status": create_response.status, + } + + terminal_event, exhausted = self._consume_event_stream(create_response.run_id, metadata) + if exhausted is not None: + # Streaming error / unexpected end — surface immediately without + # retrying because the failure is transport-level. + yield exhausted + return + if terminal_event is None: + yield StreamCompletedEvent( + node_run_result=self._output_adapter.build_stream_exhausted_result( + inputs=inputs, + process_data=process_data, + metadata=metadata, + ) + ) + return + + # Non-success terminal (failed / cancelled / paused) skips per-output + # post-processing — the backend itself already failed. 
+ if not isinstance(terminal_event, AgentBackendRunSucceededInternalEvent): + yield StreamCompletedEvent( + node_run_result=self._output_adapter.build_failure_result( + event=terminal_event, + inputs=inputs, + process_data=process_data, + metadata=metadata, + ) + ) + return + + # ──── Stage 4: per-output type check ──── + type_check = self._type_checker.check( + declared_outputs=effective_outputs, + raw_output=terminal_event.output, + tenant_id=dify_ctx.tenant_id, ) - return - except AgentBackendError as error: - yield self._failure_event( - inputs=inputs, - process_data=process_data, - metadata=metadata, - error=str(error), - error_type=self._agent_backend_error_type(error), + self._record_type_check_metadata(metadata, type_check) + + if not type_check.has_failures: + yield StreamCompletedEvent( + node_run_result=self._output_adapter.build_success_result( + event=terminal_event, + inputs=inputs, + process_data=process_data, + metadata=metadata, + ) + ) + return + + # ──── Stage 4: orchestrate retry / default / fail ──── + failures = [ + FailedOutput( + declared=outputs_by_name[result.name], + failure_kind=OutputFailureKind.TYPE_CHECK, + reason=result.reason, + ) + for result in type_check.failures + if result.name in outputs_by_name + ] + outcome = self._failure_orchestrator.decide(failures=failures, current_attempt=attempt) + metadata["output_failure_decision"] = outcome.decision.value + metadata["output_failure_reason"] = outcome.primary_reason + + if outcome.decision == OutputFailureDecision.RETRY: + attempt = outcome.next_attempt + continue + + if outcome.decision == OutputFailureDecision.USE_DEFAULT: + patched_event = self._patch_event_with_defaults(terminal_event, outcome.per_output_actions) + yield StreamCompletedEvent( + node_run_result=self._output_adapter.build_success_result( + event=patched_event, + inputs=inputs, + process_data=process_data, + metadata=metadata, + ) + ) + return + + error_type = ( + "output_type_check_failed_fail_branch" + if 
outcome.decision == OutputFailureDecision.TAKE_FAIL_BRANCH + else "output_type_check_failed" ) - return - except Exception as error: yield self._failure_event( inputs=inputs, process_data=process_data, metadata=metadata, - error=str(error), - error_type="agent_workflow_node_runtime_error", + error=outcome.primary_reason, + error_type=error_type, ) return + def _consume_event_stream( + self, + run_id: str, + metadata: dict[str, Any], + ) -> tuple[ + _TerminalAgentBackendEvent | None, + StreamCompletedEvent | None, + ]: + """Consume the SSE stream for one Agent backend run. + + Returns a 2-tuple ``(terminal_event, transport_failure)``: + - ``terminal_event``: the first non-stream/non-started internal event, + or ``None`` if the stream ended without one. + - ``transport_failure``: a populated ``StreamCompletedEvent`` when the + stream itself errored (backend/HTTP/protocol fault). Mutually + exclusive with ``terminal_event``. + """ stream_event_count = 0 try: - for public_event in self._agent_backend_client.stream_events(create_response.run_id): + for public_event in self._agent_backend_client.stream_events(run_id): stream_event_count += 1 for internal_event in self._event_adapter.adapt(public_event): if internal_event.type == AgentBackendInternalEventType.RUN_STARTED: @@ -171,58 +329,78 @@ class DifyAgentNode(Node[DifyAgentNodeData]): **dict(metadata.get("agent_backend") or {}), "stream_event_count": stream_event_count, } - if isinstance(internal_event, AgentBackendRunSucceededInternalEvent): - yield StreamCompletedEvent( - node_run_result=self._output_adapter.build_success_result( - event=internal_event, - inputs=inputs, - process_data=process_data, - metadata=metadata, - ) - ) - return + # Narrow to the 4 known terminal event types so the caller + # can hand the result to ``build_failure_result`` (which is + # typed against the union). Anything else is a protocol- + # level surprise we surface as a stream error. 
if isinstance( internal_event, - AgentBackendRunFailedInternalEvent, - ) or internal_event.type in { - AgentBackendInternalEventType.RUN_CANCELLED, - AgentBackendInternalEventType.RUN_PAUSED, - }: - yield StreamCompletedEvent( - node_run_result=self._output_adapter.build_failure_result( - event=internal_event, - inputs=inputs, - process_data=process_data, - metadata=metadata, - ) - ) - return + AgentBackendRunSucceededInternalEvent + | AgentBackendRunFailedInternalEvent + | AgentBackendRunCancelledInternalEvent + | AgentBackendRunPausedInternalEvent, + ): + return internal_event, None + return None, self._failure_event( + inputs={}, + process_data={}, + metadata=metadata, + error=f"Unexpected internal event type {internal_event.type!r}", + error_type="agent_backend_stream_error", + ) except AgentBackendError as error: - yield self._failure_event( - inputs=inputs, - process_data=process_data, + return None, self._failure_event( + inputs={}, + process_data={}, metadata=metadata, error=str(error), error_type=self._agent_backend_error_type(error), ) - return except Exception as error: - yield self._failure_event( - inputs=inputs, - process_data=process_data, + return None, self._failure_event( + inputs={}, + process_data={}, metadata=metadata, error=str(error), error_type="agent_backend_stream_error", ) - return - yield StreamCompletedEvent( - node_run_result=self._output_adapter.build_stream_exhausted_result( - inputs=inputs, - process_data=process_data, - metadata=metadata, - ) - ) + return None, None + + @staticmethod + def _record_type_check_metadata(metadata: dict[str, Any], outcome: OutputTypeCheckOutcome) -> None: + # Surface enough detail in metadata for Inspector / debug logs without + # leaking the raw failing values (which may be sensitive). 
+ metadata["output_type_check"] = { + "passed": not outcome.has_failures, + "results": [ + { + "name": r.name, + "type": r.declared_type.value, + "status": r.status.value, + "reason": r.reason, + } + for r in outcome.results + ], + } + + @staticmethod + def _patch_event_with_defaults( + event: AgentBackendRunSucceededInternalEvent, + per_output_actions: Mapping[str, Any], + ) -> AgentBackendRunSucceededInternalEvent: + """Merge USE_DEFAULT replacements into the success event's output dict. + + The event is a frozen dataclass / Pydantic model; we copy with the + replacements applied so downstream code (output_adapter normalize) sees + the patched payload. + """ + if not per_output_actions: + return event + original = event.output if isinstance(event.output, Mapping) else {} + patched_output: dict[str, Any] = dict(original) + patched_output.update(per_output_actions) + return event.model_copy(update={"output": patched_output}) @staticmethod def _failure_event( diff --git a/api/core/workflow/nodes/agent_v2/file_tenant_validator.py b/api/core/workflow/nodes/agent_v2/file_tenant_validator.py new file mode 100644 index 0000000000..69b5297829 --- /dev/null +++ b/api/core/workflow/nodes/agent_v2/file_tenant_validator.py @@ -0,0 +1,46 @@ +"""Tenant-scope validator for file refs produced by Agent backend outputs. + +Stage 4 §5.3: every file output the Agent backend produces must resolve to an +``upload_files`` row that belongs to the current tenant; cross-tenant file +references must never be plumbed downstream. ``PerOutputTypeChecker`` accepts a +``FileTenantValidator`` Protocol so unit tests can stub the check without +hitting Postgres. + +This module supplies the production implementation that queries the +``upload_files`` table via SQLAlchemy. 
+""" + +from __future__ import annotations + +from uuid import UUID + +from sqlalchemy import select +from sqlalchemy.exc import DataError, SQLAlchemyError + +from core.db.session_factory import session_factory +from models.model import UploadFile + + +class UploadFileTenantValidator: + """Production ``FileTenantValidator`` backed by the ``upload_files`` table. + + Returns ``False`` (rejects the file) on any pathological input: empty + file_id/tenant_id, non-UUID file_id format, DB errors. The Agent backend + may produce arbitrary strings inside file refs since the schema only + asserts ``{type: string}``; treating malformed refs as invalid keeps the + workflow node from crashing on garbage backend output. + """ + + def is_owned_by_tenant(self, *, file_id: str, tenant_id: str) -> bool: + if not file_id or not tenant_id: + return False + try: + UUID(file_id) + except (ValueError, TypeError): + return False + try: + with session_factory.create_session() as session: + owner_tenant_id = session.scalar(select(UploadFile.tenant_id).where(UploadFile.id == file_id)) + except (DataError, SQLAlchemyError): + return False + return owner_tenant_id == tenant_id diff --git a/api/core/workflow/nodes/agent_v2/output_failure_orchestrator.py b/api/core/workflow/nodes/agent_v2/output_failure_orchestrator.py new file mode 100644 index 0000000000..d9d060b660 --- /dev/null +++ b/api/core/workflow/nodes/agent_v2/output_failure_orchestrator.py @@ -0,0 +1,201 @@ +"""Per-output failure decision logic for Workflow Agent Node v2. + +Stage 4 §7. Pure orchestration: given a set of per-output failures plus their +configured ``DeclaredOutputFailureStrategy``, decide whether the workflow node +should retry the Agent backend run, take a fail branch, fall back to default +values, or fail outright. + +The orchestrator is intentionally state-free. 
The caller (``agent_node._run``) +owns the retry attempt counter and is responsible for actually issuing the +re-run; this module only computes the decision. +""" + +from __future__ import annotations + +from collections.abc import Mapping +from dataclasses import dataclass +from enum import StrEnum +from typing import Any + +from models.agent_config_entities import ( + DeclaredOutputConfig, + DeclaredOutputFailureStrategy, + OutputErrorStrategy, +) + + +class OutputFailureKind(StrEnum): + """Why the per-output post-processing failed.""" + + TYPE_CHECK = "type_check" + OUTPUT_CHECK = "output_check" + + +class OutputFailureDecision(StrEnum): + """What the runtime should do after collecting per-output failures.""" + + # Re-invoke Agent backend (entire node re-runs). Used while retry budget + # remains for at least one failed output. + RETRY = "retry" + # Replace each failed output's value with its declared default_value and + # surface the run as successful. + USE_DEFAULT = "use_default" + # Mark the workflow node as failed; halt downstream propagation. + FAIL_NODE = "fail_node" + # Surface the node as exception → route through fail branch outbound edge. + TAKE_FAIL_BRANCH = "take_fail_branch" + + +@dataclass(frozen=True, slots=True) +class FailedOutput: + """One failed output that the orchestrator needs to reason about.""" + + declared: DeclaredOutputConfig + failure_kind: OutputFailureKind + reason: str | None = None + + +@dataclass(frozen=True, slots=True) +class OutputFailureOutcome: + """Outcome of orchestrating one batch of failures. + + ``per_output_actions`` is non-empty only when ``decision == USE_DEFAULT``; + it maps the failed output's ``name`` to the value that should be merged + into the node's outputs in place of the failed value. + """ + + decision: OutputFailureDecision + per_output_actions: Mapping[str, Any] + next_attempt: int + primary_reason: str + failure_kinds: tuple[OutputFailureKind, ...] 
+ + +# Stage 4 §7 — precedence used to merge differing per-output strategies into a +# single node-level decision when multiple outputs fail at once. +# Smaller integer = lower priority. FAIL_BRANCH wins overall. +_STRATEGY_TERMINAL_RANK: dict[OutputErrorStrategy, int] = { + OutputErrorStrategy.DEFAULT_VALUE: 0, + OutputErrorStrategy.STOP: 1, + OutputErrorStrategy.FAIL_BRANCH: 2, +} + + +class OutputFailureOrchestrator: + """Pure decision engine for per-output failure handling.""" + + def decide( + self, + *, + failures: list[FailedOutput], + current_attempt: int, + ) -> OutputFailureOutcome: + """Compute the next action given a non-empty list of failures. + + ``current_attempt`` is zero-indexed: ``0`` means the failures come + from the first backend run, ``1`` from the first retry, etc. The + returned ``next_attempt`` is the value the caller should use for the + next iteration when ``decision == RETRY``. + """ + if not failures: + raise ValueError("OutputFailureOrchestrator.decide() requires at least one failure") + + # Stage 4 §7: any output whose retry budget is not yet exhausted forces + # a whole-node retry. The effective max-retries is the *maximum* across + # all currently-failed outputs, so retries continue until every failing + # output's budget is spent (or the output passes its check). + retry_budget = max( + (f.declared.failure_strategy.retry.max_retries if f.declared.failure_strategy.retry.enabled else 0) + for f in failures + ) + if current_attempt < retry_budget: + return OutputFailureOutcome( + decision=OutputFailureDecision.RETRY, + per_output_actions={}, + next_attempt=current_attempt + 1, + primary_reason=self._summarize(failures), + failure_kinds=self._failure_kinds(failures), + ) + + # Retry budget exhausted: collapse each per-output terminal action into + # a single node-level decision via the precedence table. 
+ merged = self._merge_terminal_decisions(failures) + per_output_actions: dict[str, Any] = {} + if merged == OutputFailureDecision.USE_DEFAULT: + for failure in failures: + strategy = failure.declared.failure_strategy + if strategy.on_failure == OutputErrorStrategy.DEFAULT_VALUE: + per_output_actions[failure.declared.name] = strategy.default_value + + return OutputFailureOutcome( + decision=merged, + per_output_actions=per_output_actions, + next_attempt=current_attempt, + primary_reason=self._summarize(failures), + failure_kinds=self._failure_kinds(failures), + ) + + @staticmethod + def _merge_terminal_decisions(failures: list[FailedOutput]) -> OutputFailureDecision: + # Pick the highest-precedence strategy across all failures. + winning: OutputErrorStrategy = OutputErrorStrategy.DEFAULT_VALUE + winning_rank = -1 + for failure in failures: + strategy = failure.declared.failure_strategy.on_failure + rank = _STRATEGY_TERMINAL_RANK[strategy] + if rank > winning_rank: + winning = strategy + winning_rank = rank + return _TERMINAL_STRATEGY_TO_DECISION[winning] + + @staticmethod + def _summarize(failures: list[FailedOutput]) -> str: + parts: list[str] = [] + for failure in failures: + reason = failure.reason or "no reason recorded" + parts.append(f"{failure.declared.name}[{failure.failure_kind.value}]: {reason}") + return "; ".join(parts) + + @staticmethod + def _failure_kinds(failures: list[FailedOutput]) -> tuple[OutputFailureKind, ...]: + seen: list[OutputFailureKind] = [] + for failure in failures: + if failure.failure_kind not in seen: + seen.append(failure.failure_kind) + return tuple(seen) + + +_TERMINAL_STRATEGY_TO_DECISION: dict[OutputErrorStrategy, OutputFailureDecision] = { + OutputErrorStrategy.STOP: OutputFailureDecision.FAIL_NODE, + OutputErrorStrategy.DEFAULT_VALUE: OutputFailureDecision.USE_DEFAULT, + OutputErrorStrategy.FAIL_BRANCH: OutputFailureDecision.TAKE_FAIL_BRANCH, +} + + +def retry_idempotency_key( + *, + workflow_run_id: str | None, + 
node_execution_id: str, + attempt: int, +) -> str: + """Compute the Agent backend ``idempotency_key`` for a given attempt. + + Stage 4 §7 / decision D-4: each retry must use a distinct key so the + backend's protocol-level dedup doesn't return the previous run's id. + First attempt (attempt=0) matches the pre-stage-4 key shape so logs stay + backward compatible. + """ + base = f"{workflow_run_id}:{node_execution_id}" if workflow_run_id else node_execution_id + if attempt <= 0: + return base + return f"{base}:retry-{attempt}" + + +def build_failure_strategy_for(declared: DeclaredOutputConfig) -> DeclaredOutputFailureStrategy: + """Convenience accessor that always returns a populated strategy. + + Existing callers that read ``output.failure_strategy`` already get a + populated default thanks to the BaseModel default_factory, but this helper + documents the contract and gives the orchestrator's tests a single hook. + """ + return declared.failure_strategy diff --git a/api/core/workflow/nodes/agent_v2/output_type_checker.py b/api/core/workflow/nodes/agent_v2/output_type_checker.py new file mode 100644 index 0000000000..50981f3faa --- /dev/null +++ b/api/core/workflow/nodes/agent_v2/output_type_checker.py @@ -0,0 +1,244 @@ +"""Per-output runtime type checker for Workflow Agent Node v2. + +Stage 4 §5: after Agent backend returns ``run_succeeded.data.output`` (a JSON +object that already passed the ``dify.output`` layer's JSON Schema validation +inside pydantic-ai), the API side runs a *second* pass that: + +1. Locates each declared output by name in the backend payload. +2. Asserts the value's shape against the declared ``DeclaredOutputType`` + (including array items and file ref objects). +3. For file outputs, verifies the referenced ``file_id`` resolves to a file + owned by the current tenant (PRD §5.3 file output reference safety). + +The checker is intentionally pure: it takes data in and returns a structured +outcome out. 
``FileTenantValidator`` is injected as a Protocol so unit tests +can stub tenant resolution without DB access. +""" + +from __future__ import annotations + +from collections.abc import Mapping +from dataclasses import dataclass +from enum import StrEnum +from typing import Any, Protocol + +from models.agent_config_entities import ( + DeclaredArrayItem, + DeclaredOutputConfig, + DeclaredOutputType, +) + + +class OutputTypeCheckStatus(StrEnum): + """Lifecycle status of a single declared output after type check.""" + + READY = "ready" + NOT_PRODUCED = "not_produced" + TYPE_CHECK_FAILED = "type_check_failed" + + +@dataclass(frozen=True, slots=True) +class OutputTypeCheckResult: + """Outcome of type-checking one declared output. + + ``value`` carries the raw payload value as it appeared in the backend + response. For ``TYPE_CHECK_FAILED`` results the value is preserved so the + Failure Orchestrator can decide whether to surface it (e.g. for debug + metadata) — it is **not** safe to feed into downstream nodes. + """ + + name: str + declared_type: DeclaredOutputType + status: OutputTypeCheckStatus + value: Any + reason: str | None = None + + +@dataclass(frozen=True, slots=True) +class OutputTypeCheckOutcome: + """Aggregate per-output type-check results for one Agent backend run.""" + + results: tuple[OutputTypeCheckResult, ...] + + @property + def failures(self) -> tuple[OutputTypeCheckResult, ...]: + return tuple(r for r in self.results if r.status == OutputTypeCheckStatus.TYPE_CHECK_FAILED) + + @property + def has_failures(self) -> bool: + return bool(self.failures) + + def by_name(self) -> dict[str, OutputTypeCheckResult]: + return {r.name: r for r in self.results} + + +class FileTenantValidator(Protocol): + """Verify a file ref resolves to a file owned by the given tenant.""" + + def is_owned_by_tenant(self, *, file_id: str, tenant_id: str) -> bool: ... + + +# Recognized aliases the Agent backend (or pydantic-ai) may produce for the +# canonical file id field. 
The canonical spec form is ``file_id`` (§5.2). +_FILE_ID_KEYS: tuple[str, ...] = ("file_id", "upload_file_id", "tool_file_id") + + +class PerOutputTypeChecker: + """Validate that each declared output is present and shaped correctly. + + The checker handles array items recursively and is opinionated about file + refs: only dicts with at least one recognized id key plus a tenant-scope + match pass. Stage 4 §5.2 + §5.3. + """ + + def __init__(self, file_validator: FileTenantValidator) -> None: + self._file_validator = file_validator + + def check( + self, + *, + declared_outputs: list[DeclaredOutputConfig], + raw_output: Mapping[str, Any] | Any, + tenant_id: str, + ) -> OutputTypeCheckOutcome: + """Run type check for every declared output. + + ``raw_output`` should be ``run_succeeded.data.output``. The backend + always returns a dict because the ``dify.output`` layer wraps every + schema in a top-level object; if it isn't a dict (e.g. backend + misbehaving) every declared output (required or not) is flagged as ``TYPE_CHECK_FAILED``. 
+ """ + results: list[OutputTypeCheckResult] = [] + payload = raw_output if isinstance(raw_output, Mapping) else None + + for declared in declared_outputs: + if payload is None: + results.append( + OutputTypeCheckResult( + name=declared.name, + declared_type=declared.type, + status=OutputTypeCheckStatus.TYPE_CHECK_FAILED, + value=raw_output, + reason="Backend output is not a JSON object.", + ) + ) + continue + if declared.name not in payload: + if declared.required: + results.append( + OutputTypeCheckResult( + name=declared.name, + declared_type=declared.type, + status=OutputTypeCheckStatus.TYPE_CHECK_FAILED, + value=None, + reason=f"Required output {declared.name!r} is missing from backend payload.", + ) + ) + else: + results.append( + OutputTypeCheckResult( + name=declared.name, + declared_type=declared.type, + status=OutputTypeCheckStatus.NOT_PRODUCED, + value=None, + ) + ) + continue + + value = payload[declared.name] + failure_reason = self._validate_value( + declared_type=declared.type, + value=value, + tenant_id=tenant_id, + array_item=declared.array_item, + ) + if failure_reason is None: + results.append( + OutputTypeCheckResult( + name=declared.name, + declared_type=declared.type, + status=OutputTypeCheckStatus.READY, + value=value, + ) + ) + else: + results.append( + OutputTypeCheckResult( + name=declared.name, + declared_type=declared.type, + status=OutputTypeCheckStatus.TYPE_CHECK_FAILED, + value=value, + reason=failure_reason, + ) + ) + + return OutputTypeCheckOutcome(results=tuple(results)) + + def _validate_value( + self, + *, + declared_type: DeclaredOutputType, + value: Any, + tenant_id: str, + array_item: DeclaredArrayItem | None, + ) -> str | None: + """Return ``None`` on success, or a human-readable failure reason.""" + if declared_type == DeclaredOutputType.STRING: + if not isinstance(value, str): + return f"expected string, got {type(value).__name__}" + return None + if declared_type == DeclaredOutputType.NUMBER: + # ``bool`` is a subclass of 
int in Python; PRD treats numbers as + # strictly numeric so we reject bools here. + if not isinstance(value, (int, float)) or isinstance(value, bool): + return f"expected number, got {type(value).__name__}" + return None + if declared_type == DeclaredOutputType.BOOLEAN: + if not isinstance(value, bool): + return f"expected boolean, got {type(value).__name__}" + return None + if declared_type == DeclaredOutputType.OBJECT: + if not isinstance(value, Mapping): + return f"expected object, got {type(value).__name__}" + return None + if declared_type == DeclaredOutputType.ARRAY: + if not isinstance(value, list): + return f"expected array, got {type(value).__name__}" + if array_item is None: + # Defensive: the model validator should have populated this; if + # absent, accept any items rather than crash. + return None + for index, item in enumerate(value): + item_reason = self._validate_value( + declared_type=array_item.type, + value=item, + tenant_id=tenant_id, + array_item=None, + ) + if item_reason is not None: + return f"items[{index}]: {item_reason}" + return None + if declared_type == DeclaredOutputType.FILE: + return self._validate_file_value(value=value, tenant_id=tenant_id) + + # Defensive: future DeclaredOutputType members reach this branch and + # should fail loudly so we never silently accept unknown shapes. 
+ return f"unsupported declared_type={declared_type!r}" + + def _validate_file_value(self, *, value: Any, tenant_id: str) -> str | None: + if not isinstance(value, Mapping): + return f"expected file ref object, got {type(value).__name__}" + file_id = self._extract_file_id(value) + if file_id is None: + return "file ref missing a recognized file_id field" + if not self._file_validator.is_owned_by_tenant(file_id=file_id, tenant_id=tenant_id): + return f"file_id {file_id!r} is not accessible to tenant {tenant_id!r}" + return None + + @staticmethod + def _extract_file_id(value: Mapping[str, Any]) -> str | None: + for key in _FILE_ID_KEYS: + candidate = value.get(key) + if isinstance(candidate, str) and candidate: + return candidate + return None diff --git a/api/core/workflow/nodes/agent_v2/runtime_request_builder.py b/api/core/workflow/nodes/agent_v2/runtime_request_builder.py index 66a4418b68..431f658e33 100644 --- a/api/core/workflow/nodes/agent_v2/runtime_request_builder.py +++ b/api/core/workflow/nodes/agent_v2/runtime_request_builder.py @@ -19,11 +19,16 @@ from graphon.variables.segments import Segment from models.agent import Agent, AgentConfigSnapshot, WorkflowAgentNodeBinding from models.agent_config_entities import ( AgentSoulConfig, + DeclaredArrayItem, DeclaredOutputConfig, DeclaredOutputType, WorkflowNodeJobConfig, ) +from models.agent_config_entities import ( + effective_declared_outputs as _effective_declared_outputs, +) +from .output_failure_orchestrator import retry_idempotency_key from .runtime_feature_manifest import build_runtime_feature_manifest @@ -56,6 +61,9 @@ class WorkflowAgentRuntimeBuildContext: binding: WorkflowAgentNodeBinding agent: Agent snapshot: AgentConfigSnapshot + # Stage 4 §7 / D-4: 0 for the first run, then incremented per retry. Drives the + # idempotency key so the backend treats each retry as a fresh request. 
+ attempt: int = 0 @dataclass(frozen=True, slots=True) @@ -142,9 +150,13 @@ class WorkflowAgentRuntimeRequestBuilder: @staticmethod def _idempotency_key(context: WorkflowAgentRuntimeBuildContext) -> str: - if context.workflow_run_id: - return f"{context.workflow_run_id}:{context.node_execution_id}" - return context.node_execution_id + # Stage 4 §7 / D-4: retries get distinct keys (``...:retry-{attempt}``) so + # the Agent backend's protocol-level dedup can't replay a previous run. + return retry_idempotency_key( + workflow_run_id=context.workflow_run_id, + node_execution_id=context.node_execution_id, + attempt=context.attempt, + ) @staticmethod def _build_metadata( @@ -237,11 +249,17 @@ class WorkflowAgentRuntimeRequestBuilder: @staticmethod def _build_output_config(declared_outputs: Sequence[DeclaredOutputConfig]) -> AgentBackendOutputConfig | None: - if not declared_outputs: - return None + """Build the structured-output layer config sent to Agent backend. + + Stage 4 §4.1 (D-3): when the user hasn't declared any outputs, inject the + PRD-mandated defaults (text / files / json) at runtime so the backend + always receives a stable schema and the downstream Inspector + nodes + have consistent output names. The defaults are NOT persisted. 
+ """ + effective_outputs = WorkflowAgentRuntimeRequestBuilder.effective_declared_outputs(declared_outputs) properties: dict[str, Any] = {} required: list[str] = [] - for output in declared_outputs: + for output in effective_outputs: properties[output.name] = WorkflowAgentRuntimeRequestBuilder._schema_for_declared_output(output) if output.required: required.append(output.name) @@ -250,21 +268,52 @@ class WorkflowAgentRuntimeRequestBuilder: schema["required"] = required return AgentBackendOutputConfig(json_schema=schema) + @staticmethod + def effective_declared_outputs( + declared_outputs: Sequence[DeclaredOutputConfig], + ) -> Sequence[DeclaredOutputConfig]: + """Alias for :func:`models.agent_config_entities.effective_declared_outputs`. + + Kept as a static method on the builder so existing call sites + (``agent_node._run``, tests) don't need to change their import. + """ + return _effective_declared_outputs(list(declared_outputs)) + @staticmethod def _schema_for_declared_output(output: DeclaredOutputConfig) -> dict[str, Any]: - match output.type: + schema = WorkflowAgentRuntimeRequestBuilder._schema_for_type(output.type, array_item=output.array_item) + if output.description: + schema["description"] = output.description + return schema + + @staticmethod + def _schema_for_type( + output_type: DeclaredOutputType, + *, + array_item: DeclaredArrayItem | None = None, + ) -> dict[str, Any]: + match output_type: case DeclaredOutputType.STRING: - schema: dict[str, Any] = {"type": "string"} + return {"type": "string"} case DeclaredOutputType.NUMBER: - schema = {"type": "number"} + return {"type": "number"} case DeclaredOutputType.BOOLEAN: - schema = {"type": "boolean"} + return {"type": "boolean"} case DeclaredOutputType.OBJECT: - schema = {"type": "object"} + return {"type": "object"} case DeclaredOutputType.ARRAY: - schema = {"type": "array"} + # Stage 4 §4.2: items shape mirrors the declared array_item. + # Validator guarantees array_item is set when type is array. 
+ item_type = array_item.type if array_item else DeclaredOutputType.OBJECT + schema: dict[str, Any] = { + "type": "array", + "items": WorkflowAgentRuntimeRequestBuilder._schema_for_type(item_type), + } + if array_item is not None and array_item.description: + schema["items"]["description"] = array_item.description + return schema case DeclaredOutputType.FILE: - schema = { + return { "type": "object", "properties": { "file_id": {"type": "string"}, @@ -273,9 +322,6 @@ class WorkflowAgentRuntimeRequestBuilder: "url": {"type": "string"}, }, } - if output.description: - schema["description"] = output.description - return schema @staticmethod def _normalize_credentials(credentials: Mapping[str, Any]) -> dict[str, str | int | float | bool | None]: diff --git a/api/core/workflow/nodes/agent_v2/validators.py b/api/core/workflow/nodes/agent_v2/validators.py index f54be8621a..f8df0506e8 100644 --- a/api/core/workflow/nodes/agent_v2/validators.py +++ b/api/core/workflow/nodes/agent_v2/validators.py @@ -147,14 +147,15 @@ class WorkflowAgentNodeValidator: f"Workflow Agent node {binding.node_id} has duplicate output name {output.name}." ) output_names.add(output.name) - for check in output.checks: - if check.benchmark_file_ref is not None: - cls._validate_file_ref( - session=session, - binding=binding, - file_ref=check.benchmark_file_ref, - ref_context=f"output {output.name} benchmark file", - ) + # Stage 4 §4.3: declared output carries a single optional check, gated by + # ``check.enabled``. Only enabled checks need their benchmark file resolved. 
+ if output.check is not None and output.check.enabled and output.check.benchmark_file_ref is not None: + cls._validate_file_ref( + session=session, + binding=binding, + file_ref=output.check.benchmark_file_ref, + ref_context=f"output {output.name} benchmark file", + ) for ref in node_job.previous_node_output_refs: selector = cls.selector_from_ref(ref) diff --git a/api/models/agent_config_entities.py b/api/models/agent_config_entities.py index c07f51b261..9524d22d7f 100644 --- a/api/models/agent_config_entities.py +++ b/api/models/agent_config_entities.py @@ -1,7 +1,8 @@ +import re from enum import StrEnum -from typing import Any +from typing import Any, Final -from pydantic import BaseModel, ConfigDict, Field, model_validator +from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator class AgentKnowledgeQueryMode(StrEnum): @@ -23,6 +24,23 @@ class DeclaredOutputType(StrEnum): FILE = "file" +class OutputErrorStrategy(StrEnum): + """Per-output failure handling strategy. + + Mirrors ``graphon.ErrorStrategy`` but scoped to a single declared output of + a Workflow Agent Node. The runtime applies the strategy after type check or + output check fails and any configured retry attempts have been exhausted. + """ + + STOP = "stop" + DEFAULT_VALUE = "default_value" + FAIL_BRANCH = "fail_branch" + + +# JSON-schema-friendly name pattern. Stage 4 §3.1 / §10.1. +_OUTPUT_NAME_PATTERN: Final[re.Pattern[str]] = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") + + class AgentSoulPromptConfig(BaseModel): system_prompt: str = "" @@ -108,40 +126,218 @@ class AgentSoulConfig(BaseModel): class DeclaredOutputFileConfig(BaseModel): + """File-type output metadata. Both lists empty means "any file accepted".""" + + model_config = ConfigDict(extra="forbid") + extensions: list[str] = Field(default_factory=list) mime_types: list[str] = Field(default_factory=list) +class DeclaredArrayItem(BaseModel): + """Per-item shape for an ``array``-typed declared output. 
+ + PRD §OUTPUT 配置框 keeps arrays one level deep on first version; nested arrays + are rejected so the runtime type checker and JSON Schema stay easy to reason + about. Stage 4 §4.2. + """ + + model_config = ConfigDict(extra="forbid") + + type: DeclaredOutputType + description: str | None = None + + @model_validator(mode="after") + def _reject_nested_array(self) -> "DeclaredArrayItem": + if self.type == DeclaredOutputType.ARRAY: + raise ValueError("nested arrays are not supported as array_item.type") + return self + + class DeclaredOutputCheckConfig(BaseModel): - type: str = Field(min_length=1, max_length=64) + """File-output content check via a model-based comparison against a benchmark file. + + Per PRD §OUTPUT 配置框, output check is **file-only** and optional. Stage 4 §4.3. + """ + + model_config = ConfigDict(extra="forbid") + + enabled: bool = False prompt: str | None = None benchmark_file_ref: dict[str, Any] | None = None + # Reserved for stage 4.1: pick a different model than Agent Soul's for the check. + # Stage 4 leaves this Optional and unused by FileOutputCheckExecutor. 
+ model_ref: dict[str, Any] | None = None + + @model_validator(mode="after") + def _require_prompt_and_benchmark_when_enabled(self) -> "DeclaredOutputCheckConfig": + if self.enabled: + if not self.prompt or not self.prompt.strip(): + raise ValueError("prompt is required when output check is enabled") + if self.benchmark_file_ref is None: + raise ValueError("benchmark_file_ref is required when output check is enabled") + return self + + +class DeclaredOutputRetryConfig(BaseModel): + """Per-output retry configuration that mirrors ``graphon.RetryConfig`` shape.""" + + model_config = ConfigDict(extra="forbid") + + enabled: bool = False + max_retries: int = Field(default=0, ge=0, le=10) + retry_interval_ms: int = Field(default=0, ge=0, le=60_000) class DeclaredOutputFailureStrategy(BaseModel): - on_type_check_failed: str | None = None - on_output_check_failed: str | None = None - max_retries: int = Field(default=0, ge=0, le=10) + """Per-output failure handling. + + A single strategy applies to both ``type_check`` and ``output_check`` failures + (PRD does not distinguish them at the UX level). Stage 4 §4.4. + """ + + model_config = ConfigDict(extra="forbid") + + retry: DeclaredOutputRetryConfig = Field(default_factory=DeclaredOutputRetryConfig) + on_failure: OutputErrorStrategy = OutputErrorStrategy.STOP + # When ``on_failure == DEFAULT_VALUE`` this value replaces the failed output. The + # value's shape must match the owning ``DeclaredOutputConfig.type``; that match is + # enforced at ``DeclaredOutputConfig`` level so the strategy stays type-agnostic. + default_value: Any = None + + @model_validator(mode="after") + def _require_default_value_when_default_strategy(self) -> "DeclaredOutputFailureStrategy": + if self.on_failure == OutputErrorStrategy.DEFAULT_VALUE and self.default_value is None: + raise ValueError( + "default_value must be provided when on_failure=default_value; None is reserved for 'not set'." 
+ ) + return self class DeclaredOutputConfig(BaseModel): + """One declared output of a Workflow Agent Node. + + Stage 4 normalizes the shape: ``check`` is singular (was ``checks: list`` in + stage 3), and ``failure_strategy`` defaults to a populated value so runtime + code can call ``output.failure_strategy.on_failure`` without None-guards. + """ + + model_config = ConfigDict(extra="forbid") + id: str | None = None name: str = Field(min_length=1, max_length=255) type: DeclaredOutputType description: str | None = None required: bool = True file: DeclaredOutputFileConfig | None = None - checks: list[DeclaredOutputCheckConfig] = Field(default_factory=list) - failure_strategy: DeclaredOutputFailureStrategy | None = None + array_item: DeclaredArrayItem | None = None + check: DeclaredOutputCheckConfig | None = None + failure_strategy: DeclaredOutputFailureStrategy = Field(default_factory=DeclaredOutputFailureStrategy) + + @field_validator("failure_strategy", mode="before") + @classmethod + def _coerce_none_failure_strategy(cls, value: Any) -> Any: + # Backward compat: persisted JSON may carry ``failure_strategy: null``; + # treat it as "use defaults". 
+ if value is None: + return DeclaredOutputFailureStrategy() + return value @model_validator(mode="after") - def validate_file_metadata(self) -> "DeclaredOutputConfig": - if self.type == DeclaredOutputType.FILE and self.file is None: - self.file = DeclaredOutputFileConfig() - if self.type != DeclaredOutputType.FILE and self.file is not None: + def _validate_shape(self) -> "DeclaredOutputConfig": + if not _OUTPUT_NAME_PATTERN.fullmatch(self.name): + raise ValueError( + f"output name {self.name!r} must match {_OUTPUT_NAME_PATTERN.pattern} (JSON-schema-friendly identifier)" + ) + + if self.type == DeclaredOutputType.FILE: + if self.file is None: + self.file = DeclaredOutputFileConfig() + elif self.file is not None: raise ValueError("file metadata is only allowed for file outputs") + + if self.type == DeclaredOutputType.ARRAY: + if self.array_item is None: + # Backward compat for stage 3 fixtures: array without array_item + # defaults to an array of objects, matching the prior JSON-Schema behavior. + self.array_item = DeclaredArrayItem(type=DeclaredOutputType.OBJECT) + elif self.array_item is not None: + raise ValueError("array_item is only allowed when type is array") + + # Per PRD §OUTPUT 配置框: output check is file-only. + if self.check is not None and self.check.enabled and self.type != DeclaredOutputType.FILE: + raise ValueError("output check is only allowed for file outputs") + + # If the strategy is DEFAULT_VALUE, validate the default's shape against the + # declared type so we fail at save-time rather than at runtime. 
+ strategy = self.failure_strategy + if strategy.on_failure == OutputErrorStrategy.DEFAULT_VALUE and strategy.default_value is not None: + self._assert_default_value_matches_type(strategy.default_value) + return self + def _assert_default_value_matches_type(self, value: Any) -> None: + type_ = self.type + if type_ == DeclaredOutputType.STRING: + ok = isinstance(value, str) + elif type_ == DeclaredOutputType.NUMBER: + ok = isinstance(value, (int, float)) and not isinstance(value, bool) + elif type_ == DeclaredOutputType.BOOLEAN: + ok = isinstance(value, bool) + elif type_ == DeclaredOutputType.OBJECT: + ok = isinstance(value, dict) + elif type_ == DeclaredOutputType.ARRAY: + ok = isinstance(value, list) + elif type_ == DeclaredOutputType.FILE: + ok = isinstance(value, dict) and "file_id" in value + else: + ok = False + if not ok: + raise ValueError( + f"default_value shape does not match output type {type_.value!r}: got {type(value).__name__}" + ) + + +# PRD §OUTPUT 配置框 0522 共识: "Output 如果没有配置,则 text, files, json" +# The runtime injects these when ``declared_outputs`` is empty (stage 4 §4.1, D-3). +# Not persisted; mutating this constant changes UI defaults globally. +DEFAULT_DECLARED_OUTPUTS: Final[tuple[DeclaredOutputConfig, ...]] = ( + DeclaredOutputConfig( + name="text", + type=DeclaredOutputType.STRING, + required=False, + description="Free-form text answer.", + ), + DeclaredOutputConfig( + name="files", + type=DeclaredOutputType.ARRAY, + required=False, + description="Files produced by the agent.", + array_item=DeclaredArrayItem(type=DeclaredOutputType.FILE), + ), + DeclaredOutputConfig( + name="json", + type=DeclaredOutputType.OBJECT, + required=False, + description="Free-form JSON object.", + ), +) + + +def effective_declared_outputs( + declared_outputs: list[DeclaredOutputConfig] | tuple[DeclaredOutputConfig, ...], +) -> tuple[DeclaredOutputConfig, ...]: + """Return the outputs the runtime actually presents. 
+ + Returns ``declared_outputs`` unchanged when non-empty, otherwise the PRD + defaults from ``DEFAULT_DECLARED_OUTPUTS``. Shared helper so Composer load + responses, runtime request builder, and the Node Output Inspector all use + the same fallback (stage 4 §4.1, decision D-3). + """ + if declared_outputs: + return tuple(declared_outputs) + return DEFAULT_DECLARED_OUTPUTS + class WorkflowNodeJobConfig(BaseModel): model_config = ConfigDict(extra="forbid") diff --git a/api/openapi/markdown/console-swagger.md b/api/openapi/markdown/console-swagger.md index 30f34e6f24..5ea5bbe008 100644 --- a/api/openapi/markdown/console-swagger.md +++ b/api/openapi/markdown/console-swagger.md @@ -12153,19 +12153,44 @@ Condition detail | ---- | ---- | ----------- | -------- | | DebugPermission | string | | | +#### DeclaredArrayItem + +Per-item shape for an ``array``-typed declared output. + +PRD §OUTPUT 配置框 keeps arrays one level deep on first version; nested arrays +are rejected so the runtime type checker and JSON Schema stay easy to reason +about. Stage 4 §4.2. + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| description | string | | No | +| type | [DeclaredOutputType](#declaredoutputtype) | | Yes | + #### DeclaredOutputCheckConfig +File-output content check via a model-based comparison against a benchmark file. + +Per PRD §OUTPUT 配置框, output check is **file-only** and optional. Stage 4 §4.3. + | Name | Type | Description | Required | | ---- | ---- | ----------- | -------- | | benchmark_file_ref | object | | No | +| enabled | boolean | | No | +| model_ref | object | | No | | prompt | string | | No | -| type | string | | Yes | #### DeclaredOutputConfig +One declared output of a Workflow Agent Node. + +Stage 4 normalizes the shape: ``check`` is singular (was ``checks: list`` in +stage 3), and ``failure_strategy`` defaults to a populated value so runtime +code can call ``output.failure_strategy.on_failure`` without None-guards. 
+ | Name | Type | Description | Required | | ---- | ---- | ----------- | -------- | -| checks | [ [DeclaredOutputCheckConfig](#declaredoutputcheckconfig) ] | | No | +| array_item | [DeclaredArrayItem](#declaredarrayitem) | | No | +| check | [DeclaredOutputCheckConfig](#declaredoutputcheckconfig) | | No | | description | string | | No | | failure_strategy | [DeclaredOutputFailureStrategy](#declaredoutputfailurestrategy) | | No | | file | [DeclaredOutputFileConfig](#declaredoutputfileconfig) | | No | @@ -12176,19 +12201,36 @@ Condition detail #### DeclaredOutputFailureStrategy +Per-output failure handling. + +A single strategy applies to both ``type_check`` and ``output_check`` failures +(PRD does not distinguish them at the UX level). Stage 4 §4.4. + | Name | Type | Description | Required | | ---- | ---- | ----------- | -------- | -| max_retries | integer | | No | -| on_output_check_failed | string | | No | -| on_type_check_failed | string | | No | +| default_value | | | No | +| on_failure | [OutputErrorStrategy](#outputerrorstrategy) | | No | +| retry | [DeclaredOutputRetryConfig](#declaredoutputretryconfig) | | No | #### DeclaredOutputFileConfig +File-type output metadata. Both lists empty means "any file accepted". + | Name | Type | Description | Required | | ---- | ---- | ----------- | -------- | | extensions | [ string ] | | No | | mime_types | [ string ] | | No | +#### DeclaredOutputRetryConfig + +Per-output retry configuration that mirrors ``graphon.RetryConfig`` shape. + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| enabled | boolean | | No | +| max_retries | integer | | No | +| retry_interval_ms | integer | | No | + #### DeclaredOutputType | Name | Type | Description | Required | @@ -13584,6 +13626,18 @@ Enum class for model type. | ---- | ---- | ----------- | -------- | | result | string | Operation result | Yes | +#### OutputErrorStrategy + +Per-output failure handling strategy. 
+ +Mirrors ``graphon.ErrorStrategy`` but scoped to a single declared output of +a Workflow Agent Node. The runtime applies the strategy after type check or +output check fails and any configured retry attempts have been exhausted. + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| OutputErrorStrategy | string | Per-output failure handling strategy. Mirrors ``graphon.ErrorStrategy`` but scoped to a single declared output of a Workflow Agent Node. The runtime applies the strategy after type check or output check fails and any configured retry attempts have been exhausted. | | + #### OwnerTransferCheckPayload | Name | Type | Description | Required | diff --git a/api/services/agent/composer_service.py b/api/services/agent/composer_service.py index 27f3771408..a0f54a8a31 100644 --- a/api/services/agent/composer_service.py +++ b/api/services/agent/composer_service.py @@ -16,6 +16,12 @@ from models.agent import ( WorkflowAgentBindingType, WorkflowAgentNodeBinding, ) +from models.agent_config_entities import ( + DeclaredOutputConfig, +) +from models.agent_config_entities import ( + effective_declared_outputs as _effective_declared_outputs, +) from models.workflow import Workflow from services.agent.composer_validator import ComposerConfigValidator from services.agent.errors import AgentNameConflictError, AgentNotFoundError, AgentVersionNotFoundError @@ -681,6 +687,27 @@ class AgentComposerService: .limit(1) ) + @staticmethod + def _declared_outputs_from_binding(binding: WorkflowAgentNodeBinding) -> list[DeclaredOutputConfig]: + """Re-hydrate the binding's node_job_config into typed declared outputs. + + node_job_config is stored as JSON / LongText; the typed view is needed + so the effective_declared_outputs helper can fall back to defaults on + an empty list without callers re-implementing the fallback. 
+ """ + node_job = WorkflowNodeJobConfig.model_validate(binding.node_job_config_dict) + return list(node_job.declared_outputs) + + @staticmethod + def _serialize_effective_outputs(declared_outputs: list[DeclaredOutputConfig]) -> list[dict[str, Any]]: + """JSON-serialize the effective declared outputs (PRD defaults if empty). + + Stage 4 decision D-3 keeps defaults out of the DB; this helper is the + single place that injects them into the Composer load response so the + wire shape stays consistent whether the user has declared anything yet. + """ + return [output.model_dump(mode="json") for output in _effective_declared_outputs(declared_outputs)] + @classmethod def _empty_workflow_state(cls, *, app_id: str, workflow_id: str, node_id: str) -> dict[str, Any]: return { @@ -691,6 +718,9 @@ class AgentComposerService: "soul_lock": {"locked": False, "can_unlock": False, "reason": "workflow_only_empty"}, "agent_soul": AgentSoulConfig().model_dump(mode="json"), "node_job": WorkflowNodeJobConfig().model_dump(mode="json"), + # Stage 4 §4.1 / §10.1 (D-3): empty composer state still surfaces the + # PRD defaults so the front-end has stable output names to render. + "effective_declared_outputs": cls._serialize_effective_outputs([]), "save_options": [ComposerSaveStrategy.NODE_JOB_ONLY.value, ComposerSaveStrategy.SAVE_TO_ROSTER.value], "impact_summary": None, "app_id": app_id, @@ -739,6 +769,11 @@ class AgentComposerService: if version else AgentSoulConfig().model_dump(mode="json"), "node_job": binding.node_job_config_dict, + # Stage 4 §4.1 / §10.1 (D-3): when the saved node_job carries no + # declared_outputs, surface the PRD defaults so the front-end can + # render them as read-only chips. When user-defined outputs exist + # this is the same list (so callers don't need to special-case). 
+ "effective_declared_outputs": cls._serialize_effective_outputs(cls._declared_outputs_from_binding(binding)), "save_options": save_options, "impact_summary": cls.calculate_impact( tenant_id=binding.tenant_id, current_snapshot_id=binding.current_snapshot_id diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_agent_node.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_agent_node.py index 5a0cf87688..b182b94161 100644 --- a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_agent_node.py +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_agent_node.py @@ -99,6 +99,13 @@ def _node(*, scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCE }, call_depth=0, ) + from core.workflow.nodes.agent_v2.output_failure_orchestrator import OutputFailureOrchestrator + from core.workflow.nodes.agent_v2.output_type_checker import PerOutputTypeChecker + + class _AlwaysAllowFileValidator: + def is_owned_by_tenant(self, *, file_id: str, tenant_id: str) -> bool: + return True + return DifyAgentNode( node_id="agent-node", data=DifyAgentNodeData.model_validate({"type": BuiltinNodeTypes.AGENT, "version": "2"}), @@ -109,6 +116,8 @@ def _node(*, scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCE agent_backend_client=FakeAgentBackendRunClient(scenario=scenario), event_adapter=AgentBackendRunEventAdapter(), output_adapter=WorkflowAgentOutputAdapter(), + type_checker=PerOutputTypeChecker(file_validator=_AlwaysAllowFileValidator()), + failure_orchestrator=OutputFailureOrchestrator(), ) diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_file_tenant_validator.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_file_tenant_validator.py new file mode 100644 index 0000000000..c070e81790 --- /dev/null +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_file_tenant_validator.py @@ -0,0 +1,53 @@ +"""Defensive tests for UploadFileTenantValidator (Stage 4 §5.3). 
+ +The validator must never raise on pathological inputs: the Agent backend may +hand us garbage in the ``file_id`` slot because the protocol layer only +asserts ``{"type": "string"}``. Anything that isn't a real UUID belonging to +the current tenant should simply return False. +""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from core.workflow.nodes.agent_v2.file_tenant_validator import UploadFileTenantValidator + + +def test_empty_inputs_return_false_without_db_hit(): + validator = UploadFileTenantValidator() + with patch("core.workflow.nodes.agent_v2.file_tenant_validator.session_factory") as factory: + assert validator.is_owned_by_tenant(file_id="", tenant_id="tenant-1") is False + assert validator.is_owned_by_tenant(file_id="abc", tenant_id="") is False + factory.create_session.assert_not_called() + + +@pytest.mark.parametrize( + "bad_file_id", + [ + "not-a-uuid", + "this-id-does-not-exist", + "0123", + "🤖🤖🤖", + "../../etc/passwd", + "550e8400-e29b-41d4-a716-446655440000-trailing", + ], +) +def test_non_uuid_file_ids_return_false_without_db_hit(bad_file_id: str): + validator = UploadFileTenantValidator() + with patch("core.workflow.nodes.agent_v2.file_tenant_validator.session_factory") as factory: + assert validator.is_owned_by_tenant(file_id=bad_file_id, tenant_id="tenant-1") is False + factory.create_session.assert_not_called() + + +def test_db_error_swallowed_and_returns_false(): + """Any DB-level fault (timeout, dialect quirk, connection drop) must reject + the file rather than crash the workflow node.""" + from sqlalchemy.exc import SQLAlchemyError + + validator = UploadFileTenantValidator() + valid_uuid = "550e8400-e29b-41d4-a716-446655440000" + with patch("core.workflow.nodes.agent_v2.file_tenant_validator.session_factory") as factory: + factory.create_session.return_value.__enter__.return_value.scalar.side_effect = SQLAlchemyError("boom") + assert validator.is_owned_by_tenant(file_id=valid_uuid, 
tenant_id="tenant-1") is False diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_output_failure_orchestrator.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_output_failure_orchestrator.py new file mode 100644 index 0000000000..f142540f61 --- /dev/null +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_output_failure_orchestrator.py @@ -0,0 +1,229 @@ +"""Unit tests for OutputFailureOrchestrator decision logic. + +Stage 4 §7. +""" + +from __future__ import annotations + +import pytest + +from core.workflow.nodes.agent_v2.output_failure_orchestrator import ( + FailedOutput, + OutputFailureDecision, + OutputFailureKind, + OutputFailureOrchestrator, + retry_idempotency_key, +) +from models.agent_config_entities import ( + DeclaredOutputConfig, + DeclaredOutputFailureStrategy, + DeclaredOutputRetryConfig, + DeclaredOutputType, + OutputErrorStrategy, +) + + +def _output( + name: str, + *, + on_failure: OutputErrorStrategy = OutputErrorStrategy.STOP, + max_retries: int = 0, + retry_enabled: bool = False, + default_value=None, +) -> DeclaredOutputConfig: + strategy = DeclaredOutputFailureStrategy( + retry=DeclaredOutputRetryConfig(enabled=retry_enabled, max_retries=max_retries), + on_failure=on_failure, + default_value=default_value, + ) + return DeclaredOutputConfig(name=name, type=DeclaredOutputType.STRING, failure_strategy=strategy) + + +def _failure(declared: DeclaredOutputConfig, kind: OutputFailureKind = OutputFailureKind.TYPE_CHECK) -> FailedOutput: + return FailedOutput(declared=declared, failure_kind=kind, reason="boom") + + +# ────────────────────────────────────────────────────────────────────────────── +# Retry budget +# ────────────────────────────────────────────────────────────────────────────── + + +def test_retry_within_budget_returns_retry_and_increments_attempt(): + orch = OutputFailureOrchestrator() + declared = _output("x", retry_enabled=True, max_retries=2) + + outcome = orch.decide(failures=[_failure(declared)], 
current_attempt=0) + + assert outcome.decision == OutputFailureDecision.RETRY + assert outcome.next_attempt == 1 + + +def test_retry_budget_uses_max_across_multiple_outputs(): + """If two outputs fail and their retry budgets differ, retry continues + until the *larger* budget is spent (§7).""" + orch = OutputFailureOrchestrator() + out_a = _output("a", retry_enabled=True, max_retries=1) + out_b = _output("b", retry_enabled=True, max_retries=3) + + # attempt=1: a's budget already spent, but b still has budget → still retry + outcome = orch.decide(failures=[_failure(out_a), _failure(out_b)], current_attempt=1) + assert outcome.decision == OutputFailureDecision.RETRY + assert outcome.next_attempt == 2 + + +def test_retry_disabled_skips_retry_even_with_max_retries_set(): + orch = OutputFailureOrchestrator() + declared = _output("x", retry_enabled=False, max_retries=5) # disabled wins + + outcome = orch.decide(failures=[_failure(declared)], current_attempt=0) + + assert outcome.decision != OutputFailureDecision.RETRY + + +def test_retry_budget_exhausted_falls_to_terminal(): + orch = OutputFailureOrchestrator() + declared = _output( + "x", retry_enabled=True, max_retries=1, on_failure=OutputErrorStrategy.DEFAULT_VALUE, default_value="fallback" + ) + + outcome = orch.decide(failures=[_failure(declared)], current_attempt=1) # already at max + + assert outcome.decision == OutputFailureDecision.USE_DEFAULT + assert outcome.per_output_actions == {"x": "fallback"} + + +# ────────────────────────────────────────────────────────────────────────────── +# Terminal decisions +# ────────────────────────────────────────────────────────────────────────────── + + +def test_stop_terminal_returns_fail_node(): + orch = OutputFailureOrchestrator() + declared = _output("x", on_failure=OutputErrorStrategy.STOP) + + outcome = orch.decide(failures=[_failure(declared)], current_attempt=0) + + assert outcome.decision == OutputFailureDecision.FAIL_NODE + assert outcome.per_output_actions == {} + 
+ +def test_default_value_terminal_collects_per_output_defaults(): + orch = OutputFailureOrchestrator() + out_a = _output("a", on_failure=OutputErrorStrategy.DEFAULT_VALUE, default_value="A") + out_b = _output("b", on_failure=OutputErrorStrategy.DEFAULT_VALUE, default_value="B") + + outcome = orch.decide(failures=[_failure(out_a), _failure(out_b)], current_attempt=0) + + assert outcome.decision == OutputFailureDecision.USE_DEFAULT + assert outcome.per_output_actions == {"a": "A", "b": "B"} + + +def test_fail_branch_terminal_returns_take_fail_branch(): + orch = OutputFailureOrchestrator() + declared = _output("x", on_failure=OutputErrorStrategy.FAIL_BRANCH) + + outcome = orch.decide(failures=[_failure(declared)], current_attempt=0) + + assert outcome.decision == OutputFailureDecision.TAKE_FAIL_BRANCH + + +# ────────────────────────────────────────────────────────────────────────────── +# Multi-output precedence: FAIL_BRANCH > FAIL_NODE > USE_DEFAULT +# ────────────────────────────────────────────────────────────────────────────── + + +def test_fail_branch_wins_over_stop_and_default_value(): + orch = OutputFailureOrchestrator() + out_a = _output("a", on_failure=OutputErrorStrategy.DEFAULT_VALUE, default_value="x") + out_b = _output("b", on_failure=OutputErrorStrategy.STOP) + out_c = _output("c", on_failure=OutputErrorStrategy.FAIL_BRANCH) + + outcome = orch.decide( + failures=[_failure(out_a), _failure(out_b), _failure(out_c)], + current_attempt=0, + ) + + assert outcome.decision == OutputFailureDecision.TAKE_FAIL_BRANCH + # USE_DEFAULT was overridden → no per-output actions surfaced. 
+ assert outcome.per_output_actions == {} + + +def test_stop_wins_over_default_value_when_no_fail_branch(): + orch = OutputFailureOrchestrator() + out_a = _output("a", on_failure=OutputErrorStrategy.DEFAULT_VALUE, default_value="x") + out_b = _output("b", on_failure=OutputErrorStrategy.STOP) + + outcome = orch.decide(failures=[_failure(out_a), _failure(out_b)], current_attempt=0) + + assert outcome.decision == OutputFailureDecision.FAIL_NODE + + +# ────────────────────────────────────────────────────────────────────────────── +# Output check vs type check failure kinds +# ────────────────────────────────────────────────────────────────────────────── + + +def test_failure_kinds_dedupes_and_preserves_order(): + orch = OutputFailureOrchestrator() + decl = _output("x") + failures = [ + FailedOutput(declared=decl, failure_kind=OutputFailureKind.TYPE_CHECK, reason="r1"), + FailedOutput(declared=decl, failure_kind=OutputFailureKind.OUTPUT_CHECK, reason="r2"), + FailedOutput(declared=decl, failure_kind=OutputFailureKind.TYPE_CHECK, reason="r3"), + ] + + outcome = orch.decide(failures=failures, current_attempt=0) + + assert outcome.failure_kinds == (OutputFailureKind.TYPE_CHECK, OutputFailureKind.OUTPUT_CHECK) + + +# ────────────────────────────────────────────────────────────────────────────── +# Reason summarization +# ────────────────────────────────────────────────────────────────────────────── + + +def test_primary_reason_includes_every_failure(): + orch = OutputFailureOrchestrator() + out_a = _output("a") + out_b = _output("b") + + outcome = orch.decide( + failures=[ + FailedOutput(declared=out_a, failure_kind=OutputFailureKind.TYPE_CHECK, reason="expected str"), + FailedOutput(declared=out_b, failure_kind=OutputFailureKind.OUTPUT_CHECK, reason="missing section 3"), + ], + current_attempt=0, + ) + + assert "a[type_check]: expected str" in outcome.primary_reason + assert "b[output_check]: missing section 3" in outcome.primary_reason + + +def test_no_failures_raises(): 
+ orch = OutputFailureOrchestrator() + with pytest.raises(ValueError, match="at least one failure"): + orch.decide(failures=[], current_attempt=0) + + +# ────────────────────────────────────────────────────────────────────────────── +# Idempotency key helper (decision D-4) +# ────────────────────────────────────────────────────────────────────────────── + + +def test_idempotency_key_first_attempt_matches_pre_stage_4_shape(): + key = retry_idempotency_key(workflow_run_id="run-1", node_execution_id="node-exec-1", attempt=0) + assert key == "run-1:node-exec-1" + + +def test_idempotency_key_appends_retry_attempt_for_retries(): + assert ( + retry_idempotency_key(workflow_run_id="run-1", node_execution_id="node-exec-1", attempt=2) + == "run-1:node-exec-1:retry-2" + ) + + +def test_idempotency_key_handles_missing_workflow_run_id(): + assert retry_idempotency_key(workflow_run_id=None, node_execution_id="node-exec-9", attempt=0) == "node-exec-9" + assert ( + retry_idempotency_key(workflow_run_id=None, node_execution_id="node-exec-9", attempt=1) == "node-exec-9:retry-1" + ) diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_output_type_checker.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_output_type_checker.py new file mode 100644 index 0000000000..c80b0c1f95 --- /dev/null +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_output_type_checker.py @@ -0,0 +1,256 @@ +"""Unit tests for PerOutputTypeChecker. + +Stage 4 §5.1-§5.3. 
+""" + +from __future__ import annotations + +from collections.abc import Mapping + +import pytest + +from core.workflow.nodes.agent_v2.output_type_checker import ( + OutputTypeCheckStatus, + PerOutputTypeChecker, +) +from models.agent_config_entities import ( + DeclaredArrayItem, + DeclaredOutputConfig, + DeclaredOutputType, +) + + +class StubFileValidator: + """Trivially records the set of file_ids that pass tenant scope.""" + + def __init__(self, *, allowed: Mapping[str, set[str]] | None = None) -> None: + # Mapping: tenant_id -> {file_id, ...} + self._allowed = {tenant: set(ids) for tenant, ids in (allowed or {}).items()} + + def is_owned_by_tenant(self, *, file_id: str, tenant_id: str) -> bool: + return file_id in self._allowed.get(tenant_id, set()) + + +def _str_output(name: str = "summary", required: bool = True) -> DeclaredOutputConfig: + return DeclaredOutputConfig(name=name, type=DeclaredOutputType.STRING, required=required) + + +def _make_checker(*, allowed: Mapping[str, set[str]] | None = None) -> PerOutputTypeChecker: + return PerOutputTypeChecker(file_validator=StubFileValidator(allowed=allowed)) + + +# ────────────────────────────────────────────────────────────────────────────── +# Happy path per type +# ────────────────────────────────────────────────────────────────────────────── + + +def test_all_scalar_types_ready_on_correct_payload(): + checker = _make_checker() + declared = [ + DeclaredOutputConfig(name="s", type=DeclaredOutputType.STRING), + DeclaredOutputConfig(name="n", type=DeclaredOutputType.NUMBER), + DeclaredOutputConfig(name="b", type=DeclaredOutputType.BOOLEAN), + DeclaredOutputConfig(name="o", type=DeclaredOutputType.OBJECT), + ] + payload = {"s": "hello", "n": 3.14, "b": True, "o": {"k": "v"}} + + outcome = checker.check(declared_outputs=declared, raw_output=payload, tenant_id="t-1") + + assert not outcome.has_failures + assert {r.name: r.status for r in outcome.results} == { + "s": OutputTypeCheckStatus.READY, + "n": 
OutputTypeCheckStatus.READY, + "b": OutputTypeCheckStatus.READY, + "o": OutputTypeCheckStatus.READY, + } + + +def test_number_rejects_bool_even_though_python_says_isinstance_int(): + checker = _make_checker() + outcome = checker.check( + declared_outputs=[DeclaredOutputConfig(name="n", type=DeclaredOutputType.NUMBER)], + raw_output={"n": True}, + tenant_id="t-1", + ) + + assert outcome.has_failures + assert outcome.failures[0].reason == "expected number, got bool" + + +@pytest.mark.parametrize( + ("declared_type", "wrong_value", "expected_kind"), + [ + (DeclaredOutputType.STRING, 123, "int"), + (DeclaredOutputType.NUMBER, "x", "str"), + (DeclaredOutputType.BOOLEAN, "yes", "str"), + (DeclaredOutputType.OBJECT, [1, 2], "list"), + ], +) +def test_type_mismatch_reported_with_actual_kind(declared_type, wrong_value, expected_kind): + checker = _make_checker() + outcome = checker.check( + declared_outputs=[DeclaredOutputConfig(name="x", type=declared_type)], + raw_output={"x": wrong_value}, + tenant_id="t-1", + ) + + assert outcome.failures[0].status == OutputTypeCheckStatus.TYPE_CHECK_FAILED + assert expected_kind in (outcome.failures[0].reason or "") + + +# ────────────────────────────────────────────────────────────────────────────── +# Array + array_item recursion +# ────────────────────────────────────────────────────────────────────────────── + + +def test_array_of_strings_validates_each_item(): + checker = _make_checker() + declared = DeclaredOutputConfig( + name="tags", + type=DeclaredOutputType.ARRAY, + array_item=DeclaredArrayItem(type=DeclaredOutputType.STRING), + ) + + ok = checker.check(declared_outputs=[declared], raw_output={"tags": ["a", "b"]}, tenant_id="t-1") + bad = checker.check(declared_outputs=[declared], raw_output={"tags": ["a", 2]}, tenant_id="t-1") + + assert not ok.has_failures + reason = bad.failures[0].reason + assert reason is not None + assert reason.startswith("items[1]:") + + +def test_array_of_files_validates_per_item_file_ref(): + 
checker = _make_checker(allowed={"t-1": {"file-A"}}) + declared = DeclaredOutputConfig( + name="docs", + type=DeclaredOutputType.ARRAY, + array_item=DeclaredArrayItem(type=DeclaredOutputType.FILE), + ) + + ok = checker.check( + declared_outputs=[declared], + raw_output={"docs": [{"file_id": "file-A", "filename": "a.pdf"}]}, + tenant_id="t-1", + ) + cross_tenant = checker.check( + declared_outputs=[declared], + raw_output={"docs": [{"file_id": "other-tenant-file", "filename": "x.pdf"}]}, + tenant_id="t-1", + ) + + assert not ok.has_failures + assert cross_tenant.failures[0].reason is not None + assert "not accessible" in cross_tenant.failures[0].reason + + +# ────────────────────────────────────────────────────────────────────────────── +# File ref tenant scope +# ────────────────────────────────────────────────────────────────────────────── + + +def test_file_ref_must_be_tenant_owned(): + checker = _make_checker(allowed={"t-1": {"my-file"}}) + declared = DeclaredOutputConfig(name="report", type=DeclaredOutputType.FILE) + + outcome = checker.check( + declared_outputs=[declared], + raw_output={"report": {"file_id": "my-file", "filename": "r.pdf"}}, + tenant_id="t-1", + ) + assert not outcome.has_failures + + outcome = checker.check( + declared_outputs=[declared], + raw_output={"report": {"file_id": "other", "filename": "r.pdf"}}, + tenant_id="t-1", + ) + assert outcome.has_failures + + +def test_file_ref_missing_id_field_fails(): + checker = _make_checker() + declared = DeclaredOutputConfig(name="r", type=DeclaredOutputType.FILE) + + outcome = checker.check( + declared_outputs=[declared], + raw_output={"r": {"filename": "x.pdf"}}, # no file_id / upload_file_id / tool_file_id + tenant_id="t-1", + ) + + assert outcome.failures[0].reason == "file ref missing a recognized file_id field" + + +def test_file_ref_accepts_upload_file_id_alias(): + checker = _make_checker(allowed={"t-1": {"alt-file"}}) + declared = DeclaredOutputConfig(name="r", type=DeclaredOutputType.FILE) + 
+ outcome = checker.check( + declared_outputs=[declared], + raw_output={"r": {"upload_file_id": "alt-file"}}, + tenant_id="t-1", + ) + + assert not outcome.has_failures + + +# ────────────────────────────────────────────────────────────────────────────── +# Missing values + required flag +# ────────────────────────────────────────────────────────────────────────────── + + +def test_optional_output_missing_is_not_produced_not_a_failure(): + checker = _make_checker() + declared = [ + DeclaredOutputConfig(name="opt", type=DeclaredOutputType.STRING, required=False), + ] + + outcome = checker.check(declared_outputs=declared, raw_output={}, tenant_id="t-1") + + assert not outcome.has_failures + assert outcome.results[0].status == OutputTypeCheckStatus.NOT_PRODUCED + + +def test_required_output_missing_is_failure(): + checker = _make_checker() + declared = [_str_output(required=True)] + + outcome = checker.check(declared_outputs=declared, raw_output={}, tenant_id="t-1") + + assert outcome.has_failures + assert outcome.failures[0].reason is not None + assert "missing" in outcome.failures[0].reason + + +def test_non_dict_payload_fails_all_required_outputs(): + """If backend returns a string instead of an object, every required declared + output is marked failed; optional outputs are marked failed too because we + can't even attempt a lookup.""" + checker = _make_checker() + declared = [_str_output("req", required=True), _str_output("opt", required=False)] + + outcome = checker.check(declared_outputs=declared, raw_output="raw text", tenant_id="t-1") + + assert all(r.status == OutputTypeCheckStatus.TYPE_CHECK_FAILED for r in outcome.results) + assert outcome.failures[0].reason == "Backend output is not a JSON object." 
+ + +# ────────────────────────────────────────────────────────────────────────────── +# Aggregation helpers +# ────────────────────────────────────────────────────────────────────────────── + + +def test_outcome_by_name_indexes_results(): + checker = _make_checker() + declared = [_str_output("a"), _str_output("b")] + + outcome = checker.check( + declared_outputs=declared, + raw_output={"a": "x", "b": 1}, # b wrong type + tenant_id="t-1", + ) + + indexed = outcome.by_name() + assert indexed["a"].status == OutputTypeCheckStatus.READY + assert indexed["b"].status == OutputTypeCheckStatus.TYPE_CHECK_FAILED + assert len(outcome.failures) == 1 diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py index d50a61883b..7ddb5552a8 100644 --- a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py @@ -217,3 +217,79 @@ def test_invalid_previous_node_output_ref_fails_request_build(): WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()).build(context) assert exc_info.value.error_code == "invalid_previous_node_output_ref" + + +def test_empty_declared_outputs_injects_prd_defaults_text_files_json(): + """Stage 4 §4.1 (D-3): empty declared_outputs → backend receives the PRD defaults + (text / files / json) as a stable structured-output contract.""" + context = _context() + binding = WorkflowAgentNodeBinding( + id="binding-1", + tenant_id="tenant-1", + app_id="app-1", + workflow_id="workflow-1", + node_id="agent-node", + agent_id="agent-1", + current_snapshot_id="snapshot-1", + node_job_config=WorkflowNodeJobConfig.model_validate({}), + ) + context = replace(context, binding=binding) + + result = WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()).build(context) + + dumped = 
result.request.model_dump(mode="json") + output_layer = dumped["composition"]["layers"][-1]["config"] + properties = output_layer["json_schema"]["properties"] + assert set(properties) == {"text", "files", "json"} + assert properties["text"]["type"] == "string" + assert properties["files"]["type"] == "array" + # `files` defaults to array → items is a file ref object. + assert properties["files"]["items"]["properties"]["file_id"]["type"] == "string" + assert properties["json"]["type"] == "object" + # Defaults are all required=False so no `required:` key on the schema. + assert "required" not in output_layer["json_schema"] + + +def test_array_output_emits_typed_items_per_array_item(): + context = _context() + binding = WorkflowAgentNodeBinding( + id="binding-1", + tenant_id="tenant-1", + app_id="app-1", + workflow_id="workflow-1", + node_id="agent-node", + agent_id="agent-1", + current_snapshot_id="snapshot-1", + node_job_config=WorkflowNodeJobConfig.model_validate( + { + "declared_outputs": [ + { + "name": "tags", + "type": "array", + "array_item": {"type": "string", "description": "topic tag"}, + "required": True, + } + ], + } + ), + ) + context = replace(context, binding=binding) + + result = WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()).build(context) + + output_schema = result.request.model_dump(mode="json")["composition"]["layers"][-1]["config"]["json_schema"] + tags_schema = output_schema["properties"]["tags"] + assert tags_schema["type"] == "array" + assert tags_schema["items"]["type"] == "string" + assert tags_schema["items"]["description"] == "topic tag" + assert output_schema["required"] == ["tags"] + + +def test_effective_declared_outputs_passthrough_when_user_declared(): + """effective_declared_outputs() must return user-provided outputs verbatim + when non-empty; only empty input gets PRD defaults injected.""" + from models.agent_config_entities import DeclaredOutputConfig + + declared = 
[DeclaredOutputConfig(name="summary", type=DeclaredOutputType.STRING)] + effective = WorkflowAgentRuntimeRequestBuilder.effective_declared_outputs(declared) + assert list(effective) == declared diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_validators.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_validators.py index 99d1e91c44..a202bc9ce1 100644 --- a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_validators.py +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_validators.py @@ -240,7 +240,11 @@ def test_publish_validation_accepts_tenant_scoped_file_ref(): { "name": "report", "type": "file", - "checks": [{"type": "benchmark", "benchmark_file_ref": {"upload_file_id": "file-1"}}], + "check": { + "enabled": True, + "prompt": "Report must include a risk summary.", + "benchmark_file_ref": {"upload_file_id": "file-1"}, + }, } ] } diff --git a/api/tests/unit_tests/services/agent/test_agent_composer_entities.py b/api/tests/unit_tests/services/agent/test_agent_composer_entities.py index af7ae36644..dbdf37a905 100644 --- a/api/tests/unit_tests/services/agent/test_agent_composer_entities.py +++ b/api/tests/unit_tests/services/agent/test_agent_composer_entities.py @@ -105,6 +105,8 @@ def test_agent_soul_model_config_is_first_class_without_credentials(): def test_declared_outputs_support_file_check_and_failure_strategy(): + """Stage 4 §4.3 + §4.4: file output may carry a single ``check`` plus a + full LLM-node-parity ``failure_strategy``.""" node_job = WorkflowNodeJobConfig.model_validate( { "declared_outputs": [ @@ -112,17 +114,14 @@ def test_declared_outputs_support_file_check_and_failure_strategy(): "name": "analysis_report", "type": "file", "file": {"extensions": [".pdf"], "mime_types": ["application/pdf"]}, - "checks": [ - { - "type": "benchmark_file", - "prompt": "Report must include risk summary.", - "benchmark_file_ref": {"upload_file_id": "file-1"}, - } - ], + "check": { + "enabled": True, + "prompt": "Report must 
include risk summary.", + "benchmark_file_ref": {"upload_file_id": "file-1"}, + }, "failure_strategy": { - "on_type_check_failed": "fail_node", - "on_output_check_failed": "retry", - "max_retries": 1, + "retry": {"enabled": True, "max_retries": 1, "retry_interval_ms": 500}, + "on_failure": "fail_branch", }, } ] @@ -133,9 +132,12 @@ def test_declared_outputs_support_file_check_and_failure_strategy(): assert output.type == DeclaredOutputType.FILE assert output.file is not None assert output.file.extensions == [".pdf"] - assert output.checks[0].type == "benchmark_file" - assert output.failure_strategy is not None - assert output.failure_strategy.max_retries == 1 + assert output.check is not None + assert output.check.enabled is True + assert output.check.prompt == "Report must include risk summary." + assert output.failure_strategy.retry.enabled is True + assert output.failure_strategy.retry.max_retries == 1 + assert output.failure_strategy.on_failure.value == "fail_branch" def test_plaintext_secrets_are_rejected(): diff --git a/api/tests/unit_tests/services/agent/test_agent_services.py b/api/tests/unit_tests/services/agent/test_agent_services.py index 00840640a1..21bbe20ea2 100644 --- a/api/tests/unit_tests/services/agent/test_agent_services.py +++ b/api/tests/unit_tests/services/agent/test_agent_services.py @@ -73,6 +73,13 @@ def test_load_workflow_composer_returns_empty_state(monkeypatch): assert result["binding"] is None assert result["save_options"] == ["node_job_only", "save_to_roster"] assert result["workflow_id"] == "workflow-1" + # Stage 4 §4.1 / §10.1 (D-3): empty state still surfaces PRD defaults so + # the front-end has stable output names to render before the user declares + # anything. 
+ effective = result["effective_declared_outputs"] + assert [o["name"] for o in effective] == ["text", "files", "json"] + files_output = next(o for o in effective if o["name"] == "files") + assert files_output["array_item"] == {"type": "file", "description": None} def test_load_workflow_composer_serializes_existing_binding(monkeypatch): @@ -257,6 +264,37 @@ def test_serialize_workflow_state_changes_lock_and_save_options(monkeypatch): assert state["soul_lock"]["locked"] is True assert "save_as_new_version" in state["save_options"] assert state["agent_soul"]["app_features"] == {} + # Stage 4 §10.1 (D-3): binding with no declared_outputs → response surfaces + # PRD defaults via effective_declared_outputs (DB row remains untouched). + effective_names = [o["name"] for o in state["effective_declared_outputs"]] + assert effective_names == ["text", "files", "json"] + + +def test_serialize_workflow_state_passes_user_declared_outputs_through_effective(monkeypatch): + binding = WorkflowAgentNodeBinding( + id="binding-1", + tenant_id="tenant-1", + binding_type=WorkflowAgentBindingType.ROSTER_AGENT, + agent_id="agent-1", + current_snapshot_id="version-1", + workflow_id="workflow-1", + node_id="node-1", + node_job_config=( + '{"workflow_prompt":"work","declared_outputs":[{"name":"summary","type":"string","required":true}]}' + ), + ) + agent = Agent(id="agent-1", name="Analyst", description="", scope=AgentScope.ROSTER, status=AgentStatus.ACTIVE) + version = AgentConfigSnapshot(id="version-1", version=1, config_snapshot='{"prompt":{"system_prompt":"x"}}') + monkeypatch.setattr(AgentComposerService, "calculate_impact", lambda **kwargs: {"workflow_node_count": 1}) + + state = AgentComposerService._serialize_workflow_state(binding=binding, agent=agent, version=version) + + # When the user has declared outputs, effective_declared_outputs is the same + # list (no defaults injected). 
+ effective = state["effective_declared_outputs"] + assert [o["name"] for o in effective] == ["summary"] + assert effective[0]["type"] == "string" + assert effective[0]["required"] is True def test_composer_save_helpers_create_and_rebind_agents(monkeypatch): @@ -573,3 +611,54 @@ def test_validator_dict_helpers_wrap_validation_errors(): assert valid_soul.prompt.system_prompt == "x" assert valid_node_job.workflow_prompt == "x" + + +def test_composer_validator_rejects_stage_4_declared_output_violations(): + """Stage 4 §10.1: the model-layer validators surface stage-4-specific + violations through InvalidComposerConfigError so the Composer save endpoint + reports them with the same error shape as other shape failures. + """ + # Output name violates the JSON-schema-friendly identifier pattern. + with pytest.raises(InvalidComposerConfigError): + ComposerConfigValidator.validate_node_job_dict({"declared_outputs": [{"name": "1bad", "type": "string"}]}) + + # Output check is enabled on a non-file output. + with pytest.raises(InvalidComposerConfigError): + ComposerConfigValidator.validate_node_job_dict( + { + "declared_outputs": [ + { + "name": "text", + "type": "string", + "check": { + "enabled": True, + "prompt": "p", + "benchmark_file_ref": {"file_id": "f"}, + }, + } + ] + } + ) + + # default_value shape doesn't match the declared type. + with pytest.raises(InvalidComposerConfigError): + ComposerConfigValidator.validate_node_job_dict( + { + "declared_outputs": [ + { + "name": "score", + "type": "number", + "failure_strategy": { + "on_failure": "default_value", + "default_value": "not a number", + }, + } + ] + } + ) + + # Nested array_item is rejected outright. 
+ with pytest.raises(InvalidComposerConfigError): + ComposerConfigValidator.validate_node_job_dict( + {"declared_outputs": [{"name": "matrix", "type": "array", "array_item": {"type": "array"}}]} + ) diff --git a/packages/contracts/generated/api/console/apps/types.gen.ts b/packages/contracts/generated/api/console/apps/types.gen.ts index 5a529ea49d..71ad8486ee 100644 --- a/packages/contracts/generated/api/console/apps/types.gen.ts +++ b/packages/contracts/generated/api/console/apps/types.gen.ts @@ -1436,7 +1436,8 @@ export type AgentSoulToolsConfig = { } export type DeclaredOutputConfig = { - checks?: Array + array_item?: DeclaredArrayItem + check?: DeclaredOutputCheckConfig description?: string | null failure_strategy?: DeclaredOutputFailureStrategy file?: DeclaredOutputFileConfig @@ -1524,18 +1525,26 @@ export type AgentSoulModelCredentialRef = { type: string } +export type DeclaredArrayItem = { + description?: string | null + type: DeclaredOutputType +} + export type DeclaredOutputCheckConfig = { benchmark_file_ref?: { [key: string]: unknown } | null + enabled?: boolean + model_ref?: { + [key: string]: unknown + } | null prompt?: string | null - type: string } export type DeclaredOutputFailureStrategy = { - max_retries?: number - on_output_check_failed?: string | null - on_type_check_failed?: string | null + default_value?: unknown + on_failure?: OutputErrorStrategy + retry?: DeclaredOutputRetryConfig } export type DeclaredOutputFileConfig = { @@ -1553,6 +1562,14 @@ export type UserActionConfig = { export type FormInputConfig = unknown +export type OutputErrorStrategy = 'default_value' | 'fail_branch' | 'stop' + +export type DeclaredOutputRetryConfig = { + enabled?: boolean + max_retries?: number + retry_interval_ms?: number +} + export type ButtonStyle = 'accent' | 'default' | 'ghost' | 'primary' export type ParagraphInputConfig = { diff --git a/packages/contracts/generated/api/console/apps/zod.gen.ts b/packages/contracts/generated/api/console/apps/zod.gen.ts 
index 5c5b0fa213..445127b966 100644 --- a/packages/contracts/generated/api/console/apps/zod.gen.ts +++ b/packages/contracts/generated/api/console/apps/zod.gen.ts @@ -1792,24 +1792,22 @@ export const zAgentSoulConfig = z.object({ /** * DeclaredOutputCheckConfig + * + * File-output content check via a model-based comparison against a benchmark file. + * + * Per PRD §OUTPUT 配置框, output check is **file-only** and optional. Stage 4 §4.3. */ export const zDeclaredOutputCheckConfig = z.object({ benchmark_file_ref: z.record(z.string(), z.unknown()).nullish(), + enabled: z.boolean().optional().default(false), + model_ref: z.record(z.string(), z.unknown()).nullish(), prompt: z.string().nullish(), - type: z.string().min(1).max(64), -}) - -/** - * DeclaredOutputFailureStrategy - */ -export const zDeclaredOutputFailureStrategy = z.object({ - max_retries: z.int().gte(0).lte(10).optional().default(0), - on_output_check_failed: z.string().nullish(), - on_type_check_failed: z.string().nullish(), }) /** * DeclaredOutputFileConfig + * + * File-type output metadata. Both lists empty means "any file accepted". */ export const zDeclaredOutputFileConfig = z.object({ extensions: z.array(z.string()).optional(), @@ -1828,11 +1826,70 @@ export const zDeclaredOutputType = z.enum([ 'string', ]) +/** + * DeclaredArrayItem + * + * Per-item shape for an ``array``-typed declared output. + * + * PRD §OUTPUT 配置框 keeps arrays one level deep on first version; nested arrays + * are rejected so the runtime type checker and JSON Schema stay easy to reason + * about. Stage 4 §4.2. + */ +export const zDeclaredArrayItem = z.object({ + description: z.string().nullish(), + type: zDeclaredOutputType, +}) + +export const zFormInputConfig = z.unknown() + +/** + * OutputErrorStrategy + * + * Per-output failure handling strategy. + * + * Mirrors ``graphon.ErrorStrategy`` but scoped to a single declared output of + * a Workflow Agent Node. 
The runtime applies the strategy after type check or + * output check fails and any configured retry attempts have been exhausted. + */ +export const zOutputErrorStrategy = z.enum(['default_value', 'fail_branch', 'stop']) + +/** + * DeclaredOutputRetryConfig + * + * Per-output retry configuration that mirrors ``graphon.RetryConfig`` shape. + */ +export const zDeclaredOutputRetryConfig = z.object({ + enabled: z.boolean().optional().default(false), + max_retries: z.int().gte(0).lte(10).optional().default(0), + retry_interval_ms: z.int().gte(0).lte(60000).optional().default(0), +}) + +/** + * DeclaredOutputFailureStrategy + * + * Per-output failure handling. + * + * A single strategy applies to both ``type_check`` and ``output_check`` failures + * (PRD does not distinguish them at the UX level). Stage 4 §4.4. + */ +export const zDeclaredOutputFailureStrategy = z.object({ + default_value: z.unknown().optional(), + on_failure: zOutputErrorStrategy.optional(), + retry: zDeclaredOutputRetryConfig.optional(), +}) + /** * DeclaredOutputConfig + * + * One declared output of a Workflow Agent Node. + * + * Stage 4 normalizes the shape: ``check`` is singular (was ``checks: list`` in + * stage 3), and ``failure_strategy`` defaults to a populated value so runtime + * code can call ``output.failure_strategy.on_failure`` without None-guards. */ export const zDeclaredOutputConfig = z.object({ - checks: z.array(zDeclaredOutputCheckConfig).optional(), + array_item: zDeclaredArrayItem.optional(), + check: zDeclaredOutputCheckConfig.optional(), description: z.string().nullish(), failure_strategy: zDeclaredOutputFailureStrategy.optional(), file: zDeclaredOutputFileConfig.optional(), @@ -1871,8 +1928,6 @@ export const zComposerSavePayload = z.object({ version_note: z.string().nullish(), }) -export const zFormInputConfig = z.unknown() - /** * ButtonStyle *