fix: webhook node output file as file variable (#29621)

This commit is contained in:
wangxiaolei 2025-12-15 19:55:59 +08:00 committed by GitHub
parent a8f3061b3c
commit 09982a1c95
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 585 additions and 44 deletions

View File

@ -1,14 +1,22 @@
import logging
from collections.abc import Mapping
from typing import Any
from core.file import FileTransferMethod
from core.variables.types import SegmentType
from core.variables.variables import FileVariable
from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from core.workflow.enums import NodeExecutionType, NodeType
from core.workflow.node_events import NodeRunResult
from core.workflow.nodes.base.node import Node
from factories import file_factory
from factories.variable_factory import build_segment_with_type
from .entities import ContentType, WebhookData
logger = logging.getLogger(__name__)
class TriggerWebhookNode(Node[WebhookData]):
node_type = NodeType.TRIGGER_WEBHOOK
@ -60,6 +68,34 @@ class TriggerWebhookNode(Node[WebhookData]):
outputs=outputs,
)
def generate_file_var(self, param_name: str, file: dict):
related_id = file.get("related_id")
transfer_method_value = file.get("transfer_method")
if transfer_method_value:
transfer_method = FileTransferMethod.value_of(transfer_method_value)
match transfer_method:
case FileTransferMethod.LOCAL_FILE | FileTransferMethod.REMOTE_URL:
file["upload_file_id"] = related_id
case FileTransferMethod.TOOL_FILE:
file["tool_file_id"] = related_id
case FileTransferMethod.DATASOURCE_FILE:
file["datasource_file_id"] = related_id
try:
file_obj = file_factory.build_from_mapping(
mapping=file,
tenant_id=self.tenant_id,
)
file_segment = build_segment_with_type(SegmentType.FILE, file_obj)
return FileVariable(name=param_name, value=file_segment.value, selector=[self.id, param_name])
except ValueError:
logger.error(
"Failed to build FileVariable for webhook file parameter %s",
param_name,
exc_info=True,
)
return None
def _extract_configured_outputs(self, webhook_inputs: dict[str, Any]) -> dict[str, Any]:
"""Extract outputs based on node configuration from webhook inputs."""
outputs = {}
@ -107,18 +143,33 @@ class TriggerWebhookNode(Node[WebhookData]):
outputs[param_name] = str(webhook_data.get("body", {}).get("raw", ""))
continue
elif self.node_data.content_type == ContentType.BINARY:
outputs[param_name] = webhook_data.get("body", {}).get("raw", b"")
raw_data: dict = webhook_data.get("body", {}).get("raw", {})
file_var = self.generate_file_var(param_name, raw_data)
if file_var:
outputs[param_name] = file_var
else:
outputs[param_name] = raw_data
continue
if param_type == "file":
# Get File object (already processed by webhook controller)
file_obj = webhook_data.get("files", {}).get(param_name)
outputs[param_name] = file_obj
files = webhook_data.get("files", {})
if files and isinstance(files, dict):
file = files.get(param_name)
if file and isinstance(file, dict):
file_var = self.generate_file_var(param_name, file)
if file_var:
outputs[param_name] = file_var
else:
outputs[param_name] = files
else:
outputs[param_name] = files
else:
outputs[param_name] = files
else:
# Get regular body parameter
outputs[param_name] = webhook_data.get("body", {}).get(param_name)
# Include raw webhook data for debugging/advanced use
outputs["_webhook_raw"] = webhook_data
return outputs

View File

