mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
feat: add support for sparse connect (#680)
* init * add newsfragment * fix
This commit is contained in:
1
newsfragments/679.feature.rst
Normal file
1
newsfragments/679.feature.rst
Normal file
@ -0,0 +1 @@
|
||||
Added sparse connect utility function to pubsub test utilities for creating test networks with configurable connectivity.
|
||||
@ -17,6 +17,7 @@ from tests.utils.factories import (
|
||||
from tests.utils.pubsub.utils import (
|
||||
dense_connect,
|
||||
one_to_all_connect,
|
||||
sparse_connect,
|
||||
)
|
||||
|
||||
|
||||
@ -506,3 +507,84 @@ async def test_gossip_heartbeat(initial_peer_count, monkeypatch):
|
||||
# Check that the peer to gossip to is not in our fanout peers
|
||||
assert peer not in fanout_peers
|
||||
assert topic_fanout in peers_to_gossip[peer]
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_dense_connect_fallback():
|
||||
"""Test that sparse_connect falls back to dense connect for small networks."""
|
||||
async with PubsubFactory.create_batch_with_gossipsub(3) as pubsubs_gsub:
|
||||
hosts = [pubsub.host for pubsub in pubsubs_gsub]
|
||||
degree = 2
|
||||
|
||||
# Create network (should use dense connect)
|
||||
await sparse_connect(hosts, degree)
|
||||
|
||||
# Wait for connections to be established
|
||||
await trio.sleep(2)
|
||||
|
||||
# Verify dense topology (all nodes connected to each other)
|
||||
for i, pubsub in enumerate(pubsubs_gsub):
|
||||
connected_peers = len(pubsub.peers)
|
||||
expected_connections = len(hosts) - 1
|
||||
assert connected_peers == expected_connections, (
|
||||
f"Host {i} has {connected_peers} connections, "
|
||||
f"expected {expected_connections} in dense mode"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_sparse_connect():
|
||||
"""Test sparse connect functionality and message propagation."""
|
||||
async with PubsubFactory.create_batch_with_gossipsub(10) as pubsubs_gsub:
|
||||
hosts = [pubsub.host for pubsub in pubsubs_gsub]
|
||||
degree = 2
|
||||
topic = "test_topic"
|
||||
|
||||
# Create network (should use sparse connect)
|
||||
await sparse_connect(hosts, degree)
|
||||
|
||||
# Wait for connections to be established
|
||||
await trio.sleep(2)
|
||||
|
||||
# Verify sparse topology
|
||||
for i, pubsub in enumerate(pubsubs_gsub):
|
||||
connected_peers = len(pubsub.peers)
|
||||
assert degree <= connected_peers < len(hosts) - 1, (
|
||||
f"Host {i} has {connected_peers} connections, "
|
||||
f"expected between {degree} and {len(hosts) - 1} in sparse mode"
|
||||
)
|
||||
|
||||
# Test message propagation
|
||||
queues = [await pubsub.subscribe(topic) for pubsub in pubsubs_gsub]
|
||||
await trio.sleep(2)
|
||||
|
||||
# Publish and verify message propagation
|
||||
msg_content = b"test_msg"
|
||||
await pubsubs_gsub[0].publish(topic, msg_content)
|
||||
await trio.sleep(2)
|
||||
|
||||
# Verify message propagation - ideally all nodes should receive it
|
||||
received_count = 0
|
||||
for queue in queues:
|
||||
try:
|
||||
msg = await queue.get()
|
||||
if msg.data == msg_content:
|
||||
received_count += 1
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
total_nodes = len(pubsubs_gsub)
|
||||
|
||||
# Ideally all nodes should receive the message for optimal scalability
|
||||
if received_count == total_nodes:
|
||||
# Perfect propagation achieved
|
||||
pass
|
||||
else:
|
||||
# require more than half for acceptable scalability
|
||||
min_required = (total_nodes + 1) // 2
|
||||
assert received_count >= min_required, (
|
||||
f"Message propagation insufficient: "
|
||||
f"{received_count}/{total_nodes} nodes "
|
||||
f"received the message. Ideally all nodes should receive it, but at "
|
||||
f"minimum {min_required} required for sparse network scalability."
|
||||
)
|
||||
|
||||
@ -40,3 +40,42 @@ async def one_to_all_connect(hosts: Sequence[IHost], central_host_index: int) ->
|
||||
for i, host in enumerate(hosts):
|
||||
if i != central_host_index:
|
||||
await connect(hosts[central_host_index], host)
|
||||
|
||||
|
||||
async def sparse_connect(hosts: Sequence[IHost], degree: int = 3) -> None:
|
||||
"""
|
||||
Create a sparse network topology where each node connects to a limited number of
|
||||
other nodes. This is more efficient than dense connect for large networks.
|
||||
|
||||
The function will automatically switch between dense and sparse connect based on
|
||||
the network size:
|
||||
- For small networks (nodes <= degree + 1), use dense connect
|
||||
- For larger networks, use sparse connect with the specified degree
|
||||
|
||||
Args:
|
||||
hosts: Sequence of hosts to connect
|
||||
degree: Number of connections each node should maintain (default: 3)
|
||||
|
||||
"""
|
||||
if len(hosts) <= degree + 1:
|
||||
# For small networks, use dense connect
|
||||
await dense_connect(hosts)
|
||||
return
|
||||
|
||||
# For larger networks, use sparse connect
|
||||
# For each host, connect to 'degree' number of other hosts
|
||||
for i, host in enumerate(hosts):
|
||||
# Calculate which hosts to connect to
|
||||
# We'll connect to hosts that are 'degree' positions away in the sequence
|
||||
# This creates a more distributed topology
|
||||
for j in range(1, degree + 1):
|
||||
target_idx = (i + j) % len(hosts)
|
||||
# Create bidirectional connection
|
||||
await connect(host, hosts[target_idx])
|
||||
await connect(hosts[target_idx], host)
|
||||
|
||||
# Ensure network connectivity by connecting each node to its immediate neighbors
|
||||
for i in range(len(hosts)):
|
||||
next_idx = (i + 1) % len(hosts)
|
||||
await connect(hosts[i], hosts[next_idx])
|
||||
await connect(hosts[next_idx], hosts[i])
|
||||
|
||||
Reference in New Issue
Block a user