From 56526b48707de39da8c74e68c31775f38a8352be Mon Sep 17 00:00:00 2001 From: lla-dane Date: Mon, 11 Aug 2025 18:27:11 +0530 Subject: [PATCH] signed-peer-record transfer integrated with pubsub rpc message trasfer --- libp2p/pubsub/floodsub.py | 10 + libp2p/pubsub/gossipsub.py | 53 +++++ libp2p/pubsub/pb/rpc.proto | 1 + libp2p/pubsub/pb/rpc_pb2.py | 67 +++--- libp2p/pubsub/pb/rpc_pb2.pyi | 435 ++++++++++------------------------- libp2p/pubsub/pubsub.py | 46 ++++ 6 files changed, 266 insertions(+), 346 deletions(-) diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 3e0d454f..170f558d 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -15,6 +15,7 @@ from libp2p.custom_types import ( from libp2p.peer.id import ( ID, ) +from libp2p.peer.peerstore import create_signed_peer_record from .exceptions import ( PubsubRouterError, @@ -103,6 +104,15 @@ class FloodSub(IPubsubRouter): ) rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg]) + # Add the senderRecord of the peer in the RPC msg + if isinstance(self.pubsub, Pubsub): + envelope = create_signed_peer_record( + self.pubsub.host.get_id(), + self.pubsub.host.get_addrs(), + self.pubsub.host.get_private_key(), + ) + rpc_msg.senderRecord = envelope.marshal_envelope() + logger.debug("publishing message %s", pubsub_msg) if self.pubsub is None: diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index c345c138..b7c70c55 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -24,6 +24,7 @@ from libp2p.abc import ( from libp2p.custom_types import ( TProtocol, ) +from libp2p.peer.envelope import consume_envelope from libp2p.peer.id import ( ID, ) @@ -34,6 +35,7 @@ from libp2p.peer.peerinfo import ( ) from libp2p.peer.peerstore import ( PERMANENT_ADDR_TTL, + create_signed_peer_record, ) from libp2p.pubsub import ( floodsub, @@ -226,6 +228,27 @@ class GossipSub(IPubsubRouter, Service): :param rpc: RPC message :param sender_peer_id: id of the peer who sent the message """ + # Process the senderRecord if sent + if isinstance(self.pubsub, Pubsub): + if rpc.HasField("senderRecord"): + try: + # Convert the signed-peer-record(Envelope) from + # protobuf bytes + envelope, _ = consume_envelope( + rpc.senderRecord, "libp2p-peer-record" + ) + # Use the default TTL of 2 hours (7200 seconds) + if self.pubsub.host.get_peerstore().consume_peer_record( + envelope, 7200 + ): + logger.error( + "Updating the Certified-Addr-Book was unsuccessful" + ) + except Exception as e: + logger.error( + "Error updating the certified addr book for peer: %s", e + ) + control_message = rpc.control # Relay each rpc control message to the appropriate handler @@ -253,6 +276,15 @@ class GossipSub(IPubsubRouter, Service): ) rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg]) + # Add the senderRecord of the peer in the RPC msg + if isinstance(self.pubsub, Pubsub): + envelope = create_signed_peer_record( + self.pubsub.host.get_id(), + self.pubsub.host.get_addrs(), + self.pubsub.host.get_private_key(), + ) + rpc_msg.senderRecord = envelope.marshal_envelope() + logger.debug("publishing message %s", pubsub_msg) for peer_id in peers_gen: @@ -818,6 +850,17 @@ class GossipSub(IPubsubRouter, Service): # 1) Package these messages into a single packet packet: rpc_pb2.RPC = rpc_pb2.RPC() + # Here the an RPC message is being created and published in response + # to the iwant control msg, so we will send a freshly created senderRecord + # with the RPC msg + if isinstance(self.pubsub, Pubsub): + envelope = create_signed_peer_record( + self.pubsub.host.get_id(), + self.pubsub.host.get_addrs(), + self.pubsub.host.get_private_key(), + ) + packet.senderRecord = envelope.marshal_envelope() + packet.publish.extend(msgs_to_forward) if self.pubsub is None: @@ -973,6 +1016,16 @@ class GossipSub(IPubsubRouter, Service): raise NoPubsubAttached # Add control message to packet packet: rpc_pb2.RPC = rpc_pb2.RPC() + + # Add the sender's peer-record in the RPC msg + if isinstance(self.pubsub, Pubsub): + envelope = create_signed_peer_record( + self.pubsub.host.get_id(), + self.pubsub.host.get_addrs(), + self.pubsub.host.get_private_key(), + ) + packet.senderRecord = envelope.marshal_envelope() + packet.control.CopyFrom(control_msg) # Get stream for peer from pubsub diff --git a/libp2p/pubsub/pb/rpc.proto b/libp2p/pubsub/pb/rpc.proto index 7abce0d6..d24db281 100644 --- a/libp2p/pubsub/pb/rpc.proto +++ b/libp2p/pubsub/pb/rpc.proto @@ -14,6 +14,7 @@ message RPC { } optional ControlMessage control = 3; + optional bytes senderRecord = 4; } message Message { diff --git a/libp2p/pubsub/pb/rpc_pb2.py b/libp2p/pubsub/pb/rpc_pb2.py index 30f0281b..e4a35745 100644 --- a/libp2p/pubsub/pb/rpc_pb2.py +++ b/libp2p/pubsub/pb/rpc_pb2.py @@ -1,11 +1,12 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: libp2p/pubsub/pb/rpc.proto +# Protobuf Python Version: 4.25.3 """Generated protocol buffer code.""" -from google.protobuf.internal import builder as _builder from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -13,39 +14,39 @@ _sym_db = _symbol_database.Default() -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xca\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x12\x14\n\x0csenderRecord\x18\x04 \x01(\x0c\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') -_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.pubsub.pb.rpc_pb2', globals()) +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.pubsub.pb.rpc_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: - DESCRIPTOR._options = None - _RPC._serialized_start=42 - _RPC._serialized_end=222 - _RPC_SUBOPTS._serialized_start=177 - _RPC_SUBOPTS._serialized_end=222 - _MESSAGE._serialized_start=224 - _MESSAGE._serialized_end=329 - _CONTROLMESSAGE._serialized_start=332 - _CONTROLMESSAGE._serialized_end=508 - _CONTROLIHAVE._serialized_start=510 - _CONTROLIHAVE._serialized_end=561 - _CONTROLIWANT._serialized_start=563 - _CONTROLIWANT._serialized_end=597 - _CONTROLGRAFT._serialized_start=599 - _CONTROLGRAFT._serialized_end=630 - _CONTROLPRUNE._serialized_start=632 - _CONTROLPRUNE._serialized_end=716 - _PEERINFO._serialized_start=718 - _PEERINFO._serialized_end=770 - _TOPICDESCRIPTOR._serialized_start=773 - _TOPICDESCRIPTOR._serialized_end=1164 - _TOPICDESCRIPTOR_AUTHOPTS._serialized_start=906 - _TOPICDESCRIPTOR_AUTHOPTS._serialized_end=1030 - _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_start=992 - _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_end=1030 - _TOPICDESCRIPTOR_ENCOPTS._serialized_start=1033 - _TOPICDESCRIPTOR_ENCOPTS._serialized_end=1164 - _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_start=1121 - _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_end=1164 + _globals['_RPC']._serialized_start=42 + _globals['_RPC']._serialized_end=244 + _globals['_RPC_SUBOPTS']._serialized_start=199 + _globals['_RPC_SUBOPTS']._serialized_end=244 + _globals['_MESSAGE']._serialized_start=246 + _globals['_MESSAGE']._serialized_end=351 + _globals['_CONTROLMESSAGE']._serialized_start=354 + _globals['_CONTROLMESSAGE']._serialized_end=530 + _globals['_CONTROLIHAVE']._serialized_start=532 + _globals['_CONTROLIHAVE']._serialized_end=583 + _globals['_CONTROLIWANT']._serialized_start=585 + _globals['_CONTROLIWANT']._serialized_end=619 + _globals['_CONTROLGRAFT']._serialized_start=621 + _globals['_CONTROLGRAFT']._serialized_end=652 + _globals['_CONTROLPRUNE']._serialized_start=654 + _globals['_CONTROLPRUNE']._serialized_end=738 + _globals['_PEERINFO']._serialized_start=740 + _globals['_PEERINFO']._serialized_end=792 + _globals['_TOPICDESCRIPTOR']._serialized_start=795 + _globals['_TOPICDESCRIPTOR']._serialized_end=1186 + _globals['_TOPICDESCRIPTOR_AUTHOPTS']._serialized_start=928 + _globals['_TOPICDESCRIPTOR_AUTHOPTS']._serialized_end=1052 + _globals['_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE']._serialized_start=1014 + _globals['_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE']._serialized_end=1052 + _globals['_TOPICDESCRIPTOR_ENCOPTS']._serialized_start=1055 + _globals['_TOPICDESCRIPTOR_ENCOPTS']._serialized_end=1186 + _globals['_TOPICDESCRIPTOR_ENCOPTS_ENCMODE']._serialized_start=1143 + _globals['_TOPICDESCRIPTOR_ENCOPTS_ENCMODE']._serialized_end=1186 # @@protoc_insertion_point(module_scope) diff --git a/libp2p/pubsub/pb/rpc_pb2.pyi b/libp2p/pubsub/pb/rpc_pb2.pyi index 88738e2e..2609fd11 100644 --- a/libp2p/pubsub/pb/rpc_pb2.pyi +++ b/libp2p/pubsub/pb/rpc_pb2.pyi @@ -1,323 +1,132 @@ -""" -@generated by mypy-protobuf. Do not edit manually! -isort:skip_file -Modified from https://github.com/libp2p/go-libp2p-pubsub/blob/master/pb/rpc.proto""" +from google.protobuf.internal import containers as _containers +from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union -import builtins -import collections.abc -import google.protobuf.descriptor -import google.protobuf.internal.containers -import google.protobuf.internal.enum_type_wrapper -import google.protobuf.message -import sys -import typing +DESCRIPTOR: _descriptor.FileDescriptor -if sys.version_info >= (3, 10): - import typing as typing_extensions -else: - import typing_extensions +class RPC(_message.Message): + __slots__ = ("subscriptions", "publish", "control", "senderRecord") + class SubOpts(_message.Message): + __slots__ = ("subscribe", "topicid") + SUBSCRIBE_FIELD_NUMBER: _ClassVar[int] + TOPICID_FIELD_NUMBER: _ClassVar[int] + subscribe: bool + topicid: str + def __init__(self, subscribe: bool = ..., topicid: _Optional[str] = ...) -> None: ... + SUBSCRIPTIONS_FIELD_NUMBER: _ClassVar[int] + PUBLISH_FIELD_NUMBER: _ClassVar[int] + CONTROL_FIELD_NUMBER: _ClassVar[int] + SENDERRECORD_FIELD_NUMBER: _ClassVar[int] + subscriptions: _containers.RepeatedCompositeFieldContainer[RPC.SubOpts] + publish: _containers.RepeatedCompositeFieldContainer[Message] + control: ControlMessage + senderRecord: bytes + def __init__(self, subscriptions: _Optional[_Iterable[_Union[RPC.SubOpts, _Mapping]]] = ..., publish: _Optional[_Iterable[_Union[Message, _Mapping]]] = ..., control: _Optional[_Union[ControlMessage, _Mapping]] = ..., senderRecord: _Optional[bytes] = ...) -> None: ... # type: ignore -DESCRIPTOR: google.protobuf.descriptor.FileDescriptor +class Message(_message.Message): + __slots__ = ("from_id", "data", "seqno", "topicIDs", "signature", "key") + FROM_ID_FIELD_NUMBER: _ClassVar[int] + DATA_FIELD_NUMBER: _ClassVar[int] + SEQNO_FIELD_NUMBER: _ClassVar[int] + TOPICIDS_FIELD_NUMBER: _ClassVar[int] + SIGNATURE_FIELD_NUMBER: _ClassVar[int] + KEY_FIELD_NUMBER: _ClassVar[int] + from_id: bytes + data: bytes + seqno: bytes + topicIDs: _containers.RepeatedScalarFieldContainer[str] + signature: bytes + key: bytes + def __init__(self, from_id: _Optional[bytes] = ..., data: _Optional[bytes] = ..., seqno: _Optional[bytes] = ..., topicIDs: _Optional[_Iterable[str]] = ..., signature: _Optional[bytes] = ..., key: _Optional[bytes] = ...) -> None: ... -@typing.final -class RPC(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor +class ControlMessage(_message.Message): + __slots__ = ("ihave", "iwant", "graft", "prune") + IHAVE_FIELD_NUMBER: _ClassVar[int] + IWANT_FIELD_NUMBER: _ClassVar[int] + GRAFT_FIELD_NUMBER: _ClassVar[int] + PRUNE_FIELD_NUMBER: _ClassVar[int] + ihave: _containers.RepeatedCompositeFieldContainer[ControlIHave] + iwant: _containers.RepeatedCompositeFieldContainer[ControlIWant] + graft: _containers.RepeatedCompositeFieldContainer[ControlGraft] + prune: _containers.RepeatedCompositeFieldContainer[ControlPrune] + def __init__(self, ihave: _Optional[_Iterable[_Union[ControlIHave, _Mapping]]] = ..., iwant: _Optional[_Iterable[_Union[ControlIWant, _Mapping]]] = ..., graft: _Optional[_Iterable[_Union[ControlGraft, _Mapping]]] = ..., prune: _Optional[_Iterable[_Union[ControlPrune, _Mapping]]] = ...) -> None: ... # type: ignore - @typing.final - class SubOpts(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor +class ControlIHave(_message.Message): + __slots__ = ("topicID", "messageIDs") + TOPICID_FIELD_NUMBER: _ClassVar[int] + MESSAGEIDS_FIELD_NUMBER: _ClassVar[int] + topicID: str + messageIDs: _containers.RepeatedScalarFieldContainer[str] + def __init__(self, topicID: _Optional[str] = ..., messageIDs: _Optional[_Iterable[str]] = ...) -> None: ... - SUBSCRIBE_FIELD_NUMBER: builtins.int - TOPICID_FIELD_NUMBER: builtins.int - subscribe: builtins.bool - """subscribe or unsubscribe""" - topicid: builtins.str - def __init__( - self, - *, - subscribe: builtins.bool | None = ..., - topicid: builtins.str | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["subscribe", b"subscribe", "topicid", b"topicid"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["subscribe", b"subscribe", "topicid", b"topicid"]) -> None: ... +class ControlIWant(_message.Message): + __slots__ = ("messageIDs",) + MESSAGEIDS_FIELD_NUMBER: _ClassVar[int] + messageIDs: _containers.RepeatedScalarFieldContainer[str] + def __init__(self, messageIDs: _Optional[_Iterable[str]] = ...) -> None: ... - SUBSCRIPTIONS_FIELD_NUMBER: builtins.int - PUBLISH_FIELD_NUMBER: builtins.int - CONTROL_FIELD_NUMBER: builtins.int - @property - def subscriptions(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RPC.SubOpts]: ... - @property - def publish(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___Message]: ... - @property - def control(self) -> global___ControlMessage: ... - def __init__( - self, - *, - subscriptions: collections.abc.Iterable[global___RPC.SubOpts] | None = ..., - publish: collections.abc.Iterable[global___Message] | None = ..., - control: global___ControlMessage | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["control", b"control"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["control", b"control", "publish", b"publish", "subscriptions", b"subscriptions"]) -> None: ... +class ControlGraft(_message.Message): + __slots__ = ("topicID",) + TOPICID_FIELD_NUMBER: _ClassVar[int] + topicID: str + def __init__(self, topicID: _Optional[str] = ...) -> None: ... -global___RPC = RPC +class ControlPrune(_message.Message): + __slots__ = ("topicID", "peers", "backoff") + TOPICID_FIELD_NUMBER: _ClassVar[int] + PEERS_FIELD_NUMBER: _ClassVar[int] + BACKOFF_FIELD_NUMBER: _ClassVar[int] + topicID: str + peers: _containers.RepeatedCompositeFieldContainer[PeerInfo] + backoff: int + def __init__(self, topicID: _Optional[str] = ..., peers: _Optional[_Iterable[_Union[PeerInfo, _Mapping]]] = ..., backoff: _Optional[int] = ...) -> None: ... # type: ignore -@typing.final -class Message(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor +class PeerInfo(_message.Message): + __slots__ = ("peerID", "signedPeerRecord") + PEERID_FIELD_NUMBER: _ClassVar[int] + SIGNEDPEERRECORD_FIELD_NUMBER: _ClassVar[int] + peerID: bytes + signedPeerRecord: bytes + def __init__(self, peerID: _Optional[bytes] = ..., signedPeerRecord: _Optional[bytes] = ...) -> None: ... - FROM_ID_FIELD_NUMBER: builtins.int - DATA_FIELD_NUMBER: builtins.int - SEQNO_FIELD_NUMBER: builtins.int - TOPICIDS_FIELD_NUMBER: builtins.int - SIGNATURE_FIELD_NUMBER: builtins.int - KEY_FIELD_NUMBER: builtins.int - from_id: builtins.bytes - data: builtins.bytes - seqno: builtins.bytes - signature: builtins.bytes - key: builtins.bytes - @property - def topicIDs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ... - def __init__( - self, - *, - from_id: builtins.bytes | None = ..., - data: builtins.bytes | None = ..., - seqno: builtins.bytes | None = ..., - topicIDs: collections.abc.Iterable[builtins.str] | None = ..., - signature: builtins.bytes | None = ..., - key: builtins.bytes | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["data", b"data", "from_id", b"from_id", "key", b"key", "seqno", b"seqno", "signature", b"signature"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["data", b"data", "from_id", b"from_id", "key", b"key", "seqno", b"seqno", "signature", b"signature", "topicIDs", b"topicIDs"]) -> None: ... - -global___Message = Message - -@typing.final -class ControlMessage(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - IHAVE_FIELD_NUMBER: builtins.int - IWANT_FIELD_NUMBER: builtins.int - GRAFT_FIELD_NUMBER: builtins.int - PRUNE_FIELD_NUMBER: builtins.int - @property - def ihave(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlIHave]: ... - @property - def iwant(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlIWant]: ... - @property - def graft(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlGraft]: ... - @property - def prune(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlPrune]: ... - def __init__( - self, - *, - ihave: collections.abc.Iterable[global___ControlIHave] | None = ..., - iwant: collections.abc.Iterable[global___ControlIWant] | None = ..., - graft: collections.abc.Iterable[global___ControlGraft] | None = ..., - prune: collections.abc.Iterable[global___ControlPrune] | None = ..., - ) -> None: ... - def ClearField(self, field_name: typing.Literal["graft", b"graft", "ihave", b"ihave", "iwant", b"iwant", "prune", b"prune"]) -> None: ... - -global___ControlMessage = ControlMessage - -@typing.final -class ControlIHave(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - TOPICID_FIELD_NUMBER: builtins.int - MESSAGEIDS_FIELD_NUMBER: builtins.int - topicID: builtins.str - @property - def messageIDs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ... - def __init__( - self, - *, - topicID: builtins.str | None = ..., - messageIDs: collections.abc.Iterable[builtins.str] | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["topicID", b"topicID"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["messageIDs", b"messageIDs", "topicID", b"topicID"]) -> None: ... - -global___ControlIHave = ControlIHave - -@typing.final -class ControlIWant(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - MESSAGEIDS_FIELD_NUMBER: builtins.int - @property - def messageIDs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ... - def __init__( - self, - *, - messageIDs: collections.abc.Iterable[builtins.str] | None = ..., - ) -> None: ... - def ClearField(self, field_name: typing.Literal["messageIDs", b"messageIDs"]) -> None: ... - -global___ControlIWant = ControlIWant - -@typing.final -class ControlGraft(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - TOPICID_FIELD_NUMBER: builtins.int - topicID: builtins.str - def __init__( - self, - *, - topicID: builtins.str | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["topicID", b"topicID"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["topicID", b"topicID"]) -> None: ... - -global___ControlGraft = ControlGraft - -@typing.final -class ControlPrune(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - TOPICID_FIELD_NUMBER: builtins.int - PEERS_FIELD_NUMBER: builtins.int - BACKOFF_FIELD_NUMBER: builtins.int - topicID: builtins.str - backoff: builtins.int - @property - def peers(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___PeerInfo]: ... - def __init__( - self, - *, - topicID: builtins.str | None = ..., - peers: collections.abc.Iterable[global___PeerInfo] | None = ..., - backoff: builtins.int | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["backoff", b"backoff", "topicID", b"topicID"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["backoff", b"backoff", "peers", b"peers", "topicID", b"topicID"]) -> None: ... - -global___ControlPrune = ControlPrune - -@typing.final -class PeerInfo(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - PEERID_FIELD_NUMBER: builtins.int - SIGNEDPEERRECORD_FIELD_NUMBER: builtins.int - peerID: builtins.bytes - signedPeerRecord: builtins.bytes - def __init__( - self, - *, - peerID: builtins.bytes | None = ..., - signedPeerRecord: builtins.bytes | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["peerID", b"peerID", "signedPeerRecord", b"signedPeerRecord"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["peerID", b"peerID", "signedPeerRecord", b"signedPeerRecord"]) -> None: ... - -global___PeerInfo = PeerInfo - -@typing.final -class TopicDescriptor(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - @typing.final - class AuthOpts(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - class _AuthMode: - ValueType = typing.NewType("ValueType", builtins.int) - V: typing_extensions.TypeAlias = ValueType - - class _AuthModeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[TopicDescriptor.AuthOpts._AuthMode.ValueType], builtins.type): - DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor - NONE: TopicDescriptor.AuthOpts._AuthMode.ValueType # 0 - """no authentication, anyone can publish""" - KEY: TopicDescriptor.AuthOpts._AuthMode.ValueType # 1 - """only messages signed by keys in the topic descriptor are accepted""" - WOT: TopicDescriptor.AuthOpts._AuthMode.ValueType # 2 - """web of trust, certificates can allow publisher set to grow""" - - class AuthMode(_AuthMode, metaclass=_AuthModeEnumTypeWrapper): ... - NONE: TopicDescriptor.AuthOpts.AuthMode.ValueType # 0 - """no authentication, anyone can publish""" - KEY: TopicDescriptor.AuthOpts.AuthMode.ValueType # 1 - """only messages signed by keys in the topic descriptor are accepted""" - WOT: TopicDescriptor.AuthOpts.AuthMode.ValueType # 2 - """web of trust, certificates can allow publisher set to grow""" - - MODE_FIELD_NUMBER: builtins.int - KEYS_FIELD_NUMBER: builtins.int - mode: global___TopicDescriptor.AuthOpts.AuthMode.ValueType - @property - def keys(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.bytes]: - """root keys to trust""" - - def __init__( - self, - *, - mode: global___TopicDescriptor.AuthOpts.AuthMode.ValueType | None = ..., - keys: collections.abc.Iterable[builtins.bytes] | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["mode", b"mode"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["keys", b"keys", "mode", b"mode"]) -> None: ... - - @typing.final - class EncOpts(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - class _EncMode: - ValueType = typing.NewType("ValueType", builtins.int) - V: typing_extensions.TypeAlias = ValueType - - class _EncModeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[TopicDescriptor.EncOpts._EncMode.ValueType], builtins.type): - DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor - NONE: TopicDescriptor.EncOpts._EncMode.ValueType # 0 - """no encryption, anyone can read""" - SHAREDKEY: TopicDescriptor.EncOpts._EncMode.ValueType # 1 - """messages are encrypted with shared key""" - WOT: TopicDescriptor.EncOpts._EncMode.ValueType # 2 - """web of trust, certificates can allow publisher set to grow""" - - class EncMode(_EncMode, metaclass=_EncModeEnumTypeWrapper): ... - NONE: TopicDescriptor.EncOpts.EncMode.ValueType # 0 - """no encryption, anyone can read""" - SHAREDKEY: TopicDescriptor.EncOpts.EncMode.ValueType # 1 - """messages are encrypted with shared key""" - WOT: TopicDescriptor.EncOpts.EncMode.ValueType # 2 - """web of trust, certificates can allow publisher set to grow""" - - MODE_FIELD_NUMBER: builtins.int - KEYHASHES_FIELD_NUMBER: builtins.int - mode: global___TopicDescriptor.EncOpts.EncMode.ValueType - @property - def keyHashes(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.bytes]: - """the hashes of the shared keys used (salted)""" - - def __init__( - self, - *, - mode: global___TopicDescriptor.EncOpts.EncMode.ValueType | None = ..., - keyHashes: collections.abc.Iterable[builtins.bytes] | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["mode", b"mode"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["keyHashes", b"keyHashes", "mode", b"mode"]) -> None: ... - - NAME_FIELD_NUMBER: builtins.int - AUTH_FIELD_NUMBER: builtins.int - ENC_FIELD_NUMBER: builtins.int - name: builtins.str - @property - def auth(self) -> global___TopicDescriptor.AuthOpts: ... - @property - def enc(self) -> global___TopicDescriptor.EncOpts: ... - def __init__( - self, - *, - name: builtins.str | None = ..., - auth: global___TopicDescriptor.AuthOpts | None = ..., - enc: global___TopicDescriptor.EncOpts | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["auth", b"auth", "enc", b"enc", "name", b"name"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["auth", b"auth", "enc", b"enc", "name", b"name"]) -> None: ... - -global___TopicDescriptor = TopicDescriptor +class TopicDescriptor(_message.Message): + __slots__ = ("name", "auth", "enc") + class AuthOpts(_message.Message): + __slots__ = ("mode", "keys") + class AuthMode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = () + NONE: _ClassVar[TopicDescriptor.AuthOpts.AuthMode] + KEY: _ClassVar[TopicDescriptor.AuthOpts.AuthMode] + WOT: _ClassVar[TopicDescriptor.AuthOpts.AuthMode] + NONE: TopicDescriptor.AuthOpts.AuthMode + KEY: TopicDescriptor.AuthOpts.AuthMode + WOT: TopicDescriptor.AuthOpts.AuthMode + MODE_FIELD_NUMBER: _ClassVar[int] + KEYS_FIELD_NUMBER: _ClassVar[int] + mode: TopicDescriptor.AuthOpts.AuthMode + keys: _containers.RepeatedScalarFieldContainer[bytes] + def __init__(self, mode: _Optional[_Union[TopicDescriptor.AuthOpts.AuthMode, str]] = ..., keys: _Optional[_Iterable[bytes]] = ...) -> None: ... + class EncOpts(_message.Message): + __slots__ = ("mode", "keyHashes") + class EncMode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = () + NONE: _ClassVar[TopicDescriptor.EncOpts.EncMode] + SHAREDKEY: _ClassVar[TopicDescriptor.EncOpts.EncMode] + WOT: _ClassVar[TopicDescriptor.EncOpts.EncMode] + NONE: TopicDescriptor.EncOpts.EncMode + SHAREDKEY: TopicDescriptor.EncOpts.EncMode + WOT: TopicDescriptor.EncOpts.EncMode + MODE_FIELD_NUMBER: _ClassVar[int] + KEYHASHES_FIELD_NUMBER: _ClassVar[int] + mode: TopicDescriptor.EncOpts.EncMode + keyHashes: _containers.RepeatedScalarFieldContainer[bytes] + def __init__(self, mode: _Optional[_Union[TopicDescriptor.EncOpts.EncMode, str]] = ..., keyHashes: _Optional[_Iterable[bytes]] = ...) -> None: ... + NAME_FIELD_NUMBER: _ClassVar[int] + AUTH_FIELD_NUMBER: _ClassVar[int] + ENC_FIELD_NUMBER: _ClassVar[int] + name: str + auth: TopicDescriptor.AuthOpts + enc: TopicDescriptor.EncOpts + def __init__(self, name: _Optional[str] = ..., auth: _Optional[_Union[TopicDescriptor.AuthOpts, _Mapping]] = ..., enc: _Optional[_Union[TopicDescriptor.EncOpts, _Mapping]] = ...) -> None: ... # type: ignore diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 5641ec5d..54430f1b 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -50,12 +50,14 @@ from libp2p.network.stream.exceptions import ( StreamEOF, StreamReset, ) +from libp2p.peer.envelope import consume_envelope from libp2p.peer.id import ( ID, ) from libp2p.peer.peerdata import ( PeerDataError, ) +from libp2p.peer.peerstore import create_signed_peer_record from libp2p.tools.async_service import ( Service, ) @@ -247,6 +249,14 @@ class Pubsub(Service, IPubsub): packet.subscriptions.extend( [rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)] ) + # Add the sender's signedRecord in the RPC message + envelope = create_signed_peer_record( + self.host.get_id(), + self.host.get_addrs(), + self.host.get_private_key(), + ) + packet.senderRecord = envelope.marshal_envelope() + return packet async def continuously_read_stream(self, stream: INetStream) -> None: @@ -263,6 +273,27 @@ class Pubsub(Service, IPubsub): incoming: bytes = await read_varint_prefixed_bytes(stream) rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC() rpc_incoming.ParseFromString(incoming) + + # Process the sender's signed-record if sent + if rpc_incoming.HasField("senderRecord"): + try: + # Convert the signed-peer-record(Envelope) from + # protobuf bytes + envelope, _ = consume_envelope( + rpc_incoming.senderRecord, "libp2p-peer-record" + ) + # Use the default TTL of 2 hours (7200 seconds) + if self.host.get_peerstore().consume_peer_record( + envelope, 7200 + ): + logger.error( + "Updating the Certified-Addr-Book was unsuccessful" + ) + except Exception as e: + logger.error( + "Error updating the certified addr book for peer: %s", e + ) + if rpc_incoming.publish: # deal with RPC.publish for msg in rpc_incoming.publish: @@ -572,6 +603,14 @@ class Pubsub(Service, IPubsub): [rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)] ) + # Add the senderRecord of the peer in the RPC msg + envelope = create_signed_peer_record( + self.host.get_id(), + self.host.get_addrs(), + self.host.get_private_key(), + ) + packet.senderRecord = envelope.marshal_envelope() + # Send out subscribe message to all peers await self.message_all_peers(packet.SerializeToString()) @@ -604,6 +643,13 @@ class Pubsub(Service, IPubsub): packet.subscriptions.extend( [rpc_pb2.RPC.SubOpts(subscribe=False, topicid=topic_id)] ) + # Add the senderRecord of the peer in the RPC msg + envelope = create_signed_peer_record( + self.host.get_id(), + self.host.get_addrs(), + self.host.get_private_key(), + ) + packet.senderRecord = envelope.marshal_envelope() # Send out unsubscribe message to all peers await self.message_all_peers(packet.SerializeToString())