remove manual trio checkpoints, lower log verbosity, use permanent peerstore TTL, and fix minor issues

This commit is contained in:
ankur12-1610
2025-08-29 02:05:34 +05:30
parent c940dac1e6
commit 3d1c36419c

View File

@ -5,20 +5,19 @@ from multiaddr.resolvers import DNSResolver
import trio import trio
from libp2p.abc import ID, INetworkService, PeerInfo from libp2p.abc import ID, INetworkService, PeerInfo
from libp2p.discovery.bootstrap.utils import validate_bootstrap_addresses
from libp2p.discovery.events.peerDiscovery import peerDiscovery from libp2p.discovery.events.peerDiscovery import peerDiscovery
from libp2p.network.exceptions import SwarmException
from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.peer.peerstore import PERMANENT_ADDR_TTL
from libp2p.network.exceptions import SwarmException
logger = logging.getLogger("libp2p.discovery.bootstrap") logger = logging.getLogger("libp2p.discovery.bootstrap")
resolver = DNSResolver() resolver = DNSResolver()
class BootstrapDiscovery: class BootstrapDiscovery:
""" """
Bootstrap-based peer discovery for py-libp2p. Bootstrap-based peer discovery for py-libp2p.
Connects to predefined bootstrap peers and adds them to peerstore.
Processes bootstrap addresses in parallel and attempts initial connections.
Adds discovered peers to peerstore for network bootstrapping.
""" """
def __init__(self, swarm: INetworkService, bootstrap_addrs: list[str]): def __init__(self, swarm: INetworkService, bootstrap_addrs: list[str]):
@ -35,9 +34,6 @@ class BootstrapDiscovery:
self.bootstrap_addrs = bootstrap_addrs or [] self.bootstrap_addrs = bootstrap_addrs or []
self.discovered_peers: set[str] = set() self.discovered_peers: set[str] = set()
self.connection_timeout: int = 10 self.connection_timeout: int = 10
self.connected_peers: set[ID] = (
set()
) # Track connected peers for drop detection
async def start(self) -> None: async def start(self) -> None:
"""Process bootstrap addresses and emit peer discovery events in parallel.""" """Process bootstrap addresses and emit peer discovery events in parallel."""
@ -48,38 +44,32 @@ class BootstrapDiscovery:
# Show all bootstrap addresses being processed # Show all bootstrap addresses being processed
for i, addr in enumerate(self.bootstrap_addrs): for i, addr in enumerate(self.bootstrap_addrs):
logger.info(f"{i + 1}. {addr}") logger.debug(f"{i + 1}. {addr}")
# Allow other tasks to run
await trio.lowlevel.checkpoint()
# Validate and filter bootstrap addresses # Validate and filter bootstrap addresses
# self.bootstrap_addrs = validate_bootstrap_addresses(self.bootstrap_addrs) self.bootstrap_addrs = validate_bootstrap_addresses(self.bootstrap_addrs)
logger.info(f"Valid addresses after validation: {len(self.bootstrap_addrs)}") logger.info(f"Valid addresses after validation: {len(self.bootstrap_addrs)}")
# Allow other tasks to run after validation
await trio.lowlevel.checkpoint()
# Use Trio nursery for PARALLEL address processing # Use Trio nursery for PARALLEL address processing
try: try:
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
logger.info( logger.debug(
f"Starting {len(self.bootstrap_addrs)} parallel address " f"Starting {len(self.bootstrap_addrs)} parallel address "
f"processing tasks" f"processing tasks"
) )
# Start all bootstrap address processing tasks in parallel # Start all bootstrap address processing tasks in parallel
for addr_str in self.bootstrap_addrs: for addr_str in self.bootstrap_addrs:
logger.info(f"Starting parallel task for: {addr_str}") logger.debug(f"Starting parallel task for: {addr_str}")
nursery.start_soon(self._process_bootstrap_addr, addr_str) nursery.start_soon(self._process_bootstrap_addr, addr_str)
# The nursery will wait for all address processing tasks to complete # The nursery will wait for all address processing tasks to complete
logger.info( logger.debug(
"Nursery active - waiting for address processing tasks to complete" "Nursery active - waiting for address processing tasks to complete"
) )
except trio.Cancelled: except trio.Cancelled:
logger.info("Bootstrap address processing cancelled - cleaning up tasks") logger.debug("Bootstrap address processing cancelled - cleaning up tasks")
raise raise
except Exception as e: except Exception as e:
logger.error(f"Bootstrap address processing failed: {e}") logger.error(f"Bootstrap address processing failed: {e}")
@ -93,52 +83,40 @@ class BootstrapDiscovery:
# Clear discovered peers # Clear discovered peers
self.discovered_peers.clear() self.discovered_peers.clear()
self.connected_peers.clear()
logger.debug("Bootstrap discovery cleanup completed") logger.debug("Bootstrap discovery cleanup completed")
async def _process_bootstrap_addr_safe(self, addr_str: str) -> None:
    """Run ``_process_bootstrap_addr`` for one address, logging failures.

    Exceptions are caught and reported as warnings rather than propagated,
    so a single bad bootstrap address cannot cancel the sibling tasks that
    share the same nursery.
    """
    try:
        await self._process_bootstrap_addr(addr_str)
    except Exception as e:
        logger.warning(f"Failed to process bootstrap address {addr_str}: {e}")
