From 6afdde1bc4d1dad2c3cb55c3744d2d7ed2bac6ce Mon Sep 17 00:00:00 2001 From: "yunlu.wen" Date: Fri, 17 Apr 2026 11:58:21 +0800 Subject: [PATCH] remove replay capability --- api/controllers/web/workflow_events.py | 7 ++--- .../app/apps/message_based_app_generator.py | 2 -- api/core/app/apps/message_generator.py | 2 -- api/core/app/apps/streaming_utils.py | 6 +++-- api/libs/broadcast_channel/channel.py | 10 ++----- api/libs/broadcast_channel/redis/channel.py | 2 +- .../redis/sharded_channel.py | 2 +- .../redis/streams_channel.py | 9 +++---- api/services/app_generate_service.py | 3 +-- .../workflow_event_snapshot_service.py | 26 +++++++++++++++++-- .../console/explore/test_message.py | 3 +++ .../core/app/apps/test_streaming_utils.py | 7 +++-- ...kflow_event_snapshot_service_additional.py | 7 +++-- 13 files changed, 50 insertions(+), 36 deletions(-) diff --git a/api/controllers/web/workflow_events.py b/api/controllers/web/workflow_events.py index 4da144d6b1..4d452db691 100644 --- a/api/controllers/web/workflow_events.py +++ b/api/controllers/web/workflow_events.py @@ -81,10 +81,12 @@ class WorkflowEventsApi(WebApiResource): raise InvalidArgumentError(f"cannot subscribe to workflow run, workflow_run_id={workflow_run.id}") include_state_snapshot = request.args.get("include_state_snapshot", "false").lower() == "true" - replay = request.args.get("replay", "false").lower() == "true" def _generate_stream_events(): if include_state_snapshot: + # TODO(wylswz): events between snapshot and live tail may be lost. + # TODO(wylswz): previous message chunks are not replayed. In order to support replay, we need + # to figure out a way to deduplicate events between snapshot and stream. 
return generator.convert_to_event_stream( build_workflow_event_stream( app_mode=app_mode, @@ -92,11 +94,10 @@ class WorkflowEventsApi(WebApiResource): tenant_id=app_model.tenant_id, app_id=app_model.id, session_maker=session_maker, - replay=replay, ) ) return generator.convert_to_event_stream( - msg_generator.retrieve_events(app_mode, workflow_run.id, replay=replay), + msg_generator.retrieve_events(app_mode, workflow_run.id), ) event_generator = _generate_stream_events diff --git a/api/core/app/apps/message_based_app_generator.py b/api/core/app/apps/message_based_app_generator.py index a19829e344..3c0454b7da 100644 --- a/api/core/app/apps/message_based_app_generator.py +++ b/api/core/app/apps/message_based_app_generator.py @@ -313,12 +313,10 @@ class MessageBasedAppGenerator(BaseAppGenerator): workflow_run_id: str, idle_timeout: float = 300, on_subscribe: Callable[[], None] | None = None, - replay: bool = False, ) -> Generator[Mapping | str, None, None]: topic = cls.get_response_topic(app_mode, workflow_run_id) return stream_topic_events( topic=topic, idle_timeout=idle_timeout, on_subscribe=on_subscribe, - replay=replay, ) diff --git a/api/core/app/apps/message_generator.py b/api/core/app/apps/message_generator.py index 155f966d4d..b496e224fd 100644 --- a/api/core/app/apps/message_generator.py +++ b/api/core/app/apps/message_generator.py @@ -26,7 +26,6 @@ class MessageGenerator: idle_timeout: float = 300, ping_interval: float = 10.0, on_subscribe: Callable[[], None] | None = None, - replay: bool = False, ) -> Generator[Mapping | str, None, None]: topic = cls.get_response_topic(app_mode, workflow_run_id) return stream_topic_events( @@ -34,5 +33,4 @@ class MessageGenerator: idle_timeout=idle_timeout, ping_interval=ping_interval, on_subscribe=on_subscribe, - replay=replay, ) diff --git a/api/core/app/apps/streaming_utils.py b/api/core/app/apps/streaming_utils.py index 6b76f6e807..a01a11e1ac 100644 --- a/api/core/app/apps/streaming_utils.py +++ 
b/api/core/app/apps/streaming_utils.py @@ -17,7 +17,6 @@ def stream_topic_events( ping_interval: float | None = None, on_subscribe: Callable[[], None] | None = None, terminal_events: Iterable[str | StreamEvent] | None = None, - replay: bool = False, ) -> Generator[Mapping[str, Any] | str, None, None]: # send a PING event immediately to prevent the connection staying in pending state for a long time. # @@ -28,7 +27,10 @@ def stream_topic_events( terminal_values = _normalize_terminal_events(terminal_events) last_msg_time = time.time() last_ping_time = last_msg_time - with topic.subscribe(replay=replay) as sub: + # The application layer intentionally does not use broadcast-channel replay; + # callers that need historical events should compose them from persisted state + # (see ``build_workflow_event_stream``) and then tail the live stream. + with topic.subscribe() as sub: # on_subscribe fires only after the Redis subscription is active. # This is used to gate task start and reduce pub/sub race for the first event. if on_subscribe is not None: diff --git a/api/libs/broadcast_channel/channel.py b/api/libs/broadcast_channel/channel.py index ed37b9bb92..660f977cfc 100644 --- a/api/libs/broadcast_channel/channel.py +++ b/api/libs/broadcast_channel/channel.py @@ -92,14 +92,8 @@ class Subscriber(Protocol): """ @abstractmethod - def subscribe(self, *, replay: bool = False) -> Subscription: - """Create a new subscription. - - :param replay: When True and the underlying transport supports message retention - (e.g. Redis Streams), the subscription replays all buffered messages from - the beginning of the stream before switching to live tail. Transports - without retention (plain Pub/Sub) silently ignore this flag. 
- """ + def subscribe(self) -> Subscription: + """Create a new subscription.""" pass diff --git a/api/libs/broadcast_channel/redis/channel.py b/api/libs/broadcast_channel/redis/channel.py index af59e50a7d..b76a23eb3c 100644 --- a/api/libs/broadcast_channel/redis/channel.py +++ b/api/libs/broadcast_channel/redis/channel.py @@ -44,7 +44,7 @@ class Topic: def as_subscriber(self) -> Subscriber: return self - def subscribe(self, *, replay: bool = False) -> Subscription: + def subscribe(self) -> Subscription: return _RedisSubscription( client=self._client, pubsub=self._client.pubsub(), diff --git a/api/libs/broadcast_channel/redis/sharded_channel.py b/api/libs/broadcast_channel/redis/sharded_channel.py index bfe0bc1b6f..919d8d622e 100644 --- a/api/libs/broadcast_channel/redis/sharded_channel.py +++ b/api/libs/broadcast_channel/redis/sharded_channel.py @@ -42,7 +42,7 @@ class ShardedTopic: def as_subscriber(self) -> Subscriber: return self - def subscribe(self, *, replay: bool = False) -> Subscription: + def subscribe(self) -> Subscription: return _RedisShardedSubscription( client=self._client, pubsub=self._client.pubsub(), diff --git a/api/libs/broadcast_channel/redis/streams_channel.py b/api/libs/broadcast_channel/redis/streams_channel.py index 3a0561370c..7c5ce794d6 100644 --- a/api/libs/broadcast_channel/redis/streams_channel.py +++ b/api/libs/broadcast_channel/redis/streams_channel.py @@ -54,17 +54,16 @@ class StreamsTopic: def as_subscriber(self) -> Subscriber: return self - def subscribe(self, *, replay: bool = False) -> Subscription: - return _StreamsSubscription(self._client, self._key, replay=replay) + def subscribe(self) -> Subscription: + return _StreamsSubscription(self._client, self._key) class _StreamsSubscription(Subscription): _SENTINEL = object() - def __init__(self, client: Redis | RedisCluster, key: str, *, replay: bool = False): + def __init__(self, client: Redis | RedisCluster, key: str): self._client = client self._key = key - self._replay = replay 
self._queue: queue.Queue[object] = queue.Queue() @@ -91,7 +90,7 @@ class _StreamsSubscription(Subscription): # `"0"` replays all retained entries; `"$"` tails only new messages. # ref: https://redis.io/docs/latest/commands/xread/#the-special--id - last_id = "0" if self._replay else "$" + last_id = "$" try: while True: with self._lock: diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index 178de3bb9f..5e8c7aa337 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -416,7 +416,6 @@ class AppGenerateService: cls, app_model: App, workflow_run: WorkflowRun, - replay: bool = False, ): if workflow_run.status.is_ended(): # TODO(QuantumGhost): handled the ended scenario. @@ -425,5 +424,5 @@ class AppGenerateService: generator = AdvancedChatAppGenerator() return generator.convert_to_event_stream( - generator.retrieve_events(AppMode(app_model.mode), workflow_run.id, replay=replay), + generator.retrieve_events(AppMode(app_model.mode), workflow_run.id), ) diff --git a/api/services/workflow_event_snapshot_service.py b/api/services/workflow_event_snapshot_service.py index b133118e8f..b4db6f7cfc 100644 --- a/api/services/workflow_event_snapshot_service.py +++ b/api/services/workflow_event_snapshot_service.py @@ -61,8 +61,30 @@ def build_workflow_event_stream( session_maker: sessionmaker[Session], idle_timeout: float = 300, ping_interval: float = 10.0, - replay: bool = False, ) -> Generator[Mapping[str, Any] | str, None, None]: + """Yield a stream of workflow events composed of a DB-derived snapshot followed by live tail. + + The stream is assembled in two phases that are kept **structurally disjoint** so no + per-event deduplication is required: + + 1. 
Snapshot phase: events rebuilt from persistent state via ``_build_snapshot_events`` + (``workflow_started``, optional ``message_replace``, ``node_started``/``node_finished`` + for each persisted execution, and an optional terminal ``workflow_paused``). This + represents the history that already happened from the client's point of view. + 2. Tail phase: events delivered by the broadcast subscription **from the moment of + subscription onward only**. Anything that was published before the subscription was + established is intentionally ignored here, because it has already been covered by + the snapshot phase. + + The application layer does not use the broadcast channel's replay capability on any path + (see ``stream_topic_events``). Mixing replay with the snapshot produces events that + overlap along the history axis, and the frontend handlers are not idempotent across the + whole event set (string content is accumulated, ``workflow_paused`` re-opens a new SSE + subscription, iteration/loop starts and file entries are pushed into lists, etc.). A + future feature that wants to recover events published *after* the snapshot was read but + *before* this function subscribed should solve it at the subscription layer + (cursor/position) rather than by re-sending the historical prefix. 
+ """ topic = MessageGenerator.get_response_topic(app_mode, workflow_run.id) workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker) node_execution_repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(session_maker) @@ -104,7 +126,7 @@ def build_workflow_event_stream( last_msg_time = time.time() last_ping_time = last_msg_time - with topic.subscribe(replay=replay) as sub: + with topic.subscribe() as sub: buffer_state = _start_buffering(sub) try: task_id = _resolve_task_id(resumption_context, buffer_state, workflow_run.id) diff --git a/api/tests/unit_tests/controllers/console/explore/test_message.py b/api/tests/unit_tests/controllers/console/explore/test_message.py index 145cc9cdd7..b8008b3c22 100644 --- a/api/tests/unit_tests/controllers/console/explore/test_message.py +++ b/api/tests/unit_tests/controllers/console/explore/test_message.py @@ -50,6 +50,7 @@ def make_message(): msg.user_feedback = MagicMock(rating=None) msg.status = "normal" msg.error = None + msg.workflow_run_id = "22222222-2222-2222-2222-222222222222" return msg @@ -84,6 +85,8 @@ class TestMessageListApi: assert result["limit"] == 20 assert result["has_more"] is False assert len(result["data"]) == 2 + assert result["data"][0]["workflow_run_id"] == "22222222-2222-2222-2222-222222222222" + assert result["data"][1]["workflow_run_id"] == "22222222-2222-2222-2222-222222222222" def test_get_not_chat_app(self): api = module.MessageListApi() diff --git a/api/tests/unit_tests/core/app/apps/test_streaming_utils.py b/api/tests/unit_tests/core/app/apps/test_streaming_utils.py index 04bd651e52..a7714c56ce 100644 --- a/api/tests/unit_tests/core/app/apps/test_streaming_utils.py +++ b/api/tests/unit_tests/core/app/apps/test_streaming_utils.py @@ -12,11 +12,10 @@ from models.model import AppMode class FakeSubscription: - def __init__(self, message_queue: queue.Queue[bytes], state: dict[str, bool], replay: bool = False) -> None: + def __init__(self, 
message_queue: queue.Queue[bytes], state: dict[str, bool]) -> None: self._queue = message_queue self._state = state self._closed = False - self._replay = replay def __enter__(self): self._state["subscribed"] = True @@ -44,8 +43,8 @@ class FakeTopic: self._queue: queue.Queue[bytes] = queue.Queue() self._state = {"subscribed": False} - def subscribe(self, replay: bool = False) -> FakeSubscription: - return FakeSubscription(self._queue, self._state, replay) + def subscribe(self) -> FakeSubscription: + return FakeSubscription(self._queue, self._state) def publish(self, payload: bytes) -> None: self._queue.put(payload) diff --git a/api/tests/unit_tests/services/workflow/test_workflow_event_snapshot_service_additional.py b/api/tests/unit_tests/services/workflow/test_workflow_event_snapshot_service_additional.py index ad102d54dd..d2634d7d7b 100644 --- a/api/tests/unit_tests/services/workflow/test_workflow_event_snapshot_service_additional.py +++ b/api/tests/unit_tests/services/workflow/test_workflow_event_snapshot_service_additional.py @@ -96,9 +96,8 @@ class _SessionMaker: class _SubscriptionContext: - def __init__(self, subscription: Any, replay: bool = False) -> None: + def __init__(self, subscription: Any) -> None: self._subscription = subscription - self._replay = replay def __enter__(self) -> Any: return self._subscription @@ -111,8 +110,8 @@ class _Topic: def __init__(self, subscription: Any) -> None: self._subscription = subscription - def subscribe(self, replay: bool = False) -> _SubscriptionContext: - return _SubscriptionContext(self._subscription, replay) + def subscribe(self) -> _SubscriptionContext: + return _SubscriptionContext(self._subscription) class _StaticSubscription: