From b41538d8c782edaa7c0fcbbfda3a91502f3186e9 Mon Sep 17 00:00:00 2001
From: Harry
Date: Tue, 28 Oct 2025 23:34:03 +0800
Subject: [PATCH] feat(trigger): rework schedule trigger debugging with cron
 calculation

- Implemented a caching mechanism for schedule trigger debug events using Redis to optimize performance.
- Added methods to create and manage schedule debug runtime configurations, including cron expression handling.
- Updated the ScheduleTriggerDebugEventPoller to use the new caching and event creation logic.
- Removed the now-unused build_schedule_pool_key function from event handling.
---
 api/core/trigger/debug/event_selectors.py  |  74 +++++++++++--
 api/core/trigger/debug/events.py           |  10 --
 api/libs/datetime_utils.py                 |  11 ++
 api/services/trigger/schedule_service.py   |  31 +++++-
 api/tasks/workflow_schedule_tasks.py       |  36 -------
 .../test_trigger_debug_event_selectors.py  | 102 ++++++++++++++++++
 6 files changed, 207 insertions(+), 57 deletions(-)
 create mode 100644 api/tests/unit_tests/core/test_trigger_debug_event_selectors.py

diff --git a/api/core/trigger/debug/event_selectors.py b/api/core/trigger/debug/event_selectors.py
index bd96157f89..bd1ff4ebfe 100644
--- a/api/core/trigger/debug/event_selectors.py
+++ b/api/core/trigger/debug/event_selectors.py
@@ -1,8 +1,11 @@
 """Trigger debug service supporting plugin and webhook debugging in draft workflows."""
+import hashlib
 import logging
+import time
 from abc import ABC, abstractmethod
 from collections.abc import Mapping
+from datetime import datetime
 from typing import Any
 
 from pydantic import BaseModel
 
@@ -14,11 +17,14 @@ from core.trigger.debug.events import (
     ScheduleDebugEvent,
     WebhookDebugEvent,
     build_plugin_pool_key,
-    build_schedule_pool_key,
     build_webhook_pool_key,
 )
 from core.workflow.enums import NodeType
 from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData
+from core.workflow.nodes.trigger_schedule.entities import ScheduleConfig
+from extensions.ext_redis import redis_client
+from libs.datetime_utils import ensure_naive_utc, naive_utc_now
+from libs.schedule_utils import calculate_next_run_at
 from models.model import App
 from models.provider_ids import TriggerProviderID
 from models.workflow import Workflow
@@ -125,18 +131,66 @@ class WebhookTriggerDebugEventPoller(TriggerDebugEventPoller):
 
 
 class ScheduleTriggerDebugEventPoller(TriggerDebugEventPoller):
-    def poll(self) -> TriggerDebugEvent | None:
-        pool_key: str = build_schedule_pool_key(tenant_id=self.tenant_id, app_id=self.app_id, node_id=self.node_id)
-        schedule_event: ScheduleDebugEvent | None = TriggerDebugEventBus.poll(
-            event_type=ScheduleDebugEvent,
-            pool_key=pool_key,
-            tenant_id=self.tenant_id,
-            user_id=self.user_id,
-            app_id=self.app_id,
+    """
+    Poller for schedule trigger debug events.
+
+    Simulates a schedule trigger firing by caching a schedule debug runtime in Redis
+    and emitting an event once the calculated next run time has passed.
+    """
+
+    RUNTIME_CACHE_TTL = 60 * 5
+
+    class ScheduleDebugRuntime(BaseModel):
+        cache_key: str
+        timezone: str
+        cron_expression: str
+        next_run_at: datetime
+
+    def schedule_debug_runtime_key(self, cron_hash: str) -> str:
+        return f"schedule_debug_runtime:{self.tenant_id}:{self.user_id}:{self.app_id}:{self.node_id}:{cron_hash}"
+
+    def get_or_create_schedule_debug_runtime(self) -> ScheduleDebugRuntime:
+        from services.trigger.schedule_service import ScheduleService
+
+        schedule_config: ScheduleConfig = ScheduleService.to_schedule_config(self.node_config)
+        cron_hash = hashlib.sha256(schedule_config.cron_expression.encode()).hexdigest()
+        cache_key = self.schedule_debug_runtime_key(cron_hash)
+        runtime_cache = redis_client.get(cache_key)
+        if runtime_cache is None:
+            schedule_debug_runtime = self.ScheduleDebugRuntime(
+                cron_expression=schedule_config.cron_expression,
+                timezone=schedule_config.timezone,
+                cache_key=cache_key,
+                next_run_at=ensure_naive_utc(
+                    calculate_next_run_at(schedule_config.cron_expression, schedule_config.timezone)
+                ),
+            )
+            redis_client.setex(
+                name=cache_key,
+                time=self.RUNTIME_CACHE_TTL,
+                value=schedule_debug_runtime.model_dump_json(),
+            )
+            return schedule_debug_runtime
+        else:
+            redis_client.expire(cache_key, self.RUNTIME_CACHE_TTL)
+            runtime = self.ScheduleDebugRuntime.model_validate_json(runtime_cache)
+            runtime.next_run_at = ensure_naive_utc(runtime.next_run_at)
+            return runtime
+
+    def create_schedule_event(self, schedule_debug_runtime: ScheduleDebugRuntime) -> ScheduleDebugEvent:
+        redis_client.delete(schedule_debug_runtime.cache_key)
+        return ScheduleDebugEvent(
+            timestamp=int(time.time()),
             node_id=self.node_id,
+            inputs={},
         )
-        if not schedule_event:
+
+    def poll(self) -> TriggerDebugEvent | None:
+        schedule_debug_runtime = self.get_or_create_schedule_debug_runtime()
+        if schedule_debug_runtime.next_run_at > naive_utc_now():
             return None
+
+        schedule_event: ScheduleDebugEvent = self.create_schedule_event(schedule_debug_runtime)
         workflow_args: Mapping[str, Any] = {
             "inputs": schedule_event.inputs or {},
             "files": [],
diff --git a/api/core/trigger/debug/events.py b/api/core/trigger/debug/events.py
index 4766ec4c6a..9f7bab5e49 100644
--- a/api/core/trigger/debug/events.py
+++ b/api/core/trigger/debug/events.py
@@ -26,16 +26,6 @@ class ScheduleDebugEvent(BaseDebugEvent):
     inputs: Mapping[str, Any]
 
 
-def build_schedule_pool_key(tenant_id: str, app_id: str, node_id: str) -> str:
-    """Generate pool key for schedule events.
-    Args:
-        tenant_id: Tenant ID
-        app_id: App ID
-        node_id: Node ID
-    """
-    return f"{TriggerDebugPoolKey.SCHEDULE}:{tenant_id}:{app_id}:{node_id}"
-
-
 class WebhookDebugEvent(BaseDebugEvent):
     """Debug event for webhook triggers."""
 
diff --git a/api/libs/datetime_utils.py b/api/libs/datetime_utils.py
index e576a34629..83b3351568 100644
--- a/api/libs/datetime_utils.py
+++ b/api/libs/datetime_utils.py
@@ -20,3 +20,14 @@ def naive_utc_now() -> datetime.datetime:
         representing current UTC time.
     """
     return _now_func(datetime.UTC).replace(tzinfo=None)
+
+
+def ensure_naive_utc(dt: datetime.datetime) -> datetime.datetime:
+    """Return the datetime as naive UTC (tzinfo=None).
+
+    If the input is timezone-aware, convert to UTC and drop the tzinfo.
+    Assumes naive datetimes are already expressed in UTC.
+    """
+    if dt.tzinfo is None:
+        return dt
+    return dt.astimezone(datetime.UTC).replace(tzinfo=None)
diff --git a/api/services/trigger/schedule_service.py b/api/services/trigger/schedule_service.py
index f5e5ec6e01..b0f39bf929 100644
--- a/api/services/trigger/schedule_service.py
+++ b/api/services/trigger/schedule_service.py
@@ -1,7 +1,8 @@
 import json
 import logging
+from collections.abc import Mapping
 from datetime import datetime
-from typing import Optional
+from typing import Any, Optional
 
 from sqlalchemy import select
 from sqlalchemy.orm import Session
@@ -168,6 +169,34 @@ class ScheduleService:
         session.flush()
         return next_run_at
 
+    @staticmethod
+    def to_schedule_config(node_config: Mapping[str, Any]) -> ScheduleConfig:
+        """
+        Build a ScheduleConfig from a schedule trigger node's configuration, converting visual
+        settings to a cron expression consistent with the frontend UI and croniter's extended syntax.
+        """
+        node_data = node_config.get("data", {})
+        mode = node_data.get("mode", "visual")
+        timezone = node_data.get("timezone", "UTC")
+        node_id = node_config.get("id", "start")
+
+        cron_expression = None
+        if mode == "cron":
+            cron_expression = node_data.get("cron_expression")
+            if not cron_expression:
+                raise ScheduleConfigError("Cron expression is required for cron mode")
+        elif mode == "visual":
+            frequency = node_data.get("frequency")
+            if not frequency:
+                raise ScheduleConfigError("Frequency is required for visual mode")
+            visual_config = VisualConfig(**node_data.get("visual_config", {}))
+            cron_expression = ScheduleService.visual_to_cron(frequency=str(frequency), visual_config=visual_config)
+            if not cron_expression:
+                raise ScheduleConfigError("Cron expression is required for visual mode")
+        else:
+            raise ScheduleConfigError(f"Invalid schedule mode: {mode}")
+        return ScheduleConfig(node_id=node_id, cron_expression=cron_expression, timezone=timezone)
+
     @staticmethod
     def extract_schedule_config(workflow: Workflow) -> Optional[ScheduleConfig]:
         """
diff --git a/api/tasks/workflow_schedule_tasks.py b/api/tasks/workflow_schedule_tasks.py
index 3b6ea5dadf..09a8da6f05 100644
--- a/api/tasks/workflow_schedule_tasks.py
+++ b/api/tasks/workflow_schedule_tasks.py
@@ -1,11 +1,8 @@
 import logging
-import time
 
 from celery import shared_task
 from sqlalchemy.orm import sessionmaker
 
-from core.trigger.debug.event_bus import TriggerDebugEventBus
-from core.trigger.debug.events import ScheduleDebugEvent, build_schedule_pool_key
 from core.workflow.nodes.trigger_schedule.exc import (
     ScheduleExecutionError,
     ScheduleNotFoundError,
@@ -59,39 +56,6 @@ def run_schedule_trigger(schedule_id: str) -> None:
             ),
         )
         logger.info("Schedule %s triggered workflow: %s", schedule_id, response.workflow_trigger_log_id)
-
-        # Debug dispatch: Send event to waiting debug listeners (if any)
-        try:
-            event = ScheduleDebugEvent(
-                timestamp=int(time.time()),
-                node_id=schedule.node_id,
-                inputs=inputs,
-            )
-            pool_key = build_schedule_pool_key(
-                tenant_id=schedule.tenant_id,
-                app_id=schedule.app_id,
-                node_id=schedule.node_id,
-            )
-            dispatched_count = TriggerDebugEventBus.dispatch(
-                tenant_id=schedule.tenant_id,
-                event=event,
-                pool_key=pool_key,
-            )
-            if dispatched_count > 0:
-                logger.debug(
-                    "Dispatched schedule debug event to %d listener(s) for schedule %s",
-                    dispatched_count,
-                    schedule_id,
-                )
-        except Exception as debug_error:
-            # Debug dispatch failure should not affect production workflow execution
-            logger.warning(
-                "Failed to dispatch debug event for schedule %s: %s",
-                schedule_id,
-                str(debug_error),
-                exc_info=True,
-            )
-
     except Exception as e:
         raise ScheduleExecutionError(
             f"Failed to trigger workflow for schedule {schedule_id}, app {schedule.app_id}"
diff --git a/api/tests/unit_tests/core/test_trigger_debug_event_selectors.py b/api/tests/unit_tests/core/test_trigger_debug_event_selectors.py
new file mode 100644
index 0000000000..9c0ac1d234
--- /dev/null
+++ b/api/tests/unit_tests/core/test_trigger_debug_event_selectors.py
@@ -0,0 +1,102 @@
+import hashlib
+import json
+from datetime import UTC, datetime
+
+import pytest
+import pytz
+
+from core.trigger.debug import event_selectors
+from core.workflow.nodes.trigger_schedule.entities import ScheduleConfig
+
+
+class _DummyRedis:
+    def __init__(self):
+        self.store: dict[str, str] = {}
+
+    def get(self, key: str):
+        return self.store.get(key)
+
+    def setex(self, name: str, time: int, value: str):
+        self.store[name] = value
+
+    def expire(self, name: str, ttl: int):
+        # Expiration is not exercised by these tests.
+        pass
+
+    def delete(self, name: str):
+        self.store.pop(name, None)
+
+
+@pytest.fixture
+def dummy_schedule_config() -> ScheduleConfig:
+    return ScheduleConfig(
+        node_id="node-1",
+        cron_expression="* * * * *",
+        timezone="Asia/Shanghai",
+    )
+
+
+@pytest.fixture(autouse=True)
+def patch_schedule_service(monkeypatch: pytest.MonkeyPatch, dummy_schedule_config: ScheduleConfig):
+    # Ensure the poller always receives the deterministic config.
+    monkeypatch.setattr(
+        "services.trigger.schedule_service.ScheduleService.to_schedule_config",
+        staticmethod(lambda *_args, **_kwargs: dummy_schedule_config),
+    )
+
+
+def _make_poller(
+    monkeypatch: pytest.MonkeyPatch, redis_client: _DummyRedis
+) -> event_selectors.ScheduleTriggerDebugEventPoller:
+    monkeypatch.setattr(event_selectors, "redis_client", redis_client)
+    return event_selectors.ScheduleTriggerDebugEventPoller(
+        tenant_id="tenant-1",
+        user_id="user-1",
+        app_id="app-1",
+        node_config={"id": "node-1", "data": {"mode": "cron"}},
+        node_id="node-1",
+    )
+
+
+def test_schedule_poller_handles_aware_next_run(monkeypatch: pytest.MonkeyPatch):
+    redis_client = _DummyRedis()
+    poller = _make_poller(monkeypatch, redis_client)
+
+    base_now = datetime(2025, 1, 1, 12, 0, 0)
+    aware_next_run = datetime(2025, 1, 1, 11, 59, 55, tzinfo=UTC)
+
+    monkeypatch.setattr(event_selectors, "naive_utc_now", lambda: base_now)
+    monkeypatch.setattr(event_selectors, "calculate_next_run_at", lambda *_: aware_next_run)
+
+    event = poller.poll()
+
+    assert event is not None
+    assert event.node_id == "node-1"
+    assert event.workflow_args["inputs"] == {}
+
+
+def test_schedule_runtime_cache_normalizes_timezone(
+    monkeypatch: pytest.MonkeyPatch, dummy_schedule_config: ScheduleConfig
+):
+    redis_client = _DummyRedis()
+    poller = _make_poller(monkeypatch, redis_client)
+
+    localized_time = pytz.timezone("Asia/Shanghai").localize(datetime(2025, 1, 1, 20, 0, 0))
+
+    cron_hash = hashlib.sha256(dummy_schedule_config.cron_expression.encode()).hexdigest()
+    cache_key = poller.schedule_debug_runtime_key(cron_hash)
+
+    redis_client.store[cache_key] = json.dumps(
+        {
+            "cache_key": cache_key,
+            "timezone": dummy_schedule_config.timezone,
+            "cron_expression": dummy_schedule_config.cron_expression,
+            "next_run_at": localized_time.isoformat(),
+        }
+    )
+
+    runtime = poller.get_or_create_schedule_debug_runtime()
+
+    expected = localized_time.astimezone(UTC).replace(tzinfo=None)
+    assert runtime.next_run_at == expected
+    assert runtime.next_run_at.tzinfo is None
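-- 
Below the fold: a minimal sketch of the ensure_naive_utc contract that the poller and the
tests above rely on. It assumes the api/ directory is on the import path so that
libs.datetime_utils resolves the same way as in the diff; it is an illustration, not part
of the patch.

    import datetime

    from libs.datetime_utils import ensure_naive_utc

    # Aware datetimes are converted to UTC, then stripped of tzinfo.
    shanghai = datetime.timezone(datetime.timedelta(hours=8))  # fixed +08:00 offset for illustration
    aware = datetime.datetime(2025, 1, 1, 20, 0, tzinfo=shanghai)
    assert ensure_naive_utc(aware) == datetime.datetime(2025, 1, 1, 12, 0)

    # Naive datetimes are assumed to already be UTC and pass through unchanged.
    naive = datetime.datetime(2025, 1, 1, 12, 0)
    assert ensure_naive_utc(naive) == naive

This mirrors what test_schedule_runtime_cache_normalizes_timezone asserts against the
Redis-cached runtime: cached next_run_at values always compare cleanly with naive_utc_now().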