diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index caea8b6b95..c979543b43 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -84,6 +84,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): user_id=self.application_generate_entity.user_id, user_from=user_from, invoke_from=invoke_from, + call_depth=self.application_generate_entity.call_depth, root_node_id=self._root_node_id, ) elif self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run: @@ -91,6 +92,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): workflow=self._workflow, single_iteration_run=self.application_generate_entity.single_iteration_run, single_loop_run=self.application_generate_entity.single_loop_run, + call_depth=self.application_generate_entity.call_depth, ) else: inputs = self.application_generate_entity.inputs @@ -120,6 +122,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): user_id=self.application_generate_entity.user_id, user_from=user_from, invoke_from=invoke_from, + call_depth=self.application_generate_entity.call_depth, root_node_id=self._root_node_id, ) diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index adc6cce9af..55b73febb2 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -102,6 +102,7 @@ class WorkflowBasedAppRunner: graph_runtime_state: GraphRuntimeState, user_from: UserFrom, invoke_from: InvokeFrom, + call_depth: int = 0, workflow_id: str = "", tenant_id: str = "", user_id: str = "", @@ -130,7 +131,7 @@ class WorkflowBasedAppRunner: user_from=user_from, invoke_from=invoke_from, ), - call_depth=0, + call_depth=call_depth, ) # Use the provided graph_runtime_state for consistent state management @@ -156,6 +157,7 @@ class WorkflowBasedAppRunner: workflow: Workflow, single_iteration_run: Any | None = None, single_loop_run: Any | None = None, + call_depth: int = 0, ) -> tuple[Graph, VariablePool, GraphRuntimeState]: """ Prepare graph, variable pool, and runtime state for single node execution @@ -189,6 +191,7 @@ class WorkflowBasedAppRunner: node_id=single_iteration_run.node_id, user_inputs=dict(single_iteration_run.inputs), graph_runtime_state=graph_runtime_state, + call_depth=call_depth, node_type_filter_key="iteration_id", node_type_label="iteration", ) @@ -198,6 +201,7 @@ class WorkflowBasedAppRunner: node_id=single_loop_run.node_id, user_inputs=dict(single_loop_run.inputs), graph_runtime_state=graph_runtime_state, + call_depth=call_depth, node_type_filter_key="loop_id", node_type_label="loop", ) @@ -214,6 +218,7 @@ class WorkflowBasedAppRunner: node_id: str, user_inputs: dict[str, Any], graph_runtime_state: GraphRuntimeState, + call_depth: int, node_type_filter_key: str, # 'iteration_id' or 'loop_id' node_type_label: str = "node", # 'iteration' or 'loop' for error messages ) -> tuple[Graph, VariablePool]: @@ -283,7 +288,7 @@ class WorkflowBasedAppRunner: user_from=UserFrom.ACCOUNT, invoke_from=InvokeFrom.DEBUGGER, ), - call_depth=0, + call_depth=call_depth, ) node_factory = DifyNodeFactory( diff --git a/api/core/workflow/node_factory.py b/api/core/workflow/node_factory.py index ab34263a79..58c70d3ade 100644 --- a/api/core/workflow/node_factory.py +++ b/api/core/workflow/node_factory.py @@ -282,6 +282,7 @@ class DifyNodeFactory(NodeFactory): max_text_size=dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE, ssl_verify=dify_config.HTTP_REQUEST_NODE_SSL_VERIFY, ssrf_default_max_retries=dify_config.SSRF_DEFAULT_MAX_RETRIES, + secret_key=dify_config.SECRET_KEY, ) self._llm_credentials_provider, self._llm_model_factory = build_dify_model_access(self._dify_context.tenant_id) diff --git a/api/dify_graph/call_depth.py b/api/dify_graph/call_depth.py new file mode 100644 index 0000000000..0f89f47e6f --- /dev/null +++ b/api/dify_graph/call_depth.py @@ -0,0 +1,27 @@ +"""Helpers for workflow recursion depth propagation. + +The HTTP request node emits a reserved depth header pair on outbound requests, +and ``services.trigger.webhook_service`` validates that pair when a webhook is +received. The signature binds the propagated depth to the concrete HTTP method +and request path so a depth value captured for one endpoint cannot be replayed +verbatim against another path. +""" + +import hashlib +import hmac + + +def build_workflow_call_depth_signature(*, secret_key: str, method: str, path: str, depth: str) -> str: + """Build the stable HMAC payload for workflow call-depth propagation. + + Args: + secret_key: Shared signing key used by both sender and receiver. + method: Outbound or inbound HTTP method. + path: Request path that the signature is bound to. + depth: Workflow call depth value serialized as a string. + + Returns: + Hex-encoded HMAC-SHA256 digest for the method/path/depth tuple. + """ + payload = f"{method.upper()}:{path}:{depth}" + return hmac.new(secret_key.encode("utf-8"), payload.encode("utf-8"), hashlib.sha256).hexdigest() diff --git a/api/dify_graph/constants.py b/api/dify_graph/constants.py index 7664be0983..9e0546da7a 100644 --- a/api/dify_graph/constants.py +++ b/api/dify_graph/constants.py @@ -2,3 +2,8 @@ SYSTEM_VARIABLE_NODE_ID = "sys" ENVIRONMENT_VARIABLE_NODE_ID = "env" CONVERSATION_VARIABLE_NODE_ID = "conversation" RAG_PIPELINE_VARIABLE_NODE_ID = "rag" + +# Reserved for internal workflow-to-workflow HTTP calls. External callers should +# not rely on or set this header. +WORKFLOW_CALL_DEPTH_HEADER = "X-Dify-Workflow-Call-Depth" +WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER = "X-Dify-Workflow-Call-Depth-Signature" diff --git a/api/dify_graph/nodes/http_request/config.py b/api/dify_graph/nodes/http_request/config.py index 53bf6c7ae4..b293eca429 100644 --- a/api/dify_graph/nodes/http_request/config.py +++ b/api/dify_graph/nodes/http_request/config.py @@ -12,6 +12,7 @@ def build_http_request_config( max_text_size: int = 1 * 1024 * 1024, ssl_verify: bool = True, ssrf_default_max_retries: int = 3, + secret_key: str = "", ) -> HttpRequestNodeConfig: return HttpRequestNodeConfig( max_connect_timeout=max_connect_timeout, @@ -21,6 +22,7 @@ def build_http_request_config( max_text_size=max_text_size, ssl_verify=ssl_verify, ssrf_default_max_retries=ssrf_default_max_retries, + secret_key=secret_key, ) diff --git a/api/dify_graph/nodes/http_request/entities.py b/api/dify_graph/nodes/http_request/entities.py index f594d58ae6..4d42621d4b 100644 --- a/api/dify_graph/nodes/http_request/entities.py +++ b/api/dify_graph/nodes/http_request/entities.py @@ -76,6 +76,7 @@ class HttpRequestNodeConfig: max_text_size: int ssl_verify: bool ssrf_default_max_retries: int + secret_key: str = "" def default_timeout(self) -> "HttpRequestNodeTimeout": return HttpRequestNodeTimeout( diff --git a/api/dify_graph/nodes/http_request/executor.py b/api/dify_graph/nodes/http_request/executor.py index 892b0fc688..f0736324c9 100644 --- a/api/dify_graph/nodes/http_request/executor.py +++ b/api/dify_graph/nodes/http_request/executor.py @@ -1,3 +1,11 @@ +"""HTTP request execution helpers for workflow nodes. + +Besides normal request assembly, this executor is responsible for propagating +workflow recursion depth across outbound HTTP calls. The reserved call-depth +headers are always regenerated from the current node context so user-supplied +values cannot override or poison the propagation contract. +""" + import base64 import json import secrets @@ -10,6 +18,8 @@ from urllib.parse import urlencode, urlparse import httpx from json_repair import repair_json +from dify_graph.call_depth import build_workflow_call_depth_signature +from dify_graph.constants import WORKFLOW_CALL_DEPTH_HEADER, WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER from dify_graph.file.enums import FileTransferMethod from dify_graph.runtime import VariablePool from dify_graph.variables.segments import ArrayFileSegment, FileSegment @@ -41,6 +51,8 @@ BODY_TYPE_TO_CONTENT_TYPE = { class Executor: + """Prepare, execute, and log a workflow HTTP request node invocation.""" + method: Literal[ "get", "head", @@ -77,6 +89,7 @@ class Executor: timeout: HttpRequestNodeTimeout, variable_pool: VariablePool, http_request_config: HttpRequestNodeConfig, + workflow_call_depth: int = 0, max_retries: int | None = None, ssl_verify: bool | None = None, http_client: HttpClientProtocol, @@ -120,6 +133,7 @@ class Executor: # init template self.variable_pool = variable_pool self.node_data = node_data + self.workflow_call_depth = workflow_call_depth self._initialize() def _initialize(self): @@ -272,8 +286,30 @@ class Executor: self.data = form_data def _assembling_headers(self) -> dict[str, Any]: + """Assemble outbound headers for the request. + + Reserved workflow call-depth headers are removed case-insensitively + before the canonical pair is re-added from ``workflow_call_depth``. + This keeps propagation deterministic even if a workflow author manually + configured colliding headers on the node. + """ authorization = deepcopy(self.auth) headers = deepcopy(self.headers) or {} + reserved_header_names = { + WORKFLOW_CALL_DEPTH_HEADER.lower(), + WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER.lower(), + } + headers = {k: v for k, v in headers.items() if k.lower() not in reserved_header_names} + parsed_url = urlparse(self.url) + next_call_depth = str(self.workflow_call_depth + 1) + headers[WORKFLOW_CALL_DEPTH_HEADER] = next_call_depth + headers[WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER] = build_workflow_call_depth_signature( + secret_key=self._http_request_config.secret_key, + method=self.method, + path=parsed_url.path, + depth=next_call_depth, + ) + if self.auth.type == "api-key": if self.auth.config is None: raise AuthorizationConfigError("self.authorization config is required") @@ -388,6 +424,12 @@ class Executor: return self._validate_and_parse_response(response) def to_log(self): + """Render the request in raw HTTP form for node logs. + + Internal workflow call-depth headers and authentication headers are + masked so operational logs remain useful without exposing replayable or + credential-bearing values. + """ url_parts = urlparse(self.url) path = url_parts.path or "/" @@ -410,6 +452,12 @@ class Executor: if body.type == "form-data": headers["Content-Type"] = f"multipart/form-data; boundary={boundary}" for k, v in headers.items(): + if k.lower() in { + WORKFLOW_CALL_DEPTH_HEADER.lower(), + WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER.lower(), + }: + raw += f"{k}: [internal]\r\n" + continue if self.auth.type == "api-key": authorization_header = "Authorization" if self.auth.config and self.auth.config.header: diff --git a/api/dify_graph/nodes/http_request/node.py b/api/dify_graph/nodes/http_request/node.py index 486ae241ee..2916562d0a 100644 --- a/api/dify_graph/nodes/http_request/node.py +++ b/api/dify_graph/nodes/http_request/node.py @@ -101,6 +101,7 @@ class HttpRequestNode(Node[HttpRequestNodeData]): timeout=self._get_request_timeout(self.node_data), variable_pool=self.graph_runtime_state.variable_pool, http_request_config=self._http_request_config, + workflow_call_depth=self.workflow_call_depth, ssl_verify=self.node_data.ssl_verify, http_client=self._http_client, file_manager=self._file_manager, diff --git a/api/services/rag_pipeline/rag_pipeline.py b/api/services/rag_pipeline/rag_pipeline.py index f3aedafac9..c2888a3a8d 100644 --- a/api/services/rag_pipeline/rag_pipeline.py +++ b/api/services/rag_pipeline/rag_pipeline.py @@ -402,6 +402,7 @@ class RagPipelineService: max_text_size=dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE, ssl_verify=dify_config.HTTP_REQUEST_NODE_SSL_VERIFY, ssrf_default_max_retries=dify_config.SSRF_DEFAULT_MAX_RETRIES, + secret_key=dify_config.SECRET_KEY, ) } default_config = node_class.get_default_config(filters=filters) @@ -435,6 +436,7 @@ class RagPipelineService: max_text_size=dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE, ssl_verify=dify_config.HTTP_REQUEST_NODE_SSL_VERIFY, ssrf_default_max_retries=dify_config.SSRF_DEFAULT_MAX_RETRIES, + secret_key=dify_config.SECRET_KEY, ) default_config = node_class.get_default_config(filters=final_filters or None) if not default_config: diff --git a/api/services/trigger/webhook_service.py b/api/services/trigger/webhook_service.py index 3c1a4cc747..b75606ef40 100644 --- a/api/services/trigger/webhook_service.py +++ b/api/services/trigger/webhook_service.py @@ -1,3 +1,4 @@ +import hmac import json import logging import mimetypes @@ -23,6 +24,8 @@ from core.workflow.nodes.trigger_webhook.entities import ( WebhookData, WebhookParameter, ) +from dify_graph.call_depth import build_workflow_call_depth_signature +from dify_graph.constants import WORKFLOW_CALL_DEPTH_HEADER, WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER from dify_graph.entities.graph_config import NodeConfigDict from dify_graph.file.models import FileTransferMethod from dify_graph.variables.types import ArrayValidation, SegmentType @@ -57,10 +60,49 @@ class WebhookService: @staticmethod def _sanitize_key(key: str) -> str: """Normalize external keys (headers/params) to workflow-safe variables.""" - if not isinstance(key, str): - return key return key.replace("-", "_") + @classmethod + def extract_workflow_call_depth(cls, headers: Mapping[str, Any]) -> int: + """Extract the reserved workflow recursion depth header. + + The depth header is only trusted when accompanied by a valid HMAC + signature for the current request method/path/depth tuple. Header lookup + accepts either canonical or lower-case names because upstream proxies may + normalize casing. Invalid, missing, unsigned, or negative values are + treated as external requests and therefore fall back to depth 0. + """ + raw_value = headers.get(WORKFLOW_CALL_DEPTH_HEADER) + if raw_value is None: + raw_value = headers.get(WORKFLOW_CALL_DEPTH_HEADER.lower()) + if raw_value is None: + return 0 + + raw_signature = headers.get(WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER) + if raw_signature is None: + raw_signature = headers.get(WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER.lower()) + if raw_signature is None: + return 0 + + normalized_value = str(raw_value).strip() + # The receiver recomputes the signature from the current request context + # instead of trusting the sender's path or method directly. + expected_signature = build_workflow_call_depth_signature( + secret_key=dify_config.SECRET_KEY, + method=request.method, + path=request.path, + depth=normalized_value, + ) + if not hmac.compare_digest(str(raw_signature).strip(), expected_signature): + return 0 + + try: + call_depth = int(normalized_value) + except (TypeError, ValueError): + return 0 + + return max(call_depth, 0) + @classmethod def get_webhook_trigger_and_workflow( cls, webhook_id: str, is_debug: bool = False @@ -764,12 +806,14 @@ class WebhookService: workflow_inputs = cls.build_workflow_inputs(webhook_data) # Create trigger data + trigger_call_depth = cls.extract_workflow_call_depth(dict(request.headers)) trigger_data = WebhookTriggerData( app_id=webhook_trigger.app_id, workflow_id=workflow.id, root_node_id=webhook_trigger.node_id, # Start from the webhook node inputs=workflow_inputs, tenant_id=webhook_trigger.tenant_id, + call_depth=trigger_call_depth, ) end_user = EndUserService.get_or_create_end_user_by_type( diff --git a/api/services/workflow/entities.py b/api/services/workflow/entities.py index 2af0d1fd90..d1f91975af 100644 --- a/api/services/workflow/entities.py +++ b/api/services/workflow/entities.py @@ -26,7 +26,14 @@ class TriggerMetadata(BaseModel): class TriggerData(BaseModel): - """Base trigger data model for async workflow execution""" + """Base trigger data model for async workflow execution. + + `call_depth` tracks only the current workflow-to-workflow HTTP recursion + depth. It starts at 0 for external triggers and increments once per nested + webhook-triggered workflow call. For webhook triggers, the value is derived + from the reserved depth headers after `WebhookService.extract_workflow_call_depth` + validates the signature against the inbound request context. + """ app_id: str tenant_id: str @@ -34,6 +41,7 @@ class TriggerData(BaseModel): root_node_id: str inputs: Mapping[str, Any] files: Sequence[Mapping[str, Any]] = Field(default_factory=list) + call_depth: int = 0 trigger_type: AppTriggerType trigger_from: WorkflowRunTriggeredFrom trigger_metadata: TriggerMetadata | None = None diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index e13cdd5f27..8067f0ab16 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -638,6 +638,7 @@ class WorkflowService: max_text_size=dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE, ssl_verify=dify_config.HTTP_REQUEST_NODE_SSL_VERIFY, ssrf_default_max_retries=dify_config.SSRF_DEFAULT_MAX_RETRIES, + secret_key=dify_config.SECRET_KEY, ) } default_config = node_class.get_default_config(filters=filters) @@ -673,6 +674,7 @@ class WorkflowService: max_text_size=dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE, ssl_verify=dify_config.HTTP_REQUEST_NODE_SSL_VERIFY, ssrf_default_max_retries=dify_config.SSRF_DEFAULT_MAX_RETRIES, + secret_key=dify_config.SECRET_KEY, ) default_config = node_class.get_default_config(filters=resolved_filters or None) if not default_config: diff --git a/api/tasks/async_workflow_tasks.py b/api/tasks/async_workflow_tasks.py index d247cf5cf7..e349c8d6ac 100644 --- a/api/tasks/async_workflow_tasks.py +++ b/api/tasks/async_workflow_tasks.py @@ -164,7 +164,7 @@ def _execute_workflow_common( args=args, invoke_from=InvokeFrom.SERVICE_API, streaming=False, - call_depth=0, + call_depth=trigger_data.call_depth, triggered_from=trigger_data.trigger_from, root_node_id=trigger_data.root_node_id, graph_engine_layers=[ diff --git a/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py b/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py index 3f1dd14569..ca85ee71e9 100644 --- a/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py +++ b/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py @@ -124,6 +124,7 @@ class TestWorkflowBasedAppRunner: node_id="node-1", user_inputs={}, graph_runtime_state=graph_runtime_state, + call_depth=3, node_type_filter_key="iteration_id", node_type_label="iteration", ) @@ -131,6 +132,35 @@ class TestWorkflowBasedAppRunner: assert graph is not None assert variable_pool is graph_runtime_state.variable_pool + def test_init_graph_passes_call_depth_into_node_factory(self, monkeypatch): + runner = WorkflowBasedAppRunner(queue_manager=SimpleNamespace(), app_id="app") + runtime_state = GraphRuntimeState( + variable_pool=VariablePool(system_variables=SystemVariable.default()), + start_at=0.0, + ) + captured: dict[str, int] = {} + + class _FakeNodeFactory: + def __init__(self, *, graph_init_params, graph_runtime_state): + captured["call_depth"] = graph_init_params.call_depth + + monkeypatch.setattr("core.app.apps.workflow_app_runner.DifyNodeFactory", _FakeNodeFactory) + monkeypatch.setattr( + "core.app.apps.workflow_app_runner.Graph.init", + lambda **kwargs: SimpleNamespace(), + ) + + graph = runner._init_graph( + graph_config={"nodes": [{"id": "start", "data": {"type": "start", "version": "1"}}], "edges": []}, + graph_runtime_state=runtime_state, + user_from=UserFrom.ACCOUNT, + invoke_from=InvokeFrom.DEBUGGER, + call_depth=4, + ) + + assert graph is not None + assert captured["call_depth"] == 4 + def test_handle_graph_run_events_and_pause_notifications(self, monkeypatch): published: list[object] = [] diff --git a/api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_executor.py b/api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_executor.py index cea7195417..dcfce1d566 100644 --- a/api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_executor.py +++ b/api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_executor.py @@ -2,6 +2,8 @@ import pytest from configs import dify_config from core.helper.ssrf_proxy import ssrf_proxy +from dify_graph.call_depth import build_workflow_call_depth_signature +from dify_graph.constants import WORKFLOW_CALL_DEPTH_HEADER, WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER from dify_graph.file.file_manager import file_manager from dify_graph.nodes.http_request import ( BodyData, @@ -24,7 +26,9 @@ HTTP_REQUEST_CONFIG = HttpRequestNodeConfig( max_text_size=dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE, ssl_verify=dify_config.HTTP_REQUEST_NODE_SSL_VERIFY, ssrf_default_max_retries=dify_config.SSRF_DEFAULT_MAX_RETRIES, + secret_key=dify_config.SECRET_KEY, ) +TEST_SECRET_KEY = "test-secret-key" def test_executor_with_json_body_and_number_variable(): @@ -661,3 +665,275 @@ def test_executor_with_json_body_preserves_numbers_and_strings(): assert executor.json["count"] == 42 assert executor.json["id"] == "abc-123" + + +def test_executor_propagates_workflow_call_depth_header(): + variable_pool = VariablePool( + system_variables=SystemVariable.default(), + user_inputs={}, + ) + node_data = HttpRequestNodeData( + title="Depth propagation", + method="get", + url="http://localhost:5001/triggers/webhook/test-webhook", + authorization=HttpRequestNodeAuthorization(type="no-auth"), + headers="X-Test: value", + params="", + ) + + executor = Executor( + node_data=node_data, + timeout=HttpRequestNodeTimeout(connect=10, read=30, write=30), + http_request_config=HttpRequestNodeConfig( + max_connect_timeout=HTTP_REQUEST_CONFIG.max_connect_timeout, + max_read_timeout=HTTP_REQUEST_CONFIG.max_read_timeout, + max_write_timeout=HTTP_REQUEST_CONFIG.max_write_timeout, + max_binary_size=HTTP_REQUEST_CONFIG.max_binary_size, + max_text_size=HTTP_REQUEST_CONFIG.max_text_size, + ssl_verify=HTTP_REQUEST_CONFIG.ssl_verify, + ssrf_default_max_retries=HTTP_REQUEST_CONFIG.ssrf_default_max_retries, + secret_key=TEST_SECRET_KEY, + ), + variable_pool=variable_pool, + workflow_call_depth=2, + http_client=ssrf_proxy, + file_manager=file_manager, + ) + + headers = executor._assembling_headers() + + assert headers["X-Test"] == "value" + assert headers[WORKFLOW_CALL_DEPTH_HEADER] == "3" + assert headers[WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER] == build_workflow_call_depth_signature( + secret_key=TEST_SECRET_KEY, + method="get", + path="/triggers/webhook/test-webhook", + depth="3", + ) + + +def test_executor_replaces_lowercase_reserved_headers_for_internal_webhook_target(): + variable_pool = VariablePool( + system_variables=SystemVariable.default(), + user_inputs={}, + ) + node_data = HttpRequestNodeData( + title="Reserved header replacement", + method="get", + url="http://localhost:5001/triggers/webhook/test-webhook", + authorization=HttpRequestNodeAuthorization(type="no-auth"), + headers=( + "x-dify-workflow-call-depth: user-value\n" + "x-dify-workflow-call-depth-signature: user-signature\n" + "X-Test: value" + ), + params="", + ) + + executor = Executor( + node_data=node_data, + timeout=HttpRequestNodeTimeout(connect=10, read=30, write=30), + http_request_config=HttpRequestNodeConfig( + max_connect_timeout=HTTP_REQUEST_CONFIG.max_connect_timeout, + max_read_timeout=HTTP_REQUEST_CONFIG.max_read_timeout, + max_write_timeout=HTTP_REQUEST_CONFIG.max_write_timeout, + max_binary_size=HTTP_REQUEST_CONFIG.max_binary_size, + max_text_size=HTTP_REQUEST_CONFIG.max_text_size, + ssl_verify=HTTP_REQUEST_CONFIG.ssl_verify, + ssrf_default_max_retries=HTTP_REQUEST_CONFIG.ssrf_default_max_retries, + secret_key=TEST_SECRET_KEY, + ), + variable_pool=variable_pool, + workflow_call_depth=2, + http_client=ssrf_proxy, + file_manager=file_manager, + ) + + headers = executor._assembling_headers() + + assert headers["X-Test"] == "value" + assert headers[WORKFLOW_CALL_DEPTH_HEADER] == "3" + assert headers[WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER] == build_workflow_call_depth_signature( + secret_key=TEST_SECRET_KEY, + method="get", + path="/triggers/webhook/test-webhook", + depth="3", + ) + assert "x-dify-workflow-call-depth" not in headers + assert "x-dify-workflow-call-depth-signature" not in headers + + +def test_executor_propagates_workflow_call_depth_with_empty_secret(): + variable_pool = VariablePool( + system_variables=SystemVariable.default(), + user_inputs={}, + ) + node_data = HttpRequestNodeData( + title="Depth propagation with empty secret", + method="get", + url="http://localhost:5001/triggers/webhook/test-webhook", + authorization=HttpRequestNodeAuthorization(type="no-auth"), + headers="X-Test: value", + params="", + ) + + executor = Executor( + node_data=node_data, + timeout=HttpRequestNodeTimeout(connect=10, read=30, write=30), + http_request_config=HttpRequestNodeConfig( + max_connect_timeout=HTTP_REQUEST_CONFIG.max_connect_timeout, + max_read_timeout=HTTP_REQUEST_CONFIG.max_read_timeout, + max_write_timeout=HTTP_REQUEST_CONFIG.max_write_timeout, + max_binary_size=HTTP_REQUEST_CONFIG.max_binary_size, + max_text_size=HTTP_REQUEST_CONFIG.max_text_size, + ssl_verify=HTTP_REQUEST_CONFIG.ssl_verify, + ssrf_default_max_retries=HTTP_REQUEST_CONFIG.ssrf_default_max_retries, + secret_key="", + ), + variable_pool=variable_pool, + workflow_call_depth=2, + http_client=ssrf_proxy, + file_manager=file_manager, + ) + + headers = executor._assembling_headers() + + assert headers[WORKFLOW_CALL_DEPTH_HEADER] == "3" + assert headers[WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER] == build_workflow_call_depth_signature( + secret_key="", + method="get", + path="/triggers/webhook/test-webhook", + depth="3", + ) + + +def test_executor_log_masks_internal_depth_headers(): + variable_pool = VariablePool( + system_variables=SystemVariable.default(), + user_inputs={}, + ) + node_data = HttpRequestNodeData( + title="Depth propagation", + method="get", + url="http://localhost:5001/triggers/webhook/test-webhook", + authorization=HttpRequestNodeAuthorization(type="no-auth"), + headers="X-Test: value", + params="", + ) + + executor = Executor( + node_data=node_data, + timeout=HttpRequestNodeTimeout(connect=10, read=30, write=30), + http_request_config=HttpRequestNodeConfig( + max_connect_timeout=HTTP_REQUEST_CONFIG.max_connect_timeout, + max_read_timeout=HTTP_REQUEST_CONFIG.max_read_timeout, + max_write_timeout=HTTP_REQUEST_CONFIG.max_write_timeout, + max_binary_size=HTTP_REQUEST_CONFIG.max_binary_size, + max_text_size=HTTP_REQUEST_CONFIG.max_text_size, + ssl_verify=HTTP_REQUEST_CONFIG.ssl_verify, + ssrf_default_max_retries=HTTP_REQUEST_CONFIG.ssrf_default_max_retries, + secret_key=TEST_SECRET_KEY, + ), + variable_pool=variable_pool, + workflow_call_depth=2, + http_client=ssrf_proxy, + file_manager=file_manager, + ) + + raw_log = executor.to_log() + + assert f"{WORKFLOW_CALL_DEPTH_HEADER}: [internal]" in raw_log + assert f"{WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER}: [internal]" in raw_log + assert "X-Dify-Workflow-Call-Depth: 3" not in raw_log + + +def test_executor_log_masks_reserved_headers_regardless_of_case(): + variable_pool = VariablePool( + system_variables=SystemVariable.default(), + user_inputs={}, + ) + node_data = HttpRequestNodeData( + title="Reserved header replacement", + method="get", + url="http://localhost:5001/triggers/webhook/test-webhook", + authorization=HttpRequestNodeAuthorization(type="no-auth"), + headers=( + "x-dify-workflow-call-depth: user-value\n" + "x-dify-workflow-call-depth-signature: user-signature\n" + "X-Test: value" + ), + params="", + ) + + executor = Executor( + node_data=node_data, + timeout=HttpRequestNodeTimeout(connect=10, read=30, write=30), + http_request_config=HttpRequestNodeConfig( + max_connect_timeout=HTTP_REQUEST_CONFIG.max_connect_timeout, + max_read_timeout=HTTP_REQUEST_CONFIG.max_read_timeout, + max_write_timeout=HTTP_REQUEST_CONFIG.max_write_timeout, + max_binary_size=HTTP_REQUEST_CONFIG.max_binary_size, + max_text_size=HTTP_REQUEST_CONFIG.max_text_size, + ssl_verify=HTTP_REQUEST_CONFIG.ssl_verify, + ssrf_default_max_retries=HTTP_REQUEST_CONFIG.ssrf_default_max_retries, + secret_key=TEST_SECRET_KEY, + ), + variable_pool=variable_pool, + workflow_call_depth=2, + http_client=ssrf_proxy, + file_manager=file_manager, + ) + + raw_log = executor.to_log() + + assert "x-dify-workflow-call-depth: [internal]" not in raw_log + assert "x-dify-workflow-call-depth-signature: [internal]" not in raw_log + assert f"{WORKFLOW_CALL_DEPTH_HEADER}: [internal]" in raw_log + assert f"{WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER}: [internal]" in raw_log + assert "user-signature" not in raw_log + assert "user-value" not in raw_log + + +def test_executor_propagates_workflow_call_depth_to_arbitrary_target_with_secret(): + variable_pool = VariablePool( + system_variables=SystemVariable.default(), + user_inputs={}, + ) + node_data = HttpRequestNodeData( + title="External target", + method="get", + url="https://api.example.com/data", + authorization=HttpRequestNodeAuthorization(type="no-auth"), + headers="X-Test: value", + params="", + ) + + executor = Executor( + node_data=node_data, + timeout=HttpRequestNodeTimeout(connect=10, read=30, write=30), + http_request_config=HttpRequestNodeConfig( + max_connect_timeout=HTTP_REQUEST_CONFIG.max_connect_timeout, + max_read_timeout=HTTP_REQUEST_CONFIG.max_read_timeout, + max_write_timeout=HTTP_REQUEST_CONFIG.max_write_timeout, + max_binary_size=HTTP_REQUEST_CONFIG.max_binary_size, + max_text_size=HTTP_REQUEST_CONFIG.max_text_size, + ssl_verify=HTTP_REQUEST_CONFIG.ssl_verify, + ssrf_default_max_retries=HTTP_REQUEST_CONFIG.ssrf_default_max_retries, + secret_key=TEST_SECRET_KEY, + ), + variable_pool=variable_pool, + workflow_call_depth=2, + http_client=ssrf_proxy, + file_manager=file_manager, + ) + + headers = executor._assembling_headers() + + assert headers["X-Test"] == "value" + assert headers[WORKFLOW_CALL_DEPTH_HEADER] == "3" + assert headers[WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER] == build_workflow_call_depth_signature( + secret_key=TEST_SECRET_KEY, + method="get", + path="/data", + depth="3", + ) diff --git a/api/tests/unit_tests/core/workflow/test_workflow_entry_redis_channel.py b/api/tests/unit_tests/core/workflow/test_workflow_entry_redis_channel.py index 9969c953e8..3fb4556445 100644 --- a/api/tests/unit_tests/core/workflow/test_workflow_entry_redis_channel.py +++ b/api/tests/unit_tests/core/workflow/test_workflow_entry_redis_channel.py @@ -2,6 +2,8 @@ from unittest.mock import MagicMock, patch +import pytest + from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom from core.workflow.workflow_entry import WorkflowEntry from dify_graph.graph_engine.command_channels.redis_channel import RedisChannel @@ -134,3 +136,27 @@ class TestWorkflowEntryRedisChannel: assert len(events) == 2 assert events[0] == mock_event1 assert events[1] == mock_event2 + + def test_workflow_entry_rejects_depth_over_limit(self): + mock_graph = MagicMock() + mock_variable_pool = MagicMock(spec=VariablePool) + mock_graph_runtime_state = MagicMock(spec=GraphRuntimeState) + mock_graph_runtime_state.variable_pool = mock_variable_pool + + with ( + patch("core.workflow.workflow_entry.dify_config.WORKFLOW_CALL_MAX_DEPTH", 1), + pytest.raises(ValueError, match="Max workflow call depth 1 reached."), + ): + WorkflowEntry( + tenant_id="test-tenant", + app_id="test-app", + workflow_id="test-workflow", + graph_config={"nodes": [], "edges": []}, + graph=mock_graph, + user_id="test-user", + user_from=UserFrom.ACCOUNT, + invoke_from=InvokeFrom.DEBUGGER, + call_depth=2, + variable_pool=mock_variable_pool, + graph_runtime_state=mock_graph_runtime_state, + ) diff --git a/api/tests/unit_tests/services/test_webhook_service.py b/api/tests/unit_tests/services/test_webhook_service.py index ffdcc046f9..ed0fa1b48f 100644 --- a/api/tests/unit_tests/services/test_webhook_service.py +++ b/api/tests/unit_tests/services/test_webhook_service.py @@ -5,8 +5,13 @@ import pytest from flask import Flask from werkzeug.datastructures import FileStorage +from configs import dify_config +from dify_graph.call_depth import build_workflow_call_depth_signature +from dify_graph.constants import WORKFLOW_CALL_DEPTH_HEADER, WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER from services.trigger.webhook_service import WebhookService +TEST_SECRET_KEY = "test-secret-key" + class TestWebhookServiceUnit: """Unit tests for WebhookService focusing on business logic without database dependencies.""" @@ -559,3 +564,206 @@ class TestWebhookServiceUnit: result = _prepare_webhook_execution("test_webhook", is_debug=True) assert result == (mock_trigger, mock_workflow, mock_config, mock_data, None) + + def test_extract_workflow_call_depth_defaults_to_zero_for_invalid_values(self): + assert WebhookService.extract_workflow_call_depth({}) == 0 + assert WebhookService.extract_workflow_call_depth({WORKFLOW_CALL_DEPTH_HEADER: "abc"}) == 0 + assert WebhookService.extract_workflow_call_depth({WORKFLOW_CALL_DEPTH_HEADER.lower(): "-1"}) == 0 + + def test_extract_workflow_call_depth_ignores_unsigned_external_header(self): + assert WebhookService.extract_workflow_call_depth({WORKFLOW_CALL_DEPTH_HEADER: "5"}) == 0 + + def test_extract_workflow_call_depth_honors_signed_internal_header(self): + app = Flask(__name__) + + with ( + patch("services.trigger.webhook_service.dify_config.SECRET_KEY", TEST_SECRET_KEY), + app.test_request_context("/triggers/webhook/test-webhook", method="POST"), + ): + signature = build_workflow_call_depth_signature( + secret_key=TEST_SECRET_KEY, + method="POST", + path="/triggers/webhook/test-webhook", + depth="4", + ) + + assert ( + WebhookService.extract_workflow_call_depth( + { + WORKFLOW_CALL_DEPTH_HEADER: "4", + WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER: signature, + } + ) + == 4 + ) + + def test_extract_workflow_call_depth_rejects_signature_for_other_path(self): + app = Flask(__name__) + + with ( + patch("services.trigger.webhook_service.dify_config.SECRET_KEY", TEST_SECRET_KEY), + app.test_request_context("/triggers/webhook/right-webhook", method="POST"), + ): + wrong_signature = build_workflow_call_depth_signature( + secret_key=TEST_SECRET_KEY, + method="POST", + path="/triggers/webhook/wrong-webhook", + depth="4", + ) + + assert ( + WebhookService.extract_workflow_call_depth( + { + WORKFLOW_CALL_DEPTH_HEADER: "4", + WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER: wrong_signature, + } + ) + == 0 + ) + + @patch("services.trigger.webhook_service.dify_config") + def test_extract_workflow_call_depth_honors_signature_with_empty_secret(self, mock_config): + app = Flask(__name__) + mock_config.SECRET_KEY = "" + + with app.test_request_context("/triggers/webhook/test-webhook", method="POST"): + signature = build_workflow_call_depth_signature( + secret_key="", + method="POST", + path="/triggers/webhook/test-webhook", + depth="4", + ) + + assert ( + WebhookService.extract_workflow_call_depth( + { + WORKFLOW_CALL_DEPTH_HEADER: "4", + WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER: signature, + } + ) + == 4 + ) + + @patch("services.trigger.webhook_service.QuotaType") + @patch("services.trigger.webhook_service.EndUserService") + @patch("services.trigger.webhook_service.AsyncWorkflowService") + @patch("services.trigger.webhook_service.Session") + @patch("services.trigger.webhook_service.db") + def test_trigger_workflow_execution_preserves_header_depth( + self, + mock_db, + mock_session, + mock_async_workflow_service, + mock_end_user_service, + mock_quota_type, + ): + app = Flask(__name__) + webhook_trigger = MagicMock(app_id="app", tenant_id="tenant", node_id="root", webhook_id="webhook") + workflow = MagicMock(id="workflow") + mock_end_user = MagicMock() + mock_end_user_service.get_or_create_end_user_by_type.return_value = mock_end_user + mock_db.engine = MagicMock() + mock_session.return_value.__enter__.return_value = MagicMock() + signature = build_workflow_call_depth_signature( + secret_key=TEST_SECRET_KEY, + method="POST", + path="/triggers/webhook/test-webhook", + depth="4", + ) + + with ( + patch("services.trigger.webhook_service.dify_config.SECRET_KEY", TEST_SECRET_KEY), + app.test_request_context( + "/triggers/webhook/test-webhook", + method="POST", + headers={ + WORKFLOW_CALL_DEPTH_HEADER: "4", + WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER: signature, + }, + ), + ): + WebhookService.trigger_workflow_execution( + webhook_trigger, + {"method": "POST", "headers": {}, "query_params": {}, "body": {}, "files": {}}, + workflow, + ) + + trigger_data = mock_async_workflow_service.trigger_workflow_async.call_args.args[2] + assert trigger_data.call_depth == 4 + + @patch("services.trigger.webhook_service.QuotaType") + @patch("services.trigger.webhook_service.EndUserService") + @patch("services.trigger.webhook_service.AsyncWorkflowService") + @patch("services.trigger.webhook_service.Session") + @patch("services.trigger.webhook_service.db") + def test_trigger_workflow_execution_ignores_spoofed_external_depth( + self, + mock_db, + mock_session, + mock_async_workflow_service, + mock_end_user_service, + mock_quota_type, + ): + app = Flask(__name__) + webhook_trigger = MagicMock(app_id="app", tenant_id="tenant", node_id="root", webhook_id="webhook") + workflow = MagicMock(id="workflow") + mock_end_user_service.get_or_create_end_user_by_type.return_value = MagicMock() + mock_db.engine = MagicMock() + mock_session.return_value.__enter__.return_value = MagicMock() + + with app.test_request_context( + "/triggers/webhook/test-webhook", + method="POST", + headers={WORKFLOW_CALL_DEPTH_HEADER: "5"}, + ): + WebhookService.trigger_workflow_execution( + webhook_trigger, + {"method": "POST", "headers": {}, "query_params": {}, "body": {}, "files": {}}, + workflow, + ) + + trigger_data = mock_async_workflow_service.trigger_workflow_async.call_args.args[2] + assert trigger_data.call_depth == 0 + + @patch("services.trigger.webhook_service.QuotaType") + @patch("services.trigger.webhook_service.EndUserService") + @patch("services.trigger.webhook_service.AsyncWorkflowService") + @patch("services.trigger.webhook_service.Session") + @patch("services.trigger.webhook_service.db") + def test_trigger_workflow_execution_rejects_signature_captured_from_non_webhook_request( + self, + mock_db, + mock_session, + mock_async_workflow_service, + mock_end_user_service, + mock_quota_type, + ): + app = Flask(__name__) + webhook_trigger = MagicMock(app_id="app", tenant_id="tenant", node_id="root", webhook_id="webhook") + workflow = MagicMock(id="workflow") + mock_end_user_service.get_or_create_end_user_by_type.return_value = MagicMock() + mock_db.engine = MagicMock() + mock_session.return_value.__enter__.return_value = MagicMock() + captured_signature = build_workflow_call_depth_signature( + secret_key=dify_config.SECRET_KEY, + method="GET", + path="/v1/external-endpoint", + depth="5", + ) + + with app.test_request_context( + "/triggers/webhook/test-webhook", + method="POST", + headers={ + WORKFLOW_CALL_DEPTH_HEADER: "5", + WORKFLOW_CALL_DEPTH_SIGNATURE_HEADER: captured_signature, + }, + ): + WebhookService.trigger_workflow_execution( + webhook_trigger, + {"method": "POST", "headers": {}, "query_params": {}, "body": {}, "files": {}}, + workflow, + ) + + trigger_data = mock_async_workflow_service.trigger_workflow_async.call_args.args[2] + assert trigger_data.call_depth == 0 diff --git a/api/tests/unit_tests/tasks/test_async_workflow_tasks.py b/api/tests/unit_tests/tasks/test_async_workflow_tasks.py index 0920f1482c..6cd928fc95 100644 --- a/api/tests/unit_tests/tasks/test_async_workflow_tasks.py +++ b/api/tests/unit_tests/tasks/test_async_workflow_tasks.py @@ -1,3 +1,5 @@ +from unittest.mock import MagicMock, patch + from core.app.apps.workflow.app_generator import SKIP_PREPARE_USER_INPUTS_KEY from services.workflow.entities import WebhookTriggerData from tasks import async_workflow_tasks @@ -16,3 +18,41 @@ def test_build_generator_args_sets_skip_flag_for_webhook(): assert args[SKIP_PREPARE_USER_INPUTS_KEY] is True assert args["inputs"]["webhook_data"]["body"]["foo"] == "bar" + + +def test_execute_workflow_common_uses_trigger_call_depth(): + trigger_data = WebhookTriggerData( + app_id="app", + tenant_id="tenant", + workflow_id="workflow", + root_node_id="node", + inputs={"webhook_data": {"body": {}}}, + call_depth=3, + ) + trigger_log = MagicMock( + id="log-id", + app_id="app", + workflow_id="workflow", + trigger_data=trigger_data.model_dump_json(), + ) + trigger_log_repo = MagicMock() + trigger_log_repo.get_by_id.return_value = trigger_log + session = MagicMock() + session.scalar.side_effect = [MagicMock(), MagicMock()] + session_context = MagicMock() + session_context.__enter__.return_value = session + workflow_generator = MagicMock() + + with ( + patch.object(async_workflow_tasks.session_factory, "create_session", return_value=session_context), + patch.object(async_workflow_tasks, "SQLAlchemyWorkflowTriggerLogRepository", return_value=trigger_log_repo), + patch.object(async_workflow_tasks, "_get_user", return_value=MagicMock()), + patch.object(async_workflow_tasks, "WorkflowAppGenerator", return_value=workflow_generator), + ): + async_workflow_tasks._execute_workflow_common( + async_workflow_tasks.WorkflowTaskData(workflow_trigger_log_id="log-id"), + MagicMock(), + MagicMock(), + ) + + assert workflow_generator.generate.call_args.kwargs["call_depth"] == 3