test: migrate schedule service mock tests to testcontainers (#35196)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
James 2026-04-14 22:34:53 +02:00 committed by GitHub
parent 693080aa12
commit 7de92c598f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 390 additions and 342 deletions

View File

@ -0,0 +1,387 @@
"""Testcontainers integration tests for schedule service SQL-backed behavior."""
from datetime import datetime
from types import SimpleNamespace
from uuid import uuid4
import pytest
from sqlalchemy import delete, select
from sqlalchemy.orm import Session
from core.workflow.nodes.trigger_schedule.entities import ScheduleConfig, SchedulePlanUpdate
from core.workflow.nodes.trigger_schedule.exc import ScheduleNotFoundError
from events.event_handlers.sync_workflow_schedule_when_app_published import sync_schedule_from_workflow
from models.account import Account, Tenant, TenantAccountJoin, TenantAccountRole
from models.trigger import WorkflowSchedulePlan
from services.errors.account import AccountNotFoundError
from services.trigger.schedule_service import ScheduleService
class ScheduleServiceIntegrationFactory:
@staticmethod
def create_account_with_tenant(
db_session_with_containers: Session,
role: TenantAccountRole = TenantAccountRole.OWNER,
) -> tuple[Account, Tenant]:
account = Account(
email=f"{uuid4()}@example.com",
name=f"user-{uuid4()}",
interface_language="en-US",
status="active",
)
tenant = Tenant(name=f"tenant-{uuid4()}", status="normal")
db_session_with_containers.add_all([account, tenant])
db_session_with_containers.flush()
join = TenantAccountJoin(
tenant_id=tenant.id,
account_id=account.id,
role=role,
current=True,
)
db_session_with_containers.add(join)
db_session_with_containers.commit()
account.current_tenant = tenant
return account, tenant
@staticmethod
def create_schedule_plan(
db_session_with_containers: Session,
*,
tenant_id: str,
app_id: str | None = None,
node_id: str = "start",
cron_expression: str = "30 10 * * *",
timezone: str = "UTC",
next_run_at: datetime | None = None,
) -> WorkflowSchedulePlan:
schedule = WorkflowSchedulePlan(
tenant_id=tenant_id,
app_id=app_id or str(uuid4()),
node_id=node_id,
cron_expression=cron_expression,
timezone=timezone,
next_run_at=next_run_at,
)
db_session_with_containers.add(schedule)
db_session_with_containers.commit()
return schedule
def _cron_workflow(
*,
node_id: str = "start",
cron_expression: str = "30 10 * * *",
timezone: str = "UTC",
):
return SimpleNamespace(
graph_dict={
"nodes": [
{
"id": node_id,
"data": {
"type": "trigger-schedule",
"mode": "cron",
"cron_expression": cron_expression,
"timezone": timezone,
},
}
]
}
)
def _no_schedule_workflow():
return SimpleNamespace(
graph_dict={
"nodes": [
{
"id": "node-1",
"data": {"type": "llm"},
}
]
}
)
class TestScheduleServiceIntegration:
def test_create_schedule_persists_schedule(self, db_session_with_containers: Session):
account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers)
expected_next_run = datetime(2026, 1, 1, 10, 30, 0)
config = ScheduleConfig(
node_id="start",
cron_expression="30 10 * * *",
timezone="UTC",
)
with pytest.MonkeyPatch.context() as monkeypatch:
monkeypatch.setattr(
"services.trigger.schedule_service.calculate_next_run_at",
lambda *_args, **_kwargs: expected_next_run,
)
schedule = ScheduleService.create_schedule(
session=db_session_with_containers,
tenant_id=tenant.id,
app_id=str(uuid4()),
config=config,
)
persisted = db_session_with_containers.get(WorkflowSchedulePlan, schedule.id)
assert persisted is not None
assert persisted.tenant_id == tenant.id
assert persisted.node_id == "start"
assert persisted.cron_expression == "30 10 * * *"
assert persisted.timezone == "UTC"
assert persisted.next_run_at == expected_next_run
def test_update_schedule_updates_fields_and_recomputes_next_run(self, db_session_with_containers: Session):
_account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers)
schedule = ScheduleServiceIntegrationFactory.create_schedule_plan(
db_session_with_containers,
tenant_id=tenant.id,
cron_expression="30 10 * * *",
timezone="UTC",
)
expected_next_run = datetime(2026, 1, 2, 12, 0, 0)
with pytest.MonkeyPatch.context() as monkeypatch:
monkeypatch.setattr(
"services.trigger.schedule_service.calculate_next_run_at",
lambda *_args, **_kwargs: expected_next_run,
)
updated = ScheduleService.update_schedule(
session=db_session_with_containers,
schedule_id=schedule.id,
updates=SchedulePlanUpdate(
cron_expression="0 12 * * *",
timezone="America/New_York",
),
)
db_session_with_containers.refresh(updated)
assert updated.cron_expression == "0 12 * * *"
assert updated.timezone == "America/New_York"
assert updated.next_run_at == expected_next_run
def test_update_schedule_updates_only_node_id_without_recomputing_time(self, db_session_with_containers: Session):
_account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers)
initial_next_run = datetime(2026, 1, 1, 10, 0, 0)
schedule = ScheduleServiceIntegrationFactory.create_schedule_plan(
db_session_with_containers,
tenant_id=tenant.id,
next_run_at=initial_next_run,
)
with pytest.MonkeyPatch.context() as monkeypatch:
calls: list[tuple] = []
def _track(*args, **kwargs):
calls.append((args, kwargs))
return datetime(2026, 1, 9, 10, 0, 0)
monkeypatch.setattr("services.trigger.schedule_service.calculate_next_run_at", _track)
updated = ScheduleService.update_schedule(
session=db_session_with_containers,
schedule_id=schedule.id,
updates=SchedulePlanUpdate(node_id="node-new"),
)
db_session_with_containers.refresh(updated)
assert updated.node_id == "node-new"
assert updated.next_run_at == initial_next_run
assert calls == []
def test_update_schedule_not_found_raises(self, db_session_with_containers: Session):
with pytest.raises(ScheduleNotFoundError, match="Schedule not found"):
ScheduleService.update_schedule(
session=db_session_with_containers,
schedule_id=str(uuid4()),
updates=SchedulePlanUpdate(node_id="node-new"),
)
def test_delete_schedule_removes_row(self, db_session_with_containers: Session):
_account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers)
schedule = ScheduleServiceIntegrationFactory.create_schedule_plan(
db_session_with_containers,
tenant_id=tenant.id,
)
ScheduleService.delete_schedule(
session=db_session_with_containers,
schedule_id=schedule.id,
)
db_session_with_containers.commit()
assert db_session_with_containers.get(WorkflowSchedulePlan, schedule.id) is None
def test_delete_schedule_not_found_raises(self, db_session_with_containers: Session):
with pytest.raises(ScheduleNotFoundError, match="Schedule not found"):
ScheduleService.delete_schedule(
session=db_session_with_containers,
schedule_id=str(uuid4()),
)
def test_get_tenant_owner_returns_owner_account(self, db_session_with_containers: Session):
owner, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(
db_session_with_containers,
role=TenantAccountRole.OWNER,
)
result = ScheduleService.get_tenant_owner(
session=db_session_with_containers,
tenant_id=tenant.id,
)
assert result.id == owner.id
def test_get_tenant_owner_falls_back_to_admin(self, db_session_with_containers: Session):
admin, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(
db_session_with_containers,
role=TenantAccountRole.ADMIN,
)
result = ScheduleService.get_tenant_owner(
session=db_session_with_containers,
tenant_id=tenant.id,
)
assert result.id == admin.id
def test_get_tenant_owner_raises_when_account_record_missing(self, db_session_with_containers: Session):
_account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers)
db_session_with_containers.execute(delete(TenantAccountJoin))
missing_account_id = str(uuid4())
join = TenantAccountJoin(
tenant_id=tenant.id,
account_id=missing_account_id,
role=TenantAccountRole.OWNER,
current=True,
)
db_session_with_containers.add(join)
db_session_with_containers.commit()
with pytest.raises(AccountNotFoundError, match=missing_account_id):
ScheduleService.get_tenant_owner(session=db_session_with_containers, tenant_id=tenant.id)
def test_get_tenant_owner_raises_when_no_owner_or_admin_found(self, db_session_with_containers: Session):
_account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers)
db_session_with_containers.execute(delete(TenantAccountJoin))
db_session_with_containers.commit()
with pytest.raises(AccountNotFoundError, match=tenant.id):
ScheduleService.get_tenant_owner(session=db_session_with_containers, tenant_id=tenant.id)
def test_update_next_run_at_updates_persisted_value(self, db_session_with_containers: Session):
_account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers)
schedule = ScheduleServiceIntegrationFactory.create_schedule_plan(
db_session_with_containers,
tenant_id=tenant.id,
)
expected_next_run = datetime(2026, 1, 3, 10, 30, 0)
with pytest.MonkeyPatch.context() as monkeypatch:
monkeypatch.setattr(
"services.trigger.schedule_service.calculate_next_run_at",
lambda *_args, **_kwargs: expected_next_run,
)
result = ScheduleService.update_next_run_at(
session=db_session_with_containers,
schedule_id=schedule.id,
)
db_session_with_containers.refresh(schedule)
assert result == expected_next_run
assert schedule.next_run_at == expected_next_run
def test_update_next_run_at_raises_when_schedule_not_found(self, db_session_with_containers: Session):
with pytest.raises(ScheduleNotFoundError, match="Schedule not found"):
ScheduleService.update_next_run_at(
session=db_session_with_containers,
schedule_id=str(uuid4()),
)
class TestSyncScheduleFromWorkflowIntegration:
def test_sync_schedule_create_new(self, db_session_with_containers: Session):
_account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers)
app_id = str(uuid4())
expected_next_run = datetime(2026, 1, 4, 10, 30, 0)
with pytest.MonkeyPatch.context() as monkeypatch:
monkeypatch.setattr(
"services.trigger.schedule_service.calculate_next_run_at",
lambda *_args, **_kwargs: expected_next_run,
)
result = sync_schedule_from_workflow(
tenant_id=tenant.id,
app_id=app_id,
workflow=_cron_workflow(),
)
assert result is not None
persisted = db_session_with_containers.execute(
select(WorkflowSchedulePlan).where(WorkflowSchedulePlan.app_id == app_id)
).scalar_one()
assert persisted.node_id == "start"
assert persisted.cron_expression == "30 10 * * *"
assert persisted.timezone == "UTC"
assert persisted.next_run_at == expected_next_run
def test_sync_schedule_update_existing(self, db_session_with_containers: Session):
_account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers)
app_id = str(uuid4())
existing = ScheduleServiceIntegrationFactory.create_schedule_plan(
db_session_with_containers,
tenant_id=tenant.id,
app_id=app_id,
node_id="old-start",
cron_expression="30 10 * * *",
timezone="UTC",
)
existing_id = existing.id
expected_next_run = datetime(2026, 1, 5, 12, 0, 0)
with pytest.MonkeyPatch.context() as monkeypatch:
monkeypatch.setattr(
"services.trigger.schedule_service.calculate_next_run_at",
lambda *_args, **_kwargs: expected_next_run,
)
result = sync_schedule_from_workflow(
tenant_id=tenant.id,
app_id=app_id,
workflow=_cron_workflow(
node_id="start",
cron_expression="0 12 * * *",
timezone="America/New_York",
),
)
assert result is not None
db_session_with_containers.expire_all()
persisted = db_session_with_containers.get(WorkflowSchedulePlan, existing_id)
assert persisted is not None
assert persisted.node_id == "start"
assert persisted.cron_expression == "0 12 * * *"
assert persisted.timezone == "America/New_York"
assert persisted.next_run_at == expected_next_run
def test_sync_schedule_remove_when_no_config(self, db_session_with_containers: Session):
_account, tenant = ScheduleServiceIntegrationFactory.create_account_with_tenant(db_session_with_containers)
app_id = str(uuid4())
existing = ScheduleServiceIntegrationFactory.create_schedule_plan(
db_session_with_containers,
tenant_id=tenant.id,
app_id=app_id,
)
existing_id = existing.id
result = sync_schedule_from_workflow(
tenant_id=tenant.id,
app_id=app_id,
workflow=_no_schedule_workflow(),
)
assert result is None
db_session_with_containers.expire_all()
assert db_session_with_containers.get(WorkflowSchedulePlan, existing_id) is None

