diff --git a/libp2p/__init__.py b/libp2p/__init__.py index 32f3b31d..606d3140 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -1,3 +1,5 @@ +"""Libp2p Python implementation.""" + import logging from libp2p.transport.quic.utils import is_quic_multiaddr @@ -197,10 +199,10 @@ def new_swarm( id_opt = generate_peer_id_from(key_pair) transport: TCP | QUICTransport + quic_transport_opt = connection_config if isinstance(connection_config, QUICTransportConfig) else None if listen_addrs is None: if enable_quic: - quic_transport_opt = connection_config if isinstance(connection_config, QUICTransportConfig) else None transport = QUICTransport(key_pair.private_key, config=quic_transport_opt) else: transport = TCP() @@ -210,7 +212,6 @@ def new_swarm( if addr.__contains__("tcp"): transport = TCP() elif is_quic: - quic_transport_opt = connection_config if isinstance(connection_config, QUICTransportConfig) else None transport = QUICTransport(key_pair.private_key, config=quic_transport_opt) else: raise ValueError(f"Unknown transport in listen_addrs: {listen_addrs}") diff --git a/libp2p/network/config.py b/libp2p/network/config.py index 33934ed5..e0fad33c 100644 --- a/libp2p/network/config.py +++ b/libp2p/network/config.py @@ -52,3 +52,19 @@ class ConnectionConfig: max_connections_per_peer: int = 3 connection_timeout: float = 30.0 load_balancing_strategy: str = "round_robin" # or "least_loaded" + + def __post_init__(self) -> None: + """Validate configuration after initialization.""" + if not ( + self.load_balancing_strategy == "round_robin" + or self.load_balancing_strategy == "least_loaded" + ): + raise ValueError( + "Load balancing strategy can only be 'round_robin' or 'least_loaded'" + ) + + if self.max_connections_per_peer < 1: + raise ValueError("Max connection per peer should be atleast 1") + + if self.connection_timeout < 0: + raise ValueError("Connection timeout should be positive") diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 800c55b2..b182def2 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -465,8 +465,6 @@ class Swarm(Service, INetworkService): # Default to first connection return connections[0] - # >>>>>>> upstream/main - async def listen(self, *multiaddrs: Multiaddr) -> bool: """ :param multiaddrs: one or many multiaddrs to start listening on diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index e5ae315b..90adb251 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -147,7 +147,7 @@ class MultiselectClient(IMultiselectClient): except MultiselectCommunicatorError as error: raise MultiselectClientError() from error - if response == protocol: + if response == protocol_str: return protocol if response == PROTOCOL_NOT_FOUND_MSG: raise MultiselectClientError("protocol not supported") diff --git a/libp2p/transport/quic/config.py b/libp2p/transport/quic/config.py index 5b70f0e5..e0c87adf 100644 --- a/libp2p/transport/quic/config.py +++ b/libp2p/transport/quic/config.py @@ -87,9 +87,15 @@ class QUICTransportConfig(ConnectionConfig): MAX_INCOMING_STREAMS: int = 1000 """Maximum number of incoming streams per connection.""" + CONNECTION_HANDSHAKE_TIMEOUT: float = 60.0 + """Timeout for connection handshake (seconds).""" + MAX_OUTGOING_STREAMS: int = 1000 """Maximum number of outgoing streams per connection.""" + CONNECTION_CLOSE_TIMEOUT: int = 10 + """Timeout for opening new connection (seconds).""" + # Stream timeouts STREAM_OPEN_TIMEOUT: float = 5.0 """Timeout for opening new streams (seconds).""" @@ -284,24 +290,6 @@ class QUICStreamFlowControlConfig: self.enable_auto_tuning = enable_auto_tuning -class QUICStreamMetricsConfig: - """Configuration for QUIC stream metrics collection.""" - - def __init__( - self, - enable_latency_tracking: bool = True, - enable_throughput_tracking: bool = True, - enable_error_tracking: bool = True, - metrics_retention_duration: float = 3600.0, # 1 hour - metrics_aggregation_interval: float = 60.0, # 1 minute - ): - self.enable_latency_tracking = enable_latency_tracking - self.enable_throughput_tracking = enable_throughput_tracking - self.enable_error_tracking = enable_error_tracking - self.metrics_retention_duration = metrics_retention_duration - self.metrics_aggregation_interval = metrics_aggregation_interval - - def create_stream_config_for_use_case( use_case: Literal[ "high_throughput", "low_latency", "many_streams", "memory_constrained" diff --git a/libp2p/transport/quic/connection.py b/libp2p/transport/quic/connection.py index 7e8ce4e5..799008f1 100644 --- a/libp2p/transport/quic/connection.py +++ b/libp2p/transport/quic/connection.py @@ -61,7 +61,6 @@ class QUICConnection(IRawConnection, IMuxedConn): MAX_CONCURRENT_STREAMS = 256 MAX_INCOMING_STREAMS = 1000 MAX_OUTGOING_STREAMS = 1000 - STREAM_ACCEPT_TIMEOUT = 60.0 CONNECTION_HANDSHAKE_TIMEOUT = 60.0 CONNECTION_CLOSE_TIMEOUT = 10.0 @@ -145,7 +144,6 @@ class QUICConnection(IRawConnection, IMuxedConn): self.on_close: Callable[[], Awaitable[None]] | None = None self.event_started = trio.Event() - # *** NEW: Connection ID tracking - CRITICAL for fixing the original issue *** self._available_connection_ids: set[bytes] = set() self._current_connection_id: bytes | None = None self._retired_connection_ids: set[bytes] = set() @@ -155,6 +153,14 @@ class QUICConnection(IRawConnection, IMuxedConn): self._event_processing_active = False self._pending_events: list[events.QuicEvent] = [] + # Set quic connection configuration + self.CONNECTION_CLOSE_TIMEOUT = transport._config.CONNECTION_CLOSE_TIMEOUT + self.MAX_INCOMING_STREAMS = transport._config.MAX_INCOMING_STREAMS + self.MAX_OUTGOING_STREAMS = transport._config.MAX_OUTGOING_STREAMS + self.CONNECTION_HANDSHAKE_TIMEOUT = ( + transport._config.CONNECTION_HANDSHAKE_TIMEOUT + ) + # Performance and monitoring self._connection_start_time = time.time() self._stats = { @@ -166,7 +172,6 @@ class QUICConnection(IRawConnection, IMuxedConn): "bytes_received": 0, "packets_sent": 0, "packets_received": 0, - # *** NEW: Connection ID statistics *** "connection_ids_issued": 0, "connection_ids_retired": 0, "connection_id_changes": 0, @@ -191,11 +196,9 @@ class QUICConnection(IRawConnection, IMuxedConn): For libp2p, we primarily use bidirectional streams. """ if self._is_initiator: - return 0 # Client starts with 0, then 4, 8, 12... + return 0 else: - return 1 # Server starts with 1, then 5, 9, 13... - - # Properties + return 1 @property def is_initiator(self) -> bool: # type: ignore @@ -234,7 +237,6 @@ class QUICConnection(IRawConnection, IMuxedConn): """Get the remote peer ID.""" return self._remote_peer_id - # *** NEW: Connection ID management methods *** def get_connection_id_stats(self) -> dict[str, Any]: """Get connection ID statistics and current state.""" return { @@ -420,7 +422,6 @@ class QUICConnection(IRawConnection, IMuxedConn): # Check for idle streams that can be cleaned up await self._cleanup_idle_streams() - # *** NEW: Log connection ID status periodically *** if logger.isEnabledFor(logging.DEBUG): cid_stats = self.get_connection_id_stats() logger.debug(f"Connection ID stats: {cid_stats}")