Use factories and fixtures in pubsub tests

Done
- Add factories using factory-boy
- Modify fixtures and tests to use factories
- Modify tests to use fixtures and factories
- Clean up
mhchia
2019-08-01 00:09:09 +08:00
parent 9181cf95f0
commit c72dfe1dd3
14 changed files with 237 additions and 410 deletions
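
The factory and fixture code this commit adds lives in the other changed files and is not part of the excerpt below, which only shows the gossipsub test module being migrated. As a rough illustration of the pattern the commit message describes, a factory-boy factory for the gossipsub router might look something like the following sketch; the factory class name, the constructor signature it assumes, and the default values are illustrative guesses rather than code taken from this commit.

import factory

from libp2p.pubsub.gossipsub import GossipSub  # assumed import path

from .configs import GOSSIPSUB_PROTOCOL_ID


class GossipsubFactory(factory.Factory):
    # Hypothetical factory: builds a GossipSub router with default
    # parameters that individual tests (or other factories and fixtures)
    # can override call by call.
    class Meta:
        model = GossipSub

    protocols = [GOSSIPSUB_PROTOCOL_ID]
    degree = 3
    degree_low = 2
    degree_high = 4
    time_to_live = 30
    gossip_window = 3
    gossip_history = 5
    heartbeat_interval = 0.5


# Usage: override only what a given test cares about, e.g.
#     router = GossipsubFactory(degree=10, degree_low=9, degree_high=11)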


@@ -5,29 +5,30 @@ import pytest
from tests.utils import cleanup, connect
from .configs import GOSSIPSUB_PROTOCOL_ID
from .utils import (
create_libp2p_hosts,
create_pubsub_and_gossipsub_instances,
dense_connect,
one_to_all_connect,
from .configs import GossipsubParams
from .utils import dense_connect, one_to_all_connect
@pytest.mark.parametrize(
"num_hosts, gossipsub_params",
(
(
4,
GossipsubParams(
degree=4,
degree_low=3,
degree_high=5,
time_to_live=30,
gossip_window=3,
gossip_history=5,
heartbeat_interval=0.5,
),
),
),
)
SUPPORTED_PROTOCOLS = [GOSSIPSUB_PROTOCOL_ID]
@pytest.mark.asyncio
async def test_join():
# Create libp2p hosts
num_hosts = 4
async def test_join(num_hosts, hosts, gossipsubs, pubsubs_gsub):
hosts_indices = list(range(num_hosts))
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(
libp2p_hosts, SUPPORTED_PROTOCOLS, 4, 3, 5, 30, 3, 5, 0.5
)
topic = "test_join"
central_node_index = 0
@@ -38,10 +39,10 @@ async def test_join():
# All pubsub except the one of central node subscribe to topic
for i in subscribed_peer_indices:
await pubsubs[i].subscribe(topic)
await pubsubs_gsub[i].subscribe(topic)
# Connect central host to all other hosts
await one_to_all_connect(libp2p_hosts, central_node_index)
await one_to_all_connect(hosts, central_node_index)
# Wait 2 seconds for heartbeat to allow mesh to connect
await asyncio.sleep(2)
@@ -49,7 +50,7 @@ async def test_join():
# Central node publish to the topic so that this topic
# is added to central node's fanout
# publish from the randomly chosen host
await pubsubs[central_node_index].publish(topic, b"data")
await pubsubs_gsub[central_node_index].publish(topic, b"data")
# Check that the gossipsub of central node has fanout for the topic
assert topic in gossipsubs[central_node_index].fanout
@@ -57,7 +58,7 @@ async def test_join():
assert topic not in gossipsubs[central_node_index].mesh
# Central node subscribes the topic
await pubsubs[central_node_index].subscribe(topic)
await pubsubs_gsub[central_node_index].subscribe(topic)
await asyncio.sleep(2)
@@ -66,35 +67,21 @@ async def test_join():
for i in hosts_indices:
if i in subscribed_peer_indices:
assert (
str(libp2p_hosts[i].get_id())
in gossipsubs[central_node_index].mesh[topic]
)
assert (
str(libp2p_hosts[central_node_index].get_id())
in gossipsubs[i].mesh[topic]
)
assert str(hosts[i].get_id()) in gossipsubs[central_node_index].mesh[topic]
assert str(hosts[central_node_index].get_id()) in gossipsubs[i].mesh[topic]
else:
assert (
str(libp2p_hosts[i].get_id())
not in gossipsubs[central_node_index].mesh[topic]
str(hosts[i].get_id()) not in gossipsubs[central_node_index].mesh[topic]
)
assert topic not in gossipsubs[i].mesh
await cleanup()
@pytest.mark.parametrize("num_hosts", (1,))
@pytest.mark.asyncio
async def test_leave():
num_hosts = 1
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
_, gossipsubs = create_pubsub_and_gossipsub_instances(
libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5
)
gossipsub = gossipsubs[0]
async def test_leave(pubsubs_gsub):
gossipsub = pubsubs_gsub[0].router
topic = "test_leave"
assert topic not in gossipsub.mesh
@@ -111,21 +98,14 @@ async def test_leave():
await cleanup()
@pytest.mark.parametrize("num_hosts", (2,))
@pytest.mark.asyncio
async def test_handle_graft(event_loop, monkeypatch):
num_hosts = 2
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
_, gossipsubs = create_pubsub_and_gossipsub_instances(
libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5
)
async def test_handle_graft(pubsubs_gsub, hosts, gossipsubs, event_loop, monkeypatch):
index_alice = 0
id_alice = str(libp2p_hosts[index_alice].get_id())
id_alice = str(hosts[index_alice].get_id())
index_bob = 1
id_bob = str(libp2p_hosts[index_bob].get_id())
await connect(libp2p_hosts[index_alice], libp2p_hosts[index_bob])
id_bob = str(hosts[index_bob].get_id())
await connect(hosts[index_alice], hosts[index_bob])
# Wait 2 seconds for heartbeat to allow mesh to connect
await asyncio.sleep(2)
@@ -168,26 +148,35 @@ async def test_handle_graft(event_loop, monkeypatch):
await cleanup()
@pytest.mark.parametrize(
"num_hosts, gossipsub_params",
(
(
2,
GossipsubParams(
degree=10,
degree_low=9,
degree_high=11,
time_to_live=30,
gossip_window=3,
gossip_history=5,
heartbeat_interval=3,
),
),
),
)
@pytest.mark.asyncio
async def test_handle_prune():
num_hosts = 2
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(
libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 3
)
async def test_handle_prune(pubsubs_gsub, hosts, gossipsubs):
index_alice = 0
id_alice = str(libp2p_hosts[index_alice].get_id())
id_alice = str(hosts[index_alice].get_id())
index_bob = 1
id_bob = str(libp2p_hosts[index_bob].get_id())
id_bob = str(hosts[index_bob].get_id())
topic = "test_handle_prune"
for pubsub in pubsubs:
for pubsub in pubsubs_gsub:
await pubsub.subscribe(topic)
await connect(libp2p_hosts[index_alice], libp2p_hosts[index_bob])
await connect(hosts[index_alice], hosts[index_bob])
# Wait 3 seconds for heartbeat to allow mesh to connect
await asyncio.sleep(3)
@@ -212,28 +201,21 @@ async def test_handle_prune():
await cleanup()
@pytest.mark.parametrize("num_hosts", (10,))
@pytest.mark.asyncio
async def test_dense():
# Create libp2p hosts
num_hosts = 10
async def test_dense(num_hosts, pubsubs_gsub, hosts):
num_msgs = 5
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
pubsubs, _ = create_pubsub_and_gossipsub_instances(
libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5
)
# All pubsub subscribe to foobar
queues = []
for pubsub in pubsubs:
for pubsub in pubsubs_gsub:
q = await pubsub.subscribe("foobar")
# Add each blocking queue to an array of blocking queues
queues.append(q)
# Sparsely connect libp2p hosts in random way
await dense_connect(libp2p_hosts)
# Densely connect libp2p hosts in a random way
await dense_connect(hosts)
# Wait 2 seconds for heartbeat to allow mesh to connect
await asyncio.sleep(2)
@@ -245,7 +227,7 @@ async def test_dense():
origin_idx = random.randint(0, num_hosts - 1)
# publish from the randomly chosen host
await pubsubs[origin_idx].publish("foobar", msg_content)
await pubsubs_gsub[origin_idx].publish("foobar", msg_content)
await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message
@@ -255,28 +237,21 @@ async def test_dense():
await cleanup()
@pytest.mark.parametrize("num_hosts", (10,))
@pytest.mark.asyncio
async def test_fanout():
# Create libp2p hosts
num_hosts = 10
async def test_fanout(hosts, pubsubs_gsub):
num_msgs = 5
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
pubsubs, _ = create_pubsub_and_gossipsub_instances(
libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5
)
# All pubsub subscribe to foobar except for `pubsubs[0]`
# All pubsub subscribe to foobar except for `pubsubs_gsub[0]`
queues = []
for i in range(1, len(pubsubs)):
q = await pubsubs[i].subscribe("foobar")
for i in range(1, len(pubsubs_gsub)):
q = await pubsubs_gsub[i].subscribe("foobar")
# Add each blocking queue to an array of blocking queues
queues.append(q)
# Sparsely connect libp2p hosts in random way
await dense_connect(libp2p_hosts)
await dense_connect(hosts)
# Wait 2 seconds for heartbeat to allow mesh to connect
await asyncio.sleep(2)
@@ -290,7 +265,7 @@ async def test_fanout():
origin_idx = 0
# publish from the randomly chosen host
await pubsubs[origin_idx].publish(topic, msg_content)
await pubsubs_gsub[origin_idx].publish(topic, msg_content)
await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message
@@ -299,7 +274,7 @@ async def test_fanout():
assert msg.data == msg_content
# Subscribe message origin
queues.insert(0, await pubsubs[0].subscribe(topic))
queues.insert(0, await pubsubs_gsub[0].subscribe(topic))
# Send messages again
for i in range(num_msgs):
@@ -309,7 +284,7 @@ async def test_fanout():
origin_idx = 0
# publish from the randomly chosen host
await pubsubs[origin_idx].publish(topic, msg_content)
await pubsubs_gsub[origin_idx].publish(topic, msg_content)
await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message
@@ -320,29 +295,22 @@ async def test_fanout():
await cleanup()
@pytest.mark.parametrize("num_hosts", (10,))
@pytest.mark.asyncio
async def test_fanout_maintenance():
# Create libp2p hosts
num_hosts = 10
async def test_fanout_maintenance(hosts, pubsubs_gsub):
num_msgs = 5
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
pubsubs, _ = create_pubsub_and_gossipsub_instances(
libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5
)
# All pubsub subscribe to foobar
queues = []
topic = "foobar"
for i in range(1, len(pubsubs)):
q = await pubsubs[i].subscribe(topic)
for i in range(1, len(pubsubs_gsub)):
q = await pubsubs_gsub[i].subscribe(topic)
# Add each blocking queue to an array of blocking queues
queues.append(q)
# Sparsely connect libp2p hosts in random way
await dense_connect(libp2p_hosts)
await dense_connect(hosts)
# Wait 2 seconds for heartbeat to allow mesh to connect
await asyncio.sleep(2)
@@ -355,7 +323,7 @@ async def test_fanout_maintenance():
origin_idx = 0
# publish from the randomly chosen host
await pubsubs[origin_idx].publish(topic, msg_content)
await pubsubs_gsub[origin_idx].publish(topic, msg_content)
await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message
@@ -363,7 +331,7 @@ async def test_fanout_maintenance():
msg = await queue.get()
assert msg.data == msg_content
for sub in pubsubs:
for sub in pubsubs_gsub:
await sub.unsubscribe(topic)
queues = []
@@ -371,8 +339,8 @@ async def test_fanout_maintenance():
await asyncio.sleep(2)
# Resub and repeat
for i in range(1, len(pubsubs)):
q = await pubsubs[i].subscribe(topic)
for i in range(1, len(pubsubs_gsub)):
q = await pubsubs_gsub[i].subscribe(topic)
# Add each blocking queue to an array of blocking queues
queues.append(q)
@@ -387,7 +355,7 @@ async def test_fanout_maintenance():
origin_idx = 0
# publish from the randomly chosen host
await pubsubs[origin_idx].publish(topic, msg_content)
await pubsubs_gsub[origin_idx].publish(topic, msg_content)
await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message
@@ -398,28 +366,36 @@ async def test_fanout_maintenance():
await cleanup()
@pytest.mark.parametrize(
"num_hosts, gossipsub_params",
(
(
2,
GossipsubParams(
degree=1,
degree_low=0,
degree_high=2,
time_to_live=30,
gossip_window=50,
gossip_history=100,
heartbeat_interval=0.5,
),
),
),
)
@pytest.mark.asyncio
async def test_gossip_propagation():
# Create libp2p hosts
num_hosts = 2
hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
pubsubs, _ = create_pubsub_and_gossipsub_instances(
hosts, SUPPORTED_PROTOCOLS, 1, 0, 2, 30, 50, 100, 0.5
)
async def test_gossip_propagation(hosts, pubsubs_gsub):
topic = "foo"
await pubsubs[0].subscribe(topic)
await pubsubs_gsub[0].subscribe(topic)
# node 0 publish to topic
msg_content = b"foo_msg"
# publish from the randomly chosen host
await pubsubs[0].publish(topic, msg_content)
await pubsubs_gsub[0].publish(topic, msg_content)
# now node 1 subscribes
queue_1 = await pubsubs[1].subscribe(topic)
queue_1 = await pubsubs_gsub[1].subscribe(topic)
await connect(hosts[0], hosts[1])
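
The num_hosts, gossipsub_params, hosts, gossipsubs, and pubsubs_gsub arguments taken by the rewritten tests are pytest fixtures defined in the other changed files and are not shown in this excerpt. A minimal sketch of how the parametrization above could plug into such fixtures, assuming GossipsubParams is a NamedTuple whose fields match the keyword arguments used in the parametrize blocks and that a factory like the one sketched near the top exists, is:

from typing import NamedTuple

import pytest

from .factories import GossipsubFactory  # assumed location of the new factories


class GossipsubParams(NamedTuple):
    # Assumed shape: one field per keyword used in the
    # @pytest.mark.parametrize blocks above, with illustrative defaults.
    degree: int = 3
    degree_low: int = 2
    degree_high: int = 4
    time_to_live: int = 30
    gossip_window: int = 3
    gossip_history: int = 5
    heartbeat_interval: float = 0.5


@pytest.fixture
def num_hosts():
    # Default host count; @pytest.mark.parametrize("num_hosts", ...) on a
    # test overrides this value, including for fixtures that request it.
    return 4


@pytest.fixture
def gossipsub_params():
    # Default router options, likewise overridable via parametrize.
    return GossipsubParams()


@pytest.fixture
def gossipsubs(num_hosts, gossipsub_params):
    # Hypothetical wiring: one router per host, built from the factory
    # with the (possibly parametrized) options applied as overrides.
    return tuple(
        GossipsubFactory(**gossipsub_params._asdict()) for _ in range(num_hosts)
    )


# The hosts and pubsubs_gsub fixtures would be async fixtures that start real
# libp2p hosts and wrap each router in a Pubsub instance; they are omitted here
# because their exact shape depends on conftest code not shown in this excerpt.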