mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
fix typos
This commit is contained in:
@ -25,12 +25,14 @@ from libp2p.discovery.random_walk.rt_refresh_manager import RTRefreshManager
|
||||
from libp2p.network.stream.net_stream import (
|
||||
INetStream,
|
||||
)
|
||||
from libp2p.peer.envelope import Envelope, consume_envelope
|
||||
from libp2p.peer.id import (
|
||||
ID,
|
||||
)
|
||||
from libp2p.peer.peerinfo import (
|
||||
PeerInfo,
|
||||
)
|
||||
from libp2p.peer.peerstore import create_signed_peer_record
|
||||
from libp2p.tools.async_service import (
|
||||
Service,
|
||||
)
|
||||
@ -234,6 +236,9 @@ class KadDHT(Service):
|
||||
await self.add_peer(peer_id)
|
||||
logger.debug(f"Added peer {peer_id} to routing table")
|
||||
|
||||
closer_peer_envelope: Envelope | None = None
|
||||
provider_peer_envelope: Envelope | None = None
|
||||
|
||||
try:
|
||||
# Read varint-prefixed length for the message
|
||||
length_prefix = b""
|
||||
@ -266,6 +271,7 @@ class KadDHT(Service):
|
||||
# Handle FIND_NODE message
|
||||
if message.type == Message.MessageType.FIND_NODE:
|
||||
# Get target key directly from protobuf
|
||||
print("FIND NODE RECEIVED")
|
||||
target_key = message.key
|
||||
|
||||
# Find closest peers to the target key
|
||||
@ -274,6 +280,26 @@ class KadDHT(Service):
|
||||
)
|
||||
logger.debug(f"Found {len(closest_peers)} peers close to target")
|
||||
|
||||
# Consume the source signed_peer_record if sent
|
||||
if message.HasField("senderRecord"):
|
||||
try:
|
||||
# Convert the signed-peer-record(Envelope) from
|
||||
# protobuf bytes
|
||||
envelope, _ = consume_envelope(
|
||||
message.senderRecord, "libp2p-peer-record"
|
||||
)
|
||||
# Use the defualt TTL of 2 hours (7200 seconds)
|
||||
if not self.host.get_peerstore().consume_peer_record(
|
||||
envelope, 7200
|
||||
):
|
||||
logger.error(
|
||||
"Updating the Certified-Addr-Book was unsuccessful"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Error updating the certified addr book for peer: %s", e
|
||||
)
|
||||
|
||||
# Build response message with protobuf
|
||||
response = Message()
|
||||
response.type = Message.MessageType.FIND_NODE
|
||||
@ -298,6 +324,25 @@ class KadDHT(Service):
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Add the signed-peer-record for each peer in the peer-proto
|
||||
# if cached in the peerstore
|
||||
closer_peer_envelope = (
|
||||
self.host.get_peerstore().get_peer_record(peer)
|
||||
)
|
||||
|
||||
if closer_peer_envelope is not None:
|
||||
peer_proto.signedRecord = (
|
||||
closer_peer_envelope.marshal_envelope()
|
||||
)
|
||||
|
||||
# Create sender_signed_peer_record
|
||||
envelope = create_signed_peer_record(
|
||||
self.host.get_id(),
|
||||
self.host.get_addrs(),
|
||||
self.host.get_private_key(),
|
||||
)
|
||||
response.senderRecord = envelope.marshal_envelope()
|
||||
|
||||
# Serialize and send response
|
||||
response_bytes = response.SerializeToString()
|
||||
await stream.write(varint.encode(len(response_bytes)))
|
||||
@ -312,6 +357,26 @@ class KadDHT(Service):
|
||||
key = message.key
|
||||
logger.debug(f"Received ADD_PROVIDER for key {key.hex()}")
|
||||
|
||||
# Consume the source signed-peer-record if sent
|
||||
if message.HasField("senderRecord"):
|
||||
try:
|
||||
# Convert the signed-peer-record(Envelope) from
|
||||
# protobuf bytes
|
||||
envelope, _ = consume_envelope(
|
||||
message.senderRecord, "libp2p-peer-record"
|
||||
)
|
||||
# Use the default TTL of 2 hours (72000 seconds)
|
||||
if not self.host.get_peerstore().consume_peer_record(
|
||||
envelope, 7200
|
||||
):
|
||||
logger.error(
|
||||
"Updating the Certified-Addr-Book was unsuccessful"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Error updating the certified addr book for peer: %s", e
|
||||
)
|
||||
|
||||
# Extract provider information
|
||||
for provider_proto in message.providerPeers:
|
||||
try:
|
||||
@ -341,11 +406,42 @@ class KadDHT(Service):
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to process provider info: {e}")
|
||||
|
||||
# Process the signed-records of provider if sent
|
||||
if provider_proto.HasField("signedRecord"):
|
||||
try:
|
||||
# Convert the signed-peer-record(Envelope) from
|
||||
# protobuf bytes
|
||||
envelope, _ = consume_envelope(
|
||||
provider_proto.signedRecord,
|
||||
"libp2p-peer-record",
|
||||
)
|
||||
# Use the default TTL of 2 hours (7200 seconds)
|
||||
if not self.host.get_peerstore().consume_peer_record( # noqa
|
||||
envelope, 7200
|
||||
):
|
||||
logger.error(
|
||||
"Failed to update the Certified-Addr-Book"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Error updating the certified-addr-book for peer %s: %s", # noqa
|
||||
provider_id,
|
||||
e,
|
||||
)
|
||||
|
||||
# Send acknowledgement
|
||||
response = Message()
|
||||
response.type = Message.MessageType.ADD_PROVIDER
|
||||
response.key = key
|
||||
|
||||
# Add sender's signed-peer-record
|
||||
envelope = create_signed_peer_record(
|
||||
self.host.get_id(),
|
||||
self.host.get_addrs(),
|
||||
self.host.get_private_key(),
|
||||
)
|
||||
response.senderRecord = envelope.marshal_envelope()
|
||||
|
||||
response_bytes = response.SerializeToString()
|
||||
await stream.write(varint.encode(len(response_bytes)))
|
||||
await stream.write(response_bytes)
|
||||
@ -357,6 +453,26 @@ class KadDHT(Service):
|
||||
key = message.key
|
||||
logger.debug(f"Received GET_PROVIDERS request for key {key.hex()}")
|
||||
|
||||
# Consume the source signed_peer_record if sent
|
||||
if message.HasField("senderRecord"):
|
||||
try:
|
||||
# Convert the signed-peer-record(Envelope) from
|
||||
# protobuf bytes
|
||||
envelope, _ = consume_envelope(
|
||||
message.senderRecord, "libp2p-peer-record"
|
||||
)
|
||||
# Use the defualt TTL of 2 hours (7200 seconds)
|
||||
if not self.host.get_peerstore().consume_peer_record(
|
||||
envelope, 7200
|
||||
):
|
||||
logger.error(
|
||||
"Updating the Certified-Addr-Book was unsuccessful"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Error updating the certified addr book for peer: %s", e
|
||||
)
|
||||
|
||||
# Find providers for the key
|
||||
providers = self.provider_store.get_providers(key)
|
||||
logger.debug(
|
||||
@ -368,12 +484,32 @@ class KadDHT(Service):
|
||||
response.type = Message.MessageType.GET_PROVIDERS
|
||||
response.key = key
|
||||
|
||||
# Create sender_signed_peer_record for the response
|
||||
envelope = create_signed_peer_record(
|
||||
self.host.get_id(),
|
||||
self.host.get_addrs(),
|
||||
self.host.get_private_key(),
|
||||
)
|
||||
response.senderRecord = envelope.marshal_envelope()
|
||||
|
||||
# Add provider information to response
|
||||
for provider_info in providers:
|
||||
provider_proto = response.providerPeers.add()
|
||||
provider_proto.id = provider_info.peer_id.to_bytes()
|
||||
provider_proto.connection = Message.ConnectionType.CAN_CONNECT
|
||||
|
||||
# Add provider signed-records if cached
|
||||
provider_peer_envelope = (
|
||||
self.host.get_peerstore().get_peer_record(
|
||||
provider_info.peer_id
|
||||
)
|
||||
)
|
||||
|
||||
if provider_peer_envelope is not None:
|
||||
provider_proto.signedRecord = (
|
||||
provider_peer_envelope.marshal_envelope()
|
||||
)
|
||||
|
||||
# Add addresses if available
|
||||
for addr in provider_info.addrs:
|
||||
provider_proto.addrs.append(addr.to_bytes())
|
||||
@ -397,6 +533,16 @@ class KadDHT(Service):
|
||||
peer_proto.id = peer.to_bytes()
|
||||
peer_proto.connection = Message.ConnectionType.CAN_CONNECT
|
||||
|
||||
# Add the signed-records of closest_peers if cached
|
||||
closer_peer_envelope = (
|
||||
self.host.get_peerstore().get_peer_record(peer)
|
||||
)
|
||||
|
||||
if closer_peer_envelope is not None:
|
||||
peer_proto.signedRecord = (
|
||||
closer_peer_envelope.marshal_envelope()
|
||||
)
|
||||
|
||||
# Add addresses if available
|
||||
try:
|
||||
addrs = self.host.get_peerstore().addrs(peer)
|
||||
@ -417,6 +563,26 @@ class KadDHT(Service):
|
||||
key = message.key
|
||||
logger.debug(f"Received GET_VALUE request for key {key.hex()}")
|
||||
|
||||
# Consume the sender_signed_peer_record
|
||||
if message.HasField("senderRecord"):
|
||||
try:
|
||||
# Convert the signed-peer-record(Envelope) from
|
||||
# protobuf bytes
|
||||
envelope, _ = consume_envelope(
|
||||
message.senderRecord, "libp2p-peer-record"
|
||||
)
|
||||
# Use the default TTL of 2 hours (7200 seconds)
|
||||
if not self.host.get_peerstore().consume_peer_record(
|
||||
envelope, 7200
|
||||
):
|
||||
logger.error(
|
||||
"Updating teh Certified-Addr-Book was unsuccessful"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Error updating the certified addr book for peer: %s", e
|
||||
)
|
||||
|
||||
value = self.value_store.get(key)
|
||||
if value:
|
||||
logger.debug(f"Found value for key {key.hex()}")
|
||||
@ -431,6 +597,14 @@ class KadDHT(Service):
|
||||
response.record.value = value
|
||||
response.record.timeReceived = str(time.time())
|
||||
|
||||
# Create sender_signed_peer_record
|
||||
envelope = create_signed_peer_record(
|
||||
self.host.get_id(),
|
||||
self.host.get_addrs(),
|
||||
self.host.get_private_key(),
|
||||
)
|
||||
response.senderRecord = envelope.marshal_envelope()
|
||||
|
||||
# Serialize and send response
|
||||
response_bytes = response.SerializeToString()
|
||||
await stream.write(varint.encode(len(response_bytes)))
|
||||
@ -444,6 +618,14 @@ class KadDHT(Service):
|
||||
response.type = Message.MessageType.GET_VALUE
|
||||
response.key = key
|
||||
|
||||
# Create sender_signed_peer_record for the response
|
||||
envelope = create_signed_peer_record(
|
||||
self.host.get_id(),
|
||||
self.host.get_addrs(),
|
||||
self.host.get_private_key(),
|
||||
)
|
||||
response.senderRecord = envelope.marshal_envelope()
|
||||
|
||||
# Add closest peers to key
|
||||
closest_peers = self.routing_table.find_local_closest_peers(
|
||||
key, 20
|
||||
@ -462,6 +644,16 @@ class KadDHT(Service):
|
||||
peer_proto.id = peer.to_bytes()
|
||||
peer_proto.connection = Message.ConnectionType.CAN_CONNECT
|
||||
|
||||
# Add signed-records of closer-peers if cached
|
||||
closer_peer_envelope = (
|
||||
self.host.get_peerstore().get_peer_record(peer)
|
||||
)
|
||||
|
||||
if closer_peer_envelope is not None:
|
||||
peer_proto.signedRecord = (
|
||||
closer_peer_envelope.marshal_envelope()
|
||||
)
|
||||
|
||||
# Add addresses if available
|
||||
try:
|
||||
addrs = self.host.get_peerstore().addrs(peer)
|
||||
@ -484,6 +676,27 @@ class KadDHT(Service):
|
||||
key = message.record.key
|
||||
value = message.record.value
|
||||
success = False
|
||||
|
||||
# Consume the source signed_peer_record if sent
|
||||
if message.HasField("senderRecord"):
|
||||
try:
|
||||
# Convert the signed-peer-record(Envelope) from
|
||||
# protobuf bytes
|
||||
envelope, _ = consume_envelope(
|
||||
message.senderRecord, "libp2p-peer-record"
|
||||
)
|
||||
# Use the default TTL of 2 hours (7200 seconds)
|
||||
if not self.host.get_peerstore().consume_peer_record(
|
||||
envelope, 7200
|
||||
):
|
||||
logger.error(
|
||||
"Updating the certified-addr-book was unsuccessful"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Error updating the certified addr book for peer: %s", e
|
||||
)
|
||||
|
||||
try:
|
||||
if not (key and value):
|
||||
raise ValueError(
|
||||
@ -504,6 +717,16 @@ class KadDHT(Service):
|
||||
response.type = Message.MessageType.PUT_VALUE
|
||||
if success:
|
||||
response.key = key
|
||||
|
||||
# Create sender_signed_peer_record for the response
|
||||
envelope = create_signed_peer_record(
|
||||
self.host.get_id(),
|
||||
self.host.get_addrs(),
|
||||
self.host.get_private_key(),
|
||||
)
|
||||
response.senderRecord = envelope.marshal_envelope()
|
||||
|
||||
# Serialize and send response
|
||||
response_bytes = response.SerializeToString()
|
||||
await stream.write(varint.encode(len(response_bytes)))
|
||||
await stream.write(response_bytes)
|
||||
|
||||
Reference in New Issue
Block a user