DEBUG: client certificate at server

This commit is contained in:
Akash Mondal
2025-07-07 06:47:18 +00:00
committed by lla-dane
parent 0f64bb49b5
commit b3f0a4e8c4
6 changed files with 98 additions and 85 deletions

View File

@ -2,6 +2,8 @@ from collections.abc import (
Awaitable, Awaitable,
Callable, Callable,
) )
from libp2p.transport.quic.connection import QUICConnection
from typing import cast
import logging import logging
import sys import sys
@ -281,6 +283,17 @@ class Swarm(Service, INetworkService):
) -> None: ) -> None:
raw_conn = RawConnection(read_write_closer, False) raw_conn = RawConnection(read_write_closer, False)
# No need to upgrade QUIC Connection
if isinstance(self.transport, QUICTransport):
print("Connecting QUIC Connection")
quic_conn = cast(QUICConnection, raw_conn)
await self.add_conn(quic_conn)
# NOTE: This is a intentional barrier to prevent from the handler
# exiting and closing the connection.
await self.manager.wait_finished()
print("Connection Connected")
return
# Per, https://discuss.libp2p.io/t/multistream-security/130, we first # Per, https://discuss.libp2p.io/t/multistream-security/130, we first
# secure the conn and then mux the conn # secure the conn and then mux the conn
try: try:
@ -396,6 +409,7 @@ class Swarm(Service, INetworkService):
muxed_conn, muxed_conn,
self, self,
) )
print("add_conn called")
self.manager.run_task(muxed_conn.start) self.manager.run_task(muxed_conn.start)
await muxed_conn.event_started.wait() await muxed_conn.event_started.wait()

View File

