diff --git a/api/tasks/async_workflow_tasks.py b/api/tasks/async_workflow_tasks.py index a9907ac981..de099c3e96 100644 --- a/api/tasks/async_workflow_tasks.py +++ b/api/tasks/async_workflow_tasks.py @@ -13,13 +13,13 @@ from sqlalchemy import select from sqlalchemy.orm import Session, sessionmaker from configs import dify_config -from core.app.apps.workflow.app_generator import WorkflowAppGenerator +from core.app.apps.workflow.app_generator import SKIP_PREPARE_USER_INPUTS_KEY, WorkflowAppGenerator from core.app.entities.app_invoke_entities import InvokeFrom from core.app.layers.timeslice_layer import TimeSliceLayer from core.app.layers.trigger_post_layer import TriggerPostLayer from extensions.ext_database import db from models.account import Account -from models.enums import CreatorUserRole, WorkflowTriggerStatus +from models.enums import AppTriggerType, CreatorUserRole, WorkflowTriggerStatus from models.model import App, EndUser, Tenant from models.trigger import WorkflowTriggerLog from models.workflow import Workflow @@ -81,6 +81,19 @@ def execute_workflow_sandbox(task_data_dict: dict[str, Any]): ) +def _build_generator_args(trigger_data: TriggerData) -> dict[str, Any]: + """Build args passed into WorkflowAppGenerator.generate for Celery executions.""" + args: dict[str, Any] = { + "inputs": dict(trigger_data.inputs), + "files": list(trigger_data.files), + } + + if trigger_data.trigger_type == AppTriggerType.TRIGGER_WEBHOOK: + args[SKIP_PREPARE_USER_INPUTS_KEY] = True # Webhooks already provide structured inputs + + return args + + def _execute_workflow_common( task_data: WorkflowTaskData, cfs_plan_scheduler: AsyncWorkflowCFSPlanScheduler, @@ -128,7 +141,7 @@ def _execute_workflow_common( generator = WorkflowAppGenerator() # Prepare args matching AppGenerateService.generate format - args: dict[str, Any] = {"inputs": dict(trigger_data.inputs), "files": list(trigger_data.files)} + args = _build_generator_args(trigger_data) # If workflow_id was specified, add it to args if trigger_data.workflow_id: diff --git a/api/tests/unit_tests/tasks/test_async_workflow_tasks.py b/api/tests/unit_tests/tasks/test_async_workflow_tasks.py new file mode 100644 index 0000000000..3923e256a6 --- /dev/null +++ b/api/tests/unit_tests/tasks/test_async_workflow_tasks.py @@ -0,0 +1,37 @@ +from core.app.apps.workflow.app_generator import SKIP_PREPARE_USER_INPUTS_KEY +from models.enums import AppTriggerType, WorkflowRunTriggeredFrom +from services.workflow.entities import TriggerData, WebhookTriggerData +from tasks import async_workflow_tasks + + +def test_build_generator_args_sets_skip_flag_for_webhook(): + trigger_data = WebhookTriggerData( + app_id="app", + tenant_id="tenant", + workflow_id="workflow", + root_node_id="node", + inputs={"webhook_data": {"body": {"foo": "bar"}}}, + ) + + args = async_workflow_tasks._build_generator_args(trigger_data) + + assert args[SKIP_PREPARE_USER_INPUTS_KEY] is True + assert args["inputs"]["webhook_data"]["body"]["foo"] == "bar" + + +def test_build_generator_args_keeps_validation_for_other_triggers(): + trigger_data = TriggerData( + app_id="app", + tenant_id="tenant", + workflow_id="workflow", + root_node_id="node", + inputs={"foo": "bar"}, + files=[], + trigger_type=AppTriggerType.TRIGGER_SCHEDULE, + trigger_from=WorkflowRunTriggeredFrom.SCHEDULE, + ) + + args = async_workflow_tasks._build_generator_args(trigger_data) + + assert SKIP_PREPARE_USER_INPUTS_KEY not in args + assert args["inputs"] == {"foo": "bar"}