mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-10 15:10:54 +00:00
feat: implement mDNS discovery with PeerListener
This commit is contained in:
@ -0,0 +1,102 @@
|
|||||||
|
import time
|
||||||
|
import socket
|
||||||
|
from zeroconf import ServiceBrowser, ServiceStateChange, Zeroconf
|
||||||
|
from libp2p.peer.peerinfo import PeerInfo
|
||||||
|
from libp2p.peer.id import ID
|
||||||
|
|
||||||
|
class PeerListener:
|
||||||
|
"""Enhanced mDNS listener for libp2p peer discovery."""
|
||||||
|
|
||||||
|
def __init__(self, zeroconf: Zeroconf, service_type: str, service_name: str, on_peer_discovery=None):
|
||||||
|
self.zeroconf = zeroconf
|
||||||
|
self.service_type = service_type
|
||||||
|
self.service_name = service_name
|
||||||
|
self.on_peer_discovery = on_peer_discovery
|
||||||
|
self.discovered_services = set()
|
||||||
|
self.browser = ServiceBrowser(
|
||||||
|
self.zeroconf, self.service_type, handlers=[self.on_service_state_change]
|
||||||
|
)
|
||||||
|
|
||||||
|
def on_service_state_change(self, zeroconf: Zeroconf, service_type, name, state_change):
|
||||||
|
if state_change != ServiceStateChange.Added or name == self.service_name or name in self.discovered_services:
|
||||||
|
return
|
||||||
|
|
||||||
|
print(f"Discovered service: {name}")
|
||||||
|
self.discovered_services.add(name)
|
||||||
|
|
||||||
|
# Process the discovered service
|
||||||
|
self._process_discovered_service(zeroconf, service_type, name)
|
||||||
|
|
||||||
|
def _process_discovered_service(self, zeroconf, service_type, name):
|
||||||
|
# Try to get service info with retries
|
||||||
|
info = None
|
||||||
|
for attempt in range(3):
|
||||||
|
info = zeroconf.get_service_info(service_type, name, timeout=5000)
|
||||||
|
if info:
|
||||||
|
print(f"Service info successfully retrieved for {name}")
|
||||||
|
break
|
||||||
|
print(f"Retrying service info retrieval (attempt {attempt + 1}/3)")
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
if not info:
|
||||||
|
print(f"Failed to retrieve service info for {name}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Extract peer information
|
||||||
|
peer_info = self._extract_peer_info(info)
|
||||||
|
print(f"Extracted peer info: {peer_info}")
|
||||||
|
if peer_info:
|
||||||
|
self._handle_discovered_peer(peer_info)
|
||||||
|
|
||||||
|
def _extract_peer_info(self, service_info):
|
||||||
|
try:
|
||||||
|
# Extract IP addresses
|
||||||
|
addresses = []
|
||||||
|
for addr in service_info.addresses:
|
||||||
|
ip = socket.inet_ntoa(addr)
|
||||||
|
addresses.append(ip)
|
||||||
|
|
||||||
|
# Extract port
|
||||||
|
port = service_info.port
|
||||||
|
|
||||||
|
# Extract peer ID from TXT record
|
||||||
|
peer_id = None
|
||||||
|
if service_info.properties:
|
||||||
|
peer_id_bytes = service_info.properties.get(b'id')
|
||||||
|
if peer_id_bytes:
|
||||||
|
peer_id = peer_id_bytes.decode('utf-8')
|
||||||
|
|
||||||
|
if not peer_id:
|
||||||
|
print(f"No peer ID found in TXT record for {service_info.name}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Create multiaddresses
|
||||||
|
multiaddrs = []
|
||||||
|
for ip in addresses:
|
||||||
|
multiaddr = f"/ip4/{ip}/udp/{port}"
|
||||||
|
multiaddrs.append(multiaddr)
|
||||||
|
|
||||||
|
# Create PeerInfo object
|
||||||
|
peer_info = PeerInfo(
|
||||||
|
peer_id=ID.from_base58(peer_id),
|
||||||
|
addrs=multiaddrs
|
||||||
|
)
|
||||||
|
|
||||||
|
return peer_info
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error extracting peer info from {service_info.name}: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _handle_discovered_peer(self, peer_info):
|
||||||
|
print(f"Successfully discovered peer: {peer_info.peer_id}")
|
||||||
|
print(f"Peer addresses: {peer_info.addrs}")
|
||||||
|
|
||||||
|
# Trigger callback if provided
|
||||||
|
if self.on_peer_discovery:
|
||||||
|
self.on_peer_discovery(peer_info)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""Stop the listener."""
|
||||||
|
if self.browser:
|
||||||
|
self.browser.cancel()
|
||||||
@ -12,74 +12,46 @@ from .utils import (
|
|||||||
from libp2p.abc import (
|
from libp2p.abc import (
|
||||||
INetworkService
|
INetworkService
|
||||||
)
|
)
|
||||||
|
from .listener import PeerListener
|
||||||
|
from .broadcaster import PeerBroadcaster
|
||||||
|
from libp2p.peer.peerinfo import PeerInfo
|
||||||
|
|
||||||
SERVICE_TYPE = "_p2p._udp.local."
|
SERVICE_TYPE = "_p2p._udp.local."
|
||||||
MCAST_PORT = 5353
|
MCAST_PORT = 5353
|
||||||
MCAST_ADDR = "224.0.0.251"
|
MCAST_ADDR = "224.0.0.251"
|
||||||
|
|
||||||
class MDNSDiscovery:
|
class MDNSDiscovery:
|
||||||
def __init__(self, swarm: INetworkService):
|
"""
|
||||||
self.peer_id = swarm.get_peer_id()
|
mDNS-based peer discovery for py-libp2p, using zeroconf.
|
||||||
self.port = 8000 # Default port, can be overridden
|
Conforms to the libp2p mDNS discovery spec.
|
||||||
# self.broadcast = init.get('broadcast', True) is not False
|
"""
|
||||||
# self.on_peer = on_peer # Callback: async def on_peer(peer_info)
|
def __init__(self, swarm: INetworkService, port: int = 8000, on_peer_discovery=None):
|
||||||
# self.service_name = service_name or f"{peer_id}.{SERVICE_TYPE}"
|
self.peer_id = str(swarm.get_peer_id())
|
||||||
# self.zeroconf = Zeroconf()
|
self.port = port
|
||||||
# self._service_info = ServiceInfo(
|
self.on_peer_discovery = on_peer_discovery
|
||||||
# SERVICE_TYPE,
|
self.zeroconf = Zeroconf()
|
||||||
# self.service_name,
|
self.serviceName = f"{stringGen()}.{SERVICE_TYPE}"
|
||||||
# addresses=[], # Will be set on register
|
self.broadcaster = PeerBroadcaster(
|
||||||
# port=self.port,
|
zeroconf=self.zeroconf,
|
||||||
# properties={b'id': peer_id.encode()},
|
service_type=SERVICE_TYPE,
|
||||||
# )
|
service_name=self.serviceName,
|
||||||
# self._browser = None
|
peer_id = self.peer_id,
|
||||||
# self._running = False
|
port = self.port
|
||||||
|
)
|
||||||
|
self.listener = PeerListener(
|
||||||
|
zeroconf=self.zeroconf,
|
||||||
|
service_type=SERVICE_TYPE,
|
||||||
|
service_name=self.serviceName,
|
||||||
|
on_peer_discovery=self.on_peer_discovery
|
||||||
|
)
|
||||||
|
|
||||||
def main(self) -> None:
|
def start(self):
|
||||||
"""
|
"""Register this peer and start listening for others."""
|
||||||
Main entry point for the mDNS discovery service.
|
|
||||||
This method is intended to be run in an event loop.
|
|
||||||
"""
|
|
||||||
trio.run(self.start)
|
|
||||||
|
|
||||||
async def start(self):
|
|
||||||
await trio.sleep(10)
|
|
||||||
# self._running = True
|
|
||||||
# await trio.to_thread.run_sync(self.zeroconf.register_service, self._service_info)
|
|
||||||
# self._browser = ServiceBrowser(self.zeroconf, SERVICE_TYPE, handlers=[self._on_service_state_change])
|
|
||||||
print(f"Starting mDNS discovery for peer {self.peer_id} on port {self.port}")
|
print(f"Starting mDNS discovery for peer {self.peer_id} on port {self.port}")
|
||||||
|
self.broadcaster.register()
|
||||||
|
# Listener is started in constructor
|
||||||
|
|
||||||
async def stop(self):
|
def stop(self):
|
||||||
# self._running = False
|
"""Unregister this peer and clean up zeroconf resources."""
|
||||||
# await trio.to_thread.run_sync(self.zeroconf.unregister_service, self._service_info)
|
self.broadcaster.unregister()
|
||||||
# await trio.to_thread.run_sync(self.zeroconf.close)
|
self.zeroconf.close()
|
||||||
print(f"Stopping mDNS discovery for peer {self.peer_id}")
|
|
||||||
|
|
||||||
def _on_service_state_change(self, zeroconf, service_type, name, state_change):
|
|
||||||
if state_change is not ServiceStateChange.Added:
|
|
||||||
return
|
|
||||||
info = zeroconf.get_service_info(service_type, name)
|
|
||||||
if not info or name == self.service_name:
|
|
||||||
return
|
|
||||||
peer_id = info.properties.get(b'id')
|
|
||||||
if not peer_id:
|
|
||||||
return
|
|
||||||
peer_id = peer_id.decode()
|
|
||||||
addresses = [addr for addr in info.parsed_addresses()]
|
|
||||||
port = info.port
|
|
||||||
peer_info = {'peer_id': peer_id, 'addresses': addresses, 'port': port}
|
|
||||||
if self.on_peer:
|
|
||||||
# Schedule callback in the background
|
|
||||||
trio.lowlevel.spawn_system_task(self._call_on_peer, peer_info)
|
|
||||||
|
|
||||||
async def _call_on_peer(self, peer_info):
|
|
||||||
if self.on_peer:
|
|
||||||
await self.on_peer(peer_info)
|
|
||||||
|
|
||||||
# Example usage:
|
|
||||||
# async def on_peer(peer_info):
|
|
||||||
# print(f"Discovered peer: {peer_info['peer_id']} at {peer_info['addresses']}:{peer_info['port']}")
|
|
||||||
# mdns = MDNSDiscovery(peer_id, port, on_peer)
|
|
||||||
# await mdns.start()
|
|
||||||
# ...
|
|
||||||
# await mdns.stop()
|
|
||||||
Reference in New Issue
Block a user