From d36e32370354c81095fd2fb291907cefe13aa8bf Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 4 Nov 2019 21:17:54 +0800 Subject: [PATCH] Update error handling of pubsub stream handler --- libp2p/pubsub/pubsub.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 5e518774..dd0bed3d 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -163,13 +163,7 @@ class Pubsub: peer_id = stream.muxed_conn.peer_id while True: - try: - incoming: bytes = await read_varint_prefixed_bytes(stream) - except (ParseError, IncompleteReadError) as error: - logger.debug( - "read corrupted data from peer %s, error=%s", peer_id, error - ) - continue + incoming: bytes = await read_varint_prefixed_bytes(stream) rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC() rpc_incoming.ParseFromString(incoming) if rpc_incoming.publish: @@ -252,13 +246,18 @@ class Pubsub: :param stream: newly created stream """ + peer_id = stream.muxed_conn.peer_id + + # Error handling pattern reference: + # https://github.com/libp2p/go-libp2p-pubsub/blob/534fe2f382d8dd75dab89ddb0760542546c9f24e/comm.go#L38-L46 # noqa: E501 try: await self.continuously_read_stream(stream) - except (StreamEOF, StreamReset) as error: - logger.debug("fail to read from stream, error=%s", error) - await stream.reset() - # TODO: what to do when the stream is terminated? - # disconnect the peer? + except StreamEOF as error: + stream.close() + logger.debug("fail to read from peer %s, error=%s", peer_id, error) + except (ParseError, IncompleteReadError, StreamReset) as error: + stream.reset() + logger.debug("read corrupted data from peer %s, error=%s", peer_id, error) async def _handle_new_peer(self, peer_id: ID) -> None: try: