replace node with KadPeerInfo

replace node with kadpeerinfo
This commit is contained in:
zixuanzh
2019-04-18 19:21:43 -04:00
parent cce226c714
commit 9ddbd18ded
5 changed files with 271 additions and 118 deletions

View File

@ -1,7 +1,8 @@
from collections import Counter from collections import Counter
import logging import logging
from .node import Node, NodeHeap from .kad_peerinfo import KadPeerInfo, KadPeerHeap
from libp2p.peer.id import ID
from .utils import gather_dict from .utils import gather_dict
@ -32,7 +33,7 @@ class SpiderCrawl:
self.ksize = ksize self.ksize = ksize
self.alpha = alpha self.alpha = alpha
self.node = node self.node = node
self.nearest = NodeHeap(self.node, self.ksize) self.nearest = KadPeerHeap(self.node, self.ksize)
self.last_ids_crawled = [] self.last_ids_crawled = []
log.info("creating spider with peers: %s", peers) log.info("creating spider with peers: %s", peers)
self.nearest.push(peers) self.nearest.push(peers)
@ -61,7 +62,7 @@ class SpiderCrawl:
dicts = {} dicts = {}
for peer in self.nearest.get_uncontacted()[:count]: for peer in self.nearest.get_uncontacted()[:count]:
dicts[peer.id] = rpcmethod(peer, self.node) dicts[peer.peer_id] = rpcmethod(peer, self.node)
self.nearest.mark_contacted(peer) self.nearest.mark_contacted(peer)
found = await gather_dict(dicts) found = await gather_dict(dicts)
return await self._nodes_found(found) return await self._nodes_found(found)
@ -76,7 +77,7 @@ class ValueSpiderCrawl(SpiderCrawl):
SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha) SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha)
# keep track of the single nearest node without value - per # keep track of the single nearest node without value - per
# section 2.3 so we can set the key there if found # section 2.3 so we can set the key there if found
self.nearest_without_value = NodeHeap(self.node, 1) self.nearest_without_value = KadPeerHeap(self.node, 1)
async def find(self): async def find(self):
""" """
@ -124,7 +125,7 @@ class ValueSpiderCrawl(SpiderCrawl):
peer = self.nearest_without_value.popleft() peer = self.nearest_without_value.popleft()
if peer: if peer:
await self.protocol.call_store(peer, self.node.id, value) await self.protocol.call_store(peer, self.node.peer_id, value)
return value return value
@ -183,4 +184,4 @@ class RPCFindResponse:
be set. be set.
""" """
nodelist = self.response[1] or [] nodelist = self.response[1] or []
return [Node(*nodeple) for nodeple in nodelist] return [KadPeerInfo(ID(*nodeple)) for nodeple in nodelist]

View File

@ -0,0 +1,112 @@
import heapq
from operator import itemgetter
from libp2p.peer.peerinfo import PeerInfo
class KadPeerInfo(PeerInfo):
def __init__(self, peer_id, peer_data):
super(KadPeerInfo, self).__init__(peer_id, peer_data)
self.long_id = int(peer_id.hex(), 16)
def same_home_as(self, node):
#TODO: handle more than one addr
return self.addrs[0] == node.addrs[0]
def distance_to(self, node):
"""
Get the distance between this node and another.
"""
return self.long_id ^ node.long_id
def __iter__(self):
"""
Enables use of Node as a tuple - i.e., tuple(node) works.
"""
return iter([self.peer_id.pretty(), str(self.addrs[0])])
def __repr__(self):
return repr([self.long_id, str(self.addrs[0])])
def __str__(self):
return str(self.addrs[0])
class KadPeerHeap:
"""
A heap of peers ordered by distance to a given node.
"""
def __init__(self, node, maxsize):
"""
Constructor.
@param node: The node to measure all distnaces from.
@param maxsize: The maximum size that this heap can grow to.
"""
self.node = node
self.heap = []
self.contacted = set()
self.maxsize = maxsize
def remove(self, peers):
"""
Remove a list of peer ids from this heap. Note that while this
heap retains a constant visible size (based on the iterator), it's
actual size may be quite a bit larger than what's exposed. Therefore,
removal of nodes may not change the visible size as previously added
nodes suddenly become visible.
"""
peers = set(peers)
if not peers:
return
nheap = []
for distance, node in self.heap:
if node.peer_id not in peers:
heapq.heappush(nheap, (distance, node))
self.heap = nheap
def get_node(self, node_id):
for _, node in self.heap:
if node.peer_id == node_id:
return node
return None
def have_contacted_all(self):
return len(self.get_uncontacted()) == 0
def get_ids(self):
return [n.peer_id for n in self]
def mark_contacted(self, node):
self.contacted.add(node.peer_id)
def popleft(self):
return heapq.heappop(self.heap)[1] if self else None
def push(self, nodes):
"""
Push nodes onto heap.
@param nodes: This can be a single item or a C{list}.
"""
if not isinstance(nodes, list):
nodes = [nodes]
for node in nodes:
if node not in self:
distance = self.node.distance_to(node)
heapq.heappush(self.heap, (distance, node))
def __len__(self):
return min(len(self.heap), self.maxsize)
def __iter__(self):
nodes = heapq.nsmallest(self.maxsize, self.heap)
return iter(map(itemgetter(1), nodes))
def __contains__(self, node):
for _, other in self.heap:
if node.peer_id == other.peer_id:
return True
return False
def get_uncontacted(self):
return [n for n in self if n.peer_id not in self.contacted]

View File

@ -5,11 +5,14 @@ import random
import pickle import pickle
import asyncio import asyncio
import logging import logging
from multiaddr import Multiaddr
from libp2p.peer.id import ID
from libp2p.peer.peerdata import PeerData
from .protocol import KademliaProtocol from .protocol import KademliaProtocol
from .utils import digest from .utils import digest
from .storage import ForgetfulStorage from .storage import ForgetfulStorage
from .node import Node from .kad_peerinfo import KadPeerInfo
from .crawling import ValueSpiderCrawl from .crawling import ValueSpiderCrawl
from .crawling import NodeSpiderCrawl from .crawling import NodeSpiderCrawl
@ -39,7 +42,8 @@ class Server:
self.ksize = ksize self.ksize = ksize
self.alpha = alpha self.alpha = alpha
self.storage = storage or ForgetfulStorage() self.storage = storage or ForgetfulStorage()
self.node = Node(node_id or digest(random.getrandbits(255))) new_node_id = ID(node_id) if node_id else ID(digest(random.getrandbits(255)))
self.node = KadPeerInfo(new_node_id, None)
self.transport = None self.transport = None
self.protocol = None self.protocol = None
self.refresh_loop = None self.refresh_loop = None
@ -86,7 +90,7 @@ class Server:
""" """
results = [] results = []
for node_id in self.protocol.get_refresh_ids(): for node_id in self.protocol.get_refresh_ids():
node = Node(node_id) node = KadPeerInfo(node_id, None)
nearest = self.protocol.router.find_neighbors(node, self.alpha) nearest = self.protocol.router.find_neighbors(node, self.alpha)
spider = NodeSpiderCrawl(self.protocol, node, nearest, spider = NodeSpiderCrawl(self.protocol, node, nearest,
self.ksize, self.alpha) self.ksize, self.alpha)
@ -130,8 +134,12 @@ class Server:
return await spider.find() return await spider.find()
async def bootstrap_node(self, addr): async def bootstrap_node(self, addr):
result = await self.protocol.ping(addr, self.node.id) result = await self.protocol.ping(addr, self.node.peer_id)
return Node(result[1], addr[0], addr[1]) if result[0] else None node_id = ID(result[1])
peer_data = PeerData() #pylint: disable=no-value-for-parameter
addr = [Multiaddr("/ip4/" + str(addr[0]) + "/udp/" + str(addr[1]))]
peer_data.add_addrs(addr)
return KadPeerInfo(node_id, peer_data) if result[0] else None
async def get(self, key): async def get(self, key):
""" """
@ -145,7 +153,8 @@ class Server:
# if this node has it, return it # if this node has it, return it
if self.storage.get(dkey) is not None: if self.storage.get(dkey) is not None:
return self.storage.get(dkey) return self.storage.get(dkey)
node = Node(dkey)
node = KadPeerInfo(ID(dkey))
nearest = self.protocol.router.find_neighbors(node) nearest = self.protocol.router.find_neighbors(node)
if not nearest: if not nearest:
log.warning("There are no known neighbors to get key %s", key) log.warning("There are no known neighbors to get key %s", key)
@ -171,7 +180,7 @@ class Server:
Set the given SHA1 digest key (bytes) to the given value in the Set the given SHA1 digest key (bytes) to the given value in the
network. network.
""" """
node = Node(dkey) node = KadPeerInfo(ID(dkey))
nearest = self.protocol.router.find_neighbors(node) nearest = self.protocol.router.find_neighbors(node)
if not nearest: if not nearest:
@ -201,7 +210,7 @@ class Server:
data = { data = {
'ksize': self.ksize, 'ksize': self.ksize,
'alpha': self.alpha, 'alpha': self.alpha,
'id': self.node.id, 'id': self.node.peer_id,
'neighbors': self.bootstrappable_neighbors() 'neighbors': self.bootstrappable_neighbors()
} }
if not data['neighbors']: if not data['neighbors']:

View File

@ -1,113 +1,113 @@
from operator import itemgetter # from operator import itemgetter
import heapq # import heapq
class Node: # class Node:
def __init__(self, node_id, ip=None, port=None): # def __init__(self, node_id, ip=None, port=None):
self.id = node_id # pylint: disable=invalid-name # self.id = node_id # pylint: disable=invalid-name
self.ip = ip # pylint: disable=invalid-name # self.ip = ip # pylint: disable=invalid-name
self.port = port # self.port = port
self.long_id = int(node_id.hex(), 16) # self.long_id = int(node_id.hex(), 16)
def same_home_as(self, node): # def same_home_as(self, node):
return self.ip == node.ip and self.port == node.port # return self.ip == node.ip and self.port == node.port
def distance_to(self, node): # def distance_to(self, node):
""" # """
Get the distance between this node and another. # Get the distance between this node and another.
""" # """
return self.long_id ^ node.long_id # return self.long_id ^ node.long_id
def __iter__(self): # def __iter__(self):
""" # """
Enables use of Node as a tuple - i.e., tuple(node) works. # Enables use of Node as a tuple - i.e., tuple(node) works.
""" # """
return iter([self.id, self.ip, self.port]) # return iter([self.id, self.ip, self.port])
def __repr__(self): # def __repr__(self):
return repr([self.long_id, self.ip, self.port]) # return repr([self.long_id, self.ip, self.port])
def __str__(self): # def __str__(self):
return "%s:%s" % (self.ip, str(self.port)) # return "%s:%s" % (self.ip, str(self.port))
class NodeHeap: # class NodeHeap:
""" # """
A heap of nodes ordered by distance to a given node. # A heap of nodes ordered by distance to a given node.
""" # """
def __init__(self, node, maxsize): # def __init__(self, node, maxsize):
""" # """
Constructor. # Constructor.
@param node: The node to measure all distnaces from. # @param node: The node to measure all distnaces from.
@param maxsize: The maximum size that this heap can grow to. # @param maxsize: The maximum size that this heap can grow to.
""" # """
self.node = node # self.node = node
self.heap = [] # self.heap = []
self.contacted = set() # self.contacted = set()
self.maxsize = maxsize # self.maxsize = maxsize
def remove(self, peers): # def remove(self, peers):
""" # """
Remove a list of peer ids from this heap. Note that while this # Remove a list of peer ids from this heap. Note that while this
heap retains a constant visible size (based on the iterator), it's # heap retains a constant visible size (based on the iterator), it's
actual size may be quite a bit larger than what's exposed. Therefore, # actual size may be quite a bit larger than what's exposed. Therefore,
removal of nodes may not change the visible size as previously added # removal of nodes may not change the visible size as previously added
nodes suddenly become visible. # nodes suddenly become visible.
""" # """
peers = set(peers) # peers = set(peers)
if not peers: # if not peers:
return # return
nheap = [] # nheap = []
for distance, node in self.heap: # for distance, node in self.heap:
if node.id not in peers: # if node.id not in peers:
heapq.heappush(nheap, (distance, node)) # heapq.heappush(nheap, (distance, node))
self.heap = nheap # self.heap = nheap
def get_node(self, node_id): # def get_node(self, node_id):
for _, node in self.heap: # for _, node in self.heap:
if node.id == node_id: # if node.id == node_id:
return node # return node
return None # return None
def have_contacted_all(self): # def have_contacted_all(self):
return len(self.get_uncontacted()) == 0 # return len(self.get_uncontacted()) == 0
def get_ids(self): # def get_ids(self):
return [n.id for n in self] # return [n.id for n in self]
def mark_contacted(self, node): # def mark_contacted(self, node):
self.contacted.add(node.id) # self.contacted.add(node.id)
def popleft(self): # def popleft(self):
return heapq.heappop(self.heap)[1] if self else None # return heapq.heappop(self.heap)[1] if self else None
def push(self, nodes): # def push(self, nodes):
""" # """
Push nodes onto heap. # Push nodes onto heap.
@param nodes: This can be a single item or a C{list}. # @param nodes: This can be a single item or a C{list}.
""" # """
if not isinstance(nodes, list): # if not isinstance(nodes, list):
nodes = [nodes] # nodes = [nodes]
for node in nodes: # for node in nodes:
if node not in self: # if node not in self:
distance = self.node.distance_to(node) # distance = self.node.distance_to(node)
heapq.heappush(self.heap, (distance, node)) # heapq.heappush(self.heap, (distance, node))
def __len__(self): # def __len__(self):
return min(len(self.heap), self.maxsize) # return min(len(self.heap), self.maxsize)
def __iter__(self): # def __iter__(self):
nodes = heapq.nsmallest(self.maxsize, self.heap) # nodes = heapq.nsmallest(self.maxsize, self.heap)
return iter(map(itemgetter(1), nodes)) # return iter(map(itemgetter(1), nodes))
def __contains__(self, node): # def __contains__(self, node):
for _, other in self.heap: # for _, other in self.heap:
if node.id == other.id: # if node.id == other.id:
return True # return True
return False # return False
def get_uncontacted(self): # def get_uncontacted(self):
return [n for n in self if n.id not in self.contacted] # return [n for n in self if n.id not in self.contacted]

