diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index dd0bed3d..7a453e7a 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -253,10 +253,10 @@ class Pubsub: try: await self.continuously_read_stream(stream) except StreamEOF as error: - stream.close() + await stream.close() logger.debug("fail to read from peer %s, error=%s", peer_id, error) except (ParseError, IncompleteReadError, StreamReset) as error: - stream.reset() + await stream.reset() logger.debug("read corrupted data from peer %s, error=%s", peer_id, error) async def _handle_new_peer(self, peer_id: ID) -> None: @@ -281,7 +281,7 @@ class Pubsub: logger.debug("added new peer %s", peer_id) - async def _handle_dead_peer(self, peer_id: ID) -> None: + def _handle_dead_peer(self, peer_id: ID) -> None: del self.peers[peer_id] for topic in self.peer_topics: @@ -300,14 +300,9 @@ class Pubsub: pubsub protocols we support """ while True: - peer_id: ID = await self.peer_queue.get() - # Add Peer - asyncio.ensure_future(self._handle_new_peer(peer_id)) - # Force context switch - await asyncio.sleep(0) async def handle_dead_peer_queue(self) -> None: """ @@ -315,14 +310,9 @@ class Pubsub: remove peer info from pubsub and pubsub router. """ while True: - peer_id: ID = await self.dead_peer_queue.get() - # Remove Peer - asyncio.ensure_future(self._handle_dead_peer(peer_id)) - - # Force context switch - await asyncio.sleep(0) + self._handle_dead_peer(peer_id) def handle_subscription( self, origin_id: ID, sub_message: rpc_pb2.RPC.SubOpts