From 8afb99c5b138ccfd287a7e5e7368cd9afe296621 Mon Sep 17 00:00:00 2001 From: sukhman Date: Mon, 9 Jun 2025 17:41:04 +0530 Subject: [PATCH] add test for counded concurrency Signed-off-by: sukhman --- .../identify_push/test_identify_push.py | 91 +++++++++++++++++-- tests/utils/utils.py | 11 +++ 2 files changed, 93 insertions(+), 9 deletions(-) create mode 100644 tests/utils/utils.py diff --git a/tests/core/identity/identify_push/test_identify_push.py b/tests/core/identity/identify_push/test_identify_push.py index ac06f599..ce3373c8 100644 --- a/tests/core/identity/identify_push/test_identify_push.py +++ b/tests/core/identity/identify_push/test_identify_push.py @@ -1,4 +1,7 @@ import logging +from unittest.mock import ( + patch, +) import pytest import multiaddr @@ -29,10 +32,20 @@ from libp2p.peer.peerinfo import ( from tests.utils.factories import ( host_pair_factory, ) +from tests.utils.utils import ( + create_mock_connections, +) logger = logging.getLogger("libp2p.identity.identify-push-test") +CONCURRENCY_LIMIT = 10 +LIMIT = trio.Semaphore(CONCURRENCY_LIMIT) +concurrency_counter = 0 +max_observed = 0 +lock = trio.Lock() + + @pytest.mark.trio async def test_identify_push_protocol(security_protocol): """ @@ -204,20 +217,20 @@ async def test_identify_push_to_peers(security_protocol): # Check that the peer is in the peerstore assert peer_id_a in peerstore_c.peer_ids() - - # Test for push_identify to only connected peers and not all peers in peerstore + + # Test for push_identify to only connected peers and not all peers # Disconnect a from c. await host_c.disconnect(host_a.get_id()) - # + # await push_identify_to_peers(host_c) - + # Wait a bit for the push to complete await trio.sleep(0.1) - - # Check that host_a's peerstore has not been updated with host_c's information - assert(host_c.get_id() not in host_a.get_peerstore().peer_ids()) - # Check that host_b's peerstore has been updated with host_c's information - assert(host_c.get_id() in host_b.get_peerstore().peer_ids()) + + # Check that host_a's peerstore has not been updated with host_c's info + assert host_c.get_id() not in host_a.get_peerstore().peer_ids() + # Check that host_b's peerstore has been updated with host_c's info + assert host_c.get_id() in host_b.get_peerstore().peer_ids() @pytest.mark.trio @@ -427,3 +440,63 @@ async def test_partial_update_peerstore_from_identify(security_protocol): host_a_public_key = host_a.get_public_key().serialize() peerstore_public_key = peerstore.pubkey(peer_id).serialize() assert host_a_public_key == peerstore_public_key + + +@pytest.mark.trio +async def test_push_identify_to_peers_respects_concurrency_limit(): + """ + Test bounded concurrency for the identify/push protocol to prevent + network congestion. + + This test verifies: + 1. The number of concurrent tasks executing the identify push is always + less than or equal to CONCURRENCY_LIMIT. + 2. An error is raised if concurrency exceeds the defined limit. + + It mocks `push_identify_to_peer` to simulate delay using sleep, + allowing the test to measure and assert actual concurrency behavior. + """ + + # Create a mock host. + key_pair_host = create_new_key_pair() + host = new_host(key_pair=key_pair_host) + + # Create a mock network and add mock connections to the host + host.get_network().connections = create_mock_connections() + with patch( + "libp2p.identity.identify_push.identify_push.push_identify_to_peer", + new=mock_push_identify_to_peer, + ): + await push_identify_to_peers(host) + assert ( + max_observed <= CONCURRENCY_LIMIT + ), f"Max concurrency observed: {max_observed}" + + +async def mock_push_identify_to_peer(host, peer_id, observed_multiaddr=None) -> bool: + """ + Mock function to test concurrency by simulating an identify message. + + This function patches push_identify_to_peer for testing purpose + + Returns + ------- + bool + True if the push was successful, False otherwise. + """ + global concurrency_counter, max_observed + + async with LIMIT: + async with lock: + concurrency_counter += 1 + if concurrency_counter > CONCURRENCY_LIMIT: + raise RuntimeError(f"Concurrency limit exceeded: {concurrency_counter}") + max_observed = max(max_observed, concurrency_counter) + + logger.debug("Successfully pushed identify to peer %s", peer_id) + await trio.sleep(0.05) + + async with lock: + concurrency_counter -= 1 + + return True diff --git a/tests/utils/utils.py b/tests/utils/utils.py new file mode 100644 index 00000000..2eaf4cf6 --- /dev/null +++ b/tests/utils/utils.py @@ -0,0 +1,11 @@ +from unittest.mock import MagicMock + +def create_mock_connections() -> dict: + connections = {} + + for i in range(1, 31): + peer_id = f"peer-{i}" + mock_conn = MagicMock(name=f"INetConn-{i}") + connections[peer_id] = mock_conn + + return connections \ No newline at end of file