From 31b6a6f237f5dfe7f7ee73ca134965ccb23dec37 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Fri, 27 Jun 2025 14:33:49 +0530 Subject: [PATCH 1/6] todo/bounded nursery in identify-push --- libp2p/identity/identify_push/identify_push.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/identity/identify_push/identify_push.py b/libp2p/identity/identify_push/identify_push.py index 914264ed..77a65187 100644 --- a/libp2p/identity/identify_push/identify_push.py +++ b/libp2p/identity/identify_push/identify_push.py @@ -190,4 +190,4 @@ async def push_identify_to_peers( # limiting concurrent connections to 10 async with trio.open_nursery() as nursery: for peer_id in peer_ids: - nursery.start_soon(push_identify_to_peer, host, peer_id, observed_multiaddr) + nursery.start_soon(limited_push, peer_id) From a89ba8ef81e98be2a7c2c9ab8def271c6acf2bb5 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Fri, 27 Jun 2025 14:50:10 +0530 Subject: [PATCH 2/6] added newsfragment --- newsfragments/708.performance.rst | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 newsfragments/708.performance.rst diff --git a/newsfragments/708.performance.rst b/newsfragments/708.performance.rst new file mode 100644 index 00000000..551a70e4 --- /dev/null +++ b/newsfragments/708.performance.rst @@ -0,0 +1,2 @@ +Limit concurrency in `push_identify_to_peers` to prevent resource exhaustion under high peer counts. +This makes peer communication more stable and efficient, especially at scale. 
From 383d7cb72211ed3cde415f8c5d7b4cecb7171d5e Mon Sep 17 00:00:00 2001 From: lla-dane Date: Wed, 2 Jul 2025 19:43:31 +0530 Subject: [PATCH 3/6] added tests --- libp2p/identity/identify_push/identify_push.py | 6 +++++- tests/core/identity/identify_push/test_identify_push.py | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/libp2p/identity/identify_push/identify_push.py b/libp2p/identity/identify_push/identify_push.py index 77a65187..cf39fd76 100644 --- a/libp2p/identity/identify_push/identify_push.py +++ b/libp2p/identity/identify_push/identify_push.py @@ -176,7 +176,9 @@ async def push_identify_to_peers( host: IHost, peer_ids: set[ID] | None = None, observed_multiaddr: Multiaddr | None = None, -) -> None: + counter: dict[str, int] | None = None, + lock: trio.Lock | None = None, +) -> int: # <-- return the max concurrency """ Push an identify message to multiple peers in parallel. @@ -191,3 +193,5 @@ async def push_identify_to_peers( async with trio.open_nursery() as nursery: for peer_id in peer_ids: nursery.start_soon(limited_push, peer_id) + + return counter["max"] if counter else 0 diff --git a/tests/core/identity/identify_push/test_identify_push.py b/tests/core/identity/identify_push/test_identify_push.py index b0ffb677..292d1948 100644 --- a/tests/core/identity/identify_push/test_identify_push.py +++ b/tests/core/identity/identify_push/test_identify_push.py @@ -10,6 +10,7 @@ import trio from libp2p import ( new_host, ) +from libp2p.abc import IHost from libp2p.crypto.secp256k1 import ( create_new_key_pair, ) From 8bfd4bde948edc6ad04f47cc028faa8f863ab2a3 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Thu, 3 Jul 2025 11:53:58 +0530 Subject: [PATCH 4/6] created concurrency limit configurable --- libp2p/identity/identify_push/identify_push.py | 1 + 1 file changed, 1 insertion(+) diff --git a/libp2p/identity/identify_push/identify_push.py b/libp2p/identity/identify_push/identify_push.py index cf39fd76..e9e979af 100644 --- 
a/libp2p/identity/identify_push/identify_push.py +++ b/libp2p/identity/identify_push/identify_push.py @@ -178,6 +178,7 @@ async def push_identify_to_peers( observed_multiaddr: Multiaddr | None = None, counter: dict[str, int] | None = None, lock: trio.Lock | None = None, + limit: int = CONCURRENCY_LIMIT, ) -> int: # <-- return the max concurrency """ Push an identify message to multiple peers in parallel. From a7d122a0f944c15fc377b28911a15cbc249ac5b8 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Fri, 4 Jul 2025 17:28:44 +0530 Subject: [PATCH 5/6] added extra tests for identify push for concurrency cap --- .../identity/identify_push/identify_push.py | 9 +- .../identify_push/test_identify_push.py | 91 ++++++++++++++++++- tests/utils/utils.py | 21 ++++- 3 files changed, 111 insertions(+), 10 deletions(-) diff --git a/libp2p/identity/identify_push/identify_push.py b/libp2p/identity/identify_push/identify_push.py index e9e979af..914264ed 100644 --- a/libp2p/identity/identify_push/identify_push.py +++ b/libp2p/identity/identify_push/identify_push.py @@ -176,10 +176,7 @@ async def push_identify_to_peers( host: IHost, peer_ids: set[ID] | None = None, observed_multiaddr: Multiaddr | None = None, - counter: dict[str, int] | None = None, - lock: trio.Lock | None = None, - limit: int = CONCURRENCY_LIMIT, -) -> int: # <-- return the max concurrency +) -> None: """ Push an identify message to multiple peers in parallel. 
@@ -193,6 +190,4 @@ async def push_identify_to_peers( # limiting concurrent connections to 10 async with trio.open_nursery() as nursery: for peer_id in peer_ids: - nursery.start_soon(limited_push, peer_id) - - return counter["max"] if counter else 0 + nursery.start_soon(push_identify_to_peer, host, peer_id, observed_multiaddr) diff --git a/tests/core/identity/identify_push/test_identify_push.py b/tests/core/identity/identify_push/test_identify_push.py index 292d1948..935fb2c0 100644 --- a/tests/core/identity/identify_push/test_identify_push.py +++ b/tests/core/identity/identify_push/test_identify_push.py @@ -10,7 +10,6 @@ import trio from libp2p import ( new_host, ) -from libp2p.abc import IHost from libp2p.crypto.secp256k1 import ( create_new_key_pair, ) @@ -36,6 +35,8 @@ from tests.utils.factories import ( ) from tests.utils.utils import ( create_mock_connections, + run_host_forever, + wait_until_listening, ) logger = logging.getLogger("libp2p.identity.identify-push-test") @@ -504,3 +505,91 @@ async def test_push_identify_to_peers_respects_concurrency_limit(): assert state["max_observed"] <= CONCURRENCY_LIMIT, ( f"Max concurrency observed: {state['max_observed']}" ) + + +@pytest.mark.trio +async def test_all_peers_receive_identify_push_with_semaphore(security_protocol): + dummy_peers = [] + + async with host_pair_factory(security_protocol=security_protocol) as (host_a, _): + # Create dummy peers + for _ in range(50): + key_pair = create_new_key_pair() + dummy_host = new_host(key_pair=key_pair) + dummy_host.set_stream_handler( + ID_PUSH, identify_push_handler_for(dummy_host) + ) + listen_addr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") + dummy_peers.append((dummy_host, listen_addr)) + + async with trio.open_nursery() as nursery: + # Start all dummy hosts + for host, listen_addr in dummy_peers: + nursery.start_soon(run_host_forever, host, listen_addr) + + # Wait for all hosts to finish setting up listeners + for host, _ in dummy_peers: + await 
wait_until_listening(host) + + # Now connect host_a → dummy peers + for host, _ in dummy_peers: + await host_a.connect(info_from_p2p_addr(host.get_addrs()[0])) + + await push_identify_to_peers( + host_a, + ) + + await trio.sleep(0.5) + + peer_id_a = host_a.get_id() + for host, _ in dummy_peers: + dummy_peerstore = host.get_peerstore() + assert peer_id_a in dummy_peerstore.peer_ids() + + nursery.cancel_scope.cancel() + + +@pytest.mark.trio +async def test_all_peers_receive_identify_push_with_semaphore_under_high_peer_load( + security_protocol, +): + dummy_peers = [] + + async with host_pair_factory(security_protocol=security_protocol) as (host_a, _): + # Create dummy peers + # This test breaks with more than 500 peers + # NOTE(review): attributed to a Trio async task limit of 1000 -- confirm + for _ in range(499): + key_pair = create_new_key_pair() + dummy_host = new_host(key_pair=key_pair) + dummy_host.set_stream_handler( + ID_PUSH, identify_push_handler_for(dummy_host) + ) + listen_addr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") + dummy_peers.append((dummy_host, listen_addr)) + + async with trio.open_nursery() as nursery: + # Start all dummy hosts + for host, listen_addr in dummy_peers: + nursery.start_soon(run_host_forever, host, listen_addr) + + # Wait for all hosts to finish setting up listeners + for host, _ in dummy_peers: + await wait_until_listening(host) + + # Now connect host_a → dummy peers + for host, _ in dummy_peers: + await host_a.connect(info_from_p2p_addr(host.get_addrs()[0])) + + await push_identify_to_peers( + host_a, + ) + + await trio.sleep(0.5) + + peer_id_a = host_a.get_id() + for host, _ in dummy_peers: + dummy_peerstore = host.get_peerstore() + assert peer_id_a in dummy_peerstore.peer_ids() + + nursery.cancel_scope.cancel() diff --git a/tests/utils/utils.py b/tests/utils/utils.py index 6e23ecdd..4fe2a6e1 100644 --- a/tests/utils/utils.py +++ b/tests/utils/utils.py @@ -2,13 +2,30 @@ from unittest.mock import ( MagicMock, ) +import trio -def create_mock_connections() -> dict: 
+from libp2p.abc import IHost + + +def create_mock_connections(count: int = 50) -> dict: connections = {} - for i in range(1, 31): + for i in range(1, count): peer_id = f"peer-{i}" mock_conn = MagicMock(name=f"INetConn-{i}") connections[peer_id] = mock_conn return connections + + +async def run_host_forever(host: IHost, addr): + async with host.run([addr]): + await trio.sleep_forever() + + +async def wait_until_listening(host, timeout=3): + with trio.move_on_after(timeout): + while not host.get_addrs(): + await trio.sleep(0.05) + return + raise RuntimeError("Timed out waiting for host to get an address") From bfe3dee781365f9674700f9f388f7c906c7e4cf2 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Fri, 4 Jul 2025 17:32:48 +0530 Subject: [PATCH 6/6] updated newsfragment --- newsfragments/708.performance.rst | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/newsfragments/708.performance.rst b/newsfragments/708.performance.rst index 551a70e4..286615e9 100644 --- a/newsfragments/708.performance.rst +++ b/newsfragments/708.performance.rst @@ -1,2 +1 @@ -Limit concurrency in `push_identify_to_peers` to prevent resource exhaustion under high peer counts. -This makes peer communication more stable and efficient, especially at scale. +Added extra tests for identify push concurrency cap under high peer load