From cb6fd27626b157a291c316781a3d5a4870d87d9a Mon Sep 17 00:00:00 2001 From: Akash Mondal Date: Tue, 17 Jun 2025 08:46:54 +0000 Subject: [PATCH] fix: process packets received and send to quic --- examples/echo/echo_quic.py | 9 +--- libp2p/network/swarm.py | 7 +++ libp2p/transport/quic/connection.py | 66 +++++++++++++++++++++++------ libp2p/transport/quic/listener.py | 5 ++- libp2p/transport/quic/security.py | 6 ++- libp2p/transport/quic/transport.py | 14 +++++- 6 files changed, 81 insertions(+), 26 deletions(-) diff --git a/examples/echo/echo_quic.py b/examples/echo/echo_quic.py index 6289cc54..f31041ad 100644 --- a/examples/echo/echo_quic.py +++ b/examples/echo/echo_quic.py @@ -144,19 +144,14 @@ def main() -> None: type=int, help="provide a seed to the random number generator", ) - parser.add_argument( - "-log", - "--loglevel", - default="DEBUG", - help="Provide logging level. Example --loglevel debug, default=warning", - ) args = parser.parse_args() - logging.basicConfig(level=args.loglevel.upper()) + try: trio.run(run, args.port, args.destination, args.seed) except KeyboardInterrupt: pass +logging.basicConfig(level=logging.DEBUG) if __name__ == "__main__": main() diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 331a0ce4..7873a056 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -3,6 +3,7 @@ from collections.abc import ( Callable, ) import logging +import sys from multiaddr import ( Multiaddr, @@ -56,6 +57,11 @@ from .exceptions import ( SwarmException, ) +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[logging.StreamHandler(sys.stdout)], +) logger = logging.getLogger("libp2p.network.swarm") @@ -245,6 +251,7 @@ class Swarm(Service, INetworkService): - Map multiaddr to listener """ # We need to wait until `self.listener_nursery` is created. + logger.debug("SWARM LISTEN CALLED") await self.event_listener_nursery_created.wait() success_count = 0 diff --git a/libp2p/transport/quic/connection.py b/libp2p/transport/quic/connection.py index e1693fa4..c647c159 100644 --- a/libp2p/transport/quic/connection.py +++ b/libp2p/transport/quic/connection.py @@ -5,6 +5,7 @@ Uses aioquic's sans-IO core with trio for async operations. import logging import socket +from sys import stdout import time from typing import TYPE_CHECKING, Any, Optional @@ -34,10 +35,11 @@ if TYPE_CHECKING: from .security import QUICTLSConfigManager from .transport import QUICTransport +logging.root.handlers = [] logging.basicConfig( - level="DEBUG", - format="%(asctime)s [%(levelname)s] %(message)s", - handlers=[logging.StreamHandler()], + level=logging.DEBUG, + format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s", + handlers=[logging.StreamHandler(stdout)], ) logger = logging.getLogger(__name__) @@ -252,18 +254,17 @@ class QUICConnection(IRawConnection, IMuxedConn): raise QUICConnectionError(f"Connection start failed: {e}") from e async def _initiate_connection(self) -> None: - """Initiate client-side connection establishment.""" + """Initiate client-side connection, reusing listener socket if available.""" try: with QUICErrorContext("connection_initiation", "connection"): - # Create UDP socket using trio - self._socket = trio.socket.socket( - family=socket.AF_INET, type=socket.SOCK_DGRAM - ) + if not self._socket: + logger.debug("Creating new socket for outbound connection") + self._socket = trio.socket.socket( + family=socket.AF_INET, type=socket.SOCK_DGRAM + ) - # Connect the socket to the remote address - await self._socket.connect(self._remote_addr) + await self._socket.bind(("0.0.0.0", 0)) - # Start the connection establishment self._quic.connect(self._remote_addr, now=time.time()) # Send initial packet(s) @@ -297,8 +298,10 @@ class QUICConnection(IRawConnection, IMuxedConn): # Start background event processing if not self._background_tasks_started: - print("STARTING BACKGROUND TASK") + logger.debug("STARTING BACKGROUND TASK") await self._start_background_tasks() + else: + logger.debug("BACKGROUND TASK ALREADY STARTED") # Wait for handshake completion with timeout with trio.move_on_after( @@ -330,11 +333,14 @@ class QUICConnection(IRawConnection, IMuxedConn): self._background_tasks_started = True + if self.__is_initiator: # Only for client connections + self._nursery.start_soon(async_fn=self._client_packet_receiver) + # Start event processing task self._nursery.start_soon(async_fn=self._event_processing_loop) # Start periodic tasks - # self._nursery.start_soon(async_fn=self._periodic_maintenance) + self._nursery.start_soon(async_fn=self._periodic_maintenance) logger.debug("Started background tasks for QUIC connection") @@ -379,6 +385,40 @@ class QUICConnection(IRawConnection, IMuxedConn): except Exception as e: logger.error(f"Error in periodic maintenance: {e}") + async def _client_packet_receiver(self) -> None: + """Receive packets for client connections.""" + logger.debug("Starting client packet receiver") + print("Started QUIC client packet receiver") + + try: + while not self._closed and self._socket: + try: + # Receive UDP packets + data, addr = await self._socket.recvfrom(65536) + print(f"Client received {len(data)} bytes from {addr}") + + # Feed packet to QUIC connection + self._quic.receive_datagram(data, addr, now=time.time()) + + # Process any events that result from the packet + await self._process_quic_events() + + # Send any response packets + await self._transmit() + + except trio.ClosedResourceError: + logger.debug("Client socket closed") + break + except Exception as e: + logger.error(f"Error receiving client packet: {e}") + await trio.sleep(0.01) + + except trio.Cancelled: + logger.info("Client packet receiver cancelled") + raise + finally: + logger.debug("Client packet receiver terminated") + # Security and identity methods async def _verify_peer_identity_with_security(self) -> None: diff --git a/libp2p/transport/quic/listener.py b/libp2p/transport/quic/listener.py index fd023a3a..bb7f3fd5 100644 --- a/libp2p/transport/quic/listener.py +++ b/libp2p/transport/quic/listener.py @@ -5,6 +5,7 @@ QUIC Listener import logging import socket import struct +import sys import time from typing import TYPE_CHECKING @@ -35,8 +36,8 @@ if TYPE_CHECKING: logging.basicConfig( level=logging.DEBUG, - format="%(asctime)s [%(levelname)s] %(message)s", - handlers=[logging.StreamHandler()], + format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s", + handlers=[logging.StreamHandler(sys.stdout)], ) logger = logging.getLogger(__name__) diff --git a/libp2p/transport/quic/security.py b/libp2p/transport/quic/security.py index 82132b6b..1e265241 100644 --- a/libp2p/transport/quic/security.py +++ b/libp2p/transport/quic/security.py @@ -440,7 +440,8 @@ class QUICTLSConfigManager: "private_key": self.tls_config.private_key, "certificate_chain": [], "alpn_protocols": ["libp2p"], - "verify_mode": True, + "verify_mode": False, + "check_hostname": False, } return config @@ -458,7 +459,8 @@ class QUICTLSConfigManager: "private_key": self.tls_config.private_key, "certificate_chain": [], "alpn_protocols": ["libp2p"], - "verify_mode": True, + "verify_mode": False, + "check_hostname": False, } return config diff --git a/libp2p/transport/quic/transport.py b/libp2p/transport/quic/transport.py index 71d4891e..30218a12 100644 --- a/libp2p/transport/quic/transport.py +++ b/libp2p/transport/quic/transport.py @@ -8,6 +8,7 @@ Updated to include Module 5 security integration. from collections.abc import Iterable import copy import logging +import sys from aioquic.quic.configuration import ( QuicConfiguration, @@ -15,6 +16,7 @@ from aioquic.quic.configuration import ( from aioquic.quic.connection import ( QuicConnection as NativeQUICConnection, ) +from aioquic.quic.logger import QuicLogger import multiaddr import trio @@ -62,8 +64,8 @@ QUIC_DRAFT29_PROTOCOL = QUICTransportConfig.PROTOCOL_QUIC_DRAFT29 logging.basicConfig( level=logging.DEBUG, - format="%(asctime)s [%(levelname)s] %(message)s", - handlers=[logging.StreamHandler()], + format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s", + handlers=[logging.StreamHandler(sys.stdout)], ) logger = logging.getLogger(__name__) @@ -290,6 +292,7 @@ class QUICTransport(ITransport): raise QUICDialError(f"Unsupported QUIC version: {quic_version}") config.is_client = True + config.quic_logger = QuicLogger() logger.debug( f"Dialing QUIC connection to {host}:{port} (version: {quic_version})" ) @@ -484,3 +487,10 @@ class QUICTransport(ITransport): """ return self._security_manager + + def get_listener_socket(self) -> trio.socket.SocketType | None: + """Get the socket from the first active listener.""" + for listener in self._listeners: + if listener.is_listening() and listener._socket: + return listener._socket + return None