@ -1,3 +1,4 @@
import logging
import mimetypes
import os
import re
@ -17,6 +18,8 @@ from core.helper import ssrf_proxy
from extensions.ext_database import db
from models import MessageFile, ToolFile, UploadFile
logger = logging.getLogger(__name__)
def build_from_message_files(
*,
@ -356,15 +359,20 @@ def _build_from_tool_file(
transfer_method: FileTransferMethod,
strict_type_validation: bool = False,
) -> File:
# Backward/interop compatibility: allow tool_file_id to come from related_id or URL
tool_file_id = mapping.get("tool_file_id")
if not tool_file_id:
raise ValueError(f"ToolFile {tool_file_id} not found")
tool_file = db.session.scalar(
select(ToolFile).where(
ToolFile.id == mapping.get("tool_file_id"),
ToolFile.id == tool_file_id,
ToolFile.tenant_id == tenant_id,
)
)
if tool_file is None:
raise ValueError(f"ToolFile {mapping.get('tool_file_id')} not found")
raise ValueError(f"ToolFile {tool_file_id} not found")
extension = "." + tool_file.file_key.split(".")[-1] if "." in tool_file.file_key else ".bin"
@ -402,10 +410,13 @@ def _build_from_datasource_file(
transfer_method: FileTransferMethod,
strict_type_validation: bool = False,
) -> File:
datasource_file_id = mapping.get("datasource_file_id")
if not datasource_file_id:
raise ValueError(f"DatasourceFile {datasource_file_id} not found")
datasource_file = (
db.session.query(UploadFile)
.where(
UploadFile.id == mapping.get("datasource_file_id"),
UploadFile.id == datasource_file_id,
UploadFile.tenant_id == tenant_id,
)
.first()

View File

@ -233,7 +233,7 @@ class TestWebhookService:
"/webhook",
method="POST",
headers={"Content-Type": "multipart/form-data"},
data={"message": "test", "upload": file_storage},
data={"message": "test", "file": file_storage},
):
webhook_trigger = MagicMock()
webhook_trigger.tenant_id = "test_tenant"
@ -242,7 +242,7 @@ class TestWebhookService:
assert webhook_data["method"] == "POST"
assert webhook_data["body"]["message"] == "test"
assert "upload" in webhook_data["files"]
assert "file" in webhook_data["files"]
# Verify file processing was called
mock_external_dependencies["tool_file_manager"].assert_called_once()
@ -414,7 +414,7 @@ class TestWebhookService:
"data": {
"method": "post",
"content_type": "multipart/form-data",
"body": [{"name": "upload", "type": "file", "required": True}],
"body": [{"name": "file", "type": "file", "required": True}],
}
}

View File

@ -9,6 +9,7 @@ import io
from unittest.mock import MagicMock, patch
import pytest
from pandas.errors import ParserError
from werkzeug.datastructures import FileStorage
from configs import dify_config
@ -250,21 +251,22 @@ class TestAnnotationImportServiceValidation:
"""Test that invalid CSV format is handled gracefully."""
from services.annotation_service import AppAnnotationService
# Create CSV with only one column (should require at least 2 columns for question and answer)
csv_content = "single_column_header\nonly_one_value"
# Any content is fine once we force ParserError
csv_content = 'invalid,csv,format\nwith,unbalanced,quotes,and"stuff'
file = FileStorage(stream=io.BytesIO(csv_content.encode()), filename="test.csv", content_type="text/csv")
mock_db_session.query.return_value.where.return_value.first.return_value = mock_app
with patch("services.annotation_service.current_account_with_tenant") as mock_auth:
with (
patch("services.annotation_service.current_account_with_tenant") as mock_auth,
patch("services.annotation_service.pd.read_csv", side_effect=ParserError("malformed CSV")),
):
mock_auth.return_value = (MagicMock(id="user_id"), "tenant_id")
result = AppAnnotationService.batch_import_app_annotations("app_id", file)
# Should return error message about invalid format (less than 2 columns)
assert "error_msg" in result
assert "at least 2 columns" in result["error_msg"].lower()
assert "malformed" in result["error_msg"].lower()
def test_valid_import_succeeds(self, mock_app, mock_db_session):
"""Test that valid import request succeeds."""

View File