async def _process_bootstrap_addr(self, addr_str: str) -> None:
    """Convert a bootstrap address string to ``PeerInfo`` and register it.

    DNS multiaddrs are resolved via the module-level ``resolver`` first;
    non-DNS addresses are parsed directly with ``info_from_p2p_addr``.
    Every failure path is logged and swallowed so one bad address cannot
    abort the other parallel bootstrap tasks.

    :param addr_str: bootstrap peer multiaddr string
        (e.g. ``/ip4/1.2.3.4/tcp/4001/p2p/Qm...``).
    """
    try:
        try:
            multiaddr = Multiaddr(addr_str)
        except Exception as e:
            # Malformed input is common enough that debug level suffices.
            logger.debug(f"Invalid multiaddr format '{addr_str}': {e}")
            return

        if self.is_dns_addr(multiaddr):
            resolved_addrs = await resolver.resolve(multiaddr)
            if resolved_addrs is None:
                logger.warning(f"DNS resolution returned None for: {addr_str}")
                return
            peer_id_str = multiaddr.get_peer_id()
            if peer_id_str is None:
                logger.warning(f"Missing peer ID in DNS address: {addr_str}")
                return
            peer_id = ID.from_base58(peer_id_str)
            # Materialize the resolver result once — it may be a lazy
            # iterable (list() replaces the redundant copy-comprehension).
            addrs = list(resolved_addrs)
            if not addrs:
                logger.warning(f"No addresses resolved for DNS address: {addr_str}")
                return
            peer_info = PeerInfo(peer_id, addrs)
            await self.add_addr(peer_info)
        else:
            peer_info = info_from_p2p_addr(multiaddr)
            await self.add_addr(peer_info)
    except Exception as e:
        # Last-resort guard: keep sibling nursery tasks alive on any error.
        logger.warning(f"Failed to process bootstrap address {addr_str}: {e}")
