From 6aeb217349b1c196f36690c34b4600f0794d180e Mon Sep 17 00:00:00 2001 From: Luca Vivona Date: Tue, 15 Jul 2025 14:59:34 -0400 Subject: [PATCH 1/6] replace: attributes with cache cached_property --- libp2p/peer/id.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/libp2p/peer/id.py b/libp2p/peer/id.py index 0be51ea2..61e399cd 100644 --- a/libp2p/peer/id.py +++ b/libp2p/peer/id.py @@ -3,6 +3,10 @@ import hashlib import base58 import multihash +from functools import ( + cached_property, +) + from libp2p.crypto.keys import ( PublicKey, ) @@ -36,25 +40,23 @@ if ENABLE_INLINING: class ID: _bytes: bytes - _xor_id: int | None = None - _b58_str: str | None = None def __init__(self, peer_id_bytes: bytes) -> None: self._bytes = peer_id_bytes - @property + @cached_property def xor_id(self) -> int: - if not self._xor_id: - self._xor_id = int(sha256_digest(self._bytes).hex(), 16) - return self._xor_id + return int(sha256_digest(self._bytes).hex(), 16) + + @cached_property + def base58(self) -> str: + return base58.b58encode(self._bytes).decode() def to_bytes(self) -> bytes: return self._bytes def to_base58(self) -> str: - if not self._b58_str: - self._b58_str = base58.b58encode(self._bytes).decode() - return self._b58_str + return self.base58 def __repr__(self) -> str: return f"" From 23622ea1a088a39f3ba1fe5539eeb59afd205f5d Mon Sep 17 00:00:00 2001 From: Luca Vivona Date: Tue, 15 Jul 2025 15:28:03 -0400 Subject: [PATCH 2/6] style: enforce consistent import block --- libp2p/peer/id.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/libp2p/peer/id.py b/libp2p/peer/id.py index 61e399cd..28a9d75a 100644 --- a/libp2p/peer/id.py +++ b/libp2p/peer/id.py @@ -1,12 +1,9 @@ +import functools import hashlib import base58 import multihash -from functools import ( - cached_property, -) - from libp2p.crypto.keys import ( PublicKey, ) @@ -44,11 +41,11 @@ class ID: def __init__(self, peer_id_bytes: bytes) -> None: self._bytes = peer_id_bytes - @cached_property + @functools.cached_property def xor_id(self) -> int: return int(sha256_digest(self._bytes).hex(), 16) - @cached_property + @functools.cached_property def base58(self) -> str: return base58.b58encode(self._bytes).decode() From 9f40d97a056d1d493120be1afd204ec6b5f95615 Mon Sep 17 00:00:00 2001 From: Luca Vivona Date: Wed, 16 Jul 2025 22:08:25 -0400 Subject: [PATCH 3/6] chore(newsfragment): add entry to the release notes --- newsfragments/772.internal.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 newsfragments/772.internal.rst diff --git a/newsfragments/772.internal.rst b/newsfragments/772.internal.rst new file mode 100644 index 00000000..7079d6c4 --- /dev/null +++ b/newsfragments/772.internal.rst @@ -0,0 +1 @@ +Replace the libp2p.peer.ID cache attributes with functools.cached_property functional decorator. \ No newline at end of file From ae82895d86fd0992824fd93ea00fc4d8026587aa Mon Sep 17 00:00:00 2001 From: Luca Vivona Date: Wed, 16 Jul 2025 22:12:05 -0400 Subject: [PATCH 4/6] style: add new line within newsfragment --- newsfragments/772.internal.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/newsfragments/772.internal.rst b/newsfragments/772.internal.rst index 7079d6c4..2c84641c 100644 --- a/newsfragments/772.internal.rst +++ b/newsfragments/772.internal.rst @@ -1 +1 @@ -Replace the libp2p.peer.ID cache attributes with functools.cached_property functional decorator. \ No newline at end of file +Replace the libp2p.peer.ID cache attributes with functools.cached_property functional decorator. From 35075313448e3038a3d41f52d169eb15e9f2eeea Mon Sep 17 00:00:00 2001 From: Luca Vivona Date: Thu, 17 Jul 2025 22:43:00 -0400 Subject: [PATCH 5/6] chore: clarify newline requirement in newsfragments README.md (#775) * chore: clarify newline requirement in README Small change in newsfragments README.md, that reduces some possible room for pull-request tox workflow errors. * style: remove double backticks for single backticks the linter strikes again XD. * docs: clarify trailing newline requirement in newsfragments for lint checks --------- Co-authored-by: Manu Sheel Gupta --- newsfragments/775.docs.rst | 1 + newsfragments/README.md | 11 +++++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) create mode 100644 newsfragments/775.docs.rst diff --git a/newsfragments/775.docs.rst b/newsfragments/775.docs.rst new file mode 100644 index 00000000..300b27ca --- /dev/null +++ b/newsfragments/775.docs.rst @@ -0,0 +1 @@ +Clarified the requirement for a trailing newline in newsfragments to pass lint checks. diff --git a/newsfragments/README.md b/newsfragments/README.md index 177d6492..4b54df7c 100644 --- a/newsfragments/README.md +++ b/newsfragments/README.md @@ -18,12 +18,19 @@ Each file should be named like `..rst`, where - `performance` - `removal` -So for example: `123.feature.rst`, `456.bugfix.rst` +So for example: `1024.feature.rst` + +**Important**: Ensure the file ends with a newline character (`\n`) to pass GitHub tox linting checks. + +``` +Added support for Ed25519 key generation in libp2p peer identity creation. + +``` If the PR fixes an issue, use that number here. If there is no issue, then open up the PR first and use the PR number for the newsfragment. -Note that the `towncrier` tool will automatically +**Note** that the `towncrier` tool will automatically reflow your text, so don't try to do any fancy formatting. Run `towncrier build --draft` to get a preview of what the release notes entry will look like in the final release notes. From 11560f5cc95baad057eedca2830a1fdc9b95d353 Mon Sep 17 00:00:00 2001 From: Abhinav Agarwalla <120122716+lla-dane@users.noreply.github.com> Date: Fri, 18 Jul 2025 17:31:28 +0530 Subject: [PATCH 6/6] TODO: throttle on async validators (#755) * fixed todo: throttle on async validators * added test: validate message respects concurrency limit * added newsfragment * added configurable validator semaphore in the PubSub constructor * added the concurrency-checker in the original test-validate-msg test case * separate out a _run_async_validator function * remove redundant run_async_validator --- libp2p/pubsub/pubsub.py | 39 ++++++++++++++++++------ newsfragments/755.performance.rst | 2 ++ tests/core/pubsub/test_pubsub.py | 50 +++++++++++++++++++++++++++---- 3 files changed, 77 insertions(+), 14 deletions(-) create mode 100644 newsfragments/755.performance.rst diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index a913c721..5641ec5d 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -102,6 +102,9 @@ class TopicValidator(NamedTuple): is_async: bool +MAX_CONCURRENT_VALIDATORS = 10 + + class Pubsub(Service, IPubsub): host: IHost @@ -109,6 +112,7 @@ class Pubsub(Service, IPubsub): peer_receive_channel: trio.MemoryReceiveChannel[ID] dead_peer_receive_channel: trio.MemoryReceiveChannel[ID] + _validator_semaphore: trio.Semaphore seen_messages: LastSeenCache @@ -143,6 +147,7 @@ class Pubsub(Service, IPubsub): msg_id_constructor: Callable[ [rpc_pb2.Message], bytes ] = get_peer_and_seqno_msg_id, + max_concurrent_validator_count: int = MAX_CONCURRENT_VALIDATORS, ) -> None: """ Construct a new Pubsub object, which is responsible for handling all @@ -168,6 +173,7 @@ class Pubsub(Service, IPubsub): # Therefore, we can only close from the receive side. self.peer_receive_channel = peer_receive self.dead_peer_receive_channel = dead_peer_receive + self._validator_semaphore = trio.Semaphore(max_concurrent_validator_count) # Register a notifee self.host.get_network().register_notifee( PubsubNotifee(peer_send, dead_peer_send) @@ -657,7 +663,11 @@ class Pubsub(Service, IPubsub): logger.debug("successfully published message %s", msg) - async def validate_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None: + async def validate_msg( + self, + msg_forwarder: ID, + msg: rpc_pb2.Message, + ) -> None: """ Validate the received message. @@ -680,23 +690,34 @@ class Pubsub(Service, IPubsub): if not validator(msg_forwarder, msg): raise ValidationError(f"Validation failed for msg={msg}") - # TODO: Implement throttle on async validators - if len(async_topic_validators) > 0: # Appends to lists are thread safe in CPython - results = [] - - async def run_async_validator(func: AsyncValidatorFn) -> None: - result = await func(msg_forwarder, msg) - results.append(result) + results: list[bool] = [] async with trio.open_nursery() as nursery: for async_validator in async_topic_validators: - nursery.start_soon(run_async_validator, async_validator) + nursery.start_soon( + self._run_async_validator, + async_validator, + msg_forwarder, + msg, + results, + ) if not all(results): raise ValidationError(f"Validation failed for msg={msg}") + async def _run_async_validator( + self, + func: AsyncValidatorFn, + msg_forwarder: ID, + msg: rpc_pb2.Message, + results: list[bool], + ) -> None: + async with self._validator_semaphore: + result = await func(msg_forwarder, msg) + results.append(result) + async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None: """ Push a pubsub message to others. diff --git a/newsfragments/755.performance.rst b/newsfragments/755.performance.rst new file mode 100644 index 00000000..386e661b --- /dev/null +++ b/newsfragments/755.performance.rst @@ -0,0 +1,2 @@ +Added throttling for async topic validators in validate_msg, enforcing a +concurrency limit to prevent resource exhaustion under heavy load. diff --git a/tests/core/pubsub/test_pubsub.py b/tests/core/pubsub/test_pubsub.py index 81389ed1..e674dbc0 100644 --- a/tests/core/pubsub/test_pubsub.py +++ b/tests/core/pubsub/test_pubsub.py @@ -5,10 +5,12 @@ import inspect from typing import ( NamedTuple, ) +from unittest.mock import patch import pytest import trio +from libp2p.custom_types import AsyncValidatorFn from libp2p.exceptions import ( ValidationError, ) @@ -243,7 +245,37 @@ async def test_get_msg_validators(): ((False, True), (True, False), (True, True)), ) @pytest.mark.trio -async def test_validate_msg(is_topic_1_val_passed, is_topic_2_val_passed): +async def test_validate_msg_with_throttle_condition( + is_topic_1_val_passed, is_topic_2_val_passed +): + CONCURRENCY_LIMIT = 10 + + state = { + "concurrency_counter": 0, + "max_observed": 0, + } + lock = trio.Lock() + + async def mock_run_async_validator( + self, + func: AsyncValidatorFn, + msg_forwarder: ID, + msg: rpc_pb2.Message, + results: list[bool], + ) -> None: + async with self._validator_semaphore: + async with lock: + state["concurrency_counter"] += 1 + if state["concurrency_counter"] > state["max_observed"]: + state["max_observed"] = state["concurrency_counter"] + + try: + result = await func(msg_forwarder, msg) + results.append(result) + finally: + async with lock: + state["concurrency_counter"] -= 1 + async with PubsubFactory.create_batch_with_floodsub(1) as pubsubs_fsub: def passed_sync_validator(peer_id: ID, msg: rpc_pb2.Message) -> bool: @@ -280,11 +312,19 @@ async def test_validate_msg(is_topic_1_val_passed, is_topic_2_val_passed): seqno=b"\x00" * 8, ) - if is_topic_1_val_passed and is_topic_2_val_passed: - await pubsubs_fsub[0].validate_msg(pubsubs_fsub[0].my_id, msg) - else: - with pytest.raises(ValidationError): + with patch( + "libp2p.pubsub.pubsub.Pubsub._run_async_validator", + new=mock_run_async_validator, + ): + if is_topic_1_val_passed and is_topic_2_val_passed: await pubsubs_fsub[0].validate_msg(pubsubs_fsub[0].my_id, msg) + else: + with pytest.raises(ValidationError): + await pubsubs_fsub[0].validate_msg(pubsubs_fsub[0].my_id, msg) + + assert state["max_observed"] <= CONCURRENCY_LIMIT, ( + f"Max concurrency observed: {state['max_observed']}" + ) @pytest.mark.trio