From b838a0e3b672eb875047acdf3449e65702f5c0ee Mon Sep 17 00:00:00 2001 From: unniznd Date: Tue, 12 Aug 2025 21:50:10 +0530 Subject: [PATCH 01/17] added none type to return value of negotiate and changed caller handles to handle none. Added newsfragment. --- libp2p/host/basic_host.py | 3 +++ libp2p/protocol_muxer/multiselect.py | 2 +- libp2p/security/security_multistream.py | 7 ++++++- libp2p/stream_muxer/muxer_multistream.py | 7 ++++++- newsfragments/837.fix.rst | 1 + 5 files changed, 17 insertions(+), 3 deletions(-) create mode 100644 newsfragments/837.fix.rst diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index 70e41953..008fe7e5 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -288,6 +288,9 @@ class BasicHost(IHost): protocol, handler = await self.multiselect.negotiate( MultiselectCommunicator(net_stream), self.negotiate_timeout ) + if protocol is None: + await net_stream.reset() + raise StreamFailure("No protocol selected") except MultiselectError as error: peer_id = net_stream.muxed_conn.peer_id logger.debug( diff --git a/libp2p/protocol_muxer/multiselect.py b/libp2p/protocol_muxer/multiselect.py index 8d311391..e58c0981 100644 --- a/libp2p/protocol_muxer/multiselect.py +++ b/libp2p/protocol_muxer/multiselect.py @@ -53,7 +53,7 @@ class Multiselect(IMultiselectMuxer): self, communicator: IMultiselectCommunicator, negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, - ) -> tuple[TProtocol, StreamHandlerFn | None]: + ) -> tuple[TProtocol | None, StreamHandlerFn | None]: """ Negotiate performs protocol selection. diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py index 193cc092..d15dbbd9 100644 --- a/libp2p/security/security_multistream.py +++ b/libp2p/security/security_multistream.py @@ -26,6 +26,9 @@ from libp2p.protocol_muxer.multiselect_client import ( from libp2p.protocol_muxer.multiselect_communicator import ( MultiselectCommunicator, ) +from libp2p.transport.exceptions import ( + SecurityUpgradeFailure, +) """ Represents a secured connection object, which includes a connection and details about @@ -104,7 +107,7 @@ class SecurityMultistream(ABC): :param is_initiator: true if we are the initiator, false otherwise :return: selected secure transport """ - protocol: TProtocol + protocol: TProtocol | None communicator = MultiselectCommunicator(conn) if is_initiator: # Select protocol if initiator @@ -114,5 +117,7 @@ class SecurityMultistream(ABC): else: # Select protocol if non-initiator protocol, _ = await self.multiselect.negotiate(communicator) + if protocol is None: + raise SecurityUpgradeFailure("No protocol selected") # Return transport from protocol return self.transports[protocol] diff --git a/libp2p/stream_muxer/muxer_multistream.py b/libp2p/stream_muxer/muxer_multistream.py index 76699c67..d96820a4 100644 --- a/libp2p/stream_muxer/muxer_multistream.py +++ b/libp2p/stream_muxer/muxer_multistream.py @@ -30,6 +30,9 @@ from libp2p.stream_muxer.yamux.yamux import ( PROTOCOL_ID, Yamux, ) +from libp2p.transport.exceptions import ( + MuxerUpgradeFailure, +) class MuxerMultistream: @@ -73,7 +76,7 @@ class MuxerMultistream: :param conn: conn to choose a transport over :return: selected muxer transport """ - protocol: TProtocol + protocol: TProtocol | None communicator = MultiselectCommunicator(conn) if conn.is_initiator: protocol = await self.multiselect_client.select_one_of( @@ -81,6 +84,8 @@ class MuxerMultistream: ) else: protocol, _ = await self.multiselect.negotiate(communicator) + if protocol is 
None: + raise MuxerUpgradeFailure("No protocol selected") return self.transports[protocol] async def new_conn(self, conn: ISecureConn, peer_id: ID) -> IMuxedConn: diff --git a/newsfragments/837.fix.rst b/newsfragments/837.fix.rst new file mode 100644 index 00000000..47919c23 --- /dev/null +++ b/newsfragments/837.fix.rst @@ -0,0 +1 @@ +Added multiselect type consistency in negotiate method. Updates all the usages of the method. From 1ecff5437ce8bbd6c2edf66f12f02466d5d3ad7c Mon Sep 17 00:00:00 2001 From: unniznd Date: Thu, 14 Aug 2025 07:29:06 +0530 Subject: [PATCH 02/17] fixed newsfragment filename issue. --- newsfragments/{837.fix.rst => 837.bugfix.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename newsfragments/{837.fix.rst => 837.bugfix.rst} (100%) diff --git a/newsfragments/837.fix.rst b/newsfragments/837.bugfix.rst similarity index 100% rename from newsfragments/837.fix.rst rename to newsfragments/837.bugfix.rst From 621469734949df6e7b9abecfb4edc585f97766d2 Mon Sep 17 00:00:00 2001 From: unniznd Date: Mon, 25 Aug 2025 23:01:35 +0530 Subject: [PATCH 03/17] removed redundant imports --- libp2p/security/security_multistream.py | 3 --- libp2p/stream_muxer/muxer_multistream.py | 3 --- 2 files changed, 6 deletions(-) diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py index 9b341ed7..a9c4b19c 100644 --- a/libp2p/security/security_multistream.py +++ b/libp2p/security/security_multistream.py @@ -29,9 +29,6 @@ from libp2p.protocol_muxer.multiselect_client import ( from libp2p.protocol_muxer.multiselect_communicator import ( MultiselectCommunicator, ) -from libp2p.transport.exceptions import ( - SecurityUpgradeFailure, -) """ Represents a secured connection object, which includes a connection and details about diff --git a/libp2p/stream_muxer/muxer_multistream.py b/libp2p/stream_muxer/muxer_multistream.py index 4a07b261..322db912 100644 --- a/libp2p/stream_muxer/muxer_multistream.py +++ b/libp2p/stream_muxer/muxer_multistream.py @@ -33,9 +33,6 @@ from libp2p.stream_muxer.yamux.yamux import ( PROTOCOL_ID, Yamux, ) -from libp2p.transport.exceptions import ( - MuxerUpgradeFailure, -) class MuxerMultistream: From c08007feda758dfc16efb29940f153e751d8922c Mon Sep 17 00:00:00 2001 From: unniznd Date: Wed, 27 Aug 2025 21:54:05 +0530 Subject: [PATCH 04/17] improve error message in basic host --- libp2p/host/basic_host.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index 1ef5dda2..ee1bb04d 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -290,7 +290,9 @@ class BasicHost(IHost): ) if protocol is None: await net_stream.reset() - raise StreamFailure("No protocol selected") + raise StreamFailure( + "Failed to negotiate protocol: no protocol selected" + ) except MultiselectError as error: peer_id = net_stream.muxed_conn.peer_id logger.debug( From 9f80dbae12920622416cd774b6db1198965cb718 Mon Sep 17 00:00:00 2001 From: unniznd Date: Wed, 27 Aug 2025 22:05:19 +0530 Subject: [PATCH 05/17] added the testcase for StreamFailure --- tests/core/host/test_basic_host.py | 37 ++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/tests/core/host/test_basic_host.py b/tests/core/host/test_basic_host.py index ed21ad80..635f2863 100644 --- a/tests/core/host/test_basic_host.py +++ b/tests/core/host/test_basic_host.py @@ -1,3 +1,10 @@ +from unittest.mock import ( + AsyncMock, + MagicMock, +) + +import pytest + from libp2p import ( new_swarm, ) @@ 
-10,6 +17,9 @@ from libp2p.host.basic_host import (
from libp2p.host.defaults import (
get_default_protocols,
)
+from libp2p.host.exceptions import (
+    StreamFailure,
+)
def test_default_protocols():
@@ -22,3 +32,30 @@ def test_default_protocols():
# NOTE: comparing keys for equality as handlers may be closures that do not compare
# in the way this test is concerned with
assert handlers.keys() == get_default_protocols(host).keys()
+
+
+@pytest.mark.trio
+async def test_swarm_stream_handler_no_protocol_selected(monkeypatch):
+    key_pair = create_new_key_pair()
+    swarm = new_swarm(key_pair)
+    host = BasicHost(swarm)
+
+    # Create a mock net_stream
+    net_stream = MagicMock()
+    net_stream.reset = AsyncMock()
+    net_stream.muxed_conn.peer_id = "peer-test"
+
+    # Monkeypatch negotiate to simulate "no protocol selected"
+    async def fake_negotiate(comm, timeout):
+        return None, None
+
+    monkeypatch.setattr(host.multiselect, "negotiate", fake_negotiate)
+
+    # Now run the handler and expect StreamFailure
+    with pytest.raises(
+        StreamFailure, match="Failed to negotiate protocol: no protocol selected"
+    ):
+        await host._swarm_stream_handler(net_stream)
+
+    # Ensure reset was called since negotiation failed
+    net_stream.reset.assert_awaited()
From 3c52b859baca1af6ade433569fbd57d083d8f432 Mon Sep 17 00:00:00 2001
From: unniznd
Date: Fri, 29 Aug 2025 11:30:17 +0530
Subject: [PATCH 06/17] improved the error message
---
 libp2p/security/security_multistream.py | 4 +++-
 libp2p/stream_muxer/muxer_multistream.py | 4 +++-
 2 files changed, 6 insertions(+), 2 deletions(-)
diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py
index a9c4b19c..f7c81de1 100644
--- a/libp2p/security/security_multistream.py
+++ b/libp2p/security/security_multistream.py
@@ -118,6 +118,8 @@ class SecurityMultistream(ABC):
# Select protocol if non-initiator
protocol, _ = await self.multiselect.negotiate(communicator)
if protocol is None:
-            raise MultiselectError("fail to negotiate a security protocol")
+            raise MultiselectError(
+                "fail to negotiate a security protocol: no protocol selected"
+            )
# Return transport from protocol
return self.transports[protocol]
diff --git a/libp2p/stream_muxer/muxer_multistream.py b/libp2p/stream_muxer/muxer_multistream.py
index 322db912..76689c17 100644
--- a/libp2p/stream_muxer/muxer_multistream.py
+++ b/libp2p/stream_muxer/muxer_multistream.py
@@ -85,7 +85,9 @@ class MuxerMultistream:
else:
protocol, _ = await self.multiselect.negotiate(communicator)
if protocol is None:
-            raise MultiselectError("fail to negotiate a stream muxer protocol")
+            raise MultiselectError(
+                "fail to negotiate a stream muxer protocol: no protocol selected"
+            )
return self.transports[protocol]
async def new_conn(self, conn: ISecureConn, peer_id: ID) -> IMuxedConn:
From 56526b48707de39da8c74e68c31775f38a8352be Mon Sep 17 00:00:00 2001
From: lla-dane
Date: Mon, 11 Aug 2025 18:27:11 +0530
Subject: [PATCH 07/17] signed-peer-record transfer integrated with pubsub rpc message transfer
---
 libp2p/pubsub/floodsub.py | 10 +
 libp2p/pubsub/gossipsub.py | 53 +++++
 libp2p/pubsub/pb/rpc.proto | 1 +
 libp2p/pubsub/pb/rpc_pb2.py | 67 +++---
 libp2p/pubsub/pb/rpc_pb2.pyi | 435 ++++++++++-------------------------
 libp2p/pubsub/pubsub.py | 46 ++++
 6 files changed, 266 insertions(+), 346 deletions(-)
diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py
index 3e0d454f..170f558d 100644
--- a/libp2p/pubsub/floodsub.py
+++ b/libp2p/pubsub/floodsub.py
@@ -15,6 +15,7 @@ from libp2p.custom_types import (
from
libp2p.peer.id import ( ID, ) +from libp2p.peer.peerstore import create_signed_peer_record from .exceptions import ( PubsubRouterError, @@ -103,6 +104,15 @@ class FloodSub(IPubsubRouter): ) rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg]) + # Add the senderRecord of the peer in the RPC msg + if isinstance(self.pubsub, Pubsub): + envelope = create_signed_peer_record( + self.pubsub.host.get_id(), + self.pubsub.host.get_addrs(), + self.pubsub.host.get_private_key(), + ) + rpc_msg.senderRecord = envelope.marshal_envelope() + logger.debug("publishing message %s", pubsub_msg) if self.pubsub is None: diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index c345c138..b7c70c55 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -24,6 +24,7 @@ from libp2p.abc import ( from libp2p.custom_types import ( TProtocol, ) +from libp2p.peer.envelope import consume_envelope from libp2p.peer.id import ( ID, ) @@ -34,6 +35,7 @@ from libp2p.peer.peerinfo import ( ) from libp2p.peer.peerstore import ( PERMANENT_ADDR_TTL, + create_signed_peer_record, ) from libp2p.pubsub import ( floodsub, @@ -226,6 +228,27 @@ class GossipSub(IPubsubRouter, Service): :param rpc: RPC message :param sender_peer_id: id of the peer who sent the message """ + # Process the senderRecord if sent + if isinstance(self.pubsub, Pubsub): + if rpc.HasField("senderRecord"): + try: + # Convert the signed-peer-record(Envelope) from + # protobuf bytes + envelope, _ = consume_envelope( + rpc.senderRecord, "libp2p-peer-record" + ) + # Use the default TTL of 2 hours (7200 seconds) + if self.pubsub.host.get_peerstore().consume_peer_record( + envelope, 7200 + ): + logger.error( + "Updating the Certified-Addr-Book was unsuccessful" + ) + except Exception as e: + logger.error( + "Error updating the certified addr book for peer: %s", e + ) + control_message = rpc.control # Relay each rpc control message to the appropriate handler @@ -253,6 +276,15 @@ class GossipSub(IPubsubRouter, Service): ) rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg]) + # Add the senderRecord of the peer in the RPC msg + if isinstance(self.pubsub, Pubsub): + envelope = create_signed_peer_record( + self.pubsub.host.get_id(), + self.pubsub.host.get_addrs(), + self.pubsub.host.get_private_key(), + ) + rpc_msg.senderRecord = envelope.marshal_envelope() + logger.debug("publishing message %s", pubsub_msg) for peer_id in peers_gen: @@ -818,6 +850,17 @@ class GossipSub(IPubsubRouter, Service): # 1) Package these messages into a single packet packet: rpc_pb2.RPC = rpc_pb2.RPC() + # Here the an RPC message is being created and published in response + # to the iwant control msg, so we will send a freshly created senderRecord + # with the RPC msg + if isinstance(self.pubsub, Pubsub): + envelope = create_signed_peer_record( + self.pubsub.host.get_id(), + self.pubsub.host.get_addrs(), + self.pubsub.host.get_private_key(), + ) + packet.senderRecord = envelope.marshal_envelope() + packet.publish.extend(msgs_to_forward) if self.pubsub is None: @@ -973,6 +1016,16 @@ class GossipSub(IPubsubRouter, Service): raise NoPubsubAttached # Add control message to packet packet: rpc_pb2.RPC = rpc_pb2.RPC() + + # Add the sender's peer-record in the RPC msg + if isinstance(self.pubsub, Pubsub): + envelope = create_signed_peer_record( + self.pubsub.host.get_id(), + self.pubsub.host.get_addrs(), + self.pubsub.host.get_private_key(), + ) + packet.senderRecord = envelope.marshal_envelope() + packet.control.CopyFrom(control_msg) # Get stream for peer from pubsub diff --git 
a/libp2p/pubsub/pb/rpc.proto b/libp2p/pubsub/pb/rpc.proto index 7abce0d6..d24db281 100644 --- a/libp2p/pubsub/pb/rpc.proto +++ b/libp2p/pubsub/pb/rpc.proto @@ -14,6 +14,7 @@ message RPC { } optional ControlMessage control = 3; + optional bytes senderRecord = 4; } message Message { diff --git a/libp2p/pubsub/pb/rpc_pb2.py b/libp2p/pubsub/pb/rpc_pb2.py index 30f0281b..e4a35745 100644 --- a/libp2p/pubsub/pb/rpc_pb2.py +++ b/libp2p/pubsub/pb/rpc_pb2.py @@ -1,11 +1,12 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: libp2p/pubsub/pb/rpc.proto +# Protobuf Python Version: 4.25.3 """Generated protocol buffer code.""" -from google.protobuf.internal import builder as _builder from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -13,39 +14,39 @@ _sym_db = _symbol_database.Default() -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xca\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 
\x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x12\x14\n\x0csenderRecord\x18\x04 \x01(\x0c\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') -_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.pubsub.pb.rpc_pb2', globals()) +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.pubsub.pb.rpc_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: - DESCRIPTOR._options = None - _RPC._serialized_start=42 - _RPC._serialized_end=222 - _RPC_SUBOPTS._serialized_start=177 - _RPC_SUBOPTS._serialized_end=222 - _MESSAGE._serialized_start=224 - _MESSAGE._serialized_end=329 - _CONTROLMESSAGE._serialized_start=332 - _CONTROLMESSAGE._serialized_end=508 - _CONTROLIHAVE._serialized_start=510 - _CONTROLIHAVE._serialized_end=561 - _CONTROLIWANT._serialized_start=563 - _CONTROLIWANT._serialized_end=597 - _CONTROLGRAFT._serialized_start=599 - _CONTROLGRAFT._serialized_end=630 - _CONTROLPRUNE._serialized_start=632 - _CONTROLPRUNE._serialized_end=716 - _PEERINFO._serialized_start=718 - _PEERINFO._serialized_end=770 - _TOPICDESCRIPTOR._serialized_start=773 - _TOPICDESCRIPTOR._serialized_end=1164 - _TOPICDESCRIPTOR_AUTHOPTS._serialized_start=906 - _TOPICDESCRIPTOR_AUTHOPTS._serialized_end=1030 - _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_start=992 - _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_end=1030 - _TOPICDESCRIPTOR_ENCOPTS._serialized_start=1033 - _TOPICDESCRIPTOR_ENCOPTS._serialized_end=1164 - _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_start=1121 - _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_end=1164 + _globals['_RPC']._serialized_start=42 + _globals['_RPC']._serialized_end=244 + 
_globals['_RPC_SUBOPTS']._serialized_start=199 + _globals['_RPC_SUBOPTS']._serialized_end=244 + _globals['_MESSAGE']._serialized_start=246 + _globals['_MESSAGE']._serialized_end=351 + _globals['_CONTROLMESSAGE']._serialized_start=354 + _globals['_CONTROLMESSAGE']._serialized_end=530 + _globals['_CONTROLIHAVE']._serialized_start=532 + _globals['_CONTROLIHAVE']._serialized_end=583 + _globals['_CONTROLIWANT']._serialized_start=585 + _globals['_CONTROLIWANT']._serialized_end=619 + _globals['_CONTROLGRAFT']._serialized_start=621 + _globals['_CONTROLGRAFT']._serialized_end=652 + _globals['_CONTROLPRUNE']._serialized_start=654 + _globals['_CONTROLPRUNE']._serialized_end=738 + _globals['_PEERINFO']._serialized_start=740 + _globals['_PEERINFO']._serialized_end=792 + _globals['_TOPICDESCRIPTOR']._serialized_start=795 + _globals['_TOPICDESCRIPTOR']._serialized_end=1186 + _globals['_TOPICDESCRIPTOR_AUTHOPTS']._serialized_start=928 + _globals['_TOPICDESCRIPTOR_AUTHOPTS']._serialized_end=1052 + _globals['_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE']._serialized_start=1014 + _globals['_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE']._serialized_end=1052 + _globals['_TOPICDESCRIPTOR_ENCOPTS']._serialized_start=1055 + _globals['_TOPICDESCRIPTOR_ENCOPTS']._serialized_end=1186 + _globals['_TOPICDESCRIPTOR_ENCOPTS_ENCMODE']._serialized_start=1143 + _globals['_TOPICDESCRIPTOR_ENCOPTS_ENCMODE']._serialized_end=1186 # @@protoc_insertion_point(module_scope) diff --git a/libp2p/pubsub/pb/rpc_pb2.pyi b/libp2p/pubsub/pb/rpc_pb2.pyi index 88738e2e..2609fd11 100644 --- a/libp2p/pubsub/pb/rpc_pb2.pyi +++ b/libp2p/pubsub/pb/rpc_pb2.pyi @@ -1,323 +1,132 @@ -""" -@generated by mypy-protobuf. Do not edit manually! -isort:skip_file -Modified from https://github.com/libp2p/go-libp2p-pubsub/blob/master/pb/rpc.proto""" +from google.protobuf.internal import containers as _containers +from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union -import builtins -import collections.abc -import google.protobuf.descriptor -import google.protobuf.internal.containers -import google.protobuf.internal.enum_type_wrapper -import google.protobuf.message -import sys -import typing +DESCRIPTOR: _descriptor.FileDescriptor -if sys.version_info >= (3, 10): - import typing as typing_extensions -else: - import typing_extensions +class RPC(_message.Message): + __slots__ = ("subscriptions", "publish", "control", "senderRecord") + class SubOpts(_message.Message): + __slots__ = ("subscribe", "topicid") + SUBSCRIBE_FIELD_NUMBER: _ClassVar[int] + TOPICID_FIELD_NUMBER: _ClassVar[int] + subscribe: bool + topicid: str + def __init__(self, subscribe: bool = ..., topicid: _Optional[str] = ...) -> None: ... + SUBSCRIPTIONS_FIELD_NUMBER: _ClassVar[int] + PUBLISH_FIELD_NUMBER: _ClassVar[int] + CONTROL_FIELD_NUMBER: _ClassVar[int] + SENDERRECORD_FIELD_NUMBER: _ClassVar[int] + subscriptions: _containers.RepeatedCompositeFieldContainer[RPC.SubOpts] + publish: _containers.RepeatedCompositeFieldContainer[Message] + control: ControlMessage + senderRecord: bytes + def __init__(self, subscriptions: _Optional[_Iterable[_Union[RPC.SubOpts, _Mapping]]] = ..., publish: _Optional[_Iterable[_Union[Message, _Mapping]]] = ..., control: _Optional[_Union[ControlMessage, _Mapping]] = ..., senderRecord: _Optional[bytes] = ...) -> None: ... 
# type: ignore -DESCRIPTOR: google.protobuf.descriptor.FileDescriptor +class Message(_message.Message): + __slots__ = ("from_id", "data", "seqno", "topicIDs", "signature", "key") + FROM_ID_FIELD_NUMBER: _ClassVar[int] + DATA_FIELD_NUMBER: _ClassVar[int] + SEQNO_FIELD_NUMBER: _ClassVar[int] + TOPICIDS_FIELD_NUMBER: _ClassVar[int] + SIGNATURE_FIELD_NUMBER: _ClassVar[int] + KEY_FIELD_NUMBER: _ClassVar[int] + from_id: bytes + data: bytes + seqno: bytes + topicIDs: _containers.RepeatedScalarFieldContainer[str] + signature: bytes + key: bytes + def __init__(self, from_id: _Optional[bytes] = ..., data: _Optional[bytes] = ..., seqno: _Optional[bytes] = ..., topicIDs: _Optional[_Iterable[str]] = ..., signature: _Optional[bytes] = ..., key: _Optional[bytes] = ...) -> None: ... -@typing.final -class RPC(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor +class ControlMessage(_message.Message): + __slots__ = ("ihave", "iwant", "graft", "prune") + IHAVE_FIELD_NUMBER: _ClassVar[int] + IWANT_FIELD_NUMBER: _ClassVar[int] + GRAFT_FIELD_NUMBER: _ClassVar[int] + PRUNE_FIELD_NUMBER: _ClassVar[int] + ihave: _containers.RepeatedCompositeFieldContainer[ControlIHave] + iwant: _containers.RepeatedCompositeFieldContainer[ControlIWant] + graft: _containers.RepeatedCompositeFieldContainer[ControlGraft] + prune: _containers.RepeatedCompositeFieldContainer[ControlPrune] + def __init__(self, ihave: _Optional[_Iterable[_Union[ControlIHave, _Mapping]]] = ..., iwant: _Optional[_Iterable[_Union[ControlIWant, _Mapping]]] = ..., graft: _Optional[_Iterable[_Union[ControlGraft, _Mapping]]] = ..., prune: _Optional[_Iterable[_Union[ControlPrune, _Mapping]]] = ...) -> None: ... # type: ignore - @typing.final - class SubOpts(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor +class ControlIHave(_message.Message): + __slots__ = ("topicID", "messageIDs") + TOPICID_FIELD_NUMBER: _ClassVar[int] + MESSAGEIDS_FIELD_NUMBER: _ClassVar[int] + topicID: str + messageIDs: _containers.RepeatedScalarFieldContainer[str] + def __init__(self, topicID: _Optional[str] = ..., messageIDs: _Optional[_Iterable[str]] = ...) -> None: ... - SUBSCRIBE_FIELD_NUMBER: builtins.int - TOPICID_FIELD_NUMBER: builtins.int - subscribe: builtins.bool - """subscribe or unsubscribe""" - topicid: builtins.str - def __init__( - self, - *, - subscribe: builtins.bool | None = ..., - topicid: builtins.str | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["subscribe", b"subscribe", "topicid", b"topicid"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["subscribe", b"subscribe", "topicid", b"topicid"]) -> None: ... +class ControlIWant(_message.Message): + __slots__ = ("messageIDs",) + MESSAGEIDS_FIELD_NUMBER: _ClassVar[int] + messageIDs: _containers.RepeatedScalarFieldContainer[str] + def __init__(self, messageIDs: _Optional[_Iterable[str]] = ...) -> None: ... - SUBSCRIPTIONS_FIELD_NUMBER: builtins.int - PUBLISH_FIELD_NUMBER: builtins.int - CONTROL_FIELD_NUMBER: builtins.int - @property - def subscriptions(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RPC.SubOpts]: ... - @property - def publish(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___Message]: ... - @property - def control(self) -> global___ControlMessage: ... 
- def __init__( - self, - *, - subscriptions: collections.abc.Iterable[global___RPC.SubOpts] | None = ..., - publish: collections.abc.Iterable[global___Message] | None = ..., - control: global___ControlMessage | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["control", b"control"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["control", b"control", "publish", b"publish", "subscriptions", b"subscriptions"]) -> None: ... +class ControlGraft(_message.Message): + __slots__ = ("topicID",) + TOPICID_FIELD_NUMBER: _ClassVar[int] + topicID: str + def __init__(self, topicID: _Optional[str] = ...) -> None: ... -global___RPC = RPC +class ControlPrune(_message.Message): + __slots__ = ("topicID", "peers", "backoff") + TOPICID_FIELD_NUMBER: _ClassVar[int] + PEERS_FIELD_NUMBER: _ClassVar[int] + BACKOFF_FIELD_NUMBER: _ClassVar[int] + topicID: str + peers: _containers.RepeatedCompositeFieldContainer[PeerInfo] + backoff: int + def __init__(self, topicID: _Optional[str] = ..., peers: _Optional[_Iterable[_Union[PeerInfo, _Mapping]]] = ..., backoff: _Optional[int] = ...) -> None: ... # type: ignore -@typing.final -class Message(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor +class PeerInfo(_message.Message): + __slots__ = ("peerID", "signedPeerRecord") + PEERID_FIELD_NUMBER: _ClassVar[int] + SIGNEDPEERRECORD_FIELD_NUMBER: _ClassVar[int] + peerID: bytes + signedPeerRecord: bytes + def __init__(self, peerID: _Optional[bytes] = ..., signedPeerRecord: _Optional[bytes] = ...) -> None: ... - FROM_ID_FIELD_NUMBER: builtins.int - DATA_FIELD_NUMBER: builtins.int - SEQNO_FIELD_NUMBER: builtins.int - TOPICIDS_FIELD_NUMBER: builtins.int - SIGNATURE_FIELD_NUMBER: builtins.int - KEY_FIELD_NUMBER: builtins.int - from_id: builtins.bytes - data: builtins.bytes - seqno: builtins.bytes - signature: builtins.bytes - key: builtins.bytes - @property - def topicIDs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ... - def __init__( - self, - *, - from_id: builtins.bytes | None = ..., - data: builtins.bytes | None = ..., - seqno: builtins.bytes | None = ..., - topicIDs: collections.abc.Iterable[builtins.str] | None = ..., - signature: builtins.bytes | None = ..., - key: builtins.bytes | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["data", b"data", "from_id", b"from_id", "key", b"key", "seqno", b"seqno", "signature", b"signature"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["data", b"data", "from_id", b"from_id", "key", b"key", "seqno", b"seqno", "signature", b"signature", "topicIDs", b"topicIDs"]) -> None: ... - -global___Message = Message - -@typing.final -class ControlMessage(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - IHAVE_FIELD_NUMBER: builtins.int - IWANT_FIELD_NUMBER: builtins.int - GRAFT_FIELD_NUMBER: builtins.int - PRUNE_FIELD_NUMBER: builtins.int - @property - def ihave(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlIHave]: ... - @property - def iwant(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlIWant]: ... - @property - def graft(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlGraft]: ... - @property - def prune(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlPrune]: ... 
- def __init__( - self, - *, - ihave: collections.abc.Iterable[global___ControlIHave] | None = ..., - iwant: collections.abc.Iterable[global___ControlIWant] | None = ..., - graft: collections.abc.Iterable[global___ControlGraft] | None = ..., - prune: collections.abc.Iterable[global___ControlPrune] | None = ..., - ) -> None: ... - def ClearField(self, field_name: typing.Literal["graft", b"graft", "ihave", b"ihave", "iwant", b"iwant", "prune", b"prune"]) -> None: ... - -global___ControlMessage = ControlMessage - -@typing.final -class ControlIHave(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - TOPICID_FIELD_NUMBER: builtins.int - MESSAGEIDS_FIELD_NUMBER: builtins.int - topicID: builtins.str - @property - def messageIDs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ... - def __init__( - self, - *, - topicID: builtins.str | None = ..., - messageIDs: collections.abc.Iterable[builtins.str] | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["topicID", b"topicID"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["messageIDs", b"messageIDs", "topicID", b"topicID"]) -> None: ... - -global___ControlIHave = ControlIHave - -@typing.final -class ControlIWant(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - MESSAGEIDS_FIELD_NUMBER: builtins.int - @property - def messageIDs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ... - def __init__( - self, - *, - messageIDs: collections.abc.Iterable[builtins.str] | None = ..., - ) -> None: ... - def ClearField(self, field_name: typing.Literal["messageIDs", b"messageIDs"]) -> None: ... - -global___ControlIWant = ControlIWant - -@typing.final -class ControlGraft(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - TOPICID_FIELD_NUMBER: builtins.int - topicID: builtins.str - def __init__( - self, - *, - topicID: builtins.str | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["topicID", b"topicID"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["topicID", b"topicID"]) -> None: ... - -global___ControlGraft = ControlGraft - -@typing.final -class ControlPrune(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - TOPICID_FIELD_NUMBER: builtins.int - PEERS_FIELD_NUMBER: builtins.int - BACKOFF_FIELD_NUMBER: builtins.int - topicID: builtins.str - backoff: builtins.int - @property - def peers(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___PeerInfo]: ... - def __init__( - self, - *, - topicID: builtins.str | None = ..., - peers: collections.abc.Iterable[global___PeerInfo] | None = ..., - backoff: builtins.int | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["backoff", b"backoff", "topicID", b"topicID"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["backoff", b"backoff", "peers", b"peers", "topicID", b"topicID"]) -> None: ... 
- -global___ControlPrune = ControlPrune - -@typing.final -class PeerInfo(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - PEERID_FIELD_NUMBER: builtins.int - SIGNEDPEERRECORD_FIELD_NUMBER: builtins.int - peerID: builtins.bytes - signedPeerRecord: builtins.bytes - def __init__( - self, - *, - peerID: builtins.bytes | None = ..., - signedPeerRecord: builtins.bytes | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["peerID", b"peerID", "signedPeerRecord", b"signedPeerRecord"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["peerID", b"peerID", "signedPeerRecord", b"signedPeerRecord"]) -> None: ... - -global___PeerInfo = PeerInfo - -@typing.final -class TopicDescriptor(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - @typing.final - class AuthOpts(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - class _AuthMode: - ValueType = typing.NewType("ValueType", builtins.int) - V: typing_extensions.TypeAlias = ValueType - - class _AuthModeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[TopicDescriptor.AuthOpts._AuthMode.ValueType], builtins.type): - DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor - NONE: TopicDescriptor.AuthOpts._AuthMode.ValueType # 0 - """no authentication, anyone can publish""" - KEY: TopicDescriptor.AuthOpts._AuthMode.ValueType # 1 - """only messages signed by keys in the topic descriptor are accepted""" - WOT: TopicDescriptor.AuthOpts._AuthMode.ValueType # 2 - """web of trust, certificates can allow publisher set to grow""" - - class AuthMode(_AuthMode, metaclass=_AuthModeEnumTypeWrapper): ... - NONE: TopicDescriptor.AuthOpts.AuthMode.ValueType # 0 - """no authentication, anyone can publish""" - KEY: TopicDescriptor.AuthOpts.AuthMode.ValueType # 1 - """only messages signed by keys in the topic descriptor are accepted""" - WOT: TopicDescriptor.AuthOpts.AuthMode.ValueType # 2 - """web of trust, certificates can allow publisher set to grow""" - - MODE_FIELD_NUMBER: builtins.int - KEYS_FIELD_NUMBER: builtins.int - mode: global___TopicDescriptor.AuthOpts.AuthMode.ValueType - @property - def keys(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.bytes]: - """root keys to trust""" - - def __init__( - self, - *, - mode: global___TopicDescriptor.AuthOpts.AuthMode.ValueType | None = ..., - keys: collections.abc.Iterable[builtins.bytes] | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["mode", b"mode"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["keys", b"keys", "mode", b"mode"]) -> None: ... 
- - @typing.final - class EncOpts(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - class _EncMode: - ValueType = typing.NewType("ValueType", builtins.int) - V: typing_extensions.TypeAlias = ValueType - - class _EncModeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[TopicDescriptor.EncOpts._EncMode.ValueType], builtins.type): - DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor - NONE: TopicDescriptor.EncOpts._EncMode.ValueType # 0 - """no encryption, anyone can read""" - SHAREDKEY: TopicDescriptor.EncOpts._EncMode.ValueType # 1 - """messages are encrypted with shared key""" - WOT: TopicDescriptor.EncOpts._EncMode.ValueType # 2 - """web of trust, certificates can allow publisher set to grow""" - - class EncMode(_EncMode, metaclass=_EncModeEnumTypeWrapper): ... - NONE: TopicDescriptor.EncOpts.EncMode.ValueType # 0 - """no encryption, anyone can read""" - SHAREDKEY: TopicDescriptor.EncOpts.EncMode.ValueType # 1 - """messages are encrypted with shared key""" - WOT: TopicDescriptor.EncOpts.EncMode.ValueType # 2 - """web of trust, certificates can allow publisher set to grow""" - - MODE_FIELD_NUMBER: builtins.int - KEYHASHES_FIELD_NUMBER: builtins.int - mode: global___TopicDescriptor.EncOpts.EncMode.ValueType - @property - def keyHashes(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.bytes]: - """the hashes of the shared keys used (salted)""" - - def __init__( - self, - *, - mode: global___TopicDescriptor.EncOpts.EncMode.ValueType | None = ..., - keyHashes: collections.abc.Iterable[builtins.bytes] | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["mode", b"mode"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["keyHashes", b"keyHashes", "mode", b"mode"]) -> None: ... - - NAME_FIELD_NUMBER: builtins.int - AUTH_FIELD_NUMBER: builtins.int - ENC_FIELD_NUMBER: builtins.int - name: builtins.str - @property - def auth(self) -> global___TopicDescriptor.AuthOpts: ... - @property - def enc(self) -> global___TopicDescriptor.EncOpts: ... - def __init__( - self, - *, - name: builtins.str | None = ..., - auth: global___TopicDescriptor.AuthOpts | None = ..., - enc: global___TopicDescriptor.EncOpts | None = ..., - ) -> None: ... - def HasField(self, field_name: typing.Literal["auth", b"auth", "enc", b"enc", "name", b"name"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["auth", b"auth", "enc", b"enc", "name", b"name"]) -> None: ... - -global___TopicDescriptor = TopicDescriptor +class TopicDescriptor(_message.Message): + __slots__ = ("name", "auth", "enc") + class AuthOpts(_message.Message): + __slots__ = ("mode", "keys") + class AuthMode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = () + NONE: _ClassVar[TopicDescriptor.AuthOpts.AuthMode] + KEY: _ClassVar[TopicDescriptor.AuthOpts.AuthMode] + WOT: _ClassVar[TopicDescriptor.AuthOpts.AuthMode] + NONE: TopicDescriptor.AuthOpts.AuthMode + KEY: TopicDescriptor.AuthOpts.AuthMode + WOT: TopicDescriptor.AuthOpts.AuthMode + MODE_FIELD_NUMBER: _ClassVar[int] + KEYS_FIELD_NUMBER: _ClassVar[int] + mode: TopicDescriptor.AuthOpts.AuthMode + keys: _containers.RepeatedScalarFieldContainer[bytes] + def __init__(self, mode: _Optional[_Union[TopicDescriptor.AuthOpts.AuthMode, str]] = ..., keys: _Optional[_Iterable[bytes]] = ...) -> None: ... 
+ class EncOpts(_message.Message): + __slots__ = ("mode", "keyHashes") + class EncMode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = () + NONE: _ClassVar[TopicDescriptor.EncOpts.EncMode] + SHAREDKEY: _ClassVar[TopicDescriptor.EncOpts.EncMode] + WOT: _ClassVar[TopicDescriptor.EncOpts.EncMode] + NONE: TopicDescriptor.EncOpts.EncMode + SHAREDKEY: TopicDescriptor.EncOpts.EncMode + WOT: TopicDescriptor.EncOpts.EncMode + MODE_FIELD_NUMBER: _ClassVar[int] + KEYHASHES_FIELD_NUMBER: _ClassVar[int] + mode: TopicDescriptor.EncOpts.EncMode + keyHashes: _containers.RepeatedScalarFieldContainer[bytes] + def __init__(self, mode: _Optional[_Union[TopicDescriptor.EncOpts.EncMode, str]] = ..., keyHashes: _Optional[_Iterable[bytes]] = ...) -> None: ... + NAME_FIELD_NUMBER: _ClassVar[int] + AUTH_FIELD_NUMBER: _ClassVar[int] + ENC_FIELD_NUMBER: _ClassVar[int] + name: str + auth: TopicDescriptor.AuthOpts + enc: TopicDescriptor.EncOpts + def __init__(self, name: _Optional[str] = ..., auth: _Optional[_Union[TopicDescriptor.AuthOpts, _Mapping]] = ..., enc: _Optional[_Union[TopicDescriptor.EncOpts, _Mapping]] = ...) -> None: ... # type: ignore diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 5641ec5d..54430f1b 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -50,12 +50,14 @@ from libp2p.network.stream.exceptions import ( StreamEOF, StreamReset, ) +from libp2p.peer.envelope import consume_envelope from libp2p.peer.id import ( ID, ) from libp2p.peer.peerdata import ( PeerDataError, ) +from libp2p.peer.peerstore import create_signed_peer_record from libp2p.tools.async_service import ( Service, ) @@ -247,6 +249,14 @@ class Pubsub(Service, IPubsub): packet.subscriptions.extend( [rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)] ) + # Add the sender's signedRecord in the RPC message + envelope = create_signed_peer_record( + self.host.get_id(), + self.host.get_addrs(), + self.host.get_private_key(), + ) + packet.senderRecord = envelope.marshal_envelope() + return packet async def continuously_read_stream(self, stream: INetStream) -> None: @@ -263,6 +273,27 @@ class Pubsub(Service, IPubsub): incoming: bytes = await read_varint_prefixed_bytes(stream) rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC() rpc_incoming.ParseFromString(incoming) + + # Process the sender's signed-record if sent + if rpc_incoming.HasField("senderRecord"): + try: + # Convert the signed-peer-record(Envelope) from + # protobuf bytes + envelope, _ = consume_envelope( + rpc_incoming.senderRecord, "libp2p-peer-record" + ) + # Use the default TTL of 2 hours (7200 seconds) + if self.host.get_peerstore().consume_peer_record( + envelope, 7200 + ): + logger.error( + "Updating the Certified-Addr-Book was unsuccessful" + ) + except Exception as e: + logger.error( + "Error updating the certified addr book for peer: %s", e + ) + if rpc_incoming.publish: # deal with RPC.publish for msg in rpc_incoming.publish: @@ -572,6 +603,14 @@ class Pubsub(Service, IPubsub): [rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)] ) + # Add the senderRecord of the peer in the RPC msg + envelope = create_signed_peer_record( + self.host.get_id(), + self.host.get_addrs(), + self.host.get_private_key(), + ) + packet.senderRecord = envelope.marshal_envelope() + # Send out subscribe message to all peers await self.message_all_peers(packet.SerializeToString()) @@ -604,6 +643,13 @@ class Pubsub(Service, IPubsub): packet.subscriptions.extend( [rpc_pb2.RPC.SubOpts(subscribe=False, topicid=topic_id)] ) + # Add the senderRecord of the 
peer in the RPC msg + envelope = create_signed_peer_record( + self.host.get_id(), + self.host.get_addrs(), + self.host.get_private_key(), + ) + packet.senderRecord = envelope.marshal_envelope() # Send out unsubscribe message to all peers await self.message_all_peers(packet.SerializeToString()) From d4c387f9234d8231e99a23d9a48d3a269d10a5f9 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Thu, 14 Aug 2025 11:26:14 +0530 Subject: [PATCH 08/17] add reissuing mechanism of records if addrs dont change as done in #815 --- libp2p/pubsub/floodsub.py | 10 +++----- libp2p/pubsub/gossipsub.py | 46 ++++++---------------------------- libp2p/pubsub/pubsub.py | 47 ++++++----------------------------- libp2p/pubsub/utils.py | 51 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 70 insertions(+), 84 deletions(-) create mode 100644 libp2p/pubsub/utils.py diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 170f558d..f0e09404 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -15,7 +15,7 @@ from libp2p.custom_types import ( from libp2p.peer.id import ( ID, ) -from libp2p.peer.peerstore import create_signed_peer_record +from libp2p.pubsub.utils import env_to_send_in_RPC from .exceptions import ( PubsubRouterError, @@ -106,12 +106,8 @@ class FloodSub(IPubsubRouter): # Add the senderRecord of the peer in the RPC msg if isinstance(self.pubsub, Pubsub): - envelope = create_signed_peer_record( - self.pubsub.host.get_id(), - self.pubsub.host.get_addrs(), - self.pubsub.host.get_private_key(), - ) - rpc_msg.senderRecord = envelope.marshal_envelope() + envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host) + rpc_msg.senderRecord = envelope_bytes logger.debug("publishing message %s", pubsub_msg) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index b7c70c55..fa221a0f 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -24,7 +24,6 @@ from libp2p.abc import ( from libp2p.custom_types import ( TProtocol, ) -from libp2p.peer.envelope import consume_envelope from libp2p.peer.id import ( ID, ) @@ -35,11 +34,11 @@ from libp2p.peer.peerinfo import ( ) from libp2p.peer.peerstore import ( PERMANENT_ADDR_TTL, - create_signed_peer_record, ) from libp2p.pubsub import ( floodsub, ) +from libp2p.pubsub.utils import env_to_send_in_RPC, maybe_consume_signed_record from libp2p.tools.async_service import ( Service, ) @@ -230,24 +229,7 @@ class GossipSub(IPubsubRouter, Service): """ # Process the senderRecord if sent if isinstance(self.pubsub, Pubsub): - if rpc.HasField("senderRecord"): - try: - # Convert the signed-peer-record(Envelope) from - # protobuf bytes - envelope, _ = consume_envelope( - rpc.senderRecord, "libp2p-peer-record" - ) - # Use the default TTL of 2 hours (7200 seconds) - if self.pubsub.host.get_peerstore().consume_peer_record( - envelope, 7200 - ): - logger.error( - "Updating the Certified-Addr-Book was unsuccessful" - ) - except Exception as e: - logger.error( - "Error updating the certified addr book for peer: %s", e - ) + _ = maybe_consume_signed_record(rpc, self.pubsub.host) control_message = rpc.control @@ -278,12 +260,8 @@ class GossipSub(IPubsubRouter, Service): # Add the senderRecord of the peer in the RPC msg if isinstance(self.pubsub, Pubsub): - envelope = create_signed_peer_record( - self.pubsub.host.get_id(), - self.pubsub.host.get_addrs(), - self.pubsub.host.get_private_key(), - ) - rpc_msg.senderRecord = envelope.marshal_envelope() + envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host) + 
rpc_msg.senderRecord = envelope_bytes logger.debug("publishing message %s", pubsub_msg) @@ -854,12 +832,8 @@ class GossipSub(IPubsubRouter, Service): # to the iwant control msg, so we will send a freshly created senderRecord # with the RPC msg if isinstance(self.pubsub, Pubsub): - envelope = create_signed_peer_record( - self.pubsub.host.get_id(), - self.pubsub.host.get_addrs(), - self.pubsub.host.get_private_key(), - ) - packet.senderRecord = envelope.marshal_envelope() + envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host) + packet.senderRecord = envelope_bytes packet.publish.extend(msgs_to_forward) @@ -1019,12 +993,8 @@ class GossipSub(IPubsubRouter, Service): # Add the sender's peer-record in the RPC msg if isinstance(self.pubsub, Pubsub): - envelope = create_signed_peer_record( - self.pubsub.host.get_id(), - self.pubsub.host.get_addrs(), - self.pubsub.host.get_private_key(), - ) - packet.senderRecord = envelope.marshal_envelope() + envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host) + packet.senderRecord = envelope_bytes packet.control.CopyFrom(control_msg) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 54430f1b..cbaaafb5 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -50,14 +50,13 @@ from libp2p.network.stream.exceptions import ( StreamEOF, StreamReset, ) -from libp2p.peer.envelope import consume_envelope from libp2p.peer.id import ( ID, ) from libp2p.peer.peerdata import ( PeerDataError, ) -from libp2p.peer.peerstore import create_signed_peer_record +from libp2p.pubsub.utils import env_to_send_in_RPC, maybe_consume_signed_record from libp2p.tools.async_service import ( Service, ) @@ -250,12 +249,8 @@ class Pubsub(Service, IPubsub): [rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)] ) # Add the sender's signedRecord in the RPC message - envelope = create_signed_peer_record( - self.host.get_id(), - self.host.get_addrs(), - self.host.get_private_key(), - ) - packet.senderRecord = envelope.marshal_envelope() + envelope_bytes, bool = env_to_send_in_RPC(self.host) + packet.senderRecord = envelope_bytes return packet @@ -275,24 +270,7 @@ class Pubsub(Service, IPubsub): rpc_incoming.ParseFromString(incoming) # Process the sender's signed-record if sent - if rpc_incoming.HasField("senderRecord"): - try: - # Convert the signed-peer-record(Envelope) from - # protobuf bytes - envelope, _ = consume_envelope( - rpc_incoming.senderRecord, "libp2p-peer-record" - ) - # Use the default TTL of 2 hours (7200 seconds) - if self.host.get_peerstore().consume_peer_record( - envelope, 7200 - ): - logger.error( - "Updating the Certified-Addr-Book was unsuccessful" - ) - except Exception as e: - logger.error( - "Error updating the certified addr book for peer: %s", e - ) + _ = maybe_consume_signed_record(rpc_incoming, self.host) if rpc_incoming.publish: # deal with RPC.publish @@ -604,13 +582,8 @@ class Pubsub(Service, IPubsub): ) # Add the senderRecord of the peer in the RPC msg - envelope = create_signed_peer_record( - self.host.get_id(), - self.host.get_addrs(), - self.host.get_private_key(), - ) - packet.senderRecord = envelope.marshal_envelope() - + envelope_bytes, bool = env_to_send_in_RPC(self.host) + packet.senderRecord = envelope_bytes # Send out subscribe message to all peers await self.message_all_peers(packet.SerializeToString()) @@ -644,12 +617,8 @@ class Pubsub(Service, IPubsub): [rpc_pb2.RPC.SubOpts(subscribe=False, topicid=topic_id)] ) # Add the senderRecord of the peer in the RPC msg - envelope = create_signed_peer_record( - 
self.host.get_id(), - self.host.get_addrs(), - self.host.get_private_key(), - ) - packet.senderRecord = envelope.marshal_envelope() + envelope_bytes, bool = env_to_send_in_RPC(self.host) + packet.senderRecord = envelope_bytes # Send out unsubscribe message to all peers await self.message_all_peers(packet.SerializeToString()) diff --git a/libp2p/pubsub/utils.py b/libp2p/pubsub/utils.py new file mode 100644 index 00000000..163a2870 --- /dev/null +++ b/libp2p/pubsub/utils.py @@ -0,0 +1,51 @@ +import logging + +from libp2p.abc import IHost +from libp2p.peer.envelope import consume_envelope +from libp2p.peer.peerstore import create_signed_peer_record +from libp2p.pubsub.pb.rpc_pb2 import RPC + +logger = logging.getLogger("pubsub-example.utils") + + +def maybe_consume_signed_record(msg: RPC, host: IHost) -> bool: + if msg.HasField("senderRecord"): + try: + # Convert the signed-peer-record(Envelope) from + # protobuf bytes + envelope, _ = consume_envelope(msg.senderRecord, "libp2p-peer-record") + # Use the default TTL of 2 hours (7200 seconds) + if not host.get_peerstore().consume_peer_record(envelope, 7200): + logger.error("Updating the certified-addr-book was unsuccessful") + except Exception as e: + logger.error("Error updating the certified addr book for peer: %s", e) + return False + return True + + +def env_to_send_in_RPC(host: IHost) -> tuple[bytes, bool]: + listen_addrs_set = {addr for addr in host.get_addrs()} + local_env = host.get_peerstore().get_local_record() + + if local_env is None: + # No cached SPR yet -> create one + return issue_and_cache_local_record(host), True + else: + record_addrs_set = local_env._env_addrs_set() + if record_addrs_set == listen_addrs_set: + # Perfect match -> reuse the cached envelope + return local_env.marshal_envelope(), False + else: + # Addresses changed -> issue a new SPR and cache it + return issue_and_cache_local_record(host), True + + +def issue_and_cache_local_record(host: IHost) -> bytes: + env = create_signed_peer_record( + host.get_id(), + host.get_addrs(), + host.get_private_key(), + ) + # Cache it for next time + host.get_peerstore().set_local_record(env) + return env.marshal_envelope() From cdfb083c0617ce81cad363889b0b0787e2643570 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Thu, 14 Aug 2025 15:53:05 +0530 Subject: [PATCH 09/17] added tests to see if transfer works correctly --- tests/core/pubsub/test_pubsub.py | 61 ++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/tests/core/pubsub/test_pubsub.py b/tests/core/pubsub/test_pubsub.py index e674dbc0..179f359c 100644 --- a/tests/core/pubsub/test_pubsub.py +++ b/tests/core/pubsub/test_pubsub.py @@ -8,6 +8,7 @@ from typing import ( from unittest.mock import patch import pytest +import multiaddr import trio from libp2p.custom_types import AsyncValidatorFn @@ -17,6 +18,7 @@ from libp2p.exceptions import ( from libp2p.network.stream.exceptions import ( StreamEOF, ) +from libp2p.peer.envelope import Envelope from libp2p.peer.id import ( ID, ) @@ -87,6 +89,45 @@ async def test_re_unsubscribe(): assert TESTING_TOPIC not in pubsubs_fsub[0].topic_ids +@pytest.mark.trio +async def test_reissue_when_listen_addrs_change(): + async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub: + await connect(pubsubs_fsub[0].host, pubsubs_fsub[1].host) + await pubsubs_fsub[0].subscribe(TESTING_TOPIC) + # Yield to let 0 notify 1 + await trio.sleep(1) + assert pubsubs_fsub[0].my_id in pubsubs_fsub[1].peer_topics[TESTING_TOPIC] + + # Check whether signed-records were transfered 
properly in the subscribe call
+        envelope_b_sub = (
+            pubsubs_fsub[1]
+            .host.get_peerstore()
+            .get_peer_record(pubsubs_fsub[0].host.get_id())
+        )
+        assert isinstance(envelope_b_sub, Envelope)
+
+        # Simulate pubsubs_fsub[0].host listen addrs changing (different port)
+        new_addr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/123")
+
+        # Patch just for the duration we force A to unsubscribe
+        with patch.object(pubsubs_fsub[0].host, "get_addrs", return_value=[new_addr]):
+            # Unsubscribe from A's side so that a new record is issued
+            await pubsubs_fsub[0].unsubscribe(TESTING_TOPIC)
+            await trio.sleep(1)
+
+        # B should be holding A's new record with bumped seq
+        envelope_b_unsub = (
+            pubsubs_fsub[1]
+            .host.get_peerstore()
+            .get_peer_record(pubsubs_fsub[0].host.get_id())
+        )
+        assert isinstance(envelope_b_unsub, Envelope)
+
+        # This proves that a freshly signed record was issued rather than
+        # the cached one being reused.
+        assert envelope_b_sub.record().seq < envelope_b_unsub.record().seq
+
+
@pytest.mark.trio
async def test_peers_subscribe():
async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub:
@@ -95,11 +136,31 @@ async def test_peers_subscribe():
# Yield to let 0 notify 1
await trio.sleep(1)
assert pubsubs_fsub[0].my_id in pubsubs_fsub[1].peer_topics[TESTING_TOPIC]
+
+        # Check whether signed-records were transferred properly in the subscribe call
+        envelope_b_sub = (
+            pubsubs_fsub[1]
+            .host.get_peerstore()
+            .get_peer_record(pubsubs_fsub[0].host.get_id())
+        )
+        assert isinstance(envelope_b_sub, Envelope)
+
await pubsubs_fsub[0].unsubscribe(TESTING_TOPIC)
# Yield to let 0 notify 1
await trio.sleep(1)
assert pubsubs_fsub[0].my_id not in pubsubs_fsub[1].peer_topics[TESTING_TOPIC]
+    envelope_b_unsub = (
+        pubsubs_fsub[1]
+        .host.get_peerstore()
+        .get_peer_record(pubsubs_fsub[0].host.get_id())
+    )
+    assert isinstance(envelope_b_unsub, Envelope)
+
+    # This proves that the latest cached record was reused rather than
+    # a fresh one being issued.
+        assert envelope_b_sub.record().seq == envelope_b_unsub.record().seq
+

 @pytest.mark.trio
 async def test_get_hello_packet():

From d99b67eafa3727f8730597860e3634ea629aeb7f Mon Sep 17 00:00:00 2001
From: lla-dane
Date: Sun, 17 Aug 2025 13:53:25 +0530
Subject: [PATCH 10/17] now ignoring pubsub messages upon receiving invalid-signed-records

---
 libp2p/pubsub/gossipsub.py       |  4 +++-
 libp2p/pubsub/pubsub.py          |  6 +++++-
 tests/core/pubsub/test_pubsub.py | 22 ++++++++++++++++++++++
 3 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py
index fa221a0f..aaf0b2fa 100644
--- a/libp2p/pubsub/gossipsub.py
+++ b/libp2p/pubsub/gossipsub.py
@@ -229,7 +229,9 @@ class GossipSub(IPubsubRouter, Service):
         """
         # Process the senderRecord if sent
         if isinstance(self.pubsub, Pubsub):
-            _ = maybe_consume_signed_record(rpc, self.pubsub.host)
+            if not maybe_consume_signed_record(rpc, self.pubsub.host):
+                logger.error("Received an invalid-signed-record, ignoring the message")
+                return

         control_message = rpc.control

diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py
index cbaaafb5..3200c73a 100644
--- a/libp2p/pubsub/pubsub.py
+++ b/libp2p/pubsub/pubsub.py
@@ -270,7 +270,11 @@ class Pubsub(Service, IPubsub):
             rpc_incoming.ParseFromString(incoming)

             # Process the sender's signed-record if sent
-            _ = maybe_consume_signed_record(rpc_incoming, self.host)
+            if not maybe_consume_signed_record(rpc_incoming, self.host):
+                logger.error(
+                    "Received an invalid-signed-record, ignoring the incoming msg"
+                )
+                continue

             if rpc_incoming.publish:
                 # deal with RPC.publish

diff --git a/tests/core/pubsub/test_pubsub.py b/tests/core/pubsub/test_pubsub.py
index 179f359c..54bc67a1 100644
--- a/tests/core/pubsub/test_pubsub.py
+++ b/tests/core/pubsub/test_pubsub.py
@@ -11,6 +11,7 @@
 import pytest
 import multiaddr
 import trio

+from libp2p.crypto.rsa import create_new_key_pair
 from libp2p.custom_types import AsyncValidatorFn
 from libp2p.exceptions import (
     ValidationError,
@@ -162,6 +163,27 @@
     assert envelope_b_sub.record().seq == envelope_b_unsub.record().seq


+@pytest.mark.trio
+async def test_peer_subscribe_fail_upon_invald_record_transfer():
+    async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub:
+        await connect(pubsubs_fsub[0].host, pubsubs_fsub[1].host)
+
+        # Corrupt host_a's local peer record
+        envelope = pubsubs_fsub[0].host.get_peerstore().get_local_record()
+        key_pair = create_new_key_pair()
+
+        if envelope is not None:
+            envelope.public_key = key_pair.public_key
+            pubsubs_fsub[0].host.get_peerstore().set_local_record(envelope)
+
+        await pubsubs_fsub[0].subscribe(TESTING_TOPIC)
+        # Yield to let 0 notify 1
+        await trio.sleep(1)
+        assert pubsubs_fsub[0].my_id not in pubsubs_fsub[1].peer_topics.get(
+            TESTING_TOPIC, set()
+        )
+
+
 @pytest.mark.trio
 async def test_get_hello_packet():
     async with PubsubFactory.create_batch_with_floodsub(1) as pubsubs_fsub:

From b26e8333bdb5a81fa779ff46938d6c69a7602360 Mon Sep 17 00:00:00 2001
From: lla-dane
Date: Sat, 23 Aug 2025 18:01:57 +0530
Subject: [PATCH 11/17] updated as per the suggestions in #815

---
 libp2p/pubsub/floodsub.py        |  4 +--
 libp2p/pubsub/gossipsub.py       | 11 +++---
 libp2p/pubsub/pubsub.py          | 11 +++---
 libp2p/pubsub/utils.py           | 61 ++++++++++++++++----------------
 tests/core/pubsub/test_pubsub.py | 22 +++++++++++-
 5 files changed, 65 insertions(+), 44 deletions(-)

diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py
index f0e09404..8167581d 100644
--- 
a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -15,7 +15,7 @@ from libp2p.custom_types import ( from libp2p.peer.id import ( ID, ) -from libp2p.pubsub.utils import env_to_send_in_RPC +from libp2p.peer.peerstore import env_to_send_in_RPC from .exceptions import ( PubsubRouterError, @@ -106,7 +106,7 @@ class FloodSub(IPubsubRouter): # Add the senderRecord of the peer in the RPC msg if isinstance(self.pubsub, Pubsub): - envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host) + envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host) rpc_msg.senderRecord = envelope_bytes logger.debug("publishing message %s", pubsub_msg) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index aaf0b2fa..a4c8c463 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -34,11 +34,12 @@ from libp2p.peer.peerinfo import ( ) from libp2p.peer.peerstore import ( PERMANENT_ADDR_TTL, + env_to_send_in_RPC, ) from libp2p.pubsub import ( floodsub, ) -from libp2p.pubsub.utils import env_to_send_in_RPC, maybe_consume_signed_record +from libp2p.pubsub.utils import maybe_consume_signed_record from libp2p.tools.async_service import ( Service, ) @@ -229,7 +230,7 @@ class GossipSub(IPubsubRouter, Service): """ # Process the senderRecord if sent if isinstance(self.pubsub, Pubsub): - if not maybe_consume_signed_record(rpc, self.pubsub.host): + if not maybe_consume_signed_record(rpc, self.pubsub.host, sender_peer_id): logger.error("Received an invalid-signed-record, ignoring the message") return @@ -262,7 +263,7 @@ class GossipSub(IPubsubRouter, Service): # Add the senderRecord of the peer in the RPC msg if isinstance(self.pubsub, Pubsub): - envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host) + envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host) rpc_msg.senderRecord = envelope_bytes logger.debug("publishing message %s", pubsub_msg) @@ -834,7 +835,7 @@ class GossipSub(IPubsubRouter, Service): # to the iwant control msg, so we will send a freshly created senderRecord # with the RPC msg if isinstance(self.pubsub, Pubsub): - envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host) + envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host) packet.senderRecord = envelope_bytes packet.publish.extend(msgs_to_forward) @@ -995,7 +996,7 @@ class GossipSub(IPubsubRouter, Service): # Add the sender's peer-record in the RPC msg if isinstance(self.pubsub, Pubsub): - envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host) + envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host) packet.senderRecord = envelope_bytes packet.control.CopyFrom(control_msg) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 3200c73a..2c605fc3 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -56,7 +56,8 @@ from libp2p.peer.id import ( from libp2p.peer.peerdata import ( PeerDataError, ) -from libp2p.pubsub.utils import env_to_send_in_RPC, maybe_consume_signed_record +from libp2p.peer.peerstore import env_to_send_in_RPC +from libp2p.pubsub.utils import maybe_consume_signed_record from libp2p.tools.async_service import ( Service, ) @@ -249,7 +250,7 @@ class Pubsub(Service, IPubsub): [rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)] ) # Add the sender's signedRecord in the RPC message - envelope_bytes, bool = env_to_send_in_RPC(self.host) + envelope_bytes, _ = env_to_send_in_RPC(self.host) packet.senderRecord = envelope_bytes return packet @@ -270,7 +271,7 @@ class Pubsub(Service, IPubsub): rpc_incoming.ParseFromString(incoming) # Process the sender's 
signed-record if sent
-            if not maybe_consume_signed_record(rpc_incoming, self.host):
+            if not maybe_consume_signed_record(rpc_incoming, self.host, peer_id):
                 logger.error(
                     "Received an invalid-signed-record, ignoring the incoming msg"
                 )
@@ -586,7 +587,7 @@
         )

         # Add the senderRecord of the peer in the RPC msg
-        envelope_bytes, bool = env_to_send_in_RPC(self.host)
+        envelope_bytes, _ = env_to_send_in_RPC(self.host)
         packet.senderRecord = envelope_bytes
         # Send out subscribe message to all peers
         await self.message_all_peers(packet.SerializeToString())
@@ -621,7 +622,7 @@
             [rpc_pb2.RPC.SubOpts(subscribe=False, topicid=topic_id)]
         )
         # Add the senderRecord of the peer in the RPC msg
-        envelope_bytes, bool = env_to_send_in_RPC(self.host)
+        envelope_bytes, _ = env_to_send_in_RPC(self.host)
         packet.senderRecord = envelope_bytes

         # Send out unsubscribe message to all peers
diff --git a/libp2p/pubsub/utils.py b/libp2p/pubsub/utils.py
index 163a2870..3a69becb 100644
--- a/libp2p/pubsub/utils.py
+++ b/libp2p/pubsub/utils.py
@@ -2,50 +2,49 @@
 import logging

 from libp2p.abc import IHost
 from libp2p.peer.envelope import consume_envelope
-from libp2p.peer.peerstore import create_signed_peer_record
+from libp2p.peer.id import ID
 from libp2p.pubsub.pb.rpc_pb2 import RPC

 logger = logging.getLogger("pubsub-example.utils")


-def maybe_consume_signed_record(msg: RPC, host: IHost) -> bool:
+def maybe_consume_signed_record(msg: RPC, host: IHost, peer_id: ID) -> bool:
+    """
+    Attempt to parse and store a signed-peer-record (Envelope) received during
+    PubSub communication. If the record is invalid, the peer ID does not match,
+    or updating the peerstore fails, the function returns False.
+
+    Parameters
+    ----------
+    msg : RPC
+        The protobuf message received during PubSub communication.
+    host : IHost
+        The local host instance, providing access to the peerstore for storing
+        verified peer records.
+    peer_id : ID
+        The expected peer ID for record validation. The peer ID inside the
+        record must match this value.
+
+    Returns
+    -------
+    bool
+        True if a valid signed peer record was successfully consumed and stored,
+        False otherwise.
+
+    """
     if msg.HasField("senderRecord"):
         try:
             # Convert the signed-peer-record(Envelope) from
             # protobuf bytes
-            envelope, _ = consume_envelope(msg.senderRecord, "libp2p-peer-record")
+            envelope, record = consume_envelope(msg.senderRecord, "libp2p-peer-record")
+            if not record.peer_id == peer_id:
+                return False
+
             # Use the default TTL of 2 hours (7200 seconds)
             if not host.get_peerstore().consume_peer_record(envelope, 7200):
                 logger.error("Updating the certified-addr-book was unsuccessful")
+                return False
         except Exception as e:
             logger.error("Error updating the certified addr book for peer: %s", e)
             return False
     return True
-
-
-def env_to_send_in_RPC(host: IHost) -> tuple[bytes, bool]:
-    listen_addrs_set = {addr for addr in host.get_addrs()}
-    local_env = host.get_peerstore().get_local_record()
-
-    if local_env is None:
-        # No cached SPR yet -> create one
-        return issue_and_cache_local_record(host), True
-    else:
-        record_addrs_set = local_env._env_addrs_set()
-        if record_addrs_set == listen_addrs_set:
-            # Perfect match -> reuse the cached envelope
-            return local_env.marshal_envelope(), False
-        else:
-            # Addresses changed -> issue a new SPR and cache it
-            return issue_and_cache_local_record(host), True
-
-
-def issue_and_cache_local_record(host: IHost) -> bytes:
-    env = create_signed_peer_record(
-        host.get_id(),
-        host.get_addrs(),
-        host.get_private_key(),
-    )
-    # Cache it for next time
-    host.get_peerstore().set_local_record(env)
-    return env.marshal_envelope()
diff --git a/tests/core/pubsub/test_pubsub.py b/tests/core/pubsub/test_pubsub.py
index 54bc67a1..9a09f34f 100644
--- a/tests/core/pubsub/test_pubsub.py
+++ b/tests/core/pubsub/test_pubsub.py
@@ -19,10 +19,11 @@ from libp2p.exceptions import (
 from libp2p.network.stream.exceptions import (
     StreamEOF,
 )
-from libp2p.peer.envelope import Envelope
+from libp2p.peer.envelope import Envelope, seal_record
 from libp2p.peer.id import (
     ID,
 )
+from libp2p.peer.peer_record import PeerRecord
 from libp2p.pubsub.pb import (
     rpc_pb2,
 )
@@ -170,6 +171,8 @@ async def test_peer_subscribe_fail_upon_invald_record_transfer():
     # Corrupt host_a's local peer record
     envelope = pubsubs_fsub[0].host.get_peerstore().get_local_record()
+    if envelope is not None:
+        true_record = envelope.record()
     key_pair = create_new_key_pair()

     if envelope is not None:
@@ -183,6 +186,23 @@
             TESTING_TOPIC, set()
         )

+        # Create a corrupt envelope with correct signature but false peer-id
+        false_record = PeerRecord(
+            ID.from_pubkey(key_pair.public_key), true_record.addrs
+        )
+        false_envelope = seal_record(
+            false_record, pubsubs_fsub[0].host.get_private_key()
+        )
+
+        pubsubs_fsub[0].host.get_peerstore().set_local_record(false_envelope)
+
+        await pubsubs_fsub[0].subscribe(TESTING_TOPIC)
+        # Yield to let 0 notify 1
+        await trio.sleep(1)
+        assert pubsubs_fsub[0].my_id not in pubsubs_fsub[1].peer_topics.get(
+            TESTING_TOPIC, set()
+        )
+

 @pytest.mark.trio
 async def test_get_hello_packet():

From cb5bfeda396d60ab0f5b29030205c04ea4cb73c5 Mon Sep 17 00:00:00 2001
From: lla-dane
Date: Sat, 23 Aug 2025 18:22:45 +0530
Subject: [PATCH 12/17] Use the same comment in maybe_consume_signed_record function

---
 libp2p/pubsub/utils.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/libp2p/pubsub/utils.py b/libp2p/pubsub/utils.py
index 3a69becb..6686ba69 100644
--- a/libp2p/pubsub/utils.py
+++ b/libp2p/pubsub/utils.py
@@ -42,9 +42,9 @@
             # Use
the default TTL of 2 hours (7200 seconds) if not host.get_peerstore().consume_peer_record(envelope, 7200): - logger.error("Updating the certified-addr-book was unsuccessful") + logger.error("Failed to update the Certified-Addr-Book") return False except Exception as e: - logger.error("Error updating the certified addr book for peer: %s", e) + logger.error("Failed to update the Certified-Addr-Book: %s", e) return False return True From 96e2149f4d2234a729b7ea9a00d3f73422fa36dc Mon Sep 17 00:00:00 2001 From: lla-dane Date: Tue, 26 Aug 2025 12:56:20 +0530 Subject: [PATCH 13/17] added newsfragment --- newsfragments/835.feature.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 newsfragments/835.feature.rst diff --git a/newsfragments/835.feature.rst b/newsfragments/835.feature.rst new file mode 100644 index 00000000..7e42f18e --- /dev/null +++ b/newsfragments/835.feature.rst @@ -0,0 +1 @@ +PubSub routers now include signed-peer-records in RPC messages for secure peer-info exchange. From 31040931ea7543e3d993662ddb9564bd77f40c04 Mon Sep 17 00:00:00 2001 From: acul71 Date: Sat, 30 Aug 2025 23:44:49 +0200 Subject: [PATCH 14/17] fix: remove unused upgrade_listener function (Issue 2 from #726) --- libp2p/transport/upgrader.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index 8b47fff4..40ba5321 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -1,9 +1,7 @@ from libp2p.abc import ( - IListener, IMuxedConn, IRawConnection, ISecureConn, - ITransport, ) from libp2p.custom_types import ( TMuxerOptions, @@ -43,10 +41,6 @@ class TransportUpgrader: self.security_multistream = SecurityMultistream(secure_transports_by_protocol) self.muxer_multistream = MuxerMultistream(muxer_transports_by_protocol) - def upgrade_listener(self, transport: ITransport, listeners: IListener) -> None: - """Upgrade multiaddr listeners to libp2p-transport listeners.""" - # TODO: Figure out what to do with this function. 
-
     async def upgrade_security(
         self,
         raw_conn: IRawConnection,

From d620270eafa1b1858874f77b05e51f3dcf6e3a45 Mon Sep 17 00:00:00 2001
From: acul71
Date: Sun, 31 Aug 2025 00:10:15 +0200
Subject: [PATCH 15/17] docs: add newsfragment for issue 883 - remove unused upgrade_listener function

---
 newsfragments/883.internal.rst | 5 +++++
 1 file changed, 5 insertions(+)
 create mode 100644 newsfragments/883.internal.rst

diff --git a/newsfragments/883.internal.rst b/newsfragments/883.internal.rst
new file mode 100644
index 00000000..a9ca3a0e
--- /dev/null
+++ b/newsfragments/883.internal.rst
@@ -0,0 +1,5 @@
+Remove unused upgrade_listener function from transport upgrader
+
+- Remove unused `upgrade_listener` function from `libp2p/transport/upgrader.py` (Issue 2 from #726)
+- Clean up unused imports related to the removed function
+- Improve code maintainability by removing dead code

From 7d6eb28d7c8fa30468de635890a6d194d56014f5 Mon Sep 17 00:00:00 2001
From: unniznd
Date: Mon, 1 Sep 2025 09:48:08 +0530
Subject: [PATCH 16/17] message inconsistency fixed

---
 libp2p/security/security_multistream.py  | 2 +-
 libp2p/stream_muxer/muxer_multistream.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py
index f7c81de1..ee8d4475 100644
--- a/libp2p/security/security_multistream.py
+++ b/libp2p/security/security_multistream.py
@@ -119,7 +119,7 @@ class SecurityMultistream(ABC):
             protocol, _ = await self.multiselect.negotiate(communicator)
             if protocol is None:
                 raise MultiselectError(
-                    "fail to negotiate a security protocol: no protocl selected"
+                    "Failed to negotiate a security protocol: no protocol selected"
                 )
         # Return transport from protocol
         return self.transports[protocol]

diff --git a/libp2p/stream_muxer/muxer_multistream.py b/libp2p/stream_muxer/muxer_multistream.py
index 76689c17..ef90fac0 100644
--- a/libp2p/stream_muxer/muxer_multistream.py
+++ b/libp2p/stream_muxer/muxer_multistream.py
@@ -86,7 +86,7 @@ class MuxerMultistream:
             protocol, _ = await self.multiselect.negotiate(communicator)
             if protocol is None:
                 raise MultiselectError(
-                    "fail to negotiate a stream muxer protocol: no protocol selected"
+                    "Failed to negotiate a stream muxer protocol: no protocol selected"
                 )
         return self.transports[protocol]

From 10775161968d72b733d2df0bb844aab3fa68b7a0 Mon Sep 17 00:00:00 2001
From: lla-dane
Date: Mon, 1 Sep 2025 18:11:22 +0530
Subject: [PATCH 17/17] update newsfragment

---
 newsfragments/{835.feature.rst => 889.feature.rst} | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 rename newsfragments/{835.feature.rst => 889.feature.rst} (100%)

diff --git a/newsfragments/835.feature.rst b/newsfragments/889.feature.rst
similarity index 100%
rename from newsfragments/835.feature.rst
rename to newsfragments/889.feature.rst