From 303bf3060afc802bd98e2bd7364ec243fc2a1882 Mon Sep 17 00:00:00 2001 From: Mystical <125946525+mystical-prog@users.noreply.github.com> Date: Fri, 20 Jun 2025 13:10:52 +0530 Subject: [PATCH] implemented peer exchange --- libp2p/peer/peerinfo.py | 18 ++++++++++++ libp2p/pubsub/gossipsub.py | 56 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/libp2p/peer/peerinfo.py b/libp2p/peer/peerinfo.py index f3b3bd7b..29ce4e66 100644 --- a/libp2p/peer/peerinfo.py +++ b/libp2p/peer/peerinfo.py @@ -66,5 +66,23 @@ def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo: return PeerInfo(peer_id, [addr]) +def peer_info_to_bytes(peer_info: PeerInfo) -> bytes: + lines = [str(peer_info.peer_id)] + [str(addr) for addr in peer_info.addrs] + return "\n".join(lines).encode("utf-8") + + +def peer_info_from_bytes(data: bytes) -> PeerInfo: + try: + lines = data.decode("utf-8").splitlines() + if not lines: + raise InvalidAddrError("no data to decode PeerInfo") + + peer_id = ID.from_base58(lines[0]) + addrs = [multiaddr.Multiaddr(addr_str) for addr_str in lines[1:]] + return PeerInfo(peer_id, addrs) + except Exception as e: + raise InvalidAddrError(f"failed to decode PeerInfo: {e}") + + class InvalidAddrError(ValueError): pass diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 7abe6251..43602a29 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -32,6 +32,8 @@ from libp2p.peer.id import ( ) from libp2p.peer.peerinfo import ( PeerInfo, + peer_info_from_bytes, + peer_info_to_bytes, ) from libp2p.peer.peerstore import ( PERMANENT_ADDR_TTL, @@ -93,6 +95,7 @@ class GossipSub(IPubsubRouter, Service): direct_connect_interval: int do_px: bool + px_peers_count: int back_off: dict[str, dict[ID, int]] prune_back_off: int unsubscribe_back_off: int @@ -112,6 +115,7 @@ class GossipSub(IPubsubRouter, Service): direct_connect_initial_delay: float = 0.1, direct_connect_interval: int = 300, do_px: bool = False, + px_peers_count: int = 16, prune_back_off: int = 60, unsubscribe_back_off: int = 10, ) -> None: @@ -149,6 +153,7 @@ class GossipSub(IPubsubRouter, Service): self.time_since_last_publish = {} self.do_px = do_px + self.px_peers_count = px_peers_count self.back_off = dict() self.prune_back_off = prune_back_off self.unsubscribe_back_off = unsubscribe_back_off @@ -737,6 +742,37 @@ class GossipSub(IPubsubRouter, Service): del self.back_off[topic][peer] return False + async def _do_px(self, px_peers: list[rpc_pb2.PeerInfo]) -> None: + if len(px_peers) > self.px_peers_count: + px_peers = px_peers[: self.px_peers_count] + + for peer in px_peers: + peer_id: ID = ID(peer.peerID) + + if self.pubsub and peer_id in self.pubsub.peers: + continue + + try: + peer_info = peer_info_from_bytes(peer.signedPeerRecord) + try: + if self.pubsub is None: + raise NoPubsubAttached + await self.pubsub.host.connect(peer_info) + except Exception as e: + logger.warning( + "failed to connect to px peer %s: %s", + peer_id, + e, + ) + continue + except Exception as e: + logger.warning( + "failed to parse peer info from px peer %s: %s", + peer_id, + e, + ) + continue + # RPC handlers async def handle_ihave( @@ -853,6 +889,9 @@ class GossipSub(IPubsubRouter, Service): ) -> None: topic: str = prune_msg.topicID backoff_till: int = prune_msg.backoff + px_peers: list[rpc_pb2.PeerInfo] = [] + for peer in prune_msg.peers: + px_peers.append(peer) # Remove peer from mesh for topic if topic in self.mesh: @@ -863,6 +902,9 @@ class GossipSub(IPubsubRouter, Service): self.mesh[topic].discard(sender_peer_id) + if px_peers: + await self._do_px(px_peers) + # RPC emitters def pack_control_msgs( @@ -924,8 +966,18 @@ class GossipSub(IPubsubRouter, Service): prune_msg.backoff = back_off_duration - # TODO: add peers once peerstore changes are complete - # prune_msg.peers = + if do_px: + exchange_peers = self._get_in_topic_gossipsub_peers_from_minus( + topic, self.px_peers_count, [to_peer] + ) + for peer in exchange_peers: + if self.pubsub is None: + raise NoPubsubAttached + peer_info = self.pubsub.host.get_peerstore().peer_info(peer) + signed_peer_record: rpc_pb2.PeerInfo = rpc_pb2.PeerInfo() + signed_peer_record.peerID = peer.to_bytes() + signed_peer_record.signedPeerRecord = peer_info_to_bytes(peer_info) + prune_msg.peers.append(signed_peer_record) control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage() control_msg.prune.extend([prune_msg])