Merge branch 'main' into limit_concurrency

This commit is contained in:
Manu Sheel Gupta
2025-06-30 07:47:12 -07:00
committed by GitHub
36 changed files with 1134 additions and 59 deletions

View File

@ -32,6 +32,9 @@ from libp2p.custom_types import (
TProtocol,
TSecurityOptions,
)
from libp2p.discovery.mdns.mdns import (
MDNSDiscovery,
)
from libp2p.host.basic_host import (
BasicHost,
)
@ -245,6 +248,7 @@ def new_host(
disc_opt: IPeerRouting | None = None,
muxer_preference: Literal["YAMUX", "MPLEX"] | None = None,
listen_addrs: Sequence[multiaddr.Multiaddr] | None = None,
enable_mDNS: bool = False,
) -> IHost:
"""
Create a new libp2p host based on the given parameters.
@ -256,6 +260,7 @@ def new_host(
:param disc_opt: optional discovery
:param muxer_preference: optional explicit muxer preference
:param listen_addrs: optional list of multiaddrs to listen on
:param enable_mDNS: whether to enable mDNS discovery
:return: return a host instance
"""
swarm = new_swarm(
@ -268,8 +273,7 @@ def new_host(
)
if disc_opt is not None:
return RoutedHost(swarm, disc_opt)
return BasicHost(swarm)
return RoutedHost(swarm, disc_opt, enable_mDNS)
return BasicHost(swarm, enable_mDNS)
__version__ = __version("libp2p")

View File

View File

View File

@ -0,0 +1,26 @@
from collections.abc import (
Callable,
)
from libp2p.abc import (
PeerInfo,
)
TTL: int = 60 * 60 # Time-to-live for discovered peers in seconds
class PeerDiscovery:
def __init__(self) -> None:
self._peer_discovered_handlers: list[Callable[[PeerInfo], None]] = []
def register_peer_discovered_handler(
self, handler: Callable[[PeerInfo], None]
) -> None:
self._peer_discovered_handlers.append(handler)
def emit_peer_discovered(self, peer_info: PeerInfo) -> None:
for handler in self._peer_discovered_handlers:
handler(peer_info)
peerDiscovery = PeerDiscovery()

View File

View File

@ -0,0 +1,91 @@
import logging
import socket
from zeroconf import (
EventLoopBlocked,
ServiceInfo,
Zeroconf,
)
logger = logging.getLogger("libp2p.discovery.mdns.broadcaster")
class PeerBroadcaster:
"""
Broadcasts this peer's presence on the local network using mDNS/zeroconf.
Registers a service with the peer's ID in the TXT record as per libp2p spec.
"""
def __init__(
self,
zeroconf: Zeroconf,
service_type: str,
service_name: str,
peer_id: str,
port: int,
):
self.zeroconf = zeroconf
self.service_type = service_type
self.peer_id = peer_id
self.port = port
self.service_name = service_name
# Get the local IP address
local_ip = self._get_local_ip()
hostname = socket.gethostname()
self.service_info = ServiceInfo(
type_=self.service_type,
name=self.service_name,
port=self.port,
properties={b"id": self.peer_id.encode()},
server=f"{hostname}.local.",
addresses=[socket.inet_aton(local_ip)],
)
def _get_local_ip(self) -> str:
"""Get the local IP address of this machine"""
try:
# Connect to a remote address to determine the local IP
# This doesn't actually send data
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
return local_ip
except Exception:
# Fallback to localhost if we can't determine the IP
return "127.0.0.1"
def register(self) -> None:
"""Register the peer's mDNS service on the network."""
try:
self.zeroconf.register_service(self.service_info)
logger.debug(f"mDNS service registered: {self.service_name}")
except EventLoopBlocked as e:
logger.warning(
"EventLoopBlocked while registering mDNS '%s': %s", self.service_name, e
)
except Exception as e:
logger.error(
"Unexpected error during mDNS registration for '%s': %r",
self.service_name,
e,
)
def unregister(self) -> None:
"""Unregister the peer's mDNS service from the network."""
try:
self.zeroconf.unregister_service(self.service_info)
logger.debug(f"mDNS service unregistered: {self.service_name}")
except EventLoopBlocked as e:
logger.warning(
"EventLoopBlocked while unregistering mDNS '%s': %s",
self.service_name,
e,
)
except Exception as e:
logger.error(
"Unexpected error during mDNS unregistration for '%s': %r",
self.service_name,
e,
)

View File

@ -0,0 +1,83 @@
import logging
import socket
from zeroconf import (
ServiceBrowser,
ServiceInfo,
ServiceListener,
Zeroconf,
)
from libp2p.abc import IPeerStore, Multiaddr
from libp2p.discovery.events.peerDiscovery import peerDiscovery
from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo
logger = logging.getLogger("libp2p.discovery.mdns.listener")
class PeerListener(ServiceListener):
"""mDNS listener — now a true ServiceListener subclass."""
def __init__(
self,
peerstore: IPeerStore,
zeroconf: Zeroconf,
service_type: str,
service_name: str,
) -> None:
self.peerstore = peerstore
self.zeroconf = zeroconf
self.service_type = service_type
self.service_name = service_name
self.discovered_services: dict[str, ID] = {}
self.browser = ServiceBrowser(self.zeroconf, self.service_type, listener=self)
def add_service(self, zc: Zeroconf, type_: str, name: str) -> None:
if name == self.service_name:
return
logger.debug(f"Adding service: {name}")
info = zc.get_service_info(type_, name, timeout=5000)
if not info:
return
peer_info = self._extract_peer_info(info)
if peer_info:
self.discovered_services[name] = peer_info.peer_id
self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10)
peerDiscovery.emit_peer_discovered(peer_info)
logger.debug(f"Discovered Peer: {peer_info.peer_id}")
def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None:
if name == self.service_name:
return
logger.debug(f"Removing service: {name}")
peer_id = self.discovered_services.pop(name)
self.peerstore.clear_addrs(peer_id)
logger.debug(f"Removed Peer: {peer_id}")
def update_service(self, zc: Zeroconf, type_: str, name: str) -> None:
info = zc.get_service_info(type_, name, timeout=5000)
if not info:
return
peer_info = self._extract_peer_info(info)
if peer_info:
self.peerstore.clear_addrs(peer_info.peer_id)
self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10)
logger.debug(f"Updated Peer {peer_info.peer_id}")
def _extract_peer_info(self, info: ServiceInfo) -> PeerInfo | None:
try:
addrs = [
Multiaddr(f"/ip4/{socket.inet_ntoa(addr)}/tcp/{info.port}")
for addr in info.addresses
]
pid_bytes = info.properties.get(b"id")
if not pid_bytes:
return None
pid = ID.from_base58(pid_bytes.decode())
return PeerInfo(peer_id=pid, addrs=addrs)
except Exception:
return None
def stop(self) -> None:
self.browser.cancel()

