feat(host): add get_live_peers() to track connection state

This commit is contained in:
Winter-Soren
2025-03-07 01:38:22 +05:30
committed by Paul Robinson
parent f6279c23ac
commit b2a6294cfa
4 changed files with 217 additions and 0 deletions

View File

@ -8,12 +8,14 @@ from contextlib import (
import logging
from typing import (
TYPE_CHECKING,
Optional,
)
import multiaddr
from libp2p.abc import (
IHost,
INetConn,
INetStream,
INetworkService,
IPeerStore,
@ -234,3 +236,29 @@ class BasicHost(IHost):
return
net_stream.set_protocol(protocol)
await handler(net_stream)
def get_live_peers(self) -> list[ID]:
"""
Returns a list of currently connected peer IDs.
:return: List of peer IDs that have active connections
"""
return list(self._network.connections.keys())
def is_peer_connected(self, peer_id: ID) -> bool:
"""
Check if a specific peer is currently connected.
:param peer_id: ID of the peer to check
:return: True if peer has an active connection, False otherwise
"""
return peer_id in self._network.connections
def get_peer_connection_info(self, peer_id: ID) -> Optional[INetConn]:
"""
Get connection information for a specific peer if connected.
:param peer_id: ID of the peer to get info for
:return: Connection object if peer is connected, None otherwise
"""
return self._network.connections.get(peer_id)