@ -0,0 +1,452 @@
"""
Unit tests for webhook file conversion fix.
This test verifies that webhook trigger nodes properly convert file dictionaries
to FileVariable objects, fixing the "Invalid variable type: ObjectVariable" error
when passing files to downstream LLM nodes.
"""
from unittest.mock import Mock, patch
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities.graph_init_params import GraphInitParams
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from core.workflow.nodes.trigger_webhook.entities import (
ContentType,
Method,
WebhookBodyParameter,
WebhookData,
)
from core.workflow.nodes.trigger_webhook.node import TriggerWebhookNode
from core.workflow.runtime.graph_runtime_state import GraphRuntimeState
from core.workflow.runtime.variable_pool import VariablePool
from core.workflow.system_variable import SystemVariable
from models.enums import UserFrom
from models.workflow import WorkflowType
def create_webhook_node(
webhook_data: WebhookData,
variable_pool: VariablePool,
tenant_id: str = "test-tenant",
) -> TriggerWebhookNode:
"""Helper function to create a webhook node with proper initialization."""
node_config = {
"id": "webhook-node-1",
"data": webhook_data.model_dump(),
}
graph_init_params = GraphInitParams(
tenant_id=tenant_id,
app_id="test-app",
workflow_type=WorkflowType.WORKFLOW,
workflow_id="test-workflow",
graph_config={},
user_id="test-user",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.SERVICE_API,
call_depth=0,
)
runtime_state = GraphRuntimeState(
variable_pool=variable_pool,
start_at=0,
)
node = TriggerWebhookNode(
id="webhook-node-1",
config=node_config,
graph_init_params=graph_init_params,
graph_runtime_state=runtime_state,
)
# Attach a lightweight app_config onto runtime state for tenant lookups
runtime_state.app_config = Mock()
runtime_state.app_config.tenant_id = tenant_id
# Provide compatibility alias expected by node implementation
# Some nodes reference `self.node_id`; expose it as an alias to `self.id` for tests
node.node_id = node.id
return node
def create_test_file_dict(
filename: str = "test.jpg",
file_type: str = "image",
transfer_method: str = "local_file",
) -> dict:
"""Create a test file dictionary as it would come from webhook service."""
return {
"id": "file-123",
"tenant_id": "test-tenant",
"type": file_type,
"filename": filename,
"extension": ".jpg",
"mime_type": "image/jpeg",
"transfer_method": transfer_method,
"related_id": "related-123",
"storage_key": "storage-key-123",
"size": 1024,
"url": "https://example.com/test.jpg",
"created_at": 1234567890,
"used_at": None,
"hash": "file-hash-123",
}
def test_webhook_node_file_conversion_to_file_variable():
"""Test that webhook node converts file dictionaries to FileVariable objects."""
# Create test file dictionary (as it comes from webhook service)
file_dict = create_test_file_dict("uploaded_image.jpg")
data = WebhookData(
title="Test Webhook with File",
method=Method.POST,
content_type=ContentType.FORM_DATA,
body=[
WebhookBodyParameter(name="image_upload", type="file", required=True),
WebhookBodyParameter(name="message", type="string", required=False),
],
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
user_inputs={
"webhook_data": {
"headers": {},
"query_params": {},
"body": {"message": "Test message"},
"files": {
"image_upload": file_dict,
},
}
},
)
node = create_webhook_node(data, variable_pool)
# Mock the file factory and variable factory
with (
patch("factories.file_factory.build_from_mapping") as mock_file_factory,
patch("core.workflow.nodes.trigger_webhook.node.build_segment_with_type") as mock_segment_factory,
patch("core.workflow.nodes.trigger_webhook.node.FileVariable") as mock_file_variable,
):
# Setup mocks
mock_file_obj = Mock()
mock_file_obj.to_dict.return_value = file_dict
mock_file_factory.return_value = mock_file_obj
mock_segment = Mock()
mock_segment.value = mock_file_obj
mock_segment_factory.return_value = mock_segment
mock_file_var_instance = Mock()
mock_file_variable.return_value = mock_file_var_instance
# Run the node
result = node._run()
# Verify successful execution
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
# Verify file factory was called with correct parameters
mock_file_factory.assert_called_once_with(
mapping=file_dict,
tenant_id="test-tenant",
)
# Verify segment factory was called to create FileSegment
mock_segment_factory.assert_called_once()
# Verify FileVariable was created with correct parameters
mock_file_variable.assert_called_once()
call_args = mock_file_variable.call_args[1]
assert call_args["name"] == "image_upload"
# value should be whatever build_segment_with_type.value returned
assert call_args["value"] == mock_segment.value
assert call_args["selector"] == ["webhook-node-1", "image_upload"]
# Verify output contains the FileVariable, not the original dict
assert result.outputs["image_upload"] == mock_file_var_instance
assert result.outputs["message"] == "Test message"
def test_webhook_node_file_conversion_with_missing_files():
"""Test webhook node file conversion with missing file parameter."""
data = WebhookData(
title="Test Webhook with Missing File",
method=Method.POST,
content_type=ContentType.FORM_DATA,
body=[
WebhookBodyParameter(name="missing_file", type="file", required=False),
],
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
user_inputs={
"webhook_data": {
"headers": {},
"query_params": {},
"body": {},
"files": {}, # No files
}
},
)
node = create_webhook_node(data, variable_pool)
# Run the node without patches (should handle None case gracefully)
result = node._run()
# Verify successful execution
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
# Verify missing file parameter is None
assert result.outputs["_webhook_raw"]["files"] == {}
def test_webhook_node_file_conversion_with_none_file():
"""Test webhook node file conversion with None file value."""
data = WebhookData(
title="Test Webhook with None File",
method=Method.POST,
content_type=ContentType.FORM_DATA,
body=[
WebhookBodyParameter(name="none_file", type="file", required=False),
],
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
user_inputs={
"webhook_data": {
"headers": {},
"query_params": {},
"body": {},
"files": {
"file": None,
},
}
},
)
node = create_webhook_node(data, variable_pool)
# Run the node without patches (should handle None case gracefully)
result = node._run()
# Verify successful execution
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
# Verify None file parameter is None
assert result.outputs["_webhook_raw"]["files"]["file"] is None
def test_webhook_node_file_conversion_with_non_dict_file():
"""Test webhook node file conversion with non-dict file value."""
data = WebhookData(
title="Test Webhook with Non-Dict File",
method=Method.POST,
content_type=ContentType.FORM_DATA,
body=[
WebhookBodyParameter(name="wrong_type", type="file", required=True),
],
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
user_inputs={
"webhook_data": {
"headers": {},
"query_params": {},
"body": {},
"files": {
"file": "not_a_dict", # Wrapped to match node expectation
},
}
},
)
node = create_webhook_node(data, variable_pool)
# Run the node without patches (should handle non-dict case gracefully)
result = node._run()
# Verify successful execution
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
# Verify fallback to original (wrapped) mapping
assert result.outputs["_webhook_raw"]["files"]["file"] == "not_a_dict"
def test_webhook_node_file_conversion_mixed_parameters():
"""Test webhook node with mixed parameter types including files."""
file_dict = create_test_file_dict("mixed_test.jpg")
data = WebhookData(
title="Test Webhook Mixed Parameters",
method=Method.POST,
content_type=ContentType.FORM_DATA,
headers=[],
params=[],
body=[
WebhookBodyParameter(name="text_param", type="string", required=True),
WebhookBodyParameter(name="number_param", type="number", required=False),
WebhookBodyParameter(name="file_param", type="file", required=True),
WebhookBodyParameter(name="bool_param", type="boolean", required=False),
],
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
user_inputs={
"webhook_data": {
"headers": {},
"query_params": {},
"body": {
"text_param": "Hello World",
"number_param": 42,
"bool_param": True,
},
"files": {
"file_param": file_dict,
},
}
},
)
node = create_webhook_node(data, variable_pool)
with (
patch("factories.file_factory.build_from_mapping") as mock_file_factory,
patch("core.workflow.nodes.trigger_webhook.node.build_segment_with_type") as mock_segment_factory,
patch("core.workflow.nodes.trigger_webhook.node.FileVariable") as mock_file_variable,
):
# Setup mocks for file
mock_file_obj = Mock()
mock_file_factory.return_value = mock_file_obj
mock_segment = Mock()
mock_segment.value = mock_file_obj
mock_segment_factory.return_value = mock_segment
mock_file_var = Mock()
mock_file_variable.return_value = mock_file_var
# Run the node
result = node._run()
# Verify successful execution
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
# Verify all parameters are present
assert result.outputs["text_param"] == "Hello World"
assert result.outputs["number_param"] == 42
assert result.outputs["bool_param"] is True
assert result.outputs["file_param"] == mock_file_var
# Verify file conversion was called
mock_file_factory.assert_called_once_with(
mapping=file_dict,
tenant_id="test-tenant",
)
def test_webhook_node_different_file_types():
"""Test webhook node file conversion with different file types."""
image_dict = create_test_file_dict("image.jpg", "image")
data = WebhookData(
title="Test Webhook Different File Types",
method=Method.POST,
content_type=ContentType.FORM_DATA,
body=[
WebhookBodyParameter(name="image", type="file", required=True),
WebhookBodyParameter(name="document", type="file", required=True),
WebhookBodyParameter(name="video", type="file", required=True),
],
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
user_inputs={
"webhook_data": {
"headers": {},
"query_params": {},
"body": {},
"files": {
"image": image_dict,
"document": create_test_file_dict("document.pdf", "document"),
"video": create_test_file_dict("video.mp4", "video"),
},
}
},
)
node = create_webhook_node(data, variable_pool)
with (
patch("factories.file_factory.build_from_mapping") as mock_file_factory,
patch("core.workflow.nodes.trigger_webhook.node.build_segment_with_type") as mock_segment_factory,
patch("core.workflow.nodes.trigger_webhook.node.FileVariable") as mock_file_variable,
):
# Setup mocks for all files
mock_file_objs = [Mock() for _ in range(3)]
mock_segments = [Mock() for _ in range(3)]
mock_file_vars = [Mock() for _ in range(3)]
# Map each segment.value to its corresponding mock file obj
for seg, f in zip(mock_segments, mock_file_objs):
seg.value = f
mock_file_factory.side_effect = mock_file_objs
mock_segment_factory.side_effect = mock_segments
mock_file_variable.side_effect = mock_file_vars
# Run the node
result = node._run()
# Verify successful execution
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
# Verify all file types were converted
assert mock_file_factory.call_count == 3
assert result.outputs["image"] == mock_file_vars[0]
assert result.outputs["document"] == mock_file_vars[1]
assert result.outputs["video"] == mock_file_vars[2]
def test_webhook_node_file_conversion_with_non_dict_wrapper():
"""Test webhook node file conversion when the file wrapper is not a dict."""
data = WebhookData(
title="Test Webhook with Non-dict File Wrapper",
method=Method.POST,
content_type=ContentType.FORM_DATA,
body=[
WebhookBodyParameter(name="non_dict_wrapper", type="file", required=True),
],
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
user_inputs={
"webhook_data": {
"headers": {},
"query_params": {},
"body": {},
"files": {
"file": "just a string",
},
}
},
)
node = create_webhook_node(data, variable_pool)
result = node._run()
# Verify successful execution (should not crash)
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
# Verify fallback to original value
assert result.outputs["_webhook_raw"]["files"]["file"] == "just a string"

