mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-11 15:40:54 +00:00
negotiate timeout configurable in application code
This commit is contained in:
@ -84,6 +84,8 @@ DEFAULT_MUXER = "YAMUX"
|
|||||||
# Multiplexer options
|
# Multiplexer options
|
||||||
MUXER_YAMUX = "YAMUX"
|
MUXER_YAMUX = "YAMUX"
|
||||||
MUXER_MPLEX = "MPLEX"
|
MUXER_MPLEX = "MPLEX"
|
||||||
|
DEFAULT_NEGOTIATE_TIMEOUT = 5
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def set_default_muxer(muxer_name: Literal["YAMUX", "MPLEX"]) -> None:
|
def set_default_muxer(muxer_name: Literal["YAMUX", "MPLEX"]) -> None:
|
||||||
@ -249,6 +251,7 @@ def new_host(
|
|||||||
muxer_preference: Literal["YAMUX", "MPLEX"] | None = None,
|
muxer_preference: Literal["YAMUX", "MPLEX"] | None = None,
|
||||||
listen_addrs: Sequence[multiaddr.Multiaddr] | None = None,
|
listen_addrs: Sequence[multiaddr.Multiaddr] | None = None,
|
||||||
enable_mDNS: bool = False,
|
enable_mDNS: bool = False,
|
||||||
|
negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
|
||||||
) -> IHost:
|
) -> IHost:
|
||||||
"""
|
"""
|
||||||
Create a new libp2p host based on the given parameters.
|
Create a new libp2p host based on the given parameters.
|
||||||
@ -274,6 +277,6 @@ def new_host(
|
|||||||
|
|
||||||
if disc_opt is not None:
|
if disc_opt is not None:
|
||||||
return RoutedHost(swarm, disc_opt, enable_mDNS)
|
return RoutedHost(swarm, disc_opt, enable_mDNS)
|
||||||
return BasicHost(swarm, enable_mDNS)
|
return BasicHost(network=swarm, negotitate_timeout=negotiate_timeout, enable_mDNS)
|
||||||
|
|
||||||
__version__ = __version("libp2p")
|
__version__ = __version("libp2p")
|
||||||
|
|||||||
@ -71,6 +71,7 @@ if TYPE_CHECKING:
|
|||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger("libp2p.network.basic_host")
|
logger = logging.getLogger("libp2p.network.basic_host")
|
||||||
|
DEFAULT_NEGOTIATE_TIMEOUT = 5
|
||||||
|
|
||||||
|
|
||||||
class BasicHost(IHost):
|
class BasicHost(IHost):
|
||||||
@ -92,10 +93,12 @@ class BasicHost(IHost):
|
|||||||
network: INetworkService,
|
network: INetworkService,
|
||||||
enable_mDNS: bool = False,
|
enable_mDNS: bool = False,
|
||||||
default_protocols: Optional["OrderedDict[TProtocol, StreamHandlerFn]"] = None,
|
default_protocols: Optional["OrderedDict[TProtocol, StreamHandlerFn]"] = None,
|
||||||
|
negotitate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
|
||||||
) -> None:
|
) -> None:
|
||||||
self._network = network
|
self._network = network
|
||||||
self._network.set_stream_handler(self._swarm_stream_handler)
|
self._network.set_stream_handler(self._swarm_stream_handler)
|
||||||
self.peerstore = self._network.peerstore
|
self.peerstore = self._network.peerstore
|
||||||
|
self.negotiate_timeout = negotitate_timeout
|
||||||
# Protocol muxing
|
# Protocol muxing
|
||||||
default_protocols = default_protocols or get_default_protocols(self)
|
default_protocols = default_protocols or get_default_protocols(self)
|
||||||
self.multiselect = Multiselect(dict(default_protocols.items()))
|
self.multiselect = Multiselect(dict(default_protocols.items()))
|
||||||
@ -189,7 +192,10 @@ class BasicHost(IHost):
|
|||||||
self.multiselect.add_handler(protocol_id, stream_handler)
|
self.multiselect.add_handler(protocol_id, stream_handler)
|
||||||
|
|
||||||
async def new_stream(
|
async def new_stream(
|
||||||
self, peer_id: ID, protocol_ids: Sequence[TProtocol]
|
self,
|
||||||
|
peer_id: ID,
|
||||||
|
protocol_ids: Sequence[TProtocol],
|
||||||
|
negotitate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
|
||||||
) -> INetStream:
|
) -> INetStream:
|
||||||
"""
|
"""
|
||||||
:param peer_id: peer_id that host is connecting
|
:param peer_id: peer_id that host is connecting
|
||||||
@ -201,7 +207,9 @@ class BasicHost(IHost):
|
|||||||
# Perform protocol muxing to determine protocol to use
|
# Perform protocol muxing to determine protocol to use
|
||||||
try:
|
try:
|
||||||
selected_protocol = await self.multiselect_client.select_one_of(
|
selected_protocol = await self.multiselect_client.select_one_of(
|
||||||
list(protocol_ids), MultiselectCommunicator(net_stream)
|
list(protocol_ids),
|
||||||
|
MultiselectCommunicator(net_stream),
|
||||||
|
negotitate_timeout,
|
||||||
)
|
)
|
||||||
except MultiselectClientError as error:
|
except MultiselectClientError as error:
|
||||||
logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error)
|
logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error)
|
||||||
@ -211,7 +219,12 @@ class BasicHost(IHost):
|
|||||||
net_stream.set_protocol(selected_protocol)
|
net_stream.set_protocol(selected_protocol)
|
||||||
return net_stream
|
return net_stream
|
||||||
|
|
||||||
async def send_command(self, peer_id: ID, command: str) -> list[str]:
|
async def send_command(
|
||||||
|
self,
|
||||||
|
peer_id: ID,
|
||||||
|
command: str,
|
||||||
|
response_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
|
||||||
|
) -> list[str]:
|
||||||
"""
|
"""
|
||||||
Send a multistream-select command to the specified peer and return
|
Send a multistream-select command to the specified peer and return
|
||||||
the response.
|
the response.
|
||||||
@ -225,7 +238,7 @@ class BasicHost(IHost):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
response = await self.multiselect_client.query_multistream_command(
|
response = await self.multiselect_client.query_multistream_command(
|
||||||
MultiselectCommunicator(new_stream), command
|
MultiselectCommunicator(new_stream), command, response_timeout
|
||||||
)
|
)
|
||||||
except MultiselectClientError as error:
|
except MultiselectClientError as error:
|
||||||
logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error)
|
logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error)
|
||||||
@ -264,7 +277,7 @@ class BasicHost(IHost):
|
|||||||
# Perform protocol muxing to determine protocol to use
|
# Perform protocol muxing to determine protocol to use
|
||||||
try:
|
try:
|
||||||
protocol, handler = await self.multiselect.negotiate(
|
protocol, handler = await self.multiselect.negotiate(
|
||||||
MultiselectCommunicator(net_stream)
|
MultiselectCommunicator(net_stream), self.negotiate_timeout
|
||||||
)
|
)
|
||||||
except MultiselectError as error:
|
except MultiselectError as error:
|
||||||
peer_id = net_stream.muxed_conn.peer_id
|
peer_id = net_stream.muxed_conn.peer_id
|
||||||
|
|||||||
Reference in New Issue
Block a user