breaking: identify protocol use now prefix-length messages by default. use use_varint_format param for old raw messages

This commit is contained in:
acul71
2025-07-13 17:24:56 +02:00
parent 4bbb08ce2d
commit 1c59653946
6 changed files with 209 additions and 57 deletions

View File

@ -10,6 +10,7 @@ from libp2p import (
) )
from libp2p.identity.identify.identify import ( from libp2p.identity.identify.identify import (
ID as IDENTIFY_PROTOCOL_ID, ID as IDENTIFY_PROTOCOL_ID,
identify_handler_for,
parse_identify_response, parse_identify_response,
) )
from libp2p.peer.peerinfo import ( from libp2p.peer.peerinfo import (
@ -50,7 +51,7 @@ def print_identify_response(identify_response):
) )
async def run(port: int, destination: str) -> None: async def run(port: int, destination: str, use_varint_format: bool = True) -> None:
localhost_ip = "0.0.0.0" localhost_ip = "0.0.0.0"
if not destination: if not destination:
@ -58,11 +59,24 @@ async def run(port: int, destination: str) -> None:
listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}") listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}")
host_a = new_host() host_a = new_host()
# Set up identify handler with specified format
identify_handler = identify_handler_for(
host_a, use_varint_format=use_varint_format
)
host_a.set_stream_handler(IDENTIFY_PROTOCOL_ID, identify_handler)
async with host_a.run(listen_addrs=[listen_addr]): async with host_a.run(listen_addrs=[listen_addr]):
# Get the actual address and replace 0.0.0.0 with 127.0.0.1 for client
# connections
server_addr = str(host_a.get_addrs()[0])
client_addr = server_addr.replace("/ip4/0.0.0.0/", "/ip4/127.0.0.1/")
format_name = "length-prefixed" if use_varint_format else "raw protobuf"
print( print(
"First host listening. Run this from another console:\n\n" f"First host listening (using {format_name} format). "
f"Run this from another console:\n\n"
f"identify-demo " f"identify-demo "
f"-d {host_a.get_addrs()[0]}\n" f"-d {client_addr}\n"
) )
print("Waiting for incoming identify request...") print("Waiting for incoming identify request...")
await trio.sleep_forever() await trio.sleep_forever()
@ -105,9 +119,12 @@ async def run(port: int, destination: str) -> None:
def main() -> None: def main() -> None:
description = """ description = """
This program demonstrates the libp2p identify protocol. This program demonstrates the libp2p identify protocol.
First run identify-demo -p <PORT>' to start a listener. First run 'identify-demo -p <PORT> [--raw-format]' to start a listener.
Then run 'identify-demo <ANOTHER_PORT> -d <DESTINATION>' Then run 'identify-demo <ANOTHER_PORT> -d <DESTINATION>'
where <DESTINATION> is the multiaddress shown by the listener. where <DESTINATION> is the multiaddress shown by the listener.
Use --raw-format to send raw protobuf messages (old format) instead of
length-prefixed protobuf messages (new format, default).
""" """
example_maddr = ( example_maddr = (
@ -122,10 +139,22 @@ def main() -> None:
type=str, type=str,
help=f"destination multiaddr string, e.g. {example_maddr}", help=f"destination multiaddr string, e.g. {example_maddr}",
) )
parser.add_argument(
"--raw-format",
action="store_true",
help=(
"use raw protobuf format (old format) instead of "
"length-prefixed (new format)"
),
)
args = parser.parse_args() args = parser.parse_args()
# Determine format: raw format if --raw-format is specified, otherwise
# length-prefixed
use_varint_format = not args.raw_format
try: try:
trio.run(run, *(args.port, args.destination)) trio.run(run, *(args.port, args.destination, use_varint_format))
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass

View File

