diff --git a/newsfragments/708.performance.rst b/newsfragments/708.performance.rst new file mode 100644 index 00000000..286615e9 --- /dev/null +++ b/newsfragments/708.performance.rst @@ -0,0 +1 @@ +Added extra tests for identify push concurrency cap under high peer load diff --git a/tests/core/identity/identify_push/test_identify_push.py b/tests/core/identity/identify_push/test_identify_push.py index b0ffb677..935fb2c0 100644 --- a/tests/core/identity/identify_push/test_identify_push.py +++ b/tests/core/identity/identify_push/test_identify_push.py @@ -35,6 +35,8 @@ from tests.utils.factories import ( ) from tests.utils.utils import ( create_mock_connections, + run_host_forever, + wait_until_listening, ) logger = logging.getLogger("libp2p.identity.identify-push-test") @@ -503,3 +505,91 @@ async def test_push_identify_to_peers_respects_concurrency_limit(): assert state["max_observed"] <= CONCURRENCY_LIMIT, ( f"Max concurrency observed: {state['max_observed']}" ) + + +@pytest.mark.trio +async def test_all_peers_receive_identify_push_with_semaphore(security_protocol): + dummy_peers = [] + + async with host_pair_factory(security_protocol=security_protocol) as (host_a, _): + # Create dummy peers + for _ in range(50): + key_pair = create_new_key_pair() + dummy_host = new_host(key_pair=key_pair) + dummy_host.set_stream_handler( + ID_PUSH, identify_push_handler_for(dummy_host) + ) + listen_addr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") + dummy_peers.append((dummy_host, listen_addr)) + + async with trio.open_nursery() as nursery: + # Start all dummy hosts + for host, listen_addr in dummy_peers: + nursery.start_soon(run_host_forever, host, listen_addr) + + # Wait for all hosts to finish setting up listeners + for host, _ in dummy_peers: + await wait_until_listening(host) + + # Now connect host_a → dummy peers + for host, _ in dummy_peers: + await host_a.connect(info_from_p2p_addr(host.get_addrs()[0])) + + await push_identify_to_peers( + host_a, + ) + + await trio.sleep(0.5) + + peer_id_a = host_a.get_id() + for host, _ in dummy_peers: + dummy_peerstore = host.get_peerstore() + assert peer_id_a in dummy_peerstore.peer_ids() + + nursery.cancel_scope.cancel() + + +@pytest.mark.trio +async def test_all_peers_receive_identify_push_with_semaphore_under_high_peer_load( + security_protocol, +): + dummy_peers = [] + + async with host_pair_factory(security_protocol=security_protocol) as (host_a, _): + # Create dummy peers + # Breaking with more than 500 peers + # Trio have a async tasks limit of 1000 + for _ in range(499): + key_pair = create_new_key_pair() + dummy_host = new_host(key_pair=key_pair) + dummy_host.set_stream_handler( + ID_PUSH, identify_push_handler_for(dummy_host) + ) + listen_addr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") + dummy_peers.append((dummy_host, listen_addr)) + + async with trio.open_nursery() as nursery: + # Start all dummy hosts + for host, listen_addr in dummy_peers: + nursery.start_soon(run_host_forever, host, listen_addr) + + # Wait for all hosts to finish setting up listeners + for host, _ in dummy_peers: + await wait_until_listening(host) + + # Now connect host_a → dummy peers + for host, _ in dummy_peers: + await host_a.connect(info_from_p2p_addr(host.get_addrs()[0])) + + await push_identify_to_peers( + host_a, + ) + + await trio.sleep(0.5) + + peer_id_a = host_a.get_id() + for host, _ in dummy_peers: + dummy_peerstore = host.get_peerstore() + assert peer_id_a in dummy_peerstore.peer_ids() + + nursery.cancel_scope.cancel() diff --git a/tests/utils/utils.py b/tests/utils/utils.py index 6e23ecdd..4fe2a6e1 100644 --- a/tests/utils/utils.py +++ b/tests/utils/utils.py @@ -2,13 +2,30 @@ from unittest.mock import ( MagicMock, ) +import trio -def create_mock_connections() -> dict: +from libp2p.abc import IHost + + +def create_mock_connections(count: int = 50) -> dict: connections = {} - for i in range(1, 31): + for i in range(1, count): peer_id = f"peer-{i}" mock_conn = MagicMock(name=f"INetConn-{i}") connections[peer_id] = mock_conn return connections + + +async def run_host_forever(host: IHost, addr): + async with host.run([addr]): + await trio.sleep_forever() + + +async def wait_until_listening(host, timeout=3): + with trio.move_on_after(timeout): + while not host.get_addrs(): + await trio.sleep(0.05) + return + raise RuntimeError("Timed out waiting for host to get an address")