mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-09 22:50:54 +00:00
Merge branch 'main' into px-backoff
This commit is contained in:
@ -200,7 +200,9 @@ def new_swarm(
|
||||
key_pair, noise_privkey=noise_key_pair.private_key
|
||||
),
|
||||
TProtocol(secio.ID): secio.Transport(key_pair),
|
||||
TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair),
|
||||
TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(
|
||||
key_pair, peerstore=peerstore_opt
|
||||
),
|
||||
}
|
||||
|
||||
# Use given muxer preference if provided, otherwise use global default
|
||||
|
||||
14
libp2p/kad_dht/common.py
Normal file
14
libp2p/kad_dht/common.py
Normal file
@ -0,0 +1,14 @@
|
||||
"""
|
||||
Shared constants and protocol parameters for the Kademlia DHT.
|
||||
"""
|
||||
|
||||
from libp2p.custom_types import (
|
||||
TProtocol,
|
||||
)
|
||||
|
||||
# Constants for the Kademlia algorithm
|
||||
ALPHA = 3 # Concurrency parameter
|
||||
PROTOCOL_ID = TProtocol("/ipfs/kad/1.0.0")
|
||||
QUERY_TIMEOUT = 10
|
||||
|
||||
TTL = DEFAULT_TTL = 24 * 60 * 60 # 24 hours in seconds
|
||||
@ -5,7 +5,9 @@ This module provides a complete Distributed Hash Table (DHT)
|
||||
implementation based on the Kademlia algorithm and protocol.
|
||||
"""
|
||||
|
||||
from enum import Enum
|
||||
from enum import (
|
||||
Enum,
|
||||
)
|
||||
import logging
|
||||
import time
|
||||
|
||||
@ -18,9 +20,6 @@ import varint
|
||||
from libp2p.abc import (
|
||||
IHost,
|
||||
)
|
||||
from libp2p.custom_types import (
|
||||
TProtocol,
|
||||
)
|
||||
from libp2p.network.stream.net_stream import (
|
||||
INetStream,
|
||||
)
|
||||
@ -34,6 +33,11 @@ from libp2p.tools.async_service import (
|
||||
Service,
|
||||
)
|
||||
|
||||
from .common import (
|
||||
ALPHA,
|
||||
PROTOCOL_ID,
|
||||
QUERY_TIMEOUT,
|
||||
)
|
||||
from .pb.kademlia_pb2 import (
|
||||
Message,
|
||||
)
|
||||
@ -53,11 +57,7 @@ from .value_store import (
|
||||
logger = logging.getLogger("kademlia-example.kad_dht")
|
||||
# logger = logging.getLogger("libp2p.kademlia")
|
||||
# Default parameters
|
||||
PROTOCOL_ID = TProtocol("/ipfs/kad/1.0.0")
|
||||
ROUTING_TABLE_REFRESH_INTERVAL = 1 * 60 # 1 min in seconds for testing
|
||||
TTL = 24 * 60 * 60 # 24 hours in seconds
|
||||
ALPHA = 3
|
||||
QUERY_TIMEOUT = 10 # seconds
|
||||
ROUTING_TABLE_REFRESH_INTERVAL = 60 # 1 min in seconds for testing
|
||||
|
||||
|
||||
class DHTMode(Enum):
|
||||
|
||||
@ -15,9 +15,6 @@ from libp2p.abc import (
|
||||
INetStream,
|
||||
IPeerRouting,
|
||||
)
|
||||
from libp2p.custom_types import (
|
||||
TProtocol,
|
||||
)
|
||||
from libp2p.peer.id import (
|
||||
ID,
|
||||
)
|
||||
@ -25,6 +22,10 @@ from libp2p.peer.peerinfo import (
|
||||
PeerInfo,
|
||||
)
|
||||
|
||||
from .common import (
|
||||
ALPHA,
|
||||
PROTOCOL_ID,
|
||||
)
|
||||
from .pb.kademlia_pb2 import (
|
||||
Message,
|
||||
)
|
||||
@ -38,10 +39,7 @@ from .utils import (
|
||||
# logger = logging.getLogger("libp2p.kademlia.peer_routing")
|
||||
logger = logging.getLogger("kademlia-example.peer_routing")
|
||||
|
||||
# Constants for the Kademlia algorithm
|
||||
ALPHA = 3 # Concurrency parameter
|
||||
MAX_PEER_LOOKUP_ROUNDS = 20 # Maximum number of rounds in peer lookup
|
||||
PROTOCOL_ID = TProtocol("/ipfs/kad/1.0.0")
|
||||
|
||||
|
||||
class PeerRouting(IPeerRouting):
|
||||
@ -62,7 +60,6 @@ class PeerRouting(IPeerRouting):
|
||||
"""
|
||||
self.host = host
|
||||
self.routing_table = routing_table
|
||||
self.protocol_id = PROTOCOL_ID
|
||||
|
||||
async def find_peer(self, peer_id: ID) -> PeerInfo | None:
|
||||
"""
|
||||
@ -247,7 +244,7 @@ class PeerRouting(IPeerRouting):
|
||||
# Open a stream to the peer using the Kademlia protocol
|
||||
logger.debug(f"Opening stream to {peer} for closest peers query")
|
||||
try:
|
||||
stream = await self.host.new_stream(peer, [self.protocol_id])
|
||||
stream = await self.host.new_stream(peer, [PROTOCOL_ID])
|
||||
logger.debug(f"Stream opened to {peer}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to open stream to {peer}: {e}")
|
||||
|
||||
@ -29,6 +29,11 @@ from libp2p.peer.peerinfo import (
|
||||
PeerInfo,
|
||||
)
|
||||
|
||||
from .common import (
|
||||
ALPHA,
|
||||
PROTOCOL_ID,
|
||||
QUERY_TIMEOUT,
|
||||
)
|
||||
from .pb.kademlia_pb2 import (
|
||||
Message,
|
||||
)
|
||||
@ -40,9 +45,6 @@ logger = logging.getLogger("kademlia-example.provider_store")
|
||||
PROVIDER_RECORD_REPUBLISH_INTERVAL = 22 * 60 * 60 # 22 hours in seconds
|
||||
PROVIDER_RECORD_EXPIRATION_INTERVAL = 48 * 60 * 60 # 48 hours in seconds
|
||||
PROVIDER_ADDRESS_TTL = 30 * 60 # 30 minutes in seconds
|
||||
PROTOCOL_ID = TProtocol("/ipfs/kad/1.0.0")
|
||||
ALPHA = 3 # Number of parallel queries/advertisements
|
||||
QUERY_TIMEOUT = 10 # Timeout for each query in seconds
|
||||
|
||||
|
||||
class ProviderRecord:
|
||||
|
||||
@ -13,10 +13,9 @@ import trio
|
||||
from libp2p.abc import (
|
||||
IHost,
|
||||
)
|
||||
from libp2p.custom_types import (
|
||||
TProtocol,
|
||||
from libp2p.kad_dht.utils import (
|
||||
xor_distance,
|
||||
)
|
||||
from libp2p.kad_dht.utils import xor_distance
|
||||
from libp2p.peer.id import (
|
||||
ID,
|
||||
)
|
||||
@ -24,6 +23,9 @@ from libp2p.peer.peerinfo import (
|
||||
PeerInfo,
|
||||
)
|
||||
|
||||
from .common import (
|
||||
PROTOCOL_ID,
|
||||
)
|
||||
from .pb.kademlia_pb2 import (
|
||||
Message,
|
||||
)
|
||||
@ -242,12 +244,9 @@ class KBucket:
|
||||
if not peer_info:
|
||||
raise ValueError(f"Peer {peer_id} not in bucket")
|
||||
|
||||
# Default protocol ID for Kademlia DHT
|
||||
protocol_id = TProtocol("/ipfs/kad/1.0.0")
|
||||
|
||||
try:
|
||||
# Open a stream to the peer with the DHT protocol
|
||||
stream = await self.host.new_stream(peer_id, [protocol_id])
|
||||
stream = await self.host.new_stream(peer_id, [PROTOCOL_ID])
|
||||
|
||||
try:
|
||||
# Create ping protobuf message
|
||||
|
||||
@ -19,6 +19,10 @@ from libp2p.peer.id import (
|
||||
ID,
|
||||
)
|
||||
|
||||
from .common import (
|
||||
DEFAULT_TTL,
|
||||
PROTOCOL_ID,
|
||||
)
|
||||
from .pb.kademlia_pb2 import (
|
||||
Message,
|
||||
)
|
||||
@ -26,10 +30,6 @@ from .pb.kademlia_pb2 import (
|
||||
# logger = logging.getLogger("libp2p.kademlia.value_store")
|
||||
logger = logging.getLogger("kademlia-example.value_store")
|
||||
|
||||
# Default time to live for values in seconds (24 hours)
|
||||
DEFAULT_TTL = 24 * 60 * 60
|
||||
PROTOCOL_ID = TProtocol("/ipfs/kad/1.0.0")
|
||||
|
||||
|
||||
class ValueStore:
|
||||
"""
|
||||
|
||||
@ -1,4 +1,7 @@
|
||||
from collections.abc import Callable
|
||||
|
||||
from libp2p.abc import (
|
||||
IPeerStore,
|
||||
IRawConnection,
|
||||
ISecureConn,
|
||||
)
|
||||
@ -6,6 +9,7 @@ from libp2p.crypto.exceptions import (
|
||||
MissingDeserializerError,
|
||||
)
|
||||
from libp2p.crypto.keys import (
|
||||
KeyPair,
|
||||
PrivateKey,
|
||||
PublicKey,
|
||||
)
|
||||
@ -30,11 +34,15 @@ from libp2p.network.connection.exceptions import (
|
||||
from libp2p.peer.id import (
|
||||
ID,
|
||||
)
|
||||
from libp2p.peer.peerstore import (
|
||||
PeerStoreError,
|
||||
)
|
||||
from libp2p.security.base_session import (
|
||||
BaseSession,
|
||||
)
|
||||
from libp2p.security.base_transport import (
|
||||
BaseSecureTransport,
|
||||
default_secure_bytes_provider,
|
||||
)
|
||||
from libp2p.security.exceptions import (
|
||||
HandshakeFailure,
|
||||
@ -102,6 +110,7 @@ async def run_handshake(
|
||||
conn: IRawConnection,
|
||||
is_initiator: bool,
|
||||
remote_peer_id: ID | None,
|
||||
peerstore: IPeerStore | None = None,
|
||||
) -> ISecureConn:
|
||||
"""Raise `HandshakeFailure` when handshake failed."""
|
||||
msg = make_exchange_message(local_private_key.get_public_key())
|
||||
@ -164,7 +173,14 @@ async def run_handshake(
|
||||
conn=conn,
|
||||
)
|
||||
|
||||
# TODO: Store `pubkey` and `peer_id` to `PeerStore`
|
||||
# Store `pubkey` and `peer_id` to `PeerStore`
|
||||
if peerstore is not None:
|
||||
try:
|
||||
peerstore.add_pubkey(received_peer_id, received_pubkey)
|
||||
except PeerStoreError:
|
||||
# If peer ID and pubkey don't match, it would have already been caught above
|
||||
# This might happen if the peer is already in the store
|
||||
pass
|
||||
|
||||
return secure_conn
|
||||
|
||||
@ -175,6 +191,18 @@ class InsecureTransport(BaseSecureTransport):
|
||||
transport does not add any additional security.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
local_key_pair: KeyPair,
|
||||
secure_bytes_provider: Callable[[int], bytes] | None = None,
|
||||
peerstore: IPeerStore | None = None,
|
||||
) -> None:
|
||||
# If secure_bytes_provider is None, use the default one
|
||||
if secure_bytes_provider is None:
|
||||
secure_bytes_provider = default_secure_bytes_provider
|
||||
super().__init__(local_key_pair, secure_bytes_provider)
|
||||
self.peerstore = peerstore
|
||||
|
||||
async def secure_inbound(self, conn: IRawConnection) -> ISecureConn:
|
||||
"""
|
||||
Secure the connection, either locally or by communicating with opposing
|
||||
@ -183,8 +211,9 @@ class InsecureTransport(BaseSecureTransport):
|
||||
|
||||
:return: secure connection object (that implements secure_conn_interface)
|
||||
"""
|
||||
# For inbound connections, we don't know the remote peer ID yet
|
||||
return await run_handshake(
|
||||
self.local_peer, self.local_private_key, conn, False, None
|
||||
self.local_peer, self.local_private_key, conn, False, None, self.peerstore
|
||||
)
|
||||
|
||||
async def secure_outbound(self, conn: IRawConnection, peer_id: ID) -> ISecureConn:
|
||||
@ -195,7 +224,7 @@ class InsecureTransport(BaseSecureTransport):
|
||||
:return: secure connection object (that implements secure_conn_interface)
|
||||
"""
|
||||
return await run_handshake(
|
||||
self.local_peer, self.local_private_key, conn, True, peer_id
|
||||
self.local_peer, self.local_private_key, conn, True, peer_id, self.peerstore
|
||||
)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user