@ -57,18 +57,56 @@ from libp2p.peer.peerinfo import (
logger = logging.getLogger("libp2p.identity.identify-push-example") logger = logging.getLogger("libp2p.identity.identify-push-example")
def custom_identify_push_handler_for(host): def custom_identify_push_handler_for(host, use_varint_format: bool = True):
""" """
Create a custom handler for the identify/push protocol that logs and prints Create a custom handler for the identify/push protocol that logs and prints
the identity information received from the dialer. the identity information received from the dialer.
Args:
host: The libp2p host
use_varint_format: If True, expect length-prefixed format; if False, expect
raw protobuf
""" """
async def handle_identify_push(stream: INetStream) -> None: async def handle_identify_push(stream: INetStream) -> None:
peer_id = stream.muxed_conn.peer_id peer_id = stream.muxed_conn.peer_id
try: try:
# Read the identify message from the stream if use_varint_format:
data = await stream.read() # Read length-prefixed identify message from the stream
from libp2p.utils.varint import decode_varint_from_bytes
# First read the varint length prefix
length_bytes = b""
while True:
b = await stream.read(1)
if not b:
break
length_bytes += b
if b[0] & 0x80 == 0:
break
if not length_bytes:
logger.warning("No length prefix received from peer %s", peer_id)
return
msg_length = decode_varint_from_bytes(length_bytes)
# Read the protobuf message
data = await stream.read(msg_length)
if len(data) != msg_length:
logger.warning("Incomplete message received from peer %s", peer_id)
return
else:
# Read raw protobuf message from the stream
data = b""
while True:
chunk = await stream.read(4096)
if not chunk:
break
data += chunk
identify_msg = Identify() identify_msg = Identify()
identify_msg.ParseFromString(data) identify_msg.ParseFromString(data)
@ -129,9 +167,13 @@ def custom_identify_push_handler_for(host):
return handle_identify_push return handle_identify_push
async def run_listener(port: int) -> None: async def run_listener(port: int, use_varint_format: bool = True) -> None:
"""Run a host in listener mode.""" """Run a host in listener mode."""
print(f"\n==== Starting Identify-Push Listener on port {port} ====\n") format_name = "length-prefixed" if use_varint_format else "raw protobuf"
print(
f"\n==== Starting Identify-Push Listener on port {port} "
f"(using {format_name} format) ====\n"
)
# Create key pair for the listener # Create key pair for the listener
key_pair = create_new_key_pair() key_pair = create_new_key_pair()
@ -139,9 +181,14 @@ async def run_listener(port: int) -> None:
# Create the listener host # Create the listener host
host = new_host(key_pair=key_pair) host = new_host(key_pair=key_pair)
# Set up the identify and identify/push handlers # Set up the identify and identify/push handlers with specified format
host.set_stream_handler(ID_IDENTIFY, identify_handler_for(host)) host.set_stream_handler(
host.set_stream_handler(ID_IDENTIFY_PUSH, custom_identify_push_handler_for(host)) ID_IDENTIFY, identify_handler_for(host, use_varint_format=use_varint_format)
)
host.set_stream_handler(
ID_IDENTIFY_PUSH,
identify_push_handler_for(host, use_varint_format=use_varint_format),
)
# Start listening # Start listening
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
@ -165,9 +212,15 @@ async def run_listener(port: int) -> None:
await trio.sleep_forever() await trio.sleep_forever()
async def run_dialer(port: int, destination: str) -> None: async def run_dialer(
port: int, destination: str, use_varint_format: bool = True
) -> None:
"""Run a host in dialer mode that connects to a listener.""" """Run a host in dialer mode that connects to a listener."""
print(f"\n==== Starting Identify-Push Dialer on port {port} ====\n") format_name = "length-prefixed" if use_varint_format else "raw protobuf"
print(
f"\n==== Starting Identify-Push Dialer on port {port} "
f"(using {format_name} format) ====\n"
)
# Create key pair for the dialer # Create key pair for the dialer
key_pair = create_new_key_pair() key_pair = create_new_key_pair()
@ -175,9 +228,14 @@ async def run_dialer(port: int, destination: str) -> None:
# Create the dialer host # Create the dialer host
host = new_host(key_pair=key_pair) host = new_host(key_pair=key_pair)
# Set up the identify and identify/push handlers # Set up the identify and identify/push handlers with specified format
host.set_stream_handler(ID_IDENTIFY, identify_handler_for(host)) host.set_stream_handler(
host.set_stream_handler(ID_IDENTIFY_PUSH, identify_push_handler_for(host)) ID_IDENTIFY, identify_handler_for(host, use_varint_format=use_varint_format)
)
host.set_stream_handler(
ID_IDENTIFY_PUSH,
identify_push_handler_for(host, use_varint_format=use_varint_format),
)
# Start listening on a different port # Start listening on a different port
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
@ -206,7 +264,9 @@ async def run_dialer(port: int, destination: str) -> None:
try: try:
# Call push_identify_to_peer which returns a boolean # Call push_identify_to_peer which returns a boolean
success = await push_identify_to_peer(host, peer_info.peer_id) success = await push_identify_to_peer(
host, peer_info.peer_id, use_varint_format=use_varint_format
)
if success: if success:
logger.info("Identify push completed successfully!") logger.info("Identify push completed successfully!")
@ -240,11 +300,10 @@ def main() -> None:
This program demonstrates the libp2p identify/push protocol. This program demonstrates the libp2p identify/push protocol.
Without arguments, it runs as a listener on random port. Without arguments, it runs as a listener on random port.
With -d parameter, it runs as a dialer on random port. With -d parameter, it runs as a dialer on random port.
"""
example = ( Use --raw-format to send raw protobuf messages (old format) instead of
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q" length-prefixed protobuf messages (new format, default).
) """
parser = argparse.ArgumentParser(description=description) parser = argparse.ArgumentParser(description=description)
parser.add_argument("-p", "--port", default=0, type=int, help="source port number") parser.add_argument("-p", "--port", default=0, type=int, help="source port number")
@ -252,17 +311,29 @@ def main() -> None:
"-d", "-d",
"--destination", "--destination",
type=str, type=str,
help=f"destination multiaddr string, e.g. {example}", help="destination multiaddr string",
)
parser.add_argument(
"--raw-format",
action="store_true",
help=(
"use raw protobuf format (old format) instead of "
"length-prefixed (new format)"
),
) )
args = parser.parse_args() args = parser.parse_args()
# Determine format: raw format if --raw-format is specified, otherwise
# length-prefixed
use_varint_format = not args.raw_format
try: try:
if args.destination: if args.destination:
# Run in dialer mode with random available port if not specified # Run in dialer mode with random available port if not specified
trio.run(run_dialer, args.port, args.destination) trio.run(run_dialer, args.port, args.destination, use_varint_format)
else: else:
# Run in listener mode with random available port if not specified # Run in listener mode with random available port if not specified
trio.run(run_listener, args.port) trio.run(run_listener, args.port, use_varint_format)
except KeyboardInterrupt: except KeyboardInterrupt:
print("\nInterrupted by user") print("\nInterrupted by user")
logger.info("Interrupted by user") logger.info("Interrupted by user")

View File

@ -27,7 +27,7 @@ if TYPE_CHECKING:
def get_default_protocols(host: IHost) -> "OrderedDict[TProtocol, StreamHandlerFn]": def get_default_protocols(host: IHost) -> "OrderedDict[TProtocol, StreamHandlerFn]":
return OrderedDict( return OrderedDict(
( (
(IdentifyID, identify_handler_for(host, use_varint_format=False)), (IdentifyID, identify_handler_for(host, use_varint_format=True)),
(PingID, handle_ping), (PingID, handle_ping),
) )
) )

View File

@ -47,40 +47,57 @@ AGENT_VERSION = get_agent_version()
CONCURRENCY_LIMIT = 10 CONCURRENCY_LIMIT = 10
def identify_push_handler_for(host: IHost) -> StreamHandlerFn: def identify_push_handler_for(
host: IHost, use_varint_format: bool = True
) -> StreamHandlerFn:
""" """
Create a handler for the identify/push protocol. Create a handler for the identify/push protocol.
This handler receives pushed identify messages from remote peers and updates This handler receives pushed identify messages from remote peers and updates
the local peerstore with the new information. the local peerstore with the new information.
Args:
host: The libp2p host.
use_varint_format: If True, expect length-prefixed format; if False,
expect raw protobuf.
""" """
async def handle_identify_push(stream: INetStream) -> None: async def handle_identify_push(stream: INetStream) -> None:
peer_id = stream.muxed_conn.peer_id peer_id = stream.muxed_conn.peer_id
try: try:
# Read length-prefixed identify message from the stream if use_varint_format:
# First read the varint length prefix # Read length-prefixed identify message from the stream
length_bytes = b"" # First read the varint length prefix
while True: length_bytes = b""
b = await stream.read(1) while True:
if not b: b = await stream.read(1)
break if not b:
length_bytes += b break
if b[0] & 0x80 == 0: length_bytes += b
break if b[0] & 0x80 == 0:
break
if not length_bytes: if not length_bytes:
logger.warning("No length prefix received from peer %s", peer_id) logger.warning("No length prefix received from peer %s", peer_id)
return return
msg_length = decode_varint_from_bytes(length_bytes) msg_length = decode_varint_from_bytes(length_bytes)
# Read the protobuf message # Read the protobuf message
data = await stream.read(msg_length) data = await stream.read(msg_length)
if len(data) != msg_length: if len(data) != msg_length:
logger.warning("Incomplete message received from peer %s", peer_id) logger.warning("Incomplete message received from peer %s", peer_id)
return return
else:
# Read raw protobuf message from the stream
data = b""
while True:
chunk = await stream.read(4096)
if not chunk:
break
data += chunk
identify_msg = Identify() identify_msg = Identify()
identify_msg.ParseFromString(data) identify_msg.ParseFromString(data)
@ -162,6 +179,7 @@ async def push_identify_to_peer(
peer_id: ID, peer_id: ID,
observed_multiaddr: Multiaddr | None = None, observed_multiaddr: Multiaddr | None = None,
limit: trio.Semaphore = trio.Semaphore(CONCURRENCY_LIMIT), limit: trio.Semaphore = trio.Semaphore(CONCURRENCY_LIMIT),
use_varint_format: bool = True,
) -> bool: ) -> bool:
""" """
Push an identify message to a specific peer. Push an identify message to a specific peer.
@ -169,10 +187,16 @@ async def push_identify_to_peer(
This function opens a stream to the peer using the identify/push protocol, This function opens a stream to the peer using the identify/push protocol,
sends the identify message, and closes the stream. sends the identify message, and closes the stream.
Returns Args:
------- host: The libp2p host.
bool peer_id: The peer ID to push to.
True if the push was successful, False otherwise. observed_multiaddr: The observed multiaddress (optional).
limit: Semaphore for concurrency control.
use_varint_format: If True, send length-prefixed format; if False,
send raw protobuf.
Returns:
bool: True if the push was successful, False otherwise.
""" """
async with limit: async with limit:
@ -184,9 +208,13 @@ async def push_identify_to_peer(
identify_msg = _mk_identify_protobuf(host, observed_multiaddr) identify_msg = _mk_identify_protobuf(host, observed_multiaddr)
response = identify_msg.SerializeToString() response = identify_msg.SerializeToString()
# Send length-prefixed identify message if use_varint_format:
await stream.write(varint.encode_uvarint(len(response))) # Send length-prefixed identify message
await stream.write(response) await stream.write(varint.encode_uvarint(len(response)))
await stream.write(response)
else:
# Send raw protobuf message
await stream.write(response)
# Close the stream # Close the stream
await stream.close() await stream.close()
@ -202,18 +230,37 @@ async def push_identify_to_peers(
host: IHost, host: IHost,
peer_ids: set[ID] | None = None, peer_ids: set[ID] | None = None,
observed_multiaddr: Multiaddr | None = None, observed_multiaddr: Multiaddr | None = None,
use_varint_format: bool = True,
) -> None: ) -> None:
""" """
Push an identify message to multiple peers in parallel. Push an identify message to multiple peers in parallel.
If peer_ids is None, push to all connected peers. If peer_ids is None, push to all connected peers.
Args:
host: The libp2p host.
peer_ids: Set of peer IDs to push to (if None, push to all connected peers).
observed_multiaddr: The observed multiaddress (optional).
use_varint_format: If True, send length-prefixed format; if False,
send raw protobuf.
""" """
if peer_ids is None: if peer_ids is None:
# Get all connected peers # Get all connected peers
peer_ids = set(host.get_connected_peers()) peer_ids = set(host.get_connected_peers())
# Create a single shared semaphore for concurrency control
limit = trio.Semaphore(CONCURRENCY_LIMIT)
# Push to each peer in parallel using a trio.Nursery # Push to each peer in parallel using a trio.Nursery
# limiting concurrent connections to 10 # limiting concurrent connections to CONCURRENCY_LIMIT
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
for peer_id in peer_ids: for peer_id in peer_ids:
nursery.start_soon(push_identify_to_peer, host, peer_id, observed_multiaddr) nursery.start_soon(
push_identify_to_peer,
host,
peer_id,
observed_multiaddr,
limit,
use_varint_format,
)

View File

@ -0,0 +1 @@
identify protocol use now prefix-length messages by default. use use_varint_format param for old raw messages

View File

@ -459,7 +459,11 @@ async def test_push_identify_to_peers_respects_concurrency_limit():
lock = trio.Lock() lock = trio.Lock()
async def mock_push_identify_to_peer( async def mock_push_identify_to_peer(
host, peer_id, observed_multiaddr=None, limit=trio.Semaphore(CONCURRENCY_LIMIT) host,
peer_id,
observed_multiaddr=None,
limit=trio.Semaphore(CONCURRENCY_LIMIT),
use_varint_format=True,
) -> bool: ) -> bool:
""" """
Mock function to test concurrency by simulating an identify message. Mock function to test concurrency by simulating an identify message.