feat(api): Node Output Inspector service + 3 REST endpoints (Stage 4 §8) (#36644)

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
zyssyz123 2026-05-26 15:34:33 +08:00 committed by GitHub
parent 0dad426101
commit fb07b43107
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 3992 additions and 22 deletions

View File

@ -68,6 +68,7 @@ from .app import (
workflow_app_log,
workflow_comment,
workflow_draft_variable,
workflow_node_output_inspector,
workflow_run,
workflow_statistic,
workflow_trigger,
@ -218,6 +219,7 @@ __all__ = [
"workflow_app_log",
"workflow_comment",
"workflow_draft_variable",
"workflow_node_output_inspector",
"workflow_run",
"workflow_statistic",
"workflow_trigger",

View File

@ -0,0 +1,415 @@
"""Console REST endpoints for the Node Output Inspector (Stage 4 §8 / §10.3).
PRD §Node Output Inspector replaces the consumer-organized Variable Inspector
with a producer-organized view of each node's declared outputs and their
per-run status. This module exposes two parallel sets of three read-only
endpoints one for ``/workflows/draft/runs/...`` (Composer test runs) and one
for ``/workflows/published/runs/...`` (real App API / webapp / webhook /
schedule / plugin triggers). Both sets share the same service code, the same
response shapes, and the same error codes; the URL is the *only* difference,
so the frontend can pick the right prefix based on which run-detail page the
user is on.
Decision D-1 (published Inspector deferred) was lifted 2026-05-26 the
``published_run_inspector_not_implemented`` 404 code is therefore no longer
produced.
URLs follow the design doc and reuse the existing
``/apps/<uuid:app_id>/workflows/draft/...`` prefix from
:mod:`controllers.console.app.workflow_draft_variable`. The
``published`` prefix mirrors it shape-for-shape.
"""
from __future__ import annotations
import json
import logging
from collections.abc import Iterator
from uuid import UUID
from flask import Response
from flask_restx import Resource
from controllers.console import console_ns
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, setup_required
from libs.exception import BaseHTTPException
from libs.login import login_required
from models import App, AppMode
from services.workflow import inspector_events
from services.workflow.node_output_inspector_service import (
NodeOutputInspectorError,
NodeOutputInspectorService,
)
logger = logging.getLogger(__name__)
# Heartbeat cadence — every N empty subscribe ticks emit a SSE comment so
# intervening proxies (nginx, ingress) don't reap the idle connection.
# ``inspector_events.subscribe`` ticks at 1s, so 15 → 15s heartbeat.
_HEARTBEAT_EVERY_TICKS = 15
# Hard ceiling on a single stream — if we never see a terminal workflow
# event (engine crashed, redis dropped the message), force-close after this
# many ticks (= seconds).
_STREAM_HARD_TIMEOUT_TICKS = 1800 # 30 min
def _service() -> NodeOutputInspectorService:
"""One-line factory so tests can monkeypatch a stub if needed."""
return NodeOutputInspectorService()
def _serve_snapshot(app_model: App, run_id: UUID) -> dict:
"""Resource-body shared by draft + published snapshot endpoints.
Pulled out so the 6 REST routes don't duplicate the same 6-line try/except
+ ``model_dump`` ritual the routes shrink to one-liners and the actual
behaviour lives here, where unit tests can hit it without spinning up
Flask request context.
"""
try:
snapshot = _service().snapshot_workflow_run(app_model=app_model, workflow_run_id=str(run_id))
except NodeOutputInspectorError as error:
raise _InspectorNotFound(error) from error
return snapshot.model_dump(mode="json")
def _serve_node_detail(app_model: App, run_id: UUID, node_id: str) -> dict:
"""Resource-body shared by draft + published node-detail endpoints."""
try:
view = _service().node_detail(
app_model=app_model,
workflow_run_id=str(run_id),
node_id=node_id,
)
except NodeOutputInspectorError as error:
raise _InspectorNotFound(error) from error
return view.model_dump(mode="json")
def _serve_output_preview(app_model: App, run_id: UUID, node_id: str, output_name: str) -> dict:
"""Resource-body shared by draft + published output-preview endpoints."""
try:
preview = _service().output_preview(
app_model=app_model,
workflow_run_id=str(run_id),
node_id=node_id,
output_name=output_name,
)
except NodeOutputInspectorError as error:
raise _InspectorNotFound(error) from error
return preview.model_dump(mode="json")
class _InspectorNotFound(BaseHTTPException):
"""404 that preserves the inspector's specific error code.
Without this the response body collapses to a generic ``not_found`` code
and clients lose the ability to distinguish, e.g.,
``workflow_run_not_found`` from ``published_run_inspector_not_implemented``.
"""
code = 404
def __init__(self, error: NodeOutputInspectorError) -> None:
self.error_code = error.code
super().__init__(description=str(error))
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/runs/<uuid:run_id>/node-outputs")
class WorkflowDraftRunNodeOutputsApi(Resource):
"""Whole-run snapshot organized by producer node."""
@console_ns.doc("get_workflow_draft_run_node_outputs")
@console_ns.doc(description="Snapshot of every node's declared outputs for a draft workflow run.")
@console_ns.doc(params={"app_id": "Application ID", "run_id": "Workflow run ID"})
@console_ns.response(404, "Workflow run not found")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, run_id: UUID):
return _serve_snapshot(app_model, run_id)
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/runs/<uuid:run_id>/node-outputs/<string:node_id>")
class WorkflowDraftRunNodeOutputDetailApi(Resource):
"""One node's declared outputs + per-output status."""
@console_ns.doc("get_workflow_draft_run_node_output_detail")
@console_ns.doc(description="One node's declared outputs for a draft workflow run.")
@console_ns.doc(
params={
"app_id": "Application ID",
"run_id": "Workflow run ID",
"node_id": "Node ID inside the workflow graph",
}
)
@console_ns.response(404, "Workflow run / node not found")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, run_id: UUID, node_id: str):
return _serve_node_detail(app_model, run_id, node_id)
@console_ns.route(
"/apps/<uuid:app_id>/workflows/draft/runs/<uuid:run_id>/node-outputs/<string:node_id>/<string:output_name>/preview"
)
class WorkflowDraftRunNodeOutputPreviewApi(Resource):
"""Full value for one declared output (with signed URL for file refs)."""
@console_ns.doc("get_workflow_draft_run_node_output_preview")
@console_ns.doc(description="Full value for one declared output, including signed download URL for files.")
@console_ns.doc(
params={
"app_id": "Application ID",
"run_id": "Workflow run ID",
"node_id": "Node ID inside the workflow graph",
"output_name": "Declared output name as exposed by Composer",
}
)
@console_ns.response(404, "Workflow run / node / output not found")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, run_id: UUID, node_id: str, output_name: str):
return _serve_output_preview(app_model, run_id, node_id, output_name)
# ──────────────────────────────────────────────────────────────────────────────
# SSE event stream — shared generator used by draft + published variants
# ──────────────────────────────────────────────────────────────────────────────
def _sse_envelope(event: str, data: dict | str, event_id: int) -> str:
"""Format one SSE record per D-5 ``{event, data, id}`` envelope.
``data`` is JSON-serialized when given as a dict; raw strings are
forwarded unchanged so we can also emit ``:keepalive`` comment lines.
"""
payload = data if isinstance(data, str) else json.dumps(data, ensure_ascii=False)
return f"event: {event}\nid: {event_id}\ndata: {payload}\n\n"
def _stream_inspector_events(app_model: App, run_id: UUID) -> Iterator[str]:
"""Yield SSE-framed strings for one workflow run.
The stream begins with a full ``snapshot`` event so the client has a
starting state without needing a separate REST GET. Then for every
``node_changed`` message from the pub/sub channel we re-read that node
from DB and push a fresh ``node_changed`` event. When the workflow run
reaches a terminal state we push one final ``workflow_run_completed``
event and close the stream.
Failures inside the loop are caught and surfaced as ``error`` events so
the frontend can show a banner rather than seeing the connection drop
silently. The Inspector never raises across the SSE boundary.
"""
service = _service()
run_id_str = str(run_id)
# Initial snapshot — also flushes a 404 back at the client right away
# if the run is gone (raised before yielding any bytes, so Flask turns it
# into the normal HTTP 404 path).
try:
snapshot = service.snapshot_workflow_run(app_model=app_model, workflow_run_id=run_id_str)
except NodeOutputInspectorError as error:
raise _InspectorNotFound(error) from error
event_id = 0
yield _sse_envelope("snapshot", snapshot.model_dump(mode="json"), event_id)
# If the run already finished by the time the client connected, emit
# the terminal envelope synchronously and close — no point subscribing.
# The enum value for partial success is the hyphenated ``partial-succeeded``
# (graphon.enums.WorkflowExecutionStatus), not ``partial_succeeded``.
if snapshot.workflow_run_status.value in {"succeeded", "failed", "stopped", "partial-succeeded"}:
event_id += 1
yield _sse_envelope(
"workflow_run_completed",
{"workflow_run_id": run_id_str, "workflow_run_status": snapshot.workflow_run_status.value},
event_id,
)
return
# Live subscription
ticks_since_heartbeat = 0
total_ticks = 0
for message in inspector_events.subscribe(run_id_str, timeout_seconds=1.0):
total_ticks += 1
if total_ticks > _STREAM_HARD_TIMEOUT_TICKS:
logger.warning(
"Inspector SSE: forcing close after %ds without terminal event for run %s",
_STREAM_HARD_TIMEOUT_TICKS,
run_id_str,
)
return
# Heartbeat sentinel — ``inspector_events.subscribe`` synthesizes a
# ``node_changed`` message with both fields ``None`` on every redis
# timeout. Real ``workflow_completed`` messages keep their kind even
# when status couldn't be resolved (publisher race), so checking kind
# first makes the heartbeat branch safe.
if message.kind == "node_changed" and message.node_id is None and message.status is None:
ticks_since_heartbeat += 1
if ticks_since_heartbeat >= _HEARTBEAT_EVERY_TICKS:
yield ":keepalive\n\n"
ticks_since_heartbeat = 0
continue
ticks_since_heartbeat = 0
if message.kind == "workflow_completed":
event_id += 1
yield _sse_envelope(
"workflow_run_completed",
{"workflow_run_id": run_id_str, "workflow_run_status": message.status or "unknown"},
event_id,
)
return
# node_changed: recompute the node slice from DB
if not message.node_id:
continue
try:
node_view = service.node_detail(
app_model=app_model,
workflow_run_id=run_id_str,
node_id=message.node_id,
)
except NodeOutputInspectorError:
# Node may not appear in the graph yet (race with persistence); skip.
continue
except Exception:
logger.warning(
"Inspector SSE: node_detail failed for run %s node %s",
run_id_str,
message.node_id,
exc_info=True,
)
event_id += 1
yield _sse_envelope(
"error",
{"node_id": message.node_id, "message": "failed to refresh node detail"},
event_id,
)
continue
event_id += 1
yield _sse_envelope("node_changed", node_view.model_dump(mode="json"), event_id)
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/runs/<uuid:run_id>/node-outputs/events")
class WorkflowDraftRunNodeOutputEventsApi(Resource):
"""SSE stream of inspector deltas for a draft run."""
@console_ns.doc("stream_workflow_draft_run_node_output_events")
@console_ns.doc(description="Server-Sent Events stream of inspector deltas for a draft workflow run.")
@console_ns.doc(params={"app_id": "Application ID", "run_id": "Workflow run ID"})
@console_ns.response(404, "Workflow run not found")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, run_id: UUID):
return Response(
_stream_inspector_events(app_model, run_id),
mimetype="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
)
# ──────────────────────────────────────────────────────────────────────────────
# Published-run endpoints — symmetric to the draft trio above
# ──────────────────────────────────────────────────────────────────────────────
@console_ns.route("/apps/<uuid:app_id>/workflows/published/runs/<uuid:run_id>/node-outputs")
class WorkflowPublishedRunNodeOutputsApi(Resource):
"""Whole-run snapshot for a *published* workflow run.
Same response shape as the ``/draft/`` variant frontend can multiplex
based on which page (Composer test-run vs. Run History) is mounted.
"""
@console_ns.doc("get_workflow_published_run_node_outputs")
@console_ns.doc(description="Snapshot of every node's declared outputs for a published workflow run.")
@console_ns.doc(params={"app_id": "Application ID", "run_id": "Workflow run ID"})
@console_ns.response(404, "Workflow run not found")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, run_id: UUID):
return _serve_snapshot(app_model, run_id)
@console_ns.route("/apps/<uuid:app_id>/workflows/published/runs/<uuid:run_id>/node-outputs/<string:node_id>")
class WorkflowPublishedRunNodeOutputDetailApi(Resource):
"""One node's declared outputs + per-output status (published run)."""
@console_ns.doc("get_workflow_published_run_node_output_detail")
@console_ns.doc(description="One node's declared outputs for a published workflow run.")
@console_ns.doc(
params={
"app_id": "Application ID",
"run_id": "Workflow run ID",
"node_id": "Node ID inside the workflow graph",
}
)
@console_ns.response(404, "Workflow run / node not found")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, run_id: UUID, node_id: str):
return _serve_node_detail(app_model, run_id, node_id)
@console_ns.route(
"/apps/<uuid:app_id>/workflows/published/runs/<uuid:run_id>"
"/node-outputs/<string:node_id>/<string:output_name>/preview"
)
class WorkflowPublishedRunNodeOutputPreviewApi(Resource):
"""Full value for one declared output of a published run."""
@console_ns.doc("get_workflow_published_run_node_output_preview")
@console_ns.doc(description="Full value for one declared output of a published run.")
@console_ns.doc(
params={
"app_id": "Application ID",
"run_id": "Workflow run ID",
"node_id": "Node ID inside the workflow graph",
"output_name": "Declared output name as exposed by Composer",
}
)
@console_ns.response(404, "Workflow run / node / output not found")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, run_id: UUID, node_id: str, output_name: str):
return _serve_output_preview(app_model, run_id, node_id, output_name)
@console_ns.route("/apps/<uuid:app_id>/workflows/published/runs/<uuid:run_id>/node-outputs/events")
class WorkflowPublishedRunNodeOutputEventsApi(Resource):
"""SSE stream of inspector deltas for a published run."""
@console_ns.doc("stream_workflow_published_run_node_output_events")
@console_ns.doc(description="Server-Sent Events stream of inspector deltas for a published workflow run.")
@console_ns.doc(params={"app_id": "Application ID", "run_id": "Workflow run ID"})
@console_ns.response(404, "Workflow run not found")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, run_id: UUID):
return Response(
_stream_inspector_events(app_model, run_id),
mimetype="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
)

View File

@ -47,6 +47,12 @@ from graphon.graph_events import (
)
from graphon.node_events import NodeRunResult
from libs.datetime_utils import naive_utc_now
from services.workflow.inspector_events import (
publish_node_changed as _inspector_publish_node_changed,
)
from services.workflow.inspector_events import (
publish_workflow_completed as _inspector_publish_workflow_completed,
)
@dataclass(slots=True)
@ -163,6 +169,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
self._workflow_execution_repository.save(execution)
self._enqueue_trace_task(execution)
_inspector_publish_workflow_completed(workflow_run_id=execution.id_, status=str(execution.status.value))
def _handle_graph_run_partial_succeeded(self, event: GraphRunPartialSucceededEvent) -> None:
execution = self._get_workflow_execution()
@ -173,6 +180,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
self._workflow_execution_repository.save(execution)
self._enqueue_trace_task(execution)
_inspector_publish_workflow_completed(workflow_run_id=execution.id_, status=str(execution.status.value))
def _handle_graph_run_failed(self, event: GraphRunFailedEvent) -> None:
execution = self._get_workflow_execution()
@ -184,6 +192,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
self._fail_running_node_executions(error_message=event.error)
self._workflow_execution_repository.save(execution)
self._enqueue_trace_task(execution)
_inspector_publish_workflow_completed(workflow_run_id=execution.id_, status=str(execution.status.value))
def _handle_graph_run_aborted(self, event: GraphRunAbortedEvent) -> None:
execution = self._get_workflow_execution()
@ -194,6 +203,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
self._fail_running_node_executions(error_message=execution.error_message or "")
self._workflow_execution_repository.save(execution)
self._enqueue_trace_task(execution)
_inspector_publish_workflow_completed(workflow_run_id=execution.id_, status=str(execution.status.value))
def _handle_graph_run_paused(self, event: GraphRunPausedEvent) -> None:
execution = self._get_workflow_execution()
@ -241,6 +251,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
created_at=event.start_at,
)
self._node_snapshots[event.id] = snapshot
_inspector_publish_node_changed(workflow_run_id=execution.id_, node_id=event.node_id, status="running")
def _handle_node_retry(self, event: NodeRunRetryEvent) -> None:
domain_execution = self._get_node_execution(event.id)
@ -248,6 +259,11 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
domain_execution.error = event.error
self._workflow_node_execution_repository.save(domain_execution)
self._workflow_node_execution_repository.save_execution_data(domain_execution)
_inspector_publish_node_changed(
workflow_run_id=self._get_workflow_execution().id_,
node_id=domain_execution.node_id,
status="retry",
)
def _handle_node_succeeded(self, event: NodeRunSucceededEvent) -> None:
domain_execution = self._get_node_execution(event.id)
@ -257,6 +273,11 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
WorkflowNodeExecutionStatus.SUCCEEDED,
finished_at=event.finished_at,
)
_inspector_publish_node_changed(
workflow_run_id=self._get_workflow_execution().id_,
node_id=domain_execution.node_id,
status="succeeded",
)
def _handle_node_failed(self, event: NodeRunFailedEvent) -> None:
domain_execution = self._get_node_execution(event.id)
@ -267,6 +288,11 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
error=event.error,
finished_at=event.finished_at,
)
_inspector_publish_node_changed(
workflow_run_id=self._get_workflow_execution().id_,
node_id=domain_execution.node_id,
status="failed",
)
def _handle_node_exception(self, event: NodeRunExceptionEvent) -> None:
domain_execution = self._get_node_execution(event.id)
@ -277,6 +303,11 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
error=event.error,
finished_at=event.finished_at,
)
_inspector_publish_node_changed(
workflow_run_id=self._get_workflow_execution().id_,
node_id=domain_execution.node_id,
status="exception",
)
def _handle_node_pause_requested(self, event: NodeRunPauseRequestedEvent) -> None:
domain_execution = self._get_node_execution(event.id)

