mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
Direct Peers : Gossipsub V1.1 (#594)
* added basic structure for direct peers * added direct connect heartbeat * added logic to reject GRAFT from direct peers * added invocation of direct_connect_heartbeat * updated _get_peers_to_send to include direct peers * fixed failing gossipsub core and demo tests * fixed failing test_examples.py * add tests for peer management * fix lint * update tests * fixed direct_peers type and peer_records test * fixed failing gossipsub direct peers test * added reject graft test * updated reconnection test * added newsfragment * improved reject graft test * updated default value for direct peers * renamed direct_connect_init_delay parameter * reverted back to direct_connect_initial_delay param name --------- Co-authored-by: Khwahish Patel <khwahish.p1@ahduni.edu.in>
This commit is contained in:
@ -136,6 +136,7 @@ async def run(topic: str, destination: Optional[str], port: Optional[int]) -> No
|
||||
degree=3, # Number of peers to maintain in mesh
|
||||
degree_low=2, # Lower bound for mesh peers
|
||||
degree_high=4, # Upper bound for mesh peers
|
||||
direct_peers=None, # Direct peers
|
||||
time_to_live=60, # TTL for message cache in seconds
|
||||
gossip_window=2, # Smaller window for faster gossip
|
||||
gossip_history=5, # Keep more history
|
||||
|
||||
@ -4,6 +4,7 @@ from collections import (
|
||||
from collections.abc import (
|
||||
Sequence,
|
||||
)
|
||||
import sys
|
||||
from typing import (
|
||||
Any,
|
||||
)
|
||||
@ -32,6 +33,8 @@ from .peerinfo import (
|
||||
PeerInfo,
|
||||
)
|
||||
|
||||
PERMANENT_ADDR_TTL = sys.maxsize
|
||||
|
||||
|
||||
class PeerStore(IPeerStore):
|
||||
peer_data_map: dict[ID, PeerData]
|
||||
|
||||
@ -29,6 +29,12 @@ from libp2p.network.stream.exceptions import (
|
||||
from libp2p.peer.id import (
|
||||
ID,
|
||||
)
|
||||
from libp2p.peer.peerinfo import (
|
||||
PeerInfo,
|
||||
)
|
||||
from libp2p.peer.peerstore import (
|
||||
PERMANENT_ADDR_TTL,
|
||||
)
|
||||
from libp2p.pubsub import (
|
||||
floodsub,
|
||||
)
|
||||
@ -82,17 +88,24 @@ class GossipSub(IPubsubRouter, Service):
|
||||
heartbeat_initial_delay: float
|
||||
heartbeat_interval: int
|
||||
|
||||
direct_peers: dict[ID, PeerInfo]
|
||||
direct_connect_initial_delay: float
|
||||
direct_connect_interval: int
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
protocols: Sequence[TProtocol],
|
||||
degree: int,
|
||||
degree_low: int,
|
||||
degree_high: int,
|
||||
direct_peers: Sequence[PeerInfo] = None,
|
||||
time_to_live: int = 60,
|
||||
gossip_window: int = 3,
|
||||
gossip_history: int = 5,
|
||||
heartbeat_initial_delay: float = 0.1,
|
||||
heartbeat_interval: int = 120,
|
||||
direct_connect_initial_delay: float = 0.1,
|
||||
direct_connect_interval: int = 300,
|
||||
) -> None:
|
||||
self.protocols = list(protocols)
|
||||
self.pubsub = None
|
||||
@ -119,10 +132,19 @@ class GossipSub(IPubsubRouter, Service):
|
||||
self.heartbeat_initial_delay = heartbeat_initial_delay
|
||||
self.heartbeat_interval = heartbeat_interval
|
||||
|
||||
# Create direct peers
|
||||
self.direct_peers = dict()
|
||||
for direct_peer in direct_peers or []:
|
||||
self.direct_peers[direct_peer.peer_id] = direct_peer
|
||||
self.direct_connect_interval = direct_connect_interval
|
||||
self.direct_connect_initial_delay = direct_connect_initial_delay
|
||||
|
||||
async def run(self) -> None:
|
||||
if self.pubsub is None:
|
||||
raise NoPubsubAttached
|
||||
self.manager.run_daemon_task(self.heartbeat)
|
||||
if len(self.direct_peers) > 0:
|
||||
self.manager.run_daemon_task(self.direct_connect_heartbeat)
|
||||
await self.manager.wait_finished()
|
||||
|
||||
# Interface functions
|
||||
@ -142,6 +164,12 @@ class GossipSub(IPubsubRouter, Service):
|
||||
"""
|
||||
self.pubsub = pubsub
|
||||
|
||||
if len(self.direct_peers) > 0:
|
||||
for pi in self.direct_peers:
|
||||
self.pubsub.host.get_peerstore().add_addrs(
|
||||
pi, self.direct_peers[pi].addrs, PERMANENT_ADDR_TTL
|
||||
)
|
||||
|
||||
logger.debug("attached to pusub")
|
||||
|
||||
def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None:
|
||||
@ -241,6 +269,10 @@ class GossipSub(IPubsubRouter, Service):
|
||||
if topic not in self.pubsub.peer_topics:
|
||||
continue
|
||||
|
||||
# direct peers
|
||||
_direct_peers: set[ID] = {_peer for _peer in self.direct_peers}
|
||||
send_to.update(_direct_peers)
|
||||
|
||||
# floodsub peers
|
||||
floodsub_peers: set[ID] = {
|
||||
peer_id
|
||||
@ -425,6 +457,24 @@ class GossipSub(IPubsubRouter, Service):
|
||||
|
||||
await trio.sleep(self.heartbeat_interval)
|
||||
|
||||
async def direct_connect_heartbeat(self) -> None:
|
||||
"""
|
||||
Connect to direct peers.
|
||||
"""
|
||||
await trio.sleep(self.direct_connect_initial_delay)
|
||||
while True:
|
||||
for direct_peer in self.direct_peers:
|
||||
if direct_peer not in self.pubsub.peers:
|
||||
try:
|
||||
await self.pubsub.host.connect(self.direct_peers[direct_peer])
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
"failed to connect to a direct peer %s: %s",
|
||||
direct_peer,
|
||||
e,
|
||||
)
|
||||
await trio.sleep(self.direct_connect_interval)
|
||||
|
||||
def mesh_heartbeat(
|
||||
self,
|
||||
) -> tuple[DefaultDict[ID, list[str]], DefaultDict[ID, list[str]]]:
|
||||
@ -654,6 +704,14 @@ class GossipSub(IPubsubRouter, Service):
|
||||
|
||||
# Add peer to mesh for topic
|
||||
if topic in self.mesh:
|
||||
for direct_peer in self.direct_peers:
|
||||
if direct_peer == sender_peer_id:
|
||||
logger.warning(
|
||||
"GRAFT: ignoring request from direct peer %s", sender_peer_id
|
||||
)
|
||||
await self.emit_prune(topic, sender_peer_id)
|
||||
return
|
||||
|
||||
if sender_peer_id not in self.mesh[topic]:
|
||||
self.mesh[topic].add(sender_peer_id)
|
||||
else:
|
||||
|
||||
@ -1,9 +1,15 @@
|
||||
from collections.abc import (
|
||||
Sequence,
|
||||
)
|
||||
from typing import (
|
||||
NamedTuple,
|
||||
)
|
||||
|
||||
import multiaddr
|
||||
|
||||
from libp2p.peer.peerinfo import (
|
||||
PeerInfo,
|
||||
)
|
||||
from libp2p.pubsub import (
|
||||
floodsub,
|
||||
gossipsub,
|
||||
@ -26,11 +32,14 @@ class GossipsubParams(NamedTuple):
|
||||
degree: int = 10
|
||||
degree_low: int = 9
|
||||
degree_high: int = 11
|
||||
direct_peers: Sequence[PeerInfo] = None
|
||||
time_to_live: int = 30
|
||||
gossip_window: int = 3
|
||||
gossip_history: int = 5
|
||||
heartbeat_initial_delay: float = 0.1
|
||||
heartbeat_interval: float = 0.5
|
||||
direct_connect_initial_delay: float = 0.1
|
||||
direct_connect_interval: int = 300
|
||||
|
||||
|
||||
GOSSIPSUB_PARAMS = GossipsubParams()
|
||||
|
||||
1
newsfragments/594.feature.rst
Normal file
1
newsfragments/594.feature.rst
Normal file
@ -0,0 +1 @@
|
||||
added ``direct peers`` as part of gossipsub v1.1 upgrade.
|
||||
@ -209,8 +209,8 @@ async def ping_demo(host_a, host_b):
|
||||
|
||||
|
||||
async def pubsub_demo(host_a, host_b):
|
||||
gossipsub_a = GossipSub([GOSSIPSUB_PROTOCOL_ID], 3, 2, 4, 0.1, 1)
|
||||
gossipsub_b = GossipSub([GOSSIPSUB_PROTOCOL_ID], 3, 2, 4, 0.1, 1)
|
||||
gossipsub_a = GossipSub([GOSSIPSUB_PROTOCOL_ID], 3, 2, 4, None, 0.1, 1)
|
||||
gossipsub_b = GossipSub([GOSSIPSUB_PROTOCOL_ID], 3, 2, 4, None, 0.1, 1)
|
||||
pubsub_a = Pubsub(host_a, gossipsub_a)
|
||||
pubsub_b = Pubsub(host_b, gossipsub_b)
|
||||
message_a_to_b = "Hello from A to B"
|
||||
|
||||
170
tests/core/pubsub/test_gossipsub_direct_peers.py
Normal file
170
tests/core/pubsub/test_gossipsub_direct_peers.py
Normal file
@ -0,0 +1,170 @@
|
||||
import pytest
|
||||
import trio
|
||||
|
||||
from libp2p.peer.peerinfo import (
|
||||
info_from_p2p_addr,
|
||||
)
|
||||
from libp2p.tools.utils import (
|
||||
connect,
|
||||
)
|
||||
from tests.utils.factories import (
|
||||
PubsubFactory,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_attach_peer_records():
|
||||
"""Test that attach ensures existence of peer records in peer store."""
|
||||
# Create first host
|
||||
async with PubsubFactory.create_batch_with_gossipsub(1) as pubsubs_gsub_0:
|
||||
host_0 = pubsubs_gsub_0[0].host
|
||||
|
||||
# Create second host with first host as direct peer
|
||||
async with PubsubFactory.create_batch_with_gossipsub(
|
||||
1,
|
||||
direct_peers=[info_from_p2p_addr(host_0.get_addrs()[0])],
|
||||
) as pubsubs_gsub_1:
|
||||
host_1 = pubsubs_gsub_1[0].host
|
||||
|
||||
# Wait for heartbeat to allow mesh to connect
|
||||
await trio.sleep(2)
|
||||
|
||||
try:
|
||||
# Verify that peer records exist in peer store
|
||||
peer_store_0 = host_0.get_peerstore()
|
||||
peer_store_1 = host_1.get_peerstore()
|
||||
|
||||
# Check that each host has the other's peer record
|
||||
peer_ids_0 = peer_store_0.peer_ids()
|
||||
peer_ids_1 = peer_store_1.peer_ids()
|
||||
|
||||
print(f"Peer store 0 IDs: {peer_ids_0}")
|
||||
print(f"Peer store 1 IDs: {peer_ids_1}")
|
||||
print(f"Host 0 ID: {host_0.get_id()}")
|
||||
print(f"Host 1 ID: {host_1.get_id()}")
|
||||
|
||||
assert host_0.get_id() in peer_ids_1, "Peer 0 not found in peer store 1"
|
||||
|
||||
except Exception as e:
|
||||
print(f"Test failed with error: {e}")
|
||||
raise
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_reject_graft():
|
||||
"""Test that graft requests are rejected if the sender is a direct peer."""
|
||||
# Create first host
|
||||
async with PubsubFactory.create_batch_with_gossipsub(
|
||||
1, heartbeat_interval=1, direct_connect_interval=2
|
||||
) as pubsubs_gsub_0:
|
||||
host_0 = pubsubs_gsub_0[0].host
|
||||
|
||||
# Create second host with first host as direct peer
|
||||
async with PubsubFactory.create_batch_with_gossipsub(
|
||||
1,
|
||||
heartbeat_interval=1,
|
||||
direct_peers=[info_from_p2p_addr(host_0.get_addrs()[0])],
|
||||
direct_connect_interval=2,
|
||||
) as pubsubs_gsub_1:
|
||||
host_1 = pubsubs_gsub_1[0].host
|
||||
|
||||
try:
|
||||
# Connect the hosts
|
||||
await connect(host_0, host_1)
|
||||
|
||||
# Wait 2 seconds for heartbeat to allow mesh to connect
|
||||
await trio.sleep(1)
|
||||
|
||||
topic = "test_reject_graft"
|
||||
|
||||
# Gossipsub 0 and 1 joins topic
|
||||
await pubsubs_gsub_0[0].router.join(topic)
|
||||
await pubsubs_gsub_1[0].router.join(topic)
|
||||
|
||||
# Pre-Graft assertions
|
||||
assert (
|
||||
topic in pubsubs_gsub_0[0].router.mesh
|
||||
), "topic not in mesh for gossipsub 0"
|
||||
assert (
|
||||
topic in pubsubs_gsub_1[0].router.mesh
|
||||
), "topic not in mesh for gossipsub 1"
|
||||
assert (
|
||||
host_1.get_id() not in pubsubs_gsub_0[0].router.mesh[topic]
|
||||
), "gossipsub 1 in mesh topic for gossipsub 0"
|
||||
assert (
|
||||
host_0.get_id() not in pubsubs_gsub_1[0].router.mesh[topic]
|
||||
), "gossipsub 0 in mesh topic for gossipsub 1"
|
||||
|
||||
# Gossipsub 1 emits a graft request to Gossipsub 0
|
||||
await pubsubs_gsub_0[0].router.emit_graft(topic, host_1.get_id())
|
||||
|
||||
await trio.sleep(1)
|
||||
|
||||
# Post-Graft assertions
|
||||
assert (
|
||||
host_1.get_id() not in pubsubs_gsub_0[0].router.mesh[topic]
|
||||
), "gossipsub 1 in mesh topic for gossipsub 0"
|
||||
assert (
|
||||
host_0.get_id() not in pubsubs_gsub_1[0].router.mesh[topic]
|
||||
), "gossipsub 0 in mesh topic for gossipsub 1"
|
||||
|
||||
except Exception as e:
|
||||
print(f"Test failed with error: {e}")
|
||||
raise
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_heartbeat_reconnect():
|
||||
"""Test that heartbeat can reconnect with disconnected direct peers gracefully."""
|
||||
# Create first host
|
||||
async with PubsubFactory.create_batch_with_gossipsub(
|
||||
1, heartbeat_interval=1, direct_connect_interval=3
|
||||
) as pubsubs_gsub_0:
|
||||
host_0 = pubsubs_gsub_0[0].host
|
||||
|
||||
# Create second host with first host as direct peer
|
||||
async with PubsubFactory.create_batch_with_gossipsub(
|
||||
1,
|
||||
heartbeat_interval=1,
|
||||
direct_peers=[info_from_p2p_addr(host_0.get_addrs()[0])],
|
||||
direct_connect_interval=3,
|
||||
) as pubsubs_gsub_1:
|
||||
host_1 = pubsubs_gsub_1[0].host
|
||||
|
||||
# Connect the hosts
|
||||
await connect(host_0, host_1)
|
||||
|
||||
try:
|
||||
# Wait for initial connection and mesh setup
|
||||
await trio.sleep(1)
|
||||
|
||||
# Verify initial connection
|
||||
assert (
|
||||
host_1.get_id() in pubsubs_gsub_0[0].peers
|
||||
), "Initial connection not established for gossipsub 0"
|
||||
assert (
|
||||
host_0.get_id() in pubsubs_gsub_1[0].peers
|
||||
), "Initial connection not established for gossipsub 0"
|
||||
|
||||
# Simulate disconnection
|
||||
await host_0.disconnect(host_1.get_id())
|
||||
|
||||
# Wait for heartbeat to detect disconnection
|
||||
await trio.sleep(1)
|
||||
|
||||
# Verify that peers are removed after disconnection
|
||||
assert (
|
||||
host_0.get_id() not in pubsubs_gsub_1[0].peers
|
||||
), "Peer 0 still in gossipsub 1 after disconnection"
|
||||
|
||||
# Wait for heartbeat to reestablish connection
|
||||
await trio.sleep(2)
|
||||
|
||||
# Verify connection reestablishment
|
||||
assert (
|
||||
host_0.get_id() in pubsubs_gsub_1[0].peers
|
||||
), "Reconnection not established for gossipsub 0"
|
||||
|
||||
except Exception as e:
|
||||
print(f"Test failed with error: {e}")
|
||||
raise
|
||||
@ -421,10 +421,13 @@ class GossipsubFactory(factory.Factory):
|
||||
degree = GOSSIPSUB_PARAMS.degree
|
||||
degree_low = GOSSIPSUB_PARAMS.degree_low
|
||||
degree_high = GOSSIPSUB_PARAMS.degree_high
|
||||
direct_peers = GOSSIPSUB_PARAMS.direct_peers
|
||||
gossip_window = GOSSIPSUB_PARAMS.gossip_window
|
||||
gossip_history = GOSSIPSUB_PARAMS.gossip_history
|
||||
heartbeat_initial_delay = GOSSIPSUB_PARAMS.heartbeat_initial_delay
|
||||
heartbeat_interval = GOSSIPSUB_PARAMS.heartbeat_interval
|
||||
direct_connect_initial_delay = GOSSIPSUB_PARAMS.direct_connect_initial_delay
|
||||
direct_connect_interval = GOSSIPSUB_PARAMS.direct_connect_interval
|
||||
|
||||
|
||||
class PubsubFactory(factory.Factory):
|
||||
@ -541,11 +544,14 @@ class PubsubFactory(factory.Factory):
|
||||
degree: int = GOSSIPSUB_PARAMS.degree,
|
||||
degree_low: int = GOSSIPSUB_PARAMS.degree_low,
|
||||
degree_high: int = GOSSIPSUB_PARAMS.degree_high,
|
||||
direct_peers: Sequence[PeerInfo] = GOSSIPSUB_PARAMS.direct_peers,
|
||||
time_to_live: int = GOSSIPSUB_PARAMS.time_to_live,
|
||||
gossip_window: int = GOSSIPSUB_PARAMS.gossip_window,
|
||||
gossip_history: int = GOSSIPSUB_PARAMS.gossip_history,
|
||||
heartbeat_interval: float = GOSSIPSUB_PARAMS.heartbeat_interval,
|
||||
heartbeat_initial_delay: float = GOSSIPSUB_PARAMS.heartbeat_initial_delay,
|
||||
direct_connect_initial_delay: float = GOSSIPSUB_PARAMS.direct_connect_initial_delay, # noqa: E501
|
||||
direct_connect_interval: int = GOSSIPSUB_PARAMS.direct_connect_interval,
|
||||
security_protocol: TProtocol = None,
|
||||
muxer_opt: TMuxerOptions = None,
|
||||
msg_id_constructor: Callable[
|
||||
@ -559,9 +565,14 @@ class PubsubFactory(factory.Factory):
|
||||
degree=degree,
|
||||
degree_low=degree_low,
|
||||
degree_high=degree_high,
|
||||
direct_peers=direct_peers,
|
||||
time_to_live=time_to_live,
|
||||
gossip_window=gossip_window,
|
||||
gossip_history=gossip_history,
|
||||
heartbeat_initial_delay=heartbeat_initial_delay,
|
||||
heartbeat_interval=heartbeat_interval,
|
||||
direct_connect_initial_delay=direct_connect_initial_delay,
|
||||
direct_connect_interval=direct_connect_interval,
|
||||
)
|
||||
else:
|
||||
gossipsubs = GossipsubFactory.create_batch(
|
||||
@ -569,8 +580,13 @@ class PubsubFactory(factory.Factory):
|
||||
degree=degree,
|
||||
degree_low=degree_low,
|
||||
degree_high=degree_high,
|
||||
direct_peers=direct_peers,
|
||||
time_to_live=time_to_live,
|
||||
gossip_window=gossip_window,
|
||||
heartbeat_interval=heartbeat_interval,
|
||||
heartbeat_initial_delay=heartbeat_initial_delay,
|
||||
direct_connect_initial_delay=direct_connect_initial_delay,
|
||||
direct_connect_interval=direct_connect_interval,
|
||||
)
|
||||
|
||||
async with cls._create_batch_with_router(
|
||||
|
||||
Reference in New Issue
Block a user