From 3d1c36419c84e9694496a9b689f717be7d410de7 Mon Sep 17 00:00:00 2001 From: ankur12-1610 Date: Fri, 29 Aug 2025 02:05:34 +0530 Subject: [PATCH] remove checkpoints, resolve logs, ttl and fix minor issues --- libp2p/discovery/bootstrap/bootstrap.py | 212 ++++++------------------ 1 file changed, 54 insertions(+), 158 deletions(-) diff --git a/libp2p/discovery/bootstrap/bootstrap.py b/libp2p/discovery/bootstrap/bootstrap.py index c1a6cbbc..9bf4ef52 100644 --- a/libp2p/discovery/bootstrap/bootstrap.py +++ b/libp2p/discovery/bootstrap/bootstrap.py @@ -5,20 +5,19 @@ from multiaddr.resolvers import DNSResolver import trio from libp2p.abc import ID, INetworkService, PeerInfo +from libp2p.discovery.bootstrap.utils import validate_bootstrap_addresses from libp2p.discovery.events.peerDiscovery import peerDiscovery -from libp2p.network.exceptions import SwarmException from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.peer.peerstore import PERMANENT_ADDR_TTL +from libp2p.network.exceptions import SwarmException logger = logging.getLogger("libp2p.discovery.bootstrap") resolver = DNSResolver() - class BootstrapDiscovery: """ Bootstrap-based peer discovery for py-libp2p. - - Processes bootstrap addresses in parallel and attempts initial connections. - Adds discovered peers to peerstore for network bootstrapping. + Connects to predefined bootstrap peers and adds them to peerstore. """ def __init__(self, swarm: INetworkService, bootstrap_addrs: list[str]): @@ -35,9 +34,6 @@ class BootstrapDiscovery: self.bootstrap_addrs = bootstrap_addrs or [] self.discovered_peers: set[str] = set() self.connection_timeout: int = 10 - self.connected_peers: set[ID] = ( - set() - ) # Track connected peers for drop detection async def start(self) -> None: """Process bootstrap addresses and emit peer discovery events in parallel.""" @@ -48,38 +44,32 @@ class BootstrapDiscovery: # Show all bootstrap addresses being processed for i, addr in enumerate(self.bootstrap_addrs): - logger.info(f"{i + 1}. {addr}") - - # Allow other tasks to run - await trio.lowlevel.checkpoint() + logger.debug(f"{i + 1}. {addr}") # Validate and filter bootstrap addresses - # self.bootstrap_addrs = validate_bootstrap_addresses(self.bootstrap_addrs) + self.bootstrap_addrs = validate_bootstrap_addresses(self.bootstrap_addrs) logger.info(f"Valid addresses after validation: {len(self.bootstrap_addrs)}") - # Allow other tasks to run after validation - await trio.lowlevel.checkpoint() - # Use Trio nursery for PARALLEL address processing try: async with trio.open_nursery() as nursery: - logger.info( + logger.debug( f"Starting {len(self.bootstrap_addrs)} parallel address " f"processing tasks" ) # Start all bootstrap address processing tasks in parallel for addr_str in self.bootstrap_addrs: - logger.info(f"Starting parallel task for: {addr_str}") + logger.debug(f"Starting parallel task for: {addr_str}") nursery.start_soon(self._process_bootstrap_addr, addr_str) # The nursery will wait for all address processing tasks to complete - logger.info( + logger.debug( "Nursery active - waiting for address processing tasks to complete" ) except trio.Cancelled: - logger.info("Bootstrap address processing cancelled - cleaning up tasks") + logger.debug("Bootstrap address processing cancelled - cleaning up tasks") raise except Exception as e: logger.error(f"Bootstrap address processing failed: {e}") @@ -93,52 +83,40 @@ class BootstrapDiscovery: # Clear discovered peers self.discovered_peers.clear() - self.connected_peers.clear() logger.debug("Bootstrap discovery cleanup completed") - async def _process_bootstrap_addr_safe(self, addr_str: str) -> None: - """Safely process a bootstrap address with exception handling.""" - try: - await self._process_bootstrap_addr(addr_str) - except Exception as e: - logger.warning(f"Failed to process bootstrap address {addr_str}: {e}") - # Ensure task cleanup and continue processing other addresses - async def _process_bootstrap_addr(self, addr_str: str) -> None: """Convert string address to PeerInfo and add to peerstore.""" try: - multiaddr = Multiaddr(addr_str) + try: + multiaddr = Multiaddr(addr_str) + except Exception as e: + logger.debug(f"Invalid multiaddr format '{addr_str}': {e}") + return + + if self.is_dns_addr(multiaddr): + resolved_addrs = await resolver.resolve(multiaddr) + if resolved_addrs is None: + logger.warning(f"DNS resolution returned None for: {addr_str}") + return + + peer_id_str = multiaddr.get_peer_id() + if peer_id_str is None: + logger.warning(f"Missing peer ID in DNS address: {addr_str}") + return + peer_id = ID.from_base58(peer_id_str) + addrs = [addr for addr in resolved_addrs] + if not addrs: + logger.warning(f"No addresses resolved for DNS address: {addr_str}") + return + peer_info = PeerInfo(peer_id, addrs) + await self.add_addr(peer_info) + else: + peer_info = info_from_p2p_addr(multiaddr) + await self.add_addr(peer_info) except Exception as e: - logger.debug(f"Invalid multiaddr format '{addr_str}': {e}") - return - - if self.is_dns_addr(multiaddr): - # Allow other tasks to run during DNS resolution - await trio.lowlevel.checkpoint() - - resolved_addrs = await resolver.resolve(multiaddr) - if resolved_addrs is None: - logger.warning(f"DNS resolution returned None for: {addr_str}") - return - - # Allow other tasks to run after DNS resolution - await trio.lowlevel.checkpoint() - - peer_id_str = multiaddr.get_peer_id() - if peer_id_str is None: - logger.warning(f"Missing peer ID in DNS address: {addr_str}") - return - peer_id = ID.from_base58(peer_id_str) - addrs = [addr for addr in resolved_addrs] - if not addrs: - logger.warning(f"No addresses resolved for DNS address: {addr_str}") - return - peer_info = PeerInfo(peer_id, addrs) - await self.add_addr(peer_info) - else: - peer_info = info_from_p2p_addr(multiaddr) - await self.add_addr(peer_info) + logger.warning(f"Failed to process bootstrap address {addr_str}: {e}") def is_dns_addr(self, addr: Multiaddr) -> bool: """Check if the address is a DNS address.""" @@ -149,8 +127,9 @@ class BootstrapDiscovery: Add a peer to the peerstore, emit discovery event, and attempt connection in parallel. """ - logger.info(f"Adding peer to peerstore: {peer_info.peer_id}") - logger.info(f"Total addresses received: {len(peer_info.addrs)}") + logger.debug( + f"Adding peer {peer_info.peer_id} with {len(peer_info.addrs)} addresses" + ) # Skip if it's our own peer if peer_info.peer_id == self.swarm.get_peer_id(): @@ -168,20 +147,10 @@ class BootstrapDiscovery: filtered_out_addrs.append(addr) # Log filtering results - logger.info(f"Address filtering for {peer_info.peer_id}:") - logger.info(f"IPv4+TCP addresses: {len(ipv4_tcp_addrs)}") - logger.info(f"Filtered out: {len(filtered_out_addrs)} (unsupported protocols)") - - # Show filtered addresses for debugging - if filtered_out_addrs: - for addr in filtered_out_addrs: - logger.debug(f"Filtered: {addr}") - - # Show addresses that will be used - if ipv4_tcp_addrs: - logger.debug("Usable addresses:") - for i, addr in enumerate(ipv4_tcp_addrs, 1): - logger.debug(f" Address {i}: {addr}") + logger.debug( + f"Address filtering for {peer_info.peer_id}: " + f"{len(ipv4_tcp_addrs)} IPv4+TCP, {len(filtered_out_addrs)} filtered" + ) # Skip peer if no IPv4+TCP addresses available if not ipv4_tcp_addrs: @@ -191,19 +160,8 @@ class BootstrapDiscovery: ) return - logger.info( - f"Will attempt connection using {len(ipv4_tcp_addrs)} IPv4+TCP addresses" - ) - # Add only IPv4+TCP addresses to peerstore - self.peerstore.add_addrs(peer_info.peer_id, ipv4_tcp_addrs, 0) - - # Allow other tasks to run after adding to peerstore - await trio.lowlevel.checkpoint() - - # Verify addresses were added - stored_addrs = self.peerstore.addrs(peer_info.peer_id) - logger.info(f"Addresses stored in peerstore: {len(stored_addrs)} addresses") + self.peerstore.add_addrs(peer_info.peer_id, ipv4_tcp_addrs, PERMANENT_ADDR_TTL) # Only emit discovery event if this is the first time we see this peer peer_id_str = str(peer_info.peer_id) @@ -212,12 +170,12 @@ class BootstrapDiscovery: self.discovered_peers.add(peer_id_str) # Emit peer discovery event peerDiscovery.emit_peer_discovered(peer_info) - logger.debug(f"Peer discovered: {peer_info.peer_id}") + logger.info(f"Peer discovered: {peer_info.peer_id}") # Use nursery for parallel connection attempt (non-blocking) try: async with trio.open_nursery() as connection_nursery: - logger.info("Starting parallel connection attempt...") + logger.debug("Starting parallel connection attempt...") connection_nursery.start_soon( self._connect_to_peer, peer_info.peer_id ) @@ -235,7 +193,7 @@ class BootstrapDiscovery: ) # Even for existing peers, try to connect if not already connected if peer_info.peer_id not in self.swarm.connections: - logger.info("Starting parallel connection attempt for existing peer...") + logger.debug("Starting parallel connection attempt for existing peer...") # Use nursery for parallel connection attempt (non-blocking) try: async with trio.open_nursery() as connection_nursery: @@ -261,7 +219,7 @@ class BootstrapDiscovery: Uses swarm.dial_peer to connect using addresses stored in peerstore. Times out after connection_timeout seconds to prevent hanging. """ - logger.info(f"Connection attempt for peer: {peer_id}") + logger.debug(f"Connection attempt for peer: {peer_id}") # Pre-connection validation: Check if already connected if peer_id in self.swarm.connections: @@ -270,18 +228,9 @@ class BootstrapDiscovery: ) return - # Allow other tasks to run before connection attempt - await trio.lowlevel.checkpoint() - # Check available addresses before attempting connection available_addrs = self.peerstore.addrs(peer_id) - logger.info( - f"Available addresses for {peer_id}: {len(available_addrs)} addresses" - ) - - # Log all available addresses for transparency - for i, addr in enumerate(available_addrs, 1): - logger.debug(f" Address {i}: {addr}") + logger.debug(f"Connecting to {peer_id} ({len(available_addrs)} addresses)") if not available_addrs: logger.error(f"❌ No addresses available for {peer_id} - cannot connect") @@ -293,43 +242,23 @@ class BootstrapDiscovery: try: with trio.move_on_after(self.connection_timeout): # Log connection attempt - logger.info( + logger.debug( f"Attempting connection to {peer_id} using " f"{len(available_addrs)} addresses" ) - # Log each address that will be attempted - for i, addr in enumerate(available_addrs, 1): - logger.debug(f"Address {i}: {addr}") - # Use swarm.dial_peer to connect using stored addresses connection = await self.swarm.dial_peer(peer_id) # Calculate connection time connection_time = trio.current_time() - connection_start_time - # Allow other tasks to run after dial attempt - await trio.lowlevel.checkpoint() - # Post-connection validation: Verify connection was actually established if peer_id in self.swarm.connections: logger.info( f"✅ Connected to {peer_id} (took {connection_time:.2f}s)" ) - # Track this connection for drop monitoring - self.connected_peers.add(peer_id) - - # Start monitoring this specific connection for drops - trio.lowlevel.spawn_system_task( - self._monitor_peer_connection, peer_id - ) - - # Log which address was successful (if available) - if hasattr(connection, "get_transport_addresses"): - successful_addrs = connection.get_transport_addresses() - if successful_addrs: - logger.debug(f"Successful address: {successful_addrs[0]}") else: logger.warning( f"Dial succeeded but connection not found for {peer_id}" @@ -357,12 +286,12 @@ class BootstrapDiscovery: and getattr(e.__cause__, "exceptions", None) is not None ): exceptions_list = getattr(e.__cause__, "exceptions") - logger.info("📋 Individual address failure details:") + logger.debug("📋 Individual address failure details:") for i, addr_exception in enumerate(exceptions_list, 1): - logger.info(f"Address {i}: {addr_exception}") + logger.debug(f"Address {i}: {addr_exception}") # Also log the actual address that failed if i <= len(available_addrs): - logger.info(f"Failed address: {available_addrs[i - 1]}") + logger.debug(f"Failed address: {available_addrs[i - 1]}") else: logger.warning("No detailed exception information available") else: @@ -379,39 +308,6 @@ class BootstrapDiscovery: f"{e} (took {failed_connection_time:.2f}s)" ) # Don't re-raise to prevent killing the nursery and other parallel tasks - logger.debug("Continuing with other parallel connection attempts") - - async def _monitor_peer_connection(self, peer_id: ID) -> None: - """ - Monitor a specific peer connection for drops using event-driven detection. - - Waits for the connection to be removed from swarm.connections, which - happens when error 4101 or other connection errors occur. - """ - logger.debug(f"🔍 Started monitoring connection to {peer_id}") - - try: - # Wait for the connection to disappear (event-driven) - while peer_id in self.swarm.connections: - await trio.sleep(0.1) # Small sleep to yield control - - # Connection was dropped - log it immediately - if peer_id in self.connected_peers: - self.connected_peers.discard(peer_id) - logger.warning( - f"📡 Connection to {peer_id} was dropped! (detected event-driven)" - ) - - # Log current connection count - remaining_connections = len(self.connected_peers) - logger.info(f"📊 Remaining connected peers: {remaining_connections}") - - except trio.Cancelled: - logger.debug(f"Connection monitoring for {peer_id} stopped") - except Exception as e: - logger.error(f"Error monitoring connection to {peer_id}: {e}") - # Clean up tracking on error - self.connected_peers.discard(peer_id) def _is_ipv4_tcp_addr(self, addr: Multiaddr) -> bool: """