Merge pull request #708 from lla-dane/todo/bounded-nursery

Added tests for identify push concurrency cap under high peer load
This commit is contained in:
Manu Sheel Gupta
2025-07-05 14:28:31 -07:00
committed by GitHub
3 changed files with 110 additions and 2 deletions

View File

@ -0,0 +1 @@
Added extra tests for identify push concurrency cap under high peer load

View File

@ -35,6 +35,8 @@ from tests.utils.factories import (
)
from tests.utils.utils import (
create_mock_connections,
run_host_forever,
wait_until_listening,
)
logger = logging.getLogger("libp2p.identity.identify-push-test")
@ -503,3 +505,91 @@ async def test_push_identify_to_peers_respects_concurrency_limit():
assert state["max_observed"] <= CONCURRENCY_LIMIT, (
f"Max concurrency observed: {state['max_observed']}"
)
@pytest.mark.trio
async def test_all_peers_receive_identify_push_with_semaphore(security_protocol):
dummy_peers = []
async with host_pair_factory(security_protocol=security_protocol) as (host_a, _):
# Create dummy peers
for _ in range(50):
key_pair = create_new_key_pair()
dummy_host = new_host(key_pair=key_pair)
dummy_host.set_stream_handler(
ID_PUSH, identify_push_handler_for(dummy_host)
)
listen_addr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
dummy_peers.append((dummy_host, listen_addr))
async with trio.open_nursery() as nursery:
# Start all dummy hosts
for host, listen_addr in dummy_peers:
nursery.start_soon(run_host_forever, host, listen_addr)
# Wait for all hosts to finish setting up listeners
for host, _ in dummy_peers:
await wait_until_listening(host)
# Now connect host_a → dummy peers
for host, _ in dummy_peers:
await host_a.connect(info_from_p2p_addr(host.get_addrs()[0]))
await push_identify_to_peers(
host_a,
)
await trio.sleep(0.5)
peer_id_a = host_a.get_id()
for host, _ in dummy_peers:
dummy_peerstore = host.get_peerstore()
assert peer_id_a in dummy_peerstore.peer_ids()
nursery.cancel_scope.cancel()
@pytest.mark.trio
async def test_all_peers_receive_identify_push_with_semaphore_under_high_peer_load(
security_protocol,
):
dummy_peers = []
async with host_pair_factory(security_protocol=security_protocol) as (host_a, _):
# Create dummy peers
# Breaking with more than 500 peers
# Trio have a async tasks limit of 1000
for _ in range(499):
key_pair = create_new_key_pair()
dummy_host = new_host(key_pair=key_pair)
dummy_host.set_stream_handler(
ID_PUSH, identify_push_handler_for(dummy_host)
)
listen_addr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
dummy_peers.append((dummy_host, listen_addr))
async with trio.open_nursery() as nursery:
# Start all dummy hosts
for host, listen_addr in dummy_peers:
nursery.start_soon(run_host_forever, host, listen_addr)
# Wait for all hosts to finish setting up listeners
for host, _ in dummy_peers:
await wait_until_listening(host)
# Now connect host_a → dummy peers
for host, _ in dummy_peers:
await host_a.connect(info_from_p2p_addr(host.get_addrs()[0]))
await push_identify_to_peers(
host_a,
)
await trio.sleep(0.5)
peer_id_a = host_a.get_id()
for host, _ in dummy_peers:
dummy_peerstore = host.get_peerstore()
assert peer_id_a in dummy_peerstore.peer_ids()
nursery.cancel_scope.cancel()

View File

@ -2,13 +2,30 @@ from unittest.mock import (
MagicMock,
)
import trio
def create_mock_connections() -> dict:
from libp2p.abc import IHost
def create_mock_connections(count: int = 50) -> dict:
connections = {}
for i in range(1, 31):
for i in range(1, count):
peer_id = f"peer-{i}"
mock_conn = MagicMock(name=f"INetConn-{i}")
connections[peer_id] = mock_conn
return connections
async def run_host_forever(host: IHost, addr):
async with host.run([addr]):
await trio.sleep_forever()
async def wait_until_listening(host, timeout=3):
with trio.move_on_after(timeout):
while not host.get_addrs():
await trio.sleep(0.05)
return
raise RuntimeError("Timed out waiting for host to get an address")