View File

@ -2,23 +2,16 @@ import unittest
from datetime import UTC, datetime
from types import SimpleNamespace
from typing import Any, cast
from unittest.mock import MagicMock, Mock, patch
from unittest.mock import MagicMock, Mock
import pytest
from sqlalchemy.orm import Session
from core.trigger.constants import TRIGGER_SCHEDULE_NODE_TYPE
from core.workflow.nodes.trigger_schedule.entities import ScheduleConfig, SchedulePlanUpdate, VisualConfig
from core.workflow.nodes.trigger_schedule.exc import ScheduleConfigError, ScheduleNotFoundError
from events.event_handlers.sync_workflow_schedule_when_app_published import (
sync_schedule_from_workflow,
)
from core.workflow.nodes.trigger_schedule.entities import VisualConfig
from core.workflow.nodes.trigger_schedule.exc import ScheduleConfigError
from libs.schedule_utils import calculate_next_run_at, convert_12h_to_24h
from models.account import Account, TenantAccountJoin
from models.trigger import WorkflowSchedulePlan
from models.workflow import Workflow
from services.errors.account import AccountNotFoundError
from services.trigger import schedule_service as service_module
from services.trigger.schedule_service import ScheduleService
@ -83,180 +76,6 @@ class TestScheduleService(unittest.TestCase):
with pytest.raises(UnknownTimeZoneError):
calculate_next_run_at(cron_expr, timezone)
@patch("libs.schedule_utils.calculate_next_run_at")
def test_create_schedule(self, mock_calculate_next_run):
"""Test creating a new schedule."""
mock_session = MagicMock(spec=Session)
mock_calculate_next_run.return_value = datetime(2025, 8, 30, 10, 30, 0, tzinfo=UTC)
config = ScheduleConfig(
node_id="start",
cron_expression="30 10 * * *",
timezone="UTC",
)
schedule = ScheduleService.create_schedule(
session=mock_session,
tenant_id="test-tenant",
app_id="test-app",
config=config,
)
assert schedule is not None
assert schedule.tenant_id == "test-tenant"
assert schedule.app_id == "test-app"
assert schedule.node_id == "start"
assert schedule.cron_expression == "30 10 * * *"
assert schedule.timezone == "UTC"
assert schedule.next_run_at is not None
mock_session.add.assert_called_once()
mock_session.flush.assert_called_once()
@patch("services.trigger.schedule_service.calculate_next_run_at")
def test_update_schedule(self, mock_calculate_next_run):
"""Test updating an existing schedule."""
mock_session = MagicMock(spec=Session)
mock_schedule = Mock(spec=WorkflowSchedulePlan)
mock_schedule.cron_expression = "0 12 * * *"
mock_schedule.timezone = "America/New_York"
mock_session.get.return_value = mock_schedule
mock_calculate_next_run.return_value = datetime(2025, 8, 30, 12, 0, 0, tzinfo=UTC)
updates = SchedulePlanUpdate(
cron_expression="0 12 * * *",
timezone="America/New_York",
)
result = ScheduleService.update_schedule(
session=mock_session,
schedule_id="test-schedule-id",
updates=updates,
)
assert result is not None
assert result.cron_expression == "0 12 * * *"
assert result.timezone == "America/New_York"
mock_calculate_next_run.assert_called_once()
mock_session.flush.assert_called_once()
def test_update_schedule_not_found(self):
"""Test updating a non-existent schedule raises exception."""
from core.workflow.nodes.trigger_schedule.exc import ScheduleNotFoundError
mock_session = MagicMock(spec=Session)
mock_session.get.return_value = None
updates = SchedulePlanUpdate(
cron_expression="0 12 * * *",
)
with pytest.raises(ScheduleNotFoundError) as context:
ScheduleService.update_schedule(
session=mock_session,
schedule_id="non-existent-id",
updates=updates,
)
assert "Schedule not found: non-existent-id" in str(context.value)
mock_session.flush.assert_not_called()
def test_delete_schedule(self):
"""Test deleting a schedule."""
mock_session = MagicMock(spec=Session)
mock_schedule = Mock(spec=WorkflowSchedulePlan)
mock_session.get.return_value = mock_schedule
# Should not raise exception and complete successfully
ScheduleService.delete_schedule(
session=mock_session,
schedule_id="test-schedule-id",
)
mock_session.delete.assert_called_once_with(mock_schedule)
mock_session.flush.assert_called_once()
def test_delete_schedule_not_found(self):
"""Test deleting a non-existent schedule raises exception."""
from core.workflow.nodes.trigger_schedule.exc import ScheduleNotFoundError
mock_session = MagicMock(spec=Session)
mock_session.get.return_value = None
# Should raise ScheduleNotFoundError
with pytest.raises(ScheduleNotFoundError) as context:
ScheduleService.delete_schedule(
session=mock_session,
schedule_id="non-existent-id",
)
assert "Schedule not found: non-existent-id" in str(context.value)
mock_session.delete.assert_not_called()
@patch("services.trigger.schedule_service.select")
def test_get_tenant_owner(self, mock_select):
"""Test getting tenant owner account."""
mock_session = MagicMock(spec=Session)
mock_account = Mock(spec=Account)
mock_account.id = "owner-account-id"
# Mock owner query
mock_owner_result = Mock(spec=TenantAccountJoin)
mock_owner_result.account_id = "owner-account-id"
mock_session.execute.return_value.scalar_one_or_none.return_value = mock_owner_result
mock_session.get.return_value = mock_account
result = ScheduleService.get_tenant_owner(
session=mock_session,
tenant_id="test-tenant",
)
assert result is not None
assert result.id == "owner-account-id"
@patch("services.trigger.schedule_service.select")
def test_get_tenant_owner_fallback_to_admin(self, mock_select):
"""Test getting tenant owner falls back to admin if no owner."""
mock_session = MagicMock(spec=Session)
mock_account = Mock(spec=Account)
mock_account.id = "admin-account-id"
# Mock admin query (owner returns None)
mock_admin_result = Mock(spec=TenantAccountJoin)
mock_admin_result.account_id = "admin-account-id"
mock_session.execute.return_value.scalar_one_or_none.side_effect = [None, mock_admin_result]
mock_session.get.return_value = mock_account
result = ScheduleService.get_tenant_owner(
session=mock_session,
tenant_id="test-tenant",
)
assert result is not None
assert result.id == "admin-account-id"
@patch("services.trigger.schedule_service.calculate_next_run_at")
def test_update_next_run_at(self, mock_calculate_next_run):
"""Test updating next run time after schedule triggered."""
mock_session = MagicMock(spec=Session)
mock_schedule = Mock(spec=WorkflowSchedulePlan)
mock_schedule.cron_expression = "30 10 * * *"
mock_schedule.timezone = "UTC"
mock_session.get.return_value = mock_schedule
next_time = datetime(2025, 8, 31, 10, 30, 0, tzinfo=UTC)
mock_calculate_next_run.return_value = next_time
result = ScheduleService.update_next_run_at(
session=mock_session,
schedule_id="test-schedule-id",
)
assert result == next_time
assert mock_schedule.next_run_at == next_time
mock_session.flush.assert_called_once()
class TestVisualToCron(unittest.TestCase):
"""Test cases for visual configuration to cron conversion."""
@ -678,108 +497,6 @@ class TestScheduleWithTimezone(unittest.TestCase):
assert summer_next.hour == 14
class TestSyncScheduleFromWorkflow(unittest.TestCase):
"""Test cases for syncing schedule from workflow."""
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.db")
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.ScheduleService")
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.select")
def test_sync_schedule_create_new(self, mock_select, mock_service, mock_db):
"""Test creating new schedule when none exists."""
mock_session = MagicMock()
mock_db.engine = MagicMock()
mock_session.__enter__ = MagicMock(return_value=mock_session)
mock_session.__exit__ = MagicMock(return_value=None)
sessionmaker = MagicMock(return_value=MagicMock(begin=MagicMock(return_value=mock_session)))
with patch("events.event_handlers.sync_workflow_schedule_when_app_published.sessionmaker", sessionmaker):
mock_session.scalar.return_value = None # No existing plan
# Mock extract_schedule_config to return a ScheduleConfig object
mock_config = Mock(spec=ScheduleConfig)
mock_config.node_id = "start"
mock_config.cron_expression = "30 10 * * *"
mock_config.timezone = "UTC"
mock_service.extract_schedule_config.return_value = mock_config
mock_new_plan = Mock(spec=WorkflowSchedulePlan)
mock_service.create_schedule.return_value = mock_new_plan
workflow = Mock(spec=Workflow)
result = sync_schedule_from_workflow("tenant-id", "app-id", workflow)
assert result == mock_new_plan
mock_service.create_schedule.assert_called_once()
mock_session.commit.assert_not_called()
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.db")
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.ScheduleService")
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.select")
def test_sync_schedule_update_existing(self, mock_select, mock_service, mock_db):
"""Test updating existing schedule."""
mock_session = MagicMock()
mock_db.engine = MagicMock()
mock_session.__enter__ = MagicMock(return_value=mock_session)
mock_session.__exit__ = MagicMock(return_value=None)
sessionmaker = MagicMock(return_value=MagicMock(begin=MagicMock(return_value=mock_session)))
with patch("events.event_handlers.sync_workflow_schedule_when_app_published.sessionmaker", sessionmaker):
mock_existing_plan = Mock(spec=WorkflowSchedulePlan)
mock_existing_plan.id = "existing-plan-id"
mock_session.scalar.return_value = mock_existing_plan
# Mock extract_schedule_config to return a ScheduleConfig object
mock_config = Mock(spec=ScheduleConfig)
mock_config.node_id = "start"
mock_config.cron_expression = "0 12 * * *"
mock_config.timezone = "America/New_York"
mock_service.extract_schedule_config.return_value = mock_config
mock_updated_plan = Mock(spec=WorkflowSchedulePlan)
mock_service.update_schedule.return_value = mock_updated_plan
workflow = Mock(spec=Workflow)
result = sync_schedule_from_workflow("tenant-id", "app-id", workflow)
assert result == mock_updated_plan
mock_service.update_schedule.assert_called_once()
# Verify the arguments passed to update_schedule
call_args = mock_service.update_schedule.call_args
assert call_args.kwargs["session"] == mock_session
assert call_args.kwargs["schedule_id"] == "existing-plan-id"
updates_obj = call_args.kwargs["updates"]
assert isinstance(updates_obj, SchedulePlanUpdate)
assert updates_obj.node_id == "start"
assert updates_obj.cron_expression == "0 12 * * *"
assert updates_obj.timezone == "America/New_York"
mock_session.commit.assert_not_called()
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.db")
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.ScheduleService")
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.select")
def test_sync_schedule_remove_when_no_config(self, mock_select, mock_service, mock_db):
"""Test removing schedule when no schedule config in workflow."""
mock_session = MagicMock()
mock_db.engine = MagicMock()
mock_session.__enter__ = MagicMock(return_value=mock_session)
mock_session.__exit__ = MagicMock(return_value=None)
sessionmaker = MagicMock(return_value=MagicMock(begin=MagicMock(return_value=mock_session)))
with patch("events.event_handlers.sync_workflow_schedule_when_app_published.sessionmaker", sessionmaker):
mock_existing_plan = Mock(spec=WorkflowSchedulePlan)
mock_existing_plan.id = "existing-plan-id"
mock_session.scalar.return_value = mock_existing_plan
mock_service.extract_schedule_config.return_value = None # No schedule config
workflow = Mock(spec=Workflow)
result = sync_schedule_from_workflow("tenant-id", "app-id", workflow)
assert result is None
# Now using ScheduleService.delete_schedule instead of session.delete
mock_service.delete_schedule.assert_called_once_with(session=mock_session, schedule_id="existing-plan-id")
mock_session.commit.assert_not_called()
@pytest.fixture
def session_mock() -> MagicMock:
return MagicMock(spec=Session)
@ -789,62 +506,6 @@ def _workflow(**kwargs: Any) -> Workflow:
return cast(Workflow, SimpleNamespace(**kwargs))
def test_update_schedule_should_update_only_node_id_without_recomputing_time(
session_mock: MagicMock,
monkeypatch: pytest.MonkeyPatch,
) -> None:
# Arrange
schedule = MagicMock(spec=WorkflowSchedulePlan)
schedule.cron_expression = "0 10 * * *"
schedule.timezone = "UTC"
session_mock.get.return_value = schedule
next_run_mock = MagicMock(return_value=datetime(2026, 1, 1, 10, 0, tzinfo=UTC))
monkeypatch.setattr(service_module, "calculate_next_run_at", next_run_mock)
# Act
result = ScheduleService.update_schedule(
session=session_mock,
schedule_id="schedule-1",
updates=SchedulePlanUpdate(node_id="node-new"),
)
# Assert
assert result is schedule
assert schedule.node_id == "node-new"
next_run_mock.assert_not_called()
session_mock.flush.assert_called_once()
def test_get_tenant_owner_should_raise_when_account_record_missing(session_mock: MagicMock) -> None:
# Arrange
join = SimpleNamespace(account_id="account-404")
session_mock.execute.return_value.scalar_one_or_none.return_value = join
session_mock.get.return_value = None
# Act / Assert
with pytest.raises(AccountNotFoundError, match="Account not found: account-404"):
ScheduleService.get_tenant_owner(session=session_mock, tenant_id="tenant-1")
def test_get_tenant_owner_should_raise_when_no_owner_or_admin_found(session_mock: MagicMock) -> None:
# Arrange
session_mock.execute.return_value.scalar_one_or_none.side_effect = [None, None]
# Act / Assert
with pytest.raises(AccountNotFoundError, match="Account not found for tenant: tenant-1"):
ScheduleService.get_tenant_owner(session=session_mock, tenant_id="tenant-1")
def test_update_next_run_at_should_raise_when_schedule_not_found(session_mock: MagicMock) -> None:
# Arrange
session_mock.get.return_value = None
# Act / Assert
with pytest.raises(ScheduleNotFoundError, match="Schedule not found: schedule-1"):
ScheduleService.update_next_run_at(session=session_mock, schedule_id="schedule-1")
def test_to_schedule_config_should_build_from_cron_mode() -> None:
# Arrange
node_config: dict[str, Any] = {