diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index aaa24239..17275d39 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -246,10 +246,6 @@ class Swarm(Service, INetworkService): logger.debug("attempting to open a stream to peer %s", peer_id) swarm_conn = await self.dial_peer(peer_id) - dd = "Yes" if swarm_conn is None else "No" - - print(f"Is swarm conn None: {dd}") - net_stream = await swarm_conn.new_stream() logger.debug("successfully opened a stream to peer %s", peer_id) return net_stream @@ -283,18 +279,24 @@ class Swarm(Service, INetworkService): async def conn_handler( read_write_closer: ReadWriteCloser, maddr: Multiaddr = maddr ) -> None: - raw_conn = RawConnection(read_write_closer, False) - # No need to upgrade QUIC Connection if isinstance(self.transport, QUICTransport): - quic_conn = cast(QUICConnection, raw_conn) - await self.add_conn(quic_conn) - # NOTE: This is a intentional barrier to prevent from the handler - # exiting and closing the connection. - await self.manager.wait_finished() - print("Connection Connected") + try: + quic_conn = cast(QUICConnection, read_write_closer) + await self.add_conn(quic_conn) + peer_id = quic_conn.peer_id + logger.debug( + f"successfully opened connection to peer {peer_id}" + ) + # NOTE: This is a intentional barrier to prevent from the + # handler exiting and closing the connection. + await self.manager.wait_finished() + except Exception: + await read_write_closer.close() return + raw_conn = RawConnection(read_write_closer, False) + # Per, https://discuss.libp2p.io/t/multistream-security/130, we first # secure the conn and then mux the conn try: @@ -410,9 +412,10 @@ class Swarm(Service, INetworkService): muxed_conn, self, ) - print("add_conn called") + logger.debug("Swarm::add_conn | starting muxed connection") self.manager.run_task(muxed_conn.start) await muxed_conn.event_started.wait() + logger.debug("Swarm::add_conn | starting swarm connection") self.manager.run_task(swarm_conn.start) await swarm_conn.event_started.wait() # Store muxed_conn with peer id diff --git a/libp2p/transport/quic/connection.py b/libp2p/transport/quic/connection.py index 2e82ba1a..ccba3c3d 100644 --- a/libp2p/transport/quic/connection.py +++ b/libp2p/transport/quic/connection.py @@ -728,51 +728,47 @@ class QUICConnection(IRawConnection, IMuxedConn): async def accept_stream(self, timeout: float | None = None) -> QUICStream: """ - Accept an incoming stream with timeout support. + Accept incoming stream. Args: - timeout: Optional timeout for accepting streams - - Returns: - Accepted incoming stream - - Raises: - QUICStreamTimeoutError: Accept timeout exceeded - QUICConnectionClosedError: Connection is closed + timeout: Optional timeout. If None, waits indefinitely. """ if self._closed: raise QUICConnectionClosedError("Connection is closed") - timeout = timeout or self.STREAM_ACCEPT_TIMEOUT - - with trio.move_on_after(timeout): - while True: - if self._closed: - raise MuxedConnUnavailable("QUIC connection is closed") - - async with self._accept_queue_lock: - if self._stream_accept_queue: - stream = self._stream_accept_queue.pop(0) - logger.debug(f"Accepted inbound stream {stream.stream_id}") - return stream - - if self._closed: - raise MuxedConnUnavailable( - "Connection closed while accepting stream" - ) - - # Wait for new streams - await self._stream_accept_event.wait() - - logger.error( - "Timeout occured while accepting stream for local peer " - f"{self._local_peer_id.to_string()} on QUIC connection" - ) - if self._closed_event.is_set() or self._closed: - raise MuxedConnUnavailable("QUIC connection closed during timeout") + if timeout is not None: + with trio.move_on_after(timeout): + return await self._accept_stream_impl() + # Timeout occurred + if self._closed_event.is_set() or self._closed: + raise MuxedConnUnavailable("QUIC connection closed during timeout") + else: + raise QUICStreamTimeoutError( + f"Stream accept timed out after {timeout}s" + ) else: - raise QUICStreamTimeoutError(f"Stream accept timed out after {timeout}s") + # No timeout - wait indefinitely + return await self._accept_stream_impl() + + async def _accept_stream_impl(self) -> QUICStream: + while True: + if self._closed: + raise MuxedConnUnavailable("QUIC connection is closed") + + async with self._accept_queue_lock: + if self._stream_accept_queue: + stream = self._stream_accept_queue.pop(0) + logger.debug(f"Accepted inbound stream {stream.stream_id}") + return stream + + if self._closed: + raise MuxedConnUnavailable("Connection closed while accepting stream") + + # Wait for new streams indefinitely + await self._stream_accept_event.wait() + + raise QUICConnectionError("Error occurred while waiting to accept stream") def set_stream_handler(self, handler_function: TQUICStreamHandlerFn) -> None: """ diff --git a/libp2p/transport/quic/listener.py b/libp2p/transport/quic/listener.py index 466f4b6d..fd7cc0f1 100644 --- a/libp2p/transport/quic/listener.py +++ b/libp2p/transport/quic/listener.py @@ -744,10 +744,6 @@ class QUICListener(IListener): f"Started background tasks for connection {dest_cid.hex()}" ) - if self._transport._swarm: - await self._transport._swarm.add_conn(connection) - logger.debug(f"Successfully added connection {dest_cid.hex()} to swarm") - try: logger.debug(f"Invoking user callback {dest_cid.hex()}") await self._handler(connection) diff --git a/libp2p/transport/quic/stream.py b/libp2p/transport/quic/stream.py index 9d534e96..46aabc30 100644 --- a/libp2p/transport/quic/stream.py +++ b/libp2p/transport/quic/stream.py @@ -625,7 +625,7 @@ class QUICStream(IMuxedStream): exc_tb: TracebackType | None, ) -> None: """Exit the async context manager and close the stream.""" - print("Exiting the context and closing the stream") + logger.debug("Exiting the context and closing the stream") await self.close() def set_deadline(self, ttl: int) -> bool: