diff --git a/examples/echo/echo_quic.py b/examples/echo/echo_quic.py index f31041ad..532cfe3d 100644 --- a/examples/echo/echo_quic.py +++ b/examples/echo/echo_quic.py @@ -1,15 +1,11 @@ #!/usr/bin/env python3 """ -QUIC Echo Example - Direct replacement for examples/echo/echo.py +QUIC Echo Example - Fixed version with proper client/server separation This program demonstrates a simple echo protocol using QUIC transport where a peer listens for connections and copies back any input received on a stream. -Modified from the original TCP version to use QUIC transport, providing: -- Built-in TLS security -- Native stream multiplexing -- Better performance over UDP -- Modern QUIC protocol features +Fixed to properly separate client and server modes - clients don't start listeners. """ import argparse @@ -40,16 +36,8 @@ async def _echo_stream_handler(stream: INetStream) -> None: await stream.close() -async def run(port: int, destination: str, seed: int | None = None) -> None: - """ - Run echo server or client with QUIC transport. - - Key changes from TCP version: - 1. UDP multiaddr instead of TCP - 2. QUIC transport configuration - 3. Everything else remains the same! - """ - # CHANGED: UDP + QUIC instead of TCP +async def run_server(port: int, seed: int | None = None) -> None: + """Run echo server with QUIC transport.""" listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/udp/{port}/quic") if seed: @@ -63,7 +51,7 @@ async def run(port: int, destination: str, seed: int | None = None) -> None: secret = secrets.token_bytes(32) - # NEW: QUIC transport configuration + # QUIC transport configuration quic_config = QUICTransportConfig( idle_timeout=30.0, max_concurrent_streams=1000, @@ -71,46 +59,87 @@ async def run(port: int, destination: str, seed: int | None = None) -> None: enable_draft29=False, ) - # CHANGED: Add QUIC transport options + # Create host with QUIC transport host = new_host( key_pair=create_new_key_pair(secret), transport_opt={"quic_config": quic_config}, ) + # Server mode: start listener async with host.run(listen_addrs=[listen_addr]): print(f"I am {host.get_id().to_string()}") + host.set_stream_handler(PROTOCOL_ID, _echo_stream_handler) - if not destination: # Server mode - host.set_stream_handler(PROTOCOL_ID, _echo_stream_handler) + print( + "Run this from the same folder in another console:\n\n" + f"python3 ./examples/echo/echo_quic.py " + f"-d {host.get_addrs()[0]}\n" + ) + print("Waiting for incoming QUIC connections...") + await trio.sleep_forever() - print( - "Run this from the same folder in another console:\n\n" - f"python3 ./examples/echo/echo_quic.py " - f"-d {host.get_addrs()[0]}\n" - ) - print("Waiting for incoming QUIC connections...") - await trio.sleep_forever() - else: # Client mode - maddr = multiaddr.Multiaddr(destination) - info = info_from_p2p_addr(maddr) - # Associate the peer with local ip address - await host.connect(info) +async def run_client(destination: str, seed: int | None = None) -> None: + """Run echo client with QUIC transport.""" + if seed: + import random - # Start a stream with the destination. - # Multiaddress of the destination peer is fetched from the peerstore - # using 'peerId'. - stream = await host.new_stream(info.peer_id, [PROTOCOL_ID]) + random.seed(seed) + secret_number = random.getrandbits(32 * 8) + secret = secret_number.to_bytes(length=32, byteorder="big") + else: + import secrets - msg = b"hi, there!\n" + secret = secrets.token_bytes(32) - await stream.write(msg) - # Notify the other side about EOF - await stream.close() - response = await stream.read() + # QUIC transport configuration + quic_config = QUICTransportConfig( + idle_timeout=30.0, + max_concurrent_streams=1000, + connection_timeout=10.0, + enable_draft29=False, + ) - print(f"Sent: {msg.decode('utf-8')}") - print(f"Got: {response.decode('utf-8')}") + # Create host with QUIC transport + host = new_host( + key_pair=create_new_key_pair(secret), + transport_opt={"quic_config": quic_config}, + ) + + # Client mode: NO listener, just connect + async with host.run(listen_addrs=[]): # Empty listen_addrs for client + print(f"I am {host.get_id().to_string()}") + + maddr = multiaddr.Multiaddr(destination) + info = info_from_p2p_addr(maddr) + + # Connect to server + await host.connect(info) + + # Start a stream with the destination + stream = await host.new_stream(info.peer_id, [PROTOCOL_ID]) + + msg = b"hi, there!\n" + + await stream.write(msg) + # Notify the other side about EOF + await stream.close() + response = await stream.read() + + print(f"Sent: {msg.decode('utf-8')}") + print(f"Got: {response.decode('utf-8')}") + + +async def run(port: int, destination: str, seed: int | None = None) -> None: + """ + Run echo server or client with QUIC transport. + + Fixed version that properly separates client and server modes. + """ + if not destination: # Server mode + await run_server(port, seed) + else: # Client mode + await run_client(destination, seed) def main() -> None: @@ -122,16 +151,16 @@ def main() -> None: QUIC provides built-in TLS security and stream multiplexing over UDP. - To use it, first run 'python ./echo.py -p ', where is - the UDP port number.Then, run another host with , - 'python ./echo.py -p -d ' + To use it, first run 'python ./echo_quic_fixed.py -p ', where is + the UDP port number. Then, run another host with , + 'python ./echo_quic_fixed.py -d ' where is the QUIC multiaddress of the previous listener host. """ example_maddr = "/ip4/127.0.0.1/udp/8000/quic/p2p/QmQn4SwGkDZKkUEpBRBv" parser = argparse.ArgumentParser(description=description) - parser.add_argument("-p", "--port", default=8000, type=int, help="UDP port number") + parser.add_argument("-p", "--port", default=0, type=int, help="UDP port number") parser.add_argument( "-d", "--destination", @@ -152,6 +181,7 @@ def main() -> None: pass -logging.basicConfig(level=logging.DEBUG) if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + logging.getLogger("aioquic").setLevel(logging.DEBUG) main() diff --git a/libp2p/transport/quic/listener.py b/libp2p/transport/quic/listener.py index bb7f3fd5..76fc18c5 100644 --- a/libp2p/transport/quic/listener.py +++ b/libp2p/transport/quic/listener.py @@ -250,6 +250,7 @@ class QUICListener(IListener): async def _process_packet(self, data: bytes, addr: tuple[str, int]) -> None: """ Enhanced packet processing with connection ID routing and version negotiation. + FIXED: Added address-based connection reuse to prevent multiple connections. """ try: self._stats["packets_processed"] += 1 @@ -258,11 +259,15 @@ class QUICListener(IListener): # Parse packet to extract connection information packet_info = self.parse_quic_packet(data) + print(f"🔧 DEBUG: Address mappings: {self._addr_to_cid}") + print( + f"🔧 DEBUG: Pending connections: {list(self._pending_connections.keys())}" + ) + async with self._connection_lock: if packet_info: # Check for version negotiation if packet_info.version == 0: - # Version negotiation packet - this shouldn't happen on server logger.warning( f"Received version negotiation packet from {addr}" ) @@ -279,24 +284,79 @@ class QUICListener(IListener): dest_cid = packet_info.destination_cid if dest_cid in self._connections: - # Existing connection + # Existing established connection + print(f"🔧 ROUTING: To established connection {dest_cid.hex()}") connection = self._connections[dest_cid] await self._route_to_connection(connection, data, addr) + elif dest_cid in self._pending_connections: - # Pending connection + # Existing pending connection + print(f"🔧 ROUTING: To pending connection {dest_cid.hex()}") quic_conn = self._pending_connections[dest_cid] await self._handle_pending_connection( quic_conn, data, addr, dest_cid ) + else: - # New connection - only handle Initial packets for new conn - if packet_info.packet_type == 0: # Initial packet - await self._handle_new_connection(data, addr, packet_info) - else: - logger.debug( - "Ignoring non-Initial packet for unknown " - f"connection ID from {addr}" + # CRITICAL FIX: Check for existing connection by address BEFORE creating new + existing_cid = self._addr_to_cid.get(addr) + + if existing_cid is not None: + print( + f"✅ FOUND: Existing connection {existing_cid.hex()} for address {addr}" ) + print( + f"🔧 NOTE: Client dest_cid {dest_cid.hex()} != our cid {existing_cid.hex()}" + ) + + # Route to existing connection by address + if existing_cid in self._pending_connections: + print( + "🔧 ROUTING: Using existing pending connection by address" + ) + quic_conn = self._pending_connections[existing_cid] + await self._handle_pending_connection( + quic_conn, data, addr, existing_cid + ) + elif existing_cid in self._connections: + print( + "🔧 ROUTING: Using existing established connection by address" + ) + connection = self._connections[existing_cid] + await self._route_to_connection(connection, data, addr) + else: + print( + f"❌ ERROR: Address mapping exists but connection {existing_cid.hex()} not found!" + ) + # Clean up broken mapping and create new + self._addr_to_cid.pop(addr, None) + if packet_info.packet_type == 0: # Initial packet + print( + "🔧 NEW: Creating new connection after cleanup" + ) + await self._handle_new_connection( + data, addr, packet_info + ) + + else: + # Truly new connection - only handle Initial packets + if packet_info.packet_type == 0: # Initial packet + print(f"🔧 NEW: Creating first connection for {addr}") + await self._handle_new_connection( + data, addr, packet_info + ) + + # Debug the newly created connection + new_cid = self._addr_to_cid.get(addr) + if new_cid and new_cid in self._pending_connections: + quic_conn = self._pending_connections[new_cid] + await self._debug_quic_connection_state( + quic_conn, new_cid + ) + else: + logger.debug( + f"Ignoring non-Initial packet for unknown connection ID from {addr}" + ) else: # Fallback to address-based routing for short header packets await self._handle_short_header_packet(data, addr) @@ -504,6 +564,49 @@ class QUICListener(IListener): connection = self._connections[dest_cid] await connection._handle_stream_reset(event) + async def _debug_quic_connection_state( + self, quic_conn: QuicConnection, connection_id: bytes + ): + """Debug the internal state of the QUIC connection.""" + try: + print(f"🔧 QUIC_STATE: Debugging connection {connection_id}") + + if not quic_conn: + print("🔧 QUIC_STATE: QUIC CONNECTION NOT FOUND") + return + + # Check TLS state + if hasattr(quic_conn, "tls") and quic_conn.tls: + print("🔧 QUIC_STATE: TLS context exists") + if hasattr(quic_conn.tls, "state"): + print(f"🔧 QUIC_STATE: TLS state: {quic_conn.tls.state}") + else: + print("❌ QUIC_STATE: No TLS context!") + + # Check connection state + if hasattr(quic_conn, "_state"): + print(f"🔧 QUIC_STATE: Connection state: {quic_conn._state}") + + # Check if handshake is complete + if hasattr(quic_conn, "_handshake_complete"): + print( + f"🔧 QUIC_STATE: Handshake complete: {quic_conn._handshake_complete}" + ) + + # Check configuration + if hasattr(quic_conn, "configuration"): + config = quic_conn.configuration + print( + f"🔧 QUIC_STATE: Config certificate: {config.certificate is not None}" + ) + print( + f"🔧 QUIC_STATE: Config private_key: {config.private_key is not None}" + ) + print(f"🔧 QUIC_STATE: Config is_client: {config.is_client}") + + except Exception as e: + print(f"❌ QUIC_STATE: Error checking state: {e}") + async def _promote_pending_connection( self, quic_conn: QuicConnection, addr: tuple[str, int], dest_cid: bytes ) -> None: @@ -601,22 +704,114 @@ class QUICListener(IListener): if dest_cid: await self._remove_connection(dest_cid) - async def _transmit_for_connection( - self, quic_conn: QuicConnection, addr: tuple[str, int] - ) -> None: - """Send outgoing packets for a QUIC connection.""" + async def _transmit_for_connection(self, quic_conn, addr): + """Enhanced transmission diagnostics to analyze datagram content.""" try: - while True: - datagrams = quic_conn.datagrams_to_send(now=time.time()) - if not datagrams: - break + print(f"🔧 TRANSMIT: Starting transmission to {addr}") - for datagram, _ in datagrams: - if self._socket: + # Get current timestamp for timing + import time + + now = time.time() + + datagrams = quic_conn.datagrams_to_send(now=now) + print(f"🔧 TRANSMIT: Got {len(datagrams)} datagrams to send") + + if not datagrams: + print("⚠️ TRANSMIT: No datagrams to send") + return + + for i, (datagram, dest_addr) in enumerate(datagrams): + print(f"🔧 TRANSMIT: Analyzing datagram {i}") + print(f"🔧 TRANSMIT: Datagram size: {len(datagram)} bytes") + print(f"🔧 TRANSMIT: Destination: {dest_addr}") + print(f"🔧 TRANSMIT: Expected destination: {addr}") + + # Analyze datagram content + if len(datagram) > 0: + # QUIC packet format analysis + first_byte = datagram[0] + header_form = (first_byte & 0x80) >> 7 # Bit 7 + fixed_bit = (first_byte & 0x40) >> 6 # Bit 6 + packet_type = (first_byte & 0x30) >> 4 # Bits 4-5 + type_specific = first_byte & 0x0F # Bits 0-3 + + print(f"🔧 TRANSMIT: First byte: 0x{first_byte:02x}") + print( + f"🔧 TRANSMIT: Header form: {header_form} ({'Long' if header_form else 'Short'})" + ) + print( + f"🔧 TRANSMIT: Fixed bit: {fixed_bit} ({'Valid' if fixed_bit else 'INVALID!'})" + ) + print(f"🔧 TRANSMIT: Packet type: {packet_type}") + + # For long header packets (handshake), analyze further + if header_form == 1: # Long header + packet_types = { + 0: "Initial", + 1: "0-RTT", + 2: "Handshake", + 3: "Retry", + } + type_name = packet_types.get(packet_type, "Unknown") + print(f"🔧 TRANSMIT: Long header packet type: {type_name}") + + # Look for CRYPTO frame indicators + # CRYPTO frame type is 0x06 + crypto_frame_found = False + for offset in range(len(datagram)): + if datagram[offset] == 0x06: # CRYPTO frame type + crypto_frame_found = True + print( + f"✅ TRANSMIT: Found CRYPTO frame at offset {offset}" + ) + break + + if not crypto_frame_found: + print("❌ TRANSMIT: NO CRYPTO frame found in datagram!") + # Look for other frame types + frame_types_found = set() + for offset in range(len(datagram)): + frame_type = datagram[offset] + if frame_type in [0x00, 0x01]: # PADDING/PING + frame_types_found.add("PADDING/PING") + elif frame_type == 0x02: # ACK + frame_types_found.add("ACK") + elif frame_type == 0x06: # CRYPTO + frame_types_found.add("CRYPTO") + + print( + f"🔧 TRANSMIT: Frame types detected: {frame_types_found}" + ) + + # Show first few bytes for debugging + preview_bytes = min(32, len(datagram)) + hex_preview = " ".join(f"{b:02x}" for b in datagram[:preview_bytes]) + print(f"🔧 TRANSMIT: First {preview_bytes} bytes: {hex_preview}") + + # Actually send the datagram + if self._socket: + try: + print(f"🔧 TRANSMIT: Sending datagram {i} via socket...") await self._socket.sendto(datagram, addr) + print(f"✅ TRANSMIT: Successfully sent datagram {i}") + except Exception as send_error: + print(f"❌ TRANSMIT: Socket send failed: {send_error}") + else: + print("❌ TRANSMIT: No socket available!") + + # Check if there are more datagrams after sending + remaining_datagrams = quic_conn.datagrams_to_send(now=time.time()) + print( + f"🔧 TRANSMIT: After sending, {len(remaining_datagrams)} datagrams remain" + ) + print("------END OF THIS DATAGRAM LOG-----") except Exception as e: - logger.error(f"Error transmitting packets to {addr}: {e}") + print(f"❌ TRANSMIT: Transmission error: {e}") + import traceback + + traceback.print_exc() async def listen(self, maddr: Multiaddr, nursery: trio.Nursery) -> bool: """Start listening on the given multiaddr with enhanced connection handling."""