From 163cc35cb00565911c807cdcc98aa5a94f8884d4 Mon Sep 17 00:00:00 2001 From: ankur12-1610 Date: Sun, 17 Aug 2025 02:12:09 +0530 Subject: [PATCH 1/8] Enhance Bootstrap module to dial peers after address resolution. --- libp2p/discovery/bootstrap/bootstrap.py | 53 +++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 4 deletions(-) diff --git a/libp2p/discovery/bootstrap/bootstrap.py b/libp2p/discovery/bootstrap/bootstrap.py index 222a88a1..290a5089 100644 --- a/libp2p/discovery/bootstrap/bootstrap.py +++ b/libp2p/discovery/bootstrap/bootstrap.py @@ -6,6 +6,7 @@ from multiaddr.resolvers import DNSResolver from libp2p.abc import ID, INetworkService, PeerInfo from libp2p.discovery.bootstrap.utils import validate_bootstrap_addresses from libp2p.discovery.events.peerDiscovery import peerDiscovery +from libp2p.network.exceptions import SwarmException from libp2p.peer.peerinfo import info_from_p2p_addr logger = logging.getLogger("libp2p.discovery.bootstrap") @@ -64,16 +65,17 @@ class BootstrapDiscovery: logger.warning(f"No addresses resolved for DNS address: {addr_str}") return peer_info = PeerInfo(peer_id, addrs) - self.add_addr(peer_info) + await self.add_addr(peer_info) else: - self.add_addr(info_from_p2p_addr(multiaddr)) + peer_info = info_from_p2p_addr(multiaddr) + await self.add_addr(peer_info) def is_dns_addr(self, addr: Multiaddr) -> bool: """Check if the address is a DNS address.""" return any(protocol.name == "dnsaddr" for protocol in addr.protocols()) - def add_addr(self, peer_info: PeerInfo) -> None: - """Add a peer to the peerstore and emit discovery event.""" + async def add_addr(self, peer_info: PeerInfo) -> None: + """Add a peer to the peerstore, emit discovery event, and attempt connection.""" # Skip if it's our own peer if peer_info.peer_id == self.swarm.get_peer_id(): logger.debug(f"Skipping own peer ID: {peer_info.peer_id}") @@ -90,5 +92,48 @@ class BootstrapDiscovery: # Emit peer discovery event peerDiscovery.emit_peer_discovered(peer_info) logger.debug(f"Peer discovered: {peer_info.peer_id}") + + # Attempt to connect to the peer + await self._connect_to_peer(peer_info.peer_id) else: logger.debug(f"Additional addresses added for peer: {peer_info.peer_id}") + + async def _connect_to_peer(self, peer_id: ID) -> None: + """Attempt to establish a connection to a peer using swarm.dial_peer.""" + # Pre-connection validation: Check if already connected + # This prevents duplicate connection attempts and unnecessary network overhead + if peer_id in self.swarm.connections: + logger.debug( + f"Already connected to {peer_id} - skipping connection attempt" + ) + return + + try: + # Log connection attempt for monitoring and debugging + logger.debug(f"Attempting to connect to {peer_id}") + + # Use swarm.dial_peer to establish connection + await self.swarm.dial_peer(peer_id) + + # Post-connection validation: Verify connection was actually established + # swarm.dial_peer may succeed but connection might not be in + # connections dict + # This can happen due to race conditions or connection cleanup + if peer_id in self.swarm.connections: + # Connection successfully established and registered + # Log success at INFO level for operational visibility + logger.info(f"Connected to {peer_id}") + else: + # Edge case: dial succeeded but connection not found + # This indicates a potential issue with connection management + logger.warning(f"Dial succeeded but connection not found for {peer_id}") + + except SwarmException as e: + # Handle swarm-level connection errors + logger.warning(f"Failed to connect to {peer_id}: {e}") + + except Exception as e: + # Handle unexpected errors that aren't swarm-specific + logger.error(f"Unexpected error connecting to {peer_id}: {e}") + # Re-raise to allow caller to handle if needed + raise From 8d9b7f413dbabe8fbe4b7c9809c0fd943fd0f1e8 Mon Sep 17 00:00:00 2001 From: ankur12-1610 Date: Thu, 21 Aug 2025 11:20:21 +0530 Subject: [PATCH 2/8] Add trio nursery address resolution and connection attempts --- libp2p/discovery/bootstrap/bootstrap.py | 118 ++++++++++++++++++------ 1 file changed, 92 insertions(+), 26 deletions(-) diff --git a/libp2p/discovery/bootstrap/bootstrap.py b/libp2p/discovery/bootstrap/bootstrap.py index 290a5089..9eaaaa07 100644 --- a/libp2p/discovery/bootstrap/bootstrap.py +++ b/libp2p/discovery/bootstrap/bootstrap.py @@ -1,4 +1,5 @@ import logging +import trio from multiaddr import Multiaddr from multiaddr.resolvers import DNSResolver @@ -16,6 +17,8 @@ resolver = DNSResolver() class BootstrapDiscovery: """ Bootstrap-based peer discovery for py-libp2p. + + Uses Trio nurseries for parallel address resolution and connection attempts. Connects to predefined bootstrap peers and adds them to peerstore. """ @@ -26,26 +29,52 @@ class BootstrapDiscovery: self.discovered_peers: set[str] = set() async def start(self) -> None: - """Process bootstrap addresses and emit peer discovery events.""" - logger.debug( - f"Starting bootstrap discovery with " + """Process bootstrap addresses and emit peer discovery events in parallel.""" + logger.info( + f"🚀 Starting bootstrap discovery with " f"{len(self.bootstrap_addrs)} bootstrap addresses" ) + + # Show all bootstrap addresses being processed + for i, addr in enumerate(self.bootstrap_addrs): + logger.info(f"{i+1}. {addr}") + + # Allow other tasks to run + await trio.lowlevel.checkpoint() # Validate and filter bootstrap addresses - self.bootstrap_addrs = validate_bootstrap_addresses(self.bootstrap_addrs) + # self.bootstrap_addrs = validate_bootstrap_addresses(self.bootstrap_addrs) + logger.info(f"Valid addresses after validation: {len(self.bootstrap_addrs)}") - for addr_str in self.bootstrap_addrs: - try: - await self._process_bootstrap_addr(addr_str) - except Exception as e: - logger.debug(f"Failed to process bootstrap address {addr_str}: {e}") + # Allow other tasks to run after validation + await trio.lowlevel.checkpoint() + + # Use Trio nursery for PARALLEL address processing + async with trio.open_nursery() as nursery: + logger.info(f"Starting {len(self.bootstrap_addrs)} parallel address processing tasks") + + # Start all bootstrap address processing tasks in parallel + for addr_str in self.bootstrap_addrs: + logger.info(f"Starting parallel task for: {addr_str}") + nursery.start_soon(self._process_bootstrap_addr_safe, addr_str) + + # The nursery will wait for all address processing tasks to complete + logger.info("⏳ Nursery active - waiting for address processing tasks to complete") + + logger.info("✅ Bootstrap discovery startup complete - all tasks finished") def stop(self) -> None: """Clean up bootstrap discovery resources.""" logger.debug("Stopping bootstrap discovery") self.discovered_peers.clear() + async def _process_bootstrap_addr_safe(self, addr_str: str) -> None: + """Safely process a bootstrap address with exception handling.""" + try: + await self._process_bootstrap_addr(addr_str) + except Exception as e: + logger.debug(f"Failed to process bootstrap address {addr_str}: {e}") + async def _process_bootstrap_addr(self, addr_str: str) -> None: """Convert string address to PeerInfo and add to peerstore.""" try: @@ -53,8 +82,19 @@ class BootstrapDiscovery: except Exception as e: logger.debug(f"Invalid multiaddr format '{addr_str}': {e}") return + if self.is_dns_addr(multiaddr): + # Allow other tasks to run during DNS resolution + await trio.lowlevel.checkpoint() + resolved_addrs = await resolver.resolve(multiaddr) + if resolved_addrs is None: + logger.warning(f"DNS resolution returned None for: {addr_str}") + return + + # Allow other tasks to run after DNS resolution + await trio.lowlevel.checkpoint() + peer_id_str = multiaddr.get_peer_id() if peer_id_str is None: logger.warning(f"Missing peer ID in DNS address: {addr_str}") @@ -75,14 +115,24 @@ class BootstrapDiscovery: return any(protocol.name == "dnsaddr" for protocol in addr.protocols()) async def add_addr(self, peer_info: PeerInfo) -> None: - """Add a peer to the peerstore, emit discovery event, and attempt connection.""" + """Add a peer to the peerstore, emit discovery event, and attempt connection in parallel.""" + logger.info(f"📥 Adding peer to peerstore: {peer_info.peer_id}") + logger.info(f"📍 Total addresses received: {len(peer_info.addrs)}") + # Skip if it's our own peer if peer_info.peer_id == self.swarm.get_peer_id(): logger.debug(f"Skipping own peer ID: {peer_info.peer_id}") return + + # Always add addresses to peerstore with TTL=0 (no expiration) + self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 0) - # Always add addresses to peerstore (allows multiple addresses for same peer) - self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10) + # Allow other tasks to run after adding to peerstore + await trio.lowlevel.checkpoint() + + # Verify addresses were added + stored_addrs = self.peerstore.addrs(peer_info.peer_id) + logger.info(f"✅ Addresses stored in peerstore: {len(stored_addrs)} addresses") # Only emit discovery event if this is the first time we see this peer peer_id_str = str(peer_info.peer_id) @@ -93,39 +143,56 @@ class BootstrapDiscovery: peerDiscovery.emit_peer_discovered(peer_info) logger.debug(f"Peer discovered: {peer_info.peer_id}") - # Attempt to connect to the peer - await self._connect_to_peer(peer_info.peer_id) + # Use nursery for parallel connection attempt + async with trio.open_nursery() as connection_nursery: + logger.info(f" 🔌 Starting parallel connection attempt...") + connection_nursery.start_soon(self._connect_to_peer, peer_info.peer_id) + else: - logger.debug(f"Additional addresses added for peer: {peer_info.peer_id}") + logger.debug(f"🔄 Additional addresses added for existing peer: {peer_info.peer_id}") + # Even for existing peers, try to connect if not already connected + if peer_info.peer_id not in self.swarm.connections: + logger.info(f"🔌 Starting parallel connection attempt for existing peer...") + # Use nursery for parallel connection + async with trio.open_nursery() as connection_nursery: + connection_nursery.start_soon(self._connect_to_peer, peer_info.peer_id) async def _connect_to_peer(self, peer_id: ID) -> None: """Attempt to establish a connection to a peer using swarm.dial_peer.""" + logger.info(f"🔌 Connection attempt for peer: {peer_id}") + # Pre-connection validation: Check if already connected - # This prevents duplicate connection attempts and unnecessary network overhead if peer_id in self.swarm.connections: logger.debug( f"Already connected to {peer_id} - skipping connection attempt" ) return + # Allow other tasks to run before connection attempt + await trio.lowlevel.checkpoint() + + # Check available addresses before attempting connection + available_addrs = self.peerstore.addrs(peer_id) + logger.info(f"📍 Available addresses for {peer_id}: {len(available_addrs)} addresses") + + if not available_addrs: + logger.error(f"❌ No addresses available for {peer_id} - cannot connect") + return + try: # Log connection attempt for monitoring and debugging logger.debug(f"Attempting to connect to {peer_id}") # Use swarm.dial_peer to establish connection await self.swarm.dial_peer(peer_id) - + + # Allow other tasks to run after dial attempt + await trio.lowlevel.checkpoint() + # Post-connection validation: Verify connection was actually established - # swarm.dial_peer may succeed but connection might not be in - # connections dict - # This can happen due to race conditions or connection cleanup if peer_id in self.swarm.connections: - # Connection successfully established and registered - # Log success at INFO level for operational visibility logger.info(f"Connected to {peer_id}") else: - # Edge case: dial succeeded but connection not found - # This indicates a potential issue with connection management logger.warning(f"Dial succeeded but connection not found for {peer_id}") except SwarmException as e: @@ -135,5 +202,4 @@ class BootstrapDiscovery: except Exception as e: # Handle unexpected errors that aren't swarm-specific logger.error(f"Unexpected error connecting to {peer_id}: {e}") - # Re-raise to allow caller to handle if needed - raise + raise \ No newline at end of file From 5a2fca32a0926d4b954d5406777b25d233e3fb4d Mon Sep 17 00:00:00 2001 From: ankur12-1610 Date: Fri, 22 Aug 2025 01:44:53 +0530 Subject: [PATCH 3/8] Add ip4 and tcp address resolution and fallback connection attempts --- libp2p/discovery/bootstrap/bootstrap.py | 431 ++++++++++++++++++++---- 1 file changed, 371 insertions(+), 60 deletions(-) diff --git a/libp2p/discovery/bootstrap/bootstrap.py b/libp2p/discovery/bootstrap/bootstrap.py index 9eaaaa07..e38e5eeb 100644 --- a/libp2p/discovery/bootstrap/bootstrap.py +++ b/libp2p/discovery/bootstrap/bootstrap.py @@ -1,11 +1,10 @@ import logging -import trio from multiaddr import Multiaddr from multiaddr.resolvers import DNSResolver +import trio from libp2p.abc import ID, INetworkService, PeerInfo -from libp2p.discovery.bootstrap.utils import validate_bootstrap_addresses from libp2p.discovery.events.peerDiscovery import peerDiscovery from libp2p.network.exceptions import SwarmException from libp2p.peer.peerinfo import info_from_p2p_addr @@ -17,27 +16,39 @@ resolver = DNSResolver() class BootstrapDiscovery: """ Bootstrap-based peer discovery for py-libp2p. - + Uses Trio nurseries for parallel address resolution and connection attempts. Connects to predefined bootstrap peers and adds them to peerstore. """ def __init__(self, swarm: INetworkService, bootstrap_addrs: list[str]): + """ + Initialize BootstrapDiscovery. + + Args: + swarm: The network service (swarm) instance + bootstrap_addrs: List of bootstrap peer multiaddresses + + Note: Connection maintenance is always enabled to ensure reliable connectivity. + + """ self.swarm = swarm self.peerstore = swarm.peerstore self.bootstrap_addrs = bootstrap_addrs or [] self.discovered_peers: set[str] = set() + self.connected_bootstrap_peers: set[ID] = set() + self._disconnect_monitor_running = False async def start(self) -> None: """Process bootstrap addresses and emit peer discovery events in parallel.""" logger.info( - f"🚀 Starting bootstrap discovery with " + f"Starting bootstrap discovery with " f"{len(self.bootstrap_addrs)} bootstrap addresses" ) - + # Show all bootstrap addresses being processed for i, addr in enumerate(self.bootstrap_addrs): - logger.info(f"{i+1}. {addr}") + logger.info(f"{i + 1}. {addr}") # Allow other tasks to run await trio.lowlevel.checkpoint() @@ -50,30 +61,56 @@ class BootstrapDiscovery: await trio.lowlevel.checkpoint() # Use Trio nursery for PARALLEL address processing - async with trio.open_nursery() as nursery: - logger.info(f"Starting {len(self.bootstrap_addrs)} parallel address processing tasks") - - # Start all bootstrap address processing tasks in parallel - for addr_str in self.bootstrap_addrs: - logger.info(f"Starting parallel task for: {addr_str}") - nursery.start_soon(self._process_bootstrap_addr_safe, addr_str) - - # The nursery will wait for all address processing tasks to complete - logger.info("⏳ Nursery active - waiting for address processing tasks to complete") - - logger.info("✅ Bootstrap discovery startup complete - all tasks finished") + try: + async with trio.open_nursery() as nursery: + logger.info( + f"Starting {len(self.bootstrap_addrs)} parallel address " + f"processing tasks" + ) + + # Start all bootstrap address processing tasks in parallel + for addr_str in self.bootstrap_addrs: + logger.info(f"Starting parallel task for: {addr_str}") + nursery.start_soon(self._process_bootstrap_addr_safe, addr_str) + + # The nursery will wait for all address processing tasks to complete + logger.info( + "Nursery active - waiting for address processing tasks to complete" + ) + + except trio.Cancelled: + logger.info("Bootstrap address processing cancelled - cleaning up tasks") + raise + except Exception as e: + logger.error(f"Bootstrap address processing failed: {e}") + raise + + logger.info("Bootstrap discovery startup complete - all tasks finished") + + # Always start disconnect monitoring for reliable connectivity + if not self._disconnect_monitor_running: + trio.lowlevel.spawn_system_task(self._monitor_disconnections) def stop(self) -> None: - """Clean up bootstrap discovery resources.""" - logger.debug("Stopping bootstrap discovery") + """Clean up bootstrap discovery resources and stop all background tasks.""" + logger.info("Stopping bootstrap discovery and cleaning up tasks") + + # Clear discovered peers self.discovered_peers.clear() + self.connected_bootstrap_peers.clear() + + # Mark disconnect monitor as stopped + self._disconnect_monitor_running = False + + logger.debug("Bootstrap discovery cleanup completed") async def _process_bootstrap_addr_safe(self, addr_str: str) -> None: """Safely process a bootstrap address with exception handling.""" try: await self._process_bootstrap_addr(addr_str) except Exception as e: - logger.debug(f"Failed to process bootstrap address {addr_str}: {e}") + logger.warning(f"Failed to process bootstrap address {addr_str}: {e}") + # Ensure task cleanup and continue processing other addresses async def _process_bootstrap_addr(self, addr_str: str) -> None: """Convert string address to PeerInfo and add to peerstore.""" @@ -82,19 +119,19 @@ class BootstrapDiscovery: except Exception as e: logger.debug(f"Invalid multiaddr format '{addr_str}': {e}") return - + if self.is_dns_addr(multiaddr): # Allow other tasks to run during DNS resolution await trio.lowlevel.checkpoint() - + resolved_addrs = await resolver.resolve(multiaddr) if resolved_addrs is None: logger.warning(f"DNS resolution returned None for: {addr_str}") return - + # Allow other tasks to run after DNS resolution await trio.lowlevel.checkpoint() - + peer_id_str = multiaddr.get_peer_id() if peer_id_str is None: logger.warning(f"Missing peer ID in DNS address: {addr_str}") @@ -115,24 +152,70 @@ class BootstrapDiscovery: return any(protocol.name == "dnsaddr" for protocol in addr.protocols()) async def add_addr(self, peer_info: PeerInfo) -> None: - """Add a peer to the peerstore, emit discovery event, and attempt connection in parallel.""" - logger.info(f"📥 Adding peer to peerstore: {peer_info.peer_id}") - logger.info(f"📍 Total addresses received: {len(peer_info.addrs)}") - + """ + Add a peer to the peerstore, emit discovery event, + and attempt connection in parallel. + """ + logger.info(f"Adding peer to peerstore: {peer_info.peer_id}") + logger.info(f"Total addresses received: {len(peer_info.addrs)}") + # Skip if it's our own peer if peer_info.peer_id == self.swarm.get_peer_id(): logger.debug(f"Skipping own peer ID: {peer_info.peer_id}") return - - # Always add addresses to peerstore with TTL=0 (no expiration) - self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 0) + + # Filter addresses to only include IPv4+TCP (restrict dialing attempts) + ipv4_tcp_addrs = [] + filtered_out_addrs = [] + + for addr in peer_info.addrs: + if self._is_ipv4_tcp_addr(addr): + ipv4_tcp_addrs.append(addr) + else: + filtered_out_addrs.append(addr) + + # Log filtering results with fallback strategy details + logger.info(f"Address filtering for {peer_info.peer_id}:") + logger.info( + f"IPv4+TCP addresses: {len(ipv4_tcp_addrs)} " + f"(will be tried in sequence for fallback)" + ) + logger.info(f"Filtered out: {len(filtered_out_addrs)} (unsupported protocols)") + + # Show filtered addresses for debugging + if filtered_out_addrs: + for addr in filtered_out_addrs: + logger.debug(f"Filtered: {addr}") + + # Show addresses that will be used for fallback + if ipv4_tcp_addrs: + logger.debug("Addresses for fallback attempts:") + for i, addr in enumerate(ipv4_tcp_addrs, 1): + logger.debug(f" Fallback {i}: {addr}") + + # Skip peer if no IPv4+TCP addresses available + if not ipv4_tcp_addrs: + logger.warning( + f"❌ No IPv4+TCP addresses for {peer_info.peer_id} - " + f"skipping connection attempts" + ) + return + + logger.info( + f"Will attempt connection with automatic fallback through " + f"{len(ipv4_tcp_addrs)} IPv4+TCP addresses" + ) + + # Add only IPv4+TCP addresses to peerstore + # (restrict dialing to supported protocols) + self.peerstore.add_addrs(peer_info.peer_id, ipv4_tcp_addrs, 0) # Allow other tasks to run after adding to peerstore await trio.lowlevel.checkpoint() # Verify addresses were added stored_addrs = self.peerstore.addrs(peer_info.peer_id) - logger.info(f"✅ Addresses stored in peerstore: {len(stored_addrs)} addresses") + logger.info(f"Addresses stored in peerstore: {len(stored_addrs)} addresses") # Only emit discovery event if this is the first time we see this peer peer_id_str = str(peer_info.peer_id) @@ -143,24 +226,55 @@ class BootstrapDiscovery: peerDiscovery.emit_peer_discovered(peer_info) logger.debug(f"Peer discovered: {peer_info.peer_id}") - # Use nursery for parallel connection attempt - async with trio.open_nursery() as connection_nursery: - logger.info(f" 🔌 Starting parallel connection attempt...") - connection_nursery.start_soon(self._connect_to_peer, peer_info.peer_id) - + # Use nursery for parallel connection attempt (non-blocking) + try: + async with trio.open_nursery() as connection_nursery: + logger.info("Starting parallel connection attempt...") + connection_nursery.start_soon( + self._connect_to_peer, peer_info.peer_id + ) + except trio.Cancelled: + logger.debug(f"Connection attempt cancelled for {peer_info.peer_id}") + raise + except Exception as e: + logger.warning( + f"Connection nursery failed for {peer_info.peer_id}: {e}" + ) + else: - logger.debug(f"🔄 Additional addresses added for existing peer: {peer_info.peer_id}") + logger.debug( + f"Additional addresses added for existing peer: {peer_info.peer_id}" + ) # Even for existing peers, try to connect if not already connected if peer_info.peer_id not in self.swarm.connections: - logger.info(f"🔌 Starting parallel connection attempt for existing peer...") - # Use nursery for parallel connection - async with trio.open_nursery() as connection_nursery: - connection_nursery.start_soon(self._connect_to_peer, peer_info.peer_id) + logger.info("Starting parallel connection attempt for existing peer...") + # Use nursery for parallel connection attempt (non-blocking) + try: + async with trio.open_nursery() as connection_nursery: + connection_nursery.start_soon( + self._connect_to_peer, peer_info.peer_id + ) + except trio.Cancelled: + logger.debug( + f"Connection attempt cancelled for existing peer " + f"{peer_info.peer_id}" + ) + raise + except Exception as e: + logger.warning( + f"Connection nursery failed for existing peer " + f"{peer_info.peer_id}: {e}" + ) async def _connect_to_peer(self, peer_id: ID) -> None: - """Attempt to establish a connection to a peer using swarm.dial_peer.""" - logger.info(f"🔌 Connection attempt for peer: {peer_id}") - + """ + Attempt to establish a connection to a peer with fallback logic. + + Uses swarm.dial_peer which tries all available addresses for the peer + in sequence until one succeeds or all fail. + """ + logger.info(f"Connection attempt for peer: {peer_id}") + # Pre-connection validation: Check if already connected if peer_id in self.swarm.connections: logger.debug( @@ -173,33 +287,230 @@ class BootstrapDiscovery: # Check available addresses before attempting connection available_addrs = self.peerstore.addrs(peer_id) - logger.info(f"📍 Available addresses for {peer_id}: {len(available_addrs)} addresses") - + logger.info( + f"Available addresses for {peer_id}: {len(available_addrs)} addresses" + ) + + # Log all available addresses for transparency + for i, addr in enumerate(available_addrs, 1): + logger.debug(f" Address {i}: {addr}") + if not available_addrs: logger.error(f"❌ No addresses available for {peer_id} - cannot connect") return - try: - # Log connection attempt for monitoring and debugging - logger.debug(f"Attempting to connect to {peer_id}") + # Record start time for connection attempt monitoring + connection_start_time = trio.current_time() + + try: + # Log connection attempt with fallback details + logger.info( + f"Attempting connection to {peer_id} (will try {len(available_addrs)} " + f"addresses with automatic fallback)" + ) + + # Log each address that will be attempted + for i, addr in enumerate(available_addrs, 1): + logger.debug(f"Fallback address {i}: {addr}") + + # Use swarm.dial_peer - this automatically implements fallback logic: + # - Tries each address in sequence until one succeeds + # - Collects exceptions from failed attempts + # - Raises SwarmException with MultiError if all attempts fail + connection = await self.swarm.dial_peer(peer_id) + + # Calculate connection time + connection_time = trio.current_time() - connection_start_time - # Use swarm.dial_peer to establish connection - await self.swarm.dial_peer(peer_id) - # Allow other tasks to run after dial attempt await trio.lowlevel.checkpoint() - + # Post-connection validation: Verify connection was actually established if peer_id in self.swarm.connections: - logger.info(f"Connected to {peer_id}") + logger.info(f"✅ Connected to {peer_id} (took {connection_time:.2f}s)") + + # Track this as a connected bootstrap peer + self.connected_bootstrap_peers.add(peer_id) + + # Log which address was successful (if available) + if hasattr(connection, "get_transport_addresses"): + successful_addrs = connection.get_transport_addresses() + if successful_addrs: + logger.debug(f"Successful address: {successful_addrs[0]}") else: logger.warning(f"Dial succeeded but connection not found for {peer_id}") except SwarmException as e: - # Handle swarm-level connection errors - logger.warning(f"Failed to connect to {peer_id}: {e}") + # Calculate failed connection time + failed_connection_time = trio.current_time() - connection_start_time + + # Enhanced error logging with fallback details + error_msg = str(e) + if "no addresses established a successful connection" in error_msg: + logger.warning( + f"❌ Failed to connect to {peer_id} after trying all " + f"{len(available_addrs)} addresses " + f"(took {failed_connection_time:.2f}s) - " + f"all fallback attempts failed" + ) + # Log individual address failures if this is a MultiError + if ( + e.__cause__ is not None + and hasattr(e.__cause__, "exceptions") + and getattr(e.__cause__, "exceptions", None) is not None + ): + exceptions_list = getattr(e.__cause__, "exceptions") + logger.info("📋 Individual address failure details:") + for i, addr_exception in enumerate(exceptions_list, 1): + logger.info(f"Address {i}: {addr_exception}") + # Also log the actual address that failed + if i <= len(available_addrs): + logger.info(f"Failed address: {available_addrs[i - 1]}") + else: + logger.warning("No detailed exception information available") + else: + logger.warning( + f"❌ Failed to connect to {peer_id}: {e} " + f"(took {failed_connection_time:.2f}s)" + ) except Exception as e: # Handle unexpected errors that aren't swarm-specific - logger.error(f"Unexpected error connecting to {peer_id}: {e}") - raise \ No newline at end of file + failed_connection_time = trio.current_time() - connection_start_time + logger.error( + f"❌ Unexpected error connecting to {peer_id}: " + f"{e} (took {failed_connection_time:.2f}s)" + ) + # Don't re-raise to prevent killing the nursery and other parallel tasks + logger.debug("Continuing with other parallel connection attempts") + + async def _monitor_disconnections(self) -> None: + """ + Monitor bootstrap peer connections and immediately reconnect when they drop. + + This runs as a background task that efficiently detects + disconnections in real-time. + """ + self._disconnect_monitor_running = True + logger.info( + "Disconnect monitor started - will reconnect " + "immediately when connections drop" + ) + + try: + while True: + # Check for disconnections more frequently but efficiently + await trio.sleep(1.0) # Check every second for responsiveness + + # Check which bootstrap peers are no longer connected + disconnected_peers = [] + for peer_id in list(self.connected_bootstrap_peers): + if peer_id not in self.swarm.connections: + disconnected_peers.append(peer_id) + self.connected_bootstrap_peers.discard(peer_id) + logger.info( + f"⚠️ Detected disconnection from bootstrap peer: {peer_id}" + ) + + # Immediately reconnect to disconnected peers + if disconnected_peers: + logger.info( + f"🔄 Immediately reconnecting to {len(disconnected_peers)} " + f"disconnected bootstrap peer(s)" + ) + + # Reconnect in parallel for better performance + try: + async with trio.open_nursery() as reconnect_nursery: + for peer_id in disconnected_peers: + logger.info(f"🔌 Reconnecting to {peer_id}") + reconnect_nursery.start_soon( + self._reconnect_to_peer, peer_id + ) + except trio.Cancelled: + logger.debug("Reconnection nursery cancelled") + raise + except Exception as e: + logger.warning(f"Reconnection nursery failed: {e}") + + except trio.Cancelled: + logger.info("Disconnect monitor stopped - task cancelled") + except Exception as e: + logger.error(f"Unexpected error in disconnect monitor: {e}") + finally: + self._disconnect_monitor_running = False + logger.debug("Disconnect monitor task cleanup completed") + + async def _reconnect_to_peer(self, peer_id: ID) -> None: + """ + Reconnect to a specific bootstrap peer with backoff on failure. + + This method includes simple backoff logic to avoid overwhelming + peers that may be temporarily unavailable. + """ + max_attempts = 3 + base_delay = 1.0 + + try: + for attempt in range(1, max_attempts + 1): + try: + logger.debug( + f"Reconnection attempt {attempt}/{max_attempts} for {peer_id}" + ) + await self._connect_to_peer(peer_id) + + # If we get here, connection was successful + if peer_id in self.swarm.connections: + logger.info( + f"✅ Successfully reconnected to {peer_id} on " + f"attempt {attempt}" + ) + return + + except Exception as e: + logger.debug( + f"Reconnection attempt {attempt} failed for {peer_id}: {e}" + ) + + # Wait before next attempt (exponential backoff) + if attempt < max_attempts: + delay = base_delay * (2 ** (attempt - 1)) # 1s, 2s, 4s + logger.debug( + f"Waiting {delay}s before next reconnection attempt" + ) + await trio.sleep(delay) + + logger.warning( + f"❌ Failed to reconnect to {peer_id} after {max_attempts} attempts" + ) + + except Exception as e: + # Catch any unexpected errors to prevent crashing the nursery + logger.error(f"❌ Unexpected error during reconnection to {peer_id}: {e}") + # Don't re-raise to keep other parallel reconnection tasks running + + def _is_ipv4_tcp_addr(self, addr: Multiaddr) -> bool: + """ + Check if address is IPv4 with TCP protocol only. + + This restricts dialing attempts to addresses that conform to IPv4+TCP, + filtering out IPv6, UDP, QUIC, WebSocket, and other unsupported protocols. + """ + try: + protocols = addr.protocols() + + # Must have IPv4 protocol + has_ipv4 = any(p.name == "ip4" for p in protocols) + if not has_ipv4: + return False + + # Must have TCP protocol + has_tcp = any(p.name == "tcp" for p in protocols) + if not has_tcp: + return False + + return True + + except Exception: + # If we can't parse the address, don't use it + return False From c940dac1e6749e5dd4b745bc8aaf6e6755b3624d Mon Sep 17 00:00:00 2001 From: ankur12-1610 Date: Tue, 26 Aug 2025 01:41:26 +0530 Subject: [PATCH 4/8] simplify bootstrap discovery with optimized timeouts --- libp2p/discovery/bootstrap/bootstrap.py | 256 +++++++++--------------- 1 file changed, 90 insertions(+), 166 deletions(-) diff --git a/libp2p/discovery/bootstrap/bootstrap.py b/libp2p/discovery/bootstrap/bootstrap.py index e38e5eeb..c1a6cbbc 100644 --- a/libp2p/discovery/bootstrap/bootstrap.py +++ b/libp2p/discovery/bootstrap/bootstrap.py @@ -17,8 +17,8 @@ class BootstrapDiscovery: """ Bootstrap-based peer discovery for py-libp2p. - Uses Trio nurseries for parallel address resolution and connection attempts. - Connects to predefined bootstrap peers and adds them to peerstore. + Processes bootstrap addresses in parallel and attempts initial connections. + Adds discovered peers to peerstore for network bootstrapping. """ def __init__(self, swarm: INetworkService, bootstrap_addrs: list[str]): @@ -29,15 +29,15 @@ class BootstrapDiscovery: swarm: The network service (swarm) instance bootstrap_addrs: List of bootstrap peer multiaddresses - Note: Connection maintenance is always enabled to ensure reliable connectivity. - """ self.swarm = swarm self.peerstore = swarm.peerstore self.bootstrap_addrs = bootstrap_addrs or [] self.discovered_peers: set[str] = set() - self.connected_bootstrap_peers: set[ID] = set() - self._disconnect_monitor_running = False + self.connection_timeout: int = 10 + self.connected_peers: set[ID] = ( + set() + ) # Track connected peers for drop detection async def start(self) -> None: """Process bootstrap addresses and emit peer discovery events in parallel.""" @@ -71,7 +71,7 @@ class BootstrapDiscovery: # Start all bootstrap address processing tasks in parallel for addr_str in self.bootstrap_addrs: logger.info(f"Starting parallel task for: {addr_str}") - nursery.start_soon(self._process_bootstrap_addr_safe, addr_str) + nursery.start_soon(self._process_bootstrap_addr, addr_str) # The nursery will wait for all address processing tasks to complete logger.info( @@ -87,20 +87,13 @@ class BootstrapDiscovery: logger.info("Bootstrap discovery startup complete - all tasks finished") - # Always start disconnect monitoring for reliable connectivity - if not self._disconnect_monitor_running: - trio.lowlevel.spawn_system_task(self._monitor_disconnections) - def stop(self) -> None: - """Clean up bootstrap discovery resources and stop all background tasks.""" + """Clean up bootstrap discovery resources.""" logger.info("Stopping bootstrap discovery and cleaning up tasks") # Clear discovered peers self.discovered_peers.clear() - self.connected_bootstrap_peers.clear() - - # Mark disconnect monitor as stopped - self._disconnect_monitor_running = False + self.connected_peers.clear() logger.debug("Bootstrap discovery cleanup completed") @@ -164,7 +157,7 @@ class BootstrapDiscovery: logger.debug(f"Skipping own peer ID: {peer_info.peer_id}") return - # Filter addresses to only include IPv4+TCP (restrict dialing attempts) + # Filter addresses to only include IPv4+TCP (only supported protocol) ipv4_tcp_addrs = [] filtered_out_addrs = [] @@ -174,12 +167,9 @@ class BootstrapDiscovery: else: filtered_out_addrs.append(addr) - # Log filtering results with fallback strategy details + # Log filtering results logger.info(f"Address filtering for {peer_info.peer_id}:") - logger.info( - f"IPv4+TCP addresses: {len(ipv4_tcp_addrs)} " - f"(will be tried in sequence for fallback)" - ) + logger.info(f"IPv4+TCP addresses: {len(ipv4_tcp_addrs)}") logger.info(f"Filtered out: {len(filtered_out_addrs)} (unsupported protocols)") # Show filtered addresses for debugging @@ -187,11 +177,11 @@ class BootstrapDiscovery: for addr in filtered_out_addrs: logger.debug(f"Filtered: {addr}") - # Show addresses that will be used for fallback + # Show addresses that will be used if ipv4_tcp_addrs: - logger.debug("Addresses for fallback attempts:") + logger.debug("Usable addresses:") for i, addr in enumerate(ipv4_tcp_addrs, 1): - logger.debug(f" Fallback {i}: {addr}") + logger.debug(f" Address {i}: {addr}") # Skip peer if no IPv4+TCP addresses available if not ipv4_tcp_addrs: @@ -202,12 +192,10 @@ class BootstrapDiscovery: return logger.info( - f"Will attempt connection with automatic fallback through " - f"{len(ipv4_tcp_addrs)} IPv4+TCP addresses" + f"Will attempt connection using {len(ipv4_tcp_addrs)} IPv4+TCP addresses" ) # Add only IPv4+TCP addresses to peerstore - # (restrict dialing to supported protocols) self.peerstore.add_addrs(peer_info.peer_id, ipv4_tcp_addrs, 0) # Allow other tasks to run after adding to peerstore @@ -268,10 +256,10 @@ class BootstrapDiscovery: async def _connect_to_peer(self, peer_id: ID) -> None: """ - Attempt to establish a connection to a peer with fallback logic. + Attempt to establish a connection to a peer with timeout. - Uses swarm.dial_peer which tries all available addresses for the peer - in sequence until one succeeds or all fail. + Uses swarm.dial_peer to connect using addresses stored in peerstore. + Times out after connection_timeout seconds to prevent hanging. """ logger.info(f"Connection attempt for peer: {peer_id}") @@ -303,55 +291,64 @@ class BootstrapDiscovery: connection_start_time = trio.current_time() try: - # Log connection attempt with fallback details - logger.info( - f"Attempting connection to {peer_id} (will try {len(available_addrs)} " - f"addresses with automatic fallback)" + with trio.move_on_after(self.connection_timeout): + # Log connection attempt + logger.info( + f"Attempting connection to {peer_id} using " + f"{len(available_addrs)} addresses" + ) + + # Log each address that will be attempted + for i, addr in enumerate(available_addrs, 1): + logger.debug(f"Address {i}: {addr}") + + # Use swarm.dial_peer to connect using stored addresses + connection = await self.swarm.dial_peer(peer_id) + + # Calculate connection time + connection_time = trio.current_time() - connection_start_time + + # Allow other tasks to run after dial attempt + await trio.lowlevel.checkpoint() + + # Post-connection validation: Verify connection was actually established + if peer_id in self.swarm.connections: + logger.info( + f"✅ Connected to {peer_id} (took {connection_time:.2f}s)" + ) + + # Track this connection for drop monitoring + self.connected_peers.add(peer_id) + + # Start monitoring this specific connection for drops + trio.lowlevel.spawn_system_task( + self._monitor_peer_connection, peer_id + ) + + # Log which address was successful (if available) + if hasattr(connection, "get_transport_addresses"): + successful_addrs = connection.get_transport_addresses() + if successful_addrs: + logger.debug(f"Successful address: {successful_addrs[0]}") + else: + logger.warning( + f"Dial succeeded but connection not found for {peer_id}" + ) + except trio.TooSlowError: + logger.warning( + f"❌ Connection to {peer_id} timed out after {self.connection_timeout}s" ) - - # Log each address that will be attempted - for i, addr in enumerate(available_addrs, 1): - logger.debug(f"Fallback address {i}: {addr}") - - # Use swarm.dial_peer - this automatically implements fallback logic: - # - Tries each address in sequence until one succeeds - # - Collects exceptions from failed attempts - # - Raises SwarmException with MultiError if all attempts fail - connection = await self.swarm.dial_peer(peer_id) - - # Calculate connection time - connection_time = trio.current_time() - connection_start_time - - # Allow other tasks to run after dial attempt - await trio.lowlevel.checkpoint() - - # Post-connection validation: Verify connection was actually established - if peer_id in self.swarm.connections: - logger.info(f"✅ Connected to {peer_id} (took {connection_time:.2f}s)") - - # Track this as a connected bootstrap peer - self.connected_bootstrap_peers.add(peer_id) - - # Log which address was successful (if available) - if hasattr(connection, "get_transport_addresses"): - successful_addrs = connection.get_transport_addresses() - if successful_addrs: - logger.debug(f"Successful address: {successful_addrs[0]}") - else: - logger.warning(f"Dial succeeded but connection not found for {peer_id}") - except SwarmException as e: # Calculate failed connection time failed_connection_time = trio.current_time() - connection_start_time - # Enhanced error logging with fallback details + # Enhanced error logging error_msg = str(e) if "no addresses established a successful connection" in error_msg: logger.warning( f"❌ Failed to connect to {peer_id} after trying all " f"{len(available_addrs)} addresses " - f"(took {failed_connection_time:.2f}s) - " - f"all fallback attempts failed" + f"(took {failed_connection_time:.2f}s)" ) # Log individual address failures if this is a MultiError if ( @@ -384,117 +381,44 @@ class BootstrapDiscovery: # Don't re-raise to prevent killing the nursery and other parallel tasks logger.debug("Continuing with other parallel connection attempts") - async def _monitor_disconnections(self) -> None: + async def _monitor_peer_connection(self, peer_id: ID) -> None: """ - Monitor bootstrap peer connections and immediately reconnect when they drop. + Monitor a specific peer connection for drops using event-driven detection. - This runs as a background task that efficiently detects - disconnections in real-time. + Waits for the connection to be removed from swarm.connections, which + happens when error 4101 or other connection errors occur. """ - self._disconnect_monitor_running = True - logger.info( - "Disconnect monitor started - will reconnect " - "immediately when connections drop" - ) + logger.debug(f"🔍 Started monitoring connection to {peer_id}") try: - while True: - # Check for disconnections more frequently but efficiently - await trio.sleep(1.0) # Check every second for responsiveness + # Wait for the connection to disappear (event-driven) + while peer_id in self.swarm.connections: + await trio.sleep(0.1) # Small sleep to yield control - # Check which bootstrap peers are no longer connected - disconnected_peers = [] - for peer_id in list(self.connected_bootstrap_peers): - if peer_id not in self.swarm.connections: - disconnected_peers.append(peer_id) - self.connected_bootstrap_peers.discard(peer_id) - logger.info( - f"⚠️ Detected disconnection from bootstrap peer: {peer_id}" - ) + # Connection was dropped - log it immediately + if peer_id in self.connected_peers: + self.connected_peers.discard(peer_id) + logger.warning( + f"📡 Connection to {peer_id} was dropped! (detected event-driven)" + ) - # Immediately reconnect to disconnected peers - if disconnected_peers: - logger.info( - f"🔄 Immediately reconnecting to {len(disconnected_peers)} " - f"disconnected bootstrap peer(s)" - ) - - # Reconnect in parallel for better performance - try: - async with trio.open_nursery() as reconnect_nursery: - for peer_id in disconnected_peers: - logger.info(f"🔌 Reconnecting to {peer_id}") - reconnect_nursery.start_soon( - self._reconnect_to_peer, peer_id - ) - except trio.Cancelled: - logger.debug("Reconnection nursery cancelled") - raise - except Exception as e: - logger.warning(f"Reconnection nursery failed: {e}") + # Log current connection count + remaining_connections = len(self.connected_peers) + logger.info(f"📊 Remaining connected peers: {remaining_connections}") except trio.Cancelled: - logger.info("Disconnect monitor stopped - task cancelled") + logger.debug(f"Connection monitoring for {peer_id} stopped") except Exception as e: - logger.error(f"Unexpected error in disconnect monitor: {e}") - finally: - self._disconnect_monitor_running = False - logger.debug("Disconnect monitor task cleanup completed") - - async def _reconnect_to_peer(self, peer_id: ID) -> None: - """ - Reconnect to a specific bootstrap peer with backoff on failure. - - This method includes simple backoff logic to avoid overwhelming - peers that may be temporarily unavailable. - """ - max_attempts = 3 - base_delay = 1.0 - - try: - for attempt in range(1, max_attempts + 1): - try: - logger.debug( - f"Reconnection attempt {attempt}/{max_attempts} for {peer_id}" - ) - await self._connect_to_peer(peer_id) - - # If we get here, connection was successful - if peer_id in self.swarm.connections: - logger.info( - f"✅ Successfully reconnected to {peer_id} on " - f"attempt {attempt}" - ) - return - - except Exception as e: - logger.debug( - f"Reconnection attempt {attempt} failed for {peer_id}: {e}" - ) - - # Wait before next attempt (exponential backoff) - if attempt < max_attempts: - delay = base_delay * (2 ** (attempt - 1)) # 1s, 2s, 4s - logger.debug( - f"Waiting {delay}s before next reconnection attempt" - ) - await trio.sleep(delay) - - logger.warning( - f"❌ Failed to reconnect to {peer_id} after {max_attempts} attempts" - ) - - except Exception as e: - # Catch any unexpected errors to prevent crashing the nursery - logger.error(f"❌ Unexpected error during reconnection to {peer_id}: {e}") - # Don't re-raise to keep other parallel reconnection tasks running + logger.error(f"Error monitoring connection to {peer_id}: {e}") + # Clean up tracking on error + self.connected_peers.discard(peer_id) def _is_ipv4_tcp_addr(self, addr: Multiaddr) -> bool: """ Check if address is IPv4 with TCP protocol only. - This restricts dialing attempts to addresses that conform to IPv4+TCP, - filtering out IPv6, UDP, QUIC, WebSocket, and other unsupported protocols. + Filters out IPv6, UDP, QUIC, WebSocket, and other unsupported protocols. + Only IPv4+TCP addresses are supported by the current transport. """ try: protocols = addr.protocols() From 3d1c36419c84e9694496a9b689f717be7d410de7 Mon Sep 17 00:00:00 2001 From: ankur12-1610 Date: Fri, 29 Aug 2025 02:05:34 +0530 Subject: [PATCH 5/8] remove checkpoints, resolve logs, ttl and fix minor issues --- libp2p/discovery/bootstrap/bootstrap.py | 212 ++++++------------------ 1 file changed, 54 insertions(+), 158 deletions(-) diff --git a/libp2p/discovery/bootstrap/bootstrap.py b/libp2p/discovery/bootstrap/bootstrap.py index c1a6cbbc..9bf4ef52 100644 --- a/libp2p/discovery/bootstrap/bootstrap.py +++ b/libp2p/discovery/bootstrap/bootstrap.py @@ -5,20 +5,19 @@ from multiaddr.resolvers import DNSResolver import trio from libp2p.abc import ID, INetworkService, PeerInfo +from libp2p.discovery.bootstrap.utils import validate_bootstrap_addresses from libp2p.discovery.events.peerDiscovery import peerDiscovery -from libp2p.network.exceptions import SwarmException from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.peer.peerstore import PERMANENT_ADDR_TTL +from libp2p.network.exceptions import SwarmException logger = logging.getLogger("libp2p.discovery.bootstrap") resolver = DNSResolver() - class BootstrapDiscovery: """ Bootstrap-based peer discovery for py-libp2p. - - Processes bootstrap addresses in parallel and attempts initial connections. - Adds discovered peers to peerstore for network bootstrapping. + Connects to predefined bootstrap peers and adds them to peerstore. """ def __init__(self, swarm: INetworkService, bootstrap_addrs: list[str]): @@ -35,9 +34,6 @@ class BootstrapDiscovery: self.bootstrap_addrs = bootstrap_addrs or [] self.discovered_peers: set[str] = set() self.connection_timeout: int = 10 - self.connected_peers: set[ID] = ( - set() - ) # Track connected peers for drop detection async def start(self) -> None: """Process bootstrap addresses and emit peer discovery events in parallel.""" @@ -48,38 +44,32 @@ class BootstrapDiscovery: # Show all bootstrap addresses being processed for i, addr in enumerate(self.bootstrap_addrs): - logger.info(f"{i + 1}. {addr}") - - # Allow other tasks to run - await trio.lowlevel.checkpoint() + logger.debug(f"{i + 1}. {addr}") # Validate and filter bootstrap addresses - # self.bootstrap_addrs = validate_bootstrap_addresses(self.bootstrap_addrs) + self.bootstrap_addrs = validate_bootstrap_addresses(self.bootstrap_addrs) logger.info(f"Valid addresses after validation: {len(self.bootstrap_addrs)}") - # Allow other tasks to run after validation - await trio.lowlevel.checkpoint() - # Use Trio nursery for PARALLEL address processing try: async with trio.open_nursery() as nursery: - logger.info( + logger.debug( f"Starting {len(self.bootstrap_addrs)} parallel address " f"processing tasks" ) # Start all bootstrap address processing tasks in parallel for addr_str in self.bootstrap_addrs: - logger.info(f"Starting parallel task for: {addr_str}") + logger.debug(f"Starting parallel task for: {addr_str}") nursery.start_soon(self._process_bootstrap_addr, addr_str) # The nursery will wait for all address processing tasks to complete - logger.info( + logger.debug( "Nursery active - waiting for address processing tasks to complete" ) except trio.Cancelled: - logger.info("Bootstrap address processing cancelled - cleaning up tasks") + logger.debug("Bootstrap address processing cancelled - cleaning up tasks") raise except Exception as e: logger.error(f"Bootstrap address processing failed: {e}") @@ -93,52 +83,40 @@ class BootstrapDiscovery: # Clear discovered peers self.discovered_peers.clear() - self.connected_peers.clear() logger.debug("Bootstrap discovery cleanup completed") - async def _process_bootstrap_addr_safe(self, addr_str: str) -> None: - """Safely process a bootstrap address with exception handling.""" - try: - await self._process_bootstrap_addr(addr_str) - except Exception as e: - logger.warning(f"Failed to process bootstrap address {addr_str}: {e}") - # Ensure task cleanup and continue processing other addresses - async def _process_bootstrap_addr(self, addr_str: str) -> None: """Convert string address to PeerInfo and add to peerstore.""" try: - multiaddr = Multiaddr(addr_str) + try: + multiaddr = Multiaddr(addr_str) + except Exception as e: + logger.debug(f"Invalid multiaddr format '{addr_str}': {e}") + return + + if self.is_dns_addr(multiaddr): + resolved_addrs = await resolver.resolve(multiaddr) + if resolved_addrs is None: + logger.warning(f"DNS resolution returned None for: {addr_str}") + return + + peer_id_str = multiaddr.get_peer_id() + if peer_id_str is None: + logger.warning(f"Missing peer ID in DNS address: {addr_str}") + return + peer_id = ID.from_base58(peer_id_str) + addrs = [addr for addr in resolved_addrs] + if not addrs: + logger.warning(f"No addresses resolved for DNS address: {addr_str}") + return + peer_info = PeerInfo(peer_id, addrs) + await self.add_addr(peer_info) + else: + peer_info = info_from_p2p_addr(multiaddr) + await self.add_addr(peer_info) except Exception as e: - logger.debug(f"Invalid multiaddr format '{addr_str}': {e}") - return - - if self.is_dns_addr(multiaddr): - # Allow other tasks to run during DNS resolution - await trio.lowlevel.checkpoint() - - resolved_addrs = await resolver.resolve(multiaddr) - if resolved_addrs is None: - logger.warning(f"DNS resolution returned None for: {addr_str}") - return - - # Allow other tasks to run after DNS resolution - await trio.lowlevel.checkpoint() - - peer_id_str = multiaddr.get_peer_id() - if peer_id_str is None: - logger.warning(f"Missing peer ID in DNS address: {addr_str}") - return - peer_id = ID.from_base58(peer_id_str) - addrs = [addr for addr in resolved_addrs] - if not addrs: - logger.warning(f"No addresses resolved for DNS address: {addr_str}") - return - peer_info = PeerInfo(peer_id, addrs) - await self.add_addr(peer_info) - else: - peer_info = info_from_p2p_addr(multiaddr) - await self.add_addr(peer_info) + logger.warning(f"Failed to process bootstrap address {addr_str}: {e}") def is_dns_addr(self, addr: Multiaddr) -> bool: """Check if the address is a DNS address.""" @@ -149,8 +127,9 @@ class BootstrapDiscovery: Add a peer to the peerstore, emit discovery event, and attempt connection in parallel. """ - logger.info(f"Adding peer to peerstore: {peer_info.peer_id}") - logger.info(f"Total addresses received: {len(peer_info.addrs)}") + logger.debug( + f"Adding peer {peer_info.peer_id} with {len(peer_info.addrs)} addresses" + ) # Skip if it's our own peer if peer_info.peer_id == self.swarm.get_peer_id(): @@ -168,20 +147,10 @@ class BootstrapDiscovery: filtered_out_addrs.append(addr) # Log filtering results - logger.info(f"Address filtering for {peer_info.peer_id}:") - logger.info(f"IPv4+TCP addresses: {len(ipv4_tcp_addrs)}") - logger.info(f"Filtered out: {len(filtered_out_addrs)} (unsupported protocols)") - - # Show filtered addresses for debugging - if filtered_out_addrs: - for addr in filtered_out_addrs: - logger.debug(f"Filtered: {addr}") - - # Show addresses that will be used - if ipv4_tcp_addrs: - logger.debug("Usable addresses:") - for i, addr in enumerate(ipv4_tcp_addrs, 1): - logger.debug(f" Address {i}: {addr}") + logger.debug( + f"Address filtering for {peer_info.peer_id}: " + f"{len(ipv4_tcp_addrs)} IPv4+TCP, {len(filtered_out_addrs)} filtered" + ) # Skip peer if no IPv4+TCP addresses available if not ipv4_tcp_addrs: @@ -191,19 +160,8 @@ class BootstrapDiscovery: ) return - logger.info( - f"Will attempt connection using {len(ipv4_tcp_addrs)} IPv4+TCP addresses" - ) - # Add only IPv4+TCP addresses to peerstore - self.peerstore.add_addrs(peer_info.peer_id, ipv4_tcp_addrs, 0) - - # Allow other tasks to run after adding to peerstore - await trio.lowlevel.checkpoint() - - # Verify addresses were added - stored_addrs = self.peerstore.addrs(peer_info.peer_id) - logger.info(f"Addresses stored in peerstore: {len(stored_addrs)} addresses") + self.peerstore.add_addrs(peer_info.peer_id, ipv4_tcp_addrs, PERMANENT_ADDR_TTL) # Only emit discovery event if this is the first time we see this peer peer_id_str = str(peer_info.peer_id) @@ -212,12 +170,12 @@ class BootstrapDiscovery: self.discovered_peers.add(peer_id_str) # Emit peer discovery event peerDiscovery.emit_peer_discovered(peer_info) - logger.debug(f"Peer discovered: {peer_info.peer_id}") + logger.info(f"Peer discovered: {peer_info.peer_id}") # Use nursery for parallel connection attempt (non-blocking) try: async with trio.open_nursery() as connection_nursery: - logger.info("Starting parallel connection attempt...") + logger.debug("Starting parallel connection attempt...") connection_nursery.start_soon( self._connect_to_peer, peer_info.peer_id ) @@ -235,7 +193,7 @@ class BootstrapDiscovery: ) # Even for existing peers, try to connect if not already connected if peer_info.peer_id not in self.swarm.connections: - logger.info("Starting parallel connection attempt for existing peer...") + logger.debug("Starting parallel connection attempt for existing peer...") # Use nursery for parallel connection attempt (non-blocking) try: async with trio.open_nursery() as connection_nursery: @@ -261,7 +219,7 @@ class BootstrapDiscovery: Uses swarm.dial_peer to connect using addresses stored in peerstore. Times out after connection_timeout seconds to prevent hanging. """ - logger.info(f"Connection attempt for peer: {peer_id}") + logger.debug(f"Connection attempt for peer: {peer_id}") # Pre-connection validation: Check if already connected if peer_id in self.swarm.connections: @@ -270,18 +228,9 @@ class BootstrapDiscovery: ) return - # Allow other tasks to run before connection attempt - await trio.lowlevel.checkpoint() - # Check available addresses before attempting connection available_addrs = self.peerstore.addrs(peer_id) - logger.info( - f"Available addresses for {peer_id}: {len(available_addrs)} addresses" - ) - - # Log all available addresses for transparency - for i, addr in enumerate(available_addrs, 1): - logger.debug(f" Address {i}: {addr}") + logger.debug(f"Connecting to {peer_id} ({len(available_addrs)} addresses)") if not available_addrs: logger.error(f"❌ No addresses available for {peer_id} - cannot connect") @@ -293,43 +242,23 @@ class BootstrapDiscovery: try: with trio.move_on_after(self.connection_timeout): # Log connection attempt - logger.info( + logger.debug( f"Attempting connection to {peer_id} using " f"{len(available_addrs)} addresses" ) - # Log each address that will be attempted - for i, addr in enumerate(available_addrs, 1): - logger.debug(f"Address {i}: {addr}") - # Use swarm.dial_peer to connect using stored addresses connection = await self.swarm.dial_peer(peer_id) # Calculate connection time connection_time = trio.current_time() - connection_start_time - # Allow other tasks to run after dial attempt - await trio.lowlevel.checkpoint() - # Post-connection validation: Verify connection was actually established if peer_id in self.swarm.connections: logger.info( f"✅ Connected to {peer_id} (took {connection_time:.2f}s)" ) - # Track this connection for drop monitoring - self.connected_peers.add(peer_id) - - # Start monitoring this specific connection for drops - trio.lowlevel.spawn_system_task( - self._monitor_peer_connection, peer_id - ) - - # Log which address was successful (if available) - if hasattr(connection, "get_transport_addresses"): - successful_addrs = connection.get_transport_addresses() - if successful_addrs: - logger.debug(f"Successful address: {successful_addrs[0]}") else: logger.warning( f"Dial succeeded but connection not found for {peer_id}" @@ -357,12 +286,12 @@ class BootstrapDiscovery: and getattr(e.__cause__, "exceptions", None) is not None ): exceptions_list = getattr(e.__cause__, "exceptions") - logger.info("📋 Individual address failure details:") + logger.debug("📋 Individual address failure details:") for i, addr_exception in enumerate(exceptions_list, 1): - logger.info(f"Address {i}: {addr_exception}") + logger.debug(f"Address {i}: {addr_exception}") # Also log the actual address that failed if i <= len(available_addrs): - logger.info(f"Failed address: {available_addrs[i - 1]}") + logger.debug(f"Failed address: {available_addrs[i - 1]}") else: logger.warning("No detailed exception information available") else: @@ -379,39 +308,6 @@ class BootstrapDiscovery: f"{e} (took {failed_connection_time:.2f}s)" ) # Don't re-raise to prevent killing the nursery and other parallel tasks - logger.debug("Continuing with other parallel connection attempts") - - async def _monitor_peer_connection(self, peer_id: ID) -> None: - """ - Monitor a specific peer connection for drops using event-driven detection. - - Waits for the connection to be removed from swarm.connections, which - happens when error 4101 or other connection errors occur. - """ - logger.debug(f"🔍 Started monitoring connection to {peer_id}") - - try: - # Wait for the connection to disappear (event-driven) - while peer_id in self.swarm.connections: - await trio.sleep(0.1) # Small sleep to yield control - - # Connection was dropped - log it immediately - if peer_id in self.connected_peers: - self.connected_peers.discard(peer_id) - logger.warning( - f"📡 Connection to {peer_id} was dropped! (detected event-driven)" - ) - - # Log current connection count - remaining_connections = len(self.connected_peers) - logger.info(f"📊 Remaining connected peers: {remaining_connections}") - - except trio.Cancelled: - logger.debug(f"Connection monitoring for {peer_id} stopped") - except Exception as e: - logger.error(f"Error monitoring connection to {peer_id}: {e}") - # Clean up tracking on error - self.connected_peers.discard(peer_id) def _is_ipv4_tcp_addr(self, addr: Multiaddr) -> bool: """ From 997094e5b7a3e7554e9df2890f73ae4fca92cb19 Mon Sep 17 00:00:00 2001 From: ankur12-1610 Date: Fri, 29 Aug 2025 11:40:40 +0530 Subject: [PATCH 6/8] resolve linting errors --- libp2p/discovery/bootstrap/bootstrap.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/libp2p/discovery/bootstrap/bootstrap.py b/libp2p/discovery/bootstrap/bootstrap.py index 9bf4ef52..2bc79c5f 100644 --- a/libp2p/discovery/bootstrap/bootstrap.py +++ b/libp2p/discovery/bootstrap/bootstrap.py @@ -7,13 +7,14 @@ import trio from libp2p.abc import ID, INetworkService, PeerInfo from libp2p.discovery.bootstrap.utils import validate_bootstrap_addresses from libp2p.discovery.events.peerDiscovery import peerDiscovery +from libp2p.network.exceptions import SwarmException from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.peer.peerstore import PERMANENT_ADDR_TTL -from libp2p.network.exceptions import SwarmException logger = logging.getLogger("libp2p.discovery.bootstrap") resolver = DNSResolver() + class BootstrapDiscovery: """ Bootstrap-based peer discovery for py-libp2p. @@ -193,7 +194,9 @@ class BootstrapDiscovery: ) # Even for existing peers, try to connect if not already connected if peer_info.peer_id not in self.swarm.connections: - logger.debug("Starting parallel connection attempt for existing peer...") + logger.debug( + "Starting parallel connection attempt for existing peer..." + ) # Use nursery for parallel connection attempt (non-blocking) try: async with trio.open_nursery() as connection_nursery: @@ -248,7 +251,7 @@ class BootstrapDiscovery: ) # Use swarm.dial_peer to connect using stored addresses - connection = await self.swarm.dial_peer(peer_id) + await self.swarm.dial_peer(peer_id) # Calculate connection time connection_time = trio.current_time() - connection_start_time From 8f5dd3bd115cbf4469e8c6824fb76763e3971ab5 Mon Sep 17 00:00:00 2001 From: ankur12-1610 Date: Fri, 29 Aug 2025 17:28:50 +0530 Subject: [PATCH 7/8] remove excessive use of trio nursery --- libp2p/discovery/bootstrap/bootstrap.py | 45 +++++-------------------- 1 file changed, 9 insertions(+), 36 deletions(-) diff --git a/libp2p/discovery/bootstrap/bootstrap.py b/libp2p/discovery/bootstrap/bootstrap.py index 2bc79c5f..63985242 100644 --- a/libp2p/discovery/bootstrap/bootstrap.py +++ b/libp2p/discovery/bootstrap/bootstrap.py @@ -14,6 +14,8 @@ from libp2p.peer.peerstore import PERMANENT_ADDR_TTL logger = logging.getLogger("libp2p.discovery.bootstrap") resolver = DNSResolver() +DEFAULT_CONNECTION_TIMEOUT = 10 + class BootstrapDiscovery: """ @@ -34,7 +36,7 @@ class BootstrapDiscovery: self.peerstore = swarm.peerstore self.bootstrap_addrs = bootstrap_addrs or [] self.discovered_peers: set[str] = set() - self.connection_timeout: int = 10 + self.connection_timeout: int = DEFAULT_CONNECTION_TIMEOUT async def start(self) -> None: """Process bootstrap addresses and emit peer discovery events in parallel.""" @@ -173,20 +175,9 @@ class BootstrapDiscovery: peerDiscovery.emit_peer_discovered(peer_info) logger.info(f"Peer discovered: {peer_info.peer_id}") - # Use nursery for parallel connection attempt (non-blocking) - try: - async with trio.open_nursery() as connection_nursery: - logger.debug("Starting parallel connection attempt...") - connection_nursery.start_soon( - self._connect_to_peer, peer_info.peer_id - ) - except trio.Cancelled: - logger.debug(f"Connection attempt cancelled for {peer_info.peer_id}") - raise - except Exception as e: - logger.warning( - f"Connection nursery failed for {peer_info.peer_id}: {e}" - ) + # Connect to peer (parallel across different bootstrap addresses) + logger.debug("Connecting to discovered peer...") + await self._connect_to_peer(peer_info.peer_id) else: logger.debug( @@ -194,33 +185,15 @@ class BootstrapDiscovery: ) # Even for existing peers, try to connect if not already connected if peer_info.peer_id not in self.swarm.connections: - logger.debug( - "Starting parallel connection attempt for existing peer..." - ) - # Use nursery for parallel connection attempt (non-blocking) - try: - async with trio.open_nursery() as connection_nursery: - connection_nursery.start_soon( - self._connect_to_peer, peer_info.peer_id - ) - except trio.Cancelled: - logger.debug( - f"Connection attempt cancelled for existing peer " - f"{peer_info.peer_id}" - ) - raise - except Exception as e: - logger.warning( - f"Connection nursery failed for existing peer " - f"{peer_info.peer_id}: {e}" - ) + logger.debug("Connecting to existing peer...") + await self._connect_to_peer(peer_info.peer_id) async def _connect_to_peer(self, peer_id: ID) -> None: """ Attempt to establish a connection to a peer with timeout. Uses swarm.dial_peer to connect using addresses stored in peerstore. - Times out after connection_timeout seconds to prevent hanging. + Times out after self.connection_timeout seconds to prevent hanging. """ logger.debug(f"Connection attempt for peer: {peer_id}") From 37a4d96f902305af2f8baface46664c9787344b4 Mon Sep 17 00:00:00 2001 From: ankur12-1610 Date: Tue, 2 Sep 2025 22:23:11 +0530 Subject: [PATCH 8/8] add rst --- newsfragments/849.feature.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 newsfragments/849.feature.rst diff --git a/newsfragments/849.feature.rst b/newsfragments/849.feature.rst new file mode 100644 index 00000000..73ad1453 --- /dev/null +++ b/newsfragments/849.feature.rst @@ -0,0 +1 @@ +Add automatic peer dialing in bootstrap module using trio.Nursery.