From c33ab32c33bcbccd5a68d6080e9cd3d8cf9ae71b Mon Sep 17 00:00:00 2001 From: guha-rahul <69rahul16@gmail.com> Date: Mon, 16 Jun 2025 02:50:40 +0530 Subject: [PATCH 01/33] init --- libp2p/pubsub/floodsub.py | 14 +------------ libp2p/pubsub/gossipsub.py | 33 +++--------------------------- libp2p/pubsub/pubsub.py | 41 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 43 deletions(-) diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 93d01f1a..3e0d454f 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -12,15 +12,9 @@ from libp2p.abc import ( from libp2p.custom_types import ( TProtocol, ) -from libp2p.network.stream.exceptions import ( - StreamClosed, -) from libp2p.peer.id import ( ID, ) -from libp2p.utils import ( - encode_varint_prefixed, -) from .exceptions import ( PubsubRouterError, @@ -120,13 +114,7 @@ class FloodSub(IPubsubRouter): if peer_id not in pubsub.peers: continue stream = pubsub.peers[peer_id] - # FIXME: We should add a `WriteMsg` similar to write delimited messages. - # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 - try: - await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) - except StreamClosed: - logger.debug("Fail to publish message to %s: stream closed", peer_id) - pubsub._handle_dead_peer(peer_id) + await pubsub.write_msg(stream, rpc_msg) async def join(self, topic: str) -> None: """ diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 813719dd..d2a52aaa 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -24,9 +24,6 @@ from libp2p.abc import ( from libp2p.custom_types import ( TProtocol, ) -from libp2p.network.stream.exceptions import ( - StreamClosed, -) from libp2p.peer.id import ( ID, ) @@ -42,9 +39,6 @@ from libp2p.pubsub import ( from libp2p.tools.async_service import ( Service, ) -from libp2p.utils import ( - encode_varint_prefixed, -) from .exceptions import ( NoPubsubAttached, @@ -249,14 +243,8 @@ class GossipSub(IPubsubRouter, Service): if peer_id not in self.pubsub.peers: continue stream = self.pubsub.peers[peer_id] - # FIXME: We should add a `WriteMsg` similar to write delimited messages. - # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 # TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages. - try: - await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) - except StreamClosed: - logger.debug("Fail to publish message to %s: stream closed", peer_id) - self.pubsub._handle_dead_peer(peer_id) + await self.pubsub.write_msg(stream, rpc_msg) for topic in pubsub_msg.topicIDs: self.time_since_last_publish[topic] = int(time.time()) @@ -705,8 +693,6 @@ class GossipSub(IPubsubRouter, Service): packet.publish.extend(msgs_to_forward) - # 2) Serialize that packet - rpc_msg: bytes = packet.SerializeToString() if self.pubsub is None: raise NoPubsubAttached @@ -720,14 +706,7 @@ class GossipSub(IPubsubRouter, Service): peer_stream = self.pubsub.peers[sender_peer_id] # 4) And write the packet to the stream - try: - await peer_stream.write(encode_varint_prefixed(rpc_msg)) - except StreamClosed: - logger.debug( - "Fail to responed to iwant request from %s: stream closed", - sender_peer_id, - ) - self.pubsub._handle_dead_peer(sender_peer_id) + await self.pubsub.write_msg(peer_stream, packet) async def handle_graft( self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID @@ -826,8 +805,6 @@ class GossipSub(IPubsubRouter, Service): packet: rpc_pb2.RPC = rpc_pb2.RPC() packet.control.CopyFrom(control_msg) - rpc_msg: bytes = packet.SerializeToString() - # Get stream for peer from pubsub if to_peer not in self.pubsub.peers: logger.debug( @@ -837,8 +814,4 @@ class GossipSub(IPubsubRouter, Service): peer_stream = self.pubsub.peers[to_peer] # Write rpc to stream - try: - await peer_stream.write(encode_varint_prefixed(rpc_msg)) - except StreamClosed: - logger.debug("Fail to emit control message to %s: stream closed", to_peer) - self.pubsub._handle_dead_peer(to_peer) + await self.pubsub.write_msg(peer_stream, packet) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 5f66f30a..093e2754 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -66,6 +66,7 @@ from libp2p.utils import ( encode_varint_prefixed, read_varint_prefixed_bytes, ) +from libp2p.utils.varint import encode_uvarint from .pb import ( rpc_pb2, @@ -773,3 +774,43 @@ class Pubsub(Service, IPubsub): def _is_subscribed_to_msg(self, msg: rpc_pb2.Message) -> bool: return any(topic in self.topic_ids for topic in msg.topicIDs) + + async def write_msg(self, stream: INetStream, rpc_msg: rpc_pb2.RPC) -> bool: + """ + Write an RPC message to a stream with proper error handling. + + Implements WriteMsg similar to go-libp2p-pubsub comm.go + Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 + + + :param stream: stream to write the message to + :param rpc_msg: RPC message to write + :return: True if successful, False if stream was closed + """ + try: + # Calculate message size first + msg_bytes = rpc_msg.SerializeToString() + msg_size = len(msg_bytes) + + # Calculate varint size and allocate exact buffer size needed + + varint_bytes = encode_uvarint(msg_size) + varint_size = len(varint_bytes) + + # Allocate buffer with exact size (like Go's pool.Get()) + buf = bytearray(varint_size + msg_size) + + # Write varint length prefix to buffer (like Go's binary.PutUvarint()) + buf[:varint_size] = varint_bytes + + # Write serialized message after varint (like Go's rpc.MarshalTo()) + buf[varint_size:] = msg_bytes + + # Single write operation (like Go's s.Write(buf)) + await stream.write(bytes(buf)) + return True + except StreamClosed: + peer_id = stream.muxed_conn.peer_id + logger.debug("Fail to write message to %s: stream closed", peer_id) + self._handle_dead_peer(peer_id) + return False From cbd4f9b502659dc73437e0ce8d55fe4f579b4a29 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Sun, 1 Jun 2025 23:48:44 +0530 Subject: [PATCH 02/33] feat: init mDNS discovery module --- examples/discovery/mDNS/mDNS.py | 36 ++++++++++++ libp2p/__init__.py | 29 +++++++--- libp2p/discovery/__init__.py | 0 libp2p/discovery/mdns/__init__.py | 0 libp2p/discovery/mdns/broadcaster.py | 0 libp2p/discovery/mdns/listener.py | 0 libp2p/discovery/mdns/mdns.py | 85 ++++++++++++++++++++++++++++ 7 files changed, 141 insertions(+), 9 deletions(-) create mode 100644 examples/discovery/mDNS/mDNS.py create mode 100644 libp2p/discovery/__init__.py create mode 100644 libp2p/discovery/mdns/__init__.py create mode 100644 libp2p/discovery/mdns/broadcaster.py create mode 100644 libp2p/discovery/mdns/listener.py create mode 100644 libp2p/discovery/mdns/mdns.py diff --git a/examples/discovery/mDNS/mDNS.py b/examples/discovery/mDNS/mDNS.py new file mode 100644 index 00000000..8563a1fb --- /dev/null +++ b/examples/discovery/mDNS/mDNS.py @@ -0,0 +1,36 @@ +import secrets +import multiaddr +import trio + +from libp2p import new_host +from libp2p.crypto.secp256k1 import create_new_key_pair + +async def main(): + # Generate a key pair for the host + secret = secrets.token_bytes(32) + key_pair = create_new_key_pair(secret) + + # Listen on a random TCP port + listen_addr = multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/0") + + # Enable mDNS discovery + host = new_host(key_pair=key_pair, enable_mDNS=True) + + async with host.run(listen_addrs=[listen_addr]): + print("Host started!") + print("Peer ID:", host.get_id()) + print("Listening on:", [str(addr) for addr in host.get_addrs()]) + + # Print discovered peers via mDNS + print("Waiting for mDNS peer discovery events (Ctrl+C to exit)...") + try: + while True: + # Print all known peers every 5 seconds + peers = host.get_peerstore().peer_ids() + print("Known peers:", [str(p) for p in peers if p != host.get_id()]) + await trio.sleep(5) + except KeyboardInterrupt: + print("Exiting...") + +if __name__ == "__main__": + trio.run(main) \ No newline at end of file diff --git a/libp2p/__init__.py b/libp2p/__init__.py index 64f47243..bc814da6 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -71,6 +71,9 @@ from libp2p.transport.upgrader import ( from libp2p.utils.logging import ( setup_logging, ) +from libp2p.discovery.mdns.mdns import ( + MDNSDiscovery +) # Initialize logging configuration setup_logging() @@ -236,13 +239,14 @@ def new_swarm( def new_host( - key_pair: KeyPair | None = None, - muxer_opt: TMuxerOptions | None = None, - sec_opt: TSecurityOptions | None = None, - peerstore_opt: IPeerStore | None = None, - disc_opt: IPeerRouting | None = None, - muxer_preference: Literal["YAMUX", "MPLEX"] | None = None, - listen_addrs: Sequence[multiaddr.Multiaddr] | None = None, + key_pair: Optional[KeyPair] = None, + muxer_opt: Optional[TMuxerOptions] = None, + sec_opt: Optional[TSecurityOptions] = None, + peerstore_opt: Optional[IPeerStore] = None, + disc_opt: Optional[IPeerRouting] = None, + muxer_preference: Optional[Literal["YAMUX", "MPLEX"]] = None, + listen_addrs: Sequence[multiaddr.Multiaddr] = None, + enable_mDNS: bool = False, ) -> IHost: """ Create a new libp2p host based on the given parameters. @@ -266,8 +270,15 @@ def new_host( ) if disc_opt is not None: - return RoutedHost(swarm, disc_opt) - return BasicHost(swarm) + host = RoutedHost(swarm, disc_opt) + else: + host = BasicHost(swarm) + if enable_mDNS: + mdns = MDNSDiscovery(swarm) + mdns.start() + host._mdns = mdns + + return host __version__ = __version("libp2p") diff --git a/libp2p/discovery/__init__.py b/libp2p/discovery/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/libp2p/discovery/mdns/__init__.py b/libp2p/discovery/mdns/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/libp2p/discovery/mdns/broadcaster.py b/libp2p/discovery/mdns/broadcaster.py new file mode 100644 index 00000000..e69de29b diff --git a/libp2p/discovery/mdns/listener.py b/libp2p/discovery/mdns/listener.py new file mode 100644 index 00000000..e69de29b diff --git a/libp2p/discovery/mdns/mdns.py b/libp2p/discovery/mdns/mdns.py new file mode 100644 index 00000000..8670e00d --- /dev/null +++ b/libp2p/discovery/mdns/mdns.py @@ -0,0 +1,85 @@ +""" +mDNS-based peer discovery for py-libp2p. +Conforms to https://github.com/libp2p/specs/blob/master/discovery/mdns.md +Uses zeroconf for mDNS broadcast/listen. Async operations use trio. +""" +import trio +from typing import Callable, Optional +from zeroconf import ServiceInfo, Zeroconf, ServiceBrowser, ServiceStateChange +from .utils import ( + stringGen +) +from libp2p.abc import ( + INetworkService +) + +SERVICE_TYPE = "_p2p._udp.local." +MCAST_PORT = 5353 +MCAST_ADDR = "224.0.0.251" + +class MDNSDiscovery: + def __init__(self, swarm: INetworkService): + self.peer_id = swarm.get_peer_id() + self.port = 8000 # Default port, can be overridden + # self.broadcast = init.get('broadcast', True) is not False + # self.on_peer = on_peer # Callback: async def on_peer(peer_info) + # self.service_name = service_name or f"{peer_id}.{SERVICE_TYPE}" + # self.zeroconf = Zeroconf() + # self._service_info = ServiceInfo( + # SERVICE_TYPE, + # self.service_name, + # addresses=[], # Will be set on register + # port=self.port, + # properties={b'id': peer_id.encode()}, + # ) + # self._browser = None + # self._running = False + + def main(self) -> None: + """ + Main entry point for the mDNS discovery service. + This method is intended to be run in an event loop. + """ + trio.run(self.start) + + async def start(self): + await trio.sleep(10) + # self._running = True + # await trio.to_thread.run_sync(self.zeroconf.register_service, self._service_info) + # self._browser = ServiceBrowser(self.zeroconf, SERVICE_TYPE, handlers=[self._on_service_state_change]) + print(f"Starting mDNS discovery for peer {self.peer_id} on port {self.port}") + + async def stop(self): + # self._running = False + # await trio.to_thread.run_sync(self.zeroconf.unregister_service, self._service_info) + # await trio.to_thread.run_sync(self.zeroconf.close) + print(f"Stopping mDNS discovery for peer {self.peer_id}") + + def _on_service_state_change(self, zeroconf, service_type, name, state_change): + if state_change is not ServiceStateChange.Added: + return + info = zeroconf.get_service_info(service_type, name) + if not info or name == self.service_name: + return + peer_id = info.properties.get(b'id') + if not peer_id: + return + peer_id = peer_id.decode() + addresses = [addr for addr in info.parsed_addresses()] + port = info.port + peer_info = {'peer_id': peer_id, 'addresses': addresses, 'port': port} + if self.on_peer: + # Schedule callback in the background + trio.lowlevel.spawn_system_task(self._call_on_peer, peer_info) + + async def _call_on_peer(self, peer_info): + if self.on_peer: + await self.on_peer(peer_info) + +# Example usage: +# async def on_peer(peer_info): +# print(f"Discovered peer: {peer_info['peer_id']} at {peer_info['addresses']}:{peer_info['port']}") +# mdns = MDNSDiscovery(peer_id, port, on_peer) +# await mdns.start() +# ... +# await mdns.stop() From 742bc7bca3a91cf6af4e8a67c6fc2ef73ea584da Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Sun, 1 Jun 2025 23:48:57 +0530 Subject: [PATCH 03/33] feat: add stringGen function to generate random strings --- libp2p/discovery/mdns/utils.py | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 libp2p/discovery/mdns/utils.py diff --git a/libp2p/discovery/mdns/utils.py b/libp2p/discovery/mdns/utils.py new file mode 100644 index 00000000..a341928d --- /dev/null +++ b/libp2p/discovery/mdns/utils.py @@ -0,0 +1,11 @@ +import random +import string + +def stringGen(len: int = 63) -> str: + """Generate a random string of lowercase letters and digits.""" + + charset = string.ascii_lowercase + string.digits + result = [] + for _ in range(len): + result.append(random.choice(charset)) + return ''.join(result) \ No newline at end of file From 6add1cb6857c14c3c951fb21c8e542c787bcaab6 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Sun, 8 Jun 2025 10:02:13 +0530 Subject: [PATCH 04/33] feat: implement broadcasting in mdns --- libp2p/discovery/mdns/broadcaster.py | 49 ++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/libp2p/discovery/mdns/broadcaster.py b/libp2p/discovery/mdns/broadcaster.py index e69de29b..9c83bf96 100644 --- a/libp2p/discovery/mdns/broadcaster.py +++ b/libp2p/discovery/mdns/broadcaster.py @@ -0,0 +1,49 @@ +from zeroconf import Zeroconf, ServiceInfo +from .utils import stringGen +import socket + +class PeerBroadcaster: + """ + Broadcasts this peer's presence on the local network using mDNS/zeroconf. + Registers a service with the peer's ID in the TXT record as per libp2p spec. + """ + def __init__(self, zeroconf: Zeroconf, service_type: str, service_name: str, peer_id: str, port: int): + self.zeroconf = zeroconf + self.service_type = service_type + self.peer_id = peer_id + self.port = port + self.service_name = service_name + + # Get the local IP address + local_ip = self._get_local_ip() + + self.service_info = ServiceInfo( + type_=self.service_type, + name=self.service_name, + port=self.port, + properties={b'id': self.peer_id.encode()}, + server=f"{self.service_name}", + addresses=[socket.inet_aton(local_ip)] + ) + + def _get_local_ip(self): + """Get the local IP address of this machine""" + try: + # Connect to a remote address to determine the local IP + # This doesn't actually send data + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.connect(("8.8.8.8", 80)) + local_ip = s.getsockname()[0] + return local_ip + except Exception: + # Fallback to localhost if we can't determine the IP + return "127.0.0.1" + + def register(self): + """Register the peer's mDNS service on the network.""" + print(f"Registering with name {self.service_name} and peer_id {self.peer_id} on port {self.port}") + self.zeroconf.register_service(self.service_info) + + def unregister(self): + """Unregister the peer's mDNS service from the network.""" + self.zeroconf.unregister_service(self.service_info) From cd7eaba4a43c69a5f2cddadfc05de84c12296920 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Sun, 8 Jun 2025 10:02:47 +0530 Subject: [PATCH 05/33] feat: implement mDNS discovery with PeerListener --- libp2p/discovery/mdns/listener.py | 102 ++++++++++++++++++++++++++++++ libp2p/discovery/mdns/mdns.py | 96 ++++++++++------------------ 2 files changed, 136 insertions(+), 62 deletions(-) diff --git a/libp2p/discovery/mdns/listener.py b/libp2p/discovery/mdns/listener.py index e69de29b..3bc97785 100644 --- a/libp2p/discovery/mdns/listener.py +++ b/libp2p/discovery/mdns/listener.py @@ -0,0 +1,102 @@ +import time +import socket +from zeroconf import ServiceBrowser, ServiceStateChange, Zeroconf +from libp2p.peer.peerinfo import PeerInfo +from libp2p.peer.id import ID + +class PeerListener: + """Enhanced mDNS listener for libp2p peer discovery.""" + + def __init__(self, zeroconf: Zeroconf, service_type: str, service_name: str, on_peer_discovery=None): + self.zeroconf = zeroconf + self.service_type = service_type + self.service_name = service_name + self.on_peer_discovery = on_peer_discovery + self.discovered_services = set() + self.browser = ServiceBrowser( + self.zeroconf, self.service_type, handlers=[self.on_service_state_change] + ) + + def on_service_state_change(self, zeroconf: Zeroconf, service_type, name, state_change): + if state_change != ServiceStateChange.Added or name == self.service_name or name in self.discovered_services: + return + + print(f"Discovered service: {name}") + self.discovered_services.add(name) + + # Process the discovered service + self._process_discovered_service(zeroconf, service_type, name) + + def _process_discovered_service(self, zeroconf, service_type, name): + # Try to get service info with retries + info = None + for attempt in range(3): + info = zeroconf.get_service_info(service_type, name, timeout=5000) + if info: + print(f"Service info successfully retrieved for {name}") + break + print(f"Retrying service info retrieval (attempt {attempt + 1}/3)") + time.sleep(1) + + if not info: + print(f"Failed to retrieve service info for {name}") + return + + # Extract peer information + peer_info = self._extract_peer_info(info) + print(f"Extracted peer info: {peer_info}") + if peer_info: + self._handle_discovered_peer(peer_info) + + def _extract_peer_info(self, service_info): + try: + # Extract IP addresses + addresses = [] + for addr in service_info.addresses: + ip = socket.inet_ntoa(addr) + addresses.append(ip) + + # Extract port + port = service_info.port + + # Extract peer ID from TXT record + peer_id = None + if service_info.properties: + peer_id_bytes = service_info.properties.get(b'id') + if peer_id_bytes: + peer_id = peer_id_bytes.decode('utf-8') + + if not peer_id: + print(f"No peer ID found in TXT record for {service_info.name}") + return None + + # Create multiaddresses + multiaddrs = [] + for ip in addresses: + multiaddr = f"/ip4/{ip}/udp/{port}" + multiaddrs.append(multiaddr) + + # Create PeerInfo object + peer_info = PeerInfo( + peer_id=ID.from_base58(peer_id), + addrs=multiaddrs + ) + + return peer_info + + except Exception as e: + print(f"Error extracting peer info from {service_info.name}: {e}") + return None + + def _handle_discovered_peer(self, peer_info): + print(f"Successfully discovered peer: {peer_info.peer_id}") + print(f"Peer addresses: {peer_info.addrs}") + + # Trigger callback if provided + if self.on_peer_discovery: + self.on_peer_discovery(peer_info) + + def stop(self): + """Stop the listener.""" + if self.browser: + self.browser.cancel() \ No newline at end of file diff --git a/libp2p/discovery/mdns/mdns.py b/libp2p/discovery/mdns/mdns.py index 8670e00d..bf57ad03 100644 --- a/libp2p/discovery/mdns/mdns.py +++ b/libp2p/discovery/mdns/mdns.py @@ -12,74 +12,46 @@ from .utils import ( from libp2p.abc import ( INetworkService ) +from .listener import PeerListener +from .broadcaster import PeerBroadcaster +from libp2p.peer.peerinfo import PeerInfo SERVICE_TYPE = "_p2p._udp.local." MCAST_PORT = 5353 MCAST_ADDR = "224.0.0.251" class MDNSDiscovery: - def __init__(self, swarm: INetworkService): - self.peer_id = swarm.get_peer_id() - self.port = 8000 # Default port, can be overridden - # self.broadcast = init.get('broadcast', True) is not False - # self.on_peer = on_peer # Callback: async def on_peer(peer_info) - # self.service_name = service_name or f"{peer_id}.{SERVICE_TYPE}" - # self.zeroconf = Zeroconf() - # self._service_info = ServiceInfo( - # SERVICE_TYPE, - # self.service_name, - # addresses=[], # Will be set on register - # port=self.port, - # properties={b'id': peer_id.encode()}, - # ) - # self._browser = None - # self._running = False + """ + mDNS-based peer discovery for py-libp2p, using zeroconf. + Conforms to the libp2p mDNS discovery spec. + """ + def __init__(self, swarm: INetworkService, port: int = 8000, on_peer_discovery=None): + self.peer_id = str(swarm.get_peer_id()) + self.port = port + self.on_peer_discovery = on_peer_discovery + self.zeroconf = Zeroconf() + self.serviceName = f"{stringGen()}.{SERVICE_TYPE}" + self.broadcaster = PeerBroadcaster( + zeroconf=self.zeroconf, + service_type=SERVICE_TYPE, + service_name=self.serviceName, + peer_id = self.peer_id, + port = self.port + ) + self.listener = PeerListener( + zeroconf=self.zeroconf, + service_type=SERVICE_TYPE, + service_name=self.serviceName, + on_peer_discovery=self.on_peer_discovery + ) - def main(self) -> None: - """ - Main entry point for the mDNS discovery service. - This method is intended to be run in an event loop. - """ - trio.run(self.start) - - async def start(self): - await trio.sleep(10) - # self._running = True - # await trio.to_thread.run_sync(self.zeroconf.register_service, self._service_info) - # self._browser = ServiceBrowser(self.zeroconf, SERVICE_TYPE, handlers=[self._on_service_state_change]) + def start(self): + """Register this peer and start listening for others.""" print(f"Starting mDNS discovery for peer {self.peer_id} on port {self.port}") + self.broadcaster.register() + # Listener is started in constructor - async def stop(self): - # self._running = False - # await trio.to_thread.run_sync(self.zeroconf.unregister_service, self._service_info) - # await trio.to_thread.run_sync(self.zeroconf.close) - print(f"Stopping mDNS discovery for peer {self.peer_id}") - - def _on_service_state_change(self, zeroconf, service_type, name, state_change): - if state_change is not ServiceStateChange.Added: - return - info = zeroconf.get_service_info(service_type, name) - if not info or name == self.service_name: - return - peer_id = info.properties.get(b'id') - if not peer_id: - return - peer_id = peer_id.decode() - addresses = [addr for addr in info.parsed_addresses()] - port = info.port - peer_info = {'peer_id': peer_id, 'addresses': addresses, 'port': port} - if self.on_peer: - # Schedule callback in the background - trio.lowlevel.spawn_system_task(self._call_on_peer, peer_info) - - async def _call_on_peer(self, peer_info): - if self.on_peer: - await self.on_peer(peer_info) - -# Example usage: -# async def on_peer(peer_info): -# print(f"Discovered peer: {peer_info['peer_id']} at {peer_info['addresses']}:{peer_info['port']}") -# mdns = MDNSDiscovery(peer_id, port, on_peer) -# await mdns.start() -# ... -# await mdns.stop() + def stop(self): + """Unregister this peer and clean up zeroconf resources.""" + self.broadcaster.unregister() + self.zeroconf.close() \ No newline at end of file From 3262749db74e89e9a41cf3cbf95df96d1e0d8789 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Fri, 20 Jun 2025 11:37:02 +0530 Subject: [PATCH 06/33] added event emmiter --- examples/discovery/__init__.py | 0 examples/discovery/mDNS/mDNS.py | 31 +++-- libp2p/__init__.py | 19 ++- libp2p/discovery/events/__init__.py | 0 libp2p/discovery/events/peerDiscovery.py | 37 ++++++ libp2p/discovery/mdns/broadcaster.py | 38 ++++-- libp2p/discovery/mdns/listener.py | 153 ++++++++++++----------- libp2p/discovery/mdns/mdns.py | 44 ++++--- libp2p/discovery/mdns/utils.py | 4 +- 9 files changed, 199 insertions(+), 127 deletions(-) create mode 100644 examples/discovery/__init__.py create mode 100644 libp2p/discovery/events/__init__.py create mode 100644 libp2p/discovery/events/peerDiscovery.py diff --git a/examples/discovery/__init__.py b/examples/discovery/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/discovery/mDNS/mDNS.py b/examples/discovery/mDNS/mDNS.py index 8563a1fb..cfe6c8b1 100644 --- a/examples/discovery/mDNS/mDNS.py +++ b/examples/discovery/mDNS/mDNS.py @@ -1,9 +1,21 @@ import secrets + import multiaddr import trio -from libp2p import new_host -from libp2p.crypto.secp256k1 import create_new_key_pair +from libp2p import ( + new_host, +) +from libp2p.abc import ( + PeerInfo, +) +from libp2p.crypto.secp256k1 import ( + create_new_key_pair, +) +from libp2p.discovery.events.peerDiscovery import ( + peerDiscovery, +) + async def main(): # Generate a key pair for the host @@ -17,20 +29,19 @@ async def main(): host = new_host(key_pair=key_pair, enable_mDNS=True) async with host.run(listen_addrs=[listen_addr]): - print("Host started!") - print("Peer ID:", host.get_id()) - print("Listening on:", [str(addr) for addr in host.get_addrs()]) + print("host peer id", host.get_id()) # Print discovered peers via mDNS print("Waiting for mDNS peer discovery events (Ctrl+C to exit)...") try: while True: - # Print all known peers every 5 seconds - peers = host.get_peerstore().peer_ids() - print("Known peers:", [str(p) for p in peers if p != host.get_id()]) - await trio.sleep(5) + peer_info = PeerInfo(host.get_id(), host.get_addrs()) + + await trio.sleep(1) + await peerDiscovery.emit_peer_discovered(peer_info=peer_info) except KeyboardInterrupt: print("Exiting...") + if __name__ == "__main__": - trio.run(main) \ No newline at end of file + trio.run(main) diff --git a/libp2p/__init__.py b/libp2p/__init__.py index bc814da6..a066207c 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -32,6 +32,9 @@ from libp2p.custom_types import ( TProtocol, TSecurityOptions, ) +from libp2p.discovery.mdns.mdns import ( + MDNSDiscovery, +) from libp2p.host.basic_host import ( BasicHost, ) @@ -71,9 +74,6 @@ from libp2p.transport.upgrader import ( from libp2p.utils.logging import ( setup_logging, ) -from libp2p.discovery.mdns.mdns import ( - MDNSDiscovery -) # Initialize logging configuration setup_logging() @@ -269,16 +269,13 @@ def new_host( listen_addrs=listen_addrs, ) - if disc_opt is not None: - host = RoutedHost(swarm, disc_opt) - else: - host = BasicHost(swarm) - if enable_mDNS: mdns = MDNSDiscovery(swarm) mdns.start() - host._mdns = mdns - - return host + + if disc_opt is not None: + return RoutedHost(swarm, disc_opt) + else: + return BasicHost(swarm) __version__ = __version("libp2p") diff --git a/libp2p/discovery/events/__init__.py b/libp2p/discovery/events/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/libp2p/discovery/events/peerDiscovery.py b/libp2p/discovery/events/peerDiscovery.py new file mode 100644 index 00000000..924998cd --- /dev/null +++ b/libp2p/discovery/events/peerDiscovery.py @@ -0,0 +1,37 @@ +from collections.abc import ( + Awaitable, + Callable, +) + +import trio + +from libp2p.abc import ( + PeerInfo, +) + +TTL: int = 60 * 60 # Time-to-live for discovered peers in seconds + + +class PeerDiscovery: + def __init__(self) -> None: + self._peer_discovered_handlers: list[Callable[[PeerInfo], Awaitable[None]]] = [] + + def register_peer_discovered_handler( + self, handler: Callable[[PeerInfo], Awaitable[None]] + ) -> None: + self._peer_discovered_handlers.append(handler) + + async def emit_peer_discovered(self, peer_info: PeerInfo) -> None: + for handler in self._peer_discovered_handlers: + await handler(peer_info) + + +peerDiscovery = PeerDiscovery() + + +async def peerDiscoveryHandler(peerInfo: PeerInfo) -> None: + await trio.sleep(5) # Simulate some processing delay + # print("Discovered peer is", peerInfo.peer_id) + + +peerDiscovery.register_peer_discovered_handler(peerDiscoveryHandler) diff --git a/libp2p/discovery/mdns/broadcaster.py b/libp2p/discovery/mdns/broadcaster.py index 9c83bf96..67c07ef7 100644 --- a/libp2p/discovery/mdns/broadcaster.py +++ b/libp2p/discovery/mdns/broadcaster.py @@ -1,32 +1,45 @@ -from zeroconf import Zeroconf, ServiceInfo -from .utils import stringGen import socket +from zeroconf import ( + ServiceInfo, + Zeroconf, +) + + class PeerBroadcaster: """ Broadcasts this peer's presence on the local network using mDNS/zeroconf. Registers a service with the peer's ID in the TXT record as per libp2p spec. """ - def __init__(self, zeroconf: Zeroconf, service_type: str, service_name: str, peer_id: str, port: int): + + def __init__( + self, + zeroconf: Zeroconf, + service_type: str, + service_name: str, + peer_id: str, + port: int, + ): self.zeroconf = zeroconf self.service_type = service_type self.peer_id = peer_id self.port = port self.service_name = service_name - + # Get the local IP address local_ip = self._get_local_ip() - + hostname = socket.gethostname() + self.service_info = ServiceInfo( type_=self.service_type, name=self.service_name, port=self.port, - properties={b'id': self.peer_id.encode()}, - server=f"{self.service_name}", - addresses=[socket.inet_aton(local_ip)] + properties={b"id": self.peer_id.encode()}, + server=f"{hostname}.local.", + addresses=[socket.inet_aton(local_ip)], ) - def _get_local_ip(self): + def _get_local_ip(self) -> str: """Get the local IP address of this machine""" try: # Connect to a remote address to determine the local IP @@ -34,16 +47,17 @@ class PeerBroadcaster: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.connect(("8.8.8.8", 80)) local_ip = s.getsockname()[0] + print(f"Local IP determined: {local_ip}") return local_ip except Exception: # Fallback to localhost if we can't determine the IP return "127.0.0.1" - def register(self): + def register(self) -> None: """Register the peer's mDNS service on the network.""" - print(f"Registering with name {self.service_name} and peer_id {self.peer_id} on port {self.port}") + print(repr(self.service_info)) self.zeroconf.register_service(self.service_info) - def unregister(self): + def unregister(self) -> None: """Unregister the peer's mDNS service from the network.""" self.zeroconf.unregister_service(self.service_info) diff --git a/libp2p/discovery/mdns/listener.py b/libp2p/discovery/mdns/listener.py index 3bc97785..3faea4e2 100644 --- a/libp2p/discovery/mdns/listener.py +++ b/libp2p/discovery/mdns/listener.py @@ -1,102 +1,105 @@ -import time import socket -from zeroconf import ServiceBrowser, ServiceStateChange, Zeroconf -from libp2p.peer.peerinfo import PeerInfo -from libp2p.peer.id import ID +import time -class PeerListener: - """Enhanced mDNS listener for libp2p peer discovery.""" - - def __init__(self, zeroconf: Zeroconf, service_type: str, service_name: str, on_peer_discovery=None): +from zeroconf import ( + ServiceBrowser, + ServiceInfo, + ServiceListener, + ServiceStateChange, + Zeroconf, +) + +from libp2p.abc import IPeerStore +from libp2p.peer.id import ID +from libp2p.peer.peerinfo import PeerInfo + + +class PeerListener(ServiceListener): + """mDNS listener — now a true ServiceListener subclass.""" + + def __init__( + self, + peerstore: IPeerStore, + zeroconf: Zeroconf, + service_type: str, + service_name: str, + ) -> None: + self.peerstore = peerstore self.zeroconf = zeroconf self.service_type = service_type self.service_name = service_name - self.on_peer_discovery = on_peer_discovery - self.discovered_services = set() - self.browser = ServiceBrowser( - self.zeroconf, self.service_type, handlers=[self.on_service_state_change] - ) + self.discovered_services: set[str] = set() - def on_service_state_change(self, zeroconf: Zeroconf, service_type, name, state_change): - if state_change != ServiceStateChange.Added or name == self.service_name or name in self.discovered_services: + # pass `self` as the listener object + self.browser = ServiceBrowser(self.zeroconf, self.service_type, listener=self) + + def add_service(self, zc: Zeroconf, type_: str, name: str) -> None: + # map to your on_service_state_change logic + self._on_state_change(zc, type_, name, ServiceStateChange.Added) + + def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None: + self._on_state_change(zc, type_, name, ServiceStateChange.Removed) + + def update_service(self, zc: Zeroconf, type_: str, name: str) -> None: + self._on_state_change(zc, type_, name, ServiceStateChange.Updated) + + def _on_state_change( + self, + zeroconf: Zeroconf, + service_type: str, + name: str, + state: ServiceStateChange, + ) -> None: + # skip our own service + if name == self.service_name: return - - print(f"Discovered service: {name}") - self.discovered_services.add(name) - - # Process the discovered service - self._process_discovered_service(zeroconf, service_type, name) - def _process_discovered_service(self, zeroconf, service_type, name): - # Try to get service info with retries + # handle Added + if state is ServiceStateChange.Added: + if name in self.discovered_services: + return + self.discovered_services.add(name) + self._process_discovered_service(zeroconf, service_type, name) + + # ...optional hooks for Removed/Updated if you need them + + def _process_discovered_service( + self, zeroconf: Zeroconf, service_type: str, name: str + ) -> None: + # same retry logic you had before info = None for attempt in range(3): info = zeroconf.get_service_info(service_type, name, timeout=5000) if info: - print(f"Service info successfully retrieved for {name}") break - print(f"Retrying service info retrieval (attempt {attempt + 1}/3)") time.sleep(1) if not info: - print(f"Failed to retrieve service info for {name}") return - # Extract peer information peer_info = self._extract_peer_info(info) - print(f"Extracted peer info: {peer_info}") if peer_info: + # your existing hook self._handle_discovered_peer(peer_info) - def _extract_peer_info(self, service_info): + def _extract_peer_info(self, info: ServiceInfo) -> PeerInfo | None: try: - # Extract IP addresses - addresses = [] - for addr in service_info.addresses: - ip = socket.inet_ntoa(addr) - addresses.append(ip) - - # Extract port - port = service_info.port - - # Extract peer ID from TXT record - peer_id = None - if service_info.properties: - peer_id_bytes = service_info.properties.get(b'id') - if peer_id_bytes: - peer_id = peer_id_bytes.decode('utf-8') - - if not peer_id: - print(f"No peer ID found in TXT record for {service_info.name}") + addrs = [ + f"/ip4/{socket.inet_ntoa(addr)}/udp/{info.port}" + for addr in info.addresses + ] + pid_bytes = info.properties.get(b"id") + if not pid_bytes: return None - - # Create multiaddresses - multiaddrs = [] - for ip in addresses: - multiaddr = f"/ip4/{ip}/udp/{port}" - multiaddrs.append(multiaddr) - - # Create PeerInfo object - peer_info = PeerInfo( - peer_id=ID.from_base58(peer_id), - addrs=multiaddrs - ) - - return peer_info - - except Exception as e: - print(f"Error extracting peer info from {service_info.name}: {e}") + pid = ID.from_base58(pid_bytes.decode()) + return PeerInfo(peer_id=pid, addrs=addrs) + except Exception: return None - def _handle_discovered_peer(self, peer_info): - print(f"Successfully discovered peer: {peer_info.peer_id}") - print(f"Peer addresses: {peer_info.addrs}") - - # Trigger callback if provided - if self.on_peer_discovery: - self.on_peer_discovery(peer_info) + def _handle_discovered_peer(self, peer_info: PeerInfo) -> None: + # your “emit” or “connect” logic goes here + # print("Discovered:", peer_info) + print("Discovered:", peer_info.peer_id) - def stop(self): - """Stop the listener.""" - if self.browser: - self.browser.cancel() \ No newline at end of file + def stop(self) -> None: + self.browser.cancel() diff --git a/libp2p/discovery/mdns/mdns.py b/libp2p/discovery/mdns/mdns.py index bf57ad03..62e6685d 100644 --- a/libp2p/discovery/mdns/mdns.py +++ b/libp2p/discovery/mdns/mdns.py @@ -3,55 +3,65 @@ mDNS-based peer discovery for py-libp2p. Conforms to https://github.com/libp2p/specs/blob/master/discovery/mdns.md Uses zeroconf for mDNS broadcast/listen. Async operations use trio. """ -import trio -from typing import Callable, Optional -from zeroconf import ServiceInfo, Zeroconf, ServiceBrowser, ServiceStateChange -from .utils import ( - stringGen + +from zeroconf import ( + Zeroconf, ) + from libp2p.abc import ( - INetworkService + INetworkService, +) + +from .broadcaster import ( + PeerBroadcaster, +) +from .listener import ( + PeerListener, +) +from .utils import ( + stringGen, ) -from .listener import PeerListener -from .broadcaster import PeerBroadcaster -from libp2p.peer.peerinfo import PeerInfo SERVICE_TYPE = "_p2p._udp.local." MCAST_PORT = 5353 MCAST_ADDR = "224.0.0.251" + class MDNSDiscovery: """ mDNS-based peer discovery for py-libp2p, using zeroconf. Conforms to the libp2p mDNS discovery spec. """ - def __init__(self, swarm: INetworkService, port: int = 8000, on_peer_discovery=None): + + def __init__(self, swarm: INetworkService, port: int = 8000): self.peer_id = str(swarm.get_peer_id()) self.port = port - self.on_peer_discovery = on_peer_discovery self.zeroconf = Zeroconf() self.serviceName = f"{stringGen()}.{SERVICE_TYPE}" + self.peerstore = swarm.peerstore + self.swarm = swarm self.broadcaster = PeerBroadcaster( zeroconf=self.zeroconf, service_type=SERVICE_TYPE, service_name=self.serviceName, - peer_id = self.peer_id, - port = self.port + peer_id=self.peer_id, + port=self.port, ) self.listener = PeerListener( zeroconf=self.zeroconf, + peerstore=self.peerstore, service_type=SERVICE_TYPE, service_name=self.serviceName, - on_peer_discovery=self.on_peer_discovery ) - def start(self): + def start(self) -> None: """Register this peer and start listening for others.""" print(f"Starting mDNS discovery for peer {self.peer_id} on port {self.port}") + print("host is listening on", self.swarm.listeners) self.broadcaster.register() # Listener is started in constructor - def stop(self): + def stop(self) -> None: """Unregister this peer and clean up zeroconf resources.""" self.broadcaster.unregister() - self.zeroconf.close() \ No newline at end of file + self.zeroconf.close() diff --git a/libp2p/discovery/mdns/utils.py b/libp2p/discovery/mdns/utils.py index a341928d..eb05d03a 100644 --- a/libp2p/discovery/mdns/utils.py +++ b/libp2p/discovery/mdns/utils.py @@ -1,11 +1,11 @@ import random import string + def stringGen(len: int = 63) -> str: """Generate a random string of lowercase letters and digits.""" - charset = string.ascii_lowercase + string.digits result = [] for _ in range(len): result.append(random.choice(charset)) - return ''.join(result) \ No newline at end of file + return "".join(result) From f43e7e367a58a015b15248d4cb1b76f91f9e0dc8 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Fri, 20 Jun 2025 16:08:04 +0530 Subject: [PATCH 07/33] refactored code --- examples/discovery/mDNS/mDNS.py | 14 +---- libp2p/discovery/mdns/broadcaster.py | 2 - libp2p/discovery/mdns/listener.py | 86 +++++++++++----------------- libp2p/discovery/mdns/mdns.py | 1 - 4 files changed, 34 insertions(+), 69 deletions(-) diff --git a/examples/discovery/mDNS/mDNS.py b/examples/discovery/mDNS/mDNS.py index cfe6c8b1..996d8882 100644 --- a/examples/discovery/mDNS/mDNS.py +++ b/examples/discovery/mDNS/mDNS.py @@ -6,15 +6,9 @@ import trio from libp2p import ( new_host, ) -from libp2p.abc import ( - PeerInfo, -) from libp2p.crypto.secp256k1 import ( create_new_key_pair, ) -from libp2p.discovery.events.peerDiscovery import ( - peerDiscovery, -) async def main(): @@ -29,16 +23,10 @@ async def main(): host = new_host(key_pair=key_pair, enable_mDNS=True) async with host.run(listen_addrs=[listen_addr]): - print("host peer id", host.get_id()) - # Print discovered peers via mDNS - print("Waiting for mDNS peer discovery events (Ctrl+C to exit)...") try: while True: - peer_info = PeerInfo(host.get_id(), host.get_addrs()) - - await trio.sleep(1) - await peerDiscovery.emit_peer_discovered(peer_info=peer_info) + await trio.sleep(100) except KeyboardInterrupt: print("Exiting...") diff --git a/libp2p/discovery/mdns/broadcaster.py b/libp2p/discovery/mdns/broadcaster.py index 67c07ef7..8e14ed96 100644 --- a/libp2p/discovery/mdns/broadcaster.py +++ b/libp2p/discovery/mdns/broadcaster.py @@ -47,7 +47,6 @@ class PeerBroadcaster: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.connect(("8.8.8.8", 80)) local_ip = s.getsockname()[0] - print(f"Local IP determined: {local_ip}") return local_ip except Exception: # Fallback to localhost if we can't determine the IP @@ -55,7 +54,6 @@ class PeerBroadcaster: def register(self) -> None: """Register the peer's mDNS service on the network.""" - print(repr(self.service_info)) self.zeroconf.register_service(self.service_info) def unregister(self) -> None: diff --git a/libp2p/discovery/mdns/listener.py b/libp2p/discovery/mdns/listener.py index 3faea4e2..d8d44f38 100644 --- a/libp2p/discovery/mdns/listener.py +++ b/libp2p/discovery/mdns/listener.py @@ -1,15 +1,13 @@ import socket -import time from zeroconf import ( ServiceBrowser, ServiceInfo, ServiceListener, - ServiceStateChange, Zeroconf, ) -from libp2p.abc import IPeerStore +from libp2p.abc import IPeerStore, Multiaddr from libp2p.peer.id import ID from libp2p.peer.peerinfo import PeerInfo @@ -28,64 +26,51 @@ class PeerListener(ServiceListener): self.zeroconf = zeroconf self.service_type = service_type self.service_name = service_name - self.discovered_services: set[str] = set() - - # pass `self` as the listener object + self.discovered_services: dict[str, ID] = {} self.browser = ServiceBrowser(self.zeroconf, self.service_type, listener=self) def add_service(self, zc: Zeroconf, type_: str, name: str) -> None: - # map to your on_service_state_change logic - self._on_state_change(zc, type_, name, ServiceStateChange.Added) - - def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None: - self._on_state_change(zc, type_, name, ServiceStateChange.Removed) - - def update_service(self, zc: Zeroconf, type_: str, name: str) -> None: - self._on_state_change(zc, type_, name, ServiceStateChange.Updated) - - def _on_state_change( - self, - zeroconf: Zeroconf, - service_type: str, - name: str, - state: ServiceStateChange, - ) -> None: - # skip our own service if name == self.service_name: return - - # handle Added - if state is ServiceStateChange.Added: - if name in self.discovered_services: - return - self.discovered_services.add(name) - self._process_discovered_service(zeroconf, service_type, name) - - # ...optional hooks for Removed/Updated if you need them - - def _process_discovered_service( - self, zeroconf: Zeroconf, service_type: str, name: str - ) -> None: - # same retry logic you had before - info = None - for attempt in range(3): - info = zeroconf.get_service_info(service_type, name, timeout=5000) - if info: - break - time.sleep(1) - + info = zc.get_service_info(type_, name, timeout=5000) if not info: return - peer_info = self._extract_peer_info(info) if peer_info: - # your existing hook - self._handle_discovered_peer(peer_info) + self.discovered_services[name] = peer_info.peer_id + self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10) + print("Discovered Peer:", peer_info.peer_id) + + def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None: + peer_id = self.discovered_services.pop(name) + self.peerstore.clear_addrs(peer_id) + print(f"Removed Peer: {peer_id}") + + def update_service(self, zc: Zeroconf, type_: str, name: str) -> None: + info = zc.get_service_info(type_, name, timeout=5000) + if not info: + return + peer_info = self._extract_peer_info(info) + if peer_info: + self.peerstore.clear_addrs(peer_info.peer_id) + self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10) + print("Updated Peer", peer_info.peer_id) + + def _process_discovered_service( + self, zeroconf: Zeroconf, type_: str, name: str + ) -> None: + info = zeroconf.get_service_info(type_, name, timeout=5000) + if not info: + return + peer_info = self._extract_peer_info(info) + if peer_info: + self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10) + print("Discovered:", peer_info.peer_id) def _extract_peer_info(self, info: ServiceInfo) -> PeerInfo | None: try: addrs = [ - f"/ip4/{socket.inet_ntoa(addr)}/udp/{info.port}" + Multiaddr(f"/ip4/{socket.inet_ntoa(addr)}/udp/{info.port}") for addr in info.addresses ] pid_bytes = info.properties.get(b"id") @@ -96,10 +81,5 @@ class PeerListener(ServiceListener): except Exception: return None - def _handle_discovered_peer(self, peer_info: PeerInfo) -> None: - # your “emit” or “connect” logic goes here - # print("Discovered:", peer_info) - print("Discovered:", peer_info.peer_id) - def stop(self) -> None: self.browser.cancel() diff --git a/libp2p/discovery/mdns/mdns.py b/libp2p/discovery/mdns/mdns.py index 62e6685d..a76cb39a 100644 --- a/libp2p/discovery/mdns/mdns.py +++ b/libp2p/discovery/mdns/mdns.py @@ -57,7 +57,6 @@ class MDNSDiscovery: def start(self) -> None: """Register this peer and start listening for others.""" print(f"Starting mDNS discovery for peer {self.peer_id} on port {self.port}") - print("host is listening on", self.swarm.listeners) self.broadcaster.register() # Listener is started in constructor From e2f95f4df3177a7352c8a10e04967157a2922ab2 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Fri, 20 Jun 2025 16:25:28 +0530 Subject: [PATCH 08/33] feat: emitted event from demo file --- examples/discovery/mDNS/mDNS.py | 11 ++++++++++- libp2p/discovery/events/peerDiscovery.py | 14 +++++++------- libp2p/discovery/mdns/listener.py | 9 ++++++++- 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/examples/discovery/mDNS/mDNS.py b/examples/discovery/mDNS/mDNS.py index 996d8882..82dc0212 100644 --- a/examples/discovery/mDNS/mDNS.py +++ b/examples/discovery/mDNS/mDNS.py @@ -9,6 +9,15 @@ from libp2p import ( from libp2p.crypto.secp256k1 import ( create_new_key_pair, ) +from libp2p.discovery.events.peerDiscovery import ( + peerDiscovery +) +from libp2p.abc import ( + PeerInfo +) + +def customFunctoion(peerinfo: PeerInfo): + print("Printing peer info from demo file",repr(peerinfo)) async def main(): @@ -18,7 +27,7 @@ async def main(): # Listen on a random TCP port listen_addr = multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/0") - + peerDiscovery.register_peer_discovered_handler(customFunctoion) # Enable mDNS discovery host = new_host(key_pair=key_pair, enable_mDNS=True) diff --git a/libp2p/discovery/events/peerDiscovery.py b/libp2p/discovery/events/peerDiscovery.py index 924998cd..6eefe381 100644 --- a/libp2p/discovery/events/peerDiscovery.py +++ b/libp2p/discovery/events/peerDiscovery.py @@ -14,24 +14,24 @@ TTL: int = 60 * 60 # Time-to-live for discovered peers in seconds class PeerDiscovery: def __init__(self) -> None: - self._peer_discovered_handlers: list[Callable[[PeerInfo], Awaitable[None]]] = [] + self._peer_discovered_handlers: list[Callable[[PeerInfo], None]] = [] def register_peer_discovered_handler( self, handler: Callable[[PeerInfo], Awaitable[None]] ) -> None: self._peer_discovered_handlers.append(handler) - async def emit_peer_discovered(self, peer_info: PeerInfo) -> None: + def emit_peer_discovered(self, peer_info: PeerInfo) -> None: for handler in self._peer_discovered_handlers: - await handler(peer_info) + handler(peer_info) peerDiscovery = PeerDiscovery() -async def peerDiscoveryHandler(peerInfo: PeerInfo) -> None: - await trio.sleep(5) # Simulate some processing delay - # print("Discovered peer is", peerInfo.peer_id) +# async def peerDiscoveryHandler(peerInfo: PeerInfo) -> None: +# await trio.sleep(5) # Simulate some processing delay +# # print("Discovered peer is", peerInfo.peer_id) -peerDiscovery.register_peer_discovered_handler(peerDiscoveryHandler) +# peerDiscovery.register_peer_discovered_handler(peerDiscoveryHandler) diff --git a/libp2p/discovery/mdns/listener.py b/libp2p/discovery/mdns/listener.py index d8d44f38..2556f9d0 100644 --- a/libp2p/discovery/mdns/listener.py +++ b/libp2p/discovery/mdns/listener.py @@ -7,9 +7,15 @@ from zeroconf import ( Zeroconf, ) -from libp2p.abc import IPeerStore, Multiaddr +from libp2p.abc import ( + IPeerStore, + Multiaddr +) from libp2p.peer.id import ID from libp2p.peer.peerinfo import PeerInfo +from libp2p.discovery.events.peerDiscovery import ( + peerDiscovery +) class PeerListener(ServiceListener): @@ -39,6 +45,7 @@ class PeerListener(ServiceListener): if peer_info: self.discovered_services[name] = peer_info.peer_id self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10) + peerDiscovery.emit_peer_discovered(peer_info) print("Discovered Peer:", peer_info.peer_id) def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None: From 387f4879d16b6bb8f489c9bd7da63aa0901e21d3 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Fri, 20 Jun 2025 16:28:27 +0530 Subject: [PATCH 09/33] fix lint --- examples/discovery/mDNS/mDNS.py | 11 ++++------- libp2p/discovery/events/peerDiscovery.py | 7 ++----- libp2p/discovery/mdns/listener.py | 9 ++------- 3 files changed, 8 insertions(+), 19 deletions(-) diff --git a/examples/discovery/mDNS/mDNS.py b/examples/discovery/mDNS/mDNS.py index 82dc0212..279e2ec2 100644 --- a/examples/discovery/mDNS/mDNS.py +++ b/examples/discovery/mDNS/mDNS.py @@ -6,18 +6,15 @@ import trio from libp2p import ( new_host, ) +from libp2p.abc import PeerInfo from libp2p.crypto.secp256k1 import ( create_new_key_pair, ) -from libp2p.discovery.events.peerDiscovery import ( - peerDiscovery -) -from libp2p.abc import ( - PeerInfo -) +from libp2p.discovery.events.peerDiscovery import peerDiscovery + def customFunctoion(peerinfo: PeerInfo): - print("Printing peer info from demo file",repr(peerinfo)) + print("Printing peer info from demo file", repr(peerinfo)) async def main(): diff --git a/libp2p/discovery/events/peerDiscovery.py b/libp2p/discovery/events/peerDiscovery.py index 6eefe381..7cbbba7c 100644 --- a/libp2p/discovery/events/peerDiscovery.py +++ b/libp2p/discovery/events/peerDiscovery.py @@ -1,10 +1,7 @@ from collections.abc import ( - Awaitable, Callable, ) -import trio - from libp2p.abc import ( PeerInfo, ) @@ -14,10 +11,10 @@ TTL: int = 60 * 60 # Time-to-live for discovered peers in seconds class PeerDiscovery: def __init__(self) -> None: - self._peer_discovered_handlers: list[Callable[[PeerInfo], None]] = [] + self._peer_discovered_handlers: list[Callable[[PeerInfo], None]] = [] def register_peer_discovered_handler( - self, handler: Callable[[PeerInfo], Awaitable[None]] + self, handler: Callable[[PeerInfo], None] ) -> None: self._peer_discovered_handlers.append(handler) diff --git a/libp2p/discovery/mdns/listener.py b/libp2p/discovery/mdns/listener.py index 2556f9d0..b5829123 100644 --- a/libp2p/discovery/mdns/listener.py +++ b/libp2p/discovery/mdns/listener.py @@ -7,15 +7,10 @@ from zeroconf import ( Zeroconf, ) -from libp2p.abc import ( - IPeerStore, - Multiaddr -) +from libp2p.abc import IPeerStore, Multiaddr +from libp2p.discovery.events.peerDiscovery import peerDiscovery from libp2p.peer.id import ID from libp2p.peer.peerinfo import PeerInfo -from libp2p.discovery.events.peerDiscovery import ( - peerDiscovery -) class PeerListener(ServiceListener): From 89ed86d903596484a3e7494dd4b558ec768edc90 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Fri, 20 Jun 2025 20:17:01 +0530 Subject: [PATCH 10/33] feat: add logging for mDNS peer discovery and update dependencies --- examples/discovery/mDNS/mDNS.py | 21 +++++++++++++++------ libp2p/discovery/events/peerDiscovery.py | 8 -------- libp2p/discovery/mdns/broadcaster.py | 3 +++ libp2p/discovery/mdns/listener.py | 11 +++++++---- libp2p/discovery/mdns/mdns.py | 8 +++++++- pyproject.toml | 1 + 6 files changed, 33 insertions(+), 19 deletions(-) diff --git a/examples/discovery/mDNS/mDNS.py b/examples/discovery/mDNS/mDNS.py index 279e2ec2..28d2c9ea 100644 --- a/examples/discovery/mDNS/mDNS.py +++ b/examples/discovery/mDNS/mDNS.py @@ -1,3 +1,4 @@ +import logging import secrets import multiaddr @@ -12,9 +13,17 @@ from libp2p.crypto.secp256k1 import ( ) from libp2p.discovery.events.peerDiscovery import peerDiscovery +logger = logging.getLogger("libp2p.example.discovery.mdns") +logger.setLevel(logging.INFO) +handler = logging.StreamHandler() +handler.setFormatter( + logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") +) +logger.addHandler(handler) -def customFunctoion(peerinfo: PeerInfo): - print("Printing peer info from demo file", repr(peerinfo)) + +def onPeerDiscovery(peerinfo: PeerInfo): + logger.info(f"Discovered: {peerinfo.peer_id}") async def main(): @@ -24,17 +33,17 @@ async def main(): # Listen on a random TCP port listen_addr = multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/0") - peerDiscovery.register_peer_discovered_handler(customFunctoion) + peerDiscovery.register_peer_discovered_handler(onPeerDiscovery) # Enable mDNS discovery + logger.info("Starting peer Discovery") host = new_host(key_pair=key_pair, enable_mDNS=True) - + await trio.sleep(5) async with host.run(listen_addrs=[listen_addr]): - # Print discovered peers via mDNS try: while True: await trio.sleep(100) except KeyboardInterrupt: - print("Exiting...") + logger.info("Exiting...") if __name__ == "__main__": diff --git a/libp2p/discovery/events/peerDiscovery.py b/libp2p/discovery/events/peerDiscovery.py index 7cbbba7c..6b2d30d0 100644 --- a/libp2p/discovery/events/peerDiscovery.py +++ b/libp2p/discovery/events/peerDiscovery.py @@ -24,11 +24,3 @@ class PeerDiscovery: peerDiscovery = PeerDiscovery() - - -# async def peerDiscoveryHandler(peerInfo: PeerInfo) -> None: -# await trio.sleep(5) # Simulate some processing delay -# # print("Discovered peer is", peerInfo.peer_id) - - -# peerDiscovery.register_peer_discovered_handler(peerDiscoveryHandler) diff --git a/libp2p/discovery/mdns/broadcaster.py b/libp2p/discovery/mdns/broadcaster.py index 8e14ed96..845704a6 100644 --- a/libp2p/discovery/mdns/broadcaster.py +++ b/libp2p/discovery/mdns/broadcaster.py @@ -1,3 +1,4 @@ +import logging import socket from zeroconf import ( @@ -5,6 +6,8 @@ from zeroconf import ( Zeroconf, ) +logger = logging.getLogger("libp2p.discovery.mdns.broadcaster") + class PeerBroadcaster: """ diff --git a/libp2p/discovery/mdns/listener.py b/libp2p/discovery/mdns/listener.py index b5829123..027c47c4 100644 --- a/libp2p/discovery/mdns/listener.py +++ b/libp2p/discovery/mdns/listener.py @@ -1,3 +1,4 @@ +import logging import socket from zeroconf import ( @@ -12,6 +13,8 @@ from libp2p.discovery.events.peerDiscovery import peerDiscovery from libp2p.peer.id import ID from libp2p.peer.peerinfo import PeerInfo +logger = logging.getLogger("libp2p.discovery.mdns.listner") + class PeerListener(ServiceListener): """mDNS listener — now a true ServiceListener subclass.""" @@ -41,12 +44,12 @@ class PeerListener(ServiceListener): self.discovered_services[name] = peer_info.peer_id self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10) peerDiscovery.emit_peer_discovered(peer_info) - print("Discovered Peer:", peer_info.peer_id) + logger.debug("Discovered Peer:", peer_info.peer_id) def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None: peer_id = self.discovered_services.pop(name) self.peerstore.clear_addrs(peer_id) - print(f"Removed Peer: {peer_id}") + logger.debug(f"Removed Peer: {peer_id}") def update_service(self, zc: Zeroconf, type_: str, name: str) -> None: info = zc.get_service_info(type_, name, timeout=5000) @@ -56,7 +59,7 @@ class PeerListener(ServiceListener): if peer_info: self.peerstore.clear_addrs(peer_info.peer_id) self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10) - print("Updated Peer", peer_info.peer_id) + logger.debug("Updated Peer", peer_info.peer_id) def _process_discovered_service( self, zeroconf: Zeroconf, type_: str, name: str @@ -67,7 +70,7 @@ class PeerListener(ServiceListener): peer_info = self._extract_peer_info(info) if peer_info: self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10) - print("Discovered:", peer_info.peer_id) + logger.debug("Discovered:", peer_info.peer_id) def _extract_peer_info(self, info: ServiceInfo) -> PeerInfo | None: try: diff --git a/libp2p/discovery/mdns/mdns.py b/libp2p/discovery/mdns/mdns.py index a76cb39a..e2a89463 100644 --- a/libp2p/discovery/mdns/mdns.py +++ b/libp2p/discovery/mdns/mdns.py @@ -4,6 +4,8 @@ Conforms to https://github.com/libp2p/specs/blob/master/discovery/mdns.md Uses zeroconf for mDNS broadcast/listen. Async operations use trio. """ +import logging + from zeroconf import ( Zeroconf, ) @@ -22,6 +24,8 @@ from .utils import ( stringGen, ) +logger = logging.getLogger("libp2p.discovery.mdns") + SERVICE_TYPE = "_p2p._udp.local." MCAST_PORT = 5353 MCAST_ADDR = "224.0.0.251" @@ -56,7 +60,9 @@ class MDNSDiscovery: def start(self) -> None: """Register this peer and start listening for others.""" - print(f"Starting mDNS discovery for peer {self.peer_id} on port {self.port}") + logger.debug( + f"Starting mDNS discovery for peer {self.peer_id} on port {self.port}" + ) self.broadcaster.register() # Listener is started in constructor diff --git a/pyproject.toml b/pyproject.toml index 91803ada..7ca235fb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ dependencies = [ "trio-typing>=0.0.4", "trio>=0.26.0", "fastecdsa==2.3.2; sys_platform != 'win32'", + "zeroconf (>=0.147.0,<0.148.0)", ] classifiers = [ "Development Status :: 4 - Beta", From 3b53120092fde56d9026a0cdc7d6dbf94278f9e0 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Fri, 20 Jun 2025 20:29:24 +0530 Subject: [PATCH 11/33] fixed some errors during rebase --- libp2p/__init__.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/libp2p/__init__.py b/libp2p/__init__.py index a066207c..6d03ce69 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -239,13 +239,13 @@ def new_swarm( def new_host( - key_pair: Optional[KeyPair] = None, - muxer_opt: Optional[TMuxerOptions] = None, - sec_opt: Optional[TSecurityOptions] = None, - peerstore_opt: Optional[IPeerStore] = None, - disc_opt: Optional[IPeerRouting] = None, - muxer_preference: Optional[Literal["YAMUX", "MPLEX"]] = None, - listen_addrs: Sequence[multiaddr.Multiaddr] = None, + key_pair: KeyPair | None = None, + muxer_opt: TMuxerOptions | None = None, + sec_opt: TSecurityOptions | None = None, + peerstore_opt: IPeerStore | None = None, + disc_opt: IPeerRouting | None = None, + muxer_preference: Literal["YAMUX", "MPLEX"] | None = None, + listen_addrs: Sequence[multiaddr.Multiaddr] | None = None, enable_mDNS: bool = False, ) -> IHost: """ @@ -275,7 +275,6 @@ def new_host( if disc_opt is not None: return RoutedHost(swarm, disc_opt) - else: - return BasicHost(swarm) + return BasicHost(swarm) __version__ = __version("libp2p") From 67bcad16742b627556e00941dc7ee664ddd9aca3 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Sat, 21 Jun 2025 00:18:41 +0530 Subject: [PATCH 12/33] Refactored mDNS example and added script for example --- docs/examples.mDNS.rst | 64 +++++++++++++++++++++++++++++++ docs/examples.rst | 1 + examples/discovery/__init__.py | 0 examples/discovery/mDNS/mDNS.py | 50 ------------------------ examples/mDNS/mDNS.py | 67 +++++++++++++++++++++++++++++++++ pyproject.toml | 1 + 6 files changed, 133 insertions(+), 50 deletions(-) create mode 100644 docs/examples.mDNS.rst delete mode 100644 examples/discovery/__init__.py delete mode 100644 examples/discovery/mDNS/mDNS.py create mode 100644 examples/mDNS/mDNS.py diff --git a/docs/examples.mDNS.rst b/docs/examples.mDNS.rst new file mode 100644 index 00000000..26dc8b17 --- /dev/null +++ b/docs/examples.mDNS.rst @@ -0,0 +1,64 @@ +mDNS Peer Discovery Example +========================== + +This example demonstrates how to use mDNS (Multicast DNS) for peer discovery in py-libp2p. + +Prerequisites +------------- + +First, ensure you have py-libp2p installed and your environment is activated: + +.. code-block:: console + + $ python -m pip install libp2p + +Running the Example +------------------- + +The mDNS demo script allows you to discover peers on your local network using mDNS. To start a peer, run: + +.. code-block:: console + + $ mdns-demo + +You should see output similar to: + +.. code-block:: console + + Run this from another console to start another peer on a different port: + + python mdns-demo -p + + Waiting for mDNS peer discovery events... + + 2025-06-20 23:28:12,052 - libp2p.example.discovery.mdns - INFO - Starting peer Discovery + +To discover peers, open another terminal and run the same command with a different port: + +.. code-block:: console + + $ python mdns-demo -p 9001 + +You should see output indicating that a new peer has been discovered: + +.. code-block:: console + + Run this from the same folder in another console to start another peer on a different port: + + python mdns-demo -p + + Waiting for mDNS peer discovery events... + + 2025-06-20 23:43:43,786 - libp2p.example.discovery.mdns - INFO - Starting peer Discovery + 2025-06-20 23:43:43,790 - libp2p.example.discovery.mdns - INFO - Discovered: 16Uiu2HAmGxy5NdQEjZWtrYUMrzdp3Syvg7MB2E5Lx8weA9DanYxj + +When a new peer is discovered, its peer ID will be printed in the console output. + +How it Works +------------ + +- Each node advertises itself on the local network using mDNS. +- When a new peer is discovered, the handler prints its peer ID. +- This is useful for local peer discovery without requiring a DHT or bootstrap nodes. + +You can modify the script to perform additional actions when peers are discovered, such as opening streams or exchanging messages. diff --git a/docs/examples.rst b/docs/examples.rst index 676216a9..b20d0e63 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -13,3 +13,4 @@ Examples examples.pubsub examples.circuit_relay examples.kademlia + examples.mDNS diff --git a/examples/discovery/__init__.py b/examples/discovery/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/examples/discovery/mDNS/mDNS.py b/examples/discovery/mDNS/mDNS.py deleted file mode 100644 index 28d2c9ea..00000000 --- a/examples/discovery/mDNS/mDNS.py +++ /dev/null @@ -1,50 +0,0 @@ -import logging -import secrets - -import multiaddr -import trio - -from libp2p import ( - new_host, -) -from libp2p.abc import PeerInfo -from libp2p.crypto.secp256k1 import ( - create_new_key_pair, -) -from libp2p.discovery.events.peerDiscovery import peerDiscovery - -logger = logging.getLogger("libp2p.example.discovery.mdns") -logger.setLevel(logging.INFO) -handler = logging.StreamHandler() -handler.setFormatter( - logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") -) -logger.addHandler(handler) - - -def onPeerDiscovery(peerinfo: PeerInfo): - logger.info(f"Discovered: {peerinfo.peer_id}") - - -async def main(): - # Generate a key pair for the host - secret = secrets.token_bytes(32) - key_pair = create_new_key_pair(secret) - - # Listen on a random TCP port - listen_addr = multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/0") - peerDiscovery.register_peer_discovered_handler(onPeerDiscovery) - # Enable mDNS discovery - logger.info("Starting peer Discovery") - host = new_host(key_pair=key_pair, enable_mDNS=True) - await trio.sleep(5) - async with host.run(listen_addrs=[listen_addr]): - try: - while True: - await trio.sleep(100) - except KeyboardInterrupt: - logger.info("Exiting...") - - -if __name__ == "__main__": - trio.run(main) diff --git a/examples/mDNS/mDNS.py b/examples/mDNS/mDNS.py new file mode 100644 index 00000000..ad5c8fd3 --- /dev/null +++ b/examples/mDNS/mDNS.py @@ -0,0 +1,67 @@ +import argparse +import logging +import secrets + +import multiaddr +import trio + +from libp2p import ( + new_host, +) +from libp2p.abc import PeerInfo +from libp2p.crypto.secp256k1 import ( + create_new_key_pair, +) +from libp2p.discovery.events.peerDiscovery import peerDiscovery + +logger = logging.getLogger("libp2p.example.discovery.mdns") +logger.setLevel(logging.INFO) +handler = logging.StreamHandler() +handler.setFormatter( + logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") +) +logger.addHandler(handler) + + +def onPeerDiscovery(peerinfo: PeerInfo): + logger.info(f"Discovered: {peerinfo.peer_id}") + + +async def run(port: int) -> None: + secret = secrets.token_bytes(32) + key_pair = create_new_key_pair(secret) + listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") + + peerDiscovery.register_peer_discovered_handler(onPeerDiscovery) + + print( + "Run this from the same folder in another console to " + "start another peer on a different port:\n\n" + "python mdns-demo -p \n" + ) + print("Waiting for mDNS peer discovery events...\n") + + logger.info("Starting peer Discovery") + host = new_host(key_pair=key_pair, enable_mDNS=True) + async with host.run(listen_addrs=[listen_addr]): + await trio.sleep_forever() + + +def main() -> None: + description = """ + This program demonstrates mDNS peer discovery using libp2p. + To use it, run 'mdns-demo -p ', where is the port number. + Start multiple peers on different ports to see discovery in action. + """ + parser = argparse.ArgumentParser(description=description) + parser.add_argument("-p", "--port", default=0, type=int, help="source port number") + + args = parser.parse_args() + try: + trio.run(run, args.port) + except KeyboardInterrupt: + logger.info("Exiting...") + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 7ca235fb..cf000156 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -55,6 +55,7 @@ identify-demo = "examples.identify.identify:main" identify-push-demo = "examples.identify_push.identify_push_demo:run_main" identify-push-listener-dialer-demo = "examples.identify_push.identify_push_listener_dialer:main" pubsub-demo = "examples.pubsub.pubsub:main" +mdns-demo = "examples.mDNS.mDNS:main" [project.optional-dependencies] dev = [ From 8f0762f95cecc6f4c4dc27fc4ee5363a2e411abf Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Sat, 21 Jun 2025 00:24:22 +0530 Subject: [PATCH 13/33] fix: remove unnecessary blank lines in mDNS example documentation --- docs/examples.mDNS.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/examples.mDNS.rst b/docs/examples.mDNS.rst index 26dc8b17..40639844 100644 --- a/docs/examples.mDNS.rst +++ b/docs/examples.mDNS.rst @@ -26,11 +26,11 @@ You should see output similar to: .. code-block:: console Run this from another console to start another peer on a different port: - + python mdns-demo -p - + Waiting for mDNS peer discovery events... - + 2025-06-20 23:28:12,052 - libp2p.example.discovery.mdns - INFO - Starting peer Discovery To discover peers, open another terminal and run the same command with a different port: From 555e389109535e9bae04608e35857d29d35fd283 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Sat, 21 Jun 2025 00:26:36 +0530 Subject: [PATCH 14/33] fix: correct heading formatting in mDNS example documentation --- docs/examples.mDNS.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/examples.mDNS.rst b/docs/examples.mDNS.rst index 40639844..d6ec6aaf 100644 --- a/docs/examples.mDNS.rst +++ b/docs/examples.mDNS.rst @@ -1,5 +1,5 @@ mDNS Peer Discovery Example -========================== +=========================== This example demonstrates how to use mDNS (Multicast DNS) for peer discovery in py-libp2p. From 77a9788a69bcf44b83287cb593c8bdb627a47250 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Sat, 21 Jun 2025 01:10:45 +0530 Subject: [PATCH 15/33] feat: add initial documentation for libp2p.discovery package --- docs/libp2p.discovery.rst | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 docs/libp2p.discovery.rst diff --git a/docs/libp2p.discovery.rst b/docs/libp2p.discovery.rst new file mode 100644 index 00000000..ec6e041d --- /dev/null +++ b/docs/libp2p.discovery.rst @@ -0,0 +1,22 @@ +libp2p.discovery package +======================== + +Subpackages +----------- + +.. toctree:: + :maxdepth: 4 + + libp2p.discovery.events + libp2p.discovery.mdns + +Submodules +---------- + +Module contents +--------------- + +.. automodule:: libp2p.discovery + :members: + :undoc-members: + :show-inheritance: \ No newline at end of file From 7135e6cd4dc6c0cf09d80a5897b133046d981eef Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Sat, 21 Jun 2025 01:11:27 +0530 Subject: [PATCH 16/33] fix: ensure newline at end of file in libp2p.discovery documentation --- docs/libp2p.discovery.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/libp2p.discovery.rst b/docs/libp2p.discovery.rst index ec6e041d..cb8859a4 100644 --- a/docs/libp2p.discovery.rst +++ b/docs/libp2p.discovery.rst @@ -19,4 +19,4 @@ Module contents .. automodule:: libp2p.discovery :members: :undoc-members: - :show-inheritance: \ No newline at end of file + :show-inheritance: From e018af09ae428a02f8a77bf9ccbb9e29e8c01f9f Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Sat, 21 Jun 2025 01:30:16 +0530 Subject: [PATCH 17/33] feat: add documentation for libp2p.discovery.events and libp2p.discovery.mdns packages --- docs/libp2p.discovery.events.rst | 21 +++++++++++++++ docs/libp2p.discovery.mdns.rst | 45 ++++++++++++++++++++++++++++++++ docs/libp2p.rst | 1 + 3 files changed, 67 insertions(+) create mode 100644 docs/libp2p.discovery.events.rst create mode 100644 docs/libp2p.discovery.mdns.rst diff --git a/docs/libp2p.discovery.events.rst b/docs/libp2p.discovery.events.rst new file mode 100644 index 00000000..bfac0eb5 --- /dev/null +++ b/docs/libp2p.discovery.events.rst @@ -0,0 +1,21 @@ +libp2p.discovery.events package +=============================== + +Submodules +---------- + +libp2p.discovery.events.peerDiscovery module +-------------------------------------------- + +.. automodule:: libp2p.discovery.events.peerDiscovery + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: libp2p.discovery.events + :members: + :undoc-members: + :show-inheritance: \ No newline at end of file diff --git a/docs/libp2p.discovery.mdns.rst b/docs/libp2p.discovery.mdns.rst new file mode 100644 index 00000000..474c6725 --- /dev/null +++ b/docs/libp2p.discovery.mdns.rst @@ -0,0 +1,45 @@ +libp2p.discovery.mdns package +============================= + +Submodules +---------- + +libp2p.discovery.mdns.broadcaster module +---------------------------------------- + +.. automodule:: libp2p.discovery.mdns.broadcaster + :members: + :undoc-members: + :show-inheritance: + +libp2p.discovery.mdns.listener module +------------------------------------- + +.. automodule:: libp2p.discovery.mdns.listener + :members: + :undoc-members: + :show-inheritance: + +libp2p.discovery.mdns.mdns module +--------------------------------- + +.. automodule:: libp2p.discovery.mdns.mdns + :members: + :undoc-members: + :show-inheritance: + +libp2p.discovery.mdns.utils module +---------------------------------- + +.. automodule:: libp2p.discovery.mdns.utils + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: libp2p.discovery.mdns + :members: + :undoc-members: + :show-inheritance: \ No newline at end of file diff --git a/docs/libp2p.rst b/docs/libp2p.rst index 749b4c11..a47e011a 100644 --- a/docs/libp2p.rst +++ b/docs/libp2p.rst @@ -8,6 +8,7 @@ Subpackages :maxdepth: 4 libp2p.crypto + libp2p.discovery libp2p.host libp2p.identity libp2p.io From 35248f8167564b06d08751ee1ba10a65b18f318c Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Sat, 21 Jun 2025 01:33:06 +0530 Subject: [PATCH 18/33] fix: ensure newline at end of file in libp2p.discovery.events and libp2p.discovery.mdns documentation --- docs/libp2p.discovery.events.rst | 2 +- docs/libp2p.discovery.mdns.rst | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/libp2p.discovery.events.rst b/docs/libp2p.discovery.events.rst index bfac0eb5..7a0edba9 100644 --- a/docs/libp2p.discovery.events.rst +++ b/docs/libp2p.discovery.events.rst @@ -18,4 +18,4 @@ Module contents .. automodule:: libp2p.discovery.events :members: :undoc-members: - :show-inheritance: \ No newline at end of file + :show-inheritance: diff --git a/docs/libp2p.discovery.mdns.rst b/docs/libp2p.discovery.mdns.rst index 474c6725..af842919 100644 --- a/docs/libp2p.discovery.mdns.rst +++ b/docs/libp2p.discovery.mdns.rst @@ -42,4 +42,4 @@ Module contents .. automodule:: libp2p.discovery.mdns :members: :undoc-members: - :show-inheritance: \ No newline at end of file + :show-inheritance: From 293087bd068ad334883b2705750feb3996565db9 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Sat, 21 Jun 2025 15:09:01 +0530 Subject: [PATCH 19/33] feat: added newsfragment for mDNS --- newsfragments/649.feature.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 newsfragments/649.feature.rst diff --git a/newsfragments/649.feature.rst b/newsfragments/649.feature.rst new file mode 100644 index 00000000..843614c2 --- /dev/null +++ b/newsfragments/649.feature.rst @@ -0,0 +1 @@ +Added support for ``Multicast DNS`` in py-libp2p \ No newline at end of file From 31b694aa29d3ba636a07924dcc0ede88464de555 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Sat, 21 Jun 2025 15:14:45 +0530 Subject: [PATCH 20/33] fix: ensure newline at end of file in newsfragments/649.feature.rst --- newsfragments/649.feature.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/newsfragments/649.feature.rst b/newsfragments/649.feature.rst index 843614c2..82ba5cd7 100644 --- a/newsfragments/649.feature.rst +++ b/newsfragments/649.feature.rst @@ -1 +1 @@ -Added support for ``Multicast DNS`` in py-libp2p \ No newline at end of file +Added support for ``Multicast DNS`` in py-libp2p From b258ff3ea20bd62c079b855e937869923fba3ffc Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Mon, 23 Jun 2025 00:04:09 +0530 Subject: [PATCH 21/33] fix: correct logger name typo and update protocol in peer info extraction --- libp2p/discovery/mdns/listener.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libp2p/discovery/mdns/listener.py b/libp2p/discovery/mdns/listener.py index 027c47c4..5de7078c 100644 --- a/libp2p/discovery/mdns/listener.py +++ b/libp2p/discovery/mdns/listener.py @@ -13,7 +13,7 @@ from libp2p.discovery.events.peerDiscovery import peerDiscovery from libp2p.peer.id import ID from libp2p.peer.peerinfo import PeerInfo -logger = logging.getLogger("libp2p.discovery.mdns.listner") +logger = logging.getLogger("libp2p.discovery.mdns.listener") class PeerListener(ServiceListener): @@ -75,7 +75,7 @@ class PeerListener(ServiceListener): def _extract_peer_info(self, info: ServiceInfo) -> PeerInfo | None: try: addrs = [ - Multiaddr(f"/ip4/{socket.inet_ntoa(addr)}/udp/{info.port}") + Multiaddr(f"/ip4/{socket.inet_ntoa(addr)}/tcp/{info.port}") for addr in info.addresses ] pid_bytes = info.properties.get(b"id") From dcc8bbb619b285eac690c7d6fd4339fa3d48c656 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Mon, 23 Jun 2025 01:17:42 +0530 Subject: [PATCH 22/33] feat: add unit and integration tests for mDNS. --- tests/discovery/__init__.py | 0 tests/discovery/mdns/__init__.py | 0 tests/discovery/mdns/test_broadcaster.py | 90 ++++++++++++++ tests/discovery/mdns/test_integration.py | 128 +++++++++++++++++++ tests/discovery/mdns/test_listener.py | 111 +++++++++++++++++ tests/discovery/mdns/test_mdns.py | 152 +++++++++++++++++++++++ tests/discovery/mdns/test_utils.py | 39 ++++++ 7 files changed, 520 insertions(+) create mode 100644 tests/discovery/__init__.py create mode 100644 tests/discovery/mdns/__init__.py create mode 100644 tests/discovery/mdns/test_broadcaster.py create mode 100644 tests/discovery/mdns/test_integration.py create mode 100644 tests/discovery/mdns/test_listener.py create mode 100644 tests/discovery/mdns/test_mdns.py create mode 100644 tests/discovery/mdns/test_utils.py diff --git a/tests/discovery/__init__.py b/tests/discovery/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/discovery/mdns/__init__.py b/tests/discovery/mdns/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/discovery/mdns/test_broadcaster.py b/tests/discovery/mdns/test_broadcaster.py new file mode 100644 index 00000000..d4722ba7 --- /dev/null +++ b/tests/discovery/mdns/test_broadcaster.py @@ -0,0 +1,90 @@ +""" +Unit tests for mDNS broadcaster component. +""" +import socket +import pytest +from zeroconf import ServiceInfo, Zeroconf + +from libp2p.discovery.mdns.broadcaster import PeerBroadcaster +from libp2p.peer.id import ID + + +class TestPeerBroadcaster: + """Basic unit tests for PeerBroadcaster.""" + + def test_broadcaster_initialization(self): + """Test that broadcaster initializes correctly.""" + zeroconf = Zeroconf() + service_type = "_p2p._udp.local." + service_name = "test-peer._p2p._udp.local." + peer_id = "QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN" # String, not ID object + port = 8000 + + broadcaster = PeerBroadcaster( + zeroconf=zeroconf, + service_type=service_type, + service_name=service_name, + peer_id=peer_id, + port=port + ) + + assert broadcaster.zeroconf == zeroconf + assert broadcaster.service_type == service_type + assert broadcaster.service_name == service_name + assert broadcaster.peer_id == peer_id + assert broadcaster.port == port + + # Clean up + zeroconf.close() + + def test_broadcaster_service_creation(self): + """Test that broadcaster creates valid service info.""" + zeroconf = Zeroconf() + service_type = "_p2p._udp.local." + service_name = "test-peer2._p2p._udp.local." + peer_id_obj = ID.from_base58("QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN") + peer_id = str(peer_id_obj) # Convert to string + port = 8000 + + broadcaster = PeerBroadcaster( + zeroconf=zeroconf, + service_type=service_type, + service_name=service_name, + peer_id=peer_id, + port=port + ) + + # Verify service was created and registered + service_info = broadcaster.service_info + assert service_info is not None + assert service_info.type == service_type + assert service_info.name == service_name + assert service_info.port == port + assert b"id" in service_info.properties + assert service_info.properties[b"id"] == peer_id.encode() + + # Clean up + zeroconf.close() + + def test_broadcaster_start_stop(self): + """Test that broadcaster can start and stop correctly.""" + zeroconf = Zeroconf() + service_type = "_p2p._udp.local." + service_name = "test-start-stop._p2p._udp.local." + peer_id_obj = ID.from_base58("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N") + peer_id = str(peer_id_obj) # Convert to string + port = 8001 + + broadcaster = PeerBroadcaster( + zeroconf=zeroconf, + service_type=service_type, + service_name=service_name, + peer_id=peer_id, + port=port + ) + + # Service should be registered + assert broadcaster.service_info is not None + + # Clean up + zeroconf.close() diff --git a/tests/discovery/mdns/test_integration.py b/tests/discovery/mdns/test_integration.py new file mode 100644 index 00000000..c7291aa7 --- /dev/null +++ b/tests/discovery/mdns/test_integration.py @@ -0,0 +1,128 @@ +""" +Basic integration tests for mDNS components. +""" +import socket +import pytest +from zeroconf import ServiceInfo, Zeroconf + +from libp2p.discovery.mdns.broadcaster import PeerBroadcaster +from libp2p.discovery.mdns.listener import PeerListener +from libp2p.peer.id import ID +from libp2p.peer.peerstore import PeerStore + + +class TestMDNSIntegration: + """Basic integration tests for mDNS components.""" + + def test_broadcaster_listener_basic_integration(self): + """Test basic broadcaster and listener integration with actual service discovery.""" + import time + + # Create two separate Zeroconf instances + zeroconf1 = Zeroconf() + zeroconf2 = Zeroconf() + + try: + # Set up broadcaster + broadcaster_peer_id_obj = ID.from_base58("QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN") + broadcaster_peer_id = str(broadcaster_peer_id_obj) # Convert to string + broadcaster = PeerBroadcaster( + zeroconf=zeroconf1, + service_type="_p2p._udp.local.", + service_name="broadcaster-peer._p2p._udp.local.", + peer_id=broadcaster_peer_id, + port=8000 + ) + + # Set up listener + peerstore = PeerStore() + listener = PeerListener( + peerstore=peerstore, + zeroconf=zeroconf2, + service_type="_p2p._udp.local.", + service_name="listener-peer._p2p._udp.local.", + ) + + # Verify initial state + assert broadcaster.service_info is not None + assert listener.discovered_services == {} + assert len(peerstore.peer_ids()) == 0 + + # Broadcaster registers its service + broadcaster.register() + + # Simulate discovery - listener discovers the broadcaster's service + listener.add_service( + zeroconf1, # Use broadcaster's zeroconf to find the service + "_p2p._udp.local.", + "broadcaster-peer._p2p._udp.local." + ) + + # Verify that the listener discovered the broadcaster + assert len(listener.discovered_services) > 0 + assert "broadcaster-peer._p2p._udp.local." in listener.discovered_services + + # Verify the discovered peer ID matches what was broadcast + discovered_peer_id = listener.discovered_services["broadcaster-peer._p2p._udp.local."] + assert str(discovered_peer_id) == broadcaster_peer_id + + # Verify the peer was added to the peerstore + assert len(peerstore.peer_ids()) > 0 + assert discovered_peer_id in peerstore.peer_ids() + + # Verify the addresses were correctly stored + addrs = peerstore.addrs(discovered_peer_id) + assert len(addrs) > 0 + # Should be TCP since we always use TCP protocol + assert "/tcp/8000" in str(addrs[0]) + + print(f"✅ Integration test successful!") + print(f" Broadcaster peer ID: {broadcaster_peer_id}") + print(f" Discovered peer ID: {discovered_peer_id}") + print(f" Discovered addresses: {[str(addr) for addr in addrs]}") + + # Clean up + broadcaster.unregister() + + finally: + zeroconf1.close() + zeroconf2.close() + + def test_service_info_extraction(self): + """Test service info extraction functionality.""" + peerstore = PeerStore() + zeroconf = Zeroconf() + + try: + listener = PeerListener( + peerstore=peerstore, + zeroconf=zeroconf, + service_type="_p2p._udp.local.", + service_name="test-listener._p2p._udp.local.", + ) + + # Create a test service info + test_peer_id = ID.from_base58("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N") + hostname = socket.gethostname() + + service_info = ServiceInfo( + type_="_p2p._udp.local.", + name="test-service._p2p._udp.local.", + port=8001, + properties={b"id": str(test_peer_id).encode()}, + server=f"{hostname}.local.", + addresses=[socket.inet_aton("192.168.1.100")], + ) + + # Test extraction + peer_info = listener._extract_peer_info(service_info) + + assert peer_info is not None + assert peer_info.peer_id == test_peer_id + assert len(peer_info.addrs) == 1 + assert "/tcp/8001" in str(peer_info.addrs[0]) + + # Clean up + + finally: + zeroconf.close() diff --git a/tests/discovery/mdns/test_listener.py b/tests/discovery/mdns/test_listener.py new file mode 100644 index 00000000..aa4992c0 --- /dev/null +++ b/tests/discovery/mdns/test_listener.py @@ -0,0 +1,111 @@ +""" +Unit tests for mDNS listener component. +""" +import socket +import pytest +from zeroconf import ServiceInfo, Zeroconf + +from libp2p.discovery.mdns.listener import PeerListener +from libp2p.peer.id import ID +from libp2p.peer.peerstore import PeerStore +from libp2p.abc import Multiaddr + + +class TestPeerListener: + """Basic unit tests for PeerListener.""" + + def test_listener_initialization(self): + """Test that listener initializes correctly.""" + peerstore = PeerStore() + zeroconf = Zeroconf() + service_type = "_p2p._udp.local." + service_name = "local-peer._p2p._udp.local." + + listener = PeerListener( + peerstore=peerstore, + zeroconf=zeroconf, + service_type=service_type, + service_name=service_name, + ) + + assert listener.peerstore == peerstore + assert listener.zeroconf == zeroconf + assert listener.service_type == service_type + assert listener.service_name == service_name + assert listener.discovered_services == {} + + # Clean up + listener.stop() + zeroconf.close() + + def test_listener_extract_peer_info_success(self): + """Test successful PeerInfo extraction from ServiceInfo.""" + peerstore = PeerStore() + zeroconf = Zeroconf() + + listener = PeerListener( + peerstore=peerstore, + zeroconf=zeroconf, + service_type="_p2p._udp.local.", + service_name="local._p2p._udp.local.", + ) + + # Create sample service info + sample_peer_id = ID.from_base58("QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN") + hostname = socket.gethostname() + local_ip = "192.168.1.100" + + sample_service_info = ServiceInfo( + type_="_p2p._udp.local.", + name="test-peer._p2p._udp.local.", + port=8000, + properties={b"id": str(sample_peer_id).encode()}, + server=f"{hostname}.local.", + addresses=[socket.inet_aton(local_ip)], + ) + + peer_info = listener._extract_peer_info(sample_service_info) + + assert peer_info is not None + assert isinstance(peer_info.peer_id, ID) + assert len(peer_info.addrs) > 0 + assert all(isinstance(addr, Multiaddr) for addr in peer_info.addrs) + + # Check that protocol is TCP since we always use TCP + assert "/tcp/" in str(peer_info.addrs[0]) + + # Clean up + listener.stop() + zeroconf.close() + + def test_listener_extract_peer_info_invalid_id(self): + """Test PeerInfo extraction fails with invalid peer ID.""" + peerstore = PeerStore() + zeroconf = Zeroconf() + + listener = PeerListener( + peerstore=peerstore, + zeroconf=zeroconf, + service_type="_p2p._udp.local.", + service_name="local._p2p._udp.local.", + ) + + # Create service info with invalid peer ID + hostname = socket.gethostname() + local_ip = "192.168.1.100" + + service_info = ServiceInfo( + type_="_p2p._udp.local.", + name="invalid-peer._p2p._udp.local.", + port=8000, + properties={b"id": b"invalid_peer_id_format"}, + server=f"{hostname}.local.", + addresses=[socket.inet_aton(local_ip)], + ) + + peer_info = listener._extract_peer_info(service_info) + assert peer_info is None + + # Clean up + listener.stop() + zeroconf.close() diff --git a/tests/discovery/mdns/test_mdns.py b/tests/discovery/mdns/test_mdns.py new file mode 100644 index 00000000..502f8011 --- /dev/null +++ b/tests/discovery/mdns/test_mdns.py @@ -0,0 +1,152 @@ +""" +Integration test for mDNS discovery where one host finds another. +""" +import time +import socket +import pytest +from zeroconf import Zeroconf + +from libp2p.discovery.mdns.broadcaster import PeerBroadcaster +from libp2p.discovery.mdns.listener import PeerListener +from libp2p.peer.id import ID +from libp2p.peer.peerstore import PeerStore + + +class TestMDNSDiscovery: + """Integration test for mDNS peer discovery.""" + + def test_one_host_finds_another(self): + """Test that one host can find another host using mDNS.""" + # Create two separate Zeroconf instances to simulate different hosts + host1_zeroconf = Zeroconf() + host2_zeroconf = Zeroconf() + + try: + # Host 1: Set up as broadcaster (the host to be discovered) + host1_peer_id_obj = ID.from_base58("QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN") + host1_peer_id = str(host1_peer_id_obj) # Convert to string + host1_broadcaster = PeerBroadcaster( + zeroconf=host1_zeroconf, + service_type="_p2p._udp.local.", + service_name="host1._p2p._udp.local.", + peer_id=host1_peer_id, + port=8000 + ) + + # Host 2: Set up as listener (the host that discovers others) + host2_peerstore = PeerStore() + host2_listener = PeerListener( + peerstore=host2_peerstore, + zeroconf=host2_zeroconf, + service_type="_p2p._udp.local.", + service_name="host2._p2p._udp.local.", + ) + + # Host 1 registers its service for discovery + host1_broadcaster.register() + + + # Manually trigger discovery by calling add_service + # This simulates what happens when mDNS discovers a service + host2_listener.add_service( + host1_zeroconf, # Use host1's zeroconf so it can find the service + "_p2p._udp.local.", + "host1._p2p._udp.local." + ) + + # Verify that host2 discovered host1 + assert len(host2_listener.discovered_services) > 0 + assert "host1._p2p._udp.local." in host2_listener.discovered_services + + # Verify that host1's peer info was added to host2's peerstore + discovered_peer_id = host2_listener.discovered_services["host1._p2p._udp.local."] + assert str(discovered_peer_id) == host1_peer_id + + # Verify addresses were added to peerstore + try: + addrs = host2_peerstore.addrs(discovered_peer_id) + assert len(addrs) > 0 + # Should be TCP since we always use TCP protocol + assert "/tcp/8000" in str(addrs[0]) + except Exception: + # If no addresses found, the discovery didn't work properly + assert False, "Host1 addresses should be in Host2's peerstore" + + print(f"✅ Host2 successfully discovered Host1!") + print(f" Discovered peer ID: {discovered_peer_id}") + print(f" Discovered addresses: {[str(addr) for addr in addrs]}") + + # Clean up + host1_broadcaster.unregister() + host2_listener.stop() + + finally: + host1_zeroconf.close() + host2_zeroconf.close() + + def test_peer_discovery_with_multiple_addresses(self): + """Test discovery works with peers having multiple IP addresses.""" + host1_zeroconf = Zeroconf() + host2_zeroconf = Zeroconf() + + try: + # Create a peer with multiple addresses + host1_peer_id_obj = ID.from_base58("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N") + host1_peer_id = str(host1_peer_id_obj) # Convert to string + + # Manually create service info with multiple addresses + from zeroconf import ServiceInfo + hostname = socket.gethostname() + + service_info = ServiceInfo( + type_="_p2p._udp.local.", + name="multi-addr-host._p2p._udp.local.", + port=8001, + properties={b"id": host1_peer_id.encode()}, + server=f"{hostname}.local.", + addresses=[ + socket.inet_aton("192.168.1.100"), + socket.inet_aton("10.0.0.50"), + ], + ) + + # Register the service + host1_zeroconf.register_service(service_info) + + # Set up listener + host2_peerstore = PeerStore() + host2_listener = PeerListener( + peerstore=host2_peerstore, + zeroconf=host2_zeroconf, + service_type="_p2p._udp.local.", + service_name="host2._p2p._udp.local.", + ) + + # Trigger discovery + host2_listener.add_service( + host1_zeroconf, + "_p2p._udp.local.", + "multi-addr-host._p2p._udp.local." + ) + + # Verify discovery + assert "multi-addr-host._p2p._udp.local." in host2_listener.discovered_services + discovered_peer_id = host2_listener.discovered_services["multi-addr-host._p2p._udp.local."] + + # Check multiple addresses were discovered + addrs = host2_peerstore.addrs(discovered_peer_id) + assert len(addrs) == 2 + + addr_strings = [str(addr) for addr in addrs] + assert "/ip4/192.168.1.100/tcp/8001" in addr_strings + assert "/ip4/10.0.0.50/tcp/8001" in addr_strings + + print(f"✅ Successfully discovered peer with multiple addresses!") + print(f" Addresses: {addr_strings}") + + # Clean up + host2_listener.stop() + + finally: + host1_zeroconf.close() + host2_zeroconf.close() diff --git a/tests/discovery/mdns/test_utils.py b/tests/discovery/mdns/test_utils.py new file mode 100644 index 00000000..b50fd44c --- /dev/null +++ b/tests/discovery/mdns/test_utils.py @@ -0,0 +1,39 @@ +""" +Basic unit tests for mDNS utils module. +""" +import string +import pytest + +from libp2p.discovery.mdns.utils import stringGen + + +class TestStringGen: + """Basic unit tests for stringGen function.""" + + def test_stringgen_default_length(self): + """Test stringGen with default length (63).""" + result = stringGen() + + assert isinstance(result, str) + assert len(result) == 63 + + # Check that all characters are from the expected charset + charset = string.ascii_lowercase + string.digits + for char in result: + assert char in charset + + def test_stringgen_custom_length(self): + """Test stringGen with custom lengths.""" + # Test various lengths + test_lengths = [1, 5, 10, 20, 50, 100] + + for length in test_lengths: + result = stringGen(length) + + assert isinstance(result, str) + assert len(result) == length + + # Check that all characters are from the expected charset + charset = string.ascii_lowercase + string.digits + for char in result: + assert char in charset From 9adf9aa4994994bc525acd92e93309893dc323c1 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Mon, 23 Jun 2025 01:48:12 +0530 Subject: [PATCH 23/33] refactor: improve test structure in mDNS tests --- tests/discovery/mdns/test_broadcaster.py | 23 ++-- tests/discovery/mdns/test_integration.py | 128 -------------------- tests/discovery/mdns/test_listener.py | 27 +++-- tests/discovery/mdns/test_mdns.py | 141 +++++++++-------------- tests/discovery/mdns/test_utils.py | 14 +-- 5 files changed, 89 insertions(+), 244 deletions(-) delete mode 100644 tests/discovery/mdns/test_integration.py diff --git a/tests/discovery/mdns/test_broadcaster.py b/tests/discovery/mdns/test_broadcaster.py index d4722ba7..cdb2e8c4 100644 --- a/tests/discovery/mdns/test_broadcaster.py +++ b/tests/discovery/mdns/test_broadcaster.py @@ -1,23 +1,24 @@ """ Unit tests for mDNS broadcaster component. """ -import socket -import pytest -from zeroconf import ServiceInfo, Zeroconf + +from zeroconf import Zeroconf from libp2p.discovery.mdns.broadcaster import PeerBroadcaster from libp2p.peer.id import ID class TestPeerBroadcaster: - """Basic unit tests for PeerBroadcaster.""" + """Unit tests for PeerBroadcaster.""" def test_broadcaster_initialization(self): """Test that broadcaster initializes correctly.""" zeroconf = Zeroconf() service_type = "_p2p._udp.local." service_name = "test-peer._p2p._udp.local." - peer_id = "QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN" # String, not ID object + peer_id = ( + "QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN" # String, not ID object + ) port = 8000 broadcaster = PeerBroadcaster( @@ -25,7 +26,7 @@ class TestPeerBroadcaster: service_type=service_type, service_name=service_name, peer_id=peer_id, - port=port + port=port, ) assert broadcaster.zeroconf == zeroconf @@ -33,7 +34,7 @@ class TestPeerBroadcaster: assert broadcaster.service_name == service_name assert broadcaster.peer_id == peer_id assert broadcaster.port == port - + # Clean up zeroconf.close() @@ -51,7 +52,7 @@ class TestPeerBroadcaster: service_type=service_type, service_name=service_name, peer_id=peer_id, - port=port + port=port, ) # Verify service was created and registered @@ -62,7 +63,7 @@ class TestPeerBroadcaster: assert service_info.port == port assert b"id" in service_info.properties assert service_info.properties[b"id"] == peer_id.encode() - + # Clean up zeroconf.close() @@ -80,11 +81,11 @@ class TestPeerBroadcaster: service_type=service_type, service_name=service_name, peer_id=peer_id, - port=port + port=port, ) # Service should be registered assert broadcaster.service_info is not None - + # Clean up zeroconf.close() diff --git a/tests/discovery/mdns/test_integration.py b/tests/discovery/mdns/test_integration.py deleted file mode 100644 index c7291aa7..00000000 --- a/tests/discovery/mdns/test_integration.py +++ /dev/null @@ -1,128 +0,0 @@ -""" -Basic integration tests for mDNS components. -""" -import socket -import pytest -from zeroconf import ServiceInfo, Zeroconf - -from libp2p.discovery.mdns.broadcaster import PeerBroadcaster -from libp2p.discovery.mdns.listener import PeerListener -from libp2p.peer.id import ID -from libp2p.peer.peerstore import PeerStore - - -class TestMDNSIntegration: - """Basic integration tests for mDNS components.""" - - def test_broadcaster_listener_basic_integration(self): - """Test basic broadcaster and listener integration with actual service discovery.""" - import time - - # Create two separate Zeroconf instances - zeroconf1 = Zeroconf() - zeroconf2 = Zeroconf() - - try: - # Set up broadcaster - broadcaster_peer_id_obj = ID.from_base58("QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN") - broadcaster_peer_id = str(broadcaster_peer_id_obj) # Convert to string - broadcaster = PeerBroadcaster( - zeroconf=zeroconf1, - service_type="_p2p._udp.local.", - service_name="broadcaster-peer._p2p._udp.local.", - peer_id=broadcaster_peer_id, - port=8000 - ) - - # Set up listener - peerstore = PeerStore() - listener = PeerListener( - peerstore=peerstore, - zeroconf=zeroconf2, - service_type="_p2p._udp.local.", - service_name="listener-peer._p2p._udp.local.", - ) - - # Verify initial state - assert broadcaster.service_info is not None - assert listener.discovered_services == {} - assert len(peerstore.peer_ids()) == 0 - - # Broadcaster registers its service - broadcaster.register() - - # Simulate discovery - listener discovers the broadcaster's service - listener.add_service( - zeroconf1, # Use broadcaster's zeroconf to find the service - "_p2p._udp.local.", - "broadcaster-peer._p2p._udp.local." - ) - - # Verify that the listener discovered the broadcaster - assert len(listener.discovered_services) > 0 - assert "broadcaster-peer._p2p._udp.local." in listener.discovered_services - - # Verify the discovered peer ID matches what was broadcast - discovered_peer_id = listener.discovered_services["broadcaster-peer._p2p._udp.local."] - assert str(discovered_peer_id) == broadcaster_peer_id - - # Verify the peer was added to the peerstore - assert len(peerstore.peer_ids()) > 0 - assert discovered_peer_id in peerstore.peer_ids() - - # Verify the addresses were correctly stored - addrs = peerstore.addrs(discovered_peer_id) - assert len(addrs) > 0 - # Should be TCP since we always use TCP protocol - assert "/tcp/8000" in str(addrs[0]) - - print(f"✅ Integration test successful!") - print(f" Broadcaster peer ID: {broadcaster_peer_id}") - print(f" Discovered peer ID: {discovered_peer_id}") - print(f" Discovered addresses: {[str(addr) for addr in addrs]}") - - # Clean up - broadcaster.unregister() - - finally: - zeroconf1.close() - zeroconf2.close() - - def test_service_info_extraction(self): - """Test service info extraction functionality.""" - peerstore = PeerStore() - zeroconf = Zeroconf() - - try: - listener = PeerListener( - peerstore=peerstore, - zeroconf=zeroconf, - service_type="_p2p._udp.local.", - service_name="test-listener._p2p._udp.local.", - ) - - # Create a test service info - test_peer_id = ID.from_base58("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N") - hostname = socket.gethostname() - - service_info = ServiceInfo( - type_="_p2p._udp.local.", - name="test-service._p2p._udp.local.", - port=8001, - properties={b"id": str(test_peer_id).encode()}, - server=f"{hostname}.local.", - addresses=[socket.inet_aton("192.168.1.100")], - ) - - # Test extraction - peer_info = listener._extract_peer_info(service_info) - - assert peer_info is not None - assert peer_info.peer_id == test_peer_id - assert len(peer_info.addrs) == 1 - assert "/tcp/8001" in str(peer_info.addrs[0]) - - # Clean up - - finally: - zeroconf.close() diff --git a/tests/discovery/mdns/test_listener.py b/tests/discovery/mdns/test_listener.py index aa4992c0..1995202e 100644 --- a/tests/discovery/mdns/test_listener.py +++ b/tests/discovery/mdns/test_listener.py @@ -1,18 +1,19 @@ """ Unit tests for mDNS listener component. """ + import socket -import pytest + from zeroconf import ServiceInfo, Zeroconf +from libp2p.abc import Multiaddr from libp2p.discovery.mdns.listener import PeerListener from libp2p.peer.id import ID from libp2p.peer.peerstore import PeerStore -from libp2p.abc import Multiaddr class TestPeerListener: - """Basic unit tests for PeerListener.""" + """Unit tests for PeerListener.""" def test_listener_initialization(self): """Test that listener initializes correctly.""" @@ -33,7 +34,7 @@ class TestPeerListener: assert listener.service_type == service_type assert listener.service_name == service_name assert listener.discovered_services == {} - + # Clean up listener.stop() zeroconf.close() @@ -42,7 +43,7 @@ class TestPeerListener: """Test successful PeerInfo extraction from ServiceInfo.""" peerstore = PeerStore() zeroconf = Zeroconf() - + listener = PeerListener( peerstore=peerstore, zeroconf=zeroconf, @@ -51,10 +52,12 @@ class TestPeerListener: ) # Create sample service info - sample_peer_id = ID.from_base58("QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN") + sample_peer_id = ID.from_base58( + "QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN" + ) hostname = socket.gethostname() local_ip = "192.168.1.100" - + sample_service_info = ServiceInfo( type_="_p2p._udp.local.", name="test-peer._p2p._udp.local.", @@ -70,10 +73,10 @@ class TestPeerListener: assert isinstance(peer_info.peer_id, ID) assert len(peer_info.addrs) > 0 assert all(isinstance(addr, Multiaddr) for addr in peer_info.addrs) - + # Check that protocol is TCP since we always use TCP assert "/tcp/" in str(peer_info.addrs[0]) - + # Clean up listener.stop() zeroconf.close() @@ -82,7 +85,7 @@ class TestPeerListener: """Test PeerInfo extraction fails with invalid peer ID.""" peerstore = PeerStore() zeroconf = Zeroconf() - + listener = PeerListener( peerstore=peerstore, zeroconf=zeroconf, @@ -93,7 +96,7 @@ class TestPeerListener: # Create service info with invalid peer ID hostname = socket.gethostname() local_ip = "192.168.1.100" - + service_info = ServiceInfo( type_="_p2p._udp.local.", name="invalid-peer._p2p._udp.local.", @@ -105,7 +108,7 @@ class TestPeerListener: peer_info = listener._extract_peer_info(service_info) assert peer_info is None - + # Clean up listener.stop() zeroconf.close() diff --git a/tests/discovery/mdns/test_mdns.py b/tests/discovery/mdns/test_mdns.py index 502f8011..83d734a7 100644 --- a/tests/discovery/mdns/test_mdns.py +++ b/tests/discovery/mdns/test_mdns.py @@ -1,9 +1,9 @@ """ -Integration test for mDNS discovery where one host finds another. +Comprehensive integration tests for mDNS discovery functionality. """ -import time + import socket -import pytest + from zeroconf import Zeroconf from libp2p.discovery.mdns.broadcaster import PeerBroadcaster @@ -13,26 +13,28 @@ from libp2p.peer.peerstore import PeerStore class TestMDNSDiscovery: - """Integration test for mDNS peer discovery.""" + """Comprehensive integration tests for mDNS peer discovery.""" def test_one_host_finds_another(self): """Test that one host can find another host using mDNS.""" # Create two separate Zeroconf instances to simulate different hosts host1_zeroconf = Zeroconf() host2_zeroconf = Zeroconf() - + try: # Host 1: Set up as broadcaster (the host to be discovered) - host1_peer_id_obj = ID.from_base58("QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN") + host1_peer_id_obj = ID.from_base58( + "QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN" + ) host1_peer_id = str(host1_peer_id_obj) # Convert to string host1_broadcaster = PeerBroadcaster( zeroconf=host1_zeroconf, service_type="_p2p._udp.local.", service_name="host1._p2p._udp.local.", peer_id=host1_peer_id, - port=8000 + port=8000, ) - + # Host 2: Set up as listener (the host that discovers others) host2_peerstore = PeerStore() host2_listener = PeerListener( @@ -41,27 +43,20 @@ class TestMDNSDiscovery: service_type="_p2p._udp.local.", service_name="host2._p2p._udp.local.", ) - + # Host 1 registers its service for discovery host1_broadcaster.register() - - - # Manually trigger discovery by calling add_service - # This simulates what happens when mDNS discovers a service - host2_listener.add_service( - host1_zeroconf, # Use host1's zeroconf so it can find the service - "_p2p._udp.local.", - "host1._p2p._udp.local." - ) - + # Verify that host2 discovered host1 assert len(host2_listener.discovered_services) > 0 assert "host1._p2p._udp.local." in host2_listener.discovered_services - + # Verify that host1's peer info was added to host2's peerstore - discovered_peer_id = host2_listener.discovered_services["host1._p2p._udp.local."] + discovered_peer_id = host2_listener.discovered_services[ + "host1._p2p._udp.local." + ] assert str(discovered_peer_id) == host1_peer_id - + # Verify addresses were added to peerstore try: addrs = host2_peerstore.addrs(discovered_peer_id) @@ -71,82 +66,56 @@ class TestMDNSDiscovery: except Exception: # If no addresses found, the discovery didn't work properly assert False, "Host1 addresses should be in Host2's peerstore" - - print(f"✅ Host2 successfully discovered Host1!") - print(f" Discovered peer ID: {discovered_peer_id}") - print(f" Discovered addresses: {[str(addr) for addr in addrs]}") - + # Clean up host1_broadcaster.unregister() host2_listener.stop() - + finally: host1_zeroconf.close() host2_zeroconf.close() - def test_peer_discovery_with_multiple_addresses(self): - """Test discovery works with peers having multiple IP addresses.""" - host1_zeroconf = Zeroconf() - host2_zeroconf = Zeroconf() - + def test_service_info_extraction(self): + """Test service info extraction functionality.""" + peerstore = PeerStore() + zeroconf = Zeroconf() + try: - # Create a peer with multiple addresses - host1_peer_id_obj = ID.from_base58("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N") - host1_peer_id = str(host1_peer_id_obj) # Convert to string - - # Manually create service info with multiple addresses - from zeroconf import ServiceInfo + listener = PeerListener( + peerstore=peerstore, + zeroconf=zeroconf, + service_type="_p2p._udp.local.", + service_name="test-listener._p2p._udp.local.", + ) + + # Create a test service info + test_peer_id = ID.from_base58( + "QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N" + ) hostname = socket.gethostname() - + + from zeroconf import ServiceInfo + service_info = ServiceInfo( type_="_p2p._udp.local.", - name="multi-addr-host._p2p._udp.local.", + name="test-service._p2p._udp.local.", port=8001, - properties={b"id": host1_peer_id.encode()}, + properties={b"id": str(test_peer_id).encode()}, server=f"{hostname}.local.", - addresses=[ - socket.inet_aton("192.168.1.100"), - socket.inet_aton("10.0.0.50"), - ], + addresses=[socket.inet_aton("192.168.1.100")], ) - - # Register the service - host1_zeroconf.register_service(service_info) - - # Set up listener - host2_peerstore = PeerStore() - host2_listener = PeerListener( - peerstore=host2_peerstore, - zeroconf=host2_zeroconf, - service_type="_p2p._udp.local.", - service_name="host2._p2p._udp.local.", - ) - - # Trigger discovery - host2_listener.add_service( - host1_zeroconf, - "_p2p._udp.local.", - "multi-addr-host._p2p._udp.local." - ) - - # Verify discovery - assert "multi-addr-host._p2p._udp.local." in host2_listener.discovered_services - discovered_peer_id = host2_listener.discovered_services["multi-addr-host._p2p._udp.local."] - - # Check multiple addresses were discovered - addrs = host2_peerstore.addrs(discovered_peer_id) - assert len(addrs) == 2 - - addr_strings = [str(addr) for addr in addrs] - assert "/ip4/192.168.1.100/tcp/8001" in addr_strings - assert "/ip4/10.0.0.50/tcp/8001" in addr_strings - - print(f"✅ Successfully discovered peer with multiple addresses!") - print(f" Addresses: {addr_strings}") - - # Clean up - host2_listener.stop() - + + # Test extraction + peer_info = listener._extract_peer_info(service_info) + + assert peer_info is not None + assert peer_info.peer_id == test_peer_id + assert len(peer_info.addrs) == 1 + assert "/tcp/8001" in str(peer_info.addrs[0]) + + print("✅ Service info extraction test successful!") + print(f" Extracted peer ID: {peer_info.peer_id}") + print(f" Extracted addresses: {[str(addr) for addr in peer_info.addrs]}") + finally: - host1_zeroconf.close() - host2_zeroconf.close() + zeroconf.close() diff --git a/tests/discovery/mdns/test_utils.py b/tests/discovery/mdns/test_utils.py index b50fd44c..81c296bc 100644 --- a/tests/discovery/mdns/test_utils.py +++ b/tests/discovery/mdns/test_utils.py @@ -1,22 +1,22 @@ """ Basic unit tests for mDNS utils module. """ + import string -import pytest from libp2p.discovery.mdns.utils import stringGen class TestStringGen: - """Basic unit tests for stringGen function.""" + """Unit tests for stringGen function.""" def test_stringgen_default_length(self): """Test stringGen with default length (63).""" result = stringGen() - + assert isinstance(result, str) assert len(result) == 63 - + # Check that all characters are from the expected charset charset = string.ascii_lowercase + string.digits for char in result: @@ -26,13 +26,13 @@ class TestStringGen: """Test stringGen with custom lengths.""" # Test various lengths test_lengths = [1, 5, 10, 20, 50, 100] - + for length in test_lengths: result = stringGen(length) - + assert isinstance(result, str) assert len(result) == length - + # Check that all characters are from the expected charset charset = string.ascii_lowercase + string.digits for char in result: From 28d0e5759a59908c11d14096d1a73e3952ef68f4 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Tue, 24 Jun 2025 14:15:42 +0530 Subject: [PATCH 24/33] removed redundant function and added try catch block --- libp2p/discovery/mdns/broadcaster.py | 31 ++++++++++++++++++++++++++-- libp2p/discovery/mdns/listener.py | 11 ---------- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/libp2p/discovery/mdns/broadcaster.py b/libp2p/discovery/mdns/broadcaster.py index 845704a6..223d747c 100644 --- a/libp2p/discovery/mdns/broadcaster.py +++ b/libp2p/discovery/mdns/broadcaster.py @@ -2,6 +2,7 @@ import logging import socket from zeroconf import ( + EventLoopBlocked, ServiceInfo, Zeroconf, ) @@ -57,8 +58,34 @@ class PeerBroadcaster: def register(self) -> None: """Register the peer's mDNS service on the network.""" - self.zeroconf.register_service(self.service_info) + try: + self.zeroconf.register_service(self.service_info) + logger.debug("mDNS service registered: %s", self.service_name) + except EventLoopBlocked as e: + logger.warning( + "EventLoopBlocked while registering mDNS '%s': %s", self.service_name, e + ) + except Exception as e: + logger.error( + "Unexpected error during mDNS registration for '%s': %r", + self.service_name, + e, + ) def unregister(self) -> None: """Unregister the peer's mDNS service from the network.""" - self.zeroconf.unregister_service(self.service_info) + try: + self.zeroconf.unregister_service(self.service_info) + logger.debug("mDNS service unregistered: %s", self.service_name) + except EventLoopBlocked as e: + logger.warning( + "EventLoopBlocked while unregistering mDNS '%s': %s", + self.service_name, + e, + ) + except Exception as e: + logger.error( + "Unexpected error during mDNS unregistration for '%s': %r", + self.service_name, + e, + ) diff --git a/libp2p/discovery/mdns/listener.py b/libp2p/discovery/mdns/listener.py index 5de7078c..de75b1c1 100644 --- a/libp2p/discovery/mdns/listener.py +++ b/libp2p/discovery/mdns/listener.py @@ -61,17 +61,6 @@ class PeerListener(ServiceListener): self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10) logger.debug("Updated Peer", peer_info.peer_id) - def _process_discovered_service( - self, zeroconf: Zeroconf, type_: str, name: str - ) -> None: - info = zeroconf.get_service_info(type_, name, timeout=5000) - if not info: - return - peer_info = self._extract_peer_info(info) - if peer_info: - self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10) - logger.debug("Discovered:", peer_info.peer_id) - def _extract_peer_info(self, info: ServiceInfo) -> PeerInfo | None: try: addrs = [ From f274d207155f890309c3b9be0ce887edca896650 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Wed, 25 Jun 2025 23:44:32 +0530 Subject: [PATCH 25/33] feat: attached mdns instance with host --- libp2p/__init__.py | 8 ++------ libp2p/discovery/mdns/listener.py | 4 ++++ libp2p/discovery/mdns/mdns.py | 1 + libp2p/host/basic_host.py | 13 ++++++++++++- libp2p/host/routed_host.py | 6 ++++-- 5 files changed, 23 insertions(+), 9 deletions(-) diff --git a/libp2p/__init__.py b/libp2p/__init__.py index 6d03ce69..74fde55b 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -269,12 +269,8 @@ def new_host( listen_addrs=listen_addrs, ) - if enable_mDNS: - mdns = MDNSDiscovery(swarm) - mdns.start() - if disc_opt is not None: - return RoutedHost(swarm, disc_opt) - return BasicHost(swarm) + return RoutedHost(swarm, disc_opt, enable_mDNS) + return BasicHost(swarm, enable_mDNS) __version__ = __version("libp2p") diff --git a/libp2p/discovery/mdns/listener.py b/libp2p/discovery/mdns/listener.py index de75b1c1..4f7ded0d 100644 --- a/libp2p/discovery/mdns/listener.py +++ b/libp2p/discovery/mdns/listener.py @@ -36,6 +36,7 @@ class PeerListener(ServiceListener): def add_service(self, zc: Zeroconf, type_: str, name: str) -> None: if name == self.service_name: return + logger.debug(f"Adding service: {name}") info = zc.get_service_info(type_, name, timeout=5000) if not info: return @@ -47,6 +48,9 @@ class PeerListener(ServiceListener): logger.debug("Discovered Peer:", peer_info.peer_id) def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None: + if name == self.service_name: + return + logger.debug(f"Removing service: {name}") peer_id = self.discovered_services.pop(name) self.peerstore.clear_addrs(peer_id) logger.debug(f"Removed Peer: {peer_id}") diff --git a/libp2p/discovery/mdns/mdns.py b/libp2p/discovery/mdns/mdns.py index e2a89463..2af6ab36 100644 --- a/libp2p/discovery/mdns/mdns.py +++ b/libp2p/discovery/mdns/mdns.py @@ -68,5 +68,6 @@ class MDNSDiscovery: def stop(self) -> None: """Unregister this peer and clean up zeroconf resources.""" + logger.debug("Stopping mDNS discovery") self.broadcaster.unregister() self.zeroconf.close() diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index 1dea876d..ccb37dc2 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -29,6 +29,7 @@ from libp2p.custom_types import ( StreamHandlerFn, TProtocol, ) +from libp2p.discovery.mdns.mdns import MDNSDiscovery from libp2p.host.defaults import ( get_default_protocols, ) @@ -89,6 +90,7 @@ class BasicHost(IHost): def __init__( self, network: INetworkService, + enable_mDNS: bool = False, default_protocols: Optional["OrderedDict[TProtocol, StreamHandlerFn]"] = None, ) -> None: self._network = network @@ -98,6 +100,8 @@ class BasicHost(IHost): default_protocols = default_protocols or get_default_protocols(self) self.multiselect = Multiselect(dict(default_protocols.items())) self.multiselect_client = MultiselectClient() + if enable_mDNS: + self.mDNS = MDNSDiscovery(network) def get_id(self) -> ID: """ @@ -162,7 +166,14 @@ class BasicHost(IHost): network = self.get_network() async with background_trio_service(network): await network.listen(*listen_addrs) - yield + if self.mDNS is not None: + logger.debug("Starting mDNS Discovery") + self.mDNS.start() + try: + yield + finally: + if self.mDNS is not None: + self.mDNS.stop() return _run() diff --git a/libp2p/host/routed_host.py b/libp2p/host/routed_host.py index b637e1eb..166a15ec 100644 --- a/libp2p/host/routed_host.py +++ b/libp2p/host/routed_host.py @@ -18,8 +18,10 @@ from libp2p.peer.peerinfo import ( class RoutedHost(BasicHost): _router: IPeerRouting - def __init__(self, network: INetworkService, router: IPeerRouting): - super().__init__(network) + def __init__( + self, network: INetworkService, router: IPeerRouting, enable_mDNS: bool = False + ): + super().__init__(network, enable_mDNS) self._router = router async def connect(self, peer_info: PeerInfo) -> None: From 5262566f6aec4fe55b03061a5092be47370b80f6 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Thu, 26 Jun 2025 00:36:59 +0530 Subject: [PATCH 26/33] fix: check for mDNS attribute before accessing it in BasicHost --- libp2p/host/basic_host.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index ccb37dc2..798186cf 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -166,13 +166,13 @@ class BasicHost(IHost): network = self.get_network() async with background_trio_service(network): await network.listen(*listen_addrs) - if self.mDNS is not None: + if hasattr(self, "mDNS") and self.mDNS is not None: logger.debug("Starting mDNS Discovery") self.mDNS.start() try: yield finally: - if self.mDNS is not None: + if hasattr(self, "mDNS") and self.mDNS is not None: self.mDNS.stop() return _run() From c914818f488dc4d59fa685812da8da2cbbdeef87 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Thu, 26 Jun 2025 01:15:10 +0530 Subject: [PATCH 27/33] fix: enhanced logging to show dependencies logs --- examples/mDNS/mDNS.py | 11 +++++++++-- libp2p/discovery/mdns/listener.py | 2 +- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/examples/mDNS/mDNS.py b/examples/mDNS/mDNS.py index ad5c8fd3..d85e4084 100644 --- a/examples/mDNS/mDNS.py +++ b/examples/mDNS/mDNS.py @@ -14,7 +14,7 @@ from libp2p.crypto.secp256k1 import ( ) from libp2p.discovery.events.peerDiscovery import peerDiscovery -logger = logging.getLogger("libp2p.example.discovery.mdns") +logger = logging.getLogger("libp2p.discovery.mdns") logger.setLevel(logging.INFO) handler = logging.StreamHandler() handler.setFormatter( @@ -22,6 +22,9 @@ handler.setFormatter( ) logger.addHandler(handler) +# Set root logger to DEBUG to capture all logs from dependencies +logging.getLogger().setLevel(logging.DEBUG) + def onPeerDiscovery(peerinfo: PeerInfo): logger.info(f"Discovered: {peerinfo.peer_id}") @@ -55,8 +58,12 @@ def main() -> None: """ parser = argparse.ArgumentParser(description=description) parser.add_argument("-p", "--port", default=0, type=int, help="source port number") - + parser.add_argument( + "-v", "--verbose", action="store_true", help="Enable verbose output" + ) args = parser.parse_args() + if args.verbose: + logger.setLevel(logging.DEBUG) try: trio.run(run, args.port) except KeyboardInterrupt: diff --git a/libp2p/discovery/mdns/listener.py b/libp2p/discovery/mdns/listener.py index 4f7ded0d..baec61b5 100644 --- a/libp2p/discovery/mdns/listener.py +++ b/libp2p/discovery/mdns/listener.py @@ -45,7 +45,7 @@ class PeerListener(ServiceListener): self.discovered_services[name] = peer_info.peer_id self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10) peerDiscovery.emit_peer_discovered(peer_info) - logger.debug("Discovered Peer:", peer_info.peer_id) + logger.debug(f"Discovered Peer: {peer_info.peer_id}") def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None: if name == self.service_name: From 4947578139c3598e65b400112c45c067bdaf25e2 Mon Sep 17 00:00:00 2001 From: guha-rahul <69rahul16@gmail.com> Date: Thu, 26 Jun 2025 19:39:41 +0530 Subject: [PATCH 28/33] add newsfragment --- newsfragments/687.feature.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 newsfragments/687.feature.rst diff --git a/newsfragments/687.feature.rst b/newsfragments/687.feature.rst new file mode 100644 index 00000000..d7985cd6 --- /dev/null +++ b/newsfragments/687.feature.rst @@ -0,0 +1 @@ +Optimized pubsub message writing by implementing a write_msg() method that uses pre-allocated buffers and single write operations, improving performance by eliminating separate varint prefix encoding and write operations in FloodSub and GossipSub. From 4eff928a6d0eff839ee92ce8f46e0be6ed926ccf Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Fri, 27 Jun 2025 00:09:17 +0530 Subject: [PATCH 29/33] fix: update logging messages --- examples/mDNS/mDNS.py | 2 +- libp2p/discovery/mdns/broadcaster.py | 4 ++-- libp2p/discovery/mdns/listener.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/mDNS/mDNS.py b/examples/mDNS/mDNS.py index d85e4084..794e05c8 100644 --- a/examples/mDNS/mDNS.py +++ b/examples/mDNS/mDNS.py @@ -40,7 +40,7 @@ async def run(port: int) -> None: print( "Run this from the same folder in another console to " "start another peer on a different port:\n\n" - "python mdns-demo -p \n" + "mdns-demo -p \n" ) print("Waiting for mDNS peer discovery events...\n") diff --git a/libp2p/discovery/mdns/broadcaster.py b/libp2p/discovery/mdns/broadcaster.py index 223d747c..59dec80f 100644 --- a/libp2p/discovery/mdns/broadcaster.py +++ b/libp2p/discovery/mdns/broadcaster.py @@ -60,7 +60,7 @@ class PeerBroadcaster: """Register the peer's mDNS service on the network.""" try: self.zeroconf.register_service(self.service_info) - logger.debug("mDNS service registered: %s", self.service_name) + logger.debug(f"mDNS service registered: {self.service_name}") except EventLoopBlocked as e: logger.warning( "EventLoopBlocked while registering mDNS '%s': %s", self.service_name, e @@ -76,7 +76,7 @@ class PeerBroadcaster: """Unregister the peer's mDNS service from the network.""" try: self.zeroconf.unregister_service(self.service_info) - logger.debug("mDNS service unregistered: %s", self.service_name) + logger.debug(f"mDNS service unregistered: {self.service_name}") except EventLoopBlocked as e: logger.warning( "EventLoopBlocked while unregistering mDNS '%s': %s", diff --git a/libp2p/discovery/mdns/listener.py b/libp2p/discovery/mdns/listener.py index baec61b5..2061b344 100644 --- a/libp2p/discovery/mdns/listener.py +++ b/libp2p/discovery/mdns/listener.py @@ -63,7 +63,7 @@ class PeerListener(ServiceListener): if peer_info: self.peerstore.clear_addrs(peer_info.peer_id) self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10) - logger.debug("Updated Peer", peer_info.peer_id) + logger.debug(f"Updated Peer {peer_info.peer_id}") def _extract_peer_info(self, info: ServiceInfo) -> PeerInfo | None: try: From 2201d9e8d270d2bc8bdf1097921560a8eecc0ce5 Mon Sep 17 00:00:00 2001 From: guha-rahul <52607971+guha-rahul@users.noreply.github.com> Date: Fri, 27 Jun 2025 13:53:06 +0530 Subject: [PATCH 30/33] update link --- libp2p/pubsub/pubsub.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 8e099beb..6b485933 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -785,8 +785,8 @@ class Pubsub(Service, IPubsub): """ Write an RPC message to a stream with proper error handling. - Implements WriteMsg similar to go-libp2p-pubsub comm.go - Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 + Implements WriteMsg similar to go-msgio which is used in go-libp2p + Ref: https://github.com/libp2p/go-msgio/blob/master/protoio/uvarint_writer.go#L56 :param stream: stream to write the message to From ef16f3c99310088c9cfb3df60d5266c2b1074720 Mon Sep 17 00:00:00 2001 From: acul71 <34693171+acul71@users.noreply.github.com> Date: Sun, 29 Jun 2025 10:50:17 +0200 Subject: [PATCH 31/33] fix: accept new streams for both DATA and WINDOW_UPDATE frames with the SYN flag (#702) * fix: accept new streams for both and frames with the flag * doc: newsfragment --------- Co-authored-by: Manu Sheel Gupta --- libp2p/stream_muxer/yamux/yamux.py | 2 +- newsfragments/701.bugfix.rst | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 newsfragments/701.bugfix.rst diff --git a/libp2p/stream_muxer/yamux/yamux.py b/libp2p/stream_muxer/yamux/yamux.py index 92123465..586bbc2d 100644 --- a/libp2p/stream_muxer/yamux/yamux.py +++ b/libp2p/stream_muxer/yamux/yamux.py @@ -493,7 +493,7 @@ class Yamux(IMuxedConn): f"type={typ}, flags={flags}, stream_id={stream_id}," f"length={length}" ) - if typ == TYPE_DATA and flags & FLAG_SYN: + if (typ == TYPE_DATA or typ == TYPE_WINDOW_UPDATE) and flags & FLAG_SYN: async with self.streams_lock: if stream_id not in self.streams: stream = YamuxStream(stream_id, self, False) diff --git a/newsfragments/701.bugfix.rst b/newsfragments/701.bugfix.rst new file mode 100644 index 00000000..2bb1be5d --- /dev/null +++ b/newsfragments/701.bugfix.rst @@ -0,0 +1 @@ +align stream creation logic with yamux specification From 211e951678ba73a7f49c30839e38503a8b2b5707 Mon Sep 17 00:00:00 2001 From: varunrmallya <100590632+varun-r-mallya@users.noreply.github.com> Date: Sun, 29 Jun 2025 16:02:00 +0530 Subject: [PATCH 32/33] fix: improve async validator handling in Pubsub class (#705) Signed-off-by: varun-r-mallya --- libp2p/pubsub/pubsub.py | 9 ++++----- newsfragments/702.bugfix.rst | 1 + 2 files changed, 5 insertions(+), 5 deletions(-) create mode 100644 newsfragments/702.bugfix.rst diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 8ba7d471..78c2fca5 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -682,19 +682,18 @@ class Pubsub(Service, IPubsub): # TODO: Implement throttle on async validators if len(async_topic_validators) > 0: - # TODO: Use a better pattern - final_result: bool = True + # Appends to lists are thread safe in CPython + results = [] async def run_async_validator(func: AsyncValidatorFn) -> None: - nonlocal final_result result = await func(msg_forwarder, msg) - final_result = final_result and result + results.append(result) async with trio.open_nursery() as nursery: for async_validator in async_topic_validators: nursery.start_soon(run_async_validator, async_validator) - if not final_result: + if not all(results): raise ValidationError(f"Validation failed for msg={msg}") async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None: diff --git a/newsfragments/702.bugfix.rst b/newsfragments/702.bugfix.rst new file mode 100644 index 00000000..90f91f88 --- /dev/null +++ b/newsfragments/702.bugfix.rst @@ -0,0 +1 @@ +Fixed an issue in `Pubsub` where async validators were not handled reliably under concurrency. Now uses a safe aggregator list for consistent behavior. From e8e0cf74d1f220d4266408f93f7c07dee2be6804 Mon Sep 17 00:00:00 2001 From: "sumanjeet0012@gmail.com" Date: Sun, 29 Jun 2025 16:38:52 +0530 Subject: [PATCH 33/33] docs: add mDNS discovery option to new_host function docs --- libp2p/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/libp2p/__init__.py b/libp2p/__init__.py index 74fde55b..23fc65e6 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -258,6 +258,7 @@ def new_host( :param disc_opt: optional discovery :param muxer_preference: optional explicit muxer preference :param listen_addrs: optional list of multiaddrs to listen on + :param enable_mDNS: whether to enable mDNS discovery :return: return a host instance """ swarm = new_swarm(