fix: use ASN.1 format certificate extension

This commit is contained in:
Akash Mondal
2025-08-14 14:14:15 +00:00
committed by lla-dane
parent f550c19b2c
commit 5ed3707a51
4 changed files with 257 additions and 89 deletions

View File

@ -172,9 +172,7 @@ class QUICTransportConfig:
"""Backoff factor for stream error retries."""
# Protocol identifiers matching go-libp2p
# TODO: UNTIL MUITIADDR REPO IS UPDATED
# PROTOCOL_QUIC_V1: TProtocol = TProtocol("/quic-v1") # RFC 9000
PROTOCOL_QUIC_V1: TProtocol = TProtocol("quic") # RFC 9000
PROTOCOL_QUIC_V1: TProtocol = TProtocol("quic-v1") # RFC 9000
PROTOCOL_QUIC_DRAFT29: TProtocol = TProtocol("quic") # draft-29
def __post_init__(self) -> None:

View File

@ -519,6 +519,7 @@ class QUICConnection(IRawConnection, IMuxedConn):
self._peer_verified = True
logger.debug(f"Peer identity verified successfully: {verified_peer_id}")
return verified_peer_id
except QUICPeerVerificationError:

View File

@ -80,7 +80,8 @@ class LibP2PExtensionHandler:
@staticmethod
def create_signed_key_extension(
libp2p_private_key: PrivateKey, cert_public_key: bytes
libp2p_private_key: PrivateKey,
cert_public_key: bytes,
) -> bytes:
"""
Create the libp2p Public Key Extension with signed key proof.
@ -94,7 +95,7 @@ class LibP2PExtensionHandler:
cert_public_key: The certificate's public key bytes
Returns:
ASN.1 encoded extension value
Encoded extension value
"""
try:
@ -107,33 +108,78 @@ class LibP2PExtensionHandler:
# Sign the payload with the libp2p private key
signature = libp2p_private_key.sign(signature_payload)
# Create the SignedKey structure (simplified ASN.1 encoding)
# In a full implementation, this would use proper ASN.1 encoding
# Get the public key bytes
public_key_bytes = libp2p_public_key.serialize()
# Simple encoding:
# [public_key_length][public_key][signature_length][signature]
extension_data = (
len(public_key_bytes).to_bytes(4, byteorder="big")
+ public_key_bytes
+ len(signature).to_bytes(4, byteorder="big")
+ signature
# Create ASN.1 DER encoded structure (go-libp2p compatible)
return LibP2PExtensionHandler._create_asn1_der_extension(
public_key_bytes, signature
)
return extension_data
except Exception as e:
raise QUICCertificateError(
f"Failed to create signed key extension: {e}"
) from e
@staticmethod
def _create_asn1_der_extension(public_key_bytes: bytes, signature: bytes) -> bytes:
"""
Create ASN.1 DER encoded extension (go-libp2p compatible).
Structure:
SEQUENCE {
publicKey OCTET STRING,
signature OCTET STRING
}
"""
# Encode public key as OCTET STRING
pubkey_octets = LibP2PExtensionHandler._encode_der_octet_string(
public_key_bytes
)
# Encode signature as OCTET STRING
sig_octets = LibP2PExtensionHandler._encode_der_octet_string(signature)
# Combine into SEQUENCE
sequence_content = pubkey_octets + sig_octets
# Encode as SEQUENCE
return LibP2PExtensionHandler._encode_der_sequence(sequence_content)
@staticmethod
def _encode_der_length(length: int) -> bytes:
"""Encode length in DER format."""
if length < 128:
# Short form
return bytes([length])
else:
# Long form
length_bytes = length.to_bytes(
(length.bit_length() + 7) // 8, byteorder="big"
)
return bytes([0x80 | len(length_bytes)]) + length_bytes
@staticmethod
def _encode_der_octet_string(data: bytes) -> bytes:
"""Encode data as DER OCTET STRING."""
return (
bytes([0x04]) + LibP2PExtensionHandler._encode_der_length(len(data)) + data
)
@staticmethod
def _encode_der_sequence(data: bytes) -> bytes:
"""Encode data as DER SEQUENCE."""
return (
bytes([0x30]) + LibP2PExtensionHandler._encode_der_length(len(data)) + data
)
@staticmethod
def parse_signed_key_extension(
extension: Extension[Any],
) -> tuple[PublicKey, bytes]:
"""
Parse the libp2p Public Key Extension with support for all crypto types.
Handles Ed25519, Secp256k1, RSA, ECDSA, and ECC_P256 signature formats.
Handles both ASN.1 DER format (from go-libp2p) and simple binary format.
"""
try:
logger.debug(f"🔍 Extension type: {type(extension)}")
@ -155,59 +201,13 @@ class LibP2PExtensionHandler:
if not isinstance(raw_bytes, bytes):
raise QUICCertificateError(f"Expected bytes, got {type(raw_bytes)}")
offset = 0
# Parse public key length and data
if len(raw_bytes) < 4:
raise QUICCertificateError("Extension too short for public key length")
public_key_length = int.from_bytes(
raw_bytes[offset : offset + 4], byteorder="big"
)
logger.debug(f"🔍 Public key length: {public_key_length} bytes")
offset += 4
if len(raw_bytes) < offset + public_key_length:
raise QUICCertificateError("Extension too short for public key data")
public_key_bytes = raw_bytes[offset : offset + public_key_length]
logger.debug(f"🔍 Public key data: {public_key_bytes.hex()}")
offset += public_key_length
# Parse signature length and data
if len(raw_bytes) < offset + 4:
raise QUICCertificateError("Extension too short for signature length")
signature_length = int.from_bytes(
raw_bytes[offset : offset + 4], byteorder="big"
)
logger.debug(f"🔍 Signature length: {signature_length} bytes")
offset += 4
if len(raw_bytes) < offset + signature_length:
raise QUICCertificateError("Extension too short for signature data")
signature_data = raw_bytes[offset : offset + signature_length]
logger.debug(f"🔍 Signature data length: {len(signature_data)} bytes")
logger.debug(
f"🔍 Signature data hex (first 20 bytes): {signature_data[:20].hex()}"
)
# Deserialize the public key to determine the crypto type
public_key = LibP2PKeyConverter.deserialize_public_key(public_key_bytes)
logger.debug(f"🔍 Successfully deserialized public key: {type(public_key)}")
# Extract signature based on key type
signature = LibP2PExtensionHandler._extract_signature_by_key_type(
public_key, signature_data
)
logger.debug(f"🔍 Final signature to return: {len(signature)} bytes")
logger.debug(
f"🔍 Final signature hex (first 20 bytes): {signature[:20].hex()}"
)
return public_key, signature
# Check if this is ASN.1 DER encoded (from go-libp2p)
if len(raw_bytes) >= 4 and raw_bytes[0] == 0x30:
logger.debug("🔍 Detected ASN.1 DER encoding")
return LibP2PExtensionHandler._parse_asn1_der_extension(raw_bytes)
else:
logger.debug("🔍 Using simple binary format parsing")
return LibP2PExtensionHandler._parse_simple_binary_extension(raw_bytes)
except Exception as e:
logger.debug(f"❌ Extension parsing failed: {e}")
@ -218,6 +218,165 @@ class LibP2PExtensionHandler:
f"Failed to parse signed key extension: {e}"
) from e
@staticmethod
def _parse_asn1_der_extension(raw_bytes: bytes) -> tuple[PublicKey, bytes]:
"""
Parse ASN.1 DER encoded extension (go-libp2p format).
The structure is typically:
SEQUENCE {
publicKey OCTET STRING,
signature OCTET STRING
}
"""
try:
offset = 0
# Parse SEQUENCE tag
if raw_bytes[offset] != 0x30:
raise QUICCertificateError(
f"Expected SEQUENCE tag (0x30), got {raw_bytes[offset]:02x}"
)
offset += 1
# Parse SEQUENCE length
seq_length, length_bytes = LibP2PExtensionHandler._parse_der_length(
raw_bytes[offset:]
)
offset += length_bytes
logger.debug(f"🔍 SEQUENCE length: {seq_length} bytes")
# Parse first OCTET STRING (public key)
if raw_bytes[offset] != 0x04:
raise QUICCertificateError(
f"Expected OCTET STRING tag (0x04), got {raw_bytes[offset]:02x}"
)
offset += 1
pubkey_length, length_bytes = LibP2PExtensionHandler._parse_der_length(
raw_bytes[offset:]
)
offset += length_bytes
logger.debug(f"🔍 Public key length: {pubkey_length} bytes")
if len(raw_bytes) < offset + pubkey_length:
raise QUICCertificateError("Extension too short for public key data")
public_key_bytes = raw_bytes[offset : offset + pubkey_length]
offset += pubkey_length
# Parse second OCTET STRING (signature)
if offset < len(raw_bytes) and raw_bytes[offset] == 0x04:
offset += 1
sig_length, length_bytes = LibP2PExtensionHandler._parse_der_length(
raw_bytes[offset:]
)
offset += length_bytes
logger.debug(f"🔍 Signature length: {sig_length} bytes")
if len(raw_bytes) < offset + sig_length:
raise QUICCertificateError("Extension too short for signature data")
signature_data = raw_bytes[offset : offset + sig_length]
else:
# Signature might be the remaining bytes
signature_data = raw_bytes[offset:]
logger.debug(f"🔍 Public key data length: {len(public_key_bytes)} bytes")
logger.debug(f"🔍 Signature data length: {len(signature_data)} bytes")
# Deserialize the public key
public_key = LibP2PKeyConverter.deserialize_public_key(public_key_bytes)
logger.debug(f"🔍 Successfully deserialized public key: {type(public_key)}")
# Extract signature based on key type
signature = LibP2PExtensionHandler._extract_signature_by_key_type(
public_key, signature_data
)
return public_key, signature
except Exception as e:
raise QUICCertificateError(
f"Failed to parse ASN.1 DER extension: {e}"
) from e
@staticmethod
def _parse_der_length(data: bytes) -> tuple[int, int]:
"""
Parse DER length encoding.
Returns (length_value, bytes_consumed).
"""
if not data:
raise QUICCertificateError("No data for DER length")
first_byte = data[0]
# Short form (length < 128)
if first_byte < 0x80:
return first_byte, 1
# Long form
num_bytes = first_byte & 0x7F
if len(data) < 1 + num_bytes:
raise QUICCertificateError("Insufficient data for DER long form length")
length = 0
for i in range(1, num_bytes + 1):
length = (length << 8) | data[i]
return length, 1 + num_bytes
@staticmethod
def _parse_simple_binary_extension(raw_bytes: bytes) -> tuple[PublicKey, bytes]:
"""
Parse simple binary format extension (original py-libp2p format).
Format: [4-byte pubkey length][pubkey][4-byte sig length][signature]
"""
offset = 0
# Parse public key length and data
if len(raw_bytes) < 4:
raise QUICCertificateError("Extension too short for public key length")
public_key_length = int.from_bytes(
raw_bytes[offset : offset + 4], byteorder="big"
)
logger.debug(f"🔍 Public key length: {public_key_length} bytes")
offset += 4
if len(raw_bytes) < offset + public_key_length:
raise QUICCertificateError("Extension too short for public key data")
public_key_bytes = raw_bytes[offset : offset + public_key_length]
offset += public_key_length
# Parse signature length and data
if len(raw_bytes) < offset + 4:
raise QUICCertificateError("Extension too short for signature length")
signature_length = int.from_bytes(
raw_bytes[offset : offset + 4], byteorder="big"
)
logger.debug(f"🔍 Signature length: {signature_length} bytes")
offset += 4
if len(raw_bytes) < offset + signature_length:
raise QUICCertificateError("Extension too short for signature data")
signature_data = raw_bytes[offset : offset + signature_length]
# Deserialize the public key
public_key = LibP2PKeyConverter.deserialize_public_key(public_key_bytes)
logger.debug(f"🔍 Successfully deserialized public key: {type(public_key)}")
# Extract signature based on key type
signature = LibP2PExtensionHandler._extract_signature_by_key_type(
public_key, signature_data
)
return public_key, signature
@staticmethod
def _extract_signature_by_key_type(
public_key: PublicKey, signature_data: bytes
@ -582,7 +741,7 @@ class CertificateGenerator:
)
logger.info(f"Generated libp2p TLS certificate for peer {peer_id}")
print(f"Certificate valid from {not_before} to {not_after}")
logger.debug(f"Certificate valid from {not_before} to {not_after}")
return TLSConfig(
certificate=certificate, private_key=cert_private_key, peer_id=peer_id
@ -630,11 +789,11 @@ class PeerAuthenticator:
raise QUICPeerVerificationError("Certificate missing libp2p extension")
assert libp2p_extension.value is not None
print(f"Extension type: {type(libp2p_extension)}")
print(f"Extension value type: {type(libp2p_extension.value)}")
logger.debug(f"Extension type: {type(libp2p_extension)}")
logger.debug(f"Extension value type: {type(libp2p_extension.value)}")
if hasattr(libp2p_extension.value, "__len__"):
print(f"Extension value length: {len(libp2p_extension.value)}")
print(f"Extension value: {libp2p_extension.value}")
logger.debug(f"Extension value length: {len(libp2p_extension.value)}")
logger.debug(f"Extension value: {libp2p_extension.value}")
# Parse the extension to get public key and signature
public_key, signature = self.extension_handler.parse_signed_key_extension(
libp2p_extension
@ -661,14 +820,16 @@ class PeerAuthenticator:
# Verify against expected peer ID if provided
if expected_peer_id and derived_peer_id != expected_peer_id:
print(f"Expected Peer id: {expected_peer_id}")
print(f"Derived Peer ID: {derived_peer_id}")
logger.debug(f"Expected Peer id: {expected_peer_id}")
logger.debug(f"Derived Peer ID: {derived_peer_id}")
raise QUICPeerVerificationError(
f"Peer ID mismatch: expected {expected_peer_id}, "
f"got {derived_peer_id}"
)
logger.info(f"Successfully verified peer certificate for {derived_peer_id}")
logger.debug(
f"Successfully verified peer certificate for {derived_peer_id}"
)
return derived_peer_id
except QUICPeerVerificationError:
@ -822,21 +983,23 @@ class QUICTLSSecurityConfig:
return {"error": str(e)}
def debug_config(self) -> None:
"""Print debugging information about this configuration."""
print(f"=== TLS Security Config Debug ({self.config_name or 'unnamed'}) ===")
print(f"Is client config: {self.is_client_config}")
print(f"ALPN protocols: {self.alpn_protocols}")
print(f"Verify mode: {self.verify_mode}")
print(f"Check hostname: {self.check_hostname}")
print(f"Certificate chain length: {len(self.certificate_chain)}")
"""logger.debug debugging information about this configuration."""
logger.debug(
f"=== TLS Security Config Debug ({self.config_name or 'unnamed'}) ==="
)
logger.debug(f"Is client config: {self.is_client_config}")
logger.debug(f"ALPN protocols: {self.alpn_protocols}")
logger.debug(f"Verify mode: {self.verify_mode}")
logger.debug(f"Check hostname: {self.check_hostname}")
logger.debug(f"Certificate chain length: {len(self.certificate_chain)}")
cert_info: dict[Any, Any] = self.get_certificate_info()
for key, value in cert_info.items():
print(f"Certificate {key}: {value}")
logger.debug(f"Certificate {key}: {value}")
print(f"Private key type: {type(self.private_key).__name__}")
logger.debug(f"Private key type: {type(self.private_key).__name__}")
if hasattr(self.private_key, "key_size"):
print(f"Private key size: {self.private_key.key_size}")
logger.debug(f"Private key size: {self.private_key.key_size}")
def create_server_tls_config(

View File

@ -255,6 +255,12 @@ class QUICTransport(ITransport):
try:
# Extract connection details from multiaddr
host, port = quic_multiaddr_to_endpoint(maddr)
remote_peer_id = maddr.get_peer_id()
if remote_peer_id is not None:
remote_peer_id = ID.from_base58(remote_peer_id)
if remote_peer_id is None:
raise QUICDialError("Unable to derive peer id from multiaddr")
quic_version = multiaddr_to_quic_version(maddr)
# Get appropriate QUIC client configuration
@ -288,7 +294,7 @@ class QUICTransport(ITransport):
connection = QUICConnection(
quic_connection=native_quic_connection,
remote_addr=(host, port),
remote_peer_id=None,
remote_peer_id=remote_peer_id,
local_peer_id=self._peer_id,
is_initiator=True,
maddr=maddr,