diff --git a/api/controllers/console/snippets/snippet_workflow.py b/api/controllers/console/snippets/snippet_workflow.py index d2a29661ef..f47f22cc51 100644 --- a/api/controllers/console/snippets/snippet_workflow.py +++ b/api/controllers/console/snippets/snippet_workflow.py @@ -1,617 +1,617 @@ -import logging -from collections.abc import Callable -from functools import wraps - -from flask import request -from flask_restx import Resource, fields, marshal, marshal_with -from sqlalchemy.orm import Session -from werkzeug.exceptions import BadRequest, InternalServerError, NotFound - -from controllers.common.schema import register_schema_models -from controllers.console import console_ns -from controllers.console.app.error import DraftWorkflowNotExist, DraftWorkflowNotSync -from controllers.console.app.workflow import ( - RESTORE_SOURCE_WORKFLOW_MUST_BE_PUBLISHED_MESSAGE, - workflow_model, - workflow_pagination_model, -) -from controllers.console.app.workflow_run import ( - workflow_run_detail_model, - workflow_run_node_execution_list_model, - workflow_run_node_execution_model, - workflow_run_pagination_model, -) -from controllers.console.snippets.payloads import ( - PublishWorkflowPayload, - SnippetDraftNodeRunPayload, - SnippetDraftRunPayload, - SnippetDraftSyncPayload, - SnippetIterationNodeRunPayload, - SnippetLoopNodeRunPayload, - SnippetWorkflowListQuery, - WorkflowRunQuery, -) -from controllers.console.wraps import ( - account_initialization_required, - edit_permission_required, - setup_required, -) -from core.app.apps.base_app_queue_manager import AppQueueManager -from core.app.entities.app_invoke_entities import InvokeFrom -from extensions.ext_database import db -from extensions.ext_redis import redis_client -from graphon.graph_engine.manager import GraphEngineManager -from libs import helper -from libs.helper import TimestampField -from libs.login import current_account_with_tenant, login_required -from models.snippet import CustomizedSnippet -from services.errors.app import IsDraftWorkflowError, WorkflowHashNotEqualError, WorkflowNotFoundError -from services.snippet_generate_service import SnippetGenerateService -from services.snippet_service import SnippetService - -logger = logging.getLogger(__name__) - -# Register Pydantic models with Swagger -register_schema_models( - console_ns, - SnippetDraftSyncPayload, - SnippetDraftNodeRunPayload, - SnippetDraftRunPayload, - SnippetIterationNodeRunPayload, - SnippetLoopNodeRunPayload, - SnippetWorkflowListQuery, - WorkflowRunQuery, - PublishWorkflowPayload, -) - - -snippet_workflow_model = console_ns.clone("SnippetWorkflow", workflow_model, { - "input_fields": fields.Raw(default=[]), -}) - - -class SnippetNotFoundError(Exception): - """Snippet not found error.""" - - pass - - -def get_snippet[**P, R](view_func: Callable[P, R]) -> Callable[P, R]: - """Decorator to fetch and validate snippet access.""" - - @wraps(view_func) - def decorated_view(*args: P.args, **kwargs: P.kwargs) -> R: - if not kwargs.get("snippet_id"): - raise ValueError("missing snippet_id in path parameters") - - _, current_tenant_id = current_account_with_tenant() - - snippet_id = str(kwargs.get("snippet_id")) - del kwargs["snippet_id"] - - snippet = SnippetService.get_snippet_by_id( - snippet_id=snippet_id, - tenant_id=current_tenant_id, - ) - - if not snippet: - raise NotFound("Snippet not found") - - kwargs["snippet"] = snippet - - return view_func(*args, **kwargs) - - return decorated_view - - -@console_ns.route("/snippets//workflows/draft") -class SnippetDraftWorkflowApi(Resource): - @console_ns.doc("get_snippet_draft_workflow") - @console_ns.response(200, "Draft workflow retrieved successfully", snippet_workflow_model) - @console_ns.response(404, "Snippet or draft workflow not found") - @setup_required - @login_required - @account_initialization_required - @get_snippet - @edit_permission_required - @marshal_with(snippet_workflow_model) - def get(self, snippet: CustomizedSnippet): - """Get draft workflow for snippet.""" - snippet_service = SnippetService() - workflow = snippet_service.get_draft_workflow(snippet=snippet) - - if not workflow: - raise DraftWorkflowNotExist() - - db.session.expunge(workflow) - workflow.conversation_variables = [] - workflow.input_fields = snippet.input_fields_list - return workflow - - @console_ns.doc("sync_snippet_draft_workflow") - @console_ns.expect(console_ns.models.get(SnippetDraftSyncPayload.__name__)) - @console_ns.response(200, "Draft workflow synced successfully") - @console_ns.response(400, "Hash mismatch") - @setup_required - @login_required - @account_initialization_required - @get_snippet - @edit_permission_required - def post(self, snippet: CustomizedSnippet): - """Sync draft workflow for snippet.""" - current_user, _ = current_account_with_tenant() - - payload = SnippetDraftSyncPayload.model_validate(console_ns.payload or {}) - - try: - snippet_service = SnippetService() - workflow = snippet_service.sync_draft_workflow( - snippet=snippet, - graph=payload.graph, - unique_hash=payload.hash, - account=current_user, - input_fields=payload.input_fields, - ) - except WorkflowHashNotEqualError: - raise DraftWorkflowNotSync() - except ValueError as e: - return {"message": str(e)}, 400 - - return { - "result": "success", - "hash": workflow.unique_hash, - "updated_at": TimestampField().format(workflow.updated_at or workflow.created_at), - } - - -@console_ns.route("/snippets//workflows/draft/config") -class SnippetDraftConfigApi(Resource): - @console_ns.doc("get_snippet_draft_config") - @console_ns.response(200, "Draft config retrieved successfully") - @setup_required - @login_required - @account_initialization_required - @get_snippet - @edit_permission_required - def get(self, snippet: CustomizedSnippet): - """Get snippet draft workflow configuration limits.""" - return { - "parallel_depth_limit": 3, - } - - -@console_ns.route("/snippets//workflows/publish") -class SnippetPublishedWorkflowApi(Resource): - @console_ns.doc("get_snippet_published_workflow") - @console_ns.response(200, "Published workflow retrieved successfully", snippet_workflow_model) - @console_ns.response(404, "Snippet not found") - @setup_required - @login_required - @account_initialization_required - @get_snippet - @edit_permission_required - @marshal_with(snippet_workflow_model) - def get(self, snippet: CustomizedSnippet): - """Get published workflow for snippet.""" - if not snippet.is_published: - return None - - snippet_service = SnippetService() - workflow = snippet_service.get_published_workflow(snippet=snippet) - - if workflow: - workflow.input_fields = snippet.input_fields_list - - return workflow - - @console_ns.doc("publish_snippet_workflow") - @console_ns.expect(console_ns.models.get(PublishWorkflowPayload.__name__)) - @console_ns.response(200, "Workflow published successfully") - @console_ns.response(400, "No draft workflow found") - @setup_required - @login_required - @account_initialization_required - @get_snippet - @edit_permission_required - def post(self, snippet: CustomizedSnippet): - """Publish snippet workflow.""" - current_user, _ = current_account_with_tenant() - snippet_service = SnippetService() - - with Session(db.engine) as session: - snippet = session.merge(snippet) - try: - workflow = snippet_service.publish_workflow( - session=session, - snippet=snippet, - account=current_user, - ) - workflow_created_at = TimestampField().format(workflow.created_at) - session.commit() - except ValueError as e: - return {"message": str(e)}, 400 - - return { - "result": "success", - "created_at": workflow_created_at, - } - - -@console_ns.route("/snippets//workflows/default-workflow-block-configs") -class SnippetDefaultBlockConfigsApi(Resource): - @console_ns.doc("get_snippet_default_block_configs") - @console_ns.response(200, "Default block configs retrieved successfully") - @setup_required - @login_required - @account_initialization_required - @get_snippet - @edit_permission_required - def get(self, snippet: CustomizedSnippet): - """Get default block configurations for snippet workflow.""" - snippet_service = SnippetService() - return snippet_service.get_default_block_configs() - - -@console_ns.route("/snippets//workflows") -class SnippetPublishedAllWorkflowApi(Resource): - @console_ns.expect(console_ns.models[SnippetWorkflowListQuery.__name__]) - @console_ns.doc("get_all_snippet_published_workflows") - @console_ns.doc(description="Get all published workflows for a snippet") - @console_ns.doc(params={"snippet_id": "Snippet ID"}) - @console_ns.response(200, "Published workflows retrieved successfully", workflow_pagination_model) - @setup_required - @login_required - @account_initialization_required - @get_snippet - @edit_permission_required - def get(self, snippet: CustomizedSnippet): - """Get all published workflow versions for snippet.""" - args = SnippetWorkflowListQuery.model_validate(request.args.to_dict(flat=True)) - - snippet_service = SnippetService() - with Session(db.engine) as session: - workflows, has_more = snippet_service.get_all_published_workflows( - session=session, - snippet=snippet, - page=args.page, - limit=args.limit, - ) - serialized_workflows = marshal(workflows, workflow_model) - - return { - "items": serialized_workflows, - "page": args.page, - "limit": args.limit, - "has_more": has_more, - } - - -@console_ns.route("/snippets//workflows//restore") -class SnippetDraftWorkflowRestoreApi(Resource): - @console_ns.doc("restore_snippet_workflow_to_draft") - @console_ns.doc(description="Restore a published snippet workflow version into the draft workflow") - @console_ns.doc(params={"snippet_id": "Snippet ID", "workflow_id": "Published workflow ID"}) - @console_ns.response(200, "Workflow restored successfully") - @console_ns.response(400, "Source workflow must be published") - @console_ns.response(404, "Workflow not found") - @setup_required - @login_required - @account_initialization_required - @get_snippet - @edit_permission_required - def post(self, snippet: CustomizedSnippet, workflow_id: str): - """Restore a published snippet workflow version into the draft workflow.""" - current_user, _ = current_account_with_tenant() - snippet_service = SnippetService() - - try: - workflow = snippet_service.restore_published_workflow_to_draft( - snippet=snippet, - workflow_id=workflow_id, - account=current_user, - ) - except IsDraftWorkflowError as exc: - raise BadRequest(RESTORE_SOURCE_WORKFLOW_MUST_BE_PUBLISHED_MESSAGE) from exc - except WorkflowNotFoundError as exc: - raise NotFound(str(exc)) from exc - except ValueError as exc: - raise BadRequest(str(exc)) from exc - - return { - "result": "success", - "hash": workflow.unique_hash, - "updated_at": TimestampField().format(workflow.updated_at or workflow.created_at), - } - - -@console_ns.route("/snippets//workflow-runs") -class SnippetWorkflowRunsApi(Resource): - @console_ns.doc("list_snippet_workflow_runs") - @console_ns.response(200, "Workflow runs retrieved successfully", workflow_run_pagination_model) - @setup_required - @login_required - @account_initialization_required - @get_snippet - @marshal_with(workflow_run_pagination_model) - def get(self, snippet: CustomizedSnippet): - """List workflow runs for snippet.""" - query = WorkflowRunQuery.model_validate( - { - "last_id": request.args.get("last_id"), - "limit": request.args.get("limit", type=int, default=20), - } - ) - args = { - "last_id": query.last_id, - "limit": query.limit, - } - - snippet_service = SnippetService() - result = snippet_service.get_snippet_workflow_runs(snippet=snippet, args=args) - - return result - - -@console_ns.route("/snippets//workflow-runs/") -class SnippetWorkflowRunDetailApi(Resource): - @console_ns.doc("get_snippet_workflow_run_detail") - @console_ns.response(200, "Workflow run detail retrieved successfully", workflow_run_detail_model) - @console_ns.response(404, "Workflow run not found") - @setup_required - @login_required - @account_initialization_required - @get_snippet - @marshal_with(workflow_run_detail_model) - def get(self, snippet: CustomizedSnippet, run_id): - """Get workflow run detail for snippet.""" - run_id = str(run_id) - - snippet_service = SnippetService() - workflow_run = snippet_service.get_snippet_workflow_run(snippet=snippet, run_id=run_id) - - if not workflow_run: - raise NotFound("Workflow run not found") - - return workflow_run - - -@console_ns.route("/snippets//workflow-runs//node-executions") -class SnippetWorkflowRunNodeExecutionsApi(Resource): - @console_ns.doc("list_snippet_workflow_run_node_executions") - @console_ns.response(200, "Node executions retrieved successfully", workflow_run_node_execution_list_model) - @setup_required - @login_required - @account_initialization_required - @get_snippet - @marshal_with(workflow_run_node_execution_list_model) - def get(self, snippet: CustomizedSnippet, run_id): - """List node executions for a workflow run.""" - run_id = str(run_id) - - snippet_service = SnippetService() - node_executions = snippet_service.get_snippet_workflow_run_node_executions( - snippet=snippet, - run_id=run_id, - ) - - return {"data": node_executions} - - -@console_ns.route("/snippets//workflows/draft/nodes//run") -class SnippetDraftNodeRunApi(Resource): - @console_ns.doc("run_snippet_draft_node") - @console_ns.doc(description="Run a single node in snippet draft workflow (single-step debugging)") - @console_ns.doc(params={"snippet_id": "Snippet ID", "node_id": "Node ID"}) - @console_ns.expect(console_ns.models.get(SnippetDraftNodeRunPayload.__name__)) - @console_ns.response(200, "Node run completed successfully", workflow_run_node_execution_model) - @console_ns.response(404, "Snippet or draft workflow not found") - @setup_required - @login_required - @account_initialization_required - @get_snippet - @marshal_with(workflow_run_node_execution_model) - @edit_permission_required - def post(self, snippet: CustomizedSnippet, node_id: str): - """ - Run a single node in snippet draft workflow. - - Executes a specific node with provided inputs for single-step debugging. - Returns the node execution result including status, outputs, and timing. - """ - current_user, _ = current_account_with_tenant() - payload = SnippetDraftNodeRunPayload.model_validate(console_ns.payload or {}) - - user_inputs = payload.inputs - - # Get draft workflow for file parsing - snippet_service = SnippetService() - draft_workflow = snippet_service.get_draft_workflow(snippet=snippet) - if not draft_workflow: - raise NotFound("Draft workflow not found") - - files = SnippetGenerateService.parse_files(draft_workflow, payload.files) - - workflow_node_execution = SnippetGenerateService.run_draft_node( - snippet=snippet, - node_id=node_id, - user_inputs=user_inputs, - account=current_user, - query=payload.query, - files=files, - ) - - return workflow_node_execution - - -@console_ns.route("/snippets//workflows/draft/nodes//last-run") -class SnippetDraftNodeLastRunApi(Resource): - @console_ns.doc("get_snippet_draft_node_last_run") - @console_ns.doc(description="Get last run result for a node in snippet draft workflow") - @console_ns.doc(params={"snippet_id": "Snippet ID", "node_id": "Node ID"}) - @console_ns.response(200, "Node last run retrieved successfully", workflow_run_node_execution_model) - @console_ns.response(404, "Snippet, draft workflow, or node last run not found") - @setup_required - @login_required - @account_initialization_required - @get_snippet - @marshal_with(workflow_run_node_execution_model) - def get(self, snippet: CustomizedSnippet, node_id: str): - """ - Get the last run result for a specific node in snippet draft workflow. - - Returns the most recent execution record for the given node, - including status, inputs, outputs, and timing information. - """ - snippet_service = SnippetService() - draft_workflow = snippet_service.get_draft_workflow(snippet=snippet) - if not draft_workflow: - raise NotFound("Draft workflow not found") - - node_exec = snippet_service.get_snippet_node_last_run( - snippet=snippet, - workflow=draft_workflow, - node_id=node_id, - ) - if node_exec is None: - raise NotFound("Node last run not found") - - return node_exec - - -@console_ns.route("/snippets//workflows/draft/iteration/nodes//run") -class SnippetDraftRunIterationNodeApi(Resource): - @console_ns.doc("run_snippet_draft_iteration_node") - @console_ns.doc(description="Run draft workflow iteration node for snippet") - @console_ns.doc(params={"snippet_id": "Snippet ID", "node_id": "Node ID"}) - @console_ns.expect(console_ns.models.get(SnippetIterationNodeRunPayload.__name__)) - @console_ns.response(200, "Iteration node run started successfully (SSE stream)") - @console_ns.response(404, "Snippet or draft workflow not found") - @setup_required - @login_required - @account_initialization_required - @get_snippet - @edit_permission_required - def post(self, snippet: CustomizedSnippet, node_id: str): - """ - Run a draft workflow iteration node for snippet. - - Iteration nodes execute their internal sub-graph multiple times over an input list. - Returns an SSE event stream with iteration progress and results. - """ - current_user, _ = current_account_with_tenant() - args = SnippetIterationNodeRunPayload.model_validate(console_ns.payload or {}).model_dump(exclude_none=True) - - try: - response = SnippetGenerateService.generate_single_iteration( - snippet=snippet, user=current_user, node_id=node_id, args=args, streaming=True - ) - - return helper.compact_generate_response(response) - except ValueError as e: - raise e - except Exception: - logger.exception("internal server error.") - raise InternalServerError() - - -@console_ns.route("/snippets//workflows/draft/loop/nodes//run") -class SnippetDraftRunLoopNodeApi(Resource): - @console_ns.doc("run_snippet_draft_loop_node") - @console_ns.doc(description="Run draft workflow loop node for snippet") - @console_ns.doc(params={"snippet_id": "Snippet ID", "node_id": "Node ID"}) - @console_ns.expect(console_ns.models.get(SnippetLoopNodeRunPayload.__name__)) - @console_ns.response(200, "Loop node run started successfully (SSE stream)") - @console_ns.response(404, "Snippet or draft workflow not found") - @setup_required - @login_required - @account_initialization_required - @get_snippet - @edit_permission_required - def post(self, snippet: CustomizedSnippet, node_id: str): - """ - Run a draft workflow loop node for snippet. - - Loop nodes execute their internal sub-graph repeatedly until a condition is met. - Returns an SSE event stream with loop progress and results. - """ - current_user, _ = current_account_with_tenant() - args = SnippetLoopNodeRunPayload.model_validate(console_ns.payload or {}) - - try: - response = SnippetGenerateService.generate_single_loop( - snippet=snippet, user=current_user, node_id=node_id, args=args, streaming=True - ) - - return helper.compact_generate_response(response) - except ValueError as e: - raise e - except Exception: - logger.exception("internal server error.") - raise InternalServerError() - - -@console_ns.route("/snippets//workflows/draft/run") -class SnippetDraftWorkflowRunApi(Resource): - @console_ns.doc("run_snippet_draft_workflow") - @console_ns.expect(console_ns.models.get(SnippetDraftRunPayload.__name__)) - @console_ns.response(200, "Draft workflow run started successfully (SSE stream)") - @console_ns.response(404, "Snippet or draft workflow not found") - @setup_required - @login_required - @account_initialization_required - @get_snippet - @edit_permission_required - def post(self, snippet: CustomizedSnippet): - """ - Run draft workflow for snippet. - - Executes the snippet's draft workflow with the provided inputs - and returns an SSE event stream with execution progress and results. - """ - current_user, _ = current_account_with_tenant() - - payload = SnippetDraftRunPayload.model_validate(console_ns.payload or {}) - args = payload.model_dump(exclude_none=True) - - try: - response = SnippetGenerateService.generate( - snippet=snippet, - user=current_user, - args=args, - invoke_from=InvokeFrom.DEBUGGER, - streaming=True, - ) - - return helper.compact_generate_response(response) - except ValueError as e: - raise e - except Exception: - logger.exception("internal server error.") - raise InternalServerError() - - -@console_ns.route("/snippets//workflow-runs/tasks//stop") -class SnippetWorkflowTaskStopApi(Resource): - @console_ns.doc("stop_snippet_workflow_task") - @console_ns.response(200, "Task stopped successfully") - @console_ns.response(404, "Snippet not found") - @setup_required - @login_required - @account_initialization_required - @get_snippet - @edit_permission_required - def post(self, snippet: CustomizedSnippet, task_id: str): - """ - Stop a running snippet workflow task. - - Uses both the legacy stop flag mechanism and the graph engine - command channel for backward compatibility. - """ - # Stop using both mechanisms for backward compatibility - # Legacy stop flag mechanism (without user check) - AppQueueManager.set_stop_flag_no_user_check(task_id) - - # New graph engine command channel mechanism - GraphEngineManager(redis_client).send_stop_command(task_id) - - return {"result": "success"} +# import logging +# from collections.abc import Callable +# from functools import wraps + +# from flask import request +# from flask_restx import Resource, fields, marshal, marshal_with +# from sqlalchemy.orm import Session +# from werkzeug.exceptions import BadRequest, InternalServerError, NotFound + +# from controllers.common.schema import register_schema_models +# from controllers.console import console_ns +# from controllers.console.app.error import DraftWorkflowNotExist, DraftWorkflowNotSync +# from controllers.console.app.workflow import ( +# RESTORE_SOURCE_WORKFLOW_MUST_BE_PUBLISHED_MESSAGE, +# workflow_model, +# workflow_pagination_model, +# ) +# from controllers.console.app.workflow_run import ( +# workflow_run_detail_model, +# workflow_run_node_execution_list_model, +# workflow_run_node_execution_model, +# workflow_run_pagination_model, +# ) +# from controllers.console.snippets.payloads import ( +# PublishWorkflowPayload, +# SnippetDraftNodeRunPayload, +# SnippetDraftRunPayload, +# SnippetDraftSyncPayload, +# SnippetIterationNodeRunPayload, +# SnippetLoopNodeRunPayload, +# SnippetWorkflowListQuery, +# WorkflowRunQuery, +# ) +# from controllers.console.wraps import ( +# account_initialization_required, +# edit_permission_required, +# setup_required, +# ) +# from core.app.apps.base_app_queue_manager import AppQueueManager +# from core.app.entities.app_invoke_entities import InvokeFrom +# from extensions.ext_database import db +# from extensions.ext_redis import redis_client +# from graphon.graph_engine.manager import GraphEngineManager +# from libs import helper +# from libs.helper import TimestampField +# from libs.login import current_account_with_tenant, login_required +# from models.snippet import CustomizedSnippet +# from services.errors.app import IsDraftWorkflowError, WorkflowHashNotEqualError, WorkflowNotFoundError +# from services.snippet_generate_service import SnippetGenerateService +# from services.snippet_service import SnippetService + +# logger = logging.getLogger(__name__) + +# # Register Pydantic models with Swagger +# register_schema_models( +# console_ns, +# SnippetDraftSyncPayload, +# SnippetDraftNodeRunPayload, +# SnippetDraftRunPayload, +# SnippetIterationNodeRunPayload, +# SnippetLoopNodeRunPayload, +# SnippetWorkflowListQuery, +# WorkflowRunQuery, +# PublishWorkflowPayload, +# ) + + +# snippet_workflow_model = console_ns.clone("SnippetWorkflow", workflow_model, { +# "input_fields": fields.Raw(default=[]), +# }) + + +# class SnippetNotFoundError(Exception): +# """Snippet not found error.""" + +# pass + + +# def get_snippet[**P, R](view_func: Callable[P, R]) -> Callable[P, R]: +# """Decorator to fetch and validate snippet access.""" + +# @wraps(view_func) +# def decorated_view(*args: P.args, **kwargs: P.kwargs) -> R: +# if not kwargs.get("snippet_id"): +# raise ValueError("missing snippet_id in path parameters") + +# _, current_tenant_id = current_account_with_tenant() + +# snippet_id = str(kwargs.get("snippet_id")) +# del kwargs["snippet_id"] + +# snippet = SnippetService.get_snippet_by_id( +# snippet_id=snippet_id, +# tenant_id=current_tenant_id, +# ) + +# if not snippet: +# raise NotFound("Snippet not found") + +# kwargs["snippet"] = snippet + +# return view_func(*args, **kwargs) + +# return decorated_view + + +# @console_ns.route("/snippets//workflows/draft") +# class SnippetDraftWorkflowApi(Resource): +# @console_ns.doc("get_snippet_draft_workflow") +# @console_ns.response(200, "Draft workflow retrieved successfully", snippet_workflow_model) +# @console_ns.response(404, "Snippet or draft workflow not found") +# @setup_required +# @login_required +# @account_initialization_required +# @get_snippet +# @edit_permission_required +# @marshal_with(snippet_workflow_model) +# def get(self, snippet: CustomizedSnippet): +# """Get draft workflow for snippet.""" +# snippet_service = SnippetService() +# workflow = snippet_service.get_draft_workflow(snippet=snippet) + +# if not workflow: +# raise DraftWorkflowNotExist() + +# db.session.expunge(workflow) +# workflow.conversation_variables = [] +# workflow.input_fields = snippet.input_fields_list +# return workflow + +# @console_ns.doc("sync_snippet_draft_workflow") +# @console_ns.expect(console_ns.models.get(SnippetDraftSyncPayload.__name__)) +# @console_ns.response(200, "Draft workflow synced successfully") +# @console_ns.response(400, "Hash mismatch") +# @setup_required +# @login_required +# @account_initialization_required +# @get_snippet +# @edit_permission_required +# def post(self, snippet: CustomizedSnippet): +# """Sync draft workflow for snippet.""" +# current_user, _ = current_account_with_tenant() + +# payload = SnippetDraftSyncPayload.model_validate(console_ns.payload or {}) + +# try: +# snippet_service = SnippetService() +# workflow = snippet_service.sync_draft_workflow( +# snippet=snippet, +# graph=payload.graph, +# unique_hash=payload.hash, +# account=current_user, +# input_fields=payload.input_fields, +# ) +# except WorkflowHashNotEqualError: +# raise DraftWorkflowNotSync() +# except ValueError as e: +# return {"message": str(e)}, 400 + +# return { +# "result": "success", +# "hash": workflow.unique_hash, +# "updated_at": TimestampField().format(workflow.updated_at or workflow.created_at), +# } + + +# @console_ns.route("/snippets//workflows/draft/config") +# class SnippetDraftConfigApi(Resource): +# @console_ns.doc("get_snippet_draft_config") +# @console_ns.response(200, "Draft config retrieved successfully") +# @setup_required +# @login_required +# @account_initialization_required +# @get_snippet +# @edit_permission_required +# def get(self, snippet: CustomizedSnippet): +# """Get snippet draft workflow configuration limits.""" +# return { +# "parallel_depth_limit": 3, +# } + + +# @console_ns.route("/snippets//workflows/publish") +# class SnippetPublishedWorkflowApi(Resource): +# @console_ns.doc("get_snippet_published_workflow") +# @console_ns.response(200, "Published workflow retrieved successfully", snippet_workflow_model) +# @console_ns.response(404, "Snippet not found") +# @setup_required +# @login_required +# @account_initialization_required +# @get_snippet +# @edit_permission_required +# @marshal_with(snippet_workflow_model) +# def get(self, snippet: CustomizedSnippet): +# """Get published workflow for snippet.""" +# if not snippet.is_published: +# return None + +# snippet_service = SnippetService() +# workflow = snippet_service.get_published_workflow(snippet=snippet) + +# if workflow: +# workflow.input_fields = snippet.input_fields_list + +# return workflow + +# @console_ns.doc("publish_snippet_workflow") +# @console_ns.expect(console_ns.models.get(PublishWorkflowPayload.__name__)) +# @console_ns.response(200, "Workflow published successfully") +# @console_ns.response(400, "No draft workflow found") +# @setup_required +# @login_required +# @account_initialization_required +# @get_snippet +# @edit_permission_required +# def post(self, snippet: CustomizedSnippet): +# """Publish snippet workflow.""" +# current_user, _ = current_account_with_tenant() +# snippet_service = SnippetService() + +# with Session(db.engine) as session: +# snippet = session.merge(snippet) +# try: +# workflow = snippet_service.publish_workflow( +# session=session, +# snippet=snippet, +# account=current_user, +# ) +# workflow_created_at = TimestampField().format(workflow.created_at) +# session.commit() +# except ValueError as e: +# return {"message": str(e)}, 400 + +# return { +# "result": "success", +# "created_at": workflow_created_at, +# } + + +# @console_ns.route("/snippets//workflows/default-workflow-block-configs") +# class SnippetDefaultBlockConfigsApi(Resource): +# @console_ns.doc("get_snippet_default_block_configs") +# @console_ns.response(200, "Default block configs retrieved successfully") +# @setup_required +# @login_required +# @account_initialization_required +# @get_snippet +# @edit_permission_required +# def get(self, snippet: CustomizedSnippet): +# """Get default block configurations for snippet workflow.""" +# snippet_service = SnippetService() +# return snippet_service.get_default_block_configs() + + +# @console_ns.route("/snippets//workflows") +# class SnippetPublishedAllWorkflowApi(Resource): +# @console_ns.expect(console_ns.models[SnippetWorkflowListQuery.__name__]) +# @console_ns.doc("get_all_snippet_published_workflows") +# @console_ns.doc(description="Get all published workflows for a snippet") +# @console_ns.doc(params={"snippet_id": "Snippet ID"}) +# @console_ns.response(200, "Published workflows retrieved successfully", workflow_pagination_model) +# @setup_required +# @login_required +# @account_initialization_required +# @get_snippet +# @edit_permission_required +# def get(self, snippet: CustomizedSnippet): +# """Get all published workflow versions for snippet.""" +# args = SnippetWorkflowListQuery.model_validate(request.args.to_dict(flat=True)) + +# snippet_service = SnippetService() +# with Session(db.engine) as session: +# workflows, has_more = snippet_service.get_all_published_workflows( +# session=session, +# snippet=snippet, +# page=args.page, +# limit=args.limit, +# ) +# serialized_workflows = marshal(workflows, workflow_model) + +# return { +# "items": serialized_workflows, +# "page": args.page, +# "limit": args.limit, +# "has_more": has_more, +# } + + +# @console_ns.route("/snippets//workflows//restore") +# class SnippetDraftWorkflowRestoreApi(Resource): +# @console_ns.doc("restore_snippet_workflow_to_draft") +# @console_ns.doc(description="Restore a published snippet workflow version into the draft workflow") +# @console_ns.doc(params={"snippet_id": "Snippet ID", "workflow_id": "Published workflow ID"}) +# @console_ns.response(200, "Workflow restored successfully") +# @console_ns.response(400, "Source workflow must be published") +# @console_ns.response(404, "Workflow not found") +# @setup_required +# @login_required +# @account_initialization_required +# @get_snippet +# @edit_permission_required +# def post(self, snippet: CustomizedSnippet, workflow_id: str): +# """Restore a published snippet workflow version into the draft workflow.""" +# current_user, _ = current_account_with_tenant() +# snippet_service = SnippetService() + +# try: +# workflow = snippet_service.restore_published_workflow_to_draft( +# snippet=snippet, +# workflow_id=workflow_id, +# account=current_user, +# ) +# except IsDraftWorkflowError as exc: +# raise BadRequest(RESTORE_SOURCE_WORKFLOW_MUST_BE_PUBLISHED_MESSAGE) from exc +# except WorkflowNotFoundError as exc: +# raise NotFound(str(exc)) from exc +# except ValueError as exc: +# raise BadRequest(str(exc)) from exc + +# return { +# "result": "success", +# "hash": workflow.unique_hash, +# "updated_at": TimestampField().format(workflow.updated_at or workflow.created_at), +# } + + +# @console_ns.route("/snippets//workflow-runs") +# class SnippetWorkflowRunsApi(Resource): +# @console_ns.doc("list_snippet_workflow_runs") +# @console_ns.response(200, "Workflow runs retrieved successfully", workflow_run_pagination_model) +# @setup_required +# @login_required +# @account_initialization_required +# @get_snippet +# @marshal_with(workflow_run_pagination_model) +# def get(self, snippet: CustomizedSnippet): +# """List workflow runs for snippet.""" +# query = WorkflowRunQuery.model_validate( +# { +# "last_id": request.args.get("last_id"), +# "limit": request.args.get("limit", type=int, default=20), +# } +# ) +# args = { +# "last_id": query.last_id, +# "limit": query.limit, +# } + +# snippet_service = SnippetService() +# result = snippet_service.get_snippet_workflow_runs(snippet=snippet, args=args) + +# return result + + +# @console_ns.route("/snippets//workflow-runs/") +# class SnippetWorkflowRunDetailApi(Resource): +# @console_ns.doc("get_snippet_workflow_run_detail") +# @console_ns.response(200, "Workflow run detail retrieved successfully", workflow_run_detail_model) +# @console_ns.response(404, "Workflow run not found") +# @setup_required +# @login_required +# @account_initialization_required +# @get_snippet +# @marshal_with(workflow_run_detail_model) +# def get(self, snippet: CustomizedSnippet, run_id): +# """Get workflow run detail for snippet.""" +# run_id = str(run_id) + +# snippet_service = SnippetService() +# workflow_run = snippet_service.get_snippet_workflow_run(snippet=snippet, run_id=run_id) + +# if not workflow_run: +# raise NotFound("Workflow run not found") + +# return workflow_run + + +# @console_ns.route("/snippets//workflow-runs//node-executions") +# class SnippetWorkflowRunNodeExecutionsApi(Resource): +# @console_ns.doc("list_snippet_workflow_run_node_executions") +# @console_ns.response(200, "Node executions retrieved successfully", workflow_run_node_execution_list_model) +# @setup_required +# @login_required +# @account_initialization_required +# @get_snippet +# @marshal_with(workflow_run_node_execution_list_model) +# def get(self, snippet: CustomizedSnippet, run_id): +# """List node executions for a workflow run.""" +# run_id = str(run_id) + +# snippet_service = SnippetService() +# node_executions = snippet_service.get_snippet_workflow_run_node_executions( +# snippet=snippet, +# run_id=run_id, +# ) + +# return {"data": node_executions} + + +# @console_ns.route("/snippets//workflows/draft/nodes//run") +# class SnippetDraftNodeRunApi(Resource): +# @console_ns.doc("run_snippet_draft_node") +# @console_ns.doc(description="Run a single node in snippet draft workflow (single-step debugging)") +# @console_ns.doc(params={"snippet_id": "Snippet ID", "node_id": "Node ID"}) +# @console_ns.expect(console_ns.models.get(SnippetDraftNodeRunPayload.__name__)) +# @console_ns.response(200, "Node run completed successfully", workflow_run_node_execution_model) +# @console_ns.response(404, "Snippet or draft workflow not found") +# @setup_required +# @login_required +# @account_initialization_required +# @get_snippet +# @marshal_with(workflow_run_node_execution_model) +# @edit_permission_required +# def post(self, snippet: CustomizedSnippet, node_id: str): +# """ +# Run a single node in snippet draft workflow. + +# Executes a specific node with provided inputs for single-step debugging. +# Returns the node execution result including status, outputs, and timing. +# """ +# current_user, _ = current_account_with_tenant() +# payload = SnippetDraftNodeRunPayload.model_validate(console_ns.payload or {}) + +# user_inputs = payload.inputs + +# # Get draft workflow for file parsing +# snippet_service = SnippetService() +# draft_workflow = snippet_service.get_draft_workflow(snippet=snippet) +# if not draft_workflow: +# raise NotFound("Draft workflow not found") + +# files = SnippetGenerateService.parse_files(draft_workflow, payload.files) + +# workflow_node_execution = SnippetGenerateService.run_draft_node( +# snippet=snippet, +# node_id=node_id, +# user_inputs=user_inputs, +# account=current_user, +# query=payload.query, +# files=files, +# ) + +# return workflow_node_execution + + +# @console_ns.route("/snippets//workflows/draft/nodes//last-run") +# class SnippetDraftNodeLastRunApi(Resource): +# @console_ns.doc("get_snippet_draft_node_last_run") +# @console_ns.doc(description="Get last run result for a node in snippet draft workflow") +# @console_ns.doc(params={"snippet_id": "Snippet ID", "node_id": "Node ID"}) +# @console_ns.response(200, "Node last run retrieved successfully", workflow_run_node_execution_model) +# @console_ns.response(404, "Snippet, draft workflow, or node last run not found") +# @setup_required +# @login_required +# @account_initialization_required +# @get_snippet +# @marshal_with(workflow_run_node_execution_model) +# def get(self, snippet: CustomizedSnippet, node_id: str): +# """ +# Get the last run result for a specific node in snippet draft workflow. + +# Returns the most recent execution record for the given node, +# including status, inputs, outputs, and timing information. +# """ +# snippet_service = SnippetService() +# draft_workflow = snippet_service.get_draft_workflow(snippet=snippet) +# if not draft_workflow: +# raise NotFound("Draft workflow not found") + +# node_exec = snippet_service.get_snippet_node_last_run( +# snippet=snippet, +# workflow=draft_workflow, +# node_id=node_id, +# ) +# if node_exec is None: +# raise NotFound("Node last run not found") + +# return node_exec + + +# @console_ns.route("/snippets//workflows/draft/iteration/nodes//run") +# class SnippetDraftRunIterationNodeApi(Resource): +# @console_ns.doc("run_snippet_draft_iteration_node") +# @console_ns.doc(description="Run draft workflow iteration node for snippet") +# @console_ns.doc(params={"snippet_id": "Snippet ID", "node_id": "Node ID"}) +# @console_ns.expect(console_ns.models.get(SnippetIterationNodeRunPayload.__name__)) +# @console_ns.response(200, "Iteration node run started successfully (SSE stream)") +# @console_ns.response(404, "Snippet or draft workflow not found") +# @setup_required +# @login_required +# @account_initialization_required +# @get_snippet +# @edit_permission_required +# def post(self, snippet: CustomizedSnippet, node_id: str): +# """ +# Run a draft workflow iteration node for snippet. + +# Iteration nodes execute their internal sub-graph multiple times over an input list. +# Returns an SSE event stream with iteration progress and results. +# """ +# current_user, _ = current_account_with_tenant() +# args = SnippetIterationNodeRunPayload.model_validate(console_ns.payload or {}).model_dump(exclude_none=True) + +# try: +# response = SnippetGenerateService.generate_single_iteration( +# snippet=snippet, user=current_user, node_id=node_id, args=args, streaming=True +# ) + +# return helper.compact_generate_response(response) +# except ValueError as e: +# raise e +# except Exception: +# logger.exception("internal server error.") +# raise InternalServerError() + + +# @console_ns.route("/snippets//workflows/draft/loop/nodes//run") +# class SnippetDraftRunLoopNodeApi(Resource): +# @console_ns.doc("run_snippet_draft_loop_node") +# @console_ns.doc(description="Run draft workflow loop node for snippet") +# @console_ns.doc(params={"snippet_id": "Snippet ID", "node_id": "Node ID"}) +# @console_ns.expect(console_ns.models.get(SnippetLoopNodeRunPayload.__name__)) +# @console_ns.response(200, "Loop node run started successfully (SSE stream)") +# @console_ns.response(404, "Snippet or draft workflow not found") +# @setup_required +# @login_required +# @account_initialization_required +# @get_snippet +# @edit_permission_required +# def post(self, snippet: CustomizedSnippet, node_id: str): +# """ +# Run a draft workflow loop node for snippet. + +# Loop nodes execute their internal sub-graph repeatedly until a condition is met. +# Returns an SSE event stream with loop progress and results. +# """ +# current_user, _ = current_account_with_tenant() +# args = SnippetLoopNodeRunPayload.model_validate(console_ns.payload or {}) + +# try: +# response = SnippetGenerateService.generate_single_loop( +# snippet=snippet, user=current_user, node_id=node_id, args=args, streaming=True +# ) + +# return helper.compact_generate_response(response) +# except ValueError as e: +# raise e +# except Exception: +# logger.exception("internal server error.") +# raise InternalServerError() + + +# @console_ns.route("/snippets//workflows/draft/run") +# class SnippetDraftWorkflowRunApi(Resource): +# @console_ns.doc("run_snippet_draft_workflow") +# @console_ns.expect(console_ns.models.get(SnippetDraftRunPayload.__name__)) +# @console_ns.response(200, "Draft workflow run started successfully (SSE stream)") +# @console_ns.response(404, "Snippet or draft workflow not found") +# @setup_required +# @login_required +# @account_initialization_required +# @get_snippet +# @edit_permission_required +# def post(self, snippet: CustomizedSnippet): +# """ +# Run draft workflow for snippet. + +# Executes the snippet's draft workflow with the provided inputs +# and returns an SSE event stream with execution progress and results. +# """ +# current_user, _ = current_account_with_tenant() + +# payload = SnippetDraftRunPayload.model_validate(console_ns.payload or {}) +# args = payload.model_dump(exclude_none=True) + +# try: +# response = SnippetGenerateService.generate( +# snippet=snippet, +# user=current_user, +# args=args, +# invoke_from=InvokeFrom.DEBUGGER, +# streaming=True, +# ) + +# return helper.compact_generate_response(response) +# except ValueError as e: +# raise e +# except Exception: +# logger.exception("internal server error.") +# raise InternalServerError() + + +# @console_ns.route("/snippets//workflow-runs/tasks//stop") +# class SnippetWorkflowTaskStopApi(Resource): +# @console_ns.doc("stop_snippet_workflow_task") +# @console_ns.response(200, "Task stopped successfully") +# @console_ns.response(404, "Snippet not found") +# @setup_required +# @login_required +# @account_initialization_required +# @get_snippet +# @edit_permission_required +# def post(self, snippet: CustomizedSnippet, task_id: str): +# """ +# Stop a running snippet workflow task. + +# Uses both the legacy stop flag mechanism and the graph engine +# command channel for backward compatibility. +# """ +# # Stop using both mechanisms for backward compatibility +# # Legacy stop flag mechanism (without user check) +# AppQueueManager.set_stop_flag_no_user_check(task_id) + +# # New graph engine command channel mechanism +# GraphEngineManager(redis_client).send_stop_command(task_id) + +# return {"result": "success"}