Merge branch 'main' into feature/mDNS

This commit is contained in:
Manu Sheel Gupta
2025-06-29 09:43:03 -07:00
committed by GitHub
20 changed files with 924 additions and 64 deletions

View File

@ -24,6 +24,12 @@ async def main():
insecure_transport = InsecureTransport(
# local_key_pair: The key pair used for libp2p identity
local_key_pair=key_pair,
# secure_bytes_provider: Optional function to generate secure random bytes
# (defaults to secrets.token_bytes)
secure_bytes_provider=None, # Use default implementation
# peerstore: Optional peerstore to store peer IDs and public keys
# (defaults to None)
peerstore=None,
)
# Create a security options dictionary mapping protocol ID to transport

View File

@ -203,7 +203,9 @@ def new_swarm(
key_pair, noise_privkey=noise_key_pair.private_key
),
TProtocol(secio.ID): secio.Transport(key_pair),
TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair),
TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(
key_pair, peerstore=peerstore_opt
),
}
# Use given muxer preference if provided, otherwise use global default

View File

@ -66,5 +66,23 @@ def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo:
return PeerInfo(peer_id, [addr])
def peer_info_to_bytes(peer_info: PeerInfo) -> bytes:
    """Serialize *peer_info* as UTF-8 text: the peer ID on the first line,
    followed by one multiaddr per line."""
    parts = [str(peer_info.peer_id)]
    parts.extend(str(addr) for addr in peer_info.addrs)
    return "\n".join(parts).encode("utf-8")
def peer_info_from_bytes(data: bytes) -> PeerInfo:
    """
    Parse a ``PeerInfo`` from the newline-separated UTF-8 format produced by
    ``peer_info_to_bytes`` (peer ID first, then one multiaddr per line).

    :param data: serialized peer info bytes
    :return: the decoded ``PeerInfo``
    :raises InvalidAddrError: if the data is empty or cannot be decoded
    """
    try:
        lines = data.decode("utf-8").splitlines()
        if not lines:
            raise InvalidAddrError("no data to decode PeerInfo")
        peer_id = ID.from_base58(lines[0])
        addrs = [multiaddr.Multiaddr(addr_str) for addr_str in lines[1:]]
        return PeerInfo(peer_id, addrs)
    except InvalidAddrError:
        # Re-raise our own error as-is instead of double-wrapping it below
        # (previously "no data to decode PeerInfo" was wrapped a second time).
        raise
    except Exception as e:
        # Chain the cause so the original decode/parse failure stays visible.
        raise InvalidAddrError(f"failed to decode PeerInfo: {e}") from e
class InvalidAddrError(ValueError):
    """Raised when peer info bytes or a multiaddr cannot be parsed."""

View File

@ -32,6 +32,8 @@ from libp2p.peer.id import (
)
from libp2p.peer.peerinfo import (
PeerInfo,
peer_info_from_bytes,
peer_info_to_bytes,
)
from libp2p.peer.peerstore import (
PERMANENT_ADDR_TTL,
@ -92,6 +94,12 @@ class GossipSub(IPubsubRouter, Service):
direct_connect_initial_delay: float
direct_connect_interval: int
do_px: bool
px_peers_count: int
back_off: dict[str, dict[ID, int]]
prune_back_off: int
unsubscribe_back_off: int
def __init__(
self,
protocols: Sequence[TProtocol],
@ -106,6 +114,10 @@ class GossipSub(IPubsubRouter, Service):
heartbeat_interval: int = 120,
direct_connect_initial_delay: float = 0.1,
direct_connect_interval: int = 300,
do_px: bool = False,
px_peers_count: int = 16,
prune_back_off: int = 60,
unsubscribe_back_off: int = 10,
) -> None:
self.protocols = list(protocols)
self.pubsub = None
@ -140,6 +152,12 @@ class GossipSub(IPubsubRouter, Service):
self.direct_connect_initial_delay = direct_connect_initial_delay
self.time_since_last_publish = {}
self.do_px = do_px
self.px_peers_count = px_peers_count
self.back_off = dict()
self.prune_back_off = prune_back_off
self.unsubscribe_back_off = unsubscribe_back_off
async def run(self) -> None:
self.manager.run_daemon_task(self.heartbeat)
if len(self.direct_peers) > 0:
@ -251,7 +269,6 @@ class GossipSub(IPubsubRouter, Service):
stream = self.pubsub.peers[peer_id]
# FIXME: We should add a `WriteMsg` similar to write delimited messages.
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
# TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages.
try:
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
except StreamClosed:
@ -334,15 +351,22 @@ class GossipSub(IPubsubRouter, Service):
self.mesh[topic] = set()
topic_in_fanout: bool = topic in self.fanout
fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set()
fanout_peers: set[ID] = set()
if topic_in_fanout:
for peer in self.fanout[topic]:
if self._check_back_off(peer, topic):
continue
fanout_peers.add(peer)
fanout_size = len(fanout_peers)
if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree):
# There are less than D peers (let this number be x)
# in the fanout for a topic (or the topic is not in the fanout).
# Selects the remaining number of peers (D-x) from peers.gossipsub[topic].
if topic in self.pubsub.peer_topics:
if self.pubsub is not None and topic in self.pubsub.peer_topics:
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
topic, self.degree - fanout_size, fanout_peers
topic, self.degree - fanout_size, fanout_peers, True
)
# Combine fanout peers with selected peers
fanout_peers.update(selected_peers)
@ -369,7 +393,8 @@ class GossipSub(IPubsubRouter, Service):
return
# Notify the peers in mesh[topic] with a PRUNE(topic) message
for peer in self.mesh[topic]:
await self.emit_prune(topic, peer)
await self.emit_prune(topic, peer, self.do_px, True)
self._add_back_off(peer, topic, True)
# Forget mesh[topic]
self.mesh.pop(topic, None)
@ -459,8 +484,8 @@ class GossipSub(IPubsubRouter, Service):
self.fanout_heartbeat()
# Get the peers to send IHAVE to
peers_to_gossip = self.gossip_heartbeat()
# Pack GRAFT, PRUNE and IHAVE for the same peer into one control message and
# send it
# Pack(piggyback) GRAFT, PRUNE and IHAVE for the same peer into
# one control message and send it
await self._emit_control_msgs(
peers_to_graft, peers_to_prune, peers_to_gossip
)
@ -505,7 +530,7 @@ class GossipSub(IPubsubRouter, Service):
if num_mesh_peers_in_topic < self.degree_low:
# Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] # noqa: E501
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic]
topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic], True
)
for peer in selected_peers:
@ -568,9 +593,7 @@ class GossipSub(IPubsubRouter, Service):
if len(in_topic_peers) < self.degree:
# Select additional peers from peers.gossipsub[topic]
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
topic,
self.degree - len(in_topic_peers),
in_topic_peers,
topic, self.degree - len(in_topic_peers), in_topic_peers, True
)
# Add the selected peers
in_topic_peers.update(selected_peers)
@ -581,7 +604,7 @@ class GossipSub(IPubsubRouter, Service):
if msg_ids:
# Select D peers from peers.gossipsub[topic] excluding current peers
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
topic, self.degree, current_peers
topic, self.degree, current_peers, True
)
msg_id_strs = [str(msg_id) for msg_id in msg_ids]
for peer in peers_to_emit_ihave_to:
@ -655,7 +678,11 @@ class GossipSub(IPubsubRouter, Service):
return selection
def _get_in_topic_gossipsub_peers_from_minus(
self, topic: str, num_to_select: int, minus: Iterable[ID]
self,
topic: str,
num_to_select: int,
minus: Iterable[ID],
backoff_check: bool = False,
) -> list[ID]:
if self.pubsub is None:
raise NoPubsubAttached
@ -664,8 +691,88 @@ class GossipSub(IPubsubRouter, Service):
for peer_id in self.pubsub.peer_topics[topic]
if self.peer_protocol[peer_id] == PROTOCOL_ID
}
if backoff_check:
# filter out peers that are in back off for this topic
gossipsub_peers_in_topic = {
peer_id
for peer_id in gossipsub_peers_in_topic
if self._check_back_off(peer_id, topic) is False
}
return self.select_from_minus(num_to_select, gossipsub_peers_in_topic, minus)
def _add_back_off(
    self, peer: ID, topic: str, is_unsubscribe: bool, backoff_duration: int = 0
) -> None:
    """
    Record a back off deadline for a peer in a topic.

    :param peer: peer to add back off for
    :param topic: topic to add back off for
    :param is_unsubscribe: whether this is an unsubscribe operation
    :param backoff_duration: duration of back off in seconds, if 0, use default
    """
    topic_backoffs = self.back_off.setdefault(topic, {})
    # Pick the back off window: explicit duration wins, otherwise the
    # configured default for unsubscribe vs. prune.
    if backoff_duration > 0:
        duration = backoff_duration
    elif is_unsubscribe:
        duration = self.unsubscribe_back_off
    else:
        duration = self.prune_back_off
    deadline = int(time.time()) + duration
    # Never shorten an already-recorded back off window.
    topic_backoffs[peer] = max(topic_backoffs.get(peer, 0), deadline)
