diff --git a/libp2p/discovery/bootstrap/bootstrap.py b/libp2p/discovery/bootstrap/bootstrap.py index 9eaaaa07..e38e5eeb 100644 --- a/libp2p/discovery/bootstrap/bootstrap.py +++ b/libp2p/discovery/bootstrap/bootstrap.py @@ -1,11 +1,10 @@ import logging -import trio from multiaddr import Multiaddr from multiaddr.resolvers import DNSResolver +import trio from libp2p.abc import ID, INetworkService, PeerInfo -from libp2p.discovery.bootstrap.utils import validate_bootstrap_addresses from libp2p.discovery.events.peerDiscovery import peerDiscovery from libp2p.network.exceptions import SwarmException from libp2p.peer.peerinfo import info_from_p2p_addr @@ -17,27 +16,39 @@ resolver = DNSResolver() class BootstrapDiscovery: """ Bootstrap-based peer discovery for py-libp2p. - + Uses Trio nurseries for parallel address resolution and connection attempts. Connects to predefined bootstrap peers and adds them to peerstore. """ def __init__(self, swarm: INetworkService, bootstrap_addrs: list[str]): + """ + Initialize BootstrapDiscovery. + + Args: + swarm: The network service (swarm) instance + bootstrap_addrs: List of bootstrap peer multiaddresses + + Note: Connection maintenance is always enabled to ensure reliable connectivity. + + """ self.swarm = swarm self.peerstore = swarm.peerstore self.bootstrap_addrs = bootstrap_addrs or [] self.discovered_peers: set[str] = set() + self.connected_bootstrap_peers: set[ID] = set() + self._disconnect_monitor_running = False async def start(self) -> None: """Process bootstrap addresses and emit peer discovery events in parallel.""" logger.info( - f"🚀 Starting bootstrap discovery with " + f"Starting bootstrap discovery with " f"{len(self.bootstrap_addrs)} bootstrap addresses" ) - + # Show all bootstrap addresses being processed for i, addr in enumerate(self.bootstrap_addrs): - logger.info(f"{i+1}. {addr}") + logger.info(f"{i + 1}. 
{addr}") # Allow other tasks to run await trio.lowlevel.checkpoint() @@ -50,30 +61,56 @@ class BootstrapDiscovery: await trio.lowlevel.checkpoint() # Use Trio nursery for PARALLEL address processing - async with trio.open_nursery() as nursery: - logger.info(f"Starting {len(self.bootstrap_addrs)} parallel address processing tasks") - - # Start all bootstrap address processing tasks in parallel - for addr_str in self.bootstrap_addrs: - logger.info(f"Starting parallel task for: {addr_str}") - nursery.start_soon(self._process_bootstrap_addr_safe, addr_str) - - # The nursery will wait for all address processing tasks to complete - logger.info("⏳ Nursery active - waiting for address processing tasks to complete") - - logger.info("✅ Bootstrap discovery startup complete - all tasks finished") + try: + async with trio.open_nursery() as nursery: + logger.info( + f"Starting {len(self.bootstrap_addrs)} parallel address " + f"processing tasks" + ) + + # Start all bootstrap address processing tasks in parallel + for addr_str in self.bootstrap_addrs: + logger.info(f"Starting parallel task for: {addr_str}") + nursery.start_soon(self._process_bootstrap_addr_safe, addr_str) + + # The nursery will wait for all address processing tasks to complete + logger.info( + "Nursery active - waiting for address processing tasks to complete" + ) + + except trio.Cancelled: + logger.info("Bootstrap address processing cancelled - cleaning up tasks") + raise + except Exception as e: + logger.error(f"Bootstrap address processing failed: {e}") + raise + + logger.info("Bootstrap discovery startup complete - all tasks finished") + + # Always start disconnect monitoring for reliable connectivity + if not self._disconnect_monitor_running: + trio.lowlevel.spawn_system_task(self._monitor_disconnections) def stop(self) -> None: - """Clean up bootstrap discovery resources.""" - logger.debug("Stopping bootstrap discovery") + """Clean up bootstrap discovery resources and stop all background tasks.""" + logger.info("Stopping bootstrap discovery and cleaning up tasks") + + # Clear discovered peers self.discovered_peers.clear() + self.connected_bootstrap_peers.clear() + + # Mark disconnect monitor as stopped + self._disconnect_monitor_running = False + + logger.debug("Bootstrap discovery cleanup completed") async def _process_bootstrap_addr_safe(self, addr_str: str) -> None: """Safely process a bootstrap address with exception handling.""" try: await self._process_bootstrap_addr(addr_str) except Exception as e: - logger.debug(f"Failed to process bootstrap address {addr_str}: {e}") + logger.warning(f"Failed to process bootstrap address {addr_str}: {e}") + # Ensure task cleanup and continue processing other addresses async def _process_bootstrap_addr(self, addr_str: str) -> None: """Convert string address to PeerInfo and add to peerstore.""" @@ -82,19 +119,19 @@ class BootstrapDiscovery: except Exception as e: logger.debug(f"Invalid multiaddr format '{addr_str}': {e}") return - + if self.is_dns_addr(multiaddr): # Allow other tasks to run during DNS resolution await trio.lowlevel.checkpoint() - + resolved_addrs = await resolver.resolve(multiaddr) if resolved_addrs is None: logger.warning(f"DNS resolution returned None for: {addr_str}") return - + # Allow other tasks to run after DNS resolution await trio.lowlevel.checkpoint() - + peer_id_str = multiaddr.get_peer_id() if peer_id_str is None: logger.warning(f"Missing peer ID in DNS address: {addr_str}") @@ -115,24 +152,70 @@ class BootstrapDiscovery: return any(protocol.name == "dnsaddr" for 
protocol in addr.protocols()) async def add_addr(self, peer_info: PeerInfo) -> None: - """Add a peer to the peerstore, emit discovery event, and attempt connection in parallel.""" - logger.info(f"📥 Adding peer to peerstore: {peer_info.peer_id}") - logger.info(f"📍 Total addresses received: {len(peer_info.addrs)}") - + """ + Add a peer to the peerstore, emit discovery event, + and attempt connection in parallel. + """ + logger.info(f"Adding peer to peerstore: {peer_info.peer_id}") + logger.info(f"Total addresses received: {len(peer_info.addrs)}") + # Skip if it's our own peer if peer_info.peer_id == self.swarm.get_peer_id(): logger.debug(f"Skipping own peer ID: {peer_info.peer_id}") return - - # Always add addresses to peerstore with TTL=0 (no expiration) - self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 0) + + # Filter addresses to only include IPv4+TCP (restrict dialing attempts) + ipv4_tcp_addrs = [] + filtered_out_addrs = [] + + for addr in peer_info.addrs: + if self._is_ipv4_tcp_addr(addr): + ipv4_tcp_addrs.append(addr) + else: + filtered_out_addrs.append(addr) + + # Log filtering results with fallback strategy details + logger.info(f"Address filtering for {peer_info.peer_id}:") + logger.info( + f"IPv4+TCP addresses: {len(ipv4_tcp_addrs)} " + f"(will be tried in sequence for fallback)" + ) + logger.info(f"Filtered out: {len(filtered_out_addrs)} (unsupported protocols)") + + # Show filtered addresses for debugging + if filtered_out_addrs: + for addr in filtered_out_addrs: + logger.debug(f"Filtered: {addr}") + + # Show addresses that will be used for fallback + if ipv4_tcp_addrs: + logger.debug("Addresses for fallback attempts:") + for i, addr in enumerate(ipv4_tcp_addrs, 1): + logger.debug(f" Fallback {i}: {addr}") + + # Skip peer if no IPv4+TCP addresses available + if not ipv4_tcp_addrs: + logger.warning( + f"❌ No IPv4+TCP addresses for {peer_info.peer_id} - " + f"skipping connection attempts" + ) + return + + logger.info( + f"Will attempt connection with automatic fallback through " + f"{len(ipv4_tcp_addrs)} IPv4+TCP addresses" + ) + + # Add only IPv4+TCP addresses to peerstore + # (restrict dialing to supported protocols) + self.peerstore.add_addrs(peer_info.peer_id, ipv4_tcp_addrs, 0) # Allow other tasks to run after adding to peerstore await trio.lowlevel.checkpoint() # Verify addresses were added stored_addrs = self.peerstore.addrs(peer_info.peer_id) - logger.info(f"✅ Addresses stored in peerstore: {len(stored_addrs)} addresses") + logger.info(f"Addresses stored in peerstore: {len(stored_addrs)} addresses") # Only emit discovery event if this is the first time we see this peer peer_id_str = str(peer_info.peer_id) @@ -143,24 +226,55 @@ class BootstrapDiscovery: peerDiscovery.emit_peer_discovered(peer_info) logger.debug(f"Peer discovered: {peer_info.peer_id}") - # Use nursery for parallel connection attempt - async with trio.open_nursery() as connection_nursery: - logger.info(f" 🔌 Starting parallel connection attempt...") - connection_nursery.start_soon(self._connect_to_peer, peer_info.peer_id) - + # Use nursery for parallel connection attempt (non-blocking) + try: + async with trio.open_nursery() as connection_nursery: + logger.info("Starting parallel connection attempt...") + connection_nursery.start_soon( + self._connect_to_peer, peer_info.peer_id + ) + except trio.Cancelled: + logger.debug(f"Connection attempt cancelled for {peer_info.peer_id}") + raise + except Exception as e: + logger.warning( + f"Connection nursery failed for {peer_info.peer_id}: {e}" + ) + else: 
- logger.debug(f"🔄 Additional addresses added for existing peer: {peer_info.peer_id}") + logger.debug( + f"Additional addresses added for existing peer: {peer_info.peer_id}" + ) # Even for existing peers, try to connect if not already connected if peer_info.peer_id not in self.swarm.connections: - logger.info(f"🔌 Starting parallel connection attempt for existing peer...") - # Use nursery for parallel connection - async with trio.open_nursery() as connection_nursery: - connection_nursery.start_soon(self._connect_to_peer, peer_info.peer_id) + logger.info("Starting parallel connection attempt for existing peer...") + # Use nursery for parallel connection attempt (non-blocking) + try: + async with trio.open_nursery() as connection_nursery: + connection_nursery.start_soon( + self._connect_to_peer, peer_info.peer_id + ) + except trio.Cancelled: + logger.debug( + f"Connection attempt cancelled for existing peer " + f"{peer_info.peer_id}" + ) + raise + except Exception as e: + logger.warning( + f"Connection nursery failed for existing peer " + f"{peer_info.peer_id}: {e}" + ) async def _connect_to_peer(self, peer_id: ID) -> None: - """Attempt to establish a connection to a peer using swarm.dial_peer.""" - logger.info(f"🔌 Connection attempt for peer: {peer_id}") - + """ + Attempt to establish a connection to a peer with fallback logic. + + Uses swarm.dial_peer which tries all available addresses for the peer + in sequence until one succeeds or all fail. + """ + logger.info(f"Connection attempt for peer: {peer_id}") + # Pre-connection validation: Check if already connected if peer_id in self.swarm.connections: logger.debug( @@ -173,33 +287,230 @@ class BootstrapDiscovery: # Check available addresses before attempting connection available_addrs = self.peerstore.addrs(peer_id) - logger.info(f"📍 Available addresses for {peer_id}: {len(available_addrs)} addresses") - + logger.info( + f"Available addresses for {peer_id}: {len(available_addrs)} addresses" + ) + + # Log all available addresses for transparency + for i, addr in enumerate(available_addrs, 1): + logger.debug(f" Address {i}: {addr}") + if not available_addrs: logger.error(f"❌ No addresses available for {peer_id} - cannot connect") return - try: - # Log connection attempt for monitoring and debugging - logger.debug(f"Attempting to connect to {peer_id}") + # Record start time for connection attempt monitoring + connection_start_time = trio.current_time() + + try: + # Log connection attempt with fallback details + logger.info( + f"Attempting connection to {peer_id} (will try {len(available_addrs)} " + f"addresses with automatic fallback)" + ) + + # Log each address that will be attempted + for i, addr in enumerate(available_addrs, 1): + logger.debug(f"Fallback address {i}: {addr}") + + # Use swarm.dial_peer - this automatically implements fallback logic: + # - Tries each address in sequence until one succeeds + # - Collects exceptions from failed attempts + # - Raises SwarmException with MultiError if all attempts fail + connection = await self.swarm.dial_peer(peer_id) + + # Calculate connection time + connection_time = trio.current_time() - connection_start_time - # Use swarm.dial_peer to establish connection - await self.swarm.dial_peer(peer_id) - # Allow other tasks to run after dial attempt await trio.lowlevel.checkpoint() - + # Post-connection validation: Verify connection was actually established if peer_id in self.swarm.connections: - logger.info(f"Connected to {peer_id}") + logger.info(f"✅ Connected to {peer_id} (took 
{connection_time:.2f}s)") + + # Track this as a connected bootstrap peer + self.connected_bootstrap_peers.add(peer_id) + + # Log which address was successful (if available) + if hasattr(connection, "get_transport_addresses"): + successful_addrs = connection.get_transport_addresses() + if successful_addrs: + logger.debug(f"Successful address: {successful_addrs[0]}") else: logger.warning(f"Dial succeeded but connection not found for {peer_id}") except SwarmException as e: - # Handle swarm-level connection errors - logger.warning(f"Failed to connect to {peer_id}: {e}") + # Calculate failed connection time + failed_connection_time = trio.current_time() - connection_start_time + + # Enhanced error logging with fallback details + error_msg = str(e) + if "no addresses established a successful connection" in error_msg: + logger.warning( + f"❌ Failed to connect to {peer_id} after trying all " + f"{len(available_addrs)} addresses " + f"(took {failed_connection_time:.2f}s) - " + f"all fallback attempts failed" + ) + # Log individual address failures if this is a MultiError + if ( + e.__cause__ is not None + and hasattr(e.__cause__, "exceptions") + and getattr(e.__cause__, "exceptions", None) is not None + ): + exceptions_list = getattr(e.__cause__, "exceptions") + logger.info("📋 Individual address failure details:") + for i, addr_exception in enumerate(exceptions_list, 1): + logger.info(f"Address {i}: {addr_exception}") + # Also log the actual address that failed + if i <= len(available_addrs): + logger.info(f"Failed address: {available_addrs[i - 1]}") + else: + logger.warning("No detailed exception information available") + else: + logger.warning( + f"❌ Failed to connect to {peer_id}: {e} " + f"(took {failed_connection_time:.2f}s)" + ) except Exception as e: # Handle unexpected errors that aren't swarm-specific - logger.error(f"Unexpected error connecting to {peer_id}: {e}") - raise \ No newline at end of file + failed_connection_time = trio.current_time() - connection_start_time + logger.error( + f"❌ Unexpected error connecting to {peer_id}: " + f"{e} (took {failed_connection_time:.2f}s)" + ) + # Don't re-raise to prevent killing the nursery and other parallel tasks + logger.debug("Continuing with other parallel connection attempts") + + async def _monitor_disconnections(self) -> None: + """ + Monitor bootstrap peer connections and immediately reconnect when they drop. + + This runs as a background task that efficiently detects + disconnections in real-time. 
+ """ + self._disconnect_monitor_running = True + logger.info( + "Disconnect monitor started - will reconnect " + "immediately when connections drop" + ) + + try: + while True: + # Check for disconnections more frequently but efficiently + await trio.sleep(1.0) # Check every second for responsiveness + + # Check which bootstrap peers are no longer connected + disconnected_peers = [] + for peer_id in list(self.connected_bootstrap_peers): + if peer_id not in self.swarm.connections: + disconnected_peers.append(peer_id) + self.connected_bootstrap_peers.discard(peer_id) + logger.info( + f"⚠️ Detected disconnection from bootstrap peer: {peer_id}" + ) + + # Immediately reconnect to disconnected peers + if disconnected_peers: + logger.info( + f"🔄 Immediately reconnecting to {len(disconnected_peers)} " + f"disconnected bootstrap peer(s)" + ) + + # Reconnect in parallel for better performance + try: + async with trio.open_nursery() as reconnect_nursery: + for peer_id in disconnected_peers: + logger.info(f"🔌 Reconnecting to {peer_id}") + reconnect_nursery.start_soon( + self._reconnect_to_peer, peer_id + ) + except trio.Cancelled: + logger.debug("Reconnection nursery cancelled") + raise + except Exception as e: + logger.warning(f"Reconnection nursery failed: {e}") + + except trio.Cancelled: + logger.info("Disconnect monitor stopped - task cancelled") + except Exception as e: + logger.error(f"Unexpected error in disconnect monitor: {e}") + finally: + self._disconnect_monitor_running = False + logger.debug("Disconnect monitor task cleanup completed") + + async def _reconnect_to_peer(self, peer_id: ID) -> None: + """ + Reconnect to a specific bootstrap peer with backoff on failure. + + This method includes simple backoff logic to avoid overwhelming + peers that may be temporarily unavailable. + """ + max_attempts = 3 + base_delay = 1.0 + + try: + for attempt in range(1, max_attempts + 1): + try: + logger.debug( + f"Reconnection attempt {attempt}/{max_attempts} for {peer_id}" + ) + await self._connect_to_peer(peer_id) + + # If we get here, connection was successful + if peer_id in self.swarm.connections: + logger.info( + f"✅ Successfully reconnected to {peer_id} on " + f"attempt {attempt}" + ) + return + + except Exception as e: + logger.debug( + f"Reconnection attempt {attempt} failed for {peer_id}: {e}" + ) + + # Wait before next attempt (exponential backoff) + if attempt < max_attempts: + delay = base_delay * (2 ** (attempt - 1)) # 1s, 2s, 4s + logger.debug( + f"Waiting {delay}s before next reconnection attempt" + ) + await trio.sleep(delay) + + logger.warning( + f"❌ Failed to reconnect to {peer_id} after {max_attempts} attempts" + ) + + except Exception as e: + # Catch any unexpected errors to prevent crashing the nursery + logger.error(f"❌ Unexpected error during reconnection to {peer_id}: {e}") + # Don't re-raise to keep other parallel reconnection tasks running + + def _is_ipv4_tcp_addr(self, addr: Multiaddr) -> bool: + """ + Check if address is IPv4 with TCP protocol only. + + This restricts dialing attempts to addresses that conform to IPv4+TCP, + filtering out IPv6, UDP, QUIC, WebSocket, and other unsupported protocols. + """ + try: + protocols = addr.protocols() + + # Must have IPv4 protocol + has_ipv4 = any(p.name == "ip4" for p in protocols) + if not has_ipv4: + return False + + # Must have TCP protocol + has_tcp = any(p.name == "tcp" for p in protocols) + if not has_tcp: + return False + + return True + + except Exception: + # If we can't parse the address, don't use it + return False