From c940dac1e6749e5dd4b745bc8aaf6e6755b3624d Mon Sep 17 00:00:00 2001
From: ankur12-1610
Date: Tue, 26 Aug 2025 01:41:26 +0530
Subject: [PATCH] simplify bootstrap discovery with optimized timeouts

---
 libp2p/discovery/bootstrap/bootstrap.py | 256 +++++++++---------------
 1 file changed, 90 insertions(+), 166 deletions(-)

diff --git a/libp2p/discovery/bootstrap/bootstrap.py b/libp2p/discovery/bootstrap/bootstrap.py
index e38e5eeb..c1a6cbbc 100644
--- a/libp2p/discovery/bootstrap/bootstrap.py
+++ b/libp2p/discovery/bootstrap/bootstrap.py
@@ -17,8 +17,8 @@ class BootstrapDiscovery:
     """
     Bootstrap-based peer discovery for py-libp2p.
 
-    Uses Trio nurseries for parallel address resolution and connection attempts.
-    Connects to predefined bootstrap peers and adds them to peerstore.
+    Processes bootstrap addresses in parallel and attempts initial connections.
+    Adds discovered peers to peerstore for network bootstrapping.
     """
 
     def __init__(self, swarm: INetworkService, bootstrap_addrs: list[str]):
@@ -29,15 +29,15 @@ class BootstrapDiscovery:
             swarm: The network service (swarm) instance
             bootstrap_addrs: List of bootstrap peer multiaddresses
 
-        Note: Connection maintenance is always enabled to ensure reliable connectivity.
         """
         self.swarm = swarm
         self.peerstore = swarm.peerstore
         self.bootstrap_addrs = bootstrap_addrs or []
         self.discovered_peers: set[str] = set()
-        self.connected_bootstrap_peers: set[ID] = set()
-        self._disconnect_monitor_running = False
+        self.connection_timeout: int = 10
+        self.connected_peers: set[ID] = (
+            set()
+        )  # Track connected peers for drop detection
 
     async def start(self) -> None:
         """Process bootstrap addresses and emit peer discovery events in parallel."""
@@ -71,7 +71,7 @@ class BootstrapDiscovery:
         # Start all bootstrap address processing tasks in parallel
         for addr_str in self.bootstrap_addrs:
             logger.info(f"Starting parallel task for: {addr_str}")
-            nursery.start_soon(self._process_bootstrap_addr_safe, addr_str)
+            nursery.start_soon(self._process_bootstrap_addr, addr_str)
 
         # The nursery will wait for all address processing tasks to complete
         logger.info(
@@ -87,20 +87,13 @@ class BootstrapDiscovery:
 
         logger.info("Bootstrap discovery startup complete - all tasks finished")
 
-        # Always start disconnect monitoring for reliable connectivity
-        if not self._disconnect_monitor_running:
-            trio.lowlevel.spawn_system_task(self._monitor_disconnections)
-
     def stop(self) -> None:
-        """Clean up bootstrap discovery resources and stop all background tasks."""
+        """Clean up bootstrap discovery resources."""
         logger.info("Stopping bootstrap discovery and cleaning up tasks")
 
         # Clear discovered peers
         self.discovered_peers.clear()
-        self.connected_bootstrap_peers.clear()
-
-        # Mark disconnect monitor as stopped
-        self._disconnect_monitor_running = False
+        self.connected_peers.clear()
 
         logger.debug("Bootstrap discovery cleanup completed")
 
@@ -164,7 +157,7 @@ class BootstrapDiscovery:
             logger.debug(f"Skipping own peer ID: {peer_info.peer_id}")
             return
 
-        # Filter addresses to only include IPv4+TCP (restrict dialing attempts)
+        # Filter addresses to only include IPv4+TCP (only supported protocol)
         ipv4_tcp_addrs = []
         filtered_out_addrs = []
 
@@ -174,12 +167,9 @@ class BootstrapDiscovery:
         else:
             filtered_out_addrs.append(addr)
 
-        # Log filtering results with fallback strategy details
+        # Log filtering results
         logger.info(f"Address filtering for {peer_info.peer_id}:")
-        logger.info(
-            f"IPv4+TCP addresses: {len(ipv4_tcp_addrs)} "
-            f"(will be tried in sequence for fallback)"
-        )
+        logger.info(f"IPv4+TCP addresses: {len(ipv4_tcp_addrs)}")
         logger.info(f"Filtered out: {len(filtered_out_addrs)} (unsupported protocols)")
 
         # Show filtered addresses for debugging
@@ -187,11 +177,11 @@ class BootstrapDiscovery:
         for addr in filtered_out_addrs:
             logger.debug(f"Filtered: {addr}")
 
-        # Show addresses that will be used for fallback
+        # Show addresses that will be used
         if ipv4_tcp_addrs:
-            logger.debug("Addresses for fallback attempts:")
+            logger.debug("Usable addresses:")
             for i, addr in enumerate(ipv4_tcp_addrs, 1):
-                logger.debug(f" Fallback {i}: {addr}")
+                logger.debug(f" Address {i}: {addr}")
 
         # Skip peer if no IPv4+TCP addresses available
         if not ipv4_tcp_addrs:
@@ -202,12 +192,10 @@ class BootstrapDiscovery:
             return
 
         logger.info(
-            f"Will attempt connection with automatic fallback through "
-            f"{len(ipv4_tcp_addrs)} IPv4+TCP addresses"
+            f"Will attempt connection using {len(ipv4_tcp_addrs)} IPv4+TCP addresses"
         )
 
         # Add only IPv4+TCP addresses to peerstore
-        # (restrict dialing to supported protocols)
        self.peerstore.add_addrs(peer_info.peer_id, ipv4_tcp_addrs, 0)
 
         # Allow other tasks to run after adding to peerstore
@@ -268,10 +256,10 @@ class BootstrapDiscovery:
 
     async def _connect_to_peer(self, peer_id: ID) -> None:
         """