View File

@ -1,8 +1,10 @@
from unittest.mock import patch
import pytest
from core.app.entities.app_invoke_entities import InvokeFrom
from core.file import File, FileTransferMethod, FileType
from core.variables import StringVariable
from core.variables import FileVariable, StringVariable
from core.workflow.entities.graph_init_params import GraphInitParams
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from core.workflow.nodes.trigger_webhook.entities import (
@ -27,9 +29,6 @@ def create_webhook_node(webhook_data: WebhookData, variable_pool: VariablePool)
"data": webhook_data.model_dump(),
}
node = TriggerWebhookNode(
id="1",
config=node_config,
graph_init_params = GraphInitParams(
tenant_id="1",
app_id="1",
@ -40,12 +39,23 @@ def create_webhook_node(webhook_data: WebhookData, variable_pool: VariablePool)
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.SERVICE_API,
call_depth=0,
),
graph_runtime_state=GraphRuntimeState(
)
runtime_state = GraphRuntimeState(
variable_pool=variable_pool,
start_at=0,
),
)
node = TriggerWebhookNode(
id="1",
config=node_config,
graph_init_params=graph_init_params,
graph_runtime_state=runtime_state,
)
# Provide tenant_id for conversion path
runtime_state.app_config = type("_AppCfg", (), {"tenant_id": "1"})()
# Compatibility alias for some nodes referencing `self.node_id`
node.node_id = node.id
return node
@ -246,20 +256,27 @@ def test_webhook_node_run_with_file_params():
"query_params": {},
"body": {},
"files": {
"upload": file1,
"document": file2,
"upload": file1.to_dict(),
"document": file2.to_dict(),
},
}
},
)
node = create_webhook_node(data, variable_pool)
# Mock the file factory to avoid DB-dependent validation on upload_file_id
with patch("factories.file_factory.build_from_mapping") as mock_file_factory:
def _to_file(mapping, tenant_id, config=None, strict_type_validation=False):
return File.model_validate(mapping)
mock_file_factory.side_effect = _to_file
result = node._run()
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs["upload"] == file1
assert result.outputs["document"] == file2
assert result.outputs["missing_file"] is None
assert isinstance(result.outputs["upload"], FileVariable)
assert isinstance(result.outputs["document"], FileVariable)
assert result.outputs["upload"].value.filename == "image.jpg"
def test_webhook_node_run_mixed_parameters():
@ -291,19 +308,27 @@ def test_webhook_node_run_mixed_parameters():
"headers": {"Authorization": "Bearer token"},
"query_params": {"version": "v1"},
"body": {"message": "Test message"},
"files": {"upload": file_obj},
"files": {"upload": file_obj.to_dict()},
}
},
)
node = create_webhook_node(data, variable_pool)
# Mock the file factory to avoid DB-dependent validation on upload_file_id
with patch("factories.file_factory.build_from_mapping") as mock_file_factory:
def _to_file(mapping, tenant_id, config=None, strict_type_validation=False):
return File.model_validate(mapping)
mock_file_factory.side_effect = _to_file
result = node._run()
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs["Authorization"] == "Bearer token"
assert result.outputs["version"] == "v1"
assert result.outputs["message"] == "Test message"
assert result.outputs["upload"] == file_obj
assert isinstance(result.outputs["upload"], FileVariable)
assert result.outputs["upload"].value.filename == "test.jpg"
assert "_webhook_raw" in result.outputs

View File

@ -82,19 +82,19 @@ class TestWebhookServiceUnit:
"/webhook",
method="POST",
headers={"Content-Type": "multipart/form-data"},
data={"message": "test", "upload": file_storage},
data={"message": "test", "file": file_storage},
):
webhook_trigger = MagicMock()
webhook_trigger.tenant_id = "test_tenant"
with patch.object(WebhookService, "_process_file_uploads") as mock_process_files:
mock_process_files.return_value = {"upload": "mocked_file_obj"}
mock_process_files.return_value = {"file": "mocked_file_obj"}
webhook_data = WebhookService.extract_webhook_data(webhook_trigger)
assert webhook_data["method"] == "POST"
assert webhook_data["body"]["message"] == "test"
assert webhook_data["files"]["upload"] == "mocked_file_obj"
assert webhook_data["files"]["file"] == "mocked_file_obj"
mock_process_files.assert_called_once()
def test_extract_webhook_data_raw_text(self):