feat: Add identify-push raw format support and yamux logging improvements

- Add comprehensive integration tests for identify-push protocol
- Support both raw protobuf and varint message formats
- Improve yamux logging integration with LIBP2P_DEBUG
- Fix RawConnError handling to reduce log noise
- Add Ctrl+C handling to identify examples
- Enhance identify-push listener/dialer demo

Fixes: #784
This commit is contained in:
acul71
2025-07-20 20:19:18 +02:00
parent e6a355d395
commit 37e4fee9f8
11 changed files with 1181 additions and 314 deletions

View File

@ -1,6 +1,7 @@
import argparse
import base64
import logging
import sys
import multiaddr
import trio
@ -112,7 +113,12 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No
# Replace the handler with our custom one
host_a.set_stream_handler(IDENTIFY_PROTOCOL_ID, custom_identify_handler)
await trio.sleep_forever()
try:
await trio.sleep_forever()
except KeyboardInterrupt:
print("\n🛑 Shutting down listener...")
logger.info("Listener interrupted by user")
return
else:
# Create second host (dialer)
@ -147,38 +153,13 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No
try:
print("Starting identify protocol...")
# Read the response properly based on the format
if use_varint_format:
# For length-prefixed format, read varint length first
from libp2p.utils.varint import decode_varint_from_bytes
# Read the response using the utility function
from libp2p.utils.varint import read_length_prefixed_protobuf
# Read varint length prefix
length_bytes = b""
while True:
b = await stream.read(1)
if not b:
raise Exception("Stream closed while reading varint length")
length_bytes += b
if b[0] & 0x80 == 0:
break
msg_length = decode_varint_from_bytes(length_bytes)
print(f"Expected message length: {msg_length} bytes")
# Read the protobuf message
response = await stream.read(msg_length)
if len(response) != msg_length:
raise Exception(
f"Incomplete message: expected {msg_length} bytes, "
f"got {len(response)}"
)
# Combine length prefix and message
full_response = length_bytes + response
else:
# For raw format, read all available data
response = await stream.read(8192)
full_response = response
response = await read_length_prefixed_protobuf(
stream, use_varint_format
)
full_response = response
await stream.close()
@ -254,6 +235,7 @@ def main() -> None:
"length-prefixed (new format)"
),
)
args = parser.parse_args()
# Determine format: raw format if --raw-format is specified, otherwise
@ -261,9 +243,19 @@ def main() -> None:
use_varint_format = not args.raw_format
try:
trio.run(run, *(args.port, args.destination, use_varint_format))
if args.destination:
# Run in dialer mode
trio.run(run, *(args.port, args.destination, use_varint_format))
else:
# Run in listener mode
trio.run(run, *(args.port, args.destination, use_varint_format))
except KeyboardInterrupt:
pass
print("\n👋 Goodbye!")
logger.info("Application interrupted by user")
except Exception as e:
print(f"\n❌ Error: {str(e)}")
logger.error("Error: %s", str(e))
sys.exit(1)
if __name__ == "__main__":

View File

