From eca548851b0fced9526efeaf3a902aa9ad21dde7 Mon Sep 17 00:00:00 2001 From: Minimega12121 Date: Fri, 25 Jul 2025 16:19:29 +0530 Subject: [PATCH] added new fragment and tests --- newsfragments/752.internal.rst | 1 + tests/core/stream_muxer/test_mplex_stream.py | 37 ++++++++++++++++++++ 2 files changed, 38 insertions(+) create mode 100644 newsfragments/752.internal.rst diff --git a/newsfragments/752.internal.rst b/newsfragments/752.internal.rst new file mode 100644 index 00000000..b0aed33d --- /dev/null +++ b/newsfragments/752.internal.rst @@ -0,0 +1 @@ +[mplex] Add timeout and error handling during stream close diff --git a/tests/core/stream_muxer/test_mplex_stream.py b/tests/core/stream_muxer/test_mplex_stream.py index 62d384c2..1d9c2234 100644 --- a/tests/core/stream_muxer/test_mplex_stream.py +++ b/tests/core/stream_muxer/test_mplex_stream.py @@ -8,6 +8,7 @@ from libp2p.stream_muxer.mplex.exceptions import ( MplexStreamClosed, MplexStreamEOF, MplexStreamReset, + MuxedConnUnavailable, ) from libp2p.stream_muxer.mplex.mplex import ( MPLEX_MESSAGE_CHANNEL_SIZE, @@ -213,3 +214,39 @@ async def test_mplex_stream_reset(mplex_stream_pair): # `reset` should do nothing as well. await stream_0.reset() await stream_1.reset() + + +@pytest.mark.trio +async def test_mplex_stream_close_timeout(monkeypatch, mplex_stream_pair): + stream_0, stream_1 = mplex_stream_pair + + # (simulate hanging) + async def fake_send_message(*args, **kwargs): + await trio.sleep_forever() + + monkeypatch.setattr(stream_0.muxed_conn, "send_message", fake_send_message) + + with pytest.raises(TimeoutError): + await stream_0.close() + + +@pytest.mark.trio +async def test_mplex_stream_close_mux_unavailable(monkeypatch, mplex_stream_pair): + stream_0, _ = mplex_stream_pair + + # Patch send_message to raise MuxedConnUnavailable + def raise_unavailable(*args, **kwargs): + raise MuxedConnUnavailable("Simulated conn drop") + + monkeypatch.setattr(stream_0.muxed_conn, "send_message", raise_unavailable) + + # Case 1: Mplex is shutting down — should not raise + stream_0.muxed_conn.event_shutting_down.set() + await stream_0.close() # Should NOT raise + + # Case 2: Mplex is NOT shutting down — should raise RuntimeError + stream_0.event_local_closed = trio.Event() # Reset since it was set in first call + stream_0.muxed_conn.event_shutting_down = trio.Event() # Unset the shutdown flag + + with pytest.raises(RuntimeError, match="Failed to send close message"): + await stream_0.close()