mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
Add time_since_last_publish (#642)
Added `time_since_last_publish` field to gossipsub. Took reference from https://github.com/libp2p/go-libp2p-pubsub/blob/master/gossipsub.go#L1224 Issue https://github.com/libp2p/py-libp2p/issues/636 ## How was it fixed? whenever someone publishes message to a topic or set of topics, `time_since_last_publish` gets updated and whenever we clear fanout peers or time exceeds ttl, we clear `time_since_last_publish` from dict. ### To-Do Creating draft PR for now. Tests and type-binding is left for this issue. #### Cute Animal Picture 
This commit is contained in:
@ -10,6 +10,7 @@ from collections.abc import (
|
|||||||
)
|
)
|
||||||
import logging
|
import logging
|
||||||
import random
|
import random
|
||||||
|
import time
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
DefaultDict,
|
DefaultDict,
|
||||||
@ -80,8 +81,7 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
# The protocol peer supports
|
# The protocol peer supports
|
||||||
peer_protocol: dict[ID, TProtocol]
|
peer_protocol: dict[ID, TProtocol]
|
||||||
|
|
||||||
# TODO: Add `time_since_last_publish`
|
time_since_last_publish: dict[str, int]
|
||||||
# Create topic --> time since last publish map.
|
|
||||||
|
|
||||||
mcache: MessageCache
|
mcache: MessageCache
|
||||||
|
|
||||||
@ -138,6 +138,7 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
self.direct_peers[direct_peer.peer_id] = direct_peer
|
self.direct_peers[direct_peer.peer_id] = direct_peer
|
||||||
self.direct_connect_interval = direct_connect_interval
|
self.direct_connect_interval = direct_connect_interval
|
||||||
self.direct_connect_initial_delay = direct_connect_initial_delay
|
self.direct_connect_initial_delay = direct_connect_initial_delay
|
||||||
|
self.time_since_last_publish = {}
|
||||||
|
|
||||||
async def run(self) -> None:
|
async def run(self) -> None:
|
||||||
if self.pubsub is None:
|
if self.pubsub is None:
|
||||||
@ -253,6 +254,8 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
except StreamClosed:
|
except StreamClosed:
|
||||||
logger.debug("Fail to publish message to %s: stream closed", peer_id)
|
logger.debug("Fail to publish message to %s: stream closed", peer_id)
|
||||||
self.pubsub._handle_dead_peer(peer_id)
|
self.pubsub._handle_dead_peer(peer_id)
|
||||||
|
for topic in pubsub_msg.topicIDs:
|
||||||
|
self.time_since_last_publish[topic] = int(time.time())
|
||||||
|
|
||||||
def _get_peers_to_send(
|
def _get_peers_to_send(
|
||||||
self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID
|
self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID
|
||||||
@ -342,6 +345,7 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
await self.emit_graft(topic, peer)
|
await self.emit_graft(topic, peer)
|
||||||
|
|
||||||
self.fanout.pop(topic, None)
|
self.fanout.pop(topic, None)
|
||||||
|
self.time_since_last_publish.pop(topic, None)
|
||||||
|
|
||||||
async def leave(self, topic: str) -> None:
|
async def leave(self, topic: str) -> None:
|
||||||
# Note: the comments here are the near-exact algorithm description from the spec
|
# Note: the comments here are the near-exact algorithm description from the spec
|
||||||
@ -514,10 +518,12 @@ class GossipSub(IPubsubRouter, Service):
|
|||||||
|
|
||||||
def fanout_heartbeat(self) -> None:
|
def fanout_heartbeat(self) -> None:
|
||||||
# Note: the comments here are the exact pseudocode from the spec
|
# Note: the comments here are the exact pseudocode from the spec
|
||||||
for topic in self.fanout:
|
for topic in list(self.fanout):
|
||||||
# Delete topic entry if it's not in `pubsub.peer_topics`
|
if (
|
||||||
# or (TODO) if it's time-since-last-published > ttl
|
topic not in self.pubsub.peer_topics
|
||||||
if topic not in self.pubsub.peer_topics:
|
and self.time_since_last_publish.get(topic, 0) + self.time_to_live
|
||||||
|
< int(time.time())
|
||||||
|
):
|
||||||
# Remove topic from fanout
|
# Remove topic from fanout
|
||||||
del self.fanout[topic]
|
del self.fanout[topic]
|
||||||
else:
|
else:
|
||||||
|
|||||||
1
newsfragments/636.feature.rst
Normal file
1
newsfragments/636.feature.rst
Normal file
@ -0,0 +1 @@
|
|||||||
|
feat: add method to compute time since last message published by a peer and remove fanout peers based on ttl.
|
||||||
@ -22,13 +22,14 @@ from tests.utils.pubsub.utils import (
|
|||||||
@pytest.mark.trio
|
@pytest.mark.trio
|
||||||
async def test_join():
|
async def test_join():
|
||||||
async with PubsubFactory.create_batch_with_gossipsub(
|
async with PubsubFactory.create_batch_with_gossipsub(
|
||||||
4, degree=4, degree_low=3, degree_high=5
|
4, degree=4, degree_low=3, degree_high=5, heartbeat_interval=1, time_to_live=1
|
||||||
) as pubsubs_gsub:
|
) as pubsubs_gsub:
|
||||||
gossipsubs = [pubsub.router for pubsub in pubsubs_gsub]
|
gossipsubs = [pubsub.router for pubsub in pubsubs_gsub]
|
||||||
hosts = [pubsub.host for pubsub in pubsubs_gsub]
|
hosts = [pubsub.host for pubsub in pubsubs_gsub]
|
||||||
hosts_indices = list(range(len(pubsubs_gsub)))
|
hosts_indices = list(range(len(pubsubs_gsub)))
|
||||||
|
|
||||||
topic = "test_join"
|
topic = "test_join"
|
||||||
|
to_drop_topic = "test_drop_topic"
|
||||||
central_node_index = 0
|
central_node_index = 0
|
||||||
# Remove index of central host from the indices
|
# Remove index of central host from the indices
|
||||||
hosts_indices.remove(central_node_index)
|
hosts_indices.remove(central_node_index)
|
||||||
@ -42,23 +43,31 @@ async def test_join():
|
|||||||
# Connect central host to all other hosts
|
# Connect central host to all other hosts
|
||||||
await one_to_all_connect(hosts, central_node_index)
|
await one_to_all_connect(hosts, central_node_index)
|
||||||
|
|
||||||
# Wait 2 seconds for heartbeat to allow mesh to connect
|
# Wait 1 seconds for heartbeat to allow mesh to connect
|
||||||
await trio.sleep(2)
|
await trio.sleep(1)
|
||||||
|
|
||||||
# Central node publish to the topic so that this topic
|
# Central node publish to the topic so that this topic
|
||||||
# is added to central node's fanout
|
# is added to central node's fanout
|
||||||
# publish from the randomly chosen host
|
# publish from the randomly chosen host
|
||||||
await pubsubs_gsub[central_node_index].publish(topic, b"data")
|
await pubsubs_gsub[central_node_index].publish(topic, b"data")
|
||||||
|
await pubsubs_gsub[central_node_index].publish(to_drop_topic, b"data")
|
||||||
|
await trio.sleep(0.5)
|
||||||
|
# Check that the gossipsub of central node has fanout for the topics
|
||||||
|
assert topic, to_drop_topic in gossipsubs[central_node_index].fanout
|
||||||
|
# Check that the gossipsub of central node does not have a mesh for the topics
|
||||||
|
assert topic, to_drop_topic not in gossipsubs[central_node_index].mesh
|
||||||
|
# Check that the gossipsub of central node
|
||||||
|
# has a time_since_last_publish for the topics
|
||||||
|
assert topic in gossipsubs[central_node_index].time_since_last_publish
|
||||||
|
assert to_drop_topic in gossipsubs[central_node_index].time_since_last_publish
|
||||||
|
|
||||||
# Check that the gossipsub of central node has fanout for the topic
|
await trio.sleep(1)
|
||||||
assert topic in gossipsubs[central_node_index].fanout
|
# Check that after ttl the to_drop_topic is no more in fanout of central node
|
||||||
# Check that the gossipsub of central node does not have a mesh for the topic
|
assert to_drop_topic not in gossipsubs[central_node_index].fanout
|
||||||
assert topic not in gossipsubs[central_node_index].mesh
|
|
||||||
|
|
||||||
# Central node subscribes the topic
|
# Central node subscribes the topic
|
||||||
await pubsubs_gsub[central_node_index].subscribe(topic)
|
await pubsubs_gsub[central_node_index].subscribe(topic)
|
||||||
|
|
||||||
await trio.sleep(2)
|
await trio.sleep(1)
|
||||||
|
|
||||||
# Check that the gossipsub of central node no longer has fanout for the topic
|
# Check that the gossipsub of central node no longer has fanout for the topic
|
||||||
assert topic not in gossipsubs[central_node_index].fanout
|
assert topic not in gossipsubs[central_node_index].fanout
|
||||||
|
|||||||
Reference in New Issue
Block a user