mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
Merge branch 'main' into fix/issue-952-windows-cdci-python-version
This commit is contained in:
@ -9,6 +9,7 @@ from dataclasses import (
|
|||||||
dataclass,
|
dataclass,
|
||||||
field,
|
field,
|
||||||
)
|
)
|
||||||
|
from enum import Flag, auto
|
||||||
|
|
||||||
from libp2p.peer.peerinfo import (
|
from libp2p.peer.peerinfo import (
|
||||||
PeerInfo,
|
PeerInfo,
|
||||||
@ -18,29 +19,118 @@ from .resources import (
|
|||||||
RelayLimits,
|
RelayLimits,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
DEFAULT_MIN_RELAYS = 3
|
||||||
|
DEFAULT_MAX_RELAYS = 20
|
||||||
|
DEFAULT_DISCOVERY_INTERVAL = 300 # seconds
|
||||||
|
DEFAULT_RESERVATION_TTL = 3600 # seconds
|
||||||
|
DEFAULT_MAX_CIRCUIT_DURATION = 3600 # seconds
|
||||||
|
DEFAULT_MAX_CIRCUIT_BYTES = 1024 * 1024 * 1024 # 1GB
|
||||||
|
|
||||||
|
DEFAULT_MAX_CIRCUIT_CONNS = 8
|
||||||
|
DEFAULT_MAX_RESERVATIONS = 4
|
||||||
|
|
||||||
|
MAX_RESERVATIONS_PER_IP = 8
|
||||||
|
MAX_CIRCUITS_PER_IP = 16
|
||||||
|
RESERVATION_RATE_PER_IP = 4 # per minute
|
||||||
|
CIRCUIT_RATE_PER_IP = 8 # per minute
|
||||||
|
MAX_CIRCUITS_TOTAL = 64
|
||||||
|
MAX_RESERVATIONS_TOTAL = 32
|
||||||
|
MAX_BANDWIDTH_PER_CIRCUIT = 1024 * 1024 # 1MB/s
|
||||||
|
MAX_BANDWIDTH_TOTAL = 10 * 1024 * 1024 # 10MB/s
|
||||||
|
|
||||||
|
MIN_RELAY_SCORE = 0.5
|
||||||
|
MAX_RELAY_LATENCY = 1.0 # seconds
|
||||||
|
ENABLE_AUTO_RELAY = True
|
||||||
|
AUTO_RELAY_TIMEOUT = 30 # seconds
|
||||||
|
MAX_AUTO_RELAY_ATTEMPTS = 3
|
||||||
|
RESERVATION_REFRESH_THRESHOLD = 0.8 # Refresh at 80% of TTL
|
||||||
|
MAX_CONCURRENT_RESERVATIONS = 2
|
||||||
|
|
||||||
|
# Timeout constants for different components
|
||||||
|
DEFAULT_DISCOVERY_STREAM_TIMEOUT = 10 # seconds
|
||||||
|
DEFAULT_PEER_PROTOCOL_TIMEOUT = 5 # seconds
|
||||||
|
DEFAULT_PROTOCOL_READ_TIMEOUT = 15 # seconds
|
||||||
|
DEFAULT_PROTOCOL_WRITE_TIMEOUT = 15 # seconds
|
||||||
|
DEFAULT_PROTOCOL_CLOSE_TIMEOUT = 10 # seconds
|
||||||
|
DEFAULT_DCUTR_READ_TIMEOUT = 30 # seconds
|
||||||
|
DEFAULT_DCUTR_WRITE_TIMEOUT = 30 # seconds
|
||||||
|
DEFAULT_DIAL_TIMEOUT = 10 # seconds
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TimeoutConfig:
|
||||||
|
"""Timeout configuration for different Circuit Relay v2 components."""
|
||||||
|
|
||||||
|
# Discovery timeouts
|
||||||
|
discovery_stream_timeout: int = DEFAULT_DISCOVERY_STREAM_TIMEOUT
|
||||||
|
peer_protocol_timeout: int = DEFAULT_PEER_PROTOCOL_TIMEOUT
|
||||||
|
|
||||||
|
# Core protocol timeouts
|
||||||
|
protocol_read_timeout: int = DEFAULT_PROTOCOL_READ_TIMEOUT
|
||||||
|
protocol_write_timeout: int = DEFAULT_PROTOCOL_WRITE_TIMEOUT
|
||||||
|
protocol_close_timeout: int = DEFAULT_PROTOCOL_CLOSE_TIMEOUT
|
||||||
|
|
||||||
|
# DCUtR timeouts
|
||||||
|
dcutr_read_timeout: int = DEFAULT_DCUTR_READ_TIMEOUT
|
||||||
|
dcutr_write_timeout: int = DEFAULT_DCUTR_WRITE_TIMEOUT
|
||||||
|
dial_timeout: int = DEFAULT_DIAL_TIMEOUT
|
||||||
|
|
||||||
|
|
||||||
|
# Relay roles enum
|
||||||
|
class RelayRole(Flag):
|
||||||
|
"""
|
||||||
|
Bit-flag enum that captures the three possible relay capabilities.
|
||||||
|
|
||||||
|
A node can combine multiple roles using bit-wise OR, for example::
|
||||||
|
|
||||||
|
RelayRole.HOP | RelayRole.STOP
|
||||||
|
"""
|
||||||
|
|
||||||
|
HOP = auto() # Act as a relay for others ("hop")
|
||||||
|
STOP = auto() # Accept relayed connections ("stop")
|
||||||
|
CLIENT = auto() # Dial through existing relays ("client")
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class RelayConfig:
|
class RelayConfig:
|
||||||
"""Configuration for Circuit Relay v2."""
|
"""Configuration for Circuit Relay v2."""
|
||||||
|
|
||||||
# Role configuration
|
# Role configuration (bit-flags)
|
||||||
enable_hop: bool = False # Whether to act as a relay (hop)
|
roles: RelayRole = RelayRole.STOP | RelayRole.CLIENT
|
||||||
enable_stop: bool = True # Whether to accept relayed connections (stop)
|
|
||||||
enable_client: bool = True # Whether to use relays for dialing
|
|
||||||
|
|
||||||
# Resource limits
|
# Resource limits
|
||||||
limits: RelayLimits | None = None
|
limits: RelayLimits | None = None
|
||||||
|
|
||||||
# Discovery configuration
|
# Discovery configuration
|
||||||
bootstrap_relays: list[PeerInfo] = field(default_factory=list)
|
bootstrap_relays: list[PeerInfo] = field(default_factory=list)
|
||||||
min_relays: int = 3
|
min_relays: int = DEFAULT_MIN_RELAYS
|
||||||
max_relays: int = 20
|
max_relays: int = DEFAULT_MAX_RELAYS
|
||||||
discovery_interval: int = 300 # seconds
|
discovery_interval: int = DEFAULT_DISCOVERY_INTERVAL
|
||||||
|
|
||||||
# Connection configuration
|
# Connection configuration
|
||||||
reservation_ttl: int = 3600 # seconds
|
reservation_ttl: int = DEFAULT_RESERVATION_TTL
|
||||||
max_circuit_duration: int = 3600 # seconds
|
max_circuit_duration: int = DEFAULT_MAX_CIRCUIT_DURATION
|
||||||
max_circuit_bytes: int = 1024 * 1024 * 1024 # 1GB
|
max_circuit_bytes: int = DEFAULT_MAX_CIRCUIT_BYTES
|
||||||
|
|
||||||
|
# Timeout configuration
|
||||||
|
timeouts: TimeoutConfig = field(default_factory=TimeoutConfig)
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------
|
||||||
|
# Backwards-compat boolean helpers. Existing code that still accesses
|
||||||
|
# ``cfg.enable_hop, cfg.enable_stop, cfg.enable_client`` will continue to work.
|
||||||
|
# ---------------------------------------------------------------------
|
||||||
|
|
||||||
|
@property
|
||||||
|
def enable_hop(self) -> bool: # pragma: no cover – helper
|
||||||
|
return bool(self.roles & RelayRole.HOP)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def enable_stop(self) -> bool: # pragma: no cover – helper
|
||||||
|
return bool(self.roles & RelayRole.STOP)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def enable_client(self) -> bool: # pragma: no cover – helper
|
||||||
|
return bool(self.roles & RelayRole.CLIENT)
|
||||||
|
|
||||||
def __post_init__(self) -> None:
|
def __post_init__(self) -> None:
|
||||||
"""Initialize default values."""
|
"""Initialize default values."""
|
||||||
@ -48,8 +138,8 @@ class RelayConfig:
|
|||||||
self.limits = RelayLimits(
|
self.limits = RelayLimits(
|
||||||
duration=self.max_circuit_duration,
|
duration=self.max_circuit_duration,
|
||||||
data=self.max_circuit_bytes,
|
data=self.max_circuit_bytes,
|
||||||
max_circuit_conns=8,
|
max_circuit_conns=DEFAULT_MAX_CIRCUIT_CONNS,
|
||||||
max_reservations=4,
|
max_reservations=DEFAULT_MAX_RESERVATIONS,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -58,20 +148,20 @@ class HopConfig:
|
|||||||
"""Configuration specific to relay (hop) nodes."""
|
"""Configuration specific to relay (hop) nodes."""
|
||||||
|
|
||||||
# Resource limits per IP
|
# Resource limits per IP
|
||||||
max_reservations_per_ip: int = 8
|
max_reservations_per_ip: int = MAX_RESERVATIONS_PER_IP
|
||||||
max_circuits_per_ip: int = 16
|
max_circuits_per_ip: int = MAX_CIRCUITS_PER_IP
|
||||||
|
|
||||||
# Rate limiting
|
# Rate limiting
|
||||||
reservation_rate_per_ip: int = 4 # per minute
|
reservation_rate_per_ip: int = RESERVATION_RATE_PER_IP
|
||||||
circuit_rate_per_ip: int = 8 # per minute
|
circuit_rate_per_ip: int = CIRCUIT_RATE_PER_IP
|
||||||
|
|
||||||
# Resource quotas
|
# Resource quotas
|
||||||
max_circuits_total: int = 64
|
max_circuits_total: int = MAX_CIRCUITS_TOTAL
|
||||||
max_reservations_total: int = 32
|
max_reservations_total: int = MAX_RESERVATIONS_TOTAL
|
||||||
|
|
||||||
# Bandwidth limits
|
# Bandwidth limits
|
||||||
max_bandwidth_per_circuit: int = 1024 * 1024 # 1MB/s
|
max_bandwidth_per_circuit: int = MAX_BANDWIDTH_PER_CIRCUIT
|
||||||
max_bandwidth_total: int = 10 * 1024 * 1024 # 10MB/s
|
max_bandwidth_total: int = MAX_BANDWIDTH_TOTAL
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@ -79,14 +169,14 @@ class ClientConfig:
|
|||||||
"""Configuration specific to relay clients."""
|
"""Configuration specific to relay clients."""
|
||||||
|
|
||||||
# Relay selection
|
# Relay selection
|
||||||
min_relay_score: float = 0.5
|
min_relay_score: float = MIN_RELAY_SCORE
|
||||||
max_relay_latency: float = 1.0 # seconds
|
max_relay_latency: float = MAX_RELAY_LATENCY
|
||||||
|
|
||||||
# Auto-relay settings
|
# Auto-relay settings
|
||||||
enable_auto_relay: bool = True
|
enable_auto_relay: bool = ENABLE_AUTO_RELAY
|
||||||
auto_relay_timeout: int = 30 # seconds
|
auto_relay_timeout: int = AUTO_RELAY_TIMEOUT
|
||||||
max_auto_relay_attempts: int = 3
|
max_auto_relay_attempts: int = MAX_AUTO_RELAY_ATTEMPTS
|
||||||
|
|
||||||
# Reservation management
|
# Reservation management
|
||||||
reservation_refresh_threshold: float = 0.8 # Refresh at 80% of TTL
|
reservation_refresh_threshold: float = RESERVATION_REFRESH_THRESHOLD
|
||||||
max_concurrent_reservations: int = 2
|
max_concurrent_reservations: int = MAX_CONCURRENT_RESERVATIONS
|
||||||
|
|||||||
@ -29,6 +29,11 @@ from libp2p.peer.id import (
|
|||||||
from libp2p.peer.peerinfo import (
|
from libp2p.peer.peerinfo import (
|
||||||
PeerInfo,
|
PeerInfo,
|
||||||
)
|
)
|
||||||
|
from libp2p.relay.circuit_v2.config import (
|
||||||
|
DEFAULT_DCUTR_READ_TIMEOUT,
|
||||||
|
DEFAULT_DCUTR_WRITE_TIMEOUT,
|
||||||
|
DEFAULT_DIAL_TIMEOUT,
|
||||||
|
)
|
||||||
from libp2p.relay.circuit_v2.nat import (
|
from libp2p.relay.circuit_v2.nat import (
|
||||||
ReachabilityChecker,
|
ReachabilityChecker,
|
||||||
)
|
)
|
||||||
@ -47,11 +52,7 @@ PROTOCOL_ID = TProtocol("/libp2p/dcutr")
|
|||||||
# Maximum message size for DCUtR (4KiB as per spec)
|
# Maximum message size for DCUtR (4KiB as per spec)
|
||||||
MAX_MESSAGE_SIZE = 4 * 1024
|
MAX_MESSAGE_SIZE = 4 * 1024
|
||||||
|
|
||||||
# Timeouts
|
# DCUtR protocol constants
|
||||||
STREAM_READ_TIMEOUT = 30 # seconds
|
|
||||||
STREAM_WRITE_TIMEOUT = 30 # seconds
|
|
||||||
DIAL_TIMEOUT = 10 # seconds
|
|
||||||
|
|
||||||
# Maximum number of hole punch attempts per peer
|
# Maximum number of hole punch attempts per peer
|
||||||
MAX_HOLE_PUNCH_ATTEMPTS = 5
|
MAX_HOLE_PUNCH_ATTEMPTS = 5
|
||||||
|
|
||||||
@ -70,7 +71,13 @@ class DCUtRProtocol(Service):
|
|||||||
hole punching, after they have established an initial connection through a relay.
|
hole punching, after they have established an initial connection through a relay.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, host: IHost):
|
def __init__(
|
||||||
|
self,
|
||||||
|
host: IHost,
|
||||||
|
read_timeout: int = DEFAULT_DCUTR_READ_TIMEOUT,
|
||||||
|
write_timeout: int = DEFAULT_DCUTR_WRITE_TIMEOUT,
|
||||||
|
dial_timeout: int = DEFAULT_DIAL_TIMEOUT,
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
Initialize the DCUtR protocol.
|
Initialize the DCUtR protocol.
|
||||||
|
|
||||||
@ -78,10 +85,19 @@ class DCUtRProtocol(Service):
|
|||||||
----------
|
----------
|
||||||
host : IHost
|
host : IHost
|
||||||
The libp2p host this protocol is running on
|
The libp2p host this protocol is running on
|
||||||
|
read_timeout : int
|
||||||
|
Timeout for stream read operations, in seconds
|
||||||
|
write_timeout : int
|
||||||
|
Timeout for stream write operations, in seconds
|
||||||
|
dial_timeout : int
|
||||||
|
Timeout for dial operations, in seconds
|
||||||
|
|
||||||
"""
|
"""
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.host = host
|
self.host = host
|
||||||
|
self.read_timeout = read_timeout
|
||||||
|
self.write_timeout = write_timeout
|
||||||
|
self.dial_timeout = dial_timeout
|
||||||
self.event_started = trio.Event()
|
self.event_started = trio.Event()
|
||||||
self._hole_punch_attempts: dict[ID, int] = {}
|
self._hole_punch_attempts: dict[ID, int] = {}
|
||||||
self._direct_connections: set[ID] = set()
|
self._direct_connections: set[ID] = set()
|
||||||
@ -161,7 +177,7 @@ class DCUtRProtocol(Service):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
# Read the CONNECT message
|
# Read the CONNECT message
|
||||||
with trio.fail_after(STREAM_READ_TIMEOUT):
|
with trio.fail_after(self.read_timeout):
|
||||||
msg_bytes = await stream.read(MAX_MESSAGE_SIZE)
|
msg_bytes = await stream.read(MAX_MESSAGE_SIZE)
|
||||||
|
|
||||||
# Parse the message
|
# Parse the message
|
||||||
@ -196,7 +212,7 @@ class DCUtRProtocol(Service):
|
|||||||
response.type = HolePunch.CONNECT
|
response.type = HolePunch.CONNECT
|
||||||
response.ObsAddrs.extend(our_addrs)
|
response.ObsAddrs.extend(our_addrs)
|
||||||
|
|
||||||
with trio.fail_after(STREAM_WRITE_TIMEOUT):
|
with trio.fail_after(self.write_timeout):
|
||||||
await stream.write(response.SerializeToString())
|
await stream.write(response.SerializeToString())
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
@ -206,7 +222,7 @@ class DCUtRProtocol(Service):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Wait for SYNC message
|
# Wait for SYNC message
|
||||||
with trio.fail_after(STREAM_READ_TIMEOUT):
|
with trio.fail_after(self.read_timeout):
|
||||||
sync_bytes = await stream.read(MAX_MESSAGE_SIZE)
|
sync_bytes = await stream.read(MAX_MESSAGE_SIZE)
|
||||||
|
|
||||||
# Parse the SYNC message
|
# Parse the SYNC message
|
||||||
@ -300,7 +316,7 @@ class DCUtRProtocol(Service):
|
|||||||
connect_msg.ObsAddrs.extend(our_addrs)
|
connect_msg.ObsAddrs.extend(our_addrs)
|
||||||
|
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
with trio.fail_after(STREAM_WRITE_TIMEOUT):
|
with trio.fail_after(self.write_timeout):
|
||||||
await stream.write(connect_msg.SerializeToString())
|
await stream.write(connect_msg.SerializeToString())
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
@ -310,7 +326,7 @@ class DCUtRProtocol(Service):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Receive the peer's CONNECT message
|
# Receive the peer's CONNECT message
|
||||||
with trio.fail_after(STREAM_READ_TIMEOUT):
|
with trio.fail_after(self.read_timeout):
|
||||||
resp_bytes = await stream.read(MAX_MESSAGE_SIZE)
|
resp_bytes = await stream.read(MAX_MESSAGE_SIZE)
|
||||||
|
|
||||||
# Calculate RTT
|
# Calculate RTT
|
||||||
@ -349,7 +365,7 @@ class DCUtRProtocol(Service):
|
|||||||
sync_msg = HolePunch()
|
sync_msg = HolePunch()
|
||||||
sync_msg.type = HolePunch.SYNC
|
sync_msg.type = HolePunch.SYNC
|
||||||
|
|
||||||
with trio.fail_after(STREAM_WRITE_TIMEOUT):
|
with trio.fail_after(self.write_timeout):
|
||||||
await stream.write(sync_msg.SerializeToString())
|
await stream.write(sync_msg.SerializeToString())
|
||||||
|
|
||||||
logger.debug("Sent SYNC message to %s", peer_id)
|
logger.debug("Sent SYNC message to %s", peer_id)
|
||||||
@ -468,7 +484,7 @@ class DCUtRProtocol(Service):
|
|||||||
peer_info = PeerInfo(peer_id, [addr])
|
peer_info = PeerInfo(peer_id, [addr])
|
||||||
|
|
||||||
# Try to connect with timeout
|
# Try to connect with timeout
|
||||||
with trio.fail_after(DIAL_TIMEOUT):
|
with trio.fail_after(self.dial_timeout):
|
||||||
await self.host.connect(peer_info)
|
await self.host.connect(peer_info)
|
||||||
|
|
||||||
logger.info("Successfully connected to %s at %s", peer_id, addr)
|
logger.info("Successfully connected to %s at %s", peer_id, addr)
|
||||||
|
|||||||
@ -31,6 +31,11 @@ from libp2p.tools.async_service import (
|
|||||||
Service,
|
Service,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from .config import (
|
||||||
|
DEFAULT_DISCOVERY_INTERVAL,
|
||||||
|
DEFAULT_DISCOVERY_STREAM_TIMEOUT,
|
||||||
|
DEFAULT_PEER_PROTOCOL_TIMEOUT,
|
||||||
|
)
|
||||||
from .pb.circuit_pb2 import (
|
from .pb.circuit_pb2 import (
|
||||||
HopMessage,
|
HopMessage,
|
||||||
)
|
)
|
||||||
@ -43,10 +48,8 @@ from .protocol_buffer import (
|
|||||||
|
|
||||||
logger = logging.getLogger("libp2p.relay.circuit_v2.discovery")
|
logger = logging.getLogger("libp2p.relay.circuit_v2.discovery")
|
||||||
|
|
||||||
# Constants
|
# Discovery constants
|
||||||
MAX_RELAYS_TO_TRACK = 10
|
MAX_RELAYS_TO_TRACK = 10
|
||||||
DEFAULT_DISCOVERY_INTERVAL = 60 # seconds
|
|
||||||
STREAM_TIMEOUT = 10 # seconds
|
|
||||||
|
|
||||||
|
|
||||||
# Extended interfaces for type checking
|
# Extended interfaces for type checking
|
||||||
@ -86,6 +89,8 @@ class RelayDiscovery(Service):
|
|||||||
auto_reserve: bool = False,
|
auto_reserve: bool = False,
|
||||||
discovery_interval: int = DEFAULT_DISCOVERY_INTERVAL,
|
discovery_interval: int = DEFAULT_DISCOVERY_INTERVAL,
|
||||||
max_relays: int = MAX_RELAYS_TO_TRACK,
|
max_relays: int = MAX_RELAYS_TO_TRACK,
|
||||||
|
stream_timeout: int = DEFAULT_DISCOVERY_STREAM_TIMEOUT,
|
||||||
|
peer_protocol_timeout: int = DEFAULT_PEER_PROTOCOL_TIMEOUT,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Initialize the discovery service.
|
Initialize the discovery service.
|
||||||
@ -100,6 +105,10 @@ class RelayDiscovery(Service):
|
|||||||
How often to run discovery, in seconds
|
How often to run discovery, in seconds
|
||||||
max_relays : int
|
max_relays : int
|
||||||
Maximum number of relays to track
|
Maximum number of relays to track
|
||||||
|
stream_timeout : int
|
||||||
|
Timeout for stream operations during discovery, in seconds
|
||||||
|
peer_protocol_timeout : int
|
||||||
|
Timeout for checking peer protocol support, in seconds
|
||||||
|
|
||||||
"""
|
"""
|
||||||
super().__init__()
|
super().__init__()
|
||||||
@ -107,6 +116,8 @@ class RelayDiscovery(Service):
|
|||||||
self.auto_reserve = auto_reserve
|
self.auto_reserve = auto_reserve
|
||||||
self.discovery_interval = discovery_interval
|
self.discovery_interval = discovery_interval
|
||||||
self.max_relays = max_relays
|
self.max_relays = max_relays
|
||||||
|
self.stream_timeout = stream_timeout
|
||||||
|
self.peer_protocol_timeout = peer_protocol_timeout
|
||||||
self._discovered_relays: dict[ID, RelayInfo] = {}
|
self._discovered_relays: dict[ID, RelayInfo] = {}
|
||||||
self._protocol_cache: dict[
|
self._protocol_cache: dict[
|
||||||
ID, set[str]
|
ID, set[str]
|
||||||
@ -165,8 +176,8 @@ class RelayDiscovery(Service):
|
|||||||
self._discovered_relays[peer_id].last_seen = time.time()
|
self._discovered_relays[peer_id].last_seen = time.time()
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Check if peer supports the relay protocol
|
# Don't wait too long for protocol info
|
||||||
with trio.move_on_after(5): # Don't wait too long for protocol info
|
with trio.move_on_after(self.peer_protocol_timeout):
|
||||||
if await self._supports_relay_protocol(peer_id):
|
if await self._supports_relay_protocol(peer_id):
|
||||||
await self._add_relay(peer_id)
|
await self._add_relay(peer_id)
|
||||||
|
|
||||||
@ -264,7 +275,7 @@ class RelayDiscovery(Service):
|
|||||||
async def _check_via_direct_connection(self, peer_id: ID) -> bool | None:
|
async def _check_via_direct_connection(self, peer_id: ID) -> bool | None:
|
||||||
"""Check protocol support via direct connection."""
|
"""Check protocol support via direct connection."""
|
||||||
try:
|
try:
|
||||||
with trio.fail_after(STREAM_TIMEOUT):
|
with trio.fail_after(self.stream_timeout):
|
||||||
stream = await self.host.new_stream(peer_id, [PROTOCOL_ID])
|
stream = await self.host.new_stream(peer_id, [PROTOCOL_ID])
|
||||||
if stream:
|
if stream:
|
||||||
await stream.close()
|
await stream.close()
|
||||||
@ -370,7 +381,7 @@ class RelayDiscovery(Service):
|
|||||||
|
|
||||||
# Open a stream to the relay with timeout
|
# Open a stream to the relay with timeout
|
||||||
try:
|
try:
|
||||||
with trio.fail_after(STREAM_TIMEOUT):
|
with trio.fail_after(self.stream_timeout):
|
||||||
stream = await self.host.new_stream(peer_id, [PROTOCOL_ID])
|
stream = await self.host.new_stream(peer_id, [PROTOCOL_ID])
|
||||||
if not stream:
|
if not stream:
|
||||||
logger.error("Failed to open stream to relay %s", peer_id)
|
logger.error("Failed to open stream to relay %s", peer_id)
|
||||||
@ -386,7 +397,7 @@ class RelayDiscovery(Service):
|
|||||||
peer=self.host.get_id().to_bytes(),
|
peer=self.host.get_id().to_bytes(),
|
||||||
)
|
)
|
||||||
|
|
||||||
with trio.fail_after(STREAM_TIMEOUT):
|
with trio.fail_after(self.stream_timeout):
|
||||||
await stream.write(request.SerializeToString())
|
await stream.write(request.SerializeToString())
|
||||||
|
|
||||||
# Wait for response
|
# Wait for response
|
||||||
|
|||||||
@ -5,6 +5,7 @@ This module implements the Circuit Relay v2 protocol as specified in:
|
|||||||
https://github.com/libp2p/specs/blob/master/relay/circuit-v2.md
|
https://github.com/libp2p/specs/blob/master/relay/circuit-v2.md
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from enum import Enum, auto
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
from typing import (
|
from typing import (
|
||||||
@ -37,6 +38,15 @@ from libp2p.tools.async_service import (
|
|||||||
Service,
|
Service,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from .config import (
|
||||||
|
DEFAULT_MAX_CIRCUIT_BYTES,
|
||||||
|
DEFAULT_MAX_CIRCUIT_CONNS,
|
||||||
|
DEFAULT_MAX_CIRCUIT_DURATION,
|
||||||
|
DEFAULT_MAX_RESERVATIONS,
|
||||||
|
DEFAULT_PROTOCOL_CLOSE_TIMEOUT,
|
||||||
|
DEFAULT_PROTOCOL_READ_TIMEOUT,
|
||||||
|
DEFAULT_PROTOCOL_WRITE_TIMEOUT,
|
||||||
|
)
|
||||||
from .pb.circuit_pb2 import (
|
from .pb.circuit_pb2 import (
|
||||||
HopMessage,
|
HopMessage,
|
||||||
Limit,
|
Limit,
|
||||||
@ -58,18 +68,22 @@ logger = logging.getLogger("libp2p.relay.circuit_v2")
|
|||||||
PROTOCOL_ID = TProtocol("/libp2p/circuit/relay/2.0.0")
|
PROTOCOL_ID = TProtocol("/libp2p/circuit/relay/2.0.0")
|
||||||
STOP_PROTOCOL_ID = TProtocol("/libp2p/circuit/relay/2.0.0/stop")
|
STOP_PROTOCOL_ID = TProtocol("/libp2p/circuit/relay/2.0.0/stop")
|
||||||
|
|
||||||
|
|
||||||
|
# Direction enum for data piping
|
||||||
|
class Pipe(Enum):
|
||||||
|
SRC_TO_DST = auto()
|
||||||
|
DST_TO_SRC = auto()
|
||||||
|
|
||||||
|
|
||||||
# Default limits for relay resources
|
# Default limits for relay resources
|
||||||
DEFAULT_RELAY_LIMITS = RelayLimits(
|
DEFAULT_RELAY_LIMITS = RelayLimits(
|
||||||
duration=60 * 60, # 1 hour
|
duration=DEFAULT_MAX_CIRCUIT_DURATION,
|
||||||
data=1024 * 1024 * 1024, # 1GB
|
data=DEFAULT_MAX_CIRCUIT_BYTES,
|
||||||
max_circuit_conns=8,
|
max_circuit_conns=DEFAULT_MAX_CIRCUIT_CONNS,
|
||||||
max_reservations=4,
|
max_reservations=DEFAULT_MAX_RESERVATIONS,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Stream operation timeouts
|
# Stream operation constants
|
||||||
STREAM_READ_TIMEOUT = 15 # seconds
|
|
||||||
STREAM_WRITE_TIMEOUT = 15 # seconds
|
|
||||||
STREAM_CLOSE_TIMEOUT = 10 # seconds
|
|
||||||
MAX_READ_RETRIES = 5 # Maximum number of read retries
|
MAX_READ_RETRIES = 5 # Maximum number of read retries
|
||||||
|
|
||||||
|
|
||||||
@ -113,6 +127,9 @@ class CircuitV2Protocol(Service):
|
|||||||
host: IHost,
|
host: IHost,
|
||||||
limits: RelayLimits | None = None,
|
limits: RelayLimits | None = None,
|
||||||
allow_hop: bool = False,
|
allow_hop: bool = False,
|
||||||
|
read_timeout: int = DEFAULT_PROTOCOL_READ_TIMEOUT,
|
||||||
|
write_timeout: int = DEFAULT_PROTOCOL_WRITE_TIMEOUT,
|
||||||
|
close_timeout: int = DEFAULT_PROTOCOL_CLOSE_TIMEOUT,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Initialize a Circuit Relay v2 protocol instance.
|
Initialize a Circuit Relay v2 protocol instance.
|
||||||
@ -125,11 +142,20 @@ class CircuitV2Protocol(Service):
|
|||||||
Resource limits for the relay
|
Resource limits for the relay
|
||||||
allow_hop : bool
|
allow_hop : bool
|
||||||
Whether to allow this node to act as a relay
|
Whether to allow this node to act as a relay
|
||||||
|
read_timeout : int
|
||||||
|
Timeout for stream read operations, in seconds
|
||||||
|
write_timeout : int
|
||||||
|
Timeout for stream write operations, in seconds
|
||||||
|
close_timeout : int
|
||||||
|
Timeout for stream close operations, in seconds
|
||||||
|
|
||||||
"""
|
"""
|
||||||
self.host = host
|
self.host = host
|
||||||
self.limits = limits or DEFAULT_RELAY_LIMITS
|
self.limits = limits or DEFAULT_RELAY_LIMITS
|
||||||
self.allow_hop = allow_hop
|
self.allow_hop = allow_hop
|
||||||
|
self.read_timeout = read_timeout
|
||||||
|
self.write_timeout = write_timeout
|
||||||
|
self.close_timeout = close_timeout
|
||||||
self.resource_manager = RelayResourceManager(self.limits)
|
self.resource_manager = RelayResourceManager(self.limits)
|
||||||
self._active_relays: dict[ID, tuple[INetStream, INetStream | None]] = {}
|
self._active_relays: dict[ID, tuple[INetStream, INetStream | None]] = {}
|
||||||
self.event_started = trio.Event()
|
self.event_started = trio.Event()
|
||||||
@ -174,7 +200,7 @@ class CircuitV2Protocol(Service):
|
|||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with trio.fail_after(STREAM_CLOSE_TIMEOUT):
|
with trio.fail_after(self.close_timeout):
|
||||||
await stream.close()
|
await stream.close()
|
||||||
except Exception:
|
except Exception:
|
||||||
try:
|
try:
|
||||||
@ -216,7 +242,7 @@ class CircuitV2Protocol(Service):
|
|||||||
|
|
||||||
while retries < max_retries:
|
while retries < max_retries:
|
||||||
try:
|
try:
|
||||||
with trio.fail_after(STREAM_READ_TIMEOUT):
|
with trio.fail_after(self.read_timeout):
|
||||||
# Try reading with timeout
|
# Try reading with timeout
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Attempting to read from stream (attempt %d/%d)",
|
"Attempting to read from stream (attempt %d/%d)",
|
||||||
@ -293,7 +319,7 @@ class CircuitV2Protocol(Service):
|
|||||||
# First, handle the read timeout gracefully
|
# First, handle the read timeout gracefully
|
||||||
try:
|
try:
|
||||||
with trio.fail_after(
|
with trio.fail_after(
|
||||||
STREAM_READ_TIMEOUT * 2
|
self.read_timeout * 2
|
||||||
): # Double the timeout for reading
|
): # Double the timeout for reading
|
||||||
msg_bytes = await stream.read()
|
msg_bytes = await stream.read()
|
||||||
if not msg_bytes:
|
if not msg_bytes:
|
||||||
@ -414,7 +440,7 @@ class CircuitV2Protocol(Service):
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# Read the incoming message with timeout
|
# Read the incoming message with timeout
|
||||||
with trio.fail_after(STREAM_READ_TIMEOUT):
|
with trio.fail_after(self.read_timeout):
|
||||||
msg_bytes = await stream.read()
|
msg_bytes = await stream.read()
|
||||||
stop_msg = StopMessage()
|
stop_msg = StopMessage()
|
||||||
stop_msg.ParseFromString(msg_bytes)
|
stop_msg.ParseFromString(msg_bytes)
|
||||||
@ -458,8 +484,20 @@ class CircuitV2Protocol(Service):
|
|||||||
|
|
||||||
# Start relaying data
|
# Start relaying data
|
||||||
async with trio.open_nursery() as nursery:
|
async with trio.open_nursery() as nursery:
|
||||||
nursery.start_soon(self._relay_data, src_stream, stream, peer_id)
|
nursery.start_soon(
|
||||||
nursery.start_soon(self._relay_data, stream, src_stream, peer_id)
|
self._relay_data,
|
||||||
|
src_stream,
|
||||||
|
stream,
|
||||||
|
peer_id,
|
||||||
|
Pipe.SRC_TO_DST,
|
||||||
|
)
|
||||||
|
nursery.start_soon(
|
||||||
|
self._relay_data,
|
||||||
|
stream,
|
||||||
|
src_stream,
|
||||||
|
peer_id,
|
||||||
|
Pipe.DST_TO_SRC,
|
||||||
|
)
|
||||||
|
|
||||||
except trio.TooSlowError:
|
except trio.TooSlowError:
|
||||||
logger.error("Timeout reading from stop stream")
|
logger.error("Timeout reading from stop stream")
|
||||||
@ -509,7 +547,7 @@ class CircuitV2Protocol(Service):
|
|||||||
ttl = self.resource_manager.reserve(peer_id)
|
ttl = self.resource_manager.reserve(peer_id)
|
||||||
|
|
||||||
# Send reservation success response
|
# Send reservation success response
|
||||||
with trio.fail_after(STREAM_WRITE_TIMEOUT):
|
with trio.fail_after(self.write_timeout):
|
||||||
status = create_status(
|
status = create_status(
|
||||||
code=StatusCode.OK, message="Reservation accepted"
|
code=StatusCode.OK, message="Reservation accepted"
|
||||||
)
|
)
|
||||||
@ -560,7 +598,7 @@ class CircuitV2Protocol(Service):
|
|||||||
# Always close the stream when done with reservation
|
# Always close the stream when done with reservation
|
||||||
if cast(INetStreamWithExtras, stream).is_open():
|
if cast(INetStreamWithExtras, stream).is_open():
|
||||||
try:
|
try:
|
||||||
with trio.fail_after(STREAM_CLOSE_TIMEOUT):
|
with trio.fail_after(self.close_timeout):
|
||||||
await stream.close()
|
await stream.close()
|
||||||
except Exception as close_err:
|
except Exception as close_err:
|
||||||
logger.error("Error closing stream: %s", str(close_err))
|
logger.error("Error closing stream: %s", str(close_err))
|
||||||
@ -596,7 +634,7 @@ class CircuitV2Protocol(Service):
|
|||||||
self._active_relays[peer_id] = (stream, None)
|
self._active_relays[peer_id] = (stream, None)
|
||||||
|
|
||||||
# Try to connect to the destination with timeout
|
# Try to connect to the destination with timeout
|
||||||
with trio.fail_after(STREAM_READ_TIMEOUT):
|
with trio.fail_after(self.read_timeout):
|
||||||
dst_stream = await self.host.new_stream(peer_id, [STOP_PROTOCOL_ID])
|
dst_stream = await self.host.new_stream(peer_id, [STOP_PROTOCOL_ID])
|
||||||
if not dst_stream:
|
if not dst_stream:
|
||||||
raise ConnectionError("Could not connect to destination")
|
raise ConnectionError("Could not connect to destination")
|
||||||
@ -648,8 +686,20 @@ class CircuitV2Protocol(Service):
|
|||||||
|
|
||||||
# Start relaying data
|
# Start relaying data
|
||||||
async with trio.open_nursery() as nursery:
|
async with trio.open_nursery() as nursery:
|
||||||
nursery.start_soon(self._relay_data, stream, dst_stream, peer_id)
|
nursery.start_soon(
|
||||||
nursery.start_soon(self._relay_data, dst_stream, stream, peer_id)
|
self._relay_data,
|
||||||
|
stream,
|
||||||
|
dst_stream,
|
||||||
|
peer_id,
|
||||||
|
Pipe.SRC_TO_DST,
|
||||||
|
)
|
||||||
|
nursery.start_soon(
|
||||||
|
self._relay_data,
|
||||||
|
dst_stream,
|
||||||
|
stream,
|
||||||
|
peer_id,
|
||||||
|
Pipe.DST_TO_SRC,
|
||||||
|
)
|
||||||
|
|
||||||
except (trio.TooSlowError, ConnectionError) as e:
|
except (trio.TooSlowError, ConnectionError) as e:
|
||||||
logger.error("Error establishing relay connection: %s", str(e))
|
logger.error("Error establishing relay connection: %s", str(e))
|
||||||
@ -685,6 +735,7 @@ class CircuitV2Protocol(Service):
|
|||||||
src_stream: INetStream,
|
src_stream: INetStream,
|
||||||
dst_stream: INetStream,
|
dst_stream: INetStream,
|
||||||
peer_id: ID,
|
peer_id: ID,
|
||||||
|
direction: Pipe,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Relay data between two streams.
|
Relay data between two streams.
|
||||||
@ -698,24 +749,27 @@ class CircuitV2Protocol(Service):
|
|||||||
peer_id : ID
|
peer_id : ID
|
||||||
ID of the peer being relayed
|
ID of the peer being relayed
|
||||||
|
|
||||||
|
direction : Pipe
|
||||||
|
Direction of data flow (``Pipe.SRC_TO_DST`` or ``Pipe.DST_TO_SRC``)
|
||||||
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
# Read data with retries
|
# Read data with retries
|
||||||
data = await self._read_stream_with_retry(src_stream)
|
data = await self._read_stream_with_retry(src_stream)
|
||||||
if not data:
|
if not data:
|
||||||
logger.info("Source stream closed/reset")
|
logger.info("%s closed/reset", direction.name)
|
||||||
break
|
break
|
||||||
|
|
||||||
# Write data with timeout
|
# Write data with timeout
|
||||||
try:
|
try:
|
||||||
with trio.fail_after(STREAM_WRITE_TIMEOUT):
|
with trio.fail_after(self.write_timeout):
|
||||||
await dst_stream.write(data)
|
await dst_stream.write(data)
|
||||||
except trio.TooSlowError:
|
except trio.TooSlowError:
|
||||||
logger.error("Timeout writing to destination stream")
|
logger.error("Timeout writing in %s", direction.name)
|
||||||
break
|
break
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Error writing to destination stream: %s", str(e))
|
logger.error("Error writing in %s: %s", direction.name, str(e))
|
||||||
break
|
break
|
||||||
|
|
||||||
# Update resource usage
|
# Update resource usage
|
||||||
@ -744,7 +798,7 @@ class CircuitV2Protocol(Service):
|
|||||||
"""Send a status message."""
|
"""Send a status message."""
|
||||||
try:
|
try:
|
||||||
logger.debug("Sending status message with code %s: %s", code, message)
|
logger.debug("Sending status message with code %s: %s", code, message)
|
||||||
with trio.fail_after(STREAM_WRITE_TIMEOUT * 2): # Double the timeout
|
with trio.fail_after(self.write_timeout * 2): # Double the timeout
|
||||||
# Create a proto Status directly
|
# Create a proto Status directly
|
||||||
pb_status = PbStatus()
|
pb_status = PbStatus()
|
||||||
pb_status.code = cast(
|
pb_status.code = cast(
|
||||||
@ -782,7 +836,7 @@ class CircuitV2Protocol(Service):
|
|||||||
"""Send a status message on a STOP stream."""
|
"""Send a status message on a STOP stream."""
|
||||||
try:
|
try:
|
||||||
logger.debug("Sending stop status message with code %s: %s", code, message)
|
logger.debug("Sending stop status message with code %s: %s", code, message)
|
||||||
with trio.fail_after(STREAM_WRITE_TIMEOUT * 2): # Double the timeout
|
with trio.fail_after(self.write_timeout * 2): # Double the timeout
|
||||||
# Create a proto Status directly
|
# Create a proto Status directly
|
||||||
pb_status = PbStatus()
|
pb_status = PbStatus()
|
||||||
pb_status.code = cast(
|
pb_status.code = cast(
|
||||||
|
|||||||
@ -8,6 +8,7 @@ including reservations and connection limits.
|
|||||||
from dataclasses import (
|
from dataclasses import (
|
||||||
dataclass,
|
dataclass,
|
||||||
)
|
)
|
||||||
|
from enum import Enum, auto
|
||||||
import hashlib
|
import hashlib
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
@ -19,6 +20,18 @@ from libp2p.peer.id import (
|
|||||||
# Import the protobuf definitions
|
# Import the protobuf definitions
|
||||||
from .pb.circuit_pb2 import Reservation as PbReservation
|
from .pb.circuit_pb2 import Reservation as PbReservation
|
||||||
|
|
||||||
|
RANDOM_BYTES_LENGTH = 16 # 128 bits of randomness
|
||||||
|
TIMESTAMP_MULTIPLIER = 1000000 # To convert seconds to microseconds
|
||||||
|
|
||||||
|
|
||||||
|
# Reservation status enum
|
||||||
|
class ReservationStatus(Enum):
|
||||||
|
"""Lifecycle status of a relay reservation."""
|
||||||
|
|
||||||
|
ACTIVE = auto()
|
||||||
|
EXPIRED = auto()
|
||||||
|
REJECTED = auto()
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class RelayLimits:
|
class RelayLimits:
|
||||||
@ -68,8 +81,8 @@ class Reservation:
|
|||||||
# - Peer ID to bind it to the specific peer
|
# - Peer ID to bind it to the specific peer
|
||||||
# - Timestamp for uniqueness
|
# - Timestamp for uniqueness
|
||||||
# - Hash everything for a fixed size output
|
# - Hash everything for a fixed size output
|
||||||
random_bytes = os.urandom(16) # 128 bits of randomness
|
random_bytes = os.urandom(RANDOM_BYTES_LENGTH)
|
||||||
timestamp = str(int(self.created_at * 1000000)).encode()
|
timestamp = str(int(self.created_at * TIMESTAMP_MULTIPLIER)).encode()
|
||||||
peer_bytes = self.peer_id.to_bytes()
|
peer_bytes = self.peer_id.to_bytes()
|
||||||
|
|
||||||
# Combine all elements and hash them
|
# Combine all elements and hash them
|
||||||
@ -84,6 +97,15 @@ class Reservation:
|
|||||||
"""Check if the reservation has expired."""
|
"""Check if the reservation has expired."""
|
||||||
return time.time() > self.expires_at
|
return time.time() > self.expires_at
|
||||||
|
|
||||||
|
# Expose a friendly status enum
|
||||||
|
|
||||||
|
@property
|
||||||
|
def status(self) -> ReservationStatus:
|
||||||
|
"""Return the current status as a ``ReservationStatus`` enum."""
|
||||||
|
return (
|
||||||
|
ReservationStatus.EXPIRED if self.is_expired() else ReservationStatus.ACTIVE
|
||||||
|
)
|
||||||
|
|
||||||
def can_accept_connection(self) -> bool:
|
def can_accept_connection(self) -> bool:
|
||||||
"""Check if a new connection can be accepted."""
|
"""Check if a new connection can be accepted."""
|
||||||
return (
|
return (
|
||||||
|
|||||||
@ -89,6 +89,8 @@ class CircuitV2Transport(ITransport):
|
|||||||
auto_reserve=config.enable_client,
|
auto_reserve=config.enable_client,
|
||||||
discovery_interval=config.discovery_interval,
|
discovery_interval=config.discovery_interval,
|
||||||
max_relays=config.max_relays,
|
max_relays=config.max_relays,
|
||||||
|
stream_timeout=config.timeouts.discovery_stream_timeout,
|
||||||
|
peer_protocol_timeout=config.timeouts.peer_protocol_timeout,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def dial(
|
async def dial(
|
||||||
|
|||||||
11
newsfragments/917.internal.rst
Normal file
11
newsfragments/917.internal.rst
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
Replace magic numbers with named constants and enums for clarity and maintainability
|
||||||
|
|
||||||
|
**Key Changes:**
|
||||||
|
- **Introduced type-safe enums** for better code clarity:
|
||||||
|
- `RelayRole(Flag)` enum with HOP, STOP, CLIENT roles supporting bitwise combinations (e.g., `RelayRole.HOP | RelayRole.STOP`)
|
||||||
|
- `ReservationStatus(Enum)` for reservation lifecycle management (ACTIVE, EXPIRED, REJECTED)
|
||||||
|
- **Replaced magic numbers with named constants** throughout the codebase, improving code maintainability and eliminating hardcoded timeout values (15s, 30s, 10s) with descriptive constant names
|
||||||
|
- **Added comprehensive timeout configuration system** with new `TimeoutConfig` dataclass supporting component-specific timeouts (discovery, protocol, DCUtR)
|
||||||
|
- **Enhanced configurability** of `RelayDiscovery`, `CircuitV2Protocol`, and `DCUtRProtocol` constructors with optional timeout parameters
|
||||||
|
- **Improved architecture consistency** with clean configuration flow across all circuit relay components
|
||||||
|
**Backward Compatibility:** All changes maintain full backward compatibility. Existing code continues to work unchanged while new timeout configuration options are available for users who need them.
|
||||||
Reference in New Issue
Block a user