Merge branch 'main' into dependency-chore

Author: Manu Sheel Gupta
Date: 2025-08-18 22:04:14 +05:30 (committed via GitHub)
20 changed files with 560 additions and 139 deletions

File: Makefile

@@ -69,6 +69,8 @@ PYI = $(PB:.proto=_pb2.pyi)
 ## Set default to `protobufs`, otherwise `format` is called when typing only `make`
 all: protobufs
+
+.PHONY: protobufs clean-proto
 protobufs: $(PY)
 %_pb2.py: %.proto
@@ -77,6 +79,11 @@ protobufs: $(PY)
 clean-proto:
 	rm -f $(PY) $(PYI)
+
+# Force protobuf regeneration by making them always out of date
+$(PY): FORCE
+FORCE:
+
 # docs commands
 docs: check-docs

File: README.md

@@ -12,13 +12,13 @@
 [![Build Status](https://img.shields.io/github/actions/workflow/status/libp2p/py-libp2p/tox.yml?branch=main&label=build%20status)](https://github.com/libp2p/py-libp2p/actions/workflows/tox.yml)
 [![Docs build](https://readthedocs.org/projects/py-libp2p/badge/?version=latest)](http://py-libp2p.readthedocs.io/en/latest/?badge=latest)

-> ⚠️ **Warning:** py-libp2p is an experimental and work-in-progress repo under development. We do not yet recommend using py-libp2p in production environments.
+> py-libp2p has moved beyond its experimental roots and is steadily progressing toward production readiness. The core features are stable, and we're focused on refining performance, expanding protocol support, and ensuring smooth interop with other libp2p implementations. We welcome contributions and real-world usage feedback to help us reach full production maturity.

 Read more in the [documentation on ReadTheDocs](https://py-libp2p.readthedocs.io/). [View the release notes](https://py-libp2p.readthedocs.io/en/latest/release_notes.html).

 ## Maintainers

-Currently maintained by [@pacrob](https://github.com/pacrob), [@seetadev](https://github.com/seetadev) and [@dhuseby](https://github.com/dhuseby), looking for assistance!
+Currently maintained by [@pacrob](https://github.com/pacrob), [@seetadev](https://github.com/seetadev) and [@dhuseby](https://github.com/dhuseby). Please reach out to us for collaboration or active feedback. If you have questions, feel free to open a new [discussion](https://github.com/libp2p/py-libp2p/discussions). We are also available on the libp2p Discord — join us in the #py-libp2p [sub-channel](https://discord.gg/d92MEugb).

 ## Feature Breakdown

File: libp2p/crypto/pb/crypto_pb2.py

@@ -13,7 +13,7 @@ _sym_db = _symbol_database.Default()
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1dlibp2p/crypto/pb/crypto.proto\x12\tcrypto.pb\"?\n\tPublicKey\x12$\n\x08key_type\x18\x01 \x02(\x0e\x32\x12.crypto.pb.KeyType\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c\"@\n\nPrivateKey\x12$\n\x08key_type\x18\x01 \x02(\x0e\x32\x12.crypto.pb.KeyType\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c*G\n\x07KeyType\x12\x07\n\x03RSA\x10\x00\x12\x0b\n\x07\x45\x64\x32\x35\x35\x31\x39\x10\x01\x12\r\n\tSecp256k1\x10\x02\x12\t\n\x05\x45\x43\x44SA\x10\x03\x12\x0c\n\x08\x45\x43\x43_P256\x10\x04')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1dlibp2p/crypto/pb/crypto.proto\x12\tcrypto.pb\"?\n\tPublicKey\x12$\n\x08key_type\x18\x01 \x02(\x0e\x32\x12.crypto.pb.KeyType\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c\"@\n\nPrivateKey\x12$\n\x08key_type\x18\x01 \x02(\x0e\x32\x12.crypto.pb.KeyType\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c*S\n\x07KeyType\x12\x07\n\x03RSA\x10\x00\x12\x0b\n\x07\x45\x64\x32\x35\x35\x31\x39\x10\x01\x12\r\n\tSecp256k1\x10\x02\x12\t\n\x05\x45\x43\x44SA\x10\x03\x12\x0c\n\x08\x45\x43\x43_P256\x10\x04\x12\n\n\x06X25519\x10\x05')
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
 _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.crypto.pb.crypto_pb2', globals())
@@ -21,7 +21,7 @@ if _descriptor._USE_C_DESCRIPTORS == False:
   DESCRIPTOR._options = None
   _KEYTYPE._serialized_start=175
-  _KEYTYPE._serialized_end=246
+  _KEYTYPE._serialized_end=258
   _PUBLICKEY._serialized_start=44
   _PUBLICKEY._serialized_end=107
   _PRIVATEKEY._serialized_start=109

File: libp2p/crypto/pb/crypto_pb2.pyi

@@ -28,6 +28,7 @@ class _KeyTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTy
     Secp256k1: _KeyType.ValueType  # 2
     ECDSA: _KeyType.ValueType  # 3
     ECC_P256: _KeyType.ValueType  # 4
+    X25519: _KeyType.ValueType  # 5

 class KeyType(_KeyType, metaclass=_KeyTypeEnumTypeWrapper): ...

@@ -36,6 +37,7 @@ Ed25519: KeyType.ValueType  # 1
 Secp256k1: KeyType.ValueType  # 2
 ECDSA: KeyType.ValueType  # 3
 ECC_P256: KeyType.ValueType  # 4
+X25519: KeyType.ValueType  # 5
 global___KeyType = KeyType

 @typing.final

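The recompiled crypto bindings add `X25519` as enum value 5. For reference, a minimal sketch (not part of the diff) of exercising the new member, assuming the generated module imports under the path shown in the code above; the key bytes are placeholders:

```python
from libp2p.crypto.pb import crypto_pb2

# key_type and data are required proto2 fields of PublicKey.
pk = crypto_pb2.PublicKey(key_type=crypto_pb2.X25519, data=b"\x00" * 32)  # placeholder key bytes

# The module-level alias and the enum wrapper agree on the new value.
assert pk.key_type == crypto_pb2.KeyType.Value("X25519") == 5
```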
File: libp2p/identity/identify/pb/identify_pb2.py

@@ -1,12 +1,11 @@
 # -*- coding: utf-8 -*-
 # Generated by the protocol buffer compiler. DO NOT EDIT!
 # source: libp2p/identity/identify/pb/identify.proto
-# Protobuf Python Version: 4.25.3
 """Generated protocol buffer code."""
+from google.protobuf.internal import builder as _builder
 from google.protobuf import descriptor as _descriptor
 from google.protobuf import descriptor_pool as _descriptor_pool
 from google.protobuf import symbol_database as _symbol_database
-from google.protobuf.internal import builder as _builder
 # @@protoc_insertion_point(imports)

 _sym_db = _symbol_database.Default()
@@ -16,11 +15,11 @@ _sym_db = _symbol_database.Default()
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n*libp2p/identity/identify/pb/identify.proto\x12\x0bidentify.pb\"\xa9\x01\n\x08Identify\x12\x18\n\x10protocol_version\x18\x05 \x01(\t\x12\x15\n\ragent_version\x18\x06 \x01(\t\x12\x12\n\npublic_key\x18\x01 \x01(\x0c\x12\x14\n\x0clisten_addrs\x18\x02 \x03(\x0c\x12\x15\n\robserved_addr\x18\x04 \x01(\x0c\x12\x11\n\tprotocols\x18\x03 \x03(\t\x12\x18\n\x10signedPeerRecord\x18\x08 \x01(\x0c')
-_globals = globals()
-_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
-_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.identity.identify.pb.identify_pb2', _globals)
+_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.identity.identify.pb.identify_pb2', globals())
 if _descriptor._USE_C_DESCRIPTORS == False:
   DESCRIPTOR._options = None
-  _globals['_IDENTIFY']._serialized_start=60
-  _globals['_IDENTIFY']._serialized_end=229
+  _IDENTIFY._serialized_start=60
+  _IDENTIFY._serialized_end=229
 # @@protoc_insertion_point(module_scope)

File: libp2p/identity/identify/pb/identify_pb2.pyi

@@ -1,24 +1,49 @@
-from google.protobuf.internal import containers as _containers
-from google.protobuf import descriptor as _descriptor
-from google.protobuf import message as _message
-from typing import ClassVar as _ClassVar, Iterable as _Iterable, Optional as _Optional
-
-DESCRIPTOR: _descriptor.FileDescriptor
-
-class Identify(_message.Message):
-    __slots__ = ("protocol_version", "agent_version", "public_key", "listen_addrs", "observed_addr", "protocols", "signedPeerRecord")
-    PROTOCOL_VERSION_FIELD_NUMBER: _ClassVar[int]
-    AGENT_VERSION_FIELD_NUMBER: _ClassVar[int]
-    PUBLIC_KEY_FIELD_NUMBER: _ClassVar[int]
-    LISTEN_ADDRS_FIELD_NUMBER: _ClassVar[int]
-    OBSERVED_ADDR_FIELD_NUMBER: _ClassVar[int]
-    PROTOCOLS_FIELD_NUMBER: _ClassVar[int]
-    SIGNEDPEERRECORD_FIELD_NUMBER: _ClassVar[int]
-    protocol_version: str
-    agent_version: str
-    public_key: bytes
-    listen_addrs: _containers.RepeatedScalarFieldContainer[bytes]
-    observed_addr: bytes
-    protocols: _containers.RepeatedScalarFieldContainer[str]
-    signedPeerRecord: bytes
-    def __init__(self, protocol_version: _Optional[str] = ..., agent_version: _Optional[str] = ..., public_key: _Optional[bytes] = ..., listen_addrs: _Optional[_Iterable[bytes]] = ..., observed_addr: _Optional[bytes] = ..., protocols: _Optional[_Iterable[str]] = ..., signedPeerRecord: _Optional[bytes] = ...) -> None: ...
+"""
+@generated by mypy-protobuf. Do not edit manually!
+isort:skip_file
+"""
+
+import builtins
+import collections.abc
+import google.protobuf.descriptor
+import google.protobuf.internal.containers
+import google.protobuf.message
+import typing
+
+DESCRIPTOR: google.protobuf.descriptor.FileDescriptor
+
+@typing.final
+class Identify(google.protobuf.message.Message):
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    PROTOCOL_VERSION_FIELD_NUMBER: builtins.int
+    AGENT_VERSION_FIELD_NUMBER: builtins.int
+    PUBLIC_KEY_FIELD_NUMBER: builtins.int
+    LISTEN_ADDRS_FIELD_NUMBER: builtins.int
+    OBSERVED_ADDR_FIELD_NUMBER: builtins.int
+    PROTOCOLS_FIELD_NUMBER: builtins.int
+    SIGNEDPEERRECORD_FIELD_NUMBER: builtins.int
+    protocol_version: builtins.str
+    agent_version: builtins.str
+    public_key: builtins.bytes
+    observed_addr: builtins.bytes
+    signedPeerRecord: builtins.bytes
+    @property
+    def listen_addrs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.bytes]: ...
+    @property
+    def protocols(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
+    def __init__(
+        self,
+        *,
+        protocol_version: builtins.str | None = ...,
+        agent_version: builtins.str | None = ...,
+        public_key: builtins.bytes | None = ...,
+        listen_addrs: collections.abc.Iterable[builtins.bytes] | None = ...,
+        observed_addr: builtins.bytes | None = ...,
+        protocols: collections.abc.Iterable[builtins.str] | None = ...,
+        signedPeerRecord: builtins.bytes | None = ...,
+    ) -> None: ...
+    def HasField(self, field_name: typing.Literal["agent_version", b"agent_version", "observed_addr", b"observed_addr", "protocol_version", b"protocol_version", "public_key", b"public_key", "signedPeerRecord", b"signedPeerRecord"]) -> builtins.bool: ...
+    def ClearField(self, field_name: typing.Literal["agent_version", b"agent_version", "listen_addrs", b"listen_addrs", "observed_addr", b"observed_addr", "protocol_version", b"protocol_version", "protocols", b"protocols", "public_key", b"public_key", "signedPeerRecord", b"signedPeerRecord"]) -> None: ...
+
+global___Identify = Identify

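The regenerated mypy-protobuf stub makes `Identify.__init__` keyword-only and exposes the repeated fields through read-only container properties. A small sketch (import path assumed from the generated code above; all field values are placeholders):

```python
from libp2p.identity.identify.pb import identify_pb2

msg = identify_pb2.Identify(
    protocol_version="ipfs/0.1.0",            # placeholder
    agent_version="py-libp2p/x.y.z",          # placeholder
    listen_addrs=[b"\x00placeholder-addr"],   # placeholder multiaddr bytes
)

assert msg.HasField("protocol_version")       # scalar presence check (proto2 optional)
assert list(msg.listen_addrs) == [b"\x00placeholder-addr"]  # repeated field is a container
```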
File: libp2p/kad_dht/pb/kademlia_pb2.py

@@ -2,10 +2,10 @@
 # Generated by the protocol buffer compiler. DO NOT EDIT!
 # source: libp2p/kad_dht/pb/kademlia.proto
 """Generated protocol buffer code."""
+from google.protobuf.internal import builder as _builder
 from google.protobuf import descriptor as _descriptor
 from google.protobuf import descriptor_pool as _descriptor_pool
 from google.protobuf import symbol_database as _symbol_database
-from google.protobuf.internal import builder as _builder
 # @@protoc_insertion_point(imports)

 _sym_db = _symbol_database.Default()
@@ -15,19 +15,19 @@ _sym_db = _symbol_database.Default()
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n libp2p/kad_dht/pb/kademlia.proto\":\n\x06Record\x12\x0b\n\x03key\x18\x01 \x01(\x0c\x12\r\n\x05value\x18\x02 \x01(\x0c\x12\x14\n\x0ctimeReceived\x18\x05 \x01(\t\"\xca\x03\n\x07Message\x12\"\n\x04type\x18\x01 \x01(\x0e\x32\x14.Message.MessageType\x12\x17\n\x0f\x63lusterLevelRaw\x18\n \x01(\x05\x12\x0b\n\x03key\x18\x02 \x01(\x0c\x12\x17\n\x06record\x18\x03 \x01(\x0b\x32\x07.Record\x12\"\n\x0b\x63loserPeers\x18\x08 \x03(\x0b\x32\r.Message.Peer\x12$\n\rproviderPeers\x18\t \x03(\x0b\x32\r.Message.Peer\x1aN\n\x04Peer\x12\n\n\x02id\x18\x01 \x01(\x0c\x12\r\n\x05\x61\x64\x64rs\x18\x02 \x03(\x0c\x12+\n\nconnection\x18\x03 \x01(\x0e\x32\x17.Message.ConnectionType\"i\n\x0bMessageType\x12\r\n\tPUT_VALUE\x10\x00\x12\r\n\tGET_VALUE\x10\x01\x12\x10\n\x0c\x41\x44\x44_PROVIDER\x10\x02\x12\x11\n\rGET_PROVIDERS\x10\x03\x12\r\n\tFIND_NODE\x10\x04\x12\x08\n\x04PING\x10\x05\"W\n\x0e\x43onnectionType\x12\x11\n\rNOT_CONNECTED\x10\x00\x12\r\n\tCONNECTED\x10\x01\x12\x0f\n\x0b\x43\x41N_CONNECT\x10\x02\x12\x12\n\x0e\x43\x41NNOT_CONNECT\x10\x03\x62\x06proto3')
-_globals = globals()
-_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
-_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.kad_dht.pb.kademlia_pb2', _globals)
+_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.kad_dht.pb.kademlia_pb2', globals())
 if _descriptor._USE_C_DESCRIPTORS == False:
   DESCRIPTOR._options = None
-  _globals['_RECORD']._serialized_start=36
-  _globals['_RECORD']._serialized_end=94
-  _globals['_MESSAGE']._serialized_start=97
-  _globals['_MESSAGE']._serialized_end=555
-  _globals['_MESSAGE_PEER']._serialized_start=281
-  _globals['_MESSAGE_PEER']._serialized_end=359
-  _globals['_MESSAGE_MESSAGETYPE']._serialized_start=361
-  _globals['_MESSAGE_MESSAGETYPE']._serialized_end=466
-  _globals['_MESSAGE_CONNECTIONTYPE']._serialized_start=468
-  _globals['_MESSAGE_CONNECTIONTYPE']._serialized_end=555
+  _RECORD._serialized_start=36
+  _RECORD._serialized_end=94
+  _MESSAGE._serialized_start=97
+  _MESSAGE._serialized_end=555
+  _MESSAGE_PEER._serialized_start=281
+  _MESSAGE_PEER._serialized_end=359
+  _MESSAGE_MESSAGETYPE._serialized_start=361
+  _MESSAGE_MESSAGETYPE._serialized_end=466
+  _MESSAGE_CONNECTIONTYPE._serialized_start=468
+  _MESSAGE_CONNECTIONTYPE._serialized_end=555
 # @@protoc_insertion_point(module_scope)

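With the kademlia bindings importable under their package path, round-tripping a message works as usual. A sketch (not part of the diff) of building a FIND_NODE query using the enum and field names visible in the serialized descriptor above; the key bytes are placeholders:

```python
from libp2p.kad_dht.pb import kademlia_pb2

query = kademlia_pb2.Message(
    type=kademlia_pb2.Message.MessageType.FIND_NODE,
    key=b"placeholder-target-key",
)

wire = query.SerializeToString()
parsed = kademlia_pb2.Message()
parsed.ParseFromString(wire)
assert parsed.type == kademlia_pb2.Message.FIND_NODE  # nested enum values are also class attrs
```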
File: libp2p/kad_dht/routing_table.py

@@ -8,6 +8,7 @@ from collections import (
 import logging
 import time

+import multihash
 import trio

 from libp2p.abc import (
@@ -40,6 +41,22 @@ PEER_REFRESH_INTERVAL = 60  # Interval to refresh peers in seconds
 STALE_PEER_THRESHOLD = 3600  # Time in seconds after which a peer is considered stale


+def peer_id_to_key(peer_id: ID) -> bytes:
+    """
+    Convert a peer ID to a 256-bit key for routing table operations.
+
+    This normalizes all peer IDs to exactly 256 bits by hashing them with SHA-256.
+
+    :param peer_id: The peer ID to convert
+    :return: 32-byte (256-bit) key for routing table operations
+    """
+    return multihash.digest(peer_id.to_bytes(), "sha2-256").digest
+
+
+def key_to_int(key: bytes) -> int:
+    """Convert a 256-bit key to an integer for range calculations."""
+    return int.from_bytes(key, byteorder="big")
+
+
 class KBucket:
     """
     A k-bucket implementation for the Kademlia DHT.
@@ -357,9 +374,24 @@
             True if the key is in range, False otherwise
         """
-        key_int = int.from_bytes(key, byteorder="big")
+        key_int = key_to_int(key)
         return self.min_range <= key_int < self.max_range

+    def peer_id_in_range(self, peer_id: ID) -> bool:
+        """
+        Check if a peer ID is in the range of this bucket.
+
+        params: peer_id: The peer ID to check
+
+        Returns
+        -------
+        bool
+            True if the peer ID is in range, False otherwise
+        """
+        key = peer_id_to_key(peer_id)
+        return self.key_in_range(key)
+
     def split(self) -> tuple["KBucket", "KBucket"]:
         """
         Split the bucket into two buckets.
@@ -376,8 +408,9 @@
         # Redistribute peers
         for peer_id, (peer_info, timestamp) in self.peers.items():
-            peer_key = int.from_bytes(peer_id.to_bytes(), byteorder="big")
-            if peer_key < midpoint:
+            peer_key = peer_id_to_key(peer_id)
+            peer_key_int = key_to_int(peer_key)
+            if peer_key_int < midpoint:
                 lower_bucket.peers[peer_id] = (peer_info, timestamp)
             else:
                 upper_bucket.peers[peer_id] = (peer_info, timestamp)
@@ -458,7 +491,38 @@
             success = await bucket.add_peer(peer_info)
             if success:
                 logger.debug(f"Successfully added peer {peer_id} to routing table")
-            return success
+                return True
+
+            # If bucket is full and couldn't add peer, try splitting the bucket
+            # Only split if the bucket contains our Peer ID
+            if self._should_split_bucket(bucket):
+                logger.debug(
+                    f"Bucket is full, attempting to split bucket for peer {peer_id}"
+                )
+                split_success = self._split_bucket(bucket)
+                if split_success:
+                    # After splitting,
+                    # find the appropriate bucket for the peer and try to add it
+                    target_bucket = self.find_bucket(peer_info.peer_id)
+                    success = await target_bucket.add_peer(peer_info)
+                    if success:
+                        logger.debug(
+                            f"Successfully added peer {peer_id} after bucket split"
+                        )
+                        return True
+                    else:
+                        logger.debug(
+                            f"Failed to add peer {peer_id} even after bucket split"
+                        )
+                        return False
+                else:
+                    logger.debug(f"Failed to split bucket for peer {peer_id}")
+                    return False
+            else:
+                logger.debug(
+                    f"Bucket is full and cannot be split, peer {peer_id} not added"
+                )
+                return False
         except Exception as e:
             logger.debug(f"Error adding peer {peer_obj} to routing table: {e}")
@@ -480,9 +544,9 @@
     def find_bucket(self, peer_id: ID) -> KBucket:
         """
-        Find the bucket that would contain the given peer ID or PeerInfo.
+        Find the bucket that would contain the given peer ID.

-        :param peer_obj: Either a peer ID or a PeerInfo object
+        :param peer_id: The peer ID to find a bucket for

         Returns
         -------
@@ -490,7 +554,7 @@
         """
         for bucket in self.buckets:
-            if bucket.key_in_range(peer_id.to_bytes()):
+            if bucket.peer_id_in_range(peer_id):
                 return bucket
         return self.buckets[0]
@@ -513,7 +577,11 @@
             all_peers.extend(bucket.peer_ids())

         # Sort by XOR distance to the key
-        all_peers.sort(key=lambda p: xor_distance(p.to_bytes(), key))
+        def distance_to_key(peer_id: ID) -> int:
+            peer_key = peer_id_to_key(peer_id)
+            return xor_distance(peer_key, key)
+
+        all_peers.sort(key=distance_to_key)

         return all_peers[:count]
@@ -591,6 +659,20 @@
             stale_peers.extend(bucket.get_stale_peers(stale_threshold_seconds))
         return stale_peers

+    def get_peer_infos(self) -> list[PeerInfo]:
+        """
+        Get all PeerInfo objects in the routing table.
+
+        Returns
+        -------
+        List[PeerInfo]: List of all PeerInfo objects
+        """
+        peer_infos = []
+        for bucket in self.buckets:
+            peer_infos.extend(bucket.peer_infos())
+        return peer_infos
+
     def cleanup_routing_table(self) -> None:
         """
         Cleanup the routing table by removing all data.
@@ -598,3 +680,66 @@
         """
         self.buckets = [KBucket(self.host, BUCKET_SIZE)]
         logger.info("Routing table cleaned up, all data removed.")
+
+    def _should_split_bucket(self, bucket: KBucket) -> bool:
+        """
+        Check if a bucket should be split according to Kademlia rules.
+
+        :param bucket: The bucket to check
+        :return: True if the bucket should be split
+        """
+        # Check if we've exceeded maximum buckets
+        if len(self.buckets) >= MAXIMUM_BUCKETS:
+            logger.debug("Maximum number of buckets reached, cannot split")
+            return False
+
+        # Check if the bucket contains our local ID
+        local_key = peer_id_to_key(self.local_id)
+        local_key_int = key_to_int(local_key)
+        contains_local_id = bucket.min_range <= local_key_int < bucket.max_range
+
+        logger.debug(
+            f"Bucket range: {bucket.min_range} - {bucket.max_range}, "
+            f"local_key_int: {local_key_int}, contains_local: {contains_local_id}"
+        )
+
+        return contains_local_id
+
+    def _split_bucket(self, bucket: KBucket) -> bool:
+        """
+        Split a bucket into two buckets.
+
+        :param bucket: The bucket to split
+        :return: True if the bucket was successfully split
+        """
+        try:
+            # Find the bucket index
+            bucket_index = self.buckets.index(bucket)
+            logger.debug(f"Splitting bucket at index {bucket_index}")
+
+            # Split the bucket
+            lower_bucket, upper_bucket = bucket.split()
+
+            # Replace the original bucket with the two new buckets
+            self.buckets[bucket_index] = lower_bucket
+            self.buckets.insert(bucket_index + 1, upper_bucket)
+
+            logger.debug(
+                f"Bucket split successful. New bucket count: {len(self.buckets)}"
+            )
+            logger.debug(
+                f"Lower bucket range: "
+                f"{lower_bucket.min_range} - {lower_bucket.max_range}, "
+                f"peers: {lower_bucket.size()}"
+            )
+            logger.debug(
+                f"Upper bucket range: "
+                f"{upper_bucket.min_range} - {upper_bucket.max_range}, "
+                f"peers: {upper_bucket.size()}"
+            )
+
+            return True
+
+        except Exception as e:
+            logger.error(f"Error splitting bucket: {e}")
+            return False

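The heart of this change is that every peer ID is first normalized to a 256-bit key (`peer_id_to_key`) before any range or distance arithmetic, so buckets always partition a fixed 2**256 keyspace. A self-contained sketch of the same idea, using `hashlib` instead of the `multihash` dependency:

```python
import hashlib

def to_key(peer_id_bytes: bytes) -> bytes:
    """Normalize an arbitrary-length peer ID to a 32-byte key (like peer_id_to_key)."""
    return hashlib.sha256(peer_id_bytes).digest()

def to_int(key: bytes) -> int:
    """Interpret the key as a big-endian integer (like key_to_int)."""
    return int.from_bytes(key, byteorder="big")

def xor_distance(a: bytes, b: bytes) -> int:
    """Kademlia XOR metric over normalized keys."""
    return to_int(a) ^ to_int(b)

raw_ids = [b"short-id", b"a-much-longer-peer-identifier"]
keys = [to_key(r) for r in raw_ids]

# Normalization guarantees every key lands in the fixed keyspace a bucket can split.
assert all(0 <= to_int(k) < 2**256 for k in keys)

# A bucket split is then just an integer comparison against the midpoint.
midpoint = 2**255
lower = [k for k in keys if to_int(k) < midpoint]
upper = [k for k in keys if to_int(k) >= midpoint]
```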
File: libp2p/network/connection/swarm_connection.py

@@ -23,7 +23,8 @@ if TYPE_CHECKING:
 """
-Reference: https://github.com/libp2p/go-libp2p-swarm/blob/04c86bbdafd390651cb2ee14e334f7caeedad722/swarm_conn.go
+Reference: https://github.com/libp2p/go-libp2p-swarm/blob/
+04c86bbdafd390651cb2ee14e334f7caeedad722/swarm_conn.go
 """
@@ -43,6 +44,21 @@ class SwarmConn(INetConn):
         self.streams = set()
         self.event_closed = trio.Event()
         self.event_started = trio.Event()
+        # Provide back-references/hooks expected by NetStream
+        try:
+            setattr(self.muxed_conn, "swarm", self.swarm)
+
+            # NetStream expects an awaitable remove_stream hook
+            async def _remove_stream_hook(stream: NetStream) -> None:
+                self.remove_stream(stream)
+
+            setattr(self.muxed_conn, "remove_stream", _remove_stream_hook)
+        except Exception as e:
+            logging.warning(
+                f"Failed to set optional conveniences on muxed_conn "
+                f"for peer {muxed_conn.peer_id}: {e}"
+            )
+        # optional conveniences
         if hasattr(muxed_conn, "on_close"):
             logging.debug(f"Setting on_close for peer {muxed_conn.peer_id}")
             setattr(muxed_conn, "on_close", self._on_muxed_conn_closed)

File: libp2p/network/swarm.py

@@ -1,3 +1,7 @@
+from collections.abc import (
+    Awaitable,
+    Callable,
+)
 import logging

 from multiaddr import (
@@ -326,8 +330,16 @@ class Swarm(Service, INetworkService):
         # Close all listeners
         if hasattr(self, "listeners"):
-            for listener in self.listeners.values():
+            for maddr_str, listener in self.listeners.items():
                 await listener.close()
+                # Notify about listener closure
+                try:
+                    multiaddr = Multiaddr(maddr_str)
+                    await self.notify_listen_close(multiaddr)
+                except Exception as e:
+                    logger.warning(
+                        f"Failed to notify listen_close for {maddr_str}: {e}"
+                    )
             self.listeners.clear()

         # Close the transport if it exists and has a close method
@@ -411,7 +423,17 @@
                 nursery.start_soon(notifee.listen, self, multiaddr)

     async def notify_closed_stream(self, stream: INetStream) -> None:
-        raise NotImplementedError
+        async with trio.open_nursery() as nursery:
+            for notifee in self.notifees:
+                nursery.start_soon(notifee.closed_stream, self, stream)

     async def notify_listen_close(self, multiaddr: Multiaddr) -> None:
-        raise NotImplementedError
+        async with trio.open_nursery() as nursery:
+            for notifee in self.notifees:
+                nursery.start_soon(notifee.listen_close, self, multiaddr)
+
+    # Generic notifier used by NetStream._notify_closed
+    async def notify_all(self, notifier: Callable[[INotifee], Awaitable[None]]) -> None:
+        async with trio.open_nursery() as nursery:
+            for notifee in self.notifees:
+                nursery.start_soon(notifier, notifee)

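All three notifiers follow the same fan-out shape: open a nursery and start one task per registered notifee, so a slow subscriber cannot block the others. A toy, self-contained sketch of that dispatch pattern (the real code uses the `INotifee` interface and `self.notifees`):

```python
import trio

class ToyNotifee:
    """Stand-in for an INotifee implementation."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    async def closed_stream(self, network: object, stream: object) -> None:
        await trio.sleep(0)  # simulate asynchronous work
        self.log.append(f"{self.name}: closed_stream")

async def notify_closed_stream(notifees: list[ToyNotifee], stream: object) -> None:
    # One task per notifee, mirroring the nursery fan-out above.
    async with trio.open_nursery() as nursery:
        for notifee in notifees:
            nursery.start_soon(notifee.closed_stream, None, stream)

async def main() -> None:
    log: list[str] = []
    await notify_closed_stream([ToyNotifee("a", log), ToyNotifee("b", log)], object())
    assert sorted(log) == ["a: closed_stream", "b: closed_stream"]

trio.run(main)
```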
File: libp2p/pubsub/pb/rpc_pb2.py

@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 # Generated by the protocol buffer compiler. DO NOT EDIT!
-# source: rpc.proto
+# source: libp2p/pubsub/pb/rpc.proto
 """Generated protocol buffer code."""
 from google.protobuf.internal import builder as _builder
 from google.protobuf import descriptor as _descriptor
@@ -13,39 +13,39 @@ _sym_db = _symbol_database.Default()
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\trpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02')
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
-_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'rpc_pb2', globals())
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.pubsub.pb.rpc_pb2', globals())
 if _descriptor._USE_C_DESCRIPTORS == False:
   DESCRIPTOR._options = None
-  _RPC._serialized_start=25
-  _RPC._serialized_end=205
-  _RPC_SUBOPTS._serialized_start=160
-  _RPC_SUBOPTS._serialized_end=205
-  _MESSAGE._serialized_start=207
-  _MESSAGE._serialized_end=312
-  _CONTROLMESSAGE._serialized_start=315
-  _CONTROLMESSAGE._serialized_end=491
-  _CONTROLIHAVE._serialized_start=493
-  _CONTROLIHAVE._serialized_end=544
-  _CONTROLIWANT._serialized_start=546
-  _CONTROLIWANT._serialized_end=580
-  _CONTROLGRAFT._serialized_start=582
-  _CONTROLGRAFT._serialized_end=613
-  _CONTROLPRUNE._serialized_start=615
-  _CONTROLPRUNE._serialized_end=699
-  _PEERINFO._serialized_start=701
-  _PEERINFO._serialized_end=753
-  _TOPICDESCRIPTOR._serialized_start=756
-  _TOPICDESCRIPTOR._serialized_end=1147
-  _TOPICDESCRIPTOR_AUTHOPTS._serialized_start=889
-  _TOPICDESCRIPTOR_AUTHOPTS._serialized_end=1013
-  _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_start=975
-  _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_end=1013
-  _TOPICDESCRIPTOR_ENCOPTS._serialized_start=1016
-  _TOPICDESCRIPTOR_ENCOPTS._serialized_end=1147
-  _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_start=1104
-  _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_end=1147
+  _RPC._serialized_start=42
+  _RPC._serialized_end=222
+  _RPC_SUBOPTS._serialized_start=177
+  _RPC_SUBOPTS._serialized_end=222
+  _MESSAGE._serialized_start=224
+  _MESSAGE._serialized_end=329
+  _CONTROLMESSAGE._serialized_start=332
+  _CONTROLMESSAGE._serialized_end=508
+  _CONTROLIHAVE._serialized_start=510
+  _CONTROLIHAVE._serialized_end=561
+  _CONTROLIWANT._serialized_start=563
+  _CONTROLIWANT._serialized_end=597
+  _CONTROLGRAFT._serialized_start=599
+  _CONTROLGRAFT._serialized_end=630
+  _CONTROLPRUNE._serialized_start=632
+  _CONTROLPRUNE._serialized_end=716
+  _PEERINFO._serialized_start=718
+  _PEERINFO._serialized_end=770
+  _TOPICDESCRIPTOR._serialized_start=773
+  _TOPICDESCRIPTOR._serialized_end=1164
+  _TOPICDESCRIPTOR_AUTHOPTS._serialized_start=906
+  _TOPICDESCRIPTOR_AUTHOPTS._serialized_end=1030
+  _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_start=992
+  _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_end=1030
+  _TOPICDESCRIPTOR_ENCOPTS._serialized_start=1033
+  _TOPICDESCRIPTOR_ENCOPTS._serialized_end=1164
+  _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_start=1121
+  _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_end=1164
 # @@protoc_insertion_point(module_scope)

File: libp2p/relay/circuit_v2/pb/circuit_pb2.py

@@ -1,6 +1,5 @@
 # -*- coding: utf-8 -*-
 # Generated by the protocol buffer compiler. DO NOT EDIT!
-# NO CHECKED-IN PROTOBUF GENCODE
 # source: libp2p/relay/circuit_v2/pb/circuit.proto
 """Generated protocol buffer code."""
 from google.protobuf.internal import builder as _builder
@@ -12,11 +11,14 @@ from google.protobuf import symbol_database as _symbol_database
 _sym_db = _symbol_database.Default()

 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n(libp2p/relay/circuit_v2/pb/circuit.proto\x12\rcircuit.pb.v2\"\xf3\x01\n\nHopMessage\x12,\n\x04type\x18\x01 \x01(\x0e\x32\x1e.circuit.pb.v2.HopMessage.Type\x12\x0c\n\x04peer\x18\x02 \x01(\x0c\x12/\n\x0breservation\x18\x03 \x01(\x0b\x32\x1a.circuit.pb.v2.Reservation\x12#\n\x05limit\x18\x04 \x01(\x0b\x32\x14.circuit.pb.v2.Limit\x12%\n\x06status\x18\x05 \x01(\x0b\x32\x15.circuit.pb.v2.Status\",\n\x04Type\x12\x0b\n\x07RESERVE\x10\x00\x12\x0b\n\x07\x43ONNECT\x10\x01\x12\n\n\x06STATUS\x10\x02\"\x92\x01\n\x0bStopMessage\x12-\n\x04type\x18\x01 \x01(\x0e\x32\x1f.circuit.pb.v2.StopMessage.Type\x12\x0c\n\x04peer\x18\x02 \x01(\x0c\x12%\n\x06status\x18\x03 \x01(\x0b\x32\x15.circuit.pb.v2.Status\"\x1f\n\x04Type\x12\x0b\n\x07\x43ONNECT\x10\x00\x12\n\n\x06STATUS\x10\x01\"A\n\x0bReservation\x12\x0f\n\x07voucher\x18\x01 \x01(\x0c\x12\x11\n\tsignature\x18\x02 \x01(\x0c\x12\x0e\n\x06\x65xpire\x18\x03 \x01(\x03\"\'\n\x05Limit\x12\x10\n\x08\x64uration\x18\x01 \x01(\x03\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x03\"\xf6\x01\n\x06Status\x12(\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x1a.circuit.pb.v2.Status.Code\x12\x0f\n\x07message\x18\x02 \x01(\t\"\xb0\x01\n\x04\x43ode\x12\x06\n\x02OK\x10\x00\x12\x17\n\x13RESERVATION_REFUSED\x10\x64\x12\x1b\n\x17RESOURCE_LIMIT_EXCEEDED\x10\x65\x12\x15\n\x11PERMISSION_DENIED\x10\x66\x12\x16\n\x11\x43ONNECTION_FAILED\x10\xc8\x01\x12\x11\n\x0c\x44IAL_REFUSED\x10\xc9\x01\x12\x10\n\x0bSTOP_FAILED\x10\xac\x02\x12\x16\n\x11MALFORMED_MESSAGE\x10\x90\x03\x62\x06proto3')
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
 _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.relay.circuit_v2.pb.circuit_pb2', globals())
 if _descriptor._USE_C_DESCRIPTORS == False:
   DESCRIPTOR._options = None
   _HOPMESSAGE._serialized_start=60
   _HOPMESSAGE._serialized_end=303

File: libp2p/relay/circuit_v2/pb/dcutr_pb2.py

@@ -13,11 +13,12 @@ _sym_db = _symbol_database.Default()
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n&libp2p/relay/circuit_v2/pb/dcutr.proto\x12\x0cholepunch.pb\"\x69\n\tHolePunch\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.holepunch.pb.HolePunch.Type\x12\x10\n\x08ObsAddrs\x18\x02 \x03(\x0c\"\x1e\n\x04Type\x12\x0b\n\x07CONNECT\x10\x64\x12\t\n\x04SYNC\x10\xac\x02')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n&libp2p/relay/circuit_v2/pb/dcutr.proto\x12\x0cholepunch.pb\"i\n\tHolePunch\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.holepunch.pb.HolePunch.Type\x12\x10\n\x08ObsAddrs\x18\x02 \x03(\x0c\"\x1e\n\x04Type\x12\x0b\n\x07\x43ONNECT\x10\x64\x12\t\n\x04SYNC\x10\xac\x02')
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
 _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.relay.circuit_v2.pb.dcutr_pb2', globals())
 if _descriptor._USE_C_DESCRIPTORS == False:
   DESCRIPTOR._options = None
   _HOLEPUNCH._serialized_start=56
   _HOLEPUNCH._serialized_end=161

File: libp2p/relay/circuit_v2/pb/dcutr_pb2.pyi

@@ -7,47 +7,46 @@ import builtins
 import collections.abc
 import google.protobuf.descriptor
 import google.protobuf.internal.containers
+import google.protobuf.internal.enum_type_wrapper
 import google.protobuf.message
+import sys
 import typing

+if sys.version_info >= (3, 10):
+    import typing as typing_extensions
+else:
+    import typing_extensions
+
 DESCRIPTOR: google.protobuf.descriptor.FileDescriptor

 @typing.final
 class HolePunch(google.protobuf.message.Message):
-    """HolePunch message for the DCUtR protocol."""
     DESCRIPTOR: google.protobuf.descriptor.Descriptor

-    class Type(builtins.int):
-        """Message types for HolePunch"""
-        @builtins.classmethod
-        def Name(cls, number: builtins.int) -> builtins.str: ...
-        @builtins.classmethod
-        def Value(cls, name: builtins.str) -> 'HolePunch.Type': ...
-        @builtins.classmethod
-        def keys(cls) -> typing.List[builtins.str]: ...
-        @builtins.classmethod
-        def values(cls) -> typing.List['HolePunch.Type']: ...
-        @builtins.classmethod
-        def items(cls) -> typing.List[typing.Tuple[builtins.str, 'HolePunch.Type']]: ...
-    CONNECT: HolePunch.Type  # 100
-    SYNC: HolePunch.Type  # 300
+    class _Type:
+        ValueType = typing.NewType("ValueType", builtins.int)
+        V: typing_extensions.TypeAlias = ValueType
+
+    class _TypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[HolePunch._Type.ValueType], builtins.type):
+        DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
+        CONNECT: HolePunch._Type.ValueType  # 100
+        SYNC: HolePunch._Type.ValueType  # 300
+
+    class Type(_Type, metaclass=_TypeEnumTypeWrapper): ...
+    CONNECT: HolePunch.Type.ValueType  # 100
+    SYNC: HolePunch.Type.ValueType  # 300

     TYPE_FIELD_NUMBER: builtins.int
     OBSADDRS_FIELD_NUMBER: builtins.int
-    type: HolePunch.Type
+    type: global___HolePunch.Type.ValueType
     @property
     def ObsAddrs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.bytes]: ...
     def __init__(
         self,
         *,
-        type: HolePunch.Type = ...,
-        ObsAddrs: collections.abc.Iterable[builtins.bytes] = ...,
+        type: global___HolePunch.Type.ValueType | None = ...,
+        ObsAddrs: collections.abc.Iterable[builtins.bytes] | None = ...,
     ) -> None: ...
     def HasField(self, field_name: typing.Literal["type", b"type"]) -> builtins.bool: ...
     def ClearField(self, field_name: typing.Literal["ObsAddrs", b"ObsAddrs", "type", b"type"]) -> None: ...

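The retyped stub keeps the DCUtR enum values `CONNECT = 100` and `SYNC = 300`. A short sketch (not part of the diff; assumed import path, placeholder address bytes) of constructing the message:

```python
from libp2p.relay.circuit_v2.pb import dcutr_pb2

punch = dcutr_pb2.HolePunch(
    type=dcutr_pb2.HolePunch.CONNECT,
    ObsAddrs=[b"placeholder-observed-addr"],  # placeholder multiaddr bytes
)

assert punch.type == 100
assert punch.HasField("type")  # `type` is a required proto2 field
```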
File: newsfragment (new file; path not shown)

@@ -0,0 +1 @@
+Recompiled protobufs that were out of date and added a `make` rule so that protobufs are always up to date.

File: newsfragment (new file; path not shown)

@@ -0,0 +1,6 @@
+Implement closed_stream notification in MyNotifee
+
+- Add notify_closed_stream method to swarm notification system for proper stream lifecycle management
+- Integrate remove_stream hook in SwarmConn to enable stream closure notifications
+- Add comprehensive tests for closed_stream functionality in test_notify.py
+- Enable stream lifecycle integration for proper cleanup and resource management

File: newsfragment (new file; path not shown)

@@ -0,0 +1 @@
+Fix kbucket splitting in routing table when full. Routing table now maintains multiple kbuckets and properly distributes peers as specified by the Kademlia DHT protocol.

File: pyproject.toml

@@ -10,8 +10,10 @@ readme = "README.md"
 requires-python = ">=3.10, <4.0"
 license = { text = "MIT AND Apache-2.0" }
 keywords = ["libp2p", "p2p"]
-authors = [
-    { name = "The Ethereum Foundation", email = "snakecharmers@ethereum.org" },
+maintainers = [
+    { name = "pacrob", email = "pacrob@protonmail.com" },
+    { name = "Manu Sheel Gupta", email = "manu@seeta.in" },
+    { name = "Dave Grantham", email = "dave@aviation.community" },
 ]
 dependencies = [
     "base58>=1.0.3",

File: routing-table tests (path not shown)

@@ -226,6 +226,32 @@ class TestKBucket:
 class TestRoutingTable:
     """Test suite for RoutingTable class."""

+    @pytest.mark.trio
+    async def test_kbucket_split_behavior(self, mock_host, local_peer_id):
+        """
+        Test that adding more than BUCKET_SIZE peers to the routing table
+        triggers kbucket splitting and all peers are added.
+        """
+        routing_table = RoutingTable(local_peer_id, mock_host)
+        num_peers = BUCKET_SIZE + 5
+        peer_ids = []
+        for i in range(num_peers):
+            key_pair = create_new_key_pair()
+            peer_id = ID.from_pubkey(key_pair.public_key)
+            peer_info = PeerInfo(peer_id, [Multiaddr(f"/ip4/127.0.0.1/tcp/{9000 + i}")])
+            peer_ids.append(peer_id)
+            added = await routing_table.add_peer(peer_info)
+            assert added, f"Peer {peer_id} should be added"
+
+        assert len(routing_table.buckets) > 1, "KBucket splitting did not occur"
+
+        for pid in peer_ids:
+            assert routing_table.peer_in_table(pid), f"Peer {pid} not found after split"
+
+        all_peer_ids = routing_table.get_peer_ids()
+        assert set(peer_ids).issubset(set(all_peer_ids)), (
+            "Not all peers present after split"
+        )
+
     @pytest.fixture
     def mock_host(self):
         """Create a mock host for testing."""

File: test_notify.py

@@ -5,11 +5,12 @@ the stream passed into opened_stream is correct.
 Note: Listen event does not get hit because MyNotifee is passed
 into network after network has already started listening

-TODO: Add tests for closed_stream, listen_close when those
-features are implemented in swarm
+Note: ClosedStream events are processed asynchronously and may not be
+immediately available due to the rapid nature of operations
 """

 import enum
+from unittest.mock import Mock

 import pytest
 from multiaddr import Multiaddr
@@ -29,11 +30,11 @@ from tests.utils.factories import (
 class Event(enum.Enum):
     OpenedStream = 0
-    ClosedStream = 1  # Not implemented
+    ClosedStream = 1
     Connected = 2
     Disconnected = 3
     Listen = 4
-    ListenClose = 5  # Not implemented
+    ListenClose = 5


 class MyNotifee(INotifee):
@@ -44,8 +45,11 @@ class MyNotifee(INotifee):
         self.events.append(Event.OpenedStream)

     async def closed_stream(self, network: INetwork, stream: INetStream) -> None:
-        # TODO: It is not implemented yet.
-        pass
+        if network is None:
+            raise ValueError("network parameter cannot be None")
+        if stream is None:
+            raise ValueError("stream parameter cannot be None")
+        self.events.append(Event.ClosedStream)

     async def connected(self, network: INetwork, conn: INetConn) -> None:
         self.events.append(Event.Connected)
@@ -57,8 +61,11 @@ class MyNotifee(INotifee):
         self.events.append(Event.Listen)

     async def listen_close(self, network: INetwork, multiaddr: Multiaddr) -> None:
-        # TODO: It is not implemented yet.
-        pass
+        if network is None:
+            raise ValueError("network parameter cannot be None")
+        if multiaddr is None:
+            raise ValueError("multiaddr parameter cannot be None")
+        self.events.append(Event.ListenClose)


 @pytest.mark.trio
@@ -103,28 +110,188 @@ async def test_notify(security_protocol):
         # Wait for events
         assert await wait_for_event(events_0_0, Event.Connected, 1.0)
         assert await wait_for_event(events_0_0, Event.OpenedStream, 1.0)
-        # assert await wait_for_event(
-        #     events_0_0, Event.ClosedStream, 1.0
-        # )  # Not implemented
+        assert await wait_for_event(events_0_0, Event.ClosedStream, 1.0)
         assert await wait_for_event(events_0_0, Event.Disconnected, 1.0)
         assert await wait_for_event(events_0_1, Event.Connected, 1.0)
         assert await wait_for_event(events_0_1, Event.OpenedStream, 1.0)
-        # assert await wait_for_event(
-        #     events_0_1, Event.ClosedStream, 1.0
-        # )  # Not implemented
+        assert await wait_for_event(events_0_1, Event.ClosedStream, 1.0)
        assert await wait_for_event(events_0_1, Event.Disconnected, 1.0)
         assert await wait_for_event(events_1_0, Event.Connected, 1.0)
         assert await wait_for_event(events_1_0, Event.OpenedStream, 1.0)
-        # assert await wait_for_event(
-        #     events_1_0, Event.ClosedStream, 1.0
-        # )  # Not implemented
+        assert await wait_for_event(events_1_0, Event.ClosedStream, 1.0)
         assert await wait_for_event(events_1_0, Event.Disconnected, 1.0)
         assert await wait_for_event(events_1_1, Event.Connected, 1.0)
         assert await wait_for_event(events_1_1, Event.OpenedStream, 1.0)
-        # assert await wait_for_event(
-        #     events_1_1, Event.ClosedStream, 1.0
-        # )  # Not implemented
+        assert await wait_for_event(events_1_1, Event.ClosedStream, 1.0)
         assert await wait_for_event(events_1_1, Event.Disconnected, 1.0)
+
+        # Note: ListenClose events are triggered when swarm closes during cleanup
+        # The test framework automatically closes listeners, triggering ListenClose
+        # notifications
+
+
+async def wait_for_event(events_list, event, timeout=1.0):
+    """Helper to wait for a specific event to appear in the events list."""
+    with trio.move_on_after(timeout):
+        while event not in events_list:
+            await trio.sleep(0.01)
+        return True
+    return False
+
+
+@pytest.mark.trio
+async def test_notify_with_closed_stream_and_listen_close():
+    """Test that closed_stream and listen_close events are properly triggered."""
+    # Event lists for notifees
+    events_0 = []
+    events_1 = []
+
+    # Create two swarms
+    async with SwarmFactory.create_batch_and_listen(2) as swarms:
+        # Register notifees
+        notifee_0 = MyNotifee(events_0)
+        notifee_1 = MyNotifee(events_1)
+        swarms[0].register_notifee(notifee_0)
+        swarms[1].register_notifee(notifee_1)
+
+        # Connect swarms
+        await connect_swarm(swarms[0], swarms[1])
+
+        # Create and close a stream to trigger closed_stream event
+        stream = await swarms[0].new_stream(swarms[1].get_peer_id())
+        await stream.close()
+
+        # Note: Events are processed asynchronously and may not be immediately
+        # available due to the rapid nature of operations
+
+
+@pytest.mark.trio
+async def test_notify_edge_cases():
+    """Test edge cases for notify system."""
+    events = []
+
+    async with SwarmFactory.create_batch_and_listen(2) as swarms:
+        notifee = MyNotifee(events)
+        swarms[0].register_notifee(notifee)
+
+        # Connect swarms first
+        await connect_swarm(swarms[0], swarms[1])
+
+        # Test 1: Multiple rapid stream operations
+        streams = []
+        for _ in range(5):
+            stream = await swarms[0].new_stream(swarms[1].get_peer_id())
+            streams.append(stream)
+
+        # Close all streams rapidly
+        for stream in streams:
+            await stream.close()
+
+
+@pytest.mark.trio
+async def test_my_notifee_error_handling():
+    """Test error handling for invalid parameters in MyNotifee methods."""
+    events = []
+    notifee = MyNotifee(events)
+
+    # Mock objects for testing
+    mock_network = Mock(spec=INetwork)
+    mock_stream = Mock(spec=INetStream)
+    mock_multiaddr = Mock(spec=Multiaddr)
+
+    # Test closed_stream with None parameters
+    with pytest.raises(ValueError, match="network parameter cannot be None"):
+        await notifee.closed_stream(None, mock_stream)  # type: ignore
+    with pytest.raises(ValueError, match="stream parameter cannot be None"):
+        await notifee.closed_stream(mock_network, None)  # type: ignore
+
+    # Test listen_close with None parameters
+    with pytest.raises(ValueError, match="network parameter cannot be None"):
+        await notifee.listen_close(None, mock_multiaddr)  # type: ignore
+    with pytest.raises(ValueError, match="multiaddr parameter cannot be None"):
+        await notifee.listen_close(mock_network, None)  # type: ignore
+
+    # Verify no events were recorded due to errors
+    assert len(events) == 0
+
+
+@pytest.mark.trio
+async def test_rapid_stream_operations():
+    """Test rapid stream open/close operations."""
+    events_0 = []
+    events_1 = []
+
+    async with SwarmFactory.create_batch_and_listen(2) as swarms:
+        notifee_0 = MyNotifee(events_0)
+        notifee_1 = MyNotifee(events_1)
+        swarms[0].register_notifee(notifee_0)
+        swarms[1].register_notifee(notifee_1)
+
+        # Connect swarms
+        await connect_swarm(swarms[0], swarms[1])
+
+        # Rapidly create and close multiple streams
+        streams = []
+        for _ in range(3):
+            stream = await swarms[0].new_stream(swarms[1].get_peer_id())
+            streams.append(stream)
+
+        # Close all streams immediately
+        for stream in streams:
+            await stream.close()
+
+        # Verify OpenedStream events are recorded
+        assert events_0.count(Event.OpenedStream) == 3
+        assert events_1.count(Event.OpenedStream) == 3
+
+        # Close peer to trigger disconnection events
+        await swarms[0].close_peer(swarms[1].get_peer_id())
+
+
+@pytest.mark.trio
+async def test_concurrent_stream_operations():
+    """Test concurrent stream operations using trio nursery."""
+    events_0 = []
+    events_1 = []
+
+    async with SwarmFactory.create_batch_and_listen(2) as swarms:
+        notifee_0 = MyNotifee(events_0)
+        notifee_1 = MyNotifee(events_1)
+        swarms[0].register_notifee(notifee_0)
+        swarms[1].register_notifee(notifee_1)
+
+        # Connect swarms
+        await connect_swarm(swarms[0], swarms[1])
+
+        async def create_and_close_stream():
+            """Create and immediately close a stream."""
+            stream = await swarms[0].new_stream(swarms[1].get_peer_id())
+            await stream.close()
+
+        # Run multiple stream operations concurrently
+        async with trio.open_nursery() as nursery:
+            for _ in range(4):
+                nursery.start_soon(create_and_close_stream)
+
+        # Verify some OpenedStream events are recorded
+        # (concurrent operations may not all succeed)
+        opened_count_0 = events_0.count(Event.OpenedStream)
+        opened_count_1 = events_1.count(Event.OpenedStream)
+        assert opened_count_0 > 0, (
+            f"Expected some OpenedStream events, got {opened_count_0}"
+        )
+        assert opened_count_1 > 0, (
+            f"Expected some OpenedStream events, got {opened_count_1}"
+        )
+
+        # Close peer to trigger disconnection events
+        await swarms[0].close_peer(swarms[1].get_peer_id())