mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
Merge branch 'main' into add-ws-transport
This commit is contained in:
@ -43,6 +43,9 @@ async def run(port: int, destination: str) -> None:
|
||||
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
|
||||
host = new_host()
|
||||
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
|
||||
# Start the peer-store cleanup task
|
||||
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
|
||||
|
||||
if not destination: # its the server
|
||||
|
||||
async def stream_handler(stream: INetStream) -> None:
|
||||
|
||||
@ -45,7 +45,10 @@ async def run(port: int, destination: str, seed: int | None = None) -> None:
|
||||
secret = secrets.token_bytes(32)
|
||||
|
||||
host = new_host(key_pair=create_new_key_pair(secret))
|
||||
async with host.run(listen_addrs=[listen_addr]):
|
||||
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
|
||||
# Start the peer-store cleanup task
|
||||
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
|
||||
|
||||
print(f"I am {host.get_id().to_string()}")
|
||||
|
||||
if not destination: # its the server
|
||||
|
||||
@ -14,6 +14,8 @@ from libp2p.identity.identify.identify import (
|
||||
identify_handler_for,
|
||||
parse_identify_response,
|
||||
)
|
||||
from libp2p.identity.identify.pb.identify_pb2 import Identify
|
||||
from libp2p.peer.envelope import debug_dump_envelope, unmarshal_envelope
|
||||
from libp2p.peer.peerinfo import (
|
||||
info_from_p2p_addr,
|
||||
)
|
||||
@ -32,10 +34,11 @@ def decode_multiaddrs(raw_addrs):
|
||||
return decoded_addrs
|
||||
|
||||
|
||||
def print_identify_response(identify_response):
|
||||
def print_identify_response(identify_response: Identify):
|
||||
"""Pretty-print Identify response."""
|
||||
public_key_b64 = base64.b64encode(identify_response.public_key).decode("utf-8")
|
||||
listen_addrs = decode_multiaddrs(identify_response.listen_addrs)
|
||||
signed_peer_record = unmarshal_envelope(identify_response.signedPeerRecord)
|
||||
try:
|
||||
observed_addr_decoded = decode_multiaddrs([identify_response.observed_addr])
|
||||
except Exception:
|
||||
@ -51,6 +54,8 @@ def print_identify_response(identify_response):
|
||||
f" Agent Version: {identify_response.agent_version}"
|
||||
)
|
||||
|
||||
debug_dump_envelope(signed_peer_record)
|
||||
|
||||
|
||||
async def run(port: int, destination: str, use_varint_format: bool = True) -> None:
|
||||
localhost_ip = "0.0.0.0"
|
||||
@ -61,12 +66,19 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No
|
||||
host_a = new_host()
|
||||
|
||||
# Set up identify handler with specified format
|
||||
# Set use_varint_format = False, if want to checkout the Signed-PeerRecord
|
||||
identify_handler = identify_handler_for(
|
||||
host_a, use_varint_format=use_varint_format
|
||||
)
|
||||
host_a.set_stream_handler(IDENTIFY_PROTOCOL_ID, identify_handler)
|
||||
|
||||
async with host_a.run(listen_addrs=[listen_addr]):
|
||||
async with (
|
||||
host_a.run(listen_addrs=[listen_addr]),
|
||||
trio.open_nursery() as nursery,
|
||||
):
|
||||
# Start the peer-store cleanup task
|
||||
nursery.start_soon(host_a.get_peerstore().start_cleanup_task, 60)
|
||||
|
||||
# Get the actual address and replace 0.0.0.0 with 127.0.0.1 for client
|
||||
# connections
|
||||
server_addr = str(host_a.get_addrs()[0])
|
||||
@ -125,7 +137,13 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No
|
||||
listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}")
|
||||
host_b = new_host()
|
||||
|
||||
async with host_b.run(listen_addrs=[listen_addr]):
|
||||
async with (
|
||||
host_b.run(listen_addrs=[listen_addr]),
|
||||
trio.open_nursery() as nursery,
|
||||
):
|
||||
# Start the peer-store cleanup task
|
||||
nursery.start_soon(host_b.get_peerstore().start_cleanup_task, 60)
|
||||
|
||||
# Connect to the first host
|
||||
print(f"dialer (host_b) listening on {host_b.get_addrs()[0]}")
|
||||
maddr = multiaddr.Multiaddr(destination)
|
||||
@ -238,9 +256,9 @@ def main() -> None:
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Determine format: raw format if --raw-format is specified, otherwise
|
||||
# length-prefixed
|
||||
use_varint_format = not args.raw_format
|
||||
# Determine format: use varint (length-prefixed) if --raw-format is specified,
|
||||
# otherwise use raw protobuf format (old format)
|
||||
use_varint_format = args.raw_format
|
||||
|
||||
try:
|
||||
if args.destination:
|
||||
|
||||
@ -211,7 +211,15 @@ async def main() -> None:
|
||||
listen_addr_1 = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
|
||||
listen_addr_2 = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
|
||||
|
||||
async with host_1.run([listen_addr_1]), host_2.run([listen_addr_2]):
|
||||
async with (
|
||||
host_1.run([listen_addr_1]),
|
||||
host_2.run([listen_addr_2]),
|
||||
trio.open_nursery() as nursery,
|
||||
):
|
||||
# Start the peer-store cleanup task
|
||||
nursery.start_soon(host_1.get_peerstore().start_cleanup_task, 60)
|
||||
nursery.start_soon(host_2.get_peerstore().start_cleanup_task, 60)
|
||||
|
||||
# Get the addresses of both hosts
|
||||
addr_1 = host_1.get_addrs()[0]
|
||||
addr_2 = host_2.get_addrs()[0]
|
||||
|
||||
@ -151,7 +151,10 @@ async def run_node(
|
||||
host = new_host(key_pair=key_pair)
|
||||
listen_addr = Multiaddr(f"/ip4/127.0.0.1/tcp/{port}")
|
||||
|
||||
async with host.run(listen_addrs=[listen_addr]):
|
||||
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
|
||||
# Start the peer-store cleanup task
|
||||
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
|
||||
|
||||
peer_id = host.get_id().pretty()
|
||||
addr_str = f"/ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}"
|
||||
await connect_to_bootstrap_nodes(host, bootstrap_nodes)
|
||||
|
||||
@ -46,7 +46,10 @@ async def run(port: int) -> None:
|
||||
|
||||
logger.info("Starting peer Discovery")
|
||||
host = new_host(key_pair=key_pair, enable_mDNS=True)
|
||||
async with host.run(listen_addrs=[listen_addr]):
|
||||
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
|
||||
# Start the peer-store cleanup task
|
||||
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
|
||||
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
|
||||
@ -59,6 +59,9 @@ async def run(port: int, destination: str) -> None:
|
||||
host = new_host(listen_addrs=[listen_addr])
|
||||
|
||||
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
|
||||
# Start the peer-store cleanup task
|
||||
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
|
||||
|
||||
if not destination:
|
||||
host.set_stream_handler(PING_PROTOCOL_ID, handle_ping)
|
||||
|
||||
|
||||
@ -144,6 +144,9 @@ async def run(topic: str, destination: str | None, port: int | None) -> None:
|
||||
pubsub = Pubsub(host, gossipsub)
|
||||
termination_event = trio.Event() # Event to signal termination
|
||||
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
|
||||
# Start the peer-store cleanup task
|
||||
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
|
||||
|
||||
logger.info(f"Node started with peer ID: {host.get_id()}")
|
||||
logger.info(f"Listening on: {listen_addr}")
|
||||
logger.info("Initializing PubSub and GossipSub...")
|
||||
|
||||
256
libp2p/abc.py
256
libp2p/abc.py
@ -16,6 +16,7 @@ from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
AsyncContextManager,
|
||||
Optional,
|
||||
)
|
||||
|
||||
from multiaddr import (
|
||||
@ -41,20 +42,19 @@ from libp2p.io.abc import (
|
||||
from libp2p.peer.id import (
|
||||
ID,
|
||||
)
|
||||
import libp2p.peer.pb.peer_record_pb2 as pb
|
||||
from libp2p.peer.peerinfo import (
|
||||
PeerInfo,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from libp2p.peer.envelope import Envelope
|
||||
from libp2p.peer.peer_record import PeerRecord
|
||||
from libp2p.protocol_muxer.multiselect import Multiselect
|
||||
from libp2p.pubsub.pubsub import (
|
||||
Pubsub,
|
||||
)
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from libp2p.protocol_muxer.multiselect import Multiselect
|
||||
|
||||
from libp2p.pubsub.pb import (
|
||||
rpc_pb2,
|
||||
)
|
||||
@ -493,6 +493,71 @@ class IAddrBook(ABC):
|
||||
"""
|
||||
|
||||
|
||||
# ------------------ certified-addr-book interface.py ---------------------
|
||||
class ICertifiedAddrBook(ABC):
|
||||
"""
|
||||
Interface for a certified address book.
|
||||
|
||||
Provides methods for managing signed peer records
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def consume_peer_record(self, envelope: "Envelope", ttl: int) -> bool:
|
||||
"""
|
||||
Accept and store a signed PeerRecord, unless it's older than
|
||||
the one already stored.
|
||||
|
||||
This function:
|
||||
- Extracts the peer ID and sequence number from the envelope
|
||||
- Rejects the record if it's older (lower seq)
|
||||
- Updates the stored peer record and replaces associated
|
||||
addresses if accepted
|
||||
|
||||
|
||||
Parameters
|
||||
----------
|
||||
envelope:
|
||||
Signed envelope containing a PeerRecord.
|
||||
ttl:
|
||||
Time-to-live for the included multiaddrs (in seconds).
|
||||
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def get_peer_record(self, peer_id: ID) -> Optional["Envelope"]:
|
||||
"""
|
||||
Retrieve the most recent signed PeerRecord `Envelope` for a peer, if it exists
|
||||
and is still relevant.
|
||||
|
||||
First, it runs cleanup via `maybe_delete_peer_record` to purge stale data.
|
||||
Then it checks whether the peer has valid, unexpired addresses before
|
||||
returning the associated envelope.
|
||||
|
||||
|
||||
Parameters
|
||||
----------
|
||||
peer_id : ID
|
||||
The peer to look up.
|
||||
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def maybe_delete_peer_record(self, peer_id: ID) -> None:
|
||||
"""
|
||||
Delete the signed peer record for a peer if it has no know
|
||||
(non-expired) addresses.
|
||||
|
||||
This is a garbage collection mechanism: if all addresses for a peer have expired
|
||||
or been cleared, there's no point holding onto its signed `Envelope`
|
||||
|
||||
Parameters
|
||||
----------
|
||||
peer_id : ID
|
||||
The peer whose record we may delete.
|
||||
|
||||
"""
|
||||
|
||||
|
||||
# -------------------------- keybook interface.py --------------------------
|
||||
|
||||
|
||||
@ -758,7 +823,9 @@ class IProtoBook(ABC):
|
||||
# -------------------------- peerstore interface.py --------------------------
|
||||
|
||||
|
||||
class IPeerStore(IPeerMetadata, IAddrBook, IKeyBook, IMetrics, IProtoBook):
|
||||
class IPeerStore(
|
||||
IPeerMetadata, IAddrBook, ICertifiedAddrBook, IKeyBook, IMetrics, IProtoBook
|
||||
):
|
||||
"""
|
||||
Interface for a peer store.
|
||||
|
||||
@ -893,7 +960,65 @@ class IPeerStore(IPeerMetadata, IAddrBook, IKeyBook, IMetrics, IProtoBook):
|
||||
|
||||
"""
|
||||
|
||||
# --------CERTIFIED-ADDR-BOOK----------
|
||||
|
||||
@abstractmethod
|
||||
def consume_peer_record(self, envelope: "Envelope", ttl: int) -> bool:
|
||||
"""
|
||||
Accept and store a signed PeerRecord, unless it's older
|
||||
than the one already stored.
|
||||
|
||||
This function:
|
||||
- Extracts the peer ID and sequence number from the envelope
|
||||
- Rejects the record if it's older (lower seq)
|
||||
- Updates the stored peer record and replaces associated addresses if accepted
|
||||
|
||||
|
||||
Parameters
|
||||
----------
|
||||
envelope:
|
||||
Signed envelope containing a PeerRecord.
|
||||
ttl:
|
||||
Time-to-live for the included multiaddrs (in seconds).
|
||||
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def get_peer_record(self, peer_id: ID) -> Optional["Envelope"]:
|
||||
"""
|
||||
Retrieve the most recent signed PeerRecord `Envelope` for a peer, if it exists
|
||||
and is still relevant.
|
||||
|
||||
First, it runs cleanup via `maybe_delete_peer_record` to purge stale data.
|
||||
Then it checks whether the peer has valid, unexpired addresses before
|
||||
returning the associated envelope.
|
||||
|
||||
|
||||
Parameters
|
||||
----------
|
||||
peer_id : ID
|
||||
The peer to look up.
|
||||
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def maybe_delete_peer_record(self, peer_id: ID) -> None:
|
||||
"""
|
||||
Delete the signed peer record for a peer if it has no
|
||||
know (non-expired) addresses.
|
||||
|
||||
This is a garbage collection mechanism: if all addresses for a peer have expired
|
||||
or been cleared, there's no point holding onto its signed `Envelope`
|
||||
|
||||
Parameters
|
||||
----------
|
||||
peer_id : ID
|
||||
The peer whose record we may delete.
|
||||
|
||||
"""
|
||||
|
||||
# --------KEY-BOOK----------
|
||||
|
||||
@abstractmethod
|
||||
def pubkey(self, peer_id: ID) -> PublicKey:
|
||||
"""
|
||||
@ -1202,6 +1327,10 @@ class IPeerStore(IPeerMetadata, IAddrBook, IKeyBook, IMetrics, IProtoBook):
|
||||
def clear_peerdata(self, peer_id: ID) -> None:
|
||||
"""clear_peerdata"""
|
||||
|
||||
@abstractmethod
|
||||
async def start_cleanup_task(self, cleanup_interval: int = 3600) -> None:
|
||||
"""Start periodic cleanup of expired peer records and addresses."""
|
||||
|
||||
|
||||
# -------------------------- listener interface.py --------------------------
|
||||
|
||||
@ -1689,6 +1818,121 @@ class IHost(ABC):
|
||||
"""
|
||||
|
||||
|
||||
# -------------------------- peer-record interface.py --------------------------
|
||||
class IPeerRecord(ABC):
|
||||
"""
|
||||
Interface for a libp2p PeerRecord object.
|
||||
|
||||
A PeerRecord contains metadata about a peer such as its ID, public addresses,
|
||||
and a strictly increasing sequence number for versioning.
|
||||
|
||||
PeerRecords are used in signed routing Envelopes for secure peer data propagation.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def domain(self) -> str:
|
||||
"""
|
||||
Return the domain string for this record type.
|
||||
|
||||
Used in envelope validation to distinguish different record types.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def codec(self) -> bytes:
|
||||
"""
|
||||
Return a binary codec prefix that identifies the PeerRecord type.
|
||||
|
||||
This is prepended in signed envelopes to allow type-safe decoding.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def to_protobuf(self) -> pb.PeerRecord:
|
||||
"""
|
||||
Convert this PeerRecord into its Protobuf representation.
|
||||
|
||||
:raises ValueError: if serialization fails (e.g., invalid peer ID).
|
||||
:return: A populated protobuf `PeerRecord` message.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def marshal_record(self) -> bytes:
|
||||
"""
|
||||
Serialize this PeerRecord into a byte string.
|
||||
|
||||
Used when signing or sealing the record in an envelope.
|
||||
|
||||
:raises ValueError: if protobuf serialization fails.
|
||||
:return: Byte-encoded PeerRecord.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def equal(self, other: object) -> bool:
|
||||
"""
|
||||
Compare this PeerRecord with another for equality.
|
||||
|
||||
Two PeerRecords are considered equal if:
|
||||
- They have the same `peer_id`
|
||||
- Their `seq` numbers match
|
||||
- Their address lists are identical and ordered
|
||||
|
||||
:param other: Object to compare with.
|
||||
:return: True if equal, False otherwise.
|
||||
"""
|
||||
|
||||
|
||||
# -------------------------- envelope interface.py --------------------------
|
||||
class IEnvelope(ABC):
|
||||
@abstractmethod
|
||||
def marshal_envelope(self) -> bytes:
|
||||
"""
|
||||
Serialize this Envelope into its protobuf wire format.
|
||||
|
||||
Converts all envelope fields into a `pb.Envelope` protobuf message
|
||||
and returns the serialized bytes.
|
||||
|
||||
:return: Serialized envelope as bytes.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def validate(self, domain: str) -> None:
|
||||
"""
|
||||
Verify the envelope's signature within the given domain scope.
|
||||
|
||||
This ensures that the envelope has not been tampered with
|
||||
and was signed under the correct usage context.
|
||||
|
||||
:param domain: Domain string that contextualizes the signature.
|
||||
:raises ValueError: If the signature is invalid.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def record(self) -> "PeerRecord":
|
||||
"""
|
||||
Lazily decode and return the embedded PeerRecord.
|
||||
|
||||
This method unmarshals the payload bytes into a `PeerRecord` instance,
|
||||
using the registered codec to identify the type. The decoded result
|
||||
is cached for future use.
|
||||
|
||||
:return: Decoded PeerRecord object.
|
||||
:raises Exception: If decoding fails or payload type is unsupported.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def equal(self, other: Any) -> bool:
|
||||
"""
|
||||
Compare this Envelope with another for structural equality.
|
||||
|
||||
Two envelopes are considered equal if:
|
||||
- They have the same public key
|
||||
- The payload type and payload bytes match
|
||||
- Their signatures are identical
|
||||
|
||||
:param other: Another object to compare.
|
||||
:return: True if equal, False otherwise.
|
||||
"""
|
||||
|
||||
|
||||
# -------------------------- peerdata interface.py --------------------------
|
||||
|
||||
|
||||
|
||||
@ -15,6 +15,8 @@ from libp2p.custom_types import (
|
||||
from libp2p.network.stream.exceptions import (
|
||||
StreamClosed,
|
||||
)
|
||||
from libp2p.peer.envelope import seal_record
|
||||
from libp2p.peer.peer_record import PeerRecord
|
||||
from libp2p.utils import (
|
||||
decode_varint_with_size,
|
||||
get_agent_version,
|
||||
@ -63,6 +65,11 @@ def _mk_identify_protobuf(
|
||||
laddrs = host.get_addrs()
|
||||
protocols = tuple(str(p) for p in host.get_mux().get_protocols() if p is not None)
|
||||
|
||||
# Create a signed peer-record for the remote peer
|
||||
record = PeerRecord(host.get_id(), host.get_addrs())
|
||||
envelope = seal_record(record, host.get_private_key())
|
||||
protobuf = envelope.marshal_envelope()
|
||||
|
||||
observed_addr = observed_multiaddr.to_bytes() if observed_multiaddr else b""
|
||||
return Identify(
|
||||
protocol_version=PROTOCOL_VERSION,
|
||||
@ -71,6 +78,7 @@ def _mk_identify_protobuf(
|
||||
listen_addrs=map(_multiaddr_to_bytes, laddrs),
|
||||
observed_addr=observed_addr,
|
||||
protocols=protocols,
|
||||
signedPeerRecord=protobuf,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@ -9,4 +9,5 @@ message Identify {
|
||||
repeated bytes listen_addrs = 2;
|
||||
optional bytes observed_addr = 4;
|
||||
repeated string protocols = 3;
|
||||
optional bytes signedPeerRecord = 8;
|
||||
}
|
||||
|
||||
@ -1,11 +1,12 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||
# source: libp2p/identity/identify/pb/identify.proto
|
||||
# Protobuf Python Version: 4.25.3
|
||||
"""Generated protocol buffer code."""
|
||||
from google.protobuf.internal import builder as _builder
|
||||
from google.protobuf import descriptor as _descriptor
|
||||
from google.protobuf import descriptor_pool as _descriptor_pool
|
||||
from google.protobuf import symbol_database as _symbol_database
|
||||
from google.protobuf.internal import builder as _builder
|
||||
# @@protoc_insertion_point(imports)
|
||||
|
||||
_sym_db = _symbol_database.Default()
|
||||
@ -13,13 +14,13 @@ _sym_db = _symbol_database.Default()
|
||||
|
||||
|
||||
|
||||
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n*libp2p/identity/identify/pb/identify.proto\x12\x0bidentify.pb\"\x8f\x01\n\x08Identify\x12\x18\n\x10protocol_version\x18\x05 \x01(\t\x12\x15\n\ragent_version\x18\x06 \x01(\t\x12\x12\n\npublic_key\x18\x01 \x01(\x0c\x12\x14\n\x0clisten_addrs\x18\x02 \x03(\x0c\x12\x15\n\robserved_addr\x18\x04 \x01(\x0c\x12\x11\n\tprotocols\x18\x03 \x03(\t')
|
||||
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n*libp2p/identity/identify/pb/identify.proto\x12\x0bidentify.pb\"\xa9\x01\n\x08Identify\x12\x18\n\x10protocol_version\x18\x05 \x01(\t\x12\x15\n\ragent_version\x18\x06 \x01(\t\x12\x12\n\npublic_key\x18\x01 \x01(\x0c\x12\x14\n\x0clisten_addrs\x18\x02 \x03(\x0c\x12\x15\n\robserved_addr\x18\x04 \x01(\x0c\x12\x11\n\tprotocols\x18\x03 \x03(\t\x12\x18\n\x10signedPeerRecord\x18\x08 \x01(\x0c')
|
||||
|
||||
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
|
||||
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.identity.identify.pb.identify_pb2', globals())
|
||||
_globals = globals()
|
||||
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
|
||||
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.identity.identify.pb.identify_pb2', _globals)
|
||||
if _descriptor._USE_C_DESCRIPTORS == False:
|
||||
|
||||
DESCRIPTOR._options = None
|
||||
_IDENTIFY._serialized_start=60
|
||||
_IDENTIFY._serialized_end=203
|
||||
_globals['_IDENTIFY']._serialized_start=60
|
||||
_globals['_IDENTIFY']._serialized_end=229
|
||||
# @@protoc_insertion_point(module_scope)
|
||||
|
||||
@ -1,46 +1,24 @@
|
||||
"""
|
||||
@generated by mypy-protobuf. Do not edit manually!
|
||||
isort:skip_file
|
||||
"""
|
||||
from google.protobuf.internal import containers as _containers
|
||||
from google.protobuf import descriptor as _descriptor
|
||||
from google.protobuf import message as _message
|
||||
from typing import ClassVar as _ClassVar, Iterable as _Iterable, Optional as _Optional
|
||||
|
||||
import builtins
|
||||
import collections.abc
|
||||
import google.protobuf.descriptor
|
||||
import google.protobuf.internal.containers
|
||||
import google.protobuf.message
|
||||
import typing
|
||||
DESCRIPTOR: _descriptor.FileDescriptor
|
||||
|
||||
DESCRIPTOR: google.protobuf.descriptor.FileDescriptor
|
||||
|
||||
@typing.final
|
||||
class Identify(google.protobuf.message.Message):
|
||||
DESCRIPTOR: google.protobuf.descriptor.Descriptor
|
||||
|
||||
PROTOCOL_VERSION_FIELD_NUMBER: builtins.int
|
||||
AGENT_VERSION_FIELD_NUMBER: builtins.int
|
||||
PUBLIC_KEY_FIELD_NUMBER: builtins.int
|
||||
LISTEN_ADDRS_FIELD_NUMBER: builtins.int
|
||||
OBSERVED_ADDR_FIELD_NUMBER: builtins.int
|
||||
PROTOCOLS_FIELD_NUMBER: builtins.int
|
||||
protocol_version: builtins.str
|
||||
agent_version: builtins.str
|
||||
public_key: builtins.bytes
|
||||
observed_addr: builtins.bytes
|
||||
@property
|
||||
def listen_addrs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.bytes]: ...
|
||||
@property
|
||||
def protocols(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
protocol_version: builtins.str | None = ...,
|
||||
agent_version: builtins.str | None = ...,
|
||||
public_key: builtins.bytes | None = ...,
|
||||
listen_addrs: collections.abc.Iterable[builtins.bytes] | None = ...,
|
||||
observed_addr: builtins.bytes | None = ...,
|
||||
protocols: collections.abc.Iterable[builtins.str] | None = ...,
|
||||
) -> None: ...
|
||||
def HasField(self, field_name: typing.Literal["agent_version", b"agent_version", "observed_addr", b"observed_addr", "protocol_version", b"protocol_version", "public_key", b"public_key"]) -> builtins.bool: ...
|
||||
def ClearField(self, field_name: typing.Literal["agent_version", b"agent_version", "listen_addrs", b"listen_addrs", "observed_addr", b"observed_addr", "protocol_version", b"protocol_version", "protocols", b"protocols", "public_key", b"public_key"]) -> None: ...
|
||||
|
||||
global___Identify = Identify
|
||||
class Identify(_message.Message):
|
||||
__slots__ = ("protocol_version", "agent_version", "public_key", "listen_addrs", "observed_addr", "protocols", "signedPeerRecord")
|
||||
PROTOCOL_VERSION_FIELD_NUMBER: _ClassVar[int]
|
||||
AGENT_VERSION_FIELD_NUMBER: _ClassVar[int]
|
||||
PUBLIC_KEY_FIELD_NUMBER: _ClassVar[int]
|
||||
LISTEN_ADDRS_FIELD_NUMBER: _ClassVar[int]
|
||||
OBSERVED_ADDR_FIELD_NUMBER: _ClassVar[int]
|
||||
PROTOCOLS_FIELD_NUMBER: _ClassVar[int]
|
||||
SIGNEDPEERRECORD_FIELD_NUMBER: _ClassVar[int]
|
||||
protocol_version: str
|
||||
agent_version: str
|
||||
public_key: bytes
|
||||
listen_addrs: _containers.RepeatedScalarFieldContainer[bytes]
|
||||
observed_addr: bytes
|
||||
protocols: _containers.RepeatedScalarFieldContainer[str]
|
||||
signedPeerRecord: bytes
|
||||
def __init__(self, protocol_version: _Optional[str] = ..., agent_version: _Optional[str] = ..., public_key: _Optional[bytes] = ..., listen_addrs: _Optional[_Iterable[bytes]] = ..., observed_addr: _Optional[bytes] = ..., protocols: _Optional[_Iterable[str]] = ..., signedPeerRecord: _Optional[bytes] = ...) -> None: ...
|
||||
|
||||
@ -20,6 +20,7 @@ from libp2p.custom_types import (
|
||||
from libp2p.network.stream.exceptions import (
|
||||
StreamClosed,
|
||||
)
|
||||
from libp2p.peer.envelope import consume_envelope
|
||||
from libp2p.peer.id import (
|
||||
ID,
|
||||
)
|
||||
@ -140,6 +141,19 @@ async def _update_peerstore_from_identify(
|
||||
except Exception as e:
|
||||
logger.error("Error updating protocols for peer %s: %s", peer_id, e)
|
||||
|
||||
if identify_msg.HasField("signedPeerRecord"):
|
||||
try:
|
||||
# Convert the signed-peer-record(Envelope) from prtobuf bytes
|
||||
envelope, _ = consume_envelope(
|
||||
identify_msg.signedPeerRecord, "libp2p-peer-record"
|
||||
)
|
||||
# Use a default TTL of 2 hours (7200 seconds)
|
||||
if not peerstore.consume_peer_record(envelope, 7200):
|
||||
logger.error("Updating Certified-Addr-Book was unsuccessful")
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Error updating the certified addr book for peer %s: %s", peer_id, e
|
||||
)
|
||||
# Update observed address if present
|
||||
if identify_msg.HasField("observed_addr") and identify_msg.observed_addr:
|
||||
try:
|
||||
|
||||
271
libp2p/peer/envelope.py
Normal file
271
libp2p/peer/envelope.py
Normal file
@ -0,0 +1,271 @@
|
||||
from typing import Any, cast
|
||||
|
||||
from libp2p.crypto.ed25519 import Ed25519PublicKey
|
||||
from libp2p.crypto.keys import PrivateKey, PublicKey
|
||||
from libp2p.crypto.rsa import RSAPublicKey
|
||||
from libp2p.crypto.secp256k1 import Secp256k1PublicKey
|
||||
import libp2p.peer.pb.crypto_pb2 as cryto_pb
|
||||
import libp2p.peer.pb.envelope_pb2 as pb
|
||||
import libp2p.peer.pb.peer_record_pb2 as record_pb
|
||||
from libp2p.peer.peer_record import (
|
||||
PeerRecord,
|
||||
peer_record_from_protobuf,
|
||||
unmarshal_record,
|
||||
)
|
||||
from libp2p.utils.varint import encode_uvarint
|
||||
|
||||
ENVELOPE_DOMAIN = "libp2p-peer-record"
|
||||
PEER_RECORD_CODEC = b"\x03\x01"
|
||||
|
||||
|
||||
class Envelope:
|
||||
"""
|
||||
A signed wrapper around a serialized libp2p record.
|
||||
|
||||
Envelopes are cryptographically signed by the author's private key
|
||||
and are scoped to a specific 'domain' to prevent cross-protocol replay.
|
||||
|
||||
Attributes:
|
||||
public_key: The public key that can verify the envelope's signature.
|
||||
payload_type: A multicodec code identifying the type of payload inside.
|
||||
raw_payload: The raw serialized record data.
|
||||
signature: Signature over the domain-scoped payload content.
|
||||
|
||||
"""
|
||||
|
||||
public_key: PublicKey
|
||||
payload_type: bytes
|
||||
raw_payload: bytes
|
||||
signature: bytes
|
||||
|
||||
_cached_record: PeerRecord | None = None
|
||||
_unmarshal_error: Exception | None = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
public_key: PublicKey,
|
||||
payload_type: bytes,
|
||||
raw_payload: bytes,
|
||||
signature: bytes,
|
||||
):
|
||||
self.public_key = public_key
|
||||
self.payload_type = payload_type
|
||||
self.raw_payload = raw_payload
|
||||
self.signature = signature
|
||||
|
||||
def marshal_envelope(self) -> bytes:
|
||||
"""
|
||||
Serialize this Envelope into its protobuf wire format.
|
||||
|
||||
Converts all envelope fields into a `pb.Envelope` protobuf message
|
||||
and returns the serialized bytes.
|
||||
|
||||
:return: Serialized envelope as bytes.
|
||||
"""
|
||||
pb_env = pb.Envelope(
|
||||
public_key=pub_key_to_protobuf(self.public_key),
|
||||
payload_type=self.payload_type,
|
||||
payload=self.raw_payload,
|
||||
signature=self.signature,
|
||||
)
|
||||
return pb_env.SerializeToString()
|
||||
|
||||
def validate(self, domain: str) -> None:
|
||||
"""
|
||||
Verify the envelope's signature within the given domain scope.
|
||||
|
||||
This ensures that the envelope has not been tampered with
|
||||
and was signed under the correct usage context.
|
||||
|
||||
:param domain: Domain string that contextualizes the signature.
|
||||
:raises ValueError: If the signature is invalid.
|
||||
"""
|
||||
unsigned = make_unsigned(domain, self.payload_type, self.raw_payload)
|
||||
if not self.public_key.verify(unsigned, self.signature):
|
||||
raise ValueError("Invalid envelope signature")
|
||||
|
||||
def record(self) -> PeerRecord:
|
||||
"""
|
||||
Lazily decode and return the embedded PeerRecord.
|
||||
|
||||
This method unmarshals the payload bytes into a `PeerRecord` instance,
|
||||
using the registered codec to identify the type. The decoded result
|
||||
is cached for future use.
|
||||
|
||||
:return: Decoded PeerRecord object.
|
||||
:raises Exception: If decoding fails or payload type is unsupported.
|
||||
"""
|
||||
if self._cached_record is not None:
|
||||
return self._cached_record
|
||||
|
||||
try:
|
||||
if self.payload_type != PEER_RECORD_CODEC:
|
||||
raise ValueError("Unsuported payload type in envelope")
|
||||
msg = record_pb.PeerRecord()
|
||||
msg.ParseFromString(self.raw_payload)
|
||||
|
||||
self._cached_record = peer_record_from_protobuf(msg)
|
||||
return self._cached_record
|
||||
except Exception as e:
|
||||
self._unmarshal_error = e
|
||||
raise
|
||||
|
||||
def equal(self, other: Any) -> bool:
|
||||
"""
|
||||
Compare this Envelope with another for structural equality.
|
||||
|
||||
Two envelopes are considered equal if:
|
||||
- They have the same public key
|
||||
- The payload type and payload bytes match
|
||||
- Their signatures are identical
|
||||
|
||||
:param other: Another object to compare.
|
||||
:return: True if equal, False otherwise.
|
||||
"""
|
||||
if isinstance(other, Envelope):
|
||||
return (
|
||||
self.public_key.__eq__(other.public_key)
|
||||
and self.payload_type == other.payload_type
|
||||
and self.signature == other.signature
|
||||
and self.raw_payload == other.raw_payload
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
def pub_key_to_protobuf(pub_key: PublicKey) -> cryto_pb.PublicKey:
|
||||
"""
|
||||
Convert a Python PublicKey object to its protobuf equivalent.
|
||||
|
||||
:param pub_key: A libp2p-compatible PublicKey instance.
|
||||
:return: Serialized protobuf PublicKey message.
|
||||
"""
|
||||
internal_key_type = pub_key.get_type()
|
||||
key_type = cast(cryto_pb.KeyType, internal_key_type.value)
|
||||
data = pub_key.to_bytes()
|
||||
protobuf_key = cryto_pb.PublicKey(Type=key_type, Data=data)
|
||||
return protobuf_key
|
||||
|
||||
|
||||
def pub_key_from_protobuf(pb_key: cryto_pb.PublicKey) -> PublicKey:
|
||||
"""
|
||||
Parse a protobuf PublicKey message into a native libp2p PublicKey.
|
||||
|
||||
Supports Ed25519, RSA, and Secp256k1 key types.
|
||||
|
||||
:param pb_key: Protobuf representation of a public key.
|
||||
:return: Parsed PublicKey object.
|
||||
:raises ValueError: If the key type is unrecognized.
|
||||
"""
|
||||
if pb_key.Type == cryto_pb.KeyType.Ed25519:
|
||||
return Ed25519PublicKey.from_bytes(pb_key.Data)
|
||||
elif pb_key.Type == cryto_pb.KeyType.RSA:
|
||||
return RSAPublicKey.from_bytes(pb_key.Data)
|
||||
elif pb_key.Type == cryto_pb.KeyType.Secp256k1:
|
||||
return Secp256k1PublicKey.from_bytes(pb_key.Data)
|
||||
# libp2p.crypto.ecdsa not implemented
|
||||
else:
|
||||
raise ValueError(f"Unknown key type: {pb_key.Type}")
|
||||
|
||||
|
||||
def seal_record(record: PeerRecord, private_key: PrivateKey) -> Envelope:
|
||||
"""
|
||||
Create and sign a new Envelope from a PeerRecord.
|
||||
|
||||
The record is serialized and signed in the scope of its domain and codec.
|
||||
The result is a self-contained, verifiable Envelope.
|
||||
|
||||
:param record: A PeerRecord to encapsulate.
|
||||
:param private_key: The signer's private key.
|
||||
:return: A signed Envelope instance.
|
||||
"""
|
||||
payload = record.marshal_record()
|
||||
|
||||
unsigned = make_unsigned(record.domain(), record.codec(), payload)
|
||||
signature = private_key.sign(unsigned)
|
||||
|
||||
return Envelope(
|
||||
public_key=private_key.get_public_key(),
|
||||
payload_type=record.codec(),
|
||||
raw_payload=payload,
|
||||
signature=signature,
|
||||
)
|
||||
|
||||
|
||||
def consume_envelope(data: bytes, domain: str) -> tuple[Envelope, PeerRecord]:
|
||||
"""
|
||||
Parse, validate, and decode an Envelope from bytes.
|
||||
|
||||
Validates the envelope's signature using the given domain and decodes
|
||||
the inner payload into a PeerRecord.
|
||||
|
||||
:param data: Serialized envelope bytes.
|
||||
:param domain: Domain string to verify signature against.
|
||||
:return: Tuple of (Envelope, PeerRecord).
|
||||
:raises ValueError: If signature validation or decoding fails.
|
||||
"""
|
||||
env = unmarshal_envelope(data)
|
||||
env.validate(domain)
|
||||
record = env.record()
|
||||
return env, record
|
||||
|
||||
|
||||
def unmarshal_envelope(data: bytes) -> Envelope:
|
||||
"""
|
||||
Deserialize an Envelope from its wire format.
|
||||
|
||||
This parses the protobuf fields without verifying the signature.
|
||||
|
||||
:param data: Serialized envelope bytes.
|
||||
:return: Parsed Envelope object.
|
||||
:raises DecodeError: If protobuf parsing fails.
|
||||
"""
|
||||
pb_env = pb.Envelope()
|
||||
pb_env.ParseFromString(data)
|
||||
pk = pub_key_from_protobuf(pb_env.public_key)
|
||||
|
||||
return Envelope(
|
||||
public_key=pk,
|
||||
payload_type=pb_env.payload_type,
|
||||
raw_payload=pb_env.payload,
|
||||
signature=pb_env.signature,
|
||||
)
|
||||
|
||||
|
||||
def make_unsigned(domain: str, payload_type: bytes, payload: bytes) -> bytes:
|
||||
"""
|
||||
Build a byte buffer to be signed for an Envelope.
|
||||
|
||||
The unsigned byte structure is:
|
||||
varint(len(domain)) || domain ||
|
||||
varint(len(payload_type)) || payload_type ||
|
||||
varint(len(payload)) || payload
|
||||
|
||||
This is the exact input used during signing and verification.
|
||||
|
||||
:param domain: Domain string for signature scoping.
|
||||
:param payload_type: Identifier for the type of payload.
|
||||
:param payload: Raw serialized payload bytes.
|
||||
:return: Byte buffer to be signed or verified.
|
||||
"""
|
||||
fields = [domain.encode(), payload_type, payload]
|
||||
buf = bytearray()
|
||||
|
||||
for field in fields:
|
||||
buf.extend(encode_uvarint(len(field)))
|
||||
buf.extend(field)
|
||||
|
||||
return bytes(buf)
|
||||
|
||||
|
||||
def debug_dump_envelope(env: Envelope) -> None:
|
||||
print("\n=== Envelope ===")
|
||||
print(f"Payload Type: {env.payload_type!r}")
|
||||
print(f"Signature: {env.signature.hex()} ({len(env.signature)} bytes)")
|
||||
print(f"Raw Payload: {env.raw_payload.hex()} ({len(env.raw_payload)} bytes)")
|
||||
|
||||
try:
|
||||
peer_record = unmarshal_record(env.raw_payload)
|
||||
print("\n=== Parsed PeerRecord ===")
|
||||
print(peer_record)
|
||||
except Exception as e:
|
||||
print("Failed to parse PeerRecord:", e)
|
||||
22
libp2p/peer/pb/crypto.proto
Normal file
22
libp2p/peer/pb/crypto.proto
Normal file
@ -0,0 +1,22 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package libp2p.peer.pb.crypto;
|
||||
|
||||
option go_package = "github.com/libp2p/go-libp2p/core/crypto/pb";
|
||||
|
||||
enum KeyType {
|
||||
RSA = 0;
|
||||
Ed25519 = 1;
|
||||
Secp256k1 = 2;
|
||||
ECDSA = 3;
|
||||
}
|
||||
|
||||
message PublicKey {
|
||||
KeyType Type = 1;
|
||||
bytes Data = 2;
|
||||
}
|
||||
|
||||
message PrivateKey {
|
||||
KeyType Type = 1;
|
||||
bytes Data = 2;
|
||||
}
|
||||
31
libp2p/peer/pb/crypto_pb2.py
Normal file
31
libp2p/peer/pb/crypto_pb2.py
Normal file
@ -0,0 +1,31 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||
# source: libp2p/peer/pb/crypto.proto
|
||||
# Protobuf Python Version: 4.25.3
|
||||
"""Generated protocol buffer code."""
|
||||
from google.protobuf import descriptor as _descriptor
|
||||
from google.protobuf import descriptor_pool as _descriptor_pool
|
||||
from google.protobuf import symbol_database as _symbol_database
|
||||
from google.protobuf.internal import builder as _builder
|
||||
# @@protoc_insertion_point(imports)
|
||||
|
||||
_sym_db = _symbol_database.Default()
|
||||
|
||||
|
||||
|
||||
|
||||
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1blibp2p/peer/pb/crypto.proto\x12\x15libp2p.peer.pb.crypto\"G\n\tPublicKey\x12,\n\x04Type\x18\x01 \x01(\x0e\x32\x1e.libp2p.peer.pb.crypto.KeyType\x12\x0c\n\x04\x44\x61ta\x18\x02 \x01(\x0c\"H\n\nPrivateKey\x12,\n\x04Type\x18\x01 \x01(\x0e\x32\x1e.libp2p.peer.pb.crypto.KeyType\x12\x0c\n\x04\x44\x61ta\x18\x02 \x01(\x0c*9\n\x07KeyType\x12\x07\n\x03RSA\x10\x00\x12\x0b\n\x07\x45\x64\x32\x35\x35\x31\x39\x10\x01\x12\r\n\tSecp256k1\x10\x02\x12\t\n\x05\x45\x43\x44SA\x10\x03\x42,Z*github.com/libp2p/go-libp2p/core/crypto/pbb\x06proto3')
|
||||
|
||||
_globals = globals()
|
||||
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
|
||||
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.peer.pb.crypto_pb2', _globals)
|
||||
if _descriptor._USE_C_DESCRIPTORS == False:
|
||||
_globals['DESCRIPTOR']._options = None
|
||||
_globals['DESCRIPTOR']._serialized_options = b'Z*github.com/libp2p/go-libp2p/core/crypto/pb'
|
||||
_globals['_KEYTYPE']._serialized_start=201
|
||||
_globals['_KEYTYPE']._serialized_end=258
|
||||
_globals['_PUBLICKEY']._serialized_start=54
|
||||
_globals['_PUBLICKEY']._serialized_end=125
|
||||
_globals['_PRIVATEKEY']._serialized_start=127
|
||||
_globals['_PRIVATEKEY']._serialized_end=199
|
||||
# @@protoc_insertion_point(module_scope)
|
||||
33
libp2p/peer/pb/crypto_pb2.pyi
Normal file
33
libp2p/peer/pb/crypto_pb2.pyi
Normal file
@ -0,0 +1,33 @@
|
||||
from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
|
||||
from google.protobuf import descriptor as _descriptor
|
||||
from google.protobuf import message as _message
|
||||
from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union
|
||||
|
||||
DESCRIPTOR: _descriptor.FileDescriptor
|
||||
|
||||
class KeyType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
|
||||
__slots__ = ()
|
||||
RSA: _ClassVar[KeyType]
|
||||
Ed25519: _ClassVar[KeyType]
|
||||
Secp256k1: _ClassVar[KeyType]
|
||||
ECDSA: _ClassVar[KeyType]
|
||||
RSA: KeyType
|
||||
Ed25519: KeyType
|
||||
Secp256k1: KeyType
|
||||
ECDSA: KeyType
|
||||
|
||||
class PublicKey(_message.Message):
|
||||
__slots__ = ("Type", "Data")
|
||||
TYPE_FIELD_NUMBER: _ClassVar[int]
|
||||
DATA_FIELD_NUMBER: _ClassVar[int]
|
||||
Type: KeyType
|
||||
Data: bytes
|
||||
def __init__(self, Type: _Optional[_Union[KeyType, str]] = ..., Data: _Optional[bytes] = ...) -> None: ...
|
||||
|
||||
class PrivateKey(_message.Message):
|
||||
__slots__ = ("Type", "Data")
|
||||
TYPE_FIELD_NUMBER: _ClassVar[int]
|
||||
DATA_FIELD_NUMBER: _ClassVar[int]
|
||||
Type: KeyType
|
||||
Data: bytes
|
||||
def __init__(self, Type: _Optional[_Union[KeyType, str]] = ..., Data: _Optional[bytes] = ...) -> None: ...
|
||||
14
libp2p/peer/pb/envelope.proto
Normal file
14
libp2p/peer/pb/envelope.proto
Normal file
@ -0,0 +1,14 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package libp2p.peer.pb.record;
|
||||
|
||||
import "libp2p/peer/pb/crypto.proto";
|
||||
|
||||
option go_package = "github.com/libp2p/go-libp2p/core/record/pb";
|
||||
|
||||
message Envelope {
|
||||
libp2p.peer.pb.crypto.PublicKey public_key = 1;
|
||||
bytes payload_type = 2;
|
||||
bytes payload = 3;
|
||||
bytes signature = 5;
|
||||
}
|
||||
28
libp2p/peer/pb/envelope_pb2.py
Normal file
28
libp2p/peer/pb/envelope_pb2.py
Normal file
@ -0,0 +1,28 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||
# source: libp2p/peer/pb/envelope.proto
|
||||
# Protobuf Python Version: 4.25.3
|
||||
"""Generated protocol buffer code."""
|
||||
from google.protobuf import descriptor as _descriptor
|
||||
from google.protobuf import descriptor_pool as _descriptor_pool
|
||||
from google.protobuf import symbol_database as _symbol_database
|
||||
from google.protobuf.internal import builder as _builder
|
||||
# @@protoc_insertion_point(imports)
|
||||
|
||||
_sym_db = _symbol_database.Default()
|
||||
|
||||
|
||||
from libp2p.peer.pb import crypto_pb2 as libp2p_dot_peer_dot_pb_dot_crypto__pb2
|
||||
|
||||
|
||||
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1dlibp2p/peer/pb/envelope.proto\x12\x15libp2p.peer.pb.record\x1a\x1blibp2p/peer/pb/crypto.proto\"z\n\x08\x45nvelope\x12\x34\n\npublic_key\x18\x01 \x01(\x0b\x32 .libp2p.peer.pb.crypto.PublicKey\x12\x14\n\x0cpayload_type\x18\x02 \x01(\x0c\x12\x0f\n\x07payload\x18\x03 \x01(\x0c\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x42,Z*github.com/libp2p/go-libp2p/core/record/pbb\x06proto3')
|
||||
|
||||
_globals = globals()
|
||||
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
|
||||
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.peer.pb.envelope_pb2', _globals)
|
||||
if _descriptor._USE_C_DESCRIPTORS == False:
|
||||
_globals['DESCRIPTOR']._options = None
|
||||
_globals['DESCRIPTOR']._serialized_options = b'Z*github.com/libp2p/go-libp2p/core/record/pb'
|
||||
_globals['_ENVELOPE']._serialized_start=85
|
||||
_globals['_ENVELOPE']._serialized_end=207
|
||||
# @@protoc_insertion_point(module_scope)
|
||||
18
libp2p/peer/pb/envelope_pb2.pyi
Normal file
18
libp2p/peer/pb/envelope_pb2.pyi
Normal file
@ -0,0 +1,18 @@
|
||||
from libp2p.peer.pb import crypto_pb2 as _crypto_pb2
|
||||
from google.protobuf import descriptor as _descriptor
|
||||
from google.protobuf import message as _message
|
||||
from typing import ClassVar as _ClassVar, Mapping as _Mapping, Optional as _Optional, Union as _Union
|
||||
|
||||
DESCRIPTOR: _descriptor.FileDescriptor
|
||||
|
||||
class Envelope(_message.Message):
|
||||
__slots__ = ("public_key", "payload_type", "payload", "signature")
|
||||
PUBLIC_KEY_FIELD_NUMBER: _ClassVar[int]
|
||||
PAYLOAD_TYPE_FIELD_NUMBER: _ClassVar[int]
|
||||
PAYLOAD_FIELD_NUMBER: _ClassVar[int]
|
||||
SIGNATURE_FIELD_NUMBER: _ClassVar[int]
|
||||
public_key: _crypto_pb2.PublicKey
|
||||
payload_type: bytes
|
||||
payload: bytes
|
||||
signature: bytes
|
||||
def __init__(self, public_key: _Optional[_Union[_crypto_pb2.PublicKey, _Mapping]] = ..., payload_type: _Optional[bytes] = ..., payload: _Optional[bytes] = ..., signature: _Optional[bytes] = ...) -> None: ... # type: ignore[type-arg]
|
||||
31
libp2p/peer/pb/peer_record.proto
Normal file
31
libp2p/peer/pb/peer_record.proto
Normal file
@ -0,0 +1,31 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package peer.pb;
|
||||
|
||||
option go_package = "github.com/libp2p/go-libp2p/core/peer/pb";
|
||||
|
||||
// PeerRecord messages contain information that is useful to share with other peers.
|
||||
// Currently, a PeerRecord contains the public listen addresses for a peer, but this
|
||||
// is expected to expand to include other information in the future.
|
||||
//
|
||||
// PeerRecords are designed to be serialized to bytes and placed inside of
|
||||
// SignedEnvelopes before sharing with other peers.
|
||||
// See https://github.com/libp2p/go-libp2p/blob/master/core/record/pb/envelope.proto for
|
||||
// the SignedEnvelope definition.
|
||||
message PeerRecord {
|
||||
|
||||
// AddressInfo is a wrapper around a binary multiaddr. It is defined as a
|
||||
// separate message to allow us to add per-address metadata in the future.
|
||||
message AddressInfo {
|
||||
bytes multiaddr = 1;
|
||||
}
|
||||
|
||||
// peer_id contains a libp2p peer id in its binary representation.
|
||||
bytes peer_id = 1;
|
||||
|
||||
// seq contains a monotonically-increasing sequence counter to order PeerRecords in time.
|
||||
uint64 seq = 2;
|
||||
|
||||
// addresses is a list of public listen addresses for the peer.
|
||||
repeated AddressInfo addresses = 3;
|
||||
}
|
||||
29
libp2p/peer/pb/peer_record_pb2.py
Normal file
29
libp2p/peer/pb/peer_record_pb2.py
Normal file
@ -0,0 +1,29 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||
# source: libp2p/peer/pb/peer_record.proto
|
||||
# Protobuf Python Version: 4.25.3
|
||||
"""Generated protocol buffer code."""
|
||||
from google.protobuf import descriptor as _descriptor
|
||||
from google.protobuf import descriptor_pool as _descriptor_pool
|
||||
from google.protobuf import symbol_database as _symbol_database
|
||||
from google.protobuf.internal import builder as _builder
|
||||
# @@protoc_insertion_point(imports)
|
||||
|
||||
_sym_db = _symbol_database.Default()
|
||||
|
||||
|
||||
|
||||
|
||||
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n libp2p/peer/pb/peer_record.proto\x12\x07peer.pb\"\x80\x01\n\nPeerRecord\x12\x0f\n\x07peer_id\x18\x01 \x01(\x0c\x12\x0b\n\x03seq\x18\x02 \x01(\x04\x12\x32\n\taddresses\x18\x03 \x03(\x0b\x32\x1f.peer.pb.PeerRecord.AddressInfo\x1a \n\x0b\x41\x64\x64ressInfo\x12\x11\n\tmultiaddr\x18\x01 \x01(\x0c\x42*Z(github.com/libp2p/go-libp2p/core/peer/pbb\x06proto3')
|
||||
|
||||
_globals = globals()
|
||||
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
|
||||
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.peer.pb.peer_record_pb2', _globals)
|
||||
if _descriptor._USE_C_DESCRIPTORS == False:
|
||||
_globals['DESCRIPTOR']._options = None
|
||||
_globals['DESCRIPTOR']._serialized_options = b'Z(github.com/libp2p/go-libp2p/core/peer/pb'
|
||||
_globals['_PEERRECORD']._serialized_start=46
|
||||
_globals['_PEERRECORD']._serialized_end=174
|
||||
_globals['_PEERRECORD_ADDRESSINFO']._serialized_start=142
|
||||
_globals['_PEERRECORD_ADDRESSINFO']._serialized_end=174
|
||||
# @@protoc_insertion_point(module_scope)
|
||||
21
libp2p/peer/pb/peer_record_pb2.pyi
Normal file
21
libp2p/peer/pb/peer_record_pb2.pyi
Normal file
@ -0,0 +1,21 @@
|
||||
from google.protobuf.internal import containers as _containers
|
||||
from google.protobuf import descriptor as _descriptor
|
||||
from google.protobuf import message as _message
|
||||
from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union
|
||||
|
||||
DESCRIPTOR: _descriptor.FileDescriptor
|
||||
|
||||
class PeerRecord(_message.Message):
|
||||
__slots__ = ("peer_id", "seq", "addresses")
|
||||
class AddressInfo(_message.Message):
|
||||
__slots__ = ("multiaddr",)
|
||||
MULTIADDR_FIELD_NUMBER: _ClassVar[int]
|
||||
multiaddr: bytes
|
||||
def __init__(self, multiaddr: _Optional[bytes] = ...) -> None: ...
|
||||
PEER_ID_FIELD_NUMBER: _ClassVar[int]
|
||||
SEQ_FIELD_NUMBER: _ClassVar[int]
|
||||
ADDRESSES_FIELD_NUMBER: _ClassVar[int]
|
||||
peer_id: bytes
|
||||
seq: int
|
||||
addresses: _containers.RepeatedCompositeFieldContainer[PeerRecord.AddressInfo]
|
||||
def __init__(self, peer_id: _Optional[bytes] = ..., seq: _Optional[int] = ..., addresses: _Optional[_Iterable[_Union[PeerRecord.AddressInfo, _Mapping]]] = ...) -> None: ... # type: ignore[type-arg]
|
||||
251
libp2p/peer/peer_record.py
Normal file
251
libp2p/peer/peer_record.py
Normal file
@ -0,0 +1,251 @@
|
||||
from collections.abc import Sequence
|
||||
import threading
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from multiaddr import Multiaddr
|
||||
|
||||
from libp2p.abc import IPeerRecord
|
||||
from libp2p.peer.id import ID
|
||||
import libp2p.peer.pb.peer_record_pb2 as pb
|
||||
from libp2p.peer.peerinfo import PeerInfo
|
||||
|
||||
PEER_RECORD_ENVELOPE_DOMAIN = "libp2p-peer-record"
|
||||
PEER_RECORD_ENVELOPE_PAYLOAD_TYPE = b"\x03\x01"
|
||||
|
||||
_last_timestamp_lock = threading.Lock()
|
||||
_last_timestamp: int = 0
|
||||
|
||||
|
||||
class PeerRecord(IPeerRecord):
|
||||
"""
|
||||
A record that contains metatdata about a peer in the libp2p network.
|
||||
|
||||
This includes:
|
||||
- `peer_id`: The peer's globally unique indentifier.
|
||||
- `addrs`: A list of the peer's publicly reachable multiaddrs.
|
||||
- `seq`: A strictly monotonically increasing timestamp used
|
||||
to order records over time.
|
||||
|
||||
PeerRecords are designed to be signed and transmitted in libp2p routing Envelopes.
|
||||
"""
|
||||
|
||||
peer_id: ID
|
||||
addrs: list[Multiaddr]
|
||||
seq: int
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
peer_id: ID | None = None,
|
||||
addrs: list[Multiaddr] | None = None,
|
||||
seq: int | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
Initialize a new PeerRecord.
|
||||
If `seq` is not provided, a timestamp-based strictly increasing sequence
|
||||
number will be generated.
|
||||
|
||||
:param peer_id: ID of the peer this record refers to.
|
||||
:param addrs: Public multiaddrs of the peer.
|
||||
:param seq: Monotonic sequence number.
|
||||
|
||||
"""
|
||||
if peer_id is not None:
|
||||
self.peer_id = peer_id
|
||||
self.addrs = addrs or []
|
||||
if seq is not None:
|
||||
self.seq = seq
|
||||
else:
|
||||
self.seq = timestamp_seq()
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
f"PeerRecord(\n"
|
||||
f" peer_id={self.peer_id},\n"
|
||||
f" multiaddrs={[str(m) for m in self.addrs]},\n"
|
||||
f" seq={self.seq}\n"
|
||||
f")"
|
||||
)
|
||||
|
||||
def domain(self) -> str:
|
||||
"""
|
||||
Return the domain string associated with this PeerRecord.
|
||||
|
||||
Used during record signing and envelope validation to identify the record type.
|
||||
"""
|
||||
return PEER_RECORD_ENVELOPE_DOMAIN
|
||||
|
||||
def codec(self) -> bytes:
|
||||
"""
|
||||
Return the codec identifier for PeerRecords.
|
||||
|
||||
This binary perfix helps distinguish PeerRecords in serialized envelopes.
|
||||
"""
|
||||
return PEER_RECORD_ENVELOPE_PAYLOAD_TYPE
|
||||
|
||||
def to_protobuf(self) -> pb.PeerRecord:
|
||||
"""
|
||||
Convert the current PeerRecord into a ProtoBuf PeerRecord message.
|
||||
|
||||
:raises ValueError: if peer_id serialization fails.
|
||||
:return: A ProtoBuf-encoded PeerRecord message object.
|
||||
"""
|
||||
try:
|
||||
id_bytes = self.peer_id.to_bytes()
|
||||
except Exception as e:
|
||||
raise ValueError(f"failed to marshal peer_id: {e}")
|
||||
|
||||
msg = pb.PeerRecord()
|
||||
msg.peer_id = id_bytes
|
||||
msg.seq = self.seq
|
||||
msg.addresses.extend(addrs_to_protobuf(self.addrs))
|
||||
return msg
|
||||
|
||||
def marshal_record(self) -> bytes:
|
||||
"""
|
||||
Serialize a PeerRecord into raw bytes suitable for embedding in an Envelope.
|
||||
|
||||
This is typically called during the process of signing or sealing the record.
|
||||
:raises ValueError: if serialization to protobuf fails.
|
||||
:return: Serialized PeerRecord bytes.
|
||||
"""
|
||||
try:
|
||||
msg = self.to_protobuf()
|
||||
return msg.SerializeToString()
|
||||
except Exception as e:
|
||||
raise ValueError(f"failed to marshal PeerRecord: {e}")
|
||||
|
||||
def equal(self, other: Any) -> bool:
|
||||
"""
|
||||
Check if this PeerRecord is identical to another.
|
||||
|
||||
Two PeerRecords are considered equal if:
|
||||
- Their peer IDs match.
|
||||
- Their sequence numbers are identical.
|
||||
- Their address lists are identical and in the same order.
|
||||
|
||||
:param other: Another PeerRecord instance.
|
||||
:return: True if all fields mathch, False otherwise.
|
||||
"""
|
||||
if isinstance(other, PeerRecord):
|
||||
if self.peer_id == other.peer_id:
|
||||
if self.seq == other.seq:
|
||||
if len(self.addrs) == len(other.addrs):
|
||||
for a1, a2 in zip(self.addrs, other.addrs):
|
||||
if a1 == a2:
|
||||
continue
|
||||
else:
|
||||
return False
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def unmarshal_record(data: bytes) -> PeerRecord:
|
||||
"""
|
||||
Deserialize a PeerRecord from its serialized byte representation.
|
||||
|
||||
Typically used when receiveing a PeerRecord inside a signed routing Envelope.
|
||||
|
||||
:param data: Serialized protobuf-encoded bytes.
|
||||
:raises ValueError: if parsing or conversion fails.
|
||||
:reurn: A valid PeerRecord instance.
|
||||
"""
|
||||
if data is None:
|
||||
raise ValueError("cannot unmarshal PeerRecord from None")
|
||||
|
||||
msg = pb.PeerRecord()
|
||||
try:
|
||||
msg.ParseFromString(data)
|
||||
except Exception as e:
|
||||
raise ValueError(f"Failed to parse PeerRecord protobuf: {e}")
|
||||
|
||||
try:
|
||||
record = peer_record_from_protobuf(msg)
|
||||
except Exception as e:
|
||||
raise ValueError(f"Failed to convert protobuf to PeerRecord: {e}")
|
||||
|
||||
return record
|
||||
|
||||
|
||||
def timestamp_seq() -> int:
|
||||
"""
|
||||
Generate a strictly increasing timestamp-based sequence number.
|
||||
|
||||
Ensures that even if multiple PeerRecords are generated in the same nanosecond,
|
||||
their `seq` values will still be strictly increasing by using a lock to track the
|
||||
last value.
|
||||
|
||||
:return: A strictly increasing integer timestamp.
|
||||
"""
|
||||
global _last_timestamp
|
||||
now = int(time.time_ns())
|
||||
with _last_timestamp_lock:
|
||||
if now <= _last_timestamp:
|
||||
now = _last_timestamp + 1
|
||||
_last_timestamp = now
|
||||
return now
|
||||
|
||||
|
||||
def peer_record_from_peer_info(info: PeerInfo) -> PeerRecord:
|
||||
"""
|
||||
Create a PeerRecord from a PeerInfo object.
|
||||
|
||||
This automatically assigns a timestamp-based sequence number to the record.
|
||||
:param info: A PeerInfo instance (contains peer_id and addrs).
|
||||
:return: A PeerRecord instance.
|
||||
"""
|
||||
record = PeerRecord()
|
||||
record.peer_id = info.peer_id
|
||||
record.addrs = info.addrs
|
||||
return record
|
||||
|
||||
|
||||
def peer_record_from_protobuf(msg: pb.PeerRecord) -> PeerRecord:
|
||||
"""
|
||||
Convert a protobuf PeerRecord message into a PeerRecord object.
|
||||
|
||||
:param msg: Protobuf PeerRecord message.
|
||||
:raises ValueError: if the peer_id cannot be parsed.
|
||||
:return: A deserialized PeerRecord instance.
|
||||
"""
|
||||
try:
|
||||
peer_id = ID(msg.peer_id)
|
||||
except Exception as e:
|
||||
raise ValueError(f"Failed to unmarshal peer_id: {e}")
|
||||
|
||||
addrs = addrs_from_protobuf(msg.addresses)
|
||||
seq = msg.seq
|
||||
|
||||
return PeerRecord(peer_id, addrs, seq)
|
||||
|
||||
|
||||
def addrs_from_protobuf(addrs: Sequence[pb.PeerRecord.AddressInfo]) -> list[Multiaddr]:
|
||||
"""
|
||||
Convert a list of protobuf address records to Multiaddr objects.
|
||||
|
||||
:param addrs: A list of protobuf PeerRecord.AddressInfo messages.
|
||||
:return: A list of decoded Multiaddr instances (invalid ones are skipped).
|
||||
"""
|
||||
out = []
|
||||
for addr_info in addrs:
|
||||
try:
|
||||
addr = Multiaddr(addr_info.multiaddr)
|
||||
out.append(addr)
|
||||
except Exception:
|
||||
continue
|
||||
return out
|
||||
|
||||
|
||||
def addrs_to_protobuf(addrs: list[Multiaddr]) -> list[pb.PeerRecord.AddressInfo]:
|
||||
"""
|
||||
Convert a list of Multiaddr objects into their protobuf representation.
|
||||
|
||||
:param addrs: A list of Multiaddr instances.
|
||||
:return: A list of PeerRecord.AddressInfo protobuf messages.
|
||||
"""
|
||||
out = []
|
||||
for addr in addrs:
|
||||
addr_info = pb.PeerRecord.AddressInfo()
|
||||
addr_info.multiaddr = addr.to_bytes()
|
||||
out.append(addr_info)
|
||||
return out
|
||||
@ -23,6 +23,7 @@ from libp2p.crypto.keys import (
|
||||
PrivateKey,
|
||||
PublicKey,
|
||||
)
|
||||
from libp2p.peer.envelope import Envelope
|
||||
|
||||
from .id import (
|
||||
ID,
|
||||
@ -38,12 +39,25 @@ from .peerinfo import (
|
||||
PERMANENT_ADDR_TTL = 0
|
||||
|
||||
|
||||
# TODO: Set up an async task for periodic peer-store cleanup
|
||||
# for expired addresses and records.
|
||||
class PeerRecordState:
|
||||
envelope: Envelope
|
||||
seq: int
|
||||
|
||||
def __init__(self, envelope: Envelope, seq: int):
|
||||
self.envelope = envelope
|
||||
self.seq = seq
|
||||
|
||||
|
||||
class PeerStore(IPeerStore):
|
||||
peer_data_map: dict[ID, PeerData]
|
||||
|
||||
def __init__(self) -> None:
|
||||
def __init__(self, max_records: int = 10000) -> None:
|
||||
self.peer_data_map = defaultdict(PeerData)
|
||||
self.addr_update_channels: dict[ID, MemorySendChannel[Multiaddr]] = {}
|
||||
self.peer_record_map: dict[ID, PeerRecordState] = {}
|
||||
self.max_records = max_records
|
||||
|
||||
def peer_info(self, peer_id: ID) -> PeerInfo:
|
||||
"""
|
||||
@ -70,6 +84,10 @@ class PeerStore(IPeerStore):
|
||||
else:
|
||||
raise PeerStoreError("peer ID not found")
|
||||
|
||||
# Clear the peer records
|
||||
if peer_id in self.peer_record_map:
|
||||
self.peer_record_map.pop(peer_id, None)
|
||||
|
||||
def valid_peer_ids(self) -> list[ID]:
|
||||
"""
|
||||
:return: all of the valid peer IDs stored in peer store
|
||||
@ -82,6 +100,38 @@ class PeerStore(IPeerStore):
|
||||
peer_data.clear_addrs()
|
||||
return valid_peer_ids
|
||||
|
||||
def _enforce_record_limit(self) -> None:
|
||||
"""Enforce maximum number of stored records."""
|
||||
if len(self.peer_record_map) > self.max_records:
|
||||
# Record oldest records based on seequence number
|
||||
sorted_records = sorted(
|
||||
self.peer_record_map.items(), key=lambda x: x[1].seq
|
||||
)
|
||||
records_to_remove = len(self.peer_record_map) - self.max_records
|
||||
for peer_id, _ in sorted_records[:records_to_remove]:
|
||||
self.maybe_delete_peer_record(peer_id)
|
||||
del self.peer_record_map[peer_id]
|
||||
|
||||
async def start_cleanup_task(self, cleanup_interval: int = 3600) -> None:
|
||||
"""Start periodic cleanup of expired peer records and addresses."""
|
||||
while True:
|
||||
await trio.sleep(cleanup_interval)
|
||||
self._cleanup_expired_records()
|
||||
|
||||
def _cleanup_expired_records(self) -> None:
|
||||
"""Remove expired peer records and addresses"""
|
||||
expired_peers = []
|
||||
|
||||
for peer_id, peer_data in self.peer_data_map.items():
|
||||
if peer_data.is_expired():
|
||||
expired_peers.append(peer_id)
|
||||
|
||||
for peer_id in expired_peers:
|
||||
self.maybe_delete_peer_record(peer_id)
|
||||
del self.peer_data_map[peer_id]
|
||||
|
||||
self._enforce_record_limit()
|
||||
|
||||
# --------PROTO-BOOK--------
|
||||
|
||||
def get_protocols(self, peer_id: ID) -> list[str]:
|
||||
@ -165,6 +215,85 @@ class PeerStore(IPeerStore):
|
||||
peer_data = self.peer_data_map[peer_id]
|
||||
peer_data.clear_metadata()
|
||||
|
||||
# -----CERT-ADDR-BOOK-----
|
||||
|
||||
# TODO: Make proper use of this function
|
||||
def maybe_delete_peer_record(self, peer_id: ID) -> None:
|
||||
"""
|
||||
Delete the signed peer record for a peer if it has no know
|
||||
(non-expired) addresses.
|
||||
|
||||
This is a garbage collection mechanism: if all addresses for a peer have expired
|
||||
or been cleared, there's no point holding onto its signed `Envelope`
|
||||
|
||||
:param peer_id: The peer whose record we may delete/
|
||||
"""
|
||||
if peer_id in self.peer_record_map:
|
||||
if not self.addrs(peer_id):
|
||||
self.peer_record_map.pop(peer_id, None)
|
||||
|
||||
def consume_peer_record(self, envelope: Envelope, ttl: int) -> bool:
|
||||
"""
|
||||
Accept and store a signed PeerRecord, unless it's older than
|
||||
the one already stored.
|
||||
|
||||
This function:
|
||||
- Extracts the peer ID and sequence number from the envelope
|
||||
- Rejects the record if it's older (lower seq)
|
||||
- Updates the stored peer record and replaces associated addresses if accepted
|
||||
|
||||
:param envelope: Signed envelope containing a PeerRecord.
|
||||
:param ttl: Time-to-live for the included multiaddrs (in seconds).
|
||||
:return: True if the record was accepted and stored; False if it was rejected.
|
||||
"""
|
||||
record = envelope.record()
|
||||
peer_id = record.peer_id
|
||||
|
||||
existing = self.peer_record_map.get(peer_id)
|
||||
if existing and existing.seq > record.seq:
|
||||
return False # reject older record
|
||||
|
||||
new_addrs = set(record.addrs)
|
||||
|
||||
self.peer_record_map[peer_id] = PeerRecordState(envelope, record.seq)
|
||||
self.peer_data_map[peer_id].clear_addrs()
|
||||
self.add_addrs(peer_id, list(new_addrs), ttl)
|
||||
|
||||
return True
|
||||
|
||||
def consume_peer_records(self, envelopes: list[Envelope], ttl: int) -> list[bool]:
|
||||
"""Consume multiple peer records in a single operation."""
|
||||
results = []
|
||||
for envelope in envelopes:
|
||||
results.append(self.consume_peer_record(envelope, ttl))
|
||||
return results
|
||||
|
||||
def get_peer_record(self, peer_id: ID) -> Envelope | None:
|
||||
"""
|
||||
Retrieve the most recent signed PeerRecord `Envelope` for a peer, if it exists
|
||||
and is still relevant.
|
||||
|
||||
First, it runs cleanup via `maybe_delete_peer_record` to purge stale data.
|
||||
Then it checks whether the peer has valid, unexpired addresses before
|
||||
returning the associated envelope.
|
||||
|
||||
:param peer_id: The peer to look up.
|
||||
:return: The signed Envelope if the peer is known and has valid
|
||||
addresses; None otherwise.
|
||||
|
||||
"""
|
||||
self.maybe_delete_peer_record(peer_id)
|
||||
|
||||
# Check if the peer has any valid addresses
|
||||
if (
|
||||
peer_id in self.peer_data_map
|
||||
and not self.peer_data_map[peer_id].is_expired()
|
||||
):
|
||||
state = self.peer_record_map.get(peer_id)
|
||||
if state is not None:
|
||||
return state.envelope
|
||||
return None
|
||||
|
||||
# -------ADDR-BOOK--------
|
||||
|
||||
def add_addr(self, peer_id: ID, addr: Multiaddr, ttl: int = 0) -> None:
|
||||
@ -193,6 +322,8 @@ class PeerStore(IPeerStore):
|
||||
except trio.WouldBlock:
|
||||
pass # Or consider logging / dropping / replacing stream
|
||||
|
||||
self.maybe_delete_peer_record(peer_id)
|
||||
|
||||
def addrs(self, peer_id: ID) -> list[Multiaddr]:
|
||||
"""
|
||||
:param peer_id: peer ID to get addrs for
|
||||
@ -216,6 +347,8 @@ class PeerStore(IPeerStore):
|
||||
if peer_id in self.peer_data_map:
|
||||
self.peer_data_map[peer_id].clear_addrs()
|
||||
|
||||
self.maybe_delete_peer_record(peer_id)
|
||||
|
||||
def peers_with_addrs(self) -> list[ID]:
|
||||
"""
|
||||
:return: all of the peer IDs which has addrsfloat stored in peer store
|
||||
|
||||
2
newsfragments/753.feature.rst
Normal file
2
newsfragments/753.feature.rst
Normal file
@ -0,0 +1,2 @@
|
||||
Added the `Certified Addr-Book` interface supported by `Envelope` and `PeerRecord` class.
|
||||
Integrated the signed-peer-record transfer in the identify/push protocols.
|
||||
@ -23,7 +23,7 @@ dependencies = [
|
||||
"multiaddr @ git+https://github.com/multiformats/py-multiaddr.git@db8124e2321f316d3b7d2733c7df11d6ad9c03e6",
|
||||
"mypy-protobuf>=3.0.0",
|
||||
"noiseprotocol>=0.3.0",
|
||||
"protobuf>=4.21.0,<5.0.0",
|
||||
"protobuf>=4.25.0,<5.0.0",
|
||||
"pycryptodome>=3.9.2",
|
||||
"pymultihash>=0.8.2",
|
||||
"pynacl>=1.3.0",
|
||||
|
||||
@ -13,6 +13,8 @@ from libp2p.identity.identify.identify import (
|
||||
_multiaddr_to_bytes,
|
||||
parse_identify_response,
|
||||
)
|
||||
from libp2p.peer.envelope import Envelope, consume_envelope, unmarshal_envelope
|
||||
from libp2p.peer.peer_record import unmarshal_record
|
||||
from tests.utils.factories import (
|
||||
host_pair_factory,
|
||||
)
|
||||
@ -40,6 +42,19 @@ async def test_identify_protocol(security_protocol):
|
||||
# Parse the response (handles both old and new formats)
|
||||
identify_response = parse_identify_response(response)
|
||||
|
||||
# Validate the recieved envelope and then store it in the certified-addr-book
|
||||
envelope, record = consume_envelope(
|
||||
identify_response.signedPeerRecord, "libp2p-peer-record"
|
||||
)
|
||||
assert host_b.peerstore.consume_peer_record(envelope, ttl=7200)
|
||||
|
||||
# Check if the peer_id in the record is same as of host_a
|
||||
assert record.peer_id == host_a.get_id()
|
||||
|
||||
# Check if the peer-record is correctly consumed
|
||||
assert host_a.get_addrs() == host_b.peerstore.addrs(host_a.get_id())
|
||||
assert isinstance(host_b.peerstore.get_peer_record(host_a.get_id()), Envelope)
|
||||
|
||||
logger.debug("host_a: %s", host_a.get_addrs())
|
||||
logger.debug("host_b: %s", host_b.get_addrs())
|
||||
|
||||
@ -71,5 +86,14 @@ async def test_identify_protocol(security_protocol):
|
||||
# Check protocols
|
||||
assert set(identify_response.protocols) == set(host_a.get_mux().get_protocols())
|
||||
|
||||
# sanity check
|
||||
assert identify_response == _mk_identify_protobuf(host_a, cleaned_addr)
|
||||
# sanity check if the peer_id of the identify msg are same
|
||||
assert (
|
||||
unmarshal_record(
|
||||
unmarshal_envelope(identify_response.signedPeerRecord).raw_payload
|
||||
).peer_id
|
||||
== unmarshal_record(
|
||||
unmarshal_envelope(
|
||||
_mk_identify_protobuf(host_a, cleaned_addr).signedPeerRecord
|
||||
).raw_payload
|
||||
).peer_id
|
||||
)
|
||||
|
||||
@ -3,7 +3,10 @@ from multiaddr import (
|
||||
Multiaddr,
|
||||
)
|
||||
|
||||
from libp2p.crypto.rsa import create_new_key_pair
|
||||
from libp2p.peer.envelope import Envelope, seal_record
|
||||
from libp2p.peer.id import ID
|
||||
from libp2p.peer.peer_record import PeerRecord
|
||||
from libp2p.peer.peerstore import (
|
||||
PeerStore,
|
||||
PeerStoreError,
|
||||
@ -84,3 +87,53 @@ def test_peers_with_addrs():
|
||||
store.clear_addrs(ID(b"peer2"))
|
||||
|
||||
assert set(store.peers_with_addrs()) == {ID(b"peer3")}
|
||||
|
||||
|
||||
def test_ceritified_addr_book():
|
||||
store = PeerStore()
|
||||
|
||||
key_pair = create_new_key_pair()
|
||||
peer_id = ID.from_pubkey(key_pair.public_key)
|
||||
addrs = [
|
||||
Multiaddr("/ip4/127.0.0.1/tcp/9000"),
|
||||
Multiaddr("/ip4/127.0.0.1/tcp/9001"),
|
||||
]
|
||||
ttl = 60
|
||||
|
||||
# Construct signed PereRecord
|
||||
record = PeerRecord(peer_id, addrs, 21)
|
||||
envelope = seal_record(record, key_pair.private_key)
|
||||
|
||||
result = store.consume_peer_record(envelope, ttl)
|
||||
assert result is True
|
||||
# Retrieve the record
|
||||
|
||||
retrieved = store.get_peer_record(peer_id)
|
||||
assert retrieved is not None
|
||||
assert isinstance(retrieved, Envelope)
|
||||
|
||||
addr_list = store.addrs(peer_id)
|
||||
assert set(addr_list) == set(addrs)
|
||||
|
||||
# Now try to push an older record (should be rejected)
|
||||
old_record = PeerRecord(peer_id, [Multiaddr("/ip4/10.0.0.1/tcp/4001")], 20)
|
||||
old_envelope = seal_record(old_record, key_pair.private_key)
|
||||
result = store.consume_peer_record(old_envelope, ttl)
|
||||
assert result is False
|
||||
|
||||
# Push a new record (should override)
|
||||
new_addrs = [Multiaddr("/ip4/192.168.0.1/tcp/5001")]
|
||||
new_record = PeerRecord(peer_id, new_addrs, 23)
|
||||
new_envelope = seal_record(new_record, key_pair.private_key)
|
||||
result = store.consume_peer_record(new_envelope, ttl)
|
||||
assert result is True
|
||||
|
||||
# Confirm the record is updated
|
||||
latest = store.get_peer_record(peer_id)
|
||||
assert isinstance(latest, Envelope)
|
||||
assert latest.record().seq == 23
|
||||
|
||||
# Merged addresses = old addres + new_addrs
|
||||
expected_addrs = set(new_addrs)
|
||||
actual_addrs = set(store.addrs(peer_id))
|
||||
assert actual_addrs == expected_addrs
|
||||
|
||||
129
tests/core/peer/test_envelope.py
Normal file
129
tests/core/peer/test_envelope.py
Normal file
@ -0,0 +1,129 @@
|
||||
from multiaddr import Multiaddr
|
||||
|
||||
from libp2p.crypto.rsa import (
|
||||
create_new_key_pair,
|
||||
)
|
||||
from libp2p.peer.envelope import (
|
||||
Envelope,
|
||||
consume_envelope,
|
||||
make_unsigned,
|
||||
seal_record,
|
||||
unmarshal_envelope,
|
||||
)
|
||||
from libp2p.peer.id import ID
|
||||
import libp2p.peer.pb.crypto_pb2 as crypto_pb
|
||||
import libp2p.peer.pb.envelope_pb2 as env_pb
|
||||
from libp2p.peer.peer_record import PeerRecord
|
||||
|
||||
DOMAIN = "libp2p-peer-record"
|
||||
|
||||
|
||||
def test_basic_protobuf_serialization_deserialization():
|
||||
pubkey = crypto_pb.PublicKey()
|
||||
pubkey.Type = crypto_pb.KeyType.Ed25519
|
||||
pubkey.Data = b"\x01\x02\x03"
|
||||
|
||||
env = env_pb.Envelope()
|
||||
env.public_key.CopyFrom(pubkey)
|
||||
env.payload_type = b"\x03\x01"
|
||||
env.payload = b"test-payload"
|
||||
env.signature = b"signature-bytes"
|
||||
|
||||
serialized = env.SerializeToString()
|
||||
|
||||
new_env = env_pb.Envelope()
|
||||
new_env.ParseFromString(serialized)
|
||||
|
||||
assert new_env.public_key.Type == crypto_pb.KeyType.Ed25519
|
||||
assert new_env.public_key.Data == b"\x01\x02\x03"
|
||||
assert new_env.payload_type == b"\x03\x01"
|
||||
assert new_env.payload == b"test-payload"
|
||||
assert new_env.signature == b"signature-bytes"
|
||||
|
||||
|
||||
def test_enevelope_marshal_unmarshal_roundtrip():
|
||||
keypair = create_new_key_pair()
|
||||
pubkey = keypair.public_key
|
||||
private_key = keypair.private_key
|
||||
|
||||
payload_type = b"\x03\x01"
|
||||
payload = b"test-record"
|
||||
sig = private_key.sign(make_unsigned(DOMAIN, payload_type, payload))
|
||||
|
||||
env = Envelope(pubkey, payload_type, payload, sig)
|
||||
serialized = env.marshal_envelope()
|
||||
new_env = unmarshal_envelope(serialized)
|
||||
|
||||
assert new_env.public_key == pubkey
|
||||
assert new_env.payload_type == payload_type
|
||||
assert new_env.raw_payload == payload
|
||||
assert new_env.signature == sig
|
||||
|
||||
|
||||
def test_seal_and_consume_envelope_roundtrip():
|
||||
keypair = create_new_key_pair()
|
||||
priv_key = keypair.private_key
|
||||
pub_key = keypair.public_key
|
||||
|
||||
peer_id = ID.from_pubkey(pub_key)
|
||||
addrs = [Multiaddr("/ip4/127.0.0.1/tcp/4001"), Multiaddr("/ip4/127.0.0.1/tcp/4002")]
|
||||
seq = 12345
|
||||
|
||||
record = PeerRecord(peer_id=peer_id, addrs=addrs, seq=seq)
|
||||
|
||||
# Seal
|
||||
envelope = seal_record(record, priv_key)
|
||||
serialized = envelope.marshal_envelope()
|
||||
|
||||
# Consume
|
||||
env, rec = consume_envelope(serialized, record.domain())
|
||||
|
||||
# Assertions
|
||||
assert env.public_key == pub_key
|
||||
assert rec.peer_id == peer_id
|
||||
assert rec.seq == seq
|
||||
assert rec.addrs == addrs
|
||||
|
||||
|
||||
def test_envelope_equal():
|
||||
# Create a new keypair
|
||||
keypair = create_new_key_pair()
|
||||
private_key = keypair.private_key
|
||||
|
||||
# Create a mock PeerRecord
|
||||
record = PeerRecord(
|
||||
peer_id=ID.from_base58("QmNM23MiU1Kd7yfiKVdUnaDo8RYca8By4zDmr7uSaVV8Px"),
|
||||
seq=1,
|
||||
addrs=[Multiaddr("/ip4/127.0.0.1/tcp/4001")],
|
||||
)
|
||||
|
||||
# Seal it into an Envelope
|
||||
env1 = seal_record(record, private_key)
|
||||
|
||||
# Create a second identical envelope
|
||||
env2 = Envelope(
|
||||
public_key=env1.public_key,
|
||||
payload_type=env1.payload_type,
|
||||
raw_payload=env1.raw_payload,
|
||||
signature=env1.signature,
|
||||
)
|
||||
|
||||
# They should be equal
|
||||
assert env1.equal(env2)
|
||||
|
||||
# Now change something — payload type
|
||||
env2.payload_type = b"\x99\x99"
|
||||
assert not env1.equal(env2)
|
||||
|
||||
# Restore payload_type but change signature
|
||||
env2.payload_type = env1.payload_type
|
||||
env2.signature = b"wrong-signature"
|
||||
assert not env1.equal(env2)
|
||||
|
||||
# Restore signature but change payload
|
||||
env2.signature = env1.signature
|
||||
env2.raw_payload = b"tampered"
|
||||
assert not env1.equal(env2)
|
||||
|
||||
# Finally, test with a non-envelope object
|
||||
assert not env1.equal("not-an-envelope")
|
||||
112
tests/core/peer/test_peer_record.py
Normal file
112
tests/core/peer/test_peer_record.py
Normal file
@ -0,0 +1,112 @@
|
||||
import time
|
||||
|
||||
from multiaddr import Multiaddr
|
||||
|
||||
from libp2p.peer.id import ID
|
||||
import libp2p.peer.pb.peer_record_pb2 as pb
|
||||
from libp2p.peer.peer_record import (
|
||||
PeerRecord,
|
||||
addrs_from_protobuf,
|
||||
peer_record_from_protobuf,
|
||||
unmarshal_record,
|
||||
)
|
||||
|
||||
# Testing methods from PeerRecord base class and PeerRecord protobuf:
|
||||
|
||||
|
||||
def test_basic_protobuf_serializatrion_deserialization():
|
||||
record = pb.PeerRecord()
|
||||
record.seq = 1
|
||||
|
||||
serialized = record.SerializeToString()
|
||||
new_record = pb.PeerRecord()
|
||||
new_record.ParseFromString(serialized)
|
||||
|
||||
assert new_record.seq == 1
|
||||
|
||||
|
||||
def test_timestamp_seq_monotonicity():
|
||||
rec1 = PeerRecord()
|
||||
time.sleep(1)
|
||||
rec2 = PeerRecord()
|
||||
|
||||
assert isinstance(rec1.seq, int)
|
||||
assert isinstance(rec2.seq, int)
|
||||
assert rec2.seq > rec1.seq, f"Expected seq2 ({rec2.seq}) > seq1 ({rec1.seq})"
|
||||
|
||||
|
||||
def test_addrs_from_protobuf_multiple_addresses():
|
||||
ma1 = Multiaddr("/ip4/127.0.0.1/tcp/4001")
|
||||
ma2 = Multiaddr("/ip4/127.0.0.1/tcp/4002")
|
||||
|
||||
addr_info1 = pb.PeerRecord.AddressInfo()
|
||||
addr_info1.multiaddr = ma1.to_bytes()
|
||||
|
||||
addr_info2 = pb.PeerRecord.AddressInfo()
|
||||
addr_info2.multiaddr = ma2.to_bytes()
|
||||
|
||||
result = addrs_from_protobuf([addr_info1, addr_info2])
|
||||
assert result == [ma1, ma2]
|
||||
|
||||
|
||||
def test_peer_record_from_protobuf():
|
||||
peer_id = ID.from_base58("QmNM23MiU1Kd7yfiKVdUnaDo8RYca8By4zDmr7uSaVV8Px")
|
||||
record = pb.PeerRecord()
|
||||
record.peer_id = peer_id.to_bytes()
|
||||
record.seq = 42
|
||||
|
||||
for addr_str in ["/ip4/127.0.0.1/tcp/4001", "/ip4/127.0.0.1/tcp/4002"]:
|
||||
ma = Multiaddr(addr_str)
|
||||
addr_info = pb.PeerRecord.AddressInfo()
|
||||
addr_info.multiaddr = ma.to_bytes()
|
||||
record.addresses.append(addr_info)
|
||||
|
||||
result = peer_record_from_protobuf(record)
|
||||
|
||||
assert result.peer_id == peer_id
|
||||
assert result.seq == 42
|
||||
assert len(result.addrs) == 2
|
||||
assert str(result.addrs[0]) == "/ip4/127.0.0.1/tcp/4001"
|
||||
assert str(result.addrs[1]) == "/ip4/127.0.0.1/tcp/4002"
|
||||
|
||||
|
||||
def test_to_protobuf_generates_correct_message():
|
||||
peer_id = ID.from_base58("QmNM23MiU1Kd7yfiKVdUnaDo8RYca8By4zDmr7uSaVV8Px")
|
||||
addrs = [Multiaddr("/ip4/127.0.0.1/tcp/4001")]
|
||||
seq = 12345
|
||||
|
||||
record = PeerRecord(peer_id, addrs, seq)
|
||||
proto = record.to_protobuf()
|
||||
|
||||
assert isinstance(proto, pb.PeerRecord)
|
||||
assert proto.peer_id == peer_id.to_bytes()
|
||||
assert proto.seq == seq
|
||||
assert len(proto.addresses) == 1
|
||||
assert proto.addresses[0].multiaddr == addrs[0].to_bytes()
|
||||
|
||||
|
||||
def test_unmarshal_record_roundtrip():
|
||||
record = PeerRecord(
|
||||
peer_id=ID.from_base58("QmNM23MiU1Kd7yfiKVdUnaDo8RYca8By4zDmr7uSaVV8Px"),
|
||||
addrs=[Multiaddr("/ip4/127.0.0.1/tcp/4001")],
|
||||
seq=999,
|
||||
)
|
||||
|
||||
serialized = record.to_protobuf().SerializeToString()
|
||||
deserialized = unmarshal_record(serialized)
|
||||
|
||||
assert deserialized.peer_id == record.peer_id
|
||||
assert deserialized.seq == record.seq
|
||||
assert len(deserialized.addrs) == 1
|
||||
assert deserialized.addrs[0] == record.addrs[0]
|
||||
|
||||
|
||||
def test_marshal_record_and_equal():
|
||||
peer_id = ID.from_base58("QmNM23MiU1Kd7yfiKVdUnaDo8RYca8By4zDmr7uSaVV8Px")
|
||||
addrs = [Multiaddr("/ip4/127.0.0.1/tcp/4001")]
|
||||
original = PeerRecord(peer_id, addrs)
|
||||
|
||||
serialized = original.marshal_record()
|
||||
deserailzed = unmarshal_record(serialized)
|
||||
|
||||
assert original.equal(deserailzed)
|
||||
@ -120,3 +120,30 @@ async def test_addr_stream_yields_new_addrs():
|
||||
nursery.cancel_scope.cancel()
|
||||
|
||||
assert collected == [addr1, addr2]
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_cleanup_task_remove_expired_data():
|
||||
store = PeerStore()
|
||||
peer_id = ID(b"peer123")
|
||||
addr = Multiaddr("/ip4/127.0.0.1/tcp/4040")
|
||||
|
||||
# Insert addrs with short TTL (0.01s)
|
||||
store.add_addr(peer_id, addr, 1)
|
||||
|
||||
assert store.addrs(peer_id) == [addr]
|
||||
assert peer_id in store.peer_data_map
|
||||
|
||||
# Start cleanup task in a nursery
|
||||
async with trio.open_nursery() as nursery:
|
||||
# Run the cleanup task with a short interval so it runs soon
|
||||
nursery.start_soon(store.start_cleanup_task, 1)
|
||||
|
||||
# Sleep long enough for TTL to expire and cleanup to run
|
||||
await trio.sleep(3)
|
||||
|
||||
# Cancel the nursery to stop background tasks
|
||||
nursery.cancel_scope.cancel()
|
||||
|
||||
# Confirm the peer data is gone from the peer_data_map
|
||||
assert peer_id not in store.peer_data_map
|
||||
|
||||
Reference in New Issue
Block a user