From 4a53fc311155ccb3d1decbb74916ce1e167c9685 Mon Sep 17 00:00:00 2001
From: Mystical <125946525+mystical-prog@users.noreply.github.com>
Date: Tue, 20 May 2025 19:01:57 +0530
Subject: [PATCH] Direct Peers : Gossipsub V1.1 (#594)

* added basic structure for direct peers
* added direct connect heartbeat
* added logic to reject GRAFT from direct peers
* added invocation of direct_connect_heartbeat
* updated _get_peers_to_send to include direct peers
* fixed failing gossipsub core and demo tests
* fixed failing test_examples.py
* add tests for peer management
* fix lint
* update tests
* fixed direct_peers type and peer_records test
* fixed failing gossipsub direct peers test
* added reject graft test
* updated reconnection test
* added newsfragment
* improved reject graft test
* updated default value for direct peers
* renamed direct_connect_init_delay parameter
* reverted back to direct_connect_initial_delay param name

---------

Co-authored-by: Khwahish Patel
---
 examples/pubsub/pubsub.py                     |   1 +
 libp2p/peer/peerstore.py                      |   3 +
 libp2p/pubsub/gossipsub.py                    |  58 ++++++
 libp2p/tools/constants.py                     |   9 +
 newsfragments/594.feature.rst                 |   1 +
 tests/core/examples/test_examples.py          |   4 +-
 .../pubsub/test_gossipsub_direct_peers.py     | 170 ++++++++++++++++++
 tests/utils/factories.py                      |  16 ++
 8 files changed, 260 insertions(+), 2 deletions(-)
 create mode 100644 newsfragments/594.feature.rst
 create mode 100644 tests/core/pubsub/test_gossipsub_direct_peers.py

diff --git a/examples/pubsub/pubsub.py b/examples/pubsub/pubsub.py
index dbfc2413..9f853744 100644
--- a/examples/pubsub/pubsub.py
+++ b/examples/pubsub/pubsub.py
@@ -136,6 +136,7 @@ async def run(topic: str, destination: Optional[str], port: Optional[int]) -> No
         degree=3,  # Number of peers to maintain in mesh
         degree_low=2,  # Lower bound for mesh peers
         degree_high=4,  # Upper bound for mesh peers
+        direct_peers=None,  # Direct peers
         time_to_live=60,  # TTL for message cache in seconds
         gossip_window=2,  # Smaller window for faster gossip
         gossip_history=5,  # Keep more history
diff --git a/libp2p/peer/peerstore.py b/libp2p/peer/peerstore.py
index f49739ba..efee6059 100644
--- a/libp2p/peer/peerstore.py
+++ b/libp2p/peer/peerstore.py
@@ -4,6 +4,7 @@ from collections import (
 from collections.abc import (
     Sequence,
 )
+import sys
 from typing import (
     Any,
 )
@@ -32,6 +33,8 @@ from .peerinfo import (
     PeerInfo,
 )

+PERMANENT_ADDR_TTL = sys.maxsize
+

 class PeerStore(IPeerStore):
     peer_data_map: dict[ID, PeerData]
diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py
index f7ec49cb..8613bfe8 100644
--- a/libp2p/pubsub/gossipsub.py
+++ b/libp2p/pubsub/gossipsub.py
@@ -29,6 +29,12 @@ from libp2p.network.stream.exceptions import (
 from libp2p.peer.id import (
     ID,
 )
+from libp2p.peer.peerinfo import (
+    PeerInfo,
+)
+from libp2p.peer.peerstore import (
+    PERMANENT_ADDR_TTL,
+)
 from libp2p.pubsub import (
     floodsub,
 )
@@ -82,17 +88,24 @@ class GossipSub(IPubsubRouter, Service):
     heartbeat_initial_delay: float
     heartbeat_interval: int

+    direct_peers: dict[ID, PeerInfo]
+    direct_connect_initial_delay: float
+    direct_connect_interval: int
+
     def __init__(
         self,
         protocols: Sequence[TProtocol],
         degree: int,
         degree_low: int,
         degree_high: int,
+        direct_peers: Sequence[PeerInfo] = None,
         time_to_live: int = 60,
         gossip_window: int = 3,
         gossip_history: int = 5,
         heartbeat_initial_delay: float = 0.1,
         heartbeat_interval: int = 120,
+        direct_connect_initial_delay: float = 0.1,
+        direct_connect_interval: int = 300,
     ) -> None:
         self.protocols = list(protocols)
         self.pubsub = None
@@ -119,10 +132,19 @@ class GossipSub(IPubsubRouter, Service):
         self.heartbeat_initial_delay = heartbeat_initial_delay
         self.heartbeat_interval = heartbeat_interval

+        # Create direct peers
+        self.direct_peers = dict()
+        for direct_peer in direct_peers or []:
+            self.direct_peers[direct_peer.peer_id] = direct_peer
+        self.direct_connect_interval = direct_connect_interval
+        self.direct_connect_initial_delay = direct_connect_initial_delay
+
     async def run(self) -> None:
         if self.pubsub is None:
             raise NoPubsubAttached
         self.manager.run_daemon_task(self.heartbeat)
+        if len(self.direct_peers) > 0:
+            self.manager.run_daemon_task(self.direct_connect_heartbeat)
         await self.manager.wait_finished()

     # Interface functions
@@ -142,6 +164,12 @@ class GossipSub(IPubsubRouter, Service):
         """
         self.pubsub = pubsub

+        if len(self.direct_peers) > 0:
+            for pi in self.direct_peers:
+                self.pubsub.host.get_peerstore().add_addrs(
+                    pi, self.direct_peers[pi].addrs, PERMANENT_ADDR_TTL
+                )
+
         logger.debug("attached to pusub")

     def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None:
@@ -241,6 +269,10 @@ class GossipSub(IPubsubRouter, Service):
             if topic not in self.pubsub.peer_topics:
                 continue

+            # direct peers
+            _direct_peers: set[ID] = {_peer for _peer in self.direct_peers}
+            send_to.update(_direct_peers)
+
             # floodsub peers
             floodsub_peers: set[ID] = {
                 peer_id
@@ -425,6 +457,24 @@ class GossipSub(IPubsubRouter, Service):

             await trio.sleep(self.heartbeat_interval)

+    async def direct_connect_heartbeat(self) -> None:
+        """
+        Connect to direct peers.
+        """
+        await trio.sleep(self.direct_connect_initial_delay)
+        while True:
+            for direct_peer in self.direct_peers:
+                if direct_peer not in self.pubsub.peers:
+                    try:
+                        await self.pubsub.host.connect(self.direct_peers[direct_peer])
+                    except Exception as e:
+                        logger.debug(
+                            "failed to connect to a direct peer %s: %s",
+                            direct_peer,
+                            e,
+                        )
+            await trio.sleep(self.direct_connect_interval)
+
     def mesh_heartbeat(
         self,
     ) -> tuple[DefaultDict[ID, list[str]], DefaultDict[ID, list[str]]]:
@@ -654,6 +704,14 @@ class GossipSub(IPubsubRouter, Service):

         # Add peer to mesh for topic
         if topic in self.mesh:
+            for direct_peer in self.direct_peers:
+                if direct_peer == sender_peer_id:
+                    logger.warning(
+                        "GRAFT: ignoring request from direct peer %s", sender_peer_id
+                    )
+                    await self.emit_prune(topic, sender_peer_id)
+                    return
+
             if sender_peer_id not in self.mesh[topic]:
                 self.mesh[topic].add(sender_peer_id)
         else:
diff --git a/libp2p/tools/constants.py b/libp2p/tools/constants.py
index 9306b066..b9d5c849 100644
--- a/libp2p/tools/constants.py
+++ b/libp2p/tools/constants.py
@@ -1,9 +1,15 @@
+from collections.abc import (
+    Sequence,
+)
 from typing import (
     NamedTuple,
 )

 import multiaddr

+from libp2p.peer.peerinfo import (
+    PeerInfo,
+)
 from libp2p.pubsub import (
     floodsub,
     gossipsub,
 )
@@ -26,11 +32,14 @@ class GossipsubParams(NamedTuple):
     degree: int = 10
     degree_low: int = 9
     degree_high: int = 11
+    direct_peers: Sequence[PeerInfo] = None
     time_to_live: int = 30
     gossip_window: int = 3
     gossip_history: int = 5
     heartbeat_initial_delay: float = 0.1
     heartbeat_interval: float = 0.5
+    direct_connect_initial_delay: float = 0.1
+    direct_connect_interval: int = 300


 GOSSIPSUB_PARAMS = GossipsubParams()
diff --git a/newsfragments/594.feature.rst b/newsfragments/594.feature.rst
new file mode 100644
index 00000000..adb6ea41
--- /dev/null
+++ b/newsfragments/594.feature.rst
@@ -0,0 +1 @@
+added ``direct peers`` as part of gossipsub v1.1 upgrade.
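
For context, a minimal sketch (not part of this patch) of how an application might wire the new parameters together once this change lands. The `PROTOCOL_ID` import path and the `DIRECT_PEERS` environment variable are illustrative assumptions, not part of the diff; per the gossipsub v1.1 spec, direct peering is expected to be configured reciprocally on both peers.

# Hedged sketch: configuring GossipSub with direct peers using the
# constructor parameters added above. PROTOCOL_ID import path and the
# DIRECT_PEERS environment variable are assumptions for illustration only.
import os

from multiaddr import Multiaddr

from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.pubsub.gossipsub import PROTOCOL_ID, GossipSub  # import path assumed

# Full multiaddrs (including the /p2p/<peer-id> component) supplied by the
# operator, e.g. DIRECT_PEERS="/ip4/10.0.0.5/tcp/8000/p2p/Qm...", comma-separated.
direct_peers = [
    info_from_p2p_addr(Multiaddr(addr))
    for addr in os.environ.get("DIRECT_PEERS", "").split(",")
    if addr
]

router = GossipSub(
    protocols=[PROTOCOL_ID],
    degree=3,
    degree_low=2,
    degree_high=4,
    direct_peers=direct_peers,         # always forwarded to, never GRAFTed
    direct_connect_initial_delay=0.1,  # delay before the first reconnect pass
    direct_connect_interval=300,       # seconds between reconnect attempts
)

The router pins these peers in the peerstore with a permanent TTL on attach, forwards every published message to them, refuses GRAFT requests from them (replying with PRUNE), and periodically re-dials any that are disconnected, as implemented in the hunks above.
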
diff --git a/tests/core/examples/test_examples.py b/tests/core/examples/test_examples.py
index 2b86fc72..567593b1 100644
--- a/tests/core/examples/test_examples.py
+++ b/tests/core/examples/test_examples.py
@@ -209,8 +209,8 @@ async def ping_demo(host_a, host_b):


 async def pubsub_demo(host_a, host_b):
-    gossipsub_a = GossipSub([GOSSIPSUB_PROTOCOL_ID], 3, 2, 4, 0.1, 1)
-    gossipsub_b = GossipSub([GOSSIPSUB_PROTOCOL_ID], 3, 2, 4, 0.1, 1)
+    gossipsub_a = GossipSub([GOSSIPSUB_PROTOCOL_ID], 3, 2, 4, None, 0.1, 1)
+    gossipsub_b = GossipSub([GOSSIPSUB_PROTOCOL_ID], 3, 2, 4, None, 0.1, 1)
     pubsub_a = Pubsub(host_a, gossipsub_a)
     pubsub_b = Pubsub(host_b, gossipsub_b)
     message_a_to_b = "Hello from A to B"
diff --git a/tests/core/pubsub/test_gossipsub_direct_peers.py b/tests/core/pubsub/test_gossipsub_direct_peers.py
new file mode 100644
index 00000000..d8464a4b
--- /dev/null
+++ b/tests/core/pubsub/test_gossipsub_direct_peers.py
@@ -0,0 +1,170 @@
+import pytest
+import trio
+
+from libp2p.peer.peerinfo import (
+    info_from_p2p_addr,
+)
+from libp2p.tools.utils import (
+    connect,
+)
+from tests.utils.factories import (
+    PubsubFactory,
+)
+
+
+@pytest.mark.trio
+async def test_attach_peer_records():
+    """Test that attach ensures existence of peer records in peer store."""
+    # Create first host
+    async with PubsubFactory.create_batch_with_gossipsub(1) as pubsubs_gsub_0:
+        host_0 = pubsubs_gsub_0[0].host
+
+        # Create second host with first host as direct peer
+        async with PubsubFactory.create_batch_with_gossipsub(
+            1,
+            direct_peers=[info_from_p2p_addr(host_0.get_addrs()[0])],
+        ) as pubsubs_gsub_1:
+            host_1 = pubsubs_gsub_1[0].host
+
+            # Wait for heartbeat to allow mesh to connect
+            await trio.sleep(2)
+
+            try:
+                # Verify that peer records exist in peer store
+                peer_store_0 = host_0.get_peerstore()
+                peer_store_1 = host_1.get_peerstore()
+
+                # Check that each host has the other's peer record
+                peer_ids_0 = peer_store_0.peer_ids()
+                peer_ids_1 = peer_store_1.peer_ids()
+
+                print(f"Peer store 0 IDs: {peer_ids_0}")
+                print(f"Peer store 1 IDs: {peer_ids_1}")
+                print(f"Host 0 ID: {host_0.get_id()}")
+                print(f"Host 1 ID: {host_1.get_id()}")
+
+                assert host_0.get_id() in peer_ids_1, "Peer 0 not found in peer store 1"
+
+            except Exception as e:
+                print(f"Test failed with error: {e}")
+                raise
+
+
+@pytest.mark.trio
+async def test_reject_graft():
+    """Test that graft requests are rejected if the sender is a direct peer."""
+    # Create first host
+    async with PubsubFactory.create_batch_with_gossipsub(
+        1, heartbeat_interval=1, direct_connect_interval=2
+    ) as pubsubs_gsub_0:
+        host_0 = pubsubs_gsub_0[0].host
+
+        # Create second host with first host as direct peer
+        async with PubsubFactory.create_batch_with_gossipsub(
+            1,
+            heartbeat_interval=1,
+            direct_peers=[info_from_p2p_addr(host_0.get_addrs()[0])],
+            direct_connect_interval=2,
+        ) as pubsubs_gsub_1:
+            host_1 = pubsubs_gsub_1[0].host
+
+            try:
+                # Connect the hosts
+                await connect(host_0, host_1)
+
+                # Wait 2 seconds for heartbeat to allow mesh to connect
+                await trio.sleep(1)
+
+                topic = "test_reject_graft"
+
+                # Gossipsub 0 and 1 joins topic
+                await pubsubs_gsub_0[0].router.join(topic)
+                await pubsubs_gsub_1[0].router.join(topic)
+
+                # Pre-Graft assertions
+                assert (
+                    topic in pubsubs_gsub_0[0].router.mesh
+                ), "topic not in mesh for gossipsub 0"
+                assert (
+                    topic in pubsubs_gsub_1[0].router.mesh
+                ), "topic not in mesh for gossipsub 1"
+                assert (
+                    host_1.get_id() not in pubsubs_gsub_0[0].router.mesh[topic]
+                ), "gossipsub 1 in mesh topic for gossipsub 0"
+                assert (
+                    host_0.get_id() not in pubsubs_gsub_1[0].router.mesh[topic]
+                ), "gossipsub 0 in mesh topic for gossipsub 1"
+
+                # Gossipsub 1 emits a graft request to Gossipsub 0
+                await pubsubs_gsub_0[0].router.emit_graft(topic, host_1.get_id())
+
+                await trio.sleep(1)
+
+                # Post-Graft assertions
+                assert (
+                    host_1.get_id() not in pubsubs_gsub_0[0].router.mesh[topic]
+                ), "gossipsub 1 in mesh topic for gossipsub 0"
+                assert (
+                    host_0.get_id() not in pubsubs_gsub_1[0].router.mesh[topic]
+                ), "gossipsub 0 in mesh topic for gossipsub 1"
+
+            except Exception as e:
+                print(f"Test failed with error: {e}")
+                raise
+
+
+@pytest.mark.trio
+async def test_heartbeat_reconnect():
+    """Test that heartbeat can reconnect with disconnected direct peers gracefully."""
+    # Create first host
+    async with PubsubFactory.create_batch_with_gossipsub(
+        1, heartbeat_interval=1, direct_connect_interval=3
+    ) as pubsubs_gsub_0:
+        host_0 = pubsubs_gsub_0[0].host
+
+        # Create second host with first host as direct peer
+        async with PubsubFactory.create_batch_with_gossipsub(
+            1,
+            heartbeat_interval=1,
+            direct_peers=[info_from_p2p_addr(host_0.get_addrs()[0])],
+            direct_connect_interval=3,
+        ) as pubsubs_gsub_1:
+            host_1 = pubsubs_gsub_1[0].host
+
+            # Connect the hosts
+            await connect(host_0, host_1)
+
+            try:
+                # Wait for initial connection and mesh setup
+                await trio.sleep(1)
+
+                # Verify initial connection
+                assert (
+                    host_1.get_id() in pubsubs_gsub_0[0].peers
+                ), "Initial connection not established for gossipsub 0"
+                assert (
+                    host_0.get_id() in pubsubs_gsub_1[0].peers
+                ), "Initial connection not established for gossipsub 0"
+
+                # Simulate disconnection
+                await host_0.disconnect(host_1.get_id())
+
+                # Wait for heartbeat to detect disconnection
+                await trio.sleep(1)
+
+                # Verify that peers are removed after disconnection
+                assert (
+                    host_0.get_id() not in pubsubs_gsub_1[0].peers
+                ), "Peer 0 still in gossipsub 1 after disconnection"
+
+                # Wait for heartbeat to reestablish connection
+                await trio.sleep(2)
+
+                # Verify connection reestablishment
+                assert (
+                    host_0.get_id() in pubsubs_gsub_1[0].peers
+                ), "Reconnection not established for gossipsub 0"
+
+            except Exception as e:
+                print(f"Test failed with error: {e}")
+                raise
diff --git a/tests/utils/factories.py b/tests/utils/factories.py
index 08a5b67e..bd661320 100644
--- a/tests/utils/factories.py
+++ b/tests/utils/factories.py
@@ -421,10 +421,13 @@ class GossipsubFactory(factory.Factory):
     degree = GOSSIPSUB_PARAMS.degree
     degree_low = GOSSIPSUB_PARAMS.degree_low
     degree_high = GOSSIPSUB_PARAMS.degree_high
+    direct_peers = GOSSIPSUB_PARAMS.direct_peers
     gossip_window = GOSSIPSUB_PARAMS.gossip_window
     gossip_history = GOSSIPSUB_PARAMS.gossip_history
    heartbeat_initial_delay = GOSSIPSUB_PARAMS.heartbeat_initial_delay
     heartbeat_interval = GOSSIPSUB_PARAMS.heartbeat_interval
+    direct_connect_initial_delay = GOSSIPSUB_PARAMS.direct_connect_initial_delay
+    direct_connect_interval = GOSSIPSUB_PARAMS.direct_connect_interval


 class PubsubFactory(factory.Factory):
@@ -541,11 +544,14 @@ class PubsubFactory(factory.Factory):
         degree: int = GOSSIPSUB_PARAMS.degree,
         degree_low: int = GOSSIPSUB_PARAMS.degree_low,
         degree_high: int = GOSSIPSUB_PARAMS.degree_high,
+        direct_peers: Sequence[PeerInfo] = GOSSIPSUB_PARAMS.direct_peers,
         time_to_live: int = GOSSIPSUB_PARAMS.time_to_live,
         gossip_window: int = GOSSIPSUB_PARAMS.gossip_window,
         gossip_history: int = GOSSIPSUB_PARAMS.gossip_history,
         heartbeat_interval: float = GOSSIPSUB_PARAMS.heartbeat_interval,
         heartbeat_initial_delay: float = GOSSIPSUB_PARAMS.heartbeat_initial_delay,
+        direct_connect_initial_delay: float = GOSSIPSUB_PARAMS.direct_connect_initial_delay,  # noqa: E501
+        direct_connect_interval: int = GOSSIPSUB_PARAMS.direct_connect_interval,
         security_protocol: TProtocol = None,
         muxer_opt: TMuxerOptions = None,
         msg_id_constructor: Callable[
@@ -559,9 +565,14 @@ class PubsubFactory(factory.Factory):
                 degree=degree,
                 degree_low=degree_low,
                 degree_high=degree_high,
+                direct_peers=direct_peers,
                 time_to_live=time_to_live,
                 gossip_window=gossip_window,
+                gossip_history=gossip_history,
+                heartbeat_initial_delay=heartbeat_initial_delay,
                 heartbeat_interval=heartbeat_interval,
+                direct_connect_initial_delay=direct_connect_initial_delay,
+                direct_connect_interval=direct_connect_interval,
             )
         else:
             gossipsubs = GossipsubFactory.create_batch(
@@ -569,8 +580,13 @@
                 degree=degree,
                 degree_low=degree_low,
                 degree_high=degree_high,
+                direct_peers=direct_peers,
+                time_to_live=time_to_live,
                 gossip_window=gossip_window,
                 heartbeat_interval=heartbeat_interval,
+                heartbeat_initial_delay=heartbeat_initial_delay,
+                direct_connect_initial_delay=direct_connect_initial_delay,
+                direct_connect_interval=direct_connect_interval,
             )

         async with cls._create_batch_with_router(