Merge branch 'main' into dependency-chore

This commit is contained in:
Manu Sheel Gupta
2025-08-25 16:07:37 +05:30
committed by GitHub
12 changed files with 572 additions and 25 deletions

View File

@ -0,0 +1,63 @@
"""
Advanced demonstration of Thin Waist address handling.
Run:
python -m examples.advanced.network_discovery
"""
from __future__ import annotations
from multiaddr import Multiaddr
try:
from libp2p.utils.address_validation import (
expand_wildcard_address,
get_available_interfaces,
get_optimal_binding_address,
)
except ImportError:
# Fallbacks if utilities are missing
def get_available_interfaces(port: int, protocol: str = "tcp"):
return [Multiaddr(f"/ip4/0.0.0.0/{protocol}/{port}")]
def expand_wildcard_address(addr: Multiaddr, port: int | None = None):
if port is None:
return [addr]
addr_str = str(addr).rsplit("/", 1)[0]
return [Multiaddr(addr_str + f"/{port}")]
def get_optimal_binding_address(port: int, protocol: str = "tcp"):
return Multiaddr(f"/ip4/0.0.0.0/{protocol}/{port}")
def main() -> None:
port = 8080
interfaces = get_available_interfaces(port)
print(f"Discovered interfaces for port {port}:")
for a in interfaces:
print(f" - {a}")
wildcard_v4 = Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
expanded_v4 = expand_wildcard_address(wildcard_v4)
print("\nExpanded IPv4 wildcard:")
for a in expanded_v4:
print(f" - {a}")
wildcard_v6 = Multiaddr(f"/ip6/::/tcp/{port}")
expanded_v6 = expand_wildcard_address(wildcard_v6)
print("\nExpanded IPv6 wildcard:")
for a in expanded_v6:
print(f" - {a}")
print("\nOptimal binding address heuristic result:")
print(f" -> {get_optimal_binding_address(port)}")
override_port = 9000
overridden = expand_wildcard_address(wildcard_v4, port=override_port)
print(f"\nPort override expansion to {override_port}:")
for a in overridden:
print(f" - {a}")
if __name__ == "__main__":
main()

View File

@ -1,4 +1,6 @@
import argparse
import random
import secrets
import multiaddr
import trio
@ -12,40 +14,54 @@ from libp2p.crypto.secp256k1 import (
from libp2p.custom_types import (
TProtocol,
)
from libp2p.network.stream.exceptions import (
StreamEOF,
)
from libp2p.network.stream.net_stream import (
INetStream,
)
from libp2p.peer.peerinfo import (
info_from_p2p_addr,
)
from libp2p.utils.address_validation import (
find_free_port,
get_available_interfaces,
)
PROTOCOL_ID = TProtocol("/echo/1.0.0")
MAX_READ_LEN = 2**32 - 1
async def _echo_stream_handler(stream: INetStream) -> None:
# Wait until EOF
msg = await stream.read(MAX_READ_LEN)
await stream.write(msg)
await stream.close()
try:
peer_id = stream.muxed_conn.peer_id
print(f"Received connection from {peer_id}")
# Wait until EOF
msg = await stream.read(MAX_READ_LEN)
print(f"Echoing message: {msg.decode('utf-8')}")
await stream.write(msg)
except StreamEOF:
print("Stream closed by remote peer.")
except Exception as e:
print(f"Error in echo handler: {e}")
finally:
await stream.close()
async def run(port: int, destination: str, seed: int | None = None) -> None:
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
if port <= 0:
port = find_free_port()
listen_addr = get_available_interfaces(port)
if seed:
import random
random.seed(seed)
secret_number = random.getrandbits(32 * 8)
secret = secret_number.to_bytes(length=32, byteorder="big")
else:
import secrets
secret = secrets.token_bytes(32)
host = new_host(key_pair=create_new_key_pair(secret))
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
async with host.run(listen_addrs=listen_addr), trio.open_nursery() as nursery:
# Start the peer-store cleanup task
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
@ -54,10 +70,15 @@ async def run(port: int, destination: str, seed: int | None = None) -> None:
if not destination: # its the server
host.set_stream_handler(PROTOCOL_ID, _echo_stream_handler)
# Print all listen addresses with peer ID (JS parity)
print("Listener ready, listening on:\n")
peer_id = host.get_id().to_string()
for addr in listen_addr:
print(f"{addr}/p2p/{peer_id}")
print(
"Run this from the same folder in another console:\n\n"
f"echo-demo "
f"-d {host.get_addrs()[0]}\n"
"\nRun this from the same folder in another console:\n\n"
f"echo-demo -d {host.get_addrs()[0]}\n"
)
print("Waiting for incoming connections...")
await trio.sleep_forever()

View File

@ -1,6 +1,5 @@
import argparse
import logging
import socket
import base58
import multiaddr
@ -31,6 +30,9 @@ from libp2p.stream_muxer.mplex.mplex import (
from libp2p.tools.async_service.trio_service import (
background_trio_service,
)
from libp2p.utils.address_validation import (
find_free_port,
)
# Configure logging
logging.basicConfig(
@ -77,13 +79,6 @@ async def publish_loop(pubsub, topic, termination_event):
await trio.sleep(1) # Avoid tight loop on error
def find_free_port():
"""Find a free port on localhost."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0)) # Bind to a free port provided by the OS
return s.getsockname()[1]
async def monitor_peer_topics(pubsub, nursery, termination_event):
"""
Monitor for new topics that peers are subscribed to and

View File

@ -249,9 +249,11 @@ class Swarm(Service, INetworkService):
# We need to wait until `self.listener_nursery` is created.
await self.event_listener_nursery_created.wait()
success_count = 0
for maddr in multiaddrs:
if str(maddr) in self.listeners:
return True
success_count += 1
continue
async def conn_handler(
read_write_closer: ReadWriteCloser, maddr: Multiaddr = maddr
@ -302,13 +304,14 @@ class Swarm(Service, INetworkService):
# Call notifiers since event occurred
await self.notify_listen(maddr)
return True
success_count += 1
logger.debug("successfully started listening on: %s", maddr)
except OSError:
# Failed. Continue looping.
logger.debug("fail to listen on: %s", maddr)
# No maddr succeeded
return False
# Return true if at least one address succeeded
return success_count > 0
async def close(self) -> None:
"""

View File

@ -15,6 +15,13 @@ from libp2p.utils.version import (
get_agent_version,
)
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
expand_wildcard_address,
find_free_port,
)
__all__ = [
"decode_uvarint_from_stream",
"encode_delim",
@ -26,4 +33,8 @@ __all__ = [
"decode_varint_from_bytes",
"decode_varint_with_size",
"read_length_prefixed_protobuf",
"get_available_interfaces",
"get_optimal_binding_address",
"expand_wildcard_address",
"find_free_port",
]

View File

@ -0,0 +1,160 @@
from __future__ import annotations
import socket
from multiaddr import Multiaddr
try:
from multiaddr.utils import ( # type: ignore
get_network_addrs,
get_thin_waist_addresses,
)
_HAS_THIN_WAIST = True
except ImportError: # pragma: no cover - only executed in older environments
_HAS_THIN_WAIST = False
get_thin_waist_addresses = None # type: ignore
get_network_addrs = None # type: ignore
def _safe_get_network_addrs(ip_version: int) -> list[str]:
"""
Internal safe wrapper. Returns a list of IP addresses for the requested IP version.
Falls back to minimal defaults when Thin Waist helpers are missing.
:param ip_version: 4 or 6
"""
if _HAS_THIN_WAIST and get_network_addrs:
try:
return get_network_addrs(ip_version) or []
except Exception: # pragma: no cover - defensive
return []
# Fallback behavior (very conservative)
if ip_version == 4:
return ["127.0.0.1"]
if ip_version == 6:
return ["::1"]
return []
def find_free_port() -> int:
"""Find a free port on localhost."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0)) # Bind to a free port provided by the OS
return s.getsockname()[1]
def _safe_expand(addr: Multiaddr, port: int | None = None) -> list[Multiaddr]:
"""
Internal safe expansion wrapper. Returns a list of Multiaddr objects.
If Thin Waist isn't available, returns [addr] (identity).
"""
if _HAS_THIN_WAIST and get_thin_waist_addresses:
try:
if port is not None:
return get_thin_waist_addresses(addr, port=port) or []
return get_thin_waist_addresses(addr) or []
except Exception: # pragma: no cover - defensive
return [addr]
return [addr]
def get_available_interfaces(port: int, protocol: str = "tcp") -> list[Multiaddr]:
"""
Discover available network interfaces (IPv4 + IPv6 if supported) for binding.
:param port: Port number to bind to.
:param protocol: Transport protocol (e.g., "tcp" or "udp").
:return: List of Multiaddr objects representing candidate interface addresses.
"""
addrs: list[Multiaddr] = []
# IPv4 enumeration
seen_v4: set[str] = set()
for ip in _safe_get_network_addrs(4):
seen_v4.add(ip)
addrs.append(Multiaddr(f"/ip4/{ip}/{protocol}/{port}"))
# Ensure IPv4 loopback is always included when IPv4 interfaces are discovered
if seen_v4 and "127.0.0.1" not in seen_v4:
addrs.append(Multiaddr(f"/ip4/127.0.0.1/{protocol}/{port}"))
# TODO: IPv6 support temporarily disabled due to libp2p handshake issues
# IPv6 connections fail during protocol negotiation (SecurityUpgradeFailure)
# Re-enable IPv6 support once the following issues are resolved:
# - libp2p security handshake over IPv6
# - multiselect protocol over IPv6
# - connection establishment over IPv6
#
# seen_v6: set[str] = set()
# for ip in _safe_get_network_addrs(6):
# seen_v6.add(ip)
# addrs.append(Multiaddr(f"/ip6/{ip}/{protocol}/{port}"))
#
# # Always include IPv6 loopback for testing purposes when IPv6 is available
# # This ensures IPv6 functionality can be tested even without global IPv6 addresses
# if "::1" not in seen_v6:
# addrs.append(Multiaddr(f"/ip6/::1/{protocol}/{port}"))
# Fallback if nothing discovered
if not addrs:
addrs.append(Multiaddr(f"/ip4/0.0.0.0/{protocol}/{port}"))
return addrs
def expand_wildcard_address(
addr: Multiaddr, port: int | None = None
) -> list[Multiaddr]:
"""
Expand a wildcard (e.g. /ip4/0.0.0.0/tcp/0) into all concrete interfaces.
:param addr: Multiaddr to expand.
:param port: Optional override for port selection.
:return: List of concrete Multiaddr instances.
"""
expanded = _safe_expand(addr, port=port)
if not expanded: # Safety fallback
return [addr]
return expanded
def get_optimal_binding_address(port: int, protocol: str = "tcp") -> Multiaddr:
"""
Choose an optimal address for an example to bind to:
- Prefer non-loopback IPv4
- Then non-loopback IPv6
- Fallback to loopback
- Fallback to wildcard
:param port: Port number.
:param protocol: Transport protocol.
:return: A single Multiaddr chosen heuristically.
"""
candidates = get_available_interfaces(port, protocol)
def is_non_loopback(ma: Multiaddr) -> bool:
s = str(ma)
return not ("/ip4/127." in s or "/ip6/::1" in s)
for c in candidates:
if "/ip4/" in str(c) and is_non_loopback(c):
return c
for c in candidates:
if "/ip6/" in str(c) and is_non_loopback(c):
return c
for c in candidates:
if "/ip4/127." in str(c) or "/ip6/::1" in str(c):
return c
# As a final fallback, produce a wildcard
return Multiaddr(f"/ip4/0.0.0.0/{protocol}/{port}")
__all__ = [
"get_available_interfaces",
"get_optimal_binding_address",
"expand_wildcard_address",
"find_free_port",
]

View File

@ -0,0 +1 @@
Added Thin Waist address validation utilities (with support for interface enumeration, optimal binding, and wildcard expansion).

View File

@ -0,0 +1,7 @@
Add Thin Waist address validation utilities and integrate into echo example
- Add ``libp2p/utils/address_validation.py`` with dynamic interface discovery
- Implement ``get_available_interfaces()``, ``get_optimal_binding_address()``, and ``expand_wildcard_address()``
- Update echo example to use dynamic address discovery instead of hardcoded wildcard
- Add safe fallbacks for environments lacking Thin Waist support
- Temporarily disable IPv6 support due to libp2p handshake issues (TODO: re-enable when resolved)

View File

@ -0,0 +1,5 @@
Fix multi-address listening bug in swarm.listen()
- Fix early return in swarm.listen() that prevented listening on all addresses
- Add comprehensive tests for multi-address listening functionality
- Ensure all available interfaces are properly bound and connectable

View File

@ -16,6 +16,9 @@ from libp2p.network.exceptions import (
from libp2p.network.swarm import (
Swarm,
)
from libp2p.tools.async_service import (
background_trio_service,
)
from libp2p.tools.utils import (
connect_swarm,
)
@ -184,3 +187,116 @@ def test_new_swarm_quic_multiaddr_raises():
addr = Multiaddr("/ip4/127.0.0.1/udp/9999/quic")
with pytest.raises(ValueError, match="QUIC not yet supported"):
new_swarm(listen_addrs=[addr])
@pytest.mark.trio
async def test_swarm_listen_multiple_addresses(security_protocol):
"""Test that swarm can listen on multiple addresses simultaneously."""
from libp2p.utils.address_validation import get_available_interfaces
# Get multiple addresses to listen on
listen_addrs = get_available_interfaces(0) # Let OS choose ports
# Create a swarm and listen on multiple addresses
swarm = SwarmFactory.build(security_protocol=security_protocol)
async with background_trio_service(swarm):
# Listen on all addresses
success = await swarm.listen(*listen_addrs)
assert success, "Should successfully listen on at least one address"
# Check that we have listeners for the addresses
actual_listeners = list(swarm.listeners.keys())
assert len(actual_listeners) > 0, "Should have at least one listener"
# Verify that all successful listeners are in the listeners dict
successful_count = 0
for addr in listen_addrs:
addr_str = str(addr)
if addr_str in actual_listeners:
successful_count += 1
# This address successfully started listening
listener = swarm.listeners[addr_str]
listener_addrs = listener.get_addrs()
assert len(listener_addrs) > 0, (
f"Listener for {addr} should have addresses"
)
# Check that the listener address matches the expected address
# (port might be different if we used port 0)
expected_ip = addr.value_for_protocol("ip4")
expected_protocol = addr.value_for_protocol("tcp")
if expected_ip and expected_protocol:
found_matching = False
for listener_addr in listener_addrs:
if (
listener_addr.value_for_protocol("ip4") == expected_ip
and listener_addr.value_for_protocol("tcp") is not None
):
found_matching = True
break
assert found_matching, (
f"Listener for {addr} should have matching IP"
)
assert successful_count == len(listen_addrs), (
f"All {len(listen_addrs)} addresses should be listening, "
f"but only {successful_count} succeeded"
)
@pytest.mark.trio
async def test_swarm_listen_multiple_addresses_connectivity(security_protocol):
"""Test that real libp2p connections can be established to all listening addresses.""" # noqa: E501
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.utils.address_validation import get_available_interfaces
# Get multiple addresses to listen on
listen_addrs = get_available_interfaces(0) # Let OS choose ports
# Create a swarm and listen on multiple addresses
swarm1 = SwarmFactory.build(security_protocol=security_protocol)
async with background_trio_service(swarm1):
# Listen on all addresses
success = await swarm1.listen(*listen_addrs)
assert success, "Should successfully listen on at least one address"
# Verify all available interfaces are listening
assert len(swarm1.listeners) == len(listen_addrs), (
f"All {len(listen_addrs)} interfaces should be listening, "
f"but only {len(swarm1.listeners)} are"
)
# Create a second swarm to test connections
swarm2 = SwarmFactory.build(security_protocol=security_protocol)
async with background_trio_service(swarm2):
# Test connectivity to each listening address using real libp2p connections
for addr_str, listener in swarm1.listeners.items():
listener_addrs = listener.get_addrs()
for listener_addr in listener_addrs:
# Create a full multiaddr with peer ID for libp2p connection
peer_id = swarm1.get_peer_id()
full_addr = listener_addr.encapsulate(f"/p2p/{peer_id}")
# Test real libp2p connection
try:
peer_info = info_from_p2p_addr(full_addr)
# Add the peer info to swarm2's peerstore so it knows where to connect # noqa: E501
swarm2.peerstore.add_addrs(
peer_info.peer_id, [listener_addr], 10000
)
await swarm2.dial_peer(peer_info.peer_id)
# Verify connection was established
assert peer_info.peer_id in swarm2.connections, (
f"Connection to {full_addr} should be established"
)
assert swarm2.get_peer_id() in swarm1.connections, (
f"Connection from {full_addr} should be established"
)
except Exception as e:
pytest.fail(
f"Failed to establish libp2p connection to {full_addr}: {e}"
)

View File

@ -0,0 +1,109 @@
import contextlib
import os
from pathlib import Path
import subprocess
import sys
import time
from multiaddr import Multiaddr
from multiaddr.protocols import P_IP4, P_IP6, P_P2P, P_TCP
# pytestmark = pytest.mark.timeout(20) # Temporarily disabled for debugging
# This test is intentionally lightweight and can be marked as 'integration'.
# It ensures the echo example runs and prints the new Thin Waist lines using
# Trio primitives.
current_file = Path(__file__)
project_root = current_file.parent.parent.parent
EXAMPLES_DIR: Path = project_root / "examples" / "echo"
def test_echo_example_starts_and_prints_thin_waist(monkeypatch, tmp_path):
"""Run echo server and validate printed multiaddr and peer id."""
# Run echo example as server
cmd = [sys.executable, "-u", str(EXAMPLES_DIR / "echo.py"), "-p", "0"]
env = {**os.environ, "PYTHONUNBUFFERED": "1"}
proc: subprocess.Popen[str] = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
env=env,
)
if proc.stdout is None:
proc.terminate()
raise RuntimeError("Process stdout is None")
out_stream = proc.stdout
peer_id: str | None = None
printed_multiaddr: str | None = None
saw_waiting = False
start = time.time()
timeout_s = 8.0
try:
while time.time() - start < timeout_s:
line = out_stream.readline()
if not line:
time.sleep(0.05)
continue
s = line.strip()
if s.startswith("I am "):
peer_id = s.partition("I am ")[2]
if s.startswith("echo-demo -d "):
printed_multiaddr = s.partition("echo-demo -d ")[2]
if "Waiting for incoming connections..." in s:
saw_waiting = True
break
finally:
with contextlib.suppress(ProcessLookupError):
proc.terminate()
with contextlib.suppress(ProcessLookupError):
proc.kill()
assert peer_id, "Did not capture peer ID line"
assert printed_multiaddr, "Did not capture multiaddr line"
assert saw_waiting, "Did not capture waiting-for-connections line"
# Validate multiaddr structure using py-multiaddr protocol methods
ma = Multiaddr(printed_multiaddr) # should parse without error
# Check that the multiaddr contains the p2p protocol
try:
peer_id_from_multiaddr = ma.value_for_protocol("p2p")
assert peer_id_from_multiaddr is not None, (
"Multiaddr missing p2p protocol value"
)
assert peer_id_from_multiaddr == peer_id, (
f"Peer ID mismatch: {peer_id_from_multiaddr} != {peer_id}"
)
except Exception as e:
raise AssertionError(f"Failed to extract p2p protocol value: {e}")
# Validate the multiaddr structure by checking protocols
protocols = ma.protocols()
# Should have at least IP, TCP, and P2P protocols
assert any(p.code == P_IP4 or p.code == P_IP6 for p in protocols), (
"Missing IP protocol"
)
assert any(p.code == P_TCP for p in protocols), "Missing TCP protocol"
assert any(p.code == P_P2P for p in protocols), "Missing P2P protocol"
# Extract the p2p part and validate it matches the captured peer ID
p2p_part = Multiaddr(f"/p2p/{peer_id}")
try:
# Decapsulate the p2p part to get the transport address
transport_addr = ma.decapsulate(p2p_part)
# Verify the decapsulated address doesn't contain p2p
transport_protocols = transport_addr.protocols()
assert not any(p.code == P_P2P for p in transport_protocols), (
"Decapsulation failed - still contains p2p"
)
# Verify the original multiaddr can be reconstructed
reconstructed = transport_addr.encapsulate(p2p_part)
assert str(reconstructed) == str(ma), "Reconstruction failed"
except Exception as e:
raise AssertionError(f"Multiaddr decapsulation failed: {e}")

View File

@ -0,0 +1,56 @@
import os
import pytest
from multiaddr import Multiaddr
from libp2p.utils.address_validation import (
expand_wildcard_address,
get_available_interfaces,
get_optimal_binding_address,
)
@pytest.mark.parametrize("proto", ["tcp"])
def test_get_available_interfaces(proto: str) -> None:
interfaces = get_available_interfaces(0, protocol=proto)
assert len(interfaces) > 0
for addr in interfaces:
assert isinstance(addr, Multiaddr)
assert f"/{proto}/" in str(addr)
def test_get_optimal_binding_address() -> None:
addr = get_optimal_binding_address(0)
assert isinstance(addr, Multiaddr)
# At least IPv4 or IPv6 prefix present
s = str(addr)
assert ("/ip4/" in s) or ("/ip6/" in s)
def test_expand_wildcard_address_ipv4() -> None:
wildcard = Multiaddr("/ip4/0.0.0.0/tcp/0")
expanded = expand_wildcard_address(wildcard)
assert len(expanded) > 0
for e in expanded:
assert isinstance(e, Multiaddr)
assert "/tcp/" in str(e)
def test_expand_wildcard_address_port_override() -> None:
wildcard = Multiaddr("/ip4/0.0.0.0/tcp/7000")
overridden = expand_wildcard_address(wildcard, port=9001)
assert len(overridden) > 0
for e in overridden:
assert str(e).endswith("/tcp/9001")
@pytest.mark.skipif(
os.environ.get("NO_IPV6") == "1",
reason="Environment disallows IPv6",
)
def test_expand_wildcard_address_ipv6() -> None:
wildcard = Multiaddr("/ip6/::/tcp/0")
expanded = expand_wildcard_address(wildcard)
assert len(expanded) > 0
for e in expanded:
assert "/ip6/" in str(e)