Replace magic number with named constants

This commit is contained in:
parth-soni07
2025-09-09 13:24:07 +05:30
parent 74f4aaf136
commit 4a36d6efeb
5 changed files with 194 additions and 59 deletions

View File

@ -9,6 +9,7 @@ from dataclasses import (
dataclass,
field,
)
from enum import Flag, auto
from libp2p.peer.peerinfo import (
PeerInfo,
@ -18,29 +19,95 @@ from .resources import (
RelayLimits,
)
DEFAULT_MIN_RELAYS = 3
DEFAULT_MAX_RELAYS = 20
DEFAULT_DISCOVERY_INTERVAL = 300 # seconds
DEFAULT_RESERVATION_TTL = 3600 # seconds
DEFAULT_MAX_CIRCUIT_DURATION = 3600 # seconds
DEFAULT_MAX_CIRCUIT_BYTES = 1024 * 1024 * 1024 # 1GB
DEFAULT_MAX_CIRCUIT_CONNS = 8
DEFAULT_MAX_RESERVATIONS = 4
MAX_RESERVATIONS_PER_IP = 8
MAX_CIRCUITS_PER_IP = 16
RESERVATION_RATE_PER_IP = 4 # per minute
CIRCUIT_RATE_PER_IP = 8 # per minute
MAX_CIRCUITS_TOTAL = 64
MAX_RESERVATIONS_TOTAL = 32
MAX_BANDWIDTH_PER_CIRCUIT = 1024 * 1024 # 1MB/s
MAX_BANDWIDTH_TOTAL = 10 * 1024 * 1024 # 10MB/s
MIN_RELAY_SCORE = 0.5
MAX_RELAY_LATENCY = 1.0 # seconds
ENABLE_AUTO_RELAY = True
AUTO_RELAY_TIMEOUT = 30 # seconds
MAX_AUTO_RELAY_ATTEMPTS = 3
RESERVATION_REFRESH_THRESHOLD = 0.8 # Refresh at 80% of TTL
MAX_CONCURRENT_RESERVATIONS = 2
# Shared timeout constants (used across modules)
STREAM_READ_TIMEOUT = 15 # seconds
STREAM_WRITE_TIMEOUT = 15 # seconds
STREAM_CLOSE_TIMEOUT = 10 # seconds
DIAL_TIMEOUT = 10 # seconds
# NAT reachability timeout
REACHABILITY_TIMEOUT = 10 # seconds
# Relay roles enum -----------------------------------------------------------
class RelayRole(Flag):
"""
Bit-flag enum that captures the three possible relay capabilities.
A node can combine multiple roles using bit-wise OR, for example::
RelayRole.HOP | RelayRole.STOP
"""
HOP = auto() # Act as a relay for others ("hop")
STOP = auto() # Accept relayed connections ("stop")
CLIENT = auto() # Dial through existing relays ("client")
@dataclass
class RelayConfig:
"""Configuration for Circuit Relay v2."""
# Role configuration
enable_hop: bool = False # Whether to act as a relay (hop)
enable_stop: bool = True # Whether to accept relayed connections (stop)
enable_client: bool = True # Whether to use relays for dialing
# Role configuration (bit-flags)
roles: RelayRole = RelayRole.STOP | RelayRole.CLIENT
# Resource limits
limits: RelayLimits | None = None
# Discovery configuration
bootstrap_relays: list[PeerInfo] = field(default_factory=list)
min_relays: int = 3
max_relays: int = 20
discovery_interval: int = 300 # seconds
min_relays: int = DEFAULT_MIN_RELAYS
max_relays: int = DEFAULT_MAX_RELAYS
discovery_interval: int = DEFAULT_DISCOVERY_INTERVAL
# Connection configuration
reservation_ttl: int = 3600 # seconds
max_circuit_duration: int = 3600 # seconds
max_circuit_bytes: int = 1024 * 1024 * 1024 # 1GB
reservation_ttl: int = DEFAULT_RESERVATION_TTL
max_circuit_duration: int = DEFAULT_MAX_CIRCUIT_DURATION
max_circuit_bytes: int = DEFAULT_MAX_CIRCUIT_BYTES
# ---------------------------------------------------------------------
# Backwards-compat boolean helpers. Existing code that still accesses
# ``cfg.enable_hop, cfg.enable_stop, cfg.enable_client`` will continue to work.
# ---------------------------------------------------------------------
@property
def enable_hop(self) -> bool: # pragma: no cover helper
return bool(self.roles & RelayRole.HOP)
@property
def enable_stop(self) -> bool: # pragma: no cover helper
return bool(self.roles & RelayRole.STOP)
@property
def enable_client(self) -> bool: # pragma: no cover helper
return bool(self.roles & RelayRole.CLIENT)
def __post_init__(self) -> None:
"""Initialize default values."""
@ -48,8 +115,8 @@ class RelayConfig:
self.limits = RelayLimits(
duration=self.max_circuit_duration,
data=self.max_circuit_bytes,
max_circuit_conns=8,
max_reservations=4,
max_circuit_conns=DEFAULT_MAX_CIRCUIT_CONNS,
max_reservations=DEFAULT_MAX_RESERVATIONS,
)
@ -58,20 +125,20 @@ class HopConfig:
"""Configuration specific to relay (hop) nodes."""
# Resource limits per IP
max_reservations_per_ip: int = 8
max_circuits_per_ip: int = 16
max_reservations_per_ip: int = MAX_RESERVATIONS_PER_IP
max_circuits_per_ip: int = MAX_CIRCUITS_PER_IP
# Rate limiting
reservation_rate_per_ip: int = 4 # per minute
circuit_rate_per_ip: int = 8 # per minute
reservation_rate_per_ip: int = RESERVATION_RATE_PER_IP
circuit_rate_per_ip: int = CIRCUIT_RATE_PER_IP
# Resource quotas
max_circuits_total: int = 64
max_reservations_total: int = 32
max_circuits_total: int = MAX_CIRCUITS_TOTAL
max_reservations_total: int = MAX_RESERVATIONS_TOTAL
# Bandwidth limits
max_bandwidth_per_circuit: int = 1024 * 1024 # 1MB/s
max_bandwidth_total: int = 10 * 1024 * 1024 # 10MB/s
max_bandwidth_per_circuit: int = MAX_BANDWIDTH_PER_CIRCUIT
max_bandwidth_total: int = MAX_BANDWIDTH_TOTAL
@dataclass
@ -79,14 +146,14 @@ class ClientConfig:
"""Configuration specific to relay clients."""
# Relay selection
min_relay_score: float = 0.5
max_relay_latency: float = 1.0 # seconds
min_relay_score: float = MIN_RELAY_SCORE
max_relay_latency: float = MAX_RELAY_LATENCY
# Auto-relay settings
enable_auto_relay: bool = True
auto_relay_timeout: int = 30 # seconds
max_auto_relay_attempts: int = 3
enable_auto_relay: bool = ENABLE_AUTO_RELAY
auto_relay_timeout: int = AUTO_RELAY_TIMEOUT
max_auto_relay_attempts: int = MAX_AUTO_RELAY_ATTEMPTS
# Reservation management
reservation_refresh_threshold: float = 0.8 # Refresh at 80% of TTL
max_concurrent_reservations: int = 2
reservation_refresh_threshold: float = RESERVATION_REFRESH_THRESHOLD
max_concurrent_reservations: int = MAX_CONCURRENT_RESERVATIONS

View File

@ -39,6 +39,12 @@ from libp2p.tools.async_service import (
Service,
)
from .config import (
DIAL_TIMEOUT,
STREAM_READ_TIMEOUT,
STREAM_WRITE_TIMEOUT,
)
logger = logging.getLogger(__name__)
# Protocol ID for DCUtR
@ -47,11 +53,6 @@ PROTOCOL_ID = TProtocol("/libp2p/dcutr")
# Maximum message size for DCUtR (4KiB as per spec)
MAX_MESSAGE_SIZE = 4 * 1024
# Timeouts
STREAM_READ_TIMEOUT = 30 # seconds
STREAM_WRITE_TIMEOUT = 30 # seconds
DIAL_TIMEOUT = 10 # seconds
# Maximum number of hole punch attempts per peer
MAX_HOLE_PUNCH_ATTEMPTS = 5

View File

@ -31,6 +31,9 @@ from libp2p.tools.async_service import (
Service,
)
from .config import (
DEFAULT_DISCOVERY_INTERVAL as CFG_DISCOVERY_INTERVAL,
)
from .pb.circuit_pb2 import (
HopMessage,
)
@ -43,10 +46,11 @@ from .protocol_buffer import (
logger = logging.getLogger("libp2p.relay.circuit_v2.discovery")
# Constants
MAX_RELAYS_TO_TRACK = 10
DEFAULT_DISCOVERY_INTERVAL = 60 # seconds
# Constants (single-source-of-truth)
DEFAULT_DISCOVERY_INTERVAL = CFG_DISCOVERY_INTERVAL
MAX_RELAYS_TO_TRACK = 10 # Still discovery-specific
STREAM_TIMEOUT = 10 # seconds
PEER_PROTOCOL_TIMEOUT = 5 # seconds
# Extended interfaces for type checking
@ -165,20 +169,20 @@ class RelayDiscovery(Service):
self._discovered_relays[peer_id].last_seen = time.time()
continue
# Check if peer supports the relay protocol
with trio.move_on_after(5): # Don't wait too long for protocol info
# Don't wait too long for protocol info
with trio.move_on_after(PEER_PROTOCOL_TIMEOUT):
if await self._supports_relay_protocol(peer_id):
await self._add_relay(peer_id)
# Limit number of relays we track
if len(self._discovered_relays) > self.max_relays:
if len(self._discovered_relays) > MAX_RELAYS_TO_TRACK:
# Sort by last seen time and keep only the most recent ones
sorted_relays = sorted(
self._discovered_relays.items(),
key=lambda x: x[1].last_seen,
reverse=True,
)
to_remove = sorted_relays[self.max_relays :]
to_remove = sorted_relays[MAX_RELAYS_TO_TRACK:]
for peer_id, _ in to_remove:
del self._discovered_relays[peer_id]
@ -463,7 +467,7 @@ class RelayDiscovery(Service):
for peer_id, relay_info in self._discovered_relays.items():
# Check if relay hasn't been seen in a while (3x discovery interval)
if now - relay_info.last_seen > self.discovery_interval * 3:
if now - relay_info.last_seen > DEFAULT_DISCOVERY_INTERVAL * 3:
to_remove.append(peer_id)
continue

View File

@ -5,6 +5,7 @@ This module implements the Circuit Relay v2 protocol as specified in:
https://github.com/libp2p/specs/blob/master/relay/circuit-v2.md
"""
from enum import Enum, auto
import logging
import time
from typing import (
@ -37,6 +38,15 @@ from libp2p.tools.async_service import (
Service,
)
from .config import (
DEFAULT_MAX_CIRCUIT_BYTES,
DEFAULT_MAX_CIRCUIT_CONNS,
DEFAULT_MAX_CIRCUIT_DURATION,
DEFAULT_MAX_RESERVATIONS,
STREAM_CLOSE_TIMEOUT,
STREAM_READ_TIMEOUT,
STREAM_WRITE_TIMEOUT,
)
from .pb.circuit_pb2 import (
HopMessage,
Limit,
@ -58,18 +68,21 @@ logger = logging.getLogger("libp2p.relay.circuit_v2")
PROTOCOL_ID = TProtocol("/libp2p/circuit/relay/2.0.0")
STOP_PROTOCOL_ID = TProtocol("/libp2p/circuit/relay/2.0.0/stop")
# Default limits for relay resources
# Direction enum for data piping
class Pipe(Enum):
SRC_TO_DST = auto()
DST_TO_SRC = auto()
# Default limits for relay resources (single source of truth)
DEFAULT_RELAY_LIMITS = RelayLimits(
duration=60 * 60, # 1 hour
data=1024 * 1024 * 1024, # 1GB
max_circuit_conns=8,
max_reservations=4,
duration=DEFAULT_MAX_CIRCUIT_DURATION,
data=DEFAULT_MAX_CIRCUIT_BYTES,
max_circuit_conns=DEFAULT_MAX_CIRCUIT_CONNS,
max_reservations=DEFAULT_MAX_RESERVATIONS,
)
# Stream operation timeouts
STREAM_READ_TIMEOUT = 15 # seconds
STREAM_WRITE_TIMEOUT = 15 # seconds
STREAM_CLOSE_TIMEOUT = 10 # seconds
MAX_READ_RETRIES = 5 # Maximum number of read retries
@ -458,8 +471,20 @@ class CircuitV2Protocol(Service):
# Start relaying data
async with trio.open_nursery() as nursery:
nursery.start_soon(self._relay_data, src_stream, stream, peer_id)
nursery.start_soon(self._relay_data, stream, src_stream, peer_id)
nursery.start_soon(
self._relay_data,
src_stream,
stream,
peer_id,
Pipe.SRC_TO_DST,
)
nursery.start_soon(
self._relay_data,
stream,
src_stream,
peer_id,
Pipe.DST_TO_SRC,
)
except trio.TooSlowError:
logger.error("Timeout reading from stop stream")
@ -648,8 +673,20 @@ class CircuitV2Protocol(Service):
# Start relaying data
async with trio.open_nursery() as nursery:
nursery.start_soon(self._relay_data, stream, dst_stream, peer_id)
nursery.start_soon(self._relay_data, dst_stream, stream, peer_id)
nursery.start_soon(
self._relay_data,
stream,
dst_stream,
peer_id,
Pipe.SRC_TO_DST,
)
nursery.start_soon(
self._relay_data,
dst_stream,
stream,
peer_id,
Pipe.DST_TO_SRC,
)
except (trio.TooSlowError, ConnectionError) as e:
logger.error("Error establishing relay connection: %s", str(e))
@ -685,6 +722,7 @@ class CircuitV2Protocol(Service):
src_stream: INetStream,
dst_stream: INetStream,
peer_id: ID,
direction: Pipe,
) -> None:
"""
Relay data between two streams.
@ -698,13 +736,16 @@ class CircuitV2Protocol(Service):
peer_id : ID
ID of the peer being relayed
direction : Pipe
Direction of data flow (``Pipe.SRC_TO_DST`` or ``Pipe.DST_TO_SRC``)
"""
try:
while True:
# Read data with retries
data = await self._read_stream_with_retry(src_stream)
if not data:
logger.info("Source stream closed/reset")
logger.info("%s closed/reset", direction.name)
break
# Write data with timeout
@ -712,10 +753,10 @@ class CircuitV2Protocol(Service):
with trio.fail_after(STREAM_WRITE_TIMEOUT):
await dst_stream.write(data)
except trio.TooSlowError:
logger.error("Timeout writing to destination stream")
logger.error("Timeout writing in %s", direction.name)
break
except Exception as e:
logger.error("Error writing to destination stream: %s", str(e))
logger.error("Error writing in %s: %s", direction.name, str(e))
break
# Update resource usage

View File

@ -8,6 +8,7 @@ including reservations and connection limits.
from dataclasses import (
dataclass,
)
from enum import Enum, auto
import hashlib
import os
import time
@ -19,6 +20,18 @@ from libp2p.peer.id import (
# Import the protobuf definitions
from .pb.circuit_pb2 import Reservation as PbReservation
RANDOM_BYTES_LENGTH = 16 # 128 bits of randomness
TIMESTAMP_MULTIPLIER = 1000000 # To convert seconds to microseconds
# Reservation status enum
class ReservationStatus(Enum):
"""Lifecycle status of a relay reservation."""
ACTIVE = auto()
EXPIRED = auto()
REJECTED = auto()
@dataclass
class RelayLimits:
@ -68,8 +81,8 @@ class Reservation:
# - Peer ID to bind it to the specific peer
# - Timestamp for uniqueness
# - Hash everything for a fixed size output
random_bytes = os.urandom(16) # 128 bits of randomness
timestamp = str(int(self.created_at * 1000000)).encode()
random_bytes = os.urandom(RANDOM_BYTES_LENGTH)
timestamp = str(int(self.created_at * TIMESTAMP_MULTIPLIER)).encode()
peer_bytes = self.peer_id.to_bytes()
# Combine all elements and hash them
@ -84,6 +97,15 @@ class Reservation:
"""Check if the reservation has expired."""
return time.time() > self.expires_at
# Expose a friendly status enum --------------------------------------
@property
def status(self) -> ReservationStatus:
"""Return the current status as a ``ReservationStatus`` enum."""
return (
ReservationStatus.EXPIRED if self.is_expired() else ReservationStatus.ACTIVE
)
def can_accept_connection(self) -> bool:
"""Check if a new connection can be accepted."""
return (