Merge pull request #157 from libp2p/router-refactor

Refactored RoutedHost into Injected Router
This commit is contained in:
Alex Haynes
2019-05-05 14:45:22 -04:00
committed by GitHub
16 changed files with 163 additions and 87 deletions

View File

@ -6,10 +6,10 @@ from .peer.peerstore import PeerStore
from .peer.id import id_from_public_key from .peer.id import id_from_public_key
from .network.swarm import Swarm from .network.swarm import Swarm
from .host.basic_host import BasicHost from .host.basic_host import BasicHost
from .kademlia.routed_host import RoutedHost
from .transport.upgrader import TransportUpgrader from .transport.upgrader import TransportUpgrader
from .transport.tcp.tcp import TCP from .transport.tcp.tcp import TCP
from .kademlia.network import KademliaServer from .kademlia.network import KademliaServer
from .routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter
async def cleanup_done_tasks(): async def cleanup_done_tasks():
@ -31,7 +31,7 @@ def generate_id():
# private_key = new_key.exportKey("PEM") # private_key = new_key.exportKey("PEM")
return new_id return new_id
def initialize_default_kademlia( def initialize_default_kademlia_router(
ksize=20, alpha=3, id_opt=None, storage=None): ksize=20, alpha=3, id_opt=None, storage=None):
""" """
initialize swam when no swarm is passed in initialize swam when no swarm is passed in
@ -46,13 +46,14 @@ def initialize_default_kademlia(
id_opt = generate_id() id_opt = generate_id()
node_id = id_opt.get_raw_id() node_id = id_opt.get_raw_id()
return KademliaServer(ksize=ksize, alpha=alpha, server = KademliaServer(ksize=ksize, alpha=alpha,
node_id=node_id, storage=storage) node_id=node_id, storage=storage)
return KadmeliaPeerRouter(server)
def initialize_default_swarm( def initialize_default_swarm(
id_opt=None, transport_opt=None, id_opt=None, transport_opt=None, muxer_opt=None,
muxer_opt=None, sec_opt=None, peerstore_opt=None): sec_opt=None, peerstore_opt=None, disc_opt=None):
""" """
initialize swarm when no swarm is passed in initialize swarm when no swarm is passed in
:param id_opt: optional id for host :param id_opt: optional id for host
@ -78,7 +79,8 @@ def initialize_default_swarm(
upgrader = TransportUpgrader(sec, muxer) upgrader = TransportUpgrader(sec, muxer)
peerstore = peerstore_opt or PeerStore() peerstore = peerstore_opt or PeerStore()
swarm_opt = Swarm(id_opt, peerstore, upgrader, transport) swarm_opt = Swarm(id_opt, peerstore,\
upgrader, transport, disc_opt)
return swarm_opt return swarm_opt
@ -105,14 +107,11 @@ async def new_node(
swarm_opt = initialize_default_swarm( swarm_opt = initialize_default_swarm(
id_opt=id_opt, transport_opt=transport_opt, id_opt=id_opt, transport_opt=transport_opt,
muxer_opt=muxer_opt, sec_opt=sec_opt, muxer_opt=muxer_opt, sec_opt=sec_opt,
peerstore_opt=peerstore_opt) peerstore_opt=peerstore_opt, disc_opt=disc_opt)
# TODO enable support for other host type # TODO enable support for other host type
# TODO routing unimplemented # TODO routing unimplemented
if not disc_opt: host = BasicHost(swarm_opt)
host = BasicHost(swarm_opt)
else:
host = RoutedHost(swarm_opt, disc_opt)
# Kick off cleanup job # Kick off cleanup job
asyncio.ensure_future(cleanup_done_tasks()) asyncio.ensure_future(cleanup_done_tasks())

View File

@ -11,21 +11,22 @@ from .host_interface import IHost
class BasicHost(IHost): class BasicHost(IHost):
# default options constructor # default options constructor
def __init__(self, _network): def __init__(self, network, router=None):
self.network = _network self._network = network
self.peerstore = self.network.peerstore self._router = router
self.peerstore = self._network.peerstore
def get_id(self): def get_id(self):
""" """
:return: peer_id of host :return: peer_id of host
""" """
return self.network.get_peer_id() return self._network.get_peer_id()
def get_network(self): def get_network(self):
""" """
:return: network instance of host :return: network instance of host
""" """
return self.network return self._network
def get_peerstore(self): def get_peerstore(self):
""" """
@ -45,7 +46,7 @@ class BasicHost(IHost):
p2p_part = multiaddr.Multiaddr('/p2p/{}'.format(self.get_id().pretty())) p2p_part = multiaddr.Multiaddr('/p2p/{}'.format(self.get_id().pretty()))
addrs = [] addrs = []
for transport in self.network.listeners.values(): for transport in self._network.listeners.values():
for addr in transport.get_addrs(): for addr in transport.get_addrs():
addrs.append(addr.encapsulate(p2p_part)) addrs.append(addr.encapsulate(p2p_part))
return addrs return addrs
@ -57,7 +58,7 @@ class BasicHost(IHost):
:param stream_handler: a stream handler function :param stream_handler: a stream handler function
:return: true if successful :return: true if successful
""" """
return self.network.set_stream_handler(protocol_id, stream_handler) return self._network.set_stream_handler(protocol_id, stream_handler)
# protocol_id can be a list of protocol_ids # protocol_id can be a list of protocol_ids
# stream will decide which protocol_id to run on # stream will decide which protocol_id to run on
@ -67,7 +68,7 @@ class BasicHost(IHost):
:param protocol_id: protocol id that stream runs on :param protocol_id: protocol id that stream runs on
:return: true if successful :return: true if successful
""" """
stream = await self.network.new_stream(peer_id, protocol_ids) stream = await self._network.new_stream(peer_id, protocol_ids)
return stream return stream
async def connect(self, peer_info): async def connect(self, peer_info):
@ -84,7 +85,7 @@ class BasicHost(IHost):
self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10) self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10)
# there is already a connection to this peer # there is already a connection to this peer
if peer_info.peer_id in self.network.connections: if peer_info.peer_id in self._network.connections:
return return
await self.network.dial_peer(peer_info.peer_id) await self._network.dial_peer(peer_info.peer_id)

View File

@ -119,7 +119,7 @@ class ValueSpiderCrawl(SpiderCrawl):
value_counts = Counter(values) value_counts = Counter(values)
if len(value_counts) != 1: if len(value_counts) != 1:
log.warning("Got multiple values for key %i: %s", log.warning("Got multiple values for key %i: %s",
self.node.long_id, str(values)) self.node.xor_id, str(values))
value = value_counts.most_common(1)[0][0] value = value_counts.most_common(1)[0][0]
peer = self.nearest_without_value.popleft() peer = self.nearest_without_value.popleft()

View File

@ -15,8 +15,9 @@ class KadPeerInfo(PeerInfo):
def __init__(self, peer_id, peer_data=None): def __init__(self, peer_id, peer_data=None):
super(KadPeerInfo, self).__init__(peer_id, peer_data) super(KadPeerInfo, self).__init__(peer_id, peer_data)
self.peer_id_obj = peer_id
self.peer_id = peer_id.get_raw_id() self.peer_id = peer_id.get_raw_id()
self.long_id = int(digest(peer_id.get_raw_id()).hex(), 16) self.xor_id = peer_id.get_xor_id()
self.addrs = peer_data.get_addrs() if peer_data else None self.addrs = peer_data.get_addrs() if peer_data else None
@ -34,7 +35,7 @@ class KadPeerInfo(PeerInfo):
""" """
Get the distance between this node and another. Get the distance between this node and another.
""" """
return self.long_id ^ node.long_id return self.xor_id ^ node.xor_id
def __iter__(self): def __iter__(self):
""" """
@ -43,11 +44,15 @@ class KadPeerInfo(PeerInfo):
return iter([self.peer_id, self.ip, self.port]) return iter([self.peer_id, self.ip, self.port])
def __repr__(self): def __repr__(self):
return repr([self.long_id, self.ip, self.port]) return repr([self.xor_id, self.ip, self.port, self.peer_id])
def __str__(self): def __str__(self):
return "%s:%s" % (self.ip, str(self.port)) return "%s:%s" % (self.ip, str(self.port))
def encode(self):
return str(self.peer_id) + "\n" + \
str("/ip4/" + str(self.ip) + "/udp/" + str(self.port))
class KadPeerHeap: class KadPeerHeap:
""" """
A heap of peers ordered by distance to a given node. A heap of peers ordered by distance to a given node.

View File

@ -67,7 +67,7 @@ class KademliaServer:
listen = loop.create_datagram_endpoint(self._create_protocol, listen = loop.create_datagram_endpoint(self._create_protocol,
local_addr=(interface, port)) local_addr=(interface, port))
log.info("Node %i listening on %s:%i", log.info("Node %i listening on %s:%i",
self.node.long_id, interface, port) self.node.xor_id, interface, port)
self.transport, self.protocol = await listen self.transport, self.protocol = await listen
# finally, schedule refreshing table # finally, schedule refreshing table
self.refresh_table() self.refresh_table()

View File

@ -1,21 +0,0 @@
from libp2p.host.basic_host import BasicHost
class RoutedHost(BasicHost):
def __init__(self, _network, _kad_network):
super(RoutedHost, self).__init__(_network)
self.kad_network = _kad_network
def get_kad_network(self):
return self.kad_network
def routed_listen(self, port, interface='0.0.0.0'):
return self.kad_network.listen(port, interface)
def routed_get(self, key):
return self.kad_network.get(key)
def routed_set(self, key, value):
return self.kad_network.set(key, value)
def routed_set_digest(self, dkey, value):
return self.kad_network.set_digest(dkey, value)

View File

@ -33,7 +33,7 @@ class KBucket:
one = KBucket(self.range[0], midpoint, self.ksize) one = KBucket(self.range[0], midpoint, self.ksize)
two = KBucket(midpoint + 1, self.range[1], self.ksize) two = KBucket(midpoint + 1, self.range[1], self.ksize)
for node in self.nodes.values(): for node in self.nodes.values():
bucket = one if node.long_id <= midpoint else two bucket = one if node.xor_id <= midpoint else two
bucket.nodes[node.peer_id] = node bucket.nodes[node.peer_id] = node
return (one, two) return (one, two)
@ -48,7 +48,7 @@ class KBucket:
self.nodes[newnode.peer_id] = newnode self.nodes[newnode.peer_id] = newnode
def has_in_range(self, node): def has_in_range(self, node):
return self.range[0] <= node.long_id <= self.range[1] return self.range[0] <= node.xor_id <= self.range[1]
def is_new_node(self, node): def is_new_node(self, node):
return node.peer_id not in self.nodes return node.peer_id not in self.nodes
@ -175,7 +175,7 @@ class RoutingTable:
Get the index of the bucket that the given node would fall into. Get the index of the bucket that the given node would fall into.
""" """
for index, bucket in enumerate(self.buckets): for index, bucket in enumerate(self.buckets):
if node.long_id < bucket.range[1]: if node.xor_id < bucket.range[1]:
return index return index
# we should never be here, but make linter happy # we should never be here, but make linter happy
return None return None

View File

@ -10,13 +10,14 @@ from .stream.net_stream import NetStream
from .connection.raw_connection import RawConnection from .connection.raw_connection import RawConnection
class Swarm(INetwork): class Swarm(INetwork):
# pylint: disable=too-many-instance-attributes, cell-var-from-loop # pylint: disable=too-many-instance-attributes,cell-var-from-loop,too-many-arguments
def __init__(self, peer_id, peerstore, upgrader, transport): def __init__(self, peer_id, peerstore, upgrader, transport, router):
self.self_id = peer_id self.self_id = peer_id
self.peerstore = peerstore self.peerstore = peerstore
self.upgrader = upgrader self.upgrader = upgrader
self.transport = transport self.transport = transport
self.router = router
self.connections = dict() self.connections = dict()
self.listeners = dict() self.listeners = dict()
self.stream_handlers = dict() self.stream_handlers = dict()
@ -57,8 +58,10 @@ class Swarm(INetwork):
if not addrs: if not addrs:
raise SwarmException("No known addresses to peer") raise SwarmException("No known addresses to peer")
# TODO: define logic to choose which address to use, or try them all ? if not self.router:
multiaddr = addrs[0] multiaddr = addrs[0]
else:
multiaddr = self.router.find_peer(peer_id)
if peer_id in self.connections: if peer_id in self.connections:
# If muxed connection already exists for peer_id, # If muxed connection already exists for peer_id,
@ -183,6 +186,9 @@ class Swarm(INetwork):
return True return True
return False return False
def add_router(self, router):
self.router = router
def create_generic_protocol_handler(swarm): def create_generic_protocol_handler(swarm):
""" """
Create a generic protocol handler from the given swarm. We use swarm Create a generic protocol handler from the given swarm. We use swarm

View File

@ -1,3 +1,4 @@
import hashlib
import base58 import base58
import multihash import multihash
@ -21,6 +22,9 @@ class ID:
def pretty(self): def pretty(self):
return base58.b58encode(self._id_str).decode() return base58.b58encode(self._id_str).decode()
def get_xor_id(self):
return int(digest(self.get_raw_id()).hex(), 16)
def __str__(self): def __str__(self):
pid = self.pretty() pid = self.pretty()
if len(pid) <= 10: if len(pid) <= 10:
@ -67,3 +71,8 @@ def id_from_public_key(key):
def id_from_private_key(key): def id_from_private_key(key):
return id_from_public_key(key.publickey()) return id_from_public_key(key.publickey())
def digest(string):
if not isinstance(string, bytes):
string = str(string).encode('utf8')
return hashlib.sha1(string).digest()

View File

@ -28,4 +28,4 @@ class IPeerRouting(ABC):
Find specific Peer Find specific Peer
FindPeer searches for a peer with given peer_id, returns a peer.PeerInfo FindPeer searches for a peer with given peer_id, returns a peer.PeerInfo
with relevant addresses. with relevant addresses.
""" """

View File

@ -0,0 +1,35 @@
import ast
from libp2p.routing.interfaces import IPeerRouting
from libp2p.kademlia.kad_peerinfo import create_kad_peerinfo
class KadmeliaPeerRouter(IPeerRouting):
# pylint: disable=too-few-public-methods
def __init__(self, dht_server):
self.server = dht_server
async def find_peer(self, peer_id):
"""
Find a specific peer
:param peer_id: peer to search for
:return: KadPeerInfo of specified peer
"""
# switching peer_id to xor_id used by kademlia as node_id
xor_id = peer_id.get_xor_id()
value = await self.server.get(xor_id)
return decode_peerinfo(value)
def decode_peerinfo(encoded):
if isinstance(encoded, bytes):
encoded = encoded.decode()
try:
lines = ast.literal_eval(encoded)
except SyntaxError:
return None
ip = lines[1] # pylint: disable=invalid-name
port = lines[2]
peer_id = lines[3]
peer_info = create_kad_peerinfo(peer_id, ip, port)
return peer_info

View File

@ -1,31 +0,0 @@
from libp2p.routing.interfaces import IPeerRouting
from libp2p.kademlia.utils import digest
from libp2p.peer.peerinfo import PeerInfo
from libp2p.peer.peerdata import PeerData
class KadmeliaPeerRouter(IPeerRouting):
# pylint: disable=too-few-public-methods
def __init__(self, dht_server):
self.server = dht_server
def find_peer(self, peer_id):
"""
Find specific Peer
FindPeer searches for a peer with given peer_id, returns a peer.PeerInfo
with relevant addresses.
"""
value = self.server.get(peer_id)
return decode_peerinfo(value)
def decode_peerinfo(encoded):
if isinstance(encoded, bytes):
encoded = encoded.decode()
lines = encoded.splitlines()
peer_id = lines[0]
addrs = lines[1:]
peer_data = PeerData()
peer_data.add_addrs(addrs)
return PeerInfo(peer_id, addrs)

View File

View File

@ -0,0 +1,73 @@
import pytest
from libp2p.kademlia.network import KademliaServer
from libp2p.routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter
@pytest.mark.asyncio
async def test_simple_two_nodes():
node_a = KademliaServer()
await node_a.listen(5678)
node_b = KademliaServer()
await node_b.listen(5679)
node_a_value = await node_b.bootstrap([("127.0.0.1", 5678)])
node_a_kad_peerinfo = node_a_value[0]
await node_a.set(node_a_kad_peerinfo.xor_id,
repr(node_a_kad_peerinfo))
router = KadmeliaPeerRouter(node_b)
returned_info = await router.find_peer(node_a_kad_peerinfo.peer_id_obj)
print(repr(returned_info))
print(repr(node_a_kad_peerinfo))
assert repr(returned_info) == repr(node_a_kad_peerinfo)
@pytest.mark.asyncio
async def test_simple_three_nodes():
node_a = KademliaServer()
await node_a.listen(5701)
node_b = KademliaServer()
await node_b.listen(5702)
node_c = KademliaServer()
await node_c.listen(5703)
node_a_value = await node_b.bootstrap([("127.0.0.1", 5701)])
node_a_kad_peerinfo = node_a_value[0]
await node_c.bootstrap([("127.0.0.1", 5702)])
await node_a.set(node_a_kad_peerinfo.xor_id,
repr(node_a_kad_peerinfo))
router = KadmeliaPeerRouter(node_c)
returned_info = await router.find_peer(node_a_kad_peerinfo.peer_id_obj)
assert str(returned_info) == str(node_a_kad_peerinfo)
@pytest.mark.asyncio
async def test_simple_four_nodes():
node_a = KademliaServer()
await node_a.listen(5801)
node_b = KademliaServer()
await node_b.listen(5802)
node_c = KademliaServer()
await node_c.listen(5803)
node_d = KademliaServer()
await node_d.listen(5804)
node_a_value = await node_b.bootstrap([("127.0.0.1", 5801)])
node_a_kad_peerinfo = node_a_value[0]
await node_c.bootstrap([("127.0.0.1", 5802)])
await node_d.bootstrap([("127.0.0.1", 5803)])
await node_b.set(node_a_kad_peerinfo.xor_id,
repr(node_a_kad_peerinfo))
router = KadmeliaPeerRouter(node_d)
returned_info = await router.find_peer(node_a_kad_peerinfo.peer_id_obj)
assert str(returned_info) == str(node_a_kad_peerinfo)