def is_dns_addr(self, addr: Multiaddr) -> bool: def is_dns_addr(self, addr: Multiaddr) -> bool:
"""Check if the address is a DNS address.""" """Check if the address is a DNS address."""
@ -149,8 +127,9 @@ class BootstrapDiscovery:
Add a peer to the peerstore, emit discovery event, Add a peer to the peerstore, emit discovery event,
and attempt connection in parallel. and attempt connection in parallel.
""" """
logger.info(f"Adding peer to peerstore: {peer_info.peer_id}") logger.debug(
logger.info(f"Total addresses received: {len(peer_info.addrs)}") f"Adding peer {peer_info.peer_id} with {len(peer_info.addrs)} addresses"
)
# Skip if it's our own peer # Skip if it's our own peer
if peer_info.peer_id == self.swarm.get_peer_id(): if peer_info.peer_id == self.swarm.get_peer_id():
@ -168,20 +147,10 @@ class BootstrapDiscovery:
filtered_out_addrs.append(addr) filtered_out_addrs.append(addr)
# Log filtering results # Log filtering results
logger.info(f"Address filtering for {peer_info.peer_id}:") logger.debug(
logger.info(f"IPv4+TCP addresses: {len(ipv4_tcp_addrs)}") f"Address filtering for {peer_info.peer_id}: "
logger.info(f"Filtered out: {len(filtered_out_addrs)} (unsupported protocols)") f"{len(ipv4_tcp_addrs)} IPv4+TCP, {len(filtered_out_addrs)} filtered"
)
# Show filtered addresses for debugging
if filtered_out_addrs:
for addr in filtered_out_addrs:
logger.debug(f"Filtered: {addr}")
# Show addresses that will be used
if ipv4_tcp_addrs:
logger.debug("Usable addresses:")
for i, addr in enumerate(ipv4_tcp_addrs, 1):
logger.debug(f" Address {i}: {addr}")
# Skip peer if no IPv4+TCP addresses available # Skip peer if no IPv4+TCP addresses available
if not ipv4_tcp_addrs: if not ipv4_tcp_addrs:
@ -191,19 +160,8 @@ class BootstrapDiscovery:
) )
return return
logger.info(
f"Will attempt connection using {len(ipv4_tcp_addrs)} IPv4+TCP addresses"
)
# Add only IPv4+TCP addresses to peerstore # Add only IPv4+TCP addresses to peerstore
self.peerstore.add_addrs(peer_info.peer_id, ipv4_tcp_addrs, 0) self.peerstore.add_addrs(peer_info.peer_id, ipv4_tcp_addrs, PERMANENT_ADDR_TTL)
# Allow other tasks to run after adding to peerstore
await trio.lowlevel.checkpoint()
# Verify addresses were added
stored_addrs = self.peerstore.addrs(peer_info.peer_id)
logger.info(f"Addresses stored in peerstore: {len(stored_addrs)} addresses")
# Only emit discovery event if this is the first time we see this peer # Only emit discovery event if this is the first time we see this peer
peer_id_str = str(peer_info.peer_id) peer_id_str = str(peer_info.peer_id)
@ -212,12 +170,12 @@ class BootstrapDiscovery:
self.discovered_peers.add(peer_id_str) self.discovered_peers.add(peer_id_str)
# Emit peer discovery event # Emit peer discovery event
peerDiscovery.emit_peer_discovered(peer_info) peerDiscovery.emit_peer_discovered(peer_info)
logger.debug(f"Peer discovered: {peer_info.peer_id}") logger.info(f"Peer discovered: {peer_info.peer_id}")
# Use nursery for parallel connection attempt (non-blocking) # Use nursery for parallel connection attempt (non-blocking)
try: try:
async with trio.open_nursery() as connection_nursery: async with trio.open_nursery() as connection_nursery:
logger.info("Starting parallel connection attempt...") logger.debug("Starting parallel connection attempt...")
connection_nursery.start_soon( connection_nursery.start_soon(
self._connect_to_peer, peer_info.peer_id self._connect_to_peer, peer_info.peer_id
) )
@ -235,7 +193,7 @@ class BootstrapDiscovery:
) )
# Even for existing peers, try to connect if not already connected # Even for existing peers, try to connect if not already connected
if peer_info.peer_id not in self.swarm.connections: if peer_info.peer_id not in self.swarm.connections:
logger.info("Starting parallel connection attempt for existing peer...") logger.debug("Starting parallel connection attempt for existing peer...")
# Use nursery for parallel connection attempt (non-blocking) # Use nursery for parallel connection attempt (non-blocking)
try: try:
async with trio.open_nursery() as connection_nursery: async with trio.open_nursery() as connection_nursery:
@ -261,7 +219,7 @@ class BootstrapDiscovery:
Uses swarm.dial_peer to connect using addresses stored in peerstore. Uses swarm.dial_peer to connect using addresses stored in peerstore.
Times out after connection_timeout seconds to prevent hanging. Times out after connection_timeout seconds to prevent hanging.
""" """
logger.info(f"Connection attempt for peer: {peer_id}") logger.debug(f"Connection attempt for peer: {peer_id}")
# Pre-connection validation: Check if already connected # Pre-connection validation: Check if already connected
if peer_id in self.swarm.connections: if peer_id in self.swarm.connections:
@ -270,18 +228,9 @@ class BootstrapDiscovery:
) )
return return
# Allow other tasks to run before connection attempt
await trio.lowlevel.checkpoint()
# Check available addresses before attempting connection # Check available addresses before attempting connection
available_addrs = self.peerstore.addrs(peer_id) available_addrs = self.peerstore.addrs(peer_id)
logger.info( logger.debug(f"Connecting to {peer_id} ({len(available_addrs)} addresses)")
f"Available addresses for {peer_id}: {len(available_addrs)} addresses"
)
# Log all available addresses for transparency
for i, addr in enumerate(available_addrs, 1):
logger.debug(f" Address {i}: {addr}")
if not available_addrs: if not available_addrs:
logger.error(f"❌ No addresses available for {peer_id} - cannot connect") logger.error(f"❌ No addresses available for {peer_id} - cannot connect")
@ -293,43 +242,23 @@ class BootstrapDiscovery:
try: try:
with trio.move_on_after(self.connection_timeout): with trio.move_on_after(self.connection_timeout):
# Log connection attempt # Log connection attempt
logger.info( logger.debug(
f"Attempting connection to {peer_id} using " f"Attempting connection to {peer_id} using "
f"{len(available_addrs)} addresses" f"{len(available_addrs)} addresses"
) )
# Log each address that will be attempted
for i, addr in enumerate(available_addrs, 1):
logger.debug(f"Address {i}: {addr}")
# Use swarm.dial_peer to connect using stored addresses # Use swarm.dial_peer to connect using stored addresses
connection = await self.swarm.dial_peer(peer_id) connection = await self.swarm.dial_peer(peer_id)
# Calculate connection time # Calculate connection time
connection_time = trio.current_time() - connection_start_time connection_time = trio.current_time() - connection_start_time
# Allow other tasks to run after dial attempt
await trio.lowlevel.checkpoint()
# Post-connection validation: Verify connection was actually established # Post-connection validation: Verify connection was actually established
if peer_id in self.swarm.connections: if peer_id in self.swarm.connections:
logger.info( logger.info(
f"✅ Connected to {peer_id} (took {connection_time:.2f}s)" f"✅ Connected to {peer_id} (took {connection_time:.2f}s)"
) )
# Track this connection for drop monitoring
self.connected_peers.add(peer_id)
# Start monitoring this specific connection for drops
trio.lowlevel.spawn_system_task(
self._monitor_peer_connection, peer_id
)
# Log which address was successful (if available)
if hasattr(connection, "get_transport_addresses"):
successful_addrs = connection.get_transport_addresses()
if successful_addrs:
logger.debug(f"Successful address: {successful_addrs[0]}")
else: else:
logger.warning( logger.warning(
f"Dial succeeded but connection not found for {peer_id}" f"Dial succeeded but connection not found for {peer_id}"
@ -357,12 +286,12 @@ class BootstrapDiscovery:
and getattr(e.__cause__, "exceptions", None) is not None and getattr(e.__cause__, "exceptions", None) is not None
): ):
exceptions_list = getattr(e.__cause__, "exceptions") exceptions_list = getattr(e.__cause__, "exceptions")
logger.info("📋 Individual address failure details:") logger.debug("📋 Individual address failure details:")
for i, addr_exception in enumerate(exceptions_list, 1): for i, addr_exception in enumerate(exceptions_list, 1):
logger.info(f"Address {i}: {addr_exception}") logger.debug(f"Address {i}: {addr_exception}")
# Also log the actual address that failed # Also log the actual address that failed
if i <= len(available_addrs): if i <= len(available_addrs):
logger.info(f"Failed address: {available_addrs[i - 1]}") logger.debug(f"Failed address: {available_addrs[i - 1]}")
else: else:
logger.warning("No detailed exception information available") logger.warning("No detailed exception information available")
else: else:
@ -379,39 +308,6 @@ class BootstrapDiscovery:
f"{e} (took {failed_connection_time:.2f}s)" f"{e} (took {failed_connection_time:.2f}s)"
) )
# Don't re-raise to prevent killing the nursery and other parallel tasks # Don't re-raise to prevent killing the nursery and other parallel tasks
logger.debug("Continuing with other parallel connection attempts")
async def _monitor_peer_connection(self, peer_id: ID) -> None:
    """
    Watch a specific peer connection and log when it drops.

    Polls ``swarm.connections`` every 100 ms (a cheap membership test)
    until the connection disappears — e.g. after error 4101 or another
    transport failure — then removes the peer from ``connected_peers``
    and logs the drop.

    :param peer_id: the peer whose connection is being monitored.
    """
    logger.debug(f"🔍 Started monitoring connection to {peer_id}")
    try:
        # Wait until the swarm no longer holds a connection for this peer.
        while peer_id in self.swarm.connections:
            await trio.sleep(0.1)  # yield between polls

        # Connection was dropped - log it immediately
        if peer_id in self.connected_peers:
            self.connected_peers.discard(peer_id)
            logger.warning(
                f"📡 Connection to {peer_id} was dropped! (detected event-driven)"
            )
            # Log current connection count
            remaining_connections = len(self.connected_peers)
            logger.info(f"📊 Remaining connected peers: {remaining_connections}")
    except trio.Cancelled:
        logger.debug(f"Connection monitoring for {peer_id} stopped")
        # BUGFIX: trio.Cancelled must always propagate; swallowing it
        # breaks trio's cancellation semantics.
        raise
    except Exception as e:
        logger.error(f"Error monitoring connection to {peer_id}: {e}")
        # Clean up tracking on error
        self.connected_peers.discard(peer_id)
def _is_ipv4_tcp_addr(self, addr: Multiaddr) -> bool: def _is_ipv4_tcp_addr(self, addr: Multiaddr) -> bool:
""" """