diff --git a/api/tests/test_containers_integration_tests/services/test_schedule_service.py b/api/tests/test_containers_integration_tests/services/test_schedule_service.py new file mode 100644 index 0000000000..87f3306258 --- /dev/null +++ b/api/tests/test_containers_integration_tests/services/test_schedule_service.py @@ -0,0 +1,387 @@ +"""Testcontainers integration tests for schedule service SQL-backed behavior.""" + +from datetime import datetime +from types import SimpleNamespace +from uuid import uuid4 + +import pytest +from sqlalchemy import delete, select +from sqlalchemy.orm import Session + +from core.workflow.nodes.trigger_schedule.entities import ScheduleConfig, SchedulePlanUpdate +from core.workflow.nodes.trigger_schedule.exc import ScheduleNotFoundError +from events.event_handlers.sync_workflow_schedule_when_app_published import sync_schedule_from_workflow +from models.account import Account, Tenant, TenantAccountJoin, TenantAccountRole +from models.trigger import WorkflowSchedulePlan +from services.errors.account import AccountNotFoundError +from services.trigger.schedule_service import ScheduleService + + +class ScheduleServiceIntegrationFactory: + @staticmethod + def create_account_with_tenant( + db_session_with_containers: Session, + role: TenantAccountRole = TenantAccountRole.OWNER, + ) -> tuple[Account, Tenant]: + account = Account( + email=f"{uuid4()}@example.com", + name=f"user-{uuid4()}", + interface_language="en-US", + status="active", + ) + tenant = Tenant(name=f"tenant-{uuid4()}", status="normal") + db_session_with_containers.add_all([account, tenant]) + db_session_with_containers.flush() + + join = TenantAccountJoin( + tenant_id=tenant.id, + account_id=account.id, + role=role, + current=True, + ) + db_session_with_containers.add(join) + db_session_with_containers.commit() + + account.current_tenant = tenant + return account, tenant + + @staticmethod + def create_schedule_plan( + db_session_with_containers: Session, + *, + tenant_id: str, + app_id: str | None = None, + node_id: str = "start", + cron_expression: str = "30 10 * * *", + timezone: str = "UTC", + next_run_at: datetime | None = None, + ) -> WorkflowSchedulePlan: + schedule = WorkflowSchedulePlan( + tenant_id=tenant_id, + app_id=app_id or str(uuid4()), + node_id=node_id, + cron_expression=cron_expression, + timezone=timezone, + next_run_at=next_run_at, + ) + db_session_with_containers.add(schedule) + db_session_with_containers.commit() + return schedule + + +def _cron_workflow( + *, + node_id: str = "start", + cron_expression: str = "30 10 * * *", + timezone: str = "UTC", +): + return SimpleNamespace( + graph_dict={ + "nodes": [ + { + "id": node_id, + "data": { + "type": "trigger-schedule", + "mode": "cron", + "cron_expression": cron_expression, + "timezone": timezone, + }, + } + ] + } + ) + + +def _no_schedule_workflow(): + return SimpleNamespace( + graph_dict={ + "nodes": [ + { + "id": "node-1", + "data": {"type": "llm"}, + } + ] + } + ) + + +class TestScheduleServiceIntegration: + def test_create_schedule_persists_schedule(self, db_session_with_containers: Session): + account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers) + expected_next_run = datetime(2026, 1, 1, 10, 30, 0) + config = ScheduleConfig( + node_id="start", + cron_expression="30 10 * * *", + timezone="UTC", + ) + + with pytest.MonkeyPatch.context() as monkeypatch: + monkeypatch.setattr( + "services.trigger.schedule_service.calculate_next_run_at", + lambda *_args, **_kwargs: expected_next_run, + ) + schedule = ScheduleService.create_schedule( + session=db_session_with_containers, + tenant_id=tenant.id, + app_id=str(uuid4()), + config=config, + ) + + persisted = db_session_with_containers.get(WorkflowSchedulePlan, schedule.id) + assert persisted is not None + assert persisted.tenant_id == tenant.id + assert persisted.node_id == "start" + assert persisted.cron_expression == "30 10 * * *" + assert persisted.timezone == "UTC" + assert persisted.next_run_at == expected_next_run + + def test_update_schedule_updates_fields_and_recomputes_next_run(self, db_session_with_containers: Session): + _account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers) + schedule = ScheduleServiceIntegrationFactory.create_schedule_plan( + db_session_with_containers, + tenant_id=tenant.id, + cron_expression="30 10 * * *", + timezone="UTC", + ) + expected_next_run = datetime(2026, 1, 2, 12, 0, 0) + + with pytest.MonkeyPatch.context() as monkeypatch: + monkeypatch.setattr( + "services.trigger.schedule_service.calculate_next_run_at", + lambda *_args, **_kwargs: expected_next_run, + ) + updated = ScheduleService.update_schedule( + session=db_session_with_containers, + schedule_id=schedule.id, + updates=SchedulePlanUpdate( + cron_expression="0 12 * * *", + timezone="America/New_York", + ), + ) + + db_session_with_containers.refresh(updated) + assert updated.cron_expression == "0 12 * * *" + assert updated.timezone == "America/New_York" + assert updated.next_run_at == expected_next_run + + def test_update_schedule_updates_only_node_id_without_recomputing_time(self, db_session_with_containers: Session): + _account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers) + initial_next_run = datetime(2026, 1, 1, 10, 0, 0) + schedule = ScheduleServiceIntegrationFactory.create_schedule_plan( + db_session_with_containers, + tenant_id=tenant.id, + next_run_at=initial_next_run, + ) + + with pytest.MonkeyPatch.context() as monkeypatch: + calls: list[tuple] = [] + + def _track(*args, **kwargs): + calls.append((args, kwargs)) + return datetime(2026, 1, 9, 10, 0, 0) + + monkeypatch.setattr("services.trigger.schedule_service.calculate_next_run_at", _track) + updated = ScheduleService.update_schedule( + session=db_session_with_containers, + schedule_id=schedule.id, + updates=SchedulePlanUpdate(node_id="node-new"), + ) + + db_session_with_containers.refresh(updated) + assert updated.node_id == "node-new" + assert updated.next_run_at == initial_next_run + assert calls == [] + + def test_update_schedule_not_found_raises(self, db_session_with_containers: Session): + with pytest.raises(ScheduleNotFoundError, match="Schedule not found"): + ScheduleService.update_schedule( + session=db_session_with_containers, + schedule_id=str(uuid4()), + updates=SchedulePlanUpdate(node_id="node-new"), + ) + + def test_delete_schedule_removes_row(self, db_session_with_containers: Session): + _account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers) + schedule = ScheduleServiceIntegrationFactory.create_schedule_plan( + db_session_with_containers, + tenant_id=tenant.id, + ) + + ScheduleService.delete_schedule( + session=db_session_with_containers, + schedule_id=schedule.id, + ) + db_session_with_containers.commit() + + assert db_session_with_containers.get(WorkflowSchedulePlan, schedule.id) is None + + def test_delete_schedule_not_found_raises(self, db_session_with_containers: Session): + with pytest.raises(ScheduleNotFoundError, match="Schedule not found"): + ScheduleService.delete_schedule( + session=db_session_with_containers, + schedule_id=str(uuid4()), + ) + + def test_get_tenant_owner_returns_owner_account(self, db_session_with_containers: Session): + owner, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant( + db_session_with_containers, + role=TenantAccountRole.OWNER, + ) + + result = ScheduleService.get_tenant_owner( + session=db_session_with_containers, + tenant_id=tenant.id, + ) + + assert result.id == owner.id + + def test_get_tenant_owner_falls_back_to_admin(self, db_session_with_containers: Session): + admin, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant( + db_session_with_containers, + role=TenantAccountRole.ADMIN, + ) + + result = ScheduleService.get_tenant_owner( + session=db_session_with_containers, + tenant_id=tenant.id, + ) + + assert result.id == admin.id + + def test_get_tenant_owner_raises_when_account_record_missing(self, db_session_with_containers: Session): + _account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers) + db_session_with_containers.execute(delete(TenantAccountJoin)) + missing_account_id = str(uuid4()) + join = TenantAccountJoin( + tenant_id=tenant.id, + account_id=missing_account_id, + role=TenantAccountRole.OWNER, + current=True, + ) + db_session_with_containers.add(join) + db_session_with_containers.commit() + + with pytest.raises(AccountNotFoundError, match=missing_account_id): + ScheduleService.get_tenant_owner(session=db_session_with_containers, tenant_id=tenant.id) + + def test_get_tenant_owner_raises_when_no_owner_or_admin_found(self, db_session_with_containers: Session): + _account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers) + db_session_with_containers.execute(delete(TenantAccountJoin)) + db_session_with_containers.commit() + + with pytest.raises(AccountNotFoundError, match=tenant.id): + ScheduleService.get_tenant_owner(session=db_session_with_containers, tenant_id=tenant.id) + + def test_update_next_run_at_updates_persisted_value(self, db_session_with_containers: Session): + _account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers) + schedule = ScheduleServiceIntegrationFactory.create_schedule_plan( + db_session_with_containers, + tenant_id=tenant.id, + ) + expected_next_run = datetime(2026, 1, 3, 10, 30, 0) + + with pytest.MonkeyPatch.context() as monkeypatch: + monkeypatch.setattr( + "services.trigger.schedule_service.calculate_next_run_at", + lambda *_args, **_kwargs: expected_next_run, + ) + result = ScheduleService.update_next_run_at( + session=db_session_with_containers, + schedule_id=schedule.id, + ) + + db_session_with_containers.refresh(schedule) + assert result == expected_next_run + assert schedule.next_run_at == expected_next_run + + def test_update_next_run_at_raises_when_schedule_not_found(self, db_session_with_containers: Session): + with pytest.raises(ScheduleNotFoundError, match="Schedule not found"): + ScheduleService.update_next_run_at( + session=db_session_with_containers, + schedule_id=str(uuid4()), + ) + + +class TestSyncScheduleFromWorkflowIntegration: + def test_sync_schedule_create_new(self, db_session_with_containers: Session): + _account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers) + app_id = str(uuid4()) + expected_next_run = datetime(2026, 1, 4, 10, 30, 0) + + with pytest.MonkeyPatch.context() as monkeypatch: + monkeypatch.setattr( + "services.trigger.schedule_service.calculate_next_run_at", + lambda *_args, **_kwargs: expected_next_run, + ) + result = sync_schedule_from_workflow( + tenant_id=tenant.id, + app_id=app_id, + workflow=_cron_workflow(), + ) + + assert result is not None + persisted = db_session_with_containers.execute( + select(WorkflowSchedulePlan).where(WorkflowSchedulePlan.app_id == app_id) + ).scalar_one() + assert persisted.node_id == "start" + assert persisted.cron_expression == "30 10 * * *" + assert persisted.timezone == "UTC" + assert persisted.next_run_at == expected_next_run + + def test_sync_schedule_update_existing(self, db_session_with_containers: Session): + _account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers) + app_id = str(uuid4()) + existing = ScheduleServiceIntegrationFactory.create_schedule_plan( + db_session_with_containers, + tenant_id=tenant.id, + app_id=app_id, + node_id="old-start", + cron_expression="30 10 * * *", + timezone="UTC", + ) + existing_id = existing.id + expected_next_run = datetime(2026, 1, 5, 12, 0, 0) + + with pytest.MonkeyPatch.context() as monkeypatch: + monkeypatch.setattr( + "services.trigger.schedule_service.calculate_next_run_at", + lambda *_args, **_kwargs: expected_next_run, + ) + result = sync_schedule_from_workflow( + tenant_id=tenant.id, + app_id=app_id, + workflow=_cron_workflow( + node_id="start", + cron_expression="0 12 * * *", + timezone="America/New_York", + ), + ) + + assert result is not None + db_session_with_containers.expire_all() + persisted = db_session_with_containers.get(WorkflowSchedulePlan, existing_id) + assert persisted is not None + assert persisted.node_id == "start" + assert persisted.cron_expression == "0 12 * * *" + assert persisted.timezone == "America/New_York" + assert persisted.next_run_at == expected_next_run + + def test_sync_schedule_remove_when_no_config(self, db_session_with_containers: Session): + _account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers) + app_id = str(uuid4()) + existing = ScheduleServiceIntegrationFactory.create_schedule_plan( + db_session_with_containers, + tenant_id=tenant.id, + app_id=app_id, + ) + existing_id = existing.id + + result = sync_schedule_from_workflow( + tenant_id=tenant.id, + app_id=app_id, + workflow=_no_schedule_workflow(), + ) + + assert result is None + db_session_with_containers.expire_all() + assert db_session_with_containers.get(WorkflowSchedulePlan, existing_id) is None diff --git a/api/tests/unit_tests/services/test_schedule_service.py b/api/tests/unit_tests/services/test_schedule_service.py index 334062242b..0f8f7ffab5 100644 --- a/api/tests/unit_tests/services/test_schedule_service.py +++ b/api/tests/unit_tests/services/test_schedule_service.py @@ -2,23 +2,16 @@ import unittest from datetime import UTC, datetime from types import SimpleNamespace from typing import Any, cast -from unittest.mock import MagicMock, Mock, patch +from unittest.mock import MagicMock, Mock import pytest from sqlalchemy.orm import Session from core.trigger.constants import TRIGGER_SCHEDULE_NODE_TYPE -from core.workflow.nodes.trigger_schedule.entities import ScheduleConfig, SchedulePlanUpdate, VisualConfig -from core.workflow.nodes.trigger_schedule.exc import ScheduleConfigError, ScheduleNotFoundError -from events.event_handlers.sync_workflow_schedule_when_app_published import ( - sync_schedule_from_workflow, -) +from core.workflow.nodes.trigger_schedule.entities import VisualConfig +from core.workflow.nodes.trigger_schedule.exc import ScheduleConfigError from libs.schedule_utils import calculate_next_run_at, convert_12h_to_24h -from models.account import Account, TenantAccountJoin -from models.trigger import WorkflowSchedulePlan from models.workflow import Workflow -from services.errors.account import AccountNotFoundError -from services.trigger import schedule_service as service_module from services.trigger.schedule_service import ScheduleService @@ -83,180 +76,6 @@ class TestScheduleService(unittest.TestCase): with pytest.raises(UnknownTimeZoneError): calculate_next_run_at(cron_expr, timezone) - @patch("libs.schedule_utils.calculate_next_run_at") - def test_create_schedule(self, mock_calculate_next_run): - """Test creating a new schedule.""" - mock_session = MagicMock(spec=Session) - mock_calculate_next_run.return_value = datetime(2025, 8, 30, 10, 30, 0, tzinfo=UTC) - - config = ScheduleConfig( - node_id="start", - cron_expression="30 10 * * *", - timezone="UTC", - ) - - schedule = ScheduleService.create_schedule( - session=mock_session, - tenant_id="test-tenant", - app_id="test-app", - config=config, - ) - - assert schedule is not None - assert schedule.tenant_id == "test-tenant" - assert schedule.app_id == "test-app" - assert schedule.node_id == "start" - assert schedule.cron_expression == "30 10 * * *" - assert schedule.timezone == "UTC" - assert schedule.next_run_at is not None - mock_session.add.assert_called_once() - mock_session.flush.assert_called_once() - - @patch("services.trigger.schedule_service.calculate_next_run_at") - def test_update_schedule(self, mock_calculate_next_run): - """Test updating an existing schedule.""" - mock_session = MagicMock(spec=Session) - mock_schedule = Mock(spec=WorkflowSchedulePlan) - mock_schedule.cron_expression = "0 12 * * *" - mock_schedule.timezone = "America/New_York" - mock_session.get.return_value = mock_schedule - mock_calculate_next_run.return_value = datetime(2025, 8, 30, 12, 0, 0, tzinfo=UTC) - - updates = SchedulePlanUpdate( - cron_expression="0 12 * * *", - timezone="America/New_York", - ) - - result = ScheduleService.update_schedule( - session=mock_session, - schedule_id="test-schedule-id", - updates=updates, - ) - - assert result is not None - assert result.cron_expression == "0 12 * * *" - assert result.timezone == "America/New_York" - mock_calculate_next_run.assert_called_once() - mock_session.flush.assert_called_once() - - def test_update_schedule_not_found(self): - """Test updating a non-existent schedule raises exception.""" - from core.workflow.nodes.trigger_schedule.exc import ScheduleNotFoundError - - mock_session = MagicMock(spec=Session) - mock_session.get.return_value = None - - updates = SchedulePlanUpdate( - cron_expression="0 12 * * *", - ) - - with pytest.raises(ScheduleNotFoundError) as context: - ScheduleService.update_schedule( - session=mock_session, - schedule_id="non-existent-id", - updates=updates, - ) - - assert "Schedule not found: non-existent-id" in str(context.value) - mock_session.flush.assert_not_called() - - def test_delete_schedule(self): - """Test deleting a schedule.""" - mock_session = MagicMock(spec=Session) - mock_schedule = Mock(spec=WorkflowSchedulePlan) - mock_session.get.return_value = mock_schedule - - # Should not raise exception and complete successfully - ScheduleService.delete_schedule( - session=mock_session, - schedule_id="test-schedule-id", - ) - - mock_session.delete.assert_called_once_with(mock_schedule) - mock_session.flush.assert_called_once() - - def test_delete_schedule_not_found(self): - """Test deleting a non-existent schedule raises exception.""" - from core.workflow.nodes.trigger_schedule.exc import ScheduleNotFoundError - - mock_session = MagicMock(spec=Session) - mock_session.get.return_value = None - - # Should raise ScheduleNotFoundError - with pytest.raises(ScheduleNotFoundError) as context: - ScheduleService.delete_schedule( - session=mock_session, - schedule_id="non-existent-id", - ) - - assert "Schedule not found: non-existent-id" in str(context.value) - mock_session.delete.assert_not_called() - - @patch("services.trigger.schedule_service.select") - def test_get_tenant_owner(self, mock_select): - """Test getting tenant owner account.""" - mock_session = MagicMock(spec=Session) - mock_account = Mock(spec=Account) - mock_account.id = "owner-account-id" - - # Mock owner query - mock_owner_result = Mock(spec=TenantAccountJoin) - mock_owner_result.account_id = "owner-account-id" - - mock_session.execute.return_value.scalar_one_or_none.return_value = mock_owner_result - mock_session.get.return_value = mock_account - - result = ScheduleService.get_tenant_owner( - session=mock_session, - tenant_id="test-tenant", - ) - - assert result is not None - assert result.id == "owner-account-id" - - @patch("services.trigger.schedule_service.select") - def test_get_tenant_owner_fallback_to_admin(self, mock_select): - """Test getting tenant owner falls back to admin if no owner.""" - mock_session = MagicMock(spec=Session) - mock_account = Mock(spec=Account) - mock_account.id = "admin-account-id" - - # Mock admin query (owner returns None) - mock_admin_result = Mock(spec=TenantAccountJoin) - mock_admin_result.account_id = "admin-account-id" - - mock_session.execute.return_value.scalar_one_or_none.side_effect = [None, mock_admin_result] - mock_session.get.return_value = mock_account - - result = ScheduleService.get_tenant_owner( - session=mock_session, - tenant_id="test-tenant", - ) - - assert result is not None - assert result.id == "admin-account-id" - - @patch("services.trigger.schedule_service.calculate_next_run_at") - def test_update_next_run_at(self, mock_calculate_next_run): - """Test updating next run time after schedule triggered.""" - mock_session = MagicMock(spec=Session) - mock_schedule = Mock(spec=WorkflowSchedulePlan) - mock_schedule.cron_expression = "30 10 * * *" - mock_schedule.timezone = "UTC" - mock_session.get.return_value = mock_schedule - - next_time = datetime(2025, 8, 31, 10, 30, 0, tzinfo=UTC) - mock_calculate_next_run.return_value = next_time - - result = ScheduleService.update_next_run_at( - session=mock_session, - schedule_id="test-schedule-id", - ) - - assert result == next_time - assert mock_schedule.next_run_at == next_time - mock_session.flush.assert_called_once() - class TestVisualToCron(unittest.TestCase): """Test cases for visual configuration to cron conversion.""" @@ -678,108 +497,6 @@ class TestScheduleWithTimezone(unittest.TestCase): assert summer_next.hour == 14 -class TestSyncScheduleFromWorkflow(unittest.TestCase): - """Test cases for syncing schedule from workflow.""" - - @patch("events.event_handlers.sync_workflow_schedule_when_app_published.db") - @patch("events.event_handlers.sync_workflow_schedule_when_app_published.ScheduleService") - @patch("events.event_handlers.sync_workflow_schedule_when_app_published.select") - def test_sync_schedule_create_new(self, mock_select, mock_service, mock_db): - """Test creating new schedule when none exists.""" - mock_session = MagicMock() - mock_db.engine = MagicMock() - mock_session.__enter__ = MagicMock(return_value=mock_session) - mock_session.__exit__ = MagicMock(return_value=None) - sessionmaker = MagicMock(return_value=MagicMock(begin=MagicMock(return_value=mock_session))) - with patch("events.event_handlers.sync_workflow_schedule_when_app_published.sessionmaker", sessionmaker): - mock_session.scalar.return_value = None # No existing plan - - # Mock extract_schedule_config to return a ScheduleConfig object - mock_config = Mock(spec=ScheduleConfig) - mock_config.node_id = "start" - mock_config.cron_expression = "30 10 * * *" - mock_config.timezone = "UTC" - mock_service.extract_schedule_config.return_value = mock_config - - mock_new_plan = Mock(spec=WorkflowSchedulePlan) - mock_service.create_schedule.return_value = mock_new_plan - - workflow = Mock(spec=Workflow) - result = sync_schedule_from_workflow("tenant-id", "app-id", workflow) - - assert result == mock_new_plan - mock_service.create_schedule.assert_called_once() - mock_session.commit.assert_not_called() - - @patch("events.event_handlers.sync_workflow_schedule_when_app_published.db") - @patch("events.event_handlers.sync_workflow_schedule_when_app_published.ScheduleService") - @patch("events.event_handlers.sync_workflow_schedule_when_app_published.select") - def test_sync_schedule_update_existing(self, mock_select, mock_service, mock_db): - """Test updating existing schedule.""" - mock_session = MagicMock() - mock_db.engine = MagicMock() - mock_session.__enter__ = MagicMock(return_value=mock_session) - mock_session.__exit__ = MagicMock(return_value=None) - sessionmaker = MagicMock(return_value=MagicMock(begin=MagicMock(return_value=mock_session))) - - with patch("events.event_handlers.sync_workflow_schedule_when_app_published.sessionmaker", sessionmaker): - mock_existing_plan = Mock(spec=WorkflowSchedulePlan) - mock_existing_plan.id = "existing-plan-id" - mock_session.scalar.return_value = mock_existing_plan - - # Mock extract_schedule_config to return a ScheduleConfig object - mock_config = Mock(spec=ScheduleConfig) - mock_config.node_id = "start" - mock_config.cron_expression = "0 12 * * *" - mock_config.timezone = "America/New_York" - mock_service.extract_schedule_config.return_value = mock_config - - mock_updated_plan = Mock(spec=WorkflowSchedulePlan) - mock_service.update_schedule.return_value = mock_updated_plan - - workflow = Mock(spec=Workflow) - result = sync_schedule_from_workflow("tenant-id", "app-id", workflow) - - assert result == mock_updated_plan - mock_service.update_schedule.assert_called_once() - # Verify the arguments passed to update_schedule - call_args = mock_service.update_schedule.call_args - assert call_args.kwargs["session"] == mock_session - assert call_args.kwargs["schedule_id"] == "existing-plan-id" - updates_obj = call_args.kwargs["updates"] - assert isinstance(updates_obj, SchedulePlanUpdate) - assert updates_obj.node_id == "start" - assert updates_obj.cron_expression == "0 12 * * *" - assert updates_obj.timezone == "America/New_York" - mock_session.commit.assert_not_called() - - @patch("events.event_handlers.sync_workflow_schedule_when_app_published.db") - @patch("events.event_handlers.sync_workflow_schedule_when_app_published.ScheduleService") - @patch("events.event_handlers.sync_workflow_schedule_when_app_published.select") - def test_sync_schedule_remove_when_no_config(self, mock_select, mock_service, mock_db): - """Test removing schedule when no schedule config in workflow.""" - mock_session = MagicMock() - mock_db.engine = MagicMock() - mock_session.__enter__ = MagicMock(return_value=mock_session) - mock_session.__exit__ = MagicMock(return_value=None) - sessionmaker = MagicMock(return_value=MagicMock(begin=MagicMock(return_value=mock_session))) - - with patch("events.event_handlers.sync_workflow_schedule_when_app_published.sessionmaker", sessionmaker): - mock_existing_plan = Mock(spec=WorkflowSchedulePlan) - mock_existing_plan.id = "existing-plan-id" - mock_session.scalar.return_value = mock_existing_plan - - mock_service.extract_schedule_config.return_value = None # No schedule config - - workflow = Mock(spec=Workflow) - result = sync_schedule_from_workflow("tenant-id", "app-id", workflow) - - assert result is None - # Now using ScheduleService.delete_schedule instead of session.delete - mock_service.delete_schedule.assert_called_once_with(session=mock_session, schedule_id="existing-plan-id") - mock_session.commit.assert_not_called() - - @pytest.fixture def session_mock() -> MagicMock: return MagicMock(spec=Session) @@ -789,62 +506,6 @@ def _workflow(**kwargs: Any) -> Workflow: return cast(Workflow, SimpleNamespace(**kwargs)) -def test_update_schedule_should_update_only_node_id_without_recomputing_time( - session_mock: MagicMock, - monkeypatch: pytest.MonkeyPatch, -) -> None: - # Arrange - schedule = MagicMock(spec=WorkflowSchedulePlan) - schedule.cron_expression = "0 10 * * *" - schedule.timezone = "UTC" - session_mock.get.return_value = schedule - - next_run_mock = MagicMock(return_value=datetime(2026, 1, 1, 10, 0, tzinfo=UTC)) - monkeypatch.setattr(service_module, "calculate_next_run_at", next_run_mock) - - # Act - result = ScheduleService.update_schedule( - session=session_mock, - schedule_id="schedule-1", - updates=SchedulePlanUpdate(node_id="node-new"), - ) - - # Assert - assert result is schedule - assert schedule.node_id == "node-new" - next_run_mock.assert_not_called() - session_mock.flush.assert_called_once() - - -def test_get_tenant_owner_should_raise_when_account_record_missing(session_mock: MagicMock) -> None: - # Arrange - join = SimpleNamespace(account_id="account-404") - session_mock.execute.return_value.scalar_one_or_none.return_value = join - session_mock.get.return_value = None - - # Act / Assert - with pytest.raises(AccountNotFoundError, match="Account not found: account-404"): - ScheduleService.get_tenant_owner(session=session_mock, tenant_id="tenant-1") - - -def test_get_tenant_owner_should_raise_when_no_owner_or_admin_found(session_mock: MagicMock) -> None: - # Arrange - session_mock.execute.return_value.scalar_one_or_none.side_effect = [None, None] - - # Act / Assert - with pytest.raises(AccountNotFoundError, match="Account not found for tenant: tenant-1"): - ScheduleService.get_tenant_owner(session=session_mock, tenant_id="tenant-1") - - -def test_update_next_run_at_should_raise_when_schedule_not_found(session_mock: MagicMock) -> None: - # Arrange - session_mock.get.return_value = None - - # Act / Assert - with pytest.raises(ScheduleNotFoundError, match="Schedule not found: schedule-1"): - ScheduleService.update_next_run_at(session=session_mock, schedule_id="schedule-1") - - def test_to_schedule_config_should_build_from_cron_mode() -> None: # Arrange node_config: dict[str, Any] = {