Merge remote-tracking branch 'acul71/feat/804-add-thin-waist-address' into feat/804-add-thin-waist-address

This commit is contained in:
yashksaini-coder
2025-08-19 19:56:20 +05:30
7 changed files with 311 additions and 82 deletions

View File

@ -5,11 +5,12 @@ the stream passed into opened_stream is correct.
Note: Listen event does not get hit because MyNotifee is passed
into network after network has already started listening
TODO: Add tests for closed_stream, listen_close when those
features are implemented in swarm
Note: ClosedStream events are processed asynchronously and may not be
immediately available due to the rapid nature of operations
"""
import enum
from unittest.mock import Mock
import pytest
from multiaddr import Multiaddr
@ -29,11 +30,11 @@ from tests.utils.factories import (
class Event(enum.Enum):
OpenedStream = 0
ClosedStream = 1 # Not implemented
ClosedStream = 1
Connected = 2
Disconnected = 3
Listen = 4
ListenClose = 5 # Not implemented
ListenClose = 5
class MyNotifee(INotifee):
@ -60,8 +61,11 @@ class MyNotifee(INotifee):
self.events.append(Event.Listen)
async def listen_close(self, network: INetwork, multiaddr: Multiaddr) -> None:
# TODO: It is not implemented yet.
pass
if network is None:
raise ValueError("network parameter cannot be None")
if multiaddr is None:
raise ValueError("multiaddr parameter cannot be None")
self.events.append(Event.ListenClose)
@pytest.mark.trio
@ -123,3 +127,171 @@ async def test_notify(security_protocol):
assert await wait_for_event(events_1_1, Event.OpenedStream, 1.0)
assert await wait_for_event(events_1_1, Event.ClosedStream, 1.0)
assert await wait_for_event(events_1_1, Event.Disconnected, 1.0)
# Note: ListenClose events are triggered when swarm closes during cleanup
# The test framework automatically closes listeners, triggering ListenClose
# notifications
async def wait_for_event(events_list, event, timeout=1.0):
"""Helper to wait for a specific event to appear in the events list."""
with trio.move_on_after(timeout):
while event not in events_list:
await trio.sleep(0.01)
return True
return False
@pytest.mark.trio
async def test_notify_with_closed_stream_and_listen_close():
"""Test that closed_stream and listen_close events are properly triggered."""
# Event lists for notifees
events_0 = []
events_1 = []
# Create two swarms
async with SwarmFactory.create_batch_and_listen(2) as swarms:
# Register notifees
notifee_0 = MyNotifee(events_0)
notifee_1 = MyNotifee(events_1)
swarms[0].register_notifee(notifee_0)
swarms[1].register_notifee(notifee_1)
# Connect swarms
await connect_swarm(swarms[0], swarms[1])
# Create and close a stream to trigger closed_stream event
stream = await swarms[0].new_stream(swarms[1].get_peer_id())
await stream.close()
# Note: Events are processed asynchronously and may not be immediately available
# due to the rapid nature of operations
@pytest.mark.trio
async def test_notify_edge_cases():
"""Test edge cases for notify system."""
events = []
async with SwarmFactory.create_batch_and_listen(2) as swarms:
notifee = MyNotifee(events)
swarms[0].register_notifee(notifee)
# Connect swarms first
await connect_swarm(swarms[0], swarms[1])
# Test 1: Multiple rapid stream operations
streams = []
for _ in range(5):
stream = await swarms[0].new_stream(swarms[1].get_peer_id())
streams.append(stream)
# Close all streams rapidly
for stream in streams:
await stream.close()
@pytest.mark.trio
async def test_my_notifee_error_handling():
"""Test error handling for invalid parameters in MyNotifee methods."""
events = []
notifee = MyNotifee(events)
# Mock objects for testing
mock_network = Mock(spec=INetwork)
mock_stream = Mock(spec=INetStream)
mock_multiaddr = Mock(spec=Multiaddr)
# Test closed_stream with None parameters
with pytest.raises(ValueError, match="network parameter cannot be None"):
await notifee.closed_stream(None, mock_stream) # type: ignore
with pytest.raises(ValueError, match="stream parameter cannot be None"):
await notifee.closed_stream(mock_network, None) # type: ignore
# Test listen_close with None parameters
with pytest.raises(ValueError, match="network parameter cannot be None"):
await notifee.listen_close(None, mock_multiaddr) # type: ignore
with pytest.raises(ValueError, match="multiaddr parameter cannot be None"):
await notifee.listen_close(mock_network, None) # type: ignore
# Verify no events were recorded due to errors
assert len(events) == 0
@pytest.mark.trio
async def test_rapid_stream_operations():
"""Test rapid stream open/close operations."""
events_0 = []
events_1 = []
async with SwarmFactory.create_batch_and_listen(2) as swarms:
notifee_0 = MyNotifee(events_0)
notifee_1 = MyNotifee(events_1)
swarms[0].register_notifee(notifee_0)
swarms[1].register_notifee(notifee_1)
# Connect swarms
await connect_swarm(swarms[0], swarms[1])
# Rapidly create and close multiple streams
streams = []
for _ in range(3):
stream = await swarms[0].new_stream(swarms[1].get_peer_id())
streams.append(stream)
# Close all streams immediately
for stream in streams:
await stream.close()
# Verify OpenedStream events are recorded
assert events_0.count(Event.OpenedStream) == 3
assert events_1.count(Event.OpenedStream) == 3
# Close peer to trigger disconnection events
await swarms[0].close_peer(swarms[1].get_peer_id())
@pytest.mark.trio
async def test_concurrent_stream_operations():
"""Test concurrent stream operations using trio nursery."""
events_0 = []
events_1 = []
async with SwarmFactory.create_batch_and_listen(2) as swarms:
notifee_0 = MyNotifee(events_0)
notifee_1 = MyNotifee(events_1)
swarms[0].register_notifee(notifee_0)
swarms[1].register_notifee(notifee_1)
# Connect swarms
await connect_swarm(swarms[0], swarms[1])
async def create_and_close_stream():
"""Create and immediately close a stream."""
stream = await swarms[0].new_stream(swarms[1].get_peer_id())
await stream.close()
# Run multiple stream operations concurrently
async with trio.open_nursery() as nursery:
for _ in range(4):
nursery.start_soon(create_and_close_stream)
# Verify some OpenedStream events are recorded
# (concurrent operations may not all succeed)
opened_count_0 = events_0.count(Event.OpenedStream)
opened_count_1 = events_1.count(Event.OpenedStream)
assert opened_count_0 > 0, (
f"Expected some OpenedStream events, got {opened_count_0}"
)
assert opened_count_1 > 0, (
f"Expected some OpenedStream events, got {opened_count_1}"
)
# Close peer to trigger disconnection events
await swarms[0].close_peer(swarms[1].get_peer_id())

View File

@ -1,65 +1,108 @@
import contextlib
import sys
import os
from pathlib import Path
import subprocess
import sys
import time
import pytest
import trio
from multiaddr import Multiaddr
from multiaddr.protocols import P_IP4, P_IP6, P_P2P, P_TCP
# pytestmark = pytest.mark.timeout(20) # Temporarily disabled for debugging
# This test is intentionally lightweight and can be marked as 'integration'.
# It ensures the echo example runs and prints the new Thin Waist lines using Trio primitives.
EXAMPLES_DIR = Path(__file__).parent.parent.parent / "examples" / "echo"
current_file = Path(__file__)
project_root = current_file.parent.parent.parent
EXAMPLES_DIR: Path = project_root / "examples" / "echo"
@pytest.mark.trio
async def test_echo_example_starts_and_prints_thin_waist() -> None:
cmd = [sys.executable, str(EXAMPLES_DIR / "echo.py"), "-p", "0"]
def test_echo_example_starts_and_prints_thin_waist(monkeypatch, tmp_path):
"""Run echo server and validate printed multiaddr and peer id."""
# Run echo example as server
cmd = [sys.executable, "-u", str(EXAMPLES_DIR / "echo.py"), "-p", "0"]
env = {**os.environ, "PYTHONUNBUFFERED": "1"}
proc: subprocess.Popen[str] = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
env=env,
)
found_selected = False
found_interfaces = False
if proc.stdout is None:
proc.terminate()
raise RuntimeError("Process stdout is None")
out_stream = proc.stdout
# Use a cancellation scope as timeout (similar to previous 10s loop)
with trio.move_on_after(10) as cancel_scope:
# Start process streaming stdout
proc = await trio.open_process(
cmd,
stdout=trio.SUBPROCESS_PIPE,
stderr=trio.STDOUT,
peer_id: str | None = None
printed_multiaddr: str | None = None
saw_waiting = False
start = time.time()
timeout_s = 8.0
try:
while time.time() - start < timeout_s:
line = out_stream.readline()
if not line:
time.sleep(0.05)
continue
s = line.strip()
if s.startswith("I am "):
peer_id = s.partition("I am ")[2]
if s.startswith("echo-demo -d "):
printed_multiaddr = s.partition("echo-demo -d ")[2]
if "Waiting for incoming connections..." in s:
saw_waiting = True
break
finally:
with contextlib.suppress(ProcessLookupError):
proc.terminate()
with contextlib.suppress(ProcessLookupError):
proc.kill()
assert peer_id, "Did not capture peer ID line"
assert printed_multiaddr, "Did not capture multiaddr line"
assert saw_waiting, "Did not capture waiting-for-connections line"
# Validate multiaddr structure using py-multiaddr protocol methods
ma = Multiaddr(printed_multiaddr) # should parse without error
# Check that the multiaddr contains the p2p protocol
try:
peer_id_from_multiaddr = ma.value_for_protocol("p2p")
assert peer_id_from_multiaddr is not None, (
"Multiaddr missing p2p protocol value"
)
assert peer_id_from_multiaddr == peer_id, (
f"Peer ID mismatch: {peer_id_from_multiaddr} != {peer_id}"
)
except Exception as e:
raise AssertionError(f"Failed to extract p2p protocol value: {e}")
assert proc.stdout is not None # for type checkers
buffer = b""
# Validate the multiaddr structure by checking protocols
protocols = ma.protocols()
try:
while not (found_selected and found_interfaces):
# Read some bytes (non-blocking with timeout scope)
data = await proc.stdout.receive_some(1024)
if not data:
# Process might still be starting; yield control
await trio.sleep(0.05)
continue
buffer += data
# Process complete lines
*lines, buffer = buffer.split(b"\n") if b"\n" in buffer else ([], buffer)
for raw in lines:
line = raw.decode(errors="ignore")
if "Selected binding address:" in line:
found_selected = True
if "Available candidate interfaces:" in line:
found_interfaces = True
if "Waiting for incoming connections..." in line:
# We have reached steady state; can stop reading further
if found_selected and found_interfaces:
break
finally:
# Terminate the long-running echo example
with contextlib.suppress(Exception):
proc.terminate()
with contextlib.suppress(Exception):
await trio.move_on_after(2)(proc.wait) # best-effort wait
if cancel_scope.cancelled_caught:
# Timeout occurred
pass
# Should have at least IP, TCP, and P2P protocols
assert any(p.code == P_IP4 or p.code == P_IP6 for p in protocols), (
"Missing IP protocol"
)
assert any(p.code == P_TCP for p in protocols), "Missing TCP protocol"
assert any(p.code == P_P2P for p in protocols), "Missing P2P protocol"
assert found_selected, "Did not capture Thin Waist binding log line"
assert found_interfaces, "Did not capture Thin Waist interfaces log line"
# Extract the p2p part and validate it matches the captured peer ID
p2p_part = Multiaddr(f"/p2p/{peer_id}")
try:
# Decapsulate the p2p part to get the transport address
transport_addr = ma.decapsulate(p2p_part)
# Verify the decapsulated address doesn't contain p2p
transport_protocols = transport_addr.protocols()
assert not any(p.code == P_P2P for p in transport_protocols), (
"Decapsulation failed - still contains p2p"
)
# Verify the original multiaddr can be reconstructed
reconstructed = transport_addr.encapsulate(p2p_part)
assert str(reconstructed) == str(ma), "Reconstruction failed"
except Exception as e:
raise AssertionError(f"Multiaddr decapsulation failed: {e}")

View File

@ -4,9 +4,9 @@ import pytest
from multiaddr import Multiaddr
from libp2p.utils.address_validation import (
expand_wildcard_address,
get_available_interfaces,
get_optimal_binding_address,
expand_wildcard_address,
)
@ -53,4 +53,4 @@ def test_expand_wildcard_address_ipv6() -> None:
expanded = expand_wildcard_address(wildcard)
assert len(expanded) > 0
for e in expanded:
assert "/ip6/" in str(e)
assert "/ip6/" in str(e)