chore: remove unwanted code, fix type issues and comments

This commit is contained in:
Akash Mondal
2025-08-31 13:15:51 +00:00
parent e1141ee376
commit 186113968e
6 changed files with 42 additions and 52 deletions

View File

@ -37,7 +37,6 @@ jobs:
with: with:
python-version: ${{ matrix.python }} python-version: ${{ matrix.python }}
# Add Nim installation for interop tests
- name: Install Nim for interop testing - name: Install Nim for interop testing
if: matrix.toxenv == 'interop' if: matrix.toxenv == 'interop'
run: | run: |
@ -46,7 +45,6 @@ jobs:
echo "$HOME/.nimble/bin" >> $GITHUB_PATH echo "$HOME/.nimble/bin" >> $GITHUB_PATH
echo "$HOME/.choosenim/toolchains/nim-stable/bin" >> $GITHUB_PATH echo "$HOME/.choosenim/toolchains/nim-stable/bin" >> $GITHUB_PATH
# Cache nimble packages - ADD THIS
- name: Cache nimble packages - name: Cache nimble packages
if: matrix.toxenv == 'interop' if: matrix.toxenv == 'interop'
uses: actions/cache@v4 uses: actions/cache@v4

View File

@ -1,12 +1,11 @@
""" """
QUIC Connection implementation. QUIC Connection implementation.
Uses aioquic's sans-IO core with trio for async operations. Manages bidirectional QUIC connections with integrated stream multiplexing.
""" """
from collections.abc import Awaitable, Callable from collections.abc import Awaitable, Callable
import logging import logging
import socket import socket
from sys import stdout
import time import time
from typing import TYPE_CHECKING, Any, Optional from typing import TYPE_CHECKING, Any, Optional
@ -37,14 +36,7 @@ if TYPE_CHECKING:
from .security import QUICTLSConfigManager from .security import QUICTLSConfigManager
from .transport import QUICTransport from .transport import QUICTransport
logging.root.handlers = []
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
handlers=[logging.StreamHandler(stdout)],
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class QUICConnection(IRawConnection, IMuxedConn): class QUICConnection(IRawConnection, IMuxedConn):
@ -66,11 +58,11 @@ class QUICConnection(IRawConnection, IMuxedConn):
- COMPLETE connection ID management (fixes the original issue) - COMPLETE connection ID management (fixes the original issue)
""" """
MAX_CONCURRENT_STREAMS = 100 MAX_CONCURRENT_STREAMS = 256
MAX_INCOMING_STREAMS = 1000 MAX_INCOMING_STREAMS = 1000
MAX_OUTGOING_STREAMS = 1000 MAX_OUTGOING_STREAMS = 1000
STREAM_ACCEPT_TIMEOUT = 30.0 STREAM_ACCEPT_TIMEOUT = 60.0
CONNECTION_HANDSHAKE_TIMEOUT = 30.0 CONNECTION_HANDSHAKE_TIMEOUT = 60.0
CONNECTION_CLOSE_TIMEOUT = 10.0 CONNECTION_CLOSE_TIMEOUT = 10.0
def __init__( def __init__(
@ -107,7 +99,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
self._remote_peer_id = remote_peer_id self._remote_peer_id = remote_peer_id
self._local_peer_id = local_peer_id self._local_peer_id = local_peer_id
self.peer_id = remote_peer_id or local_peer_id self.peer_id = remote_peer_id or local_peer_id
self.__is_initiator = is_initiator self._is_initiator = is_initiator
self._maddr = maddr self._maddr = maddr
self._transport = transport self._transport = transport
self._security_manager = security_manager self._security_manager = security_manager
@ -198,7 +190,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
For libp2p, we primarily use bidirectional streams. For libp2p, we primarily use bidirectional streams.
""" """
if self.__is_initiator: if self._is_initiator:
return 0 # Client starts with 0, then 4, 8, 12... return 0 # Client starts with 0, then 4, 8, 12...
else: else:
return 1 # Server starts with 1, then 5, 9, 13... return 1 # Server starts with 1, then 5, 9, 13...
@ -208,7 +200,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
@property @property
def is_initiator(self) -> bool: # type: ignore def is_initiator(self) -> bool: # type: ignore
"""Check if this connection is the initiator.""" """Check if this connection is the initiator."""
return self.__is_initiator return self._is_initiator
@property @property
def is_closed(self) -> bool: def is_closed(self) -> bool:
@ -283,7 +275,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
try: try:
# If this is a client connection, we need to establish the connection # If this is a client connection, we need to establish the connection
if self.__is_initiator: if self._is_initiator:
await self._initiate_connection() await self._initiate_connection()
else: else:
# For server connections, we're already connected via the listener # For server connections, we're already connected via the listener
@ -383,7 +375,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
self._background_tasks_started = True self._background_tasks_started = True
if self.__is_initiator: if self._is_initiator:
self._nursery.start_soon(async_fn=self._client_packet_receiver) self._nursery.start_soon(async_fn=self._client_packet_receiver)
self._nursery.start_soon(async_fn=self._event_processing_loop) self._nursery.start_soon(async_fn=self._event_processing_loop)
@ -616,7 +608,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
"handshake_complete": self._handshake_completed, "handshake_complete": self._handshake_completed,
"peer_id": str(self._remote_peer_id) if self._remote_peer_id else None, "peer_id": str(self._remote_peer_id) if self._remote_peer_id else None,
"local_peer_id": str(self._local_peer_id), "local_peer_id": str(self._local_peer_id),
"is_initiator": self.__is_initiator, "is_initiator": self._is_initiator,
"has_certificate": self._peer_certificate is not None, "has_certificate": self._peer_certificate is not None,
"security_manager_available": self._security_manager is not None, "security_manager_available": self._security_manager is not None,
} }
@ -808,8 +800,6 @@ class QUICConnection(IRawConnection, IMuxedConn):
logger.debug(f"Removed stream {stream_id} from connection") logger.debug(f"Removed stream {stream_id} from connection")
# *** UPDATED: Complete QUIC event handling - FIXES THE ORIGINAL ISSUE ***
async def _process_quic_events(self) -> None: async def _process_quic_events(self) -> None:
"""Process all pending QUIC events.""" """Process all pending QUIC events."""
if self._event_processing_active: if self._event_processing_active:
@ -868,8 +858,6 @@ class QUICConnection(IRawConnection, IMuxedConn):
except Exception as e: except Exception as e:
logger.error(f"Error handling QUIC event {type(event).__name__}: {e}") logger.error(f"Error handling QUIC event {type(event).__name__}: {e}")
# *** NEW: Connection ID event handlers - THE MAIN FIX ***
async def _handle_connection_id_issued( async def _handle_connection_id_issued(
self, event: events.ConnectionIdIssued self, event: events.ConnectionIdIssued
) -> None: ) -> None:
@ -919,10 +907,15 @@ class QUICConnection(IRawConnection, IMuxedConn):
if self._current_connection_id == event.connection_id: if self._current_connection_id == event.connection_id:
if self._available_connection_ids: if self._available_connection_ids:
self._current_connection_id = next(iter(self._available_connection_ids)) self._current_connection_id = next(iter(self._available_connection_ids))
logger.debug( if self._current_connection_id:
f"Switching new connection ID: {self._current_connection_id.hex()}" logger.debug(
) "Switching to new connection ID: "
self._stats["connection_id_changes"] += 1 f"{self._current_connection_id.hex()}"
)
self._stats["connection_id_changes"] += 1
else:
logger.warning("⚠️ No available connection IDs after retirement!")
logger.debug("⚠️ No available connection IDs after retirement!")
else: else:
self._current_connection_id = None self._current_connection_id = None
logger.warning("⚠️ No available connection IDs after retirement!") logger.warning("⚠️ No available connection IDs after retirement!")
@ -931,8 +924,6 @@ class QUICConnection(IRawConnection, IMuxedConn):
# Update statistics # Update statistics
self._stats["connection_ids_retired"] += 1 self._stats["connection_ids_retired"] += 1
# *** NEW: Additional event handlers for completeness ***
async def _handle_ping_acknowledged(self, event: events.PingAcknowledged) -> None: async def _handle_ping_acknowledged(self, event: events.PingAcknowledged) -> None:
"""Handle ping acknowledgment.""" """Handle ping acknowledgment."""
logger.debug(f"Ping acknowledged: uid={event.uid}") logger.debug(f"Ping acknowledged: uid={event.uid}")
@ -957,8 +948,6 @@ class QUICConnection(IRawConnection, IMuxedConn):
# Handle stop sending on the stream if method exists # Handle stop sending on the stream if method exists
await stream.handle_stop_sending(event.error_code) await stream.handle_stop_sending(event.error_code)
# *** EXISTING event handlers (unchanged) ***
async def _handle_handshake_completed( async def _handle_handshake_completed(
self, event: events.HandshakeCompleted self, event: events.HandshakeCompleted
) -> None: ) -> None:
@ -1108,7 +1097,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
- Even IDs are client-initiated - Even IDs are client-initiated
- Odd IDs are server-initiated - Odd IDs are server-initiated
""" """
if self.__is_initiator: if self._is_initiator:
# We're the client, so odd stream IDs are incoming # We're the client, so odd stream IDs are incoming
return stream_id % 2 == 1 return stream_id % 2 == 1
else: else:
@ -1336,7 +1325,6 @@ class QUICConnection(IRawConnection, IMuxedConn):
QUICStreamTimeoutError: If read timeout occurs. QUICStreamTimeoutError: If read timeout occurs.
""" """
# This method doesn't make sense for a muxed connection
# It's here for interface compatibility but should not be used # It's here for interface compatibility but should not be used
raise NotImplementedError( raise NotImplementedError(
"Use streams for reading data from QUIC connections. " "Use streams for reading data from QUIC connections. "
@ -1399,7 +1387,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
return ( return (
f"QUICConnection(peer={self._remote_peer_id}, " f"QUICConnection(peer={self._remote_peer_id}, "
f"addr={self._remote_addr}, " f"addr={self._remote_addr}, "
f"initiator={self.__is_initiator}, " f"initiator={self._is_initiator}, "
f"verified={self._peer_verified}, " f"verified={self._peer_verified}, "
f"established={self._established}, " f"established={self._established}, "
f"streams={len(self._streams)}, " f"streams={len(self._streams)}, "

View File

@ -778,6 +778,16 @@ class PeerAuthenticator:
""" """
try: try:
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
if certificate.not_valid_after_utc < now:
raise QUICPeerVerificationError("Certificate has expired")
if certificate.not_valid_before_utc > now:
raise QUICPeerVerificationError("Certificate not yet valid")
# Extract libp2p extension # Extract libp2p extension
libp2p_extension = None libp2p_extension = None
for extension in certificate.extensions: for extension in certificate.extensions:

View File

@ -1,7 +1,6 @@
""" """
QUIC Stream implementation for py-libp2p Module 3. QUIC Stream implementation
Based on patterns from go-libp2p and js-libp2p QUIC implementations. Provides stream interface over QUIC's native multiplexing.
Uses aioquic's native stream capabilities with libp2p interface compliance.
""" """
from enum import Enum from enum import Enum

View File

@ -5,7 +5,6 @@ QUIC Transport implementation
import copy import copy
import logging import logging
import ssl import ssl
import sys
from typing import TYPE_CHECKING, cast from typing import TYPE_CHECKING, cast
from aioquic.quic.configuration import ( from aioquic.quic.configuration import (
@ -66,11 +65,6 @@ from .security import (
QUIC_V1_PROTOCOL = QUICTransportConfig.PROTOCOL_QUIC_V1 QUIC_V1_PROTOCOL = QUICTransportConfig.PROTOCOL_QUIC_V1
QUIC_DRAFT29_PROTOCOL = QUICTransportConfig.PROTOCOL_QUIC_DRAFT29 QUIC_DRAFT29_PROTOCOL = QUICTransportConfig.PROTOCOL_QUIC_DRAFT29
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -27,25 +27,26 @@ IP4_PROTOCOL = "ip4"
IP6_PROTOCOL = "ip6" IP6_PROTOCOL = "ip6"
SERVER_CONFIG_PROTOCOL_V1 = f"{QUIC_V1_PROTOCOL}_server" SERVER_CONFIG_PROTOCOL_V1 = f"{QUIC_V1_PROTOCOL}_server"
SERVER_CONFIG_PROTOCOL_DRAFT_29 = f"{QUIC_V1_PROTOCOL}_server" CLIENT_CONFIG_PROTCOL_V1 = f"{QUIC_V1_PROTOCOL}_client"
CLIENT_CONFIG_PROTCOL_V1 = f"{QUIC_DRAFT29_PROTOCOL}_client"
SERVER_CONFIG_PROTOCOL_DRAFT_29 = f"{QUIC_DRAFT29_PROTOCOL}_server"
CLIENT_CONFIG_PROTOCOL_DRAFT_29 = f"{QUIC_DRAFT29_PROTOCOL}_client" CLIENT_CONFIG_PROTOCOL_DRAFT_29 = f"{QUIC_DRAFT29_PROTOCOL}_client"
CUSTOM_QUIC_VERSION_MAPPING = { CUSTOM_QUIC_VERSION_MAPPING: dict[str, int] = {
SERVER_CONFIG_PROTOCOL_V1: 0x00000001, # RFC 9000 SERVER_CONFIG_PROTOCOL_V1: 0x00000001, # RFC 9000
CLIENT_CONFIG_PROTCOL_V1: 0x00000001, # RFC 9000 CLIENT_CONFIG_PROTCOL_V1: 0x00000001, # RFC 9000
SERVER_CONFIG_PROTOCOL_DRAFT_29: 0x00000001, # draft-29 SERVER_CONFIG_PROTOCOL_DRAFT_29: 0xFF00001D, # draft-29
CLIENT_CONFIG_PROTOCOL_DRAFT_29: 0x00000001, # draft-29 CLIENT_CONFIG_PROTOCOL_DRAFT_29: 0xFF00001D, # draft-29
} }
# QUIC version to wire format mappings (required for aioquic) # QUIC version to wire format mappings (required for aioquic)
QUIC_VERSION_MAPPINGS = { QUIC_VERSION_MAPPINGS: dict[TProtocol, int] = {
QUIC_V1_PROTOCOL: 0x00000001, # RFC 9000 QUIC_V1_PROTOCOL: 0x00000001, # RFC 9000
QUIC_DRAFT29_PROTOCOL: 0x00000001, # draft-29 QUIC_DRAFT29_PROTOCOL: 0xFF00001D, # draft-29
} }
# ALPN protocols for libp2p over QUIC # ALPN protocols for libp2p over QUIC
LIBP2P_ALPN_PROTOCOLS = ["libp2p"] LIBP2P_ALPN_PROTOCOLS: list[str] = ["libp2p"]
def is_quic_multiaddr(maddr: multiaddr.Multiaddr) -> bool: def is_quic_multiaddr(maddr: multiaddr.Multiaddr) -> bool: