diff --git a/examples/echo/echo_quic.py b/examples/echo/echo_quic.py index ad1ce3ca..cdead8dd 100644 --- a/examples/echo/echo_quic.py +++ b/examples/echo/echo_quic.py @@ -55,7 +55,7 @@ async def run_server(port: int, seed: int | None = None) -> None: # QUIC transport configuration quic_config = QUICTransportConfig( idle_timeout=30.0, - max_concurrent_streams=1000, + max_concurrent_streams=100, connection_timeout=10.0, enable_draft29=False, ) @@ -68,16 +68,21 @@ async def run_server(port: int, seed: int | None = None) -> None: # Server mode: start listener async with host.run(listen_addrs=[listen_addr]): - print(f"I am {host.get_id().to_string()}") - host.set_stream_handler(PROTOCOL_ID, _echo_stream_handler) + try: + print(f"I am {host.get_id().to_string()}") + host.set_stream_handler(PROTOCOL_ID, _echo_stream_handler) - print( - "Run this from the same folder in another console:\n\n" - f"python3 ./examples/echo/echo_quic.py " - f"-d {host.get_addrs()[0]}\n" - ) - print("Waiting for incoming QUIC connections...") - await trio.sleep_forever() + print( + "Run this from the same folder in another console:\n\n" + f"python3 ./examples/echo/echo_quic.py " + f"-d {host.get_addrs()[0]}\n" + ) + print("Waiting for incoming QUIC connections...") + await trio.sleep_forever() + except KeyboardInterrupt: + print("Closing server gracefully...") + await host.close() + return async def run_client(destination: str, seed: int | None = None) -> None: @@ -96,7 +101,7 @@ async def run_client(destination: str, seed: int | None = None) -> None: # QUIC transport configuration quic_config = QUICTransportConfig( idle_timeout=30.0, - max_concurrent_streams=1000, + max_concurrent_streams=100, connection_timeout=10.0, enable_draft29=False, ) diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index a0311bd8..e32c48ac 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -299,9 +299,7 @@ class BasicHost(IHost): ) except MultiselectError as error: peer_id = net_stream.muxed_conn.peer_id - logger.debug( - "failed to accept a stream from peer %s, error=%s", peer_id, error - ) + print("failed to accept a stream from peer %s, error=%s", peer_id, error) await net_stream.reset() return if protocol is None: diff --git a/libp2p/protocol_muxer/multiselect_communicator.py b/libp2p/protocol_muxer/multiselect_communicator.py index 98a8129c..dff5b339 100644 --- a/libp2p/protocol_muxer/multiselect_communicator.py +++ b/libp2p/protocol_muxer/multiselect_communicator.py @@ -1,3 +1,5 @@ +from builtins import AssertionError + from libp2p.abc import ( IMultiselectCommunicator, ) @@ -36,7 +38,8 @@ class MultiselectCommunicator(IMultiselectCommunicator): msg_bytes = encode_delim(msg_str.encode()) try: await self.read_writer.write(msg_bytes) - except IOException as error: + # Handle for connection close during ongoing negotiation in QUIC + except (IOException, AssertionError, ValueError) as error: raise MultiselectCommunicatorError( "fail to write to multiselect communicator" ) from error diff --git a/libp2p/transport/quic/config.py b/libp2p/transport/quic/config.py index 00f1907b..80b4bdb1 100644 --- a/libp2p/transport/quic/config.py +++ b/libp2p/transport/quic/config.py @@ -1,3 +1,5 @@ +from typing import Literal + """ Configuration classes for QUIC transport. """ @@ -64,7 +66,7 @@ class QUICTransportConfig: alpn_protocols: list[str] = field(default_factory=lambda: ["libp2p"]) # Performance settings - max_concurrent_streams: int = 1000 # Maximum concurrent streams per connection + max_concurrent_streams: int = 100 # Maximum concurrent streams per connection connection_window: int = 1024 * 1024 # Connection flow control window stream_window: int = 64 * 1024 # Stream flow control window @@ -299,10 +301,11 @@ class QUICStreamMetricsConfig: self.metrics_aggregation_interval = metrics_aggregation_interval -# Factory function for creating optimized configurations - - -def create_stream_config_for_use_case(use_case: str) -> QUICTransportConfig: +def create_stream_config_for_use_case( + use_case: Literal[ + "high_throughput", "low_latency", "many_streams", "memory_constrained" + ], +) -> QUICTransportConfig: """ Create optimized stream configuration for specific use cases. diff --git a/libp2p/transport/quic/connection.py b/libp2p/transport/quic/connection.py index 1e5299db..a0790934 100644 --- a/libp2p/transport/quic/connection.py +++ b/libp2p/transport/quic/connection.py @@ -19,6 +19,7 @@ import trio from libp2p.abc import IMuxedConn, IRawConnection from libp2p.custom_types import TQUICStreamHandlerFn from libp2p.peer.id import ID +from libp2p.stream_muxer.exceptions import MuxedConnUnavailable from .exceptions import ( QUICConnectionClosedError, @@ -64,8 +65,7 @@ class QUICConnection(IRawConnection, IMuxedConn): - COMPLETE connection ID management (fixes the original issue) """ - # Configuration constants based on research - MAX_CONCURRENT_STREAMS = 1000 + MAX_CONCURRENT_STREAMS = 100 MAX_INCOMING_STREAMS = 1000 MAX_OUTGOING_STREAMS = 1000 STREAM_ACCEPT_TIMEOUT = 30.0 @@ -76,7 +76,7 @@ class QUICConnection(IRawConnection, IMuxedConn): self, quic_connection: QuicConnection, remote_addr: tuple[str, int], - peer_id: ID, + remote_peer_id: ID | None, local_peer_id: ID, is_initiator: bool, maddr: multiaddr.Multiaddr, @@ -91,7 +91,7 @@ class QUICConnection(IRawConnection, IMuxedConn): Args: quic_connection: aioquic QuicConnection instance remote_addr: Remote peer address - peer_id: Remote peer ID (may be None initially) + remote_peer_id: Remote peer ID (may be None initially) local_peer_id: Local peer ID is_initiator: Whether this is the connection initiator maddr: Multiaddr for this connection @@ -103,8 +103,9 @@ class QUICConnection(IRawConnection, IMuxedConn): """ self._quic = quic_connection self._remote_addr = remote_addr - self.peer_id = peer_id + self._remote_peer_id = remote_peer_id self._local_peer_id = local_peer_id + self.peer_id = remote_peer_id or local_peer_id self.__is_initiator = is_initiator self._maddr = maddr self._transport = transport @@ -134,7 +135,7 @@ class QUICConnection(IRawConnection, IMuxedConn): self._accept_queue_lock = trio.Lock() # Connection state - self._closed = False + self._closed: bool = False self._established = False self._started = False self._handshake_completed = False @@ -179,7 +180,7 @@ class QUICConnection(IRawConnection, IMuxedConn): } logger.debug( - f"Created QUIC connection to {peer_id} " + f"Created QUIC connection to {remote_peer_id} " f"(initiator: {is_initiator}, addr: {remote_addr}, " "security: {security_manager is not None})" ) @@ -238,7 +239,7 @@ class QUICConnection(IRawConnection, IMuxedConn): def remote_peer_id(self) -> ID | None: """Get the remote peer ID.""" - return self.peer_id + return self._remote_peer_id # *** NEW: Connection ID management methods *** def get_connection_id_stats(self) -> dict[str, Any]: @@ -277,7 +278,7 @@ class QUICConnection(IRawConnection, IMuxedConn): self._started = True self.event_started.set() - logger.debug(f"Starting QUIC connection to {self.peer_id}") + logger.debug(f"Starting QUIC connection to {self._remote_peer_id}") try: # If this is a client connection, we need to establish the connection @@ -288,7 +289,7 @@ class QUICConnection(IRawConnection, IMuxedConn): self._established = True self._connected_event.set() - logger.debug(f"QUIC connection to {self.peer_id} started") + logger.debug(f"QUIC connection to {self._remote_peer_id} started") except Exception as e: logger.error(f"Failed to start connection: {e}") @@ -360,7 +361,7 @@ class QUICConnection(IRawConnection, IMuxedConn): await self._verify_peer_identity_with_security() self._established = True - logger.info(f"QUIC connection established with {self.peer_id}") + logger.info(f"QUIC connection established with {self._remote_peer_id}") except Exception as e: logger.error(f"Failed to establish connection: {e}") @@ -495,16 +496,16 @@ class QUICConnection(IRawConnection, IMuxedConn): # Verify peer identity using security manager verified_peer_id = self._security_manager.verify_peer_identity( self._peer_certificate, - self.peer_id, # Expected peer ID for outbound connections + self._remote_peer_id, # Expected peer ID for outbound connections ) # Update peer ID if it wasn't known (inbound connections) - if not self.peer_id: - self.peer_id = verified_peer_id + if not self._remote_peer_id: + self._remote_peer_id = verified_peer_id logger.info(f"Discovered peer ID from certificate: {verified_peer_id}") - elif self.peer_id != verified_peer_id: + elif self._remote_peer_id != verified_peer_id: raise QUICPeerVerificationError( - f"Peer ID mismatch: expected {self.peer_id}, got {verified_peer_id}" + f"Peer ID mismatch: expected {self._remote_peer_id}, got {verified_peer_id}" ) self._peer_verified = True @@ -608,7 +609,7 @@ class QUICConnection(IRawConnection, IMuxedConn): info: dict[str, bool | Any | None] = { "peer_verified": self._peer_verified, "handshake_complete": self._handshake_completed, - "peer_id": str(self.peer_id) if self.peer_id else None, + "peer_id": str(self._remote_peer_id) if self._remote_peer_id else None, "local_peer_id": str(self._local_peer_id), "is_initiator": self.__is_initiator, "has_certificate": self._peer_certificate is not None, @@ -742,6 +743,9 @@ class QUICConnection(IRawConnection, IMuxedConn): with trio.move_on_after(timeout): while True: + if self._closed: + raise MuxedConnUnavailable("QUIC connection is closed") + async with self._accept_queue_lock: if self._stream_accept_queue: stream = self._stream_accept_queue.pop(0) @@ -749,15 +753,20 @@ class QUICConnection(IRawConnection, IMuxedConn): return stream if self._closed: - raise QUICConnectionClosedError( + raise MuxedConnUnavailable( "Connection closed while accepting stream" ) # Wait for new streams await self._stream_accept_event.wait() - self._stream_accept_event = trio.Event() - raise QUICStreamTimeoutError(f"Stream accept timed out after {timeout}s") + print( + f"{id(self)} ACCEPT STREAM TIMEOUT: CONNECTION STATE {self._closed_event.is_set() or self._closed}" + ) + if self._closed_event.is_set() or self._closed: + raise MuxedConnUnavailable("QUIC connection closed during timeout") + else: + raise QUICStreamTimeoutError(f"Stream accept timed out after {timeout}s") def set_stream_handler(self, handler_function: TQUICStreamHandlerFn) -> None: """ @@ -979,6 +988,11 @@ class QUICConnection(IRawConnection, IMuxedConn): self._closed = True self._closed_event.set() + self._stream_accept_event.set() + print(f"✅ TERMINATION: Woke up pending accept_stream() calls, {id(self)}") + + await self._notify_parent_of_termination() + async def _handle_stream_data(self, event: events.StreamDataReceived) -> None: """Handle stream data events - create streams and add to accept queue.""" stream_id = event.stream_id @@ -1191,7 +1205,7 @@ class QUICConnection(IRawConnection, IMuxedConn): return self._closed = True - logger.debug(f"Closing QUIC connection to {self.peer_id}") + logger.debug(f"Closing QUIC connection to {self._remote_peer_id}") try: # Close all streams gracefully @@ -1233,11 +1247,62 @@ class QUICConnection(IRawConnection, IMuxedConn): self._streams.clear() self._closed_event.set() - logger.debug(f"QUIC connection to {self.peer_id} closed") + logger.debug(f"QUIC connection to {self._remote_peer_id} closed") except Exception as e: logger.error(f"Error during connection close: {e}") + async def _notify_parent_of_termination(self) -> None: + """ + Notify the parent listener/transport to remove this connection from tracking. + + This ensures that terminated connections are cleaned up from the + 'established connections' list. + """ + try: + if self._transport: + await self._transport._cleanup_terminated_connection(self) + logger.debug("Notified transport of connection termination") + return + + for listener in self._transport._listeners: + try: + await listener._remove_connection_by_object(self) + logger.debug( + "Found and notified listener of connection termination" + ) + return + except Exception: + continue + + # Method 4: Use connection ID if we have one (most reliable) + if self._current_connection_id: + await self._cleanup_by_connection_id(self._current_connection_id) + return + + logger.warning( + "Could not notify parent of connection termination - no parent reference found" + ) + + except Exception as e: + logger.error(f"Error notifying parent of connection termination: {e}") + + async def _cleanup_by_connection_id(self, connection_id: bytes) -> None: + """Cleanup using connection ID as a fallback method.""" + try: + for listener in self._transport._listeners: + for tracked_cid, tracked_conn in list(listener._connections.items()): + if tracked_conn is self: + await listener._remove_connection(tracked_cid) + logger.debug( + f"Removed connection {tracked_cid.hex()} by object reference" + ) + return + + logger.debug("Fallback cleanup by connection ID completed") + except Exception as e: + logger.error(f"Error in fallback cleanup: {e}") + # IRawConnection interface (for compatibility) def get_remote_address(self) -> tuple[str, int]: @@ -1333,7 +1398,7 @@ class QUICConnection(IRawConnection, IMuxedConn): def __repr__(self) -> str: return ( - f"QUICConnection(peer={self.peer_id}, " + f"QUICConnection(peer={self._remote_peer_id}, " f"addr={self._remote_addr}, " f"initiator={self.__is_initiator}, " f"verified={self._peer_verified}, " @@ -1343,4 +1408,4 @@ class QUICConnection(IRawConnection, IMuxedConn): ) def __str__(self) -> str: - return f"QUICConnection({self.peer_id})" + return f"QUICConnection({self._remote_peer_id})" diff --git a/libp2p/transport/quic/listener.py b/libp2p/transport/quic/listener.py index ef48e928..7c687dc2 100644 --- a/libp2p/transport/quic/listener.py +++ b/libp2p/transport/quic/listener.py @@ -880,42 +880,49 @@ class QUICListener(IListener): async def _promote_pending_connection( self, quic_conn: QuicConnection, addr: tuple[str, int], dest_cid: bytes - ) -> None: - """Promote a pending connection to an established connection.""" + ): + """Promote pending connection - avoid duplicate creation.""" try: # Remove from pending connections self._pending_connections.pop(dest_cid, None) - # Create multiaddr for this connection - host, port = addr - quic_version = "quic" - remote_maddr = create_quic_multiaddr(host, port, f"/{quic_version}") + # CHECK: Does QUICConnection already exist? + if dest_cid in self._connections: + connection = self._connections[dest_cid] + print( + f"🔄 PROMOTION: Using existing QUICConnection {id(connection)} for {dest_cid.hex()}" + ) + else: + from .connection import QUICConnection - from .connection import QUICConnection + host, port = addr + quic_version = "quic" + remote_maddr = create_quic_multiaddr(host, port, f"/{quic_version}") - connection = QUICConnection( - quic_connection=quic_conn, - remote_addr=addr, - peer_id=None, - local_peer_id=self._transport._peer_id, - is_initiator=False, - maddr=remote_maddr, - transport=self._transport, - security_manager=self._security_manager, - listener_socket=self._socket, - ) + connection = QUICConnection( + quic_connection=quic_conn, + remote_addr=addr, + remote_peer_id=None, + local_peer_id=self._transport._peer_id, + is_initiator=False, + maddr=remote_maddr, + transport=self._transport, + security_manager=self._security_manager, + listener_socket=self._socket, + ) - print( - f"🔧 PROMOTION: Created connection with socket: {self._socket is not None}" - ) - print( - f"🔧 PROMOTION: Socket type: {type(self._socket) if self._socket else 'None'}" - ) + print( + f"🔄 PROMOTION: Created NEW QUICConnection {id(connection)} for {dest_cid.hex()}" + ) - self._connections[dest_cid] = connection + # Store the connection + self._connections[dest_cid] = connection + + # Update mappings self._addr_to_cid[addr] = dest_cid self._cid_to_addr[dest_cid] = addr + # Rest of the existing promotion code... if self._nursery: await connection.connect(self._nursery) @@ -932,10 +939,11 @@ class QUICListener(IListener): await connection.close() return - # Call the connection handler - if self._nursery: - self._nursery.start_soon( - self._handle_new_established_connection, connection + if self._transport._swarm: + print(f"🔄 PROMOTION: Adding connection {id(connection)} to swarm") + await self._transport._swarm.add_conn(connection) + print( + f"🔄 PROMOTION: Successfully added connection {id(connection)} to swarm" ) self._stats["connections_accepted"] += 1 @@ -946,7 +954,6 @@ class QUICListener(IListener): except Exception as e: logger.error(f"❌ Error promoting connection {dest_cid.hex()}: {e}") await self._remove_connection(dest_cid) - self._stats["connections_rejected"] += 1 async def _remove_connection(self, dest_cid: bytes) -> None: """Remove connection by connection ID.""" @@ -1220,6 +1227,32 @@ class QUICListener(IListener): except Exception as e: logger.error(f"Error closing listener: {e}") + async def _remove_connection_by_object(self, connection_obj) -> None: + """Remove a connection by object reference (called when connection terminates).""" + try: + # Find the connection ID for this object + connection_cid = None + for cid, tracked_connection in self._connections.items(): + if tracked_connection is connection_obj: + connection_cid = cid + break + + if connection_cid: + await self._remove_connection(connection_cid) + logger.debug( + f"✅ TERMINATION: Removed connection {connection_cid.hex()} by object reference" + ) + print( + f"✅ TERMINATION: Removed connection {connection_cid.hex()} by object reference" + ) + else: + logger.warning("⚠️ TERMINATION: Connection object not found in tracking") + print("⚠️ TERMINATION: Connection object not found in tracking") + + except Exception as e: + logger.error(f"❌ TERMINATION: Error removing connection by object: {e}") + print(f"❌ TERMINATION: Error removing connection by object: {e}") + def get_addresses(self) -> list[Multiaddr]: """Get the bound addresses.""" return self._bound_addresses.copy() diff --git a/libp2p/transport/quic/transport.py b/libp2p/transport/quic/transport.py index 1eee6529..d4b2d5cb 100644 --- a/libp2p/transport/quic/transport.py +++ b/libp2p/transport/quic/transport.py @@ -218,13 +218,11 @@ class QUICTransport(ITransport): """ try: - # Access attributes directly from QUICTLSSecurityConfig config.certificate = tls_config.certificate config.private_key = tls_config.private_key config.certificate_chain = tls_config.certificate_chain config.alpn_protocols = tls_config.alpn_protocols - # Set verification mode (though libp2p typically doesn't verify) config.verify_mode = tls_config.verify_mode config.verify_mode = ssl.CERT_NONE @@ -285,12 +283,12 @@ class QUICTransport(ITransport): connection = QUICConnection( quic_connection=native_quic_connection, remote_addr=(host, port), - peer_id=peer_id, + remote_peer_id=peer_id, local_peer_id=self._peer_id, is_initiator=True, maddr=maddr, transport=self, - security_manager=self._security_manager, # Pass security manager + security_manager=self._security_manager, ) # Establish connection using trio @@ -389,7 +387,7 @@ class QUICTransport(ITransport): handler_function=handler_function, quic_configs=server_configs, config=self._config, - security_manager=self._security_manager, # Pass security manager + security_manager=self._security_manager, ) self._listeners.append(listener) @@ -456,6 +454,17 @@ class QUICTransport(ITransport): print("QUIC transport closed") + async def _cleanup_terminated_connection(self, connection) -> None: + """Clean up a terminated connection from all listeners.""" + try: + for listener in self._listeners: + await listener._remove_connection_by_object(connection) + logger.debug( + "✅ TRANSPORT: Cleaned up terminated connection from all listeners" + ) + except Exception as e: + logger.error(f"❌ TRANSPORT: Error cleaning up terminated connection: {e}") + def get_stats(self) -> dict[str, int | list[str] | object]: """Get transport statistics including security info.""" return { diff --git a/tests/core/transport/quic/test_connection.py b/tests/core/transport/quic/test_connection.py index 12e08138..5ee496c3 100644 --- a/tests/core/transport/quic/test_connection.py +++ b/tests/core/transport/quic/test_connection.py @@ -69,7 +69,7 @@ class TestQUICConnection: return QUICConnection( quic_connection=mock_quic_connection, remote_addr=("127.0.0.1", 4001), - peer_id=peer_id, + remote_peer_id=None, local_peer_id=peer_id, is_initiator=True, maddr=Multiaddr("/ip4/127.0.0.1/udp/4001/quic"), @@ -87,7 +87,7 @@ class TestQUICConnection: return QUICConnection( quic_connection=mock_quic_connection, remote_addr=("127.0.0.1", 4001), - peer_id=peer_id, + remote_peer_id=peer_id, local_peer_id=peer_id, is_initiator=False, maddr=Multiaddr("/ip4/127.0.0.1/udp/4001/quic"), @@ -117,7 +117,7 @@ class TestQUICConnection: client_conn = QUICConnection( quic_connection=Mock(), remote_addr=("127.0.0.1", 4001), - peer_id=None, + remote_peer_id=None, local_peer_id=Mock(), is_initiator=True, maddr=Multiaddr("/ip4/127.0.0.1/udp/4001/quic"), @@ -129,7 +129,7 @@ class TestQUICConnection: server_conn = QUICConnection( quic_connection=Mock(), remote_addr=("127.0.0.1", 4001), - peer_id=None, + remote_peer_id=None, local_peer_id=Mock(), is_initiator=False, maddr=Multiaddr("/ip4/127.0.0.1/udp/4001/quic"),