diff --git a/libp2p/identity/identify_push/identify_push.py b/libp2p/identity/identify_push/identify_push.py index c649c368..b6034956 100644 --- a/libp2p/identity/identify_push/identify_push.py +++ b/libp2p/identity/identify_push/identify_push.py @@ -40,6 +40,7 @@ logger = logging.getLogger(__name__) ID_PUSH = TProtocol("/ipfs/id/push/1.0.0") PROTOCOL_VERSION = "ipfs/0.1.0" AGENT_VERSION = get_agent_version() +LIMIT = trio.CapacityLimiter(10) def identify_push_handler_for(host: IHost) -> StreamHandlerFn: @@ -146,25 +147,26 @@ async def push_identify_to_peer( True if the push was successful, False otherwise. """ - try: - # Create a new stream to the peer using the identify/push protocol - stream = await host.new_stream(peer_id, [ID_PUSH]) + async with LIMIT: + try: + # Create a new stream to the peer using the identify/push protocol + stream = await host.new_stream(peer_id, [ID_PUSH]) - # Create the identify message - identify_msg = _mk_identify_protobuf(host, observed_multiaddr) - response = identify_msg.SerializeToString() + # Create the identify message + identify_msg = _mk_identify_protobuf(host, observed_multiaddr) + response = identify_msg.SerializeToString() - # Send the identify message - await stream.write(response) + # Send the identify message + await stream.write(response) - # Close the stream - await stream.close() + # Close the stream + await stream.close() - logger.debug("Successfully pushed identify to peer %s", peer_id) - return True - except Exception as e: - logger.error("Error pushing identify to peer %s: %s", peer_id, e) - return False + logger.debug("Successfully pushed identify to peer %s", peer_id) + return True + except Exception as e: + logger.error("Error pushing identify to peer %s: %s", peer_id, e) + return False async def push_identify_to_peers( @@ -179,13 +181,10 @@ async def push_identify_to_peers( """ if peer_ids is None: # Get all connected peers - peer_ids = set(host.get_peerstore().peer_ids()) + peer_ids = set(host.get_connected_peers()) # Push to each peer in parallel using a trio.Nursery - # TODO: Consider using a bounded nursery to limit concurrency - # and avoid overwhelming the network. This can be done by using - # trio.open_nursery(max_concurrent=10) or similar. - # For now, we will use an unbounded nursery for simplicity. + # limiting concurrent connections to 10 async with trio.open_nursery() as nursery: for peer_id in peer_ids: nursery.start_soon(push_identify_to_peer, host, peer_id, observed_multiaddr)