diff --git a/examples/discovery/__init__.py b/examples/discovery/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/discovery/mDNS/mDNS.py b/examples/discovery/mDNS/mDNS.py index 8563a1fb..cfe6c8b1 100644 --- a/examples/discovery/mDNS/mDNS.py +++ b/examples/discovery/mDNS/mDNS.py @@ -1,9 +1,21 @@ import secrets + import multiaddr import trio -from libp2p import new_host -from libp2p.crypto.secp256k1 import create_new_key_pair +from libp2p import ( + new_host, +) +from libp2p.abc import ( + PeerInfo, +) +from libp2p.crypto.secp256k1 import ( + create_new_key_pair, +) +from libp2p.discovery.events.peerDiscovery import ( + peerDiscovery, +) + async def main(): # Generate a key pair for the host @@ -17,20 +29,19 @@ async def main(): host = new_host(key_pair=key_pair, enable_mDNS=True) async with host.run(listen_addrs=[listen_addr]): - print("Host started!") - print("Peer ID:", host.get_id()) - print("Listening on:", [str(addr) for addr in host.get_addrs()]) + print("host peer id", host.get_id()) # Print discovered peers via mDNS print("Waiting for mDNS peer discovery events (Ctrl+C to exit)...") try: while True: - # Print all known peers every 5 seconds - peers = host.get_peerstore().peer_ids() - print("Known peers:", [str(p) for p in peers if p != host.get_id()]) - await trio.sleep(5) + peer_info = PeerInfo(host.get_id(), host.get_addrs()) + + await trio.sleep(1) + await peerDiscovery.emit_peer_discovered(peer_info=peer_info) except KeyboardInterrupt: print("Exiting...") + if __name__ == "__main__": - trio.run(main) \ No newline at end of file + trio.run(main) diff --git a/libp2p/__init__.py b/libp2p/__init__.py index bc814da6..a066207c 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -32,6 +32,9 @@ from libp2p.custom_types import ( TProtocol, TSecurityOptions, ) +from libp2p.discovery.mdns.mdns import ( + MDNSDiscovery, +) from libp2p.host.basic_host import ( BasicHost, ) @@ -71,9 +74,6 @@ from libp2p.transport.upgrader import ( from libp2p.utils.logging import ( setup_logging, ) -from libp2p.discovery.mdns.mdns import ( - MDNSDiscovery -) # Initialize logging configuration setup_logging() @@ -269,16 +269,13 @@ def new_host( listen_addrs=listen_addrs, ) - if disc_opt is not None: - host = RoutedHost(swarm, disc_opt) - else: - host = BasicHost(swarm) - if enable_mDNS: mdns = MDNSDiscovery(swarm) mdns.start() - host._mdns = mdns - - return host + + if disc_opt is not None: + return RoutedHost(swarm, disc_opt) + else: + return BasicHost(swarm) __version__ = __version("libp2p") diff --git a/libp2p/discovery/events/__init__.py b/libp2p/discovery/events/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/libp2p/discovery/events/peerDiscovery.py b/libp2p/discovery/events/peerDiscovery.py new file mode 100644 index 00000000..924998cd --- /dev/null +++ b/libp2p/discovery/events/peerDiscovery.py @@ -0,0 +1,37 @@ +from collections.abc import ( + Awaitable, + Callable, +) + +import trio + +from libp2p.abc import ( + PeerInfo, +) + +TTL: int = 60 * 60 # Time-to-live for discovered peers in seconds + + +class PeerDiscovery: + def __init__(self) -> None: + self._peer_discovered_handlers: list[Callable[[PeerInfo], Awaitable[None]]] = [] + + def register_peer_discovered_handler( + self, handler: Callable[[PeerInfo], Awaitable[None]] + ) -> None: + self._peer_discovered_handlers.append(handler) + + async def emit_peer_discovered(self, peer_info: PeerInfo) -> None: + for handler in self._peer_discovered_handlers: + await handler(peer_info) + + +peerDiscovery = PeerDiscovery() + + +async def peerDiscoveryHandler(peerInfo: PeerInfo) -> None: + await trio.sleep(5) # Simulate some processing delay + # print("Discovered peer is", peerInfo.peer_id) + + +peerDiscovery.register_peer_discovered_handler(peerDiscoveryHandler) diff --git a/libp2p/discovery/mdns/broadcaster.py b/libp2p/discovery/mdns/broadcaster.py index 9c83bf96..67c07ef7 100644 --- a/libp2p/discovery/mdns/broadcaster.py +++ b/libp2p/discovery/mdns/broadcaster.py @@ -1,32 +1,45 @@ -from zeroconf import Zeroconf, ServiceInfo -from .utils import stringGen import socket +from zeroconf import ( + ServiceInfo, + Zeroconf, +) + + class PeerBroadcaster: """ Broadcasts this peer's presence on the local network using mDNS/zeroconf. Registers a service with the peer's ID in the TXT record as per libp2p spec. """ - def __init__(self, zeroconf: Zeroconf, service_type: str, service_name: str, peer_id: str, port: int): + + def __init__( + self, + zeroconf: Zeroconf, + service_type: str, + service_name: str, + peer_id: str, + port: int, + ): self.zeroconf = zeroconf self.service_type = service_type self.peer_id = peer_id self.port = port self.service_name = service_name - + # Get the local IP address local_ip = self._get_local_ip() - + hostname = socket.gethostname() + self.service_info = ServiceInfo( type_=self.service_type, name=self.service_name, port=self.port, - properties={b'id': self.peer_id.encode()}, - server=f"{self.service_name}", - addresses=[socket.inet_aton(local_ip)] + properties={b"id": self.peer_id.encode()}, + server=f"{hostname}.local.", + addresses=[socket.inet_aton(local_ip)], ) - def _get_local_ip(self): + def _get_local_ip(self) -> str: """Get the local IP address of this machine""" try: # Connect to a remote address to determine the local IP @@ -34,16 +47,17 @@ class PeerBroadcaster: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.connect(("8.8.8.8", 80)) local_ip = s.getsockname()[0] + print(f"Local IP determined: {local_ip}") return local_ip except Exception: # Fallback to localhost if we can't determine the IP return "127.0.0.1" - def register(self): + def register(self) -> None: """Register the peer's mDNS service on the network.""" - print(f"Registering with name {self.service_name} and peer_id {self.peer_id} on port {self.port}") + print(repr(self.service_info)) self.zeroconf.register_service(self.service_info) - def unregister(self): + def unregister(self) -> None: """Unregister the peer's mDNS service from the network.""" self.zeroconf.unregister_service(self.service_info) diff --git a/libp2p/discovery/mdns/listener.py b/libp2p/discovery/mdns/listener.py index 3bc97785..3faea4e2 100644 --- a/libp2p/discovery/mdns/listener.py +++ b/libp2p/discovery/mdns/listener.py @@ -1,102 +1,105 @@ -import time import socket -from zeroconf import ServiceBrowser, ServiceStateChange, Zeroconf -from libp2p.peer.peerinfo import PeerInfo -from libp2p.peer.id import ID +import time -class PeerListener: - """Enhanced mDNS listener for libp2p peer discovery.""" - - def __init__(self, zeroconf: Zeroconf, service_type: str, service_name: str, on_peer_discovery=None): +from zeroconf import ( + ServiceBrowser, + ServiceInfo, + ServiceListener, + ServiceStateChange, + Zeroconf, +) + +from libp2p.abc import IPeerStore +from libp2p.peer.id import ID +from libp2p.peer.peerinfo import PeerInfo + + +class PeerListener(ServiceListener): + """mDNS listener — now a true ServiceListener subclass.""" + + def __init__( + self, + peerstore: IPeerStore, + zeroconf: Zeroconf, + service_type: str, + service_name: str, + ) -> None: + self.peerstore = peerstore self.zeroconf = zeroconf self.service_type = service_type self.service_name = service_name - self.on_peer_discovery = on_peer_discovery - self.discovered_services = set() - self.browser = ServiceBrowser( - self.zeroconf, self.service_type, handlers=[self.on_service_state_change] - ) + self.discovered_services: set[str] = set() - def on_service_state_change(self, zeroconf: Zeroconf, service_type, name, state_change): - if state_change != ServiceStateChange.Added or name == self.service_name or name in self.discovered_services: + # pass `self` as the listener object + self.browser = ServiceBrowser(self.zeroconf, self.service_type, listener=self) + + def add_service(self, zc: Zeroconf, type_: str, name: str) -> None: + # map to your on_service_state_change logic + self._on_state_change(zc, type_, name, ServiceStateChange.Added) + + def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None: + self._on_state_change(zc, type_, name, ServiceStateChange.Removed) + + def update_service(self, zc: Zeroconf, type_: str, name: str) -> None: + self._on_state_change(zc, type_, name, ServiceStateChange.Updated) + + def _on_state_change( + self, + zeroconf: Zeroconf, + service_type: str, + name: str, + state: ServiceStateChange, + ) -> None: + # skip our own service + if name == self.service_name: return - - print(f"Discovered service: {name}") - self.discovered_services.add(name) - - # Process the discovered service - self._process_discovered_service(zeroconf, service_type, name) - def _process_discovered_service(self, zeroconf, service_type, name): - # Try to get service info with retries + # handle Added + if state is ServiceStateChange.Added: + if name in self.discovered_services: + return + self.discovered_services.add(name) + self._process_discovered_service(zeroconf, service_type, name) + + # ...optional hooks for Removed/Updated if you need them + + def _process_discovered_service( + self, zeroconf: Zeroconf, service_type: str, name: str + ) -> None: + # same retry logic you had before info = None for attempt in range(3): info = zeroconf.get_service_info(service_type, name, timeout=5000) if info: - print(f"Service info successfully retrieved for {name}") break - print(f"Retrying service info retrieval (attempt {attempt + 1}/3)") time.sleep(1) if not info: - print(f"Failed to retrieve service info for {name}") return - # Extract peer information peer_info = self._extract_peer_info(info) - print(f"Extracted peer info: {peer_info}") if peer_info: + # your existing hook self._handle_discovered_peer(peer_info) - def _extract_peer_info(self, service_info): + def _extract_peer_info(self, info: ServiceInfo) -> PeerInfo | None: try: - # Extract IP addresses - addresses = [] - for addr in service_info.addresses: - ip = socket.inet_ntoa(addr) - addresses.append(ip) - - # Extract port - port = service_info.port - - # Extract peer ID from TXT record - peer_id = None - if service_info.properties: - peer_id_bytes = service_info.properties.get(b'id') - if peer_id_bytes: - peer_id = peer_id_bytes.decode('utf-8') - - if not peer_id: - print(f"No peer ID found in TXT record for {service_info.name}") + addrs = [ + f"/ip4/{socket.inet_ntoa(addr)}/udp/{info.port}" + for addr in info.addresses + ] + pid_bytes = info.properties.get(b"id") + if not pid_bytes: return None - - # Create multiaddresses - multiaddrs = [] - for ip in addresses: - multiaddr = f"/ip4/{ip}/udp/{port}" - multiaddrs.append(multiaddr) - - # Create PeerInfo object - peer_info = PeerInfo( - peer_id=ID.from_base58(peer_id), - addrs=multiaddrs - ) - - return peer_info - - except Exception as e: - print(f"Error extracting peer info from {service_info.name}: {e}") + pid = ID.from_base58(pid_bytes.decode()) + return PeerInfo(peer_id=pid, addrs=addrs) + except Exception: return None - def _handle_discovered_peer(self, peer_info): - print(f"Successfully discovered peer: {peer_info.peer_id}") - print(f"Peer addresses: {peer_info.addrs}") - - # Trigger callback if provided - if self.on_peer_discovery: - self.on_peer_discovery(peer_info) + def _handle_discovered_peer(self, peer_info: PeerInfo) -> None: + # your “emit” or “connect” logic goes here + # print("Discovered:", peer_info) + print("Discovered:", peer_info.peer_id) - def stop(self): - """Stop the listener.""" - if self.browser: - self.browser.cancel() \ No newline at end of file + def stop(self) -> None: + self.browser.cancel() diff --git a/libp2p/discovery/mdns/mdns.py b/libp2p/discovery/mdns/mdns.py index bf57ad03..62e6685d 100644 --- a/libp2p/discovery/mdns/mdns.py +++ b/libp2p/discovery/mdns/mdns.py @@ -3,55 +3,65 @@ mDNS-based peer discovery for py-libp2p. Conforms to https://github.com/libp2p/specs/blob/master/discovery/mdns.md Uses zeroconf for mDNS broadcast/listen. Async operations use trio. """ -import trio -from typing import Callable, Optional -from zeroconf import ServiceInfo, Zeroconf, ServiceBrowser, ServiceStateChange -from .utils import ( - stringGen + +from zeroconf import ( + Zeroconf, ) + from libp2p.abc import ( - INetworkService + INetworkService, +) + +from .broadcaster import ( + PeerBroadcaster, +) +from .listener import ( + PeerListener, +) +from .utils import ( + stringGen, ) -from .listener import PeerListener -from .broadcaster import PeerBroadcaster -from libp2p.peer.peerinfo import PeerInfo SERVICE_TYPE = "_p2p._udp.local." MCAST_PORT = 5353 MCAST_ADDR = "224.0.0.251" + class MDNSDiscovery: """ mDNS-based peer discovery for py-libp2p, using zeroconf. Conforms to the libp2p mDNS discovery spec. """ - def __init__(self, swarm: INetworkService, port: int = 8000, on_peer_discovery=None): + + def __init__(self, swarm: INetworkService, port: int = 8000): self.peer_id = str(swarm.get_peer_id()) self.port = port - self.on_peer_discovery = on_peer_discovery self.zeroconf = Zeroconf() self.serviceName = f"{stringGen()}.{SERVICE_TYPE}" + self.peerstore = swarm.peerstore + self.swarm = swarm self.broadcaster = PeerBroadcaster( zeroconf=self.zeroconf, service_type=SERVICE_TYPE, service_name=self.serviceName, - peer_id = self.peer_id, - port = self.port + peer_id=self.peer_id, + port=self.port, ) self.listener = PeerListener( zeroconf=self.zeroconf, + peerstore=self.peerstore, service_type=SERVICE_TYPE, service_name=self.serviceName, - on_peer_discovery=self.on_peer_discovery ) - def start(self): + def start(self) -> None: """Register this peer and start listening for others.""" print(f"Starting mDNS discovery for peer {self.peer_id} on port {self.port}") + print("host is listening on", self.swarm.listeners) self.broadcaster.register() # Listener is started in constructor - def stop(self): + def stop(self) -> None: """Unregister this peer and clean up zeroconf resources.""" self.broadcaster.unregister() - self.zeroconf.close() \ No newline at end of file + self.zeroconf.close() diff --git a/libp2p/discovery/mdns/utils.py b/libp2p/discovery/mdns/utils.py index a341928d..eb05d03a 100644 --- a/libp2p/discovery/mdns/utils.py +++ b/libp2p/discovery/mdns/utils.py @@ -1,11 +1,11 @@ import random import string + def stringGen(len: int = 63) -> str: """Generate a random string of lowercase letters and digits.""" - charset = string.ascii_lowercase + string.digits result = [] for _ in range(len): result.append(random.choice(charset)) - return ''.join(result) \ No newline at end of file + return "".join(result)