mirror of https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
Kademlia DHT implementation in py-libp2p (#579)
* initialise the module
* added content routing
* added routing module
* added peer routing
* added value store
* added utility functions
* added main kademlia file
* fixed create_key_from_binary function
* example to test kademlia dht
* added protocol ID and enhanced logging for peer store size in provider and consumer nodes
* refactor: specify stream type in handle_stream method and add peer in routing table
* removed content routing
* added default value of count for finding closest peers
* added functions to find close peers
* refactor: remove content routing and enhance peer discovery
* added put value function
* added get value function
* fix: improve logging and handle key encoding in get_value method
* refactor: remove ContentRouting import from __init__.py
* refactor: improved basic kademlia example
* added protobuf files
* replaced json with protobuf
* refactor: enhance peer discovery and routing logic in KadDHT
* refactor: enhance Kademlia routing table to use PeerInfo objects and improve peer management
* refactor: enhance peer addition logic to utilize PeerInfo objects in routing table
* feat: implement content provider functionality in Kademlia DHT
* refactor: update value store to use datetime for validity management
* refactor: update RoutingTable initialization to include host reference
* refactor: enhance KBucket and RoutingTable for improved peer management and functionality
* refactor: streamline peer discovery and value storage methods in KadDHT
* refactor: update KadDHT and related classes for async peer management and enhanced value storage
* refactor: enhance ProviderStore initialization and improve peer routing integration
* test: add tests for Kademlia DHT functionality
* fix linting issues
* pydocstyle issues fixed
* CI/CD pipeline issues solved
* fix: update docstring format for find_peer method
* refactor: improve logging and remove unused code in DHT implementation
* refactor: clean up logging and remove unused imports in DHT and test files
* Refactor logging setup and improve DHT stream handling with varint length prefixes
* Update bootstrap peer handling in basic_dht example and refactor peer routing to accept string addresses
* Enhance peer querying in Kademlia DHT by implementing parallel queries using Trio
* Enhance peer querying by adding deduplication checks
* Refactor DHT implementation to use varint for length prefixes and enhance logging for better traceability
* Add base58 encoding for value storage and enhance logging in basic_dht example
* Refactor Kademlia DHT to support server/client modes
* Added unit tests
* Refactor documentation to fix some warnings
* Add unit tests and remove outdated tests
* Fixed pre-commit errors
* Refactor error handling test to raise StringParseError for invalid bootstrap addresses
* Add libp2p.kad_dht to the list of subpackages in documentation
* Fix expiration and republish checks to use inclusive comparison
* Add __init__.py file to libp2p.kad_dht.pb package
* Refactor get value and put value to run in parallel with query timeout
* Refactor provider message handling to use parallel processing with timeout
* Add methods for provider store in KadDHT class
* Refactor KadDHT and ProviderStore methods to improve type hints and enhance parallel processing
* Add documentation for libp2p.kad_dht.pb module
* Update documentation for libp2p.kad_dht package to include subpackages and correct formatting
* Fix formatting in documentation for libp2p.kad_dht package by correcting the subpackage reference
* Fix header formatting in libp2p.kad_dht.pb documentation
* Change log level from info to debug for various logging statements
* fix CI/CD issues (post revamp)
* fixed value store unit test
* Refactored kademlia example
* Refactor Kademlia example: enhance logging, improve bootstrap node connection, and streamline server address handling
* removed bootstrap module
* Refactor Kademlia DHT example and core modules: enhance logging, remove unused code, and improve peer handling
* Added docs of kad dht example
* Update server address log file path to use the script's directory
* Refactor: Introduce DHTMode enum for clearer mode management
* moved xor_distance function to utils.py
* Enhance logging in ValueStore and KadDHT: include decoded value in debug logs and update parameter description for validity
* Add handling for closest peers in GET_VALUE response when value is not found
* Handled failure scenario for PUT_VALUE
* Remove kademlia demo from project scripts and contributing documentation
* spelling and logging

--------

Co-authored-by: pacrob <5199899+pacrob@users.noreply.github.com>
This commit is contained in:
30
libp2p/kad_dht/__init__.py
Normal file
@@ -0,0 +1,30 @@
"""
Kademlia DHT implementation for py-libp2p.

This module provides a Distributed Hash Table (DHT) implementation
based on the Kademlia protocol.
"""

from .kad_dht import (
    KadDHT,
)
from .peer_routing import (
    PeerRouting,
)
from .routing_table import (
    RoutingTable,
)
from .utils import (
    create_key_from_binary,
)
from .value_store import (
    ValueStore,
)

__all__ = [
    "KadDHT",
    "RoutingTable",
    "PeerRouting",
    "ValueStore",
    "create_key_from_binary",
]
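The package exposes the whole DHT through KadDHT plus a key helper. A minimal usage sketch under stated assumptions: the host is an already-running libp2p host (constructing and listening is out of scope here), DHTMode is imported from the kad_dht submodule, and create_key_from_binary is assumed to accept raw bytes.

from libp2p.kad_dht import KadDHT, create_key_from_binary
from libp2p.kad_dht.kad_dht import DHTMode


async def demo(host) -> None:
    # `host` is assumed to be a live libp2p host; KadDHT registers its own
    # stream handler for /ipfs/kad/1.0.0 in its constructor.
    dht = KadDHT(host, DHTMode.SERVER)

    # Keys are opaque bytes; create_key_from_binary derives one from raw data
    # (signature assumed from the export above).
    key = create_key_from_binary(b"hello")

    # Store and retrieve a value through the DHT.
    await dht.put_value(key, b"world")
    value = await dht.get_value(key)
    print(value)

    # Advertise and look up content providers for the same key.
    await dht.provide(key)
    providers = await dht.find_providers(key)
    print(len(providers))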
616
libp2p/kad_dht/kad_dht.py
Normal file
@@ -0,0 +1,616 @@
"""
Kademlia DHT implementation for py-libp2p.

This module provides a complete Distributed Hash Table (DHT)
implementation based on the Kademlia algorithm and protocol.
"""

from enum import Enum
import logging
import time

from multiaddr import (
    Multiaddr,
)
import trio
import varint

from libp2p.abc import (
    IHost,
)
from libp2p.custom_types import (
    TProtocol,
)
from libp2p.network.stream.net_stream import (
    INetStream,
)
from libp2p.peer.id import (
    ID,
)
from libp2p.peer.peerinfo import (
    PeerInfo,
)
from libp2p.tools.async_service import (
    Service,
)

from .pb.kademlia_pb2 import (
    Message,
)
from .peer_routing import (
    PeerRouting,
)
from .provider_store import (
    ProviderStore,
)
from .routing_table import (
    RoutingTable,
)
from .value_store import (
    ValueStore,
)

logger = logging.getLogger("kademlia-example.kad_dht")
# logger = logging.getLogger("libp2p.kademlia")

# Default parameters
PROTOCOL_ID = TProtocol("/ipfs/kad/1.0.0")
ROUTING_TABLE_REFRESH_INTERVAL = 1 * 60  # 1 min in seconds for testing
TTL = 24 * 60 * 60  # 24 hours in seconds
ALPHA = 3
QUERY_TIMEOUT = 10  # seconds


class DHTMode(Enum):
    """DHT operation modes."""

    CLIENT = "CLIENT"
    SERVER = "SERVER"


class KadDHT(Service):
    """
    Kademlia DHT implementation for libp2p.

    This class provides a DHT implementation that combines routing table management,
    peer discovery, content routing, and value storage.
    """

    def __init__(self, host: IHost, mode: DHTMode):
        """
        Initialize a new Kademlia DHT node.

        :param host: The libp2p host.
        :param mode: The mode of host (Client or Server) - must be DHTMode enum
        """
        super().__init__()

        self.host = host
        self.local_peer_id = host.get_id()

        # Validate that mode is a DHTMode enum
        if not isinstance(mode, DHTMode):
            raise TypeError(f"mode must be DHTMode enum, got {type(mode)}")

        self.mode = mode

        # Initialize the routing table
        self.routing_table = RoutingTable(self.local_peer_id, self.host)

        # Initialize peer routing
        self.peer_routing = PeerRouting(host, self.routing_table)

        # Initialize value store
        self.value_store = ValueStore(host=host, local_peer_id=self.local_peer_id)

        # Initialize provider store with host and peer_routing references
        self.provider_store = ProviderStore(host=host, peer_routing=self.peer_routing)

        # Last time we republished provider records
        self._last_provider_republish = time.time()

        # Set protocol handlers
        host.set_stream_handler(PROTOCOL_ID, self.handle_stream)

    async def run(self) -> None:
        """Run the DHT service."""
        logger.info(f"Starting Kademlia DHT with peer ID {self.local_peer_id}")

        # Main service loop
        while self.manager.is_running:
            # Periodically refresh the routing table
            await self.refresh_routing_table()

            # Check if it's time to republish provider records
            current_time = time.time()
            # await self._republish_provider_records()
            self._last_provider_republish = current_time

            # Clean up expired values and provider records
            expired_values = self.value_store.cleanup_expired()
            if expired_values > 0:
                logger.debug(f"Cleaned up {expired_values} expired values")

            self.provider_store.cleanup_expired()

            # Wait before next maintenance cycle
            await trio.sleep(ROUTING_TABLE_REFRESH_INTERVAL)

    async def switch_mode(self, new_mode: DHTMode) -> DHTMode:
        """
        Switch the DHT mode.

        :param new_mode: The new mode - must be DHTMode enum
        :return: The new mode as DHTMode enum
        """
        # Validate that new_mode is a DHTMode enum
        if not isinstance(new_mode, DHTMode):
            raise TypeError(f"new_mode must be DHTMode enum, got {type(new_mode)}")

        if new_mode == DHTMode.CLIENT:
            self.routing_table.cleanup_routing_table()
        self.mode = new_mode
        logger.info(f"Switched to {new_mode.value} mode")
        return self.mode

    async def handle_stream(self, stream: INetStream) -> None:
        """
        Handle an incoming DHT stream using varint length prefixes.
        """
        if self.mode == DHTMode.CLIENT:
            await stream.close()
            return
        peer_id = stream.muxed_conn.peer_id
        logger.debug(f"Received DHT stream from peer {peer_id}")
        await self.add_peer(peer_id)
        logger.debug(f"Added peer {peer_id} to routing table")

        try:
            # Read varint-prefixed length for the message
            length_prefix = b""
            while True:
                byte = await stream.read(1)
                if not byte:
                    logger.warning("Stream closed while reading varint length")
                    await stream.close()
                    return
                length_prefix += byte
                if byte[0] & 0x80 == 0:
                    break
            msg_length = varint.decode_bytes(length_prefix)

            # Read the message bytes
            msg_bytes = await stream.read(msg_length)
            if len(msg_bytes) < msg_length:
                logger.warning("Failed to read full message from stream")
                await stream.close()
                return

            try:
                # Parse as protobuf
                message = Message()
                message.ParseFromString(msg_bytes)
                logger.debug(
                    f"Received DHT message from {peer_id}, type: {message.type}"
                )

                # Handle FIND_NODE message
                if message.type == Message.MessageType.FIND_NODE:
                    # Get target key directly from protobuf
                    target_key = message.key

                    # Find closest peers to the target key
                    closest_peers = self.routing_table.find_local_closest_peers(
                        target_key, 20
                    )
                    logger.debug(f"Found {len(closest_peers)} peers close to target")

                    # Build response message with protobuf
                    response = Message()
                    response.type = Message.MessageType.FIND_NODE

                    # Add closest peers to response
                    for peer in closest_peers:
                        # Skip if the peer is the requester
                        if peer == peer_id:
                            continue

                        # Add peer to closerPeers field
                        peer_proto = response.closerPeers.add()
                        peer_proto.id = peer.to_bytes()
                        peer_proto.connection = Message.ConnectionType.CAN_CONNECT

                        # Add addresses if available
                        try:
                            addrs = self.host.get_peerstore().addrs(peer)
                            if addrs:
                                for addr in addrs:
                                    peer_proto.addrs.append(addr.to_bytes())
                        except Exception:
                            pass

                    # Serialize and send response
                    response_bytes = response.SerializeToString()
                    await stream.write(varint.encode(len(response_bytes)))
                    await stream.write(response_bytes)
                    logger.debug(
                        f"Sent FIND_NODE response with {len(response.closerPeers)} peers"
                    )

                # Handle ADD_PROVIDER message
                elif message.type == Message.MessageType.ADD_PROVIDER:
                    # Process ADD_PROVIDER
                    key = message.key
                    logger.debug(f"Received ADD_PROVIDER for key {key.hex()}")

                    # Extract provider information
                    for provider_proto in message.providerPeers:
                        try:
                            # Validate that the provider is the sender
                            provider_id = ID(provider_proto.id)
                            if provider_id != peer_id:
                                logger.warning(
                                    f"Provider ID {provider_id} doesn't "
                                    f"match sender {peer_id}, ignoring"
                                )
                                continue

                            # Convert addresses to Multiaddr
                            addrs = []
                            for addr_bytes in provider_proto.addrs:
                                try:
                                    addrs.append(Multiaddr(addr_bytes))
                                except Exception as e:
                                    logger.warning(f"Failed to parse address: {e}")

                            # Add to provider store
                            provider_info = PeerInfo(provider_id, addrs)
                            self.provider_store.add_provider(key, provider_info)
                            logger.debug(
                                f"Added provider {provider_id} for key {key.hex()}"
                            )
                        except Exception as e:
                            logger.warning(f"Failed to process provider info: {e}")

                    # Send acknowledgement
                    response = Message()
                    response.type = Message.MessageType.ADD_PROVIDER
                    response.key = key

                    response_bytes = response.SerializeToString()
                    await stream.write(varint.encode(len(response_bytes)))
                    await stream.write(response_bytes)
                    logger.debug("Sent ADD_PROVIDER acknowledgement")

                # Handle GET_PROVIDERS message
                elif message.type == Message.MessageType.GET_PROVIDERS:
                    # Process GET_PROVIDERS
                    key = message.key
                    logger.debug(f"Received GET_PROVIDERS request for key {key.hex()}")

                    # Find providers for the key
                    providers = self.provider_store.get_providers(key)
                    logger.debug(
                        f"Found {len(providers)} providers for key {key.hex()}"
                    )

                    # Create response
                    response = Message()
                    response.type = Message.MessageType.GET_PROVIDERS
                    response.key = key

                    # Add provider information to response
                    for provider_info in providers:
                        provider_proto = response.providerPeers.add()
                        provider_proto.id = provider_info.peer_id.to_bytes()
                        provider_proto.connection = Message.ConnectionType.CAN_CONNECT

                        # Add addresses if available
                        for addr in provider_info.addrs:
                            provider_proto.addrs.append(addr.to_bytes())

                    # Also include closest peers if we don't have providers
                    if not providers:
                        closest_peers = self.routing_table.find_local_closest_peers(
                            key, 20
                        )
                        logger.debug(
                            f"No providers found, including {len(closest_peers)}"
                            " closest peers"
                        )

                        for peer in closest_peers:
                            # Skip if peer is the requester
                            if peer == peer_id:
                                continue

                            peer_proto = response.closerPeers.add()
                            peer_proto.id = peer.to_bytes()
                            peer_proto.connection = Message.ConnectionType.CAN_CONNECT

                            # Add addresses if available
                            try:
                                addrs = self.host.get_peerstore().addrs(peer)
                                for addr in addrs:
                                    peer_proto.addrs.append(addr.to_bytes())
                            except Exception:
                                pass

                    # Serialize and send response
                    response_bytes = response.SerializeToString()
                    await stream.write(varint.encode(len(response_bytes)))
                    await stream.write(response_bytes)
                    logger.debug("Sent GET_PROVIDERS response")

                # Handle GET_VALUE message
                elif message.type == Message.MessageType.GET_VALUE:
                    # Process GET_VALUE
                    key = message.key
                    logger.debug(f"Received GET_VALUE request for key {key.hex()}")

                    value = self.value_store.get(key)
                    if value:
                        logger.debug(f"Found value for key {key.hex()}")

                        # Create response using protobuf
                        response = Message()
                        response.type = Message.MessageType.GET_VALUE

                        # Create record
                        response.key = key
                        response.record.key = key
                        response.record.value = value
                        response.record.timeReceived = str(time.time())

                        # Serialize and send response
                        response_bytes = response.SerializeToString()
                        await stream.write(varint.encode(len(response_bytes)))
                        await stream.write(response_bytes)
                        logger.debug("Sent GET_VALUE response")
                    else:
                        logger.debug(f"No value found for key {key.hex()}")

                        # Create response with closest peers when no value is found
                        response = Message()
                        response.type = Message.MessageType.GET_VALUE
                        response.key = key

                        # Add closest peers to key
                        closest_peers = self.routing_table.find_local_closest_peers(
                            key, 20
                        )
                        logger.debug(
                            "No value found, "
                            f"including {len(closest_peers)} closest peers"
                        )

                        for peer in closest_peers:
                            # Skip if peer is the requester
                            if peer == peer_id:
                                continue

                            peer_proto = response.closerPeers.add()
                            peer_proto.id = peer.to_bytes()
                            peer_proto.connection = Message.ConnectionType.CAN_CONNECT

                            # Add addresses if available
                            try:
                                addrs = self.host.get_peerstore().addrs(peer)
                                for addr in addrs:
                                    peer_proto.addrs.append(addr.to_bytes())
                            except Exception:
                                pass

                        # Serialize and send response
                        response_bytes = response.SerializeToString()
                        await stream.write(varint.encode(len(response_bytes)))
                        await stream.write(response_bytes)
                        logger.debug("Sent GET_VALUE response with closest peers")

                # Handle PUT_VALUE message
                elif message.type == Message.MessageType.PUT_VALUE and message.HasField(
                    "record"
                ):
                    # Process PUT_VALUE
                    key = message.record.key
                    value = message.record.value
                    success = False
                    try:
                        if not (key and value):
                            raise ValueError(
                                "Missing key or value in PUT_VALUE message"
                            )

                        self.value_store.put(key, value)
                        logger.debug(f"Stored value {value.hex()} for key {key.hex()}")
                        success = True
                    except Exception as e:
                        logger.warning(
                            f"Failed to store value {value.hex()} for key "
                            f"{key.hex()}: {e}"
                        )
                    finally:
                        # Send acknowledgement
                        response = Message()
                        response.type = Message.MessageType.PUT_VALUE
                        if success:
                            response.key = key
                        response_bytes = response.SerializeToString()
                        await stream.write(varint.encode(len(response_bytes)))
                        await stream.write(response_bytes)
                        logger.debug("Sent PUT_VALUE acknowledgement")

            except Exception as proto_err:
                logger.warning(f"Failed to parse protobuf message: {proto_err}")

            await stream.close()
        except Exception as e:
            logger.error(f"Error handling DHT stream: {e}")
            await stream.close()

    async def refresh_routing_table(self) -> None:
        """Refresh the routing table."""
        logger.debug("Refreshing routing table")
        await self.peer_routing.refresh_routing_table()

    # Peer routing methods

    async def find_peer(self, peer_id: ID) -> PeerInfo | None:
        """
        Find a peer with the given ID.
        """
        logger.debug(f"Finding peer: {peer_id}")
        return await self.peer_routing.find_peer(peer_id)

    # Value storage and retrieval methods

    async def put_value(self, key: bytes, value: bytes) -> None:
        """
        Store a value in the DHT.
        """
        logger.debug(f"Storing value for key {key.hex()}")

        # 1. Store locally first
        self.value_store.put(key, value)
        try:
            decoded_value = value.decode("utf-8")
        except UnicodeDecodeError:
            decoded_value = value.hex()
        logger.debug(
            f"Stored value locally for key {key.hex()} with value {decoded_value}"
        )

        # 2. Get closest peers, excluding self
        closest_peers = [
            peer
            for peer in self.routing_table.find_local_closest_peers(key)
            if peer != self.local_peer_id
        ]
        logger.debug(f"Found {len(closest_peers)} peers to store value at")

        # 3. Store at remote peers in batches of ALPHA, in parallel
        stored_count = 0
        for i in range(0, len(closest_peers), ALPHA):
            batch = closest_peers[i : i + ALPHA]
            batch_results = [False] * len(batch)

            async def store_one(idx: int, peer: ID) -> None:
                try:
                    with trio.move_on_after(QUERY_TIMEOUT):
                        success = await self.value_store._store_at_peer(
                            peer, key, value
                        )
                        batch_results[idx] = success
                        if success:
                            logger.debug(f"Stored value at peer {peer}")
                        else:
                            logger.debug(f"Failed to store value at peer {peer}")
                except Exception as e:
                    logger.debug(f"Error storing value at peer {peer}: {e}")

            async with trio.open_nursery() as nursery:
                for idx, peer in enumerate(batch):
                    nursery.start_soon(store_one, idx, peer)

            stored_count += sum(batch_results)

        logger.info(f"Successfully stored value at {stored_count} peers")

    async def get_value(self, key: bytes) -> bytes | None:
        """
        Retrieve a value from the DHT, checking the local store first.
        """
        logger.debug(f"Getting value for key: {key.hex()}")

        # 1. Check local store first
        value = self.value_store.get(key)
        if value:
            logger.debug("Found value locally")
            return value

        # 2. Get closest peers, excluding self
        closest_peers = [
            peer
            for peer in self.routing_table.find_local_closest_peers(key)
            if peer != self.local_peer_id
        ]
        logger.debug(f"Searching {len(closest_peers)} peers for value")

        # 3. Query ALPHA peers at a time in parallel
        for i in range(0, len(closest_peers), ALPHA):
            batch = closest_peers[i : i + ALPHA]
            found_value = None

            async def query_one(peer: ID) -> None:
                nonlocal found_value
                try:
                    with trio.move_on_after(QUERY_TIMEOUT):
                        value = await self.value_store._get_from_peer(peer, key)
                        if value is not None and found_value is None:
                            found_value = value
                            logger.debug(f"Found value at peer {peer}")
                except Exception as e:
                    logger.debug(f"Error querying peer {peer}: {e}")

            async with trio.open_nursery() as nursery:
                for peer in batch:
                    nursery.start_soon(query_one, peer)

            if found_value is not None:
                self.value_store.put(key, found_value)
                logger.info("Successfully retrieved value from network")
                return found_value

        # 4. Not found
        logger.warning(f"Value not found for key {key.hex()}")
        return None

    # Utility methods

    async def add_peer(self, peer_id: ID) -> bool:
        """
        Add a peer to the routing table.

        :param peer_id: The peer ID to add.

        Returns
        -------
        bool
            True if peer was added or updated, False otherwise.

        """
        return await self.routing_table.add_peer(peer_id)

    async def provide(self, key: bytes) -> bool:
        """
        Reference to provider_store.provide for convenience.
        """
        return await self.provider_store.provide(key)

    async def find_providers(self, key: bytes, count: int = 20) -> list[PeerInfo]:
        """
        Reference to provider_store.find_providers for convenience.
        """
        return await self.provider_store.find_providers(key, count)

    def get_routing_table_size(self) -> int:
        """
        Get the number of peers in the routing table.

        Returns
        -------
        int
            Number of peers.

        """
        return self.routing_table.size()

    def get_value_store_size(self) -> int:
        """
        Get the number of items in the value store.

        Returns
        -------
        int
            Number of items.

        """
        return self.value_store.size()
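Every request and response above is framed the same way: an unsigned-varint length prefix followed by the serialized protobuf. A minimal standalone sketch of that framing, using the same varint package; the helper names are illustrative and not part of the module.

import varint


def encode_frame(payload: bytes) -> bytes:
    # Prefix the protobuf payload with its length as an unsigned varint,
    # matching what handle_stream writes on the wire.
    return varint.encode(len(payload)) + payload


async def read_frame(stream) -> bytes | None:
    # Read the varint length one byte at a time: the high bit of each byte
    # means "more bytes follow", so stop at the first byte with bit 7 clear.
    prefix = b""
    while True:
        byte = await stream.read(1)
        if not byte:
            return None  # stream closed before a full length prefix arrived
        prefix += byte
        if byte[0] & 0x80 == 0:
            break
    length = varint.decode_bytes(prefix)

    # Read exactly `length` payload bytes (stream.read may return short reads).
    data = b""
    while len(data) < length:
        chunk = await stream.read(length - len(data))
        if not chunk:
            return None
        data += chunk
    return data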
0
libp2p/kad_dht/pb/__init__.py
Normal file
38
libp2p/kad_dht/pb/kademlia.proto
Normal file
@@ -0,0 +1,38 @@
syntax = "proto3";

message Record {
  bytes key = 1;
  bytes value = 2;
  string timeReceived = 5;
};

message Message {
  enum MessageType {
    PUT_VALUE = 0;
    GET_VALUE = 1;
    ADD_PROVIDER = 2;
    GET_PROVIDERS = 3;
    FIND_NODE = 4;
    PING = 5;
  }

  enum ConnectionType {
    NOT_CONNECTED = 0;
    CONNECTED = 1;
    CAN_CONNECT = 2;
    CANNOT_CONNECT = 3;
  }

  message Peer {
    bytes id = 1;
    repeated bytes addrs = 2;
    ConnectionType connection = 3;
  }

  MessageType type = 1;
  int32 clusterLevelRaw = 10;
  bytes key = 2;
  Record record = 3;
  repeated Peer closerPeers = 8;
  repeated Peer providerPeers = 9;
}
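The generated kademlia_pb2 module exposes this schema directly. A rough sketch of building a FIND_NODE request and a PUT_VALUE record the way the handlers above do; the key bytes are placeholders.

from libp2p.kad_dht.pb.kademlia_pb2 import Message

# Build a FIND_NODE request for an arbitrary 32-byte target key.
request = Message()
request.type = Message.MessageType.FIND_NODE
request.key = b"\x01" * 32
wire_bytes = request.SerializeToString()

# Parse a message back and walk the closerPeers field as handle_stream does.
reply = Message()
reply.ParseFromString(wire_bytes)
for peer in reply.closerPeers:
    print(peer.id, list(peer.addrs), peer.connection)

# PUT_VALUE carries its payload in the nested Record; HasField("record")
# distinguishes a populated record from the default empty one.
put = Message()
put.type = Message.MessageType.PUT_VALUE
put.record.key = b"k"
put.record.value = b"v"
assert put.HasField("record")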
33
libp2p/kad_dht/pb/kademlia_pb2.py
Normal file
@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler.  DO NOT EDIT!
# source: libp2p/kad_dht/pb/kademlia.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()


DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n libp2p/kad_dht/pb/kademlia.proto\":\n\x06Record\x12\x0b\n\x03key\x18\x01 \x01(\x0c\x12\r\n\x05value\x18\x02 \x01(\x0c\x12\x14\n\x0ctimeReceived\x18\x05 \x01(\t\"\xca\x03\n\x07Message\x12\"\n\x04type\x18\x01 \x01(\x0e\x32\x14.Message.MessageType\x12\x17\n\x0f\x63lusterLevelRaw\x18\n \x01(\x05\x12\x0b\n\x03key\x18\x02 \x01(\x0c\x12\x17\n\x06record\x18\x03 \x01(\x0b\x32\x07.Record\x12\"\n\x0b\x63loserPeers\x18\x08 \x03(\x0b\x32\r.Message.Peer\x12$\n\rproviderPeers\x18\t \x03(\x0b\x32\r.Message.Peer\x1aN\n\x04Peer\x12\n\n\x02id\x18\x01 \x01(\x0c\x12\r\n\x05\x61\x64\x64rs\x18\x02 \x03(\x0c\x12+\n\nconnection\x18\x03 \x01(\x0e\x32\x17.Message.ConnectionType\"i\n\x0bMessageType\x12\r\n\tPUT_VALUE\x10\x00\x12\r\n\tGET_VALUE\x10\x01\x12\x10\n\x0c\x41\x44\x44_PROVIDER\x10\x02\x12\x11\n\rGET_PROVIDERS\x10\x03\x12\r\n\tFIND_NODE\x10\x04\x12\x08\n\x04PING\x10\x05\"W\n\x0e\x43onnectionType\x12\x11\n\rNOT_CONNECTED\x10\x00\x12\r\n\tCONNECTED\x10\x01\x12\x0f\n\x0b\x43\x41N_CONNECT\x10\x02\x12\x12\n\x0e\x43\x41NNOT_CONNECT\x10\x03\x62\x06proto3')

_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.kad_dht.pb.kademlia_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
  DESCRIPTOR._options = None
  _globals['_RECORD']._serialized_start=36
  _globals['_RECORD']._serialized_end=94
  _globals['_MESSAGE']._serialized_start=97
  _globals['_MESSAGE']._serialized_end=555
  _globals['_MESSAGE_PEER']._serialized_start=281
  _globals['_MESSAGE_PEER']._serialized_end=359
  _globals['_MESSAGE_MESSAGETYPE']._serialized_start=361
  _globals['_MESSAGE_MESSAGETYPE']._serialized_end=466
  _globals['_MESSAGE_CONNECTIONTYPE']._serialized_start=468
  _globals['_MESSAGE_CONNECTIONTYPE']._serialized_end=555
# @@protoc_insertion_point(module_scope)
133
libp2p/kad_dht/pb/kademlia_pb2.pyi
Normal file
@@ -0,0 +1,133 @@
"""
@generated by mypy-protobuf.  Do not edit manually!
isort:skip_file
"""

import builtins
import collections.abc
import google.protobuf.descriptor
import google.protobuf.internal.containers
import google.protobuf.internal.enum_type_wrapper
import google.protobuf.message
import sys
import typing

if sys.version_info >= (3, 10):
    import typing as typing_extensions
else:
    import typing_extensions

DESCRIPTOR: google.protobuf.descriptor.FileDescriptor

@typing.final
class Record(google.protobuf.message.Message):
    DESCRIPTOR: google.protobuf.descriptor.Descriptor

    KEY_FIELD_NUMBER: builtins.int
    VALUE_FIELD_NUMBER: builtins.int
    TIMERECEIVED_FIELD_NUMBER: builtins.int
    key: builtins.bytes
    value: builtins.bytes
    timeReceived: builtins.str
    def __init__(
        self,
        *,
        key: builtins.bytes = ...,
        value: builtins.bytes = ...,
        timeReceived: builtins.str = ...,
    ) -> None: ...
    def ClearField(self, field_name: typing.Literal["key", b"key", "timeReceived", b"timeReceived", "value", b"value"]) -> None: ...

global___Record = Record

@typing.final
class Message(google.protobuf.message.Message):
    DESCRIPTOR: google.protobuf.descriptor.Descriptor

    class _MessageType:
        ValueType = typing.NewType("ValueType", builtins.int)
        V: typing_extensions.TypeAlias = ValueType

    class _MessageTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Message._MessageType.ValueType], builtins.type):
        DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
        PUT_VALUE: Message._MessageType.ValueType  # 0
        GET_VALUE: Message._MessageType.ValueType  # 1
        ADD_PROVIDER: Message._MessageType.ValueType  # 2
        GET_PROVIDERS: Message._MessageType.ValueType  # 3
        FIND_NODE: Message._MessageType.ValueType  # 4
        PING: Message._MessageType.ValueType  # 5

    class MessageType(_MessageType, metaclass=_MessageTypeEnumTypeWrapper): ...
    PUT_VALUE: Message.MessageType.ValueType  # 0
    GET_VALUE: Message.MessageType.ValueType  # 1
    ADD_PROVIDER: Message.MessageType.ValueType  # 2
    GET_PROVIDERS: Message.MessageType.ValueType  # 3
    FIND_NODE: Message.MessageType.ValueType  # 4
    PING: Message.MessageType.ValueType  # 5

    class _ConnectionType:
        ValueType = typing.NewType("ValueType", builtins.int)
        V: typing_extensions.TypeAlias = ValueType

    class _ConnectionTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Message._ConnectionType.ValueType], builtins.type):
        DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
        NOT_CONNECTED: Message._ConnectionType.ValueType  # 0
        CONNECTED: Message._ConnectionType.ValueType  # 1
        CAN_CONNECT: Message._ConnectionType.ValueType  # 2
        CANNOT_CONNECT: Message._ConnectionType.ValueType  # 3

    class ConnectionType(_ConnectionType, metaclass=_ConnectionTypeEnumTypeWrapper): ...
    NOT_CONNECTED: Message.ConnectionType.ValueType  # 0
    CONNECTED: Message.ConnectionType.ValueType  # 1
    CAN_CONNECT: Message.ConnectionType.ValueType  # 2
    CANNOT_CONNECT: Message.ConnectionType.ValueType  # 3

    @typing.final
    class Peer(google.protobuf.message.Message):
        DESCRIPTOR: google.protobuf.descriptor.Descriptor

        ID_FIELD_NUMBER: builtins.int
        ADDRS_FIELD_NUMBER: builtins.int
        CONNECTION_FIELD_NUMBER: builtins.int
        id: builtins.bytes
        connection: global___Message.ConnectionType.ValueType
        @property
        def addrs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.bytes]: ...
        def __init__(
            self,
            *,
            id: builtins.bytes = ...,
            addrs: collections.abc.Iterable[builtins.bytes] | None = ...,
            connection: global___Message.ConnectionType.ValueType = ...,
        ) -> None: ...
        def ClearField(self, field_name: typing.Literal["addrs", b"addrs", "connection", b"connection", "id", b"id"]) -> None: ...

    TYPE_FIELD_NUMBER: builtins.int
    CLUSTERLEVELRAW_FIELD_NUMBER: builtins.int
    KEY_FIELD_NUMBER: builtins.int
    RECORD_FIELD_NUMBER: builtins.int
    CLOSERPEERS_FIELD_NUMBER: builtins.int
    PROVIDERPEERS_FIELD_NUMBER: builtins.int
    type: global___Message.MessageType.ValueType
    clusterLevelRaw: builtins.int
    key: builtins.bytes
    @property
    def record(self) -> global___Record: ...
    @property
    def closerPeers(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___Message.Peer]: ...
    @property
    def providerPeers(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___Message.Peer]: ...
    def __init__(
        self,
        *,
        type: global___Message.MessageType.ValueType = ...,
        clusterLevelRaw: builtins.int = ...,
        key: builtins.bytes = ...,
        record: global___Record | None = ...,
        closerPeers: collections.abc.Iterable[global___Message.Peer] | None = ...,
        providerPeers: collections.abc.Iterable[global___Message.Peer] | None = ...,
    ) -> None: ...
    def HasField(self, field_name: typing.Literal["record", b"record"]) -> builtins.bool: ...
    def ClearField(self, field_name: typing.Literal["closerPeers", b"closerPeers", "clusterLevelRaw", b"clusterLevelRaw", "key", b"key", "providerPeers", b"providerPeers", "record", b"record", "type", b"type"]) -> None: ...

global___Message = Message
418
libp2p/kad_dht/peer_routing.py
Normal file
@@ -0,0 +1,418 @@
"""
Peer routing implementation for Kademlia DHT.

This module implements the peer routing interface using Kademlia's algorithm
to efficiently locate peers in a distributed network.
"""

import logging

import trio
import varint

from libp2p.abc import (
    IHost,
    INetStream,
    IPeerRouting,
)
from libp2p.custom_types import (
    TProtocol,
)
from libp2p.peer.id import (
    ID,
)
from libp2p.peer.peerinfo import (
    PeerInfo,
)

from .pb.kademlia_pb2 import (
    Message,
)
from .routing_table import (
    RoutingTable,
)
from .utils import (
    sort_peer_ids_by_distance,
)

# logger = logging.getLogger("libp2p.kademlia.peer_routing")
logger = logging.getLogger("kademlia-example.peer_routing")

# Constants for the Kademlia algorithm
ALPHA = 3  # Concurrency parameter
MAX_PEER_LOOKUP_ROUNDS = 20  # Maximum number of rounds in peer lookup
PROTOCOL_ID = TProtocol("/ipfs/kad/1.0.0")


class PeerRouting(IPeerRouting):
    """
    Implementation of peer routing using the Kademlia algorithm.

    This class provides methods to find peers in the DHT network
    and helps maintain the routing table.
    """

    def __init__(self, host: IHost, routing_table: RoutingTable):
        """
        Initialize the peer routing service.

        :param host: The libp2p host
        :param routing_table: The Kademlia routing table

        """
        self.host = host
        self.routing_table = routing_table
        self.protocol_id = PROTOCOL_ID

    async def find_peer(self, peer_id: ID) -> PeerInfo | None:
        """
        Find a peer with the given ID.

        :param peer_id: The ID of the peer to find

        Returns
        -------
        Optional[PeerInfo]
            The peer information if found, None otherwise

        """
        # Check if this is actually our peer ID
        if peer_id == self.host.get_id():
            try:
                # Return our own peer info
                return PeerInfo(peer_id, self.host.get_addrs())
            except Exception:
                logger.exception("Error getting our own peer info")
                return None

        # First check if the peer is in our routing table
        peer_info = self.routing_table.get_peer_info(peer_id)
        if peer_info:
            logger.debug(f"Found peer {peer_id} in routing table")
            return peer_info

        # Then check if the peer is in our peerstore
        try:
            addrs = self.host.get_peerstore().addrs(peer_id)
            if addrs:
                logger.debug(f"Found peer {peer_id} in peerstore")
                return PeerInfo(peer_id, addrs)
        except Exception:
            pass

        # If not found locally, search the network
        try:
            closest_peers = await self.find_closest_peers_network(peer_id.to_bytes())
            logger.info(f"Closest peers found: {closest_peers}")

            # Check if we found the peer we're looking for
            for found_peer in closest_peers:
                if found_peer == peer_id:
                    try:
                        addrs = self.host.get_peerstore().addrs(found_peer)
                        if addrs:
                            return PeerInfo(found_peer, addrs)
                    except Exception:
                        pass

        except Exception as e:
            logger.error(f"Error searching for peer {peer_id}: {e}")

        # Not found
        logger.info(f"Peer {peer_id} not found")
        return None

    async def _query_single_peer_for_closest(
        self, peer: ID, target_key: bytes, new_peers: list[ID]
    ) -> None:
        """
        Query a single peer for closest peers and append results to the shared list.

        params: peer : ID
            The peer to query
        params: target_key : bytes
            The target key to find closest peers for
        params: new_peers : list[ID]
            Shared list to append results to

        """
        try:
            result = await self._query_peer_for_closest(peer, target_key)
            # Add deduplication to prevent duplicate peers
            for peer_id in result:
                if peer_id not in new_peers:
                    new_peers.append(peer_id)
            logger.debug(
                "Queried peer %s for closest peers, got %d results (%d unique)",
                peer,
                len(result),
                len([p for p in result if p not in new_peers[: -len(result)]]),
            )
        except Exception as e:
            logger.debug(f"Query to peer {peer} failed: {e}")

    async def find_closest_peers_network(
        self, target_key: bytes, count: int = 20
    ) -> list[ID]:
        """
        Find the closest peers to a target key in the entire network.

        Performs an iterative lookup by querying peers for their closest peers.

        Returns
        -------
        list[ID]
            Closest peer IDs

        """
        # Start with closest peers from our routing table
        closest_peers = self.routing_table.find_local_closest_peers(target_key, count)
        logger.debug("Local closest peers: %d found", len(closest_peers))
        queried_peers: set[ID] = set()
        rounds = 0

        # Return early if we have no peers to start with
        if not closest_peers:
            logger.warning("No local peers available for network lookup")
            return []

        # Iterative lookup until convergence
        while rounds < MAX_PEER_LOOKUP_ROUNDS:
            rounds += 1
            logger.debug(f"Lookup round {rounds}/{MAX_PEER_LOOKUP_ROUNDS}")

            # Find peers we haven't queried yet
            peers_to_query = [p for p in closest_peers if p not in queried_peers]
            if not peers_to_query:
                logger.debug("No more unqueried peers available, ending lookup")
                break  # No more peers to query

            # Query these peers for their closest peers to target
            peers_batch = peers_to_query[:ALPHA]  # Limit to ALPHA peers at a time

            # Mark these peers as queried before we actually query them
            for peer in peers_batch:
                queried_peers.add(peer)

            # Run queries in parallel for this batch using trio nursery
            new_peers: list[ID] = []  # Shared array to collect all results

            async with trio.open_nursery() as nursery:
                for peer in peers_batch:
                    nursery.start_soon(
                        self._query_single_peer_for_closest, peer, target_key, new_peers
                    )

            # If we got no new peers, we're done
            if not new_peers:
                logger.debug("No new peers discovered in this round, ending lookup")
                break

            # Update our list of closest peers
            all_candidates = closest_peers + new_peers
            old_closest_peers = closest_peers[:]
            closest_peers = sort_peer_ids_by_distance(target_key, all_candidates)[
                :count
            ]
            logger.debug(f"Updated closest peers count: {len(closest_peers)}")

            # Check if we made any progress (found closer peers)
            if closest_peers == old_closest_peers:
                logger.debug("No improvement in closest peers, ending lookup")
                break

        logger.info(
            f"Network lookup completed after {rounds} rounds, "
            f"found {len(closest_peers)} peers"
        )
        return closest_peers

    async def _query_peer_for_closest(self, peer: ID, target_key: bytes) -> list[ID]:
        """
        Query a peer for their closest peers
        to the target key using varint length prefix
        """
        stream = None
        results = []
        try:
            # Add the peer to our routing table regardless of query outcome
            try:
                addrs = self.host.get_peerstore().addrs(peer)
                if addrs:
                    peer_info = PeerInfo(peer, addrs)
                    await self.routing_table.add_peer(peer_info)
            except Exception as e:
                logger.debug(f"Failed to add peer {peer} to routing table: {e}")

            # Open a stream to the peer using the Kademlia protocol
            logger.debug(f"Opening stream to {peer} for closest peers query")
            try:
                stream = await self.host.new_stream(peer, [self.protocol_id])
                logger.debug(f"Stream opened to {peer}")
            except Exception as e:
                logger.warning(f"Failed to open stream to {peer}: {e}")
                return []

            # Create and send FIND_NODE request using protobuf
            find_node_msg = Message()
            find_node_msg.type = Message.MessageType.FIND_NODE
            find_node_msg.key = target_key  # Set target key directly as bytes

            # Serialize and send the protobuf message with varint length prefix
            proto_bytes = find_node_msg.SerializeToString()
            logger.debug(
                f"Sending FIND_NODE: {proto_bytes.hex()} (len={len(proto_bytes)})"
            )
            await stream.write(varint.encode(len(proto_bytes)))
            await stream.write(proto_bytes)

            # Read varint-prefixed response length
            length_bytes = b""
            while True:
                b = await stream.read(1)
                if not b:
                    logger.warning(
                        "Error reading varint length from stream: connection closed"
                    )
                    return []
                length_bytes += b
                if b[0] & 0x80 == 0:
                    break
            response_length = varint.decode_bytes(length_bytes)

            # Read response data
            response_bytes = b""
            remaining = response_length
            while remaining > 0:
                chunk = await stream.read(remaining)
                if not chunk:
                    logger.debug(f"Connection closed by peer {peer} while reading data")
                    return []
                response_bytes += chunk
                remaining -= len(chunk)

            # Parse the protobuf response
            response_msg = Message()
            response_msg.ParseFromString(response_bytes)
            logger.debug(
                "Received response from %s with %d peers",
                peer,
                len(response_msg.closerPeers),
            )

            # Process closest peers from response
            if response_msg.type == Message.MessageType.FIND_NODE:
                for peer_data in response_msg.closerPeers:
                    new_peer_id = ID(peer_data.id)
                    if new_peer_id not in results:
                        results.append(new_peer_id)
                    if peer_data.addrs:
                        from multiaddr import (
                            Multiaddr,
                        )

                        addrs = [Multiaddr(addr) for addr in peer_data.addrs]
                        self.host.get_peerstore().add_addrs(new_peer_id, addrs, 3600)

        except Exception as e:
            logger.debug(f"Error querying peer {peer} for closest: {e}")

        finally:
            if stream:
                await stream.close()
        return results

    async def _handle_kad_stream(self, stream: INetStream) -> None:
        """
        Handle incoming Kademlia protocol streams.

        params: stream: The incoming stream

        Returns
        -------
        None

        """
        try:
            # Read message length
            length_bytes = await stream.read(4)
            if not length_bytes:
                return

            message_length = int.from_bytes(length_bytes, byteorder="big")

            # Read message
            message_bytes = await stream.read(message_length)
            if not message_bytes:
                return

            # Parse protobuf message
            kad_message = Message()
            try:
                kad_message.ParseFromString(message_bytes)

                if kad_message.type == Message.MessageType.FIND_NODE:
                    # Get target key directly from protobuf message
                    target_key = kad_message.key

                    # Find closest peers to target
                    closest_peers = self.routing_table.find_local_closest_peers(
                        target_key, 20
                    )

                    # Create protobuf response
                    response = Message()
                    response.type = Message.MessageType.FIND_NODE

                    # Add peer information to response
                    for peer_id in closest_peers:
                        peer_proto = response.closerPeers.add()
                        peer_proto.id = peer_id.to_bytes()
                        peer_proto.connection = Message.ConnectionType.CAN_CONNECT

                        # Add addresses if available
                        try:
                            addrs = self.host.get_peerstore().addrs(peer_id)
                            if addrs:
                                for addr in addrs:
                                    peer_proto.addrs.append(addr.to_bytes())
                        except Exception:
                            pass

                    # Send response
                    response_bytes = response.SerializeToString()
                    await stream.write(len(response_bytes).to_bytes(4, byteorder="big"))
                    await stream.write(response_bytes)

            except Exception as parse_err:
                logger.error(f"Failed to parse protocol buffer message: {parse_err}")

        except Exception as e:
            logger.debug(f"Error handling Kademlia stream: {e}")
        finally:
            await stream.close()

    async def refresh_routing_table(self) -> None:
        """
        Refresh the routing table by performing lookups for random keys.

        Returns
        -------
        None

        """
        logger.info("Refreshing routing table")

        # Perform a lookup for ourselves to populate the routing table
        local_id = self.host.get_id()
        closest_peers = await self.find_closest_peers_network(local_id.to_bytes())

        # Add discovered peers to routing table
        for peer_id in closest_peers:
            try:
                addrs = self.host.get_peerstore().addrs(peer_id)
                if addrs:
                    peer_info = PeerInfo(peer_id, addrs)
                    await self.routing_table.add_peer(peer_info)
            except Exception as e:
                logger.debug(f"Failed to add discovered peer {peer_id}: {e}")
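sort_peer_ids_by_distance is imported from .utils, which is not part of this diff. In Kademlia it conventionally orders candidates by XOR distance to the target key; a rough sketch of that ordering under the assumption that keys are compared via SHA-256 digests (the real utils.py may hash or compare differently):

import hashlib


def xor_distance(key_a: bytes, key_b: bytes) -> int:
    # Kademlia orders peers by the XOR of (digests of) their keys,
    # interpreted as an unsigned integer: smaller means closer.
    digest_a = hashlib.sha256(key_a).digest()
    digest_b = hashlib.sha256(key_b).digest()
    return int.from_bytes(bytes(a ^ b for a, b in zip(digest_a, digest_b)), "big")


def sort_by_distance(target_key: bytes, candidates: list[bytes]) -> list[bytes]:
    # Equivalent in spirit to sort_peer_ids_by_distance(target_key, candidates):
    # sort candidates by XOR distance to the target, closest first.
    return sorted(candidates, key=lambda pid: xor_distance(target_key, pid))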
575
libp2p/kad_dht/provider_store.py
Normal file
@@ -0,0 +1,575 @@
"""
Provider record storage for Kademlia DHT.

This module implements the storage for content provider records in the Kademlia DHT.
"""

import logging
import time
from typing import (
    Any,
)

from multiaddr import (
    Multiaddr,
)
import trio
import varint

from libp2p.abc import (
    IHost,
)
from libp2p.custom_types import (
    TProtocol,
)
from libp2p.peer.id import (
    ID,
)
from libp2p.peer.peerinfo import (
    PeerInfo,
)

from .pb.kademlia_pb2 import (
    Message,
)

# logger = logging.getLogger("libp2p.kademlia.provider_store")
logger = logging.getLogger("kademlia-example.provider_store")

# Constants for provider records (based on IPFS standards)
PROVIDER_RECORD_REPUBLISH_INTERVAL = 22 * 60 * 60  # 22 hours in seconds
PROVIDER_RECORD_EXPIRATION_INTERVAL = 48 * 60 * 60  # 48 hours in seconds
PROVIDER_ADDRESS_TTL = 30 * 60  # 30 minutes in seconds
PROTOCOL_ID = TProtocol("/ipfs/kad/1.0.0")
ALPHA = 3  # Number of parallel queries/advertisements
QUERY_TIMEOUT = 10  # Timeout for each query in seconds


class ProviderRecord:
    """
    A record for a content provider in the DHT.

    Contains the peer information and timestamp.
    """

    def __init__(
        self,
        provider_info: PeerInfo,
        timestamp: float | None = None,
    ) -> None:
        """
        Initialize a new provider record.

        :param provider_info: The provider's peer information
        :param timestamp: Time this record was created/updated
            (defaults to current time)

        """
        self.provider_info = provider_info
        self.timestamp = timestamp or time.time()

    def is_expired(self) -> bool:
        """
        Check if this provider record has expired.

        Returns
        -------
        bool
            True if the record has expired

        """
        current_time = time.time()
        return (current_time - self.timestamp) >= PROVIDER_RECORD_EXPIRATION_INTERVAL

    def should_republish(self) -> bool:
        """
        Check if this provider record should be republished.

        Returns
        -------
        bool
            True if the record should be republished

        """
        current_time = time.time()
        return (current_time - self.timestamp) >= PROVIDER_RECORD_REPUBLISH_INTERVAL

    @property
    def peer_id(self) -> ID:
        """Get the provider's peer ID."""
        return self.provider_info.peer_id

    @property
    def addresses(self) -> list[Multiaddr]:
        """Get the provider's addresses."""
        return self.provider_info.addrs


class ProviderStore:
    """
    Store for content provider records in the Kademlia DHT.

    Maps content keys to provider records, with support for expiration.
    """

    def __init__(self, host: IHost, peer_routing: Any = None) -> None:
        """
        Initialize a new provider store.

        :param host: The libp2p host instance
        :param peer_routing: The peer routing instance (optional)
        """
        # Maps content keys to a dict of provider records (peer_id -> record)
        self.providers: dict[bytes, dict[str, ProviderRecord]] = {}
        self.host = host
        self.peer_routing = peer_routing
        self.providing_keys: set[bytes] = set()
        self.local_peer_id = host.get_id()

    async def _republish_provider_records(self) -> None:
        """Republish all provider records for content this node is providing."""
        # First, republish keys we're actively providing
        for key in self.providing_keys:
            logger.debug(f"Republishing provider record for key {key.hex()}")
            await self.provide(key)

        # Also check for any records that should be republished
        for key, providers in self.providers.items():
            for peer_id_str, record in providers.items():
                # Only republish records for our own peer
                if self.local_peer_id and str(self.local_peer_id) == peer_id_str:
                    if record.should_republish():
                        logger.debug(
                            f"Republishing old provider record for key {key.hex()}"
                        )
                        await self.provide(key)

    async def provide(self, key: bytes) -> bool:
        """
        Advertise that this node can provide a piece of content.

        Finds the k closest peers to the key and sends them ADD_PROVIDER messages.

        :param key: The content key (multihash) to advertise

        Returns
        -------
        bool
            True if the advertisement was successful

        """
        if not self.host or not self.peer_routing:
            logger.error("Host or peer_routing not initialized, cannot provide content")
            return False

        # Add to local provider store
        local_addrs = []
        for addr in self.host.get_addrs():
            local_addrs.append(addr)

        local_peer_info = PeerInfo(self.host.get_id(), local_addrs)
        self.add_provider(key, local_peer_info)

        # Track that we're providing this key
        self.providing_keys.add(key)

        # Find the k closest peers to the key
        closest_peers = await self.peer_routing.find_closest_peers_network(key)
        logger.debug(
            "Found %d peers close to key %s for provider advertisement",
            len(closest_peers),
            key.hex(),
        )

        # Send ADD_PROVIDER messages in batches of ALPHA peers, in parallel.
        success_count = 0
        for i in range(0, len(closest_peers), ALPHA):
            batch = closest_peers[i : i + ALPHA]
            results: list[bool] = [False] * len(batch)

            async def send_one(
                idx: int, peer_id: ID, results: list[bool] = results
            ) -> None:
                if peer_id == self.local_peer_id:
                    return
                try:
                    with trio.move_on_after(QUERY_TIMEOUT):
                        success = await self._send_add_provider(peer_id, key)
                        results[idx] = success
                        if not success:
                            logger.warning(f"Failed to send ADD_PROVIDER to {peer_id}")
                except Exception as e:
                    logger.warning(f"Error sending ADD_PROVIDER to {peer_id}: {e}")

            async with trio.open_nursery() as nursery:
                for idx, peer_id in enumerate(batch):
                    nursery.start_soon(send_one, idx, peer_id, results)
            success_count += sum(results)

        logger.info(f"Successfully advertised to {success_count} peers")
        return success_count > 0

    async def _send_add_provider(self, peer_id: ID, key: bytes) -> bool:
        """
        Send ADD_PROVIDER message to a specific peer.

        :param peer_id: The peer to send the message to
        :param key: The content key being provided

        Returns
        -------
        bool
            True if the message was successfully sent and acknowledged

        """
        result = False
        stream = None
        try:
            # Open a stream to the peer
            stream = await self.host.new_stream(peer_id, [TProtocol(PROTOCOL_ID)])

            # Get our addresses to include in the message
            addrs = []
            for addr in self.host.get_addrs():
                addrs.append(addr.to_bytes())

            # Create the ADD_PROVIDER message
            message = Message()
            message.type = Message.MessageType.ADD_PROVIDER
            message.key = key

            # Add our provider info
            provider = message.providerPeers.add()
            provider.id = self.local_peer_id.to_bytes()
            provider.addrs.extend(addrs)

            # Serialize and send the message
            proto_bytes = message.SerializeToString()
            await stream.write(varint.encode(len(proto_bytes)))
            await stream.write(proto_bytes)
            logger.debug(f"Sent ADD_PROVIDER to {peer_id} for key {key.hex()}")
            # Read response length prefix
            length_bytes = b""
            while True:
                logger.debug("Reading response length prefix in add provider")
                b = await stream.read(1)
                if not b:
                    return False
                length_bytes += b
                if b[0] & 0x80 == 0:
                    break

            response_length = varint.decode_bytes(length_bytes)
            # Read response data
            response_bytes = b""
            remaining = response_length
            while remaining > 0:
                chunk = await stream.read(remaining)
                if not chunk:
                    return False
                response_bytes += chunk
                remaining -= len(chunk)

            # Parse response
            response = Message()
            response.ParseFromString(response_bytes)

            # Check response type
            if response.type == Message.MessageType.ADD_PROVIDER:
                result = True

        except Exception as e:
            logger.warning(f"Error sending ADD_PROVIDER to {peer_id}: {e}")

        finally:
            if stream:
                await stream.close()
            return result

|
||||
    async def find_providers(self, key: bytes, count: int = 20) -> list[PeerInfo]:
        """
        Find content providers for a given key.

        :param key: The content key to look for
        :param count: Maximum number of providers to return

        Returns
        -------
        List[PeerInfo]
            List of content providers

        """
        if not self.host or not self.peer_routing:
            logger.error("Host or peer_routing not initialized, cannot find providers")
            return []

        # Check local provider store first
        local_providers = self.get_providers(key)
        if local_providers:
            logger.debug(
                f"Found {len(local_providers)} providers locally for {key.hex()}"
            )
            return local_providers[:count]
        logger.debug("local providers are %s", local_providers)

        # Find the closest peers to the key
        closest_peers = await self.peer_routing.find_closest_peers_network(key)
        logger.debug(
            f"Searching {len(closest_peers)} peers for providers of {key.hex()}"
        )

        # Query these peers for providers in batches of ALPHA, in parallel, with timeout
        all_providers = []
        for i in range(0, len(closest_peers), ALPHA):
            batch = closest_peers[i : i + ALPHA]
            batch_results: list[list[PeerInfo]] = [[] for _ in batch]

            async def get_one(
                idx: int,
                peer_id: ID,
                batch_results: list[list[PeerInfo]] = batch_results,
            ) -> None:
                if peer_id == self.local_peer_id:
                    return
                try:
                    with trio.move_on_after(QUERY_TIMEOUT):
                        providers = await self._get_providers_from_peer(peer_id, key)
                        if providers:
                            for provider in providers:
                                self.add_provider(key, provider)
                            batch_results[idx] = providers
                        else:
                            logger.debug(f"No providers found at peer {peer_id}")
                except Exception as e:
                    logger.warning(f"Failed to get providers from {peer_id}: {e}")

            async with trio.open_nursery() as nursery:
                for idx, peer_id in enumerate(batch):
                    nursery.start_soon(get_one, idx, peer_id, batch_results)

            for providers in batch_results:
                all_providers.extend(providers)
                if len(all_providers) >= count:
                    return all_providers[:count]

        return all_providers[:count]
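The two methods above are the public surface of content routing: provide() announces a key, find_providers() looks it up. A minimal usage sketch follows; it assumes a provider store that has already been wired to a running host and peer routing (as KadDHT does during setup) and an active trio context, and the helper name advertise_and_lookup is purely illustrative.

from libp2p.kad_dht.utils import create_key_from_binary

async def advertise_and_lookup(provider_store, content: bytes) -> None:
    # Derive the DHT key from the content bytes (SHA-256 multihash digest).
    key = create_key_from_binary(content)

    # Announce to the k closest peers that this node can serve the content.
    if not await provider_store.provide(key):
        print("provider advertisement failed")
        return

    # Later, or from another node, ask the network who provides the key.
    for info in await provider_store.find_providers(key, count=5):
        print(info.peer_id, [str(addr) for addr in info.addrs])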
    async def _get_providers_from_peer(self, peer_id: ID, key: bytes) -> list[PeerInfo]:
        """
        Get content providers from a specific peer.

        :param peer_id: The peer to query
        :param key: The content key to look for

        Returns
        -------
        List[PeerInfo]
            List of provider information

        """
        providers: list[PeerInfo] = []
        try:
            # Open a stream to the peer
            stream = await self.host.new_stream(peer_id, [TProtocol(PROTOCOL_ID)])

            try:
                # Create the GET_PROVIDERS message
                message = Message()
                message.type = Message.MessageType.GET_PROVIDERS
                message.key = key

                # Serialize and send the message
                proto_bytes = message.SerializeToString()
                await stream.write(varint.encode(len(proto_bytes)))
                await stream.write(proto_bytes)

                # Read response length prefix
                length_bytes = b""
                while True:
                    b = await stream.read(1)
                    if not b:
                        return []
                    length_bytes += b
                    if b[0] & 0x80 == 0:
                        break

                response_length = varint.decode_bytes(length_bytes)
                # Read response data
                response_bytes = b""
                remaining = response_length
                while remaining > 0:
                    chunk = await stream.read(remaining)
                    if not chunk:
                        return []
                    response_bytes += chunk
                    remaining -= len(chunk)

                # Parse response
                response = Message()
                response.ParseFromString(response_bytes)

                # Check response type
                if response.type != Message.MessageType.GET_PROVIDERS:
                    return []

                # Extract provider information
                providers = []
                for provider_proto in response.providerPeers:
                    try:
                        # Create peer ID from bytes
                        provider_id = ID(provider_proto.id)

                        # Convert addresses to Multiaddr
                        addrs = []
                        for addr_bytes in provider_proto.addrs:
                            try:
                                addrs.append(Multiaddr(addr_bytes))
                            except Exception:
                                pass  # Skip invalid addresses

                        # Create PeerInfo and add to result
                        providers.append(PeerInfo(provider_id, addrs))
                    except Exception as e:
                        logger.warning(f"Failed to parse provider info: {e}")

            finally:
                await stream.close()
            return providers

        except Exception as e:
            logger.warning(f"Error getting providers from {peer_id}: {e}")
            return []
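Every request/response exchange in this module frames protobuf messages with an unsigned-varint length prefix that is read one byte at a time until the continuation bit clears. The sketch below factors that recurring pattern into two hypothetical helpers; it is illustrative only, and `stream` is assumed to expose the same async read/write API as the streams returned by host.new_stream().

import varint

async def write_framed(stream, payload: bytes) -> None:
    # Prefix the payload with its length encoded as an unsigned varint.
    await stream.write(varint.encode(len(payload)))
    await stream.write(payload)

async def read_framed(stream) -> bytes | None:
    # Read the varint length byte by byte; the high bit marks continuation.
    length_bytes = b""
    while True:
        b = await stream.read(1)
        if not b:
            return None  # Stream closed before a full length prefix arrived.
        length_bytes += b
        if b[0] & 0x80 == 0:
            break
    remaining = varint.decode_bytes(length_bytes)
    payload = b""
    while remaining > 0:
        chunk = await stream.read(remaining)
        if not chunk:
            return None  # Stream closed mid-message.
        payload += chunk
        remaining -= len(chunk)
    return payload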
    def add_provider(self, key: bytes, provider: PeerInfo) -> None:
        """
        Add a provider for a given content key.

        :param key: The content key
        :param provider: The provider's peer information

        Returns
        -------
        None

        """
        # Initialize providers for this key if needed
        if key not in self.providers:
            self.providers[key] = {}

        # Add or update the provider record
        peer_id_str = str(provider.peer_id)  # Use string representation as dict key
        self.providers[key][peer_id_str] = ProviderRecord(
            provider_info=provider, timestamp=time.time()
        )
        logger.debug(f"Added provider {provider.peer_id} for key {key.hex()}")

    def get_providers(self, key: bytes) -> list[PeerInfo]:
        """
        Get all providers for a given content key.

        :param key: The content key

        Returns
        -------
        List[PeerInfo]
            List of providers for the key

        """
        if key not in self.providers:
            return []

        # Collect valid provider records (not expired)
        result = []
        current_time = time.time()
        expired_peers = []

        for peer_id_str, record in self.providers[key].items():
            # Check if the record has expired
            if current_time - record.timestamp > PROVIDER_RECORD_EXPIRATION_INTERVAL:
                expired_peers.append(peer_id_str)
                continue

            # Use addresses only if they haven't expired
            addresses = []
            if current_time - record.timestamp <= PROVIDER_ADDRESS_TTL:
                addresses = record.addresses

            # Create PeerInfo and add to results
            result.append(PeerInfo(record.peer_id, addresses))

        # Clean up expired records
        for peer_id in expired_peers:
            del self.providers[key][peer_id]

        # Remove the key if no providers left
        if not self.providers[key]:
            del self.providers[key]

        return result

    def cleanup_expired(self) -> None:
        """Remove expired provider records."""
        current_time = time.time()
        expired_keys = []

        for key, providers in self.providers.items():
            expired_providers = []

            for peer_id_str, record in providers.items():
                if (
                    current_time - record.timestamp
                    > PROVIDER_RECORD_EXPIRATION_INTERVAL
                ):
                    expired_providers.append(peer_id_str)
                    logger.debug(
                        f"Removing expired provider {peer_id_str} for key {key.hex()}"
                    )

            # Remove expired providers
            for peer_id in expired_providers:
                del providers[peer_id]

            # Track empty keys for removal
            if not providers:
                expired_keys.append(key)

        # Remove empty keys
        for key in expired_keys:
            del self.providers[key]
            logger.debug(f"Removed key with no providers: {key.hex()}")

    def get_provided_keys(self, peer_id: ID) -> list[bytes]:
        """
        Get all content keys provided by a specific peer.

        :param peer_id: The peer ID to look for

        Returns
        -------
        List[bytes]
            List of content keys provided by the peer

        """
        peer_id_str = str(peer_id)
        result = []

        for key, providers in self.providers.items():
            if peer_id_str in providers:
                result.append(key)

        return result

    def size(self) -> int:
        """
        Get the total number of provider records in the store.

        Returns
        -------
        int
            Total number of provider records across all keys

        """
        total = 0
        for providers in self.providers.values():
            total += len(providers)
        return total
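A small illustration of the bookkeeping above: records are returned while fresh and dropped once PROVIDER_RECORD_EXPIRATION_INTERVAL has elapsed (lazily in get_providers(), eagerly in cleanup_expired()). The demo assumes a freshly constructed ProviderStore; ProviderRecord and the interval constants are defined earlier in this file, and the function name is illustrative only.

from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo

def demo_provider_store(store, key: bytes, peer_id: ID) -> None:
    # Record that `peer_id` provides `key` (empty address list for brevity).
    store.add_provider(key, PeerInfo(peer_id, []))
    # While the record is fresh it is returned by lookups...
    assert any(p.peer_id == peer_id for p in store.get_providers(key))
    # ...and size() counts one record for this key (fresh store assumed).
    assert store.size() == 1
    # Once PROVIDER_RECORD_EXPIRATION_INTERVAL has elapsed, get_providers()
    # and cleanup_expired() both drop the record automatically.
    store.cleanup_expired()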

libp2p/kad_dht/routing_table.py (new file, 601 lines)

"""
Kademlia DHT routing table implementation.
"""

from collections import (
    OrderedDict,
)
import logging
import time

import trio

from libp2p.abc import (
    IHost,
)
from libp2p.custom_types import (
    TProtocol,
)
from libp2p.kad_dht.utils import xor_distance
from libp2p.peer.id import (
    ID,
)
from libp2p.peer.peerinfo import (
    PeerInfo,
)

from .pb.kademlia_pb2 import (
    Message,
)

# logger = logging.getLogger("libp2p.kademlia.routing_table")
logger = logging.getLogger("kademlia-example.routing_table")

# Default parameters
BUCKET_SIZE = 20  # k in the Kademlia paper
MAXIMUM_BUCKETS = 256  # Maximum number of buckets (for 256-bit keys)
PEER_REFRESH_INTERVAL = 60  # Interval to refresh peers in seconds
STALE_PEER_THRESHOLD = 3600  # Time in seconds after which a peer is considered stale


class KBucket:
    """
    A k-bucket implementation for the Kademlia DHT.

    Each k-bucket stores up to k (BUCKET_SIZE) peers, sorted by least-recently seen.
    """

    def __init__(
        self,
        host: IHost,
        bucket_size: int = BUCKET_SIZE,
        min_range: int = 0,
        max_range: int = 2**256,
    ):
        """
        Initialize a new k-bucket.

        :param host: The host this bucket belongs to
        :param bucket_size: Maximum number of peers to store in the bucket
        :param min_range: Lower boundary of the bucket's key range (inclusive)
        :param max_range: Upper boundary of the bucket's key range (exclusive)

        """
        self.bucket_size = bucket_size
        self.host = host
        self.min_range = min_range
        self.max_range = max_range
        # Store PeerInfo objects along with last-seen timestamp
        self.peers: OrderedDict[ID, tuple[PeerInfo, float]] = OrderedDict()

    def peer_ids(self) -> list[ID]:
        """Get all peer IDs in the bucket."""
        return list(self.peers.keys())

    def peer_infos(self) -> list[PeerInfo]:
        """Get all PeerInfo objects in the bucket."""
        return [info for info, _ in self.peers.values()]

    def get_oldest_peer(self) -> ID | None:
        """Get the least-recently seen peer."""
        if not self.peers:
            return None
        return next(iter(self.peers.keys()))
    async def add_peer(self, peer_info: PeerInfo) -> bool:
        """
        Add a peer to the bucket. Returns True if the peer was added or updated,
        False if the bucket is full.
        """
        current_time = time.time()
        peer_id = peer_info.peer_id

        # If peer is already in the bucket, move it to the end (most recently seen)
        if peer_id in self.peers:
            self.refresh_peer_last_seen(peer_id)
            return True

        # If bucket has space, add the peer
        if len(self.peers) < self.bucket_size:
            self.peers[peer_id] = (peer_info, current_time)
            return True

        # The bucket is full: consider evicting the least-recently seen peer
        oldest_peer_id = self.get_oldest_peer()
        if oldest_peer_id is None:
            logger.warning("No oldest peer found when bucket is full")
            return False

        # Ping the oldest peer to check whether it is still responsive
        try:
            response = await self._ping_peer(oldest_peer_id)
            if response:
                # The old peer is still alive, so the new peer is not added
                logger.debug(
                    "Old peer %s is still alive, cannot add new peer %s",
                    oldest_peer_id,
                    peer_id,
                )
                return False
        except Exception as e:
            logger.debug(
                "Error while pinging old peer %s: %s",
                oldest_peer_id,
                str(e),
            )

        # The oldest peer is unresponsive (ping failed or raised), replace it
        logger.debug(
            "Old peer %s is unresponsive, replacing with new peer %s",
            oldest_peer_id,
            peer_id,
        )
        self.peers.popitem(last=False)  # Remove oldest peer
        self.peers[peer_id] = (peer_info, current_time)
        return True
    def remove_peer(self, peer_id: ID) -> bool:
        """
        Remove a peer from the bucket.
        Returns True if the peer was in the bucket, False otherwise.
        """
        if peer_id in self.peers:
            del self.peers[peer_id]
            return True
        return False

    def has_peer(self, peer_id: ID) -> bool:
        """Check if the peer is in the bucket."""
        return peer_id in self.peers

    def get_peer_info(self, peer_id: ID) -> PeerInfo | None:
        """Get the PeerInfo for a given peer ID if it exists in the bucket."""
        if peer_id in self.peers:
            return self.peers[peer_id][0]
        return None

    def size(self) -> int:
        """Get the number of peers in the bucket."""
        return len(self.peers)

    def get_stale_peers(self, stale_threshold_seconds: int = 3600) -> list[ID]:
        """
        Get peers that haven't been pinged recently.

        :param stale_threshold_seconds: Time in seconds after which a peer
            is considered stale

        Returns
        -------
        list[ID]
            List of peer IDs that need to be refreshed

        """
        current_time = time.time()
        stale_peers = []

        for peer_id, (_, last_seen) in self.peers.items():
            if current_time - last_seen > stale_threshold_seconds:
                stale_peers.append(peer_id)

        return stale_peers
    async def _periodic_peer_refresh(self) -> None:
        """Background task to periodically refresh peers."""
        try:
            while True:
                await trio.sleep(PEER_REFRESH_INTERVAL)  # Check every minute

                # Find stale peers (not pinged in the last hour)
                stale_peers = self.get_stale_peers(
                    stale_threshold_seconds=STALE_PEER_THRESHOLD
                )
                if stale_peers:
                    logger.debug(f"Found {len(stale_peers)} stale peers to refresh")

                for peer_id in stale_peers:
                    try:
                        # Try to ping the peer
                        logger.debug("Pinging stale peer %s", peer_id)
                        response = await self._ping_peer(peer_id)
                        if response:
                            # Update the last seen time
                            self.refresh_peer_last_seen(peer_id)
                            logger.debug(f"Refreshed peer {peer_id}")
                        else:
                            # If the ping fails, remove the peer
                            logger.debug(f"Failed to ping peer {peer_id}")
                            self.remove_peer(peer_id)
                            logger.info(f"Removed unresponsive peer {peer_id}")
                    except Exception as e:
                        # If the ping raises, remove the peer as well
                        logger.debug(
                            "Failed to ping peer %s: %s",
                            peer_id,
                            e,
                        )
                        self.remove_peer(peer_id)
                        logger.info(f"Removed unresponsive peer {peer_id}")
        except trio.Cancelled:
            logger.debug("Peer refresh task cancelled")
            raise  # Cancellation must propagate in trio
        except Exception as e:
            logger.error(f"Error in peer refresh task: {e}", exc_info=True)
    async def _ping_peer(self, peer_id: ID) -> bool:
        """
        Ping a peer using a protobuf message to check whether it is still alive,
        and update its last-seen time.

        :param peer_id: The ID of the peer to ping

        Returns
        -------
        bool
            True if ping successful, False otherwise

        """
        result = False
        # Get peer info directly from the bucket
        peer_info = self.get_peer_info(peer_id)
        if not peer_info:
            raise ValueError(f"Peer {peer_id} not in bucket")

        # Default protocol ID for Kademlia DHT
        protocol_id = TProtocol("/ipfs/kad/1.0.0")

        try:
            # Open a stream to the peer with the DHT protocol
            stream = await self.host.new_stream(peer_id, [protocol_id])

            try:
                # Create ping protobuf message
                ping_msg = Message()
                ping_msg.type = Message.PING  # Use correct enum

                # Serialize and send with length prefix (4 bytes big-endian)
                msg_bytes = ping_msg.SerializeToString()
                logger.debug(
                    f"Sending PING message to {peer_id}, size: {len(msg_bytes)} bytes"
                )
                await stream.write(len(msg_bytes).to_bytes(4, byteorder="big"))
                await stream.write(msg_bytes)

                # Wait for response with timeout
                with trio.move_on_after(2):  # 2 second timeout
                    # Read response length (4 bytes)
                    length_bytes = await stream.read(4)
                    if not length_bytes or len(length_bytes) < 4:
                        logger.warning(f"Peer {peer_id} disconnected during ping")
                        return False

                    msg_len = int.from_bytes(length_bytes, byteorder="big")
                    if (
                        msg_len <= 0 or msg_len > 1024 * 1024
                    ):  # Sanity check on message size
                        logger.warning(
                            f"Invalid message length from {peer_id}: {msg_len}"
                        )
                        return False

                    logger.debug(
                        f"Receiving response from {peer_id}, size: {msg_len} bytes"
                    )

                    # Read full message
                    response_bytes = await stream.read(msg_len)
                    if not response_bytes:
                        logger.warning(f"Failed to read response from {peer_id}")
                        return False

                    # Parse protobuf response
                    response = Message()
                    try:
                        response.ParseFromString(response_bytes)
                    except Exception as e:
                        logger.warning(
                            f"Failed to parse protobuf response from {peer_id}: {e}"
                        )
                        return False

                    if response.type == Message.PING:
                        # Update the last seen timestamp for this peer
                        logger.debug(f"Successfully pinged peer {peer_id}")
                        result = True
                        return result

                    else:
                        logger.warning(
                            f"Unexpected response type from {peer_id}: {response.type}"
                        )
                        return False

                # If we get here, the ping timed out
                logger.warning(f"Ping to peer {peer_id} timed out")
                return False

            finally:
                await stream.close()
                return result

        except Exception as e:
            logger.error(f"Error pinging peer {peer_id}: {str(e)}")
            return False
    def refresh_peer_last_seen(self, peer_id: ID) -> bool:
        """
        Update the last-seen timestamp for a peer in the bucket.

        :param peer_id: The ID of the peer to refresh

        Returns
        -------
        bool
            True if the peer was found and refreshed, False otherwise

        """
        if peer_id in self.peers:
            # Get current peer info and update the timestamp
            peer_info, _ = self.peers[peer_id]
            current_time = time.time()
            self.peers[peer_id] = (peer_info, current_time)
            # Move to end of ordered dict to mark as most recently seen
            self.peers.move_to_end(peer_id)
            return True

        return False

    def key_in_range(self, key: bytes) -> bool:
        """
        Check if a key is in the range of this bucket.

        :param key: The key to check (bytes)

        Returns
        -------
        bool
            True if the key is in range, False otherwise

        """
        key_int = int.from_bytes(key, byteorder="big")
        return self.min_range <= key_int < self.max_range

    def split(self) -> tuple["KBucket", "KBucket"]:
        """
        Split the bucket into two buckets.

        Returns
        -------
        tuple
            (lower_bucket, upper_bucket)

        """
        midpoint = (self.min_range + self.max_range) // 2
        lower_bucket = KBucket(self.host, self.bucket_size, self.min_range, midpoint)
        upper_bucket = KBucket(self.host, self.bucket_size, midpoint, self.max_range)

        # Redistribute peers
        for peer_id, (peer_info, timestamp) in self.peers.items():
            peer_key = int.from_bytes(peer_id.to_bytes(), byteorder="big")
            if peer_key < midpoint:
                lower_bucket.peers[peer_id] = (peer_info, timestamp)
            else:
                upper_bucket.peers[peer_id] = (peer_info, timestamp)

        return lower_bucket, upper_bucket
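To make the range bookkeeping concrete, the sketch below splits a full-range bucket and checks which half a key falls into. Passing None for the host is a shortcut for illustration only, since split() and key_in_range() never touch it.

bucket = KBucket(host=None, bucket_size=20, min_range=0, max_range=2**256)
lower, upper = bucket.split()

midpoint = 2**255
assert lower.min_range == 0 and lower.max_range == midpoint
assert upper.min_range == midpoint and upper.max_range == 2**256

# A key's 256-bit integer value decides which half it falls into.
key = (1).to_bytes(32, byteorder="big")  # tiny key -> lower half
assert lower.key_in_range(key) and not upper.key_in_range(key)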
class RoutingTable:
    """
    The Kademlia routing table maintains information on which peers to contact for any
    given peer ID in the network.
    """

    def __init__(self, local_id: ID, host: IHost) -> None:
        """
        Initialize the routing table.

        :param local_id: The ID of the local node.
        :param host: The host this routing table belongs to.

        """
        self.local_id = local_id
        self.host = host
        self.buckets = [KBucket(host, BUCKET_SIZE)]

    async def add_peer(self, peer_obj: PeerInfo | ID) -> bool:
        """
        Add a peer to the routing table.

        :param peer_obj: Either a PeerInfo object or a peer ID to add

        Returns
        -------
        bool: True if the peer was added or updated, False otherwise

        """
        peer_id = None
        peer_info = None

        try:
            # Handle different types of input
            if isinstance(peer_obj, PeerInfo):
                # Already have a PeerInfo object
                peer_info = peer_obj
                peer_id = peer_obj.peer_id
            else:
                # Assume it's a peer ID
                peer_id = peer_obj
                # Try to get addresses from the peerstore if available
                try:
                    addrs = self.host.get_peerstore().addrs(peer_id)
                    if addrs:
                        # Create PeerInfo object
                        peer_info = PeerInfo(peer_id, addrs)
                    else:
                        logger.debug(
                            "No addresses found for peer %s in peerstore, skipping",
                            peer_id,
                        )
                        return False
                except Exception as peerstore_error:
                    # Handle case where peer is not in peerstore yet
                    logger.debug(
                        "Peer %s not found in peerstore: %s, skipping",
                        peer_id,
                        str(peerstore_error),
                    )
                    return False

            # Don't add ourselves
            if peer_id == self.local_id:
                return False

            # Find the right bucket for this peer
            bucket = self.find_bucket(peer_id)

            # Try to add to the bucket
            success = await bucket.add_peer(peer_info)
            if success:
                logger.debug(f"Successfully added peer {peer_id} to routing table")
            return success

        except Exception as e:
            logger.debug(f"Error adding peer {peer_obj} to routing table: {e}")
            return False
    def remove_peer(self, peer_id: ID) -> bool:
        """
        Remove a peer from the routing table.

        :param peer_id: The ID of the peer to remove

        Returns
        -------
        bool: True if the peer was removed, False otherwise

        """
        bucket = self.find_bucket(peer_id)
        return bucket.remove_peer(peer_id)

    def find_bucket(self, peer_id: ID) -> KBucket:
        """
        Find the bucket that would contain the given peer ID.

        :param peer_id: The ID of the peer to locate a bucket for

        Returns
        -------
        KBucket: The bucket for this peer

        """
        for bucket in self.buckets:
            if bucket.key_in_range(peer_id.to_bytes()):
                return bucket

        return self.buckets[0]

    def find_local_closest_peers(self, key: bytes, count: int = 20) -> list[ID]:
        """
        Find the closest peers to a given key.

        :param key: The key to find closest peers to (bytes)
        :param count: Maximum number of peers to return

        Returns
        -------
        List[ID]: List of peer IDs closest to the key

        """
        # Get all peers from all buckets
        all_peers = []
        for bucket in self.buckets:
            all_peers.extend(bucket.peer_ids())

        # Sort by XOR distance to the key
        all_peers.sort(key=lambda p: xor_distance(p.to_bytes(), key))

        return all_peers[:count]
    def get_peer_ids(self) -> list[ID]:
        """
        Get all peer IDs in the routing table.

        Returns
        -------
        List[ID]: List of all peer IDs

        """
        peers = []
        for bucket in self.buckets:
            peers.extend(bucket.peer_ids())
        return peers

    def get_peer_info(self, peer_id: ID) -> PeerInfo | None:
        """
        Get the peer info for a specific peer.

        :param peer_id: The ID of the peer to get info for

        Returns
        -------
        PeerInfo: The peer info, or None if not found

        """
        bucket = self.find_bucket(peer_id)
        return bucket.get_peer_info(peer_id)

    def peer_in_table(self, peer_id: ID) -> bool:
        """
        Check if a peer is in the routing table.

        :param peer_id: The ID of the peer to check

        Returns
        -------
        bool: True if the peer is in the routing table, False otherwise

        """
        bucket = self.find_bucket(peer_id)
        return bucket.has_peer(peer_id)

    def size(self) -> int:
        """
        Get the number of peers in the routing table.

        Returns
        -------
        int: Number of peers

        """
        count = 0
        for bucket in self.buckets:
            count += bucket.size()
        return count

    def get_stale_peers(self, stale_threshold_seconds: int = 3600) -> list[ID]:
        """
        Get all stale peers from all buckets.

        :param stale_threshold_seconds: Time in seconds after which a peer
            is considered stale

        Returns
        -------
        list[ID]
            List of stale peer IDs

        """
        stale_peers = []
        for bucket in self.buckets:
            stale_peers.extend(bucket.get_stale_peers(stale_threshold_seconds))
        return stale_peers

    def cleanup_routing_table(self) -> None:
        """
        Cleanup the routing table by removing all data.

        This is useful for resetting the routing table during tests or
        reinitialization.
        """
        self.buckets = [KBucket(self.host, BUCKET_SIZE)]
        logger.info("Routing table cleaned up, all data removed.")

libp2p/kad_dht/utils.py (new file, 117 lines)

"""
Utility functions for Kademlia DHT implementation.
"""

import base58
import multihash

from libp2p.peer.id import (
    ID,
)


def create_key_from_binary(binary_data: bytes) -> bytes:
    """
    Create a key for the DHT by hashing binary data with SHA-256.

    :param binary_data: The binary data to hash.

    Returns
    -------
    bytes: The resulting key.

    """
    return multihash.digest(binary_data, "sha2-256").digest


def xor_distance(key1: bytes, key2: bytes) -> int:
    """
    Calculate the XOR distance between two keys.

    :param key1: First key (bytes)
    :param key2: Second key (bytes)

    Returns
    -------
    int: The XOR distance between the keys

    """
    # Ensure the inputs are bytes
    if not isinstance(key1, bytes) or not isinstance(key2, bytes):
        raise TypeError("Both key1 and key2 must be bytes objects")

    # Convert to integers
    k1 = int.from_bytes(key1, byteorder="big")
    k2 = int.from_bytes(key2, byteorder="big")

    # Calculate XOR distance
    return k1 ^ k2
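A quick worked example of the XOR metric (illustrative values only): distance is symmetric and zero only for identical keys.

a = bytes.fromhex("0f")  # 0b00001111
b = bytes.fromhex("f0")  # 0b11110000
assert xor_distance(a, b) == 0xFF
assert xor_distance(a, a) == 0
assert xor_distance(a, b) == xor_distance(b, a)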
def bytes_to_base58(data: bytes) -> str:
    """
    Convert bytes to a base58 encoded string.

    :param data: Input bytes

    Returns
    -------
    str: Base58 encoded string

    """
    return base58.b58encode(data).decode("utf-8")


def sort_peer_ids_by_distance(target_key: bytes, peer_ids: list[ID]) -> list[ID]:
    """
    Sort a list of peer IDs by their distance to the target key.

    :param target_key: The target key to measure distance from
    :param peer_ids: List of peer IDs to sort

    Returns
    -------
    List[ID]: Sorted list of peer IDs from closest to furthest

    """

    def get_distance(peer_id: ID) -> int:
        # Hash the peer ID bytes to get a key for distance calculation
        peer_hash = multihash.digest(peer_id.to_bytes(), "sha2-256").digest
        return xor_distance(target_key, peer_hash)

    return sorted(peer_ids, key=get_distance)


def shared_prefix_len(first: bytes, second: bytes) -> int:
    """
    Calculate the number of prefix bits shared by two byte sequences.

    :param first: First byte sequence
    :param second: Second byte sequence

    Returns
    -------
    int: Number of shared prefix bits

    """
    # Compare each byte to find the first bit difference
    common_length = 0
    for i in range(min(len(first), len(second))):
        byte_first = first[i]
        byte_second = second[i]

        if byte_first == byte_second:
            common_length += 8
        else:
            # Find the specific bit where they differ
            xor = byte_first ^ byte_second
            # Count leading zeros in the xor result
            for j in range(7, -1, -1):
                if (xor >> j) & 1 == 1:
                    return common_length + (7 - j)

            # This shouldn't be reached if xor != 0
            return common_length + 8

    return common_length
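A short sketch of how these helpers combine in a lookup: shared_prefix_len() gives the bucket depth two keys would share, and sort_peer_ids_by_distance() orders candidate peers. The byte strings and the routing_table/target_key names below are illustrative, not taken from the tests.

assert shared_prefix_len(b"\x00\x00", b"\x00\x80") == 8  # first difference is bit 8
assert shared_prefix_len(b"\xff", b"\xff") == 8          # identical single bytes

# Given some peer IDs (e.g. from the routing table), pick the closest first:
# closest = sort_peer_ids_by_distance(target_key, routing_table.get_peer_ids())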

libp2p/kad_dht/value_store.py (new file, 393 lines)

"""
Value store implementation for Kademlia DHT.

Provides a way to store and retrieve key-value pairs with optional expiration.
"""

import logging
import time

import varint

from libp2p.abc import (
    IHost,
)
from libp2p.custom_types import (
    TProtocol,
)
from libp2p.peer.id import (
    ID,
)

from .pb.kademlia_pb2 import (
    Message,
)

# logger = logging.getLogger("libp2p.kademlia.value_store")
logger = logging.getLogger("kademlia-example.value_store")

# Default time to live for values in seconds (24 hours)
DEFAULT_TTL = 24 * 60 * 60
PROTOCOL_ID = TProtocol("/ipfs/kad/1.0.0")


class ValueStore:
    """
    Store for key-value pairs in a Kademlia DHT.

    Values are stored with a timestamp and optional expiration time.
    """

    def __init__(self, host: IHost, local_peer_id: ID):
        """
        Initialize an empty value store.

        :param host: The libp2p host instance.
        :param local_peer_id: The local peer ID to ignore in peer requests.

        """
        # Store format: {key: (value, validity)}
        self.store: dict[bytes, tuple[bytes, float]] = {}
        # Store references to the host and local peer ID for making requests
        self.host = host
        self.local_peer_id = local_peer_id

    def put(self, key: bytes, value: bytes, validity: float = 0.0) -> None:
        """
        Store a value in the DHT.

        :param key: The key to store the value under
        :param value: The value to store
        :param validity: Absolute expiry time as a Unix timestamp.
            If set to 0.0, the value expires `DEFAULT_TTL` seconds from now.

        Returns
        -------
        None

        """
        if validity == 0.0:
            validity = time.time() + DEFAULT_TTL
        logger.debug(
            "Storing value for key %s... with validity %s", key.hex(), validity
        )
        self.store[key] = (value, validity)
        logger.debug(f"Stored value for key {key.hex()}")
    async def _store_at_peer(self, peer_id: ID, key: bytes, value: bytes) -> bool:
        """
        Store a value at a specific peer.

        :param peer_id: The ID of the peer to store the value at
        :param key: The key to store
        :param value: The value to store

        Returns
        -------
        bool
            True if the value was successfully stored, False otherwise

        """
        result = False
        stream = None
        try:
            # Don't try to store at ourselves
            if self.local_peer_id and peer_id == self.local_peer_id:
                result = True
                return result

            if not self.host:
                logger.error("Host not initialized, cannot store value at peer")
                return False

            logger.debug(f"Storing value for key {key.hex()} at peer {peer_id}")

            # Open a stream to the peer
            stream = await self.host.new_stream(peer_id, [PROTOCOL_ID])
            logger.debug(f"Opened stream to peer {peer_id}")

            # Create the PUT_VALUE message with protobuf
            message = Message()
            message.type = Message.MessageType.PUT_VALUE

            # Set message fields
            message.key = key
            message.record.key = key
            message.record.value = value
            message.record.timeReceived = str(time.time())

            # Serialize and send the protobuf message with length prefix
            proto_bytes = message.SerializeToString()
            await stream.write(varint.encode(len(proto_bytes)))
            await stream.write(proto_bytes)
            logger.debug("Sent PUT_VALUE protobuf message with varint length")

            # Read varint-prefixed response length
            length_bytes = b""
            while True:
                logger.debug("Reading varint length prefix for response...")
                b = await stream.read(1)
                if not b:
                    logger.warning("Connection closed while reading varint length")
                    return False
                length_bytes += b
                if b[0] & 0x80 == 0:
                    break
            logger.debug(f"Received varint length bytes: {length_bytes.hex()}")
            response_length = varint.decode_bytes(length_bytes)
            logger.debug("Response length: %d bytes", response_length)
            # Read response data
            response_bytes = b""
            remaining = response_length
            while remaining > 0:
                chunk = await stream.read(remaining)
                if not chunk:
                    logger.debug(
                        f"Connection closed by peer {peer_id} while reading data"
                    )
                    return False
                response_bytes += chunk
                remaining -= len(chunk)

            # Parse protobuf response
            response = Message()
            response.ParseFromString(response_bytes)

            # Check if response is valid
            if response.type == Message.MessageType.PUT_VALUE:
                if response.key:
                    result = True
            return result

        except Exception as e:
            logger.warning(f"Failed to store value at peer {peer_id}: {e}")
            return False

        finally:
            if stream:
                await stream.close()
            return result
    def get(self, key: bytes) -> bytes | None:
        """
        Retrieve a value from the DHT.

        :param key: The key to look up

        Returns
        -------
        Optional[bytes]
            The stored value, or None if not found or expired

        """
        logger.debug("Retrieving value for key %s...", key.hex()[:8])
        if key not in self.store:
            return None

        value, validity = self.store[key]
        logger.debug(
            "Found value for key %s... with validity %s",
            key.hex(),
            validity,
        )
        # Check if the value has expired
        if validity is not None and validity < time.time():
            logger.debug(
                "Value for key %s... has expired, removing it",
                key.hex()[:8],
            )
            self.remove(key)
            return None

        return value
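A usage sketch for the local side of the store (network replication goes through _store_at_peer/_get_from_peer, driven elsewhere by KadDHT). The demo_value_store name is illustrative and assumes a ValueStore constructed with a real host and peer ID.

import time

def demo_value_store(store: "ValueStore") -> None:
    key, value = b"example-key", b"example-value"

    # Store with the default TTL (expires DEFAULT_TTL seconds from now).
    store.put(key, value)
    assert store.get(key) == value and store.has(key)

    # Store with an explicit absolute expiry one second in the future;
    # once that passes, get() removes the entry and returns None.
    store.put(key, value, validity=time.time() + 1.0)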
    async def _get_from_peer(self, peer_id: ID, key: bytes) -> bytes | None:
        """
        Retrieve a value from a specific peer.

        :param peer_id: The ID of the peer to retrieve the value from
        :param key: The key to retrieve

        Returns
        -------
        Optional[bytes]
            The value if found, None otherwise

        """
        stream = None
        try:
            # Don't try to get from ourselves
            if peer_id == self.local_peer_id:
                return None

            logger.debug(f"Getting value for key {key.hex()} from peer {peer_id}")

            # Open a stream to the peer
            stream = await self.host.new_stream(peer_id, [TProtocol(PROTOCOL_ID)])
            logger.debug(f"Opened stream to peer {peer_id} for GET_VALUE")

            # Create the GET_VALUE message using protobuf
            message = Message()
            message.type = Message.MessageType.GET_VALUE
            message.key = key

            # Serialize and send the protobuf message
            proto_bytes = message.SerializeToString()
            await stream.write(varint.encode(len(proto_bytes)))
            await stream.write(proto_bytes)

            # Read response length
            length_bytes = b""
            while True:
                b = await stream.read(1)
                if not b:
                    logger.warning("Connection closed while reading length")
                    return None
                length_bytes += b
                if b[0] & 0x80 == 0:
                    break
            response_length = varint.decode_bytes(length_bytes)
            # Read response data
            response_bytes = b""
            remaining = response_length
            while remaining > 0:
                chunk = await stream.read(remaining)
                if not chunk:
                    logger.debug(
                        f"Connection closed by peer {peer_id} while reading data"
                    )
                    return None
                response_bytes += chunk
                remaining -= len(chunk)

            # Parse protobuf response
            try:
                response = Message()
                response.ParseFromString(response_bytes)
                logger.debug(
                    f"Received protobuf response from peer"
                    f" {peer_id}, type: {response.type}"
                )

                # Process protobuf response
                if (
                    response.type == Message.MessageType.GET_VALUE
                    and response.HasField("record")
                    and response.record.value
                ):
                    logger.debug(
                        f"Received value for key {key.hex()} from peer {peer_id}"
                    )
                    return response.record.value

                # Handle case where value is not found but peer infos are returned
                else:
                    logger.debug(
                        f"Value not found for key {key.hex()} from peer {peer_id},"
                        f" received {len(response.closerPeers)} closer peers"
                    )
                    return None

            except Exception as proto_err:
                logger.warning(f"Failed to parse as protobuf: {proto_err}")

            return None

        except Exception as e:
            logger.warning(f"Failed to get value from peer {peer_id}: {e}")
            return None

        finally:
            if stream:
                await stream.close()
    def remove(self, key: bytes) -> bool:
        """
        Remove a value from the DHT.

        :param key: The key to remove

        Returns
        -------
        bool
            True if the key was found and removed, False otherwise

        """
        if key in self.store:
            del self.store[key]
            logger.debug(f"Removed value for key {key.hex()[:8]}...")
            return True
        return False

    def has(self, key: bytes) -> bool:
        """
        Check if a key exists in the store and hasn't expired.

        :param key: The key to check

        Returns
        -------
        bool
            True if the key exists and hasn't expired, False otherwise

        """
        if key not in self.store:
            return False

        _, validity = self.store[key]
        if validity is not None and time.time() > validity:
            self.remove(key)
            return False

        return True

    def cleanup_expired(self) -> int:
        """
        Remove all expired values from the store.

        Returns
        -------
        int
            The number of expired values that were removed

        """
        current_time = time.time()
        expired_keys = [
            key for key, (_, validity) in self.store.items() if current_time > validity
        ]

        for key in expired_keys:
            del self.store[key]

        if expired_keys:
            logger.debug(f"Cleaned up {len(expired_keys)} expired values")

        return len(expired_keys)

    def get_keys(self) -> list[bytes]:
        """
        Get all non-expired keys in the store.

        Returns
        -------
        list[bytes]
            List of keys

        """
        # Clean up expired values first
        self.cleanup_expired()
        return list(self.store.keys())

    def size(self) -> int:
        """
        Get the number of items in the store (after removing expired entries).

        Returns
        -------
        int
            Number of items

        """
        self.cleanup_expired()
        return len(self.store)