Merge branch 'main' into feat/619-store-pubkey-peerid-peerstore

This commit is contained in:
Soham Bhoir
2025-06-10 21:12:28 +05:30
committed by GitHub
123 changed files with 2849 additions and 1444 deletions

View File

@ -40,7 +40,6 @@ async def write_data(stream: INetStream) -> None:
async def run(port: int, destination: str) -> None:
localhost_ip = "127.0.0.1"
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
host = new_host()
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
@ -54,8 +53,8 @@ async def run(port: int, destination: str) -> None:
print(
"Run this from the same folder in another console:\n\n"
f"chat-demo -p {int(port) + 1} "
f"-d /ip4/{localhost_ip}/tcp/{port}/p2p/{host.get_id().pretty()}\n"
f"chat-demo "
f"-d {host.get_addrs()[0]}\n"
)
print("Waiting for incoming connection...")
@ -87,9 +86,7 @@ def main() -> None:
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
)
parser = argparse.ArgumentParser(description=description)
parser.add_argument(
"-p", "--port", default=8000, type=int, help="source port number"
)
parser.add_argument("-p", "--port", default=0, type=int, help="source port number")
parser.add_argument(
"-d",
"--destination",
@ -98,9 +95,6 @@ def main() -> None:
)
args = parser.parse_args()
if not args.port:
raise RuntimeError("was not able to determine a local port")
try:
trio.run(run, *(args.port, args.destination))
except KeyboardInterrupt:

View File

