test: migrate webhook service additional mock tests to testcontainers (#35199)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
James 2026-04-15 10:14:36 +02:00 committed by GitHub
parent 5542329554
commit 98897a5379
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 510 additions and 382 deletions

View File

@ -0,0 +1,507 @@
from __future__ import annotations
import json
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from uuid import uuid4
import pytest
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.trigger.constants import TRIGGER_WEBHOOK_NODE_TYPE
from models.account import Account, Tenant, TenantAccountJoin, TenantAccountRole
from models.enums import AppTriggerStatus, AppTriggerType
from models.model import App
from models.trigger import AppTrigger, WorkflowWebhookTrigger
from models.workflow import Workflow
from services.errors.app import QuotaExceededError
from services.trigger.webhook_service import WebhookService
class WebhookServiceRelationshipFactory:
@staticmethod
def create_account_and_tenant(db_session_with_containers: Session) -> tuple[Account, Tenant]:
account = Account(
name=f"Account {uuid4()}",
email=f"webhook-{uuid4()}@example.com",
password="hashed-password",
password_salt="salt",
interface_language="en-US",
timezone="UTC",
)
db_session_with_containers.add(account)
db_session_with_containers.commit()
tenant = Tenant(name=f"Tenant {uuid4()}", plan="basic", status="normal")
db_session_with_containers.add(tenant)
db_session_with_containers.commit()
join = TenantAccountJoin(
tenant_id=tenant.id,
account_id=account.id,
role=TenantAccountRole.OWNER,
current=True,
)
db_session_with_containers.add(join)
db_session_with_containers.commit()
account.current_tenant = tenant
return account, tenant
@staticmethod
def create_app(db_session_with_containers: Session, tenant: Tenant, account: Account) -> App:
app = App(
tenant_id=tenant.id,
name=f"Webhook App {uuid4()}",
description="",
mode="workflow",
icon_type="emoji",
icon="bot",
icon_background="#FFFFFF",
enable_site=False,
enable_api=True,
api_rpm=100,
api_rph=100,
is_demo=False,
is_public=False,
is_universal=False,
created_by=account.id,
updated_by=account.id,
)
db_session_with_containers.add(app)
db_session_with_containers.commit()
return app
@staticmethod
def create_workflow(
db_session_with_containers: Session,
*,
app: App,
account: Account,
node_ids: list[str],
version: str,
) -> Workflow:
graph = {
"nodes": [
{
"id": node_id,
"data": {
"type": TRIGGER_WEBHOOK_NODE_TYPE,
"title": f"Webhook {node_id}",
"method": "post",
"content_type": "application/json",
"headers": [],
"params": [],
"body": [],
"status_code": 200,
"response_body": '{"status": "ok"}',
"timeout": 30,
},
}
for node_id in node_ids
],
"edges": [],
}
workflow = Workflow(
tenant_id=app.tenant_id,
app_id=app.id,
type="workflow",
graph=json.dumps(graph),
features=json.dumps({}),
created_by=account.id,
updated_by=account.id,
environment_variables=[],
conversation_variables=[],
version=version,
)
db_session_with_containers.add(workflow)
db_session_with_containers.commit()
return workflow
@staticmethod
def create_webhook_trigger(
db_session_with_containers: Session,
*,
app: App,
account: Account,
node_id: str,
webhook_id: str | None = None,
) -> WorkflowWebhookTrigger:
webhook_trigger = WorkflowWebhookTrigger(
app_id=app.id,
node_id=node_id,
tenant_id=app.tenant_id,
webhook_id=webhook_id or uuid4().hex[:24],
created_by=account.id,
)
db_session_with_containers.add(webhook_trigger)
db_session_with_containers.commit()
return webhook_trigger
@staticmethod
def create_app_trigger(
db_session_with_containers: Session,
*,
app: App,
node_id: str,
status: AppTriggerStatus,
) -> AppTrigger:
app_trigger = AppTrigger(
tenant_id=app.tenant_id,
app_id=app.id,
node_id=node_id,
trigger_type=AppTriggerType.TRIGGER_WEBHOOK,
provider_name="webhook",
title=f"Webhook {node_id}",
status=status,
)
db_session_with_containers.add(app_trigger)
db_session_with_containers.commit()
return app_trigger
class TestWebhookServiceLookupWithContainers:
def test_get_webhook_trigger_and_workflow_raises_when_app_trigger_missing(
self, db_session_with_containers: Session, flask_app_with_containers
):
del flask_app_with_containers
factory = WebhookServiceRelationshipFactory
account, tenant = factory.create_account_and_tenant(db_session_with_containers)
app = factory.create_app(db_session_with_containers, tenant, account)
factory.create_workflow(
db_session_with_containers, app=app, account=account, node_ids=["node-1"], version="2026-04-14.001"
)
webhook_trigger = factory.create_webhook_trigger(
db_session_with_containers, app=app, account=account, node_id="node-1"
)
with pytest.raises(ValueError, match="App trigger not found"):
WebhookService.get_webhook_trigger_and_workflow(webhook_trigger.webhook_id)
def test_get_webhook_trigger_and_workflow_raises_when_app_trigger_rate_limited(
self, db_session_with_containers: Session, flask_app_with_containers
):
del flask_app_with_containers
factory = WebhookServiceRelationshipFactory
account, tenant = factory.create_account_and_tenant(db_session_with_containers)
app = factory.create_app(db_session_with_containers, tenant, account)
factory.create_workflow(
db_session_with_containers, app=app, account=account, node_ids=["node-1"], version="2026-04-14.001"
)
webhook_trigger = factory.create_webhook_trigger(
db_session_with_containers, app=app, account=account, node_id="node-1"
)
factory.create_app_trigger(
db_session_with_containers, app=app, node_id="node-1", status=AppTriggerStatus.RATE_LIMITED
)
with pytest.raises(ValueError, match="rate limited"):
WebhookService.get_webhook_trigger_and_workflow(webhook_trigger.webhook_id)
def test_get_webhook_trigger_and_workflow_raises_when_app_trigger_disabled(
self, db_session_with_containers: Session, flask_app_with_containers
):
del flask_app_with_containers
factory = WebhookServiceRelationshipFactory
account, tenant = factory.create_account_and_tenant(db_session_with_containers)
app = factory.create_app(db_session_with_containers, tenant, account)
factory.create_workflow(
db_session_with_containers, app=app, account=account, node_ids=["node-1"], version="2026-04-14.001"
)
webhook_trigger = factory.create_webhook_trigger(
db_session_with_containers, app=app, account=account, node_id="node-1"
)
factory.create_app_trigger(
db_session_with_containers, app=app, node_id="node-1", status=AppTriggerStatus.DISABLED
)
with pytest.raises(ValueError, match="disabled"):
WebhookService.get_webhook_trigger_and_workflow(webhook_trigger.webhook_id)
def test_get_webhook_trigger_and_workflow_raises_when_workflow_missing(
self, db_session_with_containers: Session, flask_app_with_containers
):
del flask_app_with_containers
factory = WebhookServiceRelationshipFactory
account, tenant = factory.create_account_and_tenant(db_session_with_containers)
app = factory.create_app(db_session_with_containers, tenant, account)
webhook_trigger = factory.create_webhook_trigger(
db_session_with_containers, app=app, account=account, node_id="node-1"
)
factory.create_app_trigger(
db_session_with_containers, app=app, node_id="node-1", status=AppTriggerStatus.ENABLED
)
with pytest.raises(ValueError, match="Workflow not found"):
WebhookService.get_webhook_trigger_and_workflow(webhook_trigger.webhook_id)
def test_get_webhook_trigger_and_workflow_returns_debug_draft_workflow(
self, db_session_with_containers: Session, flask_app_with_containers
):
del flask_app_with_containers
factory = WebhookServiceRelationshipFactory
account, tenant = factory.create_account_and_tenant(db_session_with_containers)
app = factory.create_app(db_session_with_containers, tenant, account)
factory.create_workflow(
db_session_with_containers,
app=app,
account=account,
node_ids=["published-node"],
version="2026-04-14.001",
)
draft_workflow = factory.create_workflow(
db_session_with_containers,
app=app,
account=account,
node_ids=["debug-node"],
version=Workflow.VERSION_DRAFT,
)
webhook_trigger = factory.create_webhook_trigger(
db_session_with_containers, app=app, account=account, node_id="debug-node"
)
got_trigger, got_workflow, got_node_config = WebhookService.get_webhook_trigger_and_workflow(
webhook_trigger.webhook_id,
is_debug=True,
)
assert got_trigger.id == webhook_trigger.id
assert got_workflow.id == draft_workflow.id
assert got_node_config["id"] == "debug-node"
class TestWebhookServiceTriggerExecutionWithContainers:
def test_trigger_workflow_execution_triggers_async_workflow_successfully(
self, db_session_with_containers: Session, flask_app_with_containers
):
del flask_app_with_containers
factory = WebhookServiceRelationshipFactory
account, tenant = factory.create_account_and_tenant(db_session_with_containers)
app = factory.create_app(db_session_with_containers, tenant, account)
workflow = factory.create_workflow(
db_session_with_containers, app=app, account=account, node_ids=["node-1"], version="2026-04-14.001"
)
webhook_trigger = factory.create_webhook_trigger(
db_session_with_containers, app=app, account=account, node_id="node-1"
)
end_user = SimpleNamespace(id=str(uuid4()))
webhook_data = {"body": {"value": 1}, "headers": {}, "query_params": {}, "files": {}, "method": "POST"}
with (
patch(
"services.trigger.webhook_service.EndUserService.get_or_create_end_user_by_type",
return_value=end_user,
),
patch("services.trigger.webhook_service.QuotaType.TRIGGER.consume") as mock_consume,
patch("services.trigger.webhook_service.AsyncWorkflowService.trigger_workflow_async") as mock_trigger,
):
WebhookService.trigger_workflow_execution(webhook_trigger, webhook_data, workflow)
mock_consume.assert_called_once_with(webhook_trigger.tenant_id)
mock_trigger.assert_called_once()
trigger_args = mock_trigger.call_args.args
assert trigger_args[1] is end_user
assert trigger_args[2].workflow_id == workflow.id
assert trigger_args[2].root_node_id == webhook_trigger.node_id
def test_trigger_workflow_execution_marks_tenant_rate_limited_when_quota_exceeded(
self, db_session_with_containers: Session, flask_app_with_containers
):
del flask_app_with_containers
factory = WebhookServiceRelationshipFactory
account, tenant = factory.create_account_and_tenant(db_session_with_containers)
app = factory.create_app(db_session_with_containers, tenant, account)
workflow = factory.create_workflow(
db_session_with_containers, app=app, account=account, node_ids=["node-1"], version="2026-04-14.001"
)
webhook_trigger = factory.create_webhook_trigger(
db_session_with_containers, app=app, account=account, node_id="node-1"
)
with (
patch(
"services.trigger.webhook_service.EndUserService.get_or_create_end_user_by_type",
return_value=SimpleNamespace(id=str(uuid4())),
),
patch(
"services.trigger.webhook_service.QuotaType.TRIGGER.consume",
side_effect=QuotaExceededError(feature="trigger", tenant_id=tenant.id, required=1),
),
patch(
"services.trigger.webhook_service.AppTriggerService.mark_tenant_triggers_rate_limited"
) as mock_mark_rate_limited,
):
with pytest.raises(QuotaExceededError):
WebhookService.trigger_workflow_execution(
webhook_trigger,
{"body": {}, "headers": {}, "query_params": {}, "files": {}, "method": "POST"},
workflow,
)
mock_mark_rate_limited.assert_called_once_with(tenant.id)
def test_trigger_workflow_execution_logs_and_reraises_unexpected_errors(
self, db_session_with_containers: Session, flask_app_with_containers
):
del flask_app_with_containers
factory = WebhookServiceRelationshipFactory
account, tenant = factory.create_account_and_tenant(db_session_with_containers)
app = factory.create_app(db_session_with_containers, tenant, account)
workflow = factory.create_workflow(
db_session_with_containers, app=app, account=account, node_ids=["node-1"], version="2026-04-14.001"
)
webhook_trigger = factory.create_webhook_trigger(
db_session_with_containers, app=app, account=account, node_id="node-1"
)
with (
patch(
"services.trigger.webhook_service.EndUserService.get_or_create_end_user_by_type",
side_effect=RuntimeError("boom"),
),
patch("services.trigger.webhook_service.logger.exception") as mock_logger_exception,
):
with pytest.raises(RuntimeError, match="boom"):
WebhookService.trigger_workflow_execution(
webhook_trigger,
{"body": {}, "headers": {}, "query_params": {}, "files": {}, "method": "POST"},
workflow,
)
mock_logger_exception.assert_called_once()
class TestWebhookServiceRelationshipSyncWithContainers:
def test_sync_webhook_relationships_raises_when_workflow_exceeds_node_limit(
self, db_session_with_containers: Session, flask_app_with_containers
):
del flask_app_with_containers
factory = WebhookServiceRelationshipFactory
account, tenant = factory.create_account_and_tenant(db_session_with_containers)
app = factory.create_app(db_session_with_containers, tenant, account)
node_ids = [f"node-{index}" for index in range(WebhookService.MAX_WEBHOOK_NODES_PER_WORKFLOW + 1)]
workflow = factory.create_workflow(
db_session_with_containers, app=app, account=account, node_ids=node_ids, version=Workflow.VERSION_DRAFT
)
with pytest.raises(ValueError, match="maximum webhook node limit"):
WebhookService.sync_webhook_relationships(app, workflow)
def test_sync_webhook_relationships_raises_when_lock_not_acquired(
self, db_session_with_containers: Session, flask_app_with_containers
):
del flask_app_with_containers
factory = WebhookServiceRelationshipFactory
account, tenant = factory.create_account_and_tenant(db_session_with_containers)
app = factory.create_app(db_session_with_containers, tenant, account)
workflow = factory.create_workflow(
db_session_with_containers, app=app, account=account, node_ids=["node-1"], version=Workflow.VERSION_DRAFT
)
lock = MagicMock()
lock.acquire.return_value = False
with patch("services.trigger.webhook_service.redis_client.lock", return_value=lock):
with pytest.raises(RuntimeError, match="Failed to acquire lock"):
WebhookService.sync_webhook_relationships(app, workflow)
def test_sync_webhook_relationships_creates_missing_records_and_deletes_stale_records(
self, db_session_with_containers: Session, flask_app_with_containers
):
del flask_app_with_containers
factory = WebhookServiceRelationshipFactory
account, tenant = factory.create_account_and_tenant(db_session_with_containers)
app = factory.create_app(db_session_with_containers, tenant, account)
stale_trigger = factory.create_webhook_trigger(
db_session_with_containers,
app=app,
account=account,
node_id="node-stale",
webhook_id="stale-webhook-id-000001",
)
stale_trigger_id = stale_trigger.id
workflow = factory.create_workflow(
db_session_with_containers,
app=app,
account=account,
node_ids=["node-new"],
version=Workflow.VERSION_DRAFT,
)
with patch(
"services.trigger.webhook_service.WebhookService.generate_webhook_id", return_value="new-webhook-id-000001"
):
WebhookService.sync_webhook_relationships(app, workflow)
db_session_with_containers.expire_all()
records = db_session_with_containers.scalars(
select(WorkflowWebhookTrigger).where(WorkflowWebhookTrigger.app_id == app.id)
).all()
assert [record.node_id for record in records] == ["node-new"]
assert records[0].webhook_id == "new-webhook-id-000001"
assert db_session_with_containers.get(WorkflowWebhookTrigger, stale_trigger_id) is None
def test_sync_webhook_relationships_sets_redis_cache_for_new_record(
self, db_session_with_containers: Session, flask_app_with_containers
):
del flask_app_with_containers
factory = WebhookServiceRelationshipFactory
account, tenant = factory.create_account_and_tenant(db_session_with_containers)
app = factory.create_app(db_session_with_containers, tenant, account)
workflow = factory.create_workflow(
db_session_with_containers,
app=app,
account=account,
node_ids=["node-cache"],
version=Workflow.VERSION_DRAFT,
)
cache_key = f"{WebhookService.__WEBHOOK_NODE_CACHE_KEY__}:{app.id}:node-cache"
with patch(
"services.trigger.webhook_service.WebhookService.generate_webhook_id", return_value="cache-webhook-id-00001"
):
WebhookService.sync_webhook_relationships(app, workflow)
cached_payload = WebhookServiceRelationshipFactory._read_cache(cache_key)
assert cached_payload is not None
assert cached_payload["node_id"] == "node-cache"
assert cached_payload["webhook_id"] == "cache-webhook-id-00001"
def test_sync_webhook_relationships_logs_when_lock_release_fails(
self, db_session_with_containers: Session, flask_app_with_containers
):
del flask_app_with_containers
factory = WebhookServiceRelationshipFactory
account, tenant = factory.create_account_and_tenant(db_session_with_containers)
app = factory.create_app(db_session_with_containers, tenant, account)
workflow = factory.create_workflow(
db_session_with_containers, app=app, account=account, node_ids=[], version=Workflow.VERSION_DRAFT
)
lock = MagicMock()
lock.acquire.return_value = True
lock.release.side_effect = RuntimeError("release failed")
with (
patch("services.trigger.webhook_service.redis_client.lock", return_value=lock),
patch("services.trigger.webhook_service.logger.exception") as mock_logger_exception,
):
WebhookService.sync_webhook_relationships(app, workflow)
mock_logger_exception.assert_called_once()
def _read_cache(cache_key: str) -> dict[str, str] | None:
from extensions.ext_redis import redis_client
cached = redis_client.get(cache_key)
if not cached:
return None
if isinstance(cached, bytes):
cached = cached.decode("utf-8")
return json.loads(cached)
WebhookServiceRelationshipFactory._read_cache = staticmethod(_read_cache)

View File

@ -1,5 +1,5 @@
from types import SimpleNamespace
from typing import Any, cast
from typing import Any
from unittest.mock import MagicMock
import pytest
@ -13,11 +13,6 @@ from core.workflow.nodes.trigger_webhook.entities import (
WebhookData,
WebhookParameter,
)
from models.enums import AppTriggerStatus
from models.model import App
from models.trigger import WorkflowWebhookTrigger
from models.workflow import Workflow
from services.errors.app import QuotaExceededError
from services.trigger import webhook_service as service_module
from services.trigger.webhook_service import WebhookService
@ -39,156 +34,13 @@ class _FakeQuery:
return self._result
class _SessionContext:
def __init__(self, session: Any) -> None:
self._session = session
def __enter__(self) -> Any:
return self._session
def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> bool:
return False
class _SessionmakerContext:
def __init__(self, session: Any) -> None:
self._session = session
def begin(self) -> "_SessionmakerContext":
return self
def __enter__(self) -> Any:
return self._session
def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> bool:
return False
@pytest.fixture
def flask_app() -> Flask:
return Flask(__name__)
def _patch_session(monkeypatch: pytest.MonkeyPatch, session: Any) -> None:
monkeypatch.setattr(service_module, "db", SimpleNamespace(engine=MagicMock(), session=MagicMock()))
monkeypatch.setattr(service_module, "Session", lambda *args, **kwargs: _SessionContext(session))
monkeypatch.setattr(service_module, "sessionmaker", lambda *args, **kwargs: _SessionmakerContext(session))
def _workflow_trigger(**kwargs: Any) -> WorkflowWebhookTrigger:
return cast(WorkflowWebhookTrigger, SimpleNamespace(**kwargs))
def _workflow(**kwargs: Any) -> Workflow:
return cast(Workflow, SimpleNamespace(**kwargs))
def _app(**kwargs: Any) -> App:
return cast(App, SimpleNamespace(**kwargs))
class TestWebhookServiceLookup:
def test_get_webhook_trigger_and_workflow_should_raise_when_webhook_not_found(
self,
monkeypatch: pytest.MonkeyPatch,
) -> None:
fake_session = MagicMock()
fake_session.scalar.return_value = None
_patch_session(monkeypatch, fake_session)
with pytest.raises(ValueError, match="Webhook not found"):
WebhookService.get_webhook_trigger_and_workflow("webhook-1")
def test_get_webhook_trigger_and_workflow_should_raise_when_app_trigger_not_found(
self,
monkeypatch: pytest.MonkeyPatch,
) -> None:
webhook_trigger = SimpleNamespace(app_id="app-1", node_id="node-1")
fake_session = MagicMock()
fake_session.scalar.side_effect = [webhook_trigger, None]
_patch_session(monkeypatch, fake_session)
with pytest.raises(ValueError, match="App trigger not found"):
WebhookService.get_webhook_trigger_and_workflow("webhook-1")
def test_get_webhook_trigger_and_workflow_should_raise_when_app_trigger_rate_limited(
self,
monkeypatch: pytest.MonkeyPatch,
) -> None:
webhook_trigger = SimpleNamespace(app_id="app-1", node_id="node-1")
app_trigger = SimpleNamespace(status=AppTriggerStatus.RATE_LIMITED)
fake_session = MagicMock()
fake_session.scalar.side_effect = [webhook_trigger, app_trigger]
_patch_session(monkeypatch, fake_session)
with pytest.raises(ValueError, match="rate limited"):
WebhookService.get_webhook_trigger_and_workflow("webhook-1")
def test_get_webhook_trigger_and_workflow_should_raise_when_app_trigger_disabled(
self,
monkeypatch: pytest.MonkeyPatch,
) -> None:
webhook_trigger = SimpleNamespace(app_id="app-1", node_id="node-1")
app_trigger = SimpleNamespace(status=AppTriggerStatus.DISABLED)
fake_session = MagicMock()
fake_session.scalar.side_effect = [webhook_trigger, app_trigger]
_patch_session(monkeypatch, fake_session)
with pytest.raises(ValueError, match="disabled"):
WebhookService.get_webhook_trigger_and_workflow("webhook-1")
def test_get_webhook_trigger_and_workflow_should_raise_when_workflow_not_found(
self,
monkeypatch: pytest.MonkeyPatch,
) -> None:
webhook_trigger = SimpleNamespace(app_id="app-1", node_id="node-1")
app_trigger = SimpleNamespace(status=AppTriggerStatus.ENABLED)
fake_session = MagicMock()
fake_session.scalar.side_effect = [webhook_trigger, app_trigger, None]
_patch_session(monkeypatch, fake_session)
with pytest.raises(ValueError, match="Workflow not found"):
WebhookService.get_webhook_trigger_and_workflow("webhook-1")
def test_get_webhook_trigger_and_workflow_should_return_values_for_non_debug_mode(
self,
monkeypatch: pytest.MonkeyPatch,
) -> None:
webhook_trigger = SimpleNamespace(app_id="app-1", node_id="node-1")
app_trigger = SimpleNamespace(status=AppTriggerStatus.ENABLED)
workflow = MagicMock()
workflow.get_node_config_by_id.return_value = {"data": {"key": "value"}}
fake_session = MagicMock()
fake_session.scalar.side_effect = [webhook_trigger, app_trigger, workflow]
_patch_session(monkeypatch, fake_session)
got_trigger, got_workflow, got_node_config = WebhookService.get_webhook_trigger_and_workflow("webhook-1")
assert got_trigger is webhook_trigger
assert got_workflow is workflow
assert got_node_config == {"data": {"key": "value"}}
def test_get_webhook_trigger_and_workflow_should_return_values_for_debug_mode(
self,
monkeypatch: pytest.MonkeyPatch,
) -> None:
webhook_trigger = SimpleNamespace(app_id="app-1", node_id="node-1")
workflow = MagicMock()
workflow.get_node_config_by_id.return_value = {"data": {"mode": "debug"}}
fake_session = MagicMock()
fake_session.scalar.side_effect = [webhook_trigger, workflow]
_patch_session(monkeypatch, fake_session)
got_trigger, got_workflow, got_node_config = WebhookService.get_webhook_trigger_and_workflow(
"webhook-1",
is_debug=True,
)
assert got_trigger is webhook_trigger
assert got_workflow is workflow
assert got_node_config == {"data": {"mode": "debug"}}
def _workflow_trigger(**kwargs: Any) -> Any:
return SimpleNamespace(**kwargs)
class TestWebhookServiceExtractionFallbacks:
@ -420,237 +272,6 @@ class TestWebhookServiceValidationAndConversion:
assert result["webhook_body"] == {"b": 2}
class TestWebhookServiceExecutionAndSync:
def test_trigger_workflow_execution_should_trigger_async_workflow_successfully(
self,
monkeypatch: pytest.MonkeyPatch,
) -> None:
webhook_trigger = _workflow_trigger(
app_id="app-1",
node_id="node-1",
tenant_id="tenant-1",
webhook_id="webhook-1",
)
workflow = _workflow(id="wf-1")
webhook_data = {"body": {"x": 1}}
session = MagicMock()
_patch_session(monkeypatch, session)
end_user = SimpleNamespace(id="end-user-1")
monkeypatch.setattr(
service_module.EndUserService,
"get_or_create_end_user_by_type",
MagicMock(return_value=end_user),
)
quota_type = SimpleNamespace(TRIGGER=SimpleNamespace(consume=MagicMock()))
monkeypatch.setattr(service_module, "QuotaType", quota_type)
trigger_async_mock = MagicMock()
monkeypatch.setattr(service_module.AsyncWorkflowService, "trigger_workflow_async", trigger_async_mock)
WebhookService.trigger_workflow_execution(webhook_trigger, webhook_data, workflow)
trigger_async_mock.assert_called_once()
def test_trigger_workflow_execution_should_mark_tenant_rate_limited_when_quota_exceeded(
self,
monkeypatch: pytest.MonkeyPatch,
) -> None:
webhook_trigger = _workflow_trigger(
app_id="app-1",
node_id="node-1",
tenant_id="tenant-1",
webhook_id="webhook-1",
)
workflow = _workflow(id="wf-1")
session = MagicMock()
_patch_session(monkeypatch, session)
monkeypatch.setattr(
service_module.EndUserService,
"get_or_create_end_user_by_type",
MagicMock(return_value=SimpleNamespace(id="end-user-1")),
)
quota_type = SimpleNamespace(
TRIGGER=SimpleNamespace(
consume=MagicMock(side_effect=QuotaExceededError(feature="trigger", tenant_id="tenant-1", required=1))
)
)
monkeypatch.setattr(service_module, "QuotaType", quota_type)
mark_rate_limited_mock = MagicMock()
monkeypatch.setattr(
service_module.AppTriggerService, "mark_tenant_triggers_rate_limited", mark_rate_limited_mock
)
with pytest.raises(QuotaExceededError):
WebhookService.trigger_workflow_execution(webhook_trigger, {"body": {}}, workflow)
mark_rate_limited_mock.assert_called_once_with("tenant-1")
def test_trigger_workflow_execution_should_log_and_reraise_unexpected_errors(
self,
monkeypatch: pytest.MonkeyPatch,
) -> None:
webhook_trigger = _workflow_trigger(
app_id="app-1",
node_id="node-1",
tenant_id="tenant-1",
webhook_id="webhook-1",
)
workflow = _workflow(id="wf-1")
session = MagicMock()
_patch_session(monkeypatch, session)
monkeypatch.setattr(
service_module.EndUserService,
"get_or_create_end_user_by_type",
MagicMock(side_effect=RuntimeError("boom")),
)
logger_exception_mock = MagicMock()
monkeypatch.setattr(service_module.logger, "exception", logger_exception_mock)
with pytest.raises(RuntimeError, match="boom"):
WebhookService.trigger_workflow_execution(webhook_trigger, {"body": {}}, workflow)
logger_exception_mock.assert_called_once()
def test_sync_webhook_relationships_should_raise_when_workflow_exceeds_node_limit(self) -> None:
app = _app(id="app-1", tenant_id="tenant-1", created_by="user-1")
workflow = _workflow(
walk_nodes=lambda _node_type: [
(f"node-{i}", {}) for i in range(WebhookService.MAX_WEBHOOK_NODES_PER_WORKFLOW + 1)
]
)
with pytest.raises(ValueError, match="maximum webhook node limit"):
WebhookService.sync_webhook_relationships(app, workflow)
def test_sync_webhook_relationships_should_raise_when_lock_not_acquired(
self,
monkeypatch: pytest.MonkeyPatch,
) -> None:
app = _app(id="app-1", tenant_id="tenant-1", created_by="user-1")
workflow = _workflow(walk_nodes=lambda _node_type: [("node-1", {})])
lock = MagicMock()
lock.acquire.return_value = False
monkeypatch.setattr(service_module.redis_client, "get", MagicMock(return_value=None))
monkeypatch.setattr(service_module.redis_client, "lock", MagicMock(return_value=lock))
with pytest.raises(RuntimeError, match="Failed to acquire lock"):
WebhookService.sync_webhook_relationships(app, workflow)
def test_sync_webhook_relationships_should_create_missing_records_and_delete_stale_records(
self,
monkeypatch: pytest.MonkeyPatch,
) -> None:
app = _app(id="app-1", tenant_id="tenant-1", created_by="user-1")
workflow = _workflow(walk_nodes=lambda _node_type: [("node-new", {})])
class _WorkflowWebhookTrigger:
app_id = "app_id"
tenant_id = "tenant_id"
webhook_id = "webhook_id"
node_id = "node_id"
def __init__(self, app_id: str, tenant_id: str, node_id: str, webhook_id: str, created_by: str) -> None:
self.id = None
self.app_id = app_id
self.tenant_id = tenant_id
self.node_id = node_id
self.webhook_id = webhook_id
self.created_by = created_by
class _Select:
def where(self, *args: Any, **kwargs: Any) -> "_Select":
return self
class _Session:
def __init__(self) -> None:
self.added: list[Any] = []
self.deleted: list[Any] = []
self.commit_count = 0
self.existing_records = [SimpleNamespace(node_id="node-stale")]
def scalars(self, _stmt: Any) -> Any:
return SimpleNamespace(all=lambda: self.existing_records)
def add(self, obj: Any) -> None:
self.added.append(obj)
def flush(self) -> None:
for idx, obj in enumerate(self.added, start=1):
if obj.id is None:
obj.id = f"rec-{idx}"
def commit(self) -> None:
self.commit_count += 1
def delete(self, obj: Any) -> None:
self.deleted.append(obj)
lock = MagicMock()
lock.acquire.return_value = True
lock.release.return_value = None
fake_session = _Session()
monkeypatch.setattr(service_module, "WorkflowWebhookTrigger", _WorkflowWebhookTrigger)
monkeypatch.setattr(service_module, "select", MagicMock(return_value=_Select()))
monkeypatch.setattr(service_module.redis_client, "get", MagicMock(return_value=None))
monkeypatch.setattr(service_module.redis_client, "lock", MagicMock(return_value=lock))
redis_set_mock = MagicMock()
redis_delete_mock = MagicMock()
monkeypatch.setattr(service_module.redis_client, "set", redis_set_mock)
monkeypatch.setattr(service_module.redis_client, "delete", redis_delete_mock)
monkeypatch.setattr(WebhookService, "generate_webhook_id", MagicMock(return_value="generated-webhook-id"))
_patch_session(monkeypatch, fake_session)
WebhookService.sync_webhook_relationships(app, workflow)
assert len(fake_session.added) == 1
assert len(fake_session.deleted) == 1
redis_set_mock.assert_called_once()
redis_delete_mock.assert_called_once()
lock.release.assert_called_once()
def test_sync_webhook_relationships_should_log_when_lock_release_fails(
self,
monkeypatch: pytest.MonkeyPatch,
) -> None:
app = _app(id="app-1", tenant_id="tenant-1", created_by="user-1")
workflow = _workflow(walk_nodes=lambda _node_type: [])
class _Select:
def where(self, *args: Any, **kwargs: Any) -> "_Select":
return self
class _Session:
def scalars(self, _stmt: Any) -> Any:
return SimpleNamespace(all=lambda: [])
def commit(self) -> None:
return None
lock = MagicMock()
lock.acquire.return_value = True
lock.release.side_effect = RuntimeError("release failed")
logger_exception_mock = MagicMock()
monkeypatch.setattr(service_module, "select", MagicMock(return_value=_Select()))
monkeypatch.setattr(service_module.redis_client, "get", MagicMock(return_value=None))
monkeypatch.setattr(service_module.redis_client, "lock", MagicMock(return_value=lock))
monkeypatch.setattr(service_module.logger, "exception", logger_exception_mock)
_patch_session(monkeypatch, _Session())
WebhookService.sync_webhook_relationships(app, workflow)
assert logger_exception_mock.call_count == 1
class TestWebhookServiceUtilities:
def test_generate_webhook_response_should_fallback_when_response_body_is_not_json(self) -> None:
node_config = {"data": {"status_code": 200, "response_body": "{bad-json"}}