mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 08:00:54 +00:00
Integrated Signed-peer-record transfer with identify/identify-push
This commit is contained in:
@ -14,6 +14,8 @@ from libp2p.identity.identify.identify import (
|
|||||||
identify_handler_for,
|
identify_handler_for,
|
||||||
parse_identify_response,
|
parse_identify_response,
|
||||||
)
|
)
|
||||||
|
from libp2p.identity.identify.pb.identify_pb2 import Identify
|
||||||
|
from libp2p.peer.envelope import debug_dump_envelope, unmarshal_envelope
|
||||||
from libp2p.peer.peerinfo import (
|
from libp2p.peer.peerinfo import (
|
||||||
info_from_p2p_addr,
|
info_from_p2p_addr,
|
||||||
)
|
)
|
||||||
@ -32,10 +34,11 @@ def decode_multiaddrs(raw_addrs):
|
|||||||
return decoded_addrs
|
return decoded_addrs
|
||||||
|
|
||||||
|
|
||||||
def print_identify_response(identify_response):
|
def print_identify_response(identify_response: Identify):
|
||||||
"""Pretty-print Identify response."""
|
"""Pretty-print Identify response."""
|
||||||
public_key_b64 = base64.b64encode(identify_response.public_key).decode("utf-8")
|
public_key_b64 = base64.b64encode(identify_response.public_key).decode("utf-8")
|
||||||
listen_addrs = decode_multiaddrs(identify_response.listen_addrs)
|
listen_addrs = decode_multiaddrs(identify_response.listen_addrs)
|
||||||
|
signed_peer_record = unmarshal_envelope(identify_response.signedPeerRecord)
|
||||||
try:
|
try:
|
||||||
observed_addr_decoded = decode_multiaddrs([identify_response.observed_addr])
|
observed_addr_decoded = decode_multiaddrs([identify_response.observed_addr])
|
||||||
except Exception:
|
except Exception:
|
||||||
@ -51,6 +54,8 @@ def print_identify_response(identify_response):
|
|||||||
f" Agent Version: {identify_response.agent_version}"
|
f" Agent Version: {identify_response.agent_version}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
debug_dump_envelope(signed_peer_record)
|
||||||
|
|
||||||
|
|
||||||
async def run(port: int, destination: str, use_varint_format: bool = True) -> None:
|
async def run(port: int, destination: str, use_varint_format: bool = True) -> None:
|
||||||
localhost_ip = "0.0.0.0"
|
localhost_ip = "0.0.0.0"
|
||||||
@ -61,6 +66,7 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No
|
|||||||
host_a = new_host()
|
host_a = new_host()
|
||||||
|
|
||||||
# Set up identify handler with specified format
|
# Set up identify handler with specified format
|
||||||
|
# Set use_varint_format = False, if want to checkout the Signed-PeerRecord
|
||||||
identify_handler = identify_handler_for(
|
identify_handler = identify_handler_for(
|
||||||
host_a, use_varint_format=use_varint_format
|
host_a, use_varint_format=use_varint_format
|
||||||
)
|
)
|
||||||
@ -238,9 +244,9 @@ def main() -> None:
|
|||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
# Determine format: raw format if --raw-format is specified, otherwise
|
# Determine format: use varint (length-prefixed) if --raw-format is specified,
|
||||||
# length-prefixed
|
# otherwise use raw protobuf format (old format)
|
||||||
use_varint_format = not args.raw_format
|
use_varint_format = args.raw_format
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if args.destination:
|
if args.destination:
|
||||||
|
|||||||
@ -15,6 +15,8 @@ from libp2p.custom_types import (
|
|||||||
from libp2p.network.stream.exceptions import (
|
from libp2p.network.stream.exceptions import (
|
||||||
StreamClosed,
|
StreamClosed,
|
||||||
)
|
)
|
||||||
|
from libp2p.peer.envelope import seal_record
|
||||||
|
from libp2p.peer.peer_record import PeerRecord
|
||||||
from libp2p.utils import (
|
from libp2p.utils import (
|
||||||
decode_varint_with_size,
|
decode_varint_with_size,
|
||||||
get_agent_version,
|
get_agent_version,
|
||||||
@ -63,6 +65,11 @@ def _mk_identify_protobuf(
|
|||||||
laddrs = host.get_addrs()
|
laddrs = host.get_addrs()
|
||||||
protocols = tuple(str(p) for p in host.get_mux().get_protocols() if p is not None)
|
protocols = tuple(str(p) for p in host.get_mux().get_protocols() if p is not None)
|
||||||
|
|
||||||
|
# Create a signed peer-record for the remote peer
|
||||||
|
record = PeerRecord(host.get_id(), host.get_addrs())
|
||||||
|
envelope = seal_record(record, host.get_private_key())
|
||||||
|
protobuf = envelope.marshal_envelope()
|
||||||
|
|
||||||
observed_addr = observed_multiaddr.to_bytes() if observed_multiaddr else b""
|
observed_addr = observed_multiaddr.to_bytes() if observed_multiaddr else b""
|
||||||
return Identify(
|
return Identify(
|
||||||
protocol_version=PROTOCOL_VERSION,
|
protocol_version=PROTOCOL_VERSION,
|
||||||
@ -71,6 +78,7 @@ def _mk_identify_protobuf(
|
|||||||
listen_addrs=map(_multiaddr_to_bytes, laddrs),
|
listen_addrs=map(_multiaddr_to_bytes, laddrs),
|
||||||
observed_addr=observed_addr,
|
observed_addr=observed_addr,
|
||||||
protocols=protocols,
|
protocols=protocols,
|
||||||
|
signedPeerRecord=protobuf,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -9,4 +9,5 @@ message Identify {
|
|||||||
repeated bytes listen_addrs = 2;
|
repeated bytes listen_addrs = 2;
|
||||||
optional bytes observed_addr = 4;
|
optional bytes observed_addr = 4;
|
||||||
repeated string protocols = 3;
|
repeated string protocols = 3;
|
||||||
|
optional bytes signedPeerRecord = 8;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,11 +1,12 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
# Generated by the protocol buffer compiler. DO NOT EDIT!
|
# Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||||
# source: libp2p/identity/identify/pb/identify.proto
|
# source: libp2p/identity/identify/pb/identify.proto
|
||||||
|
# Protobuf Python Version: 4.25.3
|
||||||
"""Generated protocol buffer code."""
|
"""Generated protocol buffer code."""
|
||||||
from google.protobuf.internal import builder as _builder
|
|
||||||
from google.protobuf import descriptor as _descriptor
|
from google.protobuf import descriptor as _descriptor
|
||||||
from google.protobuf import descriptor_pool as _descriptor_pool
|
from google.protobuf import descriptor_pool as _descriptor_pool
|
||||||
from google.protobuf import symbol_database as _symbol_database
|
from google.protobuf import symbol_database as _symbol_database
|
||||||
|
from google.protobuf.internal import builder as _builder
|
||||||
# @@protoc_insertion_point(imports)
|
# @@protoc_insertion_point(imports)
|
||||||
|
|
||||||
_sym_db = _symbol_database.Default()
|
_sym_db = _symbol_database.Default()
|
||||||
@ -13,13 +14,13 @@ _sym_db = _symbol_database.Default()
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n*libp2p/identity/identify/pb/identify.proto\x12\x0bidentify.pb\"\x8f\x01\n\x08Identify\x12\x18\n\x10protocol_version\x18\x05 \x01(\t\x12\x15\n\ragent_version\x18\x06 \x01(\t\x12\x12\n\npublic_key\x18\x01 \x01(\x0c\x12\x14\n\x0clisten_addrs\x18\x02 \x03(\x0c\x12\x15\n\robserved_addr\x18\x04 \x01(\x0c\x12\x11\n\tprotocols\x18\x03 \x03(\t')
|
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n*libp2p/identity/identify/pb/identify.proto\x12\x0bidentify.pb\"\xa9\x01\n\x08Identify\x12\x18\n\x10protocol_version\x18\x05 \x01(\t\x12\x15\n\ragent_version\x18\x06 \x01(\t\x12\x12\n\npublic_key\x18\x01 \x01(\x0c\x12\x14\n\x0clisten_addrs\x18\x02 \x03(\x0c\x12\x15\n\robserved_addr\x18\x04 \x01(\x0c\x12\x11\n\tprotocols\x18\x03 \x03(\t\x12\x18\n\x10signedPeerRecord\x18\x08 \x01(\x0c')
|
||||||
|
|
||||||
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
|
_globals = globals()
|
||||||
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.identity.identify.pb.identify_pb2', globals())
|
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
|
||||||
|
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.identity.identify.pb.identify_pb2', _globals)
|
||||||
if _descriptor._USE_C_DESCRIPTORS == False:
|
if _descriptor._USE_C_DESCRIPTORS == False:
|
||||||
|
|
||||||
DESCRIPTOR._options = None
|
DESCRIPTOR._options = None
|
||||||
_IDENTIFY._serialized_start=60
|
_globals['_IDENTIFY']._serialized_start=60
|
||||||
_IDENTIFY._serialized_end=203
|
_globals['_IDENTIFY']._serialized_end=229
|
||||||
# @@protoc_insertion_point(module_scope)
|
# @@protoc_insertion_point(module_scope)
|
||||||
|
|||||||
@ -1,46 +1,24 @@
|
|||||||
"""
|
from google.protobuf.internal import containers as _containers
|
||||||
@generated by mypy-protobuf. Do not edit manually!
|
from google.protobuf import descriptor as _descriptor
|
||||||
isort:skip_file
|
from google.protobuf import message as _message
|
||||||
"""
|
from typing import ClassVar as _ClassVar, Iterable as _Iterable, Optional as _Optional
|
||||||
|
|
||||||
import builtins
|
DESCRIPTOR: _descriptor.FileDescriptor
|
||||||
import collections.abc
|
|
||||||
import google.protobuf.descriptor
|
|
||||||
import google.protobuf.internal.containers
|
|
||||||
import google.protobuf.message
|
|
||||||
import typing
|
|
||||||
|
|
||||||
DESCRIPTOR: google.protobuf.descriptor.FileDescriptor
|
class Identify(_message.Message):
|
||||||
|
__slots__ = ("protocol_version", "agent_version", "public_key", "listen_addrs", "observed_addr", "protocols", "signedPeerRecord")
|
||||||
@typing.final
|
PROTOCOL_VERSION_FIELD_NUMBER: _ClassVar[int]
|
||||||
class Identify(google.protobuf.message.Message):
|
AGENT_VERSION_FIELD_NUMBER: _ClassVar[int]
|
||||||
DESCRIPTOR: google.protobuf.descriptor.Descriptor
|
PUBLIC_KEY_FIELD_NUMBER: _ClassVar[int]
|
||||||
|
LISTEN_ADDRS_FIELD_NUMBER: _ClassVar[int]
|
||||||
PROTOCOL_VERSION_FIELD_NUMBER: builtins.int
|
OBSERVED_ADDR_FIELD_NUMBER: _ClassVar[int]
|
||||||
AGENT_VERSION_FIELD_NUMBER: builtins.int
|
PROTOCOLS_FIELD_NUMBER: _ClassVar[int]
|
||||||
PUBLIC_KEY_FIELD_NUMBER: builtins.int
|
SIGNEDPEERRECORD_FIELD_NUMBER: _ClassVar[int]
|
||||||
LISTEN_ADDRS_FIELD_NUMBER: builtins.int
|
protocol_version: str
|
||||||
OBSERVED_ADDR_FIELD_NUMBER: builtins.int
|
agent_version: str
|
||||||
PROTOCOLS_FIELD_NUMBER: builtins.int
|
public_key: bytes
|
||||||
protocol_version: builtins.str
|
listen_addrs: _containers.RepeatedScalarFieldContainer[bytes]
|
||||||
agent_version: builtins.str
|
observed_addr: bytes
|
||||||
public_key: builtins.bytes
|
protocols: _containers.RepeatedScalarFieldContainer[str]
|
||||||
observed_addr: builtins.bytes
|
signedPeerRecord: bytes
|
||||||
@property
|
def __init__(self, protocol_version: _Optional[str] = ..., agent_version: _Optional[str] = ..., public_key: _Optional[bytes] = ..., listen_addrs: _Optional[_Iterable[bytes]] = ..., observed_addr: _Optional[bytes] = ..., protocols: _Optional[_Iterable[str]] = ..., signedPeerRecord: _Optional[bytes] = ...) -> None: ...
|
||||||
def listen_addrs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.bytes]: ...
|
|
||||||
@property
|
|
||||||
def protocols(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
*,
|
|
||||||
protocol_version: builtins.str | None = ...,
|
|
||||||
agent_version: builtins.str | None = ...,
|
|
||||||
public_key: builtins.bytes | None = ...,
|
|
||||||
listen_addrs: collections.abc.Iterable[builtins.bytes] | None = ...,
|
|
||||||
observed_addr: builtins.bytes | None = ...,
|
|
||||||
protocols: collections.abc.Iterable[builtins.str] | None = ...,
|
|
||||||
) -> None: ...
|
|
||||||
def HasField(self, field_name: typing.Literal["agent_version", b"agent_version", "observed_addr", b"observed_addr", "protocol_version", b"protocol_version", "public_key", b"public_key"]) -> builtins.bool: ...
|
|
||||||
def ClearField(self, field_name: typing.Literal["agent_version", b"agent_version", "listen_addrs", b"listen_addrs", "observed_addr", b"observed_addr", "protocol_version", b"protocol_version", "protocols", b"protocols", "public_key", b"public_key"]) -> None: ...
|
|
||||||
|
|
||||||
global___Identify = Identify
|
|
||||||
|
|||||||
@ -20,6 +20,7 @@ from libp2p.custom_types import (
|
|||||||
from libp2p.network.stream.exceptions import (
|
from libp2p.network.stream.exceptions import (
|
||||||
StreamClosed,
|
StreamClosed,
|
||||||
)
|
)
|
||||||
|
from libp2p.peer.envelope import consume_envelope
|
||||||
from libp2p.peer.id import (
|
from libp2p.peer.id import (
|
||||||
ID,
|
ID,
|
||||||
)
|
)
|
||||||
@ -150,6 +151,19 @@ async def _update_peerstore_from_identify(
|
|||||||
peerstore.add_addr(peer_id, observed_addr, 7200)
|
peerstore.add_addr(peer_id, observed_addr, 7200)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Error updating observed address for peer %s: %s", peer_id, e)
|
logger.error("Error updating observed address for peer %s: %s", peer_id, e)
|
||||||
|
if identify_msg.HasField("signedPeerRecord"):
|
||||||
|
try:
|
||||||
|
# Convert the signed-peer-record(Envelope) from prtobuf bytes
|
||||||
|
envelope, _ = consume_envelope(
|
||||||
|
identify_msg.signedPeerRecord, "libp2p-peer-record"
|
||||||
|
)
|
||||||
|
# Use a default TTL of 2 hours (7200 seconds)
|
||||||
|
if not peerstore.consume_peer_record(envelope, 7200):
|
||||||
|
logger.error("Updating Certified-Addr-Book was unsuccessful")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(
|
||||||
|
"Error updating the certified addr book for peer %s: %s", peer_id, e
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def push_identify_to_peer(
|
async def push_identify_to_peer(
|
||||||
|
|||||||
@ -7,7 +7,11 @@ from libp2p.crypto.secp256k1 import Secp256k1PublicKey
|
|||||||
import libp2p.peer.pb.crypto_pb2 as cryto_pb
|
import libp2p.peer.pb.crypto_pb2 as cryto_pb
|
||||||
import libp2p.peer.pb.envelope_pb2 as pb
|
import libp2p.peer.pb.envelope_pb2 as pb
|
||||||
import libp2p.peer.pb.peer_record_pb2 as record_pb
|
import libp2p.peer.pb.peer_record_pb2 as record_pb
|
||||||
from libp2p.peer.peer_record import PeerRecord, peer_record_from_protobuf
|
from libp2p.peer.peer_record import (
|
||||||
|
PeerRecord,
|
||||||
|
peer_record_from_protobuf,
|
||||||
|
unmarshal_record,
|
||||||
|
)
|
||||||
from libp2p.utils.varint import encode_uvarint
|
from libp2p.utils.varint import encode_uvarint
|
||||||
|
|
||||||
ENVELOPE_DOMAIN = "libp2p-peer-record"
|
ENVELOPE_DOMAIN = "libp2p-peer-record"
|
||||||
@ -251,3 +255,17 @@ def make_unsigned(domain: str, payload_type: bytes, payload: bytes) -> bytes:
|
|||||||
buf.extend(field)
|
buf.extend(field)
|
||||||
|
|
||||||
return bytes(buf)
|
return bytes(buf)
|
||||||
|
|
||||||
|
|
||||||
|
def debug_dump_envelope(env: Envelope) -> None:
|
||||||
|
print("\n=== Envelope ===")
|
||||||
|
print(f"Payload Type: {env.payload_type!r}")
|
||||||
|
print(f"Signature: {env.signature.hex()} ({len(env.signature)} bytes)")
|
||||||
|
print(f"Raw Payload: {env.raw_payload.hex()} ({len(env.raw_payload)} bytes)")
|
||||||
|
|
||||||
|
try:
|
||||||
|
peer_record = unmarshal_record(env.raw_payload)
|
||||||
|
print("\n=== Parsed PeerRecord ===")
|
||||||
|
print(peer_record)
|
||||||
|
except Exception as e:
|
||||||
|
print("Failed to parse PeerRecord:", e)
|
||||||
|
|||||||
@ -58,6 +58,15 @@ class PeerRecord(IPeerRecord):
|
|||||||
else:
|
else:
|
||||||
self.seq = timestamp_seq()
|
self.seq = timestamp_seq()
|
||||||
|
|
||||||
|
def __repr__(self) -> str:
|
||||||
|
return (
|
||||||
|
f"PeerRecord(\n"
|
||||||
|
f" peer_id={self.peer_id},\n"
|
||||||
|
f" multiaddrs={[str(m) for m in self.addrs]},\n"
|
||||||
|
f" seq={self.seq}\n"
|
||||||
|
f")"
|
||||||
|
)
|
||||||
|
|
||||||
def domain(self) -> str:
|
def domain(self) -> str:
|
||||||
"""
|
"""
|
||||||
Return the domain string associated with this PeerRecord.
|
Return the domain string associated with this PeerRecord.
|
||||||
|
|||||||
@ -13,6 +13,8 @@ from libp2p.identity.identify.identify import (
|
|||||||
_multiaddr_to_bytes,
|
_multiaddr_to_bytes,
|
||||||
parse_identify_response,
|
parse_identify_response,
|
||||||
)
|
)
|
||||||
|
from libp2p.peer.envelope import Envelope, consume_envelope, unmarshal_envelope
|
||||||
|
from libp2p.peer.peer_record import unmarshal_record
|
||||||
from tests.utils.factories import (
|
from tests.utils.factories import (
|
||||||
host_pair_factory,
|
host_pair_factory,
|
||||||
)
|
)
|
||||||
@ -40,6 +42,19 @@ async def test_identify_protocol(security_protocol):
|
|||||||
# Parse the response (handles both old and new formats)
|
# Parse the response (handles both old and new formats)
|
||||||
identify_response = parse_identify_response(response)
|
identify_response = parse_identify_response(response)
|
||||||
|
|
||||||
|
# Validate the recieved envelope and then store it in the certified-addr-book
|
||||||
|
envelope, record = consume_envelope(
|
||||||
|
identify_response.signedPeerRecord, "libp2p-peer-record"
|
||||||
|
)
|
||||||
|
assert host_b.peerstore.consume_peer_record(envelope, ttl=7200)
|
||||||
|
|
||||||
|
# Check if the peer_id in the record is same as of host_a
|
||||||
|
assert record.peer_id == host_a.get_id()
|
||||||
|
|
||||||
|
# Check if the peer-record is correctly consumed
|
||||||
|
assert host_a.get_addrs() == host_b.peerstore.addrs(host_a.get_id())
|
||||||
|
assert isinstance(host_b.peerstore.get_peer_record(host_a.get_id()), Envelope)
|
||||||
|
|
||||||
logger.debug("host_a: %s", host_a.get_addrs())
|
logger.debug("host_a: %s", host_a.get_addrs())
|
||||||
logger.debug("host_b: %s", host_b.get_addrs())
|
logger.debug("host_b: %s", host_b.get_addrs())
|
||||||
|
|
||||||
@ -71,5 +86,14 @@ async def test_identify_protocol(security_protocol):
|
|||||||
# Check protocols
|
# Check protocols
|
||||||
assert set(identify_response.protocols) == set(host_a.get_mux().get_protocols())
|
assert set(identify_response.protocols) == set(host_a.get_mux().get_protocols())
|
||||||
|
|
||||||
# sanity check
|
# sanity check if the peer_id of the identify msg are same
|
||||||
assert identify_response == _mk_identify_protobuf(host_a, cleaned_addr)
|
assert (
|
||||||
|
unmarshal_record(
|
||||||
|
unmarshal_envelope(identify_response.signedPeerRecord).raw_payload
|
||||||
|
).peer_id
|
||||||
|
== unmarshal_record(
|
||||||
|
unmarshal_envelope(
|
||||||
|
_mk_identify_protobuf(host_a, cleaned_addr).signedPeerRecord
|
||||||
|
).raw_payload
|
||||||
|
).peer_id
|
||||||
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user