diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 4b5bc251..4bc260be 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -10,6 +10,7 @@ from collections.abc import ( ) import logging import random +import time from typing import ( Any, DefaultDict, @@ -17,8 +18,6 @@ from typing import ( import trio -import time - from libp2p.abc import ( IPubsubRouter, ) @@ -139,6 +138,7 @@ class GossipSub(IPubsubRouter, Service): self.direct_peers[direct_peer.peer_id] = direct_peer self.direct_connect_interval = direct_connect_interval self.direct_connect_initial_delay = direct_connect_initial_delay + self.time_since_last_publish = {} async def run(self) -> None: if self.pubsub is None: @@ -255,7 +255,7 @@ class GossipSub(IPubsubRouter, Service): logger.debug("Fail to publish message to %s: stream closed", peer_id) self.pubsub._handle_dead_peer(peer_id) for topic in pubsub_msg.topicIDs: - self.time_since_last_publish[topic] = int(time.time()) + self.time_since_last_publish[topic] = int(time.time()) def _get_peers_to_send( self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID @@ -518,11 +518,10 @@ class GossipSub(IPubsubRouter, Service): def fanout_heartbeat(self) -> None: # Note: the comments here are the exact pseudocode from the spec - for topic in self.fanout: - if ( - topic not in self.pubsub.peer_topics - or self.time_since_last_publish.get(topic, 0) + self.time_to_live < int(time.time()) - ): + for topic in list(self.fanout): + if topic not in self.pubsub.peer_topics or self.time_since_last_publish.get( + topic, 0 + ) + self.time_to_live < int(time.time()): # Remove topic from fanout del self.fanout[topic] else: diff --git a/tests/core/pubsub/test_gossipsub.py b/tests/core/pubsub/test_gossipsub.py index 5e681091..8e113fa4 100644 --- a/tests/core/pubsub/test_gossipsub.py +++ b/tests/core/pubsub/test_gossipsub.py @@ -22,7 +22,7 @@ from tests.utils.pubsub.utils import ( @pytest.mark.trio async def test_join(): async with PubsubFactory.create_batch_with_gossipsub( - 4, degree=4, degree_low=3, degree_high=5 + 4, degree=4, degree_low=3, degree_high=5, time_to_live=2 ) as pubsubs_gsub: gossipsubs = [pubsub.router for pubsub in pubsubs_gsub] hosts = [pubsub.host for pubsub in pubsubs_gsub] @@ -49,12 +49,18 @@ async def test_join(): # is added to central node's fanout # publish from the randomly chosen host await pubsubs_gsub[central_node_index].publish(topic, b"data") - + await trio.sleep(1) # Check that the gossipsub of central node has fanout for the topic assert topic in gossipsubs[central_node_index].fanout # Check that the gossipsub of central node does not have a mesh for the topic assert topic not in gossipsubs[central_node_index].mesh + # Check that the gossipsub of central node + # has a time_since_last_publish for the topic + assert topic in gossipsubs[central_node_index].time_since_last_publish + await trio.sleep(2) + # Check that after ttl the topic is no more in fanout of central node + assert topic not in gossipsubs[central_node_index].fanout # Central node subscribes the topic await pubsubs_gsub[central_node_index].subscribe(topic)