@ -9,8 +9,10 @@ from libp2p import (
from libp2p.crypto.secp256k1 import (
create_new_key_pair,
)
from libp2p.security.noise.transport import PROTOCOL_ID as NOISE_PROTOCOL_ID
from libp2p.security.noise.transport import Transport as NoiseTransport
from libp2p.security.noise.transport import (
PROTOCOL_ID as NOISE_PROTOCOL_ID,
Transport as NoiseTransport,
)
async def main():

View File

@ -9,8 +9,10 @@ from libp2p import (
from libp2p.crypto.secp256k1 import (
create_new_key_pair,
)
from libp2p.security.secio.transport import ID as SECIO_PROTOCOL_ID
from libp2p.security.secio.transport import Transport as SecioTransport
from libp2p.security.secio.transport import (
ID as SECIO_PROTOCOL_ID,
Transport as SecioTransport,
)
async def main():
@ -22,9 +24,6 @@ async def main():
secio_transport = SecioTransport(
# local_key_pair: The key pair used for libp2p identity and authentication
local_key_pair=key_pair,
# secure_bytes_provider: Optional function to generate secure random bytes
# (defaults to secrets.token_bytes)
secure_bytes_provider=None, # Use default implementation
)
# Create a security options dictionary mapping protocol ID to transport

View File

@ -9,10 +9,9 @@ from libp2p import (
from libp2p.crypto.secp256k1 import (
create_new_key_pair,
)
from libp2p.security.noise.transport import PROTOCOL_ID as NOISE_PROTOCOL_ID
from libp2p.security.noise.transport import Transport as NoiseTransport
from libp2p.stream_muxer.mplex.mplex import (
MPLEX_PROTOCOL_ID,
from libp2p.security.noise.transport import (
PROTOCOL_ID as NOISE_PROTOCOL_ID,
Transport as NoiseTransport,
)
@ -37,14 +36,8 @@ async def main():
# Create a security options dictionary mapping protocol ID to transport
security_options = {NOISE_PROTOCOL_ID: noise_transport}
# Create a muxer options dictionary mapping protocol ID to muxer class
# We don't need to instantiate the muxer here, the host will do that for us
muxer_options = {MPLEX_PROTOCOL_ID: None}
# Create a host with the key pair, Noise security, and mplex multiplexer
host = new_host(
key_pair=key_pair, sec_opt=security_options, muxer_opt=muxer_options
)
host = new_host(key_pair=key_pair, sec_opt=security_options)
# Configure the listening address
port = 8000

View File

@ -0,0 +1,263 @@
"""
Enhanced NetStream Example for py-libp2p with State Management
This example demonstrates the new NetStream features including:
- State tracking and transitions
- Proper error handling and validation
- Resource cleanup and event notifications
- Thread-safe operations with Trio locks
Based on the standard echo demo but enhanced to show NetStream state management.
"""
import argparse
import random
import secrets
import multiaddr
import trio
from libp2p import (
new_host,
)
from libp2p.crypto.secp256k1 import (
create_new_key_pair,
)
from libp2p.custom_types import (
TProtocol,
)
from libp2p.network.stream.exceptions import (
StreamClosed,
StreamEOF,
StreamReset,
)
from libp2p.network.stream.net_stream import (
NetStream,
StreamState,
)
from libp2p.peer.peerinfo import (
info_from_p2p_addr,
)
PROTOCOL_ID = TProtocol("/echo/1.0.0")
async def enhanced_echo_handler(stream: NetStream) -> None:
"""
Enhanced echo handler that demonstrates NetStream state management.
"""
print(f"New connection established: {stream}")
print(f"Initial stream state: {await stream.state}")
try:
# Verify stream is in expected initial state
assert await stream.state == StreamState.OPEN
assert await stream.is_readable()
assert await stream.is_writable()
print("✓ Stream initialized in OPEN state")
# Read incoming data with proper state checking
print("Waiting for client data...")
while await stream.is_readable():
try:
# Read data from client
data = await stream.read(1024)
if not data:
print("Received empty data, client may have closed")
break
print(f"Received: {data.decode('utf-8').strip()}")
# Check if we can still write before echoing
if await stream.is_writable():
await stream.write(data)
print(f"Echoed: {data.decode('utf-8').strip()}")
else:
print("Cannot echo - stream not writable")
break
except StreamEOF:
print("Client closed their write side (EOF)")
break
except StreamReset:
print("Stream was reset by client")
return
except StreamClosed as e:
print(f"Stream operation failed: {e}")
break
# Demonstrate graceful closure
current_state = await stream.state
print(f"Current state before close: {current_state}")
if current_state not in [StreamState.CLOSE_BOTH, StreamState.RESET]:
await stream.close()
print("Server closed write side")
final_state = await stream.state
print(f"Final stream state: {final_state}")
except Exception as e:
print(f"Handler error: {e}")
# Reset stream on unexpected errors
if await stream.state not in [StreamState.RESET, StreamState.CLOSE_BOTH]:
await stream.reset()
print("Stream reset due to error")
async def enhanced_client_demo(stream: NetStream) -> None:
"""
Enhanced client that demonstrates various NetStream state scenarios.
"""
print(f"Client stream established: {stream}")
print(f"Initial state: {await stream.state}")
try:
# Verify initial state
assert await stream.state == StreamState.OPEN
print("✓ Client stream in OPEN state")
# Scenario 1: Normal communication
message = b"Hello from enhanced NetStream client!\n"
if await stream.is_writable():
await stream.write(message)
print(f"Sent: {message.decode('utf-8').strip()}")
else:
print("Cannot write - stream not writable")
return
# Close write side to signal EOF to server
await stream.close()
print("Client closed write side")
# Verify state transition
state_after_close = await stream.state
print(f"State after close: {state_after_close}")
assert state_after_close == StreamState.CLOSE_WRITE
assert await stream.is_readable() # Should still be readable
assert not await stream.is_writable() # Should not be writable
# Try to write (should fail)
try:
await stream.write(b"This should fail")
print("ERROR: Write succeeded when it should have failed!")
except StreamClosed as e:
print(f"✓ Expected error when writing to closed stream: {e}")
# Read the echo response
if await stream.is_readable():
try:
response = await stream.read()
print(f"Received echo: {response.decode('utf-8').strip()}")
except StreamEOF:
print("Server closed their write side")
except StreamReset:
print("Stream was reset")
# Check final state
final_state = await stream.state
print(f"Final client state: {final_state}")
except Exception as e:
print(f"Client error: {e}")
# Reset on error
await stream.reset()
print("Client reset stream due to error")
async def run_enhanced_demo(
port: int, destination: str, seed: int | None = None
) -> None:
"""
Run enhanced echo demo with NetStream state management.
"""
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
# Generate or use provided key
if seed:
random.seed(seed)
secret_number = random.getrandbits(32 * 8)
secret = secret_number.to_bytes(length=32, byteorder="big")
else:
secret = secrets.token_bytes(32)
host = new_host(key_pair=create_new_key_pair(secret))
async with host.run(listen_addrs=[listen_addr]):
print(f"Host ID: {host.get_id().to_string()}")
print("=" * 60)
if not destination: # Server mode
print("🖥️ ENHANCED ECHO SERVER MODE")
print("=" * 60)
# type: ignore: Stream is type of NetStream
host.set_stream_handler(PROTOCOL_ID, enhanced_echo_handler)
print(
"Run client from another console:\n"
f"python3 example_net_stream.py "
f"-d {host.get_addrs()[0]}\n"
)
print("Waiting for connections...")
print("Press Ctrl+C to stop server")
await trio.sleep_forever()
else: # Client mode
print("📱 ENHANCED ECHO CLIENT MODE")
print("=" * 60)
# Connect to server
maddr = multiaddr.Multiaddr(destination)
info = info_from_p2p_addr(maddr)
await host.connect(info)
print(f"Connected to server: {info.peer_id.pretty()}")
# Create stream and run enhanced demo
stream = await host.new_stream(info.peer_id, [PROTOCOL_ID])
if isinstance(stream, NetStream):
await enhanced_client_demo(stream)
print("\n" + "=" * 60)
print("CLIENT DEMO COMPLETE")
def main() -> None:
example_maddr = (
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
)
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument("-p", "--port", default=0, type=int, help="source port number")
parser.add_argument(
"-d",
"--destination",
type=str,
help=f"destination multiaddr string, e.g. {example_maddr}",
)
parser.add_argument(
"-s",
"--seed",
type=int,
help="seed for deterministic peer ID generation",
)
parser.add_argument(
"--demo-states", action="store_true", help="run state transition demo only"
)
args = parser.parse_args()
try:
trio.run(run_enhanced_demo, args.port, args.destination, args.seed)
except KeyboardInterrupt:
print("\n👋 Demo interrupted by user")
except Exception as e:
print(f"❌ Demo failed: {e}")
if __name__ == "__main__":
main()

View File

@ -12,10 +12,9 @@ from libp2p.crypto.secp256k1 import (
from libp2p.peer.peerinfo import (
info_from_p2p_addr,
)
from libp2p.security.noise.transport import PROTOCOL_ID as NOISE_PROTOCOL_ID
from libp2p.security.noise.transport import Transport as NoiseTransport
from libp2p.stream_muxer.mplex.mplex import (
MPLEX_PROTOCOL_ID,
from libp2p.security.noise.transport import (
PROTOCOL_ID as NOISE_PROTOCOL_ID,
Transport as NoiseTransport,
)
@ -40,14 +39,8 @@ async def main():
# Create a security options dictionary mapping protocol ID to transport
security_options = {NOISE_PROTOCOL_ID: noise_transport}
# Create a muxer options dictionary mapping protocol ID to muxer class
# We don't need to instantiate the muxer here, the host will do that for us
muxer_options = {MPLEX_PROTOCOL_ID: None}
# Create a host with the key pair, Noise security, and mplex multiplexer
host = new_host(
key_pair=key_pair, sec_opt=security_options, muxer_opt=muxer_options
)
host = new_host(key_pair=key_pair, sec_opt=security_options)
# Configure the listening address
port = 8000

View File

@ -9,10 +9,9 @@ from libp2p import (
from libp2p.crypto.secp256k1 import (
create_new_key_pair,
)
from libp2p.security.noise.transport import PROTOCOL_ID as NOISE_PROTOCOL_ID
from libp2p.security.noise.transport import Transport as NoiseTransport
from libp2p.stream_muxer.mplex.mplex import (
MPLEX_PROTOCOL_ID,
from libp2p.security.noise.transport import (
PROTOCOL_ID as NOISE_PROTOCOL_ID,
Transport as NoiseTransport,
)
@ -37,14 +36,8 @@ async def main():
# Create a security options dictionary mapping protocol ID to transport
security_options = {NOISE_PROTOCOL_ID: noise_transport}
# Create a muxer options dictionary mapping protocol ID to muxer class
# We don't need to instantiate the muxer here, the host will do that for us
muxer_options = {MPLEX_PROTOCOL_ID: None}
# Create a host with the key pair, Noise security, and mplex multiplexer
host = new_host(
key_pair=key_pair, sec_opt=security_options, muxer_opt=muxer_options
)
host = new_host(key_pair=key_pair, sec_opt=security_options)
# Configure the listening address
port = 8000

View File

@ -29,8 +29,7 @@ async def _echo_stream_handler(stream: INetStream) -> None:
await stream.close()
async def run(port: int, destination: str, seed: int = None) -> None:
localhost_ip = "127.0.0.1"
async def run(port: int, destination: str, seed: int | None = None) -> None:
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
if seed:
@ -53,8 +52,8 @@ async def run(port: int, destination: str, seed: int = None) -> None:
print(
"Run this from the same folder in another console:\n\n"
f"echo-demo -p {int(port) + 1} "
f"-d /ip4/{localhost_ip}/tcp/{port}/p2p/{host.get_id().pretty()}\n"
f"echo-demo "
f"-d {host.get_addrs()[0]}\n"
)
print("Waiting for incoming connections...")
await trio.sleep_forever()
@ -73,6 +72,7 @@ async def run(port: int, destination: str, seed: int = None) -> None:
msg = b"hi, there!\n"
await stream.write(msg)
# TODO: check why the stream is closed after the first write ???
# Notify the other side about EOF
await stream.close()
response = await stream.read()
@ -94,9 +94,7 @@ def main() -> None:
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
)
parser = argparse.ArgumentParser(description=description)
parser.add_argument(
"-p", "--port", default=8000, type=int, help="source port number"
)
parser.add_argument("-p", "--port", default=0, type=int, help="source port number")
parser.add_argument(
"-d",
"--destination",
@ -110,10 +108,6 @@ def main() -> None:
help="provide a seed to the random number generator (e.g. to fix peer IDs across runs)", # noqa: E501
)
args = parser.parse_args()
if not args.port:
raise RuntimeError("was not able to determine a local port")
try:
trio.run(run, args.port, args.destination, args.seed)
except KeyboardInterrupt:

View File

@ -61,20 +61,20 @@ async def run(port: int, destination: str) -> None:
async with host_a.run(listen_addrs=[listen_addr]):
print(
"First host listening. Run this from another console:\n\n"
f"identify-demo -p {int(port) + 1} "
f"-d /ip4/{localhost_ip}/tcp/{port}/p2p/{host_a.get_id().pretty()}\n"
f"identify-demo "
f"-d {host_a.get_addrs()[0]}\n"
)
print("Waiting for incoming identify request...")
await trio.sleep_forever()
else:
# Create second host (dialer)
print(f"dialer (host_b) listening on /ip4/{localhost_ip}/tcp/{port}")
listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}")
host_b = new_host()
async with host_b.run(listen_addrs=[listen_addr]):
# Connect to the first host
print(f"dialer (host_b) listening on {host_b.get_addrs()[0]}")
maddr = multiaddr.Multiaddr(destination)
info = info_from_p2p_addr(maddr)
print(f"Second host connecting to peer: {info.peer_id}")
@ -104,13 +104,11 @@ def main() -> None:
"""
example_maddr = (
"/ip4/127.0.0.1/tcp/8888/p2p/QmQn4SwGkDZkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
"/ip4/127.0.0.1/tcp/8888/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
)
parser = argparse.ArgumentParser(description=description)
parser.add_argument(
"-p", "--port", default=8888, type=int, help="source port number"
)
parser.add_argument("-p", "--port", default=0, type=int, help="source port number")
parser.add_argument(
"-d",
"--destination",
@ -119,9 +117,6 @@ def main() -> None:
)
args = parser.parse_args()
if not args.port:
raise RuntimeError("failed to determine local port")
try:
trio.run(run, *(args.port, args.destination))
except KeyboardInterrupt:

View File

@ -38,17 +38,17 @@ from libp2p.crypto.secp256k1 import (
create_new_key_pair,
)
from libp2p.identity.identify import (
ID as ID_IDENTIFY,
identify_handler_for,
)
from libp2p.identity.identify import ID as ID_IDENTIFY
from libp2p.identity.identify.pb.identify_pb2 import (
Identify,
)
from libp2p.identity.identify_push import (
ID_PUSH as ID_IDENTIFY_PUSH,
identify_push_handler_for,
push_identify_to_peer,
)
from libp2p.identity.identify_push import ID_PUSH as ID_IDENTIFY_PUSH
from libp2p.peer.peerinfo import (
info_from_p2p_addr,
)
@ -56,9 +56,6 @@ from libp2p.peer.peerinfo import (
# Configure logging
logger = logging.getLogger("libp2p.identity.identify-push-example")
# Default port configuration
DEFAULT_PORT = 8888
def custom_identify_push_handler_for(host):
"""
@ -241,25 +238,16 @@ def main() -> None:
"""Parse arguments and start the appropriate mode."""
description = """
This program demonstrates the libp2p identify/push protocol.
Without arguments, it runs as a listener on port 8888.
With -d parameter, it runs as a dialer on port 8889.
Without arguments, it runs as a listener on random port.
With -d parameter, it runs as a dialer on random port.
"""
example = (
f"/ip4/127.0.0.1/tcp/{DEFAULT_PORT}/p2p/"
"QmQn4SwGkDZkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
)
parser = argparse.ArgumentParser(description=description)
parser.add_argument(
"-p",
"--port",
type=int,
help=(
f"port to listen on (default: {DEFAULT_PORT} for listener, "
f"{DEFAULT_PORT + 1} for dialer)"
),
)
parser.add_argument("-p", "--port", default=0, type=int, help="source port number")
parser.add_argument(
"-d",
"--destination",
@ -270,13 +258,11 @@ def main() -> None:
try:
if args.destination:
# Run in dialer mode with default port DEFAULT_PORT + 1 if not specified
port = args.port if args.port is not None else DEFAULT_PORT + 1
trio.run(run_dialer, port, args.destination)
# Run in dialer mode with random available port if not specified
trio.run(run_dialer, args.port, args.destination)
else:
# Run in listener mode with default port DEFAULT_PORT if not specified
port = args.port if args.port is not None else DEFAULT_PORT
trio.run(run_listener, port)
# Run in listener mode with random available port if not specified
trio.run(run_listener, args.port)
except KeyboardInterrupt:
print("\nInterrupted by user")
logger.info("Interrupted by user")

View File

@ -55,7 +55,6 @@ async def send_ping(stream: INetStream) -> None:
async def run(port: int, destination: str) -> None:
localhost_ip = "127.0.0.1"
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
host = new_host(listen_addrs=[listen_addr])
@ -65,8 +64,8 @@ async def run(port: int, destination: str) -> None:
print(
"Run this from the same folder in another console:\n\n"
f"ping-demo -p {int(port) + 1} "
f"-d /ip4/{localhost_ip}/tcp/{port}/p2p/{host.get_id().pretty()}\n"
f"ping-demo "
f"-d {host.get_addrs()[0]}\n"
)
print("Waiting for incoming connection...")
@ -96,10 +95,8 @@ def main() -> None:
)
parser = argparse.ArgumentParser(description=description)
parser.add_argument("-p", "--port", default=0, type=int, help="source port number")
parser.add_argument(
"-p", "--port", default=8000, type=int, help="source port number"
)
parser.add_argument(
"-d",
"--destination",
@ -108,9 +105,6 @@ def main() -> None:
)
args = parser.parse_args()
if not args.port:
raise RuntimeError("failed to determine local port")
try:
trio.run(run, *(args.port, args.destination))
except KeyboardInterrupt:

View File

@ -1,9 +1,6 @@
import argparse
import logging
import socket
from typing import (
Optional,
)
import base58
import multiaddr
@ -109,7 +106,7 @@ async def monitor_peer_topics(pubsub, nursery, termination_event):
await trio.sleep(2)
async def run(topic: str, destination: Optional[str], port: Optional[int]) -> None:
async def run(topic: str, destination: str | None, port: int | None) -> None:
# Initialize network settings
localhost_ip = "127.0.0.1"