From b7d62c0f857d2bd974201546502c10a707435afe Mon Sep 17 00:00:00 2001 From: sukhman Date: Fri, 23 May 2025 00:39:17 +0530 Subject: [PATCH 01/10] Limit concurrency to push identify message to peers Signed-off-by: sukhman --- .../identity/identify_push/identify_push.py | 39 +++++++++---------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/libp2p/identity/identify_push/identify_push.py b/libp2p/identity/identify_push/identify_push.py index c649c368..b6034956 100644 --- a/libp2p/identity/identify_push/identify_push.py +++ b/libp2p/identity/identify_push/identify_push.py @@ -40,6 +40,7 @@ logger = logging.getLogger(__name__) ID_PUSH = TProtocol("/ipfs/id/push/1.0.0") PROTOCOL_VERSION = "ipfs/0.1.0" AGENT_VERSION = get_agent_version() +LIMIT = trio.CapacityLimiter(10) def identify_push_handler_for(host: IHost) -> StreamHandlerFn: @@ -146,25 +147,26 @@ async def push_identify_to_peer( True if the push was successful, False otherwise. """ - try: - # Create a new stream to the peer using the identify/push protocol - stream = await host.new_stream(peer_id, [ID_PUSH]) + async with LIMIT: + try: + # Create a new stream to the peer using the identify/push protocol + stream = await host.new_stream(peer_id, [ID_PUSH]) - # Create the identify message - identify_msg = _mk_identify_protobuf(host, observed_multiaddr) - response = identify_msg.SerializeToString() + # Create the identify message + identify_msg = _mk_identify_protobuf(host, observed_multiaddr) + response = identify_msg.SerializeToString() - # Send the identify message - await stream.write(response) + # Send the identify message + await stream.write(response) - # Close the stream - await stream.close() + # Close the stream + await stream.close() - logger.debug("Successfully pushed identify to peer %s", peer_id) - return True - except Exception as e: - logger.error("Error pushing identify to peer %s: %s", peer_id, e) - return False + logger.debug("Successfully pushed identify to peer %s", peer_id) + 
return True + except Exception as e: + logger.error("Error pushing identify to peer %s: %s", peer_id, e) + return False async def push_identify_to_peers( @@ -179,13 +181,10 @@ async def push_identify_to_peers( """ if peer_ids is None: # Get all connected peers - peer_ids = set(host.get_peerstore().peer_ids()) + peer_ids = set(host.get_connected_peers()) # Push to each peer in parallel using a trio.Nursery - # TODO: Consider using a bounded nursery to limit concurrency - # and avoid overwhelming the network. This can be done by using - # trio.open_nursery(max_concurrent=10) or similar. - # For now, we will use an unbounded nursery for simplicity. + # limiting concurrent connections to 10 async with trio.open_nursery() as nursery: for peer_id in peer_ids: nursery.start_soon(push_identify_to_peer, host, peer_id, observed_multiaddr) From ae16909f792deb2eb03b2e64a52c7130a509e17b Mon Sep 17 00:00:00 2001 From: sukhman Date: Mon, 2 Jun 2025 22:42:45 +0530 Subject: [PATCH 02/10] Test: Connected Peers Receive Pushes --- libp2p/identity/identify_push/identify_push.py | 2 +- .../identity/identify_push/test_identify_push.py | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/libp2p/identity/identify_push/identify_push.py b/libp2p/identity/identify_push/identify_push.py index b6034956..7a9c10a9 100644 --- a/libp2p/identity/identify_push/identify_push.py +++ b/libp2p/identity/identify_push/identify_push.py @@ -40,7 +40,7 @@ logger = logging.getLogger(__name__) ID_PUSH = TProtocol("/ipfs/id/push/1.0.0") PROTOCOL_VERSION = "ipfs/0.1.0" AGENT_VERSION = get_agent_version() -LIMIT = trio.CapacityLimiter(10) +LIMIT = trio.Semaphore(10) def identify_push_handler_for(host: IHost) -> StreamHandlerFn: diff --git a/tests/core/identity/identify_push/test_identify_push.py b/tests/core/identity/identify_push/test_identify_push.py index 1b875e6f..ac06f599 100644 --- a/tests/core/identity/identify_push/test_identify_push.py +++ 
b/tests/core/identity/identify_push/test_identify_push.py @@ -175,6 +175,7 @@ async def test_identify_push_to_peers(security_protocol): host_c = new_host(key_pair=key_pair_c) # Set up the identify/push handlers + host_a.set_stream_handler(ID_PUSH, identify_push_handler_for(host_a)) host_b.set_stream_handler(ID_PUSH, identify_push_handler_for(host_b)) host_c.set_stream_handler(ID_PUSH, identify_push_handler_for(host_c)) @@ -203,6 +204,20 @@ async def test_identify_push_to_peers(security_protocol): # Check that the peer is in the peerstore assert peer_id_a in peerstore_c.peer_ids() + + # Test for push_identify to only connected peers and not all peers in peerstore + # Disconnect a from c. + await host_c.disconnect(host_a.get_id()) + # + await push_identify_to_peers(host_c) + + # Wait a bit for the push to complete + await trio.sleep(0.1) + + # Check that host_a's peerstore has not been updated with host_c's information + assert(host_c.get_id() not in host_a.get_peerstore().peer_ids()) + # Check that host_b's peerstore has been updated with host_c's information + assert(host_c.get_id() in host_b.get_peerstore().peer_ids()) @pytest.mark.trio From 8afb99c5b138ccfd287a7e5e7368cd9afe296621 Mon Sep 17 00:00:00 2001 From: sukhman Date: Mon, 9 Jun 2025 17:41:04 +0530 Subject: [PATCH 03/10] add test for bounded concurrency Signed-off-by: sukhman --- .../identify_push/test_identify_push.py | 91 +++++++++++++++++-- tests/utils/utils.py | 11 +++ 2 files changed, 93 insertions(+), 9 deletions(-) create mode 100644 tests/utils/utils.py diff --git a/tests/core/identity/identify_push/test_identify_push.py b/tests/core/identity/identify_push/test_identify_push.py index ac06f599..ce3373c8 100644 --- a/tests/core/identity/identify_push/test_identify_push.py +++ b/tests/core/identity/identify_push/test_identify_push.py @@ -1,4 +1,7 @@ import logging +from unittest.mock import ( + patch, +) import pytest import multiaddr @@ -29,10 +32,20 @@ from libp2p.peer.peerinfo import ( from 
tests.utils.factories import ( host_pair_factory, ) +from tests.utils.utils import ( + create_mock_connections, +) logger = logging.getLogger("libp2p.identity.identify-push-test") +CONCURRENCY_LIMIT = 10 +LIMIT = trio.Semaphore(CONCURRENCY_LIMIT) +concurrency_counter = 0 +max_observed = 0 +lock = trio.Lock() + + @pytest.mark.trio async def test_identify_push_protocol(security_protocol): """ @@ -204,20 +217,20 @@ async def test_identify_push_to_peers(security_protocol): # Check that the peer is in the peerstore assert peer_id_a in peerstore_c.peer_ids() - - # Test for push_identify to only connected peers and not all peers in peerstore + + # Test for push_identify to only connected peers and not all peers # Disconnect a from c. await host_c.disconnect(host_a.get_id()) - # + # await push_identify_to_peers(host_c) - + # Wait a bit for the push to complete await trio.sleep(0.1) - - # Check that host_a's peerstore has not been updated with host_c's information - assert(host_c.get_id() not in host_a.get_peerstore().peer_ids()) - # Check that host_b's peerstore has been updated with host_c's information - assert(host_c.get_id() in host_b.get_peerstore().peer_ids()) + + # Check that host_a's peerstore has not been updated with host_c's info + assert host_c.get_id() not in host_a.get_peerstore().peer_ids() + # Check that host_b's peerstore has been updated with host_c's info + assert host_c.get_id() in host_b.get_peerstore().peer_ids() @pytest.mark.trio @@ -427,3 +440,63 @@ async def test_partial_update_peerstore_from_identify(security_protocol): host_a_public_key = host_a.get_public_key().serialize() peerstore_public_key = peerstore.pubkey(peer_id).serialize() assert host_a_public_key == peerstore_public_key + + +@pytest.mark.trio +async def test_push_identify_to_peers_respects_concurrency_limit(): + """ + Test bounded concurrency for the identify/push protocol to prevent + network congestion. + + This test verifies: + 1. 
The number of concurrent tasks executing the identify push is always + less than or equal to CONCURRENCY_LIMIT. + 2. An error is raised if concurrency exceeds the defined limit. + + It mocks `push_identify_to_peer` to simulate delay using sleep, + allowing the test to measure and assert actual concurrency behavior. + """ + + # Create a mock host. + key_pair_host = create_new_key_pair() + host = new_host(key_pair=key_pair_host) + + # Create a mock network and add mock connections to the host + host.get_network().connections = create_mock_connections() + with patch( + "libp2p.identity.identify_push.identify_push.push_identify_to_peer", + new=mock_push_identify_to_peer, + ): + await push_identify_to_peers(host) + assert ( + max_observed <= CONCURRENCY_LIMIT + ), f"Max concurrency observed: {max_observed}" + + +async def mock_push_identify_to_peer(host, peer_id, observed_multiaddr=None) -> bool: + """ + Mock function to test concurrency by simulating an identify message. + + This function patches push_identify_to_peer for testing purpose + + Returns + ------- + bool + True if the push was successful, False otherwise. 
+ """ + global concurrency_counter, max_observed + + async with LIMIT: + async with lock: + concurrency_counter += 1 + if concurrency_counter > CONCURRENCY_LIMIT: + raise RuntimeError(f"Concurrency limit exceeded: {concurrency_counter}") + max_observed = max(max_observed, concurrency_counter) + + logger.debug("Successfully pushed identify to peer %s", peer_id) + await trio.sleep(0.05) + + async with lock: + concurrency_counter -= 1 + + return True diff --git a/tests/utils/utils.py b/tests/utils/utils.py new file mode 100644 index 00000000..2eaf4cf6 --- /dev/null +++ b/tests/utils/utils.py @@ -0,0 +1,11 @@ +from unittest.mock import MagicMock + +def create_mock_connections() -> dict: + connections = {} + + for i in range(1, 31): + peer_id = f"peer-{i}" + mock_conn = MagicMock(name=f"INetConn-{i}") + connections[peer_id] = mock_conn + + return connections \ No newline at end of file From 1fb3f9c72bd0024d9f7d3dcd6fb09974c83220bf Mon Sep 17 00:00:00 2001 From: sukhman Date: Mon, 9 Jun 2025 17:50:30 +0530 Subject: [PATCH 04/10] Fix failing ci Signed-off-by: sukhman --- tests/utils/utils.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/utils/utils.py b/tests/utils/utils.py index 2eaf4cf6..6e23ecdd 100644 --- a/tests/utils/utils.py +++ b/tests/utils/utils.py @@ -1,4 +1,7 @@ -from unittest.mock import MagicMock +from unittest.mock import ( + MagicMock, +) + def create_mock_connections() -> dict: connections = {} @@ -8,4 +11,4 @@ def create_mock_connections() -> dict: mock_conn = MagicMock(name=f"INetConn-{i}") connections[peer_id] = mock_conn - return connections \ No newline at end of file + return connections From 983a4a001cc8f04b06cbdfddbb3ae36cbb04a202 Mon Sep 17 00:00:00 2001 From: sukhman Date: Thu, 26 Jun 2025 15:25:54 +0530 Subject: [PATCH 05/10] Fix pre-commit checks --- pyproject.toml | 3 +- .../identify_push/test_identify_push.py | 84 ++++++++++--------- 2 files changed, 45 insertions(+), 42 deletions(-) diff --git 
a/pyproject.toml b/pyproject.toml index 91803ada..71332bf5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,4 +1,3 @@ - [build-system] requires = ["setuptools>=42", "wheel"] build-backend = "setuptools.build_meta" @@ -23,7 +22,7 @@ dependencies = [ "multiaddr>=0.0.9", "mypy-protobuf>=3.0.0", "noiseprotocol>=0.3.0", - "protobuf>=3.20.1,<4.0.0", + "protobuf>=4.21.0,<5.0.0", "pycryptodome>=3.9.2", "pymultihash>=0.8.2", "pynacl>=1.3.0", diff --git a/tests/core/identity/identify_push/test_identify_push.py b/tests/core/identity/identify_push/test_identify_push.py index ce3373c8..1a617b1f 100644 --- a/tests/core/identity/identify_push/test_identify_push.py +++ b/tests/core/identity/identify_push/test_identify_push.py @@ -39,13 +39,6 @@ from tests.utils.utils import ( logger = logging.getLogger("libp2p.identity.identify-push-test") -CONCURRENCY_LIMIT = 10 -LIMIT = trio.Semaphore(CONCURRENCY_LIMIT) -concurrency_counter = 0 -max_observed = 0 -lock = trio.Lock() - - @pytest.mark.trio async def test_identify_push_protocol(security_protocol): """ @@ -221,7 +214,7 @@ async def test_identify_push_to_peers(security_protocol): # Test for push_identify to only connected peers and not all peers # Disconnect a from c. await host_c.disconnect(host_a.get_id()) - # + await push_identify_to_peers(host_c) # Wait a bit for the push to complete @@ -456,6 +449,46 @@ async def test_push_identify_to_peers_respects_concurrency_limit(): It mocks `push_identify_to_peer` to simulate delay using sleep, allowing the test to measure and assert actual concurrency behavior. """ + CONCURRENCY_LIMIT = 10 + LIMIT = trio.Semaphore(CONCURRENCY_LIMIT) + state = { + "concurrency_counter": 0, + "max_observed": 0, + } + lock = trio.Lock() + + async def mock_push_identify_to_peer( + host, peer_id, observed_multiaddr=None + ) -> bool: + """ + Mock function to test concurrency by simulating an identify message. 
+ + This function patches push_identify_to_peer for testing purpose + + Returns + ------- + bool + True if the push was successful, False otherwise. + + """ + async with LIMIT: + async with lock: + state["concurrency_counter"] += 1 + if state["concurrency_counter"] > CONCURRENCY_LIMIT: + raise RuntimeError( + f"Concurrency limit exceeded: {state['concurrency_counter']}" + ) + state["max_observed"] = max( + state["max_observed"], state["concurrency_counter"] + ) + + logger.debug("Successfully pushed identify to peer %s", peer_id) + await trio.sleep(0.05) + + async with lock: + state["concurrency_counter"] -= 1 + + return True # Create a mock host. key_pair_host = create_new_key_pair() @@ -468,35 +501,6 @@ async def test_push_identify_to_peers_respects_concurrency_limit(): new=mock_push_identify_to_peer, ): await push_identify_to_peers(host) - assert ( - max_observed <= CONCURRENCY_LIMIT - ), f"Max concurrency observed: {max_observed}" - - -async def mock_push_identify_to_peer(host, peer_id, observed_multiaddr=None) -> bool: - """ - Mock function to test concurrency by simulating an identify message. - - This function patches push_identify_to_peer for testing purpose - - Returns - ------- - bool - True if the push was successful, False otherwise. 
- """ - global concurrency_counter, max_observed - - async with LIMIT: - async with lock: - concurrency_counter += 1 - if concurrency_counter > CONCURRENCY_LIMIT: - raise RuntimeError(f"Concurrency limit exceeded: {concurrency_counter}") - max_observed = max(max_observed, concurrency_counter) - - logger.debug("Successfully pushed identify to peer %s", peer_id) - await trio.sleep(0.05) - - async with lock: - concurrency_counter -= 1 - - return True + assert state["max_observed"] <= CONCURRENCY_LIMIT, ( + f"Max concurrency observed: {state['max_observed']}" + ) From ad87e50eb7e012d2cce3e37da6f3f108f322dd2f Mon Sep 17 00:00:00 2001 From: sukhman Date: Mon, 30 Jun 2025 10:12:02 +0530 Subject: [PATCH 06/10] move concurrency_limit to identify_push Signed-off-by: sukhman --- libp2p/identity/identify_push/identify_push.py | 3 ++- tests/core/identity/identify_push/test_identify_push.py | 7 +++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/libp2p/identity/identify_push/identify_push.py b/libp2p/identity/identify_push/identify_push.py index 7a9c10a9..71656c27 100644 --- a/libp2p/identity/identify_push/identify_push.py +++ b/libp2p/identity/identify_push/identify_push.py @@ -40,7 +40,8 @@ logger = logging.getLogger(__name__) ID_PUSH = TProtocol("/ipfs/id/push/1.0.0") PROTOCOL_VERSION = "ipfs/0.1.0" AGENT_VERSION = get_agent_version() -LIMIT = trio.Semaphore(10) +CONCURRENCY_LIMIT = 10 +LIMIT = trio.Semaphore(CONCURRENCY_LIMIT) def identify_push_handler_for(host: IHost) -> StreamHandlerFn: diff --git a/tests/core/identity/identify_push/test_identify_push.py b/tests/core/identity/identify_push/test_identify_push.py index 1a617b1f..b0ffb677 100644 --- a/tests/core/identity/identify_push/test_identify_push.py +++ b/tests/core/identity/identify_push/test_identify_push.py @@ -20,6 +20,7 @@ from libp2p.identity.identify.pb.identify_pb2 import ( Identify, ) from libp2p.identity.identify_push.identify_push import ( + CONCURRENCY_LIMIT, ID_PUSH, 
_update_peerstore_from_identify, identify_push_handler_for, @@ -449,8 +450,6 @@ async def test_push_identify_to_peers_respects_concurrency_limit(): It mocks `push_identify_to_peer` to simulate delay using sleep, allowing the test to measure and assert actual concurrency behavior. """ - CONCURRENCY_LIMIT = 10 - LIMIT = trio.Semaphore(CONCURRENCY_LIMIT) state = { "concurrency_counter": 0, "max_observed": 0, @@ -458,7 +457,7 @@ async def test_push_identify_to_peers_respects_concurrency_limit(): lock = trio.Lock() async def mock_push_identify_to_peer( - host, peer_id, observed_multiaddr=None + host, peer_id, observed_multiaddr=None, limit=trio.Semaphore(CONCURRENCY_LIMIT) ) -> bool: """ Mock function to test concurrency by simulating an identify message. @@ -471,7 +470,7 @@ async def test_push_identify_to_peers_respects_concurrency_limit(): True if the push was successful, False otherwise. """ - async with LIMIT: + async with limit: async with lock: state["concurrency_counter"] += 1 if state["concurrency_counter"] > CONCURRENCY_LIMIT: From ad0b5505bab3f73944c2a2b6b00a6695c21a1844 Mon Sep 17 00:00:00 2001 From: sukhman Date: Wed, 2 Jul 2025 13:11:34 +0530 Subject: [PATCH 07/10] make limit configurable in push_identify_to_peers --- libp2p/identity/identify_push/identify_push.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/libp2p/identity/identify_push/identify_push.py b/libp2p/identity/identify_push/identify_push.py index 71656c27..5a71cc38 100644 --- a/libp2p/identity/identify_push/identify_push.py +++ b/libp2p/identity/identify_push/identify_push.py @@ -41,7 +41,6 @@ ID_PUSH = TProtocol("/ipfs/id/push/1.0.0") PROTOCOL_VERSION = "ipfs/0.1.0" AGENT_VERSION = get_agent_version() CONCURRENCY_LIMIT = 10 -LIMIT = trio.Semaphore(CONCURRENCY_LIMIT) def identify_push_handler_for(host: IHost) -> StreamHandlerFn: @@ -134,7 +133,10 @@ async def _update_peerstore_from_identify( async def push_identify_to_peer( - host: IHost, peer_id: ID, 
observed_multiaddr: Multiaddr | None = None + host: IHost, + peer_id: ID, + observed_multiaddr: Multiaddr | None = None, + limit=trio.Semaphore(CONCURRENCY_LIMIT), ) -> bool: """ Push an identify message to a specific peer. @@ -148,7 +150,7 @@ async def push_identify_to_peer( True if the push was successful, False otherwise. """ - async with LIMIT: + async with limit: try: # Create a new stream to the peer using the identify/push protocol stream = await host.new_stream(peer_id, [ID_PUSH]) From 88db4ceb21b6a68d50e964528625ff54d3f07c84 Mon Sep 17 00:00:00 2001 From: sukhman Date: Wed, 2 Jul 2025 13:26:52 +0530 Subject: [PATCH 08/10] Fix lint --- libp2p/identity/identify_push/identify_push.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/identity/identify_push/identify_push.py b/libp2p/identity/identify_push/identify_push.py index 5a71cc38..914264ed 100644 --- a/libp2p/identity/identify_push/identify_push.py +++ b/libp2p/identity/identify_push/identify_push.py @@ -136,7 +136,7 @@ async def push_identify_to_peer( host: IHost, peer_id: ID, observed_multiaddr: Multiaddr | None = None, - limit=trio.Semaphore(CONCURRENCY_LIMIT), + limit: trio.Semaphore = trio.Semaphore(CONCURRENCY_LIMIT), ) -> bool: """ Push an identify message to a specific peer. From 96c41773eab78a1a5684f2f0226103ff1080e1c6 Mon Sep 17 00:00:00 2001 From: sukhman Date: Thu, 3 Jul 2025 10:44:45 +0530 Subject: [PATCH 09/10] Add newsfragment --- newsfragments/621.feature.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 newsfragments/621.feature.rst diff --git a/newsfragments/621.feature.rst b/newsfragments/621.feature.rst new file mode 100644 index 00000000..f47c08f9 --- /dev/null +++ b/newsfragments/621.feature.rst @@ -0,0 +1 @@ +Limit concurrency in `push_identify_to_peers` to prevent resource congestion under high peer counts. 
\ No newline at end of file From 2ee23fdec1ae351f3077127ae78553f59be88843 Mon Sep 17 00:00:00 2001 From: sukhman Date: Thu, 3 Jul 2025 11:12:05 +0530 Subject: [PATCH 10/10] Fix ci --- newsfragments/621.feature.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/newsfragments/621.feature.rst b/newsfragments/621.feature.rst index f47c08f9..7ed27fac 100644 --- a/newsfragments/621.feature.rst +++ b/newsfragments/621.feature.rst @@ -1 +1 @@ -Limit concurrency in `push_identify_to_peers` to prevent resource congestion under high peer counts. \ No newline at end of file +Limit concurrency in `push_identify_to_peers` to prevent resource congestion under high peer counts.