diff --git a/examples/echo/test_quic.py b/examples/echo/test_quic.py index 446b8e57..29d62cab 100644 --- a/examples/echo/test_quic.py +++ b/examples/echo/test_quic.py @@ -11,6 +11,7 @@ import sys import trio from libp2p.crypto.ed25519 import create_new_key_pair +from libp2p.transport.quic.security import LIBP2P_TLS_EXTENSION_OID from libp2p.transport.quic.transport import QUICTransport, QUICTransportConfig from libp2p.transport.quic.utils import create_quic_multiaddr @@ -59,11 +60,10 @@ async def test_certificate_generation(): # Check for libp2p extension has_libp2p_ext = False for ext in cert.extensions: - if str(ext.oid) == "1.3.6.1.4.1.53594.1.1": + if ext.oid == LIBP2P_TLS_EXTENSION_OID: has_libp2p_ext = True print(f"✅ Found libp2p extension: {ext.oid}") print(f"Extension critical: {ext.critical}") - print(f"Extension value length: {len(ext.value)} bytes") break if not has_libp2p_ext: @@ -209,7 +209,7 @@ async def test_server_startup(): # Check for libp2p extension has_libp2p_ext = False for ext in cert.extensions: - if str(ext.oid) == "1.3.6.1.4.1.53594.1.1": + if ext.oid == LIBP2P_TLS_EXTENSION_OID: has_libp2p_ext = True break print(f"Has libp2p extension: {has_libp2p_ext}") diff --git a/libp2p/transport/quic/listener.py b/libp2p/transport/quic/listener.py index b14efd5e..411697ec 100644 --- a/libp2p/transport/quic/listener.py +++ b/libp2p/transport/quic/listener.py @@ -17,7 +17,10 @@ import trio from libp2p.abc import IListener from libp2p.custom_types import THandler, TProtocol -from libp2p.transport.quic.security import QUICTLSConfigManager +from libp2p.transport.quic.security import ( + LIBP2P_TLS_EXTENSION_OID, + QUICTLSConfigManager, +) from .config import QUICTransportConfig from .connection import QUICConnection @@ -442,7 +445,7 @@ class QUICListener(IListener): cert = server_config.certificate has_libp2p_ext = False for ext in cert.extensions: - if str(ext.oid) == "1.3.6.1.4.1.53594.1.1": + if ext.oid == LIBP2P_TLS_EXTENSION_OID: has_libp2p_ext = True break print(f"🔧 NEW_CONN: Certificate has libp2p extension: {has_libp2p_ext}") @@ -557,10 +560,10 @@ class QUICListener(IListener): cert = config.certificate print(f"🔧 QUIC_STATE: Certificate subject: {cert.subject}") print( - f"🔧 QUIC_STATE: Certificate valid from: {cert.not_valid_before}" + f"🔧 QUIC_STATE: Certificate valid from: {cert.not_valid_before_utc}" ) print( - f"🔧 QUIC_STATE: Certificate valid until: {cert.not_valid_after}" + f"🔧 QUIC_STATE: Certificate valid until: {cert.not_valid_after_utc}" ) # Check for connection errors diff --git a/libp2p/transport/quic/security.py b/libp2p/transport/quic/security.py index 28abc626..d805753e 100644 --- a/libp2p/transport/quic/security.py +++ b/libp2p/transport/quic/security.py @@ -5,7 +5,6 @@ Based on go-libp2p and js-libp2p security patterns. """ from dataclasses import dataclass, field -from datetime import datetime, timedelta import logging import ssl from typing import List, Optional, Union @@ -280,15 +279,15 @@ class CertificateGenerator: libp2p_private_key, cert_public_key_bytes ) - # Set validity period using datetime objects (FIXED) - now = datetime.utcnow() # Use datetime instead of time.time() - not_before = now - timedelta(seconds=CERTIFICATE_NOT_BEFORE_BUFFER) + from datetime import datetime, timedelta, timezone + + now = datetime.now(timezone.utc) + not_before = now - timedelta(minutes=1) not_after = now + timedelta(days=validity_days) # Generate serial number - serial_number = int(now.timestamp()) # Convert datetime to timestamp + serial_number = int(now.timestamp()) - # Build certificate with proper datetime objects certificate = ( x509.CertificateBuilder() .subject_name( @@ -537,9 +536,8 @@ class QUICTLSSecurityConfig: """ try: - libp2p_oid = "1.3.6.1.4.1.53594.1.1" for ext in self.certificate.extensions: - if str(ext.oid) == libp2p_oid: + if ext.oid == LIBP2P_TLS_EXTENSION_OID: return True return False except Exception: @@ -554,14 +552,13 @@ class QUICTLSSecurityConfig: """ try: - from datetime import datetime + from datetime import datetime, timezone - now = datetime.utcnow() - return ( - self.certificate.not_valid_before - <= now - <= self.certificate.not_valid_after - ) + now = datetime.now(timezone.utc) + not_before = self.certificate.not_valid_before_utc + not_after = self.certificate.not_valid_after_utc + + return not_before <= now <= not_after except Exception: return False @@ -578,8 +575,8 @@ class QUICTLSSecurityConfig: "subject": str(self.certificate.subject), "issuer": str(self.certificate.issuer), "serial_number": self.certificate.serial_number, - "not_valid_before": self.certificate.not_valid_before, - "not_valid_after": self.certificate.not_valid_after, + "not_valid_before_utc": self.certificate.not_valid_before_utc, + "not_valid_after_utc": self.certificate.not_valid_after_utc, "has_libp2p_extension": self.has_libp2p_extension(), "is_valid": self.is_certificate_valid(), "certificate_key_match": self.validate_certificate_key_match(), @@ -630,7 +627,7 @@ def create_server_tls_config( peer_id=peer_id, is_client_config=False, config_name="server", - verify_mode=False, # Server doesn't verify client certs in libp2p + verify_mode=ssl.CERT_REQUIRED, # Server doesn't verify client certs in libp2p check_hostname=False, **kwargs, ) @@ -661,7 +658,7 @@ def create_client_tls_config( peer_id=peer_id, is_client_config=True, config_name="client", - verify_mode=False, # Client doesn't verify server certs in libp2p + verify_mode=ssl.CERT_NONE, # Client doesn't verify server certs in libp2p check_hostname=False, **kwargs, ) diff --git a/libp2p/transport/quic/transport.py b/libp2p/transport/quic/transport.py index 8aed36f0..1a884040 100644 --- a/libp2p/transport/quic/transport.py +++ b/libp2p/transport/quic/transport.py @@ -7,6 +7,7 @@ Updated to include Module 5 security integration. import copy import logging +import ssl import sys from aioquic.quic.configuration import ( @@ -202,48 +203,20 @@ class QUICTransport(ITransport): """ try: - - # The security manager should return cryptography objects - # not DER bytes, but if it returns DER bytes, we need to handle that - certificate = tls_config.certificate - private_key = tls_config.private_key - - # Check if we received DER bytes and need - # to convert to cryptography objects - if isinstance(certificate, bytes): - from cryptography import x509 - - certificate = x509.load_der_x509_certificate(certificate) - - if isinstance(private_key, bytes): - from cryptography.hazmat.primitives import serialization - - private_key = serialization.load_der_private_key( # type: ignore - private_key, password=None - ) - - # Set directly on the configuration object - config.certificate = certificate - config.private_key = private_key - - # Handle certificate chain if provided - certificate_chain = tls_config.certificate_chain - # Convert DER bytes to cryptography objects if needed - chain_objects = [] - for cert in certificate_chain: - if isinstance(cert, bytes): - from cryptography import x509 - - cert = x509.load_der_x509_certificate(cert) - chain_objects.append(cert) - config.certificate_chain = chain_objects - - # Set ALPN protocols + # Access attributes directly from QUICTLSSecurityConfig + config.certificate = tls_config.certificate + config.private_key = tls_config.private_key + config.certificate_chain = tls_config.certificate_chain config.alpn_protocols = tls_config.alpn_protocols - # Set certificate verification mode + # Set verification mode (though libp2p typically doesn't verify) config.verify_mode = tls_config.verify_mode + if tls_config.is_client_config: + config.verify_mode = ssl.CERT_NONE + else: + config.verify_mode = ssl.CERT_REQUIRED + logger.debug("Successfully applied TLS configuration to QUIC config") except Exception as e: diff --git a/libp2p/transport/quic/utils.py b/libp2p/transport/quic/utils.py index 03708778..22cbf4c4 100644 --- a/libp2p/transport/quic/utils.py +++ b/libp2p/transport/quic/utils.py @@ -6,6 +6,7 @@ Based on go-libp2p and js-libp2p QUIC implementations. import ipaddress import logging +import ssl from aioquic.quic.configuration import QuicConfiguration import multiaddr @@ -302,6 +303,7 @@ def create_server_config_from_base( try: # Create new server configuration from scratch server_config = QuicConfiguration(is_client=False) + server_config.verify_mode = ssl.CERT_REQUIRED # Copy basic configuration attributes (these are safe to copy) copyable_attrs = [ @@ -343,18 +345,14 @@ def create_server_config_from_base( server_tls_config = security_manager.create_server_config() # Override with security manager's TLS configuration - if "certificate" in server_tls_config: - server_config.certificate = server_tls_config["certificate"] - if "private_key" in server_tls_config: - server_config.private_key = server_tls_config["private_key"] - if "certificate_chain" in server_tls_config: - # type: ignore - server_config.certificate_chain = server_tls_config[ # type: ignore - "certificate_chain" # type: ignore - ] - if "alpn_protocols" in server_tls_config: - # type: ignore - server_config.alpn_protocols = server_tls_config["alpn_protocols"] # type: ignore + if server_tls_config.certificate: + server_config.certificate = server_tls_config.certificate + if server_tls_config.private_key: + server_config.private_key = server_tls_config.private_key + if server_tls_config.certificate_chain: + server_config.certificate_chain = server_tls_config.certificate_chain + if server_tls_config.alpn_protocols: + server_config.alpn_protocols = server_tls_config.alpn_protocols except Exception as e: logger.warning(f"Failed to apply security manager config: {e}")