From 58433f9b52b741f021713be2ee41de48059a7d8e Mon Sep 17 00:00:00 2001 From: Akash Mondal Date: Sat, 16 Aug 2025 18:28:04 +0000 Subject: [PATCH] fix: changes to opening new stream, setting quic connection parameters 1. Do not dial to open a new stream, use existing swarm connection in quic transport to open new stream 2. Derive values from quic config for quic stream configuration 3. Set quic-v1 config only if enabled --- libp2p/network/swarm.py | 9 ++++- libp2p/transport/quic/stream.py | 19 +++++---- libp2p/transport/quic/transport.py | 63 ++++++++++++++++-------------- 3 files changed, 53 insertions(+), 38 deletions(-) diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 17275d39..a8680a83 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -245,6 +245,13 @@ class Swarm(Service, INetworkService): """ logger.debug("attempting to open a stream to peer %s", peer_id) + if ( + isinstance(self.transport, QUICTransport) + and self.connections[peer_id] is not None + ): + conn = cast(SwarmConn, self.connections[peer_id]) + return await conn.new_stream() + swarm_conn = await self.dial_peer(peer_id) net_stream = await swarm_conn.new_stream() logger.debug("successfully opened a stream to peer %s", peer_id) @@ -286,7 +293,7 @@ class Swarm(Service, INetworkService): await self.add_conn(quic_conn) peer_id = quic_conn.peer_id logger.debug( - f"successfully opened connection to peer {peer_id}" + f"successfully opened quic connection to peer {peer_id}" ) # NOTE: This is a intentional barrier to prevent from the # handler exiting and closing the connection. diff --git a/libp2p/transport/quic/stream.py b/libp2p/transport/quic/stream.py index 46aabc30..5b8d6bf9 100644 --- a/libp2p/transport/quic/stream.py +++ b/libp2p/transport/quic/stream.py @@ -86,12 +86,6 @@ class QUICStream(IMuxedStream): - Implements proper stream lifecycle management """ - # Configuration constants based on research - DEFAULT_READ_TIMEOUT = 30.0 # 30 seconds - DEFAULT_WRITE_TIMEOUT = 30.0 # 30 seconds - FLOW_CONTROL_WINDOW_SIZE = 512 * 1024 # 512KB per stream - MAX_RECEIVE_BUFFER_SIZE = 1024 * 1024 # 1MB max buffering - def __init__( self, connection: "QUICConnection", @@ -144,6 +138,17 @@ class QUICStream(IMuxedStream): # Resource accounting self._memory_reserved = 0 + + # Stream constant configurations + self.READ_TIMEOUT = connection._transport._config.STREAM_READ_TIMEOUT + self.WRITE_TIMEOUT = connection._transport._config.STREAM_WRITE_TIMEOUT + self.FLOW_CONTROL_WINDOW_SIZE = ( + connection._transport._config.STREAM_FLOW_CONTROL_WINDOW + ) + self.MAX_RECEIVE_BUFFER_SIZE = ( + connection._transport._config.MAX_STREAM_RECEIVE_BUFFER + ) + if self._resource_scope: self._reserve_memory(self.FLOW_CONTROL_WINDOW_SIZE) @@ -226,7 +231,7 @@ class QUICStream(IMuxedStream): return b"" # Wait for data with timeout - timeout = self.DEFAULT_READ_TIMEOUT + timeout = self.READ_TIMEOUT try: with trio.move_on_after(timeout) as cancel_scope: while True: diff --git a/libp2p/transport/quic/transport.py b/libp2p/transport/quic/transport.py index 5f7d99f6..210b0a7f 100644 --- a/libp2p/transport/quic/transport.py +++ b/libp2p/transport/quic/transport.py @@ -114,12 +114,14 @@ class QUICTransport(ITransport): self._swarm: Swarm | None = None - print(f"Initialized QUIC transport with security for peer {self._peer_id}") + logger.debug( + f"Initialized QUIC transport with security for peer {self._peer_id}" + ) def set_background_nursery(self, nursery: trio.Nursery) -> None: """Set the nursery to use for background tasks (called by swarm).""" self._background_nursery = nursery - print("Transport background nursery set") + logger.debug("Transport background nursery set") def set_swarm(self, swarm: Swarm) -> None: """Set the swarm for adding incoming connections.""" @@ -155,27 +157,28 @@ class QUICTransport(ITransport): self._apply_tls_configuration(base_client_config, client_tls_config) # QUIC v1 (RFC 9000) configurations - quic_v1_server_config = create_server_config_from_base( - base_server_config, self._security_manager, self._config - ) - quic_v1_server_config.supported_versions = [ - quic_version_to_wire_format(QUIC_V1_PROTOCOL) - ] + if self._config.enable_v1: + quic_v1_server_config = create_server_config_from_base( + base_server_config, self._security_manager, self._config + ) + quic_v1_server_config.supported_versions = [ + quic_version_to_wire_format(QUIC_V1_PROTOCOL) + ] - quic_v1_client_config = create_client_config_from_base( - base_client_config, self._security_manager, self._config - ) - quic_v1_client_config.supported_versions = [ - quic_version_to_wire_format(QUIC_V1_PROTOCOL) - ] + quic_v1_client_config = create_client_config_from_base( + base_client_config, self._security_manager, self._config + ) + quic_v1_client_config.supported_versions = [ + quic_version_to_wire_format(QUIC_V1_PROTOCOL) + ] - # Store both server and client configs for v1 - self._quic_configs[TProtocol(f"{QUIC_V1_PROTOCOL}_server")] = ( - quic_v1_server_config - ) - self._quic_configs[TProtocol(f"{QUIC_V1_PROTOCOL}_client")] = ( - quic_v1_client_config - ) + # Store both server and client configs for v1 + self._quic_configs[TProtocol(f"{QUIC_V1_PROTOCOL}_server")] = ( + quic_v1_server_config + ) + self._quic_configs[TProtocol(f"{QUIC_V1_PROTOCOL}_client")] = ( + quic_v1_client_config + ) # QUIC draft-29 configurations for compatibility if self._config.enable_draft29: @@ -196,7 +199,7 @@ class QUICTransport(ITransport): draft29_client_config ) - print("QUIC configurations initialized with libp2p TLS security") + logger.debug("QUIC configurations initialized with libp2p TLS security") except Exception as e: raise QUICSecurityError( @@ -221,7 +224,7 @@ class QUICTransport(ITransport): config.alpn_protocols = tls_config.alpn_protocols config.verify_mode = ssl.CERT_NONE - print("Successfully applied TLS configuration to QUIC config") + logger.debug("Successfully applied TLS configuration to QUIC config") except Exception as e: raise QUICSecurityError(f"Failed to apply TLS configuration: {e}") from e @@ -267,7 +270,7 @@ class QUICTransport(ITransport): # Get appropriate QUIC client configuration config_key = TProtocol(f"{quic_version}_client") - print("config_key", config_key, self._quic_configs.keys()) + logger.debug("config_key", config_key, self._quic_configs.keys()) config = self._quic_configs.get(config_key) if not config: raise QUICDialError(f"Unsupported QUIC version: {quic_version}") @@ -303,7 +306,7 @@ class QUICTransport(ITransport): transport=self, security_manager=self._security_manager, ) - print("QUIC Connection Created") + logger.debug("QUIC Connection Created") if self._background_nursery is None: logger.error("No nursery set to execute background tasks") @@ -353,8 +356,8 @@ class QUICTransport(ITransport): f"{expected_peer_id}, got {verified_peer_id}" ) - print(f"Peer identity verified: {verified_peer_id}") - print(f"Peer identity verified: {verified_peer_id}") + logger.debug(f"Peer identity verified: {verified_peer_id}") + logger.debug(f"Peer identity verified: {verified_peer_id}") except Exception as e: raise QUICSecurityError(f"Peer identity verification failed: {e}") from e @@ -392,7 +395,7 @@ class QUICTransport(ITransport): ) self._listeners.append(listener) - print("Created QUIC listener with security") + logger.debug("Created QUIC listener with security") return listener def can_dial(self, maddr: multiaddr.Multiaddr) -> bool: @@ -438,7 +441,7 @@ class QUICTransport(ITransport): return self._closed = True - print("Closing QUIC transport") + logger.debug("Closing QUIC transport") # Close all active connections and listeners concurrently using trio nursery async with trio.open_nursery() as nursery: @@ -453,7 +456,7 @@ class QUICTransport(ITransport): self._connections.clear() self._listeners.clear() - print("QUIC transport closed") + logger.debug("QUIC transport closed") async def _cleanup_terminated_connection(self, connection: QUICConnection) -> None: """Clean up a terminated connection from all listeners."""