Add test for time_since_last_publish

Signed-off-by: sukhman <sukhmansinghsaluja@gmail.com>
This commit is contained in:
sukhman
2025-06-04 14:05:14 +05:30
parent c2046e6aa4
commit 338672214c
2 changed files with 15 additions and 10 deletions

View File

@ -10,6 +10,7 @@ from collections.abc import (
)
import logging
import random
import time
from typing import (
Any,
DefaultDict,
@ -17,8 +18,6 @@ from typing import (
import trio
import time
from libp2p.abc import (
IPubsubRouter,
)
@ -139,6 +138,7 @@ class GossipSub(IPubsubRouter, Service):
self.direct_peers[direct_peer.peer_id] = direct_peer
self.direct_connect_interval = direct_connect_interval
self.direct_connect_initial_delay = direct_connect_initial_delay
self.time_since_last_publish = {}
async def run(self) -> None:
if self.pubsub is None:
@ -255,7 +255,7 @@ class GossipSub(IPubsubRouter, Service):
logger.debug("Fail to publish message to %s: stream closed", peer_id)
self.pubsub._handle_dead_peer(peer_id)
for topic in pubsub_msg.topicIDs:
self.time_since_last_publish[topic] = int(time.time())
self.time_since_last_publish[topic] = int(time.time())
def _get_peers_to_send(
self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID
@ -518,11 +518,10 @@ class GossipSub(IPubsubRouter, Service):
def fanout_heartbeat(self) -> None:
# Note: the comments here are the exact pseudocode from the spec
for topic in self.fanout:
if (
topic not in self.pubsub.peer_topics
or self.time_since_last_publish.get(topic, 0) + self.time_to_live < int(time.time())
):
for topic in list(self.fanout):
if topic not in self.pubsub.peer_topics or self.time_since_last_publish.get(
topic, 0
) + self.time_to_live < int(time.time()):
# Remove topic from fanout
del self.fanout[topic]
else:

View File

@ -22,7 +22,7 @@ from tests.utils.pubsub.utils import (
@pytest.mark.trio
async def test_join():
async with PubsubFactory.create_batch_with_gossipsub(
4, degree=4, degree_low=3, degree_high=5
4, degree=4, degree_low=3, degree_high=5, time_to_live=2
) as pubsubs_gsub:
gossipsubs = [pubsub.router for pubsub in pubsubs_gsub]
hosts = [pubsub.host for pubsub in pubsubs_gsub]
@ -49,12 +49,18 @@ async def test_join():
# is added to central node's fanout
# publish from the randomly chosen host
await pubsubs_gsub[central_node_index].publish(topic, b"data")
await trio.sleep(1)
# Check that the gossipsub of central node has fanout for the topic
assert topic in gossipsubs[central_node_index].fanout
# Check that the gossipsub of central node does not have a mesh for the topic
assert topic not in gossipsubs[central_node_index].mesh
# Check that the gossipsub of central node
# has a time_since_last_publish for the topic
assert topic in gossipsubs[central_node_index].time_since_last_publish
await trio.sleep(2)
# Check that after ttl the topic is no more in fanout of central node
assert topic not in gossipsubs[central_node_index].fanout
# Central node subscribes the topic
await pubsubs_gsub[central_node_index].subscribe(topic)