def _check_back_off(self, peer: ID, topic: str) -> bool:
    """
    Check if a peer is in back off for a topic and cleanup expired back off entries.

    :param peer: peer to check
    :param topic: topic to check
    :return: True if the peer is in back off, False otherwise
    """
    topic_backoffs = self.back_off.get(topic)
    if topic_backoffs is None or peer not in topic_backoffs:
        return False
    if topic_backoffs.get(peer, 0) > int(time.time()):
        return True
    # The window has elapsed — drop the stale entry before reporting clear.
    del topic_backoffs[peer]
    return False
async def _do_px(self, px_peers: list[rpc_pb2.PeerInfo]) -> None:
    """
    Perform peer exchange: attempt to connect to peers advertised in a
    PRUNE control message.

    :param px_peers: peer records received via PRUNE; only the first
        ``px_peers_count`` entries are considered.
    """
    if len(px_peers) > self.px_peers_count:
        # Cap how many exchanged peers we act on.
        px_peers = px_peers[: self.px_peers_count]
    for peer in px_peers:
        peer_id: ID = ID(peer.peerID)
        # Skip peers we already have a pubsub connection to.
        if self.pubsub and peer_id in self.pubsub.peers:
            continue
        try:
            peer_info = peer_info_from_bytes(peer.signedPeerRecord)
            try:
                if self.pubsub is None:
                    raise NoPubsubAttached
                await self.pubsub.host.connect(peer_info)
            except Exception as e:
                # Best-effort: a failed dial must not abort the remaining peers.
                logger.warning(
                    "failed to connect to px peer %s: %s",
                    peer_id,
                    e,
                )
                continue
        except Exception as e:
            # Malformed record — log and move on to the next peer.
            logger.warning(
                "failed to parse peer info from px peer %s: %s",
                peer_id,
                e,
            )
            continue
# RPC handlers
async def handle_ihave(
@ -758,24 +865,46 @@ class GossipSub(IPubsubRouter, Service):
logger.warning(
"GRAFT: ignoring request from direct peer %s", sender_peer_id
)
await self.emit_prune(topic, sender_peer_id)
await self.emit_prune(topic, sender_peer_id, False, False)
return
if self._check_back_off(sender_peer_id, topic):
logger.warning(
"GRAFT: ignoring request from %s, back off until %d",
sender_peer_id,
self.back_off[topic][sender_peer_id],
)
self._add_back_off(sender_peer_id, topic, False)
await self.emit_prune(topic, sender_peer_id, False, False)
return
if sender_peer_id not in self.mesh[topic]:
self.mesh[topic].add(sender_peer_id)
else:
# Respond with PRUNE if not subscribed to the topic
await self.emit_prune(topic, sender_peer_id)
await self.emit_prune(topic, sender_peer_id, self.do_px, False)
async def handle_prune(
    self, prune_msg: rpc_pb2.ControlPrune, sender_peer_id: ID
) -> None:
    """
    Handle an incoming PRUNE: back off the sender, drop it from the topic
    mesh, and attempt connections to any exchanged (PX) peers.

    :param prune_msg: the PRUNE control message received
    :param sender_peer_id: the peer that sent the PRUNE
    """
    topic: str = prune_msg.topicID
    requested_backoff: int = prune_msg.backoff
    exchanged_peers: list[rpc_pb2.PeerInfo] = list(prune_msg.peers)

    # Remove peer from mesh for topic
    if topic in self.mesh:
        # Honour the sender's requested back off window when given,
        # otherwise fall back to the configured default.
        if requested_backoff > 0:
            self._add_back_off(sender_peer_id, topic, False, requested_backoff)
        else:
            self._add_back_off(sender_peer_id, topic, False)
        self.mesh[topic].discard(sender_peer_id)

    if exchanged_peers:
        await self._do_px(exchanged_peers)
# RPC emitters
def pack_control_msgs(
@ -824,15 +953,36 @@ class GossipSub(IPubsubRouter, Service):
await self.emit_control_message(control_msg, id)
async def emit_prune(self, topic: str, id: ID) -> None:
async def emit_prune(
    self, topic: str, to_peer: ID, do_px: bool, is_unsubscribe: bool
) -> None:
    """
    Emit a PRUNE message, sent to ``to_peer``, for ``topic``.

    :param topic: topic the peer is being pruned from
    :param to_peer: peer the PRUNE is sent to
    :param do_px: whether to attach peer-exchange records to the PRUNE
    :param is_unsubscribe: whether this prune is due to unsubscribing
        (uses the shorter unsubscribe back off window)
    :raises NoPubsubAttached: if peer exchange is requested with no pubsub
    """
    prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune()
    prune_msg.topicID = topic
    # Tell the remote peer how long to back off before re-grafting.
    back_off_duration = self.prune_back_off
    if is_unsubscribe:
        back_off_duration = self.unsubscribe_back_off
    prune_msg.backoff = back_off_duration

    if do_px:
        # Check once before the loop instead of on every iteration; the
        # peer-selection helper below also requires an attached pubsub.
        if self.pubsub is None:
            raise NoPubsubAttached
        # Attach up to px_peers_count alternative peers (peer exchange),
        # excluding the peer being pruned.
        exchange_peers = self._get_in_topic_gossipsub_peers_from_minus(
            topic, self.px_peers_count, [to_peer]
        )
        for peer in exchange_peers:
            peer_info = self.pubsub.host.get_peerstore().peer_info(peer)
            signed_peer_record: rpc_pb2.PeerInfo = rpc_pb2.PeerInfo()
            signed_peer_record.peerID = peer.to_bytes()
            signed_peer_record.signedPeerRecord = peer_info_to_bytes(peer_info)
            prune_msg.peers.append(signed_peer_record)

    control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
    control_msg.prune.extend([prune_msg])

    await self.emit_control_message(control_msg, to_peer)
async def emit_control_message(
self, control_msg: rpc_pb2.ControlMessage, to_peer: ID

View File

@ -47,6 +47,13 @@ message ControlGraft {
message ControlPrune {
optional string topicID = 1;
repeated PeerInfo peers = 2;
optional uint64 backoff = 3;
}
message PeerInfo {
optional bytes peerID = 1;
optional bytes signedPeerRecord = 2;
}
message TopicDescriptor {

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: libp2p/pubsub/pb/rpc.proto
# source: rpc.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
@ -13,37 +13,39 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x1f\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02')
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\trpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 
\x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.pubsub.pb.rpc_pb2', globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'rpc_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_RPC._serialized_start=42
_RPC._serialized_end=222
_RPC_SUBOPTS._serialized_start=177
_RPC_SUBOPTS._serialized_end=222
_MESSAGE._serialized_start=224
_MESSAGE._serialized_end=329
_CONTROLMESSAGE._serialized_start=332
_CONTROLMESSAGE._serialized_end=508
_CONTROLIHAVE._serialized_start=510
_CONTROLIHAVE._serialized_end=561
_CONTROLIWANT._serialized_start=563
_CONTROLIWANT._serialized_end=597
_CONTROLGRAFT._serialized_start=599
_CONTROLGRAFT._serialized_end=630
_CONTROLPRUNE._serialized_start=632
_CONTROLPRUNE._serialized_end=663
_TOPICDESCRIPTOR._serialized_start=666
_TOPICDESCRIPTOR._serialized_end=1057
_TOPICDESCRIPTOR_AUTHOPTS._serialized_start=799
_TOPICDESCRIPTOR_AUTHOPTS._serialized_end=923
_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_start=885
_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_end=923
_TOPICDESCRIPTOR_ENCOPTS._serialized_start=926
_TOPICDESCRIPTOR_ENCOPTS._serialized_end=1057
_TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_start=1014
_TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_end=1057
_RPC._serialized_start=25
_RPC._serialized_end=205
_RPC_SUBOPTS._serialized_start=160
_RPC_SUBOPTS._serialized_end=205
_MESSAGE._serialized_start=207
_MESSAGE._serialized_end=312
_CONTROLMESSAGE._serialized_start=315
_CONTROLMESSAGE._serialized_end=491
_CONTROLIHAVE._serialized_start=493
_CONTROLIHAVE._serialized_end=544
_CONTROLIWANT._serialized_start=546
_CONTROLIWANT._serialized_end=580
_CONTROLGRAFT._serialized_start=582
_CONTROLGRAFT._serialized_end=613
_CONTROLPRUNE._serialized_start=615
_CONTROLPRUNE._serialized_end=699
_PEERINFO._serialized_start=701
_PEERINFO._serialized_end=753
_TOPICDESCRIPTOR._serialized_start=756
_TOPICDESCRIPTOR._serialized_end=1147
_TOPICDESCRIPTOR_AUTHOPTS._serialized_start=889
_TOPICDESCRIPTOR_AUTHOPTS._serialized_end=1013
_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_start=975
_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_end=1013
_TOPICDESCRIPTOR_ENCOPTS._serialized_start=1016
_TOPICDESCRIPTOR_ENCOPTS._serialized_end=1147
_TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_start=1104
_TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_end=1147
# @@protoc_insertion_point(module_scope)

View File

@ -179,17 +179,43 @@ class ControlPrune(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
TOPICID_FIELD_NUMBER: builtins.int
PEERS_FIELD_NUMBER: builtins.int
BACKOFF_FIELD_NUMBER: builtins.int
topicID: builtins.str
backoff: builtins.int
@property
def peers(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___PeerInfo]: ...
def __init__(
self,
*,
topicID: builtins.str | None = ...,
peers: collections.abc.Iterable[global___PeerInfo] | None = ...,
backoff: builtins.int | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["topicID", b"topicID"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["topicID", b"topicID"]) -> None: ...
def HasField(self, field_name: typing.Literal["backoff", b"backoff", "topicID", b"topicID"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["backoff", b"backoff", "peers", b"peers", "topicID", b"topicID"]) -> None: ...
global___ControlPrune = ControlPrune
@typing.final
class PeerInfo(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
PEERID_FIELD_NUMBER: builtins.int
SIGNEDPEERRECORD_FIELD_NUMBER: builtins.int
peerID: builtins.bytes
signedPeerRecord: builtins.bytes
def __init__(
self,
*,
peerID: builtins.bytes | None = ...,
signedPeerRecord: builtins.bytes | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["peerID", b"peerID", "signedPeerRecord", b"signedPeerRecord"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["peerID", b"peerID", "signedPeerRecord", b"signedPeerRecord"]) -> None: ...
global___PeerInfo = PeerInfo
@typing.final
class TopicDescriptor(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

View File

@ -682,19 +682,18 @@ class Pubsub(Service, IPubsub):
# TODO: Implement throttle on async validators
if len(async_topic_validators) > 0:
# TODO: Use a better pattern
final_result: bool = True
# Appends to lists are thread safe in CPython
results = []
async def run_async_validator(func: AsyncValidatorFn) -> None:
nonlocal final_result
result = await func(msg_forwarder, msg)
final_result = final_result and result
results.append(result)
async with trio.open_nursery() as nursery:
for async_validator in async_topic_validators:
nursery.start_soon(run_async_validator, async_validator)
if not final_result:
if not all(results):
raise ValidationError(f"Validation failed for msg={msg}")
async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None:

View File

@ -1,4 +1,7 @@
from collections.abc import Callable
from libp2p.abc import (
IPeerStore,
IRawConnection,
ISecureConn,
)
@ -6,6 +9,7 @@ from libp2p.crypto.exceptions import (
MissingDeserializerError,
)
from libp2p.crypto.keys import (
KeyPair,
PrivateKey,
PublicKey,
)
@ -30,11 +34,15 @@ from libp2p.network.connection.exceptions import (
from libp2p.peer.id import (
ID,
)
from libp2p.peer.peerstore import (
PeerStoreError,
)
from libp2p.security.base_session import (
BaseSession,
)
from libp2p.security.base_transport import (
BaseSecureTransport,
default_secure_bytes_provider,
)
from libp2p.security.exceptions import (
HandshakeFailure,
@ -102,6 +110,7 @@ async def run_handshake(
conn: IRawConnection,
is_initiator: bool,
remote_peer_id: ID | None,
peerstore: IPeerStore | None = None,
) -> ISecureConn:
"""Raise `HandshakeFailure` when handshake failed."""
msg = make_exchange_message(local_private_key.get_public_key())
@ -164,7 +173,14 @@ async def run_handshake(
conn=conn,
)
# TODO: Store `pubkey` and `peer_id` to `PeerStore`
# Store `pubkey` and `peer_id` to `PeerStore`
if peerstore is not None:
try:
peerstore.add_pubkey(received_peer_id, received_pubkey)
except PeerStoreError:
# If peer ID and pubkey don't match, it would have already been caught above
# This might happen if the peer is already in the store
pass
return secure_conn
@ -175,6 +191,18 @@ class InsecureTransport(BaseSecureTransport):
transport does not add any additional security.
"""
def __init__(
    self,
    local_key_pair: KeyPair,
    secure_bytes_provider: Callable[[int], bytes] | None = None,
    peerstore: IPeerStore | None = None,
) -> None:
    """
    :param local_key_pair: the key pair used for libp2p identity
    :param secure_bytes_provider: optional source of secure random bytes;
        falls back to the library default when ``None``
    :param peerstore: optional peerstore for recording peer IDs and pubkeys
    """
    provider = (
        default_secure_bytes_provider
        if secure_bytes_provider is None
        else secure_bytes_provider
    )
    super().__init__(local_key_pair, provider)
    self.peerstore = peerstore
async def secure_inbound(self, conn: IRawConnection) -> ISecureConn:
"""
Secure the connection, either locally or by communicating with opposing
@ -183,8 +211,9 @@ class InsecureTransport(BaseSecureTransport):
:return: secure connection object (that implements secure_conn_interface)
"""
# For inbound connections, we don't know the remote peer ID yet
return await run_handshake(
self.local_peer, self.local_private_key, conn, False, None
self.local_peer, self.local_private_key, conn, False, None, self.peerstore
)
async def secure_outbound(self, conn: IRawConnection, peer_id: ID) -> ISecureConn:
@ -195,7 +224,7 @@ class InsecureTransport(BaseSecureTransport):
:return: secure connection object (that implements secure_conn_interface)
"""
return await run_handshake(
self.local_peer, self.local_private_key, conn, True, peer_id
self.local_peer, self.local_private_key, conn, True, peer_id, self.peerstore
)

View File

@ -493,7 +493,7 @@ class Yamux(IMuxedConn):
f"type={typ}, flags={flags}, stream_id={stream_id},"
f"length={length}"
)
if typ == TYPE_DATA and flags & FLAG_SYN:
if (typ == TYPE_DATA or typ == TYPE_WINDOW_UPDATE) and flags & FLAG_SYN:
async with self.streams_lock:
if stream_id not in self.streams:
stream = YamuxStream(stream_id, self, False)

View File

@ -26,6 +26,7 @@ LISTEN_MADDR = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
FLOODSUB_PROTOCOL_ID = floodsub.PROTOCOL_ID
GOSSIPSUB_PROTOCOL_ID = gossipsub.PROTOCOL_ID
GOSSIPSUB_PROTOCOL_ID_V1 = gossipsub.PROTOCOL_ID_V11
class GossipsubParams(NamedTuple):
@ -40,6 +41,10 @@ class GossipsubParams(NamedTuple):
heartbeat_interval: float = 0.5
direct_connect_initial_delay: float = 0.1
direct_connect_interval: int = 300
do_px: bool = False
px_peers_count: int = 16
prune_back_off: int = 60
unsubscribe_back_off: int = 10
GOSSIPSUB_PARAMS = GossipsubParams()

View File

@ -0,0 +1,7 @@
Store public key and peer ID in peerstore during handshake
Modified the InsecureTransport class to accept an optional peerstore parameter and updated the handshake process to store the received public key and peer ID in the peerstore when available.
Added test cases to verify:
1. The peerstore remains unchanged when handshake fails due to peer ID mismatch
2. The handshake correctly adds a public key to a peer ID that already exists in the peerstore but doesn't have a public key yet

View File

@ -0,0 +1 @@
Added peer exchange and back off logic as part of the Gossipsub v1.1 upgrade.

View File

@ -0,0 +1 @@
Aligned stream creation logic with the Yamux specification.

View File

@ -0,0 +1 @@
Fixed an issue in `Pubsub` where async validators were not handled reliably under concurrency. Now uses a safe aggregator list for consistent behavior.

View File

@ -134,7 +134,7 @@ async def test_handle_graft(monkeypatch):
# check if it is called in `handle_graft`
event_emit_prune = trio.Event()
async def emit_prune(topic, sender_peer_id):
async def emit_prune(topic, sender_peer_id, do_px, is_unsubscribe):
event_emit_prune.set()
await trio.lowlevel.checkpoint()
@ -193,7 +193,7 @@ async def test_handle_prune():
# alice emit prune message to bob, alice should be removed
# from bob's mesh peer
await gossipsubs[index_alice].emit_prune(topic, id_bob)
await gossipsubs[index_alice].emit_prune(topic, id_bob, False, False)
# `emit_prune` does not remove bob from alice's mesh peers
assert id_bob in gossipsubs[index_alice].mesh[topic]
@ -292,7 +292,9 @@ async def test_fanout():
@pytest.mark.trio
@pytest.mark.slow
async def test_fanout_maintenance():
async with PubsubFactory.create_batch_with_gossipsub(10) as pubsubs_gsub:
async with PubsubFactory.create_batch_with_gossipsub(
10, unsubscribe_back_off=1
) as pubsubs_gsub:
hosts = [pubsub.host for pubsub in pubsubs_gsub]
num_msgs = 5

View File

@ -0,0 +1,274 @@
import pytest
import trio
from libp2p.pubsub.gossipsub import (
GossipSub,
)
from libp2p.tools.utils import (
connect,
)
from tests.utils.factories import (
PubsubFactory,
)
@pytest.mark.trio
async def test_prune_backoff():
    """A pruned peer must be refused re-GRAFT until prune_back_off expires."""
    async with PubsubFactory.create_batch_with_gossipsub(
        2, heartbeat_interval=0.5, prune_back_off=2
    ) as pubsubs:
        gsub0 = pubsubs[0].router
        gsub1 = pubsubs[1].router
        assert isinstance(gsub0, GossipSub)
        assert isinstance(gsub1, GossipSub)
        host_0 = pubsubs[0].host
        host_1 = pubsubs[1].host
        topic = "test_prune_backoff"

        # connect hosts
        await connect(host_0, host_1)
        await trio.sleep(0.5)

        # both join the topic
        await gsub0.join(topic)
        await gsub1.join(topic)
        await gsub0.emit_graft(topic, host_1.get_id())
        await trio.sleep(0.5)

        # ensure peer is registered in mesh
        assert host_0.get_id() in gsub1.mesh[topic]

        # prune host_1 from gsub0's mesh
        await gsub0.emit_prune(topic, host_1.get_id(), False, False)
        await trio.sleep(0.5)

        # host_0 should not be in gsub1's mesh
        assert host_0.get_id() not in gsub1.mesh[topic]

        # try to graft again immediately (should be rejected due to backoff)
        await gsub0.emit_graft(topic, host_1.get_id())
        await trio.sleep(0.5)
        assert host_0.get_id() not in gsub1.mesh[topic], (
            "peer should be backoffed and not re-added"
        )

        # try to graft again (should succeed after backoff)
        await trio.sleep(2)
        await gsub0.emit_graft(topic, host_1.get_id())
        await trio.sleep(1)
        assert host_0.get_id() in gsub1.mesh[topic], (
            "peer should be able to rejoin after backoff"
        )
@pytest.mark.trio
async def test_unsubscribe_backoff():
    """Leaving a topic applies the (longer) unsubscribe back off window."""
    async with PubsubFactory.create_batch_with_gossipsub(
        2, heartbeat_interval=1, prune_back_off=1, unsubscribe_back_off=2
    ) as pubsubs:
        gsub0 = pubsubs[0].router
        gsub1 = pubsubs[1].router
        assert isinstance(gsub0, GossipSub)
        assert isinstance(gsub1, GossipSub)
        host_0 = pubsubs[0].host
        host_1 = pubsubs[1].host
        topic = "test_unsubscribe_backoff"

        # connect hosts
        await connect(host_0, host_1)
        await trio.sleep(0.5)

        # both join the topic
        await gsub0.join(topic)
        await gsub1.join(topic)
        await gsub0.emit_graft(topic, host_1.get_id())
        await trio.sleep(0.5)

        # ensure peer is registered in mesh
        assert host_0.get_id() in gsub1.mesh[topic]

        # host_1 unsubscribes from the topic
        await gsub1.leave(topic)
        await trio.sleep(0.5)
        assert topic not in gsub1.mesh

        # host_1 resubscribes to the topic
        await gsub1.join(topic)
        await trio.sleep(0.5)
        assert topic in gsub1.mesh

        # try to graft again immediately (should be rejected due to backoff)
        await gsub0.emit_graft(topic, host_1.get_id())
        await trio.sleep(0.5)
        assert host_0.get_id() not in gsub1.mesh[topic], (
            "peer should be backoffed and not re-added"
        )

        # try to graft again (should succeed after backoff)
        await trio.sleep(1)
        await gsub0.emit_graft(topic, host_1.get_id())
        await trio.sleep(1)
        assert host_0.get_id() in gsub1.mesh[topic], (
            "peer should be able to rejoin after backoff"
        )
@pytest.mark.trio
async def test_peer_exchange():
    """When a mesh peer leaves with PX enabled, its PRUNE should advertise
    alternative peers that the pruned node then grafts."""
    async with PubsubFactory.create_batch_with_gossipsub(
        3,
        heartbeat_interval=0.5,
        do_px=True,
        px_peers_count=1,
    ) as pubsubs:
        gsub0 = pubsubs[0].router
        gsub1 = pubsubs[1].router
        gsub2 = pubsubs[2].router
        assert isinstance(gsub0, GossipSub)
        assert isinstance(gsub1, GossipSub)
        assert isinstance(gsub2, GossipSub)
        host_0 = pubsubs[0].host
        host_1 = pubsubs[1].host
        host_2 = pubsubs[2].host
        topic = "test_peer_exchange"

        # connect hosts
        await connect(host_1, host_0)
        await connect(host_1, host_2)
        await trio.sleep(0.5)

        # all join the topic and 0 <-> 1 and 1 <-> 2 graft
        await pubsubs[1].subscribe(topic)
        await pubsubs[0].subscribe(topic)
        await pubsubs[2].subscribe(topic)
        await gsub1.emit_graft(topic, host_0.get_id())
        await gsub1.emit_graft(topic, host_2.get_id())
        await gsub0.emit_graft(topic, host_1.get_id())
        await gsub2.emit_graft(topic, host_1.get_id())
        await trio.sleep(1)

        # ensure peer is registered in mesh
        assert host_0.get_id() in gsub1.mesh[topic]
        assert host_2.get_id() in gsub1.mesh[topic]
        assert host_2.get_id() not in gsub0.mesh[topic]

        # host_1 unsubscribes from the topic
        await gsub1.leave(topic)
        await trio.sleep(1)  # Wait for heartbeat to update mesh
        assert topic not in gsub1.mesh

        # Wait for gsub0 to graft host_2 into its mesh via PX
        await trio.sleep(1)
        assert host_2.get_id() in gsub0.mesh[topic]
@pytest.mark.trio
async def test_topics_are_isolated():
    """A back off recorded for one topic must not block GRAFTs on another."""
    async with PubsubFactory.create_batch_with_gossipsub(
        2, heartbeat_interval=0.5, prune_back_off=2
    ) as pubsubs:
        gsub0 = pubsubs[0].router
        gsub1 = pubsubs[1].router
        assert isinstance(gsub0, GossipSub)
        assert isinstance(gsub1, GossipSub)
        host_0 = pubsubs[0].host
        host_1 = pubsubs[1].host
        topic1 = "test_prune_backoff"
        topic2 = "test_prune_backoff2"

        # connect hosts
        await connect(host_0, host_1)
        await trio.sleep(0.5)

        # both peers join both the topics
        await gsub0.join(topic1)
        await gsub1.join(topic1)
        await gsub0.join(topic2)
        await gsub1.join(topic2)
        await gsub0.emit_graft(topic1, host_1.get_id())
        await trio.sleep(0.5)

        # ensure topic1 for peer is registered in mesh
        assert host_0.get_id() in gsub1.mesh[topic1]

        # prune topic1 for host_1 from gsub0's mesh
        await gsub0.emit_prune(topic1, host_1.get_id(), False, False)
        await trio.sleep(0.5)

        # topic1 for host_0 should not be in gsub1's mesh
        assert host_0.get_id() not in gsub1.mesh[topic1]

        # try to regraft topic1 and graft new topic2
        await gsub0.emit_graft(topic1, host_1.get_id())
        await gsub0.emit_graft(topic2, host_1.get_id())
        await trio.sleep(0.5)
        assert host_0.get_id() not in gsub1.mesh[topic1], (
            "peer should be backoffed and not re-added"
        )
        assert host_0.get_id() in gsub1.mesh[topic2], (
            "peer should be able to join a different topic"
        )
@pytest.mark.trio
async def test_stress_churn():
    """Rapid prune/graft churn must not leave the backoff table unbounded."""
    NUM_PEERS = 5
    CHURN_CYCLES = 30
    TOPIC = "stress_churn_topic"
    PRUNE_BACKOFF = 1
    HEARTBEAT_INTERVAL = 0.2
    async with PubsubFactory.create_batch_with_gossipsub(
        NUM_PEERS,
        heartbeat_interval=HEARTBEAT_INTERVAL,
        prune_back_off=PRUNE_BACKOFF,
    ) as pubsubs:
        routers: list[GossipSub] = []
        for ps in pubsubs:
            assert isinstance(ps.router, GossipSub)
            routers.append(ps.router)
        hosts = [ps.host for ps in pubsubs]
        peer_ids = [h.get_id() for h in hosts]
        # fully connect all peers
        for i in range(NUM_PEERS):
            for j in range(i + 1, NUM_PEERS):
                await connect(hosts[i], hosts[j])
        await trio.sleep(1)
        # every peer joins the topic
        for router in routers:
            await router.join(TOPIC)
        await trio.sleep(1)
        # hammer the meshes with prune-then-graft churn cycles
        for _ in range(CHURN_CYCLES):
            for i, router in enumerate(routers):
                # prune every other peer from this router's mesh
                for j, pid in enumerate(peer_ids):
                    if j != i:
                        await router.emit_prune(TOPIC, pid, False, False)
            await trio.sleep(0.1)
            for i, router in enumerate(routers):
                # graft every other peer back
                for j, pid in enumerate(peer_ids):
                    if j != i:
                        await router.emit_graft(TOPIC, pid)
            await trio.sleep(0.1)
        # let remaining backoff entries expire and get cleaned up
        await trio.sleep(PRUNE_BACKOFF * 2)
        for router in routers:
            # back_off maps topic -> peer -> expiry
            backoff = getattr(router, "back_off", None)
            assert backoff is not None, "router missing backoff table"
            # only a small number of entries should survive cleanup (ideally 0)
            total_entries = sum(len(peers) for peers in backoff.values())
            assert total_entries < NUM_PEERS * 2, (
                f"backoff table grew too large: {total_entries} entries"
            )

View File

@ -0,0 +1,314 @@
import pytest
import trio
from trio.testing import memory_stream_pair
from libp2p.abc import IRawConnection
from libp2p.crypto.rsa import create_new_key_pair
from libp2p.peer.id import ID
from libp2p.peer.peerdata import PeerData
from libp2p.peer.peerstore import PeerStore
from libp2p.security.exceptions import HandshakeFailure
from libp2p.security.insecure.transport import InsecureTransport
# Adapter class to bridge between trio streams and libp2p raw connections
class TrioStreamAdapter(IRawConnection):
    """
    Bridge a pair of trio memory streams to the ``IRawConnection`` interface
    so security transports can be exercised without real sockets.

    :param send_stream: trio send stream carrying outbound bytes.
    :param receive_stream: trio receive stream carrying inbound bytes.
    :param is_initiator: whether this side dialed the connection.
    """

    def __init__(self, send_stream, receive_stream, is_initiator: bool = False):
        self.send_stream = send_stream
        self.receive_stream = receive_stream
        self.is_initiator = is_initiator

    async def write(self, data: bytes) -> None:
        """Send all of ``data`` on the outbound stream."""
        await self.send_stream.send_all(data)

    async def read(self, n: int | None = None) -> bytes:
        """
        Read up to ``n`` bytes from the inbound stream.

        :raises ValueError: if an unbounded read is requested, which memory
            streams cannot service meaningfully in these tests.
        """
        if n is None or n == -1:
            raise ValueError("Reading unbounded not supported")
        # NOTE: receive_some may return fewer than n bytes; callers that need
        # exact lengths must loop.
        return await self.receive_stream.receive_some(n)

    async def close(self) -> None:
        # Fix: close both directions even if closing the send side raises,
        # so the receive stream is never leaked.
        try:
            await self.send_stream.aclose()
        finally:
            await self.receive_stream.aclose()

    def get_remote_address(self) -> tuple[str, int] | None:
        # Return None since this is a test adapter without real network info
        return None
@pytest.mark.trio
async def test_insecure_transport_stores_pubkey_in_peerstore():
    """A successful handshake records the remote peer ID and pubkey in the peerstore."""
    initiator_keys = create_new_key_pair()
    responder_keys = create_new_key_pair()
    responder_id = ID.from_pubkey(responder_keys.public_key)
    peerstore = PeerStore()
    # two unidirectional memory pipes form one full-duplex connection
    initiator_send, responder_receive = memory_stream_pair()
    responder_send, initiator_receive = memory_stream_pair()
    initiator_stream = TrioStreamAdapter(
        initiator_send, initiator_receive, is_initiator=True
    )
    responder_stream = TrioStreamAdapter(
        responder_send, responder_receive, is_initiator=False
    )
    # only the initiator side keeps a peerstore in this scenario
    initiator_transport = InsecureTransport(initiator_keys, peerstore=peerstore)
    responder_transport = InsecureTransport(responder_keys, peerstore=None)

    results: dict = {}

    async def initiate(results):
        with trio.move_on_after(5):
            results["local"] = await initiator_transport.secure_outbound(
                initiator_stream, responder_id
            )

    async def respond(results):
        with trio.move_on_after(5):
            results["remote"] = await responder_transport.secure_inbound(
                responder_stream
            )

    async with trio.open_nursery() as nursery:
        nursery.start_soon(initiate, results)
        nursery.start_soon(respond, results)
        await trio.sleep(0.1)  # give both tasks a chance to finish

    assert results.get("local") is not None, "Local handshake failed"
    assert results.get("remote") is not None, "Remote handshake failed"
    # the remote peer is now known, with a matching public key
    assert responder_id in peerstore.peer_ids()
    stored_pubkey = peerstore.pubkey(responder_id)
    assert stored_pubkey is not None
    assert stored_pubkey.serialize() == responder_keys.public_key.serialize()
@pytest.mark.trio
async def test_insecure_transport_without_peerstore():
    """The handshake still succeeds when neither side has a peerstore."""
    initiator_keys = create_new_key_pair()
    responder_keys = create_new_key_pair()
    responder_id = ID.from_pubkey(responder_keys.public_key)
    # two unidirectional memory pipes form one full-duplex connection
    initiator_send, responder_receive = memory_stream_pair()
    responder_send, initiator_receive = memory_stream_pair()
    initiator_stream = TrioStreamAdapter(
        initiator_send, initiator_receive, is_initiator=True
    )
    responder_stream = TrioStreamAdapter(
        responder_send, responder_receive, is_initiator=False
    )
    # neither side is given a peerstore
    initiator_transport = InsecureTransport(initiator_keys, peerstore=None)
    responder_transport = InsecureTransport(responder_keys, peerstore=None)

    results: dict = {}

    async def initiate(results):
        with trio.move_on_after(5):
            results["local"] = await initiator_transport.secure_outbound(
                initiator_stream, responder_id
            )

    async def respond(results):
        with trio.move_on_after(5):
            results["remote"] = await responder_transport.secure_inbound(
                responder_stream
            )

    async with trio.open_nursery() as nursery:
        nursery.start_soon(initiate, results)
        nursery.start_soon(respond, results)
        await trio.sleep(0.1)  # give both tasks a chance to finish

    # the handshake must not depend on a peerstore being present
    assert results.get("local") is not None, "Local handshake failed"
    assert results.get("remote") is not None, "Remote handshake failed"
@pytest.mark.trio
async def test_peerstore_unchanged_when_handshake_fails():
    """A handshake aborted by a peer ID mismatch must leave the peerstore untouched."""
    initiator_keys = create_new_key_pair()
    responder_keys = create_new_key_pair()
    # a third identity used only to provoke the mismatch
    wrong_keys = create_new_key_pair()
    responder_id = ID.from_pubkey(responder_keys.public_key)
    wrong_id = ID.from_pubkey(wrong_keys.public_key)
    peerstore = PeerStore()
    # seed the peerstore so we can check it is left exactly as-is
    seeded_keys = create_new_key_pair()
    seeded_id = ID.from_pubkey(seeded_keys.public_key)
    peerstore.add_pubkey(seeded_id, seeded_keys.public_key)
    before = set(peerstore.peer_ids())
    # two unidirectional memory pipes form one full-duplex connection
    initiator_send, responder_receive = memory_stream_pair()
    responder_send, initiator_receive = memory_stream_pair()
    initiator_stream = TrioStreamAdapter(
        initiator_send, initiator_receive, is_initiator=True
    )
    responder_stream = TrioStreamAdapter(
        responder_send, responder_receive, is_initiator=False
    )
    initiator_transport = InsecureTransport(initiator_keys, peerstore=peerstore)
    responder_transport = InsecureTransport(responder_keys, peerstore=None)

    results: dict = {}

    async def initiate(results):
        with trio.move_on_after(5):
            try:
                # expect the WRONG peer ID so the handshake must fail
                results["local"] = await initiator_transport.secure_outbound(
                    initiator_stream, wrong_id
                )
            except HandshakeFailure:
                results["local_error"] = True

    async def respond(results):
        with trio.move_on_after(5):
            try:
                results["remote"] = await responder_transport.secure_inbound(
                    responder_stream
                )
            except HandshakeFailure:
                results["remote_error"] = True

    async with trio.open_nursery() as nursery:
        nursery.start_soon(initiate, results)
        nursery.start_soon(respond, results)
        await trio.sleep(0.1)

    # at least one side must have rejected the handshake
    assert "local_error" in results or "remote_error" in results, (
        "Expected handshake to fail due to peer ID mismatch"
    )
    # the peerstore's contents are exactly what we seeded
    assert set(peerstore.peer_ids()) == before, (
        "Peerstore should remain unchanged when handshake fails"
    )
    assert responder_id not in peerstore.peer_ids(), (
        "Remote peer ID should not be added on handshake failure"
    )
    assert wrong_id not in peerstore.peer_ids(), (
        "Mismatch peer ID should not be added on handshake failure"
    )
@pytest.mark.trio
async def test_handshake_adds_pubkey_to_existing_peer():
    """
    A handshake fills in the public key for a peer that is already known
    (e.g. discovered via a routing table) but whose key has not been seen yet.
    """
    initiator_keys = create_new_key_pair()
    responder_keys = create_new_key_pair()
    responder_id = ID.from_pubkey(responder_keys.public_key)
    peerstore = PeerStore()
    # register the peer ID with an empty PeerData entry: known peer, no key yet
    peerstore.peer_data_map[responder_id] = PeerData()
    # sanity check: the peer exists but pubkey lookup fails
    assert responder_id in peerstore.peer_ids()
    with pytest.raises(Exception):
        peerstore.pubkey(responder_id)
    # two unidirectional memory pipes form one full-duplex connection
    initiator_send, responder_receive = memory_stream_pair()
    responder_send, initiator_receive = memory_stream_pair()
    initiator_stream = TrioStreamAdapter(
        initiator_send, initiator_receive, is_initiator=True
    )
    responder_stream = TrioStreamAdapter(
        responder_send, responder_receive, is_initiator=False
    )
    initiator_transport = InsecureTransport(initiator_keys, peerstore=peerstore)
    responder_transport = InsecureTransport(responder_keys, peerstore=None)

    results: dict = {}

    async def initiate(results):
        with trio.move_on_after(5):
            results["local"] = await initiator_transport.secure_outbound(
                initiator_stream, responder_id
            )

    async def respond(results):
        with trio.move_on_after(5):
            results["remote"] = await responder_transport.secure_inbound(
                responder_stream
            )

    async with trio.open_nursery() as nursery:
        nursery.start_soon(initiate, results)
        nursery.start_soon(respond, results)
        await trio.sleep(0.1)  # give both tasks a chance to finish

    assert results.get("local") is not None, "Local handshake failed"
    assert results.get("remote") is not None, "Remote handshake failed"
    # the peer is still present, and now carries the matching public key
    assert responder_id in peerstore.peer_ids()
    stored_pubkey = peerstore.pubkey(responder_id)
    assert stored_pubkey is not None
    assert stored_pubkey.serialize() == responder_keys.public_key.serialize()

View File

@ -79,7 +79,7 @@ async def secure_conn_pair(key_pair, peer_id):
client_rw = TrioStreamAdapter(client_send, client_receive, is_initiator=True)
server_rw = TrioStreamAdapter(server_send, server_receive, is_initiator=False)
insecure_transport = InsecureTransport(key_pair)
insecure_transport = InsecureTransport(key_pair, peerstore=None)
async def run_outbound(nursery_results):
with trio.move_on_after(5):

View File

@ -161,8 +161,8 @@ def noise_handshake_payload_factory() -> NoiseHandshakePayload:
)
def plaintext_transport_factory(key_pair: KeyPair) -> ISecureTransport:
return InsecureTransport(key_pair)
def plaintext_transport_factory(key_pair: KeyPair, peerstore=None) -> ISecureTransport:
return InsecureTransport(key_pair, peerstore=peerstore)
def secio_transport_factory(key_pair: KeyPair) -> ISecureTransport:
@ -443,6 +443,10 @@ class GossipsubFactory(factory.Factory):
heartbeat_interval = GOSSIPSUB_PARAMS.heartbeat_interval
direct_connect_initial_delay = GOSSIPSUB_PARAMS.direct_connect_initial_delay
direct_connect_interval = GOSSIPSUB_PARAMS.direct_connect_interval
do_px = GOSSIPSUB_PARAMS.do_px
px_peers_count = GOSSIPSUB_PARAMS.px_peers_count
prune_back_off = GOSSIPSUB_PARAMS.prune_back_off
unsubscribe_back_off = GOSSIPSUB_PARAMS.unsubscribe_back_off
class PubsubFactory(factory.Factory):
@ -568,6 +572,10 @@ class PubsubFactory(factory.Factory):
heartbeat_initial_delay: float = GOSSIPSUB_PARAMS.heartbeat_initial_delay,
direct_connect_initial_delay: float = GOSSIPSUB_PARAMS.direct_connect_initial_delay, # noqa: E501
direct_connect_interval: int = GOSSIPSUB_PARAMS.direct_connect_interval,
do_px: bool = GOSSIPSUB_PARAMS.do_px,
px_peers_count: int = GOSSIPSUB_PARAMS.px_peers_count,
prune_back_off: int = GOSSIPSUB_PARAMS.prune_back_off,
unsubscribe_back_off: int = GOSSIPSUB_PARAMS.unsubscribe_back_off,
security_protocol: TProtocol | None = None,
muxer_opt: TMuxerOptions | None = None,
msg_id_constructor: None
@ -588,6 +596,10 @@ class PubsubFactory(factory.Factory):
heartbeat_interval=heartbeat_interval,
direct_connect_initial_delay=direct_connect_initial_delay,
direct_connect_interval=direct_connect_interval,
do_px=do_px,
px_peers_count=px_peers_count,
prune_back_off=prune_back_off,
unsubscribe_back_off=unsubscribe_back_off,
)
else:
gossipsubs = GossipsubFactory.create_batch(
@ -602,6 +614,10 @@ class PubsubFactory(factory.Factory):
heartbeat_initial_delay=heartbeat_initial_delay,
direct_connect_initial_delay=direct_connect_initial_delay,
direct_connect_interval=direct_connect_interval,
do_px=do_px,
px_peers_count=px_peers_count,
prune_back_off=prune_back_off,
unsubscribe_back_off=unsubscribe_back_off,
)
async with cls._create_batch_with_router(