From 3dcd99a2d16b9f02d067fe9eaa90a3841e24a4b6 Mon Sep 17 00:00:00 2001 From: Minimega12121 Date: Tue, 8 Jul 2025 17:48:57 +0530 Subject: [PATCH] todo: handle timeout --- libp2p/stream_muxer/mplex/mplex_stream.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index 3b640df1..b86357e7 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -176,8 +176,6 @@ class MplexStream(IMuxedStream): Closing a stream closes it for writing and closes the remote end for reading but allows writing in the other direction. """ - # TODO error handling with timeout - async with self.close_lock: if self.event_local_closed.is_set(): return @@ -185,8 +183,17 @@ class MplexStream(IMuxedStream): flag = ( HeaderTags.CloseInitiator if self.is_initiator else HeaderTags.CloseReceiver ) - # TODO: Raise when `muxed_conn.send_message` fails and `Mplex` isn't shutdown. - await self.muxed_conn.send_message(flag, None, self.stream_id) + + try: + with trio.fail_after(5): # timeout in seconds + await self.muxed_conn.send_message(flag, None, self.stream_id) + except trio.TooSlowError: + raise TimeoutError("Timeout while trying to close the stream") + except MuxedConnUnavailable: + if not self.muxed_conn.event_shutting_down.is_set(): + raise RuntimeError( + "Failed to send close message and Mplex isn't shutting down" + ) _is_remote_closed: bool async with self.close_lock: