diff --git a/examples/chat/chat.py b/examples/chat/chat.py index ea985989..0b0fd436 100755 --- a/examples/chat/chat.py +++ b/examples/chat/chat.py @@ -9,7 +9,7 @@ from libp2p import new_node from libp2p.peer.peerinfo import info_from_p2p_addr -PROTOCOL_ID = '/chat/1.0.0' +PROTOCOL_ID = "/chat/1.0.0" async def read_data(stream): @@ -32,25 +32,27 @@ async def write_data(stream): async def run(port, destination): - external_ip = urllib.request.urlopen( - 'https://v4.ident.me/').read().decode('utf8') + external_ip = urllib.request.urlopen("https://v4.ident.me/").read().decode("utf8") transport_opt = "/ip4/%s/tcp/%s" % (external_ip, port) - host = await new_node( - transport_opt=[transport_opt]) + host = await new_node(transport_opt=[transport_opt]) await host.get_network().listen(multiaddr.Multiaddr(transport_opt)) if not destination: # its the server + async def stream_handler(stream): asyncio.ensure_future(read_data(stream)) asyncio.ensure_future(write_data(stream)) + host.set_stream_handler(PROTOCOL_ID, stream_handler) if not port: raise RuntimeError("was not able to find the actual local port") - print("Run './examples/chat/chat.py -p %s -d /ip4/%s/tcp/%s/p2p/%s' on another console.\n" % - (int(port) + 1, external_ip, port, host.get_id().pretty())) + print( + "Run './examples/chat/chat.py -p %s -d /ip4/%s/tcp/%s/p2p/%s' on another console.\n" + % (int(port) + 1, external_ip, port, host.get_id().pretty()) + ) print("\nWaiting for incoming connection\n\n") else: # its the client @@ -75,19 +77,17 @@ def main(): Then, run another host with 'python ./chat -p -d ', where is the multiaddress of the previous listener host. """ - example_maddr = "/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q" + example_maddr = ( + "/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q" + ) parser = argparse.ArgumentParser(description=description) parser.add_argument( "--debug", - action='store_true', - help='generate the same node ID on every execution', + action="store_true", + help="generate the same node ID on every execution", ) parser.add_argument( - "-p", - "--port", - default=8000, - type=int, - help="source port number", + "-p", "--port", default=8000, type=int, help="source port number" ) parser.add_argument( "-d", @@ -107,5 +107,5 @@ def main(): loop.close() -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/libp2p/__init__.py b/libp2p/__init__.py index b963fbe7..e285432f 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -26,14 +26,15 @@ async def cleanup_done_tasks(): # Some sleep necessary to context switch await asyncio.sleep(3) + def generate_id(): new_key = RSA.generate(2048, e=65537) new_id = id_from_public_key(new_key.publickey()) # private_key = new_key.exportKey("PEM") return new_id -def initialize_default_kademlia_router( - ksize=20, alpha=3, id_opt=None, storage=None): + +def initialize_default_kademlia_router(ksize=20, alpha=3, id_opt=None, storage=None): """ initialize kadmelia router when no kademlia router is passed in :param ksize: The k parameter from the paper @@ -47,14 +48,18 @@ def initialize_default_kademlia_router( id_opt = generate_id() node_id = id_opt.get_raw_id() - server = KademliaServer(ksize=ksize, alpha=alpha, - node_id=node_id, storage=storage) + server = KademliaServer(ksize=ksize, alpha=alpha, node_id=node_id, storage=storage) return KadmeliaPeerRouter(server) def initialize_default_swarm( - id_opt=None, transport_opt=None, muxer_opt=None, - sec_opt=None, peerstore_opt=None, disc_opt=None): + id_opt=None, + transport_opt=None, + muxer_opt=None, + sec_opt=None, + peerstore_opt=None, + disc_opt=None, +): """ initialize swarm when no swarm is passed in :param id_opt: optional id for host @@ -82,16 +87,20 @@ def initialize_default_swarm( peerstore = peerstore_opt or PeerStore() # TODO: Initialize discovery if not presented - swarm_opt = Swarm(id_opt, peerstore,\ - upgrader, transport, disc_opt) + swarm_opt = Swarm(id_opt, peerstore, upgrader, transport, disc_opt) return swarm_opt async def new_node( - swarm_opt=None, id_opt=None, transport_opt=None, - muxer_opt=None, sec_opt=None, peerstore_opt=None, - disc_opt=None): + swarm_opt=None, + id_opt=None, + transport_opt=None, + muxer_opt=None, + sec_opt=None, + peerstore_opt=None, + disc_opt=None, +): """ create new libp2p node :param swarm_opt: optional swarm @@ -110,9 +119,13 @@ async def new_node( if not swarm_opt: swarm_opt = initialize_default_swarm( - id_opt=id_opt, transport_opt=transport_opt, - muxer_opt=muxer_opt, sec_opt=sec_opt, - peerstore_opt=peerstore_opt, disc_opt=disc_opt) + id_opt=id_opt, + transport_opt=transport_opt, + muxer_opt=muxer_opt, + sec_opt=sec_opt, + peerstore_opt=peerstore_opt, + disc_opt=disc_opt, + ) # TODO enable support for other host type # TODO routing unimplemented diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index 99ee832a..ff1b165a 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -1,10 +1,4 @@ -from typing import ( - Any, - Awaitable, - Callable, - List, - Sequence, -) +from typing import Any, Awaitable, Callable, List, Sequence import multiaddr @@ -67,7 +61,7 @@ class BasicHost(IHost): """ :return: all the multiaddr addresses this host is listening too """ - p2p_part = multiaddr.Multiaddr('/p2p/{}'.format(self.get_id().pretty())) + p2p_part = multiaddr.Multiaddr("/p2p/{}".format(self.get_id().pretty())) addrs: List[multiaddr.Multiaddr] = [] for transport in self._network.listeners.values(): @@ -75,7 +69,9 @@ class BasicHost(IHost): addrs.append(addr.encapsulate(p2p_part)) return addrs - def set_stream_handler(self, protocol_id: str, stream_handler: StreamHandlerFn) -> bool: + def set_stream_handler( + self, protocol_id: str, stream_handler: StreamHandlerFn + ) -> bool: """ set stream handler for host :param protocol_id: protocol id used on stream diff --git a/libp2p/host/host_interface.py b/libp2p/host/host_interface.py index 50858089..9bf54648 100644 --- a/libp2p/host/host_interface.py +++ b/libp2p/host/host_interface.py @@ -1,11 +1,5 @@ from abc import ABC, abstractmethod -from typing import ( - Any, - Awaitable, - Callable, - List, - Sequence, -) +from typing import Any, Awaitable, Callable, List, Sequence import multiaddr @@ -20,7 +14,6 @@ StreamHandlerFn = Callable[[INetStream], Awaitable[None]] class IHost(ABC): - @abstractmethod def get_id(self) -> ID: """ @@ -47,7 +40,9 @@ class IHost(ABC): """ @abstractmethod - def set_stream_handler(self, protocol_id: str, stream_handler: StreamHandlerFn) -> bool: + def set_stream_handler( + self, protocol_id: str, stream_handler: StreamHandlerFn + ) -> bool: """ set stream handler for host :param protocol_id: protocol id used on stream @@ -58,9 +53,7 @@ class IHost(ABC): # protocol_id can be a list of protocol_ids # stream will decide which protocol_id to run on @abstractmethod - async def new_stream(self, - peer_id: ID, - protocol_ids: Sequence[str]) -> INetStream: + async def new_stream(self, peer_id: ID, protocol_ids: Sequence[str]) -> INetStream: """ :param peer_id: peer_id that host is connecting :param protocol_ids: protocol ids that stream can run on diff --git a/libp2p/kademlia/crawling.py b/libp2p/kademlia/crawling.py index 3fdfbc69..702271e5 100644 --- a/libp2p/kademlia/crawling.py +++ b/libp2p/kademlia/crawling.py @@ -118,8 +118,9 @@ class ValueSpiderCrawl(SpiderCrawl): """ value_counts = Counter(values) if len(value_counts) != 1: - log.warning("Got multiple values for key %i: %s", - self.node.xor_id, str(values)) + log.warning( + "Got multiple values for key %i: %s", self.node.xor_id, str(values) + ) value = value_counts.most_common(1)[0][0] peer = self.nearest_without_value.popleft() @@ -175,7 +176,7 @@ class RPCFindResponse: return isinstance(self.response[1], dict) def get_value(self): - return self.response[1]['value'] + return self.response[1]["value"] def get_node_list(self): """ diff --git a/libp2p/kademlia/kad_peerinfo.py b/libp2p/kademlia/kad_peerinfo.py index a5ef5dbd..e04b056a 100644 --- a/libp2p/kademlia/kad_peerinfo.py +++ b/libp2p/kademlia/kad_peerinfo.py @@ -11,6 +11,7 @@ from .utils import digest P_IP = "ip4" P_UDP = "udp" + class KadPeerInfo(PeerInfo): def __init__(self, peer_id, peer_data=None): super(KadPeerInfo, self).__init__(peer_id, peer_data) @@ -22,11 +23,8 @@ class KadPeerInfo(PeerInfo): self.addrs = peer_data.get_addrs() if peer_data else None # pylint: disable=invalid-name - self.ip = self.addrs[0].value_for_protocol(P_IP)\ - if peer_data else None - self.port = int(self.addrs[0].value_for_protocol(P_UDP))\ - if peer_data else None - + self.ip = self.addrs[0].value_for_protocol(P_IP) if peer_data else None + self.port = int(self.addrs[0].value_for_protocol(P_UDP)) if peer_data else None def same_home_as(self, node): return sorted(self.addrs) == sorted(node.addrs) @@ -50,13 +48,18 @@ class KadPeerInfo(PeerInfo): return "%s:%s" % (self.ip, str(self.port)) def encode(self): - return str(self.peer_id) + "\n" + \ - str("/ip4/" + str(self.ip) + "/udp/" + str(self.port)) + return ( + str(self.peer_id) + + "\n" + + str("/ip4/" + str(self.ip) + "/udp/" + str(self.port)) + ) + class KadPeerHeap: """ A heap of peers ordered by distance to a given node. """ + def __init__(self, node, maxsize): """ Constructor. @@ -134,13 +137,17 @@ class KadPeerHeap: def get_uncontacted(self): return [n for n in self if n.peer_id not in self.contacted] + def create_kad_peerinfo(raw_node_id=None, sender_ip=None, sender_port=None): node_id = ID(raw_node_id) if raw_node_id else ID(digest(random.getrandbits(255))) peer_data = None if sender_ip and sender_port: - peer_data = PeerData() #pylint: disable=no-value-for-parameter - addr = [Multiaddr("/"+ P_IP +"/" + str(sender_ip) + "/"\ - + P_UDP + "/" + str(sender_port))] + peer_data = PeerData() # pylint: disable=no-value-for-parameter + addr = [ + Multiaddr( + "/" + P_IP + "/" + str(sender_ip) + "/" + P_UDP + "/" + str(sender_port) + ) + ] peer_data.add_addrs(addr) return KadPeerInfo(node_id, peer_data) diff --git a/libp2p/kademlia/network.py b/libp2p/kademlia/network.py index 05b9ab4e..f8b9d7a4 100644 --- a/libp2p/kademlia/network.py +++ b/libp2p/kademlia/network.py @@ -57,17 +57,17 @@ class KademliaServer: def _create_protocol(self): return self.protocol_class(self.node, self.storage, self.ksize) - async def listen(self, port, interface='0.0.0.0'): + async def listen(self, port, interface="0.0.0.0"): """ Start listening on the given port. Provide interface="::" to accept ipv6 address """ loop = asyncio.get_event_loop() - listen = loop.create_datagram_endpoint(self._create_protocol, - local_addr=(interface, port)) - log.info("Node %i listening on %s:%i", - self.node.xor_id, interface, port) + listen = loop.create_datagram_endpoint( + self._create_protocol, local_addr=(interface, port) + ) + log.info("Node %i listening on %s:%i", self.node.xor_id, interface, port) self.transport, self.protocol = await listen # finally, schedule refreshing table self.refresh_table() @@ -87,8 +87,9 @@ class KademliaServer: for node_id in self.protocol.get_refresh_ids(): node = create_kad_peerinfo(node_id) nearest = self.protocol.router.find_neighbors(node, self.alpha) - spider = NodeSpiderCrawl(self.protocol, node, nearest, - self.ksize, self.alpha) + spider = NodeSpiderCrawl( + self.protocol, node, nearest, self.ksize, self.alpha + ) results.append(spider.find()) # do our crawling @@ -119,13 +120,13 @@ class KademliaServer: addrs: A `list` of (ip, port) `tuple` pairs. Note that only IP addresses are acceptable - hostnames will cause an error. """ - log.debug("Attempting to bootstrap node with %i initial contacts", - len(addrs)) + log.debug("Attempting to bootstrap node with %i initial contacts", len(addrs)) cos = list(map(self.bootstrap_node, addrs)) gathered = await asyncio.gather(*cos) nodes = [node for node in gathered if node is not None] - spider = NodeSpiderCrawl(self.protocol, self.node, nodes, - self.ksize, self.alpha) + spider = NodeSpiderCrawl( + self.protocol, self.node, nodes, self.ksize, self.alpha + ) return await spider.find() async def bootstrap_node(self, addr): @@ -150,8 +151,7 @@ class KademliaServer: if not nearest: log.warning("There are no known neighbors to get key %s", key) return None - spider = ValueSpiderCrawl(self.protocol, node, nearest, - self.ksize, self.alpha) + spider = ValueSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) return await spider.find() async def set(self, key, value): @@ -159,9 +159,7 @@ class KademliaServer: Set the given string key to the given value in the network. """ if not check_dht_value_type(value): - raise TypeError( - "Value must be of type int, float, bool, str, or bytes" - ) + raise TypeError("Value must be of type int, float, bool, str, or bytes") log.info("setting '%s' = '%s' on network", key, value) dkey = digest(key) return await self.set_digest(dkey, value) @@ -171,7 +169,10 @@ class KademliaServer: publish to the network that it provides for a particular key """ neighbors = self.protocol.router.find_neighbors(self.node) - return [await self.protocol.call_add_provider(n, key, self.node.peer_id) for n in neighbors] + return [ + await self.protocol.call_add_provider(n, key, self.node.peer_id) + for n in neighbors + ] async def get_providers(self, key): """ @@ -189,12 +190,10 @@ class KademliaServer: nearest = self.protocol.router.find_neighbors(node) if not nearest: - log.warning("There are no known neighbors to set key %s", - dkey.hex()) + log.warning("There are no known neighbors to set key %s", dkey.hex()) return False - spider = NodeSpiderCrawl(self.protocol, node, nearest, - self.ksize, self.alpha) + spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) nodes = await spider.find() log.info("setting '%s' on %s", dkey.hex(), list(map(str, nodes))) @@ -213,15 +212,15 @@ class KademliaServer: """ log.info("Saving state to %s", fname) data = { - 'ksize': self.ksize, - 'alpha': self.alpha, - 'id': self.node.peer_id, - 'neighbors': self.bootstrappable_neighbors() + "ksize": self.ksize, + "alpha": self.alpha, + "id": self.node.peer_id, + "neighbors": self.bootstrappable_neighbors(), } - if not data['neighbors']: + if not data["neighbors"]: log.warning("No known neighbors, so not writing to cache.") return - with open(fname, 'wb') as file: + with open(fname, "wb") as file: pickle.dump(data, file) @classmethod @@ -231,11 +230,11 @@ class KademliaServer: from a cache file with the given fname. """ log.info("Loading state from %s", fname) - with open(fname, 'rb') as file: + with open(fname, "rb") as file: data = pickle.load(file) - svr = KademliaServer(data['ksize'], data['alpha'], data['id']) - if data['neighbors']: - svr.bootstrap(data['neighbors']) + svr = KademliaServer(data["ksize"], data["alpha"], data["id"]) + if data["neighbors"]: + svr.bootstrap(data["neighbors"]) return svr def save_state_regularly(self, fname, frequency=600): @@ -250,10 +249,9 @@ class KademliaServer: """ self.save_state(fname) loop = asyncio.get_event_loop() - self.save_state_loop = loop.call_later(frequency, - self.save_state_regularly, - fname, - frequency) + self.save_state_loop = loop.call_later( + frequency, self.save_state_regularly, fname, frequency + ) def check_dht_value_type(value): @@ -261,11 +259,5 @@ def check_dht_value_type(value): Checks to see if the type of the value is a valid type for placing in the dht. """ - typeset = [ - int, - float, - bool, - str, - bytes - ] + typeset = [int, float, bool, str, bytes] return type(value) in typeset # pylint: disable=unidiomatic-typecheck diff --git a/libp2p/kademlia/protocol.py b/libp2p/kademlia/protocol.py index 0612545d..1c995793 100644 --- a/libp2p/kademlia/protocol.py +++ b/libp2p/kademlia/protocol.py @@ -20,6 +20,7 @@ class KademliaProtocol(RPCProtocol): (ip, udp_port, node_id) for k closest nodes to target FIND_VALUE behaves like FIND_NODE unless a value is stored """ + def __init__(self, source_node, storage, ksize): RPCProtocol.__init__(self) self.router = RoutingTable(self, ksize, source_node) @@ -32,7 +33,7 @@ class KademliaProtocol(RPCProtocol): """ ids = [] for bucket in self.router.lonely_buckets(): - rid = random.randint(*bucket.range).to_bytes(20, byteorder='big') + rid = random.randint(*bucket.range).to_bytes(20, byteorder="big") ids.append(rid) return ids @@ -49,14 +50,14 @@ class KademliaProtocol(RPCProtocol): source = create_kad_peerinfo(nodeid, sender[0], sender[1]) self.welcome_if_new(source) - log.debug("got a store request from %s, storing '%s'='%s'", - sender, key.hex(), value) + log.debug( + "got a store request from %s, storing '%s'='%s'", sender, key.hex(), value + ) self.storage[key] = value return True def rpc_find_node(self, sender, nodeid, key): - log.info("finding neighbors of %i in local table", - int(nodeid.hex(), 16)) + log.info("finding neighbors of %i in local table", int(nodeid.hex(), 16)) source = create_kad_peerinfo(nodeid, sender[0], sender[1]) self.welcome_if_new(source) @@ -71,7 +72,7 @@ class KademliaProtocol(RPCProtocol): value = self.storage.get(key, None) if value is None: return self.rpc_find_node(sender, nodeid, key) - return {'value': value} + return {"value": value} def rpc_add_provider(self, sender, nodeid, key, provider_id): # pylint: disable=unused-argument @@ -82,8 +83,9 @@ class KademliaProtocol(RPCProtocol): we store a map of content_id to peer_id (non xor) """ if nodeid == provider_id: - log.info("adding provider %s for key %s in local table", - provider_id, str(key)) + log.info( + "adding provider %s for key %s in local table", provider_id, str(key) + ) self.storage[key] = provider_id return True return False @@ -111,14 +113,16 @@ class KademliaProtocol(RPCProtocol): async def call_find_node(self, node_to_ask, node_to_find): address = (node_to_ask.ip, node_to_ask.port) - result = await self.find_node(address, self.source_node.peer_id, - node_to_find.peer_id) + result = await self.find_node( + address, self.source_node.peer_id, node_to_find.peer_id + ) return self.handle_call_response(result, node_to_ask) async def call_find_value(self, node_to_ask, node_to_find): address = (node_to_ask.ip, node_to_ask.port) - result = await self.find_value(address, self.source_node.peer_id, - node_to_find.peer_id) + result = await self.find_value( + address, self.source_node.peer_id, node_to_find.peer_id + ) return self.handle_call_response(result, node_to_ask) async def call_ping(self, node_to_ask): @@ -133,9 +137,9 @@ class KademliaProtocol(RPCProtocol): async def call_add_provider(self, node_to_ask, key, provider_id): address = (node_to_ask.ip, node_to_ask.port) - result = await self.add_provider(address, - self.source_node.peer_id, - key, provider_id) + result = await self.add_provider( + address, self.source_node.peer_id, key, provider_id + ) return self.handle_call_response(result, node_to_ask) diff --git a/libp2p/kademlia/routing.py b/libp2p/kademlia/routing.py index b84717e9..6aba4ca9 100644 --- a/libp2p/kademlia/routing.py +++ b/libp2p/kademlia/routing.py @@ -15,6 +15,7 @@ class KBucket: each k-bucket implements a last seen eviction policy except that live nodes are never removed """ + def __init__(self, rangeLower, rangeUpper, ksize): self.range = (rangeLower, rangeUpper) self.nodes = OrderedDict() @@ -92,7 +93,7 @@ class TableTraverser: table.buckets[index].touch_last_updated() self.current_nodes = table.buckets[index].get_nodes() self.left_buckets = table.buckets[:index] - self.right_buckets = table.buckets[(index + 1):] + self.right_buckets = table.buckets[(index + 1) :] self.left = True def __iter__(self): diff --git a/libp2p/kademlia/utils.py b/libp2p/kademlia/utils.py index da900851..f5fa2d16 100644 --- a/libp2p/kademlia/utils.py +++ b/libp2p/kademlia/utils.py @@ -14,7 +14,7 @@ async def gather_dict(dic): def digest(string): if not isinstance(string, bytes): - string = str(string).encode('utf8') + string = str(string).encode("utf8") return hashlib.sha1(string).digest() @@ -53,5 +53,5 @@ def shared_prefix(args): def bytes_to_bit_string(bites): - bits = [bin(bite)[2:].rjust(8, '0') for bite in bites] + bits = [bin(bite)[2:].rjust(8, "0") for bite in bites] return "".join(bits) diff --git a/libp2p/network/connection/raw_connection.py b/libp2p/network/connection/raw_connection.py index 3e5716bb..034298e9 100644 --- a/libp2p/network/connection/raw_connection.py +++ b/libp2p/network/connection/raw_connection.py @@ -2,6 +2,7 @@ import asyncio from .raw_connection_interface import IRawConnection + class RawConnection(IRawConnection): conn_ip: str @@ -11,12 +12,14 @@ class RawConnection(IRawConnection): _next_id: int initiator: bool - def __init__(self, - ip: str, - port: str, - reader: asyncio.StreamReader, - writer: asyncio.StreamWriter, - initiator: bool) -> None: + def __init__( + self, + ip: str, + port: str, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + initiator: bool, + ) -> None: # pylint: disable=too-many-arguments self.conn_ip = ip self.conn_port = port @@ -32,7 +35,7 @@ class RawConnection(IRawConnection): async def read(self) -> bytes: line = await self.reader.readline() - adjusted_line = line.decode().rstrip('\n') + adjusted_line = line.decode().rstrip("\n") # TODO: figure out a way to remove \n without going back and forth with # encoding and decoding diff --git a/libp2p/network/network_interface.py b/libp2p/network/network_interface.py index 1a84c0ea..1afcabf5 100644 --- a/libp2p/network/network_interface.py +++ b/libp2p/network/network_interface.py @@ -1,14 +1,5 @@ -from abc import ( - ABC, - abstractmethod, -) -from typing import ( - Awaitable, - Callable, - Dict, - Sequence, - TYPE_CHECKING, -) +from abc import ABC, abstractmethod +from typing import Awaitable, Callable, Dict, Sequence, TYPE_CHECKING from multiaddr import Multiaddr @@ -49,7 +40,9 @@ class INetwork(ABC): """ @abstractmethod - def set_stream_handler(self, protocol_id: str, stream_handler: StreamHandlerFn) -> bool: + def set_stream_handler( + self, protocol_id: str, stream_handler: StreamHandlerFn + ) -> bool: """ :param protocol_id: protocol id used on stream :param stream_handler: a stream handler instance @@ -57,9 +50,7 @@ class INetwork(ABC): """ @abstractmethod - async def new_stream(self, - peer_id: ID, - protocol_ids: Sequence[str]) -> INetStream: + async def new_stream(self, peer_id: ID, protocol_ids: Sequence[str]) -> INetStream: """ :param peer_id: peer_id of destination :param protocol_ids: available protocol ids to use for stream @@ -74,7 +65,7 @@ class INetwork(ABC): """ @abstractmethod - def notify(self, notifee: 'INotifee') -> bool: + def notify(self, notifee: "INotifee") -> bool: """ :param notifee: object implementing Notifee interface :return: true if notifee registered successfully, false otherwise diff --git a/libp2p/network/notifee_interface.py b/libp2p/network/notifee_interface.py index 65ad4480..73f4ecde 100644 --- a/libp2p/network/notifee_interface.py +++ b/libp2p/network/notifee_interface.py @@ -1,7 +1,4 @@ -from abc import ( - ABC, - abstractmethod, -) +from abc import ABC, abstractmethod from typing import TYPE_CHECKING from multiaddr import Multiaddr @@ -15,44 +12,43 @@ if TYPE_CHECKING: class INotifee(ABC): - @abstractmethod - async def opened_stream(self, network: 'INetwork', stream: INetStream) -> None: + async def opened_stream(self, network: "INetwork", stream: INetStream) -> None: """ :param network: network the stream was opened on :param stream: stream that was opened """ @abstractmethod - async def closed_stream(self, network: 'INetwork', stream: INetStream) -> None: + async def closed_stream(self, network: "INetwork", stream: INetStream) -> None: """ :param network: network the stream was closed on :param stream: stream that was closed """ @abstractmethod - async def connected(self, network: 'INetwork', conn: IMuxedConn) -> None: + async def connected(self, network: "INetwork", conn: IMuxedConn) -> None: """ :param network: network the connection was opened on :param conn: connection that was opened """ @abstractmethod - async def disconnected(self, network: 'INetwork', conn: IMuxedConn) -> None: + async def disconnected(self, network: "INetwork", conn: IMuxedConn) -> None: """ :param network: network the connection was closed on :param conn: connection that was closed """ @abstractmethod - async def listen(self, network: 'INetwork', multiaddr: Multiaddr) -> None: + async def listen(self, network: "INetwork", multiaddr: Multiaddr) -> None: """ :param network: network the listener is listening on :param multiaddr: multiaddress listener is listening on """ @abstractmethod - async def listen_close(self, network: 'INetwork', multiaddr: Multiaddr) -> None: + async def listen_close(self, network: "INetwork", multiaddr: Multiaddr) -> None: """ :param network: network the connection was opened on :param multiaddr: multiaddress listener is no longer listening on diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index a259fce6..76141a0d 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -1,18 +1,9 @@ import asyncio -from typing import ( - Awaitable, - Callable, - Dict, - List, - Sequence, -) +from typing import Awaitable, Callable, Dict, List, Sequence from multiaddr import Multiaddr -from libp2p.peer.id import ( - ID, - id_b58_decode, -) +from libp2p.peer.id import ID, id_b58_decode from libp2p.peer.peerstore import PeerStore from libp2p.protocol_muxer.multiselect import Multiselect from libp2p.protocol_muxer.multiselect_client import MultiselectClient @@ -51,12 +42,14 @@ class Swarm(INetwork): notifees: List[INotifee] - def __init__(self, - peer_id: ID, - peerstore: PeerStore, - upgrader: TransportUpgrader, - transport: ITransport, - router: IPeerRouting): + def __init__( + self, + peer_id: ID, + peerstore: PeerStore, + upgrader: TransportUpgrader, + transport: ITransport, + router: IPeerRouting, + ): self.self_id = peer_id self.peerstore = peerstore self.upgrader = upgrader @@ -79,7 +72,9 @@ class Swarm(INetwork): def get_peer_id(self) -> ID: return self.self_id - def set_stream_handler(self, protocol_id: str, stream_handler: StreamHandlerFn) -> bool: + def set_stream_handler( + self, protocol_id: str, stream_handler: StreamHandlerFn + ) -> bool: """ :param protocol_id: protocol id used on stream :param stream_handler: a stream handler instance @@ -119,8 +114,9 @@ class Swarm(INetwork): # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure # the conn and then mux the conn secured_conn = await self.upgrader.upgrade_security(raw_conn, peer_id, True) - muxed_conn = self.upgrader.upgrade_connection(secured_conn, \ - self.generic_protocol_handler, peer_id) + muxed_conn = self.upgrader.upgrade_connection( + secured_conn, self.generic_protocol_handler, peer_id + ) # Store muxed connection in connections self.connections[peer_id] = muxed_conn @@ -154,8 +150,7 @@ class Swarm(INetwork): # Perform protocol muxing to determine protocol to use selected_protocol = await self.multiselect_client.select_one_of( - list(protocol_ids), - muxed_stream, + list(protocol_ids), muxed_stream ) # Create a net stream with the selected protocol @@ -186,8 +181,9 @@ class Swarm(INetwork): if str(multiaddr) in self.listeners: return True - async def conn_handler(reader: asyncio.StreamReader, - writer: asyncio.StreamWriter) -> None: + async def conn_handler( + reader: asyncio.StreamReader, writer: asyncio.StreamWriter + ) -> None: # Read in first message (should be peer_id of initiator) and ack peer_id = id_b58_decode((await reader.read(1024)).decode()) @@ -196,14 +192,22 @@ class Swarm(INetwork): # Upgrade reader/write to a net_stream and pass \ # to appropriate stream handler (using multiaddr) - raw_conn = RawConnection(multiaddr.value_for_protocol('ip4'), - multiaddr.value_for_protocol('tcp'), reader, writer, False) + raw_conn = RawConnection( + multiaddr.value_for_protocol("ip4"), + multiaddr.value_for_protocol("tcp"), + reader, + writer, + False, + ) # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure # the conn and then mux the conn - secured_conn = await self.upgrader.upgrade_security(raw_conn, peer_id, False) - muxed_conn = self.upgrader.upgrade_connection(secured_conn, \ - self.generic_protocol_handler, peer_id) + secured_conn = await self.upgrader.upgrade_security( + raw_conn, peer_id, False + ) + muxed_conn = self.upgrader.upgrade_connection( + secured_conn, self.generic_protocol_handler, peer_id + ) # Store muxed_conn with peer id self.connections[peer_id] = muxed_conn diff --git a/libp2p/peer/addrbook_interface.py b/libp2p/peer/addrbook_interface.py index 5ac34f02..722dc109 100644 --- a/libp2p/peer/addrbook_interface.py +++ b/libp2p/peer/addrbook_interface.py @@ -1,8 +1,5 @@ from abc import ABC, abstractmethod -from typing import ( - List, - Sequence, -) +from typing import List, Sequence from multiaddr import Multiaddr @@ -11,7 +8,6 @@ from .id import ID class IAddrBook(ABC): - def __init__(self) -> None: pass diff --git a/libp2p/peer/id.py b/libp2p/peer/id.py index 8561e0ec..79cf3a3c 100644 --- a/libp2p/peer/id.py +++ b/libp2p/peer/id.py @@ -1,7 +1,5 @@ import hashlib -from typing import ( - Union, -) +from typing import Union import base58 @@ -44,7 +42,7 @@ class ID: __repr__ = __str__ def __eq__(self, other: object) -> bool: - #pylint: disable=protected-access + # pylint: disable=protected-access if not isinstance(other, ID): return NotImplemented return self._id_str == other._id_str @@ -57,7 +55,7 @@ def id_b58_encode(peer_id: ID) -> str: """ return a b58-encoded string """ - #pylint: disable=protected-access + # pylint: disable=protected-access return base58.b58encode(peer_id.get_raw_id()).decode() @@ -84,7 +82,8 @@ def id_from_public_key(key: RsaKey) -> ID: def id_from_private_key(key: RsaKey) -> ID: return id_from_public_key(key.publickey()) + def digest(data: Union[str, bytes]) -> bytes: if isinstance(data, str): - data = data.encode('utf8') + data = data.encode("utf8") return hashlib.sha1(data).digest() diff --git a/libp2p/peer/peerdata.py b/libp2p/peer/peerdata.py index 68749c84..3ee4c4ce 100644 --- a/libp2p/peer/peerdata.py +++ b/libp2p/peer/peerdata.py @@ -1,9 +1,4 @@ -from typing import ( - Any, - Dict, - List, - Sequence, -) +from typing import Any, Dict, List, Sequence from multiaddr import Multiaddr diff --git a/libp2p/peer/peerdata_interface.py b/libp2p/peer/peerdata_interface.py index cefd56dc..ca7c2546 100644 --- a/libp2p/peer/peerdata_interface.py +++ b/libp2p/peer/peerdata_interface.py @@ -1,9 +1,5 @@ from abc import ABC, abstractmethod -from typing import ( - Any, - List, - Sequence, -) +from typing import Any, List, Sequence from multiaddr import Multiaddr @@ -11,7 +7,6 @@ from .peermetadata_interface import IPeerMetadata class IPeerData(ABC): - @abstractmethod def get_protocols(self) -> List[str]: """ diff --git a/libp2p/peer/peerinfo.py b/libp2p/peer/peerinfo.py index 6ccd0236..89bd307b 100644 --- a/libp2p/peer/peerinfo.py +++ b/libp2p/peer/peerinfo.py @@ -1,13 +1,8 @@ -from typing import ( - List, -) +from typing import List import multiaddr -from .id import ( - ID, - id_b58_decode, -) +from .id import ID, id_b58_decode from .peerdata import PeerData @@ -31,7 +26,9 @@ def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo: parts = addr.split() if not parts: - raise InvalidAddrError(f"`parts`={parts} should at least have a protocol `P_P2P`") + raise InvalidAddrError( + f"`parts`={parts} should at least have a protocol `P_P2P`" + ) p2p_part = parts[-1] last_protocol_code = p2p_part.protocols()[0].code diff --git a/libp2p/peer/peermetadata_interface.py b/libp2p/peer/peermetadata_interface.py index 3d60259a..22a690e1 100644 --- a/libp2p/peer/peermetadata_interface.py +++ b/libp2p/peer/peermetadata_interface.py @@ -1,15 +1,10 @@ from abc import ABC, abstractmethod -from typing import ( - Any, -) +from typing import Any -from .id import ( - ID, -) +from .id import ID class IPeerMetadata(ABC): - def __init__(self) -> None: pass diff --git a/libp2p/peer/peerstore.py b/libp2p/peer/peerstore.py index 2cd1574f..1d15ab2a 100644 --- a/libp2p/peer/peerstore.py +++ b/libp2p/peer/peerstore.py @@ -1,10 +1,4 @@ -from typing import ( - Any, - Dict, - List, - Optional, - Sequence, -) +from typing import Any, Dict, List, Optional, Sequence from multiaddr import Multiaddr diff --git a/libp2p/peer/peerstore_interface.py b/libp2p/peer/peerstore_interface.py index db6dbde5..bc68234c 100644 --- a/libp2p/peer/peerstore_interface.py +++ b/libp2p/peer/peerstore_interface.py @@ -1,8 +1,5 @@ from abc import abstractmethod -from typing import ( - List, - Sequence, -) +from typing import List, Sequence from .addrbook_interface import IAddrBook @@ -12,7 +9,6 @@ from .peermetadata_interface import IPeerMetadata class IPeerStore(IAddrBook, IPeerMetadata): - def __init__(self) -> None: IPeerMetadata.__init__(self) IAddrBook.__init__(self) diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index b93de4fb..6dd9f129 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -118,5 +118,6 @@ def validate_handshake(handshake_contents): # is added return handshake_contents == MULTISELECT_PROTOCOL_ID + class MultiselectClientError(ValueError): """Raised when an error occurs in protocol selection process""" diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index e51c8310..0fcefe0d 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -1,13 +1,6 @@ -from typing import ( - Iterable, - List, - Sequence, -) +from typing import Iterable, List, Sequence -from libp2p.peer.id import ( - ID, - id_b58_decode, -) +from libp2p.peer.id import ID, id_b58_decode from .pb import rpc_pb2 from .pubsub import Pubsub @@ -78,9 +71,7 @@ class FloodSub(IPubsubRouter): msg_forwarder=msg_forwarder, origin=ID(pubsub_msg.from_id), ) - rpc_msg = rpc_pb2.RPC( - publish=[pubsub_msg], - ) + rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg]) for peer_id in peers_gen: stream = self.pubsub.peers[str(peer_id)] # FIXME: We should add a `WriteMsg` similar to write delimited messages. @@ -103,10 +94,8 @@ class FloodSub(IPubsubRouter): """ def _get_peers_to_send( - self, - topic_ids: Iterable[str], - msg_forwarder: ID, - origin: ID) -> Iterable[ID]: + self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID + ) -> Iterable[ID]: """ Get the eligible peers to send the data to. :param msg_forwarder: peer ID of the peer who forwards the message to us. diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index f0d903b9..d8e82686 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -1,19 +1,9 @@ from ast import literal_eval import asyncio import random -from typing import ( - Any, - Dict, - Iterable, - List, - Set, - Sequence, -) +from typing import Any, Dict, Iterable, List, Set, Sequence -from libp2p.peer.id import ( - ID, - id_b58_decode, -) +from libp2p.peer.id import ID, id_b58_decode from .mcache import MessageCache from .pb import rpc_pb2 @@ -45,24 +35,26 @@ class GossipSub(IPubsubRouter): time_since_last_publish: Dict[str, int] - #FIXME: Should be changed to List[ID] + # FIXME: Should be changed to List[ID] peers_gossipsub: List[str] - #FIXME: Should be changed to List[ID] + # FIXME: Should be changed to List[ID] peers_floodsub: List[str] mcache: MessageCache heartbeat_interval: int - def __init__(self, - protocols: Sequence[str], - degree: int, - degree_low: int, - degree_high: int, - time_to_live: int, - gossip_window: int = 3, - gossip_history: int = 5, - heartbeat_interval: int = 120) -> None: + def __init__( + self, + protocols: Sequence[str], + degree: int, + degree_low: int, + degree_high: int, + time_to_live: int, + gossip_window: int = 3, + gossip_history: int = 5, + heartbeat_interval: int = 120, + ) -> None: # pylint: disable=too-many-arguments self.protocols = list(protocols) self.pubsub = None @@ -181,9 +173,7 @@ class GossipSub(IPubsubRouter): msg_forwarder=msg_forwarder, origin=ID(pubsub_msg.from_id), ) - rpc_msg = rpc_pb2.RPC( - publish=[pubsub_msg], - ) + rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg]) for peer_id in peers_gen: stream = self.pubsub.peers[str(peer_id)] # FIXME: We should add a `WriteMsg` similar to write delimited messages. @@ -192,10 +182,8 @@ class GossipSub(IPubsubRouter): await stream.write(rpc_msg.SerializeToString()) def _get_peers_to_send( - self, - topic_ids: Iterable[str], - msg_forwarder: ID, - origin: ID) -> Iterable[ID]: + self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID + ) -> Iterable[ID]: """ Get the eligible peers to send the data to. :param msg_forwarder: the peer id of the peer who forwards the message to me. @@ -231,9 +219,7 @@ class GossipSub(IPubsubRouter): if (topic not in self.fanout) or (len(self.fanout[topic]) == 0): # If no peers in fanout, choose some peers from gossipsub peers in topic. self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus( - topic, - self.degree, - [], + topic, self.degree, [] ) in_topic_gossipsub_peers = self.fanout[topic] for peer_id_str in in_topic_gossipsub_peers: @@ -264,9 +250,7 @@ class GossipSub(IPubsubRouter): # Selects the remaining number of peers (D-x) from peers.gossipsub[topic]. if topic in self.pubsub.peer_topics: selected_peers = self._get_in_topic_gossipsub_peers_from_minus( - topic, - self.degree - fanout_size, - fanout_peers, + topic, self.degree - fanout_size, fanout_peers ) # Combine fanout peers with selected peers fanout_peers += selected_peers @@ -308,11 +292,13 @@ class GossipSub(IPubsubRouter): # FIXME: type of `peers` should be changed to `List[ID]` # FIXME: type of `msg_sender` and `origin_id` should be changed to `ID` - async def deliver_messages_to_peers(self, - peers: List[str], - msg_sender: str, - origin_id: str, - serialized_packet: bytes) -> None: + async def deliver_messages_to_peers( + self, + peers: List[str], + msg_sender: str, + origin_id: str, + serialized_packet: bytes, + ) -> None: for peer_id_in_topic in peers: # Forward to all peers that are not the # message sender and are not the message origin @@ -349,16 +335,12 @@ class GossipSub(IPubsubRouter): if num_mesh_peers_in_topic < self.degree_low: # Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] selected_peers = self._get_in_topic_gossipsub_peers_from_minus( - topic, - self.degree - num_mesh_peers_in_topic, - self.mesh[topic], + topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic] ) # FIXME: Should be changed to `List[ID]` fanout_peers_not_in_mesh: List[str] = [ - peer - for peer in selected_peers - if peer not in self.mesh[topic] + peer for peer in selected_peers if peer not in self.mesh[topic] ] for peer in fanout_peers_not_in_mesh: # Add peer to mesh[topic] @@ -371,9 +353,7 @@ class GossipSub(IPubsubRouter): # Select |mesh[topic]| - D peers from mesh[topic] # FIXME: Should be changed to `List[ID]` selected_peers = GossipSub.select_from_minus( - num_mesh_peers_in_topic - self.degree, - self.mesh[topic], - [], + num_mesh_peers_in_topic - self.degree, self.mesh[topic], [] ) for peer in selected_peers: # Remove peer from mesh[topic] @@ -415,15 +395,16 @@ class GossipSub(IPubsubRouter): if topic in self.pubsub.peer_topics: # Select D peers from peers.gossipsub[topic] peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus( - topic, - self.degree, - [], + topic, self.degree, [] ) for peer in peers_to_emit_ihave_to: # TODO: this line is a monster, can hopefully be simplified - if (topic not in self.mesh or (peer not in self.mesh[topic]))\ - and (topic not in self.fanout or (peer not in self.fanout[topic])): + if ( + topic not in self.mesh or (peer not in self.mesh[topic]) + ) and ( + topic not in self.fanout or (peer not in self.fanout[topic]) + ): msg_id_strs = [str(msg_id) for msg_id in msg_ids] await self.emit_ihave(topic, msg_id_strs, peer) @@ -438,12 +419,13 @@ class GossipSub(IPubsubRouter): if topic in self.pubsub.peer_topics: # Select D peers from peers.gossipsub[topic] peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus( - topic, - self.degree, - [], + topic, self.degree, [] ) for peer in peers_to_emit_ihave_to: - if peer not in self.mesh[topic] and peer not in self.fanout[topic]: + if ( + peer not in self.mesh[topic] + and peer not in self.fanout[topic] + ): msg_id_strs = [str(msg) for msg in msg_ids] await self.emit_ihave(topic, msg_id_strs, peer) @@ -451,9 +433,9 @@ class GossipSub(IPubsubRouter): self.mcache.shift() @staticmethod - def select_from_minus(num_to_select: int, - pool: Sequence[Any], - minus: Sequence[Any]) -> List[Any]: + def select_from_minus( + num_to_select: int, pool: Sequence[Any], minus: Sequence[Any] + ) -> List[Any]: """ Select at most num_to_select subset of elements from the set (pool - minus) randomly. :param num_to_select: number of elements to randomly select @@ -482,24 +464,22 @@ class GossipSub(IPubsubRouter): # FIXME: type of `minus` should be changed to type `Sequence[ID]` # FIXME: return type should be changed to type `List[ID]` def _get_in_topic_gossipsub_peers_from_minus( - self, - topic: str, - num_to_select: int, - minus: Sequence[str]) -> List[str]: + self, topic: str, num_to_select: int, minus: Sequence[str] + ) -> List[str]: gossipsub_peers_in_topic = [ peer_str for peer_str in self.pubsub.peer_topics[topic] if peer_str in self.peers_gossipsub ] return self.select_from_minus( - num_to_select, - gossipsub_peers_in_topic, - list(minus), + num_to_select, gossipsub_peers_in_topic, list(minus) ) # RPC handlers - async def handle_ihave(self, ihave_msg: rpc_pb2.Message, sender_peer_id: str) -> None: + async def handle_ihave( + self, ihave_msg: rpc_pb2.Message, sender_peer_id: str + ) -> None: """ Checks the seen set and requests unknown messages with an IWANT message. """ @@ -509,8 +489,7 @@ class GossipSub(IPubsubRouter): # Get list of all seen (seqnos, from) from the (seqno, from) tuples in seen_messages cache seen_seqnos_and_peers = [ - seqno_and_from - for seqno_and_from in self.pubsub.seen_messages.keys() + seqno_and_from for seqno_and_from in self.pubsub.seen_messages.keys() ] # Add all unknown message ids (ids that appear in ihave_msg but not in seen_seqnos) to list @@ -526,7 +505,9 @@ class GossipSub(IPubsubRouter): if msg_ids_wanted: await self.emit_iwant(msg_ids_wanted, from_id_str) - async def handle_iwant(self, iwant_msg: rpc_pb2.Message, sender_peer_id: str) -> None: + async def handle_iwant( + self, iwant_msg: rpc_pb2.Message, sender_peer_id: str + ) -> None: """ Forwards all request messages that are present in mcache to the requesting peer. """ @@ -564,7 +545,9 @@ class GossipSub(IPubsubRouter): # 4) And write the packet to the stream await peer_stream.write(rpc_msg) - async def handle_graft(self, graft_msg: rpc_pb2.Message, sender_peer_id: str) -> None: + async def handle_graft( + self, graft_msg: rpc_pb2.Message, sender_peer_id: str + ) -> None: topic: str = graft_msg.topicID from_id_str = sender_peer_id @@ -577,7 +560,9 @@ class GossipSub(IPubsubRouter): # Respond with PRUNE if not subscribed to the topic await self.emit_prune(topic, sender_peer_id) - async def handle_prune(self, prune_msg: rpc_pb2.Message, sender_peer_id: str) -> None: + async def handle_prune( + self, prune_msg: rpc_pb2.Message, sender_peer_id: str + ) -> None: topic: str = prune_msg.topicID from_id_str = sender_peer_id @@ -641,7 +626,9 @@ class GossipSub(IPubsubRouter): await self.emit_control_message(control_msg, to_peer) - async def emit_control_message(self, control_msg: rpc_pb2.ControlMessage, to_peer: str) -> None: + async def emit_control_message( + self, control_msg: rpc_pb2.ControlMessage, to_peer: str + ) -> None: # Add control message to packet packet: rpc_pb2.RPC = rpc_pb2.RPC() packet.control.CopyFrom(control_msg) diff --git a/libp2p/pubsub/mcache.py b/libp2p/pubsub/mcache.py index ef140c59..08390b71 100644 --- a/libp2p/pubsub/mcache.py +++ b/libp2p/pubsub/mcache.py @@ -1,10 +1,4 @@ -from typing import ( - Dict, - List, - Optional, - Sequence, - Tuple, -) +from typing import Dict, List, Optional, Sequence, Tuple from .pb import rpc_pb2 @@ -18,6 +12,7 @@ class CacheEntry: """ A logical representation of an entry in the mcache's _history_. """ + def __init__(self, mid: Tuple[bytes, bytes], topics: Sequence[str]) -> None: """ Constructor. @@ -30,7 +25,6 @@ class CacheEntry: class MessageCache: - window_size: int history_size: int @@ -53,10 +47,7 @@ class MessageCache: # max length of history_size. each item is a list of CacheEntry. # messages lost upon shift(). - self.history = [ - [] - for _ in range(history_size) - ] + self.history = [[] for _ in range(history_size)] def put(self, msg: rpc_pb2.Message) -> None: """ diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 75697860..6872aa33 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,13 +1,7 @@ # pylint: disable=no-name-in-module import asyncio import time -from typing import ( - Any, - Dict, - List, - Tuple, - TYPE_CHECKING, -) +from typing import Any, Dict, List, Tuple, TYPE_CHECKING from lru import LRU @@ -34,18 +28,18 @@ class Pubsub: host: IHost my_id: ID - router: 'IPubsubRouter' + router: "IPubsubRouter" - peer_queue: 'asyncio.Queue[ID]' + peer_queue: "asyncio.Queue[ID]" protocols: List[str] - incoming_msgs_from_peers: 'asyncio.Queue[rpc_pb2.Message]' - outgoing_messages: 'asyncio.Queue[rpc_pb2.Message]' + incoming_msgs_from_peers: "asyncio.Queue[rpc_pb2.Message]" + outgoing_messages: "asyncio.Queue[rpc_pb2.Message]" seen_messages: LRU - my_topics: Dict[str, 'asyncio.Queue[rpc_pb2.Message]'] + my_topics: Dict[str, "asyncio.Queue[rpc_pb2.Message]"] # FIXME: Should be changed to `Dict[str, List[ID]]` peer_topics: Dict[str, List[str]] @@ -55,11 +49,9 @@ class Pubsub: # NOTE: Be sure it is increased atomically everytime. counter: int # uint64 - def __init__(self, - host: IHost, - router: 'IPubsubRouter', - my_id: ID, - cache_size: int = None) -> None: + def __init__( + self, host: IHost, router: "IPubsubRouter", my_id: ID, cache_size: int = None + ) -> None: """ Construct a new Pubsub object, which is responsible for handling all Pubsub-related messages and relaying messages as appropriate to the @@ -120,12 +112,9 @@ class Pubsub: """ packet = rpc_pb2.RPC() for topic_id in self.my_topics: - packet.subscriptions.extend([ - rpc_pb2.RPC.SubOpts( - subscribe=True, - topicid=topic_id, - ) - ]) + packet.subscriptions.extend( + [rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)] + ) return packet.SerializeToString() async def continuously_read_stream(self, stream: INetStream) -> None: @@ -262,7 +251,7 @@ class Pubsub: # for each topic await self.my_topics[topic].put(publish_message) - async def subscribe(self, topic_id: str) -> 'asyncio.Queue[rpc_pb2.Message]': + async def subscribe(self, topic_id: str) -> "asyncio.Queue[rpc_pb2.Message]": """ Subscribe ourself to a topic :param topic_id: topic_id to subscribe to @@ -277,10 +266,9 @@ class Pubsub: # Create subscribe message packet: rpc_pb2.RPC = rpc_pb2.RPC() - packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( - subscribe=True, - topicid=topic_id.encode('utf-8') - )]) + packet.subscriptions.extend( + [rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id.encode("utf-8"))] + ) # Send out subscribe message to all peers await self.message_all_peers(packet.SerializeToString()) @@ -305,10 +293,9 @@ class Pubsub: # Create unsubscribe message packet: rpc_pb2.RPC = rpc_pb2.RPC() - packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( - subscribe=False, - topicid=topic_id.encode('utf-8') - )]) + packet.subscriptions.extend( + [rpc_pb2.RPC.SubOpts(subscribe=False, topicid=topic_id.encode("utf-8"))] + ) # Send out unsubscribe message to all peers await self.message_all_peers(packet.SerializeToString()) @@ -371,7 +358,7 @@ class Pubsub: Make the next message sequence id. """ self.counter += 1 - return self.counter.to_bytes(8, 'big') + return self.counter.to_bytes(8, "big") def _is_msg_seen(self, msg: rpc_pb2.Message) -> bool: msg_id = get_msg_id(msg) diff --git a/libp2p/pubsub/pubsub_notifee.py b/libp2p/pubsub/pubsub_notifee.py index 9f06176e..e73bd170 100644 --- a/libp2p/pubsub/pubsub_notifee.py +++ b/libp2p/pubsub/pubsub_notifee.py @@ -1,6 +1,4 @@ -from typing import ( - TYPE_CHECKING, -) +from typing import TYPE_CHECKING from multiaddr import Multiaddr @@ -18,9 +16,9 @@ if TYPE_CHECKING: class PubsubNotifee(INotifee): # pylint: disable=too-many-instance-attributes, cell-var-from-loop, unsubscriptable-object - initiator_peers_queue: 'asyncio.Queue[ID]' + initiator_peers_queue: "asyncio.Queue[ID]" - def __init__(self, initiator_peers_queue: 'asyncio.Queue[ID]') -> None: + def __init__(self, initiator_peers_queue: "asyncio.Queue[ID]") -> None: """ :param initiator_peers_queue: queue to add new peers to so that pubsub can process new peers after we connect to them diff --git a/libp2p/pubsub/pubsub_router_interface.py b/libp2p/pubsub/pubsub_router_interface.py index 8a6a879c..38c5394a 100644 --- a/libp2p/pubsub/pubsub_router_interface.py +++ b/libp2p/pubsub/pubsub_router_interface.py @@ -1,8 +1,5 @@ from abc import ABC, abstractmethod -from typing import ( - List, - TYPE_CHECKING, -) +from typing import List, TYPE_CHECKING from libp2p.peer.id import ID @@ -11,8 +8,8 @@ from .pb import rpc_pb2 if TYPE_CHECKING: from .pubsub import Pubsub -class IPubsubRouter(ABC): +class IPubsubRouter(ABC): @abstractmethod def get_protocols(self) -> List[str]: """ @@ -20,7 +17,7 @@ class IPubsubRouter(ABC): """ @abstractmethod - def attach(self, pubsub: 'Pubsub') -> None: + def attach(self, pubsub: "Pubsub") -> None: """ Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance. diff --git a/libp2p/routing/interfaces.py b/libp2p/routing/interfaces.py index 5d0e63f3..d040e717 100644 --- a/libp2p/routing/interfaces.py +++ b/libp2p/routing/interfaces.py @@ -1,16 +1,13 @@ -from abc import ( - ABC, - abstractmethod, -) +from abc import ABC, abstractmethod from typing import Iterable from libp2p.peer.id import ID from libp2p.peer.peerinfo import PeerInfo + # pylint: disable=too-few-public-methods class IContentRouting(ABC): - @abstractmethod def provide(self, cid: bytes, announce: bool = True) -> None: """ @@ -28,7 +25,6 @@ class IContentRouting(ABC): class IPeerRouting(ABC): - @abstractmethod async def find_peer(self, peer_id: ID) -> PeerInfo: """ diff --git a/libp2p/routing/kademlia/kademlia_content_router.py b/libp2p/routing/kademlia/kademlia_content_router.py index 468ca3ae..e66643c2 100644 --- a/libp2p/routing/kademlia/kademlia_content_router.py +++ b/libp2p/routing/kademlia/kademlia_content_router.py @@ -1,13 +1,10 @@ -from typing import ( - Iterable, -) +from typing import Iterable from libp2p.peer.peerinfo import PeerInfo from libp2p.routing.interfaces import IContentRouting class KadmeliaContentRouter(IContentRouting): - def provide(self, cid: bytes, announce: bool = True) -> None: """ Provide adds the given cid to the content routing system. If announce is True, diff --git a/libp2p/routing/kademlia/kademlia_peer_router.py b/libp2p/routing/kademlia/kademlia_peer_router.py index 5e426fe5..9ccbcd9c 100644 --- a/libp2p/routing/kademlia/kademlia_peer_router.py +++ b/libp2p/routing/kademlia/kademlia_peer_router.py @@ -1,12 +1,7 @@ import ast -from typing import ( - Union, -) +from typing import Union -from libp2p.kademlia.kad_peerinfo import ( - KadPeerInfo, - create_kad_peerinfo, -) +from libp2p.kademlia.kad_peerinfo import KadPeerInfo, create_kad_peerinfo from libp2p.kademlia.network import KademliaServer from libp2p.peer.id import ID from libp2p.routing.interfaces import IPeerRouting @@ -31,6 +26,7 @@ class KadmeliaPeerRouter(IPeerRouting): value = await self.server.get(xor_id) return decode_peerinfo(value) + def decode_peerinfo(encoded: Union[bytes, str]) -> KadPeerInfo: if isinstance(encoded, bytes): encoded = encoded.decode() @@ -38,7 +34,7 @@ def decode_peerinfo(encoded: Union[bytes, str]) -> KadPeerInfo: lines = ast.literal_eval(encoded) except SyntaxError: return None - ip = lines[1] # pylint: disable=invalid-name + ip = lines[1] # pylint: disable=invalid-name port = lines[2] peer_id = lines[3] peer_info = create_kad_peerinfo(peer_id, ip, port) diff --git a/libp2p/security/insecure_security.py b/libp2p/security/insecure_security.py index dfa80a7e..d20d29b3 100644 --- a/libp2p/security/insecure_security.py +++ b/libp2p/security/insecure_security.py @@ -1,8 +1,8 @@ from libp2p.security.secure_transport_interface import ISecureTransport from libp2p.security.secure_conn_interface import ISecureConn -class InsecureTransport(ISecureTransport): +class InsecureTransport(ISecureTransport): def __init__(self, transport_id): self.transport_id = transport_id @@ -24,8 +24,8 @@ class InsecureTransport(ISecureTransport): insecure_conn = InsecureConn(conn, self.transport_id) return insecure_conn -class InsecureConn(ISecureConn): +class InsecureConn(ISecureConn): def __init__(self, conn, conn_id): self.conn = conn self.details = {} diff --git a/libp2p/security/secure_conn_interface.py b/libp2p/security/secure_conn_interface.py index e8433a29..c71a54ac 100644 --- a/libp2p/security/secure_conn_interface.py +++ b/libp2p/security/secure_conn_interface.py @@ -8,8 +8,9 @@ involved in the secured connection Relevant go repo: https://github.com/libp2p/go-conn-security/blob/master/interface.go """ -class ISecureConn(ABC): + +class ISecureConn(ABC): @abstractmethod def get_conn(self): """ diff --git a/libp2p/security/secure_transport_interface.py b/libp2p/security/secure_transport_interface.py index 54ca8b17..82096431 100644 --- a/libp2p/security/secure_transport_interface.py +++ b/libp2p/security/secure_transport_interface.py @@ -8,8 +8,9 @@ chosen by a security transport multistream module. Relevant go repo: https://github.com/libp2p/go-conn-security/blob/master/interface.go """ -class ISecureTransport(ABC): + +class ISecureTransport(ABC): @abstractmethod async def secure_inbound(self, conn): """ diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py index c8d2f884..7b07b8bd 100644 --- a/libp2p/security/security_multistream.py +++ b/libp2p/security/security_multistream.py @@ -10,8 +10,9 @@ involved in the secured connection Relevant go repo: https://github.com/libp2p/go-conn-security/blob/master/interface.go """ -class SecurityMultistream(ABC): + +class SecurityMultistream(ABC): def __init__(self): # Map protocol to secure transport self.transports = {} @@ -31,7 +32,6 @@ class SecurityMultistream(ABC): # we only care about selecting the protocol, not any handler function self.multiselect.add_handler(protocol, None) - async def secure_inbound(self, conn): """ Secure the connection, either locally or by communicating with opposing node via conn, @@ -47,7 +47,6 @@ class SecurityMultistream(ABC): return secure_conn - async def secure_outbound(self, conn, peer_id): """ Secure the connection, either locally or by communicating with opposing node via conn, @@ -63,7 +62,6 @@ class SecurityMultistream(ABC): return secure_conn - async def select_transport(self, conn, initiator): """ Select a transport that both us and the node on the @@ -79,8 +77,9 @@ class SecurityMultistream(ABC): protocol = None if initiator: # Select protocol if initiator - protocol = \ - await self.multiselect_client.select_one_of(list(self.transports.keys()), conn) + protocol = await self.multiselect_client.select_one_of( + list(self.transports.keys()), conn + ) else: # Select protocol if non-initiator protocol, _ = await self.multiselect.negotiate(conn) diff --git a/libp2p/security/simple_security.py b/libp2p/security/simple_security.py index 62f56668..1860860c 100644 --- a/libp2p/security/simple_security.py +++ b/libp2p/security/simple_security.py @@ -2,8 +2,8 @@ import asyncio from libp2p.security.secure_transport_interface import ISecureTransport from libp2p.security.secure_conn_interface import ISecureConn -class SimpleSecurityTransport(ISecureTransport): +class SimpleSecurityTransport(ISecureTransport): def __init__(self, key_phrase): self.key_phrase = key_phrase @@ -17,7 +17,9 @@ class SimpleSecurityTransport(ISecureTransport): incoming = (await conn.read()).decode() if incoming != self.key_phrase: - raise Exception("Key phrase differed between nodes. Expected " + self.key_phrase) + raise Exception( + "Key phrase differed between nodes. Expected " + self.key_phrase + ) secure_conn = SimpleSecureConn(conn, self.key_phrase) return secure_conn @@ -36,13 +38,15 @@ class SimpleSecurityTransport(ISecureTransport): await asyncio.sleep(0) if incoming != self.key_phrase: - raise Exception("Key phrase differed between nodes. Expected " + self.key_phrase) + raise Exception( + "Key phrase differed between nodes. Expected " + self.key_phrase + ) secure_conn = SimpleSecureConn(conn, self.key_phrase) return secure_conn -class SimpleSecureConn(ISecureConn): +class SimpleSecureConn(ISecureConn): def __init__(self, conn, key_phrase): self.conn = conn self.details = {} diff --git a/libp2p/stream_muxer/mplex/constants.py b/libp2p/stream_muxer/mplex/constants.py index a6dfcab9..a0537b2e 100644 --- a/libp2p/stream_muxer/mplex/constants.py +++ b/libp2p/stream_muxer/mplex/constants.py @@ -1,6 +1 @@ -HEADER_TAGS = { - "NEW_STREAM": 0, - "MESSAGE": 2, - "CLOSE": 4, - "RESET": 6 -} +HEADER_TAGS = {"NEW_STREAM": 0, "MESSAGE": 2, "CLOSE": 4, "RESET": 6} diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index e660a524..ecbb6c7d 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -158,7 +158,9 @@ class Mplex(IMuxedConn): try: header = await decode_uvarint_from_stream(self.raw_conn.reader, timeout) length = await decode_uvarint_from_stream(self.raw_conn.reader, timeout) - message = await asyncio.wait_for(self.raw_conn.reader.read(length), timeout=timeout) + message = await asyncio.wait_for( + self.raw_conn.reader.read(length), timeout=timeout + ) except asyncio.TimeoutError: return None, None, None diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index 08b30d55..c7ebb541 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -40,7 +40,8 @@ class MplexStream(IMuxedStream): :return: number of bytes written """ return await self.mplex_conn.send_message( - get_flag(self.initiator, "MESSAGE"), data, self.stream_id) + get_flag(self.initiator, "MESSAGE"), data, self.stream_id + ) async def close(self): """ @@ -50,7 +51,9 @@ class MplexStream(IMuxedStream): """ # TODO error handling with timeout # TODO understand better how mutexes are used from go repo - await self.mplex_conn.send_message(get_flag(self.initiator, "CLOSE"), None, self.stream_id) + await self.mplex_conn.send_message( + get_flag(self.initiator, "CLOSE"), None, self.stream_id + ) remote_lock = "" async with self.stream_lock: @@ -79,7 +82,8 @@ class MplexStream(IMuxedStream): if not self.remote_closed: await self.mplex_conn.send_message( - get_flag(self.initiator, "RESET"), None, self.stream_id) + get_flag(self.initiator, "RESET"), None, self.stream_id + ) self.local_closed = True self.remote_closed = True diff --git a/libp2p/stream_muxer/mplex/utils.py b/libp2p/stream_muxer/mplex/utils.py index 96a2dc5b..b5c852f6 100644 --- a/libp2p/stream_muxer/mplex/utils.py +++ b/libp2p/stream_muxer/mplex/utils.py @@ -5,14 +5,14 @@ from .constants import HEADER_TAGS def encode_uvarint(number): """Pack `number` into varint bytes""" - buf = b'' + buf = b"" while True: - towrite = number & 0x7f + towrite = number & 0x7F number >>= 7 if number: - buf += bytes((towrite | 0x80, )) + buf += bytes((towrite | 0x80,)) else: - buf += bytes((towrite, )) + buf += bytes((towrite,)) break return buf @@ -22,7 +22,7 @@ def decode_uvarint(buff, index): result = 0 while True: i = buff[index] - result |= (i & 0x7f) << shift + result |= (i & 0x7F) << shift shift += 7 if not i & 0x80: break @@ -30,19 +30,21 @@ def decode_uvarint(buff, index): return result, index + 1 + async def decode_uvarint_from_stream(reader, timeout): shift = 0 result = 0 while True: byte = await asyncio.wait_for(reader.read(1), timeout=timeout) - i = struct.unpack('>H', b'\x00' + byte)[0] - result |= (i & 0x7f) << shift + i = struct.unpack(">H", b"\x00" + byte)[0] + result |= (i & 0x7F) << shift shift += 7 if not i & 0x80: break return result + def get_flag(initiator, action): """ get header flag based on action for mplex diff --git a/libp2p/stream_muxer/muxed_stream_interface.py b/libp2p/stream_muxer/muxed_stream_interface.py index eb6f2672..00ce5867 100644 --- a/libp2p/stream_muxer/muxed_stream_interface.py +++ b/libp2p/stream_muxer/muxed_stream_interface.py @@ -1,7 +1,4 @@ -from abc import ( - ABC, - abstractmethod, -) +from abc import ABC, abstractmethod from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn diff --git a/libp2p/transport/listener_interface.py b/libp2p/transport/listener_interface.py index 174f267a..5f84ef6c 100644 --- a/libp2p/transport/listener_interface.py +++ b/libp2p/transport/listener_interface.py @@ -2,7 +2,6 @@ from abc import ABC, abstractmethod class IListener(ABC): - @abstractmethod def listen(self, maddr): """ diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index bcbb5332..8f884302 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -10,12 +10,10 @@ from ..transport_interface import ITransport class TCP(ITransport): - def __init__(self): self.listener = self.Listener() class Listener(IListener): - def __init__(self, handler_function=None): self.multiaddrs = [] self.server = None @@ -29,8 +27,8 @@ class TCP(ITransport): """ self.server = await asyncio.start_server( self.handler, - maddr.value_for_protocol('ip4'), - maddr.value_for_protocol('tcp'), + maddr.value_for_protocol("ip4"), + maddr.value_for_protocol("tcp"), ) socket = self.server.sockets[0] self.multiaddrs.append(_multiaddr_from_socket(socket)) @@ -70,8 +68,8 @@ class TCP(ITransport): :param options: optional object :return: True if successful """ - host = maddr.value_for_protocol('ip4') - port = int(maddr.value_for_protocol('tcp')) + host = maddr.value_for_protocol("ip4") + port = int(maddr.value_for_protocol("tcp")) reader, writer = await asyncio.open_connection(host, port) diff --git a/libp2p/transport/transport_interface.py b/libp2p/transport/transport_interface.py index 42acb0da..b067e5b5 100644 --- a/libp2p/transport/transport_interface.py +++ b/libp2p/transport/transport_interface.py @@ -2,7 +2,6 @@ from abc import ABC, abstractmethod class ITransport(ABC): - @abstractmethod def dial(self, maddr, self_id, options=None): """ diff --git a/setup.py b/setup.py index 284d4bd1..f2b5a0e6 100644 --- a/setup.py +++ b/setup.py @@ -1,10 +1,7 @@ import setuptools -classifiers = [ - f"Programming Language :: Python :: {version}" - for version in ["3.7"] -] +classifiers = [f"Programming Language :: Python :: {version}" for version in ["3.7"]] # pylint: disable=invalid-name @@ -15,17 +12,13 @@ extras_require = { "pytest-cov>=2.7.1,<3.0.0", "pytest-asyncio>=0.10.0,<1.0.0", ], - "lint": [ - "pylint>=2.3.1,<3.0.0", - "mypy>=0.701,<1.0", - "black==19.3b0", - ], - "dev": [ - "tox>=3.13.2,<4.0.0", - ], + "lint": ["pylint>=2.3.1,<3.0.0", "mypy>=0.701,<1.0", "black==19.3b0"], + "dev": ["tox>=3.13.2,<4.0.0"], } -extras_require["dev"] = extras_require["test"] + extras_require["lint"] + extras_require["dev"] +extras_require["dev"] = ( + extras_require["test"] + extras_require["lint"] + extras_require["dev"] +) setuptools.setup( diff --git a/tests/examples/test_chat.py b/tests/examples/test_chat.py index ee192cca..3c949a0d 100644 --- a/tests/examples/test_chat.py +++ b/tests/examples/test_chat.py @@ -8,13 +8,14 @@ from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.protocol_muxer.multiselect_client import MultiselectClientError -PROTOCOL_ID = '/chat/1.0.0' +PROTOCOL_ID = "/chat/1.0.0" + async def hello_world(host_a, host_b): async def stream_handler(stream): read = await stream.read() - assert read == b'hello world from host b' - await stream.write(b'hello world from host a') + assert read == b"hello world from host b" + await stream.write(b"hello world from host a") await stream.close() host_a.set_stream_handler(PROTOCOL_ID, stream_handler) @@ -22,14 +23,14 @@ async def hello_world(host_a, host_b): # Start a stream with the destination. # Multiaddress of the destination peer is fetched from the peerstore using 'peerId'. stream = await host_b.new_stream(host_a.get_id(), [PROTOCOL_ID]) - await stream.write(b'hello world from host b') + await stream.write(b"hello world from host b") read = await stream.read() - assert read == b'hello world from host a' + assert read == b"hello world from host a" await stream.close() async def connect_write(host_a, host_b): - messages = ['data %d' % i for i in range(5)] + messages = ["data %d" % i for i in range(5)] received = [] async def stream_handler(stream): @@ -38,6 +39,7 @@ async def connect_write(host_a, host_b): received.append((await stream.read()).decode()) except Exception: # exception is raised when other side close the stream ? break + host_a.set_stream_handler(PROTOCOL_ID, stream_handler) # Start a stream with the destination. @@ -54,7 +56,7 @@ async def connect_write(host_a, host_b): async def connect_read(host_a, host_b): - messages = [b'data %d' % i for i in range(5)] + messages = [b"data %d" % i for i in range(5)] async def stream_handler(stream): for message in messages: @@ -78,7 +80,7 @@ async def connect_read(host_a, host_b): async def no_common_protocol(host_a, host_b): - messages = [b'data %d' % i for i in range(5)] + messages = [b"data %d" % i for i in range(5)] async def stream_handler(stream): for message in messages: @@ -89,16 +91,13 @@ async def no_common_protocol(host_a, host_b): # try to creates a new new with a procotol not known by the other host with pytest.raises(MultiselectClientError): - _ = await host_b.new_stream(host_a.get_id(), ['/fakeproto/0.0.1']) + _ = await host_b.new_stream(host_a.get_id(), ["/fakeproto/0.0.1"]) @pytest.mark.asyncio -@pytest.mark.parametrize("test", [ - (hello_world), - (connect_write), - (connect_read), - (no_common_protocol), -]) +@pytest.mark.parametrize( + "test", [(hello_world), (connect_write), (connect_read), (no_common_protocol)] +) async def test_chat(test): transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]] (host_a, host_b) = await set_up_nodes_by_transport_opt(transport_opt_list) diff --git a/tests/kademlia/test_basic.py b/tests/kademlia/test_basic.py index b1cc7121..03e92c68 100644 --- a/tests/kademlia/test_basic.py +++ b/tests/kademlia/test_basic.py @@ -25,7 +25,7 @@ async def test_example(): assert await node_a.get(key) == value -@pytest.mark.parametrize("nodes_nr", [(2**i) for i in range(2, 5)]) +@pytest.mark.parametrize("nodes_nr", [(2 ** i) for i in range(2, 5)]) @pytest.mark.asyncio async def test_multiple_nodes_bootstrap_set_get(nodes_nr): @@ -53,7 +53,7 @@ async def test_multiple_nodes_bootstrap_set_get(nodes_nr): assert await node.get(key) == value -@pytest.mark.parametrize("nodes_nr", [(2**i) for i in range(2, 5)]) +@pytest.mark.parametrize("nodes_nr", [(2 ** i) for i in range(2, 5)]) @pytest.mark.asyncio async def test_multiple_nodes_set_bootstrap_get(nodes_nr): node_bootstrap = KademliaServer() diff --git a/tests/libp2p/test_libp2p.py b/tests/libp2p/test_libp2p.py index a7465036..b98738f1 100644 --- a/tests/libp2p/test_libp2p.py +++ b/tests/libp2p/test_libp2p.py @@ -115,11 +115,14 @@ async def test_multiple_streams(): response_a = (await stream_a.read()).decode() response_b = (await stream_b.read()).decode() - assert response_a == ("ack_b:" + a_message) and response_b == ("ack_a:" + b_message) + assert response_a == ("ack_b:" + a_message) and response_b == ( + "ack_a:" + b_message + ) # Success, terminate pending tasks. await cleanup() + @pytest.mark.asyncio async def test_multiple_streams_same_initiator_different_protocols(): transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]] @@ -173,13 +176,16 @@ async def test_multiple_streams_same_initiator_different_protocols(): response_a2 = (await stream_a2.read()).decode() response_a3 = (await stream_a3.read()).decode() - assert (response_a1 == ("ack_a1:" + a1_message) - and response_a2 == ("ack_a2:" + a2_message) - and response_a3 == ("ack_a3:" + a3_message)) + assert ( + response_a1 == ("ack_a1:" + a1_message) + and response_a2 == ("ack_a2:" + a2_message) + and response_a3 == ("ack_a3:" + a3_message) + ) # Success, terminate pending tasks. await cleanup() + @pytest.mark.asyncio async def test_multiple_streams_two_initiators(): transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]] @@ -250,18 +256,24 @@ async def test_multiple_streams_two_initiators(): response_b1 = (await stream_b1.read()).decode() response_b2 = (await stream_b2.read()).decode() - assert (response_a1 == ("ack_a1:" + a1_message) - and response_a2 == ("ack_a2:" + a2_message) - and response_b1 == ("ack_b1:" + b1_message) - and response_b2 == ("ack_b2:" + b2_message)) + assert ( + response_a1 == ("ack_a1:" + a1_message) + and response_a2 == ("ack_a2:" + a2_message) + and response_b1 == ("ack_b1:" + b1_message) + and response_b2 == ("ack_b2:" + b2_message) + ) # Success, terminate pending tasks. await cleanup() + @pytest.mark.asyncio async def test_triangle_nodes_connection(): - transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"],\ - ["/ip4/127.0.0.1/tcp/0"]] + transport_opt_list = [ + ["/ip4/127.0.0.1/tcp/0"], + ["/ip4/127.0.0.1/tcp/0"], + ["/ip4/127.0.0.1/tcp/0"], + ] (node_a, node_b, node_c) = await set_up_nodes_by_transport_opt(transport_opt_list) async def stream_handler(stream): @@ -296,8 +308,14 @@ async def test_triangle_nodes_connection(): stream_c_to_b = await node_c.new_stream(node_b.get_id(), ["/echo/1.0.0"]) messages = ["hello" + str(x) for x in range(5)] - streams = [stream_a_to_b, stream_a_to_c, stream_b_to_a, stream_b_to_c, - stream_c_to_a, stream_c_to_b] + streams = [ + stream_a_to_b, + stream_a_to_c, + stream_b_to_a, + stream_b_to_c, + stream_c_to_a, + stream_c_to_b, + ] for message in messages: for stream in streams: @@ -330,7 +348,7 @@ async def test_host_connect(): assert len(node_a.get_peerstore().peer_ids()) == 1 assert node_b.get_id() in node_a.get_peerstore().peer_ids() - ma_node_b = multiaddr.Multiaddr('/p2p/%s' % node_b.get_id().pretty()) + ma_node_b = multiaddr.Multiaddr("/p2p/%s" % node_b.get_id().pretty()) for addr in node_a.get_peerstore().addrs(node_b.get_id()): assert addr.encapsulate(ma_node_b) in node_b.get_addrs() diff --git a/tests/libp2p/test_notify.py b/tests/libp2p/test_notify.py index 570ad57f..4dfd8805 100644 --- a/tests/libp2p/test_notify.py +++ b/tests/libp2p/test_notify.py @@ -13,14 +13,18 @@ import pytest import multiaddr -from tests.utils import cleanup, echo_stream_handler, \ - perform_two_host_set_up_custom_handler +from tests.utils import ( + cleanup, + echo_stream_handler, + perform_two_host_set_up_custom_handler, +) from libp2p import new_node, initialize_default_swarm from libp2p.network.notifee_interface import INotifee from libp2p.host.basic_host import BasicHost # pylint: disable=too-many-locals + class MyNotifee(INotifee): # pylint: disable=too-many-instance-attributes, cell-var-from-loop @@ -29,28 +33,25 @@ class MyNotifee(INotifee): self.val_to_append_to_event = val_to_append_to_event async def opened_stream(self, network, stream): - self.events.append(["opened_stream" + \ - self.val_to_append_to_event, stream]) + self.events.append(["opened_stream" + self.val_to_append_to_event, stream]) async def closed_stream(self, network, stream): pass async def connected(self, network, conn): - self.events.append(["connected" + self.val_to_append_to_event,\ - conn]) + self.events.append(["connected" + self.val_to_append_to_event, conn]) async def disconnected(self, network, conn): pass async def listen(self, network, _multiaddr): - self.events.append(["listened" + self.val_to_append_to_event,\ - _multiaddr]) + self.events.append(["listened" + self.val_to_append_to_event, _multiaddr]) async def listen_close(self, network, _multiaddr): pass -class InvalidNotifee(): +class InvalidNotifee: # pylint: disable=too-many-instance-attributes, cell-var-from-loop def __init__(self): @@ -114,8 +115,7 @@ async def test_one_notifier(): # Ensure the connected and opened_stream events were hit in MyNotifee obj # and that stream passed into opened_stream matches the stream created on # node_a - assert events == [["connected0", stream.mplex_conn], \ - ["opened_stream0", stream]] + assert events == [["connected0", stream.mplex_conn], ["opened_stream0", stream]] messages = ["hello", "hello"] for message in messages: @@ -128,6 +128,7 @@ async def test_one_notifier(): # Success, terminate pending tasks. await cleanup() + @pytest.mark.asyncio async def test_one_notifier_on_two_nodes(): events_b = [] @@ -136,8 +137,10 @@ async def test_one_notifier_on_two_nodes(): # Ensure the connected and opened_stream events were hit in Notifee obj # and that the stream passed into opened_stream matches the stream created on # node_b - assert events_b == [["connectedb", stream.mplex_conn], \ - ["opened_streamb", stream]] + assert events_b == [ + ["connectedb", stream.mplex_conn], + ["opened_streamb", stream], + ] while True: read_string = (await stream.read()).decode() @@ -158,8 +161,7 @@ async def test_one_notifier_on_two_nodes(): # Ensure the connected and opened_stream events were hit in MyNotifee obj # and that stream passed into opened_stream matches the stream created on # node_a - assert events_a == [["connecteda", stream.mplex_conn], \ - ["opened_streama", stream]] + assert events_a == [["connecteda", stream.mplex_conn], ["opened_streama", stream]] messages = ["hello", "hello"] for message in messages: @@ -172,6 +174,7 @@ async def test_one_notifier_on_two_nodes(): # Success, terminate pending tasks. await cleanup() + @pytest.mark.asyncio async def test_one_notifier_on_two_nodes_with_listen(): events_b = [] @@ -191,9 +194,9 @@ async def test_one_notifier_on_two_nodes_with_listen(): # and that the stream passed into opened_stream matches the stream created on # node_b assert events_b == [ - ["listenedb", node_b_multiaddr], \ - ["connectedb", stream.mplex_conn], \ - ["opened_streamb", stream] + ["listenedb", node_b_multiaddr], + ["connectedb", stream.mplex_conn], + ["opened_streamb", stream], ] while True: read_string = (await stream.read()).decode() @@ -219,10 +222,7 @@ async def test_one_notifier_on_two_nodes_with_listen(): # Ensure the connected and opened_stream events were hit in MyNotifee obj # and that stream passed into opened_stream matches the stream created on # node_a - assert events_a == [ - ["connecteda", stream.mplex_conn], \ - ["opened_streama", stream] - ] + assert events_a == [["connecteda", stream.mplex_conn], ["opened_streama", stream]] messages = ["hello", "hello"] for message in messages: @@ -235,6 +235,7 @@ async def test_one_notifier_on_two_nodes_with_listen(): # Success, terminate pending tasks. await cleanup() + @pytest.mark.asyncio async def test_two_notifiers(): node_a, node_b = await perform_two_host_set_up_custom_handler(echo_stream_handler) @@ -254,7 +255,6 @@ async def test_two_notifiers(): assert events0 == [["connected0", stream.mplex_conn], ["opened_stream0", stream]] assert events1 == [["connected1", stream.mplex_conn], ["opened_stream1", stream]] - messages = ["hello", "hello"] for message in messages: await stream.write(message.encode()) @@ -266,6 +266,7 @@ async def test_two_notifiers(): # Success, terminate pending tasks. await cleanup() + @pytest.mark.asyncio async def test_ten_notifiers(): num_notifiers = 10 @@ -284,8 +285,10 @@ async def test_ten_notifiers(): # and that the stream passed into opened_stream matches the stream created on # node_a for i in range(num_notifiers): - assert events_lst[i] == [["connected" + str(i), stream.mplex_conn], \ - ["opened_stream" + str(i), stream]] + assert events_lst[i] == [ + ["connected" + str(i), stream.mplex_conn], + ["opened_stream" + str(i), stream], + ] messages = ["hello", "hello"] for message in messages: @@ -298,6 +301,7 @@ async def test_ten_notifiers(): # Success, terminate pending tasks. await cleanup() + @pytest.mark.asyncio async def test_ten_notifiers_on_two_nodes(): num_notifiers = 10 @@ -308,8 +312,10 @@ async def test_ten_notifiers_on_two_nodes(): # and that the stream passed into opened_stream matches the stream created on # node_b for i in range(num_notifiers): - assert events_lst_b[i] == [["connectedb" + str(i), stream.mplex_conn], \ - ["opened_streamb" + str(i), stream]] + assert events_lst_b[i] == [ + ["connectedb" + str(i), stream.mplex_conn], + ["opened_streamb" + str(i), stream], + ] while True: read_string = (await stream.read()).decode() @@ -332,8 +338,10 @@ async def test_ten_notifiers_on_two_nodes(): # and that the stream passed into opened_stream matches the stream created on # node_a for i in range(num_notifiers): - assert events_lst_a[i] == [["connecteda" + str(i), stream.mplex_conn], \ - ["opened_streama" + str(i), stream]] + assert events_lst_a[i] == [ + ["connecteda" + str(i), stream.mplex_conn], + ["opened_streama" + str(i), stream], + ] messages = ["hello", "hello"] for message in messages: @@ -346,6 +354,7 @@ async def test_ten_notifiers_on_two_nodes(): # Success, terminate pending tasks. await cleanup() + @pytest.mark.asyncio async def test_invalid_notifee(): num_notifiers = 10 diff --git a/tests/network/test_connection.py b/tests/network/test_connection.py index 9b7602d4..a2b2c204 100644 --- a/tests/network/test_connection.py +++ b/tests/network/test_connection.py @@ -13,7 +13,7 @@ async def handle_echo(reader, writer): # TODO: this test should develop out into a fuller test between MPlex # modules communicating with each other. async def test_simple_echo(): - server_ip = '127.0.0.1' + server_ip = "127.0.0.1" server_port = 8888 await asyncio.start_server(handle_echo, server_ip, server_port) diff --git a/tests/peer/test_peerid.py b/tests/peer/test_peerid.py index d1cf7182..c863b5de 100644 --- a/tests/peer/test_peerid.py +++ b/tests/peer/test_peerid.py @@ -3,26 +3,35 @@ import multihash import pytest import base58 from Crypto.PublicKey import RSA -from libp2p.peer.id import ID, id_b58_encode, id_b58_decode, id_from_public_key, id_from_private_key +from libp2p.peer.id import ( + ID, + id_b58_encode, + id_b58_decode, + id_from_public_key, + id_from_private_key, +) -ALPHABETS = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' +ALPHABETS = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" + def test_init_(): - random_id_string = '' + random_id_string = "" for _ in range(10): random_id_string += random.SystemRandom().choice(ALPHABETS) peer_id = ID(random_id_string) - #pylint: disable=protected-access + # pylint: disable=protected-access assert peer_id._id_str == random_id_string + def test_no_init_value(): with pytest.raises(Exception) as _: - #pylint: disable=no-value-for-parameter + # pylint: disable=no-value-for-parameter ID() + def test_pretty(): - random_id_string = '' + random_id_string = "" for _ in range(10): random_id_string += random.SystemRandom().choice(ALPHABETS) peer_id = ID(random_id_string) @@ -31,8 +40,9 @@ def test_pretty(): assert actual == expected + def test_str_less_than_10(): - random_id_string = '' + random_id_string = "" for _ in range(5): random_id_string += random.SystemRandom().choice(ALPHABETS) pid = base58.b58encode(random_id_string).decode() @@ -41,8 +51,9 @@ def test_str_less_than_10(): assert actual == expected + def test_str_more_than_10(): - random_id_string = '' + random_id_string = "" for _ in range(10): random_id_string += random.SystemRandom().choice(ALPHABETS) pid = base58.b58encode(random_id_string).decode() @@ -51,8 +62,9 @@ def test_str_more_than_10(): assert actual == expected + def test_eq_true(): - random_id_string = '' + random_id_string = "" for _ in range(10): random_id_string += random.SystemRandom().choice(ALPHABETS) other = ID(random_id_string) @@ -62,6 +74,7 @@ def test_eq_true(): assert actual == expected + def test_eq_false(): other = ID("efgh") @@ -70,8 +83,9 @@ def test_eq_false(): assert actual == expected + def test_hash(): - random_id_string = '' + random_id_string = "" for _ in range(10): random_id_string += random.SystemRandom().choice(ALPHABETS) @@ -80,8 +94,9 @@ def test_hash(): assert actual == expected + def test_id_b58_encode(): - random_id_string = '' + random_id_string = "" for _ in range(10): random_id_string += random.SystemRandom().choice(ALPHABETS) expected = base58.b58encode(random_id_string).decode() @@ -89,8 +104,9 @@ def test_id_b58_encode(): assert actual == expected + def test_id_b58_decode(): - random_id_string = '' + random_id_string = "" for _ in range(10): random_id_string += random.SystemRandom().choice(ALPHABETS) expected = ID(base58.b58decode(random_id_string)) @@ -98,6 +114,7 @@ def test_id_b58_decode(): assert actual == expected + def test_id_from_public_key(): bits_list = [1024, 1280, 1536, 1536, 2048] key = RSA.generate(random.choice(bits_list)) @@ -109,9 +126,9 @@ def test_id_from_public_key(): assert actual == expected + def test_id_from_private_key(): key = RSA.generate(2048, e=65537) id_from_pub = id_from_public_key(key.publickey()) id_from_priv = id_from_private_key(key) assert id_from_pub == id_from_priv - \ No newline at end of file diff --git a/tests/peer/test_peerinfo.py b/tests/peer/test_peerinfo.py index 75a78cef..67b9b802 100644 --- a/tests/peer/test_peerinfo.py +++ b/tests/peer/test_peerinfo.py @@ -9,14 +9,14 @@ from libp2p.peer.peerdata import PeerData from libp2p.peer.id import ID -ALPHABETS = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' +ALPHABETS = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" def test_init_(): peer_data = PeerData() random_addrs = [random.randint(0, 255) for r in range(4)] peer_data.add_addrs(random_addrs) - random_id_string = '' + random_id_string = "" for _ in range(10): random_id_string += random.SystemRandom().choice(ALPHABETS) peer_id = ID(random_id_string) @@ -33,15 +33,16 @@ def test_init_no_value(): @pytest.mark.parametrize( - 'addr', + "addr", ( pytest.param(None), - pytest.param(random.randint(0, 255), id='random integer'), - pytest.param(multiaddr.Multiaddr('/'), id='empty multiaddr'), + pytest.param(random.randint(0, 255), id="random integer"), + pytest.param(multiaddr.Multiaddr("/"), id="empty multiaddr"), pytest.param( - multiaddr.Multiaddr('/ip4/127.0.0.1'), id='multiaddr without peer_id(p2p protocol)' + multiaddr.Multiaddr("/ip4/127.0.0.1"), + id="multiaddr without peer_id(p2p protocol)", ), - ) + ), ) def test_info_from_p2p_addr_invalid(addr): with pytest.raises(InvalidAddrError): @@ -50,8 +51,13 @@ def test_info_from_p2p_addr_invalid(addr): def test_info_from_p2p_addr_valid(): # pylint: disable=line-too-long - m_addr = multiaddr.Multiaddr('/ip4/127.0.0.1/tcp/8000/p2p/3YgLAeMKSAPcGqZkAt8mREqhQXmJT8SN8VCMN4T6ih4GNX9wvK8mWJnWZ1qA2mLdCQ') + m_addr = multiaddr.Multiaddr( + "/ip4/127.0.0.1/tcp/8000/p2p/3YgLAeMKSAPcGqZkAt8mREqhQXmJT8SN8VCMN4T6ih4GNX9wvK8mWJnWZ1qA2mLdCQ" + ) info = info_from_p2p_addr(m_addr) - assert info.peer_id.pretty() == '3YgLAeMKSAPcGqZkAt8mREqhQXmJT8SN8VCMN4T6ih4GNX9wvK8mWJnWZ1qA2mLdCQ' + assert ( + info.peer_id.pretty() + == "3YgLAeMKSAPcGqZkAt8mREqhQXmJT8SN8VCMN4T6ih4GNX9wvK8mWJnWZ1qA2mLdCQ" + ) assert len(info.addrs) == 1 - assert str(info.addrs[0]) == '/ip4/127.0.0.1/tcp/8000' + assert str(info.addrs[0]) == "/ip4/127.0.0.1/tcp/8000" diff --git a/tests/protocol_muxer/test_protocol_muxer.py b/tests/protocol_muxer/test_protocol_muxer.py index 00949faf..aba457b0 100644 --- a/tests/protocol_muxer/test_protocol_muxer.py +++ b/tests/protocol_muxer/test_protocol_muxer.py @@ -12,8 +12,9 @@ from libp2p.protocol_muxer.multiselect_client import MultiselectClientError # when using the same ports across tests -async def perform_simple_test(expected_selected_protocol, - protocols_for_client, protocols_with_handlers): +async def perform_simple_test( + expected_selected_protocol, protocols_for_client, protocols_with_handlers +): transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]] (node_a, node_b) = await set_up_nodes_by_transport_opt(transport_opt_list) @@ -51,15 +52,15 @@ async def perform_simple_test(expected_selected_protocol, @pytest.mark.asyncio async def test_single_protocol_succeeds(): expected_selected_protocol = "/echo/1.0.0" - await perform_simple_test(expected_selected_protocol, - ["/echo/1.0.0"], ["/echo/1.0.0"]) + await perform_simple_test( + expected_selected_protocol, ["/echo/1.0.0"], ["/echo/1.0.0"] + ) @pytest.mark.asyncio async def test_single_protocol_fails(): with pytest.raises(MultiselectClientError): - await perform_simple_test("", ["/echo/1.0.0"], - ["/potato/1.0.0"]) + await perform_simple_test("", ["/echo/1.0.0"], ["/potato/1.0.0"]) # Cleanup not reached on error await cleanup() @@ -70,8 +71,9 @@ async def test_multiple_protocol_first_is_valid_succeeds(): expected_selected_protocol = "/echo/1.0.0" protocols_for_client = ["/echo/1.0.0", "/potato/1.0.0"] protocols_for_listener = ["/foo/1.0.0", "/echo/1.0.0"] - await perform_simple_test(expected_selected_protocol, protocols_for_client, - protocols_for_listener) + await perform_simple_test( + expected_selected_protocol, protocols_for_client, protocols_for_listener + ) @pytest.mark.asyncio @@ -79,8 +81,9 @@ async def test_multiple_protocol_second_is_valid_succeeds(): expected_selected_protocol = "/foo/1.0.0" protocols_for_client = ["/rock/1.0.0", "/foo/1.0.0"] protocols_for_listener = ["/foo/1.0.0", "/echo/1.0.0"] - await perform_simple_test(expected_selected_protocol, protocols_for_client, - protocols_for_listener) + await perform_simple_test( + expected_selected_protocol, protocols_for_client, protocols_for_listener + ) @pytest.mark.asyncio @@ -88,8 +91,7 @@ async def test_multiple_protocol_fails(): protocols_for_client = ["/rock/1.0.0", "/foo/1.0.0", "/bar/1.0.0"] protocols_for_listener = ["/aspyn/1.0.0", "/rob/1.0.0", "/zx/1.0.0", "/alex/1.0.0"] with pytest.raises(MultiselectClientError): - await perform_simple_test("", protocols_for_client, - protocols_for_listener) + await perform_simple_test("", protocols_for_client, protocols_for_listener) # Cleanup not reached on error await cleanup() diff --git a/tests/pubsub/conftest.py b/tests/pubsub/conftest.py index cf8a26e8..93689aee 100644 --- a/tests/pubsub/conftest.py +++ b/tests/pubsub/conftest.py @@ -11,15 +11,12 @@ from libp2p.pubsub.floodsub import FloodSub from libp2p.pubsub.gossipsub import GossipSub from libp2p.pubsub.pubsub import Pubsub -from .configs import ( - FLOODSUB_PROTOCOL_ID, - GOSSIPSUB_PROTOCOL_ID, - LISTEN_MADDR, -) +from .configs import FLOODSUB_PROTOCOL_ID, GOSSIPSUB_PROTOCOL_ID, LISTEN_MADDR # pylint: disable=redefined-outer-name + @pytest.fixture def num_hosts(): return 3 @@ -27,14 +24,12 @@ def num_hosts(): @pytest.fixture async def hosts(num_hosts): - _hosts = await asyncio.gather(*[ - new_node(transport_opt=[str(LISTEN_MADDR)]) - for _ in range(num_hosts) - ]) - await asyncio.gather(*[ - _host.get_network().listen(LISTEN_MADDR) - for _host in _hosts - ]) + _hosts = await asyncio.gather( + *[new_node(transport_opt=[str(LISTEN_MADDR)]) for _ in range(num_hosts)] + ) + await asyncio.gather( + *[_host.get_network().listen(LISTEN_MADDR) for _host in _hosts] + ) yield _hosts # Clean up listeners = [] @@ -42,18 +37,12 @@ async def hosts(num_hosts): for listener in _host.get_network().listeners.values(): listener.server.close() listeners.append(listener) - await asyncio.gather(*[ - listener.server.wait_closed() - for listener in listeners - ]) + await asyncio.gather(*[listener.server.wait_closed() for listener in listeners]) @pytest.fixture def floodsubs(num_hosts): - return tuple( - FloodSub(protocols=[FLOODSUB_PROTOCOL_ID]) - for _ in range(num_hosts) - ) + return tuple(FloodSub(protocols=[FLOODSUB_PROTOCOL_ID]) for _ in range(num_hosts)) class GossipsubParams(NamedTuple): @@ -74,10 +63,7 @@ def gossipsub_params(): @pytest.fixture def gossipsubs(num_hosts, gossipsub_params): yield tuple( - GossipSub( - protocols=[GOSSIPSUB_PROTOCOL_ID], - **gossipsub_params._asdict(), - ) + GossipSub(protocols=[GOSSIPSUB_PROTOCOL_ID], **gossipsub_params._asdict()) for _ in range(num_hosts) ) # TODO: Clean up @@ -90,11 +76,7 @@ def _make_pubsubs(hosts, pubsub_routers): f"length of hosts={len(hosts)}" ) return tuple( - Pubsub( - host=host, - router=router, - my_id=host.get_id(), - ) + Pubsub(host=host, router=router, my_id=host.get_id()) for host, router in zip(hosts, pubsub_routers) ) diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py index 42d17ca6..186b06cc 100644 --- a/tests/pubsub/dummy_account_node.py +++ b/tests/pubsub/dummy_account_node.py @@ -29,15 +29,12 @@ class DummyAccountNode: a dummy crypto blockchain. There is no actual blockchain, just a simple map indicating how much crypto each user in the mappings holds """ + libp2p_node: IHost pubsub: Pubsub floodsub: FloodSub - def __init__( - self, - libp2p_node: IHost, - pubsub: Pubsub, - floodsub: FloodSub): + def __init__(self, libp2p_node: IHost, pubsub: Pubsub, floodsub: FloodSub): self.libp2p_node = libp2p_node self.pubsub = pubsub self.floodsub = floodsub @@ -56,19 +53,13 @@ class DummyAccountNode: """ libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - await libp2p_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + await libp2p_node.get_network().listen( + multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") + ) floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS) - pubsub = Pubsub( - libp2p_node, - floodsub, - "a", - ) - return cls( - libp2p_node=libp2p_node, - pubsub=pubsub, - floodsub=floodsub, - ) + pubsub = Pubsub(libp2p_node, floodsub, "a") + return cls(libp2p_node=libp2p_node, pubsub=pubsub, floodsub=floodsub) async def handle_incoming_msgs(self): """ @@ -76,7 +67,7 @@ class DummyAccountNode: """ while True: incoming = await self.q.get() - msg_comps = incoming.data.decode('utf-8').split(",") + msg_comps = incoming.data.decode("utf-8").split(",") if msg_comps[0] == "send": self.handle_send_crypto(msg_comps[1], msg_comps[2], int(msg_comps[3])) diff --git a/tests/pubsub/floodsub_integration_test_settings.py b/tests/pubsub/floodsub_integration_test_settings.py index 77d96a71..4d9f5436 100644 --- a/tests/pubsub/floodsub_integration_test_settings.py +++ b/tests/pubsub/floodsub_integration_test_settings.py @@ -6,15 +6,9 @@ from libp2p import new_node from libp2p.peer.id import ID from libp2p.pubsub.pubsub import Pubsub -from tests.utils import ( - cleanup, - connect, -) +from tests.utils import cleanup, connect -from .configs import ( - FLOODSUB_PROTOCOL_ID, - LISTEN_MADDR, -) +from .configs import FLOODSUB_PROTOCOL_ID, LISTEN_MADDR SUPPORTED_PROTOCOLS = [FLOODSUB_PROTOCOL_ID] @@ -23,192 +17,84 @@ FLOODSUB_PROTOCOL_TEST_CASES = [ { "name": "simple_two_nodes", "supported_protocols": SUPPORTED_PROTOCOLS, - "adj_list": { - "A": ["B"] - }, - "topic_map": { - "topic1": ["B"] - }, - "messages": [ - { - "topics": ["topic1"], - "data": b"foo", - "node_id": "A" - } - ] + "adj_list": {"A": ["B"]}, + "topic_map": {"topic1": ["B"]}, + "messages": [{"topics": ["topic1"], "data": b"foo", "node_id": "A"}], }, { "name": "three_nodes_two_topics", "supported_protocols": SUPPORTED_PROTOCOLS, - "adj_list": { - "A": ["B"], - "B": ["C"], - }, - "topic_map": { - "topic1": ["B", "C"], - "topic2": ["B", "C"], - }, + "adj_list": {"A": ["B"], "B": ["C"]}, + "topic_map": {"topic1": ["B", "C"], "topic2": ["B", "C"]}, "messages": [ - { - "topics": ["topic1"], - "data": b"foo", - "node_id": "A", - }, - { - "topics": ["topic2"], - "data": b"Alex is tall", - "node_id": "A", - } - ] + {"topics": ["topic1"], "data": b"foo", "node_id": "A"}, + {"topics": ["topic2"], "data": b"Alex is tall", "node_id": "A"}, + ], }, { "name": "two_nodes_one_topic_single_subscriber_is_sender", "supported_protocols": SUPPORTED_PROTOCOLS, - "adj_list": { - "A": ["B"], - }, - "topic_map": { - "topic1": ["B"], - }, - "messages": [ - { - "topics": ["topic1"], - "data": b"Alex is tall", - "node_id": "B", - } - ] + "adj_list": {"A": ["B"]}, + "topic_map": {"topic1": ["B"]}, + "messages": [{"topics": ["topic1"], "data": b"Alex is tall", "node_id": "B"}], }, { "name": "two_nodes_one_topic_two_msgs", "supported_protocols": SUPPORTED_PROTOCOLS, - "adj_list": { - "A": ["B"], - }, - "topic_map": { - "topic1": ["B"], - }, + "adj_list": {"A": ["B"]}, + "topic_map": {"topic1": ["B"]}, "messages": [ - { - "topics": ["topic1"], - "data": b"Alex is tall", - "node_id": "B", - }, - { - "topics": ["topic1"], - "data": b"foo", - "node_id": "A", - } - ] + {"topics": ["topic1"], "data": b"Alex is tall", "node_id": "B"}, + {"topics": ["topic1"], "data": b"foo", "node_id": "A"}, + ], }, { "name": "seven_nodes_tree_one_topics", "supported_protocols": SUPPORTED_PROTOCOLS, - "adj_list": { - "1": ["2", "3"], - "2": ["4", "5"], - "3": ["6", "7"], - }, - "topic_map": { - "astrophysics": ["2", "3", "4", "5", "6", "7"], - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": b"e=mc^2", - "node_id": "1", - } - ] + "adj_list": {"1": ["2", "3"], "2": ["4", "5"], "3": ["6", "7"]}, + "topic_map": {"astrophysics": ["2", "3", "4", "5", "6", "7"]}, + "messages": [{"topics": ["astrophysics"], "data": b"e=mc^2", "node_id": "1"}], }, { "name": "seven_nodes_tree_three_topics", "supported_protocols": SUPPORTED_PROTOCOLS, - "adj_list": { - "1": ["2", "3"], - "2": ["4", "5"], - "3": ["6", "7"], - }, + "adj_list": {"1": ["2", "3"], "2": ["4", "5"], "3": ["6", "7"]}, "topic_map": { "astrophysics": ["2", "3", "4", "5", "6", "7"], "space": ["2", "3", "4", "5", "6", "7"], "onions": ["2", "3", "4", "5", "6", "7"], }, "messages": [ - { - "topics": ["astrophysics"], - "data": b"e=mc^2", - "node_id": "1", - }, - { - "topics": ["space"], - "data": b"foobar", - "node_id": "1", - }, - { - "topics": ["onions"], - "data": b"I am allergic", - "node_id": "1", - } - ] + {"topics": ["astrophysics"], "data": b"e=mc^2", "node_id": "1"}, + {"topics": ["space"], "data": b"foobar", "node_id": "1"}, + {"topics": ["onions"], "data": b"I am allergic", "node_id": "1"}, + ], }, { "name": "seven_nodes_tree_three_topics_diff_origin", "supported_protocols": SUPPORTED_PROTOCOLS, - "adj_list": { - "1": ["2", "3"], - "2": ["4", "5"], - "3": ["6", "7"], - }, + "adj_list": {"1": ["2", "3"], "2": ["4", "5"], "3": ["6", "7"]}, "topic_map": { "astrophysics": ["1", "2", "3", "4", "5", "6", "7"], "space": ["1", "2", "3", "4", "5", "6", "7"], "onions": ["1", "2", "3", "4", "5", "6", "7"], }, "messages": [ - { - "topics": ["astrophysics"], - "data": b"e=mc^2", - "node_id": "1", - }, - { - "topics": ["space"], - "data": b"foobar", - "node_id": "4", - }, - { - "topics": ["onions"], - "data": b"I am allergic", - "node_id": "7", - } - ] + {"topics": ["astrophysics"], "data": b"e=mc^2", "node_id": "1"}, + {"topics": ["space"], "data": b"foobar", "node_id": "4"}, + {"topics": ["onions"], "data": b"I am allergic", "node_id": "7"}, + ], }, { "name": "three_nodes_clique_two_topic_diff_origin", "supported_protocols": SUPPORTED_PROTOCOLS, - "adj_list": { - "1": ["2", "3"], - "2": ["3"], - }, - "topic_map": { - "astrophysics": ["1", "2", "3"], - "school": ["1", "2", "3"], - }, + "adj_list": {"1": ["2", "3"], "2": ["3"]}, + "topic_map": {"astrophysics": ["1", "2", "3"], "school": ["1", "2", "3"]}, "messages": [ - { - "topics": ["astrophysics"], - "data": b"e=mc^2", - "node_id": "1", - }, - { - "topics": ["school"], - "data": b"foobar", - "node_id": "2", - }, - { - "topics": ["astrophysics"], - "data": b"I am allergic", - "node_id": "1", - } - ] + {"topics": ["astrophysics"], "data": b"e=mc^2", "node_id": "1"}, + {"topics": ["school"], "data": b"foobar", "node_id": "2"}, + {"topics": ["astrophysics"], "data": b"I am allergic", "node_id": "1"}, + ], }, { "name": "four_nodes_clique_two_topic_diff_origin_many_msgs", @@ -224,95 +110,33 @@ FLOODSUB_PROTOCOL_TEST_CASES = [ "school": ["1", "2", "3", "4"], }, "messages": [ - { - "topics": ["astrophysics"], - "data": b"e=mc^2", - "node_id": "1", - }, - { - "topics": ["school"], - "data": b"foobar", - "node_id": "2", - }, - { - "topics": ["astrophysics"], - "data": b"I am allergic", - "node_id": "1", - }, - { - "topics": ["school"], - "data": b"foobar2", - "node_id": "2", - }, - { - "topics": ["astrophysics"], - "data": b"I am allergic2", - "node_id": "1", - }, - { - "topics": ["school"], - "data": b"foobar3", - "node_id": "2", - }, - { - "topics": ["astrophysics"], - "data": b"I am allergic3", - "node_id": "1", - } - ] + {"topics": ["astrophysics"], "data": b"e=mc^2", "node_id": "1"}, + {"topics": ["school"], "data": b"foobar", "node_id": "2"}, + {"topics": ["astrophysics"], "data": b"I am allergic", "node_id": "1"}, + {"topics": ["school"], "data": b"foobar2", "node_id": "2"}, + {"topics": ["astrophysics"], "data": b"I am allergic2", "node_id": "1"}, + {"topics": ["school"], "data": b"foobar3", "node_id": "2"}, + {"topics": ["astrophysics"], "data": b"I am allergic3", "node_id": "1"}, + ], }, { "name": "five_nodes_ring_two_topic_diff_origin_many_msgs", "supported_protocols": SUPPORTED_PROTOCOLS, - "adj_list": { - "1": ["2"], - "2": ["3"], - "3": ["4"], - "4": ["5"], - "5": ["1"], - }, + "adj_list": {"1": ["2"], "2": ["3"], "3": ["4"], "4": ["5"], "5": ["1"]}, "topic_map": { "astrophysics": ["1", "2", "3", "4", "5"], "school": ["1", "2", "3", "4", "5"], }, "messages": [ - { - "topics": ["astrophysics"], - "data": b"e=mc^2", - "node_id": "1", - }, - { - "topics": ["school"], - "data": b"foobar", - "node_id": "2", - }, - { - "topics": ["astrophysics"], - "data": b"I am allergic", - "node_id": "1", - }, - { - "topics": ["school"], - "data": b"foobar2", - "node_id": "2", - }, - { - "topics": ["astrophysics"], - "data": b"I am allergic2", - "node_id": "1", - }, - { - "topics": ["school"], - "data": b"foobar3", - "node_id": "2", - }, - { - "topics": ["astrophysics"], - "data": b"I am allergic3", - "node_id": "1", - } - ] - } + {"topics": ["astrophysics"], "data": b"e=mc^2", "node_id": "1"}, + {"topics": ["school"], "data": b"foobar", "node_id": "2"}, + {"topics": ["astrophysics"], "data": b"I am allergic", "node_id": "1"}, + {"topics": ["school"], "data": b"foobar2", "node_id": "2"}, + {"topics": ["astrophysics"], "data": b"I am allergic2", "node_id": "1"}, + {"topics": ["school"], "data": b"foobar3", "node_id": "2"}, + {"topics": ["astrophysics"], "data": b"I am allergic3", "node_id": "1"}, + ], + }, ] # pylint: disable=invalid-name @@ -420,12 +244,7 @@ async def perform_test_from_obj(obj, router_factory): # Publish message # TODO: Should be single RPC package with several topics for topic in topics: - tasks_publish.append( - pubsub_map[node_id].publish( - topic, - data, - ) - ) + tasks_publish.append(pubsub_map[node_id].publish(topic, data)) # For each topic in topics, add (topic, node_id, data) tuple to ordered test list for topic in topics: diff --git a/tests/pubsub/test_dummyaccount_demo.py b/tests/pubsub/test_dummyaccount_demo.py index 1efa579f..c2f22611 100644 --- a/tests/pubsub/test_dummyaccount_demo.py +++ b/tests/pubsub/test_dummyaccount_demo.py @@ -3,10 +3,7 @@ from threading import Thread import pytest -from tests.utils import ( - cleanup, - connect, -) +from tests.utils import cleanup, connect from .dummy_account_node import DummyAccountNode @@ -16,6 +13,7 @@ from .dummy_account_node import DummyAccountNode def create_setup_in_new_thread_func(dummy_node): def setup_in_new_thread(): asyncio.ensure_future(dummy_node.setup_crypto_networking()) + return setup_in_new_thread @@ -39,8 +37,9 @@ async def perform_test(num_nodes, adjacency_map, action_func, assertion_func): for source_num in adjacency_map: target_nums = adjacency_map[source_num] for target_num in target_nums: - await connect(dummy_nodes[source_num].libp2p_node, \ - dummy_nodes[target_num].libp2p_node) + await connect( + dummy_nodes[source_num].libp2p_node, dummy_nodes[target_num].libp2p_node + ) # Allow time for network creation to take place await asyncio.sleep(0.25) @@ -142,6 +141,7 @@ async def test_set_then_send_from_root_seven_nodes_tree_topography(): await perform_test(num_nodes, adj_map, action_func, assertion_func) + @pytest.mark.asyncio async def test_set_then_send_from_different_leafs_seven_nodes_tree_topography(): num_nodes = 7 @@ -158,6 +158,7 @@ async def test_set_then_send_from_different_leafs_seven_nodes_tree_topography(): await perform_test(num_nodes, adj_map, action_func, assertion_func) + @pytest.mark.asyncio async def test_simple_five_nodes_ring_topography(): num_nodes = 5 @@ -171,6 +172,7 @@ async def test_simple_five_nodes_ring_topography(): await perform_test(num_nodes, adj_map, action_func, assertion_func) + @pytest.mark.asyncio async def test_set_then_send_from_diff_nodes_five_nodes_ring_topography(): num_nodes = 5 @@ -187,6 +189,7 @@ async def test_set_then_send_from_diff_nodes_five_nodes_ring_topography(): await perform_test(num_nodes, adj_map, action_func, assertion_func) + @pytest.mark.asyncio async def test_set_then_send_from_five_diff_nodes_five_nodes_ring_topography(): num_nodes = 5 diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index a5876b87..b15a7c79 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -7,15 +7,9 @@ from libp2p.peer.id import ID from libp2p.pubsub.floodsub import FloodSub from libp2p.pubsub.pubsub import Pubsub -from tests.utils import ( - cleanup, - connect, -) +from tests.utils import cleanup, connect -from .configs import ( - FLOODSUB_PROTOCOL_ID, - LISTEN_MADDR, -) +from .configs import FLOODSUB_PROTOCOL_ID, LISTEN_MADDR from .floodsub_integration_test_settings import ( perform_test_from_obj, floodsub_protocol_pytest_params, @@ -85,7 +79,9 @@ async def test_lru_cache_two_nodes(monkeypatch): def get_msg_id(msg): # Originally it is `(msg.seqno, msg.from_id)` return (msg.data, msg.from_id) + import libp2p.pubsub.pubsub + monkeypatch.setattr(libp2p.pubsub.pubsub, "get_msg_id", get_msg_id) # Initialize Pubsub with a cache_size of 4 @@ -104,7 +100,7 @@ async def test_lru_cache_two_nodes(monkeypatch): def _make_testing_data(i: int) -> bytes: num_int_bytes = 4 - if i >= 2**(num_int_bytes * 8): + if i >= 2 ** (num_int_bytes * 8): raise ValueError("integer is too large to be serialized") return b"data" + i.to_bytes(num_int_bytes, "big") @@ -121,13 +117,7 @@ async def test_lru_cache_two_nodes(monkeypatch): await cleanup() -@pytest.mark.parametrize( - "test_case_obj", - floodsub_protocol_pytest_params, -) +@pytest.mark.parametrize("test_case_obj", floodsub_protocol_pytest_params) @pytest.mark.asyncio async def test_gossipsub_run_with_floodsub_tests(test_case_obj): - await perform_test_from_obj( - test_case_obj, - FloodSub, - ) + await perform_test_from_obj(test_case_obj, FloodSub) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index e0fed92d..58ed0e16 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -3,10 +3,7 @@ import random import pytest -from tests.utils import ( - cleanup, - connect, -) +from tests.utils import cleanup, connect from .configs import GOSSIPSUB_PROTOCOL_ID from .utils import ( @@ -28,9 +25,9 @@ async def test_join(): libp2p_hosts = await create_libp2p_hosts(num_hosts) # Create pubsub, gossipsub instances - pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ - SUPPORTED_PROTOCOLS, \ - 4, 3, 5, 30, 3, 5, 0.5) + pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances( + libp2p_hosts, SUPPORTED_PROTOCOLS, 4, 3, 5, 30, 3, 5, 0.5 + ) topic = "test_join" central_node_index = 0 @@ -69,10 +66,19 @@ async def test_join(): for i in hosts_indices: if i in subscribed_peer_indices: - assert str(libp2p_hosts[i].get_id()) in gossipsubs[central_node_index].mesh[topic] - assert str(libp2p_hosts[central_node_index].get_id()) in gossipsubs[i].mesh[topic] + assert ( + str(libp2p_hosts[i].get_id()) + in gossipsubs[central_node_index].mesh[topic] + ) + assert ( + str(libp2p_hosts[central_node_index].get_id()) + in gossipsubs[i].mesh[topic] + ) else: - assert str(libp2p_hosts[i].get_id()) not in gossipsubs[central_node_index].mesh[topic] + assert ( + str(libp2p_hosts[i].get_id()) + not in gossipsubs[central_node_index].mesh[topic] + ) assert topic not in gossipsubs[i].mesh await cleanup() @@ -84,9 +90,9 @@ async def test_leave(): libp2p_hosts = await create_libp2p_hosts(num_hosts) # Create pubsub, gossipsub instances - _, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ - SUPPORTED_PROTOCOLS, \ - 10, 9, 11, 30, 3, 5, 0.5) + _, gossipsubs = create_pubsub_and_gossipsub_instances( + libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5 + ) gossipsub = gossipsubs[0] topic = "test_leave" @@ -111,9 +117,9 @@ async def test_handle_graft(event_loop, monkeypatch): libp2p_hosts = await create_libp2p_hosts(num_hosts) # Create pubsub, gossipsub instances - _, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ - SUPPORTED_PROTOCOLS, \ - 10, 9, 11, 30, 3, 5, 0.5) + _, gossipsubs = create_pubsub_and_gossipsub_instances( + libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5 + ) index_alice = 0 id_alice = str(libp2p_hosts[index_alice].get_id()) @@ -131,10 +137,11 @@ async def test_handle_graft(event_loop, monkeypatch): # Monkey patch bob's `emit_prune` function so we can # check if it is called in `handle_graft` event_emit_prune = asyncio.Event() + async def emit_prune(topic, sender_peer_id): event_emit_prune.set() - monkeypatch.setattr(gossipsubs[index_bob], 'emit_prune', emit_prune) + monkeypatch.setattr(gossipsubs[index_bob], "emit_prune", emit_prune) # Check that alice is bob's peer but not his mesh peer assert id_alice in gossipsubs[index_bob].peers_gossipsub @@ -143,11 +150,7 @@ async def test_handle_graft(event_loop, monkeypatch): await gossipsubs[index_alice].emit_graft(topic, id_bob) # Check that `emit_prune` is called - await asyncio.wait_for( - event_emit_prune.wait(), - timeout=1, - loop=event_loop, - ) + await asyncio.wait_for(event_emit_prune.wait(), timeout=1, loop=event_loop) assert event_emit_prune.is_set() # Check that bob is alice's peer but not her mesh peer @@ -171,9 +174,9 @@ async def test_handle_prune(): libp2p_hosts = await create_libp2p_hosts(num_hosts) # Create pubsub, gossipsub instances - pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ - SUPPORTED_PROTOCOLS, \ - 10, 9, 11, 30, 3, 5, 3) + pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances( + libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 3 + ) index_alice = 0 id_alice = str(libp2p_hosts[index_alice].get_id()) @@ -217,9 +220,9 @@ async def test_dense(): libp2p_hosts = await create_libp2p_hosts(num_hosts) # Create pubsub, gossipsub instances - pubsubs, _ = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ - SUPPORTED_PROTOCOLS, \ - 10, 9, 11, 30, 3, 5, 0.5) + pubsubs, _ = create_pubsub_and_gossipsub_instances( + libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5 + ) # All pubsub subscribe to foobar queues = [] @@ -236,7 +239,7 @@ async def test_dense(): await asyncio.sleep(2) for i in range(num_msgs): - msg_content = b"foo " + i.to_bytes(1, 'big') + msg_content = b"foo " + i.to_bytes(1, "big") # randomly pick a message origin origin_idx = random.randint(0, num_hosts - 1) @@ -260,9 +263,9 @@ async def test_fanout(): libp2p_hosts = await create_libp2p_hosts(num_hosts) # Create pubsub, gossipsub instances - pubsubs, _ = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ - SUPPORTED_PROTOCOLS, \ - 10, 9, 11, 30, 3, 5, 0.5) + pubsubs, _ = create_pubsub_and_gossipsub_instances( + libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5 + ) # All pubsub subscribe to foobar except for `pubsubs[0]` queues = [] @@ -300,7 +303,7 @@ async def test_fanout(): # Send messages again for i in range(num_msgs): - msg_content = b"bar " + i.to_bytes(1, 'big') + msg_content = b"bar " + i.to_bytes(1, "big") # Pick the message origin to the node that is not subscribed to 'foobar' origin_idx = 0 @@ -316,6 +319,7 @@ async def test_fanout(): await cleanup() + @pytest.mark.asyncio async def test_fanout_maintenance(): # Create libp2p hosts @@ -324,9 +328,9 @@ async def test_fanout_maintenance(): libp2p_hosts = await create_libp2p_hosts(num_hosts) # Create pubsub, gossipsub instances - pubsubs, _ = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ - SUPPORTED_PROTOCOLS, \ - 10, 9, 11, 30, 3, 5, 0.5) + pubsubs, _ = create_pubsub_and_gossipsub_instances( + libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5 + ) # All pubsub subscribe to foobar queues = [] @@ -345,7 +349,7 @@ async def test_fanout_maintenance(): # Send messages with origin not subscribed for i in range(num_msgs): - msg_content = b"foo " + i.to_bytes(1, 'big') + msg_content = b"foo " + i.to_bytes(1, "big") # Pick the message origin to the node that is not subscribed to 'foobar' origin_idx = 0 @@ -377,7 +381,7 @@ async def test_fanout_maintenance(): # Check messages can still be sent for i in range(num_msgs): - msg_content = b"bar " + i.to_bytes(1, 'big') + msg_content = b"bar " + i.to_bytes(1, "big") # Pick the message origin to the node that is not subscribed to 'foobar' origin_idx = 0 @@ -402,22 +406,14 @@ async def test_gossip_propagation(): # Create pubsub, gossipsub instances pubsubs, _ = create_pubsub_and_gossipsub_instances( - hosts, - SUPPORTED_PROTOCOLS, - 1, - 0, - 2, - 30, - 50, - 100, - 0.5, + hosts, SUPPORTED_PROTOCOLS, 1, 0, 2, 30, 50, 100, 0.5 ) topic = "foo" await pubsubs[0].subscribe(topic) # node 0 publish to topic - msg_content = b'foo_msg' + msg_content = b"foo_msg" # publish from the randomly chosen host await pubsubs[0].publish(topic, msg_content) diff --git a/tests/pubsub/test_gossipsub_backward_compatibility.py b/tests/pubsub/test_gossipsub_backward_compatibility.py index 2a3167bf..855a0ccd 100644 --- a/tests/pubsub/test_gossipsub_backward_compatibility.py +++ b/tests/pubsub/test_gossipsub_backward_compatibility.py @@ -8,10 +8,7 @@ from libp2p.pubsub.pubsub import Pubsub from tests.utils import cleanup -from .configs import ( - FLOODSUB_PROTOCOL_ID, - LISTEN_MADDR, -) +from .configs import FLOODSUB_PROTOCOL_ID, LISTEN_MADDR from .floodsub_integration_test_settings import ( perform_test_from_obj, floodsub_protocol_pytest_params, @@ -34,19 +31,12 @@ async def test_gossipsub_initialize_with_floodsub_protocol(): await cleanup() -@pytest.mark.parametrize( - "test_case_obj", - floodsub_protocol_pytest_params, -) +@pytest.mark.parametrize("test_case_obj", floodsub_protocol_pytest_params) @pytest.mark.asyncio async def test_gossipsub_run_with_floodsub_tests(test_case_obj): await perform_test_from_obj( test_case_obj, functools.partial( - GossipSub, - degree=3, - degree_low=2, - degree_high=4, - time_to_live=30, - ) + GossipSub, degree=3, degree_low=2, degree_high=4, time_to_live=30 + ), ) diff --git a/tests/pubsub/test_mcache.py b/tests/pubsub/test_mcache.py index ebfe9570..7ef0c918 100644 --- a/tests/pubsub/test_mcache.py +++ b/tests/pubsub/test_mcache.py @@ -33,7 +33,7 @@ async def test_mcache(): # successful read assert get_msg == msg - gids = mcache.window('test') + gids = mcache.window("test") assert len(gids) == 10 @@ -55,7 +55,7 @@ async def test_mcache(): assert get_msg == msg - gids = mcache.window('test') + gids = mcache.window("test") assert len(gids) == 20 @@ -108,7 +108,7 @@ async def test_mcache(): assert get_msg == msg - gids = mcache.window('test') + gids = mcache.window("test") assert len(gids) == 30 diff --git a/tests/pubsub/test_pubsub.py b/tests/pubsub/test_pubsub.py index 874922ed..f7023ada 100644 --- a/tests/pubsub/test_pubsub.py +++ b/tests/pubsub/test_pubsub.py @@ -7,23 +7,16 @@ import pytest from libp2p.peer.id import ID from libp2p.pubsub.pb import rpc_pb2 -from tests.utils import ( - connect, -) +from tests.utils import connect -from .utils import ( - make_pubsub_msg, -) +from .utils import make_pubsub_msg TESTING_TOPIC = "TEST_SUBSCRIBE" TESTING_DATA = b"data" -@pytest.mark.parametrize( - "num_hosts", - (1,), -) +@pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.asyncio async def test_subscribe_and_unsubscribe(pubsubs_fsub): await pubsubs_fsub[0].subscribe(TESTING_TOPIC) @@ -33,10 +26,7 @@ async def test_subscribe_and_unsubscribe(pubsubs_fsub): assert TESTING_TOPIC not in pubsubs_fsub[0].my_topics -@pytest.mark.parametrize( - "num_hosts", - (1,), -) +@pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.asyncio async def test_re_subscribe(pubsubs_fsub): await pubsubs_fsub[0].subscribe(TESTING_TOPIC) @@ -46,10 +36,7 @@ async def test_re_subscribe(pubsubs_fsub): assert TESTING_TOPIC in pubsubs_fsub[0].my_topics -@pytest.mark.parametrize( - "num_hosts", - (1,), -) +@pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.asyncio async def test_re_unsubscribe(pubsubs_fsub): # Unsubscribe from topic we didn't even subscribe to @@ -80,19 +67,13 @@ async def test_peers_subscribe(pubsubs_fsub): assert str(pubsubs_fsub[0].my_id) not in pubsubs_fsub[1].peer_topics[TESTING_TOPIC] -@pytest.mark.parametrize( - "num_hosts", - (1,), -) +@pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.asyncio async def test_get_hello_packet(pubsubs_fsub): def _get_hello_packet_topic_ids(): packet = rpc_pb2.RPC() packet.ParseFromString(pubsubs_fsub[0].get_hello_packet()) - return tuple( - sub.topicid - for sub in packet.subscriptions - ) + return tuple(sub.topicid for sub in packet.subscriptions) # pylint: disable=len-as-condition # Test: No subscription, so there should not be any topic ids in the hello packet. @@ -100,10 +81,7 @@ async def test_get_hello_packet(pubsubs_fsub): # Test: After subscriptions, topic ids should be in the hello packet. topic_ids = ["t", "o", "p", "i", "c"] - await asyncio.gather(*[ - pubsubs_fsub[0].subscribe(topic) - for topic in topic_ids - ]) + await asyncio.gather(*[pubsubs_fsub[0].subscribe(topic) for topic in topic_ids]) topic_ids_in_hello = _get_hello_packet_topic_ids() for topic in topic_ids: assert topic in topic_ids_in_hello @@ -128,14 +106,11 @@ class FakeNetStream: async def write(self, data: bytes) -> int: for i in data: - await self._queue.put(i.to_bytes(1, 'big')) + await self._queue.put(i.to_bytes(1, "big")) return len(data) -@pytest.mark.parametrize( - "num_hosts", - (1,), -) +@pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.asyncio async def test_continuously_read_stream(pubsubs_fsub, monkeypatch): stream = FakeNetStream() @@ -156,7 +131,9 @@ async def test_continuously_read_stream(pubsubs_fsub, monkeypatch): event_handle_rpc.set() monkeypatch.setattr(pubsubs_fsub[0], "push_msg", mock_push_msg) - monkeypatch.setattr(pubsubs_fsub[0], "handle_subscription", mock_handle_subscription) + monkeypatch.setattr( + pubsubs_fsub[0], "handle_subscription", mock_handle_subscription + ) monkeypatch.setattr(pubsubs_fsub[0].router, "handle_rpc", mock_handle_rpc) async def wait_for_event_occurring(event): @@ -176,9 +153,7 @@ async def test_continuously_read_stream(pubsubs_fsub, monkeypatch): # Test: `push_msg` is called when publishing to a subscribed topic. publish_subscribed_topic = rpc_pb2.RPC( - publish=[rpc_pb2.Message( - topicIDs=[TESTING_TOPIC] - )], + publish=[rpc_pb2.Message(topicIDs=[TESTING_TOPIC])] ) await stream.write(publish_subscribed_topic.SerializeToString()) await wait_for_event_occurring(event_push_msg) @@ -190,18 +165,14 @@ async def test_continuously_read_stream(pubsubs_fsub, monkeypatch): # Test: `push_msg` is not called when publishing to a topic-not-subscribed. publish_not_subscribed_topic = rpc_pb2.RPC( - publish=[rpc_pb2.Message( - topicIDs=["NOT_SUBSCRIBED"] - )], + publish=[rpc_pb2.Message(topicIDs=["NOT_SUBSCRIBED"])] ) await stream.write(publish_not_subscribed_topic.SerializeToString()) with pytest.raises(asyncio.TimeoutError): await wait_for_event_occurring(event_push_msg) # Test: `handle_subscription` is called when a subscription message is received. - subscription_msg = rpc_pb2.RPC( - subscriptions=[rpc_pb2.RPC.SubOpts()], - ) + subscription_msg = rpc_pb2.RPC(subscriptions=[rpc_pb2.RPC.SubOpts()]) await stream.write(subscription_msg.SerializeToString()) await wait_for_event_occurring(event_handle_subscription) # Make sure the other events are not emitted. @@ -229,23 +200,17 @@ async def test_continuously_read_stream(pubsubs_fsub, monkeypatch): # - `test_handle_peer_queue` -@pytest.mark.parametrize( - "num_hosts", - (1,), -) +@pytest.mark.parametrize("num_hosts", (1,)) def test_handle_subscription(pubsubs_fsub): assert len(pubsubs_fsub[0].peer_topics) == 0 - sub_msg_0 = rpc_pb2.RPC.SubOpts( - subscribe=True, - topicid=TESTING_TOPIC, - ) - peer_ids = [ - ID(b"\x12\x20" + i.to_bytes(32, "big")) - for i in range(2) - ] + sub_msg_0 = rpc_pb2.RPC.SubOpts(subscribe=True, topicid=TESTING_TOPIC) + peer_ids = [ID(b"\x12\x20" + i.to_bytes(32, "big")) for i in range(2)] # Test: One peer is subscribed pubsubs_fsub[0].handle_subscription(peer_ids[0], sub_msg_0) - assert len(pubsubs_fsub[0].peer_topics) == 1 and TESTING_TOPIC in pubsubs_fsub[0].peer_topics + assert ( + len(pubsubs_fsub[0].peer_topics) == 1 + and TESTING_TOPIC in pubsubs_fsub[0].peer_topics + ) assert len(pubsubs_fsub[0].peer_topics[TESTING_TOPIC]) == 1 assert str(peer_ids[0]) in pubsubs_fsub[0].peer_topics[TESTING_TOPIC] # Test: Another peer is subscribed @@ -255,27 +220,18 @@ def test_handle_subscription(pubsubs_fsub): assert str(peer_ids[1]) in pubsubs_fsub[0].peer_topics[TESTING_TOPIC] # Test: Subscribe to another topic another_topic = "ANOTHER_TOPIC" - sub_msg_1 = rpc_pb2.RPC.SubOpts( - subscribe=True, - topicid=another_topic, - ) + sub_msg_1 = rpc_pb2.RPC.SubOpts(subscribe=True, topicid=another_topic) pubsubs_fsub[0].handle_subscription(peer_ids[0], sub_msg_1) assert len(pubsubs_fsub[0].peer_topics) == 2 assert another_topic in pubsubs_fsub[0].peer_topics assert str(peer_ids[0]) in pubsubs_fsub[0].peer_topics[another_topic] # Test: unsubscribe - unsub_msg = rpc_pb2.RPC.SubOpts( - subscribe=False, - topicid=TESTING_TOPIC, - ) + unsub_msg = rpc_pb2.RPC.SubOpts(subscribe=False, topicid=TESTING_TOPIC) pubsubs_fsub[0].handle_subscription(peer_ids[0], unsub_msg) assert str(peer_ids[0]) not in pubsubs_fsub[0].peer_topics[TESTING_TOPIC] -@pytest.mark.parametrize( - "num_hosts", - (1,), -) +@pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.asyncio async def test_handle_talk(pubsubs_fsub): sub = await pubsubs_fsub[0].subscribe(TESTING_TOPIC) @@ -293,25 +249,19 @@ async def test_handle_talk(pubsubs_fsub): seqno=b"\x11" * 8, ) await pubsubs_fsub[0].handle_talk(msg_1) - assert len(pubsubs_fsub[0].my_topics) == 1 and sub == pubsubs_fsub[0].my_topics[TESTING_TOPIC] + assert ( + len(pubsubs_fsub[0].my_topics) == 1 + and sub == pubsubs_fsub[0].my_topics[TESTING_TOPIC] + ) assert sub.qsize() == 1 assert (await sub.get()) == msg_0 -@pytest.mark.parametrize( - "num_hosts", - (1,), -) +@pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.asyncio async def test_message_all_peers(pubsubs_fsub, monkeypatch): - peer_ids = [ - ID(b"\x12\x20" + i.to_bytes(32, "big")) - for i in range(10) - ] - mock_peers = { - str(peer_id): FakeNetStream() - for peer_id in peer_ids - } + peer_ids = [ID(b"\x12\x20" + i.to_bytes(32, "big")) for i in range(10)] + mock_peers = {str(peer_id): FakeNetStream() for peer_id in peer_ids} monkeypatch.setattr(pubsubs_fsub[0], "peers", mock_peers) empty_rpc = rpc_pb2.RPC() @@ -320,10 +270,7 @@ async def test_message_all_peers(pubsubs_fsub, monkeypatch): assert (await stream.read()) == empty_rpc.SerializeToString() -@pytest.mark.parametrize( - "num_hosts", - (1,), -) +@pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.asyncio async def test_publish(pubsubs_fsub, monkeypatch): msg_forwarders = [] @@ -332,20 +279,20 @@ async def test_publish(pubsubs_fsub, monkeypatch): async def push_msg(msg_forwarder, msg): msg_forwarders.append(msg_forwarder) msgs.append(msg) + monkeypatch.setattr(pubsubs_fsub[0], "push_msg", push_msg) await pubsubs_fsub[0].publish(TESTING_TOPIC, TESTING_DATA) await pubsubs_fsub[0].publish(TESTING_TOPIC, TESTING_DATA) assert len(msgs) == 2, "`push_msg` should be called every time `publish` is called" - assert (msg_forwarders[0] == msg_forwarders[1]) and (msg_forwarders[1] == pubsubs_fsub[0].my_id) + assert (msg_forwarders[0] == msg_forwarders[1]) and ( + msg_forwarders[1] == pubsubs_fsub[0].my_id + ) assert msgs[0].seqno != msgs[1].seqno, "`seqno` should be different every time" -@pytest.mark.parametrize( - "num_hosts", - (1,), -) +@pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.asyncio async def test_push_msg(pubsubs_fsub, monkeypatch): # pylint: disable=protected-access @@ -360,6 +307,7 @@ async def test_push_msg(pubsubs_fsub, monkeypatch): async def router_publish(*args, **kwargs): event.set() + monkeypatch.setattr(pubsubs_fsub[0].router, "publish", router_publish) # Test: `msg` is not seen before `push_msg`, and is seen after `push_msg`. diff --git a/tests/pubsub/utils.py b/tests/pubsub/utils.py index b83c854c..c74d7336 100644 --- a/tests/pubsub/utils.py +++ b/tests/pubsub/utils.py @@ -1,8 +1,6 @@ import asyncio import struct -from typing import ( - Sequence, -) +from typing import Sequence import multiaddr @@ -33,21 +31,16 @@ def message_id_generator(start_val): val += 1 # Convert val to big endian - return struct.pack('>Q', val) + return struct.pack(">Q", val) return generator def make_pubsub_msg( - origin_id: ID, - topic_ids: Sequence[str], - data: bytes, - seqno: bytes) -> rpc_pb2.Message: + origin_id: ID, topic_ids: Sequence[str], data: bytes, seqno: bytes +) -> rpc_pb2.Message: return rpc_pb2.Message( - from_id=origin_id.to_bytes(), - seqno=seqno, - data=data, - topicIDs=list(topic_ids), + from_id=origin_id.to_bytes(), seqno=seqno, data=data, topicIDs=list(topic_ids) ) @@ -61,13 +54,13 @@ def generate_RPC_packet(origin_id, topics, msg_content, msg_id): """ packet = rpc_pb2.RPC() message = rpc_pb2.Message( - from_id=origin_id.encode('utf-8'), + from_id=origin_id.encode("utf-8"), seqno=msg_id, - data=msg_content.encode('utf-8'), + data=msg_content.encode("utf-8"), ) for topic in topics: - message.topicIDs.extend([topic.encode('utf-8')]) + message.topicIDs.extend([topic.encode("utf-8")]) packet.publish.extend([message]) return packet @@ -95,22 +88,29 @@ async def create_libp2p_hosts(num_hosts): def create_pubsub_and_gossipsub_instances( - libp2p_hosts, - supported_protocols, - degree, - degree_low, - degree_high, - time_to_live, - gossip_window, - gossip_history, - heartbeat_interval): + libp2p_hosts, + supported_protocols, + degree, + degree_low, + degree_high, + time_to_live, + gossip_window, + gossip_history, + heartbeat_interval, +): pubsubs = [] gossipsubs = [] for node in libp2p_hosts: - gossipsub = GossipSub(supported_protocols, degree, - degree_low, degree_high, time_to_live, - gossip_window, gossip_history, - heartbeat_interval) + gossipsub = GossipSub( + supported_protocols, + degree, + degree_low, + degree_high, + time_to_live, + gossip_window, + gossip_history, + heartbeat_interval, + ) pubsub = Pubsub(node, gossipsub, node.get_id()) pubsubs.append(pubsub) gossipsubs.append(gossipsub) @@ -121,6 +121,7 @@ def create_pubsub_and_gossipsub_instances( # FIXME: There is no difference between `sparse_connect` and `dense_connect`, # before `connect_some` is fixed. + async def sparse_connect(hosts): await connect_some(hosts, 3) diff --git a/tests/routing/test_kad_peer_router.py b/tests/routing/test_kad_peer_router.py index 7f639fbc..0581544b 100644 --- a/tests/routing/test_kad_peer_router.py +++ b/tests/routing/test_kad_peer_router.py @@ -3,6 +3,7 @@ import pytest from libp2p.kademlia.network import KademliaServer from libp2p.routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter + @pytest.mark.asyncio async def test_simple_two_nodes(): node_a = KademliaServer() @@ -13,8 +14,7 @@ async def test_simple_two_nodes(): node_a_value = await node_b.bootstrap([("127.0.0.1", 5678)]) node_a_kad_peerinfo = node_a_value[0] - await node_a.set(node_a_kad_peerinfo.xor_id, - repr(node_a_kad_peerinfo)) + await node_a.set(node_a_kad_peerinfo.xor_id, repr(node_a_kad_peerinfo)) router = KadmeliaPeerRouter(node_b) returned_info = await router.find_peer(node_a_kad_peerinfo.peer_id_obj) @@ -22,6 +22,7 @@ async def test_simple_two_nodes(): print(repr(node_a_kad_peerinfo)) assert repr(returned_info) == repr(node_a_kad_peerinfo) + @pytest.mark.asyncio async def test_simple_three_nodes(): node_a = KademliaServer() @@ -37,13 +38,13 @@ async def test_simple_three_nodes(): node_a_kad_peerinfo = node_a_value[0] await node_c.bootstrap([("127.0.0.1", 5702)]) - await node_a.set(node_a_kad_peerinfo.xor_id, - repr(node_a_kad_peerinfo)) + await node_a.set(node_a_kad_peerinfo.xor_id, repr(node_a_kad_peerinfo)) router = KadmeliaPeerRouter(node_c) returned_info = await router.find_peer(node_a_kad_peerinfo.peer_id_obj) assert str(returned_info) == str(node_a_kad_peerinfo) + @pytest.mark.asyncio async def test_simple_four_nodes(): node_a = KademliaServer() @@ -65,8 +66,7 @@ async def test_simple_four_nodes(): await node_d.bootstrap([("127.0.0.1", 5803)]) - await node_b.set(node_a_kad_peerinfo.xor_id, - repr(node_a_kad_peerinfo)) + await node_b.set(node_a_kad_peerinfo.xor_id, repr(node_a_kad_peerinfo)) router = KadmeliaPeerRouter(node_d) returned_info = await router.find_peer(node_a_kad_peerinfo.peer_id_obj) diff --git a/tests/security/test_security_multistream.py b/tests/security/test_security_multistream.py index ed1ad299..fe9409d3 100644 --- a/tests/security/test_security_multistream.py +++ b/tests/security/test_security_multistream.py @@ -12,11 +12,13 @@ from tests.utils import cleanup # TODO: Add tests for multiple streams being opened on different # protocols through the same connection + def peer_id_for_node(node): addr = node.get_addrs()[0] info = info_from_p2p_addr(addr) return info.peer_id + async def connect(node1, node2): """ Connect node1 to node2 @@ -25,8 +27,10 @@ async def connect(node1, node2): info = info_from_p2p_addr(addr) await node1.connect(info) -async def perform_simple_test(assertion_func, \ - transports_for_initiator, transports_for_noninitiator): + +async def perform_simple_test( + assertion_func, transports_for_initiator, transports_for_noninitiator +): # Create libp2p nodes and connect them, then secure the connection, then check # the proper security was chosen @@ -69,8 +73,10 @@ async def test_single_insecure_security_transport_succeeds(): def assertion_func(details): assert details["id"] == "foo" - await perform_simple_test(assertion_func, - transports_for_initiator, transports_for_noninitiator) + await perform_simple_test( + assertion_func, transports_for_initiator, transports_for_noninitiator + ) + @pytest.mark.asyncio async def test_single_simple_test_security_transport_succeeds(): @@ -80,63 +86,84 @@ async def test_single_simple_test_security_transport_succeeds(): def assertion_func(details): assert details["key_phrase"] == "tacos" - await perform_simple_test(assertion_func, - transports_for_initiator, transports_for_noninitiator) + await perform_simple_test( + assertion_func, transports_for_initiator, transports_for_noninitiator + ) + @pytest.mark.asyncio async def test_two_simple_test_security_transport_for_initiator_succeeds(): - transports_for_initiator = {"tacos": SimpleSecurityTransport("tacos"), - "shleep": SimpleSecurityTransport("shleep")} + transports_for_initiator = { + "tacos": SimpleSecurityTransport("tacos"), + "shleep": SimpleSecurityTransport("shleep"), + } transports_for_noninitiator = {"shleep": SimpleSecurityTransport("shleep")} def assertion_func(details): assert details["key_phrase"] == "shleep" - await perform_simple_test(assertion_func, - transports_for_initiator, transports_for_noninitiator) + await perform_simple_test( + assertion_func, transports_for_initiator, transports_for_noninitiator + ) + @pytest.mark.asyncio async def test_two_simple_test_security_transport_for_noninitiator_succeeds(): transports_for_initiator = {"tacos": SimpleSecurityTransport("tacos")} - transports_for_noninitiator = {"shleep": SimpleSecurityTransport("shleep"), - "tacos": SimpleSecurityTransport("tacos")} + transports_for_noninitiator = { + "shleep": SimpleSecurityTransport("shleep"), + "tacos": SimpleSecurityTransport("tacos"), + } def assertion_func(details): assert details["key_phrase"] == "tacos" - await perform_simple_test(assertion_func, - transports_for_initiator, transports_for_noninitiator) + await perform_simple_test( + assertion_func, transports_for_initiator, transports_for_noninitiator + ) @pytest.mark.asyncio async def test_two_simple_test_security_transport_for_both_succeeds(): - transports_for_initiator = {"a": SimpleSecurityTransport("a"), - "b": SimpleSecurityTransport("b")} - transports_for_noninitiator = {"c": SimpleSecurityTransport("c"), - "b": SimpleSecurityTransport("b")} + transports_for_initiator = { + "a": SimpleSecurityTransport("a"), + "b": SimpleSecurityTransport("b"), + } + transports_for_noninitiator = { + "c": SimpleSecurityTransport("c"), + "b": SimpleSecurityTransport("b"), + } def assertion_func(details): assert details["key_phrase"] == "b" - await perform_simple_test(assertion_func, - transports_for_initiator, transports_for_noninitiator) + await perform_simple_test( + assertion_func, transports_for_initiator, transports_for_noninitiator + ) + @pytest.mark.asyncio async def test_multiple_security_none_the_same_fails(): - transports_for_initiator = {"a": SimpleSecurityTransport("a"), - "b": SimpleSecurityTransport("b")} - transports_for_noninitiator = {"c": SimpleSecurityTransport("c"), - "d": SimpleSecurityTransport("d")} + transports_for_initiator = { + "a": SimpleSecurityTransport("a"), + "b": SimpleSecurityTransport("b"), + } + transports_for_noninitiator = { + "c": SimpleSecurityTransport("c"), + "d": SimpleSecurityTransport("d"), + } def assertion_func(_): assert False with pytest.raises(MultiselectClientError): - await perform_simple_test(assertion_func, - transports_for_initiator, transports_for_noninitiator) + await perform_simple_test( + assertion_func, transports_for_initiator, transports_for_noninitiator + ) await cleanup() + @pytest.mark.asyncio async def test_default_insecure_security(): transports_for_initiator = None @@ -155,5 +182,6 @@ async def test_default_insecure_security(): else: assert details1 == details2 - await perform_simple_test(assertion_func, - transports_for_initiator, transports_for_noninitiator) + await perform_simple_test( + assertion_func, transports_for_initiator, transports_for_noninitiator + ) diff --git a/tests/test_example.py b/tests/test_example.py index 034e1e5e..a31fa64e 100644 --- a/tests/test_example.py +++ b/tests/test_example.py @@ -2,18 +2,12 @@ import pytest # pylint: disable=eval-used -@pytest.mark.parametrize("test_input,expected", [ - ("3+5", 8), - ("2+4", 6), - ("6*9", 54), -]) + +@pytest.mark.parametrize("test_input,expected", [("3+5", 8), ("2+4", 6), ("6*9", 54)]) def test_eval(test_input, expected): assert eval(test_input) == expected -@pytest.mark.parametrize("test_input,expected", [ - ("3+5", 8), - ("2+4", 6), - ("6*5", 30), -]) + +@pytest.mark.parametrize("test_input,expected", [("3+5", 8), ("2+4", 6), ("6*5", 30)]) def test_eval2(test_input, expected): assert eval(test_input) == expected diff --git a/tests/transport/test_tcp.py b/tests/transport/test_tcp.py index 38229262..7231a060 100644 --- a/tests/transport/test_tcp.py +++ b/tests/transport/test_tcp.py @@ -10,11 +10,11 @@ async def test_multiaddr_from_socket(): def handler(r, w): pass - server = await asyncio.start_server(handler, '127.0.0.1', 8000) - assert str(_multiaddr_from_socket(server.sockets[0])) == '/ip4/127.0.0.1/tcp/8000' + server = await asyncio.start_server(handler, "127.0.0.1", 8000) + assert str(_multiaddr_from_socket(server.sockets[0])) == "/ip4/127.0.0.1/tcp/8000" - server = await asyncio.start_server(handler, '127.0.0.1', 0) + server = await asyncio.start_server(handler, "127.0.0.1", 0) addr = _multiaddr_from_socket(server.sockets[0]) - assert addr.value_for_protocol('ip4') == '127.0.0.1' - port = addr.value_for_protocol('tcp') + assert addr.value_for_protocol("ip4") == "127.0.0.1" + port = addr.value_for_protocol("tcp") assert int(port) > 0 diff --git a/tests/utils.py b/tests/utils.py index 6e2d64ca..cc914c04 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -36,6 +36,7 @@ async def set_up_nodes_by_transport_opt(transport_opt_list): nodes_list.append(node) return tuple(nodes_list) + async def echo_stream_handler(stream): while True: read_string = (await stream.read()).decode() @@ -43,6 +44,7 @@ async def echo_stream_handler(stream): resp = "ack:" + read_string await stream.write(resp.encode()) + async def perform_two_host_set_up_custom_handler(handler): transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]] (node_a, node_b) = await set_up_nodes_by_transport_opt(transport_opt_list)