mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
updated as per the suggestions in #815
This commit is contained in:
@ -15,7 +15,7 @@ from libp2p.custom_types import (
|
|||||||
from libp2p.peer.id import (
|
from libp2p.peer.id import (
|
||||||
ID,
|
ID,
|
||||||
)
|
)
|
||||||
from libp2p.pubsub.utils import env_to_send_in_RPC
|
from libp2p.peer.peerstore import env_to_send_in_RPC
|
||||||
|
|
||||||
from .exceptions import (
|
from .exceptions import (
|
||||||
PubsubRouterError,
|
PubsubRouterError,
|
||||||
@ -106,7 +106,7 @@ class FloodSub(IPubsubRouter):
|
|||||||
|
|
||||||
# Add the senderRecord of the peer in the RPC msg
|
# Add the senderRecord of the peer in the RPC msg
|
||||||
if isinstance(self.pubsub, Pubsub):
|
if isinstance(self.pubsub, Pubsub):
|
||||||
envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host)
|
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
|
||||||
rpc_msg.senderRecord = envelope_bytes
|
rpc_msg.senderRecord = envelope_bytes
|
||||||
|
|
||||||
logger.debug("publishing message %s", pubsub_msg)
|
logger.debug("publishing message %s", pubsub_msg)
|
||||||
|
|||||||
@ -34,11 +34,12 @@ from libp2p.peer.peerinfo import (
|
|||||||
)
|
)
|
||||||
from libp2p.peer.peerstore import (
|
from libp2p.peer.peerstore import (
|
||||||
PERMANENT_ADDR_TTL,
|
PERMANENT_ADDR_TTL,
|
||||||
|
env_to_send_in_RPC,
|
||||||
)
|
)
|
||||||
from libp2p.pubsub import (
|
from libp2p.pubsub import (
|
||||||
floodsub,
|
floodsub,
|
||||||
)
|
)
|
||||||
from libp2p.pubsub.utils import env_to_send_in_RPC, maybe_consume_signed_record
|
from libp2p.pubsub.utils import maybe_consume_signed_record
|
||||||
from libp2p.tools.async_service import (
|
from libp2p.tools.async_service import (
|
||||||
Service,
|
Service,
|
||||||
)
|
)
|
||||||
@ -229,7 +230,7 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
"""
|
"""
|
||||||
# Process the senderRecord if sent
|
# Process the senderRecord if sent
|
||||||
if isinstance(self.pubsub, Pubsub):
|
if isinstance(self.pubsub, Pubsub):
|
||||||
if not maybe_consume_signed_record(rpc, self.pubsub.host):
|
if not maybe_consume_signed_record(rpc, self.pubsub.host, sender_peer_id):
|
||||||
logger.error("Received an invalid-signed-record, ignoring the message")
|
logger.error("Received an invalid-signed-record, ignoring the message")
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -262,7 +263,7 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
|
|
||||||
# Add the senderRecord of the peer in the RPC msg
|
# Add the senderRecord of the peer in the RPC msg
|
||||||
if isinstance(self.pubsub, Pubsub):
|
if isinstance(self.pubsub, Pubsub):
|
||||||
envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host)
|
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
|
||||||
rpc_msg.senderRecord = envelope_bytes
|
rpc_msg.senderRecord = envelope_bytes
|
||||||
|
|
||||||
logger.debug("publishing message %s", pubsub_msg)
|
logger.debug("publishing message %s", pubsub_msg)
|
||||||
@ -834,7 +835,7 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
# to the iwant control msg, so we will send a freshly created senderRecord
|
# to the iwant control msg, so we will send a freshly created senderRecord
|
||||||
# with the RPC msg
|
# with the RPC msg
|
||||||
if isinstance(self.pubsub, Pubsub):
|
if isinstance(self.pubsub, Pubsub):
|
||||||
envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host)
|
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
|
||||||
packet.senderRecord = envelope_bytes
|
packet.senderRecord = envelope_bytes
|
||||||
|
|
||||||
packet.publish.extend(msgs_to_forward)
|
packet.publish.extend(msgs_to_forward)
|
||||||
@ -995,7 +996,7 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
|
|
||||||
# Add the sender's peer-record in the RPC msg
|
# Add the sender's peer-record in the RPC msg
|
||||||
if isinstance(self.pubsub, Pubsub):
|
if isinstance(self.pubsub, Pubsub):
|
||||||
envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host)
|
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
|
||||||
packet.senderRecord = envelope_bytes
|
packet.senderRecord = envelope_bytes
|
||||||
|
|
||||||
packet.control.CopyFrom(control_msg)
|
packet.control.CopyFrom(control_msg)
|
||||||
|
|||||||
@ -56,7 +56,8 @@ from libp2p.peer.id import (
|
|||||||
from libp2p.peer.peerdata import (
|
from libp2p.peer.peerdata import (
|
||||||
PeerDataError,
|
PeerDataError,
|
||||||
)
|
)
|
||||||
from libp2p.pubsub.utils import env_to_send_in_RPC, maybe_consume_signed_record
|
from libp2p.peer.peerstore import env_to_send_in_RPC
|
||||||
|
from libp2p.pubsub.utils import maybe_consume_signed_record
|
||||||
from libp2p.tools.async_service import (
|
from libp2p.tools.async_service import (
|
||||||
Service,
|
Service,
|
||||||
)
|
)
|
||||||
@ -249,7 +250,7 @@ class Pubsub(Service, IPubsub):
|
|||||||
[rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)]
|
[rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)]
|
||||||
)
|
)
|
||||||
# Add the sender's signedRecord in the RPC message
|
# Add the sender's signedRecord in the RPC message
|
||||||
envelope_bytes, bool = env_to_send_in_RPC(self.host)
|
envelope_bytes, _ = env_to_send_in_RPC(self.host)
|
||||||
packet.senderRecord = envelope_bytes
|
packet.senderRecord = envelope_bytes
|
||||||
|
|
||||||
return packet
|
return packet
|
||||||
@ -270,7 +271,7 @@ class Pubsub(Service, IPubsub):
|
|||||||
rpc_incoming.ParseFromString(incoming)
|
rpc_incoming.ParseFromString(incoming)
|
||||||
|
|
||||||
# Process the sender's signed-record if sent
|
# Process the sender's signed-record if sent
|
||||||
if not maybe_consume_signed_record(rpc_incoming, self.host):
|
if not maybe_consume_signed_record(rpc_incoming, self.host, peer_id):
|
||||||
logger.error(
|
logger.error(
|
||||||
"Received an invalid-signed-record, ignoring the incoming msg"
|
"Received an invalid-signed-record, ignoring the incoming msg"
|
||||||
)
|
)
|
||||||
@ -586,7 +587,7 @@ class Pubsub(Service, IPubsub):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Add the senderRecord of the peer in the RPC msg
|
# Add the senderRecord of the peer in the RPC msg
|
||||||
envelope_bytes, bool = env_to_send_in_RPC(self.host)
|
envelope_bytes, _ = env_to_send_in_RPC(self.host)
|
||||||
packet.senderRecord = envelope_bytes
|
packet.senderRecord = envelope_bytes
|
||||||
# Send out subscribe message to all peers
|
# Send out subscribe message to all peers
|
||||||
await self.message_all_peers(packet.SerializeToString())
|
await self.message_all_peers(packet.SerializeToString())
|
||||||
@ -621,7 +622,7 @@ class Pubsub(Service, IPubsub):
|
|||||||
[rpc_pb2.RPC.SubOpts(subscribe=False, topicid=topic_id)]
|
[rpc_pb2.RPC.SubOpts(subscribe=False, topicid=topic_id)]
|
||||||
)
|
)
|
||||||
# Add the senderRecord of the peer in the RPC msg
|
# Add the senderRecord of the peer in the RPC msg
|
||||||
envelope_bytes, bool = env_to_send_in_RPC(self.host)
|
envelope_bytes, _ = env_to_send_in_RPC(self.host)
|
||||||
packet.senderRecord = envelope_bytes
|
packet.senderRecord = envelope_bytes
|
||||||
|
|
||||||
# Send out unsubscribe message to all peers
|
# Send out unsubscribe message to all peers
|
||||||
|
|||||||
@ -2,50 +2,49 @@ import logging
|
|||||||
|
|
||||||
from libp2p.abc import IHost
|
from libp2p.abc import IHost
|
||||||
from libp2p.peer.envelope import consume_envelope
|
from libp2p.peer.envelope import consume_envelope
|
||||||
from libp2p.peer.peerstore import create_signed_peer_record
|
from libp2p.peer.id import ID
|
||||||
from libp2p.pubsub.pb.rpc_pb2 import RPC
|
from libp2p.pubsub.pb.rpc_pb2 import RPC
|
||||||
|
|
||||||
logger = logging.getLogger("pubsub-example.utils")
|
logger = logging.getLogger("pubsub-example.utils")
|
||||||
|
|
||||||
|
|
||||||
def maybe_consume_signed_record(msg: RPC, host: IHost) -> bool:
|
def maybe_consume_signed_record(msg: RPC, host: IHost, peer_id: ID) -> bool:
|
||||||
|
"""
|
||||||
|
Attempt to parse and store a signed-peer-record (Envelope) received during
|
||||||
|
PubSub communication. If the record is invalid, the peer-id does not match, or
|
||||||
|
updating the peerstore fails, the function logs an error and returns False.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
msg : RPC
|
||||||
|
The protobuf message received during PubSub communication.
|
||||||
|
host : IHost
|
||||||
|
The local host instance, providing access to the peerstore for storing
|
||||||
|
verified peer records.
|
||||||
|
peer_id : ID | None, optional
|
||||||
|
The expected peer ID for record validation. If provided, the peer ID
|
||||||
|
inside the record must match this value.
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
bool
|
||||||
|
True if a valid signed peer record was successfully consumed and stored,
|
||||||
|
False otherwise.
|
||||||
|
|
||||||
|
"""
|
||||||
if msg.HasField("senderRecord"):
|
if msg.HasField("senderRecord"):
|
||||||
try:
|
try:
|
||||||
# Convert the signed-peer-record(Envelope) from
|
# Convert the signed-peer-record(Envelope) from
|
||||||
# protobuf bytes
|
# protobuf bytes
|
||||||
envelope, _ = consume_envelope(msg.senderRecord, "libp2p-peer-record")
|
envelope, record = consume_envelope(msg.senderRecord, "libp2p-peer-record")
|
||||||
|
if not record.peer_id == peer_id:
|
||||||
|
return False
|
||||||
|
|
||||||
# Use the default TTL of 2 hours (7200 seconds)
|
# Use the default TTL of 2 hours (7200 seconds)
|
||||||
if not host.get_peerstore().consume_peer_record(envelope, 7200):
|
if not host.get_peerstore().consume_peer_record(envelope, 7200):
|
||||||
logger.error("Updating the certified-addr-book was unsuccessful")
|
logger.error("Updating the certified-addr-book was unsuccessful")
|
||||||
|
return False
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Error updating the certified addr book for peer: %s", e)
|
logger.error("Error updating the certified addr book for peer: %s", e)
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def env_to_send_in_RPC(host: IHost) -> tuple[bytes, bool]:
|
|
||||||
listen_addrs_set = {addr for addr in host.get_addrs()}
|
|
||||||
local_env = host.get_peerstore().get_local_record()
|
|
||||||
|
|
||||||
if local_env is None:
|
|
||||||
# No cached SPR yet -> create one
|
|
||||||
return issue_and_cache_local_record(host), True
|
|
||||||
else:
|
|
||||||
record_addrs_set = local_env._env_addrs_set()
|
|
||||||
if record_addrs_set == listen_addrs_set:
|
|
||||||
# Perfect match -> reuse the cached envelope
|
|
||||||
return local_env.marshal_envelope(), False
|
|
||||||
else:
|
|
||||||
# Addresses changed -> issue a new SPR and cache it
|
|
||||||
return issue_and_cache_local_record(host), True
|
|
||||||
|
|
||||||
|
|
||||||
def issue_and_cache_local_record(host: IHost) -> bytes:
|
|
||||||
env = create_signed_peer_record(
|
|
||||||
host.get_id(),
|
|
||||||
host.get_addrs(),
|
|
||||||
host.get_private_key(),
|
|
||||||
)
|
|
||||||
# Cache it for next time
|
|
||||||
host.get_peerstore().set_local_record(env)
|
|
||||||
return env.marshal_envelope()
|
|
||||||
|
|||||||
@ -19,10 +19,11 @@ from libp2p.exceptions import (
|
|||||||
from libp2p.network.stream.exceptions import (
|
from libp2p.network.stream.exceptions import (
|
||||||
StreamEOF,
|
StreamEOF,
|
||||||
)
|
)
|
||||||
from libp2p.peer.envelope import Envelope
|
from libp2p.peer.envelope import Envelope, seal_record
|
||||||
from libp2p.peer.id import (
|
from libp2p.peer.id import (
|
||||||
ID,
|
ID,
|
||||||
)
|
)
|
||||||
|
from libp2p.peer.peer_record import PeerRecord
|
||||||
from libp2p.pubsub.pb import (
|
from libp2p.pubsub.pb import (
|
||||||
rpc_pb2,
|
rpc_pb2,
|
||||||
)
|
)
|
||||||
@ -170,6 +171,8 @@ async def test_peer_subscribe_fail_upon_invald_record_transfer():
|
|||||||
|
|
||||||
# Corrupt host_a's local peer record
|
# Corrupt host_a's local peer record
|
||||||
envelope = pubsubs_fsub[0].host.get_peerstore().get_local_record()
|
envelope = pubsubs_fsub[0].host.get_peerstore().get_local_record()
|
||||||
|
if envelope is not None:
|
||||||
|
true_record = envelope.record()
|
||||||
key_pair = create_new_key_pair()
|
key_pair = create_new_key_pair()
|
||||||
|
|
||||||
if envelope is not None:
|
if envelope is not None:
|
||||||
@ -183,6 +186,23 @@ async def test_peer_subscribe_fail_upon_invald_record_transfer():
|
|||||||
TESTING_TOPIC, set()
|
TESTING_TOPIC, set()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Create a corrupt envelope with correct signature but false peer-id
|
||||||
|
false_record = PeerRecord(
|
||||||
|
ID.from_pubkey(key_pair.public_key), true_record.addrs
|
||||||
|
)
|
||||||
|
false_envelope = seal_record(
|
||||||
|
false_record, pubsubs_fsub[0].host.get_private_key()
|
||||||
|
)
|
||||||
|
|
||||||
|
pubsubs_fsub[0].host.get_peerstore().set_local_record(false_envelope)
|
||||||
|
|
||||||
|
await pubsubs_fsub[0].subscribe(TESTING_TOPIC)
|
||||||
|
# Yeild to let 0 notify 1
|
||||||
|
await trio.sleep(1)
|
||||||
|
assert pubsubs_fsub[0].my_id not in pubsubs_fsub[1].peer_topics.get(
|
||||||
|
TESTING_TOPIC, set()
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.trio
|
@pytest.mark.trio
|
||||||
async def test_get_hello_packet():
|
async def test_get_hello_packet():
|
||||||
|
|||||||
Reference in New Issue
Block a user