diff --git a/api/tests/test_containers_integration_tests/libs/broadcast_channel/redis/test_sharded_channel.py b/api/tests/test_containers_integration_tests/libs/broadcast_channel/redis/test_sharded_channel.py index af60adf1fb..d612e70910 100644 --- a/api/tests/test_containers_integration_tests/libs/broadcast_channel/redis/test_sharded_channel.py +++ b/api/tests/test_containers_integration_tests/libs/broadcast_channel/redis/test_sharded_channel.py @@ -113,16 +113,31 @@ class TestShardedRedisBroadcastChannelIntegration: topic = broadcast_channel.topic(topic_name) producer = topic.as_producer() subscriptions = [topic.subscribe() for _ in range(subscriber_count)] + ready_events = [threading.Event() for _ in range(subscriber_count)] def producer_thread(): - time.sleep(0.2) # Allow all subscribers to connect + deadline = time.time() + 5.0 + for ev in ready_events: + remaining = deadline - time.time() + if remaining <= 0: + break + if not ev.wait(timeout=max(0.0, remaining)): + pytest.fail("subscriber did not become ready before publish deadline") producer.publish(message) time.sleep(0.2) for sub in subscriptions: sub.close() - def consumer_thread(subscription: Subscription) -> list[bytes]: + def consumer_thread(subscription: Subscription, ready_event: threading.Event) -> list[bytes]: received_msgs = [] + # Prime subscription so the underlying Pub/Sub listener thread starts before publishing + try: + _ = subscription.receive(0.01) + except SubscriptionClosedError: + return received_msgs + finally: + ready_event.set() + while True: try: msg = subscription.receive(0.1) @@ -137,7 +152,10 @@ class TestShardedRedisBroadcastChannelIntegration: with ThreadPoolExecutor(max_workers=subscriber_count + 1) as executor: producer_future = executor.submit(producer_thread) - consumer_futures = [executor.submit(consumer_thread, subscription) for subscription in subscriptions] + consumer_futures = [ + executor.submit(consumer_thread, subscription, ready_events[idx]) + for idx, subscription in enumerate(subscriptions) + ] producer_future.result(timeout=10.0) msgs_by_consumers = []