View File

@ -3447,6 +3447,89 @@ Run draft workflow
| 200 | Draft workflow run started successfully |
| 403 | Permission denied |
### /apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs
#### GET
##### Description
Snapshot of every node's declared outputs for a draft workflow run.
##### Parameters
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ------ |
| app_id | path | Application ID | Yes | string |
| run_id | path | Workflow run ID | Yes | string |
##### Responses
| Code | Description |
| ---- | ----------- |
| 404 | Workflow run not found |
### /apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/events
#### GET
##### Description
Server-Sent Events stream of inspector deltas for a draft workflow run.
##### Parameters
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ------ |
| app_id | path | Application ID | Yes | string |
| run_id | path | Workflow run ID | Yes | string |
##### Responses
| Code | Description |
| ---- | ----------- |
| 404 | Workflow run not found |
### /apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/{node_id}
#### GET
##### Description
One node's declared outputs for a draft workflow run.
##### Parameters
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ------ |
| app_id | path | Application ID | Yes | string |
| node_id | path | Node ID inside the workflow graph | Yes | string |
| run_id | path | Workflow run ID | Yes | string |
##### Responses
| Code | Description |
| ---- | ----------- |
| 404 | Workflow run / node not found |
### /apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/{node_id}/{output_name}/preview
#### GET
##### Description
Full value for one declared output, including signed download URL for files.
##### Parameters
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ------ |
| app_id | path | Application ID | Yes | string |
| node_id | path | Node ID inside the workflow graph | Yes | string |
| output_name | path | Declared output name as exposed by Composer | Yes | string |
| run_id | path | Workflow run ID | Yes | string |
##### Responses
| Code | Description |
| ---- | ----------- |
| 404 | Workflow run / node / output not found |
### /apps/{app_id}/workflows/draft/system-variables
#### GET
@ -3684,6 +3767,89 @@ Publish workflow
| ---- | ----------- |
| 200 | Success |
### /apps/{app_id}/workflows/published/runs/{run_id}/node-outputs
#### GET
##### Description
Snapshot of every node's declared outputs for a published workflow run.
##### Parameters
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ------ |
| app_id | path | Application ID | Yes | string |
| run_id | path | Workflow run ID | Yes | string |
##### Responses
| Code | Description |
| ---- | ----------- |
| 404 | Workflow run not found |
### /apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/events
#### GET
##### Description
Server-Sent Events stream of inspector deltas for a published workflow run.
##### Parameters
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ------ |
| app_id | path | Application ID | Yes | string |
| run_id | path | Workflow run ID | Yes | string |
##### Responses
| Code | Description |
| ---- | ----------- |
| 404 | Workflow run not found |
### /apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/{node_id}
#### GET
##### Description
One node's declared outputs for a published workflow run.
##### Parameters
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ------ |
| app_id | path | Application ID | Yes | string |
| node_id | path | Node ID inside the workflow graph | Yes | string |
| run_id | path | Workflow run ID | Yes | string |
##### Responses
| Code | Description |
| ---- | ----------- |
| 404 | Workflow run / node not found |
### /apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/{node_id}/{output_name}/preview
#### GET
##### Description
Full value for one declared output of a published run.
##### Parameters
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ------ |
| app_id | path | Application ID | Yes | string |
| node_id | path | Node ID inside the workflow graph | Yes | string |
| output_name | path | Declared output name as exposed by Composer | Yes | string |
| run_id | path | Workflow run ID | Yes | string |
##### Responses
| Code | Description |
| ---- | ----------- |
| 404 | Workflow run / node / output not found |
### /apps/{app_id}/workflows/triggers/webhook
#### GET

View File

@ -0,0 +1,194 @@
"""Inspector pub/sub fanout for live workflow run updates (Stage 4 §8.5).
The Node Output Inspector exposes a Server-Sent Events stream alongside its
three REST endpoints so the frontend can render per-output progress without
DB polling. This module owns the redis pub/sub channel that connects the two
sides:
* :func:`publish_node_changed` / :func:`publish_workflow_completed`
invoked by :class:`core.app.workflow.layers.persistence.WorkflowPersistenceLayer`
at the very end of each handler, after the DB write has already
succeeded. Publish failures are swallowed so the engine never trips on a
flaky redis connection.
* :func:`subscribe` async iterator the SSE endpoint consumes.
Channel layout
--------------
``dify:inspector:workflow_run:{workflow_run_id}``
One channel per workflow run; the SSE endpoint subscribes for the lifetime of
the run and unsubscribes on the terminal event. Multiple clients can attach
to the same run safely redis pub/sub fans every message out to every
listener.
The message envelope intentionally carries only the *delta* needed to invalidate
a slice of the inspector view; the SSE handler re-reads the canonical
``WorkflowNodeExecutionModel`` row from the DB so we never serialize stale
state across the wire. This means messages stay tiny (~150 bytes) and the
inspector view stays consistent even if a publisher races persistence.
Decision D-5: the on-wire SSE envelope ``{event, data, id}`` is shared with
the babysit chat stream; this module only emits the *internal* pub/sub
message the SSE controller turns it into the public envelope.
"""
from __future__ import annotations
import json
import logging
from collections.abc import Iterator
from dataclasses import asdict, dataclass
from typing import Final, Literal
from extensions.ext_redis import redis_client
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────────────────────
# Channel naming
# ──────────────────────────────────────────────────────────────────────────────
_CHANNEL_PREFIX: Final = "dify:inspector:workflow_run"
def channel_for(workflow_run_id: str) -> str:
"""Return the pub/sub channel name for ``workflow_run_id``.
Kept as a module-level helper so tests can pin the channel without
reaching into the publish/subscribe code paths.
"""
return f"{_CHANNEL_PREFIX}:{workflow_run_id}"
# ──────────────────────────────────────────────────────────────────────────────
# Message envelope
# ──────────────────────────────────────────────────────────────────────────────
#: Tags discriminating the wire-level message kinds. Kept narrow so the SSE
#: controller can pattern-match exhaustively.
InspectorMessageKind = Literal["node_changed", "workflow_completed"]
@dataclass(frozen=True, slots=True)
class InspectorMessage:
"""Minimal delta carried across the pub/sub channel.
``node_id`` is set only for ``node_changed`` messages; ``status`` is the
coarse string status straight from the persistence layer (``"running"`` /
``"succeeded"`` / ``"failed"`` for nodes, plus ``"succeeded"`` /
``"failed"`` / ``"partial_succeeded"`` / ``"stopped"`` for workflow runs).
"""
kind: InspectorMessageKind
workflow_run_id: str
node_id: str | None = None
status: str | None = None
def to_json(self) -> str:
return json.dumps(asdict(self), ensure_ascii=False)
@classmethod
def from_json(cls, blob: str) -> InspectorMessage | None:
"""Decode a payload, returning ``None`` for any shape we can't trust."""
try:
decoded = json.loads(blob)
except (json.JSONDecodeError, TypeError):
return None
if not isinstance(decoded, dict):
return None
kind = decoded.get("kind")
if kind not in ("node_changed", "workflow_completed"):
return None
workflow_run_id = decoded.get("workflow_run_id")
if not isinstance(workflow_run_id, str) or not workflow_run_id:
return None
node_id = decoded.get("node_id")
if node_id is not None and not isinstance(node_id, str):
return None
status = decoded.get("status")
if status is not None and not isinstance(status, str):
return None
return cls(kind=kind, workflow_run_id=workflow_run_id, node_id=node_id, status=status)
# ──────────────────────────────────────────────────────────────────────────────
# Publisher (called from the persistence layer)
# ──────────────────────────────────────────────────────────────────────────────
def _publish(message: InspectorMessage) -> None:
"""Best-effort fire-and-forget publish.
Persistence runs inside the workflow engine thread; we never want a redis
glitch to break the workflow. Any exception is logged at debug level so
operators still see them when they grep, but the engine keeps running.
"""
try:
redis_client.publish(channel_for(message.workflow_run_id), message.to_json())
except Exception:
logger.debug("InspectorEventPublisher: publish failed for %s", message.workflow_run_id, exc_info=True)
def publish_node_changed(*, workflow_run_id: str, node_id: str, status: str) -> None:
"""Announce that one node's execution row just changed.
The SSE handler will recompute the node slice from the DB on receipt.
"""
_publish(InspectorMessage(kind="node_changed", workflow_run_id=workflow_run_id, node_id=node_id, status=status))
def publish_workflow_completed(*, workflow_run_id: str, status: str) -> None:
"""Announce that the workflow run reached a terminal state.
The SSE handler emits one last envelope and disconnects.
"""
_publish(InspectorMessage(kind="workflow_completed", workflow_run_id=workflow_run_id, status=status))
# ──────────────────────────────────────────────────────────────────────────────
# Subscriber (consumed by the SSE controller)
# ──────────────────────────────────────────────────────────────────────────────
def subscribe(workflow_run_id: str, *, timeout_seconds: float = 1.0) -> Iterator[InspectorMessage]:
"""Yield ``InspectorMessage`` instances until the consumer abandons us.
The loop polls redis with ``timeout_seconds`` so the SSE handler can
interleave keepalive heartbeats. Yields ``None`` on timeout so the caller
can decide whether to keep blocking; malformed payloads are silently
skipped.
The pub/sub connection is closed when the iterator is garbage-collected
(the wrapping ``finally`` releases it as soon as the SSE handler exits).
"""
pubsub = redis_client.pubsub()
pubsub.subscribe(channel_for(workflow_run_id))
try:
while True:
raw = pubsub.get_message(ignore_subscribe_messages=True, timeout=timeout_seconds)
if raw is None:
# Surface a heartbeat tick — caller can keep-alive or check
# disconnection without blocking redis any longer.
yield InspectorMessage(kind="node_changed", workflow_run_id=workflow_run_id, node_id=None, status=None)
continue
data = raw.get("data") if isinstance(raw, dict) else None
if isinstance(data, bytes):
data = data.decode("utf-8", errors="replace")
if not isinstance(data, str):
continue
message = InspectorMessage.from_json(data)
if message is None:
continue
yield message
finally:
try:
pubsub.unsubscribe(channel_for(workflow_run_id))
pubsub.close()
except Exception:
logger.debug(
"InspectorEventPublisher: pubsub teardown failed for %s",
workflow_run_id,
exc_info=True,
)

View File

