Fix multi-address listening bug in swarm.listen()

- Fix early return in swarm.listen() that prevented listening on all addresses
- Add comprehensive tests for multi-address listening functionality
- Ensure all available interfaces are properly bound and connectable
This commit is contained in:
acul71
2025-08-24 01:49:42 +02:00
parent ed2716c1bf
commit b6cbd78943
2 changed files with 123 additions and 4 deletions

View File

@ -249,9 +249,11 @@ class Swarm(Service, INetworkService):
# We need to wait until `self.listener_nursery` is created.
await self.event_listener_nursery_created.wait()
success_count = 0
for maddr in multiaddrs:
if str(maddr) in self.listeners:
return True
success_count += 1
continue
async def conn_handler(
read_write_closer: ReadWriteCloser, maddr: Multiaddr = maddr
@ -302,13 +304,14 @@ class Swarm(Service, INetworkService):
# Call notifiers since event occurred
await self.notify_listen(maddr)
return True
success_count += 1
logger.debug("successfully started listening on: %s", maddr)
except OSError:
# Failed. Continue looping.
logger.debug("fail to listen on: %s", maddr)
# No maddr succeeded
return False
# Return true if at least one address succeeded
return success_count > 0
async def close(self) -> None:
"""

View File

@ -16,6 +16,9 @@ from libp2p.network.exceptions import (
from libp2p.network.swarm import (
Swarm,
)
from libp2p.tools.async_service import (
background_trio_service,
)
from libp2p.tools.utils import (
connect_swarm,
)
@ -184,3 +187,116 @@ def test_new_swarm_quic_multiaddr_raises():
addr = Multiaddr("/ip4/127.0.0.1/udp/9999/quic")
with pytest.raises(ValueError, match="QUIC not yet supported"):
new_swarm(listen_addrs=[addr])
@pytest.mark.trio
async def test_swarm_listen_multiple_addresses(security_protocol):
"""Test that swarm can listen on multiple addresses simultaneously."""
from libp2p.utils.address_validation import get_available_interfaces
# Get multiple addresses to listen on
listen_addrs = get_available_interfaces(0) # Let OS choose ports
# Create a swarm and listen on multiple addresses
swarm = SwarmFactory.build(security_protocol=security_protocol)
async with background_trio_service(swarm):
# Listen on all addresses
success = await swarm.listen(*listen_addrs)
assert success, "Should successfully listen on at least one address"
# Check that we have listeners for the addresses
actual_listeners = list(swarm.listeners.keys())
assert len(actual_listeners) > 0, "Should have at least one listener"
# Verify that all successful listeners are in the listeners dict
successful_count = 0
for addr in listen_addrs:
addr_str = str(addr)
if addr_str in actual_listeners:
successful_count += 1
# This address successfully started listening
listener = swarm.listeners[addr_str]
listener_addrs = listener.get_addrs()
assert len(listener_addrs) > 0, (
f"Listener for {addr} should have addresses"
)
# Check that the listener address matches the expected address
# (port might be different if we used port 0)
expected_ip = addr.value_for_protocol("ip4")
expected_protocol = addr.value_for_protocol("tcp")
if expected_ip and expected_protocol:
found_matching = False
for listener_addr in listener_addrs:
if (
listener_addr.value_for_protocol("ip4") == expected_ip
and listener_addr.value_for_protocol("tcp") is not None
):
found_matching = True
break
assert found_matching, (
f"Listener for {addr} should have matching IP"
)
assert successful_count == len(listen_addrs), (
f"All {len(listen_addrs)} addresses should be listening, "
f"but only {successful_count} succeeded"
)
@pytest.mark.trio
async def test_swarm_listen_multiple_addresses_connectivity(security_protocol):
"""Test that real libp2p connections can be established to all listening addresses.""" # noqa: E501
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.utils.address_validation import get_available_interfaces
# Get multiple addresses to listen on
listen_addrs = get_available_interfaces(0) # Let OS choose ports
# Create a swarm and listen on multiple addresses
swarm1 = SwarmFactory.build(security_protocol=security_protocol)
async with background_trio_service(swarm1):
# Listen on all addresses
success = await swarm1.listen(*listen_addrs)
assert success, "Should successfully listen on at least one address"
# Verify all available interfaces are listening
assert len(swarm1.listeners) == len(listen_addrs), (
f"All {len(listen_addrs)} interfaces should be listening, "
f"but only {len(swarm1.listeners)} are"
)
# Create a second swarm to test connections
swarm2 = SwarmFactory.build(security_protocol=security_protocol)
async with background_trio_service(swarm2):
# Test connectivity to each listening address using real libp2p connections
for addr_str, listener in swarm1.listeners.items():
listener_addrs = listener.get_addrs()
for listener_addr in listener_addrs:
# Create a full multiaddr with peer ID for libp2p connection
peer_id = swarm1.get_peer_id()
full_addr = listener_addr.encapsulate(f"/p2p/{peer_id}")
# Test real libp2p connection
try:
peer_info = info_from_p2p_addr(full_addr)
# Add the peer info to swarm2's peerstore so it knows where to connect # noqa: E501
swarm2.peerstore.add_addrs(
peer_info.peer_id, [listener_addr], 10000
)
await swarm2.dial_peer(peer_info.peer_id)
# Verify connection was established
assert peer_info.peer_id in swarm2.connections, (
f"Connection to {full_addr} should be established"
)
assert swarm2.get_peer_id() in swarm1.connections, (
f"Connection from {full_addr} should be established"
)
except Exception as e:
pytest.fail(
f"Failed to establish libp2p connection to {full_addr}: {e}"
)