Merge branch 'main' into todo/handletimeout

This commit is contained in:
Manu Sheel Gupta
2025-07-26 20:54:53 +05:30
committed by GitHub
14 changed files with 413 additions and 4 deletions

View File

@ -251,6 +251,7 @@ def new_host(
muxer_preference: Literal["YAMUX", "MPLEX"] | None = None,
listen_addrs: Sequence[multiaddr.Multiaddr] | None = None,
enable_mDNS: bool = False,
bootstrap: list[str] | None = None,
negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
) -> IHost:
"""
@ -264,6 +265,7 @@ def new_host(
:param muxer_preference: optional explicit muxer preference
:param listen_addrs: optional list of multiaddrs to listen on
:param enable_mDNS: whether to enable mDNS discovery
:param bootstrap: optional list of bootstrap peer addresses as strings
:return: return a host instance
"""
swarm = new_swarm(
@ -276,7 +278,7 @@ def new_host(
)
if disc_opt is not None:
return RoutedHost(swarm, disc_opt, enable_mDNS)
return BasicHost(network=swarm,enable_mDNS=enable_mDNS , negotitate_timeout=negotiate_timeout)
return RoutedHost(swarm, disc_opt, enable_mDNS, bootstrap)
return BasicHost(network=swarm,enable_mDNS=enable_mDNS , bootstrap=bootstrap, negotitate_timeout=negotiate_timeout)
__version__ = __version("libp2p")

View File

@ -0,0 +1,5 @@
"""Bootstrap peer discovery module for py-libp2p."""
from .bootstrap import BootstrapDiscovery
__all__ = ["BootstrapDiscovery"]

View File

@ -0,0 +1,94 @@
import logging
from multiaddr import Multiaddr
from multiaddr.resolvers import DNSResolver
from libp2p.abc import ID, INetworkService, PeerInfo
from libp2p.discovery.bootstrap.utils import validate_bootstrap_addresses
from libp2p.discovery.events.peerDiscovery import peerDiscovery
from libp2p.peer.peerinfo import info_from_p2p_addr
logger = logging.getLogger("libp2p.discovery.bootstrap")
resolver = DNSResolver()
class BootstrapDiscovery:
"""
Bootstrap-based peer discovery for py-libp2p.
Connects to predefined bootstrap peers and adds them to peerstore.
"""
def __init__(self, swarm: INetworkService, bootstrap_addrs: list[str]):
self.swarm = swarm
self.peerstore = swarm.peerstore
self.bootstrap_addrs = bootstrap_addrs or []
self.discovered_peers: set[str] = set()
async def start(self) -> None:
"""Process bootstrap addresses and emit peer discovery events."""
logger.debug(
f"Starting bootstrap discovery with "
f"{len(self.bootstrap_addrs)} bootstrap addresses"
)
# Validate and filter bootstrap addresses
self.bootstrap_addrs = validate_bootstrap_addresses(self.bootstrap_addrs)
for addr_str in self.bootstrap_addrs:
try:
await self._process_bootstrap_addr(addr_str)
except Exception as e:
logger.debug(f"Failed to process bootstrap address {addr_str}: {e}")
def stop(self) -> None:
"""Clean up bootstrap discovery resources."""
logger.debug("Stopping bootstrap discovery")
self.discovered_peers.clear()
async def _process_bootstrap_addr(self, addr_str: str) -> None:
"""Convert string address to PeerInfo and add to peerstore."""
try:
multiaddr = Multiaddr(addr_str)
except Exception as e:
logger.debug(f"Invalid multiaddr format '{addr_str}': {e}")
return
if self.is_dns_addr(multiaddr):
resolved_addrs = await resolver.resolve(multiaddr)
peer_id_str = multiaddr.get_peer_id()
if peer_id_str is None:
logger.warning(f"Missing peer ID in DNS address: {addr_str}")
return
peer_id = ID.from_base58(peer_id_str)
addrs = [addr for addr in resolved_addrs]
if not addrs:
logger.warning(f"No addresses resolved for DNS address: {addr_str}")
return
peer_info = PeerInfo(peer_id, addrs)
self.add_addr(peer_info)
else:
self.add_addr(info_from_p2p_addr(multiaddr))
def is_dns_addr(self, addr: Multiaddr) -> bool:
"""Check if the address is a DNS address."""
return any(protocol.name == "dnsaddr" for protocol in addr.protocols())
def add_addr(self, peer_info: PeerInfo) -> None:
"""Add a peer to the peerstore and emit discovery event."""
# Skip if it's our own peer
if peer_info.peer_id == self.swarm.get_peer_id():
logger.debug(f"Skipping own peer ID: {peer_info.peer_id}")
return
# Always add addresses to peerstore (allows multiple addresses for same peer)
self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10)
# Only emit discovery event if this is the first time we see this peer
peer_id_str = str(peer_info.peer_id)
if peer_id_str not in self.discovered_peers:
# Track discovered peer
self.discovered_peers.add(peer_id_str)
# Emit peer discovery event
peerDiscovery.emit_peer_discovered(peer_info)
logger.debug(f"Peer discovered: {peer_info.peer_id}")
else:
logger.debug(f"Additional addresses added for peer: {peer_info.peer_id}")

