added dedicated test file and moved timed_cache to tools

This commit is contained in:
Mystical
2025-03-15 13:19:59 +05:30
committed by Paul Robinson
parent bf699351e1
commit c86f3d0467
12 changed files with 201 additions and 67 deletions

View File

@ -17,7 +17,6 @@ Subpackages
libp2p.pubsub libp2p.pubsub
libp2p.security libp2p.security
libp2p.stream_muxer libp2p.stream_muxer
libp2p.timed_cache
libp2p.tools libp2p.tools
libp2p.transport libp2p.transport

View File

@ -1,37 +0,0 @@
libp2p.timed_cache package
==========================
Submodules
----------
libp2p.timed\_cache.basic\_time\_cache module
---------------------------------------------
.. automodule:: libp2p.timed_cache.basic_time_cache
:members:
:undoc-members:
:show-inheritance:
libp2p.timed\_cache.first\_seen\_cache module
---------------------------------------------
.. automodule:: libp2p.timed_cache.first_seen_cache
:members:
:undoc-members:
:show-inheritance:
libp2p.timed\_cache.last\_seen\_cache module
--------------------------------------------
.. automodule:: libp2p.timed_cache.last_seen_cache
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: libp2p.timed_cache
:members:
:undoc-members:
:show-inheritance:

View File

@ -9,6 +9,7 @@ Subpackages
libp2p.tools.async_service libp2p.tools.async_service
libp2p.tools.pubsub libp2p.tools.pubsub
libp2p.tools.timed_cache
Submodules Submodules
---------- ----------

View File

@ -0,0 +1,37 @@
libp2p.tools.timed\_cache package
=================================
Submodules
----------
libp2p.tools.timed\_cache.base\_timed\_cache module
---------------------------------------------------
.. automodule:: libp2p.tools.timed_cache.base_timed_cache
:members:
:undoc-members:
:show-inheritance:
libp2p.tools.timed\_cache.first\_seen\_cache module
---------------------------------------------------
.. automodule:: libp2p.tools.timed_cache.first_seen_cache
:members:
:undoc-members:
:show-inheritance:
libp2p.tools.timed\_cache.last\_seen\_cache module
--------------------------------------------------
.. automodule:: libp2p.tools.timed_cache.last_seen_cache
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: libp2p.tools.timed_cache
:members:
:undoc-members:
:show-inheritance:

View File

@ -88,7 +88,7 @@ class GossipSub(IPubsubRouter, Service):
degree: int, degree: int,
degree_low: int, degree_low: int,
degree_high: int, degree_high: int,
time_to_live: int, time_to_live: int = 60,
gossip_window: int = 3, gossip_window: int = 3,
gossip_history: int = 5, gossip_history: int = 5,
heartbeat_initial_delay: float = 0.1, heartbeat_initial_delay: float = 0.1,

View File

@ -11,11 +11,11 @@ import hashlib
import logging import logging
import time import time
from typing import ( from typing import (
TYPE_CHECKING,
Callable, Callable,
NamedTuple, NamedTuple,
cast, cast,
) )
from typing import Any # noqa: F401
import base58 import base58
import trio import trio
@ -26,6 +26,7 @@ from libp2p.abc import (
IPubsub, IPubsub,
ISubscriptionAPI, ISubscriptionAPI,
) )
from libp2p.abc import IPubsubRouter # noqa: F401
from libp2p.crypto.keys import ( from libp2p.crypto.keys import (
PrivateKey, PrivateKey,
) )
@ -53,12 +54,12 @@ from libp2p.network.stream.exceptions import (
from libp2p.peer.id import ( from libp2p.peer.id import (
ID, ID,
) )
from libp2p.timed_cache.last_seen_cache import (
LastSeenCache,
)
from libp2p.tools.async_service import ( from libp2p.tools.async_service import (
Service, Service,
) )
from libp2p.tools.timed_cache.last_seen_cache import (
LastSeenCache,
)
from libp2p.utils import ( from libp2p.utils import (
encode_varint_prefixed, encode_varint_prefixed,
read_varint_prefixed_bytes, read_varint_prefixed_bytes,
@ -78,12 +79,6 @@ from .validators import (
signature_validator, signature_validator,
) )
if TYPE_CHECKING:
from typing import Any # noqa: F401
from .abc import IPubsubRouter # noqa: F401
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/40e1c94708658b155f30cf99e4574f384756d83c/topic.go#L97 # noqa: E501 # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/40e1c94708658b155f30cf99e4574f384756d83c/topic.go#L97 # noqa: E501
SUBSCRIPTION_CHANNEL_SIZE = 32 SUBSCRIPTION_CHANNEL_SIZE = 32
@ -137,6 +132,7 @@ class Pubsub(Service, IPubsub):
router: IPubsubRouter, router: IPubsubRouter,
cache_size: int = None, cache_size: int = None,
seen_ttl: int = 120, seen_ttl: int = 120,
sweep_interval: int = 60,
strict_signing: bool = True, strict_signing: bool = True,
msg_id_constructor: Callable[ msg_id_constructor: Callable[
[rpc_pb2.Message], bytes [rpc_pb2.Message], bytes
@ -188,7 +184,7 @@ class Pubsub(Service, IPubsub):
else: else:
self.sign_key = None self.sign_key = None
self.seen_messages = LastSeenCache(seen_ttl) self.seen_messages = LastSeenCache(seen_ttl, sweep_interval)
# Map of topics we are subscribed to blocking queues # Map of topics we are subscribed to blocking queues
# for when the given topic receives a message # for when the given topic receives a message

View File

@ -424,7 +424,6 @@ class GossipsubFactory(factory.Factory):
degree = GOSSIPSUB_PARAMS.degree degree = GOSSIPSUB_PARAMS.degree
degree_low = GOSSIPSUB_PARAMS.degree_low degree_low = GOSSIPSUB_PARAMS.degree_low
degree_high = GOSSIPSUB_PARAMS.degree_high degree_high = GOSSIPSUB_PARAMS.degree_high
time_to_live = GOSSIPSUB_PARAMS.time_to_live
gossip_window = GOSSIPSUB_PARAMS.gossip_window gossip_window = GOSSIPSUB_PARAMS.gossip_window
gossip_history = GOSSIPSUB_PARAMS.gossip_history gossip_history = GOSSIPSUB_PARAMS.gossip_history
heartbeat_initial_delay = GOSSIPSUB_PARAMS.heartbeat_initial_delay heartbeat_initial_delay = GOSSIPSUB_PARAMS.heartbeat_initial_delay
@ -448,6 +447,7 @@ class PubsubFactory(factory.Factory):
router: IPubsubRouter, router: IPubsubRouter,
cache_size: int, cache_size: int,
seen_ttl: int, seen_ttl: int,
sweep_interval: int,
strict_signing: bool, strict_signing: bool,
msg_id_constructor: Callable[[rpc_pb2.Message], bytes] = None, msg_id_constructor: Callable[[rpc_pb2.Message], bytes] = None,
) -> AsyncIterator[Pubsub]: ) -> AsyncIterator[Pubsub]:
@ -456,6 +456,7 @@ class PubsubFactory(factory.Factory):
router=router, router=router,
cache_size=cache_size, cache_size=cache_size,
seen_ttl=seen_ttl, seen_ttl=seen_ttl,
sweep_interval=sweep_interval,
strict_signing=strict_signing, strict_signing=strict_signing,
msg_id_constructor=msg_id_constructor, msg_id_constructor=msg_id_constructor,
) )
@ -470,7 +471,8 @@ class PubsubFactory(factory.Factory):
number: int, number: int,
routers: Sequence[IPubsubRouter], routers: Sequence[IPubsubRouter],
cache_size: int = None, cache_size: int = None,
seen_ttl: int = None, seen_ttl: int = 120,
sweep_interval: int = 60,
strict_signing: bool = False, strict_signing: bool = False,
security_protocol: TProtocol = None, security_protocol: TProtocol = None,
muxer_opt: TMuxerOptions = None, muxer_opt: TMuxerOptions = None,
@ -488,6 +490,7 @@ class PubsubFactory(factory.Factory):
router, router,
cache_size, cache_size,
seen_ttl, seen_ttl,
sweep_interval,
strict_signing, strict_signing,
msg_id_constructor, msg_id_constructor,
) )
@ -503,6 +506,7 @@ class PubsubFactory(factory.Factory):
number: int, number: int,
cache_size: int = None, cache_size: int = None,
seen_ttl: int = 120, seen_ttl: int = 120,
sweep_interval: int = 60,
strict_signing: bool = False, strict_signing: bool = False,
protocols: Sequence[TProtocol] = None, protocols: Sequence[TProtocol] = None,
security_protocol: TProtocol = None, security_protocol: TProtocol = None,
@ -520,6 +524,7 @@ class PubsubFactory(factory.Factory):
floodsubs, floodsubs,
cache_size, cache_size,
seen_ttl, seen_ttl,
sweep_interval,
strict_signing, strict_signing,
security_protocol=security_protocol, security_protocol=security_protocol,
muxer_opt=muxer_opt, muxer_opt=muxer_opt,
@ -567,7 +572,6 @@ class PubsubFactory(factory.Factory):
degree=degree, degree=degree,
degree_low=degree_low, degree_low=degree_low,
degree_high=degree_high, degree_high=degree_high,
time_to_live=time_to_live,
gossip_window=gossip_window, gossip_window=gossip_window,
heartbeat_interval=heartbeat_interval, heartbeat_interval=heartbeat_interval,
) )

View File

@ -1,21 +1,24 @@
from abc import (
ABC,
abstractmethod,
)
import threading import threading
import time import time
class TimedCache: class BaseTimedCache(ABC):
"""Base class for Timed Cache with cleanup mechanism.""" """Base class for Timed Cache with cleanup mechanism."""
cache: dict[bytes, int] cache: dict[bytes, int]
SWEEP_INTERVAL = 60 # 1-minute interval between each sweep def __init__(self, ttl: int, sweep_interval: int = 60) -> None:
def __init__(self, ttl: int) -> None:
""" """
Initialize a new TimedCache with a time-to-live for cache entries Initialize a new BaseTimedCache with a time-to-live for cache entries
:param ttl: no of seconds as time-to-live for each cache entry :param ttl: no of seconds as time-to-live for each cache entry
""" """
self.ttl = ttl self.ttl = ttl
self.sweep_interval = sweep_interval
self.lock = threading.Lock() self.lock = threading.Lock()
self.cache = {} self.cache = {}
self._stop_event = threading.Event() self._stop_event = threading.Event()
@ -23,7 +26,7 @@ class TimedCache:
self._thread.start() self._thread.start()
def _background_cleanup(self) -> None: def _background_cleanup(self) -> None:
while not self._stop_event.wait(self.SWEEP_INTERVAL): while not self._stop_event.wait(self.sweep_interval):
self._sweep() self._sweep()
def _sweep(self) -> None: def _sweep(self) -> None:
@ -42,10 +45,10 @@ class TimedCache:
def length(self) -> int: def length(self) -> int:
return len(self.cache) return len(self.cache)
@abstractmethod
def add(self, key: bytes) -> bool: def add(self, key: bytes) -> bool:
"""To be implemented in subclasses.""" """To be implemented in subclasses."""
raise NotImplementedError
@abstractmethod
def has(self, key: bytes) -> bool: def has(self, key: bytes) -> bool:
"""To be implemented in subclasses.""" """To be implemented in subclasses."""
raise NotImplementedError

View File

@ -1,11 +1,11 @@
import time import time
from .basic_time_cache import ( from .base_timed_cache import (
TimedCache, BaseTimedCache,
) )
class FirstSeenCache(TimedCache): class FirstSeenCache(BaseTimedCache):
"""Cache where expiry is set only when first added.""" """Cache where expiry is set only when first added."""
def add(self, key: bytes) -> bool: def add(self, key: bytes) -> bool:

View File

@ -1,11 +1,11 @@
import time import time
from .basic_time_cache import ( from .base_timed_cache import (
TimedCache, BaseTimedCache,
) )
class LastSeenCache(TimedCache): class LastSeenCache(BaseTimedCache):
"""Cache where expiry is updated on every access.""" """Cache where expiry is updated on every access."""
def add(self, key: bytes) -> bool: def add(self, key: bytes) -> bool:

