remove too much repeatitive code

This commit is contained in:
lla-dane
2025-08-12 13:53:40 +05:30
parent a21d9e878b
commit 702ad4876e
5 changed files with 68 additions and 266 deletions

View File

@ -22,10 +22,11 @@ from libp2p.abc import (
IHost, IHost,
) )
from libp2p.discovery.random_walk.rt_refresh_manager import RTRefreshManager from libp2p.discovery.random_walk.rt_refresh_manager import RTRefreshManager
from libp2p.kad_dht.utils import maybe_consume_signed_record
from libp2p.network.stream.net_stream import ( from libp2p.network.stream.net_stream import (
INetStream, INetStream,
) )
from libp2p.peer.envelope import Envelope, consume_envelope from libp2p.peer.envelope import Envelope
from libp2p.peer.id import ( from libp2p.peer.id import (
ID, ID,
) )
@ -280,24 +281,7 @@ class KadDHT(Service):
logger.debug(f"Found {len(closest_peers)} peers close to target") logger.debug(f"Found {len(closest_peers)} peers close to target")
# Consume the source signed_peer_record if sent # Consume the source signed_peer_record if sent
if message.HasField("senderRecord"): success = maybe_consume_signed_record(message, self.host)
try:
# Convert the signed-peer-record(Envelope) from
# protobuf bytes
envelope, _ = consume_envelope(
message.senderRecord, "libp2p-peer-record"
)
# Use the defualt TTL of 2 hours (7200 seconds)
if not self.host.get_peerstore().consume_peer_record(
envelope, 7200
):
logger.error(
"Updating the Certified-Addr-Book was unsuccessful"
)
except Exception as e:
logger.error(
"Error updating the certified addr book for peer: %s", e
)
# Build response message with protobuf # Build response message with protobuf
response = Message() response = Message()
@ -357,24 +341,7 @@ class KadDHT(Service):
logger.debug(f"Received ADD_PROVIDER for key {key.hex()}") logger.debug(f"Received ADD_PROVIDER for key {key.hex()}")
# Consume the source signed-peer-record if sent # Consume the source signed-peer-record if sent
if message.HasField("senderRecord"): success = maybe_consume_signed_record(message, self.host)
try:
# Convert the signed-peer-record(Envelope) from
# protobuf bytes
envelope, _ = consume_envelope(
message.senderRecord, "libp2p-peer-record"
)
# Use the default TTL of 2 hours (7200 seconds)
if not self.host.get_peerstore().consume_peer_record(
envelope, 7200
):
logger.error(
"Updating the Certified-Addr-Book was unsuccessful"
)
except Exception as e:
logger.error(
"Error updating the certified addr book for peer: %s", e
)
# Extract provider information # Extract provider information
for provider_proto in message.providerPeers: for provider_proto in message.providerPeers:
@ -402,31 +369,13 @@ class KadDHT(Service):
logger.debug( logger.debug(
f"Added provider {provider_id} for key {key.hex()}" f"Added provider {provider_id} for key {key.hex()}"
) )
except Exception as e:
logger.warning(f"Failed to process provider info: {e}")
# Process the signed-records of provider if sent # Process the signed-records of provider if sent
if provider_proto.HasField("signedRecord"): success = maybe_consume_signed_record(
try: provider_proto, self.host
# Convert the signed-peer-record(Envelope) from )
# protobuf bytes except Exception as e:
envelope, _ = consume_envelope( logger.warning(f"Failed to process provider info: {e}")
provider_proto.signedRecord,
"libp2p-peer-record",
)
# Use the default TTL of 2 hours (7200 seconds)
if not self.host.get_peerstore().consume_peer_record( # noqa
envelope, 7200
):
logger.error(
"Failed to update the Certified-Addr-Book"
)
except Exception as e:
logger.error(
"Error updating the certified-addr-book for peer %s: %s", # noqa
provider_id,
e,
)
# Send acknowledgement # Send acknowledgement
response = Message() response = Message()
@ -453,24 +402,7 @@ class KadDHT(Service):
logger.debug(f"Received GET_PROVIDERS request for key {key.hex()}") logger.debug(f"Received GET_PROVIDERS request for key {key.hex()}")
# Consume the source signed_peer_record if sent # Consume the source signed_peer_record if sent
if message.HasField("senderRecord"): success = maybe_consume_signed_record(message, self.host)
try:
# Convert the signed-peer-record(Envelope) from
# protobuf bytes
envelope, _ = consume_envelope(
message.senderRecord, "libp2p-peer-record"
)
# Use the defualt TTL of 2 hours (7200 seconds)
if not self.host.get_peerstore().consume_peer_record(
envelope, 7200
):
logger.error(
"Updating the Certified-Addr-Book was unsuccessful"
)
except Exception as e:
logger.error(
"Error updating the certified addr book for peer: %s", e
)
# Find providers for the key # Find providers for the key
providers = self.provider_store.get_providers(key) providers = self.provider_store.get_providers(key)
@ -563,24 +495,7 @@ class KadDHT(Service):
logger.debug(f"Received GET_VALUE request for key {key.hex()}") logger.debug(f"Received GET_VALUE request for key {key.hex()}")
# Consume the sender_signed_peer_record # Consume the sender_signed_peer_record
if message.HasField("senderRecord"): success = maybe_consume_signed_record(message, self.host)
try:
# Convert the signed-peer-record(Envelope) from
# protobuf bytes
envelope, _ = consume_envelope(
message.senderRecord, "libp2p-peer-record"
)
# Use the default TTL of 2 hours (7200 seconds)
if not self.host.get_peerstore().consume_peer_record(
envelope, 7200
):
logger.error(
"Updating teh Certified-Addr-Book was unsuccessful"
)
except Exception as e:
logger.error(
"Error updating the certified addr book for peer: %s", e
)
value = self.value_store.get(key) value = self.value_store.get(key)
if value: if value:
@ -677,24 +592,7 @@ class KadDHT(Service):
success = False success = False
# Consume the source signed_peer_record if sent # Consume the source signed_peer_record if sent
if message.HasField("senderRecord"): success = maybe_consume_signed_record(message, self.host)
try:
# Convert the signed-peer-record(Envelope) from
# protobuf bytes
envelope, _ = consume_envelope(
message.senderRecord, "libp2p-peer-record"
)
# Use the default TTL of 2 hours (7200 seconds)
if not self.host.get_peerstore().consume_peer_record(
envelope, 7200
):
logger.error(
"Updating the certified-addr-book was unsuccessful"
)
except Exception as e:
logger.error(
"Error updating the certified addr book for peer: %s", e
)
try: try:
if not (key and value): if not (key and value):

View File

@ -15,7 +15,7 @@ from libp2p.abc import (
INetStream, INetStream,
IPeerRouting, IPeerRouting,
) )
from libp2p.peer.envelope import Envelope, consume_envelope from libp2p.peer.envelope import Envelope
from libp2p.peer.id import ( from libp2p.peer.id import (
ID, ID,
) )
@ -35,6 +35,7 @@ from .routing_table import (
RoutingTable, RoutingTable,
) )
from .utils import ( from .utils import (
maybe_consume_signed_record,
sort_peer_ids_by_distance, sort_peer_ids_by_distance,
) )
@ -308,24 +309,7 @@ class PeerRouting(IPeerRouting):
# Process closest peers from response # Process closest peers from response
if response_msg.type == Message.MessageType.FIND_NODE: if response_msg.type == Message.MessageType.FIND_NODE:
# Consume the sender_signed_peer_record # Consume the sender_signed_peer_record
if response_msg.HasField("senderRecord"): _ = maybe_consume_signed_record(response_msg, self.host)
try:
# Convert the signed-peer-record(Envelope) from
# protobuf bytes
envelope, _ = consume_envelope(
response_msg.senderRecord, "libp2p-peer-record"
)
# Use the default TTL of 2 hours (7200 seconds)
if not self.host.get_peerstore().consume_peer_record(
envelope, 7200
):
logger.error(
"Updating teh Certified-Addr-Book was unsuccessful"
)
except Exception as e:
logger.error(
"Error updating the certified addr book for peer: %s", e
)
for peer_data in response_msg.closerPeers: for peer_data in response_msg.closerPeers:
new_peer_id = ID(peer_data.id) new_peer_id = ID(peer_data.id)
@ -340,25 +324,7 @@ class PeerRouting(IPeerRouting):
self.host.get_peerstore().add_addrs(new_peer_id, addrs, 3600) self.host.get_peerstore().add_addrs(new_peer_id, addrs, 3600)
# Consume the received closer_peers signed-records # Consume the received closer_peers signed-records
if peer_data.HasField("signedRecord"): _ = maybe_consume_signed_record(peer_data, self.host)
try:
# Convert the signed-peer-record(Envelope) from
# protobuf bytes
envelope, _ = consume_envelope(
peer_data.signedRecord,
"libp2p-peer-record",
)
# Use the default TTL of 2 hours (7200 seconds)
if not self.host.get_peerstore().consume_peer_record(
envelope, 7200
):
logger.error("Failed to update certified-addr-book")
except Exception as e:
logger.error(
"Error updating the certified-addr-book for peer %s: %s", # noqa
new_peer_id,
e,
)
except Exception as e: except Exception as e:
logger.debug(f"Error querying peer {peer} for closest: {e}") logger.debug(f"Error querying peer {peer} for closest: {e}")
@ -400,24 +366,7 @@ class PeerRouting(IPeerRouting):
if kad_message.type == Message.MessageType.FIND_NODE: if kad_message.type == Message.MessageType.FIND_NODE:
# Consume the sender's signed-peer-record if sent # Consume the sender's signed-peer-record if sent
if kad_message.HasField("senderRecord"): _ = maybe_consume_signed_record(kad_message, self.host)
try:
# Convert the signed-peer-record(Envelope) from
# protobuf bytes
envelope, _ = consume_envelope(
kad_message.senderRecord, "libp2p-peer-record"
)
# Use the default TTL of 2 hours (7200 seconds)
if not self.host.get_peerstore().consume_peer_record(
envelope, 7200
):
logger.error(
"Updating the Certified-Addr-Book was unsuccessful"
)
except Exception as e:
logger.error(
"Error updating the certified addr book for peer: %s", e
)
# Get target key directly from protobuf message # Get target key directly from protobuf message
target_key = kad_message.key target_key = kad_message.key

View File

@ -22,7 +22,7 @@ from libp2p.abc import (
from libp2p.custom_types import ( from libp2p.custom_types import (
TProtocol, TProtocol,
) )
from libp2p.peer.envelope import consume_envelope from libp2p.kad_dht.utils import maybe_consume_signed_record
from libp2p.peer.id import ( from libp2p.peer.id import (
ID, ID,
) )
@ -291,25 +291,7 @@ class ProviderStore:
if response.type == Message.MessageType.ADD_PROVIDER: if response.type == Message.MessageType.ADD_PROVIDER:
# Consume the sender's signed-peer-record if sent # Consume the sender's signed-peer-record if sent
if response.HasField("senderRecord"): _ = maybe_consume_signed_record(response, self.host)
try:
# Convert the signed-peer-record(Envelope) from
# protobuf bytes
envelope, _ = consume_envelope(
response.senderRecord, "libp2p-peer-record"
)
# Use the defualt TTL of 2 hours (7200 seconds)
if not self.host.get_peerstore().consume_peer_record(
envelope, 7200
):
logger.error(
"Updating the Certified-Addr-Book was unsuccessful"
)
except Exception as e:
logger.error(
"Error updating the certified addr book for peer: %s", e
)
result = True result = True
except Exception as e: except Exception as e:
@ -454,24 +436,7 @@ class ProviderStore:
return [] return []
# Consume the sender's signed-peer-record if sent # Consume the sender's signed-peer-record if sent
if response.HasField("senderRecord"): _ = maybe_consume_signed_record(response, self.host)
try:
# Convert the signed-peer-record(Envelope) from
# protobuf bytes
envelope, _ = consume_envelope(
response.senderRecord, "libp2p-peer-record"
)
# Use the defualt TTL of 2 hours (7200 seconds)
if not self.host.get_peerstore().consume_peer_record(
envelope, 7200
):
logger.error(
"Updating the Certified-Addr-Book was unsuccessful"
)
except Exception as e:
logger.error(
"Error updating the certified addr book for peer: %s", e
)
# Extract provider information # Extract provider information
providers = [] providers = []
@ -492,27 +457,7 @@ class ProviderStore:
providers.append(PeerInfo(provider_id, addrs)) providers.append(PeerInfo(provider_id, addrs))
# Consume the provider's signed-peer-record if sent # Consume the provider's signed-peer-record if sent
if provider_proto.HasField("signedRecord"): _ = maybe_consume_signed_record(provider_proto, self.host)
try:
# Convert the signed-peer-record(Envelope) from
# protobuf bytes
envelope, _ = consume_envelope(
provider_proto.signedRecord,
"libp2p-peer-record",
)
# Use the default TTL of 2 hours (7200 seconds)
if not self.host.get_peerstore().consume_peer_record( # noqa
envelope, 7200
):
logger.error(
"Failed to update the Certified-Addr-Book"
)
except Exception as e:
logger.error(
"Error updating the certified-addr-book for peer %s: %s", # noqa
provider_id,
e,
)
except Exception as e: except Exception as e:
logger.warning(f"Failed to parse provider info: {e}") logger.warning(f"Failed to parse provider info: {e}")

View File

@ -2,13 +2,57 @@
Utility functions for Kademlia DHT implementation. Utility functions for Kademlia DHT implementation.
""" """
import logging
import base58 import base58
import multihash import multihash
from libp2p.abc import IHost
from libp2p.peer.envelope import consume_envelope
from libp2p.peer.id import ( from libp2p.peer.id import (
ID, ID,
) )
from .pb.kademlia_pb2 import (
Message,
)
logger = logging.getLogger("kademlia-example.utils")
def maybe_consume_signed_record(msg: Message | Message.Peer, host: IHost) -> bool:
if isinstance(msg, Message):
if msg.HasField("senderRecord"):
try:
# Convert the signed-peer-record(Envelope) from
# protobuf bytes
envelope, _ = consume_envelope(msg.senderRecord, "libp2p-peer-record")
# Use the default TTL of 2 hours (7200 seconds)
if not host.get_peerstore().consume_peer_record(envelope, 7200):
logger.error("Updating the certified-addr-book was unsuccessful")
except Exception as e:
logger.error("Error updating teh certified addr book for peer: %s", e)
return False
else:
if msg.HasField("signedRecord"):
try:
# Convert the signed-peer-record(Envelope) from
# protobuf bytes
envelope, _ = consume_envelope(
msg.signedRecord,
"libp2p-peer-record",
)
# Use the default TTL of 2 hours (7200 seconds)
if not host.get_peerstore().consume_peer_record(envelope, 7200):
logger.error("Failed to update the Certified-Addr-Book")
except Exception as e:
logger.error(
"Error updating the certified-addr-book: %s",
e,
)
return True
def create_key_from_binary(binary_data: bytes) -> bytes: def create_key_from_binary(binary_data: bytes) -> bytes:
""" """

View File

@ -15,7 +15,7 @@ from libp2p.abc import (
from libp2p.custom_types import ( from libp2p.custom_types import (
TProtocol, TProtocol,
) )
from libp2p.peer.envelope import consume_envelope from libp2p.kad_dht.utils import maybe_consume_signed_record
from libp2p.peer.id import ( from libp2p.peer.id import (
ID, ID,
) )
@ -166,24 +166,7 @@ class ValueStore:
# Check if response is valid # Check if response is valid
if response.type == Message.MessageType.PUT_VALUE: if response.type == Message.MessageType.PUT_VALUE:
# Consume the sender's signed-peer-record if sent # Consume the sender's signed-peer-record if sent
if response.HasField("senderRecord"): _ = maybe_consume_signed_record(response, self.host)
try:
# Convert the signed-peer-record(Envelope) from
# protobuf bytes
envelope, _ = consume_envelope(
response.senderRecord, "libp2p-peer-record"
)
# Use the default TTL of 2 hours (7200 seconds)
if not self.host.get_peerstore().consume_peer_record(
envelope, 7200
):
logger.error(
"Updating the certified-addr-book was unsuccessful"
)
except Exception as e:
logger.error(
"Error updating the certified addr book for peer: %s", e
)
if response.key == key: if response.key == key:
result = True result = True
@ -314,24 +297,7 @@ class ValueStore:
and response.record.value and response.record.value
): ):
# Consume the sender's signed-peer-record # Consume the sender's signed-peer-record
if response.HasField("senderRecord"): _ = maybe_consume_signed_record(response, self.host)
try:
# Convert the signed-peer-record(Envelope) from
# protobuf bytes
envelope, _ = consume_envelope(
response.senderRecord, "libp2p-peer-record"
)
# Use the default TTL of 2 hours (7200 seconds)
if not self.host.get_peerstore().consume_peer_record(
envelope, 7200
):
logger.error(
"Updating the certified-addr-book was unsuccessful"
)
except Exception as e:
logger.error(
"Error updating the certified addr book for peer: %s", e
)
logger.debug( logger.debug(
f"Received value for key {key.hex()} from peer {peer_id}" f"Received value for key {key.hex()} from peer {peer_id}"