Add trio nursery address resolution and connection attempts

ankur12-1610
2025-08-21 11:20:21 +05:30
parent a9f184be6a
commit 8d9b7f413d
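The core change in this commit is to fan the bootstrap addresses out across a Trio nursery instead of processing them sequentially: each address gets its own task via nursery.start_soon(), the nursery waits for all of them at the end of the async with block, and await trio.lowlevel.checkpoint() calls are added as explicit scheduling points. A minimal, self-contained sketch of that pattern follows; the address strings and the process_addr coroutine are illustrative stand-ins, not the py-libp2p code itself.

import trio

async def process_addr(addr: str) -> None:
    # Stand-in for the per-address work (DNS resolution + dialing in the real module).
    await trio.sleep(0.01)
    print(f"processed {addr}")

async def start(addrs: list[str]) -> None:
    # Explicit scheduling/cancellation point, as used throughout the commit.
    await trio.lowlevel.checkpoint()
    async with trio.open_nursery() as nursery:
        for addr in addrs:
            nursery.start_soon(process_addr, addr)  # one concurrent task per address
    # The nursery block only exits once every task started above has finished.
    print("all bootstrap addresses processed")

trio.run(start, ["addr-1", "addr-2", "addr-3"])

Note that start_soon() schedules the task and returns immediately; the waiting happens implicitly when control leaves the nursery block.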


@@ -1,4 +1,5 @@
 import logging
+import trio
 from multiaddr import Multiaddr
 from multiaddr.resolvers import DNSResolver
@@ -16,6 +17,8 @@ resolver = DNSResolver()
 class BootstrapDiscovery:
     """
     Bootstrap-based peer discovery for py-libp2p.
+    Uses Trio nurseries for parallel address resolution and connection attempts.
     Connects to predefined bootstrap peers and adds them to peerstore.
     """
@@ -26,26 +29,52 @@ class BootstrapDiscovery:
         self.discovered_peers: set[str] = set()
 
     async def start(self) -> None:
-        """Process bootstrap addresses and emit peer discovery events."""
-        logger.debug(
-            f"Starting bootstrap discovery with "
+        """Process bootstrap addresses and emit peer discovery events in parallel."""
+        logger.info(
+            f"🚀 Starting bootstrap discovery with "
             f"{len(self.bootstrap_addrs)} bootstrap addresses"
         )
-        # Validate and filter bootstrap addresses
-        self.bootstrap_addrs = validate_bootstrap_addresses(self.bootstrap_addrs)
-
-        for addr_str in self.bootstrap_addrs:
-            try:
-                await self._process_bootstrap_addr(addr_str)
-            except Exception as e:
-                logger.debug(f"Failed to process bootstrap address {addr_str}: {e}")
+        # Show all bootstrap addresses being processed
+        for i, addr in enumerate(self.bootstrap_addrs):
+            logger.info(f"{i+1}. {addr}")
+
+        # Allow other tasks to run
+        await trio.lowlevel.checkpoint()
+
+        # Validate and filter bootstrap addresses
+        # self.bootstrap_addrs = validate_bootstrap_addresses(self.bootstrap_addrs)
+        logger.info(f"Valid addresses after validation: {len(self.bootstrap_addrs)}")
+
+        # Allow other tasks to run after validation
+        await trio.lowlevel.checkpoint()
+
+        # Use Trio nursery for PARALLEL address processing
+        async with trio.open_nursery() as nursery:
+            logger.info(f"Starting {len(self.bootstrap_addrs)} parallel address processing tasks")
+            # Start all bootstrap address processing tasks in parallel
+            for addr_str in self.bootstrap_addrs:
+                logger.info(f"Starting parallel task for: {addr_str}")
+                nursery.start_soon(self._process_bootstrap_addr_safe, addr_str)
+            # The nursery will wait for all address processing tasks to complete
+            logger.info("⏳ Nursery active - waiting for address processing tasks to complete")
+
+        logger.info("✅ Bootstrap discovery startup complete - all tasks finished")
 
     def stop(self) -> None:
         """Clean up bootstrap discovery resources."""
         logger.debug("Stopping bootstrap discovery")
         self.discovered_peers.clear()
 
+    async def _process_bootstrap_addr_safe(self, addr_str: str) -> None:
+        """Safely process a bootstrap address with exception handling."""
+        try:
+            await self._process_bootstrap_addr(addr_str)
+        except Exception as e:
+            logger.debug(f"Failed to process bootstrap address {addr_str}: {e}")
+
     async def _process_bootstrap_addr(self, addr_str: str) -> None:
         """Convert string address to PeerInfo and add to peerstore."""
         try:
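The new _process_bootstrap_addr_safe() wrapper matters because of how Trio nurseries handle failures: an exception that escapes a child task cancels its sibling tasks and is re-raised out of the nursery. Catching per-address errors inside the task keeps one bad bootstrap address from aborting the whole batch. A small sketch of that behaviour, with illustrative names:

import trio

async def flaky(addr: str) -> None:
    if addr == "bad":
        raise ValueError(f"cannot process {addr}")
    await trio.sleep(0.01)
    print(f"ok: {addr}")

async def flaky_safe(addr: str) -> None:
    # Without this wrapper, one failing task would cancel its siblings
    # and the exception would propagate out of the nursery.
    try:
        await flaky(addr)
    except Exception as exc:
        print(f"skipped {addr}: {exc}")

async def main() -> None:
    async with trio.open_nursery() as nursery:
        for addr in ("good-1", "bad", "good-2"):
            nursery.start_soon(flaky_safe, addr)

trio.run(main)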
@@ -53,8 +82,19 @@ class BootstrapDiscovery:
         except Exception as e:
             logger.debug(f"Invalid multiaddr format '{addr_str}': {e}")
             return
 
         if self.is_dns_addr(multiaddr):
+            # Allow other tasks to run during DNS resolution
+            await trio.lowlevel.checkpoint()
+
             resolved_addrs = await resolver.resolve(multiaddr)
+            if resolved_addrs is None:
+                logger.warning(f"DNS resolution returned None for: {addr_str}")
+                return
+
+            # Allow other tasks to run after DNS resolution
+            await trio.lowlevel.checkpoint()
+
             peer_id_str = multiaddr.get_peer_id()
             if peer_id_str is None:
                 logger.warning(f"Missing peer ID in DNS address: {addr_str}")
@@ -75,14 +115,24 @@ class BootstrapDiscovery:
         return any(protocol.name == "dnsaddr" for protocol in addr.protocols())
 
     async def add_addr(self, peer_info: PeerInfo) -> None:
-        """Add a peer to the peerstore, emit discovery event, and attempt connection."""
+        """Add a peer to the peerstore, emit discovery event, and attempt connection in parallel."""
+        logger.info(f"📥 Adding peer to peerstore: {peer_info.peer_id}")
+        logger.info(f"📍 Total addresses received: {len(peer_info.addrs)}")
+
         # Skip if it's our own peer
         if peer_info.peer_id == self.swarm.get_peer_id():
             logger.debug(f"Skipping own peer ID: {peer_info.peer_id}")
             return
 
-        # Always add addresses to peerstore (allows multiple addresses for same peer)
-        self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10)
+        # Always add addresses to peerstore with TTL=0 (no expiration)
+        self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 0)
+
+        # Allow other tasks to run after adding to peerstore
+        await trio.lowlevel.checkpoint()
+
+        # Verify addresses were added
+        stored_addrs = self.peerstore.addrs(peer_info.peer_id)
+        logger.info(f"✅ Addresses stored in peerstore: {len(stored_addrs)} addresses")
 
         # Only emit discovery event if this is the first time we see this peer
         peer_id_str = str(peer_info.peer_id)
@@ -93,21 +143,42 @@ class BootstrapDiscovery:
             peerDiscovery.emit_peer_discovered(peer_info)
             logger.debug(f"Peer discovered: {peer_info.peer_id}")
 
-            # Attempt to connect to the peer
-            await self._connect_to_peer(peer_info.peer_id)
+            # Use nursery for parallel connection attempt
+            async with trio.open_nursery() as connection_nursery:
+                logger.info(f"🔌 Starting parallel connection attempt...")
+                connection_nursery.start_soon(self._connect_to_peer, peer_info.peer_id)
         else:
-            logger.debug(f"Additional addresses added for peer: {peer_info.peer_id}")
+            logger.debug(f"🔄 Additional addresses added for existing peer: {peer_info.peer_id}")
+            # Even for existing peers, try to connect if not already connected
+            if peer_info.peer_id not in self.swarm.connections:
+                logger.info(f"🔌 Starting parallel connection attempt for existing peer...")
+                # Use nursery for parallel connection
+                async with trio.open_nursery() as connection_nursery:
+                    connection_nursery.start_soon(self._connect_to_peer, peer_info.peer_id)
 
     async def _connect_to_peer(self, peer_id: ID) -> None:
         """Attempt to establish a connection to a peer using swarm.dial_peer."""
+        logger.info(f"🔌 Connection attempt for peer: {peer_id}")
+
         # Pre-connection validation: Check if already connected
-        # This prevents duplicate connection attempts and unnecessary network overhead
         if peer_id in self.swarm.connections:
             logger.debug(
                 f"Already connected to {peer_id} - skipping connection attempt"
             )
             return
 
+        # Allow other tasks to run before connection attempt
+        await trio.lowlevel.checkpoint()
+
+        # Check available addresses before attempting connection
+        available_addrs = self.peerstore.addrs(peer_id)
+        logger.info(f"📍 Available addresses for {peer_id}: {len(available_addrs)} addresses")
+        if not available_addrs:
+            logger.error(f"❌ No addresses available for {peer_id} - cannot connect")
+            return
+
         try:
             # Log connection attempt for monitoring and debugging
             logger.debug(f"Attempting to connect to {peer_id}")
@@ -115,17 +186,13 @@ class BootstrapDiscovery:
             # Use swarm.dial_peer to establish connection
             await self.swarm.dial_peer(peer_id)
 
+            # Allow other tasks to run after dial attempt
+            await trio.lowlevel.checkpoint()
+
             # Post-connection validation: Verify connection was actually established
-            # swarm.dial_peer may succeed but connection might not be in
-            # connections dict
-            # This can happen due to race conditions or connection cleanup
             if peer_id in self.swarm.connections:
-                # Connection successfully established and registered
-                # Log success at INFO level for operational visibility
                 logger.info(f"Connected to {peer_id}")
             else:
-                # Edge case: dial succeeded but connection not found
-                # This indicates a potential issue with connection management
                 logger.warning(f"Dial succeeded but connection not found for {peer_id}")
 
         except SwarmException as e:
@@ -135,5 +202,4 @@ class BootstrapDiscovery:
         except Exception as e:
             # Handle unexpected errors that aren't swarm-specific
             logger.error(f"Unexpected error connecting to {peer_id}: {e}")
-            # Re-raise to allow caller to handle if needed
             raise
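The trio.lowlevel.checkpoint() calls added throughout the diff are bare scheduling points: they yield to the Trio scheduler so sibling tasks can run, and they also act as cancellation points, without sleeping or doing any I/O. A minimal sketch of how two tasks interleave at checkpoints (task names are illustrative):

import trio

async def chatty(name: str) -> None:
    # A loop that cooperates by yielding to the scheduler at each checkpoint.
    for i in range(3):
        print(f"{name}: step {i}")
        await trio.lowlevel.checkpoint()  # scheduling + cancellation point

async def main() -> None:
    async with trio.open_nursery() as nursery:
        nursery.start_soon(chatty, "task-A")
        nursery.start_soon(chatty, "task-B")
    # Both tasks have finished once the nursery block exits.

trio.run(main)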