Merge branch 'main' into enhancement/yamuxstream-lock

This commit is contained in:
Manu Sheel Gupta
2025-09-23 00:43:22 +05:30
committed by GitHub
61 changed files with 6444 additions and 223 deletions

View File

@ -65,7 +65,7 @@ async def test_prune_backoff():
@pytest.mark.trio
async def test_unsubscribe_backoff():
async with PubsubFactory.create_batch_with_gossipsub(
2, heartbeat_interval=1, prune_back_off=1, unsubscribe_back_off=2
2, heartbeat_interval=0.5, prune_back_off=2, unsubscribe_back_off=4
) as pubsubs:
gsub0 = pubsubs[0].router
gsub1 = pubsubs[1].router
@ -107,7 +107,8 @@ async def test_unsubscribe_backoff():
)
# try to graft again (should succeed after backoff)
await trio.sleep(1)
# Wait longer than unsubscribe_back_off (4 seconds) + some buffer
await trio.sleep(4.5)
await gsub0.emit_graft(topic, host_1.get_id())
await trio.sleep(1)
assert host_0.get_id() in gsub1.mesh[topic], (

View File

@ -0,0 +1,324 @@
"""
Tests for the transport registry functionality.
"""
from multiaddr import Multiaddr
from libp2p.abc import IListener, IRawConnection, ITransport
from libp2p.custom_types import THandler
from libp2p.transport.tcp.tcp import TCP
from libp2p.transport.transport_registry import (
TransportRegistry,
create_transport_for_multiaddr,
get_supported_transport_protocols,
get_transport_registry,
register_transport,
)
from libp2p.transport.upgrader import TransportUpgrader
from libp2p.transport.websocket.transport import WebsocketTransport
class TestTransportRegistry:
"""Test the TransportRegistry class."""
def test_init(self):
"""Test registry initialization."""
registry = TransportRegistry()
assert isinstance(registry, TransportRegistry)
# Check that default transports are registered
supported = registry.get_supported_protocols()
assert "tcp" in supported
assert "ws" in supported
def test_register_transport(self):
"""Test transport registration."""
registry = TransportRegistry()
# Register a custom transport
class CustomTransport(ITransport):
async def dial(self, maddr: Multiaddr) -> IRawConnection:
raise NotImplementedError("CustomTransport dial not implemented")
def create_listener(self, handler_function: THandler) -> IListener:
raise NotImplementedError(
"CustomTransport create_listener not implemented"
)
registry.register_transport("custom", CustomTransport)
assert registry.get_transport("custom") == CustomTransport
def test_get_transport(self):
"""Test getting registered transports."""
registry = TransportRegistry()
# Test existing transports
assert registry.get_transport("tcp") == TCP
assert registry.get_transport("ws") == WebsocketTransport
# Test non-existent transport
assert registry.get_transport("nonexistent") is None
def test_get_supported_protocols(self):
"""Test getting supported protocols."""
registry = TransportRegistry()
protocols = registry.get_supported_protocols()
assert isinstance(protocols, list)
assert "tcp" in protocols
assert "ws" in protocols
def test_create_transport_tcp(self):
"""Test creating TCP transport."""
registry = TransportRegistry()
upgrader = TransportUpgrader({}, {})
transport = registry.create_transport("tcp", upgrader)
assert isinstance(transport, TCP)
def test_create_transport_websocket(self):
"""Test creating WebSocket transport."""
registry = TransportRegistry()
upgrader = TransportUpgrader({}, {})
transport = registry.create_transport("ws", upgrader)
assert isinstance(transport, WebsocketTransport)
def test_create_transport_invalid_protocol(self):
"""Test creating transport with invalid protocol."""
registry = TransportRegistry()
upgrader = TransportUpgrader({}, {})
transport = registry.create_transport("invalid", upgrader)
assert transport is None
def test_create_transport_websocket_no_upgrader(self):
"""Test that WebSocket transport requires upgrader."""
registry = TransportRegistry()
# This should fail gracefully and return None
transport = registry.create_transport("ws", None)
assert transport is None
class TestGlobalRegistry:
"""Test the global registry functions."""
def test_get_transport_registry(self):
"""Test getting the global registry."""
registry = get_transport_registry()
assert isinstance(registry, TransportRegistry)
def test_register_transport_global(self):
"""Test registering transport globally."""
class GlobalCustomTransport(ITransport):
async def dial(self, maddr: Multiaddr) -> IRawConnection:
raise NotImplementedError("GlobalCustomTransport dial not implemented")
def create_listener(self, handler_function: THandler) -> IListener:
raise NotImplementedError(
"GlobalCustomTransport create_listener not implemented"
)
# Register globally
register_transport("global_custom", GlobalCustomTransport)
# Check that it's available
registry = get_transport_registry()
assert registry.get_transport("global_custom") == GlobalCustomTransport
def test_get_supported_transport_protocols_global(self):
"""Test getting supported protocols from global registry."""
protocols = get_supported_transport_protocols()
assert isinstance(protocols, list)
assert "tcp" in protocols
assert "ws" in protocols
class TestTransportFactory:
"""Test the transport factory functions."""
def test_create_transport_for_multiaddr_tcp(self):
"""Test creating transport for TCP multiaddr."""
upgrader = TransportUpgrader({}, {})
# TCP multiaddr
maddr = Multiaddr("/ip4/127.0.0.1/tcp/8080")
transport = create_transport_for_multiaddr(maddr, upgrader)
assert transport is not None
assert isinstance(transport, TCP)
def test_create_transport_for_multiaddr_websocket(self):
"""Test creating transport for WebSocket multiaddr."""
upgrader = TransportUpgrader({}, {})
# WebSocket multiaddr
maddr = Multiaddr("/ip4/127.0.0.1/tcp/8080/ws")
transport = create_transport_for_multiaddr(maddr, upgrader)
assert transport is not None
assert isinstance(transport, WebsocketTransport)
def test_create_transport_for_multiaddr_websocket_secure(self):
"""Test creating transport for WebSocket multiaddr."""
upgrader = TransportUpgrader({}, {})
# WebSocket multiaddr
maddr = Multiaddr("/ip4/127.0.0.1/tcp/8080/ws")
transport = create_transport_for_multiaddr(maddr, upgrader)
assert transport is not None
assert isinstance(transport, WebsocketTransport)
def test_create_transport_for_multiaddr_ipv6(self):
"""Test creating transport for IPv6 multiaddr."""
upgrader = TransportUpgrader({}, {})
# IPv6 WebSocket multiaddr
maddr = Multiaddr("/ip6/::1/tcp/8080/ws")
transport = create_transport_for_multiaddr(maddr, upgrader)
assert transport is not None
assert isinstance(transport, WebsocketTransport)
def test_create_transport_for_multiaddr_dns(self):
"""Test creating transport for DNS multiaddr."""
upgrader = TransportUpgrader({}, {})
# DNS WebSocket multiaddr
maddr = Multiaddr("/dns4/example.com/tcp/443/ws")
transport = create_transport_for_multiaddr(maddr, upgrader)
assert transport is not None
assert isinstance(transport, WebsocketTransport)
def test_create_transport_for_multiaddr_unknown(self):
"""Test creating transport for unknown multiaddr."""
upgrader = TransportUpgrader({}, {})
# Unknown multiaddr
maddr = Multiaddr("/ip4/127.0.0.1/udp/8080")
transport = create_transport_for_multiaddr(maddr, upgrader)
assert transport is None
def test_create_transport_for_multiaddr_with_upgrader(self):
"""Test creating transport with upgrader."""
upgrader = TransportUpgrader({}, {})
# This should work for both TCP and WebSocket with upgrader
maddr_tcp = Multiaddr("/ip4/127.0.0.1/tcp/8080")
transport_tcp = create_transport_for_multiaddr(maddr_tcp, upgrader)
assert transport_tcp is not None
maddr_ws = Multiaddr("/ip4/127.0.0.1/tcp/8080/ws")
transport_ws = create_transport_for_multiaddr(maddr_ws, upgrader)
assert transport_ws is not None
class TestTransportInterfaceCompliance:
"""Test that all transports implement the required interface."""
def test_tcp_implements_itransport(self):
"""Test that TCP transport implements ITransport."""
transport = TCP()
assert isinstance(transport, ITransport)
assert hasattr(transport, "dial")
assert hasattr(transport, "create_listener")
assert callable(transport.dial)
assert callable(transport.create_listener)
def test_websocket_implements_itransport(self):
"""Test that WebSocket transport implements ITransport."""
upgrader = TransportUpgrader({}, {})
transport = WebsocketTransport(upgrader)
assert isinstance(transport, ITransport)
assert hasattr(transport, "dial")
assert hasattr(transport, "create_listener")
assert callable(transport.dial)
assert callable(transport.create_listener)
class TestErrorHandling:
"""Test error handling in the transport registry."""
def test_create_transport_with_exception(self):
"""Test handling of transport creation exceptions."""
registry = TransportRegistry()
upgrader = TransportUpgrader({}, {})
# Register a transport that raises an exception
class ExceptionTransport(ITransport):
def __init__(self, *args, **kwargs):
raise RuntimeError("Transport creation failed")
async def dial(self, maddr: Multiaddr) -> IRawConnection:
raise NotImplementedError("ExceptionTransport dial not implemented")
def create_listener(self, handler_function: THandler) -> IListener:
raise NotImplementedError(
"ExceptionTransport create_listener not implemented"
)
registry.register_transport("exception", ExceptionTransport)
# Should handle exception gracefully and return None
transport = registry.create_transport("exception", upgrader)
assert transport is None
def test_invalid_multiaddr_handling(self):
"""Test handling of invalid multiaddrs."""
upgrader = TransportUpgrader({}, {})
# Test with a multiaddr that has an unsupported transport protocol
# This should be handled gracefully by our transport registry
# udp is not a supported transport
maddr = Multiaddr("/ip4/127.0.0.1/tcp/8080/udp/1234")
transport = create_transport_for_multiaddr(maddr, upgrader)
assert transport is None
class TestIntegration:
"""Test integration scenarios."""
def test_multiple_transport_types(self):
"""Test using multiple transport types in the same registry."""
registry = TransportRegistry()
upgrader = TransportUpgrader({}, {})
# Create different transport types
tcp_transport = registry.create_transport("tcp", upgrader)
ws_transport = registry.create_transport("ws", upgrader)
# All should be different types
assert isinstance(tcp_transport, TCP)
assert isinstance(ws_transport, WebsocketTransport)
# All should be different instances
assert tcp_transport is not ws_transport
def test_transport_registry_persistence(self):
"""Test that transport registry persists across calls."""
registry1 = get_transport_registry()
registry2 = get_transport_registry()
# Should be the same instance
assert registry1 is registry2
# Register a transport in one
class PersistentTransport(ITransport):
async def dial(self, maddr: Multiaddr) -> IRawConnection:
raise NotImplementedError("PersistentTransport dial not implemented")
def create_listener(self, handler_function: THandler) -> IListener:
raise NotImplementedError(
"PersistentTransport create_listener not implemented"
)
registry1.register_transport("persistent", PersistentTransport)
# Should be available in the other
assert registry2.get_transport("persistent") == PersistentTransport

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,532 @@
#!/usr/bin/env python3
"""
Python-to-Python WebSocket peer-to-peer tests.
This module tests real WebSocket communication between two Python libp2p hosts,
including both WS and WSS (WebSocket Secure) scenarios.
"""
import pytest
from multiaddr import Multiaddr
from libp2p import create_yamux_muxer_option, new_host
from libp2p.crypto.secp256k1 import create_new_key_pair
from libp2p.crypto.x25519 import create_new_key_pair as create_new_x25519_key_pair
from libp2p.custom_types import TProtocol
from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport
from libp2p.security.noise.transport import (
PROTOCOL_ID as NOISE_PROTOCOL_ID,
Transport as NoiseTransport,
)
from libp2p.transport.websocket.multiaddr_utils import (
is_valid_websocket_multiaddr,
parse_websocket_multiaddr,
)
PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0")
PING_LENGTH = 32
@pytest.mark.trio
async def test_websocket_p2p_plaintext():
"""Test Python-to-Python WebSocket communication with plaintext security."""
# Create two hosts with plaintext security
key_pair_a = create_new_key_pair()
key_pair_b = create_new_key_pair()
# Host A (listener) - use only plaintext security
security_options_a = {
PLAINTEXT_PROTOCOL_ID: InsecureTransport(
local_key_pair=key_pair_a, secure_bytes_provider=None, peerstore=None
)
}
host_a = new_host(
key_pair=key_pair_a,
sec_opt=security_options_a,
muxer_opt=create_yamux_muxer_option(),
listen_addrs=[Multiaddr("/ip4/127.0.0.1/tcp/0/ws")],
)
# Host B (dialer) - use only plaintext security
security_options_b = {
PLAINTEXT_PROTOCOL_ID: InsecureTransport(
local_key_pair=key_pair_b, secure_bytes_provider=None, peerstore=None
)
}
host_b = new_host(
key_pair=key_pair_b,
sec_opt=security_options_b,
muxer_opt=create_yamux_muxer_option(),
listen_addrs=[Multiaddr("/ip4/127.0.0.1/tcp/0/ws")], # Ensure WebSocket
# transport
)
# Test data
test_data = b"Hello WebSocket P2P!"
received_data = None
# Set up ping handler on host A
async def ping_handler(stream):
nonlocal received_data
received_data = await stream.read(len(test_data))
await stream.write(received_data) # Echo back
await stream.close()
host_a.set_stream_handler(PING_PROTOCOL_ID, ping_handler)
# Start both hosts
async with (
host_a.run(listen_addrs=[Multiaddr("/ip4/127.0.0.1/tcp/0/ws")]),
host_b.run(listen_addrs=[]),
):
# Get host A's listen address
listen_addrs = host_a.get_addrs()
assert len(listen_addrs) > 0
# Find the WebSocket address
ws_addr = None
for addr in listen_addrs:
if "/ws" in str(addr):
ws_addr = addr
break
assert ws_addr is not None, "No WebSocket listen address found"
assert is_valid_websocket_multiaddr(ws_addr), "Invalid WebSocket multiaddr"
# Parse the WebSocket multiaddr
parsed = parse_websocket_multiaddr(ws_addr)
assert not parsed.is_wss, "Should be plain WebSocket, not WSS"
assert parsed.sni is None, "SNI should be None for plain WebSocket"
# Connect host B to host A
from libp2p.peer.peerinfo import info_from_p2p_addr
peer_info = info_from_p2p_addr(ws_addr)
await host_b.connect(peer_info)
# Create stream and test communication
stream = await host_b.new_stream(host_a.get_id(), [PING_PROTOCOL_ID])
await stream.write(test_data)
response = await stream.read(len(test_data))
await stream.close()
# Verify communication
assert received_data == test_data, f"Expected {test_data}, got {received_data}"
assert response == test_data, f"Expected echo {test_data}, got {response}"
@pytest.mark.trio
async def test_websocket_p2p_noise():
"""Test Python-to-Python WebSocket communication with Noise security."""
# Create two hosts with Noise security
key_pair_a = create_new_key_pair()
key_pair_b = create_new_key_pair()
noise_key_pair_a = create_new_x25519_key_pair()
noise_key_pair_b = create_new_x25519_key_pair()
# Host A (listener)
security_options_a = {
NOISE_PROTOCOL_ID: NoiseTransport(
libp2p_keypair=key_pair_a,
noise_privkey=noise_key_pair_a.private_key,
early_data=None,
with_noise_pipes=False,
)
}
host_a = new_host(
key_pair=key_pair_a,
sec_opt=security_options_a,
muxer_opt=create_yamux_muxer_option(),
listen_addrs=[Multiaddr("/ip4/127.0.0.1/tcp/0/ws")],
)
# Host B (dialer)
security_options_b = {
NOISE_PROTOCOL_ID: NoiseTransport(
libp2p_keypair=key_pair_b,
noise_privkey=noise_key_pair_b.private_key,
early_data=None,
with_noise_pipes=False,
)
}
host_b = new_host(
key_pair=key_pair_b,
sec_opt=security_options_b,
muxer_opt=create_yamux_muxer_option(),
listen_addrs=[Multiaddr("/ip4/127.0.0.1/tcp/0/ws")], # Ensure WebSocket
# transport
)
# Test data
test_data = b"Hello WebSocket P2P with Noise!"
received_data = None
# Set up ping handler on host A
async def ping_handler(stream):
nonlocal received_data
received_data = await stream.read(len(test_data))
await stream.write(received_data) # Echo back
await stream.close()
host_a.set_stream_handler(PING_PROTOCOL_ID, ping_handler)
# Start both hosts
async with (
host_a.run(listen_addrs=[Multiaddr("/ip4/127.0.0.1/tcp/0/ws")]),
host_b.run(listen_addrs=[]),
):
# Get host A's listen address
listen_addrs = host_a.get_addrs()
assert len(listen_addrs) > 0
# Find the WebSocket address
ws_addr = None
for addr in listen_addrs:
if "/ws" in str(addr):
ws_addr = addr
break
assert ws_addr is not None, "No WebSocket listen address found"
assert is_valid_websocket_multiaddr(ws_addr), "Invalid WebSocket multiaddr"
# Parse the WebSocket multiaddr
parsed = parse_websocket_multiaddr(ws_addr)
assert not parsed.is_wss, "Should be plain WebSocket, not WSS"
assert parsed.sni is None, "SNI should be None for plain WebSocket"
# Connect host B to host A
from libp2p.peer.peerinfo import info_from_p2p_addr
peer_info = info_from_p2p_addr(ws_addr)
await host_b.connect(peer_info)
# Create stream and test communication
stream = await host_b.new_stream(host_a.get_id(), [PING_PROTOCOL_ID])
await stream.write(test_data)
response = await stream.read(len(test_data))
await stream.close()
# Verify communication
assert received_data == test_data, f"Expected {test_data}, got {received_data}"
assert response == test_data, f"Expected echo {test_data}, got {response}"
@pytest.mark.trio
async def test_websocket_p2p_libp2p_ping():
"""Test Python-to-Python WebSocket communication using libp2p ping protocol."""
# Create two hosts with Noise security
key_pair_a = create_new_key_pair()
key_pair_b = create_new_key_pair()
noise_key_pair_a = create_new_x25519_key_pair()
noise_key_pair_b = create_new_x25519_key_pair()
# Host A (listener)
security_options_a = {
NOISE_PROTOCOL_ID: NoiseTransport(
libp2p_keypair=key_pair_a,
noise_privkey=noise_key_pair_a.private_key,
early_data=None,
with_noise_pipes=False,
)
}
host_a = new_host(
key_pair=key_pair_a,
sec_opt=security_options_a,
muxer_opt=create_yamux_muxer_option(),
listen_addrs=[Multiaddr("/ip4/127.0.0.1/tcp/0/ws")],
)
# Host B (dialer)
security_options_b = {
NOISE_PROTOCOL_ID: NoiseTransport(
libp2p_keypair=key_pair_b,
noise_privkey=noise_key_pair_b.private_key,
early_data=None,
with_noise_pipes=False,
)
}
host_b = new_host(
key_pair=key_pair_b,
sec_opt=security_options_b,
muxer_opt=create_yamux_muxer_option(),
listen_addrs=[Multiaddr("/ip4/127.0.0.1/tcp/0/ws")], # Ensure WebSocket
# transport
)
# Set up ping handler on host A (standard libp2p ping protocol)
async def ping_handler(stream):
# Read ping data (32 bytes)
ping_data = await stream.read(PING_LENGTH)
# Echo back the same data (pong)
await stream.write(ping_data)
await stream.close()
host_a.set_stream_handler(PING_PROTOCOL_ID, ping_handler)
# Start both hosts
async with (
host_a.run(listen_addrs=[Multiaddr("/ip4/127.0.0.1/tcp/0/ws")]),
host_b.run(listen_addrs=[]),
):
# Get host A's listen address
listen_addrs = host_a.get_addrs()
assert len(listen_addrs) > 0
# Find the WebSocket address
ws_addr = None
for addr in listen_addrs:
if "/ws" in str(addr):
ws_addr = addr
break
assert ws_addr is not None, "No WebSocket listen address found"
# Connect host B to host A
from libp2p.peer.peerinfo import info_from_p2p_addr
peer_info = info_from_p2p_addr(ws_addr)
await host_b.connect(peer_info)
# Create stream and test libp2p ping protocol
stream = await host_b.new_stream(host_a.get_id(), [PING_PROTOCOL_ID])
# Send ping (32 bytes as per libp2p ping protocol)
ping_data = b"\x01" * PING_LENGTH
await stream.write(ping_data)
# Receive pong (should be same 32 bytes)
pong_data = await stream.read(PING_LENGTH)
await stream.close()
# Verify ping-pong
assert pong_data == ping_data, (
f"Expected ping {ping_data}, got pong {pong_data}"
)
@pytest.mark.trio
async def test_websocket_p2p_multiple_streams():
"""
Test Python-to-Python WebSocket communication with multiple concurrent
streams.
"""
# Create two hosts with Noise security
key_pair_a = create_new_key_pair()
key_pair_b = create_new_key_pair()
noise_key_pair_a = create_new_x25519_key_pair()
noise_key_pair_b = create_new_x25519_key_pair()
# Host A (listener)
security_options_a = {
NOISE_PROTOCOL_ID: NoiseTransport(
libp2p_keypair=key_pair_a,
noise_privkey=noise_key_pair_a.private_key,
early_data=None,
with_noise_pipes=False,
)
}
host_a = new_host(
key_pair=key_pair_a,
sec_opt=security_options_a,
muxer_opt=create_yamux_muxer_option(),
listen_addrs=[Multiaddr("/ip4/127.0.0.1/tcp/0/ws")],
)
# Host B (dialer)
security_options_b = {
NOISE_PROTOCOL_ID: NoiseTransport(
libp2p_keypair=key_pair_b,
noise_privkey=noise_key_pair_b.private_key,
early_data=None,
with_noise_pipes=False,
)
}
host_b = new_host(
key_pair=key_pair_b,
sec_opt=security_options_b,
muxer_opt=create_yamux_muxer_option(),
listen_addrs=[Multiaddr("/ip4/127.0.0.1/tcp/0/ws")], # Ensure WebSocket
# transport
)
# Test protocol
test_protocol = TProtocol("/test/multiple/streams/1.0.0")
received_data = []
# Set up handler on host A
async def test_handler(stream):
data = await stream.read(1024)
received_data.append(data)
await stream.write(data) # Echo back
await stream.close()
host_a.set_stream_handler(test_protocol, test_handler)
# Start both hosts
async with (
host_a.run(listen_addrs=[Multiaddr("/ip4/127.0.0.1/tcp/0/ws")]),
host_b.run(listen_addrs=[]),
):
# Get host A's listen address
listen_addrs = host_a.get_addrs()
ws_addr = None
for addr in listen_addrs:
if "/ws" in str(addr):
ws_addr = addr
break
assert ws_addr is not None, "No WebSocket listen address found"
# Connect host B to host A
from libp2p.peer.peerinfo import info_from_p2p_addr
peer_info = info_from_p2p_addr(ws_addr)
await host_b.connect(peer_info)
# Create multiple concurrent streams
num_streams = 5
test_data_list = [f"Stream {i} data".encode() for i in range(num_streams)]
async def create_stream_and_test(stream_id: int, data: bytes):
stream = await host_b.new_stream(host_a.get_id(), [test_protocol])
await stream.write(data)
response = await stream.read(len(data))
await stream.close()
return response
# Run all streams concurrently
tasks = [
create_stream_and_test(i, test_data_list[i]) for i in range(num_streams)
]
responses = []
for task in tasks:
responses.append(await task)
# Verify all communications
assert len(received_data) == num_streams, (
f"Expected {num_streams} received messages, got {len(received_data)}"
)
for i, (sent, received, response) in enumerate(
zip(test_data_list, received_data, responses)
):
assert received == sent, f"Stream {i}: Expected {sent}, got {received}"
assert response == sent, f"Stream {i}: Expected echo {sent}, got {response}"
@pytest.mark.trio
async def test_websocket_p2p_connection_state():
"""Test WebSocket connection state tracking and metadata."""
# Create two hosts with Noise security
key_pair_a = create_new_key_pair()
key_pair_b = create_new_key_pair()
noise_key_pair_a = create_new_x25519_key_pair()
noise_key_pair_b = create_new_x25519_key_pair()
# Host A (listener)
security_options_a = {
NOISE_PROTOCOL_ID: NoiseTransport(
libp2p_keypair=key_pair_a,
noise_privkey=noise_key_pair_a.private_key,
early_data=None,
with_noise_pipes=False,
)
}
host_a = new_host(
key_pair=key_pair_a,
sec_opt=security_options_a,
muxer_opt=create_yamux_muxer_option(),
listen_addrs=[Multiaddr("/ip4/127.0.0.1/tcp/0/ws")],
)
# Host B (dialer)
security_options_b = {
NOISE_PROTOCOL_ID: NoiseTransport(
libp2p_keypair=key_pair_b,
noise_privkey=noise_key_pair_b.private_key,
early_data=None,
with_noise_pipes=False,
)
}
host_b = new_host(
key_pair=key_pair_b,
sec_opt=security_options_b,
muxer_opt=create_yamux_muxer_option(),
listen_addrs=[Multiaddr("/ip4/127.0.0.1/tcp/0/ws")], # Ensure WebSocket
# transport
)
# Set up handler on host A
async def test_handler(stream):
# Read some data
await stream.read(1024)
# Write some data back
await stream.write(b"Response data")
await stream.close()
host_a.set_stream_handler(PING_PROTOCOL_ID, test_handler)
# Start both hosts
async with (
host_a.run(listen_addrs=[Multiaddr("/ip4/127.0.0.1/tcp/0/ws")]),
host_b.run(listen_addrs=[]),
):
# Get host A's listen address
listen_addrs = host_a.get_addrs()
ws_addr = None
for addr in listen_addrs:
if "/ws" in str(addr):
ws_addr = addr
break
assert ws_addr is not None, "No WebSocket listen address found"
# Connect host B to host A
from libp2p.peer.peerinfo import info_from_p2p_addr
peer_info = info_from_p2p_addr(ws_addr)
await host_b.connect(peer_info)
# Create stream and test communication
stream = await host_b.new_stream(host_a.get_id(), [PING_PROTOCOL_ID])
await stream.write(b"Test data for connection state")
response = await stream.read(1024)
await stream.close()
# Verify response
assert response == b"Response data", f"Expected 'Response data', got {response}"
# Test connection state (if available)
# Note: This tests the connection state tracking we implemented
connections = host_b.get_network().connections
assert len(connections) > 0, "Should have at least one connection"
# Get the connection to host A
conn_to_a = None
for peer_id, conn_list in connections.items():
if peer_id == host_a.get_id():
# connections maps peer_id to list of connections, get the first one
conn_to_a = conn_list[0] if conn_list else None
break
assert conn_to_a is not None, "Should have connection to host A"
# Test that the connection has the expected properties
assert hasattr(conn_to_a, "muxed_conn"), "Connection should have muxed_conn"
assert hasattr(conn_to_a.muxed_conn, "secured_conn"), (
"Muxed connection should have underlying secured_conn"
)
# If the underlying connection is our WebSocket connection, test its state
# Type assertion to access private attribute for testing
underlying_conn = getattr(conn_to_a.muxed_conn, "secured_conn")
if hasattr(underlying_conn, "conn_state"):
state = underlying_conn.conn_state()
assert "connection_start_time" in state, (
"Connection state should include start time"
)
assert "bytes_read" in state, "Connection state should include bytes read"
assert "bytes_written" in state, (
"Connection state should include bytes written"
)
assert state["bytes_read"] > 0, "Should have read some bytes"
assert state["bytes_written"] > 0, "Should have written some bytes"

View File

@ -0,0 +1,117 @@
"""
Tests to verify that all examples use the new address paradigm consistently
"""
from pathlib import Path
class TestExamplesAddressParadigm:
"""Test suite to verify all examples use the new address paradigm consistently"""
def get_example_files(self):
"""Get all Python files in the examples directory"""
examples_dir = Path("examples")
return list(examples_dir.rglob("*.py"))
def check_file_for_wildcard_binding(self, filepath):
"""Check if a file contains 0.0.0.0 binding"""
with open(filepath, encoding="utf-8") as f:
content = f.read()
# Check for various forms of wildcard binding
wildcard_patterns = [
"0.0.0.0",
"/ip4/0.0.0.0/",
]
found_wildcards = []
for line_num, line in enumerate(content.splitlines(), 1):
for pattern in wildcard_patterns:
if pattern in line and not line.strip().startswith("#"):
found_wildcards.append((line_num, line.strip()))
return found_wildcards
def test_examples_use_address_paradigm(self):
"""Test that examples use the new address paradigm functions"""
example_files = self.get_example_files()
# Files that should use the new paradigm
networking_examples = [
"echo/echo.py",
"chat/chat.py",
"ping/ping.py",
"bootstrap/bootstrap.py",
"pubsub/pubsub.py",
"identify/identify.py",
]
paradigm_functions = [
"get_available_interfaces",
"get_optimal_binding_address",
]
for filename in networking_examples:
filepath = None
for example_file in example_files:
if filename in str(example_file):
filepath = example_file
break
if filepath is None:
continue
with open(filepath, encoding="utf-8") as f:
content = f.read()
# Check that the file uses the new paradigm functions
for func in paradigm_functions:
assert func in content, (
f"{filepath} should use {func} from the new address paradigm"
)
def test_wildcard_available_as_feature(self):
"""Test that wildcard is available as a feature when needed"""
example_files = self.get_example_files()
# Check that network_discover.py demonstrates wildcard usage
network_discover_file = None
for example_file in example_files:
if "network_discover.py" in str(example_file):
network_discover_file = example_file
break
if network_discover_file:
with open(network_discover_file, encoding="utf-8") as f:
content = f.read()
# Should demonstrate wildcard expansion
assert "0.0.0.0" in content, (
f"{network_discover_file} should demonstrate wildcard usage"
)
assert "expand_wildcard_address" in content, (
f"{network_discover_file} should use expand_wildcard_address"
)
def test_doc_examples_use_paradigm(self):
"""Test that documentation examples use the new address paradigm"""
doc_examples_dir = Path("examples/doc-examples")
if not doc_examples_dir.exists():
return
doc_example_files = list(doc_examples_dir.glob("*.py"))
paradigm_functions = [
"get_available_interfaces",
"get_optimal_binding_address",
]
for filepath in doc_example_files:
with open(filepath, encoding="utf-8") as f:
content = f.read()
# Check that doc examples use the new paradigm
for func in paradigm_functions:
assert func in content, (
f"Documentation example {filepath} should use {func}"
)

View File

View File

@ -0,0 +1,21 @@
{
"name": "src",
"version": "1.0.0",
"main": "ping.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"description": "",
"dependencies": {
"@chainsafe/libp2p-noise": "^9.0.0",
"@chainsafe/libp2p-yamux": "^5.0.1",
"@libp2p/ping": "^2.0.36",
"@libp2p/plaintext": "^2.0.29",
"@libp2p/websockets": "^9.2.18",
"libp2p": "^2.9.0",
"multiaddr": "^10.0.1"
}
}

View File

@ -0,0 +1,122 @@
import { createLibp2p } from 'libp2p'
import { webSockets } from '@libp2p/websockets'
import { ping } from '@libp2p/ping'
import { noise } from '@chainsafe/libp2p-noise'
import { plaintext } from '@libp2p/plaintext'
import { yamux } from '@chainsafe/libp2p-yamux'
// import { identify } from '@libp2p/identify' // Commented out for compatibility
// Configuration from environment (with defaults for compatibility)
const TRANSPORT = process.env.transport || 'ws'
const SECURITY = process.env.security || 'noise'
const MUXER = process.env.muxer || 'yamux'
const IP = process.env.ip || '0.0.0.0'
async function main() {
console.log(`🔧 Configuration: transport=${TRANSPORT}, security=${SECURITY}, muxer=${MUXER}`)
// Build options following the proven pattern from test-plans-fork
const options = {
start: true,
connectionGater: {
denyDialMultiaddr: async () => false
},
connectionMonitor: {
enabled: false
},
services: {
ping: ping()
}
}
// Transport configuration (following get-libp2p.ts pattern)
switch (TRANSPORT) {
case 'ws':
options.transports = [webSockets()]
options.addresses = {
listen: [`/ip4/${IP}/tcp/0/ws`]
}
break
case 'wss':
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'
options.transports = [webSockets()]
options.addresses = {
listen: [`/ip4/${IP}/tcp/0/wss`]
}
break
default:
throw new Error(`Unknown transport: ${TRANSPORT}`)
}
// Security configuration
switch (SECURITY) {
case 'noise':
options.connectionEncryption = [noise()]
break
case 'plaintext':
options.connectionEncryption = [plaintext()]
break
default:
throw new Error(`Unknown security: ${SECURITY}`)
}
// Muxer configuration
switch (MUXER) {
case 'yamux':
options.streamMuxers = [yamux()]
break
default:
throw new Error(`Unknown muxer: ${MUXER}`)
}
console.log('🔧 Creating libp2p node with proven interop configuration...')
const node = await createLibp2p(options)
await node.start()
console.log(node.peerId.toString())
for (const addr of node.getMultiaddrs()) {
console.log(addr.toString())
}
// Debug: Print supported protocols
console.log('DEBUG: Supported protocols:')
if (node.services && node.services.registrar) {
const protocols = node.services.registrar.getProtocols()
for (const protocol of protocols) {
console.log('DEBUG: Protocol:', protocol)
}
}
// Debug: Print connection encryption protocols
console.log('DEBUG: Connection encryption protocols:')
try {
if (node.components && node.components.connectionEncryption) {
for (const encrypter of node.components.connectionEncryption) {
console.log('DEBUG: Encrypter:', encrypter.protocol)
}
}
} catch (e) {
console.log('DEBUG: Could not access connectionEncryption:', e.message)
}
// Debug: Print stream muxer protocols
console.log('DEBUG: Stream muxer protocols:')
try {
if (node.components && node.components.streamMuxers) {
for (const muxer of node.components.streamMuxers) {
console.log('DEBUG: Muxer:', muxer.protocol)
}
}
} catch (e) {
console.log('DEBUG: Could not access streamMuxers:', e.message)
}
// Keep the process alive
await new Promise(() => {})
}
main().catch(err => {
console.error(err)
process.exit(1)
})

View File

@ -0,0 +1,127 @@
import os
import signal
import subprocess
import pytest
from multiaddr import Multiaddr
import trio
from trio.lowlevel import open_process
from libp2p.crypto.secp256k1 import create_new_key_pair
from libp2p.custom_types import TProtocol
from libp2p.host.basic_host import BasicHost
from libp2p.network.exceptions import SwarmException
from libp2p.network.swarm import Swarm
from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo
from libp2p.peer.peerstore import PeerStore
from libp2p.security.insecure.transport import InsecureTransport
from libp2p.stream_muxer.yamux.yamux import Yamux
from libp2p.transport.upgrader import TransportUpgrader
from libp2p.transport.websocket.transport import WebsocketTransport
PLAINTEXT_PROTOCOL_ID = "/plaintext/2.0.0"
@pytest.mark.trio
async def test_ping_with_js_node():
# Skip this test due to JavaScript dependency issues
pytest.skip("Skipping JS interop test due to dependency issues")
js_node_dir = os.path.join(os.path.dirname(__file__), "js_libp2p", "js_node", "src")
script_name = "./ws_ping_node.mjs"
try:
subprocess.run(
["npm", "install"],
cwd=js_node_dir,
check=True,
capture_output=True,
text=True,
)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
pytest.fail(f"Failed to run 'npm install': {e}")
# Launch the JS libp2p node (long-running)
proc = await open_process(
["node", script_name],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=js_node_dir,
)
assert proc.stdout is not None, "stdout pipe missing"
assert proc.stderr is not None, "stderr pipe missing"
stdout = proc.stdout
stderr = proc.stderr
try:
# Read first two lines (PeerID and multiaddr)
buffer = b""
with trio.fail_after(30):
while buffer.count(b"\n") < 2:
chunk = await stdout.receive_some(1024)
if not chunk:
break
buffer += chunk
lines = [line for line in buffer.decode().splitlines() if line.strip()]
if len(lines) < 2:
stderr_output = await stderr.receive_some(2048)
stderr_output = stderr_output.decode()
pytest.fail(
"JS node did not produce expected PeerID and multiaddr.\n"
f"Stdout: {buffer.decode()!r}\n"
f"Stderr: {stderr_output!r}"
)
peer_id_line, addr_line = lines[0], lines[1]
peer_id = ID.from_base58(peer_id_line)
maddr = Multiaddr(addr_line)
# Debug: Print what we're trying to connect to
print(f"JS Node Peer ID: {peer_id_line}")
print(f"JS Node Address: {addr_line}")
print(f"All JS Node lines: {lines}")
# Set up Python host
key_pair = create_new_key_pair()
py_peer_id = ID.from_pubkey(key_pair.public_key)
peer_store = PeerStore()
peer_store.add_key_pair(py_peer_id, key_pair)
upgrader = TransportUpgrader(
secure_transports_by_protocol={
TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair)
},
muxer_transports_by_protocol={TProtocol("/yamux/1.0.0"): Yamux},
)
transport = WebsocketTransport(upgrader)
swarm = Swarm(py_peer_id, peer_store, upgrader, transport)
host = BasicHost(swarm)
# Connect to JS node
peer_info = PeerInfo(peer_id, [maddr])
print(f"Python trying to connect to: {peer_info}")
# Use the host as a context manager
async with host.run(listen_addrs=[]):
await trio.sleep(1)
try:
await host.connect(peer_info)
except SwarmException as e:
underlying_error = e.__cause__
pytest.fail(
"Connection failed with SwarmException.\n"
f"THE REAL ERROR IS: {underlying_error!r}\n"
)
assert host.get_network().connections.get(peer_id) is not None
# Ping protocol
stream = await host.new_stream(peer_id, [TProtocol("/ipfs/ping/1.0.0")])
await stream.write(b"ping")
data = await stream.read(4)
assert data == b"pong"
finally:
proc.send_signal(signal.SIGTERM)
await trio.sleep(0)

