diff --git a/examples/echo/echo_quic.py b/examples/echo/echo_quic.py index 68580e20..ad1ce3ca 100644 --- a/examples/echo/echo_quic.py +++ b/examples/echo/echo_quic.py @@ -125,12 +125,12 @@ async def run_client(destination: str, seed: int | None = None) -> None: msg = b"hi, there!\n" await stream.write(msg) - # Notify the other side about EOF - await stream.close() response = await stream.read() print(f"Sent: {msg.decode('utf-8')}") print(f"Got: {response.decode('utf-8')}") + await stream.close() + await host.disconnect(info.peer_id) async def run(port: int, destination: str, seed: int | None = None) -> None: diff --git a/examples/echo/test_quic.py b/examples/echo/test_quic.py index ea97bd20..ab037ae4 100644 --- a/examples/echo/test_quic.py +++ b/examples/echo/test_quic.py @@ -262,6 +262,7 @@ async def test_server_startup(): await trio.sleep(5.0) print("✅ Server test completed (timed out normally)") + nursery.cancel_scope.cancel() return True else: print("❌ Failed to bind server") @@ -347,13 +348,13 @@ async def test_full_handshake_and_certificate_exchange(): print("✅ aioquic connections instantiated correctly.") print("🔧 Client CIDs") - print(f"Local Init CID: ", client_conn._local_initial_source_connection_id.hex()) + print("Local Init CID: ", client_conn._local_initial_source_connection_id.hex()) print( - f"Remote Init CID: ", + "Remote Init CID: ", (client_conn._remote_initial_source_connection_id or b"").hex(), ) print( - f"Original Destination CID: ", + "Original Destination CID: ", client_conn.original_destination_connection_id.hex(), ) print(f"Host CID: {client_conn._host_cids[0].cid.hex()}") @@ -372,9 +373,11 @@ async def test_full_handshake_and_certificate_exchange(): while time() - start_time < max_duration_s: for datagram, _ in client_conn.datagrams_to_send(now=time()): - header = pull_quic_header(Buffer(data=datagram)) + header = pull_quic_header(Buffer(data=datagram), host_cid_length=8) print("Client packet source connection id", header.source_cid.hex()) - print("Client packet destination connection id", header.destination_cid.hex()) + print( + "Client packet destination connection id", header.destination_cid.hex() + ) print("--SERVER INJESTING CLIENT PACKET---") server_conn.receive_datagram(datagram, client_address, now=time()) @@ -382,9 +385,11 @@ async def test_full_handshake_and_certificate_exchange(): f"Server remote initial source id: {(server_conn._remote_initial_source_connection_id or b'').hex()}" ) for datagram, _ in server_conn.datagrams_to_send(now=time()): - header = pull_quic_header(Buffer(data=datagram)) + header = pull_quic_header(Buffer(data=datagram), host_cid_length=8) print("Server packet source connection id", header.source_cid.hex()) - print("Server packet destination connection id", header.destination_cid.hex()) + print( + "Server packet destination connection id", header.destination_cid.hex() + ) print("--CLIENT INJESTING SERVER PACKET---") client_conn.receive_datagram(datagram, server_address, now=time()) @@ -413,12 +418,8 @@ async def test_full_handshake_and_certificate_exchange(): ) print("✅ Client successfully received server certificate.") - assert server_peer_cert is not None, ( - "❌ Server FAILED to receive client certificate." - ) - print("✅ Server successfully received client certificate.") - print("🎉 Test Passed: Full handshake and certificate exchange successful.") + return True async def main(): diff --git a/libp2p/network/stream/net_stream.py b/libp2p/network/stream/net_stream.py index b54fdda4..528e1dc8 100644 --- a/libp2p/network/stream/net_stream.py +++ b/libp2p/network/stream/net_stream.py @@ -1,6 +1,7 @@ from enum import ( Enum, ) +import inspect import trio @@ -163,20 +164,25 @@ class NetStream(INetStream): data = await self.muxed_stream.read(n) return data except MuxedStreamEOF as error: + print("NETSTREAM: READ ERROR, RECEIVED EOF") async with self._state_lock: if self.__stream_state == StreamState.CLOSE_WRITE: self.__stream_state = StreamState.CLOSE_BOTH + print("NETSTREAM: READ ERROR, REMOVING STREAM") await self._remove() elif self.__stream_state == StreamState.OPEN: + print("NETSTREAM: READ ERROR, NEW STATE -> CLOSE_READ") self.__stream_state = StreamState.CLOSE_READ raise StreamEOF() from error except MuxedStreamReset as error: + print("NETSTREAM: READ ERROR, MUXED STREAM RESET") async with self._state_lock: if self.__stream_state in [ StreamState.OPEN, StreamState.CLOSE_READ, StreamState.CLOSE_WRITE, ]: + print("NETSTREAM: READ ERROR, NEW STATE -> RESET") self.__stream_state = StreamState.RESET await self._remove() raise StreamReset() from error @@ -210,6 +216,8 @@ class NetStream(INetStream): async def close(self) -> None: """Close stream for writing.""" + print("NETSTREAM: CLOSING STREAM, CURRENT STATE: ", self.__stream_state) + print("CALLED BY: ", inspect.stack()[1].function) async with self._state_lock: if self.__stream_state in [ StreamState.CLOSE_BOTH, @@ -229,6 +237,7 @@ class NetStream(INetStream): async def reset(self) -> None: """Reset stream, closing both ends.""" + print("NETSTREAM: RESETING STREAM") async with self._state_lock: if self.__stream_state == StreamState.RESET: return diff --git a/libp2p/transport/quic/connection.py b/libp2p/transport/quic/connection.py index ff0a4a8d..1e5299db 100644 --- a/libp2p/transport/quic/connection.py +++ b/libp2p/transport/quic/connection.py @@ -966,7 +966,7 @@ class QUICConnection(IRawConnection, IMuxedConn): self, event: events.ConnectionTerminated ) -> None: """Handle connection termination.""" - logger.debug(f"QUIC connection terminated: {event.reason_phrase}") + print(f"QUIC connection terminated: {event.reason_phrase}") # Close all streams for stream in list(self._streams.values()): diff --git a/libp2p/transport/quic/stream.py b/libp2p/transport/quic/stream.py index 06b2201b..a008d8ec 100644 --- a/libp2p/transport/quic/stream.py +++ b/libp2p/transport/quic/stream.py @@ -360,10 +360,6 @@ class QUICStream(IMuxedStream): return try: - # Signal read closure to QUIC layer - self._connection._quic.reset_stream(self._stream_id, error_code=0) - await self._connection._transmit() - self._read_closed = True async with self._state_lock: @@ -590,6 +586,7 @@ class QUICStream(IMuxedStream): exc_tb: TracebackType | None, ) -> None: """Exit the async context manager and close the stream.""" + print("Exiting the context and closing the stream") await self.close() def set_deadline(self, ttl: int) -> bool: