Merge branch 'main' into feat/804-add-thin-waist-address

Manu Sheel Gupta
2025-08-25 15:52:14 +05:30
committed by GitHub
4 changed files with 249 additions and 0 deletions

@@ -0,0 +1 @@
Improved PubsubNotifee integration tests and added failure scenario coverage.

@@ -0,0 +1,82 @@
import pytest
from multiaddr import Multiaddr
import trio

from libp2p.abc import (
    INetConn,
    INetStream,
    INetwork,
    INotifee,
)
from libp2p.tools.utils import connect_swarm
from tests.utils.factories import SwarmFactory


class CountingNotifee(INotifee):
    def __init__(self, event: trio.Event) -> None:
        self._event = event

    async def opened_stream(self, network: INetwork, stream: INetStream) -> None:
        pass

    async def closed_stream(self, network: INetwork, stream: INetStream) -> None:
        pass

    async def connected(self, network: INetwork, conn: INetConn) -> None:
        self._event.set()

    async def disconnected(self, network: INetwork, conn: INetConn) -> None:
        pass

    async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None:
        pass

    async def listen_close(self, network: INetwork, multiaddr: Multiaddr) -> None:
        pass


class SlowNotifee(INotifee):
    async def opened_stream(self, network: INetwork, stream: INetStream) -> None:
        pass

    async def closed_stream(self, network: INetwork, stream: INetStream) -> None:
        pass

    async def connected(self, network: INetwork, conn: INetConn) -> None:
        await trio.sleep(0.5)

    async def disconnected(self, network: INetwork, conn: INetConn) -> None:
        pass

    async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None:
        pass

    async def listen_close(self, network: INetwork, multiaddr: Multiaddr) -> None:
        pass


@pytest.mark.trio
async def test_many_notifees_receive_connected_quickly() -> None:
    async with SwarmFactory.create_batch_and_listen(2) as swarms:
        count = 200
        events = [trio.Event() for _ in range(count)]
        for ev in events:
            swarms[0].register_notifee(CountingNotifee(ev))

        await connect_swarm(swarms[0], swarms[1])

        with trio.fail_after(1.5):
            for ev in events:
                await ev.wait()


@pytest.mark.trio
async def test_slow_notifee_does_not_block_others() -> None:
    async with SwarmFactory.create_batch_and_listen(2) as swarms:
        fast_events = [trio.Event() for _ in range(20)]
        for ev in fast_events:
            swarms[0].register_notifee(CountingNotifee(ev))
        swarms[0].register_notifee(SlowNotifee())

        await connect_swarm(swarms[0], swarms[1])

        # Fast notifees should complete quickly despite one slow notifee
        with trio.fail_after(0.3):
            for ev in fast_events:
                await ev.wait()
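
Note: test_slow_notifee_does_not_block_others only passes if the swarm dispatches each notifee callback in its own task rather than awaiting them one after another. A minimal sketch of that fan-out pattern with a trio nursery, assuming a hypothetical notify_connected helper and _notifees registry (not the actual Swarm internals):

import trio


class NotifierSketch:
    """Illustrative fan-out: one slow notifee cannot delay the others."""

    def __init__(self) -> None:
        self._notifees = []  # hypothetical registry of INotifee-like objects

    def register_notifee(self, notifee) -> None:
        self._notifees.append(notifee)

    async def notify_connected(self, network, conn) -> None:
        # Each callback runs in its own task, so a 0.5 s sleep in SlowNotifee
        # does not delay the trio.Event.set() calls made by CountingNotifee.
        async with trio.open_nursery() as nursery:
            for notifee in self._notifees:
                nursery.start_soon(notifee.connected, network, conn)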

@@ -0,0 +1,76 @@
import enum

import pytest
from multiaddr import Multiaddr
import trio

from libp2p.abc import (
    INetConn,
    INetStream,
    INetwork,
    INotifee,
)
from libp2p.tools.async_service import background_trio_service
from libp2p.tools.constants import LISTEN_MADDR
from tests.utils.factories import SwarmFactory


class Event(enum.Enum):
    Listen = 0
    ListenClose = 1


class MyNotifee(INotifee):
    def __init__(self, events: list[Event]):
        self.events = events

    async def opened_stream(self, network: INetwork, stream: INetStream) -> None:
        pass

    async def closed_stream(self, network: INetwork, stream: INetStream) -> None:
        pass

    async def connected(self, network: INetwork, conn: INetConn) -> None:
        pass

    async def disconnected(self, network: INetwork, conn: INetConn) -> None:
        pass

    async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None:
        self.events.append(Event.Listen)

    async def listen_close(self, network: INetwork, multiaddr: Multiaddr) -> None:
        self.events.append(Event.ListenClose)


async def wait_for_event(
    events_list: list[Event], event: Event, timeout: float = 1.0
) -> bool:
    with trio.move_on_after(timeout):
        while event not in events_list:
            await trio.sleep(0.01)
        return True
    return False


@pytest.mark.trio
async def test_listen_emitted_when_registered_before_listen():
    events: list[Event] = []
    swarm = SwarmFactory.build()
    swarm.register_notifee(MyNotifee(events))

    async with background_trio_service(swarm):
        # Start listening now; notifee was registered beforehand
        assert await swarm.listen(LISTEN_MADDR)
        assert await wait_for_event(events, Event.Listen)


@pytest.mark.trio
async def test_single_listener_close_emits_listen_close():
    events: list[Event] = []
    swarm = SwarmFactory.build()
    swarm.register_notifee(MyNotifee(events))

    async with background_trio_service(swarm):
        assert await swarm.listen(LISTEN_MADDR)
        # Explicitly notify listen_close (close path via manager doesn't emit it)
        await swarm.notify_listen_close(LISTEN_MADDR)
        assert await wait_for_event(events, Event.ListenClose)
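
Side note: the polling wait_for_event helper above could also be written event-driven. A sketch, assuming a hypothetical notifee variant that sets trio.Event flags instead of appending to a list (not part of this PR):

import trio


class EventDrivenNotifee:
    """Hypothetical variant: set trio.Event flags so tests can await them
    directly instead of polling a list."""

    def __init__(self) -> None:
        self.listen_event = trio.Event()
        self.listen_close_event = trio.Event()

    async def listen(self, network, multiaddr) -> None:
        self.listen_event.set()

    async def listen_close(self, network, multiaddr) -> None:
        self.listen_close_event.set()


async def wait_for(event: trio.Event, timeout: float = 1.0) -> bool:
    """Return True if the event fires within `timeout`, else False."""
    with trio.move_on_after(timeout):
        await event.wait()
        return True
    return False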

@@ -0,0 +1,90 @@
from typing import cast

import pytest
import trio

from libp2p.tools.utils import connect
from tests.utils.factories import PubsubFactory


@pytest.mark.trio
async def test_connected_enqueues_and_adds_peer():
    async with PubsubFactory.create_batch_with_floodsub(2) as (p0, p1):
        await connect(p0.host, p1.host)
        await p0.wait_until_ready()

        # Wait until peer is added via queue processing
        with trio.fail_after(1.0):
            while p1.my_id not in p0.peers:
                await trio.sleep(0.01)

        assert p1.my_id in p0.peers


@pytest.mark.trio
async def test_disconnected_enqueues_and_removes_peer():
    async with PubsubFactory.create_batch_with_floodsub(2) as (p0, p1):
        await connect(p0.host, p1.host)
        await p0.wait_until_ready()

        # Ensure the peer is present first
        with trio.fail_after(1.0):
            while p1.my_id not in p0.peers:
                await trio.sleep(0.01)

        # Now disconnect and expect removal via the dead peer queue
        await p0.host.get_network().close_peer(p1.host.get_id())

        with trio.fail_after(1.0):
            while p1.my_id in p0.peers:
                await trio.sleep(0.01)

        assert p1.my_id not in p0.peers


@pytest.mark.trio
async def test_channel_closed_is_swallowed_in_notifee(monkeypatch) -> None:
    # Ensure PubsubNotifee catches BrokenResourceError from its send channel
    async with PubsubFactory.create_batch_with_floodsub(2) as (p0, p1):
        # Find the PubsubNotifee registered on the network
        from libp2p.pubsub.pubsub_notifee import PubsubNotifee

        network = p0.host.get_network()
        notifees = getattr(network, "notifees", [])
        target = None
        for nf in notifees:
            if isinstance(nf, cast(type, PubsubNotifee)):
                target = nf
                break
        assert target is not None, "PubsubNotifee not found on network"

        async def failing_send(_peer_id):  # type: ignore[no-redef]
            raise trio.BrokenResourceError

        # Make the initiator queue's send fail; PubsubNotifee should swallow it
        monkeypatch.setattr(target.initiator_peers_queue, "send", failing_send)

        # Connect peers; if exceptions are swallowed, the service stays running
        await connect(p0.host, p1.host)
        await p0.wait_until_ready()

        assert True


@pytest.mark.trio
async def test_duplicate_connection_does_not_duplicate_peer_state():
    async with PubsubFactory.create_batch_with_floodsub(2) as (p0, p1):
        await connect(p0.host, p1.host)
        await p0.wait_until_ready()

        with trio.fail_after(1.0):
            while p1.my_id not in p0.peers:
                await trio.sleep(0.01)

        # Connecting again should not add duplicates
        await connect(p0.host, p1.host)
        await trio.sleep(0.1)

        assert list(p0.peers.keys()).count(p1.my_id) == 1


@pytest.mark.trio
async def test_blacklist_blocks_peer_added_by_notifee():
    async with PubsubFactory.create_batch_with_floodsub(2) as (p0, p1):
        # Blacklist before connecting
        p0.add_to_blacklist(p1.my_id)

        await connect(p0.host, p1.host)
        await p0.wait_until_ready()

        # Give the handler a chance to run
        await trio.sleep(0.1)

        assert p1.my_id not in p0.peers
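
For context, the behaviour these pubsub tests pin down: connected/disconnected push the remote peer ID onto a send channel that Pubsub's main loop drains, and a broken channel is swallowed rather than crashing the notification path. A rough sketch of that shape, based only on what the tests assert (the dead_peers_queue name and the conn.muxed_conn.peer_id access are assumptions, not verified against libp2p.pubsub.pubsub_notifee):

import trio


class PubsubNotifeeSketch:
    """Rough shape implied by the tests above; not the real implementation."""

    def __init__(
        self,
        initiator_peers_queue: trio.MemorySendChannel,
        dead_peers_queue: trio.MemorySendChannel,  # assumed counterpart queue
    ) -> None:
        self.initiator_peers_queue = initiator_peers_queue
        self.dead_peers_queue = dead_peers_queue

    async def connected(self, network, conn) -> None:
        try:
            # Pubsub's main loop later reads this queue and adds the peer,
            # which is what test_connected_enqueues_and_adds_peer waits for.
            await self.initiator_peers_queue.send(conn.muxed_conn.peer_id)
        except trio.BrokenResourceError:
            # Channel already closed: drop silently, which is what
            # test_channel_closed_is_swallowed_in_notifee checks.
            pass

    async def disconnected(self, network, conn) -> None:
        try:
            await self.dead_peers_queue.send(conn.muxed_conn.peer_id)
        except trio.BrokenResourceError:
            pass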