mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 08:00:54 +00:00
Merge branch 'main' into add-ws-transport
This commit is contained in:
82
tests/core/network/test_notifee_performance.py
Normal file
82
tests/core/network/test_notifee_performance.py
Normal file
@ -0,0 +1,82 @@
|
||||
import pytest
|
||||
from multiaddr import Multiaddr
|
||||
import trio
|
||||
|
||||
from libp2p.abc import (
|
||||
INetConn,
|
||||
INetStream,
|
||||
INetwork,
|
||||
INotifee,
|
||||
)
|
||||
from libp2p.tools.utils import connect_swarm
|
||||
from tests.utils.factories import SwarmFactory
|
||||
|
||||
|
||||
class CountingNotifee(INotifee):
|
||||
def __init__(self, event: trio.Event) -> None:
|
||||
self._event = event
|
||||
|
||||
async def opened_stream(self, network: INetwork, stream: INetStream) -> None:
|
||||
pass
|
||||
|
||||
async def closed_stream(self, network: INetwork, stream: INetStream) -> None:
|
||||
pass
|
||||
|
||||
async def connected(self, network: INetwork, conn: INetConn) -> None:
|
||||
self._event.set()
|
||||
|
||||
async def disconnected(self, network: INetwork, conn: INetConn) -> None:
|
||||
pass
|
||||
|
||||
async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None:
|
||||
pass
|
||||
|
||||
async def listen_close(self, network: INetwork, multiaddr: Multiaddr) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class SlowNotifee(INotifee):
|
||||
async def opened_stream(self, network: INetwork, stream: INetStream) -> None:
|
||||
pass
|
||||
|
||||
async def closed_stream(self, network: INetwork, stream: INetStream) -> None:
|
||||
pass
|
||||
|
||||
async def connected(self, network: INetwork, conn: INetConn) -> None:
|
||||
await trio.sleep(0.5)
|
||||
|
||||
async def disconnected(self, network: INetwork, conn: INetConn) -> None:
|
||||
pass
|
||||
|
||||
async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None:
|
||||
pass
|
||||
|
||||
async def listen_close(self, network: INetwork, multiaddr: Multiaddr) -> None:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_many_notifees_receive_connected_quickly() -> None:
|
||||
async with SwarmFactory.create_batch_and_listen(2) as swarms:
|
||||
count = 200
|
||||
events = [trio.Event() for _ in range(count)]
|
||||
for ev in events:
|
||||
swarms[0].register_notifee(CountingNotifee(ev))
|
||||
await connect_swarm(swarms[0], swarms[1])
|
||||
with trio.fail_after(1.5):
|
||||
for ev in events:
|
||||
await ev.wait()
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_slow_notifee_does_not_block_others() -> None:
|
||||
async with SwarmFactory.create_batch_and_listen(2) as swarms:
|
||||
fast_events = [trio.Event() for _ in range(20)]
|
||||
for ev in fast_events:
|
||||
swarms[0].register_notifee(CountingNotifee(ev))
|
||||
swarms[0].register_notifee(SlowNotifee())
|
||||
await connect_swarm(swarms[0], swarms[1])
|
||||
# Fast notifees should complete quickly despite one slow notifee
|
||||
with trio.fail_after(0.3):
|
||||
for ev in fast_events:
|
||||
await ev.wait()
|
||||
76
tests/core/network/test_notify_listen_lifecycle.py
Normal file
76
tests/core/network/test_notify_listen_lifecycle.py
Normal file
@ -0,0 +1,76 @@
|
||||
import enum
|
||||
|
||||
import pytest
|
||||
from multiaddr import Multiaddr
|
||||
import trio
|
||||
|
||||
from libp2p.abc import (
|
||||
INetConn,
|
||||
INetStream,
|
||||
INetwork,
|
||||
INotifee,
|
||||
)
|
||||
from libp2p.tools.async_service import background_trio_service
|
||||
from libp2p.tools.constants import LISTEN_MADDR
|
||||
from tests.utils.factories import SwarmFactory
|
||||
|
||||
|
||||
class Event(enum.Enum):
|
||||
Listen = 0
|
||||
ListenClose = 1
|
||||
|
||||
|
||||
class MyNotifee(INotifee):
|
||||
def __init__(self, events: list[Event]):
|
||||
self.events = events
|
||||
|
||||
async def opened_stream(self, network: INetwork, stream: INetStream) -> None:
|
||||
pass
|
||||
|
||||
async def closed_stream(self, network: INetwork, stream: INetStream) -> None:
|
||||
pass
|
||||
|
||||
async def connected(self, network: INetwork, conn: INetConn) -> None:
|
||||
pass
|
||||
|
||||
async def disconnected(self, network: INetwork, conn: INetConn) -> None:
|
||||
pass
|
||||
|
||||
async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None:
|
||||
self.events.append(Event.Listen)
|
||||
|
||||
async def listen_close(self, network: INetwork, multiaddr: Multiaddr) -> None:
|
||||
self.events.append(Event.ListenClose)
|
||||
|
||||
|
||||
async def wait_for_event(
|
||||
events_list: list[Event], event: Event, timeout: float = 1.0
|
||||
) -> bool:
|
||||
with trio.move_on_after(timeout):
|
||||
while event not in events_list:
|
||||
await trio.sleep(0.01)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_listen_emitted_when_registered_before_listen():
|
||||
events: list[Event] = []
|
||||
swarm = SwarmFactory.build()
|
||||
swarm.register_notifee(MyNotifee(events))
|
||||
async with background_trio_service(swarm):
|
||||
# Start listening now; notifee was registered beforehand
|
||||
assert await swarm.listen(LISTEN_MADDR)
|
||||
assert await wait_for_event(events, Event.Listen)
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_single_listener_close_emits_listen_close():
|
||||
events: list[Event] = []
|
||||
swarm = SwarmFactory.build()
|
||||
swarm.register_notifee(MyNotifee(events))
|
||||
async with background_trio_service(swarm):
|
||||
assert await swarm.listen(LISTEN_MADDR)
|
||||
# Explicitly notify listen_close (close path via manager doesn't emit it)
|
||||
await swarm.notify_listen_close(LISTEN_MADDR)
|
||||
assert await wait_for_event(events, Event.ListenClose)
|
||||
@ -1,9 +1,9 @@
|
||||
from collections import deque
|
||||
|
||||
import pytest
|
||||
import trio
|
||||
|
||||
from libp2p.abc import (
|
||||
IMultiselectCommunicator,
|
||||
)
|
||||
from libp2p.abc import IMultiselectCommunicator, INetStream
|
||||
from libp2p.custom_types import TProtocol
|
||||
from libp2p.protocol_muxer.exceptions import (
|
||||
MultiselectClientError,
|
||||
@ -13,6 +13,10 @@ from libp2p.protocol_muxer.multiselect import Multiselect
|
||||
from libp2p.protocol_muxer.multiselect_client import MultiselectClient
|
||||
|
||||
|
||||
async def dummy_handler(stream: INetStream) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class DummyMultiselectCommunicator(IMultiselectCommunicator):
|
||||
"""
|
||||
Dummy MultiSelectCommunicator to test out negotiate timmeout.
|
||||
@ -31,7 +35,7 @@ class DummyMultiselectCommunicator(IMultiselectCommunicator):
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_select_one_of_timeout():
|
||||
async def test_select_one_of_timeout() -> None:
|
||||
ECHO = TProtocol("/echo/1.0.0")
|
||||
communicator = DummyMultiselectCommunicator()
|
||||
|
||||
@ -42,7 +46,7 @@ async def test_select_one_of_timeout():
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_query_multistream_command_timeout():
|
||||
async def test_query_multistream_command_timeout() -> None:
|
||||
communicator = DummyMultiselectCommunicator()
|
||||
client = MultiselectClient()
|
||||
|
||||
@ -51,9 +55,95 @@ async def test_query_multistream_command_timeout():
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_negotiate_timeout():
|
||||
async def test_negotiate_timeout() -> None:
|
||||
communicator = DummyMultiselectCommunicator()
|
||||
server = Multiselect()
|
||||
|
||||
with pytest.raises(MultiselectError, match="handshake read timeout"):
|
||||
await server.negotiate(communicator, 2)
|
||||
|
||||
|
||||
class HandshakeThenHangCommunicator(IMultiselectCommunicator):
|
||||
handshaked: bool
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.handshaked = False
|
||||
|
||||
async def write(self, msg_str: str) -> None:
|
||||
if msg_str == "/multistream/1.0.0":
|
||||
self.handshaked = True
|
||||
return
|
||||
|
||||
async def read(self) -> str:
|
||||
if not self.handshaked:
|
||||
return "/multistream/1.0.0"
|
||||
# After handshake, hang on read.
|
||||
await trio.sleep_forever()
|
||||
# Should not be reached.
|
||||
return ""
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_negotiate_timeout_post_handshake() -> None:
|
||||
communicator = HandshakeThenHangCommunicator()
|
||||
server = Multiselect()
|
||||
with pytest.raises(MultiselectError, match="handshake read timeout"):
|
||||
await server.negotiate(communicator, 1)
|
||||
|
||||
|
||||
class MockCommunicator(IMultiselectCommunicator):
|
||||
def __init__(self, commands_to_read: list[str]):
|
||||
self.read_queue = deque(commands_to_read)
|
||||
self.written_data: list[str] = []
|
||||
|
||||
async def write(self, msg_str: str) -> None:
|
||||
self.written_data.append(msg_str)
|
||||
|
||||
async def read(self) -> str:
|
||||
if not self.read_queue:
|
||||
raise EOFError
|
||||
return self.read_queue.popleft()
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_negotiate_empty_string_command() -> None:
|
||||
# server receives an empty string, which means client wants `None` protocol.
|
||||
server = Multiselect({None: dummy_handler})
|
||||
# Handshake, then empty command
|
||||
communicator = MockCommunicator(["/multistream/1.0.0", ""])
|
||||
protocol, handler = await server.negotiate(communicator)
|
||||
assert protocol is None
|
||||
assert handler == dummy_handler
|
||||
# Check that server sent back handshake and the protocol confirmation (empty string)
|
||||
assert communicator.written_data == ["/multistream/1.0.0", ""]
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_negotiate_with_none_handler() -> None:
|
||||
# server has None handler, client sends "" to select it.
|
||||
server = Multiselect({None: dummy_handler, TProtocol("/proto1"): dummy_handler})
|
||||
# Handshake, then empty command
|
||||
communicator = MockCommunicator(["/multistream/1.0.0", ""])
|
||||
protocol, handler = await server.negotiate(communicator)
|
||||
assert protocol is None
|
||||
assert handler == dummy_handler
|
||||
# Check written data: handshake, protocol confirmation
|
||||
assert communicator.written_data == ["/multistream/1.0.0", ""]
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_negotiate_with_none_handler_ls() -> None:
|
||||
# server has None handler, client sends "ls" then empty string.
|
||||
server = Multiselect({None: dummy_handler, TProtocol("/proto1"): dummy_handler})
|
||||
# Handshake, ls, empty command
|
||||
communicator = MockCommunicator(["/multistream/1.0.0", "ls", ""])
|
||||
protocol, handler = await server.negotiate(communicator)
|
||||
assert protocol is None
|
||||
assert handler == dummy_handler
|
||||
# Check written data: handshake, ls response, protocol confirmation
|
||||
assert communicator.written_data[0] == "/multistream/1.0.0"
|
||||
assert "/proto1" in communicator.written_data[1]
|
||||
# Note: `ls` should not list the `None` protocol.
|
||||
assert "None" not in communicator.written_data[1]
|
||||
assert "\n\n" not in communicator.written_data[1]
|
||||
assert communicator.written_data[2] == ""
|
||||
|
||||
@ -159,3 +159,41 @@ async def test_get_protocols_returns_all_registered_protocols():
|
||||
protocols = ms.get_protocols()
|
||||
|
||||
assert set(protocols) == {p1, p2, p3}
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_negotiate_optional_tprotocol(security_protocol):
|
||||
with pytest.raises(Exception):
|
||||
await perform_simple_test(
|
||||
None,
|
||||
[None],
|
||||
[None],
|
||||
security_protocol,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_negotiate_optional_tprotocol_client_none_server_no_none(
|
||||
security_protocol,
|
||||
):
|
||||
with pytest.raises(Exception):
|
||||
await perform_simple_test(None, [None], [PROTOCOL_ECHO], security_protocol)
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_negotiate_optional_tprotocol_client_none_in_list(security_protocol):
|
||||
expected_selected_protocol = PROTOCOL_ECHO
|
||||
await perform_simple_test(
|
||||
expected_selected_protocol,
|
||||
[None, PROTOCOL_ECHO],
|
||||
[PROTOCOL_ECHO],
|
||||
security_protocol,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_negotiate_optional_tprotocol_server_none_client_other(
|
||||
security_protocol,
|
||||
):
|
||||
with pytest.raises(Exception):
|
||||
await perform_simple_test(None, [PROTOCOL_ECHO], [None], security_protocol)
|
||||
|
||||
90
tests/core/pubsub/test_pubsub_notifee_integration.py
Normal file
90
tests/core/pubsub/test_pubsub_notifee_integration.py
Normal file
@ -0,0 +1,90 @@
|
||||
from typing import cast
|
||||
|
||||
import pytest
|
||||
import trio
|
||||
|
||||
from libp2p.tools.utils import connect
|
||||
from tests.utils.factories import PubsubFactory
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_connected_enqueues_and_adds_peer():
|
||||
async with PubsubFactory.create_batch_with_floodsub(2) as (p0, p1):
|
||||
await connect(p0.host, p1.host)
|
||||
await p0.wait_until_ready()
|
||||
# Wait until peer is added via queue processing
|
||||
with trio.fail_after(1.0):
|
||||
while p1.my_id not in p0.peers:
|
||||
await trio.sleep(0.01)
|
||||
assert p1.my_id in p0.peers
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_disconnected_enqueues_and_removes_peer():
|
||||
async with PubsubFactory.create_batch_with_floodsub(2) as (p0, p1):
|
||||
await connect(p0.host, p1.host)
|
||||
await p0.wait_until_ready()
|
||||
# Ensure present first
|
||||
with trio.fail_after(1.0):
|
||||
while p1.my_id not in p0.peers:
|
||||
await trio.sleep(0.01)
|
||||
# Now disconnect and expect removal via dead peer queue
|
||||
await p0.host.get_network().close_peer(p1.host.get_id())
|
||||
with trio.fail_after(1.0):
|
||||
while p1.my_id in p0.peers:
|
||||
await trio.sleep(0.01)
|
||||
assert p1.my_id not in p0.peers
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_channel_closed_is_swallowed_in_notifee(monkeypatch) -> None:
|
||||
# Ensure PubsubNotifee catches BrokenResourceError from its send channel
|
||||
async with PubsubFactory.create_batch_with_floodsub(2) as (p0, p1):
|
||||
# Find the PubsubNotifee registered on the network
|
||||
from libp2p.pubsub.pubsub_notifee import PubsubNotifee
|
||||
|
||||
network = p0.host.get_network()
|
||||
notifees = getattr(network, "notifees", [])
|
||||
target = None
|
||||
for nf in notifees:
|
||||
if isinstance(nf, cast(type, PubsubNotifee)):
|
||||
target = nf
|
||||
break
|
||||
assert target is not None, "PubsubNotifee not found on network"
|
||||
|
||||
async def failing_send(_peer_id): # type: ignore[no-redef]
|
||||
raise trio.BrokenResourceError
|
||||
|
||||
# Make initiator queue send fail; PubsubNotifee should swallow
|
||||
monkeypatch.setattr(target.initiator_peers_queue, "send", failing_send)
|
||||
|
||||
# Connect peers; if exceptions are swallowed, service stays running
|
||||
await connect(p0.host, p1.host)
|
||||
await p0.wait_until_ready()
|
||||
assert True
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_duplicate_connection_does_not_duplicate_peer_state():
|
||||
async with PubsubFactory.create_batch_with_floodsub(2) as (p0, p1):
|
||||
await connect(p0.host, p1.host)
|
||||
await p0.wait_until_ready()
|
||||
with trio.fail_after(1.0):
|
||||
while p1.my_id not in p0.peers:
|
||||
await trio.sleep(0.01)
|
||||
# Connect again should not add duplicates
|
||||
await connect(p0.host, p1.host)
|
||||
await trio.sleep(0.1)
|
||||
assert list(p0.peers.keys()).count(p1.my_id) == 1
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_blacklist_blocks_peer_added_by_notifee():
|
||||
async with PubsubFactory.create_batch_with_floodsub(2) as (p0, p1):
|
||||
# Blacklist before connecting
|
||||
p0.add_to_blacklist(p1.my_id)
|
||||
await connect(p0.host, p1.host)
|
||||
await p0.wait_until_ready()
|
||||
# Give handler a chance to run
|
||||
await trio.sleep(0.1)
|
||||
assert p1.my_id not in p0.peers
|
||||
Reference in New Issue
Block a user