Merge branch 'main' into add-ws-transport

This commit is contained in:
yashksaini-coder
2025-09-10 02:49:52 +05:30
committed by GitHub
40 changed files with 9325 additions and 89 deletions

70
libp2p/network/config.py Normal file
View File

@ -0,0 +1,70 @@
from dataclasses import dataclass
@dataclass
class RetryConfig:
"""
Configuration for retry logic with exponential backoff.
This configuration controls how connection attempts are retried when they fail.
The retry mechanism uses exponential backoff with jitter to prevent thundering
herd problems in distributed systems.
Attributes:
max_retries: Maximum number of retry attempts before giving up.
Default: 3 attempts
initial_delay: Initial delay in seconds before the first retry.
Default: 0.1 seconds (100ms)
max_delay: Maximum delay cap in seconds to prevent excessive wait times.
Default: 30.0 seconds
backoff_multiplier: Multiplier for exponential backoff (each retry multiplies
the delay by this factor). Default: 2.0 (doubles each time)
jitter_factor: Random jitter factor (0.0-1.0) to add randomness to delays
and prevent synchronized retries. Default: 0.1 (10% jitter)
"""
max_retries: int = 3
initial_delay: float = 0.1
max_delay: float = 30.0
backoff_multiplier: float = 2.0
jitter_factor: float = 0.1
@dataclass
class ConnectionConfig:
"""
Configuration for multi-connection support.
This configuration controls how multiple connections per peer are managed,
including connection limits, timeouts, and load balancing strategies.
Attributes:
max_connections_per_peer: Maximum number of connections allowed to a single
peer. Default: 3 connections
connection_timeout: Timeout in seconds for establishing new connections.
Default: 30.0 seconds
load_balancing_strategy: Strategy for distributing streams across connections.
Options: "round_robin" (default) or "least_loaded"
"""
max_connections_per_peer: int = 3
connection_timeout: float = 30.0
load_balancing_strategy: str = "round_robin" # or "least_loaded"
def __post_init__(self) -> None:
"""Validate configuration after initialization."""
if not (
self.load_balancing_strategy == "round_robin"
or self.load_balancing_strategy == "least_loaded"
):
raise ValueError(
"Load balancing strategy can only be 'round_robin' or 'least_loaded'"
)
if self.max_connections_per_peer < 1:
raise ValueError("Max connection per peer should be atleast 1")
if self.connection_timeout < 0:
raise ValueError("Connection timeout should be positive")

View File

@ -17,6 +17,7 @@ from libp2p.stream_muxer.exceptions import (
MuxedStreamError,
MuxedStreamReset,
)
from libp2p.transport.quic.exceptions import QUICStreamClosedError, QUICStreamResetError
from .exceptions import (
StreamClosed,
@ -170,7 +171,7 @@ class NetStream(INetStream):
elif self.__stream_state == StreamState.OPEN:
self.__stream_state = StreamState.CLOSE_READ
raise StreamEOF() from error
except MuxedStreamReset as error:
except (MuxedStreamReset, QUICStreamClosedError, QUICStreamResetError) as error:
async with self._state_lock:
if self.__stream_state in [
StreamState.OPEN,
@ -199,7 +200,12 @@ class NetStream(INetStream):
try:
await self.muxed_stream.write(data)
except (MuxedStreamClosed, MuxedStreamError) as error:
except (
MuxedStreamClosed,
MuxedStreamError,
QUICStreamClosedError,
QUICStreamResetError,
) as error:
async with self._state_lock:
if self.__stream_state == StreamState.OPEN:
self.__stream_state = StreamState.CLOSE_WRITE

View File

@ -2,9 +2,9 @@ from collections.abc import (
Awaitable,
Callable,
)
from dataclasses import dataclass
import logging
import random
from typing import cast
from multiaddr import (
Multiaddr,
@ -27,6 +27,7 @@ from libp2p.custom_types import (
from libp2p.io.abc import (
ReadWriteCloser,
)
from libp2p.network.config import ConnectionConfig, RetryConfig
from libp2p.peer.id import (
ID,
)
@ -41,6 +42,9 @@ from libp2p.transport.exceptions import (
OpenConnectionError,
SecurityUpgradeFailure,
)
from libp2p.transport.quic.config import QUICTransportConfig
from libp2p.transport.quic.connection import QUICConnection
from libp2p.transport.quic.transport import QUICTransport
from libp2p.transport.upgrader import (
TransportUpgrader,
)
@ -61,59 +65,6 @@ from .exceptions import (
logger = logging.getLogger("libp2p.network.swarm")
@dataclass
class RetryConfig:
"""
Configuration for retry logic with exponential backoff.
This configuration controls how connection attempts are retried when they fail.
The retry mechanism uses exponential backoff with jitter to prevent thundering
herd problems in distributed systems.
Attributes:
max_retries: Maximum number of retry attempts before giving up.
Default: 3 attempts
initial_delay: Initial delay in seconds before the first retry.
Default: 0.1 seconds (100ms)
max_delay: Maximum delay cap in seconds to prevent excessive wait times.
Default: 30.0 seconds
backoff_multiplier: Multiplier for exponential backoff (each retry multiplies
the delay by this factor). Default: 2.0 (doubles each time)
jitter_factor: Random jitter factor (0.0-1.0) to add randomness to delays
and prevent synchronized retries. Default: 0.1 (10% jitter)
"""
max_retries: int = 3
initial_delay: float = 0.1
max_delay: float = 30.0
backoff_multiplier: float = 2.0
jitter_factor: float = 0.1
@dataclass
class ConnectionConfig:
"""
Configuration for multi-connection support.
This configuration controls how multiple connections per peer are managed,
including connection limits, timeouts, and load balancing strategies.
Attributes:
max_connections_per_peer: Maximum number of connections allowed to a single
peer. Default: 3 connections
connection_timeout: Timeout in seconds for establishing new connections.
Default: 30.0 seconds
load_balancing_strategy: Strategy for distributing streams across connections.
Options: "round_robin" (default) or "least_loaded"
"""
max_connections_per_peer: int = 3
connection_timeout: float = 30.0
load_balancing_strategy: str = "round_robin" # or "least_loaded"
def create_default_stream_handler(network: INetworkService) -> StreamHandlerFn:
async def stream_handler(stream: INetStream) -> None:
await network.get_manager().wait_finished()
@ -126,8 +77,7 @@ class Swarm(Service, INetworkService):
peerstore: IPeerStore
upgrader: TransportUpgrader
transport: ITransport
# Enhanced: Support for multiple connections per peer
connections: dict[ID, list[INetConn]] # Multiple connections per peer
connections: dict[ID, list[INetConn]]
listeners: dict[str, IListener]
common_stream_handler: StreamHandlerFn
listener_nursery: trio.Nursery | None
@ -137,7 +87,7 @@ class Swarm(Service, INetworkService):
# Enhanced: New configuration
retry_config: RetryConfig
connection_config: ConnectionConfig
connection_config: ConnectionConfig | QUICTransportConfig
_round_robin_index: dict[ID, int]
def __init__(
@ -147,7 +97,7 @@ class Swarm(Service, INetworkService):
upgrader: TransportUpgrader,
transport: ITransport,
retry_config: RetryConfig | None = None,
connection_config: ConnectionConfig | None = None,
connection_config: ConnectionConfig | QUICTransportConfig | None = None,
):
self.self_id = peer_id
self.peerstore = peerstore
@ -178,6 +128,11 @@ class Swarm(Service, INetworkService):
# Create a nursery for listener tasks.
self.listener_nursery = nursery
self.event_listener_nursery_created.set()
if isinstance(self.transport, QUICTransport):
self.transport.set_background_nursery(nursery)
self.transport.set_swarm(self)
try:
await self.manager.wait_finished()
finally:
@ -370,6 +325,7 @@ class Swarm(Service, INetworkService):
# Dial peer (connection to peer does not yet exist)
# Transport dials peer (gets back a raw conn)
try:
addr = Multiaddr(f"{addr}/p2p/{peer_id}")
raw_conn = await self.transport.dial(addr)
except OpenConnectionError as error:
logger.debug("fail to dial peer %s over base transport", peer_id)
@ -377,6 +333,15 @@ class Swarm(Service, INetworkService):
f"fail to open connection to peer {peer_id}"
) from error
if isinstance(self.transport, QUICTransport) and isinstance(
raw_conn, IMuxedConn
):
logger.info(
"Skipping upgrade for QUIC, QUIC connections are already multiplexed"
)
swarm_conn = await self.add_conn(raw_conn)
return swarm_conn
logger.debug("dialed peer %s over base transport", peer_id)
# Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure
@ -402,9 +367,7 @@ class Swarm(Service, INetworkService):
logger.debug("upgraded mux for peer %s", peer_id)
swarm_conn = await self.add_conn(muxed_conn)
logger.debug("successfully dialed peer %s", peer_id)
return swarm_conn
async def dial_addr(self, addr: Multiaddr, peer_id: ID) -> INetConn:
@ -427,7 +390,6 @@ class Swarm(Service, INetworkService):
:return: net stream instance
"""
logger.debug("attempting to open a stream to peer %s", peer_id)
# Get existing connections or dial new ones
connections = self.get_connections(peer_id)
if not connections:
@ -436,6 +398,10 @@ class Swarm(Service, INetworkService):
# Load balancing strategy at interface level
connection = self._select_connection(connections, peer_id)
if isinstance(self.transport, QUICTransport) and connection is not None:
conn = cast(SwarmConn, connection)
return await conn.new_stream()
try:
net_stream = await connection.new_stream()
logger.debug("successfully opened a stream to peer %s", peer_id)
@ -517,6 +483,7 @@ class Swarm(Service, INetworkService):
"""
logger.debug(f"Swarm.listen called with multiaddrs: {multiaddrs}")
# We need to wait until `self.listener_nursery` is created.
logger.debug("Starting to listen")
await self.event_listener_nursery_created.wait()
success_count = 0
@ -531,6 +498,22 @@ class Swarm(Service, INetworkService):
async def conn_handler(
read_write_closer: ReadWriteCloser, maddr: Multiaddr = maddr
) -> None:
# No need to upgrade QUIC Connection
if isinstance(self.transport, QUICTransport):
try:
quic_conn = cast(QUICConnection, read_write_closer)
await self.add_conn(quic_conn)
peer_id = quic_conn.peer_id
logger.debug(
f"successfully opened quic connection to peer {peer_id}"
)
# NOTE: This is a intentional barrier to prevent from the
# handler exiting and closing the connection.
await self.manager.wait_finished()
except Exception:
await read_write_closer.close()
return
raw_conn = RawConnection(read_write_closer, False)
# Per, https://discuss.libp2p.io/t/multistream-security/130, we first
@ -668,9 +651,10 @@ class Swarm(Service, INetworkService):
muxed_conn,
self,
)
logger.debug("Swarm::add_conn | starting muxed connection")
self.manager.run_task(muxed_conn.start)
await muxed_conn.event_started.wait()
logger.debug("Swarm::add_conn | starting swarm connection")
self.manager.run_task(swarm_conn.start)
await swarm_conn.event_started.wait()