From c4105688d1744e19f76301bba0c44548ca0d49df Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 30 Jul 2019 17:31:08 +0800 Subject: [PATCH] Fix after rebase --- libp2p/host/host_interface.py | 4 +-- libp2p/network/network_interface.py | 5 ++-- libp2p/network/stream/net_stream_interface.py | 3 -- libp2p/network/swarm.py | 1 - libp2p/peer/id.py | 3 +- libp2p/pubsub/floodsub.py | 3 +- libp2p/pubsub/gossipsub.py | 23 ++++++++------- libp2p/pubsub/pubsub.py | 29 +++++++++---------- libp2p/pubsub/pubsub_notifee.py | 7 +++-- libp2p/pubsub/pubsub_router_interface.py | 3 +- libp2p/routing/interfaces.py | 8 ++--- .../routing/kademlia/kademlia_peer_router.py | 1 - libp2p/stream_muxer/muxed_stream_interface.py | 13 +++++++-- 13 files changed, 52 insertions(+), 51 deletions(-) diff --git a/libp2p/host/host_interface.py b/libp2p/host/host_interface.py index 1607d0fc..50858089 100644 --- a/libp2p/host/host_interface.py +++ b/libp2p/host/host_interface.py @@ -59,8 +59,8 @@ class IHost(ABC): # stream will decide which protocol_id to run on @abstractmethod async def new_stream(self, - peer_id: ID, - protocol_ids: Sequence[str]) -> INetStream: + peer_id: ID, + protocol_ids: Sequence[str]) -> INetStream: """ :param peer_id: peer_id that host is connecting :param protocol_ids: protocol ids that stream can run on diff --git a/libp2p/network/network_interface.py b/libp2p/network/network_interface.py index fc1694a8..f34a752c 100644 --- a/libp2p/network/network_interface.py +++ b/libp2p/network/network_interface.py @@ -3,7 +3,6 @@ from abc import ( abstractmethod, ) from typing import ( - Any, Awaitable, Callable, Dict, @@ -59,8 +58,8 @@ class INetwork(ABC): @abstractmethod async def new_stream(self, - peer_id: ID, - protocol_ids: Sequence[str]) -> INetStream: + peer_id: ID, + protocol_ids: Sequence[str]) -> INetStream: """ :param peer_id: peer_id of destination :param protocol_ids: available protocol ids to use for stream diff --git a/libp2p/network/stream/net_stream_interface.py b/libp2p/network/stream/net_stream_interface.py index 31c0080e..ca3858ff 100644 --- a/libp2p/network/stream/net_stream_interface.py +++ b/libp2p/network/stream/net_stream_interface.py @@ -1,7 +1,4 @@ from abc import ABC, abstractmethod -from typing import ( - Any, -) from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 00383600..bedba40c 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -1,6 +1,5 @@ import asyncio from typing import ( - Any, Awaitable, Callable, Dict, diff --git a/libp2p/peer/id.py b/libp2p/peer/id.py index 3a4c4a3d..4e82ca26 100644 --- a/libp2p/peer/id.py +++ b/libp2p/peer/id.py @@ -25,7 +25,8 @@ class ID: def __init__(self, id_str: str) -> None: self._id_str = id_str - def to_bytes(self) -> bytes: + # FIXME: Should return type `bytes` + def to_bytes(self) -> str: return self._id_str def get_raw_id(self) -> str: diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index f2eb79f3..e51c8310 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -51,8 +51,7 @@ class FloodSub(IPubsubRouter): :param peer_id: id of peer to remove """ - # FIXME: Should be changed to type 'peer.ID' - async def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: str) -> None: + async def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: ID) -> None: """ Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index c778d66c..4f2c88e9 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -6,7 +6,7 @@ from typing import ( Dict, Iterable, List, - MutableSet, + Set, Sequence, ) @@ -145,8 +145,7 @@ class GossipSub(IPubsubRouter): if peer_id_str in self.peers_gossipsub: self.peers_floodsub.remove(peer_id_str) - # FIXME: type of `sender_peer_id` should be changed to `ID` - async def handle_rpc(self, rpc: rpc_pb2.Message, sender_peer_id: str) -> None: + async def handle_rpc(self, rpc: rpc_pb2.Message, sender_peer_id: ID) -> None: """ Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed @@ -154,21 +153,21 @@ class GossipSub(IPubsubRouter): :param sender_peer_id: id of the peer who sent the message """ control_message = rpc.control - sender_peer_id = str(sender_peer_id) + sender_peer_id_str = str(sender_peer_id) # Relay each rpc control message to the appropriate handler if control_message.ihave: for ihave in control_message.ihave: - await self.handle_ihave(ihave, sender_peer_id) + await self.handle_ihave(ihave, sender_peer_id_str) if control_message.iwant: for iwant in control_message.iwant: - await self.handle_iwant(iwant, sender_peer_id) + await self.handle_iwant(iwant, sender_peer_id_str) if control_message.graft: for graft in control_message.graft: - await self.handle_graft(graft, sender_peer_id) + await self.handle_graft(graft, sender_peer_id_str) if control_message.prune: for prune in control_message.prune: - await self.handle_prune(prune, sender_peer_id) + await self.handle_prune(prune, sender_peer_id_str) async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: # pylint: disable=too-many-locals @@ -203,7 +202,8 @@ class GossipSub(IPubsubRouter): :param origin: peer id of the peer the message originate from. :return: a generator of the peer ids who we send data to. """ - send_to: MutableSet[ID] = set() + # pylint: disable=len-as-condition + send_to: Set[ID] = set() for topic in topic_ids: if topic not in self.pubsub.peer_topics: continue @@ -228,7 +228,6 @@ class GossipSub(IPubsubRouter): # I assume there could be short periods between heartbeats where topic may not # be but we should check that this path gets hit appropriately - # pylint: disable=len-as-condition if (topic not in self.fanout) or (len(self.fanout[topic]) == 0): # If no peers in fanout, choose some peers from gossipsub peers in topic. self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus( @@ -480,11 +479,13 @@ class GossipSub(IPubsubRouter): return selection + # FIXME: type of `minus` should be changed to type `Sequence[ID]` + # FIXME: return type should be changed to type `List[ID]` def _get_in_topic_gossipsub_peers_from_minus( self, topic: str, num_to_select: int, - minus: Sequence[ID]) -> List[ID]: + minus: Sequence[str]) -> List[str]: gossipsub_peers_in_topic = [ peer_str for peer_str in self.pubsub.peer_topics[topic] diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index ddfa59c2..77f1fa7f 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -29,23 +29,23 @@ def get_msg_id(msg: rpc_pb2.Message) -> Tuple[bytes, bytes]: class Pubsub: - # pylint: disable=too-many-instance-attributes, no-member + # pylint: disable=too-many-instance-attributes, no-member, unsubscriptable-object host: IHost my_id: ID router: 'IPubsubRouter' - peer_queue: asyncio.Queue + peer_queue: asyncio.Queue[ID] protocols: List[str] - incoming_msgs_from_peers: asyncio.Queue - outgoing_messages: asyncio.Queue + incoming_msgs_from_peers: asyncio.Queue[rpc_pb2.Message] + outgoing_messages: asyncio.Queue[rpc_pb2.Message] seen_messages: LRU - my_topics: Dict[str, asyncio.Queue] + my_topics: Dict[str, asyncio.Queue[rpc_pb2.Message]] # FIXME: Should be changed to `Dict[str, List[ID]]` peer_topics: Dict[str, List[str]] @@ -214,9 +214,8 @@ class Pubsub: # Force context switch await asyncio.sleep(0) - # FIXME: type of `origin_id` should be changed to `ID` # FIXME: `sub_message` can be further type hinted with mypy_protobuf - def handle_subscription(self, origin_id: str, sub_message: Any) -> None: + def handle_subscription(self, origin_id: ID, sub_message: Any) -> None: """ Handle an incoming subscription message from a peer. Update internal mapping to mark the peer as subscribed or unsubscribed to topics as @@ -224,17 +223,17 @@ class Pubsub: :param origin_id: id of the peer who subscribe to the message :param sub_message: RPC.SubOpts """ - origin_id = str(origin_id) + origin_id_str = str(origin_id) if sub_message.subscribe: if sub_message.topicid not in self.peer_topics: - self.peer_topics[sub_message.topicid] = [origin_id] - elif origin_id not in self.peer_topics[sub_message.topicid]: + self.peer_topics[sub_message.topicid] = [origin_id_str] + elif origin_id_str not in self.peer_topics[sub_message.topicid]: # Add peer to topic - self.peer_topics[sub_message.topicid].append(origin_id) + self.peer_topics[sub_message.topicid].append(origin_id_str) else: if sub_message.topicid in self.peer_topics: - if origin_id in self.peer_topics[sub_message.topicid]: - self.peer_topics[sub_message.topicid].remove(origin_id) + if origin_id_str in self.peer_topics[sub_message.topicid]: + self.peer_topics[sub_message.topicid].remove(origin_id_str) # FIXME(mhchia): Change the function name? # FIXME(mhchia): `publish_message` can be further type hinted with mypy_protobuf @@ -252,7 +251,7 @@ class Pubsub: # for each topic await self.my_topics[topic].put(publish_message) - async def subscribe(self, topic_id: str) -> asyncio.Queue: + async def subscribe(self, topic_id: str) -> asyncio.Queue[rpc_pb2.Message]: """ Subscribe ourself to a topic :param topic_id: topic_id to subscribe to @@ -374,6 +373,6 @@ class Pubsub: self.seen_messages[msg_id] = 1 def _is_subscribed_to_msg(self, msg: rpc_pb2.Message) -> bool: - if len(self.my_topics) == 0: + if not bool(self.my_topics): return False return all([topic in self.my_topics for topic in msg.topicIDs]) diff --git a/libp2p/pubsub/pubsub_notifee.py b/libp2p/pubsub/pubsub_notifee.py index cdaec4bf..5830072b 100644 --- a/libp2p/pubsub/pubsub_notifee.py +++ b/libp2p/pubsub/pubsub_notifee.py @@ -4,17 +4,18 @@ from multiaddr import Multiaddr from libp2p.network.network_interface import INetwork from libp2p.network.notifee_interface import INotifee +from libp2p.peer.id import ID from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn from libp2p.network.stream.net_stream_interface import INetStream class PubsubNotifee(INotifee): - # pylint: disable=too-many-instance-attributes, cell-var-from-loop + # pylint: disable=too-many-instance-attributes, cell-var-from-loop, unsubscriptable-object - initiator_peers_queue: asyncio.Queue + initiator_peers_queue: asyncio.Queue[ID] - def __init__(self, initiator_peers_queue: asyncio.Queue) -> None: + def __init__(self, initiator_peers_queue: asyncio.Queue[ID]) -> None: """ :param initiator_peers_queue: queue to add new peers to so that pubsub can process new peers after we connect to them diff --git a/libp2p/pubsub/pubsub_router_interface.py b/libp2p/pubsub/pubsub_router_interface.py index 0cc6c126..8a6a879c 100644 --- a/libp2p/pubsub/pubsub_router_interface.py +++ b/libp2p/pubsub/pubsub_router_interface.py @@ -41,9 +41,8 @@ class IPubsubRouter(ABC): :param peer_id: id of peer to remove """ - # FIXME: Should be changed to type 'peer.ID' @abstractmethod - async def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: str) -> None: + async def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: ID) -> None: """ Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed diff --git a/libp2p/routing/interfaces.py b/libp2p/routing/interfaces.py index e656215c..5d0e63f3 100644 --- a/libp2p/routing/interfaces.py +++ b/libp2p/routing/interfaces.py @@ -1,8 +1,8 @@ -from abc import ABC, abstractmethod -from typing import ( - Any, - Iterable, +from abc import ( + ABC, + abstractmethod, ) +from typing import Iterable from libp2p.peer.id import ID from libp2p.peer.peerinfo import PeerInfo diff --git a/libp2p/routing/kademlia/kademlia_peer_router.py b/libp2p/routing/kademlia/kademlia_peer_router.py index e50ec841..5e426fe5 100644 --- a/libp2p/routing/kademlia/kademlia_peer_router.py +++ b/libp2p/routing/kademlia/kademlia_peer_router.py @@ -9,7 +9,6 @@ from libp2p.kademlia.kad_peerinfo import ( ) from libp2p.kademlia.network import KademliaServer from libp2p.peer.id import ID -from libp2p.peer.peerinfo import PeerInfo from libp2p.routing.interfaces import IPeerRouting diff --git a/libp2p/stream_muxer/muxed_stream_interface.py b/libp2p/stream_muxer/muxed_stream_interface.py index b9034ce8..2b4dac37 100644 --- a/libp2p/stream_muxer/muxed_stream_interface.py +++ b/libp2p/stream_muxer/muxed_stream_interface.py @@ -1,11 +1,18 @@ -from abc import ABC, abstractmethod +from abc import ( + ABC, + abstractmethod, +) +from typing import ( + TYPE_CHECKING, +) -from libp2p.stream_muxer.mplex.mplex import Mplex +if TYPE_CHECKING: + from libp2p.stream_muxer.mplex.mplex import Mplex class IMuxedStream(ABC): - mplex_conn: Mplex + mplex_conn: 'Mplex' @abstractmethod def read(self):