diff --git a/docs/examples.mDNS.rst b/docs/examples.mDNS.rst new file mode 100644 index 00000000..d6ec6aaf --- /dev/null +++ b/docs/examples.mDNS.rst @@ -0,0 +1,64 @@ +mDNS Peer Discovery Example +=========================== + +This example demonstrates how to use mDNS (Multicast DNS) for peer discovery in py-libp2p. + +Prerequisites +------------- + +First, ensure you have py-libp2p installed and your environment is activated: + +.. code-block:: console + + $ python -m pip install libp2p + +Running the Example +------------------- + +The mDNS demo script allows you to discover peers on your local network using mDNS. To start a peer, run: + +.. code-block:: console + + $ mdns-demo + +You should see output similar to: + +.. code-block:: console + + Run this from another console to start another peer on a different port: + + python mdns-demo -p + + Waiting for mDNS peer discovery events... + + 2025-06-20 23:28:12,052 - libp2p.example.discovery.mdns - INFO - Starting peer Discovery + +To discover peers, open another terminal and run the same command with a different port: + +.. code-block:: console + + $ python mdns-demo -p 9001 + +You should see output indicating that a new peer has been discovered: + +.. code-block:: console + + Run this from the same folder in another console to start another peer on a different port: + + python mdns-demo -p + + Waiting for mDNS peer discovery events... + + 2025-06-20 23:43:43,786 - libp2p.example.discovery.mdns - INFO - Starting peer Discovery + 2025-06-20 23:43:43,790 - libp2p.example.discovery.mdns - INFO - Discovered: 16Uiu2HAmGxy5NdQEjZWtrYUMrzdp3Syvg7MB2E5Lx8weA9DanYxj + +When a new peer is discovered, its peer ID will be printed in the console output. + +How it Works +------------ + +- Each node advertises itself on the local network using mDNS. +- When a new peer is discovered, the handler prints its peer ID. +- This is useful for local peer discovery without requiring a DHT or bootstrap nodes. + +You can modify the script to perform additional actions when peers are discovered, such as opening streams or exchanging messages. diff --git a/docs/examples.rst b/docs/examples.rst index 676216a9..b20d0e63 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -13,3 +13,4 @@ Examples examples.pubsub examples.circuit_relay examples.kademlia + examples.mDNS diff --git a/docs/libp2p.discovery.events.rst b/docs/libp2p.discovery.events.rst new file mode 100644 index 00000000..7a0edba9 --- /dev/null +++ b/docs/libp2p.discovery.events.rst @@ -0,0 +1,21 @@ +libp2p.discovery.events package +=============================== + +Submodules +---------- + +libp2p.discovery.events.peerDiscovery module +-------------------------------------------- + +.. automodule:: libp2p.discovery.events.peerDiscovery + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: libp2p.discovery.events + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/libp2p.discovery.mdns.rst b/docs/libp2p.discovery.mdns.rst new file mode 100644 index 00000000..af842919 --- /dev/null +++ b/docs/libp2p.discovery.mdns.rst @@ -0,0 +1,45 @@ +libp2p.discovery.mdns package +============================= + +Submodules +---------- + +libp2p.discovery.mdns.broadcaster module +---------------------------------------- + +.. automodule:: libp2p.discovery.mdns.broadcaster + :members: + :undoc-members: + :show-inheritance: + +libp2p.discovery.mdns.listener module +------------------------------------- + +.. automodule:: libp2p.discovery.mdns.listener + :members: + :undoc-members: + :show-inheritance: + +libp2p.discovery.mdns.mdns module +--------------------------------- + +.. automodule:: libp2p.discovery.mdns.mdns + :members: + :undoc-members: + :show-inheritance: + +libp2p.discovery.mdns.utils module +---------------------------------- + +.. automodule:: libp2p.discovery.mdns.utils + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: libp2p.discovery.mdns + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/libp2p.discovery.rst b/docs/libp2p.discovery.rst new file mode 100644 index 00000000..cb8859a4 --- /dev/null +++ b/docs/libp2p.discovery.rst @@ -0,0 +1,22 @@ +libp2p.discovery package +======================== + +Subpackages +----------- + +.. toctree:: + :maxdepth: 4 + + libp2p.discovery.events + libp2p.discovery.mdns + +Submodules +---------- + +Module contents +--------------- + +.. automodule:: libp2p.discovery + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/libp2p.rst b/docs/libp2p.rst index 749b4c11..a47e011a 100644 --- a/docs/libp2p.rst +++ b/docs/libp2p.rst @@ -8,6 +8,7 @@ Subpackages :maxdepth: 4 libp2p.crypto + libp2p.discovery libp2p.host libp2p.identity libp2p.io diff --git a/examples/mDNS/mDNS.py b/examples/mDNS/mDNS.py new file mode 100644 index 00000000..794e05c8 --- /dev/null +++ b/examples/mDNS/mDNS.py @@ -0,0 +1,74 @@ +import argparse +import logging +import secrets + +import multiaddr +import trio + +from libp2p import ( + new_host, +) +from libp2p.abc import PeerInfo +from libp2p.crypto.secp256k1 import ( + create_new_key_pair, +) +from libp2p.discovery.events.peerDiscovery import peerDiscovery + +logger = logging.getLogger("libp2p.discovery.mdns") +logger.setLevel(logging.INFO) +handler = logging.StreamHandler() +handler.setFormatter( + logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") +) +logger.addHandler(handler) + +# Set root logger to DEBUG to capture all logs from dependencies +logging.getLogger().setLevel(logging.DEBUG) + + +def onPeerDiscovery(peerinfo: PeerInfo): + logger.info(f"Discovered: {peerinfo.peer_id}") + + +async def run(port: int) -> None: + secret = secrets.token_bytes(32) + key_pair = create_new_key_pair(secret) + listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") + + peerDiscovery.register_peer_discovered_handler(onPeerDiscovery) + + print( + "Run this from the same folder in another console to " + "start another peer on a different port:\n\n" + "mdns-demo -p \n" + ) + print("Waiting for mDNS peer discovery events...\n") + + logger.info("Starting peer Discovery") + host = new_host(key_pair=key_pair, enable_mDNS=True) + async with host.run(listen_addrs=[listen_addr]): + await trio.sleep_forever() + + +def main() -> None: + description = """ + This program demonstrates mDNS peer discovery using libp2p. + To use it, run 'mdns-demo -p ', where is the port number. + Start multiple peers on different ports to see discovery in action. + """ + parser = argparse.ArgumentParser(description=description) + parser.add_argument("-p", "--port", default=0, type=int, help="source port number") + parser.add_argument( + "-v", "--verbose", action="store_true", help="Enable verbose output" + ) + args = parser.parse_args() + if args.verbose: + logger.setLevel(logging.DEBUG) + try: + trio.run(run, args.port) + except KeyboardInterrupt: + logger.info("Exiting...") + + +if __name__ == "__main__": + main() diff --git a/libp2p/__init__.py b/libp2p/__init__.py index de07c78b..fa7ebefd 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -32,6 +32,9 @@ from libp2p.custom_types import ( TProtocol, TSecurityOptions, ) +from libp2p.discovery.mdns.mdns import ( + MDNSDiscovery, +) from libp2p.host.basic_host import ( BasicHost, ) @@ -245,6 +248,7 @@ def new_host( disc_opt: IPeerRouting | None = None, muxer_preference: Literal["YAMUX", "MPLEX"] | None = None, listen_addrs: Sequence[multiaddr.Multiaddr] | None = None, + enable_mDNS: bool = False, ) -> IHost: """ Create a new libp2p host based on the given parameters. @@ -256,6 +260,7 @@ def new_host( :param disc_opt: optional discovery :param muxer_preference: optional explicit muxer preference :param listen_addrs: optional list of multiaddrs to listen on + :param enable_mDNS: whether to enable mDNS discovery :return: return a host instance """ swarm = new_swarm( @@ -268,8 +273,7 @@ def new_host( ) if disc_opt is not None: - return RoutedHost(swarm, disc_opt) - return BasicHost(swarm) - + return RoutedHost(swarm, disc_opt, enable_mDNS) + return BasicHost(swarm, enable_mDNS) __version__ = __version("libp2p") diff --git a/libp2p/discovery/__init__.py b/libp2p/discovery/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/libp2p/discovery/events/__init__.py b/libp2p/discovery/events/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/libp2p/discovery/events/peerDiscovery.py b/libp2p/discovery/events/peerDiscovery.py new file mode 100644 index 00000000..6b2d30d0 --- /dev/null +++ b/libp2p/discovery/events/peerDiscovery.py @@ -0,0 +1,26 @@ +from collections.abc import ( + Callable, +) + +from libp2p.abc import ( + PeerInfo, +) + +TTL: int = 60 * 60 # Time-to-live for discovered peers in seconds + + +class PeerDiscovery: + def __init__(self) -> None: + self._peer_discovered_handlers: list[Callable[[PeerInfo], None]] = [] + + def register_peer_discovered_handler( + self, handler: Callable[[PeerInfo], None] + ) -> None: + self._peer_discovered_handlers.append(handler) + + def emit_peer_discovered(self, peer_info: PeerInfo) -> None: + for handler in self._peer_discovered_handlers: + handler(peer_info) + + +peerDiscovery = PeerDiscovery() diff --git a/libp2p/discovery/mdns/__init__.py b/libp2p/discovery/mdns/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/libp2p/discovery/mdns/broadcaster.py b/libp2p/discovery/mdns/broadcaster.py new file mode 100644 index 00000000..59dec80f --- /dev/null +++ b/libp2p/discovery/mdns/broadcaster.py @@ -0,0 +1,91 @@ +import logging +import socket + +from zeroconf import ( + EventLoopBlocked, + ServiceInfo, + Zeroconf, +) + +logger = logging.getLogger("libp2p.discovery.mdns.broadcaster") + + +class PeerBroadcaster: + """ + Broadcasts this peer's presence on the local network using mDNS/zeroconf. + Registers a service with the peer's ID in the TXT record as per libp2p spec. + """ + + def __init__( + self, + zeroconf: Zeroconf, + service_type: str, + service_name: str, + peer_id: str, + port: int, + ): + self.zeroconf = zeroconf + self.service_type = service_type + self.peer_id = peer_id + self.port = port + self.service_name = service_name + + # Get the local IP address + local_ip = self._get_local_ip() + hostname = socket.gethostname() + + self.service_info = ServiceInfo( + type_=self.service_type, + name=self.service_name, + port=self.port, + properties={b"id": self.peer_id.encode()}, + server=f"{hostname}.local.", + addresses=[socket.inet_aton(local_ip)], + ) + + def _get_local_ip(self) -> str: + """Get the local IP address of this machine""" + try: + # Connect to a remote address to determine the local IP + # This doesn't actually send data + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.connect(("8.8.8.8", 80)) + local_ip = s.getsockname()[0] + return local_ip + except Exception: + # Fallback to localhost if we can't determine the IP + return "127.0.0.1" + + def register(self) -> None: + """Register the peer's mDNS service on the network.""" + try: + self.zeroconf.register_service(self.service_info) + logger.debug(f"mDNS service registered: {self.service_name}") + except EventLoopBlocked as e: + logger.warning( + "EventLoopBlocked while registering mDNS '%s': %s", self.service_name, e + ) + except Exception as e: + logger.error( + "Unexpected error during mDNS registration for '%s': %r", + self.service_name, + e, + ) + + def unregister(self) -> None: + """Unregister the peer's mDNS service from the network.""" + try: + self.zeroconf.unregister_service(self.service_info) + logger.debug(f"mDNS service unregistered: {self.service_name}") + except EventLoopBlocked as e: + logger.warning( + "EventLoopBlocked while unregistering mDNS '%s': %s", + self.service_name, + e, + ) + except Exception as e: + logger.error( + "Unexpected error during mDNS unregistration for '%s': %r", + self.service_name, + e, + ) diff --git a/libp2p/discovery/mdns/listener.py b/libp2p/discovery/mdns/listener.py new file mode 100644 index 00000000..2061b344 --- /dev/null +++ b/libp2p/discovery/mdns/listener.py @@ -0,0 +1,83 @@ +import logging +import socket + +from zeroconf import ( + ServiceBrowser, + ServiceInfo, + ServiceListener, + Zeroconf, +) + +from libp2p.abc import IPeerStore, Multiaddr +from libp2p.discovery.events.peerDiscovery import peerDiscovery +from libp2p.peer.id import ID +from libp2p.peer.peerinfo import PeerInfo + +logger = logging.getLogger("libp2p.discovery.mdns.listener") + + +class PeerListener(ServiceListener): + """mDNS listener — now a true ServiceListener subclass.""" + + def __init__( + self, + peerstore: IPeerStore, + zeroconf: Zeroconf, + service_type: str, + service_name: str, + ) -> None: + self.peerstore = peerstore + self.zeroconf = zeroconf + self.service_type = service_type + self.service_name = service_name + self.discovered_services: dict[str, ID] = {} + self.browser = ServiceBrowser(self.zeroconf, self.service_type, listener=self) + + def add_service(self, zc: Zeroconf, type_: str, name: str) -> None: + if name == self.service_name: + return + logger.debug(f"Adding service: {name}") + info = zc.get_service_info(type_, name, timeout=5000) + if not info: + return + peer_info = self._extract_peer_info(info) + if peer_info: + self.discovered_services[name] = peer_info.peer_id + self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10) + peerDiscovery.emit_peer_discovered(peer_info) + logger.debug(f"Discovered Peer: {peer_info.peer_id}") + + def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None: + if name == self.service_name: + return + logger.debug(f"Removing service: {name}") + peer_id = self.discovered_services.pop(name) + self.peerstore.clear_addrs(peer_id) + logger.debug(f"Removed Peer: {peer_id}") + + def update_service(self, zc: Zeroconf, type_: str, name: str) -> None: + info = zc.get_service_info(type_, name, timeout=5000) + if not info: + return + peer_info = self._extract_peer_info(info) + if peer_info: + self.peerstore.clear_addrs(peer_info.peer_id) + self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10) + logger.debug(f"Updated Peer {peer_info.peer_id}") + + def _extract_peer_info(self, info: ServiceInfo) -> PeerInfo | None: + try: + addrs = [ + Multiaddr(f"/ip4/{socket.inet_ntoa(addr)}/tcp/{info.port}") + for addr in info.addresses + ] + pid_bytes = info.properties.get(b"id") + if not pid_bytes: + return None + pid = ID.from_base58(pid_bytes.decode()) + return PeerInfo(peer_id=pid, addrs=addrs) + except Exception: + return None + + def stop(self) -> None: + self.browser.cancel() diff --git a/libp2p/discovery/mdns/mdns.py b/libp2p/discovery/mdns/mdns.py new file mode 100644 index 00000000..2af6ab36 --- /dev/null +++ b/libp2p/discovery/mdns/mdns.py @@ -0,0 +1,73 @@ +""" +mDNS-based peer discovery for py-libp2p. +Conforms to https://github.com/libp2p/specs/blob/master/discovery/mdns.md +Uses zeroconf for mDNS broadcast/listen. Async operations use trio. +""" + +import logging + +from zeroconf import ( + Zeroconf, +) + +from libp2p.abc import ( + INetworkService, +) + +from .broadcaster import ( + PeerBroadcaster, +) +from .listener import ( + PeerListener, +) +from .utils import ( + stringGen, +) + +logger = logging.getLogger("libp2p.discovery.mdns") + +SERVICE_TYPE = "_p2p._udp.local." +MCAST_PORT = 5353 +MCAST_ADDR = "224.0.0.251" + + +class MDNSDiscovery: + """ + mDNS-based peer discovery for py-libp2p, using zeroconf. + Conforms to the libp2p mDNS discovery spec. + """ + + def __init__(self, swarm: INetworkService, port: int = 8000): + self.peer_id = str(swarm.get_peer_id()) + self.port = port + self.zeroconf = Zeroconf() + self.serviceName = f"{stringGen()}.{SERVICE_TYPE}" + self.peerstore = swarm.peerstore + self.swarm = swarm + self.broadcaster = PeerBroadcaster( + zeroconf=self.zeroconf, + service_type=SERVICE_TYPE, + service_name=self.serviceName, + peer_id=self.peer_id, + port=self.port, + ) + self.listener = PeerListener( + zeroconf=self.zeroconf, + peerstore=self.peerstore, + service_type=SERVICE_TYPE, + service_name=self.serviceName, + ) + + def start(self) -> None: + """Register this peer and start listening for others.""" + logger.debug( + f"Starting mDNS discovery for peer {self.peer_id} on port {self.port}" + ) + self.broadcaster.register() + # Listener is started in constructor + + def stop(self) -> None: + """Unregister this peer and clean up zeroconf resources.""" + logger.debug("Stopping mDNS discovery") + self.broadcaster.unregister() + self.zeroconf.close() diff --git a/libp2p/discovery/mdns/utils.py b/libp2p/discovery/mdns/utils.py new file mode 100644 index 00000000..eb05d03a --- /dev/null +++ b/libp2p/discovery/mdns/utils.py @@ -0,0 +1,11 @@ +import random +import string + + +def stringGen(len: int = 63) -> str: + """Generate a random string of lowercase letters and digits.""" + charset = string.ascii_lowercase + string.digits + result = [] + for _ in range(len): + result.append(random.choice(charset)) + return "".join(result) diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index 1dea876d..798186cf 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -29,6 +29,7 @@ from libp2p.custom_types import ( StreamHandlerFn, TProtocol, ) +from libp2p.discovery.mdns.mdns import MDNSDiscovery from libp2p.host.defaults import ( get_default_protocols, ) @@ -89,6 +90,7 @@ class BasicHost(IHost): def __init__( self, network: INetworkService, + enable_mDNS: bool = False, default_protocols: Optional["OrderedDict[TProtocol, StreamHandlerFn]"] = None, ) -> None: self._network = network @@ -98,6 +100,8 @@ class BasicHost(IHost): default_protocols = default_protocols or get_default_protocols(self) self.multiselect = Multiselect(dict(default_protocols.items())) self.multiselect_client = MultiselectClient() + if enable_mDNS: + self.mDNS = MDNSDiscovery(network) def get_id(self) -> ID: """ @@ -162,7 +166,14 @@ class BasicHost(IHost): network = self.get_network() async with background_trio_service(network): await network.listen(*listen_addrs) - yield + if hasattr(self, "mDNS") and self.mDNS is not None: + logger.debug("Starting mDNS Discovery") + self.mDNS.start() + try: + yield + finally: + if hasattr(self, "mDNS") and self.mDNS is not None: + self.mDNS.stop() return _run() diff --git a/libp2p/host/routed_host.py b/libp2p/host/routed_host.py index b637e1eb..166a15ec 100644 --- a/libp2p/host/routed_host.py +++ b/libp2p/host/routed_host.py @@ -18,8 +18,10 @@ from libp2p.peer.peerinfo import ( class RoutedHost(BasicHost): _router: IPeerRouting - def __init__(self, network: INetworkService, router: IPeerRouting): - super().__init__(network) + def __init__( + self, network: INetworkService, router: IPeerRouting, enable_mDNS: bool = False + ): + super().__init__(network, enable_mDNS) self._router = router async def connect(self, peer_info: PeerInfo) -> None: diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 93d01f1a..3e0d454f 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -12,15 +12,9 @@ from libp2p.abc import ( from libp2p.custom_types import ( TProtocol, ) -from libp2p.network.stream.exceptions import ( - StreamClosed, -) from libp2p.peer.id import ( ID, ) -from libp2p.utils import ( - encode_varint_prefixed, -) from .exceptions import ( PubsubRouterError, @@ -120,13 +114,7 @@ class FloodSub(IPubsubRouter): if peer_id not in pubsub.peers: continue stream = pubsub.peers[peer_id] - # FIXME: We should add a `WriteMsg` similar to write delimited messages. - # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 - try: - await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) - except StreamClosed: - logger.debug("Fail to publish message to %s: stream closed", peer_id) - pubsub._handle_dead_peer(peer_id) + await pubsub.write_msg(stream, rpc_msg) async def join(self, topic: str) -> None: """ diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 839d6719..cebc438b 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -24,9 +24,6 @@ from libp2p.abc import ( from libp2p.custom_types import ( TProtocol, ) -from libp2p.network.stream.exceptions import ( - StreamClosed, -) from libp2p.peer.id import ( ID, ) @@ -44,9 +41,6 @@ from libp2p.pubsub import ( from libp2p.tools.async_service import ( Service, ) -from libp2p.utils import ( - encode_varint_prefixed, -) from .exceptions import ( NoPubsubAttached, @@ -267,13 +261,10 @@ class GossipSub(IPubsubRouter, Service): if peer_id not in self.pubsub.peers: continue stream = self.pubsub.peers[peer_id] - # FIXME: We should add a `WriteMsg` similar to write delimited messages. - # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 - try: - await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) - except StreamClosed: - logger.debug("Fail to publish message to %s: stream closed", peer_id) - self.pubsub._handle_dead_peer(peer_id) + + # TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages. + await self.pubsub.write_msg(stream, rpc_msg) + for topic in pubsub_msg.topicIDs: self.time_since_last_publish[topic] = int(time.time()) @@ -829,8 +820,6 @@ class GossipSub(IPubsubRouter, Service): packet.publish.extend(msgs_to_forward) - # 2) Serialize that packet - rpc_msg: bytes = packet.SerializeToString() if self.pubsub is None: raise NoPubsubAttached @@ -844,14 +833,7 @@ class GossipSub(IPubsubRouter, Service): peer_stream = self.pubsub.peers[sender_peer_id] # 4) And write the packet to the stream - try: - await peer_stream.write(encode_varint_prefixed(rpc_msg)) - except StreamClosed: - logger.debug( - "Fail to responed to iwant request from %s: stream closed", - sender_peer_id, - ) - self.pubsub._handle_dead_peer(sender_peer_id) + await self.pubsub.write_msg(peer_stream, packet) async def handle_graft( self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID @@ -993,8 +975,6 @@ class GossipSub(IPubsubRouter, Service): packet: rpc_pb2.RPC = rpc_pb2.RPC() packet.control.CopyFrom(control_msg) - rpc_msg: bytes = packet.SerializeToString() - # Get stream for peer from pubsub if to_peer not in self.pubsub.peers: logger.debug( @@ -1004,8 +984,4 @@ class GossipSub(IPubsubRouter, Service): peer_stream = self.pubsub.peers[to_peer] # Write rpc to stream - try: - await peer_stream.write(encode_varint_prefixed(rpc_msg)) - except StreamClosed: - logger.debug("Fail to emit control message to %s: stream closed", to_peer) - self.pubsub._handle_dead_peer(to_peer) + await self.pubsub.write_msg(peer_stream, packet) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 481c8981..1614bedc 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -60,6 +60,7 @@ from libp2p.utils import ( encode_varint_prefixed, read_varint_prefixed_bytes, ) +from libp2p.utils.varint import encode_uvarint from .pb import ( rpc_pb2, @@ -828,3 +829,43 @@ class Pubsub(Service, IPubsub): def _is_subscribed_to_msg(self, msg: rpc_pb2.Message) -> bool: return any(topic in self.topic_ids for topic in msg.topicIDs) + + async def write_msg(self, stream: INetStream, rpc_msg: rpc_pb2.RPC) -> bool: + """ + Write an RPC message to a stream with proper error handling. + + Implements WriteMsg similar to go-msgio which is used in go-libp2p + Ref: https://github.com/libp2p/go-msgio/blob/master/protoio/uvarint_writer.go#L56 + + + :param stream: stream to write the message to + :param rpc_msg: RPC message to write + :return: True if successful, False if stream was closed + """ + try: + # Calculate message size first + msg_bytes = rpc_msg.SerializeToString() + msg_size = len(msg_bytes) + + # Calculate varint size and allocate exact buffer size needed + + varint_bytes = encode_uvarint(msg_size) + varint_size = len(varint_bytes) + + # Allocate buffer with exact size (like Go's pool.Get()) + buf = bytearray(varint_size + msg_size) + + # Write varint length prefix to buffer (like Go's binary.PutUvarint()) + buf[:varint_size] = varint_bytes + + # Write serialized message after varint (like Go's rpc.MarshalTo()) + buf[varint_size:] = msg_bytes + + # Single write operation (like Go's s.Write(buf)) + await stream.write(bytes(buf)) + return True + except StreamClosed: + peer_id = stream.muxed_conn.peer_id + logger.debug("Fail to write message to %s: stream closed", peer_id) + self._handle_dead_peer(peer_id) + return False diff --git a/libp2p/stream_muxer/yamux/yamux.py b/libp2p/stream_muxer/yamux/yamux.py index 92123465..586bbc2d 100644 --- a/libp2p/stream_muxer/yamux/yamux.py +++ b/libp2p/stream_muxer/yamux/yamux.py @@ -493,7 +493,7 @@ class Yamux(IMuxedConn): f"type={typ}, flags={flags}, stream_id={stream_id}," f"length={length}" ) - if typ == TYPE_DATA and flags & FLAG_SYN: + if (typ == TYPE_DATA or typ == TYPE_WINDOW_UPDATE) and flags & FLAG_SYN: async with self.streams_lock: if stream_id not in self.streams: stream = YamuxStream(stream_id, self, False) diff --git a/newsfragments/649.feature.rst b/newsfragments/649.feature.rst new file mode 100644 index 00000000..82ba5cd7 --- /dev/null +++ b/newsfragments/649.feature.rst @@ -0,0 +1 @@ +Added support for ``Multicast DNS`` in py-libp2p diff --git a/newsfragments/687.feature.rst b/newsfragments/687.feature.rst new file mode 100644 index 00000000..d7985cd6 --- /dev/null +++ b/newsfragments/687.feature.rst @@ -0,0 +1 @@ +Optimized pubsub message writing by implementing a write_msg() method that uses pre-allocated buffers and single write operations, improving performance by eliminating separate varint prefix encoding and write operations in FloodSub and GossipSub. diff --git a/newsfragments/701.bugfix.rst b/newsfragments/701.bugfix.rst new file mode 100644 index 00000000..2bb1be5d --- /dev/null +++ b/newsfragments/701.bugfix.rst @@ -0,0 +1 @@ +align stream creation logic with yamux specification diff --git a/newsfragments/702.bugfix.rst b/newsfragments/702.bugfix.rst new file mode 100644 index 00000000..90f91f88 --- /dev/null +++ b/newsfragments/702.bugfix.rst @@ -0,0 +1 @@ +Fixed an issue in `Pubsub` where async validators were not handled reliably under concurrency. Now uses a safe aggregator list for consistent behavior. diff --git a/newsfragments/707.feature.rst b/newsfragments/707.feature.rst new file mode 100644 index 00000000..7c521d29 --- /dev/null +++ b/newsfragments/707.feature.rst @@ -0,0 +1 @@ +Added comprehensive tests for pubsub connection utility functions to verify degree limits are enforced, excess peers are handled correctly, and edge cases (degree=0, negative values, empty lists) are managed gracefully. diff --git a/pyproject.toml b/pyproject.toml index 91803ada..cf000156 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ dependencies = [ "trio-typing>=0.0.4", "trio>=0.26.0", "fastecdsa==2.3.2; sys_platform != 'win32'", + "zeroconf (>=0.147.0,<0.148.0)", ] classifiers = [ "Development Status :: 4 - Beta", @@ -54,6 +55,7 @@ identify-demo = "examples.identify.identify:main" identify-push-demo = "examples.identify_push.identify_push_demo:run_main" identify-push-listener-dialer-demo = "examples.identify_push.identify_push_listener_dialer:main" pubsub-demo = "examples.pubsub.pubsub:main" +mdns-demo = "examples.mDNS.mDNS:main" [project.optional-dependencies] dev = [ diff --git a/tests/core/pubsub/test_gossipsub.py b/tests/core/pubsub/test_gossipsub.py index 03276a78..91205b29 100644 --- a/tests/core/pubsub/test_gossipsub.py +++ b/tests/core/pubsub/test_gossipsub.py @@ -15,6 +15,7 @@ from tests.utils.factories import ( PubsubFactory, ) from tests.utils.pubsub.utils import ( + connect_some, dense_connect, one_to_all_connect, sparse_connect, @@ -590,3 +591,166 @@ async def test_sparse_connect(): f"received the message. Ideally all nodes should receive it, but at " f"minimum {min_required} required for sparse network scalability." ) + + +@pytest.mark.trio +async def test_connect_some_with_fewer_hosts_than_degree(): + """Test connect_some when there are fewer hosts than degree.""" + # Create 3 hosts with degree=5 + async with PubsubFactory.create_batch_with_floodsub(3) as pubsubs_fsub: + hosts = [pubsub.host for pubsub in pubsubs_fsub] + degree = 5 + + await connect_some(hosts, degree) + await trio.sleep(0.1) # Allow connections to establish + + # Each host should connect to all other hosts (since there are only 2 others) + for i, pubsub in enumerate(pubsubs_fsub): + connected_peers = len(pubsub.peers) + expected_max_connections = len(hosts) - 1 # All others + assert connected_peers <= expected_max_connections, ( + f"Host {i} has {connected_peers} connections, " + f"but can only connect to {expected_max_connections} others" + ) + + +@pytest.mark.trio +async def test_connect_some_degree_limit_enforced(): + """Test that connect_some enforces degree limits and creates expected topology.""" + # Test with small network where we can verify exact behavior + async with PubsubFactory.create_batch_with_floodsub(6) as pubsubs_fsub: + hosts = [pubsub.host for pubsub in pubsubs_fsub] + degree = 2 + + await connect_some(hosts, degree) + await trio.sleep(0.1) + + # With 6 hosts and degree=2, expected connections: + # Host 0 → connects to hosts 1,2 (2 peers total) + # Host 1 → connects to hosts 2,3 (3 peers: 0,2,3) + # Host 2 → connects to hosts 3,4 (4 peers: 0,1,3,4) + # Host 3 → connects to hosts 4,5 (3 peers: 1,2,4,5) - wait, that's 4! + # Host 4 → connects to host 5 (3 peers: 2,3,5) + # Host 5 → (2 peers: 3,4) + + peer_counts = [len(pubsub.peers) for pubsub in pubsubs_fsub] + + # First and last hosts should have exactly degree connections + assert peer_counts[0] == degree, ( + f"Host 0 should have {degree} peers, got {peer_counts[0]}" + ) + assert peer_counts[-1] <= degree, ( + f"Last host should have ≤ {degree} peers, got {peer_counts[-1]}" + ) + + # Middle hosts may have more due to bidirectional connections + # but the pattern should be consistent with degree limit + total_connections = sum(peer_counts) + + # Should be less than full mesh (each host connected to all others) + full_mesh_connections = len(hosts) * (len(hosts) - 1) + assert total_connections < full_mesh_connections, ( + f"Got {total_connections} total connections, " + f"but full mesh would be {full_mesh_connections}" + ) + + # Should be more than just a chain (each host connected to next only) + chain_connections = 2 * (len(hosts) - 1) # bidirectional chain + assert total_connections > chain_connections, ( + f"Got {total_connections} total connections, which is too few " + f"(chain would be {chain_connections})" + ) + + +@pytest.mark.trio +async def test_connect_some_degree_zero(): + """Test edge case: degree=0 should result in no connections.""" + # Create 5 hosts with degree=0 + async with PubsubFactory.create_batch_with_floodsub(5) as pubsubs_fsub: + hosts = [pubsub.host for pubsub in pubsubs_fsub] + degree = 0 + + await connect_some(hosts, degree) + await trio.sleep(0.1) # Allow any potential connections to establish + + # Verify no connections were made + for i, pubsub in enumerate(pubsubs_fsub): + connected_peers = len(pubsub.peers) + assert connected_peers == 0, ( + f"Host {i} has {connected_peers} connections, " + f"but degree=0 should result in no connections" + ) + + +@pytest.mark.trio +async def test_connect_some_negative_degree(): + """Test edge case: negative degree should be handled gracefully.""" + # Create 5 hosts with degree=-1 + async with PubsubFactory.create_batch_with_floodsub(5) as pubsubs_fsub: + hosts = [pubsub.host for pubsub in pubsubs_fsub] + degree = -1 + + await connect_some(hosts, degree) + await trio.sleep(0.1) # Allow any potential connections to establish + + # Verify no connections were made (negative degree should behave like 0) + for i, pubsub in enumerate(pubsubs_fsub): + connected_peers = len(pubsub.peers) + assert connected_peers == 0, ( + f"Host {i} has {connected_peers} connections, " + f"but negative degree should result in no connections" + ) + + +@pytest.mark.trio +async def test_sparse_connect_degree_zero(): + """Test sparse_connect with degree=0.""" + async with PubsubFactory.create_batch_with_floodsub(8) as pubsubs_fsub: + hosts = [pubsub.host for pubsub in pubsubs_fsub] + degree = 0 + + await sparse_connect(hosts, degree) + await trio.sleep(0.1) # Allow connections to establish + + # With degree=0, sparse_connect should still create neighbor connections + # for connectivity (this is part of the algorithm design) + for i, pubsub in enumerate(pubsubs_fsub): + connected_peers = len(pubsub.peers) + # Should have some connections due to neighbor connectivity + # (each node connects to immediate neighbors) + expected_neighbors = 2 # previous and next in ring + assert connected_peers >= expected_neighbors, ( + f"Host {i} has {connected_peers} connections, " + f"expected at least {expected_neighbors} neighbor connections" + ) + + +@pytest.mark.trio +async def test_empty_host_list(): + """Test edge case: empty host list should be handled gracefully.""" + hosts = [] + + # All functions should handle empty lists gracefully + await connect_some(hosts, 5) + await sparse_connect(hosts, 3) + await dense_connect(hosts) + + # If we reach here without exceptions, the test passes + + +@pytest.mark.trio +async def test_single_host(): + """Test edge case: single host should be handled gracefully.""" + async with PubsubFactory.create_batch_with_floodsub(1) as pubsubs_fsub: + hosts = [pubsub.host for pubsub in pubsubs_fsub] + + # All functions should handle single host gracefully + await connect_some(hosts, 5) + await sparse_connect(hosts, 3) + await dense_connect(hosts) + + # Single host should have no connections + connected_peers = len(pubsubs_fsub[0].peers) + assert connected_peers == 0, ( + f"Single host has {connected_peers} connections, expected 0" + ) diff --git a/tests/discovery/__init__.py b/tests/discovery/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/discovery/mdns/__init__.py b/tests/discovery/mdns/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/discovery/mdns/test_broadcaster.py b/tests/discovery/mdns/test_broadcaster.py new file mode 100644 index 00000000..cdb2e8c4 --- /dev/null +++ b/tests/discovery/mdns/test_broadcaster.py @@ -0,0 +1,91 @@ +""" +Unit tests for mDNS broadcaster component. +""" + +from zeroconf import Zeroconf + +from libp2p.discovery.mdns.broadcaster import PeerBroadcaster +from libp2p.peer.id import ID + + +class TestPeerBroadcaster: + """Unit tests for PeerBroadcaster.""" + + def test_broadcaster_initialization(self): + """Test that broadcaster initializes correctly.""" + zeroconf = Zeroconf() + service_type = "_p2p._udp.local." + service_name = "test-peer._p2p._udp.local." + peer_id = ( + "QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN" # String, not ID object + ) + port = 8000 + + broadcaster = PeerBroadcaster( + zeroconf=zeroconf, + service_type=service_type, + service_name=service_name, + peer_id=peer_id, + port=port, + ) + + assert broadcaster.zeroconf == zeroconf + assert broadcaster.service_type == service_type + assert broadcaster.service_name == service_name + assert broadcaster.peer_id == peer_id + assert broadcaster.port == port + + # Clean up + zeroconf.close() + + def test_broadcaster_service_creation(self): + """Test that broadcaster creates valid service info.""" + zeroconf = Zeroconf() + service_type = "_p2p._udp.local." + service_name = "test-peer2._p2p._udp.local." + peer_id_obj = ID.from_base58("QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN") + peer_id = str(peer_id_obj) # Convert to string + port = 8000 + + broadcaster = PeerBroadcaster( + zeroconf=zeroconf, + service_type=service_type, + service_name=service_name, + peer_id=peer_id, + port=port, + ) + + # Verify service was created and registered + service_info = broadcaster.service_info + assert service_info is not None + assert service_info.type == service_type + assert service_info.name == service_name + assert service_info.port == port + assert b"id" in service_info.properties + assert service_info.properties[b"id"] == peer_id.encode() + + # Clean up + zeroconf.close() + + def test_broadcaster_start_stop(self): + """Test that broadcaster can start and stop correctly.""" + zeroconf = Zeroconf() + service_type = "_p2p._udp.local." + service_name = "test-start-stop._p2p._udp.local." + peer_id_obj = ID.from_base58("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N") + peer_id = str(peer_id_obj) # Convert to string + port = 8001 + + broadcaster = PeerBroadcaster( + zeroconf=zeroconf, + service_type=service_type, + service_name=service_name, + peer_id=peer_id, + port=port, + ) + + # Service should be registered + assert broadcaster.service_info is not None + + # Clean up + zeroconf.close() diff --git a/tests/discovery/mdns/test_listener.py b/tests/discovery/mdns/test_listener.py new file mode 100644 index 00000000..1995202e --- /dev/null +++ b/tests/discovery/mdns/test_listener.py @@ -0,0 +1,114 @@ +""" +Unit tests for mDNS listener component. +""" + +import socket + +from zeroconf import ServiceInfo, Zeroconf + +from libp2p.abc import Multiaddr +from libp2p.discovery.mdns.listener import PeerListener +from libp2p.peer.id import ID +from libp2p.peer.peerstore import PeerStore + + +class TestPeerListener: + """Unit tests for PeerListener.""" + + def test_listener_initialization(self): + """Test that listener initializes correctly.""" + peerstore = PeerStore() + zeroconf = Zeroconf() + service_type = "_p2p._udp.local." + service_name = "local-peer._p2p._udp.local." + + listener = PeerListener( + peerstore=peerstore, + zeroconf=zeroconf, + service_type=service_type, + service_name=service_name, + ) + + assert listener.peerstore == peerstore + assert listener.zeroconf == zeroconf + assert listener.service_type == service_type + assert listener.service_name == service_name + assert listener.discovered_services == {} + + # Clean up + listener.stop() + zeroconf.close() + + def test_listener_extract_peer_info_success(self): + """Test successful PeerInfo extraction from ServiceInfo.""" + peerstore = PeerStore() + zeroconf = Zeroconf() + + listener = PeerListener( + peerstore=peerstore, + zeroconf=zeroconf, + service_type="_p2p._udp.local.", + service_name="local._p2p._udp.local.", + ) + + # Create sample service info + sample_peer_id = ID.from_base58( + "QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN" + ) + hostname = socket.gethostname() + local_ip = "192.168.1.100" + + sample_service_info = ServiceInfo( + type_="_p2p._udp.local.", + name="test-peer._p2p._udp.local.", + port=8000, + properties={b"id": str(sample_peer_id).encode()}, + server=f"{hostname}.local.", + addresses=[socket.inet_aton(local_ip)], + ) + + peer_info = listener._extract_peer_info(sample_service_info) + + assert peer_info is not None + assert isinstance(peer_info.peer_id, ID) + assert len(peer_info.addrs) > 0 + assert all(isinstance(addr, Multiaddr) for addr in peer_info.addrs) + + # Check that protocol is TCP since we always use TCP + assert "/tcp/" in str(peer_info.addrs[0]) + + # Clean up + listener.stop() + zeroconf.close() + + def test_listener_extract_peer_info_invalid_id(self): + """Test PeerInfo extraction fails with invalid peer ID.""" + peerstore = PeerStore() + zeroconf = Zeroconf() + + listener = PeerListener( + peerstore=peerstore, + zeroconf=zeroconf, + service_type="_p2p._udp.local.", + service_name="local._p2p._udp.local.", + ) + + # Create service info with invalid peer ID + hostname = socket.gethostname() + local_ip = "192.168.1.100" + + service_info = ServiceInfo( + type_="_p2p._udp.local.", + name="invalid-peer._p2p._udp.local.", + port=8000, + properties={b"id": b"invalid_peer_id_format"}, + server=f"{hostname}.local.", + addresses=[socket.inet_aton(local_ip)], + ) + + peer_info = listener._extract_peer_info(service_info) + assert peer_info is None + + # Clean up + listener.stop() + zeroconf.close() diff --git a/tests/discovery/mdns/test_mdns.py b/tests/discovery/mdns/test_mdns.py new file mode 100644 index 00000000..83d734a7 --- /dev/null +++ b/tests/discovery/mdns/test_mdns.py @@ -0,0 +1,121 @@ +""" +Comprehensive integration tests for mDNS discovery functionality. +""" + +import socket + +from zeroconf import Zeroconf + +from libp2p.discovery.mdns.broadcaster import PeerBroadcaster +from libp2p.discovery.mdns.listener import PeerListener +from libp2p.peer.id import ID +from libp2p.peer.peerstore import PeerStore + + +class TestMDNSDiscovery: + """Comprehensive integration tests for mDNS peer discovery.""" + + def test_one_host_finds_another(self): + """Test that one host can find another host using mDNS.""" + # Create two separate Zeroconf instances to simulate different hosts + host1_zeroconf = Zeroconf() + host2_zeroconf = Zeroconf() + + try: + # Host 1: Set up as broadcaster (the host to be discovered) + host1_peer_id_obj = ID.from_base58( + "QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN" + ) + host1_peer_id = str(host1_peer_id_obj) # Convert to string + host1_broadcaster = PeerBroadcaster( + zeroconf=host1_zeroconf, + service_type="_p2p._udp.local.", + service_name="host1._p2p._udp.local.", + peer_id=host1_peer_id, + port=8000, + ) + + # Host 2: Set up as listener (the host that discovers others) + host2_peerstore = PeerStore() + host2_listener = PeerListener( + peerstore=host2_peerstore, + zeroconf=host2_zeroconf, + service_type="_p2p._udp.local.", + service_name="host2._p2p._udp.local.", + ) + + # Host 1 registers its service for discovery + host1_broadcaster.register() + + # Verify that host2 discovered host1 + assert len(host2_listener.discovered_services) > 0 + assert "host1._p2p._udp.local." in host2_listener.discovered_services + + # Verify that host1's peer info was added to host2's peerstore + discovered_peer_id = host2_listener.discovered_services[ + "host1._p2p._udp.local." + ] + assert str(discovered_peer_id) == host1_peer_id + + # Verify addresses were added to peerstore + try: + addrs = host2_peerstore.addrs(discovered_peer_id) + assert len(addrs) > 0 + # Should be TCP since we always use TCP protocol + assert "/tcp/8000" in str(addrs[0]) + except Exception: + # If no addresses found, the discovery didn't work properly + assert False, "Host1 addresses should be in Host2's peerstore" + + # Clean up + host1_broadcaster.unregister() + host2_listener.stop() + + finally: + host1_zeroconf.close() + host2_zeroconf.close() + + def test_service_info_extraction(self): + """Test service info extraction functionality.""" + peerstore = PeerStore() + zeroconf = Zeroconf() + + try: + listener = PeerListener( + peerstore=peerstore, + zeroconf=zeroconf, + service_type="_p2p._udp.local.", + service_name="test-listener._p2p._udp.local.", + ) + + # Create a test service info + test_peer_id = ID.from_base58( + "QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N" + ) + hostname = socket.gethostname() + + from zeroconf import ServiceInfo + + service_info = ServiceInfo( + type_="_p2p._udp.local.", + name="test-service._p2p._udp.local.", + port=8001, + properties={b"id": str(test_peer_id).encode()}, + server=f"{hostname}.local.", + addresses=[socket.inet_aton("192.168.1.100")], + ) + + # Test extraction + peer_info = listener._extract_peer_info(service_info) + + assert peer_info is not None + assert peer_info.peer_id == test_peer_id + assert len(peer_info.addrs) == 1 + assert "/tcp/8001" in str(peer_info.addrs[0]) + + print("✅ Service info extraction test successful!") + print(f" Extracted peer ID: {peer_info.peer_id}") + print(f" Extracted addresses: {[str(addr) for addr in peer_info.addrs]}") + + finally: + zeroconf.close() diff --git a/tests/discovery/mdns/test_utils.py b/tests/discovery/mdns/test_utils.py new file mode 100644 index 00000000..81c296bc --- /dev/null +++ b/tests/discovery/mdns/test_utils.py @@ -0,0 +1,39 @@ +""" +Basic unit tests for mDNS utils module. +""" + +import string + +from libp2p.discovery.mdns.utils import stringGen + + +class TestStringGen: + """Unit tests for stringGen function.""" + + def test_stringgen_default_length(self): + """Test stringGen with default length (63).""" + result = stringGen() + + assert isinstance(result, str) + assert len(result) == 63 + + # Check that all characters are from the expected charset + charset = string.ascii_lowercase + string.digits + for char in result: + assert char in charset + + def test_stringgen_custom_length(self): + """Test stringGen with custom lengths.""" + # Test various lengths + test_lengths = [1, 5, 10, 20, 50, 100] + + for length in test_lengths: + result = stringGen(length) + + assert isinstance(result, str) + assert len(result) == length + + # Check that all characters are from the expected charset + charset = string.ascii_lowercase + string.digits + for char in result: + assert char in charset diff --git a/tests/utils/pubsub/utils.py b/tests/utils/pubsub/utils.py index 5a10ce52..7413d4e6 100644 --- a/tests/utils/pubsub/utils.py +++ b/tests/utils/pubsub/utils.py @@ -24,16 +24,22 @@ def make_pubsub_msg( ) -# TODO: Implement sparse connect async def dense_connect(hosts: Sequence[IHost]) -> None: await connect_some(hosts, 10) -# FIXME: `degree` is not used at all async def connect_some(hosts: Sequence[IHost], degree: int) -> None: + """ + Connect each host to up to 'degree' number of other hosts. + Creates a sparse network topology where each node has limited connections. + """ for i, host in enumerate(hosts): - for host2 in hosts[i + 1 :]: - await connect(host, host2) + connections_made = 0 + for j in range(i + 1, len(hosts)): + if connections_made >= degree: + break + await connect(host, hosts[j]) + connections_made += 1 async def one_to_all_connect(hosts: Sequence[IHost], central_host_index: int) -> None: