From 445c0f8e65c61c1ded426d18a8d638130e4c2c66 Mon Sep 17 00:00:00 2001 From: Alex Stokes Date: Tue, 17 Dec 2019 10:11:17 -0800 Subject: [PATCH] Dangling `kademlia` cleanup --- libp2p/kademlia/network.py | 250 ------------------------------------- 1 file changed, 250 deletions(-) delete mode 100644 libp2p/kademlia/network.py diff --git a/libp2p/kademlia/network.py b/libp2p/kademlia/network.py deleted file mode 100644 index 1077bb5a..00000000 --- a/libp2p/kademlia/network.py +++ /dev/null @@ -1,250 +0,0 @@ -"""Package for interacting on the network at a high level.""" -import asyncio -import logging -import pickle - -from .crawling import NodeSpiderCrawl, ValueSpiderCrawl -from .kad_peerinfo import create_kad_peerinfo -from .protocol import KademliaProtocol -from .storage import ForgetfulStorage -from .utils import digest - -log = logging.getLogger(__name__) - - -class KademliaServer: - """ - High level view of a node instance. - - This is the object that should be created to start listening as an - active node on the network. - """ - - protocol_class = KademliaProtocol - - def __init__(self, ksize=20, alpha=3, node_id=None, storage=None): - """ - Create a server instance. This will start listening on the given port. - - Args: - ksize (int): The k parameter from the paper - alpha (int): The alpha parameter from the paper - node_id: The id for this node on the network. - storage: An instance that implements - :interface:`~kademlia.storage.IStorage` - """ - self.ksize = ksize - self.alpha = alpha - self.storage = storage or ForgetfulStorage() - self.node = create_kad_peerinfo(node_id) - self.transport = None - self.protocol = None - self.refresh_loop = None - self.save_state_loop = None - - def stop(self): - if self.transport is not None: - self.transport.close() - - if self.refresh_loop: - self.refresh_loop.cancel() - - if self.save_state_loop: - self.save_state_loop.cancel() - - def _create_protocol(self): - return self.protocol_class(self.node, self.storage, self.ksize) - - async def listen(self, port=0, interface="0.0.0.0"): - """ - Start listening on the given port. - - Provide interface="::" to accept ipv6 address - """ - loop = asyncio.get_event_loop() - listen = loop.create_datagram_endpoint( - self._create_protocol, local_addr=(interface, port) - ) - self.transport, self.protocol = await listen - socket = self.transport.get_extra_info("socket") - self.address = socket.getsockname() - log.info( - "Node %i listening on %s:%i", - self.node.xor_id, - self.address[0], - self.address[1], - ) - # finally, schedule refreshing table - self.refresh_table() - - def refresh_table(self): - log.debug("Refreshing routing table") - asyncio.ensure_future(self._refresh_table()) - loop = asyncio.get_event_loop() - self.refresh_loop = loop.call_later(3600, self.refresh_table) - - async def _refresh_table(self): - """Refresh buckets that haven't had any lookups in the last hour (per - section 2.3 of the paper).""" - results = [] - for node_id in self.protocol.get_refresh_ids(): - node = create_kad_peerinfo(node_id) - nearest = self.protocol.router.find_neighbors(node, self.alpha) - spider = NodeSpiderCrawl( - self.protocol, node, nearest, self.ksize, self.alpha - ) - results.append(spider.find()) - - # do our crawling - await asyncio.gather(*results) - - # now republish keys older than one hour - for dkey, value in self.storage.iter_older_than(3600): - await self.set_digest(dkey, value) - - def bootstrappable_neighbors(self): - """ - Get a :class:`list` of (ip, port) :class:`tuple` pairs suitable for use - as an argument to the bootstrap method. - - The server should have been bootstrapped - already - this is just a utility for getting some neighbors and then - storing them if this server is going down for a while. When it comes - back up, the list of nodes can be used to bootstrap. - """ - neighbors = self.protocol.router.find_neighbors(self.node) - return [tuple(n)[-2:] for n in neighbors] - - async def bootstrap(self, addrs): - """ - Bootstrap the server by connecting to other known nodes in the network. - - Args: - addrs: A `list` of (ip, port) `tuple` pairs. Note that only IP - addresses are acceptable - hostnames will cause an error. - """ - log.debug("Attempting to bootstrap node with %i initial contacts", len(addrs)) - cos = list(map(self.bootstrap_node, addrs)) - gathered = await asyncio.gather(*cos) - nodes = [node for node in gathered if node is not None] - spider = NodeSpiderCrawl( - self.protocol, self.node, nodes, self.ksize, self.alpha - ) - return await spider.find() - - async def bootstrap_node(self, addr): - result = await self.protocol.ping(addr, self.node.peer_id_bytes) - return create_kad_peerinfo(result[1], addr[0], addr[1]) if result[0] else None - - async def get(self, key): - """ - Get a key if the network has it. - - Returns: - :class:`None` if not found, the value otherwise. - """ - log.info("Looking up key %s", key) - dkey = digest(key) - # if this node has it, return it - if self.storage.get(dkey) is not None: - return self.storage.get(dkey) - - node = create_kad_peerinfo(dkey) - nearest = self.protocol.router.find_neighbors(node) - if not nearest: - log.warning("There are no known neighbors to get key %s", key) - return None - spider = ValueSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) - return await spider.find() - - async def set(self, key, value): - """Set the given string key to the given value in the network.""" - if not check_dht_value_type(value): - raise TypeError("Value must be of type int, float, bool, str, or bytes") - log.info("setting '%s' = '%s' on network", key, value) - dkey = digest(key) - return await self.set_digest(dkey, value) - - async def provide(self, key): - """publish to the network that it provides for a particular key.""" - neighbors = self.protocol.router.find_neighbors(self.node) - return [ - await self.protocol.call_add_provider(n, key, self.node.peer_id_bytes) - for n in neighbors - ] - - async def get_providers(self, key): - """get the list of providers for a key.""" - neighbors = self.protocol.router.find_neighbors(self.node) - return [await self.protocol.call_get_providers(n, key) for n in neighbors] - - async def set_digest(self, dkey, value): - """Set the given SHA1 digest key (bytes) to the given value in the - network.""" - node = create_kad_peerinfo(dkey) - - nearest = self.protocol.router.find_neighbors(node) - if not nearest: - log.warning("There are no known neighbors to set key %s", dkey.hex()) - return False - - spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) - nodes = await spider.find() - log.info("setting '%s' on %s", dkey.hex(), list(map(str, nodes))) - - # if this node is close too, then store here as well - biggest = max([n.distance_to(node) for n in nodes]) - if self.node.distance_to(node) < biggest: - self.storage[dkey] = value - results = [self.protocol.call_store(n, dkey, value) for n in nodes] - # return true only if at least one store call succeeded - return any(await asyncio.gather(*results)) - - def save_state(self, fname): - """Save the state of this node (the alpha/ksize/id/immediate neighbors) - to a cache file with the given fname.""" - log.info("Saving state to %s", fname) - data = { - "ksize": self.ksize, - "alpha": self.alpha, - "id": self.node.peer_id_bytes, - "neighbors": self.bootstrappable_neighbors(), - } - if not data["neighbors"]: - log.warning("No known neighbors, so not writing to cache.") - return - with open(fname, "wb") as file: - pickle.dump(data, file) - - @classmethod - def load_state(cls, fname): - """Load the state of this node (the alpha/ksize/id/immediate neighbors) - from a cache file with the given fname.""" - log.info("Loading state from %s", fname) - with open(fname, "rb") as file: - data = pickle.load(file) - svr = KademliaServer(data["ksize"], data["alpha"], data["id"]) - if data["neighbors"]: - svr.bootstrap(data["neighbors"]) - return svr - - def save_state_regularly(self, fname, frequency=600): - """ - Save the state of node with a given regularity to the given filename. - - :param fname: File name to save regularly to - :param frequency: Frequency in seconds that the state should be saved. - By default, 10 minutes. - """ - self.save_state(fname) - loop = asyncio.get_event_loop() - self.save_state_loop = loop.call_later( - frequency, self.save_state_regularly, fname, frequency - ) - - -def check_dht_value_type(value): - """Checks to see if the type of the value is a valid type for placing in - the dht.""" - typeset = [int, float, bool, str, bytes] - return type(value) in typeset