From 09b4c846a45e5c33ded8e444d871dac49e92fcea Mon Sep 17 00:00:00 2001
From: guha-rahul <52607971+guha-rahul@users.noreply.github.com>
Date: Thu, 19 Jun 2025 17:48:45 +0530
Subject: [PATCH] feat: add support for sparse connect (#680)

* init

* add newsfragment

* fix
---
 newsfragments/679.feature.rst       |  1 +
 tests/core/pubsub/test_gossipsub.py | 78 +++++++++++++++++++++++++++++
 tests/utils/pubsub/utils.py         | 36 +++++++++++++
 3 files changed, 115 insertions(+)
 create mode 100644 newsfragments/679.feature.rst

diff --git a/newsfragments/679.feature.rst b/newsfragments/679.feature.rst
new file mode 100644
index 00000000..372053dd
--- /dev/null
+++ b/newsfragments/679.feature.rst
@@ -0,0 +1 @@
+Added sparse connect utility function to pubsub test utilities for creating test networks with configurable connectivity.
diff --git a/tests/core/pubsub/test_gossipsub.py b/tests/core/pubsub/test_gossipsub.py
index dffcbeac..4dec971d 100644
--- a/tests/core/pubsub/test_gossipsub.py
+++ b/tests/core/pubsub/test_gossipsub.py
@@ -17,6 +17,7 @@ from tests.utils.factories import (
 from tests.utils.pubsub.utils import (
     dense_connect,
     one_to_all_connect,
+    sparse_connect,
 )
 
 
@@ -506,3 +507,80 @@ async def test_gossip_heartbeat(initial_peer_count, monkeypatch):
                 # Check that the peer to gossip to is not in our fanout peers
                 assert peer not in fanout_peers
                 assert topic_fanout in peers_to_gossip[peer]
+
+
+@pytest.mark.trio
+async def test_dense_connect_fallback():
+    """Test that sparse_connect falls back to dense connect for small networks."""
+    async with PubsubFactory.create_batch_with_gossipsub(3) as pubsubs_gsub:
+        hosts = [pubsub.host for pubsub in pubsubs_gsub]
+        degree = 2
+
+        # Create network (should use dense connect)
+        await sparse_connect(hosts, degree)
+
+        # Wait for connections to be established
+        await trio.sleep(2)
+
+        # Verify dense topology (all nodes connected to each other)
+        for i, pubsub in enumerate(pubsubs_gsub):
+            connected_peers = len(pubsub.peers)
+            expected_connections = len(hosts) - 1
+            assert connected_peers == expected_connections, (
+                f"Host {i} has {connected_peers} connections, "
+                f"expected {expected_connections} in dense mode"
+            )
+
+
+@pytest.mark.trio
+async def test_sparse_connect():
+    """Test sparse connect functionality and message propagation."""
+    async with PubsubFactory.create_batch_with_gossipsub(10) as pubsubs_gsub:
+        hosts = [pubsub.host for pubsub in pubsubs_gsub]
+        degree = 2
+        topic = "test_topic"
+
+        # Create network (should use sparse connect)
+        await sparse_connect(hosts, degree)
+
+        # Wait for connections to be established
+        await trio.sleep(2)
+
+        # Verify sparse topology
+        for i, pubsub in enumerate(pubsubs_gsub):
+            connected_peers = len(pubsub.peers)
+            assert degree <= connected_peers < len(hosts) - 1, (
+                f"Host {i} has {connected_peers} connections, "
+                f"expected between {degree} and {len(hosts) - 1} in sparse mode"
+            )
+
+        # Test message propagation
+        queues = [await pubsub.subscribe(topic) for pubsub in pubsubs_gsub]
+        await trio.sleep(2)
+
+        # Publish and verify message propagation
+        msg_content = b"test_msg"
+        await pubsubs_gsub[0].publish(topic, msg_content)
+        await trio.sleep(2)
+
+        # Count the nodes the message reached. Each wait is bounded so that a
+        # node which never got the message lowers the count instead of blocking
+        # the whole test on ``queue.get()``.
+        received_count = 0
+        for queue in queues:
+            with trio.move_on_after(1):
+                msg = await queue.get()
+                if msg.data == msg_content:
+                    received_count += 1
+
+        total_nodes = len(pubsubs_gsub)
+
+        # Ideally every node receives the message; require at least half of them
+        # (rounded up) for acceptable scalability.
+        min_required = (total_nodes + 1) // 2
+        assert received_count >= min_required, (
+            f"Message propagation insufficient: "
+            f"{received_count}/{total_nodes} nodes "
+            f"received the message. Ideally all nodes should receive it, but at "
+            f"minimum {min_required} required for sparse network scalability."
+        )
diff --git a/tests/utils/pubsub/utils.py b/tests/utils/pubsub/utils.py
index 3437916a..5a10ce52 100644
--- a/tests/utils/pubsub/utils.py
+++ b/tests/utils/pubsub/utils.py
@@ -40,3 +40,39 @@ async def one_to_all_connect(hosts: Sequence[IHost], central_host_index: int) ->
     for i, host in enumerate(hosts):
         if i != central_host_index:
             await connect(hosts[central_host_index], host)
+
+
+async def sparse_connect(hosts: Sequence[IHost], degree: int = 3) -> None:
+    """
+    Create a sparse network topology where each node dials only a limited number
+    of other nodes, instead of the full mesh built by dense connect.
+
+    The function switches between dense and sparse connect based on the network
+    size:
+    - For small networks (nodes <= degree + 1), use dense connect
+    - For larger networks, every host dials the next ``degree`` hosts in the
+      sequence (wrapping around) and is dialed back by each of them
+
+    Every host is also dialed by the ``degree`` hosts before it, so it can end up
+    with up to ``2 * degree`` peers.
+
+    Args:
+        hosts: Sequence of hosts to connect
+        degree: Number of following hosts each host dials (default: 3). Values
+            below 1 still link every host to its immediate successor.
+
+    """
+    if len(hosts) <= degree + 1:
+        # For small networks, use dense connect
+        await dense_connect(hosts)
+        return
+
+    # Offset 1 links each host to its immediate successor in both directions, so
+    # the ring is already closed here and needs no separate neighbor pass.
+    reach = max(degree, 1)
+    for i, host in enumerate(hosts):
+        for offset in range(1, reach + 1):
+            target = hosts[(i + offset) % len(hosts)]
+            # Create bidirectional connection
+            await connect(host, target)
+            await connect(target, host)