From 8b2268fcc93d3021d73f026a5b6cb2a1a3c5b71b Mon Sep 17 00:00:00 2001 From: varun-r-mallya Date: Thu, 26 Jun 2025 19:52:35 +0530 Subject: [PATCH] fix: improve async validator handling in Pubsub class Signed-off-by: varun-r-mallya --- libp2p/pubsub/pubsub.py | 9 ++++----- newsfragments/702.bugfix.rst | 1 + 2 files changed, 5 insertions(+), 5 deletions(-) create mode 100644 newsfragments/702.bugfix.rst diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 8ba7d471..78c2fca5 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -682,19 +682,18 @@ class Pubsub(Service, IPubsub): # TODO: Implement throttle on async validators if len(async_topic_validators) > 0: - # TODO: Use a better pattern - final_result: bool = True + # Appends to lists are thread safe in CPython + results = [] async def run_async_validator(func: AsyncValidatorFn) -> None: - nonlocal final_result result = await func(msg_forwarder, msg) - final_result = final_result and result + results.append(result) async with trio.open_nursery() as nursery: for async_validator in async_topic_validators: nursery.start_soon(run_async_validator, async_validator) - if not final_result: + if not all(results): raise ValidationError(f"Validation failed for msg={msg}") async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None: diff --git a/newsfragments/702.bugfix.rst b/newsfragments/702.bugfix.rst new file mode 100644 index 00000000..90f91f88 --- /dev/null +++ b/newsfragments/702.bugfix.rst @@ -0,0 +1 @@ +Fixed an issue in `Pubsub` where async validators were not handled reliably under concurrency. Now uses a safe aggregator list for consistent behavior.