mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
Completed: CertifiedAddrBook interface with related tests
This commit is contained in:
134
libp2p/abc.py
134
libp2p/abc.py
@ -47,16 +47,13 @@ from libp2p.peer.peerinfo import (
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from libp2p.protocol_muxer.multiselect import Multiselect
|
||||
from libp2p.peer.envelope import Envelope
|
||||
from libp2p.peer.peer_record import PeerRecord
|
||||
from libp2p.pubsub.pubsub import (
|
||||
Pubsub,
|
||||
)
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from libp2p.protocol_muxer.multiselect import Multiselect
|
||||
|
||||
from libp2p.pubsub.pb import (
|
||||
rpc_pb2,
|
||||
)
|
||||
@ -495,6 +492,71 @@ class IAddrBook(ABC):
|
||||
"""
|
||||
|
||||
|
||||
# ------------------ certified-addr-book interface.py ---------------------
|
||||
class ICertifiedAddrBook(ABC):
|
||||
"""
|
||||
Interface for a certified address book.
|
||||
|
||||
Provides methods for managing signed peer records
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def consume_peer_record(self, envelope: Envelope, ttl: int) -> bool:
|
||||
"""
|
||||
Accept and store a signed PeerRecord, unless it's older than
|
||||
the one already stored.
|
||||
|
||||
This function:
|
||||
- Extracts the peer ID and sequence number from the envelope
|
||||
- Rejects the record if it's older (lower seq)
|
||||
- Updates the stored peer record and replaces associated
|
||||
addresses if accepted
|
||||
|
||||
|
||||
Parameters
|
||||
----------
|
||||
envelope:
|
||||
Signed envelope containing a PeerRecord.
|
||||
ttl:
|
||||
Time-to-live for the included multiaddrs (in seconds).
|
||||
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def get_peer_record(self, peer_id: ID) -> Envelope | None:
|
||||
"""
|
||||
Retrieve the most recent signed PeerRecord `Envelope` for a peer, if it exists
|
||||
and is still relevant.
|
||||
|
||||
First, it runs cleanup via `maybe_delete_peer_record` to purge stale data.
|
||||
Then it checks whether the peer has valid, unexpired addresses before
|
||||
returning the associated envelope.
|
||||
|
||||
|
||||
Parameters
|
||||
----------
|
||||
peer_id : ID
|
||||
The peer to look up.
|
||||
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def maybe_delete_peer_record(self, peer_id: ID) -> None:
|
||||
"""
|
||||
Delete the signed peer record for a peer if it has no know
|
||||
(non-expired) addresses.
|
||||
|
||||
This is a garbage collection mechanism: if all addresses for a peer have expired
|
||||
or been cleared, there's no point holding onto its signed `Envelope`
|
||||
|
||||
Parameters
|
||||
----------
|
||||
peer_id : ID
|
||||
The peer whose record we may delete.
|
||||
|
||||
"""
|
||||
|
||||
|
||||
# -------------------------- keybook interface.py --------------------------
|
||||
|
||||
|
||||
@ -760,7 +822,9 @@ class IProtoBook(ABC):
|
||||
# -------------------------- peerstore interface.py --------------------------
|
||||
|
||||
|
||||
class IPeerStore(IPeerMetadata, IAddrBook, IKeyBook, IMetrics, IProtoBook):
|
||||
class IPeerStore(
|
||||
IPeerMetadata, IAddrBook, ICertifiedAddrBook, IKeyBook, IMetrics, IProtoBook
|
||||
):
|
||||
"""
|
||||
Interface for a peer store.
|
||||
|
||||
@ -895,7 +959,65 @@ class IPeerStore(IPeerMetadata, IAddrBook, IKeyBook, IMetrics, IProtoBook):
|
||||
|
||||
"""
|
||||
|
||||
# --------CERTIFIED-ADDR-BOOK----------
|
||||
|
||||
@abstractmethod
|
||||
def consume_peer_record(self, envelope: Envelope, ttl: int) -> bool:
|
||||
"""
|
||||
Accept and store a signed PeerRecord, unless it's older
|
||||
than the one already stored.
|
||||
|
||||
This function:
|
||||
- Extracts the peer ID and sequence number from the envelope
|
||||
- Rejects the record if it's older (lower seq)
|
||||
- Updates the stored peer record and replaces associated addresses if accepted
|
||||
|
||||
|
||||
Parameters
|
||||
----------
|
||||
envelope:
|
||||
Signed envelope containing a PeerRecord.
|
||||
ttl:
|
||||
Time-to-live for the included multiaddrs (in seconds).
|
||||
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def get_peer_record(self, peer_id: ID) -> Envelope | None:
|
||||
"""
|
||||
Retrieve the most recent signed PeerRecord `Envelope` for a peer, if it exists
|
||||
and is still relevant.
|
||||
|
||||
First, it runs cleanup via `maybe_delete_peer_record` to purge stale data.
|
||||
Then it checks whether the peer has valid, unexpired addresses before
|
||||
returning the associated envelope.
|
||||
|
||||
|
||||
Parameters
|
||||
----------
|
||||
peer_id : ID
|
||||
The peer to look up.
|
||||
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def maybe_delete_peer_record(self, peer_id: ID) -> None:
|
||||
"""
|
||||
Delete the signed peer record for a peer if it has no
|
||||
know (non-expired) addresses.
|
||||
|
||||
This is a garbage collection mechanism: if all addresses for a peer have expired
|
||||
or been cleared, there's no point holding onto its signed `Envelope`
|
||||
|
||||
Parameters
|
||||
----------
|
||||
peer_id : ID
|
||||
The peer whose record we may delete.
|
||||
|
||||
"""
|
||||
|
||||
# --------KEY-BOOK----------
|
||||
|
||||
@abstractmethod
|
||||
def pubkey(self, peer_id: ID) -> PublicKey:
|
||||
"""
|
||||
|
||||
@ -23,6 +23,7 @@ from libp2p.crypto.keys import (
|
||||
PrivateKey,
|
||||
PublicKey,
|
||||
)
|
||||
from libp2p.peer.envelope import Envelope
|
||||
|
||||
from .id import (
|
||||
ID,
|
||||
@ -38,12 +39,24 @@ from .peerinfo import (
|
||||
PERMANENT_ADDR_TTL = 0
|
||||
|
||||
|
||||
# TODO: Set up an async task for periodic peer-store cleanup
|
||||
# for expired addresses and records.
|
||||
class PeerRecordState:
|
||||
envelope: Envelope
|
||||
seq: int
|
||||
|
||||
def __init__(self, envelope: Envelope, seq: int):
|
||||
self.envelope = envelope
|
||||
self.seq = seq
|
||||
|
||||
|
||||
class PeerStore(IPeerStore):
|
||||
peer_data_map: dict[ID, PeerData]
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.peer_data_map = defaultdict(PeerData)
|
||||
self.addr_update_channels: dict[ID, MemorySendChannel[Multiaddr]] = {}
|
||||
self.peer_record_map: dict[ID, PeerRecordState] = {}
|
||||
|
||||
def peer_info(self, peer_id: ID) -> PeerInfo:
|
||||
"""
|
||||
@ -165,6 +178,76 @@ class PeerStore(IPeerStore):
|
||||
peer_data = self.peer_data_map[peer_id]
|
||||
peer_data.clear_metadata()
|
||||
|
||||
# -----CERT-ADDR-BOOK-----
|
||||
|
||||
# TODO: Make proper use of this function
|
||||
def maybe_delete_peer_record(self, peer_id: ID) -> None:
|
||||
"""
|
||||
Delete the signed peer record for a peer if it has no know
|
||||
(non-expired) addresses.
|
||||
|
||||
This is a garbage collection mechanism: if all addresses for a peer have expired
|
||||
or been cleared, there's no point holding onto its signed `Envelope`
|
||||
|
||||
:param peer_id: The peer whose record we may delete/
|
||||
"""
|
||||
if not self.addrs(peer_id):
|
||||
self.peer_record_map.pop(peer_id, None)
|
||||
|
||||
def consume_peer_record(self, envelope: Envelope, ttl: int) -> bool:
|
||||
"""
|
||||
Accept and store a signed PeerRecord, unless it's older than
|
||||
the one already stored.
|
||||
|
||||
This function:
|
||||
- Extracts the peer ID and sequence number from the envelope
|
||||
- Rejects the record if it's older (lower seq)
|
||||
- Updates the stored peer record and replaces associated addresses if accepted
|
||||
|
||||
:param envelope: Signed envelope containing a PeerRecord.
|
||||
:param ttl: Time-to-live for the included multiaddrs (in seconds).
|
||||
:return: True if the record was accepted and stored; False if it was rejected.
|
||||
"""
|
||||
record = envelope.record()
|
||||
peer_id = record.peer_id
|
||||
|
||||
# TODO: Put up a limit on the number of records to be stored ?
|
||||
existing = self.peer_record_map.get(peer_id)
|
||||
if existing and existing.seq > record.seq:
|
||||
return False # reject older record
|
||||
|
||||
# TODO: In case of overwriting a record, what should be do with the
|
||||
# old addresses, do we overwrite them with the new addresses too ?
|
||||
self.peer_record_map[peer_id] = PeerRecordState(envelope, record.seq)
|
||||
self.add_addrs(peer_id, record.addrs, ttl)
|
||||
|
||||
return True
|
||||
|
||||
def get_peer_record(self, peer_id: ID) -> Envelope | None:
|
||||
"""
|
||||
Retrieve the most recent signed PeerRecord `Envelope` for a peer, if it exists
|
||||
and is still relevant.
|
||||
|
||||
First, it runs cleanup via `maybe_delete_peer_record` to purge stale data.
|
||||
Then it checks whether the peer has valid, unexpired addresses before
|
||||
returning the associated envelope.
|
||||
|
||||
:param peer_id: The peer to look up.
|
||||
:return: The signed Envelope if the peer is known and has valid
|
||||
addresses; None otherwise.
|
||||
"""
|
||||
self.maybe_delete_peer_record(peer_id)
|
||||
|
||||
# Check if the peer has any valid addresses
|
||||
if (
|
||||
peer_id in self.peer_data_map
|
||||
and not self.peer_data_map[peer_id].is_expired()
|
||||
):
|
||||
state = self.peer_record_map.get(peer_id)
|
||||
if state is not None:
|
||||
return state.envelope
|
||||
return None
|
||||
|
||||
# -------ADDR-BOOK--------
|
||||
|
||||
def add_addr(self, peer_id: ID, addr: Multiaddr, ttl: int = 0) -> None:
|
||||
@ -193,6 +276,8 @@ class PeerStore(IPeerStore):
|
||||
except trio.WouldBlock:
|
||||
pass # Or consider logging / dropping / replacing stream
|
||||
|
||||
self.maybe_delete_peer_record(peer_id)
|
||||
|
||||
def addrs(self, peer_id: ID) -> list[Multiaddr]:
|
||||
"""
|
||||
:param peer_id: peer ID to get addrs for
|
||||
|
||||
@ -3,7 +3,10 @@ from multiaddr import (
|
||||
Multiaddr,
|
||||
)
|
||||
|
||||
from libp2p.crypto.rsa import create_new_key_pair
|
||||
from libp2p.peer.envelope import Envelope, seal_record
|
||||
from libp2p.peer.id import ID
|
||||
from libp2p.peer.peer_record import PeerRecord
|
||||
from libp2p.peer.peerstore import (
|
||||
PeerStore,
|
||||
PeerStoreError,
|
||||
@ -84,3 +87,49 @@ def test_peers_with_addrs():
|
||||
store.clear_addrs(ID(b"peer2"))
|
||||
|
||||
assert set(store.peers_with_addrs()) == {ID(b"peer3")}
|
||||
|
||||
|
||||
def test_ceritified_addr_book():
|
||||
store = PeerStore()
|
||||
|
||||
key_pair = create_new_key_pair()
|
||||
peer_id = ID.from_pubkey(key_pair.public_key)
|
||||
addrs = [
|
||||
Multiaddr("/ip4/127.0.0.1/tcp/9000"),
|
||||
Multiaddr("/ip4/127.0.0.1/tcp/9001"),
|
||||
]
|
||||
ttl = 60
|
||||
|
||||
# Construct signed PereRecord
|
||||
record = PeerRecord(peer_id, addrs, 21)
|
||||
envelope = seal_record(record, key_pair.private_key)
|
||||
|
||||
result = store.consume_peer_record(envelope, ttl)
|
||||
assert result is True
|
||||
|
||||
# Retrieve the record
|
||||
retrieved = store.get_peer_record(peer_id)
|
||||
assert retrieved is not None
|
||||
assert isinstance(retrieved, Envelope)
|
||||
|
||||
addr_list = store.addrs(peer_id)
|
||||
assert set(addr_list) == set(addrs)
|
||||
|
||||
# Now try to push an older record (should be rejected)
|
||||
old_record = PeerRecord(peer_id, [Multiaddr("/ip4/10.0.0.1/tcp/4001")], 20)
|
||||
old_envelope = seal_record(old_record, key_pair.private_key)
|
||||
result = store.consume_peer_record(old_envelope, ttl)
|
||||
assert result is False
|
||||
|
||||
# Push a new record (should override)
|
||||
new_addrs = [Multiaddr("/ip4/192.168.0.1/tcp/5001")]
|
||||
new_record = PeerRecord(peer_id, new_addrs, 23)
|
||||
new_envelope = seal_record(new_record, key_pair.private_key)
|
||||
result = store.consume_peer_record(new_envelope, ttl)
|
||||
assert result is True
|
||||
|
||||
# Confirm the record is updated
|
||||
latest = store.get_peer_record(peer_id)
|
||||
assert isinstance(latest, Envelope)
|
||||
assert latest.record().seq == 23
|
||||
assert set(new_addrs).issubset(set(store.addrs(peer_id)))
|
||||
|
||||
Reference in New Issue
Block a user