From 9ddbd18dedb1a81028a25f029f6b15b5c92cb984 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Thu, 18 Apr 2019 19:21:43 -0400 Subject: [PATCH] replace node with KadPeerInfo replace node with kadpeerinfo --- libp2p/kademlia/crawling.py | 13 +-- libp2p/kademlia/kad_peerinfo.py | 112 ++++++++++++++++++++ libp2p/kademlia/network.py | 25 +++-- libp2p/kademlia/node.py | 180 ++++++++++++++++---------------- libp2p/kademlia/protocol.py | 59 ++++++++--- 5 files changed, 271 insertions(+), 118 deletions(-) create mode 100644 libp2p/kademlia/kad_peerinfo.py diff --git a/libp2p/kademlia/crawling.py b/libp2p/kademlia/crawling.py index 6006bef7..a1756af0 100644 --- a/libp2p/kademlia/crawling.py +++ b/libp2p/kademlia/crawling.py @@ -1,7 +1,8 @@ from collections import Counter import logging -from .node import Node, NodeHeap +from .kad_peerinfo import KadPeerInfo, KadPeerHeap +from libp2p.peer.id import ID from .utils import gather_dict @@ -32,7 +33,7 @@ class SpiderCrawl: self.ksize = ksize self.alpha = alpha self.node = node - self.nearest = NodeHeap(self.node, self.ksize) + self.nearest = KadPeerHeap(self.node, self.ksize) self.last_ids_crawled = [] log.info("creating spider with peers: %s", peers) self.nearest.push(peers) @@ -61,7 +62,7 @@ class SpiderCrawl: dicts = {} for peer in self.nearest.get_uncontacted()[:count]: - dicts[peer.id] = rpcmethod(peer, self.node) + dicts[peer.peer_id] = rpcmethod(peer, self.node) self.nearest.mark_contacted(peer) found = await gather_dict(dicts) return await self._nodes_found(found) @@ -76,7 +77,7 @@ class ValueSpiderCrawl(SpiderCrawl): SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha) # keep track of the single nearest node without value - per # section 2.3 so we can set the key there if found - self.nearest_without_value = NodeHeap(self.node, 1) + self.nearest_without_value = KadPeerHeap(self.node, 1) async def find(self): """ @@ -124,7 +125,7 @@ class ValueSpiderCrawl(SpiderCrawl): peer = self.nearest_without_value.popleft() if peer: - await self.protocol.call_store(peer, self.node.id, value) + await self.protocol.call_store(peer, self.node.peer_id, value) return value @@ -183,4 +184,4 @@ class RPCFindResponse: be set. """ nodelist = self.response[1] or [] - return [Node(*nodeple) for nodeple in nodelist] + return [KadPeerInfo(ID(*nodeple)) for nodeple in nodelist] diff --git a/libp2p/kademlia/kad_peerinfo.py b/libp2p/kademlia/kad_peerinfo.py new file mode 100644 index 00000000..d227b113 --- /dev/null +++ b/libp2p/kademlia/kad_peerinfo.py @@ -0,0 +1,112 @@ +import heapq +from operator import itemgetter +from libp2p.peer.peerinfo import PeerInfo + + +class KadPeerInfo(PeerInfo): + def __init__(self, peer_id, peer_data): + super(KadPeerInfo, self).__init__(peer_id, peer_data) + self.long_id = int(peer_id.hex(), 16) + + def same_home_as(self, node): + #TODO: handle more than one addr + return self.addrs[0] == node.addrs[0] + + def distance_to(self, node): + """ + Get the distance between this node and another. + """ + return self.long_id ^ node.long_id + + def __iter__(self): + """ + Enables use of Node as a tuple - i.e., tuple(node) works. + """ + return iter([self.peer_id.pretty(), str(self.addrs[0])]) + + def __repr__(self): + return repr([self.long_id, str(self.addrs[0])]) + + def __str__(self): + return str(self.addrs[0]) + +class KadPeerHeap: + """ + A heap of peers ordered by distance to a given node. + """ + def __init__(self, node, maxsize): + """ + Constructor. + + @param node: The node to measure all distnaces from. + @param maxsize: The maximum size that this heap can grow to. + """ + self.node = node + self.heap = [] + self.contacted = set() + self.maxsize = maxsize + + def remove(self, peers): + """ + Remove a list of peer ids from this heap. Note that while this + heap retains a constant visible size (based on the iterator), it's + actual size may be quite a bit larger than what's exposed. Therefore, + removal of nodes may not change the visible size as previously added + nodes suddenly become visible. + """ + peers = set(peers) + if not peers: + return + nheap = [] + for distance, node in self.heap: + if node.peer_id not in peers: + heapq.heappush(nheap, (distance, node)) + self.heap = nheap + + def get_node(self, node_id): + for _, node in self.heap: + if node.peer_id == node_id: + return node + return None + + def have_contacted_all(self): + return len(self.get_uncontacted()) == 0 + + def get_ids(self): + return [n.peer_id for n in self] + + def mark_contacted(self, node): + self.contacted.add(node.peer_id) + + def popleft(self): + return heapq.heappop(self.heap)[1] if self else None + + def push(self, nodes): + """ + Push nodes onto heap. + + @param nodes: This can be a single item or a C{list}. + """ + if not isinstance(nodes, list): + nodes = [nodes] + + for node in nodes: + if node not in self: + distance = self.node.distance_to(node) + heapq.heappush(self.heap, (distance, node)) + + def __len__(self): + return min(len(self.heap), self.maxsize) + + def __iter__(self): + nodes = heapq.nsmallest(self.maxsize, self.heap) + return iter(map(itemgetter(1), nodes)) + + def __contains__(self, node): + for _, other in self.heap: + if node.peer_id == other.peer_id: + return True + return False + + def get_uncontacted(self): + return [n for n in self if n.peer_id not in self.contacted] diff --git a/libp2p/kademlia/network.py b/libp2p/kademlia/network.py index bbfbad60..2933a7ee 100644 --- a/libp2p/kademlia/network.py +++ b/libp2p/kademlia/network.py @@ -5,11 +5,14 @@ import random import pickle import asyncio import logging +from multiaddr import Multiaddr +from libp2p.peer.id import ID +from libp2p.peer.peerdata import PeerData from .protocol import KademliaProtocol from .utils import digest from .storage import ForgetfulStorage -from .node import Node +from .kad_peerinfo import KadPeerInfo from .crawling import ValueSpiderCrawl from .crawling import NodeSpiderCrawl @@ -39,7 +42,8 @@ class Server: self.ksize = ksize self.alpha = alpha self.storage = storage or ForgetfulStorage() - self.node = Node(node_id or digest(random.getrandbits(255))) + new_node_id = ID(node_id) if node_id else ID(digest(random.getrandbits(255))) + self.node = KadPeerInfo(new_node_id, None) self.transport = None self.protocol = None self.refresh_loop = None @@ -86,7 +90,7 @@ class Server: """ results = [] for node_id in self.protocol.get_refresh_ids(): - node = Node(node_id) + node = KadPeerInfo(node_id, None) nearest = self.protocol.router.find_neighbors(node, self.alpha) spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) @@ -130,8 +134,12 @@ class Server: return await spider.find() async def bootstrap_node(self, addr): - result = await self.protocol.ping(addr, self.node.id) - return Node(result[1], addr[0], addr[1]) if result[0] else None + result = await self.protocol.ping(addr, self.node.peer_id) + node_id = ID(result[1]) + peer_data = PeerData() #pylint: disable=no-value-for-parameter + addr = [Multiaddr("/ip4/" + str(addr[0]) + "/udp/" + str(addr[1]))] + peer_data.add_addrs(addr) + return KadPeerInfo(node_id, peer_data) if result[0] else None async def get(self, key): """ @@ -145,7 +153,8 @@ class Server: # if this node has it, return it if self.storage.get(dkey) is not None: return self.storage.get(dkey) - node = Node(dkey) + + node = KadPeerInfo(ID(dkey)) nearest = self.protocol.router.find_neighbors(node) if not nearest: log.warning("There are no known neighbors to get key %s", key) @@ -171,7 +180,7 @@ class Server: Set the given SHA1 digest key (bytes) to the given value in the network. """ - node = Node(dkey) + node = KadPeerInfo(ID(dkey)) nearest = self.protocol.router.find_neighbors(node) if not nearest: @@ -201,7 +210,7 @@ class Server: data = { 'ksize': self.ksize, 'alpha': self.alpha, - 'id': self.node.id, + 'id': self.node.peer_id, 'neighbors': self.bootstrappable_neighbors() } if not data['neighbors']: diff --git a/libp2p/kademlia/node.py b/libp2p/kademlia/node.py index 2f087f5d..251b5d89 100644 --- a/libp2p/kademlia/node.py +++ b/libp2p/kademlia/node.py @@ -1,113 +1,113 @@ -from operator import itemgetter -import heapq +# from operator import itemgetter +# import heapq -class Node: - def __init__(self, node_id, ip=None, port=None): - self.id = node_id # pylint: disable=invalid-name - self.ip = ip # pylint: disable=invalid-name - self.port = port - self.long_id = int(node_id.hex(), 16) +# class Node: +# def __init__(self, node_id, ip=None, port=None): +# self.id = node_id # pylint: disable=invalid-name +# self.ip = ip # pylint: disable=invalid-name +# self.port = port +# self.long_id = int(node_id.hex(), 16) - def same_home_as(self, node): - return self.ip == node.ip and self.port == node.port +# def same_home_as(self, node): +# return self.ip == node.ip and self.port == node.port - def distance_to(self, node): - """ - Get the distance between this node and another. - """ - return self.long_id ^ node.long_id +# def distance_to(self, node): +# """ +# Get the distance between this node and another. +# """ +# return self.long_id ^ node.long_id - def __iter__(self): - """ - Enables use of Node as a tuple - i.e., tuple(node) works. - """ - return iter([self.id, self.ip, self.port]) +# def __iter__(self): +# """ +# Enables use of Node as a tuple - i.e., tuple(node) works. +# """ +# return iter([self.id, self.ip, self.port]) - def __repr__(self): - return repr([self.long_id, self.ip, self.port]) +# def __repr__(self): +# return repr([self.long_id, self.ip, self.port]) - def __str__(self): - return "%s:%s" % (self.ip, str(self.port)) +# def __str__(self): +# return "%s:%s" % (self.ip, str(self.port)) -class NodeHeap: - """ - A heap of nodes ordered by distance to a given node. - """ - def __init__(self, node, maxsize): - """ - Constructor. +# class NodeHeap: +# """ +# A heap of nodes ordered by distance to a given node. +# """ +# def __init__(self, node, maxsize): +# """ +# Constructor. - @param node: The node to measure all distnaces from. - @param maxsize: The maximum size that this heap can grow to. - """ - self.node = node - self.heap = [] - self.contacted = set() - self.maxsize = maxsize +# @param node: The node to measure all distnaces from. +# @param maxsize: The maximum size that this heap can grow to. +# """ +# self.node = node +# self.heap = [] +# self.contacted = set() +# self.maxsize = maxsize - def remove(self, peers): - """ - Remove a list of peer ids from this heap. Note that while this - heap retains a constant visible size (based on the iterator), it's - actual size may be quite a bit larger than what's exposed. Therefore, - removal of nodes may not change the visible size as previously added - nodes suddenly become visible. - """ - peers = set(peers) - if not peers: - return - nheap = [] - for distance, node in self.heap: - if node.id not in peers: - heapq.heappush(nheap, (distance, node)) - self.heap = nheap +# def remove(self, peers): +# """ +# Remove a list of peer ids from this heap. Note that while this +# heap retains a constant visible size (based on the iterator), it's +# actual size may be quite a bit larger than what's exposed. Therefore, +# removal of nodes may not change the visible size as previously added +# nodes suddenly become visible. +# """ +# peers = set(peers) +# if not peers: +# return +# nheap = [] +# for distance, node in self.heap: +# if node.id not in peers: +# heapq.heappush(nheap, (distance, node)) +# self.heap = nheap - def get_node(self, node_id): - for _, node in self.heap: - if node.id == node_id: - return node - return None +# def get_node(self, node_id): +# for _, node in self.heap: +# if node.id == node_id: +# return node +# return None - def have_contacted_all(self): - return len(self.get_uncontacted()) == 0 +# def have_contacted_all(self): +# return len(self.get_uncontacted()) == 0 - def get_ids(self): - return [n.id for n in self] +# def get_ids(self): +# return [n.id for n in self] - def mark_contacted(self, node): - self.contacted.add(node.id) +# def mark_contacted(self, node): +# self.contacted.add(node.id) - def popleft(self): - return heapq.heappop(self.heap)[1] if self else None +# def popleft(self): +# return heapq.heappop(self.heap)[1] if self else None - def push(self, nodes): - """ - Push nodes onto heap. +# def push(self, nodes): +# """ +# Push nodes onto heap. - @param nodes: This can be a single item or a C{list}. - """ - if not isinstance(nodes, list): - nodes = [nodes] +# @param nodes: This can be a single item or a C{list}. +# """ +# if not isinstance(nodes, list): +# nodes = [nodes] - for node in nodes: - if node not in self: - distance = self.node.distance_to(node) - heapq.heappush(self.heap, (distance, node)) +# for node in nodes: +# if node not in self: +# distance = self.node.distance_to(node) +# heapq.heappush(self.heap, (distance, node)) - def __len__(self): - return min(len(self.heap), self.maxsize) +# def __len__(self): +# return min(len(self.heap), self.maxsize) - def __iter__(self): - nodes = heapq.nsmallest(self.maxsize, self.heap) - return iter(map(itemgetter(1), nodes)) +# def __iter__(self): +# nodes = heapq.nsmallest(self.maxsize, self.heap) +# return iter(map(itemgetter(1), nodes)) - def __contains__(self, node): - for _, other in self.heap: - if node.id == other.id: - return True - return False +# def __contains__(self, node): +# for _, other in self.heap: +# if node.id == other.id: +# return True +# return False - def get_uncontacted(self): - return [n for n in self if n.id not in self.contacted] +# def get_uncontacted(self): +# return [n for n in self if n.id not in self.contacted] diff --git a/libp2p/kademlia/protocol.py b/libp2p/kademlia/protocol.py index 52732308..b43098c5 100644 --- a/libp2p/kademlia/protocol.py +++ b/libp2p/kademlia/protocol.py @@ -4,7 +4,7 @@ import logging from rpcudp.protocol import RPCProtocol -from .node import Node +from .kad_peerinfo import KadPeerInfo from .routing import RoutingTable from .utils import digest @@ -47,12 +47,28 @@ class KademliaProtocol(RPCProtocol): return sender def rpc_ping(self, sender, nodeid): - source = Node(nodeid, sender[0], sender[1]) + print ("RPC PING") + print (sender) + + node_id = ID(nodeid) + peer_data = PeerData() #pylint: disable=no-value-for-parameter + addr = [Multiaddr("/ip4/" + str(sender[0]) + "/udp/" + str(sender[1]))] + peer_data.add_addrs(addr) + source = KadPeerInfo(node_id, peer_data) + self.welcome_if_new(source) - return self.source_node.id + return self.source_node.peer_id def rpc_store(self, sender, nodeid, key, value): - source = Node(nodeid, sender[0], sender[1]) + print ("RPC STORE") + print (sender) + + node_id = ID(nodeid) + peer_data = PeerData() #pylint: disable=no-value-for-parameter + addr = [Multiaddr("/ip4/" + str(sender[0]) + "/udp/" + str(sender[1]))] + peer_data.add_addrs(addr) + source = KadPeerInfo(node_id, peer_data) + self.welcome_if_new(source) log.debug("got a store request from %s, storing '%s'='%s'", sender, key.hex(), value) @@ -62,14 +78,29 @@ class KademliaProtocol(RPCProtocol): def rpc_find_node(self, sender, nodeid, key): log.info("finding neighbors of %i in local table", int(nodeid.hex(), 16)) - source = Node(nodeid, sender[0], sender[1]) + + node_id = ID(nodeid) + peer_data = PeerData() #pylint: disable=no-value-for-parameter + addr = [Multiaddr("/ip4/" + str(sender[0]) + "/udp/" + str(sender[1]))] + peer_data.add_addrs(addr) + source = KadPeerInfo(node_id, peer_data) + + # source = Node(nodeid, sender[0], sender[1]) self.welcome_if_new(source) - node = Node(key) + node = KadPeerInfo(ID(key)) neighbors = self.router.find_neighbors(node, exclude=source) return list(map(tuple, neighbors)) def rpc_find_value(self, sender, nodeid, key): - source = Node(nodeid, sender[0], sender[1]) + print ("RPC_FIND_VALUE") + print (sender) + + node_id = ID(nodeid) + peer_data = PeerData() #pylint: disable=no-value-for-parameter + addr = [Multiaddr("/ip4/" + str(sender[0]) + "/udp/" + str(sender[1]))] + peer_data.add_addrs(addr) + source = KadPeerInfo(node_id, peer_data) + self.welcome_if_new(source) value = self.storage.get(key, None) if value is None: @@ -78,24 +109,24 @@ class KademliaProtocol(RPCProtocol): async def call_find_node(self, node_to_ask, node_to_find): address = (node_to_ask.ip, node_to_ask.port) - result = await self.find_node(address, self.source_node.id, - node_to_find.id) + result = await self.find_node(address, self.source_node.peer_id, + node_to_find.peer_id) return self.handle_call_response(result, node_to_ask) async def call_find_value(self, node_to_ask, node_to_find): address = (node_to_ask.ip, node_to_ask.port) - result = await self.find_value(address, self.source_node.id, - node_to_find.id) + result = await self.find_value(address, self.source_node.peer_id, + node_to_find.peer_id) return self.handle_call_response(result, node_to_ask) async def call_ping(self, node_to_ask): address = (node_to_ask.ip, node_to_ask.port) - result = await self.ping(address, self.source_node.id) + result = await self.ping(address, self.source_node.peer_id) return self.handle_call_response(result, node_to_ask) async def call_store(self, node_to_ask, key, value): address = (node_to_ask.ip, node_to_ask.port) - result = await self.store(address, self.source_node.id, key, value) + result = await self.store(address, self.source_node.peer_id, key, value) return self.handle_call_response(result, node_to_ask) def welcome_if_new(self, node): @@ -117,7 +148,7 @@ class KademliaProtocol(RPCProtocol): log.info("never seen %s before, adding to router", node) for key, value in self.storage: - keynode = Node(digest(key)) + keynode = KadPeerInfo(ID(digest(key))) neighbors = self.router.find_neighbors(keynode) if neighbors: last = neighbors[-1].distance_to(keynode)