@ -0,0 +1,712 @@
"""Node Output Inspector service (Stage 4 §8).
PRD §Node Output Inspector renames every workflow "Variable" to a "Node Output"
and re-organizes the panel by **producer node** rather than consumer node. This
service backs the new REST surface
``/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs[/...]`` with three
read-only views:
* :meth:`snapshot_workflow_run` every node + its declared outputs + per-output
status, for one debug workflow run.
* :meth:`node_detail` the same shape filtered down to one node.
* :meth:`output_preview` full payload for one output, with signed download
URL when the output references an upload file.
Design constraints baked into this version:
1. **No new tables** (§8.1). Topology comes from ``WorkflowRun.graph`` (the
graph snapshot taken at execution time so the view stays consistent even
if the draft was edited mid-run). Execution facts come from
``WorkflowNodeExecutionModel`` rows already produced by the workflow
runtime.
2. **Draft + published runs** (decision D-1 lifted 2026-05-26). The Inspector
accepts ``WorkflowRunTriggeredFrom.DEBUGGING`` (draft test runs) as well as
``APP_RUN`` / ``WEBHOOK`` / ``SCHEDULE`` / ``PLUGIN`` / ``RAG_PIPELINE_*``
triggers; the underlying graph + executions are uniform across all of them.
Cross-tenant / cross-app rows still 404 via the standard tenant/app scope.
3. **Declared outputs by node kind**:
* Agent v2 nodes resolve their declared list via
:class:`WorkflowAgentBindingResolver` (the binding owns the canonical
``DeclaredOutputConfig`` list and falls back to PRD defaults when empty).
* Other node kinds don't have a declared-output schema yet; we surface the
keys present in the execution payload as a best-effort list typed
``unknown`` so the panel can still render them.
4. **Per-output status** is derived from the metadata the agent_v2 stack
already records (``output_type_check`` and ``output_check`` blobs); falling
back to the node's overall status when those signals aren't present.
5. **SSE stream** (design §8.5) lives in
:mod:`controllers.console.app.workflow_node_output_inspector` alongside the
REST endpoints. The Inspector and the babysit chat SSE share the
``{event, data, id}`` envelope per decision D-5.
"""
from __future__ import annotations
import json
import logging
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from datetime import datetime
from enum import StrEnum
from typing import Any
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import select
from core.db.session_factory import session_factory
from core.workflow.nodes.agent_v2.binding_resolver import (
WorkflowAgentBindingError,
WorkflowAgentBindingResolver,
)
from core.workflow.nodes.agent_v2.runtime_request_builder import (
WorkflowAgentRuntimeRequestBuilder,
)
from graphon.enums import (
BuiltinNodeTypes,
WorkflowExecutionStatus,
WorkflowNodeExecutionStatus,
)
from graphon.file import helpers as file_helpers
from models import App
from models.agent_config_entities import DeclaredOutputConfig, DeclaredOutputType
from models.workflow import WorkflowNodeExecutionModel, WorkflowRun
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────────────────────
# Public dataclasses / enums (Pydantic — these go straight on the wire)
# ──────────────────────────────────────────────────────────────────────────────
class NodeOutputStatus(StrEnum):
"""Lifecycle status of a single declared output within a run."""
PENDING = "pending" # node not started
RUNNING = "running" # node running, output not ready yet
READY = "ready"
TYPE_CHECK_FAILED = "type_check_failed"
OUTPUT_CHECK_FAILED = "output_check_failed"
NOT_PRODUCED = "not_produced" # node succeeded but did not produce this declared output
FAILED = "failed" # node itself failed/exception/stopped
class NodeStatus(StrEnum):
"""Coarse node-level status used by Inspector to pick a banner."""
IDLE = "idle"
RUNNING = "running"
READY = "ready"
FAILED = "failed"
class CheckResultView(BaseModel):
"""``type_check`` / ``output_check`` per-output summary block."""
model_config = ConfigDict(extra="forbid")
passed: bool
reason: str | None = None
class NodeOutputView(BaseModel):
model_config = ConfigDict(extra="forbid")
name: str
type: DeclaredOutputType | None = None
status: NodeOutputStatus
value_preview: Any = None
type_check: CheckResultView | None = None
output_check: CheckResultView | None = None
retried: int = 0
class NodeOutputsView(BaseModel):
model_config = ConfigDict(extra="forbid")
node_id: str
node_kind: str
node_display_name: str
node_status: NodeStatus
node_started_at: datetime | None = None
node_completed_at: datetime | None = None
outputs: list[NodeOutputView] = Field(default_factory=list)
class WorkflowRunSnapshotView(BaseModel):
model_config = ConfigDict(extra="forbid")
workflow_run_id: str
workflow_run_status: WorkflowExecutionStatus
node_outputs: list[NodeOutputsView] = Field(default_factory=list)
class OutputPreviewView(BaseModel):
model_config = ConfigDict(extra="forbid")
node_id: str
output_name: str
type: DeclaredOutputType | None = None
status: NodeOutputStatus
value: Any = None # full value (with signed URL for file refs)
class NodeOutputInspectorError(Exception):
"""Raised when a request cannot be served (404-level conditions)."""
def __init__(self, code: str, message: str) -> None:
super().__init__(message)
self.code = code
# ──────────────────────────────────────────────────────────────────────────────
# Internal helpers — declared outputs per node
# ──────────────────────────────────────────────────────────────────────────────
@dataclass(frozen=True, slots=True)
class _ResolvedDeclaration:
"""Declared output the Inspector should expose, with a normalized type.
``inferred`` is ``True`` when the node kind has no declared-output schema
(we derived the list from the execution payload). The frontend can use
this to dim the type column.
"""
name: str
declared_type: DeclaredOutputType | None
inferred: bool
def _is_agent_v2_node(node: Mapping[str, Any]) -> bool:
"""A graph node is Agent v2 iff its ``data.type`` is the AGENT builtin
AND its ``data.version`` is ``"2"``.
``BuiltinNodeTypes.AGENT`` is a ``ClassVar[NodeType]`` (plain string), not
a StrEnum, so we compare against it directly without ``.value``.
"""
data = node.get("data") or {}
if not isinstance(data, Mapping):
return False
if data.get("type") != BuiltinNodeTypes.AGENT:
return False
return str(data.get("version", "")) == "2"
def _graph_nodes(workflow_run: WorkflowRun) -> list[Mapping[str, Any]]:
"""Parse ``WorkflowRun.graph`` (LongText JSON) into a node list.
The graph blob may be missing / unparseable for very old runs; we treat
that as "no nodes" rather than failing the Inspector, so the panel still
renders an empty state.
"""
if not workflow_run.graph:
return []
try:
parsed = json.loads(workflow_run.graph)
except (json.JSONDecodeError, TypeError):
logger.warning("NodeOutputInspector: workflow_run %s has unparseable graph blob", workflow_run.id)
return []
if not isinstance(parsed, Mapping):
return []
nodes = parsed.get("nodes")
if not isinstance(nodes, list):
return []
return [n for n in nodes if isinstance(n, Mapping) and "id" in n]
# ──────────────────────────────────────────────────────────────────────────────
# Internal helpers — per-output status derivation
# ──────────────────────────────────────────────────────────────────────────────
def _decode_json_blob(blob: str | None) -> Mapping[str, Any] | None:
if not blob:
return None
try:
decoded = json.loads(blob)
except (json.JSONDecodeError, TypeError):
return None
if not isinstance(decoded, Mapping):
return None
return decoded
def _node_status_for(execution: WorkflowNodeExecutionModel | None) -> NodeStatus:
if execution is None:
return NodeStatus.IDLE
if execution.status == WorkflowNodeExecutionStatus.RUNNING:
return NodeStatus.RUNNING
if execution.status == WorkflowNodeExecutionStatus.SUCCEEDED:
return NodeStatus.READY
return NodeStatus.FAILED
def _type_check_by_name(metadata: Mapping[str, Any] | None) -> dict[str, Mapping[str, Any]]:
if not metadata:
return {}
block = metadata.get("output_type_check")
if not isinstance(block, Mapping):
return {}
results = block.get("results") or []
if not isinstance(results, list):
return {}
indexed: dict[str, Mapping[str, Any]] = {}
for r in results:
if isinstance(r, Mapping) and isinstance(r.get("name"), str):
indexed[r["name"]] = r
return indexed
def _output_check_by_name(metadata: Mapping[str, Any] | None) -> dict[str, Mapping[str, Any]]:
if not metadata:
return {}
block = metadata.get("output_check")
if not isinstance(block, Mapping):
return {}
results = block.get("results") or []
if not isinstance(results, list):
return {}
indexed: dict[str, Mapping[str, Any]] = {}
for r in results:
if isinstance(r, Mapping) and isinstance(r.get("name"), str):
indexed[r["name"]] = r
return indexed
def _retried_attempt_count(metadata: Mapping[str, Any] | None) -> int:
"""The agent_v2 runtime records the final attempt index in metadata.
``attempt`` is 0-indexed so a single execution with no retry has
``attempt == 0`` and a Inspector-friendly ``retried == 0``.
"""
if not metadata:
return 0
attempt = metadata.get("attempt")
if isinstance(attempt, int) and attempt > 0:
return attempt
return 0
# ──────────────────────────────────────────────────────────────────────────────
# Value preview (file refs get signed URLs)
# ──────────────────────────────────────────────────────────────────────────────
_PREVIEW_TEXT_LIMIT = 500
_FILE_ID_KEYS: tuple[str, ...] = ("file_id", "upload_file_id", "tool_file_id")
def _looks_like_file_ref(value: Any) -> str | None:
"""Return the resolved ``file_id`` when ``value`` is a file-shaped dict."""
if not isinstance(value, Mapping):
return None
for key in _FILE_ID_KEYS:
candidate = value.get(key)
if isinstance(candidate, str) and candidate:
return candidate
return None
def _value_preview(value: Any) -> Any:
"""Compact preview suitable for the snapshot endpoint.
File refs are augmented with a signed download URL so the panel can render
a thumbnail / link without a second round-trip; long strings are truncated;
other scalar / dict / list shapes are returned as-is (the Pydantic layer
enforces JSON-safety on serialization).
"""
file_id = _looks_like_file_ref(value)
if file_id:
assert isinstance(value, Mapping)
try:
preview_url = file_helpers.get_signed_file_url(upload_file_id=file_id)
except Exception:
logger.warning("NodeOutputInspector: signed URL failed for file_id=%s", file_id, exc_info=True)
preview_url = None
return {**dict(value), "preview_url": preview_url}
if isinstance(value, str) and len(value) > _PREVIEW_TEXT_LIMIT:
return value[:_PREVIEW_TEXT_LIMIT] + ""
return value
def _full_value(value: Any) -> Any:
"""Same shape as :func:`_value_preview` minus the truncation."""
file_id = _looks_like_file_ref(value)
if file_id:
assert isinstance(value, Mapping)
try:
preview_url = file_helpers.get_signed_file_url(upload_file_id=file_id)
except Exception:
logger.warning("NodeOutputInspector: signed URL failed for file_id=%s", file_id, exc_info=True)
preview_url = None
return {**dict(value), "preview_url": preview_url}
return value
# ──────────────────────────────────────────────────────────────────────────────
# Service
# ──────────────────────────────────────────────────────────────────────────────
class NodeOutputInspectorService:
"""Read-only Inspector for draft + published workflow runs.
The service is dependency-light: it holds a single
:class:`WorkflowAgentBindingResolver` so agent v2 nodes can map to their
declared outputs without re-implementing binding lookup. All other I/O
uses the global session factory so workflow runs / executions stay on the
repo-default code path.
Tenancy is enforced via ``app_model.tenant_id`` + ``app_model.id`` on
every load the same scope guard regardless of trigger source.
"""
def __init__(self, binding_resolver: WorkflowAgentBindingResolver | None = None) -> None:
self._binding_resolver = binding_resolver or WorkflowAgentBindingResolver()
# ── public API ────────────────────────────────────────────────────────
def snapshot_workflow_run(self, *, app_model: App, workflow_run_id: str) -> WorkflowRunSnapshotView:
"""Build the per-node snapshot for one debug workflow run."""
workflow_run, executions = self._load_run_and_executions(app_model=app_model, workflow_run_id=workflow_run_id)
executions_by_node = self._index_executions_by_node(executions)
graph_nodes = _graph_nodes(workflow_run)
node_views: list[NodeOutputsView] = []
for raw_node in graph_nodes:
node_id = str(raw_node["id"])
execution = executions_by_node.get(node_id)
view = self._build_node_view(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
workflow_id=workflow_run.workflow_id,
raw_node=raw_node,
execution=execution,
)
node_views.append(view)
return WorkflowRunSnapshotView(
workflow_run_id=workflow_run.id,
workflow_run_status=workflow_run.status,
node_outputs=node_views,
)
def node_detail(self, *, app_model: App, workflow_run_id: str, node_id: str) -> NodeOutputsView:
"""Per-node Inspector entry — returns one ``NodeOutputsView``."""
workflow_run, executions = self._load_run_and_executions(app_model=app_model, workflow_run_id=workflow_run_id)
graph_nodes = _graph_nodes(workflow_run)
raw_node = next((n for n in graph_nodes if str(n.get("id")) == node_id), None)
if raw_node is None:
raise NodeOutputInspectorError(
"node_not_in_workflow_run",
f"Node {node_id!r} does not appear in workflow run {workflow_run_id!r}.",
)
execution = self._index_executions_by_node(executions).get(node_id)
return self._build_node_view(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
workflow_id=workflow_run.workflow_id,
raw_node=raw_node,
execution=execution,
)
def output_preview(
self,
*,
app_model: App,
workflow_run_id: str,
node_id: str,
output_name: str,
) -> OutputPreviewView:
"""Full payload for one declared output (with signed file URL)."""
detail = self.node_detail(
app_model=app_model,
workflow_run_id=workflow_run_id,
node_id=node_id,
)
view = next((o for o in detail.outputs if o.name == output_name), None)
if view is None:
raise NodeOutputInspectorError(
"node_output_not_declared",
f"Output {output_name!r} is not declared on node {node_id!r}.",
)
# ``node_detail`` already produced a truncated value_preview; reload
# the raw value from the execution payload so the preview endpoint can
# return the full thing (still wrapped through ``_full_value`` for
# signed file URLs).
execution = self._index_executions_by_node(
self._load_run_and_executions(app_model=app_model, workflow_run_id=workflow_run_id)[1]
).get(node_id)
full_value: Any = None
if execution is not None:
outputs = _decode_json_blob(execution.outputs) or {}
if output_name in outputs:
full_value = _full_value(outputs[output_name])
return OutputPreviewView(
node_id=node_id,
output_name=output_name,
type=view.type,
status=view.status,
value=full_value,
)
# ── DB loading ────────────────────────────────────────────────────────
def _load_run_and_executions(
self, *, app_model: App, workflow_run_id: str
) -> tuple[WorkflowRun, Sequence[WorkflowNodeExecutionModel]]:
"""Fetch the ``WorkflowRun`` row + every execution that belongs to it.
Enforces:
* row exists,
* row belongs to the app's tenant + app.
The trigger source (DEBUGGING vs. APP_RUN / WEBHOOK / SCHEDULE / ...) is
deliberately not checked here D-1 was lifted 2026-05-26 and the
Inspector now serves both draft and published runs.
"""
with session_factory.create_session() as session:
workflow_run = session.scalar(
select(WorkflowRun).where(
WorkflowRun.id == workflow_run_id,
WorkflowRun.app_id == app_model.id,
WorkflowRun.tenant_id == app_model.tenant_id,
)
)
if workflow_run is None:
raise NodeOutputInspectorError("workflow_run_not_found", "Workflow run not found.")
executions = session.scalars(
select(WorkflowNodeExecutionModel).where(
WorkflowNodeExecutionModel.workflow_run_id == workflow_run_id,
WorkflowNodeExecutionModel.tenant_id == app_model.tenant_id,
WorkflowNodeExecutionModel.app_id == app_model.id,
)
).all()
return workflow_run, executions
@staticmethod
def _index_executions_by_node(
executions: Sequence[WorkflowNodeExecutionModel],
) -> dict[str, WorkflowNodeExecutionModel]:
"""Keep the latest execution per ``node_id``.
A given node may have multiple rows when retries or iterations occur;
``index`` is the per-run sequence counter, so we keep the one with
the highest index as the canonical "current" view.
"""
latest: dict[str, WorkflowNodeExecutionModel] = {}
for execution in executions:
existing = latest.get(execution.node_id)
if existing is None or execution.index > existing.index:
latest[execution.node_id] = execution
return latest
# ── Per-node view construction ────────────────────────────────────────
def _build_node_view(
self,
*,
tenant_id: str,
app_id: str,
workflow_id: str,
raw_node: Mapping[str, Any],
execution: WorkflowNodeExecutionModel | None,
) -> NodeOutputsView:
node_id = str(raw_node["id"])
data = raw_node.get("data") or {}
if not isinstance(data, Mapping):
data = {}
node_kind = str(data.get("type") or (execution.node_type if execution else "") or "unknown")
display_name = str(data.get("title") or (execution.title if execution else node_id))
node_status = _node_status_for(execution)
declarations = self._resolve_declared_outputs(
tenant_id=tenant_id,
app_id=app_id,
workflow_id=workflow_id,
node_id=node_id,
raw_node=raw_node,
execution=execution,
)
outputs_dict = _decode_json_blob(execution.outputs) if execution else None
metadata_dict = _decode_json_blob(execution.execution_metadata) if execution else None
type_check_by_name = _type_check_by_name(metadata_dict)
output_check_by_name = _output_check_by_name(metadata_dict)
retried = _retried_attempt_count(metadata_dict)
output_views: list[NodeOutputView] = []
for declaration in declarations:
output_views.append(
self._build_output_view(
declaration=declaration,
node_status=node_status,
outputs_dict=outputs_dict,
type_check_by_name=type_check_by_name,
output_check_by_name=output_check_by_name,
retried=retried,
)
)
return NodeOutputsView(
node_id=node_id,
node_kind=node_kind,
node_display_name=display_name,
node_status=node_status,
node_started_at=execution.created_at if execution else None,
node_completed_at=execution.finished_at if execution else None,
outputs=output_views,
)
def _build_output_view(
self,
*,
declaration: _ResolvedDeclaration,
node_status: NodeStatus,
outputs_dict: Mapping[str, Any] | None,
type_check_by_name: Mapping[str, Mapping[str, Any]],
output_check_by_name: Mapping[str, Mapping[str, Any]],
retried: int,
) -> NodeOutputView:
name = declaration.name
declared_type = declaration.declared_type
if node_status == NodeStatus.IDLE:
return NodeOutputView(
name=name,
type=declared_type,
status=NodeOutputStatus.PENDING,
retried=retried,
)
if node_status == NodeStatus.RUNNING:
return NodeOutputView(
name=name,
type=declared_type,
status=NodeOutputStatus.RUNNING,
retried=retried,
)
if node_status == NodeStatus.FAILED:
return NodeOutputView(
name=name,
type=declared_type,
status=NodeOutputStatus.FAILED,
retried=retried,
)
# ── node succeeded ────────────────────────────────────────────
type_check_result = type_check_by_name.get(name)
output_check_result = output_check_by_name.get(name)
type_check_view = self._coerce_check_view(type_check_result)
output_check_view = self._coerce_check_view(output_check_result)
# type check loses first; output check next; otherwise ready.
status: NodeOutputStatus
if type_check_result and not _is_passing(type_check_result):
status = NodeOutputStatus.TYPE_CHECK_FAILED
elif output_check_result and not _is_passing(output_check_result):
status = NodeOutputStatus.OUTPUT_CHECK_FAILED
elif outputs_dict is not None and name not in outputs_dict:
status = NodeOutputStatus.NOT_PRODUCED
else:
status = NodeOutputStatus.READY
value_preview = _value_preview(outputs_dict.get(name)) if outputs_dict and name in outputs_dict else None
return NodeOutputView(
name=name,
type=declared_type,
status=status,
value_preview=value_preview,
type_check=type_check_view,
output_check=output_check_view,
retried=retried,
)
@staticmethod
def _coerce_check_view(result: Mapping[str, Any] | None) -> CheckResultView | None:
if not result:
return None
# type_check rows use ``status``; output_check rows use ``status`` too —
# both record per-output state. We treat ``status == "ready"``/"passed"
# as passing and everything else as failing, so the view stays
# stable regardless of which producer wrote the metadata.
return CheckResultView(passed=_is_passing(result), reason=result.get("reason"))
# ── Declared-output resolution ────────────────────────────────────────
def _resolve_declared_outputs(
self,
*,
tenant_id: str,
app_id: str,
workflow_id: str,
node_id: str,
raw_node: Mapping[str, Any],
execution: WorkflowNodeExecutionModel | None,
) -> list[_ResolvedDeclaration]:
if _is_agent_v2_node(raw_node):
agent_decl = self._declared_outputs_for_agent_v2(
tenant_id=tenant_id,
app_id=app_id,
workflow_id=workflow_id,
node_id=node_id,
)
if agent_decl is not None:
return [_ResolvedDeclaration(name=o.name, declared_type=o.type, inferred=False) for o in agent_decl]
# Non-agent (or agent-binding-missing) fall back to inferring from the
# produced payload so the Inspector still has something to show.
return self._infer_outputs_from_payload(execution=execution)
def _declared_outputs_for_agent_v2(
self,
*,
tenant_id: str,
app_id: str,
workflow_id: str,
node_id: str,
) -> list[DeclaredOutputConfig] | None:
try:
bundle = self._binding_resolver.resolve(
tenant_id=tenant_id,
app_id=app_id,
workflow_id=workflow_id,
node_id=node_id,
)
except WorkflowAgentBindingError:
return None
try:
from models.agent_config_entities import WorkflowNodeJobConfig
node_job = WorkflowNodeJobConfig.model_validate(bundle.binding.node_job_config_dict)
except Exception:
logger.warning(
"NodeOutputInspector: malformed node_job_config for binding %s", bundle.binding.id, exc_info=True
)
return None
return list(WorkflowAgentRuntimeRequestBuilder.effective_declared_outputs(list(node_job.declared_outputs)))
@staticmethod
def _infer_outputs_from_payload(*, execution: WorkflowNodeExecutionModel | None) -> list[_ResolvedDeclaration]:
if execution is None:
return []
outputs = _decode_json_blob(execution.outputs)
if not outputs:
return []
return [_ResolvedDeclaration(name=name, declared_type=None, inferred=True) for name in outputs]
def _is_passing(result: Mapping[str, Any]) -> bool:
"""A check-result row is "passing" when its ``status`` is the ready/passed
sentinel emitted by the type-checker / output-check executor."""
status = result.get("status")
if status in {"ready", "passed", "not_produced"}:
return True
return False

