fix: succesfull echo example completed

This commit is contained in:
Akash Mondal
2025-07-01 12:24:57 +00:00
committed by lla-dane
parent 8f0cdc9ed4
commit 6c45862fe9
8 changed files with 199 additions and 83 deletions

View File

@ -55,7 +55,7 @@ async def run_server(port: int, seed: int | None = None) -> None:
# QUIC transport configuration
quic_config = QUICTransportConfig(
idle_timeout=30.0,
max_concurrent_streams=1000,
max_concurrent_streams=100,
connection_timeout=10.0,
enable_draft29=False,
)
@ -68,16 +68,21 @@ async def run_server(port: int, seed: int | None = None) -> None:
# Server mode: start listener
async with host.run(listen_addrs=[listen_addr]):
print(f"I am {host.get_id().to_string()}")
host.set_stream_handler(PROTOCOL_ID, _echo_stream_handler)
try:
print(f"I am {host.get_id().to_string()}")
host.set_stream_handler(PROTOCOL_ID, _echo_stream_handler)
print(
"Run this from the same folder in another console:\n\n"
f"python3 ./examples/echo/echo_quic.py "
f"-d {host.get_addrs()[0]}\n"
)
print("Waiting for incoming QUIC connections...")
await trio.sleep_forever()
print(
"Run this from the same folder in another console:\n\n"
f"python3 ./examples/echo/echo_quic.py "
f"-d {host.get_addrs()[0]}\n"
)
print("Waiting for incoming QUIC connections...")
await trio.sleep_forever()
except KeyboardInterrupt:
print("Closing server gracefully...")
await host.close()
return
async def run_client(destination: str, seed: int | None = None) -> None:
@ -96,7 +101,7 @@ async def run_client(destination: str, seed: int | None = None) -> None:
# QUIC transport configuration
quic_config = QUICTransportConfig(
idle_timeout=30.0,
max_concurrent_streams=1000,
max_concurrent_streams=100,
connection_timeout=10.0,
enable_draft29=False,
)

View File

@ -299,9 +299,7 @@ class BasicHost(IHost):
)
except MultiselectError as error:
peer_id = net_stream.muxed_conn.peer_id
logger.debug(
"failed to accept a stream from peer %s, error=%s", peer_id, error
)
print("failed to accept a stream from peer %s, error=%s", peer_id, error)
await net_stream.reset()
return
if protocol is None:

View File

@ -1,3 +1,5 @@
from builtins import AssertionError
from libp2p.abc import (
IMultiselectCommunicator,
)
@ -36,7 +38,8 @@ class MultiselectCommunicator(IMultiselectCommunicator):
msg_bytes = encode_delim(msg_str.encode())
try:
await self.read_writer.write(msg_bytes)
except IOException as error:
# Handle for connection close during ongoing negotiation in QUIC
except (IOException, AssertionError, ValueError) as error:
raise MultiselectCommunicatorError(
"fail to write to multiselect communicator"
) from error

View File

