From 788b4cf51a09144b6e8b38e201c47ef2a19133c3 Mon Sep 17 00:00:00 2001 From: Mystical <125946525+mystical-prog@users.noreply.github.com> Date: Wed, 18 Jun 2025 18:57:09 +0530 Subject: [PATCH] added complete back_off implementation --- libp2p/pubsub/gossipsub.py | 118 ++++++++++++++++++++++------ tests/core/pubsub/test_gossipsub.py | 4 +- 2 files changed, 95 insertions(+), 27 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index d8f11215..7abe6251 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -93,7 +93,8 @@ class GossipSub(IPubsubRouter, Service): direct_connect_interval: int do_px: bool - back_off: int + back_off: dict[str, dict[ID, int]] + prune_back_off: int unsubscribe_back_off: int def __init__( @@ -111,7 +112,7 @@ class GossipSub(IPubsubRouter, Service): direct_connect_initial_delay: float = 0.1, direct_connect_interval: int = 300, do_px: bool = False, - back_off: int = 60, + prune_back_off: int = 60, unsubscribe_back_off: int = 10, ) -> None: self.protocols = list(protocols) @@ -148,7 +149,8 @@ class GossipSub(IPubsubRouter, Service): self.time_since_last_publish = {} self.do_px = do_px - self.back_off = back_off + self.back_off = dict() + self.prune_back_off = prune_back_off self.unsubscribe_back_off = unsubscribe_back_off async def run(self) -> None: @@ -345,15 +347,21 @@ class GossipSub(IPubsubRouter, Service): self.mesh[topic] = set() topic_in_fanout: bool = topic in self.fanout - fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set() + fanout_peers: set[ID] = set() + + for peer in self.fanout[topic]: + if self._check_back_off(peer, topic): + continue + fanout_peers.add(peer) + fanout_size = len(fanout_peers) if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree): # There are less than D peers (let this number be x) # in the fanout for a topic (or the topic is not in the fanout). # Selects the remaining number of peers (D-x) from peers.gossipsub[topic]. - if topic in self.pubsub.peer_topics: + if self.pubsub is not None and topic in self.pubsub.peer_topics: selected_peers = self._get_in_topic_gossipsub_peers_from_minus( - topic, self.degree - fanout_size, fanout_peers + topic, self.degree - fanout_size, fanout_peers, True ) # Combine fanout peers with selected peers fanout_peers.update(selected_peers) @@ -380,7 +388,8 @@ class GossipSub(IPubsubRouter, Service): return # Notify the peers in mesh[topic] with a PRUNE(topic) message for peer in self.mesh[topic]: - await self.emit_prune(topic, peer, do_px=self.do_px, is_unsubscribe=True) + await self.emit_prune(topic, peer, self.do_px, True) + self._add_back_off(peer, topic, True) # Forget mesh[topic] self.mesh.pop(topic, None) @@ -516,7 +525,7 @@ class GossipSub(IPubsubRouter, Service): if num_mesh_peers_in_topic < self.degree_low: # Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] # noqa: E501 selected_peers = self._get_in_topic_gossipsub_peers_from_minus( - topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic] + topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic], True ) for peer in selected_peers: @@ -579,9 +588,7 @@ class GossipSub(IPubsubRouter, Service): if len(in_topic_peers) < self.degree: # Select additional peers from peers.gossipsub[topic] selected_peers = self._get_in_topic_gossipsub_peers_from_minus( - topic, - self.degree - len(in_topic_peers), - in_topic_peers, + topic, self.degree - len(in_topic_peers), in_topic_peers, True ) # Add the selected peers in_topic_peers.update(selected_peers) @@ -592,7 +599,7 @@ class GossipSub(IPubsubRouter, Service): if msg_ids: # Select D peers from peers.gossipsub[topic] excluding current peers peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus( - topic, self.degree, current_peers + topic, self.degree, current_peers, True ) msg_id_strs = [str(msg_id) for msg_id in msg_ids] for peer in peers_to_emit_ihave_to: @@ -666,7 +673,11 @@ class GossipSub(IPubsubRouter, Service): return selection def _get_in_topic_gossipsub_peers_from_minus( - self, topic: str, num_to_select: int, minus: Iterable[ID] + self, + topic: str, + num_to_select: int, + minus: Iterable[ID], + backoff_check: bool = False, ) -> list[ID]: if self.pubsub is None: raise NoPubsubAttached @@ -675,8 +686,57 @@ class GossipSub(IPubsubRouter, Service): for peer_id in self.pubsub.peer_topics[topic] if self.peer_protocol[peer_id] == PROTOCOL_ID } + if backoff_check: + # filter out peers that are in back off for this topic + gossipsub_peers_in_topic = { + peer_id + for peer_id in gossipsub_peers_in_topic + if self._check_back_off(peer_id, topic) is False + } return self.select_from_minus(num_to_select, gossipsub_peers_in_topic, minus) + def _add_back_off( + self, peer: ID, topic: str, is_unsubscribe: bool, backoff_duration: int = 0 + ) -> None: + """ + Add back off for a peer in a topic. + :param peer: peer to add back off for + :param topic: topic to add back off for + :param is_unsubscribe: whether this is an unsubscribe operation + :param backoff_duration: duration of back off in seconds, if 0, use default + """ + if topic not in self.back_off: + self.back_off[topic] = dict() + + backoff_till = int(time.time()) + if backoff_duration > 0: + backoff_till += backoff_duration + else: + if is_unsubscribe: + backoff_till += self.unsubscribe_back_off + else: + backoff_till += self.prune_back_off + + if peer not in self.back_off[topic]: + self.back_off[topic][peer] = backoff_till + else: + self.back_off[topic][peer] = max(self.back_off[topic][peer], backoff_till) + + def _check_back_off(self, peer: ID, topic: str) -> bool: + """ + Check if a peer is in back off for a topic and cleanup expired back off entries. + :param peer: peer to check + :param topic: topic to check + :return: True if the peer is in back off, False otherwise + """ + if topic not in self.back_off: + return False + if self.back_off[topic].get(peer, 0) > int(time.time()): + return True + else: + del self.back_off[topic][peer] + return False + # RPC handlers async def handle_ihave( @@ -762,9 +822,6 @@ class GossipSub(IPubsubRouter, Service): ) -> None: topic: str = graft_msg.topicID - # TODO: complete the remaining logic - self.do_px - # Add peer to mesh for topic if topic in self.mesh: for direct_peer in self.direct_peers: @@ -772,26 +829,38 @@ class GossipSub(IPubsubRouter, Service): logger.warning( "GRAFT: ignoring request from direct peer %s", sender_peer_id ) - await self.emit_prune( - topic, sender_peer_id, do_px=self.do_px, is_unsubscribe=False - ) + await self.emit_prune(topic, sender_peer_id, False, False) return + if self._check_back_off(sender_peer_id, topic): + logger.warning( + "GRAFT: ignoring request from %s, back off until %d", + sender_peer_id, + self.back_off[topic][sender_peer_id], + ) + self._add_back_off(sender_peer_id, topic, False) + await self.emit_prune(topic, sender_peer_id, False, False) + return + if sender_peer_id not in self.mesh[topic]: self.mesh[topic].add(sender_peer_id) else: # Respond with PRUNE if not subscribed to the topic - await self.emit_prune( - topic, sender_peer_id, do_px=self.do_px, is_unsubscribe=False - ) + await self.emit_prune(topic, sender_peer_id, self.do_px, False) async def handle_prune( self, prune_msg: rpc_pb2.ControlPrune, sender_peer_id: ID ) -> None: topic: str = prune_msg.topicID + backoff_till: int = prune_msg.backoff # Remove peer from mesh for topic if topic in self.mesh: + if backoff_till > 0: + self._add_back_off(sender_peer_id, topic, False, backoff_till) + else: + self._add_back_off(sender_peer_id, topic, False) + self.mesh[topic].discard(sender_peer_id) # RPC emitters @@ -845,12 +914,11 @@ class GossipSub(IPubsubRouter, Service): async def emit_prune( self, topic: str, to_peer: ID, do_px: bool, is_unsubscribe: bool ) -> None: - async def emit_prune(self, topic: str, id: ID) -> None: """Emit graft message, sent to to_peer, for topic.""" prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune() prune_msg.topicID = topic - back_off_duration = self.back_off + back_off_duration = self.prune_back_off if is_unsubscribe: back_off_duration = self.unsubscribe_back_off @@ -862,7 +930,7 @@ class GossipSub(IPubsubRouter, Service): control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage() control_msg.prune.extend([prune_msg]) - await self.emit_control_message(control_msg, id) + await self.emit_control_message(control_msg, to_peer) async def emit_control_message( self, control_msg: rpc_pb2.ControlMessage, to_peer: ID diff --git a/tests/core/pubsub/test_gossipsub.py b/tests/core/pubsub/test_gossipsub.py index 4dec971d..9a767608 100644 --- a/tests/core/pubsub/test_gossipsub.py +++ b/tests/core/pubsub/test_gossipsub.py @@ -134,7 +134,7 @@ async def test_handle_graft(monkeypatch): # check if it is called in `handle_graft` event_emit_prune = trio.Event() - async def emit_prune(topic, sender_peer_id): + async def emit_prune(topic, sender_peer_id, do_px, is_unsubscribe): event_emit_prune.set() await trio.lowlevel.checkpoint() @@ -193,7 +193,7 @@ async def test_handle_prune(): # alice emit prune message to bob, alice should be removed # from bob's mesh peer - await gossipsubs[index_alice].emit_prune(topic, id_bob) + await gossipsubs[index_alice].emit_prune(topic, id_bob, False, False) # `emit_prune` does not remove bob from alice's mesh peers assert id_bob in gossipsubs[index_alice].mesh[topic]