mirror of https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00

Merge pull request #690 from mystical-prog/px-backoff

Peer Exchange and Back Off

@@ -66,5 +66,23 @@ def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo:
     return PeerInfo(peer_id, [addr])
 
 
+def peer_info_to_bytes(peer_info: PeerInfo) -> bytes:
+    lines = [str(peer_info.peer_id)] + [str(addr) for addr in peer_info.addrs]
+    return "\n".join(lines).encode("utf-8")
+
+
+def peer_info_from_bytes(data: bytes) -> PeerInfo:
+    try:
+        lines = data.decode("utf-8").splitlines()
+        if not lines:
+            raise InvalidAddrError("no data to decode PeerInfo")
+
+        peer_id = ID.from_base58(lines[0])
+        addrs = [multiaddr.Multiaddr(addr_str) for addr_str in lines[1:]]
+        return PeerInfo(peer_id, addrs)
+    except Exception as e:
+        raise InvalidAddrError(f"failed to decode PeerInfo: {e}")
+
+
 class InvalidAddrError(ValueError):
     pass

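The helpers above round-trip a PeerInfo through a plain newline-delimited text encoding: the first line is the base58 peer ID, each following line one multiaddr. A minimal round-trip sketch (the ID and address values are illustrative):

    import multiaddr

    from libp2p.peer.id import ID
    from libp2p.peer.peerinfo import (
        PeerInfo,
        peer_info_from_bytes,
        peer_info_to_bytes,
    )

    # Build a PeerInfo and serialize it; peer_info_from_bytes must invert this.
    info = PeerInfo(
        ID.from_base58("QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u"),
        [multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/4001")],
    )
    blob = peer_info_to_bytes(info)
    decoded = peer_info_from_bytes(blob)  # raises InvalidAddrError on bad input
    assert decoded.peer_id == info.peer_id

Note that this encoding carries no signature, even though the patch stores it in a protobuf field named signedPeerRecord.
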
@@ -32,6 +32,8 @@ from libp2p.peer.id import (
 )
 from libp2p.peer.peerinfo import (
     PeerInfo,
+    peer_info_from_bytes,
+    peer_info_to_bytes,
 )
 from libp2p.peer.peerstore import (
     PERMANENT_ADDR_TTL,

@@ -92,6 +94,12 @@ class GossipSub(IPubsubRouter, Service):
     direct_connect_initial_delay: float
     direct_connect_interval: int
 
+    do_px: bool
+    px_peers_count: int
+    back_off: dict[str, dict[ID, int]]
+    prune_back_off: int
+    unsubscribe_back_off: int
+
     def __init__(
         self,
         protocols: Sequence[TProtocol],

@@ -106,6 +114,10 @@ class GossipSub(IPubsubRouter, Service):
         heartbeat_interval: int = 120,
         direct_connect_initial_delay: float = 0.1,
         direct_connect_interval: int = 300,
+        do_px: bool = False,
+        px_peers_count: int = 16,
+        prune_back_off: int = 60,
+        unsubscribe_back_off: int = 10,
     ) -> None:
         self.protocols = list(protocols)
         self.pubsub = None

@@ -140,6 +152,12 @@ class GossipSub(IPubsubRouter, Service):
         self.direct_connect_initial_delay = direct_connect_initial_delay
         self.time_since_last_publish = {}
 
+        self.do_px = do_px
+        self.px_peers_count = px_peers_count
+        self.back_off = dict()
+        self.prune_back_off = prune_back_off
+        self.unsubscribe_back_off = unsubscribe_back_off
+
     async def run(self) -> None:
         self.manager.run_daemon_task(self.heartbeat)
         if len(self.direct_peers) > 0:

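Together these knobs give each router a per-topic backoff ledger (back_off maps topic -> peer ID -> expiry timestamp) and opt-in peer exchange. A construction sketch; the degree values are illustrative and the new keyword names follow the signature added above:

    from libp2p.pubsub.gossipsub import PROTOCOL_ID, GossipSub

    router = GossipSub(
        protocols=[PROTOCOL_ID],
        degree=6,                 # target mesh size D
        degree_low=4,             # graft below this
        degree_high=12,           # prune above this
        do_px=True,               # attach PX candidates to outgoing PRUNEs
        px_peers_count=16,        # cap on candidates per PRUNE
        prune_back_off=60,        # seconds a pruned peer must wait to re-graft
        unsubscribe_back_off=10,  # shorter penalty after a voluntary leave
    )
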
@@ -334,15 +352,22 @@ class GossipSub(IPubsubRouter, Service):
             self.mesh[topic] = set()
 
         topic_in_fanout: bool = topic in self.fanout
-        fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set()
+        fanout_peers: set[ID] = set()
+
+        if topic_in_fanout:
+            for peer in self.fanout[topic]:
+                if self._check_back_off(peer, topic):
+                    continue
+                fanout_peers.add(peer)
+
         fanout_size = len(fanout_peers)
         if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree):
             # There are less than D peers (let this number be x)
             # in the fanout for a topic (or the topic is not in the fanout).
             # Selects the remaining number of peers (D-x) from peers.gossipsub[topic].
-            if topic in self.pubsub.peer_topics:
+            if self.pubsub is not None and topic in self.pubsub.peer_topics:
                 selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
-                    topic, self.degree - fanout_size, fanout_peers
+                    topic, self.degree - fanout_size, fanout_peers, True
                 )
                 # Combine fanout peers with selected peers
                 fanout_peers.update(selected_peers)

@@ -369,7 +394,8 @@ class GossipSub(IPubsubRouter, Service):
             return
         # Notify the peers in mesh[topic] with a PRUNE(topic) message
         for peer in self.mesh[topic]:
-            await self.emit_prune(topic, peer)
+            await self.emit_prune(topic, peer, self.do_px, True)
+            self._add_back_off(peer, topic, True)
 
         # Forget mesh[topic]
         self.mesh.pop(topic, None)

@@ -505,7 +531,7 @@ class GossipSub(IPubsubRouter, Service):
         if num_mesh_peers_in_topic < self.degree_low:
             # Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] # noqa: E501
             selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
-                topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic]
+                topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic], True
             )
 
             for peer in selected_peers:

@@ -568,9 +594,7 @@ class GossipSub(IPubsubRouter, Service):
             if len(in_topic_peers) < self.degree:
                 # Select additional peers from peers.gossipsub[topic]
                 selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
-                    topic,
-                    self.degree - len(in_topic_peers),
-                    in_topic_peers,
+                    topic, self.degree - len(in_topic_peers), in_topic_peers, True
                 )
                 # Add the selected peers
                 in_topic_peers.update(selected_peers)

@@ -581,7 +605,7 @@ class GossipSub(IPubsubRouter, Service):
             if msg_ids:
                 # Select D peers from peers.gossipsub[topic] excluding current peers
                 peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
-                    topic, self.degree, current_peers
+                    topic, self.degree, current_peers, True
                 )
                 msg_id_strs = [str(msg_id) for msg_id in msg_ids]
                 for peer in peers_to_emit_ihave_to:

@@ -655,7 +679,11 @@ class GossipSub(IPubsubRouter, Service):
         return selection
 
     def _get_in_topic_gossipsub_peers_from_minus(
-        self, topic: str, num_to_select: int, minus: Iterable[ID]
+        self,
+        topic: str,
+        num_to_select: int,
+        minus: Iterable[ID],
+        backoff_check: bool = False,
     ) -> list[ID]:
         if self.pubsub is None:
             raise NoPubsubAttached

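Every selection call site in join and the heartbeat paths now passes True for the new backoff_check flag, so peers sitting out a backoff window are excluded before random selection. A hedged usage sketch (router and topic are assumed to exist; the method is private to GossipSub):

    # Pick up to D candidate peers for `topic`, excluding those in `minus`
    # and, because backoff_check=True, any peer still backed off for `topic`.
    candidates = router._get_in_topic_gossipsub_peers_from_minus(
        topic, router.degree, set(), backoff_check=True
    )
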
@@ -664,8 +692,88 @@ class GossipSub(IPubsubRouter, Service):
             for peer_id in self.pubsub.peer_topics[topic]
             if self.peer_protocol[peer_id] == PROTOCOL_ID
         }
+        if backoff_check:
+            # filter out peers that are in back off for this topic
+            gossipsub_peers_in_topic = {
+                peer_id
+                for peer_id in gossipsub_peers_in_topic
+                if self._check_back_off(peer_id, topic) is False
+            }
         return self.select_from_minus(num_to_select, gossipsub_peers_in_topic, minus)
 
+    def _add_back_off(
+        self, peer: ID, topic: str, is_unsubscribe: bool, backoff_duration: int = 0
+    ) -> None:
+        """
+        Add back off for a peer in a topic.
+
+        :param peer: peer to add back off for
+        :param topic: topic to add back off for
+        :param is_unsubscribe: whether this is an unsubscribe operation
+        :param backoff_duration: duration of back off in seconds, if 0, use default
+        """
+        if topic not in self.back_off:
+            self.back_off[topic] = dict()
+
+        backoff_till = int(time.time())
+        if backoff_duration > 0:
+            backoff_till += backoff_duration
+        else:
+            if is_unsubscribe:
+                backoff_till += self.unsubscribe_back_off
+            else:
+                backoff_till += self.prune_back_off
+
+        if peer not in self.back_off[topic]:
+            self.back_off[topic][peer] = backoff_till
+        else:
+            self.back_off[topic][peer] = max(self.back_off[topic][peer], backoff_till)
+
+    def _check_back_off(self, peer: ID, topic: str) -> bool:
+        """
+        Check if a peer is in back off for a topic and cleanup expired back off entries.
+
+        :param peer: peer to check
+        :param topic: topic to check
+        :return: True if the peer is in back off, False otherwise
+        """
+        if topic not in self.back_off or peer not in self.back_off[topic]:
+            return False
+        if self.back_off[topic].get(peer, 0) > int(time.time()):
+            return True
+        else:
+            del self.back_off[topic][peer]
+            return False
+
+    async def _do_px(self, px_peers: list[rpc_pb2.PeerInfo]) -> None:
+        if len(px_peers) > self.px_peers_count:
+            px_peers = px_peers[: self.px_peers_count]
+
+        for peer in px_peers:
+            peer_id: ID = ID(peer.peerID)
+
+            if self.pubsub and peer_id in self.pubsub.peers:
+                continue
+
+            try:
+                peer_info = peer_info_from_bytes(peer.signedPeerRecord)
+                try:
+                    if self.pubsub is None:
+                        raise NoPubsubAttached
+                    await self.pubsub.host.connect(peer_info)
+                except Exception as e:
+                    logger.warning(
+                        "failed to connect to px peer %s: %s",
+                        peer_id,
+                        e,
+                    )
+                    continue
+            except Exception as e:
+                logger.warning(
+                    "failed to parse peer info from px peer %s: %s",
+                    peer_id,
+                    e,
+                )
+                continue
+
     # RPC handlers
 
     async def handle_ihave(

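Two ledger semantics are worth noting: expiries only ratchet upward (max of old and new), and expired entries are deleted lazily, when _check_back_off next reads them. A self-contained sketch of the same bookkeeping with plain dicts, runnable on its own:

    import time

    back_off: dict[str, dict[str, int]] = {}  # topic -> peer -> expiry (unix seconds)

    def add_back_off(topic: str, peer: str, duration: int) -> None:
        # Never shorten an existing penalty: keep the later expiry.
        expiry = int(time.time()) + duration
        back_off.setdefault(topic, {})
        back_off[topic][peer] = max(back_off[topic].get(peer, 0), expiry)

    def in_back_off(topic: str, peer: str) -> bool:
        expiry = back_off.get(topic, {}).get(peer)
        if expiry is None:
            return False
        if expiry > int(time.time()):
            return True
        del back_off[topic][peer]  # lazy cleanup of the expired entry
        return False

    add_back_off("news", "peer-a", 60)
    assert in_back_off("news", "peer-a")
    assert not in_back_off("news", "peer-b")
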
@@ -758,24 +866,46 @@ class GossipSub(IPubsubRouter, Service):
             logger.warning(
                 "GRAFT: ignoring request from direct peer %s", sender_peer_id
             )
-            await self.emit_prune(topic, sender_peer_id)
+            await self.emit_prune(topic, sender_peer_id, False, False)
             return
 
+        if self._check_back_off(sender_peer_id, topic):
+            logger.warning(
+                "GRAFT: ignoring request from %s, back off until %d",
+                sender_peer_id,
+                self.back_off[topic][sender_peer_id],
+            )
+            self._add_back_off(sender_peer_id, topic, False)
+            await self.emit_prune(topic, sender_peer_id, False, False)
+            return
+
             if sender_peer_id not in self.mesh[topic]:
                 self.mesh[topic].add(sender_peer_id)
         else:
             # Respond with PRUNE if not subscribed to the topic
-            await self.emit_prune(topic, sender_peer_id)
+            await self.emit_prune(topic, sender_peer_id, self.do_px, False)
 
     async def handle_prune(
         self, prune_msg: rpc_pb2.ControlPrune, sender_peer_id: ID
     ) -> None:
         topic: str = prune_msg.topicID
+        backoff_till: int = prune_msg.backoff
+        px_peers: list[rpc_pb2.PeerInfo] = []
+        for peer in prune_msg.peers:
+            px_peers.append(peer)
+
         # Remove peer from mesh for topic
         if topic in self.mesh:
+            if backoff_till > 0:
+                self._add_back_off(sender_peer_id, topic, False, backoff_till)
+            else:
+                self._add_back_off(sender_peer_id, topic, False)
+
             self.mesh[topic].discard(sender_peer_id)
 
+            if px_peers:
+                await self._do_px(px_peers)
+
     # RPC emitters
 
     def pack_control_msgs(

@@ -824,15 +954,36 @@ class GossipSub(IPubsubRouter, Service):
         await self.emit_control_message(control_msg, id)
 
-    async def emit_prune(self, topic: str, id: ID) -> None:
+    async def emit_prune(
+        self, topic: str, to_peer: ID, do_px: bool, is_unsubscribe: bool
+    ) -> None:
         """Emit graft message, sent to to_peer, for topic."""
         prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune()
         prune_msg.topicID = topic
 
+        back_off_duration = self.prune_back_off
+        if is_unsubscribe:
+            back_off_duration = self.unsubscribe_back_off
+
+        prune_msg.backoff = back_off_duration
+
+        if do_px:
+            exchange_peers = self._get_in_topic_gossipsub_peers_from_minus(
+                topic, self.px_peers_count, [to_peer]
+            )
+            for peer in exchange_peers:
+                if self.pubsub is None:
+                    raise NoPubsubAttached
+                peer_info = self.pubsub.host.get_peerstore().peer_info(peer)
+                signed_peer_record: rpc_pb2.PeerInfo = rpc_pb2.PeerInfo()
+                signed_peer_record.peerID = peer.to_bytes()
+                signed_peer_record.signedPeerRecord = peer_info_to_bytes(peer_info)
+                prune_msg.peers.append(signed_peer_record)
+
         control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
         control_msg.prune.extend([prune_msg])
 
-        await self.emit_control_message(control_msg, id)
+        await self.emit_control_message(control_msg, to_peer)
 
     async def emit_control_message(
         self, control_msg: rpc_pb2.ControlMessage, to_peer: ID

@@ -47,6 +47,13 @@ message ControlGraft {
 
 message ControlPrune {
   optional string topicID = 1;
+  repeated PeerInfo peers = 2;
+  optional uint64 backoff = 3;
+}
+
+message PeerInfo {
+  optional bytes peerID = 1;
+  optional bytes signedPeerRecord = 2;
 }
 
 message TopicDescriptor {

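With the schema extended, a PRUNE can now carry both the penalty duration and a list of alternative peers. A short sketch of writing and reading the message through the generated rpc_pb2 bindings (field values are illustrative placeholders):

    from libp2p.pubsub.pb import rpc_pb2

    # Sender side (emit_prune): a PRUNE with a 60 s backoff and one PX candidate.
    prune = rpc_pb2.ControlPrune(topicID="news", backoff=60)
    candidate = prune.peers.add()
    candidate.peerID = b"illustrative-peer-id-bytes"
    candidate.signedPeerRecord = b"illustrative-serialized-peerinfo"

    # Receiver side (handle_prune): honor the backoff, then try the candidates.
    wire = prune.SerializeToString()
    parsed = rpc_pb2.ControlPrune.FromString(wire)
    assert parsed.backoff == 60
    for info in parsed.peers:
        print(info.peerID, len(info.signedPeerRecord))
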
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 # Generated by the protocol buffer compiler. DO NOT EDIT!
-# source: libp2p/pubsub/pb/rpc.proto
+# source: rpc.proto
 """Generated protocol buffer code."""
 from google.protobuf.internal import builder as _builder
 from google.protobuf import descriptor as _descriptor

@@ -13,37 +13,39 @@ _sym_db = _symbol_database.Default()
 
 
 
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x1f\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\trpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02')
 
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
-_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.pubsub.pb.rpc_pb2', globals())
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'rpc_pb2', globals())
 if _descriptor._USE_C_DESCRIPTORS == False:
 
   DESCRIPTOR._options = None
-  _RPC._serialized_start=42
-  _RPC._serialized_end=222
-  _RPC_SUBOPTS._serialized_start=177
-  _RPC_SUBOPTS._serialized_end=222
-  _MESSAGE._serialized_start=224
-  _MESSAGE._serialized_end=329
-  _CONTROLMESSAGE._serialized_start=332
-  _CONTROLMESSAGE._serialized_end=508
-  _CONTROLIHAVE._serialized_start=510
-  _CONTROLIHAVE._serialized_end=561
-  _CONTROLIWANT._serialized_start=563
-  _CONTROLIWANT._serialized_end=597
-  _CONTROLGRAFT._serialized_start=599
-  _CONTROLGRAFT._serialized_end=630
-  _CONTROLPRUNE._serialized_start=632
-  _CONTROLPRUNE._serialized_end=663
-  _TOPICDESCRIPTOR._serialized_start=666
-  _TOPICDESCRIPTOR._serialized_end=1057
-  _TOPICDESCRIPTOR_AUTHOPTS._serialized_start=799
-  _TOPICDESCRIPTOR_AUTHOPTS._serialized_end=923
-  _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_start=885
-  _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_end=923
-  _TOPICDESCRIPTOR_ENCOPTS._serialized_start=926
-  _TOPICDESCRIPTOR_ENCOPTS._serialized_end=1057
-  _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_start=1014
-  _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_end=1057
+  _RPC._serialized_start=25
+  _RPC._serialized_end=205
+  _RPC_SUBOPTS._serialized_start=160
+  _RPC_SUBOPTS._serialized_end=205
+  _MESSAGE._serialized_start=207
+  _MESSAGE._serialized_end=312
+  _CONTROLMESSAGE._serialized_start=315
+  _CONTROLMESSAGE._serialized_end=491
+  _CONTROLIHAVE._serialized_start=493
+  _CONTROLIHAVE._serialized_end=544
+  _CONTROLIWANT._serialized_start=546
+  _CONTROLIWANT._serialized_end=580
+  _CONTROLGRAFT._serialized_start=582
+  _CONTROLGRAFT._serialized_end=613
+  _CONTROLPRUNE._serialized_start=615
+  _CONTROLPRUNE._serialized_end=699
+  _PEERINFO._serialized_start=701
+  _PEERINFO._serialized_end=753
+  _TOPICDESCRIPTOR._serialized_start=756
+  _TOPICDESCRIPTOR._serialized_end=1147
+  _TOPICDESCRIPTOR_AUTHOPTS._serialized_start=889
+  _TOPICDESCRIPTOR_AUTHOPTS._serialized_end=1013
+  _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_start=975
+  _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_end=1013
+  _TOPICDESCRIPTOR_ENCOPTS._serialized_start=1016
+  _TOPICDESCRIPTOR_ENCOPTS._serialized_end=1147
+  _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_start=1104
+  _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_end=1147
 # @@protoc_insertion_point(module_scope)

@@ -179,17 +179,43 @@ class ControlPrune(google.protobuf.message.Message):
     DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
     TOPICID_FIELD_NUMBER: builtins.int
+    PEERS_FIELD_NUMBER: builtins.int
+    BACKOFF_FIELD_NUMBER: builtins.int
     topicID: builtins.str
+    backoff: builtins.int
+    @property
+    def peers(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___PeerInfo]: ...
     def __init__(
         self,
         *,
         topicID: builtins.str | None = ...,
+        peers: collections.abc.Iterable[global___PeerInfo] | None = ...,
+        backoff: builtins.int | None = ...,
     ) -> None: ...
-    def HasField(self, field_name: typing.Literal["topicID", b"topicID"]) -> builtins.bool: ...
-    def ClearField(self, field_name: typing.Literal["topicID", b"topicID"]) -> None: ...
+    def HasField(self, field_name: typing.Literal["backoff", b"backoff", "topicID", b"topicID"]) -> builtins.bool: ...
+    def ClearField(self, field_name: typing.Literal["backoff", b"backoff", "peers", b"peers", "topicID", b"topicID"]) -> None: ...
 
 global___ControlPrune = ControlPrune
 
+@typing.final
+class PeerInfo(google.protobuf.message.Message):
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    PEERID_FIELD_NUMBER: builtins.int
+    SIGNEDPEERRECORD_FIELD_NUMBER: builtins.int
+    peerID: builtins.bytes
+    signedPeerRecord: builtins.bytes
+    def __init__(
+        self,
+        *,
+        peerID: builtins.bytes | None = ...,
+        signedPeerRecord: builtins.bytes | None = ...,
+    ) -> None: ...
+    def HasField(self, field_name: typing.Literal["peerID", b"peerID", "signedPeerRecord", b"signedPeerRecord"]) -> builtins.bool: ...
+    def ClearField(self, field_name: typing.Literal["peerID", b"peerID", "signedPeerRecord", b"signedPeerRecord"]) -> None: ...
+
+global___PeerInfo = PeerInfo
+
 @typing.final
 class TopicDescriptor(google.protobuf.message.Message):
     DESCRIPTOR: google.protobuf.descriptor.Descriptor

@@ -26,6 +26,7 @@ LISTEN_MADDR = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
 
 FLOODSUB_PROTOCOL_ID = floodsub.PROTOCOL_ID
 GOSSIPSUB_PROTOCOL_ID = gossipsub.PROTOCOL_ID
+GOSSIPSUB_PROTOCOL_ID_V1 = gossipsub.PROTOCOL_ID_V11
 
 
 class GossipsubParams(NamedTuple):

@@ -40,6 +41,10 @@ class GossipsubParams(NamedTuple):
     heartbeat_interval: float = 0.5
     direct_connect_initial_delay: float = 0.1
     direct_connect_interval: int = 300
+    do_px: bool = False
+    px_peers_count: int = 16
+    prune_back_off: int = 60
+    unsubscribe_back_off: int = 10
 
 
 GOSSIPSUB_PARAMS = GossipsubParams()

New file: newsfragments/690.feature.rst (1 line)

@@ -0,0 +1 @@
+added peer exchange and backoff logic as part of Gossipsub v1.1 upgrade

@@ -134,7 +134,7 @@ async def test_handle_graft(monkeypatch):
     # check if it is called in `handle_graft`
     event_emit_prune = trio.Event()
 
-    async def emit_prune(topic, sender_peer_id):
+    async def emit_prune(topic, sender_peer_id, do_px, is_unsubscribe):
         event_emit_prune.set()
         await trio.lowlevel.checkpoint()
 

@@ -193,7 +193,7 @@ async def test_handle_prune():
 
     # alice emit prune message to bob, alice should be removed
     # from bob's mesh peer
-    await gossipsubs[index_alice].emit_prune(topic, id_bob)
+    await gossipsubs[index_alice].emit_prune(topic, id_bob, False, False)
     # `emit_prune` does not remove bob from alice's mesh peers
     assert id_bob in gossipsubs[index_alice].mesh[topic]
 

@@ -292,7 +292,9 @@ async def test_fanout():
 @pytest.mark.trio
 @pytest.mark.slow
 async def test_fanout_maintenance():
-    async with PubsubFactory.create_batch_with_gossipsub(10) as pubsubs_gsub:
+    async with PubsubFactory.create_batch_with_gossipsub(
+        10, unsubscribe_back_off=1
+    ) as pubsubs_gsub:
         hosts = [pubsub.host for pubsub in pubsubs_gsub]
         num_msgs = 5
 

New file: tests/core/pubsub/test_gossipsub_px_and_backoff.py (274 lines)

@@ -0,0 +1,274 @@
+import pytest
+import trio
+
+from libp2p.pubsub.gossipsub import (
+    GossipSub,
+)
+from libp2p.tools.utils import (
+    connect,
+)
+from tests.utils.factories import (
+    PubsubFactory,
+)
+
+
+@pytest.mark.trio
+async def test_prune_backoff():
+    async with PubsubFactory.create_batch_with_gossipsub(
+        2, heartbeat_interval=0.5, prune_back_off=2
+    ) as pubsubs:
+        gsub0 = pubsubs[0].router
+        gsub1 = pubsubs[1].router
+        assert isinstance(gsub0, GossipSub)
+        assert isinstance(gsub1, GossipSub)
+        host_0 = pubsubs[0].host
+        host_1 = pubsubs[1].host
+
+        topic = "test_prune_backoff"
+
+        # connect hosts
+        await connect(host_0, host_1)
+        await trio.sleep(0.5)
+
+        # both join the topic
+        await gsub0.join(topic)
+        await gsub1.join(topic)
+        await gsub0.emit_graft(topic, host_1.get_id())
+        await trio.sleep(0.5)
+
+        # ensure peer is registered in mesh
+        assert host_0.get_id() in gsub1.mesh[topic]
+
+        # prune host_1 from gsub0's mesh
+        await gsub0.emit_prune(topic, host_1.get_id(), False, False)
+        await trio.sleep(0.5)
+
+        # host_0 should not be in gsub1's mesh
+        assert host_0.get_id() not in gsub1.mesh[topic]
+
+        # try to graft again immediately (should be rejected due to backoff)
+        await gsub0.emit_graft(topic, host_1.get_id())
+        await trio.sleep(0.5)
+        assert host_0.get_id() not in gsub1.mesh[topic], (
+            "peer should be backoffed and not re-added"
+        )
+
+        # try to graft again (should succeed after backoff)
+        await trio.sleep(2)
+        await gsub0.emit_graft(topic, host_1.get_id())
+        await trio.sleep(1)
+        assert host_0.get_id() in gsub1.mesh[topic], (
+            "peer should be able to rejoin after backoff"
+        )
+
+
+@pytest.mark.trio
+async def test_unsubscribe_backoff():
+    async with PubsubFactory.create_batch_with_gossipsub(
+        2, heartbeat_interval=1, prune_back_off=1, unsubscribe_back_off=2
+    ) as pubsubs:
+        gsub0 = pubsubs[0].router
+        gsub1 = pubsubs[1].router
+        assert isinstance(gsub0, GossipSub)
+        assert isinstance(gsub1, GossipSub)
+        host_0 = pubsubs[0].host
+        host_1 = pubsubs[1].host
+
+        topic = "test_unsubscribe_backoff"
+
+        # connect hosts
+        await connect(host_0, host_1)
+        await trio.sleep(0.5)
+
+        # both join the topic
+        await gsub0.join(topic)
+        await gsub1.join(topic)
+        await gsub0.emit_graft(topic, host_1.get_id())
+        await trio.sleep(0.5)
+
+        # ensure peer is registered in mesh
+        assert host_0.get_id() in gsub1.mesh[topic]
+
+        # host_1 unsubscribes from the topic
+        await gsub1.leave(topic)
+        await trio.sleep(0.5)
+        assert topic not in gsub1.mesh
+
+        # host_1 resubscribes to the topic
+        await gsub1.join(topic)
+        await trio.sleep(0.5)
+        assert topic in gsub1.mesh
+
+        # try to graft again immediately (should be rejected due to backoff)
+        await gsub0.emit_graft(topic, host_1.get_id())
+        await trio.sleep(0.5)
+        assert host_0.get_id() not in gsub1.mesh[topic], (
+            "peer should be backoffed and not re-added"
+        )
+
+        # try to graft again (should succeed after backoff)
+        await trio.sleep(1)
+        await gsub0.emit_graft(topic, host_1.get_id())
+        await trio.sleep(1)
+        assert host_0.get_id() in gsub1.mesh[topic], (
+            "peer should be able to rejoin after backoff"
+        )
+
+
+@pytest.mark.trio
+async def test_peer_exchange():
+    async with PubsubFactory.create_batch_with_gossipsub(
+        3,
+        heartbeat_interval=0.5,
+        do_px=True,
+        px_peers_count=1,
+    ) as pubsubs:
+        gsub0 = pubsubs[0].router
+        gsub1 = pubsubs[1].router
+        gsub2 = pubsubs[2].router
+        assert isinstance(gsub0, GossipSub)
+        assert isinstance(gsub1, GossipSub)
+        assert isinstance(gsub2, GossipSub)
+        host_0 = pubsubs[0].host
+        host_1 = pubsubs[1].host
+        host_2 = pubsubs[2].host
+
+        topic = "test_peer_exchange"
+
+        # connect hosts
+        await connect(host_1, host_0)
+        await connect(host_1, host_2)
+        await trio.sleep(0.5)
+
+        # all join the topic and 0 <-> 1 and 1 <-> 2 graft
+        await pubsubs[1].subscribe(topic)
+        await pubsubs[0].subscribe(topic)
+        await pubsubs[2].subscribe(topic)
+        await gsub1.emit_graft(topic, host_0.get_id())
+        await gsub1.emit_graft(topic, host_2.get_id())
+        await gsub0.emit_graft(topic, host_1.get_id())
+        await gsub2.emit_graft(topic, host_1.get_id())
+        await trio.sleep(1)
+
+        # ensure peer is registered in mesh
+        assert host_0.get_id() in gsub1.mesh[topic]
+        assert host_2.get_id() in gsub1.mesh[topic]
+        assert host_2.get_id() not in gsub0.mesh[topic]
+
+        # host_1 unsubscribes from the topic
+        await gsub1.leave(topic)
+        await trio.sleep(1)  # Wait for heartbeat to update mesh
+        assert topic not in gsub1.mesh
+
+        # Wait for gsub0 to graft host_2 into its mesh via PX
+        await trio.sleep(1)
+        assert host_2.get_id() in gsub0.mesh[topic]
+
+
+@pytest.mark.trio
+async def test_topics_are_isolated():
+    async with PubsubFactory.create_batch_with_gossipsub(
+        2, heartbeat_interval=0.5, prune_back_off=2
+    ) as pubsubs:
+        gsub0 = pubsubs[0].router
+        gsub1 = pubsubs[1].router
+        assert isinstance(gsub0, GossipSub)
+        assert isinstance(gsub1, GossipSub)
+        host_0 = pubsubs[0].host
+        host_1 = pubsubs[1].host
+
+        topic1 = "test_prune_backoff"
+        topic2 = "test_prune_backoff2"
+
+        # connect hosts
+        await connect(host_0, host_1)
+        await trio.sleep(0.5)
+
+        # both peers join both the topics
+        await gsub0.join(topic1)
+        await gsub1.join(topic1)
+        await gsub0.join(topic2)
+        await gsub1.join(topic2)
+        await gsub0.emit_graft(topic1, host_1.get_id())
+        await trio.sleep(0.5)
+
+        # ensure topic1 for peer is registered in mesh
+        assert host_0.get_id() in gsub1.mesh[topic1]
+
+        # prune topic1 for host_1 from gsub0's mesh
+        await gsub0.emit_prune(topic1, host_1.get_id(), False, False)
+        await trio.sleep(0.5)
+
+        # topic1 for host_0 should not be in gsub1's mesh
+        assert host_0.get_id() not in gsub1.mesh[topic1]
+
+        # try to regraft topic1 and graft new topic2
+        await gsub0.emit_graft(topic1, host_1.get_id())
+        await gsub0.emit_graft(topic2, host_1.get_id())
+        await trio.sleep(0.5)
+        assert host_0.get_id() not in gsub1.mesh[topic1], (
+            "peer should be backoffed and not re-added"
+        )
+        assert host_0.get_id() in gsub1.mesh[topic2], (
+            "peer should be able to join a different topic"
+        )
+
+
+@pytest.mark.trio
+async def test_stress_churn():
+    NUM_PEERS = 5
+    CHURN_CYCLES = 30
+    TOPIC = "stress_churn_topic"
+    PRUNE_BACKOFF = 1
+    HEARTBEAT_INTERVAL = 0.2
+
+    async with PubsubFactory.create_batch_with_gossipsub(
+        NUM_PEERS,
+        heartbeat_interval=HEARTBEAT_INTERVAL,
+        prune_back_off=PRUNE_BACKOFF,
+    ) as pubsubs:
+        routers: list[GossipSub] = []
+        for ps in pubsubs:
+            assert isinstance(ps.router, GossipSub)
+            routers.append(ps.router)
+        hosts = [ps.host for ps in pubsubs]
+
+        # fully connect all peers
+        for i in range(NUM_PEERS):
+            for j in range(i + 1, NUM_PEERS):
+                await connect(hosts[i], hosts[j])
+        await trio.sleep(1)
+
+        # all peers join the topic
+        for router in routers:
+            await router.join(TOPIC)
+        await trio.sleep(1)
+
+        # rapid join/prune cycles
+        for cycle in range(CHURN_CYCLES):
+            for i, router in enumerate(routers):
+                # prune all other peers from this router's mesh
+                for j, peer_host in enumerate(hosts):
+                    if i != j:
+                        await router.emit_prune(TOPIC, peer_host.get_id(), False, False)
+            await trio.sleep(0.1)
+            for i, router in enumerate(routers):
+                # graft all other peers back
+                for j, peer_host in enumerate(hosts):
+                    if i != j:
+                        await router.emit_graft(TOPIC, peer_host.get_id())
+            await trio.sleep(0.1)
+
+        # wait for backoff entries to expire and cleanup
+        await trio.sleep(PRUNE_BACKOFF * 2)
+
+        # check that the backoff table is not unbounded
+        for router in routers:
+            # backoff is a dict: topic -> peer -> expiry
+            backoff = getattr(router, "back_off", None)
+            assert backoff is not None, "router missing backoff table"
+            # only a small number of entries should remain (ideally 0)
+            total_entries = sum(len(peers) for peers in backoff.values())
+            assert total_entries < NUM_PEERS * 2, (
+                f"backoff table grew too large: {total_entries} entries"
+            )

@@ -443,6 +443,10 @@ class GossipsubFactory(factory.Factory):
     heartbeat_interval = GOSSIPSUB_PARAMS.heartbeat_interval
     direct_connect_initial_delay = GOSSIPSUB_PARAMS.direct_connect_initial_delay
     direct_connect_interval = GOSSIPSUB_PARAMS.direct_connect_interval
+    do_px = GOSSIPSUB_PARAMS.do_px
+    px_peers_count = GOSSIPSUB_PARAMS.px_peers_count
+    prune_back_off = GOSSIPSUB_PARAMS.prune_back_off
+    unsubscribe_back_off = GOSSIPSUB_PARAMS.unsubscribe_back_off
 
 
 class PubsubFactory(factory.Factory):

@@ -568,6 +572,10 @@ class PubsubFactory(factory.Factory):
         heartbeat_initial_delay: float = GOSSIPSUB_PARAMS.heartbeat_initial_delay,
         direct_connect_initial_delay: float = GOSSIPSUB_PARAMS.direct_connect_initial_delay,  # noqa: E501
         direct_connect_interval: int = GOSSIPSUB_PARAMS.direct_connect_interval,
+        do_px: bool = GOSSIPSUB_PARAMS.do_px,
+        px_peers_count: int = GOSSIPSUB_PARAMS.px_peers_count,
+        prune_back_off: int = GOSSIPSUB_PARAMS.prune_back_off,
+        unsubscribe_back_off: int = GOSSIPSUB_PARAMS.unsubscribe_back_off,
         security_protocol: TProtocol | None = None,
         muxer_opt: TMuxerOptions | None = None,
         msg_id_constructor: None

@@ -588,6 +596,10 @@
                 heartbeat_interval=heartbeat_interval,
                 direct_connect_initial_delay=direct_connect_initial_delay,
                 direct_connect_interval=direct_connect_interval,
+                do_px=do_px,
+                px_peers_count=px_peers_count,
+                prune_back_off=prune_back_off,
+                unsubscribe_back_off=unsubscribe_back_off,
             )
         else:
             gossipsubs = GossipsubFactory.create_batch(

@@ -602,6 +614,10 @@
                 heartbeat_initial_delay=heartbeat_initial_delay,
                 direct_connect_initial_delay=direct_connect_initial_delay,
                 direct_connect_interval=direct_connect_interval,
+                do_px=do_px,
+                px_peers_count=px_peers_count,
+                prune_back_off=prune_back_off,
+                unsubscribe_back_off=unsubscribe_back_off,
             )
 
         async with cls._create_batch_with_router(