mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-10 15:10:54 +00:00
enforce_peer_record_limit
This commit is contained in:
@ -158,7 +158,7 @@ def pub_key_from_protobuf(pb_key: cryto_pb.PublicKey) -> PublicKey:
|
|||||||
return RSAPublicKey.from_bytes(pb_key.Data)
|
return RSAPublicKey.from_bytes(pb_key.Data)
|
||||||
elif pb_key.Type == cryto_pb.KeyType.Secp256k1:
|
elif pb_key.Type == cryto_pb.KeyType.Secp256k1:
|
||||||
return Secp256k1PublicKey.from_bytes(pb_key.Data)
|
return Secp256k1PublicKey.from_bytes(pb_key.Data)
|
||||||
# TODO: Add suport fot ECDSA parsing also
|
# libp2p.crypto.ecdsa not implemented
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Unknown key type: {pb_key.Type}")
|
raise ValueError(f"Unknown key type: {pb_key.Type}")
|
||||||
|
|
||||||
|
|||||||
@ -53,10 +53,11 @@ class PeerRecordState:
|
|||||||
class PeerStore(IPeerStore):
|
class PeerStore(IPeerStore):
|
||||||
peer_data_map: dict[ID, PeerData]
|
peer_data_map: dict[ID, PeerData]
|
||||||
|
|
||||||
def __init__(self) -> None:
|
def __init__(self, max_records: int = 10000) -> None:
|
||||||
self.peer_data_map = defaultdict(PeerData)
|
self.peer_data_map = defaultdict(PeerData)
|
||||||
self.addr_update_channels: dict[ID, MemorySendChannel[Multiaddr]] = {}
|
self.addr_update_channels: dict[ID, MemorySendChannel[Multiaddr]] = {}
|
||||||
self.peer_record_map: dict[ID, PeerRecordState] = {}
|
self.peer_record_map: dict[ID, PeerRecordState] = {}
|
||||||
|
self.max_records = max_records
|
||||||
|
|
||||||
def peer_info(self, peer_id: ID) -> PeerInfo:
|
def peer_info(self, peer_id: ID) -> PeerInfo:
|
||||||
"""
|
"""
|
||||||
@ -95,6 +96,18 @@ class PeerStore(IPeerStore):
|
|||||||
peer_data.clear_addrs()
|
peer_data.clear_addrs()
|
||||||
return valid_peer_ids
|
return valid_peer_ids
|
||||||
|
|
||||||
|
def _enforce_record_limit(self) -> None:
|
||||||
|
"""Enforce maximum number of stored records."""
|
||||||
|
if len(self.peer_record_map) > self.max_records:
|
||||||
|
# Record oldest records based on seequence number
|
||||||
|
sorted_records = sorted(
|
||||||
|
self.peer_record_map.items(), key=lambda x: x[1].seq
|
||||||
|
)
|
||||||
|
records_to_remove = len(self.peer_record_map) - self.max_records
|
||||||
|
for peer_id, _ in sorted_records[:records_to_remove]:
|
||||||
|
self.maybe_delete_peer_record(peer_id)
|
||||||
|
del self.peer_record_map[peer_id]
|
||||||
|
|
||||||
# --------PROTO-BOOK--------
|
# --------PROTO-BOOK--------
|
||||||
|
|
||||||
def get_protocols(self, peer_id: ID) -> list[str]:
|
def get_protocols(self, peer_id: ID) -> list[str]:
|
||||||
@ -211,15 +224,27 @@ class PeerStore(IPeerStore):
|
|||||||
record = envelope.record()
|
record = envelope.record()
|
||||||
peer_id = record.peer_id
|
peer_id = record.peer_id
|
||||||
|
|
||||||
# TODO: Put up a limit on the number of records to be stored ?
|
|
||||||
existing = self.peer_record_map.get(peer_id)
|
existing = self.peer_record_map.get(peer_id)
|
||||||
if existing and existing.seq > record.seq:
|
if existing and existing.seq > record.seq:
|
||||||
return False # reject older record
|
return False # reject older record
|
||||||
|
|
||||||
|
# Merge new addresses with existing ones if peer exists
|
||||||
|
if peer_id in self.peer_data_map:
|
||||||
|
try:
|
||||||
|
existing_addrs = set(self.addrs(peer_id))
|
||||||
|
except PeerStoreError:
|
||||||
|
existing_addrs = set()
|
||||||
|
else:
|
||||||
|
existing_addrs = set()
|
||||||
|
|
||||||
|
new_addrs = set(record.addrs)
|
||||||
|
merged_addrs = list(existing_addrs.union(new_addrs))
|
||||||
|
|
||||||
# TODO: In case of overwriting a record, what should be do with the
|
# TODO: In case of overwriting a record, what should be do with the
|
||||||
# old addresses, do we overwrite them with the new addresses too ?
|
# old addresses, do we overwrite them with the new addresses too ?
|
||||||
self.peer_record_map[peer_id] = PeerRecordState(envelope, record.seq)
|
self.peer_record_map[peer_id] = PeerRecordState(envelope, record.seq)
|
||||||
self.add_addrs(peer_id, record.addrs, ttl)
|
self.clear_addrs(peer_id)
|
||||||
|
self.add_addrs(peer_id, merged_addrs, ttl)
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|||||||
@ -106,8 +106,8 @@ def test_ceritified_addr_book():
|
|||||||
|
|
||||||
result = store.consume_peer_record(envelope, ttl)
|
result = store.consume_peer_record(envelope, ttl)
|
||||||
assert result is True
|
assert result is True
|
||||||
|
|
||||||
# Retrieve the record
|
# Retrieve the record
|
||||||
|
|
||||||
retrieved = store.get_peer_record(peer_id)
|
retrieved = store.get_peer_record(peer_id)
|
||||||
assert retrieved is not None
|
assert retrieved is not None
|
||||||
assert isinstance(retrieved, Envelope)
|
assert isinstance(retrieved, Envelope)
|
||||||
@ -132,4 +132,8 @@ def test_ceritified_addr_book():
|
|||||||
latest = store.get_peer_record(peer_id)
|
latest = store.get_peer_record(peer_id)
|
||||||
assert isinstance(latest, Envelope)
|
assert isinstance(latest, Envelope)
|
||||||
assert latest.record().seq == 23
|
assert latest.record().seq == 23
|
||||||
assert set(new_addrs).issubset(set(store.addrs(peer_id)))
|
|
||||||
|
# Merged addresses = old addres + new_addrs
|
||||||
|
expected_addrs = set(addrs).union(set(new_addrs))
|
||||||
|
actual_addrs = set(store.addrs(peer_id))
|
||||||
|
assert actual_addrs == expected_addrs
|
||||||
|
|||||||
Reference in New Issue
Block a user