diff --git a/libp2p/transport/quic/__init__.py b/libp2p/transport/quic/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/libp2p/transport/quic/config.py b/libp2p/transport/quic/config.py new file mode 100644 index 00000000..75402626 --- /dev/null +++ b/libp2p/transport/quic/config.py @@ -0,0 +1,51 @@ +""" +Configuration classes for QUIC transport. +""" + +from dataclasses import ( + dataclass, + field, +) +import ssl + + +@dataclass +class QUICTransportConfig: + """Configuration for QUIC transport.""" + + # Connection settings + idle_timeout: float = 30.0 # Connection idle timeout in seconds + max_datagram_size: int = 1200 # Maximum UDP datagram size + local_port: int | None = None # Local port for binding (None = random) + + # Protocol version support + enable_draft29: bool = True # Enable QUIC draft-29 for compatibility + enable_v1: bool = True # Enable QUIC v1 (RFC 9000) + + # TLS settings + verify_mode: ssl.VerifyMode = ssl.CERT_REQUIRED + alpn_protocols: list[str] = field(default_factory=lambda: ["libp2p"]) + + # Performance settings + max_concurrent_streams: int = 1000 # Maximum concurrent streams per connection + connection_window: int = 1024 * 1024 # Connection flow control window + stream_window: int = 64 * 1024 # Stream flow control window + + # Logging and debugging + enable_qlog: bool = False # Enable QUIC logging + qlog_dir: str | None = None # Directory for QUIC logs + + # Connection management + max_connections: int = 1000 # Maximum number of connections + connection_timeout: float = 10.0 # Connection establishment timeout + + def __post_init__(self): + """Validate configuration after initialization.""" + if not (self.enable_draft29 or self.enable_v1): + raise ValueError("At least one QUIC version must be enabled") + + if self.idle_timeout <= 0: + raise ValueError("Idle timeout must be positive") + + if self.max_datagram_size < 1200: + raise ValueError("Max datagram size must be at least 1200 bytes") diff --git a/libp2p/transport/quic/connection.py b/libp2p/transport/quic/connection.py new file mode 100644 index 00000000..fceb9d87 --- /dev/null +++ b/libp2p/transport/quic/connection.py @@ -0,0 +1,368 @@ +""" +QUIC Connection implementation for py-libp2p. +Uses aioquic's sans-IO core with trio for async operations. +""" + +import logging +import socket +import time + +from aioquic.quic import ( + events, +) +from aioquic.quic.connection import ( + QuicConnection, +) +import multiaddr +import trio + +from libp2p.abc import ( + IMuxedConn, + IMuxedStream, + IRawConnection, +) +from libp2p.custom_types import ( + StreamHandlerFn, +) +from libp2p.peer.id import ( + ID, +) + +from .exceptions import ( + QUICConnectionError, + QUICStreamError, +) +from .stream import ( + QUICStream, +) +from .transport import ( + QUICTransport, +) + +logger = logging.getLogger(__name__) + + +class QUICConnection(IRawConnection, IMuxedConn): + """ + QUIC connection implementing both raw connection and muxed connection interfaces. + + Uses aioquic's sans-IO core with trio for native async support. + QUIC natively provides stream multiplexing, so this connection acts as both + a raw connection (for transport layer) and muxed connection (for upper layers). + """ + + def __init__( + self, + quic_connection: QuicConnection, + remote_addr: tuple[str, int], + peer_id: ID, + local_peer_id: ID, + initiator: bool, + maddr: multiaddr.Multiaddr, + transport: QUICTransport, + ): + self._quic = quic_connection + self._remote_addr = remote_addr + self._peer_id = peer_id + self._local_peer_id = local_peer_id + self.__is_initiator = initiator + self._maddr = maddr + self._transport = transport + + # Trio networking + self._socket: trio.socket.SocketType | None = None + self._connected_event = trio.Event() + self._closed_event = trio.Event() + + # Stream management + self._streams: dict[int, QUICStream] = {} + self._next_stream_id: int = ( + 0 if initiator else 1 + ) # Even for initiator, odd for responder + self._stream_handler: StreamHandlerFn | None = None + + # Connection state + self._closed = False + self._timer_task = None + + logger.debug(f"Created QUIC connection to {peer_id}") + + @property + def is_initiator(self) -> bool: # type: ignore + return self.__is_initiator + + async def connect(self) -> None: + """Establish the QUIC connection using trio.""" + try: + # Create UDP socket using trio + self._socket = trio.socket.socket( + family=socket.AF_INET, type=socket.SOCK_DGRAM + ) + + # Start the connection establishment + self._quic.connect(self._remote_addr, now=time.time()) + + # Send initial packet(s) + await self._transmit() + + # Start background tasks using trio nursery + async with trio.open_nursery() as nursery: + nursery.start_soon( + self._handle_incoming_data, None, "QUIC INCOMING DATA" + ) + nursery.start_soon(self._handle_timer, None, "QUIC TIMER HANDLER") + + # Wait for connection to be established + await self._connected_event.wait() + + except Exception as e: + logger.error(f"Failed to connect: {e}") + raise QUICConnectionError(f"Connection failed: {e}") from e + + async def _handle_incoming_data(self) -> None: + """Handle incoming UDP datagrams in trio.""" + while not self._closed: + try: + if self._socket: + data, addr = await self._socket.recvfrom(65536) + self._quic.receive_datagram(data, addr, now=time.time()) + await self._process_events() + await self._transmit() + except trio.ClosedResourceError: + break + except Exception as e: + logger.error(f"Error handling incoming data: {e}") + break + + async def _handle_timer(self) -> None: + """Handle QUIC timer events in trio.""" + while not self._closed: + timer_at = self._quic.get_timer() + if timer_at is None: + await trio.sleep(1.0) # No timer set, check again later + continue + + now = time.time() + if timer_at <= now: + self._quic.handle_timer(now=now) + await self._process_events() + await self._transmit() + else: + await trio.sleep(timer_at - now) + + async def _process_events(self) -> None: + """Process QUIC events from aioquic core.""" + while True: + event = self._quic.next_event() + if event is None: + break + + if isinstance(event, events.ConnectionTerminated): + logger.info(f"QUIC connection terminated: {event.reason_phrase}") + self._closed = True + self._closed_event.set() + break + + elif isinstance(event, events.HandshakeCompleted): + logger.debug("QUIC handshake completed") + self._connected_event.set() + + elif isinstance(event, events.StreamDataReceived): + await self._handle_stream_data(event) + + elif isinstance(event, events.StreamReset): + await self._handle_stream_reset(event) + + async def _handle_stream_data(self, event: events.StreamDataReceived) -> None: + """Handle incoming stream data.""" + stream_id = event.stream_id + + if stream_id not in self._streams: + # Create new stream for incoming data + stream = QUICStream( + connection=self, + stream_id=stream_id, + is_initiator=False, # pyrefly: ignore + ) + self._streams[stream_id] = stream + + # Notify stream handler if available + if self._stream_handler: + # Use trio nursery to start stream handler + async with trio.open_nursery() as nursery: + nursery.start_soon(self._stream_handler, stream) + + # Forward data to stream + stream = self._streams[stream_id] + await stream.handle_data_received(event.data, event.end_stream) + + async def _handle_stream_reset(self, event: events.StreamReset) -> None: + """Handle stream reset.""" + stream_id = event.stream_id + if stream_id in self._streams: + stream = self._streams[stream_id] + await stream.handle_reset(event.error_code) + del self._streams[stream_id] + + async def _transmit(self) -> None: + """Send pending datagrams using trio.""" + socket = self._socket + if socket is None: + return + + for data, addr in self._quic.datagrams_to_send(now=time.time()): + try: + await socket.sendto(data, addr) + except Exception as e: + logger.error(f"Failed to send datagram: {e}") + + # IRawConnection interface + + async def write(self, data: bytes): + """ + Write data to the connection. + For QUIC, this creates a new stream for each write operation. + """ + if self._closed: + raise QUICConnectionError("Connection is closed") + + stream = await self.open_stream() + await stream.write(data) + await stream.close() + + async def read(self, n: int = -1) -> bytes: + """ + Read data from the connection. + For QUIC, this reads from the next available stream. + """ + if self._closed: + raise QUICConnectionError("Connection is closed") + + # For raw connection interface, we need to handle this differently + # In practice, upper layers will use the muxed connection interface + raise NotImplementedError( + "Use muxed connection interface for stream-based reading" + ) + + async def close(self) -> None: + """Close the connection and all streams.""" + if self._closed: + return + + self._closed = True + logger.debug(f"Closing QUIC connection to {self._peer_id}") + + # Close all streams using trio nursery + async with trio.open_nursery() as nursery: + for stream in self._streams.values(): + nursery.start_soon(stream.close) + + # Close QUIC connection + self._quic.close() + await self._transmit() # Send close frames + + # Close socket + if self._socket: + self._socket.close() + + self._streams.clear() + self._closed_event.set() + + logger.debug(f"QUIC connection to {self._peer_id} closed") + + @property + def is_closed(self) -> bool: + """Check if connection is closed.""" + return self._closed + + def multiaddr(self) -> multiaddr.Multiaddr: + """Get the multiaddr for this connection.""" + return self._maddr + + def local_peer_id(self) -> ID: + """Get the local peer ID.""" + return self._local_peer_id + + # IMuxedConn interface + + async def open_stream(self) -> IMuxedStream: + """ + Open a new stream on this connection. + + Returns: + New QUIC stream + + """ + if self._closed: + raise QUICStreamError("Connection is closed") + + # Generate next stream ID + stream_id = self._next_stream_id + self._next_stream_id += ( + 2 # Increment by 2 to maintain initiator/responder distinction + ) + + # Create stream + stream = QUICStream( + connection=self, stream_id=stream_id, is_initiator=True + ) # pyrefly: ignore + + self._streams[stream_id] = stream + + logger.debug(f"Opened QUIC stream {stream_id}") + return stream + + def set_stream_handler(self, handler_function: StreamHandlerFn) -> None: + """ + Set handler for incoming streams. + + Args: + handler_function: Function to handle new incoming streams + + """ + self._stream_handler = handler_function + + async def accept_stream(self) -> IMuxedStream: + """ + Accept an incoming stream. + + Returns: + Accepted stream + + """ + # This is handled automatically by the event processing + # Upper layers should use set_stream_handler instead + raise NotImplementedError("Use set_stream_handler for incoming streams") + + async def verify_peer_identity(self) -> None: + """ + Verify the remote peer's identity using TLS certificate. + This implements the libp2p TLS handshake verification. + """ + # Extract peer ID from TLS certificate + # This should match the expected peer ID + cert_peer_id = self._extract_peer_id_from_cert() + + if self._peer_id and cert_peer_id != self._peer_id: + raise QUICConnectionError( + f"Peer ID mismatch: expected {self._peer_id}, got {cert_peer_id}" + ) + + if not self._peer_id: + self._peer_id = cert_peer_id + + logger.debug(f"Verified peer identity: {self._peer_id}") + + def _extract_peer_id_from_cert(self) -> ID: + """Extract peer ID from TLS certificate.""" + # This should extract the peer ID from the TLS certificate + # following the libp2p TLS specification + # Implementation depends on how the certificate is structured + + # Placeholder - implement based on libp2p TLS spec + # The certificate should contain the peer ID in a specific extension + raise NotImplementedError("Certificate peer ID extraction not implemented") + + def __str__(self) -> str: + """String representation of the connection.""" + return f"QUICConnection(peer={self._peer_id}, streams={len(self._streams)})" diff --git a/libp2p/transport/quic/exceptions.py b/libp2p/transport/quic/exceptions.py new file mode 100644 index 00000000..cf8b1781 --- /dev/null +++ b/libp2p/transport/quic/exceptions.py @@ -0,0 +1,35 @@ +""" +QUIC transport specific exceptions. +""" + +from libp2p.exceptions import ( + BaseLibp2pError, +) + + +class QUICError(BaseLibp2pError): + """Base exception for QUIC transport errors.""" + + +class QUICDialError(QUICError): + """Exception raised when QUIC dial operation fails.""" + + +class QUICListenError(QUICError): + """Exception raised when QUIC listen operation fails.""" + + +class QUICConnectionError(QUICError): + """Exception raised for QUIC connection errors.""" + + +class QUICStreamError(QUICError): + """Exception raised for QUIC stream errors.""" + + +class QUICConfigurationError(QUICError): + """Exception raised for QUIC configuration errors.""" + + +class QUICSecurityError(QUICError): + """Exception raised for QUIC security/TLS errors.""" diff --git a/libp2p/transport/quic/stream.py b/libp2p/transport/quic/stream.py new file mode 100644 index 00000000..781cca30 --- /dev/null +++ b/libp2p/transport/quic/stream.py @@ -0,0 +1,134 @@ +""" +QUIC Stream implementation +""" + +from types import ( + TracebackType, +) + +import trio + +from libp2p.abc import ( + IMuxedStream, +) + +from .connection import ( + QUICConnection, +) +from .exceptions import ( + QUICStreamError, +) + + +class QUICStream(IMuxedStream): + """ + Basic QUIC stream implementation for Module 1. + + This is a minimal implementation to make Module 1 self-contained. + Will be moved to a separate stream.py module in Module 3. + """ + + def __init__( + self, connection: "QUICConnection", stream_id: int, is_initiator: bool + ): + self._connection = connection + self._stream_id = stream_id + self._is_initiator = is_initiator + self._closed = False + + # Trio synchronization + self._receive_buffer = bytearray() + self._receive_event = trio.Event() + self._close_event = trio.Event() + + async def read(self, n: int = -1) -> bytes: + """Read data from the stream.""" + if self._closed: + raise QUICStreamError("Stream is closed") + + # Wait for data if buffer is empty + while not self._receive_buffer and not self._closed: + await self._receive_event.wait() + self._receive_event = trio.Event() # Reset for next read + + if n == -1: + data = bytes(self._receive_buffer) + self._receive_buffer.clear() + else: + data = bytes(self._receive_buffer[:n]) + self._receive_buffer = self._receive_buffer[n:] + + return data + + async def write(self, data: bytes) -> None: + """Write data to the stream.""" + if self._closed: + raise QUICStreamError("Stream is closed") + + # Send data using the underlying QUIC connection + self._connection._quic.send_stream_data(self._stream_id, data) + await self._connection._transmit() + + async def close(self, error_code: int = 0) -> None: + """Close the stream.""" + if self._closed: + return + + self._closed = True + + # Close the QUIC stream + self._connection._quic.reset_stream(self._stream_id, error_code) + await self._connection._transmit() + + # Remove from connection's stream list + self._connection._streams.pop(self._stream_id, None) + + self._close_event.set() + + def is_closed(self) -> bool: + """Check if stream is closed.""" + return self._closed + + async def handle_data_received(self, data: bytes, end_stream: bool) -> None: + """Handle data received from the QUIC connection.""" + if self._closed: + return + + self._receive_buffer.extend(data) + self._receive_event.set() + + if end_stream: + await self.close() + + async def handle_reset(self, error_code: int) -> None: + """Handle stream reset.""" + self._closed = True + self._close_event.set() + + def set_deadline(self, ttl: int) -> bool: + """ + Set the deadline + """ + raise NotImplementedError("Yamux does not support setting read deadlines") + + async def reset(self) -> None: + """ + Reset the stream + """ + self.handle_reset(0) + + def get_remote_address(self) -> tuple[str, int] | None: + return self._connection._remote_addr + + async def __aenter__(self) -> "QUICStream": + """Enter the async context manager.""" + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + """Exit the async context manager and close the stream.""" + await self.close() diff --git a/libp2p/transport/quic/transport.py b/libp2p/transport/quic/transport.py new file mode 100644 index 00000000..286c73da --- /dev/null +++ b/libp2p/transport/quic/transport.py @@ -0,0 +1,331 @@ +""" +QUIC Transport implementation for py-libp2p. +Uses aioquic's sans-IO core with trio for native async support. +Based on aioquic library with interface consistency to go-libp2p and js-libp2p. +""" + +import copy +import logging + +from aioquic.quic.configuration import ( + QuicConfiguration, +) +from aioquic.quic.connection import ( + QuicConnection, +) +import multiaddr +from multiaddr import ( + Multiaddr, +) +import trio + +from libp2p.abc import ( + IListener, + IRawConnection, + ITransport, +) +from libp2p.crypto.keys import ( + PrivateKey, +) +from libp2p.peer.id import ( + ID, +) + +from .config import ( + QUICTransportConfig, +) +from .connection import ( + QUICConnection, +) +from .exceptions import ( + QUICDialError, + QUICListenError, +) + +logger = logging.getLogger(__name__) + + +class QUICListener(IListener): + async def close(self): + pass + + async def listen(self, maddr: Multiaddr, nursery: trio.Nursery) -> bool: + return False + + def get_addrs(self) -> tuple[Multiaddr, ...]: + return () + + +class QUICTransport(ITransport): + """ + QUIC Transport implementation following libp2p transport interface. + + Uses aioquic's sans-IO core with trio for native async support. + Supports both QUIC v1 (RFC 9000) and draft-29 for compatibility with + go-libp2p and js-libp2p implementations. + """ + + # Protocol identifiers matching go-libp2p + PROTOCOL_QUIC_V1 = "/quic-v1" # RFC 9000 + PROTOCOL_QUIC_DRAFT29 = "/quic" # draft-29 + + def __init__( + self, private_key: PrivateKey, config: QUICTransportConfig | None = None + ): + """ + Initialize QUIC transport. + + Args: + private_key: libp2p private key for identity and TLS cert generation + config: QUIC transport configuration options + + """ + self._private_key = private_key + self._peer_id = ID.from_pubkey(private_key.get_public_key()) + self._config = config or QUICTransportConfig() + + # Connection management + self._connections: dict[str, QUICConnection] = {} + self._listeners: list[QUICListener] = [] + + # QUIC configurations for different versions + self._quic_configs: dict[str, QuicConfiguration] = {} + self._setup_quic_configurations() + + # Resource management + self._closed = False + self._nursery_manager = trio.CapacityLimiter(1) + + logger.info(f"Initialized QUIC transport for peer {self._peer_id}") + + def _setup_quic_configurations(self) -> None: + """Setup QUIC configurations for supported protocol versions.""" + # Base configuration + base_config = QuicConfiguration( + is_client=False, + alpn_protocols=["libp2p"], + verify_mode=self._config.verify_mode, + max_datagram_frame_size=self._config.max_datagram_size, + idle_timeout=self._config.idle_timeout, + ) + + # Add TLS certificate generated from libp2p private key + self._setup_tls_configuration(base_config) + + # QUIC v1 (RFC 9000) configuration + quic_v1_config = copy.deepcopy(base_config) + quic_v1_config.supported_versions = [0x00000001] # QUIC v1 + self._quic_configs[self.PROTOCOL_QUIC_V1] = quic_v1_config + + # QUIC draft-29 configuration for compatibility + if self._config.enable_draft29: + draft29_config = copy.deepcopy(base_config) + draft29_config.supported_versions = [0xFF00001D] # draft-29 + self._quic_configs[self.PROTOCOL_QUIC_DRAFT29] = draft29_config + + def _setup_tls_configuration(self, config: QuicConfiguration) -> None: + """ + Setup TLS configuration with libp2p identity integration. + Similar to go-libp2p's certificate generation approach. + """ + from .security import ( + generate_libp2p_tls_config, + ) + + # Generate TLS certificate with embedded libp2p peer ID + # This follows the libp2p TLS spec for peer identity verification + tls_config = generate_libp2p_tls_config(self._private_key, self._peer_id) + + config.load_cert_chain(tls_config.cert_file, tls_config.key_file) + if tls_config.ca_file: + config.load_verify_locations(tls_config.ca_file) + + async def dial( + self, maddr: multiaddr.Multiaddr, peer_id: ID | None = None + ) -> IRawConnection: + """ + Dial a remote peer using QUIC transport. + + Args: + maddr: Multiaddr of the remote peer (e.g., /ip4/1.2.3.4/udp/4001/quic-v1) + peer_id: Expected peer ID for verification + + Returns: + Raw connection interface to the remote peer + + Raises: + QUICDialError: If dialing fails + + """ + if self._closed: + raise QUICDialError("Transport is closed") + + if not is_quic_multiaddr(maddr): + raise QUICDialError(f"Invalid QUIC multiaddr: {maddr}") + + try: + # Extract connection details from multiaddr + host, port = quic_multiaddr_to_endpoint(maddr) + quic_version = multiaddr_to_quic_version(maddr) + + # Get appropriate QUIC configuration + config = self._quic_configs.get(quic_version) + if not config: + raise QUICDialError(f"Unsupported QUIC version: {quic_version}") + + # Create client configuration + client_config = copy.deepcopy(config) + client_config.is_client = True + + logger.debug( + f"Dialing QUIC connection to {host}:{port} (version: {quic_version})" + ) + + # Create QUIC connection using aioquic's sans-IO core + quic_connection = QuicConnection(configuration=client_config) + + # Create trio-based QUIC connection wrapper + connection = QUICConnection( + quic_connection=quic_connection, + remote_addr=(host, port), + peer_id=peer_id, + local_peer_id=self._peer_id, + is_initiator=True, + maddr=maddr, + transport=self, + ) + + # Establish connection using trio + await connection.connect() + + # Store connection for management + conn_id = f"{host}:{port}:{peer_id}" + self._connections[conn_id] = connection + + # Perform libp2p handshake verification + await connection.verify_peer_identity() + + logger.info(f"Successfully dialed QUIC connection to {peer_id}") + return connection + + except Exception as e: + logger.error(f"Failed to dial QUIC connection to {maddr}: {e}") + raise QUICDialError(f"Dial failed: {e}") from e + + def create_listener( + self, handler_function: Callable[[ReadWriteCloser], None] + ) -> IListener: + """ + Create a QUIC listener. + + Args: + handler_function: Function to handle new connections + + Returns: + QUIC listener instance + + """ + if self._closed: + raise QUICListenError("Transport is closed") + + # TODO: Create QUIC Listener + # listener = QUICListener( + # transport=self, + # handler_function=handler_function, + # quic_configs=self._quic_configs, + # config=self._config, + # ) + listener = QUICListener() + + self._listeners.append(listener) + return listener + + def can_dial(self, maddr: multiaddr.Multiaddr) -> bool: + """ + Check if this transport can dial the given multiaddr. + + Args: + maddr: Multiaddr to check + + Returns: + True if this transport can dial the address + + """ + return is_quic_multiaddr(maddr) + + def protocols(self) -> list[str]: + """ + Get supported protocol identifiers. + + Returns: + List of supported protocol strings + + """ + protocols = [self.PROTOCOL_QUIC_V1] + if self._config.enable_draft29: + protocols.append(self.PROTOCOL_QUIC_DRAFT29) + return protocols + + def listen_order(self) -> int: + """ + Get the listen order priority for this transport. + Matches go-libp2p's ListenOrder = 1 for QUIC. + + Returns: + Priority order for listening (lower = higher priority) + + """ + return 1 + + async def close(self) -> None: + """Close the transport and cleanup resources.""" + if self._closed: + return + + self._closed = True + logger.info("Closing QUIC transport") + + # Close all active connections and listeners concurrently using trio nursery + async with trio.open_nursery() as nursery: + # Close all connections + for connection in self._connections.values(): + nursery.start_soon(connection.close) + + # Close all listeners + for listener in self._listeners: + nursery.start_soon(listener.close) + + self._connections.clear() + self._listeners.clear() + + logger.info("QUIC transport closed") + + def __str__(self) -> str: + """String representation of the transport.""" + return f"QUICTransport(peer_id={self._peer_id}, protocols={self.protocols()})" + + +def new_transport( + private_key: PrivateKey, config: QUICTransportConfig | None = None, **kwargs +) -> QUICTransport: + """ + Factory function to create a new QUIC transport. + Follows the naming convention from go-libp2p (NewTransport). + + Args: + private_key: libp2p private key + config: Transport configuration + **kwargs: Additional configuration options + + Returns: + New QUIC transport instance + + """ + if config is None: + config = QUICTransportConfig(**kwargs) + + return QUICTransport(private_key, config) + + +# Type aliases for consistency with go-libp2p +NewTransport = new_transport # go-libp2p style naming diff --git a/tests/core/transport/quic/test_transport.py b/tests/core/transport/quic/test_transport.py new file mode 100644 index 00000000..fd5e8e88 --- /dev/null +++ b/tests/core/transport/quic/test_transport.py @@ -0,0 +1,103 @@ +from unittest.mock import ( + Mock, +) + +import pytest + +from libp2p.crypto.ed25519 import ( + create_new_key_pair, +) +from libp2p.transport.quic.exceptions import ( + QUICDialError, + QUICListenError, +) +from libp2p.transport.quic.transport import ( + QUICTransport, + QUICTransportConfig, +) + + +class TestQUICTransport: + """Test suite for QUIC transport using trio.""" + + @pytest.fixture + def private_key(self): + """Generate test private key.""" + return create_new_key_pair() + + @pytest.fixture + def transport_config(self): + """Generate test transport configuration.""" + return QUICTransportConfig( + idle_timeout=10.0, enable_draft29=True, enable_v1=True + ) + + @pytest.fixture + def transport(self, private_key, transport_config): + """Create test transport instance.""" + return QUICTransport(private_key, transport_config) + + def test_transport_initialization(self, transport): + """Test transport initialization.""" + assert transport._private_key is not None + assert transport._peer_id is not None + assert not transport._closed + assert len(transport._quic_configs) >= 1 + + def test_supported_protocols(self, transport): + """Test supported protocol identifiers.""" + protocols = transport.protocols() + assert "/quic-v1" in protocols + assert "/quic" in protocols # draft-29 + + def test_can_dial_quic_addresses(self, transport): + """Test multiaddr compatibility checking.""" + import multiaddr + + # Valid QUIC addresses + valid_addrs = [ + multiaddr.Multiaddr("/ip4/127.0.0.1/udp/4001/quic-v1"), + multiaddr.Multiaddr("/ip4/192.168.1.1/udp/8080/quic"), + multiaddr.Multiaddr("/ip6/::1/udp/4001/quic-v1"), + ] + + for addr in valid_addrs: + assert transport.can_dial(addr) + + # Invalid addresses + invalid_addrs = [ + multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/4001"), + multiaddr.Multiaddr("/ip4/127.0.0.1/udp/4001"), + multiaddr.Multiaddr("/ip4/127.0.0.1/udp/4001/ws"), + ] + + for addr in invalid_addrs: + assert not transport.can_dial(addr) + + @pytest.mark.trio + async def test_transport_lifecycle(self, transport): + """Test transport lifecycle management using trio.""" + assert not transport._closed + + await transport.close() + assert transport._closed + + # Should be safe to close multiple times + await transport.close() + + @pytest.mark.trio + async def test_dial_closed_transport(self, transport): + """Test dialing with closed transport raises error.""" + import multiaddr + + await transport.close() + + with pytest.raises(QUICDialError, match="Transport is closed"): + await transport.dial(multiaddr.Multiaddr("/ip4/127.0.0.1/udp/4001/quic-v1")) + + def test_create_listener_closed_transport(self, transport): + """Test creating listener with closed transport raises error.""" + transport._closed = True + + with pytest.raises(QUICListenError, match="Transport is closed"): + transport.create_listener(Mock())