diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index 1f9b8599..50f65ff0 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -8,7 +8,7 @@ from libp2p.pubsub.pb import rpc_pb2 from libp2p.pubsub.pubsub import Pubsub from utils import message_id_generator, generate_RPC_packet, \ create_libp2p_hosts, create_pubsub_and_gossipsub_instances, sparse_connect, dense_connect, \ - connect + connect, one_to_all_connect from tests.utils import cleanup SUPPORTED_PROTOCOLS = ["/gossipsub/1.0.0"] @@ -16,25 +16,47 @@ SUPPORTED_PROTOCOLS = ["/gossipsub/1.0.0"] @pytest.mark.asyncio async def test_join(): - num_hosts = 1 + # Create libp2p hosts + next_msg_id_func = message_id_generator(0) + + num_hosts = 10 + hosts_indices = list(range(num_hosts)) libp2p_hosts = await create_libp2p_hosts(num_hosts) # Create pubsub, gossipsub instances - _, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ + pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ SUPPORTED_PROTOCOLS, \ 10, 9, 11, 30, 3, 5, 0.5) - gossipsub = gossipsubs[0] topic = "test_join" + central_node_index = 0 + # Remove index of central host from the indices + hosts_indices.remove(central_node_index) + num_subscribed_peer = 6 + subscribed_peer_indices = random.sample(hosts_indices, num_subscribed_peer) - assert topic not in gossipsub.mesh - await gossipsub.join(topic) - assert topic in gossipsub.mesh + # All pubsub except the one of central node subscribe to topic + for i in subscribed_peer_indices: + q = await pubsubs[i].subscribe(topic) - # Test re-join - await gossipsub.join(topic) + # Connect central host to all other hosts + await one_to_all_connect(libp2p_hosts, central_node_index) - await cleanup() + # Wait 2 seconds for heartbeat to allow mesh to connect + await asyncio.sleep(2) + + # Check that the pubsub of central node does not have mesh or fanout for the topic + assert topic not in gossipsubs[central_node_index].fanout + assert topic not in gossipsubs[central_node_index].mesh + + # Central node subscribe message origin + await pubsubs[central_node_index].subscribe(topic) + + for i in hosts_indices: + if i in subscribed_peer_indices: + assert str(libp2p_hosts[i].get_id()) in gossipsubs[central_node_index].mesh[topic] + else: + assert str(libp2p_hosts[i].get_id()) not in gossipsubs[central_node_index].mesh[topic] @pytest.mark.asyncio