From 83d11db852de66f6820330d264ab476ab5d402e0 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Fri, 20 Jun 2025 15:18:17 +0530 Subject: [PATCH 01/12] fix: added negotiate timeout to MuxerMultistream --- libp2p/protocol_muxer/multiselect_client.py | 18 +++++++++++++++--- libp2p/stream_muxer/muxer_multistream.py | 3 --- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index 8d8c02a1..f9f4ba34 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -2,6 +2,8 @@ from collections.abc import ( Sequence, ) +import trio + from libp2p.abc import ( IMultiselectClient, IMultiselectCommunicator, @@ -17,6 +19,7 @@ from .exceptions import ( MULTISELECT_PROTOCOL_ID = "/multistream/1.0.0" PROTOCOL_NOT_FOUND_MSG = "na" +DEFAULT_NEGOTIATE_TIMEOUT = 60 class MultiselectClient(IMultiselectClient): @@ -39,7 +42,10 @@ class MultiselectClient(IMultiselectClient): raise MultiselectClientError() from error try: - handshake_contents = await communicator.read() + with trio.fail_after(DEFAULT_NEGOTIATE_TIMEOUT): + handshake_contents = await communicator.read() + except trio.TooSlowError: + raise MultiselectClientError("handshake read timed out") except MultiselectCommunicatorError as error: raise MultiselectClientError() from error @@ -93,8 +99,11 @@ class MultiselectClient(IMultiselectClient): raise ValueError("Command not supported") try: - response = await communicator.read() + with trio.fail_after(DEFAULT_NEGOTIATE_TIMEOUT): # Timeout after 5 seconds + response = await communicator.read() response_list = response.strip().splitlines() + except trio.TooSlowError: + raise MultiselectClientError("command response timed out") except MultiselectCommunicatorError as error: raise MultiselectClientError() from error @@ -117,7 +126,10 @@ class MultiselectClient(IMultiselectClient): raise MultiselectClientError() from error try: - response = await communicator.read() + with trio.fail_after(DEFAULT_NEGOTIATE_TIMEOUT): # Timeout after 5 seconds + response = await communicator.read() + except trio.TooSlowError: + raise MultiselectClientError("protocol selection response timed out") except MultiselectCommunicatorError as error: raise MultiselectClientError() from error diff --git a/libp2p/stream_muxer/muxer_multistream.py b/libp2p/stream_muxer/muxer_multistream.py index b4aa5d57..76699c67 100644 --- a/libp2p/stream_muxer/muxer_multistream.py +++ b/libp2p/stream_muxer/muxer_multistream.py @@ -31,9 +31,6 @@ from libp2p.stream_muxer.yamux.yamux import ( Yamux, ) -# FIXME: add negotiate timeout to `MuxerMultistream` -DEFAULT_NEGOTIATE_TIMEOUT = 60 - class MuxerMultistream: """ From ba231dab79ce66fc14f5560c9c5dd969724d9ade Mon Sep 17 00:00:00 2001 From: lla-dane Date: Tue, 24 Jun 2025 09:36:16 +0530 Subject: [PATCH 02/12] added newsfragment --- newsfragments/696.bugfix.rst | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 newsfragments/696.bugfix.rst diff --git a/newsfragments/696.bugfix.rst b/newsfragments/696.bugfix.rst new file mode 100644 index 00000000..727b0f64 --- /dev/null +++ b/newsfragments/696.bugfix.rst @@ -0,0 +1,5 @@ +Add timeout to: +1. multiselect handshake +2. query_multistream_command +3. try_select +to prevent indefinite hangs when a remote peer does not respond. From 6d8f695778935c65605a5f73afa823fbd7d8f303 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Tue, 24 Jun 2025 19:37:45 +0530 Subject: [PATCH 03/12] updated multiselect.py and newsfragment --- libp2p/protocol_muxer/multiselect.py | 8 +++++++- newsfragments/696.bugfix.rst | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/libp2p/protocol_muxer/multiselect.py b/libp2p/protocol_muxer/multiselect.py index 8f6e0e74..91c1efed 100644 --- a/libp2p/protocol_muxer/multiselect.py +++ b/libp2p/protocol_muxer/multiselect.py @@ -1,3 +1,5 @@ +import trio + from libp2p.abc import ( IMultiselectCommunicator, IMultiselectMuxer, @@ -14,6 +16,7 @@ from .exceptions import ( MULTISELECT_PROTOCOL_ID = "/multistream/1.0.0" PROTOCOL_NOT_FOUND_MSG = "na" +DEFAULT_NEGOTIATE_TIMEOUT = 60 class Multiselect(IMultiselectMuxer): @@ -102,7 +105,10 @@ class Multiselect(IMultiselectMuxer): raise MultiselectError() from error try: - handshake_contents = await communicator.read() + with trio.fail_after(DEFAULT_NEGOTIATE_TIMEOUT): # Timeout after 5 seconds + handshake_contents = await communicator.read() + except trio.TooSlowError: + raise MultiselectError("protocol selection response timed out") except MultiselectCommunicatorError as error: raise MultiselectError() from error diff --git a/newsfragments/696.bugfix.rst b/newsfragments/696.bugfix.rst index 727b0f64..d67ad81e 100644 --- a/newsfragments/696.bugfix.rst +++ b/newsfragments/696.bugfix.rst @@ -1,5 +1,5 @@ Add timeout to: -1. multiselect handshake +1. multiselect handshake in multiselect_client.py and multiselect.py 2. query_multistream_command 3. try_select to prevent indefinite hangs when a remote peer does not respond. From 621ea321abd35565d77e92abe7242af7bbf0f269 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Wed, 25 Jun 2025 00:32:33 +0530 Subject: [PATCH 04/12] Set default-negotiate-timeout = 5 sec --- libp2p/protocol_muxer/multiselect.py | 7 +++++-- libp2p/protocol_muxer/multiselect_client.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/libp2p/protocol_muxer/multiselect.py b/libp2p/protocol_muxer/multiselect.py index 91c1efed..99ad37f5 100644 --- a/libp2p/protocol_muxer/multiselect.py +++ b/libp2p/protocol_muxer/multiselect.py @@ -16,7 +16,7 @@ from .exceptions import ( MULTISELECT_PROTOCOL_ID = "/multistream/1.0.0" PROTOCOL_NOT_FOUND_MSG = "na" -DEFAULT_NEGOTIATE_TIMEOUT = 60 +DEFAULT_NEGOTIATE_TIMEOUT = 5 class Multiselect(IMultiselectMuxer): @@ -63,7 +63,10 @@ class Multiselect(IMultiselectMuxer): while True: try: - command = await communicator.read() + with trio.fail_after(DEFAULT_NEGOTIATE_TIMEOUT): + command = await communicator.read() + except trio.TooSlowError: + raise MultiselectError("handshake read timeout") except MultiselectCommunicatorError as error: raise MultiselectError() from error diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index f9f4ba34..74335241 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -19,7 +19,7 @@ from .exceptions import ( MULTISELECT_PROTOCOL_ID = "/multistream/1.0.0" PROTOCOL_NOT_FOUND_MSG = "na" -DEFAULT_NEGOTIATE_TIMEOUT = 60 +DEFAULT_NEGOTIATE_TIMEOUT = 5 class MultiselectClient(IMultiselectClient): From 8753024add4124472dfba40066834e4a2414f2b6 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Wed, 25 Jun 2025 22:24:36 +0530 Subject: [PATCH 05/12] Updated the timeout wrapper for read/write operations --- libp2p/protocol_muxer/multiselect.py | 66 +++++++++---------- libp2p/protocol_muxer/multiselect_client.py | 70 +++++++++++---------- 2 files changed, 70 insertions(+), 66 deletions(-) diff --git a/libp2p/protocol_muxer/multiselect.py b/libp2p/protocol_muxer/multiselect.py index 99ad37f5..05b112dc 100644 --- a/libp2p/protocol_muxer/multiselect.py +++ b/libp2p/protocol_muxer/multiselect.py @@ -59,41 +59,44 @@ class Multiselect(IMultiselectMuxer): :return: selected protocol name, handler function :raise MultiselectError: raised when negotiation failed """ - await self.handshake(communicator) + try: + with trio.fail_after(DEFAULT_NEGOTIATE_TIMEOUT): + await self.handshake(communicator) - while True: - try: - with trio.fail_after(DEFAULT_NEGOTIATE_TIMEOUT): - command = await communicator.read() - except trio.TooSlowError: - raise MultiselectError("handshake read timeout") - except MultiselectCommunicatorError as error: - raise MultiselectError() from error - - if command == "ls": - supported_protocols = [p for p in self.handlers.keys() if p is not None] - response = "\n".join(supported_protocols) + "\n" - - try: - await communicator.write(response) - except MultiselectCommunicatorError as error: - raise MultiselectError() from error - - else: - protocol = TProtocol(command) - if protocol in self.handlers: + while True: try: - await communicator.write(protocol) + command = await communicator.read() except MultiselectCommunicatorError as error: raise MultiselectError() from error - return protocol, self.handlers[protocol] - try: - await communicator.write(PROTOCOL_NOT_FOUND_MSG) - except MultiselectCommunicatorError as error: - raise MultiselectError() from error + if command == "ls": + supported_protocols = [ + p for p in self.handlers.keys() if p is not None + ] + response = "\n".join(supported_protocols) + "\n" - raise MultiselectError("Negotiation failed: no matching protocol") + try: + await communicator.write(response) + except MultiselectCommunicatorError as error: + raise MultiselectError() from error + + else: + protocol = TProtocol(command) + if protocol in self.handlers: + try: + await communicator.write(protocol) + except MultiselectCommunicatorError as error: + raise MultiselectError() from error + + return protocol, self.handlers[protocol] + try: + await communicator.write(PROTOCOL_NOT_FOUND_MSG) + except MultiselectCommunicatorError as error: + raise MultiselectError() from error + + raise MultiselectError("Negotiation failed: no matching protocol") + except trio.TooSlowError: + raise MultiselectError("handshake read timeout") async def handshake(self, communicator: IMultiselectCommunicator) -> None: """ @@ -108,10 +111,7 @@ class Multiselect(IMultiselectMuxer): raise MultiselectError() from error try: - with trio.fail_after(DEFAULT_NEGOTIATE_TIMEOUT): # Timeout after 5 seconds - handshake_contents = await communicator.read() - except trio.TooSlowError: - raise MultiselectError("protocol selection response timed out") + handshake_contents = await communicator.read() except MultiselectCommunicatorError as error: raise MultiselectError() from error diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index 74335241..75d5ca05 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -42,10 +42,8 @@ class MultiselectClient(IMultiselectClient): raise MultiselectClientError() from error try: - with trio.fail_after(DEFAULT_NEGOTIATE_TIMEOUT): - handshake_contents = await communicator.read() - except trio.TooSlowError: - raise MultiselectClientError("handshake read timed out") + handshake_contents = await communicator.read() + except MultiselectCommunicatorError as error: raise MultiselectClientError() from error @@ -65,16 +63,22 @@ class MultiselectClient(IMultiselectClient): :return: selected protocol :raise MultiselectClientError: raised when protocol negotiation failed """ - await self.handshake(communicator) + try: + with trio.fail_after(DEFAULT_NEGOTIATE_TIMEOUT): + await self.handshake(communicator) - for protocol in protocols: - try: - selected_protocol = await self.try_select(communicator, protocol) - return selected_protocol - except MultiselectClientError: - pass + for protocol in protocols: + try: + selected_protocol = await self.try_select( + communicator, protocol + ) + return selected_protocol + except MultiselectClientError: + pass - raise MultiselectClientError("protocols not supported") + raise MultiselectClientError("protocols not supported") + except trio.TooSlowError: + raise MultiselectClientError("response timed out") async def query_multistream_command( self, communicator: IMultiselectCommunicator, command: str @@ -88,26 +92,28 @@ class MultiselectClient(IMultiselectClient): :raise MultiselectClientError: If the communicator fails to process data. :return: list of strings representing the response from peer. """ - await self.handshake(communicator) - - if command == "ls": - try: - await communicator.write("ls") - except MultiselectCommunicatorError as error: - raise MultiselectClientError() from error - else: - raise ValueError("Command not supported") - try: - with trio.fail_after(DEFAULT_NEGOTIATE_TIMEOUT): # Timeout after 5 seconds - response = await communicator.read() - response_list = response.strip().splitlines() + with trio.fail_after(DEFAULT_NEGOTIATE_TIMEOUT): + await self.handshake(communicator) + + if command == "ls": + try: + await communicator.write("ls") + except MultiselectCommunicatorError as error: + raise MultiselectClientError() from error + else: + raise ValueError("Command not supported") + + try: + response = await communicator.read() + response_list = response.strip().splitlines() + + except MultiselectCommunicatorError as error: + raise MultiselectClientError() from error + + return response_list except trio.TooSlowError: raise MultiselectClientError("command response timed out") - except MultiselectCommunicatorError as error: - raise MultiselectClientError() from error - - return response_list async def try_select( self, communicator: IMultiselectCommunicator, protocol: TProtocol @@ -126,10 +132,8 @@ class MultiselectClient(IMultiselectClient): raise MultiselectClientError() from error try: - with trio.fail_after(DEFAULT_NEGOTIATE_TIMEOUT): # Timeout after 5 seconds - response = await communicator.read() - except trio.TooSlowError: - raise MultiselectClientError("protocol selection response timed out") + response = await communicator.read() + except MultiselectCommunicatorError as error: raise MultiselectClientError() from error From d0e73f54386fe024a3482dd940dbd8a17fd68d91 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Wed, 25 Jun 2025 22:31:14 +0530 Subject: [PATCH 06/12] Updated newsfragment --- newsfragments/696.bugfix.rst | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/newsfragments/696.bugfix.rst b/newsfragments/696.bugfix.rst index d67ad81e..d5686418 100644 --- a/newsfragments/696.bugfix.rst +++ b/newsfragments/696.bugfix.rst @@ -1,5 +1,4 @@ -Add timeout to: -1. multiselect handshake in multiselect_client.py and multiselect.py -2. query_multistream_command -3. try_select +Add timeout wrappers in: +1. multiselect.py: `negotiate` function +2. multiselect_client.py: `select_one_of` , `query_multistream_command` functions to prevent indefinite hangs when a remote peer does not respond. From 715e528a566d4a98a3de87dd38cd72e704e1614e Mon Sep 17 00:00:00 2001 From: lla-dane Date: Sat, 28 Jun 2025 08:52:52 +0530 Subject: [PATCH 07/12] DEFAULT_NEGOTIATE_TIMEOUT configurable --- libp2p/protocol_muxer/multiselect.py | 6 ++++-- libp2p/protocol_muxer/multiselect_client.py | 16 ++++++++++++---- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/libp2p/protocol_muxer/multiselect.py b/libp2p/protocol_muxer/multiselect.py index 05b112dc..6a68d2e4 100644 --- a/libp2p/protocol_muxer/multiselect.py +++ b/libp2p/protocol_muxer/multiselect.py @@ -50,7 +50,9 @@ class Multiselect(IMultiselectMuxer): # FIXME: Make TProtocol Optional[TProtocol] to keep types consistent async def negotiate( - self, communicator: IMultiselectCommunicator + self, + communicator: IMultiselectCommunicator, + negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, ) -> tuple[TProtocol, StreamHandlerFn | None]: """ Negotiate performs protocol selection. @@ -60,7 +62,7 @@ class Multiselect(IMultiselectMuxer): :raise MultiselectError: raised when negotiation failed """ try: - with trio.fail_after(DEFAULT_NEGOTIATE_TIMEOUT): + with trio.fail_after(negotiate_timeout): await self.handshake(communicator) while True: diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index 75d5ca05..aa6db5ba 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -51,7 +51,10 @@ class MultiselectClient(IMultiselectClient): raise MultiselectClientError("multiselect protocol ID mismatch") async def select_one_of( - self, protocols: Sequence[TProtocol], communicator: IMultiselectCommunicator + self, + protocols: Sequence[TProtocol], + communicator: IMultiselectCommunicator, + negotitate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, ) -> TProtocol: """ For each protocol, send message to multiselect selecting protocol and @@ -64,7 +67,7 @@ class MultiselectClient(IMultiselectClient): :raise MultiselectClientError: raised when protocol negotiation failed """ try: - with trio.fail_after(DEFAULT_NEGOTIATE_TIMEOUT): + with trio.fail_after(negotitate_timeout): await self.handshake(communicator) for protocol in protocols: @@ -81,7 +84,10 @@ class MultiselectClient(IMultiselectClient): raise MultiselectClientError("response timed out") async def query_multistream_command( - self, communicator: IMultiselectCommunicator, command: str + self, + communicator: IMultiselectCommunicator, + command: str, + response_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, ) -> list[str]: """ Send a multistream-select command over the given communicator and return @@ -89,11 +95,13 @@ class MultiselectClient(IMultiselectClient): :param communicator: communicator to use to communicate with counterparty :param command: supported multistream-select command(e.g., ls) + :param negotiate_timeout: timeout for negotiation, + defaults to DEFAULT_NEGOTIATE_TIMEOUT :raise MultiselectClientError: If the communicator fails to process data. :return: list of strings representing the response from peer. """ try: - with trio.fail_after(DEFAULT_NEGOTIATE_TIMEOUT): + with trio.fail_after(response_timeout): await self.handshake(communicator) if command == "ls": From 4df454ebdc9805246566b74df7e2e05bb2f5b39f Mon Sep 17 00:00:00 2001 From: lla-dane Date: Sat, 28 Jun 2025 09:01:12 +0530 Subject: [PATCH 08/12] fix docstrings --- libp2p/protocol_muxer/multiselect.py | 2 ++ libp2p/protocol_muxer/multiselect_client.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/libp2p/protocol_muxer/multiselect.py b/libp2p/protocol_muxer/multiselect.py index 6a68d2e4..d478d441 100644 --- a/libp2p/protocol_muxer/multiselect.py +++ b/libp2p/protocol_muxer/multiselect.py @@ -58,6 +58,8 @@ class Multiselect(IMultiselectMuxer): Negotiate performs protocol selection. :param stream: stream to negotiate on + :param negotiate_timeout: timeout for negotiation, + defaults to DEFAULT_NEGOTIATE_TIMEOUT :return: selected protocol name, handler function :raise MultiselectError: raised when negotiation failed """ diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index aa6db5ba..3a78ff8e 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -63,6 +63,8 @@ class MultiselectClient(IMultiselectClient): :param protocol: protocol to select :param communicator: communicator to use to communicate with counterparty + :param negotiate_timeout: timeout for negotiation, + defaults to DEFAULT_NEGOTIATE_TIMEOUT :return: selected protocol :raise MultiselectClientError: raised when protocol negotiation failed """ From fee4208d8948b302c13d9617319fffdaaca67fc8 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Sat, 28 Jun 2025 09:06:18 +0530 Subject: [PATCH 09/12] fix docstrings --- libp2p/protocol_muxer/multiselect.py | 3 +-- libp2p/protocol_muxer/multiselect_client.py | 6 ++---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/libp2p/protocol_muxer/multiselect.py b/libp2p/protocol_muxer/multiselect.py index d478d441..3f6ef02f 100644 --- a/libp2p/protocol_muxer/multiselect.py +++ b/libp2p/protocol_muxer/multiselect.py @@ -58,8 +58,7 @@ class Multiselect(IMultiselectMuxer): Negotiate performs protocol selection. :param stream: stream to negotiate on - :param negotiate_timeout: timeout for negotiation, - defaults to DEFAULT_NEGOTIATE_TIMEOUT + :param negotiate_timeout: timeout for negotiation :return: selected protocol name, handler function :raise MultiselectError: raised when negotiation failed """ diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index 3a78ff8e..a5b35006 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -63,8 +63,7 @@ class MultiselectClient(IMultiselectClient): :param protocol: protocol to select :param communicator: communicator to use to communicate with counterparty - :param negotiate_timeout: timeout for negotiation, - defaults to DEFAULT_NEGOTIATE_TIMEOUT + :param negotiate_timeout: timeout for negotiation :return: selected protocol :raise MultiselectClientError: raised when protocol negotiation failed """ @@ -97,8 +96,7 @@ class MultiselectClient(IMultiselectClient): :param communicator: communicator to use to communicate with counterparty :param command: supported multistream-select command(e.g., ls) - :param negotiate_timeout: timeout for negotiation, - defaults to DEFAULT_NEGOTIATE_TIMEOUT + :param negotiate_timeout: timeout for negotiation :raise MultiselectClientError: If the communicator fails to process data. :return: list of strings representing the response from peer. """ From a7eb9b5fbde18dc282ac9e42f538d8e5ca4f0267 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Tue, 1 Jul 2025 18:32:57 +0530 Subject: [PATCH 10/12] negotiate timeout configurable in application code --- libp2p/__init__.py | 5 ++++- libp2p/host/basic_host.py | 23 ++++++++++++++++++----- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/libp2p/__init__.py b/libp2p/__init__.py index fa7ebefd..ba666ff1 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -84,6 +84,8 @@ DEFAULT_MUXER = "YAMUX" # Multiplexer options MUXER_YAMUX = "YAMUX" MUXER_MPLEX = "MPLEX" +DEFAULT_NEGOTIATE_TIMEOUT = 5 + def set_default_muxer(muxer_name: Literal["YAMUX", "MPLEX"]) -> None: @@ -249,6 +251,7 @@ def new_host( muxer_preference: Literal["YAMUX", "MPLEX"] | None = None, listen_addrs: Sequence[multiaddr.Multiaddr] | None = None, enable_mDNS: bool = False, + negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, ) -> IHost: """ Create a new libp2p host based on the given parameters. @@ -274,6 +277,6 @@ def new_host( if disc_opt is not None: return RoutedHost(swarm, disc_opt, enable_mDNS) - return BasicHost(swarm, enable_mDNS) + return BasicHost(network=swarm, negotitate_timeout=negotiate_timeout, enable_mDNS) __version__ = __version("libp2p") diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index 798186cf..cc93be08 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -71,6 +71,7 @@ if TYPE_CHECKING: logger = logging.getLogger("libp2p.network.basic_host") +DEFAULT_NEGOTIATE_TIMEOUT = 5 class BasicHost(IHost): @@ -92,10 +93,12 @@ class BasicHost(IHost): network: INetworkService, enable_mDNS: bool = False, default_protocols: Optional["OrderedDict[TProtocol, StreamHandlerFn]"] = None, + negotitate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, ) -> None: self._network = network self._network.set_stream_handler(self._swarm_stream_handler) self.peerstore = self._network.peerstore + self.negotiate_timeout = negotitate_timeout # Protocol muxing default_protocols = default_protocols or get_default_protocols(self) self.multiselect = Multiselect(dict(default_protocols.items())) @@ -189,7 +192,10 @@ class BasicHost(IHost): self.multiselect.add_handler(protocol_id, stream_handler) async def new_stream( - self, peer_id: ID, protocol_ids: Sequence[TProtocol] + self, + peer_id: ID, + protocol_ids: Sequence[TProtocol], + negotitate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, ) -> INetStream: """ :param peer_id: peer_id that host is connecting @@ -201,7 +207,9 @@ class BasicHost(IHost): # Perform protocol muxing to determine protocol to use try: selected_protocol = await self.multiselect_client.select_one_of( - list(protocol_ids), MultiselectCommunicator(net_stream) + list(protocol_ids), + MultiselectCommunicator(net_stream), + negotitate_timeout, ) except MultiselectClientError as error: logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error) @@ -211,7 +219,12 @@ class BasicHost(IHost): net_stream.set_protocol(selected_protocol) return net_stream - async def send_command(self, peer_id: ID, command: str) -> list[str]: + async def send_command( + self, + peer_id: ID, + command: str, + response_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, + ) -> list[str]: """ Send a multistream-select command to the specified peer and return the response. @@ -225,7 +238,7 @@ class BasicHost(IHost): try: response = await self.multiselect_client.query_multistream_command( - MultiselectCommunicator(new_stream), command + MultiselectCommunicator(new_stream), command, response_timeout ) except MultiselectClientError as error: logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error) @@ -264,7 +277,7 @@ class BasicHost(IHost): # Perform protocol muxing to determine protocol to use try: protocol, handler = await self.multiselect.negotiate( - MultiselectCommunicator(net_stream) + MultiselectCommunicator(net_stream), self.negotiate_timeout ) except MultiselectError as error: peer_id = net_stream.muxed_conn.peer_id From 01319638cdae5033036f07a83932f422c93469d0 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Tue, 1 Jul 2025 18:37:46 +0530 Subject: [PATCH 11/12] rebase with latest commit --- libp2p/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/__init__.py b/libp2p/__init__.py index ba666ff1..542a71c1 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -277,6 +277,6 @@ def new_host( if disc_opt is not None: return RoutedHost(swarm, disc_opt, enable_mDNS) - return BasicHost(network=swarm, negotitate_timeout=negotiate_timeout, enable_mDNS) + return BasicHost(network=swarm,enable_mDNS=enable_mDNS , negotitate_timeout=negotiate_timeout) __version__ = __version("libp2p") From 572c6915f63a5043e6b2cb8ee510e67d3a1f4970 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Tue, 1 Jul 2025 23:27:19 +0530 Subject: [PATCH 12/12] added tests for negotiate/response timeout --- .../protocol_muxer/test_negotiate_timeout.py | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 tests/core/protocol_muxer/test_negotiate_timeout.py diff --git a/tests/core/protocol_muxer/test_negotiate_timeout.py b/tests/core/protocol_muxer/test_negotiate_timeout.py new file mode 100644 index 00000000..a50d65f6 --- /dev/null +++ b/tests/core/protocol_muxer/test_negotiate_timeout.py @@ -0,0 +1,59 @@ +import pytest +import trio + +from libp2p.abc import ( + IMultiselectCommunicator, +) +from libp2p.custom_types import TProtocol +from libp2p.protocol_muxer.exceptions import ( + MultiselectClientError, + MultiselectError, +) +from libp2p.protocol_muxer.multiselect import Multiselect +from libp2p.protocol_muxer.multiselect_client import MultiselectClient + + +class DummyMultiselectCommunicator(IMultiselectCommunicator): + """ + Dummy MultiSelectCommunicator to test out negotiate timmeout. + """ + + def __init__(self) -> None: + return + + async def write(self, msg_str: str) -> None: + """Goes into infinite loop when .write is called""" + await trio.sleep_forever() + + async def read(self) -> str: + """Returns a dummy read""" + return "dummy_read" + + +@pytest.mark.trio +async def test_select_one_of_timeout(): + ECHO = TProtocol("/echo/1.0.0") + communicator = DummyMultiselectCommunicator() + + client = MultiselectClient() + + with pytest.raises(MultiselectClientError, match="response timed out"): + await client.select_one_of([ECHO], communicator, 2) + + +@pytest.mark.trio +async def test_query_multistream_command_timeout(): + communicator = DummyMultiselectCommunicator() + client = MultiselectClient() + + with pytest.raises(MultiselectClientError, match="response timed out"): + await client.query_multistream_command(communicator, "ls", 2) + + +@pytest.mark.trio +async def test_negotiate_timeout(): + communicator = DummyMultiselectCommunicator() + server = Multiselect() + + with pytest.raises(MultiselectError, match="handshake read timeout"): + await server.negotiate(communicator, 2)