remove replay capability

This commit is contained in:
yunlu.wen 2026-04-17 11:58:21 +08:00
parent c294006ecf
commit 6afdde1bc4
13 changed files with 50 additions and 36 deletions

View File

@ -81,10 +81,12 @@ class WorkflowEventsApi(WebApiResource):
raise InvalidArgumentError(f"cannot subscribe to workflow run, workflow_run_id={workflow_run.id}")
include_state_snapshot = request.args.get("include_state_snapshot", "false").lower() == "true"
replay = request.args.get("replay", "false").lower() == "true"
def _generate_stream_events():
if include_state_snapshot:
# TODO(wylswz): events between snapshot and live tail may be lost.
# TODO(wylswz): previous message chunks are not replayed. In order to support replay, we need
# to figure out a way to deduplicate events between snapshot and stream.
return generator.convert_to_event_stream(
build_workflow_event_stream(
app_mode=app_mode,
@ -92,11 +94,10 @@ class WorkflowEventsApi(WebApiResource):
tenant_id=app_model.tenant_id,
app_id=app_model.id,
session_maker=session_maker,
replay=replay,
)
)
return generator.convert_to_event_stream(
msg_generator.retrieve_events(app_mode, workflow_run.id, replay=replay),
msg_generator.retrieve_events(app_mode, workflow_run.id),
)
event_generator = _generate_stream_events

View File

@ -313,12 +313,10 @@ class MessageBasedAppGenerator(BaseAppGenerator):
workflow_run_id: str,
idle_timeout: float = 300,
on_subscribe: Callable[[], None] | None = None,
replay: bool = False,
) -> Generator[Mapping | str, None, None]:
topic = cls.get_response_topic(app_mode, workflow_run_id)
return stream_topic_events(
topic=topic,
idle_timeout=idle_timeout,
on_subscribe=on_subscribe,
replay=replay,
)

View File

@ -26,7 +26,6 @@ class MessageGenerator:
idle_timeout: float = 300,
ping_interval: float = 10.0,
on_subscribe: Callable[[], None] | None = None,
replay: bool = False,
) -> Generator[Mapping | str, None, None]:
topic = cls.get_response_topic(app_mode, workflow_run_id)
return stream_topic_events(
@ -34,5 +33,4 @@ class MessageGenerator:
idle_timeout=idle_timeout,
ping_interval=ping_interval,
on_subscribe=on_subscribe,
replay=replay,
)

View File

