Refactor: update examples to utilize new address paradigm with wildcard support

- Introduced `get_wildcard_address` function for explicit wildcard binding.
- Updated examples to use `get_available_interfaces` and `get_optimal_binding_address` for address selection.
- Ensured consistent usage of the new address paradigm across all example files.
- Added tests to verify the implementation of the new address paradigm and wildcard feature.
This commit is contained in:
yashksaini-coder
2025-09-09 12:10:28 +05:30
parent 80e22f7c4a
commit 7d364da950
15 changed files with 280 additions and 181 deletions

View File

@ -14,6 +14,7 @@ try:
expand_wildcard_address, expand_wildcard_address,
get_available_interfaces, get_available_interfaces,
get_optimal_binding_address, get_optimal_binding_address,
get_wildcard_address,
) )
except ImportError: except ImportError:
# Fallbacks if utilities are missing # Fallbacks if utilities are missing
@ -29,6 +30,9 @@ except ImportError:
def get_optimal_binding_address(port: int, protocol: str = "tcp"): def get_optimal_binding_address(port: int, protocol: str = "tcp"):
return Multiaddr(f"/ip4/127.0.0.1/{protocol}/{port}") return Multiaddr(f"/ip4/127.0.0.1/{protocol}/{port}")
def get_wildcard_address(port: int, protocol: str = "tcp"):
return Multiaddr(f"/ip4/0.0.0.0/{protocol}/{port}")
def main() -> None: def main() -> None:
port = 8080 port = 8080
@ -37,7 +41,10 @@ def main() -> None:
for a in interfaces: for a in interfaces:
print(f" - {a}") print(f" - {a}")
wildcard_v4 = Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") # Demonstrate wildcard address as a feature
wildcard_v4 = get_wildcard_address(port)
print(f"\nWildcard address (feature): {wildcard_v4}")
expanded_v4 = expand_wildcard_address(wildcard_v4) expanded_v4 = expand_wildcard_address(wildcard_v4)
print("\nExpanded IPv4 wildcard:") print("\nExpanded IPv4 wildcard:")
for a in expanded_v4: for a in expanded_v4:

View File

@ -53,7 +53,11 @@ BOOTSTRAP_PEERS = [
async def run(port: int, bootstrap_addrs: list[str]) -> None: async def run(port: int, bootstrap_addrs: list[str]) -> None:
"""Run the bootstrap discovery example.""" """Run the bootstrap discovery example."""
from libp2p.utils.address_validation import find_free_port, get_available_interfaces from libp2p.utils.address_validation import (
find_free_port,
get_available_interfaces,
get_optimal_binding_address,
)
if port <= 0: if port <= 0:
port = find_free_port() port = find_free_port()
@ -93,6 +97,12 @@ async def run(port: int, bootstrap_addrs: list[str]) -> None:
logger.info(f"{addr}") logger.info(f"{addr}")
print(f"{addr}") print(f"{addr}")
# Display optimal address for reference
optimal_addr = get_optimal_binding_address(port)
optimal_addr_with_peer = f"{optimal_addr}/p2p/{host.get_id().to_string()}"
logger.info(f"Optimal address: {optimal_addr_with_peer}")
print(f"Optimal address: {optimal_addr_with_peer}")
# Keep running and log peer discovery events # Keep running and log peer discovery events
await trio.sleep_forever() await trio.sleep_forever()
except KeyboardInterrupt: except KeyboardInterrupt:

View File

@ -46,7 +46,11 @@ async def write_data(stream: INetStream) -> None:
async def run(port: int, destination: str) -> None: async def run(port: int, destination: str) -> None:
from libp2p.utils.address_validation import find_free_port, get_available_interfaces from libp2p.utils.address_validation import (
find_free_port,
get_available_interfaces,
get_optimal_binding_address,
)
if port <= 0: if port <= 0:
port = find_free_port() port = find_free_port()
@ -72,11 +76,12 @@ async def run(port: int, destination: str) -> None:
for addr in all_addrs: for addr in all_addrs:
print(f"{addr}") print(f"{addr}")
# Use the first address as the default for the client command # Use optimal address for the client command
default_addr = all_addrs[0] optimal_addr = get_optimal_binding_address(port)
optimal_addr_with_peer = f"{optimal_addr}/p2p/{host.get_id().to_string()}"
print( print(
f"\nRun this from the same folder in another console:\n\n" f"\nRun this from the same folder in another console:\n\n"
f"chat-demo -d {default_addr}\n" f"chat-demo -d {optimal_addr_with_peer}\n"
) )
print("Waiting for incoming connection...") print("Waiting for incoming connection...")

View File

@ -1,6 +1,5 @@
import secrets import secrets
import multiaddr
import trio import trio
from libp2p import ( from libp2p import (
@ -13,6 +12,10 @@ from libp2p.security.noise.transport import (
PROTOCOL_ID as NOISE_PROTOCOL_ID, PROTOCOL_ID as NOISE_PROTOCOL_ID,
Transport as NoiseTransport, Transport as NoiseTransport,
) )
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
)
async def main(): async def main():
@ -39,14 +42,16 @@ async def main():
# Create a host with the key pair, Noise security, and mplex multiplexer # Create a host with the key pair, Noise security, and mplex multiplexer
host = new_host(key_pair=key_pair, sec_opt=security_options) host = new_host(key_pair=key_pair, sec_opt=security_options)
# Configure the listening address # Configure the listening address using the new paradigm
port = 8000 port = 8000
listen_addr = multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/{port}") listen_addrs = get_available_interfaces(port)
optimal_addr = get_optimal_binding_address(port)
# Start the host # Start the host
async with host.run(listen_addrs=[listen_addr]): async with host.run(listen_addrs=listen_addrs):
print("libp2p has started with Noise encryption and mplex multiplexing") print("libp2p has started with Noise encryption and mplex multiplexing")
print("libp2p is listening on:", host.get_addrs()) print("libp2p is listening on:", host.get_addrs())
print(f"Optimal address: {optimal_addr}")
# Keep the host running # Keep the host running
await trio.sleep_forever() await trio.sleep_forever()

View File

@ -38,6 +38,10 @@ from libp2p.network.stream.net_stream import (
from libp2p.peer.peerinfo import ( from libp2p.peer.peerinfo import (
info_from_p2p_addr, info_from_p2p_addr,
) )
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
)
PROTOCOL_ID = TProtocol("/echo/1.0.0") PROTOCOL_ID = TProtocol("/echo/1.0.0")
@ -173,7 +177,9 @@ async def run_enhanced_demo(
""" """
Run enhanced echo demo with NetStream state management. Run enhanced echo demo with NetStream state management.
""" """
listen_addr = multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/{port}") # Use the new address paradigm
listen_addrs = get_available_interfaces(port)
optimal_addr = get_optimal_binding_address(port)
# Generate or use provided key # Generate or use provided key
if seed: if seed:
@ -185,7 +191,7 @@ async def run_enhanced_demo(
host = new_host(key_pair=create_new_key_pair(secret)) host = new_host(key_pair=create_new_key_pair(secret))
async with host.run(listen_addrs=[listen_addr]): async with host.run(listen_addrs=listen_addrs):
print(f"Host ID: {host.get_id().to_string()}") print(f"Host ID: {host.get_id().to_string()}")
print("=" * 60) print("=" * 60)
@ -196,10 +202,12 @@ async def run_enhanced_demo(
# type: ignore: Stream is type of NetStream # type: ignore: Stream is type of NetStream
host.set_stream_handler(PROTOCOL_ID, enhanced_echo_handler) host.set_stream_handler(PROTOCOL_ID, enhanced_echo_handler)
# Use optimal address for client command
optimal_addr_with_peer = f"{optimal_addr}/p2p/{host.get_id().to_string()}"
print( print(
"Run client from another console:\n" "Run client from another console:\n"
f"python3 example_net_stream.py " f"python3 example_net_stream.py "
f"-d {host.get_addrs()[0]}\n" f"-d {optimal_addr_with_peer}\n"
) )
print("Waiting for connections...") print("Waiting for connections...")
print("Press Ctrl+C to stop server") print("Press Ctrl+C to stop server")

View File

@ -1,6 +1,5 @@
import secrets import secrets
import multiaddr
import trio import trio
from libp2p import ( from libp2p import (
@ -13,6 +12,10 @@ from libp2p.security.noise.transport import (
PROTOCOL_ID as NOISE_PROTOCOL_ID, PROTOCOL_ID as NOISE_PROTOCOL_ID,
Transport as NoiseTransport, Transport as NoiseTransport,
) )
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
)
async def main(): async def main():
@ -39,14 +42,16 @@ async def main():
# Create a host with the key pair, Noise security, and mplex multiplexer # Create a host with the key pair, Noise security, and mplex multiplexer
host = new_host(key_pair=key_pair, sec_opt=security_options) host = new_host(key_pair=key_pair, sec_opt=security_options)
# Configure the listening address # Configure the listening address using the new paradigm
port = 8000 port = 8000
listen_addr = multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/{port}") listen_addrs = get_available_interfaces(port)
optimal_addr = get_optimal_binding_address(port)
# Start the host # Start the host
async with host.run(listen_addrs=[listen_addr]): async with host.run(listen_addrs=listen_addrs):
print("libp2p has started") print("libp2p has started")
print("libp2p is listening on:", host.get_addrs()) print("libp2p is listening on:", host.get_addrs())
print(f"Optimal address: {optimal_addr}")
# Keep the host running # Keep the host running
await trio.sleep_forever() await trio.sleep_forever()

View File

@ -1,6 +1,5 @@
import secrets import secrets
import multiaddr
import trio import trio
from libp2p import ( from libp2p import (
@ -9,6 +8,10 @@ from libp2p import (
from libp2p.crypto.secp256k1 import ( from libp2p.crypto.secp256k1 import (
create_new_key_pair, create_new_key_pair,
) )
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
)
async def main(): async def main():
@ -19,14 +22,16 @@ async def main():
# Create a host with the key pair # Create a host with the key pair
host = new_host(key_pair=key_pair) host = new_host(key_pair=key_pair)
# Configure the listening address # Configure the listening address using the new paradigm
port = 8000 port = 8000
listen_addr = multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/{port}") listen_addrs = get_available_interfaces(port)
optimal_addr = get_optimal_binding_address(port)
# Start the host # Start the host
async with host.run(listen_addrs=[listen_addr]): async with host.run(listen_addrs=listen_addrs):
print("libp2p has started with TCP transport") print("libp2p has started with TCP transport")
print("libp2p is listening on:", host.get_addrs()) print("libp2p is listening on:", host.get_addrs())
print(f"Optimal address: {optimal_addr}")
# Keep the host running # Keep the host running
await trio.sleep_forever() await trio.sleep_forever()

View File

@ -27,6 +27,7 @@ from libp2p.peer.peerinfo import (
from libp2p.utils.address_validation import ( from libp2p.utils.address_validation import (
find_free_port, find_free_port,
get_available_interfaces, get_available_interfaces,
get_optimal_binding_address,
) )
# Configure minimal logging # Configure minimal logging
@ -82,9 +83,13 @@ async def run(port: int, destination: str, seed: int | None = None) -> None:
for addr in listen_addr: for addr in listen_addr:
print(f"{addr}/p2p/{peer_id}") print(f"{addr}/p2p/{peer_id}")
# Get optimal address for display
optimal_addr = get_optimal_binding_address(port)
optimal_addr_with_peer = f"{optimal_addr}/p2p/{peer_id}"
print( print(
"\nRun this from the same folder in another console:\n\n" "\nRun this from the same folder in another console:\n\n"
f"echo-demo -d {host.get_addrs()[0]}\n" f"echo-demo -d {optimal_addr_with_peer}\n"
) )
print("Waiting for incoming connections...") print("Waiting for incoming connections...")
await trio.sleep_forever() await trio.sleep_forever()

View File

@ -63,7 +63,10 @@ def print_identify_response(identify_response: Identify):
async def run(port: int, destination: str, use_varint_format: bool = True) -> None: async def run(port: int, destination: str, use_varint_format: bool = True) -> None:
from libp2p.utils.address_validation import get_available_interfaces from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
)
if not destination: if not destination:
# Create first host (listener) # Create first host (listener)
@ -100,11 +103,12 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No
for addr in all_addrs: for addr in all_addrs:
print(f"{addr}") print(f"{addr}")
# Use the first address as the default for the client command # Use optimal address for the client command
default_addr = all_addrs[0] optimal_addr = get_optimal_binding_address(port)
optimal_addr_with_peer = f"{optimal_addr}/p2p/{host_a.get_id().to_string()}"
print( print(
f"\nRun this from the same folder in another console:\n\n" f"\nRun this from the same folder in another console:\n\n"
f"identify-demo {format_flag} -d {default_addr}\n" f"identify-demo {format_flag} -d {optimal_addr_with_peer}\n"
) )
print("Waiting for incoming identify request...") print("Waiting for incoming identify request...")
@ -152,6 +156,7 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No
from libp2p.utils.address_validation import ( from libp2p.utils.address_validation import (
find_free_port, find_free_port,
get_available_interfaces, get_available_interfaces,
get_optimal_binding_address,
) )
if port <= 0: if port <= 0:

View File

@ -151,7 +151,10 @@ async def run_node(
key_pair = create_new_key_pair(secrets.token_bytes(32)) key_pair = create_new_key_pair(secrets.token_bytes(32))
host = new_host(key_pair=key_pair) host = new_host(key_pair=key_pair)
from libp2p.utils.address_validation import get_available_interfaces from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
)
listen_addrs = get_available_interfaces(port) listen_addrs = get_available_interfaces(port)
@ -168,9 +171,10 @@ async def run_node(
for addr in all_addrs: for addr in all_addrs:
logger.info(f"{addr}") logger.info(f"{addr}")
# Use the first address as the default for the bootstrap command # Use optimal address for the bootstrap command
default_addr = all_addrs[0] optimal_addr = get_optimal_binding_address(port)
bootstrap_cmd = f"--bootstrap {default_addr}" optimal_addr_with_peer = f"{optimal_addr}/p2p/{host.get_id().to_string()}"
bootstrap_cmd = f"--bootstrap {optimal_addr_with_peer}"
logger.info("To connect to this node, use: %s", bootstrap_cmd) logger.info("To connect to this node, use: %s", bootstrap_cmd)
await connect_to_bootstrap_nodes(host, bootstrap_nodes) await connect_to_bootstrap_nodes(host, bootstrap_nodes)
@ -182,7 +186,7 @@ async def run_node(
# Save server address in server mode # Save server address in server mode
if dht_mode == DHTMode.SERVER: if dht_mode == DHTMode.SERVER:
save_server_addr(str(default_addr)) save_server_addr(str(optimal_addr_with_peer))
# Start the DHT service # Start the DHT service
async with background_trio_service(dht): async with background_trio_service(dht):

View File

@ -61,7 +61,11 @@ async def send_ping(stream: INetStream) -> None:
async def run(port: int, destination: str) -> None: async def run(port: int, destination: str) -> None:
from libp2p.utils.address_validation import find_free_port, get_available_interfaces from libp2p.utils.address_validation import (
find_free_port,
get_available_interfaces,
get_optimal_binding_address,
)
if port <= 0: if port <= 0:
port = find_free_port() port = find_free_port()
@ -83,11 +87,12 @@ async def run(port: int, destination: str) -> None:
for addr in all_addrs: for addr in all_addrs:
print(f"{addr}") print(f"{addr}")
# Use the first address as the default for the client command # Use optimal address for the client command
default_addr = all_addrs[0] optimal_addr = get_optimal_binding_address(port)
optimal_addr_with_peer = f"{optimal_addr}/p2p/{host.get_id().to_string()}"
print( print(
f"\nRun this from the same folder in another console:\n\n" f"\nRun this from the same folder in another console:\n\n"
f"ping-demo -d {default_addr}\n" f"ping-demo -d {optimal_addr_with_peer}\n"
) )
print("Waiting for incoming connection...") print("Waiting for incoming connection...")

View File

@ -102,7 +102,10 @@ async def monitor_peer_topics(pubsub, nursery, termination_event):
async def run(topic: str, destination: str | None, port: int | None) -> None: async def run(topic: str, destination: str | None, port: int | None) -> None:
from libp2p.utils.address_validation import get_available_interfaces from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
)
if port is None or port == 0: if port is None or port == 0:
port = find_free_port() port = find_free_port()
@ -162,11 +165,14 @@ async def run(topic: str, destination: str | None, port: int | None) -> None:
for addr in all_addrs: for addr in all_addrs:
logger.info(f"{addr}") logger.info(f"{addr}")
# Use the first address as the default for the client command # Use optimal address for the client command
default_addr = all_addrs[0] optimal_addr = get_optimal_binding_address(port)
optimal_addr_with_peer = (
f"{optimal_addr}/p2p/{host.get_id().to_string()}"
)
logger.info( logger.info(
f"\nRun this from the same folder in another console:\n\n" f"\nRun this from the same folder in another console:\n\n"
f"pubsub-demo -d {default_addr}\n" f"pubsub-demo -d {optimal_addr_with_peer}\n"
) )
logger.info("Waiting for peers...") logger.info("Waiting for peers...")

View File

@ -3,38 +3,24 @@ from __future__ import annotations
import socket import socket
from multiaddr import Multiaddr from multiaddr import Multiaddr
from multiaddr.utils import get_network_addrs, get_thin_waist_addresses
try:
from multiaddr.utils import ( # type: ignore
get_network_addrs,
get_thin_waist_addresses,
)
_HAS_THIN_WAIST = True
except ImportError: # pragma: no cover - only executed in older environments
_HAS_THIN_WAIST = False
get_thin_waist_addresses = None # type: ignore
get_network_addrs = None # type: ignore
def _safe_get_network_addrs(ip_version: int) -> list[str]: def _safe_get_network_addrs(ip_version: int) -> list[str]:
""" """
Internal safe wrapper. Returns a list of IP addresses for the requested IP version. Internal safe wrapper. Returns a list of IP addresses for the requested IP version.
Falls back to minimal defaults when Thin Waist helpers are missing.
:param ip_version: 4 or 6 :param ip_version: 4 or 6
""" """
if _HAS_THIN_WAIST and get_network_addrs: try:
try: return get_network_addrs(ip_version) or []
return get_network_addrs(ip_version) or [] except Exception: # pragma: no cover - defensive
except Exception: # pragma: no cover - defensive # Fallback behavior (very conservative)
return [] if ip_version == 4:
# Fallback behavior (very conservative) return ["127.0.0.1"]
if ip_version == 4: if ip_version == 6:
return ["127.0.0.1"] return ["::1"]
if ip_version == 6: return []
return ["::1"]
return []
def find_free_port() -> int: def find_free_port() -> int:
@ -47,16 +33,13 @@ def find_free_port() -> int:
def _safe_expand(addr: Multiaddr, port: int | None = None) -> list[Multiaddr]: def _safe_expand(addr: Multiaddr, port: int | None = None) -> list[Multiaddr]:
""" """
Internal safe expansion wrapper. Returns a list of Multiaddr objects. Internal safe expansion wrapper. Returns a list of Multiaddr objects.
If Thin Waist isn't available, returns [addr] (identity).
""" """
if _HAS_THIN_WAIST and get_thin_waist_addresses: try:
try: if port is not None:
if port is not None: return get_thin_waist_addresses(addr, port=port) or []
return get_thin_waist_addresses(addr, port=port) or [] return get_thin_waist_addresses(addr) or []
return get_thin_waist_addresses(addr) or [] except Exception: # pragma: no cover - defensive
except Exception: # pragma: no cover - defensive return [addr]
return [addr]
return [addr]
def get_available_interfaces(port: int, protocol: str = "tcp") -> list[Multiaddr]: def get_available_interfaces(port: int, protocol: str = "tcp") -> list[Multiaddr]:
@ -122,6 +105,20 @@ def expand_wildcard_address(
return expanded return expanded
def get_wildcard_address(port: int, protocol: str = "tcp") -> Multiaddr:
"""
Get wildcard address (0.0.0.0) when explicitly needed.
This function provides access to wildcard binding as a feature when
explicitly required, preserving the ability to bind to all interfaces.
:param port: Port number.
:param protocol: Transport protocol.
:return: A Multiaddr with wildcard binding (0.0.0.0).
"""
return Multiaddr(f"/ip4/0.0.0.0/{protocol}/{port}")
def get_optimal_binding_address(port: int, protocol: str = "tcp") -> Multiaddr: def get_optimal_binding_address(port: int, protocol: str = "tcp") -> Multiaddr:
""" """
Choose an optimal address for an example to bind to: Choose an optimal address for an example to bind to:
@ -157,6 +154,7 @@ def get_optimal_binding_address(port: int, protocol: str = "tcp") -> Multiaddr:
__all__ = [ __all__ = [
"get_available_interfaces", "get_available_interfaces",
"get_optimal_binding_address", "get_optimal_binding_address",
"get_wildcard_address",
"expand_wildcard_address", "expand_wildcard_address",
"find_free_port", "find_free_port",
] ]

View File

@ -1,12 +1,12 @@
""" """
Tests to verify that all examples use 127.0.0.1 instead of 0.0.0.0 Tests to verify that all examples use the new address paradigm consistently
""" """
from pathlib import Path from pathlib import Path
class TestExamplesBindAddress: class TestExamplesAddressParadigm:
"""Test suite to verify all examples use secure bind addresses""" """Test suite to verify all examples use the new address paradigm consistently"""
def get_example_files(self): def get_example_files(self):
"""Get all Python files in the examples directory""" """Get all Python files in the examples directory"""
@ -32,49 +32,26 @@ class TestExamplesBindAddress:
return found_wildcards return found_wildcards
def test_no_wildcard_binding_in_examples(self): def test_examples_use_address_paradigm(self):
"""Test that no example files use 0.0.0.0 for binding""" """Test that examples use the new address paradigm functions"""
example_files = self.get_example_files() example_files = self.get_example_files()
# Skip certain files that might legitimately discuss wildcards # Files that should use the new paradigm
skip_files = [ networking_examples = [
"network_discover.py", # This demonstrates wildcard expansion "echo/echo.py",
]
files_with_wildcards = {}
for filepath in example_files:
if any(skip in str(filepath) for skip in skip_files):
continue
wildcards = self.check_file_for_wildcard_binding(filepath)
if wildcards:
files_with_wildcards[str(filepath)] = wildcards
# Assert no wildcards found
if files_with_wildcards:
error_msg = "Found wildcard bindings in example files:\n"
for filepath, occurrences in files_with_wildcards.items():
error_msg += f"\n{filepath}:\n"
for line_num, line in occurrences:
error_msg += f" Line {line_num}: {line}\n"
assert False, error_msg
def test_examples_use_loopback_address(self):
"""Test that examples use 127.0.0.1 for local binding"""
example_files = self.get_example_files()
# Files that should contain listen addresses
files_with_networking = [
"ping/ping.py",
"chat/chat.py", "chat/chat.py",
"ping/ping.py",
"bootstrap/bootstrap.py", "bootstrap/bootstrap.py",
"pubsub/pubsub.py", "pubsub/pubsub.py",
"identify/identify.py", "identify/identify.py",
] ]
for filename in files_with_networking: paradigm_functions = [
"get_available_interfaces",
"get_optimal_binding_address",
]
for filename in networking_examples:
filepath = None filepath = None
for example_file in example_files: for example_file in example_files:
if filename in str(example_file): if filename in str(example_file):
@ -87,24 +64,54 @@ class TestExamplesBindAddress:
with open(filepath, encoding="utf-8") as f: with open(filepath, encoding="utf-8") as f:
content = f.read() content = f.read()
# Check for proper loopback usage # Check that the file uses the new paradigm functions
has_loopback = "127.0.0.1" in content or "localhost" in content for func in paradigm_functions:
has_multiaddr_loopback = "/ip4/127.0.0.1/" in content assert func in content, (
f"{filepath} should use {func} from the new address paradigm"
)
assert has_loopback or has_multiaddr_loopback, ( def test_wildcard_available_as_feature(self):
f"{filepath} should use loopback address (127.0.0.1)" """Test that wildcard is available as a feature when needed"""
example_files = self.get_example_files()
# Check that network_discover.py demonstrates wildcard usage
network_discover_file = None
for example_file in example_files:
if "network_discover.py" in str(example_file):
network_discover_file = example_file
break
if network_discover_file:
with open(network_discover_file, encoding="utf-8") as f:
content = f.read()
# Should demonstrate wildcard expansion
assert "0.0.0.0" in content, (
f"{network_discover_file} should demonstrate wildcard usage"
)
assert "expand_wildcard_address" in content, (
f"{network_discover_file} should use expand_wildcard_address"
) )
def test_doc_examples_use_loopback(self): def test_doc_examples_use_paradigm(self):
"""Test that documentation examples use secure addresses""" """Test that documentation examples use the new address paradigm"""
doc_examples_dir = Path("examples/doc-examples") doc_examples_dir = Path("examples/doc-examples")
if not doc_examples_dir.exists(): if not doc_examples_dir.exists():
return return
doc_example_files = list(doc_examples_dir.glob("*.py")) doc_example_files = list(doc_examples_dir.glob("*.py"))
paradigm_functions = [
"get_available_interfaces",
"get_optimal_binding_address",
]
for filepath in doc_example_files: for filepath in doc_example_files:
wildcards = self.check_file_for_wildcard_binding(filepath) with open(filepath, encoding="utf-8") as f:
assert not wildcards, ( content = f.read()
f"Documentation example {filepath} contains wildcard binding"
) # Check that doc examples use the new paradigm
for func in paradigm_functions:
assert func in content, (
f"Documentation example {filepath} should use {func}"
)

View File

@ -1,5 +1,5 @@
""" """
Tests for default bind address changes from 0.0.0.0 to 127.0.0.1 Tests for the new address paradigm with wildcard support as a feature
""" """
import pytest import pytest
@ -9,28 +9,43 @@ from libp2p import new_host
from libp2p.utils.address_validation import ( from libp2p.utils.address_validation import (
get_available_interfaces, get_available_interfaces,
get_optimal_binding_address, get_optimal_binding_address,
get_wildcard_address,
) )
class TestDefaultBindAddress: class TestAddressParadigm:
""" """
Test suite for verifying default bind addresses use Test suite for verifying the new address paradigm:
secure addresses (not 0.0.0.0) - get_available_interfaces() returns all available interfaces
- get_optimal_binding_address() returns optimal address for examples
- get_wildcard_address() provides wildcard as a feature when needed
""" """
def test_default_bind_address_is_not_wildcard(self): def test_wildcard_address_function(self):
"""Test that default bind address is NOT 0.0.0.0 (wildcard)""" """Test that get_wildcard_address() provides wildcard as a feature"""
port = 8000
addr = get_wildcard_address(port)
# Should return wildcard address when explicitly requested
assert "0.0.0.0" in str(addr)
addr_str = str(addr)
assert "/ip4/" in addr_str
assert f"/tcp/{port}" in addr_str
def test_optimal_binding_address_selection(self):
"""Test that optimal binding address uses good heuristics"""
port = 8000 port = 8000
addr = get_optimal_binding_address(port) addr = get_optimal_binding_address(port)
# Should NOT return wildcard address
assert "0.0.0.0" not in str(addr)
# Should return a valid IP address (could be loopback or local network) # Should return a valid IP address (could be loopback or local network)
addr_str = str(addr) addr_str = str(addr)
assert "/ip4/" in addr_str assert "/ip4/" in addr_str
assert f"/tcp/{port}" in addr_str assert f"/tcp/{port}" in addr_str
# Should be from available interfaces
available_interfaces = get_available_interfaces(port)
assert addr in available_interfaces
def test_available_interfaces_includes_loopback(self): def test_available_interfaces_includes_loopback(self):
"""Test that available interfaces always includes loopback address""" """Test that available interfaces always includes loopback address"""
port = 8000 port = 8000
@ -43,9 +58,12 @@ class TestDefaultBindAddress:
loopback_found = any("127.0.0.1" in str(addr) for addr in interfaces) loopback_found = any("127.0.0.1" in str(addr) for addr in interfaces)
assert loopback_found, "Loopback address not found in available interfaces" assert loopback_found, "Loopback address not found in available interfaces"
# Should not have wildcard as the only option # Available interfaces should not include wildcard by default
if len(interfaces) == 1: # (wildcard is available as a feature through get_wildcard_address())
assert "0.0.0.0" not in str(interfaces[0]) wildcard_found = any("0.0.0.0" in str(addr) for addr in interfaces)
assert not wildcard_found, (
"Wildcard should not be in default available interfaces"
)
def test_host_default_listen_address(self): def test_host_default_listen_address(self):
"""Test that new hosts use secure default addresses""" """Test that new hosts use secure default addresses"""
@ -59,55 +77,66 @@ class TestDefaultBindAddress:
# Note: We can't test actual binding without running the host, # Note: We can't test actual binding without running the host,
# but we've verified the address format is correct # but we've verified the address format is correct
def test_no_wildcard_in_fallback(self): def test_paradigm_consistency(self):
"""Test that fallback addresses don't use wildcard binding""" """Test that the address paradigm is consistent"""
# When no interfaces are discovered, fallback should be loopback
port = 8000 port = 8000
# Even if we can't discover interfaces, we should get loopback # get_optimal_binding_address should return a valid address
addr = get_optimal_binding_address(port) optimal_addr = get_optimal_binding_address(port)
# Should NOT be wildcard assert "/ip4/" in str(optimal_addr)
assert "0.0.0.0" not in str(addr) assert f"/tcp/{port}" in str(optimal_addr)
# Should be a valid IP address # get_wildcard_address should return wildcard when explicitly needed
addr_str = str(addr) wildcard_addr = get_wildcard_address(port)
assert "/ip4/" in addr_str assert "0.0.0.0" in str(wildcard_addr)
assert f"/tcp/{port}" in addr_str assert f"/tcp/{port}" in str(wildcard_addr)
# Both should be valid Multiaddr objects
assert isinstance(optimal_addr, Multiaddr)
assert isinstance(wildcard_addr, Multiaddr)
@pytest.mark.parametrize("protocol", ["tcp", "udp"]) @pytest.mark.parametrize("protocol", ["tcp", "udp"])
def test_different_protocols_use_secure_addresses(self, protocol): def test_different_protocols_support(self, protocol):
"""Test that different protocols still use secure addresses by default""" """Test that different protocols are supported by the paradigm"""
port = 8000 port = 8000
addr = get_optimal_binding_address(port, protocol=protocol)
# Should NOT be wildcard # Test optimal address with different protocols
assert "0.0.0.0" not in str(addr) optimal_addr = get_optimal_binding_address(port, protocol=protocol)
assert protocol in str(addr) assert protocol in str(optimal_addr)
assert f"/{protocol}/{port}" in str(optimal_addr)
# Should be a valid IP address # Test wildcard address with different protocols
addr_str = str(addr) wildcard_addr = get_wildcard_address(port, protocol=protocol)
assert "/ip4/" in addr_str assert "0.0.0.0" in str(wildcard_addr)
assert f"/{protocol}/{port}" in addr_str assert protocol in str(wildcard_addr)
assert f"/{protocol}/{port}" in str(wildcard_addr)
def test_security_no_public_binding_by_default(self): # Test available interfaces with different protocols
"""Test that no public interface binding occurs by default""" interfaces = get_available_interfaces(port, protocol=protocol)
assert len(interfaces) > 0
for addr in interfaces:
assert protocol in str(addr)
def test_wildcard_available_as_feature(self):
"""Test that wildcard binding is available as a feature when needed"""
port = 8000 port = 8000
# Wildcard should be available through get_wildcard_address()
wildcard_addr = get_wildcard_address(port)
assert "0.0.0.0" in str(wildcard_addr)
# But should not be in default available interfaces
interfaces = get_available_interfaces(port) interfaces = get_available_interfaces(port)
wildcard_in_interfaces = any("0.0.0.0" in str(addr) for addr in interfaces)
# Check that we don't expose on all interfaces by default assert not wildcard_in_interfaces, (
wildcard_addrs = [addr for addr in interfaces if "0.0.0.0" in str(addr)] "Wildcard should not be in default interfaces"
assert len(wildcard_addrs) == 0, (
"Found wildcard addresses in default interfaces"
) )
# Verify optimal address selection doesn't choose wildcard # Optimal address should not be wildcard by default
optimal = get_optimal_binding_address(port) optimal = get_optimal_binding_address(port)
assert "0.0.0.0" not in str(optimal), "Optimal address should not be wildcard" assert "0.0.0.0" not in str(optimal), (
"Optimal address should not be wildcard by default"
# Should be a valid IP address (could be loopback or local network) )
addr_str = str(optimal)
assert "/ip4/" in addr_str
assert f"/tcp/{port}" in addr_str
def test_loopback_is_always_available(self): def test_loopback_is_always_available(self):
"""Test that loopback address is always available as an option""" """Test that loopback address is always available as an option"""
@ -132,9 +161,6 @@ class TestDefaultBindAddress:
interfaces = get_available_interfaces(port) interfaces = get_available_interfaces(port)
optimal = get_optimal_binding_address(port) optimal = get_optimal_binding_address(port)
# Should never return wildcard
assert "0.0.0.0" not in str(optimal)
# Should return one of the available interfaces # Should return one of the available interfaces
optimal_str = str(optimal) optimal_str = str(optimal)
interface_strs = [str(addr) for addr in interfaces] interface_strs = [str(addr) for addr in interfaces]
@ -142,7 +168,7 @@ class TestDefaultBindAddress:
f"Optimal address {optimal_str} should be in available interfaces" f"Optimal address {optimal_str} should be in available interfaces"
) )
# If non-loopback interfaces are available, should prefer them # Should prefer non-loopback when available, fallback to loopback
non_loopback_interfaces = [ non_loopback_interfaces = [
addr for addr in interfaces if "127.0.0.1" not in str(addr) addr for addr in interfaces if "127.0.0.1" not in str(addr)
] ]
@ -157,23 +183,21 @@ class TestDefaultBindAddress:
"Should use loopback when no other interfaces available" "Should use loopback when no other interfaces available"
) )
def test_address_validation_utilities_behavior(self): def test_address_paradigm_completeness(self):
"""Test that address validation utilities behave as expected""" """Test that the address paradigm provides all necessary functionality"""
port = 8000 port = 8000
# Test that we get multiple interface options # Test that we get interface options
interfaces = get_available_interfaces(port) interfaces = get_available_interfaces(port)
assert len(interfaces) >= 2, ( assert len(interfaces) >= 1, "Should have at least one interface"
"Should have at least loopback + one network interface"
)
# Test that loopback is always included # Test that loopback is always included
has_loopback = any("127.0.0.1" in str(addr) for addr in interfaces) has_loopback = any("127.0.0.1" in str(addr) for addr in interfaces)
assert has_loopback, "Loopback should always be available" assert has_loopback, "Loopback should always be available"
# Test that no wildcards are included # Test that wildcard is available as a feature
has_wildcard = any("0.0.0.0" in str(addr) for addr in interfaces) wildcard_addr = get_wildcard_address(port)
assert not has_wildcard, "Wildcard addresses should never be included" assert "0.0.0.0" in str(wildcard_addr)
# Test optimal selection # Test optimal selection
optimal = get_optimal_binding_address(port) optimal = get_optimal_binding_address(port)