From a7eb9b5fbde18dc282ac9e42f538d8e5ca4f0267 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Tue, 1 Jul 2025 18:32:57 +0530 Subject: [PATCH] negotiate timeout configurable in application code --- libp2p/__init__.py | 5 ++++- libp2p/host/basic_host.py | 23 ++++++++++++++++++----- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/libp2p/__init__.py b/libp2p/__init__.py index fa7ebefd..ba666ff1 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -84,6 +84,8 @@ DEFAULT_MUXER = "YAMUX" # Multiplexer options MUXER_YAMUX = "YAMUX" MUXER_MPLEX = "MPLEX" +DEFAULT_NEGOTIATE_TIMEOUT = 5 + def set_default_muxer(muxer_name: Literal["YAMUX", "MPLEX"]) -> None: @@ -249,6 +251,7 @@ def new_host( muxer_preference: Literal["YAMUX", "MPLEX"] | None = None, listen_addrs: Sequence[multiaddr.Multiaddr] | None = None, enable_mDNS: bool = False, + negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, ) -> IHost: """ Create a new libp2p host based on the given parameters. @@ -274,6 +277,6 @@ def new_host( if disc_opt is not None: return RoutedHost(swarm, disc_opt, enable_mDNS) - return BasicHost(swarm, enable_mDNS) + return BasicHost(network=swarm, negotitate_timeout=negotiate_timeout, enable_mDNS) __version__ = __version("libp2p") diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index 798186cf..cc93be08 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -71,6 +71,7 @@ if TYPE_CHECKING: logger = logging.getLogger("libp2p.network.basic_host") +DEFAULT_NEGOTIATE_TIMEOUT = 5 class BasicHost(IHost): @@ -92,10 +93,12 @@ class BasicHost(IHost): network: INetworkService, enable_mDNS: bool = False, default_protocols: Optional["OrderedDict[TProtocol, StreamHandlerFn]"] = None, + negotitate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, ) -> None: self._network = network self._network.set_stream_handler(self._swarm_stream_handler) self.peerstore = self._network.peerstore + self.negotiate_timeout = negotitate_timeout # Protocol muxing default_protocols = default_protocols or get_default_protocols(self) self.multiselect = Multiselect(dict(default_protocols.items())) @@ -189,7 +192,10 @@ class BasicHost(IHost): self.multiselect.add_handler(protocol_id, stream_handler) async def new_stream( - self, peer_id: ID, protocol_ids: Sequence[TProtocol] + self, + peer_id: ID, + protocol_ids: Sequence[TProtocol], + negotitate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, ) -> INetStream: """ :param peer_id: peer_id that host is connecting @@ -201,7 +207,9 @@ class BasicHost(IHost): # Perform protocol muxing to determine protocol to use try: selected_protocol = await self.multiselect_client.select_one_of( - list(protocol_ids), MultiselectCommunicator(net_stream) + list(protocol_ids), + MultiselectCommunicator(net_stream), + negotitate_timeout, ) except MultiselectClientError as error: logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error) @@ -211,7 +219,12 @@ class BasicHost(IHost): net_stream.set_protocol(selected_protocol) return net_stream - async def send_command(self, peer_id: ID, command: str) -> list[str]: + async def send_command( + self, + peer_id: ID, + command: str, + response_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, + ) -> list[str]: """ Send a multistream-select command to the specified peer and return the response. @@ -225,7 +238,7 @@ class BasicHost(IHost): try: response = await self.multiselect_client.query_multistream_command( - MultiselectCommunicator(new_stream), command + MultiselectCommunicator(new_stream), command, response_timeout ) except MultiselectClientError as error: logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error) @@ -264,7 +277,7 @@ class BasicHost(IHost): # Perform protocol muxing to determine protocol to use try: protocol, handler = await self.multiselect.negotiate( - MultiselectCommunicator(net_stream) + MultiselectCommunicator(net_stream), self.negotiate_timeout ) except MultiselectError as error: peer_id = net_stream.muxed_conn.peer_id