mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
Add the periodic peer-store cleanup in all the examples
This commit is contained in:
@ -43,6 +43,9 @@ async def run(port: int, destination: str) -> None:
|
|||||||
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
|
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
|
||||||
host = new_host()
|
host = new_host()
|
||||||
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
|
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
|
||||||
|
# Start the peer-store cleanup task
|
||||||
|
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
|
||||||
|
|
||||||
if not destination: # its the server
|
if not destination: # its the server
|
||||||
|
|
||||||
async def stream_handler(stream: INetStream) -> None:
|
async def stream_handler(stream: INetStream) -> None:
|
||||||
|
|||||||
@ -45,7 +45,10 @@ async def run(port: int, destination: str, seed: int | None = None) -> None:
|
|||||||
secret = secrets.token_bytes(32)
|
secret = secrets.token_bytes(32)
|
||||||
|
|
||||||
host = new_host(key_pair=create_new_key_pair(secret))
|
host = new_host(key_pair=create_new_key_pair(secret))
|
||||||
async with host.run(listen_addrs=[listen_addr]):
|
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
|
||||||
|
# Start the peer-store cleanup task
|
||||||
|
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
|
||||||
|
|
||||||
print(f"I am {host.get_id().to_string()}")
|
print(f"I am {host.get_id().to_string()}")
|
||||||
|
|
||||||
if not destination: # its the server
|
if not destination: # its the server
|
||||||
|
|||||||
@ -72,7 +72,13 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No
|
|||||||
)
|
)
|
||||||
host_a.set_stream_handler(IDENTIFY_PROTOCOL_ID, identify_handler)
|
host_a.set_stream_handler(IDENTIFY_PROTOCOL_ID, identify_handler)
|
||||||
|
|
||||||
async with host_a.run(listen_addrs=[listen_addr]):
|
async with (
|
||||||
|
host_a.run(listen_addrs=[listen_addr]),
|
||||||
|
trio.open_nursery() as nursery,
|
||||||
|
):
|
||||||
|
# Start the peer-store cleanup task
|
||||||
|
nursery.start_soon(host_a.get_peerstore().start_cleanup_task, 60)
|
||||||
|
|
||||||
# Get the actual address and replace 0.0.0.0 with 127.0.0.1 for client
|
# Get the actual address and replace 0.0.0.0 with 127.0.0.1 for client
|
||||||
# connections
|
# connections
|
||||||
server_addr = str(host_a.get_addrs()[0])
|
server_addr = str(host_a.get_addrs()[0])
|
||||||
@ -131,7 +137,13 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No
|
|||||||
listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}")
|
listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}")
|
||||||
host_b = new_host()
|
host_b = new_host()
|
||||||
|
|
||||||
async with host_b.run(listen_addrs=[listen_addr]):
|
async with (
|
||||||
|
host_b.run(listen_addrs=[listen_addr]),
|
||||||
|
trio.open_nursery() as nursery,
|
||||||
|
):
|
||||||
|
# Start the peer-store cleanup task
|
||||||
|
nursery.start_soon(host_b.get_peerstore().start_cleanup_task, 60)
|
||||||
|
|
||||||
# Connect to the first host
|
# Connect to the first host
|
||||||
print(f"dialer (host_b) listening on {host_b.get_addrs()[0]}")
|
print(f"dialer (host_b) listening on {host_b.get_addrs()[0]}")
|
||||||
maddr = multiaddr.Multiaddr(destination)
|
maddr = multiaddr.Multiaddr(destination)
|
||||||
|
|||||||
@ -211,7 +211,15 @@ async def main() -> None:
|
|||||||
listen_addr_1 = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
|
listen_addr_1 = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
|
||||||
listen_addr_2 = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
|
listen_addr_2 = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
|
||||||
|
|
||||||
async with host_1.run([listen_addr_1]), host_2.run([listen_addr_2]):
|
async with (
|
||||||
|
host_1.run([listen_addr_1]),
|
||||||
|
host_2.run([listen_addr_2]),
|
||||||
|
trio.open_nursery() as nursery,
|
||||||
|
):
|
||||||
|
# Start the peer-store cleanup task
|
||||||
|
nursery.start_soon(host_1.get_peerstore().start_cleanup_task, 60)
|
||||||
|
nursery.start_soon(host_2.get_peerstore().start_cleanup_task, 60)
|
||||||
|
|
||||||
# Get the addresses of both hosts
|
# Get the addresses of both hosts
|
||||||
addr_1 = host_1.get_addrs()[0]
|
addr_1 = host_1.get_addrs()[0]
|
||||||
addr_2 = host_2.get_addrs()[0]
|
addr_2 = host_2.get_addrs()[0]
|
||||||
|
|||||||
@ -151,7 +151,10 @@ async def run_node(
|
|||||||
host = new_host(key_pair=key_pair)
|
host = new_host(key_pair=key_pair)
|
||||||
listen_addr = Multiaddr(f"/ip4/127.0.0.1/tcp/{port}")
|
listen_addr = Multiaddr(f"/ip4/127.0.0.1/tcp/{port}")
|
||||||
|
|
||||||
async with host.run(listen_addrs=[listen_addr]):
|
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
|
||||||
|
# Start the peer-store cleanup task
|
||||||
|
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
|
||||||
|
|
||||||
peer_id = host.get_id().pretty()
|
peer_id = host.get_id().pretty()
|
||||||
addr_str = f"/ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}"
|
addr_str = f"/ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}"
|
||||||
await connect_to_bootstrap_nodes(host, bootstrap_nodes)
|
await connect_to_bootstrap_nodes(host, bootstrap_nodes)
|
||||||
|
|||||||
@ -46,7 +46,10 @@ async def run(port: int) -> None:
|
|||||||
|
|
||||||
logger.info("Starting peer Discovery")
|
logger.info("Starting peer Discovery")
|
||||||
host = new_host(key_pair=key_pair, enable_mDNS=True)
|
host = new_host(key_pair=key_pair, enable_mDNS=True)
|
||||||
async with host.run(listen_addrs=[listen_addr]):
|
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
|
||||||
|
# Start the peer-store cleanup task
|
||||||
|
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
|
||||||
|
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -144,6 +144,9 @@ async def run(topic: str, destination: str | None, port: int | None) -> None:
|
|||||||
pubsub = Pubsub(host, gossipsub)
|
pubsub = Pubsub(host, gossipsub)
|
||||||
termination_event = trio.Event() # Event to signal termination
|
termination_event = trio.Event() # Event to signal termination
|
||||||
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
|
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
|
||||||
|
# Start the peer-store cleanup task
|
||||||
|
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
|
||||||
|
|
||||||
logger.info(f"Node started with peer ID: {host.get_id()}")
|
logger.info(f"Node started with peer ID: {host.get_id()}")
|
||||||
logger.info(f"Listening on: {listen_addr}")
|
logger.info(f"Listening on: {listen_addr}")
|
||||||
logger.info("Initializing PubSub and GossipSub...")
|
logger.info("Initializing PubSub and GossipSub...")
|
||||||
|
|||||||
@ -141,16 +141,6 @@ async def _update_peerstore_from_identify(
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Error updating protocols for peer %s: %s", peer_id, e)
|
logger.error("Error updating protocols for peer %s: %s", peer_id, e)
|
||||||
|
|
||||||
# Update observed address if present
|
|
||||||
if identify_msg.HasField("observed_addr") and identify_msg.observed_addr:
|
|
||||||
try:
|
|
||||||
# Convert bytes to Multiaddr object
|
|
||||||
observed_addr = Multiaddr(identify_msg.observed_addr)
|
|
||||||
# Add the observed address to the peerstore
|
|
||||||
# Use a default TTL of 2 hours (7200 seconds)
|
|
||||||
peerstore.add_addr(peer_id, observed_addr, 7200)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error("Error updating observed address for peer %s: %s", peer_id, e)
|
|
||||||
if identify_msg.HasField("signedPeerRecord"):
|
if identify_msg.HasField("signedPeerRecord"):
|
||||||
try:
|
try:
|
||||||
# Convert the signed-peer-record(Envelope) from prtobuf bytes
|
# Convert the signed-peer-record(Envelope) from prtobuf bytes
|
||||||
@ -164,6 +154,16 @@ async def _update_peerstore_from_identify(
|
|||||||
logger.error(
|
logger.error(
|
||||||
"Error updating the certified addr book for peer %s: %s", peer_id, e
|
"Error updating the certified addr book for peer %s: %s", peer_id, e
|
||||||
)
|
)
|
||||||
|
# Update observed address if present
|
||||||
|
if identify_msg.HasField("observed_addr") and identify_msg.observed_addr:
|
||||||
|
try:
|
||||||
|
# Convert bytes to Multiaddr object
|
||||||
|
observed_addr = Multiaddr(identify_msg.observed_addr)
|
||||||
|
# Add the observed address to the peerstore
|
||||||
|
# Use a default TTL of 2 hours (7200 seconds)
|
||||||
|
peerstore.add_addr(peer_id, observed_addr, 7200)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error updating observed address for peer %s: %s", peer_id, e)
|
||||||
|
|
||||||
|
|
||||||
async def push_identify_to_peer(
|
async def push_identify_to_peer(
|
||||||
|
|||||||
Reference in New Issue
Block a user