diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 8613bfe8..cbed462d 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -10,6 +10,7 @@ from collections.abc import ( ) import logging import random +import time from typing import ( Any, DefaultDict, @@ -80,8 +81,7 @@ class GossipSub(IPubsubRouter, Service): # The protocol peer supports peer_protocol: dict[ID, TProtocol] - # TODO: Add `time_since_last_publish` - # Create topic --> time since last publish map. + time_since_last_publish: dict[str, int] mcache: MessageCache @@ -138,6 +138,7 @@ class GossipSub(IPubsubRouter, Service): self.direct_peers[direct_peer.peer_id] = direct_peer self.direct_connect_interval = direct_connect_interval self.direct_connect_initial_delay = direct_connect_initial_delay + self.time_since_last_publish = {} async def run(self) -> None: if self.pubsub is None: @@ -253,6 +254,8 @@ class GossipSub(IPubsubRouter, Service): except StreamClosed: logger.debug("Fail to publish message to %s: stream closed", peer_id) self.pubsub._handle_dead_peer(peer_id) + for topic in pubsub_msg.topicIDs: + self.time_since_last_publish[topic] = int(time.time()) def _get_peers_to_send( self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID @@ -342,6 +345,7 @@ class GossipSub(IPubsubRouter, Service): await self.emit_graft(topic, peer) self.fanout.pop(topic, None) + self.time_since_last_publish.pop(topic, None) async def leave(self, topic: str) -> None: # Note: the comments here are the near-exact algorithm description from the spec @@ -514,10 +518,12 @@ class GossipSub(IPubsubRouter, Service): def fanout_heartbeat(self) -> None: # Note: the comments here are the exact pseudocode from the spec - for topic in self.fanout: - # Delete topic entry if it's not in `pubsub.peer_topics` - # or (TODO) if it's time-since-last-published > ttl - if topic not in self.pubsub.peer_topics: + for topic in list(self.fanout): + if ( + topic not in self.pubsub.peer_topics + and self.time_since_last_publish.get(topic, 0) + self.time_to_live + < int(time.time()) + ): # Remove topic from fanout del self.fanout[topic] else: diff --git a/newsfragments/636.feature.rst b/newsfragments/636.feature.rst new file mode 100644 index 00000000..7ec489be --- /dev/null +++ b/newsfragments/636.feature.rst @@ -0,0 +1 @@ +feat: add method to compute time since last message published by a peer and remove fanout peers based on ttl. diff --git a/tests/core/pubsub/test_gossipsub.py b/tests/core/pubsub/test_gossipsub.py index 5e681091..20c315ef 100644 --- a/tests/core/pubsub/test_gossipsub.py +++ b/tests/core/pubsub/test_gossipsub.py @@ -22,13 +22,14 @@ from tests.utils.pubsub.utils import ( @pytest.mark.trio async def test_join(): async with PubsubFactory.create_batch_with_gossipsub( - 4, degree=4, degree_low=3, degree_high=5 + 4, degree=4, degree_low=3, degree_high=5, heartbeat_interval=1, time_to_live=1 ) as pubsubs_gsub: gossipsubs = [pubsub.router for pubsub in pubsubs_gsub] hosts = [pubsub.host for pubsub in pubsubs_gsub] hosts_indices = list(range(len(pubsubs_gsub))) topic = "test_join" + to_drop_topic = "test_drop_topic" central_node_index = 0 # Remove index of central host from the indices hosts_indices.remove(central_node_index) @@ -42,23 +43,31 @@ async def test_join(): # Connect central host to all other hosts await one_to_all_connect(hosts, central_node_index) - # Wait 2 seconds for heartbeat to allow mesh to connect - await trio.sleep(2) + # Wait 1 seconds for heartbeat to allow mesh to connect + await trio.sleep(1) # Central node publish to the topic so that this topic # is added to central node's fanout # publish from the randomly chosen host await pubsubs_gsub[central_node_index].publish(topic, b"data") + await pubsubs_gsub[central_node_index].publish(to_drop_topic, b"data") + await trio.sleep(0.5) + # Check that the gossipsub of central node has fanout for the topics + assert topic, to_drop_topic in gossipsubs[central_node_index].fanout + # Check that the gossipsub of central node does not have a mesh for the topics + assert topic, to_drop_topic not in gossipsubs[central_node_index].mesh + # Check that the gossipsub of central node + # has a time_since_last_publish for the topics + assert topic in gossipsubs[central_node_index].time_since_last_publish + assert to_drop_topic in gossipsubs[central_node_index].time_since_last_publish - # Check that the gossipsub of central node has fanout for the topic - assert topic in gossipsubs[central_node_index].fanout - # Check that the gossipsub of central node does not have a mesh for the topic - assert topic not in gossipsubs[central_node_index].mesh - + await trio.sleep(1) + # Check that after ttl the to_drop_topic is no more in fanout of central node + assert to_drop_topic not in gossipsubs[central_node_index].fanout # Central node subscribes the topic await pubsubs_gsub[central_node_index].subscribe(topic) - await trio.sleep(2) + await trio.sleep(1) # Check that the gossipsub of central node no longer has fanout for the topic assert topic not in gossipsubs[central_node_index].fanout