Merge pull request #324 from aratz-lasa/issue-280

Implemented Host that includes a routing system.
This commit is contained in:
Kevin Mai-Husan Chia
2019-10-16 17:20:23 +09:00
committed by GitHub
10 changed files with 188 additions and 47 deletions

View File

@ -4,6 +4,8 @@ from typing import Sequence
from libp2p.crypto.keys import KeyPair from libp2p.crypto.keys import KeyPair
from libp2p.crypto.rsa import create_new_key_pair from libp2p.crypto.rsa import create_new_key_pair
from libp2p.host.basic_host import BasicHost from libp2p.host.basic_host import BasicHost
from libp2p.host.host_interface import IHost
from libp2p.host.routed_host import RoutedHost
from libp2p.kademlia.network import KademliaServer from libp2p.kademlia.network import KademliaServer
from libp2p.kademlia.storage import IStorage from libp2p.kademlia.storage import IStorage
from libp2p.network.network_interface import INetwork from libp2p.network.network_interface import INetwork
@ -106,7 +108,7 @@ def initialize_default_swarm(
peerstore = peerstore_opt or PeerStore() peerstore = peerstore_opt or PeerStore()
# TODO: Initialize discovery if not presented # TODO: Initialize discovery if not presented
return Swarm(id_opt, peerstore, upgrader, transport, disc_opt) return Swarm(id_opt, peerstore, upgrader, transport)
async def new_node( async def new_node(
@ -117,7 +119,7 @@ async def new_node(
sec_opt: TSecurityOptions = None, sec_opt: TSecurityOptions = None,
peerstore_opt: IPeerStore = None, peerstore_opt: IPeerStore = None,
disc_opt: IPeerRouting = None, disc_opt: IPeerRouting = None,
) -> BasicHost: ) -> IHost:
""" """
create new libp2p node create new libp2p node
:param key_pair: key pair for deriving an identity :param key_pair: key pair for deriving an identity
@ -149,6 +151,10 @@ async def new_node(
# TODO enable support for other host type # TODO enable support for other host type
# TODO routing unimplemented # TODO routing unimplemented
host: IHost # If not explicitly typed, MyPy raises error
if disc_opt:
host = RoutedHost(swarm_opt, disc_opt)
else:
host = BasicHost(swarm_opt) host = BasicHost(swarm_opt)
# Kick off cleanup job # Kick off cleanup job

View File

@ -13,7 +13,6 @@ from libp2p.protocol_muxer.exceptions import MultiselectClientError, Multiselect
from libp2p.protocol_muxer.multiselect import Multiselect from libp2p.protocol_muxer.multiselect import Multiselect
from libp2p.protocol_muxer.multiselect_client import MultiselectClient from libp2p.protocol_muxer.multiselect_client import MultiselectClient
from libp2p.protocol_muxer.multiselect_communicator import MultiselectCommunicator from libp2p.protocol_muxer.multiselect_communicator import MultiselectCommunicator
from libp2p.routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter
from libp2p.typing import StreamHandlerFn, TProtocol from libp2p.typing import StreamHandlerFn, TProtocol
from .host_interface import IHost from .host_interface import IHost
@ -34,16 +33,14 @@ class BasicHost(IHost):
""" """
_network: INetwork _network: INetwork
_router: KadmeliaPeerRouter
peerstore: IPeerStore peerstore: IPeerStore
multiselect: Multiselect multiselect: Multiselect
multiselect_client: MultiselectClient multiselect_client: MultiselectClient
def __init__(self, network: INetwork, router: KadmeliaPeerRouter = None) -> None: def __init__(self, network: INetwork) -> None:
self._network = network self._network = network
self._network.set_stream_handler(self._swarm_stream_handler) self._network.set_stream_handler(self._swarm_stream_handler)
self._router = router
self.peerstore = self._network.peerstore self.peerstore = self._network.peerstore
# Protocol muxing # Protocol muxing
self.multiselect = Multiselect() self.multiselect = Multiselect()

View File

@ -0,0 +1,40 @@
from libp2p.host.basic_host import BasicHost
from libp2p.host.exceptions import ConnectionFailure
from libp2p.network.network_interface import INetwork
from libp2p.peer.peerinfo import PeerInfo
from libp2p.routing.interfaces import IPeerRouting
# RoutedHost is a p2p Host that includes a routing system.
# This allows the Host to find the addresses for peers when it does not have them.
class RoutedHost(BasicHost):
_router: IPeerRouting
def __init__(self, network: INetwork, router: IPeerRouting):
super().__init__(network)
self._router = router
async def connect(self, peer_info: PeerInfo) -> None:
"""
connect ensures there is a connection between this host and the peer with
given `peer_info.peer_id`. See (basic_host).connect for more information.
RoutedHost's Connect differs in that if the host has no addresses for a
given peer, it will use its routing system to try to find some.
:param peer_info: peer_info of the peer we want to connect to
:type peer_info: peer.peerinfo.PeerInfo
"""
# check if we were given some addresses, otherwise, find some with the routing system.
if not peer_info.addrs:
found_peer_info = await self._router.find_peer(peer_info.peer_id)
if not found_peer_info:
raise ConnectionFailure("Unable to find Peer address")
self.peerstore.add_addrs(peer_info.peer_id, found_peer_info.addrs, 10)
self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10)
# there is already a connection to this peer
if peer_info.peer_id in self._network.connections:
return
await self._network.dial_peer(peer_info.peer_id)

View File

@ -8,7 +8,6 @@ from libp2p.network.connection.net_connection_interface import INetConn
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.peer.peerstore import PeerStoreError from libp2p.peer.peerstore import PeerStoreError
from libp2p.peer.peerstore_interface import IPeerStore from libp2p.peer.peerstore_interface import IPeerStore
from libp2p.routing.interfaces import IPeerRouting
from libp2p.stream_muxer.abc import IMuxedConn from libp2p.stream_muxer.abc import IMuxedConn
from libp2p.transport.exceptions import ( from libp2p.transport.exceptions import (
MuxerUpgradeFailure, MuxerUpgradeFailure,
@ -36,7 +35,6 @@ class Swarm(INetwork):
peerstore: IPeerStore peerstore: IPeerStore
upgrader: TransportUpgrader upgrader: TransportUpgrader
transport: ITransport transport: ITransport
router: IPeerRouting
# TODO: Connection and `peer_id` are 1-1 mapping in our implementation, # TODO: Connection and `peer_id` are 1-1 mapping in our implementation,
# whereas in Go one `peer_id` may point to multiple connections. # whereas in Go one `peer_id` may point to multiple connections.
connections: Dict[ID, INetConn] connections: Dict[ID, INetConn]
@ -51,13 +49,11 @@ class Swarm(INetwork):
peerstore: IPeerStore, peerstore: IPeerStore,
upgrader: TransportUpgrader, upgrader: TransportUpgrader,
transport: ITransport, transport: ITransport,
router: IPeerRouting,
): ):
self.self_id = peer_id self.self_id = peer_id
self.peerstore = peerstore self.peerstore = peerstore
self.upgrader = upgrader self.upgrader = upgrader
self.transport = transport self.transport = transport
self.router = router
self.connections = dict() self.connections = dict()
self.listeners = dict() self.listeners = dict()
@ -96,10 +92,7 @@ class Swarm(INetwork):
if not addrs: if not addrs:
raise SwarmException(f"No known addresses to peer {peer_id}") raise SwarmException(f"No known addresses to peer {peer_id}")
if not self.router:
multiaddr = addrs[0] multiaddr = addrs[0]
else:
multiaddr = self.router.find_peer(peer_id)
# Dial peer (connection to peer does not yet exist) # Dial peer (connection to peer does not yet exist)
# Transport dials peer (gets back a raw conn) # Transport dials peer (gets back a raw conn)
try: try:
@ -232,9 +225,6 @@ class Swarm(INetwork):
# No maddr succeeded # No maddr succeeded
return False return False
def add_router(self, router: IPeerRouting) -> None:
self.router = router
async def close(self) -> None: async def close(self) -> None:
# TODO: Prevent from new listeners and conns being added. # TODO: Prevent from new listeners and conns being added.
# Reference: https://github.com/libp2p/go-libp2p-swarm/blob/8be680aef8dea0a4497283f2f98470c2aeae6b65/swarm.go#L124-L134 # noqa: E501 # Reference: https://github.com/libp2p/go-libp2p-swarm/blob/8be680aef8dea0a4497283f2f98470c2aeae6b65/swarm.go#L124-L134 # noqa: E501

View File

@ -1,4 +1,4 @@
from typing import List, Sequence from typing import Any, List, Sequence
import multiaddr import multiaddr
@ -6,7 +6,6 @@ from .id import ID
class PeerInfo: class PeerInfo:
peer_id: ID peer_id: ID
addrs: List[multiaddr.Multiaddr] addrs: List[multiaddr.Multiaddr]
@ -14,6 +13,13 @@ class PeerInfo:
self.peer_id = peer_id self.peer_id = peer_id
self.addrs = list(addrs) self.addrs = list(addrs)
def __eq__(self, other: Any) -> bool:
return (
isinstance(other, PeerInfo)
and self.peer_id == other.peer_id
and self.addrs == other.addrs
)
def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo: def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo:
if not addr: if not addr:

View File

@ -1,42 +1,42 @@
import ast import json
from typing import Union
import multiaddr
from libp2p.kademlia.kad_peerinfo import KadPeerInfo, create_kad_peerinfo
from libp2p.kademlia.network import KademliaServer from libp2p.kademlia.network import KademliaServer
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo
from libp2p.routing.interfaces import IPeerRouting from libp2p.routing.interfaces import IPeerRouting
class KadmeliaPeerRouter(IPeerRouting): class KadmeliaPeerRouter(IPeerRouting):
server: KademliaServer server: KademliaServer
def __init__(self, dht_server: KademliaServer) -> None: def __init__(self, dht_server: KademliaServer) -> None:
self.server = dht_server self.server = dht_server
async def find_peer(self, peer_id: ID) -> KadPeerInfo: async def find_peer(self, peer_id: ID) -> PeerInfo:
""" """
Find a specific peer Find a specific peer
:param peer_id: peer to search for :param peer_id: peer to search for
:return: KadPeerInfo of specified peer :return: PeerInfo of specified peer
""" """
# switching peer_id to xor_id used by kademlia as node_id # switching peer_id to xor_id used by kademlia as node_id
xor_id = peer_id.xor_id xor_id = peer_id.xor_id
# ignore type for kad # ignore type for kad
value = await self.server.get(xor_id) # type: ignore value = await self.server.get(xor_id) # type: ignore
return decode_peerinfo(value) return (
peer_info_from_str(value) if value else None
) # TODO: should raise error if None?
def decode_peerinfo(encoded: Union[bytes, str]) -> KadPeerInfo: def peer_info_to_str(peer_info: PeerInfo) -> str:
if isinstance(encoded, bytes): return json.dumps(
encoded = encoded.decode() [peer_info.peer_id.to_string(), list(map(lambda a: str(a), peer_info.addrs))]
try: )
lines = ast.literal_eval(encoded)
except SyntaxError:
return None def peer_info_from_str(string: str) -> PeerInfo:
ip = lines[1] peer_id, raw_addrs = json.loads(string)
port = lines[2] return PeerInfo(
peer_id = lines[3] ID.from_base58(peer_id), list(map(lambda a: multiaddr.Multiaddr(a), raw_addrs))
# ignore typing for kad )
peer_info = create_kad_peerinfo(peer_id, ip, port) # type: ignore
return peer_info

0
tests/host/__init__.py Normal file
View File

View File

@ -0,0 +1,74 @@
import asyncio
import pytest
from libp2p.host.exceptions import ConnectionFailure
from libp2p.peer.peerinfo import PeerInfo
from libp2p.routing.kademlia.kademlia_peer_router import peer_info_to_str
from tests.utils import (
set_up_nodes_by_transport_and_disc_opt,
set_up_nodes_by_transport_opt,
set_up_routers,
)
@pytest.mark.asyncio
async def test_host_routing_success():
routers = await set_up_routers([5678, 5679])
transports = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]]
transport_disc_opt_list = zip(transports, routers)
(host_a, host_b) = await set_up_nodes_by_transport_and_disc_opt(
transport_disc_opt_list
)
# Set routing info
await routers[0].server.set(
host_a.get_id().xor_id,
peer_info_to_str(PeerInfo(host_a.get_id(), host_a.get_addrs())),
)
await routers[1].server.set(
host_b.get_id().xor_id,
peer_info_to_str(PeerInfo(host_b.get_id(), host_b.get_addrs())),
)
# forces to use routing as no addrs are provided
await host_a.connect(PeerInfo(host_b.get_id(), []))
await host_b.connect(PeerInfo(host_a.get_id(), []))
# Clean up
await asyncio.gather(*[host_a.close(), host_b.close()])
routers[0].server.stop()
routers[1].server.stop()
@pytest.mark.asyncio
async def test_host_routing_fail():
routers = await set_up_routers([5678, 5679])
transports = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]]
transport_disc_opt_list = zip(transports, routers)
(host_a, host_b) = await set_up_nodes_by_transport_and_disc_opt(
transport_disc_opt_list
)
host_c = (await set_up_nodes_by_transport_opt([["/ip4/127.0.0.1/tcp/0"]]))[0]
# Set routing info
await routers[0].server.set(
host_a.get_id().xor_id,
peer_info_to_str(PeerInfo(host_a.get_id(), host_a.get_addrs())),
)
await routers[1].server.set(
host_b.get_id().xor_id,
peer_info_to_str(PeerInfo(host_b.get_id(), host_b.get_addrs())),
)
# routing fails because host_c does not use routing
with pytest.raises(ConnectionFailure):
await host_a.connect(PeerInfo(host_c.get_id(), []))
with pytest.raises(ConnectionFailure):
await host_b.connect(PeerInfo(host_c.get_id(), []))
# Clean up
await asyncio.gather(*[host_a.close(), host_b.close(), host_c.close()])
routers[0].server.stop()
routers[1].server.stop()

View File

@ -2,7 +2,10 @@ import pytest
from libp2p.kademlia.network import KademliaServer from libp2p.kademlia.network import KademliaServer
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter from libp2p.routing.kademlia.kademlia_peer_router import (
KadmeliaPeerRouter,
peer_info_to_str,
)
@pytest.mark.asyncio @pytest.mark.asyncio
@ -15,11 +18,11 @@ async def test_simple_two_nodes():
node_a_value = await node_b.bootstrap([("127.0.0.1", 5678)]) node_a_value = await node_b.bootstrap([("127.0.0.1", 5678)])
node_a_kad_peerinfo = node_a_value[0] node_a_kad_peerinfo = node_a_value[0]
await node_a.set(node_a_kad_peerinfo.xor_id, repr(node_a_kad_peerinfo)) await node_a.set(node_a_kad_peerinfo.xor_id, peer_info_to_str(node_a_kad_peerinfo))
router = KadmeliaPeerRouter(node_b) router = KadmeliaPeerRouter(node_b)
returned_info = await router.find_peer(ID(node_a_kad_peerinfo.peer_id_bytes)) returned_info = await router.find_peer(ID(node_a_kad_peerinfo.peer_id_bytes))
assert repr(returned_info) == repr(node_a_kad_peerinfo) assert returned_info == node_a_kad_peerinfo
@pytest.mark.asyncio @pytest.mark.asyncio
@ -37,11 +40,11 @@ async def test_simple_three_nodes():
node_a_kad_peerinfo = node_a_value[0] node_a_kad_peerinfo = node_a_value[0]
await node_c.bootstrap([("127.0.0.1", 5702)]) await node_c.bootstrap([("127.0.0.1", 5702)])
await node_a.set(node_a_kad_peerinfo.xor_id, repr(node_a_kad_peerinfo)) await node_a.set(node_a_kad_peerinfo.xor_id, peer_info_to_str(node_a_kad_peerinfo))
router = KadmeliaPeerRouter(node_c) router = KadmeliaPeerRouter(node_c)
returned_info = await router.find_peer(ID(node_a_kad_peerinfo.peer_id_bytes)) returned_info = await router.find_peer(ID(node_a_kad_peerinfo.peer_id_bytes))
assert str(returned_info) == str(node_a_kad_peerinfo) assert returned_info == node_a_kad_peerinfo
@pytest.mark.asyncio @pytest.mark.asyncio
@ -65,8 +68,8 @@ async def test_simple_four_nodes():
await node_d.bootstrap([("127.0.0.1", 5803)]) await node_d.bootstrap([("127.0.0.1", 5803)])
await node_b.set(node_a_kad_peerinfo.xor_id, repr(node_a_kad_peerinfo)) await node_b.set(node_a_kad_peerinfo.xor_id, peer_info_to_str(node_a_kad_peerinfo))
router = KadmeliaPeerRouter(node_d) router = KadmeliaPeerRouter(node_d)
returned_info = await router.find_peer(ID(node_a_kad_peerinfo.peer_id_bytes)) returned_info = await router.find_peer(ID(node_a_kad_peerinfo.peer_id_bytes))
assert str(returned_info) == str(node_a_kad_peerinfo) assert returned_info == node_a_kad_peerinfo

