From 721da9364e52d25c2d186f63e4df32eba6fd79de Mon Sep 17 00:00:00 2001 From: parth-soni07 Date: Sun, 21 Sep 2025 01:36:06 +0530 Subject: [PATCH] Fixed variable imports --- libp2p/relay/circuit_v2/config.py | 32 +++++++++++++++++++++ libp2p/relay/circuit_v2/dcutr.py | 42 +++++++++++++++++++--------- libp2p/relay/circuit_v2/discovery.py | 32 +++++++++++++-------- libp2p/relay/circuit_v2/protocol.py | 40 ++++++++++++++++---------- libp2p/relay/circuit_v2/transport.py | 2 ++ newsfragments/917.internal.rst | 11 ++++++++ 6 files changed, 121 insertions(+), 38 deletions(-) create mode 100644 newsfragments/917.internal.rst diff --git a/libp2p/relay/circuit_v2/config.py b/libp2p/relay/circuit_v2/config.py index 8eafbe91..d56839e0 100644 --- a/libp2p/relay/circuit_v2/config.py +++ b/libp2p/relay/circuit_v2/config.py @@ -46,6 +46,35 @@ MAX_AUTO_RELAY_ATTEMPTS = 3 RESERVATION_REFRESH_THRESHOLD = 0.8 # Refresh at 80% of TTL MAX_CONCURRENT_RESERVATIONS = 2 +# Timeout constants for different components +DEFAULT_DISCOVERY_STREAM_TIMEOUT = 10 # seconds +DEFAULT_PEER_PROTOCOL_TIMEOUT = 5 # seconds +DEFAULT_PROTOCOL_READ_TIMEOUT = 15 # seconds +DEFAULT_PROTOCOL_WRITE_TIMEOUT = 15 # seconds +DEFAULT_PROTOCOL_CLOSE_TIMEOUT = 10 # seconds +DEFAULT_DCUTR_READ_TIMEOUT = 30 # seconds +DEFAULT_DCUTR_WRITE_TIMEOUT = 30 # seconds +DEFAULT_DIAL_TIMEOUT = 10 # seconds + + +@dataclass +class TimeoutConfig: + """Timeout configuration for different Circuit Relay v2 components.""" + + # Discovery timeouts + discovery_stream_timeout: int = DEFAULT_DISCOVERY_STREAM_TIMEOUT + peer_protocol_timeout: int = DEFAULT_PEER_PROTOCOL_TIMEOUT + + # Core protocol timeouts + protocol_read_timeout: int = DEFAULT_PROTOCOL_READ_TIMEOUT + protocol_write_timeout: int = DEFAULT_PROTOCOL_WRITE_TIMEOUT + protocol_close_timeout: int = DEFAULT_PROTOCOL_CLOSE_TIMEOUT + + # DCUtR timeouts + dcutr_read_timeout: int = DEFAULT_DCUTR_READ_TIMEOUT + dcutr_write_timeout: int = DEFAULT_DCUTR_WRITE_TIMEOUT + dial_timeout: int = DEFAULT_DIAL_TIMEOUT + # Relay roles enum class RelayRole(Flag): @@ -83,6 +112,9 @@ class RelayConfig: max_circuit_duration: int = DEFAULT_MAX_CIRCUIT_DURATION max_circuit_bytes: int = DEFAULT_MAX_CIRCUIT_BYTES + # Timeout configuration + timeouts: TimeoutConfig = field(default_factory=TimeoutConfig) + # --------------------------------------------------------------------- # Backwards-compat boolean helpers. Existing code that still accesses # ``cfg.enable_hop, cfg.enable_stop, cfg.enable_client`` will continue to work. diff --git a/libp2p/relay/circuit_v2/dcutr.py b/libp2p/relay/circuit_v2/dcutr.py index 2cece5d2..644ea75f 100644 --- a/libp2p/relay/circuit_v2/dcutr.py +++ b/libp2p/relay/circuit_v2/dcutr.py @@ -29,6 +29,11 @@ from libp2p.peer.id import ( from libp2p.peer.peerinfo import ( PeerInfo, ) +from libp2p.relay.circuit_v2.config import ( + DEFAULT_DCUTR_READ_TIMEOUT, + DEFAULT_DCUTR_WRITE_TIMEOUT, + DEFAULT_DIAL_TIMEOUT, +) from libp2p.relay.circuit_v2.nat import ( ReachabilityChecker, ) @@ -47,11 +52,7 @@ PROTOCOL_ID = TProtocol("/libp2p/dcutr") # Maximum message size for DCUtR (4KiB as per spec) MAX_MESSAGE_SIZE = 4 * 1024 -# Timeouts -STREAM_READ_TIMEOUT = 30 # seconds -STREAM_WRITE_TIMEOUT = 30 # seconds -DIAL_TIMEOUT = 10 # seconds - +# DCUtR protocol constants # Maximum number of hole punch attempts per peer MAX_HOLE_PUNCH_ATTEMPTS = 5 @@ -70,7 +71,13 @@ class DCUtRProtocol(Service): hole punching, after they have established an initial connection through a relay. """ - def __init__(self, host: IHost): + def __init__( + self, + host: IHost, + read_timeout: int = DEFAULT_DCUTR_READ_TIMEOUT, + write_timeout: int = DEFAULT_DCUTR_WRITE_TIMEOUT, + dial_timeout: int = DEFAULT_DIAL_TIMEOUT, + ): """ Initialize the DCUtR protocol. @@ -78,10 +85,19 @@ class DCUtRProtocol(Service): ---------- host : IHost The libp2p host this protocol is running on + read_timeout : int + Timeout for stream read operations, in seconds + write_timeout : int + Timeout for stream write operations, in seconds + dial_timeout : int + Timeout for dial operations, in seconds """ super().__init__() self.host = host + self.read_timeout = read_timeout + self.write_timeout = write_timeout + self.dial_timeout = dial_timeout self.event_started = trio.Event() self._hole_punch_attempts: dict[ID, int] = {} self._direct_connections: set[ID] = set() @@ -161,7 +177,7 @@ class DCUtRProtocol(Service): try: # Read the CONNECT message - with trio.fail_after(STREAM_READ_TIMEOUT): + with trio.fail_after(self.read_timeout): msg_bytes = await stream.read(MAX_MESSAGE_SIZE) # Parse the message @@ -196,7 +212,7 @@ class DCUtRProtocol(Service): response.type = HolePunch.CONNECT response.ObsAddrs.extend(our_addrs) - with trio.fail_after(STREAM_WRITE_TIMEOUT): + with trio.fail_after(self.write_timeout): await stream.write(response.SerializeToString()) logger.debug( @@ -206,7 +222,7 @@ class DCUtRProtocol(Service): ) # Wait for SYNC message - with trio.fail_after(STREAM_READ_TIMEOUT): + with trio.fail_after(self.read_timeout): sync_bytes = await stream.read(MAX_MESSAGE_SIZE) # Parse the SYNC message @@ -300,7 +316,7 @@ class DCUtRProtocol(Service): connect_msg.ObsAddrs.extend(our_addrs) start_time = time.time() - with trio.fail_after(STREAM_WRITE_TIMEOUT): + with trio.fail_after(self.write_timeout): await stream.write(connect_msg.SerializeToString()) logger.debug( @@ -310,7 +326,7 @@ class DCUtRProtocol(Service): ) # Receive the peer's CONNECT message - with trio.fail_after(STREAM_READ_TIMEOUT): + with trio.fail_after(self.read_timeout): resp_bytes = await stream.read(MAX_MESSAGE_SIZE) # Calculate RTT @@ -349,7 +365,7 @@ class DCUtRProtocol(Service): sync_msg = HolePunch() sync_msg.type = HolePunch.SYNC - with trio.fail_after(STREAM_WRITE_TIMEOUT): + with trio.fail_after(self.write_timeout): await stream.write(sync_msg.SerializeToString()) logger.debug("Sent SYNC message to %s", peer_id) @@ -468,7 +484,7 @@ class DCUtRProtocol(Service): peer_info = PeerInfo(peer_id, [addr]) # Try to connect with timeout - with trio.fail_after(DIAL_TIMEOUT): + with trio.fail_after(self.dial_timeout): await self.host.connect(peer_info) logger.info("Successfully connected to %s at %s", peer_id, addr) diff --git a/libp2p/relay/circuit_v2/discovery.py b/libp2p/relay/circuit_v2/discovery.py index 45775647..50ee8d90 100644 --- a/libp2p/relay/circuit_v2/discovery.py +++ b/libp2p/relay/circuit_v2/discovery.py @@ -31,6 +31,11 @@ from libp2p.tools.async_service import ( Service, ) +from .config import ( + DEFAULT_DISCOVERY_INTERVAL, + DEFAULT_DISCOVERY_STREAM_TIMEOUT, + DEFAULT_PEER_PROTOCOL_TIMEOUT, +) from .pb.circuit_pb2 import ( HopMessage, ) @@ -43,11 +48,8 @@ from .protocol_buffer import ( logger = logging.getLogger("libp2p.relay.circuit_v2.discovery") -# Constants +# Discovery constants MAX_RELAYS_TO_TRACK = 10 -DEFAULT_DISCOVERY_INTERVAL = 60 # seconds -STREAM_TIMEOUT = 10 # seconds -PEER_PROTOCOL_TIMEOUT = 5 # seconds # Extended interfaces for type checking @@ -87,6 +89,8 @@ class RelayDiscovery(Service): auto_reserve: bool = False, discovery_interval: int = DEFAULT_DISCOVERY_INTERVAL, max_relays: int = MAX_RELAYS_TO_TRACK, + stream_timeout: int = DEFAULT_DISCOVERY_STREAM_TIMEOUT, + peer_protocol_timeout: int = DEFAULT_PEER_PROTOCOL_TIMEOUT, ) -> None: """ Initialize the discovery service. @@ -101,6 +105,10 @@ class RelayDiscovery(Service): How often to run discovery, in seconds max_relays : int Maximum number of relays to track + stream_timeout : int + Timeout for stream operations during discovery, in seconds + peer_protocol_timeout : int + Timeout for checking peer protocol support, in seconds """ super().__init__() @@ -108,6 +116,8 @@ class RelayDiscovery(Service): self.auto_reserve = auto_reserve self.discovery_interval = discovery_interval self.max_relays = max_relays + self.stream_timeout = stream_timeout + self.peer_protocol_timeout = peer_protocol_timeout self._discovered_relays: dict[ID, RelayInfo] = {} self._protocol_cache: dict[ ID, set[str] @@ -167,19 +177,19 @@ class RelayDiscovery(Service): continue # Don't wait too long for protocol info - with trio.move_on_after(PEER_PROTOCOL_TIMEOUT): + with trio.move_on_after(self.peer_protocol_timeout): if await self._supports_relay_protocol(peer_id): await self._add_relay(peer_id) # Limit number of relays we track - if len(self._discovered_relays) > MAX_RELAYS_TO_TRACK: + if len(self._discovered_relays) > self.max_relays: # Sort by last seen time and keep only the most recent ones sorted_relays = sorted( self._discovered_relays.items(), key=lambda x: x[1].last_seen, reverse=True, ) - to_remove = sorted_relays[MAX_RELAYS_TO_TRACK:] + to_remove = sorted_relays[self.max_relays :] for peer_id, _ in to_remove: del self._discovered_relays[peer_id] @@ -265,7 +275,7 @@ class RelayDiscovery(Service): async def _check_via_direct_connection(self, peer_id: ID) -> bool | None: """Check protocol support via direct connection.""" try: - with trio.fail_after(STREAM_TIMEOUT): + with trio.fail_after(self.stream_timeout): stream = await self.host.new_stream(peer_id, [PROTOCOL_ID]) if stream: await stream.close() @@ -371,7 +381,7 @@ class RelayDiscovery(Service): # Open a stream to the relay with timeout try: - with trio.fail_after(STREAM_TIMEOUT): + with trio.fail_after(self.stream_timeout): stream = await self.host.new_stream(peer_id, [PROTOCOL_ID]) if not stream: logger.error("Failed to open stream to relay %s", peer_id) @@ -387,7 +397,7 @@ class RelayDiscovery(Service): peer=self.host.get_id().to_bytes(), ) - with trio.fail_after(STREAM_TIMEOUT): + with trio.fail_after(self.stream_timeout): await stream.write(request.SerializeToString()) # Wait for response @@ -464,7 +474,7 @@ class RelayDiscovery(Service): for peer_id, relay_info in self._discovered_relays.items(): # Check if relay hasn't been seen in a while (3x discovery interval) - if now - relay_info.last_seen > DEFAULT_DISCOVERY_INTERVAL * 3: + if now - relay_info.last_seen > self.discovery_interval * 3: to_remove.append(peer_id) continue diff --git a/libp2p/relay/circuit_v2/protocol.py b/libp2p/relay/circuit_v2/protocol.py index 3c378897..a6a80c20 100644 --- a/libp2p/relay/circuit_v2/protocol.py +++ b/libp2p/relay/circuit_v2/protocol.py @@ -43,6 +43,9 @@ from .config import ( DEFAULT_MAX_CIRCUIT_CONNS, DEFAULT_MAX_CIRCUIT_DURATION, DEFAULT_MAX_RESERVATIONS, + DEFAULT_PROTOCOL_CLOSE_TIMEOUT, + DEFAULT_PROTOCOL_READ_TIMEOUT, + DEFAULT_PROTOCOL_WRITE_TIMEOUT, ) from .pb.circuit_pb2 import ( HopMessage, @@ -80,10 +83,7 @@ DEFAULT_RELAY_LIMITS = RelayLimits( max_reservations=DEFAULT_MAX_RESERVATIONS, ) -# Stream operation timeouts -STREAM_READ_TIMEOUT = 15 # seconds -STREAM_WRITE_TIMEOUT = 15 # seconds -STREAM_CLOSE_TIMEOUT = 10 # seconds +# Stream operation constants MAX_READ_RETRIES = 5 # Maximum number of read retries @@ -127,6 +127,9 @@ class CircuitV2Protocol(Service): host: IHost, limits: RelayLimits | None = None, allow_hop: bool = False, + read_timeout: int = DEFAULT_PROTOCOL_READ_TIMEOUT, + write_timeout: int = DEFAULT_PROTOCOL_WRITE_TIMEOUT, + close_timeout: int = DEFAULT_PROTOCOL_CLOSE_TIMEOUT, ) -> None: """ Initialize a Circuit Relay v2 protocol instance. @@ -139,11 +142,20 @@ class CircuitV2Protocol(Service): Resource limits for the relay allow_hop : bool Whether to allow this node to act as a relay + read_timeout : int + Timeout for stream read operations, in seconds + write_timeout : int + Timeout for stream write operations, in seconds + close_timeout : int + Timeout for stream close operations, in seconds """ self.host = host self.limits = limits or DEFAULT_RELAY_LIMITS self.allow_hop = allow_hop + self.read_timeout = read_timeout + self.write_timeout = write_timeout + self.close_timeout = close_timeout self.resource_manager = RelayResourceManager(self.limits) self._active_relays: dict[ID, tuple[INetStream, INetStream | None]] = {} self.event_started = trio.Event() @@ -188,7 +200,7 @@ class CircuitV2Protocol(Service): return try: - with trio.fail_after(STREAM_CLOSE_TIMEOUT): + with trio.fail_after(self.close_timeout): await stream.close() except Exception: try: @@ -230,7 +242,7 @@ class CircuitV2Protocol(Service): while retries < max_retries: try: - with trio.fail_after(STREAM_READ_TIMEOUT): + with trio.fail_after(self.read_timeout): # Try reading with timeout logger.debug( "Attempting to read from stream (attempt %d/%d)", @@ -307,7 +319,7 @@ class CircuitV2Protocol(Service): # First, handle the read timeout gracefully try: with trio.fail_after( - STREAM_READ_TIMEOUT * 2 + self.read_timeout * 2 ): # Double the timeout for reading msg_bytes = await stream.read() if not msg_bytes: @@ -428,7 +440,7 @@ class CircuitV2Protocol(Service): """ try: # Read the incoming message with timeout - with trio.fail_after(STREAM_READ_TIMEOUT): + with trio.fail_after(self.read_timeout): msg_bytes = await stream.read() stop_msg = StopMessage() stop_msg.ParseFromString(msg_bytes) @@ -535,7 +547,7 @@ class CircuitV2Protocol(Service): ttl = self.resource_manager.reserve(peer_id) # Send reservation success response - with trio.fail_after(STREAM_WRITE_TIMEOUT): + with trio.fail_after(self.write_timeout): status = create_status( code=StatusCode.OK, message="Reservation accepted" ) @@ -586,7 +598,7 @@ class CircuitV2Protocol(Service): # Always close the stream when done with reservation if cast(INetStreamWithExtras, stream).is_open(): try: - with trio.fail_after(STREAM_CLOSE_TIMEOUT): + with trio.fail_after(self.close_timeout): await stream.close() except Exception as close_err: logger.error("Error closing stream: %s", str(close_err)) @@ -622,7 +634,7 @@ class CircuitV2Protocol(Service): self._active_relays[peer_id] = (stream, None) # Try to connect to the destination with timeout - with trio.fail_after(STREAM_READ_TIMEOUT): + with trio.fail_after(self.read_timeout): dst_stream = await self.host.new_stream(peer_id, [STOP_PROTOCOL_ID]) if not dst_stream: raise ConnectionError("Could not connect to destination") @@ -751,7 +763,7 @@ class CircuitV2Protocol(Service): # Write data with timeout try: - with trio.fail_after(STREAM_WRITE_TIMEOUT): + with trio.fail_after(self.write_timeout): await dst_stream.write(data) except trio.TooSlowError: logger.error("Timeout writing in %s", direction.name) @@ -786,7 +798,7 @@ class CircuitV2Protocol(Service): """Send a status message.""" try: logger.debug("Sending status message with code %s: %s", code, message) - with trio.fail_after(STREAM_WRITE_TIMEOUT * 2): # Double the timeout + with trio.fail_after(self.write_timeout * 2): # Double the timeout # Create a proto Status directly pb_status = PbStatus() pb_status.code = cast( @@ -824,7 +836,7 @@ class CircuitV2Protocol(Service): """Send a status message on a STOP stream.""" try: logger.debug("Sending stop status message with code %s: %s", code, message) - with trio.fail_after(STREAM_WRITE_TIMEOUT * 2): # Double the timeout + with trio.fail_after(self.write_timeout * 2): # Double the timeout # Create a proto Status directly pb_status = PbStatus() pb_status.code = cast( diff --git a/libp2p/relay/circuit_v2/transport.py b/libp2p/relay/circuit_v2/transport.py index ffd31090..3632615a 100644 --- a/libp2p/relay/circuit_v2/transport.py +++ b/libp2p/relay/circuit_v2/transport.py @@ -89,6 +89,8 @@ class CircuitV2Transport(ITransport): auto_reserve=config.enable_client, discovery_interval=config.discovery_interval, max_relays=config.max_relays, + stream_timeout=config.timeouts.discovery_stream_timeout, + peer_protocol_timeout=config.timeouts.peer_protocol_timeout, ) async def dial( diff --git a/newsfragments/917.internal.rst b/newsfragments/917.internal.rst new file mode 100644 index 00000000..ed06f3ed --- /dev/null +++ b/newsfragments/917.internal.rst @@ -0,0 +1,11 @@ +Replace magic numbers with named constants and enums for clarity and maintainability + +**Key Changes:** +- **Introduced type-safe enums** for better code clarity: + - `RelayRole(Flag)` enum with HOP, STOP, CLIENT roles supporting bitwise combinations (e.g., `RelayRole.HOP | RelayRole.STOP`) + - `ReservationStatus(Enum)` for reservation lifecycle management (ACTIVE, EXPIRED, REJECTED) +- **Replaced magic numbers with named constants** throughout the codebase, improving code maintainability and eliminating hardcoded timeout values (15s, 30s, 10s) with descriptive constant names +- **Added comprehensive timeout configuration system** with new `TimeoutConfig` dataclass supporting component-specific timeouts (discovery, protocol, DCUtR) +- **Enhanced configurability** of `RelayDiscovery`, `CircuitV2Protocol`, and `DCUtRProtocol` constructors with optional timeout parameters +- **Improved architecture consistency** with clean configuration flow across all circuit relay components +**Backward Compatibility:** All changes maintain full backward compatibility. Existing code continues to work unchanged while new timeout configuration options are available for users who need them.