mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
fix: proper connection config setup
This commit is contained in:
@ -1,3 +1,5 @@
|
|||||||
|
"""Libp2p Python implementation."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from libp2p.transport.quic.utils import is_quic_multiaddr
|
from libp2p.transport.quic.utils import is_quic_multiaddr
|
||||||
@ -197,10 +199,10 @@ def new_swarm(
|
|||||||
id_opt = generate_peer_id_from(key_pair)
|
id_opt = generate_peer_id_from(key_pair)
|
||||||
|
|
||||||
transport: TCP | QUICTransport
|
transport: TCP | QUICTransport
|
||||||
|
quic_transport_opt = connection_config if isinstance(connection_config, QUICTransportConfig) else None
|
||||||
|
|
||||||
if listen_addrs is None:
|
if listen_addrs is None:
|
||||||
if enable_quic:
|
if enable_quic:
|
||||||
quic_transport_opt = connection_config if isinstance(connection_config, QUICTransportConfig) else None
|
|
||||||
transport = QUICTransport(key_pair.private_key, config=quic_transport_opt)
|
transport = QUICTransport(key_pair.private_key, config=quic_transport_opt)
|
||||||
else:
|
else:
|
||||||
transport = TCP()
|
transport = TCP()
|
||||||
@ -210,7 +212,6 @@ def new_swarm(
|
|||||||
if addr.__contains__("tcp"):
|
if addr.__contains__("tcp"):
|
||||||
transport = TCP()
|
transport = TCP()
|
||||||
elif is_quic:
|
elif is_quic:
|
||||||
quic_transport_opt = connection_config if isinstance(connection_config, QUICTransportConfig) else None
|
|
||||||
transport = QUICTransport(key_pair.private_key, config=quic_transport_opt)
|
transport = QUICTransport(key_pair.private_key, config=quic_transport_opt)
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Unknown transport in listen_addrs: {listen_addrs}")
|
raise ValueError(f"Unknown transport in listen_addrs: {listen_addrs}")
|
||||||
|
|||||||
@ -52,3 +52,19 @@ class ConnectionConfig:
|
|||||||
max_connections_per_peer: int = 3
|
max_connections_per_peer: int = 3
|
||||||
connection_timeout: float = 30.0
|
connection_timeout: float = 30.0
|
||||||
load_balancing_strategy: str = "round_robin" # or "least_loaded"
|
load_balancing_strategy: str = "round_robin" # or "least_loaded"
|
||||||
|
|
||||||
|
def __post_init__(self) -> None:
|
||||||
|
"""Validate configuration after initialization."""
|
||||||
|
if not (
|
||||||
|
self.load_balancing_strategy == "round_robin"
|
||||||
|
or self.load_balancing_strategy == "least_loaded"
|
||||||
|
):
|
||||||
|
raise ValueError(
|
||||||
|
"Load balancing strategy can only be 'round_robin' or 'least_loaded'"
|
||||||
|
)
|
||||||
|
|
||||||
|
if self.max_connections_per_peer < 1:
|
||||||
|
raise ValueError("Max connection per peer should be atleast 1")
|
||||||
|
|
||||||
|
if self.connection_timeout < 0:
|
||||||
|
raise ValueError("Connection timeout should be positive")
|
||||||
|
|||||||
@ -465,8 +465,6 @@ class Swarm(Service, INetworkService):
|
|||||||
# Default to first connection
|
# Default to first connection
|
||||||
return connections[0]
|
return connections[0]
|
||||||
|
|
||||||
# >>>>>>> upstream/main
|
|
||||||
|
|
||||||
async def listen(self, *multiaddrs: Multiaddr) -> bool:
|
async def listen(self, *multiaddrs: Multiaddr) -> bool:
|
||||||
"""
|
"""
|
||||||
:param multiaddrs: one or many multiaddrs to start listening on
|
:param multiaddrs: one or many multiaddrs to start listening on
|
||||||
|
|||||||
@ -147,7 +147,7 @@ class MultiselectClient(IMultiselectClient):
|
|||||||
except MultiselectCommunicatorError as error:
|
except MultiselectCommunicatorError as error:
|
||||||
raise MultiselectClientError() from error
|
raise MultiselectClientError() from error
|
||||||
|
|
||||||
if response == protocol:
|
if response == protocol_str:
|
||||||
return protocol
|
return protocol
|
||||||
if response == PROTOCOL_NOT_FOUND_MSG:
|
if response == PROTOCOL_NOT_FOUND_MSG:
|
||||||
raise MultiselectClientError("protocol not supported")
|
raise MultiselectClientError("protocol not supported")
|
||||||
|
|||||||
@ -87,9 +87,15 @@ class QUICTransportConfig(ConnectionConfig):
|
|||||||
MAX_INCOMING_STREAMS: int = 1000
|
MAX_INCOMING_STREAMS: int = 1000
|
||||||
"""Maximum number of incoming streams per connection."""
|
"""Maximum number of incoming streams per connection."""
|
||||||
|
|
||||||
|
CONNECTION_HANDSHAKE_TIMEOUT: float = 60.0
|
||||||
|
"""Timeout for connection handshake (seconds)."""
|
||||||
|
|
||||||
MAX_OUTGOING_STREAMS: int = 1000
|
MAX_OUTGOING_STREAMS: int = 1000
|
||||||
"""Maximum number of outgoing streams per connection."""
|
"""Maximum number of outgoing streams per connection."""
|
||||||
|
|
||||||
|
CONNECTION_CLOSE_TIMEOUT: int = 10
|
||||||
|
"""Timeout for opening new connection (seconds)."""
|
||||||
|
|
||||||
# Stream timeouts
|
# Stream timeouts
|
||||||
STREAM_OPEN_TIMEOUT: float = 5.0
|
STREAM_OPEN_TIMEOUT: float = 5.0
|
||||||
"""Timeout for opening new streams (seconds)."""
|
"""Timeout for opening new streams (seconds)."""
|
||||||
@ -284,24 +290,6 @@ class QUICStreamFlowControlConfig:
|
|||||||
self.enable_auto_tuning = enable_auto_tuning
|
self.enable_auto_tuning = enable_auto_tuning
|
||||||
|
|
||||||
|
|
||||||
class QUICStreamMetricsConfig:
|
|
||||||
"""Configuration for QUIC stream metrics collection."""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
enable_latency_tracking: bool = True,
|
|
||||||
enable_throughput_tracking: bool = True,
|
|
||||||
enable_error_tracking: bool = True,
|
|
||||||
metrics_retention_duration: float = 3600.0, # 1 hour
|
|
||||||
metrics_aggregation_interval: float = 60.0, # 1 minute
|
|
||||||
):
|
|
||||||
self.enable_latency_tracking = enable_latency_tracking
|
|
||||||
self.enable_throughput_tracking = enable_throughput_tracking
|
|
||||||
self.enable_error_tracking = enable_error_tracking
|
|
||||||
self.metrics_retention_duration = metrics_retention_duration
|
|
||||||
self.metrics_aggregation_interval = metrics_aggregation_interval
|
|
||||||
|
|
||||||
|
|
||||||
def create_stream_config_for_use_case(
|
def create_stream_config_for_use_case(
|
||||||
use_case: Literal[
|
use_case: Literal[
|
||||||
"high_throughput", "low_latency", "many_streams", "memory_constrained"
|
"high_throughput", "low_latency", "many_streams", "memory_constrained"
|
||||||
|
|||||||
@ -61,7 +61,6 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
|||||||
MAX_CONCURRENT_STREAMS = 256
|
MAX_CONCURRENT_STREAMS = 256
|
||||||
MAX_INCOMING_STREAMS = 1000
|
MAX_INCOMING_STREAMS = 1000
|
||||||
MAX_OUTGOING_STREAMS = 1000
|
MAX_OUTGOING_STREAMS = 1000
|
||||||
STREAM_ACCEPT_TIMEOUT = 60.0
|
|
||||||
CONNECTION_HANDSHAKE_TIMEOUT = 60.0
|
CONNECTION_HANDSHAKE_TIMEOUT = 60.0
|
||||||
CONNECTION_CLOSE_TIMEOUT = 10.0
|
CONNECTION_CLOSE_TIMEOUT = 10.0
|
||||||
|
|
||||||
@ -145,7 +144,6 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
|||||||
self.on_close: Callable[[], Awaitable[None]] | None = None
|
self.on_close: Callable[[], Awaitable[None]] | None = None
|
||||||
self.event_started = trio.Event()
|
self.event_started = trio.Event()
|
||||||
|
|
||||||
# *** NEW: Connection ID tracking - CRITICAL for fixing the original issue ***
|
|
||||||
self._available_connection_ids: set[bytes] = set()
|
self._available_connection_ids: set[bytes] = set()
|
||||||
self._current_connection_id: bytes | None = None
|
self._current_connection_id: bytes | None = None
|
||||||
self._retired_connection_ids: set[bytes] = set()
|
self._retired_connection_ids: set[bytes] = set()
|
||||||
@ -155,6 +153,14 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
|||||||
self._event_processing_active = False
|
self._event_processing_active = False
|
||||||
self._pending_events: list[events.QuicEvent] = []
|
self._pending_events: list[events.QuicEvent] = []
|
||||||
|
|
||||||
|
# Set quic connection configuration
|
||||||
|
self.CONNECTION_CLOSE_TIMEOUT = transport._config.CONNECTION_CLOSE_TIMEOUT
|
||||||
|
self.MAX_INCOMING_STREAMS = transport._config.MAX_INCOMING_STREAMS
|
||||||
|
self.MAX_OUTGOING_STREAMS = transport._config.MAX_OUTGOING_STREAMS
|
||||||
|
self.CONNECTION_HANDSHAKE_TIMEOUT = (
|
||||||
|
transport._config.CONNECTION_HANDSHAKE_TIMEOUT
|
||||||
|
)
|
||||||
|
|
||||||
# Performance and monitoring
|
# Performance and monitoring
|
||||||
self._connection_start_time = time.time()
|
self._connection_start_time = time.time()
|
||||||
self._stats = {
|
self._stats = {
|
||||||
@ -166,7 +172,6 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
|||||||
"bytes_received": 0,
|
"bytes_received": 0,
|
||||||
"packets_sent": 0,
|
"packets_sent": 0,
|
||||||
"packets_received": 0,
|
"packets_received": 0,
|
||||||
# *** NEW: Connection ID statistics ***
|
|
||||||
"connection_ids_issued": 0,
|
"connection_ids_issued": 0,
|
||||||
"connection_ids_retired": 0,
|
"connection_ids_retired": 0,
|
||||||
"connection_id_changes": 0,
|
"connection_id_changes": 0,
|
||||||
@ -191,11 +196,9 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
|||||||
For libp2p, we primarily use bidirectional streams.
|
For libp2p, we primarily use bidirectional streams.
|
||||||
"""
|
"""
|
||||||
if self._is_initiator:
|
if self._is_initiator:
|
||||||
return 0 # Client starts with 0, then 4, 8, 12...
|
return 0
|
||||||
else:
|
else:
|
||||||
return 1 # Server starts with 1, then 5, 9, 13...
|
return 1
|
||||||
|
|
||||||
# Properties
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_initiator(self) -> bool: # type: ignore
|
def is_initiator(self) -> bool: # type: ignore
|
||||||
@ -234,7 +237,6 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
|||||||
"""Get the remote peer ID."""
|
"""Get the remote peer ID."""
|
||||||
return self._remote_peer_id
|
return self._remote_peer_id
|
||||||
|
|
||||||
# *** NEW: Connection ID management methods ***
|
|
||||||
def get_connection_id_stats(self) -> dict[str, Any]:
|
def get_connection_id_stats(self) -> dict[str, Any]:
|
||||||
"""Get connection ID statistics and current state."""
|
"""Get connection ID statistics and current state."""
|
||||||
return {
|
return {
|
||||||
@ -420,7 +422,6 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
|||||||
# Check for idle streams that can be cleaned up
|
# Check for idle streams that can be cleaned up
|
||||||
await self._cleanup_idle_streams()
|
await self._cleanup_idle_streams()
|
||||||
|
|
||||||
# *** NEW: Log connection ID status periodically ***
|
|
||||||
if logger.isEnabledFor(logging.DEBUG):
|
if logger.isEnabledFor(logging.DEBUG):
|
||||||
cid_stats = self.get_connection_id_stats()
|
cid_stats = self.get_connection_id_stats()
|
||||||
logger.debug(f"Connection ID stats: {cid_stats}")
|
logger.debug(f"Connection ID stats: {cid_stats}")
|
||||||
|
|||||||
Reference in New Issue
Block a user