diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 8ba7d471..78c2fca5 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -682,19 +682,18 @@ class Pubsub(Service, IPubsub): # TODO: Implement throttle on async validators if len(async_topic_validators) > 0: - # TODO: Use a better pattern - final_result: bool = True + # Appends to lists are thread safe in CPython + results = [] async def run_async_validator(func: AsyncValidatorFn) -> None: - nonlocal final_result result = await func(msg_forwarder, msg) - final_result = final_result and result + results.append(result) async with trio.open_nursery() as nursery: for async_validator in async_topic_validators: nursery.start_soon(run_async_validator, async_validator) - if not final_result: + if not all(results): raise ValidationError(f"Validation failed for msg={msg}") async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None: diff --git a/newsfragments/702.bugfix.rst b/newsfragments/702.bugfix.rst new file mode 100644 index 00000000..90f91f88 --- /dev/null +++ b/newsfragments/702.bugfix.rst @@ -0,0 +1 @@ +Fixed an issue in `Pubsub` where async validators were not handled reliably under concurrency. Now uses a safe aggregator list for consistent behavior.