From 211e951678ba73a7f49c30839e38503a8b2b5707 Mon Sep 17 00:00:00 2001 From: varunrmallya <100590632+varun-r-mallya@users.noreply.github.com> Date: Sun, 29 Jun 2025 16:02:00 +0530 Subject: [PATCH] fix: improve async validator handling in Pubsub class (#705) Signed-off-by: varun-r-mallya --- libp2p/pubsub/pubsub.py | 9 ++++----- newsfragments/702.bugfix.rst | 1 + 2 files changed, 5 insertions(+), 5 deletions(-) create mode 100644 newsfragments/702.bugfix.rst diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 8ba7d471..78c2fca5 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -682,19 +682,18 @@ class Pubsub(Service, IPubsub): # TODO: Implement throttle on async validators if len(async_topic_validators) > 0: - # TODO: Use a better pattern - final_result: bool = True + # Appends to lists are thread safe in CPython + results = [] async def run_async_validator(func: AsyncValidatorFn) -> None: - nonlocal final_result result = await func(msg_forwarder, msg) - final_result = final_result and result + results.append(result) async with trio.open_nursery() as nursery: for async_validator in async_topic_validators: nursery.start_soon(run_async_validator, async_validator) - if not final_result: + if not all(results): raise ValidationError(f"Validation failed for msg={msg}") async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None: diff --git a/newsfragments/702.bugfix.rst b/newsfragments/702.bugfix.rst new file mode 100644 index 00000000..90f91f88 --- /dev/null +++ b/newsfragments/702.bugfix.rst @@ -0,0 +1 @@ +Fixed an issue in `Pubsub` where async validators were not handled reliably under concurrency. Now uses a safe aggregator list for consistent behavior.