Merge branch 'main' into async-validators

This commit is contained in:
Manu Sheel Gupta
2025-07-03 01:31:48 -07:00
committed by GitHub
5 changed files with 130 additions and 23 deletions

View File

@ -40,6 +40,7 @@ logger = logging.getLogger(__name__)
ID_PUSH = TProtocol("/ipfs/id/push/1.0.0")
PROTOCOL_VERSION = "ipfs/0.1.0"
AGENT_VERSION = get_agent_version()
CONCURRENCY_LIMIT = 10
def identify_push_handler_for(host: IHost) -> StreamHandlerFn:
@ -132,7 +133,10 @@ async def _update_peerstore_from_identify(
async def push_identify_to_peer(
host: IHost, peer_id: ID, observed_multiaddr: Multiaddr | None = None
host: IHost,
peer_id: ID,
observed_multiaddr: Multiaddr | None = None,
limit: trio.Semaphore = trio.Semaphore(CONCURRENCY_LIMIT),
) -> bool:
"""
Push an identify message to a specific peer.
@ -146,25 +150,26 @@ async def push_identify_to_peer(
True if the push was successful, False otherwise.
"""
try:
# Create a new stream to the peer using the identify/push protocol
stream = await host.new_stream(peer_id, [ID_PUSH])
async with limit:
try:
# Create a new stream to the peer using the identify/push protocol
stream = await host.new_stream(peer_id, [ID_PUSH])
# Create the identify message
identify_msg = _mk_identify_protobuf(host, observed_multiaddr)
response = identify_msg.SerializeToString()
# Create the identify message
identify_msg = _mk_identify_protobuf(host, observed_multiaddr)
response = identify_msg.SerializeToString()
# Send the identify message
await stream.write(response)
# Send the identify message
await stream.write(response)
# Close the stream
await stream.close()
# Close the stream
await stream.close()
logger.debug("Successfully pushed identify to peer %s", peer_id)
return True
except Exception as e:
logger.error("Error pushing identify to peer %s: %s", peer_id, e)
return False
logger.debug("Successfully pushed identify to peer %s", peer_id)
return True
except Exception as e:
logger.error("Error pushing identify to peer %s: %s", peer_id, e)
return False
async def push_identify_to_peers(
@ -179,13 +184,10 @@ async def push_identify_to_peers(
"""
if peer_ids is None:
# Get all connected peers
peer_ids = set(host.get_peerstore().peer_ids())
peer_ids = set(host.get_connected_peers())
# Push to each peer in parallel using a trio.Nursery
# TODO: Consider using a bounded nursery to limit concurrency
# and avoid overwhelming the network. This can be done by using
# trio.open_nursery(max_concurrent=10) or similar.
# For now, we will use an unbounded nursery for simplicity.
# limiting concurrent connections to 10
async with trio.open_nursery() as nursery:
for peer_id in peer_ids:
nursery.start_soon(push_identify_to_peer, host, peer_id, observed_multiaddr)