diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 813719dd..ccfeb941 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -528,82 +528,99 @@ class GossipSub(IPubsubRouter, Service): peers_to_prune[peer].append(topic) return peers_to_graft, peers_to_prune - def fanout_heartbeat(self) -> None: - # Note: the comments here are the exact pseudocode from the spec - for topic in list(self.fanout): - if ( - self.pubsub is not None - and topic not in self.pubsub.peer_topics - and self.time_since_last_publish.get(topic, 0) + self.time_to_live - < int(time.time()) + def _handle_topic_heartbeat( + self, + topic: str, + current_peers: set[ID], + is_fanout: bool = False, + peers_to_gossip: DefaultDict[ID, dict[str, list[str]]] | None = None, + ) -> tuple[set[ID], bool]: + """ + Helper method to handle heartbeat for a single topic, + supporting both fanout and gossip. + + :param topic: The topic to handle + :param current_peers: Current set of peers in the topic + :param is_fanout: Whether this is a fanout topic (affects expiration check) + :param peers_to_gossip: Optional dictionary to store peers to gossip to + :return: Tuple of (updated_peers, should_remove_topic) + """ + if self.pubsub is None: + raise NoPubsubAttached + + # Skip if no peers have subscribed to the topic + if topic not in self.pubsub.peer_topics: + return current_peers, False + + # For fanout topics, check if we should remove the topic + if is_fanout: + if self.time_since_last_publish.get(topic, 0) + self.time_to_live < int( + time.time() ): - # Remove topic from fanout + return set(), True + + # Check if peers are still in the topic and remove the ones that are not + in_topic_peers: set[ID] = { + peer for peer in current_peers if peer in self.pubsub.peer_topics[topic] + } + + # If we need more peers to reach target degree + if len(in_topic_peers) < self.degree: + # Select additional peers from peers.gossipsub[topic] + selected_peers = self._get_in_topic_gossipsub_peers_from_minus( + topic, + self.degree - len(in_topic_peers), + in_topic_peers, + ) + # Add the selected peers + in_topic_peers.update(selected_peers) + + # Handle gossip if requested + if peers_to_gossip is not None: + msg_ids = self.mcache.window(topic) + if msg_ids: + # Select D peers from peers.gossipsub[topic] excluding current peers + peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus( + topic, self.degree, current_peers + ) + msg_id_strs = [str(msg_id) for msg_id in msg_ids] + for peer in peers_to_emit_ihave_to: + peers_to_gossip[peer][topic] = msg_id_strs + + return in_topic_peers, False + + def fanout_heartbeat(self) -> None: + """ + Maintain fanout topics by: + 1. Removing expired topics + 2. Removing peers that are no longer in the topic + 3. Adding new peers if needed to maintain the target degree + """ + for topic in list(self.fanout): + updated_peers, should_remove = self._handle_topic_heartbeat( + topic, self.fanout[topic], is_fanout=True + ) + if should_remove: del self.fanout[topic] else: - # Check if fanout peers are still in the topic and remove the ones that are not # noqa: E501 - # ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L498-L504 # noqa: E501 - - in_topic_fanout_peers: list[ID] = [] - if self.pubsub is not None: - in_topic_fanout_peers = [ - peer - for peer in self.fanout[topic] - if peer in self.pubsub.peer_topics[topic] - ] - self.fanout[topic] = set(in_topic_fanout_peers) - num_fanout_peers_in_topic = len(self.fanout[topic]) - - # If |fanout[topic]| < D - if num_fanout_peers_in_topic < self.degree: - # Select D - |fanout[topic]| peers from peers.gossipsub[topic] - fanout[topic] # noqa: E501 - selected_peers = self._get_in_topic_gossipsub_peers_from_minus( - topic, - self.degree - num_fanout_peers_in_topic, - self.fanout[topic], - ) - # Add the peers to fanout[topic] - self.fanout[topic].update(selected_peers) + self.fanout[topic] = updated_peers def gossip_heartbeat(self) -> DefaultDict[ID, dict[str, list[str]]]: peers_to_gossip: DefaultDict[ID, dict[str, list[str]]] = defaultdict(dict) + + # Handle mesh topics for topic in self.mesh: - msg_ids = self.mcache.window(topic) - if msg_ids: - if self.pubsub is None: - raise NoPubsubAttached - # Get all pubsub peers in a topic and only add them if they are - # gossipsub peers too - if topic in self.pubsub.peer_topics: - # Select D peers from peers.gossipsub[topic] - peers_to_emit_ihave_to = ( - self._get_in_topic_gossipsub_peers_from_minus( - topic, self.degree, self.mesh[topic] - ) - ) + self._handle_topic_heartbeat( + topic, self.mesh[topic], peers_to_gossip=peers_to_gossip + ) - msg_id_strs = [str(msg_id) for msg_id in msg_ids] - for peer in peers_to_emit_ihave_to: - peers_to_gossip[peer][topic] = msg_id_strs - - # TODO: Refactor and Dedup. This section is the roughly the same as the above. - # Do the same for fanout, for all topics not already hit in mesh + # Handle fanout topics that aren't in mesh for topic in self.fanout: - msg_ids = self.mcache.window(topic) - if msg_ids: - if self.pubsub is None: - raise NoPubsubAttached - # Get all pubsub peers in topic and only add if they are - # gossipsub peers also - if topic in self.pubsub.peer_topics: - # Select D peers from peers.gossipsub[topic] - peers_to_emit_ihave_to = ( - self._get_in_topic_gossipsub_peers_from_minus( - topic, self.degree, self.fanout[topic] - ) - ) - msg_id_strs = [str(msg) for msg in msg_ids] - for peer in peers_to_emit_ihave_to: - peers_to_gossip[peer][topic] = msg_id_strs + if topic not in self.mesh: + self._handle_topic_heartbeat( + topic, self.fanout[topic], peers_to_gossip=peers_to_gossip + ) + return peers_to_gossip @staticmethod diff --git a/newsfragments/678.misc.rst b/newsfragments/678.misc.rst new file mode 100644 index 00000000..e63e2b95 --- /dev/null +++ b/newsfragments/678.misc.rst @@ -0,0 +1 @@ +Refactored gossipsub heartbeat logic to use a single helper method `_handle_topic_heartbeat` that handles both fanout and gossip heartbeats. diff --git a/newsfragments/684.misc.rst b/newsfragments/684.misc.rst new file mode 100644 index 00000000..0f16fb8f --- /dev/null +++ b/newsfragments/684.misc.rst @@ -0,0 +1 @@ +Uses the `decapsulate` method of the `Multiaddr` class to clean up the observed address. diff --git a/tests/core/identity/identify/test_identify.py b/tests/core/identity/identify/test_identify.py index 5c8bf8e3..e88c7ebe 100644 --- a/tests/core/identity/identify/test_identify.py +++ b/tests/core/identity/identify/test_identify.py @@ -56,16 +56,9 @@ async def test_identify_protocol(security_protocol): ) # Check observed address - # TODO: use decapsulateCode(protocols('p2p').code) - # when the Multiaddr class will implement it host_b_addr = host_b.get_addrs()[0] - cleaned_addr = Multiaddr.join( - *( - host_b_addr.split()[:-1] - if str(host_b_addr.split()[-1]).startswith("/p2p/") - else host_b_addr.split() - ) - ) + host_b_peer_id = host_b.get_id() + cleaned_addr = host_b_addr.decapsulate(Multiaddr(f"/p2p/{host_b_peer_id}")) logger.debug("observed_addr: %s", Multiaddr(identify_response.observed_addr)) logger.debug("host_b.get_addrs()[0]: %s", host_b.get_addrs()[0])