diff --git a/libp2p/abc.py b/libp2p/abc.py index 37d3ad58..b09e4322 100644 --- a/libp2p/abc.py +++ b/libp2p/abc.py @@ -1143,6 +1143,12 @@ class IHost(ABC): """ + @abstractmethod + def get_live_peers(self) -> list[ID]: + """ + :return: List of peer IDs that have active connections + """ + @abstractmethod def run(self, listen_addrs: Sequence[Multiaddr]) -> AsyncContextManager[None]: """ diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index fbe2e667..24014321 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -8,12 +8,14 @@ from contextlib import ( import logging from typing import ( TYPE_CHECKING, + Optional, ) import multiaddr from libp2p.abc import ( IHost, + INetConn, INetStream, INetworkService, IPeerStore, @@ -234,3 +236,29 @@ class BasicHost(IHost): return net_stream.set_protocol(protocol) await handler(net_stream) + + def get_live_peers(self) -> list[ID]: + """ + Returns a list of currently connected peer IDs. + + :return: List of peer IDs that have active connections + """ + return list(self._network.connections.keys()) + + def is_peer_connected(self, peer_id: ID) -> bool: + """ + Check if a specific peer is currently connected. + + :param peer_id: ID of the peer to check + :return: True if peer has an active connection, False otherwise + """ + return peer_id in self._network.connections + + def get_peer_connection_info(self, peer_id: ID) -> Optional[INetConn]: + """ + Get connection information for a specific peer if connected. + + :param peer_id: ID of the peer to get info for + :return: Connection object if peer is connected, None otherwise + """ + return self._network.connections.get(peer_id) diff --git a/newsfragments/420.feature.rst b/newsfragments/420.feature.rst new file mode 100644 index 00000000..92987871 --- /dev/null +++ b/newsfragments/420.feature.rst @@ -0,0 +1 @@ +Adds the ability to check connection status of a peer in the peerstore. diff --git a/tests/core/host/test_live_peers.py b/tests/core/host/test_live_peers.py new file mode 100644 index 00000000..4fa8bfec --- /dev/null +++ b/tests/core/host/test_live_peers.py @@ -0,0 +1,182 @@ +import pytest +import trio + +from libp2p.peer.peerinfo import ( + info_from_p2p_addr, +) +from libp2p.tools.factories import ( + HostFactory, +) + + +@pytest.mark.trio +async def test_live_peers_basic(security_protocol): + """Test basic live peers functionality.""" + async with HostFactory.create_batch_and_listen( + 2, security_protocol=security_protocol + ) as hosts: + host_a, host_b = hosts + + # Initially no live peers + assert len(host_a.get_live_peers()) == 0 + assert len(host_b.get_live_peers()) == 0 + + # Connect the hosts + addr = host_b.get_addrs()[0] + info = info_from_p2p_addr(addr) + await host_a.connect(info) + + # Both should show each other as live peers + assert host_b.get_id() in host_a.get_live_peers() + assert host_a.get_id() in host_b.get_live_peers() + + +@pytest.mark.trio +async def test_live_peers_disconnect(security_protocol): + """ + Test that disconnected peers are removed from live peers but remain in peerstore. + """ + async with HostFactory.create_batch_and_listen( + 2, security_protocol=security_protocol + ) as hosts: + host_a, host_b = hosts + + # Store peer IDs first for clarity + peer_a_id = host_a.get_id() + peer_b_id = host_b.get_id() + + # Initially no connections + assert len(host_a.get_connected_peers()) == 0 + assert len(host_b.get_connected_peers()) == 0 + + # Connect the hosts + addr = host_b.get_addrs()[0] + info = info_from_p2p_addr(addr) + await host_a.connect(info) + + # Add a small delay to allow connection setup to complete + await trio.sleep(0.1) + + # Verify connection state using get_connected_peers() + assert len(host_a.get_connected_peers()) == 1 + assert len(host_b.get_connected_peers()) == 1 + assert peer_b_id in host_a.get_connected_peers() + assert peer_a_id in host_b.get_connected_peers() + + # Store the connected peers for later comparison + connected_peers_a = set(host_a.get_connected_peers()) + connected_peers_b = set(host_b.get_connected_peers()) + + # Disconnect host_a from host_b + await host_a.disconnect(peer_b_id) + await trio.sleep(0.1) + + # Verify peers are no longer in live peers + assert peer_b_id not in host_a.get_live_peers() + assert peer_a_id not in host_b.get_live_peers() + + # But verify they remain in the peerstore by checking against original sets + assert peer_b_id in connected_peers_a + assert peer_a_id in connected_peers_b + + +@pytest.mark.trio +async def test_live_peers_multiple_connections(security_protocol): + """Test live peers with multiple connections.""" + async with HostFactory.create_batch_and_listen( + 3, security_protocol=security_protocol + ) as hosts: + host_a, host_b, host_c = hosts + + # Connect host_a to both host_b and host_c + for peer in [host_b, host_c]: + addr = peer.get_addrs()[0] + info = info_from_p2p_addr(addr) + await host_a.connect(info) + + # Verify host_a sees both peers as live + live_peers = host_a.get_live_peers() + assert len(live_peers) == 2 + assert host_b.get_id() in live_peers + assert host_c.get_id() in live_peers + + # Verify host_b and host_c each see host_a as live + assert host_a.get_id() in host_b.get_live_peers() + assert host_a.get_id() in host_c.get_live_peers() + + # Disconnect one peer + await host_a.disconnect(host_b.get_id()) + + # Verify only host_c remains as live peer for host_a + live_peers = host_a.get_live_peers() + assert len(live_peers) == 1 + assert host_c.get_id() in live_peers + + +@pytest.mark.trio +async def test_live_peers_reconnect(security_protocol): + """Test that peers can be reconnected and appear as live again.""" + async with HostFactory.create_batch_and_listen( + 2, security_protocol=security_protocol + ) as hosts: + host_a, host_b = hosts + + # Initial connection + addr = host_b.get_addrs()[0] + info = info_from_p2p_addr(addr) + await host_a.connect(info) + + # Verify connection + assert host_b.get_id() in host_a.get_live_peers() + + # Disconnect + await host_a.disconnect(host_b.get_id()) + assert host_b.get_id() not in host_a.get_live_peers() + + # Reconnect + await host_a.connect(info) + + # Verify reconnection + assert host_b.get_id() in host_a.get_live_peers() + + +@pytest.mark.trio +async def test_live_peers_unexpected_drop(security_protocol): + """ + Test that live peers are updated correctly when connections drop unexpectedly. + """ + async with HostFactory.create_batch_and_listen( + 2, security_protocol=security_protocol + ) as hosts: + host_a, host_b = hosts + + # Store peer IDs + peer_a_id = host_a.get_id() + peer_b_id = host_b.get_id() + + # Initial connection + addr = host_b.get_addrs()[0] + info = info_from_p2p_addr(addr) + await host_a.connect(info) + + # Verify initial connection + assert peer_b_id in host_a.get_live_peers() + assert peer_a_id in host_b.get_live_peers() + + # Simulate unexpected connection drop by directly closing the connection + conn = host_a.get_network().connections[peer_b_id] + await conn.muxed_conn.close() + + # Allow for connection cleanup + await trio.sleep(0.1) + + # Verify peers are no longer in live peers + assert peer_b_id not in host_a.get_live_peers() + assert peer_a_id not in host_b.get_live_peers() + + # Verify we can reconnect after unexpected drop + await host_a.connect(info) + + # Verify reconnection successful + assert peer_b_id in host_a.get_live_peers() + assert peer_a_id in host_b.get_live_peers()