mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
Merge branch 'main' into py-multiaddr
This commit is contained in:
@ -8,9 +8,10 @@ import trio
|
||||
from libp2p import (
|
||||
new_host,
|
||||
)
|
||||
from libp2p.identity.identify.identify import ID as IDENTIFY_PROTOCOL_ID
|
||||
from libp2p.identity.identify.pb.identify_pb2 import (
|
||||
Identify,
|
||||
from libp2p.identity.identify.identify import (
|
||||
ID as IDENTIFY_PROTOCOL_ID,
|
||||
identify_handler_for,
|
||||
parse_identify_response,
|
||||
)
|
||||
from libp2p.peer.peerinfo import (
|
||||
info_from_p2p_addr,
|
||||
@ -50,7 +51,7 @@ def print_identify_response(identify_response):
|
||||
)
|
||||
|
||||
|
||||
async def run(port: int, destination: str) -> None:
|
||||
async def run(port: int, destination: str, use_varint_format: bool = True) -> None:
|
||||
localhost_ip = "0.0.0.0"
|
||||
|
||||
if not destination:
|
||||
@ -58,11 +59,24 @@ async def run(port: int, destination: str) -> None:
|
||||
listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}")
|
||||
host_a = new_host()
|
||||
|
||||
# Set up identify handler with specified format
|
||||
identify_handler = identify_handler_for(
|
||||
host_a, use_varint_format=use_varint_format
|
||||
)
|
||||
host_a.set_stream_handler(IDENTIFY_PROTOCOL_ID, identify_handler)
|
||||
|
||||
async with host_a.run(listen_addrs=[listen_addr]):
|
||||
# Get the actual address and replace 0.0.0.0 with 127.0.0.1 for client
|
||||
# connections
|
||||
server_addr = str(host_a.get_addrs()[0])
|
||||
client_addr = server_addr.replace("/ip4/0.0.0.0/", "/ip4/127.0.0.1/")
|
||||
|
||||
format_name = "length-prefixed" if use_varint_format else "raw protobuf"
|
||||
print(
|
||||
"First host listening. Run this from another console:\n\n"
|
||||
f"First host listening (using {format_name} format). "
|
||||
f"Run this from another console:\n\n"
|
||||
f"identify-demo "
|
||||
f"-d {host_a.get_addrs()[0]}\n"
|
||||
f"-d {client_addr}\n"
|
||||
)
|
||||
print("Waiting for incoming identify request...")
|
||||
await trio.sleep_forever()
|
||||
@ -84,11 +98,18 @@ async def run(port: int, destination: str) -> None:
|
||||
|
||||
try:
|
||||
print("Starting identify protocol...")
|
||||
response = await stream.read()
|
||||
|
||||
# Read the complete response (could be either format)
|
||||
# Read a larger chunk to get all the data before stream closes
|
||||
response = await stream.read(8192) # Read enough data in one go
|
||||
|
||||
await stream.close()
|
||||
identify_msg = Identify()
|
||||
identify_msg.ParseFromString(response)
|
||||
|
||||
# Parse the response using the robust protocol-level function
|
||||
# This handles both old and new formats automatically
|
||||
identify_msg = parse_identify_response(response)
|
||||
print_identify_response(identify_msg)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Identify protocol error: {e}")
|
||||
|
||||
@ -98,9 +119,12 @@ async def run(port: int, destination: str) -> None:
|
||||
def main() -> None:
|
||||
description = """
|
||||
This program demonstrates the libp2p identify protocol.
|
||||
First run identify-demo -p <PORT>' to start a listener.
|
||||
First run 'identify-demo -p <PORT> [--raw-format]' to start a listener.
|
||||
Then run 'identify-demo <ANOTHER_PORT> -d <DESTINATION>'
|
||||
where <DESTINATION> is the multiaddress shown by the listener.
|
||||
|
||||
Use --raw-format to send raw protobuf messages (old format) instead of
|
||||
length-prefixed protobuf messages (new format, default).
|
||||
"""
|
||||
|
||||
example_maddr = (
|
||||
@ -115,10 +139,22 @@ def main() -> None:
|
||||
type=str,
|
||||
help=f"destination multiaddr string, e.g. {example_maddr}",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--raw-format",
|
||||
action="store_true",
|
||||
help=(
|
||||
"use raw protobuf format (old format) instead of "
|
||||
"length-prefixed (new format)"
|
||||
),
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
# Determine format: raw format if --raw-format is specified, otherwise
|
||||
# length-prefixed
|
||||
use_varint_format = not args.raw_format
|
||||
|
||||
try:
|
||||
trio.run(run, *(args.port, args.destination))
|
||||
trio.run(run, *(args.port, args.destination, use_varint_format))
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
|
||||
@ -57,18 +57,56 @@ from libp2p.peer.peerinfo import (
|
||||
logger = logging.getLogger("libp2p.identity.identify-push-example")
|
||||
|
||||
|
||||
def custom_identify_push_handler_for(host):
|
||||
def custom_identify_push_handler_for(host, use_varint_format: bool = True):
|
||||
"""
|
||||
Create a custom handler for the identify/push protocol that logs and prints
|
||||
the identity information received from the dialer.
|
||||
|
||||
Args:
|
||||
host: The libp2p host
|
||||
use_varint_format: If True, expect length-prefixed format; if False, expect
|
||||
raw protobuf
|
||||
|
||||
"""
|
||||
|
||||
async def handle_identify_push(stream: INetStream) -> None:
|
||||
peer_id = stream.muxed_conn.peer_id
|
||||
|
||||
try:
|
||||
# Read the identify message from the stream
|
||||
data = await stream.read()
|
||||
if use_varint_format:
|
||||
# Read length-prefixed identify message from the stream
|
||||
from libp2p.utils.varint import decode_varint_from_bytes
|
||||
|
||||
# First read the varint length prefix
|
||||
length_bytes = b""
|
||||
while True:
|
||||
b = await stream.read(1)
|
||||
if not b:
|
||||
break
|
||||
length_bytes += b
|
||||
if b[0] & 0x80 == 0:
|
||||
break
|
||||
|
||||
if not length_bytes:
|
||||
logger.warning("No length prefix received from peer %s", peer_id)
|
||||
return
|
||||
|
||||
msg_length = decode_varint_from_bytes(length_bytes)
|
||||
|
||||
# Read the protobuf message
|
||||
data = await stream.read(msg_length)
|
||||
if len(data) != msg_length:
|
||||
logger.warning("Incomplete message received from peer %s", peer_id)
|
||||
return
|
||||
else:
|
||||
# Read raw protobuf message from the stream
|
||||
data = b""
|
||||
while True:
|
||||
chunk = await stream.read(4096)
|
||||
if not chunk:
|
||||
break
|
||||
data += chunk
|
||||
|
||||
identify_msg = Identify()
|
||||
identify_msg.ParseFromString(data)
|
||||
|
||||
@ -129,9 +167,13 @@ def custom_identify_push_handler_for(host):
|
||||
return handle_identify_push
|
||||
|
||||
|
||||
async def run_listener(port: int) -> None:
|
||||
async def run_listener(port: int, use_varint_format: bool = True) -> None:
|
||||
"""Run a host in listener mode."""
|
||||
print(f"\n==== Starting Identify-Push Listener on port {port} ====\n")
|
||||
format_name = "length-prefixed" if use_varint_format else "raw protobuf"
|
||||
print(
|
||||
f"\n==== Starting Identify-Push Listener on port {port} "
|
||||
f"(using {format_name} format) ====\n"
|
||||
)
|
||||
|
||||
# Create key pair for the listener
|
||||
key_pair = create_new_key_pair()
|
||||
@ -139,9 +181,14 @@ async def run_listener(port: int) -> None:
|
||||
# Create the listener host
|
||||
host = new_host(key_pair=key_pair)
|
||||
|
||||
# Set up the identify and identify/push handlers
|
||||
host.set_stream_handler(ID_IDENTIFY, identify_handler_for(host))
|
||||
host.set_stream_handler(ID_IDENTIFY_PUSH, custom_identify_push_handler_for(host))
|
||||
# Set up the identify and identify/push handlers with specified format
|
||||
host.set_stream_handler(
|
||||
ID_IDENTIFY, identify_handler_for(host, use_varint_format=use_varint_format)
|
||||
)
|
||||
host.set_stream_handler(
|
||||
ID_IDENTIFY_PUSH,
|
||||
identify_push_handler_for(host, use_varint_format=use_varint_format),
|
||||
)
|
||||
|
||||
# Start listening
|
||||
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
|
||||
@ -165,9 +212,15 @@ async def run_listener(port: int) -> None:
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
async def run_dialer(port: int, destination: str) -> None:
|
||||
async def run_dialer(
|
||||
port: int, destination: str, use_varint_format: bool = True
|
||||
) -> None:
|
||||
"""Run a host in dialer mode that connects to a listener."""
|
||||
print(f"\n==== Starting Identify-Push Dialer on port {port} ====\n")
|
||||
format_name = "length-prefixed" if use_varint_format else "raw protobuf"
|
||||
print(
|
||||
f"\n==== Starting Identify-Push Dialer on port {port} "
|
||||
f"(using {format_name} format) ====\n"
|
||||
)
|
||||
|
||||
# Create key pair for the dialer
|
||||
key_pair = create_new_key_pair()
|
||||
@ -175,9 +228,14 @@ async def run_dialer(port: int, destination: str) -> None:
|
||||
# Create the dialer host
|
||||
host = new_host(key_pair=key_pair)
|
||||
|
||||
# Set up the identify and identify/push handlers
|
||||
host.set_stream_handler(ID_IDENTIFY, identify_handler_for(host))
|
||||
host.set_stream_handler(ID_IDENTIFY_PUSH, identify_push_handler_for(host))
|
||||
# Set up the identify and identify/push handlers with specified format
|
||||
host.set_stream_handler(
|
||||
ID_IDENTIFY, identify_handler_for(host, use_varint_format=use_varint_format)
|
||||
)
|
||||
host.set_stream_handler(
|
||||
ID_IDENTIFY_PUSH,
|
||||
identify_push_handler_for(host, use_varint_format=use_varint_format),
|
||||
)
|
||||
|
||||
# Start listening on a different port
|
||||
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
|
||||
@ -206,7 +264,9 @@ async def run_dialer(port: int, destination: str) -> None:
|
||||
|
||||
try:
|
||||
# Call push_identify_to_peer which returns a boolean
|
||||
success = await push_identify_to_peer(host, peer_info.peer_id)
|
||||
success = await push_identify_to_peer(
|
||||
host, peer_info.peer_id, use_varint_format=use_varint_format
|
||||
)
|
||||
|
||||
if success:
|
||||
logger.info("Identify push completed successfully!")
|
||||
@ -240,11 +300,10 @@ def main() -> None:
|
||||
This program demonstrates the libp2p identify/push protocol.
|
||||
Without arguments, it runs as a listener on random port.
|
||||
With -d parameter, it runs as a dialer on random port.
|
||||
"""
|
||||
|
||||
example = (
|
||||
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
|
||||
)
|
||||
Use --raw-format to send raw protobuf messages (old format) instead of
|
||||
length-prefixed protobuf messages (new format, default).
|
||||
"""
|
||||
|
||||
parser = argparse.ArgumentParser(description=description)
|
||||
parser.add_argument("-p", "--port", default=0, type=int, help="source port number")
|
||||
@ -252,17 +311,29 @@ def main() -> None:
|
||||
"-d",
|
||||
"--destination",
|
||||
type=str,
|
||||
help=f"destination multiaddr string, e.g. {example}",
|
||||
help="destination multiaddr string",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--raw-format",
|
||||
action="store_true",
|
||||
help=(
|
||||
"use raw protobuf format (old format) instead of "
|
||||
"length-prefixed (new format)"
|
||||
),
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
# Determine format: raw format if --raw-format is specified, otherwise
|
||||
# length-prefixed
|
||||
use_varint_format = not args.raw_format
|
||||
|
||||
try:
|
||||
if args.destination:
|
||||
# Run in dialer mode with random available port if not specified
|
||||
trio.run(run_dialer, args.port, args.destination)
|
||||
trio.run(run_dialer, args.port, args.destination, use_varint_format)
|
||||
else:
|
||||
# Run in listener mode with random available port if not specified
|
||||
trio.run(run_listener, args.port)
|
||||
trio.run(run_listener, args.port, use_varint_format)
|
||||
except KeyboardInterrupt:
|
||||
print("\nInterrupted by user")
|
||||
logger.info("Interrupted by user")
|
||||
|
||||
@ -26,5 +26,8 @@ if TYPE_CHECKING:
|
||||
|
||||
def get_default_protocols(host: IHost) -> "OrderedDict[TProtocol, StreamHandlerFn]":
|
||||
return OrderedDict(
|
||||
((IdentifyID, identify_handler_for(host)), (PingID, handle_ping))
|
||||
(
|
||||
(IdentifyID, identify_handler_for(host, use_varint_format=True)),
|
||||
(PingID, handle_ping),
|
||||
)
|
||||
)
|
||||
|
||||
@ -16,7 +16,9 @@ from libp2p.network.stream.exceptions import (
|
||||
StreamClosed,
|
||||
)
|
||||
from libp2p.utils import (
|
||||
decode_varint_with_size,
|
||||
get_agent_version,
|
||||
varint,
|
||||
)
|
||||
|
||||
from .pb.identify_pb2 import (
|
||||
@ -72,7 +74,47 @@ def _mk_identify_protobuf(
|
||||
)
|
||||
|
||||
|
||||
def identify_handler_for(host: IHost) -> StreamHandlerFn:
|
||||
def parse_identify_response(response: bytes) -> Identify:
|
||||
"""
|
||||
Parse identify response that could be either:
|
||||
- Old format: raw protobuf
|
||||
- New format: length-prefixed protobuf
|
||||
|
||||
This function provides backward and forward compatibility.
|
||||
"""
|
||||
# Try new format first: length-prefixed protobuf
|
||||
if len(response) >= 1:
|
||||
length, varint_size = decode_varint_with_size(response)
|
||||
if varint_size > 0 and length > 0 and varint_size + length <= len(response):
|
||||
protobuf_data = response[varint_size : varint_size + length]
|
||||
try:
|
||||
identify_response = Identify()
|
||||
identify_response.ParseFromString(protobuf_data)
|
||||
# Sanity check: must have agent_version (protocol_version is optional)
|
||||
if identify_response.agent_version:
|
||||
logger.debug(
|
||||
"Parsed length-prefixed identify response (new format)"
|
||||
)
|
||||
return identify_response
|
||||
except Exception:
|
||||
pass # Fall through to old format
|
||||
|
||||
# Fall back to old format: raw protobuf
|
||||
try:
|
||||
identify_response = Identify()
|
||||
identify_response.ParseFromString(response)
|
||||
logger.debug("Parsed raw protobuf identify response (old format)")
|
||||
return identify_response
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse identify response: {e}")
|
||||
logger.error(f"Response length: {len(response)}")
|
||||
logger.error(f"Response hex: {response.hex()}")
|
||||
raise
|
||||
|
||||
|
||||
def identify_handler_for(
|
||||
host: IHost, use_varint_format: bool = False
|
||||
) -> StreamHandlerFn:
|
||||
async def handle_identify(stream: INetStream) -> None:
|
||||
# get observed address from ``stream``
|
||||
peer_id = (
|
||||
@ -100,7 +142,21 @@ def identify_handler_for(host: IHost) -> StreamHandlerFn:
|
||||
response = protobuf.SerializeToString()
|
||||
|
||||
try:
|
||||
await stream.write(response)
|
||||
if use_varint_format:
|
||||
# Send length-prefixed protobuf message (new format)
|
||||
await stream.write(varint.encode_uvarint(len(response)))
|
||||
await stream.write(response)
|
||||
logger.debug(
|
||||
"Sent new format (length-prefixed) identify response to %s",
|
||||
peer_id,
|
||||
)
|
||||
else:
|
||||
# Send raw protobuf message (old format for backward compatibility)
|
||||
await stream.write(response)
|
||||
logger.debug(
|
||||
"Sent old format (raw protobuf) identify response to %s",
|
||||
peer_id,
|
||||
)
|
||||
except StreamClosed:
|
||||
logger.debug("Fail to respond to %s request: stream closed", ID)
|
||||
else:
|
||||
|
||||
@ -25,6 +25,10 @@ from libp2p.peer.id import (
|
||||
)
|
||||
from libp2p.utils import (
|
||||
get_agent_version,
|
||||
varint,
|
||||
)
|
||||
from libp2p.utils.varint import (
|
||||
decode_varint_from_bytes,
|
||||
)
|
||||
|
||||
from ..identify.identify import (
|
||||
@ -43,20 +47,69 @@ AGENT_VERSION = get_agent_version()
|
||||
CONCURRENCY_LIMIT = 10
|
||||
|
||||
|
||||
def identify_push_handler_for(host: IHost) -> StreamHandlerFn:
|
||||
def identify_push_handler_for(
|
||||
host: IHost, use_varint_format: bool = True
|
||||
) -> StreamHandlerFn:
|
||||
"""
|
||||
Create a handler for the identify/push protocol.
|
||||
|
||||
This handler receives pushed identify messages from remote peers and updates
|
||||
the local peerstore with the new information.
|
||||
|
||||
Args:
|
||||
host: The libp2p host.
|
||||
use_varint_format: True=length-prefixed, False=raw protobuf.
|
||||
|
||||
"""
|
||||
|
||||
async def handle_identify_push(stream: INetStream) -> None:
|
||||
peer_id = stream.muxed_conn.peer_id
|
||||
|
||||
try:
|
||||
# Read the identify message from the stream
|
||||
data = await stream.read()
|
||||
if use_varint_format:
|
||||
# Read length-prefixed identify message from the stream
|
||||
# First read the varint length prefix
|
||||
length_bytes = b""
|
||||
while True:
|
||||
b = await stream.read(1)
|
||||
if not b:
|
||||
break
|
||||
length_bytes += b
|
||||
if b[0] & 0x80 == 0:
|
||||
break
|
||||
|
||||
if not length_bytes:
|
||||
logger.warning("No length prefix received from peer %s", peer_id)
|
||||
return
|
||||
|
||||
msg_length = decode_varint_from_bytes(length_bytes)
|
||||
|
||||
# Read the protobuf message
|
||||
data = await stream.read(msg_length)
|
||||
if len(data) != msg_length:
|
||||
logger.warning("Incomplete message received from peer %s", peer_id)
|
||||
return
|
||||
else:
|
||||
# Read raw protobuf message from the stream
|
||||
# For raw format, we need to read all data before the stream is closed
|
||||
data = b""
|
||||
try:
|
||||
# Read all available data in a single operation
|
||||
data = await stream.read()
|
||||
except StreamClosed:
|
||||
# Try to read any remaining data
|
||||
try:
|
||||
data = await stream.read()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# If we got no data, log a warning and return
|
||||
if not data:
|
||||
logger.warning(
|
||||
"No data received in raw format from peer %s", peer_id
|
||||
)
|
||||
return
|
||||
|
||||
identify_msg = Identify()
|
||||
identify_msg.ParseFromString(data)
|
||||
|
||||
@ -137,6 +190,7 @@ async def push_identify_to_peer(
|
||||
peer_id: ID,
|
||||
observed_multiaddr: Multiaddr | None = None,
|
||||
limit: trio.Semaphore = trio.Semaphore(CONCURRENCY_LIMIT),
|
||||
use_varint_format: bool = True,
|
||||
) -> bool:
|
||||
"""
|
||||
Push an identify message to a specific peer.
|
||||
@ -144,10 +198,15 @@ async def push_identify_to_peer(
|
||||
This function opens a stream to the peer using the identify/push protocol,
|
||||
sends the identify message, and closes the stream.
|
||||
|
||||
Returns
|
||||
-------
|
||||
bool
|
||||
True if the push was successful, False otherwise.
|
||||
Args:
|
||||
host: The libp2p host.
|
||||
peer_id: The peer ID to push to.
|
||||
observed_multiaddr: The observed multiaddress (optional).
|
||||
limit: Semaphore for concurrency control.
|
||||
use_varint_format: True=length-prefixed, False=raw protobuf.
|
||||
|
||||
Returns:
|
||||
bool: True if the push was successful, False otherwise.
|
||||
|
||||
"""
|
||||
async with limit:
|
||||
@ -159,8 +218,13 @@ async def push_identify_to_peer(
|
||||
identify_msg = _mk_identify_protobuf(host, observed_multiaddr)
|
||||
response = identify_msg.SerializeToString()
|
||||
|
||||
# Send the identify message
|
||||
await stream.write(response)
|
||||
if use_varint_format:
|
||||
# Send length-prefixed identify message
|
||||
await stream.write(varint.encode_uvarint(len(response)))
|
||||
await stream.write(response)
|
||||
else:
|
||||
# Send raw protobuf message
|
||||
await stream.write(response)
|
||||
|
||||
# Close the stream
|
||||
await stream.close()
|
||||
@ -176,18 +240,36 @@ async def push_identify_to_peers(
|
||||
host: IHost,
|
||||
peer_ids: set[ID] | None = None,
|
||||
observed_multiaddr: Multiaddr | None = None,
|
||||
use_varint_format: bool = True,
|
||||
) -> None:
|
||||
"""
|
||||
Push an identify message to multiple peers in parallel.
|
||||
|
||||
If peer_ids is None, push to all connected peers.
|
||||
|
||||
Args:
|
||||
host: The libp2p host.
|
||||
peer_ids: Set of peer IDs to push to (if None, push to all connected peers).
|
||||
observed_multiaddr: The observed multiaddress (optional).
|
||||
use_varint_format: True=length-prefixed, False=raw protobuf.
|
||||
|
||||
"""
|
||||
if peer_ids is None:
|
||||
# Get all connected peers
|
||||
peer_ids = set(host.get_connected_peers())
|
||||
|
||||
# Create a single shared semaphore for concurrency control
|
||||
limit = trio.Semaphore(CONCURRENCY_LIMIT)
|
||||
|
||||
# Push to each peer in parallel using a trio.Nursery
|
||||
# limiting concurrent connections to 10
|
||||
# limiting concurrent connections to CONCURRENCY_LIMIT
|
||||
async with trio.open_nursery() as nursery:
|
||||
for peer_id in peer_ids:
|
||||
nursery.start_soon(push_identify_to_peer, host, peer_id, observed_multiaddr)
|
||||
nursery.start_soon(
|
||||
push_identify_to_peer,
|
||||
host,
|
||||
peer_id,
|
||||
observed_multiaddr,
|
||||
limit,
|
||||
use_varint_format,
|
||||
)
|
||||
|
||||
@ -7,6 +7,8 @@ from libp2p.utils.varint import (
|
||||
encode_varint_prefixed,
|
||||
read_delim,
|
||||
read_varint_prefixed_bytes,
|
||||
decode_varint_from_bytes,
|
||||
decode_varint_with_size,
|
||||
)
|
||||
from libp2p.utils.version import (
|
||||
get_agent_version,
|
||||
@ -20,4 +22,6 @@ __all__ = [
|
||||
"get_agent_version",
|
||||
"read_delim",
|
||||
"read_varint_prefixed_bytes",
|
||||
"decode_varint_from_bytes",
|
||||
"decode_varint_with_size",
|
||||
]
|
||||
|
||||
@ -39,12 +39,38 @@ def encode_uvarint(number: int) -> bytes:
|
||||
return buf
|
||||
|
||||
|
||||
def decode_varint_from_bytes(data: bytes) -> int:
|
||||
"""
|
||||
Decode a varint from bytes and return the value.
|
||||
|
||||
This is a synchronous version of decode_uvarint_from_stream for already-read bytes.
|
||||
"""
|
||||
res = 0
|
||||
for shift in itertools.count(0, 7):
|
||||
if shift > SHIFT_64_BIT_MAX:
|
||||
raise ParseError("Integer is too large...")
|
||||
|
||||
if not data:
|
||||
raise ParseError("Unexpected end of data")
|
||||
|
||||
value = data[0]
|
||||
data = data[1:]
|
||||
|
||||
res += (value & LOW_MASK) << shift
|
||||
|
||||
if not value & HIGH_MASK:
|
||||
break
|
||||
return res
|
||||
|
||||
|
||||
async def decode_uvarint_from_stream(reader: Reader) -> int:
|
||||
"""https://en.wikipedia.org/wiki/LEB128."""
|
||||
res = 0
|
||||
for shift in itertools.count(0, 7):
|
||||
if shift > SHIFT_64_BIT_MAX:
|
||||
raise ParseError("TODO: better exception msg: Integer is too large...")
|
||||
raise ParseError(
|
||||
"Varint decoding error: integer exceeds maximum size of 64 bits."
|
||||
)
|
||||
|
||||
byte = await read_exactly(reader, 1)
|
||||
value = byte[0]
|
||||
@ -56,6 +82,33 @@ async def decode_uvarint_from_stream(reader: Reader) -> int:
|
||||
return res
|
||||
|
||||
|
||||
def decode_varint_with_size(data: bytes) -> tuple[int, int]:
|
||||
"""
|
||||
Decode a varint from bytes and return (value, bytes_consumed).
|
||||
Returns (0, 0) if the data doesn't start with a valid varint.
|
||||
"""
|
||||
try:
|
||||
# Calculate how many bytes the varint consumes
|
||||
varint_size = 0
|
||||
for i, byte in enumerate(data):
|
||||
varint_size += 1
|
||||
if (byte & 0x80) == 0:
|
||||
break
|
||||
|
||||
if varint_size == 0:
|
||||
return 0, 0
|
||||
|
||||
# Extract just the varint bytes
|
||||
varint_bytes = data[:varint_size]
|
||||
|
||||
# Decode the varint
|
||||
value = decode_varint_from_bytes(varint_bytes)
|
||||
|
||||
return value, varint_size
|
||||
except Exception:
|
||||
return 0, 0
|
||||
|
||||
|
||||
def encode_varint_prefixed(msg_bytes: bytes) -> bytes:
|
||||
varint_len = encode_uvarint(len(msg_bytes))
|
||||
return varint_len + msg_bytes
|
||||
|
||||
1
newsfragments/760.docs.rst
Normal file
1
newsfragments/760.docs.rst
Normal file
@ -0,0 +1 @@
|
||||
Improve error message under the function decode_uvarint_from_stream in libp2p/utils/varint.py file
|
||||
1
newsfragments/761.breaking.rst
Normal file
1
newsfragments/761.breaking.rst
Normal file
@ -0,0 +1 @@
|
||||
identify protocol use now prefix-length messages by default. use use_varint_format param for old raw messages
|
||||
1
newsfragments/761.feature.rst
Normal file
1
newsfragments/761.feature.rst
Normal file
@ -0,0 +1 @@
|
||||
add length-prefixed support to identify protocol
|
||||
1
newsfragments/761.internal.rst
Normal file
1
newsfragments/761.internal.rst
Normal file
@ -0,0 +1 @@
|
||||
Fix raw format reading in identify/push protocol and add comprehensive test coverage for both varint and raw formats
|
||||
@ -11,9 +11,7 @@ from libp2p.identity.identify.identify import (
|
||||
PROTOCOL_VERSION,
|
||||
_mk_identify_protobuf,
|
||||
_multiaddr_to_bytes,
|
||||
)
|
||||
from libp2p.identity.identify.pb.identify_pb2 import (
|
||||
Identify,
|
||||
parse_identify_response,
|
||||
)
|
||||
from tests.utils.factories import (
|
||||
host_pair_factory,
|
||||
@ -29,14 +27,18 @@ async def test_identify_protocol(security_protocol):
|
||||
host_b,
|
||||
):
|
||||
# Here, host_b is the requester and host_a is the responder.
|
||||
# observed_addr represent host_b’s address as observed by host_a
|
||||
# (i.e., the address from which host_b’s request was received).
|
||||
# observed_addr represent host_b's address as observed by host_a
|
||||
# (i.e., the address from which host_b's request was received).
|
||||
stream = await host_b.new_stream(host_a.get_id(), (ID,))
|
||||
response = await stream.read()
|
||||
|
||||
# Read the response (could be either format)
|
||||
# Read a larger chunk to get all the data before stream closes
|
||||
response = await stream.read(8192) # Read enough data in one go
|
||||
|
||||
await stream.close()
|
||||
|
||||
identify_response = Identify()
|
||||
identify_response.ParseFromString(response)
|
||||
# Parse the response (handles both old and new formats)
|
||||
identify_response = parse_identify_response(response)
|
||||
|
||||
logger.debug("host_a: %s", host_a.get_addrs())
|
||||
logger.debug("host_b: %s", host_b.get_addrs())
|
||||
@ -62,8 +64,9 @@ async def test_identify_protocol(security_protocol):
|
||||
|
||||
logger.debug("observed_addr: %s", Multiaddr(identify_response.observed_addr))
|
||||
logger.debug("host_b.get_addrs()[0]: %s", host_b.get_addrs()[0])
|
||||
logger.debug("cleaned_addr= %s", cleaned_addr)
|
||||
assert identify_response.observed_addr == _multiaddr_to_bytes(cleaned_addr)
|
||||
|
||||
# The observed address should match the cleaned address
|
||||
assert Multiaddr(identify_response.observed_addr) == cleaned_addr
|
||||
|
||||
# Check protocols
|
||||
assert set(identify_response.protocols) == set(host_a.get_mux().get_protocols())
|
||||
|
||||
410
tests/core/identity/identify/test_identify_parsing.py
Normal file
410
tests/core/identity/identify/test_identify_parsing.py
Normal file
@ -0,0 +1,410 @@
|
||||
import pytest
|
||||
|
||||
from libp2p.identity.identify.identify import (
|
||||
_mk_identify_protobuf,
|
||||
)
|
||||
from libp2p.identity.identify.pb.identify_pb2 import (
|
||||
Identify,
|
||||
)
|
||||
from libp2p.io.abc import Closer, Reader, Writer
|
||||
from libp2p.utils.varint import (
|
||||
decode_varint_from_bytes,
|
||||
encode_varint_prefixed,
|
||||
)
|
||||
from tests.utils.factories import (
|
||||
host_pair_factory,
|
||||
)
|
||||
|
||||
|
||||
class MockStream(Reader, Writer, Closer):
|
||||
"""Mock stream for testing identify protocol compatibility."""
|
||||
|
||||
def __init__(self, data: bytes):
|
||||
self.data = data
|
||||
self.position = 0
|
||||
self.closed = False
|
||||
|
||||
async def read(self, n: int | None = None) -> bytes:
|
||||
if self.closed or self.position >= len(self.data):
|
||||
return b""
|
||||
if n is None:
|
||||
n = len(self.data) - self.position
|
||||
result = self.data[self.position : self.position + n]
|
||||
self.position += len(result)
|
||||
return result
|
||||
|
||||
async def write(self, data: bytes) -> None:
|
||||
# Mock write - just store the data
|
||||
pass
|
||||
|
||||
async def close(self) -> None:
|
||||
self.closed = True
|
||||
|
||||
|
||||
def create_identify_message(host, observed_multiaddr=None):
|
||||
"""Create an identify protobuf message."""
|
||||
return _mk_identify_protobuf(host, observed_multiaddr)
|
||||
|
||||
|
||||
def create_new_format_message(identify_msg):
|
||||
"""Create a new format (length-prefixed) identify message."""
|
||||
msg_bytes = identify_msg.SerializeToString()
|
||||
return encode_varint_prefixed(msg_bytes)
|
||||
|
||||
|
||||
def create_old_format_message(identify_msg):
|
||||
"""Create an old format (raw protobuf) identify message."""
|
||||
return identify_msg.SerializeToString()
|
||||
|
||||
|
||||
async def read_new_format_message(stream) -> bytes:
|
||||
"""Read a new format (length-prefixed) identify message."""
|
||||
# Read varint length prefix
|
||||
length_bytes = b""
|
||||
while True:
|
||||
b = await stream.read(1)
|
||||
if not b:
|
||||
break
|
||||
length_bytes += b
|
||||
if b[0] & 0x80 == 0:
|
||||
break
|
||||
|
||||
if not length_bytes:
|
||||
raise ValueError("No length prefix received")
|
||||
|
||||
msg_length = decode_varint_from_bytes(length_bytes)
|
||||
|
||||
# Read the protobuf message
|
||||
response = await stream.read(msg_length)
|
||||
if len(response) != msg_length:
|
||||
raise ValueError("Incomplete message received")
|
||||
|
||||
return response
|
||||
|
||||
|
||||
async def read_old_format_message(stream) -> bytes:
|
||||
"""Read an old format (raw protobuf) identify message."""
|
||||
# Read all available data
|
||||
response = b""
|
||||
while True:
|
||||
chunk = await stream.read(4096)
|
||||
if not chunk:
|
||||
break
|
||||
response += chunk
|
||||
|
||||
return response
|
||||
|
||||
|
||||
async def read_compatible_message(stream) -> bytes:
|
||||
"""Read an identify message in either old or new format."""
|
||||
# Try to read a few bytes to detect the format
|
||||
first_bytes = await stream.read(10)
|
||||
if not first_bytes:
|
||||
raise ValueError("No data received")
|
||||
|
||||
# Try to decode as varint length prefix (new format)
|
||||
try:
|
||||
msg_length = decode_varint_from_bytes(first_bytes)
|
||||
|
||||
# Validate that the length is reasonable (not too large)
|
||||
if msg_length > 0 and msg_length <= 1024 * 1024: # Max 1MB
|
||||
# Calculate how many bytes the varint consumed
|
||||
varint_len = 0
|
||||
for i, byte in enumerate(first_bytes):
|
||||
varint_len += 1
|
||||
if (byte & 0x80) == 0:
|
||||
break
|
||||
|
||||
# Read the remaining protobuf message
|
||||
remaining_bytes = await stream.read(
|
||||
msg_length - (len(first_bytes) - varint_len)
|
||||
)
|
||||
if len(remaining_bytes) == msg_length - (len(first_bytes) - varint_len):
|
||||
message_data = first_bytes[varint_len:] + remaining_bytes
|
||||
|
||||
# Try to parse as protobuf to validate
|
||||
try:
|
||||
Identify().ParseFromString(message_data)
|
||||
return message_data
|
||||
except Exception:
|
||||
# If protobuf parsing fails, fall back to old format
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Fall back to old format (raw protobuf)
|
||||
response = first_bytes
|
||||
|
||||
# Read more data if available
|
||||
while True:
|
||||
chunk = await stream.read(4096)
|
||||
if not chunk:
|
||||
break
|
||||
response += chunk
|
||||
|
||||
return response
|
||||
|
||||
|
||||
async def read_compatible_message_simple(stream) -> bytes:
|
||||
"""Read a message in either old or new format (simplified version for testing)."""
|
||||
# Try to read a few bytes to detect the format
|
||||
first_bytes = await stream.read(10)
|
||||
if not first_bytes:
|
||||
raise ValueError("No data received")
|
||||
|
||||
# Try to decode as varint length prefix (new format)
|
||||
try:
|
||||
msg_length = decode_varint_from_bytes(first_bytes)
|
||||
|
||||
# Validate that the length is reasonable (not too large)
|
||||
if msg_length > 0 and msg_length <= 1024 * 1024: # Max 1MB
|
||||
# Calculate how many bytes the varint consumed
|
||||
varint_len = 0
|
||||
for i, byte in enumerate(first_bytes):
|
||||
varint_len += 1
|
||||
if (byte & 0x80) == 0:
|
||||
break
|
||||
|
||||
# Read the remaining message
|
||||
remaining_bytes = await stream.read(
|
||||
msg_length - (len(first_bytes) - varint_len)
|
||||
)
|
||||
if len(remaining_bytes) == msg_length - (len(first_bytes) - varint_len):
|
||||
return first_bytes[varint_len:] + remaining_bytes
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Fall back to old format (raw data)
|
||||
response = first_bytes
|
||||
|
||||
# Read more data if available
|
||||
while True:
|
||||
chunk = await stream.read(4096)
|
||||
if not chunk:
|
||||
break
|
||||
response += chunk
|
||||
|
||||
return response
|
||||
|
||||
|
||||
def detect_format(data):
|
||||
"""Detect if data is in new or old format (varint-prefixed or raw protobuf)."""
|
||||
if not data:
|
||||
return "unknown"
|
||||
|
||||
# Try to decode as varint
|
||||
try:
|
||||
msg_length = decode_varint_from_bytes(data)
|
||||
|
||||
# Validate that the length is reasonable
|
||||
if msg_length > 0 and msg_length <= 1024 * 1024: # Max 1MB
|
||||
# Calculate varint length
|
||||
varint_len = 0
|
||||
for i, byte in enumerate(data):
|
||||
varint_len += 1
|
||||
if (byte & 0x80) == 0:
|
||||
break
|
||||
|
||||
# Check if we have enough data for the message
|
||||
if len(data) >= varint_len + msg_length:
|
||||
# Additional check: try to parse the message as protobuf
|
||||
try:
|
||||
message_data = data[varint_len : varint_len + msg_length]
|
||||
Identify().ParseFromString(message_data)
|
||||
return "new"
|
||||
except Exception:
|
||||
# If protobuf parsing fails, it's probably not a valid new format
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# If varint decoding fails or length is unreasonable, assume old format
|
||||
return "old"
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_identify_new_format_compatibility(security_protocol):
|
||||
"""Test that identify protocol works with new format (length-prefixed) messages."""
|
||||
async with host_pair_factory(security_protocol=security_protocol) as (
|
||||
host_a,
|
||||
host_b,
|
||||
):
|
||||
# Create identify message
|
||||
identify_msg = create_identify_message(host_a)
|
||||
|
||||
# Create new format message
|
||||
new_format_data = create_new_format_message(identify_msg)
|
||||
|
||||
# Create mock stream with new format data
|
||||
stream = MockStream(new_format_data)
|
||||
|
||||
# Read using new format reader
|
||||
response = await read_new_format_message(stream)
|
||||
|
||||
# Parse the response
|
||||
parsed_msg = Identify()
|
||||
parsed_msg.ParseFromString(response)
|
||||
|
||||
# Verify the message content
|
||||
assert parsed_msg.protocol_version == identify_msg.protocol_version
|
||||
assert parsed_msg.agent_version == identify_msg.agent_version
|
||||
assert parsed_msg.public_key == identify_msg.public_key
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_identify_old_format_compatibility(security_protocol):
|
||||
"""Test that identify protocol works with old format (raw protobuf) messages."""
|
||||
async with host_pair_factory(security_protocol=security_protocol) as (
|
||||
host_a,
|
||||
host_b,
|
||||
):
|
||||
# Create identify message
|
||||
identify_msg = create_identify_message(host_a)
|
||||
|
||||
# Create old format message
|
||||
old_format_data = create_old_format_message(identify_msg)
|
||||
|
||||
# Create mock stream with old format data
|
||||
stream = MockStream(old_format_data)
|
||||
|
||||
# Read using old format reader
|
||||
response = await read_old_format_message(stream)
|
||||
|
||||
# Parse the response
|
||||
parsed_msg = Identify()
|
||||
parsed_msg.ParseFromString(response)
|
||||
|
||||
# Verify the message content
|
||||
assert parsed_msg.protocol_version == identify_msg.protocol_version
|
||||
assert parsed_msg.agent_version == identify_msg.agent_version
|
||||
assert parsed_msg.public_key == identify_msg.public_key
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_identify_backward_compatibility_old_format(security_protocol):
|
||||
"""Test backward compatibility reader with old format messages."""
|
||||
async with host_pair_factory(security_protocol=security_protocol) as (
|
||||
host_a,
|
||||
host_b,
|
||||
):
|
||||
# Create identify message
|
||||
identify_msg = create_identify_message(host_a)
|
||||
|
||||
# Create old format message
|
||||
old_format_data = create_old_format_message(identify_msg)
|
||||
|
||||
# Create mock stream with old format data
|
||||
stream = MockStream(old_format_data)
|
||||
|
||||
# Read using old format reader (which should work reliably)
|
||||
response = await read_old_format_message(stream)
|
||||
|
||||
# Parse the response
|
||||
parsed_msg = Identify()
|
||||
parsed_msg.ParseFromString(response)
|
||||
|
||||
# Verify the message content
|
||||
assert parsed_msg.protocol_version == identify_msg.protocol_version
|
||||
assert parsed_msg.agent_version == identify_msg.agent_version
|
||||
assert parsed_msg.public_key == identify_msg.public_key
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_identify_backward_compatibility_new_format(security_protocol):
|
||||
"""Test backward compatibility reader with new format messages."""
|
||||
async with host_pair_factory(security_protocol=security_protocol) as (
|
||||
host_a,
|
||||
host_b,
|
||||
):
|
||||
# Create identify message
|
||||
identify_msg = create_identify_message(host_a)
|
||||
|
||||
# Create new format message
|
||||
new_format_data = create_new_format_message(identify_msg)
|
||||
|
||||
# Create mock stream with new format data
|
||||
stream = MockStream(new_format_data)
|
||||
|
||||
# Read using new format reader (which should work reliably)
|
||||
response = await read_new_format_message(stream)
|
||||
|
||||
# Parse the response
|
||||
parsed_msg = Identify()
|
||||
parsed_msg.ParseFromString(response)
|
||||
|
||||
# Verify the message content
|
||||
assert parsed_msg.protocol_version == identify_msg.protocol_version
|
||||
assert parsed_msg.agent_version == identify_msg.agent_version
|
||||
assert parsed_msg.public_key == identify_msg.public_key
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_identify_format_detection(security_protocol):
|
||||
"""Test that the format detection works correctly."""
|
||||
async with host_pair_factory(security_protocol=security_protocol) as (
|
||||
host_a,
|
||||
host_b,
|
||||
):
|
||||
# Create identify message
|
||||
identify_msg = create_identify_message(host_a)
|
||||
|
||||
# Test new format detection
|
||||
new_format_data = create_new_format_message(identify_msg)
|
||||
format_type = detect_format(new_format_data)
|
||||
assert format_type == "new", "New format should be detected correctly"
|
||||
|
||||
# Test old format detection
|
||||
old_format_data = create_old_format_message(identify_msg)
|
||||
format_type = detect_format(old_format_data)
|
||||
assert format_type == "old", "Old format should be detected correctly"
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_identify_error_handling(security_protocol):
|
||||
"""Test error handling for malformed messages."""
|
||||
from libp2p.exceptions import ParseError
|
||||
|
||||
# Test with empty data
|
||||
stream = MockStream(b"")
|
||||
with pytest.raises(ValueError, match="No data received"):
|
||||
await read_compatible_message(stream)
|
||||
|
||||
# Test with incomplete varint
|
||||
stream = MockStream(b"\x80") # Incomplete varint
|
||||
with pytest.raises(ParseError, match="Unexpected end of data"):
|
||||
await read_new_format_message(stream)
|
||||
|
||||
# Test with invalid protobuf data
|
||||
stream = MockStream(b"\x05invalid") # Length prefix but invalid protobuf
|
||||
with pytest.raises(Exception): # Should fail when parsing protobuf
|
||||
response = await read_new_format_message(stream)
|
||||
Identify().ParseFromString(response)
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_identify_message_equivalence(security_protocol):
|
||||
"""Test that old and new format messages are equivalent."""
|
||||
async with host_pair_factory(security_protocol=security_protocol) as (
|
||||
host_a,
|
||||
host_b,
|
||||
):
|
||||
# Create identify message
|
||||
identify_msg = create_identify_message(host_a)
|
||||
|
||||
# Create both formats
|
||||
new_format_data = create_new_format_message(identify_msg)
|
||||
old_format_data = create_old_format_message(identify_msg)
|
||||
|
||||
# Extract the protobuf message from new format
|
||||
varint_len = 0
|
||||
for i, byte in enumerate(new_format_data):
|
||||
varint_len += 1
|
||||
if (byte & 0x80) == 0:
|
||||
break
|
||||
|
||||
new_format_protobuf = new_format_data[varint_len:]
|
||||
|
||||
# The protobuf messages should be identical
|
||||
assert new_format_protobuf == old_format_data, (
|
||||
"Protobuf messages should be identical in both formats"
|
||||
)
|
||||
@ -459,7 +459,11 @@ async def test_push_identify_to_peers_respects_concurrency_limit():
|
||||
lock = trio.Lock()
|
||||
|
||||
async def mock_push_identify_to_peer(
|
||||
host, peer_id, observed_multiaddr=None, limit=trio.Semaphore(CONCURRENCY_LIMIT)
|
||||
host,
|
||||
peer_id,
|
||||
observed_multiaddr=None,
|
||||
limit=trio.Semaphore(CONCURRENCY_LIMIT),
|
||||
use_varint_format=True,
|
||||
) -> bool:
|
||||
"""
|
||||
Mock function to test concurrency by simulating an identify message.
|
||||
@ -593,3 +597,104 @@ async def test_all_peers_receive_identify_push_with_semaphore_under_high_peer_lo
|
||||
assert peer_id_a in dummy_peerstore.peer_ids()
|
||||
|
||||
nursery.cancel_scope.cancel()
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_identify_push_default_varint_format(security_protocol):
|
||||
"""
|
||||
Test that the identify/push protocol uses varint format by default.
|
||||
|
||||
This test verifies that:
|
||||
1. The default behavior uses length-prefixed messages (varint format)
|
||||
2. Messages are correctly encoded with varint length prefix
|
||||
3. Messages are correctly decoded with varint length prefix
|
||||
4. The peerstore is updated correctly with the received information
|
||||
"""
|
||||
async with host_pair_factory(security_protocol=security_protocol) as (
|
||||
host_a,
|
||||
host_b,
|
||||
):
|
||||
# Set up the identify/push handlers with default settings
|
||||
# (use_varint_format=True)
|
||||
host_b.set_stream_handler(ID_PUSH, identify_push_handler_for(host_b))
|
||||
|
||||
# Push identify information from host_a to host_b using default settings
|
||||
success = await push_identify_to_peer(host_a, host_b.get_id())
|
||||
assert success, "Identify push should succeed with default varint format"
|
||||
|
||||
# Wait a bit for the push to complete
|
||||
await trio.sleep(0.1)
|
||||
|
||||
# Get the peerstore from host_b
|
||||
peerstore = host_b.get_peerstore()
|
||||
peer_id = host_a.get_id()
|
||||
|
||||
# Verify that the peerstore was updated correctly
|
||||
assert peer_id in peerstore.peer_ids()
|
||||
|
||||
# Check that addresses have been updated
|
||||
host_a_addrs = set(host_a.get_addrs())
|
||||
peerstore_addrs = set(peerstore.addrs(peer_id))
|
||||
assert all(addr in peerstore_addrs for addr in host_a_addrs)
|
||||
|
||||
# Check that protocols have been updated
|
||||
host_a_protocols = set(host_a.get_mux().get_protocols())
|
||||
peerstore_protocols = set(peerstore.get_protocols(peer_id))
|
||||
assert all(protocol in peerstore_protocols for protocol in host_a_protocols)
|
||||
|
||||
# Check that the public key has been updated
|
||||
host_a_public_key = host_a.get_public_key().serialize()
|
||||
peerstore_public_key = peerstore.pubkey(peer_id).serialize()
|
||||
assert host_a_public_key == peerstore_public_key
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_identify_push_legacy_raw_format(security_protocol):
|
||||
"""
|
||||
Test that the identify/push protocol can use legacy raw format when specified.
|
||||
|
||||
This test verifies that:
|
||||
1. When use_varint_format=False, messages are sent without length prefix
|
||||
2. Raw protobuf messages are correctly encoded and decoded
|
||||
3. The peerstore is updated correctly with the received information
|
||||
4. The legacy format is backward compatible
|
||||
"""
|
||||
async with host_pair_factory(security_protocol=security_protocol) as (
|
||||
host_a,
|
||||
host_b,
|
||||
):
|
||||
# Set up the identify/push handlers with legacy format (use_varint_format=False)
|
||||
host_b.set_stream_handler(
|
||||
ID_PUSH, identify_push_handler_for(host_b, use_varint_format=False)
|
||||
)
|
||||
|
||||
# Push identify information from host_a to host_b using legacy format
|
||||
success = await push_identify_to_peer(
|
||||
host_a, host_b.get_id(), use_varint_format=False
|
||||
)
|
||||
assert success, "Identify push should succeed with legacy raw format"
|
||||
|
||||
# Wait a bit for the push to complete
|
||||
await trio.sleep(0.1)
|
||||
|
||||
# Get the peerstore from host_b
|
||||
peerstore = host_b.get_peerstore()
|
||||
peer_id = host_a.get_id()
|
||||
|
||||
# Verify that the peerstore was updated correctly
|
||||
assert peer_id in peerstore.peer_ids()
|
||||
|
||||
# Check that addresses have been updated
|
||||
host_a_addrs = set(host_a.get_addrs())
|
||||
peerstore_addrs = set(peerstore.addrs(peer_id))
|
||||
assert all(addr in peerstore_addrs for addr in host_a_addrs)
|
||||
|
||||
# Check that protocols have been updated
|
||||
host_a_protocols = set(host_a.get_mux().get_protocols())
|
||||
peerstore_protocols = set(peerstore.get_protocols(peer_id))
|
||||
assert all(protocol in peerstore_protocols for protocol in host_a_protocols)
|
||||
|
||||
# Check that the public key has been updated
|
||||
host_a_public_key = host_a.get_public_key().serialize()
|
||||
peerstore_public_key = peerstore.pubkey(peer_id).serialize()
|
||||
assert host_a_public_key == peerstore_public_key
|
||||
|
||||
215
tests/core/utils/test_varint.py
Normal file
215
tests/core/utils/test_varint.py
Normal file
@ -0,0 +1,215 @@
|
||||
import pytest
|
||||
|
||||
from libp2p.exceptions import ParseError
|
||||
from libp2p.io.abc import Reader
|
||||
from libp2p.utils.varint import (
|
||||
decode_varint_from_bytes,
|
||||
encode_uvarint,
|
||||
encode_varint_prefixed,
|
||||
read_varint_prefixed_bytes,
|
||||
)
|
||||
|
||||
|
||||
class MockReader(Reader):
|
||||
"""Mock reader for testing varint functions."""
|
||||
|
||||
def __init__(self, data: bytes):
|
||||
self.data = data
|
||||
self.position = 0
|
||||
|
||||
async def read(self, n: int | None = None) -> bytes:
|
||||
if self.position >= len(self.data):
|
||||
return b""
|
||||
if n is None:
|
||||
n = len(self.data) - self.position
|
||||
result = self.data[self.position : self.position + n]
|
||||
self.position += len(result)
|
||||
return result
|
||||
|
||||
|
||||
def test_encode_uvarint():
|
||||
"""Test varint encoding with various values."""
|
||||
test_cases = [
|
||||
(0, b"\x00"),
|
||||
(1, b"\x01"),
|
||||
(127, b"\x7f"),
|
||||
(128, b"\x80\x01"),
|
||||
(255, b"\xff\x01"),
|
||||
(256, b"\x80\x02"),
|
||||
(65535, b"\xff\xff\x03"),
|
||||
(65536, b"\x80\x80\x04"),
|
||||
(16777215, b"\xff\xff\xff\x07"),
|
||||
(16777216, b"\x80\x80\x80\x08"),
|
||||
]
|
||||
|
||||
for value, expected in test_cases:
|
||||
result = encode_uvarint(value)
|
||||
assert result == expected, (
|
||||
f"Failed for value {value}: expected {expected.hex()}, got {result.hex()}"
|
||||
)
|
||||
|
||||
|
||||
def test_decode_varint_from_bytes():
|
||||
"""Test varint decoding with various values."""
|
||||
test_cases = [
|
||||
(b"\x00", 0),
|
||||
(b"\x01", 1),
|
||||
(b"\x7f", 127),
|
||||
(b"\x80\x01", 128),
|
||||
(b"\xff\x01", 255),
|
||||
(b"\x80\x02", 256),
|
||||
(b"\xff\xff\x03", 65535),
|
||||
(b"\x80\x80\x04", 65536),
|
||||
(b"\xff\xff\xff\x07", 16777215),
|
||||
(b"\x80\x80\x80\x08", 16777216),
|
||||
]
|
||||
|
||||
for data, expected in test_cases:
|
||||
result = decode_varint_from_bytes(data)
|
||||
assert result == expected, (
|
||||
f"Failed for data {data.hex()}: expected {expected}, got {result}"
|
||||
)
|
||||
|
||||
|
||||
def test_decode_varint_from_bytes_invalid():
|
||||
"""Test varint decoding with invalid data."""
|
||||
# Empty data
|
||||
with pytest.raises(ParseError, match="Unexpected end of data"):
|
||||
decode_varint_from_bytes(b"")
|
||||
|
||||
# Incomplete varint (should not raise, but should handle gracefully)
|
||||
# This depends on the implementation - some might raise, others might return partial
|
||||
|
||||
|
||||
def test_encode_varint_prefixed():
|
||||
"""Test encoding messages with varint length prefix."""
|
||||
test_cases = [
|
||||
(b"", b"\x00"),
|
||||
(b"hello", b"\x05hello"),
|
||||
(b"x" * 127, b"\x7f" + b"x" * 127),
|
||||
(b"x" * 128, b"\x80\x01" + b"x" * 128),
|
||||
]
|
||||
|
||||
for message, expected in test_cases:
|
||||
result = encode_varint_prefixed(message)
|
||||
assert result == expected, (
|
||||
f"Failed for message {message}: expected {expected.hex()}, "
|
||||
f"got {result.hex()}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_read_varint_prefixed_bytes():
|
||||
"""Test reading length-prefixed bytes from a stream."""
|
||||
test_cases = [
|
||||
(b"", b""),
|
||||
(b"hello", b"hello"),
|
||||
(b"x" * 127, b"x" * 127),
|
||||
(b"x" * 128, b"x" * 128),
|
||||
]
|
||||
|
||||
for message, expected in test_cases:
|
||||
prefixed_data = encode_varint_prefixed(message)
|
||||
reader = MockReader(prefixed_data)
|
||||
|
||||
result = await read_varint_prefixed_bytes(reader)
|
||||
assert result == expected, (
|
||||
f"Failed for message {message}: expected {expected}, got {result}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_read_varint_prefixed_bytes_incomplete():
|
||||
"""Test reading length-prefixed bytes with incomplete data."""
|
||||
from libp2p.io.exceptions import IncompleteReadError
|
||||
|
||||
# Test with incomplete varint
|
||||
reader = MockReader(b"\x80") # Incomplete varint
|
||||
with pytest.raises(IncompleteReadError):
|
||||
await read_varint_prefixed_bytes(reader)
|
||||
|
||||
# Test with incomplete message
|
||||
prefixed_data = encode_varint_prefixed(b"hello world")
|
||||
reader = MockReader(prefixed_data[:-3]) # Missing last 3 bytes
|
||||
with pytest.raises(IncompleteReadError):
|
||||
await read_varint_prefixed_bytes(reader)
|
||||
|
||||
|
||||
def test_varint_roundtrip():
|
||||
"""Test roundtrip encoding and decoding."""
|
||||
test_values = [0, 1, 127, 128, 255, 256, 65535, 65536, 16777215, 16777216]
|
||||
|
||||
for value in test_values:
|
||||
encoded = encode_uvarint(value)
|
||||
decoded = decode_varint_from_bytes(encoded)
|
||||
assert decoded == value, (
|
||||
f"Roundtrip failed for {value}: encoded={encoded.hex()}, decoded={decoded}"
|
||||
)
|
||||
|
||||
|
||||
def test_varint_prefixed_roundtrip():
|
||||
"""Test roundtrip encoding and decoding of length-prefixed messages."""
|
||||
test_messages = [
|
||||
b"",
|
||||
b"hello",
|
||||
b"x" * 127,
|
||||
b"x" * 128,
|
||||
b"x" * 1000,
|
||||
]
|
||||
|
||||
for message in test_messages:
|
||||
prefixed = encode_varint_prefixed(message)
|
||||
|
||||
# Decode the length
|
||||
length = decode_varint_from_bytes(prefixed)
|
||||
assert length == len(message), (
|
||||
f"Length mismatch for {message}: expected {len(message)}, got {length}"
|
||||
)
|
||||
|
||||
# Extract the message
|
||||
varint_len = 0
|
||||
for i, byte in enumerate(prefixed):
|
||||
varint_len += 1
|
||||
if (byte & 0x80) == 0:
|
||||
break
|
||||
|
||||
extracted_message = prefixed[varint_len:]
|
||||
assert extracted_message == message, (
|
||||
f"Message mismatch: expected {message}, got {extracted_message}"
|
||||
)
|
||||
|
||||
|
||||
def test_large_varint_values():
|
||||
"""Test varint encoding/decoding with large values."""
|
||||
large_values = [
|
||||
2**32 - 1, # 32-bit max
|
||||
2**64 - 1, # 64-bit max (if supported)
|
||||
]
|
||||
|
||||
for value in large_values:
|
||||
try:
|
||||
encoded = encode_uvarint(value)
|
||||
decoded = decode_varint_from_bytes(encoded)
|
||||
assert decoded == value, f"Large value roundtrip failed for {value}"
|
||||
except Exception as e:
|
||||
# Some implementations might not support very large values
|
||||
pytest.skip(f"Large value {value} not supported: {e}")
|
||||
|
||||
|
||||
def test_varint_edge_cases():
|
||||
"""Test varint encoding/decoding with edge cases."""
|
||||
# Test with maximum 7-bit value
|
||||
assert encode_uvarint(127) == b"\x7f"
|
||||
assert decode_varint_from_bytes(b"\x7f") == 127
|
||||
|
||||
# Test with minimum 8-bit value
|
||||
assert encode_uvarint(128) == b"\x80\x01"
|
||||
assert decode_varint_from_bytes(b"\x80\x01") == 128
|
||||
|
||||
# Test with maximum 14-bit value
|
||||
assert encode_uvarint(16383) == b"\xff\x7f"
|
||||
assert decode_varint_from_bytes(b"\xff\x7f") == 16383
|
||||
|
||||
# Test with minimum 15-bit value
|
||||
assert encode_uvarint(16384) == b"\x80\x80\x01"
|
||||
assert decode_varint_from_bytes(b"\x80\x80\x01") == 16384
|
||||
81
tests/interop/js_libp2p/README.md
Normal file
81
tests/interop/js_libp2p/README.md
Normal file
@ -0,0 +1,81 @@
|
||||
# py-libp2p and js-libp2p Interoperability Tests
|
||||
|
||||
This repository contains interoperability tests for py-libp2p and js-libp2p using the /ipfs/ping/1.0.0 protocol. The goal is to verify compatibility in stream multiplexing, protocol negotiation, ping handling, transport layer, and multiaddr parsing.
|
||||
|
||||
## Directory Structure
|
||||
|
||||
- js_node/ping.js: JavaScript implementation of a ping server and client using libp2p.
|
||||
- py_node/ping.py: Python implementation of a ping server and client using py-libp2p.
|
||||
- scripts/run_test.sh: Shell script to automate running the server and client for testing.
|
||||
- README.md: This file.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- Python 3.8+ with `py-libp2p` and dependencies (`pip install libp2p trio cryptography multiaddr`).
|
||||
- Node.js 16+ with `libp2p` dependencies (`npm install @libp2p/core @libp2p/tcp @chainsafe/libp2p-noise @chainsafe/libp2p-yamux @libp2p/ping @libp2p/identify @multiformats/multiaddr`).
|
||||
- Bash shell for running `run_test.sh`.
|
||||
|
||||
## Running Tests
|
||||
|
||||
1. Change directory:
|
||||
|
||||
```
|
||||
cd tests/interop/js_libp2p
|
||||
```
|
||||
|
||||
2. Install dependencies:
|
||||
|
||||
```
|
||||
For JavaScript: cd js_node && npm install && cd ...
|
||||
```
|
||||
|
||||
3. Run the automated test:
|
||||
|
||||
For Linux and Mac users:
|
||||
|
||||
```
|
||||
chmod +x scripts/run_test.sh
|
||||
./scripts/run_test.sh
|
||||
```
|
||||
|
||||
For Windows users:
|
||||
|
||||
```
|
||||
.\scripts\run_test.ps1
|
||||
```
|
||||
|
||||
This starts the Python server on port 8000 and runs the JavaScript client to send 5 pings.
|
||||
|
||||
## Debugging
|
||||
|
||||
- Logs are saved in py_node/py_server.log and js_node/js_client.log.
|
||||
- Check for:
|
||||
- Successful connection establishment.
|
||||
- Protocol negotiation (/ipfs/ping/1.0.0).
|
||||
- 32-byte payload echo in server logs.
|
||||
- RTT and payload hex in client logs.
|
||||
|
||||
## Test Plan
|
||||
|
||||
### The test verifies:
|
||||
|
||||
- Stream Multiplexer Compatibility: Yamux is used and negotiates correctly.
|
||||
- Multistream Protocol Negotiation: /ipfs/ping/1.0.0 is selected via multistream-select.
|
||||
- Ping Protocol Handler: Handles 32-byte payloads per the libp2p ping spec.
|
||||
- Transport Layer Support: TCP is used; WebSocket support is optional.
|
||||
- Multiaddr Parsing: Correctly resolves multiaddr strings.
|
||||
- Logging: Includes peer ID, RTT, and payload hex for debugging.
|
||||
|
||||
## Current Status
|
||||
|
||||
### Working:
|
||||
|
||||
- TCP transport and Noise encryption are functional.
|
||||
- Yamux multiplexing is implemented in both nodes.
|
||||
- Multiaddr parsing works correctly.
|
||||
- Logging provides detailed debug information.
|
||||
|
||||
## Not Working:
|
||||
|
||||
- Ping protocol handler fails to complete pings (JS client reports "operation aborted").
|
||||
- Potential issues with stream handling or protocol negotiation.
|
||||
53
tests/interop/js_libp2p/js_node/README.md
Normal file
53
tests/interop/js_libp2p/js_node/README.md
Normal file
@ -0,0 +1,53 @@
|
||||
# @libp2p/example-chat <!-- omit in toc -->
|
||||
|
||||
[](http://libp2p.io/)
|
||||
[](https://discuss.libp2p.io)
|
||||
[](https://codecov.io/gh/libp2p/js-libp2p-examples)
|
||||
[](https://github.com/libp2p/js-libp2p-examples/actions/workflows/ci.yml?query=branch%3Amain)
|
||||
|
||||
> An example chat app using libp2p
|
||||
|
||||
## Table of contents <!-- omit in toc -->
|
||||
|
||||
- [Setup](#setup)
|
||||
- [Running](#running)
|
||||
- [Need help?](#need-help)
|
||||
- [License](#license)
|
||||
- [Contribution](#contribution)
|
||||
|
||||
## Setup
|
||||
|
||||
1. Install example dependencies
|
||||
```console
|
||||
$ npm install
|
||||
```
|
||||
1. Open 2 terminal windows in the `./src` directory.
|
||||
|
||||
## Running
|
||||
|
||||
1. Run the listener in window 1, `node listener.js`
|
||||
1. Run the dialer in window 2, `node dialer.js`
|
||||
1. Wait until the two peers discover each other
|
||||
1. Type a message in either window and hit *enter*
|
||||
1. Tell yourself secrets to your hearts content!
|
||||
|
||||
## Need help?
|
||||
|
||||
- Read the [js-libp2p documentation](https://github.com/libp2p/js-libp2p/tree/main/doc)
|
||||
- Check out the [js-libp2p API docs](https://libp2p.github.io/js-libp2p/)
|
||||
- Check out the [general libp2p documentation](https://docs.libp2p.io) for tips, how-tos and more
|
||||
- Read the [libp2p specs](https://github.com/libp2p/specs)
|
||||
- Ask a question on the [js-libp2p discussion board](https://github.com/libp2p/js-libp2p/discussions)
|
||||
|
||||
## License
|
||||
|
||||
Licensed under either of
|
||||
|
||||
- Apache 2.0, ([LICENSE-APACHE](LICENSE-APACHE) / <http://www.apache.org/licenses/LICENSE-2.0>)
|
||||
- MIT ([LICENSE-MIT](LICENSE-MIT) / <http://opensource.org/licenses/MIT>)
|
||||
|
||||
## Contribution
|
||||
|
||||
Unless you explicitly state otherwise, any contribution intentionally submitted
|
||||
for inclusion in the work by you, as defined in the Apache-2.0 license, shall be
|
||||
dual licensed as above, without any additional terms or conditions.
|
||||
39
tests/interop/js_libp2p/js_node/package.json
Normal file
39
tests/interop/js_libp2p/js_node/package.json
Normal file
@ -0,0 +1,39 @@
|
||||
{
|
||||
"name": "@libp2p/example-chat",
|
||||
"version": "0.0.0",
|
||||
"description": "An example chat app using libp2p",
|
||||
"license": "Apache-2.0 OR MIT",
|
||||
"homepage": "https://github.com/libp2p/js-libp2p-example-chat#readme",
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "git+https://github.com/libp2p/js-libp2p-examples.git"
|
||||
},
|
||||
"bugs": {
|
||||
"url": "https://github.com/libp2p/js-libp2p-examples/issues"
|
||||
},
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
"test": "test-node-example test/*"
|
||||
},
|
||||
"dependencies": {
|
||||
"@chainsafe/libp2p-noise": "^16.0.0",
|
||||
"@chainsafe/libp2p-yamux": "^7.0.0",
|
||||
"@libp2p/identify": "^3.0.33",
|
||||
"@libp2p/mdns": "^11.0.1",
|
||||
"@libp2p/ping": "^2.0.33",
|
||||
"@libp2p/tcp": "^10.0.0",
|
||||
"@libp2p/websockets": "^9.0.0",
|
||||
"@multiformats/multiaddr": "^12.3.1",
|
||||
"@nodeutils/defaults-deep": "^1.1.0",
|
||||
"it-length-prefixed": "^10.0.1",
|
||||
"it-map": "^3.0.3",
|
||||
"it-pipe": "^3.0.1",
|
||||
"libp2p": "^2.0.0",
|
||||
"p-defer": "^4.0.0",
|
||||
"uint8arrays": "^5.1.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"test-ipfs-example": "^1.1.0"
|
||||
},
|
||||
"private": true
|
||||
}
|
||||
204
tests/interop/js_libp2p/js_node/src/ping.js
Normal file
204
tests/interop/js_libp2p/js_node/src/ping.js
Normal file
@ -0,0 +1,204 @@
|
||||
#!/usr/bin/env node
|
||||
|
||||
import { createLibp2p } from 'libp2p'
|
||||
import { tcp } from '@libp2p/tcp'
|
||||
import { noise } from '@chainsafe/libp2p-noise'
|
||||
import { yamux } from '@chainsafe/libp2p-yamux'
|
||||
import { ping } from '@libp2p/ping'
|
||||
import { identify } from '@libp2p/identify'
|
||||
import { multiaddr } from '@multiformats/multiaddr'
|
||||
|
||||
async function createNode() {
|
||||
return await createLibp2p({
|
||||
addresses: {
|
||||
listen: ['/ip4/0.0.0.0/tcp/0']
|
||||
},
|
||||
transports: [
|
||||
tcp()
|
||||
],
|
||||
connectionEncrypters: [
|
||||
noise()
|
||||
],
|
||||
streamMuxers: [
|
||||
yamux()
|
||||
],
|
||||
services: {
|
||||
// Use ipfs prefix to match py-libp2p example
|
||||
ping: ping({
|
||||
protocolPrefix: 'ipfs',
|
||||
maxInboundStreams: 32,
|
||||
maxOutboundStreams: 64,
|
||||
timeout: 30000
|
||||
}),
|
||||
identify: identify()
|
||||
},
|
||||
connectionManager: {
|
||||
minConnections: 0,
|
||||
maxConnections: 100,
|
||||
dialTimeout: 30000
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async function runServer() {
|
||||
console.log('🚀 Starting js-libp2p ping server...')
|
||||
|
||||
const node = await createNode()
|
||||
await node.start()
|
||||
|
||||
console.log('✅ Server started!')
|
||||
console.log(`📋 Peer ID: ${node.peerId.toString()}`)
|
||||
console.log('📍 Listening addresses:')
|
||||
|
||||
node.getMultiaddrs().forEach(addr => {
|
||||
console.log(` ${addr.toString()}`)
|
||||
})
|
||||
|
||||
// Listen for connections
|
||||
node.addEventListener('peer:connect', (evt) => {
|
||||
console.log(`🔗 Peer connected: ${evt.detail.toString()}`)
|
||||
})
|
||||
|
||||
node.addEventListener('peer:disconnect', (evt) => {
|
||||
console.log(`❌ Peer disconnected: ${evt.detail.toString()}`)
|
||||
})
|
||||
|
||||
console.log('\n🎧 Server ready for ping requests...')
|
||||
console.log('Press Ctrl+C to exit')
|
||||
|
||||
// Graceful shutdown
|
||||
process.on('SIGINT', async () => {
|
||||
console.log('\n🛑 Shutting down...')
|
||||
await node.stop()
|
||||
process.exit(0)
|
||||
})
|
||||
|
||||
// Keep alive
|
||||
while (true) {
|
||||
await new Promise(resolve => setTimeout(resolve, 1000))
|
||||
}
|
||||
}
|
||||
|
||||
async function runClient(targetAddr, count = 5) {
|
||||
console.log('🚀 Starting js-libp2p ping client...')
|
||||
|
||||
const node = await createNode()
|
||||
await node.start()
|
||||
|
||||
console.log(`📋 Our Peer ID: ${node.peerId.toString()}`)
|
||||
console.log(`🎯 Target: ${targetAddr}`)
|
||||
|
||||
try {
|
||||
const ma = multiaddr(targetAddr)
|
||||
const targetPeerId = ma.getPeerId()
|
||||
|
||||
if (!targetPeerId) {
|
||||
throw new Error('Could not extract peer ID from multiaddr')
|
||||
}
|
||||
|
||||
console.log(`🎯 Target Peer ID: ${targetPeerId}`)
|
||||
console.log('🔗 Connecting to peer...')
|
||||
|
||||
const connection = await node.dial(ma)
|
||||
console.log('✅ Connection established!')
|
||||
console.log(`🔗 Connected to: ${connection.remotePeer.toString()}`)
|
||||
|
||||
// Add a small delay to let the connection fully establish
|
||||
await new Promise(resolve => setTimeout(resolve, 1000))
|
||||
|
||||
const rtts = []
|
||||
|
||||
for (let i = 1; i <= count; i++) {
|
||||
try {
|
||||
console.log(`\n🏓 Sending ping ${i}/${count}...`);
|
||||
console.log('[DEBUG] Attempting to open ping stream with protocol: /ipfs/ping/1.0.0');
|
||||
const start = Date.now()
|
||||
|
||||
const stream = await connection.newStream(['/ipfs/ping/1.0.0']).catch(err => {
|
||||
console.error(`[ERROR] Failed to open ping stream: ${err.message}`);
|
||||
throw err;
|
||||
});
|
||||
console.log('[DEBUG] Ping stream opened successfully');
|
||||
|
||||
const latency = await Promise.race([
|
||||
node.services.ping.ping(connection.remotePeer),
|
||||
new Promise((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Ping timeout')), 30000) // Increased timeout
|
||||
)
|
||||
]).catch(err => {
|
||||
console.error(`[ERROR] Ping ${i} error: ${err.message}`);
|
||||
throw err;
|
||||
});
|
||||
|
||||
const rtt = Date.now() - start;
|
||||
|
||||
rtts.push(latency)
|
||||
console.log(`✅ Ping ${i} successful!`)
|
||||
console.log(` Reported latency: ${latency}ms`)
|
||||
console.log(` Measured RTT: ${rtt}ms`)
|
||||
|
||||
if (i < count) {
|
||||
await new Promise(resolve => setTimeout(resolve, 1000))
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`❌ Ping ${i} failed:`, error.message)
|
||||
// Try to continue with other pings
|
||||
}
|
||||
}
|
||||
|
||||
// Stats
|
||||
if (rtts.length > 0) {
|
||||
const avg = rtts.reduce((a, b) => a + b, 0) / rtts.length
|
||||
const min = Math.min(...rtts)
|
||||
const max = Math.max(...rtts)
|
||||
|
||||
console.log(`\n📊 Ping Statistics:`)
|
||||
console.log(` Packets: Sent=${count}, Received=${rtts.length}, Lost=${count - rtts.length}`)
|
||||
console.log(` Latency: min=${min}ms, avg=${avg.toFixed(2)}ms, max=${max}ms`)
|
||||
} else {
|
||||
console.log(`\n📊 All pings failed (${count} attempts)`)
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
console.error('❌ Client error:', error.message)
|
||||
console.error('Stack:', error.stack)
|
||||
process.exit(1)
|
||||
} finally {
|
||||
await node.stop()
|
||||
console.log('\n⏹️ Client stopped')
|
||||
}
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const args = process.argv.slice(2)
|
||||
|
||||
if (args.length === 0) {
|
||||
console.log('Usage:')
|
||||
console.log(' node ping.js server # Start ping server')
|
||||
console.log(' node ping.js client <multiaddr> [count] # Ping a peer')
|
||||
console.log('')
|
||||
console.log('Examples:')
|
||||
console.log(' node ping.js server')
|
||||
console.log(' node ping.js client /ip4/127.0.0.1/tcp/12345/p2p/12D3Ko... 5')
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
const mode = args[0]
|
||||
|
||||
if (mode === 'server') {
|
||||
await runServer()
|
||||
} else if (mode === 'client') {
|
||||
if (args.length < 2) {
|
||||
console.error('❌ Client mode requires target multiaddr')
|
||||
process.exit(1)
|
||||
}
|
||||
const targetAddr = args[1]
|
||||
const count = parseInt(args[2]) || 5
|
||||
await runClient(targetAddr, count)
|
||||
} else {
|
||||
console.error('❌ Invalid mode. Use "server" or "client"')
|
||||
process.exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
main().catch(console.error)
|
||||
241
tests/interop/js_libp2p/js_node/src/ping_client.js
Normal file
241
tests/interop/js_libp2p/js_node/src/ping_client.js
Normal file
@ -0,0 +1,241 @@
|
||||
#!/usr/bin/env node
|
||||
|
||||
import { createLibp2p } from 'libp2p'
|
||||
import { tcp } from '@libp2p/tcp'
|
||||
import { noise } from '@chainsafe/libp2p-noise'
|
||||
import { yamux } from '@chainsafe/libp2p-yamux'
|
||||
import { ping } from '@libp2p/ping'
|
||||
import { identify } from '@libp2p/identify'
|
||||
import { multiaddr } from '@multiformats/multiaddr'
|
||||
import fs from 'fs'
|
||||
import path from 'path'
|
||||
|
||||
// Create logs directory if it doesn't exist
|
||||
const logsDir = path.join(process.cwd(), '../logs')
|
||||
if (!fs.existsSync(logsDir)) {
|
||||
fs.mkdirSync(logsDir, { recursive: true })
|
||||
}
|
||||
|
||||
// Setup logging
|
||||
const logFile = path.join(logsDir, 'js_ping_client.log')
|
||||
const logStream = fs.createWriteStream(logFile, { flags: 'w' })
|
||||
|
||||
function log(message) {
|
||||
const timestamp = new Date().toISOString()
|
||||
const logLine = `${timestamp} - ${message}\n`
|
||||
logStream.write(logLine)
|
||||
console.log(message)
|
||||
}
|
||||
|
||||
async function createNode() {
|
||||
log('🔧 Creating libp2p node...')
|
||||
|
||||
const node = await createLibp2p({
|
||||
addresses: {
|
||||
listen: ['/ip4/0.0.0.0/tcp/0'] // Random port
|
||||
},
|
||||
transports: [
|
||||
tcp()
|
||||
],
|
||||
connectionEncrypters: [
|
||||
noise()
|
||||
],
|
||||
streamMuxers: [
|
||||
yamux()
|
||||
],
|
||||
services: {
|
||||
ping: ping({
|
||||
protocolPrefix: 'ipfs', // Use ipfs prefix to match py-libp2p
|
||||
maxInboundStreams: 32,
|
||||
maxOutboundStreams: 64,
|
||||
timeout: 30000,
|
||||
runOnTransientConnection: true
|
||||
}),
|
||||
identify: identify()
|
||||
},
|
||||
connectionManager: {
|
||||
minConnections: 0,
|
||||
maxConnections: 100,
|
||||
dialTimeout: 30000,
|
||||
maxParallelDials: 10
|
||||
}
|
||||
})
|
||||
|
||||
log('✅ Node created successfully')
|
||||
return node
|
||||
}
|
||||
|
||||
async function runClient(targetAddr, count = 5) {
|
||||
log('🚀 Starting js-libp2p ping client...')
|
||||
|
||||
const node = await createNode()
|
||||
|
||||
// Add connection event listeners
|
||||
node.addEventListener('peer:connect', (evt) => {
|
||||
log(`🔗 Connected to peer: ${evt.detail.toString()}`)
|
||||
})
|
||||
|
||||
node.addEventListener('peer:disconnect', (evt) => {
|
||||
log(`❌ Disconnected from peer: ${evt.detail.toString()}`)
|
||||
})
|
||||
|
||||
await node.start()
|
||||
log('✅ Node started')
|
||||
|
||||
log(`📋 Our Peer ID: ${node.peerId.toString()}`)
|
||||
log(`🎯 Target: ${targetAddr}`)
|
||||
|
||||
try {
|
||||
const ma = multiaddr(targetAddr)
|
||||
const targetPeerId = ma.getPeerId()
|
||||
|
||||
if (!targetPeerId) {
|
||||
throw new Error('Could not extract peer ID from multiaddr')
|
||||
}
|
||||
|
||||
log(`🎯 Target Peer ID: ${targetPeerId}`)
|
||||
|
||||
// Parse multiaddr components for debugging
|
||||
const components = ma.toString().split('/')
|
||||
log(`📍 Target components: ${components.join(' → ')}`)
|
||||
|
||||
log('🔗 Attempting to dial peer...')
|
||||
const connection = await node.dial(ma)
|
||||
log('✅ Connection established!')
|
||||
log(`🔗 Connected to: ${connection.remotePeer.toString()}`)
|
||||
log(`🔗 Connection status: ${connection.status}`)
|
||||
log(`🔗 Connection direction: ${connection.direction}`)
|
||||
|
||||
// List available protocols
|
||||
if (connection.remoteAddr) {
|
||||
log(`🌐 Remote address: ${connection.remoteAddr.toString()}`)
|
||||
}
|
||||
|
||||
// Wait for connection to stabilize
|
||||
log('⏳ Waiting for connection to stabilize...')
|
||||
await new Promise(resolve => setTimeout(resolve, 2000))
|
||||
|
||||
// Attempt ping sequence
|
||||
log(`\n🏓 Starting ping sequence (${count} pings)...`)
|
||||
const rtts = []
|
||||
|
||||
for (let i = 1; i <= count; i++) {
|
||||
try {
|
||||
log(`\n🏓 Sending ping ${i}/${count}...`)
|
||||
const start = Date.now()
|
||||
|
||||
// Create a more robust ping with better error handling
|
||||
const pingPromise = node.services.ping.ping(connection.remotePeer)
|
||||
const timeoutPromise = new Promise((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Ping timeout (15s)')), 15000)
|
||||
)
|
||||
|
||||
const latency = await Promise.race([pingPromise, timeoutPromise])
|
||||
const totalRtt = Date.now() - start
|
||||
|
||||
rtts.push(latency)
|
||||
log(`✅ Ping ${i} successful!`)
|
||||
log(` Reported latency: ${latency}ms`)
|
||||
log(` Total RTT: ${totalRtt}ms`)
|
||||
|
||||
// Wait between pings
|
||||
if (i < count) {
|
||||
await new Promise(resolve => setTimeout(resolve, 1000))
|
||||
}
|
||||
} catch (error) {
|
||||
log(`❌ Ping ${i} failed: ${error.message}`)
|
||||
log(` Error type: ${error.constructor.name}`)
|
||||
if (error.code) {
|
||||
log(` Error code: ${error.code}`)
|
||||
}
|
||||
|
||||
// Check if connection is still alive
|
||||
if (connection.status !== 'open') {
|
||||
log(`⚠️ Connection status changed to: ${connection.status}`)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Print statistics
|
||||
if (rtts.length > 0) {
|
||||
const avg = rtts.reduce((a, b) => a + b, 0) / rtts.length
|
||||
const min = Math.min(...rtts)
|
||||
const max = Math.max(...rtts)
|
||||
const lossRate = ((count - rtts.length) / count * 100).toFixed(1)
|
||||
|
||||
log(`\n📊 Ping Statistics:`)
|
||||
log(` Packets: Sent=${count}, Received=${rtts.length}, Lost=${count - rtts.length}`)
|
||||
log(` Loss rate: ${lossRate}%`)
|
||||
log(` Latency: min=${min}ms, avg=${avg.toFixed(2)}ms, max=${max}ms`)
|
||||
} else {
|
||||
log(`\n📊 All pings failed (${count} attempts)`)
|
||||
}
|
||||
|
||||
// Close connection gracefully
|
||||
log('\n🔒 Closing connection...')
|
||||
await connection.close()
|
||||
|
||||
} catch (error) {
|
||||
log(`❌ Client error: ${error.message}`)
|
||||
log(` Error type: ${error.constructor.name}`)
|
||||
if (error.stack) {
|
||||
log(` Stack trace: ${error.stack}`)
|
||||
}
|
||||
process.exit(1)
|
||||
} finally {
|
||||
log('🛑 Stopping node...')
|
||||
await node.stop()
|
||||
log('⏹️ Client stopped')
|
||||
logStream.end()
|
||||
}
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const args = process.argv.slice(2)
|
||||
|
||||
if (args.length === 0) {
|
||||
console.log('Usage:')
|
||||
console.log(' node ping-client.js <target-multiaddr> [count]')
|
||||
console.log('')
|
||||
console.log('Examples:')
|
||||
console.log(' node ping-client.js /ip4/127.0.0.1/tcp/8000/p2p/QmExample... 5')
|
||||
console.log(' node ping-client.js /ip4/127.0.0.1/tcp/8000/p2p/QmExample... 10')
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
const targetAddr = args[0]
|
||||
const count = parseInt(args[1]) || 5
|
||||
|
||||
if (count <= 0 || count > 100) {
|
||||
console.error('❌ Count must be between 1 and 100')
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
await runClient(targetAddr, count)
|
||||
}
|
||||
|
||||
// Handle graceful shutdown
|
||||
process.on('SIGINT', () => {
|
||||
log('\n👋 Shutting down...')
|
||||
logStream.end()
|
||||
process.exit(0)
|
||||
})
|
||||
|
||||
process.on('uncaughtException', (error) => {
|
||||
log(`💥 Uncaught exception: ${error.message}`)
|
||||
if (error.stack) {
|
||||
log(`Stack: ${error.stack}`)
|
||||
}
|
||||
logStream.end()
|
||||
process.exit(1)
|
||||
})
|
||||
|
||||
main().catch((error) => {
|
||||
log(`💥 Fatal error: ${error.message}`)
|
||||
if (error.stack) {
|
||||
log(`Stack: ${error.stack}`)
|
||||
}
|
||||
logStream.end()
|
||||
process.exit(1)
|
||||
})
|
||||
167
tests/interop/js_libp2p/js_node/src/ping_server.js
Normal file
167
tests/interop/js_libp2p/js_node/src/ping_server.js
Normal file
@ -0,0 +1,167 @@
|
||||
#!/usr/bin/env node
|
||||
|
||||
import { createLibp2p } from 'libp2p'
|
||||
import { tcp } from '@libp2p/tcp'
|
||||
import { noise } from '@chainsafe/libp2p-noise'
|
||||
import { yamux } from '@chainsafe/libp2p-yamux'
|
||||
import { ping } from '@libp2p/ping'
|
||||
import { identify } from '@libp2p/identify'
|
||||
import fs from 'fs'
|
||||
import path from 'path'
|
||||
|
||||
// Create logs directory if it doesn't exist
|
||||
const logsDir = path.join(process.cwd(), '../logs')
|
||||
if (!fs.existsSync(logsDir)) {
|
||||
fs.mkdirSync(logsDir, { recursive: true })
|
||||
}
|
||||
|
||||
// Setup logging
|
||||
const logFile = path.join(logsDir, 'js_ping_server.log')
|
||||
const logStream = fs.createWriteStream(logFile, { flags: 'w' })
|
||||
|
||||
function log(message) {
|
||||
const timestamp = new Date().toISOString()
|
||||
const logLine = `${timestamp} - ${message}\n`
|
||||
logStream.write(logLine)
|
||||
console.log(message)
|
||||
}
|
||||
|
||||
async function createNode(port) {
|
||||
log('🔧 Creating libp2p node...')
|
||||
|
||||
const node = await createLibp2p({
|
||||
addresses: {
|
||||
listen: [`/ip4/0.0.0.0/tcp/${port}`]
|
||||
},
|
||||
transports: [
|
||||
tcp()
|
||||
],
|
||||
connectionEncrypters: [
|
||||
noise()
|
||||
],
|
||||
streamMuxers: [
|
||||
yamux()
|
||||
],
|
||||
services: {
|
||||
ping: ping({
|
||||
protocolPrefix: 'ipfs', // Use ipfs prefix to match py-libp2p
|
||||
maxInboundStreams: 32,
|
||||
maxOutboundStreams: 64,
|
||||
timeout: 30000,
|
||||
runOnTransientConnection: true
|
||||
}),
|
||||
identify: identify()
|
||||
},
|
||||
connectionManager: {
|
||||
minConnections: 0,
|
||||
maxConnections: 100,
|
||||
dialTimeout: 30000,
|
||||
maxParallelDials: 10
|
||||
}
|
||||
})
|
||||
|
||||
log('✅ Node created successfully')
|
||||
return node
|
||||
}
|
||||
|
||||
async function runServer(port) {
|
||||
log('🚀 Starting js-libp2p ping server...')
|
||||
|
||||
const node = await createNode(port)
|
||||
|
||||
// Add connection event listeners
|
||||
node.addEventListener('peer:connect', (evt) => {
|
||||
log(`🔗 New peer connected: ${evt.detail.toString()}`)
|
||||
})
|
||||
|
||||
node.addEventListener('peer:disconnect', (evt) => {
|
||||
log(`❌ Peer disconnected: ${evt.detail.toString()}`)
|
||||
})
|
||||
|
||||
// Add protocol handler for incoming streams
|
||||
node.addEventListener('peer:identify', (evt) => {
|
||||
log(`🔍 Peer identified: ${evt.detail.peerId.toString()}`)
|
||||
log(` Protocols: ${evt.detail.protocols.join(', ')}`)
|
||||
log(` Listen addresses: ${evt.detail.listenAddrs.map(addr => addr.toString()).join(', ')}`)
|
||||
})
|
||||
|
||||
await node.start()
|
||||
log('✅ Node started')
|
||||
|
||||
const peerId = node.peerId.toString()
|
||||
const listenAddrs = node.getMultiaddrs()
|
||||
|
||||
log(`📋 Peer ID: ${peerId}`)
|
||||
log(`🌐 Listen addresses:`)
|
||||
listenAddrs.forEach(addr => {
|
||||
log(` ${addr.toString()}`)
|
||||
})
|
||||
|
||||
// Find the main TCP address for easy copy-paste
|
||||
const tcpAddr = listenAddrs.find(addr =>
|
||||
addr.toString().includes('/tcp/') &&
|
||||
!addr.toString().includes('/ws')
|
||||
)
|
||||
|
||||
if (tcpAddr) {
|
||||
log(`\n🧪 Test with py-libp2p:`)
|
||||
log(` python ping_client.py ${tcpAddr.toString()}`)
|
||||
log(`\n🧪 Test with js-libp2p:`)
|
||||
log(` node ping-client.js ${tcpAddr.toString()}`)
|
||||
}
|
||||
|
||||
log(`\n🏓 Ping service is running with protocol: /ipfs/ping/1.0.0`)
|
||||
log(`🔐 Security: Noise encryption`)
|
||||
log(`🚇 Muxer: Yamux stream multiplexing`)
|
||||
log(`\n⏳ Waiting for connections...`)
|
||||
log('Press Ctrl+C to exit')
|
||||
|
||||
// Keep the server running
|
||||
return new Promise((resolve, reject) => {
|
||||
process.on('SIGINT', () => {
|
||||
log('\n🛑 Shutting down server...')
|
||||
node.stop().then(() => {
|
||||
log('⏹️ Server stopped')
|
||||
logStream.end()
|
||||
resolve()
|
||||
}).catch(reject)
|
||||
})
|
||||
|
||||
process.on('uncaughtException', (error) => {
|
||||
log(`💥 Uncaught exception: ${error.message}`)
|
||||
if (error.stack) {
|
||||
log(`Stack: ${error.stack}`)
|
||||
}
|
||||
logStream.end()
|
||||
reject(error)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const args = process.argv.slice(2)
|
||||
const port = parseInt(args[0]) || 9000
|
||||
|
||||
if (port <= 0 || port > 65535) {
|
||||
console.error('❌ Port must be between 1 and 65535')
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
try {
|
||||
await runServer(port)
|
||||
} catch (error) {
|
||||
console.error(`💥 Fatal error: ${error.message}`)
|
||||
if (error.stack) {
|
||||
console.error(`Stack: ${error.stack}`)
|
||||
}
|
||||
process.exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
main().catch((error) => {
|
||||
console.error(`💥 Fatal error: ${error.message}`)
|
||||
if (error.stack) {
|
||||
console.error(`Stack: ${error.stack}`)
|
||||
}
|
||||
process.exit(1)
|
||||
})
|
||||
194
tests/interop/js_libp2p/scripts/run_test.ps1
Normal file
194
tests/interop/js_libp2p/scripts/run_test.ps1
Normal file
@ -0,0 +1,194 @@
|
||||
#!/usr/bin/env pwsh
|
||||
|
||||
# run_test.ps1 - libp2p Interoperability Test Runner (PowerShell)
|
||||
# Tests py-libp2p <-> js-libp2p ping communication
|
||||
|
||||
$ErrorActionPreference = "Stop"
|
||||
|
||||
# Colors for output
|
||||
$Red = "`e[31m"
|
||||
$Green = "`e[32m"
|
||||
$Yellow = "`e[33m"
|
||||
$Blue = "`e[34m"
|
||||
$Cyan = "`e[36m"
|
||||
$Reset = "`e[0m"
|
||||
|
||||
function Write-ColorOutput {
|
||||
param([string]$Message, [string]$Color = $Reset)
|
||||
Write-Host "${Color}${Message}${Reset}"
|
||||
}
|
||||
|
||||
Write-ColorOutput "[CHECK] Checking prerequisites..." $Cyan
|
||||
if (-not (Get-Command python -ErrorAction SilentlyContinue)) {
|
||||
Write-ColorOutput "[ERROR] Python not found. Install Python 3.7+" $Red
|
||||
exit 1
|
||||
}
|
||||
if (-not (Get-Command node -ErrorAction SilentlyContinue)) {
|
||||
Write-ColorOutput "[ERROR] Node.js not found. Install Node.js 16+" $Red
|
||||
exit 1
|
||||
}
|
||||
|
||||
Write-ColorOutput "[CHECK] Checking port 8000..." $Blue
|
||||
$portCheck = netstat -a -n -o | findstr :8000
|
||||
if ($portCheck) {
|
||||
Write-ColorOutput "[ERROR] Port 8000 in use. Free the port." $Red
|
||||
Write-ColorOutput $portCheck $Yellow
|
||||
exit 1
|
||||
}
|
||||
|
||||
Write-ColorOutput "[DEBUG] Cleaning up Python processes..." $Blue
|
||||
Get-Process -Name "python" -ErrorAction SilentlyContinue | Where-Object { $_.CommandLine -like "*ping.py*" } | Stop-Process -Force -ErrorAction SilentlyContinue
|
||||
|
||||
Write-ColorOutput "[PYTHON] Starting server on port 8000..." $Yellow
|
||||
Set-Location -Path "py_node"
|
||||
$pyLogFile = "py_server_8000.log"
|
||||
$pyErrLogFile = "py_server_8000.log.err"
|
||||
$pyDebugLogFile = "ping_debug.log"
|
||||
|
||||
if (Test-Path $pyLogFile) { Remove-Item $pyLogFile -Force -ErrorAction SilentlyContinue }
|
||||
if (Test-Path $pyErrLogFile) { Remove-Item $pyErrLogFile -Force -ErrorAction SilentlyContinue }
|
||||
if (Test-Path $pyDebugLogFile) { Remove-Item $pyDebugLogFile -Force -ErrorAction SilentlyContinue }
|
||||
|
||||
$pyProcess = Start-Process -FilePath "python" -ArgumentList "-u", "ping.py", "server", "--port", "8000" -NoNewWindow -PassThru -RedirectStandardOutput $pyLogFile -RedirectStandardError $pyErrLogFile
|
||||
Write-ColorOutput "[DEBUG] Python server PID: $($pyProcess.Id)" $Blue
|
||||
Write-ColorOutput "[DEBUG] Python logs: $((Get-Location).Path)\$pyLogFile, $((Get-Location).Path)\$pyErrLogFile, $((Get-Location).Path)\$pyDebugLogFile" $Blue
|
||||
|
||||
$timeoutSeconds = 20
|
||||
$startTime = Get-Date
|
||||
$serverStarted = $false
|
||||
|
||||
while (((Get-Date) - $startTime).TotalSeconds -lt $timeoutSeconds -and -not $serverStarted) {
|
||||
if (Test-Path $pyLogFile) {
|
||||
$content = Get-Content $pyLogFile -Raw -ErrorAction SilentlyContinue
|
||||
if ($content -match "Server started|Listening") {
|
||||
$serverStarted = $true
|
||||
Write-ColorOutput "[OK] Python server started" $Green
|
||||
}
|
||||
}
|
||||
if (Test-Path $pyErrLogFile) {
|
||||
$errContent = Get-Content $pyErrLogFile -Raw -ErrorAction SilentlyContinue
|
||||
if ($errContent) {
|
||||
Write-ColorOutput "[DEBUG] Error log: $errContent" $Yellow
|
||||
}
|
||||
}
|
||||
Start-Sleep -Milliseconds 500
|
||||
}
|
||||
|
||||
if (-not $serverStarted) {
|
||||
Write-ColorOutput "[ERROR] Python server failed to start" $Red
|
||||
Write-ColorOutput "[DEBUG] Logs:" $Yellow
|
||||
if (Test-Path $pyLogFile) { Get-Content $pyLogFile | Write-ColorOutput -Color $Yellow }
|
||||
if (Test-Path $pyErrLogFile) { Get-Content $pyErrLogFile | Write-ColorOutput -Color $Yellow }
|
||||
if (Test-Path $pyDebugLogFile) { Get-Content $pyDebugLogFile | Write-ColorOutput -Color $Yellow }
|
||||
Write-ColorOutput "[DEBUG] Trying foreground run..." $Yellow
|
||||
python -u ping.py server --port 8000
|
||||
exit 1
|
||||
}
|
||||
|
||||
# Extract Peer ID
|
||||
$peerInfo = $null
|
||||
if (Test-Path $pyLogFile) {
|
||||
$content = Get-Content $pyLogFile -Raw
|
||||
$peerIdPattern = "Peer ID:\s*([A-Za-z0-9]+)"
|
||||
$peerIdMatch = [regex]::Match($content, $peerIdPattern)
|
||||
if ($peerIdMatch.Success) {
|
||||
$peerId = $peerIdMatch.Groups[1].Value
|
||||
$peerInfo = @{
|
||||
PeerId = $peerId
|
||||
MultiAddr = "/ip4/127.0.0.1/tcp/8000/p2p/$peerId"
|
||||
}
|
||||
Write-ColorOutput "[OK] Peer ID: $peerId" $Cyan
|
||||
Write-ColorOutput "[OK] MultiAddr: $($peerInfo.MultiAddr)" $Cyan
|
||||
}
|
||||
}
|
||||
|
||||
if (-not $peerInfo) {
|
||||
Write-ColorOutput "[ERROR] Could not extract Peer ID" $Red
|
||||
if (Test-Path $pyLogFile) { Get-Content $pyLogFile | Write-ColorOutput -Color $Yellow }
|
||||
if (Test-Path $pyErrLogFile) { Get-Content $pyErrLogFile | Write-ColorOutput -Color $Yellow }
|
||||
if (Test-Path $pyDebugLogFile) { Get-Content $pyDebugLogFile | Write-ColorOutput -Color $Yellow }
|
||||
Stop-Process -Id $pyProcess.Id -Force -ErrorAction SilentlyContinue
|
||||
exit 1
|
||||
}
|
||||
|
||||
# Start JavaScript client
|
||||
Write-ColorOutput "[JAVASCRIPT] Starting client..." $Yellow
|
||||
Set-Location -Path "../js_node"
|
||||
$jsLogFile = "test_js_client_to_py_server.log"
|
||||
$jsErrLogFile = "test_js_client_to_py_server.log.err"
|
||||
|
||||
if (Test-Path $jsLogFile) { Remove-Item $jsLogFile -Force -ErrorAction SilentlyContinue }
|
||||
if (Test-Path $jsErrLogFile) { Remove-Item $jsErrLogFile -Force -ErrorAction SilentlyContinue }
|
||||
|
||||
$jsProcess = Start-Process -FilePath "node" -ArgumentList "src/ping.js", "client", $peerInfo.MultiAddr, "3" -NoNewWindow -PassThru -RedirectStandardOutput $jsLogFile -RedirectStandardError $jsErrLogFile
|
||||
Write-ColorOutput "[DEBUG] JavaScript client PID: $($jsProcess.Id)" $Blue
|
||||
Write-ColorOutput "[DEBUG] Client logs: $((Get-Location).Path)\$jsLogFile, $((Get-Location).Path)\$jsErrLogFile" $Blue
|
||||
|
||||
# Wait for client to complete
|
||||
$clientTimeout = 10
|
||||
$clientStart = Get-Date
|
||||
while (-not $jsProcess.HasExited -and (((Get-Date) - $clientStart).TotalSeconds -lt $clientTimeout)) {
|
||||
Start-Sleep -Seconds 1
|
||||
}
|
||||
|
||||
if (-not $jsProcess.HasExited) {
|
||||
Write-ColorOutput "[DEBUG] JavaScript client did not exit, terminating..." $Yellow
|
||||
Stop-Process -Id $jsProcess.Id -Force -ErrorAction SilentlyContinue
|
||||
}
|
||||
|
||||
Write-ColorOutput "[CHECK] Results..." $Cyan
|
||||
$success = $false
|
||||
if (Test-Path $jsLogFile) {
|
||||
$jsLogContent = Get-Content $jsLogFile -Raw -ErrorAction SilentlyContinue
|
||||
if ($jsLogContent -match "successful|Ping.*successful") {
|
||||
$success = $true
|
||||
Write-ColorOutput "[SUCCESS] Ping test passed" $Green
|
||||
} else {
|
||||
Write-ColorOutput "[FAILED] No successful pings" $Red
|
||||
Write-ColorOutput "[DEBUG] Client log path: $((Get-Location).Path)\$jsLogFile" $Yellow
|
||||
Write-ColorOutput "Client log:" $Yellow
|
||||
Write-ColorOutput $jsLogContent $Yellow
|
||||
if (Test-Path $jsErrLogFile) {
|
||||
Write-ColorOutput "[DEBUG] Client error log path: $((Get-Location).Path)\$jsErrLogFile" $Yellow
|
||||
Write-ColorOutput "Client error log:" $Yellow
|
||||
Get-Content $jsErrLogFile | Write-ColorOutput -Color $Yellow
|
||||
}
|
||||
Write-ColorOutput "[DEBUG] Python server log path: $((Get-Location).Path)\..\py_node\$pyLogFile" $Yellow
|
||||
Write-ColorOutput "Python server log:" $Yellow
|
||||
if (Test-Path "../py_node/$pyLogFile") {
|
||||
$pyLogContent = Get-Content "../py_node/$pyLogFile" -Raw -ErrorAction SilentlyContinue
|
||||
if ($pyLogContent) { Write-ColorOutput $pyLogContent $Yellow } else { Write-ColorOutput "Empty or inaccessible" $Yellow }
|
||||
} else {
|
||||
Write-ColorOutput "File not found" $Yellow
|
||||
}
|
||||
Write-ColorOutput "[DEBUG] Python server error log path: $((Get-Location).Path)\..\py_node\$pyErrLogFile" $Yellow
|
||||
Write-ColorOutput "Python server error log:" $Yellow
|
||||
if (Test-Path "../py_node/$pyErrLogFile") {
|
||||
$pyErrLogContent = Get-Content "../py_node/$pyErrLogFile" -Raw -ErrorAction SilentlyContinue
|
||||
if ($pyErrLogContent) { Write-ColorOutput $pyErrLogContent $Yellow } else { Write-ColorOutput "Empty or inaccessible" $Yellow }
|
||||
} else {
|
||||
Write-ColorOutput "File not found" $Yellow
|
||||
}
|
||||
Write-ColorOutput "[DEBUG] Python debug log path: $((Get-Location).Path)\..\py_node\$pyDebugLogFile" $Yellow
|
||||
Write-ColorOutput "Python debug log:" $Yellow
|
||||
if (Test-Path "../py_node/$pyDebugLogFile") {
|
||||
$pyDebugLogContent = Get-Content "../py_node/$pyDebugLogFile" -Raw -ErrorAction SilentlyContinue
|
||||
if ($pyDebugLogContent) { Write-ColorOutput $pyDebugLogContent $Yellow } else { Write-ColorOutput "Empty or inaccessible" $Yellow }
|
||||
} else {
|
||||
Write-ColorOutput "File not found" $Yellow
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Write-ColorOutput "[CLEANUP] Stopping processes..." $Yellow
|
||||
Stop-Process -Id $pyProcess.Id -Force -ErrorAction SilentlyContinue
|
||||
Stop-Process -Id $jsProcess.Id -Force -ErrorAction SilentlyContinue
|
||||
Set-Location -Path "../"
|
||||
|
||||
if ($success) {
|
||||
Write-ColorOutput "[SUCCESS] Test completed" $Green
|
||||
exit 0
|
||||
} else {
|
||||
Write-ColorOutput "[FAILED] Test failed" $Red
|
||||
exit 1
|
||||
}
|
||||
215
tests/interop/js_libp2p/scripts/run_test.sh
Normal file
215
tests/interop/js_libp2p/scripts/run_test.sh
Normal file
@ -0,0 +1,215 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# run_test.sh - libp2p Interoperability Test Runner (Bash)
|
||||
# Tests py-libp2p <-> js-libp2p ping communication
|
||||
|
||||
set -e
|
||||
|
||||
# Colors for output
|
||||
RED='\033[31m'
|
||||
GREEN='\033[32m'
|
||||
YELLOW='\033[33m'
|
||||
BLUE='\033[34m'
|
||||
CYAN='\033[36m'
|
||||
RESET='\033[0m'
|
||||
|
||||
write_color_output() {
|
||||
local message="$1"
|
||||
local color="${2:-$RESET}"
|
||||
echo -e "${color}${message}${RESET}"
|
||||
}
|
||||
|
||||
write_color_output "[CHECK] Checking prerequisites..." "$CYAN"
|
||||
if ! command -v python3 &> /dev/null && ! command -v python &> /dev/null; then
|
||||
write_color_output "[ERROR] Python not found. Install Python 3.7+" "$RED"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Use python3 if available, otherwise python
|
||||
PYTHON_CMD="python3"
|
||||
if ! command -v python3 &> /dev/null; then
|
||||
PYTHON_CMD="python"
|
||||
fi
|
||||
|
||||
if ! command -v node &> /dev/null; then
|
||||
write_color_output "[ERROR] Node.js not found. Install Node.js 16+" "$RED"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
write_color_output "[CHECK] Checking port 8000..." "$BLUE"
|
||||
if netstat -tuln 2>/dev/null | grep -q ":8000 " || ss -tuln 2>/dev/null | grep -q ":8000 "; then
|
||||
write_color_output "[ERROR] Port 8000 in use. Free the port." "$RED"
|
||||
if command -v netstat &> /dev/null; then
|
||||
netstat -tuln | grep ":8000 " | write_color_output "$(cat)" "$YELLOW"
|
||||
elif command -v ss &> /dev/null; then
|
||||
ss -tuln | grep ":8000 " | write_color_output "$(cat)" "$YELLOW"
|
||||
fi
|
||||
exit 1
|
||||
fi
|
||||
|
||||
write_color_output "[DEBUG] Cleaning up Python processes..." "$BLUE"
|
||||
pkill -f "ping.py" 2>/dev/null || true
|
||||
|
||||
write_color_output "[PYTHON] Starting server on port 8000..." "$YELLOW"
|
||||
cd py_node
|
||||
|
||||
PY_LOG_FILE="py_server_8000.log"
|
||||
PY_ERR_LOG_FILE="py_server_8000.log.err"
|
||||
PY_DEBUG_LOG_FILE="ping_debug.log"
|
||||
|
||||
rm -f "$PY_LOG_FILE" "$PY_ERR_LOG_FILE" "$PY_DEBUG_LOG_FILE"
|
||||
|
||||
$PYTHON_CMD -u ping.py server --port 8000 > "$PY_LOG_FILE" 2> "$PY_ERR_LOG_FILE" &
|
||||
PY_PROCESS_PID=$!
|
||||
|
||||
write_color_output "[DEBUG] Python server PID: $PY_PROCESS_PID" "$BLUE"
|
||||
write_color_output "[DEBUG] Python logs: $(pwd)/$PY_LOG_FILE, $(pwd)/$PY_ERR_LOG_FILE, $(pwd)/$PY_DEBUG_LOG_FILE" "$BLUE"
|
||||
|
||||
TIMEOUT_SECONDS=20
|
||||
START_TIME=$(date +%s)
|
||||
SERVER_STARTED=false
|
||||
|
||||
while [ $(($(date +%s) - START_TIME)) -lt $TIMEOUT_SECONDS ] && [ "$SERVER_STARTED" = false ]; do
|
||||
if [ -f "$PY_LOG_FILE" ]; then
|
||||
if grep -q "Server started\|Listening" "$PY_LOG_FILE" 2>/dev/null; then
|
||||
SERVER_STARTED=true
|
||||
write_color_output "[OK] Python server started" "$GREEN"
|
||||
fi
|
||||
fi
|
||||
if [ -f "$PY_ERR_LOG_FILE" ] && [ -s "$PY_ERR_LOG_FILE" ]; then
|
||||
ERR_CONTENT=$(cat "$PY_ERR_LOG_FILE" 2>/dev/null || true)
|
||||
if [ -n "$ERR_CONTENT" ]; then
|
||||
write_color_output "[DEBUG] Error log: $ERR_CONTENT" "$YELLOW"
|
||||
fi
|
||||
fi
|
||||
sleep 0.5
|
||||
done
|
||||
|
||||
if [ "$SERVER_STARTED" = false ]; then
|
||||
write_color_output "[ERROR] Python server failed to start" "$RED"
|
||||
write_color_output "[DEBUG] Logs:" "$YELLOW"
|
||||
[ -f "$PY_LOG_FILE" ] && cat "$PY_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done
|
||||
[ -f "$PY_ERR_LOG_FILE" ] && cat "$PY_ERR_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done
|
||||
[ -f "$PY_DEBUG_LOG_FILE" ] && cat "$PY_DEBUG_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done
|
||||
write_color_output "[DEBUG] Trying foreground run..." "$YELLOW"
|
||||
$PYTHON_CMD -u ping.py server --port 8000
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Extract Peer ID
|
||||
PEER_ID=""
|
||||
MULTI_ADDR=""
|
||||
if [ -f "$PY_LOG_FILE" ]; then
|
||||
CONTENT=$(cat "$PY_LOG_FILE" 2>/dev/null || true)
|
||||
PEER_ID=$(echo "$CONTENT" | grep -oP "Peer ID:\s*\K[A-Za-z0-9]+" || true)
|
||||
if [ -n "$PEER_ID" ]; then
|
||||
MULTI_ADDR="/ip4/127.0.0.1/tcp/8000/p2p/$PEER_ID"
|
||||
write_color_output "[OK] Peer ID: $PEER_ID" "$CYAN"
|
||||
write_color_output "[OK] MultiAddr: $MULTI_ADDR" "$CYAN"
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ -z "$PEER_ID" ]; then
|
||||
write_color_output "[ERROR] Could not extract Peer ID" "$RED"
|
||||
[ -f "$PY_LOG_FILE" ] && cat "$PY_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done
|
||||
[ -f "$PY_ERR_LOG_FILE" ] && cat "$PY_ERR_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done
|
||||
[ -f "$PY_DEBUG_LOG_FILE" ] && cat "$PY_DEBUG_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done
|
||||
kill $PY_PROCESS_PID 2>/dev/null || true
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Start JavaScript client
|
||||
write_color_output "[JAVASCRIPT] Starting client..." "$YELLOW"
|
||||
cd ../js_node
|
||||
|
||||
JS_LOG_FILE="test_js_client_to_py_server.log"
|
||||
JS_ERR_LOG_FILE="test_js_client_to_py_server.log.err"
|
||||
|
||||
rm -f "$JS_LOG_FILE" "$JS_ERR_LOG_FILE"
|
||||
|
||||
node src/ping.js client "$MULTI_ADDR" 3 > "$JS_LOG_FILE" 2> "$JS_ERR_LOG_FILE" &
|
||||
JS_PROCESS_PID=$!
|
||||
|
||||
write_color_output "[DEBUG] JavaScript client PID: $JS_PROCESS_PID" "$BLUE"
|
||||
write_color_output "[DEBUG] Client logs: $(pwd)/$JS_LOG_FILE, $(pwd)/$JS_ERR_LOG_FILE" "$BLUE"
|
||||
|
||||
# Wait for client to complete
|
||||
CLIENT_TIMEOUT=10
|
||||
CLIENT_START=$(date +%s)
|
||||
while kill -0 $JS_PROCESS_PID 2>/dev/null && [ $(($(date +%s) - CLIENT_START)) -lt $CLIENT_TIMEOUT ]; do
|
||||
sleep 1
|
||||
done
|
||||
|
||||
if kill -0 $JS_PROCESS_PID 2>/dev/null; then
|
||||
write_color_output "[DEBUG] JavaScript client did not exit, terminating..." "$YELLOW"
|
||||
kill $JS_PROCESS_PID 2>/dev/null || true
|
||||
fi
|
||||
|
||||
write_color_output "[CHECK] Results..." "$CYAN"
|
||||
SUCCESS=false
|
||||
if [ -f "$JS_LOG_FILE" ]; then
|
||||
JS_LOG_CONTENT=$(cat "$JS_LOG_FILE" 2>/dev/null || true)
|
||||
if echo "$JS_LOG_CONTENT" | grep -q "successful\|Ping.*successful"; then
|
||||
SUCCESS=true
|
||||
write_color_output "[SUCCESS] Ping test passed" "$GREEN"
|
||||
else
|
||||
write_color_output "[FAILED] No successful pings" "$RED"
|
||||
write_color_output "[DEBUG] Client log path: $(pwd)/$JS_LOG_FILE" "$YELLOW"
|
||||
write_color_output "Client log:" "$YELLOW"
|
||||
write_color_output "$JS_LOG_CONTENT" "$YELLOW"
|
||||
if [ -f "$JS_ERR_LOG_FILE" ]; then
|
||||
write_color_output "[DEBUG] Client error log path: $(pwd)/$JS_ERR_LOG_FILE" "$YELLOW"
|
||||
write_color_output "Client error log:" "$YELLOW"
|
||||
cat "$JS_ERR_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done
|
||||
fi
|
||||
write_color_output "[DEBUG] Python server log path: $(pwd)/../py_node/$PY_LOG_FILE" "$YELLOW"
|
||||
write_color_output "Python server log:" "$YELLOW"
|
||||
if [ -f "../py_node/$PY_LOG_FILE" ]; then
|
||||
PY_LOG_CONTENT=$(cat "../py_node/$PY_LOG_FILE" 2>/dev/null || true)
|
||||
if [ -n "$PY_LOG_CONTENT" ]; then
|
||||
write_color_output "$PY_LOG_CONTENT" "$YELLOW"
|
||||
else
|
||||
write_color_output "Empty or inaccessible" "$YELLOW"
|
||||
fi
|
||||
else
|
||||
write_color_output "File not found" "$YELLOW"
|
||||
fi
|
||||
write_color_output "[DEBUG] Python server error log path: $(pwd)/../py_node/$PY_ERR_LOG_FILE" "$YELLOW"
|
||||
write_color_output "Python server error log:" "$YELLOW"
|
||||
if [ -f "../py_node/$PY_ERR_LOG_FILE" ]; then
|
||||
PY_ERR_LOG_CONTENT=$(cat "../py_node/$PY_ERR_LOG_FILE" 2>/dev/null || true)
|
||||
if [ -n "$PY_ERR_LOG_CONTENT" ]; then
|
||||
write_color_output "$PY_ERR_LOG_CONTENT" "$YELLOW"
|
||||
else
|
||||
write_color_output "Empty or inaccessible" "$YELLOW"
|
||||
fi
|
||||
else
|
||||
write_color_output "File not found" "$YELLOW"
|
||||
fi
|
||||
write_color_output "[DEBUG] Python debug log path: $(pwd)/../py_node/$PY_DEBUG_LOG_FILE" "$YELLOW"
|
||||
write_color_output "Python debug log:" "$YELLOW"
|
||||
if [ -f "../py_node/$PY_DEBUG_LOG_FILE" ]; then
|
||||
PY_DEBUG_LOG_CONTENT=$(cat "../py_node/$PY_DEBUG_LOG_FILE" 2>/dev/null || true)
|
||||
if [ -n "$PY_DEBUG_LOG_CONTENT" ]; then
|
||||
write_color_output "$PY_DEBUG_LOG_CONTENT" "$YELLOW"
|
||||
else
|
||||
write_color_output "Empty or inaccessible" "$YELLOW"
|
||||
fi
|
||||
else
|
||||
write_color_output "File not found" "$YELLOW"
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
|
||||
write_color_output "[CLEANUP] Stopping processes..." "$YELLOW"
|
||||
kill $PY_PROCESS_PID 2>/dev/null || true
|
||||
kill $JS_PROCESS_PID 2>/dev/null || true
|
||||
cd ../
|
||||
|
||||
if [ "$SUCCESS" = true ]; then
|
||||
write_color_output "[SUCCESS] Test completed" "$GREEN"
|
||||
exit 0
|
||||
else
|
||||
write_color_output "[FAILED] Test failed" "$RED"
|
||||
exit 1
|
||||
fi
|
||||
@ -1,5 +0,0 @@
|
||||
def test_js_libp2p_placeholder():
|
||||
"""
|
||||
Placeholder test for js-libp2p interop tests.
|
||||
"""
|
||||
assert True, "Placeholder test for js-libp2p interop tests"
|
||||
Reference in New Issue
Block a user