mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
Add initial listener lifecycle tests; pubsub integration + perf scenarios not yet implemented
This commit is contained in:
83
tests/core/network/test_notifee_performance.py
Normal file
83
tests/core/network/test_notifee_performance.py
Normal file
@ -0,0 +1,83 @@
|
||||
import pytest
|
||||
from multiaddr import Multiaddr
|
||||
import trio
|
||||
|
||||
from libp2p.abc import (
|
||||
INetConn,
|
||||
INetStream,
|
||||
INetwork,
|
||||
INotifee,
|
||||
)
|
||||
from libp2p.tools.utils import connect_swarm
|
||||
from tests.utils.factories import SwarmFactory
|
||||
|
||||
|
||||
class CountingNotifee(INotifee):
|
||||
def __init__(self, event: trio.Event) -> None:
|
||||
self._event = event
|
||||
|
||||
async def opened_stream(self, network: INetwork, stream: INetStream) -> None:
|
||||
pass
|
||||
|
||||
async def closed_stream(self, network: INetwork, stream: INetStream) -> None:
|
||||
pass
|
||||
|
||||
async def connected(self, network: INetwork, conn: INetConn) -> None:
|
||||
self._event.set()
|
||||
|
||||
async def disconnected(self, network: INetwork, conn: INetConn) -> None:
|
||||
pass
|
||||
|
||||
async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None:
|
||||
pass
|
||||
|
||||
async def listen_close(self, network: INetwork, multiaddr: Multiaddr) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class SlowNotifee(INotifee):
|
||||
async def opened_stream(self, network: INetwork, stream: INetStream) -> None:
|
||||
pass
|
||||
|
||||
async def closed_stream(self, network: INetwork, stream: INetStream) -> None:
|
||||
pass
|
||||
|
||||
async def connected(self, network: INetwork, conn: INetConn) -> None:
|
||||
await trio.sleep(0.5)
|
||||
|
||||
async def disconnected(self, network: INetwork, conn: INetConn) -> None:
|
||||
pass
|
||||
|
||||
async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None:
|
||||
pass
|
||||
|
||||
async def listen_close(self, network: INetwork, multiaddr: Multiaddr) -> None:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_many_notifees_receive_connected_quickly() -> None:
|
||||
async with SwarmFactory.create_batch_and_listen(2) as swarms:
|
||||
count = 200
|
||||
events = [trio.Event() for _ in range(count)]
|
||||
for ev in events:
|
||||
swarms[0].register_notifee(CountingNotifee(ev))
|
||||
await connect_swarm(swarms[0], swarms[1])
|
||||
with trio.fail_after(1.5):
|
||||
for ev in events:
|
||||
await ev.wait()
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_slow_notifee_does_not_block_others() -> None:
|
||||
async with SwarmFactory.create_batch_and_listen(2) as swarms:
|
||||
fast_events = [trio.Event() for _ in range(20)]
|
||||
for ev in fast_events:
|
||||
swarms[0].register_notifee(CountingNotifee(ev))
|
||||
swarms[0].register_notifee(SlowNotifee())
|
||||
await connect_swarm(swarms[0], swarms[1])
|
||||
# Fast notifees should complete quickly despite one slow notifee
|
||||
with trio.fail_after(0.3):
|
||||
for ev in fast_events:
|
||||
await ev.wait()
|
||||
|
||||
76
tests/core/network/test_notify_listen_lifecycle.py
Normal file
76
tests/core/network/test_notify_listen_lifecycle.py
Normal file
@ -0,0 +1,76 @@
|
||||
import enum
|
||||
|
||||
import pytest
|
||||
from multiaddr import Multiaddr
|
||||
import trio
|
||||
|
||||
from libp2p.abc import (
|
||||
INetConn,
|
||||
INetStream,
|
||||
INetwork,
|
||||
INotifee,
|
||||
)
|
||||
from libp2p.tools.async_service import background_trio_service
|
||||
from libp2p.tools.constants import LISTEN_MADDR
|
||||
from tests.utils.factories import SwarmFactory
|
||||
|
||||
|
||||
class Event(enum.Enum):
|
||||
Listen = 0
|
||||
ListenClose = 1
|
||||
|
||||
|
||||
class MyNotifee(INotifee):
|
||||
def __init__(self, events: list[Event]):
|
||||
self.events = events
|
||||
|
||||
async def opened_stream(self, network: INetwork, stream: INetStream) -> None:
|
||||
pass
|
||||
|
||||
async def closed_stream(self, network: INetwork, stream: INetStream) -> None:
|
||||
pass
|
||||
|
||||
async def connected(self, network: INetwork, conn: INetConn) -> None:
|
||||
pass
|
||||
|
||||
async def disconnected(self, network: INetwork, conn: INetConn) -> None:
|
||||
pass
|
||||
|
||||
async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None:
|
||||
self.events.append(Event.Listen)
|
||||
|
||||
async def listen_close(self, network: INetwork, multiaddr: Multiaddr) -> None:
|
||||
self.events.append(Event.ListenClose)
|
||||
|
||||
|
||||
async def wait_for_event(
|
||||
events_list: list[Event], event: Event, timeout: float = 1.0
|
||||
) -> bool:
|
||||
with trio.move_on_after(timeout):
|
||||
while event not in events_list:
|
||||
await trio.sleep(0.01)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_listen_emitted_when_registered_before_listen():
|
||||
events: list[Event] = []
|
||||
swarm = SwarmFactory.build()
|
||||
swarm.register_notifee(MyNotifee(events))
|
||||
async with background_trio_service(swarm):
|
||||
# Start listening now; notifee was registered beforehand
|
||||
assert await swarm.listen(LISTEN_MADDR)
|
||||
assert await wait_for_event(events, Event.Listen)
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_single_listener_close_emits_listen_close():
|
||||
events: list[Event] = []
|
||||
swarm = SwarmFactory.build()
|
||||
swarm.register_notifee(MyNotifee(events))
|
||||
async with background_trio_service(swarm):
|
||||
assert await swarm.listen(LISTEN_MADDR)
|
||||
# Explicitly notify listen_close (close path via manager doesn't emit it)
|
||||
await swarm.notify_listen_close(LISTEN_MADDR)
|
||||
assert await wait_for_event(events, Event.ListenClose)
|
||||
Reference in New Issue
Block a user