diff --git a/libp2p/transport/quic/security.py b/libp2p/transport/quic/security.py index 3d123c7d..d09aeda3 100644 --- a/libp2p/transport/quic/security.py +++ b/libp2p/transport/quic/security.py @@ -28,6 +28,7 @@ from .exceptions import ( ) logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) # libp2p TLS Extension OID - Official libp2p specification LIBP2P_TLS_EXTENSION_OID = x509.ObjectIdentifier("1.3.6.1.4.1.53594.1.1") @@ -133,7 +134,8 @@ class LibP2PExtensionHandler: extension: Extension[Any], ) -> tuple[PublicKey, bytes]: """ - Parse the libp2p Public Key Extension with enhanced debugging. + Parse the libp2p Public Key Extension with support for all crypto types. + Handles Ed25519, Secp256k1, RSA, ECDSA, and ECC_P256 signature formats. """ try: logger.debug(f"🔍 Extension type: {type(extension)}") @@ -141,13 +143,11 @@ class LibP2PExtensionHandler: # Extract the raw bytes from the extension if isinstance(extension.value, UnrecognizedExtension): - # Use the .value property to get the bytes raw_bytes = extension.value.value logger.debug( "🔍 Extension is UnrecognizedExtension, using .value property" ) else: - # Fallback if it's already bytes somehow raw_bytes = extension.value logger.debug("🔍 Extension.value is already bytes") @@ -175,7 +175,6 @@ class LibP2PExtensionHandler: public_key_bytes = raw_bytes[offset : offset + public_key_length] logger.debug(f"🔍 Public key data: {public_key_bytes.hex()}") offset += public_key_length - logger.debug(f"🔍 Offset after public key: {offset}") # Parse signature length and data if len(raw_bytes) < offset + 4: @@ -186,55 +185,29 @@ class LibP2PExtensionHandler: ) logger.debug(f"🔍 Signature length: {signature_length} bytes") offset += 4 - logger.debug(f"🔍 Offset after signature length: {offset}") if len(raw_bytes) < offset + signature_length: raise QUICCertificateError("Extension too short for signature data") - signature = raw_bytes[offset : offset + signature_length] - logger.debug(f"🔍 Extracted signature length: {len(signature)} bytes") - logger.debug(f"🔍 Signature hex (first 20 bytes): {signature[:20].hex()}") + signature_data = raw_bytes[offset : offset + signature_length] + logger.debug(f"🔍 Signature data length: {len(signature_data)} bytes") logger.debug( - f"🔍 Signature starts with DER header: {signature[:2].hex() == '3045'}" + f"🔍 Signature data hex (first 20 bytes): {signature_data[:20].hex()}" ) - # Detailed signature analysis - if len(signature) >= 2: - if signature[0] == 0x30: - der_length = signature[1] - logger.debug( - f"🔍 Expected DER total: {der_length + 2}" - f"🔍 Actual signature length: {len(signature)}" - ) - - if len(signature) != der_length + 2: - logger.debug( - "⚠️ DER length mismatch! " - f"Expected {der_length + 2}, got {len(signature)}" - ) - # Try truncating to correct DER length - if der_length + 2 < len(signature): - logger.debug( - "🔧 Truncating signature to correct DER length: " - f"{der_length + 2}" - ) - signature = signature[: der_length + 2] - - # Check if we have extra data - expected_total = 4 + public_key_length + 4 + signature_length - logger.debug(f"🔍 Expected total length: {expected_total}") - logger.debug(f"🔍 Actual total length: {len(raw_bytes)}") - - if len(raw_bytes) > expected_total: - extra_bytes = len(raw_bytes) - expected_total - logger.debug(f"⚠️ Extra {extra_bytes} bytes detected!") - logger.debug(f"🔍 Extra data: {raw_bytes[expected_total:].hex()}") - - # Deserialize the public key + # Deserialize the public key to determine the crypto type public_key = LibP2PKeyConverter.deserialize_public_key(public_key_bytes) logger.debug(f"🔍 Successfully deserialized public key: {type(public_key)}") + # Extract signature based on key type + signature = LibP2PExtensionHandler._extract_signature_by_key_type( + public_key, signature_data + ) + logger.debug(f"🔍 Final signature to return: {len(signature)} bytes") + logger.debug( + f"🔍 Final signature hex (first 20 bytes): {signature[:20].hex()}" + ) return public_key, signature @@ -247,6 +220,238 @@ class LibP2PExtensionHandler: f"Failed to parse signed key extension: {e}" ) from e + @staticmethod + def _extract_signature_by_key_type( + public_key: PublicKey, signature_data: bytes + ) -> bytes: + """ + Extract the actual signature from signature_data based on the key type. + Different crypto libraries have different signature formats. + """ + if not hasattr(public_key, "get_type"): + logger.debug("⚠️ Public key has no get_type method, using signature as-is") + return signature_data + + key_type = public_key.get_type() + key_type_name = key_type.name if hasattr(key_type, "name") else str(key_type) + logger.debug(f"🔍 Processing signature for key type: {key_type_name}") + + # Handle different key types + if key_type_name == "Ed25519": + return LibP2PExtensionHandler._extract_ed25519_signature(signature_data) + + elif key_type_name == "Secp256k1": + return LibP2PExtensionHandler._extract_secp256k1_signature(signature_data) + + elif key_type_name == "RSA": + return LibP2PExtensionHandler._extract_rsa_signature(signature_data) + + elif key_type_name in ["ECDSA", "ECC_P256"]: + return LibP2PExtensionHandler._extract_ecdsa_signature(signature_data) + + else: + logger.debug( + f"⚠️ Unknown key type {key_type_name}, using generic extraction" + ) + return LibP2PExtensionHandler._extract_generic_signature(signature_data) + + @staticmethod + def _extract_ed25519_signature(signature_data: bytes) -> bytes: + """Extract Ed25519 signature (must be exactly 64 bytes).""" + logger.debug("🔧 Extracting Ed25519 signature") + + if len(signature_data) == 64: + logger.debug("✅ Ed25519 signature is already 64 bytes") + return signature_data + + logger.debug( + f"⚠️ Ed25519 signature is {len(signature_data)} bytes, extracting 64 bytes" + ) + + # Look for the payload marker and extract signature before it + payload_marker = b"libp2p-tls-handshake:" + marker_index = signature_data.find(payload_marker) + + if marker_index >= 64: + # The signature is likely the first 64 bytes before the payload + signature = signature_data[:64] + logger.debug("🔧 Using first 64 bytes as Ed25519 signature") + return signature + + elif marker_index > 0 and marker_index == 64: + # Perfect case: signature is exactly before the marker + signature = signature_data[:marker_index] + logger.debug(f"🔧 Using {len(signature)} bytes before payload marker") + return signature + + else: + # Fallback: try to extract first 64 bytes + if len(signature_data) >= 64: + signature = signature_data[:64] + logger.debug("🔧 Fallback: using first 64 bytes") + return signature + else: + logger.debug( + f"❌ Cannot extract 64 bytes from {len(signature_data)} byte signature" + ) + return signature_data + + @staticmethod + def _extract_secp256k1_signature(signature_data: bytes) -> bytes: + """ + Extract Secp256k1 signature. + Secp256k1 can use either DER-encoded or raw format depending on the implementation. + """ + logger.debug("🔧 Extracting Secp256k1 signature") + + # Look for payload marker to separate signature from payload + payload_marker = b"libp2p-tls-handshake:" + marker_index = signature_data.find(payload_marker) + + if marker_index > 0: + signature = signature_data[:marker_index] + logger.debug(f"🔧 Using {len(signature)} bytes before payload marker") + + # Check if it's DER-encoded (starts with 0x30) + if len(signature) >= 2 and signature[0] == 0x30: + logger.debug("🔍 Secp256k1 signature appears to be DER-encoded") + return LibP2PExtensionHandler._validate_der_signature(signature) + else: + logger.debug("🔍 Secp256k1 signature appears to be raw format") + return signature + else: + # No marker found, check if the whole data is DER-encoded + if len(signature_data) >= 2 and signature_data[0] == 0x30: + logger.debug( + "🔍 Secp256k1 signature appears to be DER-encoded (no marker)" + ) + return LibP2PExtensionHandler._validate_der_signature(signature_data) + else: + logger.debug("🔍 Using Secp256k1 signature data as-is") + return signature_data + + @staticmethod + def _extract_rsa_signature(signature_data: bytes) -> bytes: + """ + Extract RSA signature. + RSA signatures are typically raw bytes with length matching the key size. + """ + logger.debug("🔧 Extracting RSA signature") + + # Look for payload marker to separate signature from payload + payload_marker = b"libp2p-tls-handshake:" + marker_index = signature_data.find(payload_marker) + + if marker_index > 0: + signature = signature_data[:marker_index] + logger.debug( + f"🔧 Using {len(signature)} bytes before payload marker for RSA" + ) + return signature + else: + logger.debug("🔍 Using RSA signature data as-is") + return signature_data + + @staticmethod + def _extract_ecdsa_signature(signature_data: bytes) -> bytes: + """ + Extract ECDSA signature (typically DER-encoded ASN.1). + ECDSA signatures start with 0x30 (ASN.1 SEQUENCE). + """ + logger.debug("🔧 Extracting ECDSA signature") + + # Look for payload marker to separate signature from payload + payload_marker = b"libp2p-tls-handshake:" + marker_index = signature_data.find(payload_marker) + + if marker_index > 0: + signature = signature_data[:marker_index] + logger.debug(f"🔧 Using {len(signature)} bytes before payload marker") + + # Validate DER encoding for ECDSA + if len(signature) >= 2 and signature[0] == 0x30: + return LibP2PExtensionHandler._validate_der_signature(signature) + else: + logger.debug( + "⚠️ ECDSA signature doesn't start with DER header, using as-is" + ) + return signature + else: + # Check if the whole data is DER-encoded + if len(signature_data) >= 2 and signature_data[0] == 0x30: + logger.debug("🔍 ECDSA signature appears to be DER-encoded (no marker)") + return LibP2PExtensionHandler._validate_der_signature(signature_data) + else: + logger.debug("🔍 Using ECDSA signature data as-is") + return signature_data + + @staticmethod + def _extract_generic_signature(signature_data: bytes) -> bytes: + """ + Generic signature extraction for unknown key types. + Tries to detect DER encoding or extract based on payload marker. + """ + logger.debug("🔧 Extracting signature using generic method") + + # Look for payload marker to separate signature from payload + payload_marker = b"libp2p-tls-handshake:" + marker_index = signature_data.find(payload_marker) + + if marker_index > 0: + signature = signature_data[:marker_index] + logger.debug(f"🔧 Using {len(signature)} bytes before payload marker") + + # Check if it's DER-encoded + if len(signature) >= 2 and signature[0] == 0x30: + return LibP2PExtensionHandler._validate_der_signature(signature) + else: + return signature + else: + # Check if the whole data is DER-encoded + if len(signature_data) >= 2 and signature_data[0] == 0x30: + logger.debug( + "🔍 Generic signature appears to be DER-encoded (no marker)" + ) + return LibP2PExtensionHandler._validate_der_signature(signature_data) + else: + logger.debug("🔍 Using signature data as-is") + return signature_data + + @staticmethod + def _validate_der_signature(signature: bytes) -> bytes: + """ + Validate and potentially fix DER-encoded signatures. + DER signatures have the format: 30 [length] ... + """ + if len(signature) < 2: + return signature + + if signature[0] != 0x30: + logger.debug("⚠️ Signature doesn't start with DER SEQUENCE tag") + return signature + + # Get the DER length + der_length = signature[1] + expected_total_length = der_length + 2 + + logger.debug( + f"🔍 DER signature: length byte = {der_length}, " + f"expected total = {expected_total_length}, " + f"actual length = {len(signature)}" + ) + + if len(signature) == expected_total_length: + logger.debug("✅ DER signature length is correct") + return signature + elif len(signature) > expected_total_length: + logger.debug( + f"🔧 Truncating DER signature from {len(signature)} to {expected_total_length} bytes" + ) + return signature[:expected_total_length] + else: + logger.debug(f"⚠️ DER signature is shorter than expected, using as-is") + return signature + class LibP2PKeyConverter: """ @@ -378,7 +583,7 @@ class CertificateGenerator: ) logger.info(f"Generated libp2p TLS certificate for peer {peer_id}") - logger.debug(f"Certificate valid from {not_before} to {not_after}") + print(f"Certificate valid from {not_before} to {not_after}") return TLSConfig( certificate=certificate, private_key=cert_private_key, peer_id=peer_id @@ -426,11 +631,11 @@ class PeerAuthenticator: raise QUICPeerVerificationError("Certificate missing libp2p extension") assert libp2p_extension.value is not None - logger.debug(f"Extension type: {type(libp2p_extension)}") - logger.debug(f"Extension value type: {type(libp2p_extension.value)}") + print(f"Extension type: {type(libp2p_extension)}") + print(f"Extension value type: {type(libp2p_extension.value)}") if hasattr(libp2p_extension.value, "__len__"): - logger.debug(f"Extension value length: {len(libp2p_extension.value)}") - logger.debug(f"Extension value: {libp2p_extension.value}") + print(f"Extension value length: {len(libp2p_extension.value)}") + print(f"Extension value: {libp2p_extension.value}") # Parse the extension to get public key and signature public_key, signature = self.extension_handler.parse_signed_key_extension( libp2p_extension @@ -457,8 +662,8 @@ class PeerAuthenticator: # Verify against expected peer ID if provided if expected_peer_id and derived_peer_id != expected_peer_id: - logger.debug(f"Expected Peer id: {expected_peer_id}") - logger.debug(f"Derived Peer ID: {derived_peer_id}") + print(f"Expected Peer id: {expected_peer_id}") + print(f"Derived Peer ID: {derived_peer_id}") raise QUICPeerVerificationError( f"Peer ID mismatch: expected {expected_peer_id}, " f"got {derived_peer_id}" @@ -618,23 +823,21 @@ class QUICTLSSecurityConfig: return {"error": str(e)} def debug_config(self) -> None: - """logger.debug debugging information about this configuration.""" - logger.debug( - f"=== TLS Security Config Debug ({self.config_name or 'unnamed'}) ===" - ) - logger.debug(f"Is client config: {self.is_client_config}") - logger.debug(f"ALPN protocols: {self.alpn_protocols}") - logger.debug(f"Verify mode: {self.verify_mode}") - logger.debug(f"Check hostname: {self.check_hostname}") - logger.debug(f"Certificate chain length: {len(self.certificate_chain)}") + """print debugging information about this configuration.""" + print(f"=== TLS Security Config Debug ({self.config_name or 'unnamed'}) ===") + print(f"Is client config: {self.is_client_config}") + print(f"ALPN protocols: {self.alpn_protocols}") + print(f"Verify mode: {self.verify_mode}") + print(f"Check hostname: {self.check_hostname}") + print(f"Certificate chain length: {len(self.certificate_chain)}") cert_info: dict[Any, Any] = self.get_certificate_info() for key, value in cert_info.items(): - logger.debug(f"Certificate {key}: {value}") + print(f"Certificate {key}: {value}") - logger.debug(f"Private key type: {type(self.private_key).__name__}") + print(f"Private key type: {type(self.private_key).__name__}") if hasattr(self.private_key, "key_size"): - logger.debug(f"Private key size: {self.private_key.key_size}") + print(f"Private key size: {self.private_key.key_size}") def create_server_tls_config( @@ -731,7 +934,7 @@ class QUICTLSConfigManager: peer_id=self.peer_id, ) - logger.debug("🔧 SECURITY: Created server config") + print("🔧 SECURITY: Created server config") return config def create_client_config(self) -> QUICTLSSecurityConfig: @@ -748,7 +951,7 @@ class QUICTLSConfigManager: peer_id=self.peer_id, ) - logger.debug("🔧 SECURITY: Created client config") + print("🔧 SECURITY: Created client config") return config def verify_peer_identity( @@ -817,4 +1020,4 @@ def cleanup_tls_config(config: TLSConfig) -> None: temporary files, but kept for compatibility. """ # New implementation doesn't use temporary files - logger.debug("TLS config cleanup completed") + print("TLS config cleanup completed")