Merge pull request #892 from yashksaini-coder/fix/885-Update-default-Bind-address

Fix/885 update default bind address
This commit is contained in:
Manu Sheel Gupta
2025-09-23 00:23:00 +05:30
committed by GitHub
34 changed files with 881 additions and 206 deletions

View File

@ -36,12 +36,14 @@ Create a file named ``relay_node.py`` with the following content:
from libp2p.relay.circuit_v2.transport import CircuitV2Transport
from libp2p.relay.circuit_v2.config import RelayConfig
from libp2p.tools.async_service import background_trio_service
from libp2p.utils import get_wildcard_address
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("relay_node")
async def run_relay():
listen_addr = multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/9000")
# Use wildcard address to listen on all interfaces
listen_addr = get_wildcard_address(9000)
host = new_host()
config = RelayConfig(
@ -107,6 +109,7 @@ Create a file named ``destination_node.py`` with the following content:
from libp2p.relay.circuit_v2.config import RelayConfig
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.tools.async_service import background_trio_service
from libp2p.utils import get_wildcard_address
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("destination_node")
@ -139,7 +142,8 @@ Create a file named ``destination_node.py`` with the following content:
Run a simple destination node that accepts connections.
This is a simplified version that doesn't use the relay functionality.
"""
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/9001")
# Create a libp2p host - use wildcard address to listen on all interfaces
listen_addr = get_wildcard_address(9001)
host = new_host()
# Configure as a relay receiver (stop)
@ -252,14 +256,15 @@ Create a file named ``source_node.py`` with the following content:
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.tools.async_service import background_trio_service
from libp2p.relay.circuit_v2.discovery import RelayInfo
from libp2p.utils import get_wildcard_address
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("source_node")
async def run_source(relay_peer_id=None, destination_peer_id=None):
# Create a libp2p host
listen_addr = multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/9002")
# Create a libp2p host - use wildcard address to listen on all interfaces
listen_addr = get_wildcard_address(9002)
host = new_host()
# Configure as a relay client
@ -428,7 +433,7 @@ Running the Example
Relay node multiaddr: /ip4/127.0.0.1/tcp/9000/p2p/QmaUigQJ9nJERa6GaZuyfaiX91QjYwoQJ46JS3k7ys7SLx
==================================================
Listening on: [<Multiaddr /ip4/0.0.0.0/tcp/9000/p2p/QmaUigQJ9nJERa6GaZuyfaiX91QjYwoQJ46JS3k7ys7SLx>]
Listening on: [<Multiaddr /ip4/127.0.0.1/tcp/9000/p2p/QmaUigQJ9nJERa6GaZuyfaiX91QjYwoQJ46JS3k7ys7SLx>]
Protocol service started
Relay service started successfully
Relay limits: RelayLimits(duration=3600, data=10485760, max_circuit_conns=8, max_reservations=4)
@ -447,7 +452,7 @@ Running the Example
Use this ID in the source node: QmPBr38KeQG2ibyL4fxq6yJWpfoVNCqJMHBdNyn1Qe4h5s
==================================================
Listening on: [<Multiaddr /ip4/0.0.0.0/tcp/9001/p2p/QmPBr38KeQG2ibyL4fxq6yJWpfoVNCqJMHBdNyn1Qe4h5s>]
Listening on: [<Multiaddr /ip4/127.0.0.1/tcp/9001/p2p/QmPBr38KeQG2ibyL4fxq6yJWpfoVNCqJMHBdNyn1Qe4h5s>]
Registered echo protocol handler
Protocol service started
Transport created
@ -469,7 +474,7 @@ Running the Example
$ python source_node.py
Source node started with ID: QmPyM56cgmFoHTgvMgGfDWRdVRQznmxCDDDg2dJ8ygVXj3
Listening on: [<Multiaddr /ip4/0.0.0.0/tcp/9002/p2p/QmPyM56cgmFoHTgvMgGfDWRdVRQznmxCDDDg2dJ8ygVXj3>]
Listening on: [<Multiaddr /ip4/127.0.0.1/tcp/9002/p2p/QmPyM56cgmFoHTgvMgGfDWRdVRQznmxCDDDg2dJ8ygVXj3>]
Protocol service started
No relay peer ID provided. Please enter the relay\'s peer ID:
Enter relay peer ID: QmaUigQJ9nJERa6GaZuyfaiX91QjYwoQJ46JS3k7ys7SLx

View File

@ -12,7 +12,7 @@ This example demonstrates how to use the libp2p ``identify`` protocol.
$ identify-demo
First host listening. Run this from another console:
identify-demo -p 8889 -d /ip4/0.0.0.0/tcp/8888/p2p/QmUiN4R3fNrCoQugGgmmb3v35neMEjKFNrsbNGVDsRHWpM
identify-demo -p 8889 -d /ip4/127.0.0.1/tcp/8888/p2p/QmUiN4R3fNrCoQugGgmmb3v35neMEjKFNrsbNGVDsRHWpM
Waiting for incoming identify request...
@ -21,13 +21,13 @@ folder and paste it in:
.. code-block:: console
$ identify-demo -p 8889 -d /ip4/0.0.0.0/tcp/8888/p2p/QmUiN4R3fNrCoQugGgmmb3v35neMEjKFNrsbNGVDsRHWpM
dialer (host_b) listening on /ip4/0.0.0.0/tcp/8889
$ identify-demo -p 8889 -d /ip4/127.0.0.1/tcp/8888/p2p/QmUiN4R3fNrCoQugGgmmb3v35neMEjKFNrsbNGVDsRHWpM
dialer (host_b) listening on /ip4/127.0.0.1/tcp/8889
Second host connecting to peer: QmUiN4R3fNrCoQugGgmmb3v35neMEjKFNrsbNGVDsRHWpM
Starting identify protocol...
Identify response:
Public Key (Base64): CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDC6c/oNPP9X13NDQ3Xrlp3zOj+ErXIWb/A4JGwWchiDBwMhMslEX3ct8CqI0BqUYKuwdFjowqqopOJ3cS2MlqtGaiP6Dg9bvGqSDoD37BpNaRVNcebRxtB0nam9SQy3PYLbHAmz0vR4ToSiL9OLRORnGOxCtHBuR8ZZ5vS0JEni8eQMpNa7IuXwyStnuty/QjugOZudBNgYSr8+9gH722KTjput5IRL7BrpIdd4HNXGVRm4b9BjNowvHu404x3a/ifeNblpy/FbYyFJEW0looygKF7hpRHhRbRKIDZt2BqOfT1sFkbqsHE85oY859+VMzP61YELgvGwai2r7KcjkW/AgMBAAE=
Listen Addresses: ['/ip4/0.0.0.0/tcp/8888/p2p/QmUiN4R3fNrCoQugGgmmb3v35neMEjKFNrsbNGVDsRHWpM']
Listen Addresses: ['/ip4/127.0.0.1/tcp/8888/p2p/QmUiN4R3fNrCoQugGgmmb3v35neMEjKFNrsbNGVDsRHWpM']
Protocols: ['/ipfs/id/1.0.0', '/ipfs/ping/1.0.0']
Observed Address: ['/ip4/127.0.0.1/tcp/38082']
Protocol Version: ipfs/0.1.0

View File

@ -34,11 +34,11 @@ There is also a more interactive version of the example which runs as separate l
==== Starting Identify-Push Listener on port 8888 ====
Listener host ready!
Listening on: /ip4/0.0.0.0/tcp/8888/p2p/QmUiN4R3fNrCoQugGgmmb3v35neMEjKFNrsbNGVDsRHWpM
Listening on: /ip4/127.0.0.1/tcp/8888/p2p/QmUiN4R3fNrCoQugGgmmb3v35neMEjKFNrsbNGVDsRHWpM
Peer ID: QmUiN4R3fNrCoQugGgmmb3v35neMEjKFNrsbNGVDsRHWpM
Run dialer with command:
identify-push-listener-dialer-demo -d /ip4/0.0.0.0/tcp/8888/p2p/QmUiN4R3fNrCoQugGgmmb3v35neMEjKFNrsbNGVDsRHWpM
identify-push-listener-dialer-demo -d /ip4/127.0.0.1/tcp/8888/p2p/QmUiN4R3fNrCoQugGgmmb3v35neMEjKFNrsbNGVDsRHWpM
Waiting for incoming connections... (Ctrl+C to exit)
@ -47,12 +47,12 @@ folder and paste it in:
.. code-block:: console
$ identify-push-listener-dialer-demo -d /ip4/0.0.0.0/tcp/8888/p2p/QmUiN4R3fNrCoQugGgmmb3v35neMEjKFNrsbNGVDsRHWpM
$ identify-push-listener-dialer-demo -d /ip4/127.0.0.1/tcp/8888/p2p/QmUiN4R3fNrCoQugGgmmb3v35neMEjKFNrsbNGVDsRHWpM
==== Starting Identify-Push Dialer on port 8889 ====
Dialer host ready!
Listening on: /ip4/0.0.0.0/tcp/8889/p2p/QmZyXwVuTaBcDeRsSkJpOpWrSt
Listening on: /ip4/127.0.0.1/tcp/8889/p2p/QmZyXwVuTaBcDeRsSkJpOpWrSt
Connecting to peer: QmUiN4R3fNrCoQugGgmmb3v35neMEjKFNrsbNGVDsRHWpM
Successfully connected to listener!

View File

@ -15,7 +15,7 @@ This example demonstrates how to create a chat application using libp2p's PubSub
2025-04-06 23:59:17,471 - pubsub-demo - INFO - Your selected topic is: pubsub-chat
2025-04-06 23:59:17,472 - pubsub-demo - INFO - Using random available port: 33269
2025-04-06 23:59:17,490 - pubsub-demo - INFO - Node started with peer ID: QmcJnocH1d1tz3Zp4MotVDjNfNFawXHw2dpB9tMYGTXJp7
2025-04-06 23:59:17,490 - pubsub-demo - INFO - Listening on: /ip4/0.0.0.0/tcp/33269
2025-04-06 23:59:17,490 - pubsub-demo - INFO - Listening on: /ip4/127.0.0.1/tcp/33269
2025-04-06 23:59:17,490 - pubsub-demo - INFO - Initializing PubSub and GossipSub...
2025-04-06 23:59:17,491 - pubsub-demo - INFO - Pubsub and GossipSub services started.
2025-04-06 23:59:17,491 - pubsub-demo - INFO - Pubsub ready.
@ -35,7 +35,7 @@ Copy the line that starts with ``pubsub-demo -d``, open a new terminal and paste
2025-04-07 00:00:59,846 - pubsub-demo - INFO - Your selected topic is: pubsub-chat
2025-04-07 00:00:59,846 - pubsub-demo - INFO - Using random available port: 51977
2025-04-07 00:00:59,864 - pubsub-demo - INFO - Node started with peer ID: QmYQKCm95Ut1aXsjHmWVYqdaVbno1eKTYC8KbEVjqUaKaQ
2025-04-07 00:00:59,864 - pubsub-demo - INFO - Listening on: /ip4/0.0.0.0/tcp/51977
2025-04-07 00:00:59,864 - pubsub-demo - INFO - Listening on: /ip4/127.0.0.1/tcp/51977
2025-04-07 00:00:59,864 - pubsub-demo - INFO - Initializing PubSub and GossipSub...
2025-04-07 00:00:59,864 - pubsub-demo - INFO - Pubsub and GossipSub services started.
2025-04-07 00:00:59,865 - pubsub-demo - INFO - Pubsub ready.

View File

@ -23,7 +23,7 @@ The Random Walk implementation performs the following key operations:
2025-08-12 19:51:25,424 - random-walk-example - INFO - Mode: server, Port: 0 Demo interval: 30s
2025-08-12 19:51:25,426 - random-walk-example - INFO - Starting server node on port 45123
2025-08-12 19:51:25,426 - random-walk-example - INFO - Node peer ID: 16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef
2025-08-12 19:51:25,426 - random-walk-example - INFO - Node address: /ip4/0.0.0.0/tcp/45123/p2p/16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef
2025-08-12 19:51:25,426 - random-walk-example - INFO - Node address: /ip4/127.0.0.1/tcp/45123/p2p/16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef
2025-08-12 19:51:25,427 - random-walk-example - INFO - Initial routing table size: 0
2025-08-12 19:51:25,427 - random-walk-example - INFO - DHT service started in SERVER mode
2025-08-12 19:51:25,430 - libp2p.discovery.random_walk.rt_refresh_manager - INFO - RT Refresh Manager started

View File

@ -14,11 +14,26 @@ try:
expand_wildcard_address,
get_available_interfaces,
get_optimal_binding_address,
get_wildcard_address,
)
except ImportError:
# Fallbacks if utilities are missing
# Fallbacks if utilities are missing - use minimal network discovery
import socket
def get_available_interfaces(port: int, protocol: str = "tcp"):
return [Multiaddr(f"/ip4/0.0.0.0/{protocol}/{port}")]
# Try to get local network interfaces, fallback to loopback
addrs = []
try:
# Get hostname IP (better than hardcoded localhost)
hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)
if local_ip != "127.0.0.1":
addrs.append(Multiaddr(f"/ip4/{local_ip}/{protocol}/{port}"))
except Exception:
pass
# Always include loopback as fallback
addrs.append(Multiaddr(f"/ip4/127.0.0.1/{protocol}/{port}"))
return addrs
def expand_wildcard_address(addr: Multiaddr, port: int | None = None):
if port is None:
@ -27,6 +42,15 @@ except ImportError:
return [Multiaddr(addr_str + f"/{port}")]
def get_optimal_binding_address(port: int, protocol: str = "tcp"):
# Try to get a non-loopback address first
interfaces = get_available_interfaces(port, protocol)
for addr in interfaces:
if "127.0.0.1" not in str(addr):
return addr
# Fallback to loopback if no other interfaces found
return Multiaddr(f"/ip4/127.0.0.1/{protocol}/{port}")
def get_wildcard_address(port: int, protocol: str = "tcp"):
return Multiaddr(f"/ip4/0.0.0.0/{protocol}/{port}")
@ -37,7 +61,10 @@ def main() -> None:
for a in interfaces:
print(f" - {a}")
wildcard_v4 = Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
# Demonstrate wildcard address as a feature
wildcard_v4 = get_wildcard_address(port)
print(f"\nWildcard address (feature): {wildcard_v4}")
expanded_v4 = expand_wildcard_address(wildcard_v4)
print("\nExpanded IPv4 wildcard:")
for a in expanded_v4:

View File

@ -2,7 +2,6 @@ import argparse
import logging
import secrets
import multiaddr
import trio
from libp2p import new_host
@ -54,18 +53,26 @@ BOOTSTRAP_PEERS = [
async def run(port: int, bootstrap_addrs: list[str]) -> None:
"""Run the bootstrap discovery example."""
from libp2p.utils.address_validation import (
find_free_port,
get_available_interfaces,
get_optimal_binding_address,
)
if port <= 0:
port = find_free_port()
# Generate key pair
secret = secrets.token_bytes(32)
key_pair = create_new_key_pair(secret)
# Create listen address
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
# Create listen addresses for all available interfaces
listen_addrs = get_available_interfaces(port)
# Register peer discovery handler
peerDiscovery.register_peer_discovered_handler(on_peer_discovery)
logger.info("🚀 Starting Bootstrap Discovery Example")
logger.info(f"📍 Listening on: {listen_addr}")
logger.info(f"🌐 Bootstrap peers: {len(bootstrap_addrs)}")
print("\n" + "=" * 60)
@ -80,7 +87,22 @@ async def run(port: int, bootstrap_addrs: list[str]) -> None:
host = new_host(key_pair=key_pair, bootstrap=bootstrap_addrs)
try:
async with host.run(listen_addrs=[listen_addr]):
async with host.run(listen_addrs=listen_addrs):
# Get all available addresses with peer ID
all_addrs = host.get_addrs()
logger.info("Listener ready, listening on:")
print("Listener ready, listening on:")
for addr in all_addrs:
logger.info(f"{addr}")
print(f"{addr}")
# Display optimal address for reference
optimal_addr = get_optimal_binding_address(port)
optimal_addr_with_peer = f"{optimal_addr}/p2p/{host.get_id().to_string()}"
logger.info(f"Optimal address: {optimal_addr_with_peer}")
print(f"Optimal address: {optimal_addr_with_peer}")
# Keep running and log peer discovery events
await trio.sleep_forever()
except KeyboardInterrupt:
@ -98,7 +120,7 @@ def main() -> None:
Usage:
python bootstrap.py -p 8000
python bootstrap.py -p 8001 --custom-bootstrap \\
"/ip4/127.0.0.1/tcp/8000/p2p/QmYourPeerID"
"/ip4/[HOST_IP]/tcp/8000/p2p/QmYourPeerID"
"""
parser = argparse.ArgumentParser(

View File

@ -1,4 +1,5 @@
import argparse
import logging
import sys
import multiaddr
@ -17,6 +18,11 @@ from libp2p.peer.peerinfo import (
info_from_p2p_addr,
)
# Configure minimal logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("multiaddr").setLevel(logging.WARNING)
logging.getLogger("libp2p").setLevel(logging.WARNING)
PROTOCOL_ID = TProtocol("/chat/1.0.0")
MAX_READ_LEN = 2**32 - 1
@ -40,9 +46,18 @@ async def write_data(stream: INetStream) -> None:
async def run(port: int, destination: str) -> None:
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
from libp2p.utils.address_validation import (
find_free_port,
get_available_interfaces,
get_optimal_binding_address,
)
if port <= 0:
port = find_free_port()
listen_addrs = get_available_interfaces(port)
host = new_host()
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
async with host.run(listen_addrs=listen_addrs), trio.open_nursery() as nursery:
# Start the peer-store cleanup task
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
@ -54,10 +69,19 @@ async def run(port: int, destination: str) -> None:
host.set_stream_handler(PROTOCOL_ID, stream_handler)
# Get all available addresses with peer ID
all_addrs = host.get_addrs()
print("Listener ready, listening on:\n")
for addr in all_addrs:
print(f"{addr}")
# Use optimal address for the client command
optimal_addr = get_optimal_binding_address(port)
optimal_addr_with_peer = f"{optimal_addr}/p2p/{host.get_id().to_string()}"
print(
"Run this from the same folder in another console:\n\n"
f"chat-demo "
f"-d {host.get_addrs()[0]}\n"
f"\nRun this from the same folder in another console:\n\n"
f"chat-demo -d {optimal_addr_with_peer}\n"
)
print("Waiting for incoming connection...")
@ -86,7 +110,7 @@ def main() -> None:
where <DESTINATION> is the multiaddress of the previous listener host.
"""
example_maddr = (
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
"/ip4/[HOST_IP]/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
)
parser = argparse.ArgumentParser(description=description)
parser.add_argument("-p", "--port", default=0, type=int, help="source port number")

View File

@ -1,6 +1,5 @@
import secrets
import multiaddr
import trio
from libp2p import (
@ -9,9 +8,10 @@ from libp2p import (
from libp2p.crypto.secp256k1 import (
create_new_key_pair,
)
from libp2p.security.insecure.transport import (
PLAINTEXT_PROTOCOL_ID,
InsecureTransport,
from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
)
@ -38,17 +38,19 @@ async def main():
# Create a host with the key pair and insecure transport
host = new_host(key_pair=key_pair, sec_opt=security_options)
# Configure the listening address
# Configure the listening address using the new paradigm
port = 8000
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
listen_addrs = get_available_interfaces(port)
optimal_addr = get_optimal_binding_address(port)
# Start the host
async with host.run(listen_addrs=[listen_addr]):
async with host.run(listen_addrs=listen_addrs):
print(
"libp2p has started with insecure transport "
"(not recommended for production)"
)
print("libp2p is listening on:", host.get_addrs())
print(f"Optimal address: {optimal_addr}")
# Keep the host running
await trio.sleep_forever()

View File

@ -1,6 +1,5 @@
import secrets
import multiaddr
import trio
from libp2p import (
@ -13,6 +12,10 @@ from libp2p.security.noise.transport import (
PROTOCOL_ID as NOISE_PROTOCOL_ID,
Transport as NoiseTransport,
)
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
)
async def main():
@ -39,14 +42,16 @@ async def main():
# Create a host with the key pair and Noise security
host = new_host(key_pair=key_pair, sec_opt=security_options)
# Configure the listening address
# Configure the listening address using the new paradigm
port = 8000
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
listen_addrs = get_available_interfaces(port)
optimal_addr = get_optimal_binding_address(port)
# Start the host
async with host.run(listen_addrs=[listen_addr]):
async with host.run(listen_addrs=listen_addrs):
print("libp2p has started with Noise encryption")
print("libp2p is listening on:", host.get_addrs())
print(f"Optimal address: {optimal_addr}")
# Keep the host running
await trio.sleep_forever()

View File

@ -1,6 +1,5 @@
import secrets
import multiaddr
import trio
from libp2p import (
@ -13,6 +12,10 @@ from libp2p.security.secio.transport import (
ID as SECIO_PROTOCOL_ID,
Transport as SecioTransport,
)
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
)
async def main():
@ -32,14 +35,16 @@ async def main():
# Create a host with the key pair and SECIO security
host = new_host(key_pair=key_pair, sec_opt=security_options)
# Configure the listening address
# Configure the listening address using the new paradigm
port = 8000
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
listen_addrs = get_available_interfaces(port)
optimal_addr = get_optimal_binding_address(port)
# Start the host
async with host.run(listen_addrs=[listen_addr]):
async with host.run(listen_addrs=listen_addrs):
print("libp2p has started with SECIO encryption")
print("libp2p is listening on:", host.get_addrs())
print(f"Optimal address: {optimal_addr}")
# Keep the host running
await trio.sleep_forever()

View File

@ -1,6 +1,5 @@
import secrets
import multiaddr
import trio
from libp2p import (
@ -13,6 +12,10 @@ from libp2p.security.noise.transport import (
PROTOCOL_ID as NOISE_PROTOCOL_ID,
Transport as NoiseTransport,
)
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
)
async def main():
@ -39,14 +42,16 @@ async def main():
# Create a host with the key pair, Noise security, and mplex multiplexer
host = new_host(key_pair=key_pair, sec_opt=security_options)
# Configure the listening address
# Configure the listening address using the new paradigm
port = 8000
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
listen_addrs = get_available_interfaces(port)
optimal_addr = get_optimal_binding_address(port)
# Start the host
async with host.run(listen_addrs=[listen_addr]):
async with host.run(listen_addrs=listen_addrs):
print("libp2p has started with Noise encryption and mplex multiplexing")
print("libp2p is listening on:", host.get_addrs())
print(f"Optimal address: {optimal_addr}")
# Keep the host running
await trio.sleep_forever()

View File

@ -38,6 +38,10 @@ from libp2p.network.stream.net_stream import (
from libp2p.peer.peerinfo import (
info_from_p2p_addr,
)
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
)
PROTOCOL_ID = TProtocol("/echo/1.0.0")
@ -173,7 +177,9 @@ async def run_enhanced_demo(
"""
Run enhanced echo demo with NetStream state management.
"""
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
# Use the new address paradigm
listen_addrs = get_available_interfaces(port)
optimal_addr = get_optimal_binding_address(port)
# Generate or use provided key
if seed:
@ -185,7 +191,7 @@ async def run_enhanced_demo(
host = new_host(key_pair=create_new_key_pair(secret))
async with host.run(listen_addrs=[listen_addr]):
async with host.run(listen_addrs=listen_addrs):
print(f"Host ID: {host.get_id().to_string()}")
print("=" * 60)
@ -196,10 +202,12 @@ async def run_enhanced_demo(
# type: ignore: Stream is type of NetStream
host.set_stream_handler(PROTOCOL_ID, enhanced_echo_handler)
# Use optimal address for client command
optimal_addr_with_peer = f"{optimal_addr}/p2p/{host.get_id().to_string()}"
print(
"Run client from another console:\n"
f"python3 example_net_stream.py "
f"-d {host.get_addrs()[0]}\n"
f"-d {optimal_addr_with_peer}\n"
)
print("Waiting for connections...")
print("Press Ctrl+C to stop server")
@ -226,7 +234,7 @@ async def run_enhanced_demo(
def main() -> None:
example_maddr = (
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
"/ip4/[HOST_IP]/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
)
parser = argparse.ArgumentParser(

View File

@ -1,6 +1,6 @@
import secrets
import multiaddr
from multiaddr import Multiaddr
import trio
from libp2p import (
@ -16,6 +16,10 @@ from libp2p.security.noise.transport import (
PROTOCOL_ID as NOISE_PROTOCOL_ID,
Transport as NoiseTransport,
)
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
)
async def main():
@ -42,14 +46,16 @@ async def main():
# Create a host with the key pair, Noise security, and mplex multiplexer
host = new_host(key_pair=key_pair, sec_opt=security_options)
# Configure the listening address
# Configure the listening address using the new paradigm
port = 8000
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
listen_addrs = get_available_interfaces(port)
optimal_addr = get_optimal_binding_address(port)
# Start the host
async with host.run(listen_addrs=[listen_addr]):
async with host.run(listen_addrs=listen_addrs):
print("libp2p has started")
print("libp2p is listening on:", host.get_addrs())
print(f"Optimal address: {optimal_addr}")
# Connect to bootstrap peers manually
bootstrap_list = [
@ -61,7 +67,7 @@ async def main():
for addr in bootstrap_list:
try:
peer_info = info_from_p2p_addr(multiaddr.Multiaddr(addr))
peer_info = info_from_p2p_addr(Multiaddr(addr))
await host.connect(peer_info)
print(f"Connected to {peer_info.peer_id.to_string()}")
except Exception as e:

View File

@ -1,6 +1,5 @@
import secrets
import multiaddr
import trio
from libp2p import (
@ -9,6 +8,10 @@ from libp2p import (
from libp2p.crypto.secp256k1 import (
create_new_key_pair,
)
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
)
async def main():
@ -19,14 +22,25 @@ async def main():
# Create a host with the key pair
host = new_host(key_pair=key_pair, enable_quic=True)
# Configure the listening address
# Configure the listening address using the new paradigm
port = 8000
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/udp/{port}/quic-v1")
listen_addrs = get_available_interfaces(port, protocol="udp")
# Convert TCP addresses to QUIC-v1 addresses
quic_addrs = []
for addr in listen_addrs:
addr_str = str(addr).replace("/tcp/", "/udp/") + "/quic-v1"
from multiaddr import Multiaddr
quic_addrs.append(Multiaddr(addr_str))
optimal_addr = get_optimal_binding_address(port, protocol="udp")
optimal_quic_str = str(optimal_addr).replace("/tcp/", "/udp/") + "/quic-v1"
# Start the host
async with host.run(listen_addrs=[listen_addr]):
async with host.run(listen_addrs=quic_addrs):
print("libp2p has started with QUIC transport")
print("libp2p is listening on:", host.get_addrs())
print(f"Optimal address: {optimal_quic_str}")
# Keep the host running
await trio.sleep_forever()

View File

@ -1,6 +1,5 @@
import secrets
import multiaddr
import trio
from libp2p import (
@ -13,6 +12,10 @@ from libp2p.security.noise.transport import (
PROTOCOL_ID as NOISE_PROTOCOL_ID,
Transport as NoiseTransport,
)
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
)
async def main():
@ -39,14 +42,16 @@ async def main():
# Create a host with the key pair, Noise security, and mplex multiplexer
host = new_host(key_pair=key_pair, sec_opt=security_options)
# Configure the listening address
# Configure the listening address using the new paradigm
port = 8000
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
listen_addrs = get_available_interfaces(port)
optimal_addr = get_optimal_binding_address(port)
# Start the host
async with host.run(listen_addrs=[listen_addr]):
async with host.run(listen_addrs=listen_addrs):
print("libp2p has started")
print("libp2p is listening on:", host.get_addrs())
print(f"Optimal address: {optimal_addr}")
# Keep the host running
await trio.sleep_forever()

View File

@ -1,6 +1,5 @@
import secrets
import multiaddr
import trio
from libp2p import (
@ -9,6 +8,10 @@ from libp2p import (
from libp2p.crypto.secp256k1 import (
create_new_key_pair,
)
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
)
async def main():
@ -19,14 +22,16 @@ async def main():
# Create a host with the key pair
host = new_host(key_pair=key_pair)
# Configure the listening address
# Configure the listening address using the new paradigm
port = 8000
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
listen_addrs = get_available_interfaces(port)
optimal_addr = get_optimal_binding_address(port)
# Start the host
async with host.run(listen_addrs=[listen_addr]):
async with host.run(listen_addrs=listen_addrs):
print("libp2p has started with TCP transport")
print("libp2p is listening on:", host.get_addrs())
print(f"Optimal address: {optimal_addr}")
# Keep the host running
await trio.sleep_forever()

View File

@ -7,6 +7,7 @@ This example shows how to:
2. Use different load balancing strategies
3. Access multiple connections through the new API
4. Maintain backward compatibility
5. Use the new address paradigm for network configuration
"""
import logging
@ -15,6 +16,7 @@ import trio
from libp2p import new_swarm
from libp2p.network.swarm import ConnectionConfig, RetryConfig
from libp2p.utils import get_available_interfaces, get_optimal_binding_address
# Set up logging
logging.basicConfig(level=logging.INFO)
@ -103,10 +105,45 @@ async def example_backward_compatibility() -> None:
logger.info("Backward compatibility example completed")
async def example_network_address_paradigm() -> None:
"""Example of using the new address paradigm with multiple connections."""
logger.info("Demonstrating network address paradigm...")
# Get available interfaces using the new paradigm
port = 8000 # Example port
available_interfaces = get_available_interfaces(port)
logger.info(f"Available interfaces: {available_interfaces}")
# Get optimal binding address
optimal_address = get_optimal_binding_address(port)
logger.info(f"Optimal binding address: {optimal_address}")
# Create connection config for multiple connections with network awareness
connection_config = ConnectionConfig(
max_connections_per_peer=3, load_balancing_strategy="round_robin"
)
# Create swarm with address paradigm
swarm = new_swarm(connection_config=connection_config)
logger.info("Network address paradigm features:")
logger.info(" - get_available_interfaces() for interface discovery")
logger.info(" - get_optimal_binding_address() for smart address selection")
logger.info(" - Multiple connections with proper network binding")
await swarm.close()
logger.info("Network address paradigm example completed")
async def example_production_ready_config() -> None:
"""Example of production-ready configuration."""
logger.info("Creating swarm with production-ready configuration...")
# Get optimal network configuration using the new paradigm
port = 8001 # Example port
optimal_address = get_optimal_binding_address(port)
logger.info(f"Using optimal binding address: {optimal_address}")
# Production-ready retry configuration
retry_config = RetryConfig(
max_retries=3, # Reasonable retry limit
@ -156,6 +193,9 @@ async def main() -> None:
await example_backward_compatibility()
logger.info("-" * 30)
await example_network_address_paradigm()
logger.info("-" * 30)
await example_production_ready_config()
logger.info("-" * 30)

View File

@ -1,4 +1,5 @@
import argparse
import logging
import random
import secrets
@ -26,8 +27,14 @@ from libp2p.peer.peerinfo import (
from libp2p.utils.address_validation import (
find_free_port,
get_available_interfaces,
get_optimal_binding_address,
)
# Configure minimal logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("multiaddr").setLevel(logging.WARNING)
logging.getLogger("libp2p").setLevel(logging.WARNING)
PROTOCOL_ID = TProtocol("/echo/1.0.0")
MAX_READ_LEN = 2**32 - 1
@ -76,9 +83,13 @@ async def run(port: int, destination: str, seed: int | None = None) -> None:
for addr in listen_addr:
print(f"{addr}/p2p/{peer_id}")
# Get optimal address for display
optimal_addr = get_optimal_binding_address(port)
optimal_addr_with_peer = f"{optimal_addr}/p2p/{peer_id}"
print(
"\nRun this from the same folder in another console:\n\n"
f"echo-demo -d {host.get_addrs()[0]}\n"
f"echo-demo -d {optimal_addr_with_peer}\n"
)
print("Waiting for incoming connections...")
await trio.sleep_forever()
@ -114,7 +125,7 @@ def main() -> None:
where <DESTINATION> is the multiaddress of the previous listener host.
"""
example_maddr = (
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
"/ip4/[HOST_IP]/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
)
parser = argparse.ArgumentParser(description=description)
parser.add_argument("-p", "--port", default=0, type=int, help="source port number")

View File

@ -20,6 +20,11 @@ from libp2p.custom_types import TProtocol
from libp2p.network.stream.net_stream import INetStream
from libp2p.peer.peerinfo import info_from_p2p_addr
# Configure minimal logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("multiaddr").setLevel(logging.WARNING)
logging.getLogger("libp2p").setLevel(logging.WARNING)
PROTOCOL_ID = TProtocol("/echo/1.0.0")
@ -38,7 +43,22 @@ async def _echo_stream_handler(stream: INetStream) -> None:
async def run_server(port: int, seed: int | None = None) -> None:
"""Run echo server with QUIC transport."""
listen_addr = Multiaddr(f"/ip4/0.0.0.0/udp/{port}/quic")
from libp2p.utils.address_validation import (
find_free_port,
get_available_interfaces,
get_optimal_binding_address,
)
if port <= 0:
port = find_free_port()
# For QUIC, we need UDP addresses - use the new address paradigm
tcp_addrs = get_available_interfaces(port)
# Convert TCP addresses to QUIC addresses
quic_addrs = []
for addr in tcp_addrs:
addr_str = str(addr).replace("/tcp/", "/udp/") + "/quic"
quic_addrs.append(Multiaddr(addr_str))
if seed:
import random
@ -58,15 +78,26 @@ async def run_server(port: int, seed: int | None = None) -> None:
)
# Server mode: start listener
async with host.run(listen_addrs=[listen_addr]):
async with host.run(listen_addrs=quic_addrs):
try:
print(f"I am {host.get_id().to_string()}")
host.set_stream_handler(PROTOCOL_ID, _echo_stream_handler)
# Get all available addresses with peer ID
all_addrs = host.get_addrs()
print("Listener ready, listening on:")
for addr in all_addrs:
print(f"{addr}")
# Use optimal address for the client command
optimal_tcp = get_optimal_binding_address(port)
optimal_quic_str = str(optimal_tcp).replace("/tcp/", "/udp/") + "/quic"
peer_id = host.get_id().to_string()
optimal_quic_with_peer = f"{optimal_quic_str}/p2p/{peer_id}"
print(
"Run this from the same folder in another console:\n\n"
f"python3 ./examples/echo/echo_quic.py "
f"-d {host.get_addrs()[0]}\n"
f"\nRun this from the same folder in another console:\n\n"
f"python3 ./examples/echo/echo_quic.py -d {optimal_quic_with_peer}\n"
)
print("Waiting for incoming QUIC connections...")
await trio.sleep_forever()
@ -148,7 +179,7 @@ def main() -> None:
where <DESTINATION> is the QUIC multiaddress of the previous listener host.
"""
example_maddr = "/ip4/127.0.0.1/udp/8000/quic/p2p/QmQn4SwGkDZKkUEpBRBv"
example_maddr = "/ip4/[HOST_IP]/udp/8000/quic/p2p/QmQn4SwGkDZKkUEpBRBv"
parser = argparse.ArgumentParser(description=description)
parser.add_argument("-p", "--port", default=0, type=int, help="UDP port number")
@ -173,6 +204,4 @@ def main() -> None:
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("aioquic").setLevel(logging.DEBUG)
main()

View File

@ -20,6 +20,11 @@ from libp2p.peer.peerinfo import (
info_from_p2p_addr,
)
# Configure minimal logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("multiaddr").setLevel(logging.WARNING)
logging.getLogger("libp2p").setLevel(logging.WARNING)
logger = logging.getLogger("libp2p.identity.identify-example")
@ -58,11 +63,19 @@ def print_identify_response(identify_response: Identify):
async def run(port: int, destination: str, use_varint_format: bool = True) -> None:
localhost_ip = "0.0.0.0"
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
)
if not destination:
# Create first host (listener)
listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}")
if port <= 0:
from libp2p.utils.address_validation import find_free_port
port = find_free_port()
listen_addrs = get_available_interfaces(port)
host_a = new_host()
# Set up identify handler with specified format
@ -73,25 +86,49 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No
host_a.set_stream_handler(IDENTIFY_PROTOCOL_ID, identify_handler)
async with (
host_a.run(listen_addrs=[listen_addr]),
host_a.run(listen_addrs=listen_addrs),
trio.open_nursery() as nursery,
):
# Start the peer-store cleanup task
nursery.start_soon(host_a.get_peerstore().start_cleanup_task, 60)
# Get the actual address and replace 0.0.0.0 with 127.0.0.1 for client
# connections
server_addr = str(host_a.get_addrs()[0])
client_addr = server_addr.replace("/ip4/0.0.0.0/", "/ip4/127.0.0.1/")
# Get all available addresses with peer ID
all_addrs = host_a.get_addrs()
format_name = "length-prefixed" if use_varint_format else "raw protobuf"
format_flag = "--raw-format" if not use_varint_format else ""
print(
f"First host listening (using {format_name} format). "
f"Run this from another console:\n\n"
f"identify-demo {format_flag} -d {client_addr}\n"
)
print("Waiting for incoming identify request...")
if use_varint_format:
format_name = "length-prefixed"
print(f"First host listening (using {format_name} format).")
print("Listener ready, listening on:\n")
for addr in all_addrs:
print(f"{addr}")
# Use optimal address for the client command
optimal_addr = get_optimal_binding_address(port)
optimal_addr_with_peer = (
f"{optimal_addr}/p2p/{host_a.get_id().to_string()}"
)
print(
f"\nRun this from the same folder in another console:\n\n"
f"identify-demo -d {optimal_addr_with_peer}\n"
)
print("Waiting for incoming identify request...")
else:
format_name = "raw protobuf"
print(f"First host listening (using {format_name} format).")
print("Listener ready, listening on:\n")
for addr in all_addrs:
print(f"{addr}")
# Use optimal address for the client command
optimal_addr = get_optimal_binding_address(port)
optimal_addr_with_peer = (
f"{optimal_addr}/p2p/{host_a.get_id().to_string()}"
)
print(
f"\nRun this from the same folder in another console:\n\n"
f"identify-demo -d {optimal_addr_with_peer}\n"
)
print("Waiting for incoming identify request...")
# Add a custom handler to show connection events
async def custom_identify_handler(stream):
@ -134,11 +171,20 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No
else:
# Create second host (dialer)
listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}")
from libp2p.utils.address_validation import (
find_free_port,
get_available_interfaces,
get_optimal_binding_address,
)
if port <= 0:
port = find_free_port()
listen_addrs = get_available_interfaces(port)
host_b = new_host()
async with (
host_b.run(listen_addrs=[listen_addr]),
host_b.run(listen_addrs=listen_addrs),
trio.open_nursery() as nursery,
):
# Start the peer-store cleanup task
@ -234,7 +280,7 @@ def main() -> None:
"""
example_maddr = (
"/ip4/127.0.0.1/tcp/8888/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
"/ip4/[HOST_IP]/tcp/8888/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
)
parser = argparse.ArgumentParser(description=description)
@ -258,7 +304,7 @@ def main() -> None:
# Determine format: use varint (length-prefixed) if --raw-format is specified,
# otherwise use raw protobuf format (old format)
use_varint_format = args.raw_format
use_varint_format = not args.raw_format
try:
if args.destination:

View File

@ -36,6 +36,9 @@ from libp2p.identity.identify_push import (
from libp2p.peer.peerinfo import (
info_from_p2p_addr,
)
from libp2p.utils.address_validation import (
get_available_interfaces,
)
# Configure logging
logger = logging.getLogger(__name__)
@ -207,13 +210,13 @@ async def main() -> None:
ID_PUSH, create_custom_identify_push_handler(host_2, "Host 2")
)
# Start listening on random ports using the run context manager
listen_addr_1 = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
listen_addr_2 = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
# Start listening on available interfaces using random ports
listen_addrs_1 = get_available_interfaces(0) # 0 for random port
listen_addrs_2 = get_available_interfaces(0) # 0 for random port
async with (
host_1.run([listen_addr_1]),
host_2.run([listen_addr_2]),
host_1.run(listen_addrs_1),
host_2.run(listen_addrs_2),
trio.open_nursery() as nursery,
):
# Start the peer-store cleanup task

View File

@ -14,7 +14,7 @@ Usage:
python identify_push_listener_dialer.py
# Then in another console, run as a dialer (default port 8889):
python identify_push_listener_dialer.py -d /ip4/127.0.0.1/tcp/8888/p2p/PEER_ID
python identify_push_listener_dialer.py -d /ip4/[HOST_IP]/tcp/8888/p2p/PEER_ID
(where PEER_ID is the peer ID displayed by the listener)
"""
@ -56,6 +56,11 @@ from libp2p.peer.peerinfo import (
info_from_p2p_addr,
)
# Configure minimal logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("multiaddr").setLevel(logging.WARNING)
logging.getLogger("libp2p").setLevel(logging.WARNING)
# Configure logging
logger = logging.getLogger("libp2p.identity.identify-push-example")
@ -194,6 +199,11 @@ async def run_listener(
port: int, use_varint_format: bool = True, raw_format_flag: bool = False
) -> None:
"""Run a host in listener mode."""
from libp2p.utils.address_validation import find_free_port, get_available_interfaces
if port <= 0:
port = find_free_port()
format_name = "length-prefixed" if use_varint_format else "raw protobuf"
print(
f"\n==== Starting Identify-Push Listener on port {port} "
@ -215,26 +225,33 @@ async def run_listener(
custom_identify_push_handler_for(host, use_varint_format=use_varint_format),
)
# Start listening
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
# Start listening on all available interfaces
listen_addrs = get_available_interfaces(port)
try:
async with host.run([listen_addr]):
addr = host.get_addrs()[0]
async with host.run(listen_addrs):
all_addrs = host.get_addrs()
logger.info("Listener host ready!")
print("Listener host ready!")
logger.info(f"Listening on: {addr}")
print(f"Listening on: {addr}")
logger.info("Listener ready, listening on:")
print("Listener ready, listening on:")
for addr in all_addrs:
logger.info(f"{addr}")
print(f"{addr}")
logger.info(f"Peer ID: {host.get_id().pretty()}")
print(f"Peer ID: {host.get_id().pretty()}")
print("\nRun dialer with command:")
# Use the first address as the default for the dialer command
default_addr = all_addrs[0]
print("\nRun this from the same folder in another console:")
if raw_format_flag:
print(f"identify-push-listener-dialer-demo -d {addr} --raw-format")
print(
f"identify-push-listener-dialer-demo -d {default_addr} --raw-format"
)
else:
print(f"identify-push-listener-dialer-demo -d {addr}")
print(f"identify-push-listener-dialer-demo -d {default_addr}")
print("\nWaiting for incoming identify/push requests... (Ctrl+C to exit)")
# Keep running until interrupted
@ -274,10 +291,12 @@ async def run_dialer(
identify_push_handler_for(host, use_varint_format=use_varint_format),
)
# Start listening on a different port
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
# Start listening on available interfaces
from libp2p.utils.address_validation import get_available_interfaces
async with host.run([listen_addr]):
listen_addrs = get_available_interfaces(port)
async with host.run(listen_addrs):
logger.info("Dialer host ready!")
print("Dialer host ready!")

View File

@ -150,26 +150,43 @@ async def run_node(
key_pair = create_new_key_pair(secrets.token_bytes(32))
host = new_host(key_pair=key_pair)
listen_addr = Multiaddr(f"/ip4/127.0.0.1/tcp/{port}")
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
)
listen_addrs = get_available_interfaces(port)
async with host.run(listen_addrs=listen_addrs), trio.open_nursery() as nursery:
# Start the peer-store cleanup task
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
peer_id = host.get_id().pretty()
addr_str = f"/ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}"
# Get all available addresses with peer ID
all_addrs = host.get_addrs()
logger.info("Listener ready, listening on:")
for addr in all_addrs:
logger.info(f"{addr}")
# Use optimal address for the bootstrap command
optimal_addr = get_optimal_binding_address(port)
optimal_addr_with_peer = f"{optimal_addr}/p2p/{host.get_id().to_string()}"
bootstrap_cmd = f"--bootstrap {optimal_addr_with_peer}"
logger.info("To connect to this node, use: %s", bootstrap_cmd)
await connect_to_bootstrap_nodes(host, bootstrap_nodes)
dht = KadDHT(host, dht_mode)
# take all peer ids from the host and add them to the dht
for peer_id in host.get_peerstore().peer_ids():
await dht.routing_table.add_peer(peer_id)
logger.info(f"Connected to bootstrap nodes: {host.get_connected_peers()}")
bootstrap_cmd = f"--bootstrap {addr_str}"
logger.info("To connect to this node, use: %s", bootstrap_cmd)
# Save server address in server mode
if dht_mode == DHTMode.SERVER:
save_server_addr(addr_str)
save_server_addr(str(optimal_addr_with_peer))
# Start the DHT service
async with background_trio_service(dht):

View File

@ -2,7 +2,6 @@ import argparse
import logging
import secrets
import multiaddr
import trio
from libp2p import (
@ -14,6 +13,11 @@ from libp2p.crypto.secp256k1 import (
)
from libp2p.discovery.events.peerDiscovery import peerDiscovery
# Configure minimal logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("multiaddr").setLevel(logging.WARNING)
logging.getLogger("libp2p").setLevel(logging.WARNING)
logger = logging.getLogger("libp2p.discovery.mdns")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
@ -22,34 +26,43 @@ handler.setFormatter(
)
logger.addHandler(handler)
# Set root logger to DEBUG to capture all logs from dependencies
logging.getLogger().setLevel(logging.DEBUG)
def onPeerDiscovery(peerinfo: PeerInfo):
logger.info(f"Discovered: {peerinfo.peer_id}")
async def run(port: int) -> None:
from libp2p.utils.address_validation import find_free_port, get_available_interfaces
if port <= 0:
port = find_free_port()
secret = secrets.token_bytes(32)
key_pair = create_new_key_pair(secret)
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
listen_addrs = get_available_interfaces(port)
peerDiscovery.register_peer_discovered_handler(onPeerDiscovery)
print(
"Run this from the same folder in another console to "
"start another peer on a different port:\n\n"
"mdns-demo -p <ANOTHER_PORT>\n"
)
print("Waiting for mDNS peer discovery events...\n")
logger.info("Starting peer Discovery")
host = new_host(key_pair=key_pair, enable_mDNS=True)
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
async with host.run(listen_addrs=listen_addrs), trio.open_nursery() as nursery:
# Start the peer-store cleanup task
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
# Get all available addresses with peer ID
all_addrs = host.get_addrs()
print("Listener ready, listening on:")
for addr in all_addrs:
print(f"{addr}")
print(
"\nRun this from the same folder in another console to "
"start another peer on a different port:\n\n"
"mdns-demo -p <ANOTHER_PORT>\n"
)
print("Waiting for mDNS peer discovery events...\n")
await trio.sleep_forever()

View File

@ -1,4 +1,5 @@
import argparse
import logging
import multiaddr
import trio
@ -16,6 +17,11 @@ from libp2p.peer.peerinfo import (
info_from_p2p_addr,
)
# Configure minimal logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("multiaddr").setLevel(logging.WARNING)
logging.getLogger("libp2p").setLevel(logging.WARNING)
PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0")
PING_LENGTH = 32
RESP_TIMEOUT = 60
@ -55,20 +61,38 @@ async def send_ping(stream: INetStream) -> None:
async def run(port: int, destination: str) -> None:
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
host = new_host(listen_addrs=[listen_addr])
from libp2p.utils.address_validation import (
find_free_port,
get_available_interfaces,
get_optimal_binding_address,
)
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
if port <= 0:
port = find_free_port()
listen_addrs = get_available_interfaces(port)
host = new_host(listen_addrs=listen_addrs)
async with host.run(listen_addrs=listen_addrs), trio.open_nursery() as nursery:
# Start the peer-store cleanup task
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
if not destination:
host.set_stream_handler(PING_PROTOCOL_ID, handle_ping)
# Get all available addresses with peer ID
all_addrs = host.get_addrs()
print("Listener ready, listening on:\n")
for addr in all_addrs:
print(f"{addr}")
# Use optimal address for the client command
optimal_addr = get_optimal_binding_address(port)
optimal_addr_with_peer = f"{optimal_addr}/p2p/{host.get_id().to_string()}"
print(
"Run this from the same folder in another console:\n\n"
f"ping-demo "
f"-d {host.get_addrs()[0]}\n"
f"\nRun this from the same folder in another console:\n\n"
f"ping-demo -d {optimal_addr_with_peer}\n"
)
print("Waiting for incoming connection...")
@ -94,7 +118,7 @@ def main() -> None:
"""
example_maddr = (
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
"/ip4/[HOST_IP]/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
)
parser = argparse.ArgumentParser(description=description)

View File

@ -102,14 +102,16 @@ async def monitor_peer_topics(pubsub, nursery, termination_event):
async def run(topic: str, destination: str | None, port: int | None) -> None:
# Initialize network settings
localhost_ip = "127.0.0.1"
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
)
if port is None or port == 0:
port = find_free_port()
logger.info(f"Using random available port: {port}")
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
listen_addrs = get_available_interfaces(port)
# Create a new libp2p host
host = new_host(
@ -138,12 +140,11 @@ async def run(topic: str, destination: str | None, port: int | None) -> None:
pubsub = Pubsub(host, gossipsub)
termination_event = trio.Event() # Event to signal termination
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
async with host.run(listen_addrs=listen_addrs), trio.open_nursery() as nursery:
# Start the peer-store cleanup task
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
logger.info(f"Node started with peer ID: {host.get_id()}")
logger.info(f"Listening on: {listen_addr}")
logger.info("Initializing PubSub and GossipSub...")
async with background_trio_service(pubsub):
async with background_trio_service(gossipsub):
@ -157,10 +158,21 @@ async def run(topic: str, destination: str | None, port: int | None) -> None:
if not destination:
# Server mode
# Get all available addresses with peer ID
all_addrs = host.get_addrs()
logger.info("Listener ready, listening on:")
for addr in all_addrs:
logger.info(f"{addr}")
# Use optimal address for the client command
optimal_addr = get_optimal_binding_address(port)
optimal_addr_with_peer = (
f"{optimal_addr}/p2p/{host.get_id().to_string()}"
)
logger.info(
"Run this script in another console with:\n"
f"pubsub-demo "
f"-d /ip4/{localhost_ip}/tcp/{port}/p2p/{host.get_id()}\n"
f"\nRun this from the same folder in another console:\n\n"
f"pubsub-demo -d {optimal_addr_with_peer}\n"
)
logger.info("Waiting for peers...")
@ -182,11 +194,6 @@ async def run(topic: str, destination: str | None, port: int | None) -> None:
f"Connecting to peer: {info.peer_id} "
f"using protocols: {protocols_in_maddr}"
)
logger.info(
"Run this script in another console with:\n"
f"pubsub-demo "
f"-d /ip4/{localhost_ip}/tcp/{port}/p2p/{host.get_id()}\n"
)
try:
await host.connect(info)
logger.info(f"Connected to peer: {info.peer_id}")

View File

@ -16,7 +16,6 @@ import random
import secrets
import sys
from multiaddr import Multiaddr
import trio
from libp2p import new_host
@ -130,16 +129,24 @@ async def run_node(port: int, mode: str, demo_interval: int = 30) -> None:
# Create host and DHT
key_pair = create_new_key_pair(secrets.token_bytes(32))
host = new_host(key_pair=key_pair, bootstrap=DEFAULT_BOOTSTRAP_NODES)
listen_addr = Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
from libp2p.utils.address_validation import get_available_interfaces
listen_addrs = get_available_interfaces(port)
async with host.run(listen_addrs=listen_addrs), trio.open_nursery() as nursery:
# Start maintenance tasks
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
nursery.start_soon(maintain_connections, host)
peer_id = host.get_id().pretty()
logger.info(f"Node peer ID: {peer_id}")
logger.info(f"Node address: /ip4/0.0.0.0/tcp/{port}/p2p/{peer_id}")
# Get all available addresses with peer ID
all_addrs = host.get_addrs()
logger.info("Listener ready, listening on:")
for addr in all_addrs:
logger.info(f"{addr}")
# Create and start DHT with Random Walk enabled
dht = KadDHT(host, dht_mode, enable_random_walk=True)

View File

@ -901,7 +901,7 @@ class QUICListener(IListener):
# Set socket options
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, "SO_REUSEPORT"):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) # type: ignore[attr-defined]
# Bind to address
await sock.bind((host, port))

View File

@ -3,38 +3,24 @@ from __future__ import annotations
import socket
from multiaddr import Multiaddr
try:
from multiaddr.utils import ( # type: ignore
get_network_addrs,
get_thin_waist_addresses,
)
_HAS_THIN_WAIST = True
except ImportError: # pragma: no cover - only executed in older environments
_HAS_THIN_WAIST = False
get_thin_waist_addresses = None # type: ignore
get_network_addrs = None # type: ignore
from multiaddr.utils import get_network_addrs, get_thin_waist_addresses
def _safe_get_network_addrs(ip_version: int) -> list[str]:
"""
Internal safe wrapper. Returns a list of IP addresses for the requested IP version.
Falls back to minimal defaults when Thin Waist helpers are missing.
:param ip_version: 4 or 6
"""
if _HAS_THIN_WAIST and get_network_addrs:
try:
return get_network_addrs(ip_version) or []
except Exception: # pragma: no cover - defensive
return []
# Fallback behavior (very conservative)
if ip_version == 4:
return ["127.0.0.1"]
if ip_version == 6:
return ["::1"]
return []
try:
return get_network_addrs(ip_version) or []
except Exception: # pragma: no cover - defensive
# Fallback behavior (very conservative)
if ip_version == 4:
return ["127.0.0.1"]
if ip_version == 6:
return ["::1"]
return []
def find_free_port() -> int:
@ -47,16 +33,13 @@ def find_free_port() -> int:
def _safe_expand(addr: Multiaddr, port: int | None = None) -> list[Multiaddr]:
"""
Internal safe expansion wrapper. Returns a list of Multiaddr objects.
If Thin Waist isn't available, returns [addr] (identity).
"""
if _HAS_THIN_WAIST and get_thin_waist_addresses:
try:
if port is not None:
return get_thin_waist_addresses(addr, port=port) or []
return get_thin_waist_addresses(addr) or []
except Exception: # pragma: no cover - defensive
return [addr]
return [addr]
try:
if port is not None:
return get_thin_waist_addresses(addr, port=port) or []
return get_thin_waist_addresses(addr) or []
except Exception: # pragma: no cover - defensive
return [addr]
def get_available_interfaces(port: int, protocol: str = "tcp") -> list[Multiaddr]:
@ -73,8 +56,9 @@ def get_available_interfaces(port: int, protocol: str = "tcp") -> list[Multiaddr
seen_v4: set[str] = set()
for ip in _safe_get_network_addrs(4):
seen_v4.add(ip)
addrs.append(Multiaddr(f"/ip4/{ip}/{protocol}/{port}"))
if ip not in seen_v4: # Avoid duplicates
seen_v4.add(ip)
addrs.append(Multiaddr(f"/ip4/{ip}/{protocol}/{port}"))
# Ensure IPv4 loopback is always included when IPv4 interfaces are discovered
if seen_v4 and "127.0.0.1" not in seen_v4:
@ -89,8 +73,9 @@ def get_available_interfaces(port: int, protocol: str = "tcp") -> list[Multiaddr
#
# seen_v6: set[str] = set()
# for ip in _safe_get_network_addrs(6):
# seen_v6.add(ip)
# addrs.append(Multiaddr(f"/ip6/{ip}/{protocol}/{port}"))
# if ip not in seen_v6: # Avoid duplicates
# seen_v6.add(ip)
# addrs.append(Multiaddr(f"/ip6/{ip}/{protocol}/{port}"))
#
# # Always include IPv6 loopback for testing purposes when IPv6 is available
# # This ensures IPv6 functionality can be tested even without global IPv6 addresses
@ -99,7 +84,7 @@ def get_available_interfaces(port: int, protocol: str = "tcp") -> list[Multiaddr
# Fallback if nothing discovered
if not addrs:
addrs.append(Multiaddr(f"/ip4/0.0.0.0/{protocol}/{port}"))
addrs.append(Multiaddr(f"/ip4/127.0.0.1/{protocol}/{port}"))
return addrs
@ -120,6 +105,20 @@ def expand_wildcard_address(
return expanded
def get_wildcard_address(port: int, protocol: str = "tcp") -> Multiaddr:
"""
Get wildcard address (0.0.0.0) when explicitly needed.
This function provides access to wildcard binding as a feature when
explicitly required, preserving the ability to bind to all interfaces.
:param port: Port number.
:param protocol: Transport protocol.
:return: A Multiaddr with wildcard binding (0.0.0.0).
"""
return Multiaddr(f"/ip4/0.0.0.0/{protocol}/{port}")
def get_optimal_binding_address(port: int, protocol: str = "tcp") -> Multiaddr:
"""
Choose an optimal address for an example to bind to:
@ -148,13 +147,14 @@ def get_optimal_binding_address(port: int, protocol: str = "tcp") -> Multiaddr:
if "/ip4/127." in str(c) or "/ip6/::1" in str(c):
return c
# As a final fallback, produce a wildcard
return Multiaddr(f"/ip4/0.0.0.0/{protocol}/{port}")
# As a final fallback, produce a loopback address
return Multiaddr(f"/ip4/127.0.0.1/{protocol}/{port}")
__all__ = [
"get_available_interfaces",
"get_optimal_binding_address",
"get_wildcard_address",
"expand_wildcard_address",
"find_free_port",
]

View File

@ -0,0 +1,2 @@
Updated all example scripts and core modules to use secure loopback addresses instead of wildcard addresses for network binding.
The `get_wildcard_address` function and related logic now utilize all available interfaces safely, improving security and consistency across the codebase.

View File

@ -65,7 +65,7 @@ async def test_prune_backoff():
@pytest.mark.trio
async def test_unsubscribe_backoff():
async with PubsubFactory.create_batch_with_gossipsub(
2, heartbeat_interval=1, prune_back_off=1, unsubscribe_back_off=2
2, heartbeat_interval=0.5, prune_back_off=2, unsubscribe_back_off=4
) as pubsubs:
gsub0 = pubsubs[0].router
gsub1 = pubsubs[1].router
@ -107,7 +107,8 @@ async def test_unsubscribe_backoff():
)
# try to graft again (should succeed after backoff)
await trio.sleep(1)
# Wait longer than unsubscribe_back_off (4 seconds) + some buffer
await trio.sleep(4.5)
await gsub0.emit_graft(topic, host_1.get_id())
await trio.sleep(1)
assert host_0.get_id() in gsub1.mesh[topic], (

View File

@ -0,0 +1,117 @@
"""
Tests to verify that all examples use the new address paradigm consistently
"""
from pathlib import Path
class TestExamplesAddressParadigm:
"""Test suite to verify all examples use the new address paradigm consistently"""
def get_example_files(self):
"""Get all Python files in the examples directory"""
examples_dir = Path("examples")
return list(examples_dir.rglob("*.py"))
def check_file_for_wildcard_binding(self, filepath):
"""Check if a file contains 0.0.0.0 binding"""
with open(filepath, encoding="utf-8") as f:
content = f.read()
# Check for various forms of wildcard binding
wildcard_patterns = [
"0.0.0.0",
"/ip4/0.0.0.0/",
]
found_wildcards = []
for line_num, line in enumerate(content.splitlines(), 1):
for pattern in wildcard_patterns:
if pattern in line and not line.strip().startswith("#"):
found_wildcards.append((line_num, line.strip()))
return found_wildcards
def test_examples_use_address_paradigm(self):
"""Test that examples use the new address paradigm functions"""
example_files = self.get_example_files()
# Files that should use the new paradigm
networking_examples = [
"echo/echo.py",
"chat/chat.py",
"ping/ping.py",
"bootstrap/bootstrap.py",
"pubsub/pubsub.py",
"identify/identify.py",
]
paradigm_functions = [
"get_available_interfaces",
"get_optimal_binding_address",
]
for filename in networking_examples:
filepath = None
for example_file in example_files:
if filename in str(example_file):
filepath = example_file
break
if filepath is None:
continue
with open(filepath, encoding="utf-8") as f:
content = f.read()
# Check that the file uses the new paradigm functions
for func in paradigm_functions:
assert func in content, (
f"{filepath} should use {func} from the new address paradigm"
)
def test_wildcard_available_as_feature(self):
"""Test that wildcard is available as a feature when needed"""
example_files = self.get_example_files()
# Check that network_discover.py demonstrates wildcard usage
network_discover_file = None
for example_file in example_files:
if "network_discover.py" in str(example_file):
network_discover_file = example_file
break
if network_discover_file:
with open(network_discover_file, encoding="utf-8") as f:
content = f.read()
# Should demonstrate wildcard expansion
assert "0.0.0.0" in content, (
f"{network_discover_file} should demonstrate wildcard usage"
)
assert "expand_wildcard_address" in content, (
f"{network_discover_file} should use expand_wildcard_address"
)
def test_doc_examples_use_paradigm(self):
"""Test that documentation examples use the new address paradigm"""
doc_examples_dir = Path("examples/doc-examples")
if not doc_examples_dir.exists():
return
doc_example_files = list(doc_examples_dir.glob("*.py"))
paradigm_functions = [
"get_available_interfaces",
"get_optimal_binding_address",
]
for filepath in doc_example_files:
with open(filepath, encoding="utf-8") as f:
content = f.read()
# Check that doc examples use the new paradigm
for func in paradigm_functions:
assert func in content, (
f"Documentation example {filepath} should use {func}"
)

View File

@ -0,0 +1,206 @@
"""
Tests for the new address paradigm with wildcard support as a feature
"""
import pytest
from multiaddr import Multiaddr
from libp2p import new_host
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
get_wildcard_address,
)
class TestAddressParadigm:
"""
Test suite for verifying the new address paradigm:
- get_available_interfaces() returns all available interfaces
- get_optimal_binding_address() returns optimal address for examples
- get_wildcard_address() provides wildcard as a feature when needed
"""
def test_wildcard_address_function(self):
"""Test that get_wildcard_address() provides wildcard as a feature"""
port = 8000
addr = get_wildcard_address(port)
# Should return wildcard address when explicitly requested
assert "0.0.0.0" in str(addr)
addr_str = str(addr)
assert "/ip4/" in addr_str
assert f"/tcp/{port}" in addr_str
def test_optimal_binding_address_selection(self):
"""Test that optimal binding address uses good heuristics"""
port = 8000
addr = get_optimal_binding_address(port)
# Should return a valid IP address (could be loopback or local network)
addr_str = str(addr)
assert "/ip4/" in addr_str
assert f"/tcp/{port}" in addr_str
# Should be from available interfaces
available_interfaces = get_available_interfaces(port)
assert addr in available_interfaces
def test_available_interfaces_includes_loopback(self):
"""Test that available interfaces always includes loopback address"""
port = 8000
interfaces = get_available_interfaces(port)
# Should have at least one interface
assert len(interfaces) > 0
# Should include loopback address
loopback_found = any("127.0.0.1" in str(addr) for addr in interfaces)
assert loopback_found, "Loopback address not found in available interfaces"
# Available interfaces should not include wildcard by default
# (wildcard is available as a feature through get_wildcard_address())
wildcard_found = any("0.0.0.0" in str(addr) for addr in interfaces)
assert not wildcard_found, (
"Wildcard should not be in default available interfaces"
)
def test_host_default_listen_address(self):
"""Test that new hosts use secure default addresses"""
# Create a host with a specific port
port = 8000
listen_addr = Multiaddr(f"/ip4/127.0.0.1/tcp/{port}")
host = new_host(listen_addrs=[listen_addr])
# Verify the host configuration
assert host is not None
# Note: We can't test actual binding without running the host,
# but we've verified the address format is correct
def test_paradigm_consistency(self):
"""Test that the address paradigm is consistent"""
port = 8000
# get_optimal_binding_address should return a valid address
optimal_addr = get_optimal_binding_address(port)
assert "/ip4/" in str(optimal_addr)
assert f"/tcp/{port}" in str(optimal_addr)
# get_wildcard_address should return wildcard when explicitly needed
wildcard_addr = get_wildcard_address(port)
assert "0.0.0.0" in str(wildcard_addr)
assert f"/tcp/{port}" in str(wildcard_addr)
# Both should be valid Multiaddr objects
assert isinstance(optimal_addr, Multiaddr)
assert isinstance(wildcard_addr, Multiaddr)
@pytest.mark.parametrize("protocol", ["tcp", "udp"])
def test_different_protocols_support(self, protocol):
"""Test that different protocols are supported by the paradigm"""
port = 8000
# Test optimal address with different protocols
optimal_addr = get_optimal_binding_address(port, protocol=protocol)
assert protocol in str(optimal_addr)
assert f"/{protocol}/{port}" in str(optimal_addr)
# Test wildcard address with different protocols
wildcard_addr = get_wildcard_address(port, protocol=protocol)
assert "0.0.0.0" in str(wildcard_addr)
assert protocol in str(wildcard_addr)
assert f"/{protocol}/{port}" in str(wildcard_addr)
# Test available interfaces with different protocols
interfaces = get_available_interfaces(port, protocol=protocol)
assert len(interfaces) > 0
for addr in interfaces:
assert protocol in str(addr)
def test_wildcard_available_as_feature(self):
"""Test that wildcard binding is available as a feature when needed"""
port = 8000
# Wildcard should be available through get_wildcard_address()
wildcard_addr = get_wildcard_address(port)
assert "0.0.0.0" in str(wildcard_addr)
# But should not be in default available interfaces
interfaces = get_available_interfaces(port)
wildcard_in_interfaces = any("0.0.0.0" in str(addr) for addr in interfaces)
assert not wildcard_in_interfaces, (
"Wildcard should not be in default interfaces"
)
# Optimal address should not be wildcard by default
optimal = get_optimal_binding_address(port)
assert "0.0.0.0" not in str(optimal), (
"Optimal address should not be wildcard by default"
)
def test_loopback_is_always_available(self):
"""Test that loopback address is always available as an option"""
port = 8000
interfaces = get_available_interfaces(port)
# Loopback should always be available
loopback_addrs = [addr for addr in interfaces if "127.0.0.1" in str(addr)]
assert len(loopback_addrs) > 0, "Loopback address should always be available"
# At least one loopback address should have the correct port
loopback_with_port = [
addr for addr in loopback_addrs if f"/tcp/{port}" in str(addr)
]
assert len(loopback_with_port) > 0, (
f"Loopback address with port {port} should be available"
)
def test_optimal_address_selection_behavior(self):
"""Test that optimal address selection works correctly"""
port = 8000
interfaces = get_available_interfaces(port)
optimal = get_optimal_binding_address(port)
# Should return one of the available interfaces
optimal_str = str(optimal)
interface_strs = [str(addr) for addr in interfaces]
assert optimal_str in interface_strs, (
f"Optimal address {optimal_str} should be in available interfaces"
)
# Should prefer non-loopback when available, fallback to loopback
non_loopback_interfaces = [
addr for addr in interfaces if "127.0.0.1" not in str(addr)
]
if non_loopback_interfaces:
# Should prefer non-loopback when available
assert "127.0.0.1" not in str(optimal), (
"Should prefer non-loopback when available"
)
else:
# Should use loopback when no other interfaces available
assert "127.0.0.1" in str(optimal), (
"Should use loopback when no other interfaces available"
)
def test_address_paradigm_completeness(self):
"""Test that the address paradigm provides all necessary functionality"""
port = 8000
# Test that we get interface options
interfaces = get_available_interfaces(port)
assert len(interfaces) >= 1, "Should have at least one interface"
# Test that loopback is always included
has_loopback = any("127.0.0.1" in str(addr) for addr in interfaces)
assert has_loopback, "Loopback should always be available"
# Test that wildcard is available as a feature
wildcard_addr = get_wildcard_address(port)
assert "0.0.0.0" in str(wildcard_addr)
# Test optimal selection
optimal = get_optimal_binding_address(port)
assert optimal in interfaces, (
"Optimal address should be from available interfaces"
)