diff --git a/examples/identify/identify.py b/examples/identify/identify.py index c6276ad5..4882d2c3 100644 --- a/examples/identify/identify.py +++ b/examples/identify/identify.py @@ -10,6 +10,7 @@ from libp2p import ( ) from libp2p.identity.identify.identify import ( ID as IDENTIFY_PROTOCOL_ID, + identify_handler_for, parse_identify_response, ) from libp2p.peer.peerinfo import ( @@ -50,7 +51,7 @@ def print_identify_response(identify_response): ) -async def run(port: int, destination: str) -> None: +async def run(port: int, destination: str, use_varint_format: bool = True) -> None: localhost_ip = "0.0.0.0" if not destination: @@ -58,11 +59,24 @@ async def run(port: int, destination: str) -> None: listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}") host_a = new_host() + # Set up identify handler with specified format + identify_handler = identify_handler_for( + host_a, use_varint_format=use_varint_format + ) + host_a.set_stream_handler(IDENTIFY_PROTOCOL_ID, identify_handler) + async with host_a.run(listen_addrs=[listen_addr]): + # Get the actual address and replace 0.0.0.0 with 127.0.0.1 for client + # connections + server_addr = str(host_a.get_addrs()[0]) + client_addr = server_addr.replace("/ip4/0.0.0.0/", "/ip4/127.0.0.1/") + + format_name = "length-prefixed" if use_varint_format else "raw protobuf" print( - "First host listening. Run this from another console:\n\n" + f"First host listening (using {format_name} format). " + f"Run this from another console:\n\n" f"identify-demo " - f"-d {host_a.get_addrs()[0]}\n" + f"-d {client_addr}\n" ) print("Waiting for incoming identify request...") await trio.sleep_forever() @@ -105,9 +119,12 @@ async def run(port: int, destination: str) -> None: def main() -> None: description = """ This program demonstrates the libp2p identify protocol. - First run identify-demo -p ' to start a listener. + First run 'identify-demo -p [--raw-format]' to start a listener. 
Then run 'identify-demo -d ' where is the multiaddress shown by the listener. + + Use --raw-format to send raw protobuf messages (old format) instead of + length-prefixed protobuf messages (new format, default). """ example_maddr = ( @@ -122,10 +139,22 @@ def main() -> None: type=str, help=f"destination multiaddr string, e.g. {example_maddr}", ) + parser.add_argument( + "--raw-format", + action="store_true", + help=( + "use raw protobuf format (old format) instead of " + "length-prefixed (new format)" + ), + ) args = parser.parse_args() + # Determine format: raw format if --raw-format is specified, otherwise + # length-prefixed + use_varint_format = not args.raw_format + try: - trio.run(run, *(args.port, args.destination)) + trio.run(run, *(args.port, args.destination, use_varint_format)) except KeyboardInterrupt: pass diff --git a/examples/identify_push/identify_push_listener_dialer.py b/examples/identify_push/identify_push_listener_dialer.py index 294b0d17..0e573e0b 100644 --- a/examples/identify_push/identify_push_listener_dialer.py +++ b/examples/identify_push/identify_push_listener_dialer.py @@ -57,18 +57,56 @@ from libp2p.peer.peerinfo import ( logger = logging.getLogger("libp2p.identity.identify-push-example") -def custom_identify_push_handler_for(host): +def custom_identify_push_handler_for(host, use_varint_format: bool = True): """ Create a custom handler for the identify/push protocol that logs and prints the identity information received from the dialer. 
+ + Args: + host: The libp2p host + use_varint_format: If True, expect length-prefixed format; if False, expect + raw protobuf + """ async def handle_identify_push(stream: INetStream) -> None: peer_id = stream.muxed_conn.peer_id try: - # Read the identify message from the stream - data = await stream.read() + if use_varint_format: + # Read length-prefixed identify message from the stream + from libp2p.utils.varint import decode_varint_from_bytes + + # First read the varint length prefix + length_bytes = b"" + while True: + b = await stream.read(1) + if not b: + break + length_bytes += b + if b[0] & 0x80 == 0: + break + + if not length_bytes: + logger.warning("No length prefix received from peer %s", peer_id) + return + + msg_length = decode_varint_from_bytes(length_bytes) + + # Read the protobuf message + data = await stream.read(msg_length) + if len(data) != msg_length: + logger.warning("Incomplete message received from peer %s", peer_id) + return + else: + # Read raw protobuf message from the stream + data = b"" + while True: + chunk = await stream.read(4096) + if not chunk: + break + data += chunk + identify_msg = Identify() identify_msg.ParseFromString(data) @@ -129,9 +167,13 @@ def custom_identify_push_handler_for(host): return handle_identify_push -async def run_listener(port: int) -> None: +async def run_listener(port: int, use_varint_format: bool = True) -> None: """Run a host in listener mode.""" - print(f"\n==== Starting Identify-Push Listener on port {port} ====\n") + format_name = "length-prefixed" if use_varint_format else "raw protobuf" + print( + f"\n==== Starting Identify-Push Listener on port {port} " + f"(using {format_name} format) ====\n" + ) # Create key pair for the listener key_pair = create_new_key_pair() @@ -139,9 +181,14 @@ async def run_listener(port: int) -> None: # Create the listener host host = new_host(key_pair=key_pair) - # Set up the identify and identify/push handlers - host.set_stream_handler(ID_IDENTIFY, 
identify_handler_for(host)) - host.set_stream_handler(ID_IDENTIFY_PUSH, custom_identify_push_handler_for(host)) + # Set up the identify and identify/push handlers with specified format + host.set_stream_handler( + ID_IDENTIFY, identify_handler_for(host, use_varint_format=use_varint_format) + ) + host.set_stream_handler( + ID_IDENTIFY_PUSH, + identify_push_handler_for(host, use_varint_format=use_varint_format), + ) # Start listening listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") @@ -165,9 +212,15 @@ async def run_listener(port: int) -> None: await trio.sleep_forever() -async def run_dialer(port: int, destination: str) -> None: +async def run_dialer( + port: int, destination: str, use_varint_format: bool = True +) -> None: """Run a host in dialer mode that connects to a listener.""" - print(f"\n==== Starting Identify-Push Dialer on port {port} ====\n") + format_name = "length-prefixed" if use_varint_format else "raw protobuf" + print( + f"\n==== Starting Identify-Push Dialer on port {port} " + f"(using {format_name} format) ====\n" + ) # Create key pair for the dialer key_pair = create_new_key_pair() @@ -175,9 +228,14 @@ async def run_dialer(port: int, destination: str) -> None: # Create the dialer host host = new_host(key_pair=key_pair) - # Set up the identify and identify/push handlers - host.set_stream_handler(ID_IDENTIFY, identify_handler_for(host)) - host.set_stream_handler(ID_IDENTIFY_PUSH, identify_push_handler_for(host)) + # Set up the identify and identify/push handlers with specified format + host.set_stream_handler( + ID_IDENTIFY, identify_handler_for(host, use_varint_format=use_varint_format) + ) + host.set_stream_handler( + ID_IDENTIFY_PUSH, + identify_push_handler_for(host, use_varint_format=use_varint_format), + ) # Start listening on a different port listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") @@ -206,7 +264,9 @@ async def run_dialer(port: int, destination: str) -> None: try: # Call push_identify_to_peer which returns 
a boolean - success = await push_identify_to_peer(host, peer_info.peer_id) + success = await push_identify_to_peer( + host, peer_info.peer_id, use_varint_format=use_varint_format + ) if success: logger.info("Identify push completed successfully!") @@ -240,11 +300,10 @@ def main() -> None: This program demonstrates the libp2p identify/push protocol. Without arguments, it runs as a listener on random port. With -d parameter, it runs as a dialer on random port. - """ - example = ( - "/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q" - ) + Use --raw-format to send raw protobuf messages (old format) instead of + length-prefixed protobuf messages (new format, default). + """ parser = argparse.ArgumentParser(description=description) parser.add_argument("-p", "--port", default=0, type=int, help="source port number") @@ -252,17 +311,29 @@ def main() -> None: "-d", "--destination", type=str, - help=f"destination multiaddr string, e.g. {example}", + help="destination multiaddr string", + ) + parser.add_argument( + "--raw-format", + action="store_true", + help=( + "use raw protobuf format (old format) instead of " + "length-prefixed (new format)" + ), ) args = parser.parse_args() + # Determine format: raw format if --raw-format is specified, otherwise + # length-prefixed + use_varint_format = not args.raw_format + try: if args.destination: # Run in dialer mode with random available port if not specified - trio.run(run_dialer, args.port, args.destination) + trio.run(run_dialer, args.port, args.destination, use_varint_format) else: # Run in listener mode with random available port if not specified - trio.run(run_listener, args.port) + trio.run(run_listener, args.port, use_varint_format) except KeyboardInterrupt: print("\nInterrupted by user") logger.info("Interrupted by user") diff --git a/libp2p/host/defaults.py b/libp2p/host/defaults.py index f0fe855e..5dac8bce 100644 --- a/libp2p/host/defaults.py +++ b/libp2p/host/defaults.py @@ -27,7 +27,7 @@ if 
TYPE_CHECKING: def get_default_protocols(host: IHost) -> "OrderedDict[TProtocol, StreamHandlerFn]": return OrderedDict( ( - (IdentifyID, identify_handler_for(host, use_varint_format=False)), + (IdentifyID, identify_handler_for(host, use_varint_format=True)), (PingID, handle_ping), ) ) diff --git a/libp2p/identity/identify_push/identify_push.py b/libp2p/identity/identify_push/identify_push.py index f9b031de..688737c3 100644 --- a/libp2p/identity/identify_push/identify_push.py +++ b/libp2p/identity/identify_push/identify_push.py @@ -47,40 +47,57 @@ AGENT_VERSION = get_agent_version() CONCURRENCY_LIMIT = 10 -def identify_push_handler_for(host: IHost) -> StreamHandlerFn: +def identify_push_handler_for( + host: IHost, use_varint_format: bool = True +) -> StreamHandlerFn: """ Create a handler for the identify/push protocol. This handler receives pushed identify messages from remote peers and updates the local peerstore with the new information. + + Args: + host: The libp2p host. + use_varint_format: If True, expect length-prefixed format; if False, + expect raw protobuf. 
+ """ async def handle_identify_push(stream: INetStream) -> None: peer_id = stream.muxed_conn.peer_id try: - # Read length-prefixed identify message from the stream - # First read the varint length prefix - length_bytes = b"" - while True: - b = await stream.read(1) - if not b: - break - length_bytes += b - if b[0] & 0x80 == 0: - break + if use_varint_format: + # Read length-prefixed identify message from the stream + # First read the varint length prefix + length_bytes = b"" + while True: + b = await stream.read(1) + if not b: + break + length_bytes += b + if b[0] & 0x80 == 0: + break - if not length_bytes: - logger.warning("No length prefix received from peer %s", peer_id) - return + if not length_bytes: + logger.warning("No length prefix received from peer %s", peer_id) + return - msg_length = decode_varint_from_bytes(length_bytes) + msg_length = decode_varint_from_bytes(length_bytes) - # Read the protobuf message - data = await stream.read(msg_length) - if len(data) != msg_length: - logger.warning("Incomplete message received from peer %s", peer_id) - return + # Read the protobuf message + data = await stream.read(msg_length) + if len(data) != msg_length: + logger.warning("Incomplete message received from peer %s", peer_id) + return + else: + # Read raw protobuf message from the stream + data = b"" + while True: + chunk = await stream.read(4096) + if not chunk: + break + data += chunk identify_msg = Identify() identify_msg.ParseFromString(data) @@ -162,6 +179,7 @@ async def push_identify_to_peer( peer_id: ID, observed_multiaddr: Multiaddr | None = None, limit: trio.Semaphore = trio.Semaphore(CONCURRENCY_LIMIT), + use_varint_format: bool = True, ) -> bool: """ Push an identify message to a specific peer. @@ -169,10 +187,16 @@ async def push_identify_to_peer( This function opens a stream to the peer using the identify/push protocol, sends the identify message, and closes the stream. - Returns - ------- - bool - True if the push was successful, False otherwise. 
+ Args: + host: The libp2p host. + peer_id: The peer ID to push to. + observed_multiaddr: The observed multiaddress (optional). + limit: Semaphore for concurrency control. + use_varint_format: If True, send length-prefixed format; if False, + send raw protobuf. + + Returns: + bool: True if the push was successful, False otherwise. """ async with limit: @@ -184,9 +208,13 @@ async def push_identify_to_peer( identify_msg = _mk_identify_protobuf(host, observed_multiaddr) response = identify_msg.SerializeToString() - # Send length-prefixed identify message - await stream.write(varint.encode_uvarint(len(response))) - await stream.write(response) + if use_varint_format: + # Send length-prefixed identify message + await stream.write(varint.encode_uvarint(len(response))) + await stream.write(response) + else: + # Send raw protobuf message + await stream.write(response) # Close the stream await stream.close() @@ -202,18 +230,37 @@ async def push_identify_to_peers( host: IHost, peer_ids: set[ID] | None = None, observed_multiaddr: Multiaddr | None = None, + use_varint_format: bool = True, ) -> None: """ Push an identify message to multiple peers in parallel. If peer_ids is None, push to all connected peers. + + Args: + host: The libp2p host. + peer_ids: Set of peer IDs to push to (if None, push to all connected peers). + observed_multiaddr: The observed multiaddress (optional). + use_varint_format: If True, send length-prefixed format; if False, + send raw protobuf. 
+ """ if peer_ids is None: # Get all connected peers peer_ids = set(host.get_connected_peers()) + # Create a single shared semaphore for concurrency control + limit = trio.Semaphore(CONCURRENCY_LIMIT) + # Push to each peer in parallel using a trio.Nursery - # limiting concurrent connections to 10 + # limiting concurrent connections to CONCURRENCY_LIMIT async with trio.open_nursery() as nursery: for peer_id in peer_ids: - nursery.start_soon(push_identify_to_peer, host, peer_id, observed_multiaddr) + nursery.start_soon( + push_identify_to_peer, + host, + peer_id, + observed_multiaddr, + limit, + use_varint_format, + ) diff --git a/newsfragments/761.breaking.rst b/newsfragments/761.breaking.rst new file mode 100644 index 00000000..cd63a4e3 --- /dev/null +++ b/newsfragments/761.breaking.rst @@ -0,0 +1 @@ +The identify protocol now uses length-prefixed messages by default. Pass ``use_varint_format=False`` to use the old raw protobuf format. diff --git a/tests/core/identity/identify_push/test_identify_push.py b/tests/core/identity/identify_push/test_identify_push.py index 935fb2c0..e62bad7a 100644 --- a/tests/core/identity/identify_push/test_identify_push.py +++ b/tests/core/identity/identify_push/test_identify_push.py @@ -459,7 +459,11 @@ async def test_push_identify_to_peers_respects_concurrency_limit(): lock = trio.Lock() async def mock_push_identify_to_peer( - host, peer_id, observed_multiaddr=None, limit=trio.Semaphore(CONCURRENCY_LIMIT) + host, + peer_id, + observed_multiaddr=None, + limit=trio.Semaphore(CONCURRENCY_LIMIT), + use_varint_format=True, ) -> bool: """ Mock function to test concurrency by simulating an identify message.