From 996b5cf15d7e719c4b247aefe3757d7cf54d3efc Mon Sep 17 00:00:00 2001 From: mhchia Date: Wed, 5 Feb 2020 17:05:30 +0800 Subject: [PATCH] Mplex: catch exceptions from `channel.send` --- libp2p/stream_muxer/mplex/mplex.py | 13 ++++++++----- tests/stream_muxer/test_mplex_conn.py | 2 -- tests/stream_muxer/test_mplex_stream.py | 2 -- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 70f26b3b..5e265b96 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -111,11 +111,11 @@ class Mplex(IMuxedConn): async def _initialize_stream(self, stream_id: StreamID, name: str) -> MplexStream: # Use an unbounded buffer, to avoid `handle_incoming` being blocked when doing # `send_channel.send`. - channels = trio.open_memory_channel[bytes](math.inf) - stream = MplexStream(name, stream_id, self, channels[1]) + send_channel, receive_channel = trio.open_memory_channel[bytes](math.inf) + stream = MplexStream(name, stream_id, self, receive_channel) async with self.streams_lock: self.streams[stream_id] = stream - self.streams_msg_channels[stream_id] = channels[0] + self.streams_msg_channels[stream_id] = send_channel return stream async def open_stream(self) -> IMuxedStream: @@ -269,7 +269,10 @@ class Mplex(IMuxedConn): if stream.event_remote_closed.is_set(): # TODO: Warn "Received data from remote after stream was closed by them. (len = %d)" # noqa: E501 return - await send_channel.send(message) + try: + await send_channel.send(message) + except (trio.BrokenResourceError, trio.ClosedResourceError): + raise MplexUnavailable async def _handle_close(self, stream_id: StreamID) -> None: async with self.streams_lock: @@ -325,7 +328,7 @@ class Mplex(IMuxedConn): stream.event_local_closed.set() send_channel = self.streams_msg_channels[stream_id] await send_channel.aclose() - self.streams = None self.event_closed.set() + # FIXME: It's enough to just close one side. await self.new_stream_send_channel.aclose() await self.new_stream_receive_channel.aclose() diff --git a/tests/stream_muxer/test_mplex_conn.py b/tests/stream_muxer/test_mplex_conn.py index 4bff2d61..df1097dd 100644 --- a/tests/stream_muxer/test_mplex_conn.py +++ b/tests/stream_muxer/test_mplex_conn.py @@ -31,12 +31,10 @@ async def test_mplex_conn(mplex_conn_pair): assert stream_0.event_remote_closed.is_set() assert stream_0.event_reset.is_set() assert stream_0.event_local_closed.is_set() - assert conn_0.streams is None # Test: All streams on the other side are also closed. assert stream_1.event_remote_closed.is_set() assert stream_1.event_reset.is_set() assert stream_1.event_local_closed.is_set() - assert conn_1.streams is None # Test: No effect to close more than once between two side. await conn_0.close() diff --git a/tests/stream_muxer/test_mplex_stream.py b/tests/stream_muxer/test_mplex_stream.py index 55ee97bd..181daa09 100644 --- a/tests/stream_muxer/test_mplex_stream.py +++ b/tests/stream_muxer/test_mplex_stream.py @@ -69,9 +69,7 @@ async def test_mplex_stream_read_after_remote_closed(mplex_stream_pair): await stream_0.close() assert stream_0.event_local_closed.is_set() await trio.sleep(0.01) - # await trio.sleep(100000) await wait_all_tasks_blocked() - # await trio.sleep_forever() assert stream_1.event_remote_closed.is_set() assert (await stream_1.read(MAX_READ_LEN)) == DATA with pytest.raises(MplexStreamEOF):