mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
fix: improve async validator handling in Pubsub class
Signed-off-by: varun-r-mallya <varunrmallya@gmail.com>
This commit is contained in:
@ -682,19 +682,18 @@ class Pubsub(Service, IPubsub):
|
||||
# TODO: Implement throttle on async validators
|
||||
|
||||
if len(async_topic_validators) > 0:
|
||||
# TODO: Use a better pattern
|
||||
final_result: bool = True
|
||||
# Appends to lists are thread safe in CPython
|
||||
results = []
|
||||
|
||||
async def run_async_validator(func: AsyncValidatorFn) -> None:
|
||||
nonlocal final_result
|
||||
result = await func(msg_forwarder, msg)
|
||||
final_result = final_result and result
|
||||
results.append(result)
|
||||
|
||||
async with trio.open_nursery() as nursery:
|
||||
for async_validator in async_topic_validators:
|
||||
nursery.start_soon(run_async_validator, async_validator)
|
||||
|
||||
if not final_result:
|
||||
if not all(results):
|
||||
raise ValidationError(f"Validation failed for msg={msg}")
|
||||
|
||||
async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None:
|
||||
|
||||
1
newsfragments/702.bugfix.rst
Normal file
1
newsfragments/702.bugfix.rst
Normal file
@ -0,0 +1 @@
|
||||
Fixed an issue in `Pubsub` where async validators were not handled reliably under concurrency. Now uses a safe aggregator list for consistent behavior.
|
||||
Reference in New Issue
Block a user