View File

@ -0,0 +1,475 @@
"""End-to-end tests for ``NodeOutputInspectorService`` (Stage 4 §8 / ENG-373).
These integration tests exercise the service against a real Postgres
(``dify-db-1``) same pattern as :mod:`test_remove_app_and_related_data_task`:
seed rows via ``session_factory.create_session()`` with explicit commits,
exercise the service, clean up by ID at teardown.
Coverage:
1. Snapshot for a draft run with one agent v2 node + one tool node
2. Type-check failure visible in snapshot
3. Output-check failure visible in snapshot
4. Published run returns ``published_run_inspector_not_implemented``
5. Cross-tenant access returns ``workflow_run_not_found``
6. File output preview endpoint returns full value with signed URL
7. ``node_detail`` path serves a single node view
"""
from __future__ import annotations
import json
import uuid
from collections.abc import Generator
from datetime import UTC, datetime
from types import SimpleNamespace
from typing import Any
from unittest.mock import patch
import pytest
from sqlalchemy import delete
from core.db.session_factory import session_factory
from graphon.enums import WorkflowExecutionStatus, WorkflowNodeExecutionStatus
from models.enums import CreatorUserRole, WorkflowRunTriggeredFrom
from models.workflow import (
WorkflowNodeExecutionModel,
WorkflowNodeExecutionTriggeredFrom,
WorkflowRun,
WorkflowType,
)
from services.workflow.node_output_inspector_service import (
NodeOutputInspectorError,
NodeOutputInspectorService,
NodeOutputStatus,
NodeStatus,
)
@pytest.fixture
def fake_app_model() -> SimpleNamespace:
"""Lightweight stand-in for the ``App`` model that the service consumes.
``App`` is only read for ``id`` and ``tenant_id``; the service does not
poke at any ORM relationship so a SimpleNamespace is enough and it
keeps us free of needing the ``apps`` row to actually exist (which would
drag in Account / Tenant setup).
"""
return SimpleNamespace(
id=str(uuid.uuid4()),
tenant_id=str(uuid.uuid4()),
)
def _make_workflow_run(
*,
app_id: str,
tenant_id: str,
triggered_from: WorkflowRunTriggeredFrom = WorkflowRunTriggeredFrom.DEBUGGING,
status: WorkflowExecutionStatus = WorkflowExecutionStatus.RUNNING,
graph: dict[str, Any] | None = None,
) -> WorkflowRun:
"""Build a ``WorkflowRun`` row with all required fields populated."""
return WorkflowRun(
id=str(uuid.uuid4()),
tenant_id=tenant_id,
app_id=app_id,
workflow_id=str(uuid.uuid4()),
type=WorkflowType.WORKFLOW,
triggered_from=triggered_from,
version="draft",
graph=json.dumps(graph or {"nodes": []}),
status=status,
created_by_role=CreatorUserRole.ACCOUNT,
created_by=str(uuid.uuid4()),
)
def _make_execution(
*,
app_id: str,
tenant_id: str,
workflow_id: str,
workflow_run_id: str,
node_id: str,
node_type: str = "agent",
title: str = "",
status: WorkflowNodeExecutionStatus = WorkflowNodeExecutionStatus.SUCCEEDED,
outputs: dict[str, Any] | None = None,
execution_metadata: dict[str, Any] | None = None,
index: int = 1,
) -> WorkflowNodeExecutionModel:
"""Build a ``WorkflowNodeExecutionModel`` row with all required fields."""
return WorkflowNodeExecutionModel(
id=str(uuid.uuid4()),
tenant_id=tenant_id,
app_id=app_id,
workflow_id=workflow_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
workflow_run_id=workflow_run_id,
index=index,
node_id=node_id,
node_type=node_type,
title=title or node_id,
status=status,
outputs=json.dumps(outputs) if outputs is not None else None,
execution_metadata=json.dumps(execution_metadata) if execution_metadata is not None else None,
created_by_role=CreatorUserRole.ACCOUNT,
created_by=str(uuid.uuid4()),
created_at=datetime.now(UTC),
finished_at=datetime.now(UTC),
)
@pytest.fixture
def seeded_run(
flask_req_ctx, fake_app_model: SimpleNamespace
) -> Generator[tuple[SimpleNamespace, WorkflowRun, list[WorkflowNodeExecutionModel]], None, None]:
"""Seed one debug ``WorkflowRun`` + 2 node executions in real Postgres.
Yields ``(app_model, workflow_run, executions)``. Cleans both rows up at
teardown via direct ``DELETE`` so a failed test never leaves debris.
"""
graph = {
"nodes": [
{
"id": "agent-node-1",
"data": {"type": "agent", "version": "2", "title": "My Agent"},
},
{
"id": "tool-node-1",
"data": {"type": "tool", "title": "Slack"},
},
]
}
workflow_run = _make_workflow_run(
app_id=fake_app_model.id,
tenant_id=fake_app_model.tenant_id,
graph=graph,
)
agent_execution = _make_execution(
app_id=fake_app_model.id,
tenant_id=fake_app_model.tenant_id,
workflow_id=workflow_run.workflow_id,
workflow_run_id=workflow_run.id,
node_id="agent-node-1",
node_type="agent",
outputs={"text": "hello world"},
execution_metadata={
"output_type_check": {
"passed": True,
"results": [{"name": "text", "type": "string", "status": "ready"}],
},
"attempt": 0,
},
index=1,
)
tool_execution = _make_execution(
app_id=fake_app_model.id,
tenant_id=fake_app_model.tenant_id,
workflow_id=workflow_run.workflow_id,
workflow_run_id=workflow_run.id,
node_id="tool-node-1",
node_type="tool",
outputs={"message": "sent", "ok": True},
index=2,
)
with session_factory.create_session() as session:
session.add(workflow_run)
session.add(agent_execution)
session.add(tool_execution)
session.commit()
run_id = workflow_run.id
execution_ids = [agent_execution.id, tool_execution.id]
try:
yield fake_app_model, workflow_run, [agent_execution, tool_execution]
finally:
with session_factory.create_session() as session:
session.execute(delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id.in_(execution_ids)))
session.execute(delete(WorkflowRun).where(WorkflowRun.id == run_id))
session.commit()
# ──────────────────────────────────────────────────────────────────────────────
# Stub binding resolver — declared outputs for the agent v2 node
# ──────────────────────────────────────────────────────────────────────────────
def _stub_resolver(declared_outputs_payload: list[dict[str, Any]]):
"""Return a stand-in binding resolver whose ``.resolve`` always returns
one bundle with the supplied declared_outputs.
The real resolver hits ``workflow_agent_node_bindings``; we skip that
table here so the Inspector can be tested without binding-row setup.
"""
binding = SimpleNamespace(
id="binding-1",
node_job_config_dict={
"workflow_prompt": "stub",
"declared_outputs": declared_outputs_payload,
},
)
bundle = SimpleNamespace(binding=binding, agent=None, snapshot=None)
class _Resolver:
def resolve(self, **_: Any):
return bundle
return _Resolver()
# ──────────────────────────────────────────────────────────────────────────────
# Tests
# ──────────────────────────────────────────────────────────────────────────────
def test_snapshot_returns_agent_v2_declared_outputs_with_status_ready(seeded_run):
"""Happy path: agent v2 node + tool node both render, statuses come from
real ``WorkflowRun`` + ``WorkflowNodeExecutionModel`` rows."""
app_model, workflow_run, _ = seeded_run
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([{"name": "text", "type": "string"}]))
snapshot = service.snapshot_workflow_run(
app_model=app_model,
workflow_run_id=workflow_run.id,
)
assert snapshot.workflow_run_id == workflow_run.id
assert snapshot.workflow_run_status == WorkflowExecutionStatus.RUNNING
by_node = {n.node_id: n for n in snapshot.node_outputs}
agent_view = by_node["agent-node-1"]
assert agent_view.node_status == NodeStatus.READY
assert agent_view.outputs[0].name == "text"
assert agent_view.outputs[0].status == NodeOutputStatus.READY
assert agent_view.outputs[0].value_preview == "hello world"
tool_view = by_node["tool-node-1"]
# Tool node's declared outputs are *inferred* from the produced payload.
output_names = sorted(o.name for o in tool_view.outputs)
assert output_names == ["message", "ok"]
assert all(o.type is None for o in tool_view.outputs)
def test_snapshot_404s_for_missing_run(fake_app_model):
"""Service raises ``workflow_run_not_found`` when the row doesn't exist."""
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([]))
with pytest.raises(NodeOutputInspectorError) as exc:
service.snapshot_workflow_run(app_model=fake_app_model, workflow_run_id=str(uuid.uuid4()))
assert exc.value.code == "workflow_run_not_found"
def test_snapshot_404s_for_cross_tenant_access(seeded_run):
"""A wrong-tenant app_model must not see another tenant's run."""
_, workflow_run, _ = seeded_run
intruder = SimpleNamespace(id=str(uuid.uuid4()), tenant_id=str(uuid.uuid4()))
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([]))
with pytest.raises(NodeOutputInspectorError) as exc:
service.snapshot_workflow_run(app_model=intruder, workflow_run_id=workflow_run.id)
assert exc.value.code == "workflow_run_not_found"
def test_snapshot_404s_for_published_run_per_decision_d1(flask_req_ctx, fake_app_model):
"""Decision D-1: published / app-run Inspector deferred to stage 4.1."""
workflow_run = _make_workflow_run(
app_id=fake_app_model.id,
tenant_id=fake_app_model.tenant_id,
triggered_from=WorkflowRunTriggeredFrom.APP_RUN,
graph={"nodes": []},
)
with session_factory.create_session() as session:
session.add(workflow_run)
session.commit()
run_id = workflow_run.id
try:
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([]))
with pytest.raises(NodeOutputInspectorError) as exc:
service.snapshot_workflow_run(app_model=fake_app_model, workflow_run_id=run_id)
assert exc.value.code == "published_run_inspector_not_implemented"
finally:
with session_factory.create_session() as session:
session.execute(delete(WorkflowRun).where(WorkflowRun.id == run_id))
session.commit()
def test_snapshot_surfaces_type_check_failure_from_metadata(flask_req_ctx, fake_app_model):
"""Per-output ``TYPE_CHECK_FAILED`` derived from the metadata blob the
Stage 4 §5 stack records on the execution row."""
graph = {"nodes": [{"id": "agent-1", "data": {"type": "agent", "version": "2"}}]}
workflow_run = _make_workflow_run(app_id=fake_app_model.id, tenant_id=fake_app_model.tenant_id, graph=graph)
execution = _make_execution(
app_id=fake_app_model.id,
tenant_id=fake_app_model.tenant_id,
workflow_id=workflow_run.workflow_id,
workflow_run_id=workflow_run.id,
node_id="agent-1",
outputs={"summary": 123}, # int despite declared string
execution_metadata={
"output_type_check": {
"passed": False,
"results": [
{
"name": "summary",
"type": "string",
"status": "type_check_failed",
"reason": "expected string, got int",
}
],
}
},
)
with session_factory.create_session() as session:
session.add(workflow_run)
session.add(execution)
session.commit()
run_id, execution_id = workflow_run.id, execution.id
try:
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([{"name": "summary", "type": "string"}]))
snapshot = service.snapshot_workflow_run(app_model=fake_app_model, workflow_run_id=run_id)
output = snapshot.node_outputs[0].outputs[0]
assert output.status == NodeOutputStatus.TYPE_CHECK_FAILED
assert output.type_check is not None
assert output.type_check.passed is False
assert output.type_check.reason == "expected string, got int"
finally:
with session_factory.create_session() as session:
session.execute(delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id == execution_id))
session.execute(delete(WorkflowRun).where(WorkflowRun.id == run_id))
session.commit()
def test_snapshot_surfaces_output_check_failure_from_metadata(flask_req_ctx, fake_app_model):
"""When ``output_type_check.passed`` but ``output_check.passed=False``, the
output is flagged ``OUTPUT_CHECK_FAILED``."""
graph = {"nodes": [{"id": "agent-1", "data": {"type": "agent", "version": "2"}}]}
workflow_run = _make_workflow_run(app_id=fake_app_model.id, tenant_id=fake_app_model.tenant_id, graph=graph)
execution = _make_execution(
app_id=fake_app_model.id,
tenant_id=fake_app_model.tenant_id,
workflow_id=workflow_run.workflow_id,
workflow_run_id=workflow_run.id,
node_id="agent-1",
outputs={"report": {"file_id": "550e8400-e29b-41d4-a716-446655440000"}},
execution_metadata={
"output_type_check": {"passed": True, "results": [{"name": "report", "status": "ready"}]},
"output_check": {
"passed": False,
"results": [{"name": "report", "status": "failed", "reason": "benchmark mismatch"}],
},
},
)
with session_factory.create_session() as session:
session.add(workflow_run)
session.add(execution)
session.commit()
run_id, execution_id = workflow_run.id, execution.id
try:
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([{"name": "report", "type": "file"}]))
# Stub signed-URL so we don't depend on the workflow file runtime being
# bound (it isn't, in this minimal flask_req_ctx).
with patch(
"services.workflow.node_output_inspector_service.file_helpers.get_signed_file_url",
return_value="https://signed.example/report",
):
snapshot = service.snapshot_workflow_run(app_model=fake_app_model, workflow_run_id=run_id)
output = snapshot.node_outputs[0].outputs[0]
assert output.status == NodeOutputStatus.OUTPUT_CHECK_FAILED
assert output.output_check is not None
assert output.output_check.passed is False
assert output.output_check.reason == "benchmark mismatch"
finally:
with session_factory.create_session() as session:
session.execute(delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id == execution_id))
session.execute(delete(WorkflowRun).where(WorkflowRun.id == run_id))
session.commit()
def test_node_detail_serves_one_node(seeded_run):
app_model, workflow_run, _ = seeded_run
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([{"name": "text", "type": "string"}]))
view = service.node_detail(
app_model=app_model,
workflow_run_id=workflow_run.id,
node_id="agent-node-1",
)
assert view.node_id == "agent-node-1"
assert view.outputs[0].name == "text"
def test_output_preview_for_file_renders_signed_url(seeded_run, fake_app_model):
"""``preview`` returns the full value with signed_url for file refs."""
# Replace the seeded agent execution's output with a file ref.
_, workflow_run, executions = seeded_run
agent_execution = executions[0]
with session_factory.create_session() as session:
# Re-bind the persisted row so we can mutate + commit.
from sqlalchemy import select
row = session.scalar(
select(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id == agent_execution.id)
)
assert row is not None
row.outputs = json.dumps({"text": {"file_id": "550e8400-e29b-41d4-a716-446655440000", "filename": "x.pdf"}})
session.commit()
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([{"name": "text", "type": "file"}]))
with patch(
"services.workflow.node_output_inspector_service.file_helpers.get_signed_file_url",
return_value="https://signed.example/x.pdf",
):
preview = service.output_preview(
app_model=fake_app_model,
workflow_run_id=workflow_run.id,
node_id="agent-node-1",
output_name="text",
)
assert preview.output_name == "text"
assert isinstance(preview.value, dict)
assert preview.value["preview_url"] == "https://signed.example/x.pdf"
assert preview.value["filename"] == "x.pdf"
def test_keeps_latest_execution_per_node_by_index(flask_req_ctx, fake_app_model):
"""Multiple executions for the same node_id → service keeps the highest
``index`` (matches the agent_v2 retry pattern that re-emits node
executions)."""
graph = {"nodes": [{"id": "agent-1", "data": {"type": "agent", "version": "2"}}]}
workflow_run = _make_workflow_run(app_id=fake_app_model.id, tenant_id=fake_app_model.tenant_id, graph=graph)
older = _make_execution(
app_id=fake_app_model.id,
tenant_id=fake_app_model.tenant_id,
workflow_id=workflow_run.workflow_id,
workflow_run_id=workflow_run.id,
node_id="agent-1",
outputs={"text": "first attempt"},
index=1,
)
newer = _make_execution(
app_id=fake_app_model.id,
tenant_id=fake_app_model.tenant_id,
workflow_id=workflow_run.workflow_id,
workflow_run_id=workflow_run.id,
node_id="agent-1",
outputs={"text": "second attempt"},
index=5,
)
with session_factory.create_session() as session:
session.add(workflow_run)
session.add(older)
session.add(newer)
session.commit()
run_id, ex_ids = workflow_run.id, [older.id, newer.id]
try:
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([{"name": "text", "type": "string"}]))
snapshot = service.snapshot_workflow_run(app_model=fake_app_model, workflow_run_id=run_id)
assert snapshot.node_outputs[0].outputs[0].value_preview == "second attempt"
finally:
with session_factory.create_session() as session:
session.execute(delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id.in_(ex_ids)))
session.execute(delete(WorkflowRun).where(WorkflowRun.id == run_id))
session.commit()

