diff --git a/newsfragments/761.feature.rst b/newsfragments/761.feature.rst new file mode 100644 index 00000000..fd38866c --- /dev/null +++ b/newsfragments/761.feature.rst @@ -0,0 +1 @@ +add length-prefixed support to identify protocol diff --git a/tests/interop/js_libp2p/py_node/ping.py b/tests/interop/js_libp2p/py_node/ping.py deleted file mode 100644 index a13a8ace..00000000 --- a/tests/interop/js_libp2p/py_node/ping.py +++ /dev/null @@ -1,398 +0,0 @@ -import argparse -import logging - -from cryptography.hazmat.primitives.asymmetric import ( - x25519, -) -import multiaddr -import trio - -from libp2p import ( - generate_new_rsa_identity, - new_host, -) -from libp2p.custom_types import ( - TProtocol, -) -from libp2p.network.stream.net_stream import ( - INetStream, -) -from libp2p.peer.peerinfo import ( - info_from_p2p_addr, -) -from libp2p.security.noise.transport import Transport as NoiseTransport -from libp2p.stream_muxer.yamux.yamux import ( - Yamux, -) -from libp2p.stream_muxer.yamux.yamux import PROTOCOL_ID as YAMUX_PROTOCOL_ID - -# Configure detailed logging -logging.basicConfig( - level=logging.DEBUG, - format="%(asctime)s - %(levelname)s - %(message)s", - handlers=[ - logging.StreamHandler(), - logging.FileHandler("ping_debug.log", mode="w", encoding="utf-8"), - ], -) - -PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0") -PING_LENGTH = 32 -RESP_TIMEOUT = 60 - - -async def handle_ping(stream: INetStream) -> None: - """Handle incoming ping requests from js-libp2p clients""" - peer_id = stream.muxed_conn.peer_id - print(f"[INFO] New ping stream opened by {peer_id}") - logging.info(f"Ping handler called for peer {peer_id}") - - ping_count = 0 - - try: - while True: - try: - print(f"[INFO] Waiting for ping data from {peer_id}...") - logging.debug(f"Stream state: {stream}") - data = await stream.read(PING_LENGTH) - - if not data: - print( - f"[INFO] No data received," - f"connection likely closed by {peer_id}" - ) - logging.debug("No data received, stream closed") - break - - if len(data) == 0: - print(f"[INFO] Empty data received, connection closed by {peer_id}") - logging.debug("Empty data received") - break - - ping_count += 1 - print( - f"[PING {ping_count}] Received ping from {peer_id}:" - f"{len(data)} bytes" - ) - logging.debug(f"Ping data: {data.hex()}") - - await stream.write(data) - print(f"[PING {ping_count}] Echoed ping back to {peer_id}") - - except Exception as e: - print(f"[ERROR] Error in ping loop with {peer_id}: {e}") - logging.exception("Ping loop error") - break - - except Exception as e: - print(f"[ERROR] Error handling ping from {peer_id}: {e}") - logging.exception("Ping handler error") - finally: - try: - print(f"[INFO] Closing ping stream with {peer_id}") - await stream.close() - except Exception as e: - logging.debug(f"Error closing stream: {e}") - - print(f"[INFO] Ping session completed with {peer_id} ({ping_count} pings)") - - -async def send_ping_sequence(stream: INetStream, count: int = 5) -> None: - """Send a sequence of pings compatible with js-libp2p.""" - peer_id = stream.muxed_conn.peer_id - print(f"[INFO] Starting ping sequence to {peer_id} ({count} pings)") - - import os - import time - - rtts = [] - - for i in range(1, count + 1): - try: - payload = os.urandom(PING_LENGTH) - print(f"[PING {i}/{count}] Sending ping to {peer_id}") - logging.debug(f"Sending payload: {payload.hex()}") - start_time = time.time() - - await stream.write(payload) - - with trio.fail_after(RESP_TIMEOUT): - response = await stream.read(PING_LENGTH) - - end_time = time.time() - rtt = (end_time - start_time) * 1000 - - if ( - response - and len(response) >= PING_LENGTH - and response[:PING_LENGTH] == payload - ): - rtts.append(rtt) - print(f"[PING {i}] Successful! RTT: {rtt:.2f}ms") - else: - print(f"[ERROR] Ping {i} failed: response mismatch or incomplete") - if response: - logging.debug(f"Expected: {payload.hex()}") - logging.debug(f"Received: {response.hex()}") - - if i < count: - await trio.sleep(1) - - except trio.TooSlowError: - print(f"[ERROR] Ping {i} timed out after {RESP_TIMEOUT}s") - except Exception as e: - print(f"[ERROR] Ping {i} failed: {e}") - logging.exception(f"Ping {i} error") - - if rtts: - avg_rtt = sum(rtts) / len(rtts) - min_rtt = min(rtts) - max_rtts = max(rtts) - success_count = len(rtts) - loss_rate = ((count - success_count) / count) * 100 - - print( - f" Packets: Sent={count}, Received={success_count}," - f" Lost={count - success_count}" - ) - print(f" Loss rate: {loss_rate:.1f}%") - print( - f" RTT: min={min_rtt:.2f}ms, avg={avg_rtt:.2f}ms," f"max={max_rtts:.2f}ms" - ) - else: - print(f"\n[STATS] All pings failed ({count} attempts)") - - -def create_noise_keypair(): - try: - x25519_private_key = x25519.X25519PrivateKey.generate() - - class NoisePrivateKey: - def __init__(self, key): - self._key = key - - def to_bytes(self): - return self._key.private_bytes_raw() - - def public_key(self): - return NoisePublicKey(self._key.public_key()) - - def get_public_key(self): - return NoisePublicKey(self._key.public_key()) - - class NoisePublicKey: - def __init__(self, key): - self._key = key - - def to_bytes(self): - return self._key.public_bytes_raw() - - return NoisePrivateKey(x25519_private_key) - except Exception as e: - logging.error(f"Failed to create Noise keypair: {e}") - return None - - -async def run_server(port: int) -> None: - """Run ping server that accepts connections from js-libp2p clients.""" - listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") - - key_pair = generate_new_rsa_identity() - logging.debug("Generated RSA keypair") - - noise_privkey = create_noise_keypair() - logging.debug("Generated Noise keypair") - - noise_transport = NoiseTransport(key_pair, noise_privkey=noise_privkey) - logging.debug(f"Noise transport initialized: {noise_transport}") - sec_opt = {TProtocol("/noise"): noise_transport} - muxer_opt = {TProtocol(YAMUX_PROTOCOL_ID): Yamux} - - logging.info(f"Using muxer: {muxer_opt}") - - host = new_host(key_pair=key_pair, sec_opt=sec_opt, muxer_opt=muxer_opt) - - print("[INFO] Starting py-libp2p ping server...") - - async with host.run(listen_addrs=[listen_addr]): - print(f"[INFO] Registering ping handler for protocol: {PING_PROTOCOL_ID}") - host.set_stream_handler(PING_PROTOCOL_ID, handle_ping) - - alt_protocols = [ - TProtocol("/ping/1.0.0"), - TProtocol("/libp2p/ping/1.0.0"), - ] - - for alt_proto in alt_protocols: - print(f"[INFO] Also registering handler for: {alt_proto}") - host.set_stream_handler(alt_proto, handle_ping) - - print("[INFO] Server started!") - print(f"[INFO] Peer ID: {host.get_id()}") - print(f"[INFO] Listening: /ip4/0.0.0.0/tcp/{port}") - print(f"[INFO] Primary Protocol: {PING_PROTOCOL_ID}") - # print(f"[INFO] Security: Noise encryption") - # print(f"[INFO] Muxer: Yamux stream multiplexing") - - print("\n[INFO] Registered protocols:") - print(f" - {PING_PROTOCOL_ID}") - for proto in alt_protocols: - print(f" - {proto}") - - peer_id = host.get_id() - print("\n[TEST] Test with js-libp2p:") - print(f" node ping.js client /ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}") - - print("\n[TEST] Test with py-libp2p:") - print(f" python ping.py client /ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}") - - print("\n[INFO] Waiting for connections...") - print("Press Ctrl+C to exit") - - await trio.sleep_forever() - - -async def run_client(destination: str, count: int = 5) -> None: - """Run ping client to test connectivity with another peer.""" - listen_addr = multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/0") - - key_pair = generate_new_rsa_identity() - logging.debug("Generated RSA keypair") - - noise_privkey = create_noise_keypair() - logging.debug("Generated Noise keypair") - - noise_transport = NoiseTransport(key_pair, noise_privkey=noise_privkey) - logging.debug(f"Noise transport initialized: {noise_transport}") - sec_opt = {TProtocol("/noise"): noise_transport} - muxer_opt = {TProtocol(YAMUX_PROTOCOL_ID): Yamux} - - logging.info(f"Using muxer: {muxer_opt}") - - host = new_host(key_pair=key_pair, sec_opt=sec_opt, muxer_opt=muxer_opt) - - print("[INFO] Starting py-libp2p ping client...") - - async with host.run(listen_addrs=[listen_addr]): - print(f"[INFO] Our Peer ID: {host.get_id()}") - print(f"[INFO] Target: {destination}") - print("[INFO] Security: Noise encryption") - print("[INFO] Muxer: Yamux stream multiplexing") - - try: - maddr = multiaddr.Multiaddr(destination) - info = info_from_p2p_addr(maddr) - target_peer_id = info.peer_id - - print(f"[INFO] Target Peer ID: {target_peer_id}") - print("[INFO] Connecting to peer...") - - await host.connect(info) - print("[INFO] Connection established!") - - protocols_to_try = [ - PING_PROTOCOL_ID, - TProtocol("/ping/1.0.0"), - TProtocol("/libp2p/ping/1.0.0"), - ] - - stream = None - - for proto in protocols_to_try: - try: - print(f"[INFO] Trying to open stream with protocol: {proto}") - stream = await host.new_stream(target_peer_id, [proto]) - print(f"[INFO] Stream opened with protocol: {proto}") - break - except Exception as e: - print(f"[ERROR] Failed to open stream with {proto}: {e}") - continue - - if not stream: - print("[ERROR] Failed to open stream with any ping protocol") - return 1 - - await send_ping_sequence(stream, count) - - await stream.close() - - except Exception as e: - print(f"[ERROR] Client error: {e}") - import traceback - - traceback.print_exc() - return 1 - - print("\n[INFO] Client stopped") - return 0 - - -def main() -> None: - """Main function with argument parsing.""" - description = """ - py-libp2p ping tool for interoperability testing with js-libp2p. - Uses Noise encryption and Yamux multiplexing for compatibility. - - Server mode: Listens for ping requests from js-libp2p or py-libp2p clients. - Client mode: Sends ping requests to js-libp2p or py-libp2p servers. - """ - - example_maddr = ( - "/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q" - ) - - parser = argparse.ArgumentParser( - description=description, - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=f""" -Examples: - python ping.py server # Start server on port 8000 - python ping.py server --port 9000 # Start server on port 9000 - python ping.py client {example_maddr} - python ping.py client {example_maddr} --count 10 - """, - ) - - subparsers = parser.add_subparsers(dest="mode", help="Operation mode") - - server_parser = subparsers.add_parser("server", help="Run as ping server") - server_parser.add_argument( - "--port", "-p", type=int, default=8000, help="Port to listen on (default: 8000)" - ) - - client_parser = subparsers.add_parser("client", help="Run as ping client") - client_parser.add_argument("destination", help="Target peer multiaddr") - client_parser.add_argument( - "--count", - "-c", - type=int, - default=5, - help="Number of pings to send (default: 5)", - ) - - args = parser.parse_args() - - if not args.mode: - parser.print_help() - return 1 - - try: - if args.mode == "server": - trio.run(run_server, args.port) - elif args.mode == "client": - return trio.run(run_client, args.destination, args.count) - except KeyboardInterrupt: - print("\n[INFO] Goodbye!") - return 0 - except Exception as e: - print(f"[ERROR] Fatal error: {e}") - import traceback - - traceback.print_exc() - return 1 - - return 0 - - -if __name__ == "__main__": - exit(main())