View File

@ -0,0 +1,73 @@
"""
mDNS-based peer discovery for py-libp2p.
Conforms to https://github.com/libp2p/specs/blob/master/discovery/mdns.md
Uses zeroconf for mDNS broadcast/listen. Async operations use trio.
"""
import logging
from zeroconf import (
Zeroconf,
)
from libp2p.abc import (
INetworkService,
)
from .broadcaster import (
PeerBroadcaster,
)
from .listener import (
PeerListener,
)
from .utils import (
stringGen,
)
logger = logging.getLogger("libp2p.discovery.mdns")
SERVICE_TYPE = "_p2p._udp.local."
MCAST_PORT = 5353
MCAST_ADDR = "224.0.0.251"
class MDNSDiscovery:
"""
mDNS-based peer discovery for py-libp2p, using zeroconf.
Conforms to the libp2p mDNS discovery spec.
"""
def __init__(self, swarm: INetworkService, port: int = 8000):
self.peer_id = str(swarm.get_peer_id())
self.port = port
self.zeroconf = Zeroconf()
self.serviceName = f"{stringGen()}.{SERVICE_TYPE}"
self.peerstore = swarm.peerstore
self.swarm = swarm
self.broadcaster = PeerBroadcaster(
zeroconf=self.zeroconf,
service_type=SERVICE_TYPE,
service_name=self.serviceName,
peer_id=self.peer_id,
port=self.port,
)
self.listener = PeerListener(
zeroconf=self.zeroconf,
peerstore=self.peerstore,
service_type=SERVICE_TYPE,
service_name=self.serviceName,
)
def start(self) -> None:
"""Register this peer and start listening for others."""
logger.debug(
f"Starting mDNS discovery for peer {self.peer_id} on port {self.port}"
)
self.broadcaster.register()
# Listener is started in constructor
def stop(self) -> None:
"""Unregister this peer and clean up zeroconf resources."""
logger.debug("Stopping mDNS discovery")
self.broadcaster.unregister()
self.zeroconf.close()

