diff --git a/examples/echo/echo_quic.py b/examples/echo/echo_quic.py index cdead8dd..009c98df 100644 --- a/examples/echo/echo_quic.py +++ b/examples/echo/echo_quic.py @@ -11,7 +11,7 @@ Fixed to properly separate client and server modes - clients don't start listene import argparse import logging -import multiaddr +from multiaddr import Multiaddr import trio from libp2p import new_host @@ -33,13 +33,13 @@ async def _echo_stream_handler(stream: INetStream) -> None: print(f"Echo handler error: {e}") try: await stream.close() - except: + except: # noqa: E722 pass async def run_server(port: int, seed: int | None = None) -> None: """Run echo server with QUIC transport.""" - listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/udp/{port}/quic") + listen_addr = Multiaddr(f"/ip4/0.0.0.0/udp/{port}/quic") if seed: import random @@ -116,7 +116,7 @@ async def run_client(destination: str, seed: int | None = None) -> None: async with host.run(listen_addrs=[]): # Empty listen_addrs for client print(f"I am {host.get_id().to_string()}") - maddr = multiaddr.Multiaddr(destination) + maddr = Multiaddr(destination) info = info_from_p2p_addr(maddr) # Connect to server diff --git a/libp2p/__init__.py b/libp2p/__init__.py index 59a42ff6..d87e14ef 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -282,7 +282,6 @@ def new_host( :param transport_opt: optional dictionary of properties of transport :return: return a host instance """ - print("INIT") swarm = new_swarm( key_pair=key_pair, muxer_opt=muxer_opt, diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index e32c48ac..a0311bd8 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -299,7 +299,9 @@ class BasicHost(IHost): ) except MultiselectError as error: peer_id = net_stream.muxed_conn.peer_id - print("failed to accept a stream from peer %s, error=%s", peer_id, error) + logger.debug( + "failed to accept a stream from peer %s, error=%s", peer_id, error + ) await net_stream.reset() return if protocol is None: diff --git a/libp2p/network/stream/net_stream.py b/libp2p/network/stream/net_stream.py index 5e40f775..49daab9c 100644 --- a/libp2p/network/stream/net_stream.py +++ b/libp2p/network/stream/net_stream.py @@ -1,7 +1,6 @@ from enum import ( Enum, ) -import inspect import trio @@ -165,25 +164,20 @@ class NetStream(INetStream): data = await self.muxed_stream.read(n) return data except MuxedStreamEOF as error: - print("NETSTREAM: READ ERROR, RECEIVED EOF") async with self._state_lock: if self.__stream_state == StreamState.CLOSE_WRITE: self.__stream_state = StreamState.CLOSE_BOTH - print("NETSTREAM: READ ERROR, REMOVING STREAM") await self._remove() elif self.__stream_state == StreamState.OPEN: - print("NETSTREAM: READ ERROR, NEW STATE -> CLOSE_READ") self.__stream_state = StreamState.CLOSE_READ raise StreamEOF() from error except (MuxedStreamReset, QUICStreamClosedError, QUICStreamResetError) as error: - print("NETSTREAM: READ ERROR, MUXED STREAM RESET") async with self._state_lock: if self.__stream_state in [ StreamState.OPEN, StreamState.CLOSE_READ, StreamState.CLOSE_WRITE, ]: - print("NETSTREAM: READ ERROR, NEW STATE -> RESET") self.__stream_state = StreamState.RESET await self._remove() raise StreamReset() from error @@ -222,8 +216,6 @@ class NetStream(INetStream): async def close(self) -> None: """Close stream for writing.""" - print("NETSTREAM: CLOSING STREAM, CURRENT STATE: ", self.__stream_state) - print("CALLED BY: ", inspect.stack()[1].function) async with self._state_lock: if self.__stream_state in [ StreamState.CLOSE_BOTH, @@ -243,7 +235,6 @@ class NetStream(INetStream): async def reset(self) -> None: """Reset stream, closing both ends.""" - print("NETSTREAM: RESETING STREAM") async with self._state_lock: if self.__stream_state == StreamState.RESET: return diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 12b6378c..a4230507 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -59,7 +59,6 @@ from .exceptions import ( ) logging.basicConfig( - level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[logging.StreamHandler(sys.stdout)], ) @@ -182,7 +181,13 @@ class Swarm(Service, INetworkService): async def dial_addr(self, addr: Multiaddr, peer_id: ID) -> INetConn: """ Try to create a connection to peer_id with addr. + :param addr: the address we want to connect with + :param peer_id: the peer we want to connect to + :raises SwarmException: raised when an error occurs + :return: network connection """ + # Dial peer (connection to peer does not yet exist) + # Transport dials peer (gets back a raw conn) try: raw_conn = await self.transport.dial(addr) except OpenConnectionError as error: @@ -191,9 +196,19 @@ class Swarm(Service, INetworkService): f"fail to open connection to peer {peer_id}" ) from error + if isinstance(self.transport, QUICTransport) and isinstance( + raw_conn, IMuxedConn + ): + logger.info( + "Skipping upgrade for QUIC, QUIC connections are already multiplexed" + ) + swarm_conn = await self.add_conn(raw_conn) + return swarm_conn + logger.debug("dialed peer %s over base transport", peer_id) - # Standard TCP flow - security then mux upgrade + # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure + # the conn and then mux the conn try: secured_conn = await self.upgrader.upgrade_security(raw_conn, True, peer_id) except SecurityUpgradeFailure as error: @@ -227,6 +242,9 @@ class Swarm(Service, INetworkService): logger.debug("attempting to open a stream to peer %s", peer_id) swarm_conn = await self.dial_peer(peer_id) + dd = "Yes" if swarm_conn is None else "No" + + print(f"Is swarm conn None: {dd}") net_stream = await swarm_conn.new_stream() logger.debug("successfully opened a stream to peer %s", peer_id) @@ -249,7 +267,7 @@ class Swarm(Service, INetworkService): - Map multiaddr to listener """ # We need to wait until `self.listener_nursery` is created. - logger.debug("SWARM LISTEN CALLED") + logger.debug("Starting to listen") await self.event_listener_nursery_created.wait() success_count = 0 diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index 837ea6ee..e5ae315b 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -147,7 +147,6 @@ class MultiselectClient(IMultiselectClient): except MultiselectCommunicatorError as error: raise MultiselectClientError() from error - print("Response: ", response) if response == protocol: return protocol if response == PROTOCOL_NOT_FOUND_MSG: diff --git a/libp2p/transport/quic/listener.py b/libp2p/transport/quic/listener.py index 0ad08813..2e6bf3de 100644 --- a/libp2p/transport/quic/listener.py +++ b/libp2p/transport/quic/listener.py @@ -292,11 +292,11 @@ class QUICListener(IListener): async with self._connection_lock: if dest_cid in self._connections: connection_obj = self._connections[dest_cid] - print(f"PACKET: Routing to established connection {dest_cid.hex()}") + logger.debug(f"Routing to established connection {dest_cid.hex()}") elif dest_cid in self._pending_connections: pending_quic_conn = self._pending_connections[dest_cid] - print(f"PACKET: Routing to pending connection {dest_cid.hex()}") + logger.debug(f"Routing to pending connection {dest_cid.hex()}") else: # Check if this is a new connection @@ -327,9 +327,6 @@ class QUICListener(IListener): except Exception as e: logger.error(f"Error processing packet from {addr}: {e}") - import traceback - - traceback.print_exc() async def _handle_established_connection_packet( self, @@ -340,10 +337,6 @@ class QUICListener(IListener): ) -> None: """Handle packet for established connection WITHOUT holding connection lock.""" try: - print(f" ESTABLISHED: Handling packet for connection {dest_cid.hex()}") - - # Forward packet to connection object - # This may trigger event processing and stream creation await self._route_to_connection(connection_obj, data, addr) except Exception as e: @@ -358,19 +351,19 @@ class QUICListener(IListener): ) -> None: """Handle packet for pending connection WITHOUT holding connection lock.""" try: - print(f"Handling packet for pending connection {dest_cid.hex()}") - print(f"Packet size: {len(data)} bytes from {addr}") + logger.debug(f"Handling packet for pending connection {dest_cid.hex()}") + logger.debug(f"Packet size: {len(data)} bytes from {addr}") # Feed data to QUIC connection quic_conn.receive_datagram(data, addr, now=time.time()) - print("PENDING: Datagram received by QUIC connection") + logger.debug("PENDING: Datagram received by QUIC connection") # Process events - this is crucial for handshake progression - print("Processing QUIC events...") + logger.debug("Processing QUIC events...") await self._process_quic_events(quic_conn, addr, dest_cid) # Send any outgoing packets - print("Transmitting response...") + logger.debug("Transmitting response...") await self._transmit_for_connection(quic_conn, addr) # Check if handshake completed (with minimal locking) @@ -378,16 +371,13 @@ class QUICListener(IListener): hasattr(quic_conn, "_handshake_complete") and quic_conn._handshake_complete ): - print("PENDING: Handshake completed, promoting connection") + logger.debug("PENDING: Handshake completed, promoting connection") await self._promote_pending_connection(quic_conn, addr, dest_cid) else: - print("Handshake still in progress") + logger.debug("Handshake still in progress") except Exception as e: logger.error(f"Error handling pending connection {dest_cid.hex()}: {e}") - import traceback - - traceback.print_exc() async def _send_version_negotiation( self, addr: tuple[str, int], source_cid: bytes @@ -520,9 +510,6 @@ class QUICListener(IListener): except Exception as e: logger.error(f"Error handling new connection from {addr}: {e}") - import traceback - - traceback.print_exc() self._stats["connections_rejected"] += 1 return None @@ -531,12 +518,11 @@ class QUICListener(IListener): ) -> None: """Handle short header packets for established connections.""" try: - print(f" SHORT_HDR: Handling short header packet from {addr}") + logger.debug(f" SHORT_HDR: Handling short header packet from {addr}") # First, try address-based lookup dest_cid = self._addr_to_cid.get(addr) if dest_cid and dest_cid in self._connections: - print(f"SHORT_HDR: Routing via address mapping to {dest_cid.hex()}") connection = self._connections[dest_cid] await self._route_to_connection(connection, data, addr) return @@ -546,7 +532,6 @@ class QUICListener(IListener): potential_cid = data[1:9] if potential_cid in self._connections: - print(f"SHORT_HDR: Routing via extracted CID {potential_cid.hex()}") connection = self._connections[potential_cid] # Update mappings for future packets @@ -556,7 +541,7 @@ class QUICListener(IListener): await self._route_to_connection(connection, data, addr) return - print(f"❌ SHORT_HDR: No matching connection found for {addr}") + logger.debug(f"❌ SHORT_HDR: No matching connection found for {addr}") except Exception as e: logger.error(f"Error handling short header packet from {addr}: {e}") @@ -593,7 +578,7 @@ class QUICListener(IListener): quic_conn.receive_datagram(data, addr, now=time.time()) if quic_conn.tls: - print(f"TLS state after: {quic_conn.tls.state}") + logger.debug(f"TLS state after: {quic_conn.tls.state}") # Process events - this is crucial for handshake progression await self._process_quic_events(quic_conn, addr, dest_cid) @@ -608,9 +593,6 @@ class QUICListener(IListener): except Exception as e: logger.error(f"Error handling pending connection {dest_cid.hex()}: {e}") - import traceback - - traceback.print_exc() # Remove problematic pending connection logger.error(f"Removing problematic connection {dest_cid.hex()}") @@ -668,7 +650,7 @@ class QUICListener(IListener): await connection._handle_stream_reset(event) elif isinstance(event, events.ConnectionIdIssued): - print( + logger.debug( f"QUIC EVENT: Connection ID issued: {event.connection_id.hex()}" ) # Add new CID to the same address mapping @@ -681,7 +663,7 @@ class QUICListener(IListener): ) elif isinstance(event, events.ConnectionIdRetired): - print(f"EVENT: Connection ID retired: {event.connection_id.hex()}") + logger.info(f"Connection ID retired: {event.connection_id.hex()}") retired_cid = event.connection_id if retired_cid in self._cid_to_addr: addr = self._cid_to_addr[retired_cid] @@ -690,18 +672,10 @@ class QUICListener(IListener): if self._addr_to_cid.get(addr) == retired_cid: del self._addr_to_cid[addr] else: - print(f" EVENT: Unhandled event type: {type(event).__name__}") - - if events_processed == 0: - print(" EVENT: No events to process") - else: - print(f" EVENT: Processed {events_processed} events total") + logger.warning(f"Unhandled event type: {type(event).__name__}") except Exception as e: - print(f"❌ EVENT: Error processing events: {e}") - import traceback - - traceback.print_exc() + logger.debug(f"❌ EVENT: Error processing events: {e}") async def _promote_pending_connection( self, quic_conn: QuicConnection, addr: tuple[str, int], dest_cid: bytes @@ -773,7 +747,7 @@ class QUICListener(IListener): logger.debug(f"Successfully added connection {dest_cid.hex()} to swarm") try: - print(f"Invoking user callback {dest_cid.hex()}") + logger.debug(f"Invoking user callback {dest_cid.hex()}") await self._handler(connection) except Exception as e: @@ -826,7 +800,7 @@ class QUICListener(IListener): ) -> None: """Enhanced transmission diagnostics to analyze datagram content.""" try: - print(f" TRANSMIT: Starting transmission to {addr}") + logger.debug(f" TRANSMIT: Starting transmission to {addr}") # Get current timestamp for timing import time @@ -834,17 +808,17 @@ class QUICListener(IListener): now = time.time() datagrams = quic_conn.datagrams_to_send(now=now) - print(f" TRANSMIT: Got {len(datagrams)} datagrams to send") + logger.debug(f" TRANSMIT: Got {len(datagrams)} datagrams to send") if not datagrams: - print("⚠️ TRANSMIT: No datagrams to send") + logger.debug("⚠️ TRANSMIT: No datagrams to send") return for i, (datagram, dest_addr) in enumerate(datagrams): - print(f" TRANSMIT: Analyzing datagram {i}") - print(f" TRANSMIT: Datagram size: {len(datagram)} bytes") - print(f" TRANSMIT: Destination: {dest_addr}") - print(f" TRANSMIT: Expected destination: {addr}") + logger.debug(f" TRANSMIT: Analyzing datagram {i}") + logger.debug(f" TRANSMIT: Datagram size: {len(datagram)} bytes") + logger.debug(f" TRANSMIT: Destination: {dest_addr}") + logger.debug(f" TRANSMIT: Expected destination: {addr}") # Analyze datagram content if len(datagram) > 0: @@ -862,7 +836,7 @@ class QUICListener(IListener): break if not crypto_frame_found: - print("❌ TRANSMIT: NO CRYPTO frame found in datagram!") + logger.error("No CRYPTO frame found in datagram!") # Look for other frame types frame_types_found = set() for offset in range(len(datagram)): @@ -876,25 +850,13 @@ class QUICListener(IListener): if self._socket: try: - print(f" TRANSMIT: Sending datagram {i} via socket...") await self._socket.sendto(datagram, addr) - print(f"TRANSMIT: Successfully sent datagram {i}") except Exception as send_error: - print(f"❌ TRANSMIT: Socket send failed: {send_error}") + logger.error(f"Socket send failed: {send_error}") else: - print("❌ TRANSMIT: No socket available!") - - # Check if there are more datagrams after sending - remaining_datagrams = quic_conn.datagrams_to_send(now=time.time()) - logger.debug( - f" TRANSMIT: After sending, {len(remaining_datagrams)} datagrams remain" - ) - + logger.error("No socket available!") except Exception as e: - print(f"❌ TRANSMIT: Transmission error: {e}") - import traceback - - traceback.print_exc() + logger.debug(f"Transmission error: {e}") async def listen(self, maddr: Multiaddr, nursery: trio.Nursery) -> bool: """Start listening on the given multiaddr with enhanced connection handling."""