From c4f9ce6bb32c8749184995bd53e111639433184e Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 12 Nov 2019 18:07:30 +0800 Subject: [PATCH 1/8] Handle `StreamClosed` in identify protocol handler --- libp2p/identity/identify/protocol.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/libp2p/identity/identify/protocol.py b/libp2p/identity/identify/protocol.py index 87d946c0..092deb4a 100644 --- a/libp2p/identity/identify/protocol.py +++ b/libp2p/identity/identify/protocol.py @@ -3,6 +3,7 @@ import logging from multiaddr import Multiaddr from libp2p.host.host_interface import IHost +from libp2p.network.stream.exceptions import StreamClosed from libp2p.network.stream.net_stream_interface import INetStream from libp2p.typing import StreamHandlerFn, TProtocol @@ -43,8 +44,12 @@ def identify_handler_for(host: IHost) -> StreamHandlerFn: protobuf = _mk_identify_protobuf(host) response = protobuf.SerializeToString() - await stream.write(response) - await stream.close() - logger.debug("successfully handled request for %s from %s", ID, peer_id) + try: + await stream.write(response) + except StreamClosed: + logger.debug("Fail to respond to %s request: stream closed", ID) + else: + await stream.close() + logger.debug("successfully handled request for %s from %s", ID, peer_id) return handle_identify From 9be9b4bbfcc73cedcf3b4ac25e92a1f5d5de82d4 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 12 Nov 2019 18:10:41 +0800 Subject: [PATCH 2/8] Handle `StreamClosed` in pub/gossip/flood-sub --- libp2p/pubsub/floodsub.py | 6 +++++- libp2p/pubsub/gossipsub.py | 8 +++++++- libp2p/pubsub/pubsub.py | 18 +++++++++++++++--- 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 15ca6b02..bac0bd77 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -1,6 +1,7 @@ import logging from typing import Iterable, List, Sequence +from libp2p.network.stream.exceptions import StreamClosed from libp2p.peer.id import ID from libp2p.typing import TProtocol from libp2p.utils import encode_varint_prefixed @@ -89,7 +90,10 @@ class FloodSub(IPubsubRouter): stream = self.pubsub.peers[peer_id] # FIXME: We should add a `WriteMsg` similar to write delimited messages. # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 - await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) + try: + await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) + except StreamClosed: + logger.debug("Fail to publish message to %s: stream closed", peer_id) async def join(self, topic: str) -> None: """ diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 864189e4..564cf44c 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -4,6 +4,7 @@ import logging import random from typing import Any, Dict, Iterable, List, Sequence, Set +from libp2p.network.stream.exceptions import StreamClosed from libp2p.peer.id import ID from libp2p.pubsub import floodsub from libp2p.typing import TProtocol @@ -188,7 +189,12 @@ class GossipSub(IPubsubRouter): # FIXME: We should add a `WriteMsg` similar to write delimited messages. # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 # TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages. - await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) + try: + await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) + except StreamClosed: + logger.debug("Fail to publish message to %s: stream closed", peer_id) + # TODO: also remove peer info from pubsub + self.remove_peer(peer_id) def _get_peers_to_send( self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index d5e02677..8cc6539e 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -20,7 +20,7 @@ from libp2p.exceptions import ParseError, ValidationError from libp2p.host.host_interface import IHost from libp2p.io.exceptions import IncompleteReadError from libp2p.network.exceptions import SwarmException -from libp2p.network.stream.exceptions import StreamEOF, StreamReset +from libp2p.network.stream.exceptions import StreamClosed, StreamEOF, StreamReset from libp2p.network.stream.net_stream_interface import INetStream from libp2p.peer.id import ID from libp2p.typing import TProtocol @@ -279,13 +279,19 @@ class Pubsub: # Send hello packet hello = self.get_hello_packet() - await stream.write(encode_varint_prefixed(hello.SerializeToString())) + try: + await stream.write(encode_varint_prefixed(hello.SerializeToString())) + except StreamClosed: + logger.debug("Fail to add new peer %s: stream closed", peer_id) + del self.peers[peer_id] + return # TODO: Check EOF of this stream. # TODO: Check if the peer in black list. try: self.router.add_peer(peer_id, stream.get_protocol()) except Exception as error: logger.debug("fail to add new peer %s, error %s", peer_id, error) + del self.peers[peer_id] return logger.debug("added new peer %s", peer_id) @@ -429,7 +435,13 @@ class Pubsub: # Broadcast message for stream in self.peers.values(): # Write message to stream - await stream.write(encode_varint_prefixed(raw_msg)) + try: + await stream.write(encode_varint_prefixed(raw_msg)) + except StreamClosed: + peer_id = stream.muxed_conn.peer_id + logger.debug("Fail to message peer %s: stream closed", peer_id) + del self.peers[peer_id] + self.router.remove_peer(peer_id) async def publish(self, topic_id: str, data: bytes) -> None: """ From 86e0fa45635a3899e2d970fade8fc223f065ac98 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 12 Nov 2019 18:33:44 +0800 Subject: [PATCH 3/8] Handle `StreamClosed` in ping protocol handler --- libp2p/host/ping.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/libp2p/host/ping.py b/libp2p/host/ping.py index 9075f0bb..01a34109 100644 --- a/libp2p/host/ping.py +++ b/libp2p/host/ping.py @@ -1,7 +1,7 @@ import asyncio import logging -from libp2p.network.stream.exceptions import StreamEOF, StreamReset +from libp2p.network.stream.exceptions import StreamClosed, StreamEOF, StreamReset from libp2p.network.stream.net_stream_interface import INetStream from libp2p.peer.id import ID as PeerID from libp2p.typing import TProtocol @@ -25,17 +25,24 @@ async def _handle_ping(stream: INetStream, peer_id: PeerID) -> bool: logger.debug("Other side closed while waiting for ping from %s", peer_id) return False except StreamReset as error: + print("peer", peer_id, "ping reset") logger.debug( "Other side reset while waiting for ping from %s: %s", peer_id, error ) raise except Exception as error: + print("peer", peer_id, "ping", type(error)) logger.debug("Error while waiting to read ping for %s: %s", peer_id, error) raise logger.debug("Received ping from %s with data: 0x%s", peer_id, payload.hex()) + print("receive ping from", peer_id) - await stream.write(payload) + try: + await stream.write(payload) + except StreamClosed: + logger.debug("Fail to respond to ping from %s: stream closed", peer_id) + raise return True @@ -44,11 +51,13 @@ async def handle_ping(stream: INetStream) -> None: or closes the ``stream``.""" peer_id = stream.muxed_conn.peer_id + print("handling ping from", peer_id) while True: try: should_continue = await _handle_ping(stream, peer_id) if not should_continue: return except Exception: + print("error finish ping") await stream.reset() return From cbe57cd5d72263006dd4f8b03aab6e65111ac27a Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 14 Nov 2019 14:22:23 +0800 Subject: [PATCH 4/8] Fix lint --- libp2p/crypto/rsa.py | 4 ++-- libp2p/host/ping.py | 5 ----- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/libp2p/crypto/rsa.py b/libp2p/crypto/rsa.py index ec636cae..e40be45a 100644 --- a/libp2p/crypto/rsa.py +++ b/libp2p/crypto/rsa.py @@ -25,7 +25,7 @@ class RSAPublicKey(PublicKey): h = SHA256.new(data) try: # NOTE: the typing in ``pycryptodome`` is wrong on the arguments to ``verify``. - pkcs1_15.new(self.impl).verify(h, signature) # type: ignore + pkcs1_15.new(self.impl).verify(h, signature) except (ValueError, TypeError): return False return True @@ -49,7 +49,7 @@ class RSAPrivateKey(PrivateKey): def sign(self, data: bytes) -> bytes: h = SHA256.new(data) # NOTE: the typing in ``pycryptodome`` is wrong on the arguments to ``sign``. - return pkcs1_15.new(self.impl).sign(h) # type: ignore + return pkcs1_15.new(self.impl).sign(h) def get_public_key(self) -> PublicKey: return RSAPublicKey(self.impl.publickey()) diff --git a/libp2p/host/ping.py b/libp2p/host/ping.py index 01a34109..3144ef4d 100644 --- a/libp2p/host/ping.py +++ b/libp2p/host/ping.py @@ -25,18 +25,15 @@ async def _handle_ping(stream: INetStream, peer_id: PeerID) -> bool: logger.debug("Other side closed while waiting for ping from %s", peer_id) return False except StreamReset as error: - print("peer", peer_id, "ping reset") logger.debug( "Other side reset while waiting for ping from %s: %s", peer_id, error ) raise except Exception as error: - print("peer", peer_id, "ping", type(error)) logger.debug("Error while waiting to read ping for %s: %s", peer_id, error) raise logger.debug("Received ping from %s with data: 0x%s", peer_id, payload.hex()) - print("receive ping from", peer_id) try: await stream.write(payload) @@ -51,13 +48,11 @@ async def handle_ping(stream: INetStream) -> None: or closes the ``stream``.""" peer_id = stream.muxed_conn.peer_id - print("handling ping from", peer_id) while True: try: should_continue = await _handle_ping(stream, peer_id) if not should_continue: return except Exception: - print("error finish ping") await stream.reset() return From 1cc5a6f58b153e4026381747f53b73eb14744b9e Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 14 Nov 2019 14:45:30 +0800 Subject: [PATCH 5/8] Bump pycryptodome version to 3.9.2 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 3a430c37..b2aa9dd2 100644 --- a/setup.py +++ b/setup.py @@ -34,7 +34,7 @@ setuptools.setup( platforms=["unix", "linux", "osx"], classifiers=classifiers, install_requires=[ - "pycryptodome>=3.8.2,<4.0.0", + "pycryptodome>=3.9.2,<4.0.0", "base58>=1.0.3,<2.0.0", "pymultihash>=0.8.2", "multiaddr>=0.0.8,<0.1.0", From 7d1f3d6000ed5c91daf8da79bdd80bb5f3fe8e94 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Fri, 15 Nov 2019 12:12:28 +0800 Subject: [PATCH 6/8] Remove outdated comment --- libp2p/crypto/rsa.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/libp2p/crypto/rsa.py b/libp2p/crypto/rsa.py index e40be45a..b059a187 100644 --- a/libp2p/crypto/rsa.py +++ b/libp2p/crypto/rsa.py @@ -24,7 +24,6 @@ class RSAPublicKey(PublicKey): def verify(self, data: bytes, signature: bytes) -> bool: h = SHA256.new(data) try: - # NOTE: the typing in ``pycryptodome`` is wrong on the arguments to ``verify``. pkcs1_15.new(self.impl).verify(h, signature) except (ValueError, TypeError): return False @@ -48,7 +47,6 @@ class RSAPrivateKey(PrivateKey): def sign(self, data: bytes) -> bytes: h = SHA256.new(data) - # NOTE: the typing in ``pycryptodome`` is wrong on the arguments to ``sign``. return pkcs1_15.new(self.impl).sign(h) def get_public_key(self) -> PublicKey: From ccc7879422a89297763548ebd3452978722056ce Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sat, 16 Nov 2019 16:24:48 +0800 Subject: [PATCH 7/8] Add stream.write error handling in gossipsub --- libp2p/pubsub/gossipsub.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 564cf44c..839d3f7d 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -518,7 +518,10 @@ class GossipSub(IPubsubRouter): peer_stream = self.pubsub.peers[sender_peer_id] # 4) And write the packet to the stream - await peer_stream.write(encode_varint_prefixed(rpc_msg)) + try: + await peer_stream.write(encode_varint_prefixed(rpc_msg)) + except StreamClosed: + logger.debug("Fail to responed to iwant request from %s: stream closed", sender_peer_id) async def handle_graft( self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID @@ -602,4 +605,7 @@ class GossipSub(IPubsubRouter): peer_stream = self.pubsub.peers[to_peer] # Write rpc to stream - await peer_stream.write(encode_varint_prefixed(rpc_msg)) + try: + await peer_stream.write(encode_varint_prefixed(rpc_msg)) + except StreamClosed: + logger.debug("Fail to emit control message to %s: stream closed", to_peer) From ace5ef69a84950af4c531017332b1b5dad0c99b5 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sat, 16 Nov 2019 17:03:04 +0800 Subject: [PATCH 8/8] Apply PR feedback: handle pubsub dead peer when stream closed in gossipsub --- libp2p/pubsub/gossipsub.py | 10 +++++++--- libp2p/pubsub/pubsub.py | 3 +-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 839d3f7d..93faebdd 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -193,8 +193,7 @@ class GossipSub(IPubsubRouter): await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) except StreamClosed: logger.debug("Fail to publish message to %s: stream closed", peer_id) - # TODO: also remove peer info from pubsub - self.remove_peer(peer_id) + self.pubsub._handle_dead_peer(peer_id) def _get_peers_to_send( self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID @@ -521,7 +520,11 @@ class GossipSub(IPubsubRouter): try: await peer_stream.write(encode_varint_prefixed(rpc_msg)) except StreamClosed: - logger.debug("Fail to responed to iwant request from %s: stream closed", sender_peer_id) + logger.debug( + "Fail to responed to iwant request from %s: stream closed", + sender_peer_id, + ) + self.pubsub._handle_dead_peer(sender_peer_id) async def handle_graft( self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID @@ -609,3 +612,4 @@ class GossipSub(IPubsubRouter): await peer_stream.write(encode_varint_prefixed(rpc_msg)) except StreamClosed: logger.debug("Fail to emit control message to %s: stream closed", to_peer) + self.pubsub._handle_dead_peer(to_peer) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 8cc6539e..3834eb4b 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -440,8 +440,7 @@ class Pubsub: except StreamClosed: peer_id = stream.muxed_conn.peer_id logger.debug("Fail to message peer %s: stream closed", peer_id) - del self.peers[peer_id] - self.router.remove_peer(peer_id) + self._handle_dead_peer(peer_id) async def publish(self, topic_id: str, data: bytes) -> None: """