@ -17,7 +17,6 @@ def stream_topic_events(
ping_interval: float | None = None,
on_subscribe: Callable[[], None] | None = None,
terminal_events: Iterable[str | StreamEvent] | None = None,
replay: bool = False,
) -> Generator[Mapping[str, Any] | str, None, None]:
# send a PING event immediately to prevent the connection staying in pending state for a long time.
#
@ -28,7 +27,10 @@ def stream_topic_events(
terminal_values = _normalize_terminal_events(terminal_events)
last_msg_time = time.time()
last_ping_time = last_msg_time
with topic.subscribe(replay=replay) as sub:
# The application layer intentionally does not use broadcast-channel replay;
# callers that need historical events should compose them from persisted state
# (see ``build_workflow_event_stream``) and then tail the live stream.
with topic.subscribe() as sub:
# on_subscribe fires only after the Redis subscription is active.
# This is used to gate task start and reduce pub/sub race for the first event.
if on_subscribe is not None:

View File

@ -92,14 +92,8 @@ class Subscriber(Protocol):
"""
@abstractmethod
def subscribe(self, *, replay: bool = False) -> Subscription:
"""Create a new subscription.
:param replay: When True and the underlying transport supports message retention
(e.g. Redis Streams), the subscription replays all buffered messages from
the beginning of the stream before switching to live tail. Transports
without retention (plain Pub/Sub) silently ignore this flag.
"""
def subscribe(self) -> Subscription:
"""Create a new subscription."""
pass

View File

@ -44,7 +44,7 @@ class Topic:
def as_subscriber(self) -> Subscriber:
return self
def subscribe(self, *, replay: bool = False) -> Subscription:
def subscribe(self) -> Subscription:
return _RedisSubscription(
client=self._client,
pubsub=self._client.pubsub(),

View File

@ -42,7 +42,7 @@ class ShardedTopic:
def as_subscriber(self) -> Subscriber:
return self
def subscribe(self, *, replay: bool = False) -> Subscription:
def subscribe(self) -> Subscription:
return _RedisShardedSubscription(
client=self._client,
pubsub=self._client.pubsub(),

View File

@ -54,17 +54,16 @@ class StreamsTopic:
def as_subscriber(self) -> Subscriber:
return self
def subscribe(self, *, replay: bool = False) -> Subscription:
return _StreamsSubscription(self._client, self._key, replay=replay)
def subscribe(self) -> Subscription:
return _StreamsSubscription(self._client, self._key)
class _StreamsSubscription(Subscription):
_SENTINEL = object()
def __init__(self, client: Redis | RedisCluster, key: str, *, replay: bool = False):
def __init__(self, client: Redis | RedisCluster, key: str):
self._client = client
self._key = key
self._replay = replay
self._queue: queue.Queue[object] = queue.Queue()
@ -91,7 +90,7 @@ class _StreamsSubscription(Subscription):
# `"0"` replays all retained entries; `"$"` tails only new messages.
# ref: https://redis.io/docs/latest/commands/xread/#the-special--id
last_id = "0" if self._replay else "$"
last_id = "$"
try:
while True:
with self._lock:

View File

@ -416,7 +416,6 @@ class AppGenerateService:
cls,
app_model: App,
workflow_run: WorkflowRun,
replay: bool = False,
):
if workflow_run.status.is_ended():
# TODO(QuantumGhost): handle the ended scenario.
@ -425,5 +424,5 @@ class AppGenerateService:
generator = AdvancedChatAppGenerator()
return generator.convert_to_event_stream(
generator.retrieve_events(AppMode(app_model.mode), workflow_run.id, replay=replay),
generator.retrieve_events(AppMode(app_model.mode), workflow_run.id),
)

View File

@ -61,8 +61,30 @@ def build_workflow_event_stream(
session_maker: sessionmaker[Session],
idle_timeout: float = 300,
ping_interval: float = 10.0,
replay: bool = False,
) -> Generator[Mapping[str, Any] | str, None, None]:
"""Yield a stream of workflow events composed of a DB-derived snapshot followed by live tail.
The stream is assembled in two phases that are kept **structurally disjoint** so no
per-event deduplication is required:
1. Snapshot phase: events rebuilt from persistent state via ``_build_snapshot_events``
(``workflow_started``, optional ``message_replace``, ``node_started``/``node_finished``
for each persisted execution, and an optional terminal ``workflow_paused``). This
represents the history that already happened from the client's point of view.
2. Tail phase: events delivered by the broadcast subscription **from the moment of
subscription onward only**. Anything that was published before the subscription was
established is intentionally ignored here, because it has already been covered by
the snapshot phase.
The application layer does not use the broadcast channel's replay capability on any path
(see ``stream_topic_events``). Mixing replay with the snapshot produces events that
overlap along the history axis, and the frontend handlers are not idempotent across the
whole event set (string content is accumulated, ``workflow_paused`` re-opens a new SSE
subscription, iteration/loop starts and file entries are pushed into lists, etc.). A
future feature that wants to recover events published *after* the snapshot was read but
*before* this function subscribed should solve it at the subscription layer
(cursor/position) rather than by re-sending the historical prefix.
"""
topic = MessageGenerator.get_response_topic(app_mode, workflow_run.id)
workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
node_execution_repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(session_maker)
@ -104,7 +126,7 @@ def build_workflow_event_stream(
last_msg_time = time.time()
last_ping_time = last_msg_time
with topic.subscribe(replay=replay) as sub:
with topic.subscribe() as sub:
buffer_state = _start_buffering(sub)
try:
task_id = _resolve_task_id(resumption_context, buffer_state, workflow_run.id)

View File

@ -50,6 +50,7 @@ def make_message():
msg.user_feedback = MagicMock(rating=None)
msg.status = "normal"
msg.error = None
msg.workflow_run_id = "22222222-2222-2222-2222-222222222222"
return msg
@ -84,6 +85,8 @@ class TestMessageListApi:
assert result["limit"] == 20
assert result["has_more"] is False
assert len(result["data"]) == 2
assert result["data"][0]["workflow_run_id"] == "22222222-2222-2222-2222-222222222222"
assert result["data"][1]["workflow_run_id"] == "22222222-2222-2222-2222-222222222222"
def test_get_not_chat_app(self):
api = module.MessageListApi()

View File

@ -12,11 +12,10 @@ from models.model import AppMode
class FakeSubscription:
def __init__(self, message_queue: queue.Queue[bytes], state: dict[str, bool], replay: bool = False) -> None:
def __init__(self, message_queue: queue.Queue[bytes], state: dict[str, bool]) -> None:
self._queue = message_queue
self._state = state
self._closed = False
self._replay = replay
def __enter__(self):
self._state["subscribed"] = True
@ -44,8 +43,8 @@ class FakeTopic:
self._queue: queue.Queue[bytes] = queue.Queue()
self._state = {"subscribed": False}
def subscribe(self, replay: bool = False) -> FakeSubscription:
return FakeSubscription(self._queue, self._state, replay)
def subscribe(self) -> FakeSubscription:
return FakeSubscription(self._queue, self._state)
def publish(self, payload: bytes) -> None:
self._queue.put(payload)

View File

@ -96,9 +96,8 @@ class _SessionMaker:
class _SubscriptionContext:
def __init__(self, subscription: Any, replay: bool = False) -> None:
def __init__(self, subscription: Any) -> None:
self._subscription = subscription
self._replay = replay
def __enter__(self) -> Any:
return self._subscription
@ -111,8 +110,8 @@ class _Topic:
def __init__(self, subscription: Any) -> None:
self._subscription = subscription
def subscribe(self, replay: bool = False) -> _SubscriptionContext:
return _SubscriptionContext(self._subscription, replay)
def subscribe(self) -> _SubscriptionContext:
return _SubscriptionContext(self._subscription)
class _StaticSubscription: