From d99b67eafa3727f8730597860e3634ea629aeb7f Mon Sep 17 00:00:00 2001 From: lla-dane Date: Sun, 17 Aug 2025 13:53:25 +0530 Subject: [PATCH] now ignoring pubsub messages upon receving invalid-signed-records --- libp2p/pubsub/gossipsub.py | 4 +++- libp2p/pubsub/pubsub.py | 6 +++++- tests/core/pubsub/test_pubsub.py | 22 ++++++++++++++++++++++ 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index fa221a0f..aaf0b2fa 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -229,7 +229,9 @@ class GossipSub(IPubsubRouter, Service): """ # Process the senderRecord if sent if isinstance(self.pubsub, Pubsub): - _ = maybe_consume_signed_record(rpc, self.pubsub.host) + if not maybe_consume_signed_record(rpc, self.pubsub.host): + logger.error("Received an invalid-signed-record, ignoring the message") + return control_message = rpc.control diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index cbaaafb5..3200c73a 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -270,7 +270,11 @@ class Pubsub(Service, IPubsub): rpc_incoming.ParseFromString(incoming) # Process the sender's signed-record if sent - _ = maybe_consume_signed_record(rpc_incoming, self.host) + if not maybe_consume_signed_record(rpc_incoming, self.host): + logger.error( + "Received an invalid-signed-record, ignoring the incoming msg" + ) + continue if rpc_incoming.publish: # deal with RPC.publish diff --git a/tests/core/pubsub/test_pubsub.py b/tests/core/pubsub/test_pubsub.py index 179f359c..54bc67a1 100644 --- a/tests/core/pubsub/test_pubsub.py +++ b/tests/core/pubsub/test_pubsub.py @@ -11,6 +11,7 @@ import pytest import multiaddr import trio +from libp2p.crypto.rsa import create_new_key_pair from libp2p.custom_types import AsyncValidatorFn from libp2p.exceptions import ( ValidationError, @@ -162,6 +163,27 @@ async def test_peers_subscribe(): assert envelope_b_sub.record().seq == envelope_b_unsub.record().seq +@pytest.mark.trio +async def test_peer_subscribe_fail_upon_invald_record_transfer(): + async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub: + await connect(pubsubs_fsub[0].host, pubsubs_fsub[1].host) + + # Corrupt host_a's local peer record + envelope = pubsubs_fsub[0].host.get_peerstore().get_local_record() + key_pair = create_new_key_pair() + + if envelope is not None: + envelope.public_key = key_pair.public_key + pubsubs_fsub[0].host.get_peerstore().set_local_record(envelope) + + await pubsubs_fsub[0].subscribe(TESTING_TOPIC) + # Yeild to let 0 notify 1 + await trio.sleep(1) + assert pubsubs_fsub[0].my_id not in pubsubs_fsub[1].peer_topics.get( + TESTING_TOPIC, set() + ) + + @pytest.mark.trio async def test_get_hello_packet(): async with PubsubFactory.create_batch_with_floodsub(1) as pubsubs_fsub: