mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
Add time_since_last_publish
This commit is contained in:
@ -17,6 +17,8 @@ from typing import (
|
|||||||
|
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
from libp2p.abc import (
|
from libp2p.abc import (
|
||||||
IPubsubRouter,
|
IPubsubRouter,
|
||||||
)
|
)
|
||||||
@ -80,8 +82,7 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
# The protocol peer supports
|
# The protocol peer supports
|
||||||
peer_protocol: dict[ID, TProtocol]
|
peer_protocol: dict[ID, TProtocol]
|
||||||
|
|
||||||
# TODO: Add `time_since_last_publish`
|
time_since_last_publish: dict[str, int]
|
||||||
# Create topic --> time since last publish map.
|
|
||||||
|
|
||||||
mcache: MessageCache
|
mcache: MessageCache
|
||||||
|
|
||||||
@ -253,6 +254,8 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
except StreamClosed:
|
except StreamClosed:
|
||||||
logger.debug("Fail to publish message to %s: stream closed", peer_id)
|
logger.debug("Fail to publish message to %s: stream closed", peer_id)
|
||||||
self.pubsub._handle_dead_peer(peer_id)
|
self.pubsub._handle_dead_peer(peer_id)
|
||||||
|
for topic in pubsub_msg.topicIDs:
|
||||||
|
self.time_since_last_publish[topic] = int(time.time())
|
||||||
|
|
||||||
def _get_peers_to_send(
|
def _get_peers_to_send(
|
||||||
self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID
|
self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID
|
||||||
@ -342,6 +345,7 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
await self.emit_graft(topic, peer)
|
await self.emit_graft(topic, peer)
|
||||||
|
|
||||||
self.fanout.pop(topic, None)
|
self.fanout.pop(topic, None)
|
||||||
|
self.time_since_last_publish.pop(topic, None)
|
||||||
|
|
||||||
async def leave(self, topic: str) -> None:
|
async def leave(self, topic: str) -> None:
|
||||||
# Note: the comments here are the near-exact algorithm description from the spec
|
# Note: the comments here are the near-exact algorithm description from the spec
|
||||||
@ -515,9 +519,10 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
def fanout_heartbeat(self) -> None:
|
def fanout_heartbeat(self) -> None:
|
||||||
# Note: the comments here are the exact pseudocode from the spec
|
# Note: the comments here are the exact pseudocode from the spec
|
||||||
for topic in self.fanout:
|
for topic in self.fanout:
|
||||||
# Delete topic entry if it's not in `pubsub.peer_topics`
|
if (
|
||||||
# or (TODO) if it's time-since-last-published > ttl
|
topic not in self.pubsub.peer_topics
|
||||||
if topic not in self.pubsub.peer_topics:
|
or self.time_since_last_publish.get(topic, 0) + self.time_to_live < int(time.time())
|
||||||
|
):
|
||||||
# Remove topic from fanout
|
# Remove topic from fanout
|
||||||
del self.fanout[topic]
|
del self.fanout[topic]
|
||||||
else:
|
else:
|
||||||
|
|||||||
Reference in New Issue
Block a user