View File

@ -0,0 +1,11 @@
import random
import string
def stringGen(len: int = 63) -> str:
"""Generate a random string of lowercase letters and digits."""
charset = string.ascii_lowercase + string.digits
result = []
for _ in range(len):
result.append(random.choice(charset))
return "".join(result)

View File

@ -29,6 +29,7 @@ from libp2p.custom_types import (
StreamHandlerFn,
TProtocol,
)
from libp2p.discovery.mdns.mdns import MDNSDiscovery
from libp2p.host.defaults import (
get_default_protocols,
)
@ -89,6 +90,7 @@ class BasicHost(IHost):
def __init__(
self,
network: INetworkService,
enable_mDNS: bool = False,
default_protocols: Optional["OrderedDict[TProtocol, StreamHandlerFn]"] = None,
) -> None:
self._network = network
@ -98,6 +100,8 @@ class BasicHost(IHost):
default_protocols = default_protocols or get_default_protocols(self)
self.multiselect = Multiselect(dict(default_protocols.items()))
self.multiselect_client = MultiselectClient()
if enable_mDNS:
self.mDNS = MDNSDiscovery(network)
def get_id(self) -> ID:
"""
@ -162,7 +166,14 @@ class BasicHost(IHost):
network = self.get_network()
async with background_trio_service(network):
await network.listen(*listen_addrs)
yield
if hasattr(self, "mDNS") and self.mDNS is not None:
logger.debug("Starting mDNS Discovery")
self.mDNS.start()
try:
yield
finally:
if hasattr(self, "mDNS") and self.mDNS is not None:
self.mDNS.stop()
return _run()

View File

@ -18,8 +18,10 @@ from libp2p.peer.peerinfo import (
class RoutedHost(BasicHost):
_router: IPeerRouting
def __init__(self, network: INetworkService, router: IPeerRouting):
super().__init__(network)
def __init__(
self, network: INetworkService, router: IPeerRouting, enable_mDNS: bool = False
):
super().__init__(network, enable_mDNS)
self._router = router
async def connect(self, peer_info: PeerInfo) -> None:

View File

@ -12,15 +12,9 @@ from libp2p.abc import (
from libp2p.custom_types import (
TProtocol,
)
from libp2p.network.stream.exceptions import (
StreamClosed,
)
from libp2p.peer.id import (
ID,
)
from libp2p.utils import (
encode_varint_prefixed,
)
from .exceptions import (
PubsubRouterError,
@ -120,13 +114,7 @@ class FloodSub(IPubsubRouter):
if peer_id not in pubsub.peers:
continue
stream = pubsub.peers[peer_id]
# FIXME: We should add a `WriteMsg` similar to write delimited messages.
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
try:
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
except StreamClosed:
logger.debug("Fail to publish message to %s: stream closed", peer_id)
pubsub._handle_dead_peer(peer_id)
await pubsub.write_msg(stream, rpc_msg)
async def join(self, topic: str) -> None:
"""

View File

