Merge branch 'main' into add-ws-transport

yashksaini-coder
2025-09-04 15:19:11 +05:30
committed by GitHub
61 changed files with 3936 additions and 634 deletions

View File

@ -1,3 +1,5 @@
"""Libp2p Python implementation."""
from collections.abc import (
Mapping,
Sequence,
@ -6,15 +8,12 @@ from importlib.metadata import version as __version
from typing import (
Literal,
Optional,
Type,
cast,
)
import multiaddr
from libp2p.abc import (
IHost,
IMuxedConn,
INetworkService,
IPeerRouting,
IPeerStore,
@ -33,9 +32,6 @@ from libp2p.custom_types import (
TProtocol,
TSecurityOptions,
)
from libp2p.discovery.mdns.mdns import (
MDNSDiscovery,
)
from libp2p.host.basic_host import (
BasicHost,
)
@ -43,6 +39,8 @@ from libp2p.host.routed_host import (
RoutedHost,
)
from libp2p.network.swarm import (
ConnectionConfig,
RetryConfig,
Swarm,
)
from libp2p.peer.id import (
@ -50,22 +48,25 @@ from libp2p.peer.id import (
)
from libp2p.peer.peerstore import (
PeerStore,
create_signed_peer_record,
)
from libp2p.security.insecure.transport import (
PLAINTEXT_PROTOCOL_ID,
InsecureTransport,
)
from libp2p.security.noise.transport import PROTOCOL_ID as NOISE_PROTOCOL_ID
from libp2p.security.noise.transport import Transport as NoiseTransport
from libp2p.security.noise.transport import (
PROTOCOL_ID as NOISE_PROTOCOL_ID,
Transport as NoiseTransport,
)
import libp2p.security.secio.transport as secio
from libp2p.stream_muxer.mplex.mplex import (
MPLEX_PROTOCOL_ID,
Mplex,
)
from libp2p.stream_muxer.yamux.yamux import (
PROTOCOL_ID as YAMUX_PROTOCOL_ID,
Yamux,
)
from libp2p.stream_muxer.yamux.yamux import PROTOCOL_ID as YAMUX_PROTOCOL_ID
from libp2p.transport.tcp.tcp import (
TCP,
)
@ -92,7 +93,6 @@ MUXER_MPLEX = "MPLEX"
DEFAULT_NEGOTIATE_TIMEOUT = 5
def set_default_muxer(muxer_name: Literal["YAMUX", "MPLEX"]) -> None:
"""
Set the default multiplexer protocol to use.
@ -160,7 +160,6 @@ def get_default_muxer_options() -> TMuxerOptions:
else: # YAMUX is default
return create_yamux_muxer_option()
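For context, these module-level helpers are meant to be called before host construction; a minimal usage sketch, assuming the top-level libp2p exports defined in this file:

    import libp2p

    # Switch the process-wide default from yamux to mplex.
    libp2p.set_default_muxer("MPLEX")

    # Subsequent hosts/swarms pick up the new default muxer options.
    opts = libp2p.get_default_muxer_options()
    host = libp2p.new_host()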
def new_swarm(
key_pair: KeyPair | None = None,
muxer_opt: TMuxerOptions | None = None,
@ -168,6 +167,8 @@ def new_swarm(
peerstore_opt: IPeerStore | None = None,
muxer_preference: Literal["YAMUX", "MPLEX"] | None = None,
listen_addrs: Sequence[multiaddr.Multiaddr] | None = None,
retry_config: Optional["RetryConfig"] = None,
connection_config: Optional["ConnectionConfig"] = None,
) -> INetworkService:
"""
Create a swarm instance based on the parameters.
@ -284,7 +285,14 @@ def new_swarm(
# Store our key pair in peerstore
peerstore.add_key_pair(id_opt, key_pair)
return Swarm(id_opt, peerstore, upgrader, transport)
return Swarm(
id_opt,
peerstore,
upgrader,
transport,
retry_config=retry_config,
connection_config=connection_config
)
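As a rough usage sketch of the two new parameters (assuming RetryConfig and ConnectionConfig are importable from libp2p.network.swarm, as the import block above suggests), a caller could tune retry and multi-connection behaviour like this:

    from libp2p import new_swarm
    from libp2p.network.swarm import ConnectionConfig, RetryConfig

    # More patient retries and up to five parallel connections per peer.
    swarm = new_swarm(
        retry_config=RetryConfig(max_retries=5, initial_delay=0.2),
        connection_config=ConnectionConfig(
            max_connections_per_peer=5,
            load_balancing_strategy="least_loaded",
        ),
    )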
def new_host(
@ -324,6 +332,12 @@ def new_host(
if disc_opt is not None:
return RoutedHost(swarm, disc_opt, enable_mDNS, bootstrap)
return BasicHost(network=swarm,enable_mDNS=enable_mDNS , bootstrap=bootstrap, negotitate_timeout=negotiate_timeout)
return BasicHost(
network=swarm,
enable_mDNS=enable_mDNS,
bootstrap=bootstrap,
negotitate_timeout=negotiate_timeout
)
__version__ = __version("libp2p")

View File

@ -970,6 +970,14 @@ class IPeerStore(
# --------CERTIFIED-ADDR-BOOK----------
@abstractmethod
def get_local_record(self) -> Optional["Envelope"]:
"""Get the local-peer-record wrapped in Envelope"""
@abstractmethod
def set_local_record(self, envelope: "Envelope") -> None:
"""Set the local-peer-record wrapped in Envelope"""
@abstractmethod
def consume_peer_record(self, envelope: "Envelope", ttl: int) -> bool:
"""
@ -1404,15 +1412,16 @@ class INetwork(ABC):
----------
peerstore : IPeerStore
The peer store for managing peer information.
connections : dict[ID, INetConn]
A mapping of peer IDs to network connections.
connections : dict[ID, list[INetConn]]
A mapping of peer IDs to lists of network connections
(multiple connections per peer).
listeners : dict[str, IListener]
A mapping of listener identifiers to listener instances.
"""
peerstore: IPeerStore
connections: dict[ID, INetConn]
connections: dict[ID, list[INetConn]]
listeners: dict[str, IListener]
@abstractmethod
@ -1428,9 +1437,56 @@ class INetwork(ABC):
"""
@abstractmethod
async def dial_peer(self, peer_id: ID) -> INetConn:
def get_connections(self, peer_id: ID | None = None) -> list[INetConn]:
"""
Create a connection to the specified peer.
Get connections for peer (like JS getConnections, Go ConnsToPeer).
Parameters
----------
peer_id : ID | None
The peer ID to get connections for. If None, returns all connections.
Returns
-------
list[INetConn]
List of connections to the specified peer, or all connections
if peer_id is None.
"""
@abstractmethod
def get_connections_map(self) -> dict[ID, list[INetConn]]:
"""
Get all connections map (like JS getConnectionsMap).
Returns
-------
dict[ID, list[INetConn]]
The complete mapping of peer IDs to their connection lists.
"""
@abstractmethod
def get_connection(self, peer_id: ID) -> INetConn | None:
"""
Get single connection for backward compatibility.
Parameters
----------
peer_id : ID
The peer ID to get a connection for.
Returns
-------
INetConn | None
The first available connection, or None if no connections exist.
"""
@abstractmethod
async def dial_peer(self, peer_id: ID) -> list[INetConn]:
"""
Create connections to the specified peer with load balancing.
Parameters
----------
@ -1439,8 +1495,8 @@ class INetwork(ABC):
Returns
-------
INetConn
The network connection instance to the specified peer.
list[INetConn]
List of established connections to the peer.
Raises
------

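A short sketch of how the reworked 1:many API composes, assuming network is any INetwork implementation with the accessors specified above:

    from libp2p.abc import INetwork
    from libp2p.peer.id import ID

    async def describe_peer(network: INetwork, peer_id: ID) -> None:
        conns = network.get_connections(peer_id)      # all live connections
        if not conns:
            conns = await network.dial_peer(peer_id)  # now returns a list
        first = network.get_connection(peer_id)       # backward-compatible view
        print(f"{peer_id}: {len(conns)} connection(s), first={first}")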
View File

@ -43,6 +43,7 @@ from libp2p.peer.id import (
from libp2p.peer.peerinfo import (
PeerInfo,
)
from libp2p.peer.peerstore import create_signed_peer_record
from libp2p.protocol_muxer.exceptions import (
MultiselectClientError,
MultiselectError,
@ -110,6 +111,14 @@ class BasicHost(IHost):
if bootstrap:
self.bootstrap = BootstrapDiscovery(network, bootstrap)
# Cache a signed peer record for the local node in the PeerStore
envelope = create_signed_peer_record(
self.get_id(),
self.get_addrs(),
self.get_private_key(),
)
self.get_peerstore().set_local_record(envelope)
def get_id(self) -> ID:
"""
:return: peer_id of host
@ -288,6 +297,11 @@ class BasicHost(IHost):
protocol, handler = await self.multiselect.negotiate(
MultiselectCommunicator(net_stream), self.negotiate_timeout
)
if protocol is None:
await net_stream.reset()
raise StreamFailure(
"Failed to negotiate protocol: no protocol selected"
)
except MultiselectError as error:
peer_id = net_stream.muxed_conn.peer_id
logger.debug(
@ -329,7 +343,7 @@ class BasicHost(IHost):
:param peer_id: ID of the peer to check
:return: True if peer has an active connection, False otherwise
"""
return peer_id in self._network.connections
return len(self._network.get_connections(peer_id)) > 0
def get_peer_connection_info(self, peer_id: ID) -> INetConn | None:
"""
@ -338,4 +352,4 @@ class BasicHost(IHost):
:param peer_id: ID of the peer to get info for
:return: Connection object if peer is connected, None otherwise
"""
return self._network.connections.get(peer_id)
return self._network.get_connection(peer_id)
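Callers that previously poked at _network.connections directly can use the host-level helper instead; a hedged sketch with hypothetical host and peer_id variables:

    conn = host.get_peer_connection_info(peer_id)
    if conn is not None:
        print(f"connected to {peer_id} via {conn}")
    else:
        print(f"no live connection to {peer_id}")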

View File

@ -15,8 +15,7 @@ from libp2p.custom_types import (
from libp2p.network.stream.exceptions import (
StreamClosed,
)
from libp2p.peer.envelope import seal_record
from libp2p.peer.peer_record import PeerRecord
from libp2p.peer.peerstore import env_to_send_in_RPC
from libp2p.utils import (
decode_varint_with_size,
get_agent_version,
@ -66,9 +65,7 @@ def _mk_identify_protobuf(
protocols = tuple(str(p) for p in host.get_mux().get_protocols() if p is not None)
# Create a signed peer-record for the remote peer
record = PeerRecord(host.get_id(), host.get_addrs())
envelope = seal_record(record, host.get_private_key())
protobuf = envelope.marshal_envelope()
envelope_bytes, _ = env_to_send_in_RPC(host)
observed_addr = observed_multiaddr.to_bytes() if observed_multiaddr else b""
return Identify(
@ -78,7 +75,7 @@ def _mk_identify_protobuf(
listen_addrs=map(_multiaddr_to_bytes, laddrs),
observed_addr=observed_addr,
protocols=protocols,
signedPeerRecord=protobuf,
signedPeerRecord=envelope_bytes,
)
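On the receiving side, the embedded bytes can presumably be unsealed back into a PeerRecord with consume_envelope, using the same "libp2p-peer-record" domain string that kad_dht/utils.py uses later in this diff (sketch; identify_msg stands for a parsed Identify protobuf):

    from libp2p.peer.envelope import consume_envelope

    envelope, record = consume_envelope(
        identify_msg.signedPeerRecord, "libp2p-peer-record"
    )
    print(record.peer_id, record.addrs)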

View File

@ -22,15 +22,18 @@ from libp2p.abc import (
IHost,
)
from libp2p.discovery.random_walk.rt_refresh_manager import RTRefreshManager
from libp2p.kad_dht.utils import maybe_consume_signed_record
from libp2p.network.stream.net_stream import (
INetStream,
)
from libp2p.peer.envelope import Envelope
from libp2p.peer.id import (
ID,
)
from libp2p.peer.peerinfo import (
PeerInfo,
)
from libp2p.peer.peerstore import env_to_send_in_RPC
from libp2p.tools.async_service import (
Service,
)
@ -234,6 +237,9 @@ class KadDHT(Service):
await self.add_peer(peer_id)
logger.debug(f"Added peer {peer_id} to routing table")
closer_peer_envelope: Envelope | None = None
provider_peer_envelope: Envelope | None = None
try:
# Read varint-prefixed length for the message
length_prefix = b""
@ -274,6 +280,14 @@ class KadDHT(Service):
)
logger.debug(f"Found {len(closest_peers)} peers close to target")
# Consume the source signed_peer_record if sent
if not maybe_consume_signed_record(message, self.host, peer_id):
logger.error(
"Received an invalid-signed-record, dropping the stream"
)
await stream.close()
return
# Build response message with protobuf
response = Message()
response.type = Message.MessageType.FIND_NODE
@ -298,6 +312,21 @@ class KadDHT(Service):
except Exception:
pass
# Add the signed-peer-record for each peer in the peer-proto
# if cached in the peerstore
closer_peer_envelope = (
self.host.get_peerstore().get_peer_record(peer)
)
if closer_peer_envelope is not None:
peer_proto.signedRecord = (
closer_peer_envelope.marshal_envelope()
)
# Create sender_signed_peer_record
envelope_bytes, _ = env_to_send_in_RPC(self.host)
response.senderRecord = envelope_bytes
# Serialize and send response
response_bytes = response.SerializeToString()
await stream.write(varint.encode(len(response_bytes)))
@ -312,6 +341,14 @@ class KadDHT(Service):
key = message.key
logger.debug(f"Received ADD_PROVIDER for key {key.hex()}")
# Consume the source signed-peer-record if sent
if not maybe_consume_signed_record(message, self.host, peer_id):
logger.error(
"Received an invalid-signed-record, dropping the stream"
)
await stream.close()
return
# Extract provider information
for provider_proto in message.providerPeers:
try:
@ -338,6 +375,17 @@ class KadDHT(Service):
logger.debug(
f"Added provider {provider_id} for key {key.hex()}"
)
# Process the signed-records of provider if sent
if not maybe_consume_signed_record(
provider_proto, self.host
):
logger.error(
"Received an invalid-signed-record,"
"dropping the stream"
)
await stream.close()
return
except Exception as e:
logger.warning(f"Failed to process provider info: {e}")
@ -346,6 +394,10 @@ class KadDHT(Service):
response.type = Message.MessageType.ADD_PROVIDER
response.key = key
# Add sender's signed-peer-record
envelope_bytes, _ = env_to_send_in_RPC(self.host)
response.senderRecord = envelope_bytes
response_bytes = response.SerializeToString()
await stream.write(varint.encode(len(response_bytes)))
await stream.write(response_bytes)
@ -357,6 +409,14 @@ class KadDHT(Service):
key = message.key
logger.debug(f"Received GET_PROVIDERS request for key {key.hex()}")
# Consume the source signed_peer_record if sent
if not maybe_consume_signed_record(message, self.host, peer_id):
logger.error(
"Received an invalid-signed-record, dropping the stream"
)
await stream.close()
return
# Find providers for the key
providers = self.provider_store.get_providers(key)
logger.debug(
@ -368,12 +428,28 @@ class KadDHT(Service):
response.type = Message.MessageType.GET_PROVIDERS
response.key = key
# Create sender_signed_peer_record for the response
envelope_bytes, _ = env_to_send_in_RPC(self.host)
response.senderRecord = envelope_bytes
# Add provider information to response
for provider_info in providers:
provider_proto = response.providerPeers.add()
provider_proto.id = provider_info.peer_id.to_bytes()
provider_proto.connection = Message.ConnectionType.CAN_CONNECT
# Add provider signed-records if cached
provider_peer_envelope = (
self.host.get_peerstore().get_peer_record(
provider_info.peer_id
)
)
if provider_peer_envelope is not None:
provider_proto.signedRecord = (
provider_peer_envelope.marshal_envelope()
)
# Add addresses if available
for addr in provider_info.addrs:
provider_proto.addrs.append(addr.to_bytes())
@ -397,6 +473,16 @@ class KadDHT(Service):
peer_proto.id = peer.to_bytes()
peer_proto.connection = Message.ConnectionType.CAN_CONNECT
# Add the signed-records of closest_peers if cached
closer_peer_envelope = (
self.host.get_peerstore().get_peer_record(peer)
)
if closer_peer_envelope is not None:
peer_proto.signedRecord = (
closer_peer_envelope.marshal_envelope()
)
# Add addresses if available
try:
addrs = self.host.get_peerstore().addrs(peer)
@ -417,6 +503,14 @@ class KadDHT(Service):
key = message.key
logger.debug(f"Received GET_VALUE request for key {key.hex()}")
# Consume the sender_signed_peer_record
if not maybe_consume_signed_record(message, self.host, peer_id):
logger.error(
"Received an invalid-signed-record, dropping the stream"
)
await stream.close()
return
value = self.value_store.get(key)
if value:
logger.debug(f"Found value for key {key.hex()}")
@ -431,6 +525,10 @@ class KadDHT(Service):
response.record.value = value
response.record.timeReceived = str(time.time())
# Create sender_signed_peer_record
envelope_bytes, _ = env_to_send_in_RPC(self.host)
response.senderRecord = envelope_bytes
# Serialize and send response
response_bytes = response.SerializeToString()
await stream.write(varint.encode(len(response_bytes)))
@ -444,6 +542,10 @@ class KadDHT(Service):
response.type = Message.MessageType.GET_VALUE
response.key = key
# Create sender_signed_peer_record for the response
envelope_bytes, _ = env_to_send_in_RPC(self.host)
response.senderRecord = envelope_bytes
# Add closest peers to key
closest_peers = self.routing_table.find_local_closest_peers(
key, 20
@ -462,6 +564,16 @@ class KadDHT(Service):
peer_proto.id = peer.to_bytes()
peer_proto.connection = Message.ConnectionType.CAN_CONNECT
# Add signed-records of closer-peers if cached
closer_peer_envelope = (
self.host.get_peerstore().get_peer_record(peer)
)
if closer_peer_envelope is not None:
peer_proto.signedRecord = (
closer_peer_envelope.marshal_envelope()
)
# Add addresses if available
try:
addrs = self.host.get_peerstore().addrs(peer)
@ -484,6 +596,15 @@ class KadDHT(Service):
key = message.record.key
value = message.record.value
success = False
# Consume the source signed_peer_record if sent
if not maybe_consume_signed_record(message, self.host, peer_id):
logger.error(
"Received an invalid-signed-record, dropping the stream"
)
await stream.close()
return
try:
if not (key and value):
raise ValueError(
@ -504,6 +625,12 @@ class KadDHT(Service):
response.type = Message.MessageType.PUT_VALUE
if success:
response.key = key
# Create sender_signed_peer_record for the response
envelope_bytes, _ = env_to_send_in_RPC(self.host)
response.senderRecord = envelope_bytes
# Serialize and send response
response_bytes = response.SerializeToString()
await stream.write(varint.encode(len(response_bytes)))
await stream.write(response_bytes)

View File

@ -27,6 +27,7 @@ message Message {
bytes id = 1;
repeated bytes addrs = 2;
ConnectionType connection = 3;
optional bytes signedRecord = 4; // Envelope(PeerRecord) encoded
}
MessageType type = 1;
@ -35,4 +36,6 @@ message Message {
Record record = 3;
repeated Peer closerPeers = 8;
repeated Peer providerPeers = 9;
optional bytes senderRecord = 11; // Envelope(PeerRecord) encoded
}
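The handlers later in this diff populate the two new optional fields roughly like so (placeholder bytes stand in for real serialized envelopes):

    from libp2p.kad_dht.pb.kademlia_pb2 import Message

    msg = Message()
    msg.type = Message.MessageType.FIND_NODE
    msg.senderRecord = b"<envelope bytes>"       # field 11

    peer = msg.closerPeers.add()
    peer.id = b"<peer id bytes>"
    peer.connection = Message.ConnectionType.CAN_CONNECT
    peer.signedRecord = b"<envelope bytes>"      # field 4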

View File

@ -1,11 +1,12 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: libp2p/kad_dht/pb/kademlia.proto
# Protobuf Python Version: 4.25.3
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
@ -13,21 +14,21 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n libp2p/kad_dht/pb/kademlia.proto\":\n\x06Record\x12\x0b\n\x03key\x18\x01 \x01(\x0c\x12\r\n\x05value\x18\x02 \x01(\x0c\x12\x14\n\x0ctimeReceived\x18\x05 \x01(\t\"\xca\x03\n\x07Message\x12\"\n\x04type\x18\x01 \x01(\x0e\x32\x14.Message.MessageType\x12\x17\n\x0f\x63lusterLevelRaw\x18\n \x01(\x05\x12\x0b\n\x03key\x18\x02 \x01(\x0c\x12\x17\n\x06record\x18\x03 \x01(\x0b\x32\x07.Record\x12\"\n\x0b\x63loserPeers\x18\x08 \x03(\x0b\x32\r.Message.Peer\x12$\n\rproviderPeers\x18\t \x03(\x0b\x32\r.Message.Peer\x1aN\n\x04Peer\x12\n\n\x02id\x18\x01 \x01(\x0c\x12\r\n\x05\x61\x64\x64rs\x18\x02 \x03(\x0c\x12+\n\nconnection\x18\x03 \x01(\x0e\x32\x17.Message.ConnectionType\"i\n\x0bMessageType\x12\r\n\tPUT_VALUE\x10\x00\x12\r\n\tGET_VALUE\x10\x01\x12\x10\n\x0c\x41\x44\x44_PROVIDER\x10\x02\x12\x11\n\rGET_PROVIDERS\x10\x03\x12\r\n\tFIND_NODE\x10\x04\x12\x08\n\x04PING\x10\x05\"W\n\x0e\x43onnectionType\x12\x11\n\rNOT_CONNECTED\x10\x00\x12\r\n\tCONNECTED\x10\x01\x12\x0f\n\x0b\x43\x41N_CONNECT\x10\x02\x12\x12\n\x0e\x43\x41NNOT_CONNECT\x10\x03\x62\x06proto3')
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n libp2p/kad_dht/pb/kademlia.proto\":\n\x06Record\x12\x0b\n\x03key\x18\x01 \x01(\x0c\x12\r\n\x05value\x18\x02 \x01(\x0c\x12\x14\n\x0ctimeReceived\x18\x05 \x01(\t\"\xa2\x04\n\x07Message\x12\"\n\x04type\x18\x01 \x01(\x0e\x32\x14.Message.MessageType\x12\x17\n\x0f\x63lusterLevelRaw\x18\n \x01(\x05\x12\x0b\n\x03key\x18\x02 \x01(\x0c\x12\x17\n\x06record\x18\x03 \x01(\x0b\x32\x07.Record\x12\"\n\x0b\x63loserPeers\x18\x08 \x03(\x0b\x32\r.Message.Peer\x12$\n\rproviderPeers\x18\t \x03(\x0b\x32\r.Message.Peer\x12\x19\n\x0csenderRecord\x18\x0b \x01(\x0cH\x00\x88\x01\x01\x1az\n\x04Peer\x12\n\n\x02id\x18\x01 \x01(\x0c\x12\r\n\x05\x61\x64\x64rs\x18\x02 \x03(\x0c\x12+\n\nconnection\x18\x03 \x01(\x0e\x32\x17.Message.ConnectionType\x12\x19\n\x0csignedRecord\x18\x04 \x01(\x0cH\x00\x88\x01\x01\x42\x0f\n\r_signedRecord\"i\n\x0bMessageType\x12\r\n\tPUT_VALUE\x10\x00\x12\r\n\tGET_VALUE\x10\x01\x12\x10\n\x0c\x41\x44\x44_PROVIDER\x10\x02\x12\x11\n\rGET_PROVIDERS\x10\x03\x12\r\n\tFIND_NODE\x10\x04\x12\x08\n\x04PING\x10\x05\"W\n\x0e\x43onnectionType\x12\x11\n\rNOT_CONNECTED\x10\x00\x12\r\n\tCONNECTED\x10\x01\x12\x0f\n\x0b\x43\x41N_CONNECT\x10\x02\x12\x12\n\x0e\x43\x41NNOT_CONNECT\x10\x03\x42\x0f\n\r_senderRecordb\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.kad_dht.pb.kademlia_pb2', globals())
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.kad_dht.pb.kademlia_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_RECORD._serialized_start=36
_RECORD._serialized_end=94
_MESSAGE._serialized_start=97
_MESSAGE._serialized_end=555
_MESSAGE_PEER._serialized_start=281
_MESSAGE_PEER._serialized_end=359
_MESSAGE_MESSAGETYPE._serialized_start=361
_MESSAGE_MESSAGETYPE._serialized_end=466
_MESSAGE_CONNECTIONTYPE._serialized_start=468
_MESSAGE_CONNECTIONTYPE._serialized_end=555
_globals['_RECORD']._serialized_start=36
_globals['_RECORD']._serialized_end=94
_globals['_MESSAGE']._serialized_start=97
_globals['_MESSAGE']._serialized_end=643
_globals['_MESSAGE_PEER']._serialized_start=308
_globals['_MESSAGE_PEER']._serialized_end=430
_globals['_MESSAGE_MESSAGETYPE']._serialized_start=432
_globals['_MESSAGE_MESSAGETYPE']._serialized_end=537
_globals['_MESSAGE_CONNECTIONTYPE']._serialized_start=539
_globals['_MESSAGE_CONNECTIONTYPE']._serialized_end=626
# @@protoc_insertion_point(module_scope)

View File

@ -1,133 +1,70 @@
"""
@generated by mypy-protobuf. Do not edit manually!
isort:skip_file
"""
from google.protobuf.internal import containers as _containers
from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union
import builtins
import collections.abc
import google.protobuf.descriptor
import google.protobuf.internal.containers
import google.protobuf.internal.enum_type_wrapper
import google.protobuf.message
import sys
import typing
DESCRIPTOR: _descriptor.FileDescriptor
if sys.version_info >= (3, 10):
import typing as typing_extensions
else:
import typing_extensions
class Record(_message.Message):
__slots__ = ("key", "value", "timeReceived")
KEY_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
TIMERECEIVED_FIELD_NUMBER: _ClassVar[int]
key: bytes
value: bytes
timeReceived: str
def __init__(self, key: _Optional[bytes] = ..., value: _Optional[bytes] = ..., timeReceived: _Optional[str] = ...) -> None: ...
DESCRIPTOR: google.protobuf.descriptor.FileDescriptor
@typing.final
class Record(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
KEY_FIELD_NUMBER: builtins.int
VALUE_FIELD_NUMBER: builtins.int
TIMERECEIVED_FIELD_NUMBER: builtins.int
key: builtins.bytes
value: builtins.bytes
timeReceived: builtins.str
def __init__(
self,
*,
key: builtins.bytes = ...,
value: builtins.bytes = ...,
timeReceived: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["key", b"key", "timeReceived", b"timeReceived", "value", b"value"]) -> None: ...
global___Record = Record
@typing.final
class Message(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
class _MessageType:
ValueType = typing.NewType("ValueType", builtins.int)
V: typing_extensions.TypeAlias = ValueType
class _MessageTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Message._MessageType.ValueType], builtins.type):
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
PUT_VALUE: Message._MessageType.ValueType # 0
GET_VALUE: Message._MessageType.ValueType # 1
ADD_PROVIDER: Message._MessageType.ValueType # 2
GET_PROVIDERS: Message._MessageType.ValueType # 3
FIND_NODE: Message._MessageType.ValueType # 4
PING: Message._MessageType.ValueType # 5
class MessageType(_MessageType, metaclass=_MessageTypeEnumTypeWrapper): ...
PUT_VALUE: Message.MessageType.ValueType # 0
GET_VALUE: Message.MessageType.ValueType # 1
ADD_PROVIDER: Message.MessageType.ValueType # 2
GET_PROVIDERS: Message.MessageType.ValueType # 3
FIND_NODE: Message.MessageType.ValueType # 4
PING: Message.MessageType.ValueType # 5
class _ConnectionType:
ValueType = typing.NewType("ValueType", builtins.int)
V: typing_extensions.TypeAlias = ValueType
class _ConnectionTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Message._ConnectionType.ValueType], builtins.type):
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
NOT_CONNECTED: Message._ConnectionType.ValueType # 0
CONNECTED: Message._ConnectionType.ValueType # 1
CAN_CONNECT: Message._ConnectionType.ValueType # 2
CANNOT_CONNECT: Message._ConnectionType.ValueType # 3
class ConnectionType(_ConnectionType, metaclass=_ConnectionTypeEnumTypeWrapper): ...
NOT_CONNECTED: Message.ConnectionType.ValueType # 0
CONNECTED: Message.ConnectionType.ValueType # 1
CAN_CONNECT: Message.ConnectionType.ValueType # 2
CANNOT_CONNECT: Message.ConnectionType.ValueType # 3
@typing.final
class Peer(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
ID_FIELD_NUMBER: builtins.int
ADDRS_FIELD_NUMBER: builtins.int
CONNECTION_FIELD_NUMBER: builtins.int
id: builtins.bytes
connection: global___Message.ConnectionType.ValueType
@property
def addrs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.bytes]: ...
def __init__(
self,
*,
id: builtins.bytes = ...,
addrs: collections.abc.Iterable[builtins.bytes] | None = ...,
connection: global___Message.ConnectionType.ValueType = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["addrs", b"addrs", "connection", b"connection", "id", b"id"]) -> None: ...
TYPE_FIELD_NUMBER: builtins.int
CLUSTERLEVELRAW_FIELD_NUMBER: builtins.int
KEY_FIELD_NUMBER: builtins.int
RECORD_FIELD_NUMBER: builtins.int
CLOSERPEERS_FIELD_NUMBER: builtins.int
PROVIDERPEERS_FIELD_NUMBER: builtins.int
type: global___Message.MessageType.ValueType
clusterLevelRaw: builtins.int
key: builtins.bytes
@property
def record(self) -> global___Record: ...
@property
def closerPeers(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___Message.Peer]: ...
@property
def providerPeers(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___Message.Peer]: ...
def __init__(
self,
*,
type: global___Message.MessageType.ValueType = ...,
clusterLevelRaw: builtins.int = ...,
key: builtins.bytes = ...,
record: global___Record | None = ...,
closerPeers: collections.abc.Iterable[global___Message.Peer] | None = ...,
providerPeers: collections.abc.Iterable[global___Message.Peer] | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["record", b"record"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["closerPeers", b"closerPeers", "clusterLevelRaw", b"clusterLevelRaw", "key", b"key", "providerPeers", b"providerPeers", "record", b"record", "type", b"type"]) -> None: ...
global___Message = Message
class Message(_message.Message):
__slots__ = ("type", "clusterLevelRaw", "key", "record", "closerPeers", "providerPeers", "senderRecord")
class MessageType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
PUT_VALUE: _ClassVar[Message.MessageType]
GET_VALUE: _ClassVar[Message.MessageType]
ADD_PROVIDER: _ClassVar[Message.MessageType]
GET_PROVIDERS: _ClassVar[Message.MessageType]
FIND_NODE: _ClassVar[Message.MessageType]
PING: _ClassVar[Message.MessageType]
PUT_VALUE: Message.MessageType
GET_VALUE: Message.MessageType
ADD_PROVIDER: Message.MessageType
GET_PROVIDERS: Message.MessageType
FIND_NODE: Message.MessageType
PING: Message.MessageType
class ConnectionType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
NOT_CONNECTED: _ClassVar[Message.ConnectionType]
CONNECTED: _ClassVar[Message.ConnectionType]
CAN_CONNECT: _ClassVar[Message.ConnectionType]
CANNOT_CONNECT: _ClassVar[Message.ConnectionType]
NOT_CONNECTED: Message.ConnectionType
CONNECTED: Message.ConnectionType
CAN_CONNECT: Message.ConnectionType
CANNOT_CONNECT: Message.ConnectionType
class Peer(_message.Message):
__slots__ = ("id", "addrs", "connection", "signedRecord")
ID_FIELD_NUMBER: _ClassVar[int]
ADDRS_FIELD_NUMBER: _ClassVar[int]
CONNECTION_FIELD_NUMBER: _ClassVar[int]
SIGNEDRECORD_FIELD_NUMBER: _ClassVar[int]
id: bytes
addrs: _containers.RepeatedScalarFieldContainer[bytes]
connection: Message.ConnectionType
signedRecord: bytes
def __init__(self, id: _Optional[bytes] = ..., addrs: _Optional[_Iterable[bytes]] = ..., connection: _Optional[_Union[Message.ConnectionType, str]] = ..., signedRecord: _Optional[bytes] = ...) -> None: ...
TYPE_FIELD_NUMBER: _ClassVar[int]
CLUSTERLEVELRAW_FIELD_NUMBER: _ClassVar[int]
KEY_FIELD_NUMBER: _ClassVar[int]
RECORD_FIELD_NUMBER: _ClassVar[int]
CLOSERPEERS_FIELD_NUMBER: _ClassVar[int]
PROVIDERPEERS_FIELD_NUMBER: _ClassVar[int]
SENDERRECORD_FIELD_NUMBER: _ClassVar[int]
type: Message.MessageType
clusterLevelRaw: int
key: bytes
record: Record
closerPeers: _containers.RepeatedCompositeFieldContainer[Message.Peer]
providerPeers: _containers.RepeatedCompositeFieldContainer[Message.Peer]
senderRecord: bytes
def __init__(self, type: _Optional[_Union[Message.MessageType, str]] = ..., clusterLevelRaw: _Optional[int] = ..., key: _Optional[bytes] = ..., record: _Optional[_Union[Record, _Mapping]] = ..., closerPeers: _Optional[_Iterable[_Union[Message.Peer, _Mapping]]] = ..., providerPeers: _Optional[_Iterable[_Union[Message.Peer, _Mapping]]] = ..., senderRecord: _Optional[bytes] = ...) -> None: ... # type: ignore

View File

@ -15,12 +15,14 @@ from libp2p.abc import (
INetStream,
IPeerRouting,
)
from libp2p.peer.envelope import Envelope
from libp2p.peer.id import (
ID,
)
from libp2p.peer.peerinfo import (
PeerInfo,
)
from libp2p.peer.peerstore import env_to_send_in_RPC
from .common import (
ALPHA,
@ -33,6 +35,7 @@ from .routing_table import (
RoutingTable,
)
from .utils import (
maybe_consume_signed_record,
sort_peer_ids_by_distance,
)
@ -255,6 +258,10 @@ class PeerRouting(IPeerRouting):
find_node_msg.type = Message.MessageType.FIND_NODE
find_node_msg.key = target_key # Set target key directly as bytes
# Create sender_signed_peer_record
envelope_bytes, _ = env_to_send_in_RPC(self.host)
find_node_msg.senderRecord = envelope_bytes
# Serialize and send the protobuf message with varint length prefix
proto_bytes = find_node_msg.SerializeToString()
logger.debug(
@ -299,7 +306,22 @@ class PeerRouting(IPeerRouting):
# Process closest peers from response
if response_msg.type == Message.MessageType.FIND_NODE:
# Consume the sender_signed_peer_record
if not maybe_consume_signed_record(response_msg, self.host, peer):
logger.error(
"Received an invalid-signed-record,ignoring the response"
)
return []
for peer_data in response_msg.closerPeers:
# Consume the received closer_peers signed-records, peer-id is
# sent with the peer-data
if not maybe_consume_signed_record(peer_data, self.host):
logger.error(
"Received an invalid-signed-record,ignoring the response"
)
return []
new_peer_id = ID(peer_data.id)
if new_peer_id not in results:
results.append(new_peer_id)
@ -332,6 +354,7 @@ class PeerRouting(IPeerRouting):
"""
try:
# Read message length
peer_id = stream.muxed_conn.peer_id
length_bytes = await stream.read(4)
if not length_bytes:
return
@ -345,10 +368,18 @@ class PeerRouting(IPeerRouting):
# Parse protobuf message
kad_message = Message()
closer_peer_envelope: Envelope | None = None
try:
kad_message.ParseFromString(message_bytes)
if kad_message.type == Message.MessageType.FIND_NODE:
# Consume the sender's signed-peer-record if sent
if not maybe_consume_signed_record(kad_message, self.host, peer_id):
logger.error(
"Received an invalid-signed-record, dropping the stream"
)
return
# Get target key directly from protobuf message
target_key = kad_message.key
@ -361,12 +392,26 @@ class PeerRouting(IPeerRouting):
response = Message()
response.type = Message.MessageType.FIND_NODE
# Create sender_signed_peer_record for the response
envelope_bytes, _ = env_to_send_in_RPC(self.host)
response.senderRecord = envelope_bytes
# Add peer information to response
for peer_id in closest_peers:
peer_proto = response.closerPeers.add()
peer_proto.id = peer_id.to_bytes()
peer_proto.connection = Message.ConnectionType.CAN_CONNECT
# Add the signed-records of closest_peers if cached
closer_peer_envelope = (
self.host.get_peerstore().get_peer_record(peer_id)
)
if isinstance(closer_peer_envelope, Envelope):
peer_proto.signedRecord = (
closer_peer_envelope.marshal_envelope()
)
# Add addresses if available
try:
addrs = self.host.get_peerstore().addrs(peer_id)

View File

@ -22,12 +22,14 @@ from libp2p.abc import (
from libp2p.custom_types import (
TProtocol,
)
from libp2p.kad_dht.utils import maybe_consume_signed_record
from libp2p.peer.id import (
ID,
)
from libp2p.peer.peerinfo import (
PeerInfo,
)
from libp2p.peer.peerstore import env_to_send_in_RPC
from .common import (
ALPHA,
@ -240,11 +242,18 @@ class ProviderStore:
message.type = Message.MessageType.ADD_PROVIDER
message.key = key
# Create sender's signed-peer-record
envelope_bytes, _ = env_to_send_in_RPC(self.host)
message.senderRecord = envelope_bytes
# Add our provider info
provider = message.providerPeers.add()
provider.id = self.local_peer_id.to_bytes()
provider.addrs.extend(addrs)
# Add the provider's signed-peer-record
provider.signedRecord = envelope_bytes
# Serialize and send the message
proto_bytes = message.SerializeToString()
await stream.write(varint.encode(len(proto_bytes)))
@ -276,10 +285,15 @@ class ProviderStore:
response = Message()
response.ParseFromString(response_bytes)
# Check response type
response.type == Message.MessageType.ADD_PROVIDER
if response.type:
result = True
if response.type == Message.MessageType.ADD_PROVIDER:
# Consume the sender's signed-peer-record if sent
if not maybe_consume_signed_record(response, self.host, peer_id):
logger.error(
"Received an invalid-signed-record, ignoring the response"
)
result = False
else:
result = True
except Exception as e:
logger.warning(f"Error sending ADD_PROVIDER to {peer_id}: {e}")
@ -380,6 +394,10 @@ class ProviderStore:
message.type = Message.MessageType.GET_PROVIDERS
message.key = key
# Create sender's signed-peer-record
envelope_bytes, _ = env_to_send_in_RPC(self.host)
message.senderRecord = envelope_bytes
# Serialize and send the message
proto_bytes = message.SerializeToString()
await stream.write(varint.encode(len(proto_bytes)))
@ -414,10 +432,26 @@ class ProviderStore:
if response.type != Message.MessageType.GET_PROVIDERS:
return []
# Consume the sender's signed-peer-record if sent
if not maybe_consume_signed_record(response, self.host, peer_id):
logger.error(
"Received an invalid-signed-record, ignoring the response"
)
return []
# Extract provider information
providers = []
for provider_proto in response.providerPeers:
try:
# Consume the provider's signed-peer-record if sent, peer-id
# already sent with the provider-proto
if not maybe_consume_signed_record(provider_proto, self.host):
logger.error(
"Received an invalid-signed-record, "
"ignoring the response"
)
return []
# Create peer ID from bytes
provider_id = ID(provider_proto.id)
@ -431,6 +465,7 @@ class ProviderStore:
# Create PeerInfo and add to result
providers.append(PeerInfo(provider_id, addrs))
except Exception as e:
logger.warning(f"Failed to parse provider info: {e}")

View File

@ -2,13 +2,93 @@
Utility functions for Kademlia DHT implementation.
"""
import logging
import base58
import multihash
from libp2p.abc import IHost
from libp2p.peer.envelope import consume_envelope
from libp2p.peer.id import (
ID,
)
from .pb.kademlia_pb2 import (
Message,
)
logger = logging.getLogger("kademlia-example.utils")
def maybe_consume_signed_record(
msg: Message | Message.Peer, host: IHost, peer_id: ID | None = None
) -> bool:
"""
Attempt to parse and store a signed-peer-record (Envelope) received during
DHT communication. If the record is invalid, the peer-id does not match, or
updating the peerstore fails, the function logs an error and returns False.
Parameters
----------
msg : Message | Message.Peer
The protobuf message received during DHT communication. Can either be a
top-level `Message` containing `senderRecord` or a `Message.Peer`
containing `signedRecord`.
host : IHost
The local host instance, providing access to the peerstore for storing
verified peer records.
peer_id : ID | None, optional
The expected peer ID for record validation. If provided, the peer ID
inside the record must match this value.
Returns
-------
bool
True if a valid signed peer record was successfully consumed and stored,
False otherwise.
"""
if isinstance(msg, Message):
if msg.HasField("senderRecord"):
try:
# Convert the signed-peer-record(Envelope) from
# protobuf bytes
envelope, record = consume_envelope(
msg.senderRecord,
"libp2p-peer-record",
)
if not (isinstance(peer_id, ID) and record.peer_id == peer_id):
return False
# Use the default TTL of 2 hours (7200 seconds)
if not host.get_peerstore().consume_peer_record(envelope, 7200):
logger.error("Failed to update the Certified-Addr-Book")
return False
except Exception as e:
logger.error("Failed to update the Certified-Addr-Book: %s", e)
return False
else:
if msg.HasField("signedRecord"):
try:
# Convert the signed-peer-record(Envelope) from
# protobuf bytes
envelope, record = consume_envelope(
msg.signedRecord,
"libp2p-peer-record",
)
if not record.peer_id.to_bytes() == msg.id:
return False
# Use the default TTL of 2 hours (7200 seconds)
if not host.get_peerstore().consume_peer_record(envelope, 7200):
logger.error("Failed to update the Certified-Addr-Book")
return False
except Exception as e:
logger.error(
"Failed to update the Certified-Addr-Book: %s",
e,
)
return False
return True
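The two call shapes used throughout the DHT handlers, condensed into one hypothetical helper (consume_records is illustrative only, not part of this diff):

    async def consume_records(stream, message, host, sender_peer_id):
        # Top-level senderRecord: validate against the known sender ID.
        if not maybe_consume_signed_record(message, host, sender_peer_id):
            await stream.close()
            return False
        # Per-peer signedRecord: the expected ID travels inside the proto.
        for peer_proto in message.closerPeers:
            if not maybe_consume_signed_record(peer_proto, host):
                await stream.close()
                return False
        return True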
def create_key_from_binary(binary_data: bytes) -> bytes:
"""

View File

@ -15,9 +15,11 @@ from libp2p.abc import (
from libp2p.custom_types import (
TProtocol,
)
from libp2p.kad_dht.utils import maybe_consume_signed_record
from libp2p.peer.id import (
ID,
)
from libp2p.peer.peerstore import env_to_send_in_RPC
from .common import (
DEFAULT_TTL,
@ -110,6 +112,10 @@ class ValueStore:
message = Message()
message.type = Message.MessageType.PUT_VALUE
# Create sender's signed-peer-record
envelope_bytes, _ = env_to_send_in_RPC(self.host)
message.senderRecord = envelope_bytes
# Set message fields
message.key = key
message.record.key = key
@ -155,7 +161,13 @@ class ValueStore:
# Check if response is valid
if response.type == Message.MessageType.PUT_VALUE:
if response.key:
# Consume the sender's signed-peer-record if sent
if not maybe_consume_signed_record(response, self.host, peer_id):
logger.error(
"Received an invalid-signed-record, ignoring the response"
)
return False
if response.key == key:
result = True
return result
@ -231,6 +243,10 @@ class ValueStore:
message.type = Message.MessageType.GET_VALUE
message.key = key
# Create sender's signed-peer-record
envelope_bytes, _ = env_to_send_in_RPC(self.host)
message.senderRecord = envelope_bytes
# Serialize and send the protobuf message
proto_bytes = message.SerializeToString()
await stream.write(varint.encode(len(proto_bytes)))
@ -275,6 +291,13 @@ class ValueStore:
and response.HasField("record")
and response.record.value
):
# Consume the sender's signed-peer-record
if not maybe_consume_signed_record(response, self.host, peer_id):
logger.error(
"Received an invalid-signed-record, ignoring the response"
)
return None
logger.debug(
f"Received value for key {key.hex()} from peer {peer_id}"
)

View File

@ -2,7 +2,9 @@ from collections.abc import (
Awaitable,
Callable,
)
from dataclasses import dataclass
import logging
import random
from multiaddr import (
Multiaddr,
@ -59,6 +61,59 @@ from .exceptions import (
logger = logging.getLogger("libp2p.network.swarm")
@dataclass
class RetryConfig:
"""
Configuration for retry logic with exponential backoff.
This configuration controls how connection attempts are retried when they fail.
The retry mechanism uses exponential backoff with jitter to prevent thundering
herd problems in distributed systems.
Attributes:
max_retries: Maximum number of retry attempts before giving up.
Default: 3 attempts
initial_delay: Initial delay in seconds before the first retry.
Default: 0.1 seconds (100ms)
max_delay: Maximum delay cap in seconds to prevent excessive wait times.
Default: 30.0 seconds
backoff_multiplier: Multiplier for exponential backoff (each retry multiplies
the delay by this factor). Default: 2.0 (doubles each time)
jitter_factor: Random jitter factor (0.0-1.0) to add randomness to delays
and prevent synchronized retries. Default: 0.1 (10% jitter)
"""
max_retries: int = 3
initial_delay: float = 0.1
max_delay: float = 30.0
backoff_multiplier: float = 2.0
jitter_factor: float = 0.1
@dataclass
class ConnectionConfig:
"""
Configuration for multi-connection support.
This configuration controls how multiple connections per peer are managed,
including connection limits, timeouts, and load balancing strategies.
Attributes:
max_connections_per_peer: Maximum number of connections allowed to a single
peer. Default: 3 connections
connection_timeout: Timeout in seconds for establishing new connections.
Default: 30.0 seconds
load_balancing_strategy: Strategy for distributing streams across connections.
Options: "round_robin" (default) or "least_loaded"
"""
max_connections_per_peer: int = 3
connection_timeout: float = 30.0
load_balancing_strategy: str = "round_robin" # or "least_loaded"
def create_default_stream_handler(network: INetworkService) -> StreamHandlerFn:
async def stream_handler(stream: INetStream) -> None:
await network.get_manager().wait_finished()
@ -71,9 +126,8 @@ class Swarm(Service, INetworkService):
peerstore: IPeerStore
upgrader: TransportUpgrader
transport: ITransport
# TODO: Connection and `peer_id` are 1-1 mapping in our implementation,
# whereas in Go one `peer_id` may point to multiple connections.
connections: dict[ID, INetConn]
# Enhanced: Support for multiple connections per peer
connections: dict[ID, list[INetConn]] # Multiple connections per peer
listeners: dict[str, IListener]
common_stream_handler: StreamHandlerFn
listener_nursery: trio.Nursery | None
@ -81,18 +135,31 @@ class Swarm(Service, INetworkService):
notifees: list[INotifee]
# Enhanced: New configuration
retry_config: RetryConfig
connection_config: ConnectionConfig
_round_robin_index: dict[ID, int]
def __init__(
self,
peer_id: ID,
peerstore: IPeerStore,
upgrader: TransportUpgrader,
transport: ITransport,
retry_config: RetryConfig | None = None,
connection_config: ConnectionConfig | None = None,
):
self.self_id = peer_id
self.peerstore = peerstore
self.upgrader = upgrader
self.transport = transport
self.connections = dict()
# Enhanced: Initialize retry and connection configuration
self.retry_config = retry_config or RetryConfig()
self.connection_config = connection_config or ConnectionConfig()
# Enhanced: Initialize connections as 1:many mapping
self.connections = {}
self.listeners = dict()
# Create Notifee array
@ -103,6 +170,9 @@ class Swarm(Service, INetworkService):
self.listener_nursery = None
self.event_listener_nursery_created = trio.Event()
# Load balancing state
self._round_robin_index = {}
async def run(self) -> None:
async with trio.open_nursery() as nursery:
# Create a nursery for listener tasks.
@ -122,18 +192,74 @@ class Swarm(Service, INetworkService):
def set_stream_handler(self, stream_handler: StreamHandlerFn) -> None:
self.common_stream_handler = stream_handler
async def dial_peer(self, peer_id: ID) -> INetConn:
def get_connections(self, peer_id: ID | None = None) -> list[INetConn]:
"""
Try to create a connection to peer_id.
Get connections for peer (like JS getConnections, Go ConnsToPeer).
Parameters
----------
peer_id : ID | None
The peer ID to get connections for. If None, returns all connections.
Returns
-------
list[INetConn]
List of connections to the specified peer, or all connections
if peer_id is None.
"""
if peer_id is not None:
return self.connections.get(peer_id, [])
# Return all connections from all peers
all_conns = []
for conns in self.connections.values():
all_conns.extend(conns)
return all_conns
def get_connections_map(self) -> dict[ID, list[INetConn]]:
"""
Get all connections map (like JS getConnectionsMap).
Returns
-------
dict[ID, list[INetConn]]
The complete mapping of peer IDs to their connection lists.
"""
return self.connections.copy()
def get_connection(self, peer_id: ID) -> INetConn | None:
"""
Get single connection for backward compatibility.
Parameters
----------
peer_id : ID
The peer ID to get a connection for.
Returns
-------
INetConn | None
The first available connection, or None if no connections exist.
"""
conns = self.get_connections(peer_id)
return conns[0] if conns else None
async def dial_peer(self, peer_id: ID) -> list[INetConn]:
"""
Try to create connections to peer_id with enhanced retry logic.
:param peer_id: peer ID we want to dial
:raises SwarmException: raised when an error occurs
:return: muxed connection
:return: list of muxed connections
"""
if peer_id in self.connections:
# If muxed connection already exists for peer_id,
# set muxed connection equal to existing muxed connection
return self.connections[peer_id]
# Check if we already have connections
existing_connections = self.get_connections(peer_id)
if existing_connections:
logger.debug(f"Reusing existing connections to peer {peer_id}")
return existing_connections
logger.debug("attempting to dial peer %s", peer_id)
@ -146,12 +272,19 @@ class Swarm(Service, INetworkService):
if not addrs:
raise SwarmException(f"No known addresses to peer {peer_id}")
connections = []
exceptions: list[SwarmException] = []
# Try all known addresses
# Enhanced: Try all known addresses with retry logic
for multiaddr in addrs:
try:
return await self.dial_addr(multiaddr, peer_id)
connection = await self._dial_with_retry(multiaddr, peer_id)
connections.append(connection)
# Limit number of connections per peer
if len(connections) >= self.connection_config.max_connections_per_peer:
break
except SwarmException as e:
exceptions.append(e)
logger.debug(
@ -161,15 +294,73 @@ class Swarm(Service, INetworkService):
exc_info=e,
)
# Tried all addresses, raising exception.
raise SwarmException(
f"unable to connect to {peer_id}, no addresses established a successful "
"connection (with exceptions)"
) from MultiError(exceptions)
if not connections:
# Tried all addresses, raising exception.
raise SwarmException(
f"unable to connect to {peer_id}, no addresses established a "
"successful connection (with exceptions)"
) from MultiError(exceptions)
async def dial_addr(self, addr: Multiaddr, peer_id: ID) -> INetConn:
return connections
async def _dial_with_retry(self, addr: Multiaddr, peer_id: ID) -> INetConn:
"""
Try to create a connection to peer_id with addr.
Enhanced: Dial with retry logic and exponential backoff.
:param addr: the address to dial
:param peer_id: the peer we want to connect to
:raises SwarmException: raised when all retry attempts fail
:return: network connection
"""
last_exception = None
for attempt in range(self.retry_config.max_retries + 1):
try:
return await self._dial_addr_single_attempt(addr, peer_id)
except Exception as e:
last_exception = e
if attempt < self.retry_config.max_retries:
delay = self._calculate_backoff_delay(attempt)
logger.debug(
f"Connection attempt {attempt + 1} failed, "
f"retrying in {delay:.2f}s: {e}"
)
await trio.sleep(delay)
else:
logger.debug(f"All {self.retry_config.max_retries} attempts failed")
# Convert the last exception to SwarmException for consistency
if last_exception is not None:
if isinstance(last_exception, SwarmException):
raise last_exception
else:
raise SwarmException(
f"Failed to connect after {self.retry_config.max_retries} attempts"
) from last_exception
# This should never be reached, but mypy requires it
raise SwarmException("Unexpected error in retry logic")
def _calculate_backoff_delay(self, attempt: int) -> float:
"""
Enhanced: Calculate backoff delay with jitter to prevent thundering herd.
:param attempt: the current attempt number (0-based)
:return: delay in seconds
"""
delay = min(
self.retry_config.initial_delay
* (self.retry_config.backoff_multiplier**attempt),
self.retry_config.max_delay,
)
# Add jitter to prevent synchronized retries
jitter = delay * self.retry_config.jitter_factor
return delay + random.uniform(-jitter, jitter)
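With the defaults (initial_delay=0.1, backoff_multiplier=2.0, max_delay=30.0, jitter_factor=0.1) the waits between attempts come out near 0.1 s, 0.2 s, and 0.4 s; a standalone re-derivation of the same formula:

    import random

    def backoff(attempt: int, initial=0.1, mult=2.0, cap=30.0, jitter=0.1) -> float:
        delay = min(initial * mult**attempt, cap)
        return delay + random.uniform(-delay * jitter, delay * jitter)

    for attempt in range(3):   # max_retries=3 -> three waits between four tries
        print(f"after attempt {attempt + 1}: ~{backoff(attempt):.2f}s")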
async def _dial_addr_single_attempt(self, addr: Multiaddr, peer_id: ID) -> INetConn:
"""
Enhanced: Single attempt to dial an address (extracted from original dial_addr).
:param addr: the address we want to connect with
:param peer_id: the peer we want to connect to
@ -216,19 +407,97 @@ class Swarm(Service, INetworkService):
return swarm_conn
async def dial_addr(self, addr: Multiaddr, peer_id: ID) -> INetConn:
"""
Enhanced: Try to create a connection to peer_id with addr using retry logic.
:param addr: the address we want to connect with
:param peer_id: the peer we want to connect to
:raises SwarmException: raised when an error occurs
:return: network connection
"""
return await self._dial_with_retry(addr, peer_id)
async def new_stream(self, peer_id: ID) -> INetStream:
"""
Enhanced: Create a new stream with load balancing across multiple connections.
:param peer_id: peer_id of destination
:raises SwarmException: raised when an error occurs
:return: net stream instance
"""
logger.debug("attempting to open a stream to peer %s", peer_id)
swarm_conn = await self.dial_peer(peer_id)
# Get existing connections or dial new ones
connections = self.get_connections(peer_id)
if not connections:
connections = await self.dial_peer(peer_id)
net_stream = await swarm_conn.new_stream()
logger.debug("successfully opened a stream to peer %s", peer_id)
return net_stream
# Load balancing strategy at interface level
connection = self._select_connection(connections, peer_id)
try:
net_stream = await connection.new_stream()
logger.debug("successfully opened a stream to peer %s", peer_id)
return net_stream
except Exception as e:
logger.debug(f"Failed to create stream on connection: {e}")
# Try other connections if available
for other_conn in connections:
if other_conn != connection:
try:
net_stream = await other_conn.new_stream()
logger.debug(
f"Successfully opened a stream to peer {peer_id} "
"using alternative connection"
)
return net_stream
except Exception:
continue
# All connections failed, raise exception
raise SwarmException(f"Failed to create stream to peer {peer_id}") from e
def _select_connection(self, connections: list[INetConn], peer_id: ID) -> INetConn:
"""
Select connection based on load balancing strategy.
Parameters
----------
connections : list[INetConn]
List of available connections.
peer_id : ID
The peer ID for round-robin tracking.
Returns
-------
INetConn
Selected connection.
"""
if not connections:
raise ValueError("No connections available")
strategy = self.connection_config.load_balancing_strategy
if strategy == "round_robin":
# Simple round-robin selection
if peer_id not in self._round_robin_index:
self._round_robin_index[peer_id] = 0
index = self._round_robin_index[peer_id] % len(connections)
self._round_robin_index[peer_id] += 1
return connections[index]
elif strategy == "least_loaded":
# Find connection with least streams
return min(connections, key=lambda c: len(c.get_streams()))
else:
# Default to first connection
return connections[0]
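Observable behaviour of the round-robin branch, with plain strings standing in for INetConn objects:

    connections = ["conn-a", "conn-b", "conn-c"]
    index = 0  # plays the role of Swarm._round_robin_index[peer_id]

    for _ in range(5):
        chosen = connections[index % len(connections)]
        index += 1
        print(chosen)   # conn-a, conn-b, conn-c, conn-a, conn-b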
async def listen(self, *multiaddrs: Multiaddr) -> bool:
"""
@ -250,11 +519,14 @@ class Swarm(Service, INetworkService):
# We need to wait until `self.listener_nursery` is created.
await self.event_listener_nursery_created.wait()
success_count = 0
for maddr in multiaddrs:
logger.debug(f"Swarm.listen processing multiaddr: {maddr}")
if str(maddr) in self.listeners:
logger.debug(f"Swarm.listen: listener already exists for {maddr}")
return True
success_count += 1
continue
async def conn_handler(
read_write_closer: ReadWriteCloser, maddr: Multiaddr = maddr
@ -309,13 +581,14 @@ class Swarm(Service, INetworkService):
# Call notifiers since event occurred
await self.notify_listen(maddr)
return True
success_count += 1
logger.debug("successfully started listening on: %s", maddr)
except OSError:
# Failed. Continue looping.
logger.debug("fail to listen on: %s", maddr)
# No maddr succeeded
return False
# Return true if at least one address succeeded
return success_count > 0
async def close(self) -> None:
"""
@ -328,9 +601,9 @@ class Swarm(Service, INetworkService):
# Perform alternative cleanup if the manager isn't initialized
# Close all connections manually
if hasattr(self, "connections"):
for conn_id in list(self.connections.keys()):
conn = self.connections[conn_id]
await conn.close()
for peer_id, conns in list(self.connections.items()):
for conn in conns:
await conn.close()
# Clear connection tracking dictionary
self.connections.clear()
@ -360,12 +633,28 @@ class Swarm(Service, INetworkService):
logger.debug("swarm successfully closed")
async def close_peer(self, peer_id: ID) -> None:
if peer_id not in self.connections:
"""
Close all connections to the specified peer.
Parameters
----------
peer_id : ID
The peer ID to close connections for.
"""
connections = self.get_connections(peer_id)
if not connections:
return
connection = self.connections[peer_id]
# NOTE: `connection.close` will delete `peer_id` from `self.connections`
# and `notify_disconnected` for us.
await connection.close()
# Close all connections
for connection in connections:
try:
await connection.close()
except Exception as e:
logger.warning(f"Error closing connection to {peer_id}: {e}")
# Remove from connections dict
self.connections.pop(peer_id, None)
logger.debug("successfully close the connection to peer %s", peer_id)
@ -384,21 +673,71 @@ class Swarm(Service, INetworkService):
await muxed_conn.event_started.wait()
self.manager.run_task(swarm_conn.start)
await swarm_conn.event_started.wait()
# Store muxed_conn with peer id
self.connections[muxed_conn.peer_id] = swarm_conn
# Add to connections dict with deduplication
peer_id = muxed_conn.peer_id
if peer_id not in self.connections:
self.connections[peer_id] = []
# Check for duplicate connections by comparing the underlying muxed connection
for existing_conn in self.connections[peer_id]:
if existing_conn.muxed_conn == muxed_conn:
logger.debug(f"Connection already exists for peer {peer_id}")
# existing_conn is a SwarmConn since it's stored in the connections list
return existing_conn # type: ignore[return-value]
self.connections[peer_id].append(swarm_conn)
# Trim if we exceed max connections
max_conns = self.connection_config.max_connections_per_peer
if len(self.connections[peer_id]) > max_conns:
self._trim_connections(peer_id)
# Call notifiers since event occurred
await self.notify_connected(swarm_conn)
return swarm_conn
def _trim_connections(self, peer_id: ID) -> None:
"""
Remove oldest connections when limit is exceeded.
"""
connections = self.connections[peer_id]
if len(connections) <= self.connection_config.max_connections_per_peer:
return
# Sort by creation time and remove oldest
# For now, just keep the most recent connections
max_conns = self.connection_config.max_connections_per_peer
connections_to_remove = connections[:-max_conns]
for conn in connections_to_remove:
logger.debug(f"Trimming old connection for peer {peer_id}")
trio.lowlevel.spawn_system_task(self._close_connection_async, conn)
# Keep only the most recent connections
max_conns = self.connection_config.max_connections_per_peer
self.connections[peer_id] = connections[-max_conns:]
async def _close_connection_async(self, connection: INetConn) -> None:
"""Close a connection asynchronously."""
try:
await connection.close()
except Exception as e:
logger.warning(f"Error closing connection: {e}")
def remove_conn(self, swarm_conn: SwarmConn) -> None:
"""
Simply remove the connection from Swarm's records, without closing
the connection.
"""
peer_id = swarm_conn.muxed_conn.peer_id
if peer_id not in self.connections:
return
del self.connections[peer_id]
if peer_id in self.connections:
self.connections[peer_id] = [
conn for conn in self.connections[peer_id] if conn != swarm_conn
]
if not self.connections[peer_id]:
del self.connections[peer_id]
# Notifee
@ -444,3 +783,21 @@ class Swarm(Service, INetworkService):
async with trio.open_nursery() as nursery:
for notifee in self.notifees:
nursery.start_soon(notifier, notifee)
# Backward compatibility properties
@property
def connections_legacy(self) -> dict[ID, INetConn]:
"""
Legacy 1:1 mapping for backward compatibility.
Returns
-------
dict[ID, INetConn]
Legacy mapping with only the first connection per peer.
"""
legacy_conns = {}
for peer_id, conns in self.connections.items():
if conns:
legacy_conns[peer_id] = conns[0]
return legacy_conns

View File

@ -1,5 +1,7 @@
from typing import Any, cast
import multiaddr
from libp2p.crypto.ed25519 import Ed25519PublicKey
from libp2p.crypto.keys import PrivateKey, PublicKey
from libp2p.crypto.rsa import RSAPublicKey
@ -131,6 +133,9 @@ class Envelope:
)
return False
def _env_addrs_set(self) -> set[multiaddr.Multiaddr]:
return {b for b in self.record().addrs}
def pub_key_to_protobuf(pub_key: PublicKey) -> cryto_pb.PublicKey:
"""

View File

@ -16,6 +16,7 @@ import trio
from trio import MemoryReceiveChannel, MemorySendChannel
from libp2p.abc import (
IHost,
IPeerStore,
)
from libp2p.crypto.keys import (
@ -23,7 +24,8 @@ from libp2p.crypto.keys import (
PrivateKey,
PublicKey,
)
from libp2p.peer.envelope import Envelope
from libp2p.peer.envelope import Envelope, seal_record
from libp2p.peer.peer_record import PeerRecord
from .id import (
ID,
@ -39,6 +41,86 @@ from .peerinfo import (
PERMANENT_ADDR_TTL = 0
def create_signed_peer_record(
peer_id: ID, addrs: list[Multiaddr], pvt_key: PrivateKey
) -> Envelope:
"""Creates a signed_peer_record wrapped in an Envelope"""
record = PeerRecord(peer_id, addrs)
envelope = seal_record(record, pvt_key)
return envelope
def env_to_send_in_RPC(host: IHost) -> tuple[bytes, bool]:
"""
Return the signed peer record (Envelope) to be sent in an RPC.
This function checks whether the host already has a cached signed peer record
(SPR). If one exists and its addresses match the host's current listen
addresses, the cached envelope is reused. Otherwise, a new signed peer record
is created, cached, and returned.
Parameters
----------
host : IHost
The local host instance, providing access to peer ID, listen addresses,
private key, and the peerstore.
Returns
-------
tuple[bytes, bool]
A 2-tuple where the first element is the serialized envelope (bytes)
for the signed peer record, and the second element is a boolean flag
indicating whether a new record was created (True) or an existing cached
one was reused (False).
"""
listen_addrs_set = set(host.get_addrs())
local_env = host.get_peerstore().get_local_record()
if local_env is None:
# No cached SPR yet -> create one
return issue_and_cache_local_record(host), True
else:
record_addrs_set = local_env._env_addrs_set()
if record_addrs_set == listen_addrs_set:
# Perfect match -> reuse cached envelope
return local_env.marshal_envelope(), False
else:
# Addresses changed -> issue a new SPR and cache it
return issue_and_cache_local_record(host), True
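A sketch of the cache behavior this implements, assuming a running host whose listen addresses stay fixed between the two calls:

env_bytes, created = env_to_send_in_RPC(host)
assert created is True          # first call: nothing cached yet

env_bytes_again, created = env_to_send_in_RPC(host)
assert created is False         # addresses unchanged: cached envelope reused
assert env_bytes == env_bytes_again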
def issue_and_cache_local_record(host: IHost) -> bytes:
"""
Create and cache a new signed peer record (Envelope) for the host.
This function generates a new signed peer record from the host's peer ID,
listen addresses, and private key. The resulting envelope is stored in
the peerstore as the local record for future reuse.
Parameters
----------
host : IHost
The local host instance, providing access to peer ID, listen addresses,
private key, and the peerstore.
Returns
-------
bytes
The serialized envelope (bytes) representing the newly created signed
peer record.
"""
env = create_signed_peer_record(
host.get_id(),
host.get_addrs(),
host.get_private_key(),
)
# Cache it for future reuse
host.get_peerstore().set_local_record(env)
return env.marshal_envelope()
class PeerRecordState:
envelope: Envelope
seq: int
@ -55,8 +137,17 @@ class PeerStore(IPeerStore):
self.peer_data_map = defaultdict(PeerData)
self.addr_update_channels: dict[ID, MemorySendChannel[Multiaddr]] = {}
self.peer_record_map: dict[ID, PeerRecordState] = {}
self.local_peer_record: Envelope | None = None
self.max_records = max_records
def get_local_record(self) -> Envelope | None:
"""Get the local-signed-record wrapped in Envelope"""
return self.local_peer_record
def set_local_record(self, envelope: Envelope) -> None:
"""Set the local-signed-record wrapped in Envelope"""
self.local_peer_record = envelope
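The accessors are a plain cache over the peerstore; a round-trip sketch, assuming PeerStore() constructs with defaults and that peer_id, addrs, and private_key come from an existing key pair:

store = PeerStore()
assert store.get_local_record() is None

env = create_signed_peer_record(peer_id, addrs, private_key)
store.set_local_record(env)
assert store.get_local_record() is env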
def peer_info(self, peer_id: ID) -> PeerInfo:
"""
:param peer_id: peer ID to get info for

View File

@ -15,6 +15,7 @@ from libp2p.custom_types import (
from libp2p.peer.id import (
ID,
)
from libp2p.peer.peerstore import env_to_send_in_RPC
from .exceptions import (
PubsubRouterError,
@ -103,6 +104,11 @@ class FloodSub(IPubsubRouter):
)
rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg])
# Add the senderRecord of the peer in the RPC msg
if isinstance(self.pubsub, Pubsub):
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
rpc_msg.senderRecord = envelope_bytes
logger.debug("publishing message %s", pubsub_msg)
if self.pubsub is None:

View File

@ -34,10 +34,12 @@ from libp2p.peer.peerinfo import (
)
from libp2p.peer.peerstore import (
PERMANENT_ADDR_TTL,
env_to_send_in_RPC,
)
from libp2p.pubsub import (
floodsub,
)
from libp2p.pubsub.utils import maybe_consume_signed_record
from libp2p.tools.async_service import (
Service,
)
@ -226,6 +228,12 @@ class GossipSub(IPubsubRouter, Service):
:param rpc: RPC message
:param sender_peer_id: id of the peer who sent the message
"""
# Process the senderRecord if sent
if isinstance(self.pubsub, Pubsub):
if not maybe_consume_signed_record(rpc, self.pubsub.host, sender_peer_id):
logger.error("Received an invalid-signed-record, ignoring the message")
return
control_message = rpc.control
# Relay each rpc control message to the appropriate handler
@ -253,6 +261,11 @@ class GossipSub(IPubsubRouter, Service):
)
rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg])
# Add the senderRecord of the peer in the RPC msg
if isinstance(self.pubsub, Pubsub):
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
rpc_msg.senderRecord = envelope_bytes
logger.debug("publishing message %s", pubsub_msg)
for peer_id in peers_gen:
@ -818,6 +831,13 @@ class GossipSub(IPubsubRouter, Service):
# 1) Package these messages into a single packet
packet: rpc_pb2.RPC = rpc_pb2.RPC()
# Here an RPC message is created and published in response to the
# iwant control msg, so we send a freshly created senderRecord
# with the RPC msg
if isinstance(self.pubsub, Pubsub):
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
packet.senderRecord = envelope_bytes
packet.publish.extend(msgs_to_forward)
if self.pubsub is None:
@ -973,6 +993,12 @@ class GossipSub(IPubsubRouter, Service):
raise NoPubsubAttached
# Add control message to packet
packet: rpc_pb2.RPC = rpc_pb2.RPC()
# Add the sender's peer-record in the RPC msg
if isinstance(self.pubsub, Pubsub):
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
packet.senderRecord = envelope_bytes
packet.control.CopyFrom(control_msg)
# Get stream for peer from pubsub

View File

@ -14,6 +14,7 @@ message RPC {
}
optional ControlMessage control = 3;
optional bytes senderRecord = 4;
}
message Message {
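On the wire this is a single optional bytes field; from Python it behaves like any optional scalar. A minimal sketch, where envelope is an Envelope produced by create_signed_peer_record:

from libp2p.pubsub.pb import rpc_pb2

rpc = rpc_pb2.RPC()
rpc.senderRecord = envelope.marshal_envelope()  # serialized signed record
assert rpc.HasField("senderRecord")
wire_bytes = rpc.SerializeToString()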

View File

@ -1,11 +1,12 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: libp2p/pubsub/pb/rpc.proto
# Protobuf Python Version: 4.25.3
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
@ -13,39 +14,39 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02')
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xca\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x12\x14\n\x0csenderRecord\x18\x04 \x01(\x0c\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.pubsub.pb.rpc_pb2', globals())
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.pubsub.pb.rpc_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_RPC._serialized_start=42
_RPC._serialized_end=222
_RPC_SUBOPTS._serialized_start=177
_RPC_SUBOPTS._serialized_end=222
_MESSAGE._serialized_start=224
_MESSAGE._serialized_end=329
_CONTROLMESSAGE._serialized_start=332
_CONTROLMESSAGE._serialized_end=508
_CONTROLIHAVE._serialized_start=510
_CONTROLIHAVE._serialized_end=561
_CONTROLIWANT._serialized_start=563
_CONTROLIWANT._serialized_end=597
_CONTROLGRAFT._serialized_start=599
_CONTROLGRAFT._serialized_end=630
_CONTROLPRUNE._serialized_start=632
_CONTROLPRUNE._serialized_end=716
_PEERINFO._serialized_start=718
_PEERINFO._serialized_end=770
_TOPICDESCRIPTOR._serialized_start=773
_TOPICDESCRIPTOR._serialized_end=1164
_TOPICDESCRIPTOR_AUTHOPTS._serialized_start=906
_TOPICDESCRIPTOR_AUTHOPTS._serialized_end=1030
_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_start=992
_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_end=1030
_TOPICDESCRIPTOR_ENCOPTS._serialized_start=1033
_TOPICDESCRIPTOR_ENCOPTS._serialized_end=1164
_TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_start=1121
_TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_end=1164
_globals['_RPC']._serialized_start=42
_globals['_RPC']._serialized_end=244
_globals['_RPC_SUBOPTS']._serialized_start=199
_globals['_RPC_SUBOPTS']._serialized_end=244
_globals['_MESSAGE']._serialized_start=246
_globals['_MESSAGE']._serialized_end=351
_globals['_CONTROLMESSAGE']._serialized_start=354
_globals['_CONTROLMESSAGE']._serialized_end=530
_globals['_CONTROLIHAVE']._serialized_start=532
_globals['_CONTROLIHAVE']._serialized_end=583
_globals['_CONTROLIWANT']._serialized_start=585
_globals['_CONTROLIWANT']._serialized_end=619
_globals['_CONTROLGRAFT']._serialized_start=621
_globals['_CONTROLGRAFT']._serialized_end=652
_globals['_CONTROLPRUNE']._serialized_start=654
_globals['_CONTROLPRUNE']._serialized_end=738
_globals['_PEERINFO']._serialized_start=740
_globals['_PEERINFO']._serialized_end=792
_globals['_TOPICDESCRIPTOR']._serialized_start=795
_globals['_TOPICDESCRIPTOR']._serialized_end=1186
_globals['_TOPICDESCRIPTOR_AUTHOPTS']._serialized_start=928
_globals['_TOPICDESCRIPTOR_AUTHOPTS']._serialized_end=1052
_globals['_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE']._serialized_start=1014
_globals['_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE']._serialized_end=1052
_globals['_TOPICDESCRIPTOR_ENCOPTS']._serialized_start=1055
_globals['_TOPICDESCRIPTOR_ENCOPTS']._serialized_end=1186
_globals['_TOPICDESCRIPTOR_ENCOPTS_ENCMODE']._serialized_start=1143
_globals['_TOPICDESCRIPTOR_ENCOPTS_ENCMODE']._serialized_end=1186
# @@protoc_insertion_point(module_scope)

View File

@ -1,323 +1,132 @@
"""
@generated by mypy-protobuf. Do not edit manually!
isort:skip_file
Modified from https://github.com/libp2p/go-libp2p-pubsub/blob/master/pb/rpc.proto"""
from google.protobuf.internal import containers as _containers
from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union
import builtins
import collections.abc
import google.protobuf.descriptor
import google.protobuf.internal.containers
import google.protobuf.internal.enum_type_wrapper
import google.protobuf.message
import sys
import typing
DESCRIPTOR: _descriptor.FileDescriptor
if sys.version_info >= (3, 10):
import typing as typing_extensions
else:
import typing_extensions
class RPC(_message.Message):
__slots__ = ("subscriptions", "publish", "control", "senderRecord")
class SubOpts(_message.Message):
__slots__ = ("subscribe", "topicid")
SUBSCRIBE_FIELD_NUMBER: _ClassVar[int]
TOPICID_FIELD_NUMBER: _ClassVar[int]
subscribe: bool
topicid: str
def __init__(self, subscribe: bool = ..., topicid: _Optional[str] = ...) -> None: ...
SUBSCRIPTIONS_FIELD_NUMBER: _ClassVar[int]
PUBLISH_FIELD_NUMBER: _ClassVar[int]
CONTROL_FIELD_NUMBER: _ClassVar[int]
SENDERRECORD_FIELD_NUMBER: _ClassVar[int]
subscriptions: _containers.RepeatedCompositeFieldContainer[RPC.SubOpts]
publish: _containers.RepeatedCompositeFieldContainer[Message]
control: ControlMessage
senderRecord: bytes
def __init__(self, subscriptions: _Optional[_Iterable[_Union[RPC.SubOpts, _Mapping]]] = ..., publish: _Optional[_Iterable[_Union[Message, _Mapping]]] = ..., control: _Optional[_Union[ControlMessage, _Mapping]] = ..., senderRecord: _Optional[bytes] = ...) -> None: ... # type: ignore
DESCRIPTOR: google.protobuf.descriptor.FileDescriptor
class Message(_message.Message):
__slots__ = ("from_id", "data", "seqno", "topicIDs", "signature", "key")
FROM_ID_FIELD_NUMBER: _ClassVar[int]
DATA_FIELD_NUMBER: _ClassVar[int]
SEQNO_FIELD_NUMBER: _ClassVar[int]
TOPICIDS_FIELD_NUMBER: _ClassVar[int]
SIGNATURE_FIELD_NUMBER: _ClassVar[int]
KEY_FIELD_NUMBER: _ClassVar[int]
from_id: bytes
data: bytes
seqno: bytes
topicIDs: _containers.RepeatedScalarFieldContainer[str]
signature: bytes
key: bytes
def __init__(self, from_id: _Optional[bytes] = ..., data: _Optional[bytes] = ..., seqno: _Optional[bytes] = ..., topicIDs: _Optional[_Iterable[str]] = ..., signature: _Optional[bytes] = ..., key: _Optional[bytes] = ...) -> None: ...
@typing.final
class RPC(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
class ControlMessage(_message.Message):
__slots__ = ("ihave", "iwant", "graft", "prune")
IHAVE_FIELD_NUMBER: _ClassVar[int]
IWANT_FIELD_NUMBER: _ClassVar[int]
GRAFT_FIELD_NUMBER: _ClassVar[int]
PRUNE_FIELD_NUMBER: _ClassVar[int]
ihave: _containers.RepeatedCompositeFieldContainer[ControlIHave]
iwant: _containers.RepeatedCompositeFieldContainer[ControlIWant]
graft: _containers.RepeatedCompositeFieldContainer[ControlGraft]
prune: _containers.RepeatedCompositeFieldContainer[ControlPrune]
def __init__(self, ihave: _Optional[_Iterable[_Union[ControlIHave, _Mapping]]] = ..., iwant: _Optional[_Iterable[_Union[ControlIWant, _Mapping]]] = ..., graft: _Optional[_Iterable[_Union[ControlGraft, _Mapping]]] = ..., prune: _Optional[_Iterable[_Union[ControlPrune, _Mapping]]] = ...) -> None: ... # type: ignore
@typing.final
class SubOpts(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
class ControlIHave(_message.Message):
__slots__ = ("topicID", "messageIDs")
TOPICID_FIELD_NUMBER: _ClassVar[int]
MESSAGEIDS_FIELD_NUMBER: _ClassVar[int]
topicID: str
messageIDs: _containers.RepeatedScalarFieldContainer[str]
def __init__(self, topicID: _Optional[str] = ..., messageIDs: _Optional[_Iterable[str]] = ...) -> None: ...
SUBSCRIBE_FIELD_NUMBER: builtins.int
TOPICID_FIELD_NUMBER: builtins.int
subscribe: builtins.bool
"""subscribe or unsubscribe"""
topicid: builtins.str
def __init__(
self,
*,
subscribe: builtins.bool | None = ...,
topicid: builtins.str | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["subscribe", b"subscribe", "topicid", b"topicid"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["subscribe", b"subscribe", "topicid", b"topicid"]) -> None: ...
class ControlIWant(_message.Message):
__slots__ = ("messageIDs",)
MESSAGEIDS_FIELD_NUMBER: _ClassVar[int]
messageIDs: _containers.RepeatedScalarFieldContainer[str]
def __init__(self, messageIDs: _Optional[_Iterable[str]] = ...) -> None: ...
SUBSCRIPTIONS_FIELD_NUMBER: builtins.int
PUBLISH_FIELD_NUMBER: builtins.int
CONTROL_FIELD_NUMBER: builtins.int
@property
def subscriptions(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RPC.SubOpts]: ...
@property
def publish(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___Message]: ...
@property
def control(self) -> global___ControlMessage: ...
def __init__(
self,
*,
subscriptions: collections.abc.Iterable[global___RPC.SubOpts] | None = ...,
publish: collections.abc.Iterable[global___Message] | None = ...,
control: global___ControlMessage | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["control", b"control"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["control", b"control", "publish", b"publish", "subscriptions", b"subscriptions"]) -> None: ...
class ControlGraft(_message.Message):
__slots__ = ("topicID",)
TOPICID_FIELD_NUMBER: _ClassVar[int]
topicID: str
def __init__(self, topicID: _Optional[str] = ...) -> None: ...
global___RPC = RPC
class ControlPrune(_message.Message):
__slots__ = ("topicID", "peers", "backoff")
TOPICID_FIELD_NUMBER: _ClassVar[int]
PEERS_FIELD_NUMBER: _ClassVar[int]
BACKOFF_FIELD_NUMBER: _ClassVar[int]
topicID: str
peers: _containers.RepeatedCompositeFieldContainer[PeerInfo]
backoff: int
def __init__(self, topicID: _Optional[str] = ..., peers: _Optional[_Iterable[_Union[PeerInfo, _Mapping]]] = ..., backoff: _Optional[int] = ...) -> None: ... # type: ignore
@typing.final
class Message(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
class PeerInfo(_message.Message):
__slots__ = ("peerID", "signedPeerRecord")
PEERID_FIELD_NUMBER: _ClassVar[int]
SIGNEDPEERRECORD_FIELD_NUMBER: _ClassVar[int]
peerID: bytes
signedPeerRecord: bytes
def __init__(self, peerID: _Optional[bytes] = ..., signedPeerRecord: _Optional[bytes] = ...) -> None: ...
FROM_ID_FIELD_NUMBER: builtins.int
DATA_FIELD_NUMBER: builtins.int
SEQNO_FIELD_NUMBER: builtins.int
TOPICIDS_FIELD_NUMBER: builtins.int
SIGNATURE_FIELD_NUMBER: builtins.int
KEY_FIELD_NUMBER: builtins.int
from_id: builtins.bytes
data: builtins.bytes
seqno: builtins.bytes
signature: builtins.bytes
key: builtins.bytes
@property
def topicIDs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
def __init__(
self,
*,
from_id: builtins.bytes | None = ...,
data: builtins.bytes | None = ...,
seqno: builtins.bytes | None = ...,
topicIDs: collections.abc.Iterable[builtins.str] | None = ...,
signature: builtins.bytes | None = ...,
key: builtins.bytes | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["data", b"data", "from_id", b"from_id", "key", b"key", "seqno", b"seqno", "signature", b"signature"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["data", b"data", "from_id", b"from_id", "key", b"key", "seqno", b"seqno", "signature", b"signature", "topicIDs", b"topicIDs"]) -> None: ...
global___Message = Message
@typing.final
class ControlMessage(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
IHAVE_FIELD_NUMBER: builtins.int
IWANT_FIELD_NUMBER: builtins.int
GRAFT_FIELD_NUMBER: builtins.int
PRUNE_FIELD_NUMBER: builtins.int
@property
def ihave(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlIHave]: ...
@property
def iwant(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlIWant]: ...
@property
def graft(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlGraft]: ...
@property
def prune(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlPrune]: ...
def __init__(
self,
*,
ihave: collections.abc.Iterable[global___ControlIHave] | None = ...,
iwant: collections.abc.Iterable[global___ControlIWant] | None = ...,
graft: collections.abc.Iterable[global___ControlGraft] | None = ...,
prune: collections.abc.Iterable[global___ControlPrune] | None = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["graft", b"graft", "ihave", b"ihave", "iwant", b"iwant", "prune", b"prune"]) -> None: ...
global___ControlMessage = ControlMessage
@typing.final
class ControlIHave(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
TOPICID_FIELD_NUMBER: builtins.int
MESSAGEIDS_FIELD_NUMBER: builtins.int
topicID: builtins.str
@property
def messageIDs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
def __init__(
self,
*,
topicID: builtins.str | None = ...,
messageIDs: collections.abc.Iterable[builtins.str] | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["topicID", b"topicID"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["messageIDs", b"messageIDs", "topicID", b"topicID"]) -> None: ...
global___ControlIHave = ControlIHave
@typing.final
class ControlIWant(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
MESSAGEIDS_FIELD_NUMBER: builtins.int
@property
def messageIDs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
def __init__(
self,
*,
messageIDs: collections.abc.Iterable[builtins.str] | None = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["messageIDs", b"messageIDs"]) -> None: ...
global___ControlIWant = ControlIWant
@typing.final
class ControlGraft(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
TOPICID_FIELD_NUMBER: builtins.int
topicID: builtins.str
def __init__(
self,
*,
topicID: builtins.str | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["topicID", b"topicID"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["topicID", b"topicID"]) -> None: ...
global___ControlGraft = ControlGraft
@typing.final
class ControlPrune(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
TOPICID_FIELD_NUMBER: builtins.int
PEERS_FIELD_NUMBER: builtins.int
BACKOFF_FIELD_NUMBER: builtins.int
topicID: builtins.str
backoff: builtins.int
@property
def peers(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___PeerInfo]: ...
def __init__(
self,
*,
topicID: builtins.str | None = ...,
peers: collections.abc.Iterable[global___PeerInfo] | None = ...,
backoff: builtins.int | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["backoff", b"backoff", "topicID", b"topicID"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["backoff", b"backoff", "peers", b"peers", "topicID", b"topicID"]) -> None: ...
global___ControlPrune = ControlPrune
@typing.final
class PeerInfo(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
PEERID_FIELD_NUMBER: builtins.int
SIGNEDPEERRECORD_FIELD_NUMBER: builtins.int
peerID: builtins.bytes
signedPeerRecord: builtins.bytes
def __init__(
self,
*,
peerID: builtins.bytes | None = ...,
signedPeerRecord: builtins.bytes | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["peerID", b"peerID", "signedPeerRecord", b"signedPeerRecord"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["peerID", b"peerID", "signedPeerRecord", b"signedPeerRecord"]) -> None: ...
global___PeerInfo = PeerInfo
@typing.final
class TopicDescriptor(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
@typing.final
class AuthOpts(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
class _AuthMode:
ValueType = typing.NewType("ValueType", builtins.int)
V: typing_extensions.TypeAlias = ValueType
class _AuthModeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[TopicDescriptor.AuthOpts._AuthMode.ValueType], builtins.type):
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
NONE: TopicDescriptor.AuthOpts._AuthMode.ValueType # 0
"""no authentication, anyone can publish"""
KEY: TopicDescriptor.AuthOpts._AuthMode.ValueType # 1
"""only messages signed by keys in the topic descriptor are accepted"""
WOT: TopicDescriptor.AuthOpts._AuthMode.ValueType # 2
"""web of trust, certificates can allow publisher set to grow"""
class AuthMode(_AuthMode, metaclass=_AuthModeEnumTypeWrapper): ...
NONE: TopicDescriptor.AuthOpts.AuthMode.ValueType # 0
"""no authentication, anyone can publish"""
KEY: TopicDescriptor.AuthOpts.AuthMode.ValueType # 1
"""only messages signed by keys in the topic descriptor are accepted"""
WOT: TopicDescriptor.AuthOpts.AuthMode.ValueType # 2
"""web of trust, certificates can allow publisher set to grow"""
MODE_FIELD_NUMBER: builtins.int
KEYS_FIELD_NUMBER: builtins.int
mode: global___TopicDescriptor.AuthOpts.AuthMode.ValueType
@property
def keys(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.bytes]:
"""root keys to trust"""
def __init__(
self,
*,
mode: global___TopicDescriptor.AuthOpts.AuthMode.ValueType | None = ...,
keys: collections.abc.Iterable[builtins.bytes] | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["mode", b"mode"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["keys", b"keys", "mode", b"mode"]) -> None: ...
@typing.final
class EncOpts(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
class _EncMode:
ValueType = typing.NewType("ValueType", builtins.int)
V: typing_extensions.TypeAlias = ValueType
class _EncModeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[TopicDescriptor.EncOpts._EncMode.ValueType], builtins.type):
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
NONE: TopicDescriptor.EncOpts._EncMode.ValueType # 0
"""no encryption, anyone can read"""
SHAREDKEY: TopicDescriptor.EncOpts._EncMode.ValueType # 1
"""messages are encrypted with shared key"""
WOT: TopicDescriptor.EncOpts._EncMode.ValueType # 2
"""web of trust, certificates can allow publisher set to grow"""
class EncMode(_EncMode, metaclass=_EncModeEnumTypeWrapper): ...
NONE: TopicDescriptor.EncOpts.EncMode.ValueType # 0
"""no encryption, anyone can read"""
SHAREDKEY: TopicDescriptor.EncOpts.EncMode.ValueType # 1
"""messages are encrypted with shared key"""
WOT: TopicDescriptor.EncOpts.EncMode.ValueType # 2
"""web of trust, certificates can allow publisher set to grow"""
MODE_FIELD_NUMBER: builtins.int
KEYHASHES_FIELD_NUMBER: builtins.int
mode: global___TopicDescriptor.EncOpts.EncMode.ValueType
@property
def keyHashes(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.bytes]:
"""the hashes of the shared keys used (salted)"""
def __init__(
self,
*,
mode: global___TopicDescriptor.EncOpts.EncMode.ValueType | None = ...,
keyHashes: collections.abc.Iterable[builtins.bytes] | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["mode", b"mode"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["keyHashes", b"keyHashes", "mode", b"mode"]) -> None: ...
NAME_FIELD_NUMBER: builtins.int
AUTH_FIELD_NUMBER: builtins.int
ENC_FIELD_NUMBER: builtins.int
name: builtins.str
@property
def auth(self) -> global___TopicDescriptor.AuthOpts: ...
@property
def enc(self) -> global___TopicDescriptor.EncOpts: ...
def __init__(
self,
*,
name: builtins.str | None = ...,
auth: global___TopicDescriptor.AuthOpts | None = ...,
enc: global___TopicDescriptor.EncOpts | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["auth", b"auth", "enc", b"enc", "name", b"name"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["auth", b"auth", "enc", b"enc", "name", b"name"]) -> None: ...
global___TopicDescriptor = TopicDescriptor
class TopicDescriptor(_message.Message):
__slots__ = ("name", "auth", "enc")
class AuthOpts(_message.Message):
__slots__ = ("mode", "keys")
class AuthMode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
NONE: _ClassVar[TopicDescriptor.AuthOpts.AuthMode]
KEY: _ClassVar[TopicDescriptor.AuthOpts.AuthMode]
WOT: _ClassVar[TopicDescriptor.AuthOpts.AuthMode]
NONE: TopicDescriptor.AuthOpts.AuthMode
KEY: TopicDescriptor.AuthOpts.AuthMode
WOT: TopicDescriptor.AuthOpts.AuthMode
MODE_FIELD_NUMBER: _ClassVar[int]
KEYS_FIELD_NUMBER: _ClassVar[int]
mode: TopicDescriptor.AuthOpts.AuthMode
keys: _containers.RepeatedScalarFieldContainer[bytes]
def __init__(self, mode: _Optional[_Union[TopicDescriptor.AuthOpts.AuthMode, str]] = ..., keys: _Optional[_Iterable[bytes]] = ...) -> None: ...
class EncOpts(_message.Message):
__slots__ = ("mode", "keyHashes")
class EncMode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
NONE: _ClassVar[TopicDescriptor.EncOpts.EncMode]
SHAREDKEY: _ClassVar[TopicDescriptor.EncOpts.EncMode]
WOT: _ClassVar[TopicDescriptor.EncOpts.EncMode]
NONE: TopicDescriptor.EncOpts.EncMode
SHAREDKEY: TopicDescriptor.EncOpts.EncMode
WOT: TopicDescriptor.EncOpts.EncMode
MODE_FIELD_NUMBER: _ClassVar[int]
KEYHASHES_FIELD_NUMBER: _ClassVar[int]
mode: TopicDescriptor.EncOpts.EncMode
keyHashes: _containers.RepeatedScalarFieldContainer[bytes]
def __init__(self, mode: _Optional[_Union[TopicDescriptor.EncOpts.EncMode, str]] = ..., keyHashes: _Optional[_Iterable[bytes]] = ...) -> None: ...
NAME_FIELD_NUMBER: _ClassVar[int]
AUTH_FIELD_NUMBER: _ClassVar[int]
ENC_FIELD_NUMBER: _ClassVar[int]
name: str
auth: TopicDescriptor.AuthOpts
enc: TopicDescriptor.EncOpts
def __init__(self, name: _Optional[str] = ..., auth: _Optional[_Union[TopicDescriptor.AuthOpts, _Mapping]] = ..., enc: _Optional[_Union[TopicDescriptor.EncOpts, _Mapping]] = ...) -> None: ... # type: ignore

View File

@ -56,6 +56,8 @@ from libp2p.peer.id import (
from libp2p.peer.peerdata import (
PeerDataError,
)
from libp2p.peer.peerstore import env_to_send_in_RPC
from libp2p.pubsub.utils import maybe_consume_signed_record
from libp2p.tools.async_service import (
Service,
)
@ -247,6 +249,10 @@ class Pubsub(Service, IPubsub):
packet.subscriptions.extend(
[rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)]
)
# Add the sender's signed record to the RPC message
envelope_bytes, _ = env_to_send_in_RPC(self.host)
packet.senderRecord = envelope_bytes
return packet
async def continuously_read_stream(self, stream: INetStream) -> None:
@ -263,6 +269,14 @@ class Pubsub(Service, IPubsub):
incoming: bytes = await read_varint_prefixed_bytes(stream)
rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC()
rpc_incoming.ParseFromString(incoming)
# Process the sender's signed-record if sent
if not maybe_consume_signed_record(rpc_incoming, self.host, peer_id):
logger.error(
"Received an invalid-signed-record, ignoring the incoming msg"
)
continue
if rpc_incoming.publish:
# deal with RPC.publish
for msg in rpc_incoming.publish:
@ -572,6 +586,9 @@ class Pubsub(Service, IPubsub):
[rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)]
)
# Add the senderRecord of the peer in the RPC msg
envelope_bytes, _ = env_to_send_in_RPC(self.host)
packet.senderRecord = envelope_bytes
# Send out subscribe message to all peers
await self.message_all_peers(packet.SerializeToString())
@ -604,6 +621,9 @@ class Pubsub(Service, IPubsub):
packet.subscriptions.extend(
[rpc_pb2.RPC.SubOpts(subscribe=False, topicid=topic_id)]
)
# Add the senderRecord of the peer in the RPC msg
envelope_bytes, _ = env_to_send_in_RPC(self.host)
packet.senderRecord = envelope_bytes
# Send out unsubscribe message to all peers
await self.message_all_peers(packet.SerializeToString())

50
libp2p/pubsub/utils.py Normal file
View File

@ -0,0 +1,50 @@
import logging
from libp2p.abc import IHost
from libp2p.peer.envelope import consume_envelope
from libp2p.peer.id import ID
from libp2p.pubsub.pb.rpc_pb2 import RPC
logger = logging.getLogger("libp2p.pubsub.utils")
def maybe_consume_signed_record(msg: RPC, host: IHost, peer_id: ID) -> bool:
"""
Attempt to parse and store a signed-peer-record (Envelope) received during
PubSub communication. If the record is invalid, the peer-id does not match, or
updating the peerstore fails, the function logs an error and returns False.
Parameters
----------
msg : RPC
The protobuf message received during PubSub communication.
host : IHost
The local host instance, providing access to the peerstore for storing
verified peer records.
peer_id : ID
The expected peer ID for record validation; the peer ID inside the
record must match this value.
Returns
-------
bool
True if a valid signed peer record was successfully consumed and stored,
False otherwise.
"""
if msg.HasField("senderRecord"):
try:
# Convert the signed peer record (Envelope) from protobuf bytes
envelope, record = consume_envelope(msg.senderRecord, "libp2p-peer-record")
if record.peer_id != peer_id:
logger.error("Sender record peer ID does not match the sender")
return False
# Use the default TTL of 2 hours (7200 seconds)
if not host.get_peerstore().consume_peer_record(envelope, 7200):
logger.error("Failed to update the Certified-Addr-Book")
return False
except Exception as e:
logger.error("Failed to update the Certified-Addr-Book: %s", e)
return False
return True
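Callers follow the same pattern as the routers above: validate before processing, drop the RPC otherwise. A sketch (raw_bytes, host, and sender_peer_id come from the surrounding stream handler):

incoming = RPC()
incoming.ParseFromString(raw_bytes)
if not maybe_consume_signed_record(incoming, host, sender_peer_id):
    logger.error("Received an invalid signed record, ignoring the message")
else:
    ...  # continue normal RPC handling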

View File

@ -118,6 +118,8 @@ class SecurityMultistream(ABC):
# Select protocol if non-initiator
protocol, _ = await self.multiselect.negotiate(communicator)
if protocol is None:
raise MultiselectError("fail to negotiate a security protocol")
raise MultiselectError(
"Failed to negotiate a security protocol: no protocol selected"
)
# Return transport from protocol
return self.transports[protocol]

View File

@ -85,7 +85,9 @@ class MuxerMultistream:
else:
protocol, _ = await self.multiselect.negotiate(communicator)
if protocol is None:
raise MultiselectError("fail to negotiate a stream muxer protocol")
raise MultiselectError(
"Fail to negotiate a stream muxer protocol: no protocol selected"
)
return self.transports[protocol]
async def new_conn(self, conn: ISecureConn, peer_id: ID) -> IMuxedConn:

View File

@ -1,9 +1,7 @@
from libp2p.abc import (
IListener,
IMuxedConn,
IRawConnection,
ISecureConn,
ITransport,
)
from libp2p.custom_types import (
TMuxerOptions,
@ -43,10 +41,6 @@ class TransportUpgrader:
self.security_multistream = SecurityMultistream(secure_transports_by_protocol)
self.muxer_multistream = MuxerMultistream(muxer_transports_by_protocol)
def upgrade_listener(self, transport: ITransport, listeners: IListener) -> None:
"""Upgrade multiaddr listeners to libp2p-transport listeners."""
# TODO: Figure out what to do with this function.
async def upgrade_security(
self,
raw_conn: IRawConnection,

View File

@ -15,6 +15,13 @@ from libp2p.utils.version import (
get_agent_version,
)
from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
expand_wildcard_address,
find_free_port,
)
__all__ = [
"decode_uvarint_from_stream",
"encode_delim",
@ -26,4 +33,8 @@ __all__ = [
"decode_varint_from_bytes",
"decode_varint_with_size",
"read_length_prefixed_protobuf",
"get_available_interfaces",
"get_optimal_binding_address",
"expand_wildcard_address",
"find_free_port",
]

View File

@ -0,0 +1,160 @@
from __future__ import annotations
import socket
from multiaddr import Multiaddr
try:
from multiaddr.utils import ( # type: ignore
get_network_addrs,
get_thin_waist_addresses,
)
_HAS_THIN_WAIST = True
except ImportError: # pragma: no cover - only executed in older environments
_HAS_THIN_WAIST = False
get_thin_waist_addresses = None # type: ignore
get_network_addrs = None # type: ignore
def _safe_get_network_addrs(ip_version: int) -> list[str]:
"""
Internal safe wrapper. Returns a list of IP addresses for the requested IP version.
Falls back to minimal defaults when Thin Waist helpers are missing.
:param ip_version: 4 or 6
"""
if _HAS_THIN_WAIST and get_network_addrs:
try:
return get_network_addrs(ip_version) or []
except Exception: # pragma: no cover - defensive
return []
# Fallback behavior (very conservative)
if ip_version == 4:
return ["127.0.0.1"]
if ip_version == 6:
return ["::1"]
return []
def find_free_port() -> int:
"""Find a free port on localhost."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0)) # Bind to a free port provided by the OS
return s.getsockname()[1]
def _safe_expand(addr: Multiaddr, port: int | None = None) -> list[Multiaddr]:
"""
Internal safe expansion wrapper. Returns a list of Multiaddr objects.
If Thin Waist isn't available, returns [addr] (identity).
"""
if _HAS_THIN_WAIST and get_thin_waist_addresses:
try:
if port is not None:
return get_thin_waist_addresses(addr, port=port) or []
return get_thin_waist_addresses(addr) or []
except Exception: # pragma: no cover - defensive
return [addr]
return [addr]
def get_available_interfaces(port: int, protocol: str = "tcp") -> list[Multiaddr]:
"""
Discover available network interfaces (IPv4 + IPv6 if supported) for binding.
:param port: Port number to bind to.
:param protocol: Transport protocol (e.g., "tcp" or "udp").
:return: List of Multiaddr objects representing candidate interface addresses.
"""
addrs: list[Multiaddr] = []
# IPv4 enumeration
seen_v4: set[str] = set()
for ip in _safe_get_network_addrs(4):
seen_v4.add(ip)
addrs.append(Multiaddr(f"/ip4/{ip}/{protocol}/{port}"))
# Ensure IPv4 loopback is always included when IPv4 interfaces are discovered
if seen_v4 and "127.0.0.1" not in seen_v4:
addrs.append(Multiaddr(f"/ip4/127.0.0.1/{protocol}/{port}"))
# TODO: IPv6 support temporarily disabled due to libp2p handshake issues
# IPv6 connections fail during protocol negotiation (SecurityUpgradeFailure)
# Re-enable IPv6 support once the following issues are resolved:
# - libp2p security handshake over IPv6
# - multiselect protocol over IPv6
# - connection establishment over IPv6
#
# seen_v6: set[str] = set()
# for ip in _safe_get_network_addrs(6):
# seen_v6.add(ip)
# addrs.append(Multiaddr(f"/ip6/{ip}/{protocol}/{port}"))
#
# # Always include IPv6 loopback for testing purposes when IPv6 is available
# # This ensures IPv6 functionality can be tested even without global IPv6 addresses
# if "::1" not in seen_v6:
# addrs.append(Multiaddr(f"/ip6/::1/{protocol}/{port}"))
# Fallback if nothing discovered
if not addrs:
addrs.append(Multiaddr(f"/ip4/0.0.0.0/{protocol}/{port}"))
return addrs
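Together with find_free_port, this gives the candidate listen set used by the examples; a quick sketch (the printed addresses depend on the machine):

port = find_free_port()
for addr in get_available_interfaces(port):
    print(addr)
# e.g. /ip4/192.168.1.20/tcp/55001
#      /ip4/127.0.0.1/tcp/55001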
def expand_wildcard_address(
addr: Multiaddr, port: int | None = None
) -> list[Multiaddr]:
"""
Expand a wildcard (e.g. /ip4/0.0.0.0/tcp/0) into all concrete interfaces.
:param addr: Multiaddr to expand.
:param port: Optional override for port selection.
:return: List of concrete Multiaddr instances.
"""
expanded = _safe_expand(addr, port=port)
if not expanded: # Safety fallback
return [addr]
return expanded
def get_optimal_binding_address(port: int, protocol: str = "tcp") -> Multiaddr:
"""
Choose an optimal address for an example to bind to:
- Prefer non-loopback IPv4
- Then non-loopback IPv6
- Fallback to loopback
- Fallback to wildcard
:param port: Port number.
:param protocol: Transport protocol.
:return: A single Multiaddr chosen heuristically.
"""
candidates = get_available_interfaces(port, protocol)
def is_non_loopback(ma: Multiaddr) -> bool:
s = str(ma)
return not ("/ip4/127." in s or "/ip6/::1" in s)
for c in candidates:
if "/ip4/" in str(c) and is_non_loopback(c):
return c
for c in candidates:
if "/ip6/" in str(c) and is_non_loopback(c):
return c
for c in candidates:
if "/ip4/127." in str(c) or "/ip6/::1" in str(c):
return c
# As a final fallback, produce a wildcard
return Multiaddr(f"/ip4/0.0.0.0/{protocol}/{port}")
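Expanding a wildcard and picking a single bind address build on the same candidate list; a short sketch (port as in the previous snippet):

wildcard = Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
concrete = expand_wildcard_address(wildcard)  # one Multiaddr per interface
best = get_optimal_binding_address(port)      # prefers non-loopback IPv4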
__all__ = [
"get_available_interfaces",
"get_optimal_binding_address",
"expand_wildcard_address",
"find_free_port",
]

View File

@ -1,7 +1,4 @@
import atexit
from datetime import (
datetime,
)
import logging
import logging.handlers
import os
@ -21,6 +18,9 @@ log_queue: "queue.Queue[Any]" = queue.Queue()
# Store the current listener to stop it on exit
_current_listener: logging.handlers.QueueListener | None = None
# Store the handlers for proper cleanup
_current_handlers: list[logging.Handler] = []
# Event to track when the listener is ready
_listener_ready = threading.Event()
@ -95,7 +95,7 @@ def setup_logging() -> None:
- Child loggers inherit their parent's level unless explicitly set
- The root libp2p logger controls the default level
"""
global _current_listener, _listener_ready
global _current_listener, _listener_ready, _current_handlers
# Reset the event
_listener_ready.clear()
@ -105,6 +105,12 @@ def setup_logging() -> None:
_current_listener.stop()
_current_listener = None
# Close and clear existing handlers
for handler in _current_handlers:
if isinstance(handler, logging.FileHandler):
handler.close()
_current_handlers.clear()
# Get the log level from environment variable
debug_str = os.environ.get("LIBP2P_DEBUG", "")
@ -148,13 +154,10 @@ def setup_logging() -> None:
log_path = Path(log_file)
log_path.parent.mkdir(parents=True, exist_ok=True)
else:
# Default log file with timestamp and unique identifier
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
unique_id = os.urandom(4).hex() # Add a unique identifier to prevent collisions
if os.name == "nt": # Windows
log_file = f"C:\\Windows\\Temp\\py-libp2p_{timestamp}_{unique_id}.log"
else: # Unix-like
log_file = f"/tmp/py-libp2p_{timestamp}_{unique_id}.log"
# Use cross-platform temp file creation
from libp2p.utils.paths import create_temp_file
log_file = str(create_temp_file(prefix="py-libp2p_", suffix=".log"))
# Print the log file path so users know where to find it
print(f"Logging to: {log_file}", file=sys.stderr)
@ -195,6 +198,9 @@ def setup_logging() -> None:
logger.setLevel(level)
logger.propagate = False # Prevent message duplication
# Store handlers globally for cleanup
_current_handlers.extend(handlers)
# Start the listener AFTER configuring all loggers
_current_listener = logging.handlers.QueueListener(
log_queue, *handlers, respect_handler_level=True
@ -209,7 +215,13 @@ def setup_logging() -> None:
@atexit.register
def cleanup_logging() -> None:
"""Clean up logging resources on exit."""
global _current_listener
global _current_listener, _current_handlers
if _current_listener is not None:
_current_listener.stop()
_current_listener = None
# Close all file handlers to ensure proper cleanup on Windows
for handler in _current_handlers:
if isinstance(handler, logging.FileHandler):
handler.close()
_current_handlers.clear()
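In use, the level comes from the LIBP2P_DEBUG environment variable read above; a sketch (the exact value syntax accepted by setup_logging is assumed here):

import os

os.environ["LIBP2P_DEBUG"] = "DEBUG"  # assumed value format
setup_logging()
# ... run the application ...
# cleanup_logging() runs via atexit, stopping the queue listener and
# closing file handlers (required for clean file teardown on Windows).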

267
libp2p/utils/paths.py Normal file
View File

@ -0,0 +1,267 @@
"""
Cross-platform path utilities for py-libp2p.
This module provides standardized path operations to ensure consistent
behavior across Windows, macOS, and Linux platforms.
"""
import os
from pathlib import Path
import sys
import tempfile
from typing import Union
PathLike = Union[str, Path]
def get_temp_dir() -> Path:
"""
Get cross-platform temporary directory.
Returns:
Path: Platform-specific temporary directory path
"""
return Path(tempfile.gettempdir())
def get_project_root() -> Path:
"""
Get the project root directory.
Returns:
Path: Path to the py-libp2p project root
"""
# Navigate from libp2p/utils/paths.py to project root
return Path(__file__).parent.parent.parent
def join_paths(*parts: PathLike) -> Path:
"""
Cross-platform path joining.
Args:
*parts: Path components to join
Returns:
Path: Joined path using platform-appropriate separator
"""
return Path(*parts)
def ensure_dir_exists(path: PathLike) -> Path:
"""
Ensure directory exists, create if needed.
Args:
path: Directory path to ensure exists
Returns:
Path: Path object for the directory
"""
path_obj = Path(path)
path_obj.mkdir(parents=True, exist_ok=True)
return path_obj
def get_config_dir() -> Path:
"""
Get user config directory (cross-platform).
Returns:
Path: Platform-specific config directory
"""
if os.name == "nt": # Windows
appdata = os.environ.get("APPDATA", "")
if appdata:
return Path(appdata) / "py-libp2p"
else:
# Fallback to user home directory
return Path.home() / "AppData" / "Roaming" / "py-libp2p"
else: # Unix-like (Linux, macOS)
return Path.home() / ".config" / "py-libp2p"
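Combined with ensure_dir_exists, this yields a ready-to-use per-user directory; a sketch (the settings file name is hypothetical):

config_dir = ensure_dir_exists(get_config_dir())
settings_path = config_dir / "settings.toml"  # hypothetical file name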
def get_script_dir(script_path: PathLike | None = None) -> Path:
"""
Get the directory containing a script file.
Args:
script_path: Path to the script file. If None, uses __file__
Returns:
Path: Directory containing the script
Raises:
RuntimeError: If script path cannot be determined
"""
if script_path is None:
# This will be the directory of the calling script
import inspect
frame = inspect.currentframe()
if frame and frame.f_back:
script_path = frame.f_back.f_globals.get("__file__")
else:
raise RuntimeError("Could not determine script path")
if script_path is None:
raise RuntimeError("Script path is None")
return Path(script_path).parent.absolute()
def create_temp_file(prefix: str = "py-libp2p_", suffix: str = ".log") -> Path:
"""
Create a temporary file with a unique name.
Args:
prefix: File name prefix
suffix: File name suffix
Returns:
Path: Path to the created temporary file
"""
temp_dir = get_temp_dir()
# Create a unique filename using timestamp and random bytes
import secrets
import time
timestamp = time.strftime("%Y%m%d_%H%M%S")
microseconds = f"{time.time() % 1:.6f}"[2:] # Get microseconds as string
unique_id = secrets.token_hex(4)
filename = f"{prefix}{timestamp}_{microseconds}_{unique_id}{suffix}"
temp_file = temp_dir / filename
# Create the file by touching it
temp_file.touch()
return temp_file
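This is the helper the logging setup above now uses instead of hand-built /tmp and C:\Windows\Temp paths; a quick sketch (the printed path varies by platform and time):

log_path = create_temp_file(prefix="py-libp2p_", suffix=".log")
print(log_path)
# e.g. /tmp/py-libp2p_20250904_151911_123456_a1b2c3d4.log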
def resolve_relative_path(base_path: PathLike, relative_path: PathLike) -> Path:
"""
Resolve a relative path from a base path.
Args:
base_path: Base directory path
relative_path: Relative path to resolve
Returns:
Path: Resolved absolute path
"""
base = Path(base_path).resolve()
relative = Path(relative_path)
if relative.is_absolute():
return relative
else:
return (base / relative).resolve()
def normalize_path(path: PathLike) -> Path:
"""
Normalize a path, resolving any symbolic links and relative components.
Args:
path: Path to normalize
Returns:
Path: Normalized absolute path
"""
return Path(path).resolve()
def get_venv_path() -> Path | None:
"""
Get virtual environment path if active.
Returns:
Path: Virtual environment path if active, None otherwise
"""
venv_path = os.environ.get("VIRTUAL_ENV")
if venv_path:
return Path(venv_path)
return None
def get_python_executable() -> Path:
"""
Get current Python executable path.
Returns:
Path: Path to the current Python executable
"""
return Path(sys.executable)
def find_executable(name: str) -> Path | None:
"""
Find executable in system PATH.
Args:
name: Name of the executable to find
Returns:
Path: Path to executable if found, None otherwise
"""
# Check if name already contains path
if os.path.dirname(name):
path = Path(name)
if path.exists() and os.access(path, os.X_OK):
return path
return None
# Search in PATH
for path_dir in os.environ.get("PATH", "").split(os.pathsep):
if not path_dir:
continue
path = Path(path_dir) / name
if path.exists() and os.access(path, os.X_OK):
return path
return None
def get_script_binary_path() -> Path:
"""
Get path to script's binary directory.
Returns:
Path: Directory containing the script's binary
"""
return get_python_executable().parent
def get_binary_path(binary_name: str) -> Path | None:
"""
Find binary in PATH or virtual environment.
Args:
binary_name: Name of the binary to find
Returns:
Path: Path to binary if found, None otherwise
"""
# First check in virtual environment if active
venv_path = get_venv_path()
if venv_path:
venv_bin = venv_path / "bin" if os.name != "nt" else venv_path / "Scripts"
binary_path = venv_bin / binary_name
if binary_path.exists() and os.access(binary_path, os.X_OK):
return binary_path
# Fall back to system PATH
return find_executable(binary_name)