diff --git a/examples/identify/identify.py b/examples/identify/identify.py index 78cf8805..4882d2c3 100644 --- a/examples/identify/identify.py +++ b/examples/identify/identify.py @@ -8,9 +8,10 @@ import trio from libp2p import ( new_host, ) -from libp2p.identity.identify.identify import ID as IDENTIFY_PROTOCOL_ID -from libp2p.identity.identify.pb.identify_pb2 import ( - Identify, +from libp2p.identity.identify.identify import ( + ID as IDENTIFY_PROTOCOL_ID, + identify_handler_for, + parse_identify_response, ) from libp2p.peer.peerinfo import ( info_from_p2p_addr, @@ -50,7 +51,7 @@ def print_identify_response(identify_response): ) -async def run(port: int, destination: str) -> None: +async def run(port: int, destination: str, use_varint_format: bool = True) -> None: localhost_ip = "0.0.0.0" if not destination: @@ -58,11 +59,24 @@ async def run(port: int, destination: str) -> None: listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}") host_a = new_host() + # Set up identify handler with specified format + identify_handler = identify_handler_for( + host_a, use_varint_format=use_varint_format + ) + host_a.set_stream_handler(IDENTIFY_PROTOCOL_ID, identify_handler) + async with host_a.run(listen_addrs=[listen_addr]): + # Get the actual address and replace 0.0.0.0 with 127.0.0.1 for client + # connections + server_addr = str(host_a.get_addrs()[0]) + client_addr = server_addr.replace("/ip4/0.0.0.0/", "/ip4/127.0.0.1/") + + format_name = "length-prefixed" if use_varint_format else "raw protobuf" print( - "First host listening. Run this from another console:\n\n" + f"First host listening (using {format_name} format). " + f"Run this from another console:\n\n" f"identify-demo " - f"-d {host_a.get_addrs()[0]}\n" + f"-d {client_addr}\n" ) print("Waiting for incoming identify request...") await trio.sleep_forever() @@ -84,11 +98,18 @@ async def run(port: int, destination: str) -> None: try: print("Starting identify protocol...") - response = await stream.read() + + # Read the complete response (could be either format) + # Read a larger chunk to get all the data before stream closes + response = await stream.read(8192) # Read enough data in one go + await stream.close() - identify_msg = Identify() - identify_msg.ParseFromString(response) + + # Parse the response using the robust protocol-level function + # This handles both old and new formats automatically + identify_msg = parse_identify_response(response) print_identify_response(identify_msg) + except Exception as e: print(f"Identify protocol error: {e}") @@ -98,9 +119,12 @@ async def run(port: int, destination: str) -> None: def main() -> None: description = """ This program demonstrates the libp2p identify protocol. - First run identify-demo -p ' to start a listener. + First run 'identify-demo -p [--raw-format]' to start a listener. Then run 'identify-demo -d ' where is the multiaddress shown by the listener. + + Use --raw-format to send raw protobuf messages (old format) instead of + length-prefixed protobuf messages (new format, default). """ example_maddr = ( @@ -115,10 +139,22 @@ def main() -> None: type=str, help=f"destination multiaddr string, e.g. {example_maddr}", ) + parser.add_argument( + "--raw-format", + action="store_true", + help=( + "use raw protobuf format (old format) instead of " + "length-prefixed (new format)" + ), + ) args = parser.parse_args() + # Determine format: raw format if --raw-format is specified, otherwise + # length-prefixed + use_varint_format = not args.raw_format + try: - trio.run(run, *(args.port, args.destination)) + trio.run(run, *(args.port, args.destination, use_varint_format)) except KeyboardInterrupt: pass diff --git a/examples/identify_push/identify_push_listener_dialer.py b/examples/identify_push/identify_push_listener_dialer.py index 294b0d17..0e573e0b 100644 --- a/examples/identify_push/identify_push_listener_dialer.py +++ b/examples/identify_push/identify_push_listener_dialer.py @@ -57,18 +57,56 @@ from libp2p.peer.peerinfo import ( logger = logging.getLogger("libp2p.identity.identify-push-example") -def custom_identify_push_handler_for(host): +def custom_identify_push_handler_for(host, use_varint_format: bool = True): """ Create a custom handler for the identify/push protocol that logs and prints the identity information received from the dialer. + + Args: + host: The libp2p host + use_varint_format: If True, expect length-prefixed format; if False, expect + raw protobuf + """ async def handle_identify_push(stream: INetStream) -> None: peer_id = stream.muxed_conn.peer_id try: - # Read the identify message from the stream - data = await stream.read() + if use_varint_format: + # Read length-prefixed identify message from the stream + from libp2p.utils.varint import decode_varint_from_bytes + + # First read the varint length prefix + length_bytes = b"" + while True: + b = await stream.read(1) + if not b: + break + length_bytes += b + if b[0] & 0x80 == 0: + break + + if not length_bytes: + logger.warning("No length prefix received from peer %s", peer_id) + return + + msg_length = decode_varint_from_bytes(length_bytes) + + # Read the protobuf message + data = await stream.read(msg_length) + if len(data) != msg_length: + logger.warning("Incomplete message received from peer %s", peer_id) + return + else: + # Read raw protobuf message from the stream + data = b"" + while True: + chunk = await stream.read(4096) + if not chunk: + break + data += chunk + identify_msg = Identify() identify_msg.ParseFromString(data) @@ -129,9 +167,13 @@ def custom_identify_push_handler_for(host): return handle_identify_push -async def run_listener(port: int) -> None: +async def run_listener(port: int, use_varint_format: bool = True) -> None: """Run a host in listener mode.""" - print(f"\n==== Starting Identify-Push Listener on port {port} ====\n") + format_name = "length-prefixed" if use_varint_format else "raw protobuf" + print( + f"\n==== Starting Identify-Push Listener on port {port} " + f"(using {format_name} format) ====\n" + ) # Create key pair for the listener key_pair = create_new_key_pair() @@ -139,9 +181,14 @@ async def run_listener(port: int) -> None: # Create the listener host host = new_host(key_pair=key_pair) - # Set up the identify and identify/push handlers - host.set_stream_handler(ID_IDENTIFY, identify_handler_for(host)) - host.set_stream_handler(ID_IDENTIFY_PUSH, custom_identify_push_handler_for(host)) + # Set up the identify and identify/push handlers with specified format + host.set_stream_handler( + ID_IDENTIFY, identify_handler_for(host, use_varint_format=use_varint_format) + ) + host.set_stream_handler( + ID_IDENTIFY_PUSH, + identify_push_handler_for(host, use_varint_format=use_varint_format), + ) # Start listening listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") @@ -165,9 +212,15 @@ async def run_listener(port: int) -> None: await trio.sleep_forever() -async def run_dialer(port: int, destination: str) -> None: +async def run_dialer( + port: int, destination: str, use_varint_format: bool = True +) -> None: """Run a host in dialer mode that connects to a listener.""" - print(f"\n==== Starting Identify-Push Dialer on port {port} ====\n") + format_name = "length-prefixed" if use_varint_format else "raw protobuf" + print( + f"\n==== Starting Identify-Push Dialer on port {port} " + f"(using {format_name} format) ====\n" + ) # Create key pair for the dialer key_pair = create_new_key_pair() @@ -175,9 +228,14 @@ async def run_dialer(port: int, destination: str) -> None: # Create the dialer host host = new_host(key_pair=key_pair) - # Set up the identify and identify/push handlers - host.set_stream_handler(ID_IDENTIFY, identify_handler_for(host)) - host.set_stream_handler(ID_IDENTIFY_PUSH, identify_push_handler_for(host)) + # Set up the identify and identify/push handlers with specified format + host.set_stream_handler( + ID_IDENTIFY, identify_handler_for(host, use_varint_format=use_varint_format) + ) + host.set_stream_handler( + ID_IDENTIFY_PUSH, + identify_push_handler_for(host, use_varint_format=use_varint_format), + ) # Start listening on a different port listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") @@ -206,7 +264,9 @@ async def run_dialer(port: int, destination: str) -> None: try: # Call push_identify_to_peer which returns a boolean - success = await push_identify_to_peer(host, peer_info.peer_id) + success = await push_identify_to_peer( + host, peer_info.peer_id, use_varint_format=use_varint_format + ) if success: logger.info("Identify push completed successfully!") @@ -240,11 +300,10 @@ def main() -> None: This program demonstrates the libp2p identify/push protocol. Without arguments, it runs as a listener on random port. With -d parameter, it runs as a dialer on random port. - """ - example = ( - "/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q" - ) + Use --raw-format to send raw protobuf messages (old format) instead of + length-prefixed protobuf messages (new format, default). + """ parser = argparse.ArgumentParser(description=description) parser.add_argument("-p", "--port", default=0, type=int, help="source port number") @@ -252,17 +311,29 @@ def main() -> None: "-d", "--destination", type=str, - help=f"destination multiaddr string, e.g. {example}", + help="destination multiaddr string", + ) + parser.add_argument( + "--raw-format", + action="store_true", + help=( + "use raw protobuf format (old format) instead of " + "length-prefixed (new format)" + ), ) args = parser.parse_args() + # Determine format: raw format if --raw-format is specified, otherwise + # length-prefixed + use_varint_format = not args.raw_format + try: if args.destination: # Run in dialer mode with random available port if not specified - trio.run(run_dialer, args.port, args.destination) + trio.run(run_dialer, args.port, args.destination, use_varint_format) else: # Run in listener mode with random available port if not specified - trio.run(run_listener, args.port) + trio.run(run_listener, args.port, use_varint_format) except KeyboardInterrupt: print("\nInterrupted by user") logger.info("Interrupted by user") diff --git a/libp2p/host/defaults.py b/libp2p/host/defaults.py index b8c50886..5dac8bce 100644 --- a/libp2p/host/defaults.py +++ b/libp2p/host/defaults.py @@ -26,5 +26,8 @@ if TYPE_CHECKING: def get_default_protocols(host: IHost) -> "OrderedDict[TProtocol, StreamHandlerFn]": return OrderedDict( - ((IdentifyID, identify_handler_for(host)), (PingID, handle_ping)) + ( + (IdentifyID, identify_handler_for(host, use_varint_format=True)), + (PingID, handle_ping), + ) ) diff --git a/libp2p/identity/identify/identify.py b/libp2p/identity/identify/identify.py index 15367c43..1e38d566 100644 --- a/libp2p/identity/identify/identify.py +++ b/libp2p/identity/identify/identify.py @@ -16,7 +16,9 @@ from libp2p.network.stream.exceptions import ( StreamClosed, ) from libp2p.utils import ( + decode_varint_with_size, get_agent_version, + varint, ) from .pb.identify_pb2 import ( @@ -72,7 +74,47 @@ def _mk_identify_protobuf( ) -def identify_handler_for(host: IHost) -> StreamHandlerFn: +def parse_identify_response(response: bytes) -> Identify: + """ + Parse identify response that could be either: + - Old format: raw protobuf + - New format: length-prefixed protobuf + + This function provides backward and forward compatibility. + """ + # Try new format first: length-prefixed protobuf + if len(response) >= 1: + length, varint_size = decode_varint_with_size(response) + if varint_size > 0 and length > 0 and varint_size + length <= len(response): + protobuf_data = response[varint_size : varint_size + length] + try: + identify_response = Identify() + identify_response.ParseFromString(protobuf_data) + # Sanity check: must have agent_version (protocol_version is optional) + if identify_response.agent_version: + logger.debug( + "Parsed length-prefixed identify response (new format)" + ) + return identify_response + except Exception: + pass # Fall through to old format + + # Fall back to old format: raw protobuf + try: + identify_response = Identify() + identify_response.ParseFromString(response) + logger.debug("Parsed raw protobuf identify response (old format)") + return identify_response + except Exception as e: + logger.error(f"Failed to parse identify response: {e}") + logger.error(f"Response length: {len(response)}") + logger.error(f"Response hex: {response.hex()}") + raise + + +def identify_handler_for( + host: IHost, use_varint_format: bool = False +) -> StreamHandlerFn: async def handle_identify(stream: INetStream) -> None: # get observed address from ``stream`` peer_id = ( @@ -100,7 +142,21 @@ def identify_handler_for(host: IHost) -> StreamHandlerFn: response = protobuf.SerializeToString() try: - await stream.write(response) + if use_varint_format: + # Send length-prefixed protobuf message (new format) + await stream.write(varint.encode_uvarint(len(response))) + await stream.write(response) + logger.debug( + "Sent new format (length-prefixed) identify response to %s", + peer_id, + ) + else: + # Send raw protobuf message (old format for backward compatibility) + await stream.write(response) + logger.debug( + "Sent old format (raw protobuf) identify response to %s", + peer_id, + ) except StreamClosed: logger.debug("Fail to respond to %s request: stream closed", ID) else: diff --git a/libp2p/identity/identify_push/identify_push.py b/libp2p/identity/identify_push/identify_push.py index 914264ed..f13bd970 100644 --- a/libp2p/identity/identify_push/identify_push.py +++ b/libp2p/identity/identify_push/identify_push.py @@ -25,6 +25,10 @@ from libp2p.peer.id import ( ) from libp2p.utils import ( get_agent_version, + varint, +) +from libp2p.utils.varint import ( + decode_varint_from_bytes, ) from ..identify.identify import ( @@ -43,20 +47,69 @@ AGENT_VERSION = get_agent_version() CONCURRENCY_LIMIT = 10 -def identify_push_handler_for(host: IHost) -> StreamHandlerFn: +def identify_push_handler_for( + host: IHost, use_varint_format: bool = True +) -> StreamHandlerFn: """ Create a handler for the identify/push protocol. This handler receives pushed identify messages from remote peers and updates the local peerstore with the new information. + + Args: + host: The libp2p host. + use_varint_format: True=length-prefixed, False=raw protobuf. + """ async def handle_identify_push(stream: INetStream) -> None: peer_id = stream.muxed_conn.peer_id try: - # Read the identify message from the stream - data = await stream.read() + if use_varint_format: + # Read length-prefixed identify message from the stream + # First read the varint length prefix + length_bytes = b"" + while True: + b = await stream.read(1) + if not b: + break + length_bytes += b + if b[0] & 0x80 == 0: + break + + if not length_bytes: + logger.warning("No length prefix received from peer %s", peer_id) + return + + msg_length = decode_varint_from_bytes(length_bytes) + + # Read the protobuf message + data = await stream.read(msg_length) + if len(data) != msg_length: + logger.warning("Incomplete message received from peer %s", peer_id) + return + else: + # Read raw protobuf message from the stream + # For raw format, we need to read all data before the stream is closed + data = b"" + try: + # Read all available data in a single operation + data = await stream.read() + except StreamClosed: + # Try to read any remaining data + try: + data = await stream.read() + except Exception: + pass + + # If we got no data, log a warning and return + if not data: + logger.warning( + "No data received in raw format from peer %s", peer_id + ) + return + identify_msg = Identify() identify_msg.ParseFromString(data) @@ -137,6 +190,7 @@ async def push_identify_to_peer( peer_id: ID, observed_multiaddr: Multiaddr | None = None, limit: trio.Semaphore = trio.Semaphore(CONCURRENCY_LIMIT), + use_varint_format: bool = True, ) -> bool: """ Push an identify message to a specific peer. @@ -144,10 +198,15 @@ async def push_identify_to_peer( This function opens a stream to the peer using the identify/push protocol, sends the identify message, and closes the stream. - Returns - ------- - bool - True if the push was successful, False otherwise. + Args: + host: The libp2p host. + peer_id: The peer ID to push to. + observed_multiaddr: The observed multiaddress (optional). + limit: Semaphore for concurrency control. + use_varint_format: True=length-prefixed, False=raw protobuf. + + Returns: + bool: True if the push was successful, False otherwise. """ async with limit: @@ -159,8 +218,13 @@ async def push_identify_to_peer( identify_msg = _mk_identify_protobuf(host, observed_multiaddr) response = identify_msg.SerializeToString() - # Send the identify message - await stream.write(response) + if use_varint_format: + # Send length-prefixed identify message + await stream.write(varint.encode_uvarint(len(response))) + await stream.write(response) + else: + # Send raw protobuf message + await stream.write(response) # Close the stream await stream.close() @@ -176,18 +240,36 @@ async def push_identify_to_peers( host: IHost, peer_ids: set[ID] | None = None, observed_multiaddr: Multiaddr | None = None, + use_varint_format: bool = True, ) -> None: """ Push an identify message to multiple peers in parallel. If peer_ids is None, push to all connected peers. + + Args: + host: The libp2p host. + peer_ids: Set of peer IDs to push to (if None, push to all connected peers). + observed_multiaddr: The observed multiaddress (optional). + use_varint_format: True=length-prefixed, False=raw protobuf. + """ if peer_ids is None: # Get all connected peers peer_ids = set(host.get_connected_peers()) + # Create a single shared semaphore for concurrency control + limit = trio.Semaphore(CONCURRENCY_LIMIT) + # Push to each peer in parallel using a trio.Nursery - # limiting concurrent connections to 10 + # limiting concurrent connections to CONCURRENCY_LIMIT async with trio.open_nursery() as nursery: for peer_id in peer_ids: - nursery.start_soon(push_identify_to_peer, host, peer_id, observed_multiaddr) + nursery.start_soon( + push_identify_to_peer, + host, + peer_id, + observed_multiaddr, + limit, + use_varint_format, + ) diff --git a/libp2p/utils/__init__.py b/libp2p/utils/__init__.py index 3b015c6a..2d1ee23e 100644 --- a/libp2p/utils/__init__.py +++ b/libp2p/utils/__init__.py @@ -7,6 +7,8 @@ from libp2p.utils.varint import ( encode_varint_prefixed, read_delim, read_varint_prefixed_bytes, + decode_varint_from_bytes, + decode_varint_with_size, ) from libp2p.utils.version import ( get_agent_version, @@ -20,4 +22,6 @@ __all__ = [ "get_agent_version", "read_delim", "read_varint_prefixed_bytes", + "decode_varint_from_bytes", + "decode_varint_with_size", ] diff --git a/libp2p/utils/varint.py b/libp2p/utils/varint.py index b9fa6b9b..3d8d5a4f 100644 --- a/libp2p/utils/varint.py +++ b/libp2p/utils/varint.py @@ -39,12 +39,38 @@ def encode_uvarint(number: int) -> bytes: return buf +def decode_varint_from_bytes(data: bytes) -> int: + """ + Decode a varint from bytes and return the value. + + This is a synchronous version of decode_uvarint_from_stream for already-read bytes. + """ + res = 0 + for shift in itertools.count(0, 7): + if shift > SHIFT_64_BIT_MAX: + raise ParseError("Integer is too large...") + + if not data: + raise ParseError("Unexpected end of data") + + value = data[0] + data = data[1:] + + res += (value & LOW_MASK) << shift + + if not value & HIGH_MASK: + break + return res + + async def decode_uvarint_from_stream(reader: Reader) -> int: """https://en.wikipedia.org/wiki/LEB128.""" res = 0 for shift in itertools.count(0, 7): if shift > SHIFT_64_BIT_MAX: - raise ParseError("TODO: better exception msg: Integer is too large...") + raise ParseError( + "Varint decoding error: integer exceeds maximum size of 64 bits." + ) byte = await read_exactly(reader, 1) value = byte[0] @@ -56,6 +82,33 @@ async def decode_uvarint_from_stream(reader: Reader) -> int: return res +def decode_varint_with_size(data: bytes) -> tuple[int, int]: + """ + Decode a varint from bytes and return (value, bytes_consumed). + Returns (0, 0) if the data doesn't start with a valid varint. + """ + try: + # Calculate how many bytes the varint consumes + varint_size = 0 + for i, byte in enumerate(data): + varint_size += 1 + if (byte & 0x80) == 0: + break + + if varint_size == 0: + return 0, 0 + + # Extract just the varint bytes + varint_bytes = data[:varint_size] + + # Decode the varint + value = decode_varint_from_bytes(varint_bytes) + + return value, varint_size + except Exception: + return 0, 0 + + def encode_varint_prefixed(msg_bytes: bytes) -> bytes: varint_len = encode_uvarint(len(msg_bytes)) return varint_len + msg_bytes diff --git a/newsfragments/760.docs.rst b/newsfragments/760.docs.rst new file mode 100644 index 00000000..0cf211dd --- /dev/null +++ b/newsfragments/760.docs.rst @@ -0,0 +1 @@ +Improve error message under the function decode_uvarint_from_stream in libp2p/utils/varint.py file diff --git a/newsfragments/761.breaking.rst b/newsfragments/761.breaking.rst new file mode 100644 index 00000000..cd63a4e3 --- /dev/null +++ b/newsfragments/761.breaking.rst @@ -0,0 +1 @@ +identify protocol use now prefix-length messages by default. use use_varint_format param for old raw messages diff --git a/newsfragments/761.feature.rst b/newsfragments/761.feature.rst new file mode 100644 index 00000000..fd38866c --- /dev/null +++ b/newsfragments/761.feature.rst @@ -0,0 +1 @@ +add length-prefixed support to identify protocol diff --git a/newsfragments/761.internal.rst b/newsfragments/761.internal.rst new file mode 100644 index 00000000..59496ebc --- /dev/null +++ b/newsfragments/761.internal.rst @@ -0,0 +1 @@ +Fix raw format reading in identify/push protocol and add comprehensive test coverage for both varint and raw formats diff --git a/tests/core/identity/identify/test_identify.py b/tests/core/identity/identify/test_identify.py index e88c7ebe..ee721299 100644 --- a/tests/core/identity/identify/test_identify.py +++ b/tests/core/identity/identify/test_identify.py @@ -11,9 +11,7 @@ from libp2p.identity.identify.identify import ( PROTOCOL_VERSION, _mk_identify_protobuf, _multiaddr_to_bytes, -) -from libp2p.identity.identify.pb.identify_pb2 import ( - Identify, + parse_identify_response, ) from tests.utils.factories import ( host_pair_factory, @@ -29,14 +27,18 @@ async def test_identify_protocol(security_protocol): host_b, ): # Here, host_b is the requester and host_a is the responder. - # observed_addr represent host_bโ€™s address as observed by host_a - # (i.e., the address from which host_bโ€™s request was received). + # observed_addr represent host_b's address as observed by host_a + # (i.e., the address from which host_b's request was received). stream = await host_b.new_stream(host_a.get_id(), (ID,)) - response = await stream.read() + + # Read the response (could be either format) + # Read a larger chunk to get all the data before stream closes + response = await stream.read(8192) # Read enough data in one go + await stream.close() - identify_response = Identify() - identify_response.ParseFromString(response) + # Parse the response (handles both old and new formats) + identify_response = parse_identify_response(response) logger.debug("host_a: %s", host_a.get_addrs()) logger.debug("host_b: %s", host_b.get_addrs()) @@ -62,8 +64,9 @@ async def test_identify_protocol(security_protocol): logger.debug("observed_addr: %s", Multiaddr(identify_response.observed_addr)) logger.debug("host_b.get_addrs()[0]: %s", host_b.get_addrs()[0]) - logger.debug("cleaned_addr= %s", cleaned_addr) - assert identify_response.observed_addr == _multiaddr_to_bytes(cleaned_addr) + + # The observed address should match the cleaned address + assert Multiaddr(identify_response.observed_addr) == cleaned_addr # Check protocols assert set(identify_response.protocols) == set(host_a.get_mux().get_protocols()) diff --git a/tests/core/identity/identify/test_identify_parsing.py b/tests/core/identity/identify/test_identify_parsing.py new file mode 100644 index 00000000..d76d82a1 --- /dev/null +++ b/tests/core/identity/identify/test_identify_parsing.py @@ -0,0 +1,410 @@ +import pytest + +from libp2p.identity.identify.identify import ( + _mk_identify_protobuf, +) +from libp2p.identity.identify.pb.identify_pb2 import ( + Identify, +) +from libp2p.io.abc import Closer, Reader, Writer +from libp2p.utils.varint import ( + decode_varint_from_bytes, + encode_varint_prefixed, +) +from tests.utils.factories import ( + host_pair_factory, +) + + +class MockStream(Reader, Writer, Closer): + """Mock stream for testing identify protocol compatibility.""" + + def __init__(self, data: bytes): + self.data = data + self.position = 0 + self.closed = False + + async def read(self, n: int | None = None) -> bytes: + if self.closed or self.position >= len(self.data): + return b"" + if n is None: + n = len(self.data) - self.position + result = self.data[self.position : self.position + n] + self.position += len(result) + return result + + async def write(self, data: bytes) -> None: + # Mock write - just store the data + pass + + async def close(self) -> None: + self.closed = True + + +def create_identify_message(host, observed_multiaddr=None): + """Create an identify protobuf message.""" + return _mk_identify_protobuf(host, observed_multiaddr) + + +def create_new_format_message(identify_msg): + """Create a new format (length-prefixed) identify message.""" + msg_bytes = identify_msg.SerializeToString() + return encode_varint_prefixed(msg_bytes) + + +def create_old_format_message(identify_msg): + """Create an old format (raw protobuf) identify message.""" + return identify_msg.SerializeToString() + + +async def read_new_format_message(stream) -> bytes: + """Read a new format (length-prefixed) identify message.""" + # Read varint length prefix + length_bytes = b"" + while True: + b = await stream.read(1) + if not b: + break + length_bytes += b + if b[0] & 0x80 == 0: + break + + if not length_bytes: + raise ValueError("No length prefix received") + + msg_length = decode_varint_from_bytes(length_bytes) + + # Read the protobuf message + response = await stream.read(msg_length) + if len(response) != msg_length: + raise ValueError("Incomplete message received") + + return response + + +async def read_old_format_message(stream) -> bytes: + """Read an old format (raw protobuf) identify message.""" + # Read all available data + response = b"" + while True: + chunk = await stream.read(4096) + if not chunk: + break + response += chunk + + return response + + +async def read_compatible_message(stream) -> bytes: + """Read an identify message in either old or new format.""" + # Try to read a few bytes to detect the format + first_bytes = await stream.read(10) + if not first_bytes: + raise ValueError("No data received") + + # Try to decode as varint length prefix (new format) + try: + msg_length = decode_varint_from_bytes(first_bytes) + + # Validate that the length is reasonable (not too large) + if msg_length > 0 and msg_length <= 1024 * 1024: # Max 1MB + # Calculate how many bytes the varint consumed + varint_len = 0 + for i, byte in enumerate(first_bytes): + varint_len += 1 + if (byte & 0x80) == 0: + break + + # Read the remaining protobuf message + remaining_bytes = await stream.read( + msg_length - (len(first_bytes) - varint_len) + ) + if len(remaining_bytes) == msg_length - (len(first_bytes) - varint_len): + message_data = first_bytes[varint_len:] + remaining_bytes + + # Try to parse as protobuf to validate + try: + Identify().ParseFromString(message_data) + return message_data + except Exception: + # If protobuf parsing fails, fall back to old format + pass + except Exception: + pass + + # Fall back to old format (raw protobuf) + response = first_bytes + + # Read more data if available + while True: + chunk = await stream.read(4096) + if not chunk: + break + response += chunk + + return response + + +async def read_compatible_message_simple(stream) -> bytes: + """Read a message in either old or new format (simplified version for testing).""" + # Try to read a few bytes to detect the format + first_bytes = await stream.read(10) + if not first_bytes: + raise ValueError("No data received") + + # Try to decode as varint length prefix (new format) + try: + msg_length = decode_varint_from_bytes(first_bytes) + + # Validate that the length is reasonable (not too large) + if msg_length > 0 and msg_length <= 1024 * 1024: # Max 1MB + # Calculate how many bytes the varint consumed + varint_len = 0 + for i, byte in enumerate(first_bytes): + varint_len += 1 + if (byte & 0x80) == 0: + break + + # Read the remaining message + remaining_bytes = await stream.read( + msg_length - (len(first_bytes) - varint_len) + ) + if len(remaining_bytes) == msg_length - (len(first_bytes) - varint_len): + return first_bytes[varint_len:] + remaining_bytes + except Exception: + pass + + # Fall back to old format (raw data) + response = first_bytes + + # Read more data if available + while True: + chunk = await stream.read(4096) + if not chunk: + break + response += chunk + + return response + + +def detect_format(data): + """Detect if data is in new or old format (varint-prefixed or raw protobuf).""" + if not data: + return "unknown" + + # Try to decode as varint + try: + msg_length = decode_varint_from_bytes(data) + + # Validate that the length is reasonable + if msg_length > 0 and msg_length <= 1024 * 1024: # Max 1MB + # Calculate varint length + varint_len = 0 + for i, byte in enumerate(data): + varint_len += 1 + if (byte & 0x80) == 0: + break + + # Check if we have enough data for the message + if len(data) >= varint_len + msg_length: + # Additional check: try to parse the message as protobuf + try: + message_data = data[varint_len : varint_len + msg_length] + Identify().ParseFromString(message_data) + return "new" + except Exception: + # If protobuf parsing fails, it's probably not a valid new format + pass + except Exception: + pass + + # If varint decoding fails or length is unreasonable, assume old format + return "old" + + +@pytest.mark.trio +async def test_identify_new_format_compatibility(security_protocol): + """Test that identify protocol works with new format (length-prefixed) messages.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Create identify message + identify_msg = create_identify_message(host_a) + + # Create new format message + new_format_data = create_new_format_message(identify_msg) + + # Create mock stream with new format data + stream = MockStream(new_format_data) + + # Read using new format reader + response = await read_new_format_message(stream) + + # Parse the response + parsed_msg = Identify() + parsed_msg.ParseFromString(response) + + # Verify the message content + assert parsed_msg.protocol_version == identify_msg.protocol_version + assert parsed_msg.agent_version == identify_msg.agent_version + assert parsed_msg.public_key == identify_msg.public_key + + +@pytest.mark.trio +async def test_identify_old_format_compatibility(security_protocol): + """Test that identify protocol works with old format (raw protobuf) messages.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Create identify message + identify_msg = create_identify_message(host_a) + + # Create old format message + old_format_data = create_old_format_message(identify_msg) + + # Create mock stream with old format data + stream = MockStream(old_format_data) + + # Read using old format reader + response = await read_old_format_message(stream) + + # Parse the response + parsed_msg = Identify() + parsed_msg.ParseFromString(response) + + # Verify the message content + assert parsed_msg.protocol_version == identify_msg.protocol_version + assert parsed_msg.agent_version == identify_msg.agent_version + assert parsed_msg.public_key == identify_msg.public_key + + +@pytest.mark.trio +async def test_identify_backward_compatibility_old_format(security_protocol): + """Test backward compatibility reader with old format messages.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Create identify message + identify_msg = create_identify_message(host_a) + + # Create old format message + old_format_data = create_old_format_message(identify_msg) + + # Create mock stream with old format data + stream = MockStream(old_format_data) + + # Read using old format reader (which should work reliably) + response = await read_old_format_message(stream) + + # Parse the response + parsed_msg = Identify() + parsed_msg.ParseFromString(response) + + # Verify the message content + assert parsed_msg.protocol_version == identify_msg.protocol_version + assert parsed_msg.agent_version == identify_msg.agent_version + assert parsed_msg.public_key == identify_msg.public_key + + +@pytest.mark.trio +async def test_identify_backward_compatibility_new_format(security_protocol): + """Test backward compatibility reader with new format messages.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Create identify message + identify_msg = create_identify_message(host_a) + + # Create new format message + new_format_data = create_new_format_message(identify_msg) + + # Create mock stream with new format data + stream = MockStream(new_format_data) + + # Read using new format reader (which should work reliably) + response = await read_new_format_message(stream) + + # Parse the response + parsed_msg = Identify() + parsed_msg.ParseFromString(response) + + # Verify the message content + assert parsed_msg.protocol_version == identify_msg.protocol_version + assert parsed_msg.agent_version == identify_msg.agent_version + assert parsed_msg.public_key == identify_msg.public_key + + +@pytest.mark.trio +async def test_identify_format_detection(security_protocol): + """Test that the format detection works correctly.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Create identify message + identify_msg = create_identify_message(host_a) + + # Test new format detection + new_format_data = create_new_format_message(identify_msg) + format_type = detect_format(new_format_data) + assert format_type == "new", "New format should be detected correctly" + + # Test old format detection + old_format_data = create_old_format_message(identify_msg) + format_type = detect_format(old_format_data) + assert format_type == "old", "Old format should be detected correctly" + + +@pytest.mark.trio +async def test_identify_error_handling(security_protocol): + """Test error handling for malformed messages.""" + from libp2p.exceptions import ParseError + + # Test with empty data + stream = MockStream(b"") + with pytest.raises(ValueError, match="No data received"): + await read_compatible_message(stream) + + # Test with incomplete varint + stream = MockStream(b"\x80") # Incomplete varint + with pytest.raises(ParseError, match="Unexpected end of data"): + await read_new_format_message(stream) + + # Test with invalid protobuf data + stream = MockStream(b"\x05invalid") # Length prefix but invalid protobuf + with pytest.raises(Exception): # Should fail when parsing protobuf + response = await read_new_format_message(stream) + Identify().ParseFromString(response) + + +@pytest.mark.trio +async def test_identify_message_equivalence(security_protocol): + """Test that old and new format messages are equivalent.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Create identify message + identify_msg = create_identify_message(host_a) + + # Create both formats + new_format_data = create_new_format_message(identify_msg) + old_format_data = create_old_format_message(identify_msg) + + # Extract the protobuf message from new format + varint_len = 0 + for i, byte in enumerate(new_format_data): + varint_len += 1 + if (byte & 0x80) == 0: + break + + new_format_protobuf = new_format_data[varint_len:] + + # The protobuf messages should be identical + assert new_format_protobuf == old_format_data, ( + "Protobuf messages should be identical in both formats" + ) diff --git a/tests/core/identity/identify_push/test_identify_push.py b/tests/core/identity/identify_push/test_identify_push.py index 935fb2c0..a1e2e472 100644 --- a/tests/core/identity/identify_push/test_identify_push.py +++ b/tests/core/identity/identify_push/test_identify_push.py @@ -459,7 +459,11 @@ async def test_push_identify_to_peers_respects_concurrency_limit(): lock = trio.Lock() async def mock_push_identify_to_peer( - host, peer_id, observed_multiaddr=None, limit=trio.Semaphore(CONCURRENCY_LIMIT) + host, + peer_id, + observed_multiaddr=None, + limit=trio.Semaphore(CONCURRENCY_LIMIT), + use_varint_format=True, ) -> bool: """ Mock function to test concurrency by simulating an identify message. @@ -593,3 +597,104 @@ async def test_all_peers_receive_identify_push_with_semaphore_under_high_peer_lo assert peer_id_a in dummy_peerstore.peer_ids() nursery.cancel_scope.cancel() + + +@pytest.mark.trio +async def test_identify_push_default_varint_format(security_protocol): + """ + Test that the identify/push protocol uses varint format by default. + + This test verifies that: + 1. The default behavior uses length-prefixed messages (varint format) + 2. Messages are correctly encoded with varint length prefix + 3. Messages are correctly decoded with varint length prefix + 4. The peerstore is updated correctly with the received information + """ + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Set up the identify/push handlers with default settings + # (use_varint_format=True) + host_b.set_stream_handler(ID_PUSH, identify_push_handler_for(host_b)) + + # Push identify information from host_a to host_b using default settings + success = await push_identify_to_peer(host_a, host_b.get_id()) + assert success, "Identify push should succeed with default varint format" + + # Wait a bit for the push to complete + await trio.sleep(0.1) + + # Get the peerstore from host_b + peerstore = host_b.get_peerstore() + peer_id = host_a.get_id() + + # Verify that the peerstore was updated correctly + assert peer_id in peerstore.peer_ids() + + # Check that addresses have been updated + host_a_addrs = set(host_a.get_addrs()) + peerstore_addrs = set(peerstore.addrs(peer_id)) + assert all(addr in peerstore_addrs for addr in host_a_addrs) + + # Check that protocols have been updated + host_a_protocols = set(host_a.get_mux().get_protocols()) + peerstore_protocols = set(peerstore.get_protocols(peer_id)) + assert all(protocol in peerstore_protocols for protocol in host_a_protocols) + + # Check that the public key has been updated + host_a_public_key = host_a.get_public_key().serialize() + peerstore_public_key = peerstore.pubkey(peer_id).serialize() + assert host_a_public_key == peerstore_public_key + + +@pytest.mark.trio +async def test_identify_push_legacy_raw_format(security_protocol): + """ + Test that the identify/push protocol can use legacy raw format when specified. + + This test verifies that: + 1. When use_varint_format=False, messages are sent without length prefix + 2. Raw protobuf messages are correctly encoded and decoded + 3. The peerstore is updated correctly with the received information + 4. The legacy format is backward compatible + """ + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Set up the identify/push handlers with legacy format (use_varint_format=False) + host_b.set_stream_handler( + ID_PUSH, identify_push_handler_for(host_b, use_varint_format=False) + ) + + # Push identify information from host_a to host_b using legacy format + success = await push_identify_to_peer( + host_a, host_b.get_id(), use_varint_format=False + ) + assert success, "Identify push should succeed with legacy raw format" + + # Wait a bit for the push to complete + await trio.sleep(0.1) + + # Get the peerstore from host_b + peerstore = host_b.get_peerstore() + peer_id = host_a.get_id() + + # Verify that the peerstore was updated correctly + assert peer_id in peerstore.peer_ids() + + # Check that addresses have been updated + host_a_addrs = set(host_a.get_addrs()) + peerstore_addrs = set(peerstore.addrs(peer_id)) + assert all(addr in peerstore_addrs for addr in host_a_addrs) + + # Check that protocols have been updated + host_a_protocols = set(host_a.get_mux().get_protocols()) + peerstore_protocols = set(peerstore.get_protocols(peer_id)) + assert all(protocol in peerstore_protocols for protocol in host_a_protocols) + + # Check that the public key has been updated + host_a_public_key = host_a.get_public_key().serialize() + peerstore_public_key = peerstore.pubkey(peer_id).serialize() + assert host_a_public_key == peerstore_public_key diff --git a/tests/core/utils/test_varint.py b/tests/core/utils/test_varint.py new file mode 100644 index 00000000..6ade58fd --- /dev/null +++ b/tests/core/utils/test_varint.py @@ -0,0 +1,215 @@ +import pytest + +from libp2p.exceptions import ParseError +from libp2p.io.abc import Reader +from libp2p.utils.varint import ( + decode_varint_from_bytes, + encode_uvarint, + encode_varint_prefixed, + read_varint_prefixed_bytes, +) + + +class MockReader(Reader): + """Mock reader for testing varint functions.""" + + def __init__(self, data: bytes): + self.data = data + self.position = 0 + + async def read(self, n: int | None = None) -> bytes: + if self.position >= len(self.data): + return b"" + if n is None: + n = len(self.data) - self.position + result = self.data[self.position : self.position + n] + self.position += len(result) + return result + + +def test_encode_uvarint(): + """Test varint encoding with various values.""" + test_cases = [ + (0, b"\x00"), + (1, b"\x01"), + (127, b"\x7f"), + (128, b"\x80\x01"), + (255, b"\xff\x01"), + (256, b"\x80\x02"), + (65535, b"\xff\xff\x03"), + (65536, b"\x80\x80\x04"), + (16777215, b"\xff\xff\xff\x07"), + (16777216, b"\x80\x80\x80\x08"), + ] + + for value, expected in test_cases: + result = encode_uvarint(value) + assert result == expected, ( + f"Failed for value {value}: expected {expected.hex()}, got {result.hex()}" + ) + + +def test_decode_varint_from_bytes(): + """Test varint decoding with various values.""" + test_cases = [ + (b"\x00", 0), + (b"\x01", 1), + (b"\x7f", 127), + (b"\x80\x01", 128), + (b"\xff\x01", 255), + (b"\x80\x02", 256), + (b"\xff\xff\x03", 65535), + (b"\x80\x80\x04", 65536), + (b"\xff\xff\xff\x07", 16777215), + (b"\x80\x80\x80\x08", 16777216), + ] + + for data, expected in test_cases: + result = decode_varint_from_bytes(data) + assert result == expected, ( + f"Failed for data {data.hex()}: expected {expected}, got {result}" + ) + + +def test_decode_varint_from_bytes_invalid(): + """Test varint decoding with invalid data.""" + # Empty data + with pytest.raises(ParseError, match="Unexpected end of data"): + decode_varint_from_bytes(b"") + + # Incomplete varint (should not raise, but should handle gracefully) + # This depends on the implementation - some might raise, others might return partial + + +def test_encode_varint_prefixed(): + """Test encoding messages with varint length prefix.""" + test_cases = [ + (b"", b"\x00"), + (b"hello", b"\x05hello"), + (b"x" * 127, b"\x7f" + b"x" * 127), + (b"x" * 128, b"\x80\x01" + b"x" * 128), + ] + + for message, expected in test_cases: + result = encode_varint_prefixed(message) + assert result == expected, ( + f"Failed for message {message}: expected {expected.hex()}, " + f"got {result.hex()}" + ) + + +@pytest.mark.trio +async def test_read_varint_prefixed_bytes(): + """Test reading length-prefixed bytes from a stream.""" + test_cases = [ + (b"", b""), + (b"hello", b"hello"), + (b"x" * 127, b"x" * 127), + (b"x" * 128, b"x" * 128), + ] + + for message, expected in test_cases: + prefixed_data = encode_varint_prefixed(message) + reader = MockReader(prefixed_data) + + result = await read_varint_prefixed_bytes(reader) + assert result == expected, ( + f"Failed for message {message}: expected {expected}, got {result}" + ) + + +@pytest.mark.trio +async def test_read_varint_prefixed_bytes_incomplete(): + """Test reading length-prefixed bytes with incomplete data.""" + from libp2p.io.exceptions import IncompleteReadError + + # Test with incomplete varint + reader = MockReader(b"\x80") # Incomplete varint + with pytest.raises(IncompleteReadError): + await read_varint_prefixed_bytes(reader) + + # Test with incomplete message + prefixed_data = encode_varint_prefixed(b"hello world") + reader = MockReader(prefixed_data[:-3]) # Missing last 3 bytes + with pytest.raises(IncompleteReadError): + await read_varint_prefixed_bytes(reader) + + +def test_varint_roundtrip(): + """Test roundtrip encoding and decoding.""" + test_values = [0, 1, 127, 128, 255, 256, 65535, 65536, 16777215, 16777216] + + for value in test_values: + encoded = encode_uvarint(value) + decoded = decode_varint_from_bytes(encoded) + assert decoded == value, ( + f"Roundtrip failed for {value}: encoded={encoded.hex()}, decoded={decoded}" + ) + + +def test_varint_prefixed_roundtrip(): + """Test roundtrip encoding and decoding of length-prefixed messages.""" + test_messages = [ + b"", + b"hello", + b"x" * 127, + b"x" * 128, + b"x" * 1000, + ] + + for message in test_messages: + prefixed = encode_varint_prefixed(message) + + # Decode the length + length = decode_varint_from_bytes(prefixed) + assert length == len(message), ( + f"Length mismatch for {message}: expected {len(message)}, got {length}" + ) + + # Extract the message + varint_len = 0 + for i, byte in enumerate(prefixed): + varint_len += 1 + if (byte & 0x80) == 0: + break + + extracted_message = prefixed[varint_len:] + assert extracted_message == message, ( + f"Message mismatch: expected {message}, got {extracted_message}" + ) + + +def test_large_varint_values(): + """Test varint encoding/decoding with large values.""" + large_values = [ + 2**32 - 1, # 32-bit max + 2**64 - 1, # 64-bit max (if supported) + ] + + for value in large_values: + try: + encoded = encode_uvarint(value) + decoded = decode_varint_from_bytes(encoded) + assert decoded == value, f"Large value roundtrip failed for {value}" + except Exception as e: + # Some implementations might not support very large values + pytest.skip(f"Large value {value} not supported: {e}") + + +def test_varint_edge_cases(): + """Test varint encoding/decoding with edge cases.""" + # Test with maximum 7-bit value + assert encode_uvarint(127) == b"\x7f" + assert decode_varint_from_bytes(b"\x7f") == 127 + + # Test with minimum 8-bit value + assert encode_uvarint(128) == b"\x80\x01" + assert decode_varint_from_bytes(b"\x80\x01") == 128 + + # Test with maximum 14-bit value + assert encode_uvarint(16383) == b"\xff\x7f" + assert decode_varint_from_bytes(b"\xff\x7f") == 16383 + + # Test with minimum 15-bit value + assert encode_uvarint(16384) == b"\x80\x80\x01" + assert decode_varint_from_bytes(b"\x80\x80\x01") == 16384 diff --git a/tests/interop/js_libp2p/README.md b/tests/interop/js_libp2p/README.md new file mode 100644 index 00000000..4c4d40b1 --- /dev/null +++ b/tests/interop/js_libp2p/README.md @@ -0,0 +1,81 @@ +# py-libp2p and js-libp2p Interoperability Tests + +This repository contains interoperability tests for py-libp2p and js-libp2p using the /ipfs/ping/1.0.0 protocol. The goal is to verify compatibility in stream multiplexing, protocol negotiation, ping handling, transport layer, and multiaddr parsing. + +## Directory Structure + +- js_node/ping.js: JavaScript implementation of a ping server and client using libp2p. +- py_node/ping.py: Python implementation of a ping server and client using py-libp2p. +- scripts/run_test.sh: Shell script to automate running the server and client for testing. +- README.md: This file. + +## Prerequisites + +- Python 3.8+ with `py-libp2p` and dependencies (`pip install libp2p trio cryptography multiaddr`). +- Node.js 16+ with `libp2p` dependencies (`npm install @libp2p/core @libp2p/tcp @chainsafe/libp2p-noise @chainsafe/libp2p-yamux @libp2p/ping @libp2p/identify @multiformats/multiaddr`). +- Bash shell for running `run_test.sh`. + +## Running Tests + +1. Change directory: + +``` +cd tests/interop/js_libp2p +``` + +2. Install dependencies: + +``` +For JavaScript: cd js_node && npm install && cd ... +``` + +3. Run the automated test: + +For Linux and Mac users: + +``` +chmod +x scripts/run_test.sh +./scripts/run_test.sh +``` + +For Windows users: + +``` +.\scripts\run_test.ps1 +``` + +This starts the Python server on port 8000 and runs the JavaScript client to send 5 pings. + +## Debugging + +- Logs are saved in py_node/py_server.log and js_node/js_client.log. +- Check for: + - Successful connection establishment. + - Protocol negotiation (/ipfs/ping/1.0.0). + - 32-byte payload echo in server logs. + - RTT and payload hex in client logs. + +## Test Plan + +### The test verifies: + +- Stream Multiplexer Compatibility: Yamux is used and negotiates correctly. +- Multistream Protocol Negotiation: /ipfs/ping/1.0.0 is selected via multistream-select. +- Ping Protocol Handler: Handles 32-byte payloads per the libp2p ping spec. +- Transport Layer Support: TCP is used; WebSocket support is optional. +- Multiaddr Parsing: Correctly resolves multiaddr strings. +- Logging: Includes peer ID, RTT, and payload hex for debugging. + +## Current Status + +### Working: + +- TCP transport and Noise encryption are functional. +- Yamux multiplexing is implemented in both nodes. +- Multiaddr parsing works correctly. +- Logging provides detailed debug information. + +## Not Working: + +- Ping protocol handler fails to complete pings (JS client reports "operation aborted"). +- Potential issues with stream handling or protocol negotiation. diff --git a/tests/interop/js_libp2p/js_node/README.md b/tests/interop/js_libp2p/js_node/README.md new file mode 100644 index 00000000..419dfc4a --- /dev/null +++ b/tests/interop/js_libp2p/js_node/README.md @@ -0,0 +1,53 @@ +# @libp2p/example-chat + +[![libp2p.io](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/) +[![Discuss](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg?style=flat-square)](https://discuss.libp2p.io) +[![codecov](https://img.shields.io/codecov/c/github/libp2p/js-libp2p-examples.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p-examples) +[![CI](https://img.shields.io/github/actions/workflow/status/libp2p/js-libp2p-examples/ci.yml?branch=main&style=flat-square)](https://github.com/libp2p/js-libp2p-examples/actions/workflows/ci.yml?query=branch%3Amain) + +> An example chat app using libp2p + +## Table of contents + +- [Setup](#setup) +- [Running](#running) +- [Need help?](#need-help) +- [License](#license) +- [Contribution](#contribution) + +## Setup + +1. Install example dependencies + ```console + $ npm install + ``` +1. Open 2 terminal windows in the `./src` directory. + +## Running + +1. Run the listener in window 1, `node listener.js` +1. Run the dialer in window 2, `node dialer.js` +1. Wait until the two peers discover each other +1. Type a message in either window and hit *enter* +1. Tell yourself secrets to your hearts content! + +## Need help? + +- Read the [js-libp2p documentation](https://github.com/libp2p/js-libp2p/tree/main/doc) +- Check out the [js-libp2p API docs](https://libp2p.github.io/js-libp2p/) +- Check out the [general libp2p documentation](https://docs.libp2p.io) for tips, how-tos and more +- Read the [libp2p specs](https://github.com/libp2p/specs) +- Ask a question on the [js-libp2p discussion board](https://github.com/libp2p/js-libp2p/discussions) + +## License + +Licensed under either of + +- Apache 2.0, ([LICENSE-APACHE](LICENSE-APACHE) / ) +- MIT ([LICENSE-MIT](LICENSE-MIT) / ) + +## Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall be +dual licensed as above, without any additional terms or conditions. diff --git a/tests/interop/js_libp2p/js_node/package.json b/tests/interop/js_libp2p/js_node/package.json new file mode 100644 index 00000000..e89ebc8f --- /dev/null +++ b/tests/interop/js_libp2p/js_node/package.json @@ -0,0 +1,39 @@ +{ + "name": "@libp2p/example-chat", + "version": "0.0.0", + "description": "An example chat app using libp2p", + "license": "Apache-2.0 OR MIT", + "homepage": "https://github.com/libp2p/js-libp2p-example-chat#readme", + "repository": { + "type": "git", + "url": "git+https://github.com/libp2p/js-libp2p-examples.git" + }, + "bugs": { + "url": "https://github.com/libp2p/js-libp2p-examples/issues" + }, + "type": "module", + "scripts": { + "test": "test-node-example test/*" + }, + "dependencies": { + "@chainsafe/libp2p-noise": "^16.0.0", + "@chainsafe/libp2p-yamux": "^7.0.0", + "@libp2p/identify": "^3.0.33", + "@libp2p/mdns": "^11.0.1", + "@libp2p/ping": "^2.0.33", + "@libp2p/tcp": "^10.0.0", + "@libp2p/websockets": "^9.0.0", + "@multiformats/multiaddr": "^12.3.1", + "@nodeutils/defaults-deep": "^1.1.0", + "it-length-prefixed": "^10.0.1", + "it-map": "^3.0.3", + "it-pipe": "^3.0.1", + "libp2p": "^2.0.0", + "p-defer": "^4.0.0", + "uint8arrays": "^5.1.0" + }, + "devDependencies": { + "test-ipfs-example": "^1.1.0" + }, + "private": true +} diff --git a/tests/interop/js_libp2p/js_node/src/ping.js b/tests/interop/js_libp2p/js_node/src/ping.js new file mode 100644 index 00000000..c5a658c7 --- /dev/null +++ b/tests/interop/js_libp2p/js_node/src/ping.js @@ -0,0 +1,204 @@ +#!/usr/bin/env node + +import { createLibp2p } from 'libp2p' +import { tcp } from '@libp2p/tcp' +import { noise } from '@chainsafe/libp2p-noise' +import { yamux } from '@chainsafe/libp2p-yamux' +import { ping } from '@libp2p/ping' +import { identify } from '@libp2p/identify' +import { multiaddr } from '@multiformats/multiaddr' + +async function createNode() { + return await createLibp2p({ + addresses: { + listen: ['/ip4/0.0.0.0/tcp/0'] + }, + transports: [ + tcp() + ], + connectionEncrypters: [ + noise() + ], + streamMuxers: [ + yamux() + ], + services: { + // Use ipfs prefix to match py-libp2p example + ping: ping({ + protocolPrefix: 'ipfs', + maxInboundStreams: 32, + maxOutboundStreams: 64, + timeout: 30000 + }), + identify: identify() + }, + connectionManager: { + minConnections: 0, + maxConnections: 100, + dialTimeout: 30000 + } + }) +} + +async function runServer() { + console.log('๐Ÿš€ Starting js-libp2p ping server...') + + const node = await createNode() + await node.start() + + console.log('โœ… Server started!') + console.log(`๐Ÿ“‹ Peer ID: ${node.peerId.toString()}`) + console.log('๐Ÿ“ Listening addresses:') + + node.getMultiaddrs().forEach(addr => { + console.log(` ${addr.toString()}`) + }) + + // Listen for connections + node.addEventListener('peer:connect', (evt) => { + console.log(`๐Ÿ”— Peer connected: ${evt.detail.toString()}`) + }) + + node.addEventListener('peer:disconnect', (evt) => { + console.log(`โŒ Peer disconnected: ${evt.detail.toString()}`) + }) + + console.log('\n๐ŸŽง Server ready for ping requests...') + console.log('Press Ctrl+C to exit') + + // Graceful shutdown + process.on('SIGINT', async () => { + console.log('\n๐Ÿ›‘ Shutting down...') + await node.stop() + process.exit(0) + }) + + // Keep alive + while (true) { + await new Promise(resolve => setTimeout(resolve, 1000)) + } +} + +async function runClient(targetAddr, count = 5) { + console.log('๐Ÿš€ Starting js-libp2p ping client...') + + const node = await createNode() + await node.start() + + console.log(`๐Ÿ“‹ Our Peer ID: ${node.peerId.toString()}`) + console.log(`๐ŸŽฏ Target: ${targetAddr}`) + + try { + const ma = multiaddr(targetAddr) + const targetPeerId = ma.getPeerId() + + if (!targetPeerId) { + throw new Error('Could not extract peer ID from multiaddr') + } + + console.log(`๐ŸŽฏ Target Peer ID: ${targetPeerId}`) + console.log('๐Ÿ”— Connecting to peer...') + + const connection = await node.dial(ma) + console.log('โœ… Connection established!') + console.log(`๐Ÿ”— Connected to: ${connection.remotePeer.toString()}`) + + // Add a small delay to let the connection fully establish + await new Promise(resolve => setTimeout(resolve, 1000)) + + const rtts = [] + + for (let i = 1; i <= count; i++) { + try { + console.log(`\n๐Ÿ“ Sending ping ${i}/${count}...`); + console.log('[DEBUG] Attempting to open ping stream with protocol: /ipfs/ping/1.0.0'); + const start = Date.now() + + const stream = await connection.newStream(['/ipfs/ping/1.0.0']).catch(err => { + console.error(`[ERROR] Failed to open ping stream: ${err.message}`); + throw err; + }); + console.log('[DEBUG] Ping stream opened successfully'); + + const latency = await Promise.race([ + node.services.ping.ping(connection.remotePeer), + new Promise((_, reject) => + setTimeout(() => reject(new Error('Ping timeout')), 30000) // Increased timeout + ) + ]).catch(err => { + console.error(`[ERROR] Ping ${i} error: ${err.message}`); + throw err; + }); + + const rtt = Date.now() - start; + + rtts.push(latency) + console.log(`โœ… Ping ${i} successful!`) + console.log(` Reported latency: ${latency}ms`) + console.log(` Measured RTT: ${rtt}ms`) + + if (i < count) { + await new Promise(resolve => setTimeout(resolve, 1000)) + } + } catch (error) { + console.error(`โŒ Ping ${i} failed:`, error.message) + // Try to continue with other pings + } + } + + // Stats + if (rtts.length > 0) { + const avg = rtts.reduce((a, b) => a + b, 0) / rtts.length + const min = Math.min(...rtts) + const max = Math.max(...rtts) + + console.log(`\n๐Ÿ“Š Ping Statistics:`) + console.log(` Packets: Sent=${count}, Received=${rtts.length}, Lost=${count - rtts.length}`) + console.log(` Latency: min=${min}ms, avg=${avg.toFixed(2)}ms, max=${max}ms`) + } else { + console.log(`\n๐Ÿ“Š All pings failed (${count} attempts)`) + } + + } catch (error) { + console.error('โŒ Client error:', error.message) + console.error('Stack:', error.stack) + process.exit(1) + } finally { + await node.stop() + console.log('\nโน๏ธ Client stopped') + } +} + +async function main() { + const args = process.argv.slice(2) + + if (args.length === 0) { + console.log('Usage:') + console.log(' node ping.js server # Start ping server') + console.log(' node ping.js client [count] # Ping a peer') + console.log('') + console.log('Examples:') + console.log(' node ping.js server') + console.log(' node ping.js client /ip4/127.0.0.1/tcp/12345/p2p/12D3Ko... 5') + process.exit(1) + } + + const mode = args[0] + + if (mode === 'server') { + await runServer() + } else if (mode === 'client') { + if (args.length < 2) { + console.error('โŒ Client mode requires target multiaddr') + process.exit(1) + } + const targetAddr = args[1] + const count = parseInt(args[2]) || 5 + await runClient(targetAddr, count) + } else { + console.error('โŒ Invalid mode. Use "server" or "client"') + process.exit(1) + } +} + +main().catch(console.error) diff --git a/tests/interop/js_libp2p/js_node/src/ping_client.js b/tests/interop/js_libp2p/js_node/src/ping_client.js new file mode 100644 index 00000000..4708dd4f --- /dev/null +++ b/tests/interop/js_libp2p/js_node/src/ping_client.js @@ -0,0 +1,241 @@ +#!/usr/bin/env node + +import { createLibp2p } from 'libp2p' +import { tcp } from '@libp2p/tcp' +import { noise } from '@chainsafe/libp2p-noise' +import { yamux } from '@chainsafe/libp2p-yamux' +import { ping } from '@libp2p/ping' +import { identify } from '@libp2p/identify' +import { multiaddr } from '@multiformats/multiaddr' +import fs from 'fs' +import path from 'path' + +// Create logs directory if it doesn't exist +const logsDir = path.join(process.cwd(), '../logs') +if (!fs.existsSync(logsDir)) { + fs.mkdirSync(logsDir, { recursive: true }) +} + +// Setup logging +const logFile = path.join(logsDir, 'js_ping_client.log') +const logStream = fs.createWriteStream(logFile, { flags: 'w' }) + +function log(message) { + const timestamp = new Date().toISOString() + const logLine = `${timestamp} - ${message}\n` + logStream.write(logLine) + console.log(message) +} + +async function createNode() { + log('๐Ÿ”ง Creating libp2p node...') + + const node = await createLibp2p({ + addresses: { + listen: ['/ip4/0.0.0.0/tcp/0'] // Random port + }, + transports: [ + tcp() + ], + connectionEncrypters: [ + noise() + ], + streamMuxers: [ + yamux() + ], + services: { + ping: ping({ + protocolPrefix: 'ipfs', // Use ipfs prefix to match py-libp2p + maxInboundStreams: 32, + maxOutboundStreams: 64, + timeout: 30000, + runOnTransientConnection: true + }), + identify: identify() + }, + connectionManager: { + minConnections: 0, + maxConnections: 100, + dialTimeout: 30000, + maxParallelDials: 10 + } + }) + + log('โœ… Node created successfully') + return node +} + +async function runClient(targetAddr, count = 5) { + log('๐Ÿš€ Starting js-libp2p ping client...') + + const node = await createNode() + + // Add connection event listeners + node.addEventListener('peer:connect', (evt) => { + log(`๐Ÿ”— Connected to peer: ${evt.detail.toString()}`) + }) + + node.addEventListener('peer:disconnect', (evt) => { + log(`โŒ Disconnected from peer: ${evt.detail.toString()}`) + }) + + await node.start() + log('โœ… Node started') + + log(`๐Ÿ“‹ Our Peer ID: ${node.peerId.toString()}`) + log(`๐ŸŽฏ Target: ${targetAddr}`) + + try { + const ma = multiaddr(targetAddr) + const targetPeerId = ma.getPeerId() + + if (!targetPeerId) { + throw new Error('Could not extract peer ID from multiaddr') + } + + log(`๐ŸŽฏ Target Peer ID: ${targetPeerId}`) + + // Parse multiaddr components for debugging + const components = ma.toString().split('/') + log(`๐Ÿ“ Target components: ${components.join(' โ†’ ')}`) + + log('๐Ÿ”— Attempting to dial peer...') + const connection = await node.dial(ma) + log('โœ… Connection established!') + log(`๐Ÿ”— Connected to: ${connection.remotePeer.toString()}`) + log(`๐Ÿ”— Connection status: ${connection.status}`) + log(`๐Ÿ”— Connection direction: ${connection.direction}`) + + // List available protocols + if (connection.remoteAddr) { + log(`๐ŸŒ Remote address: ${connection.remoteAddr.toString()}`) + } + + // Wait for connection to stabilize + log('โณ Waiting for connection to stabilize...') + await new Promise(resolve => setTimeout(resolve, 2000)) + + // Attempt ping sequence + log(`\n๐Ÿ“ Starting ping sequence (${count} pings)...`) + const rtts = [] + + for (let i = 1; i <= count; i++) { + try { + log(`\n๐Ÿ“ Sending ping ${i}/${count}...`) + const start = Date.now() + + // Create a more robust ping with better error handling + const pingPromise = node.services.ping.ping(connection.remotePeer) + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error('Ping timeout (15s)')), 15000) + ) + + const latency = await Promise.race([pingPromise, timeoutPromise]) + const totalRtt = Date.now() - start + + rtts.push(latency) + log(`โœ… Ping ${i} successful!`) + log(` Reported latency: ${latency}ms`) + log(` Total RTT: ${totalRtt}ms`) + + // Wait between pings + if (i < count) { + await new Promise(resolve => setTimeout(resolve, 1000)) + } + } catch (error) { + log(`โŒ Ping ${i} failed: ${error.message}`) + log(` Error type: ${error.constructor.name}`) + if (error.code) { + log(` Error code: ${error.code}`) + } + + // Check if connection is still alive + if (connection.status !== 'open') { + log(`โš ๏ธ Connection status changed to: ${connection.status}`) + break + } + } + } + + // Print statistics + if (rtts.length > 0) { + const avg = rtts.reduce((a, b) => a + b, 0) / rtts.length + const min = Math.min(...rtts) + const max = Math.max(...rtts) + const lossRate = ((count - rtts.length) / count * 100).toFixed(1) + + log(`\n๐Ÿ“Š Ping Statistics:`) + log(` Packets: Sent=${count}, Received=${rtts.length}, Lost=${count - rtts.length}`) + log(` Loss rate: ${lossRate}%`) + log(` Latency: min=${min}ms, avg=${avg.toFixed(2)}ms, max=${max}ms`) + } else { + log(`\n๐Ÿ“Š All pings failed (${count} attempts)`) + } + + // Close connection gracefully + log('\n๐Ÿ”’ Closing connection...') + await connection.close() + + } catch (error) { + log(`โŒ Client error: ${error.message}`) + log(` Error type: ${error.constructor.name}`) + if (error.stack) { + log(` Stack trace: ${error.stack}`) + } + process.exit(1) + } finally { + log('๐Ÿ›‘ Stopping node...') + await node.stop() + log('โน๏ธ Client stopped') + logStream.end() + } +} + +async function main() { + const args = process.argv.slice(2) + + if (args.length === 0) { + console.log('Usage:') + console.log(' node ping-client.js [count]') + console.log('') + console.log('Examples:') + console.log(' node ping-client.js /ip4/127.0.0.1/tcp/8000/p2p/QmExample... 5') + console.log(' node ping-client.js /ip4/127.0.0.1/tcp/8000/p2p/QmExample... 10') + process.exit(1) + } + + const targetAddr = args[0] + const count = parseInt(args[1]) || 5 + + if (count <= 0 || count > 100) { + console.error('โŒ Count must be between 1 and 100') + process.exit(1) + } + + await runClient(targetAddr, count) +} + +// Handle graceful shutdown +process.on('SIGINT', () => { + log('\n๐Ÿ‘‹ Shutting down...') + logStream.end() + process.exit(0) +}) + +process.on('uncaughtException', (error) => { + log(`๐Ÿ’ฅ Uncaught exception: ${error.message}`) + if (error.stack) { + log(`Stack: ${error.stack}`) + } + logStream.end() + process.exit(1) +}) + +main().catch((error) => { + log(`๐Ÿ’ฅ Fatal error: ${error.message}`) + if (error.stack) { + log(`Stack: ${error.stack}`) + } + logStream.end() + process.exit(1) +}) diff --git a/tests/interop/js_libp2p/js_node/src/ping_server.js b/tests/interop/js_libp2p/js_node/src/ping_server.js new file mode 100644 index 00000000..6188cc65 --- /dev/null +++ b/tests/interop/js_libp2p/js_node/src/ping_server.js @@ -0,0 +1,167 @@ +#!/usr/bin/env node + +import { createLibp2p } from 'libp2p' +import { tcp } from '@libp2p/tcp' +import { noise } from '@chainsafe/libp2p-noise' +import { yamux } from '@chainsafe/libp2p-yamux' +import { ping } from '@libp2p/ping' +import { identify } from '@libp2p/identify' +import fs from 'fs' +import path from 'path' + +// Create logs directory if it doesn't exist +const logsDir = path.join(process.cwd(), '../logs') +if (!fs.existsSync(logsDir)) { + fs.mkdirSync(logsDir, { recursive: true }) +} + +// Setup logging +const logFile = path.join(logsDir, 'js_ping_server.log') +const logStream = fs.createWriteStream(logFile, { flags: 'w' }) + +function log(message) { + const timestamp = new Date().toISOString() + const logLine = `${timestamp} - ${message}\n` + logStream.write(logLine) + console.log(message) +} + +async function createNode(port) { + log('๐Ÿ”ง Creating libp2p node...') + + const node = await createLibp2p({ + addresses: { + listen: [`/ip4/0.0.0.0/tcp/${port}`] + }, + transports: [ + tcp() + ], + connectionEncrypters: [ + noise() + ], + streamMuxers: [ + yamux() + ], + services: { + ping: ping({ + protocolPrefix: 'ipfs', // Use ipfs prefix to match py-libp2p + maxInboundStreams: 32, + maxOutboundStreams: 64, + timeout: 30000, + runOnTransientConnection: true + }), + identify: identify() + }, + connectionManager: { + minConnections: 0, + maxConnections: 100, + dialTimeout: 30000, + maxParallelDials: 10 + } + }) + + log('โœ… Node created successfully') + return node +} + +async function runServer(port) { + log('๐Ÿš€ Starting js-libp2p ping server...') + + const node = await createNode(port) + + // Add connection event listeners + node.addEventListener('peer:connect', (evt) => { + log(`๐Ÿ”— New peer connected: ${evt.detail.toString()}`) + }) + + node.addEventListener('peer:disconnect', (evt) => { + log(`โŒ Peer disconnected: ${evt.detail.toString()}`) + }) + + // Add protocol handler for incoming streams + node.addEventListener('peer:identify', (evt) => { + log(`๐Ÿ” Peer identified: ${evt.detail.peerId.toString()}`) + log(` Protocols: ${evt.detail.protocols.join(', ')}`) + log(` Listen addresses: ${evt.detail.listenAddrs.map(addr => addr.toString()).join(', ')}`) + }) + + await node.start() + log('โœ… Node started') + + const peerId = node.peerId.toString() + const listenAddrs = node.getMultiaddrs() + + log(`๐Ÿ“‹ Peer ID: ${peerId}`) + log(`๐ŸŒ Listen addresses:`) + listenAddrs.forEach(addr => { + log(` ${addr.toString()}`) + }) + + // Find the main TCP address for easy copy-paste + const tcpAddr = listenAddrs.find(addr => + addr.toString().includes('/tcp/') && + !addr.toString().includes('/ws') + ) + + if (tcpAddr) { + log(`\n๐Ÿงช Test with py-libp2p:`) + log(` python ping_client.py ${tcpAddr.toString()}`) + log(`\n๐Ÿงช Test with js-libp2p:`) + log(` node ping-client.js ${tcpAddr.toString()}`) + } + + log(`\n๐Ÿ“ Ping service is running with protocol: /ipfs/ping/1.0.0`) + log(`๐Ÿ” Security: Noise encryption`) + log(`๐Ÿš‡ Muxer: Yamux stream multiplexing`) + log(`\nโณ Waiting for connections...`) + log('Press Ctrl+C to exit') + + // Keep the server running + return new Promise((resolve, reject) => { + process.on('SIGINT', () => { + log('\n๐Ÿ›‘ Shutting down server...') + node.stop().then(() => { + log('โน๏ธ Server stopped') + logStream.end() + resolve() + }).catch(reject) + }) + + process.on('uncaughtException', (error) => { + log(`๐Ÿ’ฅ Uncaught exception: ${error.message}`) + if (error.stack) { + log(`Stack: ${error.stack}`) + } + logStream.end() + reject(error) + }) + }) +} + +async function main() { + const args = process.argv.slice(2) + const port = parseInt(args[0]) || 9000 + + if (port <= 0 || port > 65535) { + console.error('โŒ Port must be between 1 and 65535') + process.exit(1) + } + + try { + await runServer(port) + } catch (error) { + console.error(`๐Ÿ’ฅ Fatal error: ${error.message}`) + if (error.stack) { + console.error(`Stack: ${error.stack}`) + } + process.exit(1) + } +} + +main().catch((error) => { + console.error(`๐Ÿ’ฅ Fatal error: ${error.message}`) + if (error.stack) { + console.error(`Stack: ${error.stack}`) + } + process.exit(1) +}) diff --git a/tests/interop/js_libp2p/scripts/run_test.ps1 b/tests/interop/js_libp2p/scripts/run_test.ps1 new file mode 100644 index 00000000..9654fc50 --- /dev/null +++ b/tests/interop/js_libp2p/scripts/run_test.ps1 @@ -0,0 +1,194 @@ +#!/usr/bin/env pwsh + +# run_test.ps1 - libp2p Interoperability Test Runner (PowerShell) +# Tests py-libp2p <-> js-libp2p ping communication + +$ErrorActionPreference = "Stop" + +# Colors for output +$Red = "`e[31m" +$Green = "`e[32m" +$Yellow = "`e[33m" +$Blue = "`e[34m" +$Cyan = "`e[36m" +$Reset = "`e[0m" + +function Write-ColorOutput { + param([string]$Message, [string]$Color = $Reset) + Write-Host "${Color}${Message}${Reset}" +} + +Write-ColorOutput "[CHECK] Checking prerequisites..." $Cyan +if (-not (Get-Command python -ErrorAction SilentlyContinue)) { + Write-ColorOutput "[ERROR] Python not found. Install Python 3.7+" $Red + exit 1 +} +if (-not (Get-Command node -ErrorAction SilentlyContinue)) { + Write-ColorOutput "[ERROR] Node.js not found. Install Node.js 16+" $Red + exit 1 +} + +Write-ColorOutput "[CHECK] Checking port 8000..." $Blue +$portCheck = netstat -a -n -o | findstr :8000 +if ($portCheck) { + Write-ColorOutput "[ERROR] Port 8000 in use. Free the port." $Red + Write-ColorOutput $portCheck $Yellow + exit 1 +} + +Write-ColorOutput "[DEBUG] Cleaning up Python processes..." $Blue +Get-Process -Name "python" -ErrorAction SilentlyContinue | Where-Object { $_.CommandLine -like "*ping.py*" } | Stop-Process -Force -ErrorAction SilentlyContinue + +Write-ColorOutput "[PYTHON] Starting server on port 8000..." $Yellow +Set-Location -Path "py_node" +$pyLogFile = "py_server_8000.log" +$pyErrLogFile = "py_server_8000.log.err" +$pyDebugLogFile = "ping_debug.log" + +if (Test-Path $pyLogFile) { Remove-Item $pyLogFile -Force -ErrorAction SilentlyContinue } +if (Test-Path $pyErrLogFile) { Remove-Item $pyErrLogFile -Force -ErrorAction SilentlyContinue } +if (Test-Path $pyDebugLogFile) { Remove-Item $pyDebugLogFile -Force -ErrorAction SilentlyContinue } + +$pyProcess = Start-Process -FilePath "python" -ArgumentList "-u", "ping.py", "server", "--port", "8000" -NoNewWindow -PassThru -RedirectStandardOutput $pyLogFile -RedirectStandardError $pyErrLogFile +Write-ColorOutput "[DEBUG] Python server PID: $($pyProcess.Id)" $Blue +Write-ColorOutput "[DEBUG] Python logs: $((Get-Location).Path)\$pyLogFile, $((Get-Location).Path)\$pyErrLogFile, $((Get-Location).Path)\$pyDebugLogFile" $Blue + +$timeoutSeconds = 20 +$startTime = Get-Date +$serverStarted = $false + +while (((Get-Date) - $startTime).TotalSeconds -lt $timeoutSeconds -and -not $serverStarted) { + if (Test-Path $pyLogFile) { + $content = Get-Content $pyLogFile -Raw -ErrorAction SilentlyContinue + if ($content -match "Server started|Listening") { + $serverStarted = $true + Write-ColorOutput "[OK] Python server started" $Green + } + } + if (Test-Path $pyErrLogFile) { + $errContent = Get-Content $pyErrLogFile -Raw -ErrorAction SilentlyContinue + if ($errContent) { + Write-ColorOutput "[DEBUG] Error log: $errContent" $Yellow + } + } + Start-Sleep -Milliseconds 500 +} + +if (-not $serverStarted) { + Write-ColorOutput "[ERROR] Python server failed to start" $Red + Write-ColorOutput "[DEBUG] Logs:" $Yellow + if (Test-Path $pyLogFile) { Get-Content $pyLogFile | Write-ColorOutput -Color $Yellow } + if (Test-Path $pyErrLogFile) { Get-Content $pyErrLogFile | Write-ColorOutput -Color $Yellow } + if (Test-Path $pyDebugLogFile) { Get-Content $pyDebugLogFile | Write-ColorOutput -Color $Yellow } + Write-ColorOutput "[DEBUG] Trying foreground run..." $Yellow + python -u ping.py server --port 8000 + exit 1 +} + +# Extract Peer ID +$peerInfo = $null +if (Test-Path $pyLogFile) { + $content = Get-Content $pyLogFile -Raw + $peerIdPattern = "Peer ID:\s*([A-Za-z0-9]+)" + $peerIdMatch = [regex]::Match($content, $peerIdPattern) + if ($peerIdMatch.Success) { + $peerId = $peerIdMatch.Groups[1].Value + $peerInfo = @{ + PeerId = $peerId + MultiAddr = "/ip4/127.0.0.1/tcp/8000/p2p/$peerId" + } + Write-ColorOutput "[OK] Peer ID: $peerId" $Cyan + Write-ColorOutput "[OK] MultiAddr: $($peerInfo.MultiAddr)" $Cyan + } +} + +if (-not $peerInfo) { + Write-ColorOutput "[ERROR] Could not extract Peer ID" $Red + if (Test-Path $pyLogFile) { Get-Content $pyLogFile | Write-ColorOutput -Color $Yellow } + if (Test-Path $pyErrLogFile) { Get-Content $pyErrLogFile | Write-ColorOutput -Color $Yellow } + if (Test-Path $pyDebugLogFile) { Get-Content $pyDebugLogFile | Write-ColorOutput -Color $Yellow } + Stop-Process -Id $pyProcess.Id -Force -ErrorAction SilentlyContinue + exit 1 +} + +# Start JavaScript client +Write-ColorOutput "[JAVASCRIPT] Starting client..." $Yellow +Set-Location -Path "../js_node" +$jsLogFile = "test_js_client_to_py_server.log" +$jsErrLogFile = "test_js_client_to_py_server.log.err" + +if (Test-Path $jsLogFile) { Remove-Item $jsLogFile -Force -ErrorAction SilentlyContinue } +if (Test-Path $jsErrLogFile) { Remove-Item $jsErrLogFile -Force -ErrorAction SilentlyContinue } + +$jsProcess = Start-Process -FilePath "node" -ArgumentList "src/ping.js", "client", $peerInfo.MultiAddr, "3" -NoNewWindow -PassThru -RedirectStandardOutput $jsLogFile -RedirectStandardError $jsErrLogFile +Write-ColorOutput "[DEBUG] JavaScript client PID: $($jsProcess.Id)" $Blue +Write-ColorOutput "[DEBUG] Client logs: $((Get-Location).Path)\$jsLogFile, $((Get-Location).Path)\$jsErrLogFile" $Blue + +# Wait for client to complete +$clientTimeout = 10 +$clientStart = Get-Date +while (-not $jsProcess.HasExited -and (((Get-Date) - $clientStart).TotalSeconds -lt $clientTimeout)) { + Start-Sleep -Seconds 1 +} + +if (-not $jsProcess.HasExited) { + Write-ColorOutput "[DEBUG] JavaScript client did not exit, terminating..." $Yellow + Stop-Process -Id $jsProcess.Id -Force -ErrorAction SilentlyContinue +} + +Write-ColorOutput "[CHECK] Results..." $Cyan +$success = $false +if (Test-Path $jsLogFile) { + $jsLogContent = Get-Content $jsLogFile -Raw -ErrorAction SilentlyContinue + if ($jsLogContent -match "successful|Ping.*successful") { + $success = $true + Write-ColorOutput "[SUCCESS] Ping test passed" $Green + } else { + Write-ColorOutput "[FAILED] No successful pings" $Red + Write-ColorOutput "[DEBUG] Client log path: $((Get-Location).Path)\$jsLogFile" $Yellow + Write-ColorOutput "Client log:" $Yellow + Write-ColorOutput $jsLogContent $Yellow + if (Test-Path $jsErrLogFile) { + Write-ColorOutput "[DEBUG] Client error log path: $((Get-Location).Path)\$jsErrLogFile" $Yellow + Write-ColorOutput "Client error log:" $Yellow + Get-Content $jsErrLogFile | Write-ColorOutput -Color $Yellow + } + Write-ColorOutput "[DEBUG] Python server log path: $((Get-Location).Path)\..\py_node\$pyLogFile" $Yellow + Write-ColorOutput "Python server log:" $Yellow + if (Test-Path "../py_node/$pyLogFile") { + $pyLogContent = Get-Content "../py_node/$pyLogFile" -Raw -ErrorAction SilentlyContinue + if ($pyLogContent) { Write-ColorOutput $pyLogContent $Yellow } else { Write-ColorOutput "Empty or inaccessible" $Yellow } + } else { + Write-ColorOutput "File not found" $Yellow + } + Write-ColorOutput "[DEBUG] Python server error log path: $((Get-Location).Path)\..\py_node\$pyErrLogFile" $Yellow + Write-ColorOutput "Python server error log:" $Yellow + if (Test-Path "../py_node/$pyErrLogFile") { + $pyErrLogContent = Get-Content "../py_node/$pyErrLogFile" -Raw -ErrorAction SilentlyContinue + if ($pyErrLogContent) { Write-ColorOutput $pyErrLogContent $Yellow } else { Write-ColorOutput "Empty or inaccessible" $Yellow } + } else { + Write-ColorOutput "File not found" $Yellow + } + Write-ColorOutput "[DEBUG] Python debug log path: $((Get-Location).Path)\..\py_node\$pyDebugLogFile" $Yellow + Write-ColorOutput "Python debug log:" $Yellow + if (Test-Path "../py_node/$pyDebugLogFile") { + $pyDebugLogContent = Get-Content "../py_node/$pyDebugLogFile" -Raw -ErrorAction SilentlyContinue + if ($pyDebugLogContent) { Write-ColorOutput $pyDebugLogContent $Yellow } else { Write-ColorOutput "Empty or inaccessible" $Yellow } + } else { + Write-ColorOutput "File not found" $Yellow + } + } +} + +Write-ColorOutput "[CLEANUP] Stopping processes..." $Yellow +Stop-Process -Id $pyProcess.Id -Force -ErrorAction SilentlyContinue +Stop-Process -Id $jsProcess.Id -Force -ErrorAction SilentlyContinue +Set-Location -Path "../" + +if ($success) { + Write-ColorOutput "[SUCCESS] Test completed" $Green + exit 0 +} else { + Write-ColorOutput "[FAILED] Test failed" $Red + exit 1 +} diff --git a/tests/interop/js_libp2p/scripts/run_test.sh b/tests/interop/js_libp2p/scripts/run_test.sh new file mode 100644 index 00000000..cbf9e627 --- /dev/null +++ b/tests/interop/js_libp2p/scripts/run_test.sh @@ -0,0 +1,215 @@ +#!/usr/bin/env bash + +# run_test.sh - libp2p Interoperability Test Runner (Bash) +# Tests py-libp2p <-> js-libp2p ping communication + +set -e + +# Colors for output +RED='\033[31m' +GREEN='\033[32m' +YELLOW='\033[33m' +BLUE='\033[34m' +CYAN='\033[36m' +RESET='\033[0m' + +write_color_output() { + local message="$1" + local color="${2:-$RESET}" + echo -e "${color}${message}${RESET}" +} + +write_color_output "[CHECK] Checking prerequisites..." "$CYAN" +if ! command -v python3 &> /dev/null && ! command -v python &> /dev/null; then + write_color_output "[ERROR] Python not found. Install Python 3.7+" "$RED" + exit 1 +fi + +# Use python3 if available, otherwise python +PYTHON_CMD="python3" +if ! command -v python3 &> /dev/null; then + PYTHON_CMD="python" +fi + +if ! command -v node &> /dev/null; then + write_color_output "[ERROR] Node.js not found. Install Node.js 16+" "$RED" + exit 1 +fi + +write_color_output "[CHECK] Checking port 8000..." "$BLUE" +if netstat -tuln 2>/dev/null | grep -q ":8000 " || ss -tuln 2>/dev/null | grep -q ":8000 "; then + write_color_output "[ERROR] Port 8000 in use. Free the port." "$RED" + if command -v netstat &> /dev/null; then + netstat -tuln | grep ":8000 " | write_color_output "$(cat)" "$YELLOW" + elif command -v ss &> /dev/null; then + ss -tuln | grep ":8000 " | write_color_output "$(cat)" "$YELLOW" + fi + exit 1 +fi + +write_color_output "[DEBUG] Cleaning up Python processes..." "$BLUE" +pkill -f "ping.py" 2>/dev/null || true + +write_color_output "[PYTHON] Starting server on port 8000..." "$YELLOW" +cd py_node + +PY_LOG_FILE="py_server_8000.log" +PY_ERR_LOG_FILE="py_server_8000.log.err" +PY_DEBUG_LOG_FILE="ping_debug.log" + +rm -f "$PY_LOG_FILE" "$PY_ERR_LOG_FILE" "$PY_DEBUG_LOG_FILE" + +$PYTHON_CMD -u ping.py server --port 8000 > "$PY_LOG_FILE" 2> "$PY_ERR_LOG_FILE" & +PY_PROCESS_PID=$! + +write_color_output "[DEBUG] Python server PID: $PY_PROCESS_PID" "$BLUE" +write_color_output "[DEBUG] Python logs: $(pwd)/$PY_LOG_FILE, $(pwd)/$PY_ERR_LOG_FILE, $(pwd)/$PY_DEBUG_LOG_FILE" "$BLUE" + +TIMEOUT_SECONDS=20 +START_TIME=$(date +%s) +SERVER_STARTED=false + +while [ $(($(date +%s) - START_TIME)) -lt $TIMEOUT_SECONDS ] && [ "$SERVER_STARTED" = false ]; do + if [ -f "$PY_LOG_FILE" ]; then + if grep -q "Server started\|Listening" "$PY_LOG_FILE" 2>/dev/null; then + SERVER_STARTED=true + write_color_output "[OK] Python server started" "$GREEN" + fi + fi + if [ -f "$PY_ERR_LOG_FILE" ] && [ -s "$PY_ERR_LOG_FILE" ]; then + ERR_CONTENT=$(cat "$PY_ERR_LOG_FILE" 2>/dev/null || true) + if [ -n "$ERR_CONTENT" ]; then + write_color_output "[DEBUG] Error log: $ERR_CONTENT" "$YELLOW" + fi + fi + sleep 0.5 +done + +if [ "$SERVER_STARTED" = false ]; then + write_color_output "[ERROR] Python server failed to start" "$RED" + write_color_output "[DEBUG] Logs:" "$YELLOW" + [ -f "$PY_LOG_FILE" ] && cat "$PY_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done + [ -f "$PY_ERR_LOG_FILE" ] && cat "$PY_ERR_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done + [ -f "$PY_DEBUG_LOG_FILE" ] && cat "$PY_DEBUG_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done + write_color_output "[DEBUG] Trying foreground run..." "$YELLOW" + $PYTHON_CMD -u ping.py server --port 8000 + exit 1 +fi + +# Extract Peer ID +PEER_ID="" +MULTI_ADDR="" +if [ -f "$PY_LOG_FILE" ]; then + CONTENT=$(cat "$PY_LOG_FILE" 2>/dev/null || true) + PEER_ID=$(echo "$CONTENT" | grep -oP "Peer ID:\s*\K[A-Za-z0-9]+" || true) + if [ -n "$PEER_ID" ]; then + MULTI_ADDR="/ip4/127.0.0.1/tcp/8000/p2p/$PEER_ID" + write_color_output "[OK] Peer ID: $PEER_ID" "$CYAN" + write_color_output "[OK] MultiAddr: $MULTI_ADDR" "$CYAN" + fi +fi + +if [ -z "$PEER_ID" ]; then + write_color_output "[ERROR] Could not extract Peer ID" "$RED" + [ -f "$PY_LOG_FILE" ] && cat "$PY_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done + [ -f "$PY_ERR_LOG_FILE" ] && cat "$PY_ERR_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done + [ -f "$PY_DEBUG_LOG_FILE" ] && cat "$PY_DEBUG_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done + kill $PY_PROCESS_PID 2>/dev/null || true + exit 1 +fi + +# Start JavaScript client +write_color_output "[JAVASCRIPT] Starting client..." "$YELLOW" +cd ../js_node + +JS_LOG_FILE="test_js_client_to_py_server.log" +JS_ERR_LOG_FILE="test_js_client_to_py_server.log.err" + +rm -f "$JS_LOG_FILE" "$JS_ERR_LOG_FILE" + +node src/ping.js client "$MULTI_ADDR" 3 > "$JS_LOG_FILE" 2> "$JS_ERR_LOG_FILE" & +JS_PROCESS_PID=$! + +write_color_output "[DEBUG] JavaScript client PID: $JS_PROCESS_PID" "$BLUE" +write_color_output "[DEBUG] Client logs: $(pwd)/$JS_LOG_FILE, $(pwd)/$JS_ERR_LOG_FILE" "$BLUE" + +# Wait for client to complete +CLIENT_TIMEOUT=10 +CLIENT_START=$(date +%s) +while kill -0 $JS_PROCESS_PID 2>/dev/null && [ $(($(date +%s) - CLIENT_START)) -lt $CLIENT_TIMEOUT ]; do + sleep 1 +done + +if kill -0 $JS_PROCESS_PID 2>/dev/null; then + write_color_output "[DEBUG] JavaScript client did not exit, terminating..." "$YELLOW" + kill $JS_PROCESS_PID 2>/dev/null || true +fi + +write_color_output "[CHECK] Results..." "$CYAN" +SUCCESS=false +if [ -f "$JS_LOG_FILE" ]; then + JS_LOG_CONTENT=$(cat "$JS_LOG_FILE" 2>/dev/null || true) + if echo "$JS_LOG_CONTENT" | grep -q "successful\|Ping.*successful"; then + SUCCESS=true + write_color_output "[SUCCESS] Ping test passed" "$GREEN" + else + write_color_output "[FAILED] No successful pings" "$RED" + write_color_output "[DEBUG] Client log path: $(pwd)/$JS_LOG_FILE" "$YELLOW" + write_color_output "Client log:" "$YELLOW" + write_color_output "$JS_LOG_CONTENT" "$YELLOW" + if [ -f "$JS_ERR_LOG_FILE" ]; then + write_color_output "[DEBUG] Client error log path: $(pwd)/$JS_ERR_LOG_FILE" "$YELLOW" + write_color_output "Client error log:" "$YELLOW" + cat "$JS_ERR_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done + fi + write_color_output "[DEBUG] Python server log path: $(pwd)/../py_node/$PY_LOG_FILE" "$YELLOW" + write_color_output "Python server log:" "$YELLOW" + if [ -f "../py_node/$PY_LOG_FILE" ]; then + PY_LOG_CONTENT=$(cat "../py_node/$PY_LOG_FILE" 2>/dev/null || true) + if [ -n "$PY_LOG_CONTENT" ]; then + write_color_output "$PY_LOG_CONTENT" "$YELLOW" + else + write_color_output "Empty or inaccessible" "$YELLOW" + fi + else + write_color_output "File not found" "$YELLOW" + fi + write_color_output "[DEBUG] Python server error log path: $(pwd)/../py_node/$PY_ERR_LOG_FILE" "$YELLOW" + write_color_output "Python server error log:" "$YELLOW" + if [ -f "../py_node/$PY_ERR_LOG_FILE" ]; then + PY_ERR_LOG_CONTENT=$(cat "../py_node/$PY_ERR_LOG_FILE" 2>/dev/null || true) + if [ -n "$PY_ERR_LOG_CONTENT" ]; then + write_color_output "$PY_ERR_LOG_CONTENT" "$YELLOW" + else + write_color_output "Empty or inaccessible" "$YELLOW" + fi + else + write_color_output "File not found" "$YELLOW" + fi + write_color_output "[DEBUG] Python debug log path: $(pwd)/../py_node/$PY_DEBUG_LOG_FILE" "$YELLOW" + write_color_output "Python debug log:" "$YELLOW" + if [ -f "../py_node/$PY_DEBUG_LOG_FILE" ]; then + PY_DEBUG_LOG_CONTENT=$(cat "../py_node/$PY_DEBUG_LOG_FILE" 2>/dev/null || true) + if [ -n "$PY_DEBUG_LOG_CONTENT" ]; then + write_color_output "$PY_DEBUG_LOG_CONTENT" "$YELLOW" + else + write_color_output "Empty or inaccessible" "$YELLOW" + fi + else + write_color_output "File not found" "$YELLOW" + fi + fi +fi + +write_color_output "[CLEANUP] Stopping processes..." "$YELLOW" +kill $PY_PROCESS_PID 2>/dev/null || true +kill $JS_PROCESS_PID 2>/dev/null || true +cd ../ + +if [ "$SUCCESS" = true ]; then + write_color_output "[SUCCESS] Test completed" "$GREEN" + exit 0 +else + write_color_output "[FAILED] Test failed" "$RED" + exit 1 +fi diff --git a/tests/interop/js_libp2p/test_js_basic.py b/tests/interop/js_libp2p/test_js_basic.py deleted file mode 100644 index f59dc4cf..00000000 --- a/tests/interop/js_libp2p/test_js_basic.py +++ /dev/null @@ -1,5 +0,0 @@ -def test_js_libp2p_placeholder(): - """ - Placeholder test for js-libp2p interop tests. - """ - assert True, "Placeholder test for js-libp2p interop tests"