mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
fix: refine selection of quic transport while init
This commit is contained in:
@ -19,7 +19,6 @@ from libp2p.crypto.secp256k1 import create_new_key_pair
|
|||||||
from libp2p.custom_types import TProtocol
|
from libp2p.custom_types import TProtocol
|
||||||
from libp2p.network.stream.net_stream import INetStream
|
from libp2p.network.stream.net_stream import INetStream
|
||||||
from libp2p.peer.peerinfo import info_from_p2p_addr
|
from libp2p.peer.peerinfo import info_from_p2p_addr
|
||||||
from libp2p.transport.quic.config import QUICTransportConfig
|
|
||||||
|
|
||||||
PROTOCOL_ID = TProtocol("/echo/1.0.0")
|
PROTOCOL_ID = TProtocol("/echo/1.0.0")
|
||||||
|
|
||||||
@ -52,18 +51,10 @@ async def run_server(port: int, seed: int | None = None) -> None:
|
|||||||
|
|
||||||
secret = secrets.token_bytes(32)
|
secret = secrets.token_bytes(32)
|
||||||
|
|
||||||
# QUIC transport configuration
|
|
||||||
quic_config = QUICTransportConfig(
|
|
||||||
idle_timeout=30.0,
|
|
||||||
max_concurrent_streams=100,
|
|
||||||
connection_timeout=10.0,
|
|
||||||
enable_draft29=False,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Create host with QUIC transport
|
# Create host with QUIC transport
|
||||||
host = new_host(
|
host = new_host(
|
||||||
|
enable_quic=True,
|
||||||
key_pair=create_new_key_pair(secret),
|
key_pair=create_new_key_pair(secret),
|
||||||
transport_opt={"quic_config": quic_config},
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Server mode: start listener
|
# Server mode: start listener
|
||||||
@ -98,18 +89,10 @@ async def run_client(destination: str, seed: int | None = None) -> None:
|
|||||||
|
|
||||||
secret = secrets.token_bytes(32)
|
secret = secrets.token_bytes(32)
|
||||||
|
|
||||||
# QUIC transport configuration
|
|
||||||
quic_config = QUICTransportConfig(
|
|
||||||
idle_timeout=30.0,
|
|
||||||
max_concurrent_streams=100,
|
|
||||||
connection_timeout=10.0,
|
|
||||||
enable_draft29=False,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Create host with QUIC transport
|
# Create host with QUIC transport
|
||||||
host = new_host(
|
host = new_host(
|
||||||
|
enable_quic=True,
|
||||||
key_pair=create_new_key_pair(secret),
|
key_pair=create_new_key_pair(secret),
|
||||||
transport_opt={"quic_config": quic_config},
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Client mode: NO listener, just connect
|
# Client mode: NO listener, just connect
|
||||||
|
|||||||
@ -1,3 +1,5 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
from libp2p.transport.quic.utils import is_quic_multiaddr
|
from libp2p.transport.quic.utils import is_quic_multiaddr
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from libp2p.transport.quic.transport import QUICTransport
|
from libp2p.transport.quic.transport import QUICTransport
|
||||||
@ -87,7 +89,7 @@ MUXER_YAMUX = "YAMUX"
|
|||||||
MUXER_MPLEX = "MPLEX"
|
MUXER_MPLEX = "MPLEX"
|
||||||
DEFAULT_NEGOTIATE_TIMEOUT = 5
|
DEFAULT_NEGOTIATE_TIMEOUT = 5
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
def set_default_muxer(muxer_name: Literal["YAMUX", "MPLEX"]) -> None:
|
def set_default_muxer(muxer_name: Literal["YAMUX", "MPLEX"]) -> None:
|
||||||
"""
|
"""
|
||||||
@ -163,7 +165,8 @@ def new_swarm(
|
|||||||
peerstore_opt: IPeerStore | None = None,
|
peerstore_opt: IPeerStore | None = None,
|
||||||
muxer_preference: Literal["YAMUX", "MPLEX"] | None = None,
|
muxer_preference: Literal["YAMUX", "MPLEX"] | None = None,
|
||||||
listen_addrs: Sequence[multiaddr.Multiaddr] | None = None,
|
listen_addrs: Sequence[multiaddr.Multiaddr] | None = None,
|
||||||
transport_opt: dict[Any, Any] | None = None,
|
enable_quic: bool = False,
|
||||||
|
quic_transport_opt: QUICTransportConfig | None = None,
|
||||||
) -> INetworkService:
|
) -> INetworkService:
|
||||||
"""
|
"""
|
||||||
Create a swarm instance based on the parameters.
|
Create a swarm instance based on the parameters.
|
||||||
@ -174,7 +177,8 @@ def new_swarm(
|
|||||||
:param peerstore_opt: optional peerstore
|
:param peerstore_opt: optional peerstore
|
||||||
:param muxer_preference: optional explicit muxer preference
|
:param muxer_preference: optional explicit muxer preference
|
||||||
:param listen_addrs: optional list of multiaddrs to listen on
|
:param listen_addrs: optional list of multiaddrs to listen on
|
||||||
:param transport_opt: options for transport
|
:param enable_quic: enable quic for transport
|
||||||
|
:param quic_transport_opt: options for transport
|
||||||
:return: return a default swarm instance
|
:return: return a default swarm instance
|
||||||
|
|
||||||
Note: Yamux (/yamux/1.0.0) is the preferred stream multiplexer
|
Note: Yamux (/yamux/1.0.0) is the preferred stream multiplexer
|
||||||
@ -182,6 +186,10 @@ def new_swarm(
|
|||||||
Mplex (/mplex/6.7.0) is retained for backward compatibility
|
Mplex (/mplex/6.7.0) is retained for backward compatibility
|
||||||
but may be deprecated in the future.
|
but may be deprecated in the future.
|
||||||
"""
|
"""
|
||||||
|
if not enable_quic and quic_transport_opt is not None:
|
||||||
|
logger.warning(f"QUIC config provided but QUIC not enabled, ignoring QUIC config")
|
||||||
|
quic_transport_opt = None
|
||||||
|
|
||||||
if key_pair is None:
|
if key_pair is None:
|
||||||
key_pair = generate_new_rsa_identity()
|
key_pair = generate_new_rsa_identity()
|
||||||
|
|
||||||
@ -190,22 +198,17 @@ def new_swarm(
|
|||||||
transport: TCP | QUICTransport
|
transport: TCP | QUICTransport
|
||||||
|
|
||||||
if listen_addrs is None:
|
if listen_addrs is None:
|
||||||
transport_opt = transport_opt or {}
|
if enable_quic:
|
||||||
quic_config: QUICTransportConfig | None = transport_opt.get('quic_config')
|
transport = QUICTransport(key_pair.private_key, config=quic_transport_opt)
|
||||||
|
|
||||||
if quic_config:
|
|
||||||
transport = QUICTransport(key_pair.private_key, quic_config)
|
|
||||||
else:
|
else:
|
||||||
transport = TCP()
|
transport = TCP()
|
||||||
else:
|
else:
|
||||||
addr = listen_addrs[0]
|
addr = listen_addrs[0]
|
||||||
is_quic = addr.__contains__("quic") or addr.__contains__("quic-v1")
|
is_quic = is_quic_multiaddr(addr)
|
||||||
if addr.__contains__("tcp"):
|
if addr.__contains__("tcp"):
|
||||||
transport = TCP()
|
transport = TCP()
|
||||||
elif is_quic:
|
elif is_quic:
|
||||||
transport_opt = transport_opt or {}
|
transport = QUICTransport(key_pair.private_key, config=quic_transport_opt)
|
||||||
quic_config = transport_opt.get('quic_config', QUICTransportConfig())
|
|
||||||
transport = QUICTransport(key_pair.private_key, quic_config)
|
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Unknown transport in listen_addrs: {listen_addrs}")
|
raise ValueError(f"Unknown transport in listen_addrs: {listen_addrs}")
|
||||||
|
|
||||||
@ -266,7 +269,8 @@ def new_host(
|
|||||||
enable_mDNS: bool = False,
|
enable_mDNS: bool = False,
|
||||||
bootstrap: list[str] | None = None,
|
bootstrap: list[str] | None = None,
|
||||||
negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
|
negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
|
||||||
transport_opt: dict[Any, Any] | None = None,
|
enable_quic: bool = False,
|
||||||
|
quic_transport_opt: QUICTransportConfig | None = None,
|
||||||
) -> IHost:
|
) -> IHost:
|
||||||
"""
|
"""
|
||||||
Create a new libp2p host based on the given parameters.
|
Create a new libp2p host based on the given parameters.
|
||||||
@ -280,17 +284,23 @@ def new_host(
|
|||||||
:param listen_addrs: optional list of multiaddrs to listen on
|
:param listen_addrs: optional list of multiaddrs to listen on
|
||||||
:param enable_mDNS: whether to enable mDNS discovery
|
:param enable_mDNS: whether to enable mDNS discovery
|
||||||
:param bootstrap: optional list of bootstrap peer addresses as strings
|
:param bootstrap: optional list of bootstrap peer addresses as strings
|
||||||
:param transport_opt: optional dictionary of properties of transport
|
:param enable_quic: optinal choice to use QUIC for transport
|
||||||
|
:param transport_opt: optional configuration for quic transport
|
||||||
:return: return a host instance
|
:return: return a host instance
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
if not enable_quic and quic_transport_opt is not None:
|
||||||
|
logger.warning(f"QUIC config provided but QUIC not enabled, ignoring QUIC config")
|
||||||
|
|
||||||
swarm = new_swarm(
|
swarm = new_swarm(
|
||||||
|
enable_quic=enable_quic,
|
||||||
key_pair=key_pair,
|
key_pair=key_pair,
|
||||||
muxer_opt=muxer_opt,
|
muxer_opt=muxer_opt,
|
||||||
sec_opt=sec_opt,
|
sec_opt=sec_opt,
|
||||||
peerstore_opt=peerstore_opt,
|
peerstore_opt=peerstore_opt,
|
||||||
muxer_preference=muxer_preference,
|
muxer_preference=muxer_preference,
|
||||||
listen_addrs=listen_addrs,
|
listen_addrs=listen_addrs,
|
||||||
transport_opt=transport_opt
|
quic_transport_opt=quic_transport_opt if enable_quic else None
|
||||||
)
|
)
|
||||||
|
|
||||||
if disc_opt is not None:
|
if disc_opt is not None:
|
||||||
|
|||||||
@ -51,9 +51,13 @@ class QUICTransportConfig:
|
|||||||
"""Configuration for QUIC transport."""
|
"""Configuration for QUIC transport."""
|
||||||
|
|
||||||
# Connection settings
|
# Connection settings
|
||||||
idle_timeout: float = 30.0 # Connection idle timeout in seconds
|
idle_timeout: float = 30.0 # Seconds before an idle connection is closed.
|
||||||
max_datagram_size: int = 1200 # Maximum UDP datagram size
|
max_datagram_size: int = (
|
||||||
local_port: int | None = None # Local port for binding (None = random)
|
1200 # Maximum size of UDP datagrams to avoid IP fragmentation.
|
||||||
|
)
|
||||||
|
local_port: int | None = (
|
||||||
|
None # Local port to bind to. If None, a random port is chosen.
|
||||||
|
)
|
||||||
|
|
||||||
# Protocol version support
|
# Protocol version support
|
||||||
enable_draft29: bool = True # Enable QUIC draft-29 for compatibility
|
enable_draft29: bool = True # Enable QUIC draft-29 for compatibility
|
||||||
@ -102,14 +106,14 @@ class QUICTransportConfig:
|
|||||||
"""Timeout for graceful stream close (seconds)."""
|
"""Timeout for graceful stream close (seconds)."""
|
||||||
|
|
||||||
# Flow control configuration
|
# Flow control configuration
|
||||||
STREAM_FLOW_CONTROL_WINDOW: int = 512 * 1024 # 512KB
|
STREAM_FLOW_CONTROL_WINDOW: int = 1024 * 1024 # 1MB
|
||||||
"""Per-stream flow control window size."""
|
"""Per-stream flow control window size."""
|
||||||
|
|
||||||
CONNECTION_FLOW_CONTROL_WINDOW: int = 768 * 1024 # 768KB
|
CONNECTION_FLOW_CONTROL_WINDOW: int = 1536 * 1024 # 1.5MB
|
||||||
"""Connection-wide flow control window size."""
|
"""Connection-wide flow control window size."""
|
||||||
|
|
||||||
# Buffer management
|
# Buffer management
|
||||||
MAX_STREAM_RECEIVE_BUFFER: int = 1024 * 1024 # 1MB
|
MAX_STREAM_RECEIVE_BUFFER: int = 2 * 1024 * 1024 # 2MB
|
||||||
"""Maximum receive buffer size per stream."""
|
"""Maximum receive buffer size per stream."""
|
||||||
|
|
||||||
STREAM_RECEIVE_BUFFER_LOW_WATERMARK: int = 64 * 1024 # 64KB
|
STREAM_RECEIVE_BUFFER_LOW_WATERMARK: int = 64 * 1024 # 64KB
|
||||||
|
|||||||
@ -655,13 +655,6 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
|||||||
|
|
||||||
return info
|
return info
|
||||||
|
|
||||||
# Legacy compatibility for existing code
|
|
||||||
async def verify_peer_identity(self) -> None:
|
|
||||||
"""
|
|
||||||
Legacy method for compatibility - delegates to security manager.
|
|
||||||
"""
|
|
||||||
await self._verify_peer_identity_with_security()
|
|
||||||
|
|
||||||
# Stream management methods (IMuxedConn interface)
|
# Stream management methods (IMuxedConn interface)
|
||||||
|
|
||||||
async def open_stream(self, timeout: float = 5.0) -> QUICStream:
|
async def open_stream(self, timeout: float = 5.0) -> QUICStream:
|
||||||
|
|||||||
@ -1163,20 +1163,3 @@ def create_quic_security_transport(
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
return QUICTLSConfigManager(libp2p_private_key, peer_id)
|
return QUICTLSConfigManager(libp2p_private_key, peer_id)
|
||||||
|
|
||||||
|
|
||||||
# Legacy compatibility functions for existing code
|
|
||||||
def generate_libp2p_tls_config(private_key: PrivateKey, peer_id: ID) -> TLSConfig:
|
|
||||||
"""
|
|
||||||
Legacy function for compatibility with existing transport code.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
private_key: libp2p private key
|
|
||||||
peer_id: libp2p peer ID
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
TLS configuration
|
|
||||||
|
|
||||||
"""
|
|
||||||
generator = CertificateGenerator()
|
|
||||||
return generator.generate_certificate(private_key, peer_id)
|
|
||||||
|
|||||||
@ -11,7 +11,6 @@ from libp2p import new_host
|
|||||||
from libp2p.crypto.secp256k1 import create_new_key_pair
|
from libp2p.crypto.secp256k1 import create_new_key_pair
|
||||||
from libp2p.custom_types import TProtocol
|
from libp2p.custom_types import TProtocol
|
||||||
from libp2p.peer.peerinfo import info_from_p2p_addr
|
from libp2p.peer.peerinfo import info_from_p2p_addr
|
||||||
from libp2p.transport.quic.config import QUICTransportConfig
|
|
||||||
from libp2p.utils.varint import encode_varint_prefixed, read_varint_prefixed_bytes
|
from libp2p.utils.varint import encode_varint_prefixed, read_varint_prefixed_bytes
|
||||||
|
|
||||||
# Configuration
|
# Configuration
|
||||||
@ -88,16 +87,10 @@ class NimEchoServer:
|
|||||||
async def run_echo_test(server_addr: str, messages: list[str]):
|
async def run_echo_test(server_addr: str, messages: list[str]):
|
||||||
"""Test echo protocol against nim server with proper timeout handling."""
|
"""Test echo protocol against nim server with proper timeout handling."""
|
||||||
# Create py-libp2p QUIC client with shorter timeouts
|
# Create py-libp2p QUIC client with shorter timeouts
|
||||||
quic_config = QUICTransportConfig(
|
|
||||||
idle_timeout=10.0,
|
|
||||||
max_concurrent_streams=10,
|
|
||||||
connection_timeout=5.0,
|
|
||||||
enable_draft29=False,
|
|
||||||
)
|
|
||||||
|
|
||||||
host = new_host(
|
host = new_host(
|
||||||
|
enable_quic=True,
|
||||||
key_pair=create_new_key_pair(),
|
key_pair=create_new_key_pair(),
|
||||||
transport_opt={"quic_config": quic_config},
|
|
||||||
)
|
)
|
||||||
|
|
||||||
listen_addr = multiaddr.Multiaddr("/ip4/0.0.0.0/udp/0/quic-v1")
|
listen_addr = multiaddr.Multiaddr("/ip4/0.0.0.0/udp/0/quic-v1")
|
||||||
|
|||||||
Reference in New Issue
Block a user