diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 8613bfe8..4b5bc251 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -17,6 +17,8 @@ from typing import ( import trio +import time + from libp2p.abc import ( IPubsubRouter, ) @@ -80,8 +82,7 @@ class GossipSub(IPubsubRouter, Service): # The protocol peer supports peer_protocol: dict[ID, TProtocol] - # TODO: Add `time_since_last_publish` - # Create topic --> time since last publish map. + time_since_last_publish: dict[str, int] mcache: MessageCache @@ -253,6 +254,8 @@ class GossipSub(IPubsubRouter, Service): except StreamClosed: logger.debug("Fail to publish message to %s: stream closed", peer_id) self.pubsub._handle_dead_peer(peer_id) + for topic in pubsub_msg.topicIDs: + self.time_since_last_publish[topic] = int(time.time()) def _get_peers_to_send( self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID @@ -342,6 +345,7 @@ class GossipSub(IPubsubRouter, Service): await self.emit_graft(topic, peer) self.fanout.pop(topic, None) + self.time_since_last_publish.pop(topic, None) async def leave(self, topic: str) -> None: # Note: the comments here are the near-exact algorithm description from the spec @@ -515,9 +519,10 @@ class GossipSub(IPubsubRouter, Service): def fanout_heartbeat(self) -> None: # Note: the comments here are the exact pseudocode from the spec for topic in self.fanout: - # Delete topic entry if it's not in `pubsub.peer_topics` - # or (TODO) if it's time-since-last-published > ttl - if topic not in self.pubsub.peer_topics: + if ( + topic not in self.pubsub.peer_topics + or self.time_since_last_publish.get(topic, 0) + self.time_to_live < int(time.time()) + ): # Remove topic from fanout del self.fanout[topic] else: