5 Commits

59 changed files with 171 additions and 3023 deletions

View File

@ -3,65 +3,6 @@ Release Notes
.. towncrier release notes start .. towncrier release notes start
py-libp2p v0.2.9 (2025-07-09)
-----------------------------
Breaking Changes
~~~~~~~~~~~~~~~~
- Reordered the arguments to ``upgrade_security`` to place ``is_initiator`` before ``peer_id``, and made ``peer_id`` optional.
This allows the method to reflect the fact that peer identity is not required for inbound connections. (`#681 <https://github.com/libp2p/py-libp2p/issues/681>`__)
Bugfixes
~~~~~~~~
- Add timeout wrappers in:
1. ``multiselect.py``: ``negotiate`` function
2. ``multiselect_client.py``: ``select_one_of`` , ``query_multistream_command`` functions
to prevent indefinite hangs when a remote peer does not respond. (`#696 <https://github.com/libp2p/py-libp2p/issues/696>`__)
- Align stream creation logic with yamux specification (`#701 <https://github.com/libp2p/py-libp2p/issues/701>`__)
- Fixed an issue in ``Pubsub`` where async validators were not handled reliably under concurrency. Now uses a safe aggregator list for consistent behavior. (`#702 <https://github.com/libp2p/py-libp2p/issues/702>`__)
Features
~~~~~~~~
- Added support for ``Kademlia DHT`` in py-libp2p. (`#579 <https://github.com/libp2p/py-libp2p/issues/579>`__)
- Limit concurrency in ``push_identify_to_peers`` to prevent resource congestion under high peer counts. (`#621 <https://github.com/libp2p/py-libp2p/issues/621>`__)
- Store public key and peer ID in peerstore during handshake
Modified the InsecureTransport class to accept an optional peerstore parameter and updated the handshake process to store the received public key and peer ID in the peerstore when available.
Added test cases to verify:
1. The peerstore remains unchanged when handshake fails due to peer ID mismatch
2. The handshake correctly adds a public key to a peer ID that already exists in the peerstore but doesn't have a public key yet (`#631 <https://github.com/libp2p/py-libp2p/issues/631>`__)
- Fixed several flow-control and concurrency issues in the ``YamuxStream`` class. Previously, stress-testing revealed that transferring data over ``DEFAULT_WINDOW_SIZE`` would break the stream due to inconsistent window update handling and lock management. The fixes include:
- Removed sending of window updates during writes to maintain correct flow-control.
- Added proper timeout handling when releasing and acquiring locks to prevent concurrency errors.
- Corrected the ``read`` function to properly handle window updates for both ``read_until_EOF`` and ``read_n_bytes``.
- Added event logging at ``send_window_updates`` and ``waiting_for_window_updates`` for better observability. (`#639 <https://github.com/libp2p/py-libp2p/issues/639>`__)
- Added support for ``Multicast DNS`` in py-libp2p (`#649 <https://github.com/libp2p/py-libp2p/issues/649>`__)
- Optimized pubsub publishing to send multiple topics in a single message instead of separate messages per topic. (`#685 <https://github.com/libp2p/py-libp2p/issues/685>`__)
- Optimized pubsub message writing by implementing a write_msg() method that uses pre-allocated buffers and single write operations, improving performance by eliminating separate varint prefix encoding and write operations in FloodSub and GossipSub. (`#687 <https://github.com/libp2p/py-libp2p/issues/687>`__)
- Added peer exchange and backoff logic as part of Gossipsub v1.1 upgrade (`#690 <https://github.com/libp2p/py-libp2p/issues/690>`__)
Internal Changes - for py-libp2p Contributors
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- Added sparse connect utility function to pubsub test utilities for creating test networks with configurable connectivity. (`#679 <https://github.com/libp2p/py-libp2p/issues/679>`__)
- Added comprehensive tests for pubsub connection utility functions to verify degree limits are enforced, excess peers are handled correctly, and edge cases (degree=0, negative values, empty lists) are managed gracefully. (`#707 <https://github.com/libp2p/py-libp2p/issues/707>`__)
- Added extra tests for identify push concurrency cap under high peer load (`#708 <https://github.com/libp2p/py-libp2p/issues/708>`__)
Miscellaneous Changes
~~~~~~~~~~~~~~~~~~~~~
- `#678 <https://github.com/libp2p/py-libp2p/issues/678>`__, `#684 <https://github.com/libp2p/py-libp2p/issues/684>`__
py-libp2p v0.2.8 (2025-06-10) py-libp2p v0.2.8 (2025-06-10)
----------------------------- -----------------------------

View File

@ -8,10 +8,9 @@ import trio
from libp2p import ( from libp2p import (
new_host, new_host,
) )
from libp2p.identity.identify.identify import ( from libp2p.identity.identify.identify import ID as IDENTIFY_PROTOCOL_ID
ID as IDENTIFY_PROTOCOL_ID, from libp2p.identity.identify.pb.identify_pb2 import (
identify_handler_for, Identify,
parse_identify_response,
) )
from libp2p.peer.peerinfo import ( from libp2p.peer.peerinfo import (
info_from_p2p_addr, info_from_p2p_addr,
@ -51,7 +50,7 @@ def print_identify_response(identify_response):
) )
async def run(port: int, destination: str, use_varint_format: bool = True) -> None: async def run(port: int, destination: str) -> None:
localhost_ip = "0.0.0.0" localhost_ip = "0.0.0.0"
if not destination: if not destination:
@ -59,24 +58,11 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No
listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}") listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}")
host_a = new_host() host_a = new_host()
# Set up identify handler with specified format
identify_handler = identify_handler_for(
host_a, use_varint_format=use_varint_format
)
host_a.set_stream_handler(IDENTIFY_PROTOCOL_ID, identify_handler)
async with host_a.run(listen_addrs=[listen_addr]): async with host_a.run(listen_addrs=[listen_addr]):
# Get the actual address and replace 0.0.0.0 with 127.0.0.1 for client
# connections
server_addr = str(host_a.get_addrs()[0])
client_addr = server_addr.replace("/ip4/0.0.0.0/", "/ip4/127.0.0.1/")
format_name = "length-prefixed" if use_varint_format else "raw protobuf"
print( print(
f"First host listening (using {format_name} format). " "First host listening. Run this from another console:\n\n"
f"Run this from another console:\n\n"
f"identify-demo " f"identify-demo "
f"-d {client_addr}\n" f"-d {host_a.get_addrs()[0]}\n"
) )
print("Waiting for incoming identify request...") print("Waiting for incoming identify request...")
await trio.sleep_forever() await trio.sleep_forever()
@ -98,18 +84,11 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No
try: try:
print("Starting identify protocol...") print("Starting identify protocol...")
response = await stream.read()
# Read the complete response (could be either format)
# Read a larger chunk to get all the data before stream closes
response = await stream.read(8192) # Read enough data in one go
await stream.close() await stream.close()
identify_msg = Identify()
# Parse the response using the robust protocol-level function identify_msg.ParseFromString(response)
# This handles both old and new formats automatically
identify_msg = parse_identify_response(response)
print_identify_response(identify_msg) print_identify_response(identify_msg)
except Exception as e: except Exception as e:
print(f"Identify protocol error: {e}") print(f"Identify protocol error: {e}")
@ -119,12 +98,9 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No
def main() -> None: def main() -> None:
description = """ description = """
This program demonstrates the libp2p identify protocol. This program demonstrates the libp2p identify protocol.
First run 'identify-demo -p <PORT> [--raw-format]' to start a listener. First run identify-demo -p <PORT>' to start a listener.
Then run 'identify-demo <ANOTHER_PORT> -d <DESTINATION>' Then run 'identify-demo <ANOTHER_PORT> -d <DESTINATION>'
where <DESTINATION> is the multiaddress shown by the listener. where <DESTINATION> is the multiaddress shown by the listener.
Use --raw-format to send raw protobuf messages (old format) instead of
length-prefixed protobuf messages (new format, default).
""" """
example_maddr = ( example_maddr = (
@ -139,22 +115,10 @@ def main() -> None:
type=str, type=str,
help=f"destination multiaddr string, e.g. {example_maddr}", help=f"destination multiaddr string, e.g. {example_maddr}",
) )
parser.add_argument(
"--raw-format",
action="store_true",
help=(
"use raw protobuf format (old format) instead of "
"length-prefixed (new format)"
),
)
args = parser.parse_args() args = parser.parse_args()
# Determine format: raw format if --raw-format is specified, otherwise
# length-prefixed
use_varint_format = not args.raw_format
try: try:
trio.run(run, *(args.port, args.destination, use_varint_format)) trio.run(run, *(args.port, args.destination))
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass

View File

@ -57,56 +57,18 @@ from libp2p.peer.peerinfo import (
logger = logging.getLogger("libp2p.identity.identify-push-example") logger = logging.getLogger("libp2p.identity.identify-push-example")
def custom_identify_push_handler_for(host, use_varint_format: bool = True): def custom_identify_push_handler_for(host):
""" """
Create a custom handler for the identify/push protocol that logs and prints Create a custom handler for the identify/push protocol that logs and prints
the identity information received from the dialer. the identity information received from the dialer.
Args:
host: The libp2p host
use_varint_format: If True, expect length-prefixed format; if False, expect
raw protobuf
""" """
async def handle_identify_push(stream: INetStream) -> None: async def handle_identify_push(stream: INetStream) -> None:
peer_id = stream.muxed_conn.peer_id peer_id = stream.muxed_conn.peer_id
try: try:
if use_varint_format: # Read the identify message from the stream
# Read length-prefixed identify message from the stream data = await stream.read()
from libp2p.utils.varint import decode_varint_from_bytes
# First read the varint length prefix
length_bytes = b""
while True:
b = await stream.read(1)
if not b:
break
length_bytes += b
if b[0] & 0x80 == 0:
break
if not length_bytes:
logger.warning("No length prefix received from peer %s", peer_id)
return
msg_length = decode_varint_from_bytes(length_bytes)
# Read the protobuf message
data = await stream.read(msg_length)
if len(data) != msg_length:
logger.warning("Incomplete message received from peer %s", peer_id)
return
else:
# Read raw protobuf message from the stream
data = b""
while True:
chunk = await stream.read(4096)
if not chunk:
break
data += chunk
identify_msg = Identify() identify_msg = Identify()
identify_msg.ParseFromString(data) identify_msg.ParseFromString(data)
@ -167,13 +129,9 @@ def custom_identify_push_handler_for(host, use_varint_format: bool = True):
return handle_identify_push return handle_identify_push
async def run_listener(port: int, use_varint_format: bool = True) -> None: async def run_listener(port: int) -> None:
"""Run a host in listener mode.""" """Run a host in listener mode."""
format_name = "length-prefixed" if use_varint_format else "raw protobuf" print(f"\n==== Starting Identify-Push Listener on port {port} ====\n")
print(
f"\n==== Starting Identify-Push Listener on port {port} "
f"(using {format_name} format) ====\n"
)
# Create key pair for the listener # Create key pair for the listener
key_pair = create_new_key_pair() key_pair = create_new_key_pair()
@ -181,14 +139,9 @@ async def run_listener(port: int, use_varint_format: bool = True) -> None:
# Create the listener host # Create the listener host
host = new_host(key_pair=key_pair) host = new_host(key_pair=key_pair)
# Set up the identify and identify/push handlers with specified format # Set up the identify and identify/push handlers
host.set_stream_handler( host.set_stream_handler(ID_IDENTIFY, identify_handler_for(host))
ID_IDENTIFY, identify_handler_for(host, use_varint_format=use_varint_format) host.set_stream_handler(ID_IDENTIFY_PUSH, custom_identify_push_handler_for(host))
)
host.set_stream_handler(
ID_IDENTIFY_PUSH,
identify_push_handler_for(host, use_varint_format=use_varint_format),
)
# Start listening # Start listening
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
@ -212,15 +165,9 @@ async def run_listener(port: int, use_varint_format: bool = True) -> None:
await trio.sleep_forever() await trio.sleep_forever()
async def run_dialer( async def run_dialer(port: int, destination: str) -> None:
port: int, destination: str, use_varint_format: bool = True
) -> None:
"""Run a host in dialer mode that connects to a listener.""" """Run a host in dialer mode that connects to a listener."""
format_name = "length-prefixed" if use_varint_format else "raw protobuf" print(f"\n==== Starting Identify-Push Dialer on port {port} ====\n")
print(
f"\n==== Starting Identify-Push Dialer on port {port} "
f"(using {format_name} format) ====\n"
)
# Create key pair for the dialer # Create key pair for the dialer
key_pair = create_new_key_pair() key_pair = create_new_key_pair()
@ -228,14 +175,9 @@ async def run_dialer(
# Create the dialer host # Create the dialer host
host = new_host(key_pair=key_pair) host = new_host(key_pair=key_pair)
# Set up the identify and identify/push handlers with specified format # Set up the identify and identify/push handlers
host.set_stream_handler( host.set_stream_handler(ID_IDENTIFY, identify_handler_for(host))
ID_IDENTIFY, identify_handler_for(host, use_varint_format=use_varint_format) host.set_stream_handler(ID_IDENTIFY_PUSH, identify_push_handler_for(host))
)
host.set_stream_handler(
ID_IDENTIFY_PUSH,
identify_push_handler_for(host, use_varint_format=use_varint_format),
)
# Start listening on a different port # Start listening on a different port
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
@ -264,9 +206,7 @@ async def run_dialer(
try: try:
# Call push_identify_to_peer which returns a boolean # Call push_identify_to_peer which returns a boolean
success = await push_identify_to_peer( success = await push_identify_to_peer(host, peer_info.peer_id)
host, peer_info.peer_id, use_varint_format=use_varint_format
)
if success: if success:
logger.info("Identify push completed successfully!") logger.info("Identify push completed successfully!")
@ -300,40 +240,29 @@ def main() -> None:
This program demonstrates the libp2p identify/push protocol. This program demonstrates the libp2p identify/push protocol.
Without arguments, it runs as a listener on random port. Without arguments, it runs as a listener on random port.
With -d parameter, it runs as a dialer on random port. With -d parameter, it runs as a dialer on random port.
Use --raw-format to send raw protobuf messages (old format) instead of
length-prefixed protobuf messages (new format, default).
""" """
example = (
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
)
parser = argparse.ArgumentParser(description=description) parser = argparse.ArgumentParser(description=description)
parser.add_argument("-p", "--port", default=0, type=int, help="source port number") parser.add_argument("-p", "--port", default=0, type=int, help="source port number")
parser.add_argument( parser.add_argument(
"-d", "-d",
"--destination", "--destination",
type=str, type=str,
help="destination multiaddr string", help=f"destination multiaddr string, e.g. {example}",
)
parser.add_argument(
"--raw-format",
action="store_true",
help=(
"use raw protobuf format (old format) instead of "
"length-prefixed (new format)"
),
) )
args = parser.parse_args() args = parser.parse_args()
# Determine format: raw format if --raw-format is specified, otherwise
# length-prefixed
use_varint_format = not args.raw_format
try: try:
if args.destination: if args.destination:
# Run in dialer mode with random available port if not specified # Run in dialer mode with random available port if not specified
trio.run(run_dialer, args.port, args.destination, use_varint_format) trio.run(run_dialer, args.port, args.destination)
else: else:
# Run in listener mode with random available port if not specified # Run in listener mode with random available port if not specified
trio.run(run_listener, args.port, use_varint_format) trio.run(run_listener, args.port)
except KeyboardInterrupt: except KeyboardInterrupt:
print("\nInterrupted by user") print("\nInterrupted by user")
logger.info("Interrupted by user") logger.info("Interrupted by user")

View File

@ -50,11 +50,6 @@ if TYPE_CHECKING:
Pubsub, Pubsub,
) )
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from libp2p.protocol_muxer.multiselect import Multiselect
from libp2p.pubsub.pb import ( from libp2p.pubsub.pb import (
rpc_pb2, rpc_pb2,
) )
@ -1550,8 +1545,9 @@ class IHost(ABC):
""" """
# FIXME: Replace with correct return type
@abstractmethod @abstractmethod
def get_mux(self) -> "Multiselect": def get_mux(self) -> Any:
""" """
Retrieve the muxer instance for the host. Retrieve the muxer instance for the host.
@ -2162,7 +2158,6 @@ class IMultiselectMuxer(ABC):
""" """
@abstractmethod
def get_protocols(self) -> tuple[TProtocol | None, ...]: def get_protocols(self) -> tuple[TProtocol | None, ...]:
""" """
Retrieve the protocols for which handlers have been registered. Retrieve the protocols for which handlers have been registered.
@ -2173,6 +2168,7 @@ class IMultiselectMuxer(ABC):
A tuple of registered protocol names. A tuple of registered protocol names.
""" """
return tuple(self.handlers.keys())
@abstractmethod @abstractmethod
async def negotiate( async def negotiate(

View File

@ -26,8 +26,5 @@ if TYPE_CHECKING:
def get_default_protocols(host: IHost) -> "OrderedDict[TProtocol, StreamHandlerFn]": def get_default_protocols(host: IHost) -> "OrderedDict[TProtocol, StreamHandlerFn]":
return OrderedDict( return OrderedDict(
( ((IdentifyID, identify_handler_for(host)), (PingID, handle_ping))
(IdentifyID, identify_handler_for(host, use_varint_format=True)),
(PingID, handle_ping),
)
) )

View File

@ -16,9 +16,7 @@ from libp2p.network.stream.exceptions import (
StreamClosed, StreamClosed,
) )
from libp2p.utils import ( from libp2p.utils import (
decode_varint_with_size,
get_agent_version, get_agent_version,
varint,
) )
from .pb.identify_pb2 import ( from .pb.identify_pb2 import (
@ -61,7 +59,7 @@ def _mk_identify_protobuf(
) -> Identify: ) -> Identify:
public_key = host.get_public_key() public_key = host.get_public_key()
laddrs = host.get_addrs() laddrs = host.get_addrs()
protocols = tuple(str(p) for p in host.get_mux().get_protocols() if p is not None) protocols = host.get_mux().get_protocols()
observed_addr = observed_multiaddr.to_bytes() if observed_multiaddr else b"" observed_addr = observed_multiaddr.to_bytes() if observed_multiaddr else b""
return Identify( return Identify(
@ -74,47 +72,7 @@ def _mk_identify_protobuf(
) )
def parse_identify_response(response: bytes) -> Identify: def identify_handler_for(host: IHost) -> StreamHandlerFn:
"""
Parse identify response that could be either:
- Old format: raw protobuf
- New format: length-prefixed protobuf
This function provides backward and forward compatibility.
"""
# Try new format first: length-prefixed protobuf
if len(response) >= 1:
length, varint_size = decode_varint_with_size(response)
if varint_size > 0 and length > 0 and varint_size + length <= len(response):
protobuf_data = response[varint_size : varint_size + length]
try:
identify_response = Identify()
identify_response.ParseFromString(protobuf_data)
# Sanity check: must have agent_version (protocol_version is optional)
if identify_response.agent_version:
logger.debug(
"Parsed length-prefixed identify response (new format)"
)
return identify_response
except Exception:
pass # Fall through to old format
# Fall back to old format: raw protobuf
try:
identify_response = Identify()
identify_response.ParseFromString(response)
logger.debug("Parsed raw protobuf identify response (old format)")
return identify_response
except Exception as e:
logger.error(f"Failed to parse identify response: {e}")
logger.error(f"Response length: {len(response)}")
logger.error(f"Response hex: {response.hex()}")
raise
def identify_handler_for(
host: IHost, use_varint_format: bool = False
) -> StreamHandlerFn:
async def handle_identify(stream: INetStream) -> None: async def handle_identify(stream: INetStream) -> None:
# get observed address from ``stream`` # get observed address from ``stream``
peer_id = ( peer_id = (
@ -142,21 +100,7 @@ def identify_handler_for(
response = protobuf.SerializeToString() response = protobuf.SerializeToString()
try: try:
if use_varint_format: await stream.write(response)
# Send length-prefixed protobuf message (new format)
await stream.write(varint.encode_uvarint(len(response)))
await stream.write(response)
logger.debug(
"Sent new format (length-prefixed) identify response to %s",
peer_id,
)
else:
# Send raw protobuf message (old format for backward compatibility)
await stream.write(response)
logger.debug(
"Sent old format (raw protobuf) identify response to %s",
peer_id,
)
except StreamClosed: except StreamClosed:
logger.debug("Fail to respond to %s request: stream closed", ID) logger.debug("Fail to respond to %s request: stream closed", ID)
else: else:

View File

@ -25,10 +25,6 @@ from libp2p.peer.id import (
) )
from libp2p.utils import ( from libp2p.utils import (
get_agent_version, get_agent_version,
varint,
)
from libp2p.utils.varint import (
decode_varint_from_bytes,
) )
from ..identify.identify import ( from ..identify.identify import (
@ -47,69 +43,20 @@ AGENT_VERSION = get_agent_version()
CONCURRENCY_LIMIT = 10 CONCURRENCY_LIMIT = 10
def identify_push_handler_for( def identify_push_handler_for(host: IHost) -> StreamHandlerFn:
host: IHost, use_varint_format: bool = True
) -> StreamHandlerFn:
""" """
Create a handler for the identify/push protocol. Create a handler for the identify/push protocol.
This handler receives pushed identify messages from remote peers and updates This handler receives pushed identify messages from remote peers and updates
the local peerstore with the new information. the local peerstore with the new information.
Args:
host: The libp2p host.
use_varint_format: True=length-prefixed, False=raw protobuf.
""" """
async def handle_identify_push(stream: INetStream) -> None: async def handle_identify_push(stream: INetStream) -> None:
peer_id = stream.muxed_conn.peer_id peer_id = stream.muxed_conn.peer_id
try: try:
if use_varint_format: # Read the identify message from the stream
# Read length-prefixed identify message from the stream data = await stream.read()
# First read the varint length prefix
length_bytes = b""
while True:
b = await stream.read(1)
if not b:
break
length_bytes += b
if b[0] & 0x80 == 0:
break
if not length_bytes:
logger.warning("No length prefix received from peer %s", peer_id)
return
msg_length = decode_varint_from_bytes(length_bytes)
# Read the protobuf message
data = await stream.read(msg_length)
if len(data) != msg_length:
logger.warning("Incomplete message received from peer %s", peer_id)
return
else:
# Read raw protobuf message from the stream
# For raw format, we need to read all data before the stream is closed
data = b""
try:
# Read all available data in a single operation
data = await stream.read()
except StreamClosed:
# Try to read any remaining data
try:
data = await stream.read()
except Exception:
pass
# If we got no data, log a warning and return
if not data:
logger.warning(
"No data received in raw format from peer %s", peer_id
)
return
identify_msg = Identify() identify_msg = Identify()
identify_msg.ParseFromString(data) identify_msg.ParseFromString(data)
@ -190,7 +137,6 @@ async def push_identify_to_peer(
peer_id: ID, peer_id: ID,
observed_multiaddr: Multiaddr | None = None, observed_multiaddr: Multiaddr | None = None,
limit: trio.Semaphore = trio.Semaphore(CONCURRENCY_LIMIT), limit: trio.Semaphore = trio.Semaphore(CONCURRENCY_LIMIT),
use_varint_format: bool = True,
) -> bool: ) -> bool:
""" """
Push an identify message to a specific peer. Push an identify message to a specific peer.
@ -198,15 +144,10 @@ async def push_identify_to_peer(
This function opens a stream to the peer using the identify/push protocol, This function opens a stream to the peer using the identify/push protocol,
sends the identify message, and closes the stream. sends the identify message, and closes the stream.
Args: Returns
host: The libp2p host. -------
peer_id: The peer ID to push to. bool
observed_multiaddr: The observed multiaddress (optional). True if the push was successful, False otherwise.
limit: Semaphore for concurrency control.
use_varint_format: True=length-prefixed, False=raw protobuf.
Returns:
bool: True if the push was successful, False otherwise.
""" """
async with limit: async with limit:
@ -218,13 +159,8 @@ async def push_identify_to_peer(
identify_msg = _mk_identify_protobuf(host, observed_multiaddr) identify_msg = _mk_identify_protobuf(host, observed_multiaddr)
response = identify_msg.SerializeToString() response = identify_msg.SerializeToString()
if use_varint_format: # Send the identify message
# Send length-prefixed identify message await stream.write(response)
await stream.write(varint.encode_uvarint(len(response)))
await stream.write(response)
else:
# Send raw protobuf message
await stream.write(response)
# Close the stream # Close the stream
await stream.close() await stream.close()
@ -240,36 +176,18 @@ async def push_identify_to_peers(
host: IHost, host: IHost,
peer_ids: set[ID] | None = None, peer_ids: set[ID] | None = None,
observed_multiaddr: Multiaddr | None = None, observed_multiaddr: Multiaddr | None = None,
use_varint_format: bool = True,
) -> None: ) -> None:
""" """
Push an identify message to multiple peers in parallel. Push an identify message to multiple peers in parallel.
If peer_ids is None, push to all connected peers. If peer_ids is None, push to all connected peers.
Args:
host: The libp2p host.
peer_ids: Set of peer IDs to push to (if None, push to all connected peers).
observed_multiaddr: The observed multiaddress (optional).
use_varint_format: True=length-prefixed, False=raw protobuf.
""" """
if peer_ids is None: if peer_ids is None:
# Get all connected peers # Get all connected peers
peer_ids = set(host.get_connected_peers()) peer_ids = set(host.get_connected_peers())
# Create a single shared semaphore for concurrency control
limit = trio.Semaphore(CONCURRENCY_LIMIT)
# Push to each peer in parallel using a trio.Nursery # Push to each peer in parallel using a trio.Nursery
# limiting concurrent connections to CONCURRENCY_LIMIT # limiting concurrent connections to 10
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
for peer_id in peer_ids: for peer_id in peer_ids:
nursery.start_soon( nursery.start_soon(push_identify_to_peer, host, peer_id, observed_multiaddr)
push_identify_to_peer,
host,
peer_id,
observed_multiaddr,
limit,
use_varint_format,
)

View File

@ -3,11 +3,9 @@ from collections.abc import (
) )
from typing import ( from typing import (
Any, Any,
cast,
) )
import multiaddr import multiaddr
from multiaddr.protocols import Protocol
from .id import ( from .id import (
ID, ID,
@ -44,8 +42,7 @@ def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo:
p2p_protocols = p2p_part.protocols() p2p_protocols = p2p_part.protocols()
if not p2p_protocols: if not p2p_protocols:
raise InvalidAddrError("The last part of the address has no protocols") raise InvalidAddrError("The last part of the address has no protocols")
last_protocol = cast(Protocol, p2p_part.protocols()[0]) last_protocol = p2p_protocols[0]
if last_protocol is None: if last_protocol is None:
raise InvalidAddrError("The last protocol is None") raise InvalidAddrError("The last protocol is None")

View File

@ -64,11 +64,7 @@ class PeerStore(IPeerStore):
return list(self.peer_data_map.keys()) return list(self.peer_data_map.keys())
def clear_peerdata(self, peer_id: ID) -> None: def clear_peerdata(self, peer_id: ID) -> None:
"""Clears all data associated with the given peer_id.""" """Clears the peer data of the peer"""
if peer_id in self.peer_data_map:
del self.peer_data_map[peer_id]
else:
raise PeerStoreError("peer ID not found")
def valid_peer_ids(self) -> list[ID]: def valid_peer_ids(self) -> list[ID]:
""" """

View File

@ -101,18 +101,6 @@ class Multiselect(IMultiselectMuxer):
except trio.TooSlowError: except trio.TooSlowError:
raise MultiselectError("handshake read timeout") raise MultiselectError("handshake read timeout")
def get_protocols(self) -> tuple[TProtocol | None, ...]:
"""
Retrieve the protocols for which handlers have been registered.
Returns
-------
tuple[TProtocol, ...]
A tuple of registered protocol names.
"""
return tuple(self.handlers.keys())
async def handshake(self, communicator: IMultiselectCommunicator) -> None: async def handshake(self, communicator: IMultiselectCommunicator) -> None:
""" """
Perform handshake to agree on multiselect protocol. Perform handshake to agree on multiselect protocol.

View File

@ -11,6 +11,10 @@ import functools
import hashlib import hashlib
import logging import logging
import time import time
from typing import (
NamedTuple,
cast,
)
import base58 import base58
import trio import trio
@ -26,6 +30,8 @@ from libp2p.crypto.keys import (
PrivateKey, PrivateKey,
) )
from libp2p.custom_types import ( from libp2p.custom_types import (
AsyncValidatorFn,
SyncValidatorFn,
TProtocol, TProtocol,
ValidatorFn, ValidatorFn,
) )
@ -71,11 +77,6 @@ from .pubsub_notifee import (
from .subscription import ( from .subscription import (
TrioSubscriptionAPI, TrioSubscriptionAPI,
) )
from .validation_throttler import (
TopicValidator,
ValidationResult,
ValidationThrottler,
)
from .validators import ( from .validators import (
PUBSUB_SIGNING_PREFIX, PUBSUB_SIGNING_PREFIX,
signature_validator, signature_validator,
@ -96,6 +97,11 @@ def get_content_addressed_msg_id(msg: rpc_pb2.Message) -> bytes:
return base64.b64encode(hashlib.sha256(msg.data).digest()) return base64.b64encode(hashlib.sha256(msg.data).digest())
class TopicValidator(NamedTuple):
validator: ValidatorFn
is_async: bool
class Pubsub(Service, IPubsub): class Pubsub(Service, IPubsub):
host: IHost host: IHost
@ -137,11 +143,6 @@ class Pubsub(Service, IPubsub):
msg_id_constructor: Callable[ msg_id_constructor: Callable[
[rpc_pb2.Message], bytes [rpc_pb2.Message], bytes
] = get_peer_and_seqno_msg_id, ] = get_peer_and_seqno_msg_id,
# TODO: these values have been copied from Go, but try to tune these dynamically
validation_queue_size: int = 32,
global_throttle_limit: int = 8192,
default_topic_throttle_limit: int = 1024,
validation_worker_count: int | None = None,
) -> None: ) -> None:
""" """
Construct a new Pubsub object, which is responsible for handling all Construct a new Pubsub object, which is responsible for handling all
@ -202,15 +203,7 @@ class Pubsub(Service, IPubsub):
# Create peers map, which maps peer_id (as string) to stream (to a given peer) # Create peers map, which maps peer_id (as string) to stream (to a given peer)
self.peers = {} self.peers = {}
# Validation Throttler # Map of topic to topic validator
self.validation_throttler = ValidationThrottler(
queue_size=validation_queue_size,
global_throttle_limit=global_throttle_limit,
default_topic_throttle_limit=default_topic_throttle_limit,
worker_count=validation_worker_count or 4,
)
# Keep a mapping of topic -> TopicValidator for easier lookup
self.topic_validators = {} self.topic_validators = {}
self.counter = int(time.time()) self.counter = int(time.time())
@ -222,19 +215,10 @@ class Pubsub(Service, IPubsub):
self.event_handle_dead_peer_queue_started = trio.Event() self.event_handle_dead_peer_queue_started = trio.Event()
async def run(self) -> None: async def run(self) -> None:
self.manager.run_daemon_task(self._start_validation_throttler)
self.manager.run_daemon_task(self.handle_peer_queue) self.manager.run_daemon_task(self.handle_peer_queue)
self.manager.run_daemon_task(self.handle_dead_peer_queue) self.manager.run_daemon_task(self.handle_dead_peer_queue)
await self.manager.wait_finished() await self.manager.wait_finished()
async def _start_validation_throttler(self) -> None:
"""Start validation throttler in current nursery context"""
async with trio.open_nursery() as nursery:
await self.validation_throttler.start(nursery)
# Keep nursery alive until service stops
while self.manager.is_running:
await self.manager.wait_finished()
@property @property
def my_id(self) -> ID: def my_id(self) -> ID:
return self.host.get_id() return self.host.get_id()
@ -314,12 +298,7 @@ class Pubsub(Service, IPubsub):
) )
def set_topic_validator( def set_topic_validator(
self, self, topic: str, validator: ValidatorFn, is_async_validator: bool
topic: str,
validator: ValidatorFn,
is_async_validator: bool,
timeout: float | None = None,
throttle_limit: int | None = None,
) -> None: ) -> None:
""" """
Register a validator under the given topic. One topic can only have one Register a validator under the given topic. One topic can only have one
@ -328,18 +307,8 @@ class Pubsub(Service, IPubsub):
:param topic: the topic to register validator under :param topic: the topic to register validator under
:param validator: the validator used to validate messages published to the topic :param validator: the validator used to validate messages published to the topic
:param is_async_validator: indicate if the validator is an asynchronous validator :param is_async_validator: indicate if the validator is an asynchronous validator
:param timeout: optional timeout for the validator
:param throttle_limit: optional throttle limit for the validator
""" # noqa: E501 """ # noqa: E501
# Create throttled topic validator self.topic_validators[topic] = TopicValidator(validator, is_async_validator)
topic_validator = self.validation_throttler.create_topic_validator(
topic=topic,
validator=validator,
is_async=is_async_validator,
timeout=timeout,
throttle_limit=throttle_limit,
)
self.topic_validators[topic] = topic_validator
def remove_topic_validator(self, topic: str) -> None: def remove_topic_validator(self, topic: str) -> None:
""" """
@ -349,18 +318,17 @@ class Pubsub(Service, IPubsub):
""" """
self.topic_validators.pop(topic, None) self.topic_validators.pop(topic, None)
def get_msg_validators(self, msg: rpc_pb2.Message) -> list[TopicValidator]: def get_msg_validators(self, msg: rpc_pb2.Message) -> tuple[TopicValidator, ...]:
""" """
Get all validators corresponding to the topics in the message. Get all validators corresponding to the topics in the message.
:param msg: the message published to the topic :param msg: the message published to the topic
:return: list of topic validators for the message's topics
""" """
return [ return tuple(
self.topic_validators[topic] self.topic_validators[topic]
for topic in msg.topicIDs for topic in msg.topicIDs
if topic in self.topic_validators if topic in self.topic_validators
] )
def add_to_blacklist(self, peer_id: ID) -> None: def add_to_blacklist(self, peer_id: ID) -> None:
""" """
@ -696,56 +664,38 @@ class Pubsub(Service, IPubsub):
:param msg_forwarder: the peer who forward us the message. :param msg_forwarder: the peer who forward us the message.
:param msg: the message. :param msg: the message.
""" """
# Get applicable validators for this message sync_topic_validators: list[SyncValidatorFn] = []
validators = self.get_msg_validators(msg) async_topic_validators: list[AsyncValidatorFn] = []
for topic_validator in self.get_msg_validators(msg):
if topic_validator.is_async:
async_topic_validators.append(
cast(AsyncValidatorFn, topic_validator.validator)
)
else:
sync_topic_validators.append(
cast(SyncValidatorFn, topic_validator.validator)
)
if not validators: for validator in sync_topic_validators:
# No validators, accept immediately if not validator(msg_forwarder, msg):
return raise ValidationError(f"Validation failed for msg={msg}")
# Use trio.Event for async coordination # TODO: Implement throttle on async validators
validation_event = trio.Event()
result_container: dict[str, ValidationResult | None | Exception] = {
"result": None,
"error": None,
}
def handle_validation_result( if len(async_topic_validators) > 0:
result: ValidationResult, error: Exception | None # Appends to lists are thread safe in CPython
) -> None: results = []
result_container["result"] = result
result_container["error"] = error
validation_event.set()
# Submit for throttled validation async def run_async_validator(func: AsyncValidatorFn) -> None:
success = await self.validation_throttler.submit_validation( result = await func(msg_forwarder, msg)
validators=validators, results.append(result)
msg_forwarder=msg_forwarder,
msg=msg,
result_callback=handle_validation_result,
)
if not success: async with trio.open_nursery() as nursery:
# Validation was throttled at queue level for async_validator in async_topic_validators:
raise ValidationError("Validation throttled at queue level") nursery.start_soon(run_async_validator, async_validator)
# Wait for validation result if not all(results):
await validation_event.wait() raise ValidationError(f"Validation failed for msg={msg}")
result = result_container["result"]
error = result_container["error"]
if error:
raise ValidationError(f"Validation error: {error}")
if result == ValidationResult.REJECT:
raise ValidationError("Message validation rejected")
elif result == ValidationResult.THROTTLED:
raise ValidationError("Message validation throttled")
elif result == ValidationResult.IGNORE:
# Treat IGNORE as rejection for now, or you could silently drop
raise ValidationError("Message validation ignored")
# ACCEPT case - just return normally
async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None: async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None:
""" """

View File

@ -1,314 +0,0 @@
from collections.abc import (
Callable,
)
from dataclasses import dataclass
from enum import Enum
import logging
from typing import (
NamedTuple,
cast,
)
import trio
from libp2p.custom_types import AsyncValidatorFn, ValidatorFn
from libp2p.peer.id import (
ID,
)
from .pb import (
rpc_pb2,
)
logger = logging.getLogger("libp2p.pubsub.validation")
class ValidationResult(Enum):
ACCEPT = "accept"
REJECT = "reject"
IGNORE = "ignore"
THROTTLED = "throttled"
@dataclass
class ValidationRequest:
"""Request for message validation"""
validators: list["TopicValidator"]
msg_forwarder: ID # peer ID
msg: rpc_pb2.Message # message object
result_callback: Callable[[ValidationResult, Exception | None], None]
class TopicValidator(NamedTuple):
topic: str
validator: ValidatorFn
is_async: bool
timeout: float | None = None
# Per-topic throttle semaphore
throttle_semaphore: trio.Semaphore | None = None
class ValidationThrottler:
"""Manages all validation throttling mechanisms"""
def __init__(
self,
queue_size: int = 32,
global_throttle_limit: int = 8192,
default_topic_throttle_limit: int = 1024,
worker_count: int | None = None,
):
# 1. Queue-level throttling - bounded memory channel
self._validation_send, self._validation_receive = trio.open_memory_channel[
ValidationRequest
](queue_size)
# 2. Global validation throttling - limits total concurrent async validations
self._global_throttle = trio.Semaphore(global_throttle_limit)
# 3. Per-topic throttling - each validator gets its own semaphore
self._default_topic_throttle_limit = default_topic_throttle_limit
# Worker management
# TODO: Find a better way to manage worker count
self._worker_count = worker_count or 4
self._running = False
async def start(self, nursery: trio.Nursery) -> None:
"""Start the validation workers"""
self._running = True
# Start validation worker tasks
for i in range(self._worker_count):
nursery.start_soon(self._validation_worker, f"worker-{i}")
async def stop(self) -> None:
"""Stop the validation system"""
self._running = False
await self._validation_send.aclose()
def create_topic_validator(
self,
topic: str,
validator: ValidatorFn,
is_async: bool,
timeout: float | None = None,
throttle_limit: int | None = None,
) -> TopicValidator:
"""Create a new topic validator with its own throttle"""
limit = throttle_limit or self._default_topic_throttle_limit
throttle_sem = trio.Semaphore(limit)
return TopicValidator(
topic=topic,
validator=validator,
is_async=is_async,
timeout=timeout,
throttle_semaphore=throttle_sem,
)
async def submit_validation(
self,
validators: list[TopicValidator],
msg_forwarder: ID,
msg: rpc_pb2.Message,
result_callback: Callable[[ValidationResult, Exception | None], None],
) -> bool:
"""
Submit a message for validation.
Returns True if queued successfully, False if queue is full (throttled).
"""
if not self._running:
result_callback(
ValidationResult.REJECT, Exception("Validation system not running")
)
return False
request = ValidationRequest(
validators=validators,
msg_forwarder=msg_forwarder,
msg=msg,
result_callback=result_callback,
)
try:
# This will raise trio.WouldBlock if queue is full
self._validation_send.send_nowait(request)
return True
except trio.WouldBlock:
# Queue-level throttling: drop the message
logger.debug(
"Validation queue full, dropping message from %s", msg_forwarder
)
result_callback(
ValidationResult.THROTTLED, Exception("Validation queue full")
)
return False
async def _validation_worker(self, worker_id: str) -> None:
"""Worker that processes validation requests"""
logger.debug("Validation worker %s started", worker_id)
async with self._validation_receive:
async for request in self._validation_receive:
if not self._running:
break
try:
# Process the validation request
result = await self._validate_message(request)
request.result_callback(result, None)
except Exception as e:
logger.exception("Error in validation worker %s", worker_id)
request.result_callback(ValidationResult.REJECT, e)
logger.debug("Validation worker %s stopped", worker_id)
async def _validate_message(self, request: ValidationRequest) -> ValidationResult:
"""Core validation logic with throttling"""
validators = request.validators
msg_forwarder = request.msg_forwarder
msg = request.msg
if not validators:
return ValidationResult.ACCEPT
# Separate sync and async validators
sync_validators = [v for v in validators if not v.is_async]
async_validators = [v for v in validators if v.is_async]
# Run synchronous validators first
for validator in sync_validators:
try:
# Apply per-topic throttling even for sync validators
if validator.throttle_semaphore:
validator.throttle_semaphore.acquire_nowait()
try:
result = validator.validator(msg_forwarder, msg)
if not result:
return ValidationResult.REJECT
finally:
validator.throttle_semaphore.release()
else:
result = validator.validator(msg_forwarder, msg)
if not result:
return ValidationResult.REJECT
except trio.WouldBlock:
# Per-topic throttling for sync validator
logger.debug("Sync validation throttled for topic %s", validator.topic)
return ValidationResult.THROTTLED
except Exception as e:
logger.exception(
"Sync validator failed for topic %s: %s", validator.topic, e
)
return ValidationResult.REJECT
# Handle async validators with global + per-topic throttling
if async_validators:
return await self._validate_async_validators(
async_validators, msg_forwarder, msg
)
return ValidationResult.ACCEPT
async def _validate_async_validators(
self, validators: list[TopicValidator], msg_forwarder: ID, msg: rpc_pb2.Message
) -> ValidationResult:
"""Handle async validators with proper throttling"""
if len(validators) == 1:
# Fast path for single validator
return await self._validate_single_async_validator(
validators[0], msg_forwarder, msg
)
# Multiple async validators - run them concurrently
try:
# Try to acquire global throttle slot
self._global_throttle.acquire_nowait()
except trio.WouldBlock:
logger.debug(
"Global validation throttle exceeded, dropping message from %s",
msg_forwarder,
)
return ValidationResult.THROTTLED
try:
async with trio.open_nursery() as nursery:
results = {}
async def run_validator(validator: TopicValidator, index: int) -> None:
"""Run a single async validator and store the result"""
nonlocal results
result = await self._validate_single_async_validator(
validator, msg_forwarder, msg
)
results[index] = result
# Start all validators concurrently
for i, validator in enumerate(validators):
nursery.start_soon(run_validator, validator, i)
# Process results - any reject or throttle causes overall failure
final_result = ValidationResult.ACCEPT
for result in results.values():
if result == ValidationResult.REJECT:
return ValidationResult.REJECT
elif result == ValidationResult.THROTTLED:
final_result = ValidationResult.THROTTLED
elif (
result == ValidationResult.IGNORE
and final_result == ValidationResult.ACCEPT
):
final_result = ValidationResult.IGNORE
return final_result
finally:
self._global_throttle.release()
return ValidationResult.IGNORE
async def _validate_single_async_validator(
self, validator: TopicValidator, msg_forwarder: ID, msg: rpc_pb2.Message
) -> ValidationResult:
"""Validate with a single async validator"""
# Apply per-topic throttling
if validator.throttle_semaphore:
try:
validator.throttle_semaphore.acquire_nowait()
except trio.WouldBlock:
logger.debug(
"Per-topic validation throttled for topic %s", validator.topic
)
return ValidationResult.THROTTLED
else:
# Fallback if no throttle semaphore configured
pass
try:
# Apply timeout if configured
result: bool
if validator.timeout:
with trio.fail_after(validator.timeout):
func = cast(AsyncValidatorFn, validator.validator)
result = await func(msg_forwarder, msg)
else:
func = cast(AsyncValidatorFn, validator.validator)
result = await func(msg_forwarder, msg)
return ValidationResult.ACCEPT if result else ValidationResult.REJECT
except trio.TooSlowError:
logger.debug("Validation timeout for topic %s", validator.topic)
return ValidationResult.IGNORE
except Exception as e:
logger.exception(
"Async validator failed for topic %s: %s", validator.topic, e
)
return ValidationResult.REJECT
finally:
if validator.throttle_semaphore:
validator.throttle_semaphore.release()
return ValidationResult.IGNORE

View File

@ -234,8 +234,7 @@ class RelayDiscovery(Service):
if not callable(proto_getter): if not callable(proto_getter):
return None return None
if peer_id not in peerstore.peer_ids():
return None
try: try:
# Try to get protocols # Try to get protocols
proto_result = proto_getter(peer_id) proto_result = proto_getter(peer_id)
@ -284,6 +283,8 @@ class RelayDiscovery(Service):
return None return None
mux = self.host.get_mux() mux = self.host.get_mux()
if not hasattr(mux, "protocols"):
return None
peer_protocols = set() peer_protocols = set()
# Get protocols from mux with proper type safety # Get protocols from mux with proper type safety
@ -292,9 +293,7 @@ class RelayDiscovery(Service):
# Get protocols with proper typing # Get protocols with proper typing
mux_protocols = mux.get_protocols() mux_protocols = mux.get_protocols()
if isinstance(mux_protocols, (list, tuple)): if isinstance(mux_protocols, (list, tuple)):
available_protocols = [ available_protocols = list(mux_protocols)
p for p in mux.get_protocols() if p is not None
]
for protocol in available_protocols: for protocol in available_protocols:
try: try:
@ -314,7 +313,7 @@ class RelayDiscovery(Service):
self._protocol_cache[peer_id] = peer_protocols self._protocol_cache[peer_id] = peer_protocols
protocol_str = str(PROTOCOL_ID) protocol_str = str(PROTOCOL_ID)
for protocol in map(TProtocol, peer_protocols): for protocol in peer_protocols:
if protocol == protocol_str: if protocol == protocol_str:
return True return True
return False return False

View File

@ -7,8 +7,6 @@ from libp2p.utils.varint import (
encode_varint_prefixed, encode_varint_prefixed,
read_delim, read_delim,
read_varint_prefixed_bytes, read_varint_prefixed_bytes,
decode_varint_from_bytes,
decode_varint_with_size,
) )
from libp2p.utils.version import ( from libp2p.utils.version import (
get_agent_version, get_agent_version,
@ -22,6 +20,4 @@ __all__ = [
"get_agent_version", "get_agent_version",
"read_delim", "read_delim",
"read_varint_prefixed_bytes", "read_varint_prefixed_bytes",
"decode_varint_from_bytes",
"decode_varint_with_size",
] ]

View File

@ -39,38 +39,12 @@ def encode_uvarint(number: int) -> bytes:
return buf return buf
def decode_varint_from_bytes(data: bytes) -> int:
"""
Decode a varint from bytes and return the value.
This is a synchronous version of decode_uvarint_from_stream for already-read bytes.
"""
res = 0
for shift in itertools.count(0, 7):
if shift > SHIFT_64_BIT_MAX:
raise ParseError("Integer is too large...")
if not data:
raise ParseError("Unexpected end of data")
value = data[0]
data = data[1:]
res += (value & LOW_MASK) << shift
if not value & HIGH_MASK:
break
return res
async def decode_uvarint_from_stream(reader: Reader) -> int: async def decode_uvarint_from_stream(reader: Reader) -> int:
"""https://en.wikipedia.org/wiki/LEB128.""" """https://en.wikipedia.org/wiki/LEB128."""
res = 0 res = 0
for shift in itertools.count(0, 7): for shift in itertools.count(0, 7):
if shift > SHIFT_64_BIT_MAX: if shift > SHIFT_64_BIT_MAX:
raise ParseError( raise ParseError("TODO: better exception msg: Integer is too large...")
"Varint decoding error: integer exceeds maximum size of 64 bits."
)
byte = await read_exactly(reader, 1) byte = await read_exactly(reader, 1)
value = byte[0] value = byte[0]
@ -82,33 +56,6 @@ async def decode_uvarint_from_stream(reader: Reader) -> int:
return res return res
def decode_varint_with_size(data: bytes) -> tuple[int, int]:
"""
Decode a varint from bytes and return (value, bytes_consumed).
Returns (0, 0) if the data doesn't start with a valid varint.
"""
try:
# Calculate how many bytes the varint consumes
varint_size = 0
for i, byte in enumerate(data):
varint_size += 1
if (byte & 0x80) == 0:
break
if varint_size == 0:
return 0, 0
# Extract just the varint bytes
varint_bytes = data[:varint_size]
# Decode the varint
value = decode_varint_from_bytes(varint_bytes)
return value, varint_size
except Exception:
return 0, 0
def encode_varint_prefixed(msg_bytes: bytes) -> bytes: def encode_varint_prefixed(msg_bytes: bytes) -> bytes:
varint_len = encode_uvarint(len(msg_bytes)) varint_len = encode_uvarint(len(msg_bytes))
return varint_len + msg_bytes return varint_len + msg_bytes

View File

@ -0,0 +1 @@
Added support for ``Kademlia DHT`` in py-libp2p.

View File

@ -0,0 +1 @@
Limit concurrency in `push_identify_to_peers` to prevent resource congestion under high peer counts.

View File

@ -0,0 +1,7 @@
Store public key and peer ID in peerstore during handshake
Modified the InsecureTransport class to accept an optional peerstore parameter and updated the handshake process to store the received public key and peer ID in the peerstore when available.
Added test cases to verify:
1. The peerstore remains unchanged when handshake fails due to peer ID mismatch
2. The handshake correctly adds a public key to a peer ID that already exists in the peerstore but doesn't have a public key yet

View File

@ -0,0 +1,6 @@
Fixed several flow-control and concurrency issues in the `YamuxStream` class. Previously, stress-testing revealed that transferring data over `DEFAULT_WINDOW_SIZE` would break the stream due to inconsistent window update handling and lock management. The fixes include:
- Removed sending of window updates during writes to maintain correct flow-control.
- Added proper timeout handling when releasing and acquiring locks to prevent concurrency errors.
- Corrected the `read` function to properly handle window updates for both `read_until_EOF` and `read_n_bytes`.
- Added event logging at `send_window_updates` and `waiting_for_window_updates` for better observability.

View File

@ -0,0 +1 @@
Added support for ``Multicast DNS`` in py-libp2p

View File

@ -0,0 +1 @@
Refactored gossipsub heartbeat logic to use a single helper method `_handle_topic_heartbeat` that handles both fanout and gossip heartbeats.

View File

@ -0,0 +1 @@
Added sparse connect utility function to pubsub test utilities for creating test networks with configurable connectivity.

View File

@ -0,0 +1,2 @@
Reordered the arguments to `upgrade_security` to place `is_initiator` before `peer_id`, and made `peer_id` optional.
This allows the method to reflect the fact that peer identity is not required for inbound connections.

View File

@ -0,0 +1 @@
Uses the `decapsulate` method of the `Multiaddr` class to clean up the observed address.

View File

@ -0,0 +1 @@
Optimized pubsub publishing to send multiple topics in a single message instead of separate messages per topic.

View File

@ -0,0 +1 @@
Optimized pubsub message writing by implementing a write_msg() method that uses pre-allocated buffers and single write operations, improving performance by eliminating separate varint prefix encoding and write operations in FloodSub and GossipSub.

View File

@ -0,0 +1 @@
added peer exchange and backoff logic as part of Gossipsub v1.1 upgrade

View File

@ -0,0 +1,4 @@
Add timeout wrappers in:
1. multiselect.py: `negotiate` function
2. multiselect_client.py: `select_one_of` , `query_multistream_command` functions
to prevent indefinite hangs when a remote peer does not respond.

View File

@ -0,0 +1 @@
align stream creation logic with yamux specification

View File

@ -0,0 +1 @@
Fixed an issue in `Pubsub` where async validators were not handled reliably under concurrency. Now uses a safe aggregator list for consistent behavior.

View File

@ -0,0 +1 @@
Added comprehensive tests for pubsub connection utility functions to verify degree limits are enforced, excess peers are handled correctly, and edge cases (degree=0, negative values, empty lists) are managed gracefully.

View File

@ -0,0 +1 @@
Added extra tests for identify push concurrency cap under high peer load

View File

@ -0,0 +1 @@
update cryptographic dependencies: pycryptodome to ≥3.19.1, pynacl to ≥1.5.0, coincurve to ≥21.0.0

View File

@ -1,3 +0,0 @@
Improved type safety in `get_mux()` and `get_protocols()` by returning properly typed values instead
of `Any`. Also updated `identify.py` and `discovery.py` to handle `None` values safely and
compare protocols correctly.

View File

@ -1 +0,0 @@
Add comprehensive tests for relay_discovery method in circuit_relay_v2

View File

@ -1 +0,0 @@
Add logic to clear_peerdata method in peerstore

View File

@ -1 +0,0 @@
fixed malformed PeerId in test_peerinfo

View File

@ -1 +0,0 @@
fixed a typecheck error using cast in peerinfo.py

View File

@ -1 +0,0 @@
Improve error message under the function decode_uvarint_from_stream in libp2p/utils/varint.py file

View File

@ -1 +0,0 @@
identify protocol use now prefix-length messages by default. use use_varint_format param for old raw messages

View File

@ -1 +0,0 @@
add length-prefixed support to identify protocol

View File

@ -1 +0,0 @@
Fix raw format reading in identify/push protocol and add comprehensive test coverage for both varint and raw formats

View File

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "libp2p" name = "libp2p"
version = "0.2.9" version = "0.2.8"
description = "libp2p: The Python implementation of the libp2p networking stack" description = "libp2p: The Python implementation of the libp2p networking stack"
readme = "README.md" readme = "README.md"
requires-python = ">=3.10, <4.0" requires-python = ">=3.10, <4.0"
@ -15,17 +15,17 @@ authors = [
] ]
dependencies = [ dependencies = [
"base58>=1.0.3", "base58>=1.0.3",
"coincurve>=10.0.0", "coincurve>=21.0.0",
"exceptiongroup>=1.2.0; python_version < '3.11'", "exceptiongroup>=1.2.0; python_version < '3.11'",
"grpcio>=1.41.0", "grpcio>=1.41.0",
"lru-dict>=1.1.6", "lru-dict>=1.1.6",
"multiaddr>=0.0.9", "multiaddr>=0.0.9",
"mypy-protobuf>=3.0.0", "mypy-protobuf>=3.0.0",
"noiseprotocol>=0.3.0", "noiseprotocol>=0.3.0",
"pycryptodome>=3.19.1",
"protobuf>=4.21.0,<5.0.0", "protobuf>=4.21.0,<5.0.0",
"pycryptodome>=3.9.2",
"pymultihash>=0.8.2", "pymultihash>=0.8.2",
"pynacl>=1.3.0", "pynacl>=1.5.0",
"rpcudp>=3.0.0", "rpcudp>=3.0.0",
"trio-typing>=0.0.4", "trio-typing>=0.0.4",
"trio>=0.26.0", "trio>=0.26.0",
@ -188,7 +188,7 @@ name = "Removals"
showcontent = true showcontent = true
[tool.bumpversion] [tool.bumpversion]
current_version = "0.2.9" current_version = "0.2.8"
parse = """ parse = """
(?P<major>\\d+) (?P<major>\\d+)
\\.(?P<minor>\\d+) \\.(?P<minor>\\d+)

View File

@ -11,7 +11,9 @@ from libp2p.identity.identify.identify import (
PROTOCOL_VERSION, PROTOCOL_VERSION,
_mk_identify_protobuf, _mk_identify_protobuf,
_multiaddr_to_bytes, _multiaddr_to_bytes,
parse_identify_response, )
from libp2p.identity.identify.pb.identify_pb2 import (
Identify,
) )
from tests.utils.factories import ( from tests.utils.factories import (
host_pair_factory, host_pair_factory,
@ -27,18 +29,14 @@ async def test_identify_protocol(security_protocol):
host_b, host_b,
): ):
# Here, host_b is the requester and host_a is the responder. # Here, host_b is the requester and host_a is the responder.
# observed_addr represent host_b's address as observed by host_a # observed_addr represent host_bs address as observed by host_a
# (i.e., the address from which host_b's request was received). # (i.e., the address from which host_bs request was received).
stream = await host_b.new_stream(host_a.get_id(), (ID,)) stream = await host_b.new_stream(host_a.get_id(), (ID,))
response = await stream.read()
# Read the response (could be either format)
# Read a larger chunk to get all the data before stream closes
response = await stream.read(8192) # Read enough data in one go
await stream.close() await stream.close()
# Parse the response (handles both old and new formats) identify_response = Identify()
identify_response = parse_identify_response(response) identify_response.ParseFromString(response)
logger.debug("host_a: %s", host_a.get_addrs()) logger.debug("host_a: %s", host_a.get_addrs())
logger.debug("host_b: %s", host_b.get_addrs()) logger.debug("host_b: %s", host_b.get_addrs())
@ -64,9 +62,8 @@ async def test_identify_protocol(security_protocol):
logger.debug("observed_addr: %s", Multiaddr(identify_response.observed_addr)) logger.debug("observed_addr: %s", Multiaddr(identify_response.observed_addr))
logger.debug("host_b.get_addrs()[0]: %s", host_b.get_addrs()[0]) logger.debug("host_b.get_addrs()[0]: %s", host_b.get_addrs()[0])
logger.debug("cleaned_addr= %s", cleaned_addr)
# The observed address should match the cleaned address assert identify_response.observed_addr == _multiaddr_to_bytes(cleaned_addr)
assert Multiaddr(identify_response.observed_addr) == cleaned_addr
# Check protocols # Check protocols
assert set(identify_response.protocols) == set(host_a.get_mux().get_protocols()) assert set(identify_response.protocols) == set(host_a.get_mux().get_protocols())

View File

@ -1,410 +0,0 @@
import pytest
from libp2p.identity.identify.identify import (
_mk_identify_protobuf,
)
from libp2p.identity.identify.pb.identify_pb2 import (
Identify,
)
from libp2p.io.abc import Closer, Reader, Writer
from libp2p.utils.varint import (
decode_varint_from_bytes,
encode_varint_prefixed,
)
from tests.utils.factories import (
host_pair_factory,
)
class MockStream(Reader, Writer, Closer):
"""Mock stream for testing identify protocol compatibility."""
def __init__(self, data: bytes):
self.data = data
self.position = 0
self.closed = False
async def read(self, n: int | None = None) -> bytes:
if self.closed or self.position >= len(self.data):
return b""
if n is None:
n = len(self.data) - self.position
result = self.data[self.position : self.position + n]
self.position += len(result)
return result
async def write(self, data: bytes) -> None:
# Mock write - just store the data
pass
async def close(self) -> None:
self.closed = True
def create_identify_message(host, observed_multiaddr=None):
"""Create an identify protobuf message."""
return _mk_identify_protobuf(host, observed_multiaddr)
def create_new_format_message(identify_msg):
"""Create a new format (length-prefixed) identify message."""
msg_bytes = identify_msg.SerializeToString()
return encode_varint_prefixed(msg_bytes)
def create_old_format_message(identify_msg):
"""Create an old format (raw protobuf) identify message."""
return identify_msg.SerializeToString()
async def read_new_format_message(stream) -> bytes:
"""Read a new format (length-prefixed) identify message."""
# Read varint length prefix
length_bytes = b""
while True:
b = await stream.read(1)
if not b:
break
length_bytes += b
if b[0] & 0x80 == 0:
break
if not length_bytes:
raise ValueError("No length prefix received")
msg_length = decode_varint_from_bytes(length_bytes)
# Read the protobuf message
response = await stream.read(msg_length)
if len(response) != msg_length:
raise ValueError("Incomplete message received")
return response
async def read_old_format_message(stream) -> bytes:
"""Read an old format (raw protobuf) identify message."""
# Read all available data
response = b""
while True:
chunk = await stream.read(4096)
if not chunk:
break
response += chunk
return response
async def read_compatible_message(stream) -> bytes:
"""Read an identify message in either old or new format."""
# Try to read a few bytes to detect the format
first_bytes = await stream.read(10)
if not first_bytes:
raise ValueError("No data received")
# Try to decode as varint length prefix (new format)
try:
msg_length = decode_varint_from_bytes(first_bytes)
# Validate that the length is reasonable (not too large)
if msg_length > 0 and msg_length <= 1024 * 1024: # Max 1MB
# Calculate how many bytes the varint consumed
varint_len = 0
for i, byte in enumerate(first_bytes):
varint_len += 1
if (byte & 0x80) == 0:
break
# Read the remaining protobuf message
remaining_bytes = await stream.read(
msg_length - (len(first_bytes) - varint_len)
)
if len(remaining_bytes) == msg_length - (len(first_bytes) - varint_len):
message_data = first_bytes[varint_len:] + remaining_bytes
# Try to parse as protobuf to validate
try:
Identify().ParseFromString(message_data)
return message_data
except Exception:
# If protobuf parsing fails, fall back to old format
pass
except Exception:
pass
# Fall back to old format (raw protobuf)
response = first_bytes
# Read more data if available
while True:
chunk = await stream.read(4096)
if not chunk:
break
response += chunk
return response
async def read_compatible_message_simple(stream) -> bytes:
"""Read a message in either old or new format (simplified version for testing)."""
# Try to read a few bytes to detect the format
first_bytes = await stream.read(10)
if not first_bytes:
raise ValueError("No data received")
# Try to decode as varint length prefix (new format)
try:
msg_length = decode_varint_from_bytes(first_bytes)
# Validate that the length is reasonable (not too large)
if msg_length > 0 and msg_length <= 1024 * 1024: # Max 1MB
# Calculate how many bytes the varint consumed
varint_len = 0
for i, byte in enumerate(first_bytes):
varint_len += 1
if (byte & 0x80) == 0:
break
# Read the remaining message
remaining_bytes = await stream.read(
msg_length - (len(first_bytes) - varint_len)
)
if len(remaining_bytes) == msg_length - (len(first_bytes) - varint_len):
return first_bytes[varint_len:] + remaining_bytes
except Exception:
pass
# Fall back to old format (raw data)
response = first_bytes
# Read more data if available
while True:
chunk = await stream.read(4096)
if not chunk:
break
response += chunk
return response
def detect_format(data):
"""Detect if data is in new or old format (varint-prefixed or raw protobuf)."""
if not data:
return "unknown"
# Try to decode as varint
try:
msg_length = decode_varint_from_bytes(data)
# Validate that the length is reasonable
if msg_length > 0 and msg_length <= 1024 * 1024: # Max 1MB
# Calculate varint length
varint_len = 0
for i, byte in enumerate(data):
varint_len += 1
if (byte & 0x80) == 0:
break
# Check if we have enough data for the message
if len(data) >= varint_len + msg_length:
# Additional check: try to parse the message as protobuf
try:
message_data = data[varint_len : varint_len + msg_length]
Identify().ParseFromString(message_data)
return "new"
except Exception:
# If protobuf parsing fails, it's probably not a valid new format
pass
except Exception:
pass
# If varint decoding fails or length is unreasonable, assume old format
return "old"
@pytest.mark.trio
async def test_identify_new_format_compatibility(security_protocol):
"""Test that identify protocol works with new format (length-prefixed) messages."""
async with host_pair_factory(security_protocol=security_protocol) as (
host_a,
host_b,
):
# Create identify message
identify_msg = create_identify_message(host_a)
# Create new format message
new_format_data = create_new_format_message(identify_msg)
# Create mock stream with new format data
stream = MockStream(new_format_data)
# Read using new format reader
response = await read_new_format_message(stream)
# Parse the response
parsed_msg = Identify()
parsed_msg.ParseFromString(response)
# Verify the message content
assert parsed_msg.protocol_version == identify_msg.protocol_version
assert parsed_msg.agent_version == identify_msg.agent_version
assert parsed_msg.public_key == identify_msg.public_key
@pytest.mark.trio
async def test_identify_old_format_compatibility(security_protocol):
"""Test that identify protocol works with old format (raw protobuf) messages."""
async with host_pair_factory(security_protocol=security_protocol) as (
host_a,
host_b,
):
# Create identify message
identify_msg = create_identify_message(host_a)
# Create old format message
old_format_data = create_old_format_message(identify_msg)
# Create mock stream with old format data
stream = MockStream(old_format_data)
# Read using old format reader
response = await read_old_format_message(stream)
# Parse the response
parsed_msg = Identify()
parsed_msg.ParseFromString(response)
# Verify the message content
assert parsed_msg.protocol_version == identify_msg.protocol_version
assert parsed_msg.agent_version == identify_msg.agent_version
assert parsed_msg.public_key == identify_msg.public_key
@pytest.mark.trio
async def test_identify_backward_compatibility_old_format(security_protocol):
"""Test backward compatibility reader with old format messages."""
async with host_pair_factory(security_protocol=security_protocol) as (
host_a,
host_b,
):
# Create identify message
identify_msg = create_identify_message(host_a)
# Create old format message
old_format_data = create_old_format_message(identify_msg)
# Create mock stream with old format data
stream = MockStream(old_format_data)
# Read using old format reader (which should work reliably)
response = await read_old_format_message(stream)
# Parse the response
parsed_msg = Identify()
parsed_msg.ParseFromString(response)
# Verify the message content
assert parsed_msg.protocol_version == identify_msg.protocol_version
assert parsed_msg.agent_version == identify_msg.agent_version
assert parsed_msg.public_key == identify_msg.public_key
@pytest.mark.trio
async def test_identify_backward_compatibility_new_format(security_protocol):
"""Test backward compatibility reader with new format messages."""
async with host_pair_factory(security_protocol=security_protocol) as (
host_a,
host_b,
):
# Create identify message
identify_msg = create_identify_message(host_a)
# Create new format message
new_format_data = create_new_format_message(identify_msg)
# Create mock stream with new format data
stream = MockStream(new_format_data)
# Read using new format reader (which should work reliably)
response = await read_new_format_message(stream)
# Parse the response
parsed_msg = Identify()
parsed_msg.ParseFromString(response)
# Verify the message content
assert parsed_msg.protocol_version == identify_msg.protocol_version
assert parsed_msg.agent_version == identify_msg.agent_version
assert parsed_msg.public_key == identify_msg.public_key
@pytest.mark.trio
async def test_identify_format_detection(security_protocol):
"""Test that the format detection works correctly."""
async with host_pair_factory(security_protocol=security_protocol) as (
host_a,
host_b,
):
# Create identify message
identify_msg = create_identify_message(host_a)
# Test new format detection
new_format_data = create_new_format_message(identify_msg)
format_type = detect_format(new_format_data)
assert format_type == "new", "New format should be detected correctly"
# Test old format detection
old_format_data = create_old_format_message(identify_msg)
format_type = detect_format(old_format_data)
assert format_type == "old", "Old format should be detected correctly"
@pytest.mark.trio
async def test_identify_error_handling(security_protocol):
"""Test error handling for malformed messages."""
from libp2p.exceptions import ParseError
# Test with empty data
stream = MockStream(b"")
with pytest.raises(ValueError, match="No data received"):
await read_compatible_message(stream)
# Test with incomplete varint
stream = MockStream(b"\x80") # Incomplete varint
with pytest.raises(ParseError, match="Unexpected end of data"):
await read_new_format_message(stream)
# Test with invalid protobuf data
stream = MockStream(b"\x05invalid") # Length prefix but invalid protobuf
with pytest.raises(Exception): # Should fail when parsing protobuf
response = await read_new_format_message(stream)
Identify().ParseFromString(response)
@pytest.mark.trio
async def test_identify_message_equivalence(security_protocol):
"""Test that old and new format messages are equivalent."""
async with host_pair_factory(security_protocol=security_protocol) as (
host_a,
host_b,
):
# Create identify message
identify_msg = create_identify_message(host_a)
# Create both formats
new_format_data = create_new_format_message(identify_msg)
old_format_data = create_old_format_message(identify_msg)
# Extract the protobuf message from new format
varint_len = 0
for i, byte in enumerate(new_format_data):
varint_len += 1
if (byte & 0x80) == 0:
break
new_format_protobuf = new_format_data[varint_len:]
# The protobuf messages should be identical
assert new_format_protobuf == old_format_data, (
"Protobuf messages should be identical in both formats"
)

View File

@ -459,11 +459,7 @@ async def test_push_identify_to_peers_respects_concurrency_limit():
lock = trio.Lock() lock = trio.Lock()
async def mock_push_identify_to_peer( async def mock_push_identify_to_peer(
host, host, peer_id, observed_multiaddr=None, limit=trio.Semaphore(CONCURRENCY_LIMIT)
peer_id,
observed_multiaddr=None,
limit=trio.Semaphore(CONCURRENCY_LIMIT),
use_varint_format=True,
) -> bool: ) -> bool:
""" """
Mock function to test concurrency by simulating an identify message. Mock function to test concurrency by simulating an identify message.
@ -597,104 +593,3 @@ async def test_all_peers_receive_identify_push_with_semaphore_under_high_peer_lo
assert peer_id_a in dummy_peerstore.peer_ids() assert peer_id_a in dummy_peerstore.peer_ids()
nursery.cancel_scope.cancel() nursery.cancel_scope.cancel()
@pytest.mark.trio
async def test_identify_push_default_varint_format(security_protocol):
"""
Test that the identify/push protocol uses varint format by default.
This test verifies that:
1. The default behavior uses length-prefixed messages (varint format)
2. Messages are correctly encoded with varint length prefix
3. Messages are correctly decoded with varint length prefix
4. The peerstore is updated correctly with the received information
"""
async with host_pair_factory(security_protocol=security_protocol) as (
host_a,
host_b,
):
# Set up the identify/push handlers with default settings
# (use_varint_format=True)
host_b.set_stream_handler(ID_PUSH, identify_push_handler_for(host_b))
# Push identify information from host_a to host_b using default settings
success = await push_identify_to_peer(host_a, host_b.get_id())
assert success, "Identify push should succeed with default varint format"
# Wait a bit for the push to complete
await trio.sleep(0.1)
# Get the peerstore from host_b
peerstore = host_b.get_peerstore()
peer_id = host_a.get_id()
# Verify that the peerstore was updated correctly
assert peer_id in peerstore.peer_ids()
# Check that addresses have been updated
host_a_addrs = set(host_a.get_addrs())
peerstore_addrs = set(peerstore.addrs(peer_id))
assert all(addr in peerstore_addrs for addr in host_a_addrs)
# Check that protocols have been updated
host_a_protocols = set(host_a.get_mux().get_protocols())
peerstore_protocols = set(peerstore.get_protocols(peer_id))
assert all(protocol in peerstore_protocols for protocol in host_a_protocols)
# Check that the public key has been updated
host_a_public_key = host_a.get_public_key().serialize()
peerstore_public_key = peerstore.pubkey(peer_id).serialize()
assert host_a_public_key == peerstore_public_key
@pytest.mark.trio
async def test_identify_push_legacy_raw_format(security_protocol):
"""
Test that the identify/push protocol can use legacy raw format when specified.
This test verifies that:
1. When use_varint_format=False, messages are sent without length prefix
2. Raw protobuf messages are correctly encoded and decoded
3. The peerstore is updated correctly with the received information
4. The legacy format is backward compatible
"""
async with host_pair_factory(security_protocol=security_protocol) as (
host_a,
host_b,
):
# Set up the identify/push handlers with legacy format (use_varint_format=False)
host_b.set_stream_handler(
ID_PUSH, identify_push_handler_for(host_b, use_varint_format=False)
)
# Push identify information from host_a to host_b using legacy format
success = await push_identify_to_peer(
host_a, host_b.get_id(), use_varint_format=False
)
assert success, "Identify push should succeed with legacy raw format"
# Wait a bit for the push to complete
await trio.sleep(0.1)
# Get the peerstore from host_b
peerstore = host_b.get_peerstore()
peer_id = host_a.get_id()
# Verify that the peerstore was updated correctly
assert peer_id in peerstore.peer_ids()
# Check that addresses have been updated
host_a_addrs = set(host_a.get_addrs())
peerstore_addrs = set(peerstore.addrs(peer_id))
assert all(addr in peerstore_addrs for addr in host_a_addrs)
# Check that protocols have been updated
host_a_protocols = set(host_a.get_mux().get_protocols())
peerstore_protocols = set(peerstore.get_protocols(peer_id))
assert all(protocol in peerstore_protocols for protocol in host_a_protocols)
# Check that the public key has been updated
host_a_public_key = host_a.get_public_key().serialize()
peerstore_public_key = peerstore.pubkey(peer_id).serialize()
assert host_a_public_key == peerstore_public_key

View File

@ -13,9 +13,7 @@ from libp2p.peer.peerinfo import (
) )
ALPHABETS = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" ALPHABETS = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"
VALID_MULTI_ADDR_STR = ( VALID_MULTI_ADDR_STR = "/ip4/127.0.0.1/tcp/8000/p2p/3YgLAeMKSAPcGqZkAt8mREqhQXmJT8SN8VCMN4T6ih4GNX9wvK8mWJnWZ1qA2mLdCQ" # noqa: E501
"/ip4/127.0.0.1/tcp/8000/p2p/QmWQqHcMi6Cay5M6KWSNVYSDnxzfqWb1aGFQFSRzBNe49t"
)
def test_init_(): def test_init_():
@ -52,6 +50,9 @@ def test_info_from_p2p_addr_invalid(addr):
def test_info_from_p2p_addr_valid(): def test_info_from_p2p_addr_valid():
m_addr = multiaddr.Multiaddr(VALID_MULTI_ADDR_STR) m_addr = multiaddr.Multiaddr(VALID_MULTI_ADDR_STR)
info = info_from_p2p_addr(m_addr) info = info_from_p2p_addr(m_addr)
assert info.peer_id.pretty() == "QmWQqHcMi6Cay5M6KWSNVYSDnxzfqWb1aGFQFSRzBNe49t" assert (
info.peer_id.pretty()
== "3YgLAeMKSAPcGqZkAt8mREqhQXmJT8SN8VCMN4T6ih4GNX9wvK8mWJnWZ1qA2mLdCQ"
)
assert len(info.addrs) == 1 assert len(info.addrs) == 1
assert str(info.addrs[0]) == "/ip4/127.0.0.1/tcp/8000" assert str(info.addrs[0]) == "/ip4/127.0.0.1/tcp/8000"

View File

@ -3,7 +3,6 @@ import pytest
from libp2p.custom_types import ( from libp2p.custom_types import (
TProtocol, TProtocol,
) )
from libp2p.protocol_muxer.multiselect import Multiselect
from libp2p.tools.utils import ( from libp2p.tools.utils import (
create_echo_stream_handler, create_echo_stream_handler,
) )
@ -139,23 +138,3 @@ async def test_multistream_command(security_protocol):
# Dialer asks for unspoorted command # Dialer asks for unspoorted command
with pytest.raises(ValueError, match="Command not supported"): with pytest.raises(ValueError, match="Command not supported"):
await dialer.send_command(listener.get_id(), "random") await dialer.send_command(listener.get_id(), "random")
@pytest.mark.trio
async def test_get_protocols_returns_all_registered_protocols():
ms = Multiselect()
async def dummy_handler(stream):
pass
p1 = TProtocol("/echo/1.0.0")
p2 = TProtocol("/foo/1.0.0")
p3 = TProtocol("/bar/1.0.0")
ms.add_handler(p1, dummy_handler)
ms.add_handler(p2, dummy_handler)
ms.add_handler(p3, dummy_handler)
protocols = ms.get_protocols()
assert set(protocols) == {p1, p2, p3}

View File

@ -105,11 +105,11 @@ async def test_relay_discovery_initialization():
@pytest.mark.trio @pytest.mark.trio
async def test_relay_discovery_find_relay_peerstore_method(): async def test_relay_discovery_find_relay():
"""Test finding a relay node via discovery using the peerstore method.""" """Test finding a relay node via discovery."""
async with HostFactory.create_batch_and_listen(2) as hosts: async with HostFactory.create_batch_and_listen(2) as hosts:
relay_host, client_host = hosts relay_host, client_host = hosts
logger.info("Created host for test_relay_discovery_find_relay_peerstore_method") logger.info("Created hosts for test_relay_discovery_find_relay")
logger.info("Relay host ID: %s", relay_host.get_id()) logger.info("Relay host ID: %s", relay_host.get_id())
logger.info("Client host ID: %s", client_host.get_id()) logger.info("Client host ID: %s", client_host.get_id())
@ -144,19 +144,19 @@ async def test_relay_discovery_find_relay_peerstore_method():
# Start discovery service # Start discovery service
async with background_trio_service(client_discovery): async with background_trio_service(client_discovery):
await client_discovery.event_started.wait() await client_discovery.event_started.wait()
logger.info("Client discovery service started (peerstore method)") logger.info("Client discovery service started")
# Wait for discovery to find the relay using the peerstore method # Wait for discovery to find the relay
logger.info("Waiting for relay discovery using peerstore...") logger.info("Waiting for relay discovery...")
# Manually trigger discovery which uses peerstore as default # Manually trigger discovery instead of waiting
await client_discovery.discover_relays() await client_discovery.discover_relays()
# Check if relay was found # Check if relay was found
with trio.fail_after(DISCOVERY_TIMEOUT): with trio.fail_after(DISCOVERY_TIMEOUT):
for _ in range(20): # Try multiple times for _ in range(20): # Try multiple times
if relay_host.get_id() in client_discovery._discovered_relays: if relay_host.get_id() in client_discovery._discovered_relays:
logger.info("Relay discovered successfully (peerstore method)") logger.info("Relay discovered successfully")
break break
# Wait and try again # Wait and try again
@ -164,194 +164,14 @@ async def test_relay_discovery_find_relay_peerstore_method():
# Manually trigger discovery again # Manually trigger discovery again
await client_discovery.discover_relays() await client_discovery.discover_relays()
else: else:
pytest.fail( pytest.fail("Failed to discover relay node within timeout")
"Failed to discover relay node within timeout(peerstore method)"
)
# Verify that relay was found and is valid # Verify that relay was found and is valid
assert relay_host.get_id() in client_discovery._discovered_relays, ( assert relay_host.get_id() in client_discovery._discovered_relays, (
"Relay should be discovered (peerstore method)" "Relay should be discovered"
) )
relay_info = client_discovery._discovered_relays[relay_host.get_id()] relay_info = client_discovery._discovered_relays[relay_host.get_id()]
assert relay_info.peer_id == relay_host.get_id(), ( assert relay_info.peer_id == relay_host.get_id(), "Peer ID should match"
"Peer ID should match (peerstore method)"
)
@pytest.mark.trio
async def test_relay_discovery_find_relay_direct_connection_method():
"""Test finding a relay node via discovery using the direct connection method."""
async with HostFactory.create_batch_and_listen(2) as hosts:
relay_host, client_host = hosts
logger.info("Created hosts for test_relay_discovery_find_relay_direct_method")
logger.info("Relay host ID: %s", relay_host.get_id())
logger.info("Client host ID: %s", client_host.get_id())
# Explicitly register the protocol handlers on relay_host
relay_host.set_stream_handler(PROTOCOL_ID, simple_stream_handler)
relay_host.set_stream_handler(STOP_PROTOCOL_ID, simple_stream_handler)
# Manually add protocol to peerstore for testing, then remove to force fallback
client_host.get_peerstore().add_protocols(
relay_host.get_id(), [str(PROTOCOL_ID)]
)
# Set up discovery on the client host
client_discovery = RelayDiscovery(
client_host, discovery_interval=5
) # Use shorter interval for testing
try:
# Connect peers so they can discover each other
with trio.fail_after(CONNECT_TIMEOUT):
logger.info("Connecting client host to relay host")
await connect(client_host, relay_host)
assert relay_host.get_network().connections[client_host.get_id()], (
"Peers not connected"
)
logger.info("Connection established between peers")
except Exception as e:
logger.error("Failed to connect peers: %s", str(e))
raise
# Remove the relay from the peerstore to test fallback to direct connection
client_host.get_peerstore().clear_peerdata(relay_host.get_id())
# Make sure that peer_id is not present in peerstore
assert relay_host.get_id() not in client_host.get_peerstore().peer_ids()
# Start discovery service
async with background_trio_service(client_discovery):
await client_discovery.event_started.wait()
logger.info("Client discovery service started (direct connection method)")
# Wait for discovery to find the relay using the direct connection method
logger.info(
"Waiting for relay discovery using direct connection fallback..."
)
# Manually trigger discovery which should fallback to direct connection
await client_discovery.discover_relays()
# Check if relay was found
with trio.fail_after(DISCOVERY_TIMEOUT):
for _ in range(20): # Try multiple times
if relay_host.get_id() in client_discovery._discovered_relays:
logger.info("Relay discovered successfully (direct method)")
break
# Wait and try again
await trio.sleep(1)
# Manually trigger discovery again
await client_discovery.discover_relays()
else:
pytest.fail(
"Failed to discover relay node within timeout (direct method)"
)
# Verify that relay was found and is valid
assert relay_host.get_id() in client_discovery._discovered_relays, (
"Relay should be discovered (direct method)"
)
relay_info = client_discovery._discovered_relays[relay_host.get_id()]
assert relay_info.peer_id == relay_host.get_id(), (
"Peer ID should match (direct method)"
)
@pytest.mark.trio
async def test_relay_discovery_find_relay_mux_method():
"""
Test finding a relay node via discovery using the mux method
(fallback after direct connection fails).
"""
async with HostFactory.create_batch_and_listen(2) as hosts:
relay_host, client_host = hosts
logger.info("Created hosts for test_relay_discovery_find_relay_mux_method")
logger.info("Relay host ID: %s", relay_host.get_id())
logger.info("Client host ID: %s", client_host.get_id())
# Explicitly register the protocol handlers on relay_host
relay_host.set_stream_handler(PROTOCOL_ID, simple_stream_handler)
relay_host.set_stream_handler(STOP_PROTOCOL_ID, simple_stream_handler)
client_host.set_stream_handler(PROTOCOL_ID, simple_stream_handler)
client_host.set_stream_handler(STOP_PROTOCOL_ID, simple_stream_handler)
# Set up discovery on the client host
client_discovery = RelayDiscovery(
client_host, discovery_interval=5
) # Use shorter interval for testing
try:
# Connect peers so they can discover each other
with trio.fail_after(CONNECT_TIMEOUT):
logger.info("Connecting client host to relay host")
await connect(client_host, relay_host)
assert relay_host.get_network().connections[client_host.get_id()], (
"Peers not connected"
)
logger.info("Connection established between peers")
except Exception as e:
logger.error("Failed to connect peers: %s", str(e))
raise
# Remove the relay from the peerstore to test fallback
client_host.get_peerstore().clear_peerdata(relay_host.get_id())
# Make sure that peer_id is not present in peerstore
assert relay_host.get_id() not in client_host.get_peerstore().peer_ids()
# Mock the _check_via_direct_connection method to return None
# This forces the discovery to fall back to the mux method
async def mock_direct_check_fails(peer_id):
"""Mock that always returns None to force mux fallback."""
return None
client_discovery._check_via_direct_connection = mock_direct_check_fails
# Start discovery service
async with background_trio_service(client_discovery):
await client_discovery.event_started.wait()
logger.info("Client discovery service started (mux method)")
# Wait for discovery to find the relay using the mux method
logger.info("Waiting for relay discovery using mux fallback...")
# Manually trigger discovery which should fallback to mux method
await client_discovery.discover_relays()
# Check if relay was found
with trio.fail_after(DISCOVERY_TIMEOUT):
for _ in range(20): # Try multiple times
if relay_host.get_id() in client_discovery._discovered_relays:
logger.info("Relay discovered successfully (mux method)")
break
# Wait and try again
await trio.sleep(1)
# Manually trigger discovery again
await client_discovery.discover_relays()
else:
pytest.fail(
"Failed to discover relay node within timeout (mux method)"
)
# Verify that relay was found and is valid
assert relay_host.get_id() in client_discovery._discovered_relays, (
"Relay should be discovered (mux method)"
)
relay_info = client_discovery._discovered_relays[relay_host.get_id()]
assert relay_info.peer_id == relay_host.get_id(), (
"Peer ID should match (mux method)"
)
# Verify that the protocol was cached via mux method
assert relay_host.get_id() in client_discovery._protocol_cache, (
"Protocol should be cached (mux method)"
)
assert (
str(PROTOCOL_ID)
in client_discovery._protocol_cache[relay_host.get_id()]
), "Relay protocol should be in cache (mux method)"
@pytest.mark.trio @pytest.mark.trio

View File

@ -1,215 +0,0 @@
import pytest
from libp2p.exceptions import ParseError
from libp2p.io.abc import Reader
from libp2p.utils.varint import (
decode_varint_from_bytes,
encode_uvarint,
encode_varint_prefixed,
read_varint_prefixed_bytes,
)
class MockReader(Reader):
"""Mock reader for testing varint functions."""
def __init__(self, data: bytes):
self.data = data
self.position = 0
async def read(self, n: int | None = None) -> bytes:
if self.position >= len(self.data):
return b""
if n is None:
n = len(self.data) - self.position
result = self.data[self.position : self.position + n]
self.position += len(result)
return result
def test_encode_uvarint():
"""Test varint encoding with various values."""
test_cases = [
(0, b"\x00"),
(1, b"\x01"),
(127, b"\x7f"),
(128, b"\x80\x01"),
(255, b"\xff\x01"),
(256, b"\x80\x02"),
(65535, b"\xff\xff\x03"),
(65536, b"\x80\x80\x04"),
(16777215, b"\xff\xff\xff\x07"),
(16777216, b"\x80\x80\x80\x08"),
]
for value, expected in test_cases:
result = encode_uvarint(value)
assert result == expected, (
f"Failed for value {value}: expected {expected.hex()}, got {result.hex()}"
)
def test_decode_varint_from_bytes():
"""Test varint decoding with various values."""
test_cases = [
(b"\x00", 0),
(b"\x01", 1),
(b"\x7f", 127),
(b"\x80\x01", 128),
(b"\xff\x01", 255),
(b"\x80\x02", 256),
(b"\xff\xff\x03", 65535),
(b"\x80\x80\x04", 65536),
(b"\xff\xff\xff\x07", 16777215),
(b"\x80\x80\x80\x08", 16777216),
]
for data, expected in test_cases:
result = decode_varint_from_bytes(data)
assert result == expected, (
f"Failed for data {data.hex()}: expected {expected}, got {result}"
)
def test_decode_varint_from_bytes_invalid():
"""Test varint decoding with invalid data."""
# Empty data
with pytest.raises(ParseError, match="Unexpected end of data"):
decode_varint_from_bytes(b"")
# Incomplete varint (should not raise, but should handle gracefully)
# This depends on the implementation - some might raise, others might return partial
def test_encode_varint_prefixed():
"""Test encoding messages with varint length prefix."""
test_cases = [
(b"", b"\x00"),
(b"hello", b"\x05hello"),
(b"x" * 127, b"\x7f" + b"x" * 127),
(b"x" * 128, b"\x80\x01" + b"x" * 128),
]
for message, expected in test_cases:
result = encode_varint_prefixed(message)
assert result == expected, (
f"Failed for message {message}: expected {expected.hex()}, "
f"got {result.hex()}"
)
@pytest.mark.trio
async def test_read_varint_prefixed_bytes():
"""Test reading length-prefixed bytes from a stream."""
test_cases = [
(b"", b""),
(b"hello", b"hello"),
(b"x" * 127, b"x" * 127),
(b"x" * 128, b"x" * 128),
]
for message, expected in test_cases:
prefixed_data = encode_varint_prefixed(message)
reader = MockReader(prefixed_data)
result = await read_varint_prefixed_bytes(reader)
assert result == expected, (
f"Failed for message {message}: expected {expected}, got {result}"
)
@pytest.mark.trio
async def test_read_varint_prefixed_bytes_incomplete():
"""Test reading length-prefixed bytes with incomplete data."""
from libp2p.io.exceptions import IncompleteReadError
# Test with incomplete varint
reader = MockReader(b"\x80") # Incomplete varint
with pytest.raises(IncompleteReadError):
await read_varint_prefixed_bytes(reader)
# Test with incomplete message
prefixed_data = encode_varint_prefixed(b"hello world")
reader = MockReader(prefixed_data[:-3]) # Missing last 3 bytes
with pytest.raises(IncompleteReadError):
await read_varint_prefixed_bytes(reader)
def test_varint_roundtrip():
"""Test roundtrip encoding and decoding."""
test_values = [0, 1, 127, 128, 255, 256, 65535, 65536, 16777215, 16777216]
for value in test_values:
encoded = encode_uvarint(value)
decoded = decode_varint_from_bytes(encoded)
assert decoded == value, (
f"Roundtrip failed for {value}: encoded={encoded.hex()}, decoded={decoded}"
)
def test_varint_prefixed_roundtrip():
"""Test roundtrip encoding and decoding of length-prefixed messages."""
test_messages = [
b"",
b"hello",
b"x" * 127,
b"x" * 128,
b"x" * 1000,
]
for message in test_messages:
prefixed = encode_varint_prefixed(message)
# Decode the length
length = decode_varint_from_bytes(prefixed)
assert length == len(message), (
f"Length mismatch for {message}: expected {len(message)}, got {length}"
)
# Extract the message
varint_len = 0
for i, byte in enumerate(prefixed):
varint_len += 1
if (byte & 0x80) == 0:
break
extracted_message = prefixed[varint_len:]
assert extracted_message == message, (
f"Message mismatch: expected {message}, got {extracted_message}"
)
def test_large_varint_values():
"""Test varint encoding/decoding with large values."""
large_values = [
2**32 - 1, # 32-bit max
2**64 - 1, # 64-bit max (if supported)
]
for value in large_values:
try:
encoded = encode_uvarint(value)
decoded = decode_varint_from_bytes(encoded)
assert decoded == value, f"Large value roundtrip failed for {value}"
except Exception as e:
# Some implementations might not support very large values
pytest.skip(f"Large value {value} not supported: {e}")
def test_varint_edge_cases():
"""Test varint encoding/decoding with edge cases."""
# Test with maximum 7-bit value
assert encode_uvarint(127) == b"\x7f"
assert decode_varint_from_bytes(b"\x7f") == 127
# Test with minimum 8-bit value
assert encode_uvarint(128) == b"\x80\x01"
assert decode_varint_from_bytes(b"\x80\x01") == 128
# Test with maximum 14-bit value
assert encode_uvarint(16383) == b"\xff\x7f"
assert decode_varint_from_bytes(b"\xff\x7f") == 16383
# Test with minimum 15-bit value
assert encode_uvarint(16384) == b"\x80\x80\x01"
assert decode_varint_from_bytes(b"\x80\x80\x01") == 16384

View File

@ -1,81 +0,0 @@
# py-libp2p and js-libp2p Interoperability Tests
This repository contains interoperability tests for py-libp2p and js-libp2p using the /ipfs/ping/1.0.0 protocol. The goal is to verify compatibility in stream multiplexing, protocol negotiation, ping handling, transport layer, and multiaddr parsing.
## Directory Structure
- js_node/ping.js: JavaScript implementation of a ping server and client using libp2p.
- py_node/ping.py: Python implementation of a ping server and client using py-libp2p.
- scripts/run_test.sh: Shell script to automate running the server and client for testing.
- README.md: This file.
## Prerequisites
- Python 3.8+ with `py-libp2p` and dependencies (`pip install libp2p trio cryptography multiaddr`).
- Node.js 16+ with `libp2p` dependencies (`npm install @libp2p/core @libp2p/tcp @chainsafe/libp2p-noise @chainsafe/libp2p-yamux @libp2p/ping @libp2p/identify @multiformats/multiaddr`).
- Bash shell for running `run_test.sh`.
## Running Tests
1. Change directory:
```
cd tests/interop/js_libp2p
```
2. Install dependencies:
```
For JavaScript: cd js_node && npm install && cd ...
```
3. Run the automated test:
For Linux and Mac users:
```
chmod +x scripts/run_test.sh
./scripts/run_test.sh
```
For Windows users:
```
.\scripts\run_test.ps1
```
This starts the Python server on port 8000 and runs the JavaScript client to send 5 pings.
## Debugging
- Logs are saved in py_node/py_server.log and js_node/js_client.log.
- Check for:
- Successful connection establishment.
- Protocol negotiation (/ipfs/ping/1.0.0).
- 32-byte payload echo in server logs.
- RTT and payload hex in client logs.
## Test Plan
### The test verifies:
- Stream Multiplexer Compatibility: Yamux is used and negotiates correctly.
- Multistream Protocol Negotiation: /ipfs/ping/1.0.0 is selected via multistream-select.
- Ping Protocol Handler: Handles 32-byte payloads per the libp2p ping spec.
- Transport Layer Support: TCP is used; WebSocket support is optional.
- Multiaddr Parsing: Correctly resolves multiaddr strings.
- Logging: Includes peer ID, RTT, and payload hex for debugging.
## Current Status
### Working:
- TCP transport and Noise encryption are functional.
- Yamux multiplexing is implemented in both nodes.
- Multiaddr parsing works correctly.
- Logging provides detailed debug information.
## Not Working:
- Ping protocol handler fails to complete pings (JS client reports "operation aborted").
- Potential issues with stream handling or protocol negotiation.

View File

@ -1,53 +0,0 @@
# @libp2p/example-chat <!-- omit in toc -->
[![libp2p.io](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/)
[![Discuss](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg?style=flat-square)](https://discuss.libp2p.io)
[![codecov](https://img.shields.io/codecov/c/github/libp2p/js-libp2p-examples.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p-examples)
[![CI](https://img.shields.io/github/actions/workflow/status/libp2p/js-libp2p-examples/ci.yml?branch=main&style=flat-square)](https://github.com/libp2p/js-libp2p-examples/actions/workflows/ci.yml?query=branch%3Amain)
> An example chat app using libp2p
## Table of contents <!-- omit in toc -->
- [Setup](#setup)
- [Running](#running)
- [Need help?](#need-help)
- [License](#license)
- [Contribution](#contribution)
## Setup
1. Install example dependencies
```console
$ npm install
```
1. Open 2 terminal windows in the `./src` directory.
## Running
1. Run the listener in window 1, `node listener.js`
1. Run the dialer in window 2, `node dialer.js`
1. Wait until the two peers discover each other
1. Type a message in either window and hit *enter*
1. Tell yourself secrets to your hearts content!
## Need help?
- Read the [js-libp2p documentation](https://github.com/libp2p/js-libp2p/tree/main/doc)
- Check out the [js-libp2p API docs](https://libp2p.github.io/js-libp2p/)
- Check out the [general libp2p documentation](https://docs.libp2p.io) for tips, how-tos and more
- Read the [libp2p specs](https://github.com/libp2p/specs)
- Ask a question on the [js-libp2p discussion board](https://github.com/libp2p/js-libp2p/discussions)
## License
Licensed under either of
- Apache 2.0, ([LICENSE-APACHE](LICENSE-APACHE) / <http://www.apache.org/licenses/LICENSE-2.0>)
- MIT ([LICENSE-MIT](LICENSE-MIT) / <http://opensource.org/licenses/MIT>)
## Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in the work by you, as defined in the Apache-2.0 license, shall be
dual licensed as above, without any additional terms or conditions.

View File

@ -1,39 +0,0 @@
{
"name": "@libp2p/example-chat",
"version": "0.0.0",
"description": "An example chat app using libp2p",
"license": "Apache-2.0 OR MIT",
"homepage": "https://github.com/libp2p/js-libp2p-example-chat#readme",
"repository": {
"type": "git",
"url": "git+https://github.com/libp2p/js-libp2p-examples.git"
},
"bugs": {
"url": "https://github.com/libp2p/js-libp2p-examples/issues"
},
"type": "module",
"scripts": {
"test": "test-node-example test/*"
},
"dependencies": {
"@chainsafe/libp2p-noise": "^16.0.0",
"@chainsafe/libp2p-yamux": "^7.0.0",
"@libp2p/identify": "^3.0.33",
"@libp2p/mdns": "^11.0.1",
"@libp2p/ping": "^2.0.33",
"@libp2p/tcp": "^10.0.0",
"@libp2p/websockets": "^9.0.0",
"@multiformats/multiaddr": "^12.3.1",
"@nodeutils/defaults-deep": "^1.1.0",
"it-length-prefixed": "^10.0.1",
"it-map": "^3.0.3",
"it-pipe": "^3.0.1",
"libp2p": "^2.0.0",
"p-defer": "^4.0.0",
"uint8arrays": "^5.1.0"
},
"devDependencies": {
"test-ipfs-example": "^1.1.0"
},
"private": true
}

View File

@ -1,204 +0,0 @@
#!/usr/bin/env node
import { createLibp2p } from 'libp2p'
import { tcp } from '@libp2p/tcp'
import { noise } from '@chainsafe/libp2p-noise'
import { yamux } from '@chainsafe/libp2p-yamux'
import { ping } from '@libp2p/ping'
import { identify } from '@libp2p/identify'
import { multiaddr } from '@multiformats/multiaddr'
async function createNode() {
return await createLibp2p({
addresses: {
listen: ['/ip4/0.0.0.0/tcp/0']
},
transports: [
tcp()
],
connectionEncrypters: [
noise()
],
streamMuxers: [
yamux()
],
services: {
// Use ipfs prefix to match py-libp2p example
ping: ping({
protocolPrefix: 'ipfs',
maxInboundStreams: 32,
maxOutboundStreams: 64,
timeout: 30000
}),
identify: identify()
},
connectionManager: {
minConnections: 0,
maxConnections: 100,
dialTimeout: 30000
}
})
}
async function runServer() {
console.log('🚀 Starting js-libp2p ping server...')
const node = await createNode()
await node.start()
console.log('✅ Server started!')
console.log(`📋 Peer ID: ${node.peerId.toString()}`)
console.log('📍 Listening addresses:')
node.getMultiaddrs().forEach(addr => {
console.log(` ${addr.toString()}`)
})
// Listen for connections
node.addEventListener('peer:connect', (evt) => {
console.log(`🔗 Peer connected: ${evt.detail.toString()}`)
})
node.addEventListener('peer:disconnect', (evt) => {
console.log(`❌ Peer disconnected: ${evt.detail.toString()}`)
})
console.log('\n🎧 Server ready for ping requests...')
console.log('Press Ctrl+C to exit')
// Graceful shutdown
process.on('SIGINT', async () => {
console.log('\n🛑 Shutting down...')
await node.stop()
process.exit(0)
})
// Keep alive
while (true) {
await new Promise(resolve => setTimeout(resolve, 1000))
}
}
async function runClient(targetAddr, count = 5) {
console.log('🚀 Starting js-libp2p ping client...')
const node = await createNode()
await node.start()
console.log(`📋 Our Peer ID: ${node.peerId.toString()}`)
console.log(`🎯 Target: ${targetAddr}`)
try {
const ma = multiaddr(targetAddr)
const targetPeerId = ma.getPeerId()
if (!targetPeerId) {
throw new Error('Could not extract peer ID from multiaddr')
}
console.log(`🎯 Target Peer ID: ${targetPeerId}`)
console.log('🔗 Connecting to peer...')
const connection = await node.dial(ma)
console.log('✅ Connection established!')
console.log(`🔗 Connected to: ${connection.remotePeer.toString()}`)
// Add a small delay to let the connection fully establish
await new Promise(resolve => setTimeout(resolve, 1000))
const rtts = []
for (let i = 1; i <= count; i++) {
try {
console.log(`\n🏓 Sending ping ${i}/${count}...`);
console.log('[DEBUG] Attempting to open ping stream with protocol: /ipfs/ping/1.0.0');
const start = Date.now()
const stream = await connection.newStream(['/ipfs/ping/1.0.0']).catch(err => {
console.error(`[ERROR] Failed to open ping stream: ${err.message}`);
throw err;
});
console.log('[DEBUG] Ping stream opened successfully');
const latency = await Promise.race([
node.services.ping.ping(connection.remotePeer),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Ping timeout')), 30000) // Increased timeout
)
]).catch(err => {
console.error(`[ERROR] Ping ${i} error: ${err.message}`);
throw err;
});
const rtt = Date.now() - start;
rtts.push(latency)
console.log(`✅ Ping ${i} successful!`)
console.log(` Reported latency: ${latency}ms`)
console.log(` Measured RTT: ${rtt}ms`)
if (i < count) {
await new Promise(resolve => setTimeout(resolve, 1000))
}
} catch (error) {
console.error(`❌ Ping ${i} failed:`, error.message)
// Try to continue with other pings
}
}
// Stats
if (rtts.length > 0) {
const avg = rtts.reduce((a, b) => a + b, 0) / rtts.length
const min = Math.min(...rtts)
const max = Math.max(...rtts)
console.log(`\n📊 Ping Statistics:`)
console.log(` Packets: Sent=${count}, Received=${rtts.length}, Lost=${count - rtts.length}`)
console.log(` Latency: min=${min}ms, avg=${avg.toFixed(2)}ms, max=${max}ms`)
} else {
console.log(`\n📊 All pings failed (${count} attempts)`)
}
} catch (error) {
console.error('❌ Client error:', error.message)
console.error('Stack:', error.stack)
process.exit(1)
} finally {
await node.stop()
console.log('\n⏹ Client stopped')
}
}
async function main() {
const args = process.argv.slice(2)
if (args.length === 0) {
console.log('Usage:')
console.log(' node ping.js server # Start ping server')
console.log(' node ping.js client <multiaddr> [count] # Ping a peer')
console.log('')
console.log('Examples:')
console.log(' node ping.js server')
console.log(' node ping.js client /ip4/127.0.0.1/tcp/12345/p2p/12D3Ko... 5')
process.exit(1)
}
const mode = args[0]
if (mode === 'server') {
await runServer()
} else if (mode === 'client') {
if (args.length < 2) {
console.error('❌ Client mode requires target multiaddr')
process.exit(1)
}
const targetAddr = args[1]
const count = parseInt(args[2]) || 5
await runClient(targetAddr, count)
} else {
console.error('❌ Invalid mode. Use "server" or "client"')
process.exit(1)
}
}
main().catch(console.error)

View File

@ -1,241 +0,0 @@
#!/usr/bin/env node
import { createLibp2p } from 'libp2p'
import { tcp } from '@libp2p/tcp'
import { noise } from '@chainsafe/libp2p-noise'
import { yamux } from '@chainsafe/libp2p-yamux'
import { ping } from '@libp2p/ping'
import { identify } from '@libp2p/identify'
import { multiaddr } from '@multiformats/multiaddr'
import fs from 'fs'
import path from 'path'
// Create logs directory if it doesn't exist
const logsDir = path.join(process.cwd(), '../logs')
if (!fs.existsSync(logsDir)) {
fs.mkdirSync(logsDir, { recursive: true })
}
// Setup logging
const logFile = path.join(logsDir, 'js_ping_client.log')
const logStream = fs.createWriteStream(logFile, { flags: 'w' })
function log(message) {
const timestamp = new Date().toISOString()
const logLine = `${timestamp} - ${message}\n`
logStream.write(logLine)
console.log(message)
}
async function createNode() {
log('🔧 Creating libp2p node...')
const node = await createLibp2p({
addresses: {
listen: ['/ip4/0.0.0.0/tcp/0'] // Random port
},
transports: [
tcp()
],
connectionEncrypters: [
noise()
],
streamMuxers: [
yamux()
],
services: {
ping: ping({
protocolPrefix: 'ipfs', // Use ipfs prefix to match py-libp2p
maxInboundStreams: 32,
maxOutboundStreams: 64,
timeout: 30000,
runOnTransientConnection: true
}),
identify: identify()
},
connectionManager: {
minConnections: 0,
maxConnections: 100,
dialTimeout: 30000,
maxParallelDials: 10
}
})
log('✅ Node created successfully')
return node
}
async function runClient(targetAddr, count = 5) {
log('🚀 Starting js-libp2p ping client...')
const node = await createNode()
// Add connection event listeners
node.addEventListener('peer:connect', (evt) => {
log(`🔗 Connected to peer: ${evt.detail.toString()}`)
})
node.addEventListener('peer:disconnect', (evt) => {
log(`❌ Disconnected from peer: ${evt.detail.toString()}`)
})
await node.start()
log('✅ Node started')
log(`📋 Our Peer ID: ${node.peerId.toString()}`)
log(`🎯 Target: ${targetAddr}`)
try {
const ma = multiaddr(targetAddr)
const targetPeerId = ma.getPeerId()
if (!targetPeerId) {
throw new Error('Could not extract peer ID from multiaddr')
}
log(`🎯 Target Peer ID: ${targetPeerId}`)
// Parse multiaddr components for debugging
const components = ma.toString().split('/')
log(`📍 Target components: ${components.join(' → ')}`)
log('🔗 Attempting to dial peer...')
const connection = await node.dial(ma)
log('✅ Connection established!')
log(`🔗 Connected to: ${connection.remotePeer.toString()}`)
log(`🔗 Connection status: ${connection.status}`)
log(`🔗 Connection direction: ${connection.direction}`)
// List available protocols
if (connection.remoteAddr) {
log(`🌐 Remote address: ${connection.remoteAddr.toString()}`)
}
// Wait for connection to stabilize
log('⏳ Waiting for connection to stabilize...')
await new Promise(resolve => setTimeout(resolve, 2000))
// Attempt ping sequence
log(`\n🏓 Starting ping sequence (${count} pings)...`)
const rtts = []
for (let i = 1; i <= count; i++) {
try {
log(`\n🏓 Sending ping ${i}/${count}...`)
const start = Date.now()
// Create a more robust ping with better error handling
const pingPromise = node.services.ping.ping(connection.remotePeer)
const timeoutPromise = new Promise((_, reject) =>
setTimeout(() => reject(new Error('Ping timeout (15s)')), 15000)
)
const latency = await Promise.race([pingPromise, timeoutPromise])
const totalRtt = Date.now() - start
rtts.push(latency)
log(`✅ Ping ${i} successful!`)
log(` Reported latency: ${latency}ms`)
log(` Total RTT: ${totalRtt}ms`)
// Wait between pings
if (i < count) {
await new Promise(resolve => setTimeout(resolve, 1000))
}
} catch (error) {
log(`❌ Ping ${i} failed: ${error.message}`)
log(` Error type: ${error.constructor.name}`)
if (error.code) {
log(` Error code: ${error.code}`)
}
// Check if connection is still alive
if (connection.status !== 'open') {
log(`⚠️ Connection status changed to: ${connection.status}`)
break
}
}
}
// Print statistics
if (rtts.length > 0) {
const avg = rtts.reduce((a, b) => a + b, 0) / rtts.length
const min = Math.min(...rtts)
const max = Math.max(...rtts)
const lossRate = ((count - rtts.length) / count * 100).toFixed(1)
log(`\n📊 Ping Statistics:`)
log(` Packets: Sent=${count}, Received=${rtts.length}, Lost=${count - rtts.length}`)
log(` Loss rate: ${lossRate}%`)
log(` Latency: min=${min}ms, avg=${avg.toFixed(2)}ms, max=${max}ms`)
} else {
log(`\n📊 All pings failed (${count} attempts)`)
}
// Close connection gracefully
log('\n🔒 Closing connection...')
await connection.close()
} catch (error) {
log(`❌ Client error: ${error.message}`)
log(` Error type: ${error.constructor.name}`)
if (error.stack) {
log(` Stack trace: ${error.stack}`)
}
process.exit(1)
} finally {
log('🛑 Stopping node...')
await node.stop()
log('⏹️ Client stopped')
logStream.end()
}
}
async function main() {
const args = process.argv.slice(2)
if (args.length === 0) {
console.log('Usage:')
console.log(' node ping-client.js <target-multiaddr> [count]')
console.log('')
console.log('Examples:')
console.log(' node ping-client.js /ip4/127.0.0.1/tcp/8000/p2p/QmExample... 5')
console.log(' node ping-client.js /ip4/127.0.0.1/tcp/8000/p2p/QmExample... 10')
process.exit(1)
}
const targetAddr = args[0]
const count = parseInt(args[1]) || 5
if (count <= 0 || count > 100) {
console.error('❌ Count must be between 1 and 100')
process.exit(1)
}
await runClient(targetAddr, count)
}
// Handle graceful shutdown
process.on('SIGINT', () => {
log('\n👋 Shutting down...')
logStream.end()
process.exit(0)
})
process.on('uncaughtException', (error) => {
log(`💥 Uncaught exception: ${error.message}`)
if (error.stack) {
log(`Stack: ${error.stack}`)
}
logStream.end()
process.exit(1)
})
main().catch((error) => {
log(`💥 Fatal error: ${error.message}`)
if (error.stack) {
log(`Stack: ${error.stack}`)
}
logStream.end()
process.exit(1)
})

View File

@ -1,167 +0,0 @@
#!/usr/bin/env node
import { createLibp2p } from 'libp2p'
import { tcp } from '@libp2p/tcp'
import { noise } from '@chainsafe/libp2p-noise'
import { yamux } from '@chainsafe/libp2p-yamux'
import { ping } from '@libp2p/ping'
import { identify } from '@libp2p/identify'
import fs from 'fs'
import path from 'path'
// Create logs directory if it doesn't exist
const logsDir = path.join(process.cwd(), '../logs')
if (!fs.existsSync(logsDir)) {
fs.mkdirSync(logsDir, { recursive: true })
}
// Setup logging
const logFile = path.join(logsDir, 'js_ping_server.log')
const logStream = fs.createWriteStream(logFile, { flags: 'w' })
function log(message) {
const timestamp = new Date().toISOString()
const logLine = `${timestamp} - ${message}\n`
logStream.write(logLine)
console.log(message)
}
async function createNode(port) {
log('🔧 Creating libp2p node...')
const node = await createLibp2p({
addresses: {
listen: [`/ip4/0.0.0.0/tcp/${port}`]
},
transports: [
tcp()
],
connectionEncrypters: [
noise()
],
streamMuxers: [
yamux()
],
services: {
ping: ping({
protocolPrefix: 'ipfs', // Use ipfs prefix to match py-libp2p
maxInboundStreams: 32,
maxOutboundStreams: 64,
timeout: 30000,
runOnTransientConnection: true
}),
identify: identify()
},
connectionManager: {
minConnections: 0,
maxConnections: 100,
dialTimeout: 30000,
maxParallelDials: 10
}
})
log('✅ Node created successfully')
return node
}
async function runServer(port) {
log('🚀 Starting js-libp2p ping server...')
const node = await createNode(port)
// Add connection event listeners
node.addEventListener('peer:connect', (evt) => {
log(`🔗 New peer connected: ${evt.detail.toString()}`)
})
node.addEventListener('peer:disconnect', (evt) => {
log(`❌ Peer disconnected: ${evt.detail.toString()}`)
})
// Add protocol handler for incoming streams
node.addEventListener('peer:identify', (evt) => {
log(`🔍 Peer identified: ${evt.detail.peerId.toString()}`)
log(` Protocols: ${evt.detail.protocols.join(', ')}`)
log(` Listen addresses: ${evt.detail.listenAddrs.map(addr => addr.toString()).join(', ')}`)
})
await node.start()
log('✅ Node started')
const peerId = node.peerId.toString()
const listenAddrs = node.getMultiaddrs()
log(`📋 Peer ID: ${peerId}`)
log(`🌐 Listen addresses:`)
listenAddrs.forEach(addr => {
log(` ${addr.toString()}`)
})
// Find the main TCP address for easy copy-paste
const tcpAddr = listenAddrs.find(addr =>
addr.toString().includes('/tcp/') &&
!addr.toString().includes('/ws')
)
if (tcpAddr) {
log(`\n🧪 Test with py-libp2p:`)
log(` python ping_client.py ${tcpAddr.toString()}`)
log(`\n🧪 Test with js-libp2p:`)
log(` node ping-client.js ${tcpAddr.toString()}`)
}
log(`\n🏓 Ping service is running with protocol: /ipfs/ping/1.0.0`)
log(`🔐 Security: Noise encryption`)
log(`🚇 Muxer: Yamux stream multiplexing`)
log(`\n⏳ Waiting for connections...`)
log('Press Ctrl+C to exit')
// Keep the server running
return new Promise((resolve, reject) => {
process.on('SIGINT', () => {
log('\n🛑 Shutting down server...')
node.stop().then(() => {
log('⏹️ Server stopped')
logStream.end()
resolve()
}).catch(reject)
})
process.on('uncaughtException', (error) => {
log(`💥 Uncaught exception: ${error.message}`)
if (error.stack) {
log(`Stack: ${error.stack}`)
}
logStream.end()
reject(error)
})
})
}
async function main() {
const args = process.argv.slice(2)
const port = parseInt(args[0]) || 9000
if (port <= 0 || port > 65535) {
console.error('❌ Port must be between 1 and 65535')
process.exit(1)
}
try {
await runServer(port)
} catch (error) {
console.error(`💥 Fatal error: ${error.message}`)
if (error.stack) {
console.error(`Stack: ${error.stack}`)
}
process.exit(1)
}
}
main().catch((error) => {
console.error(`💥 Fatal error: ${error.message}`)
if (error.stack) {
console.error(`Stack: ${error.stack}`)
}
process.exit(1)
})

View File

@ -1,194 +0,0 @@
#!/usr/bin/env pwsh
# run_test.ps1 - libp2p Interoperability Test Runner (PowerShell)
# Tests py-libp2p <-> js-libp2p ping communication
$ErrorActionPreference = "Stop"
# Colors for output
$Red = "`e[31m"
$Green = "`e[32m"
$Yellow = "`e[33m"
$Blue = "`e[34m"
$Cyan = "`e[36m"
$Reset = "`e[0m"
function Write-ColorOutput {
param([string]$Message, [string]$Color = $Reset)
Write-Host "${Color}${Message}${Reset}"
}
Write-ColorOutput "[CHECK] Checking prerequisites..." $Cyan
if (-not (Get-Command python -ErrorAction SilentlyContinue)) {
Write-ColorOutput "[ERROR] Python not found. Install Python 3.7+" $Red
exit 1
}
if (-not (Get-Command node -ErrorAction SilentlyContinue)) {
Write-ColorOutput "[ERROR] Node.js not found. Install Node.js 16+" $Red
exit 1
}
Write-ColorOutput "[CHECK] Checking port 8000..." $Blue
$portCheck = netstat -a -n -o | findstr :8000
if ($portCheck) {
Write-ColorOutput "[ERROR] Port 8000 in use. Free the port." $Red
Write-ColorOutput $portCheck $Yellow
exit 1
}
Write-ColorOutput "[DEBUG] Cleaning up Python processes..." $Blue
Get-Process -Name "python" -ErrorAction SilentlyContinue | Where-Object { $_.CommandLine -like "*ping.py*" } | Stop-Process -Force -ErrorAction SilentlyContinue
Write-ColorOutput "[PYTHON] Starting server on port 8000..." $Yellow
Set-Location -Path "py_node"
$pyLogFile = "py_server_8000.log"
$pyErrLogFile = "py_server_8000.log.err"
$pyDebugLogFile = "ping_debug.log"
if (Test-Path $pyLogFile) { Remove-Item $pyLogFile -Force -ErrorAction SilentlyContinue }
if (Test-Path $pyErrLogFile) { Remove-Item $pyErrLogFile -Force -ErrorAction SilentlyContinue }
if (Test-Path $pyDebugLogFile) { Remove-Item $pyDebugLogFile -Force -ErrorAction SilentlyContinue }
$pyProcess = Start-Process -FilePath "python" -ArgumentList "-u", "ping.py", "server", "--port", "8000" -NoNewWindow -PassThru -RedirectStandardOutput $pyLogFile -RedirectStandardError $pyErrLogFile
Write-ColorOutput "[DEBUG] Python server PID: $($pyProcess.Id)" $Blue
Write-ColorOutput "[DEBUG] Python logs: $((Get-Location).Path)\$pyLogFile, $((Get-Location).Path)\$pyErrLogFile, $((Get-Location).Path)\$pyDebugLogFile" $Blue
$timeoutSeconds = 20
$startTime = Get-Date
$serverStarted = $false
while (((Get-Date) - $startTime).TotalSeconds -lt $timeoutSeconds -and -not $serverStarted) {
if (Test-Path $pyLogFile) {
$content = Get-Content $pyLogFile -Raw -ErrorAction SilentlyContinue
if ($content -match "Server started|Listening") {
$serverStarted = $true
Write-ColorOutput "[OK] Python server started" $Green
}
}
if (Test-Path $pyErrLogFile) {
$errContent = Get-Content $pyErrLogFile -Raw -ErrorAction SilentlyContinue
if ($errContent) {
Write-ColorOutput "[DEBUG] Error log: $errContent" $Yellow
}
}
Start-Sleep -Milliseconds 500
}
if (-not $serverStarted) {
Write-ColorOutput "[ERROR] Python server failed to start" $Red
Write-ColorOutput "[DEBUG] Logs:" $Yellow
if (Test-Path $pyLogFile) { Get-Content $pyLogFile | Write-ColorOutput -Color $Yellow }
if (Test-Path $pyErrLogFile) { Get-Content $pyErrLogFile | Write-ColorOutput -Color $Yellow }
if (Test-Path $pyDebugLogFile) { Get-Content $pyDebugLogFile | Write-ColorOutput -Color $Yellow }
Write-ColorOutput "[DEBUG] Trying foreground run..." $Yellow
python -u ping.py server --port 8000
exit 1
}
# Extract Peer ID
$peerInfo = $null
if (Test-Path $pyLogFile) {
$content = Get-Content $pyLogFile -Raw
$peerIdPattern = "Peer ID:\s*([A-Za-z0-9]+)"
$peerIdMatch = [regex]::Match($content, $peerIdPattern)
if ($peerIdMatch.Success) {
$peerId = $peerIdMatch.Groups[1].Value
$peerInfo = @{
PeerId = $peerId
MultiAddr = "/ip4/127.0.0.1/tcp/8000/p2p/$peerId"
}
Write-ColorOutput "[OK] Peer ID: $peerId" $Cyan
Write-ColorOutput "[OK] MultiAddr: $($peerInfo.MultiAddr)" $Cyan
}
}
if (-not $peerInfo) {
Write-ColorOutput "[ERROR] Could not extract Peer ID" $Red
if (Test-Path $pyLogFile) { Get-Content $pyLogFile | Write-ColorOutput -Color $Yellow }
if (Test-Path $pyErrLogFile) { Get-Content $pyErrLogFile | Write-ColorOutput -Color $Yellow }
if (Test-Path $pyDebugLogFile) { Get-Content $pyDebugLogFile | Write-ColorOutput -Color $Yellow }
Stop-Process -Id $pyProcess.Id -Force -ErrorAction SilentlyContinue
exit 1
}
# Start JavaScript client
Write-ColorOutput "[JAVASCRIPT] Starting client..." $Yellow
Set-Location -Path "../js_node"
$jsLogFile = "test_js_client_to_py_server.log"
$jsErrLogFile = "test_js_client_to_py_server.log.err"
if (Test-Path $jsLogFile) { Remove-Item $jsLogFile -Force -ErrorAction SilentlyContinue }
if (Test-Path $jsErrLogFile) { Remove-Item $jsErrLogFile -Force -ErrorAction SilentlyContinue }
$jsProcess = Start-Process -FilePath "node" -ArgumentList "src/ping.js", "client", $peerInfo.MultiAddr, "3" -NoNewWindow -PassThru -RedirectStandardOutput $jsLogFile -RedirectStandardError $jsErrLogFile
Write-ColorOutput "[DEBUG] JavaScript client PID: $($jsProcess.Id)" $Blue
Write-ColorOutput "[DEBUG] Client logs: $((Get-Location).Path)\$jsLogFile, $((Get-Location).Path)\$jsErrLogFile" $Blue
# Wait for client to complete
$clientTimeout = 10
$clientStart = Get-Date
while (-not $jsProcess.HasExited -and (((Get-Date) - $clientStart).TotalSeconds -lt $clientTimeout)) {
Start-Sleep -Seconds 1
}
if (-not $jsProcess.HasExited) {
Write-ColorOutput "[DEBUG] JavaScript client did not exit, terminating..." $Yellow
Stop-Process -Id $jsProcess.Id -Force -ErrorAction SilentlyContinue
}
Write-ColorOutput "[CHECK] Results..." $Cyan
$success = $false
if (Test-Path $jsLogFile) {
$jsLogContent = Get-Content $jsLogFile -Raw -ErrorAction SilentlyContinue
if ($jsLogContent -match "successful|Ping.*successful") {
$success = $true
Write-ColorOutput "[SUCCESS] Ping test passed" $Green
} else {
Write-ColorOutput "[FAILED] No successful pings" $Red
Write-ColorOutput "[DEBUG] Client log path: $((Get-Location).Path)\$jsLogFile" $Yellow
Write-ColorOutput "Client log:" $Yellow
Write-ColorOutput $jsLogContent $Yellow
if (Test-Path $jsErrLogFile) {
Write-ColorOutput "[DEBUG] Client error log path: $((Get-Location).Path)\$jsErrLogFile" $Yellow
Write-ColorOutput "Client error log:" $Yellow
Get-Content $jsErrLogFile | Write-ColorOutput -Color $Yellow
}
Write-ColorOutput "[DEBUG] Python server log path: $((Get-Location).Path)\..\py_node\$pyLogFile" $Yellow
Write-ColorOutput "Python server log:" $Yellow
if (Test-Path "../py_node/$pyLogFile") {
$pyLogContent = Get-Content "../py_node/$pyLogFile" -Raw -ErrorAction SilentlyContinue
if ($pyLogContent) { Write-ColorOutput $pyLogContent $Yellow } else { Write-ColorOutput "Empty or inaccessible" $Yellow }
} else {
Write-ColorOutput "File not found" $Yellow
}
Write-ColorOutput "[DEBUG] Python server error log path: $((Get-Location).Path)\..\py_node\$pyErrLogFile" $Yellow
Write-ColorOutput "Python server error log:" $Yellow
if (Test-Path "../py_node/$pyErrLogFile") {
$pyErrLogContent = Get-Content "../py_node/$pyErrLogFile" -Raw -ErrorAction SilentlyContinue
if ($pyErrLogContent) { Write-ColorOutput $pyErrLogContent $Yellow } else { Write-ColorOutput "Empty or inaccessible" $Yellow }
} else {
Write-ColorOutput "File not found" $Yellow
}
Write-ColorOutput "[DEBUG] Python debug log path: $((Get-Location).Path)\..\py_node\$pyDebugLogFile" $Yellow
Write-ColorOutput "Python debug log:" $Yellow
if (Test-Path "../py_node/$pyDebugLogFile") {
$pyDebugLogContent = Get-Content "../py_node/$pyDebugLogFile" -Raw -ErrorAction SilentlyContinue
if ($pyDebugLogContent) { Write-ColorOutput $pyDebugLogContent $Yellow } else { Write-ColorOutput "Empty or inaccessible" $Yellow }
} else {
Write-ColorOutput "File not found" $Yellow
}
}
}
Write-ColorOutput "[CLEANUP] Stopping processes..." $Yellow
Stop-Process -Id $pyProcess.Id -Force -ErrorAction SilentlyContinue
Stop-Process -Id $jsProcess.Id -Force -ErrorAction SilentlyContinue
Set-Location -Path "../"
if ($success) {
Write-ColorOutput "[SUCCESS] Test completed" $Green
exit 0
} else {
Write-ColorOutput "[FAILED] Test failed" $Red
exit 1
}

View File

@ -1,215 +0,0 @@
#!/usr/bin/env bash
# run_test.sh - libp2p Interoperability Test Runner (Bash)
# Tests py-libp2p <-> js-libp2p ping communication
set -e
# Colors for output
RED='\033[31m'
GREEN='\033[32m'
YELLOW='\033[33m'
BLUE='\033[34m'
CYAN='\033[36m'
RESET='\033[0m'
write_color_output() {
local message="$1"
local color="${2:-$RESET}"
echo -e "${color}${message}${RESET}"
}
write_color_output "[CHECK] Checking prerequisites..." "$CYAN"
if ! command -v python3 &> /dev/null && ! command -v python &> /dev/null; then
write_color_output "[ERROR] Python not found. Install Python 3.7+" "$RED"
exit 1
fi
# Use python3 if available, otherwise python
PYTHON_CMD="python3"
if ! command -v python3 &> /dev/null; then
PYTHON_CMD="python"
fi
if ! command -v node &> /dev/null; then
write_color_output "[ERROR] Node.js not found. Install Node.js 16+" "$RED"
exit 1
fi
write_color_output "[CHECK] Checking port 8000..." "$BLUE"
if netstat -tuln 2>/dev/null | grep -q ":8000 " || ss -tuln 2>/dev/null | grep -q ":8000 "; then
write_color_output "[ERROR] Port 8000 in use. Free the port." "$RED"
if command -v netstat &> /dev/null; then
netstat -tuln | grep ":8000 " | write_color_output "$(cat)" "$YELLOW"
elif command -v ss &> /dev/null; then
ss -tuln | grep ":8000 " | write_color_output "$(cat)" "$YELLOW"
fi
exit 1
fi
write_color_output "[DEBUG] Cleaning up Python processes..." "$BLUE"
pkill -f "ping.py" 2>/dev/null || true
write_color_output "[PYTHON] Starting server on port 8000..." "$YELLOW"
cd py_node
PY_LOG_FILE="py_server_8000.log"
PY_ERR_LOG_FILE="py_server_8000.log.err"
PY_DEBUG_LOG_FILE="ping_debug.log"
rm -f "$PY_LOG_FILE" "$PY_ERR_LOG_FILE" "$PY_DEBUG_LOG_FILE"
$PYTHON_CMD -u ping.py server --port 8000 > "$PY_LOG_FILE" 2> "$PY_ERR_LOG_FILE" &
PY_PROCESS_PID=$!
write_color_output "[DEBUG] Python server PID: $PY_PROCESS_PID" "$BLUE"
write_color_output "[DEBUG] Python logs: $(pwd)/$PY_LOG_FILE, $(pwd)/$PY_ERR_LOG_FILE, $(pwd)/$PY_DEBUG_LOG_FILE" "$BLUE"
TIMEOUT_SECONDS=20
START_TIME=$(date +%s)
SERVER_STARTED=false
while [ $(($(date +%s) - START_TIME)) -lt $TIMEOUT_SECONDS ] && [ "$SERVER_STARTED" = false ]; do
if [ -f "$PY_LOG_FILE" ]; then
if grep -q "Server started\|Listening" "$PY_LOG_FILE" 2>/dev/null; then
SERVER_STARTED=true
write_color_output "[OK] Python server started" "$GREEN"
fi
fi
if [ -f "$PY_ERR_LOG_FILE" ] && [ -s "$PY_ERR_LOG_FILE" ]; then
ERR_CONTENT=$(cat "$PY_ERR_LOG_FILE" 2>/dev/null || true)
if [ -n "$ERR_CONTENT" ]; then
write_color_output "[DEBUG] Error log: $ERR_CONTENT" "$YELLOW"
fi
fi
sleep 0.5
done
if [ "$SERVER_STARTED" = false ]; then
write_color_output "[ERROR] Python server failed to start" "$RED"
write_color_output "[DEBUG] Logs:" "$YELLOW"
[ -f "$PY_LOG_FILE" ] && cat "$PY_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done
[ -f "$PY_ERR_LOG_FILE" ] && cat "$PY_ERR_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done
[ -f "$PY_DEBUG_LOG_FILE" ] && cat "$PY_DEBUG_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done
write_color_output "[DEBUG] Trying foreground run..." "$YELLOW"
$PYTHON_CMD -u ping.py server --port 8000
exit 1
fi
# Extract Peer ID
PEER_ID=""
MULTI_ADDR=""
if [ -f "$PY_LOG_FILE" ]; then
CONTENT=$(cat "$PY_LOG_FILE" 2>/dev/null || true)
PEER_ID=$(echo "$CONTENT" | grep -oP "Peer ID:\s*\K[A-Za-z0-9]+" || true)
if [ -n "$PEER_ID" ]; then
MULTI_ADDR="/ip4/127.0.0.1/tcp/8000/p2p/$PEER_ID"
write_color_output "[OK] Peer ID: $PEER_ID" "$CYAN"
write_color_output "[OK] MultiAddr: $MULTI_ADDR" "$CYAN"
fi
fi
if [ -z "$PEER_ID" ]; then
write_color_output "[ERROR] Could not extract Peer ID" "$RED"
[ -f "$PY_LOG_FILE" ] && cat "$PY_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done
[ -f "$PY_ERR_LOG_FILE" ] && cat "$PY_ERR_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done
[ -f "$PY_DEBUG_LOG_FILE" ] && cat "$PY_DEBUG_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done
kill $PY_PROCESS_PID 2>/dev/null || true
exit 1
fi
# Start JavaScript client
write_color_output "[JAVASCRIPT] Starting client..." "$YELLOW"
cd ../js_node
JS_LOG_FILE="test_js_client_to_py_server.log"
JS_ERR_LOG_FILE="test_js_client_to_py_server.log.err"
rm -f "$JS_LOG_FILE" "$JS_ERR_LOG_FILE"
node src/ping.js client "$MULTI_ADDR" 3 > "$JS_LOG_FILE" 2> "$JS_ERR_LOG_FILE" &
JS_PROCESS_PID=$!
write_color_output "[DEBUG] JavaScript client PID: $JS_PROCESS_PID" "$BLUE"
write_color_output "[DEBUG] Client logs: $(pwd)/$JS_LOG_FILE, $(pwd)/$JS_ERR_LOG_FILE" "$BLUE"
# Wait for client to complete
CLIENT_TIMEOUT=10
CLIENT_START=$(date +%s)
while kill -0 $JS_PROCESS_PID 2>/dev/null && [ $(($(date +%s) - CLIENT_START)) -lt $CLIENT_TIMEOUT ]; do
sleep 1
done
if kill -0 $JS_PROCESS_PID 2>/dev/null; then
write_color_output "[DEBUG] JavaScript client did not exit, terminating..." "$YELLOW"
kill $JS_PROCESS_PID 2>/dev/null || true
fi
write_color_output "[CHECK] Results..." "$CYAN"
SUCCESS=false
if [ -f "$JS_LOG_FILE" ]; then
JS_LOG_CONTENT=$(cat "$JS_LOG_FILE" 2>/dev/null || true)
if echo "$JS_LOG_CONTENT" | grep -q "successful\|Ping.*successful"; then
SUCCESS=true
write_color_output "[SUCCESS] Ping test passed" "$GREEN"
else
write_color_output "[FAILED] No successful pings" "$RED"
write_color_output "[DEBUG] Client log path: $(pwd)/$JS_LOG_FILE" "$YELLOW"
write_color_output "Client log:" "$YELLOW"
write_color_output "$JS_LOG_CONTENT" "$YELLOW"
if [ -f "$JS_ERR_LOG_FILE" ]; then
write_color_output "[DEBUG] Client error log path: $(pwd)/$JS_ERR_LOG_FILE" "$YELLOW"
write_color_output "Client error log:" "$YELLOW"
cat "$JS_ERR_LOG_FILE" | while read line; do write_color_output "$line" "$YELLOW"; done
fi
write_color_output "[DEBUG] Python server log path: $(pwd)/../py_node/$PY_LOG_FILE" "$YELLOW"
write_color_output "Python server log:" "$YELLOW"
if [ -f "../py_node/$PY_LOG_FILE" ]; then
PY_LOG_CONTENT=$(cat "../py_node/$PY_LOG_FILE" 2>/dev/null || true)
if [ -n "$PY_LOG_CONTENT" ]; then
write_color_output "$PY_LOG_CONTENT" "$YELLOW"
else
write_color_output "Empty or inaccessible" "$YELLOW"
fi
else
write_color_output "File not found" "$YELLOW"
fi
write_color_output "[DEBUG] Python server error log path: $(pwd)/../py_node/$PY_ERR_LOG_FILE" "$YELLOW"
write_color_output "Python server error log:" "$YELLOW"
if [ -f "../py_node/$PY_ERR_LOG_FILE" ]; then
PY_ERR_LOG_CONTENT=$(cat "../py_node/$PY_ERR_LOG_FILE" 2>/dev/null || true)
if [ -n "$PY_ERR_LOG_CONTENT" ]; then
write_color_output "$PY_ERR_LOG_CONTENT" "$YELLOW"
else
write_color_output "Empty or inaccessible" "$YELLOW"
fi
else
write_color_output "File not found" "$YELLOW"
fi
write_color_output "[DEBUG] Python debug log path: $(pwd)/../py_node/$PY_DEBUG_LOG_FILE" "$YELLOW"
write_color_output "Python debug log:" "$YELLOW"
if [ -f "../py_node/$PY_DEBUG_LOG_FILE" ]; then
PY_DEBUG_LOG_CONTENT=$(cat "../py_node/$PY_DEBUG_LOG_FILE" 2>/dev/null || true)
if [ -n "$PY_DEBUG_LOG_CONTENT" ]; then
write_color_output "$PY_DEBUG_LOG_CONTENT" "$YELLOW"
else
write_color_output "Empty or inaccessible" "$YELLOW"
fi
else
write_color_output "File not found" "$YELLOW"
fi
fi
fi
write_color_output "[CLEANUP] Stopping processes..." "$YELLOW"
kill $PY_PROCESS_PID 2>/dev/null || true
kill $JS_PROCESS_PID 2>/dev/null || true
cd ../
if [ "$SUCCESS" = true ]; then
write_color_output "[SUCCESS] Test completed" "$GREEN"
exit 0
else
write_color_output "[FAILED] Test failed" "$RED"
exit 1
fi

View File

@ -0,0 +1,5 @@
def test_js_libp2p_placeholder():
"""
Placeholder test for js-libp2p interop tests.
"""
assert True, "Placeholder test for js-libp2p interop tests"