View File

@ -0,0 +1,131 @@
import pytest
import trio
from libp2p.tools.timed_cache.first_seen_cache import (
FirstSeenCache,
)
from libp2p.tools.timed_cache.last_seen_cache import (
LastSeenCache,
)
MSG_1 = b"msg1"
@pytest.mark.trio
async def test_simple_first_seen_cache():
"""Test that FirstSeenCache correctly stores and retrieves messages."""
cache = FirstSeenCache(ttl=2, sweep_interval=1)
assert cache.add(MSG_1) is True # First addition should return True
assert cache.has(MSG_1) is True # Should exist
assert cache.add(MSG_1) is False # Duplicate should return False
await trio.sleep(2.5) # Wait beyond TTL
assert cache.has(MSG_1) is False # Should be expired
cache.stop()
@pytest.mark.trio
async def test_simple_last_seen_cache():
"""Test that LastSeenCache correctly refreshes expiry when accessed."""
cache = LastSeenCache(ttl=2, sweep_interval=1)
assert cache.add(MSG_1) is True # First addition should return True
assert cache.has(MSG_1) is True # Should exist
await trio.sleep(1)
assert cache.has(MSG_1) is True # Accessing should extend TTL
await trio.sleep(1.5) # Would have expired if TTL wasn't extended
assert cache.has(MSG_1) is True # Should still exist
await trio.sleep(2.5) # Now let it expire
assert cache.has(MSG_1) is False # Should be expired
cache.stop()
@pytest.mark.trio
async def test_timed_cache_expiry():
"""Test expiry behavior in FirstSeenCache and LastSeenCache."""
for cache_class in [FirstSeenCache, LastSeenCache]:
cache = cache_class(ttl=1, sweep_interval=1)
assert cache.add(MSG_1) is True
await trio.sleep(1.5) # Let it expire
assert cache.has(MSG_1) is False # Should be expired
cache.stop()
@pytest.mark.trio
async def test_concurrent_access():
"""Test that multiple tasks can safely access and modify the cache."""
cache = LastSeenCache(ttl=2, sweep_interval=1)
async def add_message(i):
cache.add(f"msg{i}".encode())
assert cache.has(f"msg{i}".encode()) is True
async with trio.open_nursery() as nursery:
for i in range(50):
nursery.start_soon(add_message, i)
# Ensure all elements exist before expiry
for i in range(50):
assert cache.has(f"msg{i}".encode()) is True
cache.stop()
@pytest.mark.trio
async def test_timed_cache_stress_test():
"""Stress test cache by adding a large number of elements."""
cache = FirstSeenCache(ttl=2, sweep_interval=1)
for i in range(1000):
assert cache.add(f"msg{i}".encode()) is True # All should be added successfully
# Ensure all elements exist before expiry
for i in range(1000):
assert cache.has(f"msg{i}".encode()) is True
await trio.sleep(2.5) # Wait for expiry
# Ensure all elements have expired
for i in range(1000):
assert cache.has(f"msg{i}".encode()) is False
cache.stop()
@pytest.mark.trio
async def test_expiry_removal():
"""Test that expired items are removed by the background sweeper."""
cache = LastSeenCache(ttl=2, sweep_interval=1)
cache.add(MSG_1)
await trio.sleep(2.1) # Wait for sweeper to remove expired items
assert MSG_1 not in cache.cache # Should be removed
cache.stop()
@pytest.mark.trio
async def test_readding_after_expiry():
"""Test that an item can be re-added after expiry."""
cache = FirstSeenCache(ttl=2, sweep_interval=1)
cache.add(MSG_1)
await trio.sleep(2) # Let it expire
assert cache.add(MSG_1) is True # Should allow re-adding
assert cache.has(MSG_1) is True
cache.stop()
@pytest.mark.trio
async def test_multiple_adds_before_expiry():
"""Ensure multiple adds before expiry behave correctly."""
cache = LastSeenCache(ttl=5)
assert cache.add(MSG_1) is True
assert cache.add(MSG_1) is False # Second add should return False
assert cache.has(MSG_1) is True # Should still be in cache
cache.stop()