@ -24,9 +24,6 @@ from libp2p.abc import (
from libp2p.custom_types import (
TProtocol,
)
from libp2p.network.stream.exceptions import (
StreamClosed,
)
from libp2p.peer.id import (
ID,
)
@ -44,9 +41,6 @@ from libp2p.pubsub import (
from libp2p.tools.async_service import (
Service,
)
from libp2p.utils import (
encode_varint_prefixed,
)
from .exceptions import (
NoPubsubAttached,
@ -267,13 +261,10 @@ class GossipSub(IPubsubRouter, Service):
if peer_id not in self.pubsub.peers:
continue
stream = self.pubsub.peers[peer_id]
# FIXME: We should add a `WriteMsg` similar to write delimited messages.
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
try:
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
except StreamClosed:
logger.debug("Fail to publish message to %s: stream closed", peer_id)
self.pubsub._handle_dead_peer(peer_id)
# TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages.
await self.pubsub.write_msg(stream, rpc_msg)
for topic in pubsub_msg.topicIDs:
self.time_since_last_publish[topic] = int(time.time())
@ -829,8 +820,6 @@ class GossipSub(IPubsubRouter, Service):
packet.publish.extend(msgs_to_forward)
# 2) Serialize that packet
rpc_msg: bytes = packet.SerializeToString()
if self.pubsub is None:
raise NoPubsubAttached
@ -844,14 +833,7 @@ class GossipSub(IPubsubRouter, Service):
peer_stream = self.pubsub.peers[sender_peer_id]
# 4) And write the packet to the stream
try:
await peer_stream.write(encode_varint_prefixed(rpc_msg))
except StreamClosed:
logger.debug(
"Fail to responed to iwant request from %s: stream closed",
sender_peer_id,
)
self.pubsub._handle_dead_peer(sender_peer_id)
await self.pubsub.write_msg(peer_stream, packet)
async def handle_graft(
self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID
@ -993,8 +975,6 @@ class GossipSub(IPubsubRouter, Service):
packet: rpc_pb2.RPC = rpc_pb2.RPC()
packet.control.CopyFrom(control_msg)
rpc_msg: bytes = packet.SerializeToString()
# Get stream for peer from pubsub
if to_peer not in self.pubsub.peers:
logger.debug(
@ -1004,8 +984,4 @@ class GossipSub(IPubsubRouter, Service):
peer_stream = self.pubsub.peers[to_peer]
# Write rpc to stream
try:
await peer_stream.write(encode_varint_prefixed(rpc_msg))
except StreamClosed:
logger.debug("Fail to emit control message to %s: stream closed", to_peer)
self.pubsub._handle_dead_peer(to_peer)
await self.pubsub.write_msg(peer_stream, packet)

View File

@ -66,6 +66,7 @@ from libp2p.utils import (
encode_varint_prefixed,
read_varint_prefixed_bytes,
)
from libp2p.utils.varint import encode_uvarint
from .pb import (
rpc_pb2,
@ -682,19 +683,18 @@ class Pubsub(Service, IPubsub):
# TODO: Implement throttle on async validators
if len(async_topic_validators) > 0:
# TODO: Use a better pattern
final_result: bool = True
# Appends to lists are thread safe in CPython
results = []
async def run_async_validator(func: AsyncValidatorFn) -> None:
nonlocal final_result
result = await func(msg_forwarder, msg)
final_result = final_result and result
results.append(result)
async with trio.open_nursery() as nursery:
for async_validator in async_topic_validators:
nursery.start_soon(run_async_validator, async_validator)
if not final_result:
if not all(results):
raise ValidationError(f"Validation failed for msg={msg}")
async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None:
@ -779,3 +779,43 @@ class Pubsub(Service, IPubsub):
def _is_subscribed_to_msg(self, msg: rpc_pb2.Message) -> bool:
return any(topic in self.topic_ids for topic in msg.topicIDs)
async def write_msg(self, stream: INetStream, rpc_msg: rpc_pb2.RPC) -> bool:
"""
Write an RPC message to a stream with proper error handling.
Implements WriteMsg similar to go-msgio which is used in go-libp2p
Ref: https://github.com/libp2p/go-msgio/blob/master/protoio/uvarint_writer.go#L56
:param stream: stream to write the message to
:param rpc_msg: RPC message to write
:return: True if successful, False if stream was closed
"""
try:
# Calculate message size first
msg_bytes = rpc_msg.SerializeToString()
msg_size = len(msg_bytes)
# Calculate varint size and allocate exact buffer size needed
varint_bytes = encode_uvarint(msg_size)
varint_size = len(varint_bytes)
# Allocate buffer with exact size (like Go's pool.Get())
buf = bytearray(varint_size + msg_size)
# Write varint length prefix to buffer (like Go's binary.PutUvarint())
buf[:varint_size] = varint_bytes
# Write serialized message after varint (like Go's rpc.MarshalTo())
buf[varint_size:] = msg_bytes
# Single write operation (like Go's s.Write(buf))
await stream.write(bytes(buf))
return True
except StreamClosed:
peer_id = stream.muxed_conn.peer_id
logger.debug("Fail to write message to %s: stream closed", peer_id)
self._handle_dead_peer(peer_id)
return False

View File

@ -493,7 +493,7 @@ class Yamux(IMuxedConn):
f"type={typ}, flags={flags}, stream_id={stream_id},"
f"length={length}"
)
if typ == TYPE_DATA and flags & FLAG_SYN:
if (typ == TYPE_DATA or typ == TYPE_WINDOW_UPDATE) and flags & FLAG_SYN:
async with self.streams_lock:
if stream_id not in self.streams:
stream = YamuxStream(stream_id, self, False)