Merge branch 'main' into piggyback-gossipsub

This commit is contained in:
Manu Sheel Gupta
2025-06-26 06:44:03 -07:00
committed by GitHub
10 changed files with 550 additions and 48 deletions

View File

@ -32,6 +32,8 @@ from libp2p.peer.id import (
)
from libp2p.peer.peerinfo import (
PeerInfo,
peer_info_from_bytes,
peer_info_to_bytes,
)
from libp2p.peer.peerstore import (
PERMANENT_ADDR_TTL,
@ -92,6 +94,12 @@ class GossipSub(IPubsubRouter, Service):
direct_connect_initial_delay: float
direct_connect_interval: int
do_px: bool
px_peers_count: int
back_off: dict[str, dict[ID, int]]
prune_back_off: int
unsubscribe_back_off: int
def __init__(
self,
protocols: Sequence[TProtocol],
@ -106,6 +114,10 @@ class GossipSub(IPubsubRouter, Service):
heartbeat_interval: int = 120,
direct_connect_initial_delay: float = 0.1,
direct_connect_interval: int = 300,
do_px: bool = False,
px_peers_count: int = 16,
prune_back_off: int = 60,
unsubscribe_back_off: int = 10,
) -> None:
self.protocols = list(protocols)
self.pubsub = None
@ -140,6 +152,12 @@ class GossipSub(IPubsubRouter, Service):
self.direct_connect_initial_delay = direct_connect_initial_delay
self.time_since_last_publish = {}
self.do_px = do_px
self.px_peers_count = px_peers_count
self.back_off = dict()
self.prune_back_off = prune_back_off
self.unsubscribe_back_off = unsubscribe_back_off
async def run(self) -> None:
self.manager.run_daemon_task(self.heartbeat)
if len(self.direct_peers) > 0:
@ -333,15 +351,22 @@ class GossipSub(IPubsubRouter, Service):
self.mesh[topic] = set()
topic_in_fanout: bool = topic in self.fanout
fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set()
fanout_peers: set[ID] = set()
if topic_in_fanout:
for peer in self.fanout[topic]:
if self._check_back_off(peer, topic):
continue
fanout_peers.add(peer)
fanout_size = len(fanout_peers)
if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree):
# There are fewer than D peers (let this number be x)
# in the fanout for a topic (or the topic is not in the fanout).
# Selects the remaining number of peers (D-x) from peers.gossipsub[topic].
if topic in self.pubsub.peer_topics:
if self.pubsub is not None and topic in self.pubsub.peer_topics:
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
topic, self.degree - fanout_size, fanout_peers
topic, self.degree - fanout_size, fanout_peers, True
)
# Combine fanout peers with selected peers
fanout_peers.update(selected_peers)
@ -368,7 +393,8 @@ class GossipSub(IPubsubRouter, Service):
return
# Notify the peers in mesh[topic] with a PRUNE(topic) message
for peer in self.mesh[topic]:
await self.emit_prune(topic, peer)
await self.emit_prune(topic, peer, self.do_px, True)
self._add_back_off(peer, topic, True)
# Forget mesh[topic]
self.mesh.pop(topic, None)
@ -504,7 +530,7 @@ class GossipSub(IPubsubRouter, Service):
if num_mesh_peers_in_topic < self.degree_low:
# Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] # noqa: E501
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic]
topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic], True
)
for peer in selected_peers:
@ -567,9 +593,7 @@ class GossipSub(IPubsubRouter, Service):
if len(in_topic_peers) < self.degree:
# Select additional peers from peers.gossipsub[topic]
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
topic,
self.degree - len(in_topic_peers),
in_topic_peers,
topic, self.degree - len(in_topic_peers), in_topic_peers, True
)
# Add the selected peers
in_topic_peers.update(selected_peers)
@ -580,7 +604,7 @@ class GossipSub(IPubsubRouter, Service):
if msg_ids:
# Select D peers from peers.gossipsub[topic] excluding current peers
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
topic, self.degree, current_peers
topic, self.degree, current_peers, True
)
msg_id_strs = [str(msg_id) for msg_id in msg_ids]
for peer in peers_to_emit_ihave_to:
@ -654,7 +678,11 @@ class GossipSub(IPubsubRouter, Service):
return selection
def _get_in_topic_gossipsub_peers_from_minus(
self, topic: str, num_to_select: int, minus: Iterable[ID]
self,
topic: str,
num_to_select: int,
minus: Iterable[ID],
backoff_check: bool = False,
) -> list[ID]:
if self.pubsub is None:
raise NoPubsubAttached
@ -663,8 +691,88 @@ class GossipSub(IPubsubRouter, Service):
for peer_id in self.pubsub.peer_topics[topic]
if self.peer_protocol[peer_id] == PROTOCOL_ID
}
if backoff_check:
# filter out peers that are in back off for this topic
gossipsub_peers_in_topic = {
peer_id
for peer_id in gossipsub_peers_in_topic
if self._check_back_off(peer_id, topic) is False
}
return self.select_from_minus(num_to_select, gossipsub_peers_in_topic, minus)
def _add_back_off(
    self, peer: ID, topic: str, is_unsubscribe: bool, backoff_duration: int = 0
) -> None:
    """
    Record (or extend) the back off deadline of a peer for a topic.

    The deadline is stored as an integer UNIX timestamp in
    ``self.back_off[topic][peer]``. An already stored deadline is never
    shortened: the later of the stored and the newly computed one is kept.

    :param peer: peer to add back off for
    :param topic: topic to add back off for
    :param is_unsubscribe: whether this is an unsubscribe operation; selects
        ``unsubscribe_back_off`` instead of ``prune_back_off`` as the default
    :param backoff_duration: duration of back off in seconds; when it is not
        positive the default chosen by ``is_unsubscribe`` is used
    """
    topic_deadlines = self.back_off.setdefault(topic, dict())
    now = int(time.time())

    # An explicit positive duration wins over both configured defaults.
    if backoff_duration > 0:
        delay = backoff_duration
    elif is_unsubscribe:
        delay = self.unsubscribe_back_off
    else:
        delay = self.prune_back_off
    deadline = now + delay

    if peer in topic_deadlines:
        deadline = max(topic_deadlines[peer], deadline)
    topic_deadlines[peer] = deadline
def _check_back_off(self, peer: ID, topic: str) -> bool:
    """
    Tell whether a peer is currently in back off for a topic.

    As a side effect, an entry whose deadline has passed is deleted from
    ``self.back_off[topic]`` when it is looked up.

    :param peer: peer to check
    :param topic: topic to check
    :return: True if the peer is in back off, False otherwise
    """
    if topic not in self.back_off:
        return False
    topic_deadlines = self.back_off[topic]
    if peer not in topic_deadlines:
        return False

    # Deadlines are integer UNIX timestamps; still in the future means active.
    if topic_deadlines[peer] > int(time.time()):
        return True

    # Expired: drop the stale entry so it is not consulted again.
    del topic_deadlines[peer]
    return False
async def _do_px(self, px_peers: list[rpc_pb2.PeerInfo]) -> None:
    """
    Try to connect to the peers offered through peer exchange.

    At most ``self.px_peers_count`` entries of ``px_peers`` are considered.
    Peers already present in ``self.pubsub.peers`` are skipped. A peer whose
    record cannot be parsed, or that cannot be connected to, is logged with
    a warning and skipped.

    :param px_peers: peer exchange entries to process
    """
    # Slicing caps the list exactly like the explicit length check would.
    for candidate in px_peers[: self.px_peers_count]:
        peer_id: ID = ID(candidate.peerID)

        if self.pubsub and peer_id in self.pubsub.peers:
            continue

        try:
            peer_info = peer_info_from_bytes(candidate.signedPeerRecord)
        except Exception as e:
            logger.warning(
                "failed to parse peer info from px peer %s: %s",
                peer_id,
                e,
            )
            continue

        try:
            # Raised inside the try on purpose: a missing pubsub is reported
            # as a failed connection for this peer rather than propagated.
            if self.pubsub is None:
                raise NoPubsubAttached
            await self.pubsub.host.connect(peer_info)
        except Exception as e:
            logger.warning(
                "failed to connect to px peer %s: %s",
                peer_id,
                e,
            )
# RPC handlers
async def handle_ihave(
@ -757,24 +865,46 @@ class GossipSub(IPubsubRouter, Service):
logger.warning(
"GRAFT: ignoring request from direct peer %s", sender_peer_id
)
await self.emit_prune(topic, sender_peer_id)
await self.emit_prune(topic, sender_peer_id, False, False)
return
if self._check_back_off(sender_peer_id, topic):
logger.warning(
"GRAFT: ignoring request from %s, back off until %d",
sender_peer_id,
self.back_off[topic][sender_peer_id],
)
self._add_back_off(sender_peer_id, topic, False)
await self.emit_prune(topic, sender_peer_id, False, False)
return
if sender_peer_id not in self.mesh[topic]:
self.mesh[topic].add(sender_peer_id)
else:
# Respond with PRUNE if not subscribed to the topic
await self.emit_prune(topic, sender_peer_id)
await self.emit_prune(topic, sender_peer_id, self.do_px, False)
async def handle_prune(
    self, prune_msg: rpc_pb2.ControlPrune, sender_peer_id: ID
) -> None:
    """
    Handle a PRUNE control message received from ``sender_peer_id``.

    When the pruned topic has a mesh, a back off is recorded for the sender
    on that topic and the sender is removed from the mesh. Any peers carried
    in the message are then handed to ``_do_px``.

    :param prune_msg: the received PRUNE control message
    :param sender_peer_id: peer that sent the message
    """
    topic: str = prune_msg.topicID
    advertised_backoff: int = prune_msg.backoff
    offered_peers: list[rpc_pb2.PeerInfo] = list(prune_msg.peers)

    # Remove peer from mesh for topic
    if topic in self.mesh:
        # A non-positive advertised value is passed on as 0, i.e. "use the
        # default back off" from the point of view of _add_back_off.
        self._add_back_off(
            sender_peer_id, topic, False, max(advertised_backoff, 0)
        )
        self.mesh[topic].discard(sender_peer_id)

    if offered_peers:
        await self._do_px(offered_peers)
# RPC emitters
def pack_control_msgs(
@ -823,15 +953,36 @@ class GossipSub(IPubsubRouter, Service):
await self.emit_control_message(control_msg, id)
async def emit_prune(
    self, topic: str, to_peer: ID, do_px: bool, is_unsubscribe: bool
) -> None:
    """
    Emit prune message, sent to to_peer, for topic.

    The message advertises ``unsubscribe_back_off`` when ``is_unsubscribe``
    is set and ``prune_back_off`` otherwise. With ``do_px`` enabled, up to
    ``px_peers_count`` other gossipsub peers of the topic are attached as
    peer exchange records.

    :param topic: topic the recipient is pruned from
    :param to_peer: peer the PRUNE message is sent to
    :param do_px: whether to attach peer exchange records
    :param is_unsubscribe: whether the prune is caused by an unsubscribe
    :raises NoPubsubAttached: if ``do_px`` is set and no pubsub is attached
    """
    prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune()
    prune_msg.topicID = topic
    prune_msg.backoff = (
        self.unsubscribe_back_off if is_unsubscribe else self.prune_back_off
    )

    if do_px:
        # Checked once up front instead of once per exchanged peer.
        if self.pubsub is None:
            raise NoPubsubAttached
        # The recipient itself is excluded from the peers offered to it.
        exchange_peers = self._get_in_topic_gossipsub_peers_from_minus(
            topic, self.px_peers_count, [to_peer]
        )
        for peer in exchange_peers:
            peer_info = self.pubsub.host.get_peerstore().peer_info(peer)
            signed_peer_record: rpc_pb2.PeerInfo = rpc_pb2.PeerInfo()
            signed_peer_record.peerID = peer.to_bytes()
            signed_peer_record.signedPeerRecord = peer_info_to_bytes(peer_info)
            prune_msg.peers.append(signed_peer_record)

    control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
    control_msg.prune.extend([prune_msg])

    await self.emit_control_message(control_msg, to_peer)
async def emit_control_message(
self, control_msg: rpc_pb2.ControlMessage, to_peer: ID