View File

@ -0,0 +1,454 @@
"""Unit tests for the Node Output Inspector controller (Stage 4 §8).
The controller has two non-trivial moving parts:
1. :func:`_sse_envelope` wire-format builder for the SSE ``{event, data, id}``
records (decision D-5).
2. :func:`_stream_inspector_events` the SSE generator that fans the redis
pub/sub stream out as snapshot / node_changed / workflow_run_completed /
error events.
We exercise both as plain functions with mocked dependencies (service +
``inspector_events.subscribe``) going through Flask routes would multiply
the test scaffolding without buying additional confidence in the core
behaviour.
The Resource classes themselves are trivial wrappers (``_service().method()``
+ ``_InspectorNotFound`` translation), and are touched here only by import so
codecov sees them as exercised; their detailed behaviour is locked down by
the service-level tests in
``tests/unit_tests/services/workflow/test_node_output_inspector_service.py``.
"""
from __future__ import annotations
import json
from collections.abc import Iterator
from typing import Any
from unittest.mock import MagicMock
from uuid import UUID
import pytest
from controllers.console.app import workflow_node_output_inspector as ctrl
from services.workflow.inspector_events import InspectorMessage
from services.workflow.node_output_inspector_service import (
NodeOutputInspectorError,
NodeOutputStatus,
NodeOutputsView,
NodeStatus,
WorkflowRunSnapshotView,
)
# ──────────────────────────────────────────────────────────────────────────────
# Fixtures
# ──────────────────────────────────────────────────────────────────────────────
@pytest.fixture
def app_model() -> Any:
"""A minimal ``App`` stub the controller passes through to the service.
The SSE generator never reads its attributes just forwards it so a
sentinel object is enough.
"""
return MagicMock(name="App", tenant_id="tenant-1", id="app-1")
@pytest.fixture
def run_id() -> UUID:
return UUID("00000000-0000-0000-0000-0000000000aa")
def _snapshot_view(*, status: str, node_id: str = "agent-1") -> WorkflowRunSnapshotView:
from graphon.enums import WorkflowExecutionStatus
return WorkflowRunSnapshotView(
workflow_run_id="00000000-0000-0000-0000-0000000000aa",
workflow_run_status=WorkflowExecutionStatus(status),
node_outputs=[
NodeOutputsView(
node_id=node_id,
node_kind="agent",
node_display_name="Greeter",
node_status=NodeStatus.RUNNING if status == "running" else NodeStatus.READY,
outputs=[],
)
],
)
def _node_view(*, node_id: str = "agent-1", node_status: NodeStatus = NodeStatus.READY) -> NodeOutputsView:
return NodeOutputsView(
node_id=node_id,
node_kind="agent",
node_display_name="Greeter",
node_status=node_status,
outputs=[],
)
# ──────────────────────────────────────────────────────────────────────────────
# _sse_envelope
# ──────────────────────────────────────────────────────────────────────────────
def test_sse_envelope_serializes_dict_payload():
out = ctrl._sse_envelope("snapshot", {"foo": "bar"}, 7)
lines = out.rstrip("\n").split("\n")
assert lines[0] == "event: snapshot"
assert lines[1] == "id: 7"
assert lines[2] == 'data: {"foo": "bar"}'
assert out.endswith("\n\n") # SSE record separator
def test_sse_envelope_passes_strings_through_unmodified():
"""A raw string payload (e.g. ``:keepalive``) is emitted as-is."""
out = ctrl._sse_envelope("snapshot", ":keepalive", 1)
assert "data: :keepalive\n" in out
def test_sse_envelope_handles_unicode_payload():
out = ctrl._sse_envelope("node_changed", {"name": "你好"}, 3)
assert "你好" in out # ensure_ascii=False
# ──────────────────────────────────────────────────────────────────────────────
# _stream_inspector_events — fast path (already-terminal run)
# ──────────────────────────────────────────────────────────────────────────────
def _drain(stream: Iterator[str]) -> list[str]:
return list(stream)
def _parse(record: str) -> tuple[str, dict | None]:
"""Pull ``event`` + ``data`` (json-decoded) out of one SSE record."""
event = None
data: dict | None = None
for line in record.rstrip("\n").split("\n"):
if line.startswith("event: "):
event = line[len("event: ") :]
elif line.startswith("data: "):
try:
data = json.loads(line[len("data: ") :])
except json.JSONDecodeError:
data = None
assert event is not None
return event, data
@pytest.fixture
def patch_service(monkeypatch: pytest.MonkeyPatch):
"""Replace ``_service()`` with a MagicMock per-test."""
fake = MagicMock()
monkeypatch.setattr(ctrl, "_service", lambda: fake)
return fake
@pytest.fixture
def patch_subscribe(monkeypatch: pytest.MonkeyPatch):
"""Patch the pub/sub subscribe iterator."""
def _make(messages: list[InspectorMessage | None]):
def _subscribe(workflow_run_id: str, *, timeout_seconds: float = 1.0):
for m in messages:
if m is None:
# heartbeat sentinel
yield InspectorMessage(
kind="node_changed",
workflow_run_id=workflow_run_id,
node_id=None,
status=None,
)
else:
yield m
monkeypatch.setattr(ctrl.inspector_events, "subscribe", _subscribe)
return _make
def test_stream_fast_path_when_run_already_terminal(patch_service, app_model, run_id):
"""A run that's already ``succeeded`` should produce ``snapshot`` →
``workflow_run_completed`` and close without subscribing to pub/sub."""
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="succeeded")
records = _drain(ctrl._stream_inspector_events(app_model, run_id))
assert len(records) == 2
e0, d0 = _parse(records[0])
e1, d1 = _parse(records[1])
assert e0 == "snapshot"
assert d0 is not None
assert d0["workflow_run_status"] == "succeeded"
assert e1 == "workflow_run_completed"
assert d1 is not None
assert d1["workflow_run_status"] == "succeeded"
def test_stream_fast_path_each_terminal_status(patch_service, app_model, run_id):
"""All four terminal statuses take the fast-path. Note the enum value for
partial success is the hyphenated ``partial-succeeded``."""
for terminal in ("succeeded", "failed", "stopped", "partial-succeeded"):
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status=terminal)
records = _drain(ctrl._stream_inspector_events(app_model, run_id))
events = [_parse(r)[0] for r in records]
assert events == ["snapshot", "workflow_run_completed"], terminal
def test_stream_initial_404_propagates_before_any_bytes(patch_service, app_model, run_id):
"""``NodeOutputInspectorError`` on the initial snapshot must surface as the
controller's ``_InspectorNotFound`` exception so Flask returns HTTP 404
not a half-streamed SSE body."""
patch_service.snapshot_workflow_run.side_effect = NodeOutputInspectorError(
"workflow_run_not_found", "Workflow run not found."
)
gen = ctrl._stream_inspector_events(app_model, run_id)
with pytest.raises(ctrl._InspectorNotFound) as exc:
next(gen)
assert exc.value.error_code == "workflow_run_not_found"
# ──────────────────────────────────────────────────────────────────────────────
# _stream_inspector_events — live path (run is running)
# ──────────────────────────────────────────────────────────────────────────────
def test_stream_live_emits_snapshot_then_node_changed_then_completion(
patch_service, patch_subscribe, app_model, run_id
):
"""Happy path: snapshot → 2× node_changed → workflow_run_completed."""
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
patch_service.node_detail.return_value = _node_view(node_id="agent-1")
msgs = [
InspectorMessage(kind="node_changed", workflow_run_id=str(run_id), node_id="agent-1", status="running"),
InspectorMessage(kind="node_changed", workflow_run_id=str(run_id), node_id="agent-1", status="succeeded"),
InspectorMessage(kind="workflow_completed", workflow_run_id=str(run_id), node_id=None, status="succeeded"),
]
patch_subscribe(msgs)
events = [_parse(r)[0] for r in _drain(ctrl._stream_inspector_events(app_model, run_id))]
assert events == ["snapshot", "node_changed", "node_changed", "workflow_run_completed"]
# node_detail should be called once per delta (not once per heartbeat)
assert patch_service.node_detail.call_count == 2
def test_stream_emits_heartbeat_after_n_idle_ticks(
patch_service, patch_subscribe, monkeypatch: pytest.MonkeyPatch, app_model, run_id
):
"""When pub/sub returns the heartbeat sentinel ``_HEARTBEAT_EVERY_TICKS``
times in a row, the generator emits a ``:keepalive`` SSE comment."""
monkeypatch.setattr(ctrl, "_HEARTBEAT_EVERY_TICKS", 2)
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
patch_service.node_detail.return_value = _node_view()
# 2 heartbeats → keepalive, then real message + completion.
patch_subscribe(
[
None,
None,
InspectorMessage(kind="workflow_completed", workflow_run_id=str(run_id), node_id=None, status="failed"),
]
)
records = _drain(ctrl._stream_inspector_events(app_model, run_id))
raw = "".join(records)
assert ":keepalive\n\n" in raw
assert "workflow_run_completed" in raw
def test_stream_hard_timeout_force_closes_without_terminal(
patch_service, patch_subscribe, monkeypatch: pytest.MonkeyPatch, app_model, run_id
):
"""If the engine crashes / drops the terminal event, the generator force-
closes after ``_STREAM_HARD_TIMEOUT_TICKS`` ticks rather than hanging."""
monkeypatch.setattr(ctrl, "_STREAM_HARD_TIMEOUT_TICKS", 3)
monkeypatch.setattr(ctrl, "_HEARTBEAT_EVERY_TICKS", 100) # avoid keepalive noise
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
# 5 heartbeats, no terminal → generator should bail after 3 ticks.
patch_subscribe([None] * 10)
records = _drain(ctrl._stream_inspector_events(app_model, run_id))
events = [_parse(r)[0] for r in records]
assert events == ["snapshot"] # only snapshot, then forced close
def test_stream_skips_messages_with_missing_node_id(patch_service, patch_subscribe, app_model, run_id):
"""Defensive: malformed node_changed without node_id is silently dropped."""
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
patch_subscribe(
[
InspectorMessage(kind="node_changed", workflow_run_id=str(run_id), node_id="", status="running"),
InspectorMessage(kind="workflow_completed", workflow_run_id=str(run_id), node_id=None, status="succeeded"),
]
)
events = [_parse(r)[0] for r in _drain(ctrl._stream_inspector_events(app_model, run_id))]
assert events == ["snapshot", "workflow_run_completed"]
assert patch_service.node_detail.call_count == 0
def test_stream_skips_node_detail_404_without_breaking_stream(patch_service, patch_subscribe, app_model, run_id):
"""When node_detail 404s mid-stream (node still being persisted), the
generator just drops that delta and keeps streaming."""
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
patch_service.node_detail.side_effect = NodeOutputInspectorError("node_not_in_workflow_run", "transient")
patch_subscribe(
[
InspectorMessage(kind="node_changed", workflow_run_id=str(run_id), node_id="agent-1", status="running"),
InspectorMessage(kind="workflow_completed", workflow_run_id=str(run_id), node_id=None, status="succeeded"),
]
)
events = [_parse(r)[0] for r in _drain(ctrl._stream_inspector_events(app_model, run_id))]
assert events == ["snapshot", "workflow_run_completed"]
def test_stream_emits_error_event_on_node_detail_unexpected_exception(
patch_service, patch_subscribe, app_model, run_id
):
"""Any non-Inspector exception (DB outage, JSON decode error) becomes a
user-visible ``error`` SSE record; the stream keeps running."""
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
patch_service.node_detail.side_effect = RuntimeError("db gone")
patch_subscribe(
[
InspectorMessage(kind="node_changed", workflow_run_id=str(run_id), node_id="agent-1", status="running"),
InspectorMessage(kind="workflow_completed", workflow_run_id=str(run_id), node_id=None, status="succeeded"),
]
)
records = _drain(ctrl._stream_inspector_events(app_model, run_id))
events = [_parse(r) for r in records]
kinds = [e for e, _ in events]
assert kinds == ["snapshot", "error", "workflow_run_completed"]
err_event, err_data = events[1]
assert err_data is not None
assert err_data["node_id"] == "agent-1"
assert "failed" in err_data["message"]
def test_stream_workflow_completed_status_falls_back_to_unknown(patch_service, patch_subscribe, app_model, run_id):
"""If the pub/sub message arrives with status=None (publish race), the SSE
payload still carries ``workflow_run_status`` with the ``unknown``
sentinel so the frontend never sees a missing field."""
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
patch_subscribe(
[InspectorMessage(kind="workflow_completed", workflow_run_id=str(run_id), node_id=None, status=None)]
)
records = _drain(ctrl._stream_inspector_events(app_model, run_id))
e, d = _parse(records[-1])
assert e == "workflow_run_completed"
assert d is not None
assert d["workflow_run_status"] == "unknown"
# ──────────────────────────────────────────────────────────────────────────────
# Resource classes — import-level smoke + service-method delegation
# ──────────────────────────────────────────────────────────────────────────────
def test_resource_classes_are_registered():
"""All 8 Inspector Resource classes must be importable from the module so
flask-restx can discover them via the namespace decorators."""
for name in (
"WorkflowDraftRunNodeOutputsApi",
"WorkflowDraftRunNodeOutputDetailApi",
"WorkflowDraftRunNodeOutputPreviewApi",
"WorkflowDraftRunNodeOutputEventsApi",
"WorkflowPublishedRunNodeOutputsApi",
"WorkflowPublishedRunNodeOutputDetailApi",
"WorkflowPublishedRunNodeOutputPreviewApi",
"WorkflowPublishedRunNodeOutputEventsApi",
):
assert hasattr(ctrl, name), name
def test_inspector_not_found_preserves_error_code():
"""Sanity: the controller's bespoke 404 wrapper hangs onto the
Inspector's specific error code rather than collapsing to a generic
``not_found``."""
err = NodeOutputInspectorError("node_not_in_workflow_run", "boom")
wrapped = ctrl._InspectorNotFound(err)
assert wrapped.error_code == "node_not_in_workflow_run"
assert wrapped.code == 404
# ──────────────────────────────────────────────────────────────────────────────
# _serve_* — shared REST handler bodies (covered by both draft + published)
# ──────────────────────────────────────────────────────────────────────────────
def test_serve_snapshot_happy_path(patch_service, app_model, run_id):
"""Returns the snapshot view as JSON-serialisable dict."""
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
result = ctrl._serve_snapshot(app_model, run_id)
assert isinstance(result, dict)
assert result["workflow_run_id"] == "00000000-0000-0000-0000-0000000000aa"
patch_service.snapshot_workflow_run.assert_called_once_with(app_model=app_model, workflow_run_id=str(run_id))
def test_serve_snapshot_translates_inspector_error_to_404(patch_service, app_model, run_id):
"""``NodeOutputInspectorError`` becomes the controller's 404 wrapper with
the specific ``error_code`` preserved."""
patch_service.snapshot_workflow_run.side_effect = NodeOutputInspectorError("workflow_run_not_found", "no such run")
with pytest.raises(ctrl._InspectorNotFound) as exc:
ctrl._serve_snapshot(app_model, run_id)
assert exc.value.error_code == "workflow_run_not_found"
def test_serve_node_detail_happy_path(patch_service, app_model, run_id):
patch_service.node_detail.return_value = _node_view(node_id="agent-1")
result = ctrl._serve_node_detail(app_model, run_id, "agent-1")
assert result["node_id"] == "agent-1"
patch_service.node_detail.assert_called_once_with(
app_model=app_model, workflow_run_id=str(run_id), node_id="agent-1"
)
def test_serve_node_detail_translates_inspector_error(patch_service, app_model, run_id):
patch_service.node_detail.side_effect = NodeOutputInspectorError("node_not_in_workflow_run", "missing")
with pytest.raises(ctrl._InspectorNotFound) as exc:
ctrl._serve_node_detail(app_model, run_id, "ghost")
assert exc.value.error_code == "node_not_in_workflow_run"
def test_serve_output_preview_happy_path(patch_service, app_model, run_id):
from services.workflow.node_output_inspector_service import (
DeclaredOutputType,
OutputPreviewView,
)
patch_service.output_preview.return_value = OutputPreviewView(
node_id="agent-1",
output_name="text",
type=DeclaredOutputType.STRING,
status=NodeOutputStatus.READY,
value="Hello",
)
result = ctrl._serve_output_preview(app_model, run_id, "agent-1", "text")
assert result["value"] == "Hello"
assert result["status"] == "ready"
patch_service.output_preview.assert_called_once_with(
app_model=app_model,
workflow_run_id=str(run_id),
node_id="agent-1",
output_name="text",
)
def test_serve_output_preview_translates_inspector_error(patch_service, app_model, run_id):
patch_service.output_preview.side_effect = NodeOutputInspectorError("node_output_not_declared", "no such output")
with pytest.raises(ctrl._InspectorNotFound) as exc:
ctrl._serve_output_preview(app_model, run_id, "agent-1", "phantom")
assert exc.value.error_code == "node_output_not_declared"
# ──────────────────────────────────────────────────────────────────────────────
# Note: the Resource ``.get`` methods themselves (6 REST + 2 SSE) are
# 1-line delegators to the helpers above. They can't be called directly in a
# unit test because their decorator stack (``@setup_required`` /
# ``@login_required`` / ``@account_initialization_required`` /
# ``@get_app_model``) needs a real Flask request context + DB-backed account.
# The integration test in
# ``tests/integration_tests/services/test_node_output_inspector_service.py``
# (and the E2E driver in /tmp/e2e_inspector_sse_published.py) exercise them
# through the HTTP stack.
# ──────────────────────────────────────────────────────────────────────────────

View File

@ -0,0 +1,192 @@
"""Verify the workflow persistence layer fans Inspector deltas to redis pub/sub.
The hook lives in ``core/app/workflow/layers/persistence.py``:
every ``_handle_node_*`` and the terminal ``_handle_graph_run_*`` handlers
call into ``services.workflow.inspector_events.publish_node_changed`` /
``publish_workflow_completed`` after the DB write succeeds. Those calls are
the only thing the Inspector SSE stream listens to, so any future refactor of
the persistence layer must keep them in place.
We don't reconstruct a full workflow engine here — the handlers are tested
in isolation by patching just the moving parts they touch
(``_workflow_execution`` + ``_node_execution_cache``) and asserting against
the publisher module's call sites. This keeps the test compact and tied to
the contract, not the implementation.
"""
from __future__ import annotations
from datetime import datetime
from typing import Any
from unittest.mock import MagicMock
import pytest
from core.app.workflow.layers import persistence as persistence_mod
from core.app.workflow.layers.persistence import WorkflowPersistenceLayer
@pytest.fixture
def layer() -> WorkflowPersistenceLayer:
"""Build a layer instance with all repository / trace deps stubbed.
We bypass ``__init__`` because constructing it for real pulls in the
workflow engine's app-generate-entity, repos, and a runtime state — none
of which matter for asserting that the publish-hook fires.
"""
instance = WorkflowPersistenceLayer.__new__(WorkflowPersistenceLayer)
# Minimum surface the handlers touch:
instance._workflow_execution_repository = MagicMock()
instance._workflow_node_execution_repository = MagicMock()
instance._trace_manager = None
instance._workflow_info = MagicMock(workflow_id="wf-1")
instance._application_generate_entity = MagicMock()
# Use a SimpleNamespace-like spec so Pydantic-validated callsites (e.g.
# ``WorkflowNodeExecution.new`` requires real strings) get the right types.
workflow_execution = MagicMock()
workflow_execution.id_ = "run-1"
workflow_execution.workflow_id = "wf-1"
workflow_execution.status = MagicMock(value="succeeded")
workflow_execution.outputs = {}
workflow_execution.error_message = None
workflow_execution.exceptions_count = 0
workflow_execution.finished_at = None
instance._workflow_execution = workflow_execution
instance._node_execution_cache = {}
instance._node_snapshots = {}
instance._node_sequence = 0
# `graph_runtime_state` is a layer-base property; stub it.
instance._graph_runtime_state = MagicMock(total_tokens=0, node_run_steps=0, outputs={}, exceptions_count=0)
return instance
@pytest.fixture
def capture_publishes(monkeypatch: pytest.MonkeyPatch) -> dict[str, list]:
"""Replace the two publishers with capture lists so each test can assert
on the exact arguments."""
calls: dict[str, list] = {"node": [], "workflow": []}
def fake_node(*, workflow_run_id: str, node_id: str, status: str) -> None:
calls["node"].append({"workflow_run_id": workflow_run_id, "node_id": node_id, "status": status})
def fake_workflow(*, workflow_run_id: str, status: str) -> None:
calls["workflow"].append({"workflow_run_id": workflow_run_id, "status": status})
monkeypatch.setattr(persistence_mod, "_inspector_publish_node_changed", fake_node)
monkeypatch.setattr(persistence_mod, "_inspector_publish_workflow_completed", fake_workflow)
return calls
# ──────────────────────────────────────────────────────────────────────────────
# Graph-level publish hooks
# ──────────────────────────────────────────────────────────────────────────────
def _graph_event(**kwargs: Any) -> MagicMock:
return MagicMock(**kwargs)
def test_graph_run_succeeded_publishes_workflow_completed(layer, capture_publishes):
layer._workflow_execution.status = MagicMock(value="succeeded")
layer._handle_graph_run_succeeded(_graph_event(outputs={"text": "hi"}))
assert capture_publishes["workflow"] == [{"workflow_run_id": "run-1", "status": "succeeded"}]
assert capture_publishes["node"] == []
def test_graph_run_partial_succeeded_publishes_workflow_completed(layer, capture_publishes):
layer._workflow_execution.status = MagicMock(value="partial-succeeded")
layer._handle_graph_run_partial_succeeded(_graph_event(outputs={}, exceptions_count=1))
assert capture_publishes["workflow"] == [{"workflow_run_id": "run-1", "status": "partial-succeeded"}]
def test_graph_run_failed_publishes_workflow_completed(layer, capture_publishes):
layer._workflow_execution.status = MagicMock(value="failed")
layer._handle_graph_run_failed(_graph_event(error="boom", exceptions_count=0))
assert capture_publishes["workflow"] == [{"workflow_run_id": "run-1", "status": "failed"}]
def test_graph_run_aborted_publishes_workflow_completed(layer, capture_publishes):
layer._workflow_execution.status = MagicMock(value="stopped")
layer._handle_graph_run_aborted(_graph_event(reason="user stop"))
assert capture_publishes["workflow"] == [{"workflow_run_id": "run-1", "status": "stopped"}]
def test_graph_run_paused_does_not_publish_completion(layer, capture_publishes):
"""Pause is not a terminal state — the Inspector keeps waiting for either
resume or a real terminal event."""
layer._handle_graph_run_paused(_graph_event(outputs={}))
assert capture_publishes["workflow"] == []
assert capture_publishes["node"] == []
# ──────────────────────────────────────────────────────────────────────────────
# Node-level publish hooks
# ──────────────────────────────────────────────────────────────────────────────
def _node_started_event(node_id: str = "agent-1", exec_id: str = "exec-1") -> MagicMock:
return MagicMock(
id=exec_id,
node_id=node_id,
node_type="agent",
node_title="Greeter",
predecessor_node_id=None,
in_iteration_id=None,
in_loop_id=None,
start_at=datetime(2026, 5, 26, 0, 0, 0),
)
def _seed_node_execution(layer: WorkflowPersistenceLayer, exec_id: str, node_id: str) -> None:
"""Inject a domain execution into the cache so the success / fail / etc
handlers (which look it up by id) can run without going through started."""
layer._node_execution_cache[exec_id] = MagicMock(
id=exec_id, node_id=node_id, status=MagicMock(value="running"), outputs={}, error=None
)
def test_node_started_publishes_running(layer, capture_publishes):
layer._handle_node_started(_node_started_event())
assert capture_publishes["node"] == [{"workflow_run_id": "run-1", "node_id": "agent-1", "status": "running"}]
def test_node_retry_publishes_retry(layer, capture_publishes):
_seed_node_execution(layer, exec_id="exec-1", node_id="agent-1")
event = MagicMock(id="exec-1", error="rate limit")
layer._handle_node_retry(event)
assert capture_publishes["node"] == [{"workflow_run_id": "run-1", "node_id": "agent-1", "status": "retry"}]
def test_node_succeeded_publishes_succeeded(layer, capture_publishes, monkeypatch: pytest.MonkeyPatch):
_seed_node_execution(layer, exec_id="exec-1", node_id="agent-1")
# Stub the inner _update_node_execution so we don't have to construct a
# full NodeRunResult — we only want to confirm the publish happens after.
monkeypatch.setattr(layer, "_update_node_execution", lambda *a, **kw: None)
event = MagicMock(id="exec-1", node_run_result=MagicMock(), finished_at=datetime.now())
layer._handle_node_succeeded(event)
assert capture_publishes["node"] == [{"workflow_run_id": "run-1", "node_id": "agent-1", "status": "succeeded"}]
def test_node_failed_publishes_failed(layer, capture_publishes, monkeypatch: pytest.MonkeyPatch):
_seed_node_execution(layer, exec_id="exec-1", node_id="agent-1")
monkeypatch.setattr(layer, "_update_node_execution", lambda *a, **kw: None)
event = MagicMock(id="exec-1", node_run_result=MagicMock(), error="bad", finished_at=datetime.now())
layer._handle_node_failed(event)
assert capture_publishes["node"] == [{"workflow_run_id": "run-1", "node_id": "agent-1", "status": "failed"}]
def test_node_exception_publishes_exception(layer, capture_publishes, monkeypatch: pytest.MonkeyPatch):
_seed_node_execution(layer, exec_id="exec-1", node_id="agent-1")
monkeypatch.setattr(layer, "_update_node_execution", lambda *a, **kw: None)
event = MagicMock(id="exec-1", node_run_result=MagicMock(), error="oom", finished_at=datetime.now())
layer._handle_node_exception(event)
assert capture_publishes["node"] == [{"workflow_run_id": "run-1", "node_id": "agent-1", "status": "exception"}]
def test_node_pause_requested_does_not_publish(layer, capture_publishes, monkeypatch: pytest.MonkeyPatch):
"""Node pause is not an Inspector-visible state — no publish."""
_seed_node_execution(layer, exec_id="exec-1", node_id="agent-1")
monkeypatch.setattr(layer, "_update_node_execution", lambda *a, **kw: None)
event = MagicMock(id="exec-1", node_run_result=MagicMock())
layer._handle_node_pause_requested(event)
assert capture_publishes["node"] == []

View File

@ -0,0 +1,224 @@
"""Unit tests for :mod:`services.workflow.inspector_events`.
The publisher and subscriber both touch redis, so we mock it out at the
``redis_client`` boundary. The goal is to lock down:
1. the channel-naming convention (frontend SSE doesn't need to know it but
tests catch accidental renames),
2. the JSON envelope (``kind / workflow_run_id / node_id / status``),
3. publisher robustness when redis is unavailable,
4. subscriber's tolerance of malformed payloads and bytes-vs-str messages,
5. subscriber's heartbeat-on-idle behaviour.
"""
from __future__ import annotations
import json
from collections.abc import Iterator
from typing import Any
from unittest.mock import MagicMock, patch
from services.workflow import inspector_events
from services.workflow.inspector_events import InspectorMessage
# ──────────────────────────────────────────────────────────────────────────────
# Channel + envelope
# ──────────────────────────────────────────────────────────────────────────────
def test_channel_for_returns_namespaced_key():
assert inspector_events.channel_for("run-42") == "dify:inspector:workflow_run:run-42"
def test_inspector_message_to_json_round_trip():
msg = InspectorMessage(kind="node_changed", workflow_run_id="r1", node_id="agent-1", status="succeeded")
parsed = json.loads(msg.to_json())
assert parsed == {"kind": "node_changed", "workflow_run_id": "r1", "node_id": "agent-1", "status": "succeeded"}
def test_inspector_message_from_json_rejects_bad_kind():
blob = json.dumps({"kind": "something_else", "workflow_run_id": "r1"})
assert InspectorMessage.from_json(blob) is None
def test_inspector_message_from_json_rejects_bad_workflow_run_id():
blob = json.dumps({"kind": "node_changed", "workflow_run_id": ""})
assert InspectorMessage.from_json(blob) is None
def test_inspector_message_from_json_rejects_non_string_node_id():
blob = json.dumps({"kind": "node_changed", "workflow_run_id": "r1", "node_id": 42})
assert InspectorMessage.from_json(blob) is None
def test_inspector_message_from_json_returns_none_for_invalid_json():
assert InspectorMessage.from_json("{not json") is None
def test_inspector_message_from_json_rejects_non_dict_payload():
"""Defensive: a JSON array or scalar is not an InspectorMessage."""
assert InspectorMessage.from_json("[1, 2, 3]") is None
assert InspectorMessage.from_json('"plain string"') is None
def test_inspector_message_from_json_rejects_non_string_status():
"""Status field, if present, must be a string."""
blob = json.dumps({"kind": "workflow_completed", "workflow_run_id": "r1", "status": 42})
assert InspectorMessage.from_json(blob) is None
# ──────────────────────────────────────────────────────────────────────────────
# Publisher
# ──────────────────────────────────────────────────────────────────────────────
def test_publish_node_changed_writes_to_run_channel():
fake_redis = MagicMock()
with patch.object(inspector_events, "redis_client", fake_redis):
inspector_events.publish_node_changed(workflow_run_id="run-1", node_id="agent-1", status="running")
fake_redis.publish.assert_called_once()
channel, blob = fake_redis.publish.call_args.args
assert channel == "dify:inspector:workflow_run:run-1"
msg = InspectorMessage.from_json(blob)
assert msg is not None
assert msg.kind == "node_changed"
assert msg.node_id == "agent-1"
assert msg.status == "running"
def test_publish_workflow_completed_emits_terminal_message():
fake_redis = MagicMock()
with patch.object(inspector_events, "redis_client", fake_redis):
inspector_events.publish_workflow_completed(workflow_run_id="run-1", status="succeeded")
blob = fake_redis.publish.call_args.args[1]
msg = InspectorMessage.from_json(blob)
assert msg is not None
assert msg.kind == "workflow_completed"
assert msg.node_id is None
assert msg.status == "succeeded"
def test_publish_swallows_redis_errors():
"""Persistence must not crash if redis blows up — we publish best-effort."""
class _BrokenRedis:
def publish(self, *_args: Any, **_kwargs: Any) -> None:
raise RuntimeError("redis offline")
with patch.object(inspector_events, "redis_client", _BrokenRedis()):
# No exception should escape.
inspector_events.publish_node_changed(workflow_run_id="run-1", node_id="agent-1", status="running")
# ──────────────────────────────────────────────────────────────────────────────
# Subscriber
# ──────────────────────────────────────────────────────────────────────────────
def _make_fake_pubsub(messages: list[dict[str, Any] | None]) -> MagicMock:
"""Build a redis pubsub stub that replays ``messages`` then raises StopIteration."""
pubsub = MagicMock()
it: Iterator[dict[str, Any] | None] = iter(messages)
pubsub.get_message.side_effect = lambda **_kwargs: next(it, None)
return pubsub
def test_subscribe_yields_heartbeat_then_real_message():
"""Idle ticks (``get_message`` returns None) surface as a sentinel; real
payloads decode to ``InspectorMessage`` instances."""
payload = json.dumps(
{"kind": "node_changed", "workflow_run_id": "run-1", "node_id": "agent-1", "status": "succeeded"}
)
fake_redis = MagicMock()
fake_redis.pubsub.return_value = _make_fake_pubsub(
[
None, # heartbeat tick
{"data": payload.encode("utf-8")}, # bytes payload, real message
None, # heartbeat
]
)
with patch.object(inspector_events, "redis_client", fake_redis):
gen = inspector_events.subscribe("run-1", timeout_seconds=0.0)
first = next(gen)
second = next(gen)
third = next(gen)
# First message is the heartbeat sentinel (both node_id and status are None).
assert first.node_id is None
assert first.status is None
# Second is the real one.
assert second.kind == "node_changed"
assert second.node_id == "agent-1"
assert second.status == "succeeded"
# Third is another heartbeat.
assert third.node_id is None
def test_subscribe_skips_malformed_payloads():
fake_redis = MagicMock()
fake_redis.pubsub.return_value = _make_fake_pubsub(
[
{"data": b"not json at all"},
{"data": json.dumps({"kind": "node_changed", "workflow_run_id": "run-1"}).encode("utf-8")},
]
)
with patch.object(inspector_events, "redis_client", fake_redis):
gen = inspector_events.subscribe("run-1", timeout_seconds=0.0)
msg = next(gen)
assert msg.kind == "node_changed"
assert msg.node_id is None
def test_subscribe_unsubscribes_on_teardown():
fake_pubsub = _make_fake_pubsub([None])
fake_redis = MagicMock()
fake_redis.pubsub.return_value = fake_pubsub
with patch.object(inspector_events, "redis_client", fake_redis):
gen = inspector_events.subscribe("run-1", timeout_seconds=0.0)
next(gen)
gen.close()
fake_pubsub.unsubscribe.assert_called_once_with("dify:inspector:workflow_run:run-1")
fake_pubsub.close.assert_called_once()
def test_subscribe_swallows_teardown_errors():
"""``unsubscribe`` / ``close`` failures must not propagate out of the
generator they're best-effort cleanup."""
fake_pubsub = MagicMock()
fake_pubsub.get_message.return_value = None
fake_pubsub.unsubscribe.side_effect = RuntimeError("redis offline")
fake_pubsub.close.side_effect = RuntimeError("close failed")
fake_redis = MagicMock()
fake_redis.pubsub.return_value = fake_pubsub
with patch.object(inspector_events, "redis_client", fake_redis):
gen = inspector_events.subscribe("run-1", timeout_seconds=0.0)
next(gen)
# The teardown path runs in ``finally``; closing the generator
# exercises it. No exception should escape.
gen.close()
def test_subscribe_skips_non_string_data_payloads():
"""``raw["data"]`` can be ``None`` / int / bytes — only str is decodable
and the rest are silently skipped."""
fake_pubsub = MagicMock()
msgs: list[dict[str, Any] | None] = [
{"data": None}, # missing payload
{"data": 12345}, # int payload (shouldn't happen, defensive)
{
"data": json.dumps(
{"kind": "node_changed", "workflow_run_id": "run-1", "node_id": "agent-1", "status": "running"}
)
},
]
it = iter(msgs)
fake_pubsub.get_message.side_effect = lambda **_kw: next(it, None)
fake_redis = MagicMock()
fake_redis.pubsub.return_value = fake_pubsub
with patch.object(inspector_events, "redis_client", fake_redis):
gen = inspector_events.subscribe("run-1", timeout_seconds=0.0)
msg = next(gen)
assert msg.kind == "node_changed"
assert msg.node_id == "agent-1"

View File

@ -0,0 +1,499 @@
"""Unit tests for NodeOutputInspectorService (Stage 4 §8).
The service reads from postgres and resolves agent v2 bindings; this suite
mocks ``session_factory`` and the binding resolver so we exercise the
view-construction logic without DB / network access.
"""
from __future__ import annotations
import json
from datetime import UTC, datetime
from types import SimpleNamespace
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from graphon.enums import WorkflowExecutionStatus, WorkflowNodeExecutionStatus
from models.agent_config_entities import (
DeclaredArrayItem,
DeclaredOutputConfig,
DeclaredOutputType,
)
from models.enums import WorkflowRunTriggeredFrom
from services.workflow.node_output_inspector_service import (
NodeOutputInspectorError,
NodeOutputInspectorService,
NodeOutputStatus,
NodeStatus,
)
# ──────────────────────────────────────────────────────────────────────────────
# Fixtures
# ──────────────────────────────────────────────────────────────────────────────
def _app_model(*, tenant_id: str = "tenant-1", app_id: str = "app-1"):
return SimpleNamespace(tenant_id=tenant_id, id=app_id)
def _workflow_run(
*,
run_id: str = "run-1",
workflow_id: str = "workflow-1",
tenant_id: str = "tenant-1",
app_id: str = "app-1",
triggered_from: WorkflowRunTriggeredFrom = WorkflowRunTriggeredFrom.DEBUGGING,
status: WorkflowExecutionStatus = WorkflowExecutionStatus.RUNNING,
nodes: list[dict[str, Any]] | None = None,
):
return SimpleNamespace(
id=run_id,
workflow_id=workflow_id,
tenant_id=tenant_id,
app_id=app_id,
triggered_from=triggered_from,
status=status,
graph=json.dumps({"nodes": nodes or []}),
)
def _execution(
*,
node_id: str,
node_type: str = "agent",
title: str = "",
status: WorkflowNodeExecutionStatus = WorkflowNodeExecutionStatus.SUCCEEDED,
outputs: dict[str, Any] | None = None,
execution_metadata: dict[str, Any] | None = None,
index: int = 1,
created_at: datetime | None = None,
finished_at: datetime | None = None,
):
return SimpleNamespace(
node_id=node_id,
node_type=node_type,
title=title or node_id,
status=status,
outputs=json.dumps(outputs) if outputs is not None else None,
execution_metadata=json.dumps(execution_metadata) if execution_metadata is not None else None,
index=index,
created_at=created_at or datetime.now(UTC),
finished_at=finished_at,
)
def _agent_v2_node(*, node_id: str = "agent-node-1", title: str = "My Agent") -> dict[str, Any]:
return {
"id": node_id,
"data": {"type": "agent", "version": "2", "title": title},
}
def _non_agent_node(*, node_id: str = "tool-node-1", node_type: str = "tool", title: str = "Slack") -> dict[str, Any]:
return {
"id": node_id,
"data": {"type": node_type, "title": title},
}
def _patch_session(
*,
workflow_run: SimpleNamespace | None,
executions: list[SimpleNamespace] | None = None,
):
"""Patch ``session_factory.create_session`` to return the configured rows.
Returns a context manager that the test uses with ``with``.
"""
executions = executions or []
mock_session = MagicMock()
mock_session.scalar.return_value = workflow_run
mock_session.scalars.return_value.all.return_value = executions
cm = MagicMock()
cm.__enter__.return_value = mock_session
cm.__exit__.return_value = False
return patch(
"services.workflow.node_output_inspector_service.session_factory.create_session",
return_value=cm,
)
def _stub_binding_resolver(*, declared_outputs: list[DeclaredOutputConfig]):
"""Build a fake ``WorkflowAgentBindingResolver`` whose ``.resolve`` returns
a binding with ``node_job_config_dict.declared_outputs``."""
binding = SimpleNamespace(
id="binding-1",
node_job_config_dict={
"workflow_prompt": "stub",
"declared_outputs": [o.model_dump() for o in declared_outputs],
},
)
bundle = SimpleNamespace(binding=binding, agent=None, snapshot=None)
resolver = MagicMock()
resolver.resolve.return_value = bundle
return resolver
def _make_service(declared_outputs: list[DeclaredOutputConfig] | None = None) -> NodeOutputInspectorService:
return NodeOutputInspectorService(binding_resolver=_stub_binding_resolver(declared_outputs=declared_outputs or []))
# ──────────────────────────────────────────────────────────────────────────────
# 404 paths
# ──────────────────────────────────────────────────────────────────────────────
def test_snapshot_404_when_workflow_run_missing():
service = _make_service()
with _patch_session(workflow_run=None):
with pytest.raises(NodeOutputInspectorError) as exc:
service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="missing")
assert exc.value.code == "workflow_run_not_found"
def test_snapshot_accepts_published_run_d1_lifted():
"""D-1 was lifted 2026-05-26: any ``triggered_from`` is now accepted."""
service = _make_service()
run = _workflow_run(
nodes=[_agent_v2_node(node_id="agent-1")],
triggered_from=WorkflowRunTriggeredFrom.APP_RUN,
)
with _patch_session(workflow_run=run, executions=[]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
assert snapshot.workflow_run_id == "run-1"
assert [n.node_id for n in snapshot.node_outputs] == ["agent-1"]
def test_snapshot_accepts_webhook_triggered_run():
"""Webhook / schedule / plugin triggers are also published-side."""
service = _make_service()
run = _workflow_run(
nodes=[_agent_v2_node(node_id="agent-1")],
triggered_from=WorkflowRunTriggeredFrom.WEBHOOK,
)
with _patch_session(workflow_run=run, executions=[]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
assert snapshot.workflow_run_id == "run-1"
def test_node_detail_404_when_node_id_absent_from_graph():
service = _make_service()
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
with _patch_session(workflow_run=run, executions=[]):
with pytest.raises(NodeOutputInspectorError) as exc:
service.node_detail(app_model=_app_model(), workflow_run_id="run-1", node_id="ghost")
assert exc.value.code == "node_not_in_workflow_run"
def test_output_preview_404_when_output_name_unknown():
service = _make_service(
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
ex = _execution(node_id="agent-1", outputs={"text": "hello"})
with _patch_session(workflow_run=run, executions=[ex]):
with pytest.raises(NodeOutputInspectorError) as exc:
service.output_preview(
app_model=_app_model(),
workflow_run_id="run-1",
node_id="agent-1",
output_name="missing",
)
assert exc.value.code == "node_output_not_declared"
# ──────────────────────────────────────────────────────────────────────────────
# Snapshot happy path
# ──────────────────────────────────────────────────────────────────────────────
def test_snapshot_status_pending_when_node_has_no_execution():
service = _make_service(
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
with _patch_session(workflow_run=run, executions=[]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
assert len(snapshot.node_outputs) == 1
node = snapshot.node_outputs[0]
assert node.node_status == NodeStatus.IDLE
assert node.outputs[0].status == NodeOutputStatus.PENDING
def test_snapshot_status_running():
service = _make_service(
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
ex = _execution(node_id="agent-1", status=WorkflowNodeExecutionStatus.RUNNING)
with _patch_session(workflow_run=run, executions=[ex]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
assert snapshot.node_outputs[0].node_status == NodeStatus.RUNNING
assert snapshot.node_outputs[0].outputs[0].status == NodeOutputStatus.RUNNING
def test_snapshot_status_failed_node_marks_all_outputs_failed():
service = _make_service(
declared_outputs=[
DeclaredOutputConfig(name="a", type=DeclaredOutputType.STRING),
DeclaredOutputConfig(name="b", type=DeclaredOutputType.NUMBER),
],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
ex = _execution(node_id="agent-1", status=WorkflowNodeExecutionStatus.FAILED)
with _patch_session(workflow_run=run, executions=[ex]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
statuses = {o.name: o.status for o in snapshot.node_outputs[0].outputs}
assert statuses == {"a": NodeOutputStatus.FAILED, "b": NodeOutputStatus.FAILED}
def test_snapshot_status_ready_when_outputs_present_and_no_failure_metadata():
service = _make_service(
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
ex = _execution(node_id="agent-1", outputs={"text": "hello"})
with _patch_session(workflow_run=run, executions=[ex]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
output = snapshot.node_outputs[0].outputs[0]
assert output.status == NodeOutputStatus.READY
assert output.value_preview == "hello"
def test_snapshot_marks_type_check_failure():
service = _make_service(
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
ex = _execution(
node_id="agent-1",
outputs={"text": "ok"},
execution_metadata={
"output_type_check": {
"passed": False,
"results": [{"name": "text", "type": "string", "status": "type_check_failed", "reason": "wrong shape"}],
}
},
)
with _patch_session(workflow_run=run, executions=[ex]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
output = snapshot.node_outputs[0].outputs[0]
assert output.status == NodeOutputStatus.TYPE_CHECK_FAILED
assert output.type_check is not None
assert output.type_check.passed is False
assert output.type_check.reason == "wrong shape"
def test_snapshot_marks_output_check_failure_when_type_check_passed():
service = _make_service(
declared_outputs=[
DeclaredOutputConfig(
name="report",
type=DeclaredOutputType.FILE,
)
],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
ex = _execution(
node_id="agent-1",
outputs={"report": {"file_id": "550e8400-e29b-41d4-a716-446655440000"}},
execution_metadata={
"output_type_check": {"passed": True, "results": [{"name": "report", "status": "ready"}]},
"output_check": {
"passed": False,
"results": [{"name": "report", "status": "failed", "reason": "benchmark mismatch"}],
},
},
)
with (
_patch_session(workflow_run=run, executions=[ex]),
patch(
"services.workflow.node_output_inspector_service.file_helpers.get_signed_file_url",
return_value="https://signed.example/x",
),
):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
output = snapshot.node_outputs[0].outputs[0]
assert output.status == NodeOutputStatus.OUTPUT_CHECK_FAILED
assert output.output_check is not None
assert output.output_check.passed is False
assert output.output_check.reason == "benchmark mismatch"
def test_snapshot_marks_not_produced_when_declared_output_missing_from_payload():
service = _make_service(
declared_outputs=[
DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING),
DeclaredOutputConfig(name="optional_meta", type=DeclaredOutputType.OBJECT, required=False),
],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
ex = _execution(node_id="agent-1", outputs={"text": "hi"}) # optional_meta missing
with _patch_session(workflow_run=run, executions=[ex]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
statuses = {o.name: o.status for o in snapshot.node_outputs[0].outputs}
assert statuses == {"text": NodeOutputStatus.READY, "optional_meta": NodeOutputStatus.NOT_PRODUCED}
# ──────────────────────────────────────────────────────────────────────────────
# Non-agent node — outputs inferred from execution payload
# ──────────────────────────────────────────────────────────────────────────────
def test_non_agent_node_outputs_inferred_from_payload_keys():
service = _make_service()
run = _workflow_run(nodes=[_non_agent_node(node_id="tool-1", node_type="tool")])
ex = _execution(
node_id="tool-1",
node_type="tool",
outputs={"message": "sent", "thread_ts": "1234"},
)
with _patch_session(workflow_run=run, executions=[ex]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
output_names = sorted(o.name for o in snapshot.node_outputs[0].outputs)
assert output_names == ["message", "thread_ts"]
# All inferred outputs should have ``type=None`` since we don't know the
# schema yet.
assert all(o.type is None for o in snapshot.node_outputs[0].outputs)
# ──────────────────────────────────────────────────────────────────────────────
# File preview / signed URL
# ──────────────────────────────────────────────────────────────────────────────
def test_file_output_preview_includes_signed_url():
service = _make_service(
declared_outputs=[
DeclaredOutputConfig(name="report", type=DeclaredOutputType.FILE),
],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
file_payload = {"file_id": "550e8400-e29b-41d4-a716-446655440000", "filename": "x.pdf"}
ex = _execution(node_id="agent-1", outputs={"report": file_payload})
with (
_patch_session(workflow_run=run, executions=[ex]),
patch(
"services.workflow.node_output_inspector_service.file_helpers.get_signed_file_url",
return_value="https://signed.example/x.pdf",
),
):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
preview_value = snapshot.node_outputs[0].outputs[0].value_preview
assert isinstance(preview_value, dict)
assert preview_value["preview_url"] == "https://signed.example/x.pdf"
assert preview_value["filename"] == "x.pdf"
def test_file_output_preview_endpoint_returns_full_value_with_signed_url():
service = _make_service(
declared_outputs=[
DeclaredOutputConfig(name="report", type=DeclaredOutputType.FILE),
],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
file_payload = {"file_id": "550e8400-e29b-41d4-a716-446655440000", "filename": "x.pdf"}
ex = _execution(node_id="agent-1", outputs={"report": file_payload})
with (
_patch_session(workflow_run=run, executions=[ex]),
patch(
"services.workflow.node_output_inspector_service.file_helpers.get_signed_file_url",
return_value="https://signed.example/x.pdf",
),
):
preview = service.output_preview(
app_model=_app_model(),
workflow_run_id="run-1",
node_id="agent-1",
output_name="report",
)
assert preview.output_name == "report"
assert preview.status == NodeOutputStatus.READY
assert isinstance(preview.value, dict)
assert preview.value["preview_url"] == "https://signed.example/x.pdf"
# ──────────────────────────────────────────────────────────────────────────────
# Retry / metadata
# ──────────────────────────────────────────────────────────────────────────────
def test_retried_count_pulled_from_attempt_metadata():
service = _make_service(
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
ex = _execution(
node_id="agent-1",
outputs={"text": "ok"},
execution_metadata={"attempt": 2},
)
with _patch_session(workflow_run=run, executions=[ex]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
assert snapshot.node_outputs[0].outputs[0].retried == 2
# ──────────────────────────────────────────────────────────────────────────────
# Latest-execution-per-node grouping
# ──────────────────────────────────────────────────────────────────────────────
def test_keeps_latest_execution_per_node_by_index():
"""When a node has multiple executions (retries / iterations) keep the
canonical one the row with the highest ``index``."""
service = _make_service(
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
older = _execution(node_id="agent-1", outputs={"text": "old"}, index=1)
newer = _execution(node_id="agent-1", outputs={"text": "new"}, index=5)
with _patch_session(workflow_run=run, executions=[older, newer]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
assert snapshot.node_outputs[0].outputs[0].value_preview == "new"
# ──────────────────────────────────────────────────────────────────────────────
# Array item declarations round-trip correctly
# ──────────────────────────────────────────────────────────────────────────────
def test_array_typed_output_with_array_item_renders_correctly():
service = _make_service(
declared_outputs=[
DeclaredOutputConfig(
name="files",
type=DeclaredOutputType.ARRAY,
array_item=DeclaredArrayItem(type=DeclaredOutputType.FILE),
)
],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
ex = _execution(node_id="agent-1", outputs={"files": []})
with _patch_session(workflow_run=run, executions=[ex]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
output = snapshot.node_outputs[0].outputs[0]
assert output.type == DeclaredOutputType.ARRAY
# ──────────────────────────────────────────────────────────────────────────────
# Graph parsing edge cases
# ──────────────────────────────────────────────────────────────────────────────
def test_unparseable_graph_blob_yields_empty_snapshot_not_500():
service = _make_service()
run = SimpleNamespace(
id="run-1",
workflow_id="workflow-1",
tenant_id="tenant-1",
app_id="app-1",
triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
status=WorkflowExecutionStatus.RUNNING,
graph="{not valid json",
)
with _patch_session(workflow_run=run, executions=[]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
assert snapshot.node_outputs == []

View File

@ -167,6 +167,14 @@ import {
zGetAppsByAppIdWorkflowsDraftNodesByNodeIdVariablesResponse,
zGetAppsByAppIdWorkflowsDraftPath,
zGetAppsByAppIdWorkflowsDraftResponse,
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewPath,
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse,
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdPath,
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponse,
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsPath,
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponse,
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsPath,
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponse,
zGetAppsByAppIdWorkflowsDraftSystemVariablesPath,
zGetAppsByAppIdWorkflowsDraftSystemVariablesResponse,
zGetAppsByAppIdWorkflowsDraftVariablesByVariableIdPath,
@ -175,6 +183,14 @@ import {
zGetAppsByAppIdWorkflowsDraftVariablesQuery,
zGetAppsByAppIdWorkflowsDraftVariablesResponse,
zGetAppsByAppIdWorkflowsPath,
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewPath,
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse,
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdPath,
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponse,
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsPath,
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponse,
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsPath,
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponse,
zGetAppsByAppIdWorkflowsPublishPath,
zGetAppsByAppIdWorkflowsPublishResponse,
zGetAppsByAppIdWorkflowsQuery,
@ -3787,13 +3803,132 @@ export const run10 = {
}
/**
* Get system variables for workflow
* Server-Sent Events stream of inspector deltas for a draft workflow run.
*
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
*
* @deprecated
*/
export const get59 = oc
.route({
deprecated: true,
description:
'Server-Sent Events stream of inspector deltas for a draft workflow run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
inputStructure: 'detailed',
method: 'GET',
operationId: 'getAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEvents',
path: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/events',
tags: ['console'],
})
.input(z.object({ params: zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsPath }))
.output(zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponse)
export const events = {
get: get59,
}
/**
* Full value for one declared output, including signed download URL for files.
*
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
*
* @deprecated
*/
export const get60 = oc
.route({
deprecated: true,
description:
'Full value for one declared output, including signed download URL for files.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
inputStructure: 'detailed',
method: 'GET',
operationId: 'getAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreview',
path: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/{node_id}/{output_name}/preview',
tags: ['console'],
})
.input(
z.object({
params: zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewPath,
}),
)
.output(zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse)
export const preview3 = {
get: get60,
}
export const byOutputName = {
preview: preview3,
}
/**
* One node's declared outputs for a draft workflow run.
*
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
*
* @deprecated
*/
export const get61 = oc
.route({
deprecated: true,
description:
'One node\'s declared outputs for a draft workflow run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
inputStructure: 'detailed',
method: 'GET',
operationId: 'getAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeId',
path: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/{node_id}',
tags: ['console'],
})
.input(z.object({ params: zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdPath }))
.output(zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponse)
export const byNodeId8 = {
get: get61,
byOutputName,
}
/**
* Snapshot of every node's declared outputs for a draft workflow run.
*
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
*
* @deprecated
*/
export const get62 = oc
.route({
deprecated: true,
description:
'Snapshot of every node\'s declared outputs for a draft workflow run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
inputStructure: 'detailed',
method: 'GET',
operationId: 'getAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputs',
path: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs',
tags: ['console'],
})
.input(z.object({ params: zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsPath }))
.output(zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponse)
export const nodeOutputs = {
get: get62,
events,
byNodeId: byNodeId8,
}
export const byRunId2 = {
nodeOutputs,
}
export const runs = {
byRunId: byRunId2,
}
/**
* Get system variables for workflow
*
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
*
* @deprecated
*/
export const get63 = oc
.route({
deprecated: true,
description:
@ -3808,7 +3943,7 @@ export const get59 = oc
.output(zGetAppsByAppIdWorkflowsDraftSystemVariablesResponse)
export const systemVariables = {
get: get59,
get: get63,
}
/**
@ -3930,7 +4065,7 @@ export const delete9 = oc
*
* @deprecated
*/
export const get60 = oc
export const get64 = oc
.route({
deprecated: true,
description:
@ -3972,7 +4107,7 @@ export const patch2 = oc
export const byVariableId = {
delete: delete9,
get: get60,
get: get64,
patch: patch2,
reset,
}
@ -4002,7 +4137,7 @@ export const delete10 = oc
*
* @deprecated
*/
export const get61 = oc
export const get65 = oc
.route({
deprecated: true,
description:
@ -4024,7 +4159,7 @@ export const get61 = oc
export const variables2 = {
delete: delete10,
get: get61,
get: get65,
byVariableId,
}
@ -4037,7 +4172,7 @@ export const variables2 = {
*
* @deprecated
*/
export const get62 = oc
export const get66 = oc
.route({
deprecated: true,
description:
@ -4082,7 +4217,7 @@ export const post55 = oc
.output(zPostAppsByAppIdWorkflowsDraftResponse)
export const draft2 = {
get: get62,
get: get66,
post: post55,
conversationVariables: conversationVariables2,
environmentVariables,
@ -4092,6 +4227,7 @@ export const draft2 = {
loop: loop2,
nodes: nodes7,
run: run10,
runs,
systemVariables,
trigger: trigger2,
variables: variables2,
@ -4106,7 +4242,7 @@ export const draft2 = {
*
* @deprecated
*/
export const get63 = oc
export const get67 = oc
.route({
deprecated: true,
description:
@ -4149,10 +4285,137 @@ export const post56 = oc
.output(zPostAppsByAppIdWorkflowsPublishResponse)
export const publish = {
get: get63,
get: get67,
post: post56,
}
/**
* Server-Sent Events stream of inspector deltas for a published workflow run.
*
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
*
* @deprecated
*/
export const get68 = oc
.route({
deprecated: true,
description:
'Server-Sent Events stream of inspector deltas for a published workflow run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
inputStructure: 'detailed',
method: 'GET',
operationId: 'getAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEvents',
path: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/events',
tags: ['console'],
})
.input(z.object({ params: zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsPath }))
.output(zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponse)
export const events2 = {
get: get68,
}
/**
* Full value for one declared output of a published run.
*
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
*
* @deprecated
*/
export const get69 = oc
.route({
deprecated: true,
description:
'Full value for one declared output of a published run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
inputStructure: 'detailed',
method: 'GET',
operationId:
'getAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreview',
path: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/{node_id}/{output_name}/preview',
tags: ['console'],
})
.input(
z.object({
params:
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewPath,
}),
)
.output(
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse,
)
export const preview4 = {
get: get69,
}
export const byOutputName2 = {
preview: preview4,
}
/**
* One node's declared outputs for a published workflow run.
*
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
*
* @deprecated
*/
export const get70 = oc
.route({
deprecated: true,
description:
'One node\'s declared outputs for a published workflow run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
inputStructure: 'detailed',
method: 'GET',
operationId: 'getAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeId',
path: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/{node_id}',
tags: ['console'],
})
.input(z.object({ params: zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdPath }))
.output(zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponse)
export const byNodeId9 = {
get: get70,
byOutputName: byOutputName2,
}
/**
* Snapshot of every node's declared outputs for a published workflow run.
*
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
*
* @deprecated
*/
export const get71 = oc
.route({
deprecated: true,
description:
'Snapshot of every node\'s declared outputs for a published workflow run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
inputStructure: 'detailed',
method: 'GET',
operationId: 'getAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputs',
path: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs',
tags: ['console'],
})
.input(z.object({ params: zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsPath }))
.output(zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponse)
export const nodeOutputs2 = {
get: get71,
events: events2,
byNodeId: byNodeId9,
}
export const byRunId3 = {
nodeOutputs: nodeOutputs2,
}
export const runs2 = {
byRunId: byRunId3,
}
export const published = {
runs: runs2,
}
/**
* Get webhook trigger for a node
*
@ -4160,7 +4423,7 @@ export const publish = {
*
* @deprecated
*/
export const get64 = oc
export const get72 = oc
.route({
deprecated: true,
description:
@ -4181,7 +4444,7 @@ export const get64 = oc
.output(zGetAppsByAppIdWorkflowsTriggersWebhookResponse)
export const webhook = {
get: get64,
get: get72,
}
export const triggers2 = {
@ -4279,7 +4542,7 @@ export const byWorkflowId = {
*
* @deprecated
*/
export const get65 = oc
export const get73 = oc
.route({
deprecated: true,
description:
@ -4300,10 +4563,11 @@ export const get65 = oc
.output(zGetAppsByAppIdWorkflowsResponse)
export const workflows3 = {
get: get65,
get: get73,
defaultWorkflowBlockConfigs,
draft: draft2,
publish,
published,
triggers: triggers2,
byWorkflowId,
}
@ -4336,7 +4600,7 @@ export const delete12 = oc
*
* @deprecated
*/
export const get66 = oc
export const get74 = oc
.route({
deprecated: true,
description:
@ -4377,7 +4641,7 @@ export const put7 = oc
export const byAppId2 = {
delete: delete12,
get: get66,
get: get74,
put: put7,
advancedChat,
agentComposer,
@ -4446,7 +4710,7 @@ export const byApiKeyId = {
*
* Get all API keys for an app
*/
export const get67 = oc
export const get75 = oc
.route({
description: 'Get all API keys for an app',
inputStructure: 'detailed',
@ -4479,7 +4743,7 @@ export const post58 = oc
.output(zPostAppsByResourceIdApiKeysResponse)
export const apiKeys = {
get: get67,
get: get75,
post: post58,
byApiKeyId,
}
@ -4495,7 +4759,7 @@ export const byResourceId = {
*
* @deprecated
*/
export const get68 = oc
export const get76 = oc
.route({
deprecated: true,
description:
@ -4510,7 +4774,7 @@ export const get68 = oc
.output(zGetAppsByServerIdServerRefreshResponse)
export const refresh = {
get: get68,
get: get76,
}
export const server2 = {
@ -4526,7 +4790,7 @@ export const byServerId = {
*
* Get list of applications with pagination and filtering
*/
export const get69 = oc
export const get77 = oc
.route({
description: 'Get list of applications with pagination and filtering',
inputStructure: 'detailed',
@ -4565,7 +4829,7 @@ export const post59 = oc
.output(zPostAppsResponse)
export const apps = {
get: get69,
get: get77,
post: post59,
imports,
workflows,

View File

@ -4750,6 +4750,122 @@ export type PostAppsByAppIdWorkflowsDraftRunResponses = {
export type PostAppsByAppIdWorkflowsDraftRunResponse
= PostAppsByAppIdWorkflowsDraftRunResponses[keyof PostAppsByAppIdWorkflowsDraftRunResponses]
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsData = {
body?: never
path: {
app_id: string
run_id: string
}
query?: never
url: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs'
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsErrors = {
404: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsError
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsErrors[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsErrors]
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponses = {
200: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponse
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponses[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponses]
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsData = {
body?: never
path: {
app_id: string
run_id: string
}
query?: never
url: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/events'
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsErrors = {
404: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsError
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsErrors[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsErrors]
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponses = {
200: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponse
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponses[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponses]
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdData = {
body?: never
path: {
app_id: string
node_id: string
run_id: string
}
query?: never
url: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/{node_id}'
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdErrors = {
404: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdError
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdErrors[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdErrors]
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponses = {
200: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponse
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponses[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponses]
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewData = {
body?: never
path: {
app_id: string
node_id: string
output_name: string
run_id: string
}
query?: never
url: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/{node_id}/{output_name}/preview'
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewErrors = {
404: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewError
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewErrors[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewErrors]
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponses
= {
200: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponses[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponses]
export type GetAppsByAppIdWorkflowsDraftSystemVariablesData = {
body?: never
path: {
@ -5006,6 +5122,124 @@ export type PostAppsByAppIdWorkflowsPublishResponses = {
export type PostAppsByAppIdWorkflowsPublishResponse
= PostAppsByAppIdWorkflowsPublishResponses[keyof PostAppsByAppIdWorkflowsPublishResponses]
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsData = {
body?: never
path: {
app_id: string
run_id: string
}
query?: never
url: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs'
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsErrors = {
404: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsError
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsErrors[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsErrors]
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponses = {
200: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponse
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponses[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponses]
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsData = {
body?: never
path: {
app_id: string
run_id: string
}
query?: never
url: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/events'
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsErrors = {
404: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsError
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsErrors[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsErrors]
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponses = {
200: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponse
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponses[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponses]
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdData = {
body?: never
path: {
app_id: string
node_id: string
run_id: string
}
query?: never
url: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/{node_id}'
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdErrors = {
404: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdError
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdErrors[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdErrors]
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponses = {
200: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponse
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponses[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponses]
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewData
= {
body?: never
path: {
app_id: string
node_id: string
output_name: string
run_id: string
}
query?: never
url: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/{node_id}/{output_name}/preview'
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewErrors
= {
404: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewError
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewErrors[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewErrors]
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponses
= {
200: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponses[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponses]
export type GetAppsByAppIdWorkflowsTriggersWebhookData = {
body?: never
path: {

View File

@ -3834,6 +3834,60 @@ export const zPostAppsByAppIdWorkflowsDraftRunPath = z.object({
*/
export const zPostAppsByAppIdWorkflowsDraftRunResponse = z.record(z.string(), z.unknown())
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsPath = z.object({
app_id: z.string(),
run_id: z.string(),
})
/**
* Success
*/
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponse = z.record(
z.string(),
z.unknown(),
)
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsPath = z.object({
app_id: z.string(),
run_id: z.string(),
})
/**
* Success
*/
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponse = z.record(
z.string(),
z.unknown(),
)
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdPath = z.object({
app_id: z.string(),
node_id: z.string(),
run_id: z.string(),
})
/**
* Success
*/
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponse = z.record(
z.string(),
z.unknown(),
)
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewPath
= z.object({
app_id: z.string(),
node_id: z.string(),
output_name: z.string(),
run_id: z.string(),
})
/**
* Success
*/
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse
= z.record(z.string(), z.unknown())
export const zGetAppsByAppIdWorkflowsDraftSystemVariablesPath = z.object({
app_id: z.string(),
})
@ -3954,6 +4008,60 @@ export const zPostAppsByAppIdWorkflowsPublishPath = z.object({
*/
export const zPostAppsByAppIdWorkflowsPublishResponse = z.record(z.string(), z.unknown())
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsPath = z.object({
app_id: z.string(),
run_id: z.string(),
})
/**
* Success
*/
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponse = z.record(
z.string(),
z.unknown(),
)
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsPath = z.object({
app_id: z.string(),
run_id: z.string(),
})
/**
* Success
*/
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponse = z.record(
z.string(),
z.unknown(),
)
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdPath = z.object({
app_id: z.string(),
node_id: z.string(),
run_id: z.string(),
})
/**
* Success
*/
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponse = z.record(
z.string(),
z.unknown(),
)
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewPath
= z.object({
app_id: z.string(),
node_id: z.string(),
output_name: z.string(),
run_id: z.string(),
})
/**
* Success
*/
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse
= z.record(z.string(), z.unknown())
export const zGetAppsByAppIdWorkflowsTriggersWebhookPath = z.object({
app_id: z.string(),
})