fix: test

This commit is contained in:
lyzno1 2025-10-30 19:49:08 +08:00
parent 3b37ae1b4e
commit 04bfa235a9
No known key found for this signature in database
1 changed files with 23 additions and 12 deletions

View File

@ -7,8 +7,9 @@ from faker import Faker
from flask import Flask
from werkzeug.datastructures import FileStorage
from models.enums import AppTriggerStatus, AppTriggerType
from models.model import App
from models.trigger import WorkflowWebhookTrigger
from models.trigger import AppTrigger, WorkflowWebhookTrigger
from models.workflow import Workflow
from services.account_service import AccountService, TenantService
from services.trigger.webhook_service import WebhookService
@ -134,6 +135,18 @@ class TestWebhookService:
created_by=account.id,
)
db_session_with_containers.add(webhook_trigger)
db_session_with_containers.flush()
# Create app trigger (required for non-debug mode)
app_trigger = AppTrigger(
tenant_id=tenant.id,
app_id=app.id,
node_id="webhook_node",
trigger_type=AppTriggerType.TRIGGER_WEBHOOK,
title="Test Webhook",
status=AppTriggerStatus.ENABLED,
)
db_session_with_containers.add(app_trigger)
db_session_with_containers.commit()
return {
@ -143,6 +156,7 @@ class TestWebhookService:
"workflow": workflow,
"webhook_trigger": webhook_trigger,
"webhook_id": webhook_id,
"app_trigger": app_trigger,
}
def test_get_webhook_trigger_and_workflow_success(self, test_data, flask_app_with_containers):
@ -436,23 +450,20 @@ class TestWebhookService:
# Verify AsyncWorkflowService was called
mock_external_dependencies["async_service"].trigger_workflow_async.assert_called_once()
def test_trigger_workflow_execution_no_tenant_owner(
def test_trigger_workflow_execution_end_user_service_failure(
self, test_data, mock_external_dependencies, flask_app_with_containers
):
"""Test workflow execution trigger when tenant owner not found."""
"""Test workflow execution trigger when EndUserService fails."""
webhook_data = {"method": "POST", "headers": {}, "query_params": {}, "body": {}, "files": {}}
with flask_app_with_containers.app_context():
# Mock tenant owner lookup to return None
with (
patch("services.trigger.webhook_service.select") as mock_select,
patch("services.trigger.webhook_service.Session") as mock_session,
):
mock_session_instance = MagicMock()
mock_session.return_value.__enter__.return_value = mock_session_instance
mock_session_instance.scalar.return_value = None
# Mock EndUserService to raise an exception
with patch(
"services.trigger.webhook_service.EndUserService.get_or_create_end_user_by_type"
) as mock_end_user:
mock_end_user.side_effect = ValueError("Failed to create end user")
with pytest.raises(ValueError, match="Tenant owner not found"):
with pytest.raises(ValueError, match="Failed to create end user"):
WebhookService.trigger_workflow_execution(
test_data["webhook_trigger"], webhook_data, test_data["workflow"]
)