diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 813719dd..ccfeb941 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -528,82 +528,99 @@ class GossipSub(IPubsubRouter, Service): peers_to_prune[peer].append(topic) return peers_to_graft, peers_to_prune - def fanout_heartbeat(self) -> None: - # Note: the comments here are the exact pseudocode from the spec - for topic in list(self.fanout): - if ( - self.pubsub is not None - and topic not in self.pubsub.peer_topics - and self.time_since_last_publish.get(topic, 0) + self.time_to_live - < int(time.time()) + def _handle_topic_heartbeat( + self, + topic: str, + current_peers: set[ID], + is_fanout: bool = False, + peers_to_gossip: DefaultDict[ID, dict[str, list[str]]] | None = None, + ) -> tuple[set[ID], bool]: + """ + Helper method to handle heartbeat for a single topic, + supporting both fanout and gossip. + + :param topic: The topic to handle + :param current_peers: Current set of peers in the topic + :param is_fanout: Whether this is a fanout topic (affects expiration check) + :param peers_to_gossip: Optional dictionary to store peers to gossip to + :return: Tuple of (updated_peers, should_remove_topic) + """ + if self.pubsub is None: + raise NoPubsubAttached + + # Skip if no peers have subscribed to the topic + if topic not in self.pubsub.peer_topics: + return current_peers, False + + # For fanout topics, check if we should remove the topic + if is_fanout: + if self.time_since_last_publish.get(topic, 0) + self.time_to_live < int( + time.time() ): - # Remove topic from fanout + return set(), True + + # Check if peers are still in the topic and remove the ones that are not + in_topic_peers: set[ID] = { + peer for peer in current_peers if peer in self.pubsub.peer_topics[topic] + } + + # If we need more peers to reach target degree + if len(in_topic_peers) < self.degree: + # Select additional peers from peers.gossipsub[topic] + selected_peers = self._get_in_topic_gossipsub_peers_from_minus( + topic, + self.degree - len(in_topic_peers), + in_topic_peers, + ) + # Add the selected peers + in_topic_peers.update(selected_peers) + + # Handle gossip if requested + if peers_to_gossip is not None: + msg_ids = self.mcache.window(topic) + if msg_ids: + # Select D peers from peers.gossipsub[topic] excluding current peers + peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus( + topic, self.degree, current_peers + ) + msg_id_strs = [str(msg_id) for msg_id in msg_ids] + for peer in peers_to_emit_ihave_to: + peers_to_gossip[peer][topic] = msg_id_strs + + return in_topic_peers, False + + def fanout_heartbeat(self) -> None: + """ + Maintain fanout topics by: + 1. Removing expired topics + 2. Removing peers that are no longer in the topic + 3. Adding new peers if needed to maintain the target degree + """ + for topic in list(self.fanout): + updated_peers, should_remove = self._handle_topic_heartbeat( + topic, self.fanout[topic], is_fanout=True + ) + if should_remove: del self.fanout[topic] else: - # Check if fanout peers are still in the topic and remove the ones that are not # noqa: E501 - # ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L498-L504 # noqa: E501 - - in_topic_fanout_peers: list[ID] = [] - if self.pubsub is not None: - in_topic_fanout_peers = [ - peer - for peer in self.fanout[topic] - if peer in self.pubsub.peer_topics[topic] - ] - self.fanout[topic] = set(in_topic_fanout_peers) - num_fanout_peers_in_topic = len(self.fanout[topic]) - - # If |fanout[topic]| < D - if num_fanout_peers_in_topic < self.degree: - # Select D - |fanout[topic]| peers from peers.gossipsub[topic] - fanout[topic] # noqa: E501 - selected_peers = self._get_in_topic_gossipsub_peers_from_minus( - topic, - self.degree - num_fanout_peers_in_topic, - self.fanout[topic], - ) - # Add the peers to fanout[topic] - self.fanout[topic].update(selected_peers) + self.fanout[topic] = updated_peers def gossip_heartbeat(self) -> DefaultDict[ID, dict[str, list[str]]]: peers_to_gossip: DefaultDict[ID, dict[str, list[str]]] = defaultdict(dict) + + # Handle mesh topics for topic in self.mesh: - msg_ids = self.mcache.window(topic) - if msg_ids: - if self.pubsub is None: - raise NoPubsubAttached - # Get all pubsub peers in a topic and only add them if they are - # gossipsub peers too - if topic in self.pubsub.peer_topics: - # Select D peers from peers.gossipsub[topic] - peers_to_emit_ihave_to = ( - self._get_in_topic_gossipsub_peers_from_minus( - topic, self.degree, self.mesh[topic] - ) - ) + self._handle_topic_heartbeat( + topic, self.mesh[topic], peers_to_gossip=peers_to_gossip + ) - msg_id_strs = [str(msg_id) for msg_id in msg_ids] - for peer in peers_to_emit_ihave_to: - peers_to_gossip[peer][topic] = msg_id_strs - - # TODO: Refactor and Dedup. This section is the roughly the same as the above. - # Do the same for fanout, for all topics not already hit in mesh + # Handle fanout topics that aren't in mesh for topic in self.fanout: - msg_ids = self.mcache.window(topic) - if msg_ids: - if self.pubsub is None: - raise NoPubsubAttached - # Get all pubsub peers in topic and only add if they are - # gossipsub peers also - if topic in self.pubsub.peer_topics: - # Select D peers from peers.gossipsub[topic] - peers_to_emit_ihave_to = ( - self._get_in_topic_gossipsub_peers_from_minus( - topic, self.degree, self.fanout[topic] - ) - ) - msg_id_strs = [str(msg) for msg in msg_ids] - for peer in peers_to_emit_ihave_to: - peers_to_gossip[peer][topic] = msg_id_strs + if topic not in self.mesh: + self._handle_topic_heartbeat( + topic, self.fanout[topic], peers_to_gossip=peers_to_gossip + ) + return peers_to_gossip @staticmethod diff --git a/newsfragments/678.misc.rst b/newsfragments/678.misc.rst new file mode 100644 index 00000000..e63e2b95 --- /dev/null +++ b/newsfragments/678.misc.rst @@ -0,0 +1 @@ +Refactored gossipsub heartbeat logic to use a single helper method `_handle_topic_heartbeat` that handles both fanout and gossip heartbeats.