From cd7eaba4a43c69a5f2cddadfc05de84c12296920 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Sun, 8 Jun 2025 10:02:47 +0530 Subject: [PATCH] feat: implement mDNS discovery with PeerListener --- libp2p/discovery/mdns/listener.py | 102 ++++++++++++++++++++++++++++++ libp2p/discovery/mdns/mdns.py | 96 ++++++++++------------------ 2 files changed, 136 insertions(+), 62 deletions(-) diff --git a/libp2p/discovery/mdns/listener.py b/libp2p/discovery/mdns/listener.py index e69de29b..3bc97785 100644 --- a/libp2p/discovery/mdns/listener.py +++ b/libp2p/discovery/mdns/listener.py @@ -0,0 +1,102 @@ +import time +import socket +from zeroconf import ServiceBrowser, ServiceStateChange, Zeroconf +from libp2p.peer.peerinfo import PeerInfo +from libp2p.peer.id import ID + +class PeerListener: + """Enhanced mDNS listener for libp2p peer discovery.""" + + def __init__(self, zeroconf: Zeroconf, service_type: str, service_name: str, on_peer_discovery=None): + self.zeroconf = zeroconf + self.service_type = service_type + self.service_name = service_name + self.on_peer_discovery = on_peer_discovery + self.discovered_services = set() + self.browser = ServiceBrowser( + self.zeroconf, self.service_type, handlers=[self.on_service_state_change] + ) + + def on_service_state_change(self, zeroconf: Zeroconf, service_type, name, state_change): + if state_change != ServiceStateChange.Added or name == self.service_name or name in self.discovered_services: + return + + print(f"Discovered service: {name}") + self.discovered_services.add(name) + + # Process the discovered service + self._process_discovered_service(zeroconf, service_type, name) + + def _process_discovered_service(self, zeroconf, service_type, name): + # Try to get service info with retries + info = None + for attempt in range(3): + info = zeroconf.get_service_info(service_type, name, timeout=5000) + if info: + print(f"Service info successfully retrieved for {name}") + break + print(f"Retrying service info retrieval (attempt {attempt + 1}/3)") + time.sleep(1) + + if not info: + print(f"Failed to retrieve service info for {name}") + return + + # Extract peer information + peer_info = self._extract_peer_info(info) + print(f"Extracted peer info: {peer_info}") + if peer_info: + self._handle_discovered_peer(peer_info) + + def _extract_peer_info(self, service_info): + try: + # Extract IP addresses + addresses = [] + for addr in service_info.addresses: + ip = socket.inet_ntoa(addr) + addresses.append(ip) + + # Extract port + port = service_info.port + + # Extract peer ID from TXT record + peer_id = None + if service_info.properties: + peer_id_bytes = service_info.properties.get(b'id') + if peer_id_bytes: + peer_id = peer_id_bytes.decode('utf-8') + + if not peer_id: + print(f"No peer ID found in TXT record for {service_info.name}") + return None + + # Create multiaddresses + multiaddrs = [] + for ip in addresses: + multiaddr = f"/ip4/{ip}/udp/{port}" + multiaddrs.append(multiaddr) + + # Create PeerInfo object + peer_info = PeerInfo( + peer_id=ID.from_base58(peer_id), + addrs=multiaddrs + ) + + return peer_info + + except Exception as e: + print(f"Error extracting peer info from {service_info.name}: {e}") + return None + + def _handle_discovered_peer(self, peer_info): + print(f"Successfully discovered peer: {peer_info.peer_id}") + print(f"Peer addresses: {peer_info.addrs}") + + # Trigger callback if provided + if self.on_peer_discovery: + self.on_peer_discovery(peer_info) + + def stop(self): + """Stop the listener.""" + if self.browser: + self.browser.cancel() \ No newline at end of file diff --git a/libp2p/discovery/mdns/mdns.py b/libp2p/discovery/mdns/mdns.py index 8670e00d..bf57ad03 100644 --- a/libp2p/discovery/mdns/mdns.py +++ b/libp2p/discovery/mdns/mdns.py @@ -12,74 +12,46 @@ from .utils import ( from libp2p.abc import ( INetworkService ) +from .listener import PeerListener +from .broadcaster import PeerBroadcaster +from libp2p.peer.peerinfo import PeerInfo SERVICE_TYPE = "_p2p._udp.local." MCAST_PORT = 5353 MCAST_ADDR = "224.0.0.251" class MDNSDiscovery: - def __init__(self, swarm: INetworkService): - self.peer_id = swarm.get_peer_id() - self.port = 8000 # Default port, can be overridden - # self.broadcast = init.get('broadcast', True) is not False - # self.on_peer = on_peer # Callback: async def on_peer(peer_info) - # self.service_name = service_name or f"{peer_id}.{SERVICE_TYPE}" - # self.zeroconf = Zeroconf() - # self._service_info = ServiceInfo( - # SERVICE_TYPE, - # self.service_name, - # addresses=[], # Will be set on register - # port=self.port, - # properties={b'id': peer_id.encode()}, - # ) - # self._browser = None - # self._running = False + """ + mDNS-based peer discovery for py-libp2p, using zeroconf. + Conforms to the libp2p mDNS discovery spec. + """ + def __init__(self, swarm: INetworkService, port: int = 8000, on_peer_discovery=None): + self.peer_id = str(swarm.get_peer_id()) + self.port = port + self.on_peer_discovery = on_peer_discovery + self.zeroconf = Zeroconf() + self.serviceName = f"{stringGen()}.{SERVICE_TYPE}" + self.broadcaster = PeerBroadcaster( + zeroconf=self.zeroconf, + service_type=SERVICE_TYPE, + service_name=self.serviceName, + peer_id = self.peer_id, + port = self.port + ) + self.listener = PeerListener( + zeroconf=self.zeroconf, + service_type=SERVICE_TYPE, + service_name=self.serviceName, + on_peer_discovery=self.on_peer_discovery + ) - def main(self) -> None: - """ - Main entry point for the mDNS discovery service. - This method is intended to be run in an event loop. - """ - trio.run(self.start) - - async def start(self): - await trio.sleep(10) - # self._running = True - # await trio.to_thread.run_sync(self.zeroconf.register_service, self._service_info) - # self._browser = ServiceBrowser(self.zeroconf, SERVICE_TYPE, handlers=[self._on_service_state_change]) + def start(self): + """Register this peer and start listening for others.""" print(f"Starting mDNS discovery for peer {self.peer_id} on port {self.port}") + self.broadcaster.register() + # Listener is started in constructor - async def stop(self): - # self._running = False - # await trio.to_thread.run_sync(self.zeroconf.unregister_service, self._service_info) - # await trio.to_thread.run_sync(self.zeroconf.close) - print(f"Stopping mDNS discovery for peer {self.peer_id}") - - def _on_service_state_change(self, zeroconf, service_type, name, state_change): - if state_change is not ServiceStateChange.Added: - return - info = zeroconf.get_service_info(service_type, name) - if not info or name == self.service_name: - return - peer_id = info.properties.get(b'id') - if not peer_id: - return - peer_id = peer_id.decode() - addresses = [addr for addr in info.parsed_addresses()] - port = info.port - peer_info = {'peer_id': peer_id, 'addresses': addresses, 'port': port} - if self.on_peer: - # Schedule callback in the background - trio.lowlevel.spawn_system_task(self._call_on_peer, peer_info) - - async def _call_on_peer(self, peer_info): - if self.on_peer: - await self.on_peer(peer_info) - -# Example usage: -# async def on_peer(peer_info): -# print(f"Discovered peer: {peer_info['peer_id']} at {peer_info['addresses']}:{peer_info['port']}") -# mdns = MDNSDiscovery(peer_id, port, on_peer) -# await mdns.start() -# ... -# await mdns.stop() + def stop(self): + """Unregister this peer and clean up zeroconf resources.""" + self.broadcaster.unregister() + self.zeroconf.close() \ No newline at end of file