From cef217358f9208b7725dce2873003d14f58eed0f Mon Sep 17 00:00:00 2001 From: Mystical <125946525+mystical-prog@users.noreply.github.com> Date: Thu, 5 Jun 2025 13:39:07 +0530 Subject: [PATCH] fixed fanout_heartbeat bug and gossipsub join test --- libp2p/pubsub/gossipsub.py | 8 +++++--- tests/core/pubsub/test_gossipsub.py | 29 ++++++++++++++++------------- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 4bc260be..cbed462d 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -519,9 +519,11 @@ class GossipSub(IPubsubRouter, Service): def fanout_heartbeat(self) -> None: # Note: the comments here are the exact pseudocode from the spec for topic in list(self.fanout): - if topic not in self.pubsub.peer_topics or self.time_since_last_publish.get( - topic, 0 - ) + self.time_to_live < int(time.time()): + if ( + topic not in self.pubsub.peer_topics + and self.time_since_last_publish.get(topic, 0) + self.time_to_live + < int(time.time()) + ): # Remove topic from fanout del self.fanout[topic] else: diff --git a/tests/core/pubsub/test_gossipsub.py b/tests/core/pubsub/test_gossipsub.py index 8e113fa4..20c315ef 100644 --- a/tests/core/pubsub/test_gossipsub.py +++ b/tests/core/pubsub/test_gossipsub.py @@ -22,13 +22,14 @@ from tests.utils.pubsub.utils import ( @pytest.mark.trio async def test_join(): async with PubsubFactory.create_batch_with_gossipsub( - 4, degree=4, degree_low=3, degree_high=5, time_to_live=2 + 4, degree=4, degree_low=3, degree_high=5, heartbeat_interval=1, time_to_live=1 ) as pubsubs_gsub: gossipsubs = [pubsub.router for pubsub in pubsubs_gsub] hosts = [pubsub.host for pubsub in pubsubs_gsub] hosts_indices = list(range(len(pubsubs_gsub))) topic = "test_join" + to_drop_topic = "test_drop_topic" central_node_index = 0 # Remove index of central host from the indices hosts_indices.remove(central_node_index) @@ -42,29 +43,31 @@ async def test_join(): # Connect central host to all other hosts await one_to_all_connect(hosts, central_node_index) - # Wait 2 seconds for heartbeat to allow mesh to connect - await trio.sleep(2) + # Wait 1 seconds for heartbeat to allow mesh to connect + await trio.sleep(1) # Central node publish to the topic so that this topic # is added to central node's fanout # publish from the randomly chosen host await pubsubs_gsub[central_node_index].publish(topic, b"data") - await trio.sleep(1) - # Check that the gossipsub of central node has fanout for the topic - assert topic in gossipsubs[central_node_index].fanout - # Check that the gossipsub of central node does not have a mesh for the topic - assert topic not in gossipsubs[central_node_index].mesh + await pubsubs_gsub[central_node_index].publish(to_drop_topic, b"data") + await trio.sleep(0.5) + # Check that the gossipsub of central node has fanout for the topics + assert topic, to_drop_topic in gossipsubs[central_node_index].fanout + # Check that the gossipsub of central node does not have a mesh for the topics + assert topic, to_drop_topic not in gossipsubs[central_node_index].mesh # Check that the gossipsub of central node - # has a time_since_last_publish for the topic + # has a time_since_last_publish for the topics assert topic in gossipsubs[central_node_index].time_since_last_publish + assert to_drop_topic in gossipsubs[central_node_index].time_since_last_publish - await trio.sleep(2) - # Check that after ttl the topic is no more in fanout of central node - assert topic not in gossipsubs[central_node_index].fanout + await trio.sleep(1) + # Check that after ttl the to_drop_topic is no more in fanout of central node + assert to_drop_topic not in gossipsubs[central_node_index].fanout # Central node subscribes the topic await pubsubs_gsub[central_node_index].subscribe(topic) - await trio.sleep(2) + await trio.sleep(1) # Check that the gossipsub of central node no longer has fanout for the topic assert topic not in gossipsubs[central_node_index].fanout