diff --git a/libp2p/abc.py b/libp2p/abc.py index 0b63ed04..343ae0a7 100644 --- a/libp2p/abc.py +++ b/libp2p/abc.py @@ -479,18 +479,6 @@ class IAddrBook(ABC): """ - @abstractmethod - def set_addr(self, peer_id: ID, addr: Multiaddr, ttl: int) -> None: - """Set addr""" - - @abstractmethod - def set_addrs(self, peer_id: ID, addrs: Sequence[Multiaddr], ttl: int) -> None: - """Set addrs""" - - @abstractmethod - def update_addrs(self, peer_id: ID, oldTTL: int, newTTL: int) -> None: - """Update addrs""" - @abstractmethod def addr_stream(self, peer_id: ID) -> None: """Addr stream""" @@ -527,7 +515,7 @@ class IKeyBook(ABC): """peer_with_keys""" @abstractmethod - def clear_keydata(self, peer_id: ID) -> PublicKey: + def clear_keydata(self, peer_id: ID) -> None: """clear_keydata""" @@ -671,18 +659,6 @@ class IPeerStore(IPeerMetadata, IAddrBook, IKeyBook, IMetrics, IProtoBook): """ - @abstractmethod - def set_addr(self, peer_id: ID, addr: Multiaddr, ttl: int) -> None: - """set_addr""" - - @abstractmethod - def set_addrs(self, peer_id: ID, addrs: Sequence[Multiaddr], ttl: int) -> None: - """set_addrs""" - - @abstractmethod - def update_addrs(self, peer_id: ID, oldTTL: int, newTTL: int) -> None: - """update_addrs""" - @abstractmethod def addrs(self, peer_id: ID) -> list[Multiaddr]: """ @@ -835,7 +811,7 @@ class IPeerStore(IPeerMetadata, IAddrBook, IKeyBook, IMetrics, IProtoBook): """peer_with_keys""" @abstractmethod - def clear_keydata(self, peer_id: ID) -> PublicKey: + def clear_keydata(self, peer_id: ID) -> None: """clear_keydata""" ## @@ -1488,7 +1464,7 @@ class IPeerData(ABC): """ @abstractmethod - def add_addrs(self, addrs: Sequence[Multiaddr], ttl: int) -> None: + def add_addrs(self, addrs: Sequence[Multiaddr]) -> None: """ Add multiple multiaddresses to the peer's data. diff --git a/libp2p/peer/peerdata.py b/libp2p/peer/peerdata.py index bd0c4d0b..bf54a494 100644 --- a/libp2p/peer/peerdata.py +++ b/libp2p/peer/peerdata.py @@ -34,6 +34,7 @@ class PeerData(IPeerData): addrs: list[Multiaddr] last_identified: int ttl: int # Keep ttl=0 by default for always valid + latmap: float def __init__(self) -> None: self.pubkey = None @@ -43,6 +44,7 @@ class PeerData(IPeerData): self.addrs = [] self.last_identified = int(time.time()) self.ttl = 0 + self.latmap = 0 def get_protocols(self) -> list[str]: """ @@ -92,52 +94,13 @@ class PeerData(IPeerData): """Clear all protocols""" self.protocols = [] - def add_addrs(self, addrs: Sequence[Multiaddr], ttl: int) -> None: + def add_addrs(self, addrs: Sequence[Multiaddr]) -> None: """ :param addrs: multiaddresses to add """ - expiry = time.time() + ttl if ttl is not None else float("inf") for addr in addrs: if addr not in self.addrs: self.addrs.append(addr) - current_expiry = self.addrs_ttl.get(addr, 0) - if expiry > current_expiry: - self.addrs_ttl[addr] = expiry - - def set_addrs(self, addrs: Sequence[Multiaddr], ttl: int) -> None: - """ - :param addrs: multiaddresses to update - :param ttl: new ttl - """ - now = time.time() - - if ttl <= 0: - # Put the TTL value to -1 - for addr in addrs: - # TODO! if addr in self.addrs, remove them? - if addr in self.addrs_ttl: - del self.addrs_ttl[addr] - return - - expiry = now + ttl - for addr in addrs: - # TODO! if addr not in self.addrs, add them? - self.addrs_ttl[addr] = expiry - - def update_addrs(self, oldTTL: int, newTTL: int) -> None: - """ - :param oldTTL: old ttl - :param newTTL: new ttl - """ - now = time.time() - - new_expiry = now + newTTL - old_expiry = now + oldTTL - - for addr, expiry in list(self.addrs_ttl.items()): - # Approximate match by expiry time - if abs(expiry - old_expiry) < 1: - self.addrs_ttl[addr] = new_expiry def get_addrs(self) -> list[Multiaddr]: """ @@ -200,6 +163,36 @@ class PeerData(IPeerData): raise PeerDataError("private key not found") return self.privkey + def clear_keydata(self) -> None: + """Clears keydata""" + self.pubkey = None + self.privkey = None + + def record_latency(self, new_latency: float) -> None: + """ + Records a new latency measurement for the given peer + using Exponentially Weighted Moving Average (EWMA) + :param new_latency: the new latency value + """ + s = LATENCY_EWMA_SMOOTHING + if s > 1 or s < 0: + s = 0.1 + + if self.latmap is None: + self.latmap = new_latency + else: + prev = self.latmap + updated = ((1.0 - s) * prev) + (s * new_latency) + self.latmap = updated + + def latency_EWMA(self) -> float: + """Returns the latency EWMA value""" + return self.latmap + + def clear_metrics(self) -> None: + """Clear the latency metrics""" + self.latmap = 0 + def update_last_identified(self) -> None: self.last_identified = int(time.time()) diff --git a/libp2p/peer/peerstore.py b/libp2p/peer/peerstore.py index ada56f47..4539fc87 100644 --- a/libp2p/peer/peerstore.py +++ b/libp2p/peer/peerstore.py @@ -100,7 +100,17 @@ class PeerStore(IPeerStore): """ :return: all of the peer IDs stored in peer store """ - return list(self.peer_data_map.keys()) + peer_data = self.peer_data_map[peer_id] + return peer_data.supports_protocols(protocols) + + def first_supported_protocol(self, peer_id: ID, protocols: Sequence[str]) -> str: + peer_data = self.peer_data_map[peer_id] + return peer_data.first_supported_protocol(protocols) + + def clear_protocol_data(self, peer_id: ID) -> None: + """Clears prtocoldata""" + peer_data = self.peer_data_map[peer_id] + peer_data.clear_protocol_data() def valid_peer_ids(self) -> list[ID]: """ @@ -138,6 +148,11 @@ class PeerStore(IPeerStore): peer_data = self.peer_data_map[peer_id] peer_data.put_metadata(key, val) + def clear_metadata(self, peer_id: ID) -> None: + """Clears metadata""" + peer_data = self.peer_data_map[peer_id] + peer_data.clear_metadata() + def add_addr(self, peer_id: ID, addr: Multiaddr, ttl: int = 0) -> None: """ :param peer_id: peer ID to add address for diff --git a/libp2p/tools/async_service/base.py b/libp2p/tools/async_service/base.py index bd6c3ef0..a23f0e75 100644 --- a/libp2p/tools/async_service/base.py +++ b/libp2p/tools/async_service/base.py @@ -18,6 +18,7 @@ import sys from typing import ( Any, TypeVar, + cast, ) import uuid @@ -360,7 +361,7 @@ class BaseManager(InternalManagerAPI): # Only show stacktrace if this is **not** a DaemonTaskExit error exc_info=not isinstance(err, DaemonTaskExit), ) - self._errors.append(sys.exc_info()) + self._errors.append(cast(EXC_INFO, sys.exc_info())) self.cancel() else: if task.parent is None: diff --git a/libp2p/tools/async_service/trio_service.py b/libp2p/tools/async_service/trio_service.py index 61b5cb7a..3fdddb81 100644 --- a/libp2p/tools/async_service/trio_service.py +++ b/libp2p/tools/async_service/trio_service.py @@ -52,6 +52,7 @@ from .exceptions import ( LifecycleError, ) from .typing import ( + EXC_INFO, AsyncFn, ) @@ -231,7 +232,7 @@ class TrioManager(BaseManager): # Exceptions from any tasks spawned by our service will be # caught by trio and raised here, so we store them to report # together with any others we have already captured. - self._errors.append(sys.exc_info()) + self._errors.append(cast(EXC_INFO, sys.exc_info())) finally: system_nursery.cancel_scope.cancel()