-        Attempt to establish a connection to a peer with fallback logic.
+        Attempt to establish a connection to a peer with timeout.
 
-        Uses swarm.dial_peer which tries all available addresses for the peer
-        in sequence until one succeeds or all fail.
+        Uses swarm.dial_peer to connect using addresses stored in peerstore.
+        Times out after connection_timeout seconds to prevent hanging.
         """
         logger.info(f"Connection attempt for peer: {peer_id}")
 
@@ -303,55 +291,64 @@ class BootstrapDiscovery:
         connection_start_time = trio.current_time()
 
         try:
-            # Log connection attempt with fallback details
-            logger.info(
-                f"Attempting connection to {peer_id} (will try {len(available_addrs)} "
-                f"addresses with automatic fallback)"
+            with trio.fail_after(self.connection_timeout):
+                # Log connection attempt
+                logger.info(
+                    f"Attempting connection to {peer_id} using "
+                    f"{len(available_addrs)} addresses"
+                )
+
+                # Log each address that will be attempted
+                for i, addr in enumerate(available_addrs, 1):
+                    logger.debug(f"Address {i}: {addr}")
+
+                # Use swarm.dial_peer to connect using stored addresses
+                connection = await self.swarm.dial_peer(peer_id)
+
+                # Calculate connection time
+                connection_time = trio.current_time() - connection_start_time
+
+                # Allow other tasks to run after dial attempt
+                await trio.lowlevel.checkpoint()
+
+                # Post-connection validation: Verify connection was actually established
+                if peer_id in self.swarm.connections:
+                    logger.info(
+                        f"✅ Connected to {peer_id} (took {connection_time:.2f}s)"
+                    )
+
+                    # Track this connection for drop monitoring
+                    self.connected_peers.add(peer_id)
+
+                    # Start monitoring this specific connection for drops
+                    trio.lowlevel.spawn_system_task(
+                        self._monitor_peer_connection, peer_id
+                    )
+
+                    # Log which address was successful (if available)
+                    if hasattr(connection, "get_transport_addresses"):
+                        successful_addrs = connection.get_transport_addresses()
+                        if successful_addrs:
+                            logger.debug(f"Successful address: {successful_addrs[0]}")
+                else:
+                    logger.warning(
+                        f"Dial succeeded but connection not found for {peer_id}"
+                    )
+        except trio.TooSlowError:
+            logger.warning(
+                f"❌ Connection to {peer_id} timed out after {self.connection_timeout}s"
             )
-
-            # Log each address that will be attempted
-            for i, addr in enumerate(available_addrs, 1):
-                logger.debug(f"Fallback address {i}: {addr}")
{i}: {addr}") - - # Use swarm.dial_peer - this automatically implements fallback logic: - # - Tries each address in sequence until one succeeds - # - Collects exceptions from failed attempts - # - Raises SwarmException with MultiError if all attempts fail - connection = await self.swarm.dial_peer(peer_id) - - # Calculate connection time - connection_time = trio.current_time() - connection_start_time - - # Allow other tasks to run after dial attempt - await trio.lowlevel.checkpoint() - - # Post-connection validation: Verify connection was actually established - if peer_id in self.swarm.connections: - logger.info(f"✅ Connected to {peer_id} (took {connection_time:.2f}s)") - - # Track this as a connected bootstrap peer - self.connected_bootstrap_peers.add(peer_id) - - # Log which address was successful (if available) - if hasattr(connection, "get_transport_addresses"): - successful_addrs = connection.get_transport_addresses() - if successful_addrs: - logger.debug(f"Successful address: {successful_addrs[0]}") - else: - logger.warning(f"Dial succeeded but connection not found for {peer_id}") - except SwarmException as e: # Calculate failed connection time failed_connection_time = trio.current_time() - connection_start_time - # Enhanced error logging with fallback details + # Enhanced error logging error_msg = str(e) if "no addresses established a successful connection" in error_msg: logger.warning( f"❌ Failed to connect to {peer_id} after trying all " f"{len(available_addrs)} addresses " - f"(took {failed_connection_time:.2f}s) - " - f"all fallback attempts failed" + f"(took {failed_connection_time:.2f}s)" ) # Log individual address failures if this is a MultiError if ( @@ -384,117 +381,44 @@ class BootstrapDiscovery: # Don't re-raise to prevent killing the nursery and other parallel tasks logger.debug("Continuing with other parallel connection attempts") - async def _monitor_disconnections(self) -> None: + async def _monitor_peer_connection(self, peer_id: ID) -> None: """ - Monitor bootstrap peer connections and immediately reconnect when they drop. + Monitor a specific peer connection for drops using event-driven detection. - This runs as a background task that efficiently detects - disconnections in real-time. + Waits for the connection to be removed from swarm.connections, which + happens when error 4101 or other connection errors occur. """ - self._disconnect_monitor_running = True - logger.info( - "Disconnect monitor started - will reconnect " - "immediately when connections drop" - ) + logger.debug(f"🔍 Started monitoring connection to {peer_id}") try: - while True: - # Check for disconnections more frequently but efficiently - await trio.sleep(1.0) # Check every second for responsiveness + # Wait for the connection to disappear (event-driven) + while peer_id in self.swarm.connections: + await trio.sleep(0.1) # Small sleep to yield control - # Check which bootstrap peers are no longer connected - disconnected_peers = [] - for peer_id in list(self.connected_bootstrap_peers): - if peer_id not in self.swarm.connections: - disconnected_peers.append(peer_id) - self.connected_bootstrap_peers.discard(peer_id) - logger.info( - f"⚠️ Detected disconnection from bootstrap peer: {peer_id}" - ) + # Connection was dropped - log it immediately + if peer_id in self.connected_peers: + self.connected_peers.discard(peer_id) + logger.warning( + f"📡 Connection to {peer_id} was dropped! 
                 )
 
-            # Immediately reconnect to disconnected peers
-            if disconnected_peers:
-                logger.info(
-                    f"🔄 Immediately reconnecting to {len(disconnected_peers)} "
-                    f"disconnected bootstrap peer(s)"
-                )
-
-                # Reconnect in parallel for better performance
-                try:
-                    async with trio.open_nursery() as reconnect_nursery:
-                        for peer_id in disconnected_peers:
-                            logger.info(f"🔌 Reconnecting to {peer_id}")
-                            reconnect_nursery.start_soon(
-                                self._reconnect_to_peer, peer_id
-                            )
-                except trio.Cancelled:
-                    logger.debug("Reconnection nursery cancelled")
-                    raise
-                except Exception as e:
-                    logger.warning(f"Reconnection nursery failed: {e}")
+            # Log current connection count
+            remaining_connections = len(self.connected_peers)
+            logger.info(f"📊 Remaining connected peers: {remaining_connections}")
 
         except trio.Cancelled:
-            logger.info("Disconnect monitor stopped - task cancelled")
+            logger.debug(f"Connection monitoring for {peer_id} stopped")
+            raise  # Cancelled must always propagate in Trio
         except Exception as e:
-            logger.error(f"Unexpected error in disconnect monitor: {e}")
-        finally:
-            self._disconnect_monitor_running = False
-            logger.debug("Disconnect monitor task cleanup completed")
-
-    async def _reconnect_to_peer(self, peer_id: ID) -> None:
-        """
-        Reconnect to a specific bootstrap peer with backoff on failure.
-
-        This method includes simple backoff logic to avoid overwhelming
-        peers that may be temporarily unavailable.
-        """
-        max_attempts = 3
-        base_delay = 1.0
-
-        try:
-            for attempt in range(1, max_attempts + 1):
-                try:
-                    logger.debug(
-                        f"Reconnection attempt {attempt}/{max_attempts} for {peer_id}"
-                    )
-                    await self._connect_to_peer(peer_id)
-
-                    # If we get here, connection was successful
-                    if peer_id in self.swarm.connections:
-                        logger.info(
-                            f"✅ Successfully reconnected to {peer_id} on "
-                            f"attempt {attempt}"
-                        )
-                        return
-
-                except Exception as e:
-                    logger.debug(
-                        f"Reconnection attempt {attempt} failed for {peer_id}: {e}"
-                    )
-
-                    # Wait before next attempt (exponential backoff)
-                    if attempt < max_attempts:
-                        delay = base_delay * (2 ** (attempt - 1))  # 1s, 2s, 4s
-                        logger.debug(
-                            f"Waiting {delay}s before next reconnection attempt"
-                        )
-                        await trio.sleep(delay)
-
-            logger.warning(
-                f"❌ Failed to reconnect to {peer_id} after {max_attempts} attempts"
-            )
-
-        except Exception as e:
-            # Catch any unexpected errors to prevent crashing the nursery
-            logger.error(f"❌ Unexpected error during reconnection to {peer_id}: {e}")
-            # Don't re-raise to keep other parallel reconnection tasks running
+            logger.error(f"Error monitoring connection to {peer_id}: {e}")
+            # Clean up tracking on error
+            self.connected_peers.discard(peer_id)
 
     def _is_ipv4_tcp_addr(self, addr: Multiaddr) -> bool:
         """
         Check if address is IPv4 with TCP protocol only.
 
-        This restricts dialing attempts to addresses that conform to IPv4+TCP,
-        filtering out IPv6, UDP, QUIC, WebSocket, and other unsupported protocols.
+        Filters out IPv6, UDP, QUIC, WebSocket, and other unsupported protocols.
+        Only IPv4+TCP addresses are supported by the current transport.
         """
         try:
             protocols = addr.protocols()