mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
add reissuing mechanism of records if addrs dont change as done in #815
This commit is contained in:
@ -15,7 +15,7 @@ from libp2p.custom_types import (
|
||||
from libp2p.peer.id import (
|
||||
ID,
|
||||
)
|
||||
from libp2p.peer.peerstore import create_signed_peer_record
|
||||
from libp2p.pubsub.utils import env_to_send_in_RPC
|
||||
|
||||
from .exceptions import (
|
||||
PubsubRouterError,
|
||||
@ -106,12 +106,8 @@ class FloodSub(IPubsubRouter):
|
||||
|
||||
# Add the senderRecord of the peer in the RPC msg
|
||||
if isinstance(self.pubsub, Pubsub):
|
||||
envelope = create_signed_peer_record(
|
||||
self.pubsub.host.get_id(),
|
||||
self.pubsub.host.get_addrs(),
|
||||
self.pubsub.host.get_private_key(),
|
||||
)
|
||||
rpc_msg.senderRecord = envelope.marshal_envelope()
|
||||
envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host)
|
||||
rpc_msg.senderRecord = envelope_bytes
|
||||
|
||||
logger.debug("publishing message %s", pubsub_msg)
|
||||
|
||||
|
||||
@ -24,7 +24,6 @@ from libp2p.abc import (
|
||||
from libp2p.custom_types import (
|
||||
TProtocol,
|
||||
)
|
||||
from libp2p.peer.envelope import consume_envelope
|
||||
from libp2p.peer.id import (
|
||||
ID,
|
||||
)
|
||||
@ -35,11 +34,11 @@ from libp2p.peer.peerinfo import (
|
||||
)
|
||||
from libp2p.peer.peerstore import (
|
||||
PERMANENT_ADDR_TTL,
|
||||
create_signed_peer_record,
|
||||
)
|
||||
from libp2p.pubsub import (
|
||||
floodsub,
|
||||
)
|
||||
from libp2p.pubsub.utils import env_to_send_in_RPC, maybe_consume_signed_record
|
||||
from libp2p.tools.async_service import (
|
||||
Service,
|
||||
)
|
||||
@ -230,24 +229,7 @@ class GossipSub(IPubsubRouter, Service):
|
||||
"""
|
||||
# Process the senderRecord if sent
|
||||
if isinstance(self.pubsub, Pubsub):
|
||||
if rpc.HasField("senderRecord"):
|
||||
try:
|
||||
# Convert the signed-peer-record(Envelope) from
|
||||
# protobuf bytes
|
||||
envelope, _ = consume_envelope(
|
||||
rpc.senderRecord, "libp2p-peer-record"
|
||||
)
|
||||
# Use the default TTL of 2 hours (7200 seconds)
|
||||
if self.pubsub.host.get_peerstore().consume_peer_record(
|
||||
envelope, 7200
|
||||
):
|
||||
logger.error(
|
||||
"Updating the Certified-Addr-Book was unsuccessful"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Error updating the certified addr book for peer: %s", e
|
||||
)
|
||||
_ = maybe_consume_signed_record(rpc, self.pubsub.host)
|
||||
|
||||
control_message = rpc.control
|
||||
|
||||
@ -278,12 +260,8 @@ class GossipSub(IPubsubRouter, Service):
|
||||
|
||||
# Add the senderRecord of the peer in the RPC msg
|
||||
if isinstance(self.pubsub, Pubsub):
|
||||
envelope = create_signed_peer_record(
|
||||
self.pubsub.host.get_id(),
|
||||
self.pubsub.host.get_addrs(),
|
||||
self.pubsub.host.get_private_key(),
|
||||
)
|
||||
rpc_msg.senderRecord = envelope.marshal_envelope()
|
||||
envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host)
|
||||
rpc_msg.senderRecord = envelope_bytes
|
||||
|
||||
logger.debug("publishing message %s", pubsub_msg)
|
||||
|
||||
@ -854,12 +832,8 @@ class GossipSub(IPubsubRouter, Service):
|
||||
# to the iwant control msg, so we will send a freshly created senderRecord
|
||||
# with the RPC msg
|
||||
if isinstance(self.pubsub, Pubsub):
|
||||
envelope = create_signed_peer_record(
|
||||
self.pubsub.host.get_id(),
|
||||
self.pubsub.host.get_addrs(),
|
||||
self.pubsub.host.get_private_key(),
|
||||
)
|
||||
packet.senderRecord = envelope.marshal_envelope()
|
||||
envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host)
|
||||
packet.senderRecord = envelope_bytes
|
||||
|
||||
packet.publish.extend(msgs_to_forward)
|
||||
|
||||
@ -1019,12 +993,8 @@ class GossipSub(IPubsubRouter, Service):
|
||||
|
||||
# Add the sender's peer-record in the RPC msg
|
||||
if isinstance(self.pubsub, Pubsub):
|
||||
envelope = create_signed_peer_record(
|
||||
self.pubsub.host.get_id(),
|
||||
self.pubsub.host.get_addrs(),
|
||||
self.pubsub.host.get_private_key(),
|
||||
)
|
||||
packet.senderRecord = envelope.marshal_envelope()
|
||||
envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host)
|
||||
packet.senderRecord = envelope_bytes
|
||||
|
||||
packet.control.CopyFrom(control_msg)
|
||||
|
||||
|
||||
@ -50,14 +50,13 @@ from libp2p.network.stream.exceptions import (
|
||||
StreamEOF,
|
||||
StreamReset,
|
||||
)
|
||||
from libp2p.peer.envelope import consume_envelope
|
||||
from libp2p.peer.id import (
|
||||
ID,
|
||||
)
|
||||
from libp2p.peer.peerdata import (
|
||||
PeerDataError,
|
||||
)
|
||||
from libp2p.peer.peerstore import create_signed_peer_record
|
||||
from libp2p.pubsub.utils import env_to_send_in_RPC, maybe_consume_signed_record
|
||||
from libp2p.tools.async_service import (
|
||||
Service,
|
||||
)
|
||||
@ -250,12 +249,8 @@ class Pubsub(Service, IPubsub):
|
||||
[rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)]
|
||||
)
|
||||
# Add the sender's signedRecord in the RPC message
|
||||
envelope = create_signed_peer_record(
|
||||
self.host.get_id(),
|
||||
self.host.get_addrs(),
|
||||
self.host.get_private_key(),
|
||||
)
|
||||
packet.senderRecord = envelope.marshal_envelope()
|
||||
envelope_bytes, bool = env_to_send_in_RPC(self.host)
|
||||
packet.senderRecord = envelope_bytes
|
||||
|
||||
return packet
|
||||
|
||||
@ -275,24 +270,7 @@ class Pubsub(Service, IPubsub):
|
||||
rpc_incoming.ParseFromString(incoming)
|
||||
|
||||
# Process the sender's signed-record if sent
|
||||
if rpc_incoming.HasField("senderRecord"):
|
||||
try:
|
||||
# Convert the signed-peer-record(Envelope) from
|
||||
# protobuf bytes
|
||||
envelope, _ = consume_envelope(
|
||||
rpc_incoming.senderRecord, "libp2p-peer-record"
|
||||
)
|
||||
# Use the default TTL of 2 hours (7200 seconds)
|
||||
if self.host.get_peerstore().consume_peer_record(
|
||||
envelope, 7200
|
||||
):
|
||||
logger.error(
|
||||
"Updating the Certified-Addr-Book was unsuccessful"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Error updating the certified addr book for peer: %s", e
|
||||
)
|
||||
_ = maybe_consume_signed_record(rpc_incoming, self.host)
|
||||
|
||||
if rpc_incoming.publish:
|
||||
# deal with RPC.publish
|
||||
@ -604,13 +582,8 @@ class Pubsub(Service, IPubsub):
|
||||
)
|
||||
|
||||
# Add the senderRecord of the peer in the RPC msg
|
||||
envelope = create_signed_peer_record(
|
||||
self.host.get_id(),
|
||||
self.host.get_addrs(),
|
||||
self.host.get_private_key(),
|
||||
)
|
||||
packet.senderRecord = envelope.marshal_envelope()
|
||||
|
||||
envelope_bytes, bool = env_to_send_in_RPC(self.host)
|
||||
packet.senderRecord = envelope_bytes
|
||||
# Send out subscribe message to all peers
|
||||
await self.message_all_peers(packet.SerializeToString())
|
||||
|
||||
@ -644,12 +617,8 @@ class Pubsub(Service, IPubsub):
|
||||
[rpc_pb2.RPC.SubOpts(subscribe=False, topicid=topic_id)]
|
||||
)
|
||||
# Add the senderRecord of the peer in the RPC msg
|
||||
envelope = create_signed_peer_record(
|
||||
self.host.get_id(),
|
||||
self.host.get_addrs(),
|
||||
self.host.get_private_key(),
|
||||
)
|
||||
packet.senderRecord = envelope.marshal_envelope()
|
||||
envelope_bytes, bool = env_to_send_in_RPC(self.host)
|
||||
packet.senderRecord = envelope_bytes
|
||||
|
||||
# Send out unsubscribe message to all peers
|
||||
await self.message_all_peers(packet.SerializeToString())
|
||||
|
||||
51
libp2p/pubsub/utils.py
Normal file
51
libp2p/pubsub/utils.py
Normal file
@ -0,0 +1,51 @@
|
||||
import logging
|
||||
|
||||
from libp2p.abc import IHost
|
||||
from libp2p.peer.envelope import consume_envelope
|
||||
from libp2p.peer.peerstore import create_signed_peer_record
|
||||
from libp2p.pubsub.pb.rpc_pb2 import RPC
|
||||
|
||||
logger = logging.getLogger("pubsub-example.utils")
|
||||
|
||||
|
||||
def maybe_consume_signed_record(msg: RPC, host: IHost) -> bool:
|
||||
if msg.HasField("senderRecord"):
|
||||
try:
|
||||
# Convert the signed-peer-record(Envelope) from
|
||||
# protobuf bytes
|
||||
envelope, _ = consume_envelope(msg.senderRecord, "libp2p-peer-record")
|
||||
# Use the default TTL of 2 hours (7200 seconds)
|
||||
if not host.get_peerstore().consume_peer_record(envelope, 7200):
|
||||
logger.error("Updating the certified-addr-book was unsuccessful")
|
||||
except Exception as e:
|
||||
logger.error("Error updating the certified addr book for peer: %s", e)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def env_to_send_in_RPC(host: IHost) -> tuple[bytes, bool]:
|
||||
listen_addrs_set = {addr for addr in host.get_addrs()}
|
||||
local_env = host.get_peerstore().get_local_record()
|
||||
|
||||
if local_env is None:
|
||||
# No cached SPR yet -> create one
|
||||
return issue_and_cache_local_record(host), True
|
||||
else:
|
||||
record_addrs_set = local_env._env_addrs_set()
|
||||
if record_addrs_set == listen_addrs_set:
|
||||
# Perfect match -> reuse the cached envelope
|
||||
return local_env.marshal_envelope(), False
|
||||
else:
|
||||
# Addresses changed -> issue a new SPR and cache it
|
||||
return issue_and_cache_local_record(host), True
|
||||
|
||||
|
||||
def issue_and_cache_local_record(host: IHost) -> bytes:
|
||||
env = create_signed_peer_record(
|
||||
host.get_id(),
|
||||
host.get_addrs(),
|
||||
host.get_private_key(),
|
||||
)
|
||||
# Cache it for next time
|
||||
host.get_peerstore().set_local_record(env)
|
||||
return env.marshal_envelope()
|
||||
Reference in New Issue
Block a user