View File

@ -1,7 +1,9 @@
import multiaddr import multiaddr
from libp2p import new_node from libp2p import new_node
from libp2p.kademlia.network import KademliaServer
from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter
from tests.constants import MAX_READ_LEN from tests.constants import MAX_READ_LEN
@ -36,6 +38,29 @@ async def set_up_nodes_by_transport_opt(transport_opt_list):
return tuple(nodes_list) return tuple(nodes_list)
async def set_up_nodes_by_transport_and_disc_opt(transport_disc_opt_list):
nodes_list = []
for transport_opt, disc_opt in transport_disc_opt_list:
node = await new_node(transport_opt=transport_opt, disc_opt=disc_opt)
await node.get_network().listen(multiaddr.Multiaddr(transport_opt[0]))
nodes_list.append(node)
return tuple(nodes_list)
async def set_up_routers(router_confs):
bootstrap_node = KademliaServer()
await bootstrap_node.listen(router_confs[0])
routers = [KadmeliaPeerRouter(bootstrap_node)]
for port in router_confs[1:]:
node = KademliaServer()
await node.listen(port)
await node.bootstrap_node(("127.0.0.1", router_confs[0]))
routers.append(KadmeliaPeerRouter(node))
return routers
async def echo_stream_handler(stream): async def echo_stream_handler(stream):
while True: while True:
read_string = (await stream.read(MAX_READ_LEN)).decode() read_string = (await stream.read(MAX_READ_LEN)).decode()