Fix linting and type checking issues for Thin Waist feature

This commit is contained in:
acul71
2025-08-19 01:11:48 +02:00
parent fe71c479dc
commit 05b372b1eb
5 changed files with 106 additions and 40 deletions

View File

@ -11,8 +11,8 @@ from multiaddr import Multiaddr
try:
from libp2p.utils.address_validation import (
get_available_interfaces,
expand_wildcard_address,
get_available_interfaces,
get_optimal_binding_address,
)
except ImportError:
@ -21,7 +21,10 @@ except ImportError:
return [Multiaddr(f"/ip4/0.0.0.0/{protocol}/{port}")]
def expand_wildcard_address(addr: Multiaddr, port: int | None = None):
return [addr if port is None else Multiaddr(str(addr).rsplit("/", 1)[0] + f"/{port}")]
if port is None:
return [addr]
addr_str = str(addr).rsplit("/", 1)[0]
return [Multiaddr(addr_str + f"/{port}")]
def get_optimal_binding_address(port: int, protocol: str = "tcp"):
return Multiaddr(f"/ip4/0.0.0.0/{protocol}/{port}")
@ -57,4 +60,4 @@ def main() -> None:
if __name__ == "__main__":
main()
main()

View File

@ -18,10 +18,8 @@ from libp2p.network.stream.net_stream import (
from libp2p.peer.peerinfo import (
info_from_p2p_addr,
)
from libp2p.utils.address_validation import (
get_optimal_binding_address,
get_available_interfaces,
)
PROTOCOL_ID = TProtocol("/echo/1.0.0")
@ -38,7 +36,7 @@ async def _echo_stream_handler(stream: INetStream) -> None:
async def run(port: int, destination: str, seed: int | None = None) -> None:
# CHANGED: previously hardcoded 0.0.0.0
listen_addr = get_optimal_binding_address(port)
if seed:
import random

View File

@ -1,9 +1,13 @@
from __future__ import annotations
from typing import List, Optional
from multiaddr import Multiaddr
try:
from multiaddr.utils import get_thin_waist_addresses, get_network_addrs # type: ignore
from multiaddr.utils import ( # type: ignore
get_network_addrs,
get_thin_waist_addresses,
)
_HAS_THIN_WAIST = True
except ImportError: # pragma: no cover - only executed in older environments
_HAS_THIN_WAIST = False
@ -11,7 +15,7 @@ except ImportError: # pragma: no cover - only executed in older environments
get_network_addrs = None # type: ignore
def _safe_get_network_addrs(ip_version: int) -> List[str]:
def _safe_get_network_addrs(ip_version: int) -> list[str]:
"""
Internal safe wrapper. Returns a list of IP addresses for the requested IP version.
Falls back to minimal defaults when Thin Waist helpers are missing.
@ -31,7 +35,7 @@ def _safe_get_network_addrs(ip_version: int) -> List[str]:
return []
def _safe_expand(addr: Multiaddr, port: Optional[int] = None) -> List[Multiaddr]:
def _safe_expand(addr: Multiaddr, port: int | None = None) -> list[Multiaddr]:
"""
Internal safe expansion wrapper. Returns a list of Multiaddr objects.
If Thin Waist isn't available, returns [addr] (identity).
@ -46,7 +50,7 @@ def _safe_expand(addr: Multiaddr, port: Optional[int] = None) -> List[Multiaddr]
return [addr]
def get_available_interfaces(port: int, protocol: str = "tcp") -> List[Multiaddr]:
def get_available_interfaces(port: int, protocol: str = "tcp") -> list[Multiaddr]:
"""
Discover available network interfaces (IPv4 + IPv6 if supported) for binding.
@ -54,15 +58,17 @@ def get_available_interfaces(port: int, protocol: str = "tcp") -> List[Multiaddr
:param protocol: Transport protocol (e.g., "tcp" or "udp").
:return: List of Multiaddr objects representing candidate interface addresses.
"""
addrs: List[Multiaddr] = []
addrs: list[Multiaddr] = []
# IPv4 enumeration
for ip in _safe_get_network_addrs(4):
addrs.append(Multiaddr(f"/ip4/{ip}/{protocol}/{port}"))
# IPv6 enumeration (optional: only include if we have at least one global or loopback)
# IPv6 enumeration (optional: only include if we have at least one global or
# loopback)
for ip in _safe_get_network_addrs(6):
# Avoid returning unusable wildcard expansions if the environment does not support IPv6
# Avoid returning unusable wildcard expansions if the environment does not
# support IPv6
addrs.append(Multiaddr(f"/ip6/{ip}/{protocol}/{port}"))
# Fallback if nothing discovered
@ -72,7 +78,9 @@ def get_available_interfaces(port: int, protocol: str = "tcp") -> List[Multiaddr
return addrs
def expand_wildcard_address(addr: Multiaddr, port: Optional[int] = None) -> List[Multiaddr]:
def expand_wildcard_address(
addr: Multiaddr, port: int | None = None
) -> list[Multiaddr]:
"""
Expand a wildcard (e.g. /ip4/0.0.0.0/tcp/0) into all concrete interfaces.
@ -122,4 +130,4 @@ __all__ = [
"get_available_interfaces",
"get_optimal_binding_address",
"expand_wildcard_address",
]
]

View File

@ -1,45 +1,60 @@
import asyncio
import contextlib
import os
from pathlib import Path
import subprocess
import sys
import time
from pathlib import Path
import pytest
from multiaddr import Multiaddr
from multiaddr.protocols import P_IP4, P_IP6, P_P2P, P_TCP
# pytestmark = pytest.mark.timeout(20) # Temporarily disabled for debugging
# This test is intentionally lightweight and can be marked as 'integration'.
# It ensures the echo example runs and prints the new Thin Waist lines.
EXAMPLES_DIR = Path(__file__).parent.parent.parent / "examples" / "echo"
current_file = Path(__file__)
project_root = current_file.parent.parent.parent
EXAMPLES_DIR: Path = project_root / "examples" / "echo"
@pytest.mark.timeout(20)
def test_echo_example_starts_and_prints_thin_waist(monkeypatch, tmp_path):
# We run: python examples/echo/echo.py -p 0
cmd = [sys.executable, str(EXAMPLES_DIR / "echo.py"), "-p", "0"]
proc = subprocess.Popen(
"""Run echo server and validate printed multiaddr and peer id."""
# Run echo example as server
cmd = [sys.executable, "-u", str(EXAMPLES_DIR / "echo.py"), "-p", "0"]
env = {**os.environ, "PYTHONUNBUFFERED": "1"}
proc: subprocess.Popen[str] = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
env=env,
)
assert proc.stdout is not None
found_selected = False
found_interfaces = False
if proc.stdout is None:
proc.terminate()
raise RuntimeError("Process stdout is None")
out_stream = proc.stdout
peer_id: str | None = None
printed_multiaddr: str | None = None
saw_waiting = False
start = time.time()
timeout_s = 8.0
try:
while time.time() - start < 10:
line = proc.stdout.readline()
while time.time() - start < timeout_s:
line = out_stream.readline()
if not line:
time.sleep(0.1)
time.sleep(0.05)
continue
if "Selected binding address:" in line:
found_selected = True
if "Available candidate interfaces:" in line:
found_interfaces = True
if "Waiting for incoming connections..." in line:
s = line.strip()
if s.startswith("I am "):
peer_id = s.partition("I am ")[2]
if s.startswith("echo-demo -d "):
printed_multiaddr = s.partition("echo-demo -d ")[2]
if "Waiting for incoming connections..." in s:
saw_waiting = True
break
finally:
with contextlib.suppress(ProcessLookupError):
@ -47,5 +62,47 @@ def test_echo_example_starts_and_prints_thin_waist(monkeypatch, tmp_path):
with contextlib.suppress(ProcessLookupError):
proc.kill()
assert found_selected, "Did not capture Thin Waist binding log line"
assert found_interfaces, "Did not capture Thin Waist interfaces log line"
assert peer_id, "Did not capture peer ID line"
assert printed_multiaddr, "Did not capture multiaddr line"
assert saw_waiting, "Did not capture waiting-for-connections line"
# Validate multiaddr structure using py-multiaddr protocol methods
ma = Multiaddr(printed_multiaddr) # should parse without error
# Check that the multiaddr contains the p2p protocol
try:
peer_id_from_multiaddr = ma.value_for_protocol("p2p")
assert peer_id_from_multiaddr is not None, (
"Multiaddr missing p2p protocol value"
)
assert peer_id_from_multiaddr == peer_id, (
f"Peer ID mismatch: {peer_id_from_multiaddr} != {peer_id}"
)
except Exception as e:
raise AssertionError(f"Failed to extract p2p protocol value: {e}")
# Validate the multiaddr structure by checking protocols
protocols = ma.protocols()
# Should have at least IP, TCP, and P2P protocols
assert any(p.code == P_IP4 or p.code == P_IP6 for p in protocols), (
"Missing IP protocol"
)
assert any(p.code == P_TCP for p in protocols), "Missing TCP protocol"
assert any(p.code == P_P2P for p in protocols), "Missing P2P protocol"
# Extract the p2p part and validate it matches the captured peer ID
p2p_part = Multiaddr(f"/p2p/{peer_id}")
try:
# Decapsulate the p2p part to get the transport address
transport_addr = ma.decapsulate(p2p_part)
# Verify the decapsulated address doesn't contain p2p
transport_protocols = transport_addr.protocols()
assert not any(p.code == P_P2P for p in transport_protocols), (
"Decapsulation failed - still contains p2p"
)
# Verify the original multiaddr can be reconstructed
reconstructed = transport_addr.encapsulate(p2p_part)
assert str(reconstructed) == str(ma), "Reconstruction failed"
except Exception as e:
raise AssertionError(f"Multiaddr decapsulation failed: {e}")

View File

@ -4,9 +4,9 @@ import pytest
from multiaddr import Multiaddr
from libp2p.utils.address_validation import (
expand_wildcard_address,
get_available_interfaces,
get_optimal_binding_address,
expand_wildcard_address,
)
@ -53,4 +53,4 @@ def test_expand_wildcard_address_ipv6() -> None:
expanded = expand_wildcard_address(wildcard)
assert len(expanded) > 0
for e in expanded:
assert "/ip6/" in str(e)
assert "/ip6/" in str(e)