From bd31c6f90bda30db9a8e575f79c32629f195e817 Mon Sep 17 00:00:00 2001 From: Harry Date: Wed, 15 Oct 2025 14:45:00 +0800 Subject: [PATCH] refactor(trigger): Reinstate DraftWorkflowTriggerNodeApi with improved structure - Restored the `DraftWorkflowTriggerNodeApi` class to handle polling for trigger events in draft workflows. - Enhanced the implementation to utilize `TriggerDebugEvent` and `TriggerDebugEventPoller` for better event management. - Improved error handling and response structure for node execution, ensuring clarity in API responses. - Updated API documentation to reflect the restored functionality and parameters. --- api/controllers/console/app/workflow.py | 148 ++++++++++++------------ 1 file changed, 74 insertions(+), 74 deletions(-) diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index f14db11576..d23ff49aa9 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -1003,80 +1003,6 @@ class DraftWorkflowNodeLastRunApi(Resource): return node_exec -@console_ns.route("/apps//workflows/draft/nodes//trigger/run") -class DraftWorkflowTriggerNodeApi(Resource): - """ - Single node debug - Polling API for trigger events - Path: /apps//workflows/draft/nodes//trigger/run - """ - - @api.doc("poll_draft_workflow_trigger_node") - @api.doc(description="Poll for trigger events and execute single node when event arrives") - @api.doc(params={"app_id": "Application ID", "node_id": "Node ID"}) - @api.response(200, "Trigger event received and node executed successfully") - @api.response(403, "Permission denied") - @api.response(500, "Internal server error") - @setup_required - @login_required - @account_initialization_required - @get_app_model(mode=[AppMode.WORKFLOW]) - def post(self, app_model: App, node_id: str): - """ - Poll for trigger events and execute single node when event arrives - """ - if not isinstance(current_user, Account) or not current_user.has_edit_permission: - raise Forbidden() - - workflow_service = WorkflowService() - draft_workflow = workflow_service.get_draft_workflow(app_model) - if not draft_workflow: - raise ValueError("Workflow not found") - - node_config = draft_workflow.get_node_config_by_id(node_id=node_id) - if not node_config: - raise ValueError("Node data not found for node %s", node_id) - node_type: NodeType = draft_workflow.get_node_type_from_node_config(node_config) - event: TriggerDebugEvent | None = None - # for schedule trigger, when run single node, just execute directly - if node_type == NodeType.TRIGGER_SCHEDULE: - event = TriggerDebugEvent( - workflow_args={}, - node_id=node_id, - ) - # for other trigger types, poll for the event - else: - poller: TriggerDebugEventPoller = create_event_poller( - draft_workflow=draft_workflow, - tenant_id=app_model.tenant_id, - user_id=current_user.id, - app_id=app_model.id, - node_id=node_id, - ) - event = poller.poll() - - if not event: - return jsonable_encoder({"status": "waiting", "retry_in": LISTENING_RETRY_IN}) - try: - node_execution = workflow_service.run_draft_workflow_node( - app_model=app_model, - draft_workflow=draft_workflow, - node_id=node_id, - user_inputs=event.workflow_args, - account=current_user, - query="", - files=[], - ) - return jsonable_encoder(node_execution) - except Exception as e: - logger.exception("Error running draft workflow trigger node") - return jsonable_encoder( - { - "status": "error", - "error": str(e), - } - ), 500 - - @console_ns.route("/apps//workflows/draft/trigger/run") class DraftWorkflowTriggerRunApi(Resource): """ @@ -1160,6 +1086,80 @@ class DraftWorkflowTriggerRunApi(Resource): ), 500 +@console_ns.route("/apps//workflows/draft/nodes//trigger/run") +class DraftWorkflowTriggerNodeApi(Resource): + """ + Single node debug - Polling API for trigger events + Path: /apps//workflows/draft/nodes//trigger/run + """ + + @api.doc("poll_draft_workflow_trigger_node") + @api.doc(description="Poll for trigger events and execute single node when event arrives") + @api.doc(params={"app_id": "Application ID", "node_id": "Node ID"}) + @api.response(200, "Trigger event received and node executed successfully") + @api.response(403, "Permission denied") + @api.response(500, "Internal server error") + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.WORKFLOW]) + def post(self, app_model: App, node_id: str): + """ + Poll for trigger events and execute single node when event arrives + """ + if not isinstance(current_user, Account) or not current_user.has_edit_permission: + raise Forbidden() + + workflow_service = WorkflowService() + draft_workflow = workflow_service.get_draft_workflow(app_model) + if not draft_workflow: + raise ValueError("Workflow not found") + + node_config = draft_workflow.get_node_config_by_id(node_id=node_id) + if not node_config: + raise ValueError("Node data not found for node %s", node_id) + node_type: NodeType = draft_workflow.get_node_type_from_node_config(node_config) + event: TriggerDebugEvent | None = None + # for schedule trigger, when run single node, just execute directly + if node_type == NodeType.TRIGGER_SCHEDULE: + event = TriggerDebugEvent( + workflow_args={}, + node_id=node_id, + ) + # for other trigger types, poll for the event + else: + poller: TriggerDebugEventPoller = create_event_poller( + draft_workflow=draft_workflow, + tenant_id=app_model.tenant_id, + user_id=current_user.id, + app_id=app_model.id, + node_id=node_id, + ) + event = poller.poll() + + if not event: + return jsonable_encoder({"status": "waiting", "retry_in": LISTENING_RETRY_IN}) + try: + node_execution = workflow_service.run_draft_workflow_node( + app_model=app_model, + draft_workflow=draft_workflow, + node_id=node_id, + user_inputs=event.workflow_args, + account=current_user, + query="", + files=[], + ) + return jsonable_encoder(node_execution) + except Exception as e: + logger.exception("Error running draft workflow trigger node") + return jsonable_encoder( + { + "status": "error", + "error": str(e), + } + ), 500 + + @console_ns.route("/apps//workflows/draft/trigger/run-all") class DraftWorkflowTriggerRunAllApi(Resource): """