@ -11,23 +11,26 @@ This example shows how to:
import logging
import multiaddr
import trio
from libp2p import (
new_host,
)
from libp2p.abc import (
INetStream,
)
from libp2p.crypto.secp256k1 import (
create_new_key_pair,
)
from libp2p.custom_types import (
TProtocol,
)
from libp2p.identity.identify import (
identify_handler_for,
from libp2p.identity.identify.pb.identify_pb2 import (
Identify,
)
from libp2p.identity.identify_push import (
ID_PUSH,
identify_push_handler_for,
push_identify_to_peer,
)
from libp2p.peer.peerinfo import (
@ -38,8 +41,145 @@ from libp2p.peer.peerinfo import (
logger = logging.getLogger(__name__)
def create_custom_identify_handler(host, host_name: str):
"""Create a custom identify handler that displays received information."""
async def handle_identify(stream: INetStream) -> None:
peer_id = stream.muxed_conn.peer_id
print(f"\n🔍 {host_name} received identify request from peer: {peer_id}")
# Get the standard identify response using the existing function
from libp2p.identity.identify.identify import (
_mk_identify_protobuf,
_remote_address_to_multiaddr,
)
# Get observed address
observed_multiaddr = None
try:
remote_address = stream.get_remote_address()
if remote_address:
observed_multiaddr = _remote_address_to_multiaddr(remote_address)
except Exception:
pass
# Build the identify protobuf
identify_msg = _mk_identify_protobuf(host, observed_multiaddr)
response_data = identify_msg.SerializeToString()
print(f" 📋 {host_name} identify information:")
if identify_msg.HasField("protocol_version"):
print(f" Protocol Version: {identify_msg.protocol_version}")
if identify_msg.HasField("agent_version"):
print(f" Agent Version: {identify_msg.agent_version}")
if identify_msg.HasField("public_key"):
print(f" Public Key: {identify_msg.public_key.hex()[:16]}...")
if identify_msg.listen_addrs:
print(" Listen Addresses:")
for addr_bytes in identify_msg.listen_addrs:
addr = multiaddr.Multiaddr(addr_bytes)
print(f" - {addr}")
if identify_msg.protocols:
print(" Supported Protocols:")
for protocol in identify_msg.protocols:
print(f" - {protocol}")
# Send the response
await stream.write(response_data)
await stream.close()
return handle_identify
def create_custom_identify_push_handler(host, host_name: str):
"""Create a custom identify/push handler that displays received information."""
async def handle_identify_push(stream: INetStream) -> None:
peer_id = stream.muxed_conn.peer_id
print(f"\n📤 {host_name} received identify/push from peer: {peer_id}")
try:
# Read the identify message using the utility function
from libp2p.utils.varint import read_length_prefixed_protobuf
data = await read_length_prefixed_protobuf(stream, use_varint_format=True)
# Parse the identify message
identify_msg = Identify()
identify_msg.ParseFromString(data)
print(" 📋 Received identify information:")
if identify_msg.HasField("protocol_version"):
print(f" Protocol Version: {identify_msg.protocol_version}")
if identify_msg.HasField("agent_version"):
print(f" Agent Version: {identify_msg.agent_version}")
if identify_msg.HasField("public_key"):
print(f" Public Key: {identify_msg.public_key.hex()[:16]}...")
if identify_msg.HasField("observed_addr") and identify_msg.observed_addr:
observed_addr = multiaddr.Multiaddr(identify_msg.observed_addr)
print(f" Observed Address: {observed_addr}")
if identify_msg.listen_addrs:
print(" Listen Addresses:")
for addr_bytes in identify_msg.listen_addrs:
addr = multiaddr.Multiaddr(addr_bytes)
print(f" - {addr}")
if identify_msg.protocols:
print(" Supported Protocols:")
for protocol in identify_msg.protocols:
print(f" - {protocol}")
# Update the peerstore with the new information
from libp2p.identity.identify_push.identify_push import (
_update_peerstore_from_identify,
)
await _update_peerstore_from_identify(
host.get_peerstore(), peer_id, identify_msg
)
print(f"{host_name} updated peerstore with new information")
except Exception as e:
print(f" ❌ Error processing identify/push: {e}")
finally:
await stream.close()
return handle_identify_push
async def display_peerstore_info(host, host_name: str, peer_id, description: str):
"""Display peerstore information for a specific peer."""
peerstore = host.get_peerstore()
try:
addrs = peerstore.addrs(peer_id)
except Exception:
addrs = []
try:
protocols = peerstore.get_protocols(peer_id)
except Exception:
protocols = []
print(f"\n📚 {host_name} peerstore for {description}:")
print(f" Peer ID: {peer_id}")
if addrs:
print(" Addresses:")
for addr in addrs:
print(f" - {addr}")
else:
print(" Addresses: None")
if protocols:
print(" Protocols:")
for protocol in protocols:
print(f" - {protocol}")
else:
print(" Protocols: None")
async def main() -> None:
print("\n==== Starting Identify-Push Example ====\n")
print("\n==== Starting Enhanced Identify-Push Example ====\n")
# Create key pairs for the two hosts
key_pair_1 = create_new_key_pair()
@ -48,45 +188,49 @@ async def main() -> None:
# Create the first host
host_1 = new_host(key_pair=key_pair_1)
# Set up the identify and identify/push handlers
host_1.set_stream_handler(TProtocol("/ipfs/id/1.0.0"), identify_handler_for(host_1))
host_1.set_stream_handler(ID_PUSH, identify_push_handler_for(host_1))
# Set up custom identify and identify/push handlers
host_1.set_stream_handler(
TProtocol("/ipfs/id/1.0.0"), create_custom_identify_handler(host_1, "Host 1")
)
host_1.set_stream_handler(
ID_PUSH, create_custom_identify_push_handler(host_1, "Host 1")
)
# Create the second host
host_2 = new_host(key_pair=key_pair_2)
# Set up the identify and identify/push handlers
host_2.set_stream_handler(TProtocol("/ipfs/id/1.0.0"), identify_handler_for(host_2))
host_2.set_stream_handler(ID_PUSH, identify_push_handler_for(host_2))
# Set up custom identify and identify/push handlers
host_2.set_stream_handler(
TProtocol("/ipfs/id/1.0.0"), create_custom_identify_handler(host_2, "Host 2")
)
host_2.set_stream_handler(
ID_PUSH, create_custom_identify_push_handler(host_2, "Host 2")
)
# Start listening on random ports using the run context manager
import multiaddr
listen_addr_1 = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
listen_addr_2 = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
async with host_1.run([listen_addr_1]), host_2.run([listen_addr_2]):
# Get the addresses of both hosts
addr_1 = host_1.get_addrs()[0]
logger.info(f"Host 1 listening on {addr_1}")
print(f"Host 1 listening on {addr_1}")
print(f"Peer ID: {host_1.get_id().pretty()}")
addr_2 = host_2.get_addrs()[0]
logger.info(f"Host 2 listening on {addr_2}")
print(f"Host 2 listening on {addr_2}")
print(f"Peer ID: {host_2.get_id().pretty()}")
print("\nConnecting Host 2 to Host 1...")
print("🏠 Host Configuration:")
print(f" Host 1: {addr_1}")
print(f" Host 1 Peer ID: {host_1.get_id().pretty()}")
print(f" Host 2: {addr_2}")
print(f" Host 2 Peer ID: {host_2.get_id().pretty()}")
print("\n🔗 Connecting Host 2 to Host 1...")
# Connect host_2 to host_1
peer_info = info_from_p2p_addr(addr_1)
await host_2.connect(peer_info)
logger.info("Host 2 connected to Host 1")
print("Host 2 successfully connected to Host 1")
print("Host 2 successfully connected to Host 1")
# Run the identify protocol from host_2 to host_1
# (so Host 1 learns Host 2's address)
print("\n🔄 Running identify protocol (Host 2 → Host 1)...")
from libp2p.identity.identify.identify import ID as IDENTIFY_PROTOCOL_ID
stream = await host_2.new_stream(host_1.get_id(), (IDENTIFY_PROTOCOL_ID,))
@ -94,64 +238,58 @@ async def main() -> None:
await stream.close()
# Run the identify protocol from host_1 to host_2
# (so Host 2 learns Host 1's address)
print("\n🔄 Running identify protocol (Host 1 → Host 2)...")
stream = await host_1.new_stream(host_2.get_id(), (IDENTIFY_PROTOCOL_ID,))
response = await stream.read()
await stream.close()
# --- NEW CODE: Update Host 1's peerstore with Host 2's addresses ---
from libp2p.identity.identify.pb.identify_pb2 import (
Identify,
)
# Update Host 1's peerstore with Host 2's addresses
identify_msg = Identify()
identify_msg.ParseFromString(response)
peerstore_1 = host_1.get_peerstore()
peer_id_2 = host_2.get_id()
for addr_bytes in identify_msg.listen_addrs:
maddr = multiaddr.Multiaddr(addr_bytes)
# TTL can be any positive int
peerstore_1.add_addr(
peer_id_2,
maddr,
ttl=3600,
)
# --- END NEW CODE ---
peerstore_1.add_addr(peer_id_2, maddr, ttl=3600)
# Now Host 1's peerstore should have Host 2's address
peerstore_1 = host_1.get_peerstore()
peer_id_2 = host_2.get_id()
addrs_1_for_2 = peerstore_1.addrs(peer_id_2)
logger.info(
f"[DEBUG] Host 1 peerstore addresses for Host 2 before push: "
f"{addrs_1_for_2}"
)
print(
f"[DEBUG] Host 1 peerstore addresses for Host 2 before push: "
f"{addrs_1_for_2}"
# Display peerstore information before push
await display_peerstore_info(
host_1, "Host 1", peer_id_2, "Host 2 (before push)"
)
# Push identify information from host_1 to host_2
logger.info("Host 1 pushing identify information to Host 2")
print("\nHost 1 pushing identify information to Host 2...")
print("\n📤 Host 1 pushing identify information to Host 2...")
try:
# Call push_identify_to_peer which now returns a boolean
success = await push_identify_to_peer(host_1, host_2.get_id())
if success:
logger.info("Identify push completed successfully")
print("Identify push completed successfully!")
print("Identify push completed successfully!")
else:
logger.warning("Identify push didn't complete successfully")
print("\nWarning: Identify push didn't complete successfully")
print("⚠️ Identify push didn't complete successfully")
except Exception as e:
logger.error(f"Error during identify push: {str(e)}")
print(f"\nError during identify push: {str(e)}")
print(f"Error during identify push: {str(e)}")
# Add this at the end of your async with block:
await trio.sleep(0.5) # Give background tasks time to finish
# Give a moment for the identify/push processing to complete
await trio.sleep(0.5)
# Display peerstore information after push
await display_peerstore_info(host_1, "Host 1", peer_id_2, "Host 2 (after push)")
await display_peerstore_info(
host_2, "Host 2", host_1.get_id(), "Host 1 (after push)"
)
# Give more time for background tasks to finish and connections to stabilize
print("\n⏳ Waiting for background tasks to complete...")
await trio.sleep(1.0)
# Gracefully close connections to prevent connection errors
print("🔌 Closing connections...")
await host_2.disconnect(host_1.get_id())
await trio.sleep(0.2)
print("\n🎉 Example completed successfully!")
if __name__ == "__main__":

View File

@ -41,6 +41,9 @@ from libp2p.identity.identify import (
ID as ID_IDENTIFY,
identify_handler_for,
)
from libp2p.identity.identify.identify import (
_remote_address_to_multiaddr,
)
from libp2p.identity.identify.pb.identify_pb2 import (
Identify,
)
@ -72,40 +75,30 @@ def custom_identify_push_handler_for(host, use_varint_format: bool = True):
async def handle_identify_push(stream: INetStream) -> None:
peer_id = stream.muxed_conn.peer_id
# Get remote address information
try:
if use_varint_format:
# Read length-prefixed identify message from the stream
from libp2p.utils.varint import decode_varint_from_bytes
remote_address = stream.get_remote_address()
if remote_address:
observed_multiaddr = _remote_address_to_multiaddr(remote_address)
logger.info(
"Connection from remote peer %s, address: %s, multiaddr: %s",
peer_id,
remote_address,
observed_multiaddr,
)
print(f"\n🔗 Received identify/push request from peer: {peer_id}")
# Add the peer ID to create a complete multiaddr
complete_multiaddr = f"{observed_multiaddr}/p2p/{peer_id}"
print(f" Remote address: {complete_multiaddr}")
except Exception as e:
logger.error("Error getting remote address: %s", e)
print(f"\n🔗 Received identify/push request from peer: {peer_id}")
# First read the varint length prefix
length_bytes = b""
while True:
b = await stream.read(1)
if not b:
break
length_bytes += b
if b[0] & 0x80 == 0:
break
try:
# Use the utility function to read the protobuf message
from libp2p.utils.varint import read_length_prefixed_protobuf
if not length_bytes:
logger.warning("No length prefix received from peer %s", peer_id)
return
msg_length = decode_varint_from_bytes(length_bytes)
# Read the protobuf message
data = await stream.read(msg_length)
if len(data) != msg_length:
logger.warning("Incomplete message received from peer %s", peer_id)
return
else:
# Read raw protobuf message from the stream
data = b""
while True:
chunk = await stream.read(4096)
if not chunk:
break
data += chunk
data = await read_length_prefixed_protobuf(stream, use_varint_format)
identify_msg = Identify()
identify_msg.ParseFromString(data)
@ -155,11 +148,41 @@ def custom_identify_push_handler_for(host, use_varint_format: bool = True):
await _update_peerstore_from_identify(peerstore, peer_id, identify_msg)
logger.info("Successfully processed identify/push from peer %s", peer_id)
print(f"\nSuccessfully processed identify/push from peer {peer_id}")
print(f"Successfully processed identify/push from peer {peer_id}")
except Exception as e:
logger.error("Error processing identify/push from %s: %s", peer_id, e)
print(f"\nError processing identify/push from {peer_id}: {e}")
error_msg = str(e)
logger.error(
"Error processing identify/push from %s: %s", peer_id, error_msg
)
print(f"\nError processing identify/push from {peer_id}: {error_msg}")
# Check for specific format mismatch errors
if (
"Error parsing message" in error_msg
or "DecodeError" in error_msg
or "ParseFromString" in error_msg
):
print("\n" + "=" * 60)
print("FORMAT MISMATCH DETECTED!")
print("=" * 60)
if use_varint_format:
print(
"You are using length-prefixed format (default) but the "
"dialer is using raw protobuf format."
)
print("\nTo fix this, run the dialer with the --raw-format flag:")
print(
"identify-push-listener-dialer-demo --raw-format -d <ADDRESS>"
)
else:
print("You are using raw protobuf format but the dialer")
print("is using length-prefixed format (default).")
print(
"\nTo fix this, run the dialer without the --raw-format flag:"
)
print("identify-push-listener-dialer-demo -d <ADDRESS>")
print("=" * 60)
finally:
# Close the stream after processing
await stream.close()
@ -167,7 +190,9 @@ def custom_identify_push_handler_for(host, use_varint_format: bool = True):
return handle_identify_push
async def run_listener(port: int, use_varint_format: bool = True) -> None:
async def run_listener(
port: int, use_varint_format: bool = True, raw_format_flag: bool = False
) -> None:
"""Run a host in listener mode."""
format_name = "length-prefixed" if use_varint_format else "raw protobuf"
print(
@ -187,29 +212,41 @@ async def run_listener(port: int, use_varint_format: bool = True) -> None:
)
host.set_stream_handler(
ID_IDENTIFY_PUSH,
identify_push_handler_for(host, use_varint_format=use_varint_format),
custom_identify_push_handler_for(host, use_varint_format=use_varint_format),
)
# Start listening
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
async with host.run([listen_addr]):
addr = host.get_addrs()[0]
logger.info("Listener host ready!")
print("Listener host ready!")
try:
async with host.run([listen_addr]):
addr = host.get_addrs()[0]
logger.info("Listener host ready!")
print("Listener host ready!")
logger.info(f"Listening on: {addr}")
print(f"Listening on: {addr}")
logger.info(f"Listening on: {addr}")
print(f"Listening on: {addr}")
logger.info(f"Peer ID: {host.get_id().pretty()}")
print(f"Peer ID: {host.get_id().pretty()}")
logger.info(f"Peer ID: {host.get_id().pretty()}")
print(f"Peer ID: {host.get_id().pretty()}")
print("\nRun dialer with command:")
print(f"identify-push-listener-dialer-demo -d {addr}")
print("\nWaiting for incoming connections... (Ctrl+C to exit)")
print("\nRun dialer with command:")
if raw_format_flag:
print(f"identify-push-listener-dialer-demo -d {addr} --raw-format")
else:
print(f"identify-push-listener-dialer-demo -d {addr}")
print("\nWaiting for incoming identify/push requests... (Ctrl+C to exit)")
# Keep running until interrupted
await trio.sleep_forever()
# Keep running until interrupted
try:
await trio.sleep_forever()
except KeyboardInterrupt:
print("\n🛑 Shutting down listener...")
logger.info("Listener interrupted by user")
return
except Exception as e:
logger.error(f"Listener error: {e}")
raise
async def run_dialer(
@ -256,7 +293,9 @@ async def run_dialer(
try:
await host.connect(peer_info)
logger.info("Successfully connected to listener!")
print("Successfully connected to listener!")
print("Successfully connected to listener!")
print(f" Connected to: {peer_info.peer_id}")
print(f" Full address: {destination}")
# Push identify information to the listener
logger.info("Pushing identify information to listener...")
@ -270,7 +309,7 @@ async def run_dialer(
if success:
logger.info("Identify push completed successfully!")
print("Identify push completed successfully!")
print("Identify push completed successfully!")
logger.info("Example completed successfully!")
print("\nExample completed successfully!")
@ -281,17 +320,57 @@ async def run_dialer(
logger.warning("Example completed with warnings.")
print("Example completed with warnings.")
except Exception as e:
logger.error(f"Error during identify push: {str(e)}")
print(f"\nError during identify push: {str(e)}")
error_msg = str(e)
logger.error(f"Error during identify push: {error_msg}")
print(f"\nError during identify push: {error_msg}")
# Check for specific format mismatch errors
if (
"Error parsing message" in error_msg
or "DecodeError" in error_msg
or "ParseFromString" in error_msg
):
print("\n" + "=" * 60)
print("FORMAT MISMATCH DETECTED!")
print("=" * 60)
if use_varint_format:
print(
"You are using length-prefixed format (default) but the "
"listener is using raw protobuf format."
)
print(
"\nTo fix this, run the dialer with the --raw-format flag:"
)
print(
f"identify-push-listener-dialer-demo --raw-format -d "
f"{destination}"
)
else:
print("You are using raw protobuf format but the listener")
print("is using length-prefixed format (default).")
print(
"\nTo fix this, run the dialer without the --raw-format "
"flag:"
)
print(f"identify-push-listener-dialer-demo -d {destination}")
print("=" * 60)
logger.error("Example completed with errors.")
print("Example completed with errors.")
# Continue execution despite the push error
except Exception as e:
logger.error(f"Error during dialer operation: {str(e)}")
print(f"\nError during dialer operation: {str(e)}")
raise
error_msg = str(e)
if "unable to connect" in error_msg or "SwarmException" in error_msg:
print(f"\n❌ Cannot connect to peer: {peer_info.peer_id}")
print(f" Address: {destination}")
print(f" Error: {error_msg}")
print("\n💡 Make sure the peer is running and the address is correct.")
return
else:
logger.error(f"Error during dialer operation: {error_msg}")
print(f"\nError during dialer operation: {error_msg}")
raise
def main() -> None:
@ -301,12 +380,21 @@ def main() -> None:
Without arguments, it runs as a listener on random port.
With -d parameter, it runs as a dialer on random port.
Port 0 (default) means the OS will automatically assign an available port.
This prevents port conflicts when running multiple instances.
Use --raw-format to send raw protobuf messages (old format) instead of
length-prefixed protobuf messages (new format, default).
"""
parser = argparse.ArgumentParser(description=description)
parser.add_argument("-p", "--port", default=0, type=int, help="source port number")
parser.add_argument(
"-p",
"--port",
default=0,
type=int,
help="source port number (0 = random available port)",
)
parser.add_argument(
"-d",
"--destination",
@ -321,6 +409,7 @@ def main() -> None:
"length-prefixed (new format)"
),
)
args = parser.parse_args()
# Determine format: raw format if --raw-format is specified, otherwise
@ -333,12 +422,12 @@ def main() -> None:
trio.run(run_dialer, args.port, args.destination, use_varint_format)
else:
# Run in listener mode with random available port if not specified
trio.run(run_listener, args.port, use_varint_format)
trio.run(run_listener, args.port, use_varint_format, args.raw_format)
except KeyboardInterrupt:
print("\nInterrupted by user")
logger.info("Interrupted by user")
print("\n👋 Goodbye!")
logger.info("Application interrupted by user")
except Exception as e:
print(f"\nError: {str(e)}")
print(f"\nError: {str(e)}")
logger.error("Error: %s", str(e))
sys.exit(1)