fix: accept stream on server side

This commit is contained in:
Akash Mondal
2025-07-02 12:40:21 +00:00
committed by lla-dane
parent 6c45862fe9
commit c15c317514
9 changed files with 1444 additions and 1743 deletions

View File

@ -18,6 +18,7 @@ from libp2p.stream_muxer.exceptions import (
MuxedStreamError,
MuxedStreamReset,
)
from libp2p.transport.quic.exceptions import QUICStreamClosedError, QUICStreamResetError
from .exceptions import (
StreamClosed,
@ -174,7 +175,7 @@ class NetStream(INetStream):
print("NETSTREAM: READ ERROR, NEW STATE -> CLOSE_READ")
self.__stream_state = StreamState.CLOSE_READ
raise StreamEOF() from error
except MuxedStreamReset as error:
except (MuxedStreamReset, QUICStreamClosedError, QUICStreamResetError) as error:
print("NETSTREAM: READ ERROR, MUXED STREAM RESET")
async with self._state_lock:
if self.__stream_state in [
@ -205,7 +206,12 @@ class NetStream(INetStream):
try:
await self.muxed_stream.write(data)
except (MuxedStreamClosed, MuxedStreamError) as error:
except (
MuxedStreamClosed,
MuxedStreamError,
QUICStreamClosedError,
QUICStreamResetError,
) as error:
async with self._state_lock:
if self.__stream_state == StreamState.OPEN:
self.__stream_state = StreamState.CLOSE_WRITE

View File

@ -179,7 +179,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
"connection_id_changes": 0,
}
logger.debug(
print(
f"Created QUIC connection to {remote_peer_id} "
f"(initiator: {is_initiator}, addr: {remote_addr}, "
"security: {security_manager is not None})"
@ -278,7 +278,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
self._started = True
self.event_started.set()
logger.debug(f"Starting QUIC connection to {self._remote_peer_id}")
print(f"Starting QUIC connection to {self._remote_peer_id}")
try:
# If this is a client connection, we need to establish the connection
@ -289,7 +289,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
self._established = True
self._connected_event.set()
logger.debug(f"QUIC connection to {self._remote_peer_id} started")
print(f"QUIC connection to {self._remote_peer_id} started")
except Exception as e:
logger.error(f"Failed to start connection: {e}")
@ -300,7 +300,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
try:
with QUICErrorContext("connection_initiation", "connection"):
if not self._socket:
logger.debug("Creating new socket for outbound connection")
print("Creating new socket for outbound connection")
self._socket = trio.socket.socket(
family=socket.AF_INET, type=socket.SOCK_DGRAM
)
@ -312,7 +312,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
# Send initial packet(s)
await self._transmit()
logger.debug(f"Initiated QUIC connection to {self._remote_addr}")
print(f"Initiated QUIC connection to {self._remote_addr}")
except Exception as e:
logger.error(f"Failed to initiate connection: {e}")
@ -340,10 +340,10 @@ class QUICConnection(IRawConnection, IMuxedConn):
# Start background event processing
if not self._background_tasks_started:
logger.debug("STARTING BACKGROUND TASK")
print("STARTING BACKGROUND TASK")
await self._start_background_tasks()
else:
logger.debug("BACKGROUND TASK ALREADY STARTED")
print("BACKGROUND TASK ALREADY STARTED")
# Wait for handshake completion with timeout
with trio.move_on_after(
@ -357,11 +357,13 @@ class QUICConnection(IRawConnection, IMuxedConn):
f"{self.CONNECTION_HANDSHAKE_TIMEOUT}s"
)
print("QUICConnection: Verifying peer identity with security manager")
# Verify peer identity using security manager
await self._verify_peer_identity_with_security()
print("QUICConnection: Peer identity verified")
self._established = True
logger.info(f"QUIC connection established with {self._remote_peer_id}")
print(f"QUIC connection established with {self._remote_peer_id}")
except Exception as e:
logger.error(f"Failed to establish connection: {e}")
@ -375,21 +377,26 @@ class QUICConnection(IRawConnection, IMuxedConn):
self._background_tasks_started = True
if self.__is_initiator: # Only for client connections
if self.__is_initiator:
print(f"CLIENT CONNECTION {id(self)}: Starting processing event loop")
self._nursery.start_soon(async_fn=self._client_packet_receiver)
# Start event processing task
self._nursery.start_soon(async_fn=self._event_processing_loop)
self._nursery.start_soon(async_fn=self._event_processing_loop)
else:
print(
f"SERVER CONNECTION {id(self)}: Using listener event forwarding, not own loop"
)
# Start periodic tasks
self._nursery.start_soon(async_fn=self._periodic_maintenance)
logger.debug("Started background tasks for QUIC connection")
print("Started background tasks for QUIC connection")
async def _event_processing_loop(self) -> None:
"""Main event processing loop for the connection."""
logger.debug("Started QUIC event processing loop")
print("Started QUIC event processing loop")
print(
f"Started QUIC event processing loop for connection id: {id(self)} "
f"and local peer id {str(self.local_peer_id())}"
)
try:
while not self._closed:
@ -409,7 +416,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
logger.error(f"Error in event processing loop: {e}")
await self._handle_connection_error(e)
finally:
logger.debug("QUIC event processing loop finished")
print("QUIC event processing loop finished")
async def _periodic_maintenance(self) -> None:
"""Perform periodic connection maintenance."""
@ -424,7 +431,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
# *** NEW: Log connection ID status periodically ***
if logger.isEnabledFor(logging.DEBUG):
cid_stats = self.get_connection_id_stats()
logger.debug(f"Connection ID stats: {cid_stats}")
print(f"Connection ID stats: {cid_stats}")
# Sleep for maintenance interval
await trio.sleep(30.0) # 30 seconds
@ -434,7 +441,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
async def _client_packet_receiver(self) -> None:
"""Receive packets for client connections."""
logger.debug("Starting client packet receiver")
print("Starting client packet receiver")
print("Started QUIC client packet receiver")
try:
@ -454,7 +461,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
await self._transmit()
except trio.ClosedResourceError:
logger.debug("Client socket closed")
print("Client socket closed")
break
except Exception as e:
logger.error(f"Error receiving client packet: {e}")
@ -464,7 +471,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
logger.info("Client packet receiver cancelled")
raise
finally:
logger.debug("Client packet receiver terminated")
print("Client packet receiver terminated")
# Security and identity methods
@ -534,14 +541,14 @@ class QUICConnection(IRawConnection, IMuxedConn):
# aioquic stores the peer certificate as cryptography
# x509.Certificate
self._peer_certificate = tls_context._peer_certificate
logger.debug(
print(
f"Extracted peer certificate: {self._peer_certificate.subject}"
)
else:
logger.debug("No peer certificate found in TLS context")
print("No peer certificate found in TLS context")
else:
logger.debug("No TLS context available for certificate extraction")
print("No TLS context available for certificate extraction")
except Exception as e:
logger.warning(f"Failed to extract peer certificate: {e}")
@ -554,12 +561,10 @@ class QUICConnection(IRawConnection, IMuxedConn):
if hasattr(config, "certificate") and config.certificate:
# This would be the local certificate, not peer certificate
# but we can use it for debugging
logger.debug("Found local certificate in configuration")
print("Found local certificate in configuration")
except Exception as inner_e:
logger.debug(
f"Alternative certificate extraction also failed: {inner_e}"
)
print(f"Alternative certificate extraction also failed: {inner_e}")
async def get_peer_certificate(self) -> x509.Certificate | None:
"""
@ -591,7 +596,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
subject = self._peer_certificate.subject
serial_number = self._peer_certificate.serial_number
logger.debug(
print(
f"Certificate validation - Subject: {subject}, Serial: {serial_number}"
)
return True
@ -716,7 +721,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
self._outbound_stream_count += 1
self._stats["streams_opened"] += 1
logger.debug(f"Opened outbound QUIC stream {stream_id}")
print(f"Opened outbound QUIC stream {stream_id}")
return stream
raise QUICStreamTimeoutError(f"Stream creation timed out after {timeout}s")
@ -749,7 +754,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
async with self._accept_queue_lock:
if self._stream_accept_queue:
stream = self._stream_accept_queue.pop(0)
logger.debug(f"Accepted inbound stream {stream.stream_id}")
print(f"Accepted inbound stream {stream.stream_id}")
return stream
if self._closed:
@ -777,7 +782,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
"""
self._stream_handler = handler_function
logger.debug("Set stream handler for incoming streams")
print("Set stream handler for incoming streams")
def _remove_stream(self, stream_id: int) -> None:
"""
@ -804,7 +809,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
if self._nursery:
self._nursery.start_soon(update_counts)
logger.debug(f"Removed stream {stream_id} from connection")
print(f"Removed stream {stream_id} from connection")
# *** UPDATED: Complete QUIC event handling - FIXES THE ORIGINAL ISSUE ***
@ -826,14 +831,14 @@ class QUICConnection(IRawConnection, IMuxedConn):
await self._handle_quic_event(event)
if events_processed > 0:
logger.debug(f"Processed {events_processed} QUIC events")
print(f"Processed {events_processed} QUIC events")
finally:
self._event_processing_active = False
async def _handle_quic_event(self, event: events.QuicEvent) -> None:
"""Handle a single QUIC event with COMPLETE event type coverage."""
logger.debug(f"Handling QUIC event: {type(event).__name__}")
print(f"Handling QUIC event: {type(event).__name__}")
print(f"QUIC event: {type(event).__name__}")
try:
@ -860,7 +865,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
elif isinstance(event, events.StopSendingReceived):
await self._handle_stop_sending_received(event)
else:
logger.debug(f"Unhandled QUIC event type: {type(event).__name__}")
print(f"Unhandled QUIC event type: {type(event).__name__}")
print(f"Unhandled QUIC event: {type(event).__name__}")
except Exception as e:
@ -891,7 +896,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
# Update statistics
self._stats["connection_ids_issued"] += 1
logger.debug(f"Available connection IDs: {len(self._available_connection_ids)}")
print(f"Available connection IDs: {len(self._available_connection_ids)}")
print(f"Available connection IDs: {len(self._available_connection_ids)}")
async def _handle_connection_id_retired(
@ -932,7 +937,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
async def _handle_ping_acknowledged(self, event: events.PingAcknowledged) -> None:
"""Handle ping acknowledgment."""
logger.debug(f"Ping acknowledged: uid={event.uid}")
print(f"Ping acknowledged: uid={event.uid}")
async def _handle_protocol_negotiated(
self, event: events.ProtocolNegotiated
@ -944,7 +949,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
self, event: events.StopSendingReceived
) -> None:
"""Handle stop sending request from peer."""
logger.debug(
print(
f"Stop sending received: stream_id={event.stream_id}, error_code={event.error_code}"
)
@ -960,7 +965,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
self, event: events.HandshakeCompleted
) -> None:
"""Handle handshake completion with security integration."""
logger.debug("QUIC handshake completed")
print("QUIC handshake completed")
self._handshake_completed = True
# Store handshake event for security verification
@ -969,6 +974,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
# Try to extract certificate information after handshake
await self._extract_peer_certificate()
print("✅ Setting connected event")
self._connected_event.set()
async def _handle_connection_terminated(
@ -1100,7 +1106,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
except Exception as e:
logger.error(f"Error in stream handler for stream {stream_id}: {e}")
logger.debug(f"Created inbound stream {stream_id}")
print(f"Created inbound stream {stream_id}")
return stream
def _is_incoming_stream(self, stream_id: int) -> bool:
@ -1127,7 +1133,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
try:
stream = self._streams[stream_id]
await stream.handle_reset(event.error_code)
logger.debug(
print(
f"Handled reset for stream {stream_id}"
f"with error code {event.error_code}"
)
@ -1136,13 +1142,13 @@ class QUICConnection(IRawConnection, IMuxedConn):
# Force remove the stream
self._remove_stream(stream_id)
else:
logger.debug(f"Received reset for unknown stream {stream_id}")
print(f"Received reset for unknown stream {stream_id}")
async def _handle_datagram_received(
self, event: events.DatagramFrameReceived
) -> None:
"""Handle datagram frame (if using QUIC datagrams)."""
logger.debug(f"Datagram frame received: size={len(event.data)}")
print(f"Datagram frame received: size={len(event.data)}")
# For now, just log. Could be extended for custom datagram handling
async def _handle_timer_events(self) -> None:
@ -1205,7 +1211,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
return
self._closed = True
logger.debug(f"Closing QUIC connection to {self._remote_peer_id}")
print(f"Closing QUIC connection to {self._remote_peer_id}")
try:
# Close all streams gracefully
@ -1247,7 +1253,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
self._streams.clear()
self._closed_event.set()
logger.debug(f"QUIC connection to {self._remote_peer_id} closed")
print(f"QUIC connection to {self._remote_peer_id} closed")
except Exception as e:
logger.error(f"Error during connection close: {e}")
@ -1262,15 +1268,13 @@ class QUICConnection(IRawConnection, IMuxedConn):
try:
if self._transport:
await self._transport._cleanup_terminated_connection(self)
logger.debug("Notified transport of connection termination")
print("Notified transport of connection termination")
return
for listener in self._transport._listeners:
try:
await listener._remove_connection_by_object(self)
logger.debug(
"Found and notified listener of connection termination"
)
print("Found and notified listener of connection termination")
return
except Exception:
continue
@ -1294,12 +1298,12 @@ class QUICConnection(IRawConnection, IMuxedConn):
for tracked_cid, tracked_conn in list(listener._connections.items()):
if tracked_conn is self:
await listener._remove_connection(tracked_cid)
logger.debug(
print(
f"Removed connection {tracked_cid.hex()} by object reference"
)
return
logger.debug("Fallback cleanup by connection ID completed")
print("Fallback cleanup by connection ID completed")
except Exception as e:
logger.error(f"Error in fallback cleanup: {e}")

View File

@ -130,8 +130,6 @@ class QUICListener(IListener):
"invalid_packets": 0,
}
logger.debug("Initialized enhanced QUIC listener with connection ID support")
def _get_supported_versions(self) -> set[int]:
"""Get wire format versions for all supported QUIC configurations."""
versions: set[int] = set()
@ -274,87 +272,82 @@ class QUICListener(IListener):
return value, 8
async def _process_packet(self, data: bytes, addr: tuple[str, int]) -> None:
"""
Enhanced packet processing with better connection ID routing and debugging.
"""
"""Process incoming QUIC packet with fine-grained locking."""
try:
# self._stats["packets_processed"] += 1
# self._stats["bytes_received"] += len(data)
self._stats["packets_processed"] += 1
self._stats["bytes_received"] += len(data)
print(f"🔧 PACKET: Processing {len(data)} bytes from {addr}")
# Parse packet to extract connection information
# Parse packet header OUTSIDE the lock
packet_info = self.parse_quic_packet(data)
if packet_info is None:
print("❌ PACKET: Failed to parse packet header")
self._stats["invalid_packets"] += 1
return
dest_cid = packet_info.destination_cid
print(f"🔧 DEBUG: Packet info: {packet_info is not None}")
if packet_info:
print(f"🔧 DEBUG: Packet type: {packet_info.packet_type}")
print(
f"🔧 DEBUG: Is short header: {packet_info.packet_type == QuicPacketType.ONE_RTT}"
)
print(f"🔧 DEBUG: Packet type: {packet_info.packet_type}")
print(
f"🔧 DEBUG: Is short header: {packet_info.packet_type.name != 'INITIAL'}"
)
print(
f"🔧 DEBUG: Pending connections: {[cid.hex() for cid in self._pending_connections.keys()]}"
)
print(
f"🔧 DEBUG: Established connections: {[cid.hex() for cid in self._connections.keys()]}"
)
# CRITICAL FIX: Reduce lock scope - only protect connection lookups
# Get connection references with minimal lock time
connection_obj = None
pending_quic_conn = None
async with self._connection_lock:
if packet_info:
# Quick lookup operations only
print(
f"🔧 DEBUG: Pending connections: {[cid.hex() for cid in self._pending_connections.keys()]}"
)
print(
f"🔧 DEBUG: Established connections: {[cid.hex() for cid in self._connections.keys()]}"
)
if dest_cid in self._connections:
connection_obj = self._connections[dest_cid]
print(
f"🔧 PACKET: Parsed packet - version: 0x{packet_info.version:08x}, "
f"dest_cid: {packet_info.destination_cid.hex()}, "
f"src_cid: {packet_info.source_cid.hex()}"
f" PACKET: Routing to established connection {dest_cid.hex()}"
)
# Check for version negotiation
if packet_info.version == 0:
logger.warning(
f"Received version negotiation packet from {addr}"
)
return
# Check if version is supported
if packet_info.version not in self._supported_versions:
print(
f"❌ PACKET: Unsupported version 0x{packet_info.version:08x}"
)
await self._send_version_negotiation(
addr, packet_info.source_cid
)
return
# Route based on destination connection ID
dest_cid = packet_info.destination_cid
# First, try exact connection ID match
if dest_cid in self._connections:
print(
f"✅ PACKET: Routing to established connection {dest_cid.hex()}"
)
connection = self._connections[dest_cid]
await self._route_to_connection(connection, data, addr)
return
elif dest_cid in self._pending_connections:
print(
f"✅ PACKET: Routing to pending connection {dest_cid.hex()}"
)
quic_conn = self._pending_connections[dest_cid]
await self._handle_pending_connection(
quic_conn, data, addr, dest_cid
)
return
# No existing connection found, create new one
print(f"🔧 PACKET: Creating new connection for {addr}")
await self._handle_new_connection(data, addr, packet_info)
elif dest_cid in self._pending_connections:
pending_quic_conn = self._pending_connections[dest_cid]
print(f"✅ PACKET: Routing to pending connection {dest_cid.hex()}")
else:
# Failed to parse packet
print(f"❌ PACKET: Failed to parse packet from {addr}")
await self._handle_short_header_packet(data, addr)
# Check if this is a new connection
print(
f"🔧 PACKET: Parsed packet - version: {packet_info.version:#x}, dest_cid: {dest_cid.hex()}, src_cid: {packet_info.source_cid.hex()}"
)
if packet_info.packet_type.name == "INITIAL":
print(f"🔧 PACKET: Creating new connection for {addr}")
# Create new connection INSIDE the lock for safety
pending_quic_conn = await self._handle_new_connection(
data, addr, packet_info
)
else:
print(
f"❌ PACKET: Unknown connection for non-initial packet {dest_cid.hex()}"
)
return
# CRITICAL: Process packets OUTSIDE the lock to prevent deadlock
if connection_obj:
# Handle established connection
await self._handle_established_connection_packet(
connection_obj, data, addr, dest_cid
)
elif pending_quic_conn:
# Handle pending connection
await self._handle_pending_connection_packet(
pending_quic_conn, data, addr, dest_cid
)
except Exception as e:
logger.error(f"Error processing packet from {addr}: {e}")
@ -362,6 +355,66 @@ class QUICListener(IListener):
traceback.print_exc()
async def _handle_established_connection_packet(
self,
connection_obj: QUICConnection,
data: bytes,
addr: tuple[str, int],
dest_cid: bytes,
) -> None:
"""Handle packet for established connection WITHOUT holding connection lock."""
try:
print(f"🔧 ESTABLISHED: Handling packet for connection {dest_cid.hex()}")
# Forward packet to connection object
# This may trigger event processing and stream creation
await self._route_to_connection(connection_obj, data, addr)
except Exception as e:
logger.error(f"Error handling established connection packet: {e}")
async def _handle_pending_connection_packet(
self,
quic_conn: QuicConnection,
data: bytes,
addr: tuple[str, int],
dest_cid: bytes,
) -> None:
"""Handle packet for pending connection WITHOUT holding connection lock."""
try:
print(
f"🔧 PENDING: Handling packet for pending connection {dest_cid.hex()}"
)
print(f"🔧 PENDING: Packet size: {len(data)} bytes from {addr}")
# Feed data to QUIC connection
quic_conn.receive_datagram(data, addr, now=time.time())
print("✅ PENDING: Datagram received by QUIC connection")
# Process events - this is crucial for handshake progression
print("🔧 PENDING: Processing QUIC events...")
await self._process_quic_events(quic_conn, addr, dest_cid)
# Send any outgoing packets
print("🔧 PENDING: Transmitting response...")
await self._transmit_for_connection(quic_conn, addr)
# Check if handshake completed (with minimal locking)
if (
hasattr(quic_conn, "_handshake_complete")
and quic_conn._handshake_complete
):
print("✅ PENDING: Handshake completed, promoting connection")
await self._promote_pending_connection(quic_conn, addr, dest_cid)
else:
print("🔧 PENDING: Handshake still in progress")
except Exception as e:
logger.error(f"Error handling pending connection {dest_cid.hex()}: {e}")
import traceback
traceback.print_exc()
async def _send_version_negotiation(
self, addr: tuple[str, int], source_cid: bytes
) -> None:
@ -784,6 +837,9 @@ class QUICListener(IListener):
# Forward to established connection if available
if dest_cid in self._connections:
connection = self._connections[dest_cid]
print(
f"📨 FORWARDING: Stream data to connection {id(connection)}"
)
await connection._handle_stream_data(event)
elif isinstance(event, events.StreamReset):
@ -892,6 +948,7 @@ class QUICListener(IListener):
print(
f"🔄 PROMOTION: Using existing QUICConnection {id(connection)} for {dest_cid.hex()}"
)
else:
from .connection import QUICConnection
@ -924,7 +981,9 @@ class QUICListener(IListener):
# Rest of the existing promotion code...
if self._nursery:
connection._nursery = self._nursery
await connection.connect(self._nursery)
print("QUICListener: Connection connected succesfully")
if self._security_manager:
try:
@ -939,6 +998,11 @@ class QUICListener(IListener):
await connection.close()
return
if self._nursery:
connection._nursery = self._nursery
await connection._start_background_tasks()
print(f"Started background tasks for connection {dest_cid.hex()}")
if self._transport._swarm:
print(f"🔄 PROMOTION: Adding connection {id(connection)} to swarm")
await self._transport._swarm.add_conn(connection)
@ -946,6 +1010,14 @@ class QUICListener(IListener):
f"🔄 PROMOTION: Successfully added connection {id(connection)} to swarm"
)
if self._handler:
try:
print(f"Invoking user callback {dest_cid.hex()}")
await self._handler(connection)
except Exception as e:
logger.error(f"Error in user callback: {e}")
self._stats["connections_accepted"] += 1
logger.info(
f"✅ Enhanced connection {dest_cid.hex()} established from {addr}"

View File

@ -88,7 +88,7 @@ class QUICTransport(ITransport):
def __init__(
self, private_key: PrivateKey, config: QUICTransportConfig | None = None
):
) -> None:
"""
Initialize QUIC transport with security integration.
@ -119,7 +119,7 @@ class QUICTransport(ITransport):
self._nursery_manager = trio.CapacityLimiter(1)
self._background_nursery: trio.Nursery | None = None
self._swarm = None
self._swarm: Swarm | None = None
print(f"Initialized QUIC transport with security for peer {self._peer_id}")
@ -233,13 +233,19 @@ class QUICTransport(ITransport):
raise QUICSecurityError(f"Failed to apply TLS configuration: {e}") from e
# type: ignore
async def dial(self, maddr: multiaddr.Multiaddr, peer_id: ID) -> QUICConnection:
async def dial(
self,
maddr: multiaddr.Multiaddr,
peer_id: ID,
nursery: trio.Nursery | None = None,
) -> QUICConnection:
"""
Dial a remote peer using QUIC transport with security verification.
Args:
maddr: Multiaddr of the remote peer (e.g., /ip4/1.2.3.4/udp/4001/quic-v1)
peer_id: Expected peer ID for verification
nursery: Nursery to execute the background tasks
Returns:
Raw connection interface to the remote peer
@ -278,7 +284,6 @@ class QUICTransport(ITransport):
# Create QUIC connection using aioquic's sans-IO core
native_quic_connection = NativeQUICConnection(configuration=config)
print("QUIC Connection Created")
# Create trio-based QUIC connection wrapper with security
connection = QUICConnection(
quic_connection=native_quic_connection,
@ -290,25 +295,22 @@ class QUICTransport(ITransport):
transport=self,
security_manager=self._security_manager,
)
print("QUIC Connection Created")
# Establish connection using trio
if self._background_nursery:
# Use swarm's long-lived nursery - background tasks persist!
await connection.connect(self._background_nursery)
print("Using background nursery for connection tasks")
else:
# Fallback to temporary nursery (with warning)
print(
"No background nursery available. Connection background tasks "
"may be cancelled when dial completes."
)
async with trio.open_nursery() as temp_nursery:
await connection.connect(temp_nursery)
active_nursery = nursery or self._background_nursery
if active_nursery is None:
logger.error("No nursery set to execute background tasks")
raise QUICDialError("No nursery found to execute tasks")
await connection.connect(active_nursery)
print("Starting to verify peer identity")
# Verify peer identity after TLS handshake
if peer_id:
await self._verify_peer_identity(connection, peer_id)
print("Identity verification done")
# Store connection for management
conn_id = f"{host}:{port}:{peer_id}"
self._connections[conn_id] = connection

View File

@ -0,0 +1,415 @@
"""
Basic QUIC Echo Test
Simple test to verify the basic QUIC flow:
1. Client connects to server
2. Client sends data
3. Server receives data and echoes back
4. Client receives the echo
This test focuses on identifying where the accept_stream issue occurs.
"""
import logging
import pytest
import trio
from libp2p.crypto.secp256k1 import create_new_key_pair
from libp2p.peer.id import ID
from libp2p.transport.quic.config import QUICTransportConfig
from libp2p.transport.quic.connection import QUICConnection
from libp2p.transport.quic.transport import QUICTransport
from libp2p.transport.quic.utils import create_quic_multiaddr
# Set up logging to see what's happening
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
class TestBasicQUICFlow:
"""Test basic QUIC client-server communication flow."""
@pytest.fixture
def server_key(self):
"""Generate server key pair."""
return create_new_key_pair()
@pytest.fixture
def client_key(self):
"""Generate client key pair."""
return create_new_key_pair()
@pytest.fixture
def server_config(self):
"""Simple server configuration."""
return QUICTransportConfig(
idle_timeout=10.0,
connection_timeout=5.0,
max_concurrent_streams=10,
max_connections=5,
)
@pytest.fixture
def client_config(self):
"""Simple client configuration."""
return QUICTransportConfig(
idle_timeout=10.0,
connection_timeout=5.0,
max_concurrent_streams=5,
)
@pytest.mark.trio
async def test_basic_echo_flow(
self, server_key, client_key, server_config, client_config
):
"""Test basic client-server echo flow with detailed logging."""
print("\n=== BASIC QUIC ECHO TEST ===")
# Create server components
server_transport = QUICTransport(server_key.private_key, server_config)
server_peer_id = ID.from_pubkey(server_key.public_key)
# Track test state
server_received_data = None
server_connection_established = False
echo_sent = False
async def echo_server_handler(connection: QUICConnection) -> None:
"""Simple echo server handler with detailed logging."""
nonlocal server_received_data, server_connection_established, echo_sent
print("🔗 SERVER: Connection handler called")
server_connection_established = True
try:
print("📡 SERVER: Waiting for incoming stream...")
# Accept stream with timeout and detailed logging
print("📡 SERVER: Calling accept_stream...")
stream = await connection.accept_stream(timeout=5.0)
if stream is None:
print("❌ SERVER: accept_stream returned None")
return
print(f"✅ SERVER: Stream accepted! Stream ID: {stream.stream_id}")
# Read data from the stream
print("📖 SERVER: Reading data from stream...")
server_data = await stream.read(1024)
if not server_data:
print("❌ SERVER: No data received from stream")
return
server_received_data = server_data.decode("utf-8", errors="ignore")
print(f"📨 SERVER: Received data: '{server_received_data}'")
# Echo the data back
echo_message = f"ECHO: {server_received_data}"
print(f"📤 SERVER: Sending echo: '{echo_message}'")
await stream.write(echo_message.encode())
echo_sent = True
print("✅ SERVER: Echo sent successfully")
# Close the stream
await stream.close()
print("🔒 SERVER: Stream closed")
except Exception as e:
print(f"❌ SERVER: Error in handler: {e}")
import traceback
traceback.print_exc()
# Create listener
listener = server_transport.create_listener(echo_server_handler)
listen_addr = create_quic_multiaddr("127.0.0.1", 0, "/quic")
# Variables to track client state
client_connected = False
client_sent_data = False
client_received_echo = None
try:
print("🚀 Starting server...")
async with trio.open_nursery() as nursery:
# Start server listener
success = await listener.listen(listen_addr, nursery)
assert success, "Failed to start server listener"
# Get server address
server_addrs = listener.get_addrs()
server_addr = server_addrs[0]
print(f"🔧 SERVER: Listening on {server_addr}")
# Give server a moment to be ready
await trio.sleep(0.1)
print("🚀 Starting client...")
# Create client transport
client_transport = QUICTransport(client_key.private_key, client_config)
try:
# Connect to server
print(f"📞 CLIENT: Connecting to {server_addr}")
connection = await client_transport.dial(
server_addr, peer_id=server_peer_id, nursery=nursery
)
client_connected = True
print("✅ CLIENT: Connected to server")
# Open a stream
print("📤 CLIENT: Opening stream...")
stream = await connection.open_stream()
print(f"✅ CLIENT: Stream opened with ID: {stream.stream_id}")
# Send test data
test_message = "Hello QUIC Server!"
print(f"📨 CLIENT: Sending message: '{test_message}'")
await stream.write(test_message.encode())
client_sent_data = True
print("✅ CLIENT: Message sent")
# Read echo response
print("📖 CLIENT: Waiting for echo response...")
response_data = await stream.read(1024)
if response_data:
client_received_echo = response_data.decode(
"utf-8", errors="ignore"
)
print(f"📬 CLIENT: Received echo: '{client_received_echo}'")
else:
print("❌ CLIENT: No echo response received")
print("🔒 CLIENT: Closing connection")
await connection.close()
print("🔒 CLIENT: Connection closed")
print("🔒 CLIENT: Closing transport")
await client_transport.close()
print("🔒 CLIENT: Transport closed")
except Exception as e:
print(f"❌ CLIENT: Error: {e}")
import traceback
traceback.print_exc()
finally:
await client_transport.close()
print("🔒 CLIENT: Transport closed")
# Give everything time to complete
await trio.sleep(0.5)
# Cancel nursery to stop server
nursery.cancel_scope.cancel()
finally:
# Cleanup
if not listener._closed:
await listener.close()
await server_transport.close()
# Verify the flow worked
print("\n📊 TEST RESULTS:")
print(f" Server connection established: {server_connection_established}")
print(f" Client connected: {client_connected}")
print(f" Client sent data: {client_sent_data}")
print(f" Server received data: '{server_received_data}'")
print(f" Echo sent by server: {echo_sent}")
print(f" Client received echo: '{client_received_echo}'")
# Test assertions
assert server_connection_established, "Server connection handler was not called"
assert client_connected, "Client failed to connect"
assert client_sent_data, "Client failed to send data"
assert server_received_data == "Hello QUIC Server!", (
f"Server received wrong data: '{server_received_data}'"
)
assert echo_sent, "Server failed to send echo"
assert client_received_echo == "ECHO: Hello QUIC Server!", (
f"Client received wrong echo: '{client_received_echo}'"
)
print("✅ BASIC ECHO TEST PASSED!")
@pytest.mark.trio
async def test_server_accept_stream_timeout(
self, server_key, client_key, server_config, client_config
):
"""Test what happens when server accept_stream times out."""
print("\n=== TESTING SERVER ACCEPT_STREAM TIMEOUT ===")
server_transport = QUICTransport(server_key.private_key, server_config)
server_peer_id = ID.from_pubkey(server_key.public_key)
accept_stream_called = False
accept_stream_timeout = False
async def timeout_test_handler(connection: QUICConnection) -> None:
"""Handler that tests accept_stream timeout."""
nonlocal accept_stream_called, accept_stream_timeout
print("🔗 SERVER: Connection established, testing accept_stream timeout")
accept_stream_called = True
try:
print("📡 SERVER: Calling accept_stream with 2 second timeout...")
stream = await connection.accept_stream(timeout=2.0)
print(f"✅ SERVER: accept_stream returned: {stream}")
except Exception as e:
print(f"⏰ SERVER: accept_stream timed out or failed: {e}")
accept_stream_timeout = True
listener = server_transport.create_listener(timeout_test_handler)
listen_addr = create_quic_multiaddr("127.0.0.1", 0, "/quic")
client_connected = False
try:
async with trio.open_nursery() as nursery:
# Start server
success = await listener.listen(listen_addr, nursery)
assert success
server_addr = listener.get_addrs()[0]
print(f"🔧 SERVER: Listening on {server_addr}")
# Create client but DON'T open a stream
client_transport = QUICTransport(client_key.private_key, client_config)
try:
print("📞 CLIENT: Connecting (but NOT opening stream)...")
connection = await client_transport.dial(
server_addr, peer_id=server_peer_id, nursery=nursery
)
client_connected = True
print("✅ CLIENT: Connected (no stream opened)")
# Wait for server timeout
await trio.sleep(3.0)
await connection.close()
print("🔒 CLIENT: Connection closed")
finally:
await client_transport.close()
nursery.cancel_scope.cancel()
finally:
await listener.close()
await server_transport.close()
print("\n📊 TIMEOUT TEST RESULTS:")
print(f" Client connected: {client_connected}")
print(f" accept_stream called: {accept_stream_called}")
print(f" accept_stream timeout: {accept_stream_timeout}")
assert client_connected, "Client should have connected"
assert accept_stream_called, "accept_stream should have been called"
assert accept_stream_timeout, (
"accept_stream should have timed out when no stream was opened"
)
print("✅ TIMEOUT TEST PASSED!")
@pytest.mark.trio
async def test_debug_accept_stream_hanging(
self, server_key, client_key, server_config, client_config
):
"""Debug test to see exactly where accept_stream might be hanging."""
print("\n=== DEBUGGING ACCEPT_STREAM HANGING ===")
server_transport = QUICTransport(server_key.private_key, server_config)
server_peer_id = ID.from_pubkey(server_key.public_key)
async def debug_handler(connection: QUICConnection) -> None:
"""Handler with extensive debugging."""
print(f"🔗 SERVER: Handler called for connection {id(connection)} ")
print(f" Connection closed: {connection.is_closed}")
print(f" Connection started: {connection._started}")
print(f" Connection established: {connection._established}")
try:
print("📡 SERVER: About to call accept_stream...")
print(f" Accept queue length: {len(connection._stream_accept_queue)}")
print(
f" Accept event set: {connection._stream_accept_event.is_set()}"
)
# Use a short timeout to avoid hanging the test
with trio.move_on_after(3.0) as cancel_scope:
stream = await connection.accept_stream()
if stream:
print(f"✅ SERVER: Got stream {stream.stream_id}")
else:
print("❌ SERVER: accept_stream returned None")
if cancel_scope.cancelled_caught:
print("⏰ SERVER: accept_stream cancelled due to timeout")
except Exception as e:
print(f"❌ SERVER: Exception in accept_stream: {e}")
import traceback
traceback.print_exc()
listener = server_transport.create_listener(debug_handler)
listen_addr = create_quic_multiaddr("127.0.0.1", 0, "/quic")
try:
async with trio.open_nursery() as nursery:
success = await listener.listen(listen_addr, nursery)
assert success
server_addr = listener.get_addrs()[0]
print(f"🔧 SERVER: Listening on {server_addr}")
# Create client and connect
client_transport = QUICTransport(client_key.private_key, client_config)
try:
print("📞 CLIENT: Connecting...")
connection = await client_transport.dial(
server_addr, peer_id=server_peer_id, nursery=nursery
)
print("✅ CLIENT: Connected")
# Open stream after a short delay
await trio.sleep(0.1)
print("📤 CLIENT: Opening stream...")
stream = await connection.open_stream()
print(f"📤 CLIENT: Stream {stream.stream_id} opened")
# Send some data
await stream.write(b"test data")
print("📨 CLIENT: Data sent")
# Give server time to process
await trio.sleep(1.0)
# Cleanup
await stream.close()
await connection.close()
print("🔒 CLIENT: Cleaned up")
finally:
await client_transport.close()
await trio.sleep(0.5)
nursery.cancel_scope.cancel()
finally:
await listener.close()
await server_transport.close()
print("✅ DEBUG TEST COMPLETED!")

View File

@ -295,7 +295,10 @@ class TestQUICConnection:
mock_verify.assert_called_once()
@pytest.mark.trio
async def test_connection_connect_timeout(self, quic_connection: QUICConnection):
@pytest.mark.slow
async def test_connection_connect_timeout(
self, quic_connection: QUICConnection
) -> None:
"""Test connection establishment timeout."""
quic_connection._started = True
# Don't set connected event to simulate timeout
@ -330,7 +333,7 @@ class TestQUICConnection:
# Error handling tests
@pytest.mark.trio
async def test_connection_error_handling(self, quic_connection):
async def test_connection_error_handling(self, quic_connection) -> None:
"""Test connection error handling."""
error = Exception("Test error")
@ -343,7 +346,7 @@ class TestQUICConnection:
# Statistics and monitoring tests
@pytest.mark.trio
async def test_connection_stats_enhanced(self, quic_connection):
async def test_connection_stats_enhanced(self, quic_connection) -> None:
"""Test enhanced connection statistics."""
quic_connection._started = True
@ -370,7 +373,7 @@ class TestQUICConnection:
assert stats["inbound_streams"] == 0
@pytest.mark.trio
async def test_get_active_streams(self, quic_connection):
async def test_get_active_streams(self, quic_connection) -> None:
"""Test getting active streams."""
quic_connection._started = True
@ -385,7 +388,7 @@ class TestQUICConnection:
assert stream2 in active_streams
@pytest.mark.trio
async def test_get_streams_by_protocol(self, quic_connection):
async def test_get_streams_by_protocol(self, quic_connection) -> None:
"""Test getting streams by protocol."""
quic_connection._started = True
@ -407,7 +410,9 @@ class TestQUICConnection:
# Enhanced close tests
@pytest.mark.trio
async def test_connection_close_enhanced(self, quic_connection: QUICConnection):
async def test_connection_close_enhanced(
self, quic_connection: QUICConnection
) -> None:
"""Test enhanced connection close with stream cleanup."""
quic_connection._started = True
@ -423,7 +428,9 @@ class TestQUICConnection:
# Concurrent operations tests
@pytest.mark.trio
async def test_concurrent_stream_operations(self, quic_connection):
async def test_concurrent_stream_operations(
self, quic_connection: QUICConnection
) -> None:
"""Test concurrent stream operations."""
quic_connection._started = True
@ -444,16 +451,16 @@ class TestQUICConnection:
# Connection properties tests
def test_connection_properties(self, quic_connection):
def test_connection_properties(self, quic_connection: QUICConnection) -> None:
"""Test connection property accessors."""
assert quic_connection.multiaddr() == quic_connection._maddr
assert quic_connection.local_peer_id() == quic_connection._local_peer_id
assert quic_connection.remote_peer_id() == quic_connection._peer_id
assert quic_connection.remote_peer_id() == quic_connection._remote_peer_id
# IRawConnection interface tests
@pytest.mark.trio
async def test_raw_connection_write(self, quic_connection):
async def test_raw_connection_write(self, quic_connection: QUICConnection) -> None:
"""Test raw connection write interface."""
quic_connection._started = True
@ -468,26 +475,16 @@ class TestQUICConnection:
mock_stream.close_write.assert_called_once()
@pytest.mark.trio
async def test_raw_connection_read_not_implemented(self, quic_connection):
async def test_raw_connection_read_not_implemented(
self, quic_connection: QUICConnection
) -> None:
"""Test raw connection read raises NotImplementedError."""
with pytest.raises(NotImplementedError, match="Use muxed connection interface"):
with pytest.raises(NotImplementedError):
await quic_connection.read()
# String representation tests
def test_connection_string_representation(self, quic_connection):
"""Test connection string representations."""
repr_str = repr(quic_connection)
str_str = str(quic_connection)
assert "QUICConnection" in repr_str
assert str(quic_connection._peer_id) in repr_str
assert str(quic_connection._remote_addr) in repr_str
assert str(quic_connection._peer_id) in str_str
# Mock verification helpers
def test_mock_resource_scope_functionality(self, mock_resource_scope):
def test_mock_resource_scope_functionality(self, mock_resource_scope) -> None:
"""Test mock resource scope works correctly."""
assert mock_resource_scope.memory_reserved == 0

File diff suppressed because it is too large Load Diff

View File

@ -1,765 +1,323 @@
"""
Integration tests for QUIC transport that test actual networking.
These tests require network access and test real socket operations.
Basic QUIC Echo Test
Simple test to verify the basic QUIC flow:
1. Client connects to server
2. Client sends data
3. Server receives data and echoes back
4. Client receives the echo
This test focuses on identifying where the accept_stream issue occurs.
"""
import logging
import random
import socket
import time
import pytest
import trio
from libp2p.crypto.ed25519 import create_new_key_pair
from libp2p.crypto.secp256k1 import create_new_key_pair
from libp2p.peer.id import ID
from libp2p.transport.quic.config import QUICTransportConfig
from libp2p.transport.quic.connection import QUICConnection
from libp2p.transport.quic.transport import QUICTransport
from libp2p.transport.quic.utils import create_quic_multiaddr
# Set up logging to see what's happening
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
class TestQUICNetworking:
"""Integration tests that use actual networking."""
@pytest.fixture
def server_config(self):
"""Server configuration."""
return QUICTransportConfig(
idle_timeout=10.0,
connection_timeout=5.0,
max_concurrent_streams=100,
)
@pytest.fixture
def client_config(self):
"""Client configuration."""
return QUICTransportConfig(
idle_timeout=10.0,
connection_timeout=5.0,
)
class TestBasicQUICFlow:
"""Test basic QUIC client-server communication flow."""
@pytest.fixture
def server_key(self):
"""Generate server key pair."""
return create_new_key_pair().private_key
return create_new_key_pair()
@pytest.fixture
def client_key(self):
"""Generate client key pair."""
return create_new_key_pair().private_key
@pytest.mark.trio
async def test_listener_binding_real_socket(self, server_key, server_config):
"""Test that listener can bind to real socket."""
transport = QUICTransport(server_key, server_config)
async def connection_handler(connection):
logger.info(f"Received connection: {connection}")
listener = transport.create_listener(connection_handler)
listen_addr = create_quic_multiaddr("127.0.0.1", 0, "/quic")
async with trio.open_nursery() as nursery:
try:
success = await listener.listen(listen_addr, nursery)
assert success
# Verify we got a real port
addrs = listener.get_addrs()
assert len(addrs) == 1
# Port should be non-zero (was assigned)
from libp2p.transport.quic.utils import quic_multiaddr_to_endpoint
host, port = quic_multiaddr_to_endpoint(addrs[0])
assert host == "127.0.0.1"
assert port > 0
logger.info(f"Listener bound to {host}:{port}")
# Listener should be active
assert listener.is_listening()
# Test basic stats
stats = listener.get_stats()
assert stats["active_connections"] == 0
assert stats["pending_connections"] == 0
# Close listener
await listener.close()
assert not listener.is_listening()
finally:
await transport.close()
@pytest.mark.trio
async def test_multiple_listeners_different_ports(self, server_key, server_config):
"""Test multiple listeners on different ports."""
transport = QUICTransport(server_key, server_config)
async def connection_handler(connection):
pass
listeners = []
bound_ports = []
# Create multiple listeners
for i in range(3):
listener = transport.create_listener(connection_handler)
listen_addr = create_quic_multiaddr("127.0.0.1", 0, "/quic")
try:
async with trio.open_nursery() as nursery:
success = await listener.listen(listen_addr, nursery)
assert success
# Get bound port
addrs = listener.get_addrs()
from libp2p.transport.quic.utils import quic_multiaddr_to_endpoint
host, port = quic_multiaddr_to_endpoint(addrs[0])
bound_ports.append(port)
listeners.append(listener)
logger.info(f"Listener {i} bound to port {port}")
nursery.cancel_scope.cancel()
finally:
await listener.close()
# All ports should be different
assert len(set(bound_ports)) == len(bound_ports)
@pytest.mark.trio
async def test_port_already_in_use(self, server_key, server_config):
"""Test handling of port already in use."""
transport1 = QUICTransport(server_key, server_config)
transport2 = QUICTransport(server_key, server_config)
async def connection_handler(connection):
pass
listener1 = transport1.create_listener(connection_handler)
listener2 = transport2.create_listener(connection_handler)
# Bind first listener to a specific port
listen_addr = create_quic_multiaddr("127.0.0.1", 0, "/quic")
async with trio.open_nursery() as nursery:
success1 = await listener1.listen(listen_addr, nursery)
assert success1
# Get the actual bound port
addrs = listener1.get_addrs()
from libp2p.transport.quic.utils import quic_multiaddr_to_endpoint
host, port = quic_multiaddr_to_endpoint(addrs[0])
# Try to bind second listener to same port
# Should fail or get different port
same_port_addr = create_quic_multiaddr("127.0.0.1", port, "/quic")
# This might either fail or succeed with SO_REUSEPORT
# The exact behavior depends on the system
try:
success2 = await listener2.listen(same_port_addr, nursery)
if success2:
# If it succeeds, verify different behavior
logger.info("Second listener bound successfully (SO_REUSEPORT)")
except Exception as e:
logger.info(f"Second listener failed as expected: {e}")
await listener1.close()
await listener2.close()
await transport1.close()
await transport2.close()
@pytest.mark.trio
async def test_listener_connection_tracking(self, server_key, server_config):
"""Test that listener properly tracks connection state."""
transport = QUICTransport(server_key, server_config)
received_connections = []
async def connection_handler(connection):
received_connections.append(connection)
logger.info(f"Handler received connection: {connection}")
# Keep connection alive briefly
await trio.sleep(0.1)
listener = transport.create_listener(connection_handler)
listen_addr = create_quic_multiaddr("127.0.0.1", 0, "/quic")
async with trio.open_nursery() as nursery:
success = await listener.listen(listen_addr, nursery)
assert success
# Initially no connections
stats = listener.get_stats()
assert stats["active_connections"] == 0
assert stats["pending_connections"] == 0
# Simulate some packet processing
await trio.sleep(0.1)
# Verify listener is still healthy
assert listener.is_listening()
await listener.close()
await transport.close()
@pytest.mark.trio
async def test_listener_error_recovery(self, server_key, server_config):
"""Test listener error handling and recovery."""
transport = QUICTransport(server_key, server_config)
# Handler that raises an exception
async def failing_handler(connection):
raise ValueError("Simulated handler error")
listener = transport.create_listener(failing_handler)
listen_addr = create_quic_multiaddr("127.0.0.1", 0, "/quic")
try:
async with trio.open_nursery() as nursery:
success = await listener.listen(listen_addr, nursery)
assert success
# Even with failing handler, listener should remain stable
await trio.sleep(0.1)
assert listener.is_listening()
# Test complete, stop listening
nursery.cancel_scope.cancel()
finally:
await listener.close()
await transport.close()
@pytest.mark.trio
async def test_transport_resource_cleanup_v1(self, server_key, server_config):
"""Test with single parent nursery managing all listeners."""
transport = QUICTransport(server_key, server_config)
async def connection_handler(connection):
pass
listeners = []
try:
async with trio.open_nursery() as parent_nursery:
# Start all listeners in parallel within the same nursery
for i in range(3):
listener = transport.create_listener(connection_handler)
listen_addr = create_quic_multiaddr("127.0.0.1", 0, "/quic")
listeners.append(listener)
parent_nursery.start_soon(
listener.listen, listen_addr, parent_nursery
)
# Give listeners time to start
await trio.sleep(0.2)
# Verify all listeners are active
for i, listener in enumerate(listeners):
assert listener.is_listening()
# Close transport should close all listeners
await transport.close()
# The nursery will exit cleanly because listeners are closed
finally:
# Cleanup verification outside nursery
assert transport._closed
assert len(transport._listeners) == 0
# All listeners should be closed
for listener in listeners:
assert not listener.is_listening()
@pytest.mark.trio
async def test_concurrent_listener_operations(self, server_key, server_config):
"""Test concurrent listener operations."""
transport = QUICTransport(server_key, server_config)
async def connection_handler(connection):
await trio.sleep(0.01) # Simulate some work
async def create_and_run_listener(listener_id):
"""Create, run, and close a listener."""
listener = transport.create_listener(connection_handler)
listen_addr = create_quic_multiaddr("127.0.0.1", 0, "/quic")
async with trio.open_nursery() as nursery:
success = await listener.listen(listen_addr, nursery)
assert success
logger.info(f"Listener {listener_id} started")
# Run for a short time
await trio.sleep(0.1)
await listener.close()
logger.info(f"Listener {listener_id} closed")
try:
# Run multiple listeners concurrently
async with trio.open_nursery() as nursery:
for i in range(5):
nursery.start_soon(create_and_run_listener, i)
finally:
await transport.close()
class TestQUICConcurrency:
"""Fixed tests with proper nursery management."""
@pytest.fixture
def server_key(self):
"""Generate server key pair."""
return create_new_key_pair().private_key
return create_new_key_pair()
@pytest.fixture
def server_config(self):
"""Server configuration."""
"""Simple server configuration."""
return QUICTransportConfig(
idle_timeout=10.0,
connection_timeout=5.0,
max_concurrent_streams=100,
max_concurrent_streams=10,
max_connections=5,
)
@pytest.fixture
def client_config(self):
"""Simple client configuration."""
return QUICTransportConfig(
idle_timeout=10.0,
connection_timeout=5.0,
max_concurrent_streams=5,
)
@pytest.mark.trio
async def test_concurrent_listener_operations(self, server_key, server_config):
"""Test concurrent listener operations - FIXED VERSION."""
transport = QUICTransport(server_key, server_config)
async def test_basic_echo_flow(
self, server_key, client_key, server_config, client_config
):
"""Test basic client-server echo flow with detailed logging."""
print("\n=== BASIC QUIC ECHO TEST ===")
async def connection_handler(connection):
await trio.sleep(0.01) # Simulate some work
# Create server components
server_transport = QUICTransport(server_key.private_key, server_config)
server_peer_id = ID.from_pubkey(server_key.public_key)
listeners = []
# Track test state
server_received_data = None
server_connection_established = False
echo_sent = False
async def create_and_run_listener(listener_id):
"""Create and run a listener - fixed to avoid deadlock."""
listener = transport.create_listener(connection_handler)
listen_addr = create_quic_multiaddr("127.0.0.1", 0, "/quic")
listeners.append(listener)
async def echo_server_handler(connection: QUICConnection) -> None:
"""Simple echo server handler with detailed logging."""
nonlocal server_received_data, server_connection_established, echo_sent
print("🔗 SERVER: Connection handler called")
server_connection_established = True
try:
async with trio.open_nursery() as nursery:
success = await listener.listen(listen_addr, nursery)
assert success
print("📡 SERVER: Waiting for incoming stream...")
logger.info(f"Listener {listener_id} started")
# Accept stream with timeout and detailed logging
print("📡 SERVER: Calling accept_stream...")
stream = await connection.accept_stream(timeout=5.0)
# Run for a short time
await trio.sleep(0.1)
if stream is None:
print("❌ SERVER: accept_stream returned None")
return
# Close INSIDE the nursery scope to allow clean exit
await listener.close()
logger.info(f"Listener {listener_id} closed")
print(f"✅ SERVER: Stream accepted! Stream ID: {stream.stream_id}")
# Read data from the stream
print("📖 SERVER: Reading data from stream...")
server_data = await stream.read(1024)
if not server_data:
print("❌ SERVER: No data received from stream")
return
server_received_data = server_data.decode("utf-8", errors="ignore")
print(f"📨 SERVER: Received data: '{server_received_data}'")
# Echo the data back
echo_message = f"ECHO: {server_received_data}"
print(f"📤 SERVER: Sending echo: '{echo_message}'")
await stream.write(echo_message.encode())
echo_sent = True
print("✅ SERVER: Echo sent successfully")
# Close the stream
await stream.close()
print("🔒 SERVER: Stream closed")
except Exception as e:
logger.error(f"Listener {listener_id} error: {e}")
if not listener._closed:
await listener.close()
raise
print(f"❌ SERVER: Error in handler: {e}")
import traceback
try:
# Run multiple listeners concurrently
async with trio.open_nursery() as nursery:
for i in range(5):
nursery.start_soon(create_and_run_listener, i)
traceback.print_exc()
# Verify all listeners were created and closed properly
assert len(listeners) == 5
for listener in listeners:
assert not listener.is_listening() # Should all be closed
finally:
await transport.close()
@pytest.mark.trio
@pytest.mark.slow
async def test_listener_under_simulated_load(self, server_key, server_config):
"""REAL load test with actual packet simulation."""
print("=== REAL LOAD TEST ===")
config = QUICTransportConfig(
idle_timeout=30.0,
connection_timeout=10.0,
max_concurrent_streams=1000,
max_connections=500,
)
transport = QUICTransport(server_key, config)
connection_count = 0
async def connection_handler(connection):
nonlocal connection_count
# TODO: Remove type ignore when pyrefly fixes nonlocal bug
connection_count += 1 # type: ignore
print(f"Real connection established: {connection_count}")
# Simulate connection work
await trio.sleep(0.01)
listener = transport.create_listener(connection_handler)
# Create listener
listener = server_transport.create_listener(echo_server_handler)
listen_addr = create_quic_multiaddr("127.0.0.1", 0, "/quic")
async def generate_udp_traffic(target_host, target_port, num_packets=100):
"""Generate fake UDP traffic to simulate load."""
print(
f"Generating {num_packets} UDP packets to {target_host}:{target_port}"
)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
for i in range(num_packets):
# Send random UDP packets
# (Won't be valid QUIC, but will exercise packet handler)
fake_packet = (
f"FAKE_PACKET_{i}_{random.randint(1000, 9999)}".encode()
)
sock.sendto(fake_packet, (target_host, int(target_port)))
# Small delay between packets
await trio.sleep(0.001)
if i % 20 == 0:
print(f"Sent {i + 1}/{num_packets} packets")
except Exception as e:
print(f"Error sending packets: {e}")
finally:
sock.close()
print(f"Finished sending {num_packets} packets")
# Variables to track client state
client_connected = False
client_sent_data = False
client_received_echo = None
try:
print("🚀 Starting server...")
async with trio.open_nursery() as nursery:
# Start server listener
success = await listener.listen(listen_addr, nursery)
assert success
assert success, "Failed to start server listener"
# Get the actual bound port
bound_addrs = listener.get_addrs()
bound_addr = bound_addrs[0]
print(bound_addr)
host, port = (
bound_addr.value_for_protocol("ip4"),
bound_addr.value_for_protocol("udp"),
)
# Get server address
server_addrs = listener.get_addrs()
server_addr = server_addrs[0]
print(f"🔧 SERVER: Listening on {server_addr}")
print(f"Listener bound to {host}:{port}")
# Give server a moment to be ready
await trio.sleep(0.1)
# Start load generation
nursery.start_soon(generate_udp_traffic, host, port, 50)
print("🚀 Starting client...")
# Let the load test run
start_time = time.time()
await trio.sleep(2.0) # Let traffic flow for 2 seconds
end_time = time.time()
# Create client transport
client_transport = QUICTransport(client_key.private_key, client_config)
# Check that listener handled the load
stats = listener.get_stats()
print(f"Final stats: {stats}")
# Should have received packets (even if they're invalid QUIC)
assert stats["packets_processed"] > 0
assert stats["bytes_received"] > 0
duration = end_time - start_time
print(f"Load test ran for {duration:.2f}s")
print(f"Processed {stats['packets_processed']} packets")
print(f"Received {stats['bytes_received']} bytes")
await listener.close()
finally:
if not listener._closed:
await listener.close()
await transport.close()
class TestQUICRealWorldScenarios:
"""Test real-world usage scenarios - FIXED VERSIONS."""
@pytest.mark.trio
async def test_echo_server_pattern(self):
"""Test a basic echo server pattern - FIXED VERSION."""
server_key = create_new_key_pair().private_key
config = QUICTransportConfig(idle_timeout=5.0)
transport = QUICTransport(server_key, config)
echo_data = []
async def echo_connection_handler(connection):
"""Echo server that handles one connection."""
logger.info(f"Echo server got connection: {connection}")
async def stream_handler(stream):
try:
# Read data and echo it back
while True:
data = await stream.read(1024)
if not data:
break
# Connect to server
print(f"📞 CLIENT: Connecting to {server_addr}")
connection = await client_transport.dial(
server_addr, peer_id=server_peer_id, nursery=nursery
)
client_connected = True
print("✅ CLIENT: Connected to server")
echo_data.append(data)
await stream.write(b"ECHO: " + data)
# Open a stream
print("📤 CLIENT: Opening stream...")
stream = await connection.open_stream()
print(f"✅ CLIENT: Stream opened with ID: {stream.stream_id}")
# Send test data
test_message = "Hello QUIC Server!"
print(f"📨 CLIENT: Sending message: '{test_message}'")
await stream.write(test_message.encode())
client_sent_data = True
print("✅ CLIENT: Message sent")
# Read echo response
print("📖 CLIENT: Waiting for echo response...")
response_data = await stream.read(1024)
if response_data:
client_received_echo = response_data.decode(
"utf-8", errors="ignore"
)
print(f"📬 CLIENT: Received echo: '{client_received_echo}'")
else:
print("❌ CLIENT: No echo response received")
print("🔒 CLIENT: Closing connection")
await connection.close()
print("🔒 CLIENT: Connection closed")
print("🔒 CLIENT: Closing transport")
await client_transport.close()
print("🔒 CLIENT: Transport closed")
except Exception as e:
logger.error(f"Stream error: {e}")
print(f"❌ CLIENT: Error: {e}")
import traceback
traceback.print_exc()
finally:
await stream.close()
await client_transport.close()
print("🔒 CLIENT: Transport closed")
connection.set_stream_handler(stream_handler)
# Keep connection alive until closed
while not connection.is_closed:
await trio.sleep(0.1)
listener = transport.create_listener(echo_connection_handler)
listen_addr = create_quic_multiaddr("127.0.0.1", 0, "/quic")
try:
async with trio.open_nursery() as nursery:
success = await listener.listen(listen_addr, nursery)
assert success
# Let server initialize
await trio.sleep(0.1)
# Verify server is ready
assert listener.is_listening()
# Run server for a bit
# Give everything time to complete
await trio.sleep(0.5)
# Close inside nursery for clean exit
await listener.close()
# Cancel nursery to stop server
nursery.cancel_scope.cancel()
finally:
# Ensure cleanup
# Cleanup
if not listener._closed:
await listener.close()
await transport.close()
await server_transport.close()
# Verify the flow worked
print("\n📊 TEST RESULTS:")
print(f" Server connection established: {server_connection_established}")
print(f" Client connected: {client_connected}")
print(f" Client sent data: {client_sent_data}")
print(f" Server received data: '{server_received_data}'")
print(f" Echo sent by server: {echo_sent}")
print(f" Client received echo: '{client_received_echo}'")
# Test assertions
assert server_connection_established, "Server connection handler was not called"
assert client_connected, "Client failed to connect"
assert client_sent_data, "Client failed to send data"
assert server_received_data == "Hello QUIC Server!", (
f"Server received wrong data: '{server_received_data}'"
)
assert echo_sent, "Server failed to send echo"
assert client_received_echo == "ECHO: Hello QUIC Server!", (
f"Client received wrong echo: '{client_received_echo}'"
)
print("✅ BASIC ECHO TEST PASSED!")
@pytest.mark.trio
async def test_connection_lifecycle_monitoring(self):
"""Test monitoring connection lifecycle events - FIXED VERSION."""
server_key = create_new_key_pair().private_key
config = QUICTransportConfig(idle_timeout=5.0)
transport = QUICTransport(server_key, config)
async def test_server_accept_stream_timeout(
self, server_key, client_key, server_config, client_config
):
"""Test what happens when server accept_stream times out."""
print("\n=== TESTING SERVER ACCEPT_STREAM TIMEOUT ===")
lifecycle_events = []
server_transport = QUICTransport(server_key.private_key, server_config)
server_peer_id = ID.from_pubkey(server_key.public_key)
async def monitoring_handler(connection):
lifecycle_events.append(("connection_started", connection.get_stats()))
accept_stream_called = False
accept_stream_timeout = False
async def timeout_test_handler(connection: QUICConnection) -> None:
"""Handler that tests accept_stream timeout."""
nonlocal accept_stream_called, accept_stream_timeout
print("🔗 SERVER: Connection established, testing accept_stream timeout")
accept_stream_called = True
try:
# Monitor connection
while not connection.is_closed:
stats = connection.get_stats()
lifecycle_events.append(("connection_stats", stats))
await trio.sleep(0.1)
print("📡 SERVER: Calling accept_stream with 2 second timeout...")
stream = await connection.accept_stream(timeout=2.0)
print(f"✅ SERVER: accept_stream returned: {stream}")
except Exception as e:
lifecycle_events.append(("connection_error", str(e)))
finally:
lifecycle_events.append(("connection_ended", connection.get_stats()))
print(f"⏰ SERVER: accept_stream timed out or failed: {e}")
accept_stream_timeout = True
listener = transport.create_listener(monitoring_handler)
listener = server_transport.create_listener(timeout_test_handler)
listen_addr = create_quic_multiaddr("127.0.0.1", 0, "/quic")
client_connected = False
try:
async with trio.open_nursery() as nursery:
# Start server
success = await listener.listen(listen_addr, nursery)
assert success
# Run monitoring for a bit
await trio.sleep(0.5)
server_addr = listener.get_addrs()[0]
print(f"🔧 SERVER: Listening on {server_addr}")
# Check that monitoring infrastructure is working
assert listener.is_listening()
# Create client but DON'T open a stream
client_transport = QUICTransport(client_key.private_key, client_config)
# Close inside nursery
await listener.close()
try:
print("📞 CLIENT: Connecting (but NOT opening stream)...")
connection = await client_transport.dial(
server_addr, peer_id=server_peer_id, nursery=nursery
)
client_connected = True
print("✅ CLIENT: Connected (no stream opened)")
# Wait for server timeout
await trio.sleep(3.0)
await connection.close()
print("🔒 CLIENT: Connection closed")
finally:
await client_transport.close()
nursery.cancel_scope.cancel()
finally:
# Ensure cleanup
if not listener._closed:
await listener.close()
await transport.close()
await listener.close()
await server_transport.close()
# Should have some lifecycle events from setup
logger.info(f"Recorded {len(lifecycle_events)} lifecycle events")
print("\n📊 TIMEOUT TEST RESULTS:")
print(f" Client connected: {client_connected}")
print(f" accept_stream called: {accept_stream_called}")
print(f" accept_stream timeout: {accept_stream_timeout}")
@pytest.mark.trio
async def test_multi_listener_echo_servers(self):
"""Test multiple echo servers running in parallel."""
server_key = create_new_key_pair().private_key
config = QUICTransportConfig(idle_timeout=5.0)
transport = QUICTransport(server_key, config)
assert client_connected, "Client should have connected"
assert accept_stream_called, "accept_stream should have been called"
assert accept_stream_timeout, (
"accept_stream should have timed out when no stream was opened"
)
all_echo_data = {}
listeners = []
async def create_echo_server(server_id):
"""Create and run one echo server."""
echo_data = []
all_echo_data[server_id] = echo_data
async def echo_handler(connection):
logger.info(f"Echo server {server_id} got connection")
async def stream_handler(stream):
try:
while True:
data = await stream.read(1024)
if not data:
break
echo_data.append(data)
await stream.write(f"ECHO-{server_id}: ".encode() + data)
except Exception as e:
logger.error(f"Stream error in server {server_id}: {e}")
finally:
await stream.close()
connection.set_stream_handler(stream_handler)
while not connection.is_closed:
await trio.sleep(0.1)
listener = transport.create_listener(echo_handler)
listen_addr = create_quic_multiaddr("127.0.0.1", 0, "/quic")
listeners.append(listener)
async with trio.open_nursery() as nursery:
success = await listener.listen(listen_addr, nursery)
assert success
logger.info(f"Echo server {server_id} started")
# Run for a bit
await trio.sleep(0.3)
# Close this server
await listener.close()
logger.info(f"Echo server {server_id} closed")
try:
# Run multiple echo servers in parallel
async with trio.open_nursery() as nursery:
for i in range(3):
nursery.start_soon(create_echo_server, i)
# Verify all servers ran
assert len(listeners) == 3
assert len(all_echo_data) == 3
for listener in listeners:
assert not listener.is_listening() # Should all be closed
finally:
await transport.close()
@pytest.mark.trio
async def test_graceful_shutdown_sequence(self):
"""Test graceful shutdown of multiple components."""
server_key = create_new_key_pair().private_key
config = QUICTransportConfig(idle_timeout=5.0)
transport = QUICTransport(server_key, config)
shutdown_events = []
listeners = []
async def tracked_connection_handler(connection):
"""Connection handler that tracks shutdown."""
try:
while not connection.is_closed:
await trio.sleep(0.1)
finally:
shutdown_events.append(f"connection_closed_{id(connection)}")
async def create_tracked_listener(listener_id):
"""Create a listener that tracks its lifecycle."""
try:
listener = transport.create_listener(tracked_connection_handler)
listen_addr = create_quic_multiaddr("127.0.0.1", 0, "/quic")
listeners.append(listener)
async with trio.open_nursery() as nursery:
success = await listener.listen(listen_addr, nursery)
assert success
shutdown_events.append(f"listener_{listener_id}_started")
# Run for a bit
await trio.sleep(0.2)
# Graceful close
await listener.close()
shutdown_events.append(f"listener_{listener_id}_closed")
except Exception as e:
shutdown_events.append(f"listener_{listener_id}_error_{e}")
raise
try:
# Start multiple listeners
async with trio.open_nursery() as nursery:
for i in range(3):
nursery.start_soon(create_tracked_listener, i)
# Verify shutdown sequence
start_events = [e for e in shutdown_events if "started" in e]
close_events = [e for e in shutdown_events if "closed" in e]
assert len(start_events) == 3
assert len(close_events) == 3
logger.info(f"Shutdown sequence: {shutdown_events}")
finally:
shutdown_events.append("transport_closing")
await transport.close()
shutdown_events.append("transport_closed")
# HELPER FUNCTIONS FOR CLEANER TESTS
async def run_listener_for_duration(transport, handler, duration=0.5):
"""Helper to run a single listener for a specific duration."""
listener = transport.create_listener(handler)
listen_addr = create_quic_multiaddr("127.0.0.1", 0, "/quic")
async with trio.open_nursery() as nursery:
success = await listener.listen(listen_addr, nursery)
assert success
# Run for specified duration
await trio.sleep(duration)
# Clean close
await listener.close()
return listener
async def run_multiple_listeners_parallel(transport, handler, count=3, duration=0.5):
"""Helper to run multiple listeners in parallel."""
listeners = []
async def single_listener_task(listener_id):
listener = await run_listener_for_duration(transport, handler, duration)
listeners.append(listener)
logger.info(f"Listener {listener_id} completed")
async with trio.open_nursery() as nursery:
for i in range(count):
nursery.start_soon(single_listener_task, i)
return listeners
if __name__ == "__main__":
pytest.main([__file__, "-v", "-s"])
print("✅ TIMEOUT TEST PASSED!")

View File

@ -8,6 +8,7 @@ from libp2p.crypto.ed25519 import (
create_new_key_pair,
)
from libp2p.crypto.keys import PrivateKey
from libp2p.peer.id import ID
from libp2p.transport.quic.exceptions import (
QUICDialError,
QUICListenError,
@ -111,7 +112,10 @@ class TestQUICTransport:
await transport.close()
with pytest.raises(QUICDialError, match="Transport is closed"):
await transport.dial(multiaddr.Multiaddr("/ip4/127.0.0.1/udp/4001/quic"))
await transport.dial(
multiaddr.Multiaddr("/ip4/127.0.0.1/udp/4001/quic"),
ID.from_pubkey(create_new_key_pair().public_key),
)
def test_create_listener_closed_transport(self, transport):
"""Test creating listener with closed transport raises error."""