@ -1,3 +1,5 @@
from typing import Literal
"""
Configuration classes for QUIC transport.
"""
@ -64,7 +66,7 @@ class QUICTransportConfig:
alpn_protocols: list[str] = field(default_factory=lambda: ["libp2p"])
# Performance settings
max_concurrent_streams: int = 1000 # Maximum concurrent streams per connection
max_concurrent_streams: int = 100 # Maximum concurrent streams per connection
connection_window: int = 1024 * 1024 # Connection flow control window
stream_window: int = 64 * 1024 # Stream flow control window
@ -299,10 +301,11 @@ class QUICStreamMetricsConfig:
self.metrics_aggregation_interval = metrics_aggregation_interval
# Factory function for creating optimized configurations
def create_stream_config_for_use_case(use_case: str) -> QUICTransportConfig:
def create_stream_config_for_use_case(
use_case: Literal[
"high_throughput", "low_latency", "many_streams", "memory_constrained"
],
) -> QUICTransportConfig:
"""
Create optimized stream configuration for specific use cases.

View File

@ -19,6 +19,7 @@ import trio
from libp2p.abc import IMuxedConn, IRawConnection
from libp2p.custom_types import TQUICStreamHandlerFn
from libp2p.peer.id import ID
from libp2p.stream_muxer.exceptions import MuxedConnUnavailable
from .exceptions import (
QUICConnectionClosedError,
@ -64,8 +65,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
- COMPLETE connection ID management (fixes the original issue)
"""
# Configuration constants based on research
MAX_CONCURRENT_STREAMS = 1000
MAX_CONCURRENT_STREAMS = 100
MAX_INCOMING_STREAMS = 1000
MAX_OUTGOING_STREAMS = 1000
STREAM_ACCEPT_TIMEOUT = 30.0
@ -76,7 +76,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
self,
quic_connection: QuicConnection,
remote_addr: tuple[str, int],
peer_id: ID,
remote_peer_id: ID | None,
local_peer_id: ID,
is_initiator: bool,
maddr: multiaddr.Multiaddr,
@ -91,7 +91,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
Args:
quic_connection: aioquic QuicConnection instance
remote_addr: Remote peer address
peer_id: Remote peer ID (may be None initially)
remote_peer_id: Remote peer ID (may be None initially)
local_peer_id: Local peer ID
is_initiator: Whether this is the connection initiator
maddr: Multiaddr for this connection
@ -103,8 +103,9 @@ class QUICConnection(IRawConnection, IMuxedConn):
"""
self._quic = quic_connection
self._remote_addr = remote_addr
self.peer_id = peer_id
self._remote_peer_id = remote_peer_id
self._local_peer_id = local_peer_id
self.peer_id = remote_peer_id or local_peer_id
self.__is_initiator = is_initiator
self._maddr = maddr
self._transport = transport
@ -134,7 +135,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
self._accept_queue_lock = trio.Lock()
# Connection state
self._closed = False
self._closed: bool = False
self._established = False
self._started = False
self._handshake_completed = False
@ -179,7 +180,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
}
logger.debug(
f"Created QUIC connection to {peer_id} "
f"Created QUIC connection to {remote_peer_id} "
f"(initiator: {is_initiator}, addr: {remote_addr}, "
"security: {security_manager is not None})"
)
@ -238,7 +239,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
def remote_peer_id(self) -> ID | None:
"""Get the remote peer ID."""
return self.peer_id
return self._remote_peer_id
# *** NEW: Connection ID management methods ***
def get_connection_id_stats(self) -> dict[str, Any]:
@ -277,7 +278,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
self._started = True
self.event_started.set()
logger.debug(f"Starting QUIC connection to {self.peer_id}")
logger.debug(f"Starting QUIC connection to {self._remote_peer_id}")
try:
# If this is a client connection, we need to establish the connection
@ -288,7 +289,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
self._established = True
self._connected_event.set()
logger.debug(f"QUIC connection to {self.peer_id} started")
logger.debug(f"QUIC connection to {self._remote_peer_id} started")
except Exception as e:
logger.error(f"Failed to start connection: {e}")
@ -360,7 +361,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
await self._verify_peer_identity_with_security()
self._established = True
logger.info(f"QUIC connection established with {self.peer_id}")
logger.info(f"QUIC connection established with {self._remote_peer_id}")
except Exception as e:
logger.error(f"Failed to establish connection: {e}")
@ -495,16 +496,16 @@ class QUICConnection(IRawConnection, IMuxedConn):
# Verify peer identity using security manager
verified_peer_id = self._security_manager.verify_peer_identity(
self._peer_certificate,
self.peer_id, # Expected peer ID for outbound connections
self._remote_peer_id, # Expected peer ID for outbound connections
)
# Update peer ID if it wasn't known (inbound connections)
if not self.peer_id:
self.peer_id = verified_peer_id
if not self._remote_peer_id:
self._remote_peer_id = verified_peer_id
logger.info(f"Discovered peer ID from certificate: {verified_peer_id}")
elif self.peer_id != verified_peer_id:
elif self._remote_peer_id != verified_peer_id:
raise QUICPeerVerificationError(
f"Peer ID mismatch: expected {self.peer_id}, got {verified_peer_id}"
f"Peer ID mismatch: expected {self._remote_peer_id}, got {verified_peer_id}"
)
self._peer_verified = True
@ -608,7 +609,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
info: dict[str, bool | Any | None] = {
"peer_verified": self._peer_verified,
"handshake_complete": self._handshake_completed,
"peer_id": str(self.peer_id) if self.peer_id else None,
"peer_id": str(self._remote_peer_id) if self._remote_peer_id else None,
"local_peer_id": str(self._local_peer_id),
"is_initiator": self.__is_initiator,
"has_certificate": self._peer_certificate is not None,
@ -742,6 +743,9 @@ class QUICConnection(IRawConnection, IMuxedConn):
with trio.move_on_after(timeout):
while True:
if self._closed:
raise MuxedConnUnavailable("QUIC connection is closed")
async with self._accept_queue_lock:
if self._stream_accept_queue:
stream = self._stream_accept_queue.pop(0)
@ -749,15 +753,20 @@ class QUICConnection(IRawConnection, IMuxedConn):
return stream
if self._closed:
raise QUICConnectionClosedError(
raise MuxedConnUnavailable(
"Connection closed while accepting stream"
)
# Wait for new streams
await self._stream_accept_event.wait()
self._stream_accept_event = trio.Event()
raise QUICStreamTimeoutError(f"Stream accept timed out after {timeout}s")
print(
f"{id(self)} ACCEPT STREAM TIMEOUT: CONNECTION STATE {self._closed_event.is_set() or self._closed}"
)
if self._closed_event.is_set() or self._closed:
raise MuxedConnUnavailable("QUIC connection closed during timeout")
else:
raise QUICStreamTimeoutError(f"Stream accept timed out after {timeout}s")
def set_stream_handler(self, handler_function: TQUICStreamHandlerFn) -> None:
"""
@ -979,6 +988,11 @@ class QUICConnection(IRawConnection, IMuxedConn):
self._closed = True
self._closed_event.set()
self._stream_accept_event.set()
print(f"✅ TERMINATION: Woke up pending accept_stream() calls, {id(self)}")
await self._notify_parent_of_termination()
async def _handle_stream_data(self, event: events.StreamDataReceived) -> None:
"""Handle stream data events - create streams and add to accept queue."""
stream_id = event.stream_id
@ -1191,7 +1205,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
return
self._closed = True
logger.debug(f"Closing QUIC connection to {self.peer_id}")
logger.debug(f"Closing QUIC connection to {self._remote_peer_id}")
try:
# Close all streams gracefully
@ -1233,11 +1247,62 @@ class QUICConnection(IRawConnection, IMuxedConn):
self._streams.clear()
self._closed_event.set()
logger.debug(f"QUIC connection to {self.peer_id} closed")
logger.debug(f"QUIC connection to {self._remote_peer_id} closed")
except Exception as e:
logger.error(f"Error during connection close: {e}")
async def _notify_parent_of_termination(self) -> None:
"""
Notify the parent listener/transport to remove this connection from tracking.
This ensures that terminated connections are cleaned up from the
'established connections' list.
"""
try:
if self._transport:
await self._transport._cleanup_terminated_connection(self)
logger.debug("Notified transport of connection termination")
return
for listener in self._transport._listeners:
try:
await listener._remove_connection_by_object(self)
logger.debug(
"Found and notified listener of connection termination"
)
return
except Exception:
continue
# Method 4: Use connection ID if we have one (most reliable)
if self._current_connection_id:
await self._cleanup_by_connection_id(self._current_connection_id)
return
logger.warning(
"Could not notify parent of connection termination - no parent reference found"
)
except Exception as e:
logger.error(f"Error notifying parent of connection termination: {e}")
async def _cleanup_by_connection_id(self, connection_id: bytes) -> None:
"""Cleanup using connection ID as a fallback method."""
try:
for listener in self._transport._listeners:
for tracked_cid, tracked_conn in list(listener._connections.items()):
if tracked_conn is self:
await listener._remove_connection(tracked_cid)
logger.debug(
f"Removed connection {tracked_cid.hex()} by object reference"
)
return
logger.debug("Fallback cleanup by connection ID completed")
except Exception as e:
logger.error(f"Error in fallback cleanup: {e}")
# IRawConnection interface (for compatibility)
def get_remote_address(self) -> tuple[str, int]:
@ -1333,7 +1398,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
def __repr__(self) -> str:
return (
f"QUICConnection(peer={self.peer_id}, "
f"QUICConnection(peer={self._remote_peer_id}, "
f"addr={self._remote_addr}, "
f"initiator={self.__is_initiator}, "
f"verified={self._peer_verified}, "
@ -1343,4 +1408,4 @@ class QUICConnection(IRawConnection, IMuxedConn):
)
def __str__(self) -> str:
return f"QUICConnection({self.peer_id})"
return f"QUICConnection({self._remote_peer_id})"

View File

@ -880,42 +880,49 @@ class QUICListener(IListener):
async def _promote_pending_connection(
self, quic_conn: QuicConnection, addr: tuple[str, int], dest_cid: bytes
) -> None:
"""Promote a pending connection to an established connection."""
):
"""Promote pending connection - avoid duplicate creation."""
try:
# Remove from pending connections
self._pending_connections.pop(dest_cid, None)
# Create multiaddr for this connection
host, port = addr
quic_version = "quic"
remote_maddr = create_quic_multiaddr(host, port, f"/{quic_version}")
# CHECK: Does QUICConnection already exist?
if dest_cid in self._connections:
connection = self._connections[dest_cid]
print(
f"🔄 PROMOTION: Using existing QUICConnection {id(connection)} for {dest_cid.hex()}"
)
else:
from .connection import QUICConnection
from .connection import QUICConnection
host, port = addr
quic_version = "quic"
remote_maddr = create_quic_multiaddr(host, port, f"/{quic_version}")
connection = QUICConnection(
quic_connection=quic_conn,
remote_addr=addr,
peer_id=None,
local_peer_id=self._transport._peer_id,
is_initiator=False,
maddr=remote_maddr,
transport=self._transport,
security_manager=self._security_manager,
listener_socket=self._socket,
)
connection = QUICConnection(
quic_connection=quic_conn,
remote_addr=addr,
remote_peer_id=None,
local_peer_id=self._transport._peer_id,
is_initiator=False,
maddr=remote_maddr,
transport=self._transport,
security_manager=self._security_manager,
listener_socket=self._socket,
)
print(
f"🔧 PROMOTION: Created connection with socket: {self._socket is not None}"
)
print(
f"🔧 PROMOTION: Socket type: {type(self._socket) if self._socket else 'None'}"
)
print(
f"🔄 PROMOTION: Created NEW QUICConnection {id(connection)} for {dest_cid.hex()}"
)
self._connections[dest_cid] = connection
# Store the connection
self._connections[dest_cid] = connection
# Update mappings
self._addr_to_cid[addr] = dest_cid
self._cid_to_addr[dest_cid] = addr
# Rest of the existing promotion code...
if self._nursery:
await connection.connect(self._nursery)
@ -932,10 +939,11 @@ class QUICListener(IListener):
await connection.close()
return
# Call the connection handler
if self._nursery:
self._nursery.start_soon(
self._handle_new_established_connection, connection
if self._transport._swarm:
print(f"🔄 PROMOTION: Adding connection {id(connection)} to swarm")
await self._transport._swarm.add_conn(connection)
print(
f"🔄 PROMOTION: Successfully added connection {id(connection)} to swarm"
)
self._stats["connections_accepted"] += 1
@ -946,7 +954,6 @@ class QUICListener(IListener):
except Exception as e:
logger.error(f"❌ Error promoting connection {dest_cid.hex()}: {e}")
await self._remove_connection(dest_cid)
self._stats["connections_rejected"] += 1
async def _remove_connection(self, dest_cid: bytes) -> None:
"""Remove connection by connection ID."""
@ -1220,6 +1227,32 @@ class QUICListener(IListener):
except Exception as e:
logger.error(f"Error closing listener: {e}")
async def _remove_connection_by_object(self, connection_obj) -> None:
"""Remove a connection by object reference (called when connection terminates)."""
try:
# Find the connection ID for this object
connection_cid = None
for cid, tracked_connection in self._connections.items():
if tracked_connection is connection_obj:
connection_cid = cid
break
if connection_cid:
await self._remove_connection(connection_cid)
logger.debug(
f"✅ TERMINATION: Removed connection {connection_cid.hex()} by object reference"
)
print(
f"✅ TERMINATION: Removed connection {connection_cid.hex()} by object reference"
)
else:
logger.warning("⚠️ TERMINATION: Connection object not found in tracking")
print("⚠️ TERMINATION: Connection object not found in tracking")
except Exception as e:
logger.error(f"❌ TERMINATION: Error removing connection by object: {e}")
print(f"❌ TERMINATION: Error removing connection by object: {e}")
def get_addresses(self) -> list[Multiaddr]:
"""Get the bound addresses."""
return self._bound_addresses.copy()

View File

@ -218,13 +218,11 @@ class QUICTransport(ITransport):
"""
try:
# Access attributes directly from QUICTLSSecurityConfig
config.certificate = tls_config.certificate
config.private_key = tls_config.private_key
config.certificate_chain = tls_config.certificate_chain
config.alpn_protocols = tls_config.alpn_protocols
# Set verification mode (though libp2p typically doesn't verify)
config.verify_mode = tls_config.verify_mode
config.verify_mode = ssl.CERT_NONE
@ -285,12 +283,12 @@ class QUICTransport(ITransport):
connection = QUICConnection(
quic_connection=native_quic_connection,
remote_addr=(host, port),
peer_id=peer_id,
remote_peer_id=peer_id,
local_peer_id=self._peer_id,
is_initiator=True,
maddr=maddr,
transport=self,
security_manager=self._security_manager, # Pass security manager
security_manager=self._security_manager,
)
# Establish connection using trio
@ -389,7 +387,7 @@ class QUICTransport(ITransport):
handler_function=handler_function,
quic_configs=server_configs,
config=self._config,
security_manager=self._security_manager, # Pass security manager
security_manager=self._security_manager,
)
self._listeners.append(listener)
@ -456,6 +454,17 @@ class QUICTransport(ITransport):
print("QUIC transport closed")
async def _cleanup_terminated_connection(self, connection) -> None:
"""Clean up a terminated connection from all listeners."""
try:
for listener in self._listeners:
await listener._remove_connection_by_object(connection)
logger.debug(
"✅ TRANSPORT: Cleaned up terminated connection from all listeners"
)
except Exception as e:
logger.error(f"❌ TRANSPORT: Error cleaning up terminated connection: {e}")
def get_stats(self) -> dict[str, int | list[str] | object]:
"""Get transport statistics including security info."""
return {

View File

@ -69,7 +69,7 @@ class TestQUICConnection:
return QUICConnection(
quic_connection=mock_quic_connection,
remote_addr=("127.0.0.1", 4001),
peer_id=peer_id,
remote_peer_id=None,
local_peer_id=peer_id,
is_initiator=True,
maddr=Multiaddr("/ip4/127.0.0.1/udp/4001/quic"),
@ -87,7 +87,7 @@ class TestQUICConnection:
return QUICConnection(
quic_connection=mock_quic_connection,
remote_addr=("127.0.0.1", 4001),
peer_id=peer_id,
remote_peer_id=peer_id,
local_peer_id=peer_id,
is_initiator=False,
maddr=Multiaddr("/ip4/127.0.0.1/udp/4001/quic"),
@ -117,7 +117,7 @@ class TestQUICConnection:
client_conn = QUICConnection(
quic_connection=Mock(),
remote_addr=("127.0.0.1", 4001),
peer_id=None,
remote_peer_id=None,
local_peer_id=Mock(),
is_initiator=True,
maddr=Multiaddr("/ip4/127.0.0.1/udp/4001/quic"),
@ -129,7 +129,7 @@ class TestQUICConnection:
server_conn = QUICConnection(
quic_connection=Mock(),
remote_addr=("127.0.0.1", 4001),
peer_id=None,
remote_peer_id=None,
local_peer_id=Mock(),
is_initiator=False,
maddr=Multiaddr("/ip4/127.0.0.1/udp/4001/quic"),