@ -44,6 +44,7 @@ logging.basicConfig(
handlers=[logging.StreamHandler(stdout)], handlers=[logging.StreamHandler(stdout)],
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class QUICConnection(IRawConnection, IMuxedConn): class QUICConnection(IRawConnection, IMuxedConn):
@ -179,7 +180,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
"connection_id_changes": 0, "connection_id_changes": 0,
} }
logger.info( print(
f"Created QUIC connection to {remote_peer_id} " f"Created QUIC connection to {remote_peer_id} "
f"(initiator: {is_initiator}, addr: {remote_addr}, " f"(initiator: {is_initiator}, addr: {remote_addr}, "
"security: {security_manager is not None})" "security: {security_manager is not None})"
@ -278,7 +279,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
self._started = True self._started = True
self.event_started.set() self.event_started.set()
logger.info(f"Starting QUIC connection to {self._remote_peer_id}") print(f"Starting QUIC connection to {self._remote_peer_id}")
try: try:
# If this is a client connection, we need to establish the connection # If this is a client connection, we need to establish the connection
@ -289,7 +290,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
self._established = True self._established = True
self._connected_event.set() self._connected_event.set()
logger.info(f"QUIC connection to {self._remote_peer_id} started") print(f"QUIC connection to {self._remote_peer_id} started")
except Exception as e: except Exception as e:
logger.error(f"Failed to start connection: {e}") logger.error(f"Failed to start connection: {e}")
@ -300,7 +301,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
try: try:
with QUICErrorContext("connection_initiation", "connection"): with QUICErrorContext("connection_initiation", "connection"):
if not self._socket: if not self._socket:
logger.info("Creating new socket for outbound connection") print("Creating new socket for outbound connection")
self._socket = trio.socket.socket( self._socket = trio.socket.socket(
family=socket.AF_INET, type=socket.SOCK_DGRAM family=socket.AF_INET, type=socket.SOCK_DGRAM
) )
@ -312,7 +313,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
# Send initial packet(s) # Send initial packet(s)
await self._transmit() await self._transmit()
logger.info(f"Initiated QUIC connection to {self._remote_addr}") print(f"Initiated QUIC connection to {self._remote_addr}")
except Exception as e: except Exception as e:
logger.error(f"Failed to initiate connection: {e}") logger.error(f"Failed to initiate connection: {e}")
@ -334,16 +335,16 @@ class QUICConnection(IRawConnection, IMuxedConn):
try: try:
with QUICErrorContext("connection_establishment", "connection"): with QUICErrorContext("connection_establishment", "connection"):
# Start the connection if not already started # Start the connection if not already started
logger.info("STARTING TO CONNECT") print("STARTING TO CONNECT")
if not self._started: if not self._started:
await self.start() await self.start()
# Start background event processing # Start background event processing
if not self._background_tasks_started: if not self._background_tasks_started:
logger.info("STARTING BACKGROUND TASK") print("STARTING BACKGROUND TASK")
await self._start_background_tasks() await self._start_background_tasks()
else: else:
logger.info("BACKGROUND TASK ALREADY STARTED") print("BACKGROUND TASK ALREADY STARTED")
# Wait for handshake completion with timeout # Wait for handshake completion with timeout
with trio.move_on_after( with trio.move_on_after(
@ -357,15 +358,13 @@ class QUICConnection(IRawConnection, IMuxedConn):
f"{self.CONNECTION_HANDSHAKE_TIMEOUT}s" f"{self.CONNECTION_HANDSHAKE_TIMEOUT}s"
) )
logger.info( print("QUICConnection: Verifying peer identity with security manager")
"QUICConnection: Verifying peer identity with security manager"
)
# Verify peer identity using security manager # Verify peer identity using security manager
await self._verify_peer_identity_with_security() self.peer_id = await self._verify_peer_identity_with_security()
logger.info("QUICConnection: Peer identity verified") print("QUICConnection: Peer identity verified")
self._established = True self._established = True
logger.info(f"QUIC connection established with {self._remote_peer_id}") print(f"QUIC connection established with {self._remote_peer_id}")
except Exception as e: except Exception as e:
logger.error(f"Failed to establish connection: {e}") logger.error(f"Failed to establish connection: {e}")
@ -385,11 +384,11 @@ class QUICConnection(IRawConnection, IMuxedConn):
self._nursery.start_soon(async_fn=self._event_processing_loop) self._nursery.start_soon(async_fn=self._event_processing_loop)
self._nursery.start_soon(async_fn=self._periodic_maintenance) self._nursery.start_soon(async_fn=self._periodic_maintenance)
logger.info("Started background tasks for QUIC connection") print("Started background tasks for QUIC connection")
async def _event_processing_loop(self) -> None: async def _event_processing_loop(self) -> None:
"""Main event processing loop for the connection.""" """Main event processing loop for the connection."""
logger.info( print(
f"Started QUIC event processing loop for connection id: {id(self)} " f"Started QUIC event processing loop for connection id: {id(self)} "
f"and local peer id {str(self.local_peer_id())}" f"and local peer id {str(self.local_peer_id())}"
) )
@ -412,7 +411,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
logger.error(f"Error in event processing loop: {e}") logger.error(f"Error in event processing loop: {e}")
await self._handle_connection_error(e) await self._handle_connection_error(e)
finally: finally:
logger.info("QUIC event processing loop finished") print("QUIC event processing loop finished")
async def _periodic_maintenance(self) -> None: async def _periodic_maintenance(self) -> None:
"""Perform periodic connection maintenance.""" """Perform periodic connection maintenance."""
@ -427,7 +426,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
# *** NEW: Log connection ID status periodically *** # *** NEW: Log connection ID status periodically ***
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
cid_stats = self.get_connection_id_stats() cid_stats = self.get_connection_id_stats()
logger.info(f"Connection ID stats: {cid_stats}") print(f"Connection ID stats: {cid_stats}")
# Sleep for maintenance interval # Sleep for maintenance interval
await trio.sleep(30.0) # 30 seconds await trio.sleep(30.0) # 30 seconds
@ -437,15 +436,15 @@ class QUICConnection(IRawConnection, IMuxedConn):
async def _client_packet_receiver(self) -> None: async def _client_packet_receiver(self) -> None:
"""Receive packets for client connections.""" """Receive packets for client connections."""
logger.info("Starting client packet receiver") print("Starting client packet receiver")
logger.info("Started QUIC client packet receiver") print("Started QUIC client packet receiver")
try: try:
while not self._closed and self._socket: while not self._closed and self._socket:
try: try:
# Receive UDP packets # Receive UDP packets
data, addr = await self._socket.recvfrom(65536) data, addr = await self._socket.recvfrom(65536)
logger.info(f"Client received {len(data)} bytes from {addr}") print(f"Client received {len(data)} bytes from {addr}")
# Feed packet to QUIC connection # Feed packet to QUIC connection
self._quic.receive_datagram(data, addr, now=time.time()) self._quic.receive_datagram(data, addr, now=time.time())
@ -457,21 +456,21 @@ class QUICConnection(IRawConnection, IMuxedConn):
await self._transmit() await self._transmit()
except trio.ClosedResourceError: except trio.ClosedResourceError:
logger.info("Client socket closed") print("Client socket closed")
break break
except Exception as e: except Exception as e:
logger.error(f"Error receiving client packet: {e}") logger.error(f"Error receiving client packet: {e}")
await trio.sleep(0.01) await trio.sleep(0.01)
except trio.Cancelled: except trio.Cancelled:
logger.info("Client packet receiver cancelled") print("Client packet receiver cancelled")
raise raise
finally: finally:
logger.info("Client packet receiver terminated") print("Client packet receiver terminated")
# Security and identity methods # Security and identity methods
async def _verify_peer_identity_with_security(self) -> None: async def _verify_peer_identity_with_security(self) -> ID:
""" """
Verify peer identity using integrated security manager. Verify peer identity using integrated security manager.
@ -479,9 +478,9 @@ class QUICConnection(IRawConnection, IMuxedConn):
QUICPeerVerificationError: If peer verification fails QUICPeerVerificationError: If peer verification fails
""" """
logger.info("VERIFYING PEER IDENTITY") print("VERIFYING PEER IDENTITY")
if not self._security_manager: if not self._security_manager:
logger.warning("No security manager available for peer verification") print("No security manager available for peer verification")
return return
try: try:
@ -489,11 +488,12 @@ class QUICConnection(IRawConnection, IMuxedConn):
await self._extract_peer_certificate() await self._extract_peer_certificate()
if not self._peer_certificate: if not self._peer_certificate:
logger.warning("No peer certificate available for verification") print("No peer certificate available for verification")
return return
# Validate certificate format and accessibility # Validate certificate format and accessibility
if not self._validate_peer_certificate(): if not self._validate_peer_certificate():
print("Validation Failed for peer cerificate")
raise QUICPeerVerificationError("Peer certificate validation failed") raise QUICPeerVerificationError("Peer certificate validation failed")
# Verify peer identity using security manager # Verify peer identity using security manager
@ -505,7 +505,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
# Update peer ID if it wasn't known (inbound connections) # Update peer ID if it wasn't known (inbound connections)
if not self._remote_peer_id: if not self._remote_peer_id:
self._remote_peer_id = verified_peer_id self._remote_peer_id = verified_peer_id
logger.info(f"Discovered peer ID from certificate: {verified_peer_id}") print(f"Discovered peer ID from certificate: {verified_peer_id}")
elif self._remote_peer_id != verified_peer_id: elif self._remote_peer_id != verified_peer_id:
raise QUICPeerVerificationError( raise QUICPeerVerificationError(
f"Peer ID mismatch: expected {self._remote_peer_id}, " f"Peer ID mismatch: expected {self._remote_peer_id}, "
@ -513,7 +513,8 @@ class QUICConnection(IRawConnection, IMuxedConn):
) )
self._peer_verified = True self._peer_verified = True
logger.info(f"Peer identity verified successfully: {verified_peer_id}") print(f"Peer identity verified successfully: {verified_peer_id}")
return verified_peer_id
except QUICPeerVerificationError: except QUICPeerVerificationError:
# Re-raise verification errors as-is # Re-raise verification errors as-is
@ -526,26 +527,21 @@ class QUICConnection(IRawConnection, IMuxedConn):
"""Extract peer certificate from completed TLS handshake.""" """Extract peer certificate from completed TLS handshake."""
try: try:
# Get peer certificate from aioquic TLS context # Get peer certificate from aioquic TLS context
# Based on aioquic source code: QuicConnection.tls._peer_certificate if self._quic.tls:
if hasattr(self._quic, "tls") and self._quic.tls:
tls_context = self._quic.tls tls_context = self._quic.tls
# Check if peer certificate is available in TLS context if tls_context._peer_certificate:
if (
hasattr(tls_context, "_peer_certificate")
and tls_context._peer_certificate
):
# aioquic stores the peer certificate as cryptography # aioquic stores the peer certificate as cryptography
# x509.Certificate # x509.Certificate
self._peer_certificate = tls_context._peer_certificate self._peer_certificate = tls_context._peer_certificate
logger.info( print(
f"Extracted peer certificate: {self._peer_certificate.subject}" f"Extracted peer certificate: {self._peer_certificate.subject}"
) )
else: else:
logger.info("No peer certificate found in TLS context") print("No peer certificate found in TLS context")
else: else:
logger.info("No TLS context available for certificate extraction") print("No TLS context available for certificate extraction")
except Exception as e: except Exception as e:
logger.warning(f"Failed to extract peer certificate: {e}") logger.warning(f"Failed to extract peer certificate: {e}")
@ -594,7 +590,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
subject = self._peer_certificate.subject subject = self._peer_certificate.subject
serial_number = self._peer_certificate.serial_number serial_number = self._peer_certificate.serial_number
logger.info( print(
f"Certificate validation - Subject: {subject}, Serial: {serial_number}" f"Certificate validation - Subject: {subject}, Serial: {serial_number}"
) )
return True return True
@ -719,7 +715,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
self._outbound_stream_count += 1 self._outbound_stream_count += 1
self._stats["streams_opened"] += 1 self._stats["streams_opened"] += 1
logger.info(f"Opened outbound QUIC stream {stream_id}") print(f"Opened outbound QUIC stream {stream_id}")
return stream return stream
raise QUICStreamTimeoutError(f"Stream creation timed out after {timeout}s") raise QUICStreamTimeoutError(f"Stream creation timed out after {timeout}s")
@ -781,7 +777,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
""" """
self._stream_handler = handler_function self._stream_handler = handler_function
logger.info("Set stream handler for incoming streams") print("Set stream handler for incoming streams")
def _remove_stream(self, stream_id: int) -> None: def _remove_stream(self, stream_id: int) -> None:
""" """
@ -808,7 +804,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
if self._nursery: if self._nursery:
self._nursery.start_soon(update_counts) self._nursery.start_soon(update_counts)
logger.info(f"Removed stream {stream_id} from connection") print(f"Removed stream {stream_id} from connection")
# *** UPDATED: Complete QUIC event handling - FIXES THE ORIGINAL ISSUE *** # *** UPDATED: Complete QUIC event handling - FIXES THE ORIGINAL ISSUE ***
@ -830,15 +826,15 @@ class QUICConnection(IRawConnection, IMuxedConn):
await self._handle_quic_event(event) await self._handle_quic_event(event)
if events_processed > 0: if events_processed > 0:
logger.info(f"Processed {events_processed} QUIC events") print(f"Processed {events_processed} QUIC events")
finally: finally:
self._event_processing_active = False self._event_processing_active = False
async def _handle_quic_event(self, event: events.QuicEvent) -> None: async def _handle_quic_event(self, event: events.QuicEvent) -> None:
"""Handle a single QUIC event with COMPLETE event type coverage.""" """Handle a single QUIC event with COMPLETE event type coverage."""
logger.info(f"Handling QUIC event: {type(event).__name__}") print(f"Handling QUIC event: {type(event).__name__}")
logger.info(f"QUIC event: {type(event).__name__}") print(f"QUIC event: {type(event).__name__}")
try: try:
if isinstance(event, events.ConnectionTerminated): if isinstance(event, events.ConnectionTerminated):
@ -864,8 +860,8 @@ class QUICConnection(IRawConnection, IMuxedConn):
elif isinstance(event, events.StopSendingReceived): elif isinstance(event, events.StopSendingReceived):
await self._handle_stop_sending_received(event) await self._handle_stop_sending_received(event)
else: else:
logger.info(f"Unhandled QUIC event type: {type(event).__name__}") print(f"Unhandled QUIC event type: {type(event).__name__}")
logger.info(f"Unhandled QUIC event: {type(event).__name__}") print(f"Unhandled QUIC event: {type(event).__name__}")
except Exception as e: except Exception as e:
logger.error(f"Error handling QUIC event {type(event).__name__}: {e}") logger.error(f"Error handling QUIC event {type(event).__name__}: {e}")
@ -880,8 +876,8 @@ class QUICConnection(IRawConnection, IMuxedConn):
This is the CRITICAL missing functionality that was causing your issue! This is the CRITICAL missing functionality that was causing your issue!
""" """
logger.info(f"🆔 NEW CONNECTION ID ISSUED: {event.connection_id.hex()}") print(f"🆔 NEW CONNECTION ID ISSUED: {event.connection_id.hex()}")
logger.info(f"🆔 NEW CONNECTION ID ISSUED: {event.connection_id.hex()}") print(f"🆔 NEW CONNECTION ID ISSUED: {event.connection_id.hex()}")
# Add to available connection IDs # Add to available connection IDs
self._available_connection_ids.add(event.connection_id) self._available_connection_ids.add(event.connection_id)
@ -889,14 +885,14 @@ class QUICConnection(IRawConnection, IMuxedConn):
# If we don't have a current connection ID, use this one # If we don't have a current connection ID, use this one
if self._current_connection_id is None: if self._current_connection_id is None:
self._current_connection_id = event.connection_id self._current_connection_id = event.connection_id
logger.info(f"🆔 Set current connection ID to: {event.connection_id.hex()}") print(f"🆔 Set current connection ID to: {event.connection_id.hex()}")
logger.info(f"🆔 Set current connection ID to: {event.connection_id.hex()}") print(f"🆔 Set current connection ID to: {event.connection_id.hex()}")
# Update statistics # Update statistics
self._stats["connection_ids_issued"] += 1 self._stats["connection_ids_issued"] += 1
logger.info(f"Available connection IDs: {len(self._available_connection_ids)}") print(f"Available connection IDs: {len(self._available_connection_ids)}")
logger.info(f"Available connection IDs: {len(self._available_connection_ids)}") print(f"Available connection IDs: {len(self._available_connection_ids)}")
async def _handle_connection_id_retired( async def _handle_connection_id_retired(
self, event: events.ConnectionIdRetired self, event: events.ConnectionIdRetired
@ -906,8 +902,8 @@ class QUICConnection(IRawConnection, IMuxedConn):
This handles when the peer tells us to stop using a connection ID. This handles when the peer tells us to stop using a connection ID.
""" """
logger.info(f"🗑️ CONNECTION ID RETIRED: {event.connection_id.hex()}") print(f"🗑️ CONNECTION ID RETIRED: {event.connection_id.hex()}")
logger.info(f"🗑️ CONNECTION ID RETIRED: {event.connection_id.hex()}") print(f"🗑️ CONNECTION ID RETIRED: {event.connection_id.hex()}")
# Remove from available IDs and add to retired set # Remove from available IDs and add to retired set
self._available_connection_ids.discard(event.connection_id) self._available_connection_ids.discard(event.connection_id)
@ -924,7 +920,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
else: else:
self._current_connection_id = None self._current_connection_id = None
logger.warning("⚠️ No available connection IDs after retirement!") logger.warning("⚠️ No available connection IDs after retirement!")
logger.info("⚠️ No available connection IDs after retirement!") print("⚠️ No available connection IDs after retirement!")
# Update statistics # Update statistics
self._stats["connection_ids_retired"] += 1 self._stats["connection_ids_retired"] += 1
@ -933,13 +929,13 @@ class QUICConnection(IRawConnection, IMuxedConn):
async def _handle_ping_acknowledged(self, event: events.PingAcknowledged) -> None: async def _handle_ping_acknowledged(self, event: events.PingAcknowledged) -> None:
"""Handle ping acknowledgment.""" """Handle ping acknowledgment."""
logger.info(f"Ping acknowledged: uid={event.uid}") print(f"Ping acknowledged: uid={event.uid}")
async def _handle_protocol_negotiated( async def _handle_protocol_negotiated(
self, event: events.ProtocolNegotiated self, event: events.ProtocolNegotiated
) -> None: ) -> None:
"""Handle protocol negotiation completion.""" """Handle protocol negotiation completion."""
logger.info(f"Protocol negotiated: {event.alpn_protocol}") print(f"Protocol negotiated: {event.alpn_protocol}")
async def _handle_stop_sending_received( async def _handle_stop_sending_received(
self, event: events.StopSendingReceived self, event: events.StopSendingReceived
@ -961,7 +957,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
self, event: events.HandshakeCompleted self, event: events.HandshakeCompleted
) -> None: ) -> None:
"""Handle handshake completion with security integration.""" """Handle handshake completion with security integration."""
logger.info("QUIC handshake completed") print("QUIC handshake completed")
self._handshake_completed = True self._handshake_completed = True
# Store handshake event for security verification # Store handshake event for security verification
@ -970,14 +966,14 @@ class QUICConnection(IRawConnection, IMuxedConn):
# Try to extract certificate information after handshake # Try to extract certificate information after handshake
await self._extract_peer_certificate() await self._extract_peer_certificate()
logger.info("✅ Setting connected event") print("✅ Setting connected event")
self._connected_event.set() self._connected_event.set()
async def _handle_connection_terminated( async def _handle_connection_terminated(
self, event: events.ConnectionTerminated self, event: events.ConnectionTerminated
) -> None: ) -> None:
"""Handle connection termination.""" """Handle connection termination."""
logger.info(f"QUIC connection terminated: {event.reason_phrase}") print(f"QUIC connection terminated: {event.reason_phrase}")
# Close all streams # Close all streams
for stream in list(self._streams.values()): for stream in list(self._streams.values()):
@ -1003,7 +999,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
try: try:
if stream_id not in self._streams: if stream_id not in self._streams:
if self._is_incoming_stream(stream_id): if self._is_incoming_stream(stream_id):
logger.info(f"Creating new incoming stream {stream_id}") print(f"Creating new incoming stream {stream_id}")
from .stream import QUICStream, StreamDirection from .stream import QUICStream, StreamDirection
@ -1038,7 +1034,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
except Exception as e: except Exception as e:
logger.error(f"Error handling stream data for stream {stream_id}: {e}") logger.error(f"Error handling stream data for stream {stream_id}: {e}")
logger.info(f"❌ STREAM_DATA: Error: {e}") print(f"❌ STREAM_DATA: Error: {e}")
async def _get_or_create_stream(self, stream_id: int) -> QUICStream: async def _get_or_create_stream(self, stream_id: int) -> QUICStream:
"""Get existing stream or create new inbound stream.""" """Get existing stream or create new inbound stream."""
@ -1095,7 +1091,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
except Exception as e: except Exception as e:
logger.error(f"Error in stream handler for stream {stream_id}: {e}") logger.error(f"Error in stream handler for stream {stream_id}: {e}")
logger.info(f"Created inbound stream {stream_id}") print(f"Created inbound stream {stream_id}")
return stream return stream
def _is_incoming_stream(self, stream_id: int) -> bool: def _is_incoming_stream(self, stream_id: int) -> bool:
@ -1122,7 +1118,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
try: try:
stream = self._streams[stream_id] stream = self._streams[stream_id]
await stream.handle_reset(event.error_code) await stream.handle_reset(event.error_code)
logger.info( print(
f"Handled reset for stream {stream_id}" f"Handled reset for stream {stream_id}"
f"with error code {event.error_code}" f"with error code {event.error_code}"
) )
@ -1131,13 +1127,13 @@ class QUICConnection(IRawConnection, IMuxedConn):
# Force remove the stream # Force remove the stream
self._remove_stream(stream_id) self._remove_stream(stream_id)
else: else:
logger.info(f"Received reset for unknown stream {stream_id}") print(f"Received reset for unknown stream {stream_id}")
async def _handle_datagram_received( async def _handle_datagram_received(
self, event: events.DatagramFrameReceived self, event: events.DatagramFrameReceived
) -> None: ) -> None:
"""Handle datagram frame (if using QUIC datagrams).""" """Handle datagram frame (if using QUIC datagrams)."""
logger.info(f"Datagram frame received: size={len(event.data)}") print(f"Datagram frame received: size={len(event.data)}")
# For now, just log. Could be extended for custom datagram handling # For now, just log. Could be extended for custom datagram handling
async def _handle_timer_events(self) -> None: async def _handle_timer_events(self) -> None:
@ -1154,7 +1150,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
"""Transmit pending QUIC packets using available socket.""" """Transmit pending QUIC packets using available socket."""
sock = self._socket sock = self._socket
if not sock: if not sock:
logger.info("No socket to transmit") print("No socket to transmit")
return return
try: try:
@ -1200,7 +1196,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
return return
self._closed = True self._closed = True
logger.info(f"Closing QUIC connection to {self._remote_peer_id}") print(f"Closing QUIC connection to {self._remote_peer_id}")
try: try:
# Close all streams gracefully # Close all streams gracefully
@ -1242,7 +1238,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
self._streams.clear() self._streams.clear()
self._closed_event.set() self._closed_event.set()
logger.info(f"QUIC connection to {self._remote_peer_id} closed") print(f"QUIC connection to {self._remote_peer_id} closed")
except Exception as e: except Exception as e:
logger.error(f"Error during connection close: {e}") logger.error(f"Error during connection close: {e}")
@ -1257,13 +1253,13 @@ class QUICConnection(IRawConnection, IMuxedConn):
try: try:
if self._transport: if self._transport:
await self._transport._cleanup_terminated_connection(self) await self._transport._cleanup_terminated_connection(self)
logger.info("Notified transport of connection termination") print("Notified transport of connection termination")
return return
for listener in self._transport._listeners: for listener in self._transport._listeners:
try: try:
await listener._remove_connection_by_object(self) await listener._remove_connection_by_object(self)
logger.info("Found and notified listener of connection termination") print("Found and notified listener of connection termination")
return return
except Exception: except Exception:
continue continue
@ -1288,10 +1284,10 @@ class QUICConnection(IRawConnection, IMuxedConn):
for tracked_cid, tracked_conn in list(listener._connections.items()): for tracked_cid, tracked_conn in list(listener._connections.items()):
if tracked_conn is self: if tracked_conn is self:
await listener._remove_connection(tracked_cid) await listener._remove_connection(tracked_cid)
logger.info(f"Removed connection {tracked_cid.hex()}") print(f"Removed connection {tracked_cid.hex()}")
return return
logger.info("Fallback cleanup by connection ID completed") print("Fallback cleanup by connection ID completed")
except Exception as e: except Exception as e:
logger.error(f"Error in fallback cleanup: {e}") logger.error(f"Error in fallback cleanup: {e}")
@ -1334,6 +1330,9 @@ class QUICConnection(IRawConnection, IMuxedConn):
""" """
# This method doesn't make sense for a muxed connection # This method doesn't make sense for a muxed connection
# It's here for interface compatibility but should not be used # It's here for interface compatibility but should not be used
import traceback
traceback.print_stack()
raise NotImplementedError( raise NotImplementedError(
"Use streams for reading data from QUIC connections. " "Use streams for reading data from QUIC connections. "
"Call accept_stream() or open_stream() instead." "Call accept_stream() or open_stream() instead."

View File

@ -42,6 +42,7 @@ if TYPE_CHECKING:
from .transport import QUICTransport from .transport import QUICTransport
logging.basicConfig( logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s", format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
handlers=[logging.StreamHandler(sys.stdout)], handlers=[logging.StreamHandler(sys.stdout)],
) )
@ -724,7 +725,8 @@ class QUICListener(IListener):
if self._security_manager: if self._security_manager:
try: try:
await connection._verify_peer_identity_with_security() peer_id = await connection._verify_peer_identity_with_security()
connection.peer_id = peer_id
logger.info( logger.info(
f"Security verification successful for {dest_cid.hex()}" f"Security verification successful for {dest_cid.hex()}"
) )

View File

@ -492,6 +492,7 @@ class QUICTLSSecurityConfig:
# TLS verification settings # TLS verification settings
verify_mode: ssl.VerifyMode = ssl.CERT_NONE verify_mode: ssl.VerifyMode = ssl.CERT_NONE
check_hostname: bool = False check_hostname: bool = False
request_client_certificate: bool = False
# Optional peer ID for validation # Optional peer ID for validation
peer_id: ID | None = None peer_id: ID | None = None
@ -657,8 +658,9 @@ def create_server_tls_config(
peer_id=peer_id, peer_id=peer_id,
is_client_config=False, is_client_config=False,
config_name="server", config_name="server",
verify_mode=ssl.CERT_NONE, # Server doesn't verify client certs in libp2p verify_mode=ssl.CERT_NONE,
check_hostname=False, check_hostname=False,
request_client_certificate=True,
**kwargs, **kwargs,
) )
@ -688,7 +690,7 @@ def create_client_tls_config(
peer_id=peer_id, peer_id=peer_id,
is_client_config=True, is_client_config=True,
config_name="client", config_name="client",
verify_mode=ssl.CERT_NONE, # Client doesn't verify server certs in libp2p verify_mode=ssl.CERT_NONE,
check_hostname=False, check_hostname=False,
**kwargs, **kwargs,
) )

View File

@ -222,9 +222,6 @@ class QUICTransport(ITransport):
config.private_key = tls_config.private_key config.private_key = tls_config.private_key
config.certificate_chain = tls_config.certificate_chain config.certificate_chain = tls_config.certificate_chain
config.alpn_protocols = tls_config.alpn_protocols config.alpn_protocols = tls_config.alpn_protocols
config.verify_mode = tls_config.verify_mode
config.verify_mode = ssl.CERT_NONE config.verify_mode = ssl.CERT_NONE
print("Successfully applied TLS configuration to QUIC config") print("Successfully applied TLS configuration to QUIC config")
@ -297,9 +294,6 @@ class QUICTransport(ITransport):
await connection.connect(self._background_nursery) await connection.connect(self._background_nursery)
print("Starting to verify peer identity")
print("Identity verification done")
# Store connection for management # Store connection for management
conn_id = f"{host}:{port}" conn_id = f"{host}:{port}"
self._connections[conn_id] = connection self._connections[conn_id] = connection

View File

@ -353,6 +353,8 @@ def create_server_config_from_base(
server_config.certificate_chain = server_tls_config.certificate_chain server_config.certificate_chain = server_tls_config.certificate_chain
if server_tls_config.alpn_protocols: if server_tls_config.alpn_protocols:
server_config.alpn_protocols = server_tls_config.alpn_protocols server_config.alpn_protocols = server_tls_config.alpn_protocols
print("Setting request client certificate to True")
server_tls_config.request_client_certificate = True
except Exception as e: except Exception as e:
logger.warning(f"Failed to apply security manager config: {e}") logger.warning(f"Failed to apply security manager config: {e}")