chore: cleanup and near v1 quic impl

This commit is contained in:
Akash Mondal
2025-07-02 16:51:16 +00:00
committed by lla-dane
parent c15c317514
commit 03bf071739
12 changed files with 311 additions and 2124 deletions

View File

@ -1,4 +1,3 @@
"""
QUIC Security implementation for py-libp2p Module 5.
Implements libp2p TLS specification for QUIC transport with peer identity integration.
@ -8,7 +7,7 @@ Based on go-libp2p and js-libp2p security patterns.
from dataclasses import dataclass, field
import logging
import ssl
from typing import List, Optional, Union
from typing import Any
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
@ -130,14 +129,16 @@ class LibP2PExtensionHandler:
) from e
@staticmethod
def parse_signed_key_extension(extension: Extension) -> tuple[PublicKey, bytes]:
def parse_signed_key_extension(
extension: Extension[Any],
) -> tuple[PublicKey, bytes]:
"""
Parse the libp2p Public Key Extension with enhanced debugging.
"""
try:
print(f"🔍 Extension type: {type(extension)}")
print(f"🔍 Extension.value type: {type(extension.value)}")
# Extract the raw bytes from the extension
if isinstance(extension.value, UnrecognizedExtension):
# Use the .value property to get the bytes
@ -147,10 +148,10 @@ class LibP2PExtensionHandler:
# Fallback if it's already bytes somehow
raw_bytes = extension.value
print("🔍 Extension.value is already bytes")
print(f"🔍 Total extension length: {len(raw_bytes)} bytes")
print(f"🔍 Extension hex (first 50 bytes): {raw_bytes[:50].hex()}")
if not isinstance(raw_bytes, bytes):
raise QUICCertificateError(f"Expected bytes, got {type(raw_bytes)}")
@ -191,28 +192,37 @@ class LibP2PExtensionHandler:
signature = raw_bytes[offset : offset + signature_length]
print(f"🔍 Extracted signature length: {len(signature)} bytes")
print(f"🔍 Signature hex (first 20 bytes): {signature[:20].hex()}")
print(f"🔍 Signature starts with DER header: {signature[:2].hex() == '3045'}")
print(
f"🔍 Signature starts with DER header: {signature[:2].hex() == '3045'}"
)
# Detailed signature analysis
if len(signature) >= 2:
if signature[0] == 0x30:
der_length = signature[1]
print(f"🔍 DER sequence length field: {der_length}")
print(f"🔍 Expected DER total: {der_length + 2}")
print(f"🔍 Actual signature length: {len(signature)}")
logger.debug(
f"🔍 Expected DER total: {der_length + 2}"
f"🔍 Actual signature length: {len(signature)}"
)
if len(signature) != der_length + 2:
print(f"⚠️ DER length mismatch! Expected {der_length + 2}, got {len(signature)}")
logger.debug(
"⚠️ DER length mismatch! "
f"Expected {der_length + 2}, got {len(signature)}"
)
# Try truncating to correct DER length
if der_length + 2 < len(signature):
print(f"🔧 Truncating signature to correct DER length: {der_length + 2}")
signature = signature[:der_length + 2]
logger.debug(
"🔧 Truncating signature to correct DER length: "
f"{der_length + 2}"
)
signature = signature[: der_length + 2]
# Check if we have extra data
expected_total = 4 + public_key_length + 4 + signature_length
print(f"🔍 Expected total length: {expected_total}")
print(f"🔍 Actual total length: {len(raw_bytes)}")
if len(raw_bytes) > expected_total:
extra_bytes = len(raw_bytes) - expected_total
print(f"⚠️ Extra {extra_bytes} bytes detected!")
@ -221,7 +231,7 @@ class LibP2PExtensionHandler:
# Deserialize the public key
public_key = LibP2PKeyConverter.deserialize_public_key(public_key_bytes)
print(f"🔍 Successfully deserialized public key: {type(public_key)}")
print(f"🔍 Final signature to return: {len(signature)} bytes")
return public_key, signature
@ -229,6 +239,7 @@ class LibP2PExtensionHandler:
except Exception as e:
print(f"❌ Extension parsing failed: {e}")
import traceback
print(f"❌ Traceback: {traceback.format_exc()}")
raise QUICCertificateError(
f"Failed to parse signed key extension: {e}"
@ -470,26 +481,26 @@ class QUICTLSSecurityConfig:
# Core TLS components (required)
certificate: Certificate
private_key: Union[EllipticCurvePrivateKey, RSAPrivateKey]
private_key: EllipticCurvePrivateKey | RSAPrivateKey
# Certificate chain (optional)
certificate_chain: List[Certificate] = field(default_factory=list)
certificate_chain: list[Certificate] = field(default_factory=list)
# ALPN protocols
alpn_protocols: List[str] = field(default_factory=lambda: ["libp2p"])
alpn_protocols: list[str] = field(default_factory=lambda: ["libp2p"])
# TLS verification settings
verify_mode: ssl.VerifyMode = ssl.CERT_NONE
check_hostname: bool = False
# Optional peer ID for validation
peer_id: Optional[ID] = None
peer_id: ID | None = None
# Configuration metadata
is_client_config: bool = False
config_name: Optional[str] = None
config_name: str | None = None
def __post_init__(self):
def __post_init__(self) -> None:
"""Validate configuration after initialization."""
self._validate()
@ -516,46 +527,6 @@ class QUICTLSSecurityConfig:
if not self.alpn_protocols:
raise ValueError("At least one ALPN protocol is required")
def to_dict(self) -> dict:
"""
Convert to dictionary format for compatibility with existing code.
Returns:
Dictionary compatible with the original TSecurityConfig format
"""
return {
"certificate": self.certificate,
"private_key": self.private_key,
"certificate_chain": self.certificate_chain.copy(),
"alpn_protocols": self.alpn_protocols.copy(),
"verify_mode": self.verify_mode,
"check_hostname": self.check_hostname,
}
@classmethod
def from_dict(cls, config_dict: dict, **kwargs) -> "QUICTLSSecurityConfig":
"""
Create instance from dictionary format.
Args:
config_dict: Dictionary in TSecurityConfig format
**kwargs: Additional parameters for the config
Returns:
QUICTLSSecurityConfig instance
"""
return cls(
certificate=config_dict["certificate"],
private_key=config_dict["private_key"],
certificate_chain=config_dict.get("certificate_chain", []),
alpn_protocols=config_dict.get("alpn_protocols", ["libp2p"]),
verify_mode=config_dict.get("verify_mode", False),
check_hostname=config_dict.get("check_hostname", False),
**kwargs,
)
def validate_certificate_key_match(self) -> bool:
"""
Validate that the certificate and private key match.
@ -621,7 +592,7 @@ class QUICTLSSecurityConfig:
except Exception:
return False
def get_certificate_info(self) -> dict:
def get_certificate_info(self) -> dict[Any, Any]:
"""
Get certificate information for debugging.
@ -652,7 +623,7 @@ class QUICTLSSecurityConfig:
print(f"Check hostname: {self.check_hostname}")
print(f"Certificate chain length: {len(self.certificate_chain)}")
cert_info = self.get_certificate_info()
cert_info: dict[Any, Any] = self.get_certificate_info()
for key, value in cert_info.items():
print(f"Certificate {key}: {value}")
@ -663,9 +634,9 @@ class QUICTLSSecurityConfig:
def create_server_tls_config(
certificate: Certificate,
private_key: Union[EllipticCurvePrivateKey, RSAPrivateKey],
peer_id: Optional[ID] = None,
**kwargs,
private_key: EllipticCurvePrivateKey | RSAPrivateKey,
peer_id: ID | None = None,
**kwargs: Any,
) -> QUICTLSSecurityConfig:
"""
Create a server TLS configuration.
@ -694,9 +665,9 @@ def create_server_tls_config(
def create_client_tls_config(
certificate: Certificate,
private_key: Union[EllipticCurvePrivateKey, RSAPrivateKey],
peer_id: Optional[ID] = None,
**kwargs,
private_key: EllipticCurvePrivateKey | RSAPrivateKey,
peer_id: ID | None = None,
**kwargs: Any,
) -> QUICTLSSecurityConfig:
"""
Create a client TLS configuration.
@ -729,7 +700,7 @@ class QUICTLSConfigManager:
Integrates with aioquic's TLS configuration system.
"""
def __init__(self, libp2p_private_key: PrivateKey, peer_id: ID):
def __init__(self, libp2p_private_key: PrivateKey, peer_id: ID) -> None:
self.libp2p_private_key = libp2p_private_key
self.peer_id = peer_id
self.certificate_generator = CertificateGenerator()