mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
fix: changes to opening new stream, setting quic connection parameters
1. Do not dial to open a new stream, use existing swarm connection in quic transport to open new stream 2. Derive values from quic config for quic stream configuration 3. Set quic-v1 config only if enabled
This commit is contained in:
@ -245,6 +245,13 @@ class Swarm(Service, INetworkService):
|
||||
"""
|
||||
logger.debug("attempting to open a stream to peer %s", peer_id)
|
||||
|
||||
if (
|
||||
isinstance(self.transport, QUICTransport)
|
||||
and self.connections[peer_id] is not None
|
||||
):
|
||||
conn = cast(SwarmConn, self.connections[peer_id])
|
||||
return await conn.new_stream()
|
||||
|
||||
swarm_conn = await self.dial_peer(peer_id)
|
||||
net_stream = await swarm_conn.new_stream()
|
||||
logger.debug("successfully opened a stream to peer %s", peer_id)
|
||||
@ -286,7 +293,7 @@ class Swarm(Service, INetworkService):
|
||||
await self.add_conn(quic_conn)
|
||||
peer_id = quic_conn.peer_id
|
||||
logger.debug(
|
||||
f"successfully opened connection to peer {peer_id}"
|
||||
f"successfully opened quic connection to peer {peer_id}"
|
||||
)
|
||||
# NOTE: This is a intentional barrier to prevent from the
|
||||
# handler exiting and closing the connection.
|
||||
|
||||
@ -86,12 +86,6 @@ class QUICStream(IMuxedStream):
|
||||
- Implements proper stream lifecycle management
|
||||
"""
|
||||
|
||||
# Configuration constants based on research
|
||||
DEFAULT_READ_TIMEOUT = 30.0 # 30 seconds
|
||||
DEFAULT_WRITE_TIMEOUT = 30.0 # 30 seconds
|
||||
FLOW_CONTROL_WINDOW_SIZE = 512 * 1024 # 512KB per stream
|
||||
MAX_RECEIVE_BUFFER_SIZE = 1024 * 1024 # 1MB max buffering
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
connection: "QUICConnection",
|
||||
@ -144,6 +138,17 @@ class QUICStream(IMuxedStream):
|
||||
|
||||
# Resource accounting
|
||||
self._memory_reserved = 0
|
||||
|
||||
# Stream constant configurations
|
||||
self.READ_TIMEOUT = connection._transport._config.STREAM_READ_TIMEOUT
|
||||
self.WRITE_TIMEOUT = connection._transport._config.STREAM_WRITE_TIMEOUT
|
||||
self.FLOW_CONTROL_WINDOW_SIZE = (
|
||||
connection._transport._config.STREAM_FLOW_CONTROL_WINDOW
|
||||
)
|
||||
self.MAX_RECEIVE_BUFFER_SIZE = (
|
||||
connection._transport._config.MAX_STREAM_RECEIVE_BUFFER
|
||||
)
|
||||
|
||||
if self._resource_scope:
|
||||
self._reserve_memory(self.FLOW_CONTROL_WINDOW_SIZE)
|
||||
|
||||
@ -226,7 +231,7 @@ class QUICStream(IMuxedStream):
|
||||
return b""
|
||||
|
||||
# Wait for data with timeout
|
||||
timeout = self.DEFAULT_READ_TIMEOUT
|
||||
timeout = self.READ_TIMEOUT
|
||||
try:
|
||||
with trio.move_on_after(timeout) as cancel_scope:
|
||||
while True:
|
||||
|
||||
@ -114,12 +114,14 @@ class QUICTransport(ITransport):
|
||||
|
||||
self._swarm: Swarm | None = None
|
||||
|
||||
print(f"Initialized QUIC transport with security for peer {self._peer_id}")
|
||||
logger.debug(
|
||||
f"Initialized QUIC transport with security for peer {self._peer_id}"
|
||||
)
|
||||
|
||||
def set_background_nursery(self, nursery: trio.Nursery) -> None:
|
||||
"""Set the nursery to use for background tasks (called by swarm)."""
|
||||
self._background_nursery = nursery
|
||||
print("Transport background nursery set")
|
||||
logger.debug("Transport background nursery set")
|
||||
|
||||
def set_swarm(self, swarm: Swarm) -> None:
|
||||
"""Set the swarm for adding incoming connections."""
|
||||
@ -155,27 +157,28 @@ class QUICTransport(ITransport):
|
||||
self._apply_tls_configuration(base_client_config, client_tls_config)
|
||||
|
||||
# QUIC v1 (RFC 9000) configurations
|
||||
quic_v1_server_config = create_server_config_from_base(
|
||||
base_server_config, self._security_manager, self._config
|
||||
)
|
||||
quic_v1_server_config.supported_versions = [
|
||||
quic_version_to_wire_format(QUIC_V1_PROTOCOL)
|
||||
]
|
||||
if self._config.enable_v1:
|
||||
quic_v1_server_config = create_server_config_from_base(
|
||||
base_server_config, self._security_manager, self._config
|
||||
)
|
||||
quic_v1_server_config.supported_versions = [
|
||||
quic_version_to_wire_format(QUIC_V1_PROTOCOL)
|
||||
]
|
||||
|
||||
quic_v1_client_config = create_client_config_from_base(
|
||||
base_client_config, self._security_manager, self._config
|
||||
)
|
||||
quic_v1_client_config.supported_versions = [
|
||||
quic_version_to_wire_format(QUIC_V1_PROTOCOL)
|
||||
]
|
||||
quic_v1_client_config = create_client_config_from_base(
|
||||
base_client_config, self._security_manager, self._config
|
||||
)
|
||||
quic_v1_client_config.supported_versions = [
|
||||
quic_version_to_wire_format(QUIC_V1_PROTOCOL)
|
||||
]
|
||||
|
||||
# Store both server and client configs for v1
|
||||
self._quic_configs[TProtocol(f"{QUIC_V1_PROTOCOL}_server")] = (
|
||||
quic_v1_server_config
|
||||
)
|
||||
self._quic_configs[TProtocol(f"{QUIC_V1_PROTOCOL}_client")] = (
|
||||
quic_v1_client_config
|
||||
)
|
||||
# Store both server and client configs for v1
|
||||
self._quic_configs[TProtocol(f"{QUIC_V1_PROTOCOL}_server")] = (
|
||||
quic_v1_server_config
|
||||
)
|
||||
self._quic_configs[TProtocol(f"{QUIC_V1_PROTOCOL}_client")] = (
|
||||
quic_v1_client_config
|
||||
)
|
||||
|
||||
# QUIC draft-29 configurations for compatibility
|
||||
if self._config.enable_draft29:
|
||||
@ -196,7 +199,7 @@ class QUICTransport(ITransport):
|
||||
draft29_client_config
|
||||
)
|
||||
|
||||
print("QUIC configurations initialized with libp2p TLS security")
|
||||
logger.debug("QUIC configurations initialized with libp2p TLS security")
|
||||
|
||||
except Exception as e:
|
||||
raise QUICSecurityError(
|
||||
@ -221,7 +224,7 @@ class QUICTransport(ITransport):
|
||||
config.alpn_protocols = tls_config.alpn_protocols
|
||||
config.verify_mode = ssl.CERT_NONE
|
||||
|
||||
print("Successfully applied TLS configuration to QUIC config")
|
||||
logger.debug("Successfully applied TLS configuration to QUIC config")
|
||||
|
||||
except Exception as e:
|
||||
raise QUICSecurityError(f"Failed to apply TLS configuration: {e}") from e
|
||||
@ -267,7 +270,7 @@ class QUICTransport(ITransport):
|
||||
|
||||
# Get appropriate QUIC client configuration
|
||||
config_key = TProtocol(f"{quic_version}_client")
|
||||
print("config_key", config_key, self._quic_configs.keys())
|
||||
logger.debug("config_key", config_key, self._quic_configs.keys())
|
||||
config = self._quic_configs.get(config_key)
|
||||
if not config:
|
||||
raise QUICDialError(f"Unsupported QUIC version: {quic_version}")
|
||||
@ -303,7 +306,7 @@ class QUICTransport(ITransport):
|
||||
transport=self,
|
||||
security_manager=self._security_manager,
|
||||
)
|
||||
print("QUIC Connection Created")
|
||||
logger.debug("QUIC Connection Created")
|
||||
|
||||
if self._background_nursery is None:
|
||||
logger.error("No nursery set to execute background tasks")
|
||||
@ -353,8 +356,8 @@ class QUICTransport(ITransport):
|
||||
f"{expected_peer_id}, got {verified_peer_id}"
|
||||
)
|
||||
|
||||
print(f"Peer identity verified: {verified_peer_id}")
|
||||
print(f"Peer identity verified: {verified_peer_id}")
|
||||
logger.debug(f"Peer identity verified: {verified_peer_id}")
|
||||
logger.debug(f"Peer identity verified: {verified_peer_id}")
|
||||
|
||||
except Exception as e:
|
||||
raise QUICSecurityError(f"Peer identity verification failed: {e}") from e
|
||||
@ -392,7 +395,7 @@ class QUICTransport(ITransport):
|
||||
)
|
||||
|
||||
self._listeners.append(listener)
|
||||
print("Created QUIC listener with security")
|
||||
logger.debug("Created QUIC listener with security")
|
||||
return listener
|
||||
|
||||
def can_dial(self, maddr: multiaddr.Multiaddr) -> bool:
|
||||
@ -438,7 +441,7 @@ class QUICTransport(ITransport):
|
||||
return
|
||||
|
||||
self._closed = True
|
||||
print("Closing QUIC transport")
|
||||
logger.debug("Closing QUIC transport")
|
||||
|
||||
# Close all active connections and listeners concurrently using trio nursery
|
||||
async with trio.open_nursery() as nursery:
|
||||
@ -453,7 +456,7 @@ class QUICTransport(ITransport):
|
||||
self._connections.clear()
|
||||
self._listeners.clear()
|
||||
|
||||
print("QUIC transport closed")
|
||||
logger.debug("QUIC transport closed")
|
||||
|
||||
async def _cleanup_terminated_connection(self, connection: QUICConnection) -> None:
|
||||
"""Clean up a terminated connection from all listeners."""
|
||||
|
||||
Reference in New Issue
Block a user