""" QUIC Transport implementation for py-libp2p with integrated security. Uses aioquic's sans-IO core with trio for native async support. Based on aioquic library with interface consistency to go-libp2p and js-libp2p. Updated to include Module 5 security integration. """ from collections.abc import Iterable import copy import logging from aioquic.quic.configuration import ( QuicConfiguration, ) from aioquic.quic.connection import ( QuicConnection as NativeQUICConnection, ) import multiaddr import trio from libp2p.abc import ( IRawConnection, ITransport, ) from libp2p.crypto.keys import ( PrivateKey, ) from libp2p.custom_types import THandler, TProtocol from libp2p.peer.id import ( ID, ) from libp2p.transport.quic.security import TSecurityConfig from libp2p.transport.quic.utils import ( get_alpn_protocols, is_quic_multiaddr, multiaddr_to_quic_version, quic_multiaddr_to_endpoint, quic_version_to_wire_format, ) from .config import ( QUICTransportConfig, ) from .connection import ( QUICConnection, ) from .exceptions import ( QUICDialError, QUICListenError, QUICSecurityError, ) from .listener import ( QUICListener, ) from .security import ( QUICTLSConfigManager, create_quic_security_transport, ) QUIC_V1_PROTOCOL = QUICTransportConfig.PROTOCOL_QUIC_V1 QUIC_DRAFT29_PROTOCOL = QUICTransportConfig.PROTOCOL_QUIC_DRAFT29 logging.basicConfig( level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[logging.StreamHandler()], ) logger = logging.getLogger(__name__) class QUICTransport(ITransport): """ QUIC Transport implementation following libp2p transport interface. Uses aioquic's sans-IO core with trio for native async support. Supports both QUIC v1 (RFC 9000) and draft-29 for compatibility with go-libp2p and js-libp2p implementations. Includes integrated libp2p TLS security with peer identity verification. """ def __init__( self, private_key: PrivateKey, config: QUICTransportConfig | None = None ): """ Initialize QUIC transport with security integration. Args: private_key: libp2p private key for identity and TLS cert generation config: QUIC transport configuration options """ self._private_key = private_key self._peer_id = ID.from_pubkey(private_key.get_public_key()) self._config = config or QUICTransportConfig() # Connection management self._connections: dict[str, QUICConnection] = {} self._listeners: list[QUICListener] = [] # Security manager for TLS integration self._security_manager = create_quic_security_transport( self._private_key, self._peer_id ) # QUIC configurations for different versions self._quic_configs: dict[TProtocol, QuicConfiguration] = {} self._setup_quic_configurations() # Resource management self._closed = False self._nursery_manager = trio.CapacityLimiter(1) logger.info( f"Initialized QUIC transport with security for peer {self._peer_id}" ) def _setup_quic_configurations(self) -> None: """Setup QUIC configurations.""" try: # Get TLS configuration from security manager server_tls_config = self._security_manager.create_server_config() client_tls_config = self._security_manager.create_client_config() # Base server configuration base_server_config = QuicConfiguration( is_client=False, alpn_protocols=get_alpn_protocols(), verify_mode=self._config.verify_mode, max_datagram_frame_size=self._config.max_datagram_size, idle_timeout=self._config.idle_timeout, ) # Base client configuration base_client_config = QuicConfiguration( is_client=True, alpn_protocols=get_alpn_protocols(), verify_mode=self._config.verify_mode, max_datagram_frame_size=self._config.max_datagram_size, idle_timeout=self._config.idle_timeout, ) # Apply TLS configuration self._apply_tls_configuration(base_server_config, server_tls_config) self._apply_tls_configuration(base_client_config, client_tls_config) # QUIC v1 (RFC 9000) configurations quic_v1_server_config = copy.copy(base_server_config) quic_v1_server_config.supported_versions = [ quic_version_to_wire_format(QUIC_V1_PROTOCOL) ] quic_v1_client_config = copy.copy(base_client_config) quic_v1_client_config.supported_versions = [ quic_version_to_wire_format(QUIC_V1_PROTOCOL) ] # Store both server and client configs for v1 self._quic_configs[TProtocol(f"{QUIC_V1_PROTOCOL}_server")] = ( quic_v1_server_config ) self._quic_configs[TProtocol(f"{QUIC_V1_PROTOCOL}_client")] = ( quic_v1_client_config ) # QUIC draft-29 configurations for compatibility if self._config.enable_draft29: draft29_server_config: QuicConfiguration = copy.copy(base_server_config) draft29_server_config.supported_versions = [ quic_version_to_wire_format(QUIC_DRAFT29_PROTOCOL) ] draft29_client_config = copy.copy(base_client_config) draft29_client_config.supported_versions = [ quic_version_to_wire_format(QUIC_DRAFT29_PROTOCOL) ] self._quic_configs[TProtocol(f"{QUIC_DRAFT29_PROTOCOL}_server")] = ( draft29_server_config ) self._quic_configs[TProtocol(f"{QUIC_DRAFT29_PROTOCOL}_client")] = ( draft29_client_config ) logger.info("QUIC configurations initialized with libp2p TLS security") except Exception as e: raise QUICSecurityError( f"Failed to setup QUIC TLS configurations: {e}" ) from e def _apply_tls_configuration( self, config: QuicConfiguration, tls_config: TSecurityConfig ) -> None: """ Apply TLS configuration to a QUIC configuration using aioquic's actual API. Args: config: QuicConfiguration to update tls_config: TLS configuration dictionary from security manager """ try: # Set certificate and private key directly on the configuration # aioquic expects cryptography objects, not DER bytes if "certificate" in tls_config and "private_key" in tls_config: # The security manager should return cryptography objects # not DER bytes, but if it returns DER bytes, we need to handle that certificate = tls_config["certificate"] private_key = tls_config["private_key"] # Check if we received DER bytes and need # to convert to cryptography objects if isinstance(certificate, bytes): from cryptography import x509 certificate = x509.load_der_x509_certificate(certificate) if isinstance(private_key, bytes): from cryptography.hazmat.primitives import serialization private_key = serialization.load_der_private_key( # type: ignore private_key, password=None ) # Set directly on the configuration object config.certificate = certificate config.private_key = private_key # Handle certificate chain if provided certificate_chain = tls_config.get("certificate_chain", []) if certificate_chain and isinstance(certificate_chain, Iterable): # Convert DER bytes to cryptography objects if needed chain_objects = [] for cert in certificate_chain: if isinstance(cert, bytes): from cryptography import x509 cert = x509.load_der_x509_certificate(cert) chain_objects.append(cert) config.certificate_chain = chain_objects # Set ALPN protocols if "alpn_protocols" in tls_config: config.alpn_protocols = tls_config["alpn_protocols"] # type: ignore # Set certificate verification mode if "verify_mode" in tls_config: config.verify_mode = tls_config["verify_mode"] # type: ignore logger.debug("Successfully applied TLS configuration to QUIC config") except Exception as e: raise QUICSecurityError(f"Failed to apply TLS configuration: {e}") from e async def dial( self, maddr: multiaddr.Multiaddr, peer_id: ID | None = None ) -> IRawConnection: """ Dial a remote peer using QUIC transport with security verification. Args: maddr: Multiaddr of the remote peer (e.g., /ip4/1.2.3.4/udp/4001/quic-v1) peer_id: Expected peer ID for verification Returns: Raw connection interface to the remote peer Raises: QUICDialError: If dialing fails QUICSecurityError: If security verification fails """ if self._closed: raise QUICDialError("Transport is closed") if not is_quic_multiaddr(maddr): raise QUICDialError(f"Invalid QUIC multiaddr: {maddr}") try: # Extract connection details from multiaddr host, port = quic_multiaddr_to_endpoint(maddr) quic_version = multiaddr_to_quic_version(maddr) # Get appropriate QUIC client configuration config_key = TProtocol(f"{quic_version}_client") print("config_key", config_key, self._quic_configs.keys()) config = self._quic_configs.get(config_key) if not config: raise QUICDialError(f"Unsupported QUIC version: {quic_version}") config.is_client = True logger.debug( f"Dialing QUIC connection to {host}:{port} (version: {quic_version})" ) print("Start QUIC Connection") # Create QUIC connection using aioquic's sans-IO core native_quic_connection = NativeQUICConnection(configuration=config) print("QUIC Connection Created") # Create trio-based QUIC connection wrapper with security connection = QUICConnection( quic_connection=native_quic_connection, remote_addr=(host, port), peer_id=peer_id, local_peer_id=self._peer_id, is_initiator=True, maddr=maddr, transport=self, security_manager=self._security_manager, # Pass security manager ) # Establish connection using trio async with trio.open_nursery() as nursery: await connection.connect(nursery) # Verify peer identity after TLS handshake if peer_id: await self._verify_peer_identity(connection, peer_id) # Store connection for management conn_id = f"{host}:{port}:{peer_id}" self._connections[conn_id] = connection logger.info(f"Successfully dialed secure QUIC connection to {peer_id}") return connection except Exception as e: logger.error(f"Failed to dial QUIC connection to {maddr}: {e}") raise QUICDialError(f"Dial failed: {e}") from e async def _verify_peer_identity( self, connection: QUICConnection, expected_peer_id: ID ) -> None: """ Verify remote peer identity after TLS handshake. Args: connection: The established QUIC connection expected_peer_id: Expected peer ID Raises: QUICSecurityError: If peer verification fails """ try: # Get peer certificate from the connection peer_certificate = await connection.get_peer_certificate() if not peer_certificate: raise QUICSecurityError("No peer certificate available") # Verify peer identity using security manager verified_peer_id = self._security_manager.verify_peer_identity( peer_certificate, expected_peer_id ) if verified_peer_id != expected_peer_id: raise QUICSecurityError( "Peer ID verification failed: expected " f"{expected_peer_id}, got {verified_peer_id}" ) logger.info(f"Peer identity verified: {verified_peer_id}") print(f"Peer identity verified: {verified_peer_id}") except Exception as e: raise QUICSecurityError(f"Peer identity verification failed: {e}") from e def create_listener(self, handler_function: THandler) -> QUICListener: """ Create a QUIC listener with integrated security. Args: handler_function: Function to handle new connections Returns: QUIC listener instance Raises: QUICListenError: If transport is closed """ if self._closed: raise QUICListenError("Transport is closed") # Get server configurations for the listener server_configs = { version: config for version, config in self._quic_configs.items() if version.endswith("_server") } listener = QUICListener( transport=self, handler_function=handler_function, quic_configs=server_configs, config=self._config, security_manager=self._security_manager, # Pass security manager ) self._listeners.append(listener) logger.debug("Created QUIC listener with security") return listener def can_dial(self, maddr: multiaddr.Multiaddr) -> bool: """ Check if this transport can dial the given multiaddr. Args: maddr: Multiaddr to check Returns: True if this transport can dial the address """ return is_quic_multiaddr(maddr) def protocols(self) -> list[TProtocol]: """ Get supported protocol identifiers. Returns: List of supported protocol strings """ protocols = [QUIC_V1_PROTOCOL] if self._config.enable_draft29: protocols.append(QUIC_DRAFT29_PROTOCOL) return protocols def listen_order(self) -> int: """ Get the listen order priority for this transport. Matches go-libp2p's ListenOrder = 1 for QUIC. Returns: Priority order for listening (lower = higher priority) """ return 1 async def close(self) -> None: """Close the transport and cleanup resources.""" if self._closed: return self._closed = True logger.info("Closing QUIC transport") # Close all active connections and listeners concurrently using trio nursery async with trio.open_nursery() as nursery: # Close all connections for connection in self._connections.values(): nursery.start_soon(connection.close) # Close all listeners for listener in self._listeners: nursery.start_soon(listener.close) self._connections.clear() self._listeners.clear() logger.info("QUIC transport closed") def get_stats(self) -> dict[str, int | list[str] | object]: """Get transport statistics including security info.""" return { "active_connections": len(self._connections), "active_listeners": len(self._listeners), "supported_protocols": self.protocols(), "local_peer_id": str(self._peer_id), "security_enabled": True, "tls_configured": True, } def get_security_manager(self) -> QUICTLSConfigManager: """ Get the security manager for this transport. Returns: The QUIC TLS configuration manager """ return self._security_manager