fixed fanout_heartbeat bug and gossipsub join test

This commit is contained in:
Mystical
2025-06-05 13:39:07 +05:30
parent 338672214c
commit cef217358f
2 changed files with 21 additions and 16 deletions

View File

@ -519,9 +519,11 @@ class GossipSub(IPubsubRouter, Service):
def fanout_heartbeat(self) -> None:
# Note: the comments here are the exact pseudocode from the spec
for topic in list(self.fanout):
if topic not in self.pubsub.peer_topics or self.time_since_last_publish.get(
topic, 0
) + self.time_to_live < int(time.time()):
if (
topic not in self.pubsub.peer_topics
and self.time_since_last_publish.get(topic, 0) + self.time_to_live
< int(time.time())
):
# Remove topic from fanout
del self.fanout[topic]
else:

View File

@ -22,13 +22,14 @@ from tests.utils.pubsub.utils import (
@pytest.mark.trio
async def test_join():
async with PubsubFactory.create_batch_with_gossipsub(
4, degree=4, degree_low=3, degree_high=5, time_to_live=2
4, degree=4, degree_low=3, degree_high=5, heartbeat_interval=1, time_to_live=1
) as pubsubs_gsub:
gossipsubs = [pubsub.router for pubsub in pubsubs_gsub]
hosts = [pubsub.host for pubsub in pubsubs_gsub]
hosts_indices = list(range(len(pubsubs_gsub)))
topic = "test_join"
to_drop_topic = "test_drop_topic"
central_node_index = 0
# Remove index of central host from the indices
hosts_indices.remove(central_node_index)
@ -42,29 +43,31 @@ async def test_join():
# Connect central host to all other hosts
await one_to_all_connect(hosts, central_node_index)
# Wait 2 seconds for heartbeat to allow mesh to connect
await trio.sleep(2)
# Wait 1 seconds for heartbeat to allow mesh to connect
await trio.sleep(1)
# Central node publish to the topic so that this topic
# is added to central node's fanout
# publish from the randomly chosen host
await pubsubs_gsub[central_node_index].publish(topic, b"data")
await trio.sleep(1)
# Check that the gossipsub of central node has fanout for the topic
assert topic in gossipsubs[central_node_index].fanout
# Check that the gossipsub of central node does not have a mesh for the topic
assert topic not in gossipsubs[central_node_index].mesh
await pubsubs_gsub[central_node_index].publish(to_drop_topic, b"data")
await trio.sleep(0.5)
# Check that the gossipsub of central node has fanout for the topics
assert topic, to_drop_topic in gossipsubs[central_node_index].fanout
# Check that the gossipsub of central node does not have a mesh for the topics
assert topic, to_drop_topic not in gossipsubs[central_node_index].mesh
# Check that the gossipsub of central node
# has a time_since_last_publish for the topic
# has a time_since_last_publish for the topics
assert topic in gossipsubs[central_node_index].time_since_last_publish
assert to_drop_topic in gossipsubs[central_node_index].time_since_last_publish
await trio.sleep(2)
# Check that after ttl the topic is no more in fanout of central node
assert topic not in gossipsubs[central_node_index].fanout
await trio.sleep(1)
# Check that after ttl the to_drop_topic is no more in fanout of central node
assert to_drop_topic not in gossipsubs[central_node_index].fanout
# Central node subscribes the topic
await pubsubs_gsub[central_node_index].subscribe(topic)
await trio.sleep(2)
await trio.sleep(1)
# Check that the gossipsub of central node no longer has fanout for the topic
assert topic not in gossipsubs[central_node_index].fanout