From c86f3d0467d7b5e8b0af85c1748dc48988b9eb63 Mon Sep 17 00:00:00 2001 From: Mystical <125946525+mystical-prog@users.noreply.github.com> Date: Sat, 15 Mar 2025 13:19:59 +0530 Subject: [PATCH] added dedicated test file and moved timed_cache to tools --- docs/libp2p.rst | 1 - docs/libp2p.timed_cache.rst | 37 ----- docs/libp2p.tools.rst | 1 + docs/libp2p.tools.timed_cache.rst | 37 +++++ libp2p/pubsub/gossipsub.py | 2 +- libp2p/pubsub/pubsub.py | 18 +-- libp2p/tools/factories.py | 10 +- libp2p/{ => tools}/timed_cache/__init__.py | 0 .../timed_cache/base_timed_cache.py} | 19 +-- .../timed_cache/first_seen_cache.py | 6 +- .../timed_cache/last_seen_cache.py | 6 +- .../tools/timed_cache/test_timed_cache.py | 131 ++++++++++++++++++ 12 files changed, 201 insertions(+), 67 deletions(-) delete mode 100644 docs/libp2p.timed_cache.rst create mode 100644 docs/libp2p.tools.timed_cache.rst rename libp2p/{ => tools}/timed_cache/__init__.py (100%) rename libp2p/{timed_cache/basic_time_cache.py => tools/timed_cache/base_timed_cache.py} (76%) rename libp2p/{ => tools}/timed_cache/first_seen_cache.py (81%) rename libp2p/{ => tools}/timed_cache/last_seen_cache.py (84%) create mode 100644 tests/core/tools/timed_cache/test_timed_cache.py diff --git a/docs/libp2p.rst b/docs/libp2p.rst index 1c489d6d..8b1a8f08 100644 --- a/docs/libp2p.rst +++ b/docs/libp2p.rst @@ -17,7 +17,6 @@ Subpackages libp2p.pubsub libp2p.security libp2p.stream_muxer - libp2p.timed_cache libp2p.tools libp2p.transport diff --git a/docs/libp2p.timed_cache.rst b/docs/libp2p.timed_cache.rst deleted file mode 100644 index 8d93e90d..00000000 --- a/docs/libp2p.timed_cache.rst +++ /dev/null @@ -1,37 +0,0 @@ -libp2p.timed_cache package -========================== - -Submodules ----------- - -libp2p.timed\_cache.basic\_time\_cache module ---------------------------------------------- - -.. automodule:: libp2p.timed_cache.basic_time_cache - :members: - :undoc-members: - :show-inheritance: - -libp2p.timed\_cache.first\_seen\_cache module ---------------------------------------------- - -.. automodule:: libp2p.timed_cache.first_seen_cache - :members: - :undoc-members: - :show-inheritance: - -libp2p.timed\_cache.last\_seen\_cache module --------------------------------------------- - -.. automodule:: libp2p.timed_cache.last_seen_cache - :members: - :undoc-members: - :show-inheritance: - -Module contents ---------------- - -.. automodule:: libp2p.timed_cache - :members: - :undoc-members: - :show-inheritance: diff --git a/docs/libp2p.tools.rst b/docs/libp2p.tools.rst index b412db74..c0457fb8 100644 --- a/docs/libp2p.tools.rst +++ b/docs/libp2p.tools.rst @@ -9,6 +9,7 @@ Subpackages libp2p.tools.async_service libp2p.tools.pubsub + libp2p.tools.timed_cache Submodules ---------- diff --git a/docs/libp2p.tools.timed_cache.rst b/docs/libp2p.tools.timed_cache.rst new file mode 100644 index 00000000..456af190 --- /dev/null +++ b/docs/libp2p.tools.timed_cache.rst @@ -0,0 +1,37 @@ +libp2p.tools.timed\_cache package +================================= + +Submodules +---------- + +libp2p.tools.timed\_cache.base\_timed\_cache module +--------------------------------------------------- + +.. automodule:: libp2p.tools.timed_cache.base_timed_cache + :members: + :undoc-members: + :show-inheritance: + +libp2p.tools.timed\_cache.first\_seen\_cache module +--------------------------------------------------- + +.. automodule:: libp2p.tools.timed_cache.first_seen_cache + :members: + :undoc-members: + :show-inheritance: + +libp2p.tools.timed\_cache.last\_seen\_cache module +-------------------------------------------------- + +.. automodule:: libp2p.tools.timed_cache.last_seen_cache + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: libp2p.tools.timed_cache + :members: + :undoc-members: + :show-inheritance: diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 66dad7e1..f7ec49cb 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -88,7 +88,7 @@ class GossipSub(IPubsubRouter, Service): degree: int, degree_low: int, degree_high: int, - time_to_live: int, + time_to_live: int = 60, gossip_window: int = 3, gossip_history: int = 5, heartbeat_initial_delay: float = 0.1, diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 6fbb8798..e1deb0ab 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -11,11 +11,11 @@ import hashlib import logging import time from typing import ( - TYPE_CHECKING, Callable, NamedTuple, cast, ) +from typing import Any # noqa: F401 import base58 import trio @@ -26,6 +26,7 @@ from libp2p.abc import ( IPubsub, ISubscriptionAPI, ) +from libp2p.abc import IPubsubRouter # noqa: F401 from libp2p.crypto.keys import ( PrivateKey, ) @@ -53,12 +54,12 @@ from libp2p.network.stream.exceptions import ( from libp2p.peer.id import ( ID, ) -from libp2p.timed_cache.last_seen_cache import ( - LastSeenCache, -) from libp2p.tools.async_service import ( Service, ) +from libp2p.tools.timed_cache.last_seen_cache import ( + LastSeenCache, +) from libp2p.utils import ( encode_varint_prefixed, read_varint_prefixed_bytes, @@ -78,12 +79,6 @@ from .validators import ( signature_validator, ) -if TYPE_CHECKING: - from typing import Any # noqa: F401 - - from .abc import IPubsubRouter # noqa: F401 - - # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/40e1c94708658b155f30cf99e4574f384756d83c/topic.go#L97 # noqa: E501 SUBSCRIPTION_CHANNEL_SIZE = 32 @@ -137,6 +132,7 @@ class Pubsub(Service, IPubsub): router: IPubsubRouter, cache_size: int = None, seen_ttl: int = 120, + sweep_interval: int = 60, strict_signing: bool = True, msg_id_constructor: Callable[ [rpc_pb2.Message], bytes @@ -188,7 +184,7 @@ class Pubsub(Service, IPubsub): else: self.sign_key = None - self.seen_messages = LastSeenCache(seen_ttl) + self.seen_messages = LastSeenCache(seen_ttl, sweep_interval) # Map of topics we are subscribed to blocking queues # for when the given topic receives a message diff --git a/libp2p/tools/factories.py b/libp2p/tools/factories.py index 309ccc1c..6744ddc3 100644 --- a/libp2p/tools/factories.py +++ b/libp2p/tools/factories.py @@ -424,7 +424,6 @@ class GossipsubFactory(factory.Factory): degree = GOSSIPSUB_PARAMS.degree degree_low = GOSSIPSUB_PARAMS.degree_low degree_high = GOSSIPSUB_PARAMS.degree_high - time_to_live = GOSSIPSUB_PARAMS.time_to_live gossip_window = GOSSIPSUB_PARAMS.gossip_window gossip_history = GOSSIPSUB_PARAMS.gossip_history heartbeat_initial_delay = GOSSIPSUB_PARAMS.heartbeat_initial_delay @@ -448,6 +447,7 @@ class PubsubFactory(factory.Factory): router: IPubsubRouter, cache_size: int, seen_ttl: int, + sweep_interval: int, strict_signing: bool, msg_id_constructor: Callable[[rpc_pb2.Message], bytes] = None, ) -> AsyncIterator[Pubsub]: @@ -456,6 +456,7 @@ class PubsubFactory(factory.Factory): router=router, cache_size=cache_size, seen_ttl=seen_ttl, + sweep_interval=sweep_interval, strict_signing=strict_signing, msg_id_constructor=msg_id_constructor, ) @@ -470,7 +471,8 @@ class PubsubFactory(factory.Factory): number: int, routers: Sequence[IPubsubRouter], cache_size: int = None, - seen_ttl: int = None, + seen_ttl: int = 120, + sweep_interval: int = 60, strict_signing: bool = False, security_protocol: TProtocol = None, muxer_opt: TMuxerOptions = None, @@ -488,6 +490,7 @@ class PubsubFactory(factory.Factory): router, cache_size, seen_ttl, + sweep_interval, strict_signing, msg_id_constructor, ) @@ -503,6 +506,7 @@ class PubsubFactory(factory.Factory): number: int, cache_size: int = None, seen_ttl: int = 120, + sweep_interval: int = 60, strict_signing: bool = False, protocols: Sequence[TProtocol] = None, security_protocol: TProtocol = None, @@ -520,6 +524,7 @@ class PubsubFactory(factory.Factory): floodsubs, cache_size, seen_ttl, + sweep_interval, strict_signing, security_protocol=security_protocol, muxer_opt=muxer_opt, @@ -567,7 +572,6 @@ class PubsubFactory(factory.Factory): degree=degree, degree_low=degree_low, degree_high=degree_high, - time_to_live=time_to_live, gossip_window=gossip_window, heartbeat_interval=heartbeat_interval, ) diff --git a/libp2p/timed_cache/__init__.py b/libp2p/tools/timed_cache/__init__.py similarity index 100% rename from libp2p/timed_cache/__init__.py rename to libp2p/tools/timed_cache/__init__.py diff --git a/libp2p/timed_cache/basic_time_cache.py b/libp2p/tools/timed_cache/base_timed_cache.py similarity index 76% rename from libp2p/timed_cache/basic_time_cache.py rename to libp2p/tools/timed_cache/base_timed_cache.py index a5ee9713..0fef90d1 100644 --- a/libp2p/timed_cache/basic_time_cache.py +++ b/libp2p/tools/timed_cache/base_timed_cache.py @@ -1,21 +1,24 @@ +from abc import ( + ABC, + abstractmethod, +) import threading import time -class TimedCache: +class BaseTimedCache(ABC): """Base class for Timed Cache with cleanup mechanism.""" cache: dict[bytes, int] - SWEEP_INTERVAL = 60 # 1-minute interval between each sweep - - def __init__(self, ttl: int) -> None: + def __init__(self, ttl: int, sweep_interval: int = 60) -> None: """ - Initialize a new TimedCache with a time-to-live for cache entries + Initialize a new BaseTimedCache with a time-to-live for cache entries :param ttl: no of seconds as time-to-live for each cache entry """ self.ttl = ttl + self.sweep_interval = sweep_interval self.lock = threading.Lock() self.cache = {} self._stop_event = threading.Event() @@ -23,7 +26,7 @@ class TimedCache: self._thread.start() def _background_cleanup(self) -> None: - while not self._stop_event.wait(self.SWEEP_INTERVAL): + while not self._stop_event.wait(self.sweep_interval): self._sweep() def _sweep(self) -> None: @@ -42,10 +45,10 @@ class TimedCache: def length(self) -> int: return len(self.cache) + @abstractmethod def add(self, key: bytes) -> bool: """To be implemented in subclasses.""" - raise NotImplementedError + @abstractmethod def has(self, key: bytes) -> bool: """To be implemented in subclasses.""" - raise NotImplementedError diff --git a/libp2p/timed_cache/first_seen_cache.py b/libp2p/tools/timed_cache/first_seen_cache.py similarity index 81% rename from libp2p/timed_cache/first_seen_cache.py rename to libp2p/tools/timed_cache/first_seen_cache.py index 1692ee59..473af04b 100644 --- a/libp2p/timed_cache/first_seen_cache.py +++ b/libp2p/tools/timed_cache/first_seen_cache.py @@ -1,11 +1,11 @@ import time -from .basic_time_cache import ( - TimedCache, +from .base_timed_cache import ( + BaseTimedCache, ) -class FirstSeenCache(TimedCache): +class FirstSeenCache(BaseTimedCache): """Cache where expiry is set only when first added.""" def add(self, key: bytes) -> bool: diff --git a/libp2p/timed_cache/last_seen_cache.py b/libp2p/tools/timed_cache/last_seen_cache.py similarity index 84% rename from libp2p/timed_cache/last_seen_cache.py rename to libp2p/tools/timed_cache/last_seen_cache.py index ffc70553..8cb07695 100644 --- a/libp2p/timed_cache/last_seen_cache.py +++ b/libp2p/tools/timed_cache/last_seen_cache.py @@ -1,11 +1,11 @@ import time -from .basic_time_cache import ( - TimedCache, +from .base_timed_cache import ( + BaseTimedCache, ) -class LastSeenCache(TimedCache): +class LastSeenCache(BaseTimedCache): """Cache where expiry is updated on every access.""" def add(self, key: bytes) -> bool: diff --git a/tests/core/tools/timed_cache/test_timed_cache.py b/tests/core/tools/timed_cache/test_timed_cache.py new file mode 100644 index 00000000..99c4aa9f --- /dev/null +++ b/tests/core/tools/timed_cache/test_timed_cache.py @@ -0,0 +1,131 @@ +import pytest +import trio + +from libp2p.tools.timed_cache.first_seen_cache import ( + FirstSeenCache, +) +from libp2p.tools.timed_cache.last_seen_cache import ( + LastSeenCache, +) + +MSG_1 = b"msg1" + + +@pytest.mark.trio +async def test_simple_first_seen_cache(): + """Test that FirstSeenCache correctly stores and retrieves messages.""" + cache = FirstSeenCache(ttl=2, sweep_interval=1) + + assert cache.add(MSG_1) is True # First addition should return True + assert cache.has(MSG_1) is True # Should exist + assert cache.add(MSG_1) is False # Duplicate should return False + + await trio.sleep(2.5) # Wait beyond TTL + assert cache.has(MSG_1) is False # Should be expired + + cache.stop() + + +@pytest.mark.trio +async def test_simple_last_seen_cache(): + """Test that LastSeenCache correctly refreshes expiry when accessed.""" + cache = LastSeenCache(ttl=2, sweep_interval=1) + + assert cache.add(MSG_1) is True # First addition should return True + assert cache.has(MSG_1) is True # Should exist + + await trio.sleep(1) + assert cache.has(MSG_1) is True # Accessing should extend TTL + + await trio.sleep(1.5) # Would have expired if TTL wasn't extended + assert cache.has(MSG_1) is True # Should still exist + + await trio.sleep(2.5) # Now let it expire + assert cache.has(MSG_1) is False # Should be expired + + cache.stop() + + +@pytest.mark.trio +async def test_timed_cache_expiry(): + """Test expiry behavior in FirstSeenCache and LastSeenCache.""" + for cache_class in [FirstSeenCache, LastSeenCache]: + cache = cache_class(ttl=1, sweep_interval=1) + + assert cache.add(MSG_1) is True + await trio.sleep(1.5) # Let it expire + assert cache.has(MSG_1) is False # Should be expired + + cache.stop() + + +@pytest.mark.trio +async def test_concurrent_access(): + """Test that multiple tasks can safely access and modify the cache.""" + cache = LastSeenCache(ttl=2, sweep_interval=1) + + async def add_message(i): + cache.add(f"msg{i}".encode()) + assert cache.has(f"msg{i}".encode()) is True + + async with trio.open_nursery() as nursery: + for i in range(50): + nursery.start_soon(add_message, i) + + # Ensure all elements exist before expiry + for i in range(50): + assert cache.has(f"msg{i}".encode()) is True + + cache.stop() + + +@pytest.mark.trio +async def test_timed_cache_stress_test(): + """Stress test cache by adding a large number of elements.""" + cache = FirstSeenCache(ttl=2, sweep_interval=1) + + for i in range(1000): + assert cache.add(f"msg{i}".encode()) is True # All should be added successfully + + # Ensure all elements exist before expiry + for i in range(1000): + assert cache.has(f"msg{i}".encode()) is True + + await trio.sleep(2.5) # Wait for expiry + + # Ensure all elements have expired + for i in range(1000): + assert cache.has(f"msg{i}".encode()) is False + + cache.stop() + + +@pytest.mark.trio +async def test_expiry_removal(): + """Test that expired items are removed by the background sweeper.""" + cache = LastSeenCache(ttl=2, sweep_interval=1) + cache.add(MSG_1) + await trio.sleep(2.1) # Wait for sweeper to remove expired items + assert MSG_1 not in cache.cache # Should be removed + cache.stop() + + +@pytest.mark.trio +async def test_readding_after_expiry(): + """Test that an item can be re-added after expiry.""" + cache = FirstSeenCache(ttl=2, sweep_interval=1) + cache.add(MSG_1) + await trio.sleep(2) # Let it expire + assert cache.add(MSG_1) is True # Should allow re-adding + assert cache.has(MSG_1) is True + cache.stop() + + +@pytest.mark.trio +async def test_multiple_adds_before_expiry(): + """Ensure multiple adds before expiry behave correctly.""" + cache = LastSeenCache(ttl=5) + assert cache.add(MSG_1) is True + assert cache.add(MSG_1) is False # Second add should return False + assert cache.has(MSG_1) is True # Should still be in cache + cache.stop()