From d716e90e174e8532a956c42ce9fe8472c151ec54 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 28 Jul 2019 14:30:15 +0800 Subject: [PATCH] Fix on type hints --- libp2p/host/basic_host.py | 3 ++- libp2p/host/host_interface.py | 2 +- libp2p/network/network_interface.py | 2 +- libp2p/network/stream/net_stream_interface.py | 4 ++++ libp2p/network/swarm.py | 2 +- libp2p/pubsub/floodsub.py | 3 +-- libp2p/pubsub/gossipsub.py | 3 +-- libp2p/pubsub/pubsub.py | 19 ++++++++----------- libp2p/pubsub/pubsub_notifee.py | 16 +++++++++------- libp2p/pubsub/pubsub_router_interface.py | 3 +-- .../muxed_connection_interface.py | 5 +++++ 11 files changed, 34 insertions(+), 28 deletions(-) diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index ac4fe260..5e83f5ce 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -1,6 +1,7 @@ from typing import ( Any, Callable, + Coroutine, List, Sequence, ) @@ -83,7 +84,7 @@ class BasicHost(IHost): addrs.append(addr.encapsulate(p2p_part)) return addrs - def set_stream_handler(self, protocol_id: str, stream_handler: Callable[[INetStream], None]) -> bool: + def set_stream_handler(self, protocol_id: str, stream_handler: Callable[[INetStream], Coroutine[Any, Any, None]]) -> bool: """ set stream handler for host :param protocol_id: protocol id used on stream diff --git a/libp2p/host/host_interface.py b/libp2p/host/host_interface.py index 0ee74575..b14c11db 100644 --- a/libp2p/host/host_interface.py +++ b/libp2p/host/host_interface.py @@ -57,7 +57,7 @@ class IHost(ABC): """ @abstractmethod - def set_stream_handler(self, protocol_id: str, stream_handler: Callable[[INetStream], None]) -> bool: + def set_stream_handler(self, protocol_id: str, stream_handler: Callable[[INetStream], Coroutine[Any, Any, None]]) -> bool: """ set stream handler for host :param protocol_id: protocol id used on stream diff --git a/libp2p/network/network_interface.py b/libp2p/network/network_interface.py index 6bffc9c4..41ab900b 100644 --- a/libp2p/network/network_interface.py +++ b/libp2p/network/network_interface.py @@ -35,7 +35,7 @@ class INetwork(ABC): """ @abstractmethod - def set_stream_handler(self, protocol_id: str, stream_handler: Callable[[NetStream], None]) -> bool: + def set_stream_handler(self, protocol_id: str, stream_handler: Callable[[NetStream], Coroutine[Any, Any, None]]) -> bool: """ :param protocol_id: protocol id used on stream :param stream_handler: a stream handler instance diff --git a/libp2p/network/stream/net_stream_interface.py b/libp2p/network/stream/net_stream_interface.py index 22217410..30d7ac7d 100644 --- a/libp2p/network/stream/net_stream_interface.py +++ b/libp2p/network/stream/net_stream_interface.py @@ -4,9 +4,13 @@ from typing import ( Coroutine, ) +from libp2p.stream_muxer.mplex.mplex import Mplex + class INetStream(ABC): + mplex_conn: Mplex + @abstractmethod def get_protocol(self) -> str: """ diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 7a6aa2b0..50fd6031 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -74,7 +74,7 @@ class Swarm(INetwork): def get_peer_id(self) -> ID: return self.self_id - def set_stream_handler(self, protocol_id: str, stream_handler: Callable[[NetStream], None]) -> bool: + def set_stream_handler(self, protocol_id: str, stream_handler: Callable[[NetStream], Coroutine[Any, Any, None]]) -> bool: """ :param protocol_id: protocol id used on stream :param stream_handler: a stream handler instance diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 8b10eb46..c556ef9e 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -42,8 +42,7 @@ class FloodSub(IPubsubRouter): """ self.pubsub = pubsub - # FIXME: Should be changed to type 'peer.ID' - def add_peer(self, peer_id: str, protocol_id: str) -> None: + def add_peer(self, peer_id: ID, protocol_id: str) -> None: """ Notifies the router that a new peer has been connected :param peer_id: id of peer to add diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index c10dc76b..91517a60 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -109,8 +109,7 @@ class GossipSub(IPubsubRouter): # TODO: Start after delay asyncio.ensure_future(self.heartbeat()) - # FIXME: Shoudl be changed to type 'peer.ID' - def add_peer(self, peer_id: str, protocol_id: str) -> None: + def add_peer(self, peer_id: ID, protocol_id: str) -> None: """ Notifies the router that a new peer has been connected :param peer_id: id of peer to add diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index f63fa6ab..5887fb4f 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -22,8 +22,8 @@ from libp2p.host.host_interface import ( from libp2p.peer.id import ( ID, ) -from libp2p.network.stream.net_stream import ( - NetStream, +from libp2p.network.stream.net_stream_interface import ( + INetStream, ) @@ -40,8 +40,7 @@ class Pubsub: router: IPubsubRouter - # FIXME: Should be changed to `asyncio.Queue[ID]` - peer_queue: asyncio.Queue[str] + peer_queue: asyncio.Queue[ID] protocols: List[str] @@ -78,7 +77,6 @@ class Pubsub: self.router.attach(self) # Register a notifee - # FIXME: Should be changed to `asyncio.Queue[ID]` self.peer_queue = asyncio.Queue() self.host.get_network().notify(PubsubNotifee(self.peer_queue)) @@ -109,7 +107,7 @@ class Pubsub: self.peer_topics = {} # Create peers map, which maps peer_id (as string) to stream (to a given peer) - # FIXME: Should be changed to `Dict[ID, NetStream]` + # FIXME: Should be changed to `Dict[ID, INetStream]` self.peers = {} self.counter = time.time_ns() @@ -130,7 +128,7 @@ class Pubsub: return packet.SerializeToString() - async def continuously_read_stream(self, stream: NetStream) -> None: + async def continuously_read_stream(self, stream: INetStream) -> None: """ Read from input stream in an infinite loop. Process messages from other nodes @@ -168,7 +166,7 @@ class Pubsub: # Force context switch await asyncio.sleep(0) - async def stream_handler(self, stream: NetStream) -> None: + async def stream_handler(self, stream: INetStream) -> None: """ Stream handler for pubsub. Gets invoked whenever a new stream is created on one of the supported pubsub protocols. @@ -196,13 +194,12 @@ class Pubsub: """ while True: - # FIXME: Should be changed to type 'ID' - peer_id: str = await self.peer_queue.get() + peer_id: ID = await self.peer_queue.get() # Open a stream to peer on existing connection # (we know connection exists since that's the only way # an element gets added to peer_queue) - stream: NetStream = await self.host.new_stream(peer_id, self.protocols) + stream: INetStream = await self.host.new_stream(peer_id, self.protocols) # Add Peer # Map peer to stream diff --git a/libp2p/pubsub/pubsub_notifee.py b/libp2p/pubsub/pubsub_notifee.py index 2798bb13..bc05f338 100644 --- a/libp2p/pubsub/pubsub_notifee.py +++ b/libp2p/pubsub/pubsub_notifee.py @@ -5,8 +5,11 @@ from typing import ( from multiaddr import Multiaddr -from libp2p.stream_muxer.mplex.mplex import ( - Mplex, +from libp2p.peer.id import ( + ID, +) +from libp2p.stream_muxer.muxed_connection_interface import ( + IMuxedConn, ) from libp2p.network.notifee_interface import ( INotifee, @@ -22,10 +25,9 @@ from libp2p.network.stream.net_stream_interface import ( class PubsubNotifee(INotifee): # pylint: disable=too-many-instance-attributes, cell-var-from-loop - # FIXME: Should be changed to type 'peer.ID' - initiator_peers_queue: asyncio.Queue[str] + initiator_peers_queue: asyncio.Queue[ID] - def __init__(self, initiator_peers_queue: asyncio.Queue[str]) -> None: + def __init__(self, initiator_peers_queue: asyncio.Queue[ID]) -> None: """ :param initiator_peers_queue: queue to add new peers to so that pubsub can process new peers after we connect to them @@ -38,7 +40,7 @@ class PubsubNotifee(INotifee): async def closed_stream(self, network: INetwork, stream: INetStream) -> None: pass - async def connected(self, network: INetwork, conn: Mplex) -> None: + async def connected(self, network: INetwork, conn: IMuxedConn) -> None: """ Add peer_id to initiator_peers_queue, so that this peer_id can be used to create a stream and we only want to have one pubsub stream with each peer. @@ -51,7 +53,7 @@ class PubsubNotifee(INotifee): if conn.initiator: await self.initiator_peers_queue.put(conn.peer_id) - async def disconnected(self, network: INetwork, conn: Mplex) -> None: + async def disconnected(self, network: INetwork, conn: IMuxedConn) -> None: pass async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None: diff --git a/libp2p/pubsub/pubsub_router_interface.py b/libp2p/pubsub/pubsub_router_interface.py index 14ad2f0f..d644c39f 100644 --- a/libp2p/pubsub/pubsub_router_interface.py +++ b/libp2p/pubsub/pubsub_router_interface.py @@ -25,9 +25,8 @@ class IPubsubRouter(ABC): :param pubsub: pubsub instance to attach to """ - # FIXME: Should be changed to type 'peer.ID' @abstractmethod - def add_peer(self, peer_id: str, protocol_id: str) -> None: + def add_peer(self, peer_id: ID, protocol_id: str) -> None: """ Notifies the router that a new peer has been connected :param peer_id: id of peer to add diff --git a/libp2p/stream_muxer/muxed_connection_interface.py b/libp2p/stream_muxer/muxed_connection_interface.py index 0faf7705..b7bd4e6b 100644 --- a/libp2p/stream_muxer/muxed_connection_interface.py +++ b/libp2p/stream_muxer/muxed_connection_interface.py @@ -1,11 +1,16 @@ from abc import ABC, abstractmethod +from libp2p.peer.id import ID + class IMuxedConn(ABC): """ reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go """ + initiator: bool + peer_id: ID + @abstractmethod def __init__(self, conn, generic_protocol_handler, peer_id): """