mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
chore: log cleanup
This commit is contained in:
@ -11,7 +11,7 @@ Fixed to properly separate client and server modes - clients don't start listene
|
||||
import argparse
|
||||
import logging
|
||||
|
||||
import multiaddr
|
||||
from multiaddr import Multiaddr
|
||||
import trio
|
||||
|
||||
from libp2p import new_host
|
||||
@ -33,13 +33,13 @@ async def _echo_stream_handler(stream: INetStream) -> None:
|
||||
print(f"Echo handler error: {e}")
|
||||
try:
|
||||
await stream.close()
|
||||
except:
|
||||
except: # noqa: E722
|
||||
pass
|
||||
|
||||
|
||||
async def run_server(port: int, seed: int | None = None) -> None:
|
||||
"""Run echo server with QUIC transport."""
|
||||
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/udp/{port}/quic")
|
||||
listen_addr = Multiaddr(f"/ip4/0.0.0.0/udp/{port}/quic")
|
||||
|
||||
if seed:
|
||||
import random
|
||||
@ -116,7 +116,7 @@ async def run_client(destination: str, seed: int | None = None) -> None:
|
||||
async with host.run(listen_addrs=[]): # Empty listen_addrs for client
|
||||
print(f"I am {host.get_id().to_string()}")
|
||||
|
||||
maddr = multiaddr.Multiaddr(destination)
|
||||
maddr = Multiaddr(destination)
|
||||
info = info_from_p2p_addr(maddr)
|
||||
|
||||
# Connect to server
|
||||
|
||||
@ -282,7 +282,6 @@ def new_host(
|
||||
:param transport_opt: optional dictionary of properties of transport
|
||||
:return: return a host instance
|
||||
"""
|
||||
print("INIT")
|
||||
swarm = new_swarm(
|
||||
key_pair=key_pair,
|
||||
muxer_opt=muxer_opt,
|
||||
|
||||
@ -299,7 +299,9 @@ class BasicHost(IHost):
|
||||
)
|
||||
except MultiselectError as error:
|
||||
peer_id = net_stream.muxed_conn.peer_id
|
||||
print("failed to accept a stream from peer %s, error=%s", peer_id, error)
|
||||
logger.debug(
|
||||
"failed to accept a stream from peer %s, error=%s", peer_id, error
|
||||
)
|
||||
await net_stream.reset()
|
||||
return
|
||||
if protocol is None:
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
from enum import (
|
||||
Enum,
|
||||
)
|
||||
import inspect
|
||||
|
||||
import trio
|
||||
|
||||
@ -165,25 +164,20 @@ class NetStream(INetStream):
|
||||
data = await self.muxed_stream.read(n)
|
||||
return data
|
||||
except MuxedStreamEOF as error:
|
||||
print("NETSTREAM: READ ERROR, RECEIVED EOF")
|
||||
async with self._state_lock:
|
||||
if self.__stream_state == StreamState.CLOSE_WRITE:
|
||||
self.__stream_state = StreamState.CLOSE_BOTH
|
||||
print("NETSTREAM: READ ERROR, REMOVING STREAM")
|
||||
await self._remove()
|
||||
elif self.__stream_state == StreamState.OPEN:
|
||||
print("NETSTREAM: READ ERROR, NEW STATE -> CLOSE_READ")
|
||||
self.__stream_state = StreamState.CLOSE_READ
|
||||
raise StreamEOF() from error
|
||||
except (MuxedStreamReset, QUICStreamClosedError, QUICStreamResetError) as error:
|
||||
print("NETSTREAM: READ ERROR, MUXED STREAM RESET")
|
||||
async with self._state_lock:
|
||||
if self.__stream_state in [
|
||||
StreamState.OPEN,
|
||||
StreamState.CLOSE_READ,
|
||||
StreamState.CLOSE_WRITE,
|
||||
]:
|
||||
print("NETSTREAM: READ ERROR, NEW STATE -> RESET")
|
||||
self.__stream_state = StreamState.RESET
|
||||
await self._remove()
|
||||
raise StreamReset() from error
|
||||
@ -222,8 +216,6 @@ class NetStream(INetStream):
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Close stream for writing."""
|
||||
print("NETSTREAM: CLOSING STREAM, CURRENT STATE: ", self.__stream_state)
|
||||
print("CALLED BY: ", inspect.stack()[1].function)
|
||||
async with self._state_lock:
|
||||
if self.__stream_state in [
|
||||
StreamState.CLOSE_BOTH,
|
||||
@ -243,7 +235,6 @@ class NetStream(INetStream):
|
||||
|
||||
async def reset(self) -> None:
|
||||
"""Reset stream, closing both ends."""
|
||||
print("NETSTREAM: RESETING STREAM")
|
||||
async with self._state_lock:
|
||||
if self.__stream_state == StreamState.RESET:
|
||||
return
|
||||
|
||||
@ -59,7 +59,6 @@ from .exceptions import (
|
||||
)
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||
handlers=[logging.StreamHandler(sys.stdout)],
|
||||
)
|
||||
@ -182,7 +181,13 @@ class Swarm(Service, INetworkService):
|
||||
async def dial_addr(self, addr: Multiaddr, peer_id: ID) -> INetConn:
|
||||
"""
|
||||
Try to create a connection to peer_id with addr.
|
||||
:param addr: the address we want to connect with
|
||||
:param peer_id: the peer we want to connect to
|
||||
:raises SwarmException: raised when an error occurs
|
||||
:return: network connection
|
||||
"""
|
||||
# Dial peer (connection to peer does not yet exist)
|
||||
# Transport dials peer (gets back a raw conn)
|
||||
try:
|
||||
raw_conn = await self.transport.dial(addr)
|
||||
except OpenConnectionError as error:
|
||||
@ -191,9 +196,19 @@ class Swarm(Service, INetworkService):
|
||||
f"fail to open connection to peer {peer_id}"
|
||||
) from error
|
||||
|
||||
if isinstance(self.transport, QUICTransport) and isinstance(
|
||||
raw_conn, IMuxedConn
|
||||
):
|
||||
logger.info(
|
||||
"Skipping upgrade for QUIC, QUIC connections are already multiplexed"
|
||||
)
|
||||
swarm_conn = await self.add_conn(raw_conn)
|
||||
return swarm_conn
|
||||
|
||||
logger.debug("dialed peer %s over base transport", peer_id)
|
||||
|
||||
# Standard TCP flow - security then mux upgrade
|
||||
# Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure
|
||||
# the conn and then mux the conn
|
||||
try:
|
||||
secured_conn = await self.upgrader.upgrade_security(raw_conn, True, peer_id)
|
||||
except SecurityUpgradeFailure as error:
|
||||
@ -227,6 +242,9 @@ class Swarm(Service, INetworkService):
|
||||
logger.debug("attempting to open a stream to peer %s", peer_id)
|
||||
|
||||
swarm_conn = await self.dial_peer(peer_id)
|
||||
dd = "Yes" if swarm_conn is None else "No"
|
||||
|
||||
print(f"Is swarm conn None: {dd}")
|
||||
|
||||
net_stream = await swarm_conn.new_stream()
|
||||
logger.debug("successfully opened a stream to peer %s", peer_id)
|
||||
@ -249,7 +267,7 @@ class Swarm(Service, INetworkService):
|
||||
- Map multiaddr to listener
|
||||
"""
|
||||
# We need to wait until `self.listener_nursery` is created.
|
||||
logger.debug("SWARM LISTEN CALLED")
|
||||
logger.debug("Starting to listen")
|
||||
await self.event_listener_nursery_created.wait()
|
||||
|
||||
success_count = 0
|
||||
|
||||
@ -147,7 +147,6 @@ class MultiselectClient(IMultiselectClient):
|
||||
except MultiselectCommunicatorError as error:
|
||||
raise MultiselectClientError() from error
|
||||
|
||||
print("Response: ", response)
|
||||
if response == protocol:
|
||||
return protocol
|
||||
if response == PROTOCOL_NOT_FOUND_MSG:
|
||||
|
||||
@ -292,11 +292,11 @@ class QUICListener(IListener):
|
||||
async with self._connection_lock:
|
||||
if dest_cid in self._connections:
|
||||
connection_obj = self._connections[dest_cid]
|
||||
print(f"PACKET: Routing to established connection {dest_cid.hex()}")
|
||||
logger.debug(f"Routing to established connection {dest_cid.hex()}")
|
||||
|
||||
elif dest_cid in self._pending_connections:
|
||||
pending_quic_conn = self._pending_connections[dest_cid]
|
||||
print(f"PACKET: Routing to pending connection {dest_cid.hex()}")
|
||||
logger.debug(f"Routing to pending connection {dest_cid.hex()}")
|
||||
|
||||
else:
|
||||
# Check if this is a new connection
|
||||
@ -327,9 +327,6 @@ class QUICListener(IListener):
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing packet from {addr}: {e}")
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
|
||||
async def _handle_established_connection_packet(
|
||||
self,
|
||||
@ -340,10 +337,6 @@ class QUICListener(IListener):
|
||||
) -> None:
|
||||
"""Handle packet for established connection WITHOUT holding connection lock."""
|
||||
try:
|
||||
print(f" ESTABLISHED: Handling packet for connection {dest_cid.hex()}")
|
||||
|
||||
# Forward packet to connection object
|
||||
# This may trigger event processing and stream creation
|
||||
await self._route_to_connection(connection_obj, data, addr)
|
||||
|
||||
except Exception as e:
|
||||
@ -358,19 +351,19 @@ class QUICListener(IListener):
|
||||
) -> None:
|
||||
"""Handle packet for pending connection WITHOUT holding connection lock."""
|
||||
try:
|
||||
print(f"Handling packet for pending connection {dest_cid.hex()}")
|
||||
print(f"Packet size: {len(data)} bytes from {addr}")
|
||||
logger.debug(f"Handling packet for pending connection {dest_cid.hex()}")
|
||||
logger.debug(f"Packet size: {len(data)} bytes from {addr}")
|
||||
|
||||
# Feed data to QUIC connection
|
||||
quic_conn.receive_datagram(data, addr, now=time.time())
|
||||
print("PENDING: Datagram received by QUIC connection")
|
||||
logger.debug("PENDING: Datagram received by QUIC connection")
|
||||
|
||||
# Process events - this is crucial for handshake progression
|
||||
print("Processing QUIC events...")
|
||||
logger.debug("Processing QUIC events...")
|
||||
await self._process_quic_events(quic_conn, addr, dest_cid)
|
||||
|
||||
# Send any outgoing packets
|
||||
print("Transmitting response...")
|
||||
logger.debug("Transmitting response...")
|
||||
await self._transmit_for_connection(quic_conn, addr)
|
||||
|
||||
# Check if handshake completed (with minimal locking)
|
||||
@ -378,16 +371,13 @@ class QUICListener(IListener):
|
||||
hasattr(quic_conn, "_handshake_complete")
|
||||
and quic_conn._handshake_complete
|
||||
):
|
||||
print("PENDING: Handshake completed, promoting connection")
|
||||
logger.debug("PENDING: Handshake completed, promoting connection")
|
||||
await self._promote_pending_connection(quic_conn, addr, dest_cid)
|
||||
else:
|
||||
print("Handshake still in progress")
|
||||
logger.debug("Handshake still in progress")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling pending connection {dest_cid.hex()}: {e}")
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
|
||||
async def _send_version_negotiation(
|
||||
self, addr: tuple[str, int], source_cid: bytes
|
||||
@ -520,9 +510,6 @@ class QUICListener(IListener):
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling new connection from {addr}: {e}")
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
self._stats["connections_rejected"] += 1
|
||||
return None
|
||||
|
||||
@ -531,12 +518,11 @@ class QUICListener(IListener):
|
||||
) -> None:
|
||||
"""Handle short header packets for established connections."""
|
||||
try:
|
||||
print(f" SHORT_HDR: Handling short header packet from {addr}")
|
||||
logger.debug(f" SHORT_HDR: Handling short header packet from {addr}")
|
||||
|
||||
# First, try address-based lookup
|
||||
dest_cid = self._addr_to_cid.get(addr)
|
||||
if dest_cid and dest_cid in self._connections:
|
||||
print(f"SHORT_HDR: Routing via address mapping to {dest_cid.hex()}")
|
||||
connection = self._connections[dest_cid]
|
||||
await self._route_to_connection(connection, data, addr)
|
||||
return
|
||||
@ -546,7 +532,6 @@ class QUICListener(IListener):
|
||||
potential_cid = data[1:9]
|
||||
|
||||
if potential_cid in self._connections:
|
||||
print(f"SHORT_HDR: Routing via extracted CID {potential_cid.hex()}")
|
||||
connection = self._connections[potential_cid]
|
||||
|
||||
# Update mappings for future packets
|
||||
@ -556,7 +541,7 @@ class QUICListener(IListener):
|
||||
await self._route_to_connection(connection, data, addr)
|
||||
return
|
||||
|
||||
print(f"❌ SHORT_HDR: No matching connection found for {addr}")
|
||||
logger.debug(f"❌ SHORT_HDR: No matching connection found for {addr}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling short header packet from {addr}: {e}")
|
||||
@ -593,7 +578,7 @@ class QUICListener(IListener):
|
||||
quic_conn.receive_datagram(data, addr, now=time.time())
|
||||
|
||||
if quic_conn.tls:
|
||||
print(f"TLS state after: {quic_conn.tls.state}")
|
||||
logger.debug(f"TLS state after: {quic_conn.tls.state}")
|
||||
|
||||
# Process events - this is crucial for handshake progression
|
||||
await self._process_quic_events(quic_conn, addr, dest_cid)
|
||||
@ -608,9 +593,6 @@ class QUICListener(IListener):
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling pending connection {dest_cid.hex()}: {e}")
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
|
||||
# Remove problematic pending connection
|
||||
logger.error(f"Removing problematic connection {dest_cid.hex()}")
|
||||
@ -668,7 +650,7 @@ class QUICListener(IListener):
|
||||
await connection._handle_stream_reset(event)
|
||||
|
||||
elif isinstance(event, events.ConnectionIdIssued):
|
||||
print(
|
||||
logger.debug(
|
||||
f"QUIC EVENT: Connection ID issued: {event.connection_id.hex()}"
|
||||
)
|
||||
# Add new CID to the same address mapping
|
||||
@ -681,7 +663,7 @@ class QUICListener(IListener):
|
||||
)
|
||||
|
||||
elif isinstance(event, events.ConnectionIdRetired):
|
||||
print(f"EVENT: Connection ID retired: {event.connection_id.hex()}")
|
||||
logger.info(f"Connection ID retired: {event.connection_id.hex()}")
|
||||
retired_cid = event.connection_id
|
||||
if retired_cid in self._cid_to_addr:
|
||||
addr = self._cid_to_addr[retired_cid]
|
||||
@ -690,18 +672,10 @@ class QUICListener(IListener):
|
||||
if self._addr_to_cid.get(addr) == retired_cid:
|
||||
del self._addr_to_cid[addr]
|
||||
else:
|
||||
print(f" EVENT: Unhandled event type: {type(event).__name__}")
|
||||
|
||||
if events_processed == 0:
|
||||
print(" EVENT: No events to process")
|
||||
else:
|
||||
print(f" EVENT: Processed {events_processed} events total")
|
||||
logger.warning(f"Unhandled event type: {type(event).__name__}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ EVENT: Error processing events: {e}")
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
logger.debug(f"❌ EVENT: Error processing events: {e}")
|
||||
|
||||
async def _promote_pending_connection(
|
||||
self, quic_conn: QuicConnection, addr: tuple[str, int], dest_cid: bytes
|
||||
@ -773,7 +747,7 @@ class QUICListener(IListener):
|
||||
logger.debug(f"Successfully added connection {dest_cid.hex()} to swarm")
|
||||
|
||||
try:
|
||||
print(f"Invoking user callback {dest_cid.hex()}")
|
||||
logger.debug(f"Invoking user callback {dest_cid.hex()}")
|
||||
await self._handler(connection)
|
||||
|
||||
except Exception as e:
|
||||
@ -826,7 +800,7 @@ class QUICListener(IListener):
|
||||
) -> None:
|
||||
"""Enhanced transmission diagnostics to analyze datagram content."""
|
||||
try:
|
||||
print(f" TRANSMIT: Starting transmission to {addr}")
|
||||
logger.debug(f" TRANSMIT: Starting transmission to {addr}")
|
||||
|
||||
# Get current timestamp for timing
|
||||
import time
|
||||
@ -834,17 +808,17 @@ class QUICListener(IListener):
|
||||
now = time.time()
|
||||
|
||||
datagrams = quic_conn.datagrams_to_send(now=now)
|
||||
print(f" TRANSMIT: Got {len(datagrams)} datagrams to send")
|
||||
logger.debug(f" TRANSMIT: Got {len(datagrams)} datagrams to send")
|
||||
|
||||
if not datagrams:
|
||||
print("⚠️ TRANSMIT: No datagrams to send")
|
||||
logger.debug("⚠️ TRANSMIT: No datagrams to send")
|
||||
return
|
||||
|
||||
for i, (datagram, dest_addr) in enumerate(datagrams):
|
||||
print(f" TRANSMIT: Analyzing datagram {i}")
|
||||
print(f" TRANSMIT: Datagram size: {len(datagram)} bytes")
|
||||
print(f" TRANSMIT: Destination: {dest_addr}")
|
||||
print(f" TRANSMIT: Expected destination: {addr}")
|
||||
logger.debug(f" TRANSMIT: Analyzing datagram {i}")
|
||||
logger.debug(f" TRANSMIT: Datagram size: {len(datagram)} bytes")
|
||||
logger.debug(f" TRANSMIT: Destination: {dest_addr}")
|
||||
logger.debug(f" TRANSMIT: Expected destination: {addr}")
|
||||
|
||||
# Analyze datagram content
|
||||
if len(datagram) > 0:
|
||||
@ -862,7 +836,7 @@ class QUICListener(IListener):
|
||||
break
|
||||
|
||||
if not crypto_frame_found:
|
||||
print("❌ TRANSMIT: NO CRYPTO frame found in datagram!")
|
||||
logger.error("No CRYPTO frame found in datagram!")
|
||||
# Look for other frame types
|
||||
frame_types_found = set()
|
||||
for offset in range(len(datagram)):
|
||||
@ -876,25 +850,13 @@ class QUICListener(IListener):
|
||||
|
||||
if self._socket:
|
||||
try:
|
||||
print(f" TRANSMIT: Sending datagram {i} via socket...")
|
||||
await self._socket.sendto(datagram, addr)
|
||||
print(f"TRANSMIT: Successfully sent datagram {i}")
|
||||
except Exception as send_error:
|
||||
print(f"❌ TRANSMIT: Socket send failed: {send_error}")
|
||||
logger.error(f"Socket send failed: {send_error}")
|
||||
else:
|
||||
print("❌ TRANSMIT: No socket available!")
|
||||
|
||||
# Check if there are more datagrams after sending
|
||||
remaining_datagrams = quic_conn.datagrams_to_send(now=time.time())
|
||||
logger.debug(
|
||||
f" TRANSMIT: After sending, {len(remaining_datagrams)} datagrams remain"
|
||||
)
|
||||
|
||||
logger.error("No socket available!")
|
||||
except Exception as e:
|
||||
print(f"❌ TRANSMIT: Transmission error: {e}")
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
logger.debug(f"Transmission error: {e}")
|
||||
|
||||
async def listen(self, maddr: Multiaddr, nursery: trio.Nursery) -> bool:
|
||||
"""Start listening on the given multiaddr with enhanced connection handling."""
|
||||
|
||||
Reference in New Issue
Block a user