diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index a4230507..cc1910db 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -2,6 +2,8 @@ from collections.abc import ( Awaitable, Callable, ) +from libp2p.transport.quic.connection import QUICConnection +from typing import cast import logging import sys @@ -281,6 +283,17 @@ class Swarm(Service, INetworkService): ) -> None: raw_conn = RawConnection(read_write_closer, False) + # No need to upgrade QUIC Connection + if isinstance(self.transport, QUICTransport): + print("Connecting QUIC Connection") + quic_conn = cast(QUICConnection, raw_conn) + await self.add_conn(quic_conn) + # NOTE: This is a intentional barrier to prevent from the handler + # exiting and closing the connection. + await self.manager.wait_finished() + print("Connection Connected") + return + # Per, https://discuss.libp2p.io/t/multistream-security/130, we first # secure the conn and then mux the conn try: @@ -396,6 +409,7 @@ class Swarm(Service, INetworkService): muxed_conn, self, ) + print("add_conn called") self.manager.run_task(muxed_conn.start) await muxed_conn.event_started.wait() diff --git a/libp2p/transport/quic/connection.py b/libp2p/transport/quic/connection.py index c8df5f76..a555a900 100644 --- a/libp2p/transport/quic/connection.py +++ b/libp2p/transport/quic/connection.py @@ -44,6 +44,7 @@ logging.basicConfig( handlers=[logging.StreamHandler(stdout)], ) logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) class QUICConnection(IRawConnection, IMuxedConn): @@ -179,7 +180,7 @@ class QUICConnection(IRawConnection, IMuxedConn): "connection_id_changes": 0, } - logger.info( + print( f"Created QUIC connection to {remote_peer_id} " f"(initiator: {is_initiator}, addr: {remote_addr}, " "security: {security_manager is not None})" @@ -278,7 +279,7 @@ class QUICConnection(IRawConnection, IMuxedConn): self._started = True self.event_started.set() - logger.info(f"Starting QUIC connection to {self._remote_peer_id}") + print(f"Starting QUIC connection to {self._remote_peer_id}") try: # If this is a client connection, we need to establish the connection @@ -289,7 +290,7 @@ class QUICConnection(IRawConnection, IMuxedConn): self._established = True self._connected_event.set() - logger.info(f"QUIC connection to {self._remote_peer_id} started") + print(f"QUIC connection to {self._remote_peer_id} started") except Exception as e: logger.error(f"Failed to start connection: {e}") @@ -300,7 +301,7 @@ class QUICConnection(IRawConnection, IMuxedConn): try: with QUICErrorContext("connection_initiation", "connection"): if not self._socket: - logger.info("Creating new socket for outbound connection") + print("Creating new socket for outbound connection") self._socket = trio.socket.socket( family=socket.AF_INET, type=socket.SOCK_DGRAM ) @@ -312,7 +313,7 @@ class QUICConnection(IRawConnection, IMuxedConn): # Send initial packet(s) await self._transmit() - logger.info(f"Initiated QUIC connection to {self._remote_addr}") + print(f"Initiated QUIC connection to {self._remote_addr}") except Exception as e: logger.error(f"Failed to initiate connection: {e}") @@ -334,16 +335,16 @@ class QUICConnection(IRawConnection, IMuxedConn): try: with QUICErrorContext("connection_establishment", "connection"): # Start the connection if not already started - logger.info("STARTING TO CONNECT") + print("STARTING TO CONNECT") if not self._started: await self.start() # Start background event processing if not self._background_tasks_started: - logger.info("STARTING BACKGROUND TASK") + print("STARTING BACKGROUND TASK") await self._start_background_tasks() else: - logger.info("BACKGROUND TASK ALREADY STARTED") + print("BACKGROUND TASK ALREADY STARTED") # Wait for handshake completion with timeout with trio.move_on_after( @@ -357,15 +358,13 @@ class QUICConnection(IRawConnection, IMuxedConn): f"{self.CONNECTION_HANDSHAKE_TIMEOUT}s" ) - logger.info( - "QUICConnection: Verifying peer identity with security manager" - ) + print("QUICConnection: Verifying peer identity with security manager") # Verify peer identity using security manager - await self._verify_peer_identity_with_security() + self.peer_id = await self._verify_peer_identity_with_security() - logger.info("QUICConnection: Peer identity verified") + print("QUICConnection: Peer identity verified") self._established = True - logger.info(f"QUIC connection established with {self._remote_peer_id}") + print(f"QUIC connection established with {self._remote_peer_id}") except Exception as e: logger.error(f"Failed to establish connection: {e}") @@ -385,11 +384,11 @@ class QUICConnection(IRawConnection, IMuxedConn): self._nursery.start_soon(async_fn=self._event_processing_loop) self._nursery.start_soon(async_fn=self._periodic_maintenance) - logger.info("Started background tasks for QUIC connection") + print("Started background tasks for QUIC connection") async def _event_processing_loop(self) -> None: """Main event processing loop for the connection.""" - logger.info( + print( f"Started QUIC event processing loop for connection id: {id(self)} " f"and local peer id {str(self.local_peer_id())}" ) @@ -412,7 +411,7 @@ class QUICConnection(IRawConnection, IMuxedConn): logger.error(f"Error in event processing loop: {e}") await self._handle_connection_error(e) finally: - logger.info("QUIC event processing loop finished") + print("QUIC event processing loop finished") async def _periodic_maintenance(self) -> None: """Perform periodic connection maintenance.""" @@ -427,7 +426,7 @@ class QUICConnection(IRawConnection, IMuxedConn): # *** NEW: Log connection ID status periodically *** if logger.isEnabledFor(logging.DEBUG): cid_stats = self.get_connection_id_stats() - logger.info(f"Connection ID stats: {cid_stats}") + print(f"Connection ID stats: {cid_stats}") # Sleep for maintenance interval await trio.sleep(30.0) # 30 seconds @@ -437,15 +436,15 @@ class QUICConnection(IRawConnection, IMuxedConn): async def _client_packet_receiver(self) -> None: """Receive packets for client connections.""" - logger.info("Starting client packet receiver") - logger.info("Started QUIC client packet receiver") + print("Starting client packet receiver") + print("Started QUIC client packet receiver") try: while not self._closed and self._socket: try: # Receive UDP packets data, addr = await self._socket.recvfrom(65536) - logger.info(f"Client received {len(data)} bytes from {addr}") + print(f"Client received {len(data)} bytes from {addr}") # Feed packet to QUIC connection self._quic.receive_datagram(data, addr, now=time.time()) @@ -457,21 +456,21 @@ class QUICConnection(IRawConnection, IMuxedConn): await self._transmit() except trio.ClosedResourceError: - logger.info("Client socket closed") + print("Client socket closed") break except Exception as e: logger.error(f"Error receiving client packet: {e}") await trio.sleep(0.01) except trio.Cancelled: - logger.info("Client packet receiver cancelled") + print("Client packet receiver cancelled") raise finally: - logger.info("Client packet receiver terminated") + print("Client packet receiver terminated") # Security and identity methods - async def _verify_peer_identity_with_security(self) -> None: + async def _verify_peer_identity_with_security(self) -> ID: """ Verify peer identity using integrated security manager. @@ -479,9 +478,9 @@ class QUICConnection(IRawConnection, IMuxedConn): QUICPeerVerificationError: If peer verification fails """ - logger.info("VERIFYING PEER IDENTITY") + print("VERIFYING PEER IDENTITY") if not self._security_manager: - logger.warning("No security manager available for peer verification") + print("No security manager available for peer verification") return try: @@ -489,11 +488,12 @@ class QUICConnection(IRawConnection, IMuxedConn): await self._extract_peer_certificate() if not self._peer_certificate: - logger.warning("No peer certificate available for verification") + print("No peer certificate available for verification") return # Validate certificate format and accessibility if not self._validate_peer_certificate(): + print("Validation Failed for peer cerificate") raise QUICPeerVerificationError("Peer certificate validation failed") # Verify peer identity using security manager @@ -505,7 +505,7 @@ class QUICConnection(IRawConnection, IMuxedConn): # Update peer ID if it wasn't known (inbound connections) if not self._remote_peer_id: self._remote_peer_id = verified_peer_id - logger.info(f"Discovered peer ID from certificate: {verified_peer_id}") + print(f"Discovered peer ID from certificate: {verified_peer_id}") elif self._remote_peer_id != verified_peer_id: raise QUICPeerVerificationError( f"Peer ID mismatch: expected {self._remote_peer_id}, " @@ -513,7 +513,8 @@ class QUICConnection(IRawConnection, IMuxedConn): ) self._peer_verified = True - logger.info(f"Peer identity verified successfully: {verified_peer_id}") + print(f"Peer identity verified successfully: {verified_peer_id}") + return verified_peer_id except QUICPeerVerificationError: # Re-raise verification errors as-is @@ -526,26 +527,21 @@ class QUICConnection(IRawConnection, IMuxedConn): """Extract peer certificate from completed TLS handshake.""" try: # Get peer certificate from aioquic TLS context - # Based on aioquic source code: QuicConnection.tls._peer_certificate - if hasattr(self._quic, "tls") and self._quic.tls: + if self._quic.tls: tls_context = self._quic.tls - # Check if peer certificate is available in TLS context - if ( - hasattr(tls_context, "_peer_certificate") - and tls_context._peer_certificate - ): + if tls_context._peer_certificate: # aioquic stores the peer certificate as cryptography # x509.Certificate self._peer_certificate = tls_context._peer_certificate - logger.info( + print( f"Extracted peer certificate: {self._peer_certificate.subject}" ) else: - logger.info("No peer certificate found in TLS context") + print("No peer certificate found in TLS context") else: - logger.info("No TLS context available for certificate extraction") + print("No TLS context available for certificate extraction") except Exception as e: logger.warning(f"Failed to extract peer certificate: {e}") @@ -594,7 +590,7 @@ class QUICConnection(IRawConnection, IMuxedConn): subject = self._peer_certificate.subject serial_number = self._peer_certificate.serial_number - logger.info( + print( f"Certificate validation - Subject: {subject}, Serial: {serial_number}" ) return True @@ -719,7 +715,7 @@ class QUICConnection(IRawConnection, IMuxedConn): self._outbound_stream_count += 1 self._stats["streams_opened"] += 1 - logger.info(f"Opened outbound QUIC stream {stream_id}") + print(f"Opened outbound QUIC stream {stream_id}") return stream raise QUICStreamTimeoutError(f"Stream creation timed out after {timeout}s") @@ -781,7 +777,7 @@ class QUICConnection(IRawConnection, IMuxedConn): """ self._stream_handler = handler_function - logger.info("Set stream handler for incoming streams") + print("Set stream handler for incoming streams") def _remove_stream(self, stream_id: int) -> None: """ @@ -808,7 +804,7 @@ class QUICConnection(IRawConnection, IMuxedConn): if self._nursery: self._nursery.start_soon(update_counts) - logger.info(f"Removed stream {stream_id} from connection") + print(f"Removed stream {stream_id} from connection") # *** UPDATED: Complete QUIC event handling - FIXES THE ORIGINAL ISSUE *** @@ -830,15 +826,15 @@ class QUICConnection(IRawConnection, IMuxedConn): await self._handle_quic_event(event) if events_processed > 0: - logger.info(f"Processed {events_processed} QUIC events") + print(f"Processed {events_processed} QUIC events") finally: self._event_processing_active = False async def _handle_quic_event(self, event: events.QuicEvent) -> None: """Handle a single QUIC event with COMPLETE event type coverage.""" - logger.info(f"Handling QUIC event: {type(event).__name__}") - logger.info(f"QUIC event: {type(event).__name__}") + print(f"Handling QUIC event: {type(event).__name__}") + print(f"QUIC event: {type(event).__name__}") try: if isinstance(event, events.ConnectionTerminated): @@ -864,8 +860,8 @@ class QUICConnection(IRawConnection, IMuxedConn): elif isinstance(event, events.StopSendingReceived): await self._handle_stop_sending_received(event) else: - logger.info(f"Unhandled QUIC event type: {type(event).__name__}") - logger.info(f"Unhandled QUIC event: {type(event).__name__}") + print(f"Unhandled QUIC event type: {type(event).__name__}") + print(f"Unhandled QUIC event: {type(event).__name__}") except Exception as e: logger.error(f"Error handling QUIC event {type(event).__name__}: {e}") @@ -880,8 +876,8 @@ class QUICConnection(IRawConnection, IMuxedConn): This is the CRITICAL missing functionality that was causing your issue! """ - logger.info(f"🆔 NEW CONNECTION ID ISSUED: {event.connection_id.hex()}") - logger.info(f"🆔 NEW CONNECTION ID ISSUED: {event.connection_id.hex()}") + print(f"🆔 NEW CONNECTION ID ISSUED: {event.connection_id.hex()}") + print(f"🆔 NEW CONNECTION ID ISSUED: {event.connection_id.hex()}") # Add to available connection IDs self._available_connection_ids.add(event.connection_id) @@ -889,14 +885,14 @@ class QUICConnection(IRawConnection, IMuxedConn): # If we don't have a current connection ID, use this one if self._current_connection_id is None: self._current_connection_id = event.connection_id - logger.info(f"🆔 Set current connection ID to: {event.connection_id.hex()}") - logger.info(f"🆔 Set current connection ID to: {event.connection_id.hex()}") + print(f"🆔 Set current connection ID to: {event.connection_id.hex()}") + print(f"🆔 Set current connection ID to: {event.connection_id.hex()}") # Update statistics self._stats["connection_ids_issued"] += 1 - logger.info(f"Available connection IDs: {len(self._available_connection_ids)}") - logger.info(f"Available connection IDs: {len(self._available_connection_ids)}") + print(f"Available connection IDs: {len(self._available_connection_ids)}") + print(f"Available connection IDs: {len(self._available_connection_ids)}") async def _handle_connection_id_retired( self, event: events.ConnectionIdRetired @@ -906,8 +902,8 @@ class QUICConnection(IRawConnection, IMuxedConn): This handles when the peer tells us to stop using a connection ID. """ - logger.info(f"🗑️ CONNECTION ID RETIRED: {event.connection_id.hex()}") - logger.info(f"🗑️ CONNECTION ID RETIRED: {event.connection_id.hex()}") + print(f"🗑️ CONNECTION ID RETIRED: {event.connection_id.hex()}") + print(f"🗑️ CONNECTION ID RETIRED: {event.connection_id.hex()}") # Remove from available IDs and add to retired set self._available_connection_ids.discard(event.connection_id) @@ -924,7 +920,7 @@ class QUICConnection(IRawConnection, IMuxedConn): else: self._current_connection_id = None logger.warning("⚠️ No available connection IDs after retirement!") - logger.info("⚠️ No available connection IDs after retirement!") + print("⚠️ No available connection IDs after retirement!") # Update statistics self._stats["connection_ids_retired"] += 1 @@ -933,13 +929,13 @@ class QUICConnection(IRawConnection, IMuxedConn): async def _handle_ping_acknowledged(self, event: events.PingAcknowledged) -> None: """Handle ping acknowledgment.""" - logger.info(f"Ping acknowledged: uid={event.uid}") + print(f"Ping acknowledged: uid={event.uid}") async def _handle_protocol_negotiated( self, event: events.ProtocolNegotiated ) -> None: """Handle protocol negotiation completion.""" - logger.info(f"Protocol negotiated: {event.alpn_protocol}") + print(f"Protocol negotiated: {event.alpn_protocol}") async def _handle_stop_sending_received( self, event: events.StopSendingReceived @@ -961,7 +957,7 @@ class QUICConnection(IRawConnection, IMuxedConn): self, event: events.HandshakeCompleted ) -> None: """Handle handshake completion with security integration.""" - logger.info("QUIC handshake completed") + print("QUIC handshake completed") self._handshake_completed = True # Store handshake event for security verification @@ -970,14 +966,14 @@ class QUICConnection(IRawConnection, IMuxedConn): # Try to extract certificate information after handshake await self._extract_peer_certificate() - logger.info("✅ Setting connected event") + print("✅ Setting connected event") self._connected_event.set() async def _handle_connection_terminated( self, event: events.ConnectionTerminated ) -> None: """Handle connection termination.""" - logger.info(f"QUIC connection terminated: {event.reason_phrase}") + print(f"QUIC connection terminated: {event.reason_phrase}") # Close all streams for stream in list(self._streams.values()): @@ -1003,7 +999,7 @@ class QUICConnection(IRawConnection, IMuxedConn): try: if stream_id not in self._streams: if self._is_incoming_stream(stream_id): - logger.info(f"Creating new incoming stream {stream_id}") + print(f"Creating new incoming stream {stream_id}") from .stream import QUICStream, StreamDirection @@ -1038,7 +1034,7 @@ class QUICConnection(IRawConnection, IMuxedConn): except Exception as e: logger.error(f"Error handling stream data for stream {stream_id}: {e}") - logger.info(f"❌ STREAM_DATA: Error: {e}") + print(f"❌ STREAM_DATA: Error: {e}") async def _get_or_create_stream(self, stream_id: int) -> QUICStream: """Get existing stream or create new inbound stream.""" @@ -1095,7 +1091,7 @@ class QUICConnection(IRawConnection, IMuxedConn): except Exception as e: logger.error(f"Error in stream handler for stream {stream_id}: {e}") - logger.info(f"Created inbound stream {stream_id}") + print(f"Created inbound stream {stream_id}") return stream def _is_incoming_stream(self, stream_id: int) -> bool: @@ -1122,7 +1118,7 @@ class QUICConnection(IRawConnection, IMuxedConn): try: stream = self._streams[stream_id] await stream.handle_reset(event.error_code) - logger.info( + print( f"Handled reset for stream {stream_id}" f"with error code {event.error_code}" ) @@ -1131,13 +1127,13 @@ class QUICConnection(IRawConnection, IMuxedConn): # Force remove the stream self._remove_stream(stream_id) else: - logger.info(f"Received reset for unknown stream {stream_id}") + print(f"Received reset for unknown stream {stream_id}") async def _handle_datagram_received( self, event: events.DatagramFrameReceived ) -> None: """Handle datagram frame (if using QUIC datagrams).""" - logger.info(f"Datagram frame received: size={len(event.data)}") + print(f"Datagram frame received: size={len(event.data)}") # For now, just log. Could be extended for custom datagram handling async def _handle_timer_events(self) -> None: @@ -1154,7 +1150,7 @@ class QUICConnection(IRawConnection, IMuxedConn): """Transmit pending QUIC packets using available socket.""" sock = self._socket if not sock: - logger.info("No socket to transmit") + print("No socket to transmit") return try: @@ -1200,7 +1196,7 @@ class QUICConnection(IRawConnection, IMuxedConn): return self._closed = True - logger.info(f"Closing QUIC connection to {self._remote_peer_id}") + print(f"Closing QUIC connection to {self._remote_peer_id}") try: # Close all streams gracefully @@ -1242,7 +1238,7 @@ class QUICConnection(IRawConnection, IMuxedConn): self._streams.clear() self._closed_event.set() - logger.info(f"QUIC connection to {self._remote_peer_id} closed") + print(f"QUIC connection to {self._remote_peer_id} closed") except Exception as e: logger.error(f"Error during connection close: {e}") @@ -1257,13 +1253,13 @@ class QUICConnection(IRawConnection, IMuxedConn): try: if self._transport: await self._transport._cleanup_terminated_connection(self) - logger.info("Notified transport of connection termination") + print("Notified transport of connection termination") return for listener in self._transport._listeners: try: await listener._remove_connection_by_object(self) - logger.info("Found and notified listener of connection termination") + print("Found and notified listener of connection termination") return except Exception: continue @@ -1288,10 +1284,10 @@ class QUICConnection(IRawConnection, IMuxedConn): for tracked_cid, tracked_conn in list(listener._connections.items()): if tracked_conn is self: await listener._remove_connection(tracked_cid) - logger.info(f"Removed connection {tracked_cid.hex()}") + print(f"Removed connection {tracked_cid.hex()}") return - logger.info("Fallback cleanup by connection ID completed") + print("Fallback cleanup by connection ID completed") except Exception as e: logger.error(f"Error in fallback cleanup: {e}") @@ -1334,6 +1330,9 @@ class QUICConnection(IRawConnection, IMuxedConn): """ # This method doesn't make sense for a muxed connection # It's here for interface compatibility but should not be used + import traceback + + traceback.print_stack() raise NotImplementedError( "Use streams for reading data from QUIC connections. " "Call accept_stream() or open_stream() instead." diff --git a/libp2p/transport/quic/listener.py b/libp2p/transport/quic/listener.py index 2e6bf3de..e86b8acb 100644 --- a/libp2p/transport/quic/listener.py +++ b/libp2p/transport/quic/listener.py @@ -42,6 +42,7 @@ if TYPE_CHECKING: from .transport import QUICTransport logging.basicConfig( + level=logging.DEBUG, format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s", handlers=[logging.StreamHandler(sys.stdout)], ) @@ -724,7 +725,8 @@ class QUICListener(IListener): if self._security_manager: try: - await connection._verify_peer_identity_with_security() + peer_id = await connection._verify_peer_identity_with_security() + connection.peer_id = peer_id logger.info( f"Security verification successful for {dest_cid.hex()}" ) diff --git a/libp2p/transport/quic/security.py b/libp2p/transport/quic/security.py index 97754960..9760937c 100644 --- a/libp2p/transport/quic/security.py +++ b/libp2p/transport/quic/security.py @@ -492,6 +492,7 @@ class QUICTLSSecurityConfig: # TLS verification settings verify_mode: ssl.VerifyMode = ssl.CERT_NONE check_hostname: bool = False + request_client_certificate: bool = False # Optional peer ID for validation peer_id: ID | None = None @@ -657,8 +658,9 @@ def create_server_tls_config( peer_id=peer_id, is_client_config=False, config_name="server", - verify_mode=ssl.CERT_NONE, # Server doesn't verify client certs in libp2p + verify_mode=ssl.CERT_NONE, check_hostname=False, + request_client_certificate=True, **kwargs, ) @@ -688,7 +690,7 @@ def create_client_tls_config( peer_id=peer_id, is_client_config=True, config_name="client", - verify_mode=ssl.CERT_NONE, # Client doesn't verify server certs in libp2p + verify_mode=ssl.CERT_NONE, check_hostname=False, **kwargs, ) diff --git a/libp2p/transport/quic/transport.py b/libp2p/transport/quic/transport.py index 4b9b67a8..59cc3bd5 100644 --- a/libp2p/transport/quic/transport.py +++ b/libp2p/transport/quic/transport.py @@ -222,9 +222,6 @@ class QUICTransport(ITransport): config.private_key = tls_config.private_key config.certificate_chain = tls_config.certificate_chain config.alpn_protocols = tls_config.alpn_protocols - - config.verify_mode = tls_config.verify_mode - config.verify_mode = ssl.CERT_NONE print("Successfully applied TLS configuration to QUIC config") @@ -297,9 +294,6 @@ class QUICTransport(ITransport): await connection.connect(self._background_nursery) - print("Starting to verify peer identity") - - print("Identity verification done") # Store connection for management conn_id = f"{host}:{port}" self._connections[conn_id] = connection diff --git a/libp2p/transport/quic/utils.py b/libp2p/transport/quic/utils.py index 0062f7d9..fb65f1e3 100644 --- a/libp2p/transport/quic/utils.py +++ b/libp2p/transport/quic/utils.py @@ -353,6 +353,8 @@ def create_server_config_from_base( server_config.certificate_chain = server_tls_config.certificate_chain if server_tls_config.alpn_protocols: server_config.alpn_protocols = server_tls_config.alpn_protocols + print("Setting request client certificate to True") + server_tls_config.request_client_certificate = True except Exception as e: logger.warning(f"Failed to apply security manager config: {e}")