add test for counded concurrency

Signed-off-by: sukhman <sukhmansinghsaluja@gmail.com>
This commit is contained in:
sukhman
2025-06-09 17:41:04 +05:30
parent ae16909f79
commit 8afb99c5b1
2 changed files with 93 additions and 9 deletions

View File

@ -1,4 +1,7 @@
import logging
from unittest.mock import (
patch,
)
import pytest
import multiaddr
@ -29,10 +32,20 @@ from libp2p.peer.peerinfo import (
from tests.utils.factories import (
host_pair_factory,
)
from tests.utils.utils import (
create_mock_connections,
)
logger = logging.getLogger("libp2p.identity.identify-push-test")
CONCURRENCY_LIMIT = 10
LIMIT = trio.Semaphore(CONCURRENCY_LIMIT)
concurrency_counter = 0
max_observed = 0
lock = trio.Lock()
@pytest.mark.trio
async def test_identify_push_protocol(security_protocol):
"""
@ -204,20 +217,20 @@ async def test_identify_push_to_peers(security_protocol):
# Check that the peer is in the peerstore
assert peer_id_a in peerstore_c.peer_ids()
# Test for push_identify to only connected peers and not all peers in peerstore
# Test for push_identify to only connected peers and not all peers
# Disconnect a from c.
await host_c.disconnect(host_a.get_id())
#
#
await push_identify_to_peers(host_c)
# Wait a bit for the push to complete
await trio.sleep(0.1)
# Check that host_a's peerstore has not been updated with host_c's information
assert(host_c.get_id() not in host_a.get_peerstore().peer_ids())
# Check that host_b's peerstore has been updated with host_c's information
assert(host_c.get_id() in host_b.get_peerstore().peer_ids())
# Check that host_a's peerstore has not been updated with host_c's info
assert host_c.get_id() not in host_a.get_peerstore().peer_ids()
# Check that host_b's peerstore has been updated with host_c's info
assert host_c.get_id() in host_b.get_peerstore().peer_ids()
@pytest.mark.trio
@ -427,3 +440,63 @@ async def test_partial_update_peerstore_from_identify(security_protocol):
host_a_public_key = host_a.get_public_key().serialize()
peerstore_public_key = peerstore.pubkey(peer_id).serialize()
assert host_a_public_key == peerstore_public_key
@pytest.mark.trio
async def test_push_identify_to_peers_respects_concurrency_limit():
"""
Test bounded concurrency for the identify/push protocol to prevent
network congestion.
This test verifies:
1. The number of concurrent tasks executing the identify push is always
less than or equal to CONCURRENCY_LIMIT.
2. An error is raised if concurrency exceeds the defined limit.
It mocks `push_identify_to_peer` to simulate delay using sleep,
allowing the test to measure and assert actual concurrency behavior.
"""
# Create a mock host.
key_pair_host = create_new_key_pair()
host = new_host(key_pair=key_pair_host)
# Create a mock network and add mock connections to the host
host.get_network().connections = create_mock_connections()
with patch(
"libp2p.identity.identify_push.identify_push.push_identify_to_peer",
new=mock_push_identify_to_peer,
):
await push_identify_to_peers(host)
assert (
max_observed <= CONCURRENCY_LIMIT
), f"Max concurrency observed: {max_observed}"
async def mock_push_identify_to_peer(host, peer_id, observed_multiaddr=None) -> bool:
"""
Mock function to test concurrency by simulating an identify message.
This function patches push_identify_to_peer for testing purpose
Returns
-------
bool
True if the push was successful, False otherwise.
"""
global concurrency_counter, max_observed
async with LIMIT:
async with lock:
concurrency_counter += 1
if concurrency_counter > CONCURRENCY_LIMIT:
raise RuntimeError(f"Concurrency limit exceeded: {concurrency_counter}")
max_observed = max(max_observed, concurrency_counter)
logger.debug("Successfully pushed identify to peer %s", peer_id)
await trio.sleep(0.05)
async with lock:
concurrency_counter -= 1
return True

11
tests/utils/utils.py Normal file
View File

@ -0,0 +1,11 @@
from unittest.mock import MagicMock
def create_mock_connections() -> dict:
connections = {}
for i in range(1, 31):
peer_id = f"peer-{i}"
mock_conn = MagicMock(name=f"INetConn-{i}")
connections[peer_id] = mock_conn
return connections