View File

@ -4,7 +4,7 @@ import logging
from rpcudp.protocol import RPCProtocol from rpcudp.protocol import RPCProtocol
from .node import Node from .kad_peerinfo import KadPeerInfo
from .routing import RoutingTable from .routing import RoutingTable
from .utils import digest from .utils import digest
@ -47,12 +47,28 @@ class KademliaProtocol(RPCProtocol):
return sender return sender
def rpc_ping(self, sender, nodeid): def rpc_ping(self, sender, nodeid):
source = Node(nodeid, sender[0], sender[1]) print ("RPC PING")
print (sender)
node_id = ID(nodeid)
peer_data = PeerData() #pylint: disable=no-value-for-parameter
addr = [Multiaddr("/ip4/" + str(sender[0]) + "/udp/" + str(sender[1]))]
peer_data.add_addrs(addr)
source = KadPeerInfo(node_id, peer_data)
self.welcome_if_new(source) self.welcome_if_new(source)
return self.source_node.id return self.source_node.peer_id
def rpc_store(self, sender, nodeid, key, value): def rpc_store(self, sender, nodeid, key, value):
source = Node(nodeid, sender[0], sender[1]) print ("RPC STORE")
print (sender)
node_id = ID(nodeid)
peer_data = PeerData() #pylint: disable=no-value-for-parameter
addr = [Multiaddr("/ip4/" + str(sender[0]) + "/udp/" + str(sender[1]))]
peer_data.add_addrs(addr)
source = KadPeerInfo(node_id, peer_data)
self.welcome_if_new(source) self.welcome_if_new(source)
log.debug("got a store request from %s, storing '%s'='%s'", log.debug("got a store request from %s, storing '%s'='%s'",
sender, key.hex(), value) sender, key.hex(), value)
@ -62,14 +78,29 @@ class KademliaProtocol(RPCProtocol):
def rpc_find_node(self, sender, nodeid, key): def rpc_find_node(self, sender, nodeid, key):
log.info("finding neighbors of %i in local table", log.info("finding neighbors of %i in local table",
int(nodeid.hex(), 16)) int(nodeid.hex(), 16))
source = Node(nodeid, sender[0], sender[1])
node_id = ID(nodeid)
peer_data = PeerData() #pylint: disable=no-value-for-parameter
addr = [Multiaddr("/ip4/" + str(sender[0]) + "/udp/" + str(sender[1]))]
peer_data.add_addrs(addr)
source = KadPeerInfo(node_id, peer_data)
# source = Node(nodeid, sender[0], sender[1])
self.welcome_if_new(source) self.welcome_if_new(source)
node = Node(key) node = KadPeerInfo(ID(key))
neighbors = self.router.find_neighbors(node, exclude=source) neighbors = self.router.find_neighbors(node, exclude=source)
return list(map(tuple, neighbors)) return list(map(tuple, neighbors))
def rpc_find_value(self, sender, nodeid, key): def rpc_find_value(self, sender, nodeid, key):
source = Node(nodeid, sender[0], sender[1]) print ("RPC_FIND_VALUE")
print (sender)
node_id = ID(nodeid)
peer_data = PeerData() #pylint: disable=no-value-for-parameter
addr = [Multiaddr("/ip4/" + str(sender[0]) + "/udp/" + str(sender[1]))]
peer_data.add_addrs(addr)
source = KadPeerInfo(node_id, peer_data)
self.welcome_if_new(source) self.welcome_if_new(source)
value = self.storage.get(key, None) value = self.storage.get(key, None)
if value is None: if value is None:
@ -78,24 +109,24 @@ class KademliaProtocol(RPCProtocol):
async def call_find_node(self, node_to_ask, node_to_find): async def call_find_node(self, node_to_ask, node_to_find):
address = (node_to_ask.ip, node_to_ask.port) address = (node_to_ask.ip, node_to_ask.port)
result = await self.find_node(address, self.source_node.id, result = await self.find_node(address, self.source_node.peer_id,
node_to_find.id) node_to_find.peer_id)
return self.handle_call_response(result, node_to_ask) return self.handle_call_response(result, node_to_ask)
async def call_find_value(self, node_to_ask, node_to_find): async def call_find_value(self, node_to_ask, node_to_find):
address = (node_to_ask.ip, node_to_ask.port) address = (node_to_ask.ip, node_to_ask.port)
result = await self.find_value(address, self.source_node.id, result = await self.find_value(address, self.source_node.peer_id,
node_to_find.id) node_to_find.peer_id)
return self.handle_call_response(result, node_to_ask) return self.handle_call_response(result, node_to_ask)
async def call_ping(self, node_to_ask): async def call_ping(self, node_to_ask):
address = (node_to_ask.ip, node_to_ask.port) address = (node_to_ask.ip, node_to_ask.port)
result = await self.ping(address, self.source_node.id) result = await self.ping(address, self.source_node.peer_id)
return self.handle_call_response(result, node_to_ask) return self.handle_call_response(result, node_to_ask)
async def call_store(self, node_to_ask, key, value): async def call_store(self, node_to_ask, key, value):
address = (node_to_ask.ip, node_to_ask.port) address = (node_to_ask.ip, node_to_ask.port)
result = await self.store(address, self.source_node.id, key, value) result = await self.store(address, self.source_node.peer_id, key, value)
return self.handle_call_response(result, node_to_ask) return self.handle_call_response(result, node_to_ask)
def welcome_if_new(self, node): def welcome_if_new(self, node):
@ -117,7 +148,7 @@ class KademliaProtocol(RPCProtocol):
log.info("never seen %s before, adding to router", node) log.info("never seen %s before, adding to router", node)
for key, value in self.storage: for key, value in self.storage:
keynode = Node(digest(key)) keynode = KadPeerInfo(ID(digest(key)))
neighbors = self.router.find_neighbors(keynode) neighbors = self.router.find_neighbors(keynode)
if neighbors: if neighbors:
last = neighbors[-1].distance_to(keynode) last = neighbors[-1].distance_to(keynode)