fix: propagate workflow call depth through HTTP recursion

This commit is contained in:
Yanli 盐粒 2026-03-20 04:03:00 +08:00
parent 5b9cb55c45
commit 4ecba5858b
19 changed files with 735 additions and 6 deletions

View File

@ -84,6 +84,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
user_id=self.application_generate_entity.user_id,
user_from=user_from,
invoke_from=invoke_from,
call_depth=self.application_generate_entity.call_depth,
root_node_id=self._root_node_id,
)
elif self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run:
@ -91,6 +92,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
workflow=self._workflow,
single_iteration_run=self.application_generate_entity.single_iteration_run,
single_loop_run=self.application_generate_entity.single_loop_run,
call_depth=self.application_generate_entity.call_depth,
)
else:
inputs = self.application_generate_entity.inputs
@ -120,6 +122,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
user_id=self.application_generate_entity.user_id,
user_from=user_from,
invoke_from=invoke_from,
call_depth=self.application_generate_entity.call_depth,
root_node_id=self._root_node_id,
)

View File

@ -102,6 +102,7 @@ class WorkflowBasedAppRunner:
graph_runtime_state: GraphRuntimeState,
user_from: UserFrom,
invoke_from: InvokeFrom,
call_depth: int = 0,
workflow_id: str = "",
tenant_id: str = "",
user_id: str = "",
@ -130,7 +131,7 @@ class WorkflowBasedAppRunner:
user_from=user_from,
invoke_from=invoke_from,
),
call_depth=0,
call_depth=call_depth,
)
# Use the provided graph_runtime_state for consistent state management
@ -156,6 +157,7 @@ class WorkflowBasedAppRunner:
workflow: Workflow,
single_iteration_run: Any | None = None,
single_loop_run: Any | None = None,
call_depth: int = 0,
) -> tuple[Graph, VariablePool, GraphRuntimeState]:
"""
Prepare graph, variable pool, and runtime state for single node execution
@ -189,6 +191,7 @@ class WorkflowBasedAppRunner:
node_id=single_iteration_run.node_id,
user_inputs=dict(single_iteration_run.inputs),
graph_runtime_state=graph_runtime_state,
call_depth=call_depth,
node_type_filter_key="iteration_id",
node_type_label="iteration",
)
@ -198,6 +201,7 @@ class WorkflowBasedAppRunner:
node_id=single_loop_run.node_id,
user_inputs=dict(single_loop_run.inputs),
graph_runtime_state=graph_runtime_state,
call_depth=call_depth,
node_type_filter_key="loop_id",
node_type_label="loop",
)
@ -214,6 +218,7 @@ class WorkflowBasedAppRunner:
node_id: str,
user_inputs: dict[str, Any],
graph_runtime_state: GraphRuntimeState,
call_depth: int,
node_type_filter_key: str, # 'iteration_id' or 'loop_id'
node_type_label: str = "node", # 'iteration' or 'loop' for error messages
) -> tuple[Graph, VariablePool]:
@ -283,7 +288,7 @@ class WorkflowBasedAppRunner:
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
),
call_depth=0,
call_depth=call_depth,
)
node_factory = DifyNodeFactory(

View File

@ -282,6 +282,7 @@ class DifyNodeFactory(NodeFactory):
max_text_size=dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE,
ssl_verify=dify_config.HTTP_REQUEST_NODE_SSL_VERIFY,
ssrf_default_max_retries=dify_config.SSRF_DEFAULT_MAX_RETRIES,
secret_key=dify_config.SECRET_KEY,
)
self._llm_credentials_provider, self._llm_model_factory = build_dify_model_access(self._dify_context.tenant_id)

View File

@ -0,0 +1,27 @@
"""Helpers for workflow recursion depth propagation.
The HTTP request node emits a reserved depth header pair on outbound requests,
and ``services.trigger.webhook_service`` validates that pair when a webhook is
received. The signature binds the propagated depth to the concrete HTTP method
and request path so a depth value captured for one endpoint cannot be replayed
verbatim against another path.
"""
import hashlib
import hmac
def build_workflow_call_depth_signature(*, secret_key: str, method: str, path: str, depth: str) -> str:
"""Build the stable HMAC payload for workflow call-depth propagation.
Args:
secret_key: Shared signing key used by both sender and receiver.
method: Outbound or inbound HTTP method.
path: Request path that the signature is bound to.
depth: Workflow call depth value serialized as a string.
Returns:
Hex-encoded HMAC-SHA256 digest for the method/path/depth tuple.
"""
payload = f"{method.upper()}:{path}:{depth}"
return hmac.new(secret_key.encode("utf-8"), payload.encode("utf-8"), hashlib.sha256).hexdigest()

View File

@ -2,3 +2,8 @@ SYSTEM_VARIABLE_NODE_ID = "sys"
ENVIRONMENT_VARIABLE_NODE_ID = "env"
CONVERSATION_VARIABLE_NODE_ID = "conversation"
RAG_PIPELINE_VARIABLE_NODE_ID = "rag"
# Reserved for internal workflow-to-workflow HTTP calls. External callers should
# not rely on or set this header.
WORKFLOW_CALL_DEPTH_HEADER = "X-Dify-Workflow-Call-Depth"
WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER = "X-Dify-Workflow-Call-Depth-Signature"

View File

@ -12,6 +12,7 @@ def build_http_request_config(
max_text_size: int = 1 * 1024 * 1024,
ssl_verify: bool = True,
ssrf_default_max_retries: int = 3,
secret_key: str = "",
) -> HttpRequestNodeConfig:
return HttpRequestNodeConfig(
max_connect_timeout=max_connect_timeout,
@ -21,6 +22,7 @@ def build_http_request_config(
max_text_size=max_text_size,
ssl_verify=ssl_verify,
ssrf_default_max_retries=ssrf_default_max_retries,
secret_key=secret_key,
)

View File

@ -76,6 +76,7 @@ class HttpRequestNodeConfig:
max_text_size: int
ssl_verify: bool
ssrf_default_max_retries: int
secret_key: str = ""
def default_timeout(self) -> "HttpRequestNodeTimeout":
return HttpRequestNodeTimeout(

View File

@ -1,3 +1,11 @@
"""HTTP request execution helpers for workflow nodes.
Besides normal request assembly, this executor is responsible for propagating
workflow recursion depth across outbound HTTP calls. The reserved call-depth
headers are always regenerated from the current node context so user-supplied
values cannot override or poison the propagation contract.
"""
import base64
import json
import secrets
@ -10,6 +18,8 @@ from urllib.parse import urlencode, urlparse
import httpx
from json_repair import repair_json
from dify_graph.call_depth import build_workflow_call_depth_signature
from dify_graph.constants import WORKFLOW_CALL_DEPTH_HEADER, WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER
from dify_graph.file.enums import FileTransferMethod
from dify_graph.runtime import VariablePool
from dify_graph.variables.segments import ArrayFileSegment, FileSegment
@ -41,6 +51,8 @@ BODY_TYPE_TO_CONTENT_TYPE = {
class Executor:
"""Prepare, execute, and log a workflow HTTP request node invocation."""
method: Literal[
"get",
"head",
@ -77,6 +89,7 @@ class Executor:
timeout: HttpRequestNodeTimeout,
variable_pool: VariablePool,
http_request_config: HttpRequestNodeConfig,
workflow_call_depth: int = 0,
max_retries: int | None = None,
ssl_verify: bool | None = None,
http_client: HttpClientProtocol,
@ -120,6 +133,7 @@ class Executor:
# init template
self.variable_pool = variable_pool
self.node_data = node_data
self.workflow_call_depth = workflow_call_depth
self._initialize()
def _initialize(self):
@ -272,8 +286,30 @@ class Executor:
self.data = form_data
def _assembling_headers(self) -> dict[str, Any]:
"""Assemble outbound headers for the request.
Reserved workflow call-depth headers are removed case-insensitively
before the canonical pair is re-added from ``workflow_call_depth``.
This keeps propagation deterministic even if a workflow author manually
configured colliding headers on the node.
"""
authorization = deepcopy(self.auth)
headers = deepcopy(self.headers) or {}
reserved_header_names = {
WORKFLOW_CALL_DEPTH_HEADER.lower(),
WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER.lower(),
}
headers = {k: v for k, v in headers.items() if k.lower() not in reserved_header_names}
parsed_url = urlparse(self.url)
next_call_depth = str(self.workflow_call_depth + 1)
headers[WORKFLOW_CALL_DEPTH_HEADER] = next_call_depth
headers[WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER] = build_workflow_call_depth_signature(
secret_key=self._http_request_config.secret_key,
method=self.method,
path=parsed_url.path,
depth=next_call_depth,
)
if self.auth.type == "api-key":
if self.auth.config is None:
raise AuthorizationConfigError("self.authorization config is required")
@ -388,6 +424,12 @@ class Executor:
return self._validate_and_parse_response(response)
def to_log(self):
"""Render the request in raw HTTP form for node logs.
Internal workflow call-depth headers and authentication headers are
masked so operational logs remain useful without exposing replayable or
credential-bearing values.
"""
url_parts = urlparse(self.url)
path = url_parts.path or "/"
@ -410,6 +452,12 @@ class Executor:
if body.type == "form-data":
headers["Content-Type"] = f"multipart/form-data; boundary={boundary}"
for k, v in headers.items():
if k.lower() in {
WORKFLOW_CALL_DEPTH_HEADER.lower(),
WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER.lower(),
}:
raw += f"{k}: [internal]\r\n"
continue
if self.auth.type == "api-key":
authorization_header = "Authorization"
if self.auth.config and self.auth.config.header:

View File

@ -101,6 +101,7 @@ class HttpRequestNode(Node[HttpRequestNodeData]):
timeout=self._get_request_timeout(self.node_data),
variable_pool=self.graph_runtime_state.variable_pool,
http_request_config=self._http_request_config,
workflow_call_depth=self.workflow_call_depth,
ssl_verify=self.node_data.ssl_verify,
http_client=self._http_client,
file_manager=self._file_manager,

View File

@ -402,6 +402,7 @@ class RagPipelineService:
max_text_size=dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE,
ssl_verify=dify_config.HTTP_REQUEST_NODE_SSL_VERIFY,
ssrf_default_max_retries=dify_config.SSRF_DEFAULT_MAX_RETRIES,
secret_key=dify_config.SECRET_KEY,
)
}
default_config = node_class.get_default_config(filters=filters)
@ -435,6 +436,7 @@ class RagPipelineService:
max_text_size=dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE,
ssl_verify=dify_config.HTTP_REQUEST_NODE_SSL_VERIFY,
ssrf_default_max_retries=dify_config.SSRF_DEFAULT_MAX_RETRIES,
secret_key=dify_config.SECRET_KEY,
)
default_config = node_class.get_default_config(filters=final_filters or None)
if not default_config:

View File

@ -1,3 +1,4 @@
import hmac
import json
import logging
import mimetypes
@ -23,6 +24,8 @@ from core.workflow.nodes.trigger_webhook.entities import (
WebhookData,
WebhookParameter,
)
from dify_graph.call_depth import build_workflow_call_depth_signature
from dify_graph.constants import WORKFLOW_CALL_DEPTH_HEADER, WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER
from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.file.models import FileTransferMethod
from dify_graph.variables.types import ArrayValidation, SegmentType
@ -57,10 +60,49 @@ class WebhookService:
@staticmethod
def _sanitize_key(key: str) -> str:
"""Normalize external keys (headers/params) to workflow-safe variables."""
if not isinstance(key, str):
return key
return key.replace("-", "_")
@classmethod
def extract_workflow_call_depth(cls, headers: Mapping[str, Any]) -> int:
"""Extract the reserved workflow recursion depth header.
The depth header is only trusted when accompanied by a valid HMAC
signature for the current request method/path/depth tuple. Header lookup
accepts either canonical or lower-case names because upstream proxies may
normalize casing. Invalid, missing, unsigned, or negative values are
treated as external requests and therefore fall back to depth 0.
"""
raw_value = headers.get(WORKFLOW_CALL_DEPTH_HEADER)
if raw_value is None:
raw_value = headers.get(WORKFLOW_CALL_DEPTH_HEADER.lower())
if raw_value is None:
return 0
raw_signature = headers.get(WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER)
if raw_signature is None:
raw_signature = headers.get(WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER.lower())
if raw_signature is None:
return 0
normalized_value = str(raw_value).strip()
# The receiver recomputes the signature from the current request context
# instead of trusting the sender's path or method directly.
expected_signature = build_workflow_call_depth_signature(
secret_key=dify_config.SECRET_KEY,
method=request.method,
path=request.path,
depth=normalized_value,
)
if not hmac.compare_digest(str(raw_signature).strip(), expected_signature):
return 0
try:
call_depth = int(normalized_value)
except (TypeError, ValueError):
return 0
return max(call_depth, 0)
@classmethod
def get_webhook_trigger_and_workflow(
cls, webhook_id: str, is_debug: bool = False
@ -764,12 +806,14 @@ class WebhookService:
workflow_inputs = cls.build_workflow_inputs(webhook_data)
# Create trigger data
trigger_call_depth = cls.extract_workflow_call_depth(dict(request.headers))
trigger_data = WebhookTriggerData(
app_id=webhook_trigger.app_id,
workflow_id=workflow.id,
root_node_id=webhook_trigger.node_id, # Start from the webhook node
inputs=workflow_inputs,
tenant_id=webhook_trigger.tenant_id,
call_depth=trigger_call_depth,
)
end_user = EndUserService.get_or_create_end_user_by_type(

View File

@ -26,7 +26,14 @@ class TriggerMetadata(BaseModel):
class TriggerData(BaseModel):
"""Base trigger data model for async workflow execution"""
"""Base trigger data model for async workflow execution.
`call_depth` tracks only the current workflow-to-workflow HTTP recursion
depth. It starts at 0 for external triggers and increments once per nested
webhook-triggered workflow call. For webhook triggers, the value is derived
from the reserved depth headers after `WebhookService.extract_workflow_call_depth`
validates the signature against the inbound request context.
"""
app_id: str
tenant_id: str
@ -34,6 +41,7 @@ class TriggerData(BaseModel):
root_node_id: str
inputs: Mapping[str, Any]
files: Sequence[Mapping[str, Any]] = Field(default_factory=list)
call_depth: int = 0
trigger_type: AppTriggerType
trigger_from: WorkflowRunTriggeredFrom
trigger_metadata: TriggerMetadata | None = None

View File

@ -638,6 +638,7 @@ class WorkflowService:
max_text_size=dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE,
ssl_verify=dify_config.HTTP_REQUEST_NODE_SSL_VERIFY,
ssrf_default_max_retries=dify_config.SSRF_DEFAULT_MAX_RETRIES,
secret_key=dify_config.SECRET_KEY,
)
}
default_config = node_class.get_default_config(filters=filters)
@ -673,6 +674,7 @@ class WorkflowService:
max_text_size=dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE,
ssl_verify=dify_config.HTTP_REQUEST_NODE_SSL_VERIFY,
ssrf_default_max_retries=dify_config.SSRF_DEFAULT_MAX_RETRIES,
secret_key=dify_config.SECRET_KEY,
)
default_config = node_class.get_default_config(filters=resolved_filters or None)
if not default_config:

View File

@ -164,7 +164,7 @@ def _execute_workflow_common(
args=args,
invoke_from=InvokeFrom.SERVICE_API,
streaming=False,
call_depth=0,
call_depth=trigger_data.call_depth,
triggered_from=trigger_data.trigger_from,
root_node_id=trigger_data.root_node_id,
graph_engine_layers=[

View File

@ -124,6 +124,7 @@ class TestWorkflowBasedAppRunner:
node_id="node-1",
user_inputs={},
graph_runtime_state=graph_runtime_state,
call_depth=3,
node_type_filter_key="iteration_id",
node_type_label="iteration",
)
@ -131,6 +132,35 @@ class TestWorkflowBasedAppRunner:
assert graph is not None
assert variable_pool is graph_runtime_state.variable_pool
def test_init_graph_passes_call_depth_into_node_factory(self, monkeypatch):
runner = WorkflowBasedAppRunner(queue_manager=SimpleNamespace(), app_id="app")
runtime_state = GraphRuntimeState(
variable_pool=VariablePool(system_variables=SystemVariable.default()),
start_at=0.0,
)
captured: dict[str, int] = {}
class _FakeNodeFactory:
def __init__(self, *, graph_init_params, graph_runtime_state):
captured["call_depth"] = graph_init_params.call_depth
monkeypatch.setattr("core.app.apps.workflow_app_runner.DifyNodeFactory", _FakeNodeFactory)
monkeypatch.setattr(
"core.app.apps.workflow_app_runner.Graph.init",
lambda **kwargs: SimpleNamespace(),
)
graph = runner._init_graph(
graph_config={"nodes": [{"id": "start", "data": {"type": "start", "version": "1"}}], "edges": []},
graph_runtime_state=runtime_state,
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=4,
)
assert graph is not None
assert captured["call_depth"] == 4
def test_handle_graph_run_events_and_pause_notifications(self, monkeypatch):
published: list[object] = []

View File

@ -2,6 +2,8 @@ import pytest
from configs import dify_config
from core.helper.ssrf_proxy import ssrf_proxy
from dify_graph.call_depth import build_workflow_call_depth_signature
from dify_graph.constants import WORKFLOW_CALL_DEPTH_HEADER, WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER
from dify_graph.file.file_manager import file_manager
from dify_graph.nodes.http_request import (
BodyData,
@ -24,7 +26,9 @@ HTTP_REQUEST_CONFIG = HttpRequestNodeConfig(
max_text_size=dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE,
ssl_verify=dify_config.HTTP_REQUEST_NODE_SSL_VERIFY,
ssrf_default_max_retries=dify_config.SSRF_DEFAULT_MAX_RETRIES,
secret_key=dify_config.SECRET_KEY,
)
TEST_SECRET_KEY = "test-secret-key"
def test_executor_with_json_body_and_number_variable():
@ -661,3 +665,275 @@ def test_executor_with_json_body_preserves_numbers_and_strings():
assert executor.json["count"] == 42
assert executor.json["id"] == "abc-123"
def test_executor_propagates_workflow_call_depth_header():
variable_pool = VariablePool(
system_variables=SystemVariable.default(),
user_inputs={},
)
node_data = HttpRequestNodeData(
title="Depth propagation",
method="get",
url="http://localhost:5001/triggers/webhook/test-webhook",
authorization=HttpRequestNodeAuthorization(type="no-auth"),
headers="X-Test: value",
params="",
)
executor = Executor(
node_data=node_data,
timeout=HttpRequestNodeTimeout(connect=10, read=30, write=30),
http_request_config=HttpRequestNodeConfig(
max_connect_timeout=HTTP_REQUEST_CONFIG.max_connect_timeout,
max_read_timeout=HTTP_REQUEST_CONFIG.max_read_timeout,
max_write_timeout=HTTP_REQUEST_CONFIG.max_write_timeout,
max_binary_size=HTTP_REQUEST_CONFIG.max_binary_size,
max_text_size=HTTP_REQUEST_CONFIG.max_text_size,
ssl_verify=HTTP_REQUEST_CONFIG.ssl_verify,
ssrf_default_max_retries=HTTP_REQUEST_CONFIG.ssrf_default_max_retries,
secret_key=TEST_SECRET_KEY,
),
variable_pool=variable_pool,
workflow_call_depth=2,
http_client=ssrf_proxy,
file_manager=file_manager,
)
headers = executor._assembling_headers()
assert headers["X-Test"] == "value"
assert headers[WORKFLOW_CALL_DEPTH_HEADER] == "3"
assert headers[WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER] == build_workflow_call_depth_signature(
secret_key=TEST_SECRET_KEY,
method="get",
path="/triggers/webhook/test-webhook",
depth="3",
)
def test_executor_replaces_lowercase_reserved_headers_for_internal_webhook_target():
variable_pool = VariablePool(
system_variables=SystemVariable.default(),
user_inputs={},
)
node_data = HttpRequestNodeData(
title="Reserved header replacement",
method="get",
url="http://localhost:5001/triggers/webhook/test-webhook",
authorization=HttpRequestNodeAuthorization(type="no-auth"),
headers=(
"x-dify-workflow-call-depth: user-value\n"
"x-dify-workflow-call-depth-signature: user-signature\n"
"X-Test: value"
),
params="",
)
executor = Executor(
node_data=node_data,
timeout=HttpRequestNodeTimeout(connect=10, read=30, write=30),
http_request_config=HttpRequestNodeConfig(
max_connect_timeout=HTTP_REQUEST_CONFIG.max_connect_timeout,
max_read_timeout=HTTP_REQUEST_CONFIG.max_read_timeout,
max_write_timeout=HTTP_REQUEST_CONFIG.max_write_timeout,
max_binary_size=HTTP_REQUEST_CONFIG.max_binary_size,
max_text_size=HTTP_REQUEST_CONFIG.max_text_size,
ssl_verify=HTTP_REQUEST_CONFIG.ssl_verify,
ssrf_default_max_retries=HTTP_REQUEST_CONFIG.ssrf_default_max_retries,
secret_key=TEST_SECRET_KEY,
),
variable_pool=variable_pool,
workflow_call_depth=2,
http_client=ssrf_proxy,
file_manager=file_manager,
)
headers = executor._assembling_headers()
assert headers["X-Test"] == "value"
assert headers[WORKFLOW_CALL_DEPTH_HEADER] == "3"
assert headers[WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER] == build_workflow_call_depth_signature(
secret_key=TEST_SECRET_KEY,
method="get",
path="/triggers/webhook/test-webhook",
depth="3",
)
assert "x-dify-workflow-call-depth" not in headers
assert "x-dify-workflow-call-depth-signature" not in headers
def test_executor_propagates_workflow_call_depth_with_empty_secret():
variable_pool = VariablePool(
system_variables=SystemVariable.default(),
user_inputs={},
)
node_data = HttpRequestNodeData(
title="Depth propagation with empty secret",
method="get",
url="http://localhost:5001/triggers/webhook/test-webhook",
authorization=HttpRequestNodeAuthorization(type="no-auth"),
headers="X-Test: value",
params="",
)
executor = Executor(
node_data=node_data,
timeout=HttpRequestNodeTimeout(connect=10, read=30, write=30),
http_request_config=HttpRequestNodeConfig(
max_connect_timeout=HTTP_REQUEST_CONFIG.max_connect_timeout,
max_read_timeout=HTTP_REQUEST_CONFIG.max_read_timeout,
max_write_timeout=HTTP_REQUEST_CONFIG.max_write_timeout,
max_binary_size=HTTP_REQUEST_CONFIG.max_binary_size,
max_text_size=HTTP_REQUEST_CONFIG.max_text_size,
ssl_verify=HTTP_REQUEST_CONFIG.ssl_verify,
ssrf_default_max_retries=HTTP_REQUEST_CONFIG.ssrf_default_max_retries,
secret_key="",
),
variable_pool=variable_pool,
workflow_call_depth=2,
http_client=ssrf_proxy,
file_manager=file_manager,
)
headers = executor._assembling_headers()
assert headers[WORKFLOW_CALL_DEPTH_HEADER] == "3"
assert headers[WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER] == build_workflow_call_depth_signature(
secret_key="",
method="get",
path="/triggers/webhook/test-webhook",
depth="3",
)
def test_executor_log_masks_internal_depth_headers():
variable_pool = VariablePool(
system_variables=SystemVariable.default(),
user_inputs={},
)
node_data = HttpRequestNodeData(
title="Depth propagation",
method="get",
url="http://localhost:5001/triggers/webhook/test-webhook",
authorization=HttpRequestNodeAuthorization(type="no-auth"),
headers="X-Test: value",
params="",
)
executor = Executor(
node_data=node_data,
timeout=HttpRequestNodeTimeout(connect=10, read=30, write=30),
http_request_config=HttpRequestNodeConfig(
max_connect_timeout=HTTP_REQUEST_CONFIG.max_connect_timeout,
max_read_timeout=HTTP_REQUEST_CONFIG.max_read_timeout,
max_write_timeout=HTTP_REQUEST_CONFIG.max_write_timeout,
max_binary_size=HTTP_REQUEST_CONFIG.max_binary_size,
max_text_size=HTTP_REQUEST_CONFIG.max_text_size,
ssl_verify=HTTP_REQUEST_CONFIG.ssl_verify,
ssrf_default_max_retries=HTTP_REQUEST_CONFIG.ssrf_default_max_retries,
secret_key=TEST_SECRET_KEY,
),
variable_pool=variable_pool,
workflow_call_depth=2,
http_client=ssrf_proxy,
file_manager=file_manager,
)
raw_log = executor.to_log()
assert f"{WORKFLOW_CALL_DEPTH_HEADER}: [internal]" in raw_log
assert f"{WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER}: [internal]" in raw_log
assert "X-Dify-Workflow-Call-Depth: 3" not in raw_log
def test_executor_log_masks_reserved_headers_regardless_of_case():
variable_pool = VariablePool(
system_variables=SystemVariable.default(),
user_inputs={},
)
node_data = HttpRequestNodeData(
title="Reserved header replacement",
method="get",
url="http://localhost:5001/triggers/webhook/test-webhook",
authorization=HttpRequestNodeAuthorization(type="no-auth"),
headers=(
"x-dify-workflow-call-depth: user-value\n"
"x-dify-workflow-call-depth-signature: user-signature\n"
"X-Test: value"
),
params="",
)
executor = Executor(
node_data=node_data,
timeout=HttpRequestNodeTimeout(connect=10, read=30, write=30),
http_request_config=HttpRequestNodeConfig(
max_connect_timeout=HTTP_REQUEST_CONFIG.max_connect_timeout,
max_read_timeout=HTTP_REQUEST_CONFIG.max_read_timeout,
max_write_timeout=HTTP_REQUEST_CONFIG.max_write_timeout,
max_binary_size=HTTP_REQUEST_CONFIG.max_binary_size,
max_text_size=HTTP_REQUEST_CONFIG.max_text_size,
ssl_verify=HTTP_REQUEST_CONFIG.ssl_verify,
ssrf_default_max_retries=HTTP_REQUEST_CONFIG.ssrf_default_max_retries,
secret_key=TEST_SECRET_KEY,
),
variable_pool=variable_pool,
workflow_call_depth=2,
http_client=ssrf_proxy,
file_manager=file_manager,
)
raw_log = executor.to_log()
assert "x-dify-workflow-call-depth: [internal]" not in raw_log
assert "x-dify-workflow-call-depth-signature: [internal]" not in raw_log
assert f"{WORKFLOW_CALL_DEPTH_HEADER}: [internal]" in raw_log
assert f"{WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER}: [internal]" in raw_log
assert "user-signature" not in raw_log
assert "user-value" not in raw_log
def test_executor_propagates_workflow_call_depth_to_arbitrary_target_with_secret():
variable_pool = VariablePool(
system_variables=SystemVariable.default(),
user_inputs={},
)
node_data = HttpRequestNodeData(
title="External target",
method="get",
url="https://api.example.com/data",
authorization=HttpRequestNodeAuthorization(type="no-auth"),
headers="X-Test: value",
params="",
)
executor = Executor(
node_data=node_data,
timeout=HttpRequestNodeTimeout(connect=10, read=30, write=30),
http_request_config=HttpRequestNodeConfig(
max_connect_timeout=HTTP_REQUEST_CONFIG.max_connect_timeout,
max_read_timeout=HTTP_REQUEST_CONFIG.max_read_timeout,
max_write_timeout=HTTP_REQUEST_CONFIG.max_write_timeout,
max_binary_size=HTTP_REQUEST_CONFIG.max_binary_size,
max_text_size=HTTP_REQUEST_CONFIG.max_text_size,
ssl_verify=HTTP_REQUEST_CONFIG.ssl_verify,
ssrf_default_max_retries=HTTP_REQUEST_CONFIG.ssrf_default_max_retries,
secret_key=TEST_SECRET_KEY,
),
variable_pool=variable_pool,
workflow_call_depth=2,
http_client=ssrf_proxy,
file_manager=file_manager,
)
headers = executor._assembling_headers()
assert headers["X-Test"] == "value"
assert headers[WORKFLOW_CALL_DEPTH_HEADER] == "3"
assert headers[WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER] == build_workflow_call_depth_signature(
secret_key=TEST_SECRET_KEY,
method="get",
path="/data",
depth="3",
)

View File

@ -2,6 +2,8 @@
from unittest.mock import MagicMock, patch
import pytest
from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom
from core.workflow.workflow_entry import WorkflowEntry
from dify_graph.graph_engine.command_channels.redis_channel import RedisChannel
@ -134,3 +136,27 @@ class TestWorkflowEntryRedisChannel:
assert len(events) == 2
assert events[0] == mock_event1
assert events[1] == mock_event2
def test_workflow_entry_rejects_depth_over_limit(self):
mock_graph = MagicMock()
mock_variable_pool = MagicMock(spec=VariablePool)
mock_graph_runtime_state = MagicMock(spec=GraphRuntimeState)
mock_graph_runtime_state.variable_pool = mock_variable_pool
with (
patch("core.workflow.workflow_entry.dify_config.WORKFLOW_CALL_MAX_DEPTH", 1),
pytest.raises(ValueError, match="Max workflow call depth 1 reached."),
):
WorkflowEntry(
tenant_id="test-tenant",
app_id="test-app",
workflow_id="test-workflow",
graph_config={"nodes": [], "edges": []},
graph=mock_graph,
user_id="test-user",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=2,
variable_pool=mock_variable_pool,
graph_runtime_state=mock_graph_runtime_state,
)

View File

@ -5,8 +5,13 @@ import pytest
from flask import Flask
from werkzeug.datastructures import FileStorage
from configs import dify_config
from dify_graph.call_depth import build_workflow_call_depth_signature
from dify_graph.constants import WORKFLOW_CALL_DEPTH_HEADER, WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER
from services.trigger.webhook_service import WebhookService
TEST_SECRET_KEY = "test-secret-key"
class TestWebhookServiceUnit:
"""Unit tests for WebhookService focusing on business logic without database dependencies."""
@ -559,3 +564,206 @@ class TestWebhookServiceUnit:
result = _prepare_webhook_execution("test_webhook", is_debug=True)
assert result == (mock_trigger, mock_workflow, mock_config, mock_data, None)
def test_extract_workflow_call_depth_defaults_to_zero_for_invalid_values(self):
assert WebhookService.extract_workflow_call_depth({}) == 0
assert WebhookService.extract_workflow_call_depth({WORKFLOW_CALL_DEPTH_HEADER: "abc"}) == 0
assert WebhookService.extract_workflow_call_depth({WORKFLOW_CALL_DEPTH_HEADER.lower(): "-1"}) == 0
def test_extract_workflow_call_depth_ignores_unsigned_external_header(self):
assert WebhookService.extract_workflow_call_depth({WORKFLOW_CALL_DEPTH_HEADER: "5"}) == 0
def test_extract_workflow_call_depth_honors_signed_internal_header(self):
app = Flask(__name__)
with (
patch("services.trigger.webhook_service.dify_config.SECRET_KEY", TEST_SECRET_KEY),
app.test_request_context("/triggers/webhook/test-webhook", method="POST"),
):
signature = build_workflow_call_depth_signature(
secret_key=TEST_SECRET_KEY,
method="POST",
path="/triggers/webhook/test-webhook",
depth="4",
)
assert (
WebhookService.extract_workflow_call_depth(
{
WORKFLOW_CALL_DEPTH_HEADER: "4",
WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER: signature,
}
)
== 4
)
def test_extract_workflow_call_depth_rejects_signature_for_other_path(self):
app = Flask(__name__)
with (
patch("services.trigger.webhook_service.dify_config.SECRET_KEY", TEST_SECRET_KEY),
app.test_request_context("/triggers/webhook/right-webhook", method="POST"),
):
wrong_signature = build_workflow_call_depth_signature(
secret_key=TEST_SECRET_KEY,
method="POST",
path="/triggers/webhook/wrong-webhook",
depth="4",
)
assert (
WebhookService.extract_workflow_call_depth(
{
WORKFLOW_CALL_DEPTH_HEADER: "4",
WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER: wrong_signature,
}
)
== 0
)
@patch("services.trigger.webhook_service.dify_config")
def test_extract_workflow_call_depth_honors_signature_with_empty_secret(self, mock_config):
app = Flask(__name__)
mock_config.SECRET_KEY = ""
with app.test_request_context("/triggers/webhook/test-webhook", method="POST"):
signature = build_workflow_call_depth_signature(
secret_key="",
method="POST",
path="/triggers/webhook/test-webhook",
depth="4",
)
assert (
WebhookService.extract_workflow_call_depth(
{
WORKFLOW_CALL_DEPTH_HEADER: "4",
WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER: signature,
}
)
== 4
)
@patch("services.trigger.webhook_service.QuotaType")
@patch("services.trigger.webhook_service.EndUserService")
@patch("services.trigger.webhook_service.AsyncWorkflowService")
@patch("services.trigger.webhook_service.Session")
@patch("services.trigger.webhook_service.db")
def test_trigger_workflow_execution_preserves_header_depth(
self,
mock_db,
mock_session,
mock_async_workflow_service,
mock_end_user_service,
mock_quota_type,
):
app = Flask(__name__)
webhook_trigger = MagicMock(app_id="app", tenant_id="tenant", node_id="root", webhook_id="webhook")
workflow = MagicMock(id="workflow")
mock_end_user = MagicMock()
mock_end_user_service.get_or_create_end_user_by_type.return_value = mock_end_user
mock_db.engine = MagicMock()
mock_session.return_value.__enter__.return_value = MagicMock()
signature = build_workflow_call_depth_signature(
secret_key=TEST_SECRET_KEY,
method="POST",
path="/triggers/webhook/test-webhook",
depth="4",
)
with (
patch("services.trigger.webhook_service.dify_config.SECRET_KEY", TEST_SECRET_KEY),
app.test_request_context(
"/triggers/webhook/test-webhook",
method="POST",
headers={
WORKFLOW_CALL_DEPTH_HEADER: "4",
WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER: signature,
},
),
):
WebhookService.trigger_workflow_execution(
webhook_trigger,
{"method": "POST", "headers": {}, "query_params": {}, "body": {}, "files": {}},
workflow,
)
trigger_data = mock_async_workflow_service.trigger_workflow_async.call_args.args[2]
assert trigger_data.call_depth == 4
@patch("services.trigger.webhook_service.QuotaType")
@patch("services.trigger.webhook_service.EndUserService")
@patch("services.trigger.webhook_service.AsyncWorkflowService")
@patch("services.trigger.webhook_service.Session")
@patch("services.trigger.webhook_service.db")
def test_trigger_workflow_execution_ignores_spoofed_external_depth(
self,
mock_db,
mock_session,
mock_async_workflow_service,
mock_end_user_service,
mock_quota_type,
):
app = Flask(__name__)
webhook_trigger = MagicMock(app_id="app", tenant_id="tenant", node_id="root", webhook_id="webhook")
workflow = MagicMock(id="workflow")
mock_end_user_service.get_or_create_end_user_by_type.return_value = MagicMock()
mock_db.engine = MagicMock()
mock_session.return_value.__enter__.return_value = MagicMock()
with app.test_request_context(
"/triggers/webhook/test-webhook",
method="POST",
headers={WORKFLOW_CALL_DEPTH_HEADER: "5"},
):
WebhookService.trigger_workflow_execution(
webhook_trigger,
{"method": "POST", "headers": {}, "query_params": {}, "body": {}, "files": {}},
workflow,
)
trigger_data = mock_async_workflow_service.trigger_workflow_async.call_args.args[2]
assert trigger_data.call_depth == 0
@patch("services.trigger.webhook_service.QuotaType")
@patch("services.trigger.webhook_service.EndUserService")
@patch("services.trigger.webhook_service.AsyncWorkflowService")
@patch("services.trigger.webhook_service.Session")
@patch("services.trigger.webhook_service.db")
def test_trigger_workflow_execution_rejects_signature_captured_from_non_webhook_request(
self,
mock_db,
mock_session,
mock_async_workflow_service,
mock_end_user_service,
mock_quota_type,
):
app = Flask(__name__)
webhook_trigger = MagicMock(app_id="app", tenant_id="tenant", node_id="root", webhook_id="webhook")
workflow = MagicMock(id="workflow")
mock_end_user_service.get_or_create_end_user_by_type.return_value = MagicMock()
mock_db.engine = MagicMock()
mock_session.return_value.__enter__.return_value = MagicMock()
captured_signature = build_workflow_call_depth_signature(
secret_key=dify_config.SECRET_KEY,
method="GET",
path="/v1/external-endpoint",
depth="5",
)
with app.test_request_context(
"/triggers/webhook/test-webhook",
method="POST",
headers={
WORKFLOW_CALL_DEPTH_HEADER: "5",
WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER: captured_signature,
},
):
WebhookService.trigger_workflow_execution(
webhook_trigger,
{"method": "POST", "headers": {}, "query_params": {}, "body": {}, "files": {}},
workflow,
)
trigger_data = mock_async_workflow_service.trigger_workflow_async.call_args.args[2]
assert trigger_data.call_depth == 0

View File

@ -1,3 +1,5 @@
from unittest.mock import MagicMock, patch
from core.app.apps.workflow.app_generator import SKIP_PREPARE_USER_INPUTS_KEY
from services.workflow.entities import WebhookTriggerData
from tasks import async_workflow_tasks
@ -16,3 +18,41 @@ def test_build_generator_args_sets_skip_flag_for_webhook():
assert args[SKIP_PREPARE_USER_INPUTS_KEY] is True
assert args["inputs"]["webhook_data"]["body"]["foo"] == "bar"
def test_execute_workflow_common_uses_trigger_call_depth():
trigger_data = WebhookTriggerData(
app_id="app",
tenant_id="tenant",
workflow_id="workflow",
root_node_id="node",
inputs={"webhook_data": {"body": {}}},
call_depth=3,
)
trigger_log = MagicMock(
id="log-id",
app_id="app",
workflow_id="workflow",
trigger_data=trigger_data.model_dump_json(),
)
trigger_log_repo = MagicMock()
trigger_log_repo.get_by_id.return_value = trigger_log
session = MagicMock()
session.scalar.side_effect = [MagicMock(), MagicMock()]
session_context = MagicMock()
session_context.__enter__.return_value = session
workflow_generator = MagicMock()
with (
patch.object(async_workflow_tasks.session_factory, "create_session", return_value=session_context),
patch.object(async_workflow_tasks, "SQLAlchemyWorkflowTriggerLogRepository", return_value=trigger_log_repo),
patch.object(async_workflow_tasks, "_get_user", return_value=MagicMock()),
patch.object(async_workflow_tasks, "WorkflowAppGenerator", return_value=workflow_generator),
):
async_workflow_tasks._execute_workflow_common(
async_workflow_tasks.WorkflowTaskData(workflow_trigger_log_id="log-id"),
MagicMock(),
MagicMock(),
)
assert workflow_generator.generate.call_args.kwargs["call_depth"] == 3