From 4a36d6efeb276cb55111e4a16714203fd1fff78b Mon Sep 17 00:00:00 2001 From: parth-soni07 Date: Tue, 9 Sep 2025 13:24:07 +0530 Subject: [PATCH 1/8] Replace magic number with named constants --- libp2p/relay/circuit_v2/config.py | 123 +++++++++++++++++++++------ libp2p/relay/circuit_v2/dcutr.py | 11 +-- libp2p/relay/circuit_v2/discovery.py | 20 +++-- libp2p/relay/circuit_v2/protocol.py | 73 ++++++++++++---- libp2p/relay/circuit_v2/resources.py | 26 +++++- 5 files changed, 194 insertions(+), 59 deletions(-) diff --git a/libp2p/relay/circuit_v2/config.py b/libp2p/relay/circuit_v2/config.py index 3315c74f..70046c6a 100644 --- a/libp2p/relay/circuit_v2/config.py +++ b/libp2p/relay/circuit_v2/config.py @@ -9,6 +9,7 @@ from dataclasses import ( dataclass, field, ) +from enum import Flag, auto from libp2p.peer.peerinfo import ( PeerInfo, @@ -18,29 +19,95 @@ from .resources import ( RelayLimits, ) +DEFAULT_MIN_RELAYS = 3 +DEFAULT_MAX_RELAYS = 20 +DEFAULT_DISCOVERY_INTERVAL = 300 # seconds +DEFAULT_RESERVATION_TTL = 3600 # seconds +DEFAULT_MAX_CIRCUIT_DURATION = 3600 # seconds +DEFAULT_MAX_CIRCUIT_BYTES = 1024 * 1024 * 1024 # 1GB + +DEFAULT_MAX_CIRCUIT_CONNS = 8 +DEFAULT_MAX_RESERVATIONS = 4 + +MAX_RESERVATIONS_PER_IP = 8 +MAX_CIRCUITS_PER_IP = 16 +RESERVATION_RATE_PER_IP = 4 # per minute +CIRCUIT_RATE_PER_IP = 8 # per minute +MAX_CIRCUITS_TOTAL = 64 +MAX_RESERVATIONS_TOTAL = 32 +MAX_BANDWIDTH_PER_CIRCUIT = 1024 * 1024 # 1MB/s +MAX_BANDWIDTH_TOTAL = 10 * 1024 * 1024 # 10MB/s + +MIN_RELAY_SCORE = 0.5 +MAX_RELAY_LATENCY = 1.0 # seconds +ENABLE_AUTO_RELAY = True +AUTO_RELAY_TIMEOUT = 30 # seconds +MAX_AUTO_RELAY_ATTEMPTS = 3 +RESERVATION_REFRESH_THRESHOLD = 0.8 # Refresh at 80% of TTL +MAX_CONCURRENT_RESERVATIONS = 2 + +# Shared timeout constants (used across modules) +STREAM_READ_TIMEOUT = 15 # seconds +STREAM_WRITE_TIMEOUT = 15 # seconds +STREAM_CLOSE_TIMEOUT = 10 # seconds +DIAL_TIMEOUT = 10 # seconds + +# NAT reachability timeout +REACHABILITY_TIMEOUT = 10 # seconds + +# Relay roles enum ----------------------------------------------------------- + + +class RelayRole(Flag): + """ + Bit-flag enum that captures the three possible relay capabilities. + + A node can combine multiple roles using bit-wise OR, for example:: + + RelayRole.HOP | RelayRole.STOP + """ + + HOP = auto() # Act as a relay for others ("hop") + STOP = auto() # Accept relayed connections ("stop") + CLIENT = auto() # Dial through existing relays ("client") + -@dataclass class RelayConfig: """Configuration for Circuit Relay v2.""" - # Role configuration - enable_hop: bool = False # Whether to act as a relay (hop) - enable_stop: bool = True # Whether to accept relayed connections (stop) - enable_client: bool = True # Whether to use relays for dialing + # Role configuration (bit-flags) + roles: RelayRole = RelayRole.STOP | RelayRole.CLIENT # Resource limits limits: RelayLimits | None = None # Discovery configuration bootstrap_relays: list[PeerInfo] = field(default_factory=list) - min_relays: int = 3 - max_relays: int = 20 - discovery_interval: int = 300 # seconds + min_relays: int = DEFAULT_MIN_RELAYS + max_relays: int = DEFAULT_MAX_RELAYS + discovery_interval: int = DEFAULT_DISCOVERY_INTERVAL # Connection configuration - reservation_ttl: int = 3600 # seconds - max_circuit_duration: int = 3600 # seconds - max_circuit_bytes: int = 1024 * 1024 * 1024 # 1GB + reservation_ttl: int = DEFAULT_RESERVATION_TTL + max_circuit_duration: int = DEFAULT_MAX_CIRCUIT_DURATION + max_circuit_bytes: int = DEFAULT_MAX_CIRCUIT_BYTES + + # --------------------------------------------------------------------- + # Backwards-compat boolean helpers. Existing code that still accesses + # ``cfg.enable_hop, cfg.enable_stop, cfg.enable_client`` will continue to work. + # --------------------------------------------------------------------- + + @property + def enable_hop(self) -> bool: # pragma: no cover – helper + return bool(self.roles & RelayRole.HOP) + + @property + def enable_stop(self) -> bool: # pragma: no cover – helper + return bool(self.roles & RelayRole.STOP) + + @property + def enable_client(self) -> bool: # pragma: no cover – helper + return bool(self.roles & RelayRole.CLIENT) def __post_init__(self) -> None: """Initialize default values.""" @@ -48,8 +115,8 @@ class RelayConfig: self.limits = RelayLimits( duration=self.max_circuit_duration, data=self.max_circuit_bytes, - max_circuit_conns=8, - max_reservations=4, + max_circuit_conns=DEFAULT_MAX_CIRCUIT_CONNS, + max_reservations=DEFAULT_MAX_RESERVATIONS, ) @@ -58,20 +125,20 @@ class HopConfig: """Configuration specific to relay (hop) nodes.""" # Resource limits per IP - max_reservations_per_ip: int = 8 - max_circuits_per_ip: int = 16 + max_reservations_per_ip: int = MAX_RESERVATIONS_PER_IP + max_circuits_per_ip: int = MAX_CIRCUITS_PER_IP # Rate limiting - reservation_rate_per_ip: int = 4 # per minute - circuit_rate_per_ip: int = 8 # per minute + reservation_rate_per_ip: int = RESERVATION_RATE_PER_IP + circuit_rate_per_ip: int = CIRCUIT_RATE_PER_IP # Resource quotas - max_circuits_total: int = 64 - max_reservations_total: int = 32 + max_circuits_total: int = MAX_CIRCUITS_TOTAL + max_reservations_total: int = MAX_RESERVATIONS_TOTAL # Bandwidth limits - max_bandwidth_per_circuit: int = 1024 * 1024 # 1MB/s - max_bandwidth_total: int = 10 * 1024 * 1024 # 10MB/s + max_bandwidth_per_circuit: int = MAX_BANDWIDTH_PER_CIRCUIT + max_bandwidth_total: int = MAX_BANDWIDTH_TOTAL @dataclass @@ -79,14 +146,14 @@ class ClientConfig: """Configuration specific to relay clients.""" # Relay selection - min_relay_score: float = 0.5 - max_relay_latency: float = 1.0 # seconds + min_relay_score: float = MIN_RELAY_SCORE + max_relay_latency: float = MAX_RELAY_LATENCY # Auto-relay settings - enable_auto_relay: bool = True - auto_relay_timeout: int = 30 # seconds - max_auto_relay_attempts: int = 3 + enable_auto_relay: bool = ENABLE_AUTO_RELAY + auto_relay_timeout: int = AUTO_RELAY_TIMEOUT + max_auto_relay_attempts: int = MAX_AUTO_RELAY_ATTEMPTS # Reservation management - reservation_refresh_threshold: float = 0.8 # Refresh at 80% of TTL - max_concurrent_reservations: int = 2 + reservation_refresh_threshold: float = RESERVATION_REFRESH_THRESHOLD + max_concurrent_reservations: int = MAX_CONCURRENT_RESERVATIONS diff --git a/libp2p/relay/circuit_v2/dcutr.py b/libp2p/relay/circuit_v2/dcutr.py index 2cece5d2..a67ddd5e 100644 --- a/libp2p/relay/circuit_v2/dcutr.py +++ b/libp2p/relay/circuit_v2/dcutr.py @@ -39,6 +39,12 @@ from libp2p.tools.async_service import ( Service, ) +from .config import ( + DIAL_TIMEOUT, + STREAM_READ_TIMEOUT, + STREAM_WRITE_TIMEOUT, +) + logger = logging.getLogger(__name__) # Protocol ID for DCUtR @@ -47,11 +53,6 @@ PROTOCOL_ID = TProtocol("/libp2p/dcutr") # Maximum message size for DCUtR (4KiB as per spec) MAX_MESSAGE_SIZE = 4 * 1024 -# Timeouts -STREAM_READ_TIMEOUT = 30 # seconds -STREAM_WRITE_TIMEOUT = 30 # seconds -DIAL_TIMEOUT = 10 # seconds - # Maximum number of hole punch attempts per peer MAX_HOLE_PUNCH_ATTEMPTS = 5 diff --git a/libp2p/relay/circuit_v2/discovery.py b/libp2p/relay/circuit_v2/discovery.py index a35eacdc..798eaa3e 100644 --- a/libp2p/relay/circuit_v2/discovery.py +++ b/libp2p/relay/circuit_v2/discovery.py @@ -31,6 +31,9 @@ from libp2p.tools.async_service import ( Service, ) +from .config import ( + DEFAULT_DISCOVERY_INTERVAL as CFG_DISCOVERY_INTERVAL, +) from .pb.circuit_pb2 import ( HopMessage, ) @@ -43,10 +46,11 @@ from .protocol_buffer import ( logger = logging.getLogger("libp2p.relay.circuit_v2.discovery") -# Constants -MAX_RELAYS_TO_TRACK = 10 -DEFAULT_DISCOVERY_INTERVAL = 60 # seconds +# Constants (single-source-of-truth) +DEFAULT_DISCOVERY_INTERVAL = CFG_DISCOVERY_INTERVAL +MAX_RELAYS_TO_TRACK = 10 # Still discovery-specific STREAM_TIMEOUT = 10 # seconds +PEER_PROTOCOL_TIMEOUT = 5 # seconds # Extended interfaces for type checking @@ -165,20 +169,20 @@ class RelayDiscovery(Service): self._discovered_relays[peer_id].last_seen = time.time() continue - # Check if peer supports the relay protocol - with trio.move_on_after(5): # Don't wait too long for protocol info + # Don't wait too long for protocol info + with trio.move_on_after(PEER_PROTOCOL_TIMEOUT): if await self._supports_relay_protocol(peer_id): await self._add_relay(peer_id) # Limit number of relays we track - if len(self._discovered_relays) > self.max_relays: + if len(self._discovered_relays) > MAX_RELAYS_TO_TRACK: # Sort by last seen time and keep only the most recent ones sorted_relays = sorted( self._discovered_relays.items(), key=lambda x: x[1].last_seen, reverse=True, ) - to_remove = sorted_relays[self.max_relays :] + to_remove = sorted_relays[MAX_RELAYS_TO_TRACK:] for peer_id, _ in to_remove: del self._discovered_relays[peer_id] @@ -463,7 +467,7 @@ class RelayDiscovery(Service): for peer_id, relay_info in self._discovered_relays.items(): # Check if relay hasn't been seen in a while (3x discovery interval) - if now - relay_info.last_seen > self.discovery_interval * 3: + if now - relay_info.last_seen > DEFAULT_DISCOVERY_INTERVAL * 3: to_remove.append(peer_id) continue diff --git a/libp2p/relay/circuit_v2/protocol.py b/libp2p/relay/circuit_v2/protocol.py index 1cf76efa..ae852a1f 100644 --- a/libp2p/relay/circuit_v2/protocol.py +++ b/libp2p/relay/circuit_v2/protocol.py @@ -5,6 +5,7 @@ This module implements the Circuit Relay v2 protocol as specified in: https://github.com/libp2p/specs/blob/master/relay/circuit-v2.md """ +from enum import Enum, auto import logging import time from typing import ( @@ -37,6 +38,15 @@ from libp2p.tools.async_service import ( Service, ) +from .config import ( + DEFAULT_MAX_CIRCUIT_BYTES, + DEFAULT_MAX_CIRCUIT_CONNS, + DEFAULT_MAX_CIRCUIT_DURATION, + DEFAULT_MAX_RESERVATIONS, + STREAM_CLOSE_TIMEOUT, + STREAM_READ_TIMEOUT, + STREAM_WRITE_TIMEOUT, +) from .pb.circuit_pb2 import ( HopMessage, Limit, @@ -58,18 +68,21 @@ logger = logging.getLogger("libp2p.relay.circuit_v2") PROTOCOL_ID = TProtocol("/libp2p/circuit/relay/2.0.0") STOP_PROTOCOL_ID = TProtocol("/libp2p/circuit/relay/2.0.0/stop") -# Default limits for relay resources + +# Direction enum for data piping +class Pipe(Enum): + SRC_TO_DST = auto() + DST_TO_SRC = auto() + + +# Default limits for relay resources (single source of truth) DEFAULT_RELAY_LIMITS = RelayLimits( - duration=60 * 60, # 1 hour - data=1024 * 1024 * 1024, # 1GB - max_circuit_conns=8, - max_reservations=4, + duration=DEFAULT_MAX_CIRCUIT_DURATION, + data=DEFAULT_MAX_CIRCUIT_BYTES, + max_circuit_conns=DEFAULT_MAX_CIRCUIT_CONNS, + max_reservations=DEFAULT_MAX_RESERVATIONS, ) -# Stream operation timeouts -STREAM_READ_TIMEOUT = 15 # seconds -STREAM_WRITE_TIMEOUT = 15 # seconds -STREAM_CLOSE_TIMEOUT = 10 # seconds MAX_READ_RETRIES = 5 # Maximum number of read retries @@ -458,8 +471,20 @@ class CircuitV2Protocol(Service): # Start relaying data async with trio.open_nursery() as nursery: - nursery.start_soon(self._relay_data, src_stream, stream, peer_id) - nursery.start_soon(self._relay_data, stream, src_stream, peer_id) + nursery.start_soon( + self._relay_data, + src_stream, + stream, + peer_id, + Pipe.SRC_TO_DST, + ) + nursery.start_soon( + self._relay_data, + stream, + src_stream, + peer_id, + Pipe.DST_TO_SRC, + ) except trio.TooSlowError: logger.error("Timeout reading from stop stream") @@ -648,8 +673,20 @@ class CircuitV2Protocol(Service): # Start relaying data async with trio.open_nursery() as nursery: - nursery.start_soon(self._relay_data, stream, dst_stream, peer_id) - nursery.start_soon(self._relay_data, dst_stream, stream, peer_id) + nursery.start_soon( + self._relay_data, + stream, + dst_stream, + peer_id, + Pipe.SRC_TO_DST, + ) + nursery.start_soon( + self._relay_data, + dst_stream, + stream, + peer_id, + Pipe.DST_TO_SRC, + ) except (trio.TooSlowError, ConnectionError) as e: logger.error("Error establishing relay connection: %s", str(e)) @@ -685,6 +722,7 @@ class CircuitV2Protocol(Service): src_stream: INetStream, dst_stream: INetStream, peer_id: ID, + direction: Pipe, ) -> None: """ Relay data between two streams. @@ -698,13 +736,16 @@ class CircuitV2Protocol(Service): peer_id : ID ID of the peer being relayed + direction : Pipe + Direction of data flow (``Pipe.SRC_TO_DST`` or ``Pipe.DST_TO_SRC``) + """ try: while True: # Read data with retries data = await self._read_stream_with_retry(src_stream) if not data: - logger.info("Source stream closed/reset") + logger.info("%s closed/reset", direction.name) break # Write data with timeout @@ -712,10 +753,10 @@ class CircuitV2Protocol(Service): with trio.fail_after(STREAM_WRITE_TIMEOUT): await dst_stream.write(data) except trio.TooSlowError: - logger.error("Timeout writing to destination stream") + logger.error("Timeout writing in %s", direction.name) break except Exception as e: - logger.error("Error writing to destination stream: %s", str(e)) + logger.error("Error writing in %s: %s", direction.name, str(e)) break # Update resource usage diff --git a/libp2p/relay/circuit_v2/resources.py b/libp2p/relay/circuit_v2/resources.py index 4da67ec6..bd5d5fe0 100644 --- a/libp2p/relay/circuit_v2/resources.py +++ b/libp2p/relay/circuit_v2/resources.py @@ -8,6 +8,7 @@ including reservations and connection limits. from dataclasses import ( dataclass, ) +from enum import Enum, auto import hashlib import os import time @@ -19,6 +20,18 @@ from libp2p.peer.id import ( # Import the protobuf definitions from .pb.circuit_pb2 import Reservation as PbReservation +RANDOM_BYTES_LENGTH = 16 # 128 bits of randomness +TIMESTAMP_MULTIPLIER = 1000000 # To convert seconds to microseconds + + +# Reservation status enum +class ReservationStatus(Enum): + """Lifecycle status of a relay reservation.""" + + ACTIVE = auto() + EXPIRED = auto() + REJECTED = auto() + @dataclass class RelayLimits: @@ -68,8 +81,8 @@ class Reservation: # - Peer ID to bind it to the specific peer # - Timestamp for uniqueness # - Hash everything for a fixed size output - random_bytes = os.urandom(16) # 128 bits of randomness - timestamp = str(int(self.created_at * 1000000)).encode() + random_bytes = os.urandom(RANDOM_BYTES_LENGTH) + timestamp = str(int(self.created_at * TIMESTAMP_MULTIPLIER)).encode() peer_bytes = self.peer_id.to_bytes() # Combine all elements and hash them @@ -84,6 +97,15 @@ class Reservation: """Check if the reservation has expired.""" return time.time() > self.expires_at + # Expose a friendly status enum -------------------------------------- + + @property + def status(self) -> ReservationStatus: + """Return the current status as a ``ReservationStatus`` enum.""" + return ( + ReservationStatus.EXPIRED if self.is_expired() else ReservationStatus.ACTIVE + ) + def can_accept_connection(self) -> bool: """Check if a new connection can be accepted.""" return ( From 87936675030590f2d8de81c4ce57c36595926d14 Mon Sep 17 00:00:00 2001 From: parth-soni07 Date: Thu, 11 Sep 2025 14:18:40 +0530 Subject: [PATCH 2/8] Updated config & minor changes --- libp2p/relay/circuit_v2/config.py | 13 ++----------- libp2p/relay/circuit_v2/dcutr.py | 11 +++++------ libp2p/relay/circuit_v2/discovery.py | 9 +++------ libp2p/relay/circuit_v2/protocol.py | 9 +++++---- libp2p/relay/circuit_v2/resources.py | 2 +- 5 files changed, 16 insertions(+), 28 deletions(-) diff --git a/libp2p/relay/circuit_v2/config.py b/libp2p/relay/circuit_v2/config.py index 70046c6a..8eafbe91 100644 --- a/libp2p/relay/circuit_v2/config.py +++ b/libp2p/relay/circuit_v2/config.py @@ -46,18 +46,8 @@ MAX_AUTO_RELAY_ATTEMPTS = 3 RESERVATION_REFRESH_THRESHOLD = 0.8 # Refresh at 80% of TTL MAX_CONCURRENT_RESERVATIONS = 2 -# Shared timeout constants (used across modules) -STREAM_READ_TIMEOUT = 15 # seconds -STREAM_WRITE_TIMEOUT = 15 # seconds -STREAM_CLOSE_TIMEOUT = 10 # seconds -DIAL_TIMEOUT = 10 # seconds - -# NAT reachability timeout -REACHABILITY_TIMEOUT = 10 # seconds - -# Relay roles enum ----------------------------------------------------------- - +# Relay roles enum class RelayRole(Flag): """ Bit-flag enum that captures the three possible relay capabilities. @@ -72,6 +62,7 @@ class RelayRole(Flag): CLIENT = auto() # Dial through existing relays ("client") +@dataclass class RelayConfig: """Configuration for Circuit Relay v2.""" diff --git a/libp2p/relay/circuit_v2/dcutr.py b/libp2p/relay/circuit_v2/dcutr.py index a67ddd5e..2cece5d2 100644 --- a/libp2p/relay/circuit_v2/dcutr.py +++ b/libp2p/relay/circuit_v2/dcutr.py @@ -39,12 +39,6 @@ from libp2p.tools.async_service import ( Service, ) -from .config import ( - DIAL_TIMEOUT, - STREAM_READ_TIMEOUT, - STREAM_WRITE_TIMEOUT, -) - logger = logging.getLogger(__name__) # Protocol ID for DCUtR @@ -53,6 +47,11 @@ PROTOCOL_ID = TProtocol("/libp2p/dcutr") # Maximum message size for DCUtR (4KiB as per spec) MAX_MESSAGE_SIZE = 4 * 1024 +# Timeouts +STREAM_READ_TIMEOUT = 30 # seconds +STREAM_WRITE_TIMEOUT = 30 # seconds +DIAL_TIMEOUT = 10 # seconds + # Maximum number of hole punch attempts per peer MAX_HOLE_PUNCH_ATTEMPTS = 5 diff --git a/libp2p/relay/circuit_v2/discovery.py b/libp2p/relay/circuit_v2/discovery.py index 798eaa3e..45775647 100644 --- a/libp2p/relay/circuit_v2/discovery.py +++ b/libp2p/relay/circuit_v2/discovery.py @@ -31,9 +31,6 @@ from libp2p.tools.async_service import ( Service, ) -from .config import ( - DEFAULT_DISCOVERY_INTERVAL as CFG_DISCOVERY_INTERVAL, -) from .pb.circuit_pb2 import ( HopMessage, ) @@ -46,9 +43,9 @@ from .protocol_buffer import ( logger = logging.getLogger("libp2p.relay.circuit_v2.discovery") -# Constants (single-source-of-truth) -DEFAULT_DISCOVERY_INTERVAL = CFG_DISCOVERY_INTERVAL -MAX_RELAYS_TO_TRACK = 10 # Still discovery-specific +# Constants +MAX_RELAYS_TO_TRACK = 10 +DEFAULT_DISCOVERY_INTERVAL = 60 # seconds STREAM_TIMEOUT = 10 # seconds PEER_PROTOCOL_TIMEOUT = 5 # seconds diff --git a/libp2p/relay/circuit_v2/protocol.py b/libp2p/relay/circuit_v2/protocol.py index ae852a1f..3c378897 100644 --- a/libp2p/relay/circuit_v2/protocol.py +++ b/libp2p/relay/circuit_v2/protocol.py @@ -43,9 +43,6 @@ from .config import ( DEFAULT_MAX_CIRCUIT_CONNS, DEFAULT_MAX_CIRCUIT_DURATION, DEFAULT_MAX_RESERVATIONS, - STREAM_CLOSE_TIMEOUT, - STREAM_READ_TIMEOUT, - STREAM_WRITE_TIMEOUT, ) from .pb.circuit_pb2 import ( HopMessage, @@ -75,7 +72,7 @@ class Pipe(Enum): DST_TO_SRC = auto() -# Default limits for relay resources (single source of truth) +# Default limits for relay resources DEFAULT_RELAY_LIMITS = RelayLimits( duration=DEFAULT_MAX_CIRCUIT_DURATION, data=DEFAULT_MAX_CIRCUIT_BYTES, @@ -83,6 +80,10 @@ DEFAULT_RELAY_LIMITS = RelayLimits( max_reservations=DEFAULT_MAX_RESERVATIONS, ) +# Stream operation timeouts +STREAM_READ_TIMEOUT = 15 # seconds +STREAM_WRITE_TIMEOUT = 15 # seconds +STREAM_CLOSE_TIMEOUT = 10 # seconds MAX_READ_RETRIES = 5 # Maximum number of read retries diff --git a/libp2p/relay/circuit_v2/resources.py b/libp2p/relay/circuit_v2/resources.py index bd5d5fe0..d621990d 100644 --- a/libp2p/relay/circuit_v2/resources.py +++ b/libp2p/relay/circuit_v2/resources.py @@ -97,7 +97,7 @@ class Reservation: """Check if the reservation has expired.""" return time.time() > self.expires_at - # Expose a friendly status enum -------------------------------------- + # Expose a friendly status enum @property def status(self) -> ReservationStatus: From 52625e0f68282c76e8f9d57b998092afdb21d9fc Mon Sep 17 00:00:00 2001 From: acul71 <34693171+acul71@users.noreply.github.com> Date: Sun, 14 Sep 2025 19:45:22 -0400 Subject: [PATCH 3/8] Fix multiaddr dep to use specific commit hash to resolve install issue (#928) * Fix multiaddr dependency to use specific commit hash to resolve installation issues * fix: ops wrong filename --- newsfragments/927.bugfix.rst | 1 + pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 newsfragments/927.bugfix.rst diff --git a/newsfragments/927.bugfix.rst b/newsfragments/927.bugfix.rst new file mode 100644 index 00000000..99573ff9 --- /dev/null +++ b/newsfragments/927.bugfix.rst @@ -0,0 +1 @@ +Fix multiaddr dependency to use the last py-multiaddr commit hash to resolve installation issues diff --git a/pyproject.toml b/pyproject.toml index ab4824ab..86be25d1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ dependencies = [ "grpcio>=1.41.0", "lru-dict>=1.1.6", # "multiaddr (>=0.0.9,<0.0.10)", - "multiaddr @ git+https://github.com/multiformats/py-multiaddr.git@3ea7f866fda9268ee92506edf9d8e975274bf941", + "multiaddr @ git+https://github.com/multiformats/py-multiaddr.git@b186e2ccadc22545dec4069ff313787bf29265e0", "mypy-protobuf>=3.0.0", "noiseprotocol>=0.3.0", "protobuf>=4.25.0,<5.0.0", From 02ff688b5af2a54873d192d228a981ed1b075071 Mon Sep 17 00:00:00 2001 From: unniznd Date: Thu, 4 Sep 2025 14:58:22 +0530 Subject: [PATCH 4/8] Added timeout passing in muxermultistream. Updated the usages. Tested the params are passed correctly --- libp2p/host/basic_host.py | 3 +- libp2p/stream_muxer/muxer_multistream.py | 17 ++- libp2p/transport/upgrader.py | 8 +- newsfragments/896.bugfix.rst | 1 + .../stream_muxer/test_muxer_multistream.py | 108 ++++++++++++++++++ tests/core/transport/test_upgrader.py | 27 +++++ 6 files changed, 157 insertions(+), 7 deletions(-) create mode 100644 newsfragments/896.bugfix.rst create mode 100644 tests/core/stream_muxer/test_muxer_multistream.py create mode 100644 tests/core/transport/test_upgrader.py diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index e370a3de..6b7eb1d3 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -213,7 +213,6 @@ class BasicHost(IHost): self, peer_id: ID, protocol_ids: Sequence[TProtocol], - negotitate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, ) -> INetStream: """ :param peer_id: peer_id that host is connecting @@ -227,7 +226,7 @@ class BasicHost(IHost): selected_protocol = await self.multiselect_client.select_one_of( list(protocol_ids), MultiselectCommunicator(net_stream), - negotitate_timeout, + self.negotiate_timeout, ) except MultiselectClientError as error: logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error) diff --git a/libp2p/stream_muxer/muxer_multistream.py b/libp2p/stream_muxer/muxer_multistream.py index ef90fac0..2d206141 100644 --- a/libp2p/stream_muxer/muxer_multistream.py +++ b/libp2p/stream_muxer/muxer_multistream.py @@ -21,6 +21,7 @@ from libp2p.protocol_muxer.exceptions import ( MultiselectError, ) from libp2p.protocol_muxer.multiselect import ( + DEFAULT_NEGOTIATE_TIMEOUT, Multiselect, ) from libp2p.protocol_muxer.multiselect_client import ( @@ -46,11 +47,17 @@ class MuxerMultistream: transports: "OrderedDict[TProtocol, TMuxerClass]" multiselect: Multiselect multiselect_client: MultiselectClient + negotiate_timeout: int - def __init__(self, muxer_transports_by_protocol: TMuxerOptions) -> None: + def __init__( + self, + muxer_transports_by_protocol: TMuxerOptions, + negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, + ) -> None: self.transports = OrderedDict() self.multiselect = Multiselect() self.multistream_client = MultiselectClient() + self.negotiate_timeout = negotiate_timeout for protocol, transport in muxer_transports_by_protocol.items(): self.add_transport(protocol, transport) @@ -80,10 +87,12 @@ class MuxerMultistream: communicator = MultiselectCommunicator(conn) if conn.is_initiator: protocol = await self.multiselect_client.select_one_of( - tuple(self.transports.keys()), communicator + tuple(self.transports.keys()), communicator, self.negotiate_timeout ) else: - protocol, _ = await self.multiselect.negotiate(communicator) + protocol, _ = await self.multiselect.negotiate( + communicator, self.negotiate_timeout + ) if protocol is None: raise MultiselectError( "Fail to negotiate a stream muxer protocol: no protocol selected" @@ -93,7 +102,7 @@ class MuxerMultistream: async def new_conn(self, conn: ISecureConn, peer_id: ID) -> IMuxedConn: communicator = MultiselectCommunicator(conn) protocol = await self.multistream_client.select_one_of( - tuple(self.transports.keys()), communicator + tuple(self.transports.keys()), communicator, self.negotiate_timeout ) transport_class = self.transports[protocol] if protocol == PROTOCOL_ID: diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index 40ba5321..dad2ad72 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -14,6 +14,9 @@ from libp2p.protocol_muxer.exceptions import ( MultiselectClientError, MultiselectError, ) +from libp2p.protocol_muxer.multiselect import ( + DEFAULT_NEGOTIATE_TIMEOUT, +) from libp2p.security.exceptions import ( HandshakeFailure, ) @@ -37,9 +40,12 @@ class TransportUpgrader: self, secure_transports_by_protocol: TSecurityOptions, muxer_transports_by_protocol: TMuxerOptions, + negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, ): self.security_multistream = SecurityMultistream(secure_transports_by_protocol) - self.muxer_multistream = MuxerMultistream(muxer_transports_by_protocol) + self.muxer_multistream = MuxerMultistream( + muxer_transports_by_protocol, negotiate_timeout + ) async def upgrade_security( self, diff --git a/newsfragments/896.bugfix.rst b/newsfragments/896.bugfix.rst new file mode 100644 index 00000000..aaf338d4 --- /dev/null +++ b/newsfragments/896.bugfix.rst @@ -0,0 +1 @@ +Exposed timeout method in muxer multistream and updated all the usage. Added testcases to verify that timeout value is passed correctly diff --git a/tests/core/stream_muxer/test_muxer_multistream.py b/tests/core/stream_muxer/test_muxer_multistream.py new file mode 100644 index 00000000..070d47ae --- /dev/null +++ b/tests/core/stream_muxer/test_muxer_multistream.py @@ -0,0 +1,108 @@ +from unittest.mock import ( + AsyncMock, + MagicMock, +) + +import pytest + +from libp2p.custom_types import ( + TMuxerClass, + TProtocol, +) +from libp2p.peer.id import ( + ID, +) +from libp2p.protocol_muxer.exceptions import ( + MultiselectError, +) +from libp2p.stream_muxer.muxer_multistream import ( + MuxerMultistream, +) + + +@pytest.mark.trio +async def test_muxer_timeout_configuration(): + """Test that muxer respects timeout configuration.""" + muxer = MuxerMultistream({}, negotiate_timeout=1) + assert muxer.negotiate_timeout == 1 + + +@pytest.mark.trio +async def test_select_transport_passes_timeout_to_multiselect(): + """Test that timeout is passed to multiselect client in select_transport.""" + # Mock dependencies + mock_conn = MagicMock() + mock_conn.is_initiator = False + + # Mock MultiselectClient + muxer = MuxerMultistream({}, negotiate_timeout=10) + muxer.multiselect.negotiate = AsyncMock(return_value=("mock_protocol", None)) + muxer.transports[TProtocol("mock_protocol")] = MagicMock(return_value=MagicMock()) + + # Call select_transport + await muxer.select_transport(mock_conn) + + # Verify that select_one_of was called with the correct timeout + args, _ = muxer.multiselect.negotiate.call_args + assert args[1] == 10 + + +@pytest.mark.trio +async def test_new_conn_passes_timeout_to_multistream_client(): + """Test that timeout is passed to multistream client in new_conn.""" + # Mock dependencies + mock_conn = MagicMock() + mock_conn.is_initiator = True + mock_peer_id = ID(b"test_peer") + mock_communicator = MagicMock() + + # Mock MultistreamClient and transports + muxer = MuxerMultistream({}, negotiate_timeout=30) + muxer.multistream_client.select_one_of = AsyncMock(return_value="mock_protocol") + muxer.transports[TProtocol("mock_protocol")] = MagicMock(return_value=MagicMock()) + + # Call new_conn + await muxer.new_conn(mock_conn, mock_peer_id) + + # Verify that select_one_of was called with the correct timeout + muxer.multistream_client.select_one_of( + tuple(muxer.transports.keys()), mock_communicator, 30 + ) + + +@pytest.mark.trio +async def test_select_transport_no_protocol_selected(): + """ + Test that select_transport raises MultiselectError when no protocol is selected. + """ + # Mock dependencies + mock_conn = MagicMock() + mock_conn.is_initiator = False + + # Mock Multiselect to return None + muxer = MuxerMultistream({}, negotiate_timeout=30) + muxer.multiselect.negotiate = AsyncMock(return_value=(None, None)) + + # Expect MultiselectError to be raised + with pytest.raises(MultiselectError, match="no protocol selected"): + await muxer.select_transport(mock_conn) + + +@pytest.mark.trio +async def test_add_transport_updates_precedence(): + """Test that adding a transport updates protocol precedence.""" + # Mock transport classes + mock_transport1 = MagicMock(spec=TMuxerClass) + mock_transport2 = MagicMock(spec=TMuxerClass) + + # Initialize muxer and add transports + muxer = MuxerMultistream({}, negotiate_timeout=30) + muxer.add_transport(TProtocol("proto1"), mock_transport1) + muxer.add_transport(TProtocol("proto2"), mock_transport2) + + # Verify transport order + assert list(muxer.transports.keys()) == ["proto1", "proto2"] + + # Re-add proto1 to check if it moves to the end + muxer.add_transport(TProtocol("proto1"), mock_transport1) + assert list(muxer.transports.keys()) == ["proto2", "proto1"] diff --git a/tests/core/transport/test_upgrader.py b/tests/core/transport/test_upgrader.py new file mode 100644 index 00000000..8535a039 --- /dev/null +++ b/tests/core/transport/test_upgrader.py @@ -0,0 +1,27 @@ +import pytest + +from libp2p.custom_types import ( + TMuxerOptions, + TSecurityOptions, +) +from libp2p.transport.upgrader import ( + TransportUpgrader, +) + + +@pytest.mark.trio +async def test_transport_upgrader_security_and_muxer_initialization(): + """Test TransportUpgrader initializes security and muxer multistreams correctly.""" + secure_transports: TSecurityOptions = {} + muxer_transports: TMuxerOptions = {} + negotiate_timeout = 15 + + upgrader = TransportUpgrader( + secure_transports, muxer_transports, negotiate_timeout=negotiate_timeout + ) + + # Verify security multistream initialization + assert upgrader.security_multistream.transports == secure_transports + # Verify muxer multistream initialization and timeout + assert upgrader.muxer_multistream.transports == muxer_transports + assert upgrader.muxer_multistream.negotiate_timeout == negotiate_timeout From 35a4bf2d426c078f4d23e5cceaa234aa0a5c583c Mon Sep 17 00:00:00 2001 From: acul71 Date: Tue, 16 Sep 2025 20:09:10 -0400 Subject: [PATCH 5/8] Update multiaddr to version 0.0.11 - Switch from git dependency to pip package - Update from git+https://github.com/multiformats/py-multiaddr.git@b186e2ccadc22545dec4069ff313787bf29265e0 - Use multiaddr>=0.0.11 from PyPI Fixes #934 --- newsfragments/934.misc.rst | 1 + pyproject.toml | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) create mode 100644 newsfragments/934.misc.rst diff --git a/newsfragments/934.misc.rst b/newsfragments/934.misc.rst new file mode 100644 index 00000000..0a6d9120 --- /dev/null +++ b/newsfragments/934.misc.rst @@ -0,0 +1 @@ +Updated multiaddr dependency from git repository to pip package version 0.0.11. diff --git a/pyproject.toml b/pyproject.toml index 86be25d1..dbe2267a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,8 +23,7 @@ dependencies = [ "fastecdsa==2.3.2; sys_platform != 'win32'", "grpcio>=1.41.0", "lru-dict>=1.1.6", - # "multiaddr (>=0.0.9,<0.0.10)", - "multiaddr @ git+https://github.com/multiformats/py-multiaddr.git@b186e2ccadc22545dec4069ff313787bf29265e0", + "multiaddr>=0.0.11", "mypy-protobuf>=3.0.0", "noiseprotocol>=0.3.0", "protobuf>=4.25.0,<5.0.0", From 721da9364e52d25c2d186f63e4df32eba6fd79de Mon Sep 17 00:00:00 2001 From: parth-soni07 Date: Sun, 21 Sep 2025 01:36:06 +0530 Subject: [PATCH 6/8] Fixed variable imports --- libp2p/relay/circuit_v2/config.py | 32 +++++++++++++++++++++ libp2p/relay/circuit_v2/dcutr.py | 42 +++++++++++++++++++--------- libp2p/relay/circuit_v2/discovery.py | 32 +++++++++++++-------- libp2p/relay/circuit_v2/protocol.py | 40 ++++++++++++++++---------- libp2p/relay/circuit_v2/transport.py | 2 ++ newsfragments/917.internal.rst | 11 ++++++++ 6 files changed, 121 insertions(+), 38 deletions(-) create mode 100644 newsfragments/917.internal.rst diff --git a/libp2p/relay/circuit_v2/config.py b/libp2p/relay/circuit_v2/config.py index 8eafbe91..d56839e0 100644 --- a/libp2p/relay/circuit_v2/config.py +++ b/libp2p/relay/circuit_v2/config.py @@ -46,6 +46,35 @@ MAX_AUTO_RELAY_ATTEMPTS = 3 RESERVATION_REFRESH_THRESHOLD = 0.8 # Refresh at 80% of TTL MAX_CONCURRENT_RESERVATIONS = 2 +# Timeout constants for different components +DEFAULT_DISCOVERY_STREAM_TIMEOUT = 10 # seconds +DEFAULT_PEER_PROTOCOL_TIMEOUT = 5 # seconds +DEFAULT_PROTOCOL_READ_TIMEOUT = 15 # seconds +DEFAULT_PROTOCOL_WRITE_TIMEOUT = 15 # seconds +DEFAULT_PROTOCOL_CLOSE_TIMEOUT = 10 # seconds +DEFAULT_DCUTR_READ_TIMEOUT = 30 # seconds +DEFAULT_DCUTR_WRITE_TIMEOUT = 30 # seconds +DEFAULT_DIAL_TIMEOUT = 10 # seconds + + +@dataclass +class TimeoutConfig: + """Timeout configuration for different Circuit Relay v2 components.""" + + # Discovery timeouts + discovery_stream_timeout: int = DEFAULT_DISCOVERY_STREAM_TIMEOUT + peer_protocol_timeout: int = DEFAULT_PEER_PROTOCOL_TIMEOUT + + # Core protocol timeouts + protocol_read_timeout: int = DEFAULT_PROTOCOL_READ_TIMEOUT + protocol_write_timeout: int = DEFAULT_PROTOCOL_WRITE_TIMEOUT + protocol_close_timeout: int = DEFAULT_PROTOCOL_CLOSE_TIMEOUT + + # DCUtR timeouts + dcutr_read_timeout: int = DEFAULT_DCUTR_READ_TIMEOUT + dcutr_write_timeout: int = DEFAULT_DCUTR_WRITE_TIMEOUT + dial_timeout: int = DEFAULT_DIAL_TIMEOUT + # Relay roles enum class RelayRole(Flag): @@ -83,6 +112,9 @@ class RelayConfig: max_circuit_duration: int = DEFAULT_MAX_CIRCUIT_DURATION max_circuit_bytes: int = DEFAULT_MAX_CIRCUIT_BYTES + # Timeout configuration + timeouts: TimeoutConfig = field(default_factory=TimeoutConfig) + # --------------------------------------------------------------------- # Backwards-compat boolean helpers. Existing code that still accesses # ``cfg.enable_hop, cfg.enable_stop, cfg.enable_client`` will continue to work. diff --git a/libp2p/relay/circuit_v2/dcutr.py b/libp2p/relay/circuit_v2/dcutr.py index 2cece5d2..644ea75f 100644 --- a/libp2p/relay/circuit_v2/dcutr.py +++ b/libp2p/relay/circuit_v2/dcutr.py @@ -29,6 +29,11 @@ from libp2p.peer.id import ( from libp2p.peer.peerinfo import ( PeerInfo, ) +from libp2p.relay.circuit_v2.config import ( + DEFAULT_DCUTR_READ_TIMEOUT, + DEFAULT_DCUTR_WRITE_TIMEOUT, + DEFAULT_DIAL_TIMEOUT, +) from libp2p.relay.circuit_v2.nat import ( ReachabilityChecker, ) @@ -47,11 +52,7 @@ PROTOCOL_ID = TProtocol("/libp2p/dcutr") # Maximum message size for DCUtR (4KiB as per spec) MAX_MESSAGE_SIZE = 4 * 1024 -# Timeouts -STREAM_READ_TIMEOUT = 30 # seconds -STREAM_WRITE_TIMEOUT = 30 # seconds -DIAL_TIMEOUT = 10 # seconds - +# DCUtR protocol constants # Maximum number of hole punch attempts per peer MAX_HOLE_PUNCH_ATTEMPTS = 5 @@ -70,7 +71,13 @@ class DCUtRProtocol(Service): hole punching, after they have established an initial connection through a relay. """ - def __init__(self, host: IHost): + def __init__( + self, + host: IHost, + read_timeout: int = DEFAULT_DCUTR_READ_TIMEOUT, + write_timeout: int = DEFAULT_DCUTR_WRITE_TIMEOUT, + dial_timeout: int = DEFAULT_DIAL_TIMEOUT, + ): """ Initialize the DCUtR protocol. @@ -78,10 +85,19 @@ class DCUtRProtocol(Service): ---------- host : IHost The libp2p host this protocol is running on + read_timeout : int + Timeout for stream read operations, in seconds + write_timeout : int + Timeout for stream write operations, in seconds + dial_timeout : int + Timeout for dial operations, in seconds """ super().__init__() self.host = host + self.read_timeout = read_timeout + self.write_timeout = write_timeout + self.dial_timeout = dial_timeout self.event_started = trio.Event() self._hole_punch_attempts: dict[ID, int] = {} self._direct_connections: set[ID] = set() @@ -161,7 +177,7 @@ class DCUtRProtocol(Service): try: # Read the CONNECT message - with trio.fail_after(STREAM_READ_TIMEOUT): + with trio.fail_after(self.read_timeout): msg_bytes = await stream.read(MAX_MESSAGE_SIZE) # Parse the message @@ -196,7 +212,7 @@ class DCUtRProtocol(Service): response.type = HolePunch.CONNECT response.ObsAddrs.extend(our_addrs) - with trio.fail_after(STREAM_WRITE_TIMEOUT): + with trio.fail_after(self.write_timeout): await stream.write(response.SerializeToString()) logger.debug( @@ -206,7 +222,7 @@ class DCUtRProtocol(Service): ) # Wait for SYNC message - with trio.fail_after(STREAM_READ_TIMEOUT): + with trio.fail_after(self.read_timeout): sync_bytes = await stream.read(MAX_MESSAGE_SIZE) # Parse the SYNC message @@ -300,7 +316,7 @@ class DCUtRProtocol(Service): connect_msg.ObsAddrs.extend(our_addrs) start_time = time.time() - with trio.fail_after(STREAM_WRITE_TIMEOUT): + with trio.fail_after(self.write_timeout): await stream.write(connect_msg.SerializeToString()) logger.debug( @@ -310,7 +326,7 @@ class DCUtRProtocol(Service): ) # Receive the peer's CONNECT message - with trio.fail_after(STREAM_READ_TIMEOUT): + with trio.fail_after(self.read_timeout): resp_bytes = await stream.read(MAX_MESSAGE_SIZE) # Calculate RTT @@ -349,7 +365,7 @@ class DCUtRProtocol(Service): sync_msg = HolePunch() sync_msg.type = HolePunch.SYNC - with trio.fail_after(STREAM_WRITE_TIMEOUT): + with trio.fail_after(self.write_timeout): await stream.write(sync_msg.SerializeToString()) logger.debug("Sent SYNC message to %s", peer_id) @@ -468,7 +484,7 @@ class DCUtRProtocol(Service): peer_info = PeerInfo(peer_id, [addr]) # Try to connect with timeout - with trio.fail_after(DIAL_TIMEOUT): + with trio.fail_after(self.dial_timeout): await self.host.connect(peer_info) logger.info("Successfully connected to %s at %s", peer_id, addr) diff --git a/libp2p/relay/circuit_v2/discovery.py b/libp2p/relay/circuit_v2/discovery.py index 45775647..50ee8d90 100644 --- a/libp2p/relay/circuit_v2/discovery.py +++ b/libp2p/relay/circuit_v2/discovery.py @@ -31,6 +31,11 @@ from libp2p.tools.async_service import ( Service, ) +from .config import ( + DEFAULT_DISCOVERY_INTERVAL, + DEFAULT_DISCOVERY_STREAM_TIMEOUT, + DEFAULT_PEER_PROTOCOL_TIMEOUT, +) from .pb.circuit_pb2 import ( HopMessage, ) @@ -43,11 +48,8 @@ from .protocol_buffer import ( logger = logging.getLogger("libp2p.relay.circuit_v2.discovery") -# Constants +# Discovery constants MAX_RELAYS_TO_TRACK = 10 -DEFAULT_DISCOVERY_INTERVAL = 60 # seconds -STREAM_TIMEOUT = 10 # seconds -PEER_PROTOCOL_TIMEOUT = 5 # seconds # Extended interfaces for type checking @@ -87,6 +89,8 @@ class RelayDiscovery(Service): auto_reserve: bool = False, discovery_interval: int = DEFAULT_DISCOVERY_INTERVAL, max_relays: int = MAX_RELAYS_TO_TRACK, + stream_timeout: int = DEFAULT_DISCOVERY_STREAM_TIMEOUT, + peer_protocol_timeout: int = DEFAULT_PEER_PROTOCOL_TIMEOUT, ) -> None: """ Initialize the discovery service. @@ -101,6 +105,10 @@ class RelayDiscovery(Service): How often to run discovery, in seconds max_relays : int Maximum number of relays to track + stream_timeout : int + Timeout for stream operations during discovery, in seconds + peer_protocol_timeout : int + Timeout for checking peer protocol support, in seconds """ super().__init__() @@ -108,6 +116,8 @@ class RelayDiscovery(Service): self.auto_reserve = auto_reserve self.discovery_interval = discovery_interval self.max_relays = max_relays + self.stream_timeout = stream_timeout + self.peer_protocol_timeout = peer_protocol_timeout self._discovered_relays: dict[ID, RelayInfo] = {} self._protocol_cache: dict[ ID, set[str] @@ -167,19 +177,19 @@ class RelayDiscovery(Service): continue # Don't wait too long for protocol info - with trio.move_on_after(PEER_PROTOCOL_TIMEOUT): + with trio.move_on_after(self.peer_protocol_timeout): if await self._supports_relay_protocol(peer_id): await self._add_relay(peer_id) # Limit number of relays we track - if len(self._discovered_relays) > MAX_RELAYS_TO_TRACK: + if len(self._discovered_relays) > self.max_relays: # Sort by last seen time and keep only the most recent ones sorted_relays = sorted( self._discovered_relays.items(), key=lambda x: x[1].last_seen, reverse=True, ) - to_remove = sorted_relays[MAX_RELAYS_TO_TRACK:] + to_remove = sorted_relays[self.max_relays :] for peer_id, _ in to_remove: del self._discovered_relays[peer_id] @@ -265,7 +275,7 @@ class RelayDiscovery(Service): async def _check_via_direct_connection(self, peer_id: ID) -> bool | None: """Check protocol support via direct connection.""" try: - with trio.fail_after(STREAM_TIMEOUT): + with trio.fail_after(self.stream_timeout): stream = await self.host.new_stream(peer_id, [PROTOCOL_ID]) if stream: await stream.close() @@ -371,7 +381,7 @@ class RelayDiscovery(Service): # Open a stream to the relay with timeout try: - with trio.fail_after(STREAM_TIMEOUT): + with trio.fail_after(self.stream_timeout): stream = await self.host.new_stream(peer_id, [PROTOCOL_ID]) if not stream: logger.error("Failed to open stream to relay %s", peer_id) @@ -387,7 +397,7 @@ class RelayDiscovery(Service): peer=self.host.get_id().to_bytes(), ) - with trio.fail_after(STREAM_TIMEOUT): + with trio.fail_after(self.stream_timeout): await stream.write(request.SerializeToString()) # Wait for response @@ -464,7 +474,7 @@ class RelayDiscovery(Service): for peer_id, relay_info in self._discovered_relays.items(): # Check if relay hasn't been seen in a while (3x discovery interval) - if now - relay_info.last_seen > DEFAULT_DISCOVERY_INTERVAL * 3: + if now - relay_info.last_seen > self.discovery_interval * 3: to_remove.append(peer_id) continue diff --git a/libp2p/relay/circuit_v2/protocol.py b/libp2p/relay/circuit_v2/protocol.py index 3c378897..a6a80c20 100644 --- a/libp2p/relay/circuit_v2/protocol.py +++ b/libp2p/relay/circuit_v2/protocol.py @@ -43,6 +43,9 @@ from .config import ( DEFAULT_MAX_CIRCUIT_CONNS, DEFAULT_MAX_CIRCUIT_DURATION, DEFAULT_MAX_RESERVATIONS, + DEFAULT_PROTOCOL_CLOSE_TIMEOUT, + DEFAULT_PROTOCOL_READ_TIMEOUT, + DEFAULT_PROTOCOL_WRITE_TIMEOUT, ) from .pb.circuit_pb2 import ( HopMessage, @@ -80,10 +83,7 @@ DEFAULT_RELAY_LIMITS = RelayLimits( max_reservations=DEFAULT_MAX_RESERVATIONS, ) -# Stream operation timeouts -STREAM_READ_TIMEOUT = 15 # seconds -STREAM_WRITE_TIMEOUT = 15 # seconds -STREAM_CLOSE_TIMEOUT = 10 # seconds +# Stream operation constants MAX_READ_RETRIES = 5 # Maximum number of read retries @@ -127,6 +127,9 @@ class CircuitV2Protocol(Service): host: IHost, limits: RelayLimits | None = None, allow_hop: bool = False, + read_timeout: int = DEFAULT_PROTOCOL_READ_TIMEOUT, + write_timeout: int = DEFAULT_PROTOCOL_WRITE_TIMEOUT, + close_timeout: int = DEFAULT_PROTOCOL_CLOSE_TIMEOUT, ) -> None: """ Initialize a Circuit Relay v2 protocol instance. @@ -139,11 +142,20 @@ class CircuitV2Protocol(Service): Resource limits for the relay allow_hop : bool Whether to allow this node to act as a relay + read_timeout : int + Timeout for stream read operations, in seconds + write_timeout : int + Timeout for stream write operations, in seconds + close_timeout : int + Timeout for stream close operations, in seconds """ self.host = host self.limits = limits or DEFAULT_RELAY_LIMITS self.allow_hop = allow_hop + self.read_timeout = read_timeout + self.write_timeout = write_timeout + self.close_timeout = close_timeout self.resource_manager = RelayResourceManager(self.limits) self._active_relays: dict[ID, tuple[INetStream, INetStream | None]] = {} self.event_started = trio.Event() @@ -188,7 +200,7 @@ class CircuitV2Protocol(Service): return try: - with trio.fail_after(STREAM_CLOSE_TIMEOUT): + with trio.fail_after(self.close_timeout): await stream.close() except Exception: try: @@ -230,7 +242,7 @@ class CircuitV2Protocol(Service): while retries < max_retries: try: - with trio.fail_after(STREAM_READ_TIMEOUT): + with trio.fail_after(self.read_timeout): # Try reading with timeout logger.debug( "Attempting to read from stream (attempt %d/%d)", @@ -307,7 +319,7 @@ class CircuitV2Protocol(Service): # First, handle the read timeout gracefully try: with trio.fail_after( - STREAM_READ_TIMEOUT * 2 + self.read_timeout * 2 ): # Double the timeout for reading msg_bytes = await stream.read() if not msg_bytes: @@ -428,7 +440,7 @@ class CircuitV2Protocol(Service): """ try: # Read the incoming message with timeout - with trio.fail_after(STREAM_READ_TIMEOUT): + with trio.fail_after(self.read_timeout): msg_bytes = await stream.read() stop_msg = StopMessage() stop_msg.ParseFromString(msg_bytes) @@ -535,7 +547,7 @@ class CircuitV2Protocol(Service): ttl = self.resource_manager.reserve(peer_id) # Send reservation success response - with trio.fail_after(STREAM_WRITE_TIMEOUT): + with trio.fail_after(self.write_timeout): status = create_status( code=StatusCode.OK, message="Reservation accepted" ) @@ -586,7 +598,7 @@ class CircuitV2Protocol(Service): # Always close the stream when done with reservation if cast(INetStreamWithExtras, stream).is_open(): try: - with trio.fail_after(STREAM_CLOSE_TIMEOUT): + with trio.fail_after(self.close_timeout): await stream.close() except Exception as close_err: logger.error("Error closing stream: %s", str(close_err)) @@ -622,7 +634,7 @@ class CircuitV2Protocol(Service): self._active_relays[peer_id] = (stream, None) # Try to connect to the destination with timeout - with trio.fail_after(STREAM_READ_TIMEOUT): + with trio.fail_after(self.read_timeout): dst_stream = await self.host.new_stream(peer_id, [STOP_PROTOCOL_ID]) if not dst_stream: raise ConnectionError("Could not connect to destination") @@ -751,7 +763,7 @@ class CircuitV2Protocol(Service): # Write data with timeout try: - with trio.fail_after(STREAM_WRITE_TIMEOUT): + with trio.fail_after(self.write_timeout): await dst_stream.write(data) except trio.TooSlowError: logger.error("Timeout writing in %s", direction.name) @@ -786,7 +798,7 @@ class CircuitV2Protocol(Service): """Send a status message.""" try: logger.debug("Sending status message with code %s: %s", code, message) - with trio.fail_after(STREAM_WRITE_TIMEOUT * 2): # Double the timeout + with trio.fail_after(self.write_timeout * 2): # Double the timeout # Create a proto Status directly pb_status = PbStatus() pb_status.code = cast( @@ -824,7 +836,7 @@ class CircuitV2Protocol(Service): """Send a status message on a STOP stream.""" try: logger.debug("Sending stop status message with code %s: %s", code, message) - with trio.fail_after(STREAM_WRITE_TIMEOUT * 2): # Double the timeout + with trio.fail_after(self.write_timeout * 2): # Double the timeout # Create a proto Status directly pb_status = PbStatus() pb_status.code = cast( diff --git a/libp2p/relay/circuit_v2/transport.py b/libp2p/relay/circuit_v2/transport.py index ffd31090..3632615a 100644 --- a/libp2p/relay/circuit_v2/transport.py +++ b/libp2p/relay/circuit_v2/transport.py @@ -89,6 +89,8 @@ class CircuitV2Transport(ITransport): auto_reserve=config.enable_client, discovery_interval=config.discovery_interval, max_relays=config.max_relays, + stream_timeout=config.timeouts.discovery_stream_timeout, + peer_protocol_timeout=config.timeouts.peer_protocol_timeout, ) async def dial( diff --git a/newsfragments/917.internal.rst b/newsfragments/917.internal.rst new file mode 100644 index 00000000..ed06f3ed --- /dev/null +++ b/newsfragments/917.internal.rst @@ -0,0 +1,11 @@ +Replace magic numbers with named constants and enums for clarity and maintainability + +**Key Changes:** +- **Introduced type-safe enums** for better code clarity: + - `RelayRole(Flag)` enum with HOP, STOP, CLIENT roles supporting bitwise combinations (e.g., `RelayRole.HOP | RelayRole.STOP`) + - `ReservationStatus(Enum)` for reservation lifecycle management (ACTIVE, EXPIRED, REJECTED) +- **Replaced magic numbers with named constants** throughout the codebase, improving code maintainability and eliminating hardcoded timeout values (15s, 30s, 10s) with descriptive constant names +- **Added comprehensive timeout configuration system** with new `TimeoutConfig` dataclass supporting component-specific timeouts (discovery, protocol, DCUtR) +- **Enhanced configurability** of `RelayDiscovery`, `CircuitV2Protocol`, and `DCUtRProtocol` constructors with optional timeout parameters +- **Improved architecture consistency** with clean configuration flow across all circuit relay components +**Backward Compatibility:** All changes maintain full backward compatibility. Existing code continues to work unchanged while new timeout configuration options are available for users who need them. From 93c2d5002f30c60ab7f1b21d89af62ea25fc9c93 Mon Sep 17 00:00:00 2001 From: paschal533 Date: Tue, 2 Sep 2025 03:50:00 -0700 Subject: [PATCH 7/8] fix: GossipSub peer propagation to include FloodSub peers --- libp2p/pubsub/gossipsub.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 45c6cd81..e92c457d 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -308,7 +308,7 @@ class GossipSub(IPubsubRouter, Service): floodsub_peers: set[ID] = { peer_id for peer_id in self.pubsub.peer_topics[topic] - if self.peer_protocol[peer_id] == floodsub.PROTOCOL_ID + if peer_id in self.peer_protocol and self.peer_protocol[peer_id] == floodsub.PROTOCOL_ID } send_to.update(floodsub_peers) From d64f9e10fd0a1f7a95fba21a0a9808526cb4fd4f Mon Sep 17 00:00:00 2001 From: Paschal <58183764+paschal533@users.noreply.github.com> Date: Tue, 2 Sep 2025 04:31:35 -0700 Subject: [PATCH 8/8] Fix: lint error --- libp2p/pubsub/gossipsub.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index e92c457d..f0e84641 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -308,7 +308,8 @@ class GossipSub(IPubsubRouter, Service): floodsub_peers: set[ID] = { peer_id for peer_id in self.pubsub.peer_topics[topic] - if peer_id in self.peer_protocol and self.peer_protocol[peer_id] == floodsub.PROTOCOL_ID + if peer_id in self.peer_protocol + and self.peer_protocol[peer_id] == floodsub.PROTOCOL_ID } send_to.update(floodsub_peers)