diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index ccfeb941..d8f11215 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -92,6 +92,10 @@ class GossipSub(IPubsubRouter, Service): direct_connect_initial_delay: float direct_connect_interval: int + do_px: bool + back_off: int + unsubscribe_back_off: int + def __init__( self, protocols: Sequence[TProtocol], @@ -106,6 +110,9 @@ class GossipSub(IPubsubRouter, Service): heartbeat_interval: int = 120, direct_connect_initial_delay: float = 0.1, direct_connect_interval: int = 300, + do_px: bool = False, + back_off: int = 60, + unsubscribe_back_off: int = 10, ) -> None: self.protocols = list(protocols) self.pubsub = None @@ -140,6 +147,10 @@ class GossipSub(IPubsubRouter, Service): self.direct_connect_initial_delay = direct_connect_initial_delay self.time_since_last_publish = {} + self.do_px = do_px + self.back_off = back_off + self.unsubscribe_back_off = unsubscribe_back_off + async def run(self) -> None: self.manager.run_daemon_task(self.heartbeat) if len(self.direct_peers) > 0: @@ -369,7 +380,7 @@ class GossipSub(IPubsubRouter, Service): return # Notify the peers in mesh[topic] with a PRUNE(topic) message for peer in self.mesh[topic]: - await self.emit_prune(topic, peer) + await self.emit_prune(topic, peer, do_px=self.do_px, is_unsubscribe=True) # Forget mesh[topic] self.mesh.pop(topic, None) @@ -751,6 +762,9 @@ class GossipSub(IPubsubRouter, Service): ) -> None: topic: str = graft_msg.topicID + # TODO: complete the remaining logic + self.do_px + # Add peer to mesh for topic if topic in self.mesh: for direct_peer in self.direct_peers: @@ -758,14 +772,18 @@ class GossipSub(IPubsubRouter, Service): logger.warning( "GRAFT: ignoring request from direct peer %s", sender_peer_id ) - await self.emit_prune(topic, sender_peer_id) + await self.emit_prune( + topic, sender_peer_id, do_px=self.do_px, is_unsubscribe=False + ) return if sender_peer_id not in self.mesh[topic]: self.mesh[topic].add(sender_peer_id) else: # Respond with PRUNE if not subscribed to the topic - await self.emit_prune(topic, sender_peer_id) + await self.emit_prune( + topic, sender_peer_id, do_px=self.do_px, is_unsubscribe=False + ) async def handle_prune( self, prune_msg: rpc_pb2.ControlPrune, sender_peer_id: ID @@ -824,11 +842,23 @@ class GossipSub(IPubsubRouter, Service): await self.emit_control_message(control_msg, id) + async def emit_prune( + self, topic: str, to_peer: ID, do_px: bool, is_unsubscribe: bool + ) -> None: async def emit_prune(self, topic: str, id: ID) -> None: """Emit graft message, sent to to_peer, for topic.""" prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune() prune_msg.topicID = topic + back_off_duration = self.back_off + if is_unsubscribe: + back_off_duration = self.unsubscribe_back_off + + prune_msg.backoff = back_off_duration + + # TODO: add peers once peerstore changes are complete + # prune_msg.peers = + control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage() control_msg.prune.extend([prune_msg]) diff --git a/libp2p/pubsub/pb/rpc_pb2.py b/libp2p/pubsub/pb/rpc_pb2.py index 2f010fa3..7941d655 100644 --- a/libp2p/pubsub/pb/rpc_pb2.py +++ b/libp2p/pubsub/pb/rpc_pb2.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! -# source: libp2p/pubsub/pb/rpc.proto +# source: rpc.proto """Generated protocol buffer code.""" from google.protobuf.internal import builder as _builder from google.protobuf import descriptor as _descriptor @@ -13,37 +13,39 @@ _sym_db = _symbol_database.Default() -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x1f\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\trpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.pubsub.pb.rpc_pb2', globals()) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'rpc_pb2', globals()) if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None - _RPC._serialized_start=42 - _RPC._serialized_end=222 - _RPC_SUBOPTS._serialized_start=177 - _RPC_SUBOPTS._serialized_end=222 - _MESSAGE._serialized_start=224 - _MESSAGE._serialized_end=329 - _CONTROLMESSAGE._serialized_start=332 - _CONTROLMESSAGE._serialized_end=508 - _CONTROLIHAVE._serialized_start=510 - _CONTROLIHAVE._serialized_end=561 - _CONTROLIWANT._serialized_start=563 - _CONTROLIWANT._serialized_end=597 - _CONTROLGRAFT._serialized_start=599 - _CONTROLGRAFT._serialized_end=630 - _CONTROLPRUNE._serialized_start=632 - _CONTROLPRUNE._serialized_end=663 - _TOPICDESCRIPTOR._serialized_start=666 - _TOPICDESCRIPTOR._serialized_end=1057 - _TOPICDESCRIPTOR_AUTHOPTS._serialized_start=799 - _TOPICDESCRIPTOR_AUTHOPTS._serialized_end=923 - _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_start=885 - _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_end=923 - _TOPICDESCRIPTOR_ENCOPTS._serialized_start=926 - _TOPICDESCRIPTOR_ENCOPTS._serialized_end=1057 - _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_start=1014 - _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_end=1057 + _RPC._serialized_start=25 + _RPC._serialized_end=205 + _RPC_SUBOPTS._serialized_start=160 + _RPC_SUBOPTS._serialized_end=205 + _MESSAGE._serialized_start=207 + _MESSAGE._serialized_end=312 + _CONTROLMESSAGE._serialized_start=315 + _CONTROLMESSAGE._serialized_end=491 + _CONTROLIHAVE._serialized_start=493 + _CONTROLIHAVE._serialized_end=544 + _CONTROLIWANT._serialized_start=546 + _CONTROLIWANT._serialized_end=580 + _CONTROLGRAFT._serialized_start=582 + _CONTROLGRAFT._serialized_end=613 + _CONTROLPRUNE._serialized_start=615 + _CONTROLPRUNE._serialized_end=699 + _PEERINFO._serialized_start=701 + _PEERINFO._serialized_end=753 + _TOPICDESCRIPTOR._serialized_start=756 + _TOPICDESCRIPTOR._serialized_end=1147 + _TOPICDESCRIPTOR_AUTHOPTS._serialized_start=889 + _TOPICDESCRIPTOR_AUTHOPTS._serialized_end=1013 + _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_start=975 + _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_end=1013 + _TOPICDESCRIPTOR_ENCOPTS._serialized_start=1016 + _TOPICDESCRIPTOR_ENCOPTS._serialized_end=1147 + _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_start=1104 + _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_end=1147 # @@protoc_insertion_point(module_scope) diff --git a/libp2p/pubsub/pb/rpc_pb2.pyi b/libp2p/pubsub/pb/rpc_pb2.pyi index 232d90d2..88738e2e 100644 --- a/libp2p/pubsub/pb/rpc_pb2.pyi +++ b/libp2p/pubsub/pb/rpc_pb2.pyi @@ -179,17 +179,43 @@ class ControlPrune(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor TOPICID_FIELD_NUMBER: builtins.int + PEERS_FIELD_NUMBER: builtins.int + BACKOFF_FIELD_NUMBER: builtins.int topicID: builtins.str + backoff: builtins.int + @property + def peers(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___PeerInfo]: ... def __init__( self, *, topicID: builtins.str | None = ..., + peers: collections.abc.Iterable[global___PeerInfo] | None = ..., + backoff: builtins.int | None = ..., ) -> None: ... - def HasField(self, field_name: typing.Literal["topicID", b"topicID"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["topicID", b"topicID"]) -> None: ... + def HasField(self, field_name: typing.Literal["backoff", b"backoff", "topicID", b"topicID"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["backoff", b"backoff", "peers", b"peers", "topicID", b"topicID"]) -> None: ... global___ControlPrune = ControlPrune +@typing.final +class PeerInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + PEERID_FIELD_NUMBER: builtins.int + SIGNEDPEERRECORD_FIELD_NUMBER: builtins.int + peerID: builtins.bytes + signedPeerRecord: builtins.bytes + def __init__( + self, + *, + peerID: builtins.bytes | None = ..., + signedPeerRecord: builtins.bytes | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["peerID", b"peerID", "signedPeerRecord", b"signedPeerRecord"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["peerID", b"peerID", "signedPeerRecord", b"signedPeerRecord"]) -> None: ... + +global___PeerInfo = PeerInfo + @typing.final class TopicDescriptor(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor