mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
Fixed variable imports
This commit is contained in:
@ -46,6 +46,35 @@ MAX_AUTO_RELAY_ATTEMPTS = 3
|
||||
RESERVATION_REFRESH_THRESHOLD = 0.8 # Refresh at 80% of TTL
|
||||
MAX_CONCURRENT_RESERVATIONS = 2
|
||||
|
||||
# Timeout constants for different components
|
||||
DEFAULT_DISCOVERY_STREAM_TIMEOUT = 10 # seconds
|
||||
DEFAULT_PEER_PROTOCOL_TIMEOUT = 5 # seconds
|
||||
DEFAULT_PROTOCOL_READ_TIMEOUT = 15 # seconds
|
||||
DEFAULT_PROTOCOL_WRITE_TIMEOUT = 15 # seconds
|
||||
DEFAULT_PROTOCOL_CLOSE_TIMEOUT = 10 # seconds
|
||||
DEFAULT_DCUTR_READ_TIMEOUT = 30 # seconds
|
||||
DEFAULT_DCUTR_WRITE_TIMEOUT = 30 # seconds
|
||||
DEFAULT_DIAL_TIMEOUT = 10 # seconds
|
||||
|
||||
|
||||
@dataclass
|
||||
class TimeoutConfig:
|
||||
"""Timeout configuration for different Circuit Relay v2 components."""
|
||||
|
||||
# Discovery timeouts
|
||||
discovery_stream_timeout: int = DEFAULT_DISCOVERY_STREAM_TIMEOUT
|
||||
peer_protocol_timeout: int = DEFAULT_PEER_PROTOCOL_TIMEOUT
|
||||
|
||||
# Core protocol timeouts
|
||||
protocol_read_timeout: int = DEFAULT_PROTOCOL_READ_TIMEOUT
|
||||
protocol_write_timeout: int = DEFAULT_PROTOCOL_WRITE_TIMEOUT
|
||||
protocol_close_timeout: int = DEFAULT_PROTOCOL_CLOSE_TIMEOUT
|
||||
|
||||
# DCUtR timeouts
|
||||
dcutr_read_timeout: int = DEFAULT_DCUTR_READ_TIMEOUT
|
||||
dcutr_write_timeout: int = DEFAULT_DCUTR_WRITE_TIMEOUT
|
||||
dial_timeout: int = DEFAULT_DIAL_TIMEOUT
|
||||
|
||||
|
||||
# Relay roles enum
|
||||
class RelayRole(Flag):
|
||||
@ -83,6 +112,9 @@ class RelayConfig:
|
||||
max_circuit_duration: int = DEFAULT_MAX_CIRCUIT_DURATION
|
||||
max_circuit_bytes: int = DEFAULT_MAX_CIRCUIT_BYTES
|
||||
|
||||
# Timeout configuration
|
||||
timeouts: TimeoutConfig = field(default_factory=TimeoutConfig)
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# Backwards-compat boolean helpers. Existing code that still accesses
|
||||
# ``cfg.enable_hop, cfg.enable_stop, cfg.enable_client`` will continue to work.
|
||||
|
||||
@ -29,6 +29,11 @@ from libp2p.peer.id import (
|
||||
from libp2p.peer.peerinfo import (
|
||||
PeerInfo,
|
||||
)
|
||||
from libp2p.relay.circuit_v2.config import (
|
||||
DEFAULT_DCUTR_READ_TIMEOUT,
|
||||
DEFAULT_DCUTR_WRITE_TIMEOUT,
|
||||
DEFAULT_DIAL_TIMEOUT,
|
||||
)
|
||||
from libp2p.relay.circuit_v2.nat import (
|
||||
ReachabilityChecker,
|
||||
)
|
||||
@ -47,11 +52,7 @@ PROTOCOL_ID = TProtocol("/libp2p/dcutr")
|
||||
# Maximum message size for DCUtR (4KiB as per spec)
|
||||
MAX_MESSAGE_SIZE = 4 * 1024
|
||||
|
||||
# Timeouts
|
||||
STREAM_READ_TIMEOUT = 30 # seconds
|
||||
STREAM_WRITE_TIMEOUT = 30 # seconds
|
||||
DIAL_TIMEOUT = 10 # seconds
|
||||
|
||||
# DCUtR protocol constants
|
||||
# Maximum number of hole punch attempts per peer
|
||||
MAX_HOLE_PUNCH_ATTEMPTS = 5
|
||||
|
||||
@ -70,7 +71,13 @@ class DCUtRProtocol(Service):
|
||||
hole punching, after they have established an initial connection through a relay.
|
||||
"""
|
||||
|
||||
def __init__(self, host: IHost):
|
||||
def __init__(
|
||||
self,
|
||||
host: IHost,
|
||||
read_timeout: int = DEFAULT_DCUTR_READ_TIMEOUT,
|
||||
write_timeout: int = DEFAULT_DCUTR_WRITE_TIMEOUT,
|
||||
dial_timeout: int = DEFAULT_DIAL_TIMEOUT,
|
||||
):
|
||||
"""
|
||||
Initialize the DCUtR protocol.
|
||||
|
||||
@ -78,10 +85,19 @@ class DCUtRProtocol(Service):
|
||||
----------
|
||||
host : IHost
|
||||
The libp2p host this protocol is running on
|
||||
read_timeout : int
|
||||
Timeout for stream read operations, in seconds
|
||||
write_timeout : int
|
||||
Timeout for stream write operations, in seconds
|
||||
dial_timeout : int
|
||||
Timeout for dial operations, in seconds
|
||||
|
||||
"""
|
||||
super().__init__()
|
||||
self.host = host
|
||||
self.read_timeout = read_timeout
|
||||
self.write_timeout = write_timeout
|
||||
self.dial_timeout = dial_timeout
|
||||
self.event_started = trio.Event()
|
||||
self._hole_punch_attempts: dict[ID, int] = {}
|
||||
self._direct_connections: set[ID] = set()
|
||||
@ -161,7 +177,7 @@ class DCUtRProtocol(Service):
|
||||
|
||||
try:
|
||||
# Read the CONNECT message
|
||||
with trio.fail_after(STREAM_READ_TIMEOUT):
|
||||
with trio.fail_after(self.read_timeout):
|
||||
msg_bytes = await stream.read(MAX_MESSAGE_SIZE)
|
||||
|
||||
# Parse the message
|
||||
@ -196,7 +212,7 @@ class DCUtRProtocol(Service):
|
||||
response.type = HolePunch.CONNECT
|
||||
response.ObsAddrs.extend(our_addrs)
|
||||
|
||||
with trio.fail_after(STREAM_WRITE_TIMEOUT):
|
||||
with trio.fail_after(self.write_timeout):
|
||||
await stream.write(response.SerializeToString())
|
||||
|
||||
logger.debug(
|
||||
@ -206,7 +222,7 @@ class DCUtRProtocol(Service):
|
||||
)
|
||||
|
||||
# Wait for SYNC message
|
||||
with trio.fail_after(STREAM_READ_TIMEOUT):
|
||||
with trio.fail_after(self.read_timeout):
|
||||
sync_bytes = await stream.read(MAX_MESSAGE_SIZE)
|
||||
|
||||
# Parse the SYNC message
|
||||
@ -300,7 +316,7 @@ class DCUtRProtocol(Service):
|
||||
connect_msg.ObsAddrs.extend(our_addrs)
|
||||
|
||||
start_time = time.time()
|
||||
with trio.fail_after(STREAM_WRITE_TIMEOUT):
|
||||
with trio.fail_after(self.write_timeout):
|
||||
await stream.write(connect_msg.SerializeToString())
|
||||
|
||||
logger.debug(
|
||||
@ -310,7 +326,7 @@ class DCUtRProtocol(Service):
|
||||
)
|
||||
|
||||
# Receive the peer's CONNECT message
|
||||
with trio.fail_after(STREAM_READ_TIMEOUT):
|
||||
with trio.fail_after(self.read_timeout):
|
||||
resp_bytes = await stream.read(MAX_MESSAGE_SIZE)
|
||||
|
||||
# Calculate RTT
|
||||
@ -349,7 +365,7 @@ class DCUtRProtocol(Service):
|
||||
sync_msg = HolePunch()
|
||||
sync_msg.type = HolePunch.SYNC
|
||||
|
||||
with trio.fail_after(STREAM_WRITE_TIMEOUT):
|
||||
with trio.fail_after(self.write_timeout):
|
||||
await stream.write(sync_msg.SerializeToString())
|
||||
|
||||
logger.debug("Sent SYNC message to %s", peer_id)
|
||||
@ -468,7 +484,7 @@ class DCUtRProtocol(Service):
|
||||
peer_info = PeerInfo(peer_id, [addr])
|
||||
|
||||
# Try to connect with timeout
|
||||
with trio.fail_after(DIAL_TIMEOUT):
|
||||
with trio.fail_after(self.dial_timeout):
|
||||
await self.host.connect(peer_info)
|
||||
|
||||
logger.info("Successfully connected to %s at %s", peer_id, addr)
|
||||
|
||||
@ -31,6 +31,11 @@ from libp2p.tools.async_service import (
|
||||
Service,
|
||||
)
|
||||
|
||||
from .config import (
|
||||
DEFAULT_DISCOVERY_INTERVAL,
|
||||
DEFAULT_DISCOVERY_STREAM_TIMEOUT,
|
||||
DEFAULT_PEER_PROTOCOL_TIMEOUT,
|
||||
)
|
||||
from .pb.circuit_pb2 import (
|
||||
HopMessage,
|
||||
)
|
||||
@ -43,11 +48,8 @@ from .protocol_buffer import (
|
||||
|
||||
logger = logging.getLogger("libp2p.relay.circuit_v2.discovery")
|
||||
|
||||
# Constants
|
||||
# Discovery constants
|
||||
MAX_RELAYS_TO_TRACK = 10
|
||||
DEFAULT_DISCOVERY_INTERVAL = 60 # seconds
|
||||
STREAM_TIMEOUT = 10 # seconds
|
||||
PEER_PROTOCOL_TIMEOUT = 5 # seconds
|
||||
|
||||
|
||||
# Extended interfaces for type checking
|
||||
@ -87,6 +89,8 @@ class RelayDiscovery(Service):
|
||||
auto_reserve: bool = False,
|
||||
discovery_interval: int = DEFAULT_DISCOVERY_INTERVAL,
|
||||
max_relays: int = MAX_RELAYS_TO_TRACK,
|
||||
stream_timeout: int = DEFAULT_DISCOVERY_STREAM_TIMEOUT,
|
||||
peer_protocol_timeout: int = DEFAULT_PEER_PROTOCOL_TIMEOUT,
|
||||
) -> None:
|
||||
"""
|
||||
Initialize the discovery service.
|
||||
@ -101,6 +105,10 @@ class RelayDiscovery(Service):
|
||||
How often to run discovery, in seconds
|
||||
max_relays : int
|
||||
Maximum number of relays to track
|
||||
stream_timeout : int
|
||||
Timeout for stream operations during discovery, in seconds
|
||||
peer_protocol_timeout : int
|
||||
Timeout for checking peer protocol support, in seconds
|
||||
|
||||
"""
|
||||
super().__init__()
|
||||
@ -108,6 +116,8 @@ class RelayDiscovery(Service):
|
||||
self.auto_reserve = auto_reserve
|
||||
self.discovery_interval = discovery_interval
|
||||
self.max_relays = max_relays
|
||||
self.stream_timeout = stream_timeout
|
||||
self.peer_protocol_timeout = peer_protocol_timeout
|
||||
self._discovered_relays: dict[ID, RelayInfo] = {}
|
||||
self._protocol_cache: dict[
|
||||
ID, set[str]
|
||||
@ -167,19 +177,19 @@ class RelayDiscovery(Service):
|
||||
continue
|
||||
|
||||
# Don't wait too long for protocol info
|
||||
with trio.move_on_after(PEER_PROTOCOL_TIMEOUT):
|
||||
with trio.move_on_after(self.peer_protocol_timeout):
|
||||
if await self._supports_relay_protocol(peer_id):
|
||||
await self._add_relay(peer_id)
|
||||
|
||||
# Limit number of relays we track
|
||||
if len(self._discovered_relays) > MAX_RELAYS_TO_TRACK:
|
||||
if len(self._discovered_relays) > self.max_relays:
|
||||
# Sort by last seen time and keep only the most recent ones
|
||||
sorted_relays = sorted(
|
||||
self._discovered_relays.items(),
|
||||
key=lambda x: x[1].last_seen,
|
||||
reverse=True,
|
||||
)
|
||||
to_remove = sorted_relays[MAX_RELAYS_TO_TRACK:]
|
||||
to_remove = sorted_relays[self.max_relays :]
|
||||
for peer_id, _ in to_remove:
|
||||
del self._discovered_relays[peer_id]
|
||||
|
||||
@ -265,7 +275,7 @@ class RelayDiscovery(Service):
|
||||
async def _check_via_direct_connection(self, peer_id: ID) -> bool | None:
|
||||
"""Check protocol support via direct connection."""
|
||||
try:
|
||||
with trio.fail_after(STREAM_TIMEOUT):
|
||||
with trio.fail_after(self.stream_timeout):
|
||||
stream = await self.host.new_stream(peer_id, [PROTOCOL_ID])
|
||||
if stream:
|
||||
await stream.close()
|
||||
@ -371,7 +381,7 @@ class RelayDiscovery(Service):
|
||||
|
||||
# Open a stream to the relay with timeout
|
||||
try:
|
||||
with trio.fail_after(STREAM_TIMEOUT):
|
||||
with trio.fail_after(self.stream_timeout):
|
||||
stream = await self.host.new_stream(peer_id, [PROTOCOL_ID])
|
||||
if not stream:
|
||||
logger.error("Failed to open stream to relay %s", peer_id)
|
||||
@ -387,7 +397,7 @@ class RelayDiscovery(Service):
|
||||
peer=self.host.get_id().to_bytes(),
|
||||
)
|
||||
|
||||
with trio.fail_after(STREAM_TIMEOUT):
|
||||
with trio.fail_after(self.stream_timeout):
|
||||
await stream.write(request.SerializeToString())
|
||||
|
||||
# Wait for response
|
||||
@ -464,7 +474,7 @@ class RelayDiscovery(Service):
|
||||
|
||||
for peer_id, relay_info in self._discovered_relays.items():
|
||||
# Check if relay hasn't been seen in a while (3x discovery interval)
|
||||
if now - relay_info.last_seen > DEFAULT_DISCOVERY_INTERVAL * 3:
|
||||
if now - relay_info.last_seen > self.discovery_interval * 3:
|
||||
to_remove.append(peer_id)
|
||||
continue
|
||||
|
||||
|
||||
@ -43,6 +43,9 @@ from .config import (
|
||||
DEFAULT_MAX_CIRCUIT_CONNS,
|
||||
DEFAULT_MAX_CIRCUIT_DURATION,
|
||||
DEFAULT_MAX_RESERVATIONS,
|
||||
DEFAULT_PROTOCOL_CLOSE_TIMEOUT,
|
||||
DEFAULT_PROTOCOL_READ_TIMEOUT,
|
||||
DEFAULT_PROTOCOL_WRITE_TIMEOUT,
|
||||
)
|
||||
from .pb.circuit_pb2 import (
|
||||
HopMessage,
|
||||
@ -80,10 +83,7 @@ DEFAULT_RELAY_LIMITS = RelayLimits(
|
||||
max_reservations=DEFAULT_MAX_RESERVATIONS,
|
||||
)
|
||||
|
||||
# Stream operation timeouts
|
||||
STREAM_READ_TIMEOUT = 15 # seconds
|
||||
STREAM_WRITE_TIMEOUT = 15 # seconds
|
||||
STREAM_CLOSE_TIMEOUT = 10 # seconds
|
||||
# Stream operation constants
|
||||
MAX_READ_RETRIES = 5 # Maximum number of read retries
|
||||
|
||||
|
||||
@ -127,6 +127,9 @@ class CircuitV2Protocol(Service):
|
||||
host: IHost,
|
||||
limits: RelayLimits | None = None,
|
||||
allow_hop: bool = False,
|
||||
read_timeout: int = DEFAULT_PROTOCOL_READ_TIMEOUT,
|
||||
write_timeout: int = DEFAULT_PROTOCOL_WRITE_TIMEOUT,
|
||||
close_timeout: int = DEFAULT_PROTOCOL_CLOSE_TIMEOUT,
|
||||
) -> None:
|
||||
"""
|
||||
Initialize a Circuit Relay v2 protocol instance.
|
||||
@ -139,11 +142,20 @@ class CircuitV2Protocol(Service):
|
||||
Resource limits for the relay
|
||||
allow_hop : bool
|
||||
Whether to allow this node to act as a relay
|
||||
read_timeout : int
|
||||
Timeout for stream read operations, in seconds
|
||||
write_timeout : int
|
||||
Timeout for stream write operations, in seconds
|
||||
close_timeout : int
|
||||
Timeout for stream close operations, in seconds
|
||||
|
||||
"""
|
||||
self.host = host
|
||||
self.limits = limits or DEFAULT_RELAY_LIMITS
|
||||
self.allow_hop = allow_hop
|
||||
self.read_timeout = read_timeout
|
||||
self.write_timeout = write_timeout
|
||||
self.close_timeout = close_timeout
|
||||
self.resource_manager = RelayResourceManager(self.limits)
|
||||
self._active_relays: dict[ID, tuple[INetStream, INetStream | None]] = {}
|
||||
self.event_started = trio.Event()
|
||||
@ -188,7 +200,7 @@ class CircuitV2Protocol(Service):
|
||||
return
|
||||
|
||||
try:
|
||||
with trio.fail_after(STREAM_CLOSE_TIMEOUT):
|
||||
with trio.fail_after(self.close_timeout):
|
||||
await stream.close()
|
||||
except Exception:
|
||||
try:
|
||||
@ -230,7 +242,7 @@ class CircuitV2Protocol(Service):
|
||||
|
||||
while retries < max_retries:
|
||||
try:
|
||||
with trio.fail_after(STREAM_READ_TIMEOUT):
|
||||
with trio.fail_after(self.read_timeout):
|
||||
# Try reading with timeout
|
||||
logger.debug(
|
||||
"Attempting to read from stream (attempt %d/%d)",
|
||||
@ -307,7 +319,7 @@ class CircuitV2Protocol(Service):
|
||||
# First, handle the read timeout gracefully
|
||||
try:
|
||||
with trio.fail_after(
|
||||
STREAM_READ_TIMEOUT * 2
|
||||
self.read_timeout * 2
|
||||
): # Double the timeout for reading
|
||||
msg_bytes = await stream.read()
|
||||
if not msg_bytes:
|
||||
@ -428,7 +440,7 @@ class CircuitV2Protocol(Service):
|
||||
"""
|
||||
try:
|
||||
# Read the incoming message with timeout
|
||||
with trio.fail_after(STREAM_READ_TIMEOUT):
|
||||
with trio.fail_after(self.read_timeout):
|
||||
msg_bytes = await stream.read()
|
||||
stop_msg = StopMessage()
|
||||
stop_msg.ParseFromString(msg_bytes)
|
||||
@ -535,7 +547,7 @@ class CircuitV2Protocol(Service):
|
||||
ttl = self.resource_manager.reserve(peer_id)
|
||||
|
||||
# Send reservation success response
|
||||
with trio.fail_after(STREAM_WRITE_TIMEOUT):
|
||||
with trio.fail_after(self.write_timeout):
|
||||
status = create_status(
|
||||
code=StatusCode.OK, message="Reservation accepted"
|
||||
)
|
||||
@ -586,7 +598,7 @@ class CircuitV2Protocol(Service):
|
||||
# Always close the stream when done with reservation
|
||||
if cast(INetStreamWithExtras, stream).is_open():
|
||||
try:
|
||||
with trio.fail_after(STREAM_CLOSE_TIMEOUT):
|
||||
with trio.fail_after(self.close_timeout):
|
||||
await stream.close()
|
||||
except Exception as close_err:
|
||||
logger.error("Error closing stream: %s", str(close_err))
|
||||
@ -622,7 +634,7 @@ class CircuitV2Protocol(Service):
|
||||
self._active_relays[peer_id] = (stream, None)
|
||||
|
||||
# Try to connect to the destination with timeout
|
||||
with trio.fail_after(STREAM_READ_TIMEOUT):
|
||||
with trio.fail_after(self.read_timeout):
|
||||
dst_stream = await self.host.new_stream(peer_id, [STOP_PROTOCOL_ID])
|
||||
if not dst_stream:
|
||||
raise ConnectionError("Could not connect to destination")
|
||||
@ -751,7 +763,7 @@ class CircuitV2Protocol(Service):
|
||||
|
||||
# Write data with timeout
|
||||
try:
|
||||
with trio.fail_after(STREAM_WRITE_TIMEOUT):
|
||||
with trio.fail_after(self.write_timeout):
|
||||
await dst_stream.write(data)
|
||||
except trio.TooSlowError:
|
||||
logger.error("Timeout writing in %s", direction.name)
|
||||
@ -786,7 +798,7 @@ class CircuitV2Protocol(Service):
|
||||
"""Send a status message."""
|
||||
try:
|
||||
logger.debug("Sending status message with code %s: %s", code, message)
|
||||
with trio.fail_after(STREAM_WRITE_TIMEOUT * 2): # Double the timeout
|
||||
with trio.fail_after(self.write_timeout * 2): # Double the timeout
|
||||
# Create a proto Status directly
|
||||
pb_status = PbStatus()
|
||||
pb_status.code = cast(
|
||||
@ -824,7 +836,7 @@ class CircuitV2Protocol(Service):
|
||||
"""Send a status message on a STOP stream."""
|
||||
try:
|
||||
logger.debug("Sending stop status message with code %s: %s", code, message)
|
||||
with trio.fail_after(STREAM_WRITE_TIMEOUT * 2): # Double the timeout
|
||||
with trio.fail_after(self.write_timeout * 2): # Double the timeout
|
||||
# Create a proto Status directly
|
||||
pb_status = PbStatus()
|
||||
pb_status.code = cast(
|
||||
|
||||
@ -89,6 +89,8 @@ class CircuitV2Transport(ITransport):
|
||||
auto_reserve=config.enable_client,
|
||||
discovery_interval=config.discovery_interval,
|
||||
max_relays=config.max_relays,
|
||||
stream_timeout=config.timeouts.discovery_stream_timeout,
|
||||
peer_protocol_timeout=config.timeouts.peer_protocol_timeout,
|
||||
)
|
||||
|
||||
async def dial(
|
||||
|
||||
11
newsfragments/917.internal.rst
Normal file
11
newsfragments/917.internal.rst
Normal file
@ -0,0 +1,11 @@
|
||||
Replace magic numbers with named constants and enums for clarity and maintainability
|
||||
|
||||
**Key Changes:**
|
||||
- **Introduced type-safe enums** for better code clarity:
|
||||
- `RelayRole(Flag)` enum with HOP, STOP, CLIENT roles supporting bitwise combinations (e.g., `RelayRole.HOP | RelayRole.STOP`)
|
||||
- `ReservationStatus(Enum)` for reservation lifecycle management (ACTIVE, EXPIRED, REJECTED)
|
||||
- **Replaced magic numbers with named constants** throughout the codebase, improving code maintainability and eliminating hardcoded timeout values (15s, 30s, 10s) with descriptive constant names
|
||||
- **Added comprehensive timeout configuration system** with new `TimeoutConfig` dataclass supporting component-specific timeouts (discovery, protocol, DCUtR)
|
||||
- **Enhanced configurability** of `RelayDiscovery`, `CircuitV2Protocol`, and `DCUtRProtocol` constructors with optional timeout parameters
|
||||
- **Improved architecture consistency** with clean configuration flow across all circuit relay components
|
||||
**Backward Compatibility:** All changes maintain full backward compatibility. Existing code continues to work unchanged while new timeout configuration options are available for users who need them.
|
||||
Reference in New Issue
Block a user