mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
verify peer_id in signed-record matches authenticated sender
This commit is contained in:
@ -280,7 +280,7 @@ class KadDHT(Service):
|
|||||||
logger.debug(f"Found {len(closest_peers)} peers close to target")
|
logger.debug(f"Found {len(closest_peers)} peers close to target")
|
||||||
|
|
||||||
# Consume the source signed_peer_record if sent
|
# Consume the source signed_peer_record if sent
|
||||||
if not maybe_consume_signed_record(message, self.host):
|
if not maybe_consume_signed_record(message, self.host, peer_id):
|
||||||
logger.error(
|
logger.error(
|
||||||
"Received an invalid-signed-record, dropping the stream"
|
"Received an invalid-signed-record, dropping the stream"
|
||||||
)
|
)
|
||||||
@ -341,7 +341,7 @@ class KadDHT(Service):
|
|||||||
logger.debug(f"Received ADD_PROVIDER for key {key.hex()}")
|
logger.debug(f"Received ADD_PROVIDER for key {key.hex()}")
|
||||||
|
|
||||||
# Consume the source signed-peer-record if sent
|
# Consume the source signed-peer-record if sent
|
||||||
if not maybe_consume_signed_record(message, self.host):
|
if not maybe_consume_signed_record(message, self.host, peer_id):
|
||||||
logger.error(
|
logger.error(
|
||||||
"Received an invalid-signed-record, dropping the stream"
|
"Received an invalid-signed-record, dropping the stream"
|
||||||
)
|
)
|
||||||
@ -376,7 +376,9 @@ class KadDHT(Service):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Process the signed-records of provider if sent
|
# Process the signed-records of provider if sent
|
||||||
if not maybe_consume_signed_record(message, self.host):
|
if not maybe_consume_signed_record(
|
||||||
|
message, self.host, peer_id
|
||||||
|
):
|
||||||
logger.error(
|
logger.error(
|
||||||
"Received an invalid-signed-record,"
|
"Received an invalid-signed-record,"
|
||||||
"dropping the stream"
|
"dropping the stream"
|
||||||
@ -407,7 +409,7 @@ class KadDHT(Service):
|
|||||||
logger.debug(f"Received GET_PROVIDERS request for key {key.hex()}")
|
logger.debug(f"Received GET_PROVIDERS request for key {key.hex()}")
|
||||||
|
|
||||||
# Consume the source signed_peer_record if sent
|
# Consume the source signed_peer_record if sent
|
||||||
if not maybe_consume_signed_record(message, self.host):
|
if not maybe_consume_signed_record(message, self.host, peer_id):
|
||||||
logger.error(
|
logger.error(
|
||||||
"Received an invalid-signed-record, dropping the stream"
|
"Received an invalid-signed-record, dropping the stream"
|
||||||
)
|
)
|
||||||
@ -501,7 +503,7 @@ class KadDHT(Service):
|
|||||||
logger.debug(f"Received GET_VALUE request for key {key.hex()}")
|
logger.debug(f"Received GET_VALUE request for key {key.hex()}")
|
||||||
|
|
||||||
# Consume the sender_signed_peer_record
|
# Consume the sender_signed_peer_record
|
||||||
if not maybe_consume_signed_record(message, self.host):
|
if not maybe_consume_signed_record(message, self.host, peer_id):
|
||||||
logger.error(
|
logger.error(
|
||||||
"Received an invalid-signed-record, dropping the stream"
|
"Received an invalid-signed-record, dropping the stream"
|
||||||
)
|
)
|
||||||
@ -595,7 +597,7 @@ class KadDHT(Service):
|
|||||||
success = False
|
success = False
|
||||||
|
|
||||||
# Consume the source signed_peer_record if sent
|
# Consume the source signed_peer_record if sent
|
||||||
if not maybe_consume_signed_record(message, self.host):
|
if not maybe_consume_signed_record(message, self.host, peer_id):
|
||||||
logger.error(
|
logger.error(
|
||||||
"Received an invalid-signed-record, dropping the stream"
|
"Received an invalid-signed-record, dropping the stream"
|
||||||
)
|
)
|
||||||
|
|||||||
@ -307,14 +307,15 @@ class PeerRouting(IPeerRouting):
|
|||||||
# Process closest peers from response
|
# Process closest peers from response
|
||||||
if response_msg.type == Message.MessageType.FIND_NODE:
|
if response_msg.type == Message.MessageType.FIND_NODE:
|
||||||
# Consume the sender_signed_peer_record
|
# Consume the sender_signed_peer_record
|
||||||
if not maybe_consume_signed_record(response_msg, self.host):
|
if not maybe_consume_signed_record(response_msg, self.host, peer):
|
||||||
logger.error(
|
logger.error(
|
||||||
"Received an invalid-signed-record,ignoring the response"
|
"Received an invalid-signed-record,ignoring the response"
|
||||||
)
|
)
|
||||||
return []
|
return []
|
||||||
|
|
||||||
for peer_data in response_msg.closerPeers:
|
for peer_data in response_msg.closerPeers:
|
||||||
# Consume the received closer_peers signed-records
|
# Consume the received closer_peers signed-records, peer-id is
|
||||||
|
# sent with the peer-data
|
||||||
if not maybe_consume_signed_record(peer_data, self.host):
|
if not maybe_consume_signed_record(peer_data, self.host):
|
||||||
logger.error(
|
logger.error(
|
||||||
"Received an invalid-signed-record,ignoring the response"
|
"Received an invalid-signed-record,ignoring the response"
|
||||||
@ -353,6 +354,7 @@ class PeerRouting(IPeerRouting):
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# Read message length
|
# Read message length
|
||||||
|
peer_id = stream.muxed_conn.peer_id
|
||||||
length_bytes = await stream.read(4)
|
length_bytes = await stream.read(4)
|
||||||
if not length_bytes:
|
if not length_bytes:
|
||||||
return
|
return
|
||||||
@ -372,7 +374,7 @@ class PeerRouting(IPeerRouting):
|
|||||||
|
|
||||||
if kad_message.type == Message.MessageType.FIND_NODE:
|
if kad_message.type == Message.MessageType.FIND_NODE:
|
||||||
# Consume the sender's signed-peer-record if sent
|
# Consume the sender's signed-peer-record if sent
|
||||||
if not maybe_consume_signed_record(kad_message, self.host):
|
if not maybe_consume_signed_record(kad_message, self.host, peer_id):
|
||||||
logger.error(
|
logger.error(
|
||||||
"Receivedf an invalid-signed-record, dropping the stream"
|
"Receivedf an invalid-signed-record, dropping the stream"
|
||||||
)
|
)
|
||||||
|
|||||||
@ -286,7 +286,7 @@ class ProviderStore:
|
|||||||
|
|
||||||
if response.type == Message.MessageType.ADD_PROVIDER:
|
if response.type == Message.MessageType.ADD_PROVIDER:
|
||||||
# Consume the sender's signed-peer-record if sent
|
# Consume the sender's signed-peer-record if sent
|
||||||
if not maybe_consume_signed_record(response, self.host):
|
if not maybe_consume_signed_record(response, self.host, peer_id):
|
||||||
logger.error(
|
logger.error(
|
||||||
"Received an invalid-signed-record, ignoring the response"
|
"Received an invalid-signed-record, ignoring the response"
|
||||||
)
|
)
|
||||||
@ -432,7 +432,7 @@ class ProviderStore:
|
|||||||
return []
|
return []
|
||||||
|
|
||||||
# Consume the sender's signed-peer-record if sent
|
# Consume the sender's signed-peer-record if sent
|
||||||
if not maybe_consume_signed_record(response, self.host):
|
if not maybe_consume_signed_record(response, self.host, peer_id):
|
||||||
logger.error(
|
logger.error(
|
||||||
"Recieved an invalid-signed-record, ignoring the response"
|
"Recieved an invalid-signed-record, ignoring the response"
|
||||||
)
|
)
|
||||||
@ -442,7 +442,8 @@ class ProviderStore:
|
|||||||
providers = []
|
providers = []
|
||||||
for provider_proto in response.providerPeers:
|
for provider_proto in response.providerPeers:
|
||||||
try:
|
try:
|
||||||
# Consume the provider's signed-peer-record if sent
|
# Consume the provider's signed-peer-record if sent, peer-id
|
||||||
|
# already sent with the provider-proto
|
||||||
if not maybe_consume_signed_record(provider_proto, self.host):
|
if not maybe_consume_signed_record(provider_proto, self.host):
|
||||||
logger.error(
|
logger.error(
|
||||||
"Recieved an invalid-signed-record, "
|
"Recieved an invalid-signed-record, "
|
||||||
|
|||||||
@ -21,16 +21,20 @@ from .pb.kademlia_pb2 import (
|
|||||||
logger = logging.getLogger("kademlia-example.utils")
|
logger = logging.getLogger("kademlia-example.utils")
|
||||||
|
|
||||||
|
|
||||||
def maybe_consume_signed_record(msg: Message | Message.Peer, host: IHost) -> bool:
|
def maybe_consume_signed_record(
|
||||||
|
msg: Message | Message.Peer, host: IHost, peer_id: ID | None = None
|
||||||
|
) -> bool:
|
||||||
if isinstance(msg, Message):
|
if isinstance(msg, Message):
|
||||||
if msg.HasField("senderRecord"):
|
if msg.HasField("senderRecord"):
|
||||||
try:
|
try:
|
||||||
# Convert the signed-peer-record(Envelope) from
|
# Convert the signed-peer-record(Envelope) from
|
||||||
# protobuf bytes
|
# protobuf bytes
|
||||||
envelope, _ = consume_envelope(
|
envelope, record = consume_envelope(
|
||||||
msg.senderRecord,
|
msg.senderRecord,
|
||||||
"libp2p-peer-record",
|
"libp2p-peer-record",
|
||||||
)
|
)
|
||||||
|
if not (isinstance(peer_id, ID) and record.peer_id == peer_id):
|
||||||
|
return False
|
||||||
# Use the default TTL of 2 hours (7200 seconds)
|
# Use the default TTL of 2 hours (7200 seconds)
|
||||||
if not host.get_peerstore().consume_peer_record(envelope, 7200):
|
if not host.get_peerstore().consume_peer_record(envelope, 7200):
|
||||||
logger.error("Updating the certified-addr-book was unsuccessful")
|
logger.error("Updating the certified-addr-book was unsuccessful")
|
||||||
@ -39,13 +43,16 @@ def maybe_consume_signed_record(msg: Message | Message.Peer, host: IHost) -> boo
|
|||||||
return False
|
return False
|
||||||
else:
|
else:
|
||||||
if msg.HasField("signedRecord"):
|
if msg.HasField("signedRecord"):
|
||||||
|
# TODO: Check in with the Message.Peer id with the record's id
|
||||||
try:
|
try:
|
||||||
# Convert the signed-peer-record(Envelope) from
|
# Convert the signed-peer-record(Envelope) from
|
||||||
# protobuf bytes
|
# protobuf bytes
|
||||||
envelope, _ = consume_envelope(
|
envelope, record = consume_envelope(
|
||||||
msg.signedRecord,
|
msg.signedRecord,
|
||||||
"libp2p-peer-record",
|
"libp2p-peer-record",
|
||||||
)
|
)
|
||||||
|
if not record.peer_id.to_bytes() == msg.id:
|
||||||
|
return False
|
||||||
# Use the default TTL of 2 hours (7200 seconds)
|
# Use the default TTL of 2 hours (7200 seconds)
|
||||||
if not host.get_peerstore().consume_peer_record(envelope, 7200):
|
if not host.get_peerstore().consume_peer_record(envelope, 7200):
|
||||||
logger.error("Failed to update the Certified-Addr-Book")
|
logger.error("Failed to update the Certified-Addr-Book")
|
||||||
|
|||||||
@ -161,7 +161,7 @@ class ValueStore:
|
|||||||
# Check if response is valid
|
# Check if response is valid
|
||||||
if response.type == Message.MessageType.PUT_VALUE:
|
if response.type == Message.MessageType.PUT_VALUE:
|
||||||
# Consume the sender's signed-peer-record if sent
|
# Consume the sender's signed-peer-record if sent
|
||||||
if not maybe_consume_signed_record(response, self.host):
|
if not maybe_consume_signed_record(response, self.host, peer_id):
|
||||||
logger.error(
|
logger.error(
|
||||||
"Received an invalid-signed-record, ignoring the response"
|
"Received an invalid-signed-record, ignoring the response"
|
||||||
)
|
)
|
||||||
@ -291,7 +291,7 @@ class ValueStore:
|
|||||||
and response.record.value
|
and response.record.value
|
||||||
):
|
):
|
||||||
# Consume the sender's signed-peer-record
|
# Consume the sender's signed-peer-record
|
||||||
if not maybe_consume_signed_record(response, self.host):
|
if not maybe_consume_signed_record(response, self.host, peer_id):
|
||||||
logger.error(
|
logger.error(
|
||||||
"Received an invalid-signed-record, ignoring the response"
|
"Received an invalid-signed-record, ignoring the response"
|
||||||
)
|
)
|
||||||
|
|||||||
@ -25,7 +25,9 @@ from libp2p.kad_dht.kad_dht import (
|
|||||||
from libp2p.kad_dht.utils import (
|
from libp2p.kad_dht.utils import (
|
||||||
create_key_from_binary,
|
create_key_from_binary,
|
||||||
)
|
)
|
||||||
from libp2p.peer.envelope import Envelope
|
from libp2p.peer.envelope import Envelope, seal_record
|
||||||
|
from libp2p.peer.id import ID
|
||||||
|
from libp2p.peer.peer_record import PeerRecord
|
||||||
from libp2p.peer.peerinfo import (
|
from libp2p.peer.peerinfo import (
|
||||||
PeerInfo,
|
PeerInfo,
|
||||||
)
|
)
|
||||||
@ -394,6 +396,8 @@ async def test_dht_req_fail_with_invalid_record_transfer(
|
|||||||
|
|
||||||
# Corrupt dht_a's local peer_record
|
# Corrupt dht_a's local peer_record
|
||||||
envelope = dht_a.host.get_peerstore().get_local_record()
|
envelope = dht_a.host.get_peerstore().get_local_record()
|
||||||
|
if envelope is not None:
|
||||||
|
true_record = envelope.record()
|
||||||
key_pair = create_new_key_pair()
|
key_pair = create_new_key_pair()
|
||||||
|
|
||||||
if envelope is not None:
|
if envelope is not None:
|
||||||
@ -401,9 +405,21 @@ async def test_dht_req_fail_with_invalid_record_transfer(
|
|||||||
dht_a.host.get_peerstore().set_local_record(envelope)
|
dht_a.host.get_peerstore().set_local_record(envelope)
|
||||||
|
|
||||||
await dht_a.put_value(key, value)
|
await dht_a.put_value(key, value)
|
||||||
|
retrieved_value = dht_b.value_store.get(key)
|
||||||
value = dht_b.value_store.get(key)
|
|
||||||
|
|
||||||
# This proves that DHT_B rejected DHT_A PUT_RECORD req upon receiving
|
# This proves that DHT_B rejected DHT_A PUT_RECORD req upon receiving
|
||||||
# the corrupted invalid record
|
# the corrupted invalid record
|
||||||
assert value is None
|
assert retrieved_value is None
|
||||||
|
|
||||||
|
# Create a corrupt envelope with correct signature but false peer_id
|
||||||
|
false_record = PeerRecord(ID.from_pubkey(key_pair.public_key), true_record.addrs)
|
||||||
|
false_envelope = seal_record(false_record, dht_a.host.get_private_key())
|
||||||
|
|
||||||
|
dht_a.host.get_peerstore().set_local_record(false_envelope)
|
||||||
|
|
||||||
|
await dht_a.put_value(key, value)
|
||||||
|
retrieved_value = dht_b.value_store.get(key)
|
||||||
|
|
||||||
|
# This proves that DHT_B rejected DHT_A PUT_RECORD req upon receving
|
||||||
|
# the record with a different peer_id regardless of a valid signature
|
||||||
|
assert retrieved_value is None
|
||||||
|
|||||||
Reference in New Issue
Block a user