mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-09 22:50:54 +00:00
Add gossipsub.join test
This commit is contained in:
@ -8,7 +8,7 @@ from libp2p.pubsub.pb import rpc_pb2
|
|||||||
from libp2p.pubsub.pubsub import Pubsub
|
from libp2p.pubsub.pubsub import Pubsub
|
||||||
from utils import message_id_generator, generate_RPC_packet, \
|
from utils import message_id_generator, generate_RPC_packet, \
|
||||||
create_libp2p_hosts, create_pubsub_and_gossipsub_instances, sparse_connect, dense_connect, \
|
create_libp2p_hosts, create_pubsub_and_gossipsub_instances, sparse_connect, dense_connect, \
|
||||||
connect
|
connect, one_to_all_connect
|
||||||
from tests.utils import cleanup
|
from tests.utils import cleanup
|
||||||
|
|
||||||
SUPPORTED_PROTOCOLS = ["/gossipsub/1.0.0"]
|
SUPPORTED_PROTOCOLS = ["/gossipsub/1.0.0"]
|
||||||
@ -16,25 +16,47 @@ SUPPORTED_PROTOCOLS = ["/gossipsub/1.0.0"]
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_join():
|
async def test_join():
|
||||||
num_hosts = 1
|
# Create libp2p hosts
|
||||||
|
next_msg_id_func = message_id_generator(0)
|
||||||
|
|
||||||
|
num_hosts = 10
|
||||||
|
hosts_indices = list(range(num_hosts))
|
||||||
libp2p_hosts = await create_libp2p_hosts(num_hosts)
|
libp2p_hosts = await create_libp2p_hosts(num_hosts)
|
||||||
|
|
||||||
# Create pubsub, gossipsub instances
|
# Create pubsub, gossipsub instances
|
||||||
_, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \
|
pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \
|
||||||
SUPPORTED_PROTOCOLS, \
|
SUPPORTED_PROTOCOLS, \
|
||||||
10, 9, 11, 30, 3, 5, 0.5)
|
10, 9, 11, 30, 3, 5, 0.5)
|
||||||
|
|
||||||
gossipsub = gossipsubs[0]
|
|
||||||
topic = "test_join"
|
topic = "test_join"
|
||||||
|
central_node_index = 0
|
||||||
|
# Remove index of central host from the indices
|
||||||
|
hosts_indices.remove(central_node_index)
|
||||||
|
num_subscribed_peer = 6
|
||||||
|
subscribed_peer_indices = random.sample(hosts_indices, num_subscribed_peer)
|
||||||
|
|
||||||
assert topic not in gossipsub.mesh
|
# All pubsub except the one of central node subscribe to topic
|
||||||
await gossipsub.join(topic)
|
for i in subscribed_peer_indices:
|
||||||
assert topic in gossipsub.mesh
|
q = await pubsubs[i].subscribe(topic)
|
||||||
|
|
||||||
# Test re-join
|
# Connect central host to all other hosts
|
||||||
await gossipsub.join(topic)
|
await one_to_all_connect(libp2p_hosts, central_node_index)
|
||||||
|
|
||||||
await cleanup()
|
# Wait 2 seconds for heartbeat to allow mesh to connect
|
||||||
|
await asyncio.sleep(2)
|
||||||
|
|
||||||
|
# Check that the pubsub of central node does not have mesh or fanout for the topic
|
||||||
|
assert topic not in gossipsubs[central_node_index].fanout
|
||||||
|
assert topic not in gossipsubs[central_node_index].mesh
|
||||||
|
|
||||||
|
# Central node subscribe message origin
|
||||||
|
await pubsubs[central_node_index].subscribe(topic)
|
||||||
|
|
||||||
|
for i in hosts_indices:
|
||||||
|
if i in subscribed_peer_indices:
|
||||||
|
assert str(libp2p_hosts[i].get_id()) in gossipsubs[central_node_index].mesh[topic]
|
||||||
|
else:
|
||||||
|
assert str(libp2p_hosts[i].get_id()) not in gossipsubs[central_node_index].mesh[topic]
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
|
|||||||
Reference in New Issue
Block a user