mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
fix: add echo example
This commit is contained in:
@ -34,6 +34,11 @@ if TYPE_CHECKING:
|
||||
from .security import QUICTLSConfigManager
|
||||
from .transport import QUICTransport
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||
handlers=[logging.StreamHandler()],
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@ -286,11 +291,13 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
||||
try:
|
||||
with QUICErrorContext("connection_establishment", "connection"):
|
||||
# Start the connection if not already started
|
||||
print("STARTING TO CONNECT")
|
||||
if not self._started:
|
||||
await self.start()
|
||||
|
||||
# Start background event processing
|
||||
if not self._background_tasks_started:
|
||||
print("STARTING BACKGROUND TASK")
|
||||
await self._start_background_tasks()
|
||||
|
||||
# Wait for handshake completion with timeout
|
||||
@ -324,16 +331,17 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
||||
self._background_tasks_started = True
|
||||
|
||||
# Start event processing task
|
||||
self._nursery.start_soon(self._event_processing_loop)
|
||||
self._nursery.start_soon(async_fn=self._event_processing_loop)
|
||||
|
||||
# Start periodic tasks
|
||||
self._nursery.start_soon(self._periodic_maintenance)
|
||||
# self._nursery.start_soon(async_fn=self._periodic_maintenance)
|
||||
|
||||
logger.debug("Started background tasks for QUIC connection")
|
||||
|
||||
async def _event_processing_loop(self) -> None:
|
||||
"""Main event processing loop for the connection."""
|
||||
logger.debug("Started QUIC event processing loop")
|
||||
print("Started QUIC event processing loop")
|
||||
|
||||
try:
|
||||
while not self._closed:
|
||||
@ -347,7 +355,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
||||
await self._transmit()
|
||||
|
||||
# Short sleep to prevent busy waiting
|
||||
await trio.sleep(0.001) # 1ms
|
||||
await trio.sleep(0.01)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in event processing loop: {e}")
|
||||
@ -381,6 +389,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
||||
QUICPeerVerificationError: If peer verification fails
|
||||
|
||||
"""
|
||||
print("VERIFYING PEER IDENTITY")
|
||||
if not self._security_manager:
|
||||
logger.warning("No security manager available for peer verification")
|
||||
return
|
||||
@ -719,6 +728,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
||||
|
||||
async def _handle_quic_event(self, event: events.QuicEvent) -> None:
|
||||
"""Handle a single QUIC event."""
|
||||
print(f"QUIC event: {type(event).__name__}")
|
||||
if isinstance(event, events.ConnectionTerminated):
|
||||
await self._handle_connection_terminated(event)
|
||||
elif isinstance(event, events.HandshakeCompleted):
|
||||
@ -731,6 +741,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
||||
await self._handle_datagram_received(event)
|
||||
else:
|
||||
logger.debug(f"Unhandled QUIC event: {type(event).__name__}")
|
||||
print(f"Unhandled QUIC event: {type(event).__name__}")
|
||||
|
||||
async def _handle_handshake_completed(
|
||||
self, event: events.HandshakeCompleted
|
||||
@ -897,6 +908,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
||||
"""Send pending datagrams using trio."""
|
||||
sock = self._socket
|
||||
if not sock:
|
||||
print("No socket to transmit")
|
||||
return
|
||||
|
||||
try:
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@ -13,7 +13,7 @@ from aioquic.quic.configuration import (
|
||||
QuicConfiguration,
|
||||
)
|
||||
from aioquic.quic.connection import (
|
||||
QuicConnection,
|
||||
QuicConnection as NativeQUICConnection,
|
||||
)
|
||||
import multiaddr
|
||||
import trio
|
||||
@ -60,6 +60,11 @@ from .security import (
|
||||
QUIC_V1_PROTOCOL = QUICTransportConfig.PROTOCOL_QUIC_V1
|
||||
QUIC_DRAFT29_PROTOCOL = QUICTransportConfig.PROTOCOL_QUIC_DRAFT29
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||
handlers=[logging.StreamHandler()],
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@ -279,20 +284,24 @@ class QUICTransport(ITransport):
|
||||
|
||||
# Get appropriate QUIC client configuration
|
||||
config_key = TProtocol(f"{quic_version}_client")
|
||||
print("config_key", config_key, self._quic_configs.keys())
|
||||
config = self._quic_configs.get(config_key)
|
||||
if not config:
|
||||
raise QUICDialError(f"Unsupported QUIC version: {quic_version}")
|
||||
|
||||
config.is_client = True
|
||||
logger.debug(
|
||||
f"Dialing QUIC connection to {host}:{port} (version: {quic_version})"
|
||||
)
|
||||
|
||||
print("Start QUIC Connection")
|
||||
# Create QUIC connection using aioquic's sans-IO core
|
||||
quic_connection = QuicConnection(configuration=config)
|
||||
native_quic_connection = NativeQUICConnection(configuration=config)
|
||||
|
||||
print("QUIC Connection Created")
|
||||
# Create trio-based QUIC connection wrapper with security
|
||||
connection = QUICConnection(
|
||||
quic_connection=quic_connection,
|
||||
quic_connection=native_quic_connection,
|
||||
remote_addr=(host, port),
|
||||
peer_id=peer_id,
|
||||
local_peer_id=self._peer_id,
|
||||
@ -354,6 +363,7 @@ class QUICTransport(ITransport):
|
||||
)
|
||||
|
||||
logger.info(f"Peer identity verified: {verified_peer_id}")
|
||||
print(f"Peer identity verified: {verified_peer_id}")
|
||||
|
||||
except Exception as e:
|
||||
raise QUICSecurityError(f"Peer identity verification failed: {e}") from e
|
||||
|
||||
@ -5,14 +5,19 @@ Based on go-libp2p and js-libp2p QUIC implementations.
|
||||
"""
|
||||
|
||||
import ipaddress
|
||||
import logging
|
||||
|
||||
from aioquic.quic.configuration import QuicConfiguration
|
||||
import multiaddr
|
||||
|
||||
from libp2p.custom_types import TProtocol
|
||||
from libp2p.transport.quic.security import QUICTLSConfigManager
|
||||
|
||||
from .config import QUICTransportConfig
|
||||
from .exceptions import QUICInvalidMultiaddrError, QUICUnsupportedVersionError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Protocol constants
|
||||
QUIC_V1_PROTOCOL = QUICTransportConfig.PROTOCOL_QUIC_V1
|
||||
QUIC_DRAFT29_PROTOCOL = QUICTransportConfig.PROTOCOL_QUIC_DRAFT29
|
||||
@ -20,6 +25,18 @@ UDP_PROTOCOL = "udp"
|
||||
IP4_PROTOCOL = "ip4"
|
||||
IP6_PROTOCOL = "ip6"
|
||||
|
||||
SERVER_CONFIG_PROTOCOL_V1 = f"{QUIC_V1_PROTOCOL}_SERVER"
|
||||
SERVER_CONFIG_PROTOCOL_DRAFT_29 = f"{QUIC_V1_PROTOCOL}_SERVER"
|
||||
CLIENT_CONFIG_PROTCOL_V1 = f"{QUIC_DRAFT29_PROTOCOL}_SERVER"
|
||||
CLIENT_CONFIG_PROTOCOL_DRAFT_29 = f"{QUIC_DRAFT29_PROTOCOL}_SERVER"
|
||||
|
||||
CUSTOM_QUIC_VERSION_MAPPING = {
|
||||
SERVER_CONFIG_PROTOCOL_V1: 0x00000001, # RFC 9000
|
||||
CLIENT_CONFIG_PROTCOL_V1: 0x00000001, # RFC 9000
|
||||
SERVER_CONFIG_PROTOCOL_DRAFT_29: 0xFF00001D, # draft-29
|
||||
CLIENT_CONFIG_PROTOCOL_DRAFT_29: 0xFF00001D, # draft-29
|
||||
}
|
||||
|
||||
# QUIC version to wire format mappings (required for aioquic)
|
||||
QUIC_VERSION_MAPPINGS = {
|
||||
QUIC_V1_PROTOCOL: 0x00000001, # RFC 9000
|
||||
@ -218,6 +235,27 @@ def quic_version_to_wire_format(version: TProtocol) -> int:
|
||||
return wire_version
|
||||
|
||||
|
||||
def custom_quic_version_to_wire_format(version: TProtocol) -> int:
|
||||
"""
|
||||
Convert QUIC version string to wire format integer for aioquic.
|
||||
|
||||
Args:
|
||||
version: QUIC version string ("quic-v1" or "quic")
|
||||
|
||||
Returns:
|
||||
Wire format version number
|
||||
|
||||
Raises:
|
||||
QUICUnsupportedVersionError: If version is not supported
|
||||
|
||||
"""
|
||||
wire_version = QUIC_VERSION_MAPPINGS.get(version)
|
||||
if wire_version is None:
|
||||
raise QUICUnsupportedVersionError(f"Unsupported QUIC version: {version}")
|
||||
|
||||
return wire_version
|
||||
|
||||
|
||||
def get_alpn_protocols() -> list[str]:
|
||||
"""
|
||||
Get ALPN protocols for libp2p over QUIC.
|
||||
@ -250,3 +288,94 @@ def normalize_quic_multiaddr(maddr: multiaddr.Multiaddr) -> multiaddr.Multiaddr:
|
||||
version = multiaddr_to_quic_version(maddr)
|
||||
|
||||
return create_quic_multiaddr(host, port, version)
|
||||
|
||||
|
||||
def create_server_config_from_base(
|
||||
base_config: QuicConfiguration,
|
||||
security_manager: QUICTLSConfigManager | None = None,
|
||||
transport_config: QUICTransportConfig | None = None,
|
||||
) -> QuicConfiguration:
|
||||
"""
|
||||
Create a server configuration without using deepcopy.
|
||||
Manually copies attributes while handling cryptography objects properly.
|
||||
"""
|
||||
try:
|
||||
# Create new server configuration from scratch
|
||||
server_config = QuicConfiguration(is_client=False)
|
||||
|
||||
# Copy basic configuration attributes (these are safe to copy)
|
||||
copyable_attrs = [
|
||||
"alpn_protocols",
|
||||
"verify_mode",
|
||||
"max_datagram_frame_size",
|
||||
"idle_timeout",
|
||||
"max_concurrent_streams",
|
||||
"supported_versions",
|
||||
"max_data",
|
||||
"max_stream_data",
|
||||
"stateless_retry",
|
||||
"quantum_readiness_test",
|
||||
]
|
||||
|
||||
for attr in copyable_attrs:
|
||||
if hasattr(base_config, attr):
|
||||
value = getattr(base_config, attr)
|
||||
if value is not None:
|
||||
setattr(server_config, attr, value)
|
||||
|
||||
# Handle cryptography objects - these need direct reference, not copying
|
||||
crypto_attrs = [
|
||||
"certificate",
|
||||
"private_key",
|
||||
"certificate_chain",
|
||||
"ca_certs",
|
||||
]
|
||||
|
||||
for attr in crypto_attrs:
|
||||
if hasattr(base_config, attr):
|
||||
value = getattr(base_config, attr)
|
||||
if value is not None:
|
||||
setattr(server_config, attr, value)
|
||||
|
||||
# Apply security manager configuration if available
|
||||
if security_manager:
|
||||
try:
|
||||
server_tls_config = security_manager.create_server_config()
|
||||
|
||||
# Override with security manager's TLS configuration
|
||||
if "certificate" in server_tls_config:
|
||||
server_config.certificate = server_tls_config["certificate"]
|
||||
if "private_key" in server_tls_config:
|
||||
server_config.private_key = server_tls_config["private_key"]
|
||||
if "certificate_chain" in server_tls_config:
|
||||
# type: ignore
|
||||
server_config.certificate_chain = server_tls_config[ # type: ignore
|
||||
"certificate_chain" # type: ignore
|
||||
]
|
||||
if "alpn_protocols" in server_tls_config:
|
||||
# type: ignore
|
||||
server_config.alpn_protocols = server_tls_config["alpn_protocols"] # type: ignore
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to apply security manager config: {e}")
|
||||
|
||||
# Set transport-specific defaults if provided
|
||||
if transport_config:
|
||||
if server_config.idle_timeout == 0:
|
||||
server_config.idle_timeout = getattr(
|
||||
transport_config, "idle_timeout", 30.0
|
||||
)
|
||||
if server_config.max_datagram_frame_size is None:
|
||||
server_config.max_datagram_frame_size = getattr(
|
||||
transport_config, "max_datagram_size", 1200
|
||||
)
|
||||
# Ensure we have ALPN protocols
|
||||
if server_config.alpn_protocols:
|
||||
server_config.alpn_protocols = ["libp2p"]
|
||||
|
||||
logger.debug("Successfully created server config without deepcopy")
|
||||
return server_config
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create server config: {e}")
|
||||
raise
|
||||
|
||||
Reference in New Issue
Block a user