Refactor the code to implement IAdvertiser and IDiscoverer

Christophe de Carvalho
2019-02-27 22:49:51 +01:00
parent 4889a0a790
commit cd8cb5c443
7 changed files with 260 additions and 0 deletions

View File

@@ -23,6 +23,7 @@ async def cleanup_done_tasks():
        # Some sleep necessary to context switch
        await asyncio.sleep(3)


def initialize_default_swarm(
        id_opt=None, transport_opt=None,
        muxer_opt=None, sec_opt=None, peerstore_opt=None):

@@ -54,6 +55,7 @@ def initialize_default_swarm(
    return swarm_opt


async def new_node(
        swarm_opt=None, id_opt=None, transport_opt=None,
        muxer_opt=None, sec_opt=None, peerstore_opt=None):
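For orientation, a minimal sketch of how new_node is typically called; the listen multiaddr and the asyncio.run entry point (Python 3.7+) are illustrative assumptions, not part of this commit:

import asyncio

from libp2p import new_node


async def main():
    # every *_opt argument falls back to a library default when omitted;
    # the transport multiaddr below is only an example listen address
    host = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/8000"])
    print(host.get_id().pretty())

asyncio.run(main())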

View File

@@ -0,0 +1,77 @@
import msgpack

from libp2p.peer.id import id_b58_decode
from libp2p.kademlia.network import Server
from libp2p.kademlia.node import Node
from libp2p.kademlia.utils import digest
from libp2p.kademlia.crawling import ValueMultipleSpiderCrawl
from libp2p.discovery.advertiser_interface import IAdvertiser
from libp2p.discovery.discoverer_interface import IDiscoverer
from libp2p.peer.peerinfo import PeerInfo
from libp2p.peer.peerdata import PeerData


class KademliaPeerRouter(IAdvertiser, IDiscoverer):

    def __init__(self, host, bootstrap_nodes=None):
        self.host = host
        self.peer_id = host.get_id()
        self.bootstrap_nodes = bootstrap_nodes
        self.node = Server()

    async def listen(self, port):
        await self.node.listen(port)
        if self.bootstrap_nodes:
            await self.node.bootstrap(self.bootstrap_nodes)

    async def advertise(self, service):
        await self.node.set(service, self._make_advertise_msg())

    def _make_advertise_msg(self):
        peer_data = PeerData()
        peer_data.add_addrs(self.host.get_addrs())
        peer_info = PeerInfo(self.peer_id, peer_data)
        if len(peer_info.addrs) < 1:
            raise RuntimeError("no known address for self")
        return encode_peer_info(peer_info)

    async def find_peers(self, service):
        key = dht_key(service)
        target = Node(key)
        nearest = self.node.protocol.router.find_neighbors(target)
        if not nearest:
            print("There are no known neighbors to get key %s" % key)
            return []
        spider = ValueMultipleSpiderCrawl(self.node.protocol, target, nearest,
                                          self.node.ksize, self.node.alpha)
        values = await spider.find()
        if values:
            return list(map(decode_peer_info, values))
        return []


def dht_key(service):
    # TODO: should convert to a Content Identifier (CID)
    return digest(service)


def encode_peer_info(peer_info):
    return msgpack.dumps({
        'peer_id': peer_info.peer_id.pretty(),
        'addrs': [str(ma) for ma in peer_info.addrs]
    })


def decode_peer_info(data):
    info = msgpack.loads(data, raw=False)
    peer_id = id_b58_decode(info['peer_id'])
    peer_data = PeerData()
    peer_data.add_addrs(info['addrs'])
    return PeerInfo(peer_id, peer_data)
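A rough sketch of how this router could be exercised end to end, assuming host was created via new_node as above; the service name, ports, and bootstrap address are illustrative, not part of this commit:

async def run_router(host):
    # hypothetical wiring: join the DHT on port 5679, bootstrapping
    # off an already-running node at ("127.0.0.1", 5678)
    router = KademliaPeerRouter(host, bootstrap_nodes=[("127.0.0.1", 5678)])
    await router.listen(5679)

    # store our own encoded PeerInfo under the service key...
    await router.advertise("my-service")

    # ...then query the same key for every advertiser
    for peer_info in await router.find_peers("my-service"):
        print(peer_info.peer_id.pretty(), peer_info.addrs)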

View File

@@ -0,0 +1,61 @@
from libp2p.peer.peerstore import PeerStoreError

from .basic_host import BasicHost


class RoutedHost(BasicHost):
    # default options constructor
    def __init__(self, host, router):
        super().__init__(host.network)
        self.host = host
        self.router = router

    async def advertise(self, service):
        await self.router.advertise(service)

    async def connect(self, peer_info):
        """
        connect ensures there is a connection between this host and the
        peer with the given peer_info.peer_id. connect absorbs the
        addresses in peer_info into its internal peerstore. If there is
        no active connection, connect issues a dial and blocks until a
        connection is open or an error is raised.

        :param peer_info: peer_info of the host we want to connect to
        :type peer_info: peer.peerinfo.PeerInfo
        """
        # there is already a connection to this peer
        if peer_info.peer_id in self.network.connections:
            return

        # Check whether we have some addresses for that peer;
        # if not, use the router to look up the peer
        peer_info.addrs = await self._find_peer_addrs(peer_info.peer_id)

        # if addrs are given, save them
        if peer_info.addrs:
            self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10)

        # try to connect
        await super().connect(peer_info)

    def find_peers(self, service):
        return self.router.find_peers(service)

    async def _find_peer_addrs(self, peer_id):
        try:
            addrs = self.peerstore.addrs(peer_id)
        except PeerStoreError:
            addrs = None

        if not addrs:
            peers_info = await self.router.find_peers(peer_id.pretty())
            if not peers_info:
                raise KeyError("no address found for peer_id %s" % str(peer_id))

            peer_info = peers_info[0]  # TODO: handle multiple responses
            if peer_info.peer_id != peer_id:
                raise RuntimeError("routing failure: provided addrs for a different peer")
            addrs = peer_info.addrs

        return addrs
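A sketch of the dial-by-ID flow this class enables, with base_host and router assumed to be built as in the earlier snippets; constructing a PeerInfo from an empty PeerData is just a way to say "ID known, addresses unknown":

from libp2p.peer.peerinfo import PeerInfo
from libp2p.peer.peerdata import PeerData


async def dial_by_id(base_host, router, peer_id):
    routed = RoutedHost(base_host, router)
    # no addresses supplied: connect() resolves them through
    # _find_peer_addrs, trying the peerstore first, then the router
    await routed.connect(PeerInfo(peer_id, PeerData()))
    return routed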

View File

@@ -69,6 +69,46 @@ class SpiderCrawl:
        raise NotImplementedError


class ValueMultipleSpiderCrawl(SpiderCrawl):
    # TODO: move this out of this module
    def __init__(self, protocol, node, peers, ksize, alpha):
        SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha)
        # keep track of the single nearest node without the value - per
        # section 2.3 of the Kademlia paper, so we can set the key there
        # if it is found
        self.nearest_without_value = NodeHeap(self.node, 1)

    async def find(self):
        """
        Find either the closest nodes or the value requested.
        """
        return await self._find(self.protocol.call_find_value)

    async def _nodes_found(self, responses):
        """
        Handle the result of an iteration in _find.
        """
        toremove = []
        found_values = []
        for peerid, response in responses.items():
            response = RPCFindResponse(response)
            if not response.happened():
                toremove.append(peerid)
            elif response.has_value():
                found_values.append(response.get_value())
            else:
                peer = self.nearest.get_node(peerid)
                self.nearest_without_value.push(peer)
            self.nearest.push(response.get_node_list())
        self.nearest.remove(toremove)

        if found_values:
            return found_values
        if self.nearest.have_contacted_all():
            # not found!
            return None
        return await self.find()


class ValueSpiderCrawl(SpiderCrawl):

    def __init__(self, protocol, node, peers, ksize, alpha):
        SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha)
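By contrast, the existing single-value crawl collapses conflicting replicas down to one value and re-stores it at the nearest node that lacked it. A condensed sketch of that handler, paraphrased from the bmuller/kademlia code this module derives from (details approximate), shows why a separate multi-value crawl was needed:

from collections import Counter

async def _handle_found_values(self, values):
    # single-value semantics: pick the most common replica, then
    # cache it at the nearest node that did not have it (the
    # caching step of section 2.3 of the Kademlia paper)
    value = Counter(values).most_common(1)[0][0]
    peer = self.nearest_without_value.popleft()
    if peer:
        await self.protocol.call_store(peer, self.node.id, value)
    return value

ValueMultipleSpiderCrawl skips this collapsing step, which is what lets find_peers return one decoded PeerInfo per advertiser instead of a single winner.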