View File

@ -0,0 +1,51 @@
"""Utility functions for bootstrap discovery."""
import logging
from multiaddr import Multiaddr
from libp2p.peer.peerinfo import InvalidAddrError, PeerInfo, info_from_p2p_addr
logger = logging.getLogger("libp2p.discovery.bootstrap.utils")
def validate_bootstrap_addresses(addrs: list[str]) -> list[str]:
"""
Validate and filter bootstrap addresses.
:param addrs: List of bootstrap address strings
:return: List of valid bootstrap addresses
"""
valid_addrs = []
for addr_str in addrs:
try:
# Try to parse as multiaddr
multiaddr = Multiaddr(addr_str)
# Try to extract peer info (this validates the p2p component)
info_from_p2p_addr(multiaddr)
valid_addrs.append(addr_str)
logger.debug(f"Valid bootstrap address: {addr_str}")
except (InvalidAddrError, ValueError, Exception) as e:
logger.warning(f"Invalid bootstrap address '{addr_str}': {e}")
continue
return valid_addrs
def parse_bootstrap_peer_info(addr_str: str) -> PeerInfo | None:
"""
Parse bootstrap address string into PeerInfo.
:param addr_str: Bootstrap address string
:return: PeerInfo object or None if parsing fails
"""
try:
multiaddr = Multiaddr(addr_str)
return info_from_p2p_addr(multiaddr)
except Exception as e:
logger.error(f"Failed to parse bootstrap address '{addr_str}': {e}")
return None

View File

@ -29,6 +29,7 @@ from libp2p.custom_types import (
StreamHandlerFn,
TProtocol,
)
from libp2p.discovery.bootstrap.bootstrap import BootstrapDiscovery
from libp2p.discovery.mdns.mdns import MDNSDiscovery
from libp2p.host.defaults import (
get_default_protocols,
@ -92,6 +93,7 @@ class BasicHost(IHost):
self,
network: INetworkService,
enable_mDNS: bool = False,
bootstrap: list[str] | None = None,
default_protocols: Optional["OrderedDict[TProtocol, StreamHandlerFn]"] = None,
negotitate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
) -> None:
@ -105,6 +107,8 @@ class BasicHost(IHost):
self.multiselect_client = MultiselectClient()
if enable_mDNS:
self.mDNS = MDNSDiscovery(network)
if bootstrap:
self.bootstrap = BootstrapDiscovery(network, bootstrap)
def get_id(self) -> ID:
"""
@ -172,11 +176,16 @@ class BasicHost(IHost):
if hasattr(self, "mDNS") and self.mDNS is not None:
logger.debug("Starting mDNS Discovery")
self.mDNS.start()
if hasattr(self, "bootstrap") and self.bootstrap is not None:
logger.debug("Starting Bootstrap Discovery")
await self.bootstrap.start()
try:
yield
finally:
if hasattr(self, "mDNS") and self.mDNS is not None:
self.mDNS.stop()
if hasattr(self, "bootstrap") and self.bootstrap is not None:
self.bootstrap.stop()
return _run()

View File

@ -19,9 +19,13 @@ class RoutedHost(BasicHost):
_router: IPeerRouting
def __init__(
self, network: INetworkService, router: IPeerRouting, enable_mDNS: bool = False
self,
network: INetworkService,
router: IPeerRouting,
enable_mDNS: bool = False,
bootstrap: list[str] | None = None,
):
super().__init__(network, enable_mDNS)
super().__init__(network, enable_mDNS, bootstrap)
self._router = router
async def connect(self, peer_info: PeerInfo) -> None: