diff --git a/Makefile b/Makefile index 59422bfd..5553f5a4 100644 --- a/Makefile +++ b/Makefile @@ -9,6 +9,7 @@ all: protobufs format: black $(FILES_TO_LINT) isort --recursive $(FILES_TO_LINT) + docformatter -ir $(FILES_TO_LINT) lintroll: mypy -p libp2p -p examples --config-file mypy.ini diff --git a/libp2p/__init__.py b/libp2p/__init__.py index 24cff711..12cd8e9e 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -25,9 +25,7 @@ from libp2p.typing import TProtocol async def cleanup_done_tasks() -> None: - """ - clean up asyncio done tasks to free up resources - """ + """clean up asyncio done tasks to free up resources.""" while True: for task in asyncio.all_tasks(): if task.done(): @@ -50,8 +48,8 @@ def generate_peer_id_from(key_pair: KeyPair) -> ID: def initialize_default_kademlia_router( ksize: int = 20, alpha: int = 3, id_opt: ID = None, storage: IStorage = None ) -> KadmeliaPeerRouter: - """ - initialize kadmelia router when no kademlia router is passed in + """initialize kadmelia router when no kademlia router is passed in. + :param ksize: The k parameter from the paper :param alpha: The alpha parameter from the paper :param id_opt: optional id for host @@ -80,8 +78,8 @@ def initialize_default_swarm( peerstore_opt: IPeerStore = None, disc_opt: IPeerRouting = None, ) -> Swarm: - """ - initialize swarm when no swarm is passed in + """initialize swarm when no swarm is passed in. + :param id_opt: optional id for host :param transport_opt: optional choice of transport upgrade :param muxer_opt: optional choice of stream muxer @@ -120,8 +118,8 @@ async def new_node( peerstore_opt: IPeerStore = None, disc_opt: IPeerRouting = None, ) -> IHost: - """ - create new libp2p node + """create new libp2p node. + :param key_pair: key pair for deriving an identity :param swarm_opt: optional swarm :param id_opt: optional id for host diff --git a/libp2p/crypto/authenticated_encryption.py b/libp2p/crypto/authenticated_encryption.py index 6900f1f4..d931f273 100644 --- a/libp2p/crypto/authenticated_encryption.py +++ b/libp2p/crypto/authenticated_encryption.py @@ -61,12 +61,9 @@ class MacAndCipher: def initialize_pair( cipher_type: str, hash_type: str, secret: bytes ) -> Tuple[EncryptionParameters, EncryptionParameters]: - """ - Return a pair of ``Keys`` for use in securing a - communications channel with authenticated encryption - derived from the ``secret`` and using the - requested ``cipher_type`` and ``hash_type``. - """ + """Return a pair of ``Keys`` for use in securing a communications channel + with authenticated encryption derived from the ``secret`` and using the + requested ``cipher_type`` and ``hash_type``.""" if cipher_type != "AES-128": raise NotImplementedError() if hash_type != "SHA256": diff --git a/libp2p/crypto/ecc.py b/libp2p/crypto/ecc.py index 10aed18c..0559bf46 100644 --- a/libp2p/crypto/ecc.py +++ b/libp2p/crypto/ecc.py @@ -6,10 +6,8 @@ from libp2p.crypto.keys import KeyPair, KeyType, PrivateKey, PublicKey def infer_local_type(curve: str) -> curve_types.Curve: - """ - converts a ``str`` representation of some elliptic curve to - a representation understood by the backend of this module. - """ + """converts a ``str`` representation of some elliptic curve to a + representation understood by the backend of this module.""" if curve == "P-256": return curve_types.P256 else: @@ -63,9 +61,8 @@ class ECCPrivateKey(PrivateKey): def create_new_key_pair(curve: str) -> KeyPair: - """ - Return a new ECC keypair with the requested ``curve`` type, e.g. "P-256". - """ + """Return a new ECC keypair with the requested ``curve`` type, e.g. + "P-256".""" private_key = ECCPrivateKey.new(curve) public_key = private_key.get_public_key() return KeyPair(private_key, public_key) diff --git a/libp2p/crypto/exceptions.py b/libp2p/crypto/exceptions.py index a9324df2..fdb049c2 100644 --- a/libp2p/crypto/exceptions.py +++ b/libp2p/crypto/exceptions.py @@ -6,9 +6,7 @@ class CryptographyError(BaseLibp2pError): class MissingDeserializerError(CryptographyError): - """ - Raise if the requested deserialization routine is missing for - some type of cryptographic key. - """ + """Raise if the requested deserialization routine is missing for some type + of cryptographic key.""" pass diff --git a/libp2p/crypto/key_exchange.py b/libp2p/crypto/key_exchange.py index 3200df40..17ec75ab 100644 --- a/libp2p/crypto/key_exchange.py +++ b/libp2p/crypto/key_exchange.py @@ -9,9 +9,7 @@ SharedKeyGenerator = Callable[[bytes], bytes] def create_ephemeral_key_pair(curve_type: str) -> Tuple[PublicKey, SharedKeyGenerator]: - """ - Facilitates ECDH key exchange. - """ + """Facilitates ECDH key exchange.""" if curve_type != "P-256": raise NotImplementedError() diff --git a/libp2p/crypto/keys.py b/libp2p/crypto/keys.py index 5bcc5a37..dff8780a 100644 --- a/libp2p/crypto/keys.py +++ b/libp2p/crypto/keys.py @@ -15,22 +15,16 @@ class KeyType(Enum): class Key(ABC): - """ - A ``Key`` represents a cryptographic key. - """ + """A ``Key`` represents a cryptographic key.""" @abstractmethod def to_bytes(self) -> bytes: - """ - Returns the byte representation of this key. - """ + """Returns the byte representation of this key.""" ... @abstractmethod def get_type(self) -> KeyType: - """ - Returns the ``KeyType`` for ``self``. - """ + """Returns the ``KeyType`` for ``self``.""" ... def __eq__(self, other: object) -> bool: @@ -40,30 +34,23 @@ class Key(ABC): class PublicKey(Key): - """ - A ``PublicKey`` represents a cryptographic public key. - """ + """A ``PublicKey`` represents a cryptographic public key.""" @abstractmethod def verify(self, data: bytes, signature: bytes) -> bool: - """ - Verify that ``signature`` is the cryptographic signature of the hash of ``data``. - """ + """Verify that ``signature`` is the cryptographic signature of the hash + of ``data``.""" ... def _serialize_to_protobuf(self) -> protobuf.PublicKey: - """ - Return the protobuf representation of this ``Key``. - """ + """Return the protobuf representation of this ``Key``.""" key_type = self.get_type().value data = self.to_bytes() protobuf_key = protobuf.PublicKey(key_type=key_type, data=data) return protobuf_key def serialize(self) -> bytes: - """ - Return the canonical serialization of this ``Key``. - """ + """Return the canonical serialization of this ``Key``.""" return self._serialize_to_protobuf().SerializeToString() @classmethod @@ -72,9 +59,7 @@ class PublicKey(Key): class PrivateKey(Key): - """ - A ``PrivateKey`` represents a cryptographic private key. - """ + """A ``PrivateKey`` represents a cryptographic private key.""" @abstractmethod def sign(self, data: bytes) -> bytes: @@ -85,18 +70,14 @@ class PrivateKey(Key): ... def _serialize_to_protobuf(self) -> protobuf.PrivateKey: - """ - Return the protobuf representation of this ``Key``. - """ + """Return the protobuf representation of this ``Key``.""" key_type = self.get_type().value data = self.to_bytes() protobuf_key = protobuf.PrivateKey(key_type=key_type, data=data) return protobuf_key def serialize(self) -> bytes: - """ - Return the canonical serialization of this ``Key``. - """ + """Return the canonical serialization of this ``Key``.""" return self._serialize_to_protobuf().SerializeToString() @classmethod diff --git a/libp2p/crypto/rsa.py b/libp2p/crypto/rsa.py index 9b788cad..2565de4d 100644 --- a/libp2p/crypto/rsa.py +++ b/libp2p/crypto/rsa.py @@ -56,9 +56,10 @@ class RSAPrivateKey(PrivateKey): def create_new_key_pair(bits: int = 2048, e: int = 65537) -> KeyPair: - """ - Returns a new RSA keypair with the requested key size (``bits``) and the given public - exponent ``e``. Sane defaults are provided for both values. + """Returns a new RSA keypair with the requested key size (``bits``) and the + given public exponent ``e``. + + Sane defaults are provided for both values. """ private_key = RSAPrivateKey.new(bits, e) public_key = private_key.get_public_key() diff --git a/libp2p/crypto/secp256k1.py b/libp2p/crypto/secp256k1.py index 475c1673..bf598103 100644 --- a/libp2p/crypto/secp256k1.py +++ b/libp2p/crypto/secp256k1.py @@ -61,9 +61,9 @@ class Secp256k1PrivateKey(PrivateKey): def create_new_key_pair(secret: bytes = None) -> KeyPair: - """ - Returns a new Secp256k1 keypair derived from the provided ``secret``, - a sequence of bytes corresponding to some integer between 0 and the group order. + """Returns a new Secp256k1 keypair derived from the provided ``secret``, a + sequence of bytes corresponding to some integer between 0 and the group + order. A valid secret is created if ``None`` is passed. """ diff --git a/libp2p/exceptions.py b/libp2p/exceptions.py index ce2dc340..bdecfad0 100644 --- a/libp2p/exceptions.py +++ b/libp2p/exceptions.py @@ -3,9 +3,7 @@ class BaseLibp2pError(Exception): class ValidationError(BaseLibp2pError): - """ - Raised when something does not pass a validation check. - """ + """Raised when something does not pass a validation check.""" class ParseError(BaseLibp2pError): diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index b26dd3c7..bca7f57a 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -27,9 +27,10 @@ logger = logging.getLogger("libp2p.network.basic_host") class BasicHost(IHost): - """ - BasicHost is a wrapper of a `INetwork` implementation. It performs protocol negotiation - on a stream with multistream-select right after a stream is initialized. + """BasicHost is a wrapper of a `INetwork` implementation. + + It performs protocol negotiation on a stream with multistream-select + right after a stream is initialized. """ _network: INetwork @@ -86,8 +87,8 @@ class BasicHost(IHost): def set_stream_handler( self, protocol_id: TProtocol, stream_handler: StreamHandlerFn ) -> None: - """ - set stream handler for given `protocol_id` + """set stream handler for given `protocol_id` + :param protocol_id: protocol id used on stream :param stream_handler: a stream handler function """ @@ -118,11 +119,11 @@ class BasicHost(IHost): return net_stream async def connect(self, peer_info: PeerInfo) -> None: - """ - connect ensures there is a connection between this host and the peer with - given `peer_info.peer_id`. connect will absorb the addresses in peer_info into its internal - peerstore. If there is not an active connection, connect will issue a - dial, and block until a connection is opened, or an error is returned. + """connect ensures there is a connection between this host and the peer + with given `peer_info.peer_id`. connect will absorb the addresses in + peer_info into its internal peerstore. If there is not an active + connection, connect will issue a dial, and block until a connection is + opened, or an error is returned. :param peer_info: peer_info of the peer we want to connect to :type peer_info: peer.peerinfo.PeerInfo diff --git a/libp2p/host/exceptions.py b/libp2p/host/exceptions.py index ce7b9723..521f87e3 100644 --- a/libp2p/host/exceptions.py +++ b/libp2p/host/exceptions.py @@ -2,9 +2,7 @@ from libp2p.exceptions import BaseLibp2pError class HostException(BaseLibp2pError): - """ - A generic exception in `IHost`. - """ + """A generic exception in `IHost`.""" class ConnectionFailure(HostException): diff --git a/libp2p/host/host_interface.py b/libp2p/host/host_interface.py index 5700cdf7..4edc6d01 100644 --- a/libp2p/host/host_interface.py +++ b/libp2p/host/host_interface.py @@ -40,8 +40,8 @@ class IHost(ABC): def set_stream_handler( self, protocol_id: TProtocol, stream_handler: StreamHandlerFn ) -> None: - """ - set stream handler for host + """set stream handler for host. + :param protocol_id: protocol id used on stream :param stream_handler: a stream handler function """ @@ -60,11 +60,11 @@ class IHost(ABC): @abstractmethod async def connect(self, peer_info: PeerInfo) -> None: - """ - connect ensures there is a connection between this host and the peer with - given peer_info.peer_id. connect will absorb the addresses in peer_info into its internal - peerstore. If there is not an active connection, connect will issue a - dial, and block until a connection is opened, or an error is returned. + """connect ensures there is a connection between this host and the peer + with given peer_info.peer_id. connect will absorb the addresses in + peer_info into its internal peerstore. If there is not an active + connection, connect will issue a dial, and block until a connection is + opened, or an error is returned. :param peer_info: peer_info of the peer we want to connect to :type peer_info: peer.peerinfo.PeerInfo diff --git a/libp2p/host/routed_host.py b/libp2p/host/routed_host.py index 4a9778de..499bb9ba 100644 --- a/libp2p/host/routed_host.py +++ b/libp2p/host/routed_host.py @@ -15,9 +15,9 @@ class RoutedHost(BasicHost): self._router = router async def connect(self, peer_info: PeerInfo) -> None: - """ - connect ensures there is a connection between this host and the peer with - given `peer_info.peer_id`. See (basic_host).connect for more information. + """connect ensures there is a connection between this host and the peer + with given `peer_info.peer_id`. See (basic_host).connect for more + information. RoutedHost's Connect differs in that if the host has no addresses for a given peer, it will use its routing system to try to find some. diff --git a/libp2p/io/exceptions.py b/libp2p/io/exceptions.py index b8e4e011..090c0b1d 100644 --- a/libp2p/io/exceptions.py +++ b/libp2p/io/exceptions.py @@ -6,9 +6,7 @@ class IOException(BaseLibp2pError): class IncompleteReadError(IOException): - """ - Fewer bytes were read than requested. - """ + """Fewer bytes were read than requested.""" class MsgioException(IOException): diff --git a/libp2p/io/msgio.py b/libp2p/io/msgio.py index f60b0ff9..195841fc 100644 --- a/libp2p/io/msgio.py +++ b/libp2p/io/msgio.py @@ -1,5 +1,4 @@ -""" -``msgio`` is an implementation of `https://github.com/libp2p/go-msgio`. +"""``msgio`` is an implementation of `https://github.com/libp2p/go-msgio`. from that repo: "a simple package to r/w length-delimited slices." diff --git a/libp2p/kademlia/__init__.py b/libp2p/kademlia/__init__.py index bce76722..14568595 100644 --- a/libp2p/kademlia/__init__.py +++ b/libp2p/kademlia/__init__.py @@ -1,5 +1,3 @@ -""" -Kademlia is a Python implementation of the Kademlia protocol which -utilizes the asyncio library. -""" +"""Kademlia is a Python implementation of the Kademlia protocol which utilizes +the asyncio library.""" __version__ = "2.0" diff --git a/libp2p/kademlia/crawling.py b/libp2p/kademlia/crawling.py index c649da08..62dd6251 100644 --- a/libp2p/kademlia/crawling.py +++ b/libp2p/kademlia/crawling.py @@ -8,13 +8,10 @@ log = logging.getLogger(__name__) class SpiderCrawl: - """ - Crawl the network and look for given 160-bit keys. - """ + """Crawl the network and look for given 160-bit keys.""" def __init__(self, protocol, node, peers, ksize, alpha): - """ - Create a new C{SpiderCrawl}er. + """Create a new C{SpiderCrawl}er. Args: protocol: A :class:`~kademlia.protocol.KademliaProtocol` instance. @@ -35,8 +32,7 @@ class SpiderCrawl: self.nearest.push(peers) async def _find(self, rpcmethod): - """ - Get either a value or list of nodes. + """Get either a value or list of nodes. Args: rpcmethod: The protocol's callfindValue or call_find_node. @@ -75,15 +71,11 @@ class ValueSpiderCrawl(SpiderCrawl): self.nearest_without_value = KadPeerHeap(self.node, 1) async def find(self): - """ - Find either the closest nodes or the value requested. - """ + """Find either the closest nodes or the value requested.""" return await self._find(self.protocol.call_find_value) async def _nodes_found(self, responses): - """ - Handle the result of an iteration in _find. - """ + """Handle the result of an iteration in _find.""" toremove = [] found_values = [] for peerid, response in responses.items(): @@ -106,11 +98,11 @@ class ValueSpiderCrawl(SpiderCrawl): return await self.find() async def _handle_found_values(self, values): - """ - We got some values! Exciting. But let's make sure - they're all the same or freak out a little bit. Also, - make sure we tell the nearest node that *didn't* have - the value to store it. + """We got some values! + + Exciting. But let's make sure they're all the same or freak out + a little bit. Also, make sure we tell the nearest node that + *didn't* have the value to store it. """ value_counts = Counter(values) if len(value_counts) != 1: @@ -127,15 +119,11 @@ class ValueSpiderCrawl(SpiderCrawl): class NodeSpiderCrawl(SpiderCrawl): async def find(self): - """ - Find the closest nodes. - """ + """Find the closest nodes.""" return await self._find(self.protocol.call_find_node) async def _nodes_found(self, responses): - """ - Handle the result of an iteration in _find. - """ + """Handle the result of an iteration in _find.""" toremove = [] for peerid, response in responses.items(): response = RPCFindResponse(response) @@ -152,8 +140,7 @@ class NodeSpiderCrawl(SpiderCrawl): class RPCFindResponse: def __init__(self, response): - """ - A wrapper for the result of a RPC find. + """A wrapper for the result of a RPC find. Args: response: This will be a tuple of (, ) @@ -163,9 +150,7 @@ class RPCFindResponse: self.response = response def happened(self): - """ - Did the other host actually respond? - """ + """Did the other host actually respond?""" return self.response[0] def has_value(self): @@ -175,9 +160,9 @@ class RPCFindResponse: return self.response[1]["value"] def get_node_list(self): - """ - Get the node list in the response. If there's no value, this should - be set. + """Get the node list in the response. + + If there's no value, this should be set. """ nodelist = self.response[1] or [] return [create_kad_peerinfo(*nodeple) for nodeple in nodelist] diff --git a/libp2p/kademlia/kad_peerinfo.py b/libp2p/kademlia/kad_peerinfo.py index 346f6714..db75b37e 100644 --- a/libp2p/kademlia/kad_peerinfo.py +++ b/libp2p/kademlia/kad_peerinfo.py @@ -30,9 +30,7 @@ class KadPeerInfo(PeerInfo): return sorted(self.addrs) == sorted(node.addrs) def distance_to(self, node): - """ - Get the distance between this node and another. - """ + """Get the distance between this node and another.""" return self.xor_id ^ node.xor_id def __iter__(self): @@ -56,13 +54,10 @@ class KadPeerInfo(PeerInfo): class KadPeerHeap: - """ - A heap of peers ordered by distance to a given node. - """ + """A heap of peers ordered by distance to a given node.""" def __init__(self, node, maxsize): - """ - Constructor. + """Constructor. @param node: The node to measure all distnaces from. @param maxsize: The maximum size that this heap can grow to. @@ -73,12 +68,13 @@ class KadPeerHeap: self.maxsize = maxsize def remove(self, peers): - """ - Remove a list of peer ids from this heap. Note that while this - heap retains a constant visible size (based on the iterator), it's - actual size may be quite a bit larger than what's exposed. Therefore, - removal of nodes may not change the visible size as previously added - nodes suddenly become visible. + """Remove a list of peer ids from this heap. + + Note that while this heap retains a constant visible size (based + on the iterator), it's actual size may be quite a bit larger + than what's exposed. Therefore, removal of nodes may not change + the visible size as previously added nodes suddenly become + visible. """ peers = set(peers) if not peers: @@ -108,8 +104,7 @@ class KadPeerHeap: return heapq.heappop(self.heap)[1] if self else None def push(self, nodes): - """ - Push nodes onto heap. + """Push nodes onto heap. @param nodes: This can be a single item or a C{list}. """ diff --git a/libp2p/kademlia/network.py b/libp2p/kademlia/network.py index fcf4b9a8..dbee76c3 100644 --- a/libp2p/kademlia/network.py +++ b/libp2p/kademlia/network.py @@ -1,6 +1,4 @@ -""" -Package for interacting on the network at a high level. -""" +"""Package for interacting on the network at a high level.""" import asyncio import logging import pickle @@ -15,16 +13,17 @@ log = logging.getLogger(__name__) class KademliaServer: - """ - High level view of a node instance. This is the object that should be - created to start listening as an active node on the network. + """High level view of a node instance. + + This is the object that should be created to start listening as an + active node on the network. """ protocol_class = KademliaProtocol def __init__(self, ksize=20, alpha=3, node_id=None, storage=None): - """ - Create a server instance. This will start listening on the given port. + """Create a server instance. This will start listening on the given + port. Args: ksize (int): The k parameter from the paper @@ -56,8 +55,7 @@ class KademliaServer: return self.protocol_class(self.node, self.storage, self.ksize) async def listen(self, port, interface="0.0.0.0"): - """ - Start listening on the given port. + """Start listening on the given port. Provide interface="::" to accept ipv6 address """ @@ -77,10 +75,8 @@ class KademliaServer: self.refresh_loop = loop.call_later(3600, self.refresh_table) async def _refresh_table(self): - """ - Refresh buckets that haven't had any lookups in the last hour - (per section 2.3 of the paper). - """ + """Refresh buckets that haven't had any lookups in the last hour (per + section 2.3 of the paper).""" results = [] for node_id in self.protocol.get_refresh_ids(): node = create_kad_peerinfo(node_id) @@ -98,8 +94,7 @@ class KademliaServer: await self.set_digest(dkey, value) def bootstrappable_neighbors(self): - """ - Get a :class:`list` of (ip, port) :class:`tuple` pairs suitable for + """Get a :class:`list` of (ip, port) :class:`tuple` pairs suitable for use as an argument to the bootstrap method. The server should have been bootstrapped @@ -111,8 +106,8 @@ class KademliaServer: return [tuple(n)[-2:] for n in neighbors] async def bootstrap(self, addrs): - """ - Bootstrap the server by connecting to other known nodes in the network. + """Bootstrap the server by connecting to other known nodes in the + network. Args: addrs: A `list` of (ip, port) `tuple` pairs. Note that only IP @@ -132,8 +127,7 @@ class KademliaServer: return create_kad_peerinfo(result[1], addr[0], addr[1]) if result[0] else None async def get(self, key): - """ - Get a key if the network has it. + """Get a key if the network has it. Returns: :class:`None` if not found, the value otherwise. @@ -153,9 +147,7 @@ class KademliaServer: return await spider.find() async def set(self, key, value): - """ - Set the given string key to the given value in the network. - """ + """Set the given string key to the given value in the network.""" if not check_dht_value_type(value): raise TypeError("Value must be of type int, float, bool, str, or bytes") log.info("setting '%s' = '%s' on network", key, value) @@ -163,9 +155,7 @@ class KademliaServer: return await self.set_digest(dkey, value) async def provide(self, key): - """ - publish to the network that it provides for a particular key - """ + """publish to the network that it provides for a particular key.""" neighbors = self.protocol.router.find_neighbors(self.node) return [ await self.protocol.call_add_provider(n, key, self.node.peer_id_bytes) @@ -173,17 +163,13 @@ class KademliaServer: ] async def get_providers(self, key): - """ - get the list of providers for a key - """ + """get the list of providers for a key.""" neighbors = self.protocol.router.find_neighbors(self.node) return [await self.protocol.call_get_providers(n, key) for n in neighbors] async def set_digest(self, dkey, value): - """ - Set the given SHA1 digest key (bytes) to the given value in the - network. - """ + """Set the given SHA1 digest key (bytes) to the given value in the + network.""" node = create_kad_peerinfo(dkey) nearest = self.protocol.router.find_neighbors(node) @@ -204,10 +190,8 @@ class KademliaServer: return any(await asyncio.gather(*results)) def save_state(self, fname): - """ - Save the state of this node (the alpha/ksize/id/immediate neighbors) - to a cache file with the given fname. - """ + """Save the state of this node (the alpha/ksize/id/immediate neighbors) + to a cache file with the given fname.""" log.info("Saving state to %s", fname) data = { "ksize": self.ksize, @@ -223,10 +207,8 @@ class KademliaServer: @classmethod def load_state(cls, fname): - """ - Load the state of this node (the alpha/ksize/id/immediate neighbors) - from a cache file with the given fname. - """ + """Load the state of this node (the alpha/ksize/id/immediate neighbors) + from a cache file with the given fname.""" log.info("Loading state from %s", fname) with open(fname, "rb") as file: data = pickle.load(file) @@ -236,8 +218,7 @@ class KademliaServer: return svr def save_state_regularly(self, fname, frequency=600): - """ - Save the state of node with a given regularity to the given + """Save the state of node with a given regularity to the given filename. Args: @@ -253,9 +234,7 @@ class KademliaServer: def check_dht_value_type(value): - """ - Checks to see if the type of the value is a valid type for - placing in the dht. - """ + """Checks to see if the type of the value is a valid type for placing in + the dht.""" typeset = [int, float, bool, str, bytes] return type(value) in typeset diff --git a/libp2p/kademlia/protocol.py b/libp2p/kademlia/protocol.py index 8606fc2b..311439e0 100644 --- a/libp2p/kademlia/protocol.py +++ b/libp2p/kademlia/protocol.py @@ -11,15 +11,11 @@ log = logging.getLogger(__name__) class KademliaProtocol(RPCProtocol): - """ - There are four main RPCs in the Kademlia protocol - PING, STORE, FIND_NODE, FIND_VALUE - PING probes if a node is still online - STORE instructs a node to store (key, value) - FIND_NODE takes a 160-bit ID and gets back - (ip, udp_port, node_id) for k closest nodes to target - FIND_VALUE behaves like FIND_NODE unless a value is stored - """ + """There are four main RPCs in the Kademlia protocol PING, STORE, + FIND_NODE, FIND_VALUE PING probes if a node is still online STORE instructs + a node to store (key, value) FIND_NODE takes a 160-bit ID and gets back + (ip, udp_port, node_id) for k closest nodes to target FIND_VALUE behaves + like FIND_NODE unless a value is stored.""" def __init__(self, source_node, storage, ksize): RPCProtocol.__init__(self) @@ -28,9 +24,7 @@ class KademliaProtocol(RPCProtocol): self.source_node = source_node def get_refresh_ids(self): - """ - Get ids to search for to keep old buckets up to date. - """ + """Get ids to search for to keep old buckets up to date.""" ids = [] for bucket in self.router.lonely_buckets(): rid = random.randint(*bucket.range).to_bytes(20, byteorder="big") @@ -75,12 +69,10 @@ class KademliaProtocol(RPCProtocol): return {"value": value} def rpc_add_provider(self, sender, nodeid, key, provider_id): - """ - rpc when receiving an add_provider call - should validate received PeerInfo matches sender nodeid - if it does, receipient must store a record in its datastore - we store a map of content_id to peer_id (non xor) - """ + """rpc when receiving an add_provider call should validate received + PeerInfo matches sender nodeid if it does, receipient must store a + record in its datastore we store a map of content_id to peer_id (non + xor)""" if nodeid == provider_id: log.info( "adding provider %s for key %s in local table", provider_id, str(key) @@ -90,11 +82,9 @@ class KademliaProtocol(RPCProtocol): return False def rpc_get_providers(self, sender, key): - """ - rpc when receiving a get_providers call - should look up key in data store and respond with records - plus a list of closer peers in its routing table - """ + """rpc when receiving a get_providers call should look up key in data + store and respond with records plus a list of closer peers in its + routing table.""" providers = [] record = self.storage.get(key, None) @@ -147,8 +137,7 @@ class KademliaProtocol(RPCProtocol): return self.handle_call_response(result, node_to_ask) def welcome_if_new(self, node): - """ - Given a new node, send it all the keys/values it should be storing, + """Given a new node, send it all the keys/values it should be storing, then add it to the routing table. @param node: A new node that just joined (or that we just found out @@ -177,9 +166,10 @@ class KademliaProtocol(RPCProtocol): self.router.add_contact(node) def handle_call_response(self, result, node): - """ - If we get a response, add the node to the routing table. If - we get no response, make sure it's removed from the routing table. + """If we get a response, add the node to the routing table. + + If we get no response, make sure it's removed from the routing + table. """ if not result[0]: log.warning("no response from %s, removing from router", node) diff --git a/libp2p/kademlia/routing.py b/libp2p/kademlia/routing.py index d0a2b354..6bd5d655 100644 --- a/libp2p/kademlia/routing.py +++ b/libp2p/kademlia/routing.py @@ -8,13 +8,10 @@ from .utils import OrderedSet, bytes_to_bit_string, shared_prefix class KBucket: - """ - each node keeps a list of (ip, udp_port, node_id) - for nodes of distance between 2^i and 2^(i+1) - this list that every node keeps is a k-bucket - each k-bucket implements a last seen eviction - policy except that live nodes are never removed - """ + """each node keeps a list of (ip, udp_port, node_id) for nodes of distance + between 2^i and 2^(i+1) this list that every node keeps is a k-bucket each + k-bucket implements a last seen eviction policy except that live nodes are + never removed.""" def __init__(self, rangeLower, rangeUpper, ksize): self.range = (rangeLower, rangeUpper) @@ -55,9 +52,8 @@ class KBucket: return node.peer_id_bytes not in self.nodes def add_node(self, node): - """ - Add a C{Node} to the C{KBucket}. Return True if successful, - False if the bucket is full. + """Add a C{Node} to the C{KBucket}. Return True if successful, False + if the bucket is full. If the bucket is full, keep track of node in a replacement list, per section 4.1 of the paper. @@ -100,9 +96,7 @@ class TableTraverser: return self def __next__(self): - """ - Pop an item from the left subtree, then right, then left, etc. - """ + """Pop an item from the left subtree, then right, then left, etc.""" if self.current_nodes: return self.current_nodes.pop() @@ -140,10 +134,7 @@ class RoutingTable: self.buckets.insert(index + 1, two) def lonely_buckets(self): - """ - Get all of the buckets that haven't been updated in over - an hour. - """ + """Get all of the buckets that haven't been updated in over an hour.""" hrago = time.monotonic() - 3600 return [b for b in self.buckets if b.last_updated < hrago] @@ -172,9 +163,7 @@ class RoutingTable: asyncio.ensure_future(self.protocol.call_ping(bucket.head())) def get_bucket_for(self, node): - """ - Get the index of the bucket that the given node would fall into. - """ + """Get the index of the bucket that the given node would fall into.""" for index, bucket in enumerate(self.buckets): if node.xor_id < bucket.range[1]: return index diff --git a/libp2p/kademlia/storage.py b/libp2p/kademlia/storage.py index afd30e85..33bdad94 100644 --- a/libp2p/kademlia/storage.py +++ b/libp2p/kademlia/storage.py @@ -6,48 +6,44 @@ import time class IStorage(ABC): - """ - Local storage for this node. - IStorage implementations of get must return the same type as put in by set + """Local storage for this node. + + IStorage implementations of get must return the same type as put in + by set """ @abstractmethod def __setitem__(self, key, value): - """ - Set a key to the given value. - """ + """Set a key to the given value.""" @abstractmethod def __getitem__(self, key): - """ - Get the given key. If item doesn't exist, raises C{KeyError} + """Get the given key. + + If item doesn't exist, raises C{KeyError} """ @abstractmethod def get(self, key, default=None): - """ - Get given key. If not found, return default. + """Get given key. + + If not found, return default. """ @abstractmethod def iter_older_than(self, seconds_old): - """ - Return the an iterator over (key, value) tuples for items older - than the given seconds_old. - """ + """Return the an iterator over (key, value) tuples for items older than + the given seconds_old.""" @abstractmethod def __iter__(self): - """ - Get the iterator for this storage, should yield tuple of (key, value) - """ + """Get the iterator for this storage, should yield tuple of (key, + value)""" class ForgetfulStorage(IStorage): def __init__(self, ttl=604800): - """ - By default, max age is a week. - """ + """By default, max age is a week.""" self.data = OrderedDict() self.ttl = ttl diff --git a/libp2p/kademlia/utils.py b/libp2p/kademlia/utils.py index 78ff0827..a547ccee 100644 --- a/libp2p/kademlia/utils.py +++ b/libp2p/kademlia/utils.py @@ -1,6 +1,4 @@ -""" -General catchall for functions that don't make sense as methods. -""" +"""General catchall for functions that don't make sense as methods.""" import asyncio import hashlib import operator @@ -19,8 +17,8 @@ def digest(string): class OrderedSet(list): - """ - Acts like a list in all ways, except in the behavior of the + """Acts like a list in all ways, except in the behavior of the. + :meth:`push` method. """ @@ -35,8 +33,7 @@ class OrderedSet(list): def shared_prefix(args): - """ - Find the shared prefix between the strings. + """Find the shared prefix between the strings. For instance: diff --git a/libp2p/network/connection/raw_connection.py b/libp2p/network/connection/raw_connection.py index f193f435..5606e80c 100644 --- a/libp2p/network/connection/raw_connection.py +++ b/libp2p/network/connection/raw_connection.py @@ -24,9 +24,7 @@ class RawConnection(IRawConnection): self._drain_lock = asyncio.Lock() async def write(self, data: bytes) -> None: - """ - Raise `RawConnError` if the underlying connection breaks - """ + """Raise `RawConnError` if the underlying connection breaks.""" try: self.writer.write(data) except ConnectionResetError as error: @@ -41,9 +39,8 @@ class RawConnection(IRawConnection): raise RawConnError(error) async def read(self, n: int = -1) -> bytes: - """ - Read up to ``n`` bytes from the underlying stream. - This call is delegated directly to the underlying ``self.reader``. + """Read up to ``n`` bytes from the underlying stream. This call is + delegated directly to the underlying ``self.reader``. Raise `RawConnError` if the underlying connection breaks """ diff --git a/libp2p/network/connection/raw_connection_interface.py b/libp2p/network/connection/raw_connection_interface.py index 94951afb..cda23eec 100644 --- a/libp2p/network/connection/raw_connection_interface.py +++ b/libp2p/network/connection/raw_connection_interface.py @@ -2,8 +2,6 @@ from libp2p.io.abc import ReadWriteCloser class IRawConnection(ReadWriteCloser): - """ - A Raw Connection provides a Reader and a Writer - """ + """A Raw Connection provides a Reader and a Writer.""" initiator: bool diff --git a/libp2p/network/network_interface.py b/libp2p/network/network_interface.py index 94ddba2c..9a41a0ba 100644 --- a/libp2p/network/network_interface.py +++ b/libp2p/network/network_interface.py @@ -29,8 +29,7 @@ class INetwork(ABC): @abstractmethod async def dial_peer(self, peer_id: ID) -> INetConn: - """ - dial_peer try to create a connection to peer_id + """dial_peer try to create a connection to peer_id. :param peer_id: peer if we want to dial :raises SwarmException: raised when an error occurs @@ -47,9 +46,7 @@ class INetwork(ABC): @abstractmethod def set_stream_handler(self, stream_handler: StreamHandlerFn) -> None: - """ - Set the stream handler for all incoming streams. - """ + """Set the stream handler for all incoming streams.""" @abstractmethod async def listen(self, *multiaddrs: Sequence[Multiaddr]) -> bool: diff --git a/libp2p/network/stream/net_stream.py b/libp2p/network/stream/net_stream.py index 0142721c..5ef65d55 100644 --- a/libp2p/network/stream/net_stream.py +++ b/libp2p/network/stream/net_stream.py @@ -38,8 +38,8 @@ class NetStream(INetStream): self.protocol_id = protocol_id async def read(self, n: int = -1) -> bytes: - """ - reads from stream + """reads from stream. + :param n: number of bytes to read :return: bytes of input """ @@ -51,8 +51,8 @@ class NetStream(INetStream): raise StreamReset from error async def write(self, data: bytes) -> int: - """ - write to stream + """write to stream. + :return: number of bytes written """ try: @@ -61,9 +61,7 @@ class NetStream(INetStream): raise StreamClosed from error async def close(self) -> None: - """ - close stream - """ + """close stream.""" await self.muxed_stream.close() async def reset(self) -> None: diff --git a/libp2p/network/stream/net_stream_interface.py b/libp2p/network/stream/net_stream_interface.py index 41bf4239..66c611c4 100644 --- a/libp2p/network/stream/net_stream_interface.py +++ b/libp2p/network/stream/net_stream_interface.py @@ -23,6 +23,4 @@ class INetStream(ReadWriteCloser): @abstractmethod async def reset(self) -> None: - """ - Close both ends of the stream. - """ + """Close both ends of the stream.""" diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 90c9475a..2ef61157 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -69,8 +69,8 @@ class Swarm(INetwork): self.common_stream_handler = stream_handler async def dial_peer(self, peer_id: ID) -> INetConn: - """ - dial_peer try to create a connection to peer_id + """dial_peer try to create a connection to peer_id. + :param peer_id: peer if we want to dial :raises SwarmException: raised when an error occurs :return: muxed connection @@ -254,10 +254,9 @@ class Swarm(INetwork): logger.debug("successfully close the connection to peer %s", peer_id) async def add_conn(self, muxed_conn: IMuxedConn) -> SwarmConn: - """ - Add a `IMuxedConn` to `Swarm` as a `SwarmConn`, notify "connected", - and start to monitor the connection for its new streams and disconnection. - """ + """Add a `IMuxedConn` to `Swarm` as a `SwarmConn`, notify "connected", + and start to monitor the connection for its new streams and + disconnection.""" swarm_conn = SwarmConn(muxed_conn, self) # Store muxed_conn with peer id self.connections[muxed_conn.peer_id] = swarm_conn @@ -267,9 +266,8 @@ class Swarm(INetwork): return swarm_conn def remove_conn(self, swarm_conn: SwarmConn) -> None: - """ - Simply remove the connection from Swarm's records, without closing the connection. - """ + """Simply remove the connection from Swarm's records, without closing + the connection.""" peer_id = swarm_conn.muxed_conn.peer_id if peer_id not in self.connections: return diff --git a/libp2p/peer/addrbook_interface.py b/libp2p/peer/addrbook_interface.py index 0ba26145..a0c4d78b 100644 --- a/libp2p/peer/addrbook_interface.py +++ b/libp2p/peer/addrbook_interface.py @@ -12,8 +12,8 @@ class IAddrBook(ABC): @abstractmethod def add_addr(self, peer_id: ID, addr: Multiaddr, ttl: int) -> None: - """ - Calls add_addrs(peer_id, [addr], ttl) + """Calls add_addrs(peer_id, [addr], ttl) + :param peer_id: the peer to add address for :param addr: multiaddress of the peer :param ttl: time-to-live for the address (after this time, address is no longer valid) @@ -21,10 +21,11 @@ class IAddrBook(ABC): @abstractmethod def add_addrs(self, peer_id: ID, addrs: Sequence[Multiaddr], ttl: int) -> None: - """ - Adds addresses for a given peer all with the same time-to-live. If one of the - addresses already exists for the peer and has a longer TTL, no operation should take place. - If one of the addresses exists with a shorter TTL, extend the TTL to equal param ttl. + """Adds addresses for a given peer all with the same time-to-live. If + one of the addresses already exists for the peer and has a longer TTL, + no operation should take place. If one of the addresses exists with a + shorter TTL, extend the TTL to equal param ttl. + :param peer_id: the peer to add address for :param addr: multiaddresses of the peer :param ttl: time-to-live for the address (after this time, address is no longer valid @@ -39,8 +40,8 @@ class IAddrBook(ABC): @abstractmethod def clear_addrs(self, peer_id: ID) -> None: - """ - Removes all previously stored addresses + """Removes all previously stored addresses. + :param peer_id: peer to remove addresses of """ diff --git a/libp2p/peer/peerdata.py b/libp2p/peer/peerdata.py index 3ee4c4ce..9273079d 100644 --- a/libp2p/peer/peerdata.py +++ b/libp2p/peer/peerdata.py @@ -44,4 +44,4 @@ class PeerData(IPeerData): class PeerDataError(KeyError): - """Raised when a key is not found in peer metadata""" + """Raised when a key is not found in peer metadata.""" diff --git a/libp2p/peer/peerdata_interface.py b/libp2p/peer/peerdata_interface.py index ca7c2546..e842acb6 100644 --- a/libp2p/peer/peerdata_interface.py +++ b/libp2p/peer/peerdata_interface.py @@ -39,9 +39,7 @@ class IPeerData(ABC): @abstractmethod def clear_addrs(self) -> None: - """ - Clear all addresses - """ + """Clear all addresses.""" @abstractmethod def put_metadata(self, key: str, val: Any) -> None: diff --git a/libp2p/peer/peerstore.py b/libp2p/peer/peerstore.py index c1eae370..4ec02e4c 100644 --- a/libp2p/peer/peerstore.py +++ b/libp2p/peer/peerstore.py @@ -17,10 +17,9 @@ class PeerStore(IPeerStore): self.peer_map = {} def __create_or_get_peer(self, peer_id: ID) -> PeerData: - """ - Returns the peer data for peer_id or creates a new - peer data (and stores it in peer_map) if peer - data for peer_id does not yet exist + """Returns the peer data for peer_id or creates a new peer data (and + stores it in peer_map) if peer data for peer_id does not yet exist. + :param peer_id: peer ID :return: peer data """ @@ -93,4 +92,4 @@ class PeerStore(IPeerStore): class PeerStoreError(KeyError): - """Raised when peer ID is not found in peer store""" + """Raised when peer ID is not found in peer store.""" diff --git a/libp2p/protocol_muxer/exceptions.py b/libp2p/protocol_muxer/exceptions.py index a34e318c..ffcc5d57 100644 --- a/libp2p/protocol_muxer/exceptions.py +++ b/libp2p/protocol_muxer/exceptions.py @@ -2,12 +2,12 @@ from libp2p.exceptions import BaseLibp2pError class MultiselectCommunicatorError(BaseLibp2pError): - """Raised when an error occurs during read/write via communicator""" + """Raised when an error occurs during read/write via communicator.""" class MultiselectError(BaseLibp2pError): - """Raised when an error occurs in multiselect process""" + """Raised when an error occurs in multiselect process.""" class MultiselectClientError(BaseLibp2pError): - """Raised when an error occurs in protocol selection process""" + """Raised when an error occurs in protocol selection process.""" diff --git a/libp2p/protocol_muxer/multiselect.py b/libp2p/protocol_muxer/multiselect.py index 88f7e37e..b333388e 100644 --- a/libp2p/protocol_muxer/multiselect.py +++ b/libp2p/protocol_muxer/multiselect.py @@ -11,11 +11,9 @@ PROTOCOL_NOT_FOUND_MSG = "na" class Multiselect(IMultiselectMuxer): - """ - Multiselect module that is responsible for responding to - a multiselect client and deciding on - a specific protocol and handler pair to use for communication - """ + """Multiselect module that is responsible for responding to a multiselect + client and deciding on a specific protocol and handler pair to use for + communication.""" handlers: Dict[TProtocol, StreamHandlerFn] @@ -23,8 +21,8 @@ class Multiselect(IMultiselectMuxer): self.handlers = {} def add_handler(self, protocol: TProtocol, handler: StreamHandlerFn) -> None: - """ - Store the handler with the given protocol + """Store the handler with the given protocol. + :param protocol: protocol name :param handler: handler function """ @@ -33,8 +31,8 @@ class Multiselect(IMultiselectMuxer): async def negotiate( self, communicator: IMultiselectCommunicator ) -> Tuple[TProtocol, StreamHandlerFn]: - """ - Negotiate performs protocol selection + """Negotiate performs protocol selection. + :param stream: stream to negotiate on :return: selected protocol name, handler function :raise MultiselectError: raised when negotiation failed @@ -65,8 +63,8 @@ class Multiselect(IMultiselectMuxer): raise MultiselectError(error) async def handshake(self, communicator: IMultiselectCommunicator) -> None: - """ - Perform handshake to agree on multiselect protocol + """Perform handshake to agree on multiselect protocol. + :param communicator: communicator to use :raise MultiselectError: raised when handshake failed """ @@ -88,8 +86,8 @@ class Multiselect(IMultiselectMuxer): def validate_handshake(handshake_contents: str) -> bool: - """ - Determine if handshake is valid and should be confirmed + """Determine if handshake is valid and should be confirmed. + :param handshake_contents: contents of handshake message :return: true if handshake is complete, false otherwise """ diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index 07ea5e7f..e688875d 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -11,15 +11,13 @@ PROTOCOL_NOT_FOUND_MSG = "na" class MultiselectClient(IMultiselectClient): - """ - Client for communicating with receiver's multiselect - module in order to select a protocol id to communicate over - """ + """Client for communicating with receiver's multiselect module in order to + select a protocol id to communicate over.""" async def handshake(self, communicator: IMultiselectCommunicator) -> None: - """ - Ensure that the client and multiselect - are both using the same multiselect protocol + """Ensure that the client and multiselect are both using the same + multiselect protocol. + :param stream: stream to communicate with multiselect over :raise MultiselectClientError: raised when handshake failed """ @@ -39,10 +37,10 @@ class MultiselectClient(IMultiselectClient): async def select_one_of( self, protocols: Sequence[TProtocol], communicator: IMultiselectCommunicator ) -> TProtocol: - """ - For each protocol, send message to multiselect selecting protocol + """For each protocol, send message to multiselect selecting protocol and fail if multiselect does not return same protocol. Returns first protocol that multiselect agrees on (i.e. that multiselect selects) + :param protocol: protocol to select :param stream: stream to communicate with multiselect over :return: selected protocol @@ -62,8 +60,8 @@ class MultiselectClient(IMultiselectClient): async def try_select( self, communicator: IMultiselectCommunicator, protocol: TProtocol ) -> TProtocol: - """ - Try to select the given protocol or raise exception if fails + """Try to select the given protocol or raise exception if fails. + :param communicator: communicator to use to communicate with counterparty :param protocol: protocol to select :raise MultiselectClientError: raised when protocol negotiation failed @@ -87,8 +85,8 @@ class MultiselectClient(IMultiselectClient): def validate_handshake(handshake_contents: str) -> bool: - """ - Determine if handshake is valid and should be confirmed + """Determine if handshake is valid and should be confirmed. + :param handshake_contents: contents of handshake message :return: true if handshake is complete, false otherwise """ diff --git a/libp2p/protocol_muxer/multiselect_client_interface.py b/libp2p/protocol_muxer/multiselect_client_interface.py index e51c01fa..7cf7c72d 100644 --- a/libp2p/protocol_muxer/multiselect_client_interface.py +++ b/libp2p/protocol_muxer/multiselect_client_interface.py @@ -8,15 +8,13 @@ from libp2p.typing import TProtocol class IMultiselectClient(ABC): - """ - Client for communicating with receiver's multiselect - module in order to select a protocol id to communicate over - """ + """Client for communicating with receiver's multiselect module in order to + select a protocol id to communicate over.""" async def handshake(self, communicator: IMultiselectCommunicator) -> None: - """ - Ensure that the client and multiselect - are both using the same multiselect protocol + """Ensure that the client and multiselect are both using the same + multiselect protocol. + :param stream: stream to communicate with multiselect over :raise Exception: multiselect protocol ID mismatch """ @@ -25,10 +23,10 @@ class IMultiselectClient(ABC): async def select_one_of( self, protocols: Sequence[TProtocol], communicator: IMultiselectCommunicator ) -> TProtocol: - """ - For each protocol, send message to multiselect selecting protocol + """For each protocol, send message to multiselect selecting protocol and fail if multiselect does not return same protocol. Returns first protocol that multiselect agrees on (i.e. that multiselect selects) + :param protocol: protocol to select :param stream: stream to communicate with multiselect over :return: selected protocol @@ -37,8 +35,8 @@ class IMultiselectClient(ABC): async def try_select( self, communicator: IMultiselectCommunicator, protocol: TProtocol ) -> TProtocol: - """ - Try to select the given protocol or raise exception if fails + """Try to select the given protocol or raise exception if fails. + :param communicator: communicator to use to communicate with counterparty :param protocol: protocol to select :raise Exception: error in protocol selection diff --git a/libp2p/protocol_muxer/multiselect_communicator_interface.py b/libp2p/protocol_muxer/multiselect_communicator_interface.py index d72142bb..639a175b 100644 --- a/libp2p/protocol_muxer/multiselect_communicator_interface.py +++ b/libp2p/protocol_muxer/multiselect_communicator_interface.py @@ -2,21 +2,17 @@ from abc import ABC, abstractmethod class IMultiselectCommunicator(ABC): - """ - Communicator helper class that ensures both the client - and multistream module will follow the same multistream protocol, - which is necessary for them to work - """ + """Communicator helper class that ensures both the client and multistream + module will follow the same multistream protocol, which is necessary for + them to work.""" @abstractmethod async def write(self, msg_str: str) -> None: - """ - Write message to stream + """Write message to stream. + :param msg_str: message to write """ @abstractmethod async def read(self) -> str: - """ - Reads message from stream until EOF - """ + """Reads message from stream until EOF.""" diff --git a/libp2p/protocol_muxer/multiselect_muxer_interface.py b/libp2p/protocol_muxer/multiselect_muxer_interface.py index acfbdbdc..239f01ad 100644 --- a/libp2p/protocol_muxer/multiselect_muxer_interface.py +++ b/libp2p/protocol_muxer/multiselect_muxer_interface.py @@ -7,18 +7,16 @@ from .multiselect_communicator_interface import IMultiselectCommunicator class IMultiselectMuxer(ABC): - """ - Multiselect module that is responsible for responding to - a multiselect client and deciding on - a specific protocol and handler pair to use for communication - """ + """Multiselect module that is responsible for responding to a multiselect + client and deciding on a specific protocol and handler pair to use for + communication.""" handlers: Dict[TProtocol, StreamHandlerFn] @abstractmethod def add_handler(self, protocol: TProtocol, handler: StreamHandlerFn) -> None: - """ - Store the handler with the given protocol + """Store the handler with the given protocol. + :param protocol: protocol name :param handler: handler function """ @@ -27,8 +25,8 @@ class IMultiselectMuxer(ABC): async def negotiate( self, communicator: IMultiselectCommunicator ) -> Tuple[TProtocol, StreamHandlerFn]: - """ - Negotiate performs protocol selection + """Negotiate performs protocol selection. + :param stream: stream to negotiate on :return: selected protocol name, handler function :raise Exception: negotiation failed exception diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 042b13fe..8f35dc0e 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -31,36 +31,35 @@ class FloodSub(IPubsubRouter): return self.protocols def attach(self, pubsub: Pubsub) -> None: - """ - Attach is invoked by the PubSub constructor to attach the router to a - freshly initialized PubSub instance. + """Attach is invoked by the PubSub constructor to attach the router to + a freshly initialized PubSub instance. + :param pubsub: pubsub instance to attach to """ self.pubsub = pubsub def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None: - """ - Notifies the router that a new peer has been connected + """Notifies the router that a new peer has been connected. + :param peer_id: id of peer to add """ def remove_peer(self, peer_id: ID) -> None: - """ - Notifies the router that a peer has been disconnected + """Notifies the router that a peer has been disconnected. + :param peer_id: id of peer to remove """ async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None: - """ - Invoked to process control messages in the RPC envelope. - It is invoked after subscriptions and payload messages have been processed + """Invoked to process control messages in the RPC envelope. It is + invoked after subscriptions and payload messages have been processed. + :param rpc: rpc message """ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: - """ - Invoked to forward a new message that has been validated. - This is where the "flooding" part of floodsub happens + """Invoked to forward a new message that has been validated. This is + where the "flooding" part of floodsub happens. With flooding, routing is almost trivial: for each incoming message, forward to all known peers in the topic. There is a bit of logic, @@ -88,25 +87,24 @@ class FloodSub(IPubsubRouter): await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) async def join(self, topic: str) -> None: - """ - Join notifies the router that we want to receive and - forward messages in a topic. It is invoked after the - subscription announcement + """Join notifies the router that we want to receive and forward + messages in a topic. It is invoked after the subscription announcement. + :param topic: topic to join """ async def leave(self, topic: str) -> None: - """ - Leave notifies the router that we are no longer interested in a topic. - It is invoked after the unsubscription announcement. + """Leave notifies the router that we are no longer interested in a + topic. It is invoked after the unsubscription announcement. + :param topic: topic to leave """ def _get_peers_to_send( self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID ) -> Iterable[ID]: - """ - Get the eligible peers to send the data to. + """Get the eligible peers to send the data to. + :param msg_forwarder: peer ID of the peer who forwards the message to us. :param origin: peer id of the peer the message originate from. :return: a generator of the peer ids who we send data to. diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 3b73f0e9..2d83cc43 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -94,9 +94,9 @@ class GossipSub(IPubsubRouter): return self.protocols def attach(self, pubsub: Pubsub) -> None: - """ - Attach is invoked by the PubSub constructor to attach the router to a - freshly initialized PubSub instance. + """Attach is invoked by the PubSub constructor to attach the router to + a freshly initialized PubSub instance. + :param pubsub: pubsub instance to attach to """ self.pubsub = pubsub @@ -108,8 +108,8 @@ class GossipSub(IPubsubRouter): asyncio.ensure_future(self.heartbeat()) def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None: - """ - Notifies the router that a new peer has been connected + """Notifies the router that a new peer has been connected. + :param peer_id: id of peer to add :param protocol_id: router protocol the peer speaks, e.g., floodsub, gossipsub """ @@ -129,8 +129,8 @@ class GossipSub(IPubsubRouter): self.peers_to_protocol[peer_id] = protocol_id def remove_peer(self, peer_id: ID) -> None: - """ - Notifies the router that a peer has been disconnected + """Notifies the router that a peer has been disconnected. + :param peer_id: id of peer to remove """ logger.debug("removing peer %s", peer_id) @@ -144,9 +144,9 @@ class GossipSub(IPubsubRouter): del self.peers_to_protocol[peer_id] async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None: - """ - Invoked to process control messages in the RPC envelope. - It is invoked after subscriptions and payload messages have been processed + """Invoked to process control messages in the RPC envelope. It is + invoked after subscriptions and payload messages have been processed. + :param rpc: RPC message :param sender_peer_id: id of the peer who sent the message """ @@ -167,9 +167,7 @@ class GossipSub(IPubsubRouter): await self.handle_prune(prune, sender_peer_id) async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: - """ - Invoked to forward a new message that has been validated. - """ + """Invoked to forward a new message that has been validated.""" self.mcache.put(pubsub_msg) peers_gen = self._get_peers_to_send( @@ -191,8 +189,8 @@ class GossipSub(IPubsubRouter): def _get_peers_to_send( self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID ) -> Iterable[ID]: - """ - Get the eligible peers to send the data to. + """Get the eligible peers to send the data to. + :param msg_forwarder: the peer id of the peer who forwards the message to me. :param origin: peer id of the peer the message originate from. :return: a generator of the peer ids who we send data to. @@ -233,10 +231,9 @@ class GossipSub(IPubsubRouter): async def join(self, topic: str) -> None: # Note: the comments here are the near-exact algorithm description from the spec - """ - Join notifies the router that we want to receive and - forward messages in a topic. It is invoked after the - subscription announcement + """Join notifies the router that we want to receive and forward + messages in a topic. It is invoked after the subscription announcement. + :param topic: topic to join """ logger.debug("joining topic %s", topic) @@ -271,9 +268,9 @@ class GossipSub(IPubsubRouter): async def leave(self, topic: str) -> None: # Note: the comments here are the near-exact algorithm description from the spec - """ - Leave notifies the router that we are no longer interested in a topic. - It is invoked after the unsubscription announcement. + """Leave notifies the router that we are no longer interested in a + topic. It is invoked after the unsubscription announcement. + :param topic: topic to leave """ logger.debug("leaving topic %s", topic) @@ -289,8 +286,8 @@ class GossipSub(IPubsubRouter): # Heartbeat async def heartbeat(self) -> None: - """ - Call individual heartbeats. + """Call individual heartbeats. + Note: the heartbeats are called with awaits because each heartbeat depends on the state changes in the preceding heartbeat """ @@ -453,9 +450,8 @@ class GossipSub(IPubsubRouter): async def handle_ihave( self, ihave_msg: rpc_pb2.ControlIHave, sender_peer_id: ID ) -> None: - """ - Checks the seen set and requests unknown messages with an IWANT message. - """ + """Checks the seen set and requests unknown messages with an IWANT + message.""" # Get list of all seen (seqnos, from) from the (seqno, from) tuples in seen_messages cache seen_seqnos_and_peers = [ seqno_and_from for seqno_and_from in self.pubsub.seen_messages.keys() @@ -477,9 +473,8 @@ class GossipSub(IPubsubRouter): async def handle_iwant( self, iwant_msg: rpc_pb2.ControlIWant, sender_peer_id: ID ) -> None: - """ - Forwards all request messages that are present in mcache to the requesting peer. - """ + """Forwards all request messages that are present in mcache to the + requesting peer.""" # FIXME: Update type of message ID # FIXME: Find a better way to parse the msg ids msg_ids: List[Any] = [literal_eval(msg) for msg in iwant_msg.messageIDs] @@ -536,9 +531,7 @@ class GossipSub(IPubsubRouter): # RPC emitters async def emit_ihave(self, topic: str, msg_ids: Any, to_peer: ID) -> None: - """ - Emit ihave message, sent to to_peer, for topic and msg_ids - """ + """Emit ihave message, sent to to_peer, for topic and msg_ids.""" ihave_msg: rpc_pb2.ControlIHave = rpc_pb2.ControlIHave() ihave_msg.messageIDs.extend(msg_ids) @@ -550,9 +543,7 @@ class GossipSub(IPubsubRouter): await self.emit_control_message(control_msg, to_peer) async def emit_iwant(self, msg_ids: Any, to_peer: ID) -> None: - """ - Emit iwant message, sent to to_peer, for msg_ids - """ + """Emit iwant message, sent to to_peer, for msg_ids.""" iwant_msg: rpc_pb2.ControlIWant = rpc_pb2.ControlIWant() iwant_msg.messageIDs.extend(msg_ids) @@ -563,9 +554,7 @@ class GossipSub(IPubsubRouter): await self.emit_control_message(control_msg, to_peer) async def emit_graft(self, topic: str, to_peer: ID) -> None: - """ - Emit graft message, sent to to_peer, for topic - """ + """Emit graft message, sent to to_peer, for topic.""" graft_msg: rpc_pb2.ControlGraft = rpc_pb2.ControlGraft() graft_msg.topicID = topic @@ -576,9 +565,7 @@ class GossipSub(IPubsubRouter): await self.emit_control_message(control_msg, to_peer) async def emit_prune(self, topic: str, to_peer: ID) -> None: - """ - Emit graft message, sent to to_peer, for topic - """ + """Emit graft message, sent to to_peer, for topic.""" prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune() prune_msg.topicID = topic diff --git a/libp2p/pubsub/mcache.py b/libp2p/pubsub/mcache.py index 159047b9..f4be902d 100644 --- a/libp2p/pubsub/mcache.py +++ b/libp2p/pubsub/mcache.py @@ -13,8 +13,8 @@ class CacheEntry: """ def __init__(self, mid: Tuple[bytes, bytes], topics: Sequence[str]) -> None: - """ - Constructor. + """Constructor. + :param mid: (seqno, from_id) of the msg :param topics: list of topics this message was sent on """ @@ -32,8 +32,8 @@ class MessageCache: history: List[List[CacheEntry]] def __init__(self, window_size: int, history_size: int) -> None: - """ - Constructor. + """Constructor. + :param window_size: Size of the window desired. :param history_size: Size of the history desired. :return: the MessageCache @@ -49,8 +49,8 @@ class MessageCache: self.history = [[] for _ in range(history_size)] def put(self, msg: rpc_pb2.Message) -> None: - """ - Put a message into the mcache. + """Put a message into the mcache. + :param msg: The rpc message to put in. Should contain seqno and from_id """ mid: Tuple[bytes, bytes] = (msg.seqno, msg.from_id) @@ -59,8 +59,8 @@ class MessageCache: self.history[0].append(CacheEntry(mid, msg.topicIDs)) def get(self, mid: Tuple[bytes, bytes]) -> Optional[rpc_pb2.Message]: - """ - Get a message from the mcache. + """Get a message from the mcache. + :param mid: (seqno, from_id) of the message to get. :return: The rpc message associated with this mid """ @@ -70,8 +70,8 @@ class MessageCache: return None def window(self, topic: str) -> List[Tuple[bytes, bytes]]: - """ - Get the window for this topic. + """Get the window for this topic. + :param topic: Topic whose message ids we desire. :return: List of mids in the current window. """ @@ -86,9 +86,8 @@ class MessageCache: return mids def shift(self) -> None: - """ - Shift the window over by 1 position, dropping the last element of the history. - """ + """Shift the window over by 1 position, dropping the last element of + the history.""" last_entries: List[CacheEntry] = self.history[len(self.history) - 1] for entry in last_entries: diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 51af1765..f65e1bf8 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -81,12 +81,14 @@ class Pubsub: def __init__( self, host: IHost, router: "IPubsubRouter", my_id: ID, cache_size: int = None ) -> None: - """ - Construct a new Pubsub object, which is responsible for handling all + """Construct a new Pubsub object, which is responsible for handling all Pubsub-related messages and relaying messages as appropriate to the - Pubsub router (which is responsible for choosing who to send messages to). + Pubsub router (which is responsible for choosing who to send messages + to). + Since the logic for choosing peers to send pubsub messages to is - in the router, the same Pubsub impl can back floodsub, gossipsub, etc. + in the router, the same Pubsub impl can back floodsub, + gossipsub, etc. """ self.host = host self.router = router @@ -136,10 +138,8 @@ class Pubsub: asyncio.ensure_future(self.handle_peer_queue()) def get_hello_packet(self) -> rpc_pb2.RPC: - """ - Generate subscription message with all topics we are subscribed to - only send hello packet if we have subscribed topics - """ + """Generate subscription message with all topics we are subscribed to + only send hello packet if we have subscribed topics.""" packet = rpc_pb2.RPC() for topic_id in self.my_topics: packet.subscriptions.extend( @@ -148,9 +148,9 @@ class Pubsub: return packet async def continuously_read_stream(self, stream: INetStream) -> None: - """ - Read from input stream in an infinite loop. Process - messages from other nodes + """Read from input stream in an infinite loop. Process messages from + other nodes. + :param stream: stream to continously read from """ peer_id = stream.muxed_conn.peer_id @@ -207,8 +207,9 @@ class Pubsub: def set_topic_validator( self, topic: str, validator: ValidatorFn, is_async_validator: bool ) -> None: - """ - Register a validator under the given topic. One topic can only have one validtor. + """Register a validator under the given topic. One topic can only have + one validtor. + :param topic: the topic to register validator under :param validator: the validator used to validate messages published to the topic :param is_async_validator: indicate if the validator is an asynchronous validator @@ -216,16 +217,16 @@ class Pubsub: self.topic_validators[topic] = TopicValidator(validator, is_async_validator) def remove_topic_validator(self, topic: str) -> None: - """ - Remove the validator from the given topic. + """Remove the validator from the given topic. + :param topic: the topic to remove validator from """ if topic in self.topic_validators: del self.topic_validators[topic] def get_msg_validators(self, msg: rpc_pb2.Message) -> Tuple[TopicValidator, ...]: - """ - Get all validators corresponding to the topics in the message. + """Get all validators corresponding to the topics in the message. + :param msg: the message published to the topic """ return tuple( @@ -235,9 +236,9 @@ class Pubsub: ) async def stream_handler(self, stream: INetStream) -> None: - """ - Stream handler for pubsub. Gets invoked whenever a new stream is created - on one of the supported pubsub protocols. + """Stream handler for pubsub. Gets invoked whenever a new stream is + created on one of the supported pubsub protocols. + :param stream: newly created stream """ try: @@ -290,10 +291,10 @@ class Pubsub: def handle_subscription( self, origin_id: ID, sub_message: rpc_pb2.RPC.SubOpts ) -> None: - """ - Handle an incoming subscription message from a peer. Update internal + """Handle an incoming subscription message from a peer. Update internal mapping to mark the peer as subscribed or unsubscribed to topics as - defined in the subscription message + defined in the subscription message. + :param origin_id: id of the peer who subscribe to the message :param sub_message: RPC.SubOpts """ @@ -310,8 +311,8 @@ class Pubsub: # FIXME(mhchia): Change the function name? async def handle_talk(self, publish_message: rpc_pb2.Message) -> None: - """ - Put incoming message from a peer onto my blocking queue + """Put incoming message from a peer onto my blocking queue. + :param publish_message: RPC.Message format """ @@ -324,8 +325,8 @@ class Pubsub: await self.my_topics[topic].put(publish_message) async def subscribe(self, topic_id: str) -> "asyncio.Queue[rpc_pb2.Message]": - """ - Subscribe ourself to a topic + """Subscribe ourself to a topic. + :param topic_id: topic_id to subscribe to """ @@ -354,8 +355,8 @@ class Pubsub: return self.my_topics[topic_id] async def unsubscribe(self, topic_id: str) -> None: - """ - Unsubscribe ourself from a topic + """Unsubscribe ourself from a topic. + :param topic_id: topic_id to unsubscribe from """ @@ -380,8 +381,8 @@ class Pubsub: await self.router.leave(topic_id) async def message_all_peers(self, raw_msg: bytes) -> None: - """ - Broadcast a message to peers + """Broadcast a message to peers. + :param raw_msg: raw contents of the message to broadcast """ @@ -391,8 +392,8 @@ class Pubsub: await stream.write(encode_varint_prefixed(raw_msg)) async def publish(self, topic_id: str, data: bytes) -> None: - """ - Publish data to a topic + """Publish data to a topic. + :param topic_id: topic which we are going to publish the data to :param data: data which we are publishing """ @@ -411,8 +412,8 @@ class Pubsub: logger.debug("successfully published message %s", msg) async def validate_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None: - """ - Validate the received message + """Validate the received message. + :param msg_forwarder: the peer who forward us the message. :param msg: the message. """ @@ -440,8 +441,8 @@ class Pubsub: raise ValidationError(f"Validation failed for msg={msg}") async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None: - """ - Push a pubsub message to others. + """Push a pubsub message to others. + :param msg_forwarder: the peer who forward us the message. :param msg: the message we are going to push out. """ @@ -481,9 +482,7 @@ class Pubsub: await self.router.publish(msg_forwarder, msg) def _next_seqno(self) -> bytes: - """ - Make the next message sequence id. - """ + """Make the next message sequence id.""" self.counter += 1 return self.counter.to_bytes(8, "big") diff --git a/libp2p/pubsub/pubsub_notifee.py b/libp2p/pubsub/pubsub_notifee.py index 85c0bd8d..e148ee82 100644 --- a/libp2p/pubsub/pubsub_notifee.py +++ b/libp2p/pubsub/pubsub_notifee.py @@ -30,9 +30,10 @@ class PubsubNotifee(INotifee): pass async def connected(self, network: INetwork, conn: INetConn) -> None: - """ - Add peer_id to initiator_peers_queue, so that this peer_id can be used to - create a stream and we only want to have one pubsub stream with each peer. + """Add peer_id to initiator_peers_queue, so that this peer_id can be + used to create a stream and we only want to have one pubsub stream with + each peer. + :param network: network the connection was opened on :param conn: connection that was opened """ diff --git a/libp2p/pubsub/pubsub_router_interface.py b/libp2p/pubsub/pubsub_router_interface.py index 5534c297..f6e6cbc3 100644 --- a/libp2p/pubsub/pubsub_router_interface.py +++ b/libp2p/pubsub/pubsub_router_interface.py @@ -19,23 +19,23 @@ class IPubsubRouter(ABC): @abstractmethod def attach(self, pubsub: "Pubsub") -> None: - """ - Attach is invoked by the PubSub constructor to attach the router to a - freshly initialized PubSub instance. + """Attach is invoked by the PubSub constructor to attach the router to + a freshly initialized PubSub instance. + :param pubsub: pubsub instance to attach to """ @abstractmethod def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None: - """ - Notifies the router that a new peer has been connected + """Notifies the router that a new peer has been connected. + :param peer_id: id of peer to add """ @abstractmethod def remove_peer(self, peer_id: ID) -> None: - """ - Notifies the router that a peer has been disconnected + """Notifies the router that a peer has been disconnected. + :param peer_id: id of peer to remove """ @@ -53,25 +53,24 @@ class IPubsubRouter(ABC): # FIXME: Should be changed to type 'peer.ID' @abstractmethod async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: - """ - Invoked to forward a new message that has been validated + """Invoked to forward a new message that has been validated. + :param msg_forwarder: peer_id of message sender :param pubsub_msg: pubsub message to forward """ @abstractmethod async def join(self, topic: str) -> None: - """ - Join notifies the router that we want to receive and - forward messages in a topic. It is invoked after the - subscription announcement + """Join notifies the router that we want to receive and forward + messages in a topic. It is invoked after the subscription announcement. + :param topic: topic to join """ @abstractmethod async def leave(self, topic: str) -> None: - """ - Leave notifies the router that we are no longer interested in a topic. - It is invoked after the unsubscription announcement. + """Leave notifies the router that we are no longer interested in a + topic. It is invoked after the unsubscription announcement. + :param topic: topic to leave """ diff --git a/libp2p/pubsub/validators.py b/libp2p/pubsub/validators.py index e575980d..7f2a8181 100644 --- a/libp2p/pubsub/validators.py +++ b/libp2p/pubsub/validators.py @@ -1,7 +1,7 @@ # FIXME: Replace the type of `pubkey` with a custom type `Pubkey` def signature_validator(pubkey: bytes, msg: bytes) -> bool: - """ - Verify the message against the given public key. + """Verify the message against the given public key. + :param pubkey: the public key which signs the message. :param msg: the message signed. """ diff --git a/libp2p/routing/interfaces.py b/libp2p/routing/interfaces.py index 4c2cd864..c79a58d8 100644 --- a/libp2p/routing/interfaces.py +++ b/libp2p/routing/interfaces.py @@ -8,25 +8,21 @@ from libp2p.peer.peerinfo import PeerInfo class IContentRouting(ABC): @abstractmethod def provide(self, cid: bytes, announce: bool = True) -> None: - """ - Provide adds the given cid to the content routing system. If announce is True, - it also announces it, otherwise it is just kept in the local - accounting of which objects are being provided. + """Provide adds the given cid to the content routing system. + + If announce is True, it also announces it, otherwise it is just + kept in the local accounting of which objects are being + provided. """ @abstractmethod def find_provider_iter(self, cid: bytes, count: int) -> Iterable[PeerInfo]: - """ - Search for peers who are able to provide a given key - returns an iterator of peer.PeerInfo - """ + """Search for peers who are able to provide a given key returns an + iterator of peer.PeerInfo.""" class IPeerRouting(ABC): @abstractmethod async def find_peer(self, peer_id: ID) -> PeerInfo: - """ - Find specific Peer - FindPeer searches for a peer with given peer_id, returns a peer.PeerInfo - with relevant addresses. - """ + """Find specific Peer FindPeer searches for a peer with given peer_id, + returns a peer.PeerInfo with relevant addresses.""" diff --git a/libp2p/routing/kademlia/kademlia_content_router.py b/libp2p/routing/kademlia/kademlia_content_router.py index e66643c2..424a1db7 100644 --- a/libp2p/routing/kademlia/kademlia_content_router.py +++ b/libp2p/routing/kademlia/kademlia_content_router.py @@ -6,16 +6,15 @@ from libp2p.routing.interfaces import IContentRouting class KadmeliaContentRouter(IContentRouting): def provide(self, cid: bytes, announce: bool = True) -> None: - """ - Provide adds the given cid to the content routing system. If announce is True, - it also announces it, otherwise it is just kept in the local - accounting of which objects are being provided. + """Provide adds the given cid to the content routing system. + + If announce is True, it also announces it, otherwise it is just + kept in the local accounting of which objects are being + provided. """ # the DHT finds the closest peers to `key` using the `FIND_NODE` RPC # then sends a `ADD_PROVIDER` RPC with its own `PeerInfo` to each of these peers. def find_provider_iter(self, cid: bytes, count: int) -> Iterable[PeerInfo]: - """ - Search for peers who are able to provide a given key - returns an iterator of peer.PeerInfo - """ + """Search for peers who are able to provide a given key returns an + iterator of peer.PeerInfo.""" diff --git a/libp2p/routing/kademlia/kademlia_peer_router.py b/libp2p/routing/kademlia/kademlia_peer_router.py index 352c7dff..b232ef65 100644 --- a/libp2p/routing/kademlia/kademlia_peer_router.py +++ b/libp2p/routing/kademlia/kademlia_peer_router.py @@ -15,8 +15,8 @@ class KadmeliaPeerRouter(IPeerRouting): self.server = dht_server async def find_peer(self, peer_id: ID) -> PeerInfo: - """ - Find a specific peer + """Find a specific peer. + :param peer_id: peer to search for :return: PeerInfo of specified peer """ diff --git a/libp2p/security/base_session.py b/libp2p/security/base_session.py index e91bc30c..181f1970 100644 --- a/libp2p/security/base_session.py +++ b/libp2p/security/base_session.py @@ -6,10 +6,8 @@ from libp2p.security.secure_conn_interface import ISecureConn class BaseSession(ISecureConn): - """ - ``BaseSession`` is not fully instantiated from its abstract classes as it - is only meant to be used in clases that derive from it. - """ + """``BaseSession`` is not fully instantiated from its abstract classes as + it is only meant to be used in clases that derive from it.""" local_peer: ID local_private_key: PrivateKey diff --git a/libp2p/security/base_transport.py b/libp2p/security/base_transport.py index 10d7b663..4dbcbe2d 100644 --- a/libp2p/security/base_transport.py +++ b/libp2p/security/base_transport.py @@ -11,13 +11,12 @@ def default_secure_bytes_provider(n: int) -> bytes: class BaseSecureTransport(ISecureTransport): - """ - ``BaseSecureTransport`` is not fully instantiated from its abstract classes as it - is only meant to be used in clases that derive from it. + """``BaseSecureTransport`` is not fully instantiated from its abstract + classes as it is only meant to be used in clases that derive from it. - Clients can provide a strategy to get cryptographically secure bytes of a given length. - A default implementation is provided using the ``secrets`` module from the - standard library. + Clients can provide a strategy to get cryptographically secure bytes + of a given length. A default implementation is provided using the + ``secrets`` module from the standard library. """ def __init__( diff --git a/libp2p/security/insecure/transport.py b/libp2p/security/insecure/transport.py index 81e70475..9d599efd 100644 --- a/libp2p/security/insecure/transport.py +++ b/libp2p/security/insecure/transport.py @@ -45,9 +45,7 @@ class InsecureSession(BaseSession): await self.conn.close() async def run_handshake(self) -> None: - """ - Raise `HandshakeFailure` when handshake failed - """ + """Raise `HandshakeFailure` when handshake failed.""" msg = make_exchange_message(self.local_private_key.get_public_key()) msg_bytes = msg.SerializeToString() encoded_msg_bytes = encode_fixedint_prefixed(msg_bytes) @@ -101,15 +99,15 @@ class InsecureSession(BaseSession): class InsecureTransport(BaseSecureTransport): - """ - ``InsecureTransport`` provides the "identity" upgrader for a ``IRawConnection``, - i.e. the upgraded transport does not add any additional security. - """ + """``InsecureTransport`` provides the "identity" upgrader for a + ``IRawConnection``, i.e. the upgraded transport does not add any additional + security.""" async def secure_inbound(self, conn: IRawConnection) -> ISecureConn: - """ - Secure the connection, either locally or by communicating with opposing node via conn, - for an inbound connection (i.e. we are not the initiator) + """Secure the connection, either locally or by communicating with + opposing node via conn, for an inbound connection (i.e. we are not the + initiator) + :return: secure connection object (that implements secure_conn_interface) """ session = InsecureSession(self.local_peer, self.local_private_key, conn, False) @@ -117,9 +115,10 @@ class InsecureTransport(BaseSecureTransport): return session async def secure_outbound(self, conn: IRawConnection, peer_id: ID) -> ISecureConn: - """ - Secure the connection, either locally or by communicating with opposing node via conn, - for an inbound connection (i.e. we are the initiator) + """Secure the connection, either locally or by communicating with + opposing node via conn, for an inbound connection (i.e. we are the + initiator) + :return: secure connection object (that implements secure_conn_interface) """ session = InsecureSession( diff --git a/libp2p/security/secio/exceptions.py b/libp2p/security/secio/exceptions.py index c03fda44..190531d2 100644 --- a/libp2p/security/secio/exceptions.py +++ b/libp2p/security/secio/exceptions.py @@ -6,10 +6,8 @@ class SecioException(HandshakeFailure): class SelfEncryption(SecioException): - """ - Raised to indicate that a host is attempting to encrypt communications - with itself. - """ + """Raised to indicate that a host is attempting to encrypt communications + with itself.""" pass diff --git a/libp2p/security/secio/transport.py b/libp2p/security/secio/transport.py index e1aa0224..31393740 100644 --- a/libp2p/security/secio/transport.py +++ b/libp2p/security/secio/transport.py @@ -136,10 +136,8 @@ class SecureSession(BaseSession): @dataclass(frozen=True) class Proposal: - """ - A ``Proposal`` represents the set of session parameters one peer in a pair of - peers attempting to negotiate a `secio` channel prefers. - """ + """A ``Proposal`` represents the set of session parameters one peer in a + pair of peers attempting to negotiate a `secio` channel prefers.""" nonce: bytes public_key: PublicKey @@ -396,11 +394,10 @@ async def create_secure_session( conn: IRawConnection, remote_peer: PeerID = None, ) -> ISecureConn: - """ - Attempt the initial `secio` handshake with the remote peer. - If successful, return an object that provides secure communication to the - ``remote_peer``. - Raise `SecioException` when `conn` closed. + """Attempt the initial `secio` handshake with the remote peer. + + If successful, return an object that provides secure communication + to the ``remote_peer``. Raise `SecioException` when `conn` closed. Raise `InconsistentNonce` when handshake failed """ msg_io = MsgIOReadWriter(conn) @@ -431,18 +428,17 @@ async def create_secure_session( class Transport(BaseSecureTransport): - """ - ``Transport`` provides a security upgrader for a ``IRawConnection``, - following the `secio` protocol defined in the libp2p specs. - """ + """``Transport`` provides a security upgrader for a ``IRawConnection``, + following the `secio` protocol defined in the libp2p specs.""" def get_nonce(self) -> bytes: return self.secure_bytes_provider(NONCE_SIZE) async def secure_inbound(self, conn: IRawConnection) -> ISecureConn: - """ - Secure the connection, either locally or by communicating with opposing node via conn, - for an inbound connection (i.e. we are not the initiator) + """Secure the connection, either locally or by communicating with + opposing node via conn, for an inbound connection (i.e. we are not the + initiator) + :return: secure connection object (that implements secure_conn_interface) """ local_nonce = self.get_nonce() @@ -456,9 +452,10 @@ class Transport(BaseSecureTransport): async def secure_outbound( self, conn: IRawConnection, peer_id: PeerID ) -> ISecureConn: - """ - Secure the connection, either locally or by communicating with opposing node via conn, - for an inbound connection (i.e. we are the initiator) + """Secure the connection, either locally or by communicating with + opposing node via conn, for an inbound connection (i.e. we are the + initiator) + :return: secure connection object (that implements secure_conn_interface) """ local_nonce = self.get_nonce() diff --git a/libp2p/security/secure_transport_interface.py b/libp2p/security/secure_transport_interface.py index 3b3e6256..baeb194e 100644 --- a/libp2p/security/secure_transport_interface.py +++ b/libp2p/security/secure_transport_interface.py @@ -16,16 +16,18 @@ Relevant go repo: https://github.com/libp2p/go-conn-security/blob/master/interfa class ISecureTransport(ABC): @abstractmethod async def secure_inbound(self, conn: IRawConnection) -> ISecureConn: - """ - Secure the connection, either locally or by communicating with opposing node via conn, - for an inbound connection (i.e. we are not the initiator) + """Secure the connection, either locally or by communicating with + opposing node via conn, for an inbound connection (i.e. we are not the + initiator) + :return: secure connection object (that implements secure_conn_interface) """ @abstractmethod async def secure_outbound(self, conn: IRawConnection, peer_id: ID) -> ISecureConn: - """ - Secure the connection, either locally or by communicating with opposing node via conn, - for an inbound connection (i.e. we are the initiator) + """Secure the connection, either locally or by communicating with + opposing node via conn, for an inbound connection (i.e. we are the + initiator) + :return: secure connection object (that implements secure_conn_interface) """ diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py index cff55af3..731a9e1f 100644 --- a/libp2p/security/security_multistream.py +++ b/libp2p/security/security_multistream.py @@ -21,8 +21,8 @@ Relevant go repo: https://github.com/libp2p/go-conn-security/blob/master/interfa class SecurityMultistream(ABC): - """ - SSMuxer is a multistream stream security transport multiplexer. + """SSMuxer is a multistream stream security transport multiplexer. + Go implementation: github.com/libp2p/go-conn-security-multistream/ssms.go """ @@ -40,10 +40,10 @@ class SecurityMultistream(ABC): self.add_transport(protocol, transport) def add_transport(self, protocol: TProtocol, transport: ISecureTransport) -> None: - """ - Add a protocol and its corresponding transport to multistream-select(multiselect). - The order that a protocol is added is exactly the precedence it is negotiated in - multiselect. + """Add a protocol and its corresponding transport to multistream- + select(multiselect). The order that a protocol is added is exactly the + precedence it is negotiated in multiselect. + :param protocol: the protocol name, which is negotiated in multiselect. :param transport: the corresponding transportation to the ``protocol``. """ @@ -56,9 +56,10 @@ class SecurityMultistream(ABC): self.multiselect.add_handler(protocol, None) async def secure_inbound(self, conn: IRawConnection) -> ISecureConn: - """ - Secure the connection, either locally or by communicating with opposing node via conn, - for an inbound connection (i.e. we are not the initiator) + """Secure the connection, either locally or by communicating with + opposing node via conn, for an inbound connection (i.e. we are not the + initiator) + :return: secure connection object (that implements secure_conn_interface) """ transport = await self.select_transport(conn, False) @@ -66,9 +67,10 @@ class SecurityMultistream(ABC): return secure_conn async def secure_outbound(self, conn: IRawConnection, peer_id: ID) -> ISecureConn: - """ - Secure the connection, either locally or by communicating with opposing node via conn, - for an inbound connection (i.e. we are the initiator) + """Secure the connection, either locally or by communicating with + opposing node via conn, for an inbound connection (i.e. we are the + initiator) + :return: secure connection object (that implements secure_conn_interface) """ transport = await self.select_transport(conn, True) @@ -78,9 +80,9 @@ class SecurityMultistream(ABC): async def select_transport( self, conn: IRawConnection, initiator: bool ) -> ISecureTransport: - """ - Select a transport that both us and the node on the - other end of conn support and agree on + """Select a transport that both us and the node on the other end of + conn support and agree on. + :param conn: conn to choose a transport over :param initiator: true if we are the initiator, false otherwise :return: selected secure transport diff --git a/libp2p/stream_muxer/abc.py b/libp2p/stream_muxer/abc.py index 4af110b6..bebfc66c 100644 --- a/libp2p/stream_muxer/abc.py +++ b/libp2p/stream_muxer/abc.py @@ -14,8 +14,8 @@ class IMuxedConn(ABC): @abstractmethod def __init__(self, conn: ISecureConn, peer_id: ID) -> None: - """ - create a new muxed connection + """create a new muxed connection. + :param conn: an instance of secured connection for new muxed streams :param peer_id: peer_id of peer the connection is to @@ -28,29 +28,25 @@ class IMuxedConn(ABC): @abstractmethod async def close(self) -> None: - """ - close connection - """ + """close connection.""" @abstractmethod def is_closed(self) -> bool: - """ - check connection is fully closed + """check connection is fully closed. + :return: true if successful """ @abstractmethod async def open_stream(self) -> "IMuxedStream": - """ - creates a new muxed_stream + """creates a new muxed_stream. + :return: a new ``IMuxedStream`` stream """ @abstractmethod async def accept_stream(self) -> "IMuxedStream": - """ - accepts a muxed stream opened by the other end - """ + """accepts a muxed stream opened by the other end.""" class IMuxedStream(ReadWriteCloser): @@ -59,14 +55,11 @@ class IMuxedStream(ReadWriteCloser): @abstractmethod async def reset(self) -> None: - """ - closes both ends of the stream - tells this remote side to hang up - """ + """closes both ends of the stream tells this remote side to hang up.""" @abstractmethod def set_deadline(self, ttl: int) -> bool: - """ - set deadline for muxed stream + """set deadline for muxed stream. + :return: a new stream """ diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 768e66c1..4616a693 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -41,8 +41,8 @@ class Mplex(IMuxedConn): _tasks: List["asyncio.Future[Any]"] def __init__(self, secured_conn: ISecureConn, peer_id: ID) -> None: - """ - create a new muxed connection + """create a new muxed connection. + :param secured_conn: an instance of ``ISecureConn`` :param generic_protocol_handler: generic protocol handler for new muxed streams @@ -72,9 +72,7 @@ class Mplex(IMuxedConn): return self.secured_conn.initiator async def close(self) -> None: - """ - close the stream muxer and underlying secured connection - """ + """close the stream muxer and underlying secured connection.""" if self.event_shutting_down.is_set(): return # Set the `event_shutting_down`, to allow graceful shutdown. @@ -84,15 +82,15 @@ class Mplex(IMuxedConn): await self.event_closed.wait() def is_closed(self) -> bool: - """ - check connection is fully closed + """check connection is fully closed. + :return: true if successful """ return self.event_closed.is_set() def _get_next_channel_id(self) -> int: - """ - Get next available stream id + """Get next available stream id. + :return: next available stream id for the connection """ next_id = self.next_channel_id @@ -106,8 +104,8 @@ class Mplex(IMuxedConn): return stream async def open_stream(self) -> IMuxedStream: - """ - creates a new muxed_stream + """creates a new muxed_stream. + :return: a new ``MplexStream`` """ channel_id = self._get_next_channel_id() @@ -135,9 +133,7 @@ class Mplex(IMuxedConn): return task_coro.result() async def accept_stream(self) -> IMuxedStream: - """ - accepts a muxed stream opened by the other end - """ + """accepts a muxed stream opened by the other end.""" return await self._wait_until_shutting_down_or_closed( self.new_stream_queue.get() ) @@ -145,8 +141,8 @@ class Mplex(IMuxedConn): async def send_message( self, flag: HeaderTags, data: Optional[bytes], stream_id: StreamID ) -> int: - """ - sends a message over the connection + """sends a message over the connection. + :param header: header to use :param data: data to send in the message :param stream_id: stream the message is in @@ -164,8 +160,8 @@ class Mplex(IMuxedConn): ) async def write_to_stream(self, _bytes: bytes) -> int: - """ - writes a byte array to a secured connection + """writes a byte array to a secured connection. + :param _bytes: byte array to write :return: length written """ @@ -173,9 +169,8 @@ class Mplex(IMuxedConn): return len(_bytes) async def handle_incoming(self) -> None: - """ - Read a message off of the secured connection and add it to the corresponding message buffer - """ + """Read a message off of the secured connection and add it to the + corresponding message buffer.""" while True: try: @@ -189,8 +184,8 @@ class Mplex(IMuxedConn): await self._cleanup() async def read_message(self) -> Tuple[int, int, bytes]: - """ - Read a single message off of the secured connection + """Read a single message off of the secured connection. + :return: stream_id, flag, message contents """ @@ -215,8 +210,8 @@ class Mplex(IMuxedConn): return channel_id, flag, message async def _handle_incoming_message(self) -> None: - """ - Read and handle a new incoming message. + """Read and handle a new incoming message. + :raise MplexUnavailable: `Mplex` encounters fatal error or is shutting down. """ channel_id, flag, message = await self._wait_until_shutting_down_or_closed( diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index 7cc0564e..46e89149 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -34,8 +34,8 @@ class MplexStream(IMuxedStream): _buf: bytearray def __init__(self, name: str, stream_id: StreamID, muxed_conn: "Mplex") -> None: - """ - create new MuxedStream in muxer + """create new MuxedStream in muxer. + :param stream_id: stream id of this stream :param muxed_conn: muxed connection of this muxed_stream """ @@ -112,10 +112,10 @@ class MplexStream(IMuxedStream): return bytes(payload) async def read(self, n: int = -1) -> bytes: - """ - Read up to n bytes. Read possibly returns fewer than `n` bytes, - if there are not enough bytes in the Mplex buffer. - If `n == -1`, read until EOF. + """Read up to n bytes. Read possibly returns fewer than `n` bytes, if + there are not enough bytes in the Mplex buffer. If `n == -1`, read + until EOF. + :param n: number of bytes to read :return: bytes actually read """ @@ -141,8 +141,8 @@ class MplexStream(IMuxedStream): return bytes(payload) async def write(self, data: bytes) -> int: - """ - write to stream + """write to stream. + :return: number of bytes written """ if self.event_local_closed.is_set(): @@ -155,10 +155,8 @@ class MplexStream(IMuxedStream): return await self.muxed_conn.send_message(flag, data, self.stream_id) async def close(self) -> None: - """ - Closing a stream closes it for writing and closes the remote end for reading - but allows writing in the other direction. - """ + """Closing a stream closes it for writing and closes the remote end for + reading but allows writing in the other direction.""" # TODO error handling with timeout async with self.close_lock: @@ -182,10 +180,7 @@ class MplexStream(IMuxedStream): del self.muxed_conn.streams[self.stream_id] async def reset(self) -> None: - """ - closes both ends of the stream - tells this remote side to hang up - """ + """closes both ends of the stream tells this remote side to hang up.""" async with self.close_lock: # Both sides have been closed. No need to event_reset. if self.event_remote_closed.is_set() and self.event_local_closed.is_set(): @@ -217,8 +212,8 @@ class MplexStream(IMuxedStream): # TODO deadline not in use def set_deadline(self, ttl: int) -> bool: - """ - set deadline for muxed stream + """set deadline for muxed stream. + :return: True if successful """ self.read_deadline = ttl @@ -226,16 +221,16 @@ class MplexStream(IMuxedStream): return True def set_read_deadline(self, ttl: int) -> bool: - """ - set read deadline for muxed stream + """set read deadline for muxed stream. + :return: True if successful """ self.read_deadline = ttl return True def set_write_deadline(self, ttl: int) -> bool: - """ - set write deadline for muxed stream + """set write deadline for muxed stream. + :return: True if successful """ self.write_deadline = ttl diff --git a/libp2p/stream_muxer/muxer_multistream.py b/libp2p/stream_muxer/muxer_multistream.py index 7f6ee077..ef712ad1 100644 --- a/libp2p/stream_muxer/muxer_multistream.py +++ b/libp2p/stream_muxer/muxer_multistream.py @@ -16,8 +16,8 @@ DEFAULT_NEGOTIATE_TIMEOUT = 60 class MuxerMultistream: - """ - MuxerMultistream is a multistream stream muxed transport multiplexer. + """MuxerMultistream is a multistream stream muxed transport multiplexer. + go implementation: github.com/libp2p/go-stream-muxer-multistream/multistream.go """ @@ -34,10 +34,10 @@ class MuxerMultistream: self.add_transport(protocol, transport) def add_transport(self, protocol: TProtocol, transport: TMuxerClass) -> None: - """ - Add a protocol and its corresponding transport to multistream-select(multiselect). - The order that a protocol is added is exactly the precedence it is negotiated in - multiselect. + """Add a protocol and its corresponding transport to multistream- + select(multiselect). The order that a protocol is added is exactly the + precedence it is negotiated in multiselect. + :param protocol: the protocol name, which is negotiated in multiselect. :param transport: the corresponding transportation to the ``protocol``. """ @@ -48,9 +48,9 @@ class MuxerMultistream: self.multiselect.add_handler(protocol, None) async def select_transport(self, conn: IRawConnection) -> TMuxerClass: - """ - Select a transport that both us and the node on the - other end of conn support and agree on + """Select a transport that both us and the node on the other end of + conn support and agree on. + :param conn: conn to choose a transport over :return: selected muxer transport """ diff --git a/libp2p/transport/listener_interface.py b/libp2p/transport/listener_interface.py index 9664f069..7a030627 100644 --- a/libp2p/transport/listener_interface.py +++ b/libp2p/transport/listener_interface.py @@ -7,22 +7,20 @@ from multiaddr import Multiaddr class IListener(ABC): @abstractmethod async def listen(self, maddr: Multiaddr) -> bool: - """ - put listener in listening mode and wait for incoming connections + """put listener in listening mode and wait for incoming connections. + :param maddr: multiaddr of peer :return: return True if successful """ @abstractmethod def get_addrs(self) -> List[Multiaddr]: - """ - retrieve list of addresses the listener is listening on + """retrieve list of addresses the listener is listening on. + :return: return list of addrs """ @abstractmethod async def close(self) -> None: - """ - close the listener such that no more connections - can be open on this transport instance - """ + """close the listener such that no more connections can be open on this + transport instance.""" diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index 5ee24283..7b1efa20 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -22,8 +22,8 @@ class TCPListener(IListener): self.handler = handler_function async def listen(self, maddr: Multiaddr) -> bool: - """ - put listener in listening mode and wait for incoming connections + """put listener in listening mode and wait for incoming connections. + :param maddr: maddr of peer :return: return True if successful """ @@ -38,18 +38,16 @@ class TCPListener(IListener): return True def get_addrs(self) -> List[Multiaddr]: - """ - retrieve list of addresses the listener is listening on + """retrieve list of addresses the listener is listening on. + :return: return list of addrs """ # TODO check if server is listening return self.multiaddrs async def close(self) -> None: - """ - close the listener such that no more connections - can be open on this transport instance - """ + """close the listener such that no more connections can be open on this + transport instance.""" if self.server is None: return self.server.close() @@ -59,8 +57,8 @@ class TCPListener(IListener): class TCP(ITransport): async def dial(self, maddr: Multiaddr) -> IRawConnection: - """ - dial a transport to peer listening on multiaddr + """dial a transport to peer listening on multiaddr. + :param maddr: multiaddr of peer :return: `RawConnection` if successful :raise OpenConnectionError: raised when failed to open connection @@ -76,10 +74,10 @@ class TCP(ITransport): return RawConnection(reader, writer, True) def create_listener(self, handler_function: THandler) -> TCPListener: - """ - create listener on transport + """create listener on transport. + :param handler_function: a function called when a new connection is received - that takes a connection as argument which implements interface-connection + that takes a connection as argument which implements interface-connection :return: a listener object that implements listener_interface.py """ return TCPListener(handler_function) diff --git a/libp2p/transport/transport_interface.py b/libp2p/transport/transport_interface.py index ca8b5c34..35738907 100644 --- a/libp2p/transport/transport_interface.py +++ b/libp2p/transport/transport_interface.py @@ -11,8 +11,8 @@ from .typing import THandler class ITransport(ABC): @abstractmethod async def dial(self, maddr: Multiaddr) -> IRawConnection: - """ - dial a transport to peer listening on multiaddr + """dial a transport to peer listening on multiaddr. + :param multiaddr: multiaddr of peer :param self_id: peer_id of the dialer (to send to receiver) :return: list of multiaddrs @@ -20,9 +20,9 @@ class ITransport(ABC): @abstractmethod def create_listener(self, handler_function: THandler) -> IListener: - """ - create listener on transport + """create listener on transport. + :param handler_function: a function called when a new conntion is received - that takes a connection as argument which implements interface-connection + that takes a connection as argument which implements interface-connection :return: a listener object that implements listener_interface.py """ diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index 8dda95e7..6b06a2dc 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -26,18 +26,14 @@ class TransportUpgrader: self.muxer_multistream = MuxerMultistream(muxer_transports_by_protocol) def upgrade_listener(self, transport: ITransport, listeners: IListener) -> None: - """ - Upgrade multiaddr listeners to libp2p-transport listeners - """ + """Upgrade multiaddr listeners to libp2p-transport listeners.""" # TODO: Figure out what to do with this function. pass async def upgrade_security( self, raw_conn: IRawConnection, peer_id: ID, initiator: bool ) -> ISecureConn: - """ - Upgrade conn to a secured connection - """ + """Upgrade conn to a secured connection.""" try: if initiator: return await self.security_multistream.secure_outbound( @@ -54,9 +50,7 @@ class TransportUpgrader: ) from error async def upgrade_connection(self, conn: ISecureConn, peer_id: ID) -> IMuxedConn: - """ - Upgrade secured connection to a muxed connection - """ + """Upgrade secured connection to a muxed connection.""" try: return await self.muxer_multistream.new_conn(conn, peer_id) except (MultiselectError, MultiselectClientError) as error: diff --git a/libp2p/utils.py b/libp2p/utils.py index 39c79e58..677f054c 100644 --- a/libp2p/utils.py +++ b/libp2p/utils.py @@ -19,7 +19,7 @@ SHIFT_64_BIT_MAX = int(math.ceil(64 / 7)) * 7 def encode_uvarint(number: int) -> bytes: - """Pack `number` into varint bytes""" + """Pack `number` into varint bytes.""" buf = b"" while True: towrite = number & 0x7F @@ -33,9 +33,7 @@ def encode_uvarint(number: int) -> bytes: async def decode_uvarint_from_stream(reader: Reader) -> int: - """ - https://en.wikipedia.org/wiki/LEB128 - """ + """https://en.wikipedia.org/wiki/LEB128.""" res = 0 for shift in itertools.count(0, 7): if shift > SHIFT_64_BIT_MAX: diff --git a/setup.py b/setup.py index 83da86d1..f9c2668d 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ extras_require = { "isort==4.3.21", "flake8>=3.7.7,<4.0.0", ], - "dev": ["tox>=3.13.2,<4.0.0"], + "dev": ["tox>=3.13.2,<4.0.0", "docformatter"], } extras_require["dev"] = ( diff --git a/tests/factories.py b/tests/factories.py index b4e8be23..5c08d4df 100644 --- a/tests/factories.py +++ b/tests/factories.py @@ -2,7 +2,6 @@ import asyncio from typing import Dict, Tuple import factory - from libp2p import generate_new_rsa_identity, initialize_default_swarm from libp2p.crypto.keys import KeyPair from libp2p.host.basic_host import BasicHost diff --git a/tests/network/test_notify.py b/tests/network/test_notify.py index aaf0ed55..aee9140d 100644 --- a/tests/network/test_notify.py +++ b/tests/network/test_notify.py @@ -1,6 +1,5 @@ -""" -Test Notify and Notifee by ensuring that the proper events get -called, and that the stream passed into opened_stream is correct +"""Test Notify and Notifee by ensuring that the proper events get called, and +that the stream passed into opened_stream is correct. Note: Listen event does not get hit because MyNotifee is passed into network after network has already started listening diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py index 98a224fa..23e9f2cb 100644 --- a/tests/pubsub/dummy_account_node.py +++ b/tests/pubsub/dummy_account_node.py @@ -20,10 +20,11 @@ CRYPTO_TOPIC = "ethereum" class DummyAccountNode: - """ - Node which has an internal balance mapping, meant to serve as - a dummy crypto blockchain. There is no actual blockchain, just a simple - map indicating how much crypto each user in the mappings holds + """Node which has an internal balance mapping, meant to serve as a dummy + crypto blockchain. + + There is no actual blockchain, just a simple map indicating how much + crypto each user in the mappings holds """ libp2p_node: IHost @@ -40,9 +41,8 @@ class DummyAccountNode: @classmethod async def create(cls): - """ - Create a new DummyAccountNode and attach a libp2p node, a floodsub, and a pubsub - instance to this new node + """Create a new DummyAccountNode and attach a libp2p node, a floodsub, + and a pubsub instance to this new node. We use create as this serves as a factory function and allows us to use async await, unlike the init function @@ -53,9 +53,7 @@ class DummyAccountNode: return cls(libp2p_node=pubsub.host, pubsub=pubsub, floodsub=pubsub.router) async def handle_incoming_msgs(self): - """ - Handle all incoming messages on the CRYPTO_TOPIC from peers - """ + """Handle all incoming messages on the CRYPTO_TOPIC from peers.""" while True: incoming = await self.q.get() msg_comps = incoming.data.decode("utf-8").split(",") @@ -66,17 +64,16 @@ class DummyAccountNode: self.handle_set_crypto(msg_comps[1], int(msg_comps[2])) async def setup_crypto_networking(self): - """ - Subscribe to CRYPTO_TOPIC and perform call to function that handles - all incoming messages on said topic - """ + """Subscribe to CRYPTO_TOPIC and perform call to function that handles + all incoming messages on said topic.""" self.q = await self.pubsub.subscribe(CRYPTO_TOPIC) asyncio.ensure_future(self.handle_incoming_msgs()) async def publish_send_crypto(self, source_user, dest_user, amount): - """ - Create a send crypto message and publish that message to all other nodes + """Create a send crypto message and publish that message to all other + nodes. + :param source_user: user to send crypto from :param dest_user: user to send crypto to :param amount: amount of crypto to send @@ -85,8 +82,9 @@ class DummyAccountNode: await self.pubsub.publish(CRYPTO_TOPIC, msg_contents.encode()) async def publish_set_crypto(self, user, amount): - """ - Create a set crypto message and publish that message to all other nodes + """Create a set crypto message and publish that message to all other + nodes. + :param user: user to set crypto for :param amount: amount of crypto """ @@ -94,8 +92,8 @@ class DummyAccountNode: await self.pubsub.publish(CRYPTO_TOPIC, msg_contents.encode()) def handle_send_crypto(self, source_user, dest_user, amount): - """ - Handle incoming send_crypto message + """Handle incoming send_crypto message. + :param source_user: user to send crypto from :param dest_user: user to send crypto to :param amount: amount of crypto to send @@ -111,16 +109,16 @@ class DummyAccountNode: self.balances[dest_user] = amount def handle_set_crypto(self, dest_user, amount): - """ - Handle incoming set_crypto message + """Handle incoming set_crypto message. + :param dest_user: user to set crypto for :param amount: amount of crypto """ self.balances[dest_user] = amount def get_balance(self, user): - """ - Get balance in crypto for a particular user + """Get balance in crypto for a particular user. + :param user: user to get balance for :return: balance of user """ diff --git a/tests/pubsub/test_dummyaccount_demo.py b/tests/pubsub/test_dummyaccount_demo.py index edc2f51e..0fb28580 100644 --- a/tests/pubsub/test_dummyaccount_demo.py +++ b/tests/pubsub/test_dummyaccount_demo.py @@ -16,9 +16,9 @@ def create_setup_in_new_thread_func(dummy_node): async def perform_test(num_nodes, adjacency_map, action_func, assertion_func): - """ - Helper function to allow for easy construction of custom tests for dummy account nodes - in various network topologies + """Helper function to allow for easy construction of custom tests for dummy + account nodes in various network topologies. + :param num_nodes: number of nodes in the test :param adjacency_map: adjacency map defining each node and its list of neighbors :param action_func: function to execute that includes actions by the nodes, diff --git a/tests/pubsub/utils.py b/tests/pubsub/utils.py index d1302072..20ee05db 100644 --- a/tests/pubsub/utils.py +++ b/tests/pubsub/utils.py @@ -7,8 +7,8 @@ from tests.utils import connect def message_id_generator(start_val): - """ - Generate a unique message id + """Generate a unique message id. + :param start_val: value to start generating messages at :return: message id """ diff --git a/tests/utils.py b/tests/utils.py index 6c1fa473..67e9668f 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -21,9 +21,7 @@ async def connect_swarm(swarm_0, swarm_1): async def connect(node1, node2): - """ - Connect node1 to node2 - """ + """Connect node1 to node2.""" addr = node2.get_addrs()[0] info = info_from_p2p_addr(addr) await node1.connect(info) diff --git a/tests_interop/daemon.py b/tests_interop/daemon.py index 97356845..26d6fe8c 100644 --- a/tests_interop/daemon.py +++ b/tests_interop/daemon.py @@ -19,9 +19,10 @@ TIMEOUT_DURATION = 30 async def try_until_success(coro_func, timeout=TIMEOUT_DURATION): - """ - Keep running ``coro_func`` until either it succeed or time is up. - All arguments of ``coro_func`` should be filled, i.e. it should be called without arguments. + """Keep running ``coro_func`` until either it succeed or time is up. + + All arguments of ``coro_func`` should be filled, i.e. it should be + called without arguments. """ t_start = time.monotonic() while True: