From c48618825da7af0377a92257e00d7b2b850a0bf1 Mon Sep 17 00:00:00 2001
From: Mystical <125946525+mystical-prog@users.noreply.github.com>
Date: Fri, 30 May 2025 19:10:09 +0530
Subject: [PATCH 1/8] updated protobuf for prune message

---
 libp2p/pubsub/pb/rpc.proto | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/libp2p/pubsub/pb/rpc.proto b/libp2p/pubsub/pb/rpc.proto
index a0857569..7abce0d6 100644
--- a/libp2p/pubsub/pb/rpc.proto
+++ b/libp2p/pubsub/pb/rpc.proto
@@ -47,6 +47,13 @@ message ControlGraft {
 
 message ControlPrune {
   optional string topicID = 1;
+  repeated PeerInfo peers = 2;
+  optional uint64 backoff = 3;
+}
+
+message PeerInfo {
+  optional bytes peerID = 1;
+  optional bytes signedPeerRecord = 2;
 }
 
 message TopicDescriptor {
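For orientation, a minimal sketch (not part of the patch series) of how the extended PRUNE control message can be built and parsed once bindings are regenerated from this .proto; the topic name, backoff value, and byte values are illustrative:

    # Sketch only; assumes rpc_pb2 regenerated from the rpc.proto above.
    from libp2p.pubsub.pb import rpc_pb2

    prune = rpc_pb2.ControlPrune()
    prune.topicID = "my-topic"
    prune.backoff = 60  # seconds the receiver should wait before re-grafting

    info = prune.peers.add()  # new entry in the repeated PeerInfo field
    info.peerID = b"illustrative-peer-id-bytes"
    info.signedPeerRecord = b"illustrative-record-bytes"

    decoded = rpc_pb2.ControlPrune.FromString(prune.SerializeToString())
    assert decoded.backoff == 60 and len(decoded.peers) == 1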
From b78468ca32c2034f277458acf1d2142c48ae596e Mon Sep 17 00:00:00 2001
From: Mystical <125946525+mystical-prog@users.noreply.github.com>
Date: Sun, 1 Jun 2025 12:19:09 +0530
Subject: [PATCH 2/8] added params for peer exchange and back off

---
 libp2p/pubsub/gossipsub.py   | 36 ++++++++++++++++++++--
 libp2p/pubsub/pb/rpc_pb2.py  | 60 +++++++++++++++++++-----------------
 libp2p/pubsub/pb/rpc_pb2.pyi | 30 ++++++++++++++++--
 3 files changed, 92 insertions(+), 34 deletions(-)

diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py
index ccfeb941..d8f11215 100644
--- a/libp2p/pubsub/gossipsub.py
+++ b/libp2p/pubsub/gossipsub.py
@@ -92,6 +92,10 @@ class GossipSub(IPubsubRouter, Service):
     direct_connect_initial_delay: float
     direct_connect_interval: int
 
+    do_px: bool
+    back_off: int
+    unsubscribe_back_off: int
+
     def __init__(
         self,
         protocols: Sequence[TProtocol],
@@ -106,6 +110,9 @@ class GossipSub(IPubsubRouter, Service):
         heartbeat_interval: int = 120,
         direct_connect_initial_delay: float = 0.1,
         direct_connect_interval: int = 300,
+        do_px: bool = False,
+        back_off: int = 60,
+        unsubscribe_back_off: int = 10,
     ) -> None:
         self.protocols = list(protocols)
         self.pubsub = None
@@ -140,6 +147,10 @@ class GossipSub(IPubsubRouter, Service):
         self.direct_connect_initial_delay = direct_connect_initial_delay
         self.time_since_last_publish = {}
 
+        self.do_px = do_px
+        self.back_off = back_off
+        self.unsubscribe_back_off = unsubscribe_back_off
+
     async def run(self) -> None:
         self.manager.run_daemon_task(self.heartbeat)
         if len(self.direct_peers) > 0:
@@ -369,7 +380,7 @@ class GossipSub(IPubsubRouter, Service):
             return
         # Notify the peers in mesh[topic] with a PRUNE(topic) message
         for peer in self.mesh[topic]:
-            await self.emit_prune(topic, peer)
+            await self.emit_prune(topic, peer, do_px=self.do_px, is_unsubscribe=True)
 
         # Forget mesh[topic]
         self.mesh.pop(topic, None)
@@ -751,6 +762,9 @@ class GossipSub(IPubsubRouter, Service):
     ) -> None:
         topic: str = graft_msg.topicID
 
+        # TODO: complete the remaining logic
+        self.do_px
+
         # Add peer to mesh for topic
         if topic in self.mesh:
             for direct_peer in self.direct_peers:
@@ -758,14 +772,18 @@ class GossipSub(IPubsubRouter, Service):
                     logger.warning(
                         "GRAFT: ignoring request from direct peer %s", sender_peer_id
                     )
-                    await self.emit_prune(topic, sender_peer_id)
+                    await self.emit_prune(
+                        topic, sender_peer_id, do_px=self.do_px, is_unsubscribe=False
+                    )
                     return
 
             if sender_peer_id not in self.mesh[topic]:
                 self.mesh[topic].add(sender_peer_id)
         else:
             # Respond with PRUNE if not subscribed to the topic
-            await self.emit_prune(topic, sender_peer_id)
+            await self.emit_prune(
+                topic, sender_peer_id, do_px=self.do_px, is_unsubscribe=False
+            )
 
     async def handle_prune(
         self, prune_msg: rpc_pb2.ControlPrune, sender_peer_id: ID
@@ -824,11 +842,23 @@ class GossipSub(IPubsubRouter, Service):
 
         await self.emit_control_message(control_msg, id)
 
+    async def emit_prune(
+        self, topic: str, to_peer: ID, do_px: bool, is_unsubscribe: bool
+    ) -> None:
     async def emit_prune(self, topic: str, id: ID) -> None:
         """Emit prune message, sent to to_peer, for topic."""
         prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune()
         prune_msg.topicID = topic
 
+        back_off_duration = self.back_off
+        if is_unsubscribe:
+            back_off_duration = self.unsubscribe_back_off
+
+        prune_msg.backoff = back_off_duration
+
+        # TODO: add peers once peerstore changes are complete
+        # prune_msg.peers =
+
         control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
         control_msg.prune.extend([prune_msg])
diff --git a/libp2p/pubsub/pb/rpc_pb2.py b/libp2p/pubsub/pb/rpc_pb2.py
index 2f010fa3..7941d655 100644
--- a/libp2p/pubsub/pb/rpc_pb2.py
+++ b/libp2p/pubsub/pb/rpc_pb2.py
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 # Generated by the protocol buffer compiler.  DO NOT EDIT!
-# source: libp2p/pubsub/pb/rpc.proto
+# source: rpc.proto
 """Generated protocol buffer code."""
 from google.protobuf.internal import builder as _builder
 from google.protobuf import descriptor as _descriptor
@@ -13,37 +13,39 @@ _sym_db = _symbol_database.Default()
 
 
 
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x1f\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\trpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02')
 
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
-_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.pubsub.pb.rpc_pb2', globals())
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'rpc_pb2', globals())
 if _descriptor._USE_C_DESCRIPTORS == False:
 
   DESCRIPTOR._options = None
-  _RPC._serialized_start=42
-  _RPC._serialized_end=222
-  _RPC_SUBOPTS._serialized_start=177
-  _RPC_SUBOPTS._serialized_end=222
-  _MESSAGE._serialized_start=224
-  _MESSAGE._serialized_end=329
-  _CONTROLMESSAGE._serialized_start=332
-  _CONTROLMESSAGE._serialized_end=508
-  _CONTROLIHAVE._serialized_start=510
-  _CONTROLIHAVE._serialized_end=561
-  _CONTROLIWANT._serialized_start=563
-  _CONTROLIWANT._serialized_end=597
-  _CONTROLGRAFT._serialized_start=599
-  _CONTROLGRAFT._serialized_end=630
-  _CONTROLPRUNE._serialized_start=632
-  _CONTROLPRUNE._serialized_end=663
-  _TOPICDESCRIPTOR._serialized_start=666
-  _TOPICDESCRIPTOR._serialized_end=1057
-  _TOPICDESCRIPTOR_AUTHOPTS._serialized_start=799
-  _TOPICDESCRIPTOR_AUTHOPTS._serialized_end=923
-  _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_start=885
-  _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_end=923
-  _TOPICDESCRIPTOR_ENCOPTS._serialized_start=926
-  _TOPICDESCRIPTOR_ENCOPTS._serialized_end=1057
-  _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_start=1014
-  _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_end=1057
+  _RPC._serialized_start=25
+  _RPC._serialized_end=205
+  _RPC_SUBOPTS._serialized_start=160
+  _RPC_SUBOPTS._serialized_end=205
+  _MESSAGE._serialized_start=207
+  _MESSAGE._serialized_end=312
+  _CONTROLMESSAGE._serialized_start=315
+  _CONTROLMESSAGE._serialized_end=491
+  _CONTROLIHAVE._serialized_start=493
+  _CONTROLIHAVE._serialized_end=544
+  _CONTROLIWANT._serialized_start=546
+  _CONTROLIWANT._serialized_end=580
+  _CONTROLGRAFT._serialized_start=582
+  _CONTROLGRAFT._serialized_end=613
+  _CONTROLPRUNE._serialized_start=615
+  _CONTROLPRUNE._serialized_end=699
+  _PEERINFO._serialized_start=701
+  _PEERINFO._serialized_end=753
+  _TOPICDESCRIPTOR._serialized_start=756
+  _TOPICDESCRIPTOR._serialized_end=1147
+  _TOPICDESCRIPTOR_AUTHOPTS._serialized_start=889
+  _TOPICDESCRIPTOR_AUTHOPTS._serialized_end=1013
+  _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_start=975
+  _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_end=1013
+  _TOPICDESCRIPTOR_ENCOPTS._serialized_start=1016
+  _TOPICDESCRIPTOR_ENCOPTS._serialized_end=1147
+  _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_start=1104
+  _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_end=1147
 # @@protoc_insertion_point(module_scope)
diff --git a/libp2p/pubsub/pb/rpc_pb2.pyi b/libp2p/pubsub/pb/rpc_pb2.pyi
index 232d90d2..88738e2e 100644
--- a/libp2p/pubsub/pb/rpc_pb2.pyi
+++ b/libp2p/pubsub/pb/rpc_pb2.pyi
@@ -179,17 +179,43 @@ class ControlPrune(google.protobuf.message.Message):
     DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
     TOPICID_FIELD_NUMBER: builtins.int
+    PEERS_FIELD_NUMBER: builtins.int
+    BACKOFF_FIELD_NUMBER: builtins.int
     topicID: builtins.str
+    backoff: builtins.int
+    @property
+    def peers(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___PeerInfo]: ...
     def __init__(
         self,
         *,
         topicID: builtins.str | None = ...,
+        peers: collections.abc.Iterable[global___PeerInfo] | None = ...,
+        backoff: builtins.int | None = ...,
     ) -> None: ...
-    def HasField(self, field_name: typing.Literal["topicID", b"topicID"]) -> builtins.bool: ...
-    def ClearField(self, field_name: typing.Literal["topicID", b"topicID"]) -> None: ...
+    def HasField(self, field_name: typing.Literal["backoff", b"backoff", "topicID", b"topicID"]) -> builtins.bool: ...
+    def ClearField(self, field_name: typing.Literal["backoff", b"backoff", "peers", b"peers", "topicID", b"topicID"]) -> None: ...
 
 global___ControlPrune = ControlPrune
 
+@typing.final
+class PeerInfo(google.protobuf.message.Message):
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    PEERID_FIELD_NUMBER: builtins.int
+    SIGNEDPEERRECORD_FIELD_NUMBER: builtins.int
+    peerID: builtins.bytes
+    signedPeerRecord: builtins.bytes
+    def __init__(
+        self,
+        *,
+        peerID: builtins.bytes | None = ...,
+        signedPeerRecord: builtins.bytes | None = ...,
+    ) -> None: ...
+    def HasField(self, field_name: typing.Literal["peerID", b"peerID", "signedPeerRecord", b"signedPeerRecord"]) -> builtins.bool: ...
+    def ClearField(self, field_name: typing.Literal["peerID", b"peerID", "signedPeerRecord", b"signedPeerRecord"]) -> None: ...
+
+global___PeerInfo = PeerInfo
+
 @typing.final
 class TopicDescriptor(google.protobuf.message.Message):
     DESCRIPTOR: google.protobuf.descriptor.Descriptor
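The regenerated rpc_pb2.py and rpc_pb2.pyi above were produced by protoc run from inside libp2p/pubsub/pb (hence the "source: rpc.proto" header replacing the repo-relative path). A sketch of the regeneration command, assuming protoc and the mypy-protobuf plugin (which emits the .pyi stub) are installed:

    cd libp2p/pubsub/pb
    protoc --python_out=. --mypy_out=. rpc.proto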
From 788b4cf51a09144b6e8b38e201c47ef2a19133c3 Mon Sep 17 00:00:00 2001
From: Mystical <125946525+mystical-prog@users.noreply.github.com>
Date: Wed, 18 Jun 2025 18:57:09 +0530
Subject: [PATCH 3/8] added complete back_off implementation

---
 libp2p/pubsub/gossipsub.py          | 118 ++++++++++++++++++++++------
 tests/core/pubsub/test_gossipsub.py |   4 +-
 2 files changed, 95 insertions(+), 27 deletions(-)

diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py
index d8f11215..7abe6251 100644
--- a/libp2p/pubsub/gossipsub.py
+++ b/libp2p/pubsub/gossipsub.py
@@ -93,7 +93,8 @@ class GossipSub(IPubsubRouter, Service):
     direct_connect_interval: int
 
     do_px: bool
-    back_off: int
+    back_off: dict[str, dict[ID, int]]
+    prune_back_off: int
     unsubscribe_back_off: int
 
     def __init__(
@@ -111,7 +112,7 @@ class GossipSub(IPubsubRouter, Service):
         direct_connect_initial_delay: float = 0.1,
         direct_connect_interval: int = 300,
         do_px: bool = False,
-        back_off: int = 60,
+        prune_back_off: int = 60,
         unsubscribe_back_off: int = 10,
     ) -> None:
         self.protocols = list(protocols)
@@ -148,7 +149,8 @@ class GossipSub(IPubsubRouter, Service):
         self.time_since_last_publish = {}
 
         self.do_px = do_px
-        self.back_off = back_off
+        self.back_off = dict()
+        self.prune_back_off = prune_back_off
         self.unsubscribe_back_off = unsubscribe_back_off
 
     async def run(self) -> None:
@@ -345,15 +347,21 @@ class GossipSub(IPubsubRouter, Service):
             self.mesh[topic] = set()
 
         topic_in_fanout: bool = topic in self.fanout
-        fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set()
+        fanout_peers: set[ID] = set()
+
+        for peer in self.fanout[topic]:
+            if self._check_back_off(peer, topic):
+                continue
+            fanout_peers.add(peer)
+
         fanout_size = len(fanout_peers)
         if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree):
             # There are less than D peers (let this number be x)
             # in the fanout for a topic (or the topic is not in the fanout).
             # Selects the remaining number of peers (D-x) from peers.gossipsub[topic].
-            if topic in self.pubsub.peer_topics:
+            if self.pubsub is not None and topic in self.pubsub.peer_topics:
                 selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
-                    topic, self.degree - fanout_size, fanout_peers
+                    topic, self.degree - fanout_size, fanout_peers, True
                 )
                 # Combine fanout peers with selected peers
                 fanout_peers.update(selected_peers)
@@ -380,7 +388,8 @@ class GossipSub(IPubsubRouter, Service):
             return
         # Notify the peers in mesh[topic] with a PRUNE(topic) message
         for peer in self.mesh[topic]:
-            await self.emit_prune(topic, peer, do_px=self.do_px, is_unsubscribe=True)
+            await self.emit_prune(topic, peer, self.do_px, True)
+            self._add_back_off(peer, topic, True)
 
         # Forget mesh[topic]
         self.mesh.pop(topic, None)
@@ -516,7 +525,7 @@ class GossipSub(IPubsubRouter, Service):
             if num_mesh_peers_in_topic < self.degree_low:
                 # Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic]  # noqa: E501
                 selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
-                    topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic]
+                    topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic], True
                 )
 
                 for peer in selected_peers:
@@ -579,9 +588,7 @@ class GossipSub(IPubsubRouter, Service):
             if len(in_topic_peers) < self.degree:
                 # Select additional peers from peers.gossipsub[topic]
                 selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
-                    topic,
-                    self.degree - len(in_topic_peers),
-                    in_topic_peers,
+                    topic, self.degree - len(in_topic_peers), in_topic_peers, True
                 )
                 # Add the selected peers
                 in_topic_peers.update(selected_peers)
@@ -592,7 +599,7 @@ class GossipSub(IPubsubRouter, Service):
             if msg_ids:
                 # Select D peers from peers.gossipsub[topic] excluding current peers
                 peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
-                    topic, self.degree, current_peers
+                    topic, self.degree, current_peers, True
                 )
                 msg_id_strs = [str(msg_id) for msg_id in msg_ids]
                 for peer in peers_to_emit_ihave_to:
@@ -666,7 +673,11 @@ class GossipSub(IPubsubRouter, Service):
         return selection
 
     def _get_in_topic_gossipsub_peers_from_minus(
-        self, topic: str, num_to_select: int, minus: Iterable[ID]
+        self,
+        topic: str,
+        num_to_select: int,
+        minus: Iterable[ID],
+        backoff_check: bool = False,
     ) -> list[ID]:
         if self.pubsub is None:
             raise NoPubsubAttached
@@ -675,8 +686,57 @@ class GossipSub(IPubsubRouter, Service):
             for peer_id in self.pubsub.peer_topics[topic]
             if self.peer_protocol[peer_id] == PROTOCOL_ID
         }
+        if backoff_check:
+            # filter out peers that are in back off for this topic
+            gossipsub_peers_in_topic = {
+                peer_id
+                for peer_id in gossipsub_peers_in_topic
+                if self._check_back_off(peer_id, topic) is False
+            }
         return self.select_from_minus(num_to_select, gossipsub_peers_in_topic, minus)
 
+    def _add_back_off(
+        self, peer: ID, topic: str, is_unsubscribe: bool, backoff_duration: int = 0
+    ) -> None:
+        """
+        Add back off for a peer in a topic.
+
+        :param peer: peer to add back off for
+        :param topic: topic to add back off for
+        :param is_unsubscribe: whether this is an unsubscribe operation
+        :param backoff_duration: duration of back off in seconds, if 0, use default
+        """
+        if topic not in self.back_off:
+            self.back_off[topic] = dict()
+
+        backoff_till = int(time.time())
+        if backoff_duration > 0:
+            backoff_till += backoff_duration
+        else:
+            if is_unsubscribe:
+                backoff_till += self.unsubscribe_back_off
+            else:
+                backoff_till += self.prune_back_off
+
+        if peer not in self.back_off[topic]:
+            self.back_off[topic][peer] = backoff_till
+        else:
+            self.back_off[topic][peer] = max(self.back_off[topic][peer], backoff_till)
+
+    def _check_back_off(self, peer: ID, topic: str) -> bool:
+        """
+        Check if a peer is in back off for a topic and cleanup expired back off entries.
+
+        :param peer: peer to check
+        :param topic: topic to check
+        :return: True if the peer is in back off, False otherwise
+        """
+        if topic not in self.back_off:
+            return False
+        if self.back_off[topic].get(peer, 0) > int(time.time()):
+            return True
+        else:
+            del self.back_off[topic][peer]
+            return False
+
     # RPC handlers
 
     async def handle_ihave(
@@ -762,9 +822,6 @@ class GossipSub(IPubsubRouter, Service):
     ) -> None:
         topic: str = graft_msg.topicID
 
-        # TODO: complete the remaining logic
-        self.do_px
-
         # Add peer to mesh for topic
         if topic in self.mesh:
             for direct_peer in self.direct_peers:
@@ -772,26 +829,38 @@ class GossipSub(IPubsubRouter, Service):
                     logger.warning(
                         "GRAFT: ignoring request from direct peer %s", sender_peer_id
                     )
-                    await self.emit_prune(
-                        topic, sender_peer_id, do_px=self.do_px, is_unsubscribe=False
-                    )
+                    await self.emit_prune(topic, sender_peer_id, False, False)
                     return
 
+            if self._check_back_off(sender_peer_id, topic):
+                logger.warning(
+                    "GRAFT: ignoring request from %s, back off until %d",
+                    sender_peer_id,
+                    self.back_off[topic][sender_peer_id],
+                )
+                self._add_back_off(sender_peer_id, topic, False)
+                await self.emit_prune(topic, sender_peer_id, False, False)
+                return
+
             if sender_peer_id not in self.mesh[topic]:
                 self.mesh[topic].add(sender_peer_id)
         else:
             # Respond with PRUNE if not subscribed to the topic
-            await self.emit_prune(
-                topic, sender_peer_id, do_px=self.do_px, is_unsubscribe=False
-            )
+            await self.emit_prune(topic, sender_peer_id, self.do_px, False)
 
     async def handle_prune(
         self, prune_msg: rpc_pb2.ControlPrune, sender_peer_id: ID
     ) -> None:
         topic: str = prune_msg.topicID
+        backoff_till: int = prune_msg.backoff
 
         # Remove peer from mesh for topic
         if topic in self.mesh:
+            if backoff_till > 0:
+                self._add_back_off(sender_peer_id, topic, False, backoff_till)
+            else:
+                self._add_back_off(sender_peer_id, topic, False)
+
             self.mesh[topic].discard(sender_peer_id)
 
     # RPC emitters
@@ -845,12 +914,11 @@ class GossipSub(IPubsubRouter, Service):
     async def emit_prune(
         self, topic: str, to_peer: ID, do_px: bool, is_unsubscribe: bool
     ) -> None:
-    async def emit_prune(self, topic: str, id: ID) -> None:
         """Emit prune message, sent to to_peer, for topic."""
         prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune()
         prune_msg.topicID = topic
 
-        back_off_duration = self.back_off
+        back_off_duration = self.prune_back_off
         if is_unsubscribe:
             back_off_duration = self.unsubscribe_back_off
 
@@ -862,7 +930,7 @@ class GossipSub(IPubsubRouter, Service):
         control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
         control_msg.prune.extend([prune_msg])
 
-        await self.emit_control_message(control_msg, id)
+        await self.emit_control_message(control_msg, to_peer)
 
     async def emit_control_message(
         self, control_msg: rpc_pb2.ControlMessage, to_peer: ID
diff --git a/tests/core/pubsub/test_gossipsub.py b/tests/core/pubsub/test_gossipsub.py
index 4dec971d..9a767608 100644
--- a/tests/core/pubsub/test_gossipsub.py
+++ b/tests/core/pubsub/test_gossipsub.py
@@ -134,7 +134,7 @@ async def test_handle_graft(monkeypatch):
     # check if it is called in `handle_graft`
     event_emit_prune = trio.Event()
 
-    async def emit_prune(topic, sender_peer_id):
+    async def emit_prune(topic, sender_peer_id, do_px, is_unsubscribe):
         event_emit_prune.set()
         await trio.lowlevel.checkpoint()
 
@@ -193,7 +193,7 @@ async def test_handle_prune():
 
     # alice emit prune message to bob, alice should be removed
     # from bob's mesh peer
-    await gossipsubs[index_alice].emit_prune(topic, id_bob)
+    await gossipsubs[index_alice].emit_prune(topic, id_bob, False, False)
 
     # `emit_prune` does not remove bob from alice's mesh peers
     assert id_bob in gossipsubs[index_alice].mesh[topic]
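As context for the backoff bookkeeping introduced above, a standalone model of the same semantics (expiry timestamps per topic and peer, max-merge on write, lazy eviction on read); names are illustrative and not part of the patch:

    import time

    # topic -> peer -> unix timestamp until which the peer is backed off
    back_off: dict[str, dict[str, int]] = {}

    def add_back_off(peer: str, topic: str, duration: int) -> None:
        expiry = int(time.time()) + duration
        # never shorten an existing backoff; keep the later expiry
        peers = back_off.setdefault(topic, {})
        peers[peer] = max(peers.get(peer, 0), expiry)

    def in_back_off(peer: str, topic: str) -> bool:
        expiry = back_off.get(topic, {}).get(peer, 0)
        if expiry > int(time.time()):
            return True
        back_off.get(topic, {}).pop(peer, None)  # evict the stale entry
        return False

    add_back_off("peer-a", "t", 60)
    assert in_back_off("peer-a", "t")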
From 303bf3060afc802bd98e2bd7364ec243fc2a1882 Mon Sep 17 00:00:00 2001
From: Mystical <125946525+mystical-prog@users.noreply.github.com>
Date: Fri, 20 Jun 2025 13:10:52 +0530
Subject: [PATCH 4/8] implemented peer exchange

---
 libp2p/peer/peerinfo.py    | 18 ++++++++++++
 libp2p/pubsub/gossipsub.py | 56 ++++++++++++++++++++++++++++++++++++--
 2 files changed, 72 insertions(+), 2 deletions(-)

diff --git a/libp2p/peer/peerinfo.py b/libp2p/peer/peerinfo.py
index f3b3bd7b..29ce4e66 100644
--- a/libp2p/peer/peerinfo.py
+++ b/libp2p/peer/peerinfo.py
@@ -66,5 +66,23 @@ def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo:
     return PeerInfo(peer_id, [addr])
 
 
+def peer_info_to_bytes(peer_info: PeerInfo) -> bytes:
+    lines = [str(peer_info.peer_id)] + [str(addr) for addr in peer_info.addrs]
+    return "\n".join(lines).encode("utf-8")
+
+
+def peer_info_from_bytes(data: bytes) -> PeerInfo:
+    try:
+        lines = data.decode("utf-8").splitlines()
+        if not lines:
+            raise InvalidAddrError("no data to decode PeerInfo")
+
+        peer_id = ID.from_base58(lines[0])
+        addrs = [multiaddr.Multiaddr(addr_str) for addr_str in lines[1:]]
+        return PeerInfo(peer_id, addrs)
+    except Exception as e:
+        raise InvalidAddrError(f"failed to decode PeerInfo: {e}")
+
+
 class InvalidAddrError(ValueError):
     pass
diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py
index 7abe6251..43602a29 100644
--- a/libp2p/pubsub/gossipsub.py
+++ b/libp2p/pubsub/gossipsub.py
@@ -32,6 +32,8 @@ from libp2p.peer.id import (
 )
 from libp2p.peer.peerinfo import (
     PeerInfo,
+    peer_info_from_bytes,
+    peer_info_to_bytes,
 )
 from libp2p.peer.peerstore import (
     PERMANENT_ADDR_TTL,
@@ -93,6 +95,7 @@ class GossipSub(IPubsubRouter, Service):
     direct_connect_interval: int
 
     do_px: bool
+    px_peers_count: int
     back_off: dict[str, dict[ID, int]]
     prune_back_off: int
     unsubscribe_back_off: int
@@ -112,6 +115,7 @@ class GossipSub(IPubsubRouter, Service):
         direct_connect_initial_delay: float = 0.1,
         direct_connect_interval: int = 300,
         do_px: bool = False,
+        px_peers_count: int = 16,
         prune_back_off: int = 60,
         unsubscribe_back_off: int = 10,
     ) -> None:
@@ -149,6 +153,7 @@ class GossipSub(IPubsubRouter, Service):
         self.time_since_last_publish = {}
 
         self.do_px = do_px
+        self.px_peers_count = px_peers_count
         self.back_off = dict()
         self.prune_back_off = prune_back_off
         self.unsubscribe_back_off = unsubscribe_back_off
@@ -737,6 +742,37 @@ class GossipSub(IPubsubRouter, Service):
             del self.back_off[topic][peer]
             return False
 
+    async def _do_px(self, px_peers: list[rpc_pb2.PeerInfo]) -> None:
+        if len(px_peers) > self.px_peers_count:
+            px_peers = px_peers[: self.px_peers_count]
+
+        for peer in px_peers:
+            peer_id: ID = ID(peer.peerID)
+
+            if self.pubsub and peer_id in self.pubsub.peers:
+                continue
+
+            try:
+                peer_info = peer_info_from_bytes(peer.signedPeerRecord)
+                try:
+                    if self.pubsub is None:
+                        raise NoPubsubAttached
+                    await self.pubsub.host.connect(peer_info)
+                except Exception as e:
+                    logger.warning(
+                        "failed to connect to px peer %s: %s",
+                        peer_id,
+                        e,
+                    )
+                    continue
+            except Exception as e:
+                logger.warning(
+                    "failed to parse peer info from px peer %s: %s",
+                    peer_id,
+                    e,
+                )
+                continue
+
     # RPC handlers
 
     async def handle_ihave(
@@ -853,6 +889,9 @@ class GossipSub(IPubsubRouter, Service):
     ) -> None:
         topic: str = prune_msg.topicID
         backoff_till: int = prune_msg.backoff
+        px_peers: list[rpc_pb2.PeerInfo] = []
+        for peer in prune_msg.peers:
+            px_peers.append(peer)
 
         # Remove peer from mesh for topic
         if topic in self.mesh:
@@ -863,6 +902,9 @@ class GossipSub(IPubsubRouter, Service):
 
             self.mesh[topic].discard(sender_peer_id)
 
+        if px_peers:
+            await self._do_px(px_peers)
+
     # RPC emitters
 
     def pack_control_msgs(
@@ -924,8 +966,18 @@ class GossipSub(IPubsubRouter, Service):
 
         prune_msg.backoff = back_off_duration
 
-        # TODO: add peers once peerstore changes are complete
-        # prune_msg.peers =
+        if do_px:
+            exchange_peers = self._get_in_topic_gossipsub_peers_from_minus(
+                topic, self.px_peers_count, [to_peer]
+            )
+            for peer in exchange_peers:
+                if self.pubsub is None:
+                    raise NoPubsubAttached
+                peer_info = self.pubsub.host.get_peerstore().peer_info(peer)
+                signed_peer_record: rpc_pb2.PeerInfo = rpc_pb2.PeerInfo()
+                signed_peer_record.peerID = peer.to_bytes()
+                signed_peer_record.signedPeerRecord = peer_info_to_bytes(peer_info)
+                prune_msg.peers.append(signed_peer_record)
 
         control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
         control_msg.prune.extend([prune_msg])
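The helpers added to peerinfo.py encode a PeerInfo as plain UTF-8 text — the base58 peer ID on the first line, then one multiaddr per line — and that encoding is what emit_prune ships in the signedPeerRecord field (despite the field name, it is not a signed envelope). A round-trip sketch; the peer ID and address are illustrative:

    import multiaddr

    from libp2p.peer.id import ID
    from libp2p.peer.peerinfo import (
        PeerInfo,
        peer_info_from_bytes,
        peer_info_to_bytes,
    )

    info = PeerInfo(
        ID.from_base58("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N"),
        [multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/4001")],
    )
    blob = peer_info_to_bytes(info)  # b"QmYyQ...\n/ip4/127.0.0.1/tcp/4001"
    restored = peer_info_from_bytes(blob)
    assert restored.peer_id == info.peer_id
    assert [str(a) for a in restored.addrs] == ["/ip4/127.0.0.1/tcp/4001"]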
From 24e73207d2a7a67e41ab68602e037d7d026d421a Mon Sep 17 00:00:00 2001
From: Mystical <125946525+mystical-prog@users.noreply.github.com>
Date: Fri, 20 Jun 2025 13:57:02 +0530
Subject: [PATCH 5/8] fixed failing demo

Co-authored-by: Khwahish Patel
---
 libp2p/pubsub/gossipsub.py |  9 +++++----
 libp2p/tools/constants.py  |  5 +++++
 tests/utils/factories.py   | 16 ++++++++++++++++
 3 files changed, 26 insertions(+), 4 deletions(-)

diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py
index 43602a29..9f26865c 100644
--- a/libp2p/pubsub/gossipsub.py
+++ b/libp2p/pubsub/gossipsub.py
@@ -354,10 +354,11 @@ class GossipSub(IPubsubRouter, Service):
         topic_in_fanout: bool = topic in self.fanout
         fanout_peers: set[ID] = set()
 
-        for peer in self.fanout[topic]:
-            if self._check_back_off(peer, topic):
-                continue
-            fanout_peers.add(peer)
+        if topic_in_fanout:
+            for peer in self.fanout[topic]:
+                if self._check_back_off(peer, topic):
+                    continue
+                fanout_peers.add(peer)
 
         fanout_size = len(fanout_peers)
         if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree):
diff --git a/libp2p/tools/constants.py b/libp2p/tools/constants.py
index a9ba4b76..f7d367e7 100644
--- a/libp2p/tools/constants.py
+++ b/libp2p/tools/constants.py
@@ -26,6 +26,7 @@ LISTEN_MADDR = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
 
 FLOODSUB_PROTOCOL_ID = floodsub.PROTOCOL_ID
 GOSSIPSUB_PROTOCOL_ID = gossipsub.PROTOCOL_ID
+GOSSIPSUB_PROTOCOL_ID_V1 = gossipsub.PROTOCOL_ID_V11
 
 
 class GossipsubParams(NamedTuple):
@@ -40,6 +41,10 @@ class GossipsubParams(NamedTuple):
     heartbeat_interval: float = 0.5
     direct_connect_initial_delay: float = 0.1
     direct_connect_interval: int = 300
+    do_px: bool = False
+    px_peers_count: int = 16
+    prune_back_off: int = 60
+    unsubscribe_back_off: int = 10
 
 
 GOSSIPSUB_PARAMS = GossipsubParams()
diff --git a/tests/utils/factories.py b/tests/utils/factories.py
index 4df82033..1beac861 100644
--- a/tests/utils/factories.py
+++ b/tests/utils/factories.py
@@ -443,6 +443,10 @@ class GossipsubFactory(factory.Factory):
     heartbeat_interval = GOSSIPSUB_PARAMS.heartbeat_interval
     direct_connect_initial_delay = GOSSIPSUB_PARAMS.direct_connect_initial_delay
     direct_connect_interval = GOSSIPSUB_PARAMS.direct_connect_interval
+    do_px = GOSSIPSUB_PARAMS.do_px
+    px_peers_count = GOSSIPSUB_PARAMS.px_peers_count
+    prune_back_off = GOSSIPSUB_PARAMS.prune_back_off
+    unsubscribe_back_off = GOSSIPSUB_PARAMS.unsubscribe_back_off
 
 
 class PubsubFactory(factory.Factory):
@@ -568,6 +572,10 @@ class PubsubFactory(factory.Factory):
         heartbeat_initial_delay: float = GOSSIPSUB_PARAMS.heartbeat_initial_delay,
         direct_connect_initial_delay: float = GOSSIPSUB_PARAMS.direct_connect_initial_delay,  # noqa: E501
         direct_connect_interval: int = GOSSIPSUB_PARAMS.direct_connect_interval,
+        do_px: bool = GOSSIPSUB_PARAMS.do_px,
+        px_peers_count: int = GOSSIPSUB_PARAMS.px_peers_count,
+        prune_back_off: int = GOSSIPSUB_PARAMS.prune_back_off,
+        unsubscribe_back_off: int = GOSSIPSUB_PARAMS.unsubscribe_back_off,
         security_protocol: TProtocol | None = None,
         muxer_opt: TMuxerOptions | None = None,
         msg_id_constructor: None
@@ -588,6 +596,10 @@ class PubsubFactory(factory.Factory):
                 heartbeat_interval=heartbeat_interval,
                 direct_connect_initial_delay=direct_connect_initial_delay,
                 direct_connect_interval=direct_connect_interval,
+                do_px=do_px,
+                px_peers_count=px_peers_count,
+                prune_back_off=prune_back_off,
+                unsubscribe_back_off=unsubscribe_back_off,
             )
         else:
             gossipsubs = GossipsubFactory.create_batch(
@@ -602,6 +614,10 @@ class PubsubFactory(factory.Factory):
                 heartbeat_initial_delay=heartbeat_initial_delay,
                 direct_connect_initial_delay=direct_connect_initial_delay,
                 direct_connect_interval=direct_connect_interval,
+                do_px=do_px,
+                px_peers_count=px_peers_count,
+                prune_back_off=prune_back_off,
+                unsubscribe_back_off=unsubscribe_back_off,
             )
 
         async with cls._create_batch_with_router(
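With these parameters threaded through GossipsubParams and the test factories, a test batch can opt in to PX and tune backoff windows; a usage sketch, assuming the factory API above:

    import trio

    from tests.utils.factories import PubsubFactory

    async def main() -> None:
        async with PubsubFactory.create_batch_with_gossipsub(
            2,
            do_px=True,            # advertise peers in outgoing PRUNEs
            px_peers_count=8,      # cap on peers exchanged per PRUNE
            prune_back_off=30,     # seconds before a pruned peer may re-graft
            unsubscribe_back_off=5,
        ) as pubsubs:
            await trio.sleep(0.1)  # routers now run with these settings

    trio.run(main)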
From ea6eef6ed57a62dff941749e6c35e5f1c1451715 Mon Sep 17 00:00:00 2001
From: Khwahish29
Date: Mon, 23 Jun 2025 00:41:13 +0530
Subject: [PATCH 6/8] test px and backoff

---
 libp2p/pubsub/gossipsub.py                  |   2 +-
 tests/core/pubsub/test_gossipsub.py         |   4 +-
 .../pubsub/test_gossipsub_px_and_backoff.py | 115 ++++++++++++++++++
 3 files changed, 119 insertions(+), 2 deletions(-)
 create mode 100644 tests/core/pubsub/test_gossipsub_px_and_backoff.py

diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py
index 9f26865c..a8ffae7e 100644
--- a/libp2p/pubsub/gossipsub.py
+++ b/libp2p/pubsub/gossipsub.py
@@ -735,7 +735,7 @@ class GossipSub(IPubsubRouter, Service):
         :param topic: topic to check
         :return: True if the peer is in back off, False otherwise
         """
-        if topic not in self.back_off:
+        if topic not in self.back_off or peer not in self.back_off[topic]:
             return False
         if self.back_off[topic].get(peer, 0) > int(time.time()):
             return True
diff --git a/tests/core/pubsub/test_gossipsub.py b/tests/core/pubsub/test_gossipsub.py
index 9a767608..03276a78 100644
--- a/tests/core/pubsub/test_gossipsub.py
+++ b/tests/core/pubsub/test_gossipsub.py
@@ -292,7 +292,9 @@ async def test_fanout():
 @pytest.mark.trio
 @pytest.mark.slow
 async def test_fanout_maintenance():
-    async with PubsubFactory.create_batch_with_gossipsub(10) as pubsubs_gsub:
+    async with PubsubFactory.create_batch_with_gossipsub(
+        10, unsubscribe_back_off=1
+    ) as pubsubs_gsub:
         hosts = [pubsub.host for pubsub in pubsubs_gsub]
         num_msgs = 5
 
diff --git a/tests/core/pubsub/test_gossipsub_px_and_backoff.py b/tests/core/pubsub/test_gossipsub_px_and_backoff.py
new file mode 100644
index 00000000..a47aa3ce
--- /dev/null
+++ b/tests/core/pubsub/test_gossipsub_px_and_backoff.py
@@ -0,0 +1,115 @@
+import pytest
+import trio
+
+from libp2p.pubsub.gossipsub import (
+    GossipSub,
+)
+from libp2p.tools.utils import (
+    connect,
+)
+from tests.utils.factories import (
+    PubsubFactory,
+)
+
+
+@pytest.mark.trio
+async def test_prune_backoff():
+    async with PubsubFactory.create_batch_with_gossipsub(
+        2, heartbeat_interval=0.5, prune_back_off=2
+    ) as pubsubs:
+        gsub0 = pubsubs[0].router
+        gsub1 = pubsubs[1].router
+        assert isinstance(gsub0, GossipSub)
+        assert isinstance(gsub1, GossipSub)
+        host_0 = pubsubs[0].host
+        host_1 = pubsubs[1].host
+
+        topic = "test_prune_backoff"
+
+        # connect hosts
+        await connect(host_0, host_1)
+        await trio.sleep(0.5)
+
+        # both join the topic
+        await gsub0.join(topic)
+        await gsub1.join(topic)
+        await gsub0.emit_graft(topic, host_1.get_id())
+        await trio.sleep(0.5)
+
+        # ensure peer is registered in mesh
+        assert host_0.get_id() in gsub1.mesh[topic]
+
+        # prune host_1 from gsub0's mesh
+        await gsub0.emit_prune(topic, host_1.get_id(), False, False)
+        await trio.sleep(0.5)
+
+        # host_0 should not be in gsub1's mesh
+        assert host_0.get_id() not in gsub1.mesh[topic]
+
+        # try to graft again immediately (should be rejected due to backoff)
+        await gsub0.emit_graft(topic, host_1.get_id())
+        await trio.sleep(0.5)
+        assert host_0.get_id() not in gsub1.mesh[topic], (
+            "peer should be backoffed and not re-added"
+        )
+
+        # try to graft again (should succeed after backoff)
+        await trio.sleep(2)
+        await gsub0.emit_graft(topic, host_1.get_id())
+        await trio.sleep(1)
+        assert host_0.get_id() in gsub1.mesh[topic], (
+            "peer should be able to rejoin after backoff"
+        )
+
+
+@pytest.mark.trio
+async def test_unsubscribe_backoff():
+    async with PubsubFactory.create_batch_with_gossipsub(
+        2, heartbeat_interval=1, prune_back_off=1, unsubscribe_back_off=2
+    ) as pubsubs:
+        gsub0 = pubsubs[0].router
+        gsub1 = pubsubs[1].router
+        assert isinstance(gsub0, GossipSub)
+        assert isinstance(gsub1, GossipSub)
+        host_0 = pubsubs[0].host
+        host_1 = pubsubs[1].host
+
+        topic = "test_unsubscribe_backoff"
+
+        # connect hosts
+        await connect(host_0, host_1)
+        await trio.sleep(0.5)
+
+        # both join the topic
+        await gsub0.join(topic)
+        await gsub1.join(topic)
+        await gsub0.emit_graft(topic, host_1.get_id())
+        await trio.sleep(0.5)
+
+        # ensure peer is registered in mesh
+        assert host_0.get_id() in gsub1.mesh[topic]
+
+        # host_1 unsubscribes from the topic
+        await gsub1.leave(topic)
+        await trio.sleep(0.5)
+        assert topic not in gsub1.mesh
+
+        # host_1 resubscribes to the topic
+        await gsub1.join(topic)
+        await trio.sleep(0.5)
+        assert topic in gsub1.mesh
+
+        # try to graft again immediately (should be rejected due to backoff)
+        await gsub0.emit_graft(topic, host_1.get_id())
+        await trio.sleep(0.5)
+        assert host_0.get_id() not in gsub1.mesh[topic], (
+            "peer should be backoffed and not re-added"
+        )
+
+        # try to graft again (should succeed after backoff)
+        await trio.sleep(1)
+        await gsub0.emit_graft(topic, host_1.get_id())
+        await trio.sleep(1)
+        assert host_0.get_id() in gsub1.mesh[topic], (
+            "peer should be able to rejoin after backoff"
+        )
From fbee0ba2ab3c0aa8ee3c18ceed64fba24cc3144f Mon Sep 17 00:00:00 2001
From: Mystical <125946525+mystical-prog@users.noreply.github.com>
Date: Mon, 23 Jun 2025 01:00:46 +0530
Subject: [PATCH 7/8] added newsfragment

---
 newsfragments/690.feature.rst               |  1 +
 .../pubsub/test_gossipsub_px_and_backoff.py | 50 +++++++++++++++++++
 2 files changed, 51 insertions(+)
 create mode 100644 newsfragments/690.feature.rst

diff --git a/newsfragments/690.feature.rst b/newsfragments/690.feature.rst
new file mode 100644
index 00000000..450ffdfd
--- /dev/null
+++ b/newsfragments/690.feature.rst
@@ -0,0 +1 @@
+added peer exchange and backoff logic as part of Gossipsub v1.1 upgrade
diff --git a/tests/core/pubsub/test_gossipsub_px_and_backoff.py b/tests/core/pubsub/test_gossipsub_px_and_backoff.py
index a47aa3ce..8c623ebd 100644
--- a/tests/core/pubsub/test_gossipsub_px_and_backoff.py
+++ b/tests/core/pubsub/test_gossipsub_px_and_backoff.py
@@ -113,3 +113,53 @@ async def test_unsubscribe_backoff():
         assert host_0.get_id() in gsub1.mesh[topic], (
             "peer should be able to rejoin after backoff"
         )
+
+
+@pytest.mark.trio
+async def test_peer_exchange():
+    async with PubsubFactory.create_batch_with_gossipsub(
+        3,
+        heartbeat_interval=0.5,
+        do_px=True,
+        px_peers_count=1,
+    ) as pubsubs:
+        gsub0 = pubsubs[0].router
+        gsub1 = pubsubs[1].router
+        gsub2 = pubsubs[2].router
+        assert isinstance(gsub0, GossipSub)
+        assert isinstance(gsub1, GossipSub)
+        assert isinstance(gsub2, GossipSub)
+        host_0 = pubsubs[0].host
+        host_1 = pubsubs[1].host
+        host_2 = pubsubs[2].host
+
+        topic = "test_peer_exchange"
+
+        # connect hosts
+        await connect(host_1, host_0)
+        await connect(host_1, host_2)
+        await trio.sleep(0.5)
+
+        # all join the topic and 0 <-> 1 and 1 <-> 2 graft
+        await pubsubs[1].subscribe(topic)
+        await pubsubs[0].subscribe(topic)
+        await pubsubs[2].subscribe(topic)
+        await gsub1.emit_graft(topic, host_0.get_id())
+        await gsub1.emit_graft(topic, host_2.get_id())
+        await gsub0.emit_graft(topic, host_1.get_id())
+        await gsub2.emit_graft(topic, host_1.get_id())
+        await trio.sleep(1)
+
+        # ensure peer is registered in mesh
+        assert host_0.get_id() in gsub1.mesh[topic]
+        assert host_2.get_id() in gsub1.mesh[topic]
+        assert host_2.get_id() not in gsub0.mesh[topic]
+
+        # host_1 unsubscribes from the topic
+        await gsub1.leave(topic)
+        await trio.sleep(1)  # Wait for heartbeat to update mesh
+        assert topic not in gsub1.mesh
+
+        # Wait for gsub0 to graft host_2 into its mesh via PX
+        await trio.sleep(1)
+        assert host_2.get_id() in gsub0.mesh[topic]
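The tests added in patches 6 and 7 (and extended in patch 8 below) live in their own module and can be run in isolation; assuming a development install with pytest and pytest-trio:

    pytest tests/core/pubsub/test_gossipsub_px_and_backoff.py -v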
From 73ebd27c5941401af4f15e59fb7d87f6982bd285 Mon Sep 17 00:00:00 2001
From: Mystical <125946525+mystical-prog@users.noreply.github.com>
Date: Thu, 26 Jun 2025 01:28:21 +0530
Subject: [PATCH 8/8] added isolated_topics_test and stress_test

---
 .../pubsub/test_gossipsub_px_and_backoff.py | 109 ++++++++++++++++++
 1 file changed, 109 insertions(+)

diff --git a/tests/core/pubsub/test_gossipsub_px_and_backoff.py b/tests/core/pubsub/test_gossipsub_px_and_backoff.py
index 8c623ebd..72ad5f9d 100644
--- a/tests/core/pubsub/test_gossipsub_px_and_backoff.py
+++ b/tests/core/pubsub/test_gossipsub_px_and_backoff.py
@@ -163,3 +163,112 @@ async def test_peer_exchange():
         # Wait for gsub0 to graft host_2 into its mesh via PX
         await trio.sleep(1)
         assert host_2.get_id() in gsub0.mesh[topic]
+
+
+@pytest.mark.trio
+async def test_topics_are_isolated():
+    async with PubsubFactory.create_batch_with_gossipsub(
+        2, heartbeat_interval=0.5, prune_back_off=2
+    ) as pubsubs:
+        gsub0 = pubsubs[0].router
+        gsub1 = pubsubs[1].router
+        assert isinstance(gsub0, GossipSub)
+        assert isinstance(gsub1, GossipSub)
+        host_0 = pubsubs[0].host
+        host_1 = pubsubs[1].host
+
+        topic1 = "test_prune_backoff"
+        topic2 = "test_prune_backoff2"
+
+        # connect hosts
+        await connect(host_0, host_1)
+        await trio.sleep(0.5)
+
+        # both peers join both the topics
+        await gsub0.join(topic1)
+        await gsub1.join(topic1)
+        await gsub0.join(topic2)
+        await gsub1.join(topic2)
+        await gsub0.emit_graft(topic1, host_1.get_id())
+        await trio.sleep(0.5)
+
+        # ensure topic1 for peer is registered in mesh
+        assert host_0.get_id() in gsub1.mesh[topic1]
+
+        # prune topic1 for host_1 from gsub0's mesh
+        await gsub0.emit_prune(topic1, host_1.get_id(), False, False)
+        await trio.sleep(0.5)
+
+        # topic1 for host_0 should not be in gsub1's mesh
+        assert host_0.get_id() not in gsub1.mesh[topic1]
+
+        # try to regraft topic1 and graft new topic2
+        await gsub0.emit_graft(topic1, host_1.get_id())
+        await gsub0.emit_graft(topic2, host_1.get_id())
+        await trio.sleep(0.5)
+        assert host_0.get_id() not in gsub1.mesh[topic1], (
+            "peer should be backoffed and not re-added"
+        )
+        assert host_0.get_id() in gsub1.mesh[topic2], (
+            "peer should be able to join a different topic"
+        )
+
+
+@pytest.mark.trio
+async def test_stress_churn():
+    NUM_PEERS = 5
+    CHURN_CYCLES = 30
+    TOPIC = "stress_churn_topic"
+    PRUNE_BACKOFF = 1
+    HEARTBEAT_INTERVAL = 0.2
+
+    async with PubsubFactory.create_batch_with_gossipsub(
+        NUM_PEERS,
+        heartbeat_interval=HEARTBEAT_INTERVAL,
+        prune_back_off=PRUNE_BACKOFF,
+    ) as pubsubs:
+        routers: list[GossipSub] = []
+        for ps in pubsubs:
+            assert isinstance(ps.router, GossipSub)
+            routers.append(ps.router)
+        hosts = [ps.host for ps in pubsubs]
+
+        # fully connect all peers
+        for i in range(NUM_PEERS):
+            for j in range(i + 1, NUM_PEERS):
+                await connect(hosts[i], hosts[j])
+        await trio.sleep(1)
+
+        # all peers join the topic
+        for router in routers:
+            await router.join(TOPIC)
+        await trio.sleep(1)
+
+        # rapid join/prune cycles
+        for cycle in range(CHURN_CYCLES):
+            for i, router in enumerate(routers):
+                # prune all other peers from this router's mesh
+                for j, peer_host in enumerate(hosts):
+                    if i != j:
+                        await router.emit_prune(TOPIC, peer_host.get_id(), False, False)
+            await trio.sleep(0.1)
+            for i, router in enumerate(routers):
+                # graft all other peers back
+                for j, peer_host in enumerate(hosts):
+                    if i != j:
+                        await router.emit_graft(TOPIC, peer_host.get_id())
+            await trio.sleep(0.1)
+
+        # wait for backoff entries to expire and cleanup
+        await trio.sleep(PRUNE_BACKOFF * 2)
+
+        # check that the backoff table is not unbounded
+        for router in routers:
+            # backoff is a dict: topic -> peer -> expiry
+            backoff = getattr(router, "back_off", None)
+            assert backoff is not None, "router missing backoff table"
+            # only a small number of entries should remain (ideally 0)
+            total_entries = sum(len(peers) for peers in backoff.values())
+            assert total_entries < NUM_PEERS * 2, (
+                f"backoff table grew too large: {total_entries} entries"
+            )