import unittest from datetime import UTC, datetime from types import SimpleNamespace from typing import Any, cast from unittest.mock import MagicMock, Mock import pytest from sqlalchemy.orm import Session from core.trigger.constants import TRIGGER_SCHEDULE_NODE_TYPE from core.workflow.nodes.trigger_schedule.entities import VisualConfig from core.workflow.nodes.trigger_schedule.exc import ScheduleConfigError from libs.schedule_utils import calculate_next_run_at, convert_12h_to_24h from models.workflow import Workflow from services.trigger.schedule_service import ScheduleService class TestScheduleService(unittest.TestCase): """Test cases for ScheduleService class.""" def test_calculate_next_run_at_valid_cron(self): """Test calculating next run time with valid cron expression.""" # Test daily cron at 10:30 AM cron_expr = "30 10 * * *" timezone = "UTC" base_time = datetime(2025, 8, 29, 9, 0, 0, tzinfo=UTC) next_run = calculate_next_run_at(cron_expr, timezone, base_time) assert next_run is not None assert next_run.hour == 10 assert next_run.minute == 30 assert next_run.day == 29 def test_calculate_next_run_at_with_timezone(self): """Test calculating next run time with different timezone.""" cron_expr = "0 9 * * *" # 9:00 AM timezone = "America/New_York" base_time = datetime(2025, 8, 29, 12, 0, 0, tzinfo=UTC) # 8:00 AM EDT next_run = calculate_next_run_at(cron_expr, timezone, base_time) assert next_run is not None # 9:00 AM EDT = 13:00 UTC (during EDT) expected_utc_hour = 13 assert next_run.hour == expected_utc_hour def test_calculate_next_run_at_with_last_day_of_month(self): """Test calculating next run time with 'L' (last day) syntax.""" cron_expr = "0 10 L * *" # 10:00 AM on last day of month timezone = "UTC" base_time = datetime(2025, 2, 15, 9, 0, 0, tzinfo=UTC) next_run = calculate_next_run_at(cron_expr, timezone, base_time) assert next_run is not None # February 2025 has 28 days assert next_run.day == 28 assert next_run.month == 2 def test_calculate_next_run_at_invalid_cron(self): """Test calculating next run time with invalid cron expression.""" cron_expr = "invalid cron" timezone = "UTC" with pytest.raises(ValueError): calculate_next_run_at(cron_expr, timezone) def test_calculate_next_run_at_invalid_timezone(self): """Test calculating next run time with invalid timezone.""" from pytz import UnknownTimeZoneError cron_expr = "30 10 * * *" timezone = "Invalid/Timezone" with pytest.raises(UnknownTimeZoneError): calculate_next_run_at(cron_expr, timezone) class TestVisualToCron(unittest.TestCase): """Test cases for visual configuration to cron conversion.""" def test_visual_to_cron_hourly(self): """Test converting hourly visual config to cron.""" visual_config = VisualConfig(on_minute=15) result = ScheduleService.visual_to_cron("hourly", visual_config) assert result == "15 * * * *" def test_visual_to_cron_daily(self): """Test converting daily visual config to cron.""" visual_config = VisualConfig(time="2:30 PM") result = ScheduleService.visual_to_cron("daily", visual_config) assert result == "30 14 * * *" def test_visual_to_cron_weekly(self): """Test converting weekly visual config to cron.""" visual_config = VisualConfig( time="10:00 AM", weekdays=["mon", "wed", "fri"], ) result = ScheduleService.visual_to_cron("weekly", visual_config) assert result == "0 10 * * 1,3,5" def test_visual_to_cron_monthly_with_specific_days(self): """Test converting monthly visual config with specific days.""" visual_config = VisualConfig( time="11:30 AM", monthly_days=[1, 15], ) result = ScheduleService.visual_to_cron("monthly", visual_config) assert result == "30 11 1,15 * *" def test_visual_to_cron_monthly_with_last_day(self): """Test converting monthly visual config with last day using 'L' syntax.""" visual_config = VisualConfig( time="11:30 AM", monthly_days=[1, "last"], ) result = ScheduleService.visual_to_cron("monthly", visual_config) assert result == "30 11 1,L * *" def test_visual_to_cron_monthly_only_last_day(self): """Test converting monthly visual config with only last day.""" visual_config = VisualConfig( time="9:00 PM", monthly_days=["last"], ) result = ScheduleService.visual_to_cron("monthly", visual_config) assert result == "0 21 L * *" def test_visual_to_cron_monthly_with_end_days_and_last(self): """Test converting monthly visual config with days 29, 30, 31 and 'last'.""" visual_config = VisualConfig( time="3:45 PM", monthly_days=[29, 30, 31, "last"], ) result = ScheduleService.visual_to_cron("monthly", visual_config) # Should have 29,30,31,L - the L handles all possible last days assert result == "45 15 29,30,31,L * *" def test_visual_to_cron_invalid_frequency(self): """Test converting with invalid frequency.""" with pytest.raises(ScheduleConfigError, match="Unsupported frequency: invalid"): ScheduleService.visual_to_cron("invalid", VisualConfig()) def test_visual_to_cron_weekly_no_weekdays(self): """Test converting weekly with no weekdays specified.""" visual_config = VisualConfig(time="10:00 AM") with pytest.raises(ScheduleConfigError, match="Weekdays are required for weekly schedules"): ScheduleService.visual_to_cron("weekly", visual_config) def test_visual_to_cron_hourly_no_minute(self): """Test converting hourly with no on_minute specified.""" visual_config = VisualConfig() # on_minute defaults to 0 result = ScheduleService.visual_to_cron("hourly", visual_config) assert result == "0 * * * *" # Should use default value 0 def test_visual_to_cron_daily_no_time(self): """Test converting daily with no time specified.""" visual_config = VisualConfig(time=None) with pytest.raises(ScheduleConfigError, match="time is required for daily schedules"): ScheduleService.visual_to_cron("daily", visual_config) def test_visual_to_cron_weekly_no_time(self): """Test converting weekly with no time specified.""" visual_config = VisualConfig(weekdays=["mon"]) visual_config.time = None # Override default with pytest.raises(ScheduleConfigError, match="time is required for weekly schedules"): ScheduleService.visual_to_cron("weekly", visual_config) def test_visual_to_cron_monthly_no_time(self): """Test converting monthly with no time specified.""" visual_config = VisualConfig(monthly_days=[1]) visual_config.time = None # Override default with pytest.raises(ScheduleConfigError, match="time is required for monthly schedules"): ScheduleService.visual_to_cron("monthly", visual_config) def test_visual_to_cron_monthly_duplicate_days(self): """Test monthly with duplicate days should be deduplicated.""" visual_config = VisualConfig( time="10:00 AM", monthly_days=[1, 15, 1, 15, 31], # Duplicates ) result = ScheduleService.visual_to_cron("monthly", visual_config) assert result == "0 10 1,15,31 * *" # Should be deduplicated def test_visual_to_cron_monthly_unsorted_days(self): """Test monthly with unsorted days should be sorted.""" visual_config = VisualConfig( time="2:30 PM", monthly_days=[20, 5, 15, 1, 10], # Unsorted ) result = ScheduleService.visual_to_cron("monthly", visual_config) assert result == "30 14 1,5,10,15,20 * *" # Should be sorted def test_visual_to_cron_weekly_all_weekdays(self): """Test weekly with all weekdays.""" visual_config = VisualConfig( time="8:00 AM", weekdays=["sun", "mon", "tue", "wed", "thu", "fri", "sat"], ) result = ScheduleService.visual_to_cron("weekly", visual_config) assert result == "0 8 * * 0,1,2,3,4,5,6" def test_visual_to_cron_hourly_boundary_values(self): """Test hourly with boundary minute values.""" # Minimum value visual_config = VisualConfig(on_minute=0) result = ScheduleService.visual_to_cron("hourly", visual_config) assert result == "0 * * * *" # Maximum value visual_config = VisualConfig(on_minute=59) result = ScheduleService.visual_to_cron("hourly", visual_config) assert result == "59 * * * *" def test_visual_to_cron_daily_midnight_noon(self): """Test daily at special times (midnight and noon).""" # Midnight visual_config = VisualConfig(time="12:00 AM") result = ScheduleService.visual_to_cron("daily", visual_config) assert result == "0 0 * * *" # Noon visual_config = VisualConfig(time="12:00 PM") result = ScheduleService.visual_to_cron("daily", visual_config) assert result == "0 12 * * *" def test_visual_to_cron_monthly_mixed_with_last_and_duplicates(self): """Test monthly with mixed days, 'last', and duplicates.""" visual_config = VisualConfig( time="11:45 PM", monthly_days=[15, 1, "last", 15, 30, 1, "last"], # Mixed with duplicates ) result = ScheduleService.visual_to_cron("monthly", visual_config) assert result == "45 23 1,15,30,L * *" # Deduplicated and sorted with L at end def test_visual_to_cron_weekly_single_day(self): """Test weekly with single weekday.""" visual_config = VisualConfig( time="6:30 PM", weekdays=["sun"], ) result = ScheduleService.visual_to_cron("weekly", visual_config) assert result == "30 18 * * 0" def test_visual_to_cron_monthly_all_possible_days(self): """Test monthly with all 31 days plus 'last'.""" all_days = list(range(1, 32)) + ["last"] visual_config = VisualConfig( time="12:01 AM", monthly_days=all_days, ) result = ScheduleService.visual_to_cron("monthly", visual_config) expected_days = ",".join([str(i) for i in range(1, 32)]) + ",L" assert result == f"1 0 {expected_days} * *" def test_visual_to_cron_monthly_no_days(self): """Test monthly without any days specified should raise error.""" visual_config = VisualConfig(time="10:00 AM", monthly_days=[]) with pytest.raises(ScheduleConfigError, match="Monthly days are required for monthly schedules"): ScheduleService.visual_to_cron("monthly", visual_config) def test_visual_to_cron_weekly_empty_weekdays_list(self): """Test weekly with empty weekdays list should raise error.""" visual_config = VisualConfig(time="10:00 AM", weekdays=[]) with pytest.raises(ScheduleConfigError, match="Weekdays are required for weekly schedules"): ScheduleService.visual_to_cron("weekly", visual_config) class TestParseTime(unittest.TestCase): """Test cases for time parsing function.""" def test_parse_time_am(self): """Test parsing AM time.""" hour, minute = convert_12h_to_24h("9:30 AM") assert hour == 9 assert minute == 30 def test_parse_time_pm(self): """Test parsing PM time.""" hour, minute = convert_12h_to_24h("2:45 PM") assert hour == 14 assert minute == 45 def test_parse_time_noon(self): """Test parsing 12:00 PM (noon).""" hour, minute = convert_12h_to_24h("12:00 PM") assert hour == 12 assert minute == 0 def test_parse_time_midnight(self): """Test parsing 12:00 AM (midnight).""" hour, minute = convert_12h_to_24h("12:00 AM") assert hour == 0 assert minute == 0 def test_parse_time_invalid_format(self): """Test parsing invalid time format.""" with pytest.raises(ValueError, match="Invalid time format"): convert_12h_to_24h("25:00") def test_parse_time_invalid_hour(self): """Test parsing invalid hour.""" with pytest.raises(ValueError, match="Invalid hour: 13"): convert_12h_to_24h("13:00 PM") def test_parse_time_invalid_minute(self): """Test parsing invalid minute.""" with pytest.raises(ValueError, match="Invalid minute: 60"): convert_12h_to_24h("10:60 AM") def test_parse_time_empty_string(self): """Test parsing empty string.""" with pytest.raises(ValueError, match="Time string cannot be empty"): convert_12h_to_24h("") def test_parse_time_invalid_period(self): """Test parsing invalid period.""" with pytest.raises(ValueError, match="Invalid period"): convert_12h_to_24h("10:30 XM") class TestExtractScheduleConfig(unittest.TestCase): """Test cases for extracting schedule configuration from workflow.""" def test_extract_schedule_config_with_cron_mode(self): """Test extracting schedule config in cron mode.""" workflow = Mock(spec=Workflow) workflow.graph_dict = { "nodes": [ { "id": "schedule-node", "data": { "type": "trigger-schedule", "mode": "cron", "cron_expression": "0 10 * * *", "timezone": "America/New_York", }, } ] } config = ScheduleService.extract_schedule_config(workflow) assert config is not None assert config.node_id == "schedule-node" assert config.cron_expression == "0 10 * * *" assert config.timezone == "America/New_York" def test_extract_schedule_config_with_visual_mode(self): """Test extracting schedule config in visual mode.""" workflow = Mock(spec=Workflow) workflow.graph_dict = { "nodes": [ { "id": "schedule-node", "data": { "type": "trigger-schedule", "mode": "visual", "frequency": "daily", "visual_config": {"time": "10:30 AM"}, "timezone": "UTC", }, } ] } config = ScheduleService.extract_schedule_config(workflow) assert config is not None assert config.node_id == "schedule-node" assert config.cron_expression == "30 10 * * *" assert config.timezone == "UTC" def test_extract_schedule_config_no_schedule_node(self): """Test extracting config when no schedule node exists.""" workflow = Mock(spec=Workflow) workflow.graph_dict = { "nodes": [ { "id": "other-node", "data": {"type": "llm"}, } ] } config = ScheduleService.extract_schedule_config(workflow) assert config is None def test_extract_schedule_config_invalid_graph(self): """Test extracting config with invalid graph data.""" workflow = Mock(spec=Workflow) workflow.graph_dict = None with pytest.raises(ScheduleConfigError, match="Workflow graph is empty"): ScheduleService.extract_schedule_config(workflow) class TestScheduleWithTimezone(unittest.TestCase): """Test cases for schedule with timezone handling.""" def test_visual_schedule_with_timezone_integration(self): """Test complete flow: visual config → cron → execution in different timezones. This test verifies that when a user in Shanghai sets a schedule for 10:30 AM, it runs at 10:30 AM Shanghai time, not 10:30 AM UTC. """ # User in Shanghai wants to run a task at 10:30 AM local time visual_config = VisualConfig( time="10:30 AM", # This is Shanghai time monthly_days=[1], ) # Convert to cron expression cron_expr = ScheduleService.visual_to_cron("monthly", visual_config) assert cron_expr is not None assert cron_expr == "30 10 1 * *" # Direct conversion # Now test execution with Shanghai timezone shanghai_tz = "Asia/Shanghai" # Base time: 2025-01-01 00:00:00 UTC (08:00:00 Shanghai) base_time = datetime(2025, 1, 1, 0, 0, 0, tzinfo=UTC) next_run = calculate_next_run_at(cron_expr, shanghai_tz, base_time) assert next_run is not None # Should run at 10:30 AM Shanghai time on Jan 1 # 10:30 AM Shanghai = 02:30 AM UTC (Shanghai is UTC+8) assert next_run.year == 2025 assert next_run.month == 1 assert next_run.day == 1 assert next_run.hour == 2 # 02:30 UTC assert next_run.minute == 30 def test_visual_schedule_different_timezones_same_local_time(self): """Test that same visual config in different timezones runs at different UTC times. This verifies that a schedule set for "9:00 AM" runs at 9 AM local time regardless of the timezone. """ visual_config = VisualConfig( time="9:00 AM", weekdays=["mon"], ) cron_expr = ScheduleService.visual_to_cron("weekly", visual_config) assert cron_expr is not None assert cron_expr == "0 9 * * 1" # Base time: Sunday 2025-01-05 12:00:00 UTC base_time = datetime(2025, 1, 5, 12, 0, 0, tzinfo=UTC) # Test New York (UTC-5 in January) ny_next = calculate_next_run_at(cron_expr, "America/New_York", base_time) assert ny_next is not None # Monday 9 AM EST = Monday 14:00 UTC assert ny_next.day == 6 assert ny_next.hour == 14 # 9 AM EST = 2 PM UTC # Test Tokyo (UTC+9) tokyo_next = calculate_next_run_at(cron_expr, "Asia/Tokyo", base_time) assert tokyo_next is not None # Monday 9 AM JST = Monday 00:00 UTC assert tokyo_next.day == 6 assert tokyo_next.hour == 0 # 9 AM JST = 0 AM UTC def test_visual_schedule_daily_across_dst_change(self): """Test that daily schedules adjust correctly during DST changes. A schedule set for "10:00 AM" should always run at 10 AM local time, even when DST changes. """ visual_config = VisualConfig( time="10:00 AM", ) cron_expr = ScheduleService.visual_to_cron("daily", visual_config) assert cron_expr is not None assert cron_expr == "0 10 * * *" # Test before DST (EST - UTC-5) winter_base = datetime(2025, 2, 1, 0, 0, 0, tzinfo=UTC) winter_next = calculate_next_run_at(cron_expr, "America/New_York", winter_base) assert winter_next is not None # 10 AM EST = 15:00 UTC assert winter_next.hour == 15 # Test during DST (EDT - UTC-4) summer_base = datetime(2025, 6, 1, 0, 0, 0, tzinfo=UTC) summer_next = calculate_next_run_at(cron_expr, "America/New_York", summer_base) assert summer_next is not None # 10 AM EDT = 14:00 UTC assert summer_next.hour == 14 @pytest.fixture def session_mock() -> MagicMock: return MagicMock(spec=Session) def _workflow(**kwargs: Any) -> Workflow: return cast(Workflow, SimpleNamespace(**kwargs)) def test_to_schedule_config_should_build_from_cron_mode() -> None: # Arrange node_config: dict[str, Any] = { "id": "node-1", "data": { "mode": "cron", "cron_expression": "0 12 * * *", "timezone": "Asia/Kolkata", }, } # Act result = ScheduleService.to_schedule_config(node_config=node_config) # Assert assert result.node_id == "node-1" assert result.cron_expression == "0 12 * * *" assert result.timezone == "Asia/Kolkata" def test_to_schedule_config_should_raise_for_cron_mode_without_expression() -> None: # Arrange node_config = {"id": "node-1", "data": {"mode": "cron", "cron_expression": ""}} # Act / Assert with pytest.raises(ScheduleConfigError, match="Cron expression is required for cron mode"): ScheduleService.to_schedule_config(node_config=node_config) def test_to_schedule_config_should_build_from_visual_mode(monkeypatch: pytest.MonkeyPatch) -> None: # Arrange node_config = { "id": "node-1", "data": { "mode": "visual", "frequency": "daily", "visual_config": {"time": "9:30 AM"}, "timezone": "UTC", }, } monkeypatch.setattr(ScheduleService, "visual_to_cron", MagicMock(return_value="30 9 * * *")) # Act result = ScheduleService.to_schedule_config(node_config=node_config) # Assert assert result.cron_expression == "30 9 * * *" def test_to_schedule_config_should_raise_for_invalid_mode() -> None: # Arrange node_config = {"id": "node-1", "data": {"mode": "manual"}} # Act / Assert with pytest.raises(ScheduleConfigError, match="Invalid schedule mode: manual"): ScheduleService.to_schedule_config(node_config=node_config) def test_extract_schedule_config_should_raise_when_graph_is_empty() -> None: # Arrange workflow = _workflow(graph_dict={}) # Act / Assert with pytest.raises(ScheduleConfigError, match="Workflow graph is empty"): ScheduleService.extract_schedule_config(workflow=workflow) def test_extract_schedule_config_should_raise_when_mode_invalid() -> None: # Arrange workflow = _workflow( graph_dict={ "nodes": [ { "id": "schedule-1", "data": { "type": TRIGGER_SCHEDULE_NODE_TYPE, "mode": "invalid", }, } ] } ) # Act / Assert with pytest.raises(ScheduleConfigError, match="Invalid schedule mode: invalid"): ScheduleService.extract_schedule_config(workflow=workflow) if __name__ == "__main__": unittest.main()