Merge pull request #889 from lla-dane/pubsub-record

Signed-Peer-Record support in Pubsub/Gossipsub message transfer
This commit is contained in:
Manu Sheel Gupta
2025-09-02 01:39:44 +05:30
committed by GitHub
9 changed files with 363 additions and 346 deletions

View File

@ -15,6 +15,7 @@ from libp2p.custom_types import (
from libp2p.peer.id import (
ID,
)
from libp2p.peer.peerstore import env_to_send_in_RPC
from .exceptions import (
PubsubRouterError,
@ -103,6 +104,11 @@ class FloodSub(IPubsubRouter):
)
rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg])
# Add the senderRecord of the peer in the RPC msg
if isinstance(self.pubsub, Pubsub):
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
rpc_msg.senderRecord = envelope_bytes
logger.debug("publishing message %s", pubsub_msg)
if self.pubsub is None:

View File

@ -34,10 +34,12 @@ from libp2p.peer.peerinfo import (
)
from libp2p.peer.peerstore import (
PERMANENT_ADDR_TTL,
env_to_send_in_RPC,
)
from libp2p.pubsub import (
floodsub,
)
from libp2p.pubsub.utils import maybe_consume_signed_record
from libp2p.tools.async_service import (
Service,
)
@ -226,6 +228,12 @@ class GossipSub(IPubsubRouter, Service):
:param rpc: RPC message
:param sender_peer_id: id of the peer who sent the message
"""
# Process the senderRecord if sent
if isinstance(self.pubsub, Pubsub):
if not maybe_consume_signed_record(rpc, self.pubsub.host, sender_peer_id):
logger.error("Received an invalid-signed-record, ignoring the message")
return
control_message = rpc.control
# Relay each rpc control message to the appropriate handler
@ -253,6 +261,11 @@ class GossipSub(IPubsubRouter, Service):
)
rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg])
# Add the senderRecord of the peer in the RPC msg
if isinstance(self.pubsub, Pubsub):
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
rpc_msg.senderRecord = envelope_bytes
logger.debug("publishing message %s", pubsub_msg)
for peer_id in peers_gen:
@ -818,6 +831,13 @@ class GossipSub(IPubsubRouter, Service):
# 1) Package these messages into a single packet
packet: rpc_pb2.RPC = rpc_pb2.RPC()
# Here the an RPC message is being created and published in response
# to the iwant control msg, so we will send a freshly created senderRecord
# with the RPC msg
if isinstance(self.pubsub, Pubsub):
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
packet.senderRecord = envelope_bytes
packet.publish.extend(msgs_to_forward)
if self.pubsub is None:
@ -973,6 +993,12 @@ class GossipSub(IPubsubRouter, Service):
raise NoPubsubAttached
# Add control message to packet
packet: rpc_pb2.RPC = rpc_pb2.RPC()
# Add the sender's peer-record in the RPC msg
if isinstance(self.pubsub, Pubsub):
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
packet.senderRecord = envelope_bytes
packet.control.CopyFrom(control_msg)
# Get stream for peer from pubsub

View File

@ -14,6 +14,7 @@ message RPC {
}
optional ControlMessage control = 3;
optional bytes senderRecord = 4;
}
message Message {

View File

@ -1,11 +1,12 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: libp2p/pubsub/pb/rpc.proto
# Protobuf Python Version: 4.25.3
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
@ -13,39 +14,39 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02')
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xca\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x12\x14\n\x0csenderRecord\x18\x04 \x01(\x0c\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.pubsub.pb.rpc_pb2', globals())
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.pubsub.pb.rpc_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_RPC._serialized_start=42
_RPC._serialized_end=222
_RPC_SUBOPTS._serialized_start=177
_RPC_SUBOPTS._serialized_end=222
_MESSAGE._serialized_start=224
_MESSAGE._serialized_end=329
_CONTROLMESSAGE._serialized_start=332
_CONTROLMESSAGE._serialized_end=508
_CONTROLIHAVE._serialized_start=510
_CONTROLIHAVE._serialized_end=561
_CONTROLIWANT._serialized_start=563
_CONTROLIWANT._serialized_end=597
_CONTROLGRAFT._serialized_start=599
_CONTROLGRAFT._serialized_end=630
_CONTROLPRUNE._serialized_start=632
_CONTROLPRUNE._serialized_end=716
_PEERINFO._serialized_start=718
_PEERINFO._serialized_end=770
_TOPICDESCRIPTOR._serialized_start=773
_TOPICDESCRIPTOR._serialized_end=1164
_TOPICDESCRIPTOR_AUTHOPTS._serialized_start=906
_TOPICDESCRIPTOR_AUTHOPTS._serialized_end=1030
_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_start=992
_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_end=1030
_TOPICDESCRIPTOR_ENCOPTS._serialized_start=1033
_TOPICDESCRIPTOR_ENCOPTS._serialized_end=1164
_TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_start=1121
_TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_end=1164
_globals['_RPC']._serialized_start=42
_globals['_RPC']._serialized_end=244
_globals['_RPC_SUBOPTS']._serialized_start=199
_globals['_RPC_SUBOPTS']._serialized_end=244
_globals['_MESSAGE']._serialized_start=246
_globals['_MESSAGE']._serialized_end=351
_globals['_CONTROLMESSAGE']._serialized_start=354
_globals['_CONTROLMESSAGE']._serialized_end=530
_globals['_CONTROLIHAVE']._serialized_start=532
_globals['_CONTROLIHAVE']._serialized_end=583
_globals['_CONTROLIWANT']._serialized_start=585
_globals['_CONTROLIWANT']._serialized_end=619
_globals['_CONTROLGRAFT']._serialized_start=621
_globals['_CONTROLGRAFT']._serialized_end=652
_globals['_CONTROLPRUNE']._serialized_start=654
_globals['_CONTROLPRUNE']._serialized_end=738
_globals['_PEERINFO']._serialized_start=740
_globals['_PEERINFO']._serialized_end=792
_globals['_TOPICDESCRIPTOR']._serialized_start=795
_globals['_TOPICDESCRIPTOR']._serialized_end=1186
_globals['_TOPICDESCRIPTOR_AUTHOPTS']._serialized_start=928
_globals['_TOPICDESCRIPTOR_AUTHOPTS']._serialized_end=1052
_globals['_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE']._serialized_start=1014
_globals['_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE']._serialized_end=1052
_globals['_TOPICDESCRIPTOR_ENCOPTS']._serialized_start=1055
_globals['_TOPICDESCRIPTOR_ENCOPTS']._serialized_end=1186
_globals['_TOPICDESCRIPTOR_ENCOPTS_ENCMODE']._serialized_start=1143
_globals['_TOPICDESCRIPTOR_ENCOPTS_ENCMODE']._serialized_end=1186
# @@protoc_insertion_point(module_scope)

View File

@ -1,323 +1,132 @@
"""
@generated by mypy-protobuf. Do not edit manually!
isort:skip_file
Modified from https://github.com/libp2p/go-libp2p-pubsub/blob/master/pb/rpc.proto"""
from google.protobuf.internal import containers as _containers
from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union
import builtins
import collections.abc
import google.protobuf.descriptor
import google.protobuf.internal.containers
import google.protobuf.internal.enum_type_wrapper
import google.protobuf.message
import sys
import typing
DESCRIPTOR: _descriptor.FileDescriptor
if sys.version_info >= (3, 10):
import typing as typing_extensions
else:
import typing_extensions
class RPC(_message.Message):
__slots__ = ("subscriptions", "publish", "control", "senderRecord")
class SubOpts(_message.Message):
__slots__ = ("subscribe", "topicid")
SUBSCRIBE_FIELD_NUMBER: _ClassVar[int]
TOPICID_FIELD_NUMBER: _ClassVar[int]
subscribe: bool
topicid: str
def __init__(self, subscribe: bool = ..., topicid: _Optional[str] = ...) -> None: ...
SUBSCRIPTIONS_FIELD_NUMBER: _ClassVar[int]
PUBLISH_FIELD_NUMBER: _ClassVar[int]
CONTROL_FIELD_NUMBER: _ClassVar[int]
SENDERRECORD_FIELD_NUMBER: _ClassVar[int]
subscriptions: _containers.RepeatedCompositeFieldContainer[RPC.SubOpts]
publish: _containers.RepeatedCompositeFieldContainer[Message]
control: ControlMessage
senderRecord: bytes
def __init__(self, subscriptions: _Optional[_Iterable[_Union[RPC.SubOpts, _Mapping]]] = ..., publish: _Optional[_Iterable[_Union[Message, _Mapping]]] = ..., control: _Optional[_Union[ControlMessage, _Mapping]] = ..., senderRecord: _Optional[bytes] = ...) -> None: ... # type: ignore
DESCRIPTOR: google.protobuf.descriptor.FileDescriptor
class Message(_message.Message):
__slots__ = ("from_id", "data", "seqno", "topicIDs", "signature", "key")
FROM_ID_FIELD_NUMBER: _ClassVar[int]
DATA_FIELD_NUMBER: _ClassVar[int]
SEQNO_FIELD_NUMBER: _ClassVar[int]
TOPICIDS_FIELD_NUMBER: _ClassVar[int]
SIGNATURE_FIELD_NUMBER: _ClassVar[int]
KEY_FIELD_NUMBER: _ClassVar[int]
from_id: bytes
data: bytes
seqno: bytes
topicIDs: _containers.RepeatedScalarFieldContainer[str]
signature: bytes
key: bytes
def __init__(self, from_id: _Optional[bytes] = ..., data: _Optional[bytes] = ..., seqno: _Optional[bytes] = ..., topicIDs: _Optional[_Iterable[str]] = ..., signature: _Optional[bytes] = ..., key: _Optional[bytes] = ...) -> None: ...
@typing.final
class RPC(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
class ControlMessage(_message.Message):
__slots__ = ("ihave", "iwant", "graft", "prune")
IHAVE_FIELD_NUMBER: _ClassVar[int]
IWANT_FIELD_NUMBER: _ClassVar[int]
GRAFT_FIELD_NUMBER: _ClassVar[int]
PRUNE_FIELD_NUMBER: _ClassVar[int]
ihave: _containers.RepeatedCompositeFieldContainer[ControlIHave]
iwant: _containers.RepeatedCompositeFieldContainer[ControlIWant]
graft: _containers.RepeatedCompositeFieldContainer[ControlGraft]
prune: _containers.RepeatedCompositeFieldContainer[ControlPrune]
def __init__(self, ihave: _Optional[_Iterable[_Union[ControlIHave, _Mapping]]] = ..., iwant: _Optional[_Iterable[_Union[ControlIWant, _Mapping]]] = ..., graft: _Optional[_Iterable[_Union[ControlGraft, _Mapping]]] = ..., prune: _Optional[_Iterable[_Union[ControlPrune, _Mapping]]] = ...) -> None: ... # type: ignore
@typing.final
class SubOpts(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
class ControlIHave(_message.Message):
__slots__ = ("topicID", "messageIDs")
TOPICID_FIELD_NUMBER: _ClassVar[int]
MESSAGEIDS_FIELD_NUMBER: _ClassVar[int]
topicID: str
messageIDs: _containers.RepeatedScalarFieldContainer[str]
def __init__(self, topicID: _Optional[str] = ..., messageIDs: _Optional[_Iterable[str]] = ...) -> None: ...
SUBSCRIBE_FIELD_NUMBER: builtins.int
TOPICID_FIELD_NUMBER: builtins.int
subscribe: builtins.bool
"""subscribe or unsubscribe"""
topicid: builtins.str
def __init__(
self,
*,
subscribe: builtins.bool | None = ...,
topicid: builtins.str | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["subscribe", b"subscribe", "topicid", b"topicid"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["subscribe", b"subscribe", "topicid", b"topicid"]) -> None: ...
class ControlIWant(_message.Message):
__slots__ = ("messageIDs",)
MESSAGEIDS_FIELD_NUMBER: _ClassVar[int]
messageIDs: _containers.RepeatedScalarFieldContainer[str]
def __init__(self, messageIDs: _Optional[_Iterable[str]] = ...) -> None: ...
SUBSCRIPTIONS_FIELD_NUMBER: builtins.int
PUBLISH_FIELD_NUMBER: builtins.int
CONTROL_FIELD_NUMBER: builtins.int
@property
def subscriptions(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RPC.SubOpts]: ...
@property
def publish(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___Message]: ...
@property
def control(self) -> global___ControlMessage: ...
def __init__(
self,
*,
subscriptions: collections.abc.Iterable[global___RPC.SubOpts] | None = ...,
publish: collections.abc.Iterable[global___Message] | None = ...,
control: global___ControlMessage | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["control", b"control"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["control", b"control", "publish", b"publish", "subscriptions", b"subscriptions"]) -> None: ...
class ControlGraft(_message.Message):
__slots__ = ("topicID",)
TOPICID_FIELD_NUMBER: _ClassVar[int]
topicID: str
def __init__(self, topicID: _Optional[str] = ...) -> None: ...
global___RPC = RPC
class ControlPrune(_message.Message):
__slots__ = ("topicID", "peers", "backoff")
TOPICID_FIELD_NUMBER: _ClassVar[int]
PEERS_FIELD_NUMBER: _ClassVar[int]
BACKOFF_FIELD_NUMBER: _ClassVar[int]
topicID: str
peers: _containers.RepeatedCompositeFieldContainer[PeerInfo]
backoff: int
def __init__(self, topicID: _Optional[str] = ..., peers: _Optional[_Iterable[_Union[PeerInfo, _Mapping]]] = ..., backoff: _Optional[int] = ...) -> None: ... # type: ignore
@typing.final
class Message(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
class PeerInfo(_message.Message):
__slots__ = ("peerID", "signedPeerRecord")
PEERID_FIELD_NUMBER: _ClassVar[int]
SIGNEDPEERRECORD_FIELD_NUMBER: _ClassVar[int]
peerID: bytes
signedPeerRecord: bytes
def __init__(self, peerID: _Optional[bytes] = ..., signedPeerRecord: _Optional[bytes] = ...) -> None: ...
FROM_ID_FIELD_NUMBER: builtins.int
DATA_FIELD_NUMBER: builtins.int
SEQNO_FIELD_NUMBER: builtins.int
TOPICIDS_FIELD_NUMBER: builtins.int
SIGNATURE_FIELD_NUMBER: builtins.int
KEY_FIELD_NUMBER: builtins.int
from_id: builtins.bytes
data: builtins.bytes
seqno: builtins.bytes
signature: builtins.bytes
key: builtins.bytes
@property
def topicIDs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
def __init__(
self,
*,
from_id: builtins.bytes | None = ...,
data: builtins.bytes | None = ...,
seqno: builtins.bytes | None = ...,
topicIDs: collections.abc.Iterable[builtins.str] | None = ...,
signature: builtins.bytes | None = ...,
key: builtins.bytes | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["data", b"data", "from_id", b"from_id", "key", b"key", "seqno", b"seqno", "signature", b"signature"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["data", b"data", "from_id", b"from_id", "key", b"key", "seqno", b"seqno", "signature", b"signature", "topicIDs", b"topicIDs"]) -> None: ...
global___Message = Message
@typing.final
class ControlMessage(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
IHAVE_FIELD_NUMBER: builtins.int
IWANT_FIELD_NUMBER: builtins.int
GRAFT_FIELD_NUMBER: builtins.int
PRUNE_FIELD_NUMBER: builtins.int
@property
def ihave(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlIHave]: ...
@property
def iwant(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlIWant]: ...
@property
def graft(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlGraft]: ...
@property
def prune(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlPrune]: ...
def __init__(
self,
*,
ihave: collections.abc.Iterable[global___ControlIHave] | None = ...,
iwant: collections.abc.Iterable[global___ControlIWant] | None = ...,
graft: collections.abc.Iterable[global___ControlGraft] | None = ...,
prune: collections.abc.Iterable[global___ControlPrune] | None = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["graft", b"graft", "ihave", b"ihave", "iwant", b"iwant", "prune", b"prune"]) -> None: ...
global___ControlMessage = ControlMessage
@typing.final
class ControlIHave(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
TOPICID_FIELD_NUMBER: builtins.int
MESSAGEIDS_FIELD_NUMBER: builtins.int
topicID: builtins.str
@property
def messageIDs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
def __init__(
self,
*,
topicID: builtins.str | None = ...,
messageIDs: collections.abc.Iterable[builtins.str] | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["topicID", b"topicID"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["messageIDs", b"messageIDs", "topicID", b"topicID"]) -> None: ...
global___ControlIHave = ControlIHave
@typing.final
class ControlIWant(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
MESSAGEIDS_FIELD_NUMBER: builtins.int
@property
def messageIDs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
def __init__(
self,
*,
messageIDs: collections.abc.Iterable[builtins.str] | None = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["messageIDs", b"messageIDs"]) -> None: ...
global___ControlIWant = ControlIWant
@typing.final
class ControlGraft(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
TOPICID_FIELD_NUMBER: builtins.int
topicID: builtins.str
def __init__(
self,
*,
topicID: builtins.str | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["topicID", b"topicID"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["topicID", b"topicID"]) -> None: ...
global___ControlGraft = ControlGraft
@typing.final
class ControlPrune(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
TOPICID_FIELD_NUMBER: builtins.int
PEERS_FIELD_NUMBER: builtins.int
BACKOFF_FIELD_NUMBER: builtins.int
topicID: builtins.str
backoff: builtins.int
@property
def peers(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___PeerInfo]: ...
def __init__(
self,
*,
topicID: builtins.str | None = ...,
peers: collections.abc.Iterable[global___PeerInfo] | None = ...,
backoff: builtins.int | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["backoff", b"backoff", "topicID", b"topicID"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["backoff", b"backoff", "peers", b"peers", "topicID", b"topicID"]) -> None: ...
global___ControlPrune = ControlPrune
@typing.final
class PeerInfo(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
PEERID_FIELD_NUMBER: builtins.int
SIGNEDPEERRECORD_FIELD_NUMBER: builtins.int
peerID: builtins.bytes
signedPeerRecord: builtins.bytes
def __init__(
self,
*,
peerID: builtins.bytes | None = ...,
signedPeerRecord: builtins.bytes | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["peerID", b"peerID", "signedPeerRecord", b"signedPeerRecord"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["peerID", b"peerID", "signedPeerRecord", b"signedPeerRecord"]) -> None: ...
global___PeerInfo = PeerInfo
@typing.final
class TopicDescriptor(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
@typing.final
class AuthOpts(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
class _AuthMode:
ValueType = typing.NewType("ValueType", builtins.int)
V: typing_extensions.TypeAlias = ValueType
class _AuthModeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[TopicDescriptor.AuthOpts._AuthMode.ValueType], builtins.type):
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
NONE: TopicDescriptor.AuthOpts._AuthMode.ValueType # 0
"""no authentication, anyone can publish"""
KEY: TopicDescriptor.AuthOpts._AuthMode.ValueType # 1
"""only messages signed by keys in the topic descriptor are accepted"""
WOT: TopicDescriptor.AuthOpts._AuthMode.ValueType # 2
"""web of trust, certificates can allow publisher set to grow"""
class AuthMode(_AuthMode, metaclass=_AuthModeEnumTypeWrapper): ...
NONE: TopicDescriptor.AuthOpts.AuthMode.ValueType # 0
"""no authentication, anyone can publish"""
KEY: TopicDescriptor.AuthOpts.AuthMode.ValueType # 1
"""only messages signed by keys in the topic descriptor are accepted"""
WOT: TopicDescriptor.AuthOpts.AuthMode.ValueType # 2
"""web of trust, certificates can allow publisher set to grow"""
MODE_FIELD_NUMBER: builtins.int
KEYS_FIELD_NUMBER: builtins.int
mode: global___TopicDescriptor.AuthOpts.AuthMode.ValueType
@property
def keys(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.bytes]:
"""root keys to trust"""
def __init__(
self,
*,
mode: global___TopicDescriptor.AuthOpts.AuthMode.ValueType | None = ...,
keys: collections.abc.Iterable[builtins.bytes] | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["mode", b"mode"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["keys", b"keys", "mode", b"mode"]) -> None: ...
@typing.final
class EncOpts(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
class _EncMode:
ValueType = typing.NewType("ValueType", builtins.int)
V: typing_extensions.TypeAlias = ValueType
class _EncModeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[TopicDescriptor.EncOpts._EncMode.ValueType], builtins.type):
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
NONE: TopicDescriptor.EncOpts._EncMode.ValueType # 0
"""no encryption, anyone can read"""
SHAREDKEY: TopicDescriptor.EncOpts._EncMode.ValueType # 1
"""messages are encrypted with shared key"""
WOT: TopicDescriptor.EncOpts._EncMode.ValueType # 2
"""web of trust, certificates can allow publisher set to grow"""
class EncMode(_EncMode, metaclass=_EncModeEnumTypeWrapper): ...
NONE: TopicDescriptor.EncOpts.EncMode.ValueType # 0
"""no encryption, anyone can read"""
SHAREDKEY: TopicDescriptor.EncOpts.EncMode.ValueType # 1
"""messages are encrypted with shared key"""
WOT: TopicDescriptor.EncOpts.EncMode.ValueType # 2
"""web of trust, certificates can allow publisher set to grow"""
MODE_FIELD_NUMBER: builtins.int
KEYHASHES_FIELD_NUMBER: builtins.int
mode: global___TopicDescriptor.EncOpts.EncMode.ValueType
@property
def keyHashes(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.bytes]:
"""the hashes of the shared keys used (salted)"""
def __init__(
self,
*,
mode: global___TopicDescriptor.EncOpts.EncMode.ValueType | None = ...,
keyHashes: collections.abc.Iterable[builtins.bytes] | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["mode", b"mode"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["keyHashes", b"keyHashes", "mode", b"mode"]) -> None: ...
NAME_FIELD_NUMBER: builtins.int
AUTH_FIELD_NUMBER: builtins.int
ENC_FIELD_NUMBER: builtins.int
name: builtins.str
@property
def auth(self) -> global___TopicDescriptor.AuthOpts: ...
@property
def enc(self) -> global___TopicDescriptor.EncOpts: ...
def __init__(
self,
*,
name: builtins.str | None = ...,
auth: global___TopicDescriptor.AuthOpts | None = ...,
enc: global___TopicDescriptor.EncOpts | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["auth", b"auth", "enc", b"enc", "name", b"name"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["auth", b"auth", "enc", b"enc", "name", b"name"]) -> None: ...
global___TopicDescriptor = TopicDescriptor
class TopicDescriptor(_message.Message):
__slots__ = ("name", "auth", "enc")
class AuthOpts(_message.Message):
__slots__ = ("mode", "keys")
class AuthMode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
NONE: _ClassVar[TopicDescriptor.AuthOpts.AuthMode]
KEY: _ClassVar[TopicDescriptor.AuthOpts.AuthMode]
WOT: _ClassVar[TopicDescriptor.AuthOpts.AuthMode]
NONE: TopicDescriptor.AuthOpts.AuthMode
KEY: TopicDescriptor.AuthOpts.AuthMode
WOT: TopicDescriptor.AuthOpts.AuthMode
MODE_FIELD_NUMBER: _ClassVar[int]
KEYS_FIELD_NUMBER: _ClassVar[int]
mode: TopicDescriptor.AuthOpts.AuthMode
keys: _containers.RepeatedScalarFieldContainer[bytes]
def __init__(self, mode: _Optional[_Union[TopicDescriptor.AuthOpts.AuthMode, str]] = ..., keys: _Optional[_Iterable[bytes]] = ...) -> None: ...
class EncOpts(_message.Message):
__slots__ = ("mode", "keyHashes")
class EncMode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
NONE: _ClassVar[TopicDescriptor.EncOpts.EncMode]
SHAREDKEY: _ClassVar[TopicDescriptor.EncOpts.EncMode]
WOT: _ClassVar[TopicDescriptor.EncOpts.EncMode]
NONE: TopicDescriptor.EncOpts.EncMode
SHAREDKEY: TopicDescriptor.EncOpts.EncMode
WOT: TopicDescriptor.EncOpts.EncMode
MODE_FIELD_NUMBER: _ClassVar[int]
KEYHASHES_FIELD_NUMBER: _ClassVar[int]
mode: TopicDescriptor.EncOpts.EncMode
keyHashes: _containers.RepeatedScalarFieldContainer[bytes]
def __init__(self, mode: _Optional[_Union[TopicDescriptor.EncOpts.EncMode, str]] = ..., keyHashes: _Optional[_Iterable[bytes]] = ...) -> None: ...
NAME_FIELD_NUMBER: _ClassVar[int]
AUTH_FIELD_NUMBER: _ClassVar[int]
ENC_FIELD_NUMBER: _ClassVar[int]
name: str
auth: TopicDescriptor.AuthOpts
enc: TopicDescriptor.EncOpts
def __init__(self, name: _Optional[str] = ..., auth: _Optional[_Union[TopicDescriptor.AuthOpts, _Mapping]] = ..., enc: _Optional[_Union[TopicDescriptor.EncOpts, _Mapping]] = ...) -> None: ... # type: ignore

View File

@ -56,6 +56,8 @@ from libp2p.peer.id import (
from libp2p.peer.peerdata import (
PeerDataError,
)
from libp2p.peer.peerstore import env_to_send_in_RPC
from libp2p.pubsub.utils import maybe_consume_signed_record
from libp2p.tools.async_service import (
Service,
)
@ -247,6 +249,10 @@ class Pubsub(Service, IPubsub):
packet.subscriptions.extend(
[rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)]
)
# Add the sender's signedRecord in the RPC message
envelope_bytes, _ = env_to_send_in_RPC(self.host)
packet.senderRecord = envelope_bytes
return packet
async def continuously_read_stream(self, stream: INetStream) -> None:
@ -263,6 +269,14 @@ class Pubsub(Service, IPubsub):
incoming: bytes = await read_varint_prefixed_bytes(stream)
rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC()
rpc_incoming.ParseFromString(incoming)
# Process the sender's signed-record if sent
if not maybe_consume_signed_record(rpc_incoming, self.host, peer_id):
logger.error(
"Received an invalid-signed-record, ignoring the incoming msg"
)
continue
if rpc_incoming.publish:
# deal with RPC.publish
for msg in rpc_incoming.publish:
@ -572,6 +586,9 @@ class Pubsub(Service, IPubsub):
[rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)]
)
# Add the senderRecord of the peer in the RPC msg
envelope_bytes, _ = env_to_send_in_RPC(self.host)
packet.senderRecord = envelope_bytes
# Send out subscribe message to all peers
await self.message_all_peers(packet.SerializeToString())
@ -604,6 +621,9 @@ class Pubsub(Service, IPubsub):
packet.subscriptions.extend(
[rpc_pb2.RPC.SubOpts(subscribe=False, topicid=topic_id)]
)
# Add the senderRecord of the peer in the RPC msg
envelope_bytes, _ = env_to_send_in_RPC(self.host)
packet.senderRecord = envelope_bytes
# Send out unsubscribe message to all peers
await self.message_all_peers(packet.SerializeToString())

50
libp2p/pubsub/utils.py Normal file
View File

@ -0,0 +1,50 @@
import logging
from libp2p.abc import IHost
from libp2p.peer.envelope import consume_envelope
from libp2p.peer.id import ID
from libp2p.pubsub.pb.rpc_pb2 import RPC
logger = logging.getLogger("pubsub-example.utils")
def maybe_consume_signed_record(msg: RPC, host: IHost, peer_id: ID) -> bool:
"""
Attempt to parse and store a signed-peer-record (Envelope) received during
PubSub communication. If the record is invalid, the peer-id does not match, or
updating the peerstore fails, the function logs an error and returns False.
Parameters
----------
msg : RPC
The protobuf message received during PubSub communication.
host : IHost
The local host instance, providing access to the peerstore for storing
verified peer records.
peer_id : ID | None, optional
The expected peer ID for record validation. If provided, the peer ID
inside the record must match this value.
Returns
-------
bool
True if a valid signed peer record was successfully consumed and stored,
False otherwise.
"""
if msg.HasField("senderRecord"):
try:
# Convert the signed-peer-record(Envelope) from
# protobuf bytes
envelope, record = consume_envelope(msg.senderRecord, "libp2p-peer-record")
if not record.peer_id == peer_id:
return False
# Use the default TTL of 2 hours (7200 seconds)
if not host.get_peerstore().consume_peer_record(envelope, 7200):
logger.error("Failed to update the Certified-Addr-Book")
return False
except Exception as e:
logger.error("Failed to update the Certified-Addr-Book: %s", e)
return False
return True

View File

@ -0,0 +1 @@
PubSub routers now include signed-peer-records in RPC messages for secure peer-info exchange.

View File

@ -8,8 +8,10 @@ from typing import (
from unittest.mock import patch
import pytest
import multiaddr
import trio
from libp2p.crypto.rsa import create_new_key_pair
from libp2p.custom_types import AsyncValidatorFn
from libp2p.exceptions import (
ValidationError,
@ -17,9 +19,11 @@ from libp2p.exceptions import (
from libp2p.network.stream.exceptions import (
StreamEOF,
)
from libp2p.peer.envelope import Envelope, seal_record
from libp2p.peer.id import (
ID,
)
from libp2p.peer.peer_record import PeerRecord
from libp2p.pubsub.pb import (
rpc_pb2,
)
@ -87,6 +91,45 @@ async def test_re_unsubscribe():
assert TESTING_TOPIC not in pubsubs_fsub[0].topic_ids
@pytest.mark.trio
async def test_reissue_when_listen_addrs_change():
async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub:
await connect(pubsubs_fsub[0].host, pubsubs_fsub[1].host)
await pubsubs_fsub[0].subscribe(TESTING_TOPIC)
# Yield to let 0 notify 1
await trio.sleep(1)
assert pubsubs_fsub[0].my_id in pubsubs_fsub[1].peer_topics[TESTING_TOPIC]
# Check whether signed-records were transfered properly in the subscribe call
envelope_b_sub = (
pubsubs_fsub[1]
.host.get_peerstore()
.get_peer_record(pubsubs_fsub[0].host.get_id())
)
assert isinstance(envelope_b_sub, Envelope)
# Simulate pubsubs_fsub[1].host listen addrs changing (different port)
new_addr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/123")
# Patch just for the duration we force A to unsubscribe
with patch.object(pubsubs_fsub[0].host, "get_addrs", return_value=[new_addr]):
# Unsubscribe from A's side so that a new_record is issued
await pubsubs_fsub[0].unsubscribe(TESTING_TOPIC)
await trio.sleep(1)
# B should be holding A's new record with bumped seq
envelope_b_unsub = (
pubsubs_fsub[1]
.host.get_peerstore()
.get_peer_record(pubsubs_fsub[0].host.get_id())
)
assert isinstance(envelope_b_unsub, Envelope)
# This proves that a freshly signed record was issued rather than
# the latest-cached-one creating one.
assert envelope_b_sub.record().seq < envelope_b_unsub.record().seq
@pytest.mark.trio
async def test_peers_subscribe():
async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub:
@ -95,11 +138,71 @@ async def test_peers_subscribe():
# Yield to let 0 notify 1
await trio.sleep(1)
assert pubsubs_fsub[0].my_id in pubsubs_fsub[1].peer_topics[TESTING_TOPIC]
# Check whether signed-records were transfered properly in the subscribe call
envelope_b_sub = (
pubsubs_fsub[1]
.host.get_peerstore()
.get_peer_record(pubsubs_fsub[0].host.get_id())
)
assert isinstance(envelope_b_sub, Envelope)
await pubsubs_fsub[0].unsubscribe(TESTING_TOPIC)
# Yield to let 0 notify 1
await trio.sleep(1)
assert pubsubs_fsub[0].my_id not in pubsubs_fsub[1].peer_topics[TESTING_TOPIC]
envelope_b_unsub = (
pubsubs_fsub[1]
.host.get_peerstore()
.get_peer_record(pubsubs_fsub[0].host.get_id())
)
assert isinstance(envelope_b_unsub, Envelope)
# This proves that the latest-cached-record was re-issued rather than
# freshly creating one.
assert envelope_b_sub.record().seq == envelope_b_unsub.record().seq
@pytest.mark.trio
async def test_peer_subscribe_fail_upon_invald_record_transfer():
async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub:
await connect(pubsubs_fsub[0].host, pubsubs_fsub[1].host)
# Corrupt host_a's local peer record
envelope = pubsubs_fsub[0].host.get_peerstore().get_local_record()
if envelope is not None:
true_record = envelope.record()
key_pair = create_new_key_pair()
if envelope is not None:
envelope.public_key = key_pair.public_key
pubsubs_fsub[0].host.get_peerstore().set_local_record(envelope)
await pubsubs_fsub[0].subscribe(TESTING_TOPIC)
# Yeild to let 0 notify 1
await trio.sleep(1)
assert pubsubs_fsub[0].my_id not in pubsubs_fsub[1].peer_topics.get(
TESTING_TOPIC, set()
)
# Create a corrupt envelope with correct signature but false peer-id
false_record = PeerRecord(
ID.from_pubkey(key_pair.public_key), true_record.addrs
)
false_envelope = seal_record(
false_record, pubsubs_fsub[0].host.get_private_key()
)
pubsubs_fsub[0].host.get_peerstore().set_local_record(false_envelope)
await pubsubs_fsub[0].subscribe(TESTING_TOPIC)
# Yeild to let 0 notify 1
await trio.sleep(1)
assert pubsubs_fsub[0].my_id not in pubsubs_fsub[1].peer_topics.get(
TESTING_TOPIC, set()
)
@pytest.mark.trio
async def test_get_hello_packet():