diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 170f558d..f0e09404 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -15,7 +15,7 @@ from libp2p.custom_types import ( from libp2p.peer.id import ( ID, ) -from libp2p.peer.peerstore import create_signed_peer_record +from libp2p.pubsub.utils import env_to_send_in_RPC from .exceptions import ( PubsubRouterError, @@ -106,12 +106,8 @@ class FloodSub(IPubsubRouter): # Add the senderRecord of the peer in the RPC msg if isinstance(self.pubsub, Pubsub): - envelope = create_signed_peer_record( - self.pubsub.host.get_id(), - self.pubsub.host.get_addrs(), - self.pubsub.host.get_private_key(), - ) - rpc_msg.senderRecord = envelope.marshal_envelope() + envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host) + rpc_msg.senderRecord = envelope_bytes logger.debug("publishing message %s", pubsub_msg) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index b7c70c55..fa221a0f 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -24,7 +24,6 @@ from libp2p.abc import ( from libp2p.custom_types import ( TProtocol, ) -from libp2p.peer.envelope import consume_envelope from libp2p.peer.id import ( ID, ) @@ -35,11 +34,11 @@ from libp2p.peer.peerinfo import ( ) from libp2p.peer.peerstore import ( PERMANENT_ADDR_TTL, - create_signed_peer_record, ) from libp2p.pubsub import ( floodsub, ) +from libp2p.pubsub.utils import env_to_send_in_RPC, maybe_consume_signed_record from libp2p.tools.async_service import ( Service, ) @@ -230,24 +229,7 @@ class GossipSub(IPubsubRouter, Service): """ # Process the senderRecord if sent if isinstance(self.pubsub, Pubsub): - if rpc.HasField("senderRecord"): - try: - # Convert the signed-peer-record(Envelope) from - # protobuf bytes - envelope, _ = consume_envelope( - rpc.senderRecord, "libp2p-peer-record" - ) - # Use the default TTL of 2 hours (7200 seconds) - if self.pubsub.host.get_peerstore().consume_peer_record( - envelope, 7200 - ): - logger.error( - "Updating the Certified-Addr-Book was unsuccessful" - ) - except Exception as e: - logger.error( - "Error updating the certified addr book for peer: %s", e - ) + _ = maybe_consume_signed_record(rpc, self.pubsub.host) control_message = rpc.control @@ -278,12 +260,8 @@ class GossipSub(IPubsubRouter, Service): # Add the senderRecord of the peer in the RPC msg if isinstance(self.pubsub, Pubsub): - envelope = create_signed_peer_record( - self.pubsub.host.get_id(), - self.pubsub.host.get_addrs(), - self.pubsub.host.get_private_key(), - ) - rpc_msg.senderRecord = envelope.marshal_envelope() + envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host) + rpc_msg.senderRecord = envelope_bytes logger.debug("publishing message %s", pubsub_msg) @@ -854,12 +832,8 @@ class GossipSub(IPubsubRouter, Service): # to the iwant control msg, so we will send a freshly created senderRecord # with the RPC msg if isinstance(self.pubsub, Pubsub): - envelope = create_signed_peer_record( - self.pubsub.host.get_id(), - self.pubsub.host.get_addrs(), - self.pubsub.host.get_private_key(), - ) - packet.senderRecord = envelope.marshal_envelope() + envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host) + packet.senderRecord = envelope_bytes packet.publish.extend(msgs_to_forward) @@ -1019,12 +993,8 @@ class GossipSub(IPubsubRouter, Service): # Add the sender's peer-record in the RPC msg if isinstance(self.pubsub, Pubsub): - envelope = create_signed_peer_record( - self.pubsub.host.get_id(), - self.pubsub.host.get_addrs(), - self.pubsub.host.get_private_key(), - ) - packet.senderRecord = envelope.marshal_envelope() + envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host) + packet.senderRecord = envelope_bytes packet.control.CopyFrom(control_msg) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 54430f1b..cbaaafb5 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -50,14 +50,13 @@ from libp2p.network.stream.exceptions import ( StreamEOF, StreamReset, ) -from libp2p.peer.envelope import consume_envelope from libp2p.peer.id import ( ID, ) from libp2p.peer.peerdata import ( PeerDataError, ) -from libp2p.peer.peerstore import create_signed_peer_record +from libp2p.pubsub.utils import env_to_send_in_RPC, maybe_consume_signed_record from libp2p.tools.async_service import ( Service, ) @@ -250,12 +249,8 @@ class Pubsub(Service, IPubsub): [rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)] ) # Add the sender's signedRecord in the RPC message - envelope = create_signed_peer_record( - self.host.get_id(), - self.host.get_addrs(), - self.host.get_private_key(), - ) - packet.senderRecord = envelope.marshal_envelope() + envelope_bytes, bool = env_to_send_in_RPC(self.host) + packet.senderRecord = envelope_bytes return packet @@ -275,24 +270,7 @@ class Pubsub(Service, IPubsub): rpc_incoming.ParseFromString(incoming) # Process the sender's signed-record if sent - if rpc_incoming.HasField("senderRecord"): - try: - # Convert the signed-peer-record(Envelope) from - # protobuf bytes - envelope, _ = consume_envelope( - rpc_incoming.senderRecord, "libp2p-peer-record" - ) - # Use the default TTL of 2 hours (7200 seconds) - if self.host.get_peerstore().consume_peer_record( - envelope, 7200 - ): - logger.error( - "Updating the Certified-Addr-Book was unsuccessful" - ) - except Exception as e: - logger.error( - "Error updating the certified addr book for peer: %s", e - ) + _ = maybe_consume_signed_record(rpc_incoming, self.host) if rpc_incoming.publish: # deal with RPC.publish @@ -604,13 +582,8 @@ class Pubsub(Service, IPubsub): ) # Add the senderRecord of the peer in the RPC msg - envelope = create_signed_peer_record( - self.host.get_id(), - self.host.get_addrs(), - self.host.get_private_key(), - ) - packet.senderRecord = envelope.marshal_envelope() - + envelope_bytes, bool = env_to_send_in_RPC(self.host) + packet.senderRecord = envelope_bytes # Send out subscribe message to all peers await self.message_all_peers(packet.SerializeToString()) @@ -644,12 +617,8 @@ class Pubsub(Service, IPubsub): [rpc_pb2.RPC.SubOpts(subscribe=False, topicid=topic_id)] ) # Add the senderRecord of the peer in the RPC msg - envelope = create_signed_peer_record( - self.host.get_id(), - self.host.get_addrs(), - self.host.get_private_key(), - ) - packet.senderRecord = envelope.marshal_envelope() + envelope_bytes, bool = env_to_send_in_RPC(self.host) + packet.senderRecord = envelope_bytes # Send out unsubscribe message to all peers await self.message_all_peers(packet.SerializeToString()) diff --git a/libp2p/pubsub/utils.py b/libp2p/pubsub/utils.py new file mode 100644 index 00000000..163a2870 --- /dev/null +++ b/libp2p/pubsub/utils.py @@ -0,0 +1,51 @@ +import logging + +from libp2p.abc import IHost +from libp2p.peer.envelope import consume_envelope +from libp2p.peer.peerstore import create_signed_peer_record +from libp2p.pubsub.pb.rpc_pb2 import RPC + +logger = logging.getLogger("pubsub-example.utils") + + +def maybe_consume_signed_record(msg: RPC, host: IHost) -> bool: + if msg.HasField("senderRecord"): + try: + # Convert the signed-peer-record(Envelope) from + # protobuf bytes + envelope, _ = consume_envelope(msg.senderRecord, "libp2p-peer-record") + # Use the default TTL of 2 hours (7200 seconds) + if not host.get_peerstore().consume_peer_record(envelope, 7200): + logger.error("Updating the certified-addr-book was unsuccessful") + except Exception as e: + logger.error("Error updating the certified addr book for peer: %s", e) + return False + return True + + +def env_to_send_in_RPC(host: IHost) -> tuple[bytes, bool]: + listen_addrs_set = {addr for addr in host.get_addrs()} + local_env = host.get_peerstore().get_local_record() + + if local_env is None: + # No cached SPR yet -> create one + return issue_and_cache_local_record(host), True + else: + record_addrs_set = local_env._env_addrs_set() + if record_addrs_set == listen_addrs_set: + # Perfect match -> reuse the cached envelope + return local_env.marshal_envelope(), False + else: + # Addresses changed -> issue a new SPR and cache it + return issue_and_cache_local_record(host), True + + +def issue_and_cache_local_record(host: IHost) -> bytes: + env = create_signed_peer_record( + host.get_id(), + host.get_addrs(), + host.get_private_key(), + ) + # Cache it for next time + host.get_peerstore().set_local_record(env) + return env.marshal_envelope()