diff --git a/Makefile b/Makefile index 59422bfd..da54ffcf 100644 --- a/Makefile +++ b/Makefile @@ -9,11 +9,13 @@ all: protobufs format: black $(FILES_TO_LINT) isort --recursive $(FILES_TO_LINT) + docformatter -ir --pre-summary-newline $(FILES_TO_LINT) lintroll: mypy -p libp2p -p examples --config-file mypy.ini black --check $(FILES_TO_LINT) isort --recursive --check-only $(FILES_TO_LINT) + docformatter --pre-summary-newline --check --recursive $(FILES_TO_LINT) flake8 $(FILES_TO_LINT) protobufs: $(PY) diff --git a/libp2p/__init__.py b/libp2p/__init__.py index 24cff711..88ae3b54 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -25,9 +25,7 @@ from libp2p.typing import TProtocol async def cleanup_done_tasks() -> None: - """ - clean up asyncio done tasks to free up resources - """ + """clean up asyncio done tasks to free up resources.""" while True: for task in asyncio.all_tasks(): if task.done(): @@ -51,7 +49,8 @@ def initialize_default_kademlia_router( ksize: int = 20, alpha: int = 3, id_opt: ID = None, storage: IStorage = None ) -> KadmeliaPeerRouter: """ - initialize kadmelia router when no kademlia router is passed in + initialize kadmelia router when no kademlia router is passed in. + :param ksize: The k parameter from the paper :param alpha: The alpha parameter from the paper :param id_opt: optional id for host @@ -81,7 +80,8 @@ def initialize_default_swarm( disc_opt: IPeerRouting = None, ) -> Swarm: """ - initialize swarm when no swarm is passed in + initialize swarm when no swarm is passed in. + :param id_opt: optional id for host :param transport_opt: optional choice of transport upgrade :param muxer_opt: optional choice of stream muxer @@ -121,7 +121,8 @@ async def new_node( disc_opt: IPeerRouting = None, ) -> IHost: """ - create new libp2p node + create new libp2p node. + :param key_pair: key pair for deriving an identity :param swarm_opt: optional swarm :param id_opt: optional id for host diff --git a/libp2p/crypto/authenticated_encryption.py b/libp2p/crypto/authenticated_encryption.py index 6900f1f4..d931f273 100644 --- a/libp2p/crypto/authenticated_encryption.py +++ b/libp2p/crypto/authenticated_encryption.py @@ -61,12 +61,9 @@ class MacAndCipher: def initialize_pair( cipher_type: str, hash_type: str, secret: bytes ) -> Tuple[EncryptionParameters, EncryptionParameters]: - """ - Return a pair of ``Keys`` for use in securing a - communications channel with authenticated encryption - derived from the ``secret`` and using the - requested ``cipher_type`` and ``hash_type``. - """ + """Return a pair of ``Keys`` for use in securing a communications channel + with authenticated encryption derived from the ``secret`` and using the + requested ``cipher_type`` and ``hash_type``.""" if cipher_type != "AES-128": raise NotImplementedError() if hash_type != "SHA256": diff --git a/libp2p/crypto/ecc.py b/libp2p/crypto/ecc.py index 10aed18c..0559bf46 100644 --- a/libp2p/crypto/ecc.py +++ b/libp2p/crypto/ecc.py @@ -6,10 +6,8 @@ from libp2p.crypto.keys import KeyPair, KeyType, PrivateKey, PublicKey def infer_local_type(curve: str) -> curve_types.Curve: - """ - converts a ``str`` representation of some elliptic curve to - a representation understood by the backend of this module. - """ + """converts a ``str`` representation of some elliptic curve to a + representation understood by the backend of this module.""" if curve == "P-256": return curve_types.P256 else: @@ -63,9 +61,8 @@ class ECCPrivateKey(PrivateKey): def create_new_key_pair(curve: str) -> KeyPair: - """ - Return a new ECC keypair with the requested ``curve`` type, e.g. "P-256". - """ + """Return a new ECC keypair with the requested ``curve`` type, e.g. + "P-256".""" private_key = ECCPrivateKey.new(curve) public_key = private_key.get_public_key() return KeyPair(private_key, public_key) diff --git a/libp2p/crypto/exceptions.py b/libp2p/crypto/exceptions.py index a9324df2..fdb049c2 100644 --- a/libp2p/crypto/exceptions.py +++ b/libp2p/crypto/exceptions.py @@ -6,9 +6,7 @@ class CryptographyError(BaseLibp2pError): class MissingDeserializerError(CryptographyError): - """ - Raise if the requested deserialization routine is missing for - some type of cryptographic key. - """ + """Raise if the requested deserialization routine is missing for some type + of cryptographic key.""" pass diff --git a/libp2p/crypto/key_exchange.py b/libp2p/crypto/key_exchange.py index 3200df40..17ec75ab 100644 --- a/libp2p/crypto/key_exchange.py +++ b/libp2p/crypto/key_exchange.py @@ -9,9 +9,7 @@ SharedKeyGenerator = Callable[[bytes], bytes] def create_ephemeral_key_pair(curve_type: str) -> Tuple[PublicKey, SharedKeyGenerator]: - """ - Facilitates ECDH key exchange. - """ + """Facilitates ECDH key exchange.""" if curve_type != "P-256": raise NotImplementedError() diff --git a/libp2p/crypto/keys.py b/libp2p/crypto/keys.py index 5bcc5a37..dff8780a 100644 --- a/libp2p/crypto/keys.py +++ b/libp2p/crypto/keys.py @@ -15,22 +15,16 @@ class KeyType(Enum): class Key(ABC): - """ - A ``Key`` represents a cryptographic key. - """ + """A ``Key`` represents a cryptographic key.""" @abstractmethod def to_bytes(self) -> bytes: - """ - Returns the byte representation of this key. - """ + """Returns the byte representation of this key.""" ... @abstractmethod def get_type(self) -> KeyType: - """ - Returns the ``KeyType`` for ``self``. - """ + """Returns the ``KeyType`` for ``self``.""" ... def __eq__(self, other: object) -> bool: @@ -40,30 +34,23 @@ class Key(ABC): class PublicKey(Key): - """ - A ``PublicKey`` represents a cryptographic public key. - """ + """A ``PublicKey`` represents a cryptographic public key.""" @abstractmethod def verify(self, data: bytes, signature: bytes) -> bool: - """ - Verify that ``signature`` is the cryptographic signature of the hash of ``data``. - """ + """Verify that ``signature`` is the cryptographic signature of the hash + of ``data``.""" ... def _serialize_to_protobuf(self) -> protobuf.PublicKey: - """ - Return the protobuf representation of this ``Key``. - """ + """Return the protobuf representation of this ``Key``.""" key_type = self.get_type().value data = self.to_bytes() protobuf_key = protobuf.PublicKey(key_type=key_type, data=data) return protobuf_key def serialize(self) -> bytes: - """ - Return the canonical serialization of this ``Key``. - """ + """Return the canonical serialization of this ``Key``.""" return self._serialize_to_protobuf().SerializeToString() @classmethod @@ -72,9 +59,7 @@ class PublicKey(Key): class PrivateKey(Key): - """ - A ``PrivateKey`` represents a cryptographic private key. - """ + """A ``PrivateKey`` represents a cryptographic private key.""" @abstractmethod def sign(self, data: bytes) -> bytes: @@ -85,18 +70,14 @@ class PrivateKey(Key): ... def _serialize_to_protobuf(self) -> protobuf.PrivateKey: - """ - Return the protobuf representation of this ``Key``. - """ + """Return the protobuf representation of this ``Key``.""" key_type = self.get_type().value data = self.to_bytes() protobuf_key = protobuf.PrivateKey(key_type=key_type, data=data) return protobuf_key def serialize(self) -> bytes: - """ - Return the canonical serialization of this ``Key``. - """ + """Return the canonical serialization of this ``Key``.""" return self._serialize_to_protobuf().SerializeToString() @classmethod diff --git a/libp2p/crypto/rsa.py b/libp2p/crypto/rsa.py index 9b788cad..ec636cae 100644 --- a/libp2p/crypto/rsa.py +++ b/libp2p/crypto/rsa.py @@ -57,8 +57,10 @@ class RSAPrivateKey(PrivateKey): def create_new_key_pair(bits: int = 2048, e: int = 65537) -> KeyPair: """ - Returns a new RSA keypair with the requested key size (``bits``) and the given public - exponent ``e``. Sane defaults are provided for both values. + Returns a new RSA keypair with the requested key size (``bits``) and the + given public exponent ``e``. + + Sane defaults are provided for both values. """ private_key = RSAPrivateKey.new(bits, e) public_key = private_key.get_public_key() diff --git a/libp2p/crypto/secp256k1.py b/libp2p/crypto/secp256k1.py index 475c1673..aabb153a 100644 --- a/libp2p/crypto/secp256k1.py +++ b/libp2p/crypto/secp256k1.py @@ -62,8 +62,9 @@ class Secp256k1PrivateKey(PrivateKey): def create_new_key_pair(secret: bytes = None) -> KeyPair: """ - Returns a new Secp256k1 keypair derived from the provided ``secret``, - a sequence of bytes corresponding to some integer between 0 and the group order. + Returns a new Secp256k1 keypair derived from the provided ``secret``, a + sequence of bytes corresponding to some integer between 0 and the group + order. A valid secret is created if ``None`` is passed. """ diff --git a/libp2p/exceptions.py b/libp2p/exceptions.py index ce2dc340..bdecfad0 100644 --- a/libp2p/exceptions.py +++ b/libp2p/exceptions.py @@ -3,9 +3,7 @@ class BaseLibp2pError(Exception): class ValidationError(BaseLibp2pError): - """ - Raised when something does not pass a validation check. - """ + """Raised when something does not pass a validation check.""" class ParseError(BaseLibp2pError): diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index 66e36e09..0a5dae7b 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -32,8 +32,10 @@ logger = logging.getLogger("libp2p.network.basic_host") class BasicHost(IHost): """ - BasicHost is a wrapper of a `INetwork` implementation. It performs protocol negotiation - on a stream with multistream-select right after a stream is initialized. + BasicHost is a wrapper of a `INetwork` implementation. + + It performs protocol negotiation on a stream with multistream-select + right after a stream is initialized. """ _network: INetwork @@ -97,6 +99,7 @@ class BasicHost(IHost): ) -> None: """ set stream handler for given `protocol_id` + :param protocol_id: protocol id used on stream :param stream_handler: a stream handler function """ @@ -128,10 +131,11 @@ class BasicHost(IHost): async def connect(self, peer_info: PeerInfo) -> None: """ - connect ensures there is a connection between this host and the peer with - given `peer_info.peer_id`. connect will absorb the addresses in peer_info into its internal - peerstore. If there is not an active connection, connect will issue a - dial, and block until a connection is opened, or an error is returned. + connect ensures there is a connection between this host and the peer + with given `peer_info.peer_id`. connect will absorb the addresses in + peer_info into its internal peerstore. If there is not an active + connection, connect will issue a dial, and block until a connection is + opened, or an error is returned. :param peer_info: peer_info of the peer we want to connect to :type peer_info: peer.peerinfo.PeerInfo diff --git a/libp2p/host/exceptions.py b/libp2p/host/exceptions.py index ce7b9723..521f87e3 100644 --- a/libp2p/host/exceptions.py +++ b/libp2p/host/exceptions.py @@ -2,9 +2,7 @@ from libp2p.exceptions import BaseLibp2pError class HostException(BaseLibp2pError): - """ - A generic exception in `IHost`. - """ + """A generic exception in `IHost`.""" class ConnectionFailure(HostException): diff --git a/libp2p/host/host_interface.py b/libp2p/host/host_interface.py index 5700cdf7..6a326b97 100644 --- a/libp2p/host/host_interface.py +++ b/libp2p/host/host_interface.py @@ -41,7 +41,8 @@ class IHost(ABC): self, protocol_id: TProtocol, stream_handler: StreamHandlerFn ) -> None: """ - set stream handler for host + set stream handler for host. + :param protocol_id: protocol id used on stream :param stream_handler: a stream handler function """ @@ -61,10 +62,11 @@ class IHost(ABC): @abstractmethod async def connect(self, peer_info: PeerInfo) -> None: """ - connect ensures there is a connection between this host and the peer with - given peer_info.peer_id. connect will absorb the addresses in peer_info into its internal - peerstore. If there is not an active connection, connect will issue a - dial, and block until a connection is opened, or an error is returned. + connect ensures there is a connection between this host and the peer + with given peer_info.peer_id. connect will absorb the addresses in + peer_info into its internal peerstore. If there is not an active + connection, connect will issue a dial, and block until a connection is + opened, or an error is returned. :param peer_info: peer_info of the peer we want to connect to :type peer_info: peer.peerinfo.PeerInfo diff --git a/libp2p/host/routed_host.py b/libp2p/host/routed_host.py index 4a9778de..78b6fa54 100644 --- a/libp2p/host/routed_host.py +++ b/libp2p/host/routed_host.py @@ -16,8 +16,9 @@ class RoutedHost(BasicHost): async def connect(self, peer_info: PeerInfo) -> None: """ - connect ensures there is a connection between this host and the peer with - given `peer_info.peer_id`. See (basic_host).connect for more information. + connect ensures there is a connection between this host and the peer + with given `peer_info.peer_id`. See (basic_host).connect for more + information. RoutedHost's Connect differs in that if the host has no addresses for a given peer, it will use its routing system to try to find some. diff --git a/libp2p/io/exceptions.py b/libp2p/io/exceptions.py index d4e1dfae..0f2230f7 100644 --- a/libp2p/io/exceptions.py +++ b/libp2p/io/exceptions.py @@ -6,9 +6,7 @@ class IOException(BaseLibp2pError): class IncompleteReadError(IOException): - """ - Fewer bytes were read than requested. - """ + """Fewer bytes were read than requested.""" class MsgioException(IOException): diff --git a/libp2p/kademlia/__init__.py b/libp2p/kademlia/__init__.py index bce76722..14568595 100644 --- a/libp2p/kademlia/__init__.py +++ b/libp2p/kademlia/__init__.py @@ -1,5 +1,3 @@ -""" -Kademlia is a Python implementation of the Kademlia protocol which -utilizes the asyncio library. -""" +"""Kademlia is a Python implementation of the Kademlia protocol which utilizes +the asyncio library.""" __version__ = "2.0" diff --git a/libp2p/kademlia/crawling.py b/libp2p/kademlia/crawling.py index c649da08..1a5566fb 100644 --- a/libp2p/kademlia/crawling.py +++ b/libp2p/kademlia/crawling.py @@ -8,9 +8,7 @@ log = logging.getLogger(__name__) class SpiderCrawl: - """ - Crawl the network and look for given 160-bit keys. - """ + """Crawl the network and look for given 160-bit keys.""" def __init__(self, protocol, node, peers, ksize, alpha): """ @@ -75,15 +73,11 @@ class ValueSpiderCrawl(SpiderCrawl): self.nearest_without_value = KadPeerHeap(self.node, 1) async def find(self): - """ - Find either the closest nodes or the value requested. - """ + """Find either the closest nodes or the value requested.""" return await self._find(self.protocol.call_find_value) async def _nodes_found(self, responses): - """ - Handle the result of an iteration in _find. - """ + """Handle the result of an iteration in _find.""" toremove = [] found_values = [] for peerid, response in responses.items(): @@ -107,10 +101,11 @@ class ValueSpiderCrawl(SpiderCrawl): async def _handle_found_values(self, values): """ - We got some values! Exciting. But let's make sure - they're all the same or freak out a little bit. Also, - make sure we tell the nearest node that *didn't* have - the value to store it. + We got some values! + + Exciting. But let's make sure they're all the same or freak out + a little bit. Also, make sure we tell the nearest node that + *didn't* have the value to store it. """ value_counts = Counter(values) if len(value_counts) != 1: @@ -127,15 +122,11 @@ class ValueSpiderCrawl(SpiderCrawl): class NodeSpiderCrawl(SpiderCrawl): async def find(self): - """ - Find the closest nodes. - """ + """Find the closest nodes.""" return await self._find(self.protocol.call_find_node) async def _nodes_found(self, responses): - """ - Handle the result of an iteration in _find. - """ + """Handle the result of an iteration in _find.""" toremove = [] for peerid, response in responses.items(): response = RPCFindResponse(response) @@ -163,9 +154,7 @@ class RPCFindResponse: self.response = response def happened(self): - """ - Did the other host actually respond? - """ + """Did the other host actually respond?""" return self.response[0] def has_value(self): @@ -176,8 +165,9 @@ class RPCFindResponse: def get_node_list(self): """ - Get the node list in the response. If there's no value, this should - be set. + Get the node list in the response. + + If there's no value, this should be set. """ nodelist = self.response[1] or [] return [create_kad_peerinfo(*nodeple) for nodeple in nodelist] diff --git a/libp2p/kademlia/kad_peerinfo.py b/libp2p/kademlia/kad_peerinfo.py index 346f6714..efb20f17 100644 --- a/libp2p/kademlia/kad_peerinfo.py +++ b/libp2p/kademlia/kad_peerinfo.py @@ -30,9 +30,7 @@ class KadPeerInfo(PeerInfo): return sorted(self.addrs) == sorted(node.addrs) def distance_to(self, node): - """ - Get the distance between this node and another. - """ + """Get the distance between this node and another.""" return self.xor_id ^ node.xor_id def __iter__(self): @@ -56,9 +54,7 @@ class KadPeerInfo(PeerInfo): class KadPeerHeap: - """ - A heap of peers ordered by distance to a given node. - """ + """A heap of peers ordered by distance to a given node.""" def __init__(self, node, maxsize): """ @@ -74,11 +70,13 @@ class KadPeerHeap: def remove(self, peers): """ - Remove a list of peer ids from this heap. Note that while this - heap retains a constant visible size (based on the iterator), it's - actual size may be quite a bit larger than what's exposed. Therefore, - removal of nodes may not change the visible size as previously added - nodes suddenly become visible. + Remove a list of peer ids from this heap. + + Note that while this heap retains a constant visible size (based + on the iterator), it's actual size may be quite a bit larger + than what's exposed. Therefore, removal of nodes may not change + the visible size as previously added nodes suddenly become + visible. """ peers = set(peers) if not peers: diff --git a/libp2p/kademlia/network.py b/libp2p/kademlia/network.py index fcf4b9a8..f0a4991e 100644 --- a/libp2p/kademlia/network.py +++ b/libp2p/kademlia/network.py @@ -1,6 +1,4 @@ -""" -Package for interacting on the network at a high level. -""" +"""Package for interacting on the network at a high level.""" import asyncio import logging import pickle @@ -16,8 +14,10 @@ log = logging.getLogger(__name__) class KademliaServer: """ - High level view of a node instance. This is the object that should be - created to start listening as an active node on the network. + High level view of a node instance. + + This is the object that should be created to start listening as an + active node on the network. """ protocol_class = KademliaProtocol @@ -77,10 +77,8 @@ class KademliaServer: self.refresh_loop = loop.call_later(3600, self.refresh_table) async def _refresh_table(self): - """ - Refresh buckets that haven't had any lookups in the last hour - (per section 2.3 of the paper). - """ + """Refresh buckets that haven't had any lookups in the last hour (per + section 2.3 of the paper).""" results = [] for node_id in self.protocol.get_refresh_ids(): node = create_kad_peerinfo(node_id) @@ -99,8 +97,8 @@ class KademliaServer: def bootstrappable_neighbors(self): """ - Get a :class:`list` of (ip, port) :class:`tuple` pairs suitable for - use as an argument to the bootstrap method. + Get a :class:`list` of (ip, port) :class:`tuple` pairs suitable for use + as an argument to the bootstrap method. The server should have been bootstrapped already - this is just a utility for getting some neighbors and then @@ -153,9 +151,7 @@ class KademliaServer: return await spider.find() async def set(self, key, value): - """ - Set the given string key to the given value in the network. - """ + """Set the given string key to the given value in the network.""" if not check_dht_value_type(value): raise TypeError("Value must be of type int, float, bool, str, or bytes") log.info("setting '%s' = '%s' on network", key, value) @@ -163,9 +159,7 @@ class KademliaServer: return await self.set_digest(dkey, value) async def provide(self, key): - """ - publish to the network that it provides for a particular key - """ + """publish to the network that it provides for a particular key.""" neighbors = self.protocol.router.find_neighbors(self.node) return [ await self.protocol.call_add_provider(n, key, self.node.peer_id_bytes) @@ -173,17 +167,13 @@ class KademliaServer: ] async def get_providers(self, key): - """ - get the list of providers for a key - """ + """get the list of providers for a key.""" neighbors = self.protocol.router.find_neighbors(self.node) return [await self.protocol.call_get_providers(n, key) for n in neighbors] async def set_digest(self, dkey, value): - """ - Set the given SHA1 digest key (bytes) to the given value in the - network. - """ + """Set the given SHA1 digest key (bytes) to the given value in the + network.""" node = create_kad_peerinfo(dkey) nearest = self.protocol.router.find_neighbors(node) @@ -204,10 +194,8 @@ class KademliaServer: return any(await asyncio.gather(*results)) def save_state(self, fname): - """ - Save the state of this node (the alpha/ksize/id/immediate neighbors) - to a cache file with the given fname. - """ + """Save the state of this node (the alpha/ksize/id/immediate neighbors) + to a cache file with the given fname.""" log.info("Saving state to %s", fname) data = { "ksize": self.ksize, @@ -223,10 +211,8 @@ class KademliaServer: @classmethod def load_state(cls, fname): - """ - Load the state of this node (the alpha/ksize/id/immediate neighbors) - from a cache file with the given fname. - """ + """Load the state of this node (the alpha/ksize/id/immediate neighbors) + from a cache file with the given fname.""" log.info("Loading state from %s", fname) with open(fname, "rb") as file: data = pickle.load(file) @@ -237,8 +223,7 @@ class KademliaServer: def save_state_regularly(self, fname, frequency=600): """ - Save the state of node with a given regularity to the given - filename. + Save the state of node with a given regularity to the given filename. Args: fname: File name to save retularly to @@ -253,9 +238,7 @@ class KademliaServer: def check_dht_value_type(value): - """ - Checks to see if the type of the value is a valid type for - placing in the dht. - """ + """Checks to see if the type of the value is a valid type for placing in + the dht.""" typeset = [int, float, bool, str, bytes] return type(value) in typeset diff --git a/libp2p/kademlia/protocol.py b/libp2p/kademlia/protocol.py index 8606fc2b..419e41c3 100644 --- a/libp2p/kademlia/protocol.py +++ b/libp2p/kademlia/protocol.py @@ -11,15 +11,11 @@ log = logging.getLogger(__name__) class KademliaProtocol(RPCProtocol): - """ - There are four main RPCs in the Kademlia protocol - PING, STORE, FIND_NODE, FIND_VALUE - PING probes if a node is still online - STORE instructs a node to store (key, value) - FIND_NODE takes a 160-bit ID and gets back - (ip, udp_port, node_id) for k closest nodes to target - FIND_VALUE behaves like FIND_NODE unless a value is stored - """ + """There are four main RPCs in the Kademlia protocol PING, STORE, + FIND_NODE, FIND_VALUE PING probes if a node is still online STORE instructs + a node to store (key, value) FIND_NODE takes a 160-bit ID and gets back + (ip, udp_port, node_id) for k closest nodes to target FIND_VALUE behaves + like FIND_NODE unless a value is stored.""" def __init__(self, source_node, storage, ksize): RPCProtocol.__init__(self) @@ -28,9 +24,7 @@ class KademliaProtocol(RPCProtocol): self.source_node = source_node def get_refresh_ids(self): - """ - Get ids to search for to keep old buckets up to date. - """ + """Get ids to search for to keep old buckets up to date.""" ids = [] for bucket in self.router.lonely_buckets(): rid = random.randint(*bucket.range).to_bytes(20, byteorder="big") @@ -75,12 +69,10 @@ class KademliaProtocol(RPCProtocol): return {"value": value} def rpc_add_provider(self, sender, nodeid, key, provider_id): - """ - rpc when receiving an add_provider call - should validate received PeerInfo matches sender nodeid - if it does, receipient must store a record in its datastore - we store a map of content_id to peer_id (non xor) - """ + """rpc when receiving an add_provider call should validate received + PeerInfo matches sender nodeid if it does, receipient must store a + record in its datastore we store a map of content_id to peer_id (non + xor)""" if nodeid == provider_id: log.info( "adding provider %s for key %s in local table", provider_id, str(key) @@ -90,11 +82,9 @@ class KademliaProtocol(RPCProtocol): return False def rpc_get_providers(self, sender, key): - """ - rpc when receiving a get_providers call - should look up key in data store and respond with records - plus a list of closer peers in its routing table - """ + """rpc when receiving a get_providers call should look up key in data + store and respond with records plus a list of closer peers in its + routing table.""" providers = [] record = self.storage.get(key, None) @@ -178,8 +168,10 @@ class KademliaProtocol(RPCProtocol): def handle_call_response(self, result, node): """ - If we get a response, add the node to the routing table. If - we get no response, make sure it's removed from the routing table. + If we get a response, add the node to the routing table. + + If we get no response, make sure it's removed from the routing + table. """ if not result[0]: log.warning("no response from %s, removing from router", node) diff --git a/libp2p/kademlia/routing.py b/libp2p/kademlia/routing.py index d0a2b354..f65c90a1 100644 --- a/libp2p/kademlia/routing.py +++ b/libp2p/kademlia/routing.py @@ -8,13 +8,10 @@ from .utils import OrderedSet, bytes_to_bit_string, shared_prefix class KBucket: - """ - each node keeps a list of (ip, udp_port, node_id) - for nodes of distance between 2^i and 2^(i+1) - this list that every node keeps is a k-bucket - each k-bucket implements a last seen eviction - policy except that live nodes are never removed - """ + """each node keeps a list of (ip, udp_port, node_id) for nodes of distance + between 2^i and 2^(i+1) this list that every node keeps is a k-bucket each + k-bucket implements a last seen eviction policy except that live nodes are + never removed.""" def __init__(self, rangeLower, rangeUpper, ksize): self.range = (rangeLower, rangeUpper) @@ -56,8 +53,8 @@ class KBucket: def add_node(self, node): """ - Add a C{Node} to the C{KBucket}. Return True if successful, - False if the bucket is full. + Add a C{Node} to the C{KBucket}. Return True if successful, False if + the bucket is full. If the bucket is full, keep track of node in a replacement list, per section 4.1 of the paper. @@ -100,9 +97,7 @@ class TableTraverser: return self def __next__(self): - """ - Pop an item from the left subtree, then right, then left, etc. - """ + """Pop an item from the left subtree, then right, then left, etc.""" if self.current_nodes: return self.current_nodes.pop() @@ -140,10 +135,7 @@ class RoutingTable: self.buckets.insert(index + 1, two) def lonely_buckets(self): - """ - Get all of the buckets that haven't been updated in over - an hour. - """ + """Get all of the buckets that haven't been updated in over an hour.""" hrago = time.monotonic() - 3600 return [b for b in self.buckets if b.last_updated < hrago] @@ -172,9 +164,7 @@ class RoutingTable: asyncio.ensure_future(self.protocol.call_ping(bucket.head())) def get_bucket_for(self, node): - """ - Get the index of the bucket that the given node would fall into. - """ + """Get the index of the bucket that the given node would fall into.""" for index, bucket in enumerate(self.buckets): if node.xor_id < bucket.range[1]: return index diff --git a/libp2p/kademlia/storage.py b/libp2p/kademlia/storage.py index afd30e85..014853c1 100644 --- a/libp2p/kademlia/storage.py +++ b/libp2p/kademlia/storage.py @@ -8,46 +8,45 @@ import time class IStorage(ABC): """ Local storage for this node. - IStorage implementations of get must return the same type as put in by set + + IStorage implementations of get must return the same type as put in + by set """ @abstractmethod def __setitem__(self, key, value): - """ - Set a key to the given value. - """ + """Set a key to the given value.""" @abstractmethod def __getitem__(self, key): """ - Get the given key. If item doesn't exist, raises C{KeyError} + Get the given key. + + If item doesn't exist, raises C{KeyError} """ @abstractmethod def get(self, key, default=None): """ - Get given key. If not found, return default. + Get given key. + + If not found, return default. """ @abstractmethod def iter_older_than(self, seconds_old): - """ - Return the an iterator over (key, value) tuples for items older - than the given seconds_old. - """ + """Return the an iterator over (key, value) tuples for items older than + the given seconds_old.""" @abstractmethod def __iter__(self): - """ - Get the iterator for this storage, should yield tuple of (key, value) - """ + """Get the iterator for this storage, should yield tuple of (key, + value)""" class ForgetfulStorage(IStorage): def __init__(self, ttl=604800): - """ - By default, max age is a week. - """ + """By default, max age is a week.""" self.data = OrderedDict() self.ttl = ttl diff --git a/libp2p/kademlia/utils.py b/libp2p/kademlia/utils.py index 78ff0827..f18a6713 100644 --- a/libp2p/kademlia/utils.py +++ b/libp2p/kademlia/utils.py @@ -1,6 +1,4 @@ -""" -General catchall for functions that don't make sense as methods. -""" +"""General catchall for functions that don't make sense as methods.""" import asyncio import hashlib import operator @@ -20,7 +18,8 @@ def digest(string): class OrderedSet(list): """ - Acts like a list in all ways, except in the behavior of the + Acts like a list in all ways, except in the behavior of the. + :meth:`push` method. """ diff --git a/libp2p/network/connection/raw_connection.py b/libp2p/network/connection/raw_connection.py index 2c710d76..08d22055 100644 --- a/libp2p/network/connection/raw_connection.py +++ b/libp2p/network/connection/raw_connection.py @@ -24,9 +24,7 @@ class RawConnection(IRawConnection): self._drain_lock = asyncio.Lock() async def write(self, data: bytes) -> None: - """ - Raise `RawConnError` if the underlying connection breaks - """ + """Raise `RawConnError` if the underlying connection breaks.""" try: self.writer.write(data) except ConnectionResetError as error: @@ -42,8 +40,8 @@ class RawConnection(IRawConnection): async def read(self, n: int = -1) -> bytes: """ - Read up to ``n`` bytes from the underlying stream. - This call is delegated directly to the underlying ``self.reader``. + Read up to ``n`` bytes from the underlying stream. This call is + delegated directly to the underlying ``self.reader``. Raise `RawConnError` if the underlying connection breaks """ diff --git a/libp2p/network/connection/raw_connection_interface.py b/libp2p/network/connection/raw_connection_interface.py index 9be55010..d08e4676 100644 --- a/libp2p/network/connection/raw_connection_interface.py +++ b/libp2p/network/connection/raw_connection_interface.py @@ -2,8 +2,6 @@ from libp2p.io.abc import ReadWriteCloser class IRawConnection(ReadWriteCloser): - """ - A Raw Connection provides a Reader and a Writer - """ + """A Raw Connection provides a Reader and a Writer.""" is_initiator: bool diff --git a/libp2p/network/network_interface.py b/libp2p/network/network_interface.py index 94ddba2c..9e942831 100644 --- a/libp2p/network/network_interface.py +++ b/libp2p/network/network_interface.py @@ -30,7 +30,7 @@ class INetwork(ABC): @abstractmethod async def dial_peer(self, peer_id: ID) -> INetConn: """ - dial_peer try to create a connection to peer_id + dial_peer try to create a connection to peer_id. :param peer_id: peer if we want to dial :raises SwarmException: raised when an error occurs @@ -47,9 +47,7 @@ class INetwork(ABC): @abstractmethod def set_stream_handler(self, stream_handler: StreamHandlerFn) -> None: - """ - Set the stream handler for all incoming streams. - """ + """Set the stream handler for all incoming streams.""" @abstractmethod async def listen(self, *multiaddrs: Sequence[Multiaddr]) -> bool: diff --git a/libp2p/network/stream/net_stream.py b/libp2p/network/stream/net_stream.py index 0142721c..7ab609d0 100644 --- a/libp2p/network/stream/net_stream.py +++ b/libp2p/network/stream/net_stream.py @@ -39,7 +39,8 @@ class NetStream(INetStream): async def read(self, n: int = -1) -> bytes: """ - reads from stream + reads from stream. + :param n: number of bytes to read :return: bytes of input """ @@ -52,7 +53,8 @@ class NetStream(INetStream): async def write(self, data: bytes) -> int: """ - write to stream + write to stream. + :return: number of bytes written """ try: @@ -61,9 +63,7 @@ class NetStream(INetStream): raise StreamClosed from error async def close(self) -> None: - """ - close stream - """ + """close stream.""" await self.muxed_stream.close() async def reset(self) -> None: diff --git a/libp2p/network/stream/net_stream_interface.py b/libp2p/network/stream/net_stream_interface.py index 41bf4239..66c611c4 100644 --- a/libp2p/network/stream/net_stream_interface.py +++ b/libp2p/network/stream/net_stream_interface.py @@ -23,6 +23,4 @@ class INetStream(ReadWriteCloser): @abstractmethod async def reset(self) -> None: - """ - Close both ends of the stream. - """ + """Close both ends of the stream.""" diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 90c9475a..a697d7da 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -70,7 +70,8 @@ class Swarm(INetwork): async def dial_peer(self, peer_id: ID) -> INetConn: """ - dial_peer try to create a connection to peer_id + dial_peer try to create a connection to peer_id. + :param peer_id: peer if we want to dial :raises SwarmException: raised when an error occurs :return: muxed connection @@ -254,10 +255,9 @@ class Swarm(INetwork): logger.debug("successfully close the connection to peer %s", peer_id) async def add_conn(self, muxed_conn: IMuxedConn) -> SwarmConn: - """ - Add a `IMuxedConn` to `Swarm` as a `SwarmConn`, notify "connected", - and start to monitor the connection for its new streams and disconnection. - """ + """Add a `IMuxedConn` to `Swarm` as a `SwarmConn`, notify "connected", + and start to monitor the connection for its new streams and + disconnection.""" swarm_conn = SwarmConn(muxed_conn, self) # Store muxed_conn with peer id self.connections[muxed_conn.peer_id] = swarm_conn @@ -267,9 +267,8 @@ class Swarm(INetwork): return swarm_conn def remove_conn(self, swarm_conn: SwarmConn) -> None: - """ - Simply remove the connection from Swarm's records, without closing the connection. - """ + """Simply remove the connection from Swarm's records, without closing + the connection.""" peer_id = swarm_conn.muxed_conn.peer_id if peer_id not in self.connections: return diff --git a/libp2p/peer/addrbook_interface.py b/libp2p/peer/addrbook_interface.py index 0ba26145..a4045a76 100644 --- a/libp2p/peer/addrbook_interface.py +++ b/libp2p/peer/addrbook_interface.py @@ -14,6 +14,7 @@ class IAddrBook(ABC): def add_addr(self, peer_id: ID, addr: Multiaddr, ttl: int) -> None: """ Calls add_addrs(peer_id, [addr], ttl) + :param peer_id: the peer to add address for :param addr: multiaddress of the peer :param ttl: time-to-live for the address (after this time, address is no longer valid) @@ -22,9 +23,11 @@ class IAddrBook(ABC): @abstractmethod def add_addrs(self, peer_id: ID, addrs: Sequence[Multiaddr], ttl: int) -> None: """ - Adds addresses for a given peer all with the same time-to-live. If one of the - addresses already exists for the peer and has a longer TTL, no operation should take place. - If one of the addresses exists with a shorter TTL, extend the TTL to equal param ttl. + Adds addresses for a given peer all with the same time-to-live. If one + of the addresses already exists for the peer and has a longer TTL, no + operation should take place. If one of the addresses exists with a + shorter TTL, extend the TTL to equal param ttl. + :param peer_id: the peer to add address for :param addr: multiaddresses of the peer :param ttl: time-to-live for the address (after this time, address is no longer valid @@ -40,7 +43,8 @@ class IAddrBook(ABC): @abstractmethod def clear_addrs(self, peer_id: ID) -> None: """ - Removes all previously stored addresses + Removes all previously stored addresses. + :param peer_id: peer to remove addresses of """ diff --git a/libp2p/peer/peerdata.py b/libp2p/peer/peerdata.py index 3ee4c4ce..9273079d 100644 --- a/libp2p/peer/peerdata.py +++ b/libp2p/peer/peerdata.py @@ -44,4 +44,4 @@ class PeerData(IPeerData): class PeerDataError(KeyError): - """Raised when a key is not found in peer metadata""" + """Raised when a key is not found in peer metadata.""" diff --git a/libp2p/peer/peerdata_interface.py b/libp2p/peer/peerdata_interface.py index ca7c2546..e842acb6 100644 --- a/libp2p/peer/peerdata_interface.py +++ b/libp2p/peer/peerdata_interface.py @@ -39,9 +39,7 @@ class IPeerData(ABC): @abstractmethod def clear_addrs(self) -> None: - """ - Clear all addresses - """ + """Clear all addresses.""" @abstractmethod def put_metadata(self, key: str, val: Any) -> None: diff --git a/libp2p/peer/peerstore.py b/libp2p/peer/peerstore.py index a416c471..65f6eb7c 100644 --- a/libp2p/peer/peerstore.py +++ b/libp2p/peer/peerstore.py @@ -18,9 +18,9 @@ class PeerStore(IPeerStore): def __create_or_get_peer(self, peer_id: ID) -> PeerData: """ - Returns the peer data for peer_id or creates a new - peer data (and stores it in peer_map) if peer - data for peer_id does not yet exist + Returns the peer data for peer_id or creates a new peer data (and + stores it in peer_map) if peer data for peer_id does not yet exist. + :param peer_id: peer ID :return: peer data """ @@ -96,4 +96,4 @@ class PeerStore(IPeerStore): class PeerStoreError(KeyError): - """Raised when peer ID is not found in peer store""" + """Raised when peer ID is not found in peer store.""" diff --git a/libp2p/protocol_muxer/exceptions.py b/libp2p/protocol_muxer/exceptions.py index a34e318c..ffcc5d57 100644 --- a/libp2p/protocol_muxer/exceptions.py +++ b/libp2p/protocol_muxer/exceptions.py @@ -2,12 +2,12 @@ from libp2p.exceptions import BaseLibp2pError class MultiselectCommunicatorError(BaseLibp2pError): - """Raised when an error occurs during read/write via communicator""" + """Raised when an error occurs during read/write via communicator.""" class MultiselectError(BaseLibp2pError): - """Raised when an error occurs in multiselect process""" + """Raised when an error occurs in multiselect process.""" class MultiselectClientError(BaseLibp2pError): - """Raised when an error occurs in protocol selection process""" + """Raised when an error occurs in protocol selection process.""" diff --git a/libp2p/protocol_muxer/multiselect.py b/libp2p/protocol_muxer/multiselect.py index 81267e3f..2ea1e748 100644 --- a/libp2p/protocol_muxer/multiselect.py +++ b/libp2p/protocol_muxer/multiselect.py @@ -11,11 +11,9 @@ PROTOCOL_NOT_FOUND_MSG = "na" class Multiselect(IMultiselectMuxer): - """ - Multiselect module that is responsible for responding to - a multiselect client and deciding on - a specific protocol and handler pair to use for communication - """ + """Multiselect module that is responsible for responding to a multiselect + client and deciding on a specific protocol and handler pair to use for + communication.""" handlers: Dict[TProtocol, StreamHandlerFn] @@ -28,7 +26,8 @@ class Multiselect(IMultiselectMuxer): def add_handler(self, protocol: TProtocol, handler: StreamHandlerFn) -> None: """ - Store the handler with the given protocol + Store the handler with the given protocol. + :param protocol: protocol name :param handler: handler function """ @@ -38,7 +37,8 @@ class Multiselect(IMultiselectMuxer): self, communicator: IMultiselectCommunicator ) -> Tuple[TProtocol, StreamHandlerFn]: """ - Negotiate performs protocol selection + Negotiate performs protocol selection. + :param stream: stream to negotiate on :return: selected protocol name, handler function :raise MultiselectError: raised when negotiation failed @@ -70,7 +70,8 @@ class Multiselect(IMultiselectMuxer): async def handshake(self, communicator: IMultiselectCommunicator) -> None: """ - Perform handshake to agree on multiselect protocol + Perform handshake to agree on multiselect protocol. + :param communicator: communicator to use :raise MultiselectError: raised when handshake failed """ @@ -93,7 +94,8 @@ class Multiselect(IMultiselectMuxer): def is_valid_handshake(handshake_contents: str) -> bool: """ - Determine if handshake is valid and should be confirmed + Determine if handshake is valid and should be confirmed. + :param handshake_contents: contents of handshake message :return: true if handshake is complete, false otherwise """ diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index c940dfc8..89fc9754 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -11,15 +11,14 @@ PROTOCOL_NOT_FOUND_MSG = "na" class MultiselectClient(IMultiselectClient): - """ - Client for communicating with receiver's multiselect - module in order to select a protocol id to communicate over - """ + """Client for communicating with receiver's multiselect module in order to + select a protocol id to communicate over.""" async def handshake(self, communicator: IMultiselectCommunicator) -> None: """ - Ensure that the client and multiselect - are both using the same multiselect protocol + Ensure that the client and multiselect are both using the same + multiselect protocol. + :param stream: stream to communicate with multiselect over :raise MultiselectClientError: raised when handshake failed """ @@ -40,9 +39,10 @@ class MultiselectClient(IMultiselectClient): self, protocols: Sequence[TProtocol], communicator: IMultiselectCommunicator ) -> TProtocol: """ - For each protocol, send message to multiselect selecting protocol - and fail if multiselect does not return same protocol. Returns first + For each protocol, send message to multiselect selecting protocol and + fail if multiselect does not return same protocol. Returns first protocol that multiselect agrees on (i.e. that multiselect selects) + :param protocol: protocol to select :param stream: stream to communicate with multiselect over :return: selected protocol @@ -63,7 +63,8 @@ class MultiselectClient(IMultiselectClient): self, communicator: IMultiselectCommunicator, protocol: TProtocol ) -> TProtocol: """ - Try to select the given protocol or raise exception if fails + Try to select the given protocol or raise exception if fails. + :param communicator: communicator to use to communicate with counterparty :param protocol: protocol to select :raise MultiselectClientError: raised when protocol negotiation failed @@ -88,7 +89,8 @@ class MultiselectClient(IMultiselectClient): def is_valid_handshake(handshake_contents: str) -> bool: """ - Determine if handshake is valid and should be confirmed + Determine if handshake is valid and should be confirmed. + :param handshake_contents: contents of handshake message :return: true if handshake is complete, false otherwise """ diff --git a/libp2p/protocol_muxer/multiselect_client_interface.py b/libp2p/protocol_muxer/multiselect_client_interface.py index e51c01fa..fffc2fbe 100644 --- a/libp2p/protocol_muxer/multiselect_client_interface.py +++ b/libp2p/protocol_muxer/multiselect_client_interface.py @@ -8,15 +8,14 @@ from libp2p.typing import TProtocol class IMultiselectClient(ABC): - """ - Client for communicating with receiver's multiselect - module in order to select a protocol id to communicate over - """ + """Client for communicating with receiver's multiselect module in order to + select a protocol id to communicate over.""" async def handshake(self, communicator: IMultiselectCommunicator) -> None: """ - Ensure that the client and multiselect - are both using the same multiselect protocol + Ensure that the client and multiselect are both using the same + multiselect protocol. + :param stream: stream to communicate with multiselect over :raise Exception: multiselect protocol ID mismatch """ @@ -26,9 +25,10 @@ class IMultiselectClient(ABC): self, protocols: Sequence[TProtocol], communicator: IMultiselectCommunicator ) -> TProtocol: """ - For each protocol, send message to multiselect selecting protocol - and fail if multiselect does not return same protocol. Returns first + For each protocol, send message to multiselect selecting protocol and + fail if multiselect does not return same protocol. Returns first protocol that multiselect agrees on (i.e. that multiselect selects) + :param protocol: protocol to select :param stream: stream to communicate with multiselect over :return: selected protocol @@ -38,7 +38,8 @@ class IMultiselectClient(ABC): self, communicator: IMultiselectCommunicator, protocol: TProtocol ) -> TProtocol: """ - Try to select the given protocol or raise exception if fails + Try to select the given protocol or raise exception if fails. + :param communicator: communicator to use to communicate with counterparty :param protocol: protocol to select :raise Exception: error in protocol selection diff --git a/libp2p/protocol_muxer/multiselect_communicator_interface.py b/libp2p/protocol_muxer/multiselect_communicator_interface.py index d72142bb..1fad2897 100644 --- a/libp2p/protocol_muxer/multiselect_communicator_interface.py +++ b/libp2p/protocol_muxer/multiselect_communicator_interface.py @@ -2,21 +2,18 @@ from abc import ABC, abstractmethod class IMultiselectCommunicator(ABC): - """ - Communicator helper class that ensures both the client - and multistream module will follow the same multistream protocol, - which is necessary for them to work - """ + """Communicator helper class that ensures both the client and multistream + module will follow the same multistream protocol, which is necessary for + them to work.""" @abstractmethod async def write(self, msg_str: str) -> None: """ - Write message to stream + Write message to stream. + :param msg_str: message to write """ @abstractmethod async def read(self) -> str: - """ - Reads message from stream until EOF - """ + """Reads message from stream until EOF.""" diff --git a/libp2p/protocol_muxer/multiselect_muxer_interface.py b/libp2p/protocol_muxer/multiselect_muxer_interface.py index acfbdbdc..66e0392a 100644 --- a/libp2p/protocol_muxer/multiselect_muxer_interface.py +++ b/libp2p/protocol_muxer/multiselect_muxer_interface.py @@ -7,18 +7,17 @@ from .multiselect_communicator_interface import IMultiselectCommunicator class IMultiselectMuxer(ABC): - """ - Multiselect module that is responsible for responding to - a multiselect client and deciding on - a specific protocol and handler pair to use for communication - """ + """Multiselect module that is responsible for responding to a multiselect + client and deciding on a specific protocol and handler pair to use for + communication.""" handlers: Dict[TProtocol, StreamHandlerFn] @abstractmethod def add_handler(self, protocol: TProtocol, handler: StreamHandlerFn) -> None: """ - Store the handler with the given protocol + Store the handler with the given protocol. + :param protocol: protocol name :param handler: handler function """ @@ -28,7 +27,8 @@ class IMultiselectMuxer(ABC): self, communicator: IMultiselectCommunicator ) -> Tuple[TProtocol, StreamHandlerFn]: """ - Negotiate performs protocol selection + Negotiate performs protocol selection. + :param stream: stream to negotiate on :return: selected protocol name, handler function :raise Exception: negotiation failed exception diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 042b13fe..15ca6b02 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -34,33 +34,37 @@ class FloodSub(IPubsubRouter): """ Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance. + :param pubsub: pubsub instance to attach to """ self.pubsub = pubsub def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None: """ - Notifies the router that a new peer has been connected + Notifies the router that a new peer has been connected. + :param peer_id: id of peer to add """ def remove_peer(self, peer_id: ID) -> None: """ - Notifies the router that a peer has been disconnected + Notifies the router that a peer has been disconnected. + :param peer_id: id of peer to remove """ async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None: """ - Invoked to process control messages in the RPC envelope. - It is invoked after subscriptions and payload messages have been processed + Invoked to process control messages in the RPC envelope. It is invoked + after subscriptions and payload messages have been processed. + :param rpc: rpc message """ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: """ - Invoked to forward a new message that has been validated. - This is where the "flooding" part of floodsub happens + Invoked to forward a new message that has been validated. This is where + the "flooding" part of floodsub happens. With flooding, routing is almost trivial: for each incoming message, forward to all known peers in the topic. There is a bit of logic, @@ -89,9 +93,9 @@ class FloodSub(IPubsubRouter): async def join(self, topic: str) -> None: """ - Join notifies the router that we want to receive and - forward messages in a topic. It is invoked after the - subscription announcement + Join notifies the router that we want to receive and forward messages + in a topic. It is invoked after the subscription announcement. + :param topic: topic to join """ @@ -99,6 +103,7 @@ class FloodSub(IPubsubRouter): """ Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement. + :param topic: topic to leave """ @@ -107,6 +112,7 @@ class FloodSub(IPubsubRouter): ) -> Iterable[ID]: """ Get the eligible peers to send the data to. + :param msg_forwarder: peer ID of the peer who forwards the message to us. :param origin: peer id of the peer the message originate from. :return: a generator of the peer ids who we send data to. diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 3b73f0e9..864189e4 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -97,6 +97,7 @@ class GossipSub(IPubsubRouter): """ Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance. + :param pubsub: pubsub instance to attach to """ self.pubsub = pubsub @@ -109,7 +110,8 @@ class GossipSub(IPubsubRouter): def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None: """ - Notifies the router that a new peer has been connected + Notifies the router that a new peer has been connected. + :param peer_id: id of peer to add :param protocol_id: router protocol the peer speaks, e.g., floodsub, gossipsub """ @@ -130,7 +132,8 @@ class GossipSub(IPubsubRouter): def remove_peer(self, peer_id: ID) -> None: """ - Notifies the router that a peer has been disconnected + Notifies the router that a peer has been disconnected. + :param peer_id: id of peer to remove """ logger.debug("removing peer %s", peer_id) @@ -145,8 +148,9 @@ class GossipSub(IPubsubRouter): async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None: """ - Invoked to process control messages in the RPC envelope. - It is invoked after subscriptions and payload messages have been processed + Invoked to process control messages in the RPC envelope. It is invoked + after subscriptions and payload messages have been processed. + :param rpc: RPC message :param sender_peer_id: id of the peer who sent the message """ @@ -167,9 +171,7 @@ class GossipSub(IPubsubRouter): await self.handle_prune(prune, sender_peer_id) async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: - """ - Invoked to forward a new message that has been validated. - """ + """Invoked to forward a new message that has been validated.""" self.mcache.put(pubsub_msg) peers_gen = self._get_peers_to_send( @@ -193,6 +195,7 @@ class GossipSub(IPubsubRouter): ) -> Iterable[ID]: """ Get the eligible peers to send the data to. + :param msg_forwarder: the peer id of the peer who forwards the message to me. :param origin: peer id of the peer the message originate from. :return: a generator of the peer ids who we send data to. @@ -234,9 +237,9 @@ class GossipSub(IPubsubRouter): async def join(self, topic: str) -> None: # Note: the comments here are the near-exact algorithm description from the spec """ - Join notifies the router that we want to receive and - forward messages in a topic. It is invoked after the - subscription announcement + Join notifies the router that we want to receive and forward messages + in a topic. It is invoked after the subscription announcement. + :param topic: topic to join """ logger.debug("joining topic %s", topic) @@ -274,6 +277,7 @@ class GossipSub(IPubsubRouter): """ Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement. + :param topic: topic to leave """ logger.debug("leaving topic %s", topic) @@ -291,6 +295,7 @@ class GossipSub(IPubsubRouter): async def heartbeat(self) -> None: """ Call individual heartbeats. + Note: the heartbeats are called with awaits because each heartbeat depends on the state changes in the preceding heartbeat """ @@ -453,9 +458,8 @@ class GossipSub(IPubsubRouter): async def handle_ihave( self, ihave_msg: rpc_pb2.ControlIHave, sender_peer_id: ID ) -> None: - """ - Checks the seen set and requests unknown messages with an IWANT message. - """ + """Checks the seen set and requests unknown messages with an IWANT + message.""" # Get list of all seen (seqnos, from) from the (seqno, from) tuples in seen_messages cache seen_seqnos_and_peers = [ seqno_and_from for seqno_and_from in self.pubsub.seen_messages.keys() @@ -477,9 +481,8 @@ class GossipSub(IPubsubRouter): async def handle_iwant( self, iwant_msg: rpc_pb2.ControlIWant, sender_peer_id: ID ) -> None: - """ - Forwards all request messages that are present in mcache to the requesting peer. - """ + """Forwards all request messages that are present in mcache to the + requesting peer.""" # FIXME: Update type of message ID # FIXME: Find a better way to parse the msg ids msg_ids: List[Any] = [literal_eval(msg) for msg in iwant_msg.messageIDs] @@ -536,9 +539,7 @@ class GossipSub(IPubsubRouter): # RPC emitters async def emit_ihave(self, topic: str, msg_ids: Any, to_peer: ID) -> None: - """ - Emit ihave message, sent to to_peer, for topic and msg_ids - """ + """Emit ihave message, sent to to_peer, for topic and msg_ids.""" ihave_msg: rpc_pb2.ControlIHave = rpc_pb2.ControlIHave() ihave_msg.messageIDs.extend(msg_ids) @@ -550,9 +551,7 @@ class GossipSub(IPubsubRouter): await self.emit_control_message(control_msg, to_peer) async def emit_iwant(self, msg_ids: Any, to_peer: ID) -> None: - """ - Emit iwant message, sent to to_peer, for msg_ids - """ + """Emit iwant message, sent to to_peer, for msg_ids.""" iwant_msg: rpc_pb2.ControlIWant = rpc_pb2.ControlIWant() iwant_msg.messageIDs.extend(msg_ids) @@ -563,9 +562,7 @@ class GossipSub(IPubsubRouter): await self.emit_control_message(control_msg, to_peer) async def emit_graft(self, topic: str, to_peer: ID) -> None: - """ - Emit graft message, sent to to_peer, for topic - """ + """Emit graft message, sent to to_peer, for topic.""" graft_msg: rpc_pb2.ControlGraft = rpc_pb2.ControlGraft() graft_msg.topicID = topic @@ -576,9 +573,7 @@ class GossipSub(IPubsubRouter): await self.emit_control_message(control_msg, to_peer) async def emit_prune(self, topic: str, to_peer: ID) -> None: - """ - Emit graft message, sent to to_peer, for topic - """ + """Emit graft message, sent to to_peer, for topic.""" prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune() prune_msg.topicID = topic diff --git a/libp2p/pubsub/mcache.py b/libp2p/pubsub/mcache.py index 159047b9..233b7cf1 100644 --- a/libp2p/pubsub/mcache.py +++ b/libp2p/pubsub/mcache.py @@ -15,6 +15,7 @@ class CacheEntry: def __init__(self, mid: Tuple[bytes, bytes], topics: Sequence[str]) -> None: """ Constructor. + :param mid: (seqno, from_id) of the msg :param topics: list of topics this message was sent on """ @@ -34,6 +35,7 @@ class MessageCache: def __init__(self, window_size: int, history_size: int) -> None: """ Constructor. + :param window_size: Size of the window desired. :param history_size: Size of the history desired. :return: the MessageCache @@ -51,6 +53,7 @@ class MessageCache: def put(self, msg: rpc_pb2.Message) -> None: """ Put a message into the mcache. + :param msg: The rpc message to put in. Should contain seqno and from_id """ mid: Tuple[bytes, bytes] = (msg.seqno, msg.from_id) @@ -61,6 +64,7 @@ class MessageCache: def get(self, mid: Tuple[bytes, bytes]) -> Optional[rpc_pb2.Message]: """ Get a message from the mcache. + :param mid: (seqno, from_id) of the message to get. :return: The rpc message associated with this mid """ @@ -72,6 +76,7 @@ class MessageCache: def window(self, topic: str) -> List[Tuple[bytes, bytes]]: """ Get the window for this topic. + :param topic: Topic whose message ids we desire. :return: List of mids in the current window. """ @@ -86,9 +91,8 @@ class MessageCache: return mids def shift(self) -> None: - """ - Shift the window over by 1 position, dropping the last element of the history. - """ + """Shift the window over by 1 position, dropping the last element of + the history.""" last_entries: List[CacheEntry] = self.history[len(self.history) - 1] for entry in last_entries: diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 51af1765..7be2d288 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -84,9 +84,12 @@ class Pubsub: """ Construct a new Pubsub object, which is responsible for handling all Pubsub-related messages and relaying messages as appropriate to the - Pubsub router (which is responsible for choosing who to send messages to). + Pubsub router (which is responsible for choosing who to send messages + to). + Since the logic for choosing peers to send pubsub messages to is - in the router, the same Pubsub impl can back floodsub, gossipsub, etc. + in the router, the same Pubsub impl can back floodsub, + gossipsub, etc. """ self.host = host self.router = router @@ -136,10 +139,8 @@ class Pubsub: asyncio.ensure_future(self.handle_peer_queue()) def get_hello_packet(self) -> rpc_pb2.RPC: - """ - Generate subscription message with all topics we are subscribed to - only send hello packet if we have subscribed topics - """ + """Generate subscription message with all topics we are subscribed to + only send hello packet if we have subscribed topics.""" packet = rpc_pb2.RPC() for topic_id in self.my_topics: packet.subscriptions.extend( @@ -149,8 +150,9 @@ class Pubsub: async def continuously_read_stream(self, stream: INetStream) -> None: """ - Read from input stream in an infinite loop. Process - messages from other nodes + Read from input stream in an infinite loop. Process messages from other + nodes. + :param stream: stream to continously read from """ peer_id = stream.muxed_conn.peer_id @@ -208,7 +210,9 @@ class Pubsub: self, topic: str, validator: ValidatorFn, is_async_validator: bool ) -> None: """ - Register a validator under the given topic. One topic can only have one validtor. + Register a validator under the given topic. One topic can only have one + validtor. + :param topic: the topic to register validator under :param validator: the validator used to validate messages published to the topic :param is_async_validator: indicate if the validator is an asynchronous validator @@ -218,6 +222,7 @@ class Pubsub: def remove_topic_validator(self, topic: str) -> None: """ Remove the validator from the given topic. + :param topic: the topic to remove validator from """ if topic in self.topic_validators: @@ -226,6 +231,7 @@ class Pubsub: def get_msg_validators(self, msg: rpc_pb2.Message) -> Tuple[TopicValidator, ...]: """ Get all validators corresponding to the topics in the message. + :param msg: the message published to the topic """ return tuple( @@ -236,8 +242,9 @@ class Pubsub: async def stream_handler(self, stream: INetStream) -> None: """ - Stream handler for pubsub. Gets invoked whenever a new stream is created - on one of the supported pubsub protocols. + Stream handler for pubsub. Gets invoked whenever a new stream is + created on one of the supported pubsub protocols. + :param stream: newly created stream """ try: @@ -293,7 +300,8 @@ class Pubsub: """ Handle an incoming subscription message from a peer. Update internal mapping to mark the peer as subscribed or unsubscribed to topics as - defined in the subscription message + defined in the subscription message. + :param origin_id: id of the peer who subscribe to the message :param sub_message: RPC.SubOpts """ @@ -311,7 +319,8 @@ class Pubsub: # FIXME(mhchia): Change the function name? async def handle_talk(self, publish_message: rpc_pb2.Message) -> None: """ - Put incoming message from a peer onto my blocking queue + Put incoming message from a peer onto my blocking queue. + :param publish_message: RPC.Message format """ @@ -325,7 +334,8 @@ class Pubsub: async def subscribe(self, topic_id: str) -> "asyncio.Queue[rpc_pb2.Message]": """ - Subscribe ourself to a topic + Subscribe ourself to a topic. + :param topic_id: topic_id to subscribe to """ @@ -355,7 +365,8 @@ class Pubsub: async def unsubscribe(self, topic_id: str) -> None: """ - Unsubscribe ourself from a topic + Unsubscribe ourself from a topic. + :param topic_id: topic_id to unsubscribe from """ @@ -381,7 +392,8 @@ class Pubsub: async def message_all_peers(self, raw_msg: bytes) -> None: """ - Broadcast a message to peers + Broadcast a message to peers. + :param raw_msg: raw contents of the message to broadcast """ @@ -392,7 +404,8 @@ class Pubsub: async def publish(self, topic_id: str, data: bytes) -> None: """ - Publish data to a topic + Publish data to a topic. + :param topic_id: topic which we are going to publish the data to :param data: data which we are publishing """ @@ -412,7 +425,8 @@ class Pubsub: async def validate_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None: """ - Validate the received message + Validate the received message. + :param msg_forwarder: the peer who forward us the message. :param msg: the message. """ @@ -442,6 +456,7 @@ class Pubsub: async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None: """ Push a pubsub message to others. + :param msg_forwarder: the peer who forward us the message. :param msg: the message we are going to push out. """ @@ -481,9 +496,7 @@ class Pubsub: await self.router.publish(msg_forwarder, msg) def _next_seqno(self) -> bytes: - """ - Make the next message sequence id. - """ + """Make the next message sequence id.""" self.counter += 1 return self.counter.to_bytes(8, "big") diff --git a/libp2p/pubsub/pubsub_notifee.py b/libp2p/pubsub/pubsub_notifee.py index 85c0bd8d..19be6123 100644 --- a/libp2p/pubsub/pubsub_notifee.py +++ b/libp2p/pubsub/pubsub_notifee.py @@ -31,8 +31,10 @@ class PubsubNotifee(INotifee): async def connected(self, network: INetwork, conn: INetConn) -> None: """ - Add peer_id to initiator_peers_queue, so that this peer_id can be used to - create a stream and we only want to have one pubsub stream with each peer. + Add peer_id to initiator_peers_queue, so that this peer_id can be used + to create a stream and we only want to have one pubsub stream with each + peer. + :param network: network the connection was opened on :param conn: connection that was opened """ diff --git a/libp2p/pubsub/pubsub_router_interface.py b/libp2p/pubsub/pubsub_router_interface.py index 5534c297..99a9be75 100644 --- a/libp2p/pubsub/pubsub_router_interface.py +++ b/libp2p/pubsub/pubsub_router_interface.py @@ -22,20 +22,23 @@ class IPubsubRouter(ABC): """ Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance. + :param pubsub: pubsub instance to attach to """ @abstractmethod def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None: """ - Notifies the router that a new peer has been connected + Notifies the router that a new peer has been connected. + :param peer_id: id of peer to add """ @abstractmethod def remove_peer(self, peer_id: ID) -> None: """ - Notifies the router that a peer has been disconnected + Notifies the router that a peer has been disconnected. + :param peer_id: id of peer to remove """ @@ -54,7 +57,8 @@ class IPubsubRouter(ABC): @abstractmethod async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: """ - Invoked to forward a new message that has been validated + Invoked to forward a new message that has been validated. + :param msg_forwarder: peer_id of message sender :param pubsub_msg: pubsub message to forward """ @@ -62,9 +66,9 @@ class IPubsubRouter(ABC): @abstractmethod async def join(self, topic: str) -> None: """ - Join notifies the router that we want to receive and - forward messages in a topic. It is invoked after the - subscription announcement + Join notifies the router that we want to receive and forward messages + in a topic. It is invoked after the subscription announcement. + :param topic: topic to join """ @@ -73,5 +77,6 @@ class IPubsubRouter(ABC): """ Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement. + :param topic: topic to leave """ diff --git a/libp2p/pubsub/validators.py b/libp2p/pubsub/validators.py index e575980d..d1972075 100644 --- a/libp2p/pubsub/validators.py +++ b/libp2p/pubsub/validators.py @@ -2,6 +2,7 @@ def signature_validator(pubkey: bytes, msg: bytes) -> bool: """ Verify the message against the given public key. + :param pubkey: the public key which signs the message. :param msg: the message signed. """ diff --git a/libp2p/routing/interfaces.py b/libp2p/routing/interfaces.py index 4c2cd864..24cad7e1 100644 --- a/libp2p/routing/interfaces.py +++ b/libp2p/routing/interfaces.py @@ -9,24 +9,21 @@ class IContentRouting(ABC): @abstractmethod def provide(self, cid: bytes, announce: bool = True) -> None: """ - Provide adds the given cid to the content routing system. If announce is True, - it also announces it, otherwise it is just kept in the local - accounting of which objects are being provided. + Provide adds the given cid to the content routing system. + + If announce is True, it also announces it, otherwise it is just + kept in the local accounting of which objects are being + provided. """ @abstractmethod def find_provider_iter(self, cid: bytes, count: int) -> Iterable[PeerInfo]: - """ - Search for peers who are able to provide a given key - returns an iterator of peer.PeerInfo - """ + """Search for peers who are able to provide a given key returns an + iterator of peer.PeerInfo.""" class IPeerRouting(ABC): @abstractmethod async def find_peer(self, peer_id: ID) -> PeerInfo: - """ - Find specific Peer - FindPeer searches for a peer with given peer_id, returns a peer.PeerInfo - with relevant addresses. - """ + """Find specific Peer FindPeer searches for a peer with given peer_id, + returns a peer.PeerInfo with relevant addresses.""" diff --git a/libp2p/routing/kademlia/kademlia_content_router.py b/libp2p/routing/kademlia/kademlia_content_router.py index e66643c2..b623a252 100644 --- a/libp2p/routing/kademlia/kademlia_content_router.py +++ b/libp2p/routing/kademlia/kademlia_content_router.py @@ -7,15 +7,15 @@ from libp2p.routing.interfaces import IContentRouting class KadmeliaContentRouter(IContentRouting): def provide(self, cid: bytes, announce: bool = True) -> None: """ - Provide adds the given cid to the content routing system. If announce is True, - it also announces it, otherwise it is just kept in the local - accounting of which objects are being provided. + Provide adds the given cid to the content routing system. + + If announce is True, it also announces it, otherwise it is just + kept in the local accounting of which objects are being + provided. """ # the DHT finds the closest peers to `key` using the `FIND_NODE` RPC # then sends a `ADD_PROVIDER` RPC with its own `PeerInfo` to each of these peers. def find_provider_iter(self, cid: bytes, count: int) -> Iterable[PeerInfo]: - """ - Search for peers who are able to provide a given key - returns an iterator of peer.PeerInfo - """ + """Search for peers who are able to provide a given key returns an + iterator of peer.PeerInfo.""" diff --git a/libp2p/routing/kademlia/kademlia_peer_router.py b/libp2p/routing/kademlia/kademlia_peer_router.py index 352c7dff..32f20757 100644 --- a/libp2p/routing/kademlia/kademlia_peer_router.py +++ b/libp2p/routing/kademlia/kademlia_peer_router.py @@ -16,7 +16,8 @@ class KadmeliaPeerRouter(IPeerRouting): async def find_peer(self, peer_id: ID) -> PeerInfo: """ - Find a specific peer + Find a specific peer. + :param peer_id: peer to search for :return: PeerInfo of specified peer """ diff --git a/libp2p/security/base_session.py b/libp2p/security/base_session.py index 01450be3..cce1b6cf 100644 --- a/libp2p/security/base_session.py +++ b/libp2p/security/base_session.py @@ -6,10 +6,8 @@ from libp2p.security.secure_conn_interface import ISecureConn class BaseSession(ISecureConn): - """ - ``BaseSession`` is not fully instantiated from its abstract classes as it - is only meant to be used in clases that derive from it. - """ + """``BaseSession`` is not fully instantiated from its abstract classes as + it is only meant to be used in clases that derive from it.""" local_peer: ID local_private_key: PrivateKey diff --git a/libp2p/security/base_transport.py b/libp2p/security/base_transport.py index 10d7b663..916ccce8 100644 --- a/libp2p/security/base_transport.py +++ b/libp2p/security/base_transport.py @@ -12,12 +12,12 @@ def default_secure_bytes_provider(n: int) -> bytes: class BaseSecureTransport(ISecureTransport): """ - ``BaseSecureTransport`` is not fully instantiated from its abstract classes as it - is only meant to be used in clases that derive from it. + ``BaseSecureTransport`` is not fully instantiated from its abstract classes + as it is only meant to be used in clases that derive from it. - Clients can provide a strategy to get cryptographically secure bytes of a given length. - A default implementation is provided using the ``secrets`` module from the - standard library. + Clients can provide a strategy to get cryptographically secure bytes + of a given length. A default implementation is provided using the + ``secrets`` module from the standard library. """ def __init__( diff --git a/libp2p/security/insecure/transport.py b/libp2p/security/insecure/transport.py index fd0701dd..4199c612 100644 --- a/libp2p/security/insecure/transport.py +++ b/libp2p/security/insecure/transport.py @@ -46,9 +46,7 @@ class InsecureSession(BaseSession): await self.conn.close() async def run_handshake(self) -> None: - """ - Raise `HandshakeFailure` when handshake failed - """ + """Raise `HandshakeFailure` when handshake failed.""" msg = make_exchange_message(self.local_private_key.get_public_key()) msg_bytes = msg.SerializeToString() encoded_msg_bytes = encode_fixedint_prefixed(msg_bytes) @@ -104,15 +102,16 @@ class InsecureSession(BaseSession): class InsecureTransport(BaseSecureTransport): - """ - ``InsecureTransport`` provides the "identity" upgrader for a ``IRawConnection``, - i.e. the upgraded transport does not add any additional security. - """ + """``InsecureTransport`` provides the "identity" upgrader for a + ``IRawConnection``, i.e. the upgraded transport does not add any additional + security.""" async def secure_inbound(self, conn: IRawConnection) -> ISecureConn: """ - Secure the connection, either locally or by communicating with opposing node via conn, - for an inbound connection (i.e. we are not the initiator) + Secure the connection, either locally or by communicating with opposing + node via conn, for an inbound connection (i.e. we are not the + initiator) + :return: secure connection object (that implements secure_conn_interface) """ session = InsecureSession(self.local_peer, self.local_private_key, conn, False) @@ -121,8 +120,9 @@ class InsecureTransport(BaseSecureTransport): async def secure_outbound(self, conn: IRawConnection, peer_id: ID) -> ISecureConn: """ - Secure the connection, either locally or by communicating with opposing node via conn, - for an inbound connection (i.e. we are the initiator) + Secure the connection, either locally or by communicating with opposing + node via conn, for an inbound connection (i.e. we are the initiator) + :return: secure connection object (that implements secure_conn_interface) """ session = InsecureSession( diff --git a/libp2p/security/secio/exceptions.py b/libp2p/security/secio/exceptions.py index d86ce3b0..ca6d37fa 100644 --- a/libp2p/security/secio/exceptions.py +++ b/libp2p/security/secio/exceptions.py @@ -6,10 +6,8 @@ class SecioException(HandshakeFailure): class SelfEncryption(SecioException): - """ - Raised to indicate that a host is attempting to encrypt communications - with itself. - """ + """Raised to indicate that a host is attempting to encrypt communications + with itself.""" pass diff --git a/libp2p/security/secio/transport.py b/libp2p/security/secio/transport.py index 4eb6a8bf..6a4c8673 100644 --- a/libp2p/security/secio/transport.py +++ b/libp2p/security/secio/transport.py @@ -143,10 +143,8 @@ class SecureSession(BaseSession): @dataclass(frozen=True) class Proposal: - """ - A ``Proposal`` represents the set of session parameters one peer in a pair of - peers attempting to negotiate a `secio` channel prefers. - """ + """A ``Proposal`` represents the set of session parameters one peer in a + pair of peers attempting to negotiate a `secio` channel prefers.""" nonce: bytes public_key: PublicKey @@ -408,9 +406,9 @@ async def create_secure_session( ) -> ISecureConn: """ Attempt the initial `secio` handshake with the remote peer. - If successful, return an object that provides secure communication to the - ``remote_peer``. - Raise `SecioException` when `conn` closed. + + If successful, return an object that provides secure communication + to the ``remote_peer``. Raise `SecioException` when `conn` closed. Raise `InconsistentNonce` when handshake failed """ msg_io = MsgIOReadWriter(conn) @@ -443,18 +441,18 @@ async def create_secure_session( class Transport(BaseSecureTransport): - """ - ``Transport`` provides a security upgrader for a ``IRawConnection``, - following the `secio` protocol defined in the libp2p specs. - """ + """``Transport`` provides a security upgrader for a ``IRawConnection``, + following the `secio` protocol defined in the libp2p specs.""" def get_nonce(self) -> bytes: return self.secure_bytes_provider(NONCE_SIZE) async def secure_inbound(self, conn: IRawConnection) -> ISecureConn: """ - Secure the connection, either locally or by communicating with opposing node via conn, - for an inbound connection (i.e. we are not the initiator) + Secure the connection, either locally or by communicating with opposing + node via conn, for an inbound connection (i.e. we are not the + initiator) + :return: secure connection object (that implements secure_conn_interface) """ local_nonce = self.get_nonce() @@ -469,8 +467,9 @@ class Transport(BaseSecureTransport): self, conn: IRawConnection, peer_id: PeerID ) -> ISecureConn: """ - Secure the connection, either locally or by communicating with opposing node via conn, - for an inbound connection (i.e. we are the initiator) + Secure the connection, either locally or by communicating with opposing + node via conn, for an inbound connection (i.e. we are the initiator) + :return: secure connection object (that implements secure_conn_interface) """ local_nonce = self.get_nonce() diff --git a/libp2p/security/secure_transport_interface.py b/libp2p/security/secure_transport_interface.py index 3b3e6256..678b9c67 100644 --- a/libp2p/security/secure_transport_interface.py +++ b/libp2p/security/secure_transport_interface.py @@ -17,15 +17,18 @@ class ISecureTransport(ABC): @abstractmethod async def secure_inbound(self, conn: IRawConnection) -> ISecureConn: """ - Secure the connection, either locally or by communicating with opposing node via conn, - for an inbound connection (i.e. we are not the initiator) + Secure the connection, either locally or by communicating with opposing + node via conn, for an inbound connection (i.e. we are not the + initiator) + :return: secure connection object (that implements secure_conn_interface) """ @abstractmethod async def secure_outbound(self, conn: IRawConnection, peer_id: ID) -> ISecureConn: """ - Secure the connection, either locally or by communicating with opposing node via conn, - for an inbound connection (i.e. we are the initiator) + Secure the connection, either locally or by communicating with opposing + node via conn, for an inbound connection (i.e. we are the initiator) + :return: secure connection object (that implements secure_conn_interface) """ diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py index 7d1c816f..52c957c1 100644 --- a/libp2p/security/security_multistream.py +++ b/libp2p/security/security_multistream.py @@ -23,6 +23,7 @@ Relevant go repo: https://github.com/libp2p/go-conn-security/blob/master/interfa class SecurityMultistream(ABC): """ SSMuxer is a multistream stream security transport multiplexer. + Go implementation: github.com/libp2p/go-conn-security-multistream/ssms.go """ @@ -41,9 +42,10 @@ class SecurityMultistream(ABC): def add_transport(self, protocol: TProtocol, transport: ISecureTransport) -> None: """ - Add a protocol and its corresponding transport to multistream-select(multiselect). - The order that a protocol is added is exactly the precedence it is negotiated in - multiselect. + Add a protocol and its corresponding transport to multistream- + select(multiselect). The order that a protocol is added is exactly the + precedence it is negotiated in multiselect. + :param protocol: the protocol name, which is negotiated in multiselect. :param transport: the corresponding transportation to the ``protocol``. """ @@ -57,8 +59,10 @@ class SecurityMultistream(ABC): async def secure_inbound(self, conn: IRawConnection) -> ISecureConn: """ - Secure the connection, either locally or by communicating with opposing node via conn, - for an inbound connection (i.e. we are not the initiator) + Secure the connection, either locally or by communicating with opposing + node via conn, for an inbound connection (i.e. we are not the + initiator) + :return: secure connection object (that implements secure_conn_interface) """ transport = await self.select_transport(conn, False) @@ -67,8 +71,9 @@ class SecurityMultistream(ABC): async def secure_outbound(self, conn: IRawConnection, peer_id: ID) -> ISecureConn: """ - Secure the connection, either locally or by communicating with opposing node via conn, - for an inbound connection (i.e. we are the initiator) + Secure the connection, either locally or by communicating with opposing + node via conn, for an inbound connection (i.e. we are the initiator) + :return: secure connection object (that implements secure_conn_interface) """ transport = await self.select_transport(conn, True) @@ -79,8 +84,9 @@ class SecurityMultistream(ABC): self, conn: IRawConnection, is_initiator: bool ) -> ISecureTransport: """ - Select a transport that both us and the node on the - other end of conn support and agree on + Select a transport that both us and the node on the other end of conn + support and agree on. + :param conn: conn to choose a transport over :param is_initiator: true if we are the initiator, false otherwise :return: selected secure transport diff --git a/libp2p/stream_muxer/abc.py b/libp2p/stream_muxer/abc.py index 3e69fa46..71704c1e 100644 --- a/libp2p/stream_muxer/abc.py +++ b/libp2p/stream_muxer/abc.py @@ -15,7 +15,8 @@ class IMuxedConn(ABC): @abstractmethod def __init__(self, conn: ISecureConn, peer_id: ID) -> None: """ - create a new muxed connection + create a new muxed connection. + :param conn: an instance of secured connection for new muxed streams :param peer_id: peer_id of peer the connection is to @@ -28,29 +29,27 @@ class IMuxedConn(ABC): @abstractmethod async def close(self) -> None: - """ - close connection - """ + """close connection.""" @abstractmethod def is_closed(self) -> bool: """ - check connection is fully closed + check connection is fully closed. + :return: true if successful """ @abstractmethod async def open_stream(self) -> "IMuxedStream": """ - creates a new muxed_stream + creates a new muxed_stream. + :return: a new ``IMuxedStream`` stream """ @abstractmethod async def accept_stream(self) -> "IMuxedStream": - """ - accepts a muxed stream opened by the other end - """ + """accepts a muxed stream opened by the other end.""" class IMuxedStream(ReadWriteCloser): @@ -59,14 +58,12 @@ class IMuxedStream(ReadWriteCloser): @abstractmethod async def reset(self) -> None: - """ - closes both ends of the stream - tells this remote side to hang up - """ + """closes both ends of the stream tells this remote side to hang up.""" @abstractmethod def set_deadline(self, ttl: int) -> bool: """ - set deadline for muxed stream + set deadline for muxed stream. + :return: a new stream """ diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 307e78d0..c8d7a209 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -42,7 +42,8 @@ class Mplex(IMuxedConn): def __init__(self, secured_conn: ISecureConn, peer_id: ID) -> None: """ - create a new muxed connection + create a new muxed connection. + :param secured_conn: an instance of ``ISecureConn`` :param generic_protocol_handler: generic protocol handler for new muxed streams @@ -72,9 +73,7 @@ class Mplex(IMuxedConn): return self.secured_conn.is_initiator async def close(self) -> None: - """ - close the stream muxer and underlying secured connection - """ + """close the stream muxer and underlying secured connection.""" if self.event_shutting_down.is_set(): return # Set the `event_shutting_down`, to allow graceful shutdown. @@ -85,14 +84,16 @@ class Mplex(IMuxedConn): def is_closed(self) -> bool: """ - check connection is fully closed + check connection is fully closed. + :return: true if successful """ return self.event_closed.is_set() def _get_next_channel_id(self) -> int: """ - Get next available stream id + Get next available stream id. + :return: next available stream id for the connection """ next_id = self.next_channel_id @@ -107,7 +108,8 @@ class Mplex(IMuxedConn): async def open_stream(self) -> IMuxedStream: """ - creates a new muxed_stream + creates a new muxed_stream. + :return: a new ``MplexStream`` """ channel_id = self._get_next_channel_id() @@ -135,9 +137,7 @@ class Mplex(IMuxedConn): return task_coro.result() async def accept_stream(self) -> IMuxedStream: - """ - accepts a muxed stream opened by the other end - """ + """accepts a muxed stream opened by the other end.""" return await self._wait_until_shutting_down_or_closed( self.new_stream_queue.get() ) @@ -146,7 +146,8 @@ class Mplex(IMuxedConn): self, flag: HeaderTags, data: Optional[bytes], stream_id: StreamID ) -> int: """ - sends a message over the connection + sends a message over the connection. + :param header: header to use :param data: data to send in the message :param stream_id: stream the message is in @@ -165,7 +166,8 @@ class Mplex(IMuxedConn): async def write_to_stream(self, _bytes: bytes) -> int: """ - writes a byte array to a secured connection + writes a byte array to a secured connection. + :param _bytes: byte array to write :return: length written """ @@ -173,9 +175,8 @@ class Mplex(IMuxedConn): return len(_bytes) async def handle_incoming(self) -> None: - """ - Read a message off of the secured connection and add it to the corresponding message buffer - """ + """Read a message off of the secured connection and add it to the + corresponding message buffer.""" while True: try: @@ -190,7 +191,8 @@ class Mplex(IMuxedConn): async def read_message(self) -> Tuple[int, int, bytes]: """ - Read a single message off of the secured connection + Read a single message off of the secured connection. + :return: stream_id, flag, message contents """ @@ -217,6 +219,7 @@ class Mplex(IMuxedConn): async def _handle_incoming_message(self) -> None: """ Read and handle a new incoming message. + :raise MplexUnavailable: `Mplex` encounters fatal error or is shutting down. """ channel_id, flag, message = await self._wait_until_shutting_down_or_closed( diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index 8db42121..3f283c81 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -35,7 +35,8 @@ class MplexStream(IMuxedStream): def __init__(self, name: str, stream_id: StreamID, muxed_conn: "Mplex") -> None: """ - create new MuxedStream in muxer + create new MuxedStream in muxer. + :param stream_id: stream id of this stream :param muxed_conn: muxed connection of this muxed_stream """ @@ -113,9 +114,10 @@ class MplexStream(IMuxedStream): async def read(self, n: int = -1) -> bytes: """ - Read up to n bytes. Read possibly returns fewer than `n` bytes, - if there are not enough bytes in the Mplex buffer. - If `n == -1`, read until EOF. + Read up to n bytes. Read possibly returns fewer than `n` bytes, if + there are not enough bytes in the Mplex buffer. If `n == -1`, read + until EOF. + :param n: number of bytes to read :return: bytes actually read """ @@ -142,7 +144,8 @@ class MplexStream(IMuxedStream): async def write(self, data: bytes) -> int: """ - write to stream + write to stream. + :return: number of bytes written """ if self.event_local_closed.is_set(): @@ -155,10 +158,8 @@ class MplexStream(IMuxedStream): return await self.muxed_conn.send_message(flag, data, self.stream_id) async def close(self) -> None: - """ - Closing a stream closes it for writing and closes the remote end for reading - but allows writing in the other direction. - """ + """Closing a stream closes it for writing and closes the remote end for + reading but allows writing in the other direction.""" # TODO error handling with timeout async with self.close_lock: @@ -182,10 +183,7 @@ class MplexStream(IMuxedStream): del self.muxed_conn.streams[self.stream_id] async def reset(self) -> None: - """ - closes both ends of the stream - tells this remote side to hang up - """ + """closes both ends of the stream tells this remote side to hang up.""" async with self.close_lock: # Both sides have been closed. No need to event_reset. if self.event_remote_closed.is_set() and self.event_local_closed.is_set(): @@ -218,7 +216,8 @@ class MplexStream(IMuxedStream): # TODO deadline not in use def set_deadline(self, ttl: int) -> bool: """ - set deadline for muxed stream + set deadline for muxed stream. + :return: True if successful """ self.read_deadline = ttl @@ -227,7 +226,8 @@ class MplexStream(IMuxedStream): def set_read_deadline(self, ttl: int) -> bool: """ - set read deadline for muxed stream + set read deadline for muxed stream. + :return: True if successful """ self.read_deadline = ttl @@ -235,7 +235,8 @@ class MplexStream(IMuxedStream): def set_write_deadline(self, ttl: int) -> bool: """ - set write deadline for muxed stream + set write deadline for muxed stream. + :return: True if successful """ self.write_deadline = ttl diff --git a/libp2p/stream_muxer/muxer_multistream.py b/libp2p/stream_muxer/muxer_multistream.py index 00e14289..f82cd19d 100644 --- a/libp2p/stream_muxer/muxer_multistream.py +++ b/libp2p/stream_muxer/muxer_multistream.py @@ -18,6 +18,7 @@ DEFAULT_NEGOTIATE_TIMEOUT = 60 class MuxerMultistream: """ MuxerMultistream is a multistream stream muxed transport multiplexer. + go implementation: github.com/libp2p/go-stream-muxer-multistream/multistream.go """ @@ -35,9 +36,10 @@ class MuxerMultistream: def add_transport(self, protocol: TProtocol, transport: TMuxerClass) -> None: """ - Add a protocol and its corresponding transport to multistream-select(multiselect). - The order that a protocol is added is exactly the precedence it is negotiated in - multiselect. + Add a protocol and its corresponding transport to multistream- + select(multiselect). The order that a protocol is added is exactly the + precedence it is negotiated in multiselect. + :param protocol: the protocol name, which is negotiated in multiselect. :param transport: the corresponding transportation to the ``protocol``. """ @@ -49,8 +51,9 @@ class MuxerMultistream: async def select_transport(self, conn: IRawConnection) -> TMuxerClass: """ - Select a transport that both us and the node on the - other end of conn support and agree on + Select a transport that both us and the node on the other end of conn + support and agree on. + :param conn: conn to choose a transport over :return: selected muxer transport """ diff --git a/libp2p/transport/listener_interface.py b/libp2p/transport/listener_interface.py index 9664f069..1b22531b 100644 --- a/libp2p/transport/listener_interface.py +++ b/libp2p/transport/listener_interface.py @@ -8,7 +8,8 @@ class IListener(ABC): @abstractmethod async def listen(self, maddr: Multiaddr) -> bool: """ - put listener in listening mode and wait for incoming connections + put listener in listening mode and wait for incoming connections. + :param maddr: multiaddr of peer :return: return True if successful """ @@ -16,13 +17,12 @@ class IListener(ABC): @abstractmethod def get_addrs(self) -> List[Multiaddr]: """ - retrieve list of addresses the listener is listening on + retrieve list of addresses the listener is listening on. + :return: return list of addrs """ @abstractmethod async def close(self) -> None: - """ - close the listener such that no more connections - can be open on this transport instance - """ + """close the listener such that no more connections can be open on this + transport instance.""" diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index 5ee24283..7470510d 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -23,7 +23,8 @@ class TCPListener(IListener): async def listen(self, maddr: Multiaddr) -> bool: """ - put listener in listening mode and wait for incoming connections + put listener in listening mode and wait for incoming connections. + :param maddr: maddr of peer :return: return True if successful """ @@ -39,17 +40,16 @@ class TCPListener(IListener): def get_addrs(self) -> List[Multiaddr]: """ - retrieve list of addresses the listener is listening on + retrieve list of addresses the listener is listening on. + :return: return list of addrs """ # TODO check if server is listening return self.multiaddrs async def close(self) -> None: - """ - close the listener such that no more connections - can be open on this transport instance - """ + """close the listener such that no more connections can be open on this + transport instance.""" if self.server is None: return self.server.close() @@ -60,7 +60,8 @@ class TCPListener(IListener): class TCP(ITransport): async def dial(self, maddr: Multiaddr) -> IRawConnection: """ - dial a transport to peer listening on multiaddr + dial a transport to peer listening on multiaddr. + :param maddr: multiaddr of peer :return: `RawConnection` if successful :raise OpenConnectionError: raised when failed to open connection @@ -77,9 +78,10 @@ class TCP(ITransport): def create_listener(self, handler_function: THandler) -> TCPListener: """ - create listener on transport + create listener on transport. + :param handler_function: a function called when a new connection is received - that takes a connection as argument which implements interface-connection + that takes a connection as argument which implements interface-connection :return: a listener object that implements listener_interface.py """ return TCPListener(handler_function) diff --git a/libp2p/transport/transport_interface.py b/libp2p/transport/transport_interface.py index ca8b5c34..402162bc 100644 --- a/libp2p/transport/transport_interface.py +++ b/libp2p/transport/transport_interface.py @@ -12,7 +12,8 @@ class ITransport(ABC): @abstractmethod async def dial(self, maddr: Multiaddr) -> IRawConnection: """ - dial a transport to peer listening on multiaddr + dial a transport to peer listening on multiaddr. + :param multiaddr: multiaddr of peer :param self_id: peer_id of the dialer (to send to receiver) :return: list of multiaddrs @@ -21,8 +22,9 @@ class ITransport(ABC): @abstractmethod def create_listener(self, handler_function: THandler) -> IListener: """ - create listener on transport + create listener on transport. + :param handler_function: a function called when a new conntion is received - that takes a connection as argument which implements interface-connection + that takes a connection as argument which implements interface-connection :return: a listener object that implements listener_interface.py """ diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index 7c0c5b0e..cd8c26da 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -26,18 +26,14 @@ class TransportUpgrader: self.muxer_multistream = MuxerMultistream(muxer_transports_by_protocol) def upgrade_listener(self, transport: ITransport, listeners: IListener) -> None: - """ - Upgrade multiaddr listeners to libp2p-transport listeners - """ + """Upgrade multiaddr listeners to libp2p-transport listeners.""" # TODO: Figure out what to do with this function. pass async def upgrade_security( self, raw_conn: IRawConnection, peer_id: ID, is_initiator: bool ) -> ISecureConn: - """ - Upgrade conn to a secured connection - """ + """Upgrade conn to a secured connection.""" try: if is_initiator: return await self.security_multistream.secure_outbound( @@ -54,9 +50,7 @@ class TransportUpgrader: ) from error async def upgrade_connection(self, conn: ISecureConn, peer_id: ID) -> IMuxedConn: - """ - Upgrade secured connection to a muxed connection - """ + """Upgrade secured connection to a muxed connection.""" try: return await self.muxer_multistream.new_conn(conn, peer_id) except (MultiselectError, MultiselectClientError) as error: diff --git a/libp2p/utils.py b/libp2p/utils.py index aeb7ae3b..3d0794a1 100644 --- a/libp2p/utils.py +++ b/libp2p/utils.py @@ -19,7 +19,7 @@ SHIFT_64_BIT_MAX = int(math.ceil(64 / 7)) * 7 def encode_uvarint(number: int) -> bytes: - """Pack `number` into varint bytes""" + """Pack `number` into varint bytes.""" buf = b"" while True: towrite = number & 0x7F @@ -33,9 +33,7 @@ def encode_uvarint(number: int) -> bytes: async def decode_uvarint_from_stream(reader: Reader) -> int: - """ - https://en.wikipedia.org/wiki/LEB128 - """ + """https://en.wikipedia.org/wiki/LEB128.""" res = 0 for shift in itertools.count(0, 7): if shift > SHIFT_64_BIT_MAX: diff --git a/setup.py b/setup.py index 83da86d1..f9c2668d 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ extras_require = { "isort==4.3.21", "flake8>=3.7.7,<4.0.0", ], - "dev": ["tox>=3.13.2,<4.0.0"], + "dev": ["tox>=3.13.2,<4.0.0", "docformatter"], } extras_require["dev"] = ( diff --git a/tests/network/test_notify.py b/tests/network/test_notify.py index aaf0ed55..78e407e9 100644 --- a/tests/network/test_notify.py +++ b/tests/network/test_notify.py @@ -1,6 +1,6 @@ """ -Test Notify and Notifee by ensuring that the proper events get -called, and that the stream passed into opened_stream is correct +Test Notify and Notifee by ensuring that the proper events get called, and that +the stream passed into opened_stream is correct. Note: Listen event does not get hit because MyNotifee is passed into network after network has already started listening diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py index 98a224fa..ac9e7698 100644 --- a/tests/pubsub/dummy_account_node.py +++ b/tests/pubsub/dummy_account_node.py @@ -21,9 +21,11 @@ CRYPTO_TOPIC = "ethereum" class DummyAccountNode: """ - Node which has an internal balance mapping, meant to serve as - a dummy crypto blockchain. There is no actual blockchain, just a simple - map indicating how much crypto each user in the mappings holds + Node which has an internal balance mapping, meant to serve as a dummy + crypto blockchain. + + There is no actual blockchain, just a simple map indicating how much + crypto each user in the mappings holds """ libp2p_node: IHost @@ -41,8 +43,8 @@ class DummyAccountNode: @classmethod async def create(cls): """ - Create a new DummyAccountNode and attach a libp2p node, a floodsub, and a pubsub - instance to this new node + Create a new DummyAccountNode and attach a libp2p node, a floodsub, and + a pubsub instance to this new node. We use create as this serves as a factory function and allows us to use async await, unlike the init function @@ -53,9 +55,7 @@ class DummyAccountNode: return cls(libp2p_node=pubsub.host, pubsub=pubsub, floodsub=pubsub.router) async def handle_incoming_msgs(self): - """ - Handle all incoming messages on the CRYPTO_TOPIC from peers - """ + """Handle all incoming messages on the CRYPTO_TOPIC from peers.""" while True: incoming = await self.q.get() msg_comps = incoming.data.decode("utf-8").split(",") @@ -66,17 +66,17 @@ class DummyAccountNode: self.handle_set_crypto(msg_comps[1], int(msg_comps[2])) async def setup_crypto_networking(self): - """ - Subscribe to CRYPTO_TOPIC and perform call to function that handles - all incoming messages on said topic - """ + """Subscribe to CRYPTO_TOPIC and perform call to function that handles + all incoming messages on said topic.""" self.q = await self.pubsub.subscribe(CRYPTO_TOPIC) asyncio.ensure_future(self.handle_incoming_msgs()) async def publish_send_crypto(self, source_user, dest_user, amount): """ - Create a send crypto message and publish that message to all other nodes + Create a send crypto message and publish that message to all other + nodes. + :param source_user: user to send crypto from :param dest_user: user to send crypto to :param amount: amount of crypto to send @@ -86,7 +86,9 @@ class DummyAccountNode: async def publish_set_crypto(self, user, amount): """ - Create a set crypto message and publish that message to all other nodes + Create a set crypto message and publish that message to all other + nodes. + :param user: user to set crypto for :param amount: amount of crypto """ @@ -95,7 +97,8 @@ class DummyAccountNode: def handle_send_crypto(self, source_user, dest_user, amount): """ - Handle incoming send_crypto message + Handle incoming send_crypto message. + :param source_user: user to send crypto from :param dest_user: user to send crypto to :param amount: amount of crypto to send @@ -112,7 +115,8 @@ class DummyAccountNode: def handle_set_crypto(self, dest_user, amount): """ - Handle incoming set_crypto message + Handle incoming set_crypto message. + :param dest_user: user to set crypto for :param amount: amount of crypto """ @@ -120,7 +124,8 @@ class DummyAccountNode: def get_balance(self, user): """ - Get balance in crypto for a particular user + Get balance in crypto for a particular user. + :param user: user to get balance for :return: balance of user """ diff --git a/tests/pubsub/test_dummyaccount_demo.py b/tests/pubsub/test_dummyaccount_demo.py index edc2f51e..39def71c 100644 --- a/tests/pubsub/test_dummyaccount_demo.py +++ b/tests/pubsub/test_dummyaccount_demo.py @@ -17,8 +17,9 @@ def create_setup_in_new_thread_func(dummy_node): async def perform_test(num_nodes, adjacency_map, action_func, assertion_func): """ - Helper function to allow for easy construction of custom tests for dummy account nodes - in various network topologies + Helper function to allow for easy construction of custom tests for dummy + account nodes in various network topologies. + :param num_nodes: number of nodes in the test :param adjacency_map: adjacency map defining each node and its list of neighbors :param action_func: function to execute that includes actions by the nodes, diff --git a/tests/pubsub/utils.py b/tests/pubsub/utils.py index d1302072..34f7510b 100644 --- a/tests/pubsub/utils.py +++ b/tests/pubsub/utils.py @@ -8,7 +8,8 @@ from tests.utils import connect def message_id_generator(start_val): """ - Generate a unique message id + Generate a unique message id. + :param start_val: value to start generating messages at :return: message id """ diff --git a/tests/utils.py b/tests/utils.py index 6c1fa473..67e9668f 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -21,9 +21,7 @@ async def connect_swarm(swarm_0, swarm_1): async def connect(node1, node2): - """ - Connect node1 to node2 - """ + """Connect node1 to node2.""" addr = node2.get_addrs()[0] info = info_from_p2p_addr(addr) await node1.connect(info) diff --git a/tests_interop/daemon.py b/tests_interop/daemon.py index 97356845..83aa82d3 100644 --- a/tests_interop/daemon.py +++ b/tests_interop/daemon.py @@ -21,7 +21,9 @@ TIMEOUT_DURATION = 30 async def try_until_success(coro_func, timeout=TIMEOUT_DURATION): """ Keep running ``coro_func`` until either it succeed or time is up. - All arguments of ``coro_func`` should be filled, i.e. it should be called without arguments. + + All arguments of ``coro_func`` should be filled, i.e. it should be + called without arguments. """ t_start = time.monotonic() while True: diff --git a/tox.ini b/tox.ini index 7a6b652f..3fc294ae 100644 --- a/tox.ini +++ b/tox.ini @@ -40,7 +40,8 @@ extras = dev commands = mypy -p libp2p -p examples --config-file {toxinidir}/mypy.ini black --check libp2p tests tests_interop examples setup.py - isort --recursive --check-only libp2p tests tests_interop examples setup.py + isort --recursive --check-only --diff libp2p tests tests_interop examples setup.py + docformatter --pre-summary-newline --check --recursive libp2p tests tests_interop examples setup.py flake8 libp2p tests tests_interop examples setup.py [testenv:py37-interop]