Merge branch 'main' into fix_pubsub_msg_id_type_inconsistency

This commit is contained in:
unniznd
2025-09-02 10:16:17 +05:30
16 changed files with 416 additions and 354 deletions

View File

@ -32,10 +32,12 @@ from libp2p.peer.peerinfo import (
)
from libp2p.peer.peerstore import (
PERMANENT_ADDR_TTL,
env_to_send_in_RPC,
)
from libp2p.pubsub import (
floodsub,
)
from libp2p.pubsub.utils import maybe_consume_signed_record
from libp2p.tools.async_service import (
Service,
)
@ -228,6 +230,12 @@ class GossipSub(IPubsubRouter, Service):
:param rpc: RPC message
:param sender_peer_id: id of the peer who sent the message
"""
# Process the senderRecord if sent
if isinstance(self.pubsub, Pubsub):
if not maybe_consume_signed_record(rpc, self.pubsub.host, sender_peer_id):
logger.error("Received an invalid-signed-record, ignoring the message")
return
control_message = rpc.control
# Relay each rpc control message to the appropriate handler
@ -255,6 +263,11 @@ class GossipSub(IPubsubRouter, Service):
)
rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg])
# Add the senderRecord of the peer in the RPC msg
if isinstance(self.pubsub, Pubsub):
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
rpc_msg.senderRecord = envelope_bytes
logger.debug("publishing message %s", pubsub_msg)
for peer_id in peers_gen:
@ -820,6 +833,13 @@ class GossipSub(IPubsubRouter, Service):
# 1) Package these messages into a single packet
packet: rpc_pb2.RPC = rpc_pb2.RPC()
# Here the an RPC message is being created and published in response
# to the iwant control msg, so we will send a freshly created senderRecord
# with the RPC msg
if isinstance(self.pubsub, Pubsub):
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
packet.senderRecord = envelope_bytes
packet.publish.extend(msgs_to_forward)
if self.pubsub is None:
@ -975,6 +995,12 @@ class GossipSub(IPubsubRouter, Service):
raise NoPubsubAttached
# Add control message to packet
packet: rpc_pb2.RPC = rpc_pb2.RPC()
# Add the sender's peer-record in the RPC msg
if isinstance(self.pubsub, Pubsub):
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
packet.senderRecord = envelope_bytes
packet.control.CopyFrom(control_msg)
# Get stream for peer from pubsub