From c2046e6aa444bab4d3ef655651e861886f64ad1b Mon Sep 17 00:00:00 2001 From: sukhman Date: Sun, 1 Jun 2025 01:47:47 +0530 Subject: [PATCH 1/5] Add time_since_last_publish --- libp2p/pubsub/gossipsub.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 8613bfe8..4b5bc251 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -17,6 +17,8 @@ from typing import ( import trio +import time + from libp2p.abc import ( IPubsubRouter, ) @@ -80,8 +82,7 @@ class GossipSub(IPubsubRouter, Service): # The protocol peer supports peer_protocol: dict[ID, TProtocol] - # TODO: Add `time_since_last_publish` - # Create topic --> time since last publish map. + time_since_last_publish: dict[str, int] mcache: MessageCache @@ -253,6 +254,8 @@ class GossipSub(IPubsubRouter, Service): except StreamClosed: logger.debug("Fail to publish message to %s: stream closed", peer_id) self.pubsub._handle_dead_peer(peer_id) + for topic in pubsub_msg.topicIDs: + self.time_since_last_publish[topic] = int(time.time()) def _get_peers_to_send( self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID @@ -342,6 +345,7 @@ class GossipSub(IPubsubRouter, Service): await self.emit_graft(topic, peer) self.fanout.pop(topic, None) + self.time_since_last_publish.pop(topic, None) async def leave(self, topic: str) -> None: # Note: the comments here are the near-exact algorithm description from the spec @@ -515,9 +519,10 @@ class GossipSub(IPubsubRouter, Service): def fanout_heartbeat(self) -> None: # Note: the comments here are the exact pseudocode from the spec for topic in self.fanout: - # Delete topic entry if it's not in `pubsub.peer_topics` - # or (TODO) if it's time-since-last-published > ttl - if topic not in self.pubsub.peer_topics: + if ( + topic not in self.pubsub.peer_topics + or self.time_since_last_publish.get(topic, 0) + self.time_to_live < int(time.time()) + ): # Remove topic from fanout del self.fanout[topic] else: From 338672214c40e5b6e0c5c09f7ebc13fa73afaf65 Mon Sep 17 00:00:00 2001 From: sukhman Date: Wed, 4 Jun 2025 14:05:14 +0530 Subject: [PATCH 2/5] Add test for time_since_last_publish Signed-off-by: sukhman --- libp2p/pubsub/gossipsub.py | 15 +++++++-------- tests/core/pubsub/test_gossipsub.py | 10 ++++++++-- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 4b5bc251..4bc260be 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -10,6 +10,7 @@ from collections.abc import ( ) import logging import random +import time from typing import ( Any, DefaultDict, @@ -17,8 +18,6 @@ from typing import ( import trio -import time - from libp2p.abc import ( IPubsubRouter, ) @@ -139,6 +138,7 @@ class GossipSub(IPubsubRouter, Service): self.direct_peers[direct_peer.peer_id] = direct_peer self.direct_connect_interval = direct_connect_interval self.direct_connect_initial_delay = direct_connect_initial_delay + self.time_since_last_publish = {} async def run(self) -> None: if self.pubsub is None: @@ -255,7 +255,7 @@ class GossipSub(IPubsubRouter, Service): logger.debug("Fail to publish message to %s: stream closed", peer_id) self.pubsub._handle_dead_peer(peer_id) for topic in pubsub_msg.topicIDs: - self.time_since_last_publish[topic] = int(time.time()) + self.time_since_last_publish[topic] = int(time.time()) def _get_peers_to_send( self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID @@ -518,11 +518,10 @@ class GossipSub(IPubsubRouter, Service): def fanout_heartbeat(self) -> None: # Note: the comments here are the exact pseudocode from the spec - for topic in self.fanout: - if ( - topic not in self.pubsub.peer_topics - or self.time_since_last_publish.get(topic, 0) + self.time_to_live < int(time.time()) - ): + for topic in list(self.fanout): + if topic not in self.pubsub.peer_topics or self.time_since_last_publish.get( + topic, 0 + ) + self.time_to_live < int(time.time()): # Remove topic from fanout del self.fanout[topic] else: diff --git a/tests/core/pubsub/test_gossipsub.py b/tests/core/pubsub/test_gossipsub.py index 5e681091..8e113fa4 100644 --- a/tests/core/pubsub/test_gossipsub.py +++ b/tests/core/pubsub/test_gossipsub.py @@ -22,7 +22,7 @@ from tests.utils.pubsub.utils import ( @pytest.mark.trio async def test_join(): async with PubsubFactory.create_batch_with_gossipsub( - 4, degree=4, degree_low=3, degree_high=5 + 4, degree=4, degree_low=3, degree_high=5, time_to_live=2 ) as pubsubs_gsub: gossipsubs = [pubsub.router for pubsub in pubsubs_gsub] hosts = [pubsub.host for pubsub in pubsubs_gsub] @@ -49,12 +49,18 @@ async def test_join(): # is added to central node's fanout # publish from the randomly chosen host await pubsubs_gsub[central_node_index].publish(topic, b"data") - + await trio.sleep(1) # Check that the gossipsub of central node has fanout for the topic assert topic in gossipsubs[central_node_index].fanout # Check that the gossipsub of central node does not have a mesh for the topic assert topic not in gossipsubs[central_node_index].mesh + # Check that the gossipsub of central node + # has a time_since_last_publish for the topic + assert topic in gossipsubs[central_node_index].time_since_last_publish + await trio.sleep(2) + # Check that after ttl the topic is no more in fanout of central node + assert topic not in gossipsubs[central_node_index].fanout # Central node subscribes the topic await pubsubs_gsub[central_node_index].subscribe(topic) From cef217358f9208b7725dce2873003d14f58eed0f Mon Sep 17 00:00:00 2001 From: Mystical <125946525+mystical-prog@users.noreply.github.com> Date: Thu, 5 Jun 2025 13:39:07 +0530 Subject: [PATCH 3/5] fixed fanout_heartbeat bug and gossipsub join test --- libp2p/pubsub/gossipsub.py | 8 +++++--- tests/core/pubsub/test_gossipsub.py | 29 ++++++++++++++++------------- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 4bc260be..cbed462d 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -519,9 +519,11 @@ class GossipSub(IPubsubRouter, Service): def fanout_heartbeat(self) -> None: # Note: the comments here are the exact pseudocode from the spec for topic in list(self.fanout): - if topic not in self.pubsub.peer_topics or self.time_since_last_publish.get( - topic, 0 - ) + self.time_to_live < int(time.time()): + if ( + topic not in self.pubsub.peer_topics + and self.time_since_last_publish.get(topic, 0) + self.time_to_live + < int(time.time()) + ): # Remove topic from fanout del self.fanout[topic] else: diff --git a/tests/core/pubsub/test_gossipsub.py b/tests/core/pubsub/test_gossipsub.py index 8e113fa4..20c315ef 100644 --- a/tests/core/pubsub/test_gossipsub.py +++ b/tests/core/pubsub/test_gossipsub.py @@ -22,13 +22,14 @@ from tests.utils.pubsub.utils import ( @pytest.mark.trio async def test_join(): async with PubsubFactory.create_batch_with_gossipsub( - 4, degree=4, degree_low=3, degree_high=5, time_to_live=2 + 4, degree=4, degree_low=3, degree_high=5, heartbeat_interval=1, time_to_live=1 ) as pubsubs_gsub: gossipsubs = [pubsub.router for pubsub in pubsubs_gsub] hosts = [pubsub.host for pubsub in pubsubs_gsub] hosts_indices = list(range(len(pubsubs_gsub))) topic = "test_join" + to_drop_topic = "test_drop_topic" central_node_index = 0 # Remove index of central host from the indices hosts_indices.remove(central_node_index) @@ -42,29 +43,31 @@ async def test_join(): # Connect central host to all other hosts await one_to_all_connect(hosts, central_node_index) - # Wait 2 seconds for heartbeat to allow mesh to connect - await trio.sleep(2) + # Wait 1 seconds for heartbeat to allow mesh to connect + await trio.sleep(1) # Central node publish to the topic so that this topic # is added to central node's fanout # publish from the randomly chosen host await pubsubs_gsub[central_node_index].publish(topic, b"data") - await trio.sleep(1) - # Check that the gossipsub of central node has fanout for the topic - assert topic in gossipsubs[central_node_index].fanout - # Check that the gossipsub of central node does not have a mesh for the topic - assert topic not in gossipsubs[central_node_index].mesh + await pubsubs_gsub[central_node_index].publish(to_drop_topic, b"data") + await trio.sleep(0.5) + # Check that the gossipsub of central node has fanout for the topics + assert topic, to_drop_topic in gossipsubs[central_node_index].fanout + # Check that the gossipsub of central node does not have a mesh for the topics + assert topic, to_drop_topic not in gossipsubs[central_node_index].mesh # Check that the gossipsub of central node - # has a time_since_last_publish for the topic + # has a time_since_last_publish for the topics assert topic in gossipsubs[central_node_index].time_since_last_publish + assert to_drop_topic in gossipsubs[central_node_index].time_since_last_publish - await trio.sleep(2) - # Check that after ttl the topic is no more in fanout of central node - assert topic not in gossipsubs[central_node_index].fanout + await trio.sleep(1) + # Check that after ttl the to_drop_topic is no more in fanout of central node + assert to_drop_topic not in gossipsubs[central_node_index].fanout # Central node subscribes the topic await pubsubs_gsub[central_node_index].subscribe(topic) - await trio.sleep(2) + await trio.sleep(1) # Check that the gossipsub of central node no longer has fanout for the topic assert topic not in gossipsubs[central_node_index].fanout From d4785b9e2692221bfdd342ec4e4a6c706666031d Mon Sep 17 00:00:00 2001 From: sukhman Date: Thu, 5 Jun 2025 14:00:23 +0530 Subject: [PATCH 4/5] Add newsfragments to the PR Signed-off-by: sukhman --- newsfragments/636.features.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 newsfragments/636.features.rst diff --git a/newsfragments/636.features.rst b/newsfragments/636.features.rst new file mode 100644 index 00000000..7ec489be --- /dev/null +++ b/newsfragments/636.features.rst @@ -0,0 +1 @@ +feat: add method to compute time since last message published by a peer and remove fanout peers based on ttl. From d75886b180392e9a174738e30b95103a689d7730 Mon Sep 17 00:00:00 2001 From: Mystical <125946525+mystical-prog@users.noreply.github.com> Date: Fri, 6 Jun 2025 17:55:40 +0530 Subject: [PATCH 5/5] renamed newsfragment file causing docs ci failure --- newsfragments/{636.features.rst => 636.feature.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename newsfragments/{636.features.rst => 636.feature.rst} (100%) diff --git a/newsfragments/636.features.rst b/newsfragments/636.feature.rst similarity index 100% rename from newsfragments/636.features.rst rename to newsfragments/636.feature.rst