mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
added test for connected peers
This commit is contained in:
committed by
Paul Robinson
parent
847dfa6577
commit
17575f4a38
@ -146,7 +146,10 @@ class BasicHost(IHost):
|
||||
return addrs
|
||||
|
||||
def get_connected_peers(self) -> List[ID]:
|
||||
return list(self.get_network().connections.keys())
|
||||
"""
|
||||
:return: all the ids of peers this host is currently connected to
|
||||
"""
|
||||
return list(self._network.connections.keys())
|
||||
|
||||
@asynccontextmanager
|
||||
async def run(
|
||||
|
||||
118
tests/core/host/test_connected_peers.py
Normal file
118
tests/core/host/test_connected_peers.py
Normal file
@ -0,0 +1,118 @@
|
||||
import pytest
|
||||
|
||||
from libp2p.peer.peerinfo import (
|
||||
info_from_p2p_addr,
|
||||
)
|
||||
from libp2p.tools.factories import (
|
||||
HostFactory,
|
||||
)
|
||||
|
||||
|
||||
async def connect_two(host_a, host_b, host_c):
|
||||
# Initially all of the hosts are disconnected
|
||||
assert (len(host_a.get_connected_peers())) == 0
|
||||
assert (len(host_b.get_connected_peers())) == 0
|
||||
assert (len(host_c.get_connected_peers())) == 0
|
||||
|
||||
# Connecting hostA with hostB
|
||||
addr = host_b.get_addrs()[0]
|
||||
info = info_from_p2p_addr(addr)
|
||||
await host_a.connect(info)
|
||||
|
||||
# Since hostA and hostB are connected now
|
||||
assert (len(host_a.get_connected_peers())) == 1
|
||||
assert (len(host_b.get_connected_peers())) == 1
|
||||
assert (len(host_c.get_connected_peers())) == 0
|
||||
assert host_a.get_connected_peers()[0] == host_b.get_id()
|
||||
assert host_b.get_connected_peers()[0] == host_a.get_id()
|
||||
|
||||
|
||||
async def connect_three_cyclic(host_a, host_b, host_c):
|
||||
# Connecting hostA with hostB
|
||||
addr = host_b.get_addrs()[0]
|
||||
infoB = info_from_p2p_addr(addr)
|
||||
await host_a.connect(infoB)
|
||||
|
||||
# Connecting hostB with hostC
|
||||
addr = host_c.get_addrs()[0]
|
||||
infoC = info_from_p2p_addr(addr)
|
||||
await host_b.connect(infoC)
|
||||
|
||||
# Connecting hostC with hostA
|
||||
addr = host_a.get_addrs()[0]
|
||||
infoA = info_from_p2p_addr(addr)
|
||||
await host_c.connect(infoA)
|
||||
|
||||
# Performing checks
|
||||
assert (len(host_a.get_connected_peers())) == 2
|
||||
assert (len(host_b.get_connected_peers())) == 2
|
||||
assert (len(host_c.get_connected_peers())) == 2
|
||||
assert host_a.get_connected_peers() == [host_b.get_id(), host_c.get_id()]
|
||||
assert host_b.get_connected_peers() == [host_a.get_id(), host_c.get_id()]
|
||||
assert host_c.get_connected_peers() == [host_b.get_id(), host_a.get_id()]
|
||||
|
||||
|
||||
async def connect_two_to_one(host_a, host_b, host_c):
|
||||
# Connecting hostA and hostC to hostB
|
||||
addr = host_b.get_addrs()[0]
|
||||
infoB = info_from_p2p_addr(addr)
|
||||
await host_a.connect(infoB)
|
||||
await host_c.connect(infoB)
|
||||
|
||||
# Performing checks
|
||||
assert (len(host_a.get_connected_peers())) == 1
|
||||
assert (len(host_b.get_connected_peers())) == 2
|
||||
assert (len(host_c.get_connected_peers())) == 1
|
||||
assert host_a.get_connected_peers() == [host_b.get_id()]
|
||||
assert host_b.get_connected_peers() == [host_a.get_id(), host_c.get_id()]
|
||||
assert host_c.get_connected_peers() == [host_b.get_id()]
|
||||
|
||||
|
||||
async def connect_and_disconnect(host_a, host_b, host_c):
|
||||
# Connecting hostB to hostA and hostC
|
||||
addr = host_a.get_addrs()[0]
|
||||
infoA = info_from_p2p_addr(addr)
|
||||
await host_b.connect(infoA)
|
||||
addr = host_c.get_addrs()[0]
|
||||
infoC = info_from_p2p_addr(addr)
|
||||
await host_b.connect(infoC)
|
||||
|
||||
# Performing checks
|
||||
assert (len(host_a.get_connected_peers())) == 1
|
||||
assert (len(host_b.get_connected_peers())) == 2
|
||||
assert (len(host_c.get_connected_peers())) == 1
|
||||
assert host_a.get_connected_peers() == [host_b.get_id()]
|
||||
assert host_b.get_connected_peers() == [host_a.get_id(), host_c.get_id()]
|
||||
assert host_c.get_connected_peers() == [host_b.get_id()]
|
||||
|
||||
# Disconnecting hostB and hostA
|
||||
await host_b.disconnect(host_a.get_id())
|
||||
|
||||
# Performing checks
|
||||
assert (len(host_a.get_connected_peers())) == 0
|
||||
assert (len(host_b.get_connected_peers())) == 1
|
||||
assert (len(host_c.get_connected_peers())) == 1
|
||||
assert host_b.get_connected_peers() == [host_c.get_id()]
|
||||
assert host_c.get_connected_peers() == [host_b.get_id()]
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"test",
|
||||
[
|
||||
(connect_two),
|
||||
(connect_three_cyclic),
|
||||
(connect_two_to_one),
|
||||
(connect_and_disconnect),
|
||||
],
|
||||
)
|
||||
@pytest.mark.trio
|
||||
async def test_chat(test, security_protocol):
|
||||
print("!@# ", security_protocol)
|
||||
async with HostFactory.create_batch_and_listen(
|
||||
3, security_protocol=security_protocol
|
||||
) as hosts:
|
||||
# addr = hosts[0].get_addrs()[0]
|
||||
# info = info_from_p2p_addr(addr)
|
||||
# await hosts[1].connect(info)
|
||||
|
||||
await test(hosts[0], hosts[1], hosts[2])
|
||||
Reference in New Issue
Block a user