diff --git a/examples/chat/chat.py b/examples/chat/chat.py index 87e7a44a..05a9b918 100755 --- a/examples/chat/chat.py +++ b/examples/chat/chat.py @@ -43,6 +43,9 @@ async def run(port: int, destination: str) -> None: listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") host = new_host() async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: + # Start the peer-store cleanup task + nursery.start_soon(host.get_peerstore().start_cleanup_task, 60) + if not destination: # its the server async def stream_handler(stream: INetStream) -> None: diff --git a/examples/echo/echo.py b/examples/echo/echo.py index 9f1722b2..126a7da2 100644 --- a/examples/echo/echo.py +++ b/examples/echo/echo.py @@ -45,7 +45,10 @@ async def run(port: int, destination: str, seed: int | None = None) -> None: secret = secrets.token_bytes(32) host = new_host(key_pair=create_new_key_pair(secret)) - async with host.run(listen_addrs=[listen_addr]): + async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: + # Start the peer-store cleanup task + nursery.start_soon(host.get_peerstore().start_cleanup_task, 60) + print(f"I am {host.get_id().to_string()}") if not destination: # its the server diff --git a/examples/identify/identify.py b/examples/identify/identify.py index 38fe9574..98980f99 100644 --- a/examples/identify/identify.py +++ b/examples/identify/identify.py @@ -14,6 +14,8 @@ from libp2p.identity.identify.identify import ( identify_handler_for, parse_identify_response, ) +from libp2p.identity.identify.pb.identify_pb2 import Identify +from libp2p.peer.envelope import debug_dump_envelope, unmarshal_envelope from libp2p.peer.peerinfo import ( info_from_p2p_addr, ) @@ -32,10 +34,11 @@ def decode_multiaddrs(raw_addrs): return decoded_addrs -def print_identify_response(identify_response): +def print_identify_response(identify_response: Identify): """Pretty-print Identify response.""" public_key_b64 = 
base64.b64encode(identify_response.public_key).decode("utf-8") listen_addrs = decode_multiaddrs(identify_response.listen_addrs) + signed_peer_record = unmarshal_envelope(identify_response.signedPeerRecord) try: observed_addr_decoded = decode_multiaddrs([identify_response.observed_addr]) except Exception: @@ -51,6 +54,8 @@ def print_identify_response(identify_response): f" Agent Version: {identify_response.agent_version}" ) + debug_dump_envelope(signed_peer_record) + async def run(port: int, destination: str, use_varint_format: bool = True) -> None: localhost_ip = "0.0.0.0" @@ -61,12 +66,19 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No host_a = new_host() # Set up identify handler with specified format + # Set use_varint_format = False to inspect the Signed-PeerRecord identify_handler = identify_handler_for( host_a, use_varint_format=use_varint_format ) host_a.set_stream_handler(IDENTIFY_PROTOCOL_ID, identify_handler) - async with host_a.run(listen_addrs=[listen_addr]): + async with ( + host_a.run(listen_addrs=[listen_addr]), + trio.open_nursery() as nursery, + ): + # Start the peer-store cleanup task + nursery.start_soon(host_a.get_peerstore().start_cleanup_task, 60) + # Get the actual address and replace 0.0.0.0 with 127.0.0.1 for client # connections server_addr = str(host_a.get_addrs()[0]) @@ -125,7 +137,13 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}") host_b = new_host() - async with host_b.run(listen_addrs=[listen_addr]): + async with ( + host_b.run(listen_addrs=[listen_addr]), + trio.open_nursery() as nursery, + ): + # Start the peer-store cleanup task + nursery.start_soon(host_b.get_peerstore().start_cleanup_task, 60) + # Connect to the first host print(f"dialer (host_b) listening on {host_b.get_addrs()[0]}") maddr = multiaddr.Multiaddr(destination) @@ -238,9 +256,9 @@ def main() -> None: args = 
parser.parse_args() - # Determine format: raw format if --raw-format is specified, otherwise - # length-prefixed - use_varint_format = not args.raw_format + # Determine format: use varint (length-prefixed) if --raw-format is specified, + # otherwise use raw protobuf format (old format) + use_varint_format = args.raw_format try: if args.destination: diff --git a/examples/identify_push/identify_push_demo.py b/examples/identify_push/identify_push_demo.py index 5a293f07..ccd8b29d 100644 --- a/examples/identify_push/identify_push_demo.py +++ b/examples/identify_push/identify_push_demo.py @@ -211,7 +211,15 @@ async def main() -> None: listen_addr_1 = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") listen_addr_2 = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") - async with host_1.run([listen_addr_1]), host_2.run([listen_addr_2]): + async with ( + host_1.run([listen_addr_1]), + host_2.run([listen_addr_2]), + trio.open_nursery() as nursery, + ): + # Start the peer-store cleanup task + nursery.start_soon(host_1.get_peerstore().start_cleanup_task, 60) + nursery.start_soon(host_2.get_peerstore().start_cleanup_task, 60) + # Get the addresses of both hosts addr_1 = host_1.get_addrs()[0] addr_2 = host_2.get_addrs()[0] diff --git a/examples/kademlia/kademlia.py b/examples/kademlia/kademlia.py index ada81d87..00c7915a 100644 --- a/examples/kademlia/kademlia.py +++ b/examples/kademlia/kademlia.py @@ -151,7 +151,10 @@ async def run_node( host = new_host(key_pair=key_pair) listen_addr = Multiaddr(f"/ip4/127.0.0.1/tcp/{port}") - async with host.run(listen_addrs=[listen_addr]): + async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: + # Start the peer-store cleanup task + nursery.start_soon(host.get_peerstore().start_cleanup_task, 60) + peer_id = host.get_id().pretty() addr_str = f"/ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}" await connect_to_bootstrap_nodes(host, bootstrap_nodes) diff --git a/examples/mDNS/mDNS.py b/examples/mDNS/mDNS.py index 794e05c8..d3f11b56 100644 
--- a/examples/mDNS/mDNS.py +++ b/examples/mDNS/mDNS.py @@ -46,7 +46,10 @@ async def run(port: int) -> None: logger.info("Starting peer Discovery") host = new_host(key_pair=key_pair, enable_mDNS=True) - async with host.run(listen_addrs=[listen_addr]): + async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: + # Start the peer-store cleanup task + nursery.start_soon(host.get_peerstore().start_cleanup_task, 60) + await trio.sleep_forever() diff --git a/examples/ping/ping.py b/examples/ping/ping.py index 647a607b..d1a5daae 100644 --- a/examples/ping/ping.py +++ b/examples/ping/ping.py @@ -59,6 +59,9 @@ async def run(port: int, destination: str) -> None: host = new_host(listen_addrs=[listen_addr]) async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: + # Start the peer-store cleanup task + nursery.start_soon(host.get_peerstore().start_cleanup_task, 60) + if not destination: host.set_stream_handler(PING_PROTOCOL_ID, handle_ping) diff --git a/examples/pubsub/pubsub.py b/examples/pubsub/pubsub.py index 9dca415f..1ab6d650 100644 --- a/examples/pubsub/pubsub.py +++ b/examples/pubsub/pubsub.py @@ -144,6 +144,9 @@ async def run(topic: str, destination: str | None, port: int | None) -> None: pubsub = Pubsub(host, gossipsub) termination_event = trio.Event() # Event to signal termination async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: + # Start the peer-store cleanup task + nursery.start_soon(host.get_peerstore().start_cleanup_task, 60) + logger.info(f"Node started with peer ID: {host.get_id()}") logger.info(f"Listening on: {listen_addr}") logger.info("Initializing PubSub and GossipSub...") diff --git a/libp2p/abc.py b/libp2p/abc.py index 3adb04aa..d3df0a8c 100644 --- a/libp2p/abc.py +++ b/libp2p/abc.py @@ -16,6 +16,7 @@ from typing import ( TYPE_CHECKING, Any, AsyncContextManager, + Optional, ) from multiaddr import ( @@ -41,20 +42,19 @@ from libp2p.io.abc import ( from libp2p.peer.id import ( 
ID, ) +import libp2p.peer.pb.peer_record_pb2 as pb from libp2p.peer.peerinfo import ( PeerInfo, ) if TYPE_CHECKING: + from libp2p.peer.envelope import Envelope + from libp2p.peer.peer_record import PeerRecord + from libp2p.protocol_muxer.multiselect import Multiselect from libp2p.pubsub.pubsub import ( Pubsub, ) -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from libp2p.protocol_muxer.multiselect import Multiselect - from libp2p.pubsub.pb import ( rpc_pb2, ) @@ -493,6 +493,71 @@ class IAddrBook(ABC): """ +# ------------------ certified-addr-book interface.py --------------------- +class ICertifiedAddrBook(ABC): + """ + Interface for a certified address book. + + Provides methods for managing signed peer records. + """ + + @abstractmethod + def consume_peer_record(self, envelope: "Envelope", ttl: int) -> bool: + """ + Accept and store a signed PeerRecord, unless it's older than + the one already stored. + + This function: + - Extracts the peer ID and sequence number from the envelope + - Rejects the record if it's older (lower seq) + - Updates the stored peer record and replaces associated + addresses if accepted + + + Parameters + ---------- + envelope: + Signed envelope containing a PeerRecord. + ttl: + Time-to-live for the included multiaddrs (in seconds). + + """ + + @abstractmethod + def get_peer_record(self, peer_id: ID) -> Optional["Envelope"]: + """ + Retrieve the most recent signed PeerRecord `Envelope` for a peer, if it exists + and is still relevant. + + First, it runs cleanup via `maybe_delete_peer_record` to purge stale data. + Then it checks whether the peer has valid, unexpired addresses before + returning the associated envelope. + + + Parameters + ---------- + peer_id : ID + The peer to look up. + + """ + + @abstractmethod + def maybe_delete_peer_record(self, peer_id: ID) -> None: + """ + Delete the signed peer record for a peer if it has no known + (non-expired) addresses. 
+ + This is a garbage collection mechanism: if all addresses for a peer have expired + or been cleared, there's no point holding onto its signed `Envelope`. + + Parameters + ---------- + peer_id : ID + The peer whose record we may delete. + + """ + + # -------------------------- keybook interface.py -------------------------- @@ -758,7 +823,9 @@ class IProtoBook(ABC): # -------------------------- peerstore interface.py -------------------------- -class IPeerStore(IPeerMetadata, IAddrBook, IKeyBook, IMetrics, IProtoBook): +class IPeerStore( + IPeerMetadata, IAddrBook, ICertifiedAddrBook, IKeyBook, IMetrics, IProtoBook +): """ Interface for a peer store. @@ -893,7 +960,65 @@ class IPeerStore(IPeerMetadata, IAddrBook, IKeyBook, IMetrics, IProtoBook): """ + # --------CERTIFIED-ADDR-BOOK---------- + + @abstractmethod + def consume_peer_record(self, envelope: "Envelope", ttl: int) -> bool: + """ + Accept and store a signed PeerRecord, unless it's older + than the one already stored. + + This function: + - Extracts the peer ID and sequence number from the envelope + - Rejects the record if it's older (lower seq) + - Updates the stored peer record and replaces associated addresses if accepted + + + Parameters + ---------- + envelope: + Signed envelope containing a PeerRecord. + ttl: + Time-to-live for the included multiaddrs (in seconds). + + """ + + @abstractmethod + def get_peer_record(self, peer_id: ID) -> Optional["Envelope"]: + """ + Retrieve the most recent signed PeerRecord `Envelope` for a peer, if it exists + and is still relevant. + + First, it runs cleanup via `maybe_delete_peer_record` to purge stale data. + Then it checks whether the peer has valid, unexpired addresses before + returning the associated envelope. + + + Parameters + ---------- + peer_id : ID + The peer to look up. + + """ + + @abstractmethod + def maybe_delete_peer_record(self, peer_id: ID) -> None: + """ + Delete the signed peer record for a peer if it has no + known (non-expired) addresses. 
+ + This is a garbage collection mechanism: if all addresses for a peer have expired + or been cleared, there's no point holding onto its signed `Envelope` + + Parameters + ---------- + peer_id : ID + The peer whose record we may delete. + + """ + # --------KEY-BOOK---------- + @abstractmethod def pubkey(self, peer_id: ID) -> PublicKey: """ @@ -1202,6 +1327,10 @@ class IPeerStore(IPeerMetadata, IAddrBook, IKeyBook, IMetrics, IProtoBook): def clear_peerdata(self, peer_id: ID) -> None: """clear_peerdata""" + @abstractmethod + async def start_cleanup_task(self, cleanup_interval: int = 3600) -> None: + """Start periodic cleanup of expired peer records and addresses.""" + # -------------------------- listener interface.py -------------------------- @@ -1689,6 +1818,121 @@ class IHost(ABC): """ +# -------------------------- peer-record interface.py -------------------------- +class IPeerRecord(ABC): + """ + Interface for a libp2p PeerRecord object. + + A PeerRecord contains metadata about a peer such as its ID, public addresses, + and a strictly increasing sequence number for versioning. + + PeerRecords are used in signed routing Envelopes for secure peer data propagation. + """ + + @abstractmethod + def domain(self) -> str: + """ + Return the domain string for this record type. + + Used in envelope validation to distinguish different record types. + """ + + @abstractmethod + def codec(self) -> bytes: + """ + Return a binary codec prefix that identifies the PeerRecord type. + + This is prepended in signed envelopes to allow type-safe decoding. + """ + + @abstractmethod + def to_protobuf(self) -> pb.PeerRecord: + """ + Convert this PeerRecord into its Protobuf representation. + + :raises ValueError: if serialization fails (e.g., invalid peer ID). + :return: A populated protobuf `PeerRecord` message. + """ + + @abstractmethod + def marshal_record(self) -> bytes: + """ + Serialize this PeerRecord into a byte string. 
+ + Used when signing or sealing the record in an envelope. + + :raises ValueError: if protobuf serialization fails. + :return: Byte-encoded PeerRecord. + """ + + @abstractmethod + def equal(self, other: object) -> bool: + """ + Compare this PeerRecord with another for equality. + + Two PeerRecords are considered equal if: + - They have the same `peer_id` + - Their `seq` numbers match + - Their address lists are identical and ordered + + :param other: Object to compare with. + :return: True if equal, False otherwise. + """ + + +# -------------------------- envelope interface.py -------------------------- +class IEnvelope(ABC): + @abstractmethod + def marshal_envelope(self) -> bytes: + """ + Serialize this Envelope into its protobuf wire format. + + Converts all envelope fields into a `pb.Envelope` protobuf message + and returns the serialized bytes. + + :return: Serialized envelope as bytes. + """ + + @abstractmethod + def validate(self, domain: str) -> None: + """ + Verify the envelope's signature within the given domain scope. + + This ensures that the envelope has not been tampered with + and was signed under the correct usage context. + + :param domain: Domain string that contextualizes the signature. + :raises ValueError: If the signature is invalid. + """ + + @abstractmethod + def record(self) -> "PeerRecord": + """ + Lazily decode and return the embedded PeerRecord. + + This method unmarshals the payload bytes into a `PeerRecord` instance, + using the registered codec to identify the type. The decoded result + is cached for future use. + + :return: Decoded PeerRecord object. + :raises Exception: If decoding fails or payload type is unsupported. + """ + + @abstractmethod + def equal(self, other: Any) -> bool: + """ + Compare this Envelope with another for structural equality. 
+ + Two envelopes are considered equal if: + - They have the same public key + - The payload type and payload bytes match + - Their signatures are identical + + :param other: Another object to compare. + :return: True if equal, False otherwise. + """ + + # -------------------------- peerdata interface.py -------------------------- diff --git a/libp2p/identity/identify/identify.py b/libp2p/identity/identify/identify.py index 04f9efed..b2811ff9 100644 --- a/libp2p/identity/identify/identify.py +++ b/libp2p/identity/identify/identify.py @@ -15,6 +15,8 @@ from libp2p.custom_types import ( from libp2p.network.stream.exceptions import ( StreamClosed, ) +from libp2p.peer.envelope import seal_record +from libp2p.peer.peer_record import PeerRecord from libp2p.utils import ( decode_varint_with_size, get_agent_version, @@ -63,6 +65,11 @@ def _mk_identify_protobuf( laddrs = host.get_addrs() protocols = tuple(str(p) for p in host.get_mux().get_protocols() if p is not None) + # Create a signed peer-record for the remote peer + record = PeerRecord(host.get_id(), host.get_addrs()) + envelope = seal_record(record, host.get_private_key()) + protobuf = envelope.marshal_envelope() + observed_addr = observed_multiaddr.to_bytes() if observed_multiaddr else b"" return Identify( protocol_version=PROTOCOL_VERSION, @@ -71,6 +78,7 @@ def _mk_identify_protobuf( listen_addrs=map(_multiaddr_to_bytes, laddrs), observed_addr=observed_addr, protocols=protocols, + signedPeerRecord=protobuf, ) diff --git a/libp2p/identity/identify/pb/identify.proto b/libp2p/identity/identify/pb/identify.proto index cc4392a0..4b62c04c 100644 --- a/libp2p/identity/identify/pb/identify.proto +++ b/libp2p/identity/identify/pb/identify.proto @@ -9,4 +9,5 @@ message Identify { repeated bytes listen_addrs = 2; optional bytes observed_addr = 4; repeated string protocols = 3; + optional bytes signedPeerRecord = 8; } diff --git a/libp2p/identity/identify/pb/identify_pb2.py b/libp2p/identity/identify/pb/identify_pb2.py index 
4c89157e..2db3c552 100644 --- a/libp2p/identity/identify/pb/identify_pb2.py +++ b/libp2p/identity/identify/pb/identify_pb2.py @@ -1,11 +1,12 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: libp2p/identity/identify/pb/identify.proto +# Protobuf Python Version: 4.25.3 """Generated protocol buffer code.""" -from google.protobuf.internal import builder as _builder from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -13,13 +14,13 @@ _sym_db = _symbol_database.Default() -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n*libp2p/identity/identify/pb/identify.proto\x12\x0bidentify.pb\"\x8f\x01\n\x08Identify\x12\x18\n\x10protocol_version\x18\x05 \x01(\t\x12\x15\n\ragent_version\x18\x06 \x01(\t\x12\x12\n\npublic_key\x18\x01 \x01(\x0c\x12\x14\n\x0clisten_addrs\x18\x02 \x03(\x0c\x12\x15\n\robserved_addr\x18\x04 \x01(\x0c\x12\x11\n\tprotocols\x18\x03 \x03(\t') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n*libp2p/identity/identify/pb/identify.proto\x12\x0bidentify.pb\"\xa9\x01\n\x08Identify\x12\x18\n\x10protocol_version\x18\x05 \x01(\t\x12\x15\n\ragent_version\x18\x06 \x01(\t\x12\x12\n\npublic_key\x18\x01 \x01(\x0c\x12\x14\n\x0clisten_addrs\x18\x02 \x03(\x0c\x12\x15\n\robserved_addr\x18\x04 \x01(\x0c\x12\x11\n\tprotocols\x18\x03 \x03(\t\x12\x18\n\x10signedPeerRecord\x18\x08 \x01(\x0c') -_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.identity.identify.pb.identify_pb2', globals()) +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.identity.identify.pb.identify_pb2', _globals) 
if _descriptor._USE_C_DESCRIPTORS == False: - DESCRIPTOR._options = None - _IDENTIFY._serialized_start=60 - _IDENTIFY._serialized_end=203 + _globals['_IDENTIFY']._serialized_start=60 + _globals['_IDENTIFY']._serialized_end=229 # @@protoc_insertion_point(module_scope) diff --git a/libp2p/identity/identify/pb/identify_pb2.pyi b/libp2p/identity/identify/pb/identify_pb2.pyi index 83a72380..428dcf35 100644 --- a/libp2p/identity/identify/pb/identify_pb2.pyi +++ b/libp2p/identity/identify/pb/identify_pb2.pyi @@ -1,46 +1,24 @@ -""" -@generated by mypy-protobuf. Do not edit manually! -isort:skip_file -""" +from google.protobuf.internal import containers as _containers +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from typing import ClassVar as _ClassVar, Iterable as _Iterable, Optional as _Optional -import builtins -import collections.abc -import google.protobuf.descriptor -import google.protobuf.internal.containers -import google.protobuf.message -import typing +DESCRIPTOR: _descriptor.FileDescriptor -DESCRIPTOR: google.protobuf.descriptor.FileDescriptor - -@typing.final -class Identify(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - PROTOCOL_VERSION_FIELD_NUMBER: builtins.int - AGENT_VERSION_FIELD_NUMBER: builtins.int - PUBLIC_KEY_FIELD_NUMBER: builtins.int - LISTEN_ADDRS_FIELD_NUMBER: builtins.int - OBSERVED_ADDR_FIELD_NUMBER: builtins.int - PROTOCOLS_FIELD_NUMBER: builtins.int - protocol_version: builtins.str - agent_version: builtins.str - public_key: builtins.bytes - observed_addr: builtins.bytes - @property - def listen_addrs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.bytes]: ... - @property - def protocols(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ... 
- def __init__( - self, - *, - protocol_version: builtins.str | None = ..., - agent_version: builtins.str | None = ..., - public_key: builtins.bytes | None = ..., - listen_addrs: collections.abc.Iterable[builtins.bytes] | None = ..., - observed_addr: builtins.bytes | None = ..., - protocols: collections.abc.Iterable[builtins.str] | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["agent_version", b"agent_version", "observed_addr", b"observed_addr", "protocol_version", b"protocol_version", "public_key", b"public_key"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["agent_version", b"agent_version", "listen_addrs", b"listen_addrs", "observed_addr", b"observed_addr", "protocol_version", b"protocol_version", "protocols", b"protocols", "public_key", b"public_key"]) -> None: ... - -global___Identify = Identify +class Identify(_message.Message): + __slots__ = ("protocol_version", "agent_version", "public_key", "listen_addrs", "observed_addr", "protocols", "signedPeerRecord") + PROTOCOL_VERSION_FIELD_NUMBER: _ClassVar[int] + AGENT_VERSION_FIELD_NUMBER: _ClassVar[int] + PUBLIC_KEY_FIELD_NUMBER: _ClassVar[int] + LISTEN_ADDRS_FIELD_NUMBER: _ClassVar[int] + OBSERVED_ADDR_FIELD_NUMBER: _ClassVar[int] + PROTOCOLS_FIELD_NUMBER: _ClassVar[int] + SIGNEDPEERRECORD_FIELD_NUMBER: _ClassVar[int] + protocol_version: str + agent_version: str + public_key: bytes + listen_addrs: _containers.RepeatedScalarFieldContainer[bytes] + observed_addr: bytes + protocols: _containers.RepeatedScalarFieldContainer[str] + signedPeerRecord: bytes + def __init__(self, protocol_version: _Optional[str] = ..., agent_version: _Optional[str] = ..., public_key: _Optional[bytes] = ..., listen_addrs: _Optional[_Iterable[bytes]] = ..., observed_addr: _Optional[bytes] = ..., protocols: _Optional[_Iterable[str]] = ..., signedPeerRecord: _Optional[bytes] = ...) -> None: ... 
diff --git a/libp2p/identity/identify_push/identify_push.py b/libp2p/identity/identify_push/identify_push.py index fec62089..5b23851b 100644 --- a/libp2p/identity/identify_push/identify_push.py +++ b/libp2p/identity/identify_push/identify_push.py @@ -20,6 +20,7 @@ from libp2p.custom_types import ( from libp2p.network.stream.exceptions import ( StreamClosed, ) +from libp2p.peer.envelope import consume_envelope from libp2p.peer.id import ( ID, ) @@ -140,6 +141,19 @@ async def _update_peerstore_from_identify( except Exception as e: logger.error("Error updating protocols for peer %s: %s", peer_id, e) + if identify_msg.HasField("signedPeerRecord"): + try: + # Convert the signed-peer-record(Envelope) from protobuf bytes + envelope, _ = consume_envelope( + identify_msg.signedPeerRecord, "libp2p-peer-record" + ) + # Use a default TTL of 2 hours (7200 seconds) + if not peerstore.consume_peer_record(envelope, 7200): + logger.error("Updating Certified-Addr-Book was unsuccessful") + except Exception as e: + logger.error( + "Error updating the certified addr book for peer %s: %s", peer_id, e + ) # Update observed address if present if identify_msg.HasField("observed_addr") and identify_msg.observed_addr: try: diff --git a/libp2p/peer/envelope.py b/libp2p/peer/envelope.py new file mode 100644 index 00000000..e93a8280 --- /dev/null +++ b/libp2p/peer/envelope.py @@ -0,0 +1,271 @@ +from typing import Any, cast + +from libp2p.crypto.ed25519 import Ed25519PublicKey +from libp2p.crypto.keys import PrivateKey, PublicKey +from libp2p.crypto.rsa import RSAPublicKey +from libp2p.crypto.secp256k1 import Secp256k1PublicKey +import libp2p.peer.pb.crypto_pb2 as cryto_pb +import libp2p.peer.pb.envelope_pb2 as pb +import libp2p.peer.pb.peer_record_pb2 as record_pb +from libp2p.peer.peer_record import ( + PeerRecord, + peer_record_from_protobuf, + unmarshal_record, +) +from libp2p.utils.varint import encode_uvarint + +ENVELOPE_DOMAIN = "libp2p-peer-record" +PEER_RECORD_CODEC = b"\x03\x01" + + 
+class Envelope: + """ + A signed wrapper around a serialized libp2p record. + + Envelopes are cryptographically signed by the author's private key + and are scoped to a specific 'domain' to prevent cross-protocol replay. + + Attributes: + public_key: The public key that can verify the envelope's signature. + payload_type: A multicodec code identifying the type of payload inside. + raw_payload: The raw serialized record data. + signature: Signature over the domain-scoped payload content. + + """ + + public_key: PublicKey + payload_type: bytes + raw_payload: bytes + signature: bytes + + _cached_record: PeerRecord | None = None + _unmarshal_error: Exception | None = None + + def __init__( + self, + public_key: PublicKey, + payload_type: bytes, + raw_payload: bytes, + signature: bytes, + ): + self.public_key = public_key + self.payload_type = payload_type + self.raw_payload = raw_payload + self.signature = signature + + def marshal_envelope(self) -> bytes: + """ + Serialize this Envelope into its protobuf wire format. + + Converts all envelope fields into a `pb.Envelope` protobuf message + and returns the serialized bytes. + + :return: Serialized envelope as bytes. + """ + pb_env = pb.Envelope( + public_key=pub_key_to_protobuf(self.public_key), + payload_type=self.payload_type, + payload=self.raw_payload, + signature=self.signature, + ) + return pb_env.SerializeToString() + + def validate(self, domain: str) -> None: + """ + Verify the envelope's signature within the given domain scope. + + This ensures that the envelope has not been tampered with + and was signed under the correct usage context. + + :param domain: Domain string that contextualizes the signature. + :raises ValueError: If the signature is invalid. 
+ """ + unsigned = make_unsigned(domain, self.payload_type, self.raw_payload) + if not self.public_key.verify(unsigned, self.signature): + raise ValueError("Invalid envelope signature") + + def record(self) -> PeerRecord: + """ + Lazily decode and return the embedded PeerRecord. + + This method unmarshals the payload bytes into a `PeerRecord` instance, + using the registered codec to identify the type. The decoded result + is cached for future use. + + :return: Decoded PeerRecord object. + :raises Exception: If decoding fails or payload type is unsupported. + """ + if self._cached_record is not None: + return self._cached_record + + try: + if self.payload_type != PEER_RECORD_CODEC: + raise ValueError("Unsuported payload type in envelope") + msg = record_pb.PeerRecord() + msg.ParseFromString(self.raw_payload) + + self._cached_record = peer_record_from_protobuf(msg) + return self._cached_record + except Exception as e: + self._unmarshal_error = e + raise + + def equal(self, other: Any) -> bool: + """ + Compare this Envelope with another for structural equality. + + Two envelopes are considered equal if: + - They have the same public key + - The payload type and payload bytes match + - Their signatures are identical + + :param other: Another object to compare. + :return: True if equal, False otherwise. + """ + if isinstance(other, Envelope): + return ( + self.public_key.__eq__(other.public_key) + and self.payload_type == other.payload_type + and self.signature == other.signature + and self.raw_payload == other.raw_payload + ) + return False + + +def pub_key_to_protobuf(pub_key: PublicKey) -> cryto_pb.PublicKey: + """ + Convert a Python PublicKey object to its protobuf equivalent. + + :param pub_key: A libp2p-compatible PublicKey instance. + :return: Serialized protobuf PublicKey message. 
+ """ + internal_key_type = pub_key.get_type() + key_type = cast(cryto_pb.KeyType, internal_key_type.value) + data = pub_key.to_bytes() + protobuf_key = cryto_pb.PublicKey(Type=key_type, Data=data) + return protobuf_key + + +def pub_key_from_protobuf(pb_key: cryto_pb.PublicKey) -> PublicKey: + """ + Parse a protobuf PublicKey message into a native libp2p PublicKey. + + Supports Ed25519, RSA, and Secp256k1 key types. + + :param pb_key: Protobuf representation of a public key. + :return: Parsed PublicKey object. + :raises ValueError: If the key type is unrecognized. + """ + if pb_key.Type == cryto_pb.KeyType.Ed25519: + return Ed25519PublicKey.from_bytes(pb_key.Data) + elif pb_key.Type == cryto_pb.KeyType.RSA: + return RSAPublicKey.from_bytes(pb_key.Data) + elif pb_key.Type == cryto_pb.KeyType.Secp256k1: + return Secp256k1PublicKey.from_bytes(pb_key.Data) + # libp2p.crypto.ecdsa not implemented + else: + raise ValueError(f"Unknown key type: {pb_key.Type}") + + +def seal_record(record: PeerRecord, private_key: PrivateKey) -> Envelope: + """ + Create and sign a new Envelope from a PeerRecord. + + The record is serialized and signed in the scope of its domain and codec. + The result is a self-contained, verifiable Envelope. + + :param record: A PeerRecord to encapsulate. + :param private_key: The signer's private key. + :return: A signed Envelope instance. + """ + payload = record.marshal_record() + + unsigned = make_unsigned(record.domain(), record.codec(), payload) + signature = private_key.sign(unsigned) + + return Envelope( + public_key=private_key.get_public_key(), + payload_type=record.codec(), + raw_payload=payload, + signature=signature, + ) + + +def consume_envelope(data: bytes, domain: str) -> tuple[Envelope, PeerRecord]: + """ + Parse, validate, and decode an Envelope from bytes. + + Validates the envelope's signature using the given domain and decodes + the inner payload into a PeerRecord. + + :param data: Serialized envelope bytes. 
+ :param domain: Domain string to verify signature against. + :return: Tuple of (Envelope, PeerRecord). + :raises ValueError: If signature validation or decoding fails. + """ + env = unmarshal_envelope(data) + env.validate(domain) + record = env.record() + return env, record + + +def unmarshal_envelope(data: bytes) -> Envelope: + """ + Deserialize an Envelope from its wire format. + + This parses the protobuf fields without verifying the signature. + + :param data: Serialized envelope bytes. + :return: Parsed Envelope object. + :raises DecodeError: If protobuf parsing fails. + """ + pb_env = pb.Envelope() + pb_env.ParseFromString(data) + pk = pub_key_from_protobuf(pb_env.public_key) + + return Envelope( + public_key=pk, + payload_type=pb_env.payload_type, + raw_payload=pb_env.payload, + signature=pb_env.signature, + ) + + +def make_unsigned(domain: str, payload_type: bytes, payload: bytes) -> bytes: + """ + Build a byte buffer to be signed for an Envelope. + + The unsigned byte structure is: + varint(len(domain)) || domain || + varint(len(payload_type)) || payload_type || + varint(len(payload)) || payload + + This is the exact input used during signing and verification. + + :param domain: Domain string for signature scoping. + :param payload_type: Identifier for the type of payload. + :param payload: Raw serialized payload bytes. + :return: Byte buffer to be signed or verified. 
+ """ + fields = [domain.encode(), payload_type, payload] + buf = bytearray() + + for field in fields: + buf.extend(encode_uvarint(len(field))) + buf.extend(field) + + return bytes(buf) + + +def debug_dump_envelope(env: Envelope) -> None: + print("\n=== Envelope ===") + print(f"Payload Type: {env.payload_type!r}") + print(f"Signature: {env.signature.hex()} ({len(env.signature)} bytes)") + print(f"Raw Payload: {env.raw_payload.hex()} ({len(env.raw_payload)} bytes)") + + try: + peer_record = unmarshal_record(env.raw_payload) + print("\n=== Parsed PeerRecord ===") + print(peer_record) + except Exception as e: + print("Failed to parse PeerRecord:", e) diff --git a/libp2p/peer/pb/crypto.proto b/libp2p/peer/pb/crypto.proto new file mode 100644 index 00000000..b2327e68 --- /dev/null +++ b/libp2p/peer/pb/crypto.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package libp2p.peer.pb.crypto; + +option go_package = "github.com/libp2p/go-libp2p/core/crypto/pb"; + +enum KeyType { + RSA = 0; + Ed25519 = 1; + Secp256k1 = 2; + ECDSA = 3; +} + +message PublicKey { + KeyType Type = 1; + bytes Data = 2; +} + +message PrivateKey { + KeyType Type = 1; + bytes Data = 2; +} diff --git a/libp2p/peer/pb/crypto_pb2.py b/libp2p/peer/pb/crypto_pb2.py new file mode 100644 index 00000000..d7cd0e76 --- /dev/null +++ b/libp2p/peer/pb/crypto_pb2.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! 
+# source: libp2p/peer/pb/crypto.proto +# Protobuf Python Version: 4.25.3 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1blibp2p/peer/pb/crypto.proto\x12\x15libp2p.peer.pb.crypto\"G\n\tPublicKey\x12,\n\x04Type\x18\x01 \x01(\x0e\x32\x1e.libp2p.peer.pb.crypto.KeyType\x12\x0c\n\x04\x44\x61ta\x18\x02 \x01(\x0c\"H\n\nPrivateKey\x12,\n\x04Type\x18\x01 \x01(\x0e\x32\x1e.libp2p.peer.pb.crypto.KeyType\x12\x0c\n\x04\x44\x61ta\x18\x02 \x01(\x0c*9\n\x07KeyType\x12\x07\n\x03RSA\x10\x00\x12\x0b\n\x07\x45\x64\x32\x35\x35\x31\x39\x10\x01\x12\r\n\tSecp256k1\x10\x02\x12\t\n\x05\x45\x43\x44SA\x10\x03\x42,Z*github.com/libp2p/go-libp2p/core/crypto/pbb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.peer.pb.crypto_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + _globals['DESCRIPTOR']._options = None + _globals['DESCRIPTOR']._serialized_options = b'Z*github.com/libp2p/go-libp2p/core/crypto/pb' + _globals['_KEYTYPE']._serialized_start=201 + _globals['_KEYTYPE']._serialized_end=258 + _globals['_PUBLICKEY']._serialized_start=54 + _globals['_PUBLICKEY']._serialized_end=125 + _globals['_PRIVATEKEY']._serialized_start=127 + _globals['_PRIVATEKEY']._serialized_end=199 +# @@protoc_insertion_point(module_scope) diff --git a/libp2p/peer/pb/crypto_pb2.pyi b/libp2p/peer/pb/crypto_pb2.pyi new file mode 100644 index 00000000..f23c1b65 --- /dev/null +++ b/libp2p/peer/pb/crypto_pb2.pyi @@ -0,0 +1,33 @@ +from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper +from google.protobuf 
import descriptor as _descriptor +from google.protobuf import message as _message +from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union + +DESCRIPTOR: _descriptor.FileDescriptor + +class KeyType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = () + RSA: _ClassVar[KeyType] + Ed25519: _ClassVar[KeyType] + Secp256k1: _ClassVar[KeyType] + ECDSA: _ClassVar[KeyType] +RSA: KeyType +Ed25519: KeyType +Secp256k1: KeyType +ECDSA: KeyType + +class PublicKey(_message.Message): + __slots__ = ("Type", "Data") + TYPE_FIELD_NUMBER: _ClassVar[int] + DATA_FIELD_NUMBER: _ClassVar[int] + Type: KeyType + Data: bytes + def __init__(self, Type: _Optional[_Union[KeyType, str]] = ..., Data: _Optional[bytes] = ...) -> None: ... + +class PrivateKey(_message.Message): + __slots__ = ("Type", "Data") + TYPE_FIELD_NUMBER: _ClassVar[int] + DATA_FIELD_NUMBER: _ClassVar[int] + Type: KeyType + Data: bytes + def __init__(self, Type: _Optional[_Union[KeyType, str]] = ..., Data: _Optional[bytes] = ...) -> None: ... diff --git a/libp2p/peer/pb/envelope.proto b/libp2p/peer/pb/envelope.proto new file mode 100644 index 00000000..7eb498fb --- /dev/null +++ b/libp2p/peer/pb/envelope.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package libp2p.peer.pb.record; + +import "libp2p/peer/pb/crypto.proto"; + +option go_package = "github.com/libp2p/go-libp2p/core/record/pb"; + +message Envelope { + libp2p.peer.pb.crypto.PublicKey public_key = 1; + bytes payload_type = 2; + bytes payload = 3; + bytes signature = 5; +} diff --git a/libp2p/peer/pb/envelope_pb2.py b/libp2p/peer/pb/envelope_pb2.py new file mode 100644 index 00000000..f731cb25 --- /dev/null +++ b/libp2p/peer/pb/envelope_pb2.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! 
+# source: libp2p/peer/pb/envelope.proto +# Protobuf Python Version: 4.25.3 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from libp2p.peer.pb import crypto_pb2 as libp2p_dot_peer_dot_pb_dot_crypto__pb2 + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1dlibp2p/peer/pb/envelope.proto\x12\x15libp2p.peer.pb.record\x1a\x1blibp2p/peer/pb/crypto.proto\"z\n\x08\x45nvelope\x12\x34\n\npublic_key\x18\x01 \x01(\x0b\x32 .libp2p.peer.pb.crypto.PublicKey\x12\x14\n\x0cpayload_type\x18\x02 \x01(\x0c\x12\x0f\n\x07payload\x18\x03 \x01(\x0c\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x42,Z*github.com/libp2p/go-libp2p/core/record/pbb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.peer.pb.envelope_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + _globals['DESCRIPTOR']._options = None + _globals['DESCRIPTOR']._serialized_options = b'Z*github.com/libp2p/go-libp2p/core/record/pb' + _globals['_ENVELOPE']._serialized_start=85 + _globals['_ENVELOPE']._serialized_end=207 +# @@protoc_insertion_point(module_scope) diff --git a/libp2p/peer/pb/envelope_pb2.pyi b/libp2p/peer/pb/envelope_pb2.pyi new file mode 100644 index 00000000..c6f383aa --- /dev/null +++ b/libp2p/peer/pb/envelope_pb2.pyi @@ -0,0 +1,18 @@ +from libp2p.peer.pb import crypto_pb2 as _crypto_pb2 +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from typing import ClassVar as _ClassVar, Mapping as _Mapping, Optional as _Optional, Union as _Union + +DESCRIPTOR: _descriptor.FileDescriptor + +class Envelope(_message.Message): + 
__slots__ = ("public_key", "payload_type", "payload", "signature") + PUBLIC_KEY_FIELD_NUMBER: _ClassVar[int] + PAYLOAD_TYPE_FIELD_NUMBER: _ClassVar[int] + PAYLOAD_FIELD_NUMBER: _ClassVar[int] + SIGNATURE_FIELD_NUMBER: _ClassVar[int] + public_key: _crypto_pb2.PublicKey + payload_type: bytes + payload: bytes + signature: bytes + def __init__(self, public_key: _Optional[_Union[_crypto_pb2.PublicKey, _Mapping]] = ..., payload_type: _Optional[bytes] = ..., payload: _Optional[bytes] = ..., signature: _Optional[bytes] = ...) -> None: ... # type: ignore[type-arg] diff --git a/libp2p/peer/pb/peer_record.proto b/libp2p/peer/pb/peer_record.proto new file mode 100644 index 00000000..c5022f49 --- /dev/null +++ b/libp2p/peer/pb/peer_record.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +package peer.pb; + +option go_package = "github.com/libp2p/go-libp2p/core/peer/pb"; + +// PeerRecord messages contain information that is useful to share with other peers. +// Currently, a PeerRecord contains the public listen addresses for a peer, but this +// is expected to expand to include other information in the future. +// +// PeerRecords are designed to be serialized to bytes and placed inside of +// SignedEnvelopes before sharing with other peers. +// See https://github.com/libp2p/go-libp2p/blob/master/core/record/pb/envelope.proto for +// the SignedEnvelope definition. +message PeerRecord { + + // AddressInfo is a wrapper around a binary multiaddr. It is defined as a + // separate message to allow us to add per-address metadata in the future. + message AddressInfo { + bytes multiaddr = 1; + } + + // peer_id contains a libp2p peer id in its binary representation. + bytes peer_id = 1; + + // seq contains a monotonically-increasing sequence counter to order PeerRecords in time. + uint64 seq = 2; + + // addresses is a list of public listen addresses for the peer. 
+ repeated AddressInfo addresses = 3; +} diff --git a/libp2p/peer/pb/peer_record_pb2.py b/libp2p/peer/pb/peer_record_pb2.py new file mode 100644 index 00000000..9a7f3a6f --- /dev/null +++ b/libp2p/peer/pb/peer_record_pb2.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: libp2p/peer/pb/peer_record.proto +# Protobuf Python Version: 4.25.3 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n libp2p/peer/pb/peer_record.proto\x12\x07peer.pb\"\x80\x01\n\nPeerRecord\x12\x0f\n\x07peer_id\x18\x01 \x01(\x0c\x12\x0b\n\x03seq\x18\x02 \x01(\x04\x12\x32\n\taddresses\x18\x03 \x03(\x0b\x32\x1f.peer.pb.PeerRecord.AddressInfo\x1a \n\x0b\x41\x64\x64ressInfo\x12\x11\n\tmultiaddr\x18\x01 \x01(\x0c\x42*Z(github.com/libp2p/go-libp2p/core/peer/pbb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.peer.pb.peer_record_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + _globals['DESCRIPTOR']._options = None + _globals['DESCRIPTOR']._serialized_options = b'Z(github.com/libp2p/go-libp2p/core/peer/pb' + _globals['_PEERRECORD']._serialized_start=46 + _globals['_PEERRECORD']._serialized_end=174 + _globals['_PEERRECORD_ADDRESSINFO']._serialized_start=142 + _globals['_PEERRECORD_ADDRESSINFO']._serialized_end=174 +# @@protoc_insertion_point(module_scope) diff --git a/libp2p/peer/pb/peer_record_pb2.pyi b/libp2p/peer/pb/peer_record_pb2.pyi new file mode 100644 index 00000000..97ee1657 --- /dev/null +++ b/libp2p/peer/pb/peer_record_pb2.pyi @@ -0,0 
"""Peer records: the payload carried inside signed libp2p routing envelopes."""

from collections.abc import Sequence
import logging
import threading
import time
from typing import Any

from multiaddr import Multiaddr

from libp2p.abc import IPeerRecord
from libp2p.peer.id import ID
import libp2p.peer.pb.peer_record_pb2 as pb
from libp2p.peer.peerinfo import PeerInfo

logger = logging.getLogger(__name__)

PEER_RECORD_ENVELOPE_DOMAIN = "libp2p-peer-record"
PEER_RECORD_ENVELOPE_PAYLOAD_TYPE = b"\x03\x01"

# `_last_timestamp` remembers the last sequence number handed out by
# `timestamp_seq`; the lock keeps concurrent callers from receiving the
# same value.
_last_timestamp_lock = threading.Lock()
_last_timestamp: int = 0


class PeerRecord(IPeerRecord):
    """
    A record that contains metadata about a peer in the libp2p network.

    This includes:
    - `peer_id`: The peer's globally unique identifier.
    - `addrs`: A list of the peer's publicly reachable multiaddrs.
    - `seq`: A strictly monotonically increasing timestamp used
      to order records over time.

    PeerRecords are designed to be signed and transmitted in libp2p routing Envelopes.
    """

    peer_id: ID
    addrs: list[Multiaddr]
    seq: int

    def __init__(
        self,
        peer_id: ID | None = None,
        addrs: list[Multiaddr] | None = None,
        seq: int | None = None,
    ) -> None:
        """
        Initialize a new PeerRecord.
        If `seq` is not provided, a timestamp-based strictly increasing sequence
        number will be generated.

        :param peer_id: ID of the peer this record refers to. When omitted the
            attribute is left unset and must be assigned before serialization.
        :param addrs: Public multiaddrs of the peer.
        :param seq: Monotonic sequence number.

        """
        if peer_id is not None:
            self.peer_id = peer_id
        # Copy the list so that later mutation of the caller's list cannot
        # silently change the contents of this record.
        self.addrs = list(addrs) if addrs else []
        self.seq = seq if seq is not None else timestamp_seq()

    def __repr__(self) -> str:
        # `peer_id` may legitimately be unset (see __init__); repr must never
        # raise, so fall back to None instead of propagating AttributeError.
        peer_id = getattr(self, "peer_id", None)
        return (
            f"PeerRecord(\n"
            f" peer_id={peer_id},\n"
            f" multiaddrs={[str(m) for m in self.addrs]},\n"
            f" seq={self.seq}\n"
            f")"
        )

    def domain(self) -> str:
        """
        Return the domain string associated with this PeerRecord.

        Used during record signing and envelope validation to identify the record type.
        """
        return PEER_RECORD_ENVELOPE_DOMAIN

    def codec(self) -> bytes:
        """
        Return the codec identifier for PeerRecords.

        This binary prefix helps distinguish PeerRecords in serialized envelopes.
        """
        return PEER_RECORD_ENVELOPE_PAYLOAD_TYPE

    def to_protobuf(self) -> pb.PeerRecord:
        """
        Convert the current PeerRecord into a ProtoBuf PeerRecord message.

        :raises ValueError: if peer_id serialization fails (including the case
            where no peer_id was ever assigned).
        :return: A ProtoBuf-encoded PeerRecord message object.
        """
        try:
            id_bytes = self.peer_id.to_bytes()
        except Exception as e:
            raise ValueError(f"failed to marshal peer_id: {e}") from e

        msg = pb.PeerRecord()
        msg.peer_id = id_bytes
        msg.seq = self.seq
        msg.addresses.extend(addrs_to_protobuf(self.addrs))
        return msg

    def marshal_record(self) -> bytes:
        """
        Serialize a PeerRecord into raw bytes suitable for embedding in an Envelope.

        This is typically called during the process of signing or sealing the record.
        :raises ValueError: if serialization to protobuf fails.
        :return: Serialized PeerRecord bytes.
        """
        try:
            return self.to_protobuf().SerializeToString()
        except Exception as e:
            raise ValueError(f"failed to marshal PeerRecord: {e}") from e

    def equal(self, other: Any) -> bool:
        """
        Check if this PeerRecord is identical to another.

        Two PeerRecords are considered equal if:
        - Their peer IDs match.
        - Their sequence numbers are identical.
        - Their address lists are identical and in the same order.

        :param other: Another PeerRecord instance.
        :return: True if all fields match, False otherwise (including when
            `other` is not a PeerRecord).
        """
        if not isinstance(other, PeerRecord):
            return False
        # List equality already checks both length and element order.
        return (
            self.peer_id == other.peer_id
            and self.seq == other.seq
            and self.addrs == other.addrs
        )


def unmarshal_record(data: bytes) -> PeerRecord:
    """
    Deserialize a PeerRecord from its serialized byte representation.

    Typically used when receiving a PeerRecord inside a signed routing Envelope.

    :param data: Serialized protobuf-encoded bytes.
    :raises ValueError: if `data` is None, or if parsing or conversion fails.
    :return: A valid PeerRecord instance.
    """
    if data is None:
        raise ValueError("cannot unmarshal PeerRecord from None")

    msg = pb.PeerRecord()
    try:
        msg.ParseFromString(data)
    except Exception as e:
        raise ValueError(f"Failed to parse PeerRecord protobuf: {e}") from e

    try:
        return peer_record_from_protobuf(msg)
    except Exception as e:
        raise ValueError(f"Failed to convert protobuf to PeerRecord: {e}") from e


def timestamp_seq() -> int:
    """
    Generate a strictly increasing timestamp-based sequence number.

    Ensures that even if multiple PeerRecords are generated in the same nanosecond,
    their `seq` values will still be strictly increasing by using a lock to track the
    last value.

    :return: A strictly increasing integer timestamp.
    """
    global _last_timestamp
    with _last_timestamp_lock:
        # Use the wall clock when it has advanced; otherwise step one past the
        # previously issued value so the sequence never repeats or goes back.
        _last_timestamp = max(time.time_ns(), _last_timestamp + 1)
        return _last_timestamp


def peer_record_from_peer_info(info: PeerInfo) -> PeerRecord:
    """
    Create a PeerRecord from a PeerInfo object.

    This automatically assigns a timestamp-based sequence number to the record.
    The record receives its own copy of the address list, so it does not share
    mutable state with `info`.

    :param info: A PeerInfo instance (contains peer_id and addrs).
    :return: A PeerRecord instance.
    """
    record = PeerRecord()
    record.peer_id = info.peer_id
    record.addrs = list(info.addrs)
    return record


def peer_record_from_protobuf(msg: pb.PeerRecord) -> PeerRecord:
    """
    Convert a protobuf PeerRecord message into a PeerRecord object.

    :param msg: Protobuf PeerRecord message.
    :raises ValueError: if the peer_id cannot be parsed.
    :return: A deserialized PeerRecord instance.
    """
    try:
        peer_id = ID(msg.peer_id)
    except Exception as e:
        raise ValueError(f"Failed to unmarshal peer_id: {e}") from e

    return PeerRecord(peer_id, addrs_from_protobuf(msg.addresses), msg.seq)


def addrs_from_protobuf(addrs: Sequence[pb.PeerRecord.AddressInfo]) -> list[Multiaddr]:
    """
    Convert a list of protobuf address records to Multiaddr objects.

    :param addrs: A list of protobuf PeerRecord.AddressInfo messages.
    :return: A list of decoded Multiaddr instances (invalid ones are skipped).
    """
    out: list[Multiaddr] = []
    for addr_info in addrs:
        try:
            out.append(Multiaddr(addr_info.multiaddr))
        except Exception as e:
            # Skipping is deliberate best-effort behaviour: one undecodable
            # address must not invalidate the rest of the record. Leave a
            # trace so the drop is diagnosable.
            logger.debug("skipping undecodable multiaddr in PeerRecord: %s", e)
    return out


def addrs_to_protobuf(addrs: list[Multiaddr]) -> list[pb.PeerRecord.AddressInfo]:
    """
    Convert a list of Multiaddr objects into their protobuf representation.

    :param addrs: A list of Multiaddr instances.
    :return: A list of PeerRecord.AddressInfo protobuf messages.
    """
    out = []
    for addr in addrs:
        addr_info = pb.PeerRecord.AddressInfo()
        addr_info.multiaddr = addr.to_bytes()
        out.append(addr_info)
    return out
def _enforce_record_limit(self) -> None:
    """
    Evict signed peer records once the store holds more than ``max_records``.

    Records with the lowest sequence numbers are removed first until the map
    is back at the limit. Nothing happens while the store is at or below the
    limit.

    Eviction is unconditional. The previous implementation first called
    ``maybe_delete_peer_record`` and then ``del``-ed the same key; whenever the
    former had already removed the entry (peer without addresses) the ``del``
    raised ``KeyError`` and aborted the cleanup pass.
    """
    excess = len(self.peer_record_map) - self.max_records
    if excess <= 0:
        return

    # Order by sequence number, lowest first. sorted() is stable, so records
    # with equal seq are evicted in insertion order.
    oldest_first = sorted(self.peer_record_map.items(), key=lambda item: item[1].seq)
    for peer_id, _ in oldest_first[:excess]:
        # pop() with a default tolerates a key that is already gone.
        self.peer_record_map.pop(peer_id, None)
def consume_peer_record(self, envelope: Envelope, ttl: int) -> bool:
    """
    Accept and store a signed PeerRecord, unless it's older than
    the one already stored.

    This function:
    - Extracts the peer ID and sequence number from the envelope
    - Rejects the record if it's older (lower seq) than the stored one; a
      record with the same seq is accepted and overwrites the stored one
    - Updates the stored peer record and replaces associated addresses if accepted

    :param envelope: Signed envelope containing a PeerRecord.
    :param ttl: Time-to-live for the included multiaddrs (in seconds).
    :return: True if the record was accepted and stored; False if it was rejected.
    """
    # NOTE(review): nothing here checks that record.peer_id was derived from
    # envelope.public_key. go-libp2p rejects such records in
    # ConsumePeerRecord - confirm that callers validate this before consuming.
    record = envelope.record()
    peer_id = record.peer_id

    existing = self.peer_record_map.get(peer_id)
    if existing is not None and existing.seq > record.seq:
        return False  # reject older record

    # De-duplicate while keeping the order advertised in the record. Going
    # through set() would hand the addresses on in arbitrary order, which
    # breaks callers that compare address lists.
    new_addrs = list(dict.fromkeys(record.addrs))

    self.peer_record_map[peer_id] = PeerRecordState(envelope, record.seq)
    # The certified record is authoritative: drop whatever was known before.
    self.peer_data_map[peer_id].clear_addrs()
    self.add_addrs(peer_id, new_addrs, ttl)

    return True
+ + """ + self.maybe_delete_peer_record(peer_id) + + # Check if the peer has any valid addresses + if ( + peer_id in self.peer_data_map + and not self.peer_data_map[peer_id].is_expired() + ): + state = self.peer_record_map.get(peer_id) + if state is not None: + return state.envelope + return None + # -------ADDR-BOOK-------- def add_addr(self, peer_id: ID, addr: Multiaddr, ttl: int = 0) -> None: @@ -193,6 +322,8 @@ class PeerStore(IPeerStore): except trio.WouldBlock: pass # Or consider logging / dropping / replacing stream + self.maybe_delete_peer_record(peer_id) + def addrs(self, peer_id: ID) -> list[Multiaddr]: """ :param peer_id: peer ID to get addrs for @@ -216,6 +347,8 @@ class PeerStore(IPeerStore): if peer_id in self.peer_data_map: self.peer_data_map[peer_id].clear_addrs() + self.maybe_delete_peer_record(peer_id) + def peers_with_addrs(self) -> list[ID]: """ :return: all of the peer IDs which has addrsfloat stored in peer store diff --git a/newsfragments/753.feature.rst b/newsfragments/753.feature.rst new file mode 100644 index 00000000..9daa3c6c --- /dev/null +++ b/newsfragments/753.feature.rst @@ -0,0 +1,2 @@ +Added the `Certified Addr-Book` interface supported by `Envelope` and `PeerRecord` class. +Integrated the signed-peer-record transfer in the identify/push protocols. 
diff --git a/pyproject.toml b/pyproject.toml index 39818244..f0add7d3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,7 +23,7 @@ dependencies = [ "multiaddr @ git+https://github.com/multiformats/py-multiaddr.git@db8124e2321f316d3b7d2733c7df11d6ad9c03e6", "mypy-protobuf>=3.0.0", "noiseprotocol>=0.3.0", - "protobuf>=4.21.0,<5.0.0", + "protobuf>=4.25.0,<5.0.0", "pycryptodome>=3.9.2", "pymultihash>=0.8.2", "pynacl>=1.3.0", diff --git a/tests/core/identity/identify/test_identify.py b/tests/core/identity/identify/test_identify.py index ee721299..ae7b4ab1 100644 --- a/tests/core/identity/identify/test_identify.py +++ b/tests/core/identity/identify/test_identify.py @@ -13,6 +13,8 @@ from libp2p.identity.identify.identify import ( _multiaddr_to_bytes, parse_identify_response, ) +from libp2p.peer.envelope import Envelope, consume_envelope, unmarshal_envelope +from libp2p.peer.peer_record import unmarshal_record from tests.utils.factories import ( host_pair_factory, ) @@ -40,6 +42,19 @@ async def test_identify_protocol(security_protocol): # Parse the response (handles both old and new formats) identify_response = parse_identify_response(response) + # Validate the recieved envelope and then store it in the certified-addr-book + envelope, record = consume_envelope( + identify_response.signedPeerRecord, "libp2p-peer-record" + ) + assert host_b.peerstore.consume_peer_record(envelope, ttl=7200) + + # Check if the peer_id in the record is same as of host_a + assert record.peer_id == host_a.get_id() + + # Check if the peer-record is correctly consumed + assert host_a.get_addrs() == host_b.peerstore.addrs(host_a.get_id()) + assert isinstance(host_b.peerstore.get_peer_record(host_a.get_id()), Envelope) + logger.debug("host_a: %s", host_a.get_addrs()) logger.debug("host_b: %s", host_b.get_addrs()) @@ -71,5 +86,14 @@ async def test_identify_protocol(security_protocol): # Check protocols assert set(identify_response.protocols) == set(host_a.get_mux().get_protocols()) - # sanity 
def test_ceritified_addr_book():
    """
    Exercise the certified address book of ``PeerStore``.

    Covers three cases for a single peer: the first signed record is accepted,
    a record with a lower sequence number is rejected without touching stored
    state, and a record with a higher sequence number replaces both the stored
    envelope and the peer's addresses.
    """
    store = PeerStore()

    key_pair = create_new_key_pair()
    peer_id = ID.from_pubkey(key_pair.public_key)
    addrs = [
        Multiaddr("/ip4/127.0.0.1/tcp/9000"),
        Multiaddr("/ip4/127.0.0.1/tcp/9001"),
    ]
    ttl = 60

    # Construct signed PeerRecord
    record = PeerRecord(peer_id, addrs, 21)
    envelope = seal_record(record, key_pair.private_key)

    result = store.consume_peer_record(envelope, ttl)
    assert result is True

    # Retrieve the record
    retrieved = store.get_peer_record(peer_id)
    assert retrieved is not None
    assert isinstance(retrieved, Envelope)

    addr_list = store.addrs(peer_id)
    assert set(addr_list) == set(addrs)

    # Now try to push an older record (should be rejected)
    old_record = PeerRecord(peer_id, [Multiaddr("/ip4/10.0.0.1/tcp/4001")], 20)
    old_envelope = seal_record(old_record, key_pair.private_key)
    result = store.consume_peer_record(old_envelope, ttl)
    assert result is False

    # A rejected record must leave the stored envelope and addresses untouched
    kept = store.get_peer_record(peer_id)
    assert isinstance(kept, Envelope)
    assert kept.record().seq == 21
    assert set(store.addrs(peer_id)) == set(addrs)

    # Push a new record (should override)
    new_addrs = [Multiaddr("/ip4/192.168.0.1/tcp/5001")]
    new_record = PeerRecord(peer_id, new_addrs, 23)
    new_envelope = seal_record(new_record, key_pair.private_key)
    result = store.consume_peer_record(new_envelope, ttl)
    assert result is True

    # Confirm the record is updated
    latest = store.get_peer_record(peer_id)
    assert isinstance(latest, Envelope)
    assert latest.record().seq == 23

    # The accepted record replaces the previous addresses; nothing is merged
    expected_addrs = set(new_addrs)
    actual_addrs = set(store.addrs(peer_id))
    assert actual_addrs == expected_addrs
pubkey = keypair.public_key + private_key = keypair.private_key + + payload_type = b"\x03\x01" + payload = b"test-record" + sig = private_key.sign(make_unsigned(DOMAIN, payload_type, payload)) + + env = Envelope(pubkey, payload_type, payload, sig) + serialized = env.marshal_envelope() + new_env = unmarshal_envelope(serialized) + + assert new_env.public_key == pubkey + assert new_env.payload_type == payload_type + assert new_env.raw_payload == payload + assert new_env.signature == sig + + +def test_seal_and_consume_envelope_roundtrip(): + keypair = create_new_key_pair() + priv_key = keypair.private_key + pub_key = keypair.public_key + + peer_id = ID.from_pubkey(pub_key) + addrs = [Multiaddr("/ip4/127.0.0.1/tcp/4001"), Multiaddr("/ip4/127.0.0.1/tcp/4002")] + seq = 12345 + + record = PeerRecord(peer_id=peer_id, addrs=addrs, seq=seq) + + # Seal + envelope = seal_record(record, priv_key) + serialized = envelope.marshal_envelope() + + # Consume + env, rec = consume_envelope(serialized, record.domain()) + + # Assertions + assert env.public_key == pub_key + assert rec.peer_id == peer_id + assert rec.seq == seq + assert rec.addrs == addrs + + +def test_envelope_equal(): + # Create a new keypair + keypair = create_new_key_pair() + private_key = keypair.private_key + + # Create a mock PeerRecord + record = PeerRecord( + peer_id=ID.from_base58("QmNM23MiU1Kd7yfiKVdUnaDo8RYca8By4zDmr7uSaVV8Px"), + seq=1, + addrs=[Multiaddr("/ip4/127.0.0.1/tcp/4001")], + ) + + # Seal it into an Envelope + env1 = seal_record(record, private_key) + + # Create a second identical envelope + env2 = Envelope( + public_key=env1.public_key, + payload_type=env1.payload_type, + raw_payload=env1.raw_payload, + signature=env1.signature, + ) + + # They should be equal + assert env1.equal(env2) + + # Now change something — payload type + env2.payload_type = b"\x99\x99" + assert not env1.equal(env2) + + # Restore payload_type but change signature + env2.payload_type = env1.payload_type + env2.signature = 
b"wrong-signature" + assert not env1.equal(env2) + + # Restore signature but change payload + env2.signature = env1.signature + env2.raw_payload = b"tampered" + assert not env1.equal(env2) + + # Finally, test with a non-envelope object + assert not env1.equal("not-an-envelope") diff --git a/tests/core/peer/test_peer_record.py b/tests/core/peer/test_peer_record.py new file mode 100644 index 00000000..2e4a6029 --- /dev/null +++ b/tests/core/peer/test_peer_record.py @@ -0,0 +1,112 @@ +import time + +from multiaddr import Multiaddr + +from libp2p.peer.id import ID +import libp2p.peer.pb.peer_record_pb2 as pb +from libp2p.peer.peer_record import ( + PeerRecord, + addrs_from_protobuf, + peer_record_from_protobuf, + unmarshal_record, +) + +# Testing methods from PeerRecord base class and PeerRecord protobuf: + + +def test_basic_protobuf_serializatrion_deserialization(): + record = pb.PeerRecord() + record.seq = 1 + + serialized = record.SerializeToString() + new_record = pb.PeerRecord() + new_record.ParseFromString(serialized) + + assert new_record.seq == 1 + + +def test_timestamp_seq_monotonicity(): + rec1 = PeerRecord() + time.sleep(1) + rec2 = PeerRecord() + + assert isinstance(rec1.seq, int) + assert isinstance(rec2.seq, int) + assert rec2.seq > rec1.seq, f"Expected seq2 ({rec2.seq}) > seq1 ({rec1.seq})" + + +def test_addrs_from_protobuf_multiple_addresses(): + ma1 = Multiaddr("/ip4/127.0.0.1/tcp/4001") + ma2 = Multiaddr("/ip4/127.0.0.1/tcp/4002") + + addr_info1 = pb.PeerRecord.AddressInfo() + addr_info1.multiaddr = ma1.to_bytes() + + addr_info2 = pb.PeerRecord.AddressInfo() + addr_info2.multiaddr = ma2.to_bytes() + + result = addrs_from_protobuf([addr_info1, addr_info2]) + assert result == [ma1, ma2] + + +def test_peer_record_from_protobuf(): + peer_id = ID.from_base58("QmNM23MiU1Kd7yfiKVdUnaDo8RYca8By4zDmr7uSaVV8Px") + record = pb.PeerRecord() + record.peer_id = peer_id.to_bytes() + record.seq = 42 + + for addr_str in ["/ip4/127.0.0.1/tcp/4001", 
"/ip4/127.0.0.1/tcp/4002"]: + ma = Multiaddr(addr_str) + addr_info = pb.PeerRecord.AddressInfo() + addr_info.multiaddr = ma.to_bytes() + record.addresses.append(addr_info) + + result = peer_record_from_protobuf(record) + + assert result.peer_id == peer_id + assert result.seq == 42 + assert len(result.addrs) == 2 + assert str(result.addrs[0]) == "/ip4/127.0.0.1/tcp/4001" + assert str(result.addrs[1]) == "/ip4/127.0.0.1/tcp/4002" + + +def test_to_protobuf_generates_correct_message(): + peer_id = ID.from_base58("QmNM23MiU1Kd7yfiKVdUnaDo8RYca8By4zDmr7uSaVV8Px") + addrs = [Multiaddr("/ip4/127.0.0.1/tcp/4001")] + seq = 12345 + + record = PeerRecord(peer_id, addrs, seq) + proto = record.to_protobuf() + + assert isinstance(proto, pb.PeerRecord) + assert proto.peer_id == peer_id.to_bytes() + assert proto.seq == seq + assert len(proto.addresses) == 1 + assert proto.addresses[0].multiaddr == addrs[0].to_bytes() + + +def test_unmarshal_record_roundtrip(): + record = PeerRecord( + peer_id=ID.from_base58("QmNM23MiU1Kd7yfiKVdUnaDo8RYca8By4zDmr7uSaVV8Px"), + addrs=[Multiaddr("/ip4/127.0.0.1/tcp/4001")], + seq=999, + ) + + serialized = record.to_protobuf().SerializeToString() + deserialized = unmarshal_record(serialized) + + assert deserialized.peer_id == record.peer_id + assert deserialized.seq == record.seq + assert len(deserialized.addrs) == 1 + assert deserialized.addrs[0] == record.addrs[0] + + +def test_marshal_record_and_equal(): + peer_id = ID.from_base58("QmNM23MiU1Kd7yfiKVdUnaDo8RYca8By4zDmr7uSaVV8Px") + addrs = [Multiaddr("/ip4/127.0.0.1/tcp/4001")] + original = PeerRecord(peer_id, addrs) + + serialized = original.marshal_record() + deserailzed = unmarshal_record(serialized) + + assert original.equal(deserailzed) diff --git a/tests/core/peer/test_peerstore.py b/tests/core/peer/test_peerstore.py index c5f31767..4aa6c55b 100644 --- a/tests/core/peer/test_peerstore.py +++ b/tests/core/peer/test_peerstore.py @@ -120,3 +120,30 @@ async def test_addr_stream_yields_new_addrs(): 
nursery.cancel_scope.cancel() assert collected == [addr1, addr2] + + +@pytest.mark.trio +async def test_cleanup_task_remove_expired_data(): + store = PeerStore() + peer_id = ID(b"peer123") + addr = Multiaddr("/ip4/127.0.0.1/tcp/4040") + + # Insert addrs with short TTL (1s) + store.add_addr(peer_id, addr, 1) + + assert store.addrs(peer_id) == [addr] + assert peer_id in store.peer_data_map + + # Start cleanup task in a nursery + async with trio.open_nursery() as nursery: + # Run the cleanup task with a short interval so it runs soon + nursery.start_soon(store.start_cleanup_task, 1) + + # Sleep long enough for TTL to expire and cleanup to run + await trio.sleep(3) + + # Cancel the nursery to stop background tasks + nursery.cancel_scope.cancel() + + # Confirm the peer data is gone from the peer_data_map + assert peer_id not in store.peer_data_map