View File

@ -0,0 +1,206 @@
"""
Tests for the new address paradigm with wildcard support as a feature
"""
import pytest
from multiaddr import Multiaddr
from libp2p import new_host
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
get_wildcard_address,
)
class TestAddressParadigm:
"""
Test suite for verifying the new address paradigm:
- get_available_interfaces() returns all available interfaces
- get_optimal_binding_address() returns optimal address for examples
- get_wildcard_address() provides wildcard as a feature when needed
"""
def test_wildcard_address_function(self):
"""Test that get_wildcard_address() provides wildcard as a feature"""
port = 8000
addr = get_wildcard_address(port)
# Should return wildcard address when explicitly requested
assert "0.0.0.0" in str(addr)
addr_str = str(addr)
assert "/ip4/" in addr_str
assert f"/tcp/{port}" in addr_str
def test_optimal_binding_address_selection(self):
"""Test that optimal binding address uses good heuristics"""
port = 8000
addr = get_optimal_binding_address(port)
# Should return a valid IP address (could be loopback or local network)
addr_str = str(addr)
assert "/ip4/" in addr_str
assert f"/tcp/{port}" in addr_str
# Should be from available interfaces
available_interfaces = get_available_interfaces(port)
assert addr in available_interfaces
def test_available_interfaces_includes_loopback(self):
"""Test that available interfaces always includes loopback address"""
port = 8000
interfaces = get_available_interfaces(port)
# Should have at least one interface
assert len(interfaces) > 0
# Should include loopback address
loopback_found = any("127.0.0.1" in str(addr) for addr in interfaces)
assert loopback_found, "Loopback address not found in available interfaces"
# Available interfaces should not include wildcard by default
# (wildcard is available as a feature through get_wildcard_address())
wildcard_found = any("0.0.0.0" in str(addr) for addr in interfaces)
assert not wildcard_found, (
"Wildcard should not be in default available interfaces"
)
def test_host_default_listen_address(self):
"""Test that new hosts use secure default addresses"""
# Create a host with a specific port
port = 8000
listen_addr = Multiaddr(f"/ip4/127.0.0.1/tcp/{port}")
host = new_host(listen_addrs=[listen_addr])
# Verify the host configuration
assert host is not None
# Note: We can't test actual binding without running the host,
# but we've verified the address format is correct
def test_paradigm_consistency(self):
"""Test that the address paradigm is consistent"""
port = 8000
# get_optimal_binding_address should return a valid address
optimal_addr = get_optimal_binding_address(port)
assert "/ip4/" in str(optimal_addr)
assert f"/tcp/{port}" in str(optimal_addr)
# get_wildcard_address should return wildcard when explicitly needed
wildcard_addr = get_wildcard_address(port)
assert "0.0.0.0" in str(wildcard_addr)
assert f"/tcp/{port}" in str(wildcard_addr)
# Both should be valid Multiaddr objects
assert isinstance(optimal_addr, Multiaddr)
assert isinstance(wildcard_addr, Multiaddr)
@pytest.mark.parametrize("protocol", ["tcp", "udp"])
def test_different_protocols_support(self, protocol):
"""Test that different protocols are supported by the paradigm"""
port = 8000
# Test optimal address with different protocols
optimal_addr = get_optimal_binding_address(port, protocol=protocol)
assert protocol in str(optimal_addr)
assert f"/{protocol}/{port}" in str(optimal_addr)
# Test wildcard address with different protocols
wildcard_addr = get_wildcard_address(port, protocol=protocol)
assert "0.0.0.0" in str(wildcard_addr)
assert protocol in str(wildcard_addr)
assert f"/{protocol}/{port}" in str(wildcard_addr)
# Test available interfaces with different protocols
interfaces = get_available_interfaces(port, protocol=protocol)
assert len(interfaces) > 0
for addr in interfaces:
assert protocol in str(addr)
def test_wildcard_available_as_feature(self):
"""Test that wildcard binding is available as a feature when needed"""
port = 8000
# Wildcard should be available through get_wildcard_address()
wildcard_addr = get_wildcard_address(port)
assert "0.0.0.0" in str(wildcard_addr)
# But should not be in default available interfaces
interfaces = get_available_interfaces(port)
wildcard_in_interfaces = any("0.0.0.0" in str(addr) for addr in interfaces)
assert not wildcard_in_interfaces, (
"Wildcard should not be in default interfaces"
)
# Optimal address should not be wildcard by default
optimal = get_optimal_binding_address(port)
assert "0.0.0.0" not in str(optimal), (
"Optimal address should not be wildcard by default"
)
def test_loopback_is_always_available(self):
"""Test that loopback address is always available as an option"""
port = 8000
interfaces = get_available_interfaces(port)
# Loopback should always be available
loopback_addrs = [addr for addr in interfaces if "127.0.0.1" in str(addr)]
assert len(loopback_addrs) > 0, "Loopback address should always be available"
# At least one loopback address should have the correct port
loopback_with_port = [
addr for addr in loopback_addrs if f"/tcp/{port}" in str(addr)
]
assert len(loopback_with_port) > 0, (
f"Loopback address with port {port} should be available"
)
def test_optimal_address_selection_behavior(self):
"""Test that optimal address selection works correctly"""
port = 8000
interfaces = get_available_interfaces(port)
optimal = get_optimal_binding_address(port)
# Should return one of the available interfaces
optimal_str = str(optimal)
interface_strs = [str(addr) for addr in interfaces]
assert optimal_str in interface_strs, (
f"Optimal address {optimal_str} should be in available interfaces"
)
# Should prefer non-loopback when available, fallback to loopback
non_loopback_interfaces = [
addr for addr in interfaces if "127.0.0.1" not in str(addr)
]
if non_loopback_interfaces:
# Should prefer non-loopback when available
assert "127.0.0.1" not in str(optimal), (
"Should prefer non-loopback when available"
)
else:
# Should use loopback when no other interfaces available
assert "127.0.0.1" in str(optimal), (
"Should use loopback when no other interfaces available"
)
def test_address_paradigm_completeness(self):
"""Test that the address paradigm provides all necessary functionality"""
port = 8000
# Test that we get interface options
interfaces = get_available_interfaces(port)
assert len(interfaces) >= 1, "Should have at least one interface"
# Test that loopback is always included
has_loopback = any("127.0.0.1" in str(addr) for addr in interfaces)
assert has_loopback, "Loopback should always be available"
# Test that wildcard is available as a feature
wildcard_addr = get_wildcard_address(port)
assert "0.0.0.0" in str(wildcard_addr)
# Test optimal selection
optimal = get_optimal_binding_address(port)
assert optimal in interfaces, (
"Optimal address should be from available interfaces"
)