diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index f0e09404..8167581d 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -15,7 +15,7 @@ from libp2p.custom_types import ( from libp2p.peer.id import ( ID, ) -from libp2p.pubsub.utils import env_to_send_in_RPC +from libp2p.peer.peerstore import env_to_send_in_RPC from .exceptions import ( PubsubRouterError, @@ -106,7 +106,7 @@ class FloodSub(IPubsubRouter): # Add the senderRecord of the peer in the RPC msg if isinstance(self.pubsub, Pubsub): - envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host) + envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host) rpc_msg.senderRecord = envelope_bytes logger.debug("publishing message %s", pubsub_msg) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index aaf0b2fa..a4c8c463 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -34,11 +34,12 @@ from libp2p.peer.peerinfo import ( ) from libp2p.peer.peerstore import ( PERMANENT_ADDR_TTL, + env_to_send_in_RPC, ) from libp2p.pubsub import ( floodsub, ) -from libp2p.pubsub.utils import env_to_send_in_RPC, maybe_consume_signed_record +from libp2p.pubsub.utils import maybe_consume_signed_record from libp2p.tools.async_service import ( Service, ) @@ -229,7 +230,7 @@ class GossipSub(IPubsubRouter, Service): """ # Process the senderRecord if sent if isinstance(self.pubsub, Pubsub): - if not maybe_consume_signed_record(rpc, self.pubsub.host): + if not maybe_consume_signed_record(rpc, self.pubsub.host, sender_peer_id): logger.error("Received an invalid-signed-record, ignoring the message") return @@ -262,7 +263,7 @@ class GossipSub(IPubsubRouter, Service): # Add the senderRecord of the peer in the RPC msg if isinstance(self.pubsub, Pubsub): - envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host) + envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host) rpc_msg.senderRecord = envelope_bytes logger.debug("publishing message %s", pubsub_msg) @@ -834,7 +835,7 @@ class GossipSub(IPubsubRouter, Service): # to the iwant control msg, so we will send a freshly created senderRecord # with the RPC msg if isinstance(self.pubsub, Pubsub): - envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host) + envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host) packet.senderRecord = envelope_bytes packet.publish.extend(msgs_to_forward) @@ -995,7 +996,7 @@ class GossipSub(IPubsubRouter, Service): # Add the sender's peer-record in the RPC msg if isinstance(self.pubsub, Pubsub): - envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host) + envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host) packet.senderRecord = envelope_bytes packet.control.CopyFrom(control_msg) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 3200c73a..2c605fc3 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -56,7 +56,8 @@ from libp2p.peer.id import ( from libp2p.peer.peerdata import ( PeerDataError, ) -from libp2p.pubsub.utils import env_to_send_in_RPC, maybe_consume_signed_record +from libp2p.peer.peerstore import env_to_send_in_RPC +from libp2p.pubsub.utils import maybe_consume_signed_record from libp2p.tools.async_service import ( Service, ) @@ -249,7 +250,7 @@ class Pubsub(Service, IPubsub): [rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)] ) # Add the sender's signedRecord in the RPC message - envelope_bytes, bool = env_to_send_in_RPC(self.host) + envelope_bytes, _ = env_to_send_in_RPC(self.host) packet.senderRecord = envelope_bytes return packet @@ -270,7 +271,7 @@ class Pubsub(Service, IPubsub): rpc_incoming.ParseFromString(incoming) # Process the sender's signed-record if sent - if not maybe_consume_signed_record(rpc_incoming, self.host): + if not maybe_consume_signed_record(rpc_incoming, self.host, peer_id): logger.error( "Received an invalid-signed-record, ignoring the incoming msg" ) @@ -586,7 +587,7 @@ class Pubsub(Service, IPubsub): ) # Add the senderRecord of the peer in the RPC msg - envelope_bytes, bool = env_to_send_in_RPC(self.host) + envelope_bytes, _ = env_to_send_in_RPC(self.host) packet.senderRecord = envelope_bytes # Send out subscribe message to all peers await self.message_all_peers(packet.SerializeToString()) @@ -621,7 +622,7 @@ class Pubsub(Service, IPubsub): [rpc_pb2.RPC.SubOpts(subscribe=False, topicid=topic_id)] ) # Add the senderRecord of the peer in the RPC msg - envelope_bytes, bool = env_to_send_in_RPC(self.host) + envelope_bytes, _ = env_to_send_in_RPC(self.host) packet.senderRecord = envelope_bytes # Send out unsubscribe message to all peers diff --git a/libp2p/pubsub/utils.py b/libp2p/pubsub/utils.py index 163a2870..3a69becb 100644 --- a/libp2p/pubsub/utils.py +++ b/libp2p/pubsub/utils.py @@ -2,50 +2,49 @@ import logging from libp2p.abc import IHost from libp2p.peer.envelope import consume_envelope -from libp2p.peer.peerstore import create_signed_peer_record +from libp2p.peer.id import ID from libp2p.pubsub.pb.rpc_pb2 import RPC logger = logging.getLogger("pubsub-example.utils") -def maybe_consume_signed_record(msg: RPC, host: IHost) -> bool: +def maybe_consume_signed_record(msg: RPC, host: IHost, peer_id: ID) -> bool: + """ + Attempt to parse and store a signed-peer-record (Envelope) received during + PubSub communication. If the record is invalid, the peer-id does not match, or + updating the peerstore fails, the function logs an error and returns False. + + Parameters + ---------- + msg : RPC + The protobuf message received during PubSub communication. + host : IHost + The local host instance, providing access to the peerstore for storing + verified peer records. + peer_id : ID | None, optional + The expected peer ID for record validation. If provided, the peer ID + inside the record must match this value. + + Returns + ------- + bool + True if a valid signed peer record was successfully consumed and stored, + False otherwise. + + """ if msg.HasField("senderRecord"): try: # Convert the signed-peer-record(Envelope) from # protobuf bytes - envelope, _ = consume_envelope(msg.senderRecord, "libp2p-peer-record") + envelope, record = consume_envelope(msg.senderRecord, "libp2p-peer-record") + if not record.peer_id == peer_id: + return False + # Use the default TTL of 2 hours (7200 seconds) if not host.get_peerstore().consume_peer_record(envelope, 7200): logger.error("Updating the certified-addr-book was unsuccessful") + return False except Exception as e: logger.error("Error updating the certified addr book for peer: %s", e) return False return True - - -def env_to_send_in_RPC(host: IHost) -> tuple[bytes, bool]: - listen_addrs_set = {addr for addr in host.get_addrs()} - local_env = host.get_peerstore().get_local_record() - - if local_env is None: - # No cached SPR yet -> create one - return issue_and_cache_local_record(host), True - else: - record_addrs_set = local_env._env_addrs_set() - if record_addrs_set == listen_addrs_set: - # Perfect match -> reuse the cached envelope - return local_env.marshal_envelope(), False - else: - # Addresses changed -> issue a new SPR and cache it - return issue_and_cache_local_record(host), True - - -def issue_and_cache_local_record(host: IHost) -> bytes: - env = create_signed_peer_record( - host.get_id(), - host.get_addrs(), - host.get_private_key(), - ) - # Cache it for next time - host.get_peerstore().set_local_record(env) - return env.marshal_envelope() diff --git a/tests/core/pubsub/test_pubsub.py b/tests/core/pubsub/test_pubsub.py index 54bc67a1..9a09f34f 100644 --- a/tests/core/pubsub/test_pubsub.py +++ b/tests/core/pubsub/test_pubsub.py @@ -19,10 +19,11 @@ from libp2p.exceptions import ( from libp2p.network.stream.exceptions import ( StreamEOF, ) -from libp2p.peer.envelope import Envelope +from libp2p.peer.envelope import Envelope, seal_record from libp2p.peer.id import ( ID, ) +from libp2p.peer.peer_record import PeerRecord from libp2p.pubsub.pb import ( rpc_pb2, ) @@ -170,6 +171,8 @@ async def test_peer_subscribe_fail_upon_invald_record_transfer(): # Corrupt host_a's local peer record envelope = pubsubs_fsub[0].host.get_peerstore().get_local_record() + if envelope is not None: + true_record = envelope.record() key_pair = create_new_key_pair() if envelope is not None: @@ -183,6 +186,23 @@ async def test_peer_subscribe_fail_upon_invald_record_transfer(): TESTING_TOPIC, set() ) + # Create a corrupt envelope with correct signature but false peer-id + false_record = PeerRecord( + ID.from_pubkey(key_pair.public_key), true_record.addrs + ) + false_envelope = seal_record( + false_record, pubsubs_fsub[0].host.get_private_key() + ) + + pubsubs_fsub[0].host.get_peerstore().set_local_record(false_envelope) + + await pubsubs_fsub[0].subscribe(TESTING_TOPIC) + # Yeild to let 0 notify 1 + await trio.sleep(1) + assert pubsubs_fsub[0].my_id not in pubsubs_fsub[1].peer_topics.get( + TESTING_TOPIC, set() + ) + @pytest.mark.trio async def test_get_hello_packet():