From fb4584b7760c6a04ffc7a017927d0f6e54eb5344 Mon Sep 17 00:00:00 2001 From: FFXN Date: Sat, 14 Feb 2026 09:48:42 +0800 Subject: [PATCH] feat: Features about running and debugging snippets. --- api/controllers/console/snippets/payloads.py | 27 ++ .../console/snippets/snippet_workflow.py | 236 +++++++++++++++- api/services/snippet_generate_service.py | 264 ++++++++++++++++++ api/services/snippet_service.py | 24 ++ 4 files changed, 550 insertions(+), 1 deletion(-) create mode 100644 api/services/snippet_generate_service.py diff --git a/api/controllers/console/snippets/payloads.py b/api/controllers/console/snippets/payloads.py index 06bd4d00fe..bdee8461dc 100644 --- a/api/controllers/console/snippets/payloads.py +++ b/api/controllers/console/snippets/payloads.py @@ -69,6 +69,33 @@ class WorkflowRunQuery(BaseModel): limit: int = Field(default=20, ge=1, le=100) +class SnippetDraftRunPayload(BaseModel): + """Payload for running snippet draft workflow.""" + + inputs: dict[str, Any] + files: list[dict[str, Any]] | None = None + + +class SnippetDraftNodeRunPayload(BaseModel): + """Payload for running a single node in snippet draft workflow.""" + + inputs: dict[str, Any] + query: str = "" + files: list[dict[str, Any]] | None = None + + +class SnippetIterationNodeRunPayload(BaseModel): + """Payload for running an iteration node in snippet draft workflow.""" + + inputs: dict[str, Any] | None = None + + +class SnippetLoopNodeRunPayload(BaseModel): + """Payload for running a loop node in snippet draft workflow.""" + + inputs: dict[str, Any] | None = None + + class PublishWorkflowPayload(BaseModel): """Payload for publishing snippet workflow.""" diff --git a/api/controllers/console/snippets/snippet_workflow.py b/api/controllers/console/snippets/snippet_workflow.py index eef04bd740..ff46f71b0f 100644 --- a/api/controllers/console/snippets/snippet_workflow.py +++ b/api/controllers/console/snippets/snippet_workflow.py @@ -6,7 +6,7 @@ from typing import ParamSpec, TypeVar from flask import request from flask_restx import Resource, marshal_with from sqlalchemy.orm import Session -from werkzeug.exceptions import NotFound +from werkzeug.exceptions import InternalServerError, NotFound from controllers.common.schema import register_schema_models from controllers.console import console_ns @@ -15,11 +15,16 @@ from controllers.console.app.workflow import workflow_model from controllers.console.app.workflow_run import ( workflow_run_detail_model, workflow_run_node_execution_list_model, + workflow_run_node_execution_model, workflow_run_pagination_model, ) from controllers.console.snippets.payloads import ( PublishWorkflowPayload, + SnippetDraftNodeRunPayload, + SnippetDraftRunPayload, SnippetDraftSyncPayload, + SnippetIterationNodeRunPayload, + SnippetLoopNodeRunPayload, WorkflowRunQuery, ) from controllers.console.wraps import ( @@ -27,12 +32,17 @@ from controllers.console.wraps import ( edit_permission_required, setup_required, ) +from core.app.apps.base_app_queue_manager import AppQueueManager +from core.app.entities.app_invoke_entities import InvokeFrom +from core.workflow.graph_engine.manager import GraphEngineManager from extensions.ext_database import db from factories import variable_factory +from libs import helper from libs.helper import TimestampField from libs.login import current_account_with_tenant, login_required from models.snippet import CustomizedSnippet from services.errors.app import WorkflowHashNotEqualError +from services.snippet_generate_service import SnippetGenerateService from services.snippet_service import SnippetService logger = logging.getLogger(__name__) @@ -44,6 +54,10 @@ R = TypeVar("R") register_schema_models( console_ns, SnippetDraftSyncPayload, + SnippetDraftNodeRunPayload, + SnippetDraftRunPayload, + SnippetIterationNodeRunPayload, + SnippetLoopNodeRunPayload, WorkflowRunQuery, PublishWorkflowPayload, ) @@ -304,3 +318,223 @@ class SnippetWorkflowRunNodeExecutionsApi(Resource): ) return {"data": node_executions} + + +@console_ns.route("/snippets//workflows/draft/nodes//run") +class SnippetDraftNodeRunApi(Resource): + @console_ns.doc("run_snippet_draft_node") + @console_ns.doc(description="Run a single node in snippet draft workflow (single-step debugging)") + @console_ns.doc(params={"snippet_id": "Snippet ID", "node_id": "Node ID"}) + @console_ns.expect(console_ns.models.get(SnippetDraftNodeRunPayload.__name__)) + @console_ns.response(200, "Node run completed successfully", workflow_run_node_execution_model) + @console_ns.response(404, "Snippet or draft workflow not found") + @setup_required + @login_required + @account_initialization_required + @get_snippet + @marshal_with(workflow_run_node_execution_model) + @edit_permission_required + def post(self, snippet: CustomizedSnippet, node_id: str): + """ + Run a single node in snippet draft workflow. + + Executes a specific node with provided inputs for single-step debugging. + Returns the node execution result including status, outputs, and timing. + """ + current_user, _ = current_account_with_tenant() + payload = SnippetDraftNodeRunPayload.model_validate(console_ns.payload or {}) + + user_inputs = payload.inputs + + # Get draft workflow for file parsing + snippet_service = SnippetService() + draft_workflow = snippet_service.get_draft_workflow(snippet=snippet) + if not draft_workflow: + raise NotFound("Draft workflow not found") + + files = SnippetGenerateService.parse_files(draft_workflow, payload.files) + + workflow_node_execution = SnippetGenerateService.run_draft_node( + snippet=snippet, + node_id=node_id, + user_inputs=user_inputs, + account=current_user, + query=payload.query, + files=files, + ) + + return workflow_node_execution + + +@console_ns.route("/snippets//workflows/draft/nodes//last-run") +class SnippetDraftNodeLastRunApi(Resource): + @console_ns.doc("get_snippet_draft_node_last_run") + @console_ns.doc(description="Get last run result for a node in snippet draft workflow") + @console_ns.doc(params={"snippet_id": "Snippet ID", "node_id": "Node ID"}) + @console_ns.response(200, "Node last run retrieved successfully", workflow_run_node_execution_model) + @console_ns.response(404, "Snippet, draft workflow, or node last run not found") + @setup_required + @login_required + @account_initialization_required + @get_snippet + @marshal_with(workflow_run_node_execution_model) + def get(self, snippet: CustomizedSnippet, node_id: str): + """ + Get the last run result for a specific node in snippet draft workflow. + + Returns the most recent execution record for the given node, + including status, inputs, outputs, and timing information. + """ + snippet_service = SnippetService() + draft_workflow = snippet_service.get_draft_workflow(snippet=snippet) + if not draft_workflow: + raise NotFound("Draft workflow not found") + + node_exec = snippet_service.get_snippet_node_last_run( + snippet=snippet, + workflow=draft_workflow, + node_id=node_id, + ) + if node_exec is None: + raise NotFound("Node last run not found") + + return node_exec + + +@console_ns.route("/snippets//workflows/draft/iteration/nodes//run") +class SnippetDraftRunIterationNodeApi(Resource): + @console_ns.doc("run_snippet_draft_iteration_node") + @console_ns.doc(description="Run draft workflow iteration node for snippet") + @console_ns.doc(params={"snippet_id": "Snippet ID", "node_id": "Node ID"}) + @console_ns.expect(console_ns.models.get(SnippetIterationNodeRunPayload.__name__)) + @console_ns.response(200, "Iteration node run started successfully (SSE stream)") + @console_ns.response(404, "Snippet or draft workflow not found") + @setup_required + @login_required + @account_initialization_required + @get_snippet + @edit_permission_required + def post(self, snippet: CustomizedSnippet, node_id: str): + """ + Run a draft workflow iteration node for snippet. + + Iteration nodes execute their internal sub-graph multiple times over an input list. + Returns an SSE event stream with iteration progress and results. + """ + current_user, _ = current_account_with_tenant() + args = SnippetIterationNodeRunPayload.model_validate(console_ns.payload or {}).model_dump(exclude_none=True) + + try: + response = SnippetGenerateService.generate_single_iteration( + snippet=snippet, user=current_user, node_id=node_id, args=args, streaming=True + ) + + return helper.compact_generate_response(response) + except ValueError as e: + raise e + except Exception: + logger.exception("internal server error.") + raise InternalServerError() + + +@console_ns.route("/snippets//workflows/draft/loop/nodes//run") +class SnippetDraftRunLoopNodeApi(Resource): + @console_ns.doc("run_snippet_draft_loop_node") + @console_ns.doc(description="Run draft workflow loop node for snippet") + @console_ns.doc(params={"snippet_id": "Snippet ID", "node_id": "Node ID"}) + @console_ns.expect(console_ns.models.get(SnippetLoopNodeRunPayload.__name__)) + @console_ns.response(200, "Loop node run started successfully (SSE stream)") + @console_ns.response(404, "Snippet or draft workflow not found") + @setup_required + @login_required + @account_initialization_required + @get_snippet + @edit_permission_required + def post(self, snippet: CustomizedSnippet, node_id: str): + """ + Run a draft workflow loop node for snippet. + + Loop nodes execute their internal sub-graph repeatedly until a condition is met. + Returns an SSE event stream with loop progress and results. + """ + current_user, _ = current_account_with_tenant() + args = SnippetLoopNodeRunPayload.model_validate(console_ns.payload or {}) + + try: + response = SnippetGenerateService.generate_single_loop( + snippet=snippet, user=current_user, node_id=node_id, args=args, streaming=True + ) + + return helper.compact_generate_response(response) + except ValueError as e: + raise e + except Exception: + logger.exception("internal server error.") + raise InternalServerError() + + +@console_ns.route("/snippets//workflows/draft/run") +class SnippetDraftWorkflowRunApi(Resource): + @console_ns.doc("run_snippet_draft_workflow") + @console_ns.expect(console_ns.models.get(SnippetDraftRunPayload.__name__)) + @console_ns.response(200, "Draft workflow run started successfully (SSE stream)") + @console_ns.response(404, "Snippet or draft workflow not found") + @setup_required + @login_required + @account_initialization_required + @get_snippet + @edit_permission_required + def post(self, snippet: CustomizedSnippet): + """ + Run draft workflow for snippet. + + Executes the snippet's draft workflow with the provided inputs + and returns an SSE event stream with execution progress and results. + """ + current_user, _ = current_account_with_tenant() + + payload = SnippetDraftRunPayload.model_validate(console_ns.payload or {}) + args = payload.model_dump(exclude_none=True) + + try: + response = SnippetGenerateService.generate( + snippet=snippet, + user=current_user, + args=args, + invoke_from=InvokeFrom.DEBUGGER, + streaming=True, + ) + + return helper.compact_generate_response(response) + except ValueError as e: + raise e + except Exception: + logger.exception("internal server error.") + raise InternalServerError() + + +@console_ns.route("/snippets//workflow-runs/tasks//stop") +class SnippetWorkflowTaskStopApi(Resource): + @console_ns.doc("stop_snippet_workflow_task") + @console_ns.response(200, "Task stopped successfully") + @console_ns.response(404, "Snippet not found") + @setup_required + @login_required + @account_initialization_required + @get_snippet + @edit_permission_required + def post(self, snippet: CustomizedSnippet, task_id: str): + """ + Stop a running snippet workflow task. + + Uses both the legacy stop flag mechanism and the graph engine + command channel for backward compatibility. + """ + # Stop using both mechanisms for backward compatibility + # Legacy stop flag mechanism (without user check) + AppQueueManager.set_stop_flag_no_user_check(task_id) + + # New graph engine command channel mechanism + GraphEngineManager.send_stop_command(task_id) + + return {"result": "success"} diff --git a/api/services/snippet_generate_service.py b/api/services/snippet_generate_service.py new file mode 100644 index 0000000000..b21a2cbd7a --- /dev/null +++ b/api/services/snippet_generate_service.py @@ -0,0 +1,264 @@ +""" +Service for generating snippet workflow executions. + +Uses an adapter pattern to bridge CustomizedSnippet with the App-based +WorkflowAppGenerator. The adapter (_SnippetAsApp) provides the minimal App-like +interface needed by the generator, avoiding modifications to core workflow +infrastructure. + +Key invariants: +- Snippets always run as WORKFLOW mode (not CHAT or ADVANCED_CHAT). +- The adapter maps snippet.id to app_id in workflow execution records. +- Snippet debugging has no rate limiting (max_active_requests = 0). + +Supported execution modes: +- Full workflow run (generate): Runs the entire draft workflow as SSE stream. +- Single node run (run_draft_node): Synchronous single-step debugging for regular nodes. +- Single iteration run (generate_single_iteration): SSE stream for iteration container nodes. +- Single loop run (generate_single_loop): SSE stream for loop container nodes. +""" + +import logging +from collections.abc import Generator, Mapping, Sequence +from typing import Any, Union + +from core.app.app_config.features.file_upload.manager import FileUploadConfigManager +from core.app.apps.workflow.app_generator import WorkflowAppGenerator +from core.app.entities.app_invoke_entities import InvokeFrom +from core.file.models import File +from factories import file_factory +from models import Account +from models.model import AppMode, EndUser +from models.snippet import CustomizedSnippet +from models.workflow import Workflow, WorkflowNodeExecutionModel +from services.snippet_service import SnippetService +from services.workflow_service import WorkflowService + +logger = logging.getLogger(__name__) + + +class _SnippetAsApp: + """ + Minimal adapter that wraps a CustomizedSnippet to satisfy the App-like + interface required by WorkflowAppGenerator, WorkflowAppConfigManager, + and WorkflowService.run_draft_workflow_node. + + Used properties: + - id: maps to snippet.id (stored as app_id in workflows table) + - tenant_id: maps to snippet.tenant_id + - mode: hardcoded to AppMode.WORKFLOW since snippets always run as workflows + - max_active_requests: defaults to 0 (no limit) for snippet debugging + - app_model_config_id: None (snippets don't have app model configs) + """ + + id: str + tenant_id: str + mode: str + max_active_requests: int + app_model_config_id: str | None + + def __init__(self, snippet: CustomizedSnippet) -> None: + self.id = snippet.id + self.tenant_id = snippet.tenant_id + self.mode = AppMode.WORKFLOW.value + self.max_active_requests = 0 + self.app_model_config_id = None + + +class SnippetGenerateService: + """ + Service for running snippet workflow executions. + + Adapts CustomizedSnippet to work with the existing App-based + WorkflowAppGenerator infrastructure, avoiding duplication of the + complex workflow execution pipeline. + """ + + @classmethod + def generate( + cls, + snippet: CustomizedSnippet, + user: Union[Account, EndUser], + args: Mapping[str, Any], + invoke_from: InvokeFrom, + streaming: bool = True, + ) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]: + """ + Run a snippet's draft workflow. + + Retrieves the draft workflow, adapts the snippet to an App-like proxy, + then delegates execution to WorkflowAppGenerator. + + :param snippet: CustomizedSnippet instance + :param user: Account or EndUser initiating the run + :param args: Workflow inputs (must include "inputs" key) + :param invoke_from: Source of invocation (typically DEBUGGER) + :param streaming: Whether to stream the response + :return: Blocking response mapping or SSE streaming generator + :raises ValueError: If the snippet has no draft workflow + """ + snippet_service = SnippetService() + workflow = snippet_service.get_draft_workflow(snippet=snippet) + if not workflow: + raise ValueError("Workflow not initialized") + + # Adapt snippet to App-like interface for WorkflowAppGenerator + app_proxy = _SnippetAsApp(snippet) + + return WorkflowAppGenerator.convert_to_event_stream( + WorkflowAppGenerator().generate( + app_model=app_proxy, # type: ignore[arg-type] + workflow=workflow, + user=user, + args=args, + invoke_from=invoke_from, + streaming=streaming, + call_depth=0, + ) + ) + + @classmethod + def run_draft_node( + cls, + snippet: CustomizedSnippet, + node_id: str, + user_inputs: Mapping[str, Any], + account: Account, + query: str = "", + files: Sequence[File] | None = None, + ) -> WorkflowNodeExecutionModel: + """ + Run a single node in a snippet's draft workflow (single-step debugging). + + Retrieves the draft workflow, adapts the snippet to an App-like proxy, + parses file inputs, then delegates to WorkflowService.run_draft_workflow_node. + + :param snippet: CustomizedSnippet instance + :param node_id: ID of the node to run + :param user_inputs: User input values for the node + :param account: Account initiating the run + :param query: Optional query string + :param files: Optional parsed file objects + :return: WorkflowNodeExecutionModel with execution results + :raises ValueError: If the snippet has no draft workflow + """ + snippet_service = SnippetService() + draft_workflow = snippet_service.get_draft_workflow(snippet=snippet) + if not draft_workflow: + raise ValueError("Workflow not initialized") + + app_proxy = _SnippetAsApp(snippet) + + workflow_service = WorkflowService() + return workflow_service.run_draft_workflow_node( + app_model=app_proxy, # type: ignore[arg-type] + draft_workflow=draft_workflow, + node_id=node_id, + user_inputs=user_inputs, + account=account, + query=query, + files=files, + ) + + @classmethod + def generate_single_iteration( + cls, + snippet: CustomizedSnippet, + user: Union[Account, EndUser], + node_id: str, + args: Mapping[str, Any], + streaming: bool = True, + ) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]: + """ + Run a single iteration node in a snippet's draft workflow. + + Iteration nodes are container nodes that execute their sub-graph multiple + times, producing many events. Therefore, this uses the full WorkflowAppGenerator + pipeline with SSE streaming (unlike regular single-step node run). + + :param snippet: CustomizedSnippet instance + :param user: Account or EndUser initiating the run + :param node_id: ID of the iteration node to run + :param args: Dict containing 'inputs' key with iteration input data + :param streaming: Whether to stream the response (should be True) + :return: SSE streaming generator + :raises ValueError: If the snippet has no draft workflow + """ + snippet_service = SnippetService() + workflow = snippet_service.get_draft_workflow(snippet=snippet) + if not workflow: + raise ValueError("Workflow not initialized") + + app_proxy = _SnippetAsApp(snippet) + + return WorkflowAppGenerator.convert_to_event_stream( + WorkflowAppGenerator().single_iteration_generate( + app_model=app_proxy, # type: ignore[arg-type] + workflow=workflow, + node_id=node_id, + user=user, + args=args, + streaming=streaming, + ) + ) + + @classmethod + def generate_single_loop( + cls, + snippet: CustomizedSnippet, + user: Union[Account, EndUser], + node_id: str, + args: Any, + streaming: bool = True, + ) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]: + """ + Run a single loop node in a snippet's draft workflow. + + Loop nodes are container nodes that execute their sub-graph repeatedly, + producing many events. Therefore, this uses the full WorkflowAppGenerator + pipeline with SSE streaming (unlike regular single-step node run). + + :param snippet: CustomizedSnippet instance + :param user: Account or EndUser initiating the run + :param node_id: ID of the loop node to run + :param args: Pydantic model with 'inputs' attribute containing loop input data + :param streaming: Whether to stream the response (should be True) + :return: SSE streaming generator + :raises ValueError: If the snippet has no draft workflow + """ + snippet_service = SnippetService() + workflow = snippet_service.get_draft_workflow(snippet=snippet) + if not workflow: + raise ValueError("Workflow not initialized") + + app_proxy = _SnippetAsApp(snippet) + + return WorkflowAppGenerator.convert_to_event_stream( + WorkflowAppGenerator().single_loop_generate( + app_model=app_proxy, # type: ignore[arg-type] + workflow=workflow, + node_id=node_id, + user=user, + args=args, # type: ignore[arg-type] + streaming=streaming, + ) + ) + + @staticmethod + def parse_files(workflow: Workflow, files: list[dict] | None = None) -> Sequence[File]: + """ + Parse file mappings into File objects based on workflow configuration. + + :param workflow: Workflow instance for file upload config + :param files: Raw file mapping dicts + :return: Parsed File objects + """ + files = files or [] + file_extra_config = FileUploadConfigManager.convert(workflow.features_dict, is_vision=False) + if file_extra_config is None: + return [] + return file_factory.build_from_mappings( + mappings=files, + tenant_id=workflow.tenant_id, + config=file_extra_config, + ) diff --git a/api/services/snippet_service.py b/api/services/snippet_service.py index 3664ed2617..cbe3f0f4a6 100644 --- a/api/services/snippet_service.py +++ b/api/services/snippet_service.py @@ -524,6 +524,30 @@ class SnippetService: return node_executions + # --- Node Execution Operations --- + + def get_snippet_node_last_run( + self, + *, + snippet: CustomizedSnippet, + workflow: Workflow, + node_id: str, + ) -> WorkflowNodeExecutionModel | None: + """ + Get the most recent execution for a specific node in a snippet workflow. + + :param snippet: CustomizedSnippet instance + :param workflow: Workflow instance + :param node_id: Node identifier + :return: WorkflowNodeExecutionModel or None + """ + return self._node_execution_service_repo.get_node_last_execution( + tenant_id=snippet.tenant_id, + app_id=snippet.id, + workflow_id=workflow.id, + node_id=node_id, + ) + # --- Use Count --- @staticmethod