mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-11 15:40:54 +00:00
added extra tests for identifu push for concurrency cap
This commit is contained in:
@ -10,7 +10,6 @@ import trio
|
||||
from libp2p import (
|
||||
new_host,
|
||||
)
|
||||
from libp2p.abc import IHost
|
||||
from libp2p.crypto.secp256k1 import (
|
||||
create_new_key_pair,
|
||||
)
|
||||
@ -36,6 +35,8 @@ from tests.utils.factories import (
|
||||
)
|
||||
from tests.utils.utils import (
|
||||
create_mock_connections,
|
||||
run_host_forever,
|
||||
wait_until_listening,
|
||||
)
|
||||
|
||||
logger = logging.getLogger("libp2p.identity.identify-push-test")
|
||||
@ -504,3 +505,91 @@ async def test_push_identify_to_peers_respects_concurrency_limit():
|
||||
assert state["max_observed"] <= CONCURRENCY_LIMIT, (
|
||||
f"Max concurrency observed: {state['max_observed']}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_all_peers_receive_identify_push_with_semaphore(security_protocol):
|
||||
dummy_peers = []
|
||||
|
||||
async with host_pair_factory(security_protocol=security_protocol) as (host_a, _):
|
||||
# Create dummy peers
|
||||
for _ in range(50):
|
||||
key_pair = create_new_key_pair()
|
||||
dummy_host = new_host(key_pair=key_pair)
|
||||
dummy_host.set_stream_handler(
|
||||
ID_PUSH, identify_push_handler_for(dummy_host)
|
||||
)
|
||||
listen_addr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
|
||||
dummy_peers.append((dummy_host, listen_addr))
|
||||
|
||||
async with trio.open_nursery() as nursery:
|
||||
# Start all dummy hosts
|
||||
for host, listen_addr in dummy_peers:
|
||||
nursery.start_soon(run_host_forever, host, listen_addr)
|
||||
|
||||
# Wait for all hosts to finish setting up listeners
|
||||
for host, _ in dummy_peers:
|
||||
await wait_until_listening(host)
|
||||
|
||||
# Now connect host_a → dummy peers
|
||||
for host, _ in dummy_peers:
|
||||
await host_a.connect(info_from_p2p_addr(host.get_addrs()[0]))
|
||||
|
||||
await push_identify_to_peers(
|
||||
host_a,
|
||||
)
|
||||
|
||||
await trio.sleep(0.5)
|
||||
|
||||
peer_id_a = host_a.get_id()
|
||||
for host, _ in dummy_peers:
|
||||
dummy_peerstore = host.get_peerstore()
|
||||
assert peer_id_a in dummy_peerstore.peer_ids()
|
||||
|
||||
nursery.cancel_scope.cancel()
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_all_peers_receive_identify_push_with_semaphore_under_high_peer_load(
|
||||
security_protocol,
|
||||
):
|
||||
dummy_peers = []
|
||||
|
||||
async with host_pair_factory(security_protocol=security_protocol) as (host_a, _):
|
||||
# Create dummy peers
|
||||
# Breaking with more than 500 peers
|
||||
# Trio have a async tasks limit of 1000
|
||||
for _ in range(499):
|
||||
key_pair = create_new_key_pair()
|
||||
dummy_host = new_host(key_pair=key_pair)
|
||||
dummy_host.set_stream_handler(
|
||||
ID_PUSH, identify_push_handler_for(dummy_host)
|
||||
)
|
||||
listen_addr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
|
||||
dummy_peers.append((dummy_host, listen_addr))
|
||||
|
||||
async with trio.open_nursery() as nursery:
|
||||
# Start all dummy hosts
|
||||
for host, listen_addr in dummy_peers:
|
||||
nursery.start_soon(run_host_forever, host, listen_addr)
|
||||
|
||||
# Wait for all hosts to finish setting up listeners
|
||||
for host, _ in dummy_peers:
|
||||
await wait_until_listening(host)
|
||||
|
||||
# Now connect host_a → dummy peers
|
||||
for host, _ in dummy_peers:
|
||||
await host_a.connect(info_from_p2p_addr(host.get_addrs()[0]))
|
||||
|
||||
await push_identify_to_peers(
|
||||
host_a,
|
||||
)
|
||||
|
||||
await trio.sleep(0.5)
|
||||
|
||||
peer_id_a = host_a.get_id()
|
||||
for host, _ in dummy_peers:
|
||||
dummy_peerstore = host.get_peerstore()
|
||||
assert peer_id_a in dummy_peerstore.peer_ids()
|
||||
|
||||
nursery.cancel_scope.cancel()
|
||||
|
||||
Reference in New Issue
Block a user