mirror of
https://github.com/langgenius/dify.git
synced 2026-06-07 16:32:01 +08:00
feat: output declaration and inspector (#36618)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
parent
fbfb4b3a00
commit
b1f0a11d84
@ -472,6 +472,9 @@ class DifyNodeFactory(NodeFactory):
|
||||
if issubclass(node_class, DifyAgentNode):
|
||||
from clients.agent_backend import AgentBackendRunEventAdapter, AgentBackendRunRequestBuilder
|
||||
from clients.agent_backend.factory import create_agent_backend_run_client
|
||||
from core.workflow.nodes.agent_v2.file_tenant_validator import UploadFileTenantValidator
|
||||
from core.workflow.nodes.agent_v2.output_failure_orchestrator import OutputFailureOrchestrator
|
||||
from core.workflow.nodes.agent_v2.output_type_checker import PerOutputTypeChecker
|
||||
|
||||
return {
|
||||
"binding_resolver": WorkflowAgentBindingResolver(),
|
||||
@ -486,6 +489,11 @@ class DifyNodeFactory(NodeFactory):
|
||||
),
|
||||
"event_adapter": AgentBackendRunEventAdapter(),
|
||||
"output_adapter": WorkflowAgentOutputAdapter(),
|
||||
# Stage 4 §5/§7: per-output validation + failure orchestration. The
|
||||
# tenant validator queries upload_files so it stays cheap when
|
||||
# outputs contain no file refs.
|
||||
"type_checker": PerOutputTypeChecker(file_validator=UploadFileTenantValidator()),
|
||||
"failure_orchestrator": OutputFailureOrchestrator(),
|
||||
}
|
||||
return {
|
||||
"strategy_resolver": self._agent_strategy_resolver,
|
||||
|
||||
@ -7,9 +7,11 @@ from clients.agent_backend import (
|
||||
AgentBackendError,
|
||||
AgentBackendHTTPError,
|
||||
AgentBackendInternalEventType,
|
||||
AgentBackendRunCancelledInternalEvent,
|
||||
AgentBackendRunClient,
|
||||
AgentBackendRunEventAdapter,
|
||||
AgentBackendRunFailedInternalEvent,
|
||||
AgentBackendRunPausedInternalEvent,
|
||||
AgentBackendRunSucceededInternalEvent,
|
||||
AgentBackendStreamError,
|
||||
AgentBackendStreamInternalEvent,
|
||||
@ -21,10 +23,18 @@ from core.workflow.system_variables import SystemVariableKey, get_system_text
|
||||
from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
|
||||
from graphon.node_events import NodeEventBase, NodeRunResult, StreamCompletedEvent
|
||||
from graphon.nodes.base.node import Node
|
||||
from models.agent_config_entities import WorkflowNodeJobConfig
|
||||
|
||||
from .binding_resolver import WorkflowAgentBindingError, WorkflowAgentBindingResolver
|
||||
from .entities import DifyAgentNodeData
|
||||
from .output_adapter import WorkflowAgentOutputAdapter
|
||||
from .output_failure_orchestrator import (
|
||||
FailedOutput,
|
||||
OutputFailureDecision,
|
||||
OutputFailureKind,
|
||||
OutputFailureOrchestrator,
|
||||
)
|
||||
from .output_type_checker import OutputTypeCheckOutcome, PerOutputTypeChecker
|
||||
from .runtime_request_builder import (
|
||||
WorkflowAgentRuntimeBuildContext,
|
||||
WorkflowAgentRuntimeRequestBuilder,
|
||||
@ -36,6 +46,17 @@ if TYPE_CHECKING:
|
||||
from graphon.runtime import GraphRuntimeState
|
||||
|
||||
|
||||
# Stage 4 §5+§7: the terminal events that `_consume_event_stream` may return.
|
||||
# Stream + started events are filtered out before we yield; transport errors
|
||||
# are surfaced as a separate StreamCompletedEvent in the second tuple slot.
|
||||
_TerminalAgentBackendEvent = (
|
||||
AgentBackendRunSucceededInternalEvent
|
||||
| AgentBackendRunFailedInternalEvent
|
||||
| AgentBackendRunCancelledInternalEvent
|
||||
| AgentBackendRunPausedInternalEvent
|
||||
)
|
||||
|
||||
|
||||
class DifyAgentNode(Node[DifyAgentNodeData]):
|
||||
node_type = BuiltinNodeTypes.AGENT
|
||||
|
||||
@ -51,6 +72,8 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
|
||||
agent_backend_client: AgentBackendRunClient,
|
||||
event_adapter: AgentBackendRunEventAdapter,
|
||||
output_adapter: WorkflowAgentOutputAdapter,
|
||||
type_checker: PerOutputTypeChecker,
|
||||
failure_orchestrator: OutputFailureOrchestrator,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
node_id=node_id,
|
||||
@ -63,6 +86,8 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
|
||||
self._agent_backend_client = agent_backend_client
|
||||
self._event_adapter = event_adapter
|
||||
self._output_adapter = output_adapter
|
||||
self._type_checker = type_checker
|
||||
self._failure_orchestrator = failure_orchestrator
|
||||
|
||||
@classmethod
|
||||
def version(cls) -> str:
|
||||
@ -86,6 +111,7 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
|
||||
}
|
||||
}
|
||||
|
||||
# ──── Setup: resolve binding once + extract declared outputs for stage 4 checks ────
|
||||
try:
|
||||
bundle = self._binding_resolver.resolve(
|
||||
tenant_id=dify_ctx.tenant_id,
|
||||
@ -93,32 +119,6 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
|
||||
workflow_id=workflow_id,
|
||||
node_id=self._node_id,
|
||||
)
|
||||
runtime_request = self._runtime_request_builder.build(
|
||||
WorkflowAgentRuntimeBuildContext(
|
||||
dify_context=dify_ctx,
|
||||
workflow_id=workflow_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id=self._node_id,
|
||||
node_execution_id=self.id,
|
||||
variable_pool=self.graph_runtime_state.variable_pool,
|
||||
binding=bundle.binding,
|
||||
agent=bundle.agent,
|
||||
snapshot=bundle.snapshot,
|
||||
)
|
||||
)
|
||||
inputs = {"agent_backend_request": runtime_request.redacted_request}
|
||||
metadata = dict(runtime_request.metadata)
|
||||
process_data = {
|
||||
"agent_id": bundle.agent.id,
|
||||
"agent_config_snapshot_id": bundle.snapshot.id,
|
||||
"binding_id": bundle.binding.id,
|
||||
}
|
||||
create_response = self._agent_backend_client.create_run(runtime_request.request)
|
||||
metadata["agent_backend"] = {
|
||||
**dict(metadata.get("agent_backend") or {}),
|
||||
"run_id": create_response.run_id,
|
||||
"status": create_response.status,
|
||||
}
|
||||
except WorkflowAgentBindingError as error:
|
||||
yield self._failure_event(
|
||||
inputs=inputs,
|
||||
@ -128,37 +128,195 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
|
||||
error_type=error.error_code,
|
||||
)
|
||||
return
|
||||
except WorkflowAgentRuntimeRequestBuildError as error:
|
||||
yield self._failure_event(
|
||||
inputs=inputs,
|
||||
process_data=process_data,
|
||||
metadata=metadata,
|
||||
error=str(error),
|
||||
error_type=error.error_code,
|
||||
|
||||
process_data = {
|
||||
"agent_id": bundle.agent.id,
|
||||
"agent_config_snapshot_id": bundle.snapshot.id,
|
||||
"binding_id": bundle.binding.id,
|
||||
}
|
||||
|
||||
# Stage 4 §4.1 (D-3): use effective outputs so defaults flow through both
|
||||
# the backend request and the post-run type check.
|
||||
node_job = WorkflowNodeJobConfig.model_validate(bundle.binding.node_job_config_dict)
|
||||
effective_outputs = list(
|
||||
WorkflowAgentRuntimeRequestBuilder.effective_declared_outputs(list(node_job.declared_outputs))
|
||||
)
|
||||
outputs_by_name = {o.name: o for o in effective_outputs}
|
||||
|
||||
# ──── Retry loop (Stage 4 §7) ────
|
||||
attempt = 0
|
||||
while True:
|
||||
try:
|
||||
runtime_request = self._runtime_request_builder.build(
|
||||
WorkflowAgentRuntimeBuildContext(
|
||||
dify_context=dify_ctx,
|
||||
workflow_id=workflow_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id=self._node_id,
|
||||
node_execution_id=self.id,
|
||||
variable_pool=self.graph_runtime_state.variable_pool,
|
||||
binding=bundle.binding,
|
||||
agent=bundle.agent,
|
||||
snapshot=bundle.snapshot,
|
||||
attempt=attempt,
|
||||
)
|
||||
)
|
||||
except WorkflowAgentRuntimeRequestBuildError as error:
|
||||
yield self._failure_event(
|
||||
inputs=inputs,
|
||||
process_data=process_data,
|
||||
metadata=metadata,
|
||||
error=str(error),
|
||||
error_type=error.error_code,
|
||||
)
|
||||
return
|
||||
except Exception as error:
|
||||
yield self._failure_event(
|
||||
inputs=inputs,
|
||||
process_data=process_data,
|
||||
metadata=metadata,
|
||||
error=str(error),
|
||||
error_type="agent_workflow_node_runtime_error",
|
||||
)
|
||||
return
|
||||
|
||||
# Capture inputs only from the first attempt so retry doesn't churn the
|
||||
# node's "inputs" payload that ends up in the workflow detail view.
|
||||
if attempt == 0:
|
||||
inputs = {"agent_backend_request": runtime_request.redacted_request}
|
||||
metadata = dict(runtime_request.metadata)
|
||||
metadata["attempt"] = attempt
|
||||
|
||||
try:
|
||||
create_response = self._agent_backend_client.create_run(runtime_request.request)
|
||||
except AgentBackendError as error:
|
||||
yield self._failure_event(
|
||||
inputs=inputs,
|
||||
process_data=process_data,
|
||||
metadata=metadata,
|
||||
error=str(error),
|
||||
error_type=self._agent_backend_error_type(error),
|
||||
)
|
||||
return
|
||||
|
||||
metadata["agent_backend"] = {
|
||||
**dict(metadata.get("agent_backend") or {}),
|
||||
"run_id": create_response.run_id,
|
||||
"status": create_response.status,
|
||||
}
|
||||
|
||||
terminal_event, exhausted = self._consume_event_stream(create_response.run_id, metadata)
|
||||
if exhausted is not None:
|
||||
# Streaming error / unexpected end — surface immediately without
|
||||
# retrying because the failure is transport-level.
|
||||
yield exhausted
|
||||
return
|
||||
if terminal_event is None:
|
||||
yield StreamCompletedEvent(
|
||||
node_run_result=self._output_adapter.build_stream_exhausted_result(
|
||||
inputs=inputs,
|
||||
process_data=process_data,
|
||||
metadata=metadata,
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
# Non-success terminal (failed / cancelled / paused) skips per-output
|
||||
# post-processing — the backend itself already failed.
|
||||
if not isinstance(terminal_event, AgentBackendRunSucceededInternalEvent):
|
||||
yield StreamCompletedEvent(
|
||||
node_run_result=self._output_adapter.build_failure_result(
|
||||
event=terminal_event,
|
||||
inputs=inputs,
|
||||
process_data=process_data,
|
||||
metadata=metadata,
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
# ──── Stage 4: per-output type check ────
|
||||
type_check = self._type_checker.check(
|
||||
declared_outputs=effective_outputs,
|
||||
raw_output=terminal_event.output,
|
||||
tenant_id=dify_ctx.tenant_id,
|
||||
)
|
||||
return
|
||||
except AgentBackendError as error:
|
||||
yield self._failure_event(
|
||||
inputs=inputs,
|
||||
process_data=process_data,
|
||||
metadata=metadata,
|
||||
error=str(error),
|
||||
error_type=self._agent_backend_error_type(error),
|
||||
self._record_type_check_metadata(metadata, type_check)
|
||||
|
||||
if not type_check.has_failures:
|
||||
yield StreamCompletedEvent(
|
||||
node_run_result=self._output_adapter.build_success_result(
|
||||
event=terminal_event,
|
||||
inputs=inputs,
|
||||
process_data=process_data,
|
||||
metadata=metadata,
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
# ──── Stage 4: orchestrate retry / default / fail ────
|
||||
failures = [
|
||||
FailedOutput(
|
||||
declared=outputs_by_name[result.name],
|
||||
failure_kind=OutputFailureKind.TYPE_CHECK,
|
||||
reason=result.reason,
|
||||
)
|
||||
for result in type_check.failures
|
||||
if result.name in outputs_by_name
|
||||
]
|
||||
outcome = self._failure_orchestrator.decide(failures=failures, current_attempt=attempt)
|
||||
metadata["output_failure_decision"] = outcome.decision.value
|
||||
metadata["output_failure_reason"] = outcome.primary_reason
|
||||
|
||||
if outcome.decision == OutputFailureDecision.RETRY:
|
||||
attempt = outcome.next_attempt
|
||||
continue
|
||||
|
||||
if outcome.decision == OutputFailureDecision.USE_DEFAULT:
|
||||
patched_event = self._patch_event_with_defaults(terminal_event, outcome.per_output_actions)
|
||||
yield StreamCompletedEvent(
|
||||
node_run_result=self._output_adapter.build_success_result(
|
||||
event=patched_event,
|
||||
inputs=inputs,
|
||||
process_data=process_data,
|
||||
metadata=metadata,
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
error_type = (
|
||||
"output_type_check_failed_fail_branch"
|
||||
if outcome.decision == OutputFailureDecision.TAKE_FAIL_BRANCH
|
||||
else "output_type_check_failed"
|
||||
)
|
||||
return
|
||||
except Exception as error:
|
||||
yield self._failure_event(
|
||||
inputs=inputs,
|
||||
process_data=process_data,
|
||||
metadata=metadata,
|
||||
error=str(error),
|
||||
error_type="agent_workflow_node_runtime_error",
|
||||
error=outcome.primary_reason,
|
||||
error_type=error_type,
|
||||
)
|
||||
return
|
||||
|
||||
def _consume_event_stream(
|
||||
self,
|
||||
run_id: str,
|
||||
metadata: dict[str, Any],
|
||||
) -> tuple[
|
||||
_TerminalAgentBackendEvent | None,
|
||||
StreamCompletedEvent | None,
|
||||
]:
|
||||
"""Consume the SSE stream for one Agent backend run.
|
||||
|
||||
Returns a 2-tuple ``(terminal_event, transport_failure)``:
|
||||
- ``terminal_event``: the first non-stream/non-started internal event,
|
||||
or ``None`` if the stream ended without one.
|
||||
- ``transport_failure``: a populated ``StreamCompletedEvent`` when the
|
||||
stream itself errored (backend/HTTP/protocol fault). Mutually
|
||||
exclusive with ``terminal_event``.
|
||||
"""
|
||||
stream_event_count = 0
|
||||
try:
|
||||
for public_event in self._agent_backend_client.stream_events(create_response.run_id):
|
||||
for public_event in self._agent_backend_client.stream_events(run_id):
|
||||
stream_event_count += 1
|
||||
for internal_event in self._event_adapter.adapt(public_event):
|
||||
if internal_event.type == AgentBackendInternalEventType.RUN_STARTED:
|
||||
@ -171,58 +329,78 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
|
||||
**dict(metadata.get("agent_backend") or {}),
|
||||
"stream_event_count": stream_event_count,
|
||||
}
|
||||
if isinstance(internal_event, AgentBackendRunSucceededInternalEvent):
|
||||
yield StreamCompletedEvent(
|
||||
node_run_result=self._output_adapter.build_success_result(
|
||||
event=internal_event,
|
||||
inputs=inputs,
|
||||
process_data=process_data,
|
||||
metadata=metadata,
|
||||
)
|
||||
)
|
||||
return
|
||||
# Narrow to the 4 known terminal event types so the caller
|
||||
# can hand the result to ``build_failure_result`` (which is
|
||||
# typed against the union). Anything else is a protocol-
|
||||
# level surprise we surface as a stream error.
|
||||
if isinstance(
|
||||
internal_event,
|
||||
AgentBackendRunFailedInternalEvent,
|
||||
) or internal_event.type in {
|
||||
AgentBackendInternalEventType.RUN_CANCELLED,
|
||||
AgentBackendInternalEventType.RUN_PAUSED,
|
||||
}:
|
||||
yield StreamCompletedEvent(
|
||||
node_run_result=self._output_adapter.build_failure_result(
|
||||
event=internal_event,
|
||||
inputs=inputs,
|
||||
process_data=process_data,
|
||||
metadata=metadata,
|
||||
)
|
||||
)
|
||||
return
|
||||
AgentBackendRunSucceededInternalEvent
|
||||
| AgentBackendRunFailedInternalEvent
|
||||
| AgentBackendRunCancelledInternalEvent
|
||||
| AgentBackendRunPausedInternalEvent,
|
||||
):
|
||||
return internal_event, None
|
||||
return None, self._failure_event(
|
||||
inputs={},
|
||||
process_data={},
|
||||
metadata=metadata,
|
||||
error=f"Unexpected internal event type {internal_event.type!r}",
|
||||
error_type="agent_backend_stream_error",
|
||||
)
|
||||
except AgentBackendError as error:
|
||||
yield self._failure_event(
|
||||
inputs=inputs,
|
||||
process_data=process_data,
|
||||
return None, self._failure_event(
|
||||
inputs={},
|
||||
process_data={},
|
||||
metadata=metadata,
|
||||
error=str(error),
|
||||
error_type=self._agent_backend_error_type(error),
|
||||
)
|
||||
return
|
||||
except Exception as error:
|
||||
yield self._failure_event(
|
||||
inputs=inputs,
|
||||
process_data=process_data,
|
||||
return None, self._failure_event(
|
||||
inputs={},
|
||||
process_data={},
|
||||
metadata=metadata,
|
||||
error=str(error),
|
||||
error_type="agent_backend_stream_error",
|
||||
)
|
||||
return
|
||||
|
||||
yield StreamCompletedEvent(
|
||||
node_run_result=self._output_adapter.build_stream_exhausted_result(
|
||||
inputs=inputs,
|
||||
process_data=process_data,
|
||||
metadata=metadata,
|
||||
)
|
||||
)
|
||||
return None, None
|
||||
|
||||
@staticmethod
|
||||
def _record_type_check_metadata(metadata: dict[str, Any], outcome: OutputTypeCheckOutcome) -> None:
|
||||
# Surface enough detail in metadata for Inspector / debug logs without
|
||||
# leaking the raw failing values (which may be sensitive).
|
||||
metadata["output_type_check"] = {
|
||||
"passed": not outcome.has_failures,
|
||||
"results": [
|
||||
{
|
||||
"name": r.name,
|
||||
"type": r.declared_type.value,
|
||||
"status": r.status.value,
|
||||
"reason": r.reason,
|
||||
}
|
||||
for r in outcome.results
|
||||
],
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _patch_event_with_defaults(
|
||||
event: AgentBackendRunSucceededInternalEvent,
|
||||
per_output_actions: Mapping[str, Any],
|
||||
) -> AgentBackendRunSucceededInternalEvent:
|
||||
"""Merge USE_DEFAULT replacements into the success event's output dict.
|
||||
|
||||
The event is a frozen dataclass / Pydantic model; we copy with the
|
||||
replacements applied so downstream code (output_adapter normalize) sees
|
||||
the patched payload.
|
||||
"""
|
||||
if not per_output_actions:
|
||||
return event
|
||||
original = event.output if isinstance(event.output, Mapping) else {}
|
||||
patched_output: dict[str, Any] = dict(original)
|
||||
patched_output.update(per_output_actions)
|
||||
return event.model_copy(update={"output": patched_output})
|
||||
|
||||
@staticmethod
|
||||
def _failure_event(
|
||||
|
||||
46
api/core/workflow/nodes/agent_v2/file_tenant_validator.py
Normal file
46
api/core/workflow/nodes/agent_v2/file_tenant_validator.py
Normal file
@ -0,0 +1,46 @@
|
||||
"""Tenant-scope validator for file refs produced by Agent backend outputs.
|
||||
|
||||
Stage 4 §5.3: every file output the Agent backend produces must resolve to an
|
||||
``upload_files`` row that belongs to the current tenant; cross-tenant file
|
||||
references must never be plumbed downstream. ``PerOutputTypeChecker`` accepts a
|
||||
``FileTenantValidator`` Protocol so unit tests can stub the check without
|
||||
hitting Postgres.
|
||||
|
||||
This module supplies the production implementation that queries the
|
||||
``upload_files`` table via SQLAlchemy.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from uuid import UUID
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.exc import DataError, SQLAlchemyError
|
||||
|
||||
from core.db.session_factory import session_factory
|
||||
from models.model import UploadFile
|
||||
|
||||
|
||||
class UploadFileTenantValidator:
    """Production ``FileTenantValidator`` backed by the ``upload_files`` table.

    Returns ``False`` (rejects the file) on any pathological input: empty
    file_id/tenant_id, non-string file_id, non-UUID file_id format, DB errors.
    The Agent backend may produce arbitrary values inside file refs since the
    schema only asserts ``{type: string}``; treating malformed refs as invalid
    keeps the workflow node from crashing on garbage backend output.
    """

    def is_owned_by_tenant(self, *, file_id: str, tenant_id: str) -> bool:
        """Report whether ``file_id`` names an upload owned by ``tenant_id``.

        Args:
            file_id: Identifier taken from a backend file ref; must be a
                UUID-formatted string to be considered.
            tenant_id: Tenant the file is expected to belong to.

        Returns:
            ``True`` only when the ``upload_files`` lookup yields a tenant id
            equal to ``tenant_id``; ``False`` for empty or malformed input,
            for a lookup that yields nothing matching, and for any
            SQLAlchemy error raised while querying.
        """
        if not file_id or not tenant_id:
            return False
        # ``uuid.UUID`` calls ``.replace`` on its argument, so a non-string
        # value (int, list, dict, ...) raises AttributeError, which the
        # ValueError/TypeError handler below would not catch. Reject such
        # values up front so garbage backend output cannot crash the node.
        if not isinstance(file_id, str):
            return False
        try:
            UUID(file_id)
        except (ValueError, TypeError):
            return False
        try:
            with session_factory.create_session() as session:
                owner_tenant_id = session.scalar(select(UploadFile.tenant_id).where(UploadFile.id == file_id))
        except (DataError, SQLAlchemyError):
            # Deliberate best-effort: a failed lookup rejects the file
            # instead of propagating the database error.
            return False
        return owner_tenant_id == tenant_id
|
||||
201
api/core/workflow/nodes/agent_v2/output_failure_orchestrator.py
Normal file
201
api/core/workflow/nodes/agent_v2/output_failure_orchestrator.py
Normal file
@ -0,0 +1,201 @@
|
||||
"""Per-output failure decision logic for Workflow Agent Node v2.
|
||||
|
||||
Stage 4 §7. Pure orchestration: given a set of per-output failures plus their
|
||||
configured ``DeclaredOutputFailureStrategy``, decide whether the workflow node
|
||||
should retry the Agent backend run, take a fail branch, fall back to default
|
||||
values, or fail outright.
|
||||
|
||||
The orchestrator is intentionally state-free. The caller (``agent_node._run``)
|
||||
owns the retry attempt counter and is responsible for actually issuing the
|
||||
re-run; this module only computes the decision.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Mapping
|
||||
from dataclasses import dataclass
|
||||
from enum import StrEnum
|
||||
from typing import Any
|
||||
|
||||
from models.agent_config_entities import (
|
||||
DeclaredOutputConfig,
|
||||
DeclaredOutputFailureStrategy,
|
||||
OutputErrorStrategy,
|
||||
)
|
||||
|
||||
|
||||
class OutputFailureKind(StrEnum):
|
||||
"""Why the per-output post-processing failed."""
|
||||
|
||||
TYPE_CHECK = "type_check"
|
||||
OUTPUT_CHECK = "output_check"
|
||||
|
||||
|
||||
class OutputFailureDecision(StrEnum):
|
||||
"""What the runtime should do after collecting per-output failures."""
|
||||
|
||||
# Re-invoke Agent backend (entire node re-runs). Used while retry budget
|
||||
# remains for at least one failed output.
|
||||
RETRY = "retry"
|
||||
# Replace each failed output's value with its declared default_value and
|
||||
# surface the run as successful.
|
||||
USE_DEFAULT = "use_default"
|
||||
# Mark the workflow node as failed; halt downstream propagation.
|
||||
FAIL_NODE = "fail_node"
|
||||
# Surface the node as exception → route through fail branch outbound edge.
|
||||
TAKE_FAIL_BRANCH = "take_fail_branch"
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class FailedOutput:
|
||||
"""One failed output that the orchestrator needs to reason about."""
|
||||
|
||||
declared: DeclaredOutputConfig
|
||||
failure_kind: OutputFailureKind
|
||||
reason: str | None = None
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class OutputFailureOutcome:
|
||||
"""Outcome of orchestrating one batch of failures.
|
||||
|
||||
``per_output_actions`` is non-empty only when ``decision == USE_DEFAULT``;
|
||||
it maps the failed output's ``name`` to the value that should be merged
|
||||
into the node's outputs in place of the failed value.
|
||||
"""
|
||||
|
||||
decision: OutputFailureDecision
|
||||
per_output_actions: Mapping[str, Any]
|
||||
next_attempt: int
|
||||
primary_reason: str
|
||||
failure_kinds: tuple[OutputFailureKind, ...]
|
||||
|
||||
|
||||
# Stage 4 §7 — precedence used to merge differing per-output strategies into a
|
||||
# single node-level decision when multiple outputs fail at once.
|
||||
# Smaller integer = lower priority. FAIL_BRANCH wins overall.
|
||||
_STRATEGY_TERMINAL_RANK: dict[OutputErrorStrategy, int] = {
|
||||
OutputErrorStrategy.DEFAULT_VALUE: 0,
|
||||
OutputErrorStrategy.STOP: 1,
|
||||
OutputErrorStrategy.FAIL_BRANCH: 2,
|
||||
}
|
||||
|
||||
|
||||
class OutputFailureOrchestrator:
    """Stateless decision engine for per-output failure handling."""

    def decide(
        self,
        *,
        failures: list[FailedOutput],
        current_attempt: int,
    ) -> OutputFailureOutcome:
        """Pick the next action for a non-empty batch of failures.

        ``current_attempt`` is zero-indexed (``0`` = first backend run,
        ``1`` = first retry, ...). When the decision is ``RETRY`` the
        returned ``next_attempt`` is the index the caller should use for the
        following iteration; otherwise it echoes ``current_attempt``.

        Raises:
            ValueError: if ``failures`` is empty.
        """
        if not failures:
            raise ValueError("OutputFailureOrchestrator.decide() requires at least one failure")

        # The node retries as a whole, so the effective budget is the largest
        # max_retries among the failed outputs that have retry enabled.
        budgets: list[int] = []
        for item in failures:
            retry_cfg = item.declared.failure_strategy.retry
            budgets.append(retry_cfg.max_retries if retry_cfg.enabled else 0)
        retry_budget = max(budgets)

        if current_attempt < retry_budget:
            return OutputFailureOutcome(
                decision=OutputFailureDecision.RETRY,
                per_output_actions={},
                next_attempt=current_attempt + 1,
                primary_reason=self._summarize(failures),
                failure_kinds=self._failure_kinds(failures),
            )

        # No retries left: fold the per-output terminal strategies into one
        # node-level decision using the precedence table.
        merged = self._merge_terminal_decisions(failures)
        replacements: dict[str, Any] = {}
        if merged == OutputFailureDecision.USE_DEFAULT:
            replacements = {
                item.declared.name: item.declared.failure_strategy.default_value
                for item in failures
                if item.declared.failure_strategy.on_failure == OutputErrorStrategy.DEFAULT_VALUE
            }

        return OutputFailureOutcome(
            decision=merged,
            per_output_actions=replacements,
            next_attempt=current_attempt,
            primary_reason=self._summarize(failures),
            failure_kinds=self._failure_kinds(failures),
        )

    @staticmethod
    def _merge_terminal_decisions(failures: list[FailedOutput]) -> OutputFailureDecision:
        """Map the highest-ranked ``on_failure`` strategy to its decision."""
        # ``max`` keeps the first element among equals, and falls back to
        # DEFAULT_VALUE when there is nothing to rank.
        top_strategy = max(
            (item.declared.failure_strategy.on_failure for item in failures),
            key=lambda strategy: _STRATEGY_TERMINAL_RANK[strategy],
            default=OutputErrorStrategy.DEFAULT_VALUE,
        )
        return _TERMINAL_STRATEGY_TO_DECISION[top_strategy]

    @staticmethod
    def _summarize(failures: list[FailedOutput]) -> str:
        """Join ``name[kind]: reason`` fragments for every failure with ``"; "``."""
        return "; ".join(
            f"{item.declared.name}[{item.failure_kind.value}]: {item.reason or 'no reason recorded'}"
            for item in failures
        )

    @staticmethod
    def _failure_kinds(failures: list[FailedOutput]) -> tuple[OutputFailureKind, ...]:
        """Return the distinct failure kinds in first-seen order."""
        return tuple(dict.fromkeys(item.failure_kind for item in failures))
|
||||
|
||||
|
||||
_TERMINAL_STRATEGY_TO_DECISION: dict[OutputErrorStrategy, OutputFailureDecision] = {
|
||||
OutputErrorStrategy.STOP: OutputFailureDecision.FAIL_NODE,
|
||||
OutputErrorStrategy.DEFAULT_VALUE: OutputFailureDecision.USE_DEFAULT,
|
||||
OutputErrorStrategy.FAIL_BRANCH: OutputFailureDecision.TAKE_FAIL_BRANCH,
|
||||
}
|
||||
|
||||
|
||||
def retry_idempotency_key(
    *,
    workflow_run_id: str | None,
    node_execution_id: str,
    attempt: int,
) -> str:
    """Build the Agent backend ``idempotency_key`` for one attempt.

    Stage 4 §7 / decision D-4: every retry needs its own key so the
    backend's protocol-level dedup doesn't hand back the previous run's id.
    The first attempt (``attempt <= 0``) yields the bare key, matching the
    pre-stage-4 shape so logs stay backward compatible; later attempts
    append ``:retry-<attempt>``.
    """
    prefix = node_execution_id
    if workflow_run_id:
        prefix = f"{workflow_run_id}:{node_execution_id}"
    return prefix if attempt <= 0 else f"{prefix}:retry-{attempt}"
|
||||
|
||||
|
||||
def build_failure_strategy_for(declared: DeclaredOutputConfig) -> DeclaredOutputFailureStrategy:
    """Return the failure strategy attached to ``declared``.

    Callers reading ``output.failure_strategy`` directly already receive a
    populated default via the BaseModel default_factory; this accessor
    exists to document that contract and to give the orchestrator's tests
    a single hook.
    """
    strategy = declared.failure_strategy
    return strategy
|
||||
244
api/core/workflow/nodes/agent_v2/output_type_checker.py
Normal file
244
api/core/workflow/nodes/agent_v2/output_type_checker.py
Normal file
@ -0,0 +1,244 @@
|
||||
"""Per-output runtime type checker for Workflow Agent Node v2.
|
||||
|
||||
Stage 4 §5: after Agent backend returns ``run_succeeded.data.output`` (a JSON
|
||||
object that already passed the ``dify.output`` layer's JSON Schema validation
|
||||
inside pydantic-ai), the API side runs a *second* pass that:
|
||||
|
||||
1. Locates each declared output by name in the backend payload.
|
||||
2. Asserts the value's shape against the declared ``DeclaredOutputType``
|
||||
(including array items and file ref objects).
|
||||
3. For file outputs, verifies the referenced ``file_id`` resolves to a file
|
||||
owned by the current tenant (PRD §5.3 file output reference safety).
|
||||
|
||||
The checker is intentionally pure: it takes data in and returns a structured
|
||||
outcome out. ``FileTenantValidator`` is injected as a Protocol so unit tests
|
||||
can stub tenant resolution without DB access.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Mapping
|
||||
from dataclasses import dataclass
|
||||
from enum import StrEnum
|
||||
from typing import Any, Protocol
|
||||
|
||||
from models.agent_config_entities import (
|
||||
DeclaredArrayItem,
|
||||
DeclaredOutputConfig,
|
||||
DeclaredOutputType,
|
||||
)
|
||||
|
||||
|
||||
class OutputTypeCheckStatus(StrEnum):
|
||||
"""Lifecycle status of a single declared output after type check."""
|
||||
|
||||
READY = "ready"
|
||||
NOT_PRODUCED = "not_produced"
|
||||
TYPE_CHECK_FAILED = "type_check_failed"
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class OutputTypeCheckResult:
|
||||
"""Outcome of type-checking one declared output.
|
||||
|
||||
``value`` carries the raw payload value as it appeared in the backend
|
||||
response. For ``TYPE_CHECK_FAILED`` results the value is preserved so the
|
||||
Failure Orchestrator can decide whether to surface it (e.g. for debug
|
||||
metadata) — it is **not** safe to feed into downstream nodes.
|
||||
"""
|
||||
|
||||
name: str
|
||||
declared_type: DeclaredOutputType
|
||||
status: OutputTypeCheckStatus
|
||||
value: Any
|
||||
reason: str | None = None
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class OutputTypeCheckOutcome:
|
||||
"""Aggregate per-output type-check results for one Agent backend run."""
|
||||
|
||||
results: tuple[OutputTypeCheckResult, ...]
|
||||
|
||||
@property
|
||||
def failures(self) -> tuple[OutputTypeCheckResult, ...]:
|
||||
return tuple(r for r in self.results if r.status == OutputTypeCheckStatus.TYPE_CHECK_FAILED)
|
||||
|
||||
@property
|
||||
def has_failures(self) -> bool:
|
||||
return bool(self.failures)
|
||||
|
||||
def by_name(self) -> dict[str, OutputTypeCheckResult]:
|
||||
return {r.name: r for r in self.results}
|
||||
|
||||
|
||||
class FileTenantValidator(Protocol):
    """Structural interface for checking that a file ref belongs to a tenant."""

    def is_owned_by_tenant(self, *, file_id: str, tenant_id: str) -> bool:
        """Return whether ``file_id`` resolves to a file owned by ``tenant_id``."""
|
||||
|
||||
|
||||
# Recognized aliases the Agent backend (or pydantic-ai) may produce for the
|
||||
# canonical file id field. The canonical spec form is ``file_id`` (§5.2).
|
||||
_FILE_ID_KEYS: tuple[str, ...] = ("file_id", "upload_file_id", "tool_file_id")
|
||||
|
||||
|
||||
class PerOutputTypeChecker:
|
||||
"""Validate that each declared output is present and shaped correctly.
|
||||
|
||||
The checker handles array items recursively and is opinionated about file
|
||||
refs: only dicts with at least one recognized id key plus a tenant-scope
|
||||
match pass. Stage 4 §5.2 + §5.3.
|
||||
"""
|
||||
|
||||
def __init__(self, file_validator: FileTenantValidator) -> None:
|
||||
self._file_validator = file_validator
|
||||
|
||||
def check(
|
||||
self,
|
||||
*,
|
||||
declared_outputs: list[DeclaredOutputConfig],
|
||||
raw_output: Mapping[str, Any] | Any,
|
||||
tenant_id: str,
|
||||
) -> OutputTypeCheckOutcome:
|
||||
"""Run type check for every declared output.
|
||||
|
||||
``raw_output`` should be ``run_succeeded.data.output``. The backend
|
||||
always returns a dict because the ``dify.output`` layer wraps every
|
||||
schema in a top-level object; if it isn't a dict (e.g. backend
|
||||
misbehaving) every required output is flagged as ``TYPE_CHECK_FAILED``.
|
||||
"""
|
||||
results: list[OutputTypeCheckResult] = []
|
||||
payload = raw_output if isinstance(raw_output, Mapping) else None
|
||||
|
||||
for declared in declared_outputs:
|
||||
if payload is None:
|
||||
results.append(
|
||||
OutputTypeCheckResult(
|
||||
name=declared.name,
|
||||
declared_type=declared.type,
|
||||
status=OutputTypeCheckStatus.TYPE_CHECK_FAILED,
|
||||
value=raw_output,
|
||||
reason="Backend output is not a JSON object.",
|
||||
)
|
||||
)
|
||||
continue
|
||||
if declared.name not in payload:
|
||||
if declared.required:
|
||||
results.append(
|
||||
OutputTypeCheckResult(
|
||||
name=declared.name,
|
||||
declared_type=declared.type,
|
||||
status=OutputTypeCheckStatus.TYPE_CHECK_FAILED,
|
||||
value=None,
|
||||
reason=f"Required output {declared.name!r} is missing from backend payload.",
|
||||
)
|
||||
)
|
||||
else:
|
||||
results.append(
|
||||
OutputTypeCheckResult(
|
||||
name=declared.name,
|
||||
declared_type=declared.type,
|
||||
status=OutputTypeCheckStatus.NOT_PRODUCED,
|
||||
value=None,
|
||||
)
|
||||
)
|
||||
continue
|
||||
|
||||
value = payload[declared.name]
|
||||
failure_reason = self._validate_value(
|
||||
declared_type=declared.type,
|
||||
value=value,
|
||||
tenant_id=tenant_id,
|
||||
array_item=declared.array_item,
|
||||
)
|
||||
if failure_reason is None:
|
||||
results.append(
|
||||
OutputTypeCheckResult(
|
||||
name=declared.name,
|
||||
declared_type=declared.type,
|
||||
status=OutputTypeCheckStatus.READY,
|
||||
value=value,
|
||||
)
|
||||
)
|
||||
else:
|
||||
results.append(
|
||||
OutputTypeCheckResult(
|
||||
name=declared.name,
|
||||
declared_type=declared.type,
|
||||
status=OutputTypeCheckStatus.TYPE_CHECK_FAILED,
|
||||
value=value,
|
||||
reason=failure_reason,
|
||||
)
|
||||
)
|
||||
|
||||
return OutputTypeCheckOutcome(results=tuple(results))
|
||||
|
||||
def _validate_value(
|
||||
self,
|
||||
*,
|
||||
declared_type: DeclaredOutputType,
|
||||
value: Any,
|
||||
tenant_id: str,
|
||||
array_item: DeclaredArrayItem | None,
|
||||
) -> str | None:
|
||||
"""Return ``None`` on success, or a human-readable failure reason."""
|
||||
if declared_type == DeclaredOutputType.STRING:
|
||||
if not isinstance(value, str):
|
||||
return f"expected string, got {type(value).__name__}"
|
||||
return None
|
||||
if declared_type == DeclaredOutputType.NUMBER:
|
||||
# ``bool`` is a subclass of int in Python; PRD treats numbers as
|
||||
# strictly numeric so we reject bools here.
|
||||
if not isinstance(value, (int, float)) or isinstance(value, bool):
|
||||
return f"expected number, got {type(value).__name__}"
|
||||
return None
|
||||
if declared_type == DeclaredOutputType.BOOLEAN:
|
||||
if not isinstance(value, bool):
|
||||
return f"expected boolean, got {type(value).__name__}"
|
||||
return None
|
||||
if declared_type == DeclaredOutputType.OBJECT:
|
||||
if not isinstance(value, Mapping):
|
||||
return f"expected object, got {type(value).__name__}"
|
||||
return None
|
||||
if declared_type == DeclaredOutputType.ARRAY:
|
||||
if not isinstance(value, list):
|
||||
return f"expected array, got {type(value).__name__}"
|
||||
if array_item is None:
|
||||
# Defensive: the model validator should have populated this; if
|
||||
# absent, accept any items rather than crash.
|
||||
return None
|
||||
for index, item in enumerate(value):
|
||||
item_reason = self._validate_value(
|
||||
declared_type=array_item.type,
|
||||
value=item,
|
||||
tenant_id=tenant_id,
|
||||
array_item=None,
|
||||
)
|
||||
if item_reason is not None:
|
||||
return f"items[{index}]: {item_reason}"
|
||||
return None
|
||||
if declared_type == DeclaredOutputType.FILE:
|
||||
return self._validate_file_value(value=value, tenant_id=tenant_id)
|
||||
|
||||
# Defensive: future DeclaredOutputType members reach this branch and
|
||||
# should fail loudly so we never silently accept unknown shapes.
|
||||
return f"unsupported declared_type={declared_type!r}"
|
||||
|
||||
def _validate_file_value(self, *, value: Any, tenant_id: str) -> str | None:
|
||||
if not isinstance(value, Mapping):
|
||||
return f"expected file ref object, got {type(value).__name__}"
|
||||
file_id = self._extract_file_id(value)
|
||||
if file_id is None:
|
||||
return "file ref missing a recognized file_id field"
|
||||
if not self._file_validator.is_owned_by_tenant(file_id=file_id, tenant_id=tenant_id):
|
||||
return f"file_id {file_id!r} is not accessible to tenant {tenant_id!r}"
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _extract_file_id(value: Mapping[str, Any]) -> str | None:
|
||||
for key in _FILE_ID_KEYS:
|
||||
candidate = value.get(key)
|
||||
if isinstance(candidate, str) and candidate:
|
||||
return candidate
|
||||
return None
|
||||
@ -19,11 +19,16 @@ from graphon.variables.segments import Segment
|
||||
from models.agent import Agent, AgentConfigSnapshot, WorkflowAgentNodeBinding
|
||||
from models.agent_config_entities import (
|
||||
AgentSoulConfig,
|
||||
DeclaredArrayItem,
|
||||
DeclaredOutputConfig,
|
||||
DeclaredOutputType,
|
||||
WorkflowNodeJobConfig,
|
||||
)
|
||||
from models.agent_config_entities import (
|
||||
effective_declared_outputs as _effective_declared_outputs,
|
||||
)
|
||||
|
||||
from .output_failure_orchestrator import retry_idempotency_key
|
||||
from .runtime_feature_manifest import build_runtime_feature_manifest
|
||||
|
||||
|
||||
@ -56,6 +61,9 @@ class WorkflowAgentRuntimeBuildContext:
|
||||
binding: WorkflowAgentNodeBinding
|
||||
agent: Agent
|
||||
snapshot: AgentConfigSnapshot
|
||||
# Stage 4 §7 / D-4: 0 for the first run, then incremented per retry. Drives the
|
||||
# idempotency key so the backend treats each retry as a fresh request.
|
||||
attempt: int = 0
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
@ -142,9 +150,13 @@ class WorkflowAgentRuntimeRequestBuilder:
|
||||
|
||||
@staticmethod
|
||||
def _idempotency_key(context: WorkflowAgentRuntimeBuildContext) -> str:
|
||||
if context.workflow_run_id:
|
||||
return f"{context.workflow_run_id}:{context.node_execution_id}"
|
||||
return context.node_execution_id
|
||||
# Stage 4 §7 / D-4: retries get distinct keys (``...:retry-{attempt}``) so
|
||||
# the Agent backend's protocol-level dedup can't replay a previous run.
|
||||
return retry_idempotency_key(
|
||||
workflow_run_id=context.workflow_run_id,
|
||||
node_execution_id=context.node_execution_id,
|
||||
attempt=context.attempt,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _build_metadata(
|
||||
@ -237,11 +249,17 @@ class WorkflowAgentRuntimeRequestBuilder:
|
||||
|
||||
@staticmethod
|
||||
def _build_output_config(declared_outputs: Sequence[DeclaredOutputConfig]) -> AgentBackendOutputConfig | None:
|
||||
if not declared_outputs:
|
||||
return None
|
||||
"""Build the structured-output layer config sent to Agent backend.
|
||||
|
||||
Stage 4 §4.1 (D-3): when the user hasn't declared any outputs, inject the
|
||||
PRD-mandated defaults (text / files / json) at runtime so the backend
|
||||
always receives a stable schema and the downstream Inspector + nodes
|
||||
have consistent output names. The defaults are NOT persisted.
|
||||
"""
|
||||
effective_outputs = WorkflowAgentRuntimeRequestBuilder.effective_declared_outputs(declared_outputs)
|
||||
properties: dict[str, Any] = {}
|
||||
required: list[str] = []
|
||||
for output in declared_outputs:
|
||||
for output in effective_outputs:
|
||||
properties[output.name] = WorkflowAgentRuntimeRequestBuilder._schema_for_declared_output(output)
|
||||
if output.required:
|
||||
required.append(output.name)
|
||||
@ -250,21 +268,52 @@ class WorkflowAgentRuntimeRequestBuilder:
|
||||
schema["required"] = required
|
||||
return AgentBackendOutputConfig(json_schema=schema)
|
||||
|
||||
@staticmethod
def effective_declared_outputs(
    declared_outputs: Sequence[DeclaredOutputConfig],
) -> Sequence[DeclaredOutputConfig]:
    """Delegate to :func:`models.agent_config_entities.effective_declared_outputs`.

    The static method is retained on the builder so call sites that already
    reach the helper through this class (``agent_node._run``, tests) keep
    working without changing their imports.
    """
    as_list = list(declared_outputs)
    return _effective_declared_outputs(as_list)
|
||||
|
||||
@staticmethod
|
||||
def _schema_for_declared_output(output: DeclaredOutputConfig) -> dict[str, Any]:
|
||||
match output.type:
|
||||
schema = WorkflowAgentRuntimeRequestBuilder._schema_for_type(output.type, array_item=output.array_item)
|
||||
if output.description:
|
||||
schema["description"] = output.description
|
||||
return schema
|
||||
|
||||
@staticmethod
|
||||
def _schema_for_type(
|
||||
output_type: DeclaredOutputType,
|
||||
*,
|
||||
array_item: DeclaredArrayItem | None = None,
|
||||
) -> dict[str, Any]:
|
||||
match output_type:
|
||||
case DeclaredOutputType.STRING:
|
||||
schema: dict[str, Any] = {"type": "string"}
|
||||
return {"type": "string"}
|
||||
case DeclaredOutputType.NUMBER:
|
||||
schema = {"type": "number"}
|
||||
return {"type": "number"}
|
||||
case DeclaredOutputType.BOOLEAN:
|
||||
schema = {"type": "boolean"}
|
||||
return {"type": "boolean"}
|
||||
case DeclaredOutputType.OBJECT:
|
||||
schema = {"type": "object"}
|
||||
return {"type": "object"}
|
||||
case DeclaredOutputType.ARRAY:
|
||||
schema = {"type": "array"}
|
||||
# Stage 4 §4.2: items shape mirrors the declared array_item.
|
||||
# Validator guarantees array_item is set when type is array.
|
||||
item_type = array_item.type if array_item else DeclaredOutputType.OBJECT
|
||||
schema: dict[str, Any] = {
|
||||
"type": "array",
|
||||
"items": WorkflowAgentRuntimeRequestBuilder._schema_for_type(item_type),
|
||||
}
|
||||
if array_item is not None and array_item.description:
|
||||
schema["items"]["description"] = array_item.description
|
||||
return schema
|
||||
case DeclaredOutputType.FILE:
|
||||
schema = {
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"file_id": {"type": "string"},
|
||||
@ -273,9 +322,6 @@ class WorkflowAgentRuntimeRequestBuilder:
|
||||
"url": {"type": "string"},
|
||||
},
|
||||
}
|
||||
if output.description:
|
||||
schema["description"] = output.description
|
||||
return schema
|
||||
|
||||
@staticmethod
|
||||
def _normalize_credentials(credentials: Mapping[str, Any]) -> dict[str, str | int | float | bool | None]:
|
||||
|
||||
@ -147,14 +147,15 @@ class WorkflowAgentNodeValidator:
|
||||
f"Workflow Agent node {binding.node_id} has duplicate output name {output.name}."
|
||||
)
|
||||
output_names.add(output.name)
|
||||
for check in output.checks:
|
||||
if check.benchmark_file_ref is not None:
|
||||
cls._validate_file_ref(
|
||||
session=session,
|
||||
binding=binding,
|
||||
file_ref=check.benchmark_file_ref,
|
||||
ref_context=f"output {output.name} benchmark file",
|
||||
)
|
||||
# Stage 4 §4.3: declared output carries a single optional check, gated by
|
||||
# ``check.enabled``. Only enabled checks need their benchmark file resolved.
|
||||
if output.check is not None and output.check.enabled and output.check.benchmark_file_ref is not None:
|
||||
cls._validate_file_ref(
|
||||
session=session,
|
||||
binding=binding,
|
||||
file_ref=output.check.benchmark_file_ref,
|
||||
ref_context=f"output {output.name} benchmark file",
|
||||
)
|
||||
|
||||
for ref in node_job.previous_node_output_refs:
|
||||
selector = cls.selector_from_ref(ref)
|
||||
|
||||
@ -1,7 +1,8 @@
|
||||
import re
|
||||
from enum import StrEnum
|
||||
from typing import Any
|
||||
from typing import Any, Final
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field, model_validator
|
||||
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
|
||||
|
||||
|
||||
class AgentKnowledgeQueryMode(StrEnum):
|
||||
@ -23,6 +24,23 @@ class DeclaredOutputType(StrEnum):
|
||||
FILE = "file"
|
||||
|
||||
|
||||
class OutputErrorStrategy(StrEnum):
|
||||
"""Per-output failure handling strategy.
|
||||
|
||||
Mirrors ``graphon.ErrorStrategy`` but scoped to a single declared output of
|
||||
a Workflow Agent Node. The runtime applies the strategy after type check or
|
||||
output check fails and any configured retry attempts have been exhausted.
|
||||
"""
|
||||
|
||||
STOP = "stop"
|
||||
DEFAULT_VALUE = "default_value"
|
||||
FAIL_BRANCH = "fail_branch"
|
||||
|
||||
|
||||
# JSON-schema-friendly name pattern. Stage 4 §3.1 / §10.1.
# Accepts an ASCII letter or underscore followed by any number of ASCII letters,
# digits, or underscores; ``DeclaredOutputConfig`` applies it with ``fullmatch``.
|
||||
_OUTPUT_NAME_PATTERN: Final[re.Pattern[str]] = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
|
||||
|
||||
|
||||
class AgentSoulPromptConfig(BaseModel):
|
||||
system_prompt: str = ""
|
||||
|
||||
@ -108,40 +126,218 @@ class AgentSoulConfig(BaseModel):
|
||||
|
||||
|
||||
class DeclaredOutputFileConfig(BaseModel):
    """File-type output metadata. Both lists empty means "any file accepted"."""

    # Unknown keys are rejected at validation time.
    model_config = ConfigDict(extra="forbid")

    # Accepted file extensions; each instance gets its own empty list by default.
    # NOTE(review): whether entries carry a leading dot is not visible here — confirm.
    extensions: list[str] = Field(default_factory=list)
    # Accepted MIME types; each instance gets its own empty list by default.
    mime_types: list[str] = Field(default_factory=list)
|
||||
|
||||
|
||||
class DeclaredArrayItem(BaseModel):
    """Per-item shape for an ``array``-typed declared output.

    PRD §OUTPUT 配置框 keeps arrays one level deep on first version; nested arrays
    are rejected so the runtime type checker and JSON Schema stay easy to reason
    about. Stage 4 §4.2.
    """

    # Unknown keys are rejected at validation time.
    model_config = ConfigDict(extra="forbid")

    # Declared type of every element of the array; ``array`` itself is refused below.
    type: DeclaredOutputType
    # Optional human-readable description of an element.
    description: str | None = None

    @model_validator(mode="after")
    def _reject_nested_array(self) -> "DeclaredArrayItem":
        # Guard clause: every item type except ``array`` passes through untouched.
        if self.type != DeclaredOutputType.ARRAY:
            return self
        raise ValueError("nested arrays are not supported as array_item.type")
|
||||
|
||||
|
||||
class DeclaredOutputCheckConfig(BaseModel):
|
||||
type: str = Field(min_length=1, max_length=64)
|
||||
"""File-output content check via a model-based comparison against a benchmark file.
|
||||
|
||||
Per PRD §OUTPUT 配置框, output check is **file-only** and optional. Stage 4 §4.3.
|
||||
"""
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
enabled: bool = False
|
||||
prompt: str | None = None
|
||||
benchmark_file_ref: dict[str, Any] | None = None
|
||||
# Reserved for stage 4.1: pick a different model than Agent Soul's for the check.
|
||||
# Stage 4 leaves this Optional and unused by FileOutputCheckExecutor.
|
||||
model_ref: dict[str, Any] | None = None
|
||||
|
||||
@model_validator(mode="after")
def _require_prompt_and_benchmark_when_enabled(self) -> "DeclaredOutputCheckConfig":
    """Require a non-blank prompt and a benchmark file ref once the check is enabled."""
    # A disabled check may be stored partially configured.
    if not self.enabled:
        return self

    prompt_text = self.prompt
    if prompt_text is None or prompt_text.strip() == "":
        raise ValueError("prompt is required when output check is enabled")
    if self.benchmark_file_ref is None:
        raise ValueError("benchmark_file_ref is required when output check is enabled")
    return self
|
||||
|
||||
|
||||
class DeclaredOutputRetryConfig(BaseModel):
    """Per-output retry configuration that mirrors ``graphon.RetryConfig`` shape."""

    # Unknown keys are rejected at validation time.
    model_config = ConfigDict(extra="forbid")

    # Retry switch; off by default.
    enabled: bool = False
    # Retry budget, validated to the inclusive range 0..10.
    max_retries: int = Field(default=0, ge=0, le=10)
    # Interval in milliseconds, validated to the inclusive range 0..60_000.
    # NOTE(review): presumably the pause before each retry attempt — confirm against the orchestrator.
    retry_interval_ms: int = Field(default=0, ge=0, le=60_000)
|
||||
|
||||
|
||||
class DeclaredOutputFailureStrategy(BaseModel):
|
||||
on_type_check_failed: str | None = None
|
||||
on_output_check_failed: str | None = None
|
||||
max_retries: int = Field(default=0, ge=0, le=10)
|
||||
"""Per-output failure handling.
|
||||
|
||||
A single strategy applies to both ``type_check`` and ``output_check`` failures
|
||||
(PRD does not distinguish them at the UX level). Stage 4 §4.4.
|
||||
"""
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
retry: DeclaredOutputRetryConfig = Field(default_factory=DeclaredOutputRetryConfig)
|
||||
on_failure: OutputErrorStrategy = OutputErrorStrategy.STOP
|
||||
# When ``on_failure == DEFAULT_VALUE`` this value replaces the failed output. The
|
||||
# value's shape must match the owning ``DeclaredOutputConfig.type``; that match is
|
||||
# enforced at ``DeclaredOutputConfig`` level so the strategy stays type-agnostic.
|
||||
default_value: Any = None
|
||||
|
||||
@model_validator(mode="after")
def _require_default_value_when_default_strategy(self) -> "DeclaredOutputFailureStrategy":
    """Reject the ``default_value`` strategy when no default was actually supplied."""
    uses_default_strategy = self.on_failure == OutputErrorStrategy.DEFAULT_VALUE
    # ``None`` doubles as the "not set" sentinel, so it cannot serve as a default.
    if uses_default_strategy and self.default_value is None:
        raise ValueError(
            "default_value must be provided when on_failure=default_value; None is reserved for 'not set'."
        )
    return self
|
||||
|
||||
|
||||
class DeclaredOutputConfig(BaseModel):
|
||||
"""One declared output of a Workflow Agent Node.
|
||||
|
||||
Stage 4 normalizes the shape: ``check`` is singular (was ``checks: list`` in
|
||||
stage 3), and ``failure_strategy`` defaults to a populated value so runtime
|
||||
code can call ``output.failure_strategy.on_failure`` without None-guards.
|
||||
"""
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
id: str | None = None
|
||||
name: str = Field(min_length=1, max_length=255)
|
||||
type: DeclaredOutputType
|
||||
description: str | None = None
|
||||
required: bool = True
|
||||
file: DeclaredOutputFileConfig | None = None
|
||||
checks: list[DeclaredOutputCheckConfig] = Field(default_factory=list)
|
||||
failure_strategy: DeclaredOutputFailureStrategy | None = None
|
||||
array_item: DeclaredArrayItem | None = None
|
||||
check: DeclaredOutputCheckConfig | None = None
|
||||
failure_strategy: DeclaredOutputFailureStrategy = Field(default_factory=DeclaredOutputFailureStrategy)
|
||||
|
||||
@field_validator("failure_strategy", mode="before")
@classmethod
def _coerce_none_failure_strategy(cls, value: Any) -> Any:
    """Map an explicit ``None`` to a default-constructed failure strategy.

    Backward compatibility: persisted JSON may carry ``failure_strategy: null``,
    which is treated as "use defaults". Any other input is passed through
    unchanged for normal field validation.
    """
    return DeclaredOutputFailureStrategy() if value is None else value
|
||||
|
||||
@model_validator(mode="after")
|
||||
def validate_file_metadata(self) -> "DeclaredOutputConfig":
|
||||
if self.type == DeclaredOutputType.FILE and self.file is None:
|
||||
self.file = DeclaredOutputFileConfig()
|
||||
if self.type != DeclaredOutputType.FILE and self.file is not None:
|
||||
def _validate_shape(self) -> "DeclaredOutputConfig":
|
||||
if not _OUTPUT_NAME_PATTERN.fullmatch(self.name):
|
||||
raise ValueError(
|
||||
f"output name {self.name!r} must match {_OUTPUT_NAME_PATTERN.pattern} (JSON-schema-friendly identifier)"
|
||||
)
|
||||
|
||||
if self.type == DeclaredOutputType.FILE:
|
||||
if self.file is None:
|
||||
self.file = DeclaredOutputFileConfig()
|
||||
elif self.file is not None:
|
||||
raise ValueError("file metadata is only allowed for file outputs")
|
||||
|
||||
if self.type == DeclaredOutputType.ARRAY:
|
||||
if self.array_item is None:
|
||||
# Backward compat for stage 3 fixtures: array without array_item
|
||||
# defaults to array<object>, matching the prior JSON-Schema behavior.
|
||||
self.array_item = DeclaredArrayItem(type=DeclaredOutputType.OBJECT)
|
||||
elif self.array_item is not None:
|
||||
raise ValueError("array_item is only allowed when type is array")
|
||||
|
||||
# Per PRD §OUTPUT 配置框: output check is file-only.
|
||||
if self.check is not None and self.check.enabled and self.type != DeclaredOutputType.FILE:
|
||||
raise ValueError("output check is only allowed for file outputs")
|
||||
|
||||
# If the strategy is DEFAULT_VALUE, validate the default's shape against the
|
||||
# declared type so we fail at save-time rather than at runtime.
|
||||
strategy = self.failure_strategy
|
||||
if strategy.on_failure == OutputErrorStrategy.DEFAULT_VALUE and strategy.default_value is not None:
|
||||
self._assert_default_value_matches_type(strategy.default_value)
|
||||
|
||||
return self
|
||||
|
||||
def _assert_default_value_matches_type(self, value: Any) -> None:
    """Raise ``ValueError`` unless ``value`` is shaped like this output's declared type.

    Called at validation time for outputs whose failure strategy is
    ``default_value`` so that a malformed default is rejected when the config
    is saved rather than when it is injected at runtime.

    Rules per declared type:
      * string / boolean / object: ``str`` / ``bool`` / ``dict``.
      * number: ``int`` or ``float``; ``bool`` is rejected even though it
        subclasses ``int``.
      * file: a ``dict`` whose ``file_id`` is a non-empty string. Key presence
        alone is not enough (``{"file_id": None}`` is rejected).
      * array: a ``list``; when ``array_item`` is set, every element must also
        match the declared item type, so ``[1, 2]`` is not a valid default for
        an array of strings.

    Raises:
        ValueError: if the value, or any array element, has the wrong shape.
    """

    def _matches(kind: Any, candidate: Any) -> bool:
        # Shape test for a single value against one declared type.
        if kind == DeclaredOutputType.STRING:
            return isinstance(candidate, str)
        if kind == DeclaredOutputType.NUMBER:
            return isinstance(candidate, (int, float)) and not isinstance(candidate, bool)
        if kind == DeclaredOutputType.BOOLEAN:
            return isinstance(candidate, bool)
        if kind == DeclaredOutputType.OBJECT:
            return isinstance(candidate, dict)
        if kind == DeclaredOutputType.ARRAY:
            return isinstance(candidate, list)
        if kind == DeclaredOutputType.FILE:
            if not isinstance(candidate, dict):
                return False
            file_id = candidate.get("file_id")
            return isinstance(file_id, str) and bool(file_id)
        # Unknown declared types never match.
        return False

    type_ = self.type
    if not _matches(type_, value):
        raise ValueError(
            f"default_value shape does not match output type {type_.value!r}: got {type(value).__name__}"
        )

    # Arrays additionally validate each element against the declared item type.
    item_spec = self.array_item
    if type_ == DeclaredOutputType.ARRAY and item_spec is not None:
        for index, item in enumerate(value):
            if not _matches(item_spec.type, item):
                raise ValueError(
                    f"default_value shape does not match output type {type_.value!r}: "
                    f"items[{index}] does not match item type {item_spec.type.value!r}, "
                    f"got {type(item).__name__}"
                )
|
||||
|
||||
|
||||
# PRD §OUTPUT config panel, 0522 consensus: "If no output is configured, then text, files, json".
# The runtime injects these when ``declared_outputs`` is empty (stage 4 §4.1, D-3).
# Not persisted; mutating this constant changes UI defaults globally.
DEFAULT_DECLARED_OUTPUTS: Final[tuple[DeclaredOutputConfig, ...]] = (
    # Optional plain-text answer.
    DeclaredOutputConfig(
        name="text",
        description="Free-form text answer.",
        type=DeclaredOutputType.STRING,
        required=False,
    ),
    # Optional list of file refs.
    DeclaredOutputConfig(
        name="files",
        description="Files produced by the agent.",
        type=DeclaredOutputType.ARRAY,
        array_item=DeclaredArrayItem(type=DeclaredOutputType.FILE),
        required=False,
    ),
    # Optional unconstrained JSON object.
    DeclaredOutputConfig(
        name="json",
        description="Free-form JSON object.",
        type=DeclaredOutputType.OBJECT,
        required=False,
    ),
)
|
||||
|
||||
|
||||
def effective_declared_outputs(
    declared_outputs: list[DeclaredOutputConfig] | tuple[DeclaredOutputConfig, ...],
) -> tuple[DeclaredOutputConfig, ...]:
    """Resolve the outputs the runtime actually presents.

    A non-empty ``declared_outputs`` is returned as a tuple with the same
    items in the same order; an empty one falls back to the PRD defaults in
    ``DEFAULT_DECLARED_OUTPUTS``. This is the shared fallback for Composer
    load responses, the runtime request builder, and the Node Output Inspector
    (stage 4 §4.1, decision D-3).
    """
    return tuple(declared_outputs) if declared_outputs else DEFAULT_DECLARED_OUTPUTS
|
||||
|
||||
|
||||
class WorkflowNodeJobConfig(BaseModel):
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
@ -12153,19 +12153,44 @@ Condition detail
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| DebugPermission | string | | |
|
||||
|
||||
#### DeclaredArrayItem
|
||||
|
||||
Per-item shape for an ``array``-typed declared output.
|
||||
|
||||
PRD §OUTPUT 配置框 keeps arrays one level deep on first version; nested arrays
|
||||
are rejected so the runtime type checker and JSON Schema stay easy to reason
|
||||
about. Stage 4 §4.2.
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| description | string | | No |
|
||||
| type | [DeclaredOutputType](#declaredoutputtype) | | Yes |
|
||||
|
||||
#### DeclaredOutputCheckConfig
|
||||
|
||||
File-output content check via a model-based comparison against a benchmark file.
|
||||
|
||||
Per PRD §OUTPUT 配置框, output check is **file-only** and optional. Stage 4 §4.3.
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| benchmark_file_ref | object | | No |
|
||||
| enabled | boolean | | No |
|
||||
| model_ref | object | | No |
|
||||
| prompt | string | | No |
|
||||
| type | string | | Yes |
|
||||
|
||||
#### DeclaredOutputConfig
|
||||
|
||||
One declared output of a Workflow Agent Node.
|
||||
|
||||
Stage 4 normalizes the shape: ``check`` is singular (was ``checks: list`` in
|
||||
stage 3), and ``failure_strategy`` defaults to a populated value so runtime
|
||||
code can call ``output.failure_strategy.on_failure`` without None-guards.
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| checks | [ [DeclaredOutputCheckConfig](#declaredoutputcheckconfig) ] | | No |
|
||||
| array_item | [DeclaredArrayItem](#declaredarrayitem) | | No |
|
||||
| check | [DeclaredOutputCheckConfig](#declaredoutputcheckconfig) | | No |
|
||||
| description | string | | No |
|
||||
| failure_strategy | [DeclaredOutputFailureStrategy](#declaredoutputfailurestrategy) | | No |
|
||||
| file | [DeclaredOutputFileConfig](#declaredoutputfileconfig) | | No |
|
||||
@ -12176,19 +12201,36 @@ Condition detail
|
||||
|
||||
#### DeclaredOutputFailureStrategy
|
||||
|
||||
Per-output failure handling.
|
||||
|
||||
A single strategy applies to both ``type_check`` and ``output_check`` failures
|
||||
(PRD does not distinguish them at the UX level). Stage 4 §4.4.
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| max_retries | integer | | No |
|
||||
| on_output_check_failed | string | | No |
|
||||
| on_type_check_failed | string | | No |
|
||||
| default_value | | | No |
|
||||
| on_failure | [OutputErrorStrategy](#outputerrorstrategy) | | No |
|
||||
| retry | [DeclaredOutputRetryConfig](#declaredoutputretryconfig) | | No |
|
||||
|
||||
#### DeclaredOutputFileConfig
|
||||
|
||||
File-type output metadata. Both lists empty means "any file accepted".
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| extensions | [ string ] | | No |
|
||||
| mime_types | [ string ] | | No |
|
||||
|
||||
#### DeclaredOutputRetryConfig
|
||||
|
||||
Per-output retry configuration that mirrors ``graphon.RetryConfig`` shape.
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| enabled | boolean | | No |
|
||||
| max_retries | integer | | No |
|
||||
| retry_interval_ms | integer | | No |
|
||||
|
||||
#### DeclaredOutputType
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
@ -13584,6 +13626,18 @@ Enum class for model type.
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| result | string | Operation result | Yes |
|
||||
|
||||
#### OutputErrorStrategy
|
||||
|
||||
Per-output failure handling strategy.
|
||||
|
||||
Mirrors ``graphon.ErrorStrategy`` but scoped to a single declared output of
|
||||
a Workflow Agent Node. The runtime applies the strategy after type check or
|
||||
output check fails and any configured retry attempts have been exhausted.
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| OutputErrorStrategy | string | Per-output failure handling strategy. Mirrors ``graphon.ErrorStrategy`` but scoped to a single declared output of a Workflow Agent Node. The runtime applies the strategy after type check or output check fails and any configured retry attempts have been exhausted. | |
|
||||
|
||||
#### OwnerTransferCheckPayload
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
|
||||
@ -16,6 +16,12 @@ from models.agent import (
|
||||
WorkflowAgentBindingType,
|
||||
WorkflowAgentNodeBinding,
|
||||
)
|
||||
from models.agent_config_entities import (
|
||||
DeclaredOutputConfig,
|
||||
)
|
||||
from models.agent_config_entities import (
|
||||
effective_declared_outputs as _effective_declared_outputs,
|
||||
)
|
||||
from models.workflow import Workflow
|
||||
from services.agent.composer_validator import ComposerConfigValidator
|
||||
from services.agent.errors import AgentNameConflictError, AgentNotFoundError, AgentVersionNotFoundError
|
||||
@ -681,6 +687,27 @@ class AgentComposerService:
|
||||
.limit(1)
|
||||
)
|
||||
|
||||
@staticmethod
def _declared_outputs_from_binding(binding: WorkflowAgentNodeBinding) -> list[DeclaredOutputConfig]:
    """Parse the binding's stored node-job config into typed declared outputs.

    ``binding.node_job_config_dict`` is validated as a
    ``WorkflowNodeJobConfig`` and its ``declared_outputs`` are returned as a
    new list, giving callers a typed view they can hand to the
    effective-outputs fallback instead of re-implementing it.
    """
    parsed_job = WorkflowNodeJobConfig.model_validate(binding.node_job_config_dict)
    return [*parsed_job.declared_outputs]
|
||||
|
||||
@staticmethod
def _serialize_effective_outputs(declared_outputs: list[DeclaredOutputConfig]) -> list[dict[str, Any]]:
    """Dump the effective declared outputs (PRD defaults when empty) as JSON-mode dicts.

    Stage 4 decision D-3 keeps the defaults out of the DB; this helper injects
    them into the Composer load response so the wire shape is the same whether
    or not the user has declared any outputs yet.
    """
    serialized: list[dict[str, Any]] = []
    for output in _effective_declared_outputs(declared_outputs):
        serialized.append(output.model_dump(mode="json"))
    return serialized
|
||||
|
||||
@classmethod
|
||||
def _empty_workflow_state(cls, *, app_id: str, workflow_id: str, node_id: str) -> dict[str, Any]:
|
||||
return {
|
||||
@ -691,6 +718,9 @@ class AgentComposerService:
|
||||
"soul_lock": {"locked": False, "can_unlock": False, "reason": "workflow_only_empty"},
|
||||
"agent_soul": AgentSoulConfig().model_dump(mode="json"),
|
||||
"node_job": WorkflowNodeJobConfig().model_dump(mode="json"),
|
||||
# Stage 4 §4.1 / §10.1 (D-3): empty composer state still surfaces the
|
||||
# PRD defaults so the front-end has stable output names to render.
|
||||
"effective_declared_outputs": cls._serialize_effective_outputs([]),
|
||||
"save_options": [ComposerSaveStrategy.NODE_JOB_ONLY.value, ComposerSaveStrategy.SAVE_TO_ROSTER.value],
|
||||
"impact_summary": None,
|
||||
"app_id": app_id,
|
||||
@ -739,6 +769,11 @@ class AgentComposerService:
|
||||
if version
|
||||
else AgentSoulConfig().model_dump(mode="json"),
|
||||
"node_job": binding.node_job_config_dict,
|
||||
# Stage 4 §4.1 / §10.1 (D-3): when the saved node_job carries no
|
||||
# declared_outputs, surface the PRD defaults so the front-end can
|
||||
# render them as read-only chips. When user-defined outputs exist
|
||||
# this is the same list (so callers don't need to special-case).
|
||||
"effective_declared_outputs": cls._serialize_effective_outputs(cls._declared_outputs_from_binding(binding)),
|
||||
"save_options": save_options,
|
||||
"impact_summary": cls.calculate_impact(
|
||||
tenant_id=binding.tenant_id, current_snapshot_id=binding.current_snapshot_id
|
||||
|
||||
@ -99,6 +99,13 @@ def _node(*, scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCE
|
||||
},
|
||||
call_depth=0,
|
||||
)
|
||||
from core.workflow.nodes.agent_v2.output_failure_orchestrator import OutputFailureOrchestrator
|
||||
from core.workflow.nodes.agent_v2.output_type_checker import PerOutputTypeChecker
|
||||
|
||||
class _AlwaysAllowFileValidator:
|
||||
def is_owned_by_tenant(self, *, file_id: str, tenant_id: str) -> bool:
|
||||
return True
|
||||
|
||||
return DifyAgentNode(
|
||||
node_id="agent-node",
|
||||
data=DifyAgentNodeData.model_validate({"type": BuiltinNodeTypes.AGENT, "version": "2"}),
|
||||
@ -109,6 +116,8 @@ def _node(*, scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCE
|
||||
agent_backend_client=FakeAgentBackendRunClient(scenario=scenario),
|
||||
event_adapter=AgentBackendRunEventAdapter(),
|
||||
output_adapter=WorkflowAgentOutputAdapter(),
|
||||
type_checker=PerOutputTypeChecker(file_validator=_AlwaysAllowFileValidator()),
|
||||
failure_orchestrator=OutputFailureOrchestrator(),
|
||||
)
|
||||
|
||||
|
||||
|
||||
@ -0,0 +1,53 @@
|
||||
"""Defensive tests for UploadFileTenantValidator (Stage 4 §5.3).
|
||||
|
||||
The validator must never raise on pathological inputs: the Agent backend may
|
||||
hand us garbage in the ``file_id`` slot because the protocol layer only
|
||||
asserts ``{"type": "string"}``. Anything that isn't a real UUID belonging to
|
||||
the current tenant should simply return False.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from core.workflow.nodes.agent_v2.file_tenant_validator import UploadFileTenantValidator
|
||||
|
||||
|
||||
def test_empty_inputs_return_false_without_db_hit():
    """An empty ``file_id`` or ``tenant_id`` yields False and no session is created."""
    patch_target = "core.workflow.nodes.agent_v2.file_tenant_validator.session_factory"
    subject = UploadFileTenantValidator()
    with patch(patch_target) as session_factory_mock:
        assert subject.is_owned_by_tenant(file_id="", tenant_id="tenant-1") is False
        assert subject.is_owned_by_tenant(file_id="abc", tenant_id="") is False
        # Neither call may have reached the session factory.
        session_factory_mock.create_session.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "bad_file_id",
    [
        "not-a-uuid",
        "this-id-does-not-exist",
        "0123",
        "🤖🤖🤖",
        "../../etc/passwd",
        "550e8400-e29b-41d4-a716-446655440000-trailing",
    ],
)
def test_non_uuid_file_ids_return_false_without_db_hit(bad_file_id: str):
    """Each malformed ``file_id`` is rejected with False and never opens a session."""
    patch_target = "core.workflow.nodes.agent_v2.file_tenant_validator.session_factory"
    subject = UploadFileTenantValidator()
    with patch(patch_target) as session_factory_mock:
        assert subject.is_owned_by_tenant(file_id=bad_file_id, tenant_id="tenant-1") is False
        session_factory_mock.create_session.assert_not_called()
|
||||
|
||||
|
||||
def test_db_error_swallowed_and_returns_false():
    """A ``SQLAlchemyError`` raised by the session's ``scalar`` call must surface
    as a plain False result instead of propagating out of the validator."""
    from sqlalchemy.exc import SQLAlchemyError

    patch_target = "core.workflow.nodes.agent_v2.file_tenant_validator.session_factory"
    well_formed_id = "550e8400-e29b-41d4-a716-446655440000"
    subject = UploadFileTenantValidator()
    with patch(patch_target) as session_factory_mock:
        # The object yielded by ``with create_session() as s`` fails on ``scalar``.
        session_mock = session_factory_mock.create_session.return_value.__enter__.return_value
        session_mock.scalar.side_effect = SQLAlchemyError("boom")
        assert subject.is_owned_by_tenant(file_id=well_formed_id, tenant_id="tenant-1") is False
|
||||
@ -0,0 +1,229 @@
|
||||
"""Unit tests for OutputFailureOrchestrator decision logic.
|
||||
|
||||
Stage 4 §7.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from core.workflow.nodes.agent_v2.output_failure_orchestrator import (
|
||||
FailedOutput,
|
||||
OutputFailureDecision,
|
||||
OutputFailureKind,
|
||||
OutputFailureOrchestrator,
|
||||
retry_idempotency_key,
|
||||
)
|
||||
from models.agent_config_entities import (
|
||||
DeclaredOutputConfig,
|
||||
DeclaredOutputFailureStrategy,
|
||||
DeclaredOutputRetryConfig,
|
||||
DeclaredOutputType,
|
||||
OutputErrorStrategy,
|
||||
)
|
||||
|
||||
|
||||
def _output(
    name: str,
    *,
    on_failure: OutputErrorStrategy = OutputErrorStrategy.STOP,
    max_retries: int = 0,
    retry_enabled: bool = False,
    default_value=None,
) -> DeclaredOutputConfig:
    """Build a STRING-typed declared output carrying the given failure strategy.

    The retry settings, terminal ``on_failure`` strategy and ``default_value``
    are packed into a ``DeclaredOutputFailureStrategy`` attached to the output.
    """
    retry_config = DeclaredOutputRetryConfig(enabled=retry_enabled, max_retries=max_retries)
    return DeclaredOutputConfig(
        name=name,
        type=DeclaredOutputType.STRING,
        failure_strategy=DeclaredOutputFailureStrategy(
            retry=retry_config,
            on_failure=on_failure,
            default_value=default_value,
        ),
    )
|
||||
|
||||
|
||||
def _failure(declared: DeclaredOutputConfig, kind: OutputFailureKind = OutputFailureKind.TYPE_CHECK) -> FailedOutput:
    """Wrap ``declared`` in a ``FailedOutput`` of the given kind with the fixed reason ``"boom"``."""
    failed = FailedOutput(declared=declared, failure_kind=kind, reason="boom")
    return failed
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Retry budget
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_retry_within_budget_returns_retry_and_increments_attempt():
    """With retries enabled (max 2) and attempt 0, the decision is RETRY and the attempt becomes 1."""
    retryable = _output("x", retry_enabled=True, max_retries=2)

    result = OutputFailureOrchestrator().decide(failures=[_failure(retryable)], current_attempt=0)

    assert result.decision == OutputFailureDecision.RETRY
    assert result.next_attempt == 1
|
||||
|
||||
|
||||
def test_retry_budget_uses_max_across_multiple_outputs():
    """Two failing outputs with retry budgets 1 and 3: at attempt 1 the decision
    is still RETRY (next attempt 2), i.e. the larger budget governs (§7)."""
    small_budget = _output("a", retry_enabled=True, max_retries=1)
    large_budget = _output("b", retry_enabled=True, max_retries=3)
    failures = [_failure(small_budget), _failure(large_budget)]

    # attempt=1: "a" has used its single retry, "b" has retries left → retry again.
    result = OutputFailureOrchestrator().decide(failures=failures, current_attempt=1)

    assert result.decision == OutputFailureDecision.RETRY
    assert result.next_attempt == 2
|
||||
|
||||
|
||||
def test_retry_disabled_skips_retry_even_with_max_retries_set():
    """``retry.enabled=False`` takes precedence over a non-zero ``max_retries``."""
    # max_retries=5 is deliberately set; the disabled flag must win.
    not_retryable = _output("x", retry_enabled=False, max_retries=5)

    result = OutputFailureOrchestrator().decide(failures=[_failure(not_retryable)], current_attempt=0)

    assert result.decision != OutputFailureDecision.RETRY
|
||||
|
||||
|
||||
def test_retry_budget_exhausted_falls_to_terminal():
    """Once ``current_attempt`` equals ``max_retries`` the terminal strategy
    applies; here that is DEFAULT_VALUE, so the default is surfaced per output."""
    exhausted = _output(
        "x",
        retry_enabled=True,
        max_retries=1,
        on_failure=OutputErrorStrategy.DEFAULT_VALUE,
        default_value="fallback",
    )

    # current_attempt=1 means the single allowed retry has already been used.
    result = OutputFailureOrchestrator().decide(failures=[_failure(exhausted)], current_attempt=1)

    assert result.decision == OutputFailureDecision.USE_DEFAULT
    assert result.per_output_actions == {"x": "fallback"}
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Terminal decisions
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_stop_terminal_returns_fail_node():
    """A STOP strategy without retries resolves to FAIL_NODE with no per-output actions."""
    stop_output = _output("x", on_failure=OutputErrorStrategy.STOP)

    result = OutputFailureOrchestrator().decide(failures=[_failure(stop_output)], current_attempt=0)

    assert result.decision == OutputFailureDecision.FAIL_NODE
    assert result.per_output_actions == {}
|
||||
|
||||
|
||||
def test_default_value_terminal_collects_per_output_defaults():
    """When every failing output uses DEFAULT_VALUE, each default is reported by output name."""
    failures = [
        _failure(_output("a", on_failure=OutputErrorStrategy.DEFAULT_VALUE, default_value="A")),
        _failure(_output("b", on_failure=OutputErrorStrategy.DEFAULT_VALUE, default_value="B")),
    ]

    result = OutputFailureOrchestrator().decide(failures=failures, current_attempt=0)

    assert result.decision == OutputFailureDecision.USE_DEFAULT
    assert result.per_output_actions == {"a": "A", "b": "B"}
|
||||
|
||||
|
||||
def test_fail_branch_terminal_returns_take_fail_branch():
    """A FAIL_BRANCH strategy without retries resolves to TAKE_FAIL_BRANCH."""
    branch_output = _output("x", on_failure=OutputErrorStrategy.FAIL_BRANCH)

    result = OutputFailureOrchestrator().decide(failures=[_failure(branch_output)], current_attempt=0)

    assert result.decision == OutputFailureDecision.TAKE_FAIL_BRANCH
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Multi-output precedence: FAIL_BRANCH > FAIL_NODE > USE_DEFAULT
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_fail_branch_wins_over_stop_and_default_value():
    """Mixed DEFAULT_VALUE / STOP / FAIL_BRANCH failures resolve to TAKE_FAIL_BRANCH."""
    failures = [
        _failure(_output("a", on_failure=OutputErrorStrategy.DEFAULT_VALUE, default_value="x")),
        _failure(_output("b", on_failure=OutputErrorStrategy.STOP)),
        _failure(_output("c", on_failure=OutputErrorStrategy.FAIL_BRANCH)),
    ]

    result = OutputFailureOrchestrator().decide(failures=failures, current_attempt=0)

    assert result.decision == OutputFailureDecision.TAKE_FAIL_BRANCH
    # USE_DEFAULT was overridden → no per-output actions surfaced.
    assert result.per_output_actions == {}
|
||||
|
||||
|
||||
def test_stop_wins_over_default_value_when_no_fail_branch():
    """With only DEFAULT_VALUE and STOP failures present, the decision is FAIL_NODE."""
    failures = [
        _failure(_output("a", on_failure=OutputErrorStrategy.DEFAULT_VALUE, default_value="x")),
        _failure(_output("b", on_failure=OutputErrorStrategy.STOP)),
    ]

    result = OutputFailureOrchestrator().decide(failures=failures, current_attempt=0)

    assert result.decision == OutputFailureDecision.FAIL_NODE
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Output check vs type check failure kinds
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_failure_kinds_dedupes_and_preserves_order():
    """``failure_kinds`` lists each kind once, in order of first appearance."""
    shared_output = _output("x")
    kinds_and_reasons = [
        (OutputFailureKind.TYPE_CHECK, "r1"),
        (OutputFailureKind.OUTPUT_CHECK, "r2"),
        (OutputFailureKind.TYPE_CHECK, "r3"),  # repeat of the first kind
    ]
    failures = [
        FailedOutput(declared=shared_output, failure_kind=kind, reason=reason)
        for kind, reason in kinds_and_reasons
    ]

    result = OutputFailureOrchestrator().decide(failures=failures, current_attempt=0)

    assert result.failure_kinds == (OutputFailureKind.TYPE_CHECK, OutputFailureKind.OUTPUT_CHECK)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Reason summarization
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_primary_reason_includes_every_failure():
    """``primary_reason`` contains a ``name[kind]: reason`` fragment for each failure."""
    first = FailedOutput(declared=_output("a"), failure_kind=OutputFailureKind.TYPE_CHECK, reason="expected str")
    second = FailedOutput(
        declared=_output("b"),
        failure_kind=OutputFailureKind.OUTPUT_CHECK,
        reason="missing section 3",
    )

    result = OutputFailureOrchestrator().decide(failures=[first, second], current_attempt=0)

    summary = result.primary_reason
    assert "a[type_check]: expected str" in summary
    assert "b[output_check]: missing section 3" in summary
|
||||
|
||||
|
||||
def test_no_failures_raises():
    """Calling ``decide`` with an empty failure list raises ``ValueError``."""
    orchestrator = OutputFailureOrchestrator()
    no_failures: list = []
    with pytest.raises(ValueError, match="at least one failure"):
        orchestrator.decide(failures=no_failures, current_attempt=0)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Idempotency key helper (decision D-4)
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_idempotency_key_first_attempt_matches_pre_stage_4_shape():
    """Attempt 0 produces ``<run id>:<node execution id>`` with no retry suffix."""
    first_attempt_key = retry_idempotency_key(
        workflow_run_id="run-1",
        node_execution_id="node-exec-1",
        attempt=0,
    )
    assert first_attempt_key == "run-1:node-exec-1"
|
||||
|
||||
|
||||
def test_idempotency_key_appends_retry_attempt_for_retries():
    """A non-zero attempt appends ``:retry-<attempt>`` to the base key."""
    second_retry_key = retry_idempotency_key(
        workflow_run_id="run-1",
        node_execution_id="node-exec-1",
        attempt=2,
    )
    assert second_retry_key == "run-1:node-exec-1:retry-2"
|
||||
|
||||
|
||||
def test_idempotency_key_handles_missing_workflow_run_id():
    """With ``workflow_run_id=None`` the key starts at the node execution id."""
    initial_key = retry_idempotency_key(workflow_run_id=None, node_execution_id="node-exec-9", attempt=0)
    assert initial_key == "node-exec-9"

    retried_key = retry_idempotency_key(workflow_run_id=None, node_execution_id="node-exec-9", attempt=1)
    assert retried_key == "node-exec-9:retry-1"
|
||||
@ -0,0 +1,256 @@
|
||||
"""Unit tests for PerOutputTypeChecker.
|
||||
|
||||
Stage 4 §5.1-§5.3.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Mapping
|
||||
|
||||
import pytest
|
||||
|
||||
from core.workflow.nodes.agent_v2.output_type_checker import (
|
||||
OutputTypeCheckStatus,
|
||||
PerOutputTypeChecker,
|
||||
)
|
||||
from models.agent_config_entities import (
|
||||
DeclaredArrayItem,
|
||||
DeclaredOutputConfig,
|
||||
DeclaredOutputType,
|
||||
)
|
||||
|
||||
|
||||
class StubFileValidator:
    """In-memory file validator: a file passes only if listed for the tenant."""

    def __init__(self, *, allowed: Mapping[str, set[str]] | None = None) -> None:
        # tenant_id -> set of file_ids; copied so the caller's sets are not shared.
        source = allowed or {}
        self._allowed = {tenant_id: set(file_ids) for tenant_id, file_ids in source.items()}

    def is_owned_by_tenant(self, *, file_id: str, tenant_id: str) -> bool:
        # Unknown tenants own nothing.
        owned_ids = self._allowed.get(tenant_id, set())
        return file_id in owned_ids
|
||||
|
||||
|
||||
def _str_output(name: str = "summary", required: bool = True) -> DeclaredOutputConfig:
    """Return a STRING declared output with the given name and ``required`` flag."""
    config = DeclaredOutputConfig(name=name, type=DeclaredOutputType.STRING, required=required)
    return config
|
||||
|
||||
|
||||
def _make_checker(*, allowed: Mapping[str, set[str]] | None = None) -> PerOutputTypeChecker:
    """Create a ``PerOutputTypeChecker`` backed by a ``StubFileValidator`` built from ``allowed``."""
    stub_validator = StubFileValidator(allowed=allowed)
    return PerOutputTypeChecker(file_validator=stub_validator)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Happy path per type
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_all_scalar_types_ready_on_correct_payload():
    """STRING, NUMBER, BOOLEAN and OBJECT outputs are all READY for matching values."""
    type_by_name = {
        "s": DeclaredOutputType.STRING,
        "n": DeclaredOutputType.NUMBER,
        "b": DeclaredOutputType.BOOLEAN,
        "o": DeclaredOutputType.OBJECT,
    }
    declared = [DeclaredOutputConfig(name=name, type=output_type) for name, output_type in type_by_name.items()]
    payload = {"s": "hello", "n": 3.14, "b": True, "o": {"k": "v"}}

    outcome = _make_checker().check(declared_outputs=declared, raw_output=payload, tenant_id="t-1")

    assert not outcome.has_failures
    statuses = {result.name: result.status for result in outcome.results}
    assert statuses == {
        "s": OutputTypeCheckStatus.READY,
        "n": OutputTypeCheckStatus.READY,
        "b": OutputTypeCheckStatus.READY,
        "o": OutputTypeCheckStatus.READY,
    }
|
||||
|
||||
|
||||
def test_number_rejects_bool_even_though_python_says_isinstance_int():
    """A NUMBER output given ``True`` fails with the reason "expected number, got bool"."""
    number_output = DeclaredOutputConfig(name="n", type=DeclaredOutputType.NUMBER)

    outcome = _make_checker().check(
        declared_outputs=[number_output],
        raw_output={"n": True},
        tenant_id="t-1",
    )

    assert outcome.has_failures
    first_failure = outcome.failures[0]
    assert first_failure.reason == "expected number, got bool"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    ("declared_type", "wrong_value", "expected_kind"),
    [
        (DeclaredOutputType.STRING, 123, "int"),
        (DeclaredOutputType.NUMBER, "x", "str"),
        (DeclaredOutputType.BOOLEAN, "yes", "str"),
        (DeclaredOutputType.OBJECT, [1, 2], "list"),
    ],
)
def test_type_mismatch_reported_with_actual_kind(declared_type, wrong_value, expected_kind):
    """A wrongly typed value is TYPE_CHECK_FAILED and the reason names the actual Python type."""
    mistyped_output = DeclaredOutputConfig(name="x", type=declared_type)

    outcome = _make_checker().check(
        declared_outputs=[mistyped_output],
        raw_output={"x": wrong_value},
        tenant_id="t-1",
    )

    assert outcome.failures[0].status == OutputTypeCheckStatus.TYPE_CHECK_FAILED
    # ``reason`` may be None; fall back to "" so the membership test stays valid.
    assert expected_kind in (outcome.failures[0].reason or "")
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Array + array_item recursion
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_array_of_strings_validates_each_item():
    """An array<string> output passes for all-string items and pinpoints the bad index otherwise."""
    checker = _make_checker()
    tags_output = DeclaredOutputConfig(
        name="tags",
        type=DeclaredOutputType.ARRAY,
        array_item=DeclaredArrayItem(type=DeclaredOutputType.STRING),
    )

    valid_outcome = checker.check(declared_outputs=[tags_output], raw_output={"tags": ["a", "b"]}, tenant_id="t-1")
    invalid_outcome = checker.check(declared_outputs=[tags_output], raw_output={"tags": ["a", 2]}, tenant_id="t-1")

    assert not valid_outcome.has_failures
    failure_reason = invalid_outcome.failures[0].reason
    assert failure_reason is not None
    # The offending element sits at index 1.
    assert failure_reason.startswith("items[1]:")
|
||||
|
||||
|
||||
def test_array_of_files_validates_per_item_file_ref():
    """Each element of an array<file> output is checked against the tenant's allowed files."""
    checker = _make_checker(allowed={"t-1": {"file-A"}})
    docs_output = DeclaredOutputConfig(
        name="docs",
        type=DeclaredOutputType.ARRAY,
        array_item=DeclaredArrayItem(type=DeclaredOutputType.FILE),
    )
    owned_payload = {"docs": [{"file_id": "file-A", "filename": "a.pdf"}]}
    foreign_payload = {"docs": [{"file_id": "other-tenant-file", "filename": "x.pdf"}]}

    owned_outcome = checker.check(declared_outputs=[docs_output], raw_output=owned_payload, tenant_id="t-1")
    foreign_outcome = checker.check(declared_outputs=[docs_output], raw_output=foreign_payload, tenant_id="t-1")

    assert not owned_outcome.has_failures
    assert foreign_outcome.failures[0].reason is not None
    assert "not accessible" in foreign_outcome.failures[0].reason
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# File ref tenant scope
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_file_ref_must_be_tenant_owned():
    """A FILE output passes only when its ``file_id`` is allowed for the tenant."""
    checker = _make_checker(allowed={"t-1": {"my-file"}})
    report_output = DeclaredOutputConfig(name="report", type=DeclaredOutputType.FILE)

    owned_outcome = checker.check(
        declared_outputs=[report_output],
        raw_output={"report": {"file_id": "my-file", "filename": "r.pdf"}},
        tenant_id="t-1",
    )
    assert not owned_outcome.has_failures

    foreign_outcome = checker.check(
        declared_outputs=[report_output],
        raw_output={"report": {"file_id": "other", "filename": "r.pdf"}},
        tenant_id="t-1",
    )
    assert foreign_outcome.has_failures
|
||||
|
||||
|
||||
def test_file_ref_missing_id_field_fails():
    """A file ref object without any recognized id key is rejected with a fixed reason."""
    file_output = DeclaredOutputConfig(name="r", type=DeclaredOutputType.FILE)
    # Only a filename is supplied: no file_id / upload_file_id / tool_file_id.
    payload = {"r": {"filename": "x.pdf"}}

    outcome = _make_checker().check(declared_outputs=[file_output], raw_output=payload, tenant_id="t-1")

    assert outcome.failures[0].reason == "file ref missing a recognized file_id field"
|
||||
|
||||
|
||||
def test_file_ref_accepts_upload_file_id_alias():
    """A file ref keyed by ``upload_file_id`` instead of ``file_id`` is accepted."""
    checker = _make_checker(allowed={"t-1": {"alt-file"}})
    file_output = DeclaredOutputConfig(name="r", type=DeclaredOutputType.FILE)
    payload = {"r": {"upload_file_id": "alt-file"}}

    outcome = checker.check(declared_outputs=[file_output], raw_output=payload, tenant_id="t-1")

    assert not outcome.has_failures
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Missing values + required flag
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_optional_output_missing_is_not_produced_not_a_failure():
    """An absent optional output is reported as NOT_PRODUCED and does not count as a failure."""
    optional_output = DeclaredOutputConfig(name="opt", type=DeclaredOutputType.STRING, required=False)

    outcome = _make_checker().check(declared_outputs=[optional_output], raw_output={}, tenant_id="t-1")

    assert not outcome.has_failures
    assert outcome.results[0].status == OutputTypeCheckStatus.NOT_PRODUCED
|
||||
|
||||
|
||||
def test_required_output_missing_is_failure():
    """An absent required output is a failure whose reason mentions "missing"."""
    required_outputs = [_str_output(required=True)]

    outcome = _make_checker().check(declared_outputs=required_outputs, raw_output={}, tenant_id="t-1")

    assert outcome.has_failures
    failure_reason = outcome.failures[0].reason
    assert failure_reason is not None
    assert "missing" in failure_reason
|
||||
|
||||
|
||||
def test_non_dict_payload_fails_all_required_outputs():
    """A non-object payload (here a plain string) marks every declared output,
    required and optional alike, as TYPE_CHECK_FAILED, since no key lookup
    can even be attempted."""
    declared = [
        _str_output("req", required=True),
        _str_output("opt", required=False),
    ]

    outcome = _make_checker().check(declared_outputs=declared, raw_output="raw text", tenant_id="t-1")

    statuses = [result.status for result in outcome.results]
    assert all(status == OutputTypeCheckStatus.TYPE_CHECK_FAILED for status in statuses)
    assert outcome.failures[0].reason == "Backend output is not a JSON object."
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Aggregation helpers
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_outcome_by_name_indexes_results():
    """``by_name()`` exposes each result under its output name; only the mistyped one fails."""
    declared = [_str_output("a"), _str_output("b")]
    # "b" is declared as a string but receives an int.
    payload = {"a": "x", "b": 1}

    outcome = _make_checker().check(declared_outputs=declared, raw_output=payload, tenant_id="t-1")

    results_by_name = outcome.by_name()
    assert results_by_name["a"].status == OutputTypeCheckStatus.READY
    assert results_by_name["b"].status == OutputTypeCheckStatus.TYPE_CHECK_FAILED
    assert len(outcome.failures) == 1
|
||||
@ -217,3 +217,79 @@ def test_invalid_previous_node_output_ref_fails_request_build():
|
||||
WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()).build(context)
|
||||
|
||||
assert exc_info.value.error_code == "invalid_previous_node_output_ref"
|
||||
|
||||
|
||||
def test_empty_declared_outputs_injects_prd_defaults_text_files_json():
    """Stage 4 §4.1 (D-3): a node job with no declared_outputs makes the request
    carry the PRD default outputs (text / files / json) in its output schema."""
    base_context = _context()
    empty_job_binding = WorkflowAgentNodeBinding(
        id="binding-1",
        tenant_id="tenant-1",
        app_id="app-1",
        workflow_id="workflow-1",
        node_id="agent-node",
        agent_id="agent-1",
        current_snapshot_id="snapshot-1",
        node_job_config=WorkflowNodeJobConfig.model_validate({}),
    )
    context = replace(base_context, binding=empty_job_binding)

    builder = WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider())
    result = builder.build(context)

    request_json = result.request.model_dump(mode="json")
    # The output contract is read from the last composition layer.
    schema = request_json["composition"]["layers"][-1]["config"]["json_schema"]
    properties = schema["properties"]
    assert set(properties) == {"text", "files", "json"}
    assert properties["text"]["type"] == "string"
    assert properties["files"]["type"] == "array"
    # `files` defaults to array<file> → items is a file ref object.
    assert properties["files"]["items"]["properties"]["file_id"]["type"] == "string"
    assert properties["json"]["type"] == "object"
    # Defaults are all required=False so no `required:` key on the schema.
    assert "required" not in schema
|
||||
|
||||
|
||||
def test_array_output_emits_typed_items_per_array_item():
    """A required array<string> output yields a schema whose ``items`` carry the
    declared item type and description, and whose ``required`` lists the output."""
    base_context = _context()
    tags_declaration = {
        "name": "tags",
        "type": "array",
        "array_item": {"type": "string", "description": "topic tag"},
        "required": True,
    }
    array_binding = WorkflowAgentNodeBinding(
        id="binding-1",
        tenant_id="tenant-1",
        app_id="app-1",
        workflow_id="workflow-1",
        node_id="agent-node",
        agent_id="agent-1",
        current_snapshot_id="snapshot-1",
        node_job_config=WorkflowNodeJobConfig.model_validate({"declared_outputs": [tags_declaration]}),
    )
    context = replace(base_context, binding=array_binding)

    builder = WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider())
    result = builder.build(context)

    request_json = result.request.model_dump(mode="json")
    output_schema = request_json["composition"]["layers"][-1]["config"]["json_schema"]
    tags_schema = output_schema["properties"]["tags"]
    assert tags_schema["type"] == "array"
    assert tags_schema["items"]["type"] == "string"
    assert tags_schema["items"]["description"] == "topic tag"
    assert output_schema["required"] == ["tags"]
|
||||
|
||||
|
||||
def test_effective_declared_outputs_passthrough_when_user_declared():
    """A non-empty user declaration comes back from ``effective_declared_outputs``
    unchanged; default injection is reserved for empty input."""
    from models.agent_config_entities import DeclaredOutputConfig

    user_declared = [DeclaredOutputConfig(name="summary", type=DeclaredOutputType.STRING)]

    resolved = WorkflowAgentRuntimeRequestBuilder.effective_declared_outputs(user_declared)

    assert list(resolved) == user_declared
|
||||
|
||||
@ -240,7 +240,11 @@ def test_publish_validation_accepts_tenant_scoped_file_ref():
|
||||
{
|
||||
"name": "report",
|
||||
"type": "file",
|
||||
"checks": [{"type": "benchmark", "benchmark_file_ref": {"upload_file_id": "file-1"}}],
|
||||
"check": {
|
||||
"enabled": True,
|
||||
"prompt": "Report must include a risk summary.",
|
||||
"benchmark_file_ref": {"upload_file_id": "file-1"},
|
||||
},
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@ -105,6 +105,8 @@ def test_agent_soul_model_config_is_first_class_without_credentials():
|
||||
|
||||
|
||||
def test_declared_outputs_support_file_check_and_failure_strategy():
|
||||
"""Stage 4 §4.3 + §4.4: file output may carry a single ``check`` plus a
|
||||
full LLM-node-parity ``failure_strategy``."""
|
||||
node_job = WorkflowNodeJobConfig.model_validate(
|
||||
{
|
||||
"declared_outputs": [
|
||||
@ -112,17 +114,14 @@ def test_declared_outputs_support_file_check_and_failure_strategy():
|
||||
"name": "analysis_report",
|
||||
"type": "file",
|
||||
"file": {"extensions": [".pdf"], "mime_types": ["application/pdf"]},
|
||||
"checks": [
|
||||
{
|
||||
"type": "benchmark_file",
|
||||
"prompt": "Report must include risk summary.",
|
||||
"benchmark_file_ref": {"upload_file_id": "file-1"},
|
||||
}
|
||||
],
|
||||
"check": {
|
||||
"enabled": True,
|
||||
"prompt": "Report must include risk summary.",
|
||||
"benchmark_file_ref": {"upload_file_id": "file-1"},
|
||||
},
|
||||
"failure_strategy": {
|
||||
"on_type_check_failed": "fail_node",
|
||||
"on_output_check_failed": "retry",
|
||||
"max_retries": 1,
|
||||
"retry": {"enabled": True, "max_retries": 1, "retry_interval_ms": 500},
|
||||
"on_failure": "fail_branch",
|
||||
},
|
||||
}
|
||||
]
|
||||
@ -133,9 +132,12 @@ def test_declared_outputs_support_file_check_and_failure_strategy():
|
||||
assert output.type == DeclaredOutputType.FILE
|
||||
assert output.file is not None
|
||||
assert output.file.extensions == [".pdf"]
|
||||
assert output.checks[0].type == "benchmark_file"
|
||||
assert output.failure_strategy is not None
|
||||
assert output.failure_strategy.max_retries == 1
|
||||
assert output.check is not None
|
||||
assert output.check.enabled is True
|
||||
assert output.check.prompt == "Report must include risk summary."
|
||||
assert output.failure_strategy.retry.enabled is True
|
||||
assert output.failure_strategy.retry.max_retries == 1
|
||||
assert output.failure_strategy.on_failure.value == "fail_branch"
|
||||
|
||||
|
||||
def test_plaintext_secrets_are_rejected():
|
||||
|
||||
@ -73,6 +73,13 @@ def test_load_workflow_composer_returns_empty_state(monkeypatch):
|
||||
assert result["binding"] is None
|
||||
assert result["save_options"] == ["node_job_only", "save_to_roster"]
|
||||
assert result["workflow_id"] == "workflow-1"
|
||||
# Stage 4 §4.1 / §10.1 (D-3): empty state still surfaces PRD defaults so
|
||||
# the front-end has stable output names to render before the user declares
|
||||
# anything.
|
||||
effective = result["effective_declared_outputs"]
|
||||
assert [o["name"] for o in effective] == ["text", "files", "json"]
|
||||
files_output = next(o for o in effective if o["name"] == "files")
|
||||
assert files_output["array_item"] == {"type": "file", "description": None}
|
||||
|
||||
|
||||
def test_load_workflow_composer_serializes_existing_binding(monkeypatch):
|
||||
@ -257,6 +264,37 @@ def test_serialize_workflow_state_changes_lock_and_save_options(monkeypatch):
|
||||
assert state["soul_lock"]["locked"] is True
|
||||
assert "save_as_new_version" in state["save_options"]
|
||||
assert state["agent_soul"]["app_features"] == {}
|
||||
# Stage 4 §10.1 (D-3): binding with no declared_outputs → response surfaces
|
||||
# PRD defaults via effective_declared_outputs (DB row remains untouched).
|
||||
effective_names = [o["name"] for o in state["effective_declared_outputs"]]
|
||||
assert effective_names == ["text", "files", "json"]
|
||||
|
||||
|
||||
def test_serialize_workflow_state_passes_user_declared_outputs_through_effective(monkeypatch):
    """When the binding's node job declares outputs, ``effective_declared_outputs``
    in the serialized state mirrors that list with no defaults added."""
    node_job_json = '{"workflow_prompt":"work","declared_outputs":[{"name":"summary","type":"string","required":true}]}'
    binding = WorkflowAgentNodeBinding(
        id="binding-1",
        tenant_id="tenant-1",
        binding_type=WorkflowAgentBindingType.ROSTER_AGENT,
        agent_id="agent-1",
        current_snapshot_id="version-1",
        workflow_id="workflow-1",
        node_id="node-1",
        node_job_config=node_job_json,
    )
    agent = Agent(id="agent-1", name="Analyst", description="", scope=AgentScope.ROSTER, status=AgentStatus.ACTIVE)
    version = AgentConfigSnapshot(id="version-1", version=1, config_snapshot='{"prompt":{"system_prompt":"x"}}')
    # Replace the impact calculation with a canned result for this test.
    monkeypatch.setattr(AgentComposerService, "calculate_impact", lambda **kwargs: {"workflow_node_count": 1})

    state = AgentComposerService._serialize_workflow_state(binding=binding, agent=agent, version=version)

    # The user-declared list is surfaced as-is (no defaults injected).
    effective_outputs = state["effective_declared_outputs"]
    assert [entry["name"] for entry in effective_outputs] == ["summary"]
    only_output = effective_outputs[0]
    assert only_output["type"] == "string"
    assert only_output["required"] is True
|
||||
|
||||
|
||||
def test_composer_save_helpers_create_and_rebind_agents(monkeypatch):
|
||||
@ -573,3 +611,54 @@ def test_validator_dict_helpers_wrap_validation_errors():
|
||||
|
||||
assert valid_soul.prompt.system_prompt == "x"
|
||||
assert valid_node_job.workflow_prompt == "x"
|
||||
|
||||
|
||||
def test_composer_validator_rejects_stage_4_declared_output_violations():
    """Stage 4 §10.1: stage-4-specific declared-output violations are rejected.

    Each payload below breaks one stage-4 rule; ``validate_node_job_dict`` must
    raise ``InvalidComposerConfigError`` for every one of them, so the Composer
    save endpoint reports them with the same error shape as other shape
    failures.
    """
    invalid_node_jobs = (
        # Output name violates the JSON-schema-friendly identifier pattern.
        {"declared_outputs": [{"name": "1bad", "type": "string"}]},
        # Output check is enabled on a non-file output.
        {
            "declared_outputs": [
                {
                    "name": "text",
                    "type": "string",
                    "check": {
                        "enabled": True,
                        "prompt": "p",
                        "benchmark_file_ref": {"file_id": "f"},
                    },
                }
            ]
        },
        # default_value shape doesn't match the declared type.
        {
            "declared_outputs": [
                {
                    "name": "score",
                    "type": "number",
                    "failure_strategy": {
                        "on_failure": "default_value",
                        "default_value": "not a number",
                    },
                }
            ]
        },
        # Nested array_item is rejected outright.
        {"declared_outputs": [{"name": "matrix", "type": "array", "array_item": {"type": "array"}}]},
    )

    # Checked in declaration order; the first payload that is accepted fails the test.
    for node_job in invalid_node_jobs:
        with pytest.raises(InvalidComposerConfigError):
            ComposerConfigValidator.validate_node_job_dict(node_job)
|
||||
|
||||
@ -1436,7 +1436,8 @@ export type AgentSoulToolsConfig = {
|
||||
}
|
||||
|
||||
export type DeclaredOutputConfig = {
|
||||
checks?: Array<DeclaredOutputCheckConfig>
|
||||
array_item?: DeclaredArrayItem
|
||||
check?: DeclaredOutputCheckConfig
|
||||
description?: string | null
|
||||
failure_strategy?: DeclaredOutputFailureStrategy
|
||||
file?: DeclaredOutputFileConfig
|
||||
@ -1524,18 +1525,26 @@ export type AgentSoulModelCredentialRef = {
|
||||
type: string
|
||||
}
|
||||
|
||||
/**
 * DeclaredArrayItem
 *
 * Per-item shape for an `array`-typed declared output: the item's `type`
 * plus an optional, nullable `description`.
 */
export type DeclaredArrayItem = {
|
||||
description?: string | null
|
||||
type: DeclaredOutputType
|
||||
}
|
||||
|
||||
export type DeclaredOutputCheckConfig = {
|
||||
benchmark_file_ref?: {
|
||||
[key: string]: unknown
|
||||
} | null
|
||||
enabled?: boolean
|
||||
model_ref?: {
|
||||
[key: string]: unknown
|
||||
} | null
|
||||
prompt?: string | null
|
||||
type: string
|
||||
}
|
||||
|
||||
export type DeclaredOutputFailureStrategy = {
|
||||
max_retries?: number
|
||||
on_output_check_failed?: string | null
|
||||
on_type_check_failed?: string | null
|
||||
default_value?: unknown
|
||||
on_failure?: OutputErrorStrategy
|
||||
retry?: DeclaredOutputRetryConfig
|
||||
}
|
||||
|
||||
export type DeclaredOutputFileConfig = {
|
||||
@ -1553,6 +1562,14 @@ export type UserActionConfig = {
|
||||
|
||||
export type FormInputConfig = unknown
|
||||
|
||||
/**
 * OutputErrorStrategy
 *
 * Per-output failure handling strategy: one of `default_value`,
 * `fail_branch` or `stop`.
 */
export type OutputErrorStrategy = 'default_value' | 'fail_branch' | 'stop'
|
||||
|
||||
/**
 * DeclaredOutputRetryConfig
 *
 * Per-output retry configuration. Every field is optional at the type level.
 */
export type DeclaredOutputRetryConfig = {
|
||||
enabled?: boolean
|
||||
// zDeclaredOutputRetryConfig restricts this to an integer in [0, 10].
max_retries?: number
|
||||
// zDeclaredOutputRetryConfig restricts this to an integer in [0, 60000].
retry_interval_ms?: number
|
||||
}
|
||||
|
||||
export type ButtonStyle = 'accent' | 'default' | 'ghost' | 'primary'
|
||||
|
||||
export type ParagraphInputConfig = {
|
||||
|
||||
@ -1792,24 +1792,22 @@ export const zAgentSoulConfig = z.object({
|
||||
|
||||
/**
|
||||
* DeclaredOutputCheckConfig
|
||||
*
|
||||
* File-output content check via a model-based comparison against a benchmark file.
|
||||
*
|
||||
* Per PRD §OUTPUT 配置框, output check is **file-only** and optional. Stage 4 §4.3.
|
||||
*/
|
||||
export const zDeclaredOutputCheckConfig = z.object({
|
||||
benchmark_file_ref: z.record(z.string(), z.unknown()).nullish(),
|
||||
enabled: z.boolean().optional().default(false),
|
||||
model_ref: z.record(z.string(), z.unknown()).nullish(),
|
||||
prompt: z.string().nullish(),
|
||||
type: z.string().min(1).max(64),
|
||||
})
|
||||
|
||||
/**
|
||||
* DeclaredOutputFailureStrategy
|
||||
*/
|
||||
export const zDeclaredOutputFailureStrategy = z.object({
|
||||
max_retries: z.int().gte(0).lte(10).optional().default(0),
|
||||
on_output_check_failed: z.string().nullish(),
|
||||
on_type_check_failed: z.string().nullish(),
|
||||
})
|
||||
|
||||
/**
|
||||
* DeclaredOutputFileConfig
|
||||
*
|
||||
* File-type output metadata. Both lists empty means "any file accepted".
|
||||
*/
|
||||
export const zDeclaredOutputFileConfig = z.object({
|
||||
extensions: z.array(z.string()).optional(),
|
||||
@ -1828,11 +1826,70 @@ export const zDeclaredOutputType = z.enum([
|
||||
'string',
|
||||
])
|
||||
|
||||
/**
|
||||
* DeclaredArrayItem
|
||||
*
|
||||
* Per-item shape for an ``array``-typed declared output.
|
||||
*
|
||||
* PRD §OUTPUT 配置框 keeps arrays one level deep on first version; nested arrays
|
||||
* are rejected so the runtime type checker and JSON Schema stay easy to reason
|
||||
* about. Stage 4 §4.2.
|
||||
*
* NOTE(review): ``type`` is validated against the whole ``zDeclaredOutputType``
* enum, so if that enum includes ``array`` this schema alone does not reject a
* nested array item; presumably the backend validator enforces it. Confirm.
*/
|
||||
export const zDeclaredArrayItem = z.object({
|
||||
description: z.string().nullish(),
|
||||
type: zDeclaredOutputType,
|
||||
})
|
||||
|
||||
export const zFormInputConfig = z.unknown()
|
||||
|
||||
/**
|
||||
* OutputErrorStrategy
|
||||
*
|
||||
* Per-output failure handling strategy.
|
||||
*
|
||||
* Mirrors ``graphon.ErrorStrategy`` but scoped to a single declared output of
|
||||
* a Workflow Agent Node. The runtime applies the strategy after type check or
|
||||
* output check fails and any configured retry attempts have been exhausted.
|
||||
*
* The accepted literals are the same three members as the exported
* ``OutputErrorStrategy`` union type.
*/
|
||||
export const zOutputErrorStrategy = z.enum(['default_value', 'fail_branch', 'stop'])
|
||||
|
||||
/**
|
||||
* DeclaredOutputRetryConfig
|
||||
*
|
||||
* Per-output retry configuration that mirrors ``graphon.RetryConfig`` shape.
|
||||
*/
|
||||
export const zDeclaredOutputRetryConfig = z.object({
|
||||
// Optional; parses to false when the key is omitted.
enabled: z.boolean().optional().default(false),
|
||||
// Integer in [0, 10]; parses to 0 when the key is omitted.
max_retries: z.int().gte(0).lte(10).optional().default(0),
|
||||
// Integer in [0, 60000] (milliseconds, per the field name); parses to 0 when omitted.
retry_interval_ms: z.int().gte(0).lte(60000).optional().default(0),
|
||||
})
|
||||
|
||||
/**
|
||||
* DeclaredOutputFailureStrategy
|
||||
*
|
||||
* Per-output failure handling.
|
||||
*
|
||||
* A single strategy applies to both ``type_check`` and ``output_check`` failures
|
||||
* (PRD does not distinguish them at the UX level). Stage 4 §4.4.
|
||||
*/
|
||||
export const zDeclaredOutputFailureStrategy = z.object({
|
||||
// Any value passes this schema. NOTE(review): presumably the fallback used when
// on_failure is 'default_value'; matching it against the output's declared type
// is not done here, so confirm it is enforced by the backend validator.
default_value: z.unknown().optional(),
|
||||
// Optional; when present must be one of the zOutputErrorStrategy literals.
on_failure: zOutputErrorStrategy.optional(),
|
||||
// Optional nested retry settings, validated by zDeclaredOutputRetryConfig.
retry: zDeclaredOutputRetryConfig.optional(),
|
||||
})
|
||||
|
||||
/**
|
||||
* DeclaredOutputConfig
|
||||
*
|
||||
* One declared output of a Workflow Agent Node.
|
||||
*
|
||||
* Stage 4 normalizes the shape: ``check`` is singular (was ``checks: list`` in
|
||||
* stage 3), and ``failure_strategy`` defaults to a populated value so runtime
|
||||
* code can call ``output.failure_strategy.on_failure`` without None-guards.
|
||||
*/
|
||||
export const zDeclaredOutputConfig = z.object({
|
||||
checks: z.array(zDeclaredOutputCheckConfig).optional(),
|
||||
array_item: zDeclaredArrayItem.optional(),
|
||||
check: zDeclaredOutputCheckConfig.optional(),
|
||||
description: z.string().nullish(),
|
||||
failure_strategy: zDeclaredOutputFailureStrategy.optional(),
|
||||
file: zDeclaredOutputFileConfig.optional(),
|
||||
@ -1871,8 +1928,6 @@ export const zComposerSavePayload = z.object({
|
||||
version_note: z.string().nullish(),
|
||||
})
|
||||
|
||||
export const zFormInputConfig = z.unknown()
|
||||
|
||||
/**
|
||||
* ButtonStyle
|
||||
*
|
||||
|
||||
Loading…
Reference in New Issue
Block a user