Add tests for swarm, and debug

Fix `swarm_pair_factory`
This commit is contained in:
mhchia
2019-09-15 20:44:48 +08:00
parent 276ac4d8ab
commit 0356380996
3 changed files with 56 additions and 1 deletions

View File

@ -112,7 +112,7 @@ class PubsubFactory(factory.Factory):
async def swarm_pair_factory(is_secure: bool) -> Tuple[Swarm, Swarm]:
swarms = await ListeningSwarmFactory.create_batch_and_listen(2)
swarms = await ListeningSwarmFactory.create_batch_and_listen(is_secure, 2)
await connect_swarm(swarms[0], swarms[1])
return swarms[0], swarms[1]

View File

@ -2,10 +2,43 @@ import asyncio
import pytest
from libp2p.network.exceptions import SwarmException
from tests.factories import ListeningSwarmFactory
from tests.utils import connect_swarm
@pytest.mark.asyncio
async def test_swarm_dial_peer(is_host_secure):
swarms = await ListeningSwarmFactory.create_batch_and_listen(is_host_secure, 3)
# Test: No addr found.
with pytest.raises(SwarmException):
await swarms[0].dial_peer(swarms[1].get_peer_id())
# Test: len(addr) in the peerstore is 0.
swarms[0].peerstore.add_addrs(swarms[1].get_peer_id(), [], 10000)
with pytest.raises(SwarmException):
await swarms[0].dial_peer(swarms[1].get_peer_id())
# Test: Succeed if addrs of the peer_id are present in the peerstore.
addrs = tuple(
addr
for transport in swarms[1].listeners.values()
for addr in transport.get_addrs()
)
swarms[0].peerstore.add_addrs(swarms[1].get_peer_id(), addrs, 10000)
await swarms[0].dial_peer(swarms[1].get_peer_id())
assert swarms[0].get_peer_id() in swarms[1].connections
assert swarms[1].get_peer_id() in swarms[0].connections
# Test: Reuse connections when we already have ones with a peer.
conn_to_1 = swarms[0].connections[swarms[1].get_peer_id()]
conn = await swarms[0].dial_peer(swarms[1].get_peer_id())
assert conn is conn_to_1
# Clean up
await asyncio.gather(*[swarm.close() for swarm in swarms])
@pytest.mark.asyncio
async def test_swarm_close_peer(is_host_secure):
swarms = await ListeningSwarmFactory.create_batch_and_listen(is_host_secure, 3)
@ -47,3 +80,14 @@ async def test_swarm_close_peer(is_host_secure):
# Clean up
await asyncio.gather(*[swarm.close() for swarm in swarms])
@pytest.mark.asyncio
async def test_swarm_remove_conn(swarm_pair):
swarm_0, swarm_1 = swarm_pair
conn_0 = swarm_0.connections[swarm_1.get_peer_id()]
swarm_0.remove_conn(conn_0)
assert swarm_1.get_peer_id() not in swarm_0.connections
# Test: Remove twice. There should not be errors.
swarm_0.remove_conn(conn_0)
assert swarm_1.get_peer_id() not in swarm_0.connections