diff --git a/libp2p/kad_dht/kad_dht.py b/libp2p/kad_dht/kad_dht.py index 78cf50e2..db0e635e 100644 --- a/libp2p/kad_dht/kad_dht.py +++ b/libp2p/kad_dht/kad_dht.py @@ -22,10 +22,11 @@ from libp2p.abc import ( IHost, ) from libp2p.discovery.random_walk.rt_refresh_manager import RTRefreshManager +from libp2p.kad_dht.utils import maybe_consume_signed_record from libp2p.network.stream.net_stream import ( INetStream, ) -from libp2p.peer.envelope import Envelope, consume_envelope +from libp2p.peer.envelope import Envelope from libp2p.peer.id import ( ID, ) @@ -280,24 +281,7 @@ class KadDHT(Service): logger.debug(f"Found {len(closest_peers)} peers close to target") # Consume the source signed_peer_record if sent - if message.HasField("senderRecord"): - try: - # Convert the signed-peer-record(Envelope) from - # protobuf bytes - envelope, _ = consume_envelope( - message.senderRecord, "libp2p-peer-record" - ) - # Use the defualt TTL of 2 hours (7200 seconds) - if not self.host.get_peerstore().consume_peer_record( - envelope, 7200 - ): - logger.error( - "Updating the Certified-Addr-Book was unsuccessful" - ) - except Exception as e: - logger.error( - "Error updating the certified addr book for peer: %s", e - ) + success = maybe_consume_signed_record(message, self.host) # Build response message with protobuf response = Message() @@ -357,24 +341,7 @@ class KadDHT(Service): logger.debug(f"Received ADD_PROVIDER for key {key.hex()}") # Consume the source signed-peer-record if sent - if message.HasField("senderRecord"): - try: - # Convert the signed-peer-record(Envelope) from - # protobuf bytes - envelope, _ = consume_envelope( - message.senderRecord, "libp2p-peer-record" - ) - # Use the default TTL of 2 hours (7200 seconds) - if not self.host.get_peerstore().consume_peer_record( - envelope, 7200 - ): - logger.error( - "Updating the Certified-Addr-Book was unsuccessful" - ) - except Exception as e: - logger.error( - "Error updating the certified addr book for peer: %s", e - ) + success = maybe_consume_signed_record(message, self.host) # Extract provider information for provider_proto in message.providerPeers: @@ -402,31 +369,13 @@ class KadDHT(Service): logger.debug( f"Added provider {provider_id} for key {key.hex()}" ) - except Exception as e: - logger.warning(f"Failed to process provider info: {e}") # Process the signed-records of provider if sent - if provider_proto.HasField("signedRecord"): - try: - # Convert the signed-peer-record(Envelope) from - # protobuf bytes - envelope, _ = consume_envelope( - provider_proto.signedRecord, - "libp2p-peer-record", - ) - # Use the default TTL of 2 hours (7200 seconds) - if not self.host.get_peerstore().consume_peer_record( # noqa - envelope, 7200 - ): - logger.error( - "Failed to update the Certified-Addr-Book" - ) - except Exception as e: - logger.error( - "Error updating the certified-addr-book for peer %s: %s", # noqa - provider_id, - e, - ) + success = maybe_consume_signed_record( + provider_proto, self.host + ) + except Exception as e: + logger.warning(f"Failed to process provider info: {e}") # Send acknowledgement response = Message() @@ -453,24 +402,7 @@ class KadDHT(Service): logger.debug(f"Received GET_PROVIDERS request for key {key.hex()}") # Consume the source signed_peer_record if sent - if message.HasField("senderRecord"): - try: - # Convert the signed-peer-record(Envelope) from - # protobuf bytes - envelope, _ = consume_envelope( - message.senderRecord, "libp2p-peer-record" - ) - # Use the defualt TTL of 2 hours (7200 seconds) - if not self.host.get_peerstore().consume_peer_record( - envelope, 7200 - ): - logger.error( - "Updating the Certified-Addr-Book was unsuccessful" - ) - except Exception as e: - logger.error( - "Error updating the certified addr book for peer: %s", e - ) + success = maybe_consume_signed_record(message, self.host) # Find providers for the key providers = self.provider_store.get_providers(key) @@ -563,24 +495,7 @@ class KadDHT(Service): logger.debug(f"Received GET_VALUE request for key {key.hex()}") # Consume the sender_signed_peer_record - if message.HasField("senderRecord"): - try: - # Convert the signed-peer-record(Envelope) from - # protobuf bytes - envelope, _ = consume_envelope( - message.senderRecord, "libp2p-peer-record" - ) - # Use the default TTL of 2 hours (7200 seconds) - if not self.host.get_peerstore().consume_peer_record( - envelope, 7200 - ): - logger.error( - "Updating teh Certified-Addr-Book was unsuccessful" - ) - except Exception as e: - logger.error( - "Error updating the certified addr book for peer: %s", e - ) + success = maybe_consume_signed_record(message, self.host) value = self.value_store.get(key) if value: @@ -677,24 +592,7 @@ class KadDHT(Service): success = False # Consume the source signed_peer_record if sent - if message.HasField("senderRecord"): - try: - # Convert the signed-peer-record(Envelope) from - # protobuf bytes - envelope, _ = consume_envelope( - message.senderRecord, "libp2p-peer-record" - ) - # Use the default TTL of 2 hours (7200 seconds) - if not self.host.get_peerstore().consume_peer_record( - envelope, 7200 - ): - logger.error( - "Updating the certified-addr-book was unsuccessful" - ) - except Exception as e: - logger.error( - "Error updating the certified addr book for peer: %s", e - ) + success = maybe_consume_signed_record(message, self.host) try: if not (key and value): diff --git a/libp2p/kad_dht/peer_routing.py b/libp2p/kad_dht/peer_routing.py index 58406f05..e36f7caf 100644 --- a/libp2p/kad_dht/peer_routing.py +++ b/libp2p/kad_dht/peer_routing.py @@ -15,7 +15,7 @@ from libp2p.abc import ( INetStream, IPeerRouting, ) -from libp2p.peer.envelope import Envelope, consume_envelope +from libp2p.peer.envelope import Envelope from libp2p.peer.id import ( ID, ) @@ -35,6 +35,7 @@ from .routing_table import ( RoutingTable, ) from .utils import ( + maybe_consume_signed_record, sort_peer_ids_by_distance, ) @@ -308,24 +309,7 @@ class PeerRouting(IPeerRouting): # Process closest peers from response if response_msg.type == Message.MessageType.FIND_NODE: # Consume the sender_signed_peer_record - if response_msg.HasField("senderRecord"): - try: - # Convert the signed-peer-record(Envelope) from - # protobuf bytes - envelope, _ = consume_envelope( - response_msg.senderRecord, "libp2p-peer-record" - ) - # Use the default TTL of 2 hours (7200 seconds) - if not self.host.get_peerstore().consume_peer_record( - envelope, 7200 - ): - logger.error( - "Updating teh Certified-Addr-Book was unsuccessful" - ) - except Exception as e: - logger.error( - "Error updating the certified addr book for peer: %s", e - ) + _ = maybe_consume_signed_record(response_msg, self.host) for peer_data in response_msg.closerPeers: new_peer_id = ID(peer_data.id) @@ -340,25 +324,7 @@ class PeerRouting(IPeerRouting): self.host.get_peerstore().add_addrs(new_peer_id, addrs, 3600) # Consume the received closer_peers signed-records - if peer_data.HasField("signedRecord"): - try: - # Convert the signed-peer-record(Envelope) from - # protobuf bytes - envelope, _ = consume_envelope( - peer_data.signedRecord, - "libp2p-peer-record", - ) - # Use the default TTL of 2 hours (7200 seconds) - if not self.host.get_peerstore().consume_peer_record( - envelope, 7200 - ): - logger.error("Failed to update certified-addr-book") - except Exception as e: - logger.error( - "Error updating the certified-addr-book for peer %s: %s", # noqa - new_peer_id, - e, - ) + _ = maybe_consume_signed_record(peer_data, self.host) except Exception as e: logger.debug(f"Error querying peer {peer} for closest: {e}") @@ -400,24 +366,7 @@ class PeerRouting(IPeerRouting): if kad_message.type == Message.MessageType.FIND_NODE: # Consume the sender's signed-peer-record if sent - if kad_message.HasField("senderRecord"): - try: - # Convert the signed-peer-record(Envelope) from - # protobuf bytes - envelope, _ = consume_envelope( - kad_message.senderRecord, "libp2p-peer-record" - ) - # Use the default TTL of 2 hours (7200 seconds) - if not self.host.get_peerstore().consume_peer_record( - envelope, 7200 - ): - logger.error( - "Updating the Certified-Addr-Book was unsuccessful" - ) - except Exception as e: - logger.error( - "Error updating the certified addr book for peer: %s", e - ) + _ = maybe_consume_signed_record(kad_message, self.host) # Get target key directly from protobuf message target_key = kad_message.key diff --git a/libp2p/kad_dht/provider_store.py b/libp2p/kad_dht/provider_store.py index c5800914..21bd1c80 100644 --- a/libp2p/kad_dht/provider_store.py +++ b/libp2p/kad_dht/provider_store.py @@ -22,7 +22,7 @@ from libp2p.abc import ( from libp2p.custom_types import ( TProtocol, ) -from libp2p.peer.envelope import consume_envelope +from libp2p.kad_dht.utils import maybe_consume_signed_record from libp2p.peer.id import ( ID, ) @@ -291,25 +291,7 @@ class ProviderStore: if response.type == Message.MessageType.ADD_PROVIDER: # Consume the sender's signed-peer-record if sent - if response.HasField("senderRecord"): - try: - # Convert the signed-peer-record(Envelope) from - # protobuf bytes - envelope, _ = consume_envelope( - response.senderRecord, "libp2p-peer-record" - ) - # Use the defualt TTL of 2 hours (7200 seconds) - if not self.host.get_peerstore().consume_peer_record( - envelope, 7200 - ): - logger.error( - "Updating the Certified-Addr-Book was unsuccessful" - ) - except Exception as e: - logger.error( - "Error updating the certified addr book for peer: %s", e - ) - + _ = maybe_consume_signed_record(response, self.host) result = True except Exception as e: @@ -454,24 +436,7 @@ class ProviderStore: return [] # Consume the sender's signed-peer-record if sent - if response.HasField("senderRecord"): - try: - # Convert the signed-peer-record(Envelope) from - # protobuf bytes - envelope, _ = consume_envelope( - response.senderRecord, "libp2p-peer-record" - ) - # Use the defualt TTL of 2 hours (7200 seconds) - if not self.host.get_peerstore().consume_peer_record( - envelope, 7200 - ): - logger.error( - "Updating the Certified-Addr-Book was unsuccessful" - ) - except Exception as e: - logger.error( - "Error updating the certified addr book for peer: %s", e - ) + _ = maybe_consume_signed_record(response, self.host) # Extract provider information providers = [] @@ -492,27 +457,7 @@ class ProviderStore: providers.append(PeerInfo(provider_id, addrs)) # Consume the provider's signed-peer-record if sent - if provider_proto.HasField("signedRecord"): - try: - # Convert the signed-peer-record(Envelope) from - # protobuf bytes - envelope, _ = consume_envelope( - provider_proto.signedRecord, - "libp2p-peer-record", - ) - # Use the default TTL of 2 hours (7200 seconds) - if not self.host.get_peerstore().consume_peer_record( # noqa - envelope, 7200 - ): - logger.error( - "Failed to update the Certified-Addr-Book" - ) - except Exception as e: - logger.error( - "Error updating the certified-addr-book for peer %s: %s", # noqa - provider_id, - e, - ) + _ = maybe_consume_signed_record(provider_proto, self.host) except Exception as e: logger.warning(f"Failed to parse provider info: {e}") diff --git a/libp2p/kad_dht/utils.py b/libp2p/kad_dht/utils.py index 61158320..64976cb3 100644 --- a/libp2p/kad_dht/utils.py +++ b/libp2p/kad_dht/utils.py @@ -2,13 +2,57 @@ Utility functions for Kademlia DHT implementation. """ +import logging + import base58 import multihash +from libp2p.abc import IHost +from libp2p.peer.envelope import consume_envelope from libp2p.peer.id import ( ID, ) +from .pb.kademlia_pb2 import ( + Message, +) + +logger = logging.getLogger("kademlia-example.utils") + + +def maybe_consume_signed_record(msg: Message | Message.Peer, host: IHost) -> bool: + if isinstance(msg, Message): + if msg.HasField("senderRecord"): + try: + # Convert the signed-peer-record(Envelope) from + # protobuf bytes + envelope, _ = consume_envelope(msg.senderRecord, "libp2p-peer-record") + # Use the default TTL of 2 hours (7200 seconds) + if not host.get_peerstore().consume_peer_record(envelope, 7200): + logger.error("Updating the certified-addr-book was unsuccessful") + except Exception as e: + logger.error("Error updating teh certified addr book for peer: %s", e) + return False + else: + if msg.HasField("signedRecord"): + try: + # Convert the signed-peer-record(Envelope) from + # protobuf bytes + envelope, _ = consume_envelope( + msg.signedRecord, + "libp2p-peer-record", + ) + # Use the default TTL of 2 hours (7200 seconds) + if not host.get_peerstore().consume_peer_record(envelope, 7200): + logger.error("Failed to update the Certified-Addr-Book") + except Exception as e: + logger.error( + "Error updating the certified-addr-book: %s", + e, + ) + + return True + def create_key_from_binary(binary_data: bytes) -> bytes: """ diff --git a/libp2p/kad_dht/value_store.py b/libp2p/kad_dht/value_store.py index 28cc6d8c..adc37b72 100644 --- a/libp2p/kad_dht/value_store.py +++ b/libp2p/kad_dht/value_store.py @@ -15,7 +15,7 @@ from libp2p.abc import ( from libp2p.custom_types import ( TProtocol, ) -from libp2p.peer.envelope import consume_envelope +from libp2p.kad_dht.utils import maybe_consume_signed_record from libp2p.peer.id import ( ID, ) @@ -166,24 +166,7 @@ class ValueStore: # Check if response is valid if response.type == Message.MessageType.PUT_VALUE: # Consume the sender's signed-peer-record if sent - if response.HasField("senderRecord"): - try: - # Convert the signed-peer-record(Envelope) from - # protobuf bytes - envelope, _ = consume_envelope( - response.senderRecord, "libp2p-peer-record" - ) - # Use the default TTL of 2 hours (7200 seconds) - if not self.host.get_peerstore().consume_peer_record( - envelope, 7200 - ): - logger.error( - "Updating the certified-addr-book was unsuccessful" - ) - except Exception as e: - logger.error( - "Error updating the certified addr book for peer: %s", e - ) + _ = maybe_consume_signed_record(response, self.host) if response.key == key: result = True @@ -314,24 +297,7 @@ class ValueStore: and response.record.value ): # Consume the sender's signed-peer-record - if response.HasField("senderRecord"): - try: - # Convert the signed-peer-record(Envelope) from - # protobuf bytes - envelope, _ = consume_envelope( - response.senderRecord, "libp2p-peer-record" - ) - # Use the default TTL of 2 hours (7200 seconds) - if not self.host.get_peerstore().consume_peer_record( - envelope, 7200 - ): - logger.error( - "Updating the certified-addr-book was unsuccessful" - ) - except Exception as e: - logger.error( - "Error updating the certified addr book for peer: %s", e - ) + _ = maybe_consume_signed_record(response, self.host) logger.debug( f"Received value for key {key.hex()} from peer {peer_id}"