diff --git a/.pylintrc b/.pylintrc index 49552216..6784a7cc 100644 --- a/.pylintrc +++ b/.pylintrc @@ -128,6 +128,8 @@ disable=print-statement, dict-keys-not-iterating, dict-values-not-iterating, missing-docstring, + cyclic-import, + duplicate-code, # Enable the message, report, category or checker with the given id(s). You can # either give multiple identifier separated by comma (,) or put this option diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index 40292dfc..99ee832a 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -1,5 +1,21 @@ +from typing import ( + Any, + Awaitable, + Callable, + List, + Sequence, +) + import multiaddr +from libp2p.network.network_interface import INetwork +from libp2p.peer.id import ID +from libp2p.peer.peerinfo import PeerInfo +from libp2p.peer.peerstore_interface import IPeerStore + +from libp2p.network.stream.net_stream_interface import INetStream +from libp2p.routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter + from .host_interface import IHost # Upon host creation, host takes in options, @@ -8,50 +24,58 @@ from .host_interface import IHost # telling it to listen on the given listen addresses. +StreamHandlerFn = Callable[[INetStream], Awaitable[None]] + + class BasicHost(IHost): + _network: INetwork + router: KadmeliaPeerRouter + peerstore: IPeerStore + # default options constructor - def __init__(self, network, router=None): + def __init__(self, network: INetwork, router: KadmeliaPeerRouter = None) -> None: self._network = network self._router = router self.peerstore = self._network.peerstore - def get_id(self): + def get_id(self) -> ID: """ :return: peer_id of host """ return self._network.get_peer_id() - def get_network(self): + def get_network(self) -> INetwork: """ :return: network instance of host """ return self._network - def get_peerstore(self): + def get_peerstore(self) -> IPeerStore: """ :return: peerstore of the host (same one as in its network instance) """ return self.peerstore - def get_mux(self): + # FIXME: Replace with correct return type + def get_mux(self) -> Any: """ :return: mux instance of host """ - def get_addrs(self): + def get_addrs(self) -> List[multiaddr.Multiaddr]: """ :return: all the multiaddr addresses this host is listening too """ p2p_part = multiaddr.Multiaddr('/p2p/{}'.format(self.get_id().pretty())) - addrs = [] + addrs: List[multiaddr.Multiaddr] = [] for transport in self._network.listeners.values(): for addr in transport.get_addrs(): addrs.append(addr.encapsulate(p2p_part)) return addrs - def set_stream_handler(self, protocol_id, stream_handler): + def set_stream_handler(self, protocol_id: str, stream_handler: StreamHandlerFn) -> bool: """ set stream handler for host :param protocol_id: protocol id used on stream @@ -62,16 +86,15 @@ class BasicHost(IHost): # protocol_id can be a list of protocol_ids # stream will decide which protocol_id to run on - async def new_stream(self, peer_id, protocol_ids): + async def new_stream(self, peer_id: ID, protocol_ids: Sequence[str]) -> INetStream: """ :param peer_id: peer_id that host is connecting :param protocol_id: protocol id that stream runs on - :return: true if successful + :return: stream: new stream created """ - stream = await self._network.new_stream(peer_id, protocol_ids) - return stream + return await self._network.new_stream(peer_id, protocol_ids) - async def connect(self, peer_info): + async def connect(self, peer_info: PeerInfo) -> None: """ connect ensures there is a connection between this host and the peer with given peer_info.peer_id. connect will absorb the addresses in peer_info into its internal diff --git a/libp2p/host/host_interface.py b/libp2p/host/host_interface.py index 57847c8b..50858089 100644 --- a/libp2p/host/host_interface.py +++ b/libp2p/host/host_interface.py @@ -1,34 +1,53 @@ from abc import ABC, abstractmethod +from typing import ( + Any, + Awaitable, + Callable, + List, + Sequence, +) + +import multiaddr + +from libp2p.network.network_interface import INetwork +from libp2p.peer.id import ID +from libp2p.peer.peerinfo import PeerInfo + +from libp2p.network.stream.net_stream_interface import INetStream + + +StreamHandlerFn = Callable[[INetStream], Awaitable[None]] class IHost(ABC): @abstractmethod - def get_id(self): + def get_id(self) -> ID: """ :return: peer_id of host """ @abstractmethod - def get_network(self): + def get_network(self) -> INetwork: """ :return: network instance of host """ + # FIXME: Replace with correct return type @abstractmethod - def get_mux(self): + def get_mux(self) -> Any: """ :return: mux instance of host """ @abstractmethod - def get_addrs(self): + def get_addrs(self) -> List[multiaddr.Multiaddr]: """ :return: all the multiaddr addresses this host is listening too """ @abstractmethod - def set_stream_handler(self, protocol_id, stream_handler): + def set_stream_handler(self, protocol_id: str, stream_handler: StreamHandlerFn) -> bool: """ set stream handler for host :param protocol_id: protocol id used on stream @@ -39,15 +58,17 @@ class IHost(ABC): # protocol_id can be a list of protocol_ids # stream will decide which protocol_id to run on @abstractmethod - def new_stream(self, peer_id, protocol_ids): + async def new_stream(self, + peer_id: ID, + protocol_ids: Sequence[str]) -> INetStream: """ :param peer_id: peer_id that host is connecting :param protocol_ids: protocol ids that stream can run on - :return: true if successful + :return: stream: new stream created """ @abstractmethod - def connect(self, peer_info): + async def connect(self, peer_info: PeerInfo) -> None: """ connect ensures there is a connection between this host and the peer with given peer_info.peer_id. connect will absorb the addresses in peer_info into its internal diff --git a/libp2p/network/connection/raw_connection.py b/libp2p/network/connection/raw_connection.py index 23cb5516..3e5716bb 100644 --- a/libp2p/network/connection/raw_connection.py +++ b/libp2p/network/connection/raw_connection.py @@ -1,8 +1,22 @@ +import asyncio + from .raw_connection_interface import IRawConnection class RawConnection(IRawConnection): - def __init__(self, ip, port, reader, writer, initiator): + conn_ip: str + conn_port: str + reader: asyncio.StreamReader + writer: asyncio.StreamWriter + _next_id: int + initiator: bool + + def __init__(self, + ip: str, + port: str, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + initiator: bool) -> None: # pylint: disable=too-many-arguments self.conn_ip = ip self.conn_port = port @@ -11,12 +25,12 @@ class RawConnection(IRawConnection): self._next_id = 0 if initiator else 1 self.initiator = initiator - async def write(self, data): + async def write(self, data: bytes) -> None: self.writer.write(data) self.writer.write("\n".encode()) await self.writer.drain() - async def read(self): + async def read(self) -> bytes: line = await self.reader.readline() adjusted_line = line.decode().rstrip('\n') @@ -24,10 +38,10 @@ class RawConnection(IRawConnection): # encoding and decoding return adjusted_line.encode() - def close(self): + def close(self) -> None: self.writer.close() - def next_stream_id(self): + def next_stream_id(self) -> int: """ Get next available stream id :return: next available stream id for the connection diff --git a/libp2p/network/network_interface.py b/libp2p/network/network_interface.py index 71e813c5..1a84c0ea 100644 --- a/libp2p/network/network_interface.py +++ b/libp2p/network/network_interface.py @@ -1,16 +1,45 @@ -from abc import ABC, abstractmethod +from abc import ( + ABC, + abstractmethod, +) +from typing import ( + Awaitable, + Callable, + Dict, + Sequence, + TYPE_CHECKING, +) + +from multiaddr import Multiaddr + +from libp2p.peer.id import ID +from libp2p.peer.peerstore import PeerStore +from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn +from libp2p.transport.listener_interface import IListener + +from .stream.net_stream_interface import INetStream + +if TYPE_CHECKING: + from .notifee_interface import INotifee + + +StreamHandlerFn = Callable[[INetStream], Awaitable[None]] class INetwork(ABC): + peerstore: PeerStore + connections: Dict[ID, IMuxedConn] + listeners: Dict[str, IListener] + @abstractmethod - def get_peer_id(self): + def get_peer_id(self) -> ID: """ :return: the peer id """ @abstractmethod - def dial_peer(self, peer_id): + async def dial_peer(self, peer_id: ID) -> IMuxedConn: """ dial_peer try to create a connection to peer_id @@ -20,7 +49,7 @@ class INetwork(ABC): """ @abstractmethod - def set_stream_handler(self, protocol_id, stream_handler): + def set_stream_handler(self, protocol_id: str, stream_handler: StreamHandlerFn) -> bool: """ :param protocol_id: protocol id used on stream :param stream_handler: a stream handler instance @@ -28,7 +57,9 @@ class INetwork(ABC): """ @abstractmethod - def new_stream(self, peer_id, protocol_ids): + async def new_stream(self, + peer_id: ID, + protocol_ids: Sequence[str]) -> INetStream: """ :param peer_id: peer_id of destination :param protocol_ids: available protocol ids to use for stream @@ -36,14 +67,14 @@ class INetwork(ABC): """ @abstractmethod - def listen(self, *args): + async def listen(self, *args: Sequence[Multiaddr]) -> bool: """ :param *args: one or many multiaddrs to start listening on :return: True if at least one success """ @abstractmethod - def notify(self, notifee): + def notify(self, notifee: 'INotifee') -> bool: """ :param notifee: object implementing Notifee interface :return: true if notifee registered successfully, false otherwise diff --git a/libp2p/network/notifee_interface.py b/libp2p/network/notifee_interface.py index 17ba1fe0..65ad4480 100644 --- a/libp2p/network/notifee_interface.py +++ b/libp2p/network/notifee_interface.py @@ -1,44 +1,58 @@ -from abc import ABC, abstractmethod +from abc import ( + ABC, + abstractmethod, +) +from typing import TYPE_CHECKING + +from multiaddr import Multiaddr + +from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn + +from libp2p.network.stream.net_stream_interface import INetStream + +if TYPE_CHECKING: + from .network_interface import INetwork + class INotifee(ABC): @abstractmethod - async def opened_stream(self, network, stream): + async def opened_stream(self, network: 'INetwork', stream: INetStream) -> None: """ :param network: network the stream was opened on :param stream: stream that was opened """ @abstractmethod - async def closed_stream(self, network, stream): + async def closed_stream(self, network: 'INetwork', stream: INetStream) -> None: """ :param network: network the stream was closed on :param stream: stream that was closed """ @abstractmethod - async def connected(self, network, conn): + async def connected(self, network: 'INetwork', conn: IMuxedConn) -> None: """ :param network: network the connection was opened on :param conn: connection that was opened """ @abstractmethod - async def disconnected(self, network, conn): + async def disconnected(self, network: 'INetwork', conn: IMuxedConn) -> None: """ :param network: network the connection was closed on :param conn: connection that was closed """ @abstractmethod - async def listen(self, network, multiaddr): + async def listen(self, network: 'INetwork', multiaddr: Multiaddr) -> None: """ :param network: network the listener is listening on :param multiaddr: multiaddress listener is listening on """ @abstractmethod - async def listen_close(self, network, multiaddr): + async def listen_close(self, network: 'INetwork', multiaddr: Multiaddr) -> None: """ :param network: network the connection was opened on :param multiaddr: multiaddress listener is no longer listening on diff --git a/libp2p/network/stream/net_stream.py b/libp2p/network/stream/net_stream.py index f7089625..8ebe7913 100644 --- a/libp2p/network/stream/net_stream.py +++ b/libp2p/network/stream/net_stream.py @@ -1,41 +1,48 @@ +from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn +from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream + from .net_stream_interface import INetStream class NetStream(INetStream): - def __init__(self, muxed_stream): + muxed_stream: IMuxedStream + mplex_conn: IMuxedConn + protocol_id: str + + def __init__(self, muxed_stream: IMuxedStream) -> None: self.muxed_stream = muxed_stream self.mplex_conn = muxed_stream.mplex_conn self.protocol_id = None - def get_protocol(self): + def get_protocol(self) -> str: """ :return: protocol id that stream runs on """ return self.protocol_id - def set_protocol(self, protocol_id): + def set_protocol(self, protocol_id: str) -> None: """ :param protocol_id: protocol id that stream runs on :return: true if successful """ self.protocol_id = protocol_id - async def read(self): + async def read(self) -> bytes: """ read from stream :return: bytes of input until EOF """ return await self.muxed_stream.read() - async def write(self, data): + async def write(self, data: bytes) -> int: """ write to stream :return: number of bytes written """ return await self.muxed_stream.write(data) - async def close(self): + async def close(self) -> bool: """ close stream :return: true if successful diff --git a/libp2p/network/stream/net_stream_interface.py b/libp2p/network/stream/net_stream_interface.py index 056f9244..ca3858ff 100644 --- a/libp2p/network/stream/net_stream_interface.py +++ b/libp2p/network/stream/net_stream_interface.py @@ -1,37 +1,41 @@ from abc import ABC, abstractmethod +from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn + class INetStream(ABC): + mplex_conn: IMuxedConn + @abstractmethod - def get_protocol(self): + def get_protocol(self) -> str: """ :return: protocol id that stream runs on """ @abstractmethod - def set_protocol(self, protocol_id): + def set_protocol(self, protocol_id: str) -> bool: """ :param protocol_id: protocol id that stream runs on :return: true if successful """ @abstractmethod - def read(self): + async def read(self) -> bytes: """ reads from the underlying muxed_stream :return: bytes of input """ @abstractmethod - def write(self, _bytes): + async def write(self, data: bytes) -> int: """ write to the underlying muxed_stream :return: number of bytes written """ @abstractmethod - def close(self): + async def close(self) -> bool: """ close the underlying muxed stream :return: true if successful diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index e3d037e8..a259fce6 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -1,18 +1,62 @@ import asyncio +from typing import ( + Awaitable, + Callable, + Dict, + List, + Sequence, +) -from libp2p.protocol_muxer.multiselect_client import MultiselectClient +from multiaddr import Multiaddr + +from libp2p.peer.id import ( + ID, + id_b58_decode, +) +from libp2p.peer.peerstore import PeerStore from libp2p.protocol_muxer.multiselect import Multiselect -from libp2p.peer.id import id_b58_decode +from libp2p.protocol_muxer.multiselect_client import MultiselectClient +from libp2p.routing.interfaces import IPeerRouting +from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn +from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream +from libp2p.transport.upgrader import TransportUpgrader +from libp2p.transport.transport_interface import ITransport +from libp2p.transport.listener_interface import IListener + from .network_interface import INetwork from .notifee_interface import INotifee -from .stream.net_stream import NetStream from .connection.raw_connection import RawConnection +from .stream.net_stream import NetStream +from .stream.net_stream_interface import INetStream + + +StreamHandlerFn = Callable[[INetStream], Awaitable[None]] + class Swarm(INetwork): # pylint: disable=too-many-instance-attributes,cell-var-from-loop,too-many-arguments - def __init__(self, peer_id, peerstore, upgrader, transport, router): + self_id: ID + peerstore: PeerStore + upgrader: TransportUpgrader + transport: ITransport + router: IPeerRouting + connections: Dict[ID, IMuxedConn] + listeners: Dict[str, IListener] + stream_handlers: Dict[INetStream, Callable[[INetStream], None]] + + multiselect: Multiselect + multiselect_client: MultiselectClient + + notifees: List[INotifee] + + def __init__(self, + peer_id: ID, + peerstore: PeerStore, + upgrader: TransportUpgrader, + transport: ITransport, + router: IPeerRouting): self.self_id = peer_id self.peerstore = peerstore self.upgrader = upgrader @@ -32,10 +76,10 @@ class Swarm(INetwork): # Create generic protocol handler self.generic_protocol_handler = create_generic_protocol_handler(self) - def get_peer_id(self): + def get_peer_id(self) -> ID: return self.self_id - def set_stream_handler(self, protocol_id, stream_handler): + def set_stream_handler(self, protocol_id: str, stream_handler: StreamHandlerFn) -> bool: """ :param protocol_id: protocol id used on stream :param stream_handler: a stream handler instance @@ -44,7 +88,7 @@ class Swarm(INetwork): self.multiselect.add_handler(protocol_id, stream_handler) return True - async def dial_peer(self, peer_id): + async def dial_peer(self, peer_id: ID) -> IMuxedConn: """ dial_peer try to create a connection to peer_id :param peer_id: peer if we want to dial @@ -87,7 +131,7 @@ class Swarm(INetwork): return muxed_conn - async def new_stream(self, peer_id, protocol_ids): + async def new_stream(self, peer_id: ID, protocol_ids: Sequence[str]) -> NetStream: """ :param peer_id: peer_id of destination :param protocol_id: protocol id @@ -109,7 +153,10 @@ class Swarm(INetwork): muxed_stream = await muxed_conn.open_stream(protocol_ids[0], multiaddr) # Perform protocol muxing to determine protocol to use - selected_protocol = await self.multiselect_client.select_one_of(protocol_ids, muxed_stream) + selected_protocol = await self.multiselect_client.select_one_of( + list(protocol_ids), + muxed_stream, + ) # Create a net stream with the selected protocol net_stream = NetStream(muxed_stream) @@ -121,7 +168,7 @@ class Swarm(INetwork): return net_stream - async def listen(self, *args): + async def listen(self, *args: Sequence[Multiaddr]) -> bool: """ :param *args: one or many multiaddrs to start listening on :return: true if at least one success @@ -139,7 +186,8 @@ class Swarm(INetwork): if str(multiaddr) in self.listeners: return True - async def conn_handler(reader, writer): + async def conn_handler(reader: asyncio.StreamReader, + writer: asyncio.StreamWriter) -> None: # Read in first message (should be peer_id of initiator) and ack peer_id = id_b58_decode((await reader.read(1024)).decode()) @@ -182,7 +230,7 @@ class Swarm(INetwork): # No multiaddr succeeded return False - def notify(self, notifee): + def notify(self, notifee: INotifee) -> bool: """ :param notifee: object implementing Notifee interface :return: true if notifee registered successfully, false otherwise @@ -192,7 +240,7 @@ class Swarm(INetwork): return True return False - def add_router(self, router): + def add_router(self, router: IPeerRouting) -> None: self.router = router # TODO: `tear_down` @@ -204,7 +252,10 @@ class Swarm(INetwork): # TODO: `disconnect`? -def create_generic_protocol_handler(swarm): +GenericProtocolHandlerFn = Callable[[IMuxedStream], Awaitable[None]] + + +def create_generic_protocol_handler(swarm: Swarm) -> GenericProtocolHandlerFn: """ Create a generic protocol handler from the given swarm. We use swarm to extract the multiselect module so that generic_protocol_handler @@ -213,7 +264,7 @@ def create_generic_protocol_handler(swarm): """ multiselect = swarm.multiselect - async def generic_protocol_handler(muxed_stream): + async def generic_protocol_handler(muxed_stream: IMuxedStream) -> None: # Perform protocol muxing to determine protocol to use protocol, handler = await multiselect.negotiate(muxed_stream) @@ -229,5 +280,6 @@ def create_generic_protocol_handler(swarm): return generic_protocol_handler + class SwarmException(Exception): pass diff --git a/libp2p/peer/addrbook_interface.py b/libp2p/peer/addrbook_interface.py index 914d937a..5ac34f02 100644 --- a/libp2p/peer/addrbook_interface.py +++ b/libp2p/peer/addrbook_interface.py @@ -1,13 +1,22 @@ from abc import ABC, abstractmethod +from typing import ( + List, + Sequence, +) + + +from multiaddr import Multiaddr + +from .id import ID class IAddrBook(ABC): - def __init__(self): + def __init__(self) -> None: pass @abstractmethod - def add_addr(self, peer_id, addr, ttl): + def add_addr(self, peer_id: ID, addr: Multiaddr, ttl: int) -> None: """ Calls add_addrs(peer_id, [addr], ttl) :param peer_id: the peer to add address for @@ -16,7 +25,7 @@ class IAddrBook(ABC): """ @abstractmethod - def add_addrs(self, peer_id, addrs, ttl): + def add_addrs(self, peer_id: ID, addrs: Sequence[Multiaddr], ttl: int) -> None: """ Adds addresses for a given peer all with the same time-to-live. If one of the addresses already exists for the peer and has a longer TTL, no operation should take place. @@ -27,21 +36,21 @@ class IAddrBook(ABC): """ @abstractmethod - def addrs(self, peer_id): + def addrs(self, peer_id: ID) -> List[Multiaddr]: """ :param peer_id: peer to get addresses of :return: all known (and valid) addresses for the given peer """ @abstractmethod - def clear_addrs(self, peer_id): + def clear_addrs(self, peer_id: ID) -> None: """ Removes all previously stored addresses :param peer_id: peer to remove addresses of """ @abstractmethod - def peers_with_addrs(self): + def peers_with_addrs(self) -> List[ID]: """ :return: all of the peer IDs stored with addresses """ diff --git a/libp2p/peer/id.py b/libp2p/peer/id.py index 77e2f87e..8561e0ec 100644 --- a/libp2p/peer/id.py +++ b/libp2p/peer/id.py @@ -1,7 +1,14 @@ import hashlib +from typing import ( + Union, +) + import base58 + import multihash +from Crypto.PublicKey.RSA import RsaKey + # MaxInlineKeyLength is the maximum length a key can be for it to be inlined in # the peer ID. # * When `len(pubKey.Bytes()) <= MaxInlineKeyLength`, the peer ID is the @@ -13,67 +20,71 @@ MAX_INLINE_KEY_LENGTH = 42 class ID: - def __init__(self, id_str): + _id_str: bytes + + def __init__(self, id_str: bytes) -> None: self._id_str = id_str def to_bytes(self) -> bytes: return self._id_str - def get_raw_id(self): + def get_raw_id(self) -> bytes: return self._id_str - def pretty(self): + def pretty(self) -> str: return base58.b58encode(self._id_str).decode() - def get_xor_id(self): + def get_xor_id(self) -> int: return int(digest(self.get_raw_id()).hex(), 16) - def __str__(self): + def __str__(self) -> str: pid = self.pretty() return pid __repr__ = __str__ - def __eq__(self, other): + def __eq__(self, other: object) -> bool: #pylint: disable=protected-access + if not isinstance(other, ID): + return NotImplemented return self._id_str == other._id_str - def __hash__(self): + def __hash__(self) -> int: return hash(self._id_str) -def id_b58_encode(peer_id): +def id_b58_encode(peer_id: ID) -> str: """ return a b58-encoded string """ #pylint: disable=protected-access - return base58.b58encode(peer_id._id_str).decode() + return base58.b58encode(peer_id.get_raw_id()).decode() -def id_b58_decode(peer_id_str): +def id_b58_decode(peer_id_str: str) -> ID: """ return a base58-decoded peer ID """ return ID(base58.b58decode(peer_id_str)) -def id_from_public_key(key): +def id_from_public_key(key: RsaKey) -> ID: # export into binary format key_bin = key.exportKey("DER") - algo = multihash.Func.sha2_256 + algo: int = multihash.Func.sha2_256 # TODO: seems identity is not yet supported in pymultihash # if len(b) <= MAX_INLINE_KEY_LENGTH: # algo multihash.func.identity - mh_digest = multihash.digest(key_bin, algo) + mh_digest: multihash.Multihash = multihash.digest(key_bin, algo) return ID(mh_digest.encode()) -def id_from_private_key(key): +def id_from_private_key(key: RsaKey) -> ID: return id_from_public_key(key.publickey()) -def digest(string): - if not isinstance(string, bytes): - string = str(string).encode('utf8') - return hashlib.sha1(string).digest() +def digest(data: Union[str, bytes]) -> bytes: + if isinstance(data, str): + data = data.encode('utf8') + return hashlib.sha1(data).digest() diff --git a/libp2p/peer/peerdata.py b/libp2p/peer/peerdata.py index c505badd..68749c84 100644 --- a/libp2p/peer/peerdata.py +++ b/libp2p/peer/peerdata.py @@ -1,35 +1,48 @@ +from typing import ( + Any, + Dict, + List, + Sequence, +) + +from multiaddr import Multiaddr + from .peerdata_interface import IPeerData class PeerData(IPeerData): - def __init__(self): + metadata: Dict[Any, Any] + protocols: List[str] + addrs: List[Multiaddr] + + def __init__(self) -> None: self.metadata = {} self.protocols = [] self.addrs = [] - def get_protocols(self): + def get_protocols(self) -> List[str]: return self.protocols - def add_protocols(self, protocols): - self.protocols.extend(protocols) + def add_protocols(self, protocols: Sequence[str]) -> None: + self.protocols.extend(list(protocols)) - def set_protocols(self, protocols): - self.protocols = protocols + def set_protocols(self, protocols: Sequence[str]) -> None: + self.protocols = list(protocols) - def add_addrs(self, addrs): + def add_addrs(self, addrs: Sequence[Multiaddr]) -> None: self.addrs.extend(addrs) - def get_addrs(self): + def get_addrs(self) -> List[Multiaddr]: return self.addrs - def clear_addrs(self): + def clear_addrs(self) -> None: self.addrs = [] - def put_metadata(self, key, val): + def put_metadata(self, key: str, val: Any) -> None: self.metadata[key] = val - def get_metadata(self, key): + def get_metadata(self, key: str) -> Any: if key in self.metadata: return self.metadata[key] raise PeerDataError("key not found") diff --git a/libp2p/peer/peerdata_interface.py b/libp2p/peer/peerdata_interface.py index cf4a2920..cefd56dc 100644 --- a/libp2p/peer/peerdata_interface.py +++ b/libp2p/peer/peerdata_interface.py @@ -1,46 +1,55 @@ from abc import ABC, abstractmethod +from typing import ( + Any, + List, + Sequence, +) + +from multiaddr import Multiaddr + +from .peermetadata_interface import IPeerMetadata class IPeerData(ABC): @abstractmethod - def get_protocols(self): + def get_protocols(self) -> List[str]: """ :return: all protocols associated with given peer """ @abstractmethod - def add_protocols(self, protocols): + def add_protocols(self, protocols: Sequence[str]) -> None: """ :param protocols: protocols to add """ @abstractmethod - def set_protocols(self, protocols): + def set_protocols(self, protocols: Sequence[str]) -> None: """ :param protocols: protocols to add """ @abstractmethod - def add_addrs(self, addrs): + def add_addrs(self, addrs: Sequence[Multiaddr]) -> None: """ :param addrs: multiaddresses to add """ @abstractmethod - def get_addrs(self): + def get_addrs(self) -> List[Multiaddr]: """ :return: all multiaddresses """ @abstractmethod - def clear_addrs(self): + def clear_addrs(self) -> None: """ Clear all addresses """ @abstractmethod - def put_metadata(self, key, val): + def put_metadata(self, key: str, val: Any) -> None: """ :param key: key in KV pair :param val: val to associate with key @@ -48,7 +57,7 @@ class IPeerData(ABC): """ @abstractmethod - def get_metadata(self, key): + def get_metadata(self, key: str) -> IPeerMetadata: """ :param key: key in KV pair :return: val for key diff --git a/libp2p/peer/peerinfo.py b/libp2p/peer/peerinfo.py index e1730669..6ccd0236 100644 --- a/libp2p/peer/peerinfo.py +++ b/libp2p/peer/peerinfo.py @@ -1,12 +1,23 @@ +from typing import ( + List, +) + import multiaddr -from .id import id_b58_decode +from .id import ( + ID, + id_b58_decode, +) from .peerdata import PeerData class PeerInfo: # pylint: disable=too-few-public-methods - def __init__(self, peer_id, peer_data=None): + + peer_id: ID + addrs: List[multiaddr.Multiaddr] + + def __init__(self, peer_id: ID, peer_data: PeerData = None) -> None: self.peer_id = peer_id self.addrs = peer_data.get_addrs() if peer_data else None @@ -30,16 +41,16 @@ def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo: ) # make sure the /p2p value parses as a peer.ID - peer_id_str = p2p_part.value_for_protocol(multiaddr.protocols.P_P2P) - peer_id = id_b58_decode(peer_id_str) + peer_id_str: str = p2p_part.value_for_protocol(multiaddr.protocols.P_P2P) + peer_id: ID = id_b58_decode(peer_id_str) # we might have received just an / p2p part, which means there's no addr. if len(parts) > 1: addr = multiaddr.Multiaddr.join(*parts[:-1]) peer_data = PeerData() - peer_data.addrs = [addr] - peer_data.protocols = [p.code for p in addr.protocols()] + peer_data.add_addrs([addr]) + peer_data.set_protocols([p.code for p in addr.protocols()]) return PeerInfo(peer_id, peer_data) diff --git a/libp2p/peer/peermetadata_interface.py b/libp2p/peer/peermetadata_interface.py index 1badabd3..3d60259a 100644 --- a/libp2p/peer/peermetadata_interface.py +++ b/libp2p/peer/peermetadata_interface.py @@ -1,13 +1,20 @@ from abc import ABC, abstractmethod +from typing import ( + Any, +) + +from .id import ( + ID, +) class IPeerMetadata(ABC): - def __init__(self): + def __init__(self) -> None: pass @abstractmethod - def get(self, peer_id, key): + def get(self, peer_id: ID, key: str) -> Any: """ :param peer_id: peer ID to lookup key for :param key: key to look up @@ -16,7 +23,7 @@ class IPeerMetadata(ABC): """ @abstractmethod - def put(self, peer_id, key, val): + def put(self, peer_id: ID, key: str, val: Any) -> None: """ :param peer_id: peer ID to lookup key for :param key: key to associate with peer diff --git a/libp2p/peer/peerstore.py b/libp2p/peer/peerstore.py index 5a6a77b3..2cd1574f 100644 --- a/libp2p/peer/peerstore.py +++ b/libp2p/peer/peerstore.py @@ -1,15 +1,28 @@ -from .peerstore_interface import IPeerStore +from typing import ( + Any, + Dict, + List, + Optional, + Sequence, +) + +from multiaddr import Multiaddr + +from .id import ID from .peerdata import PeerData from .peerinfo import PeerInfo +from .peerstore_interface import IPeerStore class PeerStore(IPeerStore): - def __init__(self): + peer_map: Dict[ID, PeerData] + + def __init__(self) -> None: IPeerStore.__init__(self) self.peer_map = {} - def __create_or_get_peer(self, peer_id): + def __create_or_get_peer(self, peer_id: ID) -> PeerData: """ Returns the peer data for peer_id or creates a new peer data (and stores it in peer_map) if peer @@ -23,65 +36,65 @@ class PeerStore(IPeerStore): self.peer_map[peer_id] = data return self.peer_map[peer_id] - def peer_info(self, peer_id): + def peer_info(self, peer_id: ID) -> Optional[PeerInfo]: if peer_id in self.peer_map: - peer = self.peer_map[peer_id] - return PeerInfo(peer_id, peer) + peer_data = self.peer_map[peer_id] + return PeerInfo(peer_id, peer_data) return None - def get_protocols(self, peer_id): + def get_protocols(self, peer_id: ID) -> List[str]: if peer_id in self.peer_map: return self.peer_map[peer_id].get_protocols() raise PeerStoreError("peer ID not found") - def add_protocols(self, peer_id, protocols): + def add_protocols(self, peer_id: ID, protocols: Sequence[str]) -> None: peer = self.__create_or_get_peer(peer_id) - peer.add_protocols(protocols) + peer.add_protocols(list(protocols)) - def set_protocols(self, peer_id, protocols): + def set_protocols(self, peer_id: ID, protocols: Sequence[str]) -> None: peer = self.__create_or_get_peer(peer_id) - peer.set_protocols(protocols) + peer.set_protocols(list(protocols)) - def peers(self): + def peer_ids(self) -> List[ID]: return list(self.peer_map.keys()) - def get(self, peer_id, key): + def get(self, peer_id: ID, key: str) -> Any: if peer_id in self.peer_map: val = self.peer_map[peer_id].get_metadata(key) return val raise PeerStoreError("peer ID not found") - def put(self, peer_id, key, val): + def put(self, peer_id: ID, key: str, val: Any) -> None: # <> # This can output an error, not sure what the possible errors are peer = self.__create_or_get_peer(peer_id) peer.put_metadata(key, val) - def add_addr(self, peer_id, addr, ttl): + def add_addr(self, peer_id: ID, addr: Multiaddr, ttl: int) -> None: self.add_addrs(peer_id, [addr], ttl) - def add_addrs(self, peer_id, addrs, ttl): + def add_addrs(self, peer_id: ID, addrs: Sequence[Multiaddr], ttl: int) -> None: # Ignore ttl for now peer = self.__create_or_get_peer(peer_id) - peer.add_addrs(addrs) + peer.add_addrs(list(addrs)) - def addrs(self, peer_id): + def addrs(self, peer_id: ID) -> List[Multiaddr]: if peer_id in self.peer_map: return self.peer_map[peer_id].get_addrs() raise PeerStoreError("peer ID not found") - def clear_addrs(self, peer_id): + def clear_addrs(self, peer_id: ID) -> None: # Only clear addresses if the peer is in peer map if peer_id in self.peer_map: self.peer_map[peer_id].clear_addrs() - def peers_with_addrs(self): + def peers_with_addrs(self) -> List[ID]: # Add all peers with addrs at least 1 to output - output = [] + output: List[ID] = [] - for key in self.peer_map: - if len(self.peer_map[key].get_addrs()) >= 1: - output.append(key) + for peer_id in self.peer_map: + if len(self.peer_map[peer_id].get_addrs()) >= 1: + output.append(peer_id) return output diff --git a/libp2p/peer/peerstore_interface.py b/libp2p/peer/peerstore_interface.py index b368d8f6..db6dbde5 100644 --- a/libp2p/peer/peerstore_interface.py +++ b/libp2p/peer/peerstore_interface.py @@ -1,24 +1,31 @@ from abc import abstractmethod +from typing import ( + List, + Sequence, +) + from .addrbook_interface import IAddrBook +from .id import ID +from .peerinfo import PeerInfo from .peermetadata_interface import IPeerMetadata class IPeerStore(IAddrBook, IPeerMetadata): - def __init__(self): + def __init__(self) -> None: IPeerMetadata.__init__(self) IAddrBook.__init__(self) @abstractmethod - def peer_info(self, peer_id): + def peer_info(self, peer_id: ID) -> PeerInfo: """ :param peer_id: peer ID to get info for :return: peer info object """ @abstractmethod - def get_protocols(self, peer_id): + def get_protocols(self, peer_id: ID) -> List[str]: """ :param peer_id: peer ID to get protocols for :return: protocols (as strings) @@ -26,7 +33,7 @@ class IPeerStore(IAddrBook, IPeerMetadata): """ @abstractmethod - def add_protocols(self, peer_id, protocols): + def add_protocols(self, peer_id: ID, protocols: Sequence[str]) -> None: """ :param peer_id: peer ID to add protocols for :param protocols: protocols to add @@ -34,7 +41,7 @@ class IPeerStore(IAddrBook, IPeerMetadata): """ @abstractmethod - def set_protocols(self, peer_id, protocols): + def set_protocols(self, peer_id: ID, protocols: Sequence[str]) -> None: """ :param peer_id: peer ID to set protocols for :param protocols: protocols to set @@ -42,7 +49,7 @@ class IPeerStore(IAddrBook, IPeerMetadata): """ @abstractmethod - def peers(self): + def peer_ids(self) -> List[ID]: """ :return: all of the peer IDs stored in peer store """ diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 04040213..e51c8310 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -1,5 +1,7 @@ from typing import ( Iterable, + List, + Sequence, ) from libp2p.peer.id import ( @@ -8,23 +10,28 @@ from libp2p.peer.id import ( ) from .pb import rpc_pb2 +from .pubsub import Pubsub from .pubsub_router_interface import IPubsubRouter class FloodSub(IPubsubRouter): # pylint: disable=no-member - def __init__(self, protocols): - self.protocols = protocols + protocols: List[str] + + pubsub: Pubsub + + def __init__(self, protocols: Sequence[str]) -> None: + self.protocols = list(protocols) self.pubsub = None - def get_protocols(self): + def get_protocols(self) -> List[str]: """ :return: the list of protocols supported by the router """ return self.protocols - def attach(self, pubsub): + def attach(self, pubsub: Pubsub) -> None: """ Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance. @@ -32,19 +39,19 @@ class FloodSub(IPubsubRouter): """ self.pubsub = pubsub - def add_peer(self, peer_id, protocol_id): + def add_peer(self, peer_id: ID, protocol_id: str) -> None: """ Notifies the router that a new peer has been connected :param peer_id: id of peer to add """ - def remove_peer(self, peer_id): + def remove_peer(self, peer_id: ID) -> None: """ Notifies the router that a peer has been disconnected :param peer_id: id of peer to remove """ - async def handle_rpc(self, rpc, sender_peer_id): + async def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: ID) -> None: """ Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed @@ -80,7 +87,7 @@ class FloodSub(IPubsubRouter): # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 await stream.write(rpc_msg.SerializeToString()) - async def join(self, topic): + async def join(self, topic: str) -> None: """ Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the @@ -88,7 +95,7 @@ class FloodSub(IPubsubRouter): :param topic: topic to join """ - async def leave(self, topic): + async def leave(self, topic: str) -> None: """ Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement. diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 0af5e5ff..f0d903b9 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -1,14 +1,15 @@ +from ast import literal_eval import asyncio import random from typing import ( + Any, + Dict, Iterable, List, - MutableSet, + Set, Sequence, ) -from ast import literal_eval - from libp2p.peer.id import ( ID, id_b58_decode, @@ -16,6 +17,7 @@ from libp2p.peer.id import ( from .mcache import MessageCache from .pb import rpc_pb2 +from .pubsub import Pubsub from .pubsub_router_interface import IPubsubRouter @@ -24,10 +26,45 @@ class GossipSub(IPubsubRouter): # pylint: disable=too-many-instance-attributes # pylint: disable=too-many-public-methods - def __init__(self, protocols, degree, degree_low, degree_high, time_to_live, gossip_window=3, - gossip_history=5, heartbeat_interval=120): + protocols: List[str] + pubsub: Pubsub + + degree: int + degree_high: int + degree_low: int + + time_to_live: int + + # FIXME: Should be changed to `Dict[str, List[ID]]` + mesh: Dict[str, List[str]] + # FIXME: Should be changed to `Dict[str, List[ID]]` + fanout: Dict[str, List[str]] + + # FIXME: Should be changed to `Dict[ID, str]` + peers_to_protocol: Dict[str, str] + + time_since_last_publish: Dict[str, int] + + #FIXME: Should be changed to List[ID] + peers_gossipsub: List[str] + #FIXME: Should be changed to List[ID] + peers_floodsub: List[str] + + mcache: MessageCache + + heartbeat_interval: int + + def __init__(self, + protocols: Sequence[str], + degree: int, + degree_low: int, + degree_high: int, + time_to_live: int, + gossip_window: int = 3, + gossip_history: int = 5, + heartbeat_interval: int = 120) -> None: # pylint: disable=too-many-arguments - self.protocols = protocols + self.protocols = list(protocols) self.pubsub = None # Store target degree, upper degree bound, and lower degree bound @@ -42,6 +79,9 @@ class GossipSub(IPubsubRouter): self.mesh = {} self.fanout = {} + # Create peer --> protocol mapping + self.peers_to_protocol = {} + # Create topic --> time since last publish map self.time_since_last_publish = {} @@ -56,13 +96,13 @@ class GossipSub(IPubsubRouter): # Interface functions - def get_protocols(self): + def get_protocols(self) -> List[str]: """ :return: the list of protocols supported by the router """ return self.protocols - def attach(self, pubsub): + def attach(self, pubsub: Pubsub) -> None: """ Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance. @@ -74,50 +114,60 @@ class GossipSub(IPubsubRouter): # TODO: Start after delay asyncio.ensure_future(self.heartbeat()) - def add_peer(self, peer_id, protocol_id): + def add_peer(self, peer_id: ID, protocol_id: str) -> None: """ Notifies the router that a new peer has been connected :param peer_id: id of peer to add + :param protocol_id: router protocol the peer speaks, e.g., floodsub, gossipsub """ # Add peer to the correct peer list peer_type = GossipSub.get_peer_type(protocol_id) peer_id_str = str(peer_id) + + self.peers_to_protocol[peer_id_str] = protocol_id + if peer_type == "gossip": self.peers_gossipsub.append(peer_id_str) elif peer_type == "flood": self.peers_floodsub.append(peer_id_str) - def remove_peer(self, peer_id): + def remove_peer(self, peer_id: ID) -> None: """ Notifies the router that a peer has been disconnected :param peer_id: id of peer to remove """ peer_id_str = str(peer_id) - self.peers_to_protocol.remove(peer_id_str) + del self.peers_to_protocol[peer_id_str] - async def handle_rpc(self, rpc, sender_peer_id): + if peer_id_str in self.peers_gossipsub: + self.peers_gossipsub.remove(peer_id_str) + if peer_id_str in self.peers_gossipsub: + self.peers_floodsub.remove(peer_id_str) + + async def handle_rpc(self, rpc: rpc_pb2.Message, sender_peer_id: ID) -> None: """ Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed - :param rpc: rpc message + :param rpc: RPC message + :param sender_peer_id: id of the peer who sent the message """ control_message = rpc.control - sender_peer_id = str(sender_peer_id) + sender_peer_id_str = str(sender_peer_id) - # Relay each rpc control to the appropriate handler + # Relay each rpc control message to the appropriate handler if control_message.ihave: for ihave in control_message.ihave: - await self.handle_ihave(ihave, sender_peer_id) + await self.handle_ihave(ihave, sender_peer_id_str) if control_message.iwant: for iwant in control_message.iwant: - await self.handle_iwant(iwant, sender_peer_id) + await self.handle_iwant(iwant, sender_peer_id_str) if control_message.graft: for graft in control_message.graft: - await self.handle_graft(graft, sender_peer_id) + await self.handle_graft(graft, sender_peer_id_str) if control_message.prune: for prune in control_message.prune: - await self.handle_prune(prune, sender_peer_id) + await self.handle_prune(prune, sender_peer_id_str) async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: # pylint: disable=too-many-locals @@ -152,7 +202,8 @@ class GossipSub(IPubsubRouter): :param origin: peer id of the peer the message originate from. :return: a generator of the peer ids who we send data to. """ - send_to: MutableSet[ID] = set() + # pylint: disable=len-as-condition + send_to: Set[ID] = set() for topic in topic_ids: if topic not in self.pubsub.peer_topics: continue @@ -177,7 +228,6 @@ class GossipSub(IPubsubRouter): # I assume there could be short periods between heartbeats where topic may not # be but we should check that this path gets hit appropriately - # pylint: disable=len-as-condition if (topic not in self.fanout) or (len(self.fanout[topic]) == 0): # If no peers in fanout, choose some peers from gossipsub peers in topic. self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus( @@ -191,7 +241,7 @@ class GossipSub(IPubsubRouter): # Excludes `msg_forwarder` and `origin` yield from send_to.difference([msg_forwarder, origin]) - async def join(self, topic): + async def join(self, topic: str) -> None: # Note: the comments here are the near-exact algorithm description from the spec """ Join notifies the router that we want to receive and @@ -204,8 +254,9 @@ class GossipSub(IPubsubRouter): # Create mesh[topic] if it does not yet exist self.mesh[topic] = [] - topic_in_fanout = topic in self.fanout - fanout_peers = self.fanout[topic] if topic_in_fanout else [] + topic_in_fanout: bool = topic in self.fanout + # FIXME: Should be changed to `List[ID]` + fanout_peers: List[str] = self.fanout[topic] if topic_in_fanout else [] fanout_size = len(fanout_peers) if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree): # There are less than D peers (let this number be x) @@ -229,7 +280,7 @@ class GossipSub(IPubsubRouter): if topic_in_fanout: del self.fanout[topic] - async def leave(self, topic): + async def leave(self, topic: str) -> None: # Note: the comments here are the near-exact algorithm description from the spec """ Leave notifies the router that we are no longer interested in a topic. @@ -247,7 +298,7 @@ class GossipSub(IPubsubRouter): # Interface Helper Functions @staticmethod - def get_peer_type(protocol_id): + def get_peer_type(protocol_id: str) -> str: # TODO: Do this in a better, more efficient way if "gossipsub" in protocol_id: return "gossip" @@ -255,7 +306,13 @@ class GossipSub(IPubsubRouter): return "flood" return "unknown" - async def deliver_messages_to_peers(self, peers, msg_sender, origin_id, serialized_packet): + # FIXME: type of `peers` should be changed to `List[ID]` + # FIXME: type of `msg_sender` and `origin_id` should be changed to `ID` + async def deliver_messages_to_peers(self, + peers: List[str], + msg_sender: str, + origin_id: str, + serialized_packet: bytes) -> None: for peer_id_in_topic in peers: # Forward to all peers that are not the # message sender and are not the message origin @@ -267,7 +324,7 @@ class GossipSub(IPubsubRouter): await stream.write(serialized_packet) # Heartbeat - async def heartbeat(self): + async def heartbeat(self) -> None: """ Call individual heartbeats. Note: the heartbeats are called with awaits because each heartbeat depends on the @@ -281,7 +338,7 @@ class GossipSub(IPubsubRouter): await asyncio.sleep(self.heartbeat_interval) - async def mesh_heartbeat(self): + async def mesh_heartbeat(self) -> None: # Note: the comments here are the exact pseudocode from the spec for topic in self.mesh: # Skip if no peers have subscribed to the topic @@ -297,7 +354,8 @@ class GossipSub(IPubsubRouter): self.mesh[topic], ) - fanout_peers_not_in_mesh = [ + # FIXME: Should be changed to `List[ID]` + fanout_peers_not_in_mesh: List[str] = [ peer for peer in selected_peers if peer not in self.mesh[topic] @@ -311,8 +369,12 @@ class GossipSub(IPubsubRouter): if num_mesh_peers_in_topic > self.degree_high: # Select |mesh[topic]| - D peers from mesh[topic] - selected_peers = GossipSub.select_from_minus(num_mesh_peers_in_topic - self.degree, - self.mesh[topic], []) + # FIXME: Should be changed to `List[ID]` + selected_peers = GossipSub.select_from_minus( + num_mesh_peers_in_topic - self.degree, + self.mesh[topic], + [], + ) for peer in selected_peers: # Remove peer from mesh[topic] self.mesh[topic].remove(peer) @@ -320,7 +382,7 @@ class GossipSub(IPubsubRouter): # Emit PRUNE(topic) control message to peer await self.emit_prune(topic, peer) - async def fanout_heartbeat(self): + async def fanout_heartbeat(self) -> None: # Note: the comments here are the exact pseudocode from the spec for topic in self.fanout: # If time since last published > ttl @@ -328,7 +390,7 @@ class GossipSub(IPubsubRouter): if self.time_since_last_publish[topic] > self.time_to_live: # Remove topic from fanout del self.fanout[topic] - self.time_since_last_publish.remove(topic) + del self.time_since_last_publish[topic] else: num_fanout_peers_in_topic = len(self.fanout[topic]) @@ -343,7 +405,7 @@ class GossipSub(IPubsubRouter): # Add the peers to fanout[topic] self.fanout[topic].extend(selected_peers) - async def gossip_heartbeat(self): + async def gossip_heartbeat(self) -> None: # pylint: disable=too-many-nested-blocks for topic in self.mesh: msg_ids = self.mcache.window(topic) @@ -362,8 +424,8 @@ class GossipSub(IPubsubRouter): # TODO: this line is a monster, can hopefully be simplified if (topic not in self.mesh or (peer not in self.mesh[topic]))\ and (topic not in self.fanout or (peer not in self.fanout[topic])): - msg_ids = [str(msg) for msg in msg_ids] - await self.emit_ihave(topic, msg_ids, peer) + msg_id_strs = [str(msg_id) for msg_id in msg_ids] + await self.emit_ihave(topic, msg_id_strs, peer) # TODO: Refactor and Dedup. This section is the roughly the same as the above. # Do the same for fanout, for all topics not already hit in mesh @@ -383,13 +445,15 @@ class GossipSub(IPubsubRouter): for peer in peers_to_emit_ihave_to: if peer not in self.mesh[topic] and peer not in self.fanout[topic]: - msg_ids = [str(msg) for msg in msg_ids] - await self.emit_ihave(topic, msg_ids, peer) + msg_id_strs = [str(msg) for msg in msg_ids] + await self.emit_ihave(topic, msg_id_strs, peer) self.mcache.shift() @staticmethod - def select_from_minus(num_to_select, pool, minus): + def select_from_minus(num_to_select: int, + pool: Sequence[Any], + minus: Sequence[Any]) -> List[Any]: """ Select at most num_to_select subset of elements from the set (pool - minus) randomly. :param num_to_select: number of elements to randomly select @@ -400,10 +464,10 @@ class GossipSub(IPubsubRouter): # Create selection pool, which is selection_pool = pool - minus if minus: # Create a new selection pool by removing elements of minus - selection_pool = [x for x in pool if x not in minus] + selection_pool: List[Any] = [x for x in pool if x not in minus] else: # Don't create a new selection_pool if we are not subbing anything - selection_pool = pool + selection_pool = list(pool) # If num_to_select > size(selection_pool), then return selection_pool (which has the most # possible elements s.t. the number of elements is less than num_to_select) @@ -411,15 +475,17 @@ class GossipSub(IPubsubRouter): return selection_pool # Random selection - selection = random.sample(selection_pool, num_to_select) + selection: List[Any] = random.sample(selection_pool, num_to_select) return selection + # FIXME: type of `minus` should be changed to type `Sequence[ID]` + # FIXME: return type should be changed to type `List[ID]` def _get_in_topic_gossipsub_peers_from_minus( self, topic: str, num_to_select: int, - minus: Sequence[ID]) -> List[ID]: + minus: Sequence[str]) -> List[str]: gossipsub_peers_in_topic = [ peer_str for peer_str in self.pubsub.peer_topics[topic] @@ -433,7 +499,7 @@ class GossipSub(IPubsubRouter): # RPC handlers - async def handle_ihave(self, ihave_msg, sender_peer_id): + async def handle_ihave(self, ihave_msg: rpc_pb2.Message, sender_peer_id: str) -> None: """ Checks the seen set and requests unknown messages with an IWANT message. """ @@ -442,29 +508,37 @@ class GossipSub(IPubsubRouter): from_id_str = sender_peer_id # Get list of all seen (seqnos, from) from the (seqno, from) tuples in seen_messages cache - seen_seqnos_and_peers = [seqno_and_from - for seqno_and_from in self.pubsub.seen_messages.keys()] + seen_seqnos_and_peers = [ + seqno_and_from + for seqno_and_from in self.pubsub.seen_messages.keys() + ] # Add all unknown message ids (ids that appear in ihave_msg but not in seen_seqnos) to list # of messages we want to request - msg_ids_wanted = [msg_id for msg_id in ihave_msg.messageIDs - if literal_eval(msg_id) not in seen_seqnos_and_peers] + # FIXME: Update type of message ID + msg_ids_wanted: List[Any] = [ + msg_id + for msg_id in ihave_msg.messageIDs + if literal_eval(msg_id) not in seen_seqnos_and_peers + ] # Request messages with IWANT message if msg_ids_wanted: await self.emit_iwant(msg_ids_wanted, from_id_str) - async def handle_iwant(self, iwant_msg, sender_peer_id): + async def handle_iwant(self, iwant_msg: rpc_pb2.Message, sender_peer_id: str) -> None: """ Forwards all request messages that are present in mcache to the requesting peer. """ from_id_str = sender_peer_id - msg_ids = [literal_eval(msg) for msg in iwant_msg.messageIDs] - msgs_to_forward = [] + # FIXME: Update type of message ID + # FIXME: Find a better way to parse the msg ids + msg_ids: List[Any] = [literal_eval(msg) for msg in iwant_msg.messageIDs] + msgs_to_forward: List[rpc_pb2.Message] = [] for msg_id_iwant in msg_ids: # Check if the wanted message ID is present in mcache - msg = self.mcache.get(msg_id_iwant) + msg: rpc_pb2.Message = self.mcache.get(msg_id_iwant) # Cache hit if msg: @@ -476,12 +550,12 @@ class GossipSub(IPubsubRouter): # because then the message will forwarded to peers in the topics contained in the messages. # We should # 1) Package these messages into a single packet - packet = rpc_pb2.RPC() + packet: rpc_pb2.RPC = rpc_pb2.RPC() packet.publish.extend(msgs_to_forward) # 2) Serialize that packet - rpc_msg = packet.SerializeToString() + rpc_msg: bytes = packet.SerializeToString() # 3) Get the stream to this peer # TODO: Should we pass in from_id or from_id_str here? @@ -490,8 +564,8 @@ class GossipSub(IPubsubRouter): # 4) And write the packet to the stream await peer_stream.write(rpc_msg) - async def handle_graft(self, graft_msg, sender_peer_id): - topic = graft_msg.topicID + async def handle_graft(self, graft_msg: rpc_pb2.Message, sender_peer_id: str) -> None: + topic: str = graft_msg.topicID from_id_str = sender_peer_id @@ -503,8 +577,8 @@ class GossipSub(IPubsubRouter): # Respond with PRUNE if not subscribed to the topic await self.emit_prune(topic, sender_peer_id) - async def handle_prune(self, prune_msg, sender_peer_id): - topic = prune_msg.topicID + async def handle_prune(self, prune_msg: rpc_pb2.Message, sender_peer_id: str) -> None: + topic: str = prune_msg.topicID from_id_str = sender_peer_id @@ -514,65 +588,65 @@ class GossipSub(IPubsubRouter): # RPC emitters - async def emit_ihave(self, topic, msg_ids, to_peer): + async def emit_ihave(self, topic: str, msg_ids: Any, to_peer: str) -> None: """ Emit ihave message, sent to to_peer, for topic and msg_ids """ - ihave_msg = rpc_pb2.ControlIHave() + ihave_msg: rpc_pb2.ControlIHave = rpc_pb2.ControlIHave() ihave_msg.messageIDs.extend(msg_ids) ihave_msg.topicID = topic - control_msg = rpc_pb2.ControlMessage() + control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage() control_msg.ihave.extend([ihave_msg]) await self.emit_control_message(control_msg, to_peer) - async def emit_iwant(self, msg_ids, to_peer): + async def emit_iwant(self, msg_ids: Any, to_peer: str) -> None: """ Emit iwant message, sent to to_peer, for msg_ids """ - iwant_msg = rpc_pb2.ControlIWant() + iwant_msg: rpc_pb2.ControlIWant = rpc_pb2.ControlIWant() iwant_msg.messageIDs.extend(msg_ids) - control_msg = rpc_pb2.ControlMessage() + control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage() control_msg.iwant.extend([iwant_msg]) await self.emit_control_message(control_msg, to_peer) - async def emit_graft(self, topic, to_peer): + async def emit_graft(self, topic: str, to_peer: str) -> None: """ Emit graft message, sent to to_peer, for topic """ - graft_msg = rpc_pb2.ControlGraft() + graft_msg: rpc_pb2.ControlGraft = rpc_pb2.ControlGraft() graft_msg.topicID = topic - control_msg = rpc_pb2.ControlMessage() + control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage() control_msg.graft.extend([graft_msg]) await self.emit_control_message(control_msg, to_peer) - async def emit_prune(self, topic, to_peer): + async def emit_prune(self, topic: str, to_peer: str) -> None: """ Emit graft message, sent to to_peer, for topic """ - prune_msg = rpc_pb2.ControlPrune() + prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune() prune_msg.topicID = topic - control_msg = rpc_pb2.ControlMessage() + control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage() control_msg.prune.extend([prune_msg]) await self.emit_control_message(control_msg, to_peer) - async def emit_control_message(self, control_msg, to_peer): + async def emit_control_message(self, control_msg: rpc_pb2.ControlMessage, to_peer: str) -> None: # Add control message to packet - packet = rpc_pb2.RPC() + packet: rpc_pb2.RPC = rpc_pb2.RPC() packet.control.CopyFrom(control_msg) - rpc_msg = packet.SerializeToString() + rpc_msg: bytes = packet.SerializeToString() # Get stream for peer from pubsub peer_stream = self.pubsub.peers[to_peer] diff --git a/libp2p/pubsub/mcache.py b/libp2p/pubsub/mcache.py index 071945a5..ef140c59 100644 --- a/libp2p/pubsub/mcache.py +++ b/libp2p/pubsub/mcache.py @@ -1,20 +1,44 @@ +from typing import ( + Dict, + List, + Optional, + Sequence, + Tuple, +) + +from .pb import rpc_pb2 + + +class CacheEntry: + # pylint: disable=too-few-public-methods + + mid: Tuple[bytes, bytes] + topics: List[str] + + """ + A logical representation of an entry in the mcache's _history_. + """ + def __init__(self, mid: Tuple[bytes, bytes], topics: Sequence[str]) -> None: + """ + Constructor. + :param mid: (seqno, from_id) of the msg + :param topics: list of topics this message was sent on + """ + self.mid = mid + self.topics = list(topics) + + class MessageCache: - class CacheEntry: - # pylint: disable=too-few-public-methods - """ - A logical representation of an entry in the mcache's _history_. - """ - def __init__(self, mid, topics): - """ - Constructor. - :param mid: (seqno, from_id) of the msg - :param topics: list of topics this message was sent on - """ - self.mid = mid - self.topics = topics - def __init__(self, window_size, history_size): + window_size: int + history_size: int + + msgs: Dict[Tuple[bytes, bytes], rpc_pb2.Message] + + history: List[List[CacheEntry]] + + def __init__(self, window_size: int, history_size: int) -> None: """ Constructor. :param window_size: Size of the window desired. @@ -29,25 +53,22 @@ class MessageCache: # max length of history_size. each item is a list of CacheEntry. # messages lost upon shift(). - self.history = [] + self.history = [ + [] + for _ in range(history_size) + ] - for _ in range(history_size): - self.history.append([]) - - def put(self, msg): + def put(self, msg: rpc_pb2.Message) -> None: """ Put a message into the mcache. :param msg: The rpc message to put in. Should contain seqno and from_id """ - mid = (msg.seqno, msg.from_id) + mid: Tuple[bytes, bytes] = (msg.seqno, msg.from_id) self.msgs[mid] = msg - if not self.history[0]: - self.history[0] = [] + self.history[0].append(CacheEntry(mid, msg.topicIDs)) - self.history[0].append(self.CacheEntry(mid, msg.topicIDs)) - - def get(self, mid): + def get(self, mid: Tuple[bytes, bytes]) -> Optional[rpc_pb2.Message]: """ Get a message from the mcache. :param mid: (seqno, from_id) of the message to get. @@ -58,13 +79,13 @@ class MessageCache: return None - def window(self, topic): + def window(self, topic: str) -> List[Tuple[bytes, bytes]]: """ Get the window for this topic. :param topic: Topic whose message ids we desire. :return: List of mids in the current window. """ - mids = [] + mids: List[Tuple[bytes, bytes]] = [] for entries_list in self.history[: self.window_size]: for entry in entries_list: @@ -74,16 +95,16 @@ class MessageCache: return mids - def shift(self): + def shift(self) -> None: """ Shift the window over by 1 position, dropping the last element of the history. """ - last_entries = self.history[len(self.history) - 1] + last_entries: List[CacheEntry] = self.history[len(self.history) - 1] for entry in last_entries: del self.msgs[entry.mid] - i = len(self.history) - 2 + i: int = len(self.history) - 2 while i >= 0: self.history[i + 1] = self.history[i] diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 2af4e245..744c37c5 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -5,27 +5,22 @@ from typing import ( Any, Dict, List, - Sequence, Tuple, + TYPE_CHECKING, ) from lru import LRU -from libp2p.host.host_interface import ( - IHost, -) -from libp2p.peer.id import ( - ID, -) -from libp2p.network.stream.net_stream_interface import ( - INetStream, -) +from libp2p.host.host_interface import IHost +from libp2p.peer.id import ID + +from libp2p.network.stream.net_stream_interface import INetStream from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee -from .pubsub_router_interface import ( - IPubsubRouter, -) + +if TYPE_CHECKING: + from .pubsub_router_interface import IPubsubRouter def get_msg_id(msg: rpc_pb2.Message) -> Tuple[bytes, bytes]: @@ -34,30 +29,37 @@ def get_msg_id(msg: rpc_pb2.Message) -> Tuple[bytes, bytes]: class Pubsub: - # pylint: disable=too-many-instance-attributes, no-member + # pylint: disable=too-many-instance-attributes, no-member, unsubscriptable-object host: IHost my_id: ID - router: IPubsubRouter - peer_queue: asyncio.Queue - protocols: Sequence[str] - incoming_msgs_from_peers: asyncio.Queue() - outgoing_messages: asyncio.Queue() + + router: 'IPubsubRouter' + + peer_queue: 'asyncio.Queue[ID]' + + protocols: List[str] + + incoming_msgs_from_peers: 'asyncio.Queue[rpc_pb2.Message]' + outgoing_messages: 'asyncio.Queue[rpc_pb2.Message]' + seen_messages: LRU - my_topics: Dict[str, asyncio.Queue] + + my_topics: Dict[str, 'asyncio.Queue[rpc_pb2.Message]'] + # FIXME: Should be changed to `Dict[str, List[ID]]` peer_topics: Dict[str, List[str]] # FIXME: Should be changed to `Dict[ID, INetStream]` peers: Dict[str, INetStream] + # NOTE: Be sure it is increased atomically everytime. counter: int # uint64 - def __init__( - self, - host: IHost, - router: IPubsubRouter, - my_id: ID, - cache_size: int = None) -> None: + def __init__(self, + host: IHost, + router: 'IPubsubRouter', + my_id: ID, + cache_size: int = None) -> None: """ Construct a new Pubsub object, which is responsible for handling all Pubsub-related messages and relaying messages as appropriate to the @@ -99,9 +101,11 @@ class Pubsub: self.my_topics = {} # Map of topic to peers to keep track of what peers are subscribed to + # FIXME: Should be changed to `Dict[str, ID]` self.peer_topics = {} # Create peers map, which maps peer_id (as string) to stream (to a given peer) + # FIXME: Should be changed to `Dict[ID, INetStream]` self.peers = {} self.counter = time.time_ns() @@ -114,7 +118,7 @@ class Pubsub: Generate subscription message with all topics we are subscribed to only send hello packet if we have subscribed topics """ - packet = rpc_pb2.RPC() + packet: rpc_pb2.RPC = rpc_pb2.RPC() if self.my_topics: for topic_id in self.my_topics: packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( @@ -131,8 +135,8 @@ class Pubsub: peer_id = stream.mplex_conn.peer_id while True: - incoming = (await stream.read()) - rpc_incoming = rpc_pb2.RPC() + incoming: bytes = (await stream.read()) + rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC() rpc_incoming.ParseFromString(incoming) if rpc_incoming.publish: @@ -168,12 +172,12 @@ class Pubsub: """ # Add peer # Map peer to stream - peer_id = stream.mplex_conn.peer_id + peer_id: ID = stream.mplex_conn.peer_id self.peers[str(peer_id)] = stream self.router.add_peer(peer_id, stream.get_protocol()) # Send hello packet - hello = self.get_hello_packet() + hello: bytes = self.get_hello_packet() await stream.write(hello) # Pass stream off to stream reader @@ -188,12 +192,12 @@ class Pubsub: """ while True: - peer_id = await self.peer_queue.get() + peer_id: ID = await self.peer_queue.get() # Open a stream to peer on existing connection # (we know connection exists since that's the only way # an element gets added to peer_queue) - stream = await self.host.new_stream(peer_id, self.protocols) + stream: INetStream = await self.host.new_stream(peer_id, self.protocols) # Add Peer # Map peer to stream @@ -201,7 +205,7 @@ class Pubsub: self.router.add_peer(peer_id, stream.get_protocol()) # Send hello packet - hello = self.get_hello_packet() + hello: bytes = self.get_hello_packet() await stream.write(hello) # Pass stream off to stream reader @@ -219,24 +223,24 @@ class Pubsub: :param origin_id: id of the peer who subscribe to the message :param sub_message: RPC.SubOpts """ - origin_id = str(origin_id) + origin_id_str = str(origin_id) if sub_message.subscribe: if sub_message.topicid not in self.peer_topics: - self.peer_topics[sub_message.topicid] = [origin_id] - elif origin_id not in self.peer_topics[sub_message.topicid]: + self.peer_topics[sub_message.topicid] = [origin_id_str] + elif origin_id_str not in self.peer_topics[sub_message.topicid]: # Add peer to topic - self.peer_topics[sub_message.topicid].append(origin_id) + self.peer_topics[sub_message.topicid].append(origin_id_str) else: if sub_message.topicid in self.peer_topics: - if origin_id in self.peer_topics[sub_message.topicid]: - self.peer_topics[sub_message.topicid].remove(origin_id) + if origin_id_str in self.peer_topics[sub_message.topicid]: + self.peer_topics[sub_message.topicid].remove(origin_id_str) # FIXME(mhchia): Change the function name? # FIXME(mhchia): `publish_message` can be further type hinted with mypy_protobuf async def handle_talk(self, publish_message: Any) -> None: """ Put incoming message from a peer onto my blocking queue - :param talk: RPC.Message format + :param publish_message: RPC.Message format """ # Check if this message has any topics that we are subscribed to @@ -247,7 +251,7 @@ class Pubsub: # for each topic await self.my_topics[topic].put(publish_message) - async def subscribe(self, topic_id: str) -> asyncio.Queue: + async def subscribe(self, topic_id: str) -> 'asyncio.Queue[rpc_pb2.Message]': """ Subscribe ourself to a topic :param topic_id: topic_id to subscribe to @@ -261,7 +265,7 @@ class Pubsub: self.my_topics[topic_id] = asyncio.Queue() # Create subscribe message - packet = rpc_pb2.RPC() + packet: rpc_pb2.RPC = rpc_pb2.RPC() packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( subscribe=True, topicid=topic_id.encode('utf-8') @@ -289,7 +293,7 @@ class Pubsub: del self.my_topics[topic_id] # Create unsubscribe message - packet = rpc_pb2.RPC() + packet: rpc_pb2.RPC = rpc_pb2.RPC() packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( subscribe=False, topicid=topic_id.encode('utf-8') @@ -301,8 +305,7 @@ class Pubsub: # Tell router we are leaving this topic await self.router.leave(topic_id) - # FIXME: `rpc_msg` can be further type hinted with mypy_protobuf - async def message_all_peers(self, rpc_msg: Any) -> None: + async def message_all_peers(self, raw_msg: bytes) -> None: """ Broadcast a message to peers :param raw_msg: raw contents of the message to broadcast @@ -311,7 +314,7 @@ class Pubsub: # Broadcast message for _, stream in self.peers.items(): # Write message to stream - await stream.write(rpc_msg) + await stream.write(raw_msg) async def publish(self, topic_id: str, data: bytes) -> None: """ @@ -370,6 +373,6 @@ class Pubsub: self.seen_messages[msg_id] = 1 def _is_subscribed_to_msg(self, msg: rpc_pb2.Message) -> bool: - if len(self.my_topics) == 0: + if not self.my_topics: return False return all([topic in self.my_topics for topic in msg.topicIDs]) diff --git a/libp2p/pubsub/pubsub_notifee.py b/libp2p/pubsub/pubsub_notifee.py index 4173bd8f..9f06176e 100644 --- a/libp2p/pubsub/pubsub_notifee.py +++ b/libp2p/pubsub/pubsub_notifee.py @@ -1,23 +1,39 @@ +from typing import ( + TYPE_CHECKING, +) + +from multiaddr import Multiaddr + +from libp2p.network.network_interface import INetwork from libp2p.network.notifee_interface import INotifee +from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn + +from libp2p.network.stream.net_stream_interface import INetStream + +if TYPE_CHECKING: + import asyncio + from libp2p.peer.id import ID class PubsubNotifee(INotifee): - # pylint: disable=too-many-instance-attributes, cell-var-from-loop + # pylint: disable=too-many-instance-attributes, cell-var-from-loop, unsubscriptable-object - def __init__(self, initiator_peers_queue): + initiator_peers_queue: 'asyncio.Queue[ID]' + + def __init__(self, initiator_peers_queue: 'asyncio.Queue[ID]') -> None: """ :param initiator_peers_queue: queue to add new peers to so that pubsub can process new peers after we connect to them """ self.initiator_peers_queue = initiator_peers_queue - async def opened_stream(self, network, stream): + async def opened_stream(self, network: INetwork, stream: INetStream) -> None: pass - async def closed_stream(self, network, stream): + async def closed_stream(self, network: INetwork, stream: INetStream) -> None: pass - async def connected(self, network, conn): + async def connected(self, network: INetwork, conn: IMuxedConn) -> None: """ Add peer_id to initiator_peers_queue, so that this peer_id can be used to create a stream and we only want to have one pubsub stream with each peer. @@ -30,11 +46,11 @@ class PubsubNotifee(INotifee): if conn.initiator: await self.initiator_peers_queue.put(conn.peer_id) - async def disconnected(self, network, conn): + async def disconnected(self, network: INetwork, conn: IMuxedConn) -> None: pass - async def listen(self, network, multiaddr): + async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None: pass - async def listen_close(self, network, multiaddr): + async def listen_close(self, network: INetwork, multiaddr: Multiaddr) -> None: pass diff --git a/libp2p/pubsub/pubsub_router_interface.py b/libp2p/pubsub/pubsub_router_interface.py index 8819e5f0..8a6a879c 100644 --- a/libp2p/pubsub/pubsub_router_interface.py +++ b/libp2p/pubsub/pubsub_router_interface.py @@ -1,15 +1,26 @@ from abc import ABC, abstractmethod +from typing import ( + List, + TYPE_CHECKING, +) + +from libp2p.peer.id import ID + +from .pb import rpc_pb2 + +if TYPE_CHECKING: + from .pubsub import Pubsub class IPubsubRouter(ABC): @abstractmethod - def get_protocols(self): + def get_protocols(self) -> List[str]: """ :return: the list of protocols supported by the router """ @abstractmethod - def attach(self, pubsub): + def attach(self, pubsub: 'Pubsub') -> None: """ Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance. @@ -17,21 +28,21 @@ class IPubsubRouter(ABC): """ @abstractmethod - def add_peer(self, peer_id, protocol_id): + def add_peer(self, peer_id: ID, protocol_id: str) -> None: """ Notifies the router that a new peer has been connected :param peer_id: id of peer to add """ @abstractmethod - def remove_peer(self, peer_id): + def remove_peer(self, peer_id: ID) -> None: """ Notifies the router that a peer has been disconnected :param peer_id: id of peer to remove """ @abstractmethod - def handle_rpc(self, rpc, sender_peer_id): + async def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: ID) -> None: """ Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed @@ -41,8 +52,9 @@ class IPubsubRouter(ABC): :param rpc: rpc message """ + # FIXME: Should be changed to type 'peer.ID' @abstractmethod - async def publish(self, msg_forwarder, pubsub_msg): + async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: """ Invoked to forward a new message that has been validated :param msg_forwarder: peer_id of message sender @@ -50,7 +62,7 @@ class IPubsubRouter(ABC): """ @abstractmethod - def join(self, topic): + async def join(self, topic: str) -> None: """ Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the @@ -59,7 +71,7 @@ class IPubsubRouter(ABC): """ @abstractmethod - def leave(self, topic): + async def leave(self, topic: str) -> None: """ Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement. diff --git a/libp2p/routing/interfaces.py b/libp2p/routing/interfaces.py index 1f29d48c..5d0e63f3 100644 --- a/libp2p/routing/interfaces.py +++ b/libp2p/routing/interfaces.py @@ -1,11 +1,18 @@ -from abc import ABC, abstractmethod +from abc import ( + ABC, + abstractmethod, +) +from typing import Iterable + +from libp2p.peer.id import ID +from libp2p.peer.peerinfo import PeerInfo # pylint: disable=too-few-public-methods class IContentRouting(ABC): @abstractmethod - def provide(self, cid, announce=True): + def provide(self, cid: bytes, announce: bool = True) -> None: """ Provide adds the given cid to the content routing system. If announce is True, it also announces it, otherwise it is just kept in the local @@ -13,7 +20,7 @@ class IContentRouting(ABC): """ @abstractmethod - def find_provider_iter(self, cid, count): + def find_provider_iter(self, cid: bytes, count: int) -> Iterable[PeerInfo]: """ Search for peers who are able to provide a given key returns an iterator of peer.PeerInfo @@ -23,7 +30,7 @@ class IContentRouting(ABC): class IPeerRouting(ABC): @abstractmethod - def find_peer(self, peer_id): + async def find_peer(self, peer_id: ID) -> PeerInfo: """ Find specific Peer FindPeer searches for a peer with given peer_id, returns a peer.PeerInfo diff --git a/libp2p/routing/kademlia/kademlia_content_router.py b/libp2p/routing/kademlia/kademlia_content_router.py index ac49be8d..468ca3ae 100644 --- a/libp2p/routing/kademlia/kademlia_content_router.py +++ b/libp2p/routing/kademlia/kademlia_content_router.py @@ -1,9 +1,14 @@ +from typing import ( + Iterable, +) + +from libp2p.peer.peerinfo import PeerInfo from libp2p.routing.interfaces import IContentRouting class KadmeliaContentRouter(IContentRouting): - def provide(self, cid, announce=True): + def provide(self, cid: bytes, announce: bool = True) -> None: """ Provide adds the given cid to the content routing system. If announce is True, it also announces it, otherwise it is just kept in the local @@ -12,7 +17,7 @@ class KadmeliaContentRouter(IContentRouting): # the DHT finds the closest peers to `key` using the `FIND_NODE` RPC # then sends a `ADD_PROVIDER` RPC with its own `PeerInfo` to each of these peers. - def find_provider_iter(self, cid, count): + def find_provider_iter(self, cid: bytes, count: int) -> Iterable[PeerInfo]: """ Search for peers who are able to provide a given key returns an iterator of peer.PeerInfo diff --git a/libp2p/routing/kademlia/kademlia_peer_router.py b/libp2p/routing/kademlia/kademlia_peer_router.py index 45c43c82..5e426fe5 100644 --- a/libp2p/routing/kademlia/kademlia_peer_router.py +++ b/libp2p/routing/kademlia/kademlia_peer_router.py @@ -1,16 +1,26 @@ import ast +from typing import ( + Union, +) +from libp2p.kademlia.kad_peerinfo import ( + KadPeerInfo, + create_kad_peerinfo, +) +from libp2p.kademlia.network import KademliaServer +from libp2p.peer.id import ID from libp2p.routing.interfaces import IPeerRouting -from libp2p.kademlia.kad_peerinfo import create_kad_peerinfo class KadmeliaPeerRouter(IPeerRouting): # pylint: disable=too-few-public-methods - def __init__(self, dht_server): + server: KademliaServer + + def __init__(self, dht_server: KademliaServer) -> None: self.server = dht_server - async def find_peer(self, peer_id): + async def find_peer(self, peer_id: ID) -> KadPeerInfo: """ Find a specific peer :param peer_id: peer to search for @@ -21,7 +31,7 @@ class KadmeliaPeerRouter(IPeerRouting): value = await self.server.get(xor_id) return decode_peerinfo(value) -def decode_peerinfo(encoded): +def decode_peerinfo(encoded: Union[bytes, str]) -> KadPeerInfo: if isinstance(encoded, bytes): encoded = encoded.decode() try: diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index 3fd69619..08b30d55 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -1,7 +1,8 @@ import asyncio +from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream + from .utils import get_flag -from ..muxed_stream_interface import IMuxedStream class MplexStream(IMuxedStream): diff --git a/libp2p/stream_muxer/muxed_connection_interface.py b/libp2p/stream_muxer/muxed_connection_interface.py index 0faf7705..b7bd4e6b 100644 --- a/libp2p/stream_muxer/muxed_connection_interface.py +++ b/libp2p/stream_muxer/muxed_connection_interface.py @@ -1,11 +1,16 @@ from abc import ABC, abstractmethod +from libp2p.peer.id import ID + class IMuxedConn(ABC): """ reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go """ + initiator: bool + peer_id: ID + @abstractmethod def __init__(self, conn, generic_protocol_handler, peer_id): """ diff --git a/libp2p/stream_muxer/muxed_stream_interface.py b/libp2p/stream_muxer/muxed_stream_interface.py index b15dba1e..eb6f2672 100644 --- a/libp2p/stream_muxer/muxed_stream_interface.py +++ b/libp2p/stream_muxer/muxed_stream_interface.py @@ -1,8 +1,15 @@ -from abc import ABC, abstractmethod +from abc import ( + ABC, + abstractmethod, +) + +from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn class IMuxedStream(ABC): + mplex_conn: IMuxedConn + @abstractmethod def read(self): """ diff --git a/tests/libp2p/test_libp2p.py b/tests/libp2p/test_libp2p.py index e9bfb83f..a7465036 100644 --- a/tests/libp2p/test_libp2p.py +++ b/tests/libp2p/test_libp2p.py @@ -316,20 +316,20 @@ async def test_host_connect(): transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]] (node_a, node_b) = await set_up_nodes_by_transport_opt(transport_opt_list) - assert not node_a.get_peerstore().peers() + assert not node_a.get_peerstore().peer_ids() addr = node_b.get_addrs()[0] info = info_from_p2p_addr(addr) await node_a.connect(info) - assert len(node_a.get_peerstore().peers()) == 1 + assert len(node_a.get_peerstore().peer_ids()) == 1 await node_a.connect(info) # make sure we don't do double connection - assert len(node_a.get_peerstore().peers()) == 1 + assert len(node_a.get_peerstore().peer_ids()) == 1 - assert node_b.get_id() in node_a.get_peerstore().peers() + assert node_b.get_id() in node_a.get_peerstore().peer_ids() ma_node_b = multiaddr.Multiaddr('/p2p/%s' % node_b.get_id().pretty()) for addr in node_a.get_peerstore().addrs(node_b.get_id()): assert addr.encapsulate(ma_node_b) in node_b.get_addrs() diff --git a/tests/peer/test_peerstore.py b/tests/peer/test_peerstore.py index 52029ba1..ffb7b2dc 100644 --- a/tests/peer/test_peerstore.py +++ b/tests/peer/test_peerstore.py @@ -55,4 +55,4 @@ def test_peers(): store.put("peer2", "key", "val") store.add_addr("peer3", "/foo", 10) - assert set(store.peers()) == set(["peer1", "peer2", "peer3"]) + assert set(store.peer_ids()) == set(["peer1", "peer2", "peer3"]) diff --git a/tox.ini b/tox.ini index 10302ca1..ea6af8fe 100644 --- a/tox.ini +++ b/tox.ini @@ -19,5 +19,6 @@ basepython = basepython = python3 extras = dev commands = - pylint --rcfile={toxinidir}/.pylintrc libp2p examples tests + # TODO: Add the tests/ folder back to pylint + pylint --rcfile={toxinidir}/.pylintrc libp2p examples mypy -p libp2p -p examples --config-file {toxinidir}/mypy.ini