signed-peer-record transfer integrated with pubsub rpc message trasfer

This commit is contained in:
lla-dane
2025-08-11 18:27:11 +05:30
parent 5c11ac20e7
commit 56526b4870
6 changed files with 266 additions and 346 deletions

View File

@ -50,12 +50,14 @@ from libp2p.network.stream.exceptions import (
StreamEOF,
StreamReset,
)
from libp2p.peer.envelope import consume_envelope
from libp2p.peer.id import (
ID,
)
from libp2p.peer.peerdata import (
PeerDataError,
)
from libp2p.peer.peerstore import create_signed_peer_record
from libp2p.tools.async_service import (
Service,
)
@ -247,6 +249,14 @@ class Pubsub(Service, IPubsub):
packet.subscriptions.extend(
[rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)]
)
# Add the sender's signedRecord in the RPC message
envelope = create_signed_peer_record(
self.host.get_id(),
self.host.get_addrs(),
self.host.get_private_key(),
)
packet.senderRecord = envelope.marshal_envelope()
return packet
async def continuously_read_stream(self, stream: INetStream) -> None:
@ -263,6 +273,27 @@ class Pubsub(Service, IPubsub):
incoming: bytes = await read_varint_prefixed_bytes(stream)
rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC()
rpc_incoming.ParseFromString(incoming)
# Process the sender's signed-record if sent
if rpc_incoming.HasField("senderRecord"):
try:
# Convert the signed-peer-record(Envelope) from
# protobuf bytes
envelope, _ = consume_envelope(
rpc_incoming.senderRecord, "libp2p-peer-record"
)
# Use the default TTL of 2 hours (7200 seconds)
if self.host.get_peerstore().consume_peer_record(
envelope, 7200
):
logger.error(
"Updating the Certified-Addr-Book was unsuccessful"
)
except Exception as e:
logger.error(
"Error updating the certified addr book for peer: %s", e
)
if rpc_incoming.publish:
# deal with RPC.publish
for msg in rpc_incoming.publish:
@ -572,6 +603,14 @@ class Pubsub(Service, IPubsub):
[rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)]
)
# Add the senderRecord of the peer in the RPC msg
envelope = create_signed_peer_record(
self.host.get_id(),
self.host.get_addrs(),
self.host.get_private_key(),
)
packet.senderRecord = envelope.marshal_envelope()
# Send out subscribe message to all peers
await self.message_all_peers(packet.SerializeToString())
@ -604,6 +643,13 @@ class Pubsub(Service, IPubsub):
packet.subscriptions.extend(
[rpc_pb2.RPC.SubOpts(subscribe=False, topicid=topic_id)]
)
# Add the senderRecord of the peer in the RPC msg
envelope = create_signed_peer_record(
self.host.get_id(),
self.host.get_addrs(),
self.host.get_private_key(),
)
packet.senderRecord = envelope.marshal_envelope()
# Send out unsubscribe message to all peers
await self.message_all_peers(packet.SerializeToString())