From 8c96c5a941472e4ee22271a23471f000c3e612b4 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Fri, 25 Jul 2025 23:50:08 +0530 Subject: [PATCH] Add the periodic peer-store cleanup in all the examples --- examples/chat/chat.py | 3 +++ examples/echo/echo.py | 5 ++++- examples/identify/identify.py | 16 +++++++++++++-- examples/identify_push/identify_push_demo.py | 10 +++++++++- examples/kademlia/kademlia.py | 5 ++++- examples/mDNS/mDNS.py | 5 ++++- examples/pubsub/pubsub.py | 3 +++ .../identity/identify_push/identify_push.py | 20 +++++++++---------- 8 files changed, 51 insertions(+), 16 deletions(-) diff --git a/examples/chat/chat.py b/examples/chat/chat.py index 87e7a44a..05a9b918 100755 --- a/examples/chat/chat.py +++ b/examples/chat/chat.py @@ -43,6 +43,9 @@ async def run(port: int, destination: str) -> None: listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") host = new_host() async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: + # Start the peer-store cleanup task + nursery.start_soon(host.get_peerstore().start_cleanup_task, 60) + if not destination: # its the server async def stream_handler(stream: INetStream) -> None: diff --git a/examples/echo/echo.py b/examples/echo/echo.py index 9f1722b2..126a7da2 100644 --- a/examples/echo/echo.py +++ b/examples/echo/echo.py @@ -45,7 +45,10 @@ async def run(port: int, destination: str, seed: int | None = None) -> None: secret = secrets.token_bytes(32) host = new_host(key_pair=create_new_key_pair(secret)) - async with host.run(listen_addrs=[listen_addr]): + async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: + # Start the peer-store cleanup task + nursery.start_soon(host.get_peerstore().start_cleanup_task, 60) + print(f"I am {host.get_id().to_string()}") if not destination: # its the server diff --git a/examples/identify/identify.py b/examples/identify/identify.py index 06f30cff..98980f99 100644 --- a/examples/identify/identify.py +++ b/examples/identify/identify.py @@ -72,7 +72,13 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No ) host_a.set_stream_handler(IDENTIFY_PROTOCOL_ID, identify_handler) - async with host_a.run(listen_addrs=[listen_addr]): + async with ( + host_a.run(listen_addrs=[listen_addr]), + trio.open_nursery() as nursery, + ): + # Start the peer-store cleanup task + nursery.start_soon(host_a.get_peerstore().start_cleanup_task, 60) + # Get the actual address and replace 0.0.0.0 with 127.0.0.1 for client # connections server_addr = str(host_a.get_addrs()[0]) @@ -131,7 +137,13 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}") host_b = new_host() - async with host_b.run(listen_addrs=[listen_addr]): + async with ( + host_b.run(listen_addrs=[listen_addr]), + trio.open_nursery() as nursery, + ): + # Start the peer-store cleanup task + nursery.start_soon(host_b.get_peerstore().start_cleanup_task, 60) + # Connect to the first host print(f"dialer (host_b) listening on {host_b.get_addrs()[0]}") maddr = multiaddr.Multiaddr(destination) diff --git a/examples/identify_push/identify_push_demo.py b/examples/identify_push/identify_push_demo.py index 5a293f07..ccd8b29d 100644 --- a/examples/identify_push/identify_push_demo.py +++ b/examples/identify_push/identify_push_demo.py @@ -211,7 +211,15 @@ async def main() -> None: listen_addr_1 = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") listen_addr_2 = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") - async with host_1.run([listen_addr_1]), host_2.run([listen_addr_2]): + async with ( + host_1.run([listen_addr_1]), + host_2.run([listen_addr_2]), + trio.open_nursery() as nursery, + ): + # Start the peer-store cleanup task + nursery.start_soon(host_1.get_peerstore().start_cleanup_task, 60) + nursery.start_soon(host_2.get_peerstore().start_cleanup_task, 60) + # Get the addresses of both hosts addr_1 = host_1.get_addrs()[0] addr_2 = host_2.get_addrs()[0] diff --git a/examples/kademlia/kademlia.py b/examples/kademlia/kademlia.py index ada81d87..00c7915a 100644 --- a/examples/kademlia/kademlia.py +++ b/examples/kademlia/kademlia.py @@ -151,7 +151,10 @@ async def run_node( host = new_host(key_pair=key_pair) listen_addr = Multiaddr(f"/ip4/127.0.0.1/tcp/{port}") - async with host.run(listen_addrs=[listen_addr]): + async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: + # Start the peer-store cleanup task + nursery.start_soon(host.get_peerstore().start_cleanup_task, 60) + peer_id = host.get_id().pretty() addr_str = f"/ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}" await connect_to_bootstrap_nodes(host, bootstrap_nodes) diff --git a/examples/mDNS/mDNS.py b/examples/mDNS/mDNS.py index 794e05c8..d3f11b56 100644 --- a/examples/mDNS/mDNS.py +++ b/examples/mDNS/mDNS.py @@ -46,7 +46,10 @@ async def run(port: int) -> None: logger.info("Starting peer Discovery") host = new_host(key_pair=key_pair, enable_mDNS=True) - async with host.run(listen_addrs=[listen_addr]): + async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: + # Start the peer-store cleanup task + nursery.start_soon(host.get_peerstore().start_cleanup_task, 60) + await trio.sleep_forever() diff --git a/examples/pubsub/pubsub.py b/examples/pubsub/pubsub.py index 9dca415f..1ab6d650 100644 --- a/examples/pubsub/pubsub.py +++ b/examples/pubsub/pubsub.py @@ -144,6 +144,9 @@ async def run(topic: str, destination: str | None, port: int | None) -> None: pubsub = Pubsub(host, gossipsub) termination_event = trio.Event() # Event to signal termination async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: + # Start the peer-store cleanup task + nursery.start_soon(host.get_peerstore().start_cleanup_task, 60) + logger.info(f"Node started with peer ID: {host.get_id()}") logger.info(f"Listening on: {listen_addr}") logger.info("Initializing PubSub and GossipSub...") diff --git a/libp2p/identity/identify_push/identify_push.py b/libp2p/identity/identify_push/identify_push.py index 9ed3f5bf..5b23851b 100644 --- a/libp2p/identity/identify_push/identify_push.py +++ b/libp2p/identity/identify_push/identify_push.py @@ -141,16 +141,6 @@ async def _update_peerstore_from_identify( except Exception as e: logger.error("Error updating protocols for peer %s: %s", peer_id, e) - # Update observed address if present - if identify_msg.HasField("observed_addr") and identify_msg.observed_addr: - try: - # Convert bytes to Multiaddr object - observed_addr = Multiaddr(identify_msg.observed_addr) - # Add the observed address to the peerstore - # Use a default TTL of 2 hours (7200 seconds) - peerstore.add_addr(peer_id, observed_addr, 7200) - except Exception as e: - logger.error("Error updating observed address for peer %s: %s", peer_id, e) if identify_msg.HasField("signedPeerRecord"): try: # Convert the signed-peer-record(Envelope) from prtobuf bytes @@ -164,6 +154,16 @@ async def _update_peerstore_from_identify( logger.error( "Error updating the certified addr book for peer %s: %s", peer_id, e ) + # Update observed address if present + if identify_msg.HasField("observed_addr") and identify_msg.observed_addr: + try: + # Convert bytes to Multiaddr object + observed_addr = Multiaddr(identify_msg.observed_addr) + # Add the observed address to the peerstore + # Use a default TTL of 2 hours (7200 seconds) + peerstore.add_addr(peer_id, observed_addr, 7200) + except Exception as e: + logger.error("Error updating observed address for peer %